Skip to main content

tempo_commonware_node/consensus/application/
ingress.rs

1use commonware_consensus::{
2    Automaton, CertifiableAutomaton, Relay,
3    simplex::{Plan, types::Context},
4    types::{Epoch, Round, View},
5};
6
7use commonware_cryptography::ed25519::PublicKey;
8use commonware_utils::channel::oneshot;
9use futures::{SinkExt as _, channel::mpsc};
10
11use crate::consensus::Digest;
12
13#[derive(Clone)]
14pub(crate) struct Mailbox {
15    inner: mpsc::Sender<Message>,
16}
17
18impl Mailbox {
19    pub(super) fn from_sender(inner: mpsc::Sender<Message>) -> Self {
20        Self { inner }
21    }
22}
23
24/// Messages forwarded from consensus to application.
25// TODO: add trace spans into all of these messages.
26pub(super) enum Message {
27    Broadcast(Box<Broadcast>),
28    Genesis(Genesis),
29    Propose(Box<Propose>),
30    Verify(Box<Verify>),
31}
32
33pub(super) struct Genesis {
34    pub(super) epoch: Epoch,
35    pub(super) response: oneshot::Sender<Digest>,
36}
37
38impl From<Genesis> for Message {
39    fn from(value: Genesis) -> Self {
40        Self::Genesis(value)
41    }
42}
43
44pub(super) struct Propose {
45    pub(super) parent: (View, Digest),
46    pub(super) response: oneshot::Sender<Digest>,
47    pub(super) round: Round,
48    pub(super) leader: PublicKey,
49}
50
51impl From<Propose> for Message {
52    fn from(value: Propose) -> Self {
53        Self::Propose(Box::new(value))
54    }
55}
56
57pub(super) struct Broadcast {
58    pub(super) digest: Digest,
59    pub(super) plan: Plan<PublicKey>,
60}
61
62impl From<Broadcast> for Message {
63    fn from(value: Broadcast) -> Self {
64        Self::Broadcast(Box::new(value))
65    }
66}
67
68pub(super) struct Verify {
69    pub(super) parent: (View, Digest),
70    pub(super) payload: Digest,
71    pub(super) proposer: PublicKey,
72    pub(super) response: oneshot::Sender<bool>,
73    pub(super) round: Round,
74}
75
76impl From<Verify> for Message {
77    fn from(value: Verify) -> Self {
78        Self::Verify(Box::new(value))
79    }
80}
81
82impl Automaton for Mailbox {
83    type Context = Context<Self::Digest, PublicKey>;
84
85    type Digest = Digest;
86
87    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
88        let (tx, rx) = oneshot::channel();
89        // XXX: Cannot propagate the error upstream because of the trait def.
90        // But if the actor no longer responds the application is dead.
91        self.inner
92            .send(
93                Genesis {
94                    epoch,
95                    response: tx,
96                }
97                .into(),
98            )
99            .await
100            .expect("application is present and ready to receive genesis");
101        rx.await
102            .expect("application returns the digest of the genesis")
103    }
104
105    async fn propose(&mut self, context: Self::Context) -> oneshot::Receiver<Self::Digest> {
106        // XXX: Cannot propagate the error upstream because of the trait def.
107        // But if the actor no longer responds the application is dead.
108        let (tx, rx) = oneshot::channel();
109        self.inner
110            .send(
111                Propose {
112                    parent: context.parent,
113                    response: tx,
114                    round: context.round,
115                    leader: context.leader,
116                }
117                .into(),
118            )
119            .await
120            .expect("application is present and ready to receive proposals");
121        rx
122    }
123
124    async fn verify(
125        &mut self,
126        context: Self::Context,
127        payload: Self::Digest,
128    ) -> oneshot::Receiver<bool> {
129        // XXX: Cannot propagate the error upstream because of the trait def.
130        // But if the actor no longer responds the application is dead.
131        let (tx, rx) = oneshot::channel();
132        self.inner
133            .send(
134                Verify {
135                    parent: context.parent,
136                    payload,
137                    proposer: context.leader,
138                    round: context.round,
139                    response: tx,
140                }
141                .into(),
142            )
143            .await
144            .expect("application is present and ready to receive verify requests");
145        rx
146    }
147}
148
149// TODO: figure out if this can be useful for tempo. The original PR implementing
150// this trait:
151// https://github.com/commonwarexyz/monorepo/pull/2565
152// Associated issue:
153// https://github.com/commonwarexyz/monorepo/issues/1767
154impl CertifiableAutomaton for Mailbox {
155    // NOTE: uses the default impl for CertifiableAutomaton which always
156    // returns true.
157}
158
159impl Relay for Mailbox {
160    type Digest = Digest;
161    type PublicKey = PublicKey;
162    type Plan = commonware_consensus::simplex::Plan<PublicKey>;
163
164    async fn broadcast(&mut self, digest: Self::Digest, plan: Self::Plan) {
165        // TODO: panicking here is really not necessary. Just log at the ERROR or WARN levels instead?
166        self.inner
167            .send(Broadcast { digest, plan }.into())
168            .await
169            .expect("application is present and ready to receive broadcasts");
170    }
171}