Skip to main content

tempo_commonware_node/dkg/manager/
ingress.rs

1use commonware_consensus::{
2    Reporter,
3    marshal::Update,
4    types::{Epoch, Height},
5};
6use commonware_cryptography::{
7    bls12381::{dkg::SignedDealerLog, primitives::variant::MinSig},
8    ed25519::{PrivateKey, PublicKey},
9};
10use commonware_utils::acknowledgement::Exact;
11use eyre::WrapErr as _;
12use futures::channel::{mpsc, oneshot};
13use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
14use tracing::{Span, warn};
15
16use crate::consensus::{Digest, block::Block};
17
18/// A mailbox to handle finalized blocks.
19///
20/// It implements the `Reporter` trait with associated
21/// `type Activity = Update<Block, Exact>` and is passed to the marshal actor.
22#[derive(Clone, Debug)]
23pub(crate) struct Mailbox {
24    inner: mpsc::UnboundedSender<Message>,
25}
26
27impl Mailbox {
28    pub(super) fn new(inner: mpsc::UnboundedSender<Message>) -> Self {
29        Self { inner }
30    }
31
32    /// Returns the dealer log of the node to include in a proposal.
33    ///
34    /// Returns `None` if this node was not a dealer, or if the request is
35    /// for a different epoch than the ceremony that's currently running.
36    pub(crate) async fn get_dealer_log(
37        &self,
38        epoch: Epoch,
39    ) -> eyre::Result<Option<SignedDealerLog<MinSig, PrivateKey>>> {
40        let (response, rx) = oneshot::channel();
41        self.inner
42            .unbounded_send(Message::in_current_span(GetDealerLog { epoch, response }))
43            .wrap_err("failed sending message to actor")?;
44        rx.await
45            .wrap_err("actor dropped channel before responding with signed dealer log")
46    }
47
48    pub(crate) async fn get_dkg_outcome(
49        &self,
50        digest: Digest,
51        height: Height,
52    ) -> eyre::Result<OnchainDkgOutcome> {
53        let (response, rx) = oneshot::channel();
54        self.inner
55            .unbounded_send(Message::in_current_span(GetDkgOutcome {
56                digest,
57                height,
58                response,
59            }))
60            .wrap_err("failed sending message to actor")?;
61        rx.await
62            .wrap_err("actor dropped channel before responding with ceremony deal outcome")
63    }
64
65    /// Verifies the `dealing` based on the current status of the DKG actor.
66    ///
67    /// This method is intended to be called by the application when verifying
68    /// the dealing found in a proposal.
69    pub(crate) async fn verify_dealer_log(
70        &self,
71        epoch: Epoch,
72        bytes: Vec<u8>,
73    ) -> eyre::Result<PublicKey> {
74        let (response, rx) = oneshot::channel();
75        self.inner
76            .unbounded_send(Message::in_current_span(VerifyDealerLog {
77                bytes,
78                epoch,
79                response,
80            }))
81            .wrap_err("failed sending message to actor")?;
82        rx.await
83            .wrap_err("actor dropped channel before responding with ceremony info")
84            // TODO: replace by Result::flatten once MRSV >= 1.89
85            .and_then(|res| res)
86    }
87}
88
89pub(super) struct Message {
90    pub(super) cause: Span,
91    pub(super) command: Command,
92}
93
94impl Message {
95    fn in_current_span(cmd: impl Into<Command>) -> Self {
96        Self {
97            cause: Span::current(),
98            command: cmd.into(),
99        }
100    }
101}
102
103pub(super) enum Command {
104    Update(Box<Update<Block>>),
105
106    // From application
107    GetDealerLog(GetDealerLog),
108    GetDkgOutcome(GetDkgOutcome),
109    VerifyDealerLog(VerifyDealerLog),
110}
111
112impl From<Update<Block>> for Command {
113    fn from(value: Update<Block>) -> Self {
114        Self::Update(Box::new(value))
115    }
116}
117
118impl From<GetDealerLog> for Command {
119    fn from(value: GetDealerLog) -> Self {
120        Self::GetDealerLog(value)
121    }
122}
123
124impl From<VerifyDealerLog> for Command {
125    fn from(value: VerifyDealerLog) -> Self {
126        Self::VerifyDealerLog(value)
127    }
128}
129
130impl From<GetDkgOutcome> for Command {
131    fn from(value: GetDkgOutcome) -> Self {
132        Self::GetDkgOutcome(value)
133    }
134}
135
136pub(super) struct GetDealerLog {
137    pub(super) epoch: Epoch,
138    pub(super) response: oneshot::Sender<Option<SignedDealerLog<MinSig, PrivateKey>>>,
139}
140
141pub(super) struct GetDkgOutcome {
142    pub(super) digest: Digest,
143    pub(super) height: Height,
144    pub(super) response: oneshot::Sender<OnchainDkgOutcome>,
145}
146
147pub(super) struct VerifyDealerLog {
148    pub(super) bytes: Vec<u8>,
149    pub(super) epoch: Epoch,
150    pub(super) response: oneshot::Sender<eyre::Result<PublicKey>>,
151}
152
153impl Reporter for Mailbox {
154    type Activity = Update<Block, Exact>;
155
156    async fn report(&mut self, activity: Self::Activity) {
157        if let Err(error) = self
158            .inner
159            .unbounded_send(Message::in_current_span(activity))
160            .wrap_err("dkg manager no longer running")
161        {
162            warn!(%error, "failed to report finalization activity to dkg manager")
163        }
164    }
165}