use std::{net::SocketAddr, sync::Arc, time::Duration};

use commonware_codec::{
    Encode as _, EncodeSize, RangeCfg, Read, ReadExt as _, Write, varint::UInt,
};
use commonware_consensus::{Block as _, Reporter, types::Epoch, utils};
use commonware_cryptography::{
    Signer as _,
    bls12381::primitives::{group::Share, poly::Public, variant::MinSig},
    ed25519::PublicKey,
};
use commonware_p2p::{
    Receiver, Sender,
    utils::{mux, mux::MuxHandle},
};
use commonware_runtime::{Clock, ContextCell, Handle, Metrics as _, Spawner, Storage, spawn_cell};
use commonware_storage::metadata::Metadata;
use commonware_utils::{
    Acknowledgement, quorum,
    sequence::U64,
    set::{Ordered, OrderedAssociated},
    union,
};

use eyre::{OptionExt as _, eyre};
use futures::{StreamExt as _, channel::mpsc, lock::Mutex};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use rand_core::CryptoRngCore;
use tempo_chainspec::hardfork::TempoHardforks as _;
use tempo_node::TempoFullNode;
use tracing::{Span, error, info, instrument, warn};

use crate::{
    consensus::block::Block,
    dkg::{
        ceremony,
        ceremony::{Ceremony, OUTCOME_NAMESPACE},
        manager::{
            DecodedValidator,
            ingress::{Finalize, GetIntermediateDealing, GetOutcome},
            validators::{self, ValidatorState},
        },
    },
    epoch,
};

mod post_allegretto;
mod pre_allegretto;

pub(crate) struct Actor<TContext, TPeerManager>
where
    TContext: Clock + commonware_runtime::Metrics + Storage,
    TPeerManager: commonware_p2p::Manager,
{
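    /// The actor's configuration.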
    config: super::Config<TPeerManager>,
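    /// Execution context used for spawning tasks, storage, and metrics.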
    context: ContextCell<TContext>,
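    /// Mailbox over which the actor receives its commands.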
    mailbox: mpsc::UnboundedReceiver<super::Message>,
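    /// Persistent store of ceremony state, kept behind a shared mutex.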
    ceremony_metadata: Arc<Mutex<Metadata<ContextCell<TContext>, U64, ceremony::State>>>,
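    /// Persistent stores used by the post-allegretto logic.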
    post_allegretto_metadatas: post_allegretto::Metadatas<ContextCell<TContext>>,
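    /// Persistent stores used by the pre-allegretto logic.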
    pre_allegretto_metadatas: pre_allegretto::Metadatas<ContextCell<TContext>>,
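    /// Persistent store of validator state, keyed by epoch.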
    validators_metadata: Metadata<ContextCell<TContext>, U64, ValidatorState>,
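    /// Prometheus metrics exposed by this actor.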
    metrics: Metrics,
}

impl<TContext, TPeerManager> Actor<TContext, TPeerManager>
where
    TContext: Clock + CryptoRngCore + commonware_runtime::Metrics + Spawner + Storage,
    TPeerManager: commonware_p2p::Manager<
        PublicKey = PublicKey,
        Peers = OrderedAssociated<PublicKey, SocketAddr>,
    >,
{
    pub(super) async fn new(
        config: super::Config<TPeerManager>,
        context: TContext,
        mailbox: mpsc::UnboundedReceiver<super::ingress::Message>,
    ) -> eyre::Result<Self> {
        let context = ContextCell::new(context);

        let ceremony_metadata = Metadata::init(
            context.with_label("ceremony_metadata"),
            commonware_storage::metadata::Config {
                partition: format!("{}_ceremony", config.partition_prefix),
                codec_config: (),
            },
        )
        .await
        .expect("must be able to initialize metadata on disk to function");

        let post_allegretto_metadatas =
            post_allegretto::Metadatas::init(&context, &config.partition_prefix).await;

        let pre_allegretto_metadatas =
            pre_allegretto::Metadatas::init(&context, &config.partition_prefix).await;

        let validators_metadata = Metadata::init(
            context.with_label("validators_metadata"),
            commonware_storage::metadata::Config {
                partition: format!("{}_validators", config.partition_prefix),
                codec_config: (),
            },
        )
        .await
        .expect("must be able to initialize metadata on disk to function");

        let syncing_players = Gauge::default();

        let peers = Gauge::default();

        let pre_allegretto_ceremonies = Counter::default();
        let post_allegretto_ceremonies = Counter::default();
        let failed_allegretto_transitions = Counter::default();

        context.register(
            "syncing_players",
            "how many syncing players were registered; these will become players in the next ceremony",
            syncing_players.clone(),
        );

        context.register(
            "peers",
            "how many peers are registered overall for the latest epoch",
            peers.clone(),
        );

        context.register(
            "pre_allegretto_ceremonies",
            "how many ceremonies the node ran pre-allegretto",
            pre_allegretto_ceremonies.clone(),
        );
        context.register(
            "post_allegretto_ceremonies",
            "how many ceremonies the node ran post-allegretto",
            post_allegretto_ceremonies.clone(),
        );

        context.register(
            "failed_allegretto_transitions",
            "how many transitions from pre- to post-allegretto failed",
            failed_allegretto_transitions.clone(),
        );

        let ceremony = ceremony::Metrics::register(&context);

        let metrics = Metrics {
            peers,
            syncing_players,
            pre_allegretto_ceremonies,
            post_allegretto_ceremonies,
            failed_allegretto_transitions,
            ceremony,
        };

        Ok(Self {
            config,
            context,
            mailbox,
            ceremony_metadata: Arc::new(Mutex::new(ceremony_metadata)),
            post_allegretto_metadatas,
            pre_allegretto_metadatas,
            validators_metadata,
            metrics,
        })
    }

    async fn run(
        mut self,
        (sender, receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        if self.post_allegretto_init().await.is_err() {
            return;
        }
        if self.pre_allegretto_init().await.is_err() {
            return;
        }

        self.register_previous_epoch_state().await;
        self.register_current_epoch_state().await;

        let (mux, mut ceremony_mux) = mux::Muxer::new(
            self.context.with_label("ceremony_mux"),
            sender,
            receiver,
            self.config.mailbox_size,
        );
        mux.start();

        let mut ceremony = Some(
            self.start_ceremony_for_current_epoch_state(&mut ceremony_mux)
                .await,
        );

        while let Some(message) = self.mailbox.next().await {
            let cause = message.cause;
            match message.command {
                super::Command::Finalize(finalize) => {
                    self.handle_finalized(cause, finalize, &mut ceremony, &mut ceremony_mux)
                        .await;
                }
                super::Command::GetIntermediateDealing(get_ceremony_deal) => {
                    let _: Result<_, _> = self
                        .handle_get_intermediate_dealing(
                            cause,
                            get_ceremony_deal,
                            ceremony.as_mut(),
                        )
                        .await;
                }
                super::Command::GetOutcome(get_ceremony_outcome) => {
                    let _: Result<_, _> =
                        self.handle_get_outcome(cause, get_ceremony_outcome).await;
                }

                super::Command::VerifyDealing(verify_dealing) => {
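                    // Dealings are verified differently before and after the
                    // allegretto hardfork; infer which path applies from
                    // whichever epoch-state store is currently populated.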
                    let outcome = if self
                        .post_allegretto_metadatas
                        .current_epoch_state()
                        .is_some()
                    {
                        verify_dealing
                            .dealing
                            .verify(&union(&self.config.namespace, OUTCOME_NAMESPACE))
                    } else if self
                        .pre_allegretto_metadatas
                        .current_epoch_state()
                        .is_some()
                    {
                        verify_dealing.dealing.verify_pre_allegretto(&union(
                            &self.config.namespace,
                            OUTCOME_NAMESPACE,
                        ))
                    } else {
                        error!("could not determine if we are running pre- or post-allegretto");
                        continue;
                    };
                    let _ = verify_dealing.response.send(outcome);
                }
            }
        }
    }

    pub(crate) fn start(
        mut self,
        dkg_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(dkg_channel).await)
    }

    #[instrument(
        parent = &cause,
        skip_all,
        fields(
            request.epoch = epoch,
            ceremony.epoch = ceremony.as_ref().map(|c| c.epoch()),
        ),
        err,
    )]
    async fn handle_get_intermediate_dealing<TReceiver, TSender>(
        &mut self,
        cause: Span,
        GetIntermediateDealing { epoch, response }: GetIntermediateDealing,
        ceremony: Option<&mut Ceremony<ContextCell<TContext>, TReceiver, TSender>>,
    ) -> eyre::Result<()>
    where
        TReceiver: Receiver<PublicKey = PublicKey>,
        TSender: Sender<PublicKey = PublicKey>,
    {
        let ceremony =
            ceremony.ok_or_eyre("no ceremony running, can't serve intermediate dealings")?;
        let mut outcome = None;

        'get_outcome: {
            if ceremony.epoch() != epoch {
                warn!(
                    ceremony.epoch = %ceremony.epoch(),
                    "deal outcome for ceremony of different epoch requested",
                );
                break 'get_outcome;
            }
            outcome = ceremony.deal_outcome().cloned();
        }

        response
            .send(outcome)
            .map_err(|_| eyre!("failed returning outcome because requester went away"))
    }

    #[instrument(
        parent = &cause,
        skip_all,
        err,
    )]
    async fn handle_get_outcome(
        &mut self,
        cause: Span,
        GetOutcome { response }: GetOutcome,
    ) -> eyre::Result<()> {
        let outcome = if let Some(outcome) = self.post_allegretto_metadatas.dkg_outcome() {
            outcome
        } else if let Some(outcome) = self.pre_allegretto_metadatas.dkg_outcome() {
            outcome
        } else {
            return Err(eyre!(
                "no DKG outcome was found in state, even though one must exist; \
                 it is derived from the epoch state of either the pre- or \
                 post-allegretto logic"
            ));
        };

        response
            .send(outcome)
            .map_err(|_| eyre!("failed returning outcome because requester went away"))
    }

    #[instrument(
        parent = &cause,
        skip_all,
        fields(
            block.derived_epoch = utils::epoch(self.config.epoch_length, block.height()),
            block.height = block.height(),
            ceremony.epoch = maybe_ceremony.as_ref().map(|c| c.epoch()),
        ),
    )]
    async fn handle_finalized<TReceiver, TSender>(
        &mut self,
        cause: Span,
        Finalize {
            block,
            acknowledgment,
        }: Finalize,
        maybe_ceremony: &mut Option<Ceremony<ContextCell<TContext>, TReceiver, TSender>>,
        ceremony_mux: &mut MuxHandle<TSender, TReceiver>,
    ) where
        TReceiver: Receiver<PublicKey = PublicKey>,
        TSender: Sender<PublicKey = PublicKey>,
    {
        if self.is_running_post_allegretto(&block) {
            self.handle_finalized_post_allegretto(cause, *block, maybe_ceremony, ceremony_mux)
                .await;
        } else {
            self.handle_finalized_pre_allegretto(cause, *block, maybe_ceremony, ceremony_mux)
                .await;
        }
        acknowledgment.acknowledge();
    }

    #[instrument(
        skip_all,
        fields(
            me = %self.config.me.public_key(),
            current_epoch = self.current_epoch_state().epoch(),
        )
    )]
    async fn start_ceremony_for_current_epoch_state<TReceiver, TSender>(
        &mut self,
        mux: &mut MuxHandle<TSender, TReceiver>,
    ) -> Ceremony<ContextCell<TContext>, TReceiver, TSender>
    where
        TReceiver: Receiver<PublicKey = PublicKey>,
        TSender: Sender<PublicKey = PublicKey>,
    {
        if self.post_allegretto_metadatas.exists() {
            self.start_post_allegretto_ceremony(mux).await
        } else {
            self.start_pre_allegretto_ceremony(mux).await
        }
    }

    #[instrument(skip_all, fields(epoch = self.current_epoch_state().epoch()))]
    async fn register_current_epoch_state(&mut self) {
        let epoch_state = self.current_epoch_state();

        if let Some(previous_epoch) = epoch_state.epoch().checked_sub(1)
            && let boundary_height =
                utils::last_block_in_epoch(self.config.epoch_length, previous_epoch)
            && let None = self.config.marshal.get_info(boundary_height).await
        {
            info!(
                boundary_height,
                previous_epoch,
                "don't have the finalized boundary block of the previous epoch; \
                 this usually happens if the node restarted right after processing \
                 the second-to-last block; not starting a consensus engine backing \
                 the current epoch because the boundary block is its \"genesis\""
            );
            return;
        }

        let new_validator_state = match &epoch_state {
            EpochState::PreModerato(epoch_state) => self
                .validators_metadata
                .get(&epoch_state.epoch().saturating_sub(1).into())
                .cloned()
                .expect(
                    "there must always be a validator state written for the \
                     previous epoch by the pre-allegretto logic; this is \
                     ensured at startup",
                ),
            EpochState::PostModerato(epoch_state) => epoch_state.validator_state.clone(),
        };

        self.validators_metadata
            .put_sync(epoch_state.epoch().into(), new_validator_state.clone())
            .await
            .expect("must always be able to sync state");

        self.config
            .epoch_manager
            .report(
                epoch::Enter {
                    epoch: epoch_state.epoch(),
                    public: epoch_state.public_polynomial().clone(),
                    share: epoch_state.private_share().clone(),
                    participants: epoch_state.participants().clone(),
                }
                .into(),
            )
            .await;
        info!(
            epoch = epoch_state.epoch(),
            participants = ?epoch_state.participants(),
            public = alloy_primitives::hex::encode(epoch_state.public_polynomial().encode()),
            "reported epoch state to epoch manager",
        );
        self.register_validators(epoch_state.epoch(), new_validator_state)
            .await;
    }

    #[instrument(
        skip_all,
        fields(previous_epoch = self.previous_epoch_state().map(|s| s.epoch()))
    )]
    async fn register_previous_epoch_state(&mut self) {
        if let Some(epoch_state) = self.previous_epoch_state() {
            self.config
                .epoch_manager
                .report(
                    epoch::Enter {
                        epoch: epoch_state.epoch(),
                        public: epoch_state.public_polynomial().clone(),
                        share: epoch_state.private_share().clone(),
                        participants: epoch_state.participants().clone(),
                    }
                    .into(),
                )
                .await;
            info!(
                epoch = epoch_state.epoch(),
                participants = ?epoch_state.participants(),
                public = alloy_primitives::hex::encode(epoch_state.public_polynomial().encode()),
                "reported epoch state to epoch manager",
            );
        }

        if let Some(epoch) = self.current_epoch_state().epoch().checked_sub(2)
            && let Some(validator_state) = self.validators_metadata.get(&epoch.into()).cloned()
        {
            self.register_validators(epoch, validator_state).await;
        }
        if let Some(epoch) = self.current_epoch_state().epoch().checked_sub(1)
            && let Some(validator_state) = self.validators_metadata.get(&epoch.into()).cloned()
        {
            self.register_validators(epoch, validator_state).await;
        }
    }

    #[instrument(skip_all, fields(epoch))]
    async fn register_validators(&mut self, epoch: Epoch, validator_state: ValidatorState) {
        let peers_to_register = validator_state.resolve_addresses_and_merge_peers();
        self.metrics.peers.set(peers_to_register.len() as i64);
        self.config
            .peer_manager
            .update(epoch, peers_to_register.clone())
            .await;

        info!(
            peers_registered = ?peers_to_register,
            "registered p2p peers by merging dealers, players, syncing players",
        );
    }

    fn is_running_post_allegretto(&self, block: &Block) -> bool {
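        // A block is handled by the post-allegretto logic only if the allegretto
        // hardfork is active at its timestamp and post-allegretto state already
        // exists.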
        self.config
            .execution_node
            .chain_spec()
            .is_allegretto_active_at_timestamp(block.timestamp())
            && self.post_allegretto_metadatas.exists()
    }

    fn previous_epoch_state(&self) -> Option<EpochState> {
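        // Prefer the post-allegretto store; fall back to the pre-allegretto one.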
        if let Some(epoch_state) = self
            .post_allegretto_metadatas
            .previous_epoch_state()
            .cloned()
        {
            Some(EpochState::PostModerato(epoch_state))
        } else {
            self.pre_allegretto_metadatas
                .previous_epoch_state()
                .cloned()
                .map(EpochState::PreModerato)
        }
    }

    fn current_epoch_state(&self) -> EpochState {
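        // One of the two stores must hold a current epoch state; which one does
        // determines whether the pre- or post-allegretto logic is in effect.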
        if let Some(epoch_state) = self
            .post_allegretto_metadatas
            .current_epoch_state()
            .cloned()
        {
            EpochState::PostModerato(epoch_state)
        } else if let Some(epoch_state) =
            self.pre_allegretto_metadatas.current_epoch_state().cloned()
        {
            EpochState::PreModerato(epoch_state)
        } else {
            panic!("either pre- or post-allegretto current-epoch-state should exist")
        }
    }
}
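/// Epoch state as produced by either the pre-allegretto or the post-allegretto
/// code path, exposed through a common set of accessors.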
#[derive(Clone, Debug)]
enum EpochState {
    PreModerato(pre_allegretto::EpochState),
    PostModerato(post_allegretto::EpochState),
}

impl EpochState {
    fn epoch(&self) -> Epoch {
        match self {
            Self::PreModerato(epoch_state) => epoch_state.epoch(),
            Self::PostModerato(epoch_state) => epoch_state.epoch(),
        }
    }

    fn participants(&self) -> &Ordered<PublicKey> {
        match self {
            Self::PreModerato(epoch_state) => epoch_state.participants(),
            Self::PostModerato(epoch_state) => epoch_state.participants(),
        }
    }

    fn public_polynomial(&self) -> &Public<MinSig> {
        match self {
            Self::PreModerato(epoch_state) => epoch_state.public_polynomial(),
            Self::PostModerato(epoch_state) => epoch_state.public_polynomial(),
        }
    }

    fn private_share(&self) -> &Option<Share> {
        match self {
            Self::PreModerato(epoch_state) => epoch_state.private_share(),
            Self::PostModerato(epoch_state) => epoch_state.private_share(),
        }
    }
}
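/// Prometheus metrics tracking ceremonies, peers, and syncing players.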
#[derive(Clone)]
struct Metrics {
    peers: Gauge,
    pre_allegretto_ceremonies: Counter,
    post_allegretto_ceremonies: Counter,
    failed_allegretto_transitions: Counter,
    syncing_players: Gauge,
    ceremony: ceremony::Metrics,
}
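/// Reads the validator configuration from the on-chain contract, retrying once
/// per second until the read succeeds.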
async fn read_validator_config_with_retry<C: commonware_runtime::Clock>(
    context: &C,
    node: &TempoFullNode,
    epoch: Epoch,
    epoch_length: u64,
) -> OrderedAssociated<PublicKey, DecodedValidator> {
    let mut attempts = 1;
    let retry_after = Duration::from_secs(1);
    loop {
        if let Ok(validators) =
            validators::read_from_contract(attempts, node, epoch, epoch_length).await
        {
            break validators;
        }
        tracing::warn_span!("read_validator_config_with_retry").in_scope(|| {
            warn!(
                attempts,
                retry_after = %tempo_telemetry_util::display_duration(retry_after),
                "reading validator config from contract failed; will retry",
            );
        });
        attempts += 1;
        context.sleep(retry_after).await;
    }
}
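/// The outcome of a DKG ceremony; the codec implementations below define how it
/// is encoded and decoded.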
#[derive(Clone, Debug)]
struct DkgOutcome {
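    /// Whether the ceremony completed successfully.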
    dkg_successful: bool,
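    /// The epoch this outcome applies to.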
    epoch: Epoch,
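    /// The ordered set of ceremony participants.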
    participants: Ordered<PublicKey>,
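    /// The resulting group public polynomial.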
    public: Public<MinSig>,
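    /// This node's private share of the group key, if any.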
    share: Option<Share>,
}

impl Write for DkgOutcome {
    fn write(&self, buf: &mut impl bytes::BufMut) {
        self.dkg_successful.write(buf);
        UInt(self.epoch).write(buf);
        self.participants.write(buf);
        self.public.write(buf);
        self.share.write(buf);
    }
}

impl EncodeSize for DkgOutcome {
    fn encode_size(&self) -> usize {
        self.dkg_successful.encode_size()
            + UInt(self.epoch).encode_size()
            + self.participants.encode_size()
            + self.public.encode_size()
            + self.share.encode_size()
    }
}

impl Read for DkgOutcome {
    type Cfg = ();

    fn read_cfg(
        buf: &mut impl bytes::Buf,
        _cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        let dkg_successful = bool::read(buf)?;
        let epoch = UInt::read(buf)?.into();
        let participants = Ordered::read_cfg(buf, &(RangeCfg::from(0..=usize::MAX), ()))?;
        let public =
            Public::<MinSig>::read_cfg(buf, &(quorum(participants.len() as u32) as usize))?;
        let share = Option::<Share>::read_cfg(buf, &())?;
        Ok(Self {
            dkg_successful,
            epoch,
            participants,
            public,
            share,
        })
    }
}