1use std::{
2 collections::{BTreeMap, HashMap},
3 net::SocketAddr,
4 num::{NonZeroU16, NonZeroU32, NonZeroUsize},
5};
6
7use alloy_consensus::BlockHeader as _;
8use commonware_codec::{EncodeSize, RangeCfg, Read, ReadExt, Write};
9use commonware_consensus::{
10 Block as _, Heightable as _,
11 types::{Epoch, Height},
12};
13use commonware_cryptography::{
14 Signer as _,
15 bls12381::{
16 dkg::{self, DealerPrivMsg, DealerPubMsg, Info, Output, PlayerAck, SignedDealerLog},
17 primitives::{
18 group::Share,
19 sharing::{Mode, ModeVersion},
20 variant::MinSig,
21 },
22 },
23 ed25519::{PrivateKey, PublicKey},
24 transcript::{Summary, Transcript},
25};
26use commonware_parallel::Strategy;
27use commonware_runtime::{BufferPooler, Clock, Metrics, buffer::paged::CacheRef};
28use commonware_storage::{
29 journal::{contiguous, contiguous::Reader as _, segmented},
30 metadata,
31};
32use commonware_utils::{N3f1, NZU16, NZU32, NZU64, NZUsize, ordered};
33use eyre::{OptionExt, WrapErr as _, bail, eyre};
34use futures::{FutureExt as _, StreamExt as _, future::BoxFuture};
35use tracing::{debug, info, instrument, warn};
36
37use crate::consensus::{Digest, block::Block};
38
// Page size for the paged buffer cache (4 KiB; presumably bytes — TODO confirm).
const PAGE_SIZE: NonZeroU16 = NZU16!(1 << 12);
// Number of pages held by the buffer pool (8192).
const POOL_CAPACITY: NonZeroUsize = NZUsize!(1 << 13);
// Write buffer size for journals (4 KiB).
const WRITE_BUFFER: NonZeroUsize = NZUsize!(1 << 12);
// Read buffer size used when replaying the events journal (1 MiB).
const READ_BUFFER: NonZeroUsize = NZUsize!(1 << 20);

// Codec bound on collection sizes: at most u16::MAX validators.
const MAXIMUM_VALIDATORS: NonZeroU32 = NZU32!(u16::MAX as u32);
50
/// Returns a fresh, unconfigured [`Builder`].
pub(super) fn builder() -> Builder {
    Builder::default()
}
54
/// Durable DKG state: per-epoch [`State`] snapshots in a metadata store plus
/// a segmented journal of [`Event`]s (acks, dealings, dealer logs, finalized
/// blocks), mirrored by an in-memory per-epoch cache rebuilt on startup by
/// replaying the journal.
pub(super) struct Storage<TContext>
where
    TContext: commonware_runtime::Storage + Clock + Metrics,
{
    // Per-epoch `State`, keyed by epoch number.
    states: metadata::Metadata<TContext, u64, State>,
    // Append-only event journal; section index == epoch number.
    events: segmented::variable::Journal<TContext, Event>,

    // Most recently set state (highest stored epoch at init).
    current: State,
    // In-memory mirror of the journal plus transient, non-persisted entries
    // (notarized blocks, DKG outcomes).
    cache: BTreeMap<Epoch, Events>,
}
65
66impl<TContext> Storage<TContext>
67where
68 TContext: commonware_runtime::Storage + Clock + Metrics,
69{
70 fn acks_for_epoch(
72 &self,
73 epoch: Epoch,
74 ) -> impl Iterator<Item = (&PublicKey, &PlayerAck<PublicKey>)> {
75 self.cache
76 .get(&epoch)
77 .into_iter()
78 .flat_map(|cache| cache.acks.iter())
79 }
80
81 fn dealings_for_epoch(
83 &self,
84 epoch: Epoch,
85 ) -> impl Iterator<Item = (&PublicKey, &(DealerPubMsg<MinSig>, DealerPrivMsg))> {
86 self.cache
87 .get(&epoch)
88 .into_iter()
89 .flat_map(|cache| cache.dealings.iter())
90 }
91
92 pub(super) fn logs_for_epoch(
94 &self,
95 epoch: Epoch,
96 ) -> impl Iterator<Item = (&PublicKey, &dkg::DealerLog<MinSig, PublicKey>)> {
97 self.cache
98 .get(&epoch)
99 .into_iter()
100 .flat_map(|cache| cache.logs.iter())
101 }
102
    /// Returns a clone of the most recently set state.
    pub(super) fn current(&self) -> State {
        self.current.clone()
    }
107
    /// Persists `state` keyed by its epoch and makes it the current state.
    ///
    /// Overwrites (with a warning) any state already stored for that epoch.
    /// The metadata store is synced before `current` is swapped, so the
    /// in-memory view never gets ahead of durable storage.
    pub(super) async fn set_state(&mut self, state: State) -> eyre::Result<()> {
        if let Some(old) = self.states.put(state.epoch.get(), state.clone()) {
            warn!(epoch = %old.epoch, "overwriting existing state");
        }
        self.states.sync().await.wrap_err("failed writing state")?;
        self.current = state;
        Ok(())
    }
117
    /// Durably records a player ack for `epoch`, then mirrors it in the
    /// in-memory cache.
    ///
    /// Idempotent: acks already present in the cache are dropped without
    /// touching storage. The journal section is synced before the cache is
    /// updated, so a crash cannot leave the cache ahead of durable storage.
    #[instrument(
        skip_all,
        fields(
            %epoch,
            %player,
        ),
        err,
    )]
    async fn append_ack(
        &mut self,
        epoch: Epoch,
        player: PublicKey,
        ack: PlayerAck<PublicKey>,
    ) -> eyre::Result<()> {
        if self
            .cache
            .get(&epoch)
            .is_some_and(|events| events.acks.contains_key(&player))
        {
            info!(%player, %epoch, "ack for player already found in cache, dropping");
            return Ok(());
        }

        // The journal is sectioned by epoch number.
        let section = epoch.get();
        self.events
            .append(
                section,
                &Event::Ack {
                    player: player.clone(),
                    ack: ack.clone(),
                },
            )
            .await
            .wrap_err("unable to write event to storage")?;

        self.events
            .sync(section)
            .await
            .wrap_err("unable to sync events journal")?;

        self.cache
            .entry(epoch)
            .or_default()
            .acks
            .insert(player, ack);

        Ok(())
    }
167
    /// Durably records a dealer's (public, private) message pair for
    /// `epoch`, then mirrors it in the in-memory cache.
    ///
    /// Idempotent: dealings already present in the cache are dropped without
    /// touching storage. The journal section is synced before the cache is
    /// updated, so a crash cannot leave the cache ahead of durable storage.
    #[instrument(
        skip_all,
        fields(
            %epoch,
            %dealer,
        ),
        err,
    )]
    async fn append_dealing(
        &mut self,
        epoch: Epoch,
        dealer: PublicKey,
        pub_msg: DealerPubMsg<MinSig>,
        priv_msg: DealerPrivMsg,
    ) -> eyre::Result<()> {
        if self
            .cache
            .get(&epoch)
            .is_some_and(|events| events.dealings.contains_key(&dealer))
        {
            info!(%dealer, %epoch, "dealing of dealer already found in cache, dropping");
            return Ok(());
        }

        // The journal is sectioned by epoch number.
        let section = epoch.get();
        self.events
            .append(
                section,
                &Event::Dealing {
                    dealer: dealer.clone(),
                    public_msg: pub_msg.clone(),
                    private_msg: priv_msg.clone(),
                },
            )
            .await
            .wrap_err("unable to write event to storage")?;

        self.events
            .sync(section)
            .await
            .wrap_err("unable to sync events journal")?;

        self.cache
            .entry(epoch)
            .or_default()
            .dealings
            .insert(dealer, (pub_msg, priv_msg));

        Ok(())
    }
219
    /// Durably records a dealer's end-of-round log for `epoch`, then mirrors
    /// it in the in-memory cache.
    ///
    /// Idempotent: logs already present in the cache are dropped without
    /// touching storage. The journal section is synced before the cache is
    /// updated, so a crash cannot leave the cache ahead of durable storage.
    pub(super) async fn append_dealer_log(
        &mut self,
        epoch: Epoch,
        dealer: PublicKey,
        log: dkg::DealerLog<MinSig, PublicKey>,
    ) -> eyre::Result<()> {
        if self
            .cache
            .get(&epoch)
            .is_some_and(|events| events.logs.contains_key(&dealer))
        {
            info!(
                %dealer,
                %epoch,
                "dealer log already found in cache; dropping"
            );
            return Ok(());
        }

        // The journal is sectioned by epoch number.
        let section = epoch.get();
        self.events
            .append(
                section,
                &Event::Log {
                    dealer: dealer.clone(),
                    log: log.clone(),
                },
            )
            .await
            .wrap_err("failed to append log to journal")?;
        self.events
            .sync(section)
            .await
            .wrap_err("unable to sync journal")?;

        let cache = self.cache.entry(epoch).or_default();
        cache.logs.insert(dealer, log);
        Ok(())
    }
260
261 pub(super) async fn append_finalized_block(
263 &mut self,
264 epoch: Epoch,
265 block: Block,
266 ) -> eyre::Result<()> {
267 let height = block.height();
268 let digest = block.digest();
269 let parent = block.parent();
270 if self
271 .cache
272 .get(&epoch)
273 .is_some_and(|events| events.finalized.contains_key(&height))
274 {
275 info!(
276 %height,
277 %digest,
278 %parent,
279 "finalized block was already found in cache; dropping",
280 );
281 return Ok(());
282 }
283
284 let section = epoch.get();
285 self.events
286 .append(
287 section,
288 &Event::Finalized {
289 digest,
290 parent,
291 height,
292 },
293 )
294 .await
295 .wrap_err("failed to append finalized block to journal")?;
296 self.events
297 .sync(section)
298 .await
299 .wrap_err("unable to sync journal")?;
300
301 let cache = self.cache.entry(epoch).or_default();
302 cache.finalized.insert(
303 height,
304 FinalizedBlockInfo {
305 height: block.height(),
306 digest: block.digest(),
307 parent: block.parent_digest(),
308 },
309 );
310 Ok(())
311 }
312
313 pub(super) fn cache_dkg_outcome(
314 &mut self,
315 epoch: Epoch,
316 digest: Digest,
317 output: Output<MinSig, PublicKey>,
318 share: ShareState,
319 ) {
320 self.cache
321 .entry(epoch)
322 .or_default()
323 .dkg_outcomes
324 .insert(digest, (output, share));
325 }
326
327 pub(super) fn get_dkg_outcome(
328 &self,
329 epoch: &Epoch,
330 digest: &Digest,
331 ) -> Option<&(Output<MinSig, PublicKey>, ShareState)> {
332 self.cache
333 .get(epoch)
334 .and_then(|events| events.dkg_outcomes.get(digest))
335 }
336
337 pub(super) fn cache_notarized_block(&mut self, round: &Round, block: Block) {
343 let cache = self.cache.entry(round.epoch).or_default();
344 let log = ReducedBlock::from_block_for_round(&block, round);
345 cache.notarized_blocks.insert(log.digest, log);
346 }
347
    /// Builds a [`Dealer`] for `round` if `me` is one of the round's
    /// dealers, replaying any persisted acks for the epoch so the dealer
    /// resumes where it left off.
    ///
    /// Returns `Ok(None)` when `me` is not a dealer, or when a reshare is
    /// expected but the local share is missing (likely lost; we skip
    /// dealing and hope to receive a fresh share as a player next round).
    #[instrument(
        skip_all,
        fields(
            me = %me.public_key(),
            epoch = %round.epoch,
        )
        err,
    )]
    pub(super) fn create_dealer_for_round(
        &mut self,
        me: PrivateKey,
        round: Round,
        share: ShareState,
        seed: Summary,
    ) -> eyre::Result<Option<Dealer>> {
        if round.dealers.position(&me.public_key()).is_none() {
            return Ok(None);
        }

        // Full DKG: deal a brand-new polynomial (no prior share required).
        // Reshare: our existing share is the secret being redistributed, so
        // without it we cannot act as a dealer.
        let share = if round.is_full_dkg() {
            info!("running full DKG ceremony as dealer (new polynomial)");
            None
        } else {
            let inner = share.into_inner();
            if inner.is_none() {
                warn!(
                    "we are a dealer in this round, but we do not have a share, \
                    which means we likely lost it; will not instantiate a dealer \
                    instance and hope to get a new share in the next round if we \
                    are a player"
                );
                return Ok(None);
            }
            inner
        };

        // Dealer RNG is derived deterministically from the round seed.
        let (mut dealer, pub_msg, priv_msgs) = dkg::Dealer::start::<N3f1>(
            Transcript::resume(seed).noise(b"dealer-rng"),
            round.info.clone(),
            me.clone(),
            share,
        )
        .wrap_err("unable to start cryptographic dealer instance")?;

        // Replay persisted acks: any player that already acked does not need
        // its private message re-sent.
        let mut unsent: BTreeMap<PublicKey, DealerPrivMsg> = priv_msgs.into_iter().collect();
        for (player, ack) in self.acks_for_epoch(round.epoch) {
            if unsent.contains_key(player)
                && dealer
                    .receive_player_ack(player.clone(), ack.clone())
                    .is_ok()
            {
                unsent.remove(player);
                debug!(%player, "replayed player ack");
            }
        }

        Ok(Some(Dealer::new(Some(dealer), pub_msg, unsent)))
    }
407
    /// Builds a [`Player`] for `round` if `me` is one of the round's
    /// players, replaying all persisted dealings for the epoch into it.
    ///
    /// Returns `Ok(None)` when `me` is not a player.
    #[instrument(
        skip_all,
        fields(
            epoch = %round.epoch,
            me = %me.public_key(),
        )
        err,
    )]
    pub(super) fn create_player_for_round(
        &self,
        me: PrivateKey,
        round: &Round,
    ) -> eyre::Result<Option<Player>> {
        if round.players.position(&me.public_key()).is_none() {
            return Ok(None);
        }

        let mut player = Player::new(
            dkg::Player::new(round.info.clone(), me)
                .wrap_err("unable to start cryptographic player instance")?,
        );

        // Re-apply committed dealer messages so the player's ack cache
        // matches what was previously persisted.
        for (dealer, (pub_msg, priv_msg)) in self.dealings_for_epoch(round.epoch()) {
            player.replay(dealer.clone(), pub_msg.clone(), priv_msg.clone());
            debug!(%dealer, "replayed committed dealer message");
        }

        Ok(Some(player))
    }
439
440 pub(super) fn get_latest_finalized_block_for_epoch(
441 &self,
442 epoch: &Epoch,
443 ) -> Option<(&Height, &FinalizedBlockInfo)> {
444 self.cache
445 .get(epoch)
446 .and_then(|cache| cache.finalized.last_key_value())
447 }
448
449 pub(super) fn get_notarized_reduced_block(
450 &mut self,
451 epoch: &Epoch,
452 digest: &Digest,
453 ) -> Option<&ReducedBlock> {
454 self.cache
455 .get(epoch)
456 .and_then(|cache| cache.notarized_blocks.get(digest))
457 }
458
    /// Drops all persisted and cached data for epochs strictly below
    /// `up_to_epoch`: journal sections, state metadata entries, and the
    /// in-memory cache.
    #[instrument(skip_all, fields(%up_to_epoch), err)]
    pub(super) async fn prune(&mut self, up_to_epoch: Epoch) -> eyre::Result<()> {
        self.events
            .prune(up_to_epoch.get())
            .await
            .wrap_err("unable to prune events journal")?;
        self.states.retain(|&key, _| key >= up_to_epoch.get());
        self.states
            .sync()
            .await
            .wrap_err("unable to prune events metadata")?;
        self.cache.retain(|&epoch, _| epoch >= up_to_epoch);
        Ok(())
    }
473}
474
/// Builder for [`Storage`]; configure via `initial_state` and
/// `partition_prefix`, then call `init`.
#[derive(Default)]
pub(super) struct Builder {
    // Evaluated only if storage turns out to be empty at init time.
    initial_state: Option<BoxFuture<'static, eyre::Result<State>>>,
    partition_prefix: Option<String>,
}
480
481impl Builder {
482 pub(super) fn initial_state(
483 self,
484 initial_state: impl Future<Output = eyre::Result<State>> + Send + 'static,
485 ) -> Self {
486 Self {
487 initial_state: Some(initial_state.boxed()),
488 ..self
489 }
490 }
491
492 pub(super) fn partition_prefix(self, partition_prefix: &str) -> Self {
493 Self {
494 partition_prefix: Some(partition_prefix.to_string()),
495 ..self
496 }
497 }
498
499 #[instrument(skip_all, err)]
500 pub(super) async fn init<TContext>(self, context: TContext) -> eyre::Result<Storage<TContext>>
501 where
502 TContext: BufferPooler + commonware_runtime::Storage + Clock + Metrics,
503 {
504 let Self {
505 initial_state,
506 partition_prefix,
507 } = self;
508 let partition_prefix =
509 partition_prefix.ok_or_eyre("DKG actors state must have its partition prefix set")?;
510
511 let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, POOL_CAPACITY);
512
513 let states_partition = format!("{partition_prefix}_states");
514 let states_metadata_partition = format!("{partition_prefix}_states_metadata");
515
516 let mut states = metadata::Metadata::init(
517 context.with_label("states"),
518 metadata::Config {
519 partition: states_metadata_partition,
520 codec_config: MAXIMUM_VALIDATORS,
521 },
522 )
523 .await
524 .wrap_err("unable to initialize DKG states metadata")?;
525
526 migrate_journal_to_metadata_if_necessary(
527 &context,
528 &mut states,
529 &states_partition,
530 &page_cache,
531 )
532 .await
533 .wrap_err("migrating legacy states journal failed")?;
534
535 if states.keys().max().is_none() {
536 let initial_state = match initial_state {
537 None => {
538 return Err(eyre!(
539 "states metadata was empty, legacy journal was empty, \
540 and initializer was not set"
541 ));
542 }
543 Some(initial_state) => initial_state
544 .await
545 .wrap_err("failed constructing initial state to populate storage")?,
546 };
547 states
548 .put_sync(initial_state.epoch.get(), initial_state)
549 .await
550 .wrap_err("unable to write initial state to metadata")?;
551 }
552
553 let current = states
554 .keys()
555 .max()
556 .map(|epoch| {
557 states
558 .get(epoch)
559 .expect("state at keys iterator must exist")
560 .clone()
561 })
562 .expect("states storage must contain a state after initialization");
563
564 let events = segmented::variable::Journal::init(
565 context.with_label("events"),
566 segmented::variable::Config {
567 partition: format!("{partition_prefix}_events"),
568 compression: None,
569 codec_config: MAXIMUM_VALIDATORS,
570 page_cache,
571 write_buffer: WRITE_BUFFER,
572 },
573 )
574 .await
575 .expect("should be able to initialize events journal");
576
577 let mut cache = BTreeMap::<Epoch, Events>::new();
579 {
580 let replay = events
581 .replay(0, 0, READ_BUFFER)
582 .await
583 .wrap_err("unable to start a replay stream to populate events cache")?;
584 futures::pin_mut!(replay);
585
586 while let Some(result) = replay.next().await {
587 let (section, _, _, event) =
588 result.wrap_err("unable to read entry in replay stream")?;
589 let epoch = Epoch::new(section);
590 let events = cache.entry(epoch).or_default();
591 events.insert(event);
592 }
593 }
594
595 Ok(Storage {
596 states,
597 events,
598 current,
599 cache,
600 })
601 }
602}
603
/// One-time migration: copies the last two states out of the legacy
/// journal-based format into the metadata store, then destroys the legacy
/// journal.
///
/// No-op (other than destroying the legacy journal) when `states` is
/// already populated or the legacy journal is empty.
async fn migrate_journal_to_metadata_if_necessary<TContext>(
    context: &TContext,
    states: &mut metadata::Metadata<TContext, u64, State>,
    states_partition: &str,
    page_cache: &CacheRef,
) -> eyre::Result<()>
where
    TContext: BufferPooler + commonware_runtime::Storage + Clock + Metrics,
{
    if states.keys().next().is_some() {
        debug!("states already exists in new format; not migrating");
        return Ok(());
    }

    let legacy_journal = contiguous::variable::Journal::<TContext, LegacyState>::init(
        context.with_label("states_legacy"),
        contiguous::variable::Config {
            partition: states_partition.to_string(),
            compression: None,
            codec_config: MAXIMUM_VALIDATORS,
            page_cache: page_cache.clone(),
            write_buffer: WRITE_BUFFER,
            items_per_section: NZU64!(1),
        },
    )
    .await
    .wrap_err("unable to initialize legacy DKG states journal for migration")?;

    // `checked_sub` doubles as the emptiness check (size == 0 -> None).
    if let Some(latest_segment) = legacy_journal.size().await.checked_sub(1) {
        info!(
            latest_segment,
            "legacy journal contains states; migrating last 2 segments",
        );

        // Only the last two segments are carried forward; older epochs are
        // intentionally dropped.
        let reader = legacy_journal.reader().await;
        for segment in latest_segment.saturating_sub(1)..=latest_segment {
            let legacy_state = reader
                .read(segment)
                .await
                .wrap_err("unable to read state from legacy journal")?;
            let state: State = legacy_state.into();
            let epoch = state.epoch;
            states.put(epoch.get(), state);
            info!(%epoch, "migrated state to new format");
        }
        states
            .sync()
            .await
            .wrap_err("unable to persist migrated states")?;

        info!("states migrated to new format, deleting journal");
    }

    // Destroyed unconditionally: even an empty legacy journal should not
    // linger on disk.
    legacy_journal
        .destroy()
        .await
        .wrap_err("unable to destroy legacy states journal")?;

    Ok(())
}
664
/// How the local share is held. Currently only plaintext; the enum (and its
/// codec tag byte) leaves room for additional representations later.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(super) enum ShareState {
    // `None` means we have no share (e.g. it was lost or never received).
    Plaintext(Option<Share>),
}
676
677impl ShareState {
678 pub(super) fn into_inner(self) -> Option<Share> {
679 match self {
680 Self::Plaintext(share) => share,
681 }
682 }
683}
684
685impl EncodeSize for ShareState {
686 fn encode_size(&self) -> usize {
687 match self {
688 Self::Plaintext(share) => 1 + share.encode_size(),
689 }
690 }
691}
692
693impl Write for ShareState {
694 fn write(&self, buf: &mut impl bytes::BufMut) {
695 match self {
696 Self::Plaintext(share) => {
697 0u8.write(buf);
698 share.write(buf);
699 }
700 }
701 }
702}
703
704impl Read for ShareState {
705 type Cfg = ();
706
707 fn read_cfg(
708 buf: &mut impl bytes::Buf,
709 _cfg: &Self::Cfg,
710 ) -> Result<Self, commonware_codec::Error> {
711 let tag = u8::read(buf)?;
712 match tag {
713 0 => Ok(Self::Plaintext(ReadExt::read(buf)?)),
714 other => Err(commonware_codec::Error::InvalidEnum(other)),
715 }
716 }
717}
718
719#[derive(Clone, Debug, PartialEq, Eq)]
721pub(super) struct State {
722 pub(super) epoch: Epoch,
723 pub(super) seed: Summary,
724 pub(super) output: Output<MinSig, PublicKey>,
725 pub(super) share: ShareState,
726 pub(super) players: ordered::Set<PublicKey>,
727 pub(super) syncers: ordered::Set<PublicKey>,
728 pub(super) is_full_dkg: bool,
729}
730
impl State {
    /// Dealers for the next round: the player set of the previous output.
    pub(super) fn dealers(&self) -> &ordered::Set<PublicKey> {
        self.output.players()
    }

    /// Players configured for the next round.
    pub(super) fn players(&self) -> &ordered::Set<PublicKey> {
        &self.players
    }
}
742
743impl EncodeSize for State {
744 fn encode_size(&self) -> usize {
745 self.epoch.encode_size()
746 + self.seed.encode_size()
747 + self.output.encode_size()
748 + self.share.encode_size()
749 + self.players.encode_size()
750 + self.syncers.encode_size()
751 + self.is_full_dkg.encode_size()
752 }
753}
754
impl Write for State {
    // Field order must match `Read::read_cfg` and `EncodeSize`.
    fn write(&self, buf: &mut impl bytes::BufMut) {
        self.epoch.write(buf);
        self.seed.write(buf);
        self.output.write(buf);
        self.share.write(buf);
        self.players.write(buf);
        self.syncers.write(buf);
        self.is_full_dkg.write(buf);
    }
}
766
impl Read for State {
    // Maximum validator count, forwarded to the output codec.
    type Cfg = NonZeroU32;

    fn read_cfg(
        buf: &mut impl bytes::Buf,
        cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        Ok(Self {
            epoch: ReadExt::read(buf)?,
            seed: ReadExt::read(buf)?,
            output: Read::read_cfg(buf, &(*cfg, ModeVersion::v0()))?,
            share: ReadExt::read(buf)?,
            // Players must be non-empty; syncers may legitimately be empty
            // (lower bound 0 — exercised by the round-trip test).
            players: Read::read_cfg(buf, &(RangeCfg::from(1..=(u16::MAX as usize)), ()))?,
            syncers: Read::read_cfg(buf, &(RangeCfg::from(0..=(u16::MAX as usize)), ()))?,
            is_full_dkg: ReadExt::read(buf)?,
        })
    }
}
787
/// On-disk layout of the pre-migration states journal. Kept only so old
/// data can be decoded and converted (`From<LegacyState> for State`).
#[derive(Clone)]
struct LegacyState {
    epoch: Epoch,
    seed: Summary,
    output: Output<MinSig, PublicKey>,
    share: Option<Share>,
    // The legacy format stored socket addresses alongside keys; the new
    // `State` keeps only the keys.
    dealers: ordered::Map<PublicKey, SocketAddr>,
    players: ordered::Map<PublicKey, SocketAddr>,
    syncers: ordered::Map<PublicKey, SocketAddr>,
    is_full_dkg: bool,
}
804
805impl EncodeSize for LegacyState {
806 fn encode_size(&self) -> usize {
807 self.epoch.encode_size()
808 + self.seed.encode_size()
809 + self.output.encode_size()
810 + self.share.encode_size()
811 + self.dealers.encode_size()
812 + self.players.encode_size()
813 + self.syncers.encode_size()
814 + self.is_full_dkg.encode_size()
815 }
816}
817
impl Write for LegacyState {
    // Field order must match `Read::read_cfg` and `EncodeSize`.
    fn write(&self, buf: &mut impl bytes::BufMut) {
        self.epoch.write(buf);
        self.seed.write(buf);
        self.output.write(buf);
        self.share.write(buf);
        self.dealers.write(buf);
        self.players.write(buf);
        self.syncers.write(buf);
        self.is_full_dkg.write(buf);
    }
}
830
impl Read for LegacyState {
    // Maximum validator count, forwarded to the output codec.
    type Cfg = NonZeroU32;

    fn read_cfg(
        buf: &mut impl bytes::Buf,
        cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        Ok(Self {
            epoch: ReadExt::read(buf)?,
            seed: ReadExt::read(buf)?,
            output: Read::read_cfg(buf, &(*cfg, ModeVersion::v0()))?,
            share: ReadExt::read(buf)?,
            // Unlike the new `State`, all three legacy maps require at
            // least one entry.
            dealers: Read::read_cfg(buf, &(RangeCfg::from(1..=(u16::MAX as usize)), (), ()))?,
            players: Read::read_cfg(buf, &(RangeCfg::from(1..=(u16::MAX as usize)), (), ()))?,
            syncers: Read::read_cfg(buf, &(RangeCfg::from(1..=(u16::MAX as usize)), (), ()))?,
            is_full_dkg: ReadExt::read(buf)?,
        })
    }
}
850
851impl From<LegacyState> for State {
852 fn from(legacy: LegacyState) -> Self {
853 Self {
854 epoch: legacy.epoch,
855 seed: legacy.seed,
856 output: legacy.output,
857 share: ShareState::Plaintext(legacy.share),
858 players: legacy.players.keys().clone(),
859 syncers: legacy.syncers.keys().clone(),
860 is_full_dkg: legacy.is_full_dkg,
861 }
862 }
863}
864
/// Identity of a finalized block: height, digest, and parent digest.
#[expect(
    dead_code,
    reason = "tracking this data is virtually free and might become useful later"
)]
#[derive(Clone, Debug)]
pub(super) struct FinalizedBlockInfo {
    pub(super) height: Height,
    pub(super) digest: Digest,
    pub(super) parent: Digest,
}
875
/// Per-epoch in-memory event cache.
///
/// The `BTreeMap` fields mirror the durable journal and are rebuilt from it
/// via `Events::insert` on replay; the `HashMap` fields are transient and
/// only ever populated directly (never journaled).
#[derive(Debug, Default)]
struct Events {
    acks: BTreeMap<PublicKey, PlayerAck<PublicKey>>,
    dealings: BTreeMap<PublicKey, (DealerPubMsg<MinSig>, DealerPrivMsg)>,
    logs: BTreeMap<PublicKey, dkg::DealerLog<MinSig, PublicKey>>,
    finalized: BTreeMap<Height, FinalizedBlockInfo>,

    // Transient: filled by `cache_notarized_block` / `cache_dkg_outcome`.
    notarized_blocks: HashMap<Digest, ReducedBlock>,
    dkg_outcomes: HashMap<Digest, (Output<MinSig, PublicKey>, ShareState)>,
}
887
888impl Events {
889 fn insert(&mut self, event: Event) {
890 match event {
891 Event::Dealing {
892 dealer: public_key,
893 public_msg,
894 private_msg,
895 } => {
896 self.dealings.insert(public_key, (public_msg, private_msg));
897 }
898 Event::Ack {
899 player: public_key,
900 ack,
901 } => {
902 self.acks.insert(public_key, ack);
903 }
904 Event::Log { dealer, log } => {
905 self.logs.insert(dealer, log);
906 }
907 Event::Finalized {
908 digest,
909 parent,
910 height,
911 } => {
912 self.finalized.insert(
913 height,
914 FinalizedBlockInfo {
915 height,
916 digest,
917 parent,
918 },
919 );
920 }
921 }
922 }
923}
924
/// Journaled DKG occurrence. Codec tag bytes (kept in sync across `Write`,
/// `Read`, and `EncodeSize`): 0 = Dealing, 1 = Ack, 2 = Log, 3 = Finalized.
enum Event {
    // A dealer's message pair committed by the local player.
    Dealing {
        dealer: PublicKey,
        public_msg: DealerPubMsg<MinSig>,
        private_msg: DealerPrivMsg,
    },
    // A player's ack received by the local dealer.
    Ack {
        player: PublicKey,
        ack: PlayerAck<PublicKey>,
    },
    // A dealer's end-of-round log.
    Log {
        dealer: PublicKey,
        log: dkg::DealerLog<MinSig, PublicKey>,
    },
    // A finalized block observed during the epoch.
    Finalized {
        digest: Digest,
        parent: Digest,
        height: Height,
    },
}
949
950impl EncodeSize for Event {
951 fn encode_size(&self) -> usize {
952 1 + match self {
953 Self::Dealing {
954 dealer: public_key,
955 public_msg,
956 private_msg,
957 } => public_key.encode_size() + public_msg.encode_size() + private_msg.encode_size(),
958 Self::Ack {
959 player: public_key,
960 ack,
961 } => public_key.encode_size() + ack.encode_size(),
962 Self::Log { dealer, log } => dealer.encode_size() + log.encode_size(),
963 Self::Finalized {
964 digest,
965 parent,
966 height,
967 } => digest.encode_size() + parent.encode_size() + height.encode_size(),
968 }
969 }
970}
971
impl Write for Event {
    // Tag bytes must stay in sync with `Read::read_cfg` and `EncodeSize`.
    fn write(&self, buf: &mut impl bytes::BufMut) {
        match self {
            Self::Dealing {
                dealer: public_key,
                public_msg,
                private_msg,
            } => {
                0u8.write(buf);
                public_key.write(buf);
                public_msg.write(buf);
                private_msg.write(buf);
            }
            Self::Ack {
                player: public_key,
                ack,
            } => {
                1u8.write(buf);
                public_key.write(buf);
                ack.write(buf);
            }
            Self::Log { dealer, log } => {
                2u8.write(buf);
                dealer.write(buf);
                log.write(buf);
            }
            Self::Finalized {
                digest,
                parent,
                height,
            } => {
                3u8.write(buf);
                digest.write(buf);
                parent.write(buf);
                height.write(buf);
            }
        }
    }
}
1011
impl Read for Event {
    // Maximum validator count, forwarded to the dealing codec.
    type Cfg = NonZeroU32;

    fn read_cfg(
        buf: &mut impl bytes::Buf,
        cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        // A leading tag byte selects the variant; must match `Write`.
        let tag = u8::read(buf)?;
        match tag {
            0 => Ok(Self::Dealing {
                dealer: ReadExt::read(buf)?,
                public_msg: Read::read_cfg(buf, cfg)?,
                private_msg: ReadExt::read(buf)?,
            }),
            1 => Ok(Self::Ack {
                player: ReadExt::read(buf)?,
                ack: ReadExt::read(buf)?,
            }),
            2 => Ok(Self::Log {
                dealer: ReadExt::read(buf)?,
                // NOTE(review): uses a fixed u16::MAX bound instead of
                // `cfg` (which callers set to the same value via
                // MAXIMUM_VALIDATORS) — presumably intentional; confirm.
                log: Read::read_cfg(buf, &NZU32!(u16::MAX as u32))?,
            }),
            3 => Ok(Self::Finalized {
                digest: ReadExt::read(buf)?,
                parent: ReadExt::read(buf)?,
                height: ReadExt::read(buf)?,
            }),
            other => Err(commonware_codec::Error::InvalidEnum(other)),
        }
    }
}
1043
/// Dealer-side round state: the live cryptographic dealer (until
/// finalization), its public message, the private messages still awaiting
/// acks, and — once produced — the finalized signed log.
pub(super) struct Dealer {
    // `None` once `finalize` has consumed the instance (or if the dealer
    // was constructed without one).
    dealer: Option<dkg::Dealer<MinSig, PrivateKey>>,

    // Broadcast alongside every private message in `shares_to_distribute`.
    pub_msg: DealerPubMsg<MinSig>,

    // Per-player private messages; entries are removed as acks arrive.
    unsent: BTreeMap<PublicKey, DealerPrivMsg>,

    // Set once by `finalize`; cloned by `finalized`, moved out by
    // `take_finalized`.
    finalized: Option<SignedDealerLog<MinSig, PrivateKey>>,
}
1063
impl Dealer {
    /// Wraps an optional live dealer instance together with its public
    /// message and the per-player private messages not yet acknowledged.
    pub(super) const fn new(
        dealer: Option<dkg::Dealer<MinSig, PrivateKey>>,
        pub_msg: DealerPubMsg<MinSig>,
        unsent: BTreeMap<PublicKey, DealerPrivMsg>,
    ) -> Self {
        Self {
            dealer,
            pub_msg,
            unsent,
            finalized: None,
        }
    }

    /// Applies a player's ack to the live dealer and persists it.
    ///
    /// Errors when no private message is pending for `player` (i.e. their
    /// ack was already processed) or when the dealer was already finalized.
    pub(super) async fn receive_ack<TContext>(
        &mut self,
        storage: &mut Storage<TContext>,
        epoch: Epoch,
        player: PublicKey,
        ack: PlayerAck<PublicKey>,
    ) -> eyre::Result<()>
    where
        TContext: commonware_runtime::Storage + Clock + Metrics,
    {
        if !self.unsent.contains_key(&player) {
            bail!("already received an ack from `{player}`");
        }
        match &mut self.dealer {
            Some(dealer) => {
                // NOTE(review): the ack mutates the in-memory dealer before
                // it is persisted; if the journal append below fails, the
                // dealer has consumed an ack storage never recorded —
                // confirm callers treat that error as fatal for the round.
                dealer
                    .receive_player_ack(player.clone(), ack.clone())
                    .wrap_err("unable to receive player ack")?;
                self.unsent.remove(&player);
                storage
                    .append_ack(epoch, player.clone(), ack.clone())
                    .await
                    .wrap_err("unable to append ack to journal")?;
            }
            None => bail!("dealer was already finalized, dropping ack of player `{player}`"),
        }
        Ok(())
    }

    /// Finalizes the dealer, caching its signed log. Idempotent: further
    /// calls (or calls without a live dealer) are no-ops.
    pub(super) fn finalize(&mut self) {
        if self.finalized.is_some() {
            return;
        }

        // Taking the instance means no further acks can be applied.
        if let Some(dealer) = self.dealer.take() {
            let log = dealer.finalize::<N3f1>();
            self.finalized = Some(log);
        }
    }

    /// Returns a copy of the finalized log, if `finalize` produced one.
    pub(super) fn finalized(&self) -> Option<SignedDealerLog<MinSig, PrivateKey>> {
        self.finalized.clone()
    }

    /// Moves the finalized log out, leaving `None` behind.
    pub(super) const fn take_finalized(&mut self) -> Option<SignedDealerLog<MinSig, PrivateKey>> {
        self.finalized.take()
    }

    /// Iterates over the (player, public msg, private msg) triples that
    /// still need to be (re)sent — i.e. players that have not acked yet.
    pub(super) fn shares_to_distribute(
        &self,
    ) -> impl Iterator<Item = (PublicKey, DealerPubMsg<MinSig>, DealerPrivMsg)> + '_ {
        self.unsent
            .iter()
            .map(|(player, priv_msg)| (player.clone(), self.pub_msg.clone(), priv_msg.clone()))
    }
}
1147
/// Parameters of a single DKG round, derived from an epoch's [`State`].
#[derive(Clone, Debug)]
pub(super) struct Round {
    epoch: Epoch,
    // Cryptographic round info shared by dealer and player instances.
    info: dkg::Info<MinSig, PublicKey>,
    dealers: ordered::Set<PublicKey>,
    players: ordered::Set<PublicKey>,
    // Full DKG (fresh polynomial) vs reshare of the previous output.
    is_full_dkg: bool,
}
1156
impl Round {
    /// Derives the round parameters from `state`: a full DKG starts from
    /// scratch (no previous output), a reshare threads the previous output
    /// through. Dealers come from the previous output's player set, players
    /// from the state's configured player set.
    ///
    /// # Panics
    /// Panics if the round info cannot be constructed; for any persisted
    /// state this is treated as an invariant violation.
    pub(super) fn from_state(state: &State, namespace: &[u8]) -> Self {
        let previous_output = if state.is_full_dkg {
            None
        } else {
            Some(state.output.clone())
        };

        let dealers = state.dealers().clone();
        let players = state.players().clone();

        Self {
            epoch: state.epoch,
            info: Info::new::<N3f1>(
                namespace,
                state.epoch.get(),
                previous_output,
                Mode::NonZeroCounter,
                dealers.clone(),
                players.clone(),
            )
            .expect("a DKG round must always be initializable given some epoch state"),
            dealers,
            players,
            is_full_dkg: state.is_full_dkg,
        }
    }

    /// Cryptographic round info shared by dealers and players.
    pub(super) fn info(&self) -> &dkg::Info<MinSig, PublicKey> {
        &self.info
    }

    /// Epoch this round runs in.
    pub(super) fn epoch(&self) -> Epoch {
        self.epoch
    }

    /// Dealer set for this round.
    pub(super) fn dealers(&self) -> &ordered::Set<PublicKey> {
        &self.dealers
    }

    /// Player set for this round.
    pub(super) fn players(&self) -> &ordered::Set<PublicKey> {
        &self.players
    }

    /// Whether this round is a full DKG rather than a reshare.
    pub(super) fn is_full_dkg(&self) -> bool {
        self.is_full_dkg
    }
}
1206
/// Player-side round state: the cryptographic player plus the acks it has
/// already produced, keyed by dealer (makes processing idempotent).
pub(super) struct Player {
    player: dkg::Player<MinSig, PrivateKey>,
    acks: BTreeMap<PublicKey, PlayerAck<PublicKey>>,
}
1214
impl Player {
    /// Wraps a cryptographic player with an empty ack cache.
    pub(super) const fn new(player: dkg::Player<MinSig, PrivateKey>) -> Self {
        Self {
            player,
            acks: BTreeMap::new(),
        }
    }

    /// Processes a dealer's messages, persisting the dealing and returning
    /// the resulting ack.
    ///
    /// Idempotent per dealer: a cached ack is returned without touching the
    /// player instance or storage.
    pub(super) async fn receive_dealing<TContext>(
        &mut self,
        storage: &mut Storage<TContext>,
        epoch: Epoch,
        dealer: PublicKey,
        pub_msg: DealerPubMsg<MinSig>,
        priv_msg: DealerPrivMsg,
    ) -> eyre::Result<PlayerAck<PublicKey>>
    where
        TContext: commonware_runtime::Storage + Clock + Metrics,
    {
        if let Some(ack) = self.acks.get(&dealer) {
            return Ok(ack.clone());
        }

        // A dealer message that yields no ack is rejected and not persisted.
        let ack = self
            .player
            .dealer_message::<N3f1>(dealer.clone(), pub_msg.clone(), priv_msg.clone())
            .ok_or_eyre(
                "applying dealer message to player instance did not result in a usable ack",
            )?;
        storage
            .append_dealing(epoch, dealer.clone(), pub_msg, priv_msg)
            .await
            .wrap_err("unable to append dealing to journal")?;
        self.acks.insert(dealer, ack.clone());
        Ok(ack)
    }

    /// Re-applies a previously persisted dealing without touching storage
    /// (used when rebuilding a player from the journal on startup).
    fn replay(
        &mut self,
        dealer: PublicKey,
        pub_msg: DealerPubMsg<MinSig>,
        priv_msg: DealerPrivMsg,
    ) {
        if self.acks.contains_key(&dealer) {
            return;
        }
        if let Some(ack) = self
            .player
            .dealer_message::<N3f1>(dealer.clone(), pub_msg, priv_msg)
        {
            self.acks.insert(dealer, ack);
        }
    }

    /// Consumes the player, combining the dealer `logs` into the round's
    /// output and this player's share.
    pub(super) fn finalize(
        self,
        logs: BTreeMap<PublicKey, dkg::DealerLog<MinSig, PublicKey>>,
        strategy: &impl Strategy,
    ) -> Result<(Output<MinSig, PublicKey>, Share), dkg::Error> {
        self.player.finalize::<N3f1>(logs, strategy)
    }
}
1285
/// The subset of a notarized block the DKG tracks, plus an optional dealer
/// log extracted and verified from the block header's extraData.
#[derive(Clone, Debug)]
pub(super) struct ReducedBlock {
    pub(super) height: Height,

    pub(super) parent: Digest,

    pub(super) digest: Digest,

    // Present only if extraData decoded AND passed `check` for the round.
    pub(super) log: Option<(PublicKey, dkg::DealerLog<MinSig, PublicKey>)>,
}
1301
impl ReducedBlock {
    /// Reduces `block` to the fields the DKG cares about, attempting to
    /// decode and verify a signed dealer log from the header's extraData.
    ///
    /// Decoding or verification failures are logged and yield `log: None`;
    /// they never fail the reduction itself.
    pub(super) fn from_block_for_round(block: &Block, round: &Round) -> Self {
        let log = if block.header().extra_data().is_empty() {
            None
        } else {
            // NOTE(review): `NZU32!` panics on zero, so this assumes the
            // round's player set is never empty — presumably guaranteed by
            // round construction; confirm.
            dkg::SignedDealerLog::<MinSig, PrivateKey>::read_cfg(
                &mut block.header().extra_data().as_ref(),
                &NZU32!(round.players.len() as u32),
            )
            .inspect(|_| {
                info!(
                    height = %block.height(),
                    digest = %block.digest(),
                    "found dealer log in block"
                )
            })
            .inspect_err(|error| {
                warn!(
                    %error,
                    "block header extraData had data, but it could not be read as \
                    a signed dealer log",
                )
            })
            .ok()
            .and_then(|log| match log.check(&round.info) {
                Some((dealer, log)) => Some((dealer, log)),
                None => {
                    warn!("log failed check against current round");
                    None
                }
            })
        };
        Self {
            height: block.height(),
            parent: block.parent(),
            digest: block.digest(),
            log,
        }
    }
}
1343
1344#[cfg(test)]
1345mod tests {
1346 use super::*;
1347 use commonware_codec::Encode as _;
1348 use commonware_cryptography::{
1349 bls12381::{dkg, primitives::sharing::Mode},
1350 ed25519::PrivateKey,
1351 transcript::Summary,
1352 };
1353 use commonware_math::algebra::Random as _;
1354 use commonware_runtime::{Runner as _, deterministic};
1355 use commonware_utils::TryFromIterator as _;
1356
    /// Builds a deterministic 3-validator `State` for `epoch` (reshare
    /// mode, no local share).
    fn make_test_state(rng: &mut impl rand_core::CryptoRngCore, epoch: u64) -> State {
        // Seeds are offset by epoch so distinct epochs get distinct keys.
        let mut keys: Vec<_> = (0..3)
            .map(|i| PrivateKey::from_seed(i + epoch * 100))
            .collect();
        keys.sort_by_key(|k| k.public_key());
        let pubkeys = ordered::Set::try_from_iter(keys.iter().map(|k| k.public_key())).unwrap();

        let (output, _shares) =
            dkg::deal::<_, _, N3f1>(&mut *rng, Mode::NonZeroCounter, pubkeys.clone()).unwrap();

        State {
            epoch: Epoch::new(epoch),
            seed: Summary::random(rng),
            output,
            share: ShareState::Plaintext(None),
            players: pubkeys.clone(),
            syncers: pubkeys,
            is_full_dkg: false,
        }
    }
1377
1378 fn make_legacy_state(rng: &mut impl rand_core::CryptoRngCore, epoch: u64) -> LegacyState {
1379 let mut keys: Vec<_> = (0..3)
1380 .map(|i| PrivateKey::from_seed(i + epoch * 100))
1381 .collect();
1382 keys.sort_by_key(|k| k.public_key());
1383 let pubkeys = ordered::Set::try_from_iter(keys.iter().map(|k| k.public_key())).unwrap();
1384
1385 let (output, _shares) =
1386 dkg::deal::<_, _, N3f1>(&mut *rng, Mode::NonZeroCounter, pubkeys.clone()).unwrap();
1387
1388 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
1389 let peers =
1390 ordered::Map::from_iter_dedup(pubkeys.iter().map(|k: &PublicKey| (k.clone(), addr)));
1391
1392 LegacyState {
1393 epoch: Epoch::new(epoch),
1394 seed: Summary::random(rng),
1395 output,
1396 share: None,
1397 dealers: peers.clone(),
1398 players: peers.clone(),
1399 syncers: peers,
1400 is_full_dkg: false,
1401 }
1402 }
1403
1404 #[test]
1405 fn state_round_trip_with() {
1406 let executor = deterministic::Runner::default();
1407 executor.start(|mut context| async move {
1408 let state = make_test_state(&mut context, 0);
1409 let mut bytes = state.encode();
1410 assert_eq!(
1411 state,
1412 State::read_cfg(&mut bytes, &NZU32!(u32::MAX)).unwrap(),
1413 );
1414
1415 let state_without_syncers = {
1416 let mut s = make_test_state(&mut context, 0);
1417 s.syncers = Default::default();
1418 s
1419 };
1420 let mut bytes = state_without_syncers.encode();
1421 assert_eq!(
1422 state_without_syncers,
1423 State::read_cfg(&mut bytes, &NZU32!(u32::MAX)).unwrap(),
1424 );
1425 });
1426 }
1427
1428 #[test]
1429 fn states_migration_migrates_last_two() {
1430 let executor = deterministic::Runner::default();
1431 executor.start(|mut context| async move {
1432 let partition_prefix = "test_dkg";
1433 let states_partition = format!("{partition_prefix}_states");
1434 let states_metadata_partition = format!("{partition_prefix}_states_metadata");
1435 let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, POOL_CAPACITY);
1436
1437 let ancient_legacy = make_legacy_state(&mut context, 1);
1438 let previous_legacy = make_legacy_state(&mut context, 2);
1439 let latest_legacy = make_legacy_state(&mut context, 3);
1440 let previous_expected: State = previous_legacy.clone().into();
1441 let latest_expected: State = latest_legacy.clone().into();
1442
1443 {
1445 let journal = contiguous::variable::Journal::<_, LegacyState>::init(
1446 context.with_label("journal_setup"),
1447 contiguous::variable::Config {
1448 partition: states_partition.clone(),
1449 compression: None,
1450 codec_config: MAXIMUM_VALIDATORS,
1451 page_cache: page_cache.clone(),
1452 write_buffer: WRITE_BUFFER,
1453 items_per_section: NZU64!(1),
1454 },
1455 )
1456 .await
1457 .unwrap();
1458
1459 journal.append(&ancient_legacy).await.unwrap();
1460 journal.append(&previous_legacy).await.unwrap();
1461 journal.append(&latest_legacy).await.unwrap();
1462 journal.sync().await.unwrap();
1463 }
1464
1465 let mut states = metadata::Metadata::init(
1466 context.with_label("states"),
1467 metadata::Config {
1468 partition: states_metadata_partition.clone(),
1469 codec_config: MAXIMUM_VALIDATORS,
1470 },
1471 )
1472 .await
1473 .unwrap();
1474
1475 migrate_journal_to_metadata_if_necessary(
1476 &context,
1477 &mut states,
1478 &states_partition,
1479 &page_cache,
1480 )
1481 .await
1482 .unwrap();
1483
1484 assert!(
1486 states.get(&1).is_none(),
1487 "ancient state should not be migrated"
1488 );
1489 assert_eq!(
1490 &previous_expected,
1491 states.get(&previous_expected.epoch.get()).unwrap(),
1492 );
1493 assert_eq!(
1494 &latest_expected,
1495 states.get(&latest_expected.epoch.get()).unwrap(),
1496 );
1497
1498 let reopened = contiguous::variable::Journal::<_, LegacyState>::init(
1500 context.with_label("journal_verify"),
1501 contiguous::variable::Config {
1502 partition: states_partition.clone(),
1503 compression: None,
1504 codec_config: MAXIMUM_VALIDATORS,
1505 page_cache: page_cache.clone(),
1506 write_buffer: WRITE_BUFFER,
1507 items_per_section: NZU64!(1),
1508 },
1509 )
1510 .await
1511 .unwrap();
1512
1513 assert_eq!(reopened.size().await, 0);
1514
1515 drop(states);
1517 let states = metadata::Metadata::<_, u64, State>::init(
1518 context.with_label("states_reopen"),
1519 metadata::Config {
1520 partition: states_metadata_partition,
1521 codec_config: MAXIMUM_VALIDATORS,
1522 },
1523 )
1524 .await
1525 .unwrap();
1526 assert_eq!(
1527 &previous_expected,
1528 states.get(&previous_expected.epoch.get()).unwrap(),
1529 );
1530 assert_eq!(
1531 &latest_expected,
1532 states.get(&latest_expected.epoch.get()).unwrap(),
1533 );
1534 });
1535 }
1536
1537 #[test]
1538 fn states_migration_single_entry() {
1539 let executor = deterministic::Runner::default();
1540 executor.start(|mut context| async move {
1541 let partition_prefix = "test_dkg_single";
1542 let states_partition = format!("{partition_prefix}_states");
1543 let states_metadata_partition = format!("{partition_prefix}_states_metadata");
1544 let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, POOL_CAPACITY);
1545
1546 let only_legacy = make_legacy_state(&mut context, 5);
1547 let only_expected: State = only_legacy.clone().into();
1548
1549 {
1551 let journal = contiguous::variable::Journal::<_, LegacyState>::init(
1552 context.with_label("journal_setup"),
1553 contiguous::variable::Config {
1554 partition: states_partition.clone(),
1555 compression: None,
1556 codec_config: MAXIMUM_VALIDATORS,
1557 page_cache: page_cache.clone(),
1558 write_buffer: WRITE_BUFFER,
1559 items_per_section: NZU64!(1),
1560 },
1561 )
1562 .await
1563 .unwrap();
1564
1565 journal.append(&only_legacy).await.unwrap();
1566 journal.sync().await.unwrap();
1567 }
1568
1569 let mut states = metadata::Metadata::init(
1570 context.with_label("states"),
1571 metadata::Config {
1572 partition: states_metadata_partition,
1573 codec_config: MAXIMUM_VALIDATORS,
1574 },
1575 )
1576 .await
1577 .unwrap();
1578
1579 migrate_journal_to_metadata_if_necessary(
1580 &context,
1581 &mut states,
1582 &states_partition,
1583 &page_cache,
1584 )
1585 .await
1586 .unwrap();
1587
1588 assert_eq!(states.keys().count(), 1);
1590 assert_eq!(
1591 &only_expected,
1592 states.get(&only_expected.epoch.get()).unwrap(),
1593 );
1594 });
1595 }
1596
1597 #[test]
1598 fn states_migration_does_not_overwrite_existing_metadata() {
1599 let executor = deterministic::Runner::default();
1600 executor.start(|mut context| async move {
1601 let partition_prefix = "test_dkg_noop";
1602 let states_partition = format!("{partition_prefix}_states");
1603 let states_metadata_partition = format!("{partition_prefix}_states_metadata");
1604 let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, POOL_CAPACITY);
1605
1606 let existing_state = make_test_state(&mut context, 10);
1607 let journal_legacy = make_legacy_state(&mut context, 20);
1608 let journal_expected_epoch = journal_legacy.epoch.get();
1609
1610 let mut states = metadata::Metadata::init(
1612 context.with_label("states"),
1613 metadata::Config {
1614 partition: states_metadata_partition.clone(),
1615 codec_config: MAXIMUM_VALIDATORS,
1616 },
1617 )
1618 .await
1619 .unwrap();
1620 states.put(existing_state.epoch.get(), existing_state.clone());
1621 states.sync().await.unwrap();
1622
1623 {
1625 let journal = contiguous::variable::Journal::<_, LegacyState>::init(
1626 context.with_label("journal_setup"),
1627 contiguous::variable::Config {
1628 partition: states_partition.clone(),
1629 compression: None,
1630 codec_config: MAXIMUM_VALIDATORS,
1631 page_cache: page_cache.clone(),
1632 write_buffer: WRITE_BUFFER,
1633 items_per_section: NZU64!(1),
1634 },
1635 )
1636 .await
1637 .unwrap();
1638 journal.append(&journal_legacy).await.unwrap();
1639 journal.sync().await.unwrap();
1640 }
1641
1642 migrate_journal_to_metadata_if_necessary(
1644 &context,
1645 &mut states,
1646 &states_partition,
1647 &page_cache,
1648 )
1649 .await
1650 .unwrap();
1651
1652 assert_eq!(states.keys().count(), 1);
1653 assert_eq!(
1654 &existing_state,
1655 states.get(&existing_state.epoch.get()).unwrap(),
1656 );
1657 assert!(
1658 states.get(&journal_expected_epoch).is_none(),
1659 "journal state must not be written into already-populated metadata"
1660 );
1661 });
1662 }
1663
1664 #[track_caller]
1665 fn assert_roundtrip(original: &ShareState) {
1666 use commonware_codec::Encode as _;
1667 let encoded = original.encode();
1668 let decoded = ShareState::read_cfg(&mut encoded.as_ref(), &()).unwrap();
1669 assert_eq!(original, &decoded);
1670 }
1671
1672 #[test]
1673 fn share_state_roundtrip_plaintext_none() {
1674 assert_roundtrip(&ShareState::Plaintext(None));
1675 }
1676
1677 #[test]
1678 fn share_state_roundtrip_plaintext_some() {
1679 use rand_08::SeedableRng as _;
1680 let mut rng = rand_08::rngs::StdRng::seed_from_u64(42);
1681
1682 let keys = std::iter::repeat_with(|| PrivateKey::random(&mut rng))
1683 .take(3)
1684 .collect::<Vec<_>>();
1685 let pubkeys = ordered::Set::try_from_iter(keys.iter().map(|k| k.public_key())).unwrap();
1686
1687 let (_output, shares) =
1688 dkg::deal::<MinSig, _, N3f1>(&mut rng, Mode::NonZeroCounter, pubkeys).unwrap();
1689
1690 let share = shares.into_iter().next().unwrap().1;
1691 assert_roundtrip(&ShareState::Plaintext(Some(share)));
1692 }
1693}