tempo_commonware_node/epoch/manager/actor.rs

//! Actor implementing the epoch manager logic.
//!
//! This actor is responsible for:
//!
//! 1. entering and exiting epochs given messages it receives from the DKG
//!    manager.
//! 2. catching the node up by listening to votes for unknown epochs and
//!    requesting finalizations for the currently known boundary height.
//!
//! # Entering and exiting epochs
//!
//! When the actor receives an `Enter` message, it spins up a new simplex
//! consensus engine backing the epoch stored in the message. The message also
//! contains the public polynomial, this node's share of the private key, and
//! the participants in the next epoch - all determined by the DKG ceremony.
//! The engine receives a subchannel of the recovered, pending, and resolver
//! p2p channels, multiplexed by the epoch.
//!
//! When the actor receives an `Exit` message, it shuts down the engine backing
//! the epoch stored in the message.
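//!
//! As an illustration, a hypothetical sender-side sketch of these two messages
//! (the real producer is the DKG manager; `mailbox_tx`, the span handling, and
//! the construction of `public`, `share`, and `participants` are placeholders
//! elided here):
//!
//! ```ignore
//! use crate::epoch::manager::ingress::{Activity, Enter, Exit, Message};
//!
//! // Enter an epoch: the manager starts a simplex engine backing it.
//! mailbox_tx.unbounded_send(Message {
//!     cause: tracing::Span::current(),
//!     activity: Activity::Enter(Enter { epoch, public, share, participants }),
//! })?;
//!
//! // Exit an epoch: the manager aborts the engine backing it.
//! mailbox_tx.unbounded_send(Message {
//!     cause: tracing::Span::current(),
//!     activity: Activity::Exit(Exit { epoch }),
//! })?;
//! ```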
//!
//! # Catching up the node
//!
//! The actor makes use of the backup mechanism exposed by the subchannel
//! multiplexer API: assume the actor has a simplex engine running for epoch 0;
//! this engine has a subchannel registered on the multiplexer for epoch 0.
//!
//! If the actor now receives a vote in epoch 5 over its pending mux backup
//! channel (since there are no subchannels registered with the muxer for
//! epochs 1 through 5), it will request the finalization certificate for the
//! boundary height of epoch 0 from the voter. This request is sent over the
//! boundary certificates p2p network.
//!
//! Upon receipt of the request for epoch 0 over the boundary certificates p2p
//! network, the voter sends the finalization certificate over the *recovered*
//! p2p network, tagged with epoch 0.
//!
//! The certificate is received by the running simplex engine (which is still
//! active for epoch 0) and forwarded to the marshal actor, which is then able
//! to fetch all finalizations up to the boundary height, eventually triggering
//! the node's transition to epoch 1.
//!
//! This process is repeated until the node catches up to the current network
//! epoch.
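//!
//! A simplified sketch of the request/response exchange, based on the handlers
//! in this file (error handling, rate limiting, and local variables such as
//! `from` and `cert` are elided):
//!
//! ```ignore
//! // Requester: ask the voter for the finalization certificate of our latest
//! // epoch by sending the varint-encoded epoch over the boundary certificates
//! // channel.
//! boundary_certificates_sender
//!     .send(Recipients::One(from), UInt(our_epoch).encode().freeze(), true)
//!     .await?;
//!
//! // Responder: reply with the certificate over the recovered channel, tagged
//! // with the requested epoch so the requester's running engine picks it up.
//! recovered_global_sender
//!     .send(
//!         requested_epoch,
//!         Recipients::One(from),
//!         Voter::<Scheme<PublicKey, MinSig>, Digest>::Finalization(cert)
//!             .encode()
//!             .freeze(),
//!         false,
//!     )
//!     .await?;
//! ```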
use std::{collections::BTreeMap, num::NonZeroUsize};

use bytes::Bytes;
use commonware_codec::{DecodeExt as _, Encode as _, varint::UInt};
use commonware_consensus::{
    Reporters,
    simplex::{self, signing_scheme::bls12381_threshold::Scheme, types::Voter},
    types::Epoch,
    utils,
};
use commonware_cryptography::{bls12381::primitives::variant::MinSig, ed25519::PublicKey};
use commonware_macros::select;
use commonware_p2p::{
    Blocker, Receiver, Recipients, Sender,
    utils::mux::{Builder as _, GlobalSender, MuxHandle, Muxer},
};
use commonware_runtime::{
    Clock, ContextCell, Handle, Metrics as _, Network, Spawner, Storage, spawn_cell,
};
use eyre::{WrapErr as _, ensure, eyre};
use futures::{StreamExt as _, channel::mpsc};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use rand::{CryptoRng, Rng};
use tracing::{Level, Span, error, error_span, info, instrument, warn, warn_span};

use crate::{
    consensus::Digest,
    epoch::manager::ingress::{Enter, Exit},
};

use super::ingress::Message;

const REPLAY_BUFFER: NonZeroUsize = NonZeroUsize::new(8 * 1024 * 1024).expect("value is not zero"); // 8MB
const WRITE_BUFFER: NonZeroUsize = NonZeroUsize::new(1024 * 1024).expect("value is not zero"); // 1MB

pub(crate) struct Actor<TBlocker, TContext> {
    active_epochs: BTreeMap<Epoch, Handle<()>>,
    config: super::Config<TBlocker>,
    context: ContextCell<TContext>,
    mailbox: mpsc::UnboundedReceiver<Message>,
    metrics: Metrics,
}

impl<TBlocker, TContext> Actor<TBlocker, TContext>
where
    TBlocker: Blocker<PublicKey = PublicKey>,
    // TODO(janis): are all of these bounds necessary?
    TContext: Spawner
        + commonware_runtime::Metrics
        + Rng
        + CryptoRng
        + Clock
        + governor::clock::Clock
        + Storage
        + Network,
{
    pub(super) fn new(
        config: super::Config<TBlocker>,
        context: TContext,
        mailbox: mpsc::UnboundedReceiver<Message>,
    ) -> Self {
        let active_epochs = Gauge::default();
        let latest_epoch = Gauge::default();
        let latest_participants = Gauge::default();
        let how_often_signer = Counter::default();
        let how_often_verifier = Counter::default();

        context.register(
            "active_epochs",
            "the number of epochs currently managed by the epoch manager",
            active_epochs.clone(),
        );
        context.register(
            "latest_epoch",
            "the latest epoch managed by this epoch manager",
            latest_epoch.clone(),
        );
        context.register(
            "latest_participants",
            "the number of participants in the most recently started epoch",
            latest_participants.clone(),
        );
        context.register(
            "how_often_signer",
            "how often a node is a signer; a node is a signer if it has a share",
            how_often_signer.clone(),
        );
        context.register(
            "how_often_verifier",
            "how often a node is a verifier; a node is a verifier if it does not have a share",
            how_often_verifier.clone(),
        );

        Self {
            config,
            context: ContextCell::new(context),
            mailbox,
            metrics: Metrics {
                active_epochs,
                latest_epoch,
                latest_participants,
                how_often_signer,
                how_often_verifier,
            },
            active_epochs: BTreeMap::new(),
        }
    }

    pub(crate) fn start(
        mut self,
        pending: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        recovered: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        boundary_certificates: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(
            self.context,
            self.run(pending, recovered, resolver, boundary_certificates)
                .await
        )
    }

    async fn run(
        mut self,
        (pending_sender, pending_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        (recovered_sender, recovered_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        (resolver_sender, resolver_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        (mut boundary_certificates_sender, mut boundary_certificates_receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        let (mux, mut pending_mux, mut pending_backup) = Muxer::builder(
            self.context.with_label("pending_mux"),
            pending_sender,
            pending_receiver,
            self.config.mailbox_size,
        )
        .with_backup()
        .build();
        mux.start();

        let (mux, mut recovered_mux, mut recovered_global_sender) = Muxer::builder(
            self.context.with_label("recovered_mux"),
            recovered_sender,
            recovered_receiver,
            self.config.mailbox_size,
        )
        .with_global_sender()
        .build();
        mux.start();

        let (mux, mut resolver_mux) = Muxer::new(
            self.context.with_label("resolver_mux"),
            resolver_sender,
            resolver_receiver,
            self.config.mailbox_size,
        );
        mux.start();

        loop {
            select!(
                message = pending_backup.next() => {
                    let Some((their_epoch, (from, _))) = message else {
                        error_span!("mux channel closed")
                            .in_scope(|| error!("pending p2p mux channel closed; exiting actor"));
                        break;
                    };
                    let _: Result<_, _> = self.handle_msg_for_unregistered_epoch(
                        &mut boundary_certificates_sender,
                        their_epoch,
                        from,
                    ).await;
                },

                message = boundary_certificates_receiver.recv() => {
                    let (from, payload) = match message {
                        Err(error) => {
                            error_span!("epoch channel closed").in_scope(|| {
                                error!(
                                    error = %eyre::Report::new(error),
                                    "epoch p2p channel closed; exiting actor",
                                );
                            });
                            break;
                        }
                        Ok(msg) => msg,
                    };
                    let _: Result<_, _> = self.handle_boundary_certificate_request(
                        from,
                        payload,
                        &mut recovered_global_sender,
                    ).await;
                },

                msg = self.mailbox.next() => {
                    let Some(msg) = msg else {
                        warn_span!("mailboxes dropped")
                            .in_scope(|| warn!("all mailboxes dropped; exiting actor"));
                        break;
                    };
                    let cause = msg.cause;
                    match msg.activity {
                        super::ingress::Activity::Enter(enter) => {
                            let _: Result<_, _> = self
                                .enter(
                                    cause,
                                    enter,
                                    &mut pending_mux,
                                    &mut recovered_mux,
                                    &mut resolver_mux,
                                )
                                .await;
                        }
                        super::ingress::Activity::Exit(exit) => self.exit(cause, exit),
                    }
                },
            )
        }
    }

    #[instrument(
        parent = &cause,
        skip_all,
        fields(
            %epoch,
            ?public,
            ?participants,
        ),
        err(level = Level::WARN)
    )]
    async fn enter(
        &mut self,
        cause: Span,
        Enter {
            epoch,
            public,
            share,
            participants,
        }: Enter,
        pending_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
        recovered_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
        resolver_mux: &mut MuxHandle<
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        >,
    ) -> eyre::Result<()> {
        ensure!(
            !self.active_epochs.contains_key(&epoch),
            "an engine for the entered epoch is already running; ignoring",
        );

        let n_participants = participants.len();
        // Register the new signing scheme with the scheme provider.
        let scheme = if let Some(share) = share {
            info!("we have a share for this epoch, participating as a signer");
            Scheme::new(participants, &public, share)
        } else {
            info!("we don't have a share for this epoch, participating as a verifier");
            Scheme::verifier(participants, &public)
        };
        assert!(
            self.config.scheme_provider.register(epoch, scheme.clone()),
            "a scheme must never be registered twice",
        );

        let is_signer = matches!(scheme, Scheme::Signer { .. });

        let engine = simplex::Engine::new(
            self.context.with_label("consensus_engine"),
            simplex::Config {
                scheme,
                blocker: self.config.blocker.clone(),
                automaton: self.config.application.clone(),
                relay: self.config.application.clone(),
                reporter: Reporters::from((
                    self.config.subblocks.clone(),
                    self.config.marshal.clone(),
                )),
                partition: format!(
                    "{partition_prefix}_consensus_epoch_{epoch}",
                    partition_prefix = self.config.partition_prefix
                ),
                mailbox_size: self.config.mailbox_size,
                epoch,
                namespace: crate::config::NAMESPACE.to_vec(),

                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                buffer_pool: self.config.buffer_pool.clone(),

                leader_timeout: self.config.time_to_propose,
                notarization_timeout: self.config.time_to_collect_notarizations,
                nullify_retry: self.config.time_to_retry_nullify_broadcast,
                fetch_timeout: self.config.time_for_peer_response,
                activity_timeout: self.config.views_to_track,
                skip_timeout: self.config.views_until_leader_skip,

                fetch_concurrent: crate::config::NUMBER_CONCURRENT_FETCHES,
                fetch_rate_per_peer: crate::config::RESOLVER_LIMIT,
            },
        );

        let pending_sc = pending_mux.register(epoch).await.unwrap();
        let recovered_sc = recovered_mux.register(epoch).await.unwrap();
        let resolver_sc = resolver_mux.register(epoch).await.unwrap();

        assert!(
            self.active_epochs
                .insert(epoch, engine.start(pending_sc, recovered_sc, resolver_sc))
                .is_none(),
            "there must be no other active engine running: this was ensured at \
            the beginning of this method",
        );

        info!("started consensus engine backing the epoch");

        self.metrics.latest_participants.set(n_participants as i64);
        self.metrics.active_epochs.inc();
        self.metrics.latest_epoch.set(epoch as i64);
        self.metrics.how_often_signer.inc_by(is_signer as u64);
        self.metrics.how_often_verifier.inc_by(!is_signer as u64);

        Ok(())
    }

    #[instrument(parent = &cause, skip_all, fields(epoch))]
    fn exit(&mut self, cause: Span, Exit { epoch }: Exit) {
        if let Some(engine) = self.active_epochs.remove(&epoch) {
            engine.abort();
            info!("stopped engine backing epoch");
        } else {
            warn!(
                "attempted to exit unknown epoch, but epoch was not backed by \
                an active engine",
            );
        }

        if !self.config.scheme_provider.delete(&epoch) {
            warn!(
                "attempted to delete scheme for epoch, but epoch had no scheme \
                registered"
            );
        }
    }

    /// Handles messages for epochs received on un-registered sub-channels.
    ///
    /// If `their_epoch` is known (equal to our current epoch or in the past),
    /// no action is taken.
    ///
    /// If `their_epoch` is in the future, then the finalization certificate for
    /// our latest epoch is requested from the sender.
    ///
    /// This makes use of commonware's backup channels: when starting a new
    /// engine, we register a new subchannel with the muxer, tagged with that
    /// epoch. Upon receiving a message on an un-registered epoch, the
    /// commonware p2p muxer will send the message to the backup channel, tagged
    /// with the unknown epoch.
    #[instrument(skip_all, fields(msg.epoch = their_epoch, msg.from = %from), err(level = Level::INFO))]
    async fn handle_msg_for_unregistered_epoch(
        &mut self,
        boundary_certificates_sender: &mut impl Sender<PublicKey = PublicKey>,
        their_epoch: Epoch,
        from: PublicKey,
    ) -> eyre::Result<()> {
        let Some(our_epoch) = self.active_epochs.keys().last().copied() else {
            return Err(eyre!(
                "received message over unregistered epoch channel, but we have no active epochs at all"
            ));
        };
        ensure!(
            their_epoch > our_epoch,
            "epoch `{their_epoch}` is not ahead of our latest epoch `{our_epoch}`; no action necessary",
        );

        let boundary_height = utils::last_block_in_epoch(self.config.epoch_length, our_epoch);
        ensure!(
            self.config
                .marshal
                .get_finalization(boundary_height)
                .await
                .is_none(),
            "finalization certificate for epoch `{our_epoch}` at boundary \
            height `{boundary_height}` is already known; no action necessary",
        );

        boundary_certificates_sender
            .send(
                Recipients::One(from),
                UInt(our_epoch).encode().freeze(),
                true,
            )
            .await
            .wrap_err("failed to request the finalization certificate for our epoch")?;

        info!("requested finalization certificate for our epoch");

        Ok(())
    }

    #[instrument(skip_all, fields(
        msg.from = %from,
        msg.payload_len = bytes.len(),
        msg.decoded_epoch = tracing::field::Empty,
    ), err(level = Level::WARN))]
    async fn handle_boundary_certificate_request(
        &mut self,
        from: PublicKey,
        bytes: Bytes,
        recovered_global_sender: &mut GlobalSender<impl Sender<PublicKey = PublicKey>>,
    ) -> eyre::Result<()> {
        let requested_epoch = UInt::<Epoch>::decode(bytes.as_ref())
            .wrap_err("failed decoding epoch channel payload as epoch")?
            .into();
        tracing::Span::current().record("msg.decoded_epoch", requested_epoch);
        let boundary_height = utils::last_block_in_epoch(self.config.epoch_length, requested_epoch);
        let cert = self
            .config
            .marshal
            .get_finalization(boundary_height)
            .await
            .ok_or_else(|| {
                eyre!(
                    "finalization for requested epoch `{requested_epoch}` at \
                    boundary height `{boundary_height}` is not available \
                    locally; cannot serve request"
                )
            })?;
        let message = Voter::<Scheme<PublicKey, MinSig>, Digest>::Finalization(cert);
        recovered_global_sender
            .send(
                requested_epoch,
                Recipients::One(from),
                message.encode().freeze(),
                false,
            )
            .await
            .wrap_err(
                "failed forwarding finalization certificate to requester via `recovered` channel",
            )?;
        Ok(())
    }
}

struct Metrics {
    active_epochs: Gauge,
    latest_epoch: Gauge,
    latest_participants: Gauge,
    how_often_signer: Counter,
    how_often_verifier: Counter,
}