tempo_commonware_node/epoch/manager/
ingress.rs1use commonware_consensus::{Reporter, marshal::Update, types::Epoch};
2use commonware_cryptography::{
3 bls12381::primitives::{group::Share, sharing::Sharing, variant::MinSig},
4 ed25519::PublicKey,
5};
6use commonware_utils::ordered;
7use eyre::WrapErr as _;
8use futures::channel::mpsc;
9use tracing::{Span, error};
10
11use crate::consensus::block::Block;
12
13#[derive(Clone, Debug)]
14pub(crate) struct Mailbox {
15 inner: mpsc::UnboundedSender<Message>,
16}
17
18impl Mailbox {
19 pub(super) fn new(inner: mpsc::UnboundedSender<Message>) -> Self {
20 Self { inner }
21 }
22
23 pub(crate) fn enter(
24 &mut self,
25 epoch: Epoch,
26 public: Sharing<MinSig>,
27 share: Option<Share>,
28 participants: ordered::Set<PublicKey>,
29 ) -> eyre::Result<()> {
30 self.inner
31 .unbounded_send(Message::in_current_span(EpochTransition {
32 epoch,
33 public,
34 share,
35 participants,
36 }))
37 .wrap_err("epoch manager no longer running")
38 }
39
40 pub(crate) fn exit(&mut self, epoch: Epoch) -> eyre::Result<()> {
41 self.inner
42 .unbounded_send(Message::in_current_span(Exit { epoch }))
43 .wrap_err("epoch manager no longer running")
44 }
45}
46
47#[derive(Debug)]
48pub(super) struct Message {
49 pub(super) cause: Span,
50 pub(super) content: Content,
51}
52
53impl Message {
54 fn in_current_span(activity: impl Into<Content>) -> Self {
55 Self {
56 cause: Span::current(),
57 content: activity.into(),
58 }
59 }
60}
61
62#[derive(Debug)]
63pub(super) enum Content {
64 Enter(EpochTransition),
65 Exit(Exit),
66 Update(Box<Update<Block>>),
67}
68
69impl From<EpochTransition> for Content {
70 fn from(value: EpochTransition) -> Self {
71 Self::Enter(value)
72 }
73}
74
75impl From<Exit> for Content {
76 fn from(value: Exit) -> Self {
77 Self::Exit(value)
78 }
79}
80
81impl From<Update<Block>> for Content {
82 fn from(value: Update<Block>) -> Self {
83 Self::Update(Box::new(value))
84 }
85}
86
87#[derive(Debug)]
88pub(super) struct EpochTransition {
89 pub(super) epoch: Epoch,
90 pub(super) public: Sharing<MinSig>,
91 pub(super) share: Option<Share>,
92 pub(super) participants: ordered::Set<PublicKey>,
93}
94
95#[derive(Debug)]
96pub(super) struct Exit {
97 pub(super) epoch: Epoch,
98}
99
100impl Reporter for Mailbox {
101 type Activity = Update<Block>;
102
103 async fn report(&mut self, activity: Self::Activity) {
104 if self
105 .inner
106 .unbounded_send(Message::in_current_span(activity))
107 .is_err()
108 {
109 error!(
110 "failed sending finalization activity to epoch manager because \
111 it is no longer running"
112 );
113 }
114 }
115}