tempo_commonware_node/consensus/application/
ingress.rs

1use commonware_consensus::{
2    Automaton, Relay, Reporter,
3    marshal::Update,
4    simplex::types::Context,
5    types::{Epoch, Round, View},
6};
7
8use commonware_cryptography::ed25519::PublicKey;
9use commonware_utils::acknowledgement::Exact;
10use futures::{
11    SinkExt as _,
12    channel::{mpsc, oneshot},
13};
14
15use crate::consensus::{Digest, block::Block};
16
17#[derive(Clone)]
18pub(crate) struct Mailbox {
19    inner: mpsc::Sender<Message>,
20}
21
22impl Mailbox {
23    pub(super) fn from_sender(inner: mpsc::Sender<Message>) -> Self {
24        Self { inner }
25    }
26}
27
28/// Messages forwarded from consensus to application.
29// TODO: add trace spans into all of these messages.
30pub(super) enum Message {
31    Broadcast(Broadcast),
32    Finalized(Box<Finalized>),
33    Genesis(Genesis),
34    Propose(Propose),
35    Verify(Box<Verify>),
36}
37
38pub(super) struct Genesis {
39    pub(super) epoch: Epoch,
40    pub(super) response: oneshot::Sender<Digest>,
41}
42
43impl From<Genesis> for Message {
44    fn from(value: Genesis) -> Self {
45        Self::Genesis(value)
46    }
47}
48
49pub(super) struct Propose {
50    pub(super) parent: (View, Digest),
51    pub(super) response: oneshot::Sender<Digest>,
52    pub(super) round: Round,
53}
54
55impl From<Propose> for Message {
56    fn from(value: Propose) -> Self {
57        Self::Propose(value)
58    }
59}
60
61pub(super) struct Broadcast {
62    pub(super) payload: Digest,
63}
64
65impl From<Broadcast> for Message {
66    fn from(value: Broadcast) -> Self {
67        Self::Broadcast(value)
68    }
69}
70
71pub(super) struct Verify {
72    pub(super) parent: (View, Digest),
73    pub(super) payload: Digest,
74    pub(super) proposer: PublicKey,
75    pub(super) response: oneshot::Sender<bool>,
76    pub(super) round: Round,
77}
78
79impl From<Verify> for Message {
80    fn from(value: Verify) -> Self {
81        Self::Verify(Box::new(value))
82    }
83}
84
85/// A finalization forwarded from the marshal actor to the application's
86/// executor actor.
87///
88/// This enum unwraps `Update<Block>` into this `Finalized` enum so that
89/// a `response` channel is attached to a block-finalization.
90///
91/// The reason is that the marshal actor expects blocks finzalitions to be
92/// acknowledged by the application.
93///
94/// Updated tips on the other hand are fire-and-forget.
95#[derive(Debug)]
96pub(super) struct Finalized {
97    pub(super) inner: Update<Block, Exact>,
98}
99
100impl From<Finalized> for Message {
101    fn from(value: Finalized) -> Self {
102        Self::Finalized(value.into())
103    }
104}
105
106impl Automaton for Mailbox {
107    type Context = Context<Self::Digest, PublicKey>;
108
109    type Digest = Digest;
110
111    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
112        let (tx, rx) = oneshot::channel();
113        // TODO: panicking here really is not good. there's actually no requirement on `Self::Context` nor `Self::Digest` to fulfill
114        // any invariants, so we could just turn them into `Result<Context, Error>` and be happy.
115        self.inner
116            .send(
117                Genesis {
118                    epoch,
119                    response: tx,
120                }
121                .into(),
122            )
123            .await
124            .expect("application is present and ready to receive genesis");
125        rx.await
126            .expect("application returns the digest of the genesis")
127    }
128
129    async fn propose(&mut self, context: Self::Context) -> oneshot::Receiver<Self::Digest> {
130        // TODO: panicking here really is not good. there's actually no requirement on `Self::Context` nor `Self::Digest` to fulfill
131        // any invariants, so we could just turn them into `Result<Context, Error>` and be happy.
132        //
133        // XXX: comment taken from alto - what does this mean? is this relevant to us?
134        // > If we linked payloads to their parent, we would verify
135        // > the parent included in the payload matches the provided `Context`.
136        let (tx, rx) = oneshot::channel();
137        self.inner
138            .send(
139                Propose {
140                    parent: context.parent,
141                    response: tx,
142                    round: context.round,
143                }
144                .into(),
145            )
146            .await
147            .expect("application is present and ready to receive proposals");
148        rx
149    }
150
151    async fn verify(
152        &mut self,
153        context: Self::Context,
154        payload: Self::Digest,
155    ) -> oneshot::Receiver<bool> {
156        // TODO: panicking here really is not good. there's actually no requirement on `Self::Context` nor `Self::Digest` to fulfill
157        // any invariants, so we could just turn them into `Result<Context, Error>` and be happy.
158        //
159        // XXX: comment taken from alto - what does this mean? is this relevant to us?
160        // > If we linked payloads to their parent, we would verify
161        // > the parent included in the payload matches the provided `Context`.
162        let (tx, rx) = oneshot::channel();
163        self.inner
164            .send(
165                Verify {
166                    parent: context.parent,
167                    payload,
168                    proposer: context.leader,
169                    round: context.round,
170                    response: tx,
171                }
172                .into(),
173            )
174            .await
175            .expect("application is present and ready to receive verify requests");
176        rx
177    }
178}
179
180impl Relay for Mailbox {
181    type Digest = Digest;
182
183    async fn broadcast(&mut self, digest: Self::Digest) {
184        // TODO: panicking here is really not necessary. Just log at the ERROR or WARN levels instead?
185        self.inner
186            .send(Broadcast { payload: digest }.into())
187            .await
188            .expect("application is present and ready to receive broadcasts");
189    }
190}
191
192impl Reporter for Mailbox {
193    type Activity = Update<Block>;
194
195    async fn report(&mut self, update: Self::Activity) {
196        // TODO: panicking here is really not necessary. Just log at the ERROR or WARN levels instead?
197        self.inner
198            .send(Finalized { inner: update }.into())
199            .await
200            .expect("application is present and ready to receive broadcasts");
201    }
202}