tempo_commonware_node/consensus/
engine.rs

1//! [`Engine`] drives the application and is modelled after commonware's [`alto`] toy blockchain.
2//!
3//! [`alto`]: https://github.com/commonwarexyz/alto
4
5use std::{
6    net::SocketAddr,
7    num::{NonZeroU64, NonZeroUsize},
8    time::Duration,
9};
10
11use commonware_broadcast::buffered;
12use commonware_consensus::{Reporters, marshal};
13use commonware_cryptography::{
14    Signer as _,
15    bls12381::primitives::group::Share,
16    ed25519::{PrivateKey, PublicKey},
17};
18use commonware_p2p::{Blocker, Receiver, Sender};
19use commonware_runtime::{
20    Clock, ContextCell, Handle, Metrics, Network, Pacer, Spawner, Storage, buffer::PoolRef,
21    spawn_cell,
22};
23use commonware_utils::set::OrderedAssociated;
24use eyre::{OptionExt as _, WrapErr as _};
25use futures::future::try_join_all;
26use rand::{CryptoRng, Rng};
27use tempo_node::TempoFullNode;
28use tracing::info;
29
30use crate::{
31    config::{BLOCKS_FREEZER_TABLE_INITIAL_SIZE_BYTES, MARSHAL_LIMIT},
32    consensus::application,
33    dkg,
34    epoch::{self, SchemeProvider},
35    subblocks,
36};
37
38use super::block::Block;
39
// A bunch of constants to configure commonwarexyz singletons, copied over from alto.

/// To better support peers near tip during network instability, we multiply
/// the consensus activity timeout by this factor.
const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
/// Items grouped per section of the marshal's prunable storage.
const PRUNABLE_ITEMS_PER_SECTION: NonZeroU64 = NonZeroU64::new(4_096).expect("value is not zero");
/// Items grouped per section of the marshal's immutable storage.
const IMMUTABLE_ITEMS_PER_SECTION: NonZeroU64 =
    NonZeroU64::new(262_144).expect("value is not zero");
/// How often the marshal's freezer table is resized.
const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16); // 3MB
const FREEZER_JOURNAL_TARGET_SIZE: u64 = 1024 * 1024 * 1024; // 1GB
/// Compression level applied to the freezer journal (`None` would disable it).
const FREEZER_JOURNAL_COMPRESSION: Option<u8> = Some(3);
const REPLAY_BUFFER: NonZeroUsize = NonZeroUsize::new(8 * 1024 * 1024).expect("value is not zero"); // 8MB
const WRITE_BUFFER: NonZeroUsize = NonZeroUsize::new(1024 * 1024).expect("value is not zero"); // 1MB
const BUFFER_POOL_PAGE_SIZE: NonZeroUsize = NonZeroUsize::new(4_096).expect("value is not zero"); // 4KB
const BUFFER_POOL_CAPACITY: NonZeroUsize = NonZeroUsize::new(8_192).expect("value is not zero"); // 32MB (8_192 pages x 4KB page size)
/// Maximum number of blocks the marshal will repair in one pass.
const MAX_REPAIR: NonZeroU64 = NonZeroU64::new(20).expect("value is not zero");
57
/// Settings for [`Engine`].
///
// XXX: Mostly a one-to-one copy of alto for now. We also put the context in here
// because there doesn't really seem to be a point putting it into an extra initializer.
#[derive(Clone)]
pub struct Builder<TBlocker, TContext, TPeerManager> {
    /// The runtime context used to spawn actors, access storage, and label metrics.
    pub context: TContext,

    /// Address forwarded to the application and subblock actors as the fee recipient.
    pub fee_recipient: alloy_primitives::Address,

    /// The execution node driven by consensus. Must be set via
    /// [`Builder::with_execution_node`] before calling [`Builder::try_init`].
    pub execution_node: Option<TempoFullNode>,

    /// Used to block misbehaving peers; handed to the resolver and epoch manager.
    pub blocker: TBlocker,
    /// Tracks known peers and their socket addresses.
    pub peer_manager: TPeerManager,

    /// Prefix for the storage partitions of the marshal, epoch manager, and DKG manager.
    pub partition_prefix: String,
    /// The node's ed25519 signing key; its public key is the node's identity.
    pub signer: PrivateKey,
    /// Optional BLS12-381 share, passed to the DKG manager as the initial share.
    pub share: Option<Share>,
    /// Capacity used for all actor mailboxes.
    pub mailbox_size: usize,
    /// Capacity of the buffered broadcast engine's deque.
    pub deque_size: usize,

    // Consensus timing/tuning knobs, forwarded to the epoch manager (and, for
    // the last three, to the application and subblock actors).
    pub time_to_propose: Duration,
    pub time_to_collect_notarizations: Duration,
    pub time_to_retry_nullify_broadcast: Duration,
    pub time_for_peer_response: Duration,
    pub views_to_track: u64,
    pub views_until_leader_skip: u64,
    pub new_payload_wait_time: Duration,
    pub time_to_build_subblock: Duration,
    pub subblock_broadcast_interval: Duration,
}
90
impl<TBlocker, TContext, TPeerManager> Builder<TBlocker, TContext, TPeerManager>
where
    TBlocker: Blocker<PublicKey = PublicKey>,
    TContext: Clock
        + governor::clock::Clock
        + Rng
        + CryptoRng
        + Pacer
        + Spawner
        + Storage
        + Metrics
        + Network,
    TPeerManager: commonware_p2p::Manager<
            PublicKey = PublicKey,
            Peers = OrderedAssociated<PublicKey, SocketAddr>,
        >,
{
    /// Sets the execution node that consensus will drive.
    ///
    /// Must be called before [`Builder::try_init`], which fails otherwise.
    pub fn with_execution_node(mut self, execution_node: TempoFullNode) -> Self {
        self.execution_node = Some(execution_node);
        self
    }

    /// Initializes every consensus actor (broadcast, marshal, subblocks,
    /// application, epoch manager, DKG manager) and wires their mailboxes
    /// together, returning an [`Engine`] ready to be started.
    ///
    /// # Errors
    ///
    /// Returns an error if no execution node was set, if the chainspec lacks an
    /// epoch length, or if the application or DKG manager fail to initialize.
    pub async fn try_init(self) -> eyre::Result<Engine<TBlocker, TContext, TPeerManager>> {
        let execution_node = self
            .execution_node
            .clone()
            .ok_or_eyre("execution_node must be set using with_execution_node()")?;

        // The epoch length is consensus-critical and shared by several actors
        // below; refuse to start without it.
        let epoch_length = execution_node
            .chain_spec()
            .info
            .epoch_length()
            .ok_or_eyre("chainspec did not contain epochLength; cannot go on without it")?;

        info!(
            identity = %self.signer.public_key(),
            "using public ed25519 verifying key derived from provided private ed25519 signing key",
        );

        // Buffered broadcast engine: disseminates blocks to and caches blocks
        // received from untrusted peers.
        let (broadcast, broadcast_mailbox) = buffered::Engine::new(
            self.context.with_label("broadcast"),
            buffered::Config {
                public_key: self.signer.public_key(),
                mailbox_size: self.mailbox_size,
                deque_size: self.deque_size,
                priority: true,
                codec_config: (),
            },
        );

        // Create the buffer pool (shared by the marshal and the epoch manager).
        let buffer_pool = PoolRef::new(BUFFER_POOL_PAGE_SIZE, BUFFER_POOL_CAPACITY);

        // XXX: All hard-coded values here are the same as prior to commonware
        // making the resolver configurable in
        // https://github.com/commonwarexyz/monorepo/commit/92870f39b4a9e64a28434b3729ebff5aba67fb4e
        let resolver_config = commonware_consensus::marshal::resolver::p2p::Config {
            public_key: self.signer.public_key(),
            manager: self.peer_manager.clone(),
            mailbox_size: self.mailbox_size,
            blocker: self.blocker.clone(),
            requester_config: commonware_p2p::utils::requester::Config {
                me: Some(self.signer.public_key()),
                rate_limit: MARSHAL_LIMIT,
                initial: Duration::from_secs(1),
                timeout: Duration::from_secs(2),
            },
            fetch_retry_timeout: Duration::from_millis(100),
            priority_requests: false,
            priority_responses: false,
        };
        // Scheme provider shared by the marshal, subblocks, application, and
        // epoch manager actors.
        let scheme_provider = SchemeProvider::new();
        // Marshal: listens to consensus events and syncs blocks into local storage.
        let (marshal, marshal_mailbox) = marshal::Actor::init(
            self.context.with_label("marshal"),
            marshal::Config {
                scheme_provider: scheme_provider.clone(),
                epoch_length,
                partition_prefix: self.partition_prefix.clone(),
                mailbox_size: self.mailbox_size,
                // Retain views for longer than consensus strictly needs so
                // lagging peers can still fetch them (see multiplier docs).
                view_retention_timeout: self
                    .views_to_track
                    .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
                namespace: crate::config::NAMESPACE.to_vec(),
                prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
                immutable_items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
                freezer_table_initial_size: BLOCKS_FREEZER_TABLE_INITIAL_SIZE_BYTES,
                freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
                freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
                freezer_journal_target_size: FREEZER_JOURNAL_TARGET_SIZE,
                freezer_journal_compression: FREEZER_JOURNAL_COMPRESSION,

                freezer_journal_buffer_pool: buffer_pool.clone(),

                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                block_codec_config: (),
                max_repair: MAX_REPAIR,
                _marker: std::marker::PhantomData,
            },
        )
        .await;

        // Subblocks actor: builds and broadcasts subblocks against the
        // execution node.
        let subblocks = subblocks::Actor::new(subblocks::Config {
            context: self.context.clone(),
            signer: self.signer.clone(),
            scheme_provider: scheme_provider.clone(),
            node: execution_node.clone(),
            fee_recipient: self.fee_recipient,
            time_to_build_subblock: self.time_to_build_subblock,
            subblock_broadcast_interval: self.subblock_broadcast_interval,
            epoch_length,
        });

        // Application: the glue between commonware consensus and reth execution.
        let (application, application_mailbox) = application::init(super::application::Config {
            context: self.context.with_label("application"),
            // TODO: pass in from the outside,
            fee_recipient: self.fee_recipient,
            mailbox_size: self.mailbox_size,
            marshal: marshal_mailbox.clone(),
            execution_node: execution_node.clone(),
            new_payload_wait_time: self.new_payload_wait_time,
            subblocks: subblocks.mailbox(),
            scheme_provider: scheme_provider.clone(),
            epoch_length,
        })
        .await
        .wrap_err("failed initializing application actor")?;

        // Epoch manager: drives consensus views/epochs; gets its own storage
        // partition derived from the configured prefix.
        let (epoch_manager, epoch_manager_mailbox) = epoch::manager::init(
            epoch::manager::Config {
                application: application_mailbox.clone(),
                blocker: self.blocker.clone(),
                buffer_pool: buffer_pool.clone(),
                epoch_length,
                time_for_peer_response: self.time_for_peer_response,
                time_to_propose: self.time_to_propose,
                mailbox_size: self.mailbox_size,
                subblocks: subblocks.mailbox(),
                marshal: marshal_mailbox.clone(),
                scheme_provider: scheme_provider.clone(),
                time_to_collect_notarizations: self.time_to_collect_notarizations,
                time_to_retry_nullify_broadcast: self.time_to_retry_nullify_broadcast,
                partition_prefix: format!("{}_epoch_manager", self.partition_prefix),
                views_to_track: self.views_to_track,
                views_until_leader_skip: self.views_until_leader_skip,
            },
            self.context.with_label("epoch_manager"),
        );

        // DKG manager: runs key generation/resharing; feeds the epoch manager.
        let (dkg_manager, dkg_manager_mailbox) = dkg::manager::init(
            self.context.with_label("dkg_manager"),
            dkg::manager::Config {
                epoch_manager: epoch_manager_mailbox,
                epoch_length,
                execution_node,
                initial_share: self.share.clone(),
                mailbox_size: self.mailbox_size,
                marshal: marshal_mailbox,
                namespace: crate::config::NAMESPACE.to_vec(),
                me: self.signer.clone(),
                partition_prefix: format!("{}_dkg_manager", self.partition_prefix),
                peer_manager: self.peer_manager.clone(),
            },
        )
        .await
        .wrap_err("failed initializing dkg manager")?;

        Ok(Engine {
            context: ContextCell::new(self.context),

            broadcast,
            broadcast_mailbox,

            dkg_manager,
            dkg_manager_mailbox,

            application,
            application_mailbox,

            resolver_config,
            marshal,

            epoch_manager,

            subblocks,
        })
    }
}
279
/// The consensus engine: owns every consensus actor, built by [`Builder::try_init`]
/// and run via [`Engine::start`].
pub struct Engine<TBlocker, TContext, TPeerManager>
where
    TBlocker: Blocker<PublicKey = PublicKey>,
    TContext: Clock
        + governor::clock::Clock
        + Rng
        + CryptoRng
        + Metrics
        + Network
        + Pacer
        + Spawner
        + Storage,
    TPeerManager: commonware_p2p::Manager<
            PublicKey = PublicKey,
            Peers = OrderedAssociated<PublicKey, SocketAddr>,
        >,
{
    /// Runtime context, consumed by `spawn_cell!` when the engine is started.
    context: ContextCell<TContext>,

    /// Broadcasts messages to and caches messages from untrusted peers.
    // XXX: alto calls this `buffered`. That's confusing. We call it `broadcast`.
    broadcast: buffered::Engine<TContext, PublicKey, Block>,
    /// Mailbox of `broadcast`; handed to the marshal on start.
    broadcast_mailbox: buffered::Mailbox<PublicKey, Block>,

    /// The DKG manager actor.
    dkg_manager: dkg::manager::Actor<TContext, TPeerManager>,
    /// Mailbox of `dkg_manager`; shared with the application and marshal on start.
    dkg_manager_mailbox: dkg::manager::Mailbox,

    /// The core of the application, the glue between commonware-xyz consensus and reth-execution.
    application: application::Actor<TContext>,
    /// Mailbox of `application`; registered as a marshal reporter on start.
    application_mailbox: application::Mailbox,

    /// Resolver config that will be passed to the marshal actor upon start.
    resolver_config: marshal::resolver::p2p::Config<PublicKey, TPeerManager, TBlocker>,

    /// Listens to consensus events and syncs blocks from the network to the
    /// local node.
    marshal: crate::alias::marshal::Actor<TContext>,

    /// Drives consensus views/epochs over the pending/recovered/resolver channels.
    epoch_manager: epoch::manager::Actor<TBlocker, TContext>,

    /// Builds and broadcasts subblocks on its dedicated channel.
    subblocks: subblocks::Actor<TContext>,
}
322
impl<TBlocker, TContext, TPeerManager> Engine<TBlocker, TContext, TPeerManager>
where
    TBlocker: Blocker<PublicKey = PublicKey>,
    TContext: Clock
        + governor::clock::Clock
        + Rng
        + CryptoRng
        + Metrics
        + Network
        + Pacer
        + Spawner
        + Storage,
    TPeerManager: commonware_p2p::Manager<
            PublicKey = PublicKey,
            Peers = OrderedAssociated<PublicKey, SocketAddr>,
        >,
{
    /// Spawns the engine's root task, which starts all actors over the given
    /// p2p channels and drives them to completion (see [`Engine::run`]).
    ///
    /// Each argument is the (sender, receiver) pair of one dedicated channel.
    // NOTE(review): `start` names these `*_network` while `run` names the same
    // channels `*_channel` — presumably intentional mirroring of alto; consider
    // unifying the naming.
    #[expect(
        clippy::too_many_arguments,
        reason = "following commonware's style of writing"
    )]
    pub fn start(
        mut self,
        pending_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        recovered_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        broadcast_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        marshal_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        dkg_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        boundary_certificates_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        subblocks_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<eyre::Result<()>> {
        spawn_cell!(
            self.context,
            self.run(
                pending_network,
                recovered_network,
                resolver_network,
                broadcast_network,
                marshal_network,
                dkg_channel,
                boundary_certificates_channel,
                subblocks_channel,
            )
            .await
        )
    }

    /// Starts every actor on its channel(s) and waits for all of them; returns
    /// the first error if any actor fails.
    #[expect(
        clippy::too_many_arguments,
        reason = "following commonware's style of writing"
    )]
    async fn run(
        self,
        pending_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        recovered_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        broadcast_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        marshal_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        dkg_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        boundary_certificates_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        subblocks_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> eyre::Result<()> {
        let broadcast = self.broadcast.start(broadcast_channel);
        // The resolver is created here (not in try_init) because it needs the
        // marshal channel.
        let resolver =
            marshal::resolver::p2p::init(&self.context, self.resolver_config, marshal_channel);

        let application = self.application.start(self.dkg_manager_mailbox.clone());

        // Marshal reports consensus events to both the application and the DKG
        // manager.
        let marshal = self.marshal.start(
            Reporters::from((self.application_mailbox, self.dkg_manager_mailbox.clone())),
            self.broadcast_mailbox,
            resolver,
        );

        let epoch_manager = self.epoch_manager.start(
            pending_channel,
            recovered_channel,
            resolver_channel,
            boundary_certificates_channel,
        );

        let subblocks = self
            .context
            .spawn(|_| self.subblocks.run(subblocks_channel));

        let dkg_manager = self.dkg_manager.start(dkg_channel);

        // Wait for all actors; the first failure aborts the join.
        try_join_all(vec![
            application,
            broadcast,
            epoch_manager,
            marshal,
            dkg_manager,
            subblocks,
        ])
        .await
        .map(|_| ())
        // TODO: look into adding error context so that we know which
        // component failed.
        .wrap_err("one of the consensus engine's actors failed")
    }
}
473}