1use std::{
6 num::{NonZeroU16, NonZeroU64, NonZeroUsize},
7 time::{Duration, Instant},
8};
9
10use commonware_broadcast::buffered;
11use commonware_consensus::{
12 Reporters, marshal,
13 simplex::scheme::bls12381_threshold::vrf::Scheme,
14 types::{FixedEpocher, ViewDelta},
15};
16use commonware_cryptography::{
17 Signer as _,
18 bls12381::primitives::{group::Share, variant::MinSig},
19 certificate::Scheme as _,
20 ed25519::{PrivateKey, PublicKey},
21};
22use commonware_p2p::{AddressableManager, Blocker, Receiver, Sender};
23use commonware_parallel::Sequential;
24use commonware_runtime::{
25 BufferPooler, Clock, ContextCell, Handle, Metrics, Network, Pacer, Spawner, Storage,
26 buffer::paged::CacheRef, spawn_cell,
27};
28use commonware_storage::archive::immutable;
29use commonware_utils::{NZU16, NZU64, NZUsize};
30use eyre::{OptionExt as _, WrapErr as _};
31use futures::future::try_join_all;
32use rand_08::{CryptoRng, Rng};
33use tempo_node::TempoFullNode;
34use tracing::info;
35
36use crate::{
37 config::BLOCKS_FREEZER_TABLE_INITIAL_SIZE_BYTES,
38 consensus::application,
39 dkg,
40 epoch::{self, SchemeProvider},
41 peer_manager, subblocks,
42};
43
44use super::block::Block;
45
46const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
51const PRUNABLE_ITEMS_PER_SECTION: NonZeroU64 = NZU64!(4_096);
52const IMMUTABLE_ITEMS_PER_SECTION: NonZeroU64 = NZU64!(262_144);
53const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
54const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16); const FREEZER_VALUE_TARGET_SIZE: u64 = 1024 * 1024 * 1024; const FREEZER_VALUE_COMPRESSION: Option<u8> = Some(3);
57const REPLAY_BUFFER: NonZeroUsize = NZUsize!(8 * 1024 * 1024); const WRITE_BUFFER: NonZeroUsize = NZUsize!(1024 * 1024); const BUFFER_POOL_PAGE_SIZE: NonZeroU16 = NZU16!(4_096); const BUFFER_POOL_CAPACITY: NonZeroUsize = NZUsize!(8_192); const MAX_REPAIR: NonZeroUsize = NZUsize!(20);
62
63const MAX_PENDING_ACKS: NonZeroUsize = NZUsize!(1);
65
66#[derive(Clone)]
71pub struct Builder<TBlocker, TPeerManager> {
72 pub fee_recipient: alloy_primitives::Address,
73
74 pub execution_node: Option<TempoFullNode>,
75
76 pub blocker: TBlocker,
77 pub peer_manager: TPeerManager,
78
79 pub partition_prefix: String,
80 pub signer: PrivateKey,
81 pub share: Option<Share>,
82
83 pub mailbox_size: usize,
84 pub deque_size: usize,
85
86 pub time_to_propose: Duration,
87 pub time_to_collect_notarizations: Duration,
88 pub time_to_retry_nullify_broadcast: Duration,
89 pub time_for_peer_response: Duration,
90 pub views_to_track: u64,
91 pub views_until_leader_skip: u64,
92 pub payload_interrupt_time: Duration,
93 pub new_payload_wait_time: Duration,
94 pub time_to_build_subblock: Duration,
95 pub subblock_broadcast_interval: Duration,
96 pub fcu_heartbeat_interval: Duration,
97 pub with_subblocks: bool,
98
99 pub feed_state: crate::feed::FeedStateHandle,
100}
101
102impl<TBlocker, TPeerManager> Builder<TBlocker, TPeerManager>
103where
104 TBlocker: Blocker<PublicKey = PublicKey> + Sync,
105 TPeerManager: AddressableManager<PublicKey = PublicKey> + Sync,
106{
107 pub fn with_execution_node(mut self, execution_node: TempoFullNode) -> Self {
108 self.execution_node = Some(execution_node);
109 self
110 }
111
112 pub async fn try_init<TContext>(
113 self,
114 context: TContext,
115 ) -> eyre::Result<Engine<TContext, TBlocker, TPeerManager>>
116 where
117 TContext: Clock
118 + governor::clock::Clock
119 + Rng
120 + CryptoRng
121 + Pacer
122 + Spawner
123 + Storage
124 + Metrics
125 + Network
126 + BufferPooler,
127 {
128 let execution_node = self
129 .execution_node
130 .clone()
131 .ok_or_eyre("execution_node must be set using with_execution_node()")?;
132
133 let epoch_length = execution_node
134 .chain_spec()
135 .info
136 .epoch_length()
137 .ok_or_eyre("chainspec did not contain epochLength; cannot go on without it")?;
138
139 let epoch_strategy = FixedEpocher::new(NZU64!(epoch_length));
140
141 info!(
142 identity = %self.signer.public_key(),
143 "using public ed25519 verifying key derived from provided private ed25519 signing key",
144 );
145
146 let page_cache_ref =
147 CacheRef::from_pooler(&context, BUFFER_POOL_PAGE_SIZE, BUFFER_POOL_CAPACITY);
148
149 let scheme_provider = SchemeProvider::new();
150
151 const FINALIZATIONS_BY_HEIGHT: &str = "finalizations-by-height";
152 let start = Instant::now();
153 let finalizations_by_height = immutable::Archive::init(
154 context.with_label("finalizations_by_height"),
155 immutable::Config {
156 metadata_partition: format!(
157 "{}-{FINALIZATIONS_BY_HEIGHT}-metadata",
158 self.partition_prefix,
159 ),
160
161 freezer_table_partition: format!(
162 "{}-{FINALIZATIONS_BY_HEIGHT}-freezer-table",
163 self.partition_prefix,
164 ),
165
166 freezer_table_initial_size: BLOCKS_FREEZER_TABLE_INITIAL_SIZE_BYTES,
167 freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
168 freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
169
170 freezer_key_partition: format!(
171 "{}-{FINALIZATIONS_BY_HEIGHT}-freezer-key",
172 self.partition_prefix,
173 ),
174 freezer_key_page_cache: page_cache_ref.clone(),
175
176 freezer_value_partition: format!(
177 "{}-{FINALIZATIONS_BY_HEIGHT}-freezer-value",
178 self.partition_prefix,
179 ),
180 freezer_value_target_size: FREEZER_VALUE_TARGET_SIZE,
181 freezer_value_compression: FREEZER_VALUE_COMPRESSION,
182
183 ordinal_partition: format!(
184 "{}-{FINALIZATIONS_BY_HEIGHT}-ordinal",
185 self.partition_prefix,
186 ),
187
188 items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
189 codec_config: Scheme::<PublicKey, MinSig>::certificate_codec_config_unbounded(),
190
191 replay_buffer: REPLAY_BUFFER,
192 freezer_key_write_buffer: WRITE_BUFFER,
193 freezer_value_write_buffer: WRITE_BUFFER,
194 ordinal_write_buffer: WRITE_BUFFER,
195 },
196 )
197 .await
198 .wrap_err("failed to initialize finalizations by height archive")?;
199 info!(elapsed = ?start.elapsed(), "restored finalizations by height archive");
200
201 const FINALIZED_BLOCKS: &str = "finalized_blocks";
202 let start = Instant::now();
203 let finalized_blocks = immutable::Archive::init(
204 context.with_label("finalized_blocks"),
205 immutable::Config {
206 metadata_partition: format!(
207 "{}-{FINALIZED_BLOCKS}-metadata",
208 self.partition_prefix,
209 ),
210
211 freezer_table_partition: format!(
212 "{}-{FINALIZED_BLOCKS}-freezer-table",
213 self.partition_prefix,
214 ),
215
216 freezer_table_initial_size: BLOCKS_FREEZER_TABLE_INITIAL_SIZE_BYTES,
217 freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
218 freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
219
220 freezer_key_partition: format!(
221 "{}-{FINALIZED_BLOCKS}-freezer-key",
222 self.partition_prefix,
223 ),
224 freezer_key_page_cache: page_cache_ref.clone(),
225
226 freezer_value_partition: format!(
227 "{}-{FINALIZED_BLOCKS}-freezer-value",
228 self.partition_prefix,
229 ),
230 freezer_value_target_size: FREEZER_VALUE_TARGET_SIZE,
231 freezer_value_compression: FREEZER_VALUE_COMPRESSION,
232
233 ordinal_partition: format!("{}-{FINALIZED_BLOCKS}-ordinal", self.partition_prefix,),
234 items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
235 codec_config: (),
236
237 replay_buffer: REPLAY_BUFFER,
238 freezer_key_write_buffer: WRITE_BUFFER,
239 freezer_value_write_buffer: WRITE_BUFFER,
240 ordinal_write_buffer: WRITE_BUFFER,
241 },
242 )
243 .await
244 .wrap_err("failed to initialize finalizations by height archive")?;
245 info!(elapsed = ?start.elapsed(), "restored finalizations by height archive");
246
247 let (marshal, marshal_mailbox, last_finalized_height) = marshal::core::Actor::init(
250 context.with_label("marshal"),
251 finalizations_by_height,
252 finalized_blocks,
253 marshal::Config {
254 provider: scheme_provider.clone(),
255 epocher: epoch_strategy.clone(),
256 partition_prefix: self.partition_prefix.clone(),
257 mailbox_size: self.mailbox_size,
258 view_retention_timeout: ViewDelta::new(
259 self.views_to_track
260 .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
261 ),
262 prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
263
264 page_cache: page_cache_ref.clone(),
265
266 replay_buffer: REPLAY_BUFFER,
267 key_write_buffer: WRITE_BUFFER,
268 value_write_buffer: WRITE_BUFFER,
269 max_repair: MAX_REPAIR,
270 max_pending_acks: MAX_PENDING_ACKS,
271 block_codec_config: (),
272
273 strategy: Sequential,
274 },
275 )
276 .await;
277
278 let (executor, executor_mailbox) = crate::executor::init(
279 context.with_label("executor"),
280 crate::executor::Config {
281 execution_node: execution_node.clone(),
282 last_finalized_height,
283 marshal: marshal_mailbox.clone(),
284 fcu_heartbeat_interval: self.fcu_heartbeat_interval,
285 },
286 )
287 .wrap_err("failed initialization executor actor")?;
288
289 let (peer_manager, peer_manager_mailbox) = peer_manager::init(
290 context.with_label("peer_manager"),
291 peer_manager::Config {
292 execution_node: execution_node.clone(),
293 executor: executor_mailbox.clone(),
294 oracle: self.peer_manager.clone(),
295 epoch_strategy: epoch_strategy.clone(),
296 last_finalized_height,
297 },
298 );
299
300 let (broadcast, broadcast_mailbox) = buffered::Engine::new(
301 context.with_label("broadcast"),
302 buffered::Config {
303 public_key: self.signer.public_key(),
304 mailbox_size: self.mailbox_size,
305 deque_size: self.deque_size,
306 peer_provider: peer_manager_mailbox.clone(),
307 priority: true,
308 codec_config: (),
309 },
310 );
311
312 let resolver_config = commonware_consensus::marshal::resolver::p2p::Config {
316 public_key: self.signer.public_key(),
317 peer_provider: peer_manager_mailbox.clone(),
318 mailbox_size: self.mailbox_size,
319 blocker: self.blocker.clone(),
320 initial: Duration::from_secs(1),
321 timeout: Duration::from_secs(2),
322 fetch_retry_timeout: Duration::from_millis(100),
323 priority_requests: false,
324 priority_responses: false,
325 };
326
327 let subblocks = self.with_subblocks.then(|| {
328 subblocks::Actor::new(subblocks::Config {
329 context: context.clone(),
330 signer: self.signer.clone(),
331 scheme_provider: scheme_provider.clone(),
332 node: execution_node.clone(),
333 fee_recipient: self.fee_recipient,
334 time_to_build_subblock: self.time_to_build_subblock,
335 subblock_broadcast_interval: self.subblock_broadcast_interval,
336 epoch_strategy: epoch_strategy.clone(),
337 })
338 });
339
340 let (feed, feed_mailbox) = crate::feed::init(
341 context.with_label("feed"),
342 marshal_mailbox.clone(),
343 epoch_strategy.clone(),
344 execution_node.clone(),
345 self.feed_state,
346 );
347
348 let (application, application_mailbox) = application::init(super::application::Config {
349 context: context.with_label("application"),
350 fee_recipient: self.fee_recipient,
351 mailbox_size: self.mailbox_size,
352 marshal: marshal_mailbox.clone(),
353 execution_node: execution_node.clone(),
354 executor: executor_mailbox.clone(),
355 payload_resolve_time: self.payload_interrupt_time,
356 payload_return_time: self.new_payload_wait_time,
357 subblocks: subblocks.as_ref().map(|s| s.mailbox()),
358 scheme_provider: scheme_provider.clone(),
359 epoch_strategy: epoch_strategy.clone(),
360 })
361 .await
362 .wrap_err("failed initializing application actor")?;
363
364 let (epoch_manager, epoch_manager_mailbox) = epoch::manager::init(
365 context.with_label("epoch_manager"),
366 epoch::manager::Config {
367 application: application_mailbox.clone(),
368 blocker: self.blocker.clone(),
369 page_cache: page_cache_ref,
370 epoch_strategy: epoch_strategy.clone(),
371 time_for_peer_response: self.time_for_peer_response,
372 time_to_propose: self.time_to_propose,
373 mailbox_size: self.mailbox_size,
374 subblocks: subblocks.as_ref().map(|s| s.mailbox()),
375 marshal: marshal_mailbox.clone(),
376 feed: feed_mailbox.clone(),
377 scheme_provider: scheme_provider.clone(),
378 time_to_collect_notarizations: self.time_to_collect_notarizations,
379 time_to_retry_nullify_broadcast: self.time_to_retry_nullify_broadcast,
380 partition_prefix: format!("{}_epoch_manager", self.partition_prefix),
381 views_to_track: ViewDelta::new(self.views_to_track),
382 views_until_leader_skip: ViewDelta::new(self.views_until_leader_skip),
383 },
384 );
385
386 let (dkg_manager, dkg_manager_mailbox) = dkg::manager::init(
387 context.with_label("dkg_manager"),
388 dkg::manager::Config {
389 epoch_manager: epoch_manager_mailbox.clone(),
390 epoch_strategy: epoch_strategy.clone(),
391 execution_node,
392 initial_share: self.share.clone(),
393 mailbox_size: self.mailbox_size,
394 marshal: marshal_mailbox,
395 namespace: crate::config::NAMESPACE.to_vec(),
396 me: self.signer.clone(),
397 partition_prefix: format!("{}_dkg_manager", self.partition_prefix),
398 },
399 )
400 .await
401 .wrap_err("failed initializing dkg manager")?;
402
403 Ok(Engine {
404 context: ContextCell::new(context),
405
406 broadcast,
407 broadcast_mailbox,
408
409 dkg_manager,
410 dkg_manager_mailbox,
411
412 application,
413
414 executor,
415 executor_mailbox,
416
417 resolver_config,
418 marshal,
419
420 epoch_manager,
421 epoch_manager_mailbox,
422
423 peer_manager,
424 peer_manager_mailbox,
425
426 feed,
427
428 subblocks,
429 })
430 }
431}
432
433pub struct Engine<TContext, TBlocker, TPeerManager>
434where
435 TContext: BufferPooler
436 + Clock
437 + governor::clock::Clock
438 + Rng
439 + CryptoRng
440 + Metrics
441 + Network
442 + Pacer
443 + Spawner
444 + Storage,
445 TBlocker: Blocker<PublicKey = PublicKey>,
446 TPeerManager: AddressableManager<PublicKey = PublicKey>,
447{
448 context: ContextCell<TContext>,
449
450 broadcast: buffered::Engine<TContext, PublicKey, Block, peer_manager::Mailbox>,
453 broadcast_mailbox: buffered::Mailbox<PublicKey, Block>,
454
455 dkg_manager: dkg::manager::Actor<TContext>,
456 dkg_manager_mailbox: dkg::manager::Mailbox,
457
458 application: application::Actor<TContext>,
461
462 executor: crate::executor::Actor<TContext>,
466 executor_mailbox: crate::executor::Mailbox,
467
468 resolver_config: marshal::resolver::p2p::Config<PublicKey, peer_manager::Mailbox, TBlocker>,
470
471 marshal: crate::alias::marshal::Actor<TContext>,
474
475 epoch_manager: epoch::manager::Actor<TContext, TBlocker>,
476 epoch_manager_mailbox: epoch::manager::Mailbox,
477
478 peer_manager: peer_manager::Actor<TContext, TPeerManager>,
479 peer_manager_mailbox: peer_manager::Mailbox,
480
481 feed: crate::feed::Actor<TContext>,
482
483 subblocks: Option<subblocks::Actor<TContext>>,
484}
485
486impl<TContext, TBlocker, TPeerManager> Engine<TContext, TBlocker, TPeerManager>
487where
488 TContext: BufferPooler
489 + Clock
490 + governor::clock::Clock
491 + Rng
492 + CryptoRng
493 + Metrics
494 + Network
495 + Pacer
496 + Spawner
497 + Storage,
498 TBlocker: Blocker<PublicKey = PublicKey> + Sync,
499 TPeerManager: AddressableManager<PublicKey = PublicKey> + Sync,
500{
501 #[expect(
502 clippy::too_many_arguments,
503 reason = "following commonware's style of writing"
504 )]
505 pub fn start(
506 mut self,
507 votes_network: (
508 impl Sender<PublicKey = PublicKey>,
509 impl Receiver<PublicKey = PublicKey>,
510 ),
511 certificates_network: (
512 impl Sender<PublicKey = PublicKey>,
513 impl Receiver<PublicKey = PublicKey>,
514 ),
515 resolver_network: (
516 impl Sender<PublicKey = PublicKey>,
517 impl Receiver<PublicKey = PublicKey>,
518 ),
519 broadcast_network: (
520 impl Sender<PublicKey = PublicKey>,
521 impl Receiver<PublicKey = PublicKey>,
522 ),
523 marshal_network: (
524 impl Sender<PublicKey = PublicKey>,
525 impl Receiver<PublicKey = PublicKey>,
526 ),
527 dkg_channel: (
528 impl Sender<PublicKey = PublicKey>,
529 impl Receiver<PublicKey = PublicKey>,
530 ),
531 subblocks_channel: (
532 impl Sender<PublicKey = PublicKey>,
533 impl Receiver<PublicKey = PublicKey>,
534 ),
535 ) -> Handle<eyre::Result<()>> {
536 spawn_cell!(
537 self.context,
538 self.run(
539 votes_network,
540 certificates_network,
541 resolver_network,
542 broadcast_network,
543 marshal_network,
544 dkg_channel,
545 subblocks_channel,
546 )
547 .await
548 )
549 }
550
551 #[expect(
552 clippy::too_many_arguments,
553 reason = "following commonware's style of writing"
554 )]
555 async fn run(
556 self,
557 votes_channel: (
558 impl Sender<PublicKey = PublicKey>,
559 impl Receiver<PublicKey = PublicKey>,
560 ),
561 certificates_channel: (
562 impl Sender<PublicKey = PublicKey>,
563 impl Receiver<PublicKey = PublicKey>,
564 ),
565 resolver_channel: (
566 impl Sender<PublicKey = PublicKey>,
567 impl Receiver<PublicKey = PublicKey>,
568 ),
569 broadcast_channel: (
570 impl Sender<PublicKey = PublicKey>,
571 impl Receiver<PublicKey = PublicKey>,
572 ),
573 marshal_channel: (
574 impl Sender<PublicKey = PublicKey>,
575 impl Receiver<PublicKey = PublicKey>,
576 ),
577 dkg_channel: (
578 impl Sender<PublicKey = PublicKey>,
579 impl Receiver<PublicKey = PublicKey>,
580 ),
581 subblocks_channel: (
582 impl Sender<PublicKey = PublicKey>,
583 impl Receiver<PublicKey = PublicKey>,
584 ),
585 ) -> eyre::Result<()> {
586 let peer_manager = self.peer_manager.start();
587
588 let broadcast = self.broadcast.start(broadcast_channel);
589 let resolver =
590 marshal::resolver::p2p::init(&self.context, self.resolver_config, marshal_channel);
591
592 let application = self.application.start(self.dkg_manager_mailbox.clone());
593 let executor = self.executor.start();
594
595 let marshal = self.marshal.start(
596 Reporters::from((
597 self.epoch_manager_mailbox,
598 Reporters::from((
599 self.executor_mailbox,
600 Reporters::from((self.dkg_manager_mailbox.clone(), self.peer_manager_mailbox)),
601 )),
602 )),
603 self.broadcast_mailbox,
604 resolver,
605 );
606
607 let epoch_manager =
608 self.epoch_manager
609 .start(votes_channel, certificates_channel, resolver_channel);
610
611 let feed = self.feed.start();
612
613 let dkg_manager = self.dkg_manager.start(dkg_channel);
614
615 let mut tasks = vec![
616 application,
617 broadcast,
618 epoch_manager,
619 executor,
620 feed,
621 marshal,
622 dkg_manager,
623 peer_manager,
624 ];
625
626 if let Some(subblocks) = self.subblocks {
627 tasks.push(self.context.spawn(|_| subblocks.run(subblocks_channel)));
628 } else {
629 drop(subblocks_channel);
630 }
631
632 try_join_all(tasks)
633 .await
634 .map(|_| ())
635 .wrap_err("one of the consensus engine's actors failed")
638 }
639}