Skip to main content

tempo_commonware_node/dkg/manager/actor/
state.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    net::SocketAddr,
4    num::{NonZeroU16, NonZeroU32, NonZeroUsize},
5};
6
7use alloy_consensus::BlockHeader as _;
8use commonware_codec::{EncodeSize, RangeCfg, Read, ReadExt, Write};
9use commonware_consensus::{
10    Block as _, Heightable as _,
11    types::{Epoch, Height},
12};
13use commonware_cryptography::{
14    Signer as _,
15    bls12381::{
16        dkg::{self, DealerPrivMsg, DealerPubMsg, Info, Output, PlayerAck, SignedDealerLog},
17        primitives::{
18            group::Share,
19            sharing::{Mode, ModeVersion},
20            variant::MinSig,
21        },
22    },
23    ed25519::{PrivateKey, PublicKey},
24    transcript::{Summary, Transcript},
25};
26use commonware_parallel::Strategy;
27use commonware_runtime::{BufferPooler, Clock, Metrics, buffer::paged::CacheRef};
28use commonware_storage::{
29    journal::{contiguous, contiguous::Reader as _, segmented},
30    metadata,
31};
32use commonware_utils::{N3f1, NZU16, NZU32, NZU64, NZUsize, ordered};
33use eyre::{OptionExt, WrapErr as _, bail, eyre};
34use futures::{FutureExt as _, StreamExt as _, future::BoxFuture};
35use tracing::{debug, info, instrument, warn};
36
37use crate::consensus::{Digest, block::Block};
38
// Page size (1 << 12 = 4096) handed to `CacheRef::from_pooler` when the page
// cache is created in `Builder::init`.
39const PAGE_SIZE: NonZeroU16 = NZU16!(1 << 12);
// Capacity (1 << 13 = 8192) handed to `CacheRef::from_pooler` together with
// `PAGE_SIZE`.
40const POOL_CAPACITY: NonZeroUsize = NZUsize!(1 << 13);
// Write buffer size used in the journal configurations of this file.
41const WRITE_BUFFER: NonZeroUsize = NZUsize!(1 << 12);
// Buffer size passed to the events journal replay in `Builder::init`.
42const READ_BUFFER: NonZeroUsize = NZUsize!(1 << 20);
43
44/// The maximum number of validators ever permitted in the DKG ceremony.
45///
46/// u16::MAX is 2^16-1 validators, i.e. 65535, which is probably more than
47/// we will ever need. An alternative would be u8::MAX but that feels a bit
48/// too limited. There is extremely little cost doing u16::MAX instead.
49const MAXIMUM_VALIDATORS: NonZeroU32 = NZU32!(u16::MAX as u32);
50
51pub(super) fn builder() -> Builder {
52    Builder::default()
53}
54
/// Persistent DKG storage together with its in-memory mirrors.
55pub(super) struct Storage<TContext>
56where
57    TContext: commonware_runtime::Storage + Clock + Metrics,
58{
    // Ceremony states keyed by the raw epoch number (`state.epoch.get()`).
59    states: metadata::Metadata<TContext, u64, State>,
    // Journal of events; the raw epoch number is used as the section.
60    events: segmented::variable::Journal<TContext, Event>,
61
    // The state most recently stored through `set_state`, or the state with
    // the highest epoch found at initialization.
62    current: State,
    // Per-epoch cache of events; rebuilt from the events journal at
    // initialization and extended by the `append_*`/`cache_*` methods.
63    cache: BTreeMap<Epoch, Events>,
64}
65
66impl<TContext> Storage<TContext>
67where
68    TContext: commonware_runtime::Storage + Clock + Metrics,
69{
70    /// Returns all player acknowledgments received during the given epoch.
71    fn acks_for_epoch(
72        &self,
73        epoch: Epoch,
74    ) -> impl Iterator<Item = (&PublicKey, &PlayerAck<PublicKey>)> {
75        self.cache
76            .get(&epoch)
77            .into_iter()
78            .flat_map(|cache| cache.acks.iter())
79    }
80
81    /// Returns all dealings received during the given epoch.
82    fn dealings_for_epoch(
83        &self,
84        epoch: Epoch,
85    ) -> impl Iterator<Item = (&PublicKey, &(DealerPubMsg<MinSig>, DealerPrivMsg))> {
86        self.cache
87            .get(&epoch)
88            .into_iter()
89            .flat_map(|cache| cache.dealings.iter())
90    }
91
92    /// Returns all dealings received during the given epoch.
93    pub(super) fn logs_for_epoch(
94        &self,
95        epoch: Epoch,
96    ) -> impl Iterator<Item = (&PublicKey, &dkg::DealerLog<MinSig, PublicKey>)> {
97        self.cache
98            .get(&epoch)
99            .into_iter()
100            .flat_map(|cache| cache.logs.iter())
101    }
102
103    /// Returns the DKG outcome for the current epoch.
104    pub(super) fn current(&self) -> State {
105        self.current.clone()
106    }
107
108    /// Persists the outcome of a DKG ceremony to state
109    pub(super) async fn set_state(&mut self, state: State) -> eyre::Result<()> {
110        if let Some(old) = self.states.put(state.epoch.get(), state.clone()) {
111            warn!(epoch = %old.epoch, "overwriting existing state");
112        }
113        self.states.sync().await.wrap_err("failed writing state")?;
114        self.current = state;
115        Ok(())
116    }
117
118    /// Append a player ACK to the journal.
119    #[instrument(
120        skip_all,
121        fields(
122            %epoch,
123            %player,
124        ),
125        err,
126    )]
127    async fn append_ack(
128        &mut self,
129        epoch: Epoch,
130        player: PublicKey,
131        ack: PlayerAck<PublicKey>,
132    ) -> eyre::Result<()> {
133        if self
134            .cache
135            .get(&epoch)
136            .is_some_and(|events| events.acks.contains_key(&player))
137        {
138            info!(%player, %epoch, "ack for player already found in cache, dropping");
139            return Ok(());
140        }
141
142        let section = epoch.get();
143        self.events
144            .append(
145                section,
146                &Event::Ack {
147                    player: player.clone(),
148                    ack: ack.clone(),
149                },
150            )
151            .await
152            .wrap_err("unable to write event to storage")?;
153
154        self.events
155            .sync(section)
156            .await
157            .wrap_err("unable to sync events journal")?;
158
159        self.cache
160            .entry(epoch)
161            .or_default()
162            .acks
163            .insert(player, ack);
164
165        Ok(())
166    }
167
168    /// Append a dealer's dealing to the journal.
169    #[instrument(
170        skip_all,
171        fields(
172            %epoch,
173            %dealer,
174        ),
175        err,
176    )]
177    async fn append_dealing(
178        &mut self,
179        epoch: Epoch,
180        dealer: PublicKey,
181        pub_msg: DealerPubMsg<MinSig>,
182        priv_msg: DealerPrivMsg,
183    ) -> eyre::Result<()> {
184        if self
185            .cache
186            .get(&epoch)
187            .is_some_and(|events| events.dealings.contains_key(&dealer))
188        {
189            info!(%dealer, %epoch, "dealing of dealer already found in cache, dropping");
190            return Ok(());
191        }
192
193        let section = epoch.get();
194        self.events
195            .append(
196                section,
197                &Event::Dealing {
198                    dealer: dealer.clone(),
199                    public_msg: pub_msg.clone(),
200                    private_msg: priv_msg.clone(),
201                },
202            )
203            .await
204            .wrap_err("unable to write event to storage")?;
205
206        self.events
207            .sync(section)
208            .await
209            .wrap_err("unable to sync events journal")?;
210
211        self.cache
212            .entry(epoch)
213            .or_default()
214            .dealings
215            .insert(dealer, (pub_msg, priv_msg));
216
217        Ok(())
218    }
219
220    /// Appends a dealer log to the journal
221    pub(super) async fn append_dealer_log(
222        &mut self,
223        epoch: Epoch,
224        dealer: PublicKey,
225        log: dkg::DealerLog<MinSig, PublicKey>,
226    ) -> eyre::Result<()> {
227        if self
228            .cache
229            .get(&epoch)
230            .is_some_and(|events| events.logs.contains_key(&dealer))
231        {
232            info!(
233                %dealer,
234                %epoch,
235                "dealer log already found in cache; dropping"
236            );
237            return Ok(());
238        }
239
240        let section = epoch.get();
241        self.events
242            .append(
243                section,
244                &Event::Log {
245                    dealer: dealer.clone(),
246                    log: log.clone(),
247                },
248            )
249            .await
250            .wrap_err("failed to append log to journal")?;
251        self.events
252            .sync(section)
253            .await
254            .wrap_err("unable to sync journal")?;
255
256        let cache = self.cache.entry(epoch).or_default();
257        cache.logs.insert(dealer, log);
258        Ok(())
259    }
260
261    /// Appends the height, digest, and parent of the finalized block to the journal.
262    pub(super) async fn append_finalized_block(
263        &mut self,
264        epoch: Epoch,
265        block: Block,
266    ) -> eyre::Result<()> {
267        let height = block.height();
268        let digest = block.digest();
269        let parent = block.parent();
270        if self
271            .cache
272            .get(&epoch)
273            .is_some_and(|events| events.finalized.contains_key(&height))
274        {
275            info!(
276                %height,
277                %digest,
278                %parent,
279                "finalized block was already found in cache; dropping",
280            );
281            return Ok(());
282        }
283
284        let section = epoch.get();
285        self.events
286            .append(
287                section,
288                &Event::Finalized {
289                    digest,
290                    parent,
291                    height,
292                },
293            )
294            .await
295            .wrap_err("failed to append finalized block to journal")?;
296        self.events
297            .sync(section)
298            .await
299            .wrap_err("unable to sync journal")?;
300
301        let cache = self.cache.entry(epoch).or_default();
302        cache.finalized.insert(
303            height,
304            FinalizedBlockInfo {
305                height: block.height(),
306                digest: block.digest(),
307                parent: block.parent_digest(),
308            },
309        );
310        Ok(())
311    }
312
313    pub(super) fn cache_dkg_outcome(
314        &mut self,
315        epoch: Epoch,
316        digest: Digest,
317        output: Output<MinSig, PublicKey>,
318        share: ShareState,
319    ) {
320        self.cache
321            .entry(epoch)
322            .or_default()
323            .dkg_outcomes
324            .insert(digest, (output, share));
325    }
326
327    pub(super) fn get_dkg_outcome(
328        &self,
329        epoch: &Epoch,
330        digest: &Digest,
331    ) -> Option<&(Output<MinSig, PublicKey>, ShareState)> {
332        self.cache
333            .get(epoch)
334            .and_then(|events| events.dkg_outcomes.get(digest))
335    }
336
337    /// Caches the notarized log in memory.
338    ///
339    /// Notably, this does not persist the dealer logs to disk! On restart, it
340    /// is expected that the actor reads the dealer logs from the marshal actor
341    /// and forwards them one-by-one to the state cache.
342    pub(super) fn cache_notarized_block(&mut self, round: &Round, block: Block) {
343        let cache = self.cache.entry(round.epoch).or_default();
344        let log = ReducedBlock::from_block_for_round(&block, round);
345        cache.notarized_blocks.insert(log.digest, log);
346    }
347
348    #[instrument(
349        skip_all,
350        fields(
351            me = %me.public_key(),
352            epoch = %round.epoch,
353        )
354        err,
355    )]
356    pub(super) fn create_dealer_for_round(
357        &mut self,
358        me: PrivateKey,
359        round: Round,
360        share: ShareState,
361        seed: Summary,
362    ) -> eyre::Result<Option<Dealer>> {
363        if round.dealers.position(&me.public_key()).is_none() {
364            return Ok(None);
365        }
366
367        let share = if round.is_full_dkg() {
368            info!("running full DKG ceremony as dealer (new polynomial)");
369            None
370        } else {
371            let inner = share.into_inner();
372            if inner.is_none() {
373                warn!(
374                    "we are a dealer in this round, but we do not have a share, \
375                    which means we likely lost it; will not instantiate a dealer \
376                    instance and hope to get a new share in the next round if we \
377                    are a player"
378                );
379                return Ok(None);
380            }
381            inner
382        };
383
384        let (mut dealer, pub_msg, priv_msgs) = dkg::Dealer::start::<N3f1>(
385            Transcript::resume(seed).noise(b"dealer-rng"),
386            round.info.clone(),
387            me.clone(),
388            share,
389        )
390        .wrap_err("unable to start cryptographic dealer instance")?;
391
392        // Replay stored acks
393        let mut unsent: BTreeMap<PublicKey, DealerPrivMsg> = priv_msgs.into_iter().collect();
394        for (player, ack) in self.acks_for_epoch(round.epoch) {
395            if unsent.contains_key(player)
396                && dealer
397                    .receive_player_ack(player.clone(), ack.clone())
398                    .is_ok()
399            {
400                unsent.remove(player);
401                debug!(%player, "replayed player ack");
402            }
403        }
404
405        Ok(Some(Dealer::new(Some(dealer), pub_msg, unsent)))
406    }
407
408    /// Create a Player for the given epoch, replaying any stored dealer messages.
409    #[instrument(
410        skip_all,
411        fields(
412            epoch = %round.epoch,
413            me = %me.public_key(),
414        )
415        err,
416    )]
417    pub(super) fn create_player_for_round(
418        &self,
419        me: PrivateKey,
420        round: &Round,
421    ) -> eyre::Result<Option<Player>> {
422        if round.players.position(&me.public_key()).is_none() {
423            return Ok(None);
424        }
425
426        let mut player = Player::new(
427            dkg::Player::new(round.info.clone(), me)
428                .wrap_err("unable to start cryptographic player instance")?,
429        );
430
431        // Replay persisted dealer messages
432        for (dealer, (pub_msg, priv_msg)) in self.dealings_for_epoch(round.epoch()) {
433            player.replay(dealer.clone(), pub_msg.clone(), priv_msg.clone());
434            debug!(%dealer, "replayed committed dealer message");
435        }
436
437        Ok(Some(player))
438    }
439
440    pub(super) fn get_latest_finalized_block_for_epoch(
441        &self,
442        epoch: &Epoch,
443    ) -> Option<(&Height, &FinalizedBlockInfo)> {
444        self.cache
445            .get(epoch)
446            .and_then(|cache| cache.finalized.last_key_value())
447    }
448
449    pub(super) fn get_notarized_reduced_block(
450        &mut self,
451        epoch: &Epoch,
452        digest: &Digest,
453    ) -> Option<&ReducedBlock> {
454        self.cache
455            .get(epoch)
456            .and_then(|cache| cache.notarized_blocks.get(digest))
457    }
458
459    #[instrument(skip_all, fields(%up_to_epoch), err)]
460    pub(super) async fn prune(&mut self, up_to_epoch: Epoch) -> eyre::Result<()> {
461        self.events
462            .prune(up_to_epoch.get())
463            .await
464            .wrap_err("unable to prune events journal")?;
465        self.states.retain(|&key, _| key >= up_to_epoch.get());
466        self.states
467            .sync()
468            .await
469            .wrap_err("unable to prune events metadata")?;
470        self.cache.retain(|&epoch, _| epoch >= up_to_epoch);
471        Ok(())
472    }
473}
474
/// Collects the settings needed by `Builder::init` to open a `Storage`.
475#[derive(Default)]
476pub(super) struct Builder {
    // Future producing the state to store when neither the states store nor
    // the legacy journal contains a state; only awaited in that case.
477    initial_state: Option<BoxFuture<'static, eyre::Result<State>>>,
    // Prefix of all storage partitions; `init` fails if it was not set.
478    partition_prefix: Option<String>,
479}
480
481impl Builder {
482    pub(super) fn initial_state(
483        self,
484        initial_state: impl Future<Output = eyre::Result<State>> + Send + 'static,
485    ) -> Self {
486        Self {
487            initial_state: Some(initial_state.boxed()),
488            ..self
489        }
490    }
491
492    pub(super) fn partition_prefix(self, partition_prefix: &str) -> Self {
493        Self {
494            partition_prefix: Some(partition_prefix.to_string()),
495            ..self
496        }
497    }
498
499    #[instrument(skip_all, err)]
500    pub(super) async fn init<TContext>(self, context: TContext) -> eyre::Result<Storage<TContext>>
501    where
502        TContext: BufferPooler + commonware_runtime::Storage + Clock + Metrics,
503    {
504        let Self {
505            initial_state,
506            partition_prefix,
507        } = self;
508        let partition_prefix =
509            partition_prefix.ok_or_eyre("DKG actors state must have its partition prefix set")?;
510
511        let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, POOL_CAPACITY);
512
513        let states_partition = format!("{partition_prefix}_states");
514        let states_metadata_partition = format!("{partition_prefix}_states_metadata");
515
516        let mut states = metadata::Metadata::init(
517            context.with_label("states"),
518            metadata::Config {
519                partition: states_metadata_partition,
520                codec_config: MAXIMUM_VALIDATORS,
521            },
522        )
523        .await
524        .wrap_err("unable to initialize DKG states metadata")?;
525
526        migrate_journal_to_metadata_if_necessary(
527            &context,
528            &mut states,
529            &states_partition,
530            &page_cache,
531        )
532        .await
533        .wrap_err("migrating legacy states journal failed")?;
534
535        if states.keys().max().is_none() {
536            let initial_state = match initial_state {
537                None => {
538                    return Err(eyre!(
539                        "states metadata was empty, legacy journal was empty, \
540                         and initializer was not set"
541                    ));
542                }
543                Some(initial_state) => initial_state
544                    .await
545                    .wrap_err("failed constructing initial state to populate storage")?,
546            };
547            states
548                .put_sync(initial_state.epoch.get(), initial_state)
549                .await
550                .wrap_err("unable to write initial state to metadata")?;
551        }
552
553        let current = states
554            .keys()
555            .max()
556            .map(|epoch| {
557                states
558                    .get(epoch)
559                    .expect("state at keys iterator must exist")
560                    .clone()
561            })
562            .expect("states storage must contain a state after initialization");
563
564        let events = segmented::variable::Journal::init(
565            context.with_label("events"),
566            segmented::variable::Config {
567                partition: format!("{partition_prefix}_events"),
568                compression: None,
569                codec_config: MAXIMUM_VALIDATORS,
570                page_cache,
571                write_buffer: WRITE_BUFFER,
572            },
573        )
574        .await
575        .expect("should be able to initialize events journal");
576
577        // Replay msgs to populate epoch caches
578        let mut cache = BTreeMap::<Epoch, Events>::new();
579        {
580            let replay = events
581                .replay(0, 0, READ_BUFFER)
582                .await
583                .wrap_err("unable to start a replay stream to populate events cache")?;
584            futures::pin_mut!(replay);
585
586            while let Some(result) = replay.next().await {
587                let (section, _, _, event) =
588                    result.wrap_err("unable to read entry in replay stream")?;
589                let epoch = Epoch::new(section);
590                let events = cache.entry(epoch).or_default();
591                events.insert(event);
592            }
593        }
594
595        Ok(Storage {
596            states,
597            events,
598            current,
599            cache,
600        })
601    }
602}
603
604async fn migrate_journal_to_metadata_if_necessary<TContext>(
605    context: &TContext,
606    states: &mut metadata::Metadata<TContext, u64, State>,
607    states_partition: &str,
608    page_cache: &CacheRef,
609) -> eyre::Result<()>
610where
611    TContext: BufferPooler + commonware_runtime::Storage + Clock + Metrics,
612{
613    if states.keys().next().is_some() {
614        debug!("states already exists in new format; not migrating");
615        return Ok(());
616    }
617
618    let legacy_journal = contiguous::variable::Journal::<TContext, LegacyState>::init(
619        context.with_label("states_legacy"),
620        contiguous::variable::Config {
621            partition: states_partition.to_string(),
622            compression: None,
623            codec_config: MAXIMUM_VALIDATORS,
624            page_cache: page_cache.clone(),
625            write_buffer: WRITE_BUFFER,
626            items_per_section: NZU64!(1),
627        },
628    )
629    .await
630    .wrap_err("unable to initialize legacy DKG states journal for migration")?;
631
632    if let Some(latest_segment) = legacy_journal.size().await.checked_sub(1) {
633        info!(
634            latest_segment,
635            "legacy journal contains states; migrating last 2 segments",
636        );
637
638        let reader = legacy_journal.reader().await;
639        for segment in latest_segment.saturating_sub(1)..=latest_segment {
640            let legacy_state = reader
641                .read(segment)
642                .await
643                .wrap_err("unable to read state from legacy journal")?;
644            let state: State = legacy_state.into();
645            let epoch = state.epoch;
646            states.put(epoch.get(), state);
647            info!(%epoch, "migrated state to new format");
648        }
649        states
650            .sync()
651            .await
652            .wrap_err("unable to persist migrated states")?;
653
654        info!("states migrated to new format, deleting journal");
655    }
656
657    legacy_journal
658        .destroy()
659        .await
660        .wrap_err("unable to destroy legacy states journal")?;
661
662    Ok(())
663}
664
665/// Wrapper around a DKG share that tracks how it is stored at rest.
666///
667/// The `Option<Share>` is inside the enum so that a future encrypted variant
668/// can hide whether a share is present at all.
669///
670/// Currently only plaintext storage is supported, but additional variants
671/// (e.g. encrypted-at-rest) can be added in the future.
672#[derive(Clone, Debug, PartialEq, Eq)]
673pub(super) enum ShareState {
674    Plaintext(Option<Share>),
675}
676
677impl ShareState {
678    pub(super) fn into_inner(self) -> Option<Share> {
679        match self {
680            Self::Plaintext(share) => share,
681        }
682    }
683}
684
685impl EncodeSize for ShareState {
686    fn encode_size(&self) -> usize {
687        match self {
688            Self::Plaintext(share) => 1 + share.encode_size(),
689        }
690    }
691}
692
693impl Write for ShareState {
694    fn write(&self, buf: &mut impl bytes::BufMut) {
695        match self {
696            Self::Plaintext(share) => {
697                0u8.write(buf);
698                share.write(buf);
699            }
700        }
701    }
702}
703
704impl Read for ShareState {
705    type Cfg = ();
706
707    fn read_cfg(
708        buf: &mut impl bytes::Buf,
709        _cfg: &Self::Cfg,
710    ) -> Result<Self, commonware_codec::Error> {
711        let tag = u8::read(buf)?;
712        match tag {
713            0 => Ok(Self::Plaintext(ReadExt::read(buf)?)),
714            other => Err(commonware_codec::Error::InvalidEnum(other)),
715        }
716    }
717}
718
719/// The outcome of a DKG ceremony.
720#[derive(Clone, Debug, PartialEq, Eq)]
721pub(super) struct State {
722    pub(super) epoch: Epoch,
723    pub(super) seed: Summary,
724    pub(super) output: Output<MinSig, PublicKey>,
725    pub(super) share: ShareState,
726    pub(super) players: ordered::Set<PublicKey>,
727    pub(super) syncers: ordered::Set<PublicKey>,
728    pub(super) is_full_dkg: bool,
729}
730
731impl State {
732    /// Returns the dealers active in the DKG round tracked by this state.
733    pub(super) fn dealers(&self) -> &ordered::Set<PublicKey> {
734        self.output.players()
735    }
736
737    /// Returns the players active in the DKG round tracked by this state.
738    pub(super) fn players(&self) -> &ordered::Set<PublicKey> {
739        &self.players
740    }
741}
742
743impl EncodeSize for State {
744    fn encode_size(&self) -> usize {
745        self.epoch.encode_size()
746            + self.seed.encode_size()
747            + self.output.encode_size()
748            + self.share.encode_size()
749            + self.players.encode_size()
750            + self.syncers.encode_size()
751            + self.is_full_dkg.encode_size()
752    }
753}
754
755impl Write for State {
756    fn write(&self, buf: &mut impl bytes::BufMut) {
757        self.epoch.write(buf);
758        self.seed.write(buf);
759        self.output.write(buf);
760        self.share.write(buf);
761        self.players.write(buf);
762        self.syncers.write(buf);
763        self.is_full_dkg.write(buf);
764    }
765}
766
767impl Read for State {
768    type Cfg = NonZeroU32;
769
770    fn read_cfg(
771        buf: &mut impl bytes::Buf,
772        cfg: &Self::Cfg,
773    ) -> Result<Self, commonware_codec::Error> {
774        Ok(Self {
775            epoch: ReadExt::read(buf)?,
776            seed: ReadExt::read(buf)?,
777            output: Read::read_cfg(buf, &(*cfg, ModeVersion::v0()))?,
778            share: ReadExt::read(buf)?,
779            players: Read::read_cfg(buf, &(RangeCfg::from(1..=(u16::MAX as usize)), ()))?,
780            // Range from 0 to u16::MAX because after T2/V2 syncers are no longer
781            // stored in state.
782            syncers: Read::read_cfg(buf, &(RangeCfg::from(0..=(u16::MAX as usize)), ()))?,
783            is_full_dkg: ReadExt::read(buf)?,
784        })
785    }
786}
787
788/// Legacy state format used before the migration that removed peer address
789/// mappings. Kept only to read existing data from disk during migration.
790///
791/// `Write` and `EncodeSize` are required by the journal's `Codec` bound even
792/// though we only read from the journal during migration.
793#[derive(Clone)]
794struct LegacyState {
795    epoch: Epoch,
796    seed: Summary,
797    output: Output<MinSig, PublicKey>,
798    share: Option<Share>,
799    dealers: ordered::Map<PublicKey, SocketAddr>,
800    players: ordered::Map<PublicKey, SocketAddr>,
801    syncers: ordered::Map<PublicKey, SocketAddr>,
802    is_full_dkg: bool,
803}
804
805impl EncodeSize for LegacyState {
806    fn encode_size(&self) -> usize {
807        self.epoch.encode_size()
808            + self.seed.encode_size()
809            + self.output.encode_size()
810            + self.share.encode_size()
811            + self.dealers.encode_size()
812            + self.players.encode_size()
813            + self.syncers.encode_size()
814            + self.is_full_dkg.encode_size()
815    }
816}
817
818impl Write for LegacyState {
819    fn write(&self, buf: &mut impl bytes::BufMut) {
820        self.epoch.write(buf);
821        self.seed.write(buf);
822        self.output.write(buf);
823        self.share.write(buf);
824        self.dealers.write(buf);
825        self.players.write(buf);
826        self.syncers.write(buf);
827        self.is_full_dkg.write(buf);
828    }
829}
830
831impl Read for LegacyState {
832    type Cfg = NonZeroU32;
833
834    fn read_cfg(
835        buf: &mut impl bytes::Buf,
836        cfg: &Self::Cfg,
837    ) -> Result<Self, commonware_codec::Error> {
838        Ok(Self {
839            epoch: ReadExt::read(buf)?,
840            seed: ReadExt::read(buf)?,
841            output: Read::read_cfg(buf, &(*cfg, ModeVersion::v0()))?,
842            share: ReadExt::read(buf)?,
843            dealers: Read::read_cfg(buf, &(RangeCfg::from(1..=(u16::MAX as usize)), (), ()))?,
844            players: Read::read_cfg(buf, &(RangeCfg::from(1..=(u16::MAX as usize)), (), ()))?,
845            syncers: Read::read_cfg(buf, &(RangeCfg::from(1..=(u16::MAX as usize)), (), ()))?,
846            is_full_dkg: ReadExt::read(buf)?,
847        })
848    }
849}
850
851impl From<LegacyState> for State {
852    fn from(legacy: LegacyState) -> Self {
853        Self {
854            epoch: legacy.epoch,
855            seed: legacy.seed,
856            output: legacy.output,
857            share: ShareState::Plaintext(legacy.share),
858            players: legacy.players.keys().clone(),
859            syncers: legacy.syncers.keys().clone(),
860            is_full_dkg: legacy.is_full_dkg,
861        }
862    }
863}
864
/// Height, digest, and parent digest recorded for a finalized block.
///
/// Entries are created when a finalized block is appended and when
/// `Event::Finalized` entries are replayed into the cache.
865#[expect(
866    dead_code,
867    reason = "tracking this data is virtually free and might become useful later"
868)]
869#[derive(Clone, Debug)]
870pub(super) struct FinalizedBlockInfo {
871    pub(super) height: Height,
872    pub(super) digest: Digest,
873    pub(super) parent: Digest,
874}
875
876/// A cache of all events that transpired during a given epoch.
877#[derive(Debug, Default)]
878struct Events {
879    acks: BTreeMap<PublicKey, PlayerAck<PublicKey>>,
880    dealings: BTreeMap<PublicKey, (DealerPubMsg<MinSig>, DealerPrivMsg)>,
881    logs: BTreeMap<PublicKey, dkg::DealerLog<MinSig, PublicKey>>,
882    finalized: BTreeMap<Height, FinalizedBlockInfo>,
883
884    notarized_blocks: HashMap<Digest, ReducedBlock>,
885    dkg_outcomes: HashMap<Digest, (Output<MinSig, PublicKey>, ShareState)>,
886}
887
888impl Events {
889    fn insert(&mut self, event: Event) {
890        match event {
891            Event::Dealing {
892                dealer: public_key,
893                public_msg,
894                private_msg,
895            } => {
896                self.dealings.insert(public_key, (public_msg, private_msg));
897            }
898            Event::Ack {
899                player: public_key,
900                ack,
901            } => {
902                self.acks.insert(public_key, ack);
903            }
904            Event::Log { dealer, log } => {
905                self.logs.insert(dealer, log);
906            }
907            Event::Finalized {
908                digest,
909                parent,
910                height,
911            } => {
912                self.finalized.insert(
913                    height,
914                    FinalizedBlockInfo {
915                        height,
916                        digest,
917                        parent,
918                    },
919                );
920            }
921        }
922    }
923}
924
925enum Event {
926    /// A message received from a dealer (as a player).
927    Dealing {
928        dealer: PublicKey,
929        public_msg: DealerPubMsg<MinSig>,
930        private_msg: DealerPrivMsg,
931    },
932    /// An ack (of a dealing) received from a player (as a dealer).
933    Ack {
934        player: PublicKey,
935        ack: PlayerAck<PublicKey>,
936    },
937    /// A dealer log read from a finalized block.
938    Log {
939        dealer: PublicKey,
940        log: dkg::DealerLog<MinSig, PublicKey>,
941    },
942    /// Information of finalized block observed by the actor.
943    Finalized {
944        digest: Digest,
945        parent: Digest,
946        height: Height,
947    },
948}
949
950impl EncodeSize for Event {
951    fn encode_size(&self) -> usize {
952        1 + match self {
953            Self::Dealing {
954                dealer: public_key,
955                public_msg,
956                private_msg,
957            } => public_key.encode_size() + public_msg.encode_size() + private_msg.encode_size(),
958            Self::Ack {
959                player: public_key,
960                ack,
961            } => public_key.encode_size() + ack.encode_size(),
962            Self::Log { dealer, log } => dealer.encode_size() + log.encode_size(),
963            Self::Finalized {
964                digest,
965                parent,
966                height,
967            } => digest.encode_size() + parent.encode_size() + height.encode_size(),
968        }
969    }
970}
971
972impl Write for Event {
973    fn write(&self, buf: &mut impl bytes::BufMut) {
974        match self {
975            Self::Dealing {
976                dealer: public_key,
977                public_msg,
978                private_msg,
979            } => {
980                0u8.write(buf);
981                public_key.write(buf);
982                public_msg.write(buf);
983                private_msg.write(buf);
984            }
985            Self::Ack {
986                player: public_key,
987                ack,
988            } => {
989                1u8.write(buf);
990                public_key.write(buf);
991                ack.write(buf);
992            }
993            Self::Log { dealer, log } => {
994                2u8.write(buf);
995                dealer.write(buf);
996                log.write(buf);
997            }
998            Self::Finalized {
999                digest,
1000                parent,
1001                height,
1002            } => {
1003                3u8.write(buf);
1004                digest.write(buf);
1005                parent.write(buf);
1006                height.write(buf);
1007            }
1008        }
1009    }
1010}
1011
1012impl Read for Event {
1013    type Cfg = NonZeroU32;
1014
1015    fn read_cfg(
1016        buf: &mut impl bytes::Buf,
1017        cfg: &Self::Cfg,
1018    ) -> Result<Self, commonware_codec::Error> {
1019        let tag = u8::read(buf)?;
1020        match tag {
1021            0 => Ok(Self::Dealing {
1022                dealer: ReadExt::read(buf)?,
1023                public_msg: Read::read_cfg(buf, cfg)?,
1024                private_msg: ReadExt::read(buf)?,
1025            }),
1026            1 => Ok(Self::Ack {
1027                player: ReadExt::read(buf)?,
1028                ack: ReadExt::read(buf)?,
1029            }),
1030            2 => Ok(Self::Log {
1031                dealer: ReadExt::read(buf)?,
1032                log: Read::read_cfg(buf, &NZU32!(u16::MAX as u32))?,
1033            }),
1034            3 => Ok(Self::Finalized {
1035                digest: ReadExt::read(buf)?,
1036                parent: ReadExt::read(buf)?,
1037                height: ReadExt::read(buf)?,
1038            }),
1039            other => Err(commonware_codec::Error::InvalidEnum(other)),
1040        }
1041    }
1042}
1043
1044/// Internal state for a dealer in the current round.
1045pub(super) struct Dealer {
1046    /// The inner cryptographic dealer state. Is `None` if
1047    /// the dealer log was already finalized so that it is not finalized again.
1048    dealer: Option<dkg::Dealer<MinSig, PrivateKey>>,
1049
1050    /// The message containing the generated commitment by this dealer, which
1051    /// is shared with all players and posted on chain.
1052    pub_msg: DealerPubMsg<MinSig>,
1053
1054    /// A map of players that we have not yet successfully sent their private
1055    /// messages to (containing their share generated by this dealer).
1056    unsent: BTreeMap<PublicKey, DealerPrivMsg>,
1057
1058    /// The finalized, signed log of this dealer. Initially `None` and set after
1059    /// the middle point of the epoch. Set to `None` again after this node
1060    /// observes it dealer log on chain to not post it again.
1061    finalized: Option<SignedDealerLog<MinSig, PrivateKey>>,
1062}
1063
1064impl Dealer {
1065    pub(super) const fn new(
1066        dealer: Option<dkg::Dealer<MinSig, PrivateKey>>,
1067        pub_msg: DealerPubMsg<MinSig>,
1068        unsent: BTreeMap<PublicKey, DealerPrivMsg>,
1069    ) -> Self {
1070        Self {
1071            dealer,
1072            pub_msg,
1073            unsent,
1074            finalized: None,
1075        }
1076    }
1077
1078    /// Handle an incoming ack from a player.
1079    ///
1080    /// If the ack is valid and new, persists it to storage.
1081    /// Returns true if the ack was successfully processed.
1082    pub(super) async fn receive_ack<TContext>(
1083        &mut self,
1084        storage: &mut Storage<TContext>,
1085        epoch: Epoch,
1086        player: PublicKey,
1087        ack: PlayerAck<PublicKey>,
1088    ) -> eyre::Result<()>
1089    where
1090        TContext: commonware_runtime::Storage + Clock + Metrics,
1091    {
1092        if !self.unsent.contains_key(&player) {
1093            bail!("already received an ack from `{player}`");
1094        }
1095        match &mut self.dealer {
1096            Some(dealer) => {
1097                dealer
1098                    .receive_player_ack(player.clone(), ack.clone())
1099                    .wrap_err("unable to receive player ack")?;
1100                self.unsent.remove(&player);
1101                storage
1102                    .append_ack(epoch, player.clone(), ack.clone())
1103                    .await
1104                    .wrap_err("unable to append ack to journal")?;
1105            }
1106            None => bail!("dealer was already finalized, dropping ack of player `{player}`"),
1107        }
1108        Ok(())
1109    }
1110
1111    /// Finalize the dealer and produce a signed log for inclusion in a block.
1112    pub(super) fn finalize(&mut self) {
1113        if self.finalized.is_some() {
1114            return;
1115        }
1116
1117        // Even after the finalized_log is taken, we won't attempt to finalize
1118        // again because the dealer will be None.
1119        if let Some(dealer) = self.dealer.take() {
1120            let log = dealer.finalize::<N3f1>();
1121            self.finalized = Some(log);
1122        }
1123    }
1124
1125    /// Returns a clone of the finalized log if it exists.
1126    pub(super) fn finalized(&self) -> Option<SignedDealerLog<MinSig, PrivateKey>> {
1127        self.finalized.clone()
1128    }
1129
1130    /// Takes and returns the finalized log, leaving None in its place.
1131    pub(super) const fn take_finalized(&mut self) -> Option<SignedDealerLog<MinSig, PrivateKey>> {
1132        self.finalized.take()
1133    }
1134
1135    /// Returns shares to distribute to players.
1136    ///
1137    /// Returns an iterator of (player, pub_msg, priv_msg) tuples for each player
1138    /// that hasn't yet acknowledged their share.
1139    pub(super) fn shares_to_distribute(
1140        &self,
1141    ) -> impl Iterator<Item = (PublicKey, DealerPubMsg<MinSig>, DealerPrivMsg)> + '_ {
1142        self.unsent
1143            .iter()
1144            .map(|(player, priv_msg)| (player.clone(), self.pub_msg.clone(), priv_msg.clone()))
1145    }
1146}
1147
1148#[derive(Clone, Debug)]
1149pub(super) struct Round {
1150    epoch: Epoch,
1151    info: dkg::Info<MinSig, PublicKey>,
1152    dealers: ordered::Set<PublicKey>,
1153    players: ordered::Set<PublicKey>,
1154    is_full_dkg: bool,
1155}
1156
1157impl Round {
1158    pub(super) fn from_state(state: &State, namespace: &[u8]) -> Self {
1159        // For full DKG, don't pass the previous output - this creates a new polynomial
1160        let previous_output = if state.is_full_dkg {
1161            None
1162        } else {
1163            Some(state.output.clone())
1164        };
1165
1166        let dealers = state.dealers().clone();
1167        let players = state.players().clone();
1168
1169        Self {
1170            epoch: state.epoch,
1171            info: Info::new::<N3f1>(
1172                namespace,
1173                state.epoch.get(),
1174                previous_output,
1175                Mode::NonZeroCounter,
1176                dealers.clone(),
1177                players.clone(),
1178            )
1179            .expect("a DKG round must always be initializable given some epoch state"),
1180            dealers,
1181            players,
1182            is_full_dkg: state.is_full_dkg,
1183        }
1184    }
1185
1186    pub(super) fn info(&self) -> &dkg::Info<MinSig, PublicKey> {
1187        &self.info
1188    }
1189
1190    pub(super) fn epoch(&self) -> Epoch {
1191        self.epoch
1192    }
1193
1194    pub(super) fn dealers(&self) -> &ordered::Set<PublicKey> {
1195        &self.dealers
1196    }
1197
1198    pub(super) fn players(&self) -> &ordered::Set<PublicKey> {
1199        &self.players
1200    }
1201
1202    pub(super) fn is_full_dkg(&self) -> bool {
1203        self.is_full_dkg
1204    }
1205}
1206
1207/// Internal state for a player in the current round.
1208pub(super) struct Player {
1209    player: dkg::Player<MinSig, PrivateKey>,
1210    /// Acks we've generated, keyed by dealer. Once we generate an ack for a dealer,
1211    /// we will not generate a different one (to avoid conflicting votes).
1212    acks: BTreeMap<PublicKey, PlayerAck<PublicKey>>,
1213}
1214
1215impl Player {
1216    pub(super) const fn new(player: dkg::Player<MinSig, PrivateKey>) -> Self {
1217        Self {
1218            player,
1219            acks: BTreeMap::new(),
1220        }
1221    }
1222
1223    /// Handle an incoming dealer message.
1224    ///
1225    /// If this is a new valid dealer message, persists it to storage before returning.
1226    pub(super) async fn receive_dealing<TContext>(
1227        &mut self,
1228        storage: &mut Storage<TContext>,
1229        epoch: Epoch,
1230        dealer: PublicKey,
1231        pub_msg: DealerPubMsg<MinSig>,
1232        priv_msg: DealerPrivMsg,
1233    ) -> eyre::Result<PlayerAck<PublicKey>>
1234    where
1235        TContext: commonware_runtime::Storage + Clock + Metrics,
1236    {
1237        // If we've already generated an ack, return the cached version
1238        if let Some(ack) = self.acks.get(&dealer) {
1239            return Ok(ack.clone());
1240        }
1241
1242        // Otherwise generate a new ack
1243        let ack = self
1244            .player
1245            .dealer_message::<N3f1>(dealer.clone(), pub_msg.clone(), priv_msg.clone())
1246            // FIXME(janis): it would be great to know why exactly that is not the case.
1247            .ok_or_eyre(
1248                "applying dealer message to player instance did not result in a usable ack",
1249            )?;
1250        storage
1251            .append_dealing(epoch, dealer.clone(), pub_msg, priv_msg)
1252            .await
1253            .wrap_err("unable to append dealing to journal")?;
1254        self.acks.insert(dealer, ack.clone());
1255        Ok(ack)
1256    }
1257
1258    /// Replay an already-persisted dealer message (updates in-memory state only).
1259    fn replay(
1260        &mut self,
1261        dealer: PublicKey,
1262        pub_msg: DealerPubMsg<MinSig>,
1263        priv_msg: DealerPrivMsg,
1264    ) {
1265        if self.acks.contains_key(&dealer) {
1266            return;
1267        }
1268        if let Some(ack) = self
1269            .player
1270            .dealer_message::<N3f1>(dealer.clone(), pub_msg, priv_msg)
1271        {
1272            self.acks.insert(dealer, ack);
1273        }
1274    }
1275
1276    /// Finalize the player's participation in the DKG round.
1277    pub(super) fn finalize(
1278        self,
1279        logs: BTreeMap<PublicKey, dkg::DealerLog<MinSig, PublicKey>>,
1280        strategy: &impl Strategy,
1281    ) -> Result<(Output<MinSig, PublicKey>, Share), dkg::Error> {
1282        self.player.finalize::<N3f1>(logs, strategy)
1283    }
1284}
1285
1286/// Contains a block's height, parent, digest, and dealer log, if there was one.
1287#[derive(Clone, Debug)]
1288pub(super) struct ReducedBlock {
1289    // The block height.
1290    pub(super) height: Height,
1291
1292    // The block parent.
1293    pub(super) parent: Digest,
1294
1295    // The block digest (hash).
1296    pub(super) digest: Digest,
1297
1298    // The (dealer, log) tuple, if a block contained a signed dealear log.
1299    pub(super) log: Option<(PublicKey, dkg::DealerLog<MinSig, PublicKey>)>,
1300}
1301
1302impl ReducedBlock {
1303    pub(super) fn from_block_for_round(block: &Block, round: &Round) -> Self {
1304        let log = if block.header().extra_data().is_empty() {
1305            None
1306        } else {
1307            dkg::SignedDealerLog::<MinSig, PrivateKey>::read_cfg(
1308                &mut block.header().extra_data().as_ref(),
1309                &NZU32!(round.players.len() as u32),
1310            )
1311            .inspect(|_| {
1312                info!(
1313                    height = %block.height(),
1314                    digest = %block.digest(),
1315                    "found dealer log in block"
1316                )
1317            })
1318            .inspect_err(|error| {
1319                warn!(
1320                    %error,
1321                    "block header extraData had data, but it could not be read as \
1322                    a signed dealer log",
1323                )
1324            })
1325            .ok()
1326            .and_then(|log| match log.check(&round.info) {
1327                Some((dealer, log)) => Some((dealer, log)),
1328                None => {
1329                    // TODO(janis): some more fidelity here would be nice.
1330                    warn!("log failed check against current round");
1331                    None
1332                }
1333            })
1334        };
1335        Self {
1336            height: block.height(),
1337            parent: block.parent(),
1338            digest: block.digest(),
1339            log,
1340        }
1341    }
1342}
1343
1344#[cfg(test)]
1345mod tests {
1346    use super::*;
1347    use commonware_codec::Encode as _;
1348    use commonware_cryptography::{
1349        bls12381::{dkg, primitives::sharing::Mode},
1350        ed25519::PrivateKey,
1351        transcript::Summary,
1352    };
1353    use commonware_math::algebra::Random as _;
1354    use commonware_runtime::{Runner as _, deterministic};
1355    use commonware_utils::TryFromIterator as _;
1356
1357    fn make_test_state(rng: &mut impl rand_core::CryptoRngCore, epoch: u64) -> State {
1358        let mut keys: Vec<_> = (0..3)
1359            .map(|i| PrivateKey::from_seed(i + epoch * 100))
1360            .collect();
1361        keys.sort_by_key(|k| k.public_key());
1362        let pubkeys = ordered::Set::try_from_iter(keys.iter().map(|k| k.public_key())).unwrap();
1363
1364        let (output, _shares) =
1365            dkg::deal::<_, _, N3f1>(&mut *rng, Mode::NonZeroCounter, pubkeys.clone()).unwrap();
1366
1367        State {
1368            epoch: Epoch::new(epoch),
1369            seed: Summary::random(rng),
1370            output,
1371            share: ShareState::Plaintext(None),
1372            players: pubkeys.clone(),
1373            syncers: pubkeys,
1374            is_full_dkg: false,
1375        }
1376    }
1377
1378    fn make_legacy_state(rng: &mut impl rand_core::CryptoRngCore, epoch: u64) -> LegacyState {
1379        let mut keys: Vec<_> = (0..3)
1380            .map(|i| PrivateKey::from_seed(i + epoch * 100))
1381            .collect();
1382        keys.sort_by_key(|k| k.public_key());
1383        let pubkeys = ordered::Set::try_from_iter(keys.iter().map(|k| k.public_key())).unwrap();
1384
1385        let (output, _shares) =
1386            dkg::deal::<_, _, N3f1>(&mut *rng, Mode::NonZeroCounter, pubkeys.clone()).unwrap();
1387
1388        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
1389        let peers =
1390            ordered::Map::from_iter_dedup(pubkeys.iter().map(|k: &PublicKey| (k.clone(), addr)));
1391
1392        LegacyState {
1393            epoch: Epoch::new(epoch),
1394            seed: Summary::random(rng),
1395            output,
1396            share: None,
1397            dealers: peers.clone(),
1398            players: peers.clone(),
1399            syncers: peers,
1400            is_full_dkg: false,
1401        }
1402    }
1403
1404    #[test]
1405    fn state_round_trip_with() {
1406        let executor = deterministic::Runner::default();
1407        executor.start(|mut context| async move {
1408            let state = make_test_state(&mut context, 0);
1409            let mut bytes = state.encode();
1410            assert_eq!(
1411                state,
1412                State::read_cfg(&mut bytes, &NZU32!(u32::MAX)).unwrap(),
1413            );
1414
1415            let state_without_syncers = {
1416                let mut s = make_test_state(&mut context, 0);
1417                s.syncers = Default::default();
1418                s
1419            };
1420            let mut bytes = state_without_syncers.encode();
1421            assert_eq!(
1422                state_without_syncers,
1423                State::read_cfg(&mut bytes, &NZU32!(u32::MAX)).unwrap(),
1424            );
1425        });
1426    }
1427
1428    #[test]
1429    fn states_migration_migrates_last_two() {
1430        let executor = deterministic::Runner::default();
1431        executor.start(|mut context| async move {
1432            let partition_prefix = "test_dkg";
1433            let states_partition = format!("{partition_prefix}_states");
1434            let states_metadata_partition = format!("{partition_prefix}_states_metadata");
1435            let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, POOL_CAPACITY);
1436
1437            let ancient_legacy = make_legacy_state(&mut context, 1);
1438            let previous_legacy = make_legacy_state(&mut context, 2);
1439            let latest_legacy = make_legacy_state(&mut context, 3);
1440            let previous_expected: State = previous_legacy.clone().into();
1441            let latest_expected: State = latest_legacy.clone().into();
1442
1443            // Populate the legacy journal with three entries.
1444            {
1445                let journal = contiguous::variable::Journal::<_, LegacyState>::init(
1446                    context.with_label("journal_setup"),
1447                    contiguous::variable::Config {
1448                        partition: states_partition.clone(),
1449                        compression: None,
1450                        codec_config: MAXIMUM_VALIDATORS,
1451                        page_cache: page_cache.clone(),
1452                        write_buffer: WRITE_BUFFER,
1453                        items_per_section: NZU64!(1),
1454                    },
1455                )
1456                .await
1457                .unwrap();
1458
1459                journal.append(&ancient_legacy).await.unwrap();
1460                journal.append(&previous_legacy).await.unwrap();
1461                journal.append(&latest_legacy).await.unwrap();
1462                journal.sync().await.unwrap();
1463            }
1464
1465            let mut states = metadata::Metadata::init(
1466                context.with_label("states"),
1467                metadata::Config {
1468                    partition: states_metadata_partition.clone(),
1469                    codec_config: MAXIMUM_VALIDATORS,
1470                },
1471            )
1472            .await
1473            .unwrap();
1474
1475            migrate_journal_to_metadata_if_necessary(
1476                &context,
1477                &mut states,
1478                &states_partition,
1479                &page_cache,
1480            )
1481            .await
1482            .unwrap();
1483
1484            // Only the last two states should be migrated.
1485            assert!(
1486                states.get(&1).is_none(),
1487                "ancient state should not be migrated"
1488            );
1489            assert_eq!(
1490                &previous_expected,
1491                states.get(&previous_expected.epoch.get()).unwrap(),
1492            );
1493            assert_eq!(
1494                &latest_expected,
1495                states.get(&latest_expected.epoch.get()).unwrap(),
1496            );
1497
1498            // Journal should be destroyed.
1499            let reopened = contiguous::variable::Journal::<_, LegacyState>::init(
1500                context.with_label("journal_verify"),
1501                contiguous::variable::Config {
1502                    partition: states_partition.clone(),
1503                    compression: None,
1504                    codec_config: MAXIMUM_VALIDATORS,
1505                    page_cache: page_cache.clone(),
1506                    write_buffer: WRITE_BUFFER,
1507                    items_per_section: NZU64!(1),
1508                },
1509            )
1510            .await
1511            .unwrap();
1512
1513            assert_eq!(reopened.size().await, 0);
1514
1515            // Metadata persists across reopens.
1516            drop(states);
1517            let states = metadata::Metadata::<_, u64, State>::init(
1518                context.with_label("states_reopen"),
1519                metadata::Config {
1520                    partition: states_metadata_partition,
1521                    codec_config: MAXIMUM_VALIDATORS,
1522                },
1523            )
1524            .await
1525            .unwrap();
1526            assert_eq!(
1527                &previous_expected,
1528                states.get(&previous_expected.epoch.get()).unwrap(),
1529            );
1530            assert_eq!(
1531                &latest_expected,
1532                states.get(&latest_expected.epoch.get()).unwrap(),
1533            );
1534        });
1535    }
1536
1537    #[test]
1538    fn states_migration_single_entry() {
1539        let executor = deterministic::Runner::default();
1540        executor.start(|mut context| async move {
1541            let partition_prefix = "test_dkg_single";
1542            let states_partition = format!("{partition_prefix}_states");
1543            let states_metadata_partition = format!("{partition_prefix}_states_metadata");
1544            let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, POOL_CAPACITY);
1545
1546            let only_legacy = make_legacy_state(&mut context, 5);
1547            let only_expected: State = only_legacy.clone().into();
1548
1549            // Populate the legacy journal with a single entry.
1550            {
1551                let journal = contiguous::variable::Journal::<_, LegacyState>::init(
1552                    context.with_label("journal_setup"),
1553                    contiguous::variable::Config {
1554                        partition: states_partition.clone(),
1555                        compression: None,
1556                        codec_config: MAXIMUM_VALIDATORS,
1557                        page_cache: page_cache.clone(),
1558                        write_buffer: WRITE_BUFFER,
1559                        items_per_section: NZU64!(1),
1560                    },
1561                )
1562                .await
1563                .unwrap();
1564
1565                journal.append(&only_legacy).await.unwrap();
1566                journal.sync().await.unwrap();
1567            }
1568
1569            let mut states = metadata::Metadata::init(
1570                context.with_label("states"),
1571                metadata::Config {
1572                    partition: states_metadata_partition,
1573                    codec_config: MAXIMUM_VALIDATORS,
1574                },
1575            )
1576            .await
1577            .unwrap();
1578
1579            migrate_journal_to_metadata_if_necessary(
1580                &context,
1581                &mut states,
1582                &states_partition,
1583                &page_cache,
1584            )
1585            .await
1586            .unwrap();
1587
1588            // Single entry: saturating_sub(1) == 0, range 0..=0, so only that one state.
1589            assert_eq!(states.keys().count(), 1);
1590            assert_eq!(
1591                &only_expected,
1592                states.get(&only_expected.epoch.get()).unwrap(),
1593            );
1594        });
1595    }
1596
1597    #[test]
1598    fn states_migration_does_not_overwrite_existing_metadata() {
1599        let executor = deterministic::Runner::default();
1600        executor.start(|mut context| async move {
1601            let partition_prefix = "test_dkg_noop";
1602            let states_partition = format!("{partition_prefix}_states");
1603            let states_metadata_partition = format!("{partition_prefix}_states_metadata");
1604            let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, POOL_CAPACITY);
1605
1606            let existing_state = make_test_state(&mut context, 10);
1607            let journal_legacy = make_legacy_state(&mut context, 20);
1608            let journal_expected_epoch = journal_legacy.epoch.get();
1609
1610            // Pre-populate metadata.
1611            let mut states = metadata::Metadata::init(
1612                context.with_label("states"),
1613                metadata::Config {
1614                    partition: states_metadata_partition.clone(),
1615                    codec_config: MAXIMUM_VALIDATORS,
1616                },
1617            )
1618            .await
1619            .unwrap();
1620            states.put(existing_state.epoch.get(), existing_state.clone());
1621            states.sync().await.unwrap();
1622
1623            // Populate a legacy journal with different data.
1624            {
1625                let journal = contiguous::variable::Journal::<_, LegacyState>::init(
1626                    context.with_label("journal_setup"),
1627                    contiguous::variable::Config {
1628                        partition: states_partition.clone(),
1629                        compression: None,
1630                        codec_config: MAXIMUM_VALIDATORS,
1631                        page_cache: page_cache.clone(),
1632                        write_buffer: WRITE_BUFFER,
1633                        items_per_section: NZU64!(1),
1634                    },
1635                )
1636                .await
1637                .unwrap();
1638                journal.append(&journal_legacy).await.unwrap();
1639                journal.sync().await.unwrap();
1640            }
1641
1642            // Run migration — should be a no-op since metadata is already populated.
1643            migrate_journal_to_metadata_if_necessary(
1644                &context,
1645                &mut states,
1646                &states_partition,
1647                &page_cache,
1648            )
1649            .await
1650            .unwrap();
1651
1652            assert_eq!(states.keys().count(), 1);
1653            assert_eq!(
1654                &existing_state,
1655                states.get(&existing_state.epoch.get()).unwrap(),
1656            );
1657            assert!(
1658                states.get(&journal_expected_epoch).is_none(),
1659                "journal state must not be written into already-populated metadata"
1660            );
1661        });
1662    }
1663
1664    #[track_caller]
1665    fn assert_roundtrip(original: &ShareState) {
1666        use commonware_codec::Encode as _;
1667        let encoded = original.encode();
1668        let decoded = ShareState::read_cfg(&mut encoded.as_ref(), &()).unwrap();
1669        assert_eq!(original, &decoded);
1670    }
1671
1672    #[test]
1673    fn share_state_roundtrip_plaintext_none() {
1674        assert_roundtrip(&ShareState::Plaintext(None));
1675    }
1676
1677    #[test]
1678    fn share_state_roundtrip_plaintext_some() {
1679        use rand_08::SeedableRng as _;
1680        let mut rng = rand_08::rngs::StdRng::seed_from_u64(42);
1681
1682        let keys = std::iter::repeat_with(|| PrivateKey::random(&mut rng))
1683            .take(3)
1684            .collect::<Vec<_>>();
1685        let pubkeys = ordered::Set::try_from_iter(keys.iter().map(|k| k.public_key())).unwrap();
1686
1687        let (_output, shares) =
1688            dkg::deal::<MinSig, _, N3f1>(&mut rng, Mode::NonZeroCounter, pubkeys).unwrap();
1689
1690        let share = shares.into_iter().next().unwrap().1;
1691        assert_roundtrip(&ShareState::Plaintext(Some(share)));
1692    }
1693}