// tempo_commonware_node/dkg/manager/actor/state.rs

use std::{
    collections::{BTreeMap, HashMap},
    num::{NonZeroU16, NonZeroU32, NonZeroUsize},
};

use alloy_consensus::BlockHeader as _;
use commonware_codec::{EncodeSize, RangeCfg, Read, ReadExt, Write};
use commonware_consensus::{
    Block as _, Heightable as _,
    types::{Epoch, Height},
};
use commonware_cryptography::{
    Signer as _,
    bls12381::{
        dkg::{self, DealerPrivMsg, DealerPubMsg, Info, Output, PlayerAck, SignedDealerLog},
        primitives::{
            group::Share,
            sharing::{Mode, ModeVersion},
            variant::MinSig,
        },
    },
    ed25519::{PrivateKey, PublicKey},
    transcript::{Summary, Transcript},
};
use commonware_parallel::Strategy;
use commonware_runtime::{BufferPooler, Clock, Metrics, buffer::paged::CacheRef};
use commonware_storage::{journal::segmented, metadata};
use commonware_utils::{N3f1, NZU16, NZU32, NZUsize, ordered};
use eyre::{OptionExt, WrapErr as _, bail, eyre};
use futures::{FutureExt as _, StreamExt as _, future::BoxFuture};
use tracing::{debug, info, instrument, warn};

use crate::consensus::{Digest, block::Block};

const PAGE_SIZE: NonZeroU16 = NZU16!(1 << 12);
const POOL_CAPACITY: NonZeroUsize = NZUsize!(1 << 13);
const WRITE_BUFFER: NonZeroUsize = NZUsize!(1 << 12);
const READ_BUFFER: NonZeroUsize = NZUsize!(1 << 20);

/// The maximum number of validators ever permitted in the DKG ceremony.
///
/// u16::MAX allows 2^16-1 validators, i.e. 65535, which is almost certainly
/// more than we will ever need. An alternative would be u8::MAX, but that
/// feels too limited, and using u16::MAX instead costs us next to nothing.
const MAXIMUM_VALIDATORS: NonZeroU32 = NZU32!(u16::MAX as u32);

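/// Returns a [`Builder`] for configuring and initializing [`Storage`].
///
/// A minimal usage sketch; the `context` handle and `genesis_state()` future
/// are hypothetical placeholders:
///
/// ```ignore
/// let storage = builder()
///     .partition_prefix("dkg_manager")
///     .initial_state(async { genesis_state().await })
///     .init(context)
///     .await?;
/// ```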
pub(super) fn builder() -> Builder {
    Builder::default()
}

pub(super) struct Storage<TContext>
where
    TContext: commonware_runtime::Storage + Clock + Metrics,
{
    states: metadata::Metadata<TContext, u64, State>,
    events: segmented::variable::Journal<TContext, Event>,

    current: State,
    cache: BTreeMap<Epoch, Events>,
}

impl<TContext> Storage<TContext>
where
    TContext: commonware_runtime::Storage + Clock + Metrics,
{
    /// Returns all player acknowledgments received during the given epoch.
    fn acks_for_epoch(
        &self,
        epoch: Epoch,
    ) -> impl Iterator<Item = (&PublicKey, &PlayerAck<PublicKey>)> {
        self.cache
            .get(&epoch)
            .into_iter()
            .flat_map(|cache| cache.acks.iter())
    }

    /// Returns all dealings received during the given epoch.
    fn dealings_for_epoch(
        &self,
        epoch: Epoch,
    ) -> impl Iterator<Item = (&PublicKey, &(DealerPubMsg<MinSig>, DealerPrivMsg))> {
        self.cache
            .get(&epoch)
            .into_iter()
            .flat_map(|cache| cache.dealings.iter())
    }

    /// Returns all dealer logs received during the given epoch.
    pub(super) fn logs_for_epoch(
        &self,
        epoch: Epoch,
    ) -> impl Iterator<Item = (&PublicKey, &dkg::DealerLog<MinSig, PublicKey>)> {
        self.cache
            .get(&epoch)
            .into_iter()
            .flat_map(|cache| cache.logs.iter())
    }

    /// Returns a clone of the state for the current epoch.
    pub(super) fn current(&self) -> State {
        self.current.clone()
    }

    /// Persists the outcome of a DKG ceremony and makes it the current state.
    pub(super) async fn set_state(&mut self, state: State) -> eyre::Result<()> {
        if let Some(old) = self.states.put(state.epoch.get(), state.clone()) {
            warn!(epoch = %old.epoch, "overwriting existing state");
        }
        self.states.sync().await.wrap_err("failed writing state")?;
        self.current = state;
        Ok(())
    }

    /// Append a player ACK to the journal.
    #[instrument(
        skip_all,
        fields(
            %epoch,
            %player,
        ),
        err,
    )]
    async fn append_ack(
        &mut self,
        epoch: Epoch,
        player: PublicKey,
        ack: PlayerAck<PublicKey>,
    ) -> eyre::Result<()> {
        if self
            .cache
            .get(&epoch)
            .is_some_and(|events| events.acks.contains_key(&player))
        {
            info!(%player, %epoch, "ack for player already found in cache, dropping");
            return Ok(());
        }

        let section = epoch.get();
        self.events
            .append(
                section,
                &Event::Ack {
                    player: player.clone(),
                    ack: ack.clone(),
                },
            )
            .await
            .wrap_err("unable to write event to storage")?;

        self.events
            .sync(section)
            .await
            .wrap_err("unable to sync events journal")?;

        self.cache
            .entry(epoch)
            .or_default()
            .acks
            .insert(player, ack);

        Ok(())
    }

    /// Append a dealer's dealing to the journal.
    #[instrument(
        skip_all,
        fields(
            %epoch,
            %dealer,
        ),
        err,
    )]
    async fn append_dealing(
        &mut self,
        epoch: Epoch,
        dealer: PublicKey,
        pub_msg: DealerPubMsg<MinSig>,
        priv_msg: DealerPrivMsg,
    ) -> eyre::Result<()> {
        if self
            .cache
            .get(&epoch)
            .is_some_and(|events| events.dealings.contains_key(&dealer))
        {
            info!(%dealer, %epoch, "dealing from dealer already found in cache, dropping");
            return Ok(());
        }

        let section = epoch.get();
        self.events
            .append(
                section,
                &Event::Dealing {
                    dealer: dealer.clone(),
                    public_msg: pub_msg.clone(),
                    private_msg: priv_msg.clone(),
                },
            )
            .await
            .wrap_err("unable to write event to storage")?;

        self.events
            .sync(section)
            .await
            .wrap_err("unable to sync events journal")?;

        self.cache
            .entry(epoch)
            .or_default()
            .dealings
            .insert(dealer, (pub_msg, priv_msg));

        Ok(())
    }

    /// Appends a dealer log to the journal.
    pub(super) async fn append_dealer_log(
        &mut self,
        epoch: Epoch,
        dealer: PublicKey,
        log: dkg::DealerLog<MinSig, PublicKey>,
    ) -> eyre::Result<()> {
        if self
            .cache
            .get(&epoch)
            .is_some_and(|events| events.logs.contains_key(&dealer))
        {
            info!(
                %dealer,
                %epoch,
                "dealer log already found in cache; dropping"
            );
            return Ok(());
        }

        let section = epoch.get();
        self.events
            .append(
                section,
                &Event::Log {
                    dealer: dealer.clone(),
                    log: log.clone(),
                },
            )
            .await
            .wrap_err("failed to append log to journal")?;
        self.events
            .sync(section)
            .await
            .wrap_err("unable to sync journal")?;

        let cache = self.cache.entry(epoch).or_default();
        cache.logs.insert(dealer, log);
        Ok(())
    }

    /// Appends the height, digest, and parent of the finalized block to the journal.
    pub(super) async fn append_finalized_block(
        &mut self,
        epoch: Epoch,
        block: Block,
    ) -> eyre::Result<()> {
        let height = block.height();
        let digest = block.digest();
        let parent = block.parent();
        if self
            .cache
            .get(&epoch)
            .is_some_and(|events| events.finalized.contains_key(&height))
        {
            info!(
                %height,
                %digest,
                %parent,
                "finalized block was already found in cache; dropping",
            );
            return Ok(());
        }

        let section = epoch.get();
        self.events
            .append(
                section,
                &Event::Finalized {
                    digest,
                    parent,
                    height,
                },
            )
            .await
            .wrap_err("failed to append finalized block to journal")?;
        self.events
            .sync(section)
            .await
            .wrap_err("unable to sync journal")?;

        let cache = self.cache.entry(epoch).or_default();
        cache.finalized.insert(
            height,
            FinalizedBlockInfo {
                height: block.height(),
                digest: block.digest(),
                parent: block.parent(),
            },
        );
        Ok(())
    }

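    /// Caches the DKG outcome (output and share) computed for the block with
    /// the given digest. In-memory only; not persisted to the journal.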
    pub(super) fn cache_dkg_outcome(
        &mut self,
        epoch: Epoch,
        digest: Digest,
        output: Output<MinSig, PublicKey>,
        share: ShareState,
    ) {
        self.cache
            .entry(epoch)
            .or_default()
            .dkg_outcomes
            .insert(digest, (output, share));
    }

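    /// Returns the cached DKG outcome for the given epoch and block digest, if any.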
    pub(super) fn get_dkg_outcome(
        &self,
        epoch: &Epoch,
        digest: &Digest,
    ) -> Option<&(Output<MinSig, PublicKey>, ShareState)> {
        self.cache
            .get(epoch)
            .and_then(|events| events.dkg_outcomes.get(digest))
    }

    /// Caches a reduced form of the notarized block in memory.
    ///
    /// Notably, this does not persist the dealer logs to disk! On restart, it
    /// is expected that the actor reads the dealer logs from the marshal actor
    /// and forwards them one-by-one to the state cache.
    pub(super) fn cache_notarized_block(&mut self, round: &Round, block: Block) {
        let cache = self.cache.entry(round.epoch).or_default();
        let log = ReducedBlock::from_block_for_round(&block, round);
        cache.notarized_blocks.insert(log.digest, log);
    }

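    /// Creates a dealer for the given round, replaying any stored player acks.
    ///
    /// Returns `None` if this node is not a dealer in the round, or if a
    /// reshare is expected but our share has been lost.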
    #[instrument(
        skip_all,
        fields(
            me = %me.public_key(),
            epoch = %round.epoch,
        ),
        err,
    )]
    pub(super) fn create_dealer_for_round(
        &mut self,
        me: PrivateKey,
        round: Round,
        share: ShareState,
        seed: Summary,
    ) -> eyre::Result<Option<Dealer>> {
        if round.dealers.position(&me.public_key()).is_none() {
            return Ok(None);
        }

        let share = if round.is_full_dkg() {
            info!("running full DKG ceremony as dealer (new polynomial)");
            None
        } else {
            let inner = share.into_inner();
            if inner.is_none() {
                warn!(
                    "we are a dealer in this round, but we do not have a share, \
                    which means we likely lost it; will not instantiate a dealer \
                    instance and hope to get a new share in the next round if we \
                    are a player"
                );
                return Ok(None);
            }
            inner
        };

        let (mut dealer, pub_msg, priv_msgs) = dkg::Dealer::start::<N3f1>(
            Transcript::resume(seed).noise(b"dealer-rng"),
            round.info.clone(),
            me.clone(),
            share,
        )
        .wrap_err("unable to start cryptographic dealer instance")?;

        // Replay stored acks
        let mut unsent: BTreeMap<PublicKey, DealerPrivMsg> = priv_msgs.into_iter().collect();
        for (player, ack) in self.acks_for_epoch(round.epoch) {
            if unsent.contains_key(player)
                && dealer
                    .receive_player_ack(player.clone(), ack.clone())
                    .is_ok()
            {
                unsent.remove(player);
                debug!(%player, "replayed player ack");
            }
        }

        Ok(Some(Dealer::new(Some(dealer), pub_msg, unsent)))
    }

    /// Create a Player for the given epoch, replaying any stored dealer messages.
    #[instrument(
        skip_all,
        fields(
            epoch = %round.epoch,
            me = %me.public_key(),
        ),
        err,
    )]
    pub(super) fn create_player_for_round(
        &self,
        me: PrivateKey,
        round: &Round,
    ) -> eyre::Result<Option<Player>> {
        if round.players.position(&me.public_key()).is_none() {
            return Ok(None);
        }

        let mut player = Player::new(
            dkg::Player::new(round.info.clone(), me)
                .wrap_err("unable to start cryptographic player instance")?,
        );

        // Replay persisted dealer messages
        for (dealer, (pub_msg, priv_msg)) in self.dealings_for_epoch(round.epoch()) {
            player.replay(dealer.clone(), pub_msg.clone(), priv_msg.clone());
            debug!(%dealer, "replayed committed dealer message");
        }

        Ok(Some(player))
    }

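    /// Returns the highest finalized block recorded for the given epoch, if any.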
    pub(super) fn get_latest_finalized_block_for_epoch(
        &self,
        epoch: &Epoch,
    ) -> Option<(&Height, &FinalizedBlockInfo)> {
        self.cache
            .get(epoch)
            .and_then(|cache| cache.finalized.last_key_value())
    }

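    /// Returns the cached reduced form of the notarized block with the given
    /// digest, if any.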
    pub(super) fn get_notarized_reduced_block(
        &mut self,
        epoch: &Epoch,
        digest: &Digest,
    ) -> Option<&ReducedBlock> {
        self.cache
            .get(epoch)
            .and_then(|cache| cache.notarized_blocks.get(digest))
    }

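    /// Prunes persisted events, stored states, and in-memory caches for all
    /// epochs strictly below `up_to_epoch`.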
    #[instrument(skip_all, fields(%up_to_epoch), err)]
    pub(super) async fn prune(&mut self, up_to_epoch: Epoch) -> eyre::Result<()> {
        self.events
            .prune(up_to_epoch.get())
            .await
            .wrap_err("unable to prune events journal")?;
        self.states.retain(|&key, _| key >= up_to_epoch.get());
        self.states
            .sync()
            .await
            .wrap_err("unable to prune states metadata")?;
        self.cache.retain(|&epoch, _| epoch >= up_to_epoch);
        Ok(())
    }
}

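/// Configures and initializes the DKG actor's [`Storage`].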
#[derive(Default)]
pub(super) struct Builder {
    initial_state: Option<BoxFuture<'static, eyre::Result<State>>>,
    partition_prefix: Option<String>,
}

impl Builder {
    pub(super) fn initial_state(
        self,
        initial_state: impl Future<Output = eyre::Result<State>> + Send + 'static,
    ) -> Self {
        Self {
            initial_state: Some(initial_state.boxed()),
            ..self
        }
    }

    pub(super) fn partition_prefix(self, partition_prefix: &str) -> Self {
        Self {
            partition_prefix: Some(partition_prefix.to_string()),
            ..self
        }
    }

    #[instrument(skip_all, err)]
    pub(super) async fn init<TContext>(self, context: TContext) -> eyre::Result<Storage<TContext>>
    where
        TContext: BufferPooler + commonware_runtime::Storage + Clock + Metrics,
    {
        let Self {
            initial_state,
            partition_prefix,
        } = self;
        let partition_prefix =
            partition_prefix.ok_or_eyre("DKG actor's state must have its partition prefix set")?;

        let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, POOL_CAPACITY);

        let states_metadata_partition = format!("{partition_prefix}_states_metadata");

        let mut states = metadata::Metadata::init(
            context.with_label("states"),
            metadata::Config {
                partition: states_metadata_partition,
                codec_config: MAXIMUM_VALIDATORS,
            },
        )
        .await
        .wrap_err("unable to initialize DKG states metadata")?;

        if states.keys().max().is_none() {
            let initial_state = match initial_state {
                None => {
                    return Err(eyre!(
                        "states metadata was empty and initializer was not set"
                    ));
                }
                Some(initial_state) => initial_state
                    .await
                    .wrap_err("failed constructing initial state to populate storage")?,
            };
            states
                .put_sync(initial_state.epoch.get(), initial_state)
                .await
                .wrap_err("unable to write initial state to metadata")?;
        }

        let current = states
            .keys()
            .max()
            .map(|epoch| {
                states
                    .get(epoch)
                    .expect("state at keys iterator must exist")
                    .clone()
            })
            .expect("states storage must contain a state after initialization");

        let events = segmented::variable::Journal::init(
            context.with_label("events"),
            segmented::variable::Config {
                partition: format!("{partition_prefix}_events"),
                compression: None,
                codec_config: MAXIMUM_VALIDATORS,
                page_cache,
                write_buffer: WRITE_BUFFER,
            },
        )
        .await
        .wrap_err("unable to initialize events journal")?;

        // Replay msgs to populate epoch caches
        let mut cache = BTreeMap::<Epoch, Events>::new();
        {
            let replay = events
                .replay(0, 0, READ_BUFFER)
                .await
                .wrap_err("unable to start a replay stream to populate events cache")?;
            futures::pin_mut!(replay);

            while let Some(result) = replay.next().await {
                let (section, _, _, event) =
                    result.wrap_err("unable to read entry in replay stream")?;
                let epoch = Epoch::new(section);
                let events = cache.entry(epoch).or_default();
                events.insert(event);
            }
        }

        Ok(Storage {
            states,
            events,
            current,
            cache,
        })
    }
}

/// Wrapper around a DKG share that tracks how it is stored at rest.
///
/// The `Option<Share>` is inside the enum so that a future encrypted variant
/// can hide whether a share is present at all.
///
/// Currently only plaintext storage is supported, but additional variants
/// (e.g. encrypted-at-rest) can be added in the future.
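///
/// A codec round-trip sketch (the leading byte is the variant tag, `0` for
/// plaintext), mirroring the `Write`/`Read` impls below:
///
/// ```ignore
/// let state = ShareState::Plaintext(None);
/// let bytes = state.encode();
/// assert_eq!(bytes[0], 0); // variant tag
/// assert_eq!(ShareState::read_cfg(&mut bytes.as_ref(), &()).unwrap(), state);
/// ```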
#[derive(Clone, Debug, PartialEq, Eq)]
pub(super) enum ShareState {
    Plaintext(Option<Share>),
}

impl ShareState {
    pub(super) fn into_inner(self) -> Option<Share> {
        match self {
            Self::Plaintext(share) => share,
        }
    }
}

impl EncodeSize for ShareState {
    fn encode_size(&self) -> usize {
        match self {
            Self::Plaintext(share) => 1 + share.encode_size(),
        }
    }
}

impl Write for ShareState {
    fn write(&self, buf: &mut impl bytes::BufMut) {
        match self {
            Self::Plaintext(share) => {
                0u8.write(buf);
                share.write(buf);
            }
        }
    }
}

impl Read for ShareState {
    type Cfg = ();

    fn read_cfg(
        buf: &mut impl bytes::Buf,
        _cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        let tag = u8::read(buf)?;
        match tag {
            0 => Ok(Self::Plaintext(ReadExt::read(buf)?)),
            other => Err(commonware_codec::Error::InvalidEnum(other)),
        }
    }
}

/// The outcome of a DKG ceremony.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(super) struct State {
    pub(super) epoch: Epoch,
    pub(super) seed: Summary,
    pub(super) output: Output<MinSig, PublicKey>,
    pub(super) share: ShareState,
    pub(super) players: ordered::Set<PublicKey>,
    pub(super) is_full_dkg: bool,
}

impl State {
    /// Returns the dealers active in the DKG round tracked by this state.
    pub(super) fn dealers(&self) -> &ordered::Set<PublicKey> {
        self.output.players()
    }

    /// Returns the players active in the DKG round tracked by this state.
    pub(super) fn players(&self) -> &ordered::Set<PublicKey> {
        &self.players
    }

    /// Placeholder for the legacy `syncers` field.
    fn legacy_syncers(&self) -> ordered::Set<PublicKey> {
        ordered::Set::default()
    }
}

impl EncodeSize for State {
    fn encode_size(&self) -> usize {
        self.epoch.encode_size()
            + self.seed.encode_size()
            + self.output.encode_size()
            + self.share.encode_size()
            + self.players.encode_size()
            // Until the next state migration, the unused syncers field must
            // still be written to remain backwards compatible.
            + self.legacy_syncers().encode_size()
            + self.is_full_dkg.encode_size()
    }
}

impl Write for State {
    fn write(&self, buf: &mut impl bytes::BufMut) {
        self.epoch.write(buf);
        self.seed.write(buf);
        self.output.write(buf);
        self.share.write(buf);
        self.players.write(buf);
        // Until the next state migration, the unused syncers field must
        // still be written to remain backwards compatible.
        self.legacy_syncers().write(buf);
        self.is_full_dkg.write(buf);
    }
}

impl Read for State {
    type Cfg = NonZeroU32;

    fn read_cfg(
        buf: &mut impl bytes::Buf,
        cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        let epoch = ReadExt::read(buf)?;
        let seed = ReadExt::read(buf)?;
        let output = Read::read_cfg(buf, &(*cfg, ModeVersion::v0()))?;
        let share = ReadExt::read(buf)?;
        let players = Read::read_cfg(buf, &(RangeCfg::from(1..=(u16::MAX as usize)), ()))?;

        // Until the next state migration, the unused syncers field must
        // still be read to remain backwards compatible.
        ordered::Set::<PublicKey>::read_cfg(buf, &(RangeCfg::from(0..=(u16::MAX as usize)), ()))?;

        let is_full_dkg = ReadExt::read(buf)?;

        Ok(Self {
            epoch,
            seed,
            output,
            share,
            players,
            is_full_dkg,
        })
    }
}

#[expect(
    dead_code,
    reason = "tracking this data is virtually free and might become useful later"
)]
#[derive(Clone, Debug)]
pub(super) struct FinalizedBlockInfo {
    pub(super) height: Height,
    pub(super) digest: Digest,
    pub(super) parent: Digest,
}

/// A cache of all events that transpired during a given epoch.
#[derive(Debug, Default)]
struct Events {
    acks: BTreeMap<PublicKey, PlayerAck<PublicKey>>,
    dealings: BTreeMap<PublicKey, (DealerPubMsg<MinSig>, DealerPrivMsg)>,
    logs: BTreeMap<PublicKey, dkg::DealerLog<MinSig, PublicKey>>,
    finalized: BTreeMap<Height, FinalizedBlockInfo>,

    notarized_blocks: HashMap<Digest, ReducedBlock>,
    dkg_outcomes: HashMap<Digest, (Output<MinSig, PublicKey>, ShareState)>,
}

impl Events {
    fn insert(&mut self, event: Event) {
        match event {
            Event::Dealing {
                dealer: public_key,
                public_msg,
                private_msg,
            } => {
                self.dealings.insert(public_key, (public_msg, private_msg));
            }
            Event::Ack {
                player: public_key,
                ack,
            } => {
                self.acks.insert(public_key, ack);
            }
            Event::Log { dealer, log } => {
                self.logs.insert(dealer, log);
            }
            Event::Finalized {
                digest,
                parent,
                height,
            } => {
                self.finalized.insert(
                    height,
                    FinalizedBlockInfo {
                        height,
                        digest,
                        parent,
                    },
                );
            }
        }
    }
}

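/// A single event persisted to the events journal.
///
/// Encoded with a one-byte variant tag (`0` through `3`), mirroring the
/// `Write` and `Read` impls below. A framing sketch for the `Ack` variant
/// (`player` and `ack` are hypothetical values):
///
/// ```ignore
/// let event = Event::Ack { player, ack };
/// let bytes = event.encode();
/// assert_eq!(bytes[0], 1); // `Ack` is tagged with `1`
/// ```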
enum Event {
    /// A message received from a dealer (as a player).
    Dealing {
        dealer: PublicKey,
        public_msg: DealerPubMsg<MinSig>,
        private_msg: DealerPrivMsg,
    },
    /// An ack (of a dealing) received from a player (as a dealer).
    Ack {
        player: PublicKey,
        ack: PlayerAck<PublicKey>,
    },
    /// A dealer log read from a finalized block.
    Log {
        dealer: PublicKey,
        log: dkg::DealerLog<MinSig, PublicKey>,
    },
    /// Information about a finalized block observed by the actor.
    Finalized {
        digest: Digest,
        parent: Digest,
        height: Height,
    },
}

impl EncodeSize for Event {
    fn encode_size(&self) -> usize {
        1 + match self {
            Self::Dealing {
                dealer: public_key,
                public_msg,
                private_msg,
            } => public_key.encode_size() + public_msg.encode_size() + private_msg.encode_size(),
            Self::Ack {
                player: public_key,
                ack,
            } => public_key.encode_size() + ack.encode_size(),
            Self::Log { dealer, log } => dealer.encode_size() + log.encode_size(),
            Self::Finalized {
                digest,
                parent,
                height,
            } => digest.encode_size() + parent.encode_size() + height.encode_size(),
        }
    }
}

impl Write for Event {
    fn write(&self, buf: &mut impl bytes::BufMut) {
        match self {
            Self::Dealing {
                dealer: public_key,
                public_msg,
                private_msg,
            } => {
                0u8.write(buf);
                public_key.write(buf);
                public_msg.write(buf);
                private_msg.write(buf);
            }
            Self::Ack {
                player: public_key,
                ack,
            } => {
                1u8.write(buf);
                public_key.write(buf);
                ack.write(buf);
            }
            Self::Log { dealer, log } => {
                2u8.write(buf);
                dealer.write(buf);
                log.write(buf);
            }
            Self::Finalized {
                digest,
                parent,
                height,
            } => {
                3u8.write(buf);
                digest.write(buf);
                parent.write(buf);
                height.write(buf);
            }
        }
    }
}

impl Read for Event {
    type Cfg = NonZeroU32;

    fn read_cfg(
        buf: &mut impl bytes::Buf,
        cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        let tag = u8::read(buf)?;
        match tag {
            0 => Ok(Self::Dealing {
                dealer: ReadExt::read(buf)?,
                public_msg: Read::read_cfg(buf, cfg)?,
                private_msg: ReadExt::read(buf)?,
            }),
            1 => Ok(Self::Ack {
                player: ReadExt::read(buf)?,
                ack: ReadExt::read(buf)?,
            }),
            2 => Ok(Self::Log {
                dealer: ReadExt::read(buf)?,
                log: Read::read_cfg(buf, &NZU32!(u16::MAX as u32))?,
            }),
            3 => Ok(Self::Finalized {
                digest: ReadExt::read(buf)?,
                parent: ReadExt::read(buf)?,
                height: ReadExt::read(buf)?,
            }),
            other => Err(commonware_codec::Error::InvalidEnum(other)),
        }
    }
}

/// Internal state for a dealer in the current round.
pub(super) struct Dealer {
    /// The inner cryptographic dealer state. Set to `None` once the dealer
    /// log has been finalized, so that it is not finalized again.
    dealer: Option<dkg::Dealer<MinSig, PrivateKey>>,

    /// The message containing the commitment generated by this dealer, which
    /// is shared with all players and posted on chain.
    pub_msg: DealerPubMsg<MinSig>,

    /// A map of players that we have not yet successfully sent their private
    /// messages to (containing their share generated by this dealer).
    unsent: BTreeMap<PublicKey, DealerPrivMsg>,

    /// The finalized, signed log of this dealer. Initially `None` and set
    /// after the midpoint of the epoch. Set back to `None` once this node
    /// observes its dealer log on chain, so that it is not posted again.
    finalized: Option<SignedDealerLog<MinSig, PrivateKey>>,
}

impl Dealer {
    pub(super) const fn new(
        dealer: Option<dkg::Dealer<MinSig, PrivateKey>>,
        pub_msg: DealerPubMsg<MinSig>,
        unsent: BTreeMap<PublicKey, DealerPrivMsg>,
    ) -> Self {
        Self {
            dealer,
            pub_msg,
            unsent,
            finalized: None,
        }
    }

    /// Handle an incoming ack from a player.
    ///
    /// If the ack is valid and new, persists it to storage. Errors if the ack
    /// is unexpected or invalid, or if persisting it fails.
    pub(super) async fn receive_ack<TContext>(
        &mut self,
        storage: &mut Storage<TContext>,
        epoch: Epoch,
        player: PublicKey,
        ack: PlayerAck<PublicKey>,
    ) -> eyre::Result<()>
    where
        TContext: commonware_runtime::Storage + Clock + Metrics,
    {
        if !self.unsent.contains_key(&player) {
            bail!("already received an ack from `{player}`");
        }
        match &mut self.dealer {
            Some(dealer) => {
                dealer
                    .receive_player_ack(player.clone(), ack.clone())
                    .wrap_err("unable to receive player ack")?;
                self.unsent.remove(&player);
                storage
                    .append_ack(epoch, player.clone(), ack.clone())
                    .await
                    .wrap_err("unable to append ack to journal")?;
            }
            None => bail!("dealer was already finalized, dropping ack of player `{player}`"),
        }
        Ok(())
    }

    /// Finalize the dealer and produce a signed log for inclusion in a block.
    pub(super) fn finalize(&mut self) {
        if self.finalized.is_some() {
            return;
        }

        // Even after the finalized log is taken, we won't attempt to finalize
        // again because the dealer will be None.
        if let Some(dealer) = self.dealer.take() {
            let log = dealer.finalize::<N3f1>();
            self.finalized = Some(log);
        }
    }

    /// Returns a clone of the finalized log if it exists.
    pub(super) fn finalized(&self) -> Option<SignedDealerLog<MinSig, PrivateKey>> {
        self.finalized.clone()
    }

    /// Takes and returns the finalized log, leaving `None` in its place.
    pub(super) const fn take_finalized(&mut self) -> Option<SignedDealerLog<MinSig, PrivateKey>> {
        self.finalized.take()
    }

    /// Returns shares to distribute to players.
    ///
    /// Returns an iterator of `(player, pub_msg, priv_msg)` tuples for each
    /// player that hasn't yet acknowledged their share.
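    ///
    /// A distribution-loop sketch (`send_to` is a hypothetical transport
    /// helper, not part of this module):
    ///
    /// ```ignore
    /// for (player, pub_msg, priv_msg) in dealer.shares_to_distribute() {
    ///     send_to(&player, pub_msg, priv_msg).await?;
    /// }
    /// ```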
    pub(super) fn shares_to_distribute(
        &self,
    ) -> impl Iterator<Item = (PublicKey, DealerPubMsg<MinSig>, DealerPrivMsg)> + '_ {
        self.unsent
            .iter()
            .map(|(player, priv_msg)| (player.clone(), self.pub_msg.clone(), priv_msg.clone()))
    }
}

#[derive(Clone, Debug)]
pub(super) struct Round {
    epoch: Epoch,
    info: dkg::Info<MinSig, PublicKey>,
    dealers: ordered::Set<PublicKey>,
    players: ordered::Set<PublicKey>,
    is_full_dkg: bool,
}

impl Round {
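    /// Builds the round description for the epoch tracked by `state`.
    ///
    /// For a full DKG the previous output is withheld so that a fresh
    /// polynomial is generated; otherwise the previous output seeds a reshare.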
    pub(super) fn from_state(state: &State, namespace: &[u8]) -> Self {
        // For a full DKG, don't pass the previous output; this creates a new polynomial.
        let previous_output = if state.is_full_dkg {
            None
        } else {
            Some(state.output.clone())
        };

        let dealers = state.dealers().clone();
        let players = state.players().clone();

        Self {
            epoch: state.epoch,
            info: Info::new::<N3f1>(
                namespace,
                state.epoch.get(),
                previous_output,
                Mode::NonZeroCounter,
                dealers.clone(),
                players.clone(),
            )
            .expect("a DKG round must always be initializable given some epoch state"),
            dealers,
            players,
            is_full_dkg: state.is_full_dkg,
        }
    }

    pub(super) fn info(&self) -> &dkg::Info<MinSig, PublicKey> {
        &self.info
    }

    pub(super) fn epoch(&self) -> Epoch {
        self.epoch
    }

    pub(super) fn dealers(&self) -> &ordered::Set<PublicKey> {
        &self.dealers
    }

    pub(super) fn players(&self) -> &ordered::Set<PublicKey> {
        &self.players
    }

    pub(super) fn is_full_dkg(&self) -> bool {
        self.is_full_dkg
    }
}

/// Internal state for a player in the current round.
pub(super) struct Player {
    player: dkg::Player<MinSig, PrivateKey>,
    /// Acks we've generated, keyed by dealer. Once we generate an ack for a dealer,
    /// we will not generate a different one (to avoid conflicting votes).
    acks: BTreeMap<PublicKey, PlayerAck<PublicKey>>,
}

impl Player {
    pub(super) const fn new(player: dkg::Player<MinSig, PrivateKey>) -> Self {
        Self {
            player,
            acks: BTreeMap::new(),
        }
    }

    /// Handle an incoming dealer message.
    ///
    /// If this is a new valid dealer message, persists it to storage before returning.
    pub(super) async fn receive_dealing<TContext>(
        &mut self,
        storage: &mut Storage<TContext>,
        epoch: Epoch,
        dealer: PublicKey,
        pub_msg: DealerPubMsg<MinSig>,
        priv_msg: DealerPrivMsg,
    ) -> eyre::Result<PlayerAck<PublicKey>>
    where
        TContext: commonware_runtime::Storage + Clock + Metrics,
    {
        // If we've already generated an ack, return the cached version
        if let Some(ack) = self.acks.get(&dealer) {
            return Ok(ack.clone());
        }

        // Otherwise generate a new ack
        let ack = self
            .player
            .dealer_message::<N3f1>(dealer.clone(), pub_msg.clone(), priv_msg.clone())
            // FIXME(janis): it would be great to know why exactly that is not the case.
            .ok_or_eyre(
                "applying dealer message to player instance did not result in a usable ack",
            )?;
        storage
            .append_dealing(epoch, dealer.clone(), pub_msg, priv_msg)
            .await
            .wrap_err("unable to append dealing to journal")?;
        self.acks.insert(dealer, ack.clone());
        Ok(ack)
    }

    /// Replay an already-persisted dealer message (updates in-memory state only).
    fn replay(
        &mut self,
        dealer: PublicKey,
        pub_msg: DealerPubMsg<MinSig>,
        priv_msg: DealerPrivMsg,
    ) {
        if self.acks.contains_key(&dealer) {
            return;
        }
        if let Some(ack) = self
            .player
            .dealer_message::<N3f1>(dealer.clone(), pub_msg, priv_msg)
        {
            self.acks.insert(dealer, ack);
        }
    }

    /// Finalize the player's participation in the DKG round.
    pub(super) fn finalize(
        self,
        rng: &mut impl rand_core::CryptoRngCore,
        logs: dkg::Logs<MinSig, PublicKey, N3f1>,
        strategy: &impl Strategy,
    ) -> Result<(Output<MinSig, PublicKey>, Share), dkg::Error> {
        self.player
            .finalize::<N3f1, commonware_cryptography::ed25519::Batch>(rng, logs, strategy)
    }
}

/// Contains a block's height, parent, digest, and dealer log, if there was one.
#[derive(Clone, Debug)]
pub(super) struct ReducedBlock {
    /// The block height.
    pub(super) height: Height,

    /// The block parent.
    pub(super) parent: Digest,

    /// The block digest (hash).
    pub(super) digest: Digest,

    /// The (dealer, log) tuple, if the block contained a signed dealer log.
    pub(super) log: Option<(PublicKey, dkg::DealerLog<MinSig, PublicKey>)>,
}

impl ReducedBlock {
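    /// Reduces `block` to its height, parent, and digest, extracting and
    /// checking any signed dealer log carried in the header's `extraData`.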
    pub(super) fn from_block_for_round(block: &Block, round: &Round) -> Self {
        let log = if block.header().extra_data().is_empty() {
            None
        } else {
            dkg::SignedDealerLog::<MinSig, PrivateKey>::read_cfg(
                &mut block.header().extra_data().as_ref(),
                &NZU32!(round.players.len() as u32),
            )
            .inspect(|_| {
                info!(
                    height = %block.height(),
                    digest = %block.digest(),
                    "found dealer log in block"
                )
            })
            .inspect_err(|error| {
                warn!(
                    %error,
                    "block header extraData had data, but it could not be read as \
                    a signed dealer log",
                )
            })
            .ok()
            .and_then(|log| match log.check(&round.info) {
                Some((dealer, log)) => Some((dealer, log)),
                None => {
                    // TODO(janis): some more fidelity here would be nice.
                    warn!("log failed check against current round");
                    None
                }
            })
        };
        Self {
            height: block.height(),
            parent: block.parent(),
            digest: block.digest(),
            log,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_codec::Encode as _;
    use commonware_cryptography::{
        bls12381::{dkg, primitives::sharing::Mode},
        ed25519::PrivateKey,
        transcript::Summary,
    };
    use commonware_math::algebra::Random as _;
    use commonware_runtime::{Runner as _, deterministic};
    use commonware_utils::TryFromIterator as _;

    fn make_test_state(rng: &mut impl rand_core::CryptoRngCore, epoch: u64) -> State {
        let mut keys: Vec<_> = (0..3)
            .map(|i| PrivateKey::from_seed(i + epoch * 100))
            .collect();

        keys.sort_by_key(|k| k.public_key());

        let pubkeys = ordered::Set::try_from_iter(keys.iter().map(|k| k.public_key())).unwrap();

        let (output, _shares) =
            dkg::deal::<_, _, N3f1>(&mut *rng, Mode::NonZeroCounter, pubkeys.clone()).unwrap();

        State {
            epoch: Epoch::new(epoch),
            seed: Summary::random(rng),
            output,
            share: ShareState::Plaintext(None),
            players: pubkeys,
            is_full_dkg: false,
        }
    }

    #[test]
    fn state_codec_round_trip() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let state = make_test_state(&mut context, 0);
            let mut bytes = state.encode();
            let decoded = State::read_cfg(&mut bytes, &NZU32!(u32::MAX)).unwrap();
            assert_eq!(state, decoded);
        });
    }

    #[test]
    fn state_codec_read_ignores_legacy_populated_syncers() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let state = make_test_state(&mut context, 0);

            // Serialize using the legacy layout: same field order as today, but
            // with a non-empty syncers set in place of `legacy_syncers()`.
            let legacy_syncers = state.players.clone();
            let mut bytes = Vec::new();
            state.epoch.write(&mut bytes);
            state.seed.write(&mut bytes);
            state.output.write(&mut bytes);
            state.share.write(&mut bytes);
            state.players.write(&mut bytes);

            // Legacy slot that is still written/read but ignored
            legacy_syncers.write(&mut bytes);

            state.is_full_dkg.write(&mut bytes);

            let decoded = State::read_cfg(&mut bytes.as_slice(), &NZU32!(u32::MAX)).unwrap();
            assert_eq!(state, decoded);
        });
    }

    #[track_caller]
    fn assert_roundtrip(original: &ShareState) {
        let encoded = original.encode();
        let decoded = ShareState::read_cfg(&mut encoded.as_ref(), &()).unwrap();
        assert_eq!(original, &decoded);
    }

    #[test]
    fn share_state_roundtrip_plaintext_none() {
        assert_roundtrip(&ShareState::Plaintext(None));
    }

    #[test]
    fn share_state_roundtrip_plaintext_some() {
        use rand_08::SeedableRng as _;
        let mut rng = rand_08::rngs::StdRng::seed_from_u64(42);

        let keys = std::iter::repeat_with(|| PrivateKey::random(&mut rng))
            .take(3)
            .collect::<Vec<_>>();
        let pubkeys = ordered::Set::try_from_iter(keys.iter().map(|k| k.public_key())).unwrap();

        let (_output, shares) =
            dkg::deal::<MinSig, _, N3f1>(&mut rng, Mode::NonZeroCounter, pubkeys).unwrap();

        let share = shares.into_iter().next().unwrap().1;
        assert_roundtrip(&ShareState::Plaintext(Some(share)));
    }
}