Skip to main content

tempo_e2e/
lib.rs

1//! e2e tests using the [`commonware_runtime::deterministic`].
2//!
3//! This crate mimics how a full tempo node is run in production but runs the
4//! consensus engine in a deterministic runtime while maintaining a tokio
5//! async environment to launch execution nodes.
6//!
7//! All definitions herein are only intended to support the tests defined
8//! in tests/.
9
10#![cfg_attr(not(test), warn(unused_crate_dependencies))]
11#![cfg_attr(docsrs, feature(doc_cfg))]
12
13use std::{iter::repeat_with, net::SocketAddr, time::Duration};
14
15use alloy_primitives::Address;
16use commonware_consensus::types::Epoch;
17use commonware_cryptography::{
18    Signer as _,
19    bls12381::{
20        dkg::{self},
21        primitives::{group::Share, sharing::Mode},
22    },
23    ed25519::{PrivateKey, PublicKey},
24};
25use commonware_math::algebra::Random as _;
26use commonware_p2p::simulated::{self, Link, Network, Oracle};
27
28use commonware_codec::Encode;
29use commonware_runtime::{
30    Clock, Metrics as _, Runner as _,
31    deterministic::{self, Context, Runner},
32};
33use commonware_utils::{N3f1, TryFromIterator as _, ordered};
34use futures::future::join_all;
35use itertools::Itertools as _;
36use rand_core::CryptoRngCore;
37use reth_node_metrics::recorder::PrometheusRecorder;
38use tempo_commonware_node::{consensus, feed::FeedStateHandle};
39
40pub mod execution_runtime;
41pub use execution_runtime::ExecutionNodeConfig;
42pub mod testing_node;
43pub use execution_runtime::ExecutionRuntime;
44use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
45pub use testing_node::TestingNode;
46
47#[cfg(test)]
48mod tests;
49
/// Prefix of the uid assigned to every consensus node.
///
/// [`setup_validators`] builds each node's uid as
/// `{CONSENSUS_NODE_PREFIX}_{public_key}`, and [`run`] only inspects metric
/// lines that start with this prefix.
pub const CONSENSUS_NODE_PREFIX: &str = "consensus";
/// Prefix intended for execution node names.
///
/// NOTE(review): not referenced in this file; presumably consumed by the
/// execution runtime or the tests -- confirm against its users.
pub const EXECUTION_NODE_PREFIX: &str = "execution";
52
53fn generate_consensus_node_config(
54    rng: &mut impl CryptoRngCore,
55    signers: u32,
56    verifiers: u32,
57    fee_recipient: Address,
58) -> (
59    OnchainDkgOutcome,
60    ordered::Map<PublicKey, ConsensusNodeConfig>,
61) {
62    let signer_keys = repeat_with(|| PrivateKey::random(&mut *rng))
63        .take(signers as usize)
64        .collect::<Vec<_>>();
65
66    let (initial_dkg_outcome, shares) = dkg::deal::<_, _, N3f1>(
67        &mut *rng,
68        Mode::NonZeroCounter,
69        ordered::Set::try_from_iter(signer_keys.iter().map(|key| key.public_key())).unwrap(),
70    )
71    .unwrap();
72
73    let onchain_dkg_outcome = OnchainDkgOutcome {
74        epoch: Epoch::zero(),
75        output: initial_dkg_outcome,
76        next_players: shares.keys().clone(),
77        is_next_full_dkg: false,
78    };
79
80    let verifier_keys = repeat_with(|| PrivateKey::random(&mut *rng))
81        .take(verifiers as usize)
82        .collect::<Vec<_>>();
83
84    let validators = ordered::Map::try_from_iter(
85        signer_keys
86            .into_iter()
87            .chain(verifier_keys)
88            .enumerate()
89            .map(|(i, private_key)| {
90                let public_key = private_key.public_key();
91                let config = ConsensusNodeConfig {
92                    address: crate::execution_runtime::validator(i as u32),
93                    ingress: SocketAddr::from(([127, 0, 0, (i + 1) as u8], 8000)),
94                    egress: SocketAddr::from(([127, 0, 0, (i + 1) as u8], 0)),
95                    fee_recipient,
96                    private_key,
97                    share: shares.get_value(&public_key).cloned(),
98                };
99                (public_key, config)
100            }),
101    )
102    .unwrap();
103
104    (onchain_dkg_outcome, validators)
105}
106
107/// Configuration for a validator.
108#[derive(Clone, Debug)]
109pub struct ConsensusNodeConfig {
110    pub address: Address,
111    pub ingress: SocketAddr,
112    pub egress: SocketAddr,
113    pub fee_recipient: Address,
114    pub private_key: PrivateKey,
115    pub share: Option<Share>,
116}
117
118/// The test setup run by [`run`].
119#[derive(Clone)]
120pub struct Setup {
121    /// How many signing validators to launch.
122    pub how_many_signers: u32,
123
124    /// How many non-signing validators (verifiers) to launch.
125    /// These nodes participate in consensus but don't have shares.
126    pub how_many_verifiers: u32,
127
128    /// The seed used for setting up the deterministic runtime.
129    pub seed: u64,
130
131    /// The linkage between individual validators.
132    pub linkage: Link,
133
134    /// The number of heights in an epoch.
135    pub epoch_length: u64,
136
137    /// The amount of time the node waits for the execution layer to return
138    /// a build a payload.
139    pub new_payload_wait_time: Duration,
140
141    /// The t4 hardfork time.
142    ///
143    /// Default: `None` (not activated).
144    pub t4_time: Option<u64>,
145
146    /// Whether to activate subblocks building.
147    pub with_subblocks: bool,
148
149    /// The fee recipient written into the V2 contract for each validator.
150    pub fee_recipient: Address,
151}
152
153impl Setup {
154    pub fn new() -> Self {
155        Self {
156            how_many_signers: 4,
157            how_many_verifiers: 0,
158            seed: 0,
159            linkage: Link {
160                latency: Duration::from_millis(10),
161                jitter: Duration::from_millis(1),
162                success_rate: 1.0,
163            },
164            epoch_length: 20,
165            new_payload_wait_time: Duration::from_millis(300),
166            t4_time: None,
167            with_subblocks: false,
168            fee_recipient: Address::ZERO,
169        }
170    }
171
172    pub fn how_many_signers(self, how_many_signers: u32) -> Self {
173        Self {
174            how_many_signers,
175            ..self
176        }
177    }
178
179    pub fn how_many_verifiers(self, how_many_verifiers: u32) -> Self {
180        Self {
181            how_many_verifiers,
182            ..self
183        }
184    }
185
186    pub fn seed(self, seed: u64) -> Self {
187        Self { seed, ..self }
188    }
189
190    pub fn linkage(self, linkage: Link) -> Self {
191        Self { linkage, ..self }
192    }
193
194    pub fn epoch_length(self, epoch_length: u64) -> Self {
195        Self {
196            epoch_length,
197            ..self
198        }
199    }
200
201    pub fn new_payload_wait_time(self, new_payload_wait_time: Duration) -> Self {
202        Self {
203            new_payload_wait_time,
204            ..self
205        }
206    }
207
208    pub fn subblocks(self, with_subblocks: bool) -> Self {
209        Self {
210            with_subblocks,
211            ..self
212        }
213    }
214
215    pub fn fee_recipient(self, fee_recipient: Address) -> Self {
216        Self {
217            fee_recipient,
218            ..self
219        }
220    }
221
222    pub fn t4_time(self, t4_time: u64) -> Self {
223        Self {
224            t4_time: Some(t4_time),
225            ..self
226        }
227    }
228}
229
230impl Default for Setup {
231    fn default() -> Self {
232        Self::new()
233    }
234}
235
236/// Sets up validators and returns the nodes and execution runtime.
237///
238/// The execution runtime is created internally with a chainspec configured
239/// according to the Setup parameters (epoch_length, validators, polynomial).
240///
241/// The oracle is accessible via `TestingNode::oracle()` if needed for dynamic linking.
242pub async fn setup_validators(
243    context: &mut Context,
244    Setup {
245        epoch_length,
246        how_many_signers,
247        how_many_verifiers,
248        linkage,
249        new_payload_wait_time,
250        t4_time,
251        with_subblocks,
252        fee_recipient,
253        ..
254    }: Setup,
255) -> (Vec<TestingNode<Context>>, ExecutionRuntime) {
256    let (network, mut oracle) = Network::new(
257        context.with_label("network"),
258        simulated::Config {
259            max_size: 1024 * 1024,
260            disconnect_on_block: true,
261            tracked_peer_sets: commonware_utils::NZUsize!(3),
262        },
263    );
264    network.start();
265
266    let (onchain_dkg_outcome, validators) = generate_consensus_node_config(
267        context,
268        how_many_signers,
269        how_many_verifiers,
270        fee_recipient,
271    );
272
273    let execution_runtime = ExecutionRuntime::builder()
274        .with_epoch_length(epoch_length)
275        .with_initial_dkg_outcome(onchain_dkg_outcome)
276        .with_t4_time(t4_time)
277        .with_validators(validators.clone())
278        .launch()
279        .unwrap();
280
281    let execution_configs = ExecutionNodeConfig::generator()
282        .with_count(how_many_signers + how_many_verifiers)
283        .generate();
284
285    let mut nodes = vec![];
286
287    for ((public_key, consensus_node_config), mut execution_config) in
288        validators.into_iter().zip_eq(execution_configs)
289    {
290        let ConsensusNodeConfig {
291            address,
292            ingress,
293            private_key,
294            share,
295            ..
296        } = consensus_node_config;
297        let oracle = oracle.clone();
298        let uid = format!("{CONSENSUS_NODE_PREFIX}_{public_key}");
299        let feed_state = FeedStateHandle::new();
300
301        execution_config.validator_key = Some(public_key.encode().as_ref().try_into().unwrap());
302        execution_config.feed_state = Some(feed_state.clone());
303
304        let engine_config = consensus::Builder {
305            execution_node: None,
306            blocker: oracle.control(private_key.public_key()),
307            peer_manager: oracle.socket_manager(),
308            partition_prefix: uid.clone(),
309            share,
310            signer: private_key.clone(),
311            mailbox_size: 1024,
312            deque_size: 10,
313            time_to_propose: Duration::from_secs(2),
314            time_to_collect_notarizations: Duration::from_secs(3),
315            time_to_retry_nullify_broadcast: Duration::from_secs(10),
316            time_for_peer_response: Duration::from_secs(2),
317            views_to_track: 10,
318            views_until_leader_skip: 5,
319            payload_interrupt_time: Duration::from_millis(200),
320            new_payload_wait_time,
321            time_to_build_subblock: Duration::from_millis(100),
322            subblock_broadcast_interval: Duration::from_millis(50),
323            fcu_heartbeat_interval: Duration::from_secs(3),
324            feed_state,
325            with_subblocks,
326        };
327
328        nodes.push(TestingNode::new(
329            uid,
330            private_key,
331            oracle.clone(),
332            engine_config,
333            execution_runtime.handle(),
334            execution_config,
335            ingress,
336            address,
337        ));
338    }
339
340    link_validators(&mut oracle, &nodes, linkage, None).await;
341
342    (nodes, execution_runtime)
343}
344
345/// Runs a test configured by [`Setup`].
346pub fn run(setup: Setup, mut stop_condition: impl FnMut(&str, &str) -> bool) -> String {
347    let cfg = deterministic::Config::default().with_seed(setup.seed);
348    let executor = Runner::from(cfg);
349
350    executor.start(|mut context| async move {
351        // Setup and run all validators.
352        let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup.clone()).await;
353        join_all(nodes.iter_mut().map(|node| node.start(&context))).await;
354
355        loop {
356            let metrics = context.encode();
357
358            let mut success = false;
359            for line in metrics.lines() {
360                if !line.starts_with(CONSENSUS_NODE_PREFIX) {
361                    continue;
362                }
363
364                let mut parts = line.split_whitespace();
365                let metric = parts.next().unwrap();
366                let value = parts.next().unwrap();
367
368                if metric.ends_with("_peers_blocked") {
369                    let value = value.parse::<u64>().unwrap();
370                    assert_eq!(value, 0);
371                }
372
373                if stop_condition(metric, value) {
374                    success = true;
375                    break;
376                }
377            }
378
379            if success {
380                break;
381            }
382
383            context.sleep(Duration::from_secs(1)).await;
384        }
385
386        context.auditor().state()
387    })
388}
389
390/// Connects a running node to a set of peers
391///
392/// Useful when a node is restarted and needs to re-connect to its previous peers as
393/// ports are not statically defined.
394pub async fn connect_execution_to_peers<TClock: commonware_runtime::Clock>(
395    node: &TestingNode<TClock>,
396    nodes: &[TestingNode<TClock>],
397) {
398    for other in nodes {
399        if node.public_key() == other.public_key() {
400            continue;
401        }
402
403        if let (Some(a), Some(b)) = (node.execution_node.as_ref(), other.execution_node.as_ref()) {
404            a.connect_peer(b).await;
405        }
406    }
407}
408
409/// Connects all running execution nodes as peers.
410///
411/// This must be called after nodes are started so that the ports are known
412pub async fn connect_execution_peers<TClock: commonware_runtime::Clock>(
413    nodes: &[TestingNode<TClock>],
414) {
415    for i in 0..nodes.len() {
416        connect_execution_to_peers(&nodes[i], &nodes[(i + 1)..]).await;
417    }
418}
419
420/// Links (or unlinks) validators using the oracle.
421///
422/// The `restrict_to` function can be used to restrict the linking to certain connections,
423/// otherwise all validators will be linked to all other validators.
424pub async fn link_validators<TClock: commonware_runtime::Clock>(
425    oracle: &mut Oracle<PublicKey, TClock>,
426    validators: &[TestingNode<TClock>],
427    link: Link,
428    restrict_to: Option<fn(usize, usize, usize) -> bool>,
429) {
430    for (i1, v1) in validators.iter().enumerate() {
431        for (i2, v2) in validators.iter().enumerate() {
432            // Ignore self
433            if v1.public_key() == v2.public_key() {
434                continue;
435            }
436
437            // Restrict to certain connections
438            if let Some(f) = restrict_to
439                && !f(validators.len(), i1, i2)
440            {
441                continue;
442            }
443
444            // Add link
445            match oracle
446                .add_link(
447                    v1.public_key().clone(),
448                    v2.public_key().clone(),
449                    link.clone(),
450                )
451                .await
452            {
453                Ok(()) => (),
454                // TODO: it should be possible to remove the below if Commonware simulated network exposes list of registered peers.
455                //
456                // This is fine because some of the peers might be registered later
457                Err(commonware_p2p::simulated::Error::PeerMissing) => (),
458                // This is fine because we might call this multiple times as peers are joining the network.
459                Err(commonware_p2p::simulated::Error::LinkExists) => (),
460                res @ Err(_) => res.unwrap(),
461            }
462        }
463    }
464}
465
466/// Get the number of pipeline runs from the Prometheus metrics recorder
467pub fn get_pipeline_runs(recorder: &PrometheusRecorder) -> u64 {
468    recorder
469        .handle()
470        .render()
471        .lines()
472        .find(|line| line.starts_with("reth_consensus_engine_beacon_pipeline_runs"))
473        .and_then(|line| line.split_whitespace().nth(1)?.parse().ok())
474        .unwrap_or(0)
475}