tempo_commonware_node/peer_manager/actor.rs

use std::{pin::Pin, sync::Arc, time::Duration};

use alloy_consensus::{BlockHeader as _, Sealable as _};
use commonware_codec::ReadExt as _;
use commonware_consensus::{
    marshal::Update,
    types::{Epocher, FixedEpocher, Height},
};
use commonware_cryptography::ed25519::PublicKey;
use commonware_p2p::{AddressableManager, Provider};
use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, spawn_cell};
use commonware_utils::{Acknowledgement, ordered};
use eyre::{OptionExt as _, WrapErr as _};
use futures::{StreamExt as _, channel::mpsc};
use prometheus_client::metrics::gauge::Gauge;
use reth_provider::{BlockIdReader as _, HeaderProvider as _};
use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
use tempo_node::TempoFullNode;
use tempo_primitives::TempoHeader;
use tracing::{Span, debug, error, info_span, instrument};

use crate::validators::read_active_and_known_peers_at_block_hash;

/// The interval on which the peer set is updated during bootstrapping.
/// Aggressive timing to get started.
const BOOTSTRAP_UPDATE_INTERVAL: Duration = Duration::from_secs(5);

/// The interval on which peer sets are refreshed during normal operation.
/// Relaxed timing once a peer set has been tracked.
const HEARTBEAT_UPDATE_INTERVAL: Duration = Duration::from_secs(30);

use super::ingress::{Message, MessageWithCause};

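/// Actor that keeps the p2p oracle's peer set in sync with the validator and
/// peer information stored in the finalized execution state.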
pub(crate) struct Actor<TContext, TPeerManager>
where
    TPeerManager: AddressableManager<PublicKey = PublicKey>,
{
    context: ContextCell<TContext>,

    oracle: TPeerManager,
    execution_node: Arc<TempoFullNode>,
    epoch_strategy: FixedEpocher,
    last_finalized_height: Height,
    mailbox: mpsc::UnboundedReceiver<MessageWithCause>,

    peers: Gauge,

    last_tracked_peer_set: Option<LastTrackedPeerSet>,

    peer_update_timer: Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
}

impl<TContext, TPeerManager> Actor<TContext, TPeerManager>
where
    TContext: Clock + Metrics + Spawner,
    TPeerManager: AddressableManager<PublicKey = PublicKey>,
{
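    /// Constructs the actor from its configuration, registers the `peers`
    /// gauge, and arms the initial refresh timer with the aggressive
    /// bootstrap interval.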
    pub(super) fn new(
        context: TContext,
        super::Config {
            oracle,
            execution_node,
            epoch_strategy,
            last_finalized_height,
        }: super::Config<TPeerManager>,
        mailbox: mpsc::UnboundedReceiver<MessageWithCause>,
    ) -> Self {
        let peers = Gauge::default();
        context.register(
            "peers",
            "how many peers are registered overall for the latest epoch",
            peers.clone(),
        );
        let context = ContextCell::new(context);
        let peer_update_timer = Box::pin(context.sleep(BOOTSTRAP_UPDATE_INTERVAL));
        Self {
            context,
            oracle,
            execution_node,
            epoch_strategy,
            last_finalized_height,
            mailbox,
            peers,
            last_tracked_peer_set: None,

            peer_update_timer,
        }
    }

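    /// Drives the actor: processes mailbox messages (biased) and refreshes
    /// the peer set whenever the update timer fires. Exits when the mailbox
    /// closes or a message handler fails.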
    async fn run(mut self) {
        let reason = 'event_loop: loop {
            tokio::select!(
                biased;
                msg = self.mailbox.next() => {
                    match msg {
                        None => break 'event_loop eyre::eyre!("mailbox closed unexpectedly"),

                        Some(msg) => {
                            if let Err(error) = self.handle_message(msg.cause, msg.message).await {
                                break 'event_loop error;
                            }
                        }
                    }
                }
                // Perform aggressive retries if no peer set is tracked yet.
                // Otherwise refresh at the relaxed heartbeat interval.
                _ = &mut self.peer_update_timer => {
                    let _ = self.refresh_peers().await;
                    self.reset_peer_update_timer();
                }
            )
        };
        info_span!("peer_manager").in_scope(|| error!(%reason, "agent shutting down"));
    }
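
    /// Spawns the actor's event loop on the runtime context and returns its handle.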
    pub(crate) fn start(mut self) -> commonware_runtime::Handle<()> {
        spawn_cell!(self.context, self.run())
    }

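    /// Handles a single mailbox message within a span parented to the
    /// message's originating span.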
    #[instrument(parent = &cause, skip_all)]
    async fn handle_message(&mut self, cause: Span, message: Message) -> eyre::Result<()> {
        match message {
            Message::Track { id, peers } => {
                AddressableManager::track(&mut self.oracle, id, peers).await;
            }
            Message::Overwrite { peers } => {
                AddressableManager::overwrite(&mut self.oracle, peers).await;
            }
            Message::PeerSet { id, response } => {
                let result = Provider::peer_set(&mut self.oracle, id).await;
                let _ = response.send(result);
            }
            Message::Subscribe { response } => {
                let receiver = Provider::subscribe(&mut self.oracle).await;
                let _ = response.send(receiver);
            }
            Message::Finalized(update) => match *update {
                Update::Block(_, ack) => {
                    let _ = self.refresh_peers().await;
                    ack.acknowledge();
                    self.reset_peer_update_timer();
                }
                Update::Tip { .. } => {}
            },
        }
        Ok(())
    }

    /// Reads the peers given the latest finalized height and state.
    #[instrument(skip_all, err)]
    async fn refresh_peers(&mut self) -> eyre::Result<()> {
        // Always take whatever is higher: the last finalized height as per the
        // consensus layer (greater than 0 only on restarts with populated
        // consensus state), or the highest finalized block number from the
        // execution layer.
        //
        // This works even if the execution layer was replaced with a snapshot.
        //
        // There is no point taking an outdated state because the network has
        // moved on and there is no guarantee that older peers are even around.
        //
        // Compare this to the DKG actor, which boots into older DKG epochs
        // because it attempts to replay older rounds.
        let highest_finalized = self
            .execution_node
            .provider
            .finalized_block_number()
            .wrap_err("unable to read highest finalized block from execution layer")?
            .unwrap_or(self.last_finalized_height.get())
            .max(self.last_finalized_height.get());

        // Short circuit: no need to read the same state again if there is no new data.
        if self
            .last_tracked_peer_set
            .as_ref()
            .is_some_and(|tracked| tracked.height >= highest_finalized)
        {
            return Ok(());
        }

        let epoch_info = self
            .epoch_strategy
            .containing(Height::new(highest_finalized))
            .expect("epoch strategy covers all heights");

        // If we're exactly on a boundary, use it; otherwise use the previous
        // epoch's last block (or genesis).
        //
        // This height is guaranteed to be finalized.
        let latest_boundary = if epoch_info.last().get() == highest_finalized {
            highest_finalized
        } else {
            epoch_info
                .epoch()
                .previous()
                .map_or_else(Height::zero, |prev| {
                    self.epoch_strategy
                        .last(prev)
                        .expect("epoch strategy covers all epochs")
                })
                .get()
        };

        let latest_boundary_header = read_header_at_height(&self.execution_node, latest_boundary)
            .wrap_err("failed reading latest boundary header")?;
        let highest_finalized_header =
            read_header_at_height(&self.execution_node, highest_finalized)
                .wrap_err("failed reading highest finalized header")?;

        let onchain_outcome =
            OnchainDkgOutcome::read(&mut latest_boundary_header.extra_data().as_ref())
                .wrap_err_with(|| {
                    format!(
                        "boundary block at `{latest_boundary}` did not contain a valid DKG outcome"
                    )
                })?;

        let peers_as_per_dkg = ordered::Set::from_iter_dedup(
            onchain_outcome
                .players()
                .iter()
                .cloned()
                .chain(onchain_outcome.next_players().iter().cloned()),
        );
        let peers = read_active_and_known_peers_at_block_hash(
            &self.execution_node,
            &peers_as_per_dkg,
            highest_finalized_header.hash_slow(),
        )
        .wrap_err("unable to read initial peer set from execution layer")?;

        self.track_or_overwrite(highest_finalized_header.number(), peers)
            .await;

        Ok(())
    }

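    /// Pushes the latest peer set to the oracle: re-tracks when the peer
    /// membership changed, overwrites when only the addresses changed, and
    /// remembers the set together with the height it was read at.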
    async fn track_or_overwrite(
        &mut self,
        height: u64,
        peers: ordered::Map<PublicKey, commonware_p2p::Address>,
    ) {
        if let Some(tracked) = &self.last_tracked_peer_set {
            // Overwrite the addresses if only the addresses have changed.
            if peers.keys() == tracked.peers.keys() {
                if peers.values() != tracked.peers.values() {
                    self.oracle.overwrite(peers.clone()).await;
                }
            // Otherwise track the new peers.
            } else {
                self.oracle.track(height, peers.clone()).await;
            }
        } else {
            self.oracle.track(height, peers.clone()).await;
        }

        // Always bump the last-tracked peer set. If the peers are unchanged
        // this only updates the height, but we use the height to determine if
        // state should be read or not.
        self.last_tracked_peer_set
            .replace(LastTrackedPeerSet { height, peers });

        if let Some(tracked) = &self.last_tracked_peer_set {
            self.peers.set(tracked.peers.len() as i64);
        }

        debug!(
            last_tracked_peer_set = ?self.last_tracked_peer_set.as_ref().expect("just set it"),
            "latest tracked peerset",
        );
    }

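    /// Re-arms the refresh timer: the short bootstrap interval until a peer
    /// set has been tracked, the heartbeat interval afterwards.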
    fn reset_peer_update_timer(&mut self) {
        // Perform aggressive retries if no peer set is tracked yet.
        // Otherwise refresh at the relaxed heartbeat interval.
        self.peer_update_timer = Box::pin(
            self.context.sleep(
                self.last_tracked_peer_set
                    .as_ref()
                    .map_or(BOOTSTRAP_UPDATE_INTERVAL, |_| HEARTBEAT_UPDATE_INTERVAL),
            ),
        );
    }
}

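/// The peer set most recently pushed to the oracle, together with the
/// finalized height it was derived from.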
#[derive(Debug)]
struct LastTrackedPeerSet {
    height: u64,
    peers: ordered::Map<PublicKey, commonware_p2p::Address>,
}

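/// Reads the header at `height` from the execution layer's provider, treating
/// a missing header as an error.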
#[instrument(skip_all, fields(height), err)]
fn read_header_at_height(execution_node: &TempoFullNode, height: u64) -> eyre::Result<TempoHeader> {
    execution_node
        .provider
        .header_by_number(height)
        .map_err(eyre::Report::new)
        .and_then(|h| h.ok_or_eyre("execution layer did not have a header at the requested height"))
        .wrap_err_with(|| format!("failed reading header at height `{height}`"))
}