Skip to main content

tempo_commonware_node/consensus/application/
ingress.rs

1use commonware_consensus::{
2    Automaton, CertifiableAutomaton, Relay,
3    simplex::types::Context,
4    types::{Epoch, Round, View},
5};
6
7use commonware_cryptography::ed25519::PublicKey;
8use commonware_utils::channel::oneshot;
9use futures::{SinkExt as _, channel::mpsc};
10
11use crate::consensus::Digest;
12
13#[derive(Clone)]
14pub(crate) struct Mailbox {
15    inner: mpsc::Sender<Message>,
16}
17
18impl Mailbox {
19    pub(super) fn from_sender(inner: mpsc::Sender<Message>) -> Self {
20        Self { inner }
21    }
22}
23
24/// Messages forwarded from consensus to application.
25// TODO: add trace spans into all of these messages.
26pub(super) enum Message {
27    Broadcast(Broadcast),
28    Genesis(Genesis),
29    Propose(Propose),
30    Verify(Box<Verify>),
31}
32
33pub(super) struct Genesis {
34    pub(super) epoch: Epoch,
35    pub(super) response: oneshot::Sender<Digest>,
36}
37
38impl From<Genesis> for Message {
39    fn from(value: Genesis) -> Self {
40        Self::Genesis(value)
41    }
42}
43
44pub(super) struct Propose {
45    pub(super) parent: (View, Digest),
46    pub(super) response: oneshot::Sender<Digest>,
47    pub(super) round: Round,
48}
49
50impl From<Propose> for Message {
51    fn from(value: Propose) -> Self {
52        Self::Propose(value)
53    }
54}
55
56pub(super) struct Broadcast {
57    pub(super) payload: Digest,
58}
59
60impl From<Broadcast> for Message {
61    fn from(value: Broadcast) -> Self {
62        Self::Broadcast(value)
63    }
64}
65
66pub(super) struct Verify {
67    pub(super) parent: (View, Digest),
68    pub(super) payload: Digest,
69    pub(super) proposer: PublicKey,
70    pub(super) response: oneshot::Sender<bool>,
71    pub(super) round: Round,
72}
73
74impl From<Verify> for Message {
75    fn from(value: Verify) -> Self {
76        Self::Verify(Box::new(value))
77    }
78}
79
80impl Automaton for Mailbox {
81    type Context = Context<Self::Digest, PublicKey>;
82
83    type Digest = Digest;
84
85    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
86        let (tx, rx) = oneshot::channel();
87        // XXX: Cannot propagate the error upstream because of the trait def.
88        // But if the actor no longer responds the application is dead.
89        self.inner
90            .send(
91                Genesis {
92                    epoch,
93                    response: tx,
94                }
95                .into(),
96            )
97            .await
98            .expect("application is present and ready to receive genesis");
99        rx.await
100            .expect("application returns the digest of the genesis")
101    }
102
103    async fn propose(&mut self, context: Self::Context) -> oneshot::Receiver<Self::Digest> {
104        // XXX: Cannot propagate the error upstream because of the trait def.
105        // But if the actor no longer responds the application is dead.
106        let (tx, rx) = oneshot::channel();
107        self.inner
108            .send(
109                Propose {
110                    parent: context.parent,
111                    response: tx,
112                    round: context.round,
113                }
114                .into(),
115            )
116            .await
117            .expect("application is present and ready to receive proposals");
118        rx
119    }
120
121    async fn verify(
122        &mut self,
123        context: Self::Context,
124        payload: Self::Digest,
125    ) -> oneshot::Receiver<bool> {
126        // XXX: Cannot propagate the error upstream because of the trait def.
127        // But if the actor no longer responds the application is dead.
128        let (tx, rx) = oneshot::channel();
129        self.inner
130            .send(
131                Verify {
132                    parent: context.parent,
133                    payload,
134                    proposer: context.leader,
135                    round: context.round,
136                    response: tx,
137                }
138                .into(),
139            )
140            .await
141            .expect("application is present and ready to receive verify requests");
142        rx
143    }
144}
145
146// TODO: figure out if this can be useful for tempo. The original PR implementing
147// this trait:
148// https://github.com/commonwarexyz/monorepo/pull/2565
149// Associated issue:
150// https://github.com/commonwarexyz/monorepo/issues/1767
151impl CertifiableAutomaton for Mailbox {
152    // NOTE: uses the default impl for CertifiableAutomaton which always
153    // returns true.
154}
155
156impl Relay for Mailbox {
157    type Digest = Digest;
158
159    async fn broadcast(&mut self, digest: Self::Digest) {
160        // TODO: panicking here is really not necessary. Just log at the ERROR or WARN levels instead?
161        self.inner
162            .send(Broadcast { payload: digest }.into())
163            .await
164            .expect("application is present and ready to receive broadcasts");
165    }
166}