1use std::{
2 collections::{BTreeMap, HashMap},
3 num::{NonZeroU16, NonZeroU32, NonZeroUsize},
4};
5
6use alloy_consensus::BlockHeader as _;
7use commonware_codec::{EncodeSize, RangeCfg, Read, ReadExt, Write};
8use commonware_consensus::{
9 Block as _, Heightable as _,
10 types::{Epoch, Height},
11};
12use commonware_cryptography::{
13 Signer as _,
14 bls12381::{
15 dkg::{self, DealerPrivMsg, DealerPubMsg, Info, Output, PlayerAck, SignedDealerLog},
16 primitives::{
17 group::Share,
18 sharing::{Mode, ModeVersion},
19 variant::MinSig,
20 },
21 },
22 ed25519::{PrivateKey, PublicKey},
23 transcript::{Summary, Transcript},
24};
25use commonware_parallel::Strategy;
26use commonware_runtime::{BufferPooler, Clock, Metrics, buffer::paged::CacheRef};
27use commonware_storage::{journal::segmented, metadata};
28use commonware_utils::{N3f1, NZU16, NZU32, NZUsize, ordered};
29use eyre::{OptionExt, WrapErr as _, bail, eyre};
30use futures::{FutureExt as _, StreamExt as _, future::BoxFuture};
31use tracing::{debug, info, instrument, warn};
32
33use crate::consensus::{Digest, block::Block};
34
35const PAGE_SIZE: NonZeroU16 = NZU16!(1 << 12);
36const POOL_CAPACITY: NonZeroUsize = NZUsize!(1 << 13);
37const WRITE_BUFFER: NonZeroUsize = NZUsize!(1 << 12);
38const READ_BUFFER: NonZeroUsize = NZUsize!(1 << 20);
39
40const MAXIMUM_VALIDATORS: NonZeroU32 = NZU32!(u16::MAX as u32);
46
47pub(super) fn builder() -> Builder {
48 Builder::default()
49}
50
51pub(super) struct Storage<TContext>
52where
53 TContext: commonware_runtime::Storage + Clock + Metrics,
54{
55 states: metadata::Metadata<TContext, u64, State>,
56 events: segmented::variable::Journal<TContext, Event>,
57
58 current: State,
59 cache: BTreeMap<Epoch, Events>,
60}
61
62impl<TContext> Storage<TContext>
63where
64 TContext: commonware_runtime::Storage + Clock + Metrics,
65{
66 fn acks_for_epoch(
68 &self,
69 epoch: Epoch,
70 ) -> impl Iterator<Item = (&PublicKey, &PlayerAck<PublicKey>)> {
71 self.cache
72 .get(&epoch)
73 .into_iter()
74 .flat_map(|cache| cache.acks.iter())
75 }
76
77 fn dealings_for_epoch(
79 &self,
80 epoch: Epoch,
81 ) -> impl Iterator<Item = (&PublicKey, &(DealerPubMsg<MinSig>, DealerPrivMsg))> {
82 self.cache
83 .get(&epoch)
84 .into_iter()
85 .flat_map(|cache| cache.dealings.iter())
86 }
87
88 pub(super) fn logs_for_epoch(
90 &self,
91 epoch: Epoch,
92 ) -> impl Iterator<Item = (&PublicKey, &dkg::DealerLog<MinSig, PublicKey>)> {
93 self.cache
94 .get(&epoch)
95 .into_iter()
96 .flat_map(|cache| cache.logs.iter())
97 }
98
99 pub(super) fn current(&self) -> State {
101 self.current.clone()
102 }
103
104 pub(super) async fn set_state(&mut self, state: State) -> eyre::Result<()> {
106 if let Some(old) = self.states.put(state.epoch.get(), state.clone()) {
107 warn!(epoch = %old.epoch, "overwriting existing state");
108 }
109 self.states.sync().await.wrap_err("failed writing state")?;
110 self.current = state;
111 Ok(())
112 }
113
114 #[instrument(
116 skip_all,
117 fields(
118 %epoch,
119 %player,
120 ),
121 err,
122 )]
123 async fn append_ack(
124 &mut self,
125 epoch: Epoch,
126 player: PublicKey,
127 ack: PlayerAck<PublicKey>,
128 ) -> eyre::Result<()> {
129 if self
130 .cache
131 .get(&epoch)
132 .is_some_and(|events| events.acks.contains_key(&player))
133 {
134 info!(%player, %epoch, "ack for player already found in cache, dropping");
135 return Ok(());
136 }
137
138 let section = epoch.get();
139 self.events
140 .append(
141 section,
142 &Event::Ack {
143 player: player.clone(),
144 ack: ack.clone(),
145 },
146 )
147 .await
148 .wrap_err("unable to write event to storage")?;
149
150 self.events
151 .sync(section)
152 .await
153 .wrap_err("unable to sync events journal")?;
154
155 self.cache
156 .entry(epoch)
157 .or_default()
158 .acks
159 .insert(player, ack);
160
161 Ok(())
162 }
163
164 #[instrument(
166 skip_all,
167 fields(
168 %epoch,
169 %dealer,
170 ),
171 err,
172 )]
173 async fn append_dealing(
174 &mut self,
175 epoch: Epoch,
176 dealer: PublicKey,
177 pub_msg: DealerPubMsg<MinSig>,
178 priv_msg: DealerPrivMsg,
179 ) -> eyre::Result<()> {
180 if self
181 .cache
182 .get(&epoch)
183 .is_some_and(|events| events.dealings.contains_key(&dealer))
184 {
185 info!(%dealer, %epoch, "dealing of dealer already found in cache, dropping");
186 return Ok(());
187 }
188
189 let section = epoch.get();
190 self.events
191 .append(
192 section,
193 &Event::Dealing {
194 dealer: dealer.clone(),
195 public_msg: pub_msg.clone(),
196 private_msg: priv_msg.clone(),
197 },
198 )
199 .await
200 .wrap_err("unable to write event to storage")?;
201
202 self.events
203 .sync(section)
204 .await
205 .wrap_err("unable to sync events journal")?;
206
207 self.cache
208 .entry(epoch)
209 .or_default()
210 .dealings
211 .insert(dealer, (pub_msg, priv_msg));
212
213 Ok(())
214 }
215
216 pub(super) async fn append_dealer_log(
218 &mut self,
219 epoch: Epoch,
220 dealer: PublicKey,
221 log: dkg::DealerLog<MinSig, PublicKey>,
222 ) -> eyre::Result<()> {
223 if self
224 .cache
225 .get(&epoch)
226 .is_some_and(|events| events.logs.contains_key(&dealer))
227 {
228 info!(
229 %dealer,
230 %epoch,
231 "dealer log already found in cache; dropping"
232 );
233 return Ok(());
234 }
235
236 let section = epoch.get();
237 self.events
238 .append(
239 section,
240 &Event::Log {
241 dealer: dealer.clone(),
242 log: log.clone(),
243 },
244 )
245 .await
246 .wrap_err("failed to append log to journal")?;
247 self.events
248 .sync(section)
249 .await
250 .wrap_err("unable to sync journal")?;
251
252 let cache = self.cache.entry(epoch).or_default();
253 cache.logs.insert(dealer, log);
254 Ok(())
255 }
256
257 pub(super) async fn append_finalized_block(
259 &mut self,
260 epoch: Epoch,
261 block: Block,
262 ) -> eyre::Result<()> {
263 let height = block.height();
264 let digest = block.digest();
265 let parent = block.parent();
266 if self
267 .cache
268 .get(&epoch)
269 .is_some_and(|events| events.finalized.contains_key(&height))
270 {
271 info!(
272 %height,
273 %digest,
274 %parent,
275 "finalized block was already found in cache; dropping",
276 );
277 return Ok(());
278 }
279
280 let section = epoch.get();
281 self.events
282 .append(
283 section,
284 &Event::Finalized {
285 digest,
286 parent,
287 height,
288 },
289 )
290 .await
291 .wrap_err("failed to append finalized block to journal")?;
292 self.events
293 .sync(section)
294 .await
295 .wrap_err("unable to sync journal")?;
296
297 let cache = self.cache.entry(epoch).or_default();
298 cache.finalized.insert(
299 height,
300 FinalizedBlockInfo {
301 height: block.height(),
302 digest: block.digest(),
303 parent: block.parent_digest(),
304 },
305 );
306 Ok(())
307 }
308
309 pub(super) fn cache_dkg_outcome(
310 &mut self,
311 epoch: Epoch,
312 digest: Digest,
313 output: Output<MinSig, PublicKey>,
314 share: ShareState,
315 ) {
316 self.cache
317 .entry(epoch)
318 .or_default()
319 .dkg_outcomes
320 .insert(digest, (output, share));
321 }
322
323 pub(super) fn get_dkg_outcome(
324 &self,
325 epoch: &Epoch,
326 digest: &Digest,
327 ) -> Option<&(Output<MinSig, PublicKey>, ShareState)> {
328 self.cache
329 .get(epoch)
330 .and_then(|events| events.dkg_outcomes.get(digest))
331 }
332
333 pub(super) fn cache_notarized_block(&mut self, round: &Round, block: Block) {
339 let cache = self.cache.entry(round.epoch).or_default();
340 let log = ReducedBlock::from_block_for_round(&block, round);
341 cache.notarized_blocks.insert(log.digest, log);
342 }
343
344 #[instrument(
345 skip_all,
346 fields(
347 me = %me.public_key(),
348 epoch = %round.epoch,
349 )
350 err,
351 )]
352 pub(super) fn create_dealer_for_round(
353 &mut self,
354 me: PrivateKey,
355 round: Round,
356 share: ShareState,
357 seed: Summary,
358 ) -> eyre::Result<Option<Dealer>> {
359 if round.dealers.position(&me.public_key()).is_none() {
360 return Ok(None);
361 }
362
363 let share = if round.is_full_dkg() {
364 info!("running full DKG ceremony as dealer (new polynomial)");
365 None
366 } else {
367 let inner = share.into_inner();
368 if inner.is_none() {
369 warn!(
370 "we are a dealer in this round, but we do not have a share, \
371 which means we likely lost it; will not instantiate a dealer \
372 instance and hope to get a new share in the next round if we \
373 are a player"
374 );
375 return Ok(None);
376 }
377 inner
378 };
379
380 let (mut dealer, pub_msg, priv_msgs) = dkg::Dealer::start::<N3f1>(
381 Transcript::resume(seed).noise(b"dealer-rng"),
382 round.info.clone(),
383 me.clone(),
384 share,
385 )
386 .wrap_err("unable to start cryptographic dealer instance")?;
387
388 let mut unsent: BTreeMap<PublicKey, DealerPrivMsg> = priv_msgs.into_iter().collect();
390 for (player, ack) in self.acks_for_epoch(round.epoch) {
391 if unsent.contains_key(player)
392 && dealer
393 .receive_player_ack(player.clone(), ack.clone())
394 .is_ok()
395 {
396 unsent.remove(player);
397 debug!(%player, "replayed player ack");
398 }
399 }
400
401 Ok(Some(Dealer::new(Some(dealer), pub_msg, unsent)))
402 }
403
404 #[instrument(
406 skip_all,
407 fields(
408 epoch = %round.epoch,
409 me = %me.public_key(),
410 )
411 err,
412 )]
413 pub(super) fn create_player_for_round(
414 &self,
415 me: PrivateKey,
416 round: &Round,
417 ) -> eyre::Result<Option<Player>> {
418 if round.players.position(&me.public_key()).is_none() {
419 return Ok(None);
420 }
421
422 let mut player = Player::new(
423 dkg::Player::new(round.info.clone(), me)
424 .wrap_err("unable to start cryptographic player instance")?,
425 );
426
427 for (dealer, (pub_msg, priv_msg)) in self.dealings_for_epoch(round.epoch()) {
429 player.replay(dealer.clone(), pub_msg.clone(), priv_msg.clone());
430 debug!(%dealer, "replayed committed dealer message");
431 }
432
433 Ok(Some(player))
434 }
435
436 pub(super) fn get_latest_finalized_block_for_epoch(
437 &self,
438 epoch: &Epoch,
439 ) -> Option<(&Height, &FinalizedBlockInfo)> {
440 self.cache
441 .get(epoch)
442 .and_then(|cache| cache.finalized.last_key_value())
443 }
444
445 pub(super) fn get_notarized_reduced_block(
446 &mut self,
447 epoch: &Epoch,
448 digest: &Digest,
449 ) -> Option<&ReducedBlock> {
450 self.cache
451 .get(epoch)
452 .and_then(|cache| cache.notarized_blocks.get(digest))
453 }
454
455 #[instrument(skip_all, fields(%up_to_epoch), err)]
456 pub(super) async fn prune(&mut self, up_to_epoch: Epoch) -> eyre::Result<()> {
457 self.events
458 .prune(up_to_epoch.get())
459 .await
460 .wrap_err("unable to prune events journal")?;
461 self.states.retain(|&key, _| key >= up_to_epoch.get());
462 self.states
463 .sync()
464 .await
465 .wrap_err("unable to prune events metadata")?;
466 self.cache.retain(|&epoch, _| epoch >= up_to_epoch);
467 Ok(())
468 }
469}
470
471#[derive(Default)]
472pub(super) struct Builder {
473 initial_state: Option<BoxFuture<'static, eyre::Result<State>>>,
474 partition_prefix: Option<String>,
475}
476
477impl Builder {
478 pub(super) fn initial_state(
479 self,
480 initial_state: impl Future<Output = eyre::Result<State>> + Send + 'static,
481 ) -> Self {
482 Self {
483 initial_state: Some(initial_state.boxed()),
484 ..self
485 }
486 }
487
488 pub(super) fn partition_prefix(self, partition_prefix: &str) -> Self {
489 Self {
490 partition_prefix: Some(partition_prefix.to_string()),
491 ..self
492 }
493 }
494
495 #[instrument(skip_all, err)]
496 pub(super) async fn init<TContext>(self, context: TContext) -> eyre::Result<Storage<TContext>>
497 where
498 TContext: BufferPooler + commonware_runtime::Storage + Clock + Metrics,
499 {
500 let Self {
501 initial_state,
502 partition_prefix,
503 } = self;
504 let partition_prefix =
505 partition_prefix.ok_or_eyre("DKG actors state must have its partition prefix set")?;
506
507 let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, POOL_CAPACITY);
508
509 let states_metadata_partition = format!("{partition_prefix}_states_metadata");
510
511 let mut states = metadata::Metadata::init(
512 context.with_label("states"),
513 metadata::Config {
514 partition: states_metadata_partition,
515 codec_config: MAXIMUM_VALIDATORS,
516 },
517 )
518 .await
519 .wrap_err("unable to initialize DKG states metadata")?;
520
521 if states.keys().max().is_none() {
522 let initial_state = match initial_state {
523 None => {
524 return Err(eyre!(
525 "states metadata was empty and initializer was not set"
526 ));
527 }
528 Some(initial_state) => initial_state
529 .await
530 .wrap_err("failed constructing initial state to populate storage")?,
531 };
532 states
533 .put_sync(initial_state.epoch.get(), initial_state)
534 .await
535 .wrap_err("unable to write initial state to metadata")?;
536 }
537
538 let current = states
539 .keys()
540 .max()
541 .map(|epoch| {
542 states
543 .get(epoch)
544 .expect("state at keys iterator must exist")
545 .clone()
546 })
547 .expect("states storage must contain a state after initialization");
548
549 let events = segmented::variable::Journal::init(
550 context.with_label("events"),
551 segmented::variable::Config {
552 partition: format!("{partition_prefix}_events"),
553 compression: None,
554 codec_config: MAXIMUM_VALIDATORS,
555 page_cache,
556 write_buffer: WRITE_BUFFER,
557 },
558 )
559 .await
560 .expect("should be able to initialize events journal");
561
562 let mut cache = BTreeMap::<Epoch, Events>::new();
564 {
565 let replay = events
566 .replay(0, 0, READ_BUFFER)
567 .await
568 .wrap_err("unable to start a replay stream to populate events cache")?;
569 futures::pin_mut!(replay);
570
571 while let Some(result) = replay.next().await {
572 let (section, _, _, event) =
573 result.wrap_err("unable to read entry in replay stream")?;
574 let epoch = Epoch::new(section);
575 let events = cache.entry(epoch).or_default();
576 events.insert(event);
577 }
578 }
579
580 Ok(Storage {
581 states,
582 events,
583 current,
584 cache,
585 })
586 }
587}
588
589#[derive(Clone, Debug, PartialEq, Eq)]
597pub(super) enum ShareState {
598 Plaintext(Option<Share>),
599}
600
601impl ShareState {
602 pub(super) fn into_inner(self) -> Option<Share> {
603 match self {
604 Self::Plaintext(share) => share,
605 }
606 }
607}
608
609impl EncodeSize for ShareState {
610 fn encode_size(&self) -> usize {
611 match self {
612 Self::Plaintext(share) => 1 + share.encode_size(),
613 }
614 }
615}
616
617impl Write for ShareState {
618 fn write(&self, buf: &mut impl bytes::BufMut) {
619 match self {
620 Self::Plaintext(share) => {
621 0u8.write(buf);
622 share.write(buf);
623 }
624 }
625 }
626}
627
628impl Read for ShareState {
629 type Cfg = ();
630
631 fn read_cfg(
632 buf: &mut impl bytes::Buf,
633 _cfg: &Self::Cfg,
634 ) -> Result<Self, commonware_codec::Error> {
635 let tag = u8::read(buf)?;
636 match tag {
637 0 => Ok(Self::Plaintext(ReadExt::read(buf)?)),
638 other => Err(commonware_codec::Error::InvalidEnum(other)),
639 }
640 }
641}
642
643#[derive(Clone, Debug, PartialEq, Eq)]
645pub(super) struct State {
646 pub(super) epoch: Epoch,
647 pub(super) seed: Summary,
648 pub(super) output: Output<MinSig, PublicKey>,
649 pub(super) share: ShareState,
650 pub(super) players: ordered::Set<PublicKey>,
651 pub(super) is_full_dkg: bool,
652}
653
654impl State {
655 pub(super) fn dealers(&self) -> &ordered::Set<PublicKey> {
657 self.output.players()
658 }
659
660 pub(super) fn players(&self) -> &ordered::Set<PublicKey> {
662 &self.players
663 }
664
665 fn legacy_syncers(&self) -> ordered::Set<PublicKey> {
667 ordered::Set::default()
668 }
669}
670
671impl EncodeSize for State {
672 fn encode_size(&self) -> usize {
673 self.epoch.encode_size()
674 + self.seed.encode_size()
675 + self.output.encode_size()
676 + self.share.encode_size()
677 + self.players.encode_size()
678 + self.legacy_syncers().encode_size()
681 + self.is_full_dkg.encode_size()
682 }
683}
684
685impl Write for State {
686 fn write(&self, buf: &mut impl bytes::BufMut) {
687 self.epoch.write(buf);
688 self.seed.write(buf);
689 self.output.write(buf);
690 self.share.write(buf);
691 self.players.write(buf);
692 self.legacy_syncers().write(buf);
695 self.is_full_dkg.write(buf);
696 }
697}
698
699impl Read for State {
700 type Cfg = NonZeroU32;
701
702 fn read_cfg(
703 buf: &mut impl bytes::Buf,
704 cfg: &Self::Cfg,
705 ) -> Result<Self, commonware_codec::Error> {
706 let epoch = ReadExt::read(buf)?;
707 let seed = ReadExt::read(buf)?;
708 let output = Read::read_cfg(buf, &(*cfg, ModeVersion::v0()))?;
709 let share = ReadExt::read(buf)?;
710 let players = Read::read_cfg(buf, &(RangeCfg::from(1..=(u16::MAX as usize)), ()))?;
711
712 ordered::Set::<PublicKey>::read_cfg(buf, &(RangeCfg::from(0..=(u16::MAX as usize)), ()))?;
714
715 let is_full_dkg = ReadExt::read(buf)?;
716
717 Ok(Self {
718 epoch,
719 seed,
720 output,
721 share,
722 players,
723 is_full_dkg,
724 })
725 }
726}
727
728#[expect(
729 dead_code,
730 reason = "tracking this data is virtually free and might become useful later"
731)]
732#[derive(Clone, Debug)]
733pub(super) struct FinalizedBlockInfo {
734 pub(super) height: Height,
735 pub(super) digest: Digest,
736 pub(super) parent: Digest,
737}
738
739#[derive(Debug, Default)]
741struct Events {
742 acks: BTreeMap<PublicKey, PlayerAck<PublicKey>>,
743 dealings: BTreeMap<PublicKey, (DealerPubMsg<MinSig>, DealerPrivMsg)>,
744 logs: BTreeMap<PublicKey, dkg::DealerLog<MinSig, PublicKey>>,
745 finalized: BTreeMap<Height, FinalizedBlockInfo>,
746
747 notarized_blocks: HashMap<Digest, ReducedBlock>,
748 dkg_outcomes: HashMap<Digest, (Output<MinSig, PublicKey>, ShareState)>,
749}
750
751impl Events {
752 fn insert(&mut self, event: Event) {
753 match event {
754 Event::Dealing {
755 dealer: public_key,
756 public_msg,
757 private_msg,
758 } => {
759 self.dealings.insert(public_key, (public_msg, private_msg));
760 }
761 Event::Ack {
762 player: public_key,
763 ack,
764 } => {
765 self.acks.insert(public_key, ack);
766 }
767 Event::Log { dealer, log } => {
768 self.logs.insert(dealer, log);
769 }
770 Event::Finalized {
771 digest,
772 parent,
773 height,
774 } => {
775 self.finalized.insert(
776 height,
777 FinalizedBlockInfo {
778 height,
779 digest,
780 parent,
781 },
782 );
783 }
784 }
785 }
786}
787
788enum Event {
789 Dealing {
791 dealer: PublicKey,
792 public_msg: DealerPubMsg<MinSig>,
793 private_msg: DealerPrivMsg,
794 },
795 Ack {
797 player: PublicKey,
798 ack: PlayerAck<PublicKey>,
799 },
800 Log {
802 dealer: PublicKey,
803 log: dkg::DealerLog<MinSig, PublicKey>,
804 },
805 Finalized {
807 digest: Digest,
808 parent: Digest,
809 height: Height,
810 },
811}
812
813impl EncodeSize for Event {
814 fn encode_size(&self) -> usize {
815 1 + match self {
816 Self::Dealing {
817 dealer: public_key,
818 public_msg,
819 private_msg,
820 } => public_key.encode_size() + public_msg.encode_size() + private_msg.encode_size(),
821 Self::Ack {
822 player: public_key,
823 ack,
824 } => public_key.encode_size() + ack.encode_size(),
825 Self::Log { dealer, log } => dealer.encode_size() + log.encode_size(),
826 Self::Finalized {
827 digest,
828 parent,
829 height,
830 } => digest.encode_size() + parent.encode_size() + height.encode_size(),
831 }
832 }
833}
834
835impl Write for Event {
836 fn write(&self, buf: &mut impl bytes::BufMut) {
837 match self {
838 Self::Dealing {
839 dealer: public_key,
840 public_msg,
841 private_msg,
842 } => {
843 0u8.write(buf);
844 public_key.write(buf);
845 public_msg.write(buf);
846 private_msg.write(buf);
847 }
848 Self::Ack {
849 player: public_key,
850 ack,
851 } => {
852 1u8.write(buf);
853 public_key.write(buf);
854 ack.write(buf);
855 }
856 Self::Log { dealer, log } => {
857 2u8.write(buf);
858 dealer.write(buf);
859 log.write(buf);
860 }
861 Self::Finalized {
862 digest,
863 parent,
864 height,
865 } => {
866 3u8.write(buf);
867 digest.write(buf);
868 parent.write(buf);
869 height.write(buf);
870 }
871 }
872 }
873}
874
875impl Read for Event {
876 type Cfg = NonZeroU32;
877
878 fn read_cfg(
879 buf: &mut impl bytes::Buf,
880 cfg: &Self::Cfg,
881 ) -> Result<Self, commonware_codec::Error> {
882 let tag = u8::read(buf)?;
883 match tag {
884 0 => Ok(Self::Dealing {
885 dealer: ReadExt::read(buf)?,
886 public_msg: Read::read_cfg(buf, cfg)?,
887 private_msg: ReadExt::read(buf)?,
888 }),
889 1 => Ok(Self::Ack {
890 player: ReadExt::read(buf)?,
891 ack: ReadExt::read(buf)?,
892 }),
893 2 => Ok(Self::Log {
894 dealer: ReadExt::read(buf)?,
895 log: Read::read_cfg(buf, &NZU32!(u16::MAX as u32))?,
896 }),
897 3 => Ok(Self::Finalized {
898 digest: ReadExt::read(buf)?,
899 parent: ReadExt::read(buf)?,
900 height: ReadExt::read(buf)?,
901 }),
902 other => Err(commonware_codec::Error::InvalidEnum(other)),
903 }
904 }
905}
906
907pub(super) struct Dealer {
909 dealer: Option<dkg::Dealer<MinSig, PrivateKey>>,
912
913 pub_msg: DealerPubMsg<MinSig>,
916
917 unsent: BTreeMap<PublicKey, DealerPrivMsg>,
920
921 finalized: Option<SignedDealerLog<MinSig, PrivateKey>>,
925}
926
927impl Dealer {
928 pub(super) const fn new(
929 dealer: Option<dkg::Dealer<MinSig, PrivateKey>>,
930 pub_msg: DealerPubMsg<MinSig>,
931 unsent: BTreeMap<PublicKey, DealerPrivMsg>,
932 ) -> Self {
933 Self {
934 dealer,
935 pub_msg,
936 unsent,
937 finalized: None,
938 }
939 }
940
941 pub(super) async fn receive_ack<TContext>(
946 &mut self,
947 storage: &mut Storage<TContext>,
948 epoch: Epoch,
949 player: PublicKey,
950 ack: PlayerAck<PublicKey>,
951 ) -> eyre::Result<()>
952 where
953 TContext: commonware_runtime::Storage + Clock + Metrics,
954 {
955 if !self.unsent.contains_key(&player) {
956 bail!("already received an ack from `{player}`");
957 }
958 match &mut self.dealer {
959 Some(dealer) => {
960 dealer
961 .receive_player_ack(player.clone(), ack.clone())
962 .wrap_err("unable to receive player ack")?;
963 self.unsent.remove(&player);
964 storage
965 .append_ack(epoch, player.clone(), ack.clone())
966 .await
967 .wrap_err("unable to append ack to journal")?;
968 }
969 None => bail!("dealer was already finalized, dropping ack of player `{player}`"),
970 }
971 Ok(())
972 }
973
974 pub(super) fn finalize(&mut self) {
976 if self.finalized.is_some() {
977 return;
978 }
979
980 if let Some(dealer) = self.dealer.take() {
983 let log = dealer.finalize::<N3f1>();
984 self.finalized = Some(log);
985 }
986 }
987
988 pub(super) fn finalized(&self) -> Option<SignedDealerLog<MinSig, PrivateKey>> {
990 self.finalized.clone()
991 }
992
993 pub(super) const fn take_finalized(&mut self) -> Option<SignedDealerLog<MinSig, PrivateKey>> {
995 self.finalized.take()
996 }
997
998 pub(super) fn shares_to_distribute(
1003 &self,
1004 ) -> impl Iterator<Item = (PublicKey, DealerPubMsg<MinSig>, DealerPrivMsg)> + '_ {
1005 self.unsent
1006 .iter()
1007 .map(|(player, priv_msg)| (player.clone(), self.pub_msg.clone(), priv_msg.clone()))
1008 }
1009}
1010
1011#[derive(Clone, Debug)]
1012pub(super) struct Round {
1013 epoch: Epoch,
1014 info: dkg::Info<MinSig, PublicKey>,
1015 dealers: ordered::Set<PublicKey>,
1016 players: ordered::Set<PublicKey>,
1017 is_full_dkg: bool,
1018}
1019
1020impl Round {
1021 pub(super) fn from_state(state: &State, namespace: &[u8]) -> Self {
1022 let previous_output = if state.is_full_dkg {
1024 None
1025 } else {
1026 Some(state.output.clone())
1027 };
1028
1029 let dealers = state.dealers().clone();
1030 let players = state.players().clone();
1031
1032 Self {
1033 epoch: state.epoch,
1034 info: Info::new::<N3f1>(
1035 namespace,
1036 state.epoch.get(),
1037 previous_output,
1038 Mode::NonZeroCounter,
1039 dealers.clone(),
1040 players.clone(),
1041 )
1042 .expect("a DKG round must always be initializable given some epoch state"),
1043 dealers,
1044 players,
1045 is_full_dkg: state.is_full_dkg,
1046 }
1047 }
1048
1049 pub(super) fn info(&self) -> &dkg::Info<MinSig, PublicKey> {
1050 &self.info
1051 }
1052
1053 pub(super) fn epoch(&self) -> Epoch {
1054 self.epoch
1055 }
1056
1057 pub(super) fn dealers(&self) -> &ordered::Set<PublicKey> {
1058 &self.dealers
1059 }
1060
1061 pub(super) fn players(&self) -> &ordered::Set<PublicKey> {
1062 &self.players
1063 }
1064
1065 pub(super) fn is_full_dkg(&self) -> bool {
1066 self.is_full_dkg
1067 }
1068}
1069
1070pub(super) struct Player {
1072 player: dkg::Player<MinSig, PrivateKey>,
1073 acks: BTreeMap<PublicKey, PlayerAck<PublicKey>>,
1076}
1077
1078impl Player {
1079 pub(super) const fn new(player: dkg::Player<MinSig, PrivateKey>) -> Self {
1080 Self {
1081 player,
1082 acks: BTreeMap::new(),
1083 }
1084 }
1085
1086 pub(super) async fn receive_dealing<TContext>(
1090 &mut self,
1091 storage: &mut Storage<TContext>,
1092 epoch: Epoch,
1093 dealer: PublicKey,
1094 pub_msg: DealerPubMsg<MinSig>,
1095 priv_msg: DealerPrivMsg,
1096 ) -> eyre::Result<PlayerAck<PublicKey>>
1097 where
1098 TContext: commonware_runtime::Storage + Clock + Metrics,
1099 {
1100 if let Some(ack) = self.acks.get(&dealer) {
1102 return Ok(ack.clone());
1103 }
1104
1105 let ack = self
1107 .player
1108 .dealer_message::<N3f1>(dealer.clone(), pub_msg.clone(), priv_msg.clone())
1109 .ok_or_eyre(
1111 "applying dealer message to player instance did not result in a usable ack",
1112 )?;
1113 storage
1114 .append_dealing(epoch, dealer.clone(), pub_msg, priv_msg)
1115 .await
1116 .wrap_err("unable to append dealing to journal")?;
1117 self.acks.insert(dealer, ack.clone());
1118 Ok(ack)
1119 }
1120
1121 fn replay(
1123 &mut self,
1124 dealer: PublicKey,
1125 pub_msg: DealerPubMsg<MinSig>,
1126 priv_msg: DealerPrivMsg,
1127 ) {
1128 if self.acks.contains_key(&dealer) {
1129 return;
1130 }
1131 if let Some(ack) = self
1132 .player
1133 .dealer_message::<N3f1>(dealer.clone(), pub_msg, priv_msg)
1134 {
1135 self.acks.insert(dealer, ack);
1136 }
1137 }
1138
1139 pub(super) fn finalize(
1141 self,
1142 rng: &mut impl rand_core::CryptoRngCore,
1143 logs: dkg::Logs<MinSig, PublicKey, N3f1>,
1144 strategy: &impl Strategy,
1145 ) -> Result<(Output<MinSig, PublicKey>, Share), dkg::Error> {
1146 self.player
1147 .finalize::<N3f1, commonware_cryptography::ed25519::Batch>(rng, logs, strategy)
1148 }
1149}
1150
1151#[derive(Clone, Debug)]
1153pub(super) struct ReducedBlock {
1154 pub(super) height: Height,
1156
1157 pub(super) parent: Digest,
1159
1160 pub(super) digest: Digest,
1162
1163 pub(super) log: Option<(PublicKey, dkg::DealerLog<MinSig, PublicKey>)>,
1165}
1166
1167impl ReducedBlock {
1168 pub(super) fn from_block_for_round(block: &Block, round: &Round) -> Self {
1169 let log = if block.header().extra_data().is_empty() {
1170 None
1171 } else {
1172 dkg::SignedDealerLog::<MinSig, PrivateKey>::read_cfg(
1173 &mut block.header().extra_data().as_ref(),
1174 &NZU32!(round.players.len() as u32),
1175 )
1176 .inspect(|_| {
1177 info!(
1178 height = %block.height(),
1179 digest = %block.digest(),
1180 "found dealer log in block"
1181 )
1182 })
1183 .inspect_err(|error| {
1184 warn!(
1185 %error,
1186 "block header extraData had data, but it could not be read as \
1187 a signed dealer log",
1188 )
1189 })
1190 .ok()
1191 .and_then(|log| match log.check(&round.info) {
1192 Some((dealer, log)) => Some((dealer, log)),
1193 None => {
1194 warn!("log failed check against current round");
1196 None
1197 }
1198 })
1199 };
1200 Self {
1201 height: block.height(),
1202 parent: block.parent(),
1203 digest: block.digest(),
1204 log,
1205 }
1206 }
1207}
1208
1209#[cfg(test)]
1210mod tests {
1211 use super::*;
1212 use commonware_codec::Encode as _;
1213 use commonware_cryptography::{
1214 bls12381::{dkg, primitives::sharing::Mode},
1215 ed25519::PrivateKey,
1216 transcript::Summary,
1217 };
1218 use commonware_math::algebra::Random as _;
1219 use commonware_runtime::{Runner as _, deterministic};
1220 use commonware_utils::TryFromIterator as _;
1221
1222 fn make_test_state(rng: &mut impl rand_core::CryptoRngCore, epoch: u64) -> State {
1223 let mut keys: Vec<_> = (0..3)
1224 .map(|i| PrivateKey::from_seed(i + epoch * 100))
1225 .collect();
1226
1227 keys.sort_by_key(|k| k.public_key());
1228
1229 let pubkeys = ordered::Set::try_from_iter(keys.iter().map(|k| k.public_key())).unwrap();
1230
1231 let (output, _shares) =
1232 dkg::deal::<_, _, N3f1>(&mut *rng, Mode::NonZeroCounter, pubkeys.clone()).unwrap();
1233
1234 State {
1235 epoch: Epoch::new(epoch),
1236 seed: Summary::random(rng),
1237 output,
1238 share: ShareState::Plaintext(None),
1239 players: pubkeys,
1240 is_full_dkg: false,
1241 }
1242 }
1243
1244 #[test]
1245 fn state_codec_round_trip() {
1246 let executor = deterministic::Runner::default();
1247 executor.start(|mut context| async move {
1248 let state = make_test_state(&mut context, 0);
1249 let mut bytes = state.encode();
1250 let decoded = State::read_cfg(&mut bytes, &NZU32!(u32::MAX)).unwrap();
1251 assert_eq!(state, decoded);
1252 });
1253 }
1254
1255 #[test]
1256 fn state_codec_read_ignores_legacy_populated_syncers() {
1257 let executor = deterministic::Runner::default();
1258 executor.start(|mut context| async move {
1259 let state = make_test_state(&mut context, 0);
1260
1261 let legacy_syncers = state.players.clone();
1264 let mut bytes = Vec::new();
1265 state.epoch.write(&mut bytes);
1266 state.seed.write(&mut bytes);
1267 state.output.write(&mut bytes);
1268 state.share.write(&mut bytes);
1269 state.players.write(&mut bytes);
1270
1271 legacy_syncers.write(&mut bytes);
1273
1274 state.is_full_dkg.write(&mut bytes);
1275
1276 let decoded = State::read_cfg(&mut bytes.as_slice(), &NZU32!(u32::MAX)).unwrap();
1277 assert_eq!(state, decoded);
1278 });
1279 }
1280
1281 #[track_caller]
1282 fn assert_roundtrip(original: &ShareState) {
1283 use commonware_codec::Encode as _;
1284 let encoded = original.encode();
1285 let decoded = ShareState::read_cfg(&mut encoded.as_ref(), &()).unwrap();
1286 assert_eq!(original, &decoded);
1287 }
1288
1289 #[test]
1290 fn share_state_roundtrip_plaintext_none() {
1291 assert_roundtrip(&ShareState::Plaintext(None));
1292 }
1293
1294 #[test]
1295 fn share_state_roundtrip_plaintext_some() {
1296 use rand_08::SeedableRng as _;
1297 let mut rng = rand_08::rngs::StdRng::seed_from_u64(42);
1298
1299 let keys = std::iter::repeat_with(|| PrivateKey::random(&mut rng))
1300 .take(3)
1301 .collect::<Vec<_>>();
1302 let pubkeys = ordered::Set::try_from_iter(keys.iter().map(|k| k.public_key())).unwrap();
1303
1304 let (_output, shares) =
1305 dkg::deal::<MinSig, _, N3f1>(&mut rng, Mode::NonZeroCounter, pubkeys).unwrap();
1306
1307 let share = shares.into_iter().next().unwrap().1;
1308 assert_roundtrip(&ShareState::Plaintext(Some(share)));
1309 }
1310}