tempo_commonware_node/peer_manager/actor.rs

use std::{pin::Pin, time::Duration};

use alloy_consensus::{BlockHeader as _, Sealable as _};
use commonware_codec::ReadExt as _;
use commonware_consensus::{
    Heightable as _,
    marshal::Update,
    types::{Epocher, FixedEpocher, Height},
};
use commonware_cryptography::ed25519::PublicKey;
use commonware_p2p::{Address, AddressableManager, Provider};
use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, spawn_cell};
use commonware_utils::{Acknowledgement, ordered};
use eyre::{OptionExt as _, WrapErr as _};
use futures::{StreamExt as _, channel::mpsc};
use prometheus_client::metrics::gauge::Gauge;
use reth_ethereum::{chainspec::EthChainSpec, network::NetworkInfo};
use reth_provider::{BlockIdReader as _, HeaderProvider as _};
use tempo_chainspec::hardfork::TempoHardforks as _;
use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
use tempo_node::TempoFullNode;
use tempo_primitives::TempoHeader;
use tracing::{Span, debug, error, info_span, instrument, warn};

use crate::{
    consensus::block::Block,
    validators::{
        read_active_and_known_peers_at_block_hash, read_active_and_known_peers_at_block_hash_v1,
    },
};

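/// How often to retry a peer set refresh before any peer set has been tracked.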
const BOOTSTRAP_UPDATE_INTERVAL: Duration = Duration::from_secs(5);

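/// How often to refresh the peer set once one is being tracked.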
const HEARTBEAT_UPDATE_INTERVAL: Duration = Duration::from_secs(30);

use super::ingress::{Message, MessageWithCause};

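/// Keeps the p2p oracle's peer set in sync with the validator set derived
/// from on-chain DKG outcomes, updating on finalized blocks and on a
/// periodic timer.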
pub(crate) struct Actor<TContext, TPeerManager>
where
    TPeerManager: AddressableManager<PublicKey = PublicKey>,
{
    context: ContextCell<TContext>,

    oracle: TPeerManager,
    execution_node: TempoFullNode,
    executor: crate::executor::Mailbox,
    epoch_strategy: FixedEpocher,
    last_finalized_height: Height,
    mailbox: mpsc::UnboundedReceiver<MessageWithCause>,

    /// Gauge reporting how many peers are registered for the latest epoch.
    peers: Gauge,

    /// The peer set most recently forwarded to the oracle, if any.
    last_tracked_peer_set: Option<LastTrackedPeerSet>,

    /// Timer driving periodic peer set refreshes.
    peer_update_timer: Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
}

impl<TContext, TPeerManager> Actor<TContext, TPeerManager>
where
    TContext: Clock + Metrics + Spawner,
    TPeerManager: AddressableManager<PublicKey = PublicKey>,
{
    pub(super) fn new(
        context: TContext,
        super::Config {
            oracle,
            execution_node,
            executor,
            epoch_strategy,
            last_finalized_height,
        }: super::Config<TPeerManager>,
        mailbox: mpsc::UnboundedReceiver<MessageWithCause>,
    ) -> Self {
        let peers = Gauge::default();
        context.register(
            "peers",
            "how many peers are registered overall for the latest epoch",
            peers.clone(),
        );
        let context = ContextCell::new(context);
        let peer_update_timer = Box::pin(context.sleep(BOOTSTRAP_UPDATE_INTERVAL));
        Self {
            context,
            oracle,
            execution_node,
            executor,
            epoch_strategy,
            last_finalized_height,
            mailbox,
            peers,
            last_tracked_peer_set: None,
            peer_update_timer,
        }
    }

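    /// The actor's event loop: handles mailbox messages and refreshes the
    /// peer set whenever the update timer fires. Exits (and logs the reason)
    /// when the mailbox closes or a message handler fails.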
    async fn run(mut self) {
        let reason = 'event_loop: loop {
            tokio::select!(
                biased;

                msg = self.mailbox.next() => {
                    match msg {
                        None => break 'event_loop eyre::eyre!("mailbox closed unexpectedly"),

                        Some(msg) => {
                            if let Err(error) = self.handle_message(msg.cause, msg.message).await {
                                break 'event_loop error;
                            }
                        }
                    }
                }

                _ = &mut self.peer_update_timer => {
                    let _ = self.update_peer_set(None).await;
                    self.reset_peer_update_timer();
                }
            )
        };
        info_span!("peer_manager").in_scope(|| error!(%reason, "actor shutting down"));
    }
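
    /// Spawns the actor onto the runtime and returns a handle to its task.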
    pub(crate) fn start(mut self) -> commonware_runtime::Handle<()> {
        spawn_cell!(self.context, self.run().await)
    }

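    /// Dispatches a single mailbox message: peer tracking and subscription
    /// requests are forwarded to the oracle, and finalized-block updates
    /// trigger a peer set update.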
    #[instrument(parent = &cause, skip_all)]
    async fn handle_message(&mut self, cause: Span, message: Message) -> eyre::Result<()> {
        match message {
            Message::Track { id, peers } => {
                AddressableManager::track(&mut self.oracle, id, peers).await;
            }
            Message::Overwrite { peers } => {
                AddressableManager::overwrite(&mut self.oracle, peers).await;
            }
            Message::PeerSet { id, response } => {
                let result = Provider::peer_set(&mut self.oracle, id).await;
                let _ = response.send(result);
            }
            Message::Subscribe { response } => {
                let receiver = Provider::subscribe(&mut self.oracle).await;
                let _ = response.send(receiver);
            }
            Message::Finalized(update) => match *update {
                Update::Block(block, ack) => {
                    let _ = self.update_peer_set(Some(block)).await;
                    ack.acknowledge();
                    self.reset_peer_update_timer();
                }
                Update::Tip { .. } => {}
            },
        }
        Ok(())
    }

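    /// Updates the peer set, either in response to a newly finalized block or
    /// on a timer tick (`block` is `None`). Performs a full refresh when the
    /// T2 hardfork is active at the reference timestamp or when no peer set
    /// has been tracked yet; otherwise only epoch-boundary blocks yield a
    /// new peer set.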
    #[instrument(
        skip_all,
        fields(
            block.height = block.as_ref().map(|b| tracing::field::display(b.height())),
        ),
        err,
    )]
    async fn update_peer_set(&mut self, block: Option<Block>) -> eyre::Result<()> {
        if let Some(block) = &block
            && let Err(reason) = self.executor.subscribe_finalized(block.height()).await
        {
            warn!(
                %reason,
                "unable to confirm whether the finalized block was already \
                forwarded to the execution layer; will try to read the \
                validator config contract, but it will likely fail",
            );
        }

        // Pick the newest timestamp available to decide which hardfork rules
        // apply.
        let maybe_latest_finalized_header = self.read_highest_finalized_header();
        let reference_timestamp = match &block {
            Some(block) => maybe_latest_finalized_header.ok().map_or_else(
                || block.timestamp(),
                |header| header.timestamp().max(block.timestamp()),
            ),
            None => maybe_latest_finalized_header
                .wrap_err("could not determine a reference timestamp for the peer set update")?
                .timestamp(),
        };

        if self
            .execution_node
            .chain_spec()
            .is_t2_active_at_timestamp(reference_timestamp)
            || self.last_tracked_peer_set.is_none()
        {
            self.refresh_peers()
                .await
                .wrap_err("failed refreshing peer set")?;
        } else if let Some(block) = block {
            let height = block.number();
            if let Some(peers) = read_peer_set_if_boundary(
                &self.context,
                &self.epoch_strategy,
                &self.execution_node,
                block,
            )
            .await
            {
                self.track_or_overwrite(height, peers).await;
            }
        }
        Ok(())
    }

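    /// Fully rebuilds the peer set: locates the most recent epoch-boundary
    /// header, decodes the DKG outcome from its extra-data, and resolves the
    /// players' addresses against the validator config at the highest
    /// finalized block.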
    #[instrument(skip_all, err)]
    async fn refresh_peers(&mut self) -> eyre::Result<()> {
        // Prefer the execution layer's view of finalization, but never go
        // below the height consensus has already reported as finalized.
        let highest_finalized = self
            .execution_node
            .provider
            .finalized_block_number()
            .wrap_err("unable to read highest finalized block from execution layer")?
            .unwrap_or(self.last_finalized_height.get())
            .max(self.last_finalized_height.get());

        if self
            .last_tracked_peer_set
            .as_ref()
            .is_some_and(|tracked| tracked.height >= highest_finalized)
        {
            return Ok(());
        }

        let epoch_info = self
            .epoch_strategy
            .containing(Height::new(highest_finalized))
            .expect("epoch strategy covers all heights");

        // If the highest finalized block is itself an epoch boundary, use it;
        // otherwise fall back to the boundary of the previous epoch (or the
        // genesis height if there is no previous epoch).
        let latest_boundary = if epoch_info.last().get() == highest_finalized {
            highest_finalized
        } else {
            epoch_info
                .epoch()
                .previous()
                .map_or_else(Height::zero, |prev| {
                    self.epoch_strategy
                        .last(prev)
                        .expect("epoch strategy covers all epochs")
                })
                .get()
        };

        let latest_boundary_header = read_header_at_height(&self.execution_node, latest_boundary)
            .wrap_err("failed reading latest boundary header")?;
        let highest_finalized_header =
            read_header_at_height(&self.execution_node, highest_finalized)
                .wrap_err("failed reading highest finalized header")?;

        let onchain_outcome =
            OnchainDkgOutcome::read(&mut latest_boundary_header.extra_data().as_ref())
                .wrap_err_with(|| {
                    format!(
                        "boundary block at `{latest_boundary}` did not contain a valid DKG outcome"
                    )
                })?;

        // The peer set is the union of the current and next DKG player sets.
        let peers_as_per_dkg = ordered::Set::from_iter_dedup(
            onchain_outcome
                .players()
                .iter()
                .cloned()
                .chain(onchain_outcome.next_players().iter().cloned()),
        );
        let peers = read_active_and_known_peers_at_block_hash(
            &self.execution_node,
            &peers_as_per_dkg,
            highest_finalized_header.hash_slow(),
        )
        .wrap_err("unable to read initial peer set from execution layer")?;

        self.track_or_overwrite(highest_finalized_header.number(), peers)
            .await;

        Ok(())
    }

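    /// Forwards `peers` to the oracle: a changed validator set is registered
    /// via `track`, while changed addresses for an unchanged validator set
    /// are applied via `overwrite`. Also updates the peer gauge and the
    /// cached peer set.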
    async fn track_or_overwrite(
        &mut self,
        height: u64,
        peers: ordered::Map<PublicKey, Address>,
    ) {
        if let Some(tracked) = &self.last_tracked_peer_set {
            if peers.keys() == tracked.peers.keys() {
                // Same validators, possibly new addresses: overwrite in place.
                if peers.values() != tracked.peers.values() {
                    self.oracle.overwrite(peers.clone()).await;
                }
            } else {
                // The validator set itself changed: track the new set.
                self.oracle.track(height, peers.clone()).await;
            }
        } else {
            self.oracle.track(height, peers.clone()).await;
        }

        let tracked = self
            .last_tracked_peer_set
            .insert(LastTrackedPeerSet { height, peers });
        self.peers.set(tracked.peers.len() as i64);

        debug!(
            last_tracked_peer_set = ?tracked,
            "latest tracked peer set",
        );
    }

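    /// Re-arms the update timer: a short bootstrap interval until the first
    /// peer set has been tracked, a longer heartbeat interval afterwards.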
    fn reset_peer_update_timer(&mut self) {
        self.peer_update_timer = Box::pin(
            self.context.sleep(
                self.last_tracked_peer_set
                    .as_ref()
                    .map_or(BOOTSTRAP_UPDATE_INTERVAL, |_| HEARTBEAT_UPDATE_INTERVAL),
            ),
        );
    }
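
    /// Reads the header of the highest finalized block known to the execution
    /// layer, falling back to the genesis hash when nothing has been
    /// finalized yet and consensus is still at height zero.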
    #[instrument(skip_all, fields(height), err)]
    fn read_highest_finalized_header(&self) -> eyre::Result<TempoHeader> {
        let highest_finalized = match self.execution_node.provider.finalized_block_hash() {
            Ok(Some(highest_finalized)) => Ok(highest_finalized),
            Ok(None) if self.last_finalized_height == Height::zero() => {
                Ok(self.execution_node.chain_spec().genesis_hash())
            }
            Ok(None) => Err(eyre::eyre!(
                "execution layer has no record of any finalization hashes"
            )),
            Err(err) => Err(eyre::Report::new(err)),
        }
        .wrap_err("failed reading latest finalized hash from execution layer")?;
        self.execution_node
            .provider
            .header_by_hash_or_number(highest_finalized.into())
            .map_err(eyre::Report::new)
            .and_then(|h| {
                h.ok_or_eyre(
                    "execution layer did not have the header for the advertised finalized hash",
                )
            })
            .wrap_err_with(|| format!("failed reading header for hash `{highest_finalized}`"))
    }
}

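/// The peer set most recently forwarded to the oracle, together with the
/// height it was derived at.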
#[derive(Debug)]
struct LastTrackedPeerSet {
    height: u64,
    peers: ordered::Map<PublicKey, Address>,
}

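/// Reads the header at `height` from the execution layer.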
#[instrument(skip_all, fields(height), err)]
fn read_header_at_height(execution_node: &TempoFullNode, height: u64) -> eyre::Result<TempoHeader> {
    execution_node
        .provider
        .header_by_number(height)
        .map_err(eyre::Report::new)
        .and_then(|h| h.ok_or_eyre("execution layer did not have a header at the requested height"))
        .wrap_err_with(|| format!("failed reading header at height `{height}`"))
}

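/// Returns the peer set derived from `block` if it is an epoch-boundary
/// block, and `None` otherwise. Reads of the validator config are retried
/// indefinitely with linearly growing backoff (capped at `MAX_RETRY`), since
/// the execution layer may not have caught up to the block yet.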
async fn read_peer_set_if_boundary(
    context: &impl commonware_runtime::Clock,
    epoch_strategy: &FixedEpocher,
    node: &TempoFullNode,
    block: Block,
) -> Option<ordered::Map<PublicKey, Address>> {
    let mut attempts = 0;
    const MIN_RETRY: Duration = Duration::from_secs(1);
    const MAX_RETRY: Duration = Duration::from_secs(30);

    // Only epoch-boundary blocks carry a DKG outcome.
    if epoch_strategy
        .containing(block.height())
        .expect("valid for all heights")
        .last()
        != block.height()
    {
        return None;
    }

    let onchain_outcome = OnchainDkgOutcome::read(&mut block.header().extra_data().as_ref())
        .expect("invariant: boundary blocks must contain DKG outcome");

    // The peer set is the union of the current and next DKG player sets.
    let peers_as_per_dkg = ordered::Set::from_iter_dedup(
        onchain_outcome
            .players()
            .iter()
            .cloned()
            .chain(onchain_outcome.next_players().iter().cloned()),
    );

    loop {
        attempts += 1;
        if let Ok(peers) =
            read_active_and_known_peers_at_block_hash_v1(node, &peers_as_per_dkg, block.hash())
        {
            return Some(peers);
        }

        // Linear backoff, capped at MAX_RETRY.
        let retry_after = MIN_RETRY.saturating_mul(attempts).min(MAX_RETRY);
        let is_syncing = node.network.is_syncing();
        let best_finalized = node.provider.finalized_block_number().ok().flatten();
        let blocks_behind = best_finalized
            .as_ref()
            .map(|best| block.height().get().saturating_sub(*best));
        tracing::warn_span!("read_peer_set_if_boundary").in_scope(|| {
            warn!(
                attempts,
                retry_after = %tempo_telemetry_util::display_duration(retry_after),
                is_syncing,
                best_finalized = %tempo_telemetry_util::display_option(&best_finalized),
                target_height = %block.height(),
                blocks_behind = %tempo_telemetry_util::display_option(&blocks_behind),
                "reading validator config from contract failed; will retry",
            );
        });
        context.sleep(retry_after).await;
    }
}
456}