tempo_commonware_node/peer_manager/
ingress.rs1use commonware_consensus::{Reporter, marshal::Update};
2use commonware_p2p::{Address, AddressableManager, Provider};
3use commonware_utils::ordered::{Map, Set};
4use eyre::WrapErr as _;
5use futures::channel::{mpsc, oneshot};
6use tracing::{Span, error};
7
8type SubscribeReceiver =
9 commonware_utils::channel::mpsc::UnboundedReceiver<(u64, Set<PublicKey>, Set<PublicKey>)>;
10
11use commonware_cryptography::ed25519::PublicKey;
12
13use crate::consensus::block::Block;
14
15#[derive(Clone, Debug)]
16pub(crate) struct Mailbox {
17 inner: mpsc::UnboundedSender<MessageWithCause>,
18}
19
20impl Mailbox {
21 pub(super) fn new(inner: mpsc::UnboundedSender<MessageWithCause>) -> Self {
22 Self { inner }
23 }
24}
25
26pub(super) struct MessageWithCause {
27 pub(super) cause: Span,
28 pub(super) message: Message,
29}
30
31impl MessageWithCause {
32 fn in_current_span(cmd: impl Into<Message>) -> Self {
33 Self {
34 cause: Span::current(),
35 message: cmd.into(),
36 }
37 }
38}
39
40pub(super) enum Message {
41 Track {
42 id: u64,
43 peers: Map<PublicKey, Address>,
44 },
45 Overwrite {
46 peers: Map<PublicKey, Address>,
47 },
48 PeerSet {
49 id: u64,
50 response: oneshot::Sender<Option<Set<PublicKey>>>,
51 },
52 Subscribe {
53 response: oneshot::Sender<SubscribeReceiver>,
54 },
55 Finalized(Box<Update<Block>>),
56}
57
58impl From<Update<Block>> for Message {
59 fn from(value: Update<Block>) -> Self {
60 Self::Finalized(Box::new(value))
61 }
62}
63
64impl Provider for Mailbox {
65 type PublicKey = PublicKey;
66
67 async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
68 let (tx, rx) = oneshot::channel();
69 if let Err(error) =
70 self.inner
71 .unbounded_send(MessageWithCause::in_current_span(Message::PeerSet {
72 id,
73 response: tx,
74 }))
75 {
76 error!(%error, "failed to send message to peer_manager");
77 return None;
78 }
79 rx.await.ok().flatten()
80 }
81
82 async fn subscribe(
83 &mut self,
84 ) -> commonware_utils::channel::mpsc::UnboundedReceiver<(
85 u64,
86 Set<Self::PublicKey>,
87 Set<Self::PublicKey>,
88 )> {
89 let (tx, rx) = oneshot::channel();
90
91 let (_, fallback_rx) = commonware_utils::channel::mpsc::unbounded_channel();
92
93 if let Err(error) =
94 self.inner
95 .unbounded_send(MessageWithCause::in_current_span(Message::Subscribe {
96 response: tx,
97 }))
98 {
99 error!(%error, "failed to send message to peer_manager");
100 return fallback_rx;
101 }
102
103 if let Ok(subscription) = rx.await {
104 return subscription;
105 }
106
107 error!(
108 error = "actor dropped channel before returning subscription",
109 "failed to send message to peer_manager",
110 );
111
112 fallback_rx
113 }
114}
115
116impl AddressableManager for Mailbox {
117 async fn track(&mut self, id: u64, peers: Map<Self::PublicKey, Address>) {
118 if let Err(error) = self
119 .inner
120 .unbounded_send(MessageWithCause::in_current_span(Message::Track {
121 id,
122 peers,
123 }))
124 .wrap_err("actor no longer running")
125 {
126 error!(%error, "failed to send message to peer_manager");
127 }
128 }
129
130 async fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) {
131 if let Err(error) = self
132 .inner
133 .unbounded_send(MessageWithCause::in_current_span(Message::Overwrite {
134 peers,
135 }))
136 .wrap_err("actor no longer running")
137 {
138 error!(%error, "failed to send message to peer_manager");
139 }
140 }
141}
142
143impl Reporter for Mailbox {
144 type Activity = Update<Block>;
145
146 async fn report(&mut self, activity: Self::Activity) {
147 if let Err(error) = self
148 .inner
149 .unbounded_send(MessageWithCause::in_current_span(activity))
150 {
151 error!(%error, "failed to send message to peer_manager");
152 }
153 }
154}