//! tempo_commonware_node/peer_manager/actor.rs

1use std::{pin::Pin, time::Duration};
2
3use alloy_consensus::{BlockHeader as _, Sealable as _};
4use commonware_codec::ReadExt as _;
5use commonware_consensus::{
6    Heightable as _,
7    marshal::Update,
8    types::{Epocher, FixedEpocher, Height},
9};
10use commonware_cryptography::ed25519::PublicKey;
11use commonware_p2p::{Address, AddressableManager, Provider};
12use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, spawn_cell};
13use commonware_utils::{Acknowledgement, ordered};
14use eyre::{OptionExt as _, WrapErr as _};
15use futures::{StreamExt as _, channel::mpsc};
16use prometheus_client::metrics::gauge::Gauge;
17use reth_ethereum::{chainspec::EthChainSpec, network::NetworkInfo};
18use reth_provider::{BlockIdReader as _, HeaderProvider as _};
19use tempo_chainspec::hardfork::TempoHardforks as _;
20use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
21use tempo_node::TempoFullNode;
22use tempo_primitives::TempoHeader;
23use tracing::{Span, debug, error, info_span, instrument, warn};
24
25use crate::{
26    consensus::block::Block,
27    validators::{
28        read_active_and_known_peers_at_block_hash, read_active_and_known_peers_at_block_hash_v1,
29    },
30};
31
/// The interval on which the peer set is updated during bootstrapping.
/// Aggressive timing to get started.
const BOOTSTRAP_UPDATE_INTERVAL: Duration = Duration::from_secs(5);

/// The interval on which peer sets are refreshed during normal operation.
/// Relaxed timing during normal operation.
const HEARTBEAT_UPDATE_INTERVAL: Duration = Duration::from_secs(30);
39
40use super::ingress::{Message, MessageWithCause};
41
/// Actor that keeps the p2p oracle's peer set in sync with the validator
/// configuration stored in the execution layer's finalized state.
pub(crate) struct Actor<TContext, TPeerManager>
where
    TPeerManager: AddressableManager<PublicKey = PublicKey>,
{
    /// Runtime context (clock, metrics, spawner), cell-wrapped for `spawn_cell!`.
    context: ContextCell<TContext>,

    /// The p2p peer manager that peer sets are tracked/overwritten on.
    oracle: TPeerManager,
    /// Handle to the execution layer node; used to read finalized headers and
    /// the on-chain validator configuration.
    execution_node: TempoFullNode,
    /// Mailbox of the executor actor; used to await block finalization
    /// reaching the execution layer.
    executor: crate::executor::Mailbox,
    /// Maps heights to epochs / epoch-boundary heights.
    epoch_strategy: FixedEpocher,
    /// Last finalized height as per the consensus layer at startup.
    last_finalized_height: Height,
    /// Incoming messages from other actors.
    mailbox: mpsc::UnboundedReceiver<MessageWithCause>,

    /// Gauge metric: number of peers registered for the latest epoch.
    peers: Gauge,

    /// The peer set most recently pushed to the oracle, if any; used to skip
    /// redundant reads and updates.
    last_tracked_peer_set: Option<LastTrackedPeerSet>,

    /// Timer driving the periodic peer-set refresh.
    peer_update_timer: Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
}
61
62impl<TContext, TPeerManager> Actor<TContext, TPeerManager>
63where
64    TContext: Clock + Metrics + Spawner,
65    TPeerManager: AddressableManager<PublicKey = PublicKey>,
66{
67    pub(super) fn new(
68        context: TContext,
69        super::Config {
70            oracle,
71            execution_node,
72            executor,
73            epoch_strategy,
74            last_finalized_height,
75        }: super::Config<TPeerManager>,
76        mailbox: mpsc::UnboundedReceiver<MessageWithCause>,
77    ) -> Self {
78        let peers = Gauge::default();
79        context.register(
80            "peers",
81            "how many peers are registered overall for the latest epoch",
82            peers.clone(),
83        );
84        let context = ContextCell::new(context);
85        let peer_update_timer = Box::pin(context.sleep(BOOTSTRAP_UPDATE_INTERVAL));
86        Self {
87            context,
88            oracle,
89            execution_node,
90            executor,
91            epoch_strategy,
92            last_finalized_height,
93            mailbox,
94            peers,
95            last_tracked_peer_set: None,
96
97            peer_update_timer,
98        }
99    }
100
101    async fn run(mut self) {
102        let reason = 'event_loop: loop {
103            tokio::select!(
104                biased;
105
106                msg = self.mailbox.next() => {
107                    match msg {
108                        None => break 'event_loop eyre::eyre!("mailbox closed unexpectedly"),
109
110                        Some(msg) => {
111                            if let Err(error) = self.handle_message(msg.cause, msg.message).await {
112                                break 'event_loop error;
113                            }
114                        }
115                    }
116                }
117
118                // Perform aggressive retries if no peer set is tracked yet.
119                // Otherwise just do it every minute.
120                _ = &mut self.peer_update_timer => {
121                    let _ = self.update_peer_set(None).await;
122                    self.reset_peer_update_timer();
123                }
124            )
125        };
126        info_span!("peer_manager").in_scope(|| error!(%reason,"agent shutting down"));
127    }
128    pub(crate) fn start(mut self) -> commonware_runtime::Handle<()> {
129        spawn_cell!(self.context, self.run().await)
130    }
131
132    #[instrument(parent = &cause, skip_all)]
133    async fn handle_message(&mut self, cause: Span, message: Message) -> eyre::Result<()> {
134        match message {
135            Message::Track { id, peers } => {
136                AddressableManager::track(&mut self.oracle, id, peers).await;
137            }
138            Message::Overwrite { peers } => {
139                AddressableManager::overwrite(&mut self.oracle, peers).await;
140            }
141            Message::PeerSet { id, response } => {
142                let result = Provider::peer_set(&mut self.oracle, id).await;
143                let _ = response.send(result);
144            }
145            Message::Subscribe { response } => {
146                let receiver = Provider::subscribe(&mut self.oracle).await;
147                let _ = response.send(receiver);
148            }
149            Message::Finalized(update) => match *update {
150                Update::Block(block, ack) => {
151                    let _ = self.update_peer_set(Some(block)).await;
152                    ack.acknowledge();
153                    self.reset_peer_update_timer();
154                }
155                Update::Tip { .. } => {}
156            },
157        }
158        Ok(())
159    }
160
161    /// Updates the peer set.
162    #[instrument(
163        skip_all,
164        fields(
165            block.height = block.as_ref().map(|b| tracing::field::display(b.height())),
166        ),
167        err,
168    )]
169    async fn update_peer_set(&mut self, block: Option<Block>) -> eyre::Result<()> {
170        if let Some(block) = &block
171            && let Err(reason) = self.executor.subscribe_finalized(block.height()).await
172        {
173            warn!(
174                %reason,
175                "unable to clarify whether the finalized block was already \
176                forwarded to execution layer; will try to read validator \
177                config contract, but it will likely fail",
178            );
179        }
180
181        let maybe_latest_finalized_header = self.read_highest_finalized_header();
182        let reference_timestamp = match &block {
183            Some(block) => maybe_latest_finalized_header.ok().map_or_else(
184                || block.timestamp(),
185                |header| header.timestamp().max(block.timestamp()),
186            ),
187            None => maybe_latest_finalized_header
188                .wrap_err("could not determine a timestamp to determine peer behavior")?
189                .timestamp(),
190        };
191
192        // Post T2 behavior: do a best-effort update of the peerset, to whatever
193        // is available as long as it is newer than what we are already tracking.
194        //
195        // Also run this if we do not yet have any peer set available.
196        if self
197            .execution_node
198            .chain_spec()
199            .is_t2_active_at_timestamp(reference_timestamp)
200            || self.last_tracked_peer_set.is_none()
201        {
202            self.refresh_peers()
203                .await
204                .wrap_err("failed refreshing peer set")?;
205        } else if let Some(block) = block {
206            let height = block.number();
207            if let Some(peers) = read_peer_set_if_boundary(
208                &self.context,
209                &self.epoch_strategy,
210                &self.execution_node,
211                block,
212            )
213            .await
214            {
215                self.track_or_overwrite(height, peers).await;
216            }
217        }
218        Ok(())
219    }
220
221    /// Reads the peers given the latest finalized state.
222    /// and finalized state.
223    #[instrument(skip_all, err)]
224    async fn refresh_peers(&mut self) -> eyre::Result<()> {
225        // Always take whatever is higher: the last finalized height as per
226        // consensus layer (greater than 0 only on restarts with populated
227        // consensus state), or the highest finalized block number from the
228        // execution layer.
229        //
230        // This works even if the execution layer was replaced with a snapshot.
231        //
232        // There is no point taking an outdated state because the network has
233        // moved on and there is no guarantee that older peers are even around.
234        //
235        // Compare this to the DKG actor, which boots into older DKG epochs
236        // because it attempts to replay older rounds.
237        let highest_finalized = self
238            .execution_node
239            .provider
240            .finalized_block_number()
241            .wrap_err("unable to read highest finalized block from execution layer")?
242            .unwrap_or(self.last_finalized_height.get())
243            .max(self.last_finalized_height.get());
244
245        // Short circuit - no need to read the same state if there is no new data.
246        if self
247            .last_tracked_peer_set
248            .as_ref()
249            .is_some_and(|tracked| tracked.height >= highest_finalized)
250        {
251            return Ok(());
252        }
253
254        let epoch_info = self
255            .epoch_strategy
256            .containing(Height::new(highest_finalized))
257            .expect("epoch strategy covers all heights");
258
259        // If we're exactly on a boundary, use it; otherwise use the previous
260        // epoch's last block (or genesis).
261        //
262        // This height is guaranteed to be finalized.
263        let latest_boundary = if epoch_info.last().get() == highest_finalized {
264            highest_finalized
265        } else {
266            epoch_info
267                .epoch()
268                .previous()
269                .map_or_else(Height::zero, |prev| {
270                    self.epoch_strategy
271                        .last(prev)
272                        .expect("epoch strategy covers all epochs")
273                })
274                .get()
275        };
276
277        let latest_boundary_header = read_header_at_height(&self.execution_node, latest_boundary)
278            .wrap_err("failed reading latest boundary header")?;
279        let highest_finalized_header =
280            read_header_at_height(&self.execution_node, highest_finalized)
281                .wrap_err("failed reading highest finalized header")?;
282
283        let onchain_outcome =
284            OnchainDkgOutcome::read(&mut latest_boundary_header.extra_data().as_ref())
285                .wrap_err_with(|| {
286                    format!(
287                        "boundary block at `{latest_boundary}` did not contain a valid DKG outcome"
288                    )
289                })?;
290
291        let peers_as_per_dkg = ordered::Set::from_iter_dedup(
292            onchain_outcome
293                .players()
294                .iter()
295                .cloned()
296                .chain(onchain_outcome.next_players().iter().cloned()),
297        );
298        let peers = read_active_and_known_peers_at_block_hash(
299            &self.execution_node,
300            &peers_as_per_dkg,
301            highest_finalized_header.hash_slow(),
302        )
303        .wrap_err("unable to read initial peer set from execution layer")?;
304
305        self.track_or_overwrite(highest_finalized_header.number(), peers)
306            .await;
307
308        Ok(())
309    }
310
311    async fn track_or_overwrite(
312        &mut self,
313        height: u64,
314        peers: ordered::Map<PublicKey, commonware_p2p::Address>,
315    ) {
316        if let Some(tracked) = &self.last_tracked_peer_set {
317            // Overwrite the addresses if only the addresses are changed.
318            if peers.keys() == tracked.peers.keys() {
319                if peers.values() != tracked.peers.values() {
320                    self.oracle.overwrite(peers.clone()).await;
321                }
322            // Otherwise track the new peers.
323            } else {
324                self.oracle.track(height, peers.clone()).await;
325            }
326        } else {
327            self.oracle.track(height, peers.clone()).await;
328        }
329
330        // Always bump the last-tracked peer set. If the peers are unchanged
331        // this only updates the height, but we use the height to determine if
332        // state should be read or not.
333        self.last_tracked_peer_set
334            .replace(LastTrackedPeerSet { height, peers });
335
336        if let Some(tracked) = &self.last_tracked_peer_set {
337            self.peers.set(tracked.peers.len() as i64);
338        }
339
340        debug!(
341            last_tracked_peer_set = ?self.last_tracked_peer_set.as_ref().expect("just set it"),
342            "latest tracked peerset",
343        );
344    }
345
346    fn reset_peer_update_timer(&mut self) {
347        // Perform aggressive retries if no peer set is tracked yet.
348        // Otherwise just do it every minute.
349        self.peer_update_timer = Box::pin(
350            self.context.sleep(
351                self.last_tracked_peer_set
352                    .as_ref()
353                    .map_or(BOOTSTRAP_UPDATE_INTERVAL, |_| HEARTBEAT_UPDATE_INTERVAL),
354            ),
355        );
356    }
357    #[instrument(skip_all, fields(height), err)]
358    fn read_highest_finalized_header(&self) -> eyre::Result<TempoHeader> {
359        let highest_finalized = match self.execution_node.provider.finalized_block_hash() {
360            Ok(Some(highest_finalized)) => Ok(highest_finalized),
361            Ok(None) if self.last_finalized_height == Height::zero() => {
362                Ok(self.execution_node.chain_spec().genesis_hash())
363            }
364            Ok(None) => Err(eyre::eyre!(
365                "execution layer has no record of any finalization hashes"
366            )),
367            Err(err) => Err(eyre::Report::new(err)),
368        }
369        .wrap_err("failed reading latest finalizhed hash from execution layer")?;
370        self.execution_node
371            .provider
372            .header_by_hash_or_number(highest_finalized.into())
373            .map_err(eyre::Report::new)
374            .and_then(|h| {
375                h.ok_or_eyre(
376                    "execution layer did not have the header for the advertised finalized hash",
377                )
378            })
379            .wrap_err_with(|| format!("failed reading header for hash `{highest_finalized}`"))
380    }
381}
382
/// The peer set most recently pushed to the oracle, remembered so redundant
/// state reads and oracle updates can be skipped.
#[derive(Debug)]
struct LastTrackedPeerSet {
    /// Block height at which this peer set was read.
    height: u64,
    /// Peer public keys mapped to their network addresses.
    peers: ordered::Map<PublicKey, commonware_p2p::Address>,
}
388
389#[instrument(skip_all, fields(height), err)]
390fn read_header_at_height(execution_node: &TempoFullNode, height: u64) -> eyre::Result<TempoHeader> {
391    execution_node
392        .provider
393        .header_by_number(height)
394        .map_err(eyre::Report::new)
395        .and_then(|h| h.ok_or_eyre("execution layer did not have a header at the requested height"))
396        .wrap_err_with(|| format!("failed reading header at height `{height}`"))
397}
398
399async fn read_peer_set_if_boundary(
400    context: &impl commonware_runtime::Clock,
401    epoch_strategy: &FixedEpocher,
402    node: &TempoFullNode,
403    block: Block,
404) -> Option<ordered::Map<PublicKey, Address>> {
405    let mut attempts = 0;
406    const MIN_RETRY: Duration = Duration::from_secs(1);
407    const MAX_RETRY: Duration = Duration::from_secs(30);
408
409    if epoch_strategy
410        .containing(block.height())
411        .expect("valid for all heights")
412        .last()
413        != block.height()
414    {
415        return None;
416    }
417
418    let onchain_outcome = OnchainDkgOutcome::read(&mut block.header().extra_data().as_ref())
419        .expect("invariant: boundary blocks must contain DKG outcome");
420
421    let peers_as_per_dkg = ordered::Set::from_iter_dedup(
422        onchain_outcome
423            .players()
424            .iter()
425            .cloned()
426            .chain(onchain_outcome.next_players().iter().cloned()),
427    );
428
429    loop {
430        attempts += 1;
431        if let Ok(peers) =
432            read_active_and_known_peers_at_block_hash_v1(node, &peers_as_per_dkg, block.hash())
433        {
434            return Some(peers);
435        }
436
437        let retry_after = MIN_RETRY.saturating_mul(attempts).min(MAX_RETRY);
438        let is_syncing = node.network.is_syncing();
439        let best_finalized = node.provider.finalized_block_number().ok().flatten();
440        let blocks_behind = best_finalized
441            .as_ref()
442            .map(|best| block.height().get().saturating_sub(*best));
443        tracing::warn_span!("read_peer_set_if_boundary").in_scope(|| {
444            warn!(
445                attempts,
446                retry_after = %tempo_telemetry_util::display_duration(retry_after),
447                is_syncing,
448                best_finalized = %tempo_telemetry_util::display_option(&best_finalized),
449                target_height = %block.height(),
450                blocks_behind = %tempo_telemetry_util::display_option(&blocks_behind),
451                "reading validator config from contract failed; will retry",
452            );
453        });
454        context.sleep(retry_after).await;
455    }
456}