1use std::{collections::BTreeMap, num::NonZeroU32, task::Poll};
2
3use alloy_consensus::BlockHeader as _;
4use alloy_primitives::B256;
5use bytes::{Buf, BufMut};
6use commonware_codec::{Encode as _, EncodeSize, Read, ReadExt as _, Write};
7use commonware_consensus::{
8 Heightable as _,
9 marshal::{self, Update},
10 types::{Epoch, EpochPhase, Epocher as _, FixedEpocher, Height},
11};
12use commonware_cryptography::{
13 Signer as _,
14 bls12381::{
15 dkg::{
16 self, DealerLog, DealerPrivMsg, DealerPubMsg, Logs, PlayerAck, SignedDealerLog, observe,
17 },
18 primitives::{group::Share, variant::MinSig},
19 },
20 ed25519::{self, PrivateKey, PublicKey},
21 transcript::Summary,
22};
23use commonware_math::algebra::Random as _;
24use commonware_p2p::{
25 Receiver, Recipients, Sender,
26 utils::mux::{self, MuxHandle},
27};
28use commonware_parallel::Sequential;
29use commonware_runtime::{Clock, ContextCell, Handle, IoBuf, Metrics as _, Spawner, spawn_cell};
30use commonware_utils::{Acknowledgement, N3f1, NZU32, ordered};
31
32use eyre::{OptionExt as _, WrapErr as _, bail, ensure, eyre};
33use futures::{
34 FutureExt as _, Stream, StreamExt as _, channel::mpsc, select_biased, stream::FusedStream,
35};
36use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
37use rand_core::CryptoRngCore;
38use reth_ethereum::{chainspec::EthChainSpec, rpc::eth::primitives::BlockNumHash};
39use reth_provider::{BlockIdReader as _, HeaderProvider as _};
40use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
41use tempo_node::TempoFullNode;
42use tempo_precompiles::validator_config_v2::ValidatorConfigV2;
43use tracing::{Level, Span, debug, info, info_span, instrument, warn, warn_span};
44
45use crate::{
46 consensus::{Digest, block::Block},
47 validators::{read_active_and_known_peers_at_block_hash, read_validator_config_at_block_hash},
48};
49
50mod state;
51use state::State;
52
53use super::{
54 Command,
55 ingress::{GetDkgOutcome, VerifyDealerLog},
56};
57
58pub(crate) enum Message {
60 Dealer(DealerPubMsg<MinSig>, DealerPrivMsg),
62 Ack(PlayerAck<PublicKey>),
64}
65
66impl Write for Message {
67 fn write(&self, writer: &mut impl BufMut) {
68 match self {
69 Self::Dealer(pub_msg, priv_msg) => {
70 0u8.write(writer);
71 pub_msg.write(writer);
72 priv_msg.write(writer);
73 }
74 Self::Ack(ack) => {
75 1u8.write(writer);
76 ack.write(writer);
77 }
78 }
79 }
80}
81
82impl EncodeSize for Message {
83 fn encode_size(&self) -> usize {
84 1 + match self {
85 Self::Dealer(pub_msg, priv_msg) => pub_msg.encode_size() + priv_msg.encode_size(),
86 Self::Ack(ack) => ack.encode_size(),
87 }
88 }
89}
90
91impl Read for Message {
92 type Cfg = NonZeroU32;
93
94 fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
95 let tag = u8::read(reader)?;
96 match tag {
97 0 => {
98 let pub_msg = DealerPubMsg::read_cfg(reader, cfg)?;
99 let priv_msg = DealerPrivMsg::read(reader)?;
100 Ok(Self::Dealer(pub_msg, priv_msg))
101 }
102 1 => {
103 let ack = PlayerAck::read(reader)?;
104 Ok(Self::Ack(ack))
105 }
106 other => Err(commonware_codec::Error::InvalidEnum(other)),
107 }
108 }
109}
110
111pub(crate) struct Actor<TContext>
112where
113 TContext: Clock + commonware_runtime::Metrics + commonware_runtime::Storage,
114{
115 config: super::Config,
117
118 context: ContextCell<TContext>,
120
121 mailbox: mpsc::UnboundedReceiver<super::Message>,
123
124 metrics: Metrics,
127}
128
129impl<TContext> Actor<TContext>
130where
131 TContext: commonware_runtime::BufferPooler
132 + Clock
133 + CryptoRngCore
134 + commonware_runtime::Metrics
135 + Spawner
136 + commonware_runtime::Storage,
137{
138 pub(super) async fn new(
139 config: super::Config,
140 context: TContext,
141 mailbox: mpsc::UnboundedReceiver<super::ingress::Message>,
142 ) -> eyre::Result<Self> {
143 let context = ContextCell::new(context);
144
145 let metrics = Metrics::init(&context);
146
147 Ok(Self {
148 config,
149 context,
150 mailbox,
151 metrics,
152 })
153 }
154
155 pub(crate) fn start(
156 mut self,
157 dkg_channel: (
158 impl Sender<PublicKey = PublicKey>,
159 impl Receiver<PublicKey = PublicKey>,
160 ),
161 ) -> Handle<()> {
162 spawn_cell!(self.context, self.run(dkg_channel))
163 }
164
165 async fn run(
166 mut self,
167 (sender, receiver): (
168 impl Sender<PublicKey = PublicKey>,
169 impl Receiver<PublicKey = PublicKey>,
170 ),
171 ) {
172 let Ok(mut storage) = state::builder()
173 .partition_prefix(&self.config.partition_prefix)
174 .initial_state({
175 let mut context = self.context.clone();
176 let execution_node = self.config.execution_node.clone();
177 let initial_share = self.config.initial_share.clone();
178 let epoch_strategy = self.config.epoch_strategy.clone();
179 let mut marshal = self.config.marshal.clone();
180 async move {
181 read_initial_state_and_set_floor(
182 &mut context,
183 &execution_node,
184 initial_share.clone(),
185 &epoch_strategy,
186 &mut marshal,
187 )
188 .await
189 }
190 })
191 .init(self.context.with_label("state"))
192 .await
193 else {
194 return;
196 };
197
198 let (mux, mut dkg_mux) = mux::Muxer::new(
199 self.context.with_label("dkg_mux"),
200 sender,
201 receiver,
202 self.config.mailbox_size,
203 );
204 mux.start();
205
206 let reason = loop {
207 if let Err(error) = self.run_dkg_loop(&mut storage, &mut dkg_mux).await {
208 break error;
209 }
210 };
211
212 tracing::warn_span!("dkg_actor").in_scope(|| {
213 warn!(
214 %reason,
215 "actor exited",
216 );
217 });
218 }
219
    /// Runs the DKG ceremony for the epoch of the currently persisted state.
    ///
    /// On entry:
    /// * resets the per-round metrics;
    /// * prunes storage up to the previous epoch (see `state::Storage::prune`
    ///   for the exact bound);
    /// * tells the epoch manager to enter the current epoch;
    /// * creates dealer/player state for this round if we take those roles;
    /// * registers a p2p subchannel keyed by the epoch number.
    ///
    /// It then loops over shutdown, network messages, mailbox commands and
    /// blocks yielded by the in-flight ancestry stream. The loop ends when:
    /// * a finalized block produces the next epoch's state, which is persisted
    ///   before returning `Ok(())` (the caller then runs the next ceremony); or
    /// * a fatal condition occurs (shutdown, closed subchannel/mailbox/marshal,
    ///   storage or block-handling failure), which is returned as `Err`.
    async fn run_dkg_loop<TStorageContext, TSender, TReceiver>(
        &mut self,
        storage: &mut state::Storage<TStorageContext>,
        mux: &mut MuxHandle<TSender, TReceiver>,
    ) -> eyre::Result<()>
    where
        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
        TSender: Sender<PublicKey = PublicKey>,
        TReceiver: Receiver<PublicKey = PublicKey>,
    {
        let state = storage.current();

        self.metrics.reset();

        self.metrics.dealers.set(state.dealers().len() as i64);
        self.metrics.players.set(state.players().len() as i64);

        if let Some(previous) = state.epoch.previous() {
            storage.prune(previous).await.wrap_err_with(|| {
                format!("unable to prune storage before up until epoch `{previous}`",)
            })?;
        }

        self.enter_epoch(&state)
            .wrap_err("could not instruct epoch manager to enter a new epoch")?;

        let round = state::Round::from_state(&state, &self.config.namespace);

        // `None` if we are not a dealer in this round.
        let mut dealer_state = storage
            .create_dealer_for_round(
                self.config.me.clone(),
                round.clone(),
                state.share.clone(),
                state.seed,
            )
            .wrap_err("unable to instantiate dealer state")?;

        if dealer_state.is_some() {
            self.metrics.how_often_dealer.inc();
        }

        // `None` if we are not a player in this round.
        let mut player_state = storage
            .create_player_for_round(self.config.me.clone(), &round)
            .wrap_err("unable to instantiate player state")?;

        if player_state.is_some() {
            self.metrics.how_often_player.inc();
        }

        // One mux subchannel per ceremony, keyed by the epoch number, so that
        // messages of different epochs are kept apart.
        let (mut round_sender, mut round_receiver) =
            mux.register(state.epoch.get()).await.wrap_err_with(|| {
                format!(
                    "unable to create subchannel for this DKG ceremony of epoch `{}`",
                    state.epoch
                )
            })?;

        // Fetches ancestors of notarized blocks on behalf of a pending
        // `GetDkgOutcome` request (see `handle_get_dkg_outcome`).
        let mut ancestry_stream = AncestorStream::new();

        info_span!("run_dkg_loop", epoch = %state.epoch).in_scope(|| {
            info!(
                me = %self.config.me.public_key(),
                dealers = ?state.dealers(),
                players = ?state.players(),
                as_dealer = dealer_state.is_some(),
                as_player = player_state.is_some(),
                "entering a new DKG ceremony",
            )
        });

        // Set once `should_skip_round` reports that the network is far enough
        // ahead; from then on finalized blocks are only inspected for the
        // epoch's boundary block (`handle_finalized_boundary`).
        let mut skip_to_boundary = false;
        loop {
            let mut shutdown = self.context.stopped().fuse();
            // `select_biased!` polls branches in the order written, so shutdown
            // takes priority over network traffic, which takes priority over the
            // mailbox and then the ancestry stream.
            select_biased!(

                _ = &mut shutdown => {
                    break Err(eyre!("shutdown triggered"));
                }

                network_msg = round_receiver.recv().fuse() => {
                    match network_msg {
                        Ok((sender, message)) => {
                            // Errors are already reported by the `err` option on
                            // `handle_network_msg`'s `#[instrument]`; a bad
                            // message from a peer must not end the ceremony.
                            let _ = self.handle_network_msg(
                                &round,
                                &mut round_sender,
                                storage,
                                dealer_state.as_mut(),
                                player_state.as_mut(),
                                sender,
                                message,
                            ).await;
                        }
                        Err(err) => {
                            break Err(err).wrap_err("network p2p subchannel closed")
                        }
                    }
                }

                msg = self.mailbox.next() => {
                    let Some(msg) = msg else {
                        break Err(eyre!("all instances of the DKG actor's mailbox are dropped"));
                    };

                    match msg.command {
                        Command::Update(update) => {
                            match *update {
                                Update::Tip(_, height, _) => {
                                    // Only checked until the decision to skip has
                                    // been made once for this round.
                                    if !skip_to_boundary {
                                        skip_to_boundary |= self.should_skip_round(
                                            &round,
                                            height,
                                        ).await;
                                        if skip_to_boundary {
                                            self.metrics.rounds_skipped.inc();
                                        }
                                    }
                                }
                                Update::Block(block, ack) => {
                                    let res = if skip_to_boundary {
                                        self.handle_finalized_boundary(
                                            msg.cause,
                                            &round,
                                            block,
                                        ).await
                                    } else {
                                        self.handle_finalized_block(
                                            msg.cause,
                                            &state,
                                            &round,
                                            &mut round_sender,
                                            storage,
                                            &mut dealer_state,
                                            &mut player_state,
                                            block,
                                        ).await
                                    };
                                    // `Some(new_state)` means this block ended the
                                    // epoch: persist the next state, exit the
                                    // current epoch, and leave this ceremony.
                                    let should_break = match res {
                                        Ok(Some(new_state)) => {
                                            info_span!(
                                                "run_dkg_loop",
                                                epoch = %state.epoch
                                            ).in_scope(|| info!(
                                                "constructed a new epoch state; \
                                                persisting new state and exiting \
                                                current epoch",
                                            ));

                                            if let Err(err) = storage
                                                .set_state(new_state)
                                                .await
                                                .wrap_err("failed appending new state to journal")
                                            {
                                                break Err(err);
                                            }
                                            // Failure is logged by `exit_epoch`'s
                                            // `#[instrument(err)]`.
                                            let _ = self.exit_epoch(&state);

                                            true
                                        }
                                        Ok(None) => false,
                                        Err(err) => break Err(err).wrap_err("failed handling finalized block"),
                                    };
                                    // The block is acknowledged only after it was
                                    // processed (and any new state was persisted);
                                    // the error paths above break out without
                                    // acknowledging.
                                    ack.acknowledge();
                                    if should_break {
                                        break Ok(());
                                    }
                                }
                            }
                        }

                        Command::GetDealerLog(get_dealer_log) => {
                            warn_span!("get_dealer_log").in_scope(|| {
                                let log = if get_dealer_log.epoch != round.epoch() {
                                    warn!(
                                        request.epoch = %get_dealer_log.epoch,
                                        round.epoch = %round.epoch(),
                                        "application requested dealer log for \
                                        an epoch other than we are currently \
                                        running",
                                    );
                                    None
                                } else {
                                    // `None` if we are not a dealer or our
                                    // dealing has not been finalized.
                                    dealer_state
                                        .as_ref()
                                        .and_then(|dealer_state| dealer_state.finalized())
                                };
                                let _ = get_dealer_log
                                    .response
                                    .send(log);
                            });
                        }

                        Command::GetDkgOutcome(request) => {
                            // If the ancestry stream is already fetching towards
                            // this digest, hand it the new requester and keep
                            // waiting instead of starting another fetch.
                            if let Some(target) = ancestry_stream.tip()
                                && target == request.digest
                            {
                                ancestry_stream.update_receiver((msg.cause, request));
                                continue;
                            }
                            // `Some((hole, request))`: a notarized block needed to
                            // compute the outcome is missing; fetch its ancestry
                            // from the marshal and retry as blocks arrive.
                            if let Ok(Some((hole, request))) = self
                                .handle_get_dkg_outcome(
                                    &msg.cause,
                                    storage,
                                    &player_state,
                                    &round,
                                    &state,
                                    request,
                                )
                                .await
                            {
                                let stream = match self.config.marshal.ancestry((None, hole)).await {
                                    Some(stream) => stream,
                                    None => break Err(eyre!("marshal mailbox is closed")),
                                };
                                ancestry_stream.set(
                                    (msg.cause, request),
                                    stream,
                                );
                            }
                        }
                        Command::VerifyDealerLog(verify) => {
                            self.handle_verify_dealer_log(
                                &state,
                                &round,
                                verify,
                            );
                        }
                    }
                }

                notarized_block = ancestry_stream.next() => {
                    if let Some(block) = notarized_block {
                        // Cache the fetched block, then retry the pending
                        // outcome request; it may report the next missing block.
                        storage.cache_notarized_block(&round, block);
                        let (cause, request) = ancestry_stream
                            .take_request()
                            .expect("if the stream is yielding blocks, there must be a receiver");
                        if let Ok(Some((hole, request))) = self
                            .handle_get_dkg_outcome(&cause, storage, &player_state, &round, &state, request)
                            .await
                        {
                            let stream = match self.config.marshal.ancestry((None, hole)).await {
                                Some(stream) => stream,
                                None => break Err(eyre!("marshal mailbox is closed")),
                            };
                            ancestry_stream.set(
                                (cause, request),
                                stream,
                            );
                        }
                    }
                }
            )
        }
    }
478
479 fn handle_verify_dealer_log(
480 &self,
481 state: &state::State,
482 round: &state::Round,
483 VerifyDealerLog {
484 epoch,
485 bytes,
486 response,
487 }: VerifyDealerLog,
488 ) {
489 if state.epoch != epoch {
490 let _ = response.send(Err(eyre!(
491 "requested dealer log for epoch `{epoch}`, but current round \
492 is for epoch `{}`",
493 state.epoch
494 )));
495 return;
496 }
497 let res = SignedDealerLog::<MinSig, PrivateKey>::read_cfg(
498 &mut &bytes[..],
499 &NZU32!(round.players().len() as u32),
500 )
501 .wrap_err("failed reading dealer log from header")
502 .and_then(|log| {
503 log.check(round.info())
504 .map(|(dealer, _)| dealer)
505 .ok_or_eyre("not a dealer in the current round")
506 })
507 .inspect(|_| {
508 self.metrics.dealings_read.inc();
509 })
510 .inspect_err(|_| {
511 self.metrics.bad_dealings.inc();
512 });
513 let _ = response.send(res);
514 }
515
516 #[instrument(
524 skip_all,
525 fields(
526 round.epoch = %round.epoch(),
527 finalized.tip = %finalized_tip,
528 finalized.epoch = tracing::field::Empty,
529 ),
530 )]
531 async fn should_skip_round(&mut self, round: &state::Round, finalized_tip: Height) -> bool {
532 let epoch_info = self
533 .config
534 .epoch_strategy
535 .containing(finalized_tip)
536 .expect("epoch strategy is valid for all heights");
537 Span::current().record(
538 "finalized.epoch",
539 tracing::field::display(epoch_info.epoch()),
540 );
541
542 let should_skip_round = epoch_info.epoch() > round.epoch().next()
543 || (epoch_info.epoch() == round.epoch().next() && epoch_info.last() == finalized_tip);
544
545 if should_skip_round {
546 let boundary_height = self
547 .config
548 .epoch_strategy
549 .last(round.epoch())
550 .expect("epoch strategy is valid for all epochs");
551 info!(
552 %boundary_height,
553 "confirmed that the network is at least 2 epochs aheads of us; \
554 setting synchronization floor to boundary height of our DKG's \
555 epoch and reporting that the rest of the DKG round should be \
556 skipped",
557 );
558
559 if let Some(one_before_boundary) = boundary_height.previous() {
562 self.config.marshal.set_floor(one_before_boundary).await;
563 }
564 }
565 should_skip_round
566 }
567
    /// Processes a finalized block while the current DKG round is being run.
    ///
    /// * Blocks of an earlier epoch are ignored (`Ok(None)`); blocks of a later
    ///   epoch are an error.
    /// * If we are a dealer, then depending on the block's epoch phase we either
    ///   distribute our shares (`Early`) or finalize our dealing
    ///   (`Midpoint`/`Late`).
    /// * For every block except the epoch's last one: a dealer log in the
    ///   header's `extraData` (if present and readable) is persisted, the block
    ///   is appended to storage, and `Ok(None)` is returned.
    /// * For the last block of the epoch: the on-chain DKG outcome is read from
    ///   the header and compared with the locally computed outcome, and the
    ///   state for the next epoch is returned as `Ok(Some(..))`.
    ///
    /// # Errors
    /// Fails if the block is for a future epoch or if appending to storage fails.
    ///
    /// # Panics
    /// Panics if the last block of the epoch does not decode as an
    /// `OnchainDkgOutcome`.
    #[instrument(
        parent = &cause,
        skip_all,
        fields(
            dkg.epoch = %round.epoch(),
            block.height = %block.height(),
            block.extra_data.bytes = block.header().extra_data().len(),
        ),
        err,
    )]
    #[expect(
        clippy::too_many_arguments,
        reason = "easiest way to express this for now"
    )]
    async fn handle_finalized_block<TStorageContext, TSender>(
        &mut self,
        cause: Span,
        state: &state::State,
        round: &state::Round,
        round_channel: &mut TSender,
        storage: &mut state::Storage<TStorageContext>,
        dealer_state: &mut Option<state::Dealer>,
        player_state: &mut Option<state::Player>,
        block: Block,
    ) -> eyre::Result<Option<State>>
    where
        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
        TSender: Sender<PublicKey = PublicKey>,
    {
        let epoch_info = self
            .config
            .epoch_strategy
            .containing(block.height())
            .expect("epoch strategy is covering all block heights");

        match round.epoch().cmp(&epoch_info.epoch()) {
            std::cmp::Ordering::Less => {
                bail!(
                    "block is for a future epoch `{}`, but the current DKG \
                    loop is for epoch `{}`; this should never happen because \
                    the DKG actor drives which epochs are entered or skipped",
                    epoch_info.epoch(),
                    round.epoch(),
                );
            }
            std::cmp::Ordering::Greater => {
                warn!(
                    "ignoring block for prior epoch; older blocks are replayed \
                    against the DKG loop when a node was shut down right \
                    after a boundary block completed an epoch, but before \
                    it was fully processed by other actors"
                );
                return Ok(None);
            }
            std::cmp::Ordering::Equal => {
                // The block belongs to this round's epoch; process it below.
            }
        }

        match epoch_info.phase() {
            EpochPhase::Early => {
                if let Some(dealer_state) = dealer_state {
                    self.distribute_shares(
                        storage,
                        round.epoch(),
                        dealer_state,
                        player_state,
                        round_channel,
                    )
                    .await;
                }
            }
            EpochPhase::Midpoint | EpochPhase::Late => {
                if let Some(dealer_state) = dealer_state {
                    dealer_state.finalize();
                }
            }
        }

        if block.height() != epoch_info.last() {
            // Non-boundary blocks may carry a dealer log in `extraData`.
            if !block.header().extra_data().is_empty() {
                'handle_log: {
                    let (dealer, log) =
                        match read_dealer_log(block.header().extra_data().as_ref(), round) {
                            Err(reason) => {
                                // An unreadable log is logged and skipped; the
                                // block itself is still recorded below.
                                warn!(
                                    %reason,
                                    "failed to read dealer log from block \
                                    extraData header field");
                                break 'handle_log;
                            }
                            Ok((dealer, log)) => (dealer, log),
                        };
                    storage
                        .append_dealer_log(round.epoch(), dealer.clone(), log)
                        .await
                        .wrap_err("failed to append log to journal")?;
                    if self.config.me.public_key() == dealer
                        && let Some(dealer_state) = dealer_state
                    {
                        info!(
                            "found own dealing in finalized block; deleting it \
                            from state to not write it again"
                        );
                        dealer_state.take_finalized();
                    }
                }
            }

            storage
                .append_finalized_block(round.epoch(), block)
                .await
                .wrap_err("failed to append finalized block to journal")?;

            return Ok(None);
        }

        info!("reached last block of epoch; reading DKG outcome from header");

        let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
            &mut block.header().extra_data().as_ref(),
        )
        .expect("the last block of an epoch must contain the DKG outcome");

        info!("reading validator from contract");

        // Reuse an outcome computed earlier for this block's parent (cached by
        // `handle_get_dkg_outcome`); otherwise finalize the ceremony now from
        // the dealer logs persisted for this epoch.
        let (local_output, mut share) = if let Some((outcome, share)) =
            storage.get_dkg_outcome(&state.epoch, &block.parent_digest())
        {
            debug!("using cached DKG outcome");
            (outcome.clone(), share.clone())
        } else {
            let mut logs = Logs::<MinSig, PublicKey, N3f1>::new(round.info().clone());
            for (k, v) in storage.logs_for_epoch(round.epoch()) {
                logs.record(k.clone(), v.clone());
            }

            // As a player, try to finalize our own share. `None` means "fall
            // back to observing", which happens when we are not a player or
            // when our dealings are missing.
            let player_outcome = if let Some(player) = player_state.take() {
                info!("we were a player in the ceremony; finalizing share");
                match player.finalize(&mut self.context, logs.clone(), &Sequential) {
                    Ok((new_output, new_share)) => {
                        info!("local DKG ceremony was a success");
                        Some((new_output, state::ShareState::Plaintext(Some(new_share))))
                    }
                    Err(
                        reason
                        @ commonware_cryptography::bls12381::dkg::Error::MissingPlayerDealing,
                    ) => {
                        warn!(
                            reason = %eyre::Report::new(reason),
                            "missing critical DKG state to reconstruct a share in this epoch; has \
                            consensus state been deleted or a node with the same identity started \
                            without consensus state? Finalizing the current round as an observer \
                            and will not have a share in the next epoch"
                        );
                        None
                    }
                    Err(error) => {
                        // On failure, keep the previous epoch's output and share.
                        warn!(
                            error = %eyre::Report::new(error),
                            "local DKG ceremony was a failure",
                        );
                        Some((state.output.clone(), state.share.clone()))
                    }
                }
            } else {
                None
            };

            if let Some(outcome) = player_outcome {
                outcome
            } else {
                match observe::<_, _, N3f1, ed25519::Batch>(&mut self.context, logs, &Sequential) {
                    Ok(output) => {
                        info!("local DKG ceremony was a success");
                        (output, state::ShareState::Plaintext(None))
                    }
                    Err(error) => {
                        // On failure, keep the previous epoch's output and share.
                        warn!(
                            error = %eyre::Report::new(error),
                            "local DKG ceremony was a failure",
                        );
                        (state.output.clone(), state.share.clone())
                    }
                }
            }
        };

        // The on-chain outcome is authoritative: on a mismatch, drop our local
        // share rather than carry a share for a different output.
        if local_output != onchain_outcome.output {
            let am_player = onchain_outcome
                .next_players
                .position(&self.config.me.public_key())
                .is_some();
            warn!(
                am_player,
                "the output of the local DKG ceremony does not match what is \
                on chain; something is terribly wrong; will try and participate \
                in the next round (if a player), but if we are misbehaving and \
                other nodes are blocking us it might be time to delete this node \
                and spin up a new identity",
            );
            share = state::ShareState::Plaintext(None);
        }

        // A failed ceremony falls back to the previous output (see the error
        // branches above), so an unchanged on-chain output counts as a failure.
        if onchain_outcome.output == state.output {
            self.metrics.failures.inc();
        } else {
            self.metrics.successes.inc();
        }

        Ok(Some(state::State {
            epoch: onchain_outcome.epoch,
            seed: Summary::random(&mut self.context),
            output: onchain_outcome.output.clone(),
            share,
            players: onchain_outcome.next_players,
            is_full_dkg: onchain_outcome.is_next_full_dkg,
        }))
    }
814
815 #[instrument(
821 parent = &cause,
822 skip_all,
823 fields(
824 dkg.epoch = %round.epoch(),
825 block.height = %block.height(),
826 block.extra_data.bytes = block.header().extra_data().len(),
827 ),
828 err,
829 )]
830 async fn handle_finalized_boundary(
831 &mut self,
832 cause: Span,
833 round: &state::Round,
834 block: Block,
835 ) -> eyre::Result<Option<State>> {
836 let epoch_info = self
837 .config
838 .epoch_strategy
839 .containing(block.height())
840 .expect("epoch strategy is covering all block heights");
841
842 match round.epoch().cmp(&epoch_info.epoch()) {
847 std::cmp::Ordering::Less => {
848 bail!(
849 "block is for a future epoch `{}`, but the current DKG \
850 loop is for epoch `{}`; this should never happen because \
851 the DKG actor drives which epochs are entered or skipped",
852 epoch_info.epoch(),
853 round.epoch(),
854 );
855 }
856 std::cmp::Ordering::Greater => {
857 warn!(
858 "ignoring block for prior epoch; older blocks are replayed \
859 against the DKG loop when a node was shut down right \
860 after a boundary block completed an epoch, but before \
861 it was fully processed by other actors"
862 );
863 return Ok(None);
864 }
865 std::cmp::Ordering::Equal => {
866 }
868 }
869
870 if block.height() != epoch_info.last() {
871 return Ok(None);
872 }
873
874 info!("found boundary block; reading DKG outcome from header");
875
876 let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
877 &mut block.header().extra_data().as_ref(),
878 )
879 .expect("the last block of an epoch must contain the DKG outcome");
880
881 info!("reading validators from contract");
882
883 Ok(Some(state::State {
884 epoch: onchain_outcome.epoch,
885 seed: Summary::random(&mut self.context),
886 output: onchain_outcome.output.clone(),
887 share: state::ShareState::Plaintext(None),
888 players: onchain_outcome.next_players,
889 is_full_dkg: onchain_outcome.is_next_full_dkg,
890 }))
891 }
892
893 #[instrument(skip_all, fields(me = %self.config.me.public_key(), %epoch))]
894 async fn distribute_shares<TStorageContext, TSender>(
895 &self,
896 storage: &mut state::Storage<TStorageContext>,
897 epoch: Epoch,
898 dealer_state: &mut state::Dealer,
899 player_state: &mut Option<state::Player>,
900 round_channel: &mut TSender,
901 ) where
902 TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
903 TSender: Sender<PublicKey = PublicKey>,
904 {
905 let me = self.config.me.public_key();
906 for (player, pub_msg, priv_msg) in dealer_state.shares_to_distribute().collect::<Vec<_>>() {
907 if player == me {
908 if let Some(player_state) = player_state
909 && let Ok(ack) = player_state
910 .receive_dealing(storage, epoch, me.clone(), pub_msg, priv_msg)
911 .await
912 .inspect(|_| {
913 self.metrics.shares_distributed.inc();
914 self.metrics.shares_received.inc();
915 })
916 .inspect_err(|error| warn!(%error, "failed to store our own dealing"))
917 && let Ok(()) = dealer_state
918 .receive_ack(storage, epoch, me.clone(), ack)
919 .await
920 .inspect_err(|error| warn!(%error, "failed to store our own ACK"))
921 {
922 self.metrics.acks_received.inc();
923 self.metrics.acks_sent.inc();
924 info!("stored our own ACK and share");
925 }
926 } else {
927 let payload = Message::Dealer(pub_msg, priv_msg).encode();
929 match round_channel
930 .send(Recipients::One(player.clone()), payload, true)
931 .await
932 {
933 Ok(success) => {
934 if success.is_empty() {
935 info!(%player, "failed to send share");
939 } else {
940 self.metrics.shares_distributed.inc();
941 info!(%player, "share sent");
942 }
943 }
944 Err(error) => {
945 warn!(%player, %error, "error sending share");
946 }
947 }
948 }
949 }
950 }
951
952 #[instrument(
953 skip_all,
954 fields(
955 epoch = %round.epoch(),
956 %from,
957 bytes = message.len()),
958 err)]
959 #[expect(
960 clippy::too_many_arguments,
961 reason = "easiest way to express this for now"
962 )]
963 async fn handle_network_msg<TStorageContext>(
965 &self,
966 round: &state::Round,
967 round_channel: &mut impl Sender<PublicKey = PublicKey>,
968 storage: &mut state::Storage<TStorageContext>,
969 dealer_state: Option<&mut state::Dealer>,
970 player_state: Option<&mut state::Player>,
971 from: PublicKey,
972 mut message: IoBuf,
973 ) -> eyre::Result<()>
974 where
975 TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
976 {
977 let msg = Message::read_cfg(&mut message, &NZU32!(round.players().len() as u32))
978 .wrap_err("failed reading p2p message")?;
979
980 match msg {
981 Message::Dealer(pub_msg, priv_msg) => {
982 if let Some(player_state) = player_state {
983 info!("received message from a dealer");
984 self.metrics.shares_received.inc();
985 let ack = player_state
986 .receive_dealing(storage, round.epoch(), from.clone(), pub_msg, priv_msg)
987 .await
988 .wrap_err("failed storing dealing")?;
989
990 if let Err(error) = round_channel
991 .send(
992 Recipients::One(from.clone()),
993 Message::Ack(ack).encode(),
994 true,
995 )
996 .await
997 {
998 warn!(
1003 reason = ?error,
1004 "failed returning ACK to dealer",
1005 );
1006 bail!("failed returning ACK to dealer");
1007 }
1008 info!("returned ACK to dealer");
1009 self.metrics.acks_sent.inc();
1010 } else {
1011 info!("received a dealer message, but we are not a player");
1012 }
1013 }
1014 Message::Ack(ack) => {
1015 if let Some(dealer_state) = dealer_state {
1016 info!("received an ACK");
1017 self.metrics.acks_received.inc();
1018 dealer_state
1019 .receive_ack(storage, round.epoch(), from, ack)
1020 .await
1021 .wrap_err("failed storing ACK")?;
1022 } else {
1023 info!("received an ACK, but we are not a dealer");
1024 }
1025 }
1026 }
1027 Ok(())
1028 }
1029
    /// Computes the DKG outcome on top of the block identified by
    /// `request.digest`/`request.height` and sends it on `request.response`.
    /// The outcome is presumably what the application embeds in a boundary
    /// block it is building; the error message calls it "speculative".
    ///
    /// The requested block must be in the current round's epoch. If an outcome
    /// for that digest is already cached, it is reused.
    ///
    /// Otherwise the outcome is built as follows:
    /// 1. Start from the dealer logs persisted from finalized blocks.
    /// 2. If these do not cover every dealer, extend them with logs from cached
    ///    notarized blocks, walking back from `request.digest` through parents
    ///    down to the latest finalized block of the epoch.
    /// 3. Finalize the ceremony, as a player if we are one, otherwise as an
    ///    observer.
    /// 4. Cache the result.
    ///
    /// The response also carries:
    /// * the next epoch number;
    /// * the next players, read from the execution layer at `request.digest`;
    /// * whether the next epoch is a full re-DKG.
    ///
    /// # Returns
    /// * `Ok(None)` once the response has been sent.
    /// * `Ok(Some((hole, request)))` if a notarized block needed for the walk is
    ///   not cached. `hole` is that block's digest. The caller fetches its
    ///   ancestry and calls this again with the returned `request`.
    ///
    /// # Errors
    /// Fails in these cases:
    /// * the request is for another epoch;
    /// * the next players cannot be determined;
    /// * the requester dropped the response channel.
    ///
    /// # Panics
    /// Panics if we are a player but the player state cannot be re-created from
    /// storage.
    #[instrument(
        parent = cause,
        skip_all,
        fields(
            as_player = player_state.is_some(),
            our.epoch = %round.epoch(),
        ),
        err(level = Level::WARN),
    )]
    async fn handle_get_dkg_outcome<TStorageContext>(
        &mut self,
        cause: &Span,
        storage: &mut state::Storage<TStorageContext>,
        player_state: &Option<state::Player>,
        round: &state::Round,
        state: &State,
        request: GetDkgOutcome,
    ) -> eyre::Result<Option<(Digest, GetDkgOutcome)>>
    where
        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
    {
        let epoch_info = self
            .config
            .epoch_strategy
            .containing(request.height)
            .expect("our strategy covers all epochs");

        ensure!(
            round.epoch() == epoch_info.epoch(),
            "request is for epoch `{}`, not our epoch",
            epoch_info.epoch(),
        );

        let output = if let Some((output, _)) = storage
            .get_dkg_outcome(&state.epoch, &request.digest)
            .cloned()
        {
            output
        } else {
            // Dealer logs already persisted from finalized blocks.
            let mut raw_logs = storage
                .logs_for_epoch(round.epoch())
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect::<BTreeMap<_, _>>();

            'ensure_enough_logs: {
                if raw_logs.len() == round.dealers().len() {
                    info!("collected as many logs as there are dealers; concluding DKG");
                    break 'ensure_enough_logs;
                }

                info!(
                    "did not have all dealer logs yet; will try to extend with \
                    logs read from notarized blocks and concluding DKG that way",
                );
                // Walk from the requested block back through its parents,
                // staying within this epoch and not going below the latest
                // finalized block of the epoch. If no block of the epoch is
                // finalized yet, the `Option` comparison (`None` < `Some(_)`)
                // does not limit the walk.
                let (mut height, mut digest) = (request.height, request.digest);
                while height >= epoch_info.first()
                    && Some(height)
                        >= storage
                            .get_latest_finalized_block_for_epoch(&round.epoch())
                            .map(|(_, info)| info.height)
                {
                    if let Some(block) =
                        storage.get_notarized_reduced_block(&round.epoch(), &digest)
                    {
                        // Add the dealer log carried by this block, if any.
                        raw_logs.extend(block.log.clone());
                        height = if let Some(height) = block.height.previous() {
                            height
                        } else {
                            break 'ensure_enough_logs;
                        };
                        digest = block.parent;
                    } else {
                        // Missing notarized block: report it so the caller can
                        // fetch its ancestry and retry.
                        return Ok(Some((digest, request)));
                    }
                }
            }

            let mut logs = Logs::<MinSig, PublicKey, N3f1>::new(round.info().clone());
            for (k, v) in raw_logs {
                logs.record(k, v);
            }

            // The live player state is borrowed immutably here, so a fresh
            // player instance is re-created from storage in order to finalize.
            let player_state = player_state.is_some().then(||
                storage
                    .create_player_for_round(self.config.me.clone(), round)
                    .expect("created a player instance before, must be able to create it again")
                    .expect("did not return a player instance even though we created it for this round already")
            );

            let (output, share) = {
                let player_outcome = if let Some(player) = player_state {
                    info!("we were a player in the ceremony; finalizing share");
                    match player.finalize(&mut self.context, logs.clone(), &Sequential) {
                        Ok((new_output, new_share)) => {
                            info!("local DKG ceremony was a success");
                            Some((new_output, state::ShareState::Plaintext(Some(new_share))))
                        }
                        Err(
                            reason
                            @ commonware_cryptography::bls12381::dkg::Error::MissingPlayerDealing,
                        ) => {
                            warn!(
                                reason = %eyre::Report::new(reason),
                                "missing critical DKG state to reconstruct a share in this epoch; has \
                                consensus state been deleted or a node with the same identity started \
                                without consensus state? Finalizing the current round as an observer \
                                and will not have a share in the next epoch"
                            );
                            None
                        }
                        Err(error) => {
                            // On failure, keep the previous epoch's output and share.
                            warn!(
                                error = %eyre::Report::new(error),
                                "local DKG ceremony was a failure",
                            );
                            Some((state.output.clone(), state.share.clone()))
                        }
                    }
                } else {
                    None
                };

                if let Some(outcome) = player_outcome {
                    outcome
                } else {
                    match observe::<_, _, N3f1, ed25519::Batch>(
                        &mut self.context,
                        logs,
                        &Sequential,
                    ) {
                        Ok(output) => {
                            info!("local DKG ceremony was a success");
                            (output, state::ShareState::Plaintext(None))
                        }
                        Err(error) => {
                            // On failure, keep the previous epoch's output and share.
                            warn!(
                                error = %eyre::Report::new(error),
                                "local DKG ceremony was a failure",
                            );
                            (state.output.clone(), state.share.clone())
                        }
                    }
                }
            };

            // Cached under the requested digest; `handle_finalized_block` looks
            // it up by the boundary block's parent digest.
            storage.cache_dkg_outcome(state.epoch, request.digest, output.clone(), share);
            output
        };

        let next_epoch = state.epoch.next();
        // A failure to read the re-DKG epoch is treated as "not a full re-DKG".
        let will_be_re_dkg = read_re_dkg_epoch(&self.config.execution_node, request.digest)
            .is_ok_and(|epoch| epoch == next_epoch.get());
        info!(
            will_be_re_dkg,
            %next_epoch,
            "determined if the next epoch will be a reshare or full re-dkg process",
        );

        let next_players =
            determine_next_players_at_hash(&self.config.execution_node, request.digest.0)
                .wrap_err("could not determine who the next players are supposed to be")?;

        request
            .response
            .send(OnchainDkgOutcome {
                epoch: next_epoch,
                output,
                next_players,
                is_next_full_dkg: will_be_re_dkg,
            })
            .map_err(|_| {
                eyre!("requester went away before speculative DKG outcome could be sent")
            })?;

        Ok(None)
    }
1222
1223 #[instrument(skip_all, fields(epoch = %state.epoch), err(level = Level::WARN))]
1224 fn enter_epoch(&mut self, state: &state::State) -> eyre::Result<()> {
1225 self.config
1226 .epoch_manager
1227 .enter(
1228 state.epoch,
1229 state.output.public().clone(),
1230 state.share.clone().into_inner(),
1231 state.dealers().clone(),
1232 )
1233 .wrap_err("could not instruct epoch manager to enter epoch")
1234 }
1235
1236 #[instrument(skip_all, fields(epoch = %state.epoch), err(level = Level::WARN))]
1237 fn exit_epoch(&mut self, state: &state::State) -> eyre::Result<()> {
1238 self.config
1239 .epoch_manager
1240 .exit(state.epoch)
1241 .wrap_err("could not instruct epoch manager to enter epoch")
1242 }
1243}
1244
1245#[instrument(skip_all, err)]
1246async fn read_initial_state_and_set_floor<TContext>(
1247 context: &mut TContext,
1248 node: &TempoFullNode,
1249 share: Option<Share>,
1250 epoch_strategy: &FixedEpocher,
1251 marshal: &mut crate::alias::marshal::Mailbox,
1252) -> eyre::Result<State>
1253where
1254 TContext: Clock + CryptoRngCore,
1255{
1256 let latest_finalized = node
1257 .provider
1258 .finalized_block_num_hash()
1259 .wrap_err("unable to read highest finalized block from execution layer")?
1260 .unwrap_or_else(|| BlockNumHash::new(0, node.chain_spec().genesis_hash()));
1261
1262 let epoch_info = epoch_strategy
1263 .containing(Height::new(latest_finalized.number))
1264 .expect("epoch strategy is for all heights");
1265
1266 let latest_boundary = if epoch_info.last().get() == latest_finalized.number {
1267 latest_finalized.number
1268 } else {
1269 epoch_info
1270 .epoch()
1271 .previous()
1272 .map_or_else(Height::zero, |previous| {
1273 epoch_strategy
1274 .last(previous)
1275 .expect("epoch strategy is for all epochs")
1276 })
1277 .get()
1278 };
1279 info!(
1280 %latest_boundary,
1281 latest_finalized.number,
1282 %latest_finalized.hash,
1283 "execution layer reported newest available block, reading on-chain \
1284 DKG outcome from last boundary height, and validator state from newest \
1285 block"
1286 );
1287
1288 let boundary_header = node
1289 .provider
1290 .header_by_number(latest_boundary)
1291 .map_or_else(
1292 |e| Err(eyre::Report::new(e)),
1293 |header| header.ok_or_eyre("execution layer reported it had no header"),
1294 )
1295 .wrap_err_with(|| {
1296 format!("failed to read header for latest boundary block number `{latest_boundary}`")
1297 })?;
1298
1299 let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
1300 &mut boundary_header.extra_data().as_ref(),
1301 )
1302 .wrap_err("the boundary header did not contain the on-chain DKG outcome")?;
1303
1304 let share = state::ShareState::Plaintext('verify_initial_share: {
1305 let Some(share) = share else {
1306 break 'verify_initial_share None;
1307 };
1308 let Ok(partial) = onchain_outcome.sharing().partial_public(share.index) else {
1309 warn!(
1310 "the index of the provided share exceeds the polynomial of the \
1311 on-chain DKG outcome; ignoring the share"
1312 );
1313 break 'verify_initial_share None;
1314 };
1315 if share.public::<MinSig>() != partial {
1316 warn!(
1317 "the provided share does not match the polynomial of the \
1318 on-chain DKG outcome; ignoring the share"
1319 );
1320 break 'verify_initial_share None;
1321 }
1322 Some(share)
1323 });
1324
1325 info!(%latest_boundary, "setting sync floor");
1326 marshal.set_floor(Height::new(latest_boundary)).await;
1327
1328 Ok(State {
1329 epoch: onchain_outcome.epoch,
1330 seed: Summary::random(context),
1331 output: onchain_outcome.output.clone(),
1332 share,
1333 players: onchain_outcome.next_players,
1334 is_full_dkg: onchain_outcome.is_next_full_dkg,
1335 })
1336}
1337
1338#[derive(Clone)]
1339struct Metrics {
1340 shares_distributed: Gauge,
1341 shares_received: Gauge,
1342 acks_received: Gauge,
1343 acks_sent: Gauge,
1344 dealings_read: Gauge,
1345 bad_dealings: Gauge,
1346
1347 failures: Counter,
1348 successes: Counter,
1349
1350 dealers: Gauge,
1351 players: Gauge,
1352
1353 how_often_dealer: Counter,
1354 how_often_player: Counter,
1355
1356 rounds_skipped: Counter,
1357}
1358
1359impl Metrics {
1360 fn init<TContext>(context: &TContext) -> Self
1361 where
1362 TContext: commonware_runtime::Metrics,
1363 {
1364 let failures = Counter::default();
1365 context.register(
1366 "ceremony_failures",
1367 "the number of failed ceremonies a node participated in",
1368 failures.clone(),
1369 );
1370
1371 let successes = Counter::default();
1372 context.register(
1373 "ceremony_successes",
1374 "the number of successful ceremonies a node participated in",
1375 successes.clone(),
1376 );
1377
1378 let dealers = Gauge::default();
1379 context.register(
1380 "ceremony_dealers",
1381 "the number of dealers in the currently running ceremony",
1382 dealers.clone(),
1383 );
1384 let players = Gauge::default();
1385 context.register(
1386 "ceremony_players",
1387 "the number of players in the currently running ceremony",
1388 players.clone(),
1389 );
1390
1391 let how_often_dealer = Counter::default();
1392 context.register(
1393 "how_often_dealer",
1394 "number of the times as node was active as a dealer",
1395 how_often_dealer.clone(),
1396 );
1397 let how_often_player = Counter::default();
1398 context.register(
1399 "how_often_player",
1400 "number of the times as node was active as a player",
1401 how_often_player.clone(),
1402 );
1403
1404 let shares_distributed = Gauge::default();
1405 context.register(
1406 "ceremony_shares_distributed",
1407 "the number of shares distributed by this node as a dealer in the current ceremony",
1408 shares_distributed.clone(),
1409 );
1410
1411 let shares_received = Gauge::default();
1412 context.register(
1413 "ceremony_shares_received",
1414 "the number of shares received by this node as a player in the current ceremony",
1415 shares_received.clone(),
1416 );
1417
1418 let acks_received = Gauge::default();
1419 context.register(
1420 "ceremony_acks_received",
1421 "the number of acknowledgments received by this node as a dealer in the current ceremony",
1422 acks_received.clone(),
1423 );
1424
1425 let acks_sent = Gauge::default();
1426 context.register(
1427 "ceremony_acks_sent",
1428 "the number of acknowledgments sent by this node as a player in the current ceremony",
1429 acks_sent.clone(),
1430 );
1431
1432 let dealings_read = Gauge::default();
1433 context.register(
1434 "ceremony_dealings_read",
1435 "the number of dealings read from the blockchain in the current ceremony",
1436 dealings_read.clone(),
1437 );
1438
1439 let bad_dealings = Gauge::default();
1440 context.register(
1441 "ceremony_bad_dealings",
1442 "the number of blocks where decoding and verifying dealings failed in the current ceremony",
1443 bad_dealings.clone(),
1444 );
1445
1446 let rounds_skipped = Counter::default();
1447 context.register(
1448 "rounds_skipped",
1449 "how many DKG rounds were skipped because the node fell too far behind and tried to catch up",
1450 rounds_skipped.clone(),
1451 );
1452
1453 Self {
1454 shares_distributed,
1455 shares_received,
1456 acks_received,
1457 acks_sent,
1458 dealings_read,
1459 bad_dealings,
1460 dealers,
1461 players,
1462 how_often_dealer,
1463 how_often_player,
1464 failures,
1465 successes,
1466 rounds_skipped,
1467 }
1468 }
1469
1470 fn reset(&self) {
1471 self.shares_distributed.set(0);
1472 self.shares_received.set(0);
1473 self.acks_received.set(0);
1474 self.acks_sent.set(0);
1475 self.dealings_read.set(0);
1476 self.bad_dealings.set(0);
1477 }
1478}
1479
1480struct AncestorStream {
1486 pending_request: Option<(Span, GetDkgOutcome)>,
1487 inner: Option<marshal::ancestry::AncestorStream<crate::alias::marshal::Mailbox, Block>>,
1488}
1489
1490impl AncestorStream {
1491 fn new() -> Self {
1492 Self {
1493 pending_request: None,
1494 inner: None,
1495 }
1496 }
1497
1498 fn take_request(&mut self) -> Option<(Span, GetDkgOutcome)> {
1499 self.inner.take();
1500 self.pending_request.take()
1501 }
1502
1503 fn set(
1504 &mut self,
1505 pending_request: (Span, GetDkgOutcome),
1506 stream: marshal::ancestry::AncestorStream<crate::alias::marshal::Mailbox, Block>,
1507 ) {
1508 self.pending_request.replace(pending_request);
1509 self.inner.replace(stream);
1510 }
1511
1512 fn tip(&self) -> Option<Digest> {
1513 self.pending_request.as_ref().map(|(_, req)| req.digest)
1514 }
1515
1516 fn update_receiver(&mut self, pending_request: (Span, GetDkgOutcome)) {
1517 self.pending_request.replace(pending_request);
1518 }
1519}
1520
1521impl Stream for AncestorStream {
1522 type Item = Block;
1523
1524 fn poll_next(
1525 mut self: std::pin::Pin<&mut Self>,
1526 cx: &mut std::task::Context<'_>,
1527 ) -> std::task::Poll<Option<Self::Item>> {
1528 let item = {
1529 let this = match self.inner.as_mut() {
1530 Some(inner) => inner,
1531 None => return Poll::Ready(None),
1532 };
1533 this.poll_next_unpin(cx)
1534 };
1535 match futures::ready!(item) {
1536 None => {
1537 self.inner.take();
1538 Poll::Ready(None)
1539 }
1540 Some(block) => Poll::Ready(Some(block)),
1541 }
1542 }
1543}
1544
1545impl FusedStream for AncestorStream {
1546 fn is_terminated(&self) -> bool {
1547 self.inner.is_none()
1548 }
1549}
1550
1551fn read_dealer_log(
1552 mut bytes: &[u8],
1553 round: &state::Round,
1554) -> eyre::Result<(PublicKey, DealerLog<MinSig, PublicKey>)> {
1555 let signed_log = dkg::SignedDealerLog::<MinSig, PrivateKey>::read_cfg(
1556 &mut bytes,
1557 &NZU32!(round.players().len() as u32),
1558 )
1559 .wrap_err("could not decode as signed dealer log")?;
1560
1561 let (dealer, log) = signed_log
1562 .check(round.info())
1563 .ok_or_eyre("failed checking signed log against current round")?;
1564 Ok((dealer, log))
1565}
1566
1567#[instrument(skip_all, fields(%hash), err(level = Level::WARN))]
1575fn determine_next_players_at_hash(
1576 node: &TempoFullNode,
1577 hash: B256,
1578) -> eyre::Result<ordered::Set<PublicKey>> {
1579 let next_players =
1580 read_active_and_known_peers_at_block_hash(node, &ordered::Set::default(), hash)
1581 .wrap_err("failed reading peers from validator config v2")?
1582 .into_keys();
1583
1584 debug!(?next_players, "determined next players");
1585 Ok(next_players)
1586}
1587
1588#[instrument(
1599 skip_all,
1600 fields(
1601 %digest,
1602 ),
1603 err(level = Level::WARN)
1604 ret,
1605)]
1606pub(crate) fn read_re_dkg_epoch(node: &TempoFullNode, digest: Digest) -> eyre::Result<u64> {
1607 read_validator_config_at_block_hash(node, digest.0, |config: &ValidatorConfigV2| {
1608 config
1609 .get_next_network_identity_rotation_epoch()
1610 .map_err(eyre::Report::new)
1611 })
1612 .map(|(_, _, epoch)| epoch)
1613}