Skip to main content

tempo_consensus/consensus/application/
ingress.rs

1use commonware_consensus::{
2    Automaton, CertifiableAutomaton, Relay,
3    simplex::{Plan, types::Context},
4    types::{Epoch, Round, View},
5};
6
7use commonware_cryptography::ed25519::PublicKey;
8use commonware_utils::channel::oneshot;
9use futures::{SinkExt as _, channel::mpsc};
10use std::time::Instant;
11
12use crate::consensus::Digest;
13
14#[derive(Clone)]
15pub(crate) struct Mailbox {
16    inner: mpsc::Sender<Message>,
17}
18
19impl Mailbox {
20    pub(super) fn from_sender(inner: mpsc::Sender<Message>) -> Self {
21        Self { inner }
22    }
23}
24
25/// Messages forwarded from consensus to application.
26// TODO: add trace spans into all of these messages.
27pub(super) enum Message {
28    Broadcast(Box<Broadcast>),
29    Genesis(Genesis),
30    Propose(Box<Propose>),
31    Verify(Box<Verify>),
32}
33
34pub(super) struct Genesis {
35    pub(super) epoch: Epoch,
36    pub(super) response: oneshot::Sender<Digest>,
37}
38
39impl From<Genesis> for Message {
40    fn from(value: Genesis) -> Self {
41        Self::Genesis(value)
42    }
43}
44
45pub(super) struct Propose {
46    pub(super) parent: (View, Digest),
47    pub(super) response: oneshot::Sender<Digest>,
48    pub(super) round: Round,
49    pub(super) leader: PublicKey,
50    pub(super) started_at: Instant,
51}
52
53impl From<Propose> for Message {
54    fn from(value: Propose) -> Self {
55        Self::Propose(Box::new(value))
56    }
57}
58
59pub(super) struct Broadcast {
60    pub(super) digest: Digest,
61    pub(super) plan: Plan<PublicKey>,
62}
63
64impl From<Broadcast> for Message {
65    fn from(value: Broadcast) -> Self {
66        Self::Broadcast(Box::new(value))
67    }
68}
69
70pub(super) struct Verify {
71    pub(super) parent: (View, Digest),
72    pub(super) payload: Digest,
73    pub(super) proposer: PublicKey,
74    pub(super) response: oneshot::Sender<bool>,
75    pub(super) round: Round,
76}
77
78impl From<Verify> for Message {
79    fn from(value: Verify) -> Self {
80        Self::Verify(Box::new(value))
81    }
82}
83
84impl Automaton for Mailbox {
85    type Context = Context<Self::Digest, PublicKey>;
86
87    type Digest = Digest;
88
89    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
90        let (tx, rx) = oneshot::channel();
91        // XXX: Cannot propagate the error upstream because of the trait def.
92        // But if the actor no longer responds the application is dead.
93        self.inner
94            .send(
95                Genesis {
96                    epoch,
97                    response: tx,
98                }
99                .into(),
100            )
101            .await
102            .expect("application is present and ready to receive genesis");
103        rx.await
104            .expect("application returns the digest of the genesis")
105    }
106
107    async fn propose(&mut self, context: Self::Context) -> oneshot::Receiver<Self::Digest> {
108        // XXX: Cannot propagate the error upstream because of the trait def.
109        // But if the actor no longer responds the application is dead.
110        let (tx, rx) = oneshot::channel();
111        self.inner
112            .send(
113                Propose {
114                    parent: context.parent,
115                    response: tx,
116                    round: context.round,
117                    leader: context.leader,
118                    started_at: Instant::now(),
119                }
120                .into(),
121            )
122            .await
123            .expect("application is present and ready to receive proposals");
124        rx
125    }
126
127    async fn verify(
128        &mut self,
129        context: Self::Context,
130        payload: Self::Digest,
131    ) -> oneshot::Receiver<bool> {
132        // XXX: Cannot propagate the error upstream because of the trait def.
133        // But if the actor no longer responds the application is dead.
134        let (tx, rx) = oneshot::channel();
135        self.inner
136            .send(
137                Verify {
138                    parent: context.parent,
139                    payload,
140                    proposer: context.leader,
141                    round: context.round,
142                    response: tx,
143                }
144                .into(),
145            )
146            .await
147            .expect("application is present and ready to receive verify requests");
148        rx
149    }
150}
151
152// TODO: figure out if this can be useful for tempo. The original PR implementing
153// this trait:
154// https://github.com/commonwarexyz/monorepo/pull/2565
155// Associated issue:
156// https://github.com/commonwarexyz/monorepo/issues/1767
157impl CertifiableAutomaton for Mailbox {
158    // NOTE: uses the default impl for CertifiableAutomaton which always
159    // returns true.
160}
161
162impl Relay for Mailbox {
163    type Digest = Digest;
164    type PublicKey = PublicKey;
165    type Plan = commonware_consensus::simplex::Plan<PublicKey>;
166
167    async fn broadcast(&mut self, digest: Self::Digest, plan: Self::Plan) {
168        // TODO: panicking here is really not necessary. Just log at the ERROR or WARN levels instead?
169        self.inner
170            .send(Broadcast { digest, plan }.into())
171            .await
172            .expect("application is present and ready to receive broadcasts");
173    }
174}