Skip to main content

tempo_commonware_node/executor/
ingress.rs

1use alloy_rpc_types_engine::PayloadId;
2use commonware_consensus::{Reporter, marshal::Update, types::Height};
3use eyre::WrapErr as _;
4use futures::{
5    SinkExt as _,
6    channel::{mpsc, oneshot},
7};
8use tempo_payload_types::TempoPayloadAttributes;
9use tracing::Span;
10
11use crate::consensus::{Digest, block::Block};
12
13#[derive(Clone, Debug)]
14pub(crate) struct Mailbox {
15    pub(super) inner: mpsc::UnboundedSender<Message>,
16}
17
18impl Mailbox {
19    /// Requests the agent to update the head of the canonical chain to `digest`.
20    pub(crate) async fn canonicalize_head(
21        &self,
22        height: Height,
23        digest: Digest,
24    ) -> eyre::Result<()> {
25        let (response, rx) = oneshot::channel();
26        self.inner
27            .unbounded_send(Message::in_current_span(CanonicalizeHead {
28                height,
29                digest,
30                response,
31            }))
32            .wrap_err("failed sending canonicalize request to agent, this means it exited")?;
33        rx.await
34            .wrap_err("executor dropped response")
35            .and_then(|res| res)
36    }
37
38    /// Canonicalizes the given head and requests a new payload to be built.
39    pub(crate) fn canonicalize_and_build(
40        &self,
41        height: Height,
42        digest: Digest,
43        attributes: TempoPayloadAttributes,
44    ) -> eyre::Result<oneshot::Receiver<eyre::Result<PayloadId>>> {
45        let (response, rx) = oneshot::channel();
46        self.inner
47            .unbounded_send(Message::in_current_span(CanonicalizeAndBuild {
48                height,
49                digest,
50                attributes: Box::new(attributes),
51                response,
52            }))
53            .wrap_err(
54                "failed sending canonicalize and build request to agent, this means it exited",
55            )?;
56        Ok(rx)
57    }
58}
59
60#[derive(Debug)]
61pub(super) struct Message {
62    pub(super) cause: Span,
63    pub(super) command: Command,
64}
65
66impl Message {
67    fn in_current_span(command: impl Into<Command>) -> Self {
68        Self {
69            cause: Span::current(),
70            command: command.into(),
71        }
72    }
73}
74
75#[derive(Debug)]
76pub(super) enum Command {
77    /// Requests the agent to set the head of the canonical chain to `digest`.
78    CanonicalizeHead(CanonicalizeHead),
79    /// Requests the agent to canonicalize the head and build a new payload.
80    CanonicalizeAndBuild(CanonicalizeAndBuild),
81    /// Requests the agent to forward a finalization event to the execution layer.
82    Finalize(Box<Update<Block>>),
83}
84
85#[derive(Debug)]
86pub(super) struct CanonicalizeHead {
87    pub(super) height: Height,
88    pub(super) digest: Digest,
89    pub(super) response: oneshot::Sender<eyre::Result<()>>,
90}
91
92#[derive(Debug)]
93pub(super) struct CanonicalizeAndBuild {
94    pub(super) height: Height,
95    pub(super) digest: Digest,
96    pub(super) attributes: Box<TempoPayloadAttributes>,
97    pub(super) response: oneshot::Sender<eyre::Result<PayloadId>>,
98}
99
100impl From<CanonicalizeHead> for Command {
101    fn from(value: CanonicalizeHead) -> Self {
102        Self::CanonicalizeHead(value)
103    }
104}
105
106impl From<CanonicalizeAndBuild> for Command {
107    fn from(value: CanonicalizeAndBuild) -> Self {
108        Self::CanonicalizeAndBuild(value)
109    }
110}
111
112impl From<Update<Block>> for Command {
113    fn from(value: Update<Block>) -> Self {
114        Self::Finalize(value.into())
115    }
116}
117
118impl Reporter for Mailbox {
119    type Activity = Update<Block>;
120
121    async fn report(&mut self, update: Self::Activity) {
122        self.inner
123            .send(Message::in_current_span(update))
124            .await
125            .expect("actor is present and ready to receive broadcasts");
126    }
127}