1use std::{collections::BTreeMap, num::NonZeroUsize};
45
46use alloy_consensus::BlockHeader as _;
47use commonware_codec::ReadExt as _;
48use commonware_consensus::{
49 Reporters,
50 marshal::Update,
51 simplex::{self, elector, scheme::bls12381_threshold::vrf::Scheme},
52 types::{Epoch, EpochDelta, Epocher as _, Height},
53};
54use commonware_cryptography::ed25519::PublicKey;
55use commonware_macros::select;
56use commonware_p2p::{
57 Blocker, Receiver, Sender,
58 utils::mux::{Builder as _, MuxHandle, Muxer},
59};
60use commonware_parallel::Sequential;
61use commonware_runtime::{
62 BufferPooler, Clock, ContextCell, Handle, Metrics as _, Network, Spawner, Storage, spawn_cell,
63 telemetry::metrics::status::GaugeExt as _,
64};
65use commonware_utils::{Acknowledgement as _, vec::NonEmptyVec};
66use eyre::{ensure, eyre};
67use futures::{StreamExt as _, channel::mpsc};
68use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
69use rand_08::{CryptoRng, Rng};
70use tracing::{Level, Span, debug, error, error_span, info, instrument, warn, warn_span};
71
72use crate::{
73 consensus::Digest,
74 epoch::manager::ingress::{EpochTransition, Exit},
75};
76
77use super::ingress::{Content, Message};
78
79const REPLAY_BUFFER: NonZeroUsize = NonZeroUsize::new(8 * 1024 * 1024).expect("value is not zero"); const WRITE_BUFFER: NonZeroUsize = NonZeroUsize::new(1024 * 1024).expect("value is not zero"); pub(crate) struct Actor<TContext, TBlocker> {
83 active_epochs: BTreeMap<Epoch, (Handle<()>, ContextCell<TContext>)>,
84 config: super::Config<TBlocker>,
85 context: ContextCell<TContext>,
86 confirmed_latest_network_epoch: Option<Epoch>,
87 mailbox: mpsc::UnboundedReceiver<Message>,
88 metrics: Metrics,
89}
90
91impl<TContext, TBlocker> Actor<TContext, TBlocker>
92where
93 TBlocker: Blocker<PublicKey = PublicKey>,
94 TContext: BufferPooler
96 + Spawner
97 + commonware_runtime::Metrics
98 + Rng
99 + CryptoRng
100 + Clock
101 + governor::clock::Clock
102 + Storage
103 + Network,
104{
105 pub(super) fn new(
106 config: super::Config<TBlocker>,
107 context: TContext,
108 mailbox: mpsc::UnboundedReceiver<Message>,
109 ) -> Self {
110 let active_epochs = Gauge::default();
111 let latest_epoch = Gauge::default();
112 let latest_participants = Gauge::default();
113 let how_often_signer = Counter::default();
114 let how_often_verifier = Counter::default();
115
116 context.register(
117 "active_epochs",
118 "the number of epochs currently managed by the epoch manager",
119 active_epochs.clone(),
120 );
121 context.register(
122 "latest_epoch",
123 "the latest epoch managed by this epoch manager",
124 latest_epoch.clone(),
125 );
126 context.register(
127 "latest_participants",
128 "the number of participants in the most recently started epoch",
129 latest_participants.clone(),
130 );
131 context.register(
132 "how_often_signer",
133 "how often a node is a signer; a node is a signer if it has a share",
134 how_often_signer.clone(),
135 );
136 context.register(
137 "how_often_verifier",
138 "how often a node is a verifier; a node is a verifier if it does not have a share",
139 how_often_verifier.clone(),
140 );
141
142 Self {
143 config,
144 context: ContextCell::new(context),
145 mailbox,
146 metrics: Metrics {
147 active_epochs,
148 latest_epoch,
149 latest_participants,
150 how_often_signer,
151 how_often_verifier,
152 },
153 active_epochs: BTreeMap::new(),
154 confirmed_latest_network_epoch: None,
155 }
156 }
157
158 pub(crate) fn start(
159 mut self,
160 votes: (
161 impl Sender<PublicKey = PublicKey>,
162 impl Receiver<PublicKey = PublicKey>,
163 ),
164 certificates: (
165 impl Sender<PublicKey = PublicKey>,
166 impl Receiver<PublicKey = PublicKey>,
167 ),
168 resolver: (
169 impl Sender<PublicKey = PublicKey>,
170 impl Receiver<PublicKey = PublicKey>,
171 ),
172 ) -> Handle<()> {
173 spawn_cell!(self.context, self.run(votes, certificates, resolver))
174 }
175
176 async fn run(
177 mut self,
178 (vote_sender, vote_receiver): (
179 impl Sender<PublicKey = PublicKey>,
180 impl Receiver<PublicKey = PublicKey>,
181 ),
182 (certificate_sender, certificate_receiver): (
183 impl Sender<PublicKey = PublicKey>,
184 impl Receiver<PublicKey = PublicKey>,
185 ),
186 (resolver_sender, resolver_receiver): (
187 impl Sender<PublicKey = PublicKey>,
188 impl Receiver<PublicKey = PublicKey>,
189 ),
190 ) {
191 let (mux, mut vote_mux, mut vote_backup) = Muxer::builder(
192 self.context.with_label("vote_mux"),
193 vote_sender,
194 vote_receiver,
195 self.config.mailbox_size,
196 )
197 .with_backup()
198 .build();
199 mux.start();
200
201 let (mux, mut certificate_mux) = Muxer::builder(
202 self.context.with_label("certificate_mux"),
203 certificate_sender,
204 certificate_receiver,
205 self.config.mailbox_size,
206 )
207 .build();
208 mux.start();
209
210 let (mux, mut resolver_mux) = Muxer::new(
211 self.context.with_label("resolver_mux"),
212 resolver_sender,
213 resolver_receiver,
214 self.config.mailbox_size,
215 );
216 mux.start();
217
218 loop {
219 select!(
220 message = vote_backup.recv() => {
221 let Some((their_epoch, (from, _))) = message else {
222 error_span!("mux channel closed").in_scope(||
223 error!("vote p2p mux channel closed; exiting actor")
224 );
225 break;
226 };
227 self.handle_msg_for_unregistered_epoch(
228 Epoch::new(their_epoch),
229 from,
230 ).await;
231 },
232
233 msg = self.mailbox.next() => {
234 let Some(msg) = msg else {
235 warn_span!("mailboxes dropped").in_scope(||
236 warn!("all mailboxes dropped; exiting actor"
237 ));
238 break;
239 };
240 let cause = msg.cause;
241 match msg.content {
242 Content::Enter(enter) => {
243 let _: Result<_, _> = self
244 .enter(
245 cause,
246 enter,
247 &mut vote_mux,
248 &mut certificate_mux,
249 &mut resolver_mux,
250 )
251 .await;
252 }
253 Content::Exit(exit) => self.exit(cause, exit),
254 Content::Update(update) => {
255 match *update {
256 Update::Tip(_, height, digest) => {
257 let _ = self.handle_finalized_tip(height, digest).await;
258 }
259 Update::Block(_block, ack) => {
260 ack.acknowledge();
261 }
262 }
263 }
264 }
265 },
266 )
267 }
268 }
269
270 #[instrument(
271 parent = &cause,
272 skip_all,
273 fields(
274 %epoch,
275 network_identity = %public.public(),
276 ?participants,
277 ),
278 err(level = Level::WARN)
279 )]
280 async fn enter(
281 &mut self,
282 cause: Span,
283 EpochTransition {
284 epoch,
285 public,
286 share,
287 participants,
288 }: EpochTransition,
289 vote_mux: &mut MuxHandle<
290 impl Sender<PublicKey = PublicKey>,
291 impl Receiver<PublicKey = PublicKey>,
292 >,
293 certificates_mux: &mut MuxHandle<
294 impl Sender<PublicKey = PublicKey>,
295 impl Receiver<PublicKey = PublicKey>,
296 >,
297 resolver_mux: &mut MuxHandle<
298 impl Sender<PublicKey = PublicKey>,
299 impl Receiver<PublicKey = PublicKey>,
300 >,
301 ) -> eyre::Result<()> {
302 if let Some(latest) = self.active_epochs.last_key_value().map(|(k, _)| *k) {
303 ensure!(
304 epoch > latest,
305 "requested to start an epoch `{epoch}` older than the latest \
306 running, `{latest}`; refusing",
307 );
308 }
309
310 let n_participants = participants.len();
311 let is_signer = matches!(share, Some(..));
313 let scheme = if let Some(share) = share {
314 info!("we have a share for this epoch, participating as a signer",);
315 Scheme::signer(crate::config::NAMESPACE, participants, public, share)
316 .expect("our private share must match our slice of the public key")
317 } else {
318 info!("we don't have a share for this epoch, participating as a verifier",);
319 Scheme::verifier(crate::config::NAMESPACE, participants, public)
320 };
321 self.config.scheme_provider.register(epoch, scheme.clone());
322
323 let engine_ctx = self
326 .context
327 .with_label("simplex")
328 .with_attribute("epoch", epoch)
329 .with_scope();
330
331 let engine = simplex::Engine::new(
332 engine_ctx.clone(),
333 simplex::Config {
334 scheme,
335 elector: elector::Random,
336 blocker: self.config.blocker.clone(),
337 automaton: self.config.application.clone(),
338 relay: self.config.application.clone(),
339 reporter: Reporters::<_, crate::subblocks::Mailbox, _>::from((
340 self.config.subblocks.clone(),
341 Reporters::from((self.config.marshal.clone(), self.config.feed.clone())),
342 )),
343 partition: format!(
344 "{partition_prefix}_consensus_epoch_{epoch}",
345 partition_prefix = self.config.partition_prefix
346 ),
347 mailbox_size: self.config.mailbox_size,
348 epoch,
349
350 replay_buffer: REPLAY_BUFFER,
351 write_buffer: WRITE_BUFFER,
352 page_cache: self.config.page_cache.clone(),
353
354 leader_timeout: self.config.time_to_propose,
355 certification_timeout: self.config.time_to_collect_notarizations,
356 timeout_retry: self.config.time_to_retry_nullify_broadcast,
357 fetch_timeout: self.config.time_for_peer_response,
358 activity_timeout: self.config.views_to_track,
359 skip_timeout: self.config.views_until_leader_skip,
360
361 fetch_concurrent: crate::config::NUMBER_CONCURRENT_FETCHES,
362
363 forwarding: commonware_consensus::simplex::config::ForwardingPolicy::Disabled,
364
365 strategy: Sequential,
366 },
367 );
368
369 let vote = vote_mux.register(epoch.get()).await.unwrap();
370 let certificate = certificates_mux.register(epoch.get()).await.unwrap();
371 let resolver = resolver_mux.register(epoch.get()).await.unwrap();
372
373 assert!(
374 self.active_epochs
375 .insert(
376 epoch,
377 (engine.start(vote, certificate, resolver), engine_ctx)
378 )
379 .is_none(),
380 "there must be no other active engine running: this was ensured at \
381 the beginning of this method",
382 );
383
384 let latest = self.confirmed_latest_network_epoch.get_or_insert(epoch);
385 *latest = (*latest).max(epoch);
386
387 info!("started consensus engine backing the epoch");
388
389 self.metrics.latest_participants.set(n_participants as i64);
390 self.metrics.active_epochs.inc();
391 let _ = self.metrics.latest_epoch.try_set(epoch.get());
392 self.metrics.how_often_signer.inc_by(u64::from(is_signer));
393 self.metrics
394 .how_often_verifier
395 .inc_by(u64::from(!is_signer));
396
397 Ok(())
398 }
399
400 #[instrument(parent = &cause, skip_all, fields(epoch))]
401 fn exit(&mut self, cause: Span, Exit { epoch }: Exit) {
402 if let Some((engine, engine_ctx)) = self.active_epochs.remove(&epoch) {
403 drop(engine_ctx);
404 engine.abort();
405 info!("stopped engine backing epoch");
406 } else {
407 warn!(
408 "attempted to exit unknown epoch, but epoch was not backed by \
409 an active engine",
410 );
411 }
412
413 if let Some(to_delete) = epoch.checked_sub(EpochDelta::new(2))
423 && !self.config.scheme_provider.delete(&to_delete)
424 {
425 debug!(
426 to_exit = %epoch,
427 %to_delete,
428 "attempted to delete scheme for epoch, but epoch had no scheme \
429 registered"
430 );
431 }
432 }
433
434 #[instrument(
435 skip_all,
436 fields(%height, epoch = tracing::field::Empty),
437 err,
438 )]
439 async fn handle_finalized_tip(&mut self, height: Height, digest: Digest) -> eyre::Result<()> {
440 let epoch_info = self
441 .config
442 .epoch_strategy
443 .containing(height)
444 .expect("epoch strategy is valid for all epochs and heights");
445 Span::current().record("epoch", tracing::field::display(epoch_info.epoch()));
446
447 {
448 let network_epoch = self
449 .confirmed_latest_network_epoch
450 .get_or_insert(epoch_info.epoch());
451 *network_epoch = (*network_epoch).max(epoch_info.epoch());
452 }
453
454 if epoch_info.last() == height {
466 info!(
467 "the finalized tip is a boundary block; requesting the \
468 block to set the scheme for its epoch"
469 );
470 let block = self
471 .config
472 .marshal
473 .subscribe_by_digest(None, digest)
474 .await
475 .await
476 .map_err(|_| eyre!("marshal never returned the block"))?;
477 let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
478 &mut block.header().extra_data().as_ref(),
479 )
480 .expect("boundary blocks must contain DKG outcomes");
481 self.config.scheme_provider.register(
482 onchain_outcome.epoch,
483 Scheme::verifier(
484 crate::config::NAMESPACE,
485 onchain_outcome.players().clone(),
486 onchain_outcome.sharing().clone(),
487 ),
488 );
489 self.confirmed_latest_network_epoch
490 .replace(onchain_outcome.epoch);
491 debug!(
492 next_epoch = %onchain_outcome.epoch,
493 "read DKG outcome from boundary and registered scheme",
494 );
495 }
496 Ok(())
497 }
498
499 #[instrument(
507 skip_all,
508 fields(msg.epoch = %their_epoch, msg.from = %from),
509 )]
510 async fn handle_msg_for_unregistered_epoch(&mut self, their_epoch: Epoch, from: PublicKey) {
511 let reference_epoch = match (
512 self.active_epochs.keys().last().copied(),
513 self.confirmed_latest_network_epoch,
514 ) {
515 (Some(our), None) => our,
516 (Some(our), Some(confirmed_finalized)) => our.max(confirmed_finalized),
517 (None, Some(confirmed_finalized)) => confirmed_finalized,
518 (None, None) => {
519 debug!(
520 "received message for unregistered epoch, but we are \
521 neither running a consensus engine backing an epoch, nor \
522 do we know what the latest finalized epoch is; there is \
523 nothing to do",
524 );
525 return;
526 }
527 };
528
529 if reference_epoch >= their_epoch {
530 return;
531 }
532
533 let boundary_height = self
534 .config
535 .epoch_strategy
536 .last(reference_epoch)
537 .expect("our epoch strategy should cover all epochs");
538
539 tracing::debug!(
540 %reference_epoch,
541 %boundary_height,
542 "hinting to sync system that a finalization certificate might be \
543 available for our reference epoch",
544 );
545 self.config
546 .marshal
547 .hint_finalized(boundary_height, NonEmptyVec::new(from))
548 .await;
549 }
550}
551
552struct Metrics {
553 active_epochs: Gauge,
554 latest_epoch: Gauge,
555 latest_participants: Gauge,
556 how_often_signer: Counter,
557 how_often_verifier: Counter,
558}