1use std::{collections::BTreeMap, num::NonZeroUsize};
45
46use alloy_consensus::BlockHeader as _;
47use commonware_codec::ReadExt as _;
48use commonware_consensus::{
49 Reporters,
50 marshal::Update,
51 simplex::{self, elector, scheme::bls12381_threshold::vrf::Scheme},
52 types::{Epoch, Epocher as _, Height},
53};
54use commonware_cryptography::ed25519::PublicKey;
55use commonware_macros::select;
56use commonware_p2p::{
57 Blocker, Receiver, Sender,
58 utils::mux::{Builder as _, MuxHandle, Muxer},
59};
60use commonware_parallel::Sequential;
61use commonware_runtime::{
62 BufferPooler, Clock, ContextCell, Handle, Metrics as _, Network, Spawner, Storage, spawn_cell,
63 telemetry::metrics::status::GaugeExt as _,
64};
65use commonware_utils::{Acknowledgement as _, vec::NonEmptyVec};
66use eyre::{ensure, eyre};
67use futures::{StreamExt as _, channel::mpsc};
68use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
69use rand_08::{CryptoRng, Rng};
70use tracing::{Level, Span, debug, error, error_span, info, instrument, warn, warn_span};
71
72use crate::{
73 consensus::Digest,
74 epoch::manager::ingress::{EpochTransition, Exit},
75};
76
77use super::ingress::{Content, Message};
78
79const REPLAY_BUFFER: NonZeroUsize = NonZeroUsize::new(8 * 1024 * 1024).expect("value is not zero"); const WRITE_BUFFER: NonZeroUsize = NonZeroUsize::new(1024 * 1024).expect("value is not zero"); pub(crate) struct Actor<TContext, TBlocker> {
83 active_epochs: BTreeMap<Epoch, (Handle<()>, ContextCell<TContext>)>,
84 config: super::Config<TBlocker>,
85 context: ContextCell<TContext>,
86 confirmed_latest_network_epoch: Option<Epoch>,
87 mailbox: mpsc::UnboundedReceiver<Message>,
88 metrics: Metrics,
89}
90
91impl<TContext, TBlocker> Actor<TContext, TBlocker>
92where
93 TBlocker: Blocker<PublicKey = PublicKey>,
94 TContext: BufferPooler
96 + Spawner
97 + commonware_runtime::Metrics
98 + Rng
99 + CryptoRng
100 + Clock
101 + governor::clock::Clock
102 + Storage
103 + Network,
104{
105 pub(super) fn new(
106 config: super::Config<TBlocker>,
107 context: TContext,
108 mailbox: mpsc::UnboundedReceiver<Message>,
109 ) -> Self {
110 let active_epochs = Gauge::default();
111 let latest_epoch = Gauge::default();
112 let latest_participants = Gauge::default();
113 let how_often_signer = Counter::default();
114 let how_often_verifier = Counter::default();
115
116 context.register(
117 "active_epochs",
118 "the number of epochs currently managed by the epoch manager",
119 active_epochs.clone(),
120 );
121 context.register(
122 "latest_epoch",
123 "the latest epoch managed by this epoch manager",
124 latest_epoch.clone(),
125 );
126 context.register(
127 "latest_participants",
128 "the number of participants in the most recently started epoch",
129 latest_participants.clone(),
130 );
131 context.register(
132 "how_often_signer",
133 "how often a node is a signer; a node is a signer if it has a share",
134 how_often_signer.clone(),
135 );
136 context.register(
137 "how_often_verifier",
138 "how often a node is a verifier; a node is a verifier if it does not have a share",
139 how_often_verifier.clone(),
140 );
141
142 Self {
143 config,
144 context: ContextCell::new(context),
145 mailbox,
146 metrics: Metrics {
147 active_epochs,
148 latest_epoch,
149 latest_participants,
150 how_often_signer,
151 how_often_verifier,
152 },
153 active_epochs: BTreeMap::new(),
154 confirmed_latest_network_epoch: None,
155 }
156 }
157
    /// Spawns the actor's event loop on its own task, handing over the
    /// three p2p channel pairs (votes, certificates, resolver). Returns
    /// the task handle.
    pub(crate) fn start(
        mut self,
        votes: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        certificates: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(votes, certificates, resolver).await)
    }
175
    /// Event loop of the actor.
    ///
    /// Wires each p2p channel pair through a muxer so every epoch's
    /// engine gets its own sub-channel, then services both unroutable
    /// vote traffic and the actor mailbox until either stream closes.
    async fn run(
        mut self,
        (vote_sender, vote_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        (certificate_sender, certificate_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        (resolver_sender, resolver_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        // The vote mux is built with a backup channel: messages arriving
        // for epochs without a registered sub-channel surface on
        // `vote_backup` instead of being dropped.
        let (mux, mut vote_mux, mut vote_backup) = Muxer::builder(
            self.context.with_label("vote_mux"),
            vote_sender,
            vote_receiver,
            self.config.mailbox_size,
        )
        .with_backup()
        .build();
        mux.start();

        // Certificates and resolver traffic have no backup channel;
        // messages for unregistered epochs on these muxes are not handled
        // here.
        let (mux, mut certificate_mux) = Muxer::builder(
            self.context.with_label("certificate_mux"),
            certificate_sender,
            certificate_receiver,
            self.config.mailbox_size,
        )
        .build();
        mux.start();

        let (mux, mut resolver_mux) = Muxer::new(
            self.context.with_label("resolver_mux"),
            resolver_sender,
            resolver_receiver,
            self.config.mailbox_size,
        );
        mux.start();

        loop {
            select!(
                // A vote arrived for an epoch no engine is registered
                // for: use it as a hint that we may be behind.
                message = vote_backup.recv() => {
                    let Some((their_epoch, (from, _))) = message else {
                        error_span!("mux channel closed").in_scope(||
                            error!("vote p2p mux channel closed; exiting actor")
                        );
                        break;
                    };
                    self.handle_msg_for_unregistered_epoch(
                        Epoch::new(their_epoch),
                        from,
                    ).await;
                },

                // Commands from the epoch manager's mailbox.
                msg = self.mailbox.next() => {
                    let Some(msg) = msg else {
                        warn_span!("mailboxes dropped").in_scope(||
                            warn!("all mailboxes dropped; exiting actor"
                        ));
                        break;
                    };
                    let cause = msg.cause;
                    match msg.content {
                        // Start an engine for a new epoch; failures are
                        // logged via `enter`'s instrument `err` and
                        // otherwise ignored here.
                        Content::Enter(enter) => {
                            let _: Result<_, _> = self
                                .enter(
                                    cause,
                                    enter,
                                    &mut vote_mux,
                                    &mut certificate_mux,
                                    &mut resolver_mux,
                                )
                                .await;
                        }
                        Content::Exit(exit) => self.exit(cause, exit),
                        Content::Update(update) => {
                            match *update {
                                Update::Tip(_, height, digest) => {
                                    let _ = self.handle_finalized_tip(height, digest).await;
                                }
                                // Block payloads are acknowledged but not
                                // otherwise used by this actor.
                                Update::Block(_block, ack) => {
                                    ack.acknowledge();
                                }
                            }
                        }
                    }
                },
            )
        }
    }
269
    /// Starts a simplex consensus engine for `epoch` and registers its
    /// sub-channels on the three p2p muxers.
    ///
    /// Returns an error (logged at WARN via `instrument`) if `epoch` is
    /// not strictly newer than the latest epoch already running.
    #[instrument(
        parent = &cause,
        skip_all,
        fields(
            %epoch,
            network_identity = %public.public(),
            ?participants,
        ),
        err(level = Level::WARN)
    )]
    async fn enter(
        &mut self,
        cause: Span,
        EpochTransition {
            epoch,
            public,
            share,
            participants,
        }: EpochTransition,
        vote_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
        certificates_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
        resolver_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
    ) -> eyre::Result<()> {
        // Epochs must be entered in strictly increasing order.
        if let Some(latest) = self.active_epochs.last_key_value().map(|(k, _)| *k) {
            ensure!(
                epoch > latest,
                "requested to start an epoch `{epoch}` older than the latest \
                running, `{latest}`; refusing",
            );
        }

        let n_participants = participants.len();
        // Holding a private share makes this node a signer; without one it
        // participates as a verifier only.
        let is_signer = matches!(share, Some(..));
        let scheme = if let Some(share) = share {
            info!("we have a share for this epoch, participating as a signer",);
            Scheme::signer(crate::config::NAMESPACE, participants, public, share)
                .expect("our private share must match our slice of the public key")
        } else {
            info!("we don't have a share for this epoch, participating as a verifier",);
            Scheme::verifier(crate::config::NAMESPACE, participants, public)
        };
        // Publish the scheme so other components can look it up by epoch.
        self.config.scheme_provider.register(epoch, scheme.clone());

        // Dedicated, epoch-labelled context so the engine's telemetry and
        // lifetime are scoped to this epoch.
        let engine_ctx = self
            .context
            .with_label("simplex")
            .with_attribute("epoch", epoch)
            .with_scope();

        let engine = simplex::Engine::new(
            engine_ctx.clone(),
            simplex::Config {
                scheme,
                elector: elector::Random,
                blocker: self.config.blocker.clone(),
                automaton: self.config.application.clone(),
                relay: self.config.application.clone(),
                // Fan consensus reports out to subblocks, marshal, and feed.
                reporter: Reporters::<_, crate::subblocks::Mailbox, _>::from((
                    self.config.subblocks.clone(),
                    Reporters::from((self.config.marshal.clone(), self.config.feed.clone())),
                )),
                // Per-epoch storage partition so engines never share state.
                partition: format!(
                    "{partition_prefix}_consensus_epoch_{epoch}",
                    partition_prefix = self.config.partition_prefix
                ),
                mailbox_size: self.config.mailbox_size,
                epoch,

                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                page_cache: self.config.page_cache.clone(),

                leader_timeout: self.config.time_to_propose,
                certification_timeout: self.config.time_to_collect_notarizations,
                timeout_retry: self.config.time_to_retry_nullify_broadcast,
                fetch_timeout: self.config.time_for_peer_response,
                activity_timeout: self.config.views_to_track,
                skip_timeout: self.config.views_until_leader_skip,

                fetch_concurrent: crate::config::NUMBER_CONCURRENT_FETCHES,

                strategy: Sequential,
            },
        );

        // Register this epoch's sub-channel on each p2p mux.
        let vote = vote_mux.register(epoch.get()).await.unwrap();
        let certificate = certificates_mux.register(epoch.get()).await.unwrap();
        let resolver = resolver_mux.register(epoch.get()).await.unwrap();

        assert!(
            self.active_epochs
                .insert(
                    epoch,
                    (engine.start(vote, certificate, resolver), engine_ctx)
                )
                .is_none(),
            "there must be no other active engine running: this was ensured at \
            the beginning of this method",
        );

        // Ratchet the newest network epoch we are aware of.
        let latest = self.confirmed_latest_network_epoch.get_or_insert(epoch);
        *latest = (*latest).max(epoch);

        info!("started consensus engine backing the epoch");

        self.metrics.latest_participants.set(n_participants as i64);
        self.metrics.active_epochs.inc();
        let _ = self.metrics.latest_epoch.try_set(epoch.get());
        self.metrics.how_often_signer.inc_by(is_signer as u64);
        self.metrics.how_often_verifier.inc_by(!is_signer as u64);

        Ok(())
    }
395
396 #[instrument(parent = &cause, skip_all, fields(epoch))]
397 fn exit(&mut self, cause: Span, Exit { epoch }: Exit) {
398 if let Some((engine, engine_ctx)) = self.active_epochs.remove(&epoch) {
399 drop(engine_ctx);
400 engine.abort();
401 info!("stopped engine backing epoch");
402 } else {
403 warn!(
404 "attempted to exit unknown epoch, but epoch was not backed by \
405 an active engine",
406 );
407 }
408
409 if !self.config.scheme_provider.delete(&epoch) {
410 warn!(
411 "attempted to delete scheme for epoch, but epoch had no scheme \
412 registered"
413 );
414 }
415 }
416
    /// Processes a newly finalized tip reported by the marshal.
    ///
    /// Ratchets the confirmed latest network epoch implied by `height`,
    /// and when the tip is its epoch's boundary block, fetches it to read
    /// the on-chain DKG outcome and register a verifier scheme for the
    /// outcome's epoch.
    #[instrument(
        skip_all,
        fields(%height, epoch = tracing::field::Empty),
        err,
    )]
    async fn handle_finalized_tip(&mut self, height: Height, digest: Digest) -> eyre::Result<()> {
        let epoch_info = self
            .config
            .epoch_strategy
            .containing(height)
            .expect("epoch strategy is valid for all epochs and heights");
        // Fill in the `epoch` span field now that it is known.
        Span::current().record("epoch", tracing::field::display(epoch_info.epoch()));

        {
            // Monotonically ratchet the confirmed latest network epoch.
            let network_epoch = self
                .confirmed_latest_network_epoch
                .get_or_insert(epoch_info.epoch());
            *network_epoch = (*network_epoch).max(epoch_info.epoch());
        }

        if epoch_info.last() == height {
            info!(
                "the finalized tip is a boundary block; requesting the \
                block to set the scheme for its epoch"
            );
            // `subscribe_by_digest` yields a future resolving to another
            // future, hence the double `.await` — presumably register the
            // subscription first, then wait for delivery (TODO confirm).
            let block = self
                .config
                .marshal
                .subscribe_by_digest(None, digest)
                .await
                .await
                .map_err(|_| eyre!("marshal never returned the block"))?;
            // The DKG outcome is decoded from the block header's
            // extra-data bytes.
            let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
                &mut block.header().extra_data().as_ref(),
            )
            .expect("boundary blocks must contain DKG outcomes");
            // Register a verifier-only scheme built from the public
            // outcome (no private share is available here).
            self.config.scheme_provider.register(
                onchain_outcome.epoch,
                Scheme::verifier(
                    crate::config::NAMESPACE,
                    onchain_outcome.players().clone(),
                    onchain_outcome.sharing().clone(),
                ),
            );
            // NOTE(review): unconditional replace, unlike the max-ratchet
            // above — assumes the outcome's epoch is never behind the
            // current value; confirm.
            self.confirmed_latest_network_epoch
                .replace(onchain_outcome.epoch);
            debug!(
                next_epoch = %onchain_outcome.epoch,
                "read DKG outcome from boundary and registered scheme",
            );
        }
        Ok(())
    }
481
482 #[instrument(
490 skip_all,
491 fields(msg.epoch = %their_epoch, msg.from = %from),
492 )]
493 async fn handle_msg_for_unregistered_epoch(&mut self, their_epoch: Epoch, from: PublicKey) {
494 let reference_epoch = match (
495 self.active_epochs.keys().last().copied(),
496 self.confirmed_latest_network_epoch,
497 ) {
498 (Some(our), None) => our,
499 (Some(our), Some(confirmed_finalized)) => our.max(confirmed_finalized),
500 (None, Some(confirmed_finalized)) => confirmed_finalized,
501 (None, None) => {
502 debug!(
503 "received message for unregistered epoch, but we are \
504 neither running a consensus engine backing an epoch, nor \
505 do we know what the latest finalized epoch is; there is \
506 nothing to do",
507 );
508 return;
509 }
510 };
511
512 if reference_epoch >= their_epoch {
513 return;
514 }
515
516 let boundary_height = self
517 .config
518 .epoch_strategy
519 .last(reference_epoch)
520 .expect("our epoch strategy should cover all epochs");
521
522 tracing::debug!(
523 %reference_epoch,
524 %boundary_height,
525 "hinting to sync system that a finalization certificate might be \
526 available for our reference epoch",
527 );
528 self.config
529 .marshal
530 .hint_finalized(boundary_height, NonEmptyVec::new(from))
531 .await;
532 }
533}
534
/// Prometheus metrics exposed by the epoch manager actor.
struct Metrics {
    /// Number of epochs with a currently running engine.
    active_epochs: Gauge,
    /// Latest epoch this manager has started or observed.
    latest_epoch: Gauge,
    /// Participant count of the most recently started epoch.
    latest_participants: Gauge,
    /// Times this node entered an epoch as a signer (held a share).
    how_often_signer: Counter,
    /// Times this node entered an epoch as a verifier (no share).
    how_often_verifier: Counter,
}