Skip to main content

tempo_commonware_node/peer_manager/
ingress.rs

1use commonware_consensus::{Reporter, marshal::Update};
2use commonware_p2p::{Address, AddressableManager, Provider};
3use commonware_utils::ordered::{Map, Set};
4use eyre::WrapErr as _;
5use futures::channel::{mpsc, oneshot};
6use tracing::{Span, error};
7
8type SubscribeReceiver =
9    commonware_utils::channel::mpsc::UnboundedReceiver<(u64, Set<PublicKey>, Set<PublicKey>)>;
10
11use commonware_cryptography::ed25519::PublicKey;
12
13use crate::consensus::block::Block;
14
15#[derive(Clone, Debug)]
16pub(crate) struct Mailbox {
17    inner: mpsc::UnboundedSender<MessageWithCause>,
18}
19
20impl Mailbox {
21    pub(super) fn new(inner: mpsc::UnboundedSender<MessageWithCause>) -> Self {
22        Self { inner }
23    }
24}
25
26pub(super) struct MessageWithCause {
27    pub(super) cause: Span,
28    pub(super) message: Message,
29}
30
31impl MessageWithCause {
32    fn in_current_span(cmd: impl Into<Message>) -> Self {
33        Self {
34            cause: Span::current(),
35            message: cmd.into(),
36        }
37    }
38}
39
40pub(super) enum Message {
41    Track {
42        id: u64,
43        peers: Map<PublicKey, Address>,
44    },
45    Overwrite {
46        peers: Map<PublicKey, Address>,
47    },
48    PeerSet {
49        id: u64,
50        response: oneshot::Sender<Option<Set<PublicKey>>>,
51    },
52    Subscribe {
53        response: oneshot::Sender<SubscribeReceiver>,
54    },
55    Finalized(Box<Update<Block>>),
56}
57
58impl From<Update<Block>> for Message {
59    fn from(value: Update<Block>) -> Self {
60        Self::Finalized(Box::new(value))
61    }
62}
63
64impl Provider for Mailbox {
65    type PublicKey = PublicKey;
66
67    async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
68        let (tx, rx) = oneshot::channel();
69        if let Err(error) =
70            self.inner
71                .unbounded_send(MessageWithCause::in_current_span(Message::PeerSet {
72                    id,
73                    response: tx,
74                }))
75        {
76            error!(%error, "failed to send message to peer_manager");
77            return None;
78        }
79        rx.await.ok().flatten()
80    }
81
82    async fn subscribe(
83        &mut self,
84    ) -> commonware_utils::channel::mpsc::UnboundedReceiver<(
85        u64,
86        Set<Self::PublicKey>,
87        Set<Self::PublicKey>,
88    )> {
89        let (tx, rx) = oneshot::channel();
90
91        let (_, fallback_rx) = commonware_utils::channel::mpsc::unbounded_channel();
92
93        if let Err(error) =
94            self.inner
95                .unbounded_send(MessageWithCause::in_current_span(Message::Subscribe {
96                    response: tx,
97                }))
98        {
99            error!(%error, "failed to send message to peer_manager");
100            return fallback_rx;
101        }
102
103        if let Ok(subscription) = rx.await {
104            return subscription;
105        }
106
107        error!(
108            error = "actor dropped channel before returning subscription",
109            "failed to send message to peer_manager",
110        );
111
112        fallback_rx
113    }
114}
115
116impl AddressableManager for Mailbox {
117    async fn track(&mut self, id: u64, peers: Map<Self::PublicKey, Address>) {
118        if let Err(error) = self
119            .inner
120            .unbounded_send(MessageWithCause::in_current_span(Message::Track {
121                id,
122                peers,
123            }))
124            .wrap_err("actor no longer running")
125        {
126            error!(%error, "failed to send message to peer_manager");
127        }
128    }
129
130    async fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) {
131        if let Err(error) = self
132            .inner
133            .unbounded_send(MessageWithCause::in_current_span(Message::Overwrite {
134                peers,
135            }))
136            .wrap_err("actor no longer running")
137        {
138            error!(%error, "failed to send message to peer_manager");
139        }
140    }
141}
142
143impl Reporter for Mailbox {
144    type Activity = Update<Block>;
145
146    async fn report(&mut self, activity: Self::Activity) {
147        if let Err(error) = self
148            .inner
149            .unbounded_send(MessageWithCause::in_current_span(activity))
150        {
151            error!(%error, "failed to send message to peer_manager");
152        }
153    }
154}