Skip to main content

tempo_e2e/
lib.rs

1//! e2e tests using the [`commonware_runtime::deterministic`].
2//!
3//! This crate mimics how a full tempo node is run in production but runs the
4//! consensus engine in a deterministic runtime while maintaining a tokio
5//! async environment to launch execution nodes.
6//!
7//! All definitions herein are only intended to support the the tests defined
8//! in tests/.
9
10#![cfg_attr(not(test), warn(unused_crate_dependencies))]
11#![cfg_attr(docsrs, feature(doc_cfg))]
12
13use std::{iter::repeat_with, net::SocketAddr, time::Duration};
14
15use alloy::signers::k256::schnorr::CryptoRngCore;
16use alloy_primitives::Address;
17use commonware_consensus::types::Epoch;
18use commonware_cryptography::{
19    Signer as _,
20    bls12381::{
21        dkg::{self},
22        primitives::{group::Share, sharing::Mode},
23    },
24    ed25519::{PrivateKey, PublicKey},
25};
26use commonware_math::algebra::Random as _;
27use commonware_p2p::simulated::{self, Link, Network, Oracle};
28
29use commonware_codec::Encode;
30use commonware_runtime::{
31    Clock, Metrics as _, Runner as _,
32    deterministic::{self, Context, Runner},
33};
34use commonware_utils::{N3f1, TryFromIterator as _, ordered};
35use futures::future::join_all;
36use itertools::Itertools as _;
37use reth_node_metrics::recorder::PrometheusRecorder;
38use tempo_commonware_node::{consensus, feed::FeedStateHandle};
39
40pub mod execution_runtime;
41pub use execution_runtime::ExecutionNodeConfig;
42pub mod testing_node;
43pub use execution_runtime::ExecutionRuntime;
44use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
45pub use testing_node::TestingNode;
46
47#[cfg(test)]
48mod tests;
49
50pub const CONSENSUS_NODE_PREFIX: &str = "consensus";
51pub const EXECUTION_NODE_PREFIX: &str = "execution";
52
53fn generate_consensus_node_config(
54    rng: &mut impl CryptoRngCore,
55    signers: u32,
56    verifiers: u32,
57) -> (
58    OnchainDkgOutcome,
59    ordered::Map<PublicKey, ConsensusNodeConfig>,
60) {
61    let signer_keys = repeat_with(|| PrivateKey::random(&mut *rng))
62        .take(signers as usize)
63        .collect::<Vec<_>>();
64
65    let (initial_dkg_outcome, shares) = dkg::deal::<_, _, N3f1>(
66        &mut *rng,
67        Mode::NonZeroCounter,
68        ordered::Set::try_from_iter(signer_keys.iter().map(|key| key.public_key())).unwrap(),
69    )
70    .unwrap();
71
72    let onchain_dkg_outcome = OnchainDkgOutcome {
73        epoch: Epoch::zero(),
74        output: initial_dkg_outcome,
75        next_players: shares.keys().clone(),
76        is_next_full_dkg: false,
77    };
78
79    let verifier_keys = repeat_with(|| PrivateKey::random(&mut *rng))
80        .take(verifiers as usize)
81        .collect::<Vec<_>>();
82
83    let validators = ordered::Map::try_from_iter(
84        signer_keys
85            .into_iter()
86            .chain(verifier_keys)
87            .enumerate()
88            .map(|(i, private_key)| {
89                let public_key = private_key.public_key();
90                let config = ConsensusNodeConfig {
91                    address: crate::execution_runtime::validator(i as u32),
92                    ingress: SocketAddr::from(([127, 0, 0, (i + 1) as u8], 8000)),
93                    egress: SocketAddr::from(([127, 0, 0, (i + 1) as u8], 0)),
94                    fee_recipient: Address::ZERO,
95                    private_key,
96                    share: shares.get_value(&public_key).cloned(),
97                };
98                (public_key, config)
99            }),
100    )
101    .unwrap();
102
103    (onchain_dkg_outcome, validators)
104}
105
106/// Configuration for a validator.
107#[derive(Clone, Debug)]
108pub struct ConsensusNodeConfig {
109    pub address: Address,
110    pub ingress: SocketAddr,
111    pub egress: SocketAddr,
112    pub fee_recipient: Address,
113    pub private_key: PrivateKey,
114    pub share: Option<Share>,
115}
116
117/// The test setup run by [`run`].
118#[derive(Clone)]
119pub struct Setup {
120    /// How many signing validators to launch.
121    pub how_many_signers: u32,
122
123    /// How many non-signing validators (verifiers) to launch.
124    /// These nodes participate in consensus but don't have shares.
125    pub how_many_verifiers: u32,
126
127    /// The seed used for setting up the deterministic runtime.
128    pub seed: u64,
129
130    /// The linkage between individual validators.
131    pub linkage: Link,
132
133    /// The number of heights in an epoch.
134    pub epoch_length: u64,
135
136    /// Whether to connect execution layer nodes directly.
137    pub connect_execution_layer_nodes: bool,
138
139    /// The amount of time the node waits for the execution layer to return
140    /// a build a payload.
141    pub new_payload_wait_time: Duration,
142
143    /// The t2 hardfork time.
144    ///
145    /// Validators will only be written into the V2 contract if t2_time == 0.
146    ///
147    /// Default: 1.
148    pub t2_time: u64,
149
150    /// Whether to activate subblocks building.
151    pub with_subblocks: bool,
152}
153
154impl Setup {
155    pub fn new() -> Self {
156        Self {
157            how_many_signers: 4,
158            how_many_verifiers: 0,
159            seed: 0,
160            linkage: Link {
161                latency: Duration::from_millis(10),
162                jitter: Duration::from_millis(1),
163                success_rate: 1.0,
164            },
165            epoch_length: 20,
166            connect_execution_layer_nodes: false,
167            new_payload_wait_time: Duration::from_millis(300),
168            t2_time: 1,
169            with_subblocks: false,
170        }
171    }
172
173    pub fn how_many_signers(self, how_many_signers: u32) -> Self {
174        Self {
175            how_many_signers,
176            ..self
177        }
178    }
179
180    pub fn how_many_verifiers(self, how_many_verifiers: u32) -> Self {
181        Self {
182            how_many_verifiers,
183            ..self
184        }
185    }
186
187    pub fn seed(self, seed: u64) -> Self {
188        Self { seed, ..self }
189    }
190
191    pub fn linkage(self, linkage: Link) -> Self {
192        Self { linkage, ..self }
193    }
194
195    pub fn epoch_length(self, epoch_length: u64) -> Self {
196        Self {
197            epoch_length,
198            ..self
199        }
200    }
201
202    pub fn connect_execution_layer_nodes(self, connect_execution_layer_nodes: bool) -> Self {
203        Self {
204            connect_execution_layer_nodes,
205            ..self
206        }
207    }
208
209    pub fn new_payload_wait_time(self, new_payload_wait_time: Duration) -> Self {
210        Self {
211            new_payload_wait_time,
212            ..self
213        }
214    }
215
216    pub fn subblocks(self, with_subblocks: bool) -> Self {
217        Self {
218            with_subblocks,
219            ..self
220        }
221    }
222
223    pub fn t2_time(self, t2_time: u64) -> Self {
224        Self { t2_time, ..self }
225    }
226}
227
228impl Default for Setup {
229    fn default() -> Self {
230        Self::new()
231    }
232}
233
234/// Sets up validators and returns the nodes and execution runtime.
235///
236/// The execution runtime is created internally with a chainspec configured
237/// according to the Setup parameters (epoch_length, validators, polynomial).
238///
239/// The oracle is accessible via `TestingNode::oracle()` if needed for dynamic linking.
240pub async fn setup_validators(
241    context: &mut Context,
242    Setup {
243        epoch_length,
244        how_many_signers,
245        how_many_verifiers,
246        connect_execution_layer_nodes,
247        linkage,
248        new_payload_wait_time,
249        t2_time,
250        with_subblocks,
251        ..
252    }: Setup,
253) -> (Vec<TestingNode<Context>>, ExecutionRuntime) {
254    let (network, mut oracle) = Network::new(
255        context.with_label("network"),
256        simulated::Config {
257            max_size: 1024 * 1024,
258            disconnect_on_block: true,
259            tracked_peer_sets: Some(3),
260        },
261    );
262    network.start();
263
264    let (onchain_dkg_outcome, validators) =
265        generate_consensus_node_config(context, how_many_signers, how_many_verifiers);
266
267    let execution_runtime = ExecutionRuntime::builder()
268        .with_epoch_length(epoch_length)
269        .with_initial_dkg_outcome(onchain_dkg_outcome)
270        .with_t2_time(t2_time)
271        .with_validators(validators.clone())
272        .launch()
273        .unwrap();
274
275    let execution_configs = ExecutionNodeConfig::generator()
276        .with_count(how_many_signers + how_many_verifiers)
277        .with_peers(connect_execution_layer_nodes)
278        .generate();
279
280    let mut nodes = vec![];
281
282    for ((public_key, consensus_node_config), mut execution_config) in
283        validators.into_iter().zip_eq(execution_configs)
284    {
285        let ConsensusNodeConfig {
286            address,
287            ingress,
288            private_key,
289            share,
290            ..
291        } = consensus_node_config;
292        let oracle = oracle.clone();
293        let uid = format!("{CONSENSUS_NODE_PREFIX}_{public_key}");
294        let feed_state = FeedStateHandle::new();
295
296        execution_config.validator_key = Some(public_key.encode().as_ref().try_into().unwrap());
297        execution_config.feed_state = Some(feed_state.clone());
298
299        let engine_config = consensus::Builder {
300            fee_recipient: alloy_primitives::Address::ZERO,
301            execution_node: None,
302            blocker: oracle.control(private_key.public_key()),
303            peer_manager: oracle.socket_manager(),
304            partition_prefix: uid.clone(),
305            share,
306            signer: private_key.clone(),
307            mailbox_size: 1024,
308            deque_size: 10,
309            time_to_propose: Duration::from_secs(2),
310            time_to_collect_notarizations: Duration::from_secs(3),
311            time_to_retry_nullify_broadcast: Duration::from_secs(10),
312            time_for_peer_response: Duration::from_secs(2),
313            views_to_track: 10,
314            views_until_leader_skip: 5,
315            payload_interrupt_time: Duration::from_millis(200),
316            new_payload_wait_time,
317            time_to_build_subblock: Duration::from_millis(100),
318            subblock_broadcast_interval: Duration::from_millis(50),
319            fcu_heartbeat_interval: Duration::from_secs(3),
320            feed_state,
321            with_subblocks,
322        };
323
324        nodes.push(TestingNode::new(
325            uid,
326            private_key,
327            oracle.clone(),
328            engine_config,
329            execution_runtime.handle(),
330            execution_config,
331            ingress,
332            address,
333        ));
334    }
335
336    link_validators(&mut oracle, &nodes, linkage, None).await;
337
338    (nodes, execution_runtime)
339}
340
341/// Runs a test configured by [`Setup`].
342pub fn run(setup: Setup, mut stop_condition: impl FnMut(&str, &str) -> bool) -> String {
343    let cfg = deterministic::Config::default().with_seed(setup.seed);
344    let executor = Runner::from(cfg);
345
346    executor.start(|mut context| async move {
347        // Setup and run all validators.
348        let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup.clone()).await;
349
350        join_all(nodes.iter_mut().map(|node| node.start(&context))).await;
351
352        loop {
353            let metrics = context.encode();
354
355            let mut success = false;
356            for line in metrics.lines() {
357                if !line.starts_with(CONSENSUS_NODE_PREFIX) {
358                    continue;
359                }
360
361                let mut parts = line.split_whitespace();
362                let metric = parts.next().unwrap();
363                let value = parts.next().unwrap();
364
365                if metric.ends_with("_peers_blocked") {
366                    let value = value.parse::<u64>().unwrap();
367                    assert_eq!(value, 0);
368                }
369
370                if setup.t2_time == 0 {
371                    if metric.ends_with("_dkg_manager_read_players_from_v1_contract_total") {
372                        assert_eq!(0, value.parse::<u64>().unwrap());
373                    }
374                    if metric.ends_with("_dkg_manager_syncing_players") {
375                        assert_eq!(0, value.parse::<u64>().unwrap());
376                    }
377                    if metric.ends_with("_dkg_manager_read_re_dkg_epoch_from_v1_contract_total") {
378                        assert_eq!(0, value.parse::<u64>().unwrap());
379                    }
380                }
381
382                if stop_condition(metric, value) {
383                    success = true;
384                    break;
385                }
386            }
387
388            if success {
389                break;
390            }
391
392            context.sleep(Duration::from_secs(1)).await;
393        }
394
395        context.auditor().state()
396    })
397}
398
399/// Links (or unlinks) validators using the oracle.
400///
401/// The `restrict_to` function can be used to restrict the linking to certain connections,
402/// otherwise all validators will be linked to all other validators.
403pub async fn link_validators<TClock: commonware_runtime::Clock>(
404    oracle: &mut Oracle<PublicKey, TClock>,
405    validators: &[TestingNode<TClock>],
406    link: Link,
407    restrict_to: Option<fn(usize, usize, usize) -> bool>,
408) {
409    for (i1, v1) in validators.iter().enumerate() {
410        for (i2, v2) in validators.iter().enumerate() {
411            // Ignore self
412            if v1.public_key() == v2.public_key() {
413                continue;
414            }
415
416            // Restrict to certain connections
417            if let Some(f) = restrict_to
418                && !f(validators.len(), i1, i2)
419            {
420                continue;
421            }
422
423            // Add link
424            match oracle
425                .add_link(
426                    v1.public_key().clone(),
427                    v2.public_key().clone(),
428                    link.clone(),
429                )
430                .await
431            {
432                Ok(()) => (),
433                // TODO: it should be possible to remove the below if Commonware simulated network exposes list of registered peers.
434                //
435                // This is fine because some of the peers might be registered later
436                Err(commonware_p2p::simulated::Error::PeerMissing) => (),
437                // This is fine because we might call this multiple times as peers are joining the network.
438                Err(commonware_p2p::simulated::Error::LinkExists) => (),
439                res @ Err(_) => res.unwrap(),
440            }
441        }
442    }
443}
444
445/// Get the number of pipeline runs from the Prometheus metrics recorder
446pub fn get_pipeline_runs(recorder: &PrometheusRecorder) -> u64 {
447    recorder
448        .handle()
449        .render()
450        .lines()
451        .find(|line| line.starts_with("reth_consensus_engine_beacon_pipeline_runs"))
452        .and_then(|line| line.split_whitespace().nth(1)?.parse().ok())
453        .unwrap_or(0)
454}