tempo_commonware_node/peer_manager/
ingress.rs1use commonware_consensus::{Reporter, marshal::Update};
2use commonware_p2p::{
3 Address, AddressableManager, AddressableTrackedPeers, PeerSetSubscription, Provider,
4 TrackedPeers,
5};
6use commonware_utils::ordered::Map;
7use eyre::WrapErr as _;
8use futures::channel::{mpsc, oneshot};
9use tracing::{Span, error};
10
11use commonware_cryptography::ed25519::PublicKey;
12
13use crate::consensus::block::Block;
14
15#[derive(Clone, Debug)]
16pub(crate) struct Mailbox {
17 inner: mpsc::UnboundedSender<MessageWithCause>,
18}
19
20impl Mailbox {
21 pub(super) fn new(inner: mpsc::UnboundedSender<MessageWithCause>) -> Self {
22 Self { inner }
23 }
24}
25
26pub(super) struct MessageWithCause {
27 pub(super) cause: Span,
28 pub(super) message: Message,
29}
30
31impl MessageWithCause {
32 fn in_current_span(cmd: impl Into<Message>) -> Self {
33 Self {
34 cause: Span::current(),
35 message: cmd.into(),
36 }
37 }
38}
39
40pub(super) enum Message {
41 Track {
42 id: u64,
43 peers: Map<PublicKey, Address>,
44 },
45 Overwrite {
46 peers: Map<PublicKey, Address>,
47 },
48 PeerSet {
49 id: u64,
50 response: oneshot::Sender<Option<TrackedPeers<PublicKey>>>,
51 },
52 Subscribe {
53 response: oneshot::Sender<PeerSetSubscription<PublicKey>>,
54 },
55 Finalized(Box<Update<Block>>),
56}
57
58impl From<Update<Block>> for Message {
59 fn from(value: Update<Block>) -> Self {
60 Self::Finalized(Box::new(value))
61 }
62}
63
64impl Provider for Mailbox {
65 type PublicKey = PublicKey;
66
67 async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
68 let (tx, rx) = oneshot::channel();
69 if let Err(error) =
70 self.inner
71 .unbounded_send(MessageWithCause::in_current_span(Message::PeerSet {
72 id,
73 response: tx,
74 }))
75 {
76 error!(%error, "failed to send message to peer_manager");
77 return None;
78 }
79 rx.await.ok().flatten()
80 }
81
82 async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
83 let (tx, rx) = oneshot::channel();
84
85 let (_, fallback_rx) = commonware_utils::channel::mpsc::unbounded_channel();
86
87 if let Err(error) =
88 self.inner
89 .unbounded_send(MessageWithCause::in_current_span(Message::Subscribe {
90 response: tx,
91 }))
92 {
93 error!(%error, "failed to send message to peer_manager");
94 return fallback_rx;
95 }
96
97 if let Ok(subscription) = rx.await {
98 return subscription;
99 }
100
101 error!(
102 error = "actor dropped channel before returning subscription",
103 "failed to send message to peer_manager",
104 );
105
106 fallback_rx
107 }
108}
109
110impl AddressableManager for Mailbox {
111 async fn track<R>(&mut self, id: u64, peers: R)
112 where
113 R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send,
114 {
115 let addressable: AddressableTrackedPeers<Self::PublicKey> = peers.into();
116 if let Err(error) = self
117 .inner
118 .unbounded_send(MessageWithCause::in_current_span(Message::Track {
119 id,
120 peers: addressable.primary,
121 }))
122 .wrap_err("actor no longer running")
123 {
124 error!(%error, "failed to send message to peer_manager");
125 }
126 }
127
128 async fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) {
129 if let Err(error) = self
130 .inner
131 .unbounded_send(MessageWithCause::in_current_span(Message::Overwrite {
132 peers,
133 }))
134 .wrap_err("actor no longer running")
135 {
136 error!(%error, "failed to send message to peer_manager");
137 }
138 }
139}
140
141impl Reporter for Mailbox {
142 type Activity = Update<Block>;
143
144 async fn report(&mut self, activity: Self::Activity) {
145 if let Err(error) = self
146 .inner
147 .unbounded_send(MessageWithCause::in_current_span(activity))
148 {
149 error!(%error, "failed to send message to peer_manager");
150 }
151 }
152}