Skip to main content

tempo_consensus/peer_manager/
actor.rs

1use std::{
2    collections::{HashMap, hash_map::Entry},
3    pin::Pin,
4    sync::Arc,
5    time::Duration,
6};
7
8use alloy_consensus::{BlockHeader as _, Sealable as _};
9use alloy_primitives::B256;
10use commonware_codec::ReadExt as _;
11use commonware_consensus::{
12    marshal::Update,
13    types::{Epocher, FixedEpocher, Height},
14};
15use commonware_cryptography::ed25519::PublicKey;
16use commonware_p2p::{Address, AddressableManager, AddressableTrackedPeers, Provider};
17use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, spawn_cell};
18use commonware_utils::{Acknowledgement, ordered};
19use eyre::{OptionExt as _, WrapErr as _};
20use futures::{StreamExt as _, channel::mpsc};
21use prometheus_client::metrics::gauge::Gauge;
22use reth_provider::{BlockIdReader as _, HeaderProvider as _};
23use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
24use tempo_node::TempoFullNode;
25use tempo_precompiles::validator_config_v2::ValidatorConfigV2;
26use tempo_primitives::TempoHeader;
27use tracing::{Span, debug, error, info_span, instrument, warn};
28
29use crate::{
30    utils::public_key_to_b256,
31    validators::{DecodedValidatorV2, ExecutionNode, read_validator_config_at_block_hash},
32};
33
/// The interval on which the peer set is updated during bootstrapping,
/// i.e. while no peer set has been tracked yet.
///
/// Aggressive timing to get started.
const BOOTSTRAP_UPDATE_INTERVAL: Duration = Duration::from_secs(5);
37
/// The interval on which peer sets are refreshed once a peer set has been
/// tracked.
///
/// Relaxed timing during normal operation.
const HEARTBEAT_UPDATE_INTERVAL: Duration = Duration::from_secs(30);
41
42use super::ingress::{Message, MessageWithCause};
43
44pub(crate) struct Actor<TContext, TPeerManager>
45where
46    TPeerManager: AddressableManager<PublicKey = PublicKey>,
47{
48    context: ContextCell<TContext>,
49
50    oracle: TPeerManager,
51    execution_node: Arc<TempoFullNode>,
52    epoch_strategy: FixedEpocher,
53    last_finalized_height: Height,
54    mailbox: mpsc::UnboundedReceiver<MessageWithCause>,
55
56    peers: Gauge,
57
58    last_tracked_peer_set: Option<LastTrackedPeerSet>,
59
60    peer_update_timer: Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
61}
62
63impl<TContext, TPeerManager> Actor<TContext, TPeerManager>
64where
65    TContext: Clock + Metrics + Spawner,
66    TPeerManager: AddressableManager<PublicKey = PublicKey>,
67{
68    pub(super) fn new(
69        context: TContext,
70        super::Config {
71            oracle,
72            execution_node,
73            epoch_strategy,
74            last_finalized_height,
75        }: super::Config<TPeerManager>,
76        mailbox: mpsc::UnboundedReceiver<MessageWithCause>,
77    ) -> Self {
78        let peers = Gauge::default();
79        context.register(
80            "peers",
81            "how many peers are registered overall for the latest epoch",
82            peers.clone(),
83        );
84        let context = ContextCell::new(context);
85        let peer_update_timer = Box::pin(context.sleep(BOOTSTRAP_UPDATE_INTERVAL));
86        Self {
87            context,
88            oracle,
89            execution_node,
90            epoch_strategy,
91            last_finalized_height,
92            mailbox,
93            peers,
94            last_tracked_peer_set: None,
95
96            peer_update_timer,
97        }
98    }
99
100    async fn run(mut self) {
101        let reason = 'event_loop: loop {
102            tokio::select!(
103                biased;
104                msg = self.mailbox.next() => {
105                    match msg {
106                        None => break 'event_loop eyre::eyre!("mailbox closed unexpectedly"),
107
108                        Some(msg) => {
109                            if let Err(error) = self.handle_message(msg.cause, msg.message).await {
110                                break 'event_loop error;
111                            }
112                        }
113                    }
114                }
115                // Perform aggressive retries if no peer set is tracked yet.
116                // Otherwise just do it every minute.
117                _ = &mut self.peer_update_timer => {
118                    let _ = self.refresh_peers().await;
119                    self.reset_peer_update_timer();
120                }
121            )
122        };
123        info_span!("peer_manager").in_scope(|| error!(%reason,"agent shutting down"));
124    }
125    pub(crate) fn start(mut self) -> commonware_runtime::Handle<()> {
126        spawn_cell!(self.context, self.run())
127    }
128
129    #[instrument(parent = &cause, skip_all)]
130    async fn handle_message(&mut self, cause: Span, message: Message) -> eyre::Result<()> {
131        match message {
132            Message::Track { id, peers } => {
133                AddressableManager::track(&mut self.oracle, id, peers).await;
134            }
135            Message::Overwrite { peers } => {
136                AddressableManager::overwrite(&mut self.oracle, peers).await;
137            }
138            Message::PeerSet { id, response } => {
139                let result = Provider::peer_set(&mut self.oracle, id).await;
140                let _ = response.send(result);
141            }
142            Message::Subscribe { response } => {
143                let receiver = Provider::subscribe(&mut self.oracle).await;
144                let _ = response.send(receiver);
145            }
146            Message::Finalized(update) => match *update {
147                Update::Block(_, ack) => {
148                    let _ = self.refresh_peers().await;
149                    ack.acknowledge();
150                    self.reset_peer_update_timer();
151                }
152                Update::Tip { .. } => {}
153            },
154        }
155        Ok(())
156    }
157
158    /// Reads the peers given the latest finalized state.
159    /// and finalized state.
160    #[instrument(skip_all, err)]
161    async fn refresh_peers(&mut self) -> eyre::Result<()> {
162        // Always take whatever is higher: the last finalized height as per
163        // consensus layer (greater than 0 only on restarts with populated
164        // consensus state), or the highest finalized block number from the
165        // execution layer.
166        //
167        // This works even if the execution layer was replaced with a snapshot.
168        //
169        // There is no point taking an outdated state because the network has
170        // moved on and there is no guarantee that older peers are even around.
171        //
172        // Compare this to the DKG actor, which boots into older DKG epochs
173        // because it attempts to replay older rounds.
174        let highest_finalized = self
175            .execution_node
176            .provider
177            .finalized_block_number()
178            .wrap_err("unable to read highest finalized block from execution layer")?
179            .unwrap_or(self.last_finalized_height.get())
180            .max(self.last_finalized_height.get());
181
182        // Short circuit - no need to read the same state if there is no new data.
183        if self
184            .last_tracked_peer_set
185            .as_ref()
186            .is_some_and(|tracked| tracked.height >= highest_finalized)
187        {
188            return Ok(());
189        }
190
191        let epoch_info = self
192            .epoch_strategy
193            .containing(Height::new(highest_finalized))
194            .expect("epoch strategy covers all heights");
195
196        // If we're exactly on a boundary, use it; otherwise use the previous
197        // epoch's last block (or genesis).
198        //
199        // This height is guaranteed to be finalized.
200        let latest_boundary = if epoch_info.last().get() == highest_finalized {
201            highest_finalized
202        } else {
203            epoch_info
204                .epoch()
205                .previous()
206                .map_or_else(Height::zero, |prev| {
207                    self.epoch_strategy
208                        .last(prev)
209                        .expect("epoch strategy covers all epochs")
210                })
211                .get()
212        };
213
214        let latest_boundary_header = read_header_at_height(&self.execution_node, latest_boundary)
215            .wrap_err("failed reading latest boundary header")?;
216        let highest_finalized_header =
217            read_header_at_height(&self.execution_node, highest_finalized)
218                .wrap_err("failed reading highest finalized header")?;
219
220        let onchain_outcome =
221            OnchainDkgOutcome::read(&mut latest_boundary_header.extra_data().as_ref())
222                .wrap_err_with(|| {
223                    format!(
224                        "boundary block at `{latest_boundary}` did not contain a valid DKG outcome"
225                    )
226                })?;
227
228        let peers = PeersBuilder::with_dkg_outcome(&onchain_outcome)
229            .resolve_at_hash(
230                self.execution_node.as_ref(),
231                highest_finalized_header.hash_slow(),
232            )
233            .wrap_err("failed reading peer set from execution layer")?;
234
235        debug!(
236            boundary.height = latest_boundary_header.number(),
237            boundary.hash = %latest_boundary_header.hash_slow(),
238            highest_finalized.height = highest_finalized_header.number(),
239            highest_finalized.hash = %highest_finalized_header.hash_slow(),
240            ?peers.primary,
241            ?peers.secondary,
242            "read active peers from DKG outcome in latest available \
243            boundary header and resolved p2p addresses against validator \
244            config contract"
245        );
246
247        self.track_or_overwrite(highest_finalized_header.number(), peers)
248            .await;
249
250        Ok(())
251    }
252
253    async fn track_or_overwrite(&mut self, height: u64, peers: Peers) {
254        if let Some(tracked) = &self.last_tracked_peer_set {
255            match peers.what_has_changed_compared_to(&tracked.peers) {
256                WhatHasChanged::Nothing => {}
257                WhatHasChanged::Addresses => self.oracle.overwrite(peers.to_flat_map()).await,
258                WhatHasChanged::Peers => self.oracle.track(height, peers.clone()).await,
259            }
260        } else {
261            self.oracle.track(height, peers.clone()).await;
262        }
263
264        // Always bump the last-tracked peer set. If the peers are unchanged
265        // this only updates the height, but we use the height to determine if
266        // state should be read or not.
267        self.last_tracked_peer_set
268            .replace(LastTrackedPeerSet { height, peers });
269
270        if let Some(tracked) = &self.last_tracked_peer_set {
271            self.peers.set(tracked.peers.len() as i64);
272        }
273
274        debug!(
275            last_tracked_peer_set = ?self.last_tracked_peer_set.as_ref().expect("just set it"),
276            "latest tracked peerset",
277        );
278    }
279
280    fn reset_peer_update_timer(&mut self) {
281        // Perform aggressive retries if no peer set is tracked yet.
282        // Otherwise just do it every minute.
283        self.peer_update_timer = Box::pin(
284            self.context.sleep(
285                self.last_tracked_peer_set
286                    .as_ref()
287                    .map_or(BOOTSTRAP_UPDATE_INTERVAL, |_| HEARTBEAT_UPDATE_INTERVAL),
288            ),
289        );
290    }
291}
292
/// Result of comparing a freshly read peer set against a previous one.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum WhatHasChanged {
    /// Same public keys and same addresses.
    Nothing,
    /// Same public keys, but at least one address differs.
    Addresses,
    /// The primary or secondary set of public keys differs.
    Peers,
}
298
299#[derive(Clone, Debug)]
300struct Peers {
301    primary: ordered::Map<PublicKey, Address>,
302    secondary: ordered::Map<PublicKey, Address>,
303}
304
305impl Peers {
306    fn what_has_changed_compared_to(&self, old: &Self) -> WhatHasChanged {
307        if old.primary.keys() == self.primary.keys()
308            && old.secondary.keys() == self.secondary.keys()
309        {
310            if old.primary.values() == self.primary.values()
311                && old.secondary.values() == self.secondary.values()
312            {
313                WhatHasChanged::Nothing
314            } else {
315                WhatHasChanged::Addresses
316            }
317        } else {
318            WhatHasChanged::Peers
319        }
320    }
321
322    fn len(&self) -> usize {
323        self.primary.len().saturating_add(self.secondary.len())
324    }
325
326    fn to_flat_map(&self) -> ordered::Map<PublicKey, Address> {
327        ordered::Map::from_iter_dedup(
328            self.primary
329                .iter_pairs()
330                .chain(self.secondary.iter_pairs())
331                .map(|(key, val)| (key.clone(), val.clone())),
332        )
333    }
334}
335
336impl From<Peers> for AddressableTrackedPeers<PublicKey> {
337    fn from(value: Peers) -> Self {
338        Self {
339            primary: value.primary,
340            secondary: value.secondary,
341        }
342    }
343}
344
345struct PeersBuilder {
346    primary: ordered::Set<PublicKey>,
347    secondary: ordered::Set<PublicKey>,
348}
349
350impl PeersBuilder {
351    fn with_dkg_outcome(outcome: &OnchainDkgOutcome) -> Self {
352        let primary = outcome.players().clone();
353        let secondary = ordered::Set::from_iter_dedup(
354            outcome
355                .next_players()
356                .iter()
357                // Performs a binary search since `primary` is a sorted vec
358                // under the hood - so performance of this is fine.
359                .filter(|key| primary.position(key).is_none())
360                .cloned(),
361        );
362        Self { primary, secondary }
363    }
364
365    #[instrument(skip_all, fields(%hash))]
366    fn resolve_at_hash(self, node: impl ExecutionNode, hash: B256) -> eyre::Result<Peers> {
367        let Self { primary, secondary } = self;
368        let (_, _, (primary, secondary)) = read_validator_config_at_block_hash(
369            node,
370            hash,
371            |config: &ValidatorConfigV2| {
372                let mut active_validators = HashMap::new();
373                for (i, raw) in config
374                    .get_active_validators()
375                    .wrap_err("failed reading active validator set from contract")?
376                    .into_iter()
377                    .enumerate()
378                {
379                    if let Ok(decoded) =
380                        DecodedValidatorV2::decode_from_contract(raw).inspect_err(|error| {
381                            warn!(
382                                    %error,
383                                    position = i,
384                                    "failed decoding active validator in contract",
385                            )
386                        })
387                        && active_validators
388                            .insert(decoded.public_key().clone(), decoded.to_p2p_address())
389                            .is_some()
390                    {
391                        warn!(
392                            duplicate = %decoded.public_key(),
393                            "found duplicate public keys",
394                        );
395                    }
396                }
397                debug!(
398                    ?active_validators,
399                    "read active validators from contract, now extending with \
400                    historic peers that are still in the peer set but no \
401                    longer marked active",
402                );
403                let primary = ordered::Map::from_iter_dedup(primary.into_iter().map(|peer| {
404                    active_validators.remove_entry(&peer).unwrap_or_else(|| {
405                        let decoded = config
406                            .validator_by_public_key(public_key_to_b256(&peer))
407                            .map_err(eyre::Report::new)
408                            .and_then(DecodedValidatorV2::decode_from_contract)
409                            .wrap_err_with(|| {
410                                format!(
411                                    "failed to read DKG peer `{peer}` from validator config contract"
412                                )
413                            })
414                            .expect(
415                                "invariant: DKG peers must have an entry in the \
416                                smart contract and be well formed",
417                            );
418                        (decoded.public_key().clone(), decoded.to_p2p_address())
419                    })
420                }));
421
422                for peer in secondary {
423                    if let Entry::Vacant(slot) = active_validators.entry(peer.clone()) {
424                        let decoded = config
425                            .validator_by_public_key(public_key_to_b256(&peer))
426                            .map_err(eyre::Report::new)
427                            .and_then(DecodedValidatorV2::decode_from_contract)
428                            .wrap_err_with(|| {
429                                format!(
430                                    "failed to read next DKG peer `{peer}` from validator config contract"
431                                )
432                            })
433                            .expect(
434                                "invariant: next DKG peers must have an entry in the \
435                                smart contract and be well formed",
436                            );
437                        slot.insert_entry(decoded.to_p2p_address());
438                    }
439                }
440
441                let secondary = ordered::Map::from_iter_dedup(active_validators.into_iter());
442
443                Ok((primary, secondary))
444            },
445        )?;
446        Ok(Peers { primary, secondary })
447    }
448}
449
450#[derive(Debug)]
451struct LastTrackedPeerSet {
452    height: u64,
453    peers: Peers,
454}
455
456#[instrument(skip_all, fields(height), err)]
457fn read_header_at_height(execution_node: &TempoFullNode, height: u64) -> eyre::Result<TempoHeader> {
458    execution_node
459        .provider
460        .header_by_number(height)
461        .map_err(eyre::Report::new)
462        .and_then(|h| h.ok_or_eyre("execution layer did not have a header at the requested height"))
463        .wrap_err_with(|| format!("failed reading header at height `{height}`"))
464}
465
#[cfg(test)]
mod tests {
    use std::{
        collections::HashMap,
        net::{IpAddr, Ipv4Addr, SocketAddr},
    };

    use alloy_consensus::Header;
    use alloy_primitives::{Address as AlloyAddress, B256, Keccak256, U256};
    use commonware_codec::Encode as _;
    use commonware_consensus::types::Epoch;
    use commonware_cryptography::{
        Signer as _,
        bls12381::{
            dkg,
            primitives::{sharing::Mode, variant::MinSig},
        },
        ed25519::PrivateKey,
    };
    use commonware_utils::{N3f1, TryFromIterator as _};
    use rand_08::SeedableRng as _;
    use reth_ethereum::evm::revm::{State, database::StateProviderDatabase};
    use reth_node_builder::ConfigureEvm as _;
    use reth_provider::{
        StateProviderBox,
        test_utils::{ExtendedAccount, MockEthProvider},
    };
    use tempo_node::evm::{TempoEvmConfig, evm::TempoEvm};
    use tempo_precompiles::{
        storage::{StorageCtx, hashmap::HashMapStorageProvider},
        validator_config_v2::{IValidatorConfigV2, VALIDATOR_NS_ADD},
    };

    use super::*;

    /// Contract address mixed into the signed `addValidator` message.
    const VALIDATOR_CONFIG_V2_ADDRESS: AlloyAddress =
        alloy_primitives::address!("0xCCCCCCCC00000000000000000000000000000001");

    /// Minimal execution node that serves exactly one block, identified by
    /// `hash`, backed by a mock provider.
    struct TestExecutionNode {
        hash: B256,
        height: u64,
        provider: MockEthProvider,
    }

    impl ExecutionNode for TestExecutionNode {
        /// Returns a synthetic header at `self.height`; asserts that the
        /// requested hash is the one this node serves.
        fn header(&self, block_hash: B256) -> eyre::Result<TempoHeader> {
            assert_eq!(block_hash, self.hash);
            let inner = Header {
                number: self.height,
                timestamp: 1,
                gas_limit: 30_000_000,
                base_fee_per_gas: Some(1),
                ..Default::default()
            };
            Ok(TempoHeader {
                general_gas_limit: 30_000_000,
                inner,
                ..Default::default()
            })
        }

        /// Returns the mock provider as state; asserts that the requested
        /// hash is the one this node serves.
        fn state_by_block_hash(&self, block_hash: B256) -> eyre::Result<StateProviderBox> {
            assert_eq!(block_hash, self.hash);
            Ok(Box::new(self.provider.clone()))
        }

        /// Builds an EVM for `header` using the moderato configuration.
        fn evm_for_block(
            &self,
            db: State<StateProviderDatabase<StateProviderBox>>,
            header: &TempoHeader,
        ) -> eyre::Result<TempoEvm<State<StateProviderDatabase<StateProviderBox>>>> {
            let evm_config = TempoEvmConfig::moderato();
            evm_config
                .evm_for_block(db, header)
                .map_err(eyre::Report::new)
        }
    }

    /// Everything needed to register a validator in the contract and to
    /// check the address it resolves to.
    struct ValidatorFixture {
        private_key: PrivateKey,
        public_key: PublicKey,
        validator_address: AlloyAddress,
        ingress: String,
        egress: String,
        p2p_address: Address,
    }

    /// Deterministically derives a validator fixture from `seed`.
    fn peer(seed: u8) -> ValidatorFixture {
        let private_key = PrivateKey::from_seed(u64::from(seed));
        let public_key = private_key.public_key();
        let validator_address = AlloyAddress::from([seed; 20]);

        let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, seed));
        let ingress_socket = SocketAddr::new(ip, 8000 + u16::from(seed));
        let p2p_address = Address::Asymmetric {
            ingress: commonware_p2p::Ingress::Socket(ingress_socket),
            egress: SocketAddr::new(ip, 0),
        };

        ValidatorFixture {
            private_key,
            public_key,
            validator_address,
            ingress: ingress_socket.to_string(),
            egress: ip.to_string(),
            p2p_address,
        }
    }

    impl ValidatorFixture {
        /// Builds a signed `addValidator` call for this fixture.
        fn add_validator_call(&self) -> IValidatorConfigV2::addValidatorCall {
            // The order of the hashed parts makes up the signed message and
            // must not change.
            let mut digest = Keccak256::new();
            digest.update(1u64.to_be_bytes());
            digest.update(VALIDATOR_CONFIG_V2_ADDRESS.as_slice());
            digest.update(self.validator_address.as_slice());
            digest.update([self.ingress.len() as u8]);
            digest.update(self.ingress.as_bytes());
            digest.update([self.egress.len() as u8]);
            digest.update(self.egress.as_bytes());
            digest.update(self.validator_address.as_slice());
            let message = digest.finalize();

            let signature = self
                .private_key
                .sign(VALIDATOR_NS_ADD, message.as_slice())
                .encode()
                .to_vec();

            IValidatorConfigV2::addValidatorCall {
                validatorAddress: self.validator_address,
                publicKey: public_key_to_b256(&self.public_key),
                ingress: self.ingress.clone(),
                egress: self.egress.clone(),
                feeRecipient: self.validator_address,
                signature: signature.into(),
            }
        }
    }

    /// Creates an execution node whose state contains a validator config
    /// contract with `validators` added.
    fn execution_with_validators(
        validators: &[ValidatorFixture],
    ) -> eyre::Result<TestExecutionNode> {
        let mut storage = HashMapStorageProvider::new(1);
        let owner = AlloyAddress::from([0xAA; 20]);

        StorageCtx::enter(&mut storage, || -> eyre::Result<()> {
            let mut config = ValidatorConfigV2::new();
            config.initialize(owner)?;
            for fixture in validators {
                config.add_validator(owner, fixture.add_validator_call())?;
            }
            Ok(())
        })?;

        // Group the written slots per account so that they can be loaded
        // into the mock provider.
        let mut slots_by_account = HashMap::<AlloyAddress, Vec<(B256, U256)>>::new();
        for (account, slot, value) in storage.into_storage() {
            let slots = slots_by_account.entry(account).or_default();
            slots.push((B256::from(slot), value));
        }

        let provider = MockEthProvider::new();
        for (account, slots) in slots_by_account {
            let extended = ExtendedAccount::new(0, U256::ZERO).extend_storage(slots);
            provider.add_account(account, extended);
        }

        Ok(TestExecutionNode {
            hash: B256::from([0x42; 32]),
            height: 7,
            provider,
        })
    }

    /// Deals a DKG output for `players` (fixed RNG seed) and wraps it in an
    /// on-chain outcome with the given `next_players`.
    fn dkg_outcome(
        players: impl IntoIterator<Item = PublicKey>,
        next_players: impl IntoIterator<Item = PublicKey>,
    ) -> eyre::Result<OnchainDkgOutcome> {
        let mut rng = rand_08::rngs::StdRng::seed_from_u64(42);
        let dealers = ordered::Set::try_from_iter(players)?;
        let (output, _) = dkg::deal::<MinSig, _, N3f1>(&mut rng, Mode::NonZeroCounter, dealers)?;
        let next_players = ordered::Set::try_from_iter(next_players)?;

        Ok(OnchainDkgOutcome {
            epoch: Epoch::new(0),
            output,
            next_players,
            is_next_full_dkg: false,
        })
    }

    /// Asserts that `map` maps the validator's key to its expected address.
    fn assert_peer(map: &ordered::Map<PublicKey, Address>, validator: &ValidatorFixture) {
        let resolved = map.get_value(&validator.public_key);
        assert_eq!(resolved, Some(&validator.p2p_address));
    }

    /// Asserts that `map` has no entry for the validator's key.
    fn assert_no_peer(map: &ordered::Map<PublicKey, Address>, validator: &ValidatorFixture) {
        let resolved = map.get_value(&validator.public_key);
        assert!(resolved.is_none());
    }

    #[test]
    fn resolve_at_hash_has_no_secondaries_when_players_are_next_players() -> eyre::Result<()> {
        let validators = [peer(1), peer(2)];
        let execution = execution_with_validators(&validators)?;
        let everyone = || {
            [
                validators[0].public_key.clone(),
                validators[1].public_key.clone(),
            ]
        };
        let outcome = dkg_outcome(everyone(), everyone())?;

        let builder = PeersBuilder::with_dkg_outcome(&outcome);
        let peers = builder.resolve_at_hash(&execution, execution.hash)?;

        assert_eq!(peers.primary.len(), 2);
        assert_eq!(peers.secondary.len(), 0);
        assert_peer(&peers.primary, &validators[0]);
        assert_peer(&peers.primary, &validators[1]);

        Ok(())
    }

    #[test]
    fn resolve_at_hash_keeps_dropped_player_primary() -> eyre::Result<()> {
        let validators = [peer(1), peer(2)];
        let execution = execution_with_validators(&validators)?;
        let outcome = dkg_outcome(
            [
                validators[0].public_key.clone(),
                validators[1].public_key.clone(),
            ],
            [validators[0].public_key.clone()],
        )?;

        let builder = PeersBuilder::with_dkg_outcome(&outcome);
        let peers = builder.resolve_at_hash(&execution, execution.hash)?;

        assert_eq!(peers.primary.len(), 2);
        assert_eq!(peers.secondary.len(), 0);
        assert_peer(&peers.primary, &validators[0]);
        assert_peer(&peers.primary, &validators[1]);

        Ok(())
    }

    #[test]
    fn resolve_at_hash_adds_next_player_as_secondary() -> eyre::Result<()> {
        let validators = [peer(1), peer(2)];
        let execution = execution_with_validators(&validators)?;
        let outcome = dkg_outcome(
            [validators[0].public_key.clone()],
            [
                validators[0].public_key.clone(),
                validators[1].public_key.clone(),
            ],
        )?;

        let builder = PeersBuilder::with_dkg_outcome(&outcome);
        let peers = builder.resolve_at_hash(&execution, execution.hash)?;

        assert_eq!(peers.primary.len(), 1);
        assert_eq!(peers.secondary.len(), 1);
        assert_peer(&peers.primary, &validators[0]);
        assert_no_peer(&peers.secondary, &validators[0]);
        assert_peer(&peers.secondary, &validators[1]);

        Ok(())
    }

    #[test]
    fn resolve_at_hash_adds_active_non_dkg_validator_as_secondary() -> eyre::Result<()> {
        let validators = [peer(1), peer(2)];
        let execution = execution_with_validators(&validators)?;
        let outcome = dkg_outcome(
            [validators[0].public_key.clone()],
            [validators[0].public_key.clone()],
        )?;

        let builder = PeersBuilder::with_dkg_outcome(&outcome);
        let peers = builder.resolve_at_hash(&execution, execution.hash)?;

        assert_eq!(peers.primary.len(), 1);
        assert_eq!(peers.secondary.len(), 1);
        assert_peer(&peers.primary, &validators[0]);
        assert_no_peer(&peers.secondary, &validators[0]);
        assert_peer(&peers.secondary, &validators[1]);

        Ok(())
    }
}