// tempo_commonware_node/consensus/engine.rs

1//! [`Engine`] drives the application and is modelled after commonware's [`alto`] toy blockchain.
2//!
3//! [`alto`]: https://github.com/commonwarexyx/alto
4
5use std::{
6    num::{NonZeroU16, NonZeroU64, NonZeroUsize},
7    time::{Duration, Instant},
8};
9
10use commonware_broadcast::buffered;
11use commonware_consensus::{
12    Reporters, marshal,
13    simplex::scheme::bls12381_threshold::vrf::Scheme,
14    types::{FixedEpocher, ViewDelta},
15};
16use commonware_cryptography::{
17    Signer as _,
18    bls12381::primitives::{group::Share, variant::MinSig},
19    certificate::Scheme as _,
20    ed25519::{PrivateKey, PublicKey},
21};
22use commonware_p2p::{AddressableManager, Blocker, Receiver, Sender};
23use commonware_parallel::Sequential;
24use commonware_runtime::{
25    BufferPooler, Clock, ContextCell, Handle, Metrics, Network, Pacer, Spawner, Storage,
26    buffer::paged::CacheRef, spawn_cell,
27};
28use commonware_storage::archive::immutable;
29use commonware_utils::{NZU16, NZU64, NZUsize};
30use eyre::{OptionExt as _, WrapErr as _};
31use futures::future::try_join_all;
32use rand_08::{CryptoRng, Rng};
33use tempo_node::TempoFullNode;
34use tracing::info;
35
36use crate::{
37    config::BLOCKS_FREEZER_TABLE_INITIAL_SIZE_BYTES,
38    consensus::application,
39    dkg,
40    epoch::{self, SchemeProvider},
41    peer_manager, subblocks,
42};
43
44use super::block::Block;
45
// A bunch of constants to configure commonwarexyz singletons, copied over from alto.

/// To better support peers near tip during network instability, we multiply
/// the consensus activity timeout by this factor.
const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
/// Items per section for the marshal's prunable (per-view) storage.
const PRUNABLE_ITEMS_PER_SECTION: NonZeroU64 = NZU64!(4_096);
/// Items per section for the immutable archives (finalizations and blocks).
const IMMUTABLE_ITEMS_PER_SECTION: NonZeroU64 = NZU64!(262_144);
// NOTE(review): exact semantics of "frequency" are defined by
// commonware_storage's freezer; not visible from this file.
const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
// 2^16 = 65,536 units per resize chunk. The original "3MB" note presumably
// assumed a fixed per-entry byte size — TODO confirm against commonware_storage.
const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16);
const FREEZER_VALUE_TARGET_SIZE: u64 = 1024 * 1024 * 1024; // 1GB
/// Compression level for freezer values; `None` would disable compression.
const FREEZER_VALUE_COMPRESSION: Option<u8> = Some(3);
const REPLAY_BUFFER: NonZeroUsize = NZUsize!(8 * 1024 * 1024); // 8MB
const WRITE_BUFFER: NonZeroUsize = NZUsize!(1024 * 1024); // 1MB
const BUFFER_POOL_PAGE_SIZE: NonZeroU16 = NZU16!(4_096); // 4KB
const BUFFER_POOL_CAPACITY: NonZeroUsize = NZUsize!(8_192); // 8,192 pages x 4KB = 32MB
const MAX_REPAIR: NonZeroUsize = NZUsize!(20);

// Ensure the marshal delivers blocks sequentially.
const MAX_PENDING_ACKS: NonZeroUsize = NZUsize!(1);
65
66/// Settings for [`Engine`].
67///
68// XXX: Mostly a one-to-one copy of alto for now. We also put the context in here
69// because there doesn't really seem to be a point putting it into an extra initializer.
/// Settings for [`Engine`].
///
// XXX: Mostly a one-to-one copy of alto for now. We also put the context in here
// because there doesn't really seem to be a point putting it into an extra initializer.
#[derive(Clone)]
pub struct Builder<TBlocker, TPeerManager> {
    /// Fee recipient address forwarded to the application and subblocks actors.
    pub fee_recipient: alloy_primitives::Address,

    /// Execution-layer node handle; must be set via
    /// [`Builder::with_execution_node`] before [`Builder::try_init`] is called.
    pub execution_node: Option<TempoFullNode>,

    /// Used by the resolver and epoch manager to block misbehaving peers.
    pub blocker: TBlocker,
    /// Peer oracle handed to the peer-manager actor.
    pub peer_manager: TPeerManager,

    /// Prefix for every storage partition created by the engine's actors.
    pub partition_prefix: String,
    /// The node's ed25519 signing key; its public key is the node identity.
    pub signer: PrivateKey,
    /// Initial BLS threshold share passed to the DKG manager, if any.
    pub share: Option<Share>,

    /// Capacity used for all actor mailboxes.
    pub mailbox_size: usize,
    /// Capacity of the broadcast engine's message deque.
    pub deque_size: usize,

    // Consensus timing/view knobs; forwarded to the epoch manager,
    // application, subblocks, and executor actors in `try_init`.
    pub time_to_propose: Duration,
    pub time_to_collect_notarizations: Duration,
    pub time_to_retry_nullify_broadcast: Duration,
    pub time_for_peer_response: Duration,
    pub views_to_track: u64,
    pub views_until_leader_skip: u64,
    pub payload_interrupt_time: Duration,
    pub new_payload_wait_time: Duration,
    pub time_to_build_subblock: Duration,
    pub subblock_broadcast_interval: Duration,
    pub fcu_heartbeat_interval: Duration,
    /// Whether to run the optional subblocks actor.
    pub with_subblocks: bool,

    /// Shared feed state handed to the feed actor.
    pub feed_state: crate::feed::FeedStateHandle,
}
101
102impl<TBlocker, TPeerManager> Builder<TBlocker, TPeerManager>
103where
104    TBlocker: Blocker<PublicKey = PublicKey> + Sync,
105    TPeerManager: AddressableManager<PublicKey = PublicKey> + Sync,
106{
107    pub fn with_execution_node(mut self, execution_node: TempoFullNode) -> Self {
108        self.execution_node = Some(execution_node);
109        self
110    }
111
112    pub async fn try_init<TContext>(
113        self,
114        context: TContext,
115    ) -> eyre::Result<Engine<TContext, TBlocker, TPeerManager>>
116    where
117        TContext: Clock
118            + governor::clock::Clock
119            + Rng
120            + CryptoRng
121            + Pacer
122            + Spawner
123            + Storage
124            + Metrics
125            + Network
126            + BufferPooler,
127    {
128        let execution_node = self
129            .execution_node
130            .clone()
131            .ok_or_eyre("execution_node must be set using with_execution_node()")?;
132
133        let epoch_length = execution_node
134            .chain_spec()
135            .info
136            .epoch_length()
137            .ok_or_eyre("chainspec did not contain epochLength; cannot go on without it")?;
138
139        let epoch_strategy = FixedEpocher::new(NZU64!(epoch_length));
140
141        info!(
142            identity = %self.signer.public_key(),
143            "using public ed25519 verifying key derived from provided private ed25519 signing key",
144        );
145
146        let page_cache_ref =
147            CacheRef::from_pooler(&context, BUFFER_POOL_PAGE_SIZE, BUFFER_POOL_CAPACITY);
148
149        let scheme_provider = SchemeProvider::new();
150
151        const FINALIZATIONS_BY_HEIGHT: &str = "finalizations-by-height";
152        let start = Instant::now();
153        let finalizations_by_height = immutable::Archive::init(
154            context.with_label("finalizations_by_height"),
155            immutable::Config {
156                metadata_partition: format!(
157                    "{}-{FINALIZATIONS_BY_HEIGHT}-metadata",
158                    self.partition_prefix,
159                ),
160
161                freezer_table_partition: format!(
162                    "{}-{FINALIZATIONS_BY_HEIGHT}-freezer-table",
163                    self.partition_prefix,
164                ),
165
166                freezer_table_initial_size: BLOCKS_FREEZER_TABLE_INITIAL_SIZE_BYTES,
167                freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
168                freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
169
170                freezer_key_partition: format!(
171                    "{}-{FINALIZATIONS_BY_HEIGHT}-freezer-key",
172                    self.partition_prefix,
173                ),
174                freezer_key_page_cache: page_cache_ref.clone(),
175
176                freezer_value_partition: format!(
177                    "{}-{FINALIZATIONS_BY_HEIGHT}-freezer-value",
178                    self.partition_prefix,
179                ),
180                freezer_value_target_size: FREEZER_VALUE_TARGET_SIZE,
181                freezer_value_compression: FREEZER_VALUE_COMPRESSION,
182
183                ordinal_partition: format!(
184                    "{}-{FINALIZATIONS_BY_HEIGHT}-ordinal",
185                    self.partition_prefix,
186                ),
187
188                items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
189                codec_config: Scheme::<PublicKey, MinSig>::certificate_codec_config_unbounded(),
190
191                replay_buffer: REPLAY_BUFFER,
192                freezer_key_write_buffer: WRITE_BUFFER,
193                freezer_value_write_buffer: WRITE_BUFFER,
194                ordinal_write_buffer: WRITE_BUFFER,
195            },
196        )
197        .await
198        .wrap_err("failed to initialize finalizations by height archive")?;
199        info!(elapsed = ?start.elapsed(), "restored finalizations by height archive");
200
201        const FINALIZED_BLOCKS: &str = "finalized_blocks";
202        let start = Instant::now();
203        let finalized_blocks = immutable::Archive::init(
204            context.with_label("finalized_blocks"),
205            immutable::Config {
206                metadata_partition: format!(
207                    "{}-{FINALIZED_BLOCKS}-metadata",
208                    self.partition_prefix,
209                ),
210
211                freezer_table_partition: format!(
212                    "{}-{FINALIZED_BLOCKS}-freezer-table",
213                    self.partition_prefix,
214                ),
215
216                freezer_table_initial_size: BLOCKS_FREEZER_TABLE_INITIAL_SIZE_BYTES,
217                freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
218                freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
219
220                freezer_key_partition: format!(
221                    "{}-{FINALIZED_BLOCKS}-freezer-key",
222                    self.partition_prefix,
223                ),
224                freezer_key_page_cache: page_cache_ref.clone(),
225
226                freezer_value_partition: format!(
227                    "{}-{FINALIZED_BLOCKS}-freezer-value",
228                    self.partition_prefix,
229                ),
230                freezer_value_target_size: FREEZER_VALUE_TARGET_SIZE,
231                freezer_value_compression: FREEZER_VALUE_COMPRESSION,
232
233                ordinal_partition: format!("{}-{FINALIZED_BLOCKS}-ordinal", self.partition_prefix,),
234                items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
235                codec_config: (),
236
237                replay_buffer: REPLAY_BUFFER,
238                freezer_key_write_buffer: WRITE_BUFFER,
239                freezer_value_write_buffer: WRITE_BUFFER,
240                ordinal_write_buffer: WRITE_BUFFER,
241            },
242        )
243        .await
244        .wrap_err("failed to initialize finalizations by height archive")?;
245        info!(elapsed = ?start.elapsed(), "restored finalizations by height archive");
246
247        // TODO(janis): forward `last_finalized_height` to application so it can
248        // forward missing blocks to EL.
249        let (marshal, marshal_mailbox, last_finalized_height) = marshal::core::Actor::init(
250            context.with_label("marshal"),
251            finalizations_by_height,
252            finalized_blocks,
253            marshal::Config {
254                provider: scheme_provider.clone(),
255                epocher: epoch_strategy.clone(),
256                partition_prefix: self.partition_prefix.clone(),
257                mailbox_size: self.mailbox_size,
258                view_retention_timeout: ViewDelta::new(
259                    self.views_to_track
260                        .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
261                ),
262                prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
263
264                page_cache: page_cache_ref.clone(),
265
266                replay_buffer: REPLAY_BUFFER,
267                key_write_buffer: WRITE_BUFFER,
268                value_write_buffer: WRITE_BUFFER,
269                max_repair: MAX_REPAIR,
270                max_pending_acks: MAX_PENDING_ACKS,
271                block_codec_config: (),
272
273                strategy: Sequential,
274            },
275        )
276        .await;
277
278        let (executor, executor_mailbox) = crate::executor::init(
279            context.with_label("executor"),
280            crate::executor::Config {
281                execution_node: execution_node.clone(),
282                last_finalized_height,
283                marshal: marshal_mailbox.clone(),
284                fcu_heartbeat_interval: self.fcu_heartbeat_interval,
285            },
286        )
287        .wrap_err("failed initialization executor actor")?;
288
289        let (peer_manager, peer_manager_mailbox) = peer_manager::init(
290            context.with_label("peer_manager"),
291            peer_manager::Config {
292                execution_node: execution_node.clone(),
293                executor: executor_mailbox.clone(),
294                oracle: self.peer_manager.clone(),
295                epoch_strategy: epoch_strategy.clone(),
296                last_finalized_height,
297            },
298        );
299
300        let (broadcast, broadcast_mailbox) = buffered::Engine::new(
301            context.with_label("broadcast"),
302            buffered::Config {
303                public_key: self.signer.public_key(),
304                mailbox_size: self.mailbox_size,
305                deque_size: self.deque_size,
306                peer_provider: peer_manager_mailbox.clone(),
307                priority: true,
308                codec_config: (),
309            },
310        );
311
312        // XXX: All hard-coded values here are the same as prior to commonware
313        // making the resolver configurable in
314        // https://github.com/commonwarexyz/monorepo/commit/92870f39b4a9e64a28434b3729ebff5aba67fb4e
315        let resolver_config = commonware_consensus::marshal::resolver::p2p::Config {
316            public_key: self.signer.public_key(),
317            peer_provider: peer_manager_mailbox.clone(),
318            mailbox_size: self.mailbox_size,
319            blocker: self.blocker.clone(),
320            initial: Duration::from_secs(1),
321            timeout: Duration::from_secs(2),
322            fetch_retry_timeout: Duration::from_millis(100),
323            priority_requests: false,
324            priority_responses: false,
325        };
326
327        let subblocks = self.with_subblocks.then(|| {
328            subblocks::Actor::new(subblocks::Config {
329                context: context.clone(),
330                signer: self.signer.clone(),
331                scheme_provider: scheme_provider.clone(),
332                node: execution_node.clone(),
333                fee_recipient: self.fee_recipient,
334                time_to_build_subblock: self.time_to_build_subblock,
335                subblock_broadcast_interval: self.subblock_broadcast_interval,
336                epoch_strategy: epoch_strategy.clone(),
337            })
338        });
339
340        let (feed, feed_mailbox) = crate::feed::init(
341            context.with_label("feed"),
342            marshal_mailbox.clone(),
343            epoch_strategy.clone(),
344            execution_node.clone(),
345            self.feed_state,
346        );
347
348        let (application, application_mailbox) = application::init(super::application::Config {
349            context: context.with_label("application"),
350            fee_recipient: self.fee_recipient,
351            mailbox_size: self.mailbox_size,
352            marshal: marshal_mailbox.clone(),
353            execution_node: execution_node.clone(),
354            executor: executor_mailbox.clone(),
355            payload_resolve_time: self.payload_interrupt_time,
356            payload_return_time: self.new_payload_wait_time,
357            subblocks: subblocks.as_ref().map(|s| s.mailbox()),
358            scheme_provider: scheme_provider.clone(),
359            epoch_strategy: epoch_strategy.clone(),
360        })
361        .await
362        .wrap_err("failed initializing application actor")?;
363
364        let (epoch_manager, epoch_manager_mailbox) = epoch::manager::init(
365            context.with_label("epoch_manager"),
366            epoch::manager::Config {
367                application: application_mailbox.clone(),
368                blocker: self.blocker.clone(),
369                page_cache: page_cache_ref,
370                epoch_strategy: epoch_strategy.clone(),
371                time_for_peer_response: self.time_for_peer_response,
372                time_to_propose: self.time_to_propose,
373                mailbox_size: self.mailbox_size,
374                subblocks: subblocks.as_ref().map(|s| s.mailbox()),
375                marshal: marshal_mailbox.clone(),
376                feed: feed_mailbox.clone(),
377                scheme_provider: scheme_provider.clone(),
378                time_to_collect_notarizations: self.time_to_collect_notarizations,
379                time_to_retry_nullify_broadcast: self.time_to_retry_nullify_broadcast,
380                partition_prefix: format!("{}_epoch_manager", self.partition_prefix),
381                views_to_track: ViewDelta::new(self.views_to_track),
382                views_until_leader_skip: ViewDelta::new(self.views_until_leader_skip),
383            },
384        );
385
386        let (dkg_manager, dkg_manager_mailbox) = dkg::manager::init(
387            context.with_label("dkg_manager"),
388            dkg::manager::Config {
389                epoch_manager: epoch_manager_mailbox.clone(),
390                epoch_strategy: epoch_strategy.clone(),
391                execution_node,
392                initial_share: self.share.clone(),
393                mailbox_size: self.mailbox_size,
394                marshal: marshal_mailbox,
395                namespace: crate::config::NAMESPACE.to_vec(),
396                me: self.signer.clone(),
397                partition_prefix: format!("{}_dkg_manager", self.partition_prefix),
398            },
399        )
400        .await
401        .wrap_err("failed initializing dkg manager")?;
402
403        Ok(Engine {
404            context: ContextCell::new(context),
405
406            broadcast,
407            broadcast_mailbox,
408
409            dkg_manager,
410            dkg_manager_mailbox,
411
412            application,
413
414            executor,
415            executor_mailbox,
416
417            resolver_config,
418            marshal,
419
420            epoch_manager,
421            epoch_manager_mailbox,
422
423            peer_manager,
424            peer_manager_mailbox,
425
426            feed,
427
428            subblocks,
429        })
430    }
431}
432
/// Owns every actor of the consensus stack together with the mailboxes used to
/// wire them up; built by [`Builder::try_init`] and started via [`Engine::start`].
pub struct Engine<TContext, TBlocker, TPeerManager>
where
    TContext: BufferPooler
        + Clock
        + governor::clock::Clock
        + Rng
        + CryptoRng
        + Metrics
        + Network
        + Pacer
        + Spawner
        + Storage,
    TBlocker: Blocker<PublicKey = PublicKey>,
    TPeerManager: AddressableManager<PublicKey = PublicKey>,
{
    /// Runtime context, cell-wrapped so `start` can move it into the spawned task.
    context: ContextCell<TContext>,

    /// Broadcasts messages to and caches messages from untrusted peers.
    // XXX: alto calls this `buffered`. That's confusing. We call it `broadcast`.
    broadcast: buffered::Engine<TContext, PublicKey, Block, peer_manager::Mailbox>,
    broadcast_mailbox: buffered::Mailbox<PublicKey, Block>,

    dkg_manager: dkg::manager::Actor<TContext>,
    dkg_manager_mailbox: dkg::manager::Mailbox,

    /// Acts as the glue between the consensus and execution layers implementing
    /// the [`commonware_consensus::Automaton`] trait.
    application: application::Actor<TContext>,

    /// Responsible for keeping the consensus layer state and execution layer
    /// states in sync. Drives the chain state of the execution layer by sending
    /// forkchoice-updates.
    executor: crate::executor::Actor<TContext>,
    executor_mailbox: crate::executor::Mailbox,

    /// Resolver config that will be passed to the marshal actor upon start.
    resolver_config: marshal::resolver::p2p::Config<PublicKey, peer_manager::Mailbox, TBlocker>,

    /// Listens to consensus events and syncs blocks from the network to the
    /// local node.
    marshal: crate::alias::marshal::Actor<TContext>,

    epoch_manager: epoch::manager::Actor<TContext, TBlocker>,
    epoch_manager_mailbox: epoch::manager::Mailbox,

    peer_manager: peer_manager::Actor<TContext, TPeerManager>,
    peer_manager_mailbox: peer_manager::Mailbox,

    /// Publishes chain data to the externally-visible feed.
    feed: crate::feed::Actor<TContext>,

    /// Optional subblocks actor; `None` when subblocks are disabled.
    subblocks: Option<subblocks::Actor<TContext>>,
}
485
impl<TContext, TBlocker, TPeerManager> Engine<TContext, TBlocker, TPeerManager>
where
    TContext: BufferPooler
        + Clock
        + governor::clock::Clock
        + Rng
        + CryptoRng
        + Metrics
        + Network
        + Pacer
        + Spawner
        + Storage,
    TBlocker: Blocker<PublicKey = PublicKey> + Sync,
    TPeerManager: AddressableManager<PublicKey = PublicKey> + Sync,
{
    /// Spawns the engine onto its own task, handing each actor its network
    /// channels, and returns a handle resolving when any actor fails or all
    /// complete. Each `(Sender, Receiver)` pair is a dedicated p2p channel.
    #[expect(
        clippy::too_many_arguments,
        reason = "following commonware's style of writing"
    )]
    pub fn start(
        mut self,
        votes_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        certificates_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        broadcast_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        marshal_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        dkg_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        subblocks_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<eyre::Result<()>> {
        // spawn_cell! moves the context out of the cell and runs `run` on it.
        spawn_cell!(
            self.context,
            self.run(
                votes_network,
                certificates_network,
                resolver_network,
                broadcast_network,
                marshal_network,
                dkg_channel,
                subblocks_channel,
            )
            .await
        )
    }

    /// Starts every actor with its channels/mailboxes and awaits them all,
    /// returning the first failure (if any).
    #[expect(
        clippy::too_many_arguments,
        reason = "following commonware's style of writing"
    )]
    async fn run(
        self,
        votes_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        certificates_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        broadcast_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        marshal_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        dkg_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        subblocks_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> eyre::Result<()> {
        let peer_manager = self.peer_manager.start();

        let broadcast = self.broadcast.start(broadcast_channel);
        // The resolver is created here (not in try_init) because it needs the
        // marshal network channel, which only arrives at start time.
        let resolver =
            marshal::resolver::p2p::init(&self.context, self.resolver_config, marshal_channel);

        let application = self.application.start(self.dkg_manager_mailbox.clone());
        let executor = self.executor.start();

        // Fan out marshal events to all interested actors via nested Reporters.
        let marshal = self.marshal.start(
            Reporters::from((
                self.epoch_manager_mailbox,
                Reporters::from((
                    self.executor_mailbox,
                    Reporters::from((self.dkg_manager_mailbox.clone(), self.peer_manager_mailbox)),
                )),
            )),
            self.broadcast_mailbox,
            resolver,
        );

        let epoch_manager =
            self.epoch_manager
                .start(votes_channel, certificates_channel, resolver_channel);

        let feed = self.feed.start();

        let dkg_manager = self.dkg_manager.start(dkg_channel);

        let mut tasks = vec![
            application,
            broadcast,
            epoch_manager,
            executor,
            feed,
            marshal,
            dkg_manager,
            peer_manager,
        ];

        if let Some(subblocks) = self.subblocks {
            tasks.push(self.context.spawn(|_| subblocks.run(subblocks_channel)));
        } else {
            // Subblocks disabled: drop the channel so peers' sends don't hang
            // on an unserviced receiver.
            drop(subblocks_channel);
        }

        // Resolves on the first actor failure or when all actors complete.
        try_join_all(tasks)
            .await
            .map(|_| ())
            // TODO: look into adding error context so that we know which
            // component failed.
            .wrap_err("one of the consensus engine's actors failed")
    }
}
639}