1use std::{collections::BTreeMap, num::NonZeroU32, task::Poll, time::Duration};
2
3use alloy_consensus::{BlockHeader as _, Sealable as _};
4use alloy_primitives::B256;
5use bytes::{Buf, BufMut};
6use commonware_codec::{Encode as _, EncodeSize, Read, ReadExt as _, Write};
7use commonware_consensus::{
8 Heightable as _,
9 marshal::{self, Update},
10 types::{Epoch, EpochPhase, Epocher as _, FixedEpocher, Height},
11};
12use commonware_cryptography::{
13 Signer as _,
14 bls12381::{
15 dkg::{self, DealerLog, DealerPrivMsg, DealerPubMsg, PlayerAck, SignedDealerLog, observe},
16 primitives::{group::Share, variant::MinSig},
17 },
18 ed25519::{PrivateKey, PublicKey},
19 transcript::Summary,
20};
21use commonware_math::algebra::Random as _;
22use commonware_p2p::{
23 Receiver, Recipients, Sender,
24 utils::mux::{self, MuxHandle},
25};
26use commonware_parallel::Sequential;
27use commonware_runtime::{Clock, ContextCell, Handle, IoBuf, Metrics as _, Spawner, spawn_cell};
28use commonware_utils::{Acknowledgement, N3f1, NZU32, ordered};
29
30use eyre::{OptionExt as _, WrapErr as _, bail, ensure, eyre};
31use futures::{
32 FutureExt as _, Stream, StreamExt as _, channel::mpsc, select_biased, stream::FusedStream,
33};
34use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
35use rand_core::CryptoRngCore;
36use reth_ethereum::{
37 chainspec::EthChainSpec, network::NetworkInfo, rpc::eth::primitives::BlockNumHash,
38};
39use reth_provider::{BlockIdReader as _, HeaderProvider as _};
40use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
41use tempo_node::TempoFullNode;
42use tempo_precompiles::{
43 validator_config::ValidatorConfig, validator_config_v2::ValidatorConfigV2,
44};
45use tracing::{Level, Span, debug, info, info_span, instrument, warn, warn_span};
46
47use crate::{
48 consensus::{Digest, block::Block},
49 validators::{
50 can_use_v2_at_block_hash, read_active_and_known_peers_at_block_hash_v1,
51 read_active_and_known_peers_at_block_hash_v2, read_validator_config_at_block_hash,
52 },
53};
54
55mod state;
56use state::State;
57
58use super::{
59 Command,
60 ingress::{GetDkgOutcome, VerifyDealerLog},
61};
62
/// P2P messages exchanged over a DKG ceremony's per-epoch subchannel.
pub(crate) enum Message {
    /// A dealer's public commitment together with the private message
    /// (encrypted share) destined for the receiving player.
    Dealer(DealerPubMsg<MinSig>, DealerPrivMsg),
    /// A player's acknowledgement that it received and stored a dealing.
    Ack(PlayerAck<PublicKey>),
}
70
71impl Write for Message {
72 fn write(&self, writer: &mut impl BufMut) {
73 match self {
74 Self::Dealer(pub_msg, priv_msg) => {
75 0u8.write(writer);
76 pub_msg.write(writer);
77 priv_msg.write(writer);
78 }
79 Self::Ack(ack) => {
80 1u8.write(writer);
81 ack.write(writer);
82 }
83 }
84 }
85}
86
87impl EncodeSize for Message {
88 fn encode_size(&self) -> usize {
89 1 + match self {
90 Self::Dealer(pub_msg, priv_msg) => pub_msg.encode_size() + priv_msg.encode_size(),
91 Self::Ack(ack) => ack.encode_size(),
92 }
93 }
94}
95
96impl Read for Message {
97 type Cfg = NonZeroU32;
98
99 fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
100 let tag = u8::read(reader)?;
101 match tag {
102 0 => {
103 let pub_msg = DealerPubMsg::read_cfg(reader, cfg)?;
104 let priv_msg = DealerPrivMsg::read(reader)?;
105 Ok(Self::Dealer(pub_msg, priv_msg))
106 }
107 1 => {
108 let ack = PlayerAck::read(reader)?;
109 Ok(Self::Ack(ack))
110 }
111 other => Err(commonware_codec::Error::InvalidEnum(other)),
112 }
113 }
114}
115
/// The DKG actor: runs one DKG ceremony per epoch, exchanging dealings and
/// ACKs over p2p and persisting round progress via its `state` storage.
pub(crate) struct Actor<TContext>
where
    TContext: Clock + commonware_runtime::Metrics + commonware_runtime::Storage,
{
    // Static configuration: identity, namespace, epoch strategy, marshal and
    // epoch-manager mailboxes, execution node handle, etc.
    config: super::Config,

    // Runtime context, wrapped so it can be moved into the spawned task.
    context: ContextCell<TContext>,

    // Ingress mailbox on which other components deliver commands/updates.
    mailbox: mpsc::UnboundedReceiver<super::Message>,

    // Prometheus metrics tracking ceremony progress and outcomes.
    metrics: Metrics,
}
133
134impl<TContext> Actor<TContext>
135where
136 TContext: commonware_runtime::BufferPooler
137 + Clock
138 + CryptoRngCore
139 + commonware_runtime::Metrics
140 + Spawner
141 + commonware_runtime::Storage,
142{
143 pub(super) async fn new(
144 config: super::Config,
145 context: TContext,
146 mailbox: mpsc::UnboundedReceiver<super::ingress::Message>,
147 ) -> eyre::Result<Self> {
148 let context = ContextCell::new(context);
149
150 let metrics = Metrics::init(&context);
151
152 Ok(Self {
153 config,
154 context,
155 mailbox,
156 metrics,
157 })
158 }
159
    /// Spawns the actor onto the runtime with the dedicated DKG p2p channel
    /// (sender/receiver pair) and returns a handle to the running task.
    pub(crate) fn start(
        mut self,
        dkg_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<()> {
        // `spawn_cell!` takes the context out of the cell and drives `run`.
        spawn_cell!(self.context, self.run(dkg_channel).await)
    }
169
    /// Main entry point of the spawned actor task: initializes persistent
    /// state, starts the p2p muxer, and runs DKG rounds back-to-back until
    /// one fails, logging the reason on exit.
    async fn run(
        mut self,
        (sender, receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        // Open (or restore) the journaled DKG state. If no state is on disk,
        // the `initial_state` future derives it from the execution layer.
        // On failure the actor silently exits (the builder is expected to
        // have logged the cause — TODO confirm).
        let Ok(mut storage) = state::builder()
            .partition_prefix(&self.config.partition_prefix)
            .initial_state({
                // Clone everything the future needs so it is 'static.
                let mut context = self.context.clone();
                let execution_node = self.config.execution_node.clone();
                let initial_share = self.config.initial_share.clone();
                let epoch_strategy = self.config.epoch_strategy.clone();
                let mut marshal = self.config.marshal.clone();
                async move {
                    read_initial_state_and_set_floor(
                        &mut context,
                        &execution_node,
                        initial_share.clone(),
                        &epoch_strategy,
                        &mut marshal,
                    )
                    .await
                }
            })
            .init(self.context.with_label("state"))
            .await
        else {
            return;
        };

        // Multiplex the single DKG p2p channel into one subchannel per epoch
        // (see `mux.register(epoch)` in `run_dkg_loop`).
        let (mux, mut dkg_mux) = mux::Muxer::new(
            self.context.with_label("dkg_mux"),
            sender,
            receiver,
            self.config.mailbox_size,
        );
        mux.start();

        // Run one DKG round per epoch until a round returns an error; that
        // error becomes the exit reason.
        let reason = loop {
            if let Err(error) = self.run_dkg_loop(&mut storage, &mut dkg_mux).await {
                break error;
            }
        };

        tracing::warn_span!("dkg_actor").in_scope(|| {
            warn!(
                %reason,
                "actor exited",
            );
        });
    }
224
    /// Runs a single DKG ceremony for the current epoch stored in `storage`.
    ///
    /// Returns `Ok(())` when the epoch completed and a new state was
    /// persisted (the caller then loops into the next epoch), or an error
    /// when the actor must shut down (shutdown signal, closed channels, or a
    /// fatal storage/handling failure).
    async fn run_dkg_loop<TStorageContext, TSender, TReceiver>(
        &mut self,
        storage: &mut state::Storage<TStorageContext>,
        mux: &mut MuxHandle<TSender, TReceiver>,
    ) -> eyre::Result<()>
    where
        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
        TSender: Sender<PublicKey = PublicKey>,
        TReceiver: Receiver<PublicKey = PublicKey>,
    {
        let state = storage.current();

        // Fresh gauges/counters for the new round.
        self.metrics.reset();

        self.metrics.dealers.set(state.dealers().len() as i64);
        self.metrics.players.set(state.players().len() as i64);
        self.metrics.syncing_players.set(state.syncers.len() as i64);

        // Drop journal data from epochs before the previous one; the
        // previous epoch is kept (presumably for late readers — confirm).
        if let Some(previous) = state.epoch.previous() {
            storage.prune(previous).await.wrap_err_with(|| {
                format!("unable to prune storage before up until epoch `{previous}`",)
            })?;
        }

        self.enter_epoch(&state)
            .wrap_err("could not instruct epoch manager to enter a new epoch")?;

        let round = state::Round::from_state(&state, &self.config.namespace);

        // `None` if we are not a dealer in this round.
        let mut dealer_state = storage
            .create_dealer_for_round(
                self.config.me.clone(),
                round.clone(),
                state.share.clone(),
                state.seed,
            )
            .wrap_err("unable to instantiate dealer state")?;

        if dealer_state.is_some() {
            self.metrics.how_often_dealer.inc();
        }

        // `None` if we are not a player in this round.
        let mut player_state = storage
            .create_player_for_round(self.config.me.clone(), &round)
            .wrap_err("unable to instantiate player state")?;

        if player_state.is_some() {
            self.metrics.how_often_player.inc();
        }

        // Dedicated p2p subchannel for this epoch's ceremony.
        let (mut round_sender, mut round_receiver) =
            mux.register(state.epoch.get()).await.wrap_err_with(|| {
                format!(
                    "unable to create subchannel for this DKG ceremony of epoch `{}`",
                    state.epoch
                )
            })?;

        // Streams notarized ancestor blocks requested from the marshal while
        // serving `GetDkgOutcome` (used to backfill missing dealer logs).
        let mut ancestry_stream = AncestorStream::new();

        info_span!("run_dkg_loop", epoch = %state.epoch).in_scope(|| {
            info!(
                me = %self.config.me.public_key(),
                dealers = ?state.dealers(),
                players = ?state.players(),
                syncers = ?state.syncers,
                as_dealer = dealer_state.is_some(),
                as_player = player_state.is_some(),
                "entering a new DKG ceremony",
            )
        });

        // Set once we learn the network is far ahead of us; from then on we
        // only wait for this epoch's boundary block instead of participating.
        let mut skip_to_boundary = false;
        loop {
            let mut shutdown = self.context.stopped().fuse();
            // NOTE: `select_biased!` polls arms in order — shutdown first,
            // then network traffic, then mailbox commands, then ancestry.
            select_biased!(

                _ = &mut shutdown => {
                    break Err(eyre!("shutdown triggered"));
                }

                network_msg = round_receiver.recv().fuse() => {
                    match network_msg {
                        Ok((sender, message)) => {
                            // Handler errors are logged via #[instrument(err)]
                            // and deliberately not fatal to the round.
                            let _ = self.handle_network_msg(
                                &round,
                                &mut round_sender,
                                storage,
                                dealer_state.as_mut(),
                                player_state.as_mut(),
                                sender,
                                message,
                            ).await;
                        }
                        Err(err) => {
                            break Err(err).wrap_err("network p2p subchannel closed")
                        }
                    }
                }

                msg = self.mailbox.next() => {
                    let Some(msg) = msg else {
                        break Err(eyre!("all instances of the DKG actor's mailbox are dropped"));
                    };

                    match msg.command {
                        Command::Update(update) => {
                            match *update {
                                // A new finalized tip height: check whether
                                // the network has left us so far behind that
                                // the rest of this round should be skipped.
                                Update::Tip(_, height, _) => {
                                    if !skip_to_boundary {
                                        skip_to_boundary |= self.should_skip_round(
                                            &round,
                                            height,
                                        ).await;
                                        if skip_to_boundary {
                                            self.metrics.rounds_skipped.inc();
                                        }
                                    }
                                }
                                // A finalized block to process; a returned
                                // `Some(new_state)` ends this epoch's round.
                                Update::Block(block, ack) => {
                                    let res = if skip_to_boundary {
                                        self.handle_finalized_boundary(
                                            msg.cause,
                                            &round,
                                            block,
                                        ).await
                                    } else {
                                        self.handle_finalized_block(
                                            msg.cause,
                                            &state,
                                            &round,
                                            &mut round_sender,
                                            storage,
                                            &mut dealer_state,
                                            &mut player_state,
                                            block,
                                        ).await
                                    };
                                    let should_break = match res {
                                        Ok(Some(new_state)) => {
                                            info_span!(
                                                "run_dkg_loop",
                                                epoch = %state.epoch
                                            ).in_scope(|| info!(
                                                "constructed a new epoch state; \
                                                persisting new state and exiting \
                                                current epoch",
                                            ));

                                            if let Err(err) = storage
                                                .set_state(new_state)
                                                .await
                                                .wrap_err("failed appending new state to journal")
                                            {
                                                break Err(err);
                                            }
                                            // Best-effort; failure to exit the
                                            // epoch is logged by the callee.
                                            let _ = self.exit_epoch(&state);

                                            true
                                        }
                                        Ok(None) => false,
                                        // NOTE(review): this `break` exits the
                                        // loop before `ack.acknowledge()` runs
                                        // below — confirm that is intended on
                                        // the fatal path.
                                        Err(err) => break Err(err).wrap_err("failed handling finalized block"),
                                    };
                                    ack.acknowledge();
                                    if should_break {
                                        break Ok(());
                                    }
                                }
                            }
                        }

                        // Application asks for our finalized dealer log (to
                        // embed in a block's extraData). Only served for the
                        // epoch we are currently running.
                        Command::GetDealerLog(get_dealer_log) => {
                            warn_span!("get_dealer_log").in_scope(|| {
                                let log = if get_dealer_log.epoch != round.epoch() {
                                    warn!(
                                        request.epoch = %get_dealer_log.epoch,
                                        round.epoch = %round.epoch(),
                                        "application requested dealer log for \
                                        an epoch other than we are currently \
                                        running",
                                    );
                                    None
                                } else {
                                    dealer_state
                                        .as_ref()
                                        .and_then(|dealer_state| dealer_state.finalized())
                                };
                                let _ = get_dealer_log
                                    .response
                                    .send(log);
                            });
                        }

                        Command::GetDkgOutcome(request) => {
                            // If an ancestry fetch for the same digest is
                            // already in flight, just swap in the new
                            // requester instead of starting another walk.
                            if let Some(target) = ancestry_stream.tip()
                                && target == request.digest
                            {
                                ancestry_stream.update_receiver((msg.cause, request));
                                continue;
                            }
                            // `Some((hole, request))` means a notarized block
                            // was missing; stream its ancestry from marshal
                            // and retry once blocks arrive.
                            if let Ok(Some((hole, request))) = self
                                .handle_get_dkg_outcome(
                                    &msg.cause,
                                    storage,
                                    &player_state,
                                    &round,
                                    &state,
                                    request,
                                )
                                .await
                            {
                                let stream = match self.config.marshal.ancestry((None, hole)).await {
                                    Some(stream) => stream,
                                    None => break Err(eyre!("marshal mailbox is closed")),
                                };
                                ancestry_stream.set(
                                    (msg.cause, request),
                                    stream,
                                );
                            }
                        }
                        Command::VerifyDealerLog(verify) => {
                            self.handle_verify_dealer_log(
                                &state,
                                &round,
                                verify,
                            );
                        }
                    }
                }

                // Blocks arriving from an in-flight marshal ancestry walk:
                // cache each one and retry the pending outcome request.
                notarized_block = ancestry_stream.next() => {
                    if let Some(block) = notarized_block {
                        storage.cache_notarized_block(&round, block);
                        let (cause, request) = ancestry_stream
                            .take_request()
                            .expect("if the stream is yielding blocks, there must be a receiver");
                        if let Ok(Some((hole, request))) = self
                            .handle_get_dkg_outcome(&cause, storage, &player_state, &round, &state, request)
                            .await
                        {
                            let stream = match self.config.marshal.ancestry((None, hole)).await {
                                Some(stream) => stream,
                                None => break Err(eyre!("marshal mailbox is closed")),
                            };
                            ancestry_stream.set(
                                (cause, request),
                                stream,
                            );
                        }
                    }
                }
            )
        }
    }
485
    /// Verifies a serialized dealer log (as found in a block header) against
    /// the current round, answering through the request's `response` channel.
    ///
    /// Rejects requests for any epoch other than the one currently running;
    /// otherwise decodes the `SignedDealerLog` sized by the round's player
    /// count, checks it against the round info, and resolves to the dealer's
    /// identity. Metrics count good/bad dealings; a dropped requester is
    /// silently ignored.
    fn handle_verify_dealer_log(
        &self,
        state: &state::State,
        round: &state::Round,
        VerifyDealerLog {
            epoch,
            bytes,
            response,
        }: VerifyDealerLog,
    ) {
        if state.epoch != epoch {
            let _ = response.send(Err(eyre!(
                "requested dealer log for epoch `{epoch}`, but current round \
                is for epoch `{}`",
                state.epoch
            )));
            return;
        }
        let res = SignedDealerLog::<MinSig, PrivateKey>::read_cfg(
            &mut &bytes[..],
            &NZU32!(round.players().len() as u32),
        )
        .wrap_err("failed reading dealer log from header")
        .and_then(|log| {
            // `check` yields the dealer on success; `None` means the signer
            // is not a dealer of this round.
            log.check(round.info())
                .map(|(dealer, _)| dealer)
                .ok_or_eyre("not a dealer in the current round")
        })
        .inspect(|_| {
            self.metrics.dealings_read.inc();
        })
        .inspect_err(|_| {
            self.metrics.bad_dealings.inc();
        });
        let _ = response.send(res);
    }
522
    /// Decides whether the network's finalized tip proves this round is
    /// obsolete and should be fast-forwarded to its boundary block.
    ///
    /// Skips when the tip's epoch is beyond our round's next epoch, or is
    /// exactly the next epoch with the tip at that epoch's last block. When
    /// skipping, also raises the marshal's sync floor to just below our
    /// epoch's boundary so only the boundary block needs to be fetched.
    #[instrument(
        skip_all,
        fields(
            round.epoch = %round.epoch(),
            finalized.tip = %finalized_tip,
            finalized.epoch = tracing::field::Empty,
        ),
    )]
    async fn should_skip_round(&mut self, round: &state::Round, finalized_tip: Height) -> bool {
        let epoch_info = self
            .config
            .epoch_strategy
            .containing(finalized_tip)
            .expect("epoch strategy is valid for all heights");
        // Fill in the deferred span field now that the epoch is known.
        Span::current().record(
            "finalized.epoch",
            tracing::field::display(epoch_info.epoch()),
        );

        let should_skip_round = epoch_info.epoch() > round.epoch().next()
            || (epoch_info.epoch() == round.epoch().next() && epoch_info.last() == finalized_tip);

        if should_skip_round {
            let boundary_height = self
                .config
                .epoch_strategy
                .last(round.epoch())
                .expect("epoch strategy is valid for all epochs");
            info!(
                %boundary_height,
                "confirmed that the network is at least 2 epochs aheads of us; \
                setting synchronization floor to boundary height of our DKG's \
                epoch and reporting that the rest of the DKG round should be \
                skipped",
            );

            // Floor at the boundary's parent so the boundary itself is still
            // delivered to us (we need its extraData).
            if let Some(one_before_boundary) = boundary_height.previous() {
                self.config.marshal.set_floor(one_before_boundary).await;
            }
        }
        should_skip_round
    }
574
    /// Processes one finalized block of the current epoch during normal
    /// (non-skipped) operation.
    ///
    /// For non-boundary blocks: advances the dealer (distribute shares early
    /// in the epoch, finalize the log afterwards), extracts any dealer log
    /// from the block's extraData, journals block and logs, and returns
    /// `Ok(None)`. For the epoch's last block: reads the on-chain DKG outcome
    /// from the header, finalizes (or observes) the local ceremony, compares
    /// against the on-chain result, and returns `Ok(Some(next_state))`.
    #[instrument(
        parent = &cause,
        skip_all,
        fields(
            dkg.epoch = %round.epoch(),
            block.height = %block.height(),
            block.extra_data.bytes = block.header().extra_data().len(),
        ),
        err,
    )]
    #[expect(
        clippy::too_many_arguments,
        reason = "easiest way to express this for now"
    )]
    async fn handle_finalized_block<TStorageContext, TSender>(
        &mut self,
        cause: Span,
        state: &state::State,
        round: &state::Round,
        round_channel: &mut TSender,
        storage: &mut state::Storage<TStorageContext>,
        dealer_state: &mut Option<state::Dealer>,
        player_state: &mut Option<state::Player>,
        block: Block,
    ) -> eyre::Result<Option<State>>
    where
        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
        TSender: Sender<PublicKey = PublicKey>,
    {
        let epoch_info = self
            .config
            .epoch_strategy
            .containing(block.height())
            .expect("epoch strategy is covering all block heights");

        ensure!(
            epoch_info.epoch() == round.epoch(),
            "block was not for this epoch; must observe all blocks epoch by \
            epoch; cannot deal with observing blocks out-of-order"
        );

        // Drive the dealer side based on how far into the epoch we are:
        // early on we (re-)send shares; from the midpoint on the dealer log
        // is frozen so it can be published on chain.
        match epoch_info.phase() {
            EpochPhase::Early => {
                if let Some(dealer_state) = dealer_state {
                    self.distribute_shares(
                        storage,
                        round.epoch(),
                        dealer_state,
                        player_state,
                        round_channel,
                    )
                    .await;
                }
            }
            EpochPhase::Midpoint | EpochPhase::Late => {
                if let Some(dealer_state) = dealer_state {
                    dealer_state.finalize();
                }
            }
        }

        // Non-boundary block: harvest any published dealer log, journal the
        // block, and keep the round going.
        if block.height() != epoch_info.last() {
            if !block.header().extra_data().is_empty() {
                'handle_log: {
                    // A malformed log is only warned about — other validators
                    // accepted the block, so we must not error out here.
                    let (dealer, log) =
                        match read_dealer_log(block.header().extra_data().as_ref(), round) {
                            Err(reason) => {
                                warn!(
                                    %reason,
                                    "failed to read dealer log from block \
                                    extraData header field");
                                break 'handle_log;
                            }
                            Ok((dealer, log)) => (dealer, log),
                        };
                    storage
                        .append_dealer_log(round.epoch(), dealer.clone(), log)
                        .await
                        .wrap_err("failed to append log to journal")?;
                    // Our own log landed on chain: clear it locally so we do
                    // not publish it a second time.
                    if self.config.me.public_key() == dealer
                        && let Some(dealer_state) = dealer_state
                    {
                        info!(
                            "found own dealing in finalized block; deleting it \
                            from state to not write it again"
                        );
                        dealer_state.take_finalized();
                    }
                }
            }

            storage
                .append_finalized_block(round.epoch(), block)
                .await
                .wrap_err("failed to append finalized block to journal")?;

            return Ok(None);
        }

        info!("reached last block of epoch; reading DKG outcome from header");

        let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
            &mut block.header().extra_data().as_ref(),
        )
        .expect("the last block of an epoch must contain the DKG outcome");

        info!("reading validator from contract");

        // Prefer the outcome cached while serving a speculative
        // `GetDkgOutcome` for this block's parent; otherwise finalize the
        // ceremony from the journaled dealer logs now.
        let (local_output, mut share) = if let Some((outcome, share)) =
            storage.get_dkg_outcome(&state.epoch, &block.parent_digest())
        {
            debug!("using cached DKG outcome");
            (outcome.clone(), share.clone())
        } else {
            let logs = storage
                .logs_for_epoch(round.epoch())
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect::<BTreeMap<_, _>>();

            // As a player, try to reconstruct our share for the next epoch.
            let player_outcome = player_state.take().and_then(|player| {
                info!("we were a player in the ceremony; finalizing share");
                match player.finalize(logs.clone(), &Sequential) {
                    Ok((new_output, new_share)) => {
                        info!("local DKG ceremony was a success");
                        Some((new_output, state::ShareState::Plaintext(Some(new_share))))
                    }
                    // Our dealing is missing entirely: likely lost local
                    // state. Fall through to observing without a share.
                    Err(
                        reason
                        @ commonware_cryptography::bls12381::dkg::Error::MissingPlayerDealing,
                    ) => {
                        warn!(
                            reason = %eyre::Report::new(reason),
                            "missing critical DKG state to reconstruct a share in this epoch; has \
                            consensus state been deleted or a node with the same identity started \
                            without consensus state? Finalizing the current round as an observer \
                            and will not have a share in the next epoch"
                        );
                        None
                    }
                    // Ceremony failed: carry the previous epoch's output and
                    // share forward.
                    Err(error) => {
                        warn!(
                            error = %eyre::Report::new(error),
                            "local DKG ceremony was a failure",
                        );
                        Some((state.output.clone(), state.share.clone()))
                    }
                }
            });

            // Not a player (or lost our dealing): observe the ceremony from
            // the logs alone; no share either way.
            player_outcome.unwrap_or_else(move || {
                match observe::<_, _, N3f1>(round.info().clone(), logs, &Sequential) {
                    Ok(output) => {
                        info!("local DKG ceremony was a success");
                        (output, state::ShareState::Plaintext(None))
                    }
                    Err(error) => {
                        warn!(
                            error = %eyre::Report::new(error),
                            "local DKG ceremony was a failure",
                        );
                        (state.output.clone(), state.share.clone())
                    }
                }
            })
        };

        // Our local result must match the chain; if not, discard our share —
        // it would be useless (or harmful) under the canonical output.
        if local_output != onchain_outcome.output {
            let am_player = onchain_outcome
                .next_players
                .position(&self.config.me.public_key())
                .is_some();
            warn!(
                am_player,
                "the output of the local DKG ceremony does not match what is \
                on chain; something is terribly wrong; will try and participate \
                in the next round (if a player), but if we are misbehaving and \
                other nodes are blocking us it might be time to delete this node \
                and spin up a new identity",
            );
            share = state::ShareState::Plaintext(None);
        }

        // An unchanged on-chain output means the network-wide ceremony did
        // not produce a new key — counted as a failure.
        if onchain_outcome.output == state.output {
            self.metrics.failures.inc();
        } else {
            self.metrics.successes.inc();
        }

        let syncers = read_syncers_if_v2_not_initialized_with_retry(
            &self.context,
            &self.config.execution_node,
            &block,
            &self.metrics.attempts_to_read_validator_contract,
        )
        .await
        .wrap_err("failed reading contract to determine syncers")?;

        Ok(Some(state::State {
            epoch: onchain_outcome.epoch,
            seed: Summary::random(&mut self.context),
            output: onchain_outcome.output.clone(),
            share,
            players: onchain_outcome.next_players,
            syncers,
            is_full_dkg: onchain_outcome.is_next_full_dkg,
        }))
    }
809
    /// Processes finalized blocks once the round is being skipped: everything
    /// but the epoch's boundary block is ignored.
    ///
    /// On the boundary block, reads the canonical DKG outcome from the header
    /// and builds the next epoch's state as a share-less observer (we did not
    /// participate, so no share can be reconstructed).
    #[instrument(
        parent = &cause,
        skip_all,
        fields(
            dkg.epoch = %round.epoch(),
            block.height = %block.height(),
            block.extra_data.bytes = block.header().extra_data().len(),
        ),
        err,
    )]
    async fn handle_finalized_boundary(
        &mut self,
        cause: Span,
        round: &state::Round,
        block: Block,
    ) -> eyre::Result<Option<State>> {
        let epoch_info = self
            .config
            .epoch_strategy
            .containing(block.height())
            .expect("epoch strategy is covering all block heights");

        ensure!(
            epoch_info.epoch() == round.epoch(),
            "block was not for this epoch; must observe all blocks epoch by \
            epoch; cannot deal with observing blocks out-of-order"
        );

        // Not the boundary yet: nothing to do while skipping.
        if block.height() != epoch_info.last() {
            return Ok(None);
        }

        info!("found boundary block; reading DKG outcome from header");

        let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
            &mut block.header().extra_data().as_ref(),
        )
        .expect("the last block of an epoch must contain the DKG outcome");

        info!("reading validators from contract");

        let syncers = read_syncers_if_v2_not_initialized_with_retry(
            &self.context,
            &self.config.execution_node,
            &block,
            &self.metrics.attempts_to_read_validator_contract,
        )
        .await
        .wrap_err("failed reading contract; must be able to read contract to continue")?;

        Ok(Some(state::State {
            epoch: onchain_outcome.epoch,
            seed: Summary::random(&mut self.context),
            output: onchain_outcome.output.clone(),
            // We skipped the ceremony, so we hold no share for the new epoch.
            share: state::ShareState::Plaintext(None),
            players: onchain_outcome.next_players,
            syncers,
            is_full_dkg: onchain_outcome.is_next_full_dkg,
        }))
    }
877
    /// Distributes the dealer's pending shares to all players of the round.
    ///
    /// Our own share is delivered locally (stored via our player state and
    /// immediately self-ACKed); every other player's share is sent over the
    /// round's p2p subchannel. Failures are logged and counted but never
    /// fatal — shares are re-sent on later blocks while the epoch is Early.
    #[instrument(skip_all, fields(me = %self.config.me.public_key(), %epoch))]
    async fn distribute_shares<TStorageContext, TSender>(
        &self,
        storage: &mut state::Storage<TStorageContext>,
        epoch: Epoch,
        dealer_state: &mut state::Dealer,
        player_state: &mut Option<state::Player>,
        round_channel: &mut TSender,
    ) where
        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
        TSender: Sender<PublicKey = PublicKey>,
    {
        let me = self.config.me.public_key();
        // Collect first: the loop body needs `dealer_state` mutably (to
        // record our self-ACK), so the iterator's borrow must end.
        for (player, pub_msg, priv_msg) in dealer_state.shares_to_distribute().collect::<Vec<_>>() {
            if player == me {
                // Local delivery: store the dealing as player, then feed the
                // resulting ACK straight back into our dealer state.
                if let Some(player_state) = player_state
                    && let Ok(ack) = player_state
                        .receive_dealing(storage, epoch, me.clone(), pub_msg, priv_msg)
                        .await
                        .inspect(|_| {
                            self.metrics.shares_distributed.inc();
                            self.metrics.shares_received.inc();
                        })
                        .inspect_err(|error| warn!(%error, "failed to store our own dealing"))
                    && let Ok(()) = dealer_state
                        .receive_ack(storage, epoch, me.clone(), ack)
                        .await
                        .inspect_err(|error| warn!(%error, "failed to store our own ACK"))
                {
                    self.metrics.acks_received.inc();
                    self.metrics.acks_sent.inc();
                    info!("stored our own ACK and share");
                }
            } else {
                let payload = Message::Dealer(pub_msg, priv_msg).encode();
                match round_channel
                    .send(Recipients::One(player.clone()), payload, true)
                    .await
                {
                    Ok(success) => {
                        // An empty recipient list means delivery failed.
                        if success.is_empty() {
                            info!(%player, "failed to send share");
                        } else {
                            self.metrics.shares_distributed.inc();
                            info!(%player, "share sent");
                        }
                    }
                    Err(error) => {
                        warn!(%player, %error, "error sending share");
                    }
                }
            }
        }
    }
936
    /// Handles a raw p2p message on the round's subchannel.
    ///
    /// A `Dealer` message is stored via our player state and answered with an
    /// `Ack`; an `Ack` message is recorded in our dealer state. Messages for
    /// roles we do not hold in this round are logged and dropped.
    #[instrument(
        skip_all,
        fields(
            epoch = %round.epoch(),
            %from,
            bytes = message.len()),
        err)]
    #[expect(
        clippy::too_many_arguments,
        reason = "easiest way to express this for now"
    )]
    async fn handle_network_msg<TStorageContext>(
        &self,
        round: &state::Round,
        round_channel: &mut impl Sender<PublicKey = PublicKey>,
        storage: &mut state::Storage<TStorageContext>,
        dealer_state: Option<&mut state::Dealer>,
        player_state: Option<&mut state::Player>,
        from: PublicKey,
        mut message: IoBuf,
    ) -> eyre::Result<()>
    where
        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
    {
        // Decode with the round's player count as the codec bound.
        let msg = Message::read_cfg(&mut message, &NZU32!(round.players().len() as u32))
            .wrap_err("failed reading p2p message")?;

        match msg {
            Message::Dealer(pub_msg, priv_msg) => {
                if let Some(player_state) = player_state {
                    info!("received message from a dealer");
                    self.metrics.shares_received.inc();
                    let ack = player_state
                        .receive_dealing(storage, round.epoch(), from.clone(), pub_msg, priv_msg)
                        .await
                        .wrap_err("failed storing dealing")?;

                    // Return the ACK to the dealer so it can complete its log.
                    if let Err(error) = round_channel
                        .send(
                            Recipients::One(from.clone()),
                            Message::Ack(ack).encode(),
                            true,
                        )
                        .await
                    {
                        warn!(
                            reason = ?error,
                            "failed returning ACK to dealer",
                        );
                        bail!("failed returning ACK to dealer");
                    }
                    info!("returned ACK to dealer");
                    self.metrics.acks_sent.inc();
                } else {
                    info!("received a dealer message, but we are not a player");
                }
            }
            Message::Ack(ack) => {
                if let Some(dealer_state) = dealer_state {
                    info!("received an ACK");
                    self.metrics.acks_received.inc();
                    dealer_state
                        .receive_ack(storage, round.epoch(), from, ack)
                        .await
                        .wrap_err("failed storing ACK")?;
                } else {
                    info!("received an ACK, but we are not a dealer");
                }
            }
        }
        Ok(())
    }
1014
    /// Serves a speculative DKG outcome for a (not yet finalized) block at
    /// `request.height`/`request.digest`.
    ///
    /// Uses a cached outcome when available; otherwise gathers dealer logs
    /// from the journal, topped up with logs from cached notarized ancestor
    /// blocks. Returns `Ok(Some((digest, request)))` when an ancestor block
    /// is missing — the caller then streams ancestry from the marshal and
    /// retries. Returns `Ok(None)` once the outcome was sent to the
    /// requester.
    #[instrument(
        parent = cause,
        skip_all,
        fields(
            as_player = player_state.is_some(),
            our.epoch = %round.epoch(),
        ),
        err(level = Level::WARN),
    )]
    async fn handle_get_dkg_outcome<TStorageContext>(
        &mut self,
        cause: &Span,
        storage: &mut state::Storage<TStorageContext>,
        player_state: &Option<state::Player>,
        round: &state::Round,
        state: &State,
        request: GetDkgOutcome,
    ) -> eyre::Result<Option<(Digest, GetDkgOutcome)>>
    where
        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
    {
        let epoch_info = self
            .config
            .epoch_strategy
            .containing(request.height)
            .expect("our strategy covers all epochs");

        ensure!(
            round.epoch() == epoch_info.epoch(),
            "request is for epoch `{}`, not our epoch",
            epoch_info.epoch(),
        );

        let output = if let Some((output, _)) = storage
            .get_dkg_outcome(&state.epoch, &request.digest)
            .cloned()
        {
            output
        } else {
            // Start from all dealer logs already journaled for this epoch.
            let mut logs = storage
                .logs_for_epoch(round.epoch())
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect::<BTreeMap<_, _>>();

            'ensure_enough_logs: {
                if logs.len() == round.dealers().len() {
                    info!("collected as many logs as there are dealers; concluding DKG");
                    break 'ensure_enough_logs;
                }

                info!(
                    "did not have all dealer logs yet; will try to extend with \
                    logs read from notarized blocks and concluding DKG that way",
                );
                // Walk the (notarized, unfinalized) chain backwards from the
                // requested block down to the first block of the epoch or the
                // latest finalized block — whichever comes first.
                let (mut height, mut digest) = (request.height, request.digest);
                while height >= epoch_info.first()
                    && Some(height)
                        >= storage
                            .get_latest_finalized_block_for_epoch(&round.epoch())
                            .map(|(_, info)| info.height)
                {
                    if let Some(block) =
                        storage.get_notarized_reduced_block(&round.epoch(), &digest)
                    {
                        logs.extend(block.log.clone());
                        height = if let Some(height) = block.height.previous() {
                            height
                        } else {
                            break 'ensure_enough_logs;
                        };
                        digest = block.parent;
                    } else {
                        // Ancestor missing: hand the hole back so the caller
                        // can fetch the ancestry and retry this request.
                        return Ok(Some((digest, request)));
                    }
                }
            }

            // Recreate a fresh player from the journal: finalizing consumes
            // the player, and this is only a *speculative* conclusion — the
            // authoritative one happens on the finalized boundary block.
            let player_state = player_state.is_some().then(||
                storage
                    .create_player_for_round(self.config.me.clone(), round)
                    .expect("created a player instance before, must be able to create it again")
                    .expect("did not return a player instance even though we created it for this round already")
            );

            let (output, share) = {
                // As a player, try to reconstruct our share from the logs.
                let player_outcome = player_state.and_then(|player| {
                    info!("we were a player in the ceremony; finalizing share");
                    match player.finalize(logs.clone(), &Sequential) {
                        Ok((new_output, new_share)) => {
                            info!("local DKG ceremony was a success");
                            Some((new_output, state::ShareState::Plaintext(Some(new_share))))
                        }
                        Err(
                            reason
                            @ commonware_cryptography::bls12381::dkg::Error::MissingPlayerDealing,
                        ) => {
                            warn!(
                                reason = %eyre::Report::new(reason),
                                "missing critical DKG state to reconstruct a share in this epoch; has \
                                consensus state been deleted or a node with the same identity started \
                                without consensus state? Finalizing the current round as an observer \
                                and will not have a share in the next epoch"
                            );
                            None
                        }
                        Err(error) => {
                            warn!(
                                error = %eyre::Report::new(error),
                                "local DKG ceremony was a failure",
                            );
                            Some((state.output.clone(), state.share.clone()))
                        }
                    }
                });

                // Observer path: conclude the ceremony without a share.
                player_outcome.unwrap_or_else(move || {
                    match observe::<_, _, N3f1>(round.info().clone(), logs, &Sequential) {
                        Ok(output) => {
                            info!("local DKG ceremony was a success");
                            (output, state::ShareState::Plaintext(None))
                        }
                        Err(error) => {
                            warn!(
                                error = %eyre::Report::new(error),
                                "local DKG ceremony was a failure",
                            );
                            (state.output.clone(), state.share.clone())
                        }
                    }
                })
            };

            // Cache so the finalized boundary block can reuse this result.
            storage.cache_dkg_outcome(state.epoch, request.digest, output.clone(), share);
            output
        };

        let next_epoch = state.epoch.next();
        let will_be_re_dkg = read_re_dkg_epoch(&self.config.execution_node, request.digest)
            .is_ok_and(|epoch| epoch == next_epoch.get());
        info!(
            will_be_re_dkg,
            %next_epoch,
            "determined if the next epoch will be a reshare or full re-dkg process",
        );

        let next_players =
            determine_next_players_at_hash(state, &self.config.execution_node, request.digest.0)
                .wrap_err("could not determine who the next players are supposed to be")?;
        request
            .response
            .send(OnchainDkgOutcome {
                epoch: next_epoch,
                output,
                next_players,
                is_next_full_dkg: will_be_re_dkg,
            })
            .map_err(|_| {
                eyre!("requester went away before speculative DKG outcome could be sent")
            })?;

        Ok(None)
    }
1193
1194 #[instrument(skip_all, fields(epoch = %state.epoch), err(level = Level::WARN))]
1195 fn enter_epoch(&mut self, state: &state::State) -> eyre::Result<()> {
1196 self.config
1197 .epoch_manager
1198 .enter(
1199 state.epoch,
1200 state.output.public().clone(),
1201 state.share.clone().into_inner(),
1202 state.dealers().clone(),
1203 )
1204 .wrap_err("could not instruct epoch manager to enter epoch")
1205 }
1206
1207 #[instrument(skip_all, fields(epoch = %state.epoch), err(level = Level::WARN))]
1208 fn exit_epoch(&mut self, state: &state::State) -> eyre::Result<()> {
1209 self.config
1210 .epoch_manager
1211 .exit(state.epoch)
1212 .wrap_err("could not instruct epoch manager to enter epoch")
1213 }
1214}
1215
/// Reads the node's initial DKG `State` from the execution layer and tells
/// the marshal to set its sync floor to the most recent epoch-boundary block.
///
/// The on-chain DKG outcome is decoded from the boundary header's
/// `extra_data`. The optionally supplied `share` is kept only when it matches
/// that outcome's public polynomial; otherwise it is dropped with a warning
/// and the node proceeds without a share.
///
/// # Errors
/// Fails when the execution layer cannot provide the boundary header (or its
/// parent), when the boundary header carries no decodable DKG outcome, or
/// when the v1 syncer set cannot be read.
#[instrument(skip_all, err)]
async fn read_initial_state_and_set_floor<TContext>(
    context: &mut TContext,
    node: &TempoFullNode,
    share: Option<Share>,
    epoch_strategy: &FixedEpocher,
    marshal: &mut crate::alias::marshal::Mailbox,
) -> eyre::Result<State>
where
    TContext: Clock + CryptoRngCore,
{
    // Before anything is finalized, fall back to genesis at height 0.
    let latest_finalized = node
        .provider
        .finalized_block_num_hash()
        .wrap_err("unable to read highest finalized block from execution layer")?
        .unwrap_or_else(|| BlockNumHash::new(0, node.chain_spec().genesis_hash()));

    let epoch_info = epoch_strategy
        .containing(Height::new(latest_finalized.number))
        .expect("epoch strategy is for all heights");

    // Use the finalized block itself when it is exactly the last block of its
    // epoch; otherwise take the last block of the previous epoch (height 0
    // when there is no previous epoch).
    let latest_boundary = if epoch_info.last().get() == latest_finalized.number {
        latest_finalized.number
    } else {
        epoch_info
            .epoch()
            .previous()
            .map_or_else(Height::zero, |previous| {
                epoch_strategy
                    .last(previous)
                    .expect("epoch strategy is for all epochs")
            })
            .get()
    };
    info!(
        %latest_boundary,
        latest_finalized.number,
        %latest_finalized.hash,
        "execution layer reported newest available block, reading on-chain \
        DKG outcome from last boundary height, and validator state from newest \
        block"
    );

    let boundary_header = node
        .provider
        .header_by_number(latest_boundary)
        .map_or_else(
            |e| Err(eyre::Report::new(e)),
            |header| header.ok_or_eyre("execution layer reported it had no header"),
        )
        .wrap_err_with(|| {
            format!("failed to read header for latest boundary block number `{latest_boundary}`")
        })?;

    // NOTE(review): when `latest_boundary == 0` the saturating_sub re-reads
    // height 0, i.e. the "parent" is the boundary itself — presumably fine
    // for the v2-usability probe below, but worth confirming.
    let parent_of_boundary_header = node
        .provider
        .header_by_number(latest_boundary.saturating_sub(1))
        .map_or_else(
            |e| Err(eyre::Report::new(e)),
            |header| header.ok_or_eyre("execution layer reported it had no header"),
        )
        .wrap_err_with(|| {
            format!(
                "failed to read header for the boundary's parent `{}`",
                latest_boundary.saturating_sub(1)
            )
        })?;

    // The boundary block commits the DKG outcome into its `extra_data`.
    let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
        &mut boundary_header.extra_data().as_ref(),
    )
    .wrap_err("the boundary header did not contain the on-chain DKG outcome")?;

    let syncers = read_syncers_if_v2_not_initialized(
        1,
        node,
        parent_of_boundary_header.hash_slow(),
        boundary_header.hash_slow(),
    )
    .wrap_err("failed determining syncers")?;

    // Keep the provided share only when it is consistent with the on-chain
    // outcome's public polynomial; otherwise warn and continue without one.
    let share = state::ShareState::Plaintext('verify_initial_share: {
        let Some(share) = share else {
            break 'verify_initial_share None;
        };
        let Ok(partial) = onchain_outcome.sharing().partial_public(share.index) else {
            warn!(
                "the index of the provided share exceeds the polynomial of the \
                on-chain DKG outcome; ignoring the share"
            );
            break 'verify_initial_share None;
        };
        if share.public::<MinSig>() != partial {
            warn!(
                "the provided share does not match the polynomial of the \
                on-chain DKG outcome; ignoring the share"
            );
            break 'verify_initial_share None;
        }
        Some(share)
    });

    info!(%latest_boundary, "setting sync floor");
    marshal.set_floor(Height::new(latest_boundary)).await;

    Ok(State {
        epoch: onchain_outcome.epoch,
        seed: Summary::random(context),
        output: onchain_outcome.output.clone(),
        share,
        players: onchain_outcome.next_players,
        syncers,
        is_full_dkg: onchain_outcome.is_next_full_dkg,
    })
}
1339
/// Prometheus handles for the DKG actor's metrics.
///
/// The six progress gauges at the top are per-ceremony and are zeroed by
/// `Metrics::reset`; the remaining gauges describe the current ceremony's
/// participants, and the counters accumulate over the node's lifetime.
#[derive(Clone)]
struct Metrics {
    // Per-ceremony progress gauges (cleared by `reset`).
    shares_distributed: Gauge,
    shares_received: Gauge,
    acks_received: Gauge,
    acks_sent: Gauge,
    dealings_read: Gauge,
    bad_dealings: Gauge,

    // Lifetime ceremony-outcome counters.
    failures: Counter,
    successes: Counter,

    // Participant counts for the currently running ceremony.
    dealers: Gauge,
    players: Gauge,
    syncing_players: Gauge,

    // How often this node has served in each role.
    how_often_dealer: Counter,
    how_often_player: Counter,

    rounds_skipped: Counter,
    attempts_to_read_validator_contract: Counter,
}
1362
1363impl Metrics {
1364 fn init<TContext>(context: &TContext) -> Self
1365 where
1366 TContext: commonware_runtime::Metrics,
1367 {
1368 let syncing_players = Gauge::default();
1369 context.register(
1370 "syncing_players",
1371 "how many syncing players were registered; these will become players in the next ceremony",
1372 syncing_players.clone(),
1373 );
1374
1375 let failures = Counter::default();
1376 context.register(
1377 "ceremony_failures",
1378 "the number of failed ceremonies a node participated in",
1379 failures.clone(),
1380 );
1381
1382 let successes = Counter::default();
1383 context.register(
1384 "ceremony_successes",
1385 "the number of successful ceremonies a node participated in",
1386 successes.clone(),
1387 );
1388
1389 let dealers = Gauge::default();
1390 context.register(
1391 "ceremony_dealers",
1392 "the number of dealers in the currently running ceremony",
1393 dealers.clone(),
1394 );
1395 let players = Gauge::default();
1396 context.register(
1397 "ceremony_players",
1398 "the number of players in the currently running ceremony",
1399 players.clone(),
1400 );
1401
1402 let how_often_dealer = Counter::default();
1403 context.register(
1404 "how_often_dealer",
1405 "number of the times as node was active as a dealer",
1406 how_often_dealer.clone(),
1407 );
1408 let how_often_player = Counter::default();
1409 context.register(
1410 "how_often_player",
1411 "number of the times as node was active as a player",
1412 how_often_player.clone(),
1413 );
1414
1415 let shares_distributed = Gauge::default();
1416 context.register(
1417 "ceremony_shares_distributed",
1418 "the number of shares distributed by this node as a dealer in the current ceremony",
1419 shares_distributed.clone(),
1420 );
1421
1422 let shares_received = Gauge::default();
1423 context.register(
1424 "ceremony_shares_received",
1425 "the number of shares received by this node as a player in the current ceremony",
1426 shares_received.clone(),
1427 );
1428
1429 let acks_received = Gauge::default();
1430 context.register(
1431 "ceremony_acks_received",
1432 "the number of acknowledgments received by this node as a dealer in the current ceremony",
1433 acks_received.clone(),
1434 );
1435
1436 let acks_sent = Gauge::default();
1437 context.register(
1438 "ceremony_acks_sent",
1439 "the number of acknowledgments sent by this node as a player in the current ceremony",
1440 acks_sent.clone(),
1441 );
1442
1443 let dealings_read = Gauge::default();
1444 context.register(
1445 "ceremony_dealings_read",
1446 "the number of dealings read from the blockchain in the current ceremony",
1447 dealings_read.clone(),
1448 );
1449
1450 let bad_dealings = Gauge::default();
1451 context.register(
1452 "ceremony_bad_dealings",
1453 "the number of blocks where decoding and verifying dealings failed in the current ceremony",
1454 bad_dealings.clone(),
1455 );
1456
1457 let rounds_skipped = Counter::default();
1458 context.register(
1459 "rounds_skipped",
1460 "how many DKG rounds were skipped because the node fell too far behind and tried to catch up",
1461 rounds_skipped.clone(),
1462 );
1463
1464 let attempts_to_read_validator_contract = Counter::default();
1465 context.register(
1466 "attempts_to_read_validator_contract",
1467 "the total number of attempts it took to read the validators from the smart contract",
1468 attempts_to_read_validator_contract.clone(),
1469 );
1470
1471 Self {
1472 syncing_players,
1473 shares_distributed,
1474 shares_received,
1475 acks_received,
1476 acks_sent,
1477 dealings_read,
1478 bad_dealings,
1479 dealers,
1480 players,
1481 how_often_dealer,
1482 how_often_player,
1483 failures,
1484 successes,
1485 rounds_skipped,
1486 attempts_to_read_validator_contract,
1487 }
1488 }
1489
1490 fn reset(&self) {
1491 self.shares_distributed.set(0);
1492 self.shares_received.set(0);
1493 self.acks_received.set(0);
1494 self.acks_sent.set(0);
1495 self.dealings_read.set(0);
1496 self.bad_dealings.set(0);
1497 }
1498}
1499
/// Walks a block's ancestors via the marshal while remembering which
/// `GetDkgOutcome` request triggered the walk.
struct AncestorStream {
    // The span and request currently being served; `None` when idle.
    pending_request: Option<(Span, GetDkgOutcome)>,
    // The underlying ancestry walk; `None` means the stream is terminated
    // (see the `FusedStream` impl below).
    inner: Option<marshal::ancestry::AncestorStream<crate::alias::marshal::Mailbox, Block>>,
}
1509
1510impl AncestorStream {
1511 fn new() -> Self {
1512 Self {
1513 pending_request: None,
1514 inner: None,
1515 }
1516 }
1517
1518 fn take_request(&mut self) -> Option<(Span, GetDkgOutcome)> {
1519 self.inner.take();
1520 self.pending_request.take()
1521 }
1522
1523 fn set(
1524 &mut self,
1525 pending_request: (Span, GetDkgOutcome),
1526 stream: marshal::ancestry::AncestorStream<crate::alias::marshal::Mailbox, Block>,
1527 ) {
1528 self.pending_request.replace(pending_request);
1529 self.inner.replace(stream);
1530 }
1531
1532 fn tip(&self) -> Option<Digest> {
1533 self.pending_request.as_ref().map(|(_, req)| req.digest)
1534 }
1535
1536 fn update_receiver(&mut self, pending_request: (Span, GetDkgOutcome)) {
1537 self.pending_request.replace(pending_request);
1538 }
1539}
1540
1541impl Stream for AncestorStream {
1542 type Item = Block;
1543
1544 fn poll_next(
1545 mut self: std::pin::Pin<&mut Self>,
1546 cx: &mut std::task::Context<'_>,
1547 ) -> std::task::Poll<Option<Self::Item>> {
1548 let item = {
1549 let this = match self.inner.as_mut() {
1550 Some(inner) => inner,
1551 None => return Poll::Ready(None),
1552 };
1553 this.poll_next_unpin(cx)
1554 };
1555 match futures::ready!(item) {
1556 None => {
1557 self.inner.take();
1558 Poll::Ready(None)
1559 }
1560 Some(block) => Poll::Ready(Some(block)),
1561 }
1562 }
1563}
1564
impl FusedStream for AncestorStream {
    // Once the inner ancestry walk has been dropped (by `poll_next` on
    // exhaustion or by `take_request`), this stream will never yield again.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}
1570
1571fn read_dealer_log(
1572 mut bytes: &[u8],
1573 round: &state::Round,
1574) -> eyre::Result<(PublicKey, DealerLog<MinSig, PublicKey>)> {
1575 let signed_log = dkg::SignedDealerLog::<MinSig, PrivateKey>::read_cfg(
1576 &mut bytes,
1577 &NZU32!(round.players().len() as u32),
1578 )
1579 .wrap_err("could not decode as signed dealer log")?;
1580
1581 let (dealer, log) = signed_log
1582 .check(round.info())
1583 .ok_or_eyre("failed checking signed log against current round")?;
1584 Ok((dealer, log))
1585}
1586
1587async fn read_syncers_if_v2_not_initialized_with_retry(
1589 context: &impl commonware_runtime::Clock,
1590 node: &TempoFullNode,
1591 boundary_block: &Block,
1592 total_attempts: &Counter,
1593) -> eyre::Result<ordered::Set<PublicKey>> {
1594 let mut attempts = 0;
1595 const MIN_RETRY: Duration = Duration::from_secs(1);
1596 const MAX_RETRY: Duration = Duration::from_secs(30);
1597
1598 let parent_hash = boundary_block.parent_hash();
1599 let block_hash = boundary_block.hash();
1600 'read_contract: loop {
1601 total_attempts.inc();
1602 attempts += 1;
1603
1604 if let Ok(syncers) =
1605 read_syncers_if_v2_not_initialized(attempts, node, parent_hash, block_hash)
1606 {
1607 break 'read_contract Ok(syncers);
1608 }
1609
1610 let retry_after = MIN_RETRY.saturating_mul(attempts).min(MAX_RETRY);
1611 let is_syncing = node.network.is_syncing();
1612 let best_finalized_block = node.provider.finalized_block_number().ok().flatten();
1613 let blocks_behind = best_finalized_block
1614 .as_ref()
1615 .map(|best| boundary_block.number().saturating_sub(*best));
1616 tracing::warn_span!("read_validator_config_with_retry").in_scope(|| {
1617 warn!(
1618 attempts,
1619 retry_after = %tempo_telemetry_util::display_duration(retry_after),
1620 is_syncing,
1621 best_finalized_block = %tempo_telemetry_util::display_option(&best_finalized_block),
1622 height_if_v1 = boundary_block.number(),
1623 blocks_behind = %tempo_telemetry_util::display_option(&blocks_behind),
1624 "reading validator config from contract failed; will retry",
1625 );
1626 });
1627 context.sleep(retry_after).await;
1628 }
1629}
1630
1631#[instrument(
1645 skip_all,
1646 fields(
1647 attempt = _attempt,
1648 parent_hash,
1649 block_hash,
1650 ),
1651 err
1652)]
1653pub(crate) fn read_syncers_if_v2_not_initialized(
1654 _attempt: u32,
1655 node: &TempoFullNode,
1656 parent_hash: B256,
1657 block_hash: B256,
1658) -> eyre::Result<ordered::Set<PublicKey>> {
1659 if can_use_v2_at_block_hash(node, parent_hash, Some(block_hash))
1660 .wrap_err("unable to determine if the validator config v2 can be used or not")?
1661 {
1662 return Ok(ordered::Set::default());
1663 }
1664 let peers =
1665 read_active_and_known_peers_at_block_hash_v1(node, &ordered::Set::default(), block_hash)
1666 .wrap_err("unable to read peers from validator config v1 contract")?;
1667 info!(?peers, "read validators from validator config v1 contract");
1668 Ok(peers.into_keys())
1669}
1670
1671#[instrument(skip_all, fields(%hash), err(level = Level::WARN))]
1679fn determine_next_players_at_hash(
1680 state: &State,
1681 node: &TempoFullNode,
1682 hash: B256,
1683) -> eyre::Result<ordered::Set<PublicKey>> {
1684 let next_players = if can_use_v2_at_block_hash(node, hash, None)
1685 .wrap_err("failed determining if validator config v2 can be used")?
1686 {
1687 debug!("reading next players from validator config v2 contract");
1688 read_active_and_known_peers_at_block_hash_v2(node, &ordered::Set::default(), hash)
1689 .wrap_err("failed reading peers from validator config v2")?
1690 .into_keys()
1691 } else {
1692 debug!("using validator config v1 syncers from state");
1693 state.syncers.clone()
1694 };
1695
1696 debug!(?next_players, "determined next players");
1697 Ok(next_players)
1698}
1699
1700#[instrument(
1711 skip_all,
1712 fields(
1713 %digest,
1714 ),
1715 err(level = Level::WARN)
1716 ret,
1717)]
1718pub(crate) fn read_re_dkg_epoch(node: &TempoFullNode, digest: Digest) -> eyre::Result<u64> {
1719 if can_use_v2_at_block_hash(node, digest.0, None)
1720 .wrap_err("failed determining if validator config v2 can be used")?
1721 {
1722 read_validator_config_at_block_hash(node, digest.0, |config: &ValidatorConfigV2| {
1723 config
1724 .get_next_network_identity_rotation_epoch()
1725 .map_err(eyre::Report::new)
1726 })
1727 .map(|(_, _, epoch)| epoch)
1728 } else {
1729 read_validator_config_at_block_hash(node, digest.0, |config: &ValidatorConfig| {
1730 config
1731 .get_next_full_dkg_ceremony()
1732 .map_err(eyre::Report::new)
1733 })
1734 .map(|(_, _, epoch)| epoch)
1735 }
1736}