1use std::{collections::BTreeMap, num::NonZeroUsize};
48
49use bytes::Bytes;
50use commonware_codec::{DecodeExt as _, Encode as _, varint::UInt};
51use commonware_consensus::{
52 Reporters,
53 simplex::{self, signing_scheme::bls12381_threshold::Scheme, types::Voter},
54 types::Epoch,
55 utils,
56};
57use commonware_cryptography::{bls12381::primitives::variant::MinSig, ed25519::PublicKey};
58use commonware_macros::select;
59use commonware_p2p::{
60 Blocker, Receiver, Recipients, Sender,
61 utils::mux::{Builder as _, GlobalSender, MuxHandle, Muxer},
62};
63use commonware_runtime::{
64 Clock, ContextCell, Handle, Metrics as _, Network, Spawner, Storage, spawn_cell,
65};
66use eyre::{WrapErr as _, ensure, eyre};
67use futures::{StreamExt as _, channel::mpsc};
68use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
69use rand::{CryptoRng, Rng};
70use tracing::{Level, Span, error, error_span, info, instrument, warn, warn_span};
71
72use crate::{
73 consensus::Digest,
74 epoch::manager::ingress::{Enter, Exit},
75};
76
77use super::ingress::Message;
78
79const REPLAY_BUFFER: NonZeroUsize = NonZeroUsize::new(8 * 1024 * 1024).expect("value is not zero"); const WRITE_BUFFER: NonZeroUsize = NonZeroUsize::new(1024 * 1024).expect("value is not zero"); pub(crate) struct Actor<TBlocker, TContext> {
83 active_epochs: BTreeMap<Epoch, Handle<()>>,
84 config: super::Config<TBlocker>,
85 context: ContextCell<TContext>,
86 mailbox: mpsc::UnboundedReceiver<Message>,
87 metrics: Metrics,
88}
89
90impl<TBlocker, TContext> Actor<TBlocker, TContext>
91where
92 TBlocker: Blocker<PublicKey = PublicKey>,
93 TContext: Spawner
95 + commonware_runtime::Metrics
96 + Rng
97 + CryptoRng
98 + Clock
99 + governor::clock::Clock
100 + Storage
101 + Network,
102{
103 pub(super) fn new(
104 config: super::Config<TBlocker>,
105 context: TContext,
106 mailbox: mpsc::UnboundedReceiver<Message>,
107 ) -> Self {
108 let active_epochs = Gauge::default();
109 let latest_epoch = Gauge::default();
110 let latest_participants = Gauge::default();
111 let how_often_signer = Counter::default();
112 let how_often_verifier = Counter::default();
113
114 context.register(
115 "active_epochs",
116 "the number of epochs currently managed by the epoch manager",
117 active_epochs.clone(),
118 );
119 context.register(
120 "latest_epoch",
121 "the latest epoch managed by this epoch manager",
122 latest_epoch.clone(),
123 );
124 context.register(
125 "latest_participants",
126 "the number of participants in the most recently started epoch",
127 latest_participants.clone(),
128 );
129 context.register(
130 "how_often_signer",
131 "how often a node is a signer; a node is a signer if it has a share",
132 how_often_signer.clone(),
133 );
134 context.register(
135 "how_often_verifier",
136 "how often a node is a verifier; a node is a verifier if it does not have a share",
137 how_often_verifier.clone(),
138 );
139
140 Self {
141 config,
142 context: ContextCell::new(context),
143 mailbox,
144 metrics: Metrics {
145 active_epochs,
146 latest_epoch,
147 latest_participants,
148 how_often_signer,
149 how_often_verifier,
150 },
151 active_epochs: BTreeMap::new(),
152 }
153 }
154
    /// Spawns the actor onto its runtime context and returns the task handle.
    ///
    /// Each argument is the `(sender, receiver)` pair of one p2p channel the
    /// actor multiplexes per epoch; they are forwarded untouched to
    /// [`Self::run`].
    pub(crate) fn start(
        mut self,
        pending: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        recovered: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        boundary_certificates: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(
            self.context,
            self.run(pending, recovered, resolver, boundary_certificates)
                .await
        )
    }
180
    /// The actor's main event loop.
    ///
    /// Wraps three of the channel pairs in muxers so per-epoch engines can
    /// register their own subchannels, then loops over three event sources
    /// until any of them closes:
    /// * the pending muxer's backup stream, which yields messages addressed
    ///   to epochs with no registered subchannel;
    /// * the boundary-certificates channel, carrying peers' requests for
    ///   finalization certificates;
    /// * the actor's mailbox, carrying `Enter`/`Exit` activity.
    async fn run(
        mut self,
        (pending_sender, pending_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        (recovered_sender, recovered_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        (resolver_sender, resolver_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        (mut boundary_certificates_sender, mut boundary_certificates_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        // Pending mux: `with_backup` additionally yields messages for epochs
        // that have no registered subchannel; we use those to detect peers
        // running ahead of us.
        let (mux, mut pending_mux, mut pending_backup) = Muxer::builder(
            self.context.with_label("pending_mux"),
            pending_sender,
            pending_receiver,
            self.config.mailbox_size,
        )
        .with_backup()
        .build();
        mux.start();

        // Recovered mux: `with_global_sender` lets us send on any epoch's
        // subchannel, which is needed to serve certificate requests for
        // arbitrary epochs.
        let (mux, mut recovered_mux, mut recovered_global_sender) = Muxer::builder(
            self.context.with_label("recovered_mux"),
            recovered_sender,
            recovered_receiver,
            self.config.mailbox_size,
        )
        .with_global_sender()
        .build();
        mux.start();

        // Resolver mux: plain per-epoch multiplexing, no extras needed.
        let (mux, mut resolver_mux) = Muxer::new(
            self.context.with_label("resolver_mux"),
            resolver_sender,
            resolver_receiver,
            self.config.mailbox_size,
        );
        mux.start();

        loop {
            select!(
                // A message for an epoch we have no engine for.
                message = pending_backup.next() => {
                    let Some((their_epoch, (from, _))) = message else {
                        error_span!("mux channel closed").in_scope(||
                            error!("pending p2p mux channel closed; exiting actor"
                        ));
                        break;
                    };
                    // Errors are logged via the handler's `err(...)`
                    // instrument attribute, so the result is ignored here.
                    let _: Result<_, _> = self.handle_msg_for_unregistered_epoch(
                        &mut boundary_certificates_sender,
                        their_epoch,
                        from,
                    ).await;
                },

                // A peer asked for the finalization certificate that closes
                // some epoch.
                message = boundary_certificates_receiver.recv() => {
                    let (from, payload) = match message {
                        Err(error) => {
                            error_span!("epoch channel closed").in_scope(||
                                error!(
                                    error = %eyre::Report::new(error),
                                    "epoch p2p channel closed; exiting actor",
                            ));
                            break;
                        }
                        Ok(msg) => msg,
                    };
                    let _: Result<_, _> = self.handle_boundary_certificate_request(
                        from,
                        payload,
                        &mut recovered_global_sender)
                    .await;
                },

                // Epoch lifecycle commands from the epoch manager.
                msg = self.mailbox.next()=> {
                    let Some(msg) = msg else {
                        warn_span!("mailboxes dropped").in_scope(||
                            warn!("all mailboxes dropped; exiting actor"
                        ));
                        break;
                    };
                    let cause = msg.cause;
                    match msg.activity {
                        super::ingress::Activity::Enter(enter) => {
                            let _: Result<_, _> = self
                                .enter(
                                    cause,
                                    enter,
                                    &mut pending_mux,
                                    &mut recovered_mux,
                                    &mut resolver_mux,
                                )
                                .await;
                        }
                        super::ingress::Activity::Exit(exit) => self.exit(cause, exit),
                    }
                },
            )
        }
    }
289
    /// Starts a consensus engine backing the entered `epoch`.
    ///
    /// Builds a BLS threshold signing scheme — as a signer when we hold a
    /// `share`, as a verifier otherwise — registers it with the scheme
    /// provider, constructs and starts a `simplex` engine wired to fresh
    /// per-epoch subchannels on the three muxers, and records the engine
    /// handle in `active_epochs`.
    ///
    /// # Errors
    /// Returns an error (and makes no changes) if an engine for `epoch` is
    /// already running.
    ///
    /// # Panics
    /// Panics if a scheme is already registered for `epoch`, if registering
    /// a subchannel on any muxer fails, or if an engine handle was already
    /// stored for `epoch` — all indicate broken invariants.
    #[instrument(
        parent = &cause,
        skip_all,
        fields(
            %epoch,
            ?public,
            ?participants,
        ),
        err(level = Level::WARN)
    )]
    async fn enter(
        &mut self,
        cause: Span,
        Enter {
            epoch,
            public,
            share,
            participants,
        }: Enter,
        pending_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
        recovered_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
        resolver_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
    ) -> eyre::Result<()> {
        ensure!(
            !self.active_epochs.contains_key(&epoch),
            "an engine for the entered epoch is already running; ignoring",
        );

        // Captured before `participants` is moved into the scheme below.
        let n_participants = participants.len();
        let scheme = if let Some(share) = share {
            info!("we have a share for this epoch, participating as a signer",);
            Scheme::new(participants, &public, share)
        } else {
            info!("we don't have a share for this epoch, participating as a verifier",);
            Scheme::verifier(participants, &public)
        };
        assert!(
            self.config.scheme_provider.register(epoch, scheme.clone()),
            "a scheme must never be registered twice",
        );

        // Recorded before `scheme` is moved into the engine config.
        let is_signer = matches!(scheme, Scheme::Signer { .. });

        let engine = simplex::Engine::new(
            self.context.with_label("consensus_engine"),
            simplex::Config {
                scheme,
                blocker: self.config.blocker.clone(),
                automaton: self.config.application.clone(),
                relay: self.config.application.clone(),
                reporter: Reporters::from((
                    self.config.subblocks.clone(),
                    self.config.marshal.clone(),
                )),
                // Per-epoch storage partition so journals never collide.
                partition: format!(
                    "{partition_prefix}_consensus_epoch_{epoch}",
                    partition_prefix = self.config.partition_prefix
                ),
                mailbox_size: self.config.mailbox_size,
                epoch,
                namespace: crate::config::NAMESPACE.to_vec(),

                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                buffer_pool: self.config.buffer_pool.clone(),

                leader_timeout: self.config.time_to_propose,
                notarization_timeout: self.config.time_to_collect_notarizations,
                nullify_retry: self.config.time_to_retry_nullify_broadcast,
                fetch_timeout: self.config.time_for_peer_response,
                activity_timeout: self.config.views_to_track,
                skip_timeout: self.config.views_until_leader_skip,

                fetch_concurrent: crate::config::NUMBER_CONCURRENT_FETCHES,
                fetch_rate_per_peer: crate::config::RESOLVER_LIMIT,
            },
        );

        // Open this epoch's subchannel on each muxer.
        // NOTE(review): registration failures panic via `unwrap` — presumably
        // impossible for a freshly entered epoch; confirm against the muxer's
        // contract.
        let pending_sc = pending_mux.register(epoch).await.unwrap();
        let recovered_sc = recovered_mux.register(epoch).await.unwrap();
        let resolver_sc = resolver_mux.register(epoch).await.unwrap();

        assert!(
            self.active_epochs
                .insert(epoch, engine.start(pending_sc, recovered_sc, resolver_sc))
                .is_none(),
            "there must be no other active engine running: this was ensured at \
             the beginning of this method",
        );

        info!("started consensus engine backing the epoch");

        self.metrics.latest_participants.set(n_participants as i64);
        self.metrics.active_epochs.inc();
        self.metrics.latest_epoch.set(epoch as i64);
        // Exactly one of the two counters advances per entered epoch.
        self.metrics.how_often_signer.inc_by(is_signer as u64);
        self.metrics.how_often_verifier.inc_by(!is_signer as u64);

        Ok(())
    }
400
401 #[instrument(parent = &cause, skip_all, fields(epoch))]
402 fn exit(&mut self, cause: Span, Exit { epoch }: Exit) {
403 if let Some(engine) = self.active_epochs.remove(&epoch) {
404 engine.abort();
405 info!("stopped engine backing epoch");
406 } else {
407 warn!(
408 "attempted to exit unknown epoch, but epoch was not backed by \
409 an active engine",
410 );
411 }
412
413 if !self.config.scheme_provider.delete(&epoch) {
414 warn!(
415 "attempted to delete scheme for epoch, but epoch had no scheme \
416 registered"
417 );
418 }
419 }
420
421 #[instrument(skip_all, fields(msg.epoch = their_epoch, msg.from = %from), err(level = Level::INFO))]
435 async fn handle_msg_for_unregistered_epoch(
436 &mut self,
437 boundary_certificates_sender: &mut impl Sender<PublicKey = PublicKey>,
438 their_epoch: Epoch,
439 from: PublicKey,
440 ) -> eyre::Result<()> {
441 let Some(our_epoch) = self.active_epochs.keys().last().copied() else {
442 return Err(eyre!(
443 "received message over unregistered epoch channel, but we have no active epochs at all"
444 ));
445 };
446 ensure!(
447 their_epoch > our_epoch,
448 "request epoch `{their_epoch}` is in our past, no action is necessary",
449 );
450
451 let boundary_height = utils::last_block_in_epoch(self.config.epoch_length, our_epoch);
452 ensure!(
453 self.config
454 .marshal
455 .get_finalization(boundary_height)
456 .await
457 .is_none(),
458 "finalization certificate for epoch `{our_epoch}` at boundary \
459 height `{boundary_height}` is already known; no action necessary",
460 );
461
462 boundary_certificates_sender
463 .send(
464 Recipients::One(from),
465 UInt(our_epoch).encode().freeze(),
466 true,
467 )
468 .await
469 .wrap_err("failed request for finalization certificate of our epoch")?;
470
471 info!("requested finalization certificate for our epoch");
472
473 Ok(())
474 }
475
    /// Serves a peer's request for the finalization certificate that closes
    /// an epoch.
    ///
    /// The payload is the requested epoch as a varint. If marshal holds the
    /// finalization for that epoch's last block locally, it is forwarded to
    /// the requester over the `recovered` channel's global sender, on the
    /// requested epoch's subchannel.
    ///
    /// # Errors
    /// Fails if the payload does not decode as an epoch, if the finalization
    /// is not available locally, or if sending the response fails.
    #[instrument(skip_all, fields(
        msg.from = %from,
        msg.payload_len = bytes.len(),
        msg.decoded_epoch = tracing::field::Empty,
    ), err(level = Level::WARN))]
    async fn handle_boundary_certificate_request(
        &mut self,
        from: PublicKey,
        bytes: Bytes,
        recovered_global_sender: &mut GlobalSender<impl Sender<PublicKey = PublicKey>>,
    ) -> eyre::Result<()> {
        let requested_epoch = UInt::<Epoch>::decode(bytes.as_ref())
            .wrap_err("failed decoding epoch channel payload as epoch")?
            .into();
        // Backfill the span field now that the epoch is known.
        tracing::Span::current().record("msg.decoded_epoch", requested_epoch);
        let boundary_height = utils::last_block_in_epoch(self.config.epoch_length, requested_epoch);
        let cert = self
            .config
            .marshal
            .get_finalization(boundary_height)
            .await
            .ok_or_else(|| {
                eyre!(
                    "do not have finalization for requested epoch \
                     `{requested_epoch}`, boundary height `{boundary_height}` \
                     available locally; cannot serve request"
                )
            })?;
        // Wrap the certificate in the consensus wire type the engine expects.
        let message = Voter::<Scheme<PublicKey, MinSig>, Digest>::Finalization(cert);
        // `false`: best-effort, not priority traffic.
        recovered_global_sender
            .send(
                requested_epoch,
                Recipients::One(from),
                message.encode().freeze(),
                false,
            )
            .await
            .wrap_err(
                "failed forwarding finalization certificate to requester via `recovered` channel",
            )?;
        Ok(())
    }
518}
519
/// Prometheus metrics registered by [`Actor::new`].
struct Metrics {
    // Number of epochs currently backed by a running consensus engine.
    active_epochs: Gauge,
    // The latest epoch managed by this epoch manager.
    latest_epoch: Gauge,
    // Participant count of the most recently started epoch.
    latest_participants: Gauge,
    // Times this node entered an epoch as a signer (it held a share).
    how_often_signer: Counter,
    // Times this node entered an epoch as a verifier (no share held).
    how_often_verifier: Counter,
}