Skip to main content

tempo_e2e/
lib.rs

//! e2e tests using the [`commonware_runtime::deterministic`] runtime.
2//!
3//! This crate mimics how a full tempo node is run in production but runs the
4//! consensus engine in a deterministic runtime while maintaining a tokio
5//! async environment to launch execution nodes.
6//!
//! All definitions herein are only intended to support the tests defined
8//! in tests/.
9
10#![cfg_attr(not(test), warn(unused_crate_dependencies))]
11#![cfg_attr(docsrs, feature(doc_cfg))]
12
13use std::{iter::repeat_with, net::SocketAddr, time::Duration};
14
15use alloy_primitives::Address;
16use commonware_consensus::types::Epoch;
17use commonware_cryptography::{
18    Signer as _,
19    bls12381::{
20        dkg::{self},
21        primitives::{group::Share, sharing::Mode},
22    },
23    ed25519::{PrivateKey, PublicKey},
24};
25use commonware_math::algebra::Random as _;
26use commonware_p2p::simulated::{self, Link, Network, Oracle};
27
28use commonware_codec::Encode;
29use commonware_runtime::{
30    Metrics as _, Runner as _,
31    deterministic::{self, Context, Runner},
32};
33use commonware_utils::{N3f1, TryFromIterator as _, ordered};
34use futures::future::join_all;
35use itertools::Itertools as _;
36use rand_core::CryptoRngCore;
37use reth_node_metrics::recorder::PrometheusRecorder;
38use tempo_consensus::{consensus, feed::FeedStateHandle};
39
40pub mod execution_runtime;
41pub mod metrics;
42pub use execution_runtime::ExecutionNodeConfig;
43pub mod testing_node;
44pub use execution_runtime::ExecutionRuntime;
45use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
46pub use testing_node::TestingNode;
47
48#[cfg(test)]
49mod tests;
50
/// Prefix of consensus node identifiers; `setup_validators` names each node
/// `"{CONSENSUS_NODE_PREFIX}_{public_key}"` and uses that as its partition prefix.
pub const CONSENSUS_NODE_PREFIX: &str = "consensus";
/// Prefix for execution node identifiers.
///
/// NOTE(review): not referenced in this file; presumably used by the
/// `execution_runtime` / `testing_node` modules — confirm.
pub const EXECUTION_NODE_PREFIX: &str = "execution";
53
54fn generate_consensus_node_config(
55    rng: &mut impl CryptoRngCore,
56    signers: u32,
57    verifiers: u32,
58    fee_recipient: Address,
59) -> (
60    OnchainDkgOutcome,
61    ordered::Map<PublicKey, ConsensusNodeConfig>,
62) {
63    let signer_keys = repeat_with(|| PrivateKey::random(&mut *rng))
64        .take(signers as usize)
65        .collect::<Vec<_>>();
66
67    let (initial_dkg_outcome, shares) = dkg::deal::<_, _, N3f1>(
68        &mut *rng,
69        Mode::NonZeroCounter,
70        ordered::Set::try_from_iter(signer_keys.iter().map(|key| key.public_key())).unwrap(),
71    )
72    .unwrap();
73
74    let onchain_dkg_outcome = OnchainDkgOutcome {
75        epoch: Epoch::zero(),
76        output: initial_dkg_outcome,
77        next_players: shares.keys().clone(),
78        is_next_full_dkg: false,
79    };
80
81    let verifier_keys = repeat_with(|| PrivateKey::random(&mut *rng))
82        .take(verifiers as usize)
83        .collect::<Vec<_>>();
84
85    let validators = ordered::Map::try_from_iter(
86        signer_keys
87            .into_iter()
88            .chain(verifier_keys)
89            .enumerate()
90            .map(|(i, private_key)| {
91                let public_key = private_key.public_key();
92                let config = ConsensusNodeConfig {
93                    address: crate::execution_runtime::validator(i as u32),
94                    ingress: SocketAddr::from(([127, 0, 0, (i + 1) as u8], 8000)),
95                    egress: SocketAddr::from(([127, 0, 0, (i + 1) as u8], 0)),
96                    fee_recipient,
97                    private_key,
98                    share: shares.get_value(&public_key).cloned(),
99                };
100                (public_key, config)
101            }),
102    )
103    .unwrap();
104
105    (onchain_dkg_outcome, validators)
106}
107
108/// Configuration for a validator.
109#[derive(Clone, Debug)]
110pub struct ConsensusNodeConfig {
111    pub address: Address,
112    pub ingress: SocketAddr,
113    pub egress: SocketAddr,
114    pub fee_recipient: Address,
115    pub private_key: PrivateKey,
116    pub share: Option<Share>,
117}
118
119/// The test setup run by [`run`].
120#[derive(Clone)]
121pub struct Setup {
122    /// How many signing validators to launch.
123    pub how_many_signers: u32,
124
125    /// How many non-signing validators (verifiers) to launch.
126    /// These nodes participate in consensus but don't have shares.
127    pub how_many_verifiers: u32,
128
129    /// The seed used for setting up the deterministic runtime.
130    pub seed: u64,
131
132    /// The linkage between individual validators.
133    pub linkage: Link,
134
135    /// The number of heights in an epoch.
136    pub epoch_length: u64,
137
138    /// Local proposal return budget, excluding the network propagation allowance.
139    pub proposal_return_budget: Duration,
140
141    /// Whether to activate subblocks building.
142    pub with_subblocks: bool,
143
144    /// The fee recipient written into the V2 contract for each validator.
145    pub fee_recipient: Address,
146}
147
148impl Setup {
149    pub fn new() -> Self {
150        Self {
151            how_many_signers: 4,
152            how_many_verifiers: 0,
153            seed: 0,
154            linkage: Link {
155                latency: Duration::from_millis(10),
156                jitter: Duration::from_millis(1),
157                success_rate: 1.0,
158            },
159            epoch_length: 20,
160            proposal_return_budget: Duration::from_millis(300),
161            with_subblocks: false,
162            fee_recipient: Address::ZERO,
163        }
164    }
165
166    pub fn how_many_signers(self, how_many_signers: u32) -> Self {
167        Self {
168            how_many_signers,
169            ..self
170        }
171    }
172
173    pub fn how_many_verifiers(self, how_many_verifiers: u32) -> Self {
174        Self {
175            how_many_verifiers,
176            ..self
177        }
178    }
179
180    pub fn seed(self, seed: u64) -> Self {
181        Self { seed, ..self }
182    }
183
184    pub fn linkage(self, linkage: Link) -> Self {
185        Self { linkage, ..self }
186    }
187
188    pub fn epoch_length(self, epoch_length: u64) -> Self {
189        Self {
190            epoch_length,
191            ..self
192        }
193    }
194
195    pub fn proposal_return_budget(self, proposal_return_budget: Duration) -> Self {
196        Self {
197            proposal_return_budget,
198            ..self
199        }
200    }
201
202    pub fn subblocks(self, with_subblocks: bool) -> Self {
203        Self {
204            with_subblocks,
205            ..self
206        }
207    }
208
209    pub fn fee_recipient(self, fee_recipient: Address) -> Self {
210        Self {
211            fee_recipient,
212            ..self
213        }
214    }
215}
216
217impl Default for Setup {
218    fn default() -> Self {
219        Self::new()
220    }
221}
222
223/// Sets up validators and returns the nodes and execution runtime.
224///
225/// The execution runtime is created internally with a chainspec configured
226/// according to the Setup parameters (epoch_length, validators, polynomial).
227///
228/// The oracle is accessible via `TestingNode::oracle()` if needed for dynamic linking.
229pub async fn setup_validators(
230    context: &mut Context,
231    Setup {
232        epoch_length,
233        how_many_signers,
234        how_many_verifiers,
235        linkage,
236        proposal_return_budget,
237        with_subblocks,
238        fee_recipient,
239        ..
240    }: Setup,
241) -> (Vec<TestingNode<Context>>, ExecutionRuntime) {
242    let (network, mut oracle) = Network::new(
243        context.with_label("network"),
244        simulated::Config {
245            max_size: 1024 * 1024,
246            disconnect_on_block: true,
247            tracked_peer_sets: commonware_utils::NZUsize!(3),
248        },
249    );
250    network.start();
251
252    let (onchain_dkg_outcome, validators) = generate_consensus_node_config(
253        context,
254        how_many_signers,
255        how_many_verifiers,
256        fee_recipient,
257    );
258
259    let execution_runtime = ExecutionRuntime::builder()
260        .with_epoch_length(epoch_length)
261        .with_initial_dkg_outcome(onchain_dkg_outcome)
262        .with_validators(validators.clone())
263        .launch()
264        .unwrap();
265
266    let execution_configs = ExecutionNodeConfig::generator()
267        .with_count(how_many_signers + how_many_verifiers)
268        .generate();
269
270    let mut nodes = vec![];
271
272    for ((public_key, consensus_node_config), mut execution_config) in
273        validators.into_iter().zip_eq(execution_configs)
274    {
275        let ConsensusNodeConfig {
276            address,
277            ingress,
278            private_key,
279            share,
280            ..
281        } = consensus_node_config;
282        let oracle = oracle.clone();
283        let uid = format!("{CONSENSUS_NODE_PREFIX}_{public_key}");
284        let feed_state = FeedStateHandle::new();
285
286        execution_config.validator_key = Some(public_key.encode().as_ref().try_into().unwrap());
287        execution_config.feed_state = Some(feed_state.clone());
288
289        let engine_config = consensus::Builder {
290            execution_node: None,
291            blocker: oracle.control(private_key.public_key()),
292            peer_manager: oracle.socket_manager(),
293            partition_prefix: uid.clone(),
294            share,
295            signer: private_key.clone(),
296            mailbox_size: 1024,
297            deque_size: 10,
298            time_to_propose: Duration::from_secs(2),
299            time_to_collect_notarizations: Duration::from_secs(3),
300            time_to_retry_nullify_broadcast: Duration::from_secs(10),
301            time_for_peer_response: Duration::from_secs(2),
302            views_to_track: 10,
303            views_until_leader_skip: 5,
304            proposal_return_budget,
305            time_to_build_subblock: Duration::from_millis(100),
306            subblock_broadcast_interval: Duration::from_millis(50),
307            fcu_heartbeat_interval: Duration::from_secs(3),
308            feed_state,
309            with_subblocks,
310            // Plenty of headroom for any test; the marshal will fall back to
311            // reth past this depth via the hybrid finalized blocks store.
312            finalized_blocks_retention: 1024,
313        };
314
315        nodes.push(TestingNode::new(
316            uid,
317            private_key,
318            oracle.clone(),
319            engine_config,
320            execution_runtime.handle(),
321            execution_config,
322            ingress,
323            address,
324        ));
325    }
326
327    link_validators(&mut oracle, &nodes, linkage, None).await;
328
329    (nodes, execution_runtime)
330}
331
332/// Runs a test configured by [`Setup`].
333pub fn run(setup: Setup, mut stop_condition: impl FnMut(&metrics::Metrics) -> bool) -> String {
334    let cfg = deterministic::Config::default().with_seed(setup.seed);
335    let executor = Runner::from(cfg);
336
337    executor.start(|mut context| async move {
338        // Setup and run all validators.
339        let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup.clone()).await;
340        join_all(nodes.iter_mut().map(|node| node.start(&context))).await;
341
342        metrics::wait_for_metrics(&context, |metrics| {
343            metrics.assert_no_blocked_peers();
344            stop_condition(metrics)
345        })
346        .await;
347
348        context.auditor().state()
349    })
350}
351
352/// Connects a running node to a set of peers
353///
354/// Useful when a node is restarted and needs to re-connect to its previous peers as
355/// ports are not statically defined.
356pub async fn connect_execution_to_peers<TClock: commonware_runtime::Clock>(
357    node: &TestingNode<TClock>,
358    nodes: &[TestingNode<TClock>],
359) {
360    for other in nodes {
361        if node.public_key() == other.public_key() {
362            continue;
363        }
364
365        if let (Some(a), Some(b)) = (node.execution_node.as_ref(), other.execution_node.as_ref()) {
366            a.connect_peer(b).await;
367        }
368    }
369}
370
371/// Connects all running execution nodes as peers.
372///
373/// This must be called after nodes are started so that the ports are known
374pub async fn connect_execution_peers<TClock: commonware_runtime::Clock>(
375    nodes: &[TestingNode<TClock>],
376) {
377    for i in 0..nodes.len() {
378        connect_execution_to_peers(&nodes[i], &nodes[(i + 1)..]).await;
379    }
380}
381
382/// Links (or unlinks) validators using the oracle.
383///
384/// The `restrict_to` function can be used to restrict the linking to certain connections,
385/// otherwise all validators will be linked to all other validators.
386pub async fn link_validators<TClock: commonware_runtime::Clock>(
387    oracle: &mut Oracle<PublicKey, TClock>,
388    validators: &[TestingNode<TClock>],
389    link: Link,
390    restrict_to: Option<fn(usize, usize, usize) -> bool>,
391) {
392    for (i1, v1) in validators.iter().enumerate() {
393        for (i2, v2) in validators.iter().enumerate() {
394            // Ignore self
395            if v1.public_key() == v2.public_key() {
396                continue;
397            }
398
399            // Restrict to certain connections
400            if let Some(f) = restrict_to
401                && !f(validators.len(), i1, i2)
402            {
403                continue;
404            }
405
406            // Add link
407            match oracle
408                .add_link(
409                    v1.public_key().clone(),
410                    v2.public_key().clone(),
411                    link.clone(),
412                )
413                .await
414            {
415                Ok(()) => (),
416                // TODO: it should be possible to remove the below if Commonware simulated network exposes list of registered peers.
417                //
418                // This is fine because some of the peers might be registered later
419                Err(commonware_p2p::simulated::Error::PeerMissing) => (),
420                // This is fine because we might call this multiple times as peers are joining the network.
421                Err(commonware_p2p::simulated::Error::LinkExists) => (),
422                res @ Err(_) => res.unwrap(),
423            }
424        }
425    }
426}
427
428/// Get the number of pipeline runs from the Prometheus metrics recorder
429pub fn get_pipeline_runs(recorder: &PrometheusRecorder) -> u64 {
430    recorder
431        .handle()
432        .render()
433        .lines()
434        .find(|line| line.starts_with("reth_consensus_engine_beacon_pipeline_runs"))
435        .and_then(|line| line.split_whitespace().nth(1)?.parse().ok())
436        .unwrap_or(0)
437}