1use std::{collections::BTreeMap, num::NonZeroUsize};
45
46use alloy_consensus::BlockHeader as _;
47use commonware_codec::ReadExt as _;
48use commonware_consensus::{
49 Reporters,
50 marshal::Update,
51 simplex::{self, elector, scheme::bls12381_threshold::vrf::Scheme},
52 types::{Epoch, EpochDelta, Epocher as _, Height},
53};
54use commonware_cryptography::ed25519::PublicKey;
55use commonware_macros::select;
56use commonware_p2p::{
57 Blocker, Receiver, Sender,
58 utils::mux::{Builder as _, MuxHandle, Muxer},
59};
60use commonware_parallel::Sequential;
61use commonware_runtime::{
62 BufferPooler, Clock, ContextCell, Handle, Metrics as _, Network, Spawner, Storage, spawn_cell,
63 telemetry::metrics::status::GaugeExt as _,
64};
65use commonware_utils::{Acknowledgement as _, vec::NonEmptyVec};
66use eyre::{ensure, eyre};
67use futures::{StreamExt as _, channel::mpsc};
68use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
69use rand_08::{CryptoRng, Rng};
70use tracing::{Level, Span, debug, error, error_span, info, instrument, warn, warn_span};
71
72use crate::{
73 consensus::Digest,
74 epoch::manager::ingress::{EpochTransition, Exit},
75};
76
77use super::ingress::{Content, Message};
78
79const REPLAY_BUFFER: NonZeroUsize = NonZeroUsize::new(8 * 1024 * 1024).expect("value is not zero"); const WRITE_BUFFER: NonZeroUsize = NonZeroUsize::new(1024 * 1024).expect("value is not zero"); pub(crate) struct Actor<TContext, TBlocker> {
83 active_epochs: BTreeMap<Epoch, (Handle<()>, ContextCell<TContext>)>,
84 config: super::Config<TBlocker>,
85 context: ContextCell<TContext>,
86 confirmed_latest_network_epoch: Option<Epoch>,
87 mailbox: mpsc::UnboundedReceiver<Message>,
88 metrics: Metrics,
89}
90
91impl<TContext, TBlocker> Actor<TContext, TBlocker>
92where
93 TBlocker: Blocker<PublicKey = PublicKey>,
94 TContext: BufferPooler
96 + Spawner
97 + commonware_runtime::Metrics
98 + Rng
99 + CryptoRng
100 + Clock
101 + governor::clock::Clock
102 + Storage
103 + Network,
104{
105 pub(super) fn new(
106 config: super::Config<TBlocker>,
107 context: TContext,
108 mailbox: mpsc::UnboundedReceiver<Message>,
109 ) -> Self {
110 let active_epochs = Gauge::default();
111 let latest_epoch = Gauge::default();
112 let latest_participants = Gauge::default();
113 let how_often_signer = Counter::default();
114 let how_often_verifier = Counter::default();
115
116 context.register(
117 "active_epochs",
118 "the number of epochs currently managed by the epoch manager",
119 active_epochs.clone(),
120 );
121 context.register(
122 "latest_epoch",
123 "the latest epoch managed by this epoch manager",
124 latest_epoch.clone(),
125 );
126 context.register(
127 "latest_participants",
128 "the number of participants in the most recently started epoch",
129 latest_participants.clone(),
130 );
131 context.register(
132 "how_often_signer",
133 "how often a node is a signer; a node is a signer if it has a share",
134 how_often_signer.clone(),
135 );
136 context.register(
137 "how_often_verifier",
138 "how often a node is a verifier; a node is a verifier if it does not have a share",
139 how_often_verifier.clone(),
140 );
141
142 Self {
143 config,
144 context: ContextCell::new(context),
145 mailbox,
146 metrics: Metrics {
147 active_epochs,
148 latest_epoch,
149 latest_participants,
150 how_often_signer,
151 how_often_verifier,
152 },
153 active_epochs: BTreeMap::new(),
154 confirmed_latest_network_epoch: None,
155 }
156 }
157
    /// Spawns the actor's event loop on its stored runtime context.
    ///
    /// Each tuple is one physical p2p channel (sender/receiver pair) that
    /// the event loop multiplexes across epochs:
    /// - `votes`: consensus votes,
    /// - `certificates`: consensus certificates,
    /// - `resolver`: resolver traffic.
    ///
    /// Returns the handle of the spawned task.
    pub(crate) fn start(
        mut self,
        votes: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        certificates: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(votes, certificates, resolver))
    }
175
    /// The actor's main event loop.
    ///
    /// Sets up one muxer per physical channel (votes, certificates,
    /// resolver) so each epoch's engine gets its own sub-channel, then
    /// services two event sources until either shuts down:
    /// - the vote mux's backup receiver, which yields messages addressed to
    ///   epochs that have no registered sub-channel, and
    /// - the actor mailbox, which delivers epoch enter/exit requests and
    ///   marshal updates.
    async fn run(
        mut self,
        (vote_sender, vote_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        (certificate_sender, certificate_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        (resolver_sender, resolver_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        // The vote mux is built with a backup channel: traffic for epochs
        // without a registered sub-channel lands on `vote_backup` instead of
        // being dropped, letting us react to peers that are ahead of us.
        let (mux, mut vote_mux, mut vote_backup) = Muxer::builder(
            self.context.with_label("vote_mux"),
            vote_sender,
            vote_receiver,
            self.config.mailbox_size,
        )
        .with_backup()
        .build();
        mux.start();

        let (mux, mut certificate_mux) = Muxer::builder(
            self.context.with_label("certificate_mux"),
            certificate_sender,
            certificate_receiver,
            self.config.mailbox_size,
        )
        .build();
        mux.start();

        let (mux, mut resolver_mux) = Muxer::new(
            self.context.with_label("resolver_mux"),
            resolver_sender,
            resolver_receiver,
            self.config.mailbox_size,
        );
        mux.start();

        loop {
            select!(
                // A vote arrived for an epoch we have no engine for.
                message = vote_backup.recv() => {
                    let Some((their_epoch, (from, _))) = message else {
                        error_span!("mux channel closed").in_scope(||
                            error!("vote p2p mux channel closed; exiting actor")
                        );
                        break;
                    };
                    self.handle_msg_for_unregistered_epoch(
                        Epoch::new(their_epoch),
                        from,
                    ).await;
                },

                msg = self.mailbox.next() => {
                    let Some(msg) = msg else {
                        warn_span!("mailboxes dropped").in_scope(||
                            warn!("all mailboxes dropped; exiting actor"
                        ));
                        break;
                    };
                    let cause = msg.cause;
                    match msg.content {
                        // Start an engine for a new epoch. Errors are
                        // discarded here; `enter` logs them itself via its
                        // #[instrument(err)] attribute.
                        Content::Enter(enter) => {
                            let _: Result<_, _> = self
                                .enter(
                                    cause,
                                    enter,
                                    &mut vote_mux,
                                    &mut certificate_mux,
                                    &mut resolver_mux,
                                )
                                .await;
                        }
                        Content::Exit(exit) => self.exit(cause, exit),
                        Content::Update(update) => {
                            match *update {
                                Update::Tip(_, height, digest) => {
                                    let _ = self.handle_finalized_tip(height, digest).await;
                                }
                                // Full blocks are not consumed here; ack so
                                // the sender can make progress.
                                Update::Block(_block, ack) => {
                                    ack.acknowledge();
                                }
                            }
                        }
                    }
                },
            )
        }
    }
269
270 #[instrument(
271 parent = &cause,
272 skip_all,
273 fields(
274 %epoch,
275 network_identity = %public.public(),
276 ?participants,
277 ),
278 err(level = Level::WARN)
279 )]
280 async fn enter(
281 &mut self,
282 cause: Span,
283 EpochTransition {
284 epoch,
285 public,
286 share,
287 participants,
288 }: EpochTransition,
289 vote_mux: &mut MuxHandle<
290 impl Sender<PublicKey = PublicKey>,
291 impl Receiver<PublicKey = PublicKey>,
292 >,
293 certificates_mux: &mut MuxHandle<
294 impl Sender<PublicKey = PublicKey>,
295 impl Receiver<PublicKey = PublicKey>,
296 >,
297 resolver_mux: &mut MuxHandle<
298 impl Sender<PublicKey = PublicKey>,
299 impl Receiver<PublicKey = PublicKey>,
300 >,
301 ) -> eyre::Result<()> {
302 if let Some(latest) = self.active_epochs.last_key_value().map(|(k, _)| *k) {
303 ensure!(
304 epoch > latest,
305 "requested to start an epoch `{epoch}` older than the latest \
306 running, `{latest}`; refusing",
307 );
308 }
309
310 let n_participants = participants.len();
311 let is_signer = matches!(share, Some(..));
313 let scheme = if let Some(share) = share {
314 info!("we have a share for this epoch, participating as a signer",);
315 Scheme::signer(crate::config::NAMESPACE, participants, public, share)
316 .expect("our private share must match our slice of the public key")
317 } else {
318 info!("we don't have a share for this epoch, participating as a verifier",);
319 Scheme::verifier(crate::config::NAMESPACE, participants, public)
320 };
321 self.config.scheme_provider.register(epoch, scheme.clone());
322
323 let engine_ctx = self
326 .context
327 .with_label("simplex")
328 .with_attribute("epoch", epoch)
329 .with_scope();
330
331 let engine = simplex::Engine::new(
332 engine_ctx.clone(),
333 simplex::Config {
334 scheme,
335 elector: elector::Random,
336 blocker: self.config.blocker.clone(),
337 automaton: self.config.application.clone(),
338 relay: self.config.application.clone(),
339 reporter: Reporters::<_, crate::subblocks::Mailbox, _>::from((
340 self.config.subblocks.clone(),
341 Reporters::from((self.config.marshal.clone(), self.config.feed.clone())),
342 )),
343 partition: format!(
344 "{partition_prefix}_consensus_epoch_{epoch}",
345 partition_prefix = self.config.partition_prefix
346 ),
347 mailbox_size: self.config.mailbox_size,
348 epoch,
349
350 replay_buffer: REPLAY_BUFFER,
351 write_buffer: WRITE_BUFFER,
352 page_cache: self.config.page_cache.clone(),
353
354 leader_timeout: self.config.time_to_propose,
355 certification_timeout: self.config.time_to_collect_notarizations,
356 timeout_retry: self.config.time_to_retry_nullify_broadcast,
357 fetch_timeout: self.config.time_for_peer_response,
358 activity_timeout: self.config.views_to_track,
359 skip_timeout: self.config.views_until_leader_skip,
360
361 fetch_concurrent: crate::config::NUMBER_CONCURRENT_FETCHES,
362
363 forwarding: commonware_consensus::simplex::config::ForwardingPolicy::Disabled,
364
365 strategy: Sequential,
366 },
367 );
368
369 let vote = vote_mux.register(epoch.get()).await.unwrap();
370 let certificate = certificates_mux.register(epoch.get()).await.unwrap();
371 let resolver = resolver_mux.register(epoch.get()).await.unwrap();
372
373 assert!(
374 self.active_epochs
375 .insert(
376 epoch,
377 (engine.start(vote, certificate, resolver), engine_ctx)
378 )
379 .is_none(),
380 "there must be no other active engine running: this was ensured at \
381 the beginning of this method",
382 );
383
384 let latest = self.confirmed_latest_network_epoch.get_or_insert(epoch);
385 *latest = (*latest).max(epoch);
386
387 info!("started consensus engine backing the epoch");
388
389 self.metrics.latest_participants.set(n_participants as i64);
390 self.metrics.active_epochs.inc();
391 let _ = self.metrics.latest_epoch.try_set(epoch.get());
392 self.metrics.how_often_signer.inc_by(is_signer as u64);
393 self.metrics.how_often_verifier.inc_by(!is_signer as u64);
394
395 Ok(())
396 }
397
398 #[instrument(parent = &cause, skip_all, fields(epoch))]
399 fn exit(&mut self, cause: Span, Exit { epoch }: Exit) {
400 if let Some((engine, engine_ctx)) = self.active_epochs.remove(&epoch) {
401 drop(engine_ctx);
402 engine.abort();
403 info!("stopped engine backing epoch");
404 } else {
405 warn!(
406 "attempted to exit unknown epoch, but epoch was not backed by \
407 an active engine",
408 );
409 }
410
411 if let Some(to_delete) = epoch.checked_sub(EpochDelta::new(2))
421 && !self.config.scheme_provider.delete(&to_delete)
422 {
423 debug!(
424 to_exit = %epoch,
425 %to_delete,
426 "attempted to delete scheme for epoch, but epoch had no scheme \
427 registered"
428 );
429 }
430 }
431
    /// Reacts to a newly finalized tip reported by the marshal.
    ///
    /// Ratchets the latest confirmed network epoch up to the epoch
    /// containing `height`. If `height` is the last block of its epoch (a
    /// boundary block), fetches the block, reads the DKG outcome from its
    /// header's extra data, and registers a verifier scheme for the epoch
    /// named in that outcome.
    #[instrument(
        skip_all,
        fields(%height, epoch = tracing::field::Empty),
        err,
    )]
    async fn handle_finalized_tip(&mut self, height: Height, digest: Digest) -> eyre::Result<()> {
        let epoch_info = self
            .config
            .epoch_strategy
            .containing(height)
            .expect("epoch strategy is valid for all epochs and heights");
        Span::current().record("epoch", tracing::field::display(epoch_info.epoch()));

        // A finalized block in an epoch proves the network reached it; only
        // ever move the confirmed epoch upwards.
        {
            let network_epoch = self
                .confirmed_latest_network_epoch
                .get_or_insert(epoch_info.epoch());
            *network_epoch = (*network_epoch).max(epoch_info.epoch());
        }

        if epoch_info.last() == height {
            info!(
                "the finalized tip is a boundary block; requesting the \
                block to set the scheme for its epoch"
            );
            // The first await resolves the subscription request; the second
            // waits for the marshal to actually deliver the block.
            let block = self
                .config
                .marshal
                .subscribe_by_digest(None, digest)
                .await
                .await
                .map_err(|_| eyre!("marshal never returned the block"))?;
            let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
                &mut block.header().extra_data().as_ref(),
            )
            .expect("boundary blocks must contain DKG outcomes");
            // Register a verifier scheme for the outcome's epoch so its
            // messages can be validated even before entering that epoch.
            self.config.scheme_provider.register(
                onchain_outcome.epoch,
                Scheme::verifier(
                    crate::config::NAMESPACE,
                    onchain_outcome.players().clone(),
                    onchain_outcome.sharing().clone(),
                ),
            );
            // NOTE(review): unlike the ratchet above, this replaces the
            // confirmed epoch unconditionally — presumably the outcome's
            // epoch is always ahead; verify against the DKG flow.
            self.confirmed_latest_network_epoch
                .replace(onchain_outcome.epoch);
            debug!(
                next_epoch = %onchain_outcome.epoch,
                "read DKG outcome from boundary and registered scheme",
            );
        }
        Ok(())
    }
496
497 #[instrument(
505 skip_all,
506 fields(msg.epoch = %their_epoch, msg.from = %from),
507 )]
508 async fn handle_msg_for_unregistered_epoch(&mut self, their_epoch: Epoch, from: PublicKey) {
509 let reference_epoch = match (
510 self.active_epochs.keys().last().copied(),
511 self.confirmed_latest_network_epoch,
512 ) {
513 (Some(our), None) => our,
514 (Some(our), Some(confirmed_finalized)) => our.max(confirmed_finalized),
515 (None, Some(confirmed_finalized)) => confirmed_finalized,
516 (None, None) => {
517 debug!(
518 "received message for unregistered epoch, but we are \
519 neither running a consensus engine backing an epoch, nor \
520 do we know what the latest finalized epoch is; there is \
521 nothing to do",
522 );
523 return;
524 }
525 };
526
527 if reference_epoch >= their_epoch {
528 return;
529 }
530
531 let boundary_height = self
532 .config
533 .epoch_strategy
534 .last(reference_epoch)
535 .expect("our epoch strategy should cover all epochs");
536
537 tracing::debug!(
538 %reference_epoch,
539 %boundary_height,
540 "hinting to sync system that a finalization certificate might be \
541 available for our reference epoch",
542 );
543 self.config
544 .marshal
545 .hint_finalized(boundary_height, NonEmptyVec::new(from))
546 .await;
547 }
548}
549
/// Prometheus metrics exported by the epoch manager (registered in
/// `Actor::new`).
struct Metrics {
    // Number of epochs currently managed by the epoch manager.
    active_epochs: Gauge,
    // The latest epoch managed by this epoch manager.
    latest_epoch: Gauge,
    // Number of participants in the most recently started epoch.
    latest_participants: Gauge,
    // How often this node entered an epoch as a signer (it had a share).
    how_often_signer: Counter,
    // How often this node entered an epoch as a verifier (no share).
    how_often_verifier: Counter,
}