tempo_commonware_node/executor/
ingress.rs1use commonware_consensus::{Reporter, marshal::Update, types::Height};
2use eyre::{WrapErr as _, eyre};
3use futures::{
4 SinkExt as _,
5 channel::{mpsc, oneshot},
6};
7use tracing::Span;
8
9use crate::consensus::{Digest, block::Block};
10
11#[derive(Clone, Debug)]
12pub(crate) struct Mailbox {
13 pub(super) inner: mpsc::UnboundedSender<Message>,
14}
15
16impl Mailbox {
17 pub(crate) fn canonicalize_head(
19 &self,
20 height: Height,
21 digest: Digest,
22 ) -> eyre::Result<oneshot::Receiver<()>> {
23 let (tx, rx) = oneshot::channel();
24 self.inner
25 .unbounded_send(Message::in_current_span(CanonicalizeHead {
26 height,
27 digest,
28 ack: tx,
29 }))
30 .wrap_err("failed sending canonicalize request to agent, this means it exited")?;
31
32 Ok(rx)
33 }
34
35 pub(crate) async fn subscribe_finalized(&self, height: Height) -> eyre::Result<()> {
36 let (response, rx) = oneshot::channel();
37 self.inner
38 .unbounded_send(Message::in_current_span(SubscribeFinalized {
39 height,
40 response,
41 }))
42 .wrap_err(
43 "failed sending subscribe finalized request to agent, this means it exited",
44 )?;
45 rx.await.map_err(|_| eyre!("actor dropped response"))
46 }
47}
48
49#[derive(Debug)]
50pub(super) struct Message {
51 pub(super) cause: Span,
52 pub(super) command: Command,
53}
54
55impl Message {
56 fn in_current_span(command: impl Into<Command>) -> Self {
57 Self {
58 cause: Span::current(),
59 command: command.into(),
60 }
61 }
62}
63
64#[derive(Debug)]
65pub(super) enum Command {
66 CanonicalizeHead(CanonicalizeHead),
68 Finalize(Box<Update<Block>>),
70 SubscribeFinalized(SubscribeFinalized),
72}
73
74#[derive(Debug)]
75pub(super) struct CanonicalizeHead {
76 pub(super) height: Height,
77 pub(super) digest: Digest,
78 pub(super) ack: oneshot::Sender<()>,
79}
80
81#[derive(Debug)]
82pub(super) struct SubscribeFinalized {
83 pub(super) height: Height,
84 pub(super) response: oneshot::Sender<()>,
85}
86
87impl From<CanonicalizeHead> for Command {
88 fn from(value: CanonicalizeHead) -> Self {
89 Self::CanonicalizeHead(value)
90 }
91}
92
93impl From<Update<Block>> for Command {
94 fn from(value: Update<Block>) -> Self {
95 Self::Finalize(value.into())
96 }
97}
98
99impl From<SubscribeFinalized> for Command {
100 fn from(value: SubscribeFinalized) -> Self {
101 Self::SubscribeFinalized(value)
102 }
103}
104
105impl Reporter for Mailbox {
106 type Activity = Update<Block>;
107
108 async fn report(&mut self, update: Self::Activity) {
109 self.inner
110 .send(Message::in_current_span(update))
111 .await
112 .expect("actor is present and ready to receive broadcasts");
113 }
114}