Skip to main content

tempo_commonware_node/epoch/manager/
actor.rs

1//! Actor implementing the epoch manager logic.
2//!
3//! This actor is responsible for:
4//!
5//! 1. entering and exiting epochs given messages it receives from the DKG
6//!    manager.
7//! 2. catching the node up by listening to votes for unknown epochs and
8//!    requesting finalizations for the currently known boundary height.
9//!
10//! # Entering and exiting epochs
11//!
12//! When the actor receives an `Enter` message, it spins up a new simplex
13//! consensus engine backing the epoch stored in the message. The message also
14//! contains the public polynomial, share of the private key for this node,
15//! and the participants in the next epoch - all determined by the DKG ceremony.
16//! The engine receives a subchannel of the vote, certificate, and resolver
17//! p2p channels, multiplexed by the epoch.
18//!
19//! When the actor receives an `Exit` message, it exits the engine backing the
20//! epoch stored in it.
21//!
22//! # Catching up the node
23//!
24//! The actor makes use of the backup mechanism exposed by the subchannel
25//! multiplexer API: assume the actor has a simplex engine running for epoch 0,
26//! then this engine will have a subchannel registered on the multiplexer for
27//! epoch 0.
28//!
29//! If the actor now receives a vote in epoch 5 over its vote mux backup
30//! channel (since there are no subchannels registered with the muxer on
31//! epochs 1 through 5), it hints to the marshal actor that a finalization
32//! certificate for the node's *current* epoch's boundary height must exist.
33//!
34//! If such a finalization certificate exists, the marshal actor will fetch
35//! and verify it, and move the network finalized tip there. If that happens,
36//! the epoch manager actor will read the DKG outcome from the finalized tip
37//! and move on to the next epoch. It will not start a full simplex engine
38//! (the DKG manager is responsible for driving that), but it will "soft-enter"
39//! the new epoch by registering the new public polynomial on the scheme
40//! provider.
41//!
42//! This process is repeated until the node catches up to the current network
43//! epoch.
44use std::{collections::BTreeMap, num::NonZeroUsize};
45
46use alloy_consensus::BlockHeader as _;
47use commonware_codec::ReadExt as _;
48use commonware_consensus::{
49    Reporters,
50    marshal::Update,
51    simplex::{self, elector, scheme::bls12381_threshold::vrf::Scheme},
52    types::{Epoch, Epocher as _, Height},
53};
54use commonware_cryptography::ed25519::PublicKey;
55use commonware_macros::select;
56use commonware_p2p::{
57    Blocker, Receiver, Sender,
58    utils::mux::{Builder as _, MuxHandle, Muxer},
59};
60use commonware_parallel::Sequential;
61use commonware_runtime::{
62    BufferPooler, Clock, ContextCell, Handle, Metrics as _, Network, Spawner, Storage, spawn_cell,
63    telemetry::metrics::status::GaugeExt as _,
64};
65use commonware_utils::{Acknowledgement as _, vec::NonEmptyVec};
66use eyre::{ensure, eyre};
67use futures::{StreamExt as _, channel::mpsc};
68use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
69use rand_08::{CryptoRng, Rng};
70use tracing::{Level, Span, debug, error, error_span, info, instrument, warn, warn_span};
71
72use crate::{
73    consensus::Digest,
74    epoch::manager::ingress::{EpochTransition, Exit},
75};
76
77use super::ingress::{Content, Message};
78
79const REPLAY_BUFFER: NonZeroUsize = NonZeroUsize::new(8 * 1024 * 1024).expect("value is not zero"); // 8MB
80const WRITE_BUFFER: NonZeroUsize = NonZeroUsize::new(1024 * 1024).expect("value is not zero"); // 1MB
81
/// Actor implementing the epoch-manager logic described in the module docs.
pub(crate) struct Actor<TContext, TBlocker> {
    // Engines currently backing epochs: the spawned engine's handle plus the
    // scoped context it runs under. On `exit` the context is dropped first
    // (releasing its metrics) and the handle is aborted.
    active_epochs: BTreeMap<Epoch, (Handle<()>, ContextCell<TContext>)>,
    // Static configuration shared with every engine this actor spins up.
    config: super::Config<TBlocker>,
    // Runtime context the actor task is spawned on via `spawn_cell!`.
    context: ContextCell<TContext>,
    // Highest network epoch this node has confirmed, either by entering it
    // (`enter`) or by reading a DKG outcome from a finalized boundary block
    // (`handle_finalized_tip`). `None` until either happens.
    confirmed_latest_network_epoch: Option<Epoch>,
    // Receiving half of the actor's mailbox; the actor exits when all
    // senders are dropped.
    mailbox: mpsc::UnboundedReceiver<Message>,
    // Prometheus metrics registered on the context in `new`.
    metrics: Metrics,
}
90
impl<TContext, TBlocker> Actor<TContext, TBlocker>
where
    TBlocker: Blocker<PublicKey = PublicKey>,
    // TODO(janis): are all of these bounds necessary?
    TContext: BufferPooler
        + Spawner
        + commonware_runtime::Metrics
        + Rng
        + CryptoRng
        + Clock
        + governor::clock::Clock
        + Storage
        + Network,
{
    /// Creates the actor and registers its metrics on `context`.
    ///
    /// Nothing runs and no epoch is entered until [`Self::start`] is called.
    pub(super) fn new(
        config: super::Config<TBlocker>,
        context: TContext,
        mailbox: mpsc::UnboundedReceiver<Message>,
    ) -> Self {
        let active_epochs = Gauge::default();
        let latest_epoch = Gauge::default();
        let latest_participants = Gauge::default();
        let how_often_signer = Counter::default();
        let how_often_verifier = Counter::default();

        context.register(
            "active_epochs",
            "the number of epochs currently managed by the epoch manager",
            active_epochs.clone(),
        );
        context.register(
            "latest_epoch",
            "the latest epoch managed by this epoch manager",
            latest_epoch.clone(),
        );
        context.register(
            "latest_participants",
            "the number of participants in the most recently started epoch",
            latest_participants.clone(),
        );
        context.register(
            "how_often_signer",
            "how often a node is a signer; a node is a signer if it has a share",
            how_often_signer.clone(),
        );
        context.register(
            "how_often_verifier",
            "how often a node is a verifier; a node is a verifier if it does not have a share",
            how_often_verifier.clone(),
        );

        Self {
            config,
            context: ContextCell::new(context),
            mailbox,
            metrics: Metrics {
                active_epochs,
                latest_epoch,
                latest_participants,
                how_often_signer,
                how_often_verifier,
            },
            active_epochs: BTreeMap::new(),
            confirmed_latest_network_epoch: None,
        }
    }

    /// Spawns the actor's main loop ([`Self::run`]) on its stored context.
    ///
    /// Each `(sender, receiver)` pair is one of the three p2p channels that
    /// the actor multiplexes per epoch: votes, certificates, and resolver.
    pub(crate) fn start(
        mut self,
        votes: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        certificates: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(votes, certificates, resolver).await)
    }

    /// The actor's main loop.
    ///
    /// Builds and starts one muxer per p2p channel, then services two event
    /// sources until either closes:
    ///
    /// 1. the vote muxer's backup channel, which yields messages for epochs
    ///    with no registered subchannel (these drive the catch-up path, see
    ///    module docs);
    /// 2. the actor mailbox, carrying `Enter`/`Exit`/`Update` messages.
    async fn run(
        mut self,
        (vote_sender, vote_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        (certificate_sender, certificate_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        (resolver_sender, resolver_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        // Only the vote muxer gets a backup channel: votes for unregistered
        // epochs are how we notice the network may be ahead of us.
        let (mux, mut vote_mux, mut vote_backup) = Muxer::builder(
            self.context.with_label("vote_mux"),
            vote_sender,
            vote_receiver,
            self.config.mailbox_size,
        )
        .with_backup()
        .build();
        mux.start();

        let (mux, mut certificate_mux) = Muxer::builder(
            self.context.with_label("certificate_mux"),
            certificate_sender,
            certificate_receiver,
            self.config.mailbox_size,
        )
        .build();
        mux.start();

        let (mux, mut resolver_mux) = Muxer::new(
            self.context.with_label("resolver_mux"),
            resolver_sender,
            resolver_receiver,
            self.config.mailbox_size,
        );
        mux.start();

        loop {
            select!(
                // A vote for an epoch no engine has registered a subchannel
                // for; may trigger a finalization hint to the marshal actor.
                message = vote_backup.recv() => {
                    let Some((their_epoch, (from, _))) = message else {
                        error_span!("mux channel closed").in_scope(||
                            error!("vote p2p mux channel closed; exiting actor")
                        );
                        break;
                    };
                    self.handle_msg_for_unregistered_epoch(
                        Epoch::new(their_epoch),
                        from,
                    ).await;
                },

                msg = self.mailbox.next() => {
                    let Some(msg) = msg else {
                        warn_span!("mailboxes dropped").in_scope(||
                             warn!("all mailboxes dropped; exiting actor"
                        ));
                        break;
                    };
                    let cause = msg.cause;
                    match msg.content {
                        Content::Enter(enter) => {
                            // Errors are already reported by `enter`'s
                            // `err(level = Level::WARN)` instrumentation.
                            let _: Result<_, _> = self
                                .enter(
                                    cause,
                                    enter,
                                    &mut vote_mux,
                                    &mut certificate_mux,
                                    &mut resolver_mux,
                                )
                                .await;
                        }
                        Content::Exit(exit) => self.exit(cause, exit),
                        Content::Update(update) => {
                            match *update {
                                Update::Tip(_, height, digest) => {
                                    let _ = self.handle_finalized_tip(height, digest).await;
                                }
                                Update::Block(_block, ack) => {
                                    // Block updates are not used here; just
                                    // acknowledge receipt and drop the block.
                                    ack.acknowledge();
                                }
                            }
                        }
                    }
                },
            )
        }
    }

    /// Enters `epoch` by starting a simplex consensus engine backing it.
    ///
    /// Refuses (returns an error) if `epoch` is not strictly newer than the
    /// newest epoch with an active engine. Otherwise registers the epoch's
    /// signing scheme with the scheme provider (signer when `share` is
    /// present, verifier otherwise), registers a subchannel for the epoch on
    /// each of the three muxers, starts the engine, records it in
    /// `active_epochs`, bumps `confirmed_latest_network_epoch`, and updates
    /// the metrics.
    #[instrument(
        parent = &cause,
        skip_all,
        fields(
            %epoch,
            network_identity = %public.public(),
            ?participants,
        ),
        err(level = Level::WARN)
    )]
    async fn enter(
        &mut self,
        cause: Span,
        EpochTransition {
            epoch,
            public,
            share,
            participants,
        }: EpochTransition,
        vote_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
        certificates_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
        resolver_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
    ) -> eyre::Result<()> {
        // Only ever move forward: never (re)start an epoch at or below the
        // newest one already running.
        if let Some(latest) = self.active_epochs.last_key_value().map(|(k, _)| *k) {
            ensure!(
                epoch > latest,
                "requested to start an epoch `{epoch}` older than the latest \
                running, `{latest}`; refusing",
            );
        }

        let n_participants = participants.len();
        // Register the new signing scheme with the scheme provider.
        let is_signer = matches!(share, Some(..));
        let scheme = if let Some(share) = share {
            info!("we have a share for this epoch, participating as a signer",);
            Scheme::signer(crate::config::NAMESPACE, participants, public, share)
                .expect("our private share must match our slice of the public key")
        } else {
            info!("we don't have a share for this epoch, participating as a verifier",);
            Scheme::verifier(crate::config::NAMESPACE, participants, public)
        };
        self.config.scheme_provider.register(epoch, scheme.clone());

        // Manage the context so we can explicitly drop during cleanup, releasing
        // all metrics associated with this context.
        let engine_ctx = self
            .context
            .with_label("simplex")
            .with_attribute("epoch", epoch)
            .with_scope();

        let engine = simplex::Engine::new(
            engine_ctx.clone(),
            simplex::Config {
                scheme,
                elector: elector::Random,
                blocker: self.config.blocker.clone(),
                automaton: self.config.application.clone(),
                relay: self.config.application.clone(),
                // Fan out consensus reports to subblocks, marshal, and feed.
                reporter: Reporters::<_, crate::subblocks::Mailbox, _>::from((
                    self.config.subblocks.clone(),
                    Reporters::from((self.config.marshal.clone(), self.config.feed.clone())),
                )),
                // Per-epoch storage partition so engines never share state.
                partition: format!(
                    "{partition_prefix}_consensus_epoch_{epoch}",
                    partition_prefix = self.config.partition_prefix
                ),
                mailbox_size: self.config.mailbox_size,
                epoch,

                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                page_cache: self.config.page_cache.clone(),

                leader_timeout: self.config.time_to_propose,
                certification_timeout: self.config.time_to_collect_notarizations,
                timeout_retry: self.config.time_to_retry_nullify_broadcast,
                fetch_timeout: self.config.time_for_peer_response,
                activity_timeout: self.config.views_to_track,
                skip_timeout: self.config.views_until_leader_skip,

                fetch_concurrent: crate::config::NUMBER_CONCURRENT_FETCHES,

                strategy: Sequential,
            },
        );

        // Give the engine its own per-epoch subchannel on each p2p channel.
        let vote = vote_mux.register(epoch.get()).await.unwrap();
        let certificate = certificates_mux.register(epoch.get()).await.unwrap();
        let resolver = resolver_mux.register(epoch.get()).await.unwrap();

        assert!(
            self.active_epochs
                .insert(
                    epoch,
                    (engine.start(vote, certificate, resolver), engine_ctx)
                )
                .is_none(),
            "there must be no other active engine running: this was ensured at \
            the beginning of this method",
        );

        // Track the newest epoch we have confirmed, never moving backwards.
        let latest = self.confirmed_latest_network_epoch.get_or_insert(epoch);
        *latest = (*latest).max(epoch);

        info!("started consensus engine backing the epoch");

        self.metrics.latest_participants.set(n_participants as i64);
        self.metrics.active_epochs.inc();
        let _ = self.metrics.latest_epoch.try_set(epoch.get());
        self.metrics.how_often_signer.inc_by(is_signer as u64);
        self.metrics.how_often_verifier.inc_by(!is_signer as u64);

        Ok(())
    }

    /// Exits `epoch`: aborts the engine backing it (if any) and deletes the
    /// epoch's scheme from the scheme provider. Either part missing is only
    /// a warning, not an error.
    #[instrument(parent = &cause, skip_all, fields(epoch))]
    fn exit(&mut self, cause: Span, Exit { epoch }: Exit) {
        if let Some((engine, engine_ctx)) = self.active_epochs.remove(&epoch) {
            // Drop the scoped context first to release its metrics, then
            // abort the engine task.
            drop(engine_ctx);
            engine.abort();
            info!("stopped engine backing epoch");
        } else {
            warn!(
                "attempted to exit unknown epoch, but epoch was not backed by \
                an active engine",
            );
        }

        if !self.config.scheme_provider.delete(&epoch) {
            warn!(
                "attempted to delete scheme for epoch, but epoch had no scheme \
                registered"
            );
        }
    }

    /// Reacts to a newly finalized tip at `height`/`digest`.
    ///
    /// Bumps `confirmed_latest_network_epoch` to at least the epoch
    /// containing `height`. If `height` is that epoch's boundary height, the
    /// boundary block is fetched from the marshal actor, the DKG outcome is
    /// read from its header's extra data, and a verifier scheme for the
    /// outcome's epoch is registered — "soft-entering" the next epoch
    /// without starting a simplex engine (see module docs).
    #[instrument(
        skip_all,
        fields(%height, epoch = tracing::field::Empty),
        err,
    )]
    async fn handle_finalized_tip(&mut self, height: Height, digest: Digest) -> eyre::Result<()> {
        let epoch_info = self
            .config
            .epoch_strategy
            .containing(height)
            .expect("epoch strategy is valid for all epochs and heights");
        Span::current().record("epoch", tracing::field::display(epoch_info.epoch()));

        {
            let network_epoch = self
                .confirmed_latest_network_epoch
                .get_or_insert(epoch_info.epoch());
            *network_epoch = (*network_epoch).max(epoch_info.epoch());
        }

        // If the tip contains a boundary block, then:
        //
        // 1. request the block from the marshal actor;
        // 2. read the DKG outcome from the block header;
        // 3. register the DKG scheme on the scheme provider;
        // 4. set the confirmed network height to the value in the on-chain
        // DKG outcome.
        //
        // This soft enters the new epoch without spinning up a new simplex
        // engine, and allows the epoch manager to forward more finalization
        // hints to the marshal actor.
        if epoch_info.last() == height {
            info!(
                "the finalized tip is a boundary block; requesting the \
                block to set the scheme for its epoch"
            );
            // First `.await` resolves the subscription, second awaits the
            // block delivery itself.
            let block = self
                .config
                .marshal
                .subscribe_by_digest(None, digest)
                .await
                .await
                .map_err(|_| eyre!("marshal never returned the block"))?;
            let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
                &mut block.header().extra_data().as_ref(),
            )
            .expect("boundary blocks must contain DKG outcomes");
            self.config.scheme_provider.register(
                onchain_outcome.epoch,
                Scheme::verifier(
                    crate::config::NAMESPACE,
                    onchain_outcome.players().clone(),
                    onchain_outcome.sharing().clone(),
                ),
            );
            self.confirmed_latest_network_epoch
                .replace(onchain_outcome.epoch);
            debug!(
                next_epoch = %onchain_outcome.epoch,
                "read DKG outcome from boundary and registered scheme",
            );
        }
        Ok(())
    }

    /// Handles messages for epochs received on un-registered sub-channels.
    ///
    /// If `their_epoch` is known (equal to our current epoch or in the past),
    /// no action is taken.
    ///
    /// If `their_epoch` is in the future, then a hint is sent to the marshal
    /// actor that a boundary certificate could be fetched.
    #[instrument(
        skip_all,
        fields(msg.epoch = %their_epoch, msg.from = %from),
    )]
    async fn handle_msg_for_unregistered_epoch(&mut self, their_epoch: Epoch, from: PublicKey) {
        // Our reference epoch is the newest epoch we know of, from either an
        // active engine or a confirmed finalization.
        let reference_epoch = match (
            self.active_epochs.keys().last().copied(),
            self.confirmed_latest_network_epoch,
        ) {
            (Some(our), None) => our,
            (Some(our), Some(confirmed_finalized)) => our.max(confirmed_finalized),
            (None, Some(confirmed_finalized)) => confirmed_finalized,
            (None, None) => {
                debug!(
                    "received message for unregistered epoch, but we are \
                    neither running a consensus engine backing an epoch, nor \
                    do we know what the latest finalized epoch is; there is \
                    nothing to do",
                );
                return;
            }
        };

        // The sender is not ahead of us; nothing to catch up on.
        if reference_epoch >= their_epoch {
            return;
        }

        let boundary_height = self
            .config
            .epoch_strategy
            .last(reference_epoch)
            .expect("our epoch strategy should cover all epochs");

        tracing::debug!(
            %reference_epoch,
            %boundary_height,
            "hinting to sync system that a finalization certificate might be \
            available for our reference epoch",
        );
        self.config
            .marshal
            .hint_finalized(boundary_height, NonEmptyVec::new(from))
            .await;
    }
}
534
/// Prometheus metrics for the epoch manager; registered on the runtime
/// context in `Actor::new` and updated in `Actor::enter`.
struct Metrics {
    // Number of epochs currently backed by a running consensus engine.
    active_epochs: Gauge,
    // Latest epoch for which an engine was started.
    latest_epoch: Gauge,
    // Participant count of the most recently started epoch.
    latest_participants: Gauge,
    // Times this node entered an epoch as a signer (it had a share).
    how_often_signer: Counter,
    // Times this node entered an epoch as a verifier (it had no share).
    how_often_verifier: Counter,
}
541}