tempo_commonware_node/dkg/manager/
ingress.rs1use commonware_consensus::{
2 Reporter,
3 marshal::Update,
4 types::{Epoch, Height},
5};
6use commonware_cryptography::{
7 bls12381::{dkg::SignedDealerLog, primitives::variant::MinSig},
8 ed25519::{PrivateKey, PublicKey},
9};
10use commonware_utils::acknowledgement::Exact;
11use eyre::WrapErr as _;
12use futures::channel::{mpsc, oneshot};
13use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
14use tracing::{Span, warn};
15
16use crate::consensus::{Digest, block::Block};
17
18#[derive(Clone, Debug)]
23pub(crate) struct Mailbox {
24 inner: mpsc::UnboundedSender<Message>,
25}
26
27impl Mailbox {
28 pub(super) fn new(inner: mpsc::UnboundedSender<Message>) -> Self {
29 Self { inner }
30 }
31
32 pub(crate) async fn get_dealer_log(
37 &self,
38 epoch: Epoch,
39 ) -> eyre::Result<Option<SignedDealerLog<MinSig, PrivateKey>>> {
40 let (response, rx) = oneshot::channel();
41 self.inner
42 .unbounded_send(Message::in_current_span(GetDealerLog { epoch, response }))
43 .wrap_err("failed sending message to actor")?;
44 rx.await
45 .wrap_err("actor dropped channel before responding with signed dealer log")
46 }
47
48 pub(crate) async fn get_dkg_outcome(
49 &self,
50 digest: Digest,
51 height: Height,
52 ) -> eyre::Result<OnchainDkgOutcome> {
53 let (response, rx) = oneshot::channel();
54 self.inner
55 .unbounded_send(Message::in_current_span(GetDkgOutcome {
56 digest,
57 height,
58 response,
59 }))
60 .wrap_err("failed sending message to actor")?;
61 rx.await
62 .wrap_err("actor dropped channel before responding with ceremony deal outcome")
63 }
64
65 pub(crate) async fn verify_dealer_log(
70 &self,
71 epoch: Epoch,
72 bytes: Vec<u8>,
73 ) -> eyre::Result<PublicKey> {
74 let (response, rx) = oneshot::channel();
75 self.inner
76 .unbounded_send(Message::in_current_span(VerifyDealerLog {
77 bytes,
78 epoch,
79 response,
80 }))
81 .wrap_err("failed sending message to actor")?;
82 rx.await
83 .wrap_err("actor dropped channel before responding with ceremony info")
84 .and_then(|res| res)
86 }
87}
88
89pub(super) struct Message {
90 pub(super) cause: Span,
91 pub(super) command: Command,
92}
93
94impl Message {
95 fn in_current_span(cmd: impl Into<Command>) -> Self {
96 Self {
97 cause: Span::current(),
98 command: cmd.into(),
99 }
100 }
101}
102
103pub(super) enum Command {
104 Update(Box<Update<Block>>),
105
106 GetDealerLog(GetDealerLog),
108 GetDkgOutcome(GetDkgOutcome),
109 VerifyDealerLog(VerifyDealerLog),
110}
111
112impl From<Update<Block>> for Command {
113 fn from(value: Update<Block>) -> Self {
114 Self::Update(Box::new(value))
115 }
116}
117
118impl From<GetDealerLog> for Command {
119 fn from(value: GetDealerLog) -> Self {
120 Self::GetDealerLog(value)
121 }
122}
123
124impl From<VerifyDealerLog> for Command {
125 fn from(value: VerifyDealerLog) -> Self {
126 Self::VerifyDealerLog(value)
127 }
128}
129
130impl From<GetDkgOutcome> for Command {
131 fn from(value: GetDkgOutcome) -> Self {
132 Self::GetDkgOutcome(value)
133 }
134}
135
136pub(super) struct GetDealerLog {
137 pub(super) epoch: Epoch,
138 pub(super) response: oneshot::Sender<Option<SignedDealerLog<MinSig, PrivateKey>>>,
139}
140
141pub(super) struct GetDkgOutcome {
142 pub(super) digest: Digest,
143 pub(super) height: Height,
144 pub(super) response: oneshot::Sender<OnchainDkgOutcome>,
145}
146
147pub(super) struct VerifyDealerLog {
148 pub(super) bytes: Vec<u8>,
149 pub(super) epoch: Epoch,
150 pub(super) response: oneshot::Sender<eyre::Result<PublicKey>>,
151}
152
153impl Reporter for Mailbox {
154 type Activity = Update<Block, Exact>;
155
156 async fn report(&mut self, activity: Self::Activity) {
157 if let Err(error) = self
158 .inner
159 .unbounded_send(Message::in_current_span(activity))
160 .wrap_err("dkg manager no longer running")
161 {
162 warn!(%error, "failed to report finalization activity to dkg manager")
163 }
164 }
165}