1use std::{num::NonZeroUsize, sync::Arc, time::Duration};
6
7use commonware_broadcast::buffered;
8use commonware_consensus::{
9 Reporters, marshal,
10 types::{FixedEpocher, ViewDelta},
11};
12use commonware_cryptography::{
13 Signer as _,
14 bls12381::primitives::group::Share,
15 ed25519::{PrivateKey, PublicKey},
16};
17use commonware_p2p::{AddressableManager, Blocker, Receiver, Sender};
18use commonware_runtime::{
19 BufferPooler, Clock, ContextCell, Handle, Metrics, Network, Pacer, Spawner, Storage,
20 buffer::paged::CacheRef, spawn_cell,
21};
22use commonware_utils::{NZU64, NZUsize};
23use eyre::{OptionExt as _, WrapErr as _};
24use futures::future::try_join_all;
25use rand_08::{CryptoRng, Rng};
26use tempo_node::TempoFullNode;
27use tracing::info;
28
29use crate::{
30 alias,
31 consensus::application,
32 dkg,
33 epoch::{self, SchemeProvider},
34 peer_manager, storage, subblocks,
35};
36
37use super::block::Block;
38
/// Factor by which `Builder::views_to_track` is multiplied (saturating) to
/// obtain the `view_retention_timeout` passed to the marshal actor's config.
const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
/// Value passed to the marshal actor's config as `max_pending_acks`.
const MAX_PENDING_ACKS: NonZeroUsize = NZUsize!(1);
46
/// Settings from which a consensus [`Engine`] is assembled by
/// [`Builder::try_init`].
///
/// Most fields are forwarded verbatim into the configuration of one of the
/// actors that make up the engine; the per-field notes say which.
#[derive(Clone)]
pub struct Builder<TBlocker, TPeerManager> {
    /// Handle to the execution node. `try_init` returns an error while this
    /// is `None`; set it with `with_execution_node`.
    pub execution_node: Option<Arc<TempoFullNode>>,

    /// Cloned into the marshal resolver config and the epoch manager config
    /// (their `blocker` fields).
    pub blocker: TBlocker,
    /// Handed to the peer manager actor as its `oracle`.
    pub peer_manager: TPeerManager,

    /// Partition prefix for the marshal actor; the epoch manager and DKG
    /// manager use it with `_epoch_manager` / `_dkg_manager` appended.
    pub partition_prefix: String,
    /// ed25519 signing key. Its public key is passed to the executor,
    /// broadcast, resolver and application configs; the key itself is cloned
    /// into the subblocks actor and the DKG manager.
    pub signer: PrivateKey,
    /// Passed to the DKG manager as `initial_share`.
    pub share: Option<Share>,

    /// `mailbox_size` used for the marshal, broadcast, resolver, application,
    /// epoch manager and DKG manager configs.
    pub mailbox_size: usize,
    /// `deque_size` of the buffered broadcast engine.
    pub deque_size: usize,

    /// Forwarded to the epoch manager config.
    pub time_to_propose: Duration,
    /// Forwarded to the epoch manager config.
    pub time_to_collect_notarizations: Duration,
    /// Forwarded to the epoch manager config.
    pub time_to_retry_nullify_broadcast: Duration,
    /// Forwarded to the epoch manager config.
    pub time_for_peer_response: Duration,
    /// Forwarded to the epoch manager config as a `ViewDelta`; also scaled by
    /// `SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER` for the marshal actor's
    /// `view_retention_timeout`.
    pub views_to_track: u64,
    /// Forwarded to the epoch manager config as a `ViewDelta`.
    pub views_until_leader_skip: u64,
    /// Forwarded to the application actor config.
    pub proposal_return_budget: Duration,
    /// Forwarded to the subblocks actor config (only used when
    /// `with_subblocks` is `true`).
    pub time_to_build_subblock: Duration,
    /// Forwarded to the subblocks actor config (only used when
    /// `with_subblocks` is `true`).
    pub subblock_broadcast_interval: Duration,
    /// Forwarded to the executor actor config.
    pub fcu_heartbeat_interval: Duration,
    /// Whether `try_init` constructs a subblocks actor at all.
    pub with_subblocks: bool,

    /// Passed by value to `crate::feed::init`.
    pub feed_state: crate::feed::FeedStateHandle,

    /// Forwarded to the marshal actor config.
    pub finalized_blocks_retention: u64,
}
90
91impl<TBlocker, TPeerManager> Builder<TBlocker, TPeerManager>
92where
93 TBlocker: Blocker<PublicKey = PublicKey> + Sync,
94 TPeerManager: AddressableManager<PublicKey = PublicKey> + Sync,
95{
96 pub fn with_execution_node(mut self, execution_node: Arc<TempoFullNode>) -> Self {
97 self.execution_node = Some(execution_node);
98 self
99 }
100
101 pub async fn try_init<TContext>(
102 self,
103 context: TContext,
104 ) -> eyre::Result<Engine<TContext, TBlocker, TPeerManager>>
105 where
106 TContext: Clock
107 + governor::clock::Clock
108 + Rng
109 + CryptoRng
110 + Pacer
111 + Spawner
112 + Storage
113 + Metrics
114 + Network
115 + BufferPooler,
116 {
117 let execution_node = self
118 .execution_node
119 .clone()
120 .ok_or_eyre("execution_node must be set using with_execution_node()")?;
121
122 let epoch_length = execution_node
123 .chain_spec()
124 .info
125 .epoch_length()
126 .ok_or_eyre("chainspec did not contain epochLength; cannot go on without it")?;
127
128 let epoch_strategy = FixedEpocher::new(NZU64!(epoch_length));
129
130 info!(
131 identity = %self.signer.public_key(),
132 "using public ed25519 verifying key derived from provided private ed25519 signing key",
133 );
134
135 let page_cache_ref = CacheRef::from_pooler(
136 &context,
137 storage::BUFFER_POOL_PAGE_SIZE,
138 storage::BUFFER_POOL_CAPACITY,
139 );
140
141 let scheme_provider = SchemeProvider::new();
142
143 let alias::marshal::Initialized {
144 actor: marshal,
145 mailbox: marshal_mailbox,
146 last_finalized_height,
147 } = alias::marshal::init(
148 context.clone(),
149 page_cache_ref.clone(),
150 execution_node.clone(),
151 alias::marshal::Config {
152 partition_prefix: self.partition_prefix.clone(),
153 mailbox_size: self.mailbox_size,
154 view_retention_timeout: ViewDelta::new(
155 self.views_to_track
156 .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
157 ),
158 max_pending_acks: MAX_PENDING_ACKS,
159 finalized_blocks_retention: self.finalized_blocks_retention,
160 epoch_strategy: epoch_strategy.clone(),
161 scheme_provider: scheme_provider.clone(),
162 },
163 )
164 .await
165 .wrap_err("failed to initialize marshal")?;
166
167 let (executor, executor_mailbox) = crate::executor::init(
168 context.with_label("executor"),
169 crate::executor::Config {
170 execution_node: execution_node.clone(),
171 last_finalized_height,
172 marshal: marshal_mailbox.clone(),
173 fcu_heartbeat_interval: self.fcu_heartbeat_interval,
174 public_key: Some(self.signer.public_key()),
175 },
176 )
177 .wrap_err("failed initialization executor actor")?;
178
179 let (peer_manager, peer_manager_mailbox) = peer_manager::init(
180 context.with_label("peer_manager"),
181 peer_manager::Config {
182 execution_node: execution_node.clone(),
183 oracle: self.peer_manager.clone(),
184 epoch_strategy: epoch_strategy.clone(),
185 last_finalized_height,
186 },
187 );
188
189 let (broadcast, broadcast_mailbox) = buffered::Engine::new(
190 context.with_label("broadcast"),
191 buffered::Config {
192 public_key: self.signer.public_key(),
193 mailbox_size: self.mailbox_size,
194 deque_size: self.deque_size,
195 peer_provider: peer_manager_mailbox.clone(),
196 priority: true,
197 codec_config: (),
198 },
199 );
200
201 let resolver_config = commonware_consensus::marshal::resolver::p2p::Config {
205 public_key: self.signer.public_key(),
206 peer_provider: peer_manager_mailbox.clone(),
207 mailbox_size: self.mailbox_size,
208 blocker: self.blocker.clone(),
209 initial: Duration::from_secs(1),
210 timeout: Duration::from_secs(2),
211 fetch_retry_timeout: Duration::from_millis(100),
212 priority_requests: false,
213 priority_responses: false,
214 };
215
216 let subblocks = self.with_subblocks.then(|| {
217 subblocks::Actor::new(subblocks::Config {
218 context: context.clone(),
219 signer: self.signer.clone(),
220 scheme_provider: scheme_provider.clone(),
221 node: execution_node.clone(),
222 fee_recipient: alloy_primitives::Address::ZERO,
226 time_to_build_subblock: self.time_to_build_subblock,
227 subblock_broadcast_interval: self.subblock_broadcast_interval,
228 epoch_strategy: epoch_strategy.clone(),
229 })
230 });
231
232 let (feed, feed_mailbox) = crate::feed::init(
233 context.with_label("feed"),
234 marshal_mailbox.clone(),
235 epoch_strategy.clone(),
236 execution_node.clone(),
237 self.feed_state,
238 );
239
240 let (application, application_mailbox) = application::init(super::application::Config {
241 context: context.with_label("application"),
242 public_key: self.signer.public_key(),
243 mailbox_size: self.mailbox_size,
244 marshal: marshal_mailbox.clone(),
245 execution_node: execution_node.clone(),
246 executor: executor_mailbox.clone(),
247 proposal_return_budget: self.proposal_return_budget,
248 subblocks: subblocks.as_ref().map(|s| s.mailbox()),
249 scheme_provider: scheme_provider.clone(),
250 epoch_strategy: epoch_strategy.clone(),
251 })
252 .await
253 .wrap_err("failed initializing application actor")?;
254
255 let (epoch_manager, epoch_manager_mailbox) = epoch::manager::init(
256 context.with_label("epoch_manager"),
257 epoch::manager::Config {
258 application: application_mailbox.clone(),
259 blocker: self.blocker.clone(),
260 page_cache: page_cache_ref,
261 epoch_strategy: epoch_strategy.clone(),
262 time_for_peer_response: self.time_for_peer_response,
263 time_to_propose: self.time_to_propose,
264 mailbox_size: self.mailbox_size,
265 subblocks: subblocks.as_ref().map(|s| s.mailbox()),
266 marshal: marshal_mailbox.clone(),
267 feed: feed_mailbox.clone(),
268 scheme_provider: scheme_provider.clone(),
269 time_to_collect_notarizations: self.time_to_collect_notarizations,
270 time_to_retry_nullify_broadcast: self.time_to_retry_nullify_broadcast,
271 partition_prefix: format!("{}_epoch_manager", self.partition_prefix),
272 views_to_track: ViewDelta::new(self.views_to_track),
273 views_until_leader_skip: ViewDelta::new(self.views_until_leader_skip),
274 },
275 );
276
277 let (dkg_manager, dkg_manager_mailbox) = dkg::manager::init(
278 context.with_label("dkg_manager"),
279 dkg::manager::Config {
280 epoch_manager: epoch_manager_mailbox.clone(),
281 epoch_strategy: epoch_strategy.clone(),
282 execution_node,
283 initial_share: self.share.clone(),
284 mailbox_size: self.mailbox_size,
285 marshal: marshal_mailbox,
286 namespace: crate::config::NAMESPACE.to_vec(),
287 me: self.signer.clone(),
288 partition_prefix: format!("{}_dkg_manager", self.partition_prefix),
289 },
290 )
291 .await
292 .wrap_err("failed initializing dkg manager")?;
293
294 Ok(Engine {
295 context: ContextCell::new(context),
296
297 broadcast,
298 broadcast_mailbox,
299
300 dkg_manager,
301 dkg_manager_mailbox,
302
303 application,
304
305 executor,
306 executor_mailbox,
307
308 resolver_config,
309 marshal,
310
311 epoch_manager,
312 epoch_manager_mailbox,
313
314 peer_manager,
315 peer_manager_mailbox,
316
317 feed,
318
319 subblocks,
320 })
321 }
322}
323
/// The fully constructed, not yet running, set of consensus actors.
///
/// Produced by [`Builder::try_init`]; consumed by `start`, which spawns `run`
/// to start every actor and wait on their task handles.
pub struct Engine<TContext, TBlocker, TPeerManager>
where
    TContext: BufferPooler
        + Clock
        + governor::clock::Clock
        + Rng
        + CryptoRng
        + Metrics
        + Network
        + Pacer
        + Spawner
        + Storage,
    TBlocker: Blocker<PublicKey = PublicKey>,
    TPeerManager: AddressableManager<PublicKey = PublicKey>,
{
    /// Runtime context the engine is spawned on (via `spawn_cell!`) and from
    /// which the resolver and the optional subblocks task are created.
    context: ContextCell<TContext>,

    /// Buffered broadcast engine for [`Block`]s; started with the broadcast
    /// channel.
    broadcast: buffered::Engine<TContext, PublicKey, Block, peer_manager::Mailbox>,
    /// Mailbox of `broadcast`; handed to the marshal actor when it starts.
    broadcast_mailbox: buffered::Mailbox<PublicKey, Block>,

    /// DKG manager actor; started with the DKG channel.
    dkg_manager: dkg::manager::Actor<TContext>,
    /// Mailbox of `dkg_manager`; given to the application actor and
    /// registered as one of the marshal actor's reporters.
    dkg_manager_mailbox: dkg::manager::Mailbox,

    /// Application actor; started with a clone of `dkg_manager_mailbox`.
    application: application::Actor<TContext>,

    /// Executor actor.
    executor: crate::executor::Actor<TContext>,
    /// Mailbox of `executor`; registered as one of the marshal actor's
    /// reporters.
    executor_mailbox: crate::executor::Mailbox,

    /// Configuration from which the marshal's p2p resolver is initialized
    /// once the marshal network channel is available in `run`.
    resolver_config: marshal::resolver::p2p::Config<PublicKey, peer_manager::Mailbox, TBlocker>,

    /// Marshal actor; started with the combined reporters, the broadcast
    /// mailbox and the resolver.
    marshal: crate::alias::marshal::Actor<TContext>,

    /// Epoch manager actor; started with the votes, certificates and
    /// resolver channels.
    epoch_manager: epoch::manager::Actor<TContext, TBlocker>,
    /// Mailbox of `epoch_manager`; registered as one of the marshal actor's
    /// reporters.
    epoch_manager_mailbox: epoch::manager::Mailbox,

    /// Peer manager actor.
    peer_manager: peer_manager::Actor<TContext, TPeerManager>,
    /// Mailbox of `peer_manager`; registered as one of the marshal actor's
    /// reporters.
    peer_manager_mailbox: peer_manager::Mailbox,

    /// Feed actor.
    feed: crate::feed::Actor<TContext>,

    /// Subblocks actor; `Some` only when the builder had `with_subblocks`
    /// set. When `None`, the subblocks channel is dropped in `run`.
    subblocks: Option<subblocks::Actor<TContext>>,
}
376
377impl<TContext, TBlocker, TPeerManager> Engine<TContext, TBlocker, TPeerManager>
378where
379 TContext: BufferPooler
380 + Clock
381 + governor::clock::Clock
382 + Rng
383 + CryptoRng
384 + Metrics
385 + Network
386 + Pacer
387 + Spawner
388 + Storage,
389 TBlocker: Blocker<PublicKey = PublicKey> + Sync,
390 TPeerManager: AddressableManager<PublicKey = PublicKey> + Sync,
391{
392 #[expect(
393 clippy::too_many_arguments,
394 reason = "following commonware's style of writing"
395 )]
396 pub fn start(
397 mut self,
398 votes_network: (
399 impl Sender<PublicKey = PublicKey>,
400 impl Receiver<PublicKey = PublicKey>,
401 ),
402 certificates_network: (
403 impl Sender<PublicKey = PublicKey>,
404 impl Receiver<PublicKey = PublicKey>,
405 ),
406 resolver_network: (
407 impl Sender<PublicKey = PublicKey>,
408 impl Receiver<PublicKey = PublicKey>,
409 ),
410 broadcast_network: (
411 impl Sender<PublicKey = PublicKey>,
412 impl Receiver<PublicKey = PublicKey>,
413 ),
414 marshal_network: (
415 impl Sender<PublicKey = PublicKey>,
416 impl Receiver<PublicKey = PublicKey>,
417 ),
418 dkg_channel: (
419 impl Sender<PublicKey = PublicKey>,
420 impl Receiver<PublicKey = PublicKey>,
421 ),
422 subblocks_channel: (
423 impl Sender<PublicKey = PublicKey>,
424 impl Receiver<PublicKey = PublicKey>,
425 ),
426 ) -> Handle<eyre::Result<()>> {
427 spawn_cell!(
428 self.context,
429 self.run(
430 votes_network,
431 certificates_network,
432 resolver_network,
433 broadcast_network,
434 marshal_network,
435 dkg_channel,
436 subblocks_channel,
437 )
438 )
439 }
440
441 #[expect(
442 clippy::too_many_arguments,
443 reason = "following commonware's style of writing"
444 )]
445 async fn run(
446 self,
447 votes_channel: (
448 impl Sender<PublicKey = PublicKey>,
449 impl Receiver<PublicKey = PublicKey>,
450 ),
451 certificates_channel: (
452 impl Sender<PublicKey = PublicKey>,
453 impl Receiver<PublicKey = PublicKey>,
454 ),
455 resolver_channel: (
456 impl Sender<PublicKey = PublicKey>,
457 impl Receiver<PublicKey = PublicKey>,
458 ),
459 broadcast_channel: (
460 impl Sender<PublicKey = PublicKey>,
461 impl Receiver<PublicKey = PublicKey>,
462 ),
463 marshal_channel: (
464 impl Sender<PublicKey = PublicKey>,
465 impl Receiver<PublicKey = PublicKey>,
466 ),
467 dkg_channel: (
468 impl Sender<PublicKey = PublicKey>,
469 impl Receiver<PublicKey = PublicKey>,
470 ),
471 subblocks_channel: (
472 impl Sender<PublicKey = PublicKey>,
473 impl Receiver<PublicKey = PublicKey>,
474 ),
475 ) -> eyre::Result<()> {
476 let peer_manager = self.peer_manager.start();
477
478 let broadcast = self.broadcast.start(broadcast_channel);
479 let resolver =
480 marshal::resolver::p2p::init(&self.context, self.resolver_config, marshal_channel);
481
482 let application = self.application.start(self.dkg_manager_mailbox.clone());
483 let executor = self.executor.start();
484
485 let marshal = self.marshal.start(
486 Reporters::from((
487 self.epoch_manager_mailbox,
488 Reporters::from((
489 self.executor_mailbox,
490 Reporters::from((self.dkg_manager_mailbox.clone(), self.peer_manager_mailbox)),
491 )),
492 )),
493 self.broadcast_mailbox,
494 resolver,
495 );
496
497 let epoch_manager =
498 self.epoch_manager
499 .start(votes_channel, certificates_channel, resolver_channel);
500
501 let feed = self.feed.start();
502
503 let dkg_manager = self.dkg_manager.start(dkg_channel);
504
505 let mut tasks = vec![
506 application,
507 broadcast,
508 epoch_manager,
509 executor,
510 feed,
511 marshal,
512 dkg_manager,
513 peer_manager,
514 ];
515
516 if let Some(subblocks) = self.subblocks {
517 tasks.push(self.context.spawn(|_| subblocks.run(subblocks_channel)));
518 } else {
519 drop(subblocks_channel);
520 }
521
522 try_join_all(tasks)
523 .await
524 .map(|_| ())
525 .wrap_err("one of the consensus engine's actors failed")
528 }
529}