tempo_commonware_node/dkg/manager/
ingress.rs1use commonware_consensus::{Reporter, marshal::Update, types::Epoch};
2use commonware_utils::acknowledgement::Exact;
3use eyre::WrapErr as _;
4use futures::channel::{mpsc, oneshot};
5use tempo_dkg_onchain_artifacts::{IntermediateOutcome, PublicOutcome};
6use tracing::{Span, warn};
7
8use crate::consensus::block::Block;
9
10#[derive(Clone, Debug)]
11pub(crate) struct Mailbox {
12 pub(super) inner: mpsc::UnboundedSender<Message>,
13}
14
15impl Mailbox {
16 pub(crate) async fn get_intermediate_dealing(
21 &self,
22 epoch: Epoch,
23 ) -> eyre::Result<Option<IntermediateOutcome>> {
24 let (response, rx) = oneshot::channel();
25 self.inner
26 .unbounded_send(Message::in_current_span(GetIntermediateDealing {
27 epoch,
28 response,
29 }))
30 .wrap_err("failed sending message to actor")?;
31 rx.await
32 .wrap_err("actor dropped channel before responding with ceremony deal outcome")
33 }
34
35 pub(crate) async fn get_public_ceremony_outcome(&self) -> eyre::Result<PublicOutcome> {
36 let (response, rx) = oneshot::channel();
37 self.inner
38 .unbounded_send(Message::in_current_span(GetOutcome { response }))
39 .wrap_err("failed sending message to actor")?;
40 rx.await
41 .wrap_err("actor dropped channel before responding with ceremony deal outcome")
42 }
43
44 pub(crate) async fn verify_intermediate_dealings(
56 &self,
57 dealing: IntermediateOutcome,
58 ) -> eyre::Result<bool> {
59 let (response, rx) = oneshot::channel();
60 self.inner
61 .unbounded_send(Message::in_current_span(VerifyDealing {
62 dealing: dealing.into(),
63 response,
64 }))
65 .wrap_err("failed sending message to actor")?;
66 rx.await
67 .wrap_err("actor dropped channel before responding with ceremony info")
68 }
69}
70
71pub(super) struct Message {
72 pub(super) cause: Span,
73 pub(super) command: Command,
74}
75
76impl Message {
77 fn in_current_span(cmd: impl Into<Command>) -> Self {
78 Self {
79 cause: Span::current(),
80 command: cmd.into(),
81 }
82 }
83}
84
85pub(super) enum Command {
86 Finalize(Finalize),
87 GetIntermediateDealing(GetIntermediateDealing),
88 GetOutcome(GetOutcome),
89 VerifyDealing(VerifyDealing),
90}
91
92impl From<Finalize> for Command {
93 fn from(value: Finalize) -> Self {
94 Self::Finalize(value)
95 }
96}
97
98impl From<GetIntermediateDealing> for Command {
99 fn from(value: GetIntermediateDealing) -> Self {
100 Self::GetIntermediateDealing(value)
101 }
102}
103
104impl From<VerifyDealing> for Command {
105 fn from(value: VerifyDealing) -> Self {
106 Self::VerifyDealing(value)
107 }
108}
109
110impl From<GetOutcome> for Command {
111 fn from(value: GetOutcome) -> Self {
112 Self::GetOutcome(value)
113 }
114}
115
116pub(super) struct Finalize {
117 pub(super) block: Box<Block>,
118 pub(super) acknowledgment: Exact,
119}
120
121pub(super) struct GetIntermediateDealing {
122 pub(super) epoch: Epoch,
123 pub(super) response: oneshot::Sender<Option<IntermediateOutcome>>,
124}
125
126pub(super) struct GetOutcome {
127 pub(super) response: oneshot::Sender<PublicOutcome>,
128}
129
130pub(super) struct VerifyDealing {
131 pub(super) dealing: Box<IntermediateOutcome>,
132 pub(super) response: oneshot::Sender<bool>,
133}
134
135impl Reporter for Mailbox {
136 type Activity = Update<Block, Exact>;
137
138 async fn report(&mut self, update: Self::Activity) {
139 let Update::Block(block, acknowledgment) = update else {
140 tracing::trace!("dropping tip update; DKG manager is only interested in blocks");
141 return;
142 };
143 if let Err(error) = self
144 .inner
145 .unbounded_send(Message::in_current_span(Finalize {
146 block: block.into(),
147 acknowledgment,
148 }))
149 .wrap_err("dkg manager no longer running")
150 {
151 warn!(%error, "failed to report finalized block to dkg manager")
152 }
153 }
154}