tempo_commonware_node/dkg/manager/actor/pre_allegretto.rs

use std::net::SocketAddr;

use commonware_codec::{EncodeSize, RangeCfg, Read, ReadExt as _, Write, varint::UInt};
use commonware_consensus::{Block as _, Reporter as _, types::Epoch, utils};
use commonware_cryptography::{
    bls12381::primitives::{group::Share, poly::Public, variant::MinSig},
    ed25519::PublicKey,
};
use commonware_p2p::{Receiver, Sender, utils::mux::MuxHandle};
use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, Storage};
use commonware_storage::metadata::Metadata;
use commonware_utils::{
    quorum,
    sequence::U64,
    set::{Ordered, OrderedAssociated},
};
use eyre::{OptionExt as _, WrapErr as _};
use rand_core::CryptoRngCore;
use tempo_chainspec::hardfork::TempoHardforks;
use tempo_dkg_onchain_artifacts::PublicOutcome;
use tracing::{Span, info, instrument, warn};

use crate::{
    consensus::block::Block,
    dkg::{
        HardforkRegime,
        ceremony::{self, Ceremony},
        manager::validators::ValidatorState,
    },
    epoch,
};

/// Metadata key under which the current epoch's state is persisted.
const CURRENT_EPOCH_KEY: U64 = U64::new(0);
/// Metadata key under which the previous epoch's state is persisted.
const PREVIOUS_EPOCH_KEY: U64 = U64::new(1);

impl<TContext, TPeerManager> super::Actor<TContext, TPeerManager>
where
    TContext: Clock + CryptoRngCore + commonware_runtime::Metrics + Spawner + Storage,
    TPeerManager: commonware_p2p::Manager<
            PublicKey = PublicKey,
            Peers = OrderedAssociated<PublicKey, SocketAddr>,
        >,
{
    /// Runs the pre-allegretto initialization routines.
    ///
    /// This is a no-op if post-allegretto artifacts exist on disk and no
    /// pre-allegretto artifacts remain. The assumption is that the last
    /// pre-allegretto ceremony deletes its state from disk.
    ///
    /// If neither pre- nor post-allegretto artifacts are found, this method
    /// assumes that the node is starting from genesis.
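    ///
    /// Pre-allegretto state lives under two metadata keys: CURRENT_EPOCH_KEY
    /// holds the epoch currently being run, while PREVIOUS_EPOCH_KEY holds the
    /// preceding epoch's state until the epoch manager has been told to exit
    /// that epoch.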
    #[instrument(skip_all, err)]
    pub(super) async fn pre_allegretto_init(&mut self) -> eyre::Result<()> {
        if !self.post_allegretto_metadatas.exists() {
            let spec = self.config.execution_node.chain_spec();
            let public_polynomial = spec
                .info
                .public_polynomial()
                .clone()
                .ok_or_eyre("chainspec did not contain publicPolynomial; cannot go on without it")?
                .into_inner();

            let validators = spec
                .info
                .validators()
                .clone()
                .ok_or_eyre("chainspec did not contain validators; cannot go on without them")?
                .into_inner();

            if self
                .pre_allegretto_metadatas
                .epoch_metadata
                .get(&CURRENT_EPOCH_KEY)
                .is_none()
            {
                self.pre_allegretto_metadatas
                    .epoch_metadata
                    .put_sync(
                        CURRENT_EPOCH_KEY,
                        EpochState {
                            epoch: 0,
                            participants: validators.keys().clone(),
                            public: public_polynomial,
                            share: self.config.initial_share.clone(),
                        },
                    )
                    .await
                    .expect("must always be able to persist state");
            }

            // Safeguard when updating from older binaries that might not yet
            // have written the validators metadata.
            //
            // Note that pre-allegretto the validator set never changes.
            let current_epoch = self
                .pre_allegretto_metadatas
                .epoch_metadata
                .get(&CURRENT_EPOCH_KEY)
                .expect("we ensured above that the epoch state is initialized")
                .epoch();
            self.validators_metadata
                .put_sync(
                    // Write the validators for the *previous* epoch. This assumes
                    // that after this state is written, self.register_current_epoch_state
                    // is called, which will set the validators for the *current*
                    // epoch.
                    current_epoch.saturating_sub(1).into(),
                    ValidatorState::with_unknown_contract_state(validators.clone()),
                )
                .await
                .expect("must always be able to write state");
        }
        Ok(())
    }

    /// Handles a finalized block.
    ///
    /// Depending on where within its epoch the block's height falls, this
    /// method exhibits different behavior:
    ///
    /// + first height of an epoch: notify the epoch manager that the previous
    ///   epoch can be shut down.
    /// + first half of an epoch: distribute the shares generated during the
    ///   DKG ceremony, and collect shares from other dealers and acks from
    ///   other players.
    /// + exact middle of an epoch: generate the intermediate outcome of the
    ///   ceremony.
    /// + second half of an epoch: read intermediate outcomes from blocks.
    /// + second-to-last height of an epoch: generate the overall ceremony
    ///   outcome, update CURRENT_EPOCH_KEY.
    /// + last height of an epoch: notify the epoch manager that a new epoch can
    ///   be started, using the outcome of the last epoch. Start a new ceremony
    ///   for the next epoch.
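    ///
    /// As a concrete illustration (assuming epochs are consecutive height
    /// ranges of length `epoch_length`): with an epoch length of 10, epoch 1
    /// spans heights 10..=19. Height 10 is the first height, 18 the
    /// second-to-last (where the ceremony is finalized and CURRENT_EPOCH_KEY
    /// is advanced), and 19 the last (where the next ceremony is started).
    /// Height 15 would be the exact middle, assuming `epoch::relative_position`
    /// places it at the epoch start plus `epoch_length / 2`.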
    #[instrument(
        parent = &cause,
        skip_all,
        fields(
            block.derived_epoch = utils::epoch(self.config.epoch_length, block.height()),
            block.height = block.height(),
            block.timestamp = block.timestamp(),
            latest_epoch = self.current_epoch_state().epoch(),
        ),
    )]
    pub(super) async fn handle_finalized_pre_allegretto<TReceiver, TSender>(
        &mut self,
        cause: Span,
        block: Block,
        maybe_ceremony: &mut Option<Ceremony<ContextCell<TContext>, TReceiver, TSender>>,
        ceremony_mux: &mut MuxHandle<TSender, TReceiver>,
    ) where
        TReceiver: Receiver<PublicKey = PublicKey>,
        TSender: Sender<PublicKey = PublicKey>,
    {
        // Special case the last height.
        //
        // On the last height, the new ("current") ceremony can be entered
        // because that is what provides the "genesis" of the new epoch.
        if let Some(block_epoch) =
            utils::is_last_block_in_epoch(self.config.epoch_length, block.height())
        {
            let epoch_state = self.current_epoch_state();
            if block_epoch + 1 == epoch_state.epoch() {
                if self
                    .config
                    .execution_node
                    .chain_spec()
                    .is_allegretto_active_at_timestamp(block.timestamp())
                {
                    info!(
                        "block timestamp is after allegretto hardfork; attempting \
                        to transition to dynamic validator sets by reading validators \
                        from smart contract",
                    );
                    match self
                        .transition_to_dynamic_validator_sets(ceremony_mux)
                        .await
                    {
                        Ok(ceremony) => {
                            maybe_ceremony.replace(ceremony);
                            info!(
                                "transitioning to dynamic validator sets was successful; \
                                deleting current pre-allegretto epoch state and leaving \
                                DKG logic to the post-hardfork routines",
                            );
                            self.pre_allegretto_metadatas
                                .delete_current_epoch_state()
                                .await;
                            return;
                        }
                        Err(error) => {
                            self.metrics.failed_allegretto_transitions.inc();
                            warn!(
                                %error,
                                "transitioning to dynamic validator sets was not \
                                successful; will attempt again next epoch"
                            );
                        }
                    }
                }

                // NOTE: This check acts as restart protection: pre-allegretto,
                // CURRENT_EPOCH_KEY is updated on the block at *last height - 1*.
                // If a node restarts after that, it immediately starts a
                // ceremony for CURRENT_EPOCH_KEY on boot, and then starts
                // processing *last height*.
                //
                // Without this check, processing *last height* would attempt
                // to create a second ceremony on the same mux subchannel and
                // fail.
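                //
                // Concretely (illustrative numbers, with epoch_length = 10):
                // the key is advanced while processing height 18; a node
                // restarting before height 19 creates the new epoch's ceremony
                // on startup, so the handler for height 19 must not create it
                // again.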
                if maybe_ceremony.is_none()
                    || maybe_ceremony
                        .as_ref()
                        .is_some_and(|ceremony| ceremony.epoch() != epoch_state.epoch())
                {
                    maybe_ceremony.replace(self.start_pre_allegretto_ceremony(ceremony_mux).await);
                }
                self.register_current_epoch_state().await;
            } else {
                warn!(
                    "block was a boundary block, but not the last block of the \
                    previous epoch; ignoring it"
                );
            }
            return;
        }

        // Notify the epoch manager that the first height of the new epoch
        // was entered and the previous epoch can be exited.
        //
        // Recall, for an epoch length E the first heights are 0E, 1E, 2E, ...
        if block.height().is_multiple_of(self.config.epoch_length)
            && let Some(old_epoch_state) = self
                .pre_allegretto_metadatas
                .epoch_metadata
                .remove(&PREVIOUS_EPOCH_KEY)
        {
            self.config
                .epoch_manager
                .report(
                    epoch::Exit {
                        epoch: old_epoch_state.epoch,
                    }
                    .into(),
                )
                .await;
            self.pre_allegretto_metadatas
                .epoch_metadata
                .sync()
                .await
                .expect("must always be able to persist state");
        }

        let mut ceremony = maybe_ceremony
            .take()
            .expect("a ceremony must always exist except for the last block");

        match epoch::relative_position(block.height(), self.config.epoch_length) {
            epoch::RelativePosition::FirstHalf => {
                let _ = ceremony.distribute_shares().await;
                let _ = ceremony.process_messages().await;
            }
            epoch::RelativePosition::Middle => {
                let _ = ceremony.process_messages().await;
                let _ = ceremony
                    .construct_intermediate_outcome(HardforkRegime::PreAllegretto)
                    .await;
            }
            epoch::RelativePosition::SecondHalf => {
                let _ = ceremony
                    .process_dealings_in_block(&block, HardforkRegime::PreAllegretto)
                    .await;
            }
        }

        let is_one_before_boundary =
            utils::is_last_block_in_epoch(self.config.epoch_length, block.height() + 1).is_some();

        if !is_one_before_boundary {
            assert!(
                maybe_ceremony.replace(ceremony).is_none(),
                "ceremony was taken out of the option and is being put back"
            );
            return;
        }

        // XXX: Need to finalize on the second-to-last height of the epoch so
        // that the information becomes available on the last height and can be
        // stored on chain.
        info!("on second-to-last height of epoch; finalizing ceremony");

        let next_epoch = ceremony.epoch() + 1;

        let ceremony_outcome = match ceremony.finalize() {
            Ok(outcome) => {
                self.metrics.ceremony.one_more_success();
                info!(
                    "ceremony was successful; using the new participants, polynomial and secret key"
                );
                outcome
            }
            Err(outcome) => {
                self.metrics.ceremony.one_more_failure();
                warn!(
                    "ceremony was a failure; using the old participants, polynomial and secret key"
                );
                outcome
            }
        };
        let (public, share) = ceremony_outcome.role.into_key_pair();

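        // Rotate the epoch state: what was under CURRENT_EPOCH_KEY moves to
        // PREVIOUS_EPOCH_KEY, and the ceremony outcome becomes the new current
        // state. Both writes are persisted by the single sync below.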
        let old_epoch_state = self
            .pre_allegretto_metadatas
            .epoch_metadata
            .remove(&CURRENT_EPOCH_KEY)
            .expect("there must always be a current epoch state");

        self.pre_allegretto_metadatas
            .epoch_metadata
            .put(PREVIOUS_EPOCH_KEY, old_epoch_state);

        let new_epoch_state = EpochState {
            epoch: next_epoch,
            participants: ceremony_outcome.participants,
            public,
            share,
        };
        self.pre_allegretto_metadatas
            .epoch_metadata
            .put(CURRENT_EPOCH_KEY, new_epoch_state.clone());

        self.pre_allegretto_metadatas
            .epoch_metadata
            .sync()
            .await
            .expect("must always be able to write epoch state to disk");

        // Prune ceremony metadata that is older than the previous epoch.
        if let Some(epoch) = new_epoch_state.epoch.checked_sub(2) {
            let mut ceremony_metadata = self.ceremony_metadata.lock().await;
            ceremony_metadata.remove(&epoch.into());
            ceremony_metadata.sync().await.expect("metadata must sync");
        }
    }

    #[instrument(skip_all, fields(epoch = self.pre_allegretto_metadatas.current_epoch_state().unwrap().epoch()))]
    pub(super) async fn start_pre_allegretto_ceremony<TReceiver, TSender>(
        &mut self,
        mux: &mut MuxHandle<TSender, TReceiver>,
    ) -> Ceremony<ContextCell<TContext>, TReceiver, TSender>
    where
        TReceiver: Receiver<PublicKey = PublicKey>,
        TSender: Sender<PublicKey = PublicKey>,
    {
        let epoch_state = self
            .pre_allegretto_metadatas
            .epoch_metadata
            .get(&CURRENT_EPOCH_KEY)
            .expect("the epoch state must always exist during the lifetime of the actor");
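
        // Pre-allegretto the validator set is static, so the same participant
        // set acts as both dealers and players below.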
        let config = ceremony::Config {
            namespace: self.config.namespace.clone(),
            me: self.config.me.clone(),
            public: epoch_state.public.clone(),
            share: epoch_state.share.clone(),
            epoch: epoch_state.epoch,
            dealers: epoch_state.participants.clone(),
            players: epoch_state.participants.clone(),
        };

        let ceremony = ceremony::Ceremony::init(
            &mut self.context,
            mux,
            self.ceremony_metadata.clone(),
            config,
            self.metrics.ceremony.clone(),
        )
        .await
        .expect("must always be able to initialize ceremony");

        info!(
            us = %self.config.me,
            n_dealers = ceremony.dealers().len(),
            dealers = ?ceremony.dealers(),
            n_players = ceremony.players().len(),
            players = ?ceremony.players(),
            as_player = ceremony.is_player(),
            as_dealer = ceremony.is_dealer(),
            "started a ceremony",
        );

        self.metrics.pre_allegretto_ceremonies.inc();
        ceremony
    }

    async fn transition_to_dynamic_validator_sets<TReceiver, TSender>(
        &mut self,
        mux: &mut MuxHandle<TSender, TReceiver>,
    ) -> eyre::Result<Ceremony<ContextCell<TContext>, TReceiver, TSender>>
    where
        TReceiver: Receiver<PublicKey = PublicKey>,
        TSender: Sender<PublicKey = PublicKey>,
    {
        let epoch_state = self
            .pre_allegretto_metadatas
            .epoch_metadata
            .get(&CURRENT_EPOCH_KEY)
            .cloned()
            .expect(
                "when transitioning from pre-allegretto static validator sets to \
                post-allegretto dynamic validator sets the pre-allegretto epoch \
                state must exist",
            );

        self.transition_from_static_validator_sets(epoch_state, mux)
            .await
            .wrap_err("hand-over to post-allegretto dynamic validator set logic failed")
    }
}

pub(super) struct Metadatas<TContext>
where
    TContext: Clock + Metrics + Storage,
{
    /// Persisted information on the current epoch for DKG ceremonies that were
    /// started before the allegretto hardfork.
    epoch_metadata: Metadata<TContext, U64, EpochState>,
}

impl<TContext> Metadatas<TContext>
where
    TContext: Clock + Metrics + Storage,
{
    pub(super) async fn init(context: &TContext, partition_prefix: &str) -> Self {
        let epoch_metadata = Metadata::init(
            context.with_label("post_allegretto_epoch_metadata"),
            commonware_storage::metadata::Config {
                // XXX: the prefix of this partition must stay fixed to remain
                // backward compatible with state written before the allegretto
                // hardfork.
                partition: format!("{partition_prefix}_current_epoch"),
                codec_config: (),
            },
        )
        .await
        .expect("must be able to initialize metadata on disk to function");

        Self { epoch_metadata }
    }

    pub(super) fn dkg_outcome(&self) -> Option<PublicOutcome> {
        let epoch_state = self.current_epoch_state()?;
        Some(PublicOutcome {
            epoch: epoch_state.epoch(),
            participants: epoch_state.participants().clone(),
            public: epoch_state.public_polynomial().clone(),
        })
    }

    pub(super) fn previous_epoch_state(&self) -> Option<&EpochState> {
        self.epoch_metadata.get(&PREVIOUS_EPOCH_KEY)
    }

    pub(super) fn current_epoch_state(&self) -> Option<&EpochState> {
        self.epoch_metadata.get(&CURRENT_EPOCH_KEY)
    }

    /// Removes the current epoch state from disk and syncs the metadata.
    ///
    /// Returns the removed epoch state, if it existed.
    async fn delete_current_epoch_state(&mut self) -> Option<EpochState> {
        let current_state = self.epoch_metadata.remove(&CURRENT_EPOCH_KEY);
        self.epoch_metadata
            .sync()
            .await
            .expect("must always be able to sync state to disk");
        current_state
    }

    pub(super) async fn delete_previous_epoch_state(&mut self) -> Option<EpochState> {
        let previous_state = self.epoch_metadata.remove(&PREVIOUS_EPOCH_KEY);
        self.epoch_metadata
            .sync()
            .await
            .expect("must always be able to sync state to disk");
        previous_state
    }
}

/// The state of an epoch: its participants, the public polynomial, and this
/// node's private key share (if it holds one).
#[derive(Clone)]
pub(super) struct EpochState {
    epoch: Epoch,
    participants: Ordered<PublicKey>,
    public: Public<MinSig>,
    share: Option<Share>,
}

// Manual `Debug` implementation so that the private share is redacted and
// never ends up in logs.
impl std::fmt::Debug for EpochState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EpochState")
            .field("epoch", &self.epoch)
            .field("participants", &self.participants)
            .field("public", &self.public)
            .field("share", &self.share.as_ref().map(|_| "<private share>"))
            .finish()
    }
}

impl EpochState {
    pub(super) fn epoch(&self) -> Epoch {
        self.epoch
    }

    pub(super) fn participants(&self) -> &Ordered<PublicKey> {
        &self.participants
    }

    pub(super) fn public_polynomial(&self) -> &Public<MinSig> {
        &self.public
    }

    pub(super) fn private_share(&self) -> &Option<Share> {
        &self.share
    }
}

impl Write for EpochState {
    fn write(&self, buf: &mut impl bytes::BufMut) {
        UInt(self.epoch).write(buf);
        self.participants.write(buf);
        self.public.write(buf);
        self.share.write(buf);
    }
}

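// `encode_size` must report exactly the number of bytes that `write` above
// emits, so the two implementations mirror each other field by field.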
impl EncodeSize for EpochState {
    fn encode_size(&self) -> usize {
        UInt(self.epoch).encode_size()
            + self.participants.encode_size()
            + self.public.encode_size()
            + self.share.encode_size()
    }
}

impl Read for EpochState {
    type Cfg = ();

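    // Decoding must mirror the field order of `write`: the participant set is
    // read before the public polynomial so that the polynomial's expected
    // degree can be derived from the quorum of the participant count.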
    fn read_cfg(
        buf: &mut impl bytes::Buf,
        _cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        let epoch = UInt::read(buf)?.into();
        let participants = Ordered::read_cfg(buf, &(RangeCfg::from(0..=usize::MAX), ()))?;
        let public =
            Public::<MinSig>::read_cfg(buf, &(quorum(participants.len() as u32) as usize))?;
        let share = Option::<Share>::read_cfg(buf, &())?;
        Ok(Self {
            epoch,
            participants,
            public,
            share,
        })
    }
}