use std::net::SocketAddr;

use commonware_codec::{DecodeExt as _, EncodeSize, Read, Write};
use commonware_consensus::{
    Block as _, Reporter as _, simplex::signing_scheme::bls12381_threshold::Scheme, types::Epoch,
    utils,
};
use commonware_cryptography::{
    bls12381::primitives::{group::Share, poly::Public, variant::MinSig},
    ed25519::PublicKey,
};
use commonware_p2p::{Receiver, Sender, utils::mux::MuxHandle};
use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, Storage};
use commonware_storage::metadata::Metadata;
use commonware_utils::{
    sequence::U64,
    set::{Ordered, OrderedAssociated},
};
use eyre::{WrapErr as _, ensure};
use rand_core::CryptoRngCore;
use reth_ethereum::chainspec::EthChainSpec as _;
use tempo_chainspec::hardfork::TempoHardforks as _;
use tempo_dkg_onchain_artifacts::PublicOutcome;
use tracing::{Span, info, instrument, warn};

use crate::{
    consensus::block::Block,
    dkg::{
        HardforkRegime,
        ceremony::{self, Ceremony},
        manager::{
            actor::{DkgOutcome, pre_allegretto},
            validators::{self, ValidatorState},
        },
    },
    epoch::{self, is_first_block_in_epoch},
};

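/// Slots within `epoch_metadata`: the current epoch's state is stored at
/// slot 0 and the previous epoch's state at slot 1.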
const CURRENT_EPOCH_KEY: U64 = U64::new(0);
const PREVIOUS_EPOCH_KEY: U64 = U64::new(1);

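/// Slot within `dkg_outcome_metadata` holding the outcome of the most
/// recently finalized ceremony.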
const DKG_OUTCOME_KEY: U64 = U64::new(0);

impl<TContext, TPeerManager> super::Actor<TContext, TPeerManager>
where
    TContext: Clock + CryptoRngCore + commonware_runtime::Metrics + Spawner + Storage,
    TPeerManager: commonware_p2p::Manager<
        PublicKey = PublicKey,
        Peers = OrderedAssociated<PublicKey, SocketAddr>,
    >,
{
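    /// Seeds the post-allegretto epoch metadata from genesis when the
    /// allegretto hardfork is active at timestamp 0: decodes the initial DKG
    /// outcome from `genesis.extra_data`, sanity-checks the locally
    /// configured share, cross-checks the participants against the on-chain
    /// validator config, and persists the epoch-0 state.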
    #[instrument(skip_all, err)]
    pub(super) async fn post_allegretto_init(&mut self) -> eyre::Result<()> {
        let spec = self.config.execution_node.chain_spec();
        if !self.post_allegretto_metadatas.exists() && spec.is_allegretto_active_at_timestamp(0) {
            info!(
                "allegretto hardfork is active at timestamp 0, reading initial validators and public polynomial from genesis block"
            );

            let initial_dkg_outcome = PublicOutcome::decode(spec.genesis().extra_data.as_ref())
                .wrap_err_with(|| {
                    format!(
                        "failed decoding the genesis.extra_data field as an \
                        initial DKG outcome; this field must be set and it \
                        must be decodable; extra_data byte length = {}",
                        spec.genesis().extra_data.len(),
                    )
                })?;

            ensure!(
                initial_dkg_outcome.epoch == 0,
                "at genesis, the epoch must be zero, but genesis reported `{}`",
                initial_dkg_outcome.epoch
            );

            let our_share = self.config.initial_share.clone();
            if let Some(our_share) = our_share {
                let signer_or_verifier = Scheme::<_, MinSig>::new(
                    initial_dkg_outcome.participants.clone(),
                    &initial_dkg_outcome.public,
                    our_share,
                );
                ensure!(
                    matches!(signer_or_verifier, Scheme::Signer { .. }),
                    "incorrect signing share provided: the node would not be a \
                    signer in the ceremony"
                );
            }

            let initial_validators = validators::read_from_contract(
                0,
                &self.config.execution_node,
                0,
                self.config.epoch_length,
            )
            .await
            .wrap_err(
                "validator config could not be read from the genesis block's \
                validator config smart contract",
            )?;

            let initial_validator_state = ValidatorState::new(initial_validators);
            let peers_as_per_contract =
                initial_validator_state.resolve_addresses_and_merge_peers();
            ensure!(
                peers_as_per_contract.keys() == &initial_dkg_outcome.participants,
                "the DKG participants stored in the genesis extraData header \
                don't match the peers determined from the onchain contract of \
                the genesis block; \
                extraData.participants = `{:?}`; \
                contract.peers = `{:?}`",
                initial_dkg_outcome.participants,
                peers_as_per_contract.keys(),
            );

            info!(
                initial_public_polynomial = ?initial_dkg_outcome.public,
                initial_validators = ?peers_as_per_contract,
                "using public polynomial and validators read from contract",
            );

            self.post_allegretto_metadatas
                .epoch_metadata
                .put_sync(
                    CURRENT_EPOCH_KEY,
                    EpochState {
                        dkg_outcome: DkgOutcome {
                            dkg_successful: true,
                            epoch: 0,
                            participants: initial_dkg_outcome.participants,
                            public: initial_dkg_outcome.public,
                            share: self.config.initial_share.clone(),
                        },
                        validator_state: initial_validator_state.clone(),
                    },
                )
                .await
                .expect("persisting epoch state must always work");
        }
        Ok(())
    }

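    /// Drives DKG bookkeeping for every finalized post-allegretto block:
    /// rotates the epoch state and starts a fresh ceremony at the last block
    /// of an epoch, prunes stale state at the first block, advances the
    /// running ceremony according to the block's position within the epoch,
    /// and finalizes the ceremony one block before the epoch boundary.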
    #[instrument(
        parent = &cause,
        skip_all,
        fields(
            block.derived_epoch = utils::epoch(self.config.epoch_length, block.height()),
            block.height = block.height(),
            ceremony.epoch = maybe_ceremony.as_ref().map(|c| c.epoch()),
        ),
    )]
    pub(super) async fn handle_finalized_post_allegretto<TReceiver, TSender>(
        &mut self,
        cause: Span,
        block: Block,
        maybe_ceremony: &mut Option<Ceremony<ContextCell<TContext>, TReceiver, TSender>>,
        ceremony_mux: &mut MuxHandle<TSender, TReceiver>,
    ) where
        TReceiver: Receiver<PublicKey = PublicKey>,
        TSender: Sender<PublicKey = PublicKey>,
    {
        let block_epoch = utils::epoch(self.config.epoch_length, block.height());
        if block_epoch != self.current_epoch_state().epoch() {
            info!(
                block_epoch,
                actor_epoch = self.current_epoch_state().epoch(),
                "block was for an epoch other than what the actor is currently tracking; ignoring",
            );
            return;
        }

        if utils::is_last_block_in_epoch(self.config.epoch_length, block.height()).is_some() {
            self.update_and_register_current_epoch_state().await;

            maybe_ceremony.replace(self.start_post_allegretto_ceremony(ceremony_mux).await);
            return;
        }

        if is_first_block_in_epoch(self.config.epoch_length, block.height()).is_some() {
            self.enter_current_epoch_and_remove_old_state().await;

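            // Prune validator metadata for epochs three behind the current
            // one; presumably only a small trailing window is ever read again.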
            if let Some(epoch) = self.current_epoch_state().epoch().checked_sub(3) {
                self.validators_metadata.remove(&epoch.into());
                self.validators_metadata
                    .sync()
                    .await
                    .expect("metadata must always be writable");
            }
        }

        let mut ceremony = maybe_ceremony.take().expect(
            "past this point a ceremony must always be defined; the only \
            time a ceremony is not permitted to exist is exactly on the \
            boundary; did the code that last took the ceremony put it back \
            into its Option?",
        );

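        // Advance the ceremony according to where this block falls in the
        // epoch: distribute shares and process messages in the first half,
        // construct the intermediate outcome in the middle, and process the
        // dealings included in blocks in the second half.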
        match epoch::relative_position(block.height(), self.config.epoch_length) {
            epoch::RelativePosition::FirstHalf => {
                let _ = ceremony.distribute_shares().await;
                let _ = ceremony.process_messages().await;
            }
            epoch::RelativePosition::Middle => {
                let _ = ceremony.process_messages().await;
                let _ = ceremony
                    .construct_intermediate_outcome(HardforkRegime::PostAllegretto)
                    .await;
            }
            epoch::RelativePosition::SecondHalf => {
                let _ = ceremony
                    .process_dealings_in_block(&block, HardforkRegime::PostAllegretto)
                    .await;
            }
        }

        let is_one_before_boundary =
            utils::is_last_block_in_epoch(self.config.epoch_length, block.height() + 1).is_some();
        if !is_one_before_boundary {
            assert!(
                maybe_ceremony.replace(ceremony).is_none(),
                "putting back the ceremony we just took out",
            );
            return;
        }

        info!("at the penultimate height of the epoch; finalizing ceremony");

        let current_epoch = ceremony.epoch();

        let (ceremony_outcome, dkg_successful) = match ceremony.finalize() {
            Ok(outcome) => {
                self.metrics.ceremony.one_more_success();
                info!(
                    "ceremony was successful; using the new participants, polynomial and secret key"
                );
                (outcome, true)
            }
            Err(outcome) => {
                self.metrics.ceremony.one_more_failure();
                warn!(
                    "ceremony was a failure; using the old participants, polynomial and secret key"
                );
                (outcome, false)
            }
        };
        let (public, share) = ceremony_outcome.role.into_key_pair();

        self.post_allegretto_metadatas
            .dkg_outcome_metadata
            .put_sync(
                DKG_OUTCOME_KEY,
                DkgOutcome {
                    dkg_successful,
                    epoch: current_epoch + 1,
                    participants: ceremony_outcome.participants,
                    public,
                    share,
                },
            )
            .await
            .expect("must always be able to persist the DKG outcome");

        if let Some(epoch) = current_epoch.checked_sub(1) {
            let mut ceremony_metadata = self.ceremony_metadata.lock().await;
            ceremony_metadata.remove(&epoch.into());
            ceremony_metadata.sync().await.expect("metadata must sync");
        }
    }

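    /// Bridges from the pre-allegretto static validator sets into the
    /// post-allegretto regime: verifies that the on-chain validator config
    /// matches both the last static DKG ceremony and the statically known
    /// validators, seeds the post-allegretto epoch state from the
    /// pre-allegretto state, and starts the first post-allegretto ceremony.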
    #[instrument(skip_all)]
    pub(super) async fn transition_from_static_validator_sets<TReceiver, TSender>(
        &mut self,
        pre_allegretto_epoch_state: pre_allegretto::EpochState,
        mux: &mut MuxHandle<TSender, TReceiver>,
    ) -> eyre::Result<Ceremony<ContextCell<TContext>, TReceiver, TSender>>
    where
        TReceiver: Receiver<PublicKey = PublicKey>,
        TSender: Sender<PublicKey = PublicKey>,
    {
        let pre_allegretto_validator_state = self
            .validators_metadata
            .get(&pre_allegretto_epoch_state.epoch().saturating_sub(1).into())
            .cloned()
            .expect("it is enforced at startup that the validator state for epoch-1 is written");

        let on_chain_validators = super::read_validator_config_with_retry(
            &self.context,
            &self.config.execution_node,
            pre_allegretto_epoch_state.epoch(),
            self.config.epoch_length,
        )
        .await;

        ensure!(
            pre_allegretto_epoch_state.participants() == on_chain_validators.keys(),
            "ed25519 public keys of validators read from contract do not match \
            those of the last pre-allegretto static DKG ceremony; \
            DKG participants = {:?}; \
            contract = {:?}",
            pre_allegretto_epoch_state.participants(),
            on_chain_validators.keys(),
        );

        {
            let static_validators = pre_allegretto_validator_state
                .dealers()
                .iter_pairs()
                .map(|(key, val)| (key, &val.inbound))
                .collect::<OrderedAssociated<_, _>>();
            let on_chain_validators = on_chain_validators
                .iter_pairs()
                .map(|(key, val)| (key, &val.inbound))
                .collect::<OrderedAssociated<_, _>>();

            ensure!(
                static_validators == on_chain_validators,
                "static validators known to node (derived from config or \
                chainspec) do not match the validators read from the on-chain \
                contract; \
                static validators = {static_validators:?}; \
                on chain validators = {on_chain_validators:?}",
            );
        }

        let mut new_validator_state = pre_allegretto_validator_state.clone();
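        // Note: the on-chain validator set is intentionally pushed twice;
        // coming from the static regime there is no post-allegretto rotation
        // history yet, so presumably both rotation slots must be seeded with
        // the same on-chain set.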
        new_validator_state.push_on_failure(on_chain_validators.clone());
        new_validator_state.push_on_failure(on_chain_validators);

        self.post_allegretto_metadatas
            .epoch_metadata
            .put_sync(
                CURRENT_EPOCH_KEY,
                EpochState {
                    dkg_outcome: DkgOutcome {
                        dkg_successful: true,
                        epoch: pre_allegretto_epoch_state.epoch(),
                        participants: pre_allegretto_epoch_state.participants().clone(),
                        public: pre_allegretto_epoch_state.public_polynomial().clone(),
                        share: pre_allegretto_epoch_state.private_share().clone(),
                    },
                    validator_state: new_validator_state.clone(),
                },
            )
            .await
            .expect("syncing state must always work");
        self.register_current_epoch_state().await;

        Ok(self.start_post_allegretto_ceremony(mux).await)
    }

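    /// Initializes a ceremony for the current post-allegretto epoch state,
    /// wiring it to the ceremony mux and the persisted ceremony metadata,
    /// and logs and records metrics about the node's role in it.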
    #[instrument(skip_all, fields(epoch = self.post_allegretto_metadatas.current_epoch_state().unwrap().epoch()))]
    pub(super) async fn start_post_allegretto_ceremony<TReceiver, TSender>(
        &mut self,
        mux: &mut MuxHandle<TSender, TReceiver>,
    ) -> Ceremony<ContextCell<TContext>, TReceiver, TSender>
    where
        TReceiver: Receiver<PublicKey = PublicKey>,
        TSender: Sender<PublicKey = PublicKey>,
    {
        let epoch_state = self.post_allegretto_metadatas.current_epoch_state().expect(
            "the post-allegretto epoch state must exist in order to start a ceremony for it",
        );
        let config = ceremony::Config {
            namespace: self.config.namespace.clone(),
            me: self.config.me.clone(),
            public: epoch_state.public_polynomial().clone(),
            share: epoch_state.private_share().clone(),
            epoch: epoch_state.epoch(),
            dealers: epoch_state.dealer_pubkeys(),
            players: epoch_state.player_pubkeys(),
        };
        let ceremony = ceremony::Ceremony::init(
            &mut self.context,
            mux,
            self.ceremony_metadata.clone(),
            config,
            self.metrics.ceremony.clone(),
        )
        .await
        .expect("must always be able to initialize ceremony");

        info!(
            us = %self.config.me,
            n_dealers = ceremony.dealers().len(),
            dealers = ?ceremony.dealers(),
            n_players = ceremony.players().len(),
            players = ?ceremony.players(),
            as_player = ceremony.is_player(),
            as_dealer = ceremony.is_dealer(),
            n_syncing_players = epoch_state.validator_state.syncing_players().len(),
            syncing_players = ?epoch_state.validator_state.syncing_players(),
            "started a ceremony",
        );

        self.metrics
            .syncing_players
            .set(epoch_state.validator_state.syncing_players().len() as i64);

        self.metrics.post_allegretto_ceremonies.inc();

        ceremony
    }

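    /// Rotates the epoch state at an epoch boundary: moves the current state
    /// into the previous-epoch slot, combines the latest DKG outcome with
    /// the validator config read on-chain into the new current state,
    /// persists both, and registers the new state with the rest of the node.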
    #[instrument(skip_all)]
    async fn update_and_register_current_epoch_state(&mut self) {
        let old_epoch_state = self
            .post_allegretto_metadatas
            .epoch_metadata
            .remove(&CURRENT_EPOCH_KEY)
            .expect("there must always exist an epoch state");

        let dkg_outcome = self
            .post_allegretto_metadatas
            .dkg_outcome_metadata
            .get(&DKG_OUTCOME_KEY)
            .cloned()
            .expect(
                "when updating the current epoch state, there must be a DKG \
                outcome of some ceremony",
            );

        assert_eq!(
            old_epoch_state.epoch() + 1,
            dkg_outcome.epoch,
            "sanity check: the old epoch must be exactly one behind the new outcome's epoch"
        );

        let syncing_players = super::read_validator_config_with_retry(
            &self.context,
            &self.config.execution_node,
            dkg_outcome.epoch,
            self.config.epoch_length,
        )
        .await;

        let mut new_validator_state = old_epoch_state.validator_state.clone();
        if dkg_outcome.dkg_successful {
            new_validator_state.push_on_success(syncing_players);
        } else {
            new_validator_state.push_on_failure(syncing_players);
        }

        self.post_allegretto_metadatas.epoch_metadata.put(
            CURRENT_EPOCH_KEY,
            EpochState {
                dkg_outcome,
                validator_state: new_validator_state,
            },
        );
        self.post_allegretto_metadatas
            .epoch_metadata
            .put(PREVIOUS_EPOCH_KEY, old_epoch_state);

        self.post_allegretto_metadatas
            .epoch_metadata
            .sync()
            .await
            .expect("must always be able to persist epoch state");

        self.register_current_epoch_state().await;
    }

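    /// Called on the first block of an epoch: drops the stored
    /// previous-epoch state (post-allegretto if present, otherwise
    /// pre-allegretto), reports the exited epoch to the epoch manager, and
    /// prunes old validator metadata.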
    async fn enter_current_epoch_and_remove_old_state(&mut self) {
        let epoch_to_shutdown = if let Some(old_epoch_state) = self
            .post_allegretto_metadatas
            .epoch_metadata
            .remove(&PREVIOUS_EPOCH_KEY)
        {
            self.post_allegretto_metadatas
                .epoch_metadata
                .sync()
                .await
                .expect("must always be able to persist state");
            Some(old_epoch_state.epoch())
        } else {
            self.pre_allegretto_metadatas
                .delete_previous_epoch_state()
                .await
                .map(|old_state| old_state.epoch())
        };

        if let Some(epoch) = epoch_to_shutdown {
            self.config
                .epoch_manager
                .report(epoch::Exit { epoch }.into())
                .await;
        }

        if let Some(epoch) = epoch_to_shutdown.and_then(|epoch| epoch.checked_sub(2)) {
            self.validators_metadata.remove(&epoch.into());
            self.validators_metadata
                .sync()
                .await
                .expect("must always be able to persist data");
        }
    }
}

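/// The on-disk metadata stores backing post-allegretto operation: one for
/// the current and previous epoch states, and one for the most recent DKG
/// outcome.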
pub(super) struct Metadatas<TContext>
where
    TContext: Clock + Metrics + Storage,
{
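    /// Holds the current epoch state at `CURRENT_EPOCH_KEY` and the previous
    /// epoch state at `PREVIOUS_EPOCH_KEY`.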
    epoch_metadata: Metadata<TContext, U64, EpochState>,

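    /// Holds the outcome of the most recently finalized ceremony at
    /// `DKG_OUTCOME_KEY`.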
    dkg_outcome_metadata: Metadata<TContext, U64, DkgOutcome>,
}

impl<TContext> Metadatas<TContext>
where
    TContext: Clock + Metrics + Storage,
{
    pub(super) async fn init(context: &TContext, partition_prefix: &str) -> Self {
        let epoch_metadata = Metadata::init(
            context.with_label("post_allegretto_epoch_metadata"),
            commonware_storage::metadata::Config {
                partition: format!("{partition_prefix}_post_allegretto_current_epoch"),
                codec_config: (),
            },
        )
        .await
        .expect("must be able to initialize metadata on disk to function");

        let dkg_outcome_metadata = Metadata::init(
            context.with_label("dkg_outcome_metadata"),
            commonware_storage::metadata::Config {
                partition: format!("{partition_prefix}_next_dkg_outcome"),
                codec_config: (),
            },
        )
        .await
        .expect("must be able to initialize metadata on disk to function");

        Self {
            epoch_metadata,
            dkg_outcome_metadata,
        }
    }

    pub(super) fn current_epoch_state(&self) -> Option<&EpochState> {
        self.epoch_metadata.get(&CURRENT_EPOCH_KEY)
    }

    pub(super) fn previous_epoch_state(&self) -> Option<&EpochState> {
        self.epoch_metadata.get(&PREVIOUS_EPOCH_KEY)
    }

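    /// Returns the public part of the most recent DKG outcome, preferring a
    /// freshly finalized ceremony outcome and falling back to the outcome
    /// stored with the current epoch state.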
    pub(super) fn dkg_outcome(&self) -> Option<PublicOutcome> {
        if let Some(dkg_outcome) = self.dkg_outcome_metadata.get(&DKG_OUTCOME_KEY) {
            Some(PublicOutcome {
                epoch: dkg_outcome.epoch,
                participants: dkg_outcome.participants.clone(),
                public: dkg_outcome.public.clone(),
            })
        } else {
            self.epoch_metadata
                .get(&CURRENT_EPOCH_KEY)
                .map(|epoch_state| PublicOutcome {
                    epoch: epoch_state.dkg_outcome.epoch,
                    participants: epoch_state.dkg_outcome.participants.clone(),
                    public: epoch_state.dkg_outcome.public.clone(),
                })
        }
    }

    pub(super) fn exists(&self) -> bool {
        self.current_epoch_state().is_some()
    }
}

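/// Everything the actor needs to operate within one epoch: the DKG outcome
/// that produced the epoch's keys and participants, plus the validator
/// rotation state used to derive dealers and players.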
#[derive(Clone, Debug)]
pub(super) struct EpochState {
    pub(super) dkg_outcome: DkgOutcome,
    pub(super) validator_state: ValidatorState,
}

impl EpochState {
    pub(super) fn epoch(&self) -> Epoch {
        self.dkg_outcome.epoch
    }

    pub(super) fn participants(&self) -> &Ordered<PublicKey> {
        &self.dkg_outcome.participants
    }

    pub(super) fn public_polynomial(&self) -> &Public<MinSig> {
        &self.dkg_outcome.public
    }

    pub(super) fn private_share(&self) -> &Option<Share> {
        &self.dkg_outcome.share
    }

    pub(super) fn dealer_pubkeys(&self) -> Ordered<PublicKey> {
        self.validator_state.dealer_pubkeys()
    }

    pub(super) fn player_pubkeys(&self) -> Ordered<PublicKey> {
        self.validator_state.player_pubkeys()
    }
}

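// The codec impls below simply concatenate the two fields; `write` and
// `read_cfg` must keep the same field order for round-trips to succeed.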
impl Write for EpochState {
    fn write(&self, buf: &mut impl bytes::BufMut) {
        self.dkg_outcome.write(buf);
        self.validator_state.write(buf);
    }
}

impl EncodeSize for EpochState {
    fn encode_size(&self) -> usize {
        self.dkg_outcome.encode_size() + self.validator_state.encode_size()
    }
}

impl Read for EpochState {
    type Cfg = ();

    fn read_cfg(
        buf: &mut impl bytes::Buf,
        _cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        let dkg_outcome = DkgOutcome::read_cfg(buf, &())?;
        let validator_state = ValidatorState::read_cfg(buf, &())?;
        Ok(Self {
            dkg_outcome,
            validator_state,
        })
    }
}