Skip to main content

tempo_consensus/dkg/manager/actor/
mod.rs

1use std::{collections::BTreeMap, num::NonZeroU32, task::Poll};
2
3use alloy_consensus::BlockHeader as _;
4use alloy_primitives::B256;
5use bytes::{Buf, BufMut};
6use commonware_codec::{Encode as _, EncodeSize, Read, ReadExt as _, Write};
7use commonware_consensus::{
8    Heightable as _,
9    marshal::{self, Update},
10    types::{Epoch, EpochPhase, Epocher as _, FixedEpocher, Height},
11};
12use commonware_cryptography::{
13    Signer as _,
14    bls12381::{
15        dkg::{
16            self, DealerLog, DealerPrivMsg, DealerPubMsg, Logs, PlayerAck, SignedDealerLog, observe,
17        },
18        primitives::{group::Share, variant::MinSig},
19    },
20    ed25519::{self, PrivateKey, PublicKey},
21    transcript::Summary,
22};
23use commonware_math::algebra::Random as _;
24use commonware_p2p::{
25    Receiver, Recipients, Sender,
26    utils::mux::{self, MuxHandle},
27};
28use commonware_parallel::Sequential;
29use commonware_runtime::{Clock, ContextCell, Handle, IoBuf, Metrics as _, Spawner, spawn_cell};
30use commonware_utils::{Acknowledgement, N3f1, NZU32, ordered};
31
32use eyre::{OptionExt as _, WrapErr as _, bail, ensure, eyre};
33use futures::{
34    FutureExt as _, Stream, StreamExt as _, channel::mpsc, select_biased, stream::FusedStream,
35};
36use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
37use rand_core::CryptoRngCore;
38use reth_ethereum::{chainspec::EthChainSpec, rpc::eth::primitives::BlockNumHash};
39use reth_provider::{BlockIdReader as _, HeaderProvider as _};
40use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
41use tempo_node::TempoFullNode;
42use tempo_precompiles::validator_config_v2::ValidatorConfigV2;
43use tracing::{Level, Span, debug, info, info_span, instrument, warn, warn_span};
44
45use crate::{
46    consensus::{Digest, block::Block},
47    validators::{read_active_and_known_peers_at_block_hash, read_validator_config_at_block_hash},
48};
49
50mod state;
51use state::State;
52
53use super::{
54    Command,
55    ingress::{GetDkgOutcome, VerifyDealerLog},
56};
57
58/// Wire message type for DKG protocol communication.
59pub(crate) enum Message {
60    /// A dealer message containing public and private components for a player.
61    Dealer(DealerPubMsg<MinSig>, DealerPrivMsg),
62    /// A player acknowledgment sent back to a dealer.
63    Ack(PlayerAck<PublicKey>),
64}
65
66impl Write for Message {
67    fn write(&self, writer: &mut impl BufMut) {
68        match self {
69            Self::Dealer(pub_msg, priv_msg) => {
70                0u8.write(writer);
71                pub_msg.write(writer);
72                priv_msg.write(writer);
73            }
74            Self::Ack(ack) => {
75                1u8.write(writer);
76                ack.write(writer);
77            }
78        }
79    }
80}
81
82impl EncodeSize for Message {
83    fn encode_size(&self) -> usize {
84        1 + match self {
85            Self::Dealer(pub_msg, priv_msg) => pub_msg.encode_size() + priv_msg.encode_size(),
86            Self::Ack(ack) => ack.encode_size(),
87        }
88    }
89}
90
91impl Read for Message {
92    type Cfg = NonZeroU32;
93
94    fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
95        let tag = u8::read(reader)?;
96        match tag {
97            0 => {
98                let pub_msg = DealerPubMsg::read_cfg(reader, cfg)?;
99                let priv_msg = DealerPrivMsg::read(reader)?;
100                Ok(Self::Dealer(pub_msg, priv_msg))
101            }
102            1 => {
103                let ack = PlayerAck::read(reader)?;
104                Ok(Self::Ack(ack))
105            }
106            other => Err(commonware_codec::Error::InvalidEnum(other)),
107        }
108    }
109}
110
111pub(crate) struct Actor<TContext>
112where
113    TContext: Clock + commonware_runtime::Metrics + commonware_runtime::Storage,
114{
115    /// The actor configuration passed in when constructing the actor.
116    config: super::Config,
117
118    /// The runtime context passed in when constructing the actor.
119    context: ContextCell<TContext>,
120
121    /// The channel over which the actor will receive messages.
122    mailbox: mpsc::UnboundedReceiver<super::Message>,
123
124    /// Handles to the metrics objects that the actor will update during its
125    /// runtime.
126    metrics: Metrics,
127}
128
129impl<TContext> Actor<TContext>
130where
131    TContext: commonware_runtime::BufferPooler
132        + Clock
133        + CryptoRngCore
134        + commonware_runtime::Metrics
135        + Spawner
136        + commonware_runtime::Storage,
137{
138    pub(super) async fn new(
139        config: super::Config,
140        context: TContext,
141        mailbox: mpsc::UnboundedReceiver<super::ingress::Message>,
142    ) -> eyre::Result<Self> {
143        let context = ContextCell::new(context);
144
145        let metrics = Metrics::init(&context);
146
147        Ok(Self {
148            config,
149            context,
150            mailbox,
151            metrics,
152        })
153    }
154
155    pub(crate) fn start(
156        mut self,
157        dkg_channel: (
158            impl Sender<PublicKey = PublicKey>,
159            impl Receiver<PublicKey = PublicKey>,
160        ),
161    ) -> Handle<()> {
162        spawn_cell!(self.context, self.run(dkg_channel))
163    }
164
165    async fn run(
166        mut self,
167        (sender, receiver): (
168            impl Sender<PublicKey = PublicKey>,
169            impl Receiver<PublicKey = PublicKey>,
170        ),
171    ) {
172        let Ok(mut storage) = state::builder()
173            .partition_prefix(&self.config.partition_prefix)
174            .initial_state({
175                let mut context = self.context.clone();
176                let execution_node = self.config.execution_node.clone();
177                let initial_share = self.config.initial_share.clone();
178                let epoch_strategy = self.config.epoch_strategy.clone();
179                let mut marshal = self.config.marshal.clone();
180                async move {
181                    read_initial_state_and_set_floor(
182                        &mut context,
183                        &execution_node,
184                        initial_share.clone(),
185                        &epoch_strategy,
186                        &mut marshal,
187                    )
188                    .await
189                }
190            })
191            .init(self.context.with_label("state"))
192            .await
193        else {
194            // NOTE: Builder::init emits en error event.
195            return;
196        };
197
198        let (mux, mut dkg_mux) = mux::Muxer::new(
199            self.context.with_label("dkg_mux"),
200            sender,
201            receiver,
202            self.config.mailbox_size,
203        );
204        mux.start();
205
206        let reason = loop {
207            if let Err(error) = self.run_dkg_loop(&mut storage, &mut dkg_mux).await {
208                break error;
209            }
210        };
211
212        tracing::warn_span!("dkg_actor").in_scope(|| {
213            warn!(
214                %reason,
215                "actor exited",
216            );
217        });
218    }
219
220    async fn run_dkg_loop<TStorageContext, TSender, TReceiver>(
221        &mut self,
222        storage: &mut state::Storage<TStorageContext>,
223        mux: &mut MuxHandle<TSender, TReceiver>,
224    ) -> eyre::Result<()>
225    where
226        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
227        TSender: Sender<PublicKey = PublicKey>,
228        TReceiver: Receiver<PublicKey = PublicKey>,
229    {
230        let state = storage.current();
231
232        self.metrics.reset();
233
234        self.metrics.dealers.set(state.dealers().len() as i64);
235        self.metrics.players.set(state.players().len() as i64);
236
237        if let Some(previous) = state.epoch.previous() {
238            // NOTE: State::prune emits an error event.
239            storage.prune(previous).await.wrap_err_with(|| {
240                format!("unable to prune storage before up until epoch `{previous}`",)
241            })?;
242        }
243
244        self.enter_epoch(&state)
245            .wrap_err("could not instruct epoch manager to enter a new epoch")?;
246
247        // TODO: emit an event with round info
248        let round = state::Round::from_state(&state, &self.config.namespace);
249
250        let mut dealer_state = storage
251            .create_dealer_for_round(
252                self.config.me.clone(),
253                round.clone(),
254                state.share.clone(),
255                state.seed,
256            )
257            .wrap_err("unable to instantiate dealer state")?;
258
259        if dealer_state.is_some() {
260            self.metrics.how_often_dealer.inc();
261        }
262
263        let mut player_state = storage
264            .create_player_for_round(self.config.me.clone(), &round)
265            .wrap_err("unable to instantiate player state")?;
266
267        if player_state.is_some() {
268            self.metrics.how_often_player.inc();
269        }
270
271        // Register a channel for this round
272        let (mut round_sender, mut round_receiver) =
273            mux.register(state.epoch.get()).await.wrap_err_with(|| {
274                format!(
275                    "unable to create subchannel for this DKG ceremony of epoch `{}`",
276                    state.epoch
277                )
278            })?;
279
280        let mut ancestry_stream = AncestorStream::new();
281
282        info_span!("run_dkg_loop", epoch = %state.epoch).in_scope(|| {
283            info!(
284                me = %self.config.me.public_key(),
285                dealers = ?state.dealers(),
286                players = ?state.players(),
287                as_dealer = dealer_state.is_some(),
288                as_player = player_state.is_some(),
289                "entering a new DKG ceremony",
290            )
291        });
292
293        let mut skip_to_boundary = false;
294        loop {
295            let mut shutdown = self.context.stopped().fuse();
296            select_biased!(
297
298                _ = &mut shutdown => {
299                    break Err(eyre!("shutdown triggered"));
300                }
301
302                network_msg = round_receiver.recv().fuse() => {
303                    match network_msg {
304                        Ok((sender, message)) => {
305                            // Produces an error event.
306                            let _ = self.handle_network_msg(
307                                &round,
308                                &mut round_sender,
309                                storage,
310                                dealer_state.as_mut(),
311                                player_state.as_mut(),
312                                sender,
313                                message,
314                            ).await;
315                        }
316                        Err(err) => {
317                            break Err(err).wrap_err("network p2p subchannel closed")
318                        }
319                    }
320                }
321
322                msg = self.mailbox.next() => {
323                    let Some(msg) = msg else {
324                        break Err(eyre!("all instances of the DKG actor's mailbox are dropped"));
325                    };
326
327                    match msg.command {
328                        Command::Update(update) => {
329                            match *update {
330                                Update::Tip(_, height, _) => {
331                                    if !skip_to_boundary {
332                                        skip_to_boundary |= self.should_skip_round(
333                                            &round,
334                                            height,
335                                        ).await;
336                                        if skip_to_boundary {
337                                            self.metrics.rounds_skipped.inc();
338                                        }
339                                    }
340                                }
341                                Update::Block(block, ack) => {
342                                    let res = if skip_to_boundary {
343                                        self.handle_finalized_boundary(
344                                            msg.cause,
345                                            &round,
346                                            block,
347                                        ).await
348                                    } else {
349                                        self.handle_finalized_block(
350                                            msg.cause,
351                                            &state,
352                                            &round,
353                                            &mut round_sender,
354                                            storage,
355                                            &mut dealer_state,
356                                            &mut player_state,
357                                            block,
358                                        ).await
359                                    };
360                                    let should_break = match res {
361                                        Ok(Some(new_state)) => {
362                                            info_span!(
363                                                "run_dkg_loop",
364                                                epoch = %state.epoch
365                                            ).in_scope(|| info!(
366                                                "constructed a new epoch state; \
367                                                persisting new state and exiting \
368                                                current epoch",
369                                            ));
370
371                                            if let Err(err) = storage
372                                                .set_state(new_state)
373                                                .await
374                                                .wrap_err("failed appending new state to journal")
375                                            {
376                                                break Err(err);
377                                            }
378                                            // Emits an error event.
379                                            let _ = self.exit_epoch(&state);
380
381                                            true
382                                        }
383                                        Ok(None) => false,
384                                        Err(err) => break Err(err).wrap_err("failed handling finalized block"),
385                                    };
386                                    ack.acknowledge();
387                                    if should_break {
388                                        break Ok(());
389                                    }
390                                }
391                            }
392                        }
393
394                        Command::GetDealerLog(get_dealer_log) => {
395                            warn_span!("get_dealer_log").in_scope(|| {
396                                let log = if get_dealer_log.epoch != round.epoch() {
397                                    warn!(
398                                        request.epoch = %get_dealer_log.epoch,
399                                        round.epoch = %round.epoch(),
400                                        "application requested dealer log for \
401                                        an epoch other than we are currently \
402                                        running",
403                                    );
404                                    None
405                                } else {
406                                    dealer_state
407                                        .as_ref()
408                                        .and_then(|dealer_state| dealer_state.finalized())
409                                };
410                                let _ = get_dealer_log
411                                .response
412                                .send(log);
413                            });
414                        }
415
416                        Command::GetDkgOutcome(request) => {
417                            if let Some(target) = ancestry_stream.tip()
418                            && target == request.digest
419                            {
420                                ancestry_stream.update_receiver((msg.cause, request));
421                                continue;
422                            }
423                            if let Ok(Some((hole, request))) = self
424                                .handle_get_dkg_outcome(
425                                    &msg.cause,
426                                    storage,
427                                    &player_state,
428                                    &round,
429                                    &state,
430                                    request,
431                                )
432                                .await
433                            {
434                                let stream = match self.config.marshal.ancestry((None, hole)).await {
435                                    Some(stream) => stream,
436                                    None => break Err(eyre!("marshal mailbox is closed")),
437                                };
438                                ancestry_stream.set(
439                                    (msg.cause, request),
440                                    stream,
441                                );
442                            }
443                        }
444                        Command::VerifyDealerLog(verify) => {
445                            self.handle_verify_dealer_log(
446                                &state,
447                                &round,
448                                verify,
449                            );
450                        }
451                    }
452                }
453
454                notarized_block = ancestry_stream.next() => {
455                    if let Some(block) = notarized_block {
456                        storage.cache_notarized_block(&round, block);
457                        let (cause, request) = ancestry_stream
458                            .take_request()
459                            .expect("if the stream is yielding blocks, there must be a receiver");
460                        if let Ok(Some((hole, request))) = self
461                            .handle_get_dkg_outcome(&cause, storage, &player_state, &round, &state, request)
462                            .await
463                        {
464                            let stream = match self.config.marshal.ancestry((None, hole)).await {
465                                Some(stream) => stream,
466                                None => break Err(eyre!("marshal mailbox is closed")),
467                            };
468                            ancestry_stream.set(
469                                (cause, request),
470                                stream,
471                            );
472                        }
473                    }
474                }
475            )
476        }
477    }
478
479    fn handle_verify_dealer_log(
480        &self,
481        state: &state::State,
482        round: &state::Round,
483        VerifyDealerLog {
484            epoch,
485            bytes,
486            response,
487        }: VerifyDealerLog,
488    ) {
489        if state.epoch != epoch {
490            let _ = response.send(Err(eyre!(
491                "requested dealer log for epoch `{epoch}`, but current round \
492                is for epoch `{}`",
493                state.epoch
494            )));
495            return;
496        }
497        let res = SignedDealerLog::<MinSig, PrivateKey>::read_cfg(
498            &mut &bytes[..],
499            &NZU32!(round.players().len() as u32),
500        )
501        .wrap_err("failed reading dealer log from header")
502        .and_then(|log| {
503            log.check(round.info())
504                .map(|(dealer, _)| dealer)
505                .ok_or_eyre("not a dealer in the current round")
506        })
507        .inspect(|_| {
508            self.metrics.dealings_read.inc();
509        })
510        .inspect_err(|_| {
511            self.metrics.bad_dealings.inc();
512        });
513        let _ = response.send(res);
514    }
515
516    /// Determines if it makes sense to continue with the current DKG ceremony.
517    ///
518    /// If `finalized_tip` indicates that the *next* epoch was already finalized,
519    /// then there is no point in continuing with the current DKG round.
520    ///
521    /// We know that an epoch was finalized by either observing the boundary
522    /// block for said epoch, or by observing an even newer epoch.
523    #[instrument(
524        skip_all,
525        fields(
526            round.epoch = %round.epoch(),
527            finalized.tip = %finalized_tip,
528            finalized.epoch = tracing::field::Empty,
529        ),
530    )]
531    async fn should_skip_round(&mut self, round: &state::Round, finalized_tip: Height) -> bool {
532        let epoch_info = self
533            .config
534            .epoch_strategy
535            .containing(finalized_tip)
536            .expect("epoch strategy is valid for all heights");
537        Span::current().record(
538            "finalized.epoch",
539            tracing::field::display(epoch_info.epoch()),
540        );
541
542        let should_skip_round = epoch_info.epoch() > round.epoch().next()
543            || (epoch_info.epoch() == round.epoch().next() && epoch_info.last() == finalized_tip);
544
545        if should_skip_round {
546            let boundary_height = self
547                .config
548                .epoch_strategy
549                .last(round.epoch())
550                .expect("epoch strategy is valid for all epochs");
551            info!(
552                %boundary_height,
553                "confirmed that the network is at least 2 epochs aheads of us; \
554                setting synchronization floor to boundary height of our DKG's \
555                epoch and reporting that the rest of the DKG round should be \
556                skipped",
557            );
558
559            // NOTE: `set_floor(height)` implies that the next block sent by
560            // marshal will be height + 1.
561            if let Some(one_before_boundary) = boundary_height.previous() {
562                self.config.marshal.set_floor(one_before_boundary).await;
563            }
564        }
565        should_skip_round
566    }
567
568    /// Handles a finalized block.
569    ///
570    /// Returns a new [`State`] after finalizing the boundary block of the epoch.
571    ///
572    /// Some block heights are special cased:
573    ///
574    /// + first height of an epoch: notify the epoch manager that the previous
575    ///   epoch can be shut down.
576    /// + last height of an epoch:
577    ///     1. notify the epoch manager that a new epoch can be entered;
578    ///     2. prepare for the state of the next iteration by finalizing the current
579    ///        DKG round and reading the next players (players in the DKG round after
580    ///        the immediately next one) from the smart contract.
581    ///
582    /// The processing of all other blocks depends on which part of the epoch
583    /// they fall in:
584    ///
585    /// + first half: if we are a dealer, distribute the generated DKG shares
586    ///   to the players and collect their acks. If we are a player, receive
587    ///   DKG shares and respond with an ack.
588    /// + exact middle of an epoch: if we are a dealer, generate the dealer log
589    ///   of the DKG ceremony.
590    /// + second half of the epoch: read dealer logs from blocks.
591    #[instrument(
592        parent = &cause,
593        skip_all,
594        fields(
595            dkg.epoch = %round.epoch(),
596            block.height = %block.height(),
597            block.extra_data.bytes = block.header().extra_data().len(),
598        ),
599        err,
600    )]
601    #[expect(
602        clippy::too_many_arguments,
603        reason = "easiest way to express this for now"
604    )]
605    // TODO(janis): replace this by a struct?
606    async fn handle_finalized_block<TStorageContext, TSender>(
607        &mut self,
608        cause: Span,
609        state: &state::State,
610        round: &state::Round,
611        round_channel: &mut TSender,
612        storage: &mut state::Storage<TStorageContext>,
613        dealer_state: &mut Option<state::Dealer>,
614        player_state: &mut Option<state::Player>,
615        block: Block,
616    ) -> eyre::Result<Option<State>>
617    where
618        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
619        TSender: Sender<PublicKey = PublicKey>,
620    {
621        let epoch_info = self
622            .config
623            .epoch_strategy
624            .containing(block.height())
625            .expect("epoch strategy is covering all block heights");
626
627        match round.epoch().cmp(&epoch_info.epoch()) {
628            std::cmp::Ordering::Less => {
629                bail!(
630                    "block is for a future epoch `{}`, but the current DKG \
631                    loop is for epoch `{}`; this should never happen because \
632                    the DKG actor drives which epochs are entered or skipped",
633                    epoch_info.epoch(),
634                    round.epoch(),
635                );
636            }
637            std::cmp::Ordering::Greater => {
638                warn!(
639                    "ignoring block for prior epoch; older blocks are replayed \
640                    against the DKG loop when a node was shut down right \
641                    after a boundary block completed an epoch, but before \
642                    it was fully processed by other actors"
643                );
644                return Ok(None);
645            }
646            std::cmp::Ordering::Equal => {
647                // Normal, expected behavior.
648            }
649        }
650
651        match epoch_info.phase() {
652            EpochPhase::Early => {
653                if let Some(dealer_state) = dealer_state {
654                    self.distribute_shares(
655                        storage,
656                        round.epoch(),
657                        dealer_state,
658                        player_state,
659                        round_channel,
660                    )
661                    .await;
662                }
663            }
664            EpochPhase::Midpoint | EpochPhase::Late => {
665                if let Some(dealer_state) = dealer_state {
666                    dealer_state.finalize();
667                }
668            }
669        }
670
671        if block.height() != epoch_info.last() {
672            if !block.header().extra_data().is_empty() {
673                'handle_log: {
674                    let (dealer, log) =
675                        match read_dealer_log(block.header().extra_data().as_ref(), round) {
676                            Err(reason) => {
677                                warn!(
678                                    %reason,
679                                    "failed to read dealer log from block \
680                                    extraData header field");
681                                break 'handle_log;
682                            }
683                            Ok((dealer, log)) => (dealer, log),
684                        };
685                    storage
686                        .append_dealer_log(round.epoch(), dealer.clone(), log)
687                        .await
688                        .wrap_err("failed to append log to journal")?;
689                    if self.config.me.public_key() == dealer
690                        && let Some(dealer_state) = dealer_state
691                    {
692                        info!(
693                            "found own dealing in finalized block; deleting it \
694                            from state to not write it again"
695                        );
696                        dealer_state.take_finalized();
697                    }
698                }
699            }
700
701            storage
702                .append_finalized_block(round.epoch(), block)
703                .await
704                .wrap_err("failed to append finalized block to journal")?;
705
706            return Ok(None);
707        }
708
709        info!("reached last block of epoch; reading DKG outcome from header");
710
711        let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
712            &mut block.header().extra_data().as_ref(),
713        )
714        .expect("the last block of an epoch must contain the DKG outcome");
715
716        info!("reading validator from contract");
717
718        let (local_output, mut share) = if let Some((outcome, share)) =
719            storage.get_dkg_outcome(&state.epoch, &block.parent_digest())
720        {
721            debug!("using cached DKG outcome");
722            (outcome.clone(), share.clone())
723        } else {
724            let mut logs = Logs::<MinSig, PublicKey, N3f1>::new(round.info().clone());
725            for (k, v) in storage.logs_for_epoch(round.epoch()) {
726                logs.record(k.clone(), v.clone());
727            }
728
729            let player_outcome = if let Some(player) = player_state.take() {
730                info!("we were a player in the ceremony; finalizing share");
731                match player.finalize(&mut self.context, logs.clone(), &Sequential) {
732                    Ok((new_output, new_share)) => {
733                        info!("local DKG ceremony was a success");
734                        Some((new_output, state::ShareState::Plaintext(Some(new_share))))
735                    }
736                    Err(
737                        reason
738                        @ commonware_cryptography::bls12381::dkg::Error::MissingPlayerDealing,
739                    ) => {
740                        warn!(
741                            reason = %eyre::Report::new(reason),
742                            "missing critical DKG state to reconstruct a share in this epoch; has \
743                            consensus state been deleted or a node with the same identity started \
744                            without consensus state? Finalizing the current round as an observer \
745                            and will not have a share in the next epoch"
746                        );
747                        None
748                    }
749                    Err(error) => {
750                        warn!(
751                            error = %eyre::Report::new(error),
752                            "local DKG ceremony was a failure",
753                        );
754                        Some((state.output.clone(), state.share.clone()))
755                    }
756                }
757            } else {
758                None
759            };
760
761            if let Some(outcome) = player_outcome {
762                outcome
763            } else {
764                match observe::<_, _, N3f1, ed25519::Batch>(&mut self.context, logs, &Sequential) {
765                    Ok(output) => {
766                        info!("local DKG ceremony was a success");
767                        (output, state::ShareState::Plaintext(None))
768                    }
769                    Err(error) => {
770                        warn!(
771                            error = %eyre::Report::new(error),
772                            "local DKG ceremony was a failure",
773                        );
774                        (state.output.clone(), state.share.clone())
775                    }
776                }
777            }
778        };
779
780        if local_output != onchain_outcome.output {
781            let am_player = onchain_outcome
782                .next_players
783                .position(&self.config.me.public_key())
784                .is_some();
785            warn!(
786                am_player,
787                "the output of the local DKG ceremony does not match what is \
788                on chain; something is terribly wrong; will try and participate \
789                in the next round (if a player), but if we are misbehaving and \
790                other nodes are blocking us it might be time to delete this node \
791                and spin up a new identity",
792            );
793            share = state::ShareState::Plaintext(None);
794        }
795
796        // Because we use cached data, we need to check for DKG success here:
797        // if the on-chain output is the same as the input into the loop (which
798        // is just state.output), then we know the DKG failed.
799        if onchain_outcome.output == state.output {
800            self.metrics.failures.inc();
801        } else {
802            self.metrics.successes.inc();
803        }
804
805        Ok(Some(state::State {
806            epoch: onchain_outcome.epoch,
807            seed: Summary::random(&mut self.context),
808            output: onchain_outcome.output.clone(),
809            share,
810            players: onchain_outcome.next_players,
811            is_full_dkg: onchain_outcome.is_next_full_dkg,
812        }))
813    }
814
815    /// Looks for and handles a finalized boundary block.
816    ///
817    /// Called if the DKG round if asked to skip ahead to the boundary block.
818    /// Does not consider any state for the current DKG round; just reads the
819    /// DKG outcome from the header and returns it.
820    #[instrument(
821        parent = &cause,
822        skip_all,
823        fields(
824            dkg.epoch = %round.epoch(),
825            block.height = %block.height(),
826            block.extra_data.bytes = block.header().extra_data().len(),
827        ),
828        err,
829    )]
830    async fn handle_finalized_boundary(
831        &mut self,
832        cause: Span,
833        round: &state::Round,
834        block: Block,
835    ) -> eyre::Result<Option<State>> {
836        let epoch_info = self
837            .config
838            .epoch_strategy
839            .containing(block.height())
840            .expect("epoch strategy is covering all block heights");
841
842        // This check exists to match that of `handle_finalized_block`.
843        // However, in practice it is extremely unlikely to ever be hit because
844        // it would require that the node observes the finalized network tip
845        // (from the network) before replaying a locally replayed block.
846        match round.epoch().cmp(&epoch_info.epoch()) {
847            std::cmp::Ordering::Less => {
848                bail!(
849                    "block is for a future epoch `{}`, but the current DKG \
850                    loop is for epoch `{}`; this should never happen because \
851                    the DKG actor drives which epochs are entered or skipped",
852                    epoch_info.epoch(),
853                    round.epoch(),
854                );
855            }
856            std::cmp::Ordering::Greater => {
857                warn!(
858                    "ignoring block for prior epoch; older blocks are replayed \
859                    against the DKG loop when a node was shut down right \
860                    after a boundary block completed an epoch, but before \
861                    it was fully processed by other actors"
862                );
863                return Ok(None);
864            }
865            std::cmp::Ordering::Equal => {
866                // Normal, expected behavior.
867            }
868        }
869
870        if block.height() != epoch_info.last() {
871            return Ok(None);
872        }
873
874        info!("found boundary block; reading DKG outcome from header");
875
876        let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
877            &mut block.header().extra_data().as_ref(),
878        )
879        .expect("the last block of an epoch must contain the DKG outcome");
880
881        info!("reading validators from contract");
882
883        Ok(Some(state::State {
884            epoch: onchain_outcome.epoch,
885            seed: Summary::random(&mut self.context),
886            output: onchain_outcome.output.clone(),
887            share: state::ShareState::Plaintext(None),
888            players: onchain_outcome.next_players,
889            is_full_dkg: onchain_outcome.is_next_full_dkg,
890        }))
891    }
892
    /// Distributes the dealings produced by `dealer_state` to the players.
    ///
    /// The dealing addressed to this node itself is not sent over the network:
    /// if `player_state` is set, the dealing is handed to it directly and the
    /// resulting ACK is handed straight back to `dealer_state`. Dealings for
    /// all other players are encoded as [`Message::Dealer`] and sent over
    /// `round_channel`.
    ///
    /// Failures (storing the local dealing or ACK, or sending to a remote
    /// player) are logged and otherwise ignored; this method returns nothing.
    #[instrument(skip_all, fields(me = %self.config.me.public_key(), %epoch))]
    async fn distribute_shares<TStorageContext, TSender>(
        &self,
        storage: &mut state::Storage<TStorageContext>,
        epoch: Epoch,
        dealer_state: &mut state::Dealer,
        player_state: &mut Option<state::Player>,
        round_channel: &mut TSender,
    ) where
        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
        TSender: Sender<PublicKey = PublicKey>,
    {
        let me = self.config.me.public_key();
        // The shares are collected up front, presumably so that `dealer_state`
        // is free to be mutated (`receive_ack`) inside the loop body.
        for (player, pub_msg, priv_msg) in dealer_state.shares_to_distribute().collect::<Vec<_>>() {
            if player == me {
                // Local short-circuit: act as our own player and our own
                // dealer. Each step only runs if the previous one succeeded.
                if let Some(player_state) = player_state
                    && let Ok(ack) = player_state
                        .receive_dealing(storage, epoch, me.clone(), pub_msg, priv_msg)
                        .await
                        .inspect(|_| {
                            // A locally stored dealing counts as both
                            // distributed and received.
                            self.metrics.shares_distributed.inc();
                            self.metrics.shares_received.inc();
                        })
                        .inspect_err(|error| warn!(%error, "failed to store our own dealing"))
                    && let Ok(()) = dealer_state
                        .receive_ack(storage, epoch, me.clone(), ack)
                        .await
                        .inspect_err(|error| warn!(%error, "failed to store our own ACK"))
                {
                    // Likewise, the local ACK counts as both sent and received.
                    self.metrics.acks_received.inc();
                    self.metrics.acks_sent.inc();
                    info!("stored our own ACK and share");
                }
            } else {
                // Send to remote player
                let payload = Message::Dealer(pub_msg, priv_msg).encode();
                match round_channel
                    .send(Recipients::One(player.clone()), payload, true)
                    .await
                {
                    Ok(success) => {
                        if success.is_empty() {
                            // TODO(janis): figure out what it means if the response
                            // is empty. Does it just mean the other party failed
                            // to respond?
                            info!(%player, "failed to send share");
                        } else {
                            self.metrics.shares_distributed.inc();
                            info!(%player, "share sent");
                        }
                    }
                    Err(error) => {
                        warn!(%player, %error, "error sending share");
                    }
                }
            }
        }
    }
951
    /// Decodes and processes a DKG message received from the peer `from`.
    ///
    /// * A [`Message::Dealer`] is stored through `player_state` and answered
    ///   with a [`Message::Ack`] sent back to `from` over `round_channel`.
    /// * A [`Message::Ack`] is stored through `dealer_state`.
    ///
    /// If this node does not hold the role a message is addressed to (the
    /// respective state is `None`), the message is logged and dropped.
    ///
    /// # Errors
    ///
    /// Fails if `message` cannot be decoded, if storing the dealing or the ACK
    /// fails, or if the ACK cannot be returned to the dealer.
    #[instrument(
        skip_all,
        fields(
            epoch = %round.epoch(),
            %from,
            bytes = message.len()),
        err)]
    #[expect(
        clippy::too_many_arguments,
        reason = "easiest way to express this for now"
    )]
    // TODO(janis): replace this by a struct?
    async fn handle_network_msg<TStorageContext>(
        &self,
        round: &state::Round,
        round_channel: &mut impl Sender<PublicKey = PublicKey>,
        storage: &mut state::Storage<TStorageContext>,
        dealer_state: Option<&mut state::Dealer>,
        player_state: Option<&mut state::Player>,
        from: PublicKey,
        mut message: IoBuf,
    ) -> eyre::Result<()>
    where
        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
    {
        // The number of players of the round is the decoding configuration.
        let msg = Message::read_cfg(&mut message, &NZU32!(round.players().len() as u32))
            .wrap_err("failed reading p2p message")?;

        match msg {
            Message::Dealer(pub_msg, priv_msg) => {
                if let Some(player_state) = player_state {
                    info!("received message from a dealer");
                    // Counted on receipt, before the dealing is stored.
                    self.metrics.shares_received.inc();
                    let ack = player_state
                        .receive_dealing(storage, round.epoch(), from.clone(), pub_msg, priv_msg)
                        .await
                        .wrap_err("failed storing dealing")?;

                    if let Err(error) = round_channel
                        .send(
                            Recipients::One(from.clone()),
                            Message::Ack(ack).encode(),
                            true,
                        )
                        .await
                    {
                        // FIXME(janis): the GATs in the Sender (and LimitedSender)
                        // lead to `borrowed data escapes outside of method` errors.
                        // `wrap_err` with early return does not work, and neither
                        // does `Report::new` nor `&error as &dyn std::error::Error`.
                        warn!(
                            reason = ?error,
                            "failed returning ACK to dealer",
                        );
                        bail!("failed returning ACK to dealer");
                    }
                    info!("returned ACK to dealer");
                    self.metrics.acks_sent.inc();
                } else {
                    info!("received a dealer message, but we are not a player");
                }
            }
            Message::Ack(ack) => {
                if let Some(dealer_state) = dealer_state {
                    info!("received an ACK");
                    // Counted on receipt, before the ACK is stored.
                    self.metrics.acks_received.inc();
                    dealer_state
                        .receive_ack(storage, round.epoch(), from, ack)
                        .await
                        .wrap_err("failed storing ACK")?;
                } else {
                    info!("received an ACK, but we are not a dealer");
                }
            }
        }
        Ok(())
    }
1029
    /// Attempts to serve a `GetDkgOutcome` request by finalizing the DKG outcome.
    ///
    /// A DKG outcome can be finalized in one of the following cases:
    ///
    /// 1. if the DKG actor has observed as many dealer logs as there are dealers.
    /// 2. if all blocks in an epoch were observed (finalized + notarized leading
    ///    up to `request.digest`).
    ///
    /// If the DKG was finalized this way, the outcome is sent over
    /// `request.response` and this method will return `None`.
    /// Otherwise will return `Some((digest, request))` if the block identified
    /// by `digest` was missing and needs to be fetched first to ensure all
    /// blocks in an epoch were observed.
    ///
    /// A freshly computed outcome is cached in `storage` under `request.digest`
    /// and reused by later requests for the same digest.
    ///
    /// # Errors
    ///
    /// Fails if the request is not for the epoch of `round`, if the next
    /// players cannot be determined, or if the requester went away before the
    /// outcome could be sent.
    ///
    /// # Panics
    ///
    /// Panics if the epoch strategy does not cover `request.height`, or if a
    /// player instance cannot be re-created for `round` even though
    /// `player_state` is set.
    #[instrument(
        parent = cause,
        skip_all,
        fields(
            as_player = player_state.is_some(),
            our.epoch = %round.epoch(),
        ),
        err(level = Level::WARN),
    )]
    async fn handle_get_dkg_outcome<TStorageContext>(
        &mut self,
        cause: &Span,
        storage: &mut state::Storage<TStorageContext>,
        player_state: &Option<state::Player>,
        round: &state::Round,
        state: &State,
        request: GetDkgOutcome,
    ) -> eyre::Result<Option<(Digest, GetDkgOutcome)>>
    where
        TStorageContext: commonware_runtime::Metrics + Clock + commonware_runtime::Storage,
    {
        let epoch_info = self
            .config
            .epoch_strategy
            .containing(request.height)
            .expect("our strategy covers all epochs");

        ensure!(
            round.epoch() == epoch_info.epoch(),
            "request is for epoch `{}`, not our epoch",
            epoch_info.epoch(),
        );

        // Reuse the outcome cached for this digest, if there is one.
        let output = if let Some((output, _)) = storage
            .get_dkg_outcome(&state.epoch, &request.digest)
            .cloned()
        {
            output
        } else {
            // Start from the dealer logs already stored for this epoch.
            let mut raw_logs = storage
                .logs_for_epoch(round.epoch())
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect::<BTreeMap<_, _>>();

            'ensure_enough_logs: {
                if raw_logs.len() == round.dealers().len() {
                    info!("collected as many logs as there are dealers; concluding DKG");
                    break 'ensure_enough_logs;
                }

                info!(
                    "did not have all dealer logs yet; will try to extend with \
                    logs read from notarized blocks and concluding DKG that way",
                );
                // Walk the ancestry backwards from the requested block, via the
                // parent digests, collecting the logs of notarized blocks.
                let (mut height, mut digest) = (request.height, request.digest);
                // `Some(_) >= None` holds, so the second condition does not cut
                // the walk short if no finalized block is known for the epoch.
                while height >= epoch_info.first()
                    && Some(height)
                        >= storage
                            .get_latest_finalized_block_for_epoch(&round.epoch())
                            .map(|(_, info)| info.height)
                {
                    if let Some(block) =
                        storage.get_notarized_reduced_block(&round.epoch(), &digest)
                    {
                        raw_logs.extend(block.log.clone());
                        height = if let Some(height) = block.height.previous() {
                            height
                        } else {
                            // No previous height exists; nothing left to walk.
                            break 'ensure_enough_logs;
                        };
                        digest = block.parent;
                    } else {
                        // The block is not available: hand the request back
                        // together with the digest that needs to be fetched.
                        return Ok(Some((digest, request)));
                    }
                }
            }

            let mut logs = Logs::<MinSig, PublicKey, N3f1>::new(round.info().clone());
            for (k, v) in raw_logs {
                logs.record(k, v);
            }

            // Create a player-state ad hoc: the DKG player object is not
            // cloneable, and finalizing consumes it.
            let player_state = player_state.is_some().then(||
                storage
                        .create_player_for_round(self.config.me.clone(), round)
                        .expect("created a player instance before, must be able to create it again")
                        .expect("did not return a player instance even though we created it for this round already")
            );

            let (output, share) = {
                // `None` means: fall through and finalize as an observer.
                let player_outcome = if let Some(player) = player_state {
                    info!("we were a player in the ceremony; finalizing share");
                    match player.finalize(&mut self.context, logs.clone(), &Sequential) {
                        Ok((new_output, new_share)) => {
                            info!("local DKG ceremony was a success");
                            Some((new_output, state::ShareState::Plaintext(Some(new_share))))
                        }
                        Err(
                            reason
                            @ commonware_cryptography::bls12381::dkg::Error::MissingPlayerDealing,
                        ) => {
                            warn!(
                                reason = %eyre::Report::new(reason),
                                "missing critical DKG state to reconstruct a share in this epoch; has \
                                consensus state been deleted or a node with the same identity started \
                                without consensus state? Finalizing the current round as an observer \
                                and will not have a share in the next epoch"
                            );
                            None
                        }
                        Err(error) => {
                            warn!(
                                error = %eyre::Report::new(error),
                                "local DKG ceremony was a failure",
                            );
                            // On failure, carry the current output and share over.
                            Some((state.output.clone(), state.share.clone()))
                        }
                    }
                } else {
                    None
                };

                if let Some(outcome) = player_outcome {
                    outcome
                } else {
                    match observe::<_, _, N3f1, ed25519::Batch>(
                        &mut self.context,
                        logs,
                        &Sequential,
                    ) {
                        Ok(output) => {
                            info!("local DKG ceremony was a success");
                            (output, state::ShareState::Plaintext(None))
                        }
                        Err(error) => {
                            warn!(
                                error = %eyre::Report::new(error),
                                "local DKG ceremony was a failure",
                            );
                            // On failure, carry the current output and share over.
                            (state.output.clone(), state.share.clone())
                        }
                    }
                }
            };

            storage.cache_dkg_outcome(state.epoch, request.digest, output.clone(), share);
            output
        };

        // Check if next ceremony should be full.
        let next_epoch = state.epoch.next();
        let will_be_re_dkg = read_re_dkg_epoch(&self.config.execution_node, request.digest)
            // in theory it should never fail, but if it does, just stick to reshare.
            .is_ok_and(|epoch| epoch == next_epoch.get());
        info!(
            will_be_re_dkg,
            %next_epoch,
            "determined if the next epoch will be a reshare or full re-dkg process",
        );

        let next_players =
            determine_next_players_at_hash(&self.config.execution_node, request.digest.0)
                .wrap_err("could not determine who the next players are supposed to be")?;

        request
            .response
            .send(OnchainDkgOutcome {
                epoch: next_epoch,
                output,
                next_players,
                is_next_full_dkg: will_be_re_dkg,
            })
            .map_err(|_| {
                eyre!("requester went away before speculative DKG outcome could be sent")
            })?;

        Ok(None)
    }
1222
1223    #[instrument(skip_all, fields(epoch = %state.epoch), err(level = Level::WARN))]
1224    fn enter_epoch(&mut self, state: &state::State) -> eyre::Result<()> {
1225        self.config
1226            .epoch_manager
1227            .enter(
1228                state.epoch,
1229                state.output.public().clone(),
1230                state.share.clone().into_inner(),
1231                state.dealers().clone(),
1232            )
1233            .wrap_err("could not instruct epoch manager to enter epoch")
1234    }
1235
1236    #[instrument(skip_all, fields(epoch = %state.epoch), err(level = Level::WARN))]
1237    fn exit_epoch(&mut self, state: &state::State) -> eyre::Result<()> {
1238        self.config
1239            .epoch_manager
1240            .exit(state.epoch)
1241            .wrap_err("could not instruct epoch manager to enter epoch")
1242    }
1243}
1244
1245#[instrument(skip_all, err)]
1246async fn read_initial_state_and_set_floor<TContext>(
1247    context: &mut TContext,
1248    node: &TempoFullNode,
1249    share: Option<Share>,
1250    epoch_strategy: &FixedEpocher,
1251    marshal: &mut crate::alias::marshal::Mailbox,
1252) -> eyre::Result<State>
1253where
1254    TContext: Clock + CryptoRngCore,
1255{
1256    let latest_finalized = node
1257        .provider
1258        .finalized_block_num_hash()
1259        .wrap_err("unable to read highest finalized block from execution layer")?
1260        .unwrap_or_else(|| BlockNumHash::new(0, node.chain_spec().genesis_hash()));
1261
1262    let epoch_info = epoch_strategy
1263        .containing(Height::new(latest_finalized.number))
1264        .expect("epoch strategy is for all heights");
1265
1266    let latest_boundary = if epoch_info.last().get() == latest_finalized.number {
1267        latest_finalized.number
1268    } else {
1269        epoch_info
1270            .epoch()
1271            .previous()
1272            .map_or_else(Height::zero, |previous| {
1273                epoch_strategy
1274                    .last(previous)
1275                    .expect("epoch strategy is for all epochs")
1276            })
1277            .get()
1278    };
1279    info!(
1280        %latest_boundary,
1281        latest_finalized.number,
1282        %latest_finalized.hash,
1283        "execution layer reported newest available block, reading on-chain \
1284        DKG outcome from last boundary height, and validator state from newest \
1285        block"
1286    );
1287
1288    let boundary_header = node
1289        .provider
1290        .header_by_number(latest_boundary)
1291        .map_or_else(
1292            |e| Err(eyre::Report::new(e)),
1293            |header| header.ok_or_eyre("execution layer reported it had no header"),
1294        )
1295        .wrap_err_with(|| {
1296            format!("failed to read header for latest boundary block number `{latest_boundary}`")
1297        })?;
1298
1299    let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
1300        &mut boundary_header.extra_data().as_ref(),
1301    )
1302    .wrap_err("the boundary header did not contain the on-chain DKG outcome")?;
1303
1304    let share = state::ShareState::Plaintext('verify_initial_share: {
1305        let Some(share) = share else {
1306            break 'verify_initial_share None;
1307        };
1308        let Ok(partial) = onchain_outcome.sharing().partial_public(share.index) else {
1309            warn!(
1310                "the index of the provided share exceeds the polynomial of the \
1311                on-chain DKG outcome; ignoring the share"
1312            );
1313            break 'verify_initial_share None;
1314        };
1315        if share.public::<MinSig>() != partial {
1316            warn!(
1317                "the provided share does not match the polynomial of the \
1318                on-chain DKG outcome; ignoring the share"
1319            );
1320            break 'verify_initial_share None;
1321        }
1322        Some(share)
1323    });
1324
1325    info!(%latest_boundary, "setting sync floor");
1326    marshal.set_floor(Height::new(latest_boundary)).await;
1327
1328    Ok(State {
1329        epoch: onchain_outcome.epoch,
1330        seed: Summary::random(context),
1331        output: onchain_outcome.output.clone(),
1332        share,
1333        players: onchain_outcome.next_players,
1334        is_full_dkg: onchain_outcome.is_next_full_dkg,
1335    })
1336}
1337
/// Metrics recorded by the DKG actor.
///
/// The gauges in the first group are per-ceremony values and are zeroed by
/// [`Metrics::reset`].
#[derive(Clone)]
struct Metrics {
    /// Shares distributed by this node as a dealer in the current ceremony.
    shares_distributed: Gauge,
    /// Shares received by this node as a player in the current ceremony.
    shares_received: Gauge,
    /// Acknowledgments received by this node as a dealer in the current ceremony.
    acks_received: Gauge,
    /// Acknowledgments sent by this node as a player in the current ceremony.
    acks_sent: Gauge,
    /// Dealings read from the blockchain in the current ceremony.
    dealings_read: Gauge,
    /// Blocks where decoding and verifying dealings failed in the current ceremony.
    bad_dealings: Gauge,

    /// Failed ceremonies the node participated in.
    failures: Counter,
    /// Successful ceremonies the node participated in.
    successes: Counter,

    /// Dealers in the currently running ceremony.
    dealers: Gauge,
    /// Players in the currently running ceremony.
    players: Gauge,

    /// How often the node was active as a dealer.
    how_often_dealer: Counter,
    /// How often the node was active as a player.
    how_often_player: Counter,

    /// DKG rounds skipped because the node fell too far behind and tried to
    /// catch up.
    rounds_skipped: Counter,
}
1358
1359impl Metrics {
1360    fn init<TContext>(context: &TContext) -> Self
1361    where
1362        TContext: commonware_runtime::Metrics,
1363    {
1364        let failures = Counter::default();
1365        context.register(
1366            "ceremony_failures",
1367            "the number of failed ceremonies a node participated in",
1368            failures.clone(),
1369        );
1370
1371        let successes = Counter::default();
1372        context.register(
1373            "ceremony_successes",
1374            "the number of successful ceremonies a node participated in",
1375            successes.clone(),
1376        );
1377
1378        let dealers = Gauge::default();
1379        context.register(
1380            "ceremony_dealers",
1381            "the number of dealers in the currently running ceremony",
1382            dealers.clone(),
1383        );
1384        let players = Gauge::default();
1385        context.register(
1386            "ceremony_players",
1387            "the number of players in the currently running ceremony",
1388            players.clone(),
1389        );
1390
1391        let how_often_dealer = Counter::default();
1392        context.register(
1393            "how_often_dealer",
1394            "number of the times as node was active as a dealer",
1395            how_often_dealer.clone(),
1396        );
1397        let how_often_player = Counter::default();
1398        context.register(
1399            "how_often_player",
1400            "number of the times as node was active as a player",
1401            how_often_player.clone(),
1402        );
1403
1404        let shares_distributed = Gauge::default();
1405        context.register(
1406            "ceremony_shares_distributed",
1407            "the number of shares distributed by this node as a dealer in the current ceremony",
1408            shares_distributed.clone(),
1409        );
1410
1411        let shares_received = Gauge::default();
1412        context.register(
1413            "ceremony_shares_received",
1414            "the number of shares received by this node as a player in the current ceremony",
1415            shares_received.clone(),
1416        );
1417
1418        let acks_received = Gauge::default();
1419        context.register(
1420            "ceremony_acks_received",
1421            "the number of acknowledgments received by this node as a dealer in the current ceremony",
1422            acks_received.clone(),
1423        );
1424
1425        let acks_sent = Gauge::default();
1426        context.register(
1427            "ceremony_acks_sent",
1428            "the number of acknowledgments sent by this node as a player in the current ceremony",
1429            acks_sent.clone(),
1430        );
1431
1432        let dealings_read = Gauge::default();
1433        context.register(
1434            "ceremony_dealings_read",
1435            "the number of dealings read from the blockchain in the current ceremony",
1436            dealings_read.clone(),
1437        );
1438
1439        let bad_dealings = Gauge::default();
1440        context.register(
1441            "ceremony_bad_dealings",
1442            "the number of blocks where decoding and verifying dealings failed in the current ceremony",
1443            bad_dealings.clone(),
1444        );
1445
1446        let rounds_skipped = Counter::default();
1447        context.register(
1448            "rounds_skipped",
1449            "how many DKG rounds were skipped because the node fell too far behind and tried to catch up",
1450            rounds_skipped.clone(),
1451        );
1452
1453        Self {
1454            shares_distributed,
1455            shares_received,
1456            acks_received,
1457            acks_sent,
1458            dealings_read,
1459            bad_dealings,
1460            dealers,
1461            players,
1462            how_often_dealer,
1463            how_often_player,
1464            failures,
1465            successes,
1466            rounds_skipped,
1467        }
1468    }
1469
1470    fn reset(&self) {
1471        self.shares_distributed.set(0);
1472        self.shares_received.set(0);
1473        self.acks_received.set(0);
1474        self.acks_sent.set(0);
1475        self.dealings_read.set(0);
1476        self.bad_dealings.set(0);
1477    }
1478}
1479
/// A wrapper around [`marshal::ancestry::AncestorStream`] wrapped in
/// an option to make it easier to work with select macros.
///
/// Invariant: if the inner stream is set, then the matching original request
/// is also set.
struct AncestorStream {
    /// The request the stream was set up for, together with its span.
    pending_request: Option<(Span, GetDkgOutcome)>,
    /// The wrapped stream; `None` if no stream is set, or once the stream has
    /// yielded `None` or the request was taken.
    inner: Option<marshal::ancestry::AncestorStream<crate::alias::marshal::Mailbox, Block>>,
}
1489
1490impl AncestorStream {
1491    fn new() -> Self {
1492        Self {
1493            pending_request: None,
1494            inner: None,
1495        }
1496    }
1497
1498    fn take_request(&mut self) -> Option<(Span, GetDkgOutcome)> {
1499        self.inner.take();
1500        self.pending_request.take()
1501    }
1502
1503    fn set(
1504        &mut self,
1505        pending_request: (Span, GetDkgOutcome),
1506        stream: marshal::ancestry::AncestorStream<crate::alias::marshal::Mailbox, Block>,
1507    ) {
1508        self.pending_request.replace(pending_request);
1509        self.inner.replace(stream);
1510    }
1511
1512    fn tip(&self) -> Option<Digest> {
1513        self.pending_request.as_ref().map(|(_, req)| req.digest)
1514    }
1515
1516    fn update_receiver(&mut self, pending_request: (Span, GetDkgOutcome)) {
1517        self.pending_request.replace(pending_request);
1518    }
1519}
1520
1521impl Stream for AncestorStream {
1522    type Item = Block;
1523
1524    fn poll_next(
1525        mut self: std::pin::Pin<&mut Self>,
1526        cx: &mut std::task::Context<'_>,
1527    ) -> std::task::Poll<Option<Self::Item>> {
1528        let item = {
1529            let this = match self.inner.as_mut() {
1530                Some(inner) => inner,
1531                None => return Poll::Ready(None),
1532            };
1533            this.poll_next_unpin(cx)
1534        };
1535        match futures::ready!(item) {
1536            None => {
1537                self.inner.take();
1538                Poll::Ready(None)
1539            }
1540            Some(block) => Poll::Ready(Some(block)),
1541        }
1542    }
1543}
1544
1545impl FusedStream for AncestorStream {
1546    fn is_terminated(&self) -> bool {
1547        self.inner.is_none()
1548    }
1549}
1550
1551fn read_dealer_log(
1552    mut bytes: &[u8],
1553    round: &state::Round,
1554) -> eyre::Result<(PublicKey, DealerLog<MinSig, PublicKey>)> {
1555    let signed_log = dkg::SignedDealerLog::<MinSig, PrivateKey>::read_cfg(
1556        &mut bytes,
1557        &NZU32!(round.players().len() as u32),
1558    )
1559    .wrap_err("could not decode as signed dealer log")?;
1560
1561    let (dealer, log) = signed_log
1562        .check(round.info())
1563        .ok_or_eyre("failed checking signed log against current round")?;
1564    Ok((dealer, log))
1565}
1566
1567/// Determines the next players depending on the header timestamp identified by `digest`.
1568///
1569/// This function should only be used when constructing or verifying a proposal.
1570/// `digest` should therefore always refer to the parent parent of the proposal.
1571///
1572/// If the execution layer does not have a block corresponding to `digest`
1573/// available then it cannot propose or verify a block.
1574#[instrument(skip_all, fields(%hash), err(level = Level::WARN))]
1575fn determine_next_players_at_hash(
1576    node: &TempoFullNode,
1577    hash: B256,
1578) -> eyre::Result<ordered::Set<PublicKey>> {
1579    let next_players =
1580        read_active_and_known_peers_at_block_hash(node, &ordered::Set::default(), hash)
1581            .wrap_err("failed reading peers from  validator config v2")?
1582            .into_keys();
1583
1584    debug!(?next_players, "determined next players");
1585    Ok(next_players)
1586}
1587
1588/// Reads the `nextFullDkgCeremony` epoch value from one of the validator config contracts.
1589///
1590/// This is used to determine if the next DKG ceremony should be a full ceremony
1591/// (new polynomial) instead of a reshare.
1592///
1593/// This function should only be used when constructing or verifying a proposal.
1594/// `digest` should therefore always refer to the parent parent of the proposal.
1595///
1596/// If the execution layer does not have a block corresponding to `digest`
1597/// available then it cannot propose or verify a block.
1598#[instrument(
1599    skip_all,
1600    fields(
1601        %digest,
1602    ),
1603    err(level = Level::WARN)
1604    ret,
1605)]
1606pub(crate) fn read_re_dkg_epoch(node: &TempoFullNode, digest: Digest) -> eyre::Result<u64> {
1607    read_validator_config_at_block_hash(node, digest.0, |config: &ValidatorConfigV2| {
1608        config
1609            .get_next_network_identity_rotation_epoch()
1610            .map_err(eyre::Report::new)
1611    })
1612    .map(|(_, _, epoch)| epoch)
1613}