tempo_commonware_node/dkg/ceremony/
mod.rs

1//! An actively running DKG ceremony.
2
3use std::{collections::BTreeMap, sync::Arc};
4
5use alloy_consensus::BlockHeader as _;
6use commonware_codec::{Decode as _, Encode as _};
7use commonware_consensus::{Block as _, types::Epoch};
8use commonware_cryptography::{
9    Signer as _,
10    bls12381::{
11        dkg::{self, Arbiter, Player, arbiter},
12        primitives::{group, poly::Public, variant::MinSig},
13    },
14    ed25519::{PrivateKey, PublicKey},
15};
16use commonware_p2p::{
17    Receiver, Recipients, Sender,
18    utils::mux::{MuxHandle, SubReceiver, SubSender},
19};
20use commonware_runtime::{Clock, Storage};
21use commonware_storage::metadata::Metadata;
22use commonware_utils::{max_faults, sequence::U64, set::Ordered, union};
23use eyre::{WrapErr as _, bail, ensure};
24use futures::{FutureExt as _, lock::Mutex};
25use indexmap::IndexSet;
26use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
27use rand_core::CryptoRngCore;
28use tracing::{Level, debug, error, info, instrument, warn};
29
30use tempo_dkg_onchain_artifacts::{Ack, IntermediateOutcome};
31
32use crate::{consensus::block::Block, dkg::HardforkRegime};
33
34mod payload;
35mod persisted;
36
37pub(super) use persisted::State;
38
39use payload::{Message, Payload, Share};
40use persisted::Dealing;
41
42const ACK_NAMESPACE: &[u8] = b"_DKG_ACK";
43pub(super) const OUTCOME_NAMESPACE: &[u8] = b"_DKG_OUTCOME";
44
45/// Recovering public weights is a heavy operation. For simplicity, we use just
46/// 1 thread for now.
47const WEIGHT_RECOVERY_CONCURRENCY: usize = 1;
48
49pub(super) struct Config {
50    /// Prefix all signed messages to prevent replay attacks.
51    pub(super) namespace: Vec<u8>,
52
53    pub(super) me: PrivateKey,
54
55    /// The previous public polynomial.
56    pub(super) public: Public<MinSig>,
57
58    /// Our previous share of the private polynomial. This dictates if we
59    /// become a dealer in the new round: no share -> not a dealer.
60    pub(super) share: Option<group::Share>,
61
62    /// The current epoch.
63    pub(super) epoch: Epoch,
64
65    /// The dealers in the round.
66    pub(super) dealers: Ordered<PublicKey>,
67
68    /// The players in the round.
69    pub(super) players: Ordered<PublicKey>,
70}
71pub(super) struct Ceremony<TContext, TReceiver, TSender>
72where
73    TContext: Clock + commonware_runtime::Metrics + Storage,
74    TReceiver: Receiver,
75    TSender: Sender,
76{
77    config: Config,
78
79    /// The previous role of this node in the network. This contains either
80    /// the polynomial (if the node was just a verifier), or the polynomial
81    /// and share of the private key if the node was a signer.
82    previous_role: Role,
83
84    /// [Dealer] metadata, if this manager is also dealing.
85    dealer_me: Option<Dealer>,
86
87    /// The local [Player] for this round, if the manager is playing.
88    //
89    // NOTE: right now we should always be playing.
90    player_me: Option<Player<PublicKey, MinSig>>,
91
92    /// The indexed set of all players. Whereas config.players is a sorted
93    /// list of unique playes, players_indexed is an actual set data structure
94    /// that allows O(1) lookup of both keys and indices.
95    ///
96    /// It is an invariant that `players_indexed.get_index_of(players[i]) == i`.
97    players_indexed: IndexSet<PublicKey>,
98
99    /// The local [Arbiter] for this round.
100    arbiter: Arbiter<PublicKey, MinSig>,
101
102    ceremony_metadata: Arc<Mutex<Metadata<TContext, U64, State>>>,
103    receiver: SubReceiver<TReceiver>,
104    sender: SubSender<TSender>,
105    metrics: Metrics,
106}
107
108impl<TContext, TReceiver, TSender> Ceremony<TContext, TReceiver, TSender>
109where
110    TContext: Clock + CryptoRngCore + commonware_runtime::Metrics + Storage,
111    TReceiver: Receiver<PublicKey = PublicKey>,
112    TSender: Sender<PublicKey = PublicKey>,
113{
114    /// Initialize a DKG ceremony.
115    #[instrument(skip_all, fields(for_epoch = config.epoch), err)]
116    pub(super) async fn init(
117        context: &mut TContext,
118        mux: &mut MuxHandle<TSender, TReceiver>,
119        ceremony_metadata: Arc<Mutex<Metadata<TContext, U64, State>>>,
120        config: Config,
121        metrics: Metrics,
122    ) -> eyre::Result<Self> {
123        // Reset the cumulants for the current ceremony back to zero instead
124        // of creating fresh metrics: registering new metrics would just push
125        // more and more into the prometheus registry without ever pruning.
126        metrics.reset_per_ceremony_metrics();
127
128        metrics.dealers.set(config.dealers.len() as i64);
129        metrics.players.set(config.players.len() as i64);
130
131        let (sender, receiver) = mux
132            .register(config.epoch)
133            .await
134            .wrap_err("mux subchannel already running for epoch; this is a problem")?;
135
136        let players_indexed: IndexSet<_> = config.players.iter().cloned().collect();
137        let mut player_me = players_indexed.get(&config.me.public_key()).map(|_| {
138            Player::new(
139                config.me.public_key(),
140                Some(config.public.clone()),
141                config.dealers.clone(),
142                config.players.clone(),
143                WEIGHT_RECOVERY_CONCURRENCY,
144            )
145        });
146
147        let mut arbiter = Arbiter::new(
148            Some(config.public.clone()),
149            config.dealers.clone(),
150            config.players.clone(),
151            WEIGHT_RECOVERY_CONCURRENCY,
152        );
153
154        let mut dealer_me = None;
155
156        debug!("attempting to read ceremony state from disk");
157        // TODO(janis): move this "recovery" logic to a function.
158        // Clone in order to not hold onto the lock too long.
159        let recovered = ceremony_metadata
160            .lock()
161            .await
162            .get(&config.epoch.into())
163            .cloned();
164
165        if let Some(recovered) = recovered {
166            info!("found a previous ceremony state written to disk; recovering it");
167            for outcome in &recovered.outcomes {
168                let ack_indices = outcome
169                    .acks()
170                    .iter()
171                    .filter_map(|ack| {
172                        let idx = players_indexed.get_index_of(ack.player());
173                        if idx.is_none() {
174                            warn!(
175                                player = %ack.player(),
176                                "ack for player recovered from disk not among players of this ceremony",
177                            );
178                        }
179                        idx.map(|idx| idx as u32)
180                    })
181                    .collect::<Vec<_>>();
182
183                if let Err(error) = arbiter
184                    .commitment(
185                        outcome.dealer().clone(),
186                        outcome.commitment().clone(),
187                        ack_indices,
188                        outcome.reveals().to_vec(),
189                    )
190                    .wrap_err("failed to verify and track commitment")
191                {
192                    warn!(
193                        %error,
194                        "failed to update arbiter with metadata recovered from disk",
195                    );
196                }
197            }
198
199            if let Some(me) = &mut player_me {
200                for (dealer, commitment, share) in recovered.received_shares.clone() {
201                    me.share(dealer, commitment, share)
202                        .wrap_err("failed updating my player information with stored metadata")?;
203                }
204            }
205
206            if let Some(dealing) = recovered.dealing.clone() {
207                let (mut dkg_dealer, _, _) = dkg::Dealer::<PublicKey, MinSig>::new(
208                    context,
209                    config.share.clone(),
210                    config.players.clone(),
211                );
212                for ack in dealing.acks.values() {
213                    dkg_dealer.ack(ack.player().clone()).wrap_err_with(|| {
214                        format!(
215                            "failed updating dealer information with ack for \
216                             player `{player}` recovered from disk",
217                            player = ack.player(),
218                        )
219                    })?;
220                }
221                dealer_me = Some(Dealer {
222                    commitment: dealing.commitment,
223                    shares: dealing.shares,
224                    acks: dealing.acks,
225                    outcome: recovered.dealing_outcome,
226                });
227            }
228        } else {
229            info!("starting a fresh ceremony");
230
231            if let Some(share) = config.share.clone() {
232                info!("we have a share, so we are a dealer in this ceremony");
233                let (_, commitment, shares) = dkg::Dealer::<PublicKey, MinSig>::new(
234                    context,
235                    Some(share),
236                    config.players.clone(),
237                );
238                let shares = config
239                    .players
240                    .iter()
241                    .zip(&shares)
242                    .map(|(player, share)| (player.clone(), share.clone()))
243                    .collect();
244                dealer_me = Some(Dealer {
245                    commitment,
246                    shares,
247                    acks: BTreeMap::new(),
248                    outcome: None,
249                });
250            }
251
252            ceremony_metadata
253                .lock()
254                .await
255                .put_sync(
256                    config.epoch.into(),
257                    State {
258                        num_players: config
259                            .players
260                            .len()
261                            .try_into()
262                            .expect("there should never be more than u16::MAX players"),
263                        dealing: dealer_me.as_ref().map(|me| Dealing {
264                            commitment: me.commitment.clone(),
265                            shares: me.shares.clone(),
266                            acks: BTreeMap::new(),
267                        }),
268                        ..State::default()
269                    },
270                )
271                .await
272                .expect("must always be able to initialize the ceremony state to disk");
273        };
274
275        metrics.how_often_player.inc_by(player_me.is_some() as u64);
276        metrics.how_often_dealer.inc_by(dealer_me.is_some() as u64);
277
278        let previous = config.share.clone().map_or_else(
279            || Role::Verifier {
280                public: config.public.clone(),
281            },
282            |share| Role::Signer {
283                public: config.public.clone(),
284                share,
285            },
286        );
287        Ok(Self {
288            config,
289            previous_role: previous,
290            dealer_me,
291            player_me,
292            players_indexed,
293            arbiter,
294            ceremony_metadata,
295            receiver,
296            sender,
297            metrics,
298        })
299    }
300
301    /// Sends shares to all players for acknowledgements.
302    ///
303    /// Does not send shares if we are not a dealer in this ceremony.
304    ///
305    /// If we are both a dealer and a player, then we acknowledge our shares
306    /// immediately without going over the p2p network.
307    #[instrument(skip_all, fields(epoch = self.config.epoch), err)]
308    pub(super) async fn distribute_shares(&mut self) -> eyre::Result<()> {
309        let Some(dealer_me) = &mut self.dealer_me else {
310            debug!("not a dealer, not distributing shares");
311            return Ok(());
312        };
313        for player in &self.config.players {
314            if dealer_me.acks.contains_key(player) {
315                continue;
316            }
317
318            let share = dealer_me
319                .shares
320                .get(player)
321                .cloned()
322                .expect("invariant violated: all players must have an entry in the shares map");
323
324            if let Some(player_me) = &mut self.player_me
325                && player == &self.config.me.public_key()
326            {
327                player_me
328                    .share(
329                        self.config.me.public_key(),
330                        dealer_me.commitment.clone(),
331                        share.clone(),
332                    )
333                    .expect(
334                        "must work: updating player with own dealer \
335                        commitment",
336                    );
337
338                // TODO(janis): easy to mess up the fields because some of them
339                // are of the same type. Better pass in a struct or create a
340                // builder.
341                let ack = Ack::new(
342                    &union(&self.config.namespace, ACK_NAMESPACE),
343                    self.config.me.clone(),
344                    self.config.me.public_key(),
345                    self.config.epoch,
346                    &self.config.me.public_key(),
347                    &dealer_me.commitment,
348                );
349                assert_eq!(
350                    None,
351                    dealer_me
352                        .acks
353                        .insert(self.config.me.public_key(), ack.clone()),
354                    "must only insert our own ack once",
355                );
356
357                self.ceremony_metadata
358                    .lock()
359                    .await
360                    .upsert_sync(self.config.epoch.into(), |info| {
361                        if let Some(dealing) = &mut info.dealing {
362                            dealing.acks.insert(self.config.me.public_key(), ack);
363                        } else {
364                            info.dealing = Some(Dealing {
365                                commitment: dealer_me.commitment.clone(),
366                                shares: dealer_me.shares.clone(),
367                                acks: BTreeMap::from([(self.config.me.public_key(), ack)]),
368                            });
369                        }
370                        info.received_shares.push((
371                            self.config.me.public_key(),
372                            dealer_me.commitment.clone(),
373                            share,
374                        ));
375                    })
376                    .await
377                    .expect("must be able to persists acks");
378                // When self-distributing, we also "receive" the share and "send" an ack to ourselves
379                self.metrics.shares_distributed.inc();
380                self.metrics.acks_received.inc();
381                self.metrics.acks_sent.inc();
382                continue;
383            }
384
385            let payload = Share {
386                commitment: dealer_me.commitment.clone(),
387                share,
388            }
389            .into();
390            let success = self
391                .sender
392                .send(
393                    Recipients::One(player.clone()),
394                    Message {
395                        epoch: self.config.epoch,
396                        payload,
397                    }
398                    .encode()
399                    .freeze(),
400                    true,
401                )
402                .await
403                .wrap_err("unable to forward share to p2p network")?;
404
405            if success.is_empty() {
406                warn!(%player, "failed to send share to player");
407            } else {
408                info!(%player, "sent share to player");
409                self.metrics.shares_distributed.inc();
410            }
411        }
412        Ok(())
413    }
414
415    /// Processes all received shares and acks on the ceremony's p2p subchannel.
416    ///
417    /// If we receive a share and are a player: construct an ack and return it
418    /// to the sender.
419    ///
420    /// If we receive an ack and are a dealer: track the ack.
421    #[instrument(skip_all, fields(epoch = self.epoch()), err)]
422    pub(super) async fn process_messages(&mut self) -> eyre::Result<()> {
423        while let Some(msg) = self.receiver.recv().now_or_never() {
424            let (peer, mut msg) = msg.wrap_err("receiver p2p channel was closed")?;
425
426            debug!(%peer, "received message from");
427            let msg = Message::decode_cfg(&mut msg, &(self.config.players.len() as u32))
428                .wrap_err("unable to decode message")?;
429            if msg.epoch != self.epoch() {
430                warn!(
431                    ceremony.epoch = self.epoch(),
432                    msg.epoch = msg.epoch,
433                    "ignoring message for different round"
434                );
435                continue;
436            }
437
438            match msg.payload {
439                Payload::Ack(ack) => {
440                    let _: Result<_, _> = self.process_ack(peer, *ack).await;
441                }
442                Payload::Share(share) => {
443                    let _: Result<_, _> = self.process_share(peer, share).await;
444                }
445            }
446        }
447
448        Ok(())
449    }
450
451    #[instrument(
452        skip_all,
453        fields(
454            epoch = %self.epoch(),
455            %peer,
456            player = %ack.player(),
457        ),
458        err(level = Level::WARN),
459        ret,
460    )]
461    async fn process_ack(&mut self, peer: PublicKey, ack: Ack) -> eyre::Result<&'static str> {
462        self.metrics.acks_received.inc();
463        let Some(dealer_me) = &mut self.dealer_me else {
464            return Ok("not a dealer, dropping ack");
465        };
466
467        ensure!(
468            ack.player() == &peer,
469            "player recorded in ack does not match peer that sent it; dropping ack",
470        );
471
472        ensure!(
473            self.players_indexed.contains(&peer),
474            "peer not among players for this ceremony; dropping ack",
475        );
476
477        ensure!(
478            ack.verify(
479                &union(&self.config.namespace, ACK_NAMESPACE),
480                &peer,
481                self.config.epoch,
482                &self.config.me.public_key(),
483                &dealer_me.commitment,
484            ),
485            "failed verifying ack signature against peer",
486        );
487
488        if let std::collections::btree_map::Entry::Vacant(vacant) =
489            dealer_me.acks.entry(peer.clone())
490        {
491            vacant.insert(ack.clone());
492        } else {
493            bail!("duplicate ack for peer");
494        }
495
496        self.ceremony_metadata
497            .lock()
498            .await
499            .upsert_sync(self.config.epoch.into(), |info| {
500                if let Some(dealing) = &mut info.dealing {
501                    dealing.acks.insert(peer.clone(), ack);
502                } else {
503                    info.dealing = Some(Dealing {
504                        commitment: dealer_me.commitment.clone(),
505                        shares: dealer_me.shares.clone(),
506                        acks: BTreeMap::from([(peer.clone(), ack)]),
507                    });
508                }
509            })
510            .await
511            .expect("must always be able to persist tracked acks to disk");
512
513        Ok("ack recorded")
514    }
515
516    #[instrument(
517        skip_all,
518        fields(
519            epoch = %self.epoch(),
520            %peer,
521        ),
522        err(level = Level::WARN),
523        ret,
524    )]
525    async fn process_share(
526        &mut self,
527        peer: PublicKey,
528        Share { commitment, share }: Share,
529    ) -> eyre::Result<&'static str> {
530        self.metrics.shares_received.inc();
531        let Some(player_me) = &mut self.player_me else {
532            return Ok("not a player, dropping share");
533        };
534
535        // This also checks peer is the correct dealer.
536        player_me
537            .share(peer.clone(), commitment.clone(), share.clone())
538            .wrap_err("failed to record and track share")?;
539
540        self.ceremony_metadata
541            .lock()
542            .await
543            .upsert_sync(self.epoch().into(), |info| {
544                info.received_shares
545                    .push((peer.clone(), commitment.clone(), share));
546            })
547            .await
548            .expect("must always be able to persist tracked shares to disk");
549
550        let payload = Ack::new(
551            &union(&self.config.namespace, ACK_NAMESPACE),
552            self.config.me.clone(),
553            self.config.me.public_key(),
554            self.epoch(),
555            &peer,
556            &commitment,
557        )
558        .into();
559        self.sender
560            .send(
561                Recipients::One(peer.clone()),
562                Message {
563                    epoch: self.epoch(),
564                    payload,
565                }
566                .encode()
567                .freeze(),
568                true,
569            )
570            .await
571            .wrap_err("failed returning ack to peer")?;
572
573        self.metrics.acks_sent.inc();
574        Ok("recorded share and returned signed ack to peer")
575    }
576
577    /// Process `block` by reading [`IntermediateOutcome`] from its header.
578    ///
579    /// If the block contains this outcome, the ceremony will verify it and
580    /// track it in its arbiter.
581    #[instrument(skip_all, fields(epoch = self.epoch(), block.height = block.height()), err)]
582    pub(super) async fn process_dealings_in_block(
583        &mut self,
584        block: &Block,
585        hardfork_regime: HardforkRegime,
586    ) -> eyre::Result<()> {
587        // Track empty vs failed metrics separately
588        if block.header().extra_data().is_empty() {
589            self.metrics.dealings_empty.inc();
590            return Ok(());
591        }
592
593        let block_outcome = match block
594            .try_read_ceremony_deal_outcome()
595            .wrap_err("failed reading intermediate DKG dealings from block")
596        {
597            Ok(outcome) => outcome,
598            Err(error) => {
599                self.metrics.bad_dealings.inc();
600                return Err(error);
601            }
602        };
603
604        info!(
605            dealer = %block_outcome.dealer(),
606            "found DKG dealing in block",
607        );
608
609        // Ensure the outcome is for the current round.
610        ensure!(
611            block_outcome.epoch() == self.epoch(),
612            "deal outcome in block was for epoch `{}`, but current dkg ceremony is for epoch `{}`",
613            block_outcome.epoch(),
614            self.epoch(),
615        );
616
617        ensure!(
618            self.dealers().position(block_outcome.dealer()).is_some(),
619            "dealer `{}` recorded in dealing outcome is not among the dealers of this ceremony",
620            block_outcome.dealer(),
621        );
622
623        // Verify the dealer's signature before considering processing the outcome.
624        let is_verified = match hardfork_regime {
625            HardforkRegime::PostAllegretto => {
626                block_outcome.verify(&union(&self.config.namespace, OUTCOME_NAMESPACE))
627            }
628            HardforkRegime::PreAllegretto => block_outcome
629                .verify_pre_allegretto(&union(&self.config.namespace, OUTCOME_NAMESPACE)),
630        };
631        if !is_verified {
632            self.metrics.bad_dealings.inc();
633            bail!("intermediate DKG dealing could not be verified");
634        }
635
636        // Verify all ack signatures
637        if !block_outcome.acks().iter().all(|ack| {
638            self.players_indexed.contains(ack.player())
639                && ack.verify(
640                    &union(&self.config.namespace, ACK_NAMESPACE),
641                    ack.player(),
642                    self.epoch(),
643                    block_outcome.dealer(),
644                    block_outcome.commitment(),
645                )
646        }) {
647            self.arbiter.disqualify(block_outcome.dealer().clone());
648            bail!("invalid ack signatures; disqualifying dealer");
649        }
650
651        // Check dealer commitment
652        let ack_indices = block_outcome
653            .acks()
654            .iter()
655            .filter_map(|ack| {
656                let idx = self.players_indexed.get_index_of(ack.player());
657                if idx.is_none() {
658                    warn!(
659                        player = %ack.player(),
660                        "ack for player stored on disk not among players of this ceremony",
661                    );
662                }
663                idx.map(|idx| idx as u32)
664            })
665            .collect::<Vec<_>>();
666
667        self.arbiter
668            .commitment(
669                block_outcome.dealer().clone(),
670                block_outcome.commitment().clone(),
671                ack_indices,
672                block_outcome.reveals().to_vec(),
673            )
674            .wrap_err("failed to track dealer outcome in arbiter")?;
675
676        let block_dealer = block_outcome.dealer().clone();
677        self.ceremony_metadata
678            .lock()
679            .await
680            .upsert_sync(self.epoch().into(), |info| {
681                if let Some(pos) = info
682                    .outcomes
683                    .iter()
684                    .position(|outcome| outcome.dealer() == block_outcome.dealer())
685                {
686                    info.outcomes[pos] = block_outcome;
687                } else {
688                    info.outcomes.push(block_outcome);
689                }
690            })
691            .await
692            .expect("must persist deal outcome");
693
694        if let Some(dealer_me) = &mut self.dealer_me
695            && block_dealer == self.config.me.public_key()
696        {
697            let _ = dealer_me.outcome.take();
698
699            self.ceremony_metadata
700                .lock()
701                .await
702                .upsert_sync(self.epoch().into(), |info| {
703                    let _ = info.dealing_outcome.take();
704                })
705                .await
706                .expect("must persist deal outcome");
707
708            info!(
709                "found own dealing in a block; removed it from ceremony to \
710                not include it again"
711            );
712        }
713
714        self.metrics.dealings_read.inc();
715        Ok(())
716    }
717
718    /// Constructs and stores the intermediate ceremony outcome.
719    ///
720    /// If the node is not a dealer, then this is a no-op.
721    #[instrument(skip_all, fields(epoch = self.epoch()), err)]
722    pub(super) async fn construct_intermediate_outcome(
723        &mut self,
724        hardfork_regime: HardforkRegime,
725    ) -> eyre::Result<()> {
726        let Some(dealer_me) = &mut self.dealer_me else {
727            debug!("not a dealer; skipping construction of deal outcome");
728            return Ok(());
729        };
730        let reveals = self
731            .config
732            .players
733            .iter()
734            .filter_map(|player| {
735                (!dealer_me.acks.contains_key(player))
736                    .then(|| dealer_me.shares.get(player).cloned())
737                    .flatten()
738            })
739            .collect::<Vec<_>>();
740
741        ensure!(
742            reveals.len() as u32 <= max_faults(self.config.players.len() as u32),
743            "too many reveals; skipping deal outcome construction",
744        );
745
746        let dealing_outcome = match hardfork_regime {
747            HardforkRegime::PostAllegretto => Some(IntermediateOutcome::new(
748                self.config
749                    .players
750                    .len()
751                    .try_into()
752                    .expect("we should never have more than u16::MAX validators/players"),
753                &self.config.me,
754                &union(&self.config.namespace, OUTCOME_NAMESPACE),
755                self.config.epoch,
756                dealer_me.commitment.clone(),
757                dealer_me.acks.values().cloned().collect(),
758                reveals,
759            )),
760            HardforkRegime::PreAllegretto => Some(IntermediateOutcome::new_pre_allegretto(
761                self.config
762                    .players
763                    .len()
764                    .try_into()
765                    .expect("we should never have more than u16::MAX validators/players"),
766                &self.config.me,
767                &union(&self.config.namespace, OUTCOME_NAMESPACE),
768                self.config.epoch,
769                dealer_me.commitment.clone(),
770                dealer_me.acks.values().cloned().collect(),
771                reveals,
772            )),
773        };
774
775        self.ceremony_metadata
776            .lock()
777            .await
778            .upsert_sync(self.config.epoch.into(), |info| {
779                info.dealing_outcome = dealing_outcome.clone();
780            })
781            .await
782            .expect("must persist local outcome");
783
784        dealer_me.outcome = dealing_outcome;
785
786        Ok(())
787    }
788
789    /// Finalizes the ceremony, returning the participants and key pair for the
790    /// next epoch.
791    ///
792    /// If the ceremony was successful, the players of the ceremony and the new
793    /// public key will be returned in Ok-position. If this node was a player,
794    /// it will also contain its private share.
795    ///
796    /// If the ceremony failed, the dealers of the ceremony and the old public
797    /// key will be returned in Err-position. If this node was a dealer, this
798    /// will include its old private share.
799    #[instrument(skip_all, fields(epoch = self.epoch()))]
800    pub(super) fn finalize(self) -> Result<PrivateOutcome, PrivateOutcome> {
801        let (result, disqualified) = self.arbiter.finalize();
802
803        let arbiter::Output {
804            public,
805            commitments,
806            reveals,
807        } = match result {
808            Ok(output) => output,
809            Err(error) => {
810                error!(
811                    error = %eyre::Report::new(error),
812                    ?disqualified,
813                    "failed to finalize arbiter; aborting ceremony and \
814                    returning previous dealers and commitment",
815                );
816                return Err(PrivateOutcome {
817                    participants: self.config.dealers,
818                    role: self.previous_role,
819                });
820            }
821        };
822
823        let new_role = if let Some(player_me) = self.player_me {
824            let my_index = self
825                .players_indexed
826                .get_index_of(&self.config.me.public_key())
827                .expect("if I am a player, I must be indexed");
828            let reveals = reveals
829                .into_iter()
830                .filter_map(|(dealer_idx, shares)| {
831                    shares
832                        .iter()
833                        .find(|s| s.index == my_index as u32)
834                        .cloned()
835                        .map(|share| (dealer_idx, share))
836                })
837                .collect::<BTreeMap<_, _>>();
838
839            let n_commitments = commitments.len();
840            let n_reveals = reveals.len();
841
842            let output = match player_me.finalize(commitments, reveals) {
843                Ok(output) => output,
844                Err(error) => {
845                    error!(
846                        error = %eyre::Report::new(error),
847                        "failed to finalize player; aborting ceremony and \
848                        returning previous dealers and commitment"
849                    );
850                    return Err(PrivateOutcome {
851                        participants: self.config.dealers,
852                        role: self.previous_role,
853                    });
854                }
855            };
856
857            info!(
858                ?disqualified,
859                n_commitments,
860                n_reveals,
861                "successfully finalized DKG ceremony; returning new \
862                    players and commitment"
863            );
864
865            Role::Signer {
866                public: output.public,
867                share: output.share,
868            }
869        } else {
870            Role::Verifier { public }
871        };
872
873        Ok(PrivateOutcome {
874            participants: self.config.players,
875            role: new_role,
876        })
877    }
878
879    pub(super) fn epoch(&self) -> Epoch {
880        self.config.epoch
881    }
882
883    pub(super) fn deal_outcome(&self) -> Option<&IntermediateOutcome> {
884        let dealer_me = self.dealer_me.as_ref()?;
885        dealer_me.outcome.as_ref()
886    }
887
888    pub(super) fn dealers(&self) -> &Ordered<PublicKey> {
889        &self.config.dealers
890    }
891
892    pub(super) fn players(&self) -> &[PublicKey] {
893        self.config.players.as_ref()
894    }
895
896    pub(super) fn is_dealer(&self) -> bool {
897        self.dealer_me.is_some()
898    }
899
900    pub(super) fn is_player(&self) -> bool {
901        self.player_me.is_some()
902    }
903}
904
905/// Metadata associated with a [Dealer].
906struct Dealer {
907    /// The [Dealer]'s commitment.
908    commitment: Public<MinSig>,
909    /// The dealer's shares for all players.
910    shares: BTreeMap<PublicKey, group::Share>,
911    /// Signed acknowledgements from contributors.
912    acks: BTreeMap<PublicKey, Ack>,
913    /// The constructed dealing for inclusion in a block.
914    ///
915    /// This is moved out once the outcome was successfully written to chain.
916    outcome: Option<IntermediateOutcome>,
917}
918
919/// The outcome of the ceremony for the local node.
920///
921/// Called private because it potentially contains the private key share.
922pub(super) struct PrivateOutcome {
923    /// The participants of the new epoch. If successful, this will the players
924    /// in the ceremony. If not successful, these are the dealers.
925    pub(super) participants: Ordered<PublicKey>,
926
927    /// The role the node will have in the next epoch.
928    pub(super) role: Role,
929}
930
931/// The resulting keys of the round, dictating whether the node will be a
932/// signer or a verifier in the next epoch.
933pub(super) enum Role {
934    /// The new group polynomial and the local share, if the node was a player.
935    Signer {
936        public: Public<MinSig>,
937        share: group::Share,
938    },
939    /// If the node was not a player in the round it will be just a verifier.
940    Verifier { public: Public<MinSig> },
941}
942
943impl Role {
944    /// Splits the role into a pair of public polynomial and private share.
945    ///
946    /// If a signer, the share will not be unset.
947    pub(super) fn into_key_pair(self) -> (Public<MinSig>, Option<group::Share>) {
948        match self {
949            Self::Signer {
950                public: polynomial,
951                share,
952            } => (polynomial, Some(share)),
953            Self::Verifier { public: polynomial } => (polynomial, None),
954        }
955    }
956}
957
958/// Ceremony specific metrics.
959#[derive(Clone)]
960pub(super) struct Metrics {
961    shares_distributed: Gauge,
962    shares_received: Gauge,
963    acks_received: Gauge,
964    acks_sent: Gauge,
965    dealings_read: Gauge,
966    dealings_empty: Gauge,
967    bad_dealings: Gauge,
968
969    failures: Counter,
970    successes: Counter,
971    dealers: Gauge,
972    players: Gauge,
973
974    how_often_dealer: Counter,
975    how_often_player: Counter,
976}
977
978impl Metrics {
979    /// Construct and register the ceremony-related metrics on `context`.
980    ///
981    /// Note: because there exists no long-lived ceremony specific context,
982    /// most of the metrics defined here carry a manual `ceremony` prefix.
983    pub(super) fn register<C: commonware_runtime::Metrics>(context: &C) -> Self {
984        let failures = Counter::default();
985        let successes = Counter::default();
986
987        let dealers = Gauge::default();
988        let players = Gauge::default();
989
990        let how_often_dealer = Counter::default();
991        let how_often_player = Counter::default();
992
993        let shares_distributed = Gauge::default();
994        let shares_received = Gauge::default();
995        let acks_received = Gauge::default();
996        let acks_sent = Gauge::default();
997        let dealings_read = Gauge::default();
998        let dealings_empty = Gauge::default();
999        let bad_dealings = Gauge::default();
1000
1001        context.register(
1002            "ceremony_failures",
1003            "the number of failed ceremonies a node participated in",
1004            failures.clone(),
1005        );
1006        context.register(
1007            "ceremony_successes",
1008            "the number of successful ceremonies a node participated in",
1009            successes.clone(),
1010        );
1011        context.register(
1012            "ceremony_dealers",
1013            "the number of dealers in the currently running ceremony",
1014            dealers.clone(),
1015        );
1016        context.register(
1017            "ceremony_players",
1018            "the number of players in the currently running ceremony",
1019            players.clone(),
1020        );
1021
1022        // no prefix for legacy reasons
1023        context.register(
1024            "how_often_dealer",
1025            "number of the times as node was active as a dealer",
1026            how_often_dealer.clone(),
1027        );
1028        // no prefix for for legacy reasons
1029        context.register(
1030            "how_often_player",
1031            "number of the times as node was active as a player",
1032            how_often_player.clone(),
1033        );
1034
1035        context.register(
1036            "ceremony_shares_distributed",
1037            "the number of shares distributed by this node as a dealer in the current ceremony",
1038            shares_distributed.clone(),
1039        );
1040        context.register(
1041            "ceremony_shares_received",
1042            "the number of shares received by this node as a playr in the current ceremony",
1043            shares_received.clone(),
1044        );
1045        context.register(
1046            "ceremony_acks_received",
1047            "the number of acknowledgments received by this node as a dealer in the current ceremony",
1048            acks_received.clone(),
1049        );
1050        context.register(
1051            "ceremony_acks_sent",
1052            "the number of acknowledgments sent by this node as a player in the current ceremony",
1053            acks_sent.clone(),
1054        );
1055        context.register(
1056            "ceremony_dealings_read",
1057            "the number of dealings read from the blockchain in the current ceremony",
1058            dealings_read.clone(),
1059        );
1060        context.register(
1061            "ceremony_dealings_empty",
1062            "the number of blocks with empty extra_data (no dealing) in the current ceremony",
1063            dealings_empty.clone(),
1064        );
1065        context.register(
1066            "ceremony_bad_dealings",
1067            "the number of blocks where decoding and verifying dealings failed in the current ceremony",
1068            bad_dealings.clone(),
1069        );
1070
1071        Self {
1072            shares_distributed,
1073            shares_received,
1074            acks_received,
1075            acks_sent,
1076            dealings_read,
1077            dealings_empty,
1078            bad_dealings,
1079            dealers,
1080            players,
1081            how_often_dealer,
1082            how_often_player,
1083            failures,
1084            successes,
1085        }
1086    }
1087
1088    /// Resets per-ceremony gauges to zero. Called when a new ceremony is
1089    /// initialized.
1090    fn reset_per_ceremony_metrics(&self) {
1091        self.shares_distributed.set(0);
1092        self.shares_received.set(0);
1093        self.acks_received.set(0);
1094        self.acks_sent.set(0);
1095        self.dealings_read.set(0);
1096        self.dealings_empty.set(0);
1097        self.bad_dealings.set(0);
1098    }
1099
1100    /// Increments the failed ceremonies counter.
1101    pub(super) fn one_more_failure(&self) {
1102        self.failures.inc();
1103    }
1104
1105    /// Increments the successful ceremonies counter.
1106    pub(super) fn one_more_success(&self) {
1107        self.successes.inc();
1108    }
1109}