tempo_commonware_node/executor/
ingress.rs1use alloy_rpc_types_engine::PayloadId;
2use commonware_consensus::{Reporter, marshal::Update, types::Height};
3use eyre::WrapErr as _;
4use futures::{
5 SinkExt as _,
6 channel::{mpsc, oneshot},
7};
8use tempo_payload_types::TempoPayloadAttributes;
9use tracing::Span;
10
11use crate::consensus::{Digest, block::Block};
12
13#[derive(Clone, Debug)]
14pub(crate) struct Mailbox {
15 pub(super) inner: mpsc::UnboundedSender<Message>,
16}
17
18impl Mailbox {
19 pub(crate) async fn canonicalize_head(
21 &self,
22 height: Height,
23 digest: Digest,
24 ) -> eyre::Result<()> {
25 let (response, rx) = oneshot::channel();
26 self.inner
27 .unbounded_send(Message::in_current_span(CanonicalizeHead {
28 height,
29 digest,
30 response,
31 }))
32 .wrap_err("failed sending canonicalize request to agent, this means it exited")?;
33 rx.await
34 .wrap_err("executor dropped response")
35 .and_then(|res| res)
36 }
37
38 pub(crate) fn canonicalize_and_build(
40 &self,
41 height: Height,
42 digest: Digest,
43 attributes: TempoPayloadAttributes,
44 ) -> eyre::Result<oneshot::Receiver<eyre::Result<PayloadId>>> {
45 let (response, rx) = oneshot::channel();
46 self.inner
47 .unbounded_send(Message::in_current_span(CanonicalizeAndBuild {
48 height,
49 digest,
50 attributes: Box::new(attributes),
51 response,
52 }))
53 .wrap_err(
54 "failed sending canonicalize and build request to agent, this means it exited",
55 )?;
56 Ok(rx)
57 }
58}
59
60#[derive(Debug)]
61pub(super) struct Message {
62 pub(super) cause: Span,
63 pub(super) command: Command,
64}
65
66impl Message {
67 fn in_current_span(command: impl Into<Command>) -> Self {
68 Self {
69 cause: Span::current(),
70 command: command.into(),
71 }
72 }
73}
74
75#[derive(Debug)]
76pub(super) enum Command {
77 CanonicalizeHead(CanonicalizeHead),
79 CanonicalizeAndBuild(CanonicalizeAndBuild),
81 Finalize(Box<Update<Block>>),
83}
84
85#[derive(Debug)]
86pub(super) struct CanonicalizeHead {
87 pub(super) height: Height,
88 pub(super) digest: Digest,
89 pub(super) response: oneshot::Sender<eyre::Result<()>>,
90}
91
92#[derive(Debug)]
93pub(super) struct CanonicalizeAndBuild {
94 pub(super) height: Height,
95 pub(super) digest: Digest,
96 pub(super) attributes: Box<TempoPayloadAttributes>,
97 pub(super) response: oneshot::Sender<eyre::Result<PayloadId>>,
98}
99
100impl From<CanonicalizeHead> for Command {
101 fn from(value: CanonicalizeHead) -> Self {
102 Self::CanonicalizeHead(value)
103 }
104}
105
106impl From<CanonicalizeAndBuild> for Command {
107 fn from(value: CanonicalizeAndBuild) -> Self {
108 Self::CanonicalizeAndBuild(value)
109 }
110}
111
112impl From<Update<Block>> for Command {
113 fn from(value: Update<Block>) -> Self {
114 Self::Finalize(value.into())
115 }
116}
117
118impl Reporter for Mailbox {
119 type Activity = Update<Block>;
120
121 async fn report(&mut self, update: Self::Activity) {
122 self.inner
123 .send(Message::in_current_span(update))
124 .await
125 .expect("actor is present and ready to receive broadcasts");
126 }
127}