Skip to main content

tempo_consensus/executor/
ingress.rs

1use commonware_consensus::{Reporter, marshal::Update, types::Height};
2use eyre::WrapErr as _;
3use futures::{
4    SinkExt as _,
5    channel::{mpsc, oneshot},
6};
7use tempo_payload_types::{TempoBuiltPayload, TempoPayloadAttributes};
8use tracing::Span;
9
10use crate::consensus::{Digest, block::Block};
11
12#[derive(Clone, Debug)]
13pub(crate) struct Mailbox {
14    pub(super) inner: mpsc::UnboundedSender<Message>,
15}
16
17impl Mailbox {
18    /// Requests the agent to update the head of the canonical chain to `digest`.
19    pub(crate) async fn canonicalize_head(
20        &self,
21        height: Height,
22        digest: Digest,
23    ) -> eyre::Result<()> {
24        let (response, rx) = oneshot::channel();
25        self.inner
26            .unbounded_send(Message::in_current_span(CanonicalizeHead {
27                height,
28                digest,
29                response,
30            }))
31            .wrap_err("failed sending canonicalize request to agent, this means it exited")?;
32        rx.await.wrap_err(
33            "executor dropped the response channel: the forkchoice update \
34            failed (the executor logs the cause) or the executor shut down",
35        )
36    }
37
38    /// Canonicalizes the given head and requests a new payload to be built.
39    ///
40    /// The built payload is delivered on the returned channel once the
41    /// execution layer finishes constructing it. The receiver may be dropped
42    /// to signal that the payload is no longer wanted, whereupon the executor
43    /// will drop the payload job.
44    ///
45    /// Conversely, the executor dropping its sender means the build failed;
46    /// the executor logs the cause.
47    pub(crate) fn canonicalize_and_build(
48        &self,
49        height: Height,
50        digest: Digest,
51        attributes: TempoPayloadAttributes,
52    ) -> eyre::Result<oneshot::Receiver<TempoBuiltPayload>> {
53        let (response, rx) = oneshot::channel();
54        self.inner
55            .unbounded_send(Message::in_current_span(CanonicalizeAndBuild {
56                height,
57                digest,
58                attributes: Box::new(attributes),
59                response,
60            }))
61            .wrap_err(
62                "failed sending canonicalize and build request to agent, this means it exited",
63            )?;
64        Ok(rx)
65    }
66}
67
68#[derive(Debug)]
69pub(super) struct Message {
70    pub(super) cause: Span,
71    pub(super) command: Command,
72}
73
74impl Message {
75    fn in_current_span(command: impl Into<Command>) -> Self {
76        Self {
77            cause: Span::current(),
78            command: command.into(),
79        }
80    }
81}
82
83#[derive(Debug)]
84pub(super) enum Command {
85    /// Requests the agent to set the head of the canonical chain to `digest`.
86    CanonicalizeHead(CanonicalizeHead),
87    /// Requests the agent to canonicalize the head and build a new payload.
88    CanonicalizeAndBuild(CanonicalizeAndBuild),
89    /// Requests the agent to forward a finalization event to the execution layer.
90    Finalize(Box<Update<Block>>),
91}
92
93#[derive(Debug)]
94pub(super) struct CanonicalizeHead {
95    pub(super) height: Height,
96    pub(super) digest: Digest,
97    pub(super) response: oneshot::Sender<()>,
98}
99
100#[derive(Debug)]
101pub(super) struct CanonicalizeAndBuild {
102    pub(super) height: Height,
103    pub(super) digest: Digest,
104    pub(super) attributes: Box<TempoPayloadAttributes>,
105    pub(super) response: oneshot::Sender<TempoBuiltPayload>,
106}
107
108impl From<CanonicalizeHead> for Command {
109    fn from(value: CanonicalizeHead) -> Self {
110        Self::CanonicalizeHead(value)
111    }
112}
113
114impl From<CanonicalizeAndBuild> for Command {
115    fn from(value: CanonicalizeAndBuild) -> Self {
116        Self::CanonicalizeAndBuild(value)
117    }
118}
119
120impl From<Update<Block>> for Command {
121    fn from(value: Update<Block>) -> Self {
122        Self::Finalize(value.into())
123    }
124}
125
126impl Reporter for Mailbox {
127    type Activity = Update<Block>;
128
129    async fn report(&mut self, update: Self::Activity) {
130        self.inner
131            .send(Message::in_current_span(update))
132            .await
133            .expect("actor is present and ready to receive broadcasts");
134    }
135}