Skip to main content

tempo_commonware_node/executor/
ingress.rs

1use commonware_consensus::{Reporter, marshal::Update, types::Height};
2use eyre::{WrapErr as _, eyre};
3use futures::{
4    SinkExt as _,
5    channel::{mpsc, oneshot},
6};
7use tracing::Span;
8
9use crate::consensus::{Digest, block::Block};
10
11#[derive(Clone, Debug)]
12pub(crate) struct Mailbox {
13    pub(super) inner: mpsc::UnboundedSender<Message>,
14}
15
16impl Mailbox {
17    /// Requests the agent to update the head of the canonical chain to `digest`.
18    pub(crate) fn canonicalize_head(
19        &self,
20        height: Height,
21        digest: Digest,
22    ) -> eyre::Result<oneshot::Receiver<()>> {
23        let (tx, rx) = oneshot::channel();
24        self.inner
25            .unbounded_send(Message::in_current_span(CanonicalizeHead {
26                height,
27                digest,
28                ack: tx,
29            }))
30            .wrap_err("failed sending canonicalize request to agent, this means it exited")?;
31
32        Ok(rx)
33    }
34
35    pub(crate) async fn subscribe_finalized(&self, height: Height) -> eyre::Result<()> {
36        let (response, rx) = oneshot::channel();
37        self.inner
38            .unbounded_send(Message::in_current_span(SubscribeFinalized {
39                height,
40                response,
41            }))
42            .wrap_err(
43                "failed sending subscribe finalized request to agent, this means it exited",
44            )?;
45        rx.await.map_err(|_| eyre!("actor dropped response"))
46    }
47}
48
49#[derive(Debug)]
50pub(super) struct Message {
51    pub(super) cause: Span,
52    pub(super) command: Command,
53}
54
55impl Message {
56    fn in_current_span(command: impl Into<Command>) -> Self {
57        Self {
58            cause: Span::current(),
59            command: command.into(),
60        }
61    }
62}
63
64#[derive(Debug)]
65pub(super) enum Command {
66    /// Requests the agent to set the head of the canonical chain to `digest`.
67    CanonicalizeHead(CanonicalizeHead),
68    /// Requests the agent to forward a finalization event to the execution layer.
69    Finalize(Box<Update<Block>>),
70    /// Returns once the executor actor has finalized the block at `height`.
71    SubscribeFinalized(SubscribeFinalized),
72}
73
74#[derive(Debug)]
75pub(super) struct CanonicalizeHead {
76    pub(super) height: Height,
77    pub(super) digest: Digest,
78    pub(super) ack: oneshot::Sender<()>,
79}
80
81#[derive(Debug)]
82pub(super) struct SubscribeFinalized {
83    pub(super) height: Height,
84    pub(super) response: oneshot::Sender<()>,
85}
86
87impl From<CanonicalizeHead> for Command {
88    fn from(value: CanonicalizeHead) -> Self {
89        Self::CanonicalizeHead(value)
90    }
91}
92
93impl From<Update<Block>> for Command {
94    fn from(value: Update<Block>) -> Self {
95        Self::Finalize(value.into())
96    }
97}
98
99impl From<SubscribeFinalized> for Command {
100    fn from(value: SubscribeFinalized) -> Self {
101        Self::SubscribeFinalized(value)
102    }
103}
104
105impl Reporter for Mailbox {
106    type Activity = Update<Block>;
107
108    async fn report(&mut self, update: Self::Activity) {
109        self.inner
110            .send(Message::in_current_span(update))
111            .await
112            .expect("actor is present and ready to receive broadcasts");
113    }
114}