Skip to main content

tempo_commonware_node/epoch/manager/
actor.rs

1//! Actor implementing the epoch manager logic.
2//!
3//! This actor is responsible for:
4//!
5//! 1. entering and exiting epochs given messages it receives from the DKG
6//!    manager.
//! 2. catching the node up by listening to votes for an unknown epoch and
//!    requesting finalizations for the currently known boundary height.
9//!
10//! # Entering and exiting epochs
11//!
12//! When the actor receives an `Enter` message, it spins up a new simplex
13//! consensus engine backing the epoch stored in the message. The message also
14//! contains the public polynomial, share of the private key for this node,
15//! and the participants in the next epoch - all determined by the DKG ceremony.
16//! The engine receives a subchannel of the vote, certificate, and resolver
17//! p2p channels, multiplexed by the epoch.
18//!
//! When the actor receives an `Exit` message, it exits the engine backing the
//! epoch stored in it.
21//!
22//! # Catching up the node
23//!
24//! The actor makes use of the backup mechanism exposed by the subchannel
25//! multiplexer API: assume the actor has a simplex engine running for epoch 0,
26//! then this engine will have a subchannel registered on the multiplexer for
27//! epoch 0.
28//!
29//! If the actor now receives a vote in epoch 5 over its vote mux backup
30//! channel (since there are no subchannels registered with the muxer on
31//! epochs 1 through 5), it hints to the marshal actor that a finalization
32//! certificate for the node's *current* epoch's boundary height must exist.
33//!
34//! If such a finalization certificate exists, the marshal actor will fetch
35//! and verify it, and move the network finalized tip there. If that happens,
36//! the epoch manager actor will read the DKG outcome from the finalized tip
37//! and move on to the next epoch. It will not start a full simplex engine
38//! (the DKG manager is responsible for driving that), but it will "soft-enter"
39//! the new epoch by registering the new public polynomial on the scheme
40//! provider.
41//!
42//! This process is repeated until the node catches up to the current network
43//! epoch.
44use std::{collections::BTreeMap, num::NonZeroUsize};
45
46use alloy_consensus::BlockHeader as _;
47use commonware_codec::ReadExt as _;
48use commonware_consensus::{
49    Reporters,
50    marshal::Update,
51    simplex::{self, elector, scheme::bls12381_threshold::vrf::Scheme},
52    types::{Epoch, EpochDelta, Epocher as _, Height},
53};
54use commonware_cryptography::ed25519::PublicKey;
55use commonware_macros::select;
56use commonware_p2p::{
57    Blocker, Receiver, Sender,
58    utils::mux::{Builder as _, MuxHandle, Muxer},
59};
60use commonware_parallel::Sequential;
61use commonware_runtime::{
62    BufferPooler, Clock, ContextCell, Handle, Metrics as _, Network, Spawner, Storage, spawn_cell,
63    telemetry::metrics::status::GaugeExt as _,
64};
65use commonware_utils::{Acknowledgement as _, vec::NonEmptyVec};
66use eyre::{ensure, eyre};
67use futures::{StreamExt as _, channel::mpsc};
68use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
69use rand_08::{CryptoRng, Rng};
70use tracing::{Level, Span, debug, error, error_span, info, instrument, warn, warn_span};
71
72use crate::{
73    consensus::Digest,
74    epoch::manager::ingress::{EpochTransition, Exit},
75};
76
77use super::ingress::{Content, Message};
78
/// Replay buffer size handed to each simplex engine (`simplex::Config::replay_buffer`).
const REPLAY_BUFFER: NonZeroUsize = NonZeroUsize::new(8 * 1024 * 1024).expect("value is not zero"); // 8MB
/// Write buffer size handed to each simplex engine (`simplex::Config::write_buffer`).
const WRITE_BUFFER: NonZeroUsize = NonZeroUsize::new(1024 * 1024).expect("value is not zero"); // 1MB
81
/// The epoch manager actor.
///
/// Owns the simplex consensus engines backing active epochs and reacts to
/// messages received over `mailbox` (see the module-level docs for the
/// protocol).
pub(crate) struct Actor<TContext, TBlocker> {
    /// Engines backing the currently active epochs, keyed by epoch. The
    /// handle is used to abort the engine task; the context cell is dropped
    /// on exit to release the metrics registered under that epoch's label.
    active_epochs: BTreeMap<Epoch, (Handle<()>, ContextCell<TContext>)>,
    config: super::Config<TBlocker>,
    context: ContextCell<TContext>,
    /// The latest network epoch this node has confirmed — either by starting
    /// an engine for it or by reading a DKG outcome from a finalized boundary
    /// block. `None` until either of those happens.
    confirmed_latest_network_epoch: Option<Epoch>,
    /// Inbox for `Enter`/`Exit`/`Update` messages (see [`Message`]).
    mailbox: mpsc::UnboundedReceiver<Message>,
    metrics: Metrics,
}
90
91impl<TContext, TBlocker> Actor<TContext, TBlocker>
92where
93    TBlocker: Blocker<PublicKey = PublicKey>,
94    // TODO(janis): are all of these bounds necessary?
95    TContext: BufferPooler
96        + Spawner
97        + commonware_runtime::Metrics
98        + Rng
99        + CryptoRng
100        + Clock
101        + governor::clock::Clock
102        + Storage
103        + Network,
104{
105    pub(super) fn new(
106        config: super::Config<TBlocker>,
107        context: TContext,
108        mailbox: mpsc::UnboundedReceiver<Message>,
109    ) -> Self {
110        let active_epochs = Gauge::default();
111        let latest_epoch = Gauge::default();
112        let latest_participants = Gauge::default();
113        let how_often_signer = Counter::default();
114        let how_often_verifier = Counter::default();
115
116        context.register(
117            "active_epochs",
118            "the number of epochs currently managed by the epoch manager",
119            active_epochs.clone(),
120        );
121        context.register(
122            "latest_epoch",
123            "the latest epoch managed by this epoch manager",
124            latest_epoch.clone(),
125        );
126        context.register(
127            "latest_participants",
128            "the number of participants in the most recently started epoch",
129            latest_participants.clone(),
130        );
131        context.register(
132            "how_often_signer",
133            "how often a node is a signer; a node is a signer if it has a share",
134            how_often_signer.clone(),
135        );
136        context.register(
137            "how_often_verifier",
138            "how often a node is a verifier; a node is a verifier if it does not have a share",
139            how_often_verifier.clone(),
140        );
141
142        Self {
143            config,
144            context: ContextCell::new(context),
145            mailbox,
146            metrics: Metrics {
147                active_epochs,
148                latest_epoch,
149                latest_participants,
150                how_often_signer,
151                how_often_verifier,
152            },
153            active_epochs: BTreeMap::new(),
154            confirmed_latest_network_epoch: None,
155        }
156    }
157
    /// Spawns the actor's main loop ([`Self::run`]) on its runtime context.
    ///
    /// Each `(Sender, Receiver)` pair is a dedicated p2p channel: `votes`
    /// and `certificates` carry simplex consensus traffic, `resolver`
    /// carries resolver traffic. All three are multiplexed by epoch inside
    /// `run`.
    ///
    /// Returns the handle of the spawned task.
    pub(crate) fn start(
        mut self,
        votes: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        certificates: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(votes, certificates, resolver))
    }
175
    /// The actor's main event loop.
    ///
    /// Wraps the three p2p channel pairs in epoch-keyed muxers (the vote mux
    /// additionally exposes a backup channel receiving traffic for epochs
    /// without a registered subchannel), then loops over:
    ///
    /// - backup-channel votes, handled by
    ///   [`Self::handle_msg_for_unregistered_epoch`] (the catch-up path);
    /// - mailbox messages: `Enter`/`Exit` epoch transitions and `Update`s
    ///   reported by the marshal actor.
    ///
    /// Exits when either the vote backup channel closes or all mailbox
    /// senders are dropped.
    async fn run(
        mut self,
        (vote_sender, vote_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        (certificate_sender, certificate_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        (resolver_sender, resolver_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        // Vote mux: `.with_backup()` yields an extra channel that receives
        // messages addressed to epochs with no registered subchannel.
        let (mux, mut vote_mux, mut vote_backup) = Muxer::builder(
            self.context.with_label("vote_mux"),
            vote_sender,
            vote_receiver,
            self.config.mailbox_size,
        )
        .with_backup()
        .build();
        mux.start();

        let (mux, mut certificate_mux) = Muxer::builder(
            self.context.with_label("certificate_mux"),
            certificate_sender,
            certificate_receiver,
            self.config.mailbox_size,
        )
        .build();
        mux.start();

        let (mux, mut resolver_mux) = Muxer::new(
            self.context.with_label("resolver_mux"),
            resolver_sender,
            resolver_receiver,
            self.config.mailbox_size,
        );
        mux.start();

        loop {
            select!(
                // A vote for an epoch we have no subchannel for — possibly a
                // sign that the network has moved past us; try to catch up.
                message = vote_backup.recv() => {
                    let Some((their_epoch, (from, _))) = message else {
                        error_span!("mux channel closed").in_scope(||
                            error!("vote p2p mux channel closed; exiting actor")
                        );
                        break;
                    };
                    self.handle_msg_for_unregistered_epoch(
                        Epoch::new(their_epoch),
                        from,
                    ).await;
                },

                msg = self.mailbox.next() => {
                    let Some(msg) = msg else {
                        warn_span!("mailboxes dropped").in_scope(||
                             warn!("all mailboxes dropped; exiting actor"
                        ));
                        break;
                    };
                    let cause = msg.cause;
                    match msg.content {
                        Content::Enter(enter) => {
                            // Failures are already logged by `enter`'s
                            // instrument attribute (`err(level = Level::WARN)`).
                            let _: Result<_, _> = self
                                .enter(
                                    cause,
                                    enter,
                                    &mut vote_mux,
                                    &mut certificate_mux,
                                    &mut resolver_mux,
                                )
                                .await;
                        }
                        Content::Exit(exit) => self.exit(cause, exit),
                        Content::Update(update) => {
                            match *update {
                                Update::Tip(_, height, digest) => {
                                    // Errors are already logged by the `err`
                                    // instrument attribute on
                                    // `handle_finalized_tip`.
                                    let _ = self.handle_finalized_tip(height, digest).await;
                                }
                                Update::Block(_block, ack) => {
                                    // Blocks themselves are unused here; just
                                    // acknowledge so the sender can proceed.
                                    ack.acknowledge();
                                }
                            }
                        }
                    }
                },
            )
        }
    }
269
    /// Enters `epoch`: registers its signing scheme and spins up a simplex
    /// consensus engine backing it.
    ///
    /// The scheme is built from the DKG outcome carried by the
    /// [`EpochTransition`]: with a private `share` this node participates as
    /// a signer, without one as a verifier. The new engine receives one
    /// subchannel per p2p mux, all registered under `epoch`.
    ///
    /// Also bumps `confirmed_latest_network_epoch` to at least `epoch` and
    /// updates the actor's metrics.
    ///
    /// # Errors
    ///
    /// Returns an error if `epoch` is not newer than the latest epoch that
    /// already has a running engine.
    ///
    /// # Panics
    ///
    /// Panics if the private share does not match the public polynomial, or
    /// if a subchannel for `epoch` is already registered on one of the
    /// muxes.
    #[instrument(
        parent = &cause,
        skip_all,
        fields(
            %epoch,
            network_identity = %public.public(),
            ?participants,
        ),
        err(level = Level::WARN)
    )]
    async fn enter(
        &mut self,
        cause: Span,
        EpochTransition {
            epoch,
            public,
            share,
            participants,
        }: EpochTransition,
        vote_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
        certificates_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
        resolver_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
    ) -> eyre::Result<()> {
        // Refuse to start engines out of order; this also guarantees the
        // `insert` below cannot replace an existing engine.
        if let Some(latest) = self.active_epochs.last_key_value().map(|(k, _)| *k) {
            ensure!(
                epoch > latest,
                "requested to start an epoch `{epoch}` older than the latest \
                running, `{latest}`; refusing",
            );
        }

        let n_participants = participants.len();
        // Register the new signing scheme with the scheme provider.
        let is_signer = matches!(share, Some(..));
        let scheme = if let Some(share) = share {
            info!("we have a share for this epoch, participating as a signer",);
            Scheme::signer(crate::config::NAMESPACE, participants, public, share)
                .expect("our private share must match our slice of the public key")
        } else {
            info!("we don't have a share for this epoch, participating as a verifier",);
            Scheme::verifier(crate::config::NAMESPACE, participants, public)
        };
        self.config.scheme_provider.register(epoch, scheme.clone());

        // Manage the context so we can explicitly drop during cleanup, releasing
        // all metrics associated with this context.
        let engine_ctx = self
            .context
            .with_label("simplex")
            .with_attribute("epoch", epoch)
            .with_scope();

        let engine = simplex::Engine::new(
            engine_ctx.clone(),
            simplex::Config {
                scheme,
                elector: elector::Random,
                blocker: self.config.blocker.clone(),
                automaton: self.config.application.clone(),
                relay: self.config.application.clone(),
                // Fan consensus activity out to the subblocks, marshal, and
                // feed actors.
                reporter: Reporters::<_, crate::subblocks::Mailbox, _>::from((
                    self.config.subblocks.clone(),
                    Reporters::from((self.config.marshal.clone(), self.config.feed.clone())),
                )),
                // Per-epoch storage partition so engines do not share state.
                partition: format!(
                    "{partition_prefix}_consensus_epoch_{epoch}",
                    partition_prefix = self.config.partition_prefix
                ),
                mailbox_size: self.config.mailbox_size,
                epoch,

                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                page_cache: self.config.page_cache.clone(),

                leader_timeout: self.config.time_to_propose,
                certification_timeout: self.config.time_to_collect_notarizations,
                timeout_retry: self.config.time_to_retry_nullify_broadcast,
                fetch_timeout: self.config.time_for_peer_response,
                activity_timeout: self.config.views_to_track,
                skip_timeout: self.config.views_until_leader_skip,

                fetch_concurrent: crate::config::NUMBER_CONCURRENT_FETCHES,

                forwarding: commonware_consensus::simplex::config::ForwardingPolicy::Disabled,

                strategy: Sequential,
            },
        );

        // One subchannel per mux, keyed by this epoch.
        let vote = vote_mux.register(epoch.get()).await.unwrap();
        let certificate = certificates_mux.register(epoch.get()).await.unwrap();
        let resolver = resolver_mux.register(epoch.get()).await.unwrap();

        assert!(
            self.active_epochs
                .insert(
                    epoch,
                    (engine.start(vote, certificate, resolver), engine_ctx)
                )
                .is_none(),
            "there must be no other active engine running: this was ensured at \
            the beginning of this method",
        );

        // Never move the confirmed network epoch backwards.
        let latest = self.confirmed_latest_network_epoch.get_or_insert(epoch);
        *latest = (*latest).max(epoch);

        info!("started consensus engine backing the epoch");

        self.metrics.latest_participants.set(n_participants as i64);
        self.metrics.active_epochs.inc();
        let _ = self.metrics.latest_epoch.try_set(epoch.get());
        self.metrics.how_often_signer.inc_by(is_signer as u64);
        self.metrics.how_often_verifier.inc_by(!is_signer as u64);

        Ok(())
    }
397
398    #[instrument(parent = &cause, skip_all, fields(epoch))]
399    fn exit(&mut self, cause: Span, Exit { epoch }: Exit) {
400        if let Some((engine, engine_ctx)) = self.active_epochs.remove(&epoch) {
401            drop(engine_ctx);
402            engine.abort();
403            info!("stopped engine backing epoch");
404        } else {
405            warn!(
406                "attempted to exit unknown epoch, but epoch was not backed by \
407                an active engine",
408            );
409        }
410
411        // XXX: Keep the last 2 epochs around: the marshal actor might get
412        // finalization certificates from straggling nodes that have not yet
413        // transitioned and are still (re-)propsing the boundary block of the
414        // outgoing epoch with new certificate.
415        //
416        // If we delete the scheme too eagerly here, then i) we won't be able
417        // to verify the certificate, ii) consider their message invalid, and
418        // finally iii) block them because this is treated as Byzantine
419        // behavior.
420        if let Some(to_delete) = epoch.checked_sub(EpochDelta::new(2))
421            && !self.config.scheme_provider.delete(&to_delete)
422        {
423            debug!(
424                to_exit = %epoch,
425                %to_delete,
426                "attempted to delete scheme for epoch, but epoch had no scheme \
427                registered"
428            );
429        }
430    }
431
    /// Reacts to the marshal reporting a newly finalized tip at `height`.
    ///
    /// Bumps `confirmed_latest_network_epoch` to at least the epoch
    /// containing `height`. If `height` is that epoch's boundary height, the
    /// block is fetched from the marshal, the DKG outcome is decoded from
    /// its header's extra data, and a verifier scheme for the outcome's
    /// epoch is registered — "soft-entering" that epoch without starting an
    /// engine (see the module-level docs on catching up).
    ///
    /// # Errors
    ///
    /// Returns an error if the marshal drops the block subscription without
    /// delivering the block.
    ///
    /// # Panics
    ///
    /// Panics if a boundary block's extra data does not decode as an
    /// `OnchainDkgOutcome`.
    #[instrument(
        skip_all,
        fields(%height, epoch = tracing::field::Empty),
        err,
    )]
    async fn handle_finalized_tip(&mut self, height: Height, digest: Digest) -> eyre::Result<()> {
        let epoch_info = self
            .config
            .epoch_strategy
            .containing(height)
            .expect("epoch strategy is valid for all epochs and heights");
        // Now that the epoch is known, attach it to the current span.
        Span::current().record("epoch", tracing::field::display(epoch_info.epoch()));

        {
            // Never move the confirmed network epoch backwards.
            let network_epoch = self
                .confirmed_latest_network_epoch
                .get_or_insert(epoch_info.epoch());
            *network_epoch = (*network_epoch).max(epoch_info.epoch());
        }

        // If the tip contains a boundary block, then:
        //
        // 1. request the block from the marshal actor;
        // 2. read the DKG outcome from the block header;
        // 3. register the DKG scheme on the scheme provider;
        // 4. set the confirmed network height to the value in the on-chain
        // DKG outcome.
        //
        // This soft enters the new epoch without spinning up a new simplex
        // engine, and allows the epoch manager to forward more finalization
        // hints to the marshal actor.
        if epoch_info.last() == height {
            info!(
                "the finalized tip is a boundary block; requesting the \
                block to set the scheme for its epoch"
            );
            // The first await registers the subscription with the marshal;
            // the second awaits delivery of the block itself.
            let block = self
                .config
                .marshal
                .subscribe_by_digest(None, digest)
                .await
                .await
                .map_err(|_| eyre!("marshal never returned the block"))?;
            let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
                &mut block.header().extra_data().as_ref(),
            )
            .expect("boundary blocks must contain DKG outcomes");
            // Register a verifier scheme built from the on-chain players and
            // public sharing (we hold no private share at this point).
            self.config.scheme_provider.register(
                onchain_outcome.epoch,
                Scheme::verifier(
                    crate::config::NAMESPACE,
                    onchain_outcome.players().clone(),
                    onchain_outcome.sharing().clone(),
                ),
            );
            self.confirmed_latest_network_epoch
                .replace(onchain_outcome.epoch);
            debug!(
                next_epoch = %onchain_outcome.epoch,
                "read DKG outcome from boundary and registered scheme",
            );
        }
        Ok(())
    }
496
497    /// Handles messages for epochs received on un-registered sub-channels.
498    ///
499    /// If `their_epoch` is known (equal to our current epoch or in the past),
500    /// no action is taken.
501    ///
502    /// If `their_epoch` is in the future, then a hint is sent to the marshal
503    /// actor that a boundary certificate could be fetched.
504    #[instrument(
505        skip_all,
506        fields(msg.epoch = %their_epoch, msg.from = %from),
507    )]
508    async fn handle_msg_for_unregistered_epoch(&mut self, their_epoch: Epoch, from: PublicKey) {
509        let reference_epoch = match (
510            self.active_epochs.keys().last().copied(),
511            self.confirmed_latest_network_epoch,
512        ) {
513            (Some(our), None) => our,
514            (Some(our), Some(confirmed_finalized)) => our.max(confirmed_finalized),
515            (None, Some(confirmed_finalized)) => confirmed_finalized,
516            (None, None) => {
517                debug!(
518                    "received message for unregistered epoch, but we are \
519                    neither running a consensus engine backing an epoch, nor \
520                    do we know what the latest finalized epoch is; there is \
521                    nothing to do",
522                );
523                return;
524            }
525        };
526
527        if reference_epoch >= their_epoch {
528            return;
529        }
530
531        let boundary_height = self
532            .config
533            .epoch_strategy
534            .last(reference_epoch)
535            .expect("our epoch strategy should cover all epochs");
536
537        tracing::debug!(
538            %reference_epoch,
539            %boundary_height,
540            "hinting to sync system that a finalization certificate might be \
541            available for our reference epoch",
542        );
543        self.config
544            .marshal
545            .hint_finalized(boundary_height, NonEmptyVec::new(from))
546            .await;
547    }
548}
549
/// Prometheus metrics maintained by the epoch manager actor. All are
/// registered on the runtime context in [`Actor::new`].
struct Metrics {
    /// Number of epochs currently managed (incremented when an engine starts).
    active_epochs: Gauge,
    /// The latest epoch managed by this epoch manager.
    latest_epoch: Gauge,
    /// Number of participants in the most recently started epoch.
    latest_participants: Gauge,
    /// How often this node entered an epoch as a signer (it had a share).
    how_often_signer: Counter,
    /// How often this node entered an epoch as a verifier (no share).
    how_often_verifier: Counter,
}
556}