Skip to main content

tempo_commonware_node/dkg/manager/actor/
mod.rs

1use std::{collections::BTreeMap, num::NonZeroU32, task::Poll, time::Duration};
2
3use alloy_consensus::{BlockHeader as _, Sealable as _};
4use alloy_primitives::B256;
5use bytes::{Buf, BufMut};
6use commonware_codec::{Encode as _, EncodeSize, Read, ReadExt as _, Write};
7use commonware_consensus::{
8    Heightable as _,
9    marshal::{self, Update},
10    types::{Epoch, EpochPhase, Epocher as _, FixedEpocher, Height},
11};
12use commonware_cryptography::{
13    Signer as _,
14    bls12381::{
15        dkg::{self, DealerLog, DealerPrivMsg, DealerPubMsg, PlayerAck, SignedDealerLog, observe},
16        primitives::{group::Share, variant::MinSig},
17    },
18    ed25519::{PrivateKey, PublicKey},
19    transcript::Summary,
20};
21use commonware_math::algebra::Random as _;
22use commonware_p2p::{
23    Receiver, Recipients, Sender,
24    utils::mux::{self, MuxHandle},
25};
26use commonware_parallel::Sequential;
27use commonware_runtime::{Clock, ContextCell, Handle, IoBuf, Metrics as _, Spawner, spawn_cell};
28use commonware_utils::{Acknowledgement, N3f1, NZU32, ordered};
29
30use eyre::{OptionExt as _, WrapErr as _, bail, ensure, eyre};
31use futures::{
32    FutureExt as _, Stream, StreamExt as _, channel::mpsc, select_biased, stream::FusedStream,
33};
34use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
35use rand_core::CryptoRngCore;
36use reth_ethereum::{
37    chainspec::EthChainSpec, network::NetworkInfo, rpc::eth::primitives::BlockNumHash,
38};
39use reth_provider::{BlockIdReader as _, HeaderProvider as _};
40use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
41use tempo_node::TempoFullNode;
42use tempo_precompiles::{
43    validator_config::ValidatorConfig, validator_config_v2::ValidatorConfigV2,
44};
45use tracing::{Level, Span, debug, info, info_span, instrument, warn, warn_span};
46
47use crate::{
48    consensus::{Digest, block::Block},
49    validators::{
50        can_use_v2_at_block_hash, read_active_and_known_peers_at_block_hash_v1,
51        read_active_and_known_peers_at_block_hash_v2, read_validator_config_at_block_hash,
52    },
53};
54
55mod state;
56use state::State;
57
58use super::{
59    Command,
60    ingress::{GetDkgOutcome, VerifyDealerLog},
61};
62
/// Wire message type for DKG protocol communication.
///
/// Encoded with a leading one-byte variant tag (`0` = `Dealer`, `1` = `Ack`);
/// see the [`Write`], [`EncodeSize`], and [`Read`] impls for this type.
pub(crate) enum Message {
    /// A dealer message containing public and private components for a player.
    Dealer(DealerPubMsg<MinSig>, DealerPrivMsg),
    /// A player acknowledgment sent back to a dealer.
    Ack(PlayerAck<PublicKey>),
}
70
71impl Write for Message {
72    fn write(&self, writer: &mut impl BufMut) {
73        match self {
74            Self::Dealer(pub_msg, priv_msg) => {
75                0u8.write(writer);
76                pub_msg.write(writer);
77                priv_msg.write(writer);
78            }
79            Self::Ack(ack) => {
80                1u8.write(writer);
81                ack.write(writer);
82            }
83        }
84    }
85}
86
87impl EncodeSize for Message {
88    fn encode_size(&self) -> usize {
89        1 + match self {
90            Self::Dealer(pub_msg, priv_msg) => pub_msg.encode_size() + priv_msg.encode_size(),
91            Self::Ack(ack) => ack.encode_size(),
92        }
93    }
94}
95
96impl Read for Message {
97    type Cfg = NonZeroU32;
98
99    fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
100        let tag = u8::read(reader)?;
101        match tag {
102            0 => {
103                let pub_msg = DealerPubMsg::read_cfg(reader, cfg)?;
104                let priv_msg = DealerPrivMsg::read(reader)?;
105                Ok(Self::Dealer(pub_msg, priv_msg))
106            }
107            1 => {
108                let ack = PlayerAck::read(reader)?;
109                Ok(Self::Ack(ack))
110            }
111            other => Err(commonware_codec::Error::InvalidEnum(other)),
112        }
113    }
114}
115
/// Actor driving the DKG protocol; constructed via [`Actor::new`] and
/// launched onto the runtime with [`Actor::start`].
pub(crate) struct Actor<TContext>
where
    TContext: Clock + commonware_runtime::Metrics + commonware_runtime::Storage,
{
    /// The actor configuration passed in when constructing the actor.
    config: super::Config,

    /// The runtime context passed in when constructing the actor.
    context: ContextCell<TContext>,

    /// The channel over which the actor will receive messages.
    // NOTE(review): typed `super::Message` here, while `Actor::new` accepts
    // `super::ingress::Message` — presumably the same type re-exported; confirm.
    mailbox: mpsc::UnboundedReceiver<super::Message>,

    /// Handles to the metrics objects that the actor will update during its
    /// runtime.
    metrics: Metrics,
}
133
134impl<TContext> Actor<TContext>
135where
136    TContext: commonware_runtime::BufferPooler
137        + Clock
138        + CryptoRngCore
139        + commonware_runtime::Metrics
140        + Spawner
141        + commonware_runtime::Storage,
142{
143    pub(super) async fn new(
144        config: super::Config,
145        context: TContext,
146        mailbox: mpsc::UnboundedReceiver<super::ingress::Message>,
147    ) -> eyre::Result<Self> {
148        let context = ContextCell::new(context);
149
150        let metrics = Metrics::init(&context);
151
152        Ok(Self {
153            config,
154            context,
155            mailbox,
156            metrics,
157        })
158    }
159
    /// Spawns the actor onto the runtime, consuming it.
    ///
    /// `dkg_channel` is the `(sender, receiver)` pair for DKG peer-to-peer
    /// traffic; it is handed to [`Self::run`]. Returns a handle that resolves
    /// when the actor task finishes.
    pub(crate) fn start(
        mut self,
        dkg_channel: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(dkg_channel).await)
    }
169
    /// Main actor task: initializes persistent state and the DKG muxer, then
    /// runs DKG rounds back-to-back until one of them returns an error, at
    /// which point the exit reason is logged and the task ends.
    async fn run(
        mut self,
        (sender, receiver): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        // Open (or bootstrap) the on-disk state. The initial-state closure is
        // only evaluated when no persisted state exists yet.
        let Ok(mut storage) = state::builder()
            .partition_prefix(&self.config.partition_prefix)
            .initial_state({
                // Clone everything the async closure needs so it is 'static.
                let mut context = self.context.clone();
                let execution_node = self.config.execution_node.clone();
                let initial_share = self.config.initial_share.clone();
                let epoch_strategy = self.config.epoch_strategy.clone();
                let mut marshal = self.config.marshal.clone();
                async move {
                    read_initial_state_and_set_floor(
                        &mut context,
                        &execution_node,
                        initial_share.clone(),
                        &epoch_strategy,
                        &mut marshal,
                    )
                    .await
                }
            })
            .init(self.context.with_label("state"))
            .await
        else {
            // NOTE: Builder::init emits an error event.
            return;
        };

        // Multiplexer providing one subchannel per DKG round (epoch).
        let (mux, mut dkg_mux) = mux::Muxer::new(
            self.context.with_label("dkg_mux"),
            sender,
            receiver,
            self.config.mailbox_size,
        );
        mux.start();

        // Run rounds until one fails; the error becomes the exit reason.
        let reason = loop {
            if let Err(error) = self.run_dkg_loop(&mut storage, &mut dkg_mux).await {
                break error;
            }
        };

        tracing::warn_span!("dkg_actor").in_scope(|| {
            warn!(
                %reason,
                "actor exited",
            );
        });
    }
224
225    async fn run_dkg_loop<TStorageContext, TSender, TReceiver>(
226        &mut self,
227        storage: &mut state::Storage<TStorageContext>,
228        mux: &mut MuxHandle<TSender, TReceiver>,
229    ) -> eyre::Result<()>
230    where
231        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
232        TSender: Sender<PublicKey = PublicKey>,
233        TReceiver: Receiver<PublicKey = PublicKey>,
234    {
235        let state = storage.current();
236
237        self.metrics.reset();
238
239        self.metrics.dealers.set(state.dealers().len() as i64);
240        self.metrics.players.set(state.players().len() as i64);
241        self.metrics.syncing_players.set(state.syncers.len() as i64);
242
243        if let Some(previous) = state.epoch.previous() {
244            // NOTE: State::prune emits an error event.
245            storage.prune(previous).await.wrap_err_with(|| {
246                format!("unable to prune storage before up until epoch `{previous}`",)
247            })?;
248        }
249
250        self.enter_epoch(&state)
251            .wrap_err("could not instruct epoch manager to enter a new epoch")?;
252
253        // TODO: emit an event with round info
254        let round = state::Round::from_state(&state, &self.config.namespace);
255
256        let mut dealer_state = storage
257            .create_dealer_for_round(
258                self.config.me.clone(),
259                round.clone(),
260                state.share.clone(),
261                state.seed,
262            )
263            .wrap_err("unable to instantiate dealer state")?;
264
265        if dealer_state.is_some() {
266            self.metrics.how_often_dealer.inc();
267        }
268
269        let mut player_state = storage
270            .create_player_for_round(self.config.me.clone(), &round)
271            .wrap_err("unable to instantiate player state")?;
272
273        if player_state.is_some() {
274            self.metrics.how_often_player.inc();
275        }
276
277        // Register a channel for this round
278        let (mut round_sender, mut round_receiver) =
279            mux.register(state.epoch.get()).await.wrap_err_with(|| {
280                format!(
281                    "unable to create subchannel for this DKG ceremony of epoch `{}`",
282                    state.epoch
283                )
284            })?;
285
286        let mut ancestry_stream = AncestorStream::new();
287
288        info_span!("run_dkg_loop", epoch = %state.epoch).in_scope(|| {
289            info!(
290                me = %self.config.me.public_key(),
291                dealers = ?state.dealers(),
292                players = ?state.players(),
293                syncers = ?state.syncers,
294                as_dealer = dealer_state.is_some(),
295                as_player = player_state.is_some(),
296                "entering a new DKG ceremony",
297            )
298        });
299
300        let mut skip_to_boundary = false;
301        loop {
302            let mut shutdown = self.context.stopped().fuse();
303            select_biased!(
304
305                _ = &mut shutdown => {
306                    break Err(eyre!("shutdown triggered"));
307                }
308
309                network_msg = round_receiver.recv().fuse() => {
310                    match network_msg {
311                        Ok((sender, message)) => {
312                            // Produces an error event.
313                            let _ = self.handle_network_msg(
314                                &round,
315                                &mut round_sender,
316                                storage,
317                                dealer_state.as_mut(),
318                                player_state.as_mut(),
319                                sender,
320                                message,
321                            ).await;
322                        }
323                        Err(err) => {
324                            break Err(err).wrap_err("network p2p subchannel closed")
325                        }
326                    }
327                }
328
329                msg = self.mailbox.next() => {
330                    let Some(msg) = msg else {
331                        break Err(eyre!("all instances of the DKG actor's mailbox are dropped"));
332                    };
333
334                    match msg.command {
335                        Command::Update(update) => {
336                            match *update {
337                                Update::Tip(_, height, _) => {
338                                    if !skip_to_boundary {
339                                        skip_to_boundary |= self.should_skip_round(
340                                            &round,
341                                            height,
342                                        ).await;
343                                        if skip_to_boundary {
344                                            self.metrics.rounds_skipped.inc();
345                                        }
346                                    }
347                                }
348                                Update::Block(block, ack) => {
349                                    let res = if skip_to_boundary {
350                                        self.handle_finalized_boundary(
351                                            msg.cause,
352                                            &round,
353                                            block,
354                                        ).await
355                                    } else {
356                                        self.handle_finalized_block(
357                                            msg.cause,
358                                            &state,
359                                            &round,
360                                            &mut round_sender,
361                                            storage,
362                                            &mut dealer_state,
363                                            &mut player_state,
364                                            block,
365                                        ).await
366                                    };
367                                    let should_break = match res {
368                                        Ok(Some(new_state)) => {
369                                            info_span!(
370                                                "run_dkg_loop",
371                                                epoch = %state.epoch
372                                            ).in_scope(|| info!(
373                                                "constructed a new epoch state; \
374                                                persisting new state and exiting \
375                                                current epoch",
376                                            ));
377
378                                            if let Err(err) = storage
379                                                .set_state(new_state)
380                                                .await
381                                                .wrap_err("failed appending new state to journal")
382                                            {
383                                                break Err(err);
384                                            }
385                                            // Emits an error event.
386                                            let _ = self.exit_epoch(&state);
387
388                                            true
389                                        }
390                                        Ok(None) => false,
391                                        Err(err) => break Err(err).wrap_err("failed handling finalized block"),
392                                    };
393                                    ack.acknowledge();
394                                    if should_break {
395                                        break Ok(());
396                                    }
397                                }
398                            }
399                        }
400
401                        Command::GetDealerLog(get_dealer_log) => {
402                            warn_span!("get_dealer_log").in_scope(|| {
403                                let log = if get_dealer_log.epoch != round.epoch() {
404                                    warn!(
405                                        request.epoch = %get_dealer_log.epoch,
406                                        round.epoch = %round.epoch(),
407                                        "application requested dealer log for \
408                                        an epoch other than we are currently \
409                                        running",
410                                    );
411                                    None
412                                } else {
413                                    dealer_state
414                                        .as_ref()
415                                        .and_then(|dealer_state| dealer_state.finalized())
416                                };
417                                let _ = get_dealer_log
418                                .response
419                                .send(log);
420                            });
421                        }
422
423                        Command::GetDkgOutcome(request) => {
424                            if let Some(target) = ancestry_stream.tip()
425                            && target == request.digest
426                            {
427                                ancestry_stream.update_receiver((msg.cause, request));
428                                continue;
429                            }
430                            if let Ok(Some((hole, request))) = self
431                                .handle_get_dkg_outcome(
432                                    &msg.cause,
433                                    storage,
434                                    &player_state,
435                                    &round,
436                                    &state,
437                                    request,
438                                )
439                                .await
440                            {
441                                let stream = match self.config.marshal.ancestry((None, hole)).await {
442                                    Some(stream) => stream,
443                                    None => break Err(eyre!("marshal mailbox is closed")),
444                                };
445                                ancestry_stream.set(
446                                    (msg.cause, request),
447                                    stream,
448                                );
449                            }
450                        }
451                        Command::VerifyDealerLog(verify) => {
452                            self.handle_verify_dealer_log(
453                                &state,
454                                &round,
455                                verify,
456                            );
457                        }
458                    }
459                }
460
461                notarized_block = ancestry_stream.next() => {
462                    if let Some(block) = notarized_block {
463                        storage.cache_notarized_block(&round, block);
464                        let (cause, request) = ancestry_stream
465                            .take_request()
466                            .expect("if the stream is yielding blocks, there must be a receiver");
467                        if let Ok(Some((hole, request))) = self
468                            .handle_get_dkg_outcome(&cause, storage, &player_state, &round, &state, request)
469                            .await
470                        {
471                            let stream = match self.config.marshal.ancestry((None, hole)).await {
472                                Some(stream) => stream,
473                                None => break Err(eyre!("marshal mailbox is closed")),
474                            };
475                            ancestry_stream.set(
476                                (cause, request),
477                                stream,
478                            );
479                        }
480                    }
481                }
482            )
483        }
484    }
485
486    fn handle_verify_dealer_log(
487        &self,
488        state: &state::State,
489        round: &state::Round,
490        VerifyDealerLog {
491            epoch,
492            bytes,
493            response,
494        }: VerifyDealerLog,
495    ) {
496        if state.epoch != epoch {
497            let _ = response.send(Err(eyre!(
498                "requested dealer log for epoch `{epoch}`, but current round \
499                is for epoch `{}`",
500                state.epoch
501            )));
502            return;
503        }
504        let res = SignedDealerLog::<MinSig, PrivateKey>::read_cfg(
505            &mut &bytes[..],
506            &NZU32!(round.players().len() as u32),
507        )
508        .wrap_err("failed reading dealer log from header")
509        .and_then(|log| {
510            log.check(round.info())
511                .map(|(dealer, _)| dealer)
512                .ok_or_eyre("not a dealer in the current round")
513        })
514        .inspect(|_| {
515            self.metrics.dealings_read.inc();
516        })
517        .inspect_err(|_| {
518            self.metrics.bad_dealings.inc();
519        });
520        let _ = response.send(res);
521    }
522
523    /// Determines if it makes sense to continue with the current DKG ceremony.
524    ///
525    /// If `finalized_tip` indicates that the *next* epoch was already finalized,
526    /// then there is no point in continuing with the current DKG round.
527    ///
528    /// We know that an epoch was finalized by either observing the boundary
529    /// block for said epoch, or by observing an even newer epoch.
530    #[instrument(
531        skip_all,
532        fields(
533            round.epoch = %round.epoch(),
534            finalized.tip = %finalized_tip,
535            finalized.epoch = tracing::field::Empty,
536        ),
537    )]
538    async fn should_skip_round(&mut self, round: &state::Round, finalized_tip: Height) -> bool {
539        let epoch_info = self
540            .config
541            .epoch_strategy
542            .containing(finalized_tip)
543            .expect("epoch strategy is valid for all heights");
544        Span::current().record(
545            "finalized.epoch",
546            tracing::field::display(epoch_info.epoch()),
547        );
548
549        let should_skip_round = epoch_info.epoch() > round.epoch().next()
550            || (epoch_info.epoch() == round.epoch().next() && epoch_info.last() == finalized_tip);
551
552        if should_skip_round {
553            let boundary_height = self
554                .config
555                .epoch_strategy
556                .last(round.epoch())
557                .expect("epoch strategy is valid for all epochs");
558            info!(
559                %boundary_height,
560                "confirmed that the network is at least 2 epochs aheads of us; \
561                setting synchronization floor to boundary height of our DKG's \
562                epoch and reporting that the rest of the DKG round should be \
563                skipped",
564            );
565
566            // NOTE: `set_floor(height)` implies that the next block sent by
567            // marshal will be height + 1.
568            if let Some(one_before_boundary) = boundary_height.previous() {
569                self.config.marshal.set_floor(one_before_boundary).await;
570            }
571        }
572        should_skip_round
573    }
574
    /// Handles a finalized block.
    ///
    /// Returns a new [`State`] after finalizing the boundary block of the epoch.
    ///
    /// Some block heights are special cased:
    ///
    /// + first height of an epoch: notify the epoch manager that the previous
    ///   epoch can be shut down.
    /// + last height of an epoch:
    ///     1. notify the epoch manager that a new epoch can be entered;
    ///     2. prepare for the state of the next iteration by finalizing the current
    ///        DKG round and reading the next players (players in the DKG round after
    ///        the immediately next one) from the smart contract.
    ///
    /// The processing of all other blocks depends on which part of the epoch
    /// they fall in:
    ///
    /// + first half: if we are a dealer, distribute the generated DKG shares
    ///   to the players and collect their acks. If we are a player, receive
    ///   DKG shares and respond with an ack.
    /// + exact middle of an epoch: if we are a dealer, generate the dealer log
    ///   of the DKG ceremony.
    /// + second half of the epoch: read dealer logs from blocks.
    #[instrument(
        parent = &cause,
        skip_all,
        fields(
            dkg.epoch = %round.epoch(),
            block.height = %block.height(),
            block.extra_data.bytes = block.header().extra_data().len(),
        ),
        err,
    )]
    #[expect(
        clippy::too_many_arguments,
        reason = "easiest way to express this for now"
    )]
    // TODO(janis): replace this by a struct?
    async fn handle_finalized_block<TStorageContext, TSender>(
        &mut self,
        cause: Span,
        state: &state::State,
        round: &state::Round,
        round_channel: &mut TSender,
        storage: &mut state::Storage<TStorageContext>,
        dealer_state: &mut Option<state::Dealer>,
        player_state: &mut Option<state::Player>,
        block: Block,
    ) -> eyre::Result<Option<State>>
    where
        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
        TSender: Sender<PublicKey = PublicKey>,
    {
        let epoch_info = self
            .config
            .epoch_strategy
            .containing(block.height())
            .expect("epoch strategy is covering all block heights");

        // Blocks must arrive strictly within the round's epoch, in order.
        ensure!(
            epoch_info.epoch() == round.epoch(),
            "block was not for this epoch; must observe all blocks epoch by \
            epoch; cannot deal with observing blocks out-of-order"
        );

        // Early phase: keep distributing shares; from the midpoint on, the
        // dealer's log is finalized and no further shares are sent.
        match epoch_info.phase() {
            EpochPhase::Early => {
                if let Some(dealer_state) = dealer_state {
                    self.distribute_shares(
                        storage,
                        round.epoch(),
                        dealer_state,
                        player_state,
                        round_channel,
                    )
                    .await;
                }
            }
            EpochPhase::Midpoint | EpochPhase::Late => {
                if let Some(dealer_state) = dealer_state {
                    dealer_state.finalize();
                }
            }
        }

        // Non-boundary block: harvest any dealer log from extraData, persist
        // the block, and wait for the next one.
        if block.height() != epoch_info.last() {
            if !block.header().extra_data().is_empty() {
                'handle_log: {
                    let (dealer, log) =
                        match read_dealer_log(block.header().extra_data().as_ref(), round) {
                            Err(reason) => {
                                warn!(
                                    %reason,
                                    "failed to read dealer log from block \
                                    extraData header field");
                                break 'handle_log;
                            }
                            Ok((dealer, log)) => (dealer, log),
                        };
                    storage
                        .append_dealer_log(round.epoch(), dealer.clone(), log)
                        .await
                        .wrap_err("failed to append log to journal")?;
                    if self.config.me.public_key() == dealer
                        && let Some(dealer_state) = dealer_state
                    {
                        info!(
                            "found own dealing in finalized block; deleting it \
                            from state to not write it again"
                        );
                        dealer_state.take_finalized();
                    }
                }
            }

            storage
                .append_finalized_block(round.epoch(), block)
                .await
                .wrap_err("failed to append finalized block to journal")?;

            return Ok(None);
        }

        info!("reached last block of epoch; reading DKG outcome from header");

        // NOTE(review): panics if the boundary block's extraData does not decode
        // as an `OnchainDkgOutcome` — assumes block validation enforces this
        // invariant upstream; confirm before relying on it.
        let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
            &mut block.header().extra_data().as_ref(),
        )
        .expect("the last block of an epoch must contain the DKG outcome");

        info!("reading validator from contract");

        // Prefer a cached outcome for this epoch/parent; otherwise finalize
        // the ceremony locally (as player if we were one, else as observer).
        let (local_output, mut share) = if let Some((outcome, share)) =
            storage.get_dkg_outcome(&state.epoch, &block.parent_digest())
        {
            debug!("using cached DKG outcome");
            (outcome.clone(), share.clone())
        } else {
            let logs = storage
                .logs_for_epoch(round.epoch())
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect::<BTreeMap<_, _>>();

            let player_outcome = player_state.take().and_then(|player| {
                info!("we were a player in the ceremony; finalizing share");
                match player.finalize(logs.clone(), &Sequential) {
                    Ok((new_output, new_share)) => {
                        info!("local DKG ceremony was a success");
                        Some((new_output, state::ShareState::Plaintext(Some(new_share))))
                    }
                    Err(
                        reason
                        @ commonware_cryptography::bls12381::dkg::Error::MissingPlayerDealing,
                    ) => {
                        warn!(
                            reason = %eyre::Report::new(reason),
                            "missing critical DKG state to reconstruct a share in this epoch; has \
                            consensus state been deleted or a node with the same identity started \
                            without consensus state? Finalizing the current round as an observer \
                            and will not have a share in the next epoch"
                        );
                        None
                    }
                    Err(error) => {
                        warn!(
                            error = %eyre::Report::new(error),
                            "local DKG ceremony was a failure",
                        );
                        Some((state.output.clone(), state.share.clone()))
                    }
                }
            });

            player_outcome.unwrap_or_else(move || {
                match observe::<_, _, N3f1>(round.info().clone(), logs, &Sequential) {
                    Ok(output) => {
                        info!("local DKG ceremony was a success");
                        (output, state::ShareState::Plaintext(None))
                    }
                    Err(error) => {
                        warn!(
                            error = %eyre::Report::new(error),
                            "local DKG ceremony was a failure",
                        );
                        (state.output.clone(), state.share.clone())
                    }
                }
            })
        };

        if local_output != onchain_outcome.output {
            let am_player = onchain_outcome
                .next_players
                .position(&self.config.me.public_key())
                .is_some();
            warn!(
                am_player,
                "the output of the local DKG ceremony does not match what is \
                on chain; something is terribly wrong; will try and participate \
                in the next round (if a player), but if we are misbehaving and \
                other nodes are blocking us it might be time to delete this node \
                and spin up a new identity",
            );
            // Discard the local share: it cannot correspond to the on-chain output.
            share = state::ShareState::Plaintext(None);
        }

        // Because we use cached data, we need to check for DKG success here:
        // if the on-chain output is the same as the input into the loop (which
        // is just state.output), then we know the DKG failed.
        if onchain_outcome.output == state.output {
            self.metrics.failures.inc();
        } else {
            self.metrics.successes.inc();
        }

        let syncers = read_syncers_if_v2_not_initialized_with_retry(
            &self.context,
            &self.config.execution_node,
            &block,
            &self.metrics.attempts_to_read_validator_contract,
        )
        .await
        .wrap_err("failed reading contract to determine syncers")?;

        Ok(Some(state::State {
            epoch: onchain_outcome.epoch,
            seed: Summary::random(&mut self.context),
            output: onchain_outcome.output.clone(),
            share,
            players: onchain_outcome.next_players,
            syncers,
            is_full_dkg: onchain_outcome.is_next_full_dkg,
        }))
    }
809
810    /// Looks for and handles a finalized boundary block.
811    ///
812    /// Called if the DKG round if asked to skip ahead to the boundary block.
813    /// Does not consider any state for the current DKG round; just reads the
814    /// DKG outcome from the header and returns it.
815    #[instrument(
816        parent = &cause,
817        skip_all,
818        fields(
819            dkg.epoch = %round.epoch(),
820            block.height = %block.height(),
821            block.extra_data.bytes = block.header().extra_data().len(),
822        ),
823        err,
824    )]
825    async fn handle_finalized_boundary(
826        &mut self,
827        cause: Span,
828        round: &state::Round,
829        block: Block,
830    ) -> eyre::Result<Option<State>> {
831        let epoch_info = self
832            .config
833            .epoch_strategy
834            .containing(block.height())
835            .expect("epoch strategy is covering all block heights");
836
837        ensure!(
838            epoch_info.epoch() == round.epoch(),
839            "block was not for this epoch; must observe all blocks epoch by \
840            epoch; cannot deal with observing blocks out-of-order"
841        );
842
843        if block.height() != epoch_info.last() {
844            return Ok(None);
845        }
846
847        info!("found boundary block; reading DKG outcome from header");
848
849        let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
850            &mut block.header().extra_data().as_ref(),
851        )
852        .expect("the last block of an epoch must contain the DKG outcome");
853
854        info!("reading validators from contract");
855
856        // Only read syncers from the contract if t2 is not yet active and if
857        // val conf v2 has not yet been initialized.
858        let syncers = read_syncers_if_v2_not_initialized_with_retry(
859            &self.context,
860            &self.config.execution_node,
861            &block,
862            &self.metrics.attempts_to_read_validator_contract,
863        )
864        .await
865        .wrap_err("failed reading contract; must be able to read contract to continue")?;
866
867        Ok(Some(state::State {
868            epoch: onchain_outcome.epoch,
869            seed: Summary::random(&mut self.context),
870            output: onchain_outcome.output.clone(),
871            share: state::ShareState::Plaintext(None),
872            players: onchain_outcome.next_players,
873            syncers,
874            is_full_dkg: onchain_outcome.is_next_full_dkg,
875        }))
876    }
877
878    #[instrument(skip_all, fields(me = %self.config.me.public_key(), %epoch))]
879    async fn distribute_shares<TStorageContext, TSender>(
880        &self,
881        storage: &mut state::Storage<TStorageContext>,
882        epoch: Epoch,
883        dealer_state: &mut state::Dealer,
884        player_state: &mut Option<state::Player>,
885        round_channel: &mut TSender,
886    ) where
887        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
888        TSender: Sender<PublicKey = PublicKey>,
889    {
890        let me = self.config.me.public_key();
891        for (player, pub_msg, priv_msg) in dealer_state.shares_to_distribute().collect::<Vec<_>>() {
892            if player == me {
893                if let Some(player_state) = player_state
894                    && let Ok(ack) = player_state
895                        .receive_dealing(storage, epoch, me.clone(), pub_msg, priv_msg)
896                        .await
897                        .inspect(|_| {
898                            self.metrics.shares_distributed.inc();
899                            self.metrics.shares_received.inc();
900                        })
901                        .inspect_err(|error| warn!(%error, "failed to store our own dealing"))
902                    && let Ok(()) = dealer_state
903                        .receive_ack(storage, epoch, me.clone(), ack)
904                        .await
905                        .inspect_err(|error| warn!(%error, "failed to store our own ACK"))
906                {
907                    self.metrics.acks_received.inc();
908                    self.metrics.acks_sent.inc();
909                    info!("stored our own ACK and share");
910                }
911            } else {
912                // Send to remote player
913                let payload = Message::Dealer(pub_msg, priv_msg).encode();
914                match round_channel
915                    .send(Recipients::One(player.clone()), payload, true)
916                    .await
917                {
918                    Ok(success) => {
919                        if success.is_empty() {
920                            // TODO(janis): figure out what it means if the response
921                            // is empty. Does it just mean the other party failed
922                            // to respond?
923                            info!(%player, "failed to send share");
924                        } else {
925                            self.metrics.shares_distributed.inc();
926                            info!(%player, "share sent");
927                        }
928                    }
929                    Err(error) => {
930                        warn!(%player, %error, "error sending share");
931                    }
932                }
933            }
934        }
935    }
936
937    #[instrument(
938        skip_all,
939        fields(
940            epoch = %round.epoch(),
941            %from,
942            bytes = message.len()),
943        err)]
944    #[expect(
945        clippy::too_many_arguments,
946        reason = "easiest way to express this for now"
947    )]
948    // TODO(janis): replace this by a struct?
949    async fn handle_network_msg<TStorageContext>(
950        &self,
951        round: &state::Round,
952        round_channel: &mut impl Sender<PublicKey = PublicKey>,
953        storage: &mut state::Storage<TStorageContext>,
954        dealer_state: Option<&mut state::Dealer>,
955        player_state: Option<&mut state::Player>,
956        from: PublicKey,
957        mut message: IoBuf,
958    ) -> eyre::Result<()>
959    where
960        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
961    {
962        let msg = Message::read_cfg(&mut message, &NZU32!(round.players().len() as u32))
963            .wrap_err("failed reading p2p message")?;
964
965        match msg {
966            Message::Dealer(pub_msg, priv_msg) => {
967                if let Some(player_state) = player_state {
968                    info!("received message from a dealer");
969                    self.metrics.shares_received.inc();
970                    let ack = player_state
971                        .receive_dealing(storage, round.epoch(), from.clone(), pub_msg, priv_msg)
972                        .await
973                        .wrap_err("failed storing dealing")?;
974
975                    if let Err(error) = round_channel
976                        .send(
977                            Recipients::One(from.clone()),
978                            Message::Ack(ack).encode(),
979                            true,
980                        )
981                        .await
982                    {
983                        // FIXME(janis): the GATs in the Sender (and LimitedSender)
984                        // lead to `borrowed data escapes outside of method` errors.
985                        // `wrap_err` with early return does not work, and neither
986                        // does `Report::new` nor `&error as &dyn std::error::Error`.
987                        warn!(
988                            reason = ?error,
989                            "failed returning ACK to dealer",
990                        );
991                        bail!("failed returning ACK to dealer");
992                    }
993                    info!("returned ACK to dealer");
994                    self.metrics.acks_sent.inc();
995                } else {
996                    info!("received a dealer message, but we are not a player");
997                }
998            }
999            Message::Ack(ack) => {
1000                if let Some(dealer_state) = dealer_state {
1001                    info!("received an ACK");
1002                    self.metrics.acks_received.inc();
1003                    dealer_state
1004                        .receive_ack(storage, round.epoch(), from, ack)
1005                        .await
1006                        .wrap_err("failed storing ACK")?;
1007                } else {
1008                    info!("received an ACK, but we are not a dealer");
1009                }
1010            }
1011        }
1012        Ok(())
1013    }
1014
    /// Attempts to serve a `GetDkgOutcome` request by finalizing the DKG outcome.
    ///
    /// A DKG outcome can be finalized in one of the following cases:
    ///
    /// 1. if the DKG actor has observed as many dealer logs as there are dealers.
    /// 2. if all blocks in an epoch were observed (finalized + notarized leading
    ///    up to `request.digest`).
    ///
    /// If the DKG was finalized this way, this method will return `None`.
    /// Otherwise will return `Some((digest, request))` if the block identified
    /// by `digest` was missing and needs to be fetched first to ensure all
    /// blocks in an epoch were observed.
    #[instrument(
        parent = cause,
        skip_all,
        fields(
            as_player = player_state.is_some(),
            our.epoch = %round.epoch(),
        ),
        err(level = Level::WARN),
    )]
    async fn handle_get_dkg_outcome<TStorageContext>(
        &mut self,
        cause: &Span,
        storage: &mut state::Storage<TStorageContext>,
        player_state: &Option<state::Player>,
        round: &state::Round,
        state: &State,
        request: GetDkgOutcome,
    ) -> eyre::Result<Option<(Digest, GetDkgOutcome)>>
    where
        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
    {
        let epoch_info = self
            .config
            .epoch_strategy
            .containing(request.height)
            .expect("our strategy covers all epochs");

        // We can only finalize outcomes for the epoch this round is running.
        ensure!(
            round.epoch() == epoch_info.epoch(),
            "request is for epoch `{}`, not our epoch",
            epoch_info.epoch(),
        );

        // Serve from the cache if this (epoch, digest) pair was already
        // finalized; otherwise compute the outcome below and cache it.
        let output = if let Some((output, _)) = storage
            .get_dkg_outcome(&state.epoch, &request.digest)
            .cloned()
        {
            output
        } else {
            let mut logs = storage
                .logs_for_epoch(round.epoch())
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect::<BTreeMap<_, _>>();

            // Case 1 (all dealer logs observed) short-circuits; otherwise try
            // case 2: backfill the missing logs from notarized blocks.
            'ensure_enough_logs: {
                if logs.len() == round.dealers().len() {
                    info!("collected as many logs as there are dealers; concluding DKG");
                    break 'ensure_enough_logs;
                }

                info!(
                    "did not have all dealer logs yet; will try to extend with \
                    logs read from notarized blocks and concluding DKG that way",
                );
                // Walk the notarized ancestry backwards from the requested
                // block toward the epoch's first block, stopping once we go
                // past the latest finalized block for this epoch. Note that
                // `None < Some(_)`, so the walk continues while no finalized
                // block has been recorded yet.
                let (mut height, mut digest) = (request.height, request.digest);
                while height >= epoch_info.first()
                    && Some(height)
                        >= storage
                            .get_latest_finalized_block_for_epoch(&round.epoch())
                            .map(|(_, info)| info.height)
                {
                    if let Some(block) =
                        storage.get_notarized_reduced_block(&round.epoch(), &digest)
                    {
                        logs.extend(block.log.clone());
                        height = if let Some(height) = block.height.previous() {
                            height
                        } else {
                            // Reached the chain's first block; nothing older
                            // to read.
                            break 'ensure_enough_logs;
                        };
                        digest = block.parent;
                    } else {
                        // A block in the ancestry is missing locally; ask the
                        // caller to fetch it and retry this request.
                        return Ok(Some((digest, request)));
                    }
                }
            }

            // Create a player-state ad hoc: the DKG player object is not
            // cloneable, and finalizing consumes it.
            let player_state = player_state.is_some().then(||
                storage
                        .create_player_for_round(self.config.me.clone(), round)
                        .expect("created a player instance before, must be able to create it again")
                        .expect("did not return a player instance even though we created it for this round already")
            );

            let (output, share) = {
                // As a player we can finalize and obtain a share; fall back to
                // observing (no share) or to the previous epoch's state on
                // failure.
                let player_outcome = player_state.and_then(|player| {
                    info!("we were a player in the ceremony; finalizing share");
                    match player.finalize(logs.clone(), &Sequential) {
                        Ok((new_output, new_share)) => {
                            info!("local DKG ceremony was a success");
                            Some((new_output, state::ShareState::Plaintext(Some(new_share))))
                        }
                        Err(
                            reason
                            @ commonware_cryptography::bls12381::dkg::Error::MissingPlayerDealing,
                        ) => {
                            warn!(
                                reason = %eyre::Report::new(reason),
                                "missing critical DKG state to reconstruct a share in this epoch; has \
                                consensus state been deleted or a node with the same identity started \
                                without consensus state? Finalizing the current round as an observer \
                                and will not have a share in the next epoch"
                            );
                            // Returning `None` drops into the observer path
                            // below.
                            None
                        }
                        Err(error) => {
                            warn!(
                                error = %eyre::Report::new(error),
                                "local DKG ceremony was a failure",
                            );
                            // Keep the previous epoch's output and share.
                            Some((state.output.clone(), state.share.clone()))
                        }
                    }
                });

                // Observer path: derive the output from the logs alone; an
                // observer never holds a share.
                player_outcome.unwrap_or_else(move || {
                    match observe::<_, _, N3f1>(round.info().clone(), logs, &Sequential) {
                        Ok(output) => {
                            info!("local DKG ceremony was a success");
                            (output, state::ShareState::Plaintext(None))
                        }
                        Err(error) => {
                            warn!(
                                error = %eyre::Report::new(error),
                                "local DKG ceremony was a failure",
                            );
                            (state.output.clone(), state.share.clone())
                        }
                    }
                })
            };

            storage.cache_dkg_outcome(state.epoch, request.digest, output.clone(), share);
            output
        };

        // Check if next ceremony should be full.
        let next_epoch = state.epoch.next();
        let will_be_re_dkg = read_re_dkg_epoch(&self.config.execution_node, request.digest)
            // in theory it should never fail, but if it does, just stick to reshare.
            .is_ok_and(|epoch| epoch == next_epoch.get());
        info!(
            will_be_re_dkg,
            %next_epoch,
            "determined if the next epoch will be a reshare or full re-dkg process",
        );

        let next_players =
            determine_next_players_at_hash(state, &self.config.execution_node, request.digest.0)
                .wrap_err("could not determine who the next players are supposed to be")?;
        request
            .response
            .send(OnchainDkgOutcome {
                epoch: next_epoch,
                output,
                next_players,
                is_next_full_dkg: will_be_re_dkg,
            })
            .map_err(|_| {
                eyre!("requester went away before speculative DKG outcome could be sent")
            })?;

        Ok(None)
    }
1193
1194    #[instrument(skip_all, fields(epoch = %state.epoch), err(level = Level::WARN))]
1195    fn enter_epoch(&mut self, state: &state::State) -> eyre::Result<()> {
1196        self.config
1197            .epoch_manager
1198            .enter(
1199                state.epoch,
1200                state.output.public().clone(),
1201                state.share.clone().into_inner(),
1202                state.dealers().clone(),
1203            )
1204            .wrap_err("could not instruct epoch manager to enter epoch")
1205    }
1206
1207    #[instrument(skip_all, fields(epoch = %state.epoch), err(level = Level::WARN))]
1208    fn exit_epoch(&mut self, state: &state::State) -> eyre::Result<()> {
1209        self.config
1210            .epoch_manager
1211            .exit(state.epoch)
1212            .wrap_err("could not instruct epoch manager to enter epoch")
1213    }
1214}
1215
/// Bootstraps the DKG actor's state from the execution layer.
///
/// Reads the newest finalized block, locates the last epoch-boundary block at
/// or below it, decodes the on-chain DKG outcome from that boundary header,
/// verifies the optionally supplied `share` against that outcome, and sets
/// the marshal's sync floor to the boundary height.
#[instrument(skip_all, err)]
async fn read_initial_state_and_set_floor<TContext>(
    context: &mut TContext,
    node: &TempoFullNode,
    share: Option<Share>,
    epoch_strategy: &FixedEpocher,
    marshal: &mut crate::alias::marshal::Mailbox,
) -> eyre::Result<State>
where
    TContext: Clock + CryptoRngCore,
{
    // Fall back to block 0 / the genesis hash when the execution layer has
    // not finalized anything yet.
    let latest_finalized = node
        .provider
        .finalized_block_num_hash()
        .wrap_err("unable to read highest finalized block from execution layer")?
        .unwrap_or_else(|| BlockNumHash::new(0, node.chain_spec().genesis_hash()));

    let epoch_info = epoch_strategy
        .containing(Height::new(latest_finalized.number))
        .expect("epoch strategy is for all heights");

    // The boundary we read from: the finalized block itself if it happens to
    // be an epoch's last block, otherwise the previous epoch's last block
    // (or height 0 if there is no previous epoch).
    let latest_boundary = if epoch_info.last().get() == latest_finalized.number {
        latest_finalized.number
    } else {
        epoch_info
            .epoch()
            .previous()
            .map_or_else(Height::zero, |previous| {
                epoch_strategy
                    .last(previous)
                    .expect("epoch strategy is for all epochs")
            })
            .get()
    };
    info!(
        %latest_boundary,
        latest_finalized.number,
        %latest_finalized.hash,
        "execution layer reported newest available block, reading on-chain \
        DKG outcome from last boundary height, and validator state from newest \
        block"
    );

    let boundary_header = node
        .provider
        .header_by_number(latest_boundary)
        .map_or_else(
            |e| Err(eyre::Report::new(e)),
            |header| header.ok_or_eyre("execution layer reported it had no header"),
        )
        .wrap_err_with(|| {
            format!("failed to read header for latest boundary block number `{latest_boundary}`")
        })?;

    // NOTE(review): at genesis (`latest_boundary == 0`) `saturating_sub`
    // makes the boundary its own "parent" — presumably intentional for the
    // genesis case; confirm.
    let parent_of_boundary_header = node
        .provider
        .header_by_number(latest_boundary.saturating_sub(1))
        .map_or_else(
            |e| Err(eyre::Report::new(e)),
            |header| header.ok_or_eyre("execution layer reported it had no header"),
        )
        .wrap_err_with(|| {
            format!(
                "failed to read header for the boundary's parent `{}`",
                latest_boundary.saturating_sub(1)
            )
        })?;

    let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
        &mut boundary_header.extra_data().as_ref(),
    )
    .wrap_err("the boundary header did not contain the on-chain DKG outcome")?;

    // NOTE: On init this is intended to fail fast: if a consensus state exists
    // (such that there is a mismatch between CL and EL finalized block
    // requiring a backfill and hence a retry), then
    // `read_initial_state_and_set_floor` is never called.
    //
    // If a consensus state does not exist, then either EL has access to the
    // state at the boundary or it does not. There is no mechanism that would
    // allow it to get the block.
    let syncers = read_syncers_if_v2_not_initialized(
        1,
        node,
        parent_of_boundary_header.hash_slow(),
        boundary_header.hash_slow(),
    )
    .wrap_err("failed determining syncers")?;

    // Only accept the supplied share if its partial public key matches the
    // on-chain polynomial; otherwise run without a share.
    let share = state::ShareState::Plaintext('verify_initial_share: {
        let Some(share) = share else {
            break 'verify_initial_share None;
        };
        let Ok(partial) = onchain_outcome.sharing().partial_public(share.index) else {
            warn!(
                "the index of the provided share exceeds the polynomial of the \
                on-chain DKG outcome; ignoring the share"
            );
            break 'verify_initial_share None;
        };
        if share.public::<MinSig>() != partial {
            warn!(
                "the provided share does not match the polynomial of the \
                on-chain DKG outcome; ignoring the share"
            );
            break 'verify_initial_share None;
        }
        Some(share)
    });

    info!(%latest_boundary, "setting sync floor");
    marshal.set_floor(Height::new(latest_boundary)).await;

    Ok(State {
        epoch: onchain_outcome.epoch,
        seed: Summary::random(context),
        output: onchain_outcome.output.clone(),
        share,
        players: onchain_outcome.next_players,
        syncers,
        is_full_dkg: onchain_outcome.is_next_full_dkg,
    })
}
1339
/// Metrics tracked by the DKG actor.
///
/// Gauges track the currently running ceremony (cleared by [`Metrics::reset`]);
/// counters accumulate over the node's lifetime.
#[derive(Clone)]
struct Metrics {
    /// Shares distributed by this node as a dealer in the current ceremony.
    shares_distributed: Gauge,
    /// Shares received by this node as a player in the current ceremony.
    shares_received: Gauge,
    /// Acknowledgments received by this node as a dealer in the current ceremony.
    acks_received: Gauge,
    /// Acknowledgments sent by this node as a player in the current ceremony.
    acks_sent: Gauge,
    /// Dealings read from the blockchain in the current ceremony.
    dealings_read: Gauge,
    /// Blocks where decoding and verifying dealings failed in the current ceremony.
    bad_dealings: Gauge,

    /// Failed ceremonies this node participated in.
    failures: Counter,
    /// Successful ceremonies this node participated in.
    successes: Counter,

    /// Number of dealers in the currently running ceremony.
    dealers: Gauge,
    /// Number of players in the currently running ceremony.
    players: Gauge,
    /// Syncing players registered; these become players in the next ceremony.
    syncing_players: Gauge,

    /// How often this node was active as a dealer.
    how_often_dealer: Counter,
    /// How often this node was active as a player.
    how_often_player: Counter,

    /// DKG rounds skipped because the node fell behind and had to catch up.
    rounds_skipped: Counter,
    /// Total attempts taken to read the validators from the smart contract.
    attempts_to_read_validator_contract: Counter,
}
1362
1363impl Metrics {
1364    fn init<TContext>(context: &TContext) -> Self
1365    where
1366        TContext: commonware_runtime::Metrics,
1367    {
1368        let syncing_players = Gauge::default();
1369        context.register(
1370            "syncing_players",
1371            "how many syncing players were registered; these will become players in the next ceremony",
1372            syncing_players.clone(),
1373        );
1374
1375        let failures = Counter::default();
1376        context.register(
1377            "ceremony_failures",
1378            "the number of failed ceremonies a node participated in",
1379            failures.clone(),
1380        );
1381
1382        let successes = Counter::default();
1383        context.register(
1384            "ceremony_successes",
1385            "the number of successful ceremonies a node participated in",
1386            successes.clone(),
1387        );
1388
1389        let dealers = Gauge::default();
1390        context.register(
1391            "ceremony_dealers",
1392            "the number of dealers in the currently running ceremony",
1393            dealers.clone(),
1394        );
1395        let players = Gauge::default();
1396        context.register(
1397            "ceremony_players",
1398            "the number of players in the currently running ceremony",
1399            players.clone(),
1400        );
1401
1402        let how_often_dealer = Counter::default();
1403        context.register(
1404            "how_often_dealer",
1405            "number of the times as node was active as a dealer",
1406            how_often_dealer.clone(),
1407        );
1408        let how_often_player = Counter::default();
1409        context.register(
1410            "how_often_player",
1411            "number of the times as node was active as a player",
1412            how_often_player.clone(),
1413        );
1414
1415        let shares_distributed = Gauge::default();
1416        context.register(
1417            "ceremony_shares_distributed",
1418            "the number of shares distributed by this node as a dealer in the current ceremony",
1419            shares_distributed.clone(),
1420        );
1421
1422        let shares_received = Gauge::default();
1423        context.register(
1424            "ceremony_shares_received",
1425            "the number of shares received by this node as a player in the current ceremony",
1426            shares_received.clone(),
1427        );
1428
1429        let acks_received = Gauge::default();
1430        context.register(
1431            "ceremony_acks_received",
1432            "the number of acknowledgments received by this node as a dealer in the current ceremony",
1433            acks_received.clone(),
1434        );
1435
1436        let acks_sent = Gauge::default();
1437        context.register(
1438            "ceremony_acks_sent",
1439            "the number of acknowledgments sent by this node as a player in the current ceremony",
1440            acks_sent.clone(),
1441        );
1442
1443        let dealings_read = Gauge::default();
1444        context.register(
1445            "ceremony_dealings_read",
1446            "the number of dealings read from the blockchain in the current ceremony",
1447            dealings_read.clone(),
1448        );
1449
1450        let bad_dealings = Gauge::default();
1451        context.register(
1452            "ceremony_bad_dealings",
1453            "the number of blocks where decoding and verifying dealings failed in the current ceremony",
1454            bad_dealings.clone(),
1455        );
1456
1457        let rounds_skipped = Counter::default();
1458        context.register(
1459            "rounds_skipped",
1460            "how many DKG rounds were skipped because the node fell too far behind and tried to catch up",
1461            rounds_skipped.clone(),
1462        );
1463
1464        let attempts_to_read_validator_contract = Counter::default();
1465        context.register(
1466            "attempts_to_read_validator_contract",
1467            "the total number of attempts it took to read the validators from the smart contract",
1468            attempts_to_read_validator_contract.clone(),
1469        );
1470
1471        Self {
1472            syncing_players,
1473            shares_distributed,
1474            shares_received,
1475            acks_received,
1476            acks_sent,
1477            dealings_read,
1478            bad_dealings,
1479            dealers,
1480            players,
1481            how_often_dealer,
1482            how_often_player,
1483            failures,
1484            successes,
1485            rounds_skipped,
1486            attempts_to_read_validator_contract,
1487        }
1488    }
1489
1490    fn reset(&self) {
1491        self.shares_distributed.set(0);
1492        self.shares_received.set(0);
1493        self.acks_received.set(0);
1494        self.acks_sent.set(0);
1495        self.dealings_read.set(0);
1496        self.bad_dealings.set(0);
1497    }
1498}
1499
/// A wrapper around [`marshal::ancestry::AncestorStream`] wrapped in
/// an option to make it easier to work with select macros.
///
/// Invariants: if the inner stream is set, then the matching original request
/// is also set.
struct AncestorStream {
    /// The request (and the tracing span it arrived under) that triggered the
    /// ancestry walk; retrievable via `take_request` once the walk is done.
    pending_request: Option<(Span, GetDkgOutcome)>,
    /// The live ancestry walk, if any. `None` means the stream is terminated.
    inner: Option<marshal::ancestry::AncestorStream<crate::alias::marshal::Mailbox, Block>>,
}
1509
1510impl AncestorStream {
1511    fn new() -> Self {
1512        Self {
1513            pending_request: None,
1514            inner: None,
1515        }
1516    }
1517
1518    fn take_request(&mut self) -> Option<(Span, GetDkgOutcome)> {
1519        self.inner.take();
1520        self.pending_request.take()
1521    }
1522
1523    fn set(
1524        &mut self,
1525        pending_request: (Span, GetDkgOutcome),
1526        stream: marshal::ancestry::AncestorStream<crate::alias::marshal::Mailbox, Block>,
1527    ) {
1528        self.pending_request.replace(pending_request);
1529        self.inner.replace(stream);
1530    }
1531
1532    fn tip(&self) -> Option<Digest> {
1533        self.pending_request.as_ref().map(|(_, req)| req.digest)
1534    }
1535
1536    fn update_receiver(&mut self, pending_request: (Span, GetDkgOutcome)) {
1537        self.pending_request.replace(pending_request);
1538    }
1539}
1540
1541impl Stream for AncestorStream {
1542    type Item = Block;
1543
1544    fn poll_next(
1545        mut self: std::pin::Pin<&mut Self>,
1546        cx: &mut std::task::Context<'_>,
1547    ) -> std::task::Poll<Option<Self::Item>> {
1548        let item = {
1549            let this = match self.inner.as_mut() {
1550                Some(inner) => inner,
1551                None => return Poll::Ready(None),
1552            };
1553            this.poll_next_unpin(cx)
1554        };
1555        match futures::ready!(item) {
1556            None => {
1557                self.inner.take();
1558                Poll::Ready(None)
1559            }
1560            Some(block) => Poll::Ready(Some(block)),
1561        }
1562    }
1563}
1564
impl FusedStream for AncestorStream {
    // The stream is exhausted exactly when no inner ancestry stream is
    // installed; `poll_next` clears `inner` once the inner stream ends, so
    // select macros will not poll this again after completion.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}
1570
1571fn read_dealer_log(
1572    mut bytes: &[u8],
1573    round: &state::Round,
1574) -> eyre::Result<(PublicKey, DealerLog<MinSig, PublicKey>)> {
1575    let signed_log = dkg::SignedDealerLog::<MinSig, PrivateKey>::read_cfg(
1576        &mut bytes,
1577        &NZU32!(round.players().len() as u32),
1578    )
1579    .wrap_err("could not decode as signed dealer log")?;
1580
1581    let (dealer, log) = signed_log
1582        .check(round.info())
1583        .ok_or_eyre("failed checking signed log against current round")?;
1584    Ok((dealer, log))
1585}
1586
1587/// Reads syncing validators if the V2 contract is not yet initialized.
1588async fn read_syncers_if_v2_not_initialized_with_retry(
1589    context: &impl commonware_runtime::Clock,
1590    node: &TempoFullNode,
1591    boundary_block: &Block,
1592    total_attempts: &Counter,
1593) -> eyre::Result<ordered::Set<PublicKey>> {
1594    let mut attempts = 0;
1595    const MIN_RETRY: Duration = Duration::from_secs(1);
1596    const MAX_RETRY: Duration = Duration::from_secs(30);
1597
1598    let parent_hash = boundary_block.parent_hash();
1599    let block_hash = boundary_block.hash();
1600    'read_contract: loop {
1601        total_attempts.inc();
1602        attempts += 1;
1603
1604        if let Ok(syncers) =
1605            read_syncers_if_v2_not_initialized(attempts, node, parent_hash, block_hash)
1606        {
1607            break 'read_contract Ok(syncers);
1608        }
1609
1610        let retry_after = MIN_RETRY.saturating_mul(attempts).min(MAX_RETRY);
1611        let is_syncing = node.network.is_syncing();
1612        let best_finalized_block = node.provider.finalized_block_number().ok().flatten();
1613        let blocks_behind = best_finalized_block
1614            .as_ref()
1615            .map(|best| boundary_block.number().saturating_sub(*best));
1616        tracing::warn_span!("read_validator_config_with_retry").in_scope(|| {
1617            warn!(
1618                attempts,
1619                retry_after = %tempo_telemetry_util::display_duration(retry_after),
1620                is_syncing,
1621                best_finalized_block = %tempo_telemetry_util::display_option(&best_finalized_block),
1622                height_if_v1 = boundary_block.number(),
1623                blocks_behind = %tempo_telemetry_util::display_option(&blocks_behind),
1624                "reading validator config from contract failed; will retry",
1625            );
1626        });
1627        context.sleep(retry_after).await;
1628    }
1629}
1630
1631/// Reads the pre-t2 hardfork syncers from the v1 contract.
1632///
1633/// If the validator config v2 contract is already initialized, then this
1634/// returns an empty set because after the hardfork syncers do not need to be
1635/// tracked.
1636///
1637/// IMPORTANT: it is expected that this function is called on boundary blocks.
1638/// Post-T2 hardfork, the next players are determined from the V2 smart contract
1639/// on the boundary block's parent(!) block, not on the boundary block itself.
1640///
1641/// Therefore, this function checks if the hardfork already happened by the
1642/// timestamp of the block identified by `parent_hash`, and likewise if the
1643/// contract was initialized at the state of `parent_hash`.
1644#[instrument(
1645    skip_all,
1646    fields(
1647        attempt = _attempt,
1648        parent_hash,
1649        block_hash,
1650    ),
1651    err
1652)]
1653pub(crate) fn read_syncers_if_v2_not_initialized(
1654    _attempt: u32,
1655    node: &TempoFullNode,
1656    parent_hash: B256,
1657    block_hash: B256,
1658) -> eyre::Result<ordered::Set<PublicKey>> {
1659    if can_use_v2_at_block_hash(node, parent_hash, Some(block_hash))
1660        .wrap_err("unable to determine if the validator config v2 can be used or not")?
1661    {
1662        return Ok(ordered::Set::default());
1663    }
1664    let peers =
1665        read_active_and_known_peers_at_block_hash_v1(node, &ordered::Set::default(), block_hash)
1666            .wrap_err("unable to read peers from validator config v1 contract")?;
1667    info!(?peers, "read validators from validator config v1 contract");
1668    Ok(peers.into_keys())
1669}
1670
1671/// Determines the next players depending on the header timestamp identified by `digest`.
1672///
1673/// This function should only be used when constructing or verifying a proposal.
1674/// `digest` should therefore always refer to the parent parent of the proposal.
1675///
1676/// If the execution layer does not have a block corresponding to `digest`
1677/// available then it cannot propose or verify a block.
1678#[instrument(skip_all, fields(%hash), err(level = Level::WARN))]
1679fn determine_next_players_at_hash(
1680    state: &State,
1681    node: &TempoFullNode,
1682    hash: B256,
1683) -> eyre::Result<ordered::Set<PublicKey>> {
1684    let next_players = if can_use_v2_at_block_hash(node, hash, None)
1685        .wrap_err("failed determining if validator config v2 can be used")?
1686    {
1687        debug!("reading next players from validator config v2 contract");
1688        read_active_and_known_peers_at_block_hash_v2(node, &ordered::Set::default(), hash)
1689            .wrap_err("failed reading peers from  validator config v2")?
1690            .into_keys()
1691    } else {
1692        debug!("using validator config v1 syncers from state");
1693        state.syncers.clone()
1694    };
1695
1696    debug!(?next_players, "determined next players");
1697    Ok(next_players)
1698}
1699
1700/// Reads the `nextFullDkgCeremony` epoch value from one of the validator config contracts.
1701///
1702/// This is used to determine if the next DKG ceremony should be a full ceremony
1703/// (new polynomial) instead of a reshare.
1704///
1705/// This function should only be used when constructing or verifying a proposal.
1706/// `digest` should therefore always refer to the parent parent of the proposal.
1707///
1708/// If the execution layer does not have a block corresponding to `digest`
1709/// available then it cannot propose or verify a block.
1710#[instrument(
1711    skip_all,
1712    fields(
1713        %digest,
1714    ),
1715    err(level = Level::WARN)
1716    ret,
1717)]
1718pub(crate) fn read_re_dkg_epoch(node: &TempoFullNode, digest: Digest) -> eyre::Result<u64> {
1719    if can_use_v2_at_block_hash(node, digest.0, None)
1720        .wrap_err("failed determining if validator config v2 can be used")?
1721    {
1722        read_validator_config_at_block_hash(node, digest.0, |config: &ValidatorConfigV2| {
1723            config
1724                .get_next_network_identity_rotation_epoch()
1725                .map_err(eyre::Report::new)
1726        })
1727        .map(|(_, _, epoch)| epoch)
1728    } else {
1729        read_validator_config_at_block_hash(node, digest.0, |config: &ValidatorConfig| {
1730            config
1731                .get_next_full_dkg_ceremony()
1732                .map_err(eyre::Report::new)
1733        })
1734        .map(|(_, _, epoch)| epoch)
1735    }
1736}