tempo_commonware_node/peer_manager/actor.rs

use std::{pin::Pin, sync::Arc, time::Duration};

use alloy_consensus::{BlockHeader as _, Sealable as _};
use commonware_codec::ReadExt as _;
use commonware_consensus::{
    marshal::Update,
    types::{Epocher, FixedEpocher, Height},
};
use commonware_cryptography::ed25519::PublicKey;
use commonware_p2p::{AddressableManager, Provider};
use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, spawn_cell};
use commonware_utils::{Acknowledgement, ordered};
use eyre::{OptionExt as _, WrapErr as _};
use futures::{StreamExt as _, channel::mpsc};
use prometheus_client::metrics::gauge::Gauge;
use reth_provider::{BlockIdReader as _, HeaderProvider as _};
use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
use tempo_node::TempoFullNode;
use tempo_primitives::TempoHeader;
use tracing::{Span, debug, error, info_span, instrument};

use crate::validators::read_active_and_known_peers_at_block_hash;

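/// How often to attempt a peer refresh before any peer set has been tracked
/// (see `reset_peer_update_timer`).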
const BOOTSTRAP_UPDATE_INTERVAL: Duration = Duration::from_secs(5);

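/// How often to refresh the peer set once one has been tracked.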
const HEARTBEAT_UPDATE_INTERVAL: Duration = Duration::from_secs(30);

use super::ingress::{Message, MessageWithCause};

pub(crate) struct Actor<TContext, TPeerManager>
where
    TPeerManager: AddressableManager<PublicKey = PublicKey>,
{
    context: ContextCell<TContext>,

    oracle: TPeerManager,
    execution_node: Arc<TempoFullNode>,
    epoch_strategy: FixedEpocher,
    last_finalized_height: Height,
    mailbox: mpsc::UnboundedReceiver<MessageWithCause>,

    peers: Gauge,

    last_tracked_peer_set: Option<LastTrackedPeerSet>,

    peer_update_timer: Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
}

impl<TContext, TPeerManager> Actor<TContext, TPeerManager>
where
    TContext: Clock + Metrics + Spawner,
    TPeerManager: AddressableManager<PublicKey = PublicKey>,
{
    pub(super) fn new(
        context: TContext,
        super::Config {
            oracle,
            execution_node,
            epoch_strategy,
            last_finalized_height,
        }: super::Config<TPeerManager>,
        mailbox: mpsc::UnboundedReceiver<MessageWithCause>,
    ) -> Self {
        let peers = Gauge::default();
        context.register(
            "peers",
            "how many peers are registered overall for the latest epoch",
            peers.clone(),
        );
        let context = ContextCell::new(context);
        let peer_update_timer = Box::pin(context.sleep(BOOTSTRAP_UPDATE_INTERVAL));
        Self {
            context,
            oracle,
            execution_node,
            epoch_strategy,
            last_finalized_height,
            mailbox,
            peers,
            last_tracked_peer_set: None,

            peer_update_timer,
        }
    }

    async fn run(mut self) {
        let reason = 'event_loop: loop {
            tokio::select!(
                biased;
                msg = self.mailbox.next() => {
                    match msg {
                        None => break 'event_loop eyre::eyre!("mailbox closed unexpectedly"),

                        Some(msg) => {
                            if let Err(error) = self.handle_message(msg.cause, msg.message).await {
                                break 'event_loop error;
                            }
                        }
                    }
                }
                _ = &mut self.peer_update_timer => {
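                    // Periodic refresh; errors are logged via the `err`
                    // instrumentation on `refresh_peers` and otherwise ignored.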
                    let _ = self.refresh_peers().await;
                    self.reset_peer_update_timer();
                }
            )
        };
        info_span!("peer_manager").in_scope(|| error!(%reason, "agent shutting down"));
    }

    pub(crate) fn start(mut self) -> commonware_runtime::Handle<()> {
        spawn_cell!(self.context, self.run())
    }

    #[instrument(parent = &cause, skip_all)]
    async fn handle_message(&mut self, cause: Span, message: Message) -> eyre::Result<()> {
        match message {
            Message::Track { id, peers } => {
                AddressableManager::track(&mut self.oracle, id, peers).await;
            }
            Message::Overwrite { peers } => {
                AddressableManager::overwrite(&mut self.oracle, peers).await;
            }
            Message::PeerSet { id, response } => {
                let result = Provider::peer_set(&mut self.oracle, id).await;
                let _ = response.send(result);
            }
            Message::Subscribe { response } => {
                let receiver = Provider::subscribe(&mut self.oracle).await;
                let _ = response.send(receiver);
            }
            Message::Finalized(update) => match *update {
                Update::Block(_, ack) => {
                    let _ = self.refresh_peers().await;
                    ack.acknowledge();
                    self.reset_peer_update_timer();
                }
                Update::Tip { .. } => {}
            },
        }
        Ok(())
    }

    #[instrument(skip_all, err)]
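    /// Reads the current peer set from the execution layer and registers it
    /// with the p2p oracle if it changed.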
    async fn refresh_peers(&mut self) -> eyre::Result<()> {
        let highest_finalized = self
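            // The execution layer may not report a finalized block yet (or may
            // lag behind consensus), so fall back to `last_finalized_height`
            // and never go below it.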
            .execution_node
            .provider
            .finalized_block_number()
            .wrap_err("unable to read highest finalized block from execution layer")?
            .unwrap_or(self.last_finalized_height.get())
            .max(self.last_finalized_height.get());

        if self
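            // Nothing to do if a peer set at or past this height is already
            // tracked.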
            .last_tracked_peer_set
            .as_ref()
            .is_some_and(|tracked| tracked.height >= highest_finalized)
        {
            return Ok(());
        }

        let epoch_info = self
            .epoch_strategy
            .containing(Height::new(highest_finalized))
            .expect("epoch strategy covers all heights");

        let latest_boundary = if epoch_info.last().get() == highest_finalized {
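            // The finalized block is itself an epoch boundary and thus carries
            // the freshest DKG outcome in its header.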
            highest_finalized
        } else {
            epoch_info
                .epoch()
                .previous()
                .map_or_else(Height::zero, |prev| {
                    self.epoch_strategy
                        .last(prev)
                        .expect("epoch strategy covers all epochs")
                })
                .get()
        };

        let latest_boundary_header = read_header_at_height(&self.execution_node, latest_boundary)
            .wrap_err("failed reading latest boundary header")?;
        let highest_finalized_header =
            read_header_at_height(&self.execution_node, highest_finalized)
                .wrap_err("failed reading highest finalized header")?;

        let onchain_outcome =
            OnchainDkgOutcome::read(&mut latest_boundary_header.extra_data().as_ref())
                .wrap_err_with(|| {
                    format!(
                        "boundary block at `{latest_boundary}` did not contain a valid DKG outcome"
                    )
                })?;

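        // The relevant peers are the union of the current DKG players and the
        // next players, deduplicated.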
        let peers_as_per_dkg = ordered::Set::from_iter_dedup(
            onchain_outcome
                .players()
                .iter()
                .cloned()
                .chain(onchain_outcome.next_players().iter().cloned()),
        );
        let peers = read_active_and_known_peers_at_block_hash(
            &self.execution_node,
            &peers_as_per_dkg,
            highest_finalized_header.hash_slow(),
        )
        .wrap_err("unable to read initial peer set from execution layer")?;

        self.track_or_overwrite(highest_finalized_header.number(), peers)
            .await;

        Ok(())
    }

    async fn track_or_overwrite(
        &mut self,
        height: u64,
        peers: ordered::Map<PublicKey, commonware_p2p::Address>,
    ) {
        if let Some(tracked) = &self.last_tracked_peer_set {
            if peers.keys() == tracked.peers.keys() {
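                // Same set of peers as before: only re-register if any peer's
                // address changed.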
                if peers.values() != tracked.peers.values() {
                    self.oracle.overwrite(peers.clone()).await;
                }
            } else {
                self.oracle.track(height, peers.clone()).await;
            }
        } else {
            self.oracle.track(height, peers.clone()).await;
        }

        self.last_tracked_peer_set
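            // Remember what was registered so the next refresh can diff
            // against it.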
            .replace(LastTrackedPeerSet { height, peers });

        if let Some(tracked) = &self.last_tracked_peer_set {
            self.peers.set(tracked.peers.len() as i64);
        }

        debug!(
            last_tracked_peer_set = ?self.last_tracked_peer_set.as_ref().expect("just set it"),
            "latest tracked peerset",
        );
    }

    fn reset_peer_update_timer(&mut self) {
        self.peer_update_timer = Box::pin(
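            // Poll aggressively until the first peer set has been tracked, then
            // fall back to the slower heartbeat cadence.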
            self.context.sleep(
                self.last_tracked_peer_set
                    .as_ref()
                    .map_or(BOOTSTRAP_UPDATE_INTERVAL, |_| HEARTBEAT_UPDATE_INTERVAL),
            ),
        );
    }
}

#[derive(Debug)]
struct LastTrackedPeerSet {
    height: u64,
    peers: ordered::Map<PublicKey, commonware_p2p::Address>,
}

#[instrument(skip_all, fields(height), err)]
fn read_header_at_height(execution_node: &TempoFullNode, height: u64) -> eyre::Result<TempoHeader> {
    execution_node
        .provider
        .header_by_number(height)
        .map_err(eyre::Report::new)
        .and_then(|h| h.ok_or_eyre("execution layer did not have a header at the requested height"))
        .wrap_err_with(|| format!("failed reading header at height `{height}`"))
}