Skip to main content

tempo_commonware_node/peer_manager/
ingress.rs

1use commonware_consensus::{Reporter, marshal::Update};
2use commonware_p2p::{
3    Address, AddressableManager, AddressableTrackedPeers, PeerSetSubscription, Provider,
4    TrackedPeers,
5};
6use commonware_utils::ordered::Map;
7use eyre::WrapErr as _;
8use futures::channel::{mpsc, oneshot};
9use tracing::{Span, error};
10
11use commonware_cryptography::ed25519::PublicKey;
12
13use crate::consensus::block::Block;
14
15#[derive(Clone, Debug)]
16pub(crate) struct Mailbox {
17    inner: mpsc::UnboundedSender<MessageWithCause>,
18}
19
20impl Mailbox {
21    pub(super) fn new(inner: mpsc::UnboundedSender<MessageWithCause>) -> Self {
22        Self { inner }
23    }
24}
25
26pub(super) struct MessageWithCause {
27    pub(super) cause: Span,
28    pub(super) message: Message,
29}
30
31impl MessageWithCause {
32    fn in_current_span(cmd: impl Into<Message>) -> Self {
33        Self {
34            cause: Span::current(),
35            message: cmd.into(),
36        }
37    }
38}
39
40pub(super) enum Message {
41    Track {
42        id: u64,
43        peers: Map<PublicKey, Address>,
44    },
45    Overwrite {
46        peers: Map<PublicKey, Address>,
47    },
48    PeerSet {
49        id: u64,
50        response: oneshot::Sender<Option<TrackedPeers<PublicKey>>>,
51    },
52    Subscribe {
53        response: oneshot::Sender<PeerSetSubscription<PublicKey>>,
54    },
55    Finalized(Box<Update<Block>>),
56}
57
58impl From<Update<Block>> for Message {
59    fn from(value: Update<Block>) -> Self {
60        Self::Finalized(Box::new(value))
61    }
62}
63
64impl Provider for Mailbox {
65    type PublicKey = PublicKey;
66
67    async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
68        let (tx, rx) = oneshot::channel();
69        if let Err(error) =
70            self.inner
71                .unbounded_send(MessageWithCause::in_current_span(Message::PeerSet {
72                    id,
73                    response: tx,
74                }))
75        {
76            error!(%error, "failed to send message to peer_manager");
77            return None;
78        }
79        rx.await.ok().flatten()
80    }
81
82    async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
83        let (tx, rx) = oneshot::channel();
84
85        let (_, fallback_rx) = commonware_utils::channel::mpsc::unbounded_channel();
86
87        if let Err(error) =
88            self.inner
89                .unbounded_send(MessageWithCause::in_current_span(Message::Subscribe {
90                    response: tx,
91                }))
92        {
93            error!(%error, "failed to send message to peer_manager");
94            return fallback_rx;
95        }
96
97        if let Ok(subscription) = rx.await {
98            return subscription;
99        }
100
101        error!(
102            error = "actor dropped channel before returning subscription",
103            "failed to send message to peer_manager",
104        );
105
106        fallback_rx
107    }
108}
109
110impl AddressableManager for Mailbox {
111    async fn track<R>(&mut self, id: u64, peers: R)
112    where
113        R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send,
114    {
115        let addressable: AddressableTrackedPeers<Self::PublicKey> = peers.into();
116        if let Err(error) = self
117            .inner
118            .unbounded_send(MessageWithCause::in_current_span(Message::Track {
119                id,
120                peers: addressable.primary,
121            }))
122            .wrap_err("actor no longer running")
123        {
124            error!(%error, "failed to send message to peer_manager");
125        }
126    }
127
128    async fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) {
129        if let Err(error) = self
130            .inner
131            .unbounded_send(MessageWithCause::in_current_span(Message::Overwrite {
132                peers,
133            }))
134            .wrap_err("actor no longer running")
135        {
136            error!(%error, "failed to send message to peer_manager");
137        }
138    }
139}
140
141impl Reporter for Mailbox {
142    type Activity = Update<Block>;
143
144    async fn report(&mut self, activity: Self::Activity) {
145        if let Err(error) = self
146            .inner
147            .unbounded_send(MessageWithCause::in_current_span(activity))
148        {
149            error!(%error, "failed to send message to peer_manager");
150        }
151    }
152}