Skip to main content

tempo_commonware_node/epoch/manager/
ingress.rs

1use commonware_consensus::{Reporter, marshal::Update, types::Epoch};
2use commonware_cryptography::{
3    bls12381::primitives::{group::Share, sharing::Sharing, variant::MinSig},
4    ed25519::PublicKey,
5};
6use commonware_utils::ordered;
7use eyre::WrapErr as _;
8use futures::channel::mpsc;
9use tracing::{Span, error};
10
11use crate::consensus::block::Block;
12
13#[derive(Clone, Debug)]
14pub(crate) struct Mailbox {
15    inner: mpsc::UnboundedSender<Message>,
16}
17
18impl Mailbox {
19    pub(super) fn new(inner: mpsc::UnboundedSender<Message>) -> Self {
20        Self { inner }
21    }
22
23    pub(crate) fn enter(
24        &mut self,
25        epoch: Epoch,
26        public: Sharing<MinSig>,
27        share: Option<Share>,
28        participants: ordered::Set<PublicKey>,
29    ) -> eyre::Result<()> {
30        self.inner
31            .unbounded_send(Message::in_current_span(EpochTransition {
32                epoch,
33                public,
34                share,
35                participants,
36            }))
37            .wrap_err("epoch manager no longer running")
38    }
39
40    pub(crate) fn exit(&mut self, epoch: Epoch) -> eyre::Result<()> {
41        self.inner
42            .unbounded_send(Message::in_current_span(Exit { epoch }))
43            .wrap_err("epoch manager no longer running")
44    }
45}
46
47#[derive(Debug)]
48pub(super) struct Message {
49    pub(super) cause: Span,
50    pub(super) content: Content,
51}
52
53impl Message {
54    fn in_current_span(activity: impl Into<Content>) -> Self {
55        Self {
56            cause: Span::current(),
57            content: activity.into(),
58        }
59    }
60}
61
62#[derive(Debug)]
63pub(super) enum Content {
64    Enter(EpochTransition),
65    Exit(Exit),
66    Update(Box<Update<Block>>),
67}
68
69impl From<EpochTransition> for Content {
70    fn from(value: EpochTransition) -> Self {
71        Self::Enter(value)
72    }
73}
74
75impl From<Exit> for Content {
76    fn from(value: Exit) -> Self {
77        Self::Exit(value)
78    }
79}
80
81impl From<Update<Block>> for Content {
82    fn from(value: Update<Block>) -> Self {
83        Self::Update(Box::new(value))
84    }
85}
86
87#[derive(Debug)]
88pub(super) struct EpochTransition {
89    pub(super) epoch: Epoch,
90    pub(super) public: Sharing<MinSig>,
91    pub(super) share: Option<Share>,
92    pub(super) participants: ordered::Set<PublicKey>,
93}
94
95#[derive(Debug)]
96pub(super) struct Exit {
97    pub(super) epoch: Epoch,
98}
99
100impl Reporter for Mailbox {
101    type Activity = Update<Block>;
102
103    async fn report(&mut self, activity: Self::Activity) {
104        if self
105            .inner
106            .unbounded_send(Message::in_current_span(activity))
107            .is_err()
108        {
109            error!(
110                "failed sending finalization activity to epoch manager because \
111                it is no longer running"
112            );
113        }
114    }
115}