1use std::{collections::BTreeMap, sync::Arc};
4
5use alloy_consensus::BlockHeader as _;
6use commonware_codec::{Decode as _, Encode as _};
7use commonware_consensus::{Block as _, types::Epoch};
8use commonware_cryptography::{
9 Signer as _,
10 bls12381::{
11 dkg::{self, Arbiter, Player, arbiter},
12 primitives::{group, poly::Public, variant::MinSig},
13 },
14 ed25519::{PrivateKey, PublicKey},
15};
16use commonware_p2p::{
17 Receiver, Recipients, Sender,
18 utils::mux::{MuxHandle, SubReceiver, SubSender},
19};
20use commonware_runtime::{Clock, Storage};
21use commonware_storage::metadata::Metadata;
22use commonware_utils::{max_faults, sequence::U64, set::Ordered, union};
23use eyre::{WrapErr as _, bail, ensure};
24use futures::{FutureExt as _, lock::Mutex};
25use indexmap::IndexSet;
26use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
27use rand_core::CryptoRngCore;
28use tracing::{Level, debug, error, info, instrument, warn};
29
30use tempo_dkg_onchain_artifacts::{Ack, IntermediateOutcome};
31
32use crate::{consensus::block::Block, dkg::HardforkRegime};
33
34mod payload;
35mod persisted;
36
37pub(super) use persisted::State;
38
39use payload::{Message, Payload, Share};
40use persisted::Dealing;
41
42const ACK_NAMESPACE: &[u8] = b"_DKG_ACK";
43pub(super) const OUTCOME_NAMESPACE: &[u8] = b"_DKG_OUTCOME";
44
45const WEIGHT_RECOVERY_CONCURRENCY: usize = 1;
48
49pub(super) struct Config {
50 pub(super) namespace: Vec<u8>,
52
53 pub(super) me: PrivateKey,
54
55 pub(super) public: Public<MinSig>,
57
58 pub(super) share: Option<group::Share>,
61
62 pub(super) epoch: Epoch,
64
65 pub(super) dealers: Ordered<PublicKey>,
67
68 pub(super) players: Ordered<PublicKey>,
70}
71pub(super) struct Ceremony<TContext, TReceiver, TSender>
72where
73 TContext: Clock + commonware_runtime::Metrics + Storage,
74 TReceiver: Receiver,
75 TSender: Sender,
76{
77 config: Config,
78
79 previous_role: Role,
83
84 dealer_me: Option<Dealer>,
86
87 player_me: Option<Player<PublicKey, MinSig>>,
91
92 players_indexed: IndexSet<PublicKey>,
98
99 arbiter: Arbiter<PublicKey, MinSig>,
101
102 ceremony_metadata: Arc<Mutex<Metadata<TContext, U64, State>>>,
103 receiver: SubReceiver<TReceiver>,
104 sender: SubSender<TSender>,
105 metrics: Metrics,
106}
107
108impl<TContext, TReceiver, TSender> Ceremony<TContext, TReceiver, TSender>
109where
110 TContext: Clock + CryptoRngCore + commonware_runtime::Metrics + Storage,
111 TReceiver: Receiver<PublicKey = PublicKey>,
112 TSender: Sender<PublicKey = PublicKey>,
113{
114 #[instrument(skip_all, fields(for_epoch = config.epoch), err)]
116 pub(super) async fn init(
117 context: &mut TContext,
118 mux: &mut MuxHandle<TSender, TReceiver>,
119 ceremony_metadata: Arc<Mutex<Metadata<TContext, U64, State>>>,
120 config: Config,
121 metrics: Metrics,
122 ) -> eyre::Result<Self> {
123 metrics.reset_per_ceremony_metrics();
127
128 metrics.dealers.set(config.dealers.len() as i64);
129 metrics.players.set(config.players.len() as i64);
130
131 let (sender, receiver) = mux
132 .register(config.epoch)
133 .await
134 .wrap_err("mux subchannel already running for epoch; this is a problem")?;
135
136 let players_indexed: IndexSet<_> = config.players.iter().cloned().collect();
137 let mut player_me = players_indexed.get(&config.me.public_key()).map(|_| {
138 Player::new(
139 config.me.public_key(),
140 Some(config.public.clone()),
141 config.dealers.clone(),
142 config.players.clone(),
143 WEIGHT_RECOVERY_CONCURRENCY,
144 )
145 });
146
147 let mut arbiter = Arbiter::new(
148 Some(config.public.clone()),
149 config.dealers.clone(),
150 config.players.clone(),
151 WEIGHT_RECOVERY_CONCURRENCY,
152 );
153
154 let mut dealer_me = None;
155
156 debug!("attempting to read ceremony state from disk");
157 let recovered = ceremony_metadata
160 .lock()
161 .await
162 .get(&config.epoch.into())
163 .cloned();
164
165 if let Some(recovered) = recovered {
166 info!("found a previous ceremony state written to disk; recovering it");
167 for outcome in &recovered.outcomes {
168 let ack_indices = outcome
169 .acks()
170 .iter()
171 .filter_map(|ack| {
172 let idx = players_indexed.get_index_of(ack.player());
173 if idx.is_none() {
174 warn!(
175 player = %ack.player(),
176 "ack for player recovered from disk not among players of this ceremony",
177 );
178 }
179 idx.map(|idx| idx as u32)
180 })
181 .collect::<Vec<_>>();
182
183 if let Err(error) = arbiter
184 .commitment(
185 outcome.dealer().clone(),
186 outcome.commitment().clone(),
187 ack_indices,
188 outcome.reveals().to_vec(),
189 )
190 .wrap_err("failed to verify and track commitment")
191 {
192 warn!(
193 %error,
194 "failed to update arbiter with metadata recovered from disk",
195 );
196 }
197 }
198
199 if let Some(me) = &mut player_me {
200 for (dealer, commitment, share) in recovered.received_shares.clone() {
201 me.share(dealer, commitment, share)
202 .wrap_err("failed updating my player information with stored metadata")?;
203 }
204 }
205
206 if let Some(dealing) = recovered.dealing.clone() {
207 let (mut dkg_dealer, _, _) = dkg::Dealer::<PublicKey, MinSig>::new(
208 context,
209 config.share.clone(),
210 config.players.clone(),
211 );
212 for ack in dealing.acks.values() {
213 dkg_dealer.ack(ack.player().clone()).wrap_err_with(|| {
214 format!(
215 "failed updating dealer information with ack for \
216 player `{player}` recovered from disk",
217 player = ack.player(),
218 )
219 })?;
220 }
221 dealer_me = Some(Dealer {
222 commitment: dealing.commitment,
223 shares: dealing.shares,
224 acks: dealing.acks,
225 outcome: recovered.dealing_outcome,
226 });
227 }
228 } else {
229 info!("starting a fresh ceremony");
230
231 if let Some(share) = config.share.clone() {
232 info!("we have a share, so we are a dealer in this ceremony");
233 let (_, commitment, shares) = dkg::Dealer::<PublicKey, MinSig>::new(
234 context,
235 Some(share),
236 config.players.clone(),
237 );
238 let shares = config
239 .players
240 .iter()
241 .zip(&shares)
242 .map(|(player, share)| (player.clone(), share.clone()))
243 .collect();
244 dealer_me = Some(Dealer {
245 commitment,
246 shares,
247 acks: BTreeMap::new(),
248 outcome: None,
249 });
250 }
251
252 ceremony_metadata
253 .lock()
254 .await
255 .put_sync(
256 config.epoch.into(),
257 State {
258 num_players: config
259 .players
260 .len()
261 .try_into()
262 .expect("there should never be more than u16::MAX players"),
263 dealing: dealer_me.as_ref().map(|me| Dealing {
264 commitment: me.commitment.clone(),
265 shares: me.shares.clone(),
266 acks: BTreeMap::new(),
267 }),
268 ..State::default()
269 },
270 )
271 .await
272 .expect("must always be able to initialize the ceremony state to disk");
273 };
274
275 metrics.how_often_player.inc_by(player_me.is_some() as u64);
276 metrics.how_often_dealer.inc_by(dealer_me.is_some() as u64);
277
278 let previous = config.share.clone().map_or_else(
279 || Role::Verifier {
280 public: config.public.clone(),
281 },
282 |share| Role::Signer {
283 public: config.public.clone(),
284 share,
285 },
286 );
287 Ok(Self {
288 config,
289 previous_role: previous,
290 dealer_me,
291 player_me,
292 players_indexed,
293 arbiter,
294 ceremony_metadata,
295 receiver,
296 sender,
297 metrics,
298 })
299 }
300
301 #[instrument(skip_all, fields(epoch = self.config.epoch), err)]
308 pub(super) async fn distribute_shares(&mut self) -> eyre::Result<()> {
309 let Some(dealer_me) = &mut self.dealer_me else {
310 debug!("not a dealer, not distributing shares");
311 return Ok(());
312 };
313 for player in &self.config.players {
314 if dealer_me.acks.contains_key(player) {
315 continue;
316 }
317
318 let share = dealer_me
319 .shares
320 .get(player)
321 .cloned()
322 .expect("invariant violated: all players must have an entry in the shares map");
323
324 if let Some(player_me) = &mut self.player_me
325 && player == &self.config.me.public_key()
326 {
327 player_me
328 .share(
329 self.config.me.public_key(),
330 dealer_me.commitment.clone(),
331 share.clone(),
332 )
333 .expect(
334 "must work: updating player with own dealer \
335 commitment",
336 );
337
338 let ack = Ack::new(
342 &union(&self.config.namespace, ACK_NAMESPACE),
343 self.config.me.clone(),
344 self.config.me.public_key(),
345 self.config.epoch,
346 &self.config.me.public_key(),
347 &dealer_me.commitment,
348 );
349 assert_eq!(
350 None,
351 dealer_me
352 .acks
353 .insert(self.config.me.public_key(), ack.clone()),
354 "must only insert our own ack once",
355 );
356
357 self.ceremony_metadata
358 .lock()
359 .await
360 .upsert_sync(self.config.epoch.into(), |info| {
361 if let Some(dealing) = &mut info.dealing {
362 dealing.acks.insert(self.config.me.public_key(), ack);
363 } else {
364 info.dealing = Some(Dealing {
365 commitment: dealer_me.commitment.clone(),
366 shares: dealer_me.shares.clone(),
367 acks: BTreeMap::from([(self.config.me.public_key(), ack)]),
368 });
369 }
370 info.received_shares.push((
371 self.config.me.public_key(),
372 dealer_me.commitment.clone(),
373 share,
374 ));
375 })
376 .await
377 .expect("must be able to persists acks");
378 self.metrics.shares_distributed.inc();
380 self.metrics.acks_received.inc();
381 self.metrics.acks_sent.inc();
382 continue;
383 }
384
385 let payload = Share {
386 commitment: dealer_me.commitment.clone(),
387 share,
388 }
389 .into();
390 let success = self
391 .sender
392 .send(
393 Recipients::One(player.clone()),
394 Message {
395 epoch: self.config.epoch,
396 payload,
397 }
398 .encode()
399 .freeze(),
400 true,
401 )
402 .await
403 .wrap_err("unable to forward share to p2p network")?;
404
405 if success.is_empty() {
406 warn!(%player, "failed to send share to player");
407 } else {
408 info!(%player, "sent share to player");
409 self.metrics.shares_distributed.inc();
410 }
411 }
412 Ok(())
413 }
414
415 #[instrument(skip_all, fields(epoch = self.epoch()), err)]
422 pub(super) async fn process_messages(&mut self) -> eyre::Result<()> {
423 while let Some(msg) = self.receiver.recv().now_or_never() {
424 let (peer, mut msg) = msg.wrap_err("receiver p2p channel was closed")?;
425
426 debug!(%peer, "received message from");
427 let msg = Message::decode_cfg(&mut msg, &(self.config.players.len() as u32))
428 .wrap_err("unable to decode message")?;
429 if msg.epoch != self.epoch() {
430 warn!(
431 ceremony.epoch = self.epoch(),
432 msg.epoch = msg.epoch,
433 "ignoring message for different round"
434 );
435 continue;
436 }
437
438 match msg.payload {
439 Payload::Ack(ack) => {
440 let _: Result<_, _> = self.process_ack(peer, *ack).await;
441 }
442 Payload::Share(share) => {
443 let _: Result<_, _> = self.process_share(peer, share).await;
444 }
445 }
446 }
447
448 Ok(())
449 }
450
451 #[instrument(
452 skip_all,
453 fields(
454 epoch = %self.epoch(),
455 %peer,
456 player = %ack.player(),
457 ),
458 err(level = Level::WARN),
459 ret,
460 )]
461 async fn process_ack(&mut self, peer: PublicKey, ack: Ack) -> eyre::Result<&'static str> {
462 self.metrics.acks_received.inc();
463 let Some(dealer_me) = &mut self.dealer_me else {
464 return Ok("not a dealer, dropping ack");
465 };
466
467 ensure!(
468 ack.player() == &peer,
469 "player recorded in ack does not match peer that sent it; dropping ack",
470 );
471
472 ensure!(
473 self.players_indexed.contains(&peer),
474 "peer not among players for this ceremony; dropping ack",
475 );
476
477 ensure!(
478 ack.verify(
479 &union(&self.config.namespace, ACK_NAMESPACE),
480 &peer,
481 self.config.epoch,
482 &self.config.me.public_key(),
483 &dealer_me.commitment,
484 ),
485 "failed verifying ack signature against peer",
486 );
487
488 if let std::collections::btree_map::Entry::Vacant(vacant) =
489 dealer_me.acks.entry(peer.clone())
490 {
491 vacant.insert(ack.clone());
492 } else {
493 bail!("duplicate ack for peer");
494 }
495
496 self.ceremony_metadata
497 .lock()
498 .await
499 .upsert_sync(self.config.epoch.into(), |info| {
500 if let Some(dealing) = &mut info.dealing {
501 dealing.acks.insert(peer.clone(), ack);
502 } else {
503 info.dealing = Some(Dealing {
504 commitment: dealer_me.commitment.clone(),
505 shares: dealer_me.shares.clone(),
506 acks: BTreeMap::from([(peer.clone(), ack)]),
507 });
508 }
509 })
510 .await
511 .expect("must always be able to persist tracked acks to disk");
512
513 Ok("ack recorded")
514 }
515
516 #[instrument(
517 skip_all,
518 fields(
519 epoch = %self.epoch(),
520 %peer,
521 ),
522 err(level = Level::WARN),
523 ret,
524 )]
525 async fn process_share(
526 &mut self,
527 peer: PublicKey,
528 Share { commitment, share }: Share,
529 ) -> eyre::Result<&'static str> {
530 self.metrics.shares_received.inc();
531 let Some(player_me) = &mut self.player_me else {
532 return Ok("not a player, dropping share");
533 };
534
535 player_me
537 .share(peer.clone(), commitment.clone(), share.clone())
538 .wrap_err("failed to record and track share")?;
539
540 self.ceremony_metadata
541 .lock()
542 .await
543 .upsert_sync(self.epoch().into(), |info| {
544 info.received_shares
545 .push((peer.clone(), commitment.clone(), share));
546 })
547 .await
548 .expect("must always be able to persist tracked shares to disk");
549
550 let payload = Ack::new(
551 &union(&self.config.namespace, ACK_NAMESPACE),
552 self.config.me.clone(),
553 self.config.me.public_key(),
554 self.epoch(),
555 &peer,
556 &commitment,
557 )
558 .into();
559 self.sender
560 .send(
561 Recipients::One(peer.clone()),
562 Message {
563 epoch: self.epoch(),
564 payload,
565 }
566 .encode()
567 .freeze(),
568 true,
569 )
570 .await
571 .wrap_err("failed returning ack to peer")?;
572
573 self.metrics.acks_sent.inc();
574 Ok("recorded share and returned signed ack to peer")
575 }
576
577 #[instrument(skip_all, fields(epoch = self.epoch(), block.height = block.height()), err)]
582 pub(super) async fn process_dealings_in_block(
583 &mut self,
584 block: &Block,
585 hardfork_regime: HardforkRegime,
586 ) -> eyre::Result<()> {
587 if block.header().extra_data().is_empty() {
589 self.metrics.dealings_empty.inc();
590 return Ok(());
591 }
592
593 let block_outcome = match block
594 .try_read_ceremony_deal_outcome()
595 .wrap_err("failed reading intermediate DKG dealings from block")
596 {
597 Ok(outcome) => outcome,
598 Err(error) => {
599 self.metrics.bad_dealings.inc();
600 return Err(error);
601 }
602 };
603
604 info!(
605 dealer = %block_outcome.dealer(),
606 "found DKG dealing in block",
607 );
608
609 ensure!(
611 block_outcome.epoch() == self.epoch(),
612 "deal outcome in block was for epoch `{}`, but current dkg ceremony is for epoch `{}`",
613 block_outcome.epoch(),
614 self.epoch(),
615 );
616
617 ensure!(
618 self.dealers().position(block_outcome.dealer()).is_some(),
619 "dealer `{}` recorded in dealing outcome is not among the dealers of this ceremony",
620 block_outcome.dealer(),
621 );
622
623 let is_verified = match hardfork_regime {
625 HardforkRegime::PostAllegretto => {
626 block_outcome.verify(&union(&self.config.namespace, OUTCOME_NAMESPACE))
627 }
628 HardforkRegime::PreAllegretto => block_outcome
629 .verify_pre_allegretto(&union(&self.config.namespace, OUTCOME_NAMESPACE)),
630 };
631 if !is_verified {
632 self.metrics.bad_dealings.inc();
633 bail!("intermediate DKG dealing could not be verified");
634 }
635
636 if !block_outcome.acks().iter().all(|ack| {
638 self.players_indexed.contains(ack.player())
639 && ack.verify(
640 &union(&self.config.namespace, ACK_NAMESPACE),
641 ack.player(),
642 self.epoch(),
643 block_outcome.dealer(),
644 block_outcome.commitment(),
645 )
646 }) {
647 self.arbiter.disqualify(block_outcome.dealer().clone());
648 bail!("invalid ack signatures; disqualifying dealer");
649 }
650
651 let ack_indices = block_outcome
653 .acks()
654 .iter()
655 .filter_map(|ack| {
656 let idx = self.players_indexed.get_index_of(ack.player());
657 if idx.is_none() {
658 warn!(
659 player = %ack.player(),
660 "ack for player stored on disk not among players of this ceremony",
661 );
662 }
663 idx.map(|idx| idx as u32)
664 })
665 .collect::<Vec<_>>();
666
667 self.arbiter
668 .commitment(
669 block_outcome.dealer().clone(),
670 block_outcome.commitment().clone(),
671 ack_indices,
672 block_outcome.reveals().to_vec(),
673 )
674 .wrap_err("failed to track dealer outcome in arbiter")?;
675
676 let block_dealer = block_outcome.dealer().clone();
677 self.ceremony_metadata
678 .lock()
679 .await
680 .upsert_sync(self.epoch().into(), |info| {
681 if let Some(pos) = info
682 .outcomes
683 .iter()
684 .position(|outcome| outcome.dealer() == block_outcome.dealer())
685 {
686 info.outcomes[pos] = block_outcome;
687 } else {
688 info.outcomes.push(block_outcome);
689 }
690 })
691 .await
692 .expect("must persist deal outcome");
693
694 if let Some(dealer_me) = &mut self.dealer_me
695 && block_dealer == self.config.me.public_key()
696 {
697 let _ = dealer_me.outcome.take();
698
699 self.ceremony_metadata
700 .lock()
701 .await
702 .upsert_sync(self.epoch().into(), |info| {
703 let _ = info.dealing_outcome.take();
704 })
705 .await
706 .expect("must persist deal outcome");
707
708 info!(
709 "found own dealing in a block; removed it from ceremony to \
710 not include it again"
711 );
712 }
713
714 self.metrics.dealings_read.inc();
715 Ok(())
716 }
717
718 #[instrument(skip_all, fields(epoch = self.epoch()), err)]
722 pub(super) async fn construct_intermediate_outcome(
723 &mut self,
724 hardfork_regime: HardforkRegime,
725 ) -> eyre::Result<()> {
726 let Some(dealer_me) = &mut self.dealer_me else {
727 debug!("not a dealer; skipping construction of deal outcome");
728 return Ok(());
729 };
730 let reveals = self
731 .config
732 .players
733 .iter()
734 .filter_map(|player| {
735 (!dealer_me.acks.contains_key(player))
736 .then(|| dealer_me.shares.get(player).cloned())
737 .flatten()
738 })
739 .collect::<Vec<_>>();
740
741 ensure!(
742 reveals.len() as u32 <= max_faults(self.config.players.len() as u32),
743 "too many reveals; skipping deal outcome construction",
744 );
745
746 let dealing_outcome = match hardfork_regime {
747 HardforkRegime::PostAllegretto => Some(IntermediateOutcome::new(
748 self.config
749 .players
750 .len()
751 .try_into()
752 .expect("we should never have more than u16::MAX validators/players"),
753 &self.config.me,
754 &union(&self.config.namespace, OUTCOME_NAMESPACE),
755 self.config.epoch,
756 dealer_me.commitment.clone(),
757 dealer_me.acks.values().cloned().collect(),
758 reveals,
759 )),
760 HardforkRegime::PreAllegretto => Some(IntermediateOutcome::new_pre_allegretto(
761 self.config
762 .players
763 .len()
764 .try_into()
765 .expect("we should never have more than u16::MAX validators/players"),
766 &self.config.me,
767 &union(&self.config.namespace, OUTCOME_NAMESPACE),
768 self.config.epoch,
769 dealer_me.commitment.clone(),
770 dealer_me.acks.values().cloned().collect(),
771 reveals,
772 )),
773 };
774
775 self.ceremony_metadata
776 .lock()
777 .await
778 .upsert_sync(self.config.epoch.into(), |info| {
779 info.dealing_outcome = dealing_outcome.clone();
780 })
781 .await
782 .expect("must persist local outcome");
783
784 dealer_me.outcome = dealing_outcome;
785
786 Ok(())
787 }
788
789 #[instrument(skip_all, fields(epoch = self.epoch()))]
800 pub(super) fn finalize(self) -> Result<PrivateOutcome, PrivateOutcome> {
801 let (result, disqualified) = self.arbiter.finalize();
802
803 let arbiter::Output {
804 public,
805 commitments,
806 reveals,
807 } = match result {
808 Ok(output) => output,
809 Err(error) => {
810 error!(
811 error = %eyre::Report::new(error),
812 ?disqualified,
813 "failed to finalize arbiter; aborting ceremony and \
814 returning previous dealers and commitment",
815 );
816 return Err(PrivateOutcome {
817 participants: self.config.dealers,
818 role: self.previous_role,
819 });
820 }
821 };
822
823 let new_role = if let Some(player_me) = self.player_me {
824 let my_index = self
825 .players_indexed
826 .get_index_of(&self.config.me.public_key())
827 .expect("if I am a player, I must be indexed");
828 let reveals = reveals
829 .into_iter()
830 .filter_map(|(dealer_idx, shares)| {
831 shares
832 .iter()
833 .find(|s| s.index == my_index as u32)
834 .cloned()
835 .map(|share| (dealer_idx, share))
836 })
837 .collect::<BTreeMap<_, _>>();
838
839 let n_commitments = commitments.len();
840 let n_reveals = reveals.len();
841
842 let output = match player_me.finalize(commitments, reveals) {
843 Ok(output) => output,
844 Err(error) => {
845 error!(
846 error = %eyre::Report::new(error),
847 "failed to finalize player; aborting ceremony and \
848 returning previous dealers and commitment"
849 );
850 return Err(PrivateOutcome {
851 participants: self.config.dealers,
852 role: self.previous_role,
853 });
854 }
855 };
856
857 info!(
858 ?disqualified,
859 n_commitments,
860 n_reveals,
861 "successfully finalized DKG ceremony; returning new \
862 players and commitment"
863 );
864
865 Role::Signer {
866 public: output.public,
867 share: output.share,
868 }
869 } else {
870 Role::Verifier { public }
871 };
872
873 Ok(PrivateOutcome {
874 participants: self.config.players,
875 role: new_role,
876 })
877 }
878
879 pub(super) fn epoch(&self) -> Epoch {
880 self.config.epoch
881 }
882
883 pub(super) fn deal_outcome(&self) -> Option<&IntermediateOutcome> {
884 let dealer_me = self.dealer_me.as_ref()?;
885 dealer_me.outcome.as_ref()
886 }
887
888 pub(super) fn dealers(&self) -> &Ordered<PublicKey> {
889 &self.config.dealers
890 }
891
892 pub(super) fn players(&self) -> &[PublicKey] {
893 self.config.players.as_ref()
894 }
895
896 pub(super) fn is_dealer(&self) -> bool {
897 self.dealer_me.is_some()
898 }
899
900 pub(super) fn is_player(&self) -> bool {
901 self.player_me.is_some()
902 }
903}
904
905struct Dealer {
907 commitment: Public<MinSig>,
909 shares: BTreeMap<PublicKey, group::Share>,
911 acks: BTreeMap<PublicKey, Ack>,
913 outcome: Option<IntermediateOutcome>,
917}
918
919pub(super) struct PrivateOutcome {
923 pub(super) participants: Ordered<PublicKey>,
926
927 pub(super) role: Role,
929}
930
931pub(super) enum Role {
934 Signer {
936 public: Public<MinSig>,
937 share: group::Share,
938 },
939 Verifier { public: Public<MinSig> },
941}
942
943impl Role {
944 pub(super) fn into_key_pair(self) -> (Public<MinSig>, Option<group::Share>) {
948 match self {
949 Self::Signer {
950 public: polynomial,
951 share,
952 } => (polynomial, Some(share)),
953 Self::Verifier { public: polynomial } => (polynomial, None),
954 }
955 }
956}
957
958#[derive(Clone)]
960pub(super) struct Metrics {
961 shares_distributed: Gauge,
962 shares_received: Gauge,
963 acks_received: Gauge,
964 acks_sent: Gauge,
965 dealings_read: Gauge,
966 dealings_empty: Gauge,
967 bad_dealings: Gauge,
968
969 failures: Counter,
970 successes: Counter,
971 dealers: Gauge,
972 players: Gauge,
973
974 how_often_dealer: Counter,
975 how_often_player: Counter,
976}
977
978impl Metrics {
979 pub(super) fn register<C: commonware_runtime::Metrics>(context: &C) -> Self {
984 let failures = Counter::default();
985 let successes = Counter::default();
986
987 let dealers = Gauge::default();
988 let players = Gauge::default();
989
990 let how_often_dealer = Counter::default();
991 let how_often_player = Counter::default();
992
993 let shares_distributed = Gauge::default();
994 let shares_received = Gauge::default();
995 let acks_received = Gauge::default();
996 let acks_sent = Gauge::default();
997 let dealings_read = Gauge::default();
998 let dealings_empty = Gauge::default();
999 let bad_dealings = Gauge::default();
1000
1001 context.register(
1002 "ceremony_failures",
1003 "the number of failed ceremonies a node participated in",
1004 failures.clone(),
1005 );
1006 context.register(
1007 "ceremony_successes",
1008 "the number of successful ceremonies a node participated in",
1009 successes.clone(),
1010 );
1011 context.register(
1012 "ceremony_dealers",
1013 "the number of dealers in the currently running ceremony",
1014 dealers.clone(),
1015 );
1016 context.register(
1017 "ceremony_players",
1018 "the number of players in the currently running ceremony",
1019 players.clone(),
1020 );
1021
1022 context.register(
1024 "how_often_dealer",
1025 "number of the times as node was active as a dealer",
1026 how_often_dealer.clone(),
1027 );
1028 context.register(
1030 "how_often_player",
1031 "number of the times as node was active as a player",
1032 how_often_player.clone(),
1033 );
1034
1035 context.register(
1036 "ceremony_shares_distributed",
1037 "the number of shares distributed by this node as a dealer in the current ceremony",
1038 shares_distributed.clone(),
1039 );
1040 context.register(
1041 "ceremony_shares_received",
1042 "the number of shares received by this node as a playr in the current ceremony",
1043 shares_received.clone(),
1044 );
1045 context.register(
1046 "ceremony_acks_received",
1047 "the number of acknowledgments received by this node as a dealer in the current ceremony",
1048 acks_received.clone(),
1049 );
1050 context.register(
1051 "ceremony_acks_sent",
1052 "the number of acknowledgments sent by this node as a player in the current ceremony",
1053 acks_sent.clone(),
1054 );
1055 context.register(
1056 "ceremony_dealings_read",
1057 "the number of dealings read from the blockchain in the current ceremony",
1058 dealings_read.clone(),
1059 );
1060 context.register(
1061 "ceremony_dealings_empty",
1062 "the number of blocks with empty extra_data (no dealing) in the current ceremony",
1063 dealings_empty.clone(),
1064 );
1065 context.register(
1066 "ceremony_bad_dealings",
1067 "the number of blocks where decoding and verifying dealings failed in the current ceremony",
1068 bad_dealings.clone(),
1069 );
1070
1071 Self {
1072 shares_distributed,
1073 shares_received,
1074 acks_received,
1075 acks_sent,
1076 dealings_read,
1077 dealings_empty,
1078 bad_dealings,
1079 dealers,
1080 players,
1081 how_often_dealer,
1082 how_often_player,
1083 failures,
1084 successes,
1085 }
1086 }
1087
1088 fn reset_per_ceremony_metrics(&self) {
1091 self.shares_distributed.set(0);
1092 self.shares_received.set(0);
1093 self.acks_received.set(0);
1094 self.acks_sent.set(0);
1095 self.dealings_read.set(0);
1096 self.dealings_empty.set(0);
1097 self.bad_dealings.set(0);
1098 }
1099
1100 pub(super) fn one_more_failure(&self) {
1102 self.failures.inc();
1103 }
1104
1105 pub(super) fn one_more_success(&self) {
1107 self.successes.inc();
1108 }
1109}