tempo_consensus/executor/
ingress.rs1use commonware_consensus::{Reporter, marshal::Update, types::Height};
2use eyre::WrapErr as _;
3use futures::{
4 SinkExt as _,
5 channel::{mpsc, oneshot},
6};
7use tempo_payload_types::{TempoBuiltPayload, TempoPayloadAttributes};
8use tracing::Span;
9
10use crate::consensus::{Digest, block::Block};
11
12#[derive(Clone, Debug)]
13pub(crate) struct Mailbox {
14 pub(super) inner: mpsc::UnboundedSender<Message>,
15}
16
17impl Mailbox {
18 pub(crate) async fn canonicalize_head(
20 &self,
21 height: Height,
22 digest: Digest,
23 ) -> eyre::Result<()> {
24 let (response, rx) = oneshot::channel();
25 self.inner
26 .unbounded_send(Message::in_current_span(CanonicalizeHead {
27 height,
28 digest,
29 response,
30 }))
31 .wrap_err("failed sending canonicalize request to agent, this means it exited")?;
32 rx.await.wrap_err(
33 "executor dropped the response channel: the forkchoice update \
34 failed (the executor logs the cause) or the executor shut down",
35 )
36 }
37
38 pub(crate) fn canonicalize_and_build(
48 &self,
49 height: Height,
50 digest: Digest,
51 attributes: TempoPayloadAttributes,
52 ) -> eyre::Result<oneshot::Receiver<TempoBuiltPayload>> {
53 let (response, rx) = oneshot::channel();
54 self.inner
55 .unbounded_send(Message::in_current_span(CanonicalizeAndBuild {
56 height,
57 digest,
58 attributes: Box::new(attributes),
59 response,
60 }))
61 .wrap_err(
62 "failed sending canonicalize and build request to agent, this means it exited",
63 )?;
64 Ok(rx)
65 }
66}
67
68#[derive(Debug)]
69pub(super) struct Message {
70 pub(super) cause: Span,
71 pub(super) command: Command,
72}
73
74impl Message {
75 fn in_current_span(command: impl Into<Command>) -> Self {
76 Self {
77 cause: Span::current(),
78 command: command.into(),
79 }
80 }
81}
82
83#[derive(Debug)]
84pub(super) enum Command {
85 CanonicalizeHead(CanonicalizeHead),
87 CanonicalizeAndBuild(CanonicalizeAndBuild),
89 Finalize(Box<Update<Block>>),
91}
92
93#[derive(Debug)]
94pub(super) struct CanonicalizeHead {
95 pub(super) height: Height,
96 pub(super) digest: Digest,
97 pub(super) response: oneshot::Sender<()>,
98}
99
100#[derive(Debug)]
101pub(super) struct CanonicalizeAndBuild {
102 pub(super) height: Height,
103 pub(super) digest: Digest,
104 pub(super) attributes: Box<TempoPayloadAttributes>,
105 pub(super) response: oneshot::Sender<TempoBuiltPayload>,
106}
107
108impl From<CanonicalizeHead> for Command {
109 fn from(value: CanonicalizeHead) -> Self {
110 Self::CanonicalizeHead(value)
111 }
112}
113
114impl From<CanonicalizeAndBuild> for Command {
115 fn from(value: CanonicalizeAndBuild) -> Self {
116 Self::CanonicalizeAndBuild(value)
117 }
118}
119
120impl From<Update<Block>> for Command {
121 fn from(value: Update<Block>) -> Self {
122 Self::Finalize(value.into())
123 }
124}
125
126impl Reporter for Mailbox {
127 type Activity = Update<Block>;
128
129 async fn report(&mut self, update: Self::Activity) {
130 self.inner
131 .send(Message::in_current_span(update))
132 .await
133 .expect("actor is present and ready to receive broadcasts");
134 }
135}