1use std::{
2 collections::{HashMap, hash_map::Entry},
3 pin::Pin,
4 sync::Arc,
5 time::Duration,
6};
7
8use alloy_consensus::{BlockHeader as _, Sealable as _};
9use alloy_primitives::B256;
10use commonware_codec::ReadExt as _;
11use commonware_consensus::{
12 marshal::Update,
13 types::{Epocher, FixedEpocher, Height},
14};
15use commonware_cryptography::ed25519::PublicKey;
16use commonware_p2p::{Address, AddressableManager, AddressableTrackedPeers, Provider};
17use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, spawn_cell};
18use commonware_utils::{Acknowledgement, ordered};
19use eyre::{OptionExt as _, WrapErr as _};
20use futures::{StreamExt as _, channel::mpsc};
21use prometheus_client::metrics::gauge::Gauge;
22use reth_provider::{BlockIdReader as _, HeaderProvider as _};
23use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
24use tempo_node::TempoFullNode;
25use tempo_precompiles::validator_config_v2::ValidatorConfigV2;
26use tempo_primitives::TempoHeader;
27use tracing::{Span, debug, error, info_span, instrument, warn};
28
29use crate::{
30 utils::public_key_to_b256,
31 validators::{DecodedValidatorV2, ExecutionNode, read_validator_config_at_block_hash},
32};
33
/// Delay before the next timer-driven peer refresh while no peer set has been
/// tracked yet.
///
/// Used for the initial timer armed in `Actor::new` and whenever the timer is
/// reset while `last_tracked_peer_set` is still `None`.
const BOOTSTRAP_UPDATE_INTERVAL: Duration = Duration::from_secs(5);

/// Delay before the next timer-driven peer refresh once a peer set has been
/// tracked.
const HEARTBEAT_UPDATE_INTERVAL: Duration = Duration::from_secs(30);
41
42use super::ingress::{Message, MessageWithCause};
43
/// Actor that keeps the p2p peer manager in sync with the validator set.
///
/// It forwards peer-manager requests received on its mailbox to `oracle`, and
/// re-reads the peer set from the execution layer on finalized blocks and
/// whenever `peer_update_timer` fires.
pub(crate) struct Actor<TContext, TPeerManager>
where
    TPeerManager: AddressableManager<PublicKey = PublicKey>,
{
    /// Runtime context; used to create sleep timers and to spawn the actor.
    context: ContextCell<TContext>,

    /// Peer manager that all track/overwrite/query requests are forwarded to.
    oracle: TPeerManager,
    /// Execution-layer node used to read the finalized height, block headers
    /// and the validator config contract.
    execution_node: Arc<TempoFullNode>,
    /// Maps heights to epochs so the latest boundary block can be located.
    epoch_strategy: FixedEpocher,
    /// Finalized height supplied at construction; acts as a lower bound for
    /// the height at which peers are read.
    last_finalized_height: Height,
    /// Incoming requests, each paired with the span that caused it.
    mailbox: mpsc::UnboundedReceiver<MessageWithCause>,

    /// Gauge registered as `peers`; set to the size of the last tracked peer
    /// set.
    peers: Gauge,

    /// Height and peers most recently handed to the peer manager, if any.
    last_tracked_peer_set: Option<LastTrackedPeerSet>,

    /// Timer whose completion triggers a peer refresh in the event loop.
    peer_update_timer: Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
}
62
63impl<TContext, TPeerManager> Actor<TContext, TPeerManager>
64where
65 TContext: Clock + Metrics + Spawner,
66 TPeerManager: AddressableManager<PublicKey = PublicKey>,
67{
68 pub(super) fn new(
69 context: TContext,
70 super::Config {
71 oracle,
72 execution_node,
73 epoch_strategy,
74 last_finalized_height,
75 }: super::Config<TPeerManager>,
76 mailbox: mpsc::UnboundedReceiver<MessageWithCause>,
77 ) -> Self {
78 let peers = Gauge::default();
79 context.register(
80 "peers",
81 "how many peers are registered overall for the latest epoch",
82 peers.clone(),
83 );
84 let context = ContextCell::new(context);
85 let peer_update_timer = Box::pin(context.sleep(BOOTSTRAP_UPDATE_INTERVAL));
86 Self {
87 context,
88 oracle,
89 execution_node,
90 epoch_strategy,
91 last_finalized_height,
92 mailbox,
93 peers,
94 last_tracked_peer_set: None,
95
96 peer_update_timer,
97 }
98 }
99
100 async fn run(mut self) {
101 let reason = 'event_loop: loop {
102 tokio::select!(
103 biased;
104 msg = self.mailbox.next() => {
105 match msg {
106 None => break 'event_loop eyre::eyre!("mailbox closed unexpectedly"),
107
108 Some(msg) => {
109 if let Err(error) = self.handle_message(msg.cause, msg.message).await {
110 break 'event_loop error;
111 }
112 }
113 }
114 }
115 _ = &mut self.peer_update_timer => {
118 let _ = self.refresh_peers().await;
119 self.reset_peer_update_timer();
120 }
121 )
122 };
123 info_span!("peer_manager").in_scope(|| error!(%reason,"agent shutting down"));
124 }
    /// Consumes the actor and spawns its event loop via the runtime context,
    /// returning the handle produced by the spawn.
    pub(crate) fn start(mut self) -> commonware_runtime::Handle<()> {
        spawn_cell!(self.context, self.run())
    }
128
129 #[instrument(parent = &cause, skip_all)]
130 async fn handle_message(&mut self, cause: Span, message: Message) -> eyre::Result<()> {
131 match message {
132 Message::Track { id, peers } => {
133 AddressableManager::track(&mut self.oracle, id, peers).await;
134 }
135 Message::Overwrite { peers } => {
136 AddressableManager::overwrite(&mut self.oracle, peers).await;
137 }
138 Message::PeerSet { id, response } => {
139 let result = Provider::peer_set(&mut self.oracle, id).await;
140 let _ = response.send(result);
141 }
142 Message::Subscribe { response } => {
143 let receiver = Provider::subscribe(&mut self.oracle).await;
144 let _ = response.send(receiver);
145 }
146 Message::Finalized(update) => match *update {
147 Update::Block(_, ack) => {
148 let _ = self.refresh_peers().await;
149 ack.acknowledge();
150 self.reset_peer_update_timer();
151 }
152 Update::Tip { .. } => {}
153 },
154 }
155 Ok(())
156 }
157
158 #[instrument(skip_all, err)]
161 async fn refresh_peers(&mut self) -> eyre::Result<()> {
162 let highest_finalized = self
175 .execution_node
176 .provider
177 .finalized_block_number()
178 .wrap_err("unable to read highest finalized block from execution layer")?
179 .unwrap_or(self.last_finalized_height.get())
180 .max(self.last_finalized_height.get());
181
182 if self
184 .last_tracked_peer_set
185 .as_ref()
186 .is_some_and(|tracked| tracked.height >= highest_finalized)
187 {
188 return Ok(());
189 }
190
191 let epoch_info = self
192 .epoch_strategy
193 .containing(Height::new(highest_finalized))
194 .expect("epoch strategy covers all heights");
195
196 let latest_boundary = if epoch_info.last().get() == highest_finalized {
201 highest_finalized
202 } else {
203 epoch_info
204 .epoch()
205 .previous()
206 .map_or_else(Height::zero, |prev| {
207 self.epoch_strategy
208 .last(prev)
209 .expect("epoch strategy covers all epochs")
210 })
211 .get()
212 };
213
214 let latest_boundary_header = read_header_at_height(&self.execution_node, latest_boundary)
215 .wrap_err("failed reading latest boundary header")?;
216 let highest_finalized_header =
217 read_header_at_height(&self.execution_node, highest_finalized)
218 .wrap_err("failed reading highest finalized header")?;
219
220 let onchain_outcome =
221 OnchainDkgOutcome::read(&mut latest_boundary_header.extra_data().as_ref())
222 .wrap_err_with(|| {
223 format!(
224 "boundary block at `{latest_boundary}` did not contain a valid DKG outcome"
225 )
226 })?;
227
228 let peers = PeersBuilder::with_dkg_outcome(&onchain_outcome)
229 .resolve_at_hash(
230 self.execution_node.as_ref(),
231 highest_finalized_header.hash_slow(),
232 )
233 .wrap_err("failed reading peer set from execution layer")?;
234
235 debug!(
236 boundary.height = latest_boundary_header.number(),
237 boundary.hash = %latest_boundary_header.hash_slow(),
238 highest_finalized.height = highest_finalized_header.number(),
239 highest_finalized.hash = %highest_finalized_header.hash_slow(),
240 ?peers.primary,
241 ?peers.secondary,
242 "read active peers from DKG outcome in latest available \
243 boundary header and resolved p2p addresses against validator \
244 config contract"
245 );
246
247 self.track_or_overwrite(highest_finalized_header.number(), peers)
248 .await;
249
250 Ok(())
251 }
252
253 async fn track_or_overwrite(&mut self, height: u64, peers: Peers) {
254 if let Some(tracked) = &self.last_tracked_peer_set {
255 match peers.what_has_changed_compared_to(&tracked.peers) {
256 WhatHasChanged::Nothing => {}
257 WhatHasChanged::Addresses => self.oracle.overwrite(peers.to_flat_map()).await,
258 WhatHasChanged::Peers => self.oracle.track(height, peers.clone()).await,
259 }
260 } else {
261 self.oracle.track(height, peers.clone()).await;
262 }
263
264 self.last_tracked_peer_set
268 .replace(LastTrackedPeerSet { height, peers });
269
270 if let Some(tracked) = &self.last_tracked_peer_set {
271 self.peers.set(tracked.peers.len() as i64);
272 }
273
274 debug!(
275 last_tracked_peer_set = ?self.last_tracked_peer_set.as_ref().expect("just set it"),
276 "latest tracked peerset",
277 );
278 }
279
280 fn reset_peer_update_timer(&mut self) {
281 self.peer_update_timer = Box::pin(
284 self.context.sleep(
285 self.last_tracked_peer_set
286 .as_ref()
287 .map_or(BOOTSTRAP_UPDATE_INTERVAL, |_| HEARTBEAT_UPDATE_INTERVAL),
288 ),
289 );
290 }
291}
292
/// Result of comparing two [`Peers`] values.
enum WhatHasChanged {
    /// Keys and addresses are identical in both groups.
    Nothing,
    /// The keys are identical in both groups, but at least one address differs.
    Addresses,
    /// The keys of the primary or secondary group differ.
    Peers,
}
298
/// Peers with resolved p2p addresses, split into a primary and a secondary
/// group.
#[derive(Clone, Debug)]
struct Peers {
    /// Players of the DKG outcome the set was built from.
    primary: ordered::Map<PublicKey, Address>,
    /// Next players that are not primary, plus active validators from the
    /// contract that are not primary.
    secondary: ordered::Map<PublicKey, Address>,
}
304
305impl Peers {
306 fn what_has_changed_compared_to(&self, old: &Self) -> WhatHasChanged {
307 if old.primary.keys() == self.primary.keys()
308 && old.secondary.keys() == self.secondary.keys()
309 {
310 if old.primary.values() == self.primary.values()
311 && old.secondary.values() == self.secondary.values()
312 {
313 WhatHasChanged::Nothing
314 } else {
315 WhatHasChanged::Addresses
316 }
317 } else {
318 WhatHasChanged::Peers
319 }
320 }
321
322 fn len(&self) -> usize {
323 self.primary.len().saturating_add(self.secondary.len())
324 }
325
326 fn to_flat_map(&self) -> ordered::Map<PublicKey, Address> {
327 ordered::Map::from_iter_dedup(
328 self.primary
329 .iter_pairs()
330 .chain(self.secondary.iter_pairs())
331 .map(|(key, val)| (key.clone(), val.clone())),
332 )
333 }
334}
335
336impl From<Peers> for AddressableTrackedPeers<PublicKey> {
337 fn from(value: Peers) -> Self {
338 Self {
339 primary: value.primary,
340 secondary: value.secondary,
341 }
342 }
343}
344
/// Peer keys taken from a DKG outcome whose p2p addresses are not resolved
/// yet.
struct PeersBuilder {
    /// Players of the DKG outcome.
    primary: ordered::Set<PublicKey>,
    /// Next players of the DKG outcome that are not already primary.
    secondary: ordered::Set<PublicKey>,
}
349
350impl PeersBuilder {
351 fn with_dkg_outcome(outcome: &OnchainDkgOutcome) -> Self {
352 let primary = outcome.players().clone();
353 let secondary = ordered::Set::from_iter_dedup(
354 outcome
355 .next_players()
356 .iter()
357 .filter(|key| primary.position(key).is_none())
360 .cloned(),
361 );
362 Self { primary, secondary }
363 }
364
    /// Resolves p2p addresses for the builder's keys against the validator
    /// config contract as read at block `hash`.
    ///
    /// Primary peers take their address from the contract's active validators
    /// when present there, otherwise from a lookup by public key. The
    /// secondary group ends up holding every remaining active validator plus
    /// every secondary key, with non-active secondary keys also looked up by
    /// public key.
    ///
    /// # Errors
    ///
    /// Fails if the validator config cannot be read at `hash` or if the list
    /// of active validators cannot be read from the contract.
    ///
    /// # Panics
    ///
    /// Panics if a primary or secondary key that is not among the active
    /// validators has no well-formed entry in the contract.
    #[instrument(skip_all, fields(%hash))]
    fn resolve_at_hash(self, node: impl ExecutionNode, hash: B256) -> eyre::Result<Peers> {
        let Self { primary, secondary } = self;
        let (_, _, (primary, secondary)) = read_validator_config_at_block_hash(
            node,
            hash,
            |config: &ValidatorConfigV2| {
                // Index the decodable active validators by public key. Entries
                // that fail to decode are logged and skipped; on duplicate keys
                // the later entry replaces the earlier one.
                let mut active_validators = HashMap::new();
                for (i, raw) in config
                    .get_active_validators()
                    .wrap_err("failed reading active validator set from contract")?
                    .into_iter()
                    .enumerate()
                {
                    if let Ok(decoded) =
                        DecodedValidatorV2::decode_from_contract(raw).inspect_err(|error| {
                            warn!(
                                %error,
                                position = i,
                                "failed decoding active validator in contract",
                            )
                        })
                        && active_validators
                            .insert(decoded.public_key().clone(), decoded.to_p2p_address())
                            .is_some()
                    {
                        warn!(
                            duplicate = %decoded.public_key(),
                            "found duplicate public keys",
                        );
                    }
                }
                debug!(
                    ?active_validators,
                    "read active validators from contract, now extending with \
                     historic peers that are still in the peer set but no \
                     longer marked active",
                );
                // Primary peers are removed from `active_validators` as they
                // are resolved, so whatever remains afterwards is non-primary.
                let primary = ordered::Map::from_iter_dedup(primary.into_iter().map(|peer| {
                    active_validators.remove_entry(&peer).unwrap_or_else(|| {
                        let decoded = config
                            .validator_by_public_key(public_key_to_b256(&peer))
                            .map_err(eyre::Report::new)
                            .and_then(DecodedValidatorV2::decode_from_contract)
                            .wrap_err_with(|| {
                                format!(
                                    "failed to read DKG peer `{peer}` from validator config contract"
                                )
                            })
                            .expect(
                                "invariant: DKG peers must have an entry in the \
                                 smart contract and be well formed",
                            );
                        (decoded.public_key().clone(), decoded.to_p2p_address())
                    })
                }));

                // Secondary keys that are not active validators are looked up
                // individually and added next to the remaining active ones.
                for peer in secondary {
                    if let Entry::Vacant(slot) = active_validators.entry(peer.clone()) {
                        let decoded = config
                            .validator_by_public_key(public_key_to_b256(&peer))
                            .map_err(eyre::Report::new)
                            .and_then(DecodedValidatorV2::decode_from_contract)
                            .wrap_err_with(|| {
                                format!(
                                    "failed to read next DKG peer `{peer}` from validator config contract"
                                )
                            })
                            .expect(
                                "invariant: next DKG peers must have an entry in the \
                                 smart contract and be well formed",
                            );
                        slot.insert_entry(decoded.to_p2p_address());
                    }
                }

                // Remaining active validators and the secondary keys together
                // form the secondary group.
                let secondary = ordered::Map::from_iter_dedup(active_validators.into_iter());

                Ok((primary, secondary))
            },
        )?;
        Ok(Peers { primary, secondary })
    }
448}
449
/// The peers most recently passed to `track_or_overwrite`, together with the
/// height they were read at.
#[derive(Debug)]
struct LastTrackedPeerSet {
    /// Number of the finalized block the peers were resolved at.
    height: u64,
    /// The resolved peers.
    peers: Peers,
}
455
456#[instrument(skip_all, fields(height), err)]
457fn read_header_at_height(execution_node: &TempoFullNode, height: u64) -> eyre::Result<TempoHeader> {
458 execution_node
459 .provider
460 .header_by_number(height)
461 .map_err(eyre::Report::new)
462 .and_then(|h| h.ok_or_eyre("execution layer did not have a header at the requested height"))
463 .wrap_err_with(|| format!("failed reading header at height `{height}`"))
464}
465
466#[cfg(test)]
467mod tests {
468 use std::{
469 collections::HashMap,
470 net::{IpAddr, Ipv4Addr, SocketAddr},
471 };
472
473 use alloy_consensus::Header;
474 use alloy_primitives::{Address as AlloyAddress, B256, Keccak256, U256};
475 use commonware_codec::Encode as _;
476 use commonware_consensus::types::Epoch;
477 use commonware_cryptography::{
478 Signer as _,
479 bls12381::{
480 dkg,
481 primitives::{sharing::Mode, variant::MinSig},
482 },
483 ed25519::PrivateKey,
484 };
485 use commonware_utils::{N3f1, TryFromIterator as _};
486 use rand_08::SeedableRng as _;
487 use reth_ethereum::evm::revm::{State, database::StateProviderDatabase};
488 use reth_node_builder::ConfigureEvm as _;
489 use reth_provider::{
490 StateProviderBox,
491 test_utils::{ExtendedAccount, MockEthProvider},
492 };
493 use tempo_node::evm::{TempoEvmConfig, evm::TempoEvm};
494 use tempo_precompiles::{
495 storage::{StorageCtx, hashmap::HashMapStorageProvider},
496 validator_config_v2::{IValidatorConfigV2, VALIDATOR_NS_ADD},
497 };
498
499 use super::*;
500
    /// Address mixed into the message that `add_validator_call` signs.
    // NOTE(review): presumably the on-chain address of the validator config
    // V2 precompile; confirm against the precompile definition.
    const VALIDATOR_CONFIG_V2_ADDRESS: AlloyAddress =
        alloy_primitives::address!("0xCCCCCCCC00000000000000000000000000000001");
503
    /// Minimal [`ExecutionNode`] stand-in that serves exactly one block.
    struct TestExecutionNode {
        /// The only block hash this node accepts; any other hash fails an
        /// assertion.
        hash: B256,
        /// Block number reported in the header returned for `hash`.
        height: u64,
        /// Mock state provider handed out by `state_by_block_hash`.
        provider: MockEthProvider,
    }
509
510 impl ExecutionNode for TestExecutionNode {
511 fn header(&self, block_hash: B256) -> eyre::Result<TempoHeader> {
512 assert_eq!(block_hash, self.hash);
513 Ok(TempoHeader {
514 general_gas_limit: 30_000_000,
515 inner: Header {
516 number: self.height,
517 timestamp: 1,
518 gas_limit: 30_000_000,
519 base_fee_per_gas: Some(1),
520 ..Default::default()
521 },
522 ..Default::default()
523 })
524 }
525
526 fn state_by_block_hash(&self, block_hash: B256) -> eyre::Result<StateProviderBox> {
527 assert_eq!(block_hash, self.hash);
528 Ok(Box::new(self.provider.clone()))
529 }
530
531 fn evm_for_block(
532 &self,
533 db: State<StateProviderDatabase<StateProviderBox>>,
534 header: &TempoHeader,
535 ) -> eyre::Result<TempoEvm<State<StateProviderDatabase<StateProviderBox>>>> {
536 TempoEvmConfig::moderato()
537 .evm_for_block(db, header)
538 .map_err(eyre::Report::new)
539 }
540 }
541
    /// Test validator with its keys, contract inputs and expected p2p address.
    struct ValidatorFixture {
        /// Key used to sign the add-validator message.
        private_key: PrivateKey,
        /// Public key derived from `private_key`.
        public_key: PublicKey,
        /// Address used as both validator address and fee recipient.
        validator_address: AlloyAddress,
        /// Ingress socket address in string form, as passed to the contract.
        ingress: String,
        /// Egress IP address in string form, as passed to the contract.
        egress: String,
        /// Address the tests expect the resolver to produce for this validator.
        p2p_address: Address,
    }
550
551 fn peer(seed: u8) -> ValidatorFixture {
552 let private_key = PrivateKey::from_seed(u64::from(seed));
553 let public_key = private_key.public_key();
554 let validator_address = AlloyAddress::from([seed; 20]);
555 let egress_ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, seed));
556 let ingress_socket = SocketAddr::new(egress_ip, 8000 + u16::from(seed));
557 let p2p_address = Address::Asymmetric {
558 ingress: commonware_p2p::Ingress::Socket(ingress_socket),
559 egress: SocketAddr::new(egress_ip, 0),
560 };
561
562 ValidatorFixture {
563 private_key,
564 public_key,
565 validator_address,
566 ingress: ingress_socket.to_string(),
567 egress: egress_ip.to_string(),
568 p2p_address,
569 }
570 }
571
    impl ValidatorFixture {
        /// Builds the `addValidator` call for this fixture, including a
        /// signature made with the fixture's private key.
        ///
        /// The signed message is the Keccak-256 digest of, in order: the
        /// big-endian `1u64`, the validator config address, the validator
        /// address, the length-prefixed ingress string, the length-prefixed
        /// egress string, and the validator address again.
        // NOTE(review): presumably this mirrors the message layout the
        // contract verifies (chain id 1, fee recipient last); confirm against
        // the precompile.
        fn add_validator_call(&self) -> IValidatorConfigV2::addValidatorCall {
            let mut hasher = Keccak256::new();
            hasher.update(1u64.to_be_bytes());
            hasher.update(VALIDATOR_CONFIG_V2_ADDRESS.as_slice());
            hasher.update(self.validator_address.as_slice());
            // Lengths are written as a single byte; the fixture strings are short.
            hasher.update([self.ingress.len() as u8]);
            hasher.update(self.ingress.as_bytes());
            hasher.update([self.egress.len() as u8]);
            hasher.update(self.egress.as_bytes());
            // Second occurrence matches the `feeRecipient` set below.
            hasher.update(self.validator_address.as_slice());
            let message = hasher.finalize();
            let signature = self
                .private_key
                .sign(VALIDATOR_NS_ADD, message.as_slice())
                .encode()
                .to_vec();

            IValidatorConfigV2::addValidatorCall {
                validatorAddress: self.validator_address,
                publicKey: public_key_to_b256(&self.public_key),
                ingress: self.ingress.clone(),
                egress: self.egress.clone(),
                feeRecipient: self.validator_address,
                signature: signature.into(),
            }
        }
    }
600
601 fn execution_with_validators(
602 validators: &[ValidatorFixture],
603 ) -> eyre::Result<TestExecutionNode> {
604 let mut storage = HashMapStorageProvider::new(1);
605 let owner = AlloyAddress::from([0xAA; 20]);
606
607 StorageCtx::enter(&mut storage, || -> eyre::Result<()> {
608 let mut config = ValidatorConfigV2::new();
609 config.initialize(owner)?;
610 for validator in validators {
611 config.add_validator(owner, validator.add_validator_call())?;
612 }
613 Ok(())
614 })?;
615
616 let mut storage_by_account = HashMap::<AlloyAddress, Vec<(B256, U256)>>::new();
617 for (address, slot, value) in storage.into_storage() {
618 storage_by_account
619 .entry(address)
620 .or_default()
621 .push((B256::from(slot), value));
622 }
623 let provider = MockEthProvider::new();
624 for (address, storage) in storage_by_account {
625 provider.add_account(
626 address,
627 ExtendedAccount::new(0, U256::ZERO).extend_storage(storage),
628 );
629 }
630
631 Ok(TestExecutionNode {
632 hash: B256::from([0x42; 32]),
633 height: 7,
634 provider,
635 })
636 }
637
638 fn dkg_outcome(
639 players: impl IntoIterator<Item = PublicKey>,
640 next_players: impl IntoIterator<Item = PublicKey>,
641 ) -> eyre::Result<OnchainDkgOutcome> {
642 let mut rng = rand_08::rngs::StdRng::seed_from_u64(42);
643 let (output, _) = dkg::deal::<MinSig, _, N3f1>(
644 &mut rng,
645 Mode::NonZeroCounter,
646 ordered::Set::try_from_iter(players)?,
647 )?;
648
649 Ok(OnchainDkgOutcome {
650 epoch: Epoch::new(0),
651 output,
652 next_players: ordered::Set::try_from_iter(next_players)?,
653 is_next_full_dkg: false,
654 })
655 }
656
657 fn assert_peer(map: &ordered::Map<PublicKey, Address>, validator: &ValidatorFixture) {
658 assert_eq!(
659 map.get_value(&validator.public_key),
660 Some(&validator.p2p_address),
661 );
662 }
663
    /// Asserts that `map` has no entry for the validator's public key.
    fn assert_no_peer(map: &ordered::Map<PublicKey, Address>, validator: &ValidatorFixture) {
        assert!(map.get_value(&validator.public_key).is_none());
    }
667
668 #[test]
669 fn resolve_at_hash_has_no_secondaries_when_players_are_next_players() -> eyre::Result<()> {
670 let execution = execution_with_validators(&[peer(1), peer(2)])?;
671 let outcome = dkg_outcome(
672 [peer(1).public_key, peer(2).public_key],
673 [peer(1).public_key, peer(2).public_key],
674 )?;
675 let peers =
676 PeersBuilder::with_dkg_outcome(&outcome).resolve_at_hash(&execution, execution.hash)?;
677
678 assert_eq!(peers.primary.len(), 2);
679 assert_eq!(peers.secondary.len(), 0);
680 assert_peer(&peers.primary, &peer(1));
681 assert_peer(&peers.primary, &peer(2));
682
683 Ok(())
684 }
685
686 #[test]
687 fn resolve_at_hash_keeps_dropped_player_primary() -> eyre::Result<()> {
688 let execution = execution_with_validators(&[peer(1), peer(2)])?;
689 let outcome = dkg_outcome(
690 [peer(1).public_key, peer(2).public_key],
691 [peer(1).public_key],
692 )?;
693 let peers =
694 PeersBuilder::with_dkg_outcome(&outcome).resolve_at_hash(&execution, execution.hash)?;
695
696 assert_eq!(peers.primary.len(), 2);
697 assert_eq!(peers.secondary.len(), 0);
698 assert_peer(&peers.primary, &peer(1));
699 assert_peer(&peers.primary, &peer(2));
700
701 Ok(())
702 }
703
704 #[test]
705 fn resolve_at_hash_adds_next_player_as_secondary() -> eyre::Result<()> {
706 let execution = execution_with_validators(&[peer(1), peer(2)])?;
707 let outcome = dkg_outcome(
708 [peer(1).public_key],
709 [peer(1).public_key, peer(2).public_key],
710 )?;
711 let peers =
712 PeersBuilder::with_dkg_outcome(&outcome).resolve_at_hash(&execution, execution.hash)?;
713
714 assert_eq!(peers.primary.len(), 1);
715 assert_eq!(peers.secondary.len(), 1);
716 assert_peer(&peers.primary, &peer(1));
717 assert_no_peer(&peers.secondary, &peer(1));
718 assert_peer(&peers.secondary, &peer(2));
719
720 Ok(())
721 }
722
723 #[test]
724 fn resolve_at_hash_adds_active_non_dkg_validator_as_secondary() -> eyre::Result<()> {
725 let execution = execution_with_validators(&[peer(1), peer(2)])?;
726 let outcome = dkg_outcome([peer(1).public_key], [peer(1).public_key])?;
727 let peers =
728 PeersBuilder::with_dkg_outcome(&outcome).resolve_at_hash(&execution, execution.hash)?;
729
730 assert_eq!(peers.primary.len(), 1);
731 assert_eq!(peers.secondary.len(), 1);
732 assert_peer(&peers.primary, &peer(1));
733 assert_no_peer(&peers.secondary, &peer(1));
734 assert_peer(&peers.secondary, &peer(2));
735
736 Ok(())
737 }
738}