1use std::{
6 net::SocketAddr,
7 num::{NonZeroU64, NonZeroUsize},
8 time::Duration,
9};
10
11use commonware_broadcast::buffered;
12use commonware_consensus::{Reporters, marshal};
13use commonware_cryptography::{
14 Signer as _,
15 bls12381::primitives::group::Share,
16 ed25519::{PrivateKey, PublicKey},
17};
18use commonware_p2p::{Blocker, Receiver, Sender};
19use commonware_runtime::{
20 Clock, ContextCell, Handle, Metrics, Network, Pacer, Spawner, Storage, buffer::PoolRef,
21 spawn_cell,
22};
23use commonware_utils::set::OrderedAssociated;
24use eyre::{OptionExt as _, WrapErr as _};
25use futures::future::try_join_all;
26use rand::{CryptoRng, Rng};
27use tempo_node::TempoFullNode;
28use tracing::info;
29
30use crate::{
31 config::{BLOCKS_FREEZER_TABLE_INITIAL_SIZE_BYTES, MARSHAL_LIMIT},
32 consensus::application,
33 dkg,
34 epoch::{self, SchemeProvider},
35 subblocks,
36};
37
38use super::block::Block;
39
// Tuning constants for the marshal actor and the buffer pool that are set up
// in `Builder::try_init`. Each constant is declared on its own line so that it
// can be documented and changed independently.

/// Factor applied to `Builder::views_to_track` (saturating) to derive the
/// marshal actor's `view_retention_timeout`.
const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;

/// Forwarded to the marshal actor as `prunable_items_per_section`.
const PRUNABLE_ITEMS_PER_SECTION: NonZeroU64 = NonZeroU64::new(4_096).expect("value is not zero");

/// Forwarded to the marshal actor as `immutable_items_per_section`.
const IMMUTABLE_ITEMS_PER_SECTION: NonZeroU64 =
    NonZeroU64::new(262_144).expect("value is not zero");

/// Forwarded to the marshal actor as `freezer_table_resize_frequency`.
const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;

/// Forwarded to the marshal actor as `freezer_table_resize_chunk_size`
/// (2^16 = 65_536).
const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16);

/// Forwarded to the marshal actor as `freezer_journal_target_size`
/// (2^30; presumably bytes, i.e. 1 GiB -- TODO confirm the unit against the
/// marshal configuration docs).
const FREEZER_JOURNAL_TARGET_SIZE: u64 = 1024 * 1024 * 1024;

/// Forwarded to the marshal actor as `freezer_journal_compression`.
/// NOTE(review): `Some(3)` looks like a compression level -- confirm.
const FREEZER_JOURNAL_COMPRESSION: Option<u8> = Some(3);

/// Forwarded to the marshal actor as `replay_buffer` (8 * 2^20 = 8_388_608).
const REPLAY_BUFFER: NonZeroUsize = NonZeroUsize::new(8 * 1024 * 1024).expect("value is not zero");

/// Forwarded to the marshal actor as `write_buffer` (2^20 = 1_048_576).
const WRITE_BUFFER: NonZeroUsize = NonZeroUsize::new(1024 * 1024).expect("value is not zero");

/// First argument of `PoolRef::new` when the shared buffer pool is created.
const BUFFER_POOL_PAGE_SIZE: NonZeroUsize = NonZeroUsize::new(4_096).expect("value is not zero");

/// Second argument of `PoolRef::new` when the shared buffer pool is created.
const BUFFER_POOL_CAPACITY: NonZeroUsize = NonZeroUsize::new(8_192).expect("value is not zero");

/// Forwarded to the marshal actor as `max_repair`.
const MAX_REPAIR: NonZeroU64 = NonZeroU64::new(20).expect("value is not zero");
57
58#[derive(Clone)]
63pub struct Builder<TBlocker, TContext, TPeerManager> {
64 pub context: TContext,
66
67 pub fee_recipient: alloy_primitives::Address,
68
69 pub execution_node: Option<TempoFullNode>,
70
71 pub blocker: TBlocker,
72 pub peer_manager: TPeerManager,
73
74 pub partition_prefix: String,
75 pub signer: PrivateKey,
76 pub share: Option<Share>,
77 pub mailbox_size: usize,
78 pub deque_size: usize,
79
80 pub time_to_propose: Duration,
81 pub time_to_collect_notarizations: Duration,
82 pub time_to_retry_nullify_broadcast: Duration,
83 pub time_for_peer_response: Duration,
84 pub views_to_track: u64,
85 pub views_until_leader_skip: u64,
86 pub new_payload_wait_time: Duration,
87 pub time_to_build_subblock: Duration,
88 pub subblock_broadcast_interval: Duration,
89}
90
91impl<TBlocker, TContext, TPeerManager> Builder<TBlocker, TContext, TPeerManager>
92where
93 TBlocker: Blocker<PublicKey = PublicKey>,
94 TContext: Clock
95 + governor::clock::Clock
96 + Rng
97 + CryptoRng
98 + Pacer
99 + Spawner
100 + Storage
101 + Metrics
102 + Network,
103 TPeerManager: commonware_p2p::Manager<
104 PublicKey = PublicKey,
105 Peers = OrderedAssociated<PublicKey, SocketAddr>,
106 >,
107{
108 pub fn with_execution_node(mut self, execution_node: TempoFullNode) -> Self {
109 self.execution_node = Some(execution_node);
110 self
111 }
112
113 pub async fn try_init(self) -> eyre::Result<Engine<TBlocker, TContext, TPeerManager>> {
114 let execution_node = self
115 .execution_node
116 .clone()
117 .ok_or_eyre("execution_node must be set using with_execution_node()")?;
118
119 let epoch_length = execution_node
120 .chain_spec()
121 .info
122 .epoch_length()
123 .ok_or_eyre("chainspec did not contain epochLength; cannot go on without it")?;
124
125 info!(
126 identity = %self.signer.public_key(),
127 "using public ed25519 verifying key derived from provided private ed25519 signing key",
128 );
129
130 let (broadcast, broadcast_mailbox) = buffered::Engine::new(
131 self.context.with_label("broadcast"),
132 buffered::Config {
133 public_key: self.signer.public_key(),
134 mailbox_size: self.mailbox_size,
135 deque_size: self.deque_size,
136 priority: true,
137 codec_config: (),
138 },
139 );
140
141 let buffer_pool = PoolRef::new(BUFFER_POOL_PAGE_SIZE, BUFFER_POOL_CAPACITY);
143
144 let resolver_config = commonware_consensus::marshal::resolver::p2p::Config {
148 public_key: self.signer.public_key(),
149 manager: self.peer_manager.clone(),
150 mailbox_size: self.mailbox_size,
151 blocker: self.blocker.clone(),
152 requester_config: commonware_p2p::utils::requester::Config {
153 me: Some(self.signer.public_key()),
154 rate_limit: MARSHAL_LIMIT,
155 initial: Duration::from_secs(1),
156 timeout: Duration::from_secs(2),
157 },
158 fetch_retry_timeout: Duration::from_millis(100),
159 priority_requests: false,
160 priority_responses: false,
161 };
162 let scheme_provider = SchemeProvider::new();
163 let (marshal, marshal_mailbox) = marshal::Actor::init(
164 self.context.with_label("marshal"),
165 marshal::Config {
166 scheme_provider: scheme_provider.clone(),
167 epoch_length,
168 partition_prefix: self.partition_prefix.clone(),
169 mailbox_size: self.mailbox_size,
170 view_retention_timeout: self
171 .views_to_track
172 .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
173 namespace: crate::config::NAMESPACE.to_vec(),
174 prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
175 immutable_items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
176 freezer_table_initial_size: BLOCKS_FREEZER_TABLE_INITIAL_SIZE_BYTES,
177 freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
178 freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
179 freezer_journal_target_size: FREEZER_JOURNAL_TARGET_SIZE,
180 freezer_journal_compression: FREEZER_JOURNAL_COMPRESSION,
181
182 freezer_journal_buffer_pool: buffer_pool.clone(),
183
184 replay_buffer: REPLAY_BUFFER,
185 write_buffer: WRITE_BUFFER,
186 block_codec_config: (),
187 max_repair: MAX_REPAIR,
188 _marker: std::marker::PhantomData,
189 },
190 )
191 .await;
192
193 let subblocks = subblocks::Actor::new(subblocks::Config {
194 context: self.context.clone(),
195 signer: self.signer.clone(),
196 scheme_provider: scheme_provider.clone(),
197 node: execution_node.clone(),
198 fee_recipient: self.fee_recipient,
199 time_to_build_subblock: self.time_to_build_subblock,
200 subblock_broadcast_interval: self.subblock_broadcast_interval,
201 epoch_length,
202 });
203
204 let (application, application_mailbox) = application::init(super::application::Config {
205 context: self.context.with_label("application"),
206 fee_recipient: self.fee_recipient,
208 mailbox_size: self.mailbox_size,
209 marshal: marshal_mailbox.clone(),
210 execution_node: execution_node.clone(),
211 new_payload_wait_time: self.new_payload_wait_time,
212 subblocks: subblocks.mailbox(),
213 scheme_provider: scheme_provider.clone(),
214 epoch_length,
215 })
216 .await
217 .wrap_err("failed initializing application actor")?;
218
219 let (epoch_manager, epoch_manager_mailbox) = epoch::manager::init(
220 epoch::manager::Config {
221 application: application_mailbox.clone(),
222 blocker: self.blocker.clone(),
223 buffer_pool: buffer_pool.clone(),
224 epoch_length,
225 time_for_peer_response: self.time_for_peer_response,
226 time_to_propose: self.time_to_propose,
227 mailbox_size: self.mailbox_size,
228 subblocks: subblocks.mailbox(),
229 marshal: marshal_mailbox.clone(),
230 scheme_provider: scheme_provider.clone(),
231 time_to_collect_notarizations: self.time_to_collect_notarizations,
232 time_to_retry_nullify_broadcast: self.time_to_retry_nullify_broadcast,
233 partition_prefix: format!("{}_epoch_manager", self.partition_prefix),
234 views_to_track: self.views_to_track,
235 views_until_leader_skip: self.views_until_leader_skip,
236 },
237 self.context.with_label("epoch_manager"),
238 );
239
240 let (dkg_manager, dkg_manager_mailbox) = dkg::manager::init(
241 self.context.with_label("dkg_manager"),
242 dkg::manager::Config {
243 epoch_manager: epoch_manager_mailbox,
244 epoch_length,
245 execution_node,
246 initial_share: self.share.clone(),
247 mailbox_size: self.mailbox_size,
248 marshal: marshal_mailbox,
249 namespace: crate::config::NAMESPACE.to_vec(),
250 me: self.signer.clone(),
251 partition_prefix: format!("{}_dkg_manager", self.partition_prefix),
252 peer_manager: self.peer_manager.clone(),
253 },
254 )
255 .await
256 .wrap_err("failed initializing dkg manager")?;
257
258 Ok(Engine {
259 context: ContextCell::new(self.context),
260
261 broadcast,
262 broadcast_mailbox,
263
264 dkg_manager,
265 dkg_manager_mailbox,
266
267 application,
268 application_mailbox,
269
270 resolver_config,
271 marshal,
272
273 epoch_manager,
274
275 subblocks,
276 })
277 }
278}
279
280pub struct Engine<TBlocker, TContext, TPeerManager>
281where
282 TBlocker: Blocker<PublicKey = PublicKey>,
283 TContext: Clock
284 + governor::clock::Clock
285 + Rng
286 + CryptoRng
287 + Metrics
288 + Network
289 + Pacer
290 + Spawner
291 + Storage,
292 TPeerManager: commonware_p2p::Manager<
293 PublicKey = PublicKey,
294 Peers = OrderedAssociated<PublicKey, SocketAddr>,
295 >,
296{
297 context: ContextCell<TContext>,
298
299 broadcast: buffered::Engine<TContext, PublicKey, Block>,
302 broadcast_mailbox: buffered::Mailbox<PublicKey, Block>,
303
304 dkg_manager: dkg::manager::Actor<TContext, TPeerManager>,
305 dkg_manager_mailbox: dkg::manager::Mailbox,
306
307 application: application::Actor<TContext>,
309 application_mailbox: application::Mailbox,
310
311 resolver_config: marshal::resolver::p2p::Config<PublicKey, TPeerManager, TBlocker>,
313
314 marshal: crate::alias::marshal::Actor<TContext>,
317
318 epoch_manager: epoch::manager::Actor<TBlocker, TContext>,
319
320 subblocks: subblocks::Actor<TContext>,
321}
322
323impl<TBlocker, TContext, TPeerManager> Engine<TBlocker, TContext, TPeerManager>
324where
325 TBlocker: Blocker<PublicKey = PublicKey>,
326 TContext: Clock
327 + governor::clock::Clock
328 + Rng
329 + CryptoRng
330 + Metrics
331 + Network
332 + Pacer
333 + Spawner
334 + Storage,
335 TPeerManager: commonware_p2p::Manager<
336 PublicKey = PublicKey,
337 Peers = OrderedAssociated<PublicKey, SocketAddr>,
338 >,
339{
340 #[expect(
341 clippy::too_many_arguments,
342 reason = "following commonware's style of writing"
343 )]
344 pub fn start(
345 mut self,
346 pending_network: (
347 impl Sender<PublicKey = PublicKey>,
348 impl Receiver<PublicKey = PublicKey>,
349 ),
350 recovered_network: (
351 impl Sender<PublicKey = PublicKey>,
352 impl Receiver<PublicKey = PublicKey>,
353 ),
354 resolver_network: (
355 impl Sender<PublicKey = PublicKey>,
356 impl Receiver<PublicKey = PublicKey>,
357 ),
358 broadcast_network: (
359 impl Sender<PublicKey = PublicKey>,
360 impl Receiver<PublicKey = PublicKey>,
361 ),
362 marshal_network: (
363 impl Sender<PublicKey = PublicKey>,
364 impl Receiver<PublicKey = PublicKey>,
365 ),
366 dkg_channel: (
367 impl Sender<PublicKey = PublicKey>,
368 impl Receiver<PublicKey = PublicKey>,
369 ),
370 boundary_certificates_channel: (
371 impl Sender<PublicKey = PublicKey>,
372 impl Receiver<PublicKey = PublicKey>,
373 ),
374 subblocks_channel: (
375 impl Sender<PublicKey = PublicKey>,
376 impl Receiver<PublicKey = PublicKey>,
377 ),
378 ) -> Handle<eyre::Result<()>> {
379 spawn_cell!(
380 self.context,
381 self.run(
382 pending_network,
383 recovered_network,
384 resolver_network,
385 broadcast_network,
386 marshal_network,
387 dkg_channel,
388 boundary_certificates_channel,
389 subblocks_channel,
390 )
391 .await
392 )
393 }
394
395 #[expect(
396 clippy::too_many_arguments,
397 reason = "following commonware's style of writing"
398 )]
399 async fn run(
400 self,
401 pending_channel: (
402 impl Sender<PublicKey = PublicKey>,
403 impl Receiver<PublicKey = PublicKey>,
404 ),
405 recovered_channel: (
406 impl Sender<PublicKey = PublicKey>,
407 impl Receiver<PublicKey = PublicKey>,
408 ),
409 resolver_channel: (
410 impl Sender<PublicKey = PublicKey>,
411 impl Receiver<PublicKey = PublicKey>,
412 ),
413 broadcast_channel: (
414 impl Sender<PublicKey = PublicKey>,
415 impl Receiver<PublicKey = PublicKey>,
416 ),
417 marshal_channel: (
418 impl Sender<PublicKey = PublicKey>,
419 impl Receiver<PublicKey = PublicKey>,
420 ),
421 dkg_channel: (
422 impl Sender<PublicKey = PublicKey>,
423 impl Receiver<PublicKey = PublicKey>,
424 ),
425 boundary_certificates_channel: (
426 impl Sender<PublicKey = PublicKey>,
427 impl Receiver<PublicKey = PublicKey>,
428 ),
429 subblocks_channel: (
430 impl Sender<PublicKey = PublicKey>,
431 impl Receiver<PublicKey = PublicKey>,
432 ),
433 ) -> eyre::Result<()> {
434 let broadcast = self.broadcast.start(broadcast_channel);
435 let resolver =
436 marshal::resolver::p2p::init(&self.context, self.resolver_config, marshal_channel);
437
438 let application = self.application.start(self.dkg_manager_mailbox.clone());
439
440 let marshal = self.marshal.start(
441 Reporters::from((self.application_mailbox, self.dkg_manager_mailbox.clone())),
442 self.broadcast_mailbox,
443 resolver,
444 );
445
446 let epoch_manager = self.epoch_manager.start(
447 pending_channel,
448 recovered_channel,
449 resolver_channel,
450 boundary_certificates_channel,
451 );
452
453 let subblocks = self
454 .context
455 .spawn(|_| self.subblocks.run(subblocks_channel));
456
457 let dkg_manager = self.dkg_manager.start(dkg_channel);
458
459 try_join_all(vec![
460 application,
461 broadcast,
462 epoch_manager,
463 marshal,
464 dkg_manager,
465 subblocks,
466 ])
467 .await
468 .map(|_| ())
469 .wrap_err("one of the consensus engine's actors failed")
472 }
473}