Skip to main content

tempo_consensus/consensus/
engine.rs

//! [`Engine`] drives the application and is modelled after commonware's [`alto`] toy blockchain.
//!
//! [`alto`]: https://github.com/commonwarexyz/alto
4
5use std::{num::NonZeroUsize, sync::Arc, time::Duration};
6
7use commonware_broadcast::buffered;
8use commonware_consensus::{
9    Reporters, marshal,
10    types::{FixedEpocher, ViewDelta},
11};
12use commonware_cryptography::{
13    Signer as _,
14    bls12381::primitives::group::Share,
15    ed25519::{PrivateKey, PublicKey},
16};
17use commonware_p2p::{AddressableManager, Blocker, Receiver, Sender};
18use commonware_runtime::{
19    BufferPooler, Clock, ContextCell, Handle, Metrics, Network, Pacer, Spawner, Storage,
20    buffer::paged::CacheRef, spawn_cell,
21};
22use commonware_utils::{NZU64, NZUsize};
23use eyre::{OptionExt as _, WrapErr as _};
24use futures::future::try_join_all;
25use rand_08::{CryptoRng, Rng};
26use tempo_node::TempoFullNode;
27use tracing::info;
28
29use crate::{
30    alias,
31    consensus::application,
32    dkg,
33    epoch::{self, SchemeProvider},
34    peer_manager, storage, subblocks,
35};
36
37use super::block::Block;
38
// A bunch of constants to configure commonwarexyz singletons, copied over from alto.
40
41/// To better support peers near tip during network instability, we multiply
42/// the consensus activity timeout by this factor.
43const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
44// Ensure the marshal delivers blocks sequentially.
45const MAX_PENDING_ACKS: NonZeroUsize = NZUsize!(1);
46
47/// Settings for [`Engine`].
48///
49// XXX: Mostly a one-to-one copy of alto for now. We also put the context in here
50// because there doesn't really seem to be a point putting it into an extra initializer.
51#[derive(Clone)]
52pub struct Builder<TBlocker, TPeerManager> {
53    pub execution_node: Option<Arc<TempoFullNode>>,
54
55    pub blocker: TBlocker,
56    pub peer_manager: TPeerManager,
57
58    pub partition_prefix: String,
59    pub signer: PrivateKey,
60    pub share: Option<Share>,
61
62    pub mailbox_size: usize,
63    pub deque_size: usize,
64
65    /// Maximum time to wait for the leader's proposal before timing out a view.
66    ///
67    /// This is a liveness timeout, not the normal block pacing target.
68    pub time_to_propose: Duration,
69    pub time_to_collect_notarizations: Duration,
70    pub time_to_retry_nullify_broadcast: Duration,
71    pub time_for_peer_response: Duration,
72    pub views_to_track: u64,
73    pub views_until_leader_skip: u64,
74    /// Local proposal return budget after reserving network propagation time.
75    ///
76    /// The leader uses this window for payload building, local marshal
77    /// persistence, and any final wait before returning the proposal.
78    pub proposal_return_budget: Duration,
79    pub time_to_build_subblock: Duration,
80    pub subblock_broadcast_interval: Duration,
81    pub fcu_heartbeat_interval: Duration,
82    pub with_subblocks: bool,
83
84    pub feed_state: crate::feed::FeedStateHandle,
85
86    /// Number of recently finalized blocks retained in the prunable archive
87    /// passed to the marshal actor. Older blocks are served from reth.
88    pub finalized_blocks_retention: u64,
89}
90
91impl<TBlocker, TPeerManager> Builder<TBlocker, TPeerManager>
92where
93    TBlocker: Blocker<PublicKey = PublicKey> + Sync,
94    TPeerManager: AddressableManager<PublicKey = PublicKey> + Sync,
95{
96    pub fn with_execution_node(mut self, execution_node: Arc<TempoFullNode>) -> Self {
97        self.execution_node = Some(execution_node);
98        self
99    }
100
101    pub async fn try_init<TContext>(
102        self,
103        context: TContext,
104    ) -> eyre::Result<Engine<TContext, TBlocker, TPeerManager>>
105    where
106        TContext: Clock
107            + governor::clock::Clock
108            + Rng
109            + CryptoRng
110            + Pacer
111            + Spawner
112            + Storage
113            + Metrics
114            + Network
115            + BufferPooler,
116    {
117        let execution_node = self
118            .execution_node
119            .clone()
120            .ok_or_eyre("execution_node must be set using with_execution_node()")?;
121
122        let epoch_length = execution_node
123            .chain_spec()
124            .info
125            .epoch_length()
126            .ok_or_eyre("chainspec did not contain epochLength; cannot go on without it")?;
127
128        let epoch_strategy = FixedEpocher::new(NZU64!(epoch_length));
129
130        info!(
131            identity = %self.signer.public_key(),
132            "using public ed25519 verifying key derived from provided private ed25519 signing key",
133        );
134
135        let page_cache_ref = CacheRef::from_pooler(
136            &context,
137            storage::BUFFER_POOL_PAGE_SIZE,
138            storage::BUFFER_POOL_CAPACITY,
139        );
140
141        let scheme_provider = SchemeProvider::new();
142
143        let alias::marshal::Initialized {
144            actor: marshal,
145            mailbox: marshal_mailbox,
146            last_finalized_height,
147        } = alias::marshal::init(
148            context.clone(),
149            page_cache_ref.clone(),
150            execution_node.clone(),
151            alias::marshal::Config {
152                partition_prefix: self.partition_prefix.clone(),
153                mailbox_size: self.mailbox_size,
154                view_retention_timeout: ViewDelta::new(
155                    self.views_to_track
156                        .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
157                ),
158                max_pending_acks: MAX_PENDING_ACKS,
159                finalized_blocks_retention: self.finalized_blocks_retention,
160                epoch_strategy: epoch_strategy.clone(),
161                scheme_provider: scheme_provider.clone(),
162            },
163        )
164        .await
165        .wrap_err("failed to initialize marshal")?;
166
167        let (executor, executor_mailbox) = crate::executor::init(
168            context.with_label("executor"),
169            crate::executor::Config {
170                execution_node: execution_node.clone(),
171                last_finalized_height,
172                marshal: marshal_mailbox.clone(),
173                fcu_heartbeat_interval: self.fcu_heartbeat_interval,
174                public_key: Some(self.signer.public_key()),
175            },
176        )
177        .wrap_err("failed initialization executor actor")?;
178
179        let (peer_manager, peer_manager_mailbox) = peer_manager::init(
180            context.with_label("peer_manager"),
181            peer_manager::Config {
182                execution_node: execution_node.clone(),
183                oracle: self.peer_manager.clone(),
184                epoch_strategy: epoch_strategy.clone(),
185                last_finalized_height,
186            },
187        );
188
189        let (broadcast, broadcast_mailbox) = buffered::Engine::new(
190            context.with_label("broadcast"),
191            buffered::Config {
192                public_key: self.signer.public_key(),
193                mailbox_size: self.mailbox_size,
194                deque_size: self.deque_size,
195                peer_provider: peer_manager_mailbox.clone(),
196                priority: true,
197                codec_config: (),
198            },
199        );
200
201        // XXX: All hard-coded values here are the same as prior to commonware
202        // making the resolver configurable in
203        // https://github.com/commonwarexyz/monorepo/commit/92870f39b4a9e64a28434b3729ebff5aba67fb4e
204        let resolver_config = commonware_consensus::marshal::resolver::p2p::Config {
205            public_key: self.signer.public_key(),
206            peer_provider: peer_manager_mailbox.clone(),
207            mailbox_size: self.mailbox_size,
208            blocker: self.blocker.clone(),
209            initial: Duration::from_secs(1),
210            timeout: Duration::from_secs(2),
211            fetch_retry_timeout: Duration::from_millis(100),
212            priority_requests: false,
213            priority_responses: false,
214        };
215
216        let subblocks = self.with_subblocks.then(|| {
217            subblocks::Actor::new(subblocks::Config {
218                context: context.clone(),
219                signer: self.signer.clone(),
220                scheme_provider: scheme_provider.clone(),
221                node: execution_node.clone(),
222                // TODO: subblocks are currently dead; hardcode the recipient to
223                // zero until this is wired through V2 or the subblocks logic is
224                // replaced.
225                fee_recipient: alloy_primitives::Address::ZERO,
226                time_to_build_subblock: self.time_to_build_subblock,
227                subblock_broadcast_interval: self.subblock_broadcast_interval,
228                epoch_strategy: epoch_strategy.clone(),
229            })
230        });
231
232        let (feed, feed_mailbox) = crate::feed::init(
233            context.with_label("feed"),
234            marshal_mailbox.clone(),
235            epoch_strategy.clone(),
236            execution_node.clone(),
237            self.feed_state,
238        );
239
240        let (application, application_mailbox) = application::init(super::application::Config {
241            context: context.with_label("application"),
242            public_key: self.signer.public_key(),
243            mailbox_size: self.mailbox_size,
244            marshal: marshal_mailbox.clone(),
245            execution_node: execution_node.clone(),
246            executor: executor_mailbox.clone(),
247            proposal_return_budget: self.proposal_return_budget,
248            subblocks: subblocks.as_ref().map(|s| s.mailbox()),
249            scheme_provider: scheme_provider.clone(),
250            epoch_strategy: epoch_strategy.clone(),
251        })
252        .await
253        .wrap_err("failed initializing application actor")?;
254
255        let (epoch_manager, epoch_manager_mailbox) = epoch::manager::init(
256            context.with_label("epoch_manager"),
257            epoch::manager::Config {
258                application: application_mailbox.clone(),
259                blocker: self.blocker.clone(),
260                page_cache: page_cache_ref,
261                epoch_strategy: epoch_strategy.clone(),
262                time_for_peer_response: self.time_for_peer_response,
263                time_to_propose: self.time_to_propose,
264                mailbox_size: self.mailbox_size,
265                subblocks: subblocks.as_ref().map(|s| s.mailbox()),
266                marshal: marshal_mailbox.clone(),
267                feed: feed_mailbox.clone(),
268                scheme_provider: scheme_provider.clone(),
269                time_to_collect_notarizations: self.time_to_collect_notarizations,
270                time_to_retry_nullify_broadcast: self.time_to_retry_nullify_broadcast,
271                partition_prefix: format!("{}_epoch_manager", self.partition_prefix),
272                views_to_track: ViewDelta::new(self.views_to_track),
273                views_until_leader_skip: ViewDelta::new(self.views_until_leader_skip),
274            },
275        );
276
277        let (dkg_manager, dkg_manager_mailbox) = dkg::manager::init(
278            context.with_label("dkg_manager"),
279            dkg::manager::Config {
280                epoch_manager: epoch_manager_mailbox.clone(),
281                epoch_strategy: epoch_strategy.clone(),
282                execution_node,
283                initial_share: self.share.clone(),
284                mailbox_size: self.mailbox_size,
285                marshal: marshal_mailbox,
286                namespace: crate::config::NAMESPACE.to_vec(),
287                me: self.signer.clone(),
288                partition_prefix: format!("{}_dkg_manager", self.partition_prefix),
289            },
290        )
291        .await
292        .wrap_err("failed initializing dkg manager")?;
293
294        Ok(Engine {
295            context: ContextCell::new(context),
296
297            broadcast,
298            broadcast_mailbox,
299
300            dkg_manager,
301            dkg_manager_mailbox,
302
303            application,
304
305            executor,
306            executor_mailbox,
307
308            resolver_config,
309            marshal,
310
311            epoch_manager,
312            epoch_manager_mailbox,
313
314            peer_manager,
315            peer_manager_mailbox,
316
317            feed,
318
319            subblocks,
320        })
321    }
322}
323
324pub struct Engine<TContext, TBlocker, TPeerManager>
325where
326    TContext: BufferPooler
327        + Clock
328        + governor::clock::Clock
329        + Rng
330        + CryptoRng
331        + Metrics
332        + Network
333        + Pacer
334        + Spawner
335        + Storage,
336    TBlocker: Blocker<PublicKey = PublicKey>,
337    TPeerManager: AddressableManager<PublicKey = PublicKey>,
338{
339    context: ContextCell<TContext>,
340
341    /// broadcasts messages to and caches messages from untrusted peers.
342    // XXX: alto calls this `buffered`. That's confusing. We call it `broadcast`.
343    broadcast: buffered::Engine<TContext, PublicKey, Block, peer_manager::Mailbox>,
344    broadcast_mailbox: buffered::Mailbox<PublicKey, Block>,
345
346    dkg_manager: dkg::manager::Actor<TContext>,
347    dkg_manager_mailbox: dkg::manager::Mailbox,
348
349    /// Acts as the glue between the consensus and execution layers implementing
350    /// the `[commonware_consensus::Automaton]` trait.
351    application: application::Actor<TContext>,
352
353    /// Responsible for keeping the consensus layer state and execution layer
354    /// states in sync. Drives the chain state of the execution layer by sending
355    /// forkchoice-updates.
356    executor: crate::executor::Actor<TContext>,
357    executor_mailbox: crate::executor::Mailbox,
358
359    /// Resolver config that will be passed to the marshal actor upon start.
360    resolver_config: marshal::resolver::p2p::Config<PublicKey, peer_manager::Mailbox, TBlocker>,
361
362    /// Listens to consensus events and syncs blocks from the network to the
363    /// local node.
364    marshal: crate::alias::marshal::Actor<TContext>,
365
366    epoch_manager: epoch::manager::Actor<TContext, TBlocker>,
367    epoch_manager_mailbox: epoch::manager::Mailbox,
368
369    peer_manager: peer_manager::Actor<TContext, TPeerManager>,
370    peer_manager_mailbox: peer_manager::Mailbox,
371
372    feed: crate::feed::Actor<TContext>,
373
374    subblocks: Option<subblocks::Actor<TContext>>,
375}
376
377impl<TContext, TBlocker, TPeerManager> Engine<TContext, TBlocker, TPeerManager>
378where
379    TContext: BufferPooler
380        + Clock
381        + governor::clock::Clock
382        + Rng
383        + CryptoRng
384        + Metrics
385        + Network
386        + Pacer
387        + Spawner
388        + Storage,
389    TBlocker: Blocker<PublicKey = PublicKey> + Sync,
390    TPeerManager: AddressableManager<PublicKey = PublicKey> + Sync,
391{
392    #[expect(
393        clippy::too_many_arguments,
394        reason = "following commonware's style of writing"
395    )]
396    pub fn start(
397        mut self,
398        votes_network: (
399            impl Sender<PublicKey = PublicKey>,
400            impl Receiver<PublicKey = PublicKey>,
401        ),
402        certificates_network: (
403            impl Sender<PublicKey = PublicKey>,
404            impl Receiver<PublicKey = PublicKey>,
405        ),
406        resolver_network: (
407            impl Sender<PublicKey = PublicKey>,
408            impl Receiver<PublicKey = PublicKey>,
409        ),
410        broadcast_network: (
411            impl Sender<PublicKey = PublicKey>,
412            impl Receiver<PublicKey = PublicKey>,
413        ),
414        marshal_network: (
415            impl Sender<PublicKey = PublicKey>,
416            impl Receiver<PublicKey = PublicKey>,
417        ),
418        dkg_channel: (
419            impl Sender<PublicKey = PublicKey>,
420            impl Receiver<PublicKey = PublicKey>,
421        ),
422        subblocks_channel: (
423            impl Sender<PublicKey = PublicKey>,
424            impl Receiver<PublicKey = PublicKey>,
425        ),
426    ) -> Handle<eyre::Result<()>> {
427        spawn_cell!(
428            self.context,
429            self.run(
430                votes_network,
431                certificates_network,
432                resolver_network,
433                broadcast_network,
434                marshal_network,
435                dkg_channel,
436                subblocks_channel,
437            )
438        )
439    }
440
441    #[expect(
442        clippy::too_many_arguments,
443        reason = "following commonware's style of writing"
444    )]
445    async fn run(
446        self,
447        votes_channel: (
448            impl Sender<PublicKey = PublicKey>,
449            impl Receiver<PublicKey = PublicKey>,
450        ),
451        certificates_channel: (
452            impl Sender<PublicKey = PublicKey>,
453            impl Receiver<PublicKey = PublicKey>,
454        ),
455        resolver_channel: (
456            impl Sender<PublicKey = PublicKey>,
457            impl Receiver<PublicKey = PublicKey>,
458        ),
459        broadcast_channel: (
460            impl Sender<PublicKey = PublicKey>,
461            impl Receiver<PublicKey = PublicKey>,
462        ),
463        marshal_channel: (
464            impl Sender<PublicKey = PublicKey>,
465            impl Receiver<PublicKey = PublicKey>,
466        ),
467        dkg_channel: (
468            impl Sender<PublicKey = PublicKey>,
469            impl Receiver<PublicKey = PublicKey>,
470        ),
471        subblocks_channel: (
472            impl Sender<PublicKey = PublicKey>,
473            impl Receiver<PublicKey = PublicKey>,
474        ),
475    ) -> eyre::Result<()> {
476        let peer_manager = self.peer_manager.start();
477
478        let broadcast = self.broadcast.start(broadcast_channel);
479        let resolver =
480            marshal::resolver::p2p::init(&self.context, self.resolver_config, marshal_channel);
481
482        let application = self.application.start(self.dkg_manager_mailbox.clone());
483        let executor = self.executor.start();
484
485        let marshal = self.marshal.start(
486            Reporters::from((
487                self.epoch_manager_mailbox,
488                Reporters::from((
489                    self.executor_mailbox,
490                    Reporters::from((self.dkg_manager_mailbox.clone(), self.peer_manager_mailbox)),
491                )),
492            )),
493            self.broadcast_mailbox,
494            resolver,
495        );
496
497        let epoch_manager =
498            self.epoch_manager
499                .start(votes_channel, certificates_channel, resolver_channel);
500
501        let feed = self.feed.start();
502
503        let dkg_manager = self.dkg_manager.start(dkg_channel);
504
505        let mut tasks = vec![
506            application,
507            broadcast,
508            epoch_manager,
509            executor,
510            feed,
511            marshal,
512            dkg_manager,
513            peer_manager,
514        ];
515
516        if let Some(subblocks) = self.subblocks {
517            tasks.push(self.context.spawn(|_| subblocks.run(subblocks_channel)));
518        } else {
519            drop(subblocks_channel);
520        }
521
522        try_join_all(tasks)
523            .await
524            .map(|_| ())
525            // TODO: look into adding error context so that we know which
526            // component failed.
527            .wrap_err("one of the consensus engine's actors failed")
528    }
529}