tempo_e2e/
lib.rs

1//! e2e tests using the [`commonware_runtime::deterministic`].
2//!
3//! This crate mimics how a full tempo node is run in production but runs the
4//! consensus engine in a deterministic runtime while maintaining a tokio
5//! async environment to launch execution nodes.
6//!
//! All definitions herein are only intended to support the tests defined
8//! in tests/.
9
10#![cfg_attr(not(test), warn(unused_crate_dependencies))]
11#![cfg_attr(docsrs, feature(doc_cfg))]
12
13use std::{net::SocketAddr, time::Duration};
14
15use commonware_cryptography::{
16    PrivateKeyExt as _, Signer as _,
17    bls12381::{dkg::ops, primitives::variant::MinSig},
18    ed25519::{PrivateKey, PublicKey},
19};
20use commonware_p2p::simulated::{self, Link, Network, Oracle};
21
22use commonware_runtime::{
23    Clock, Metrics as _, Runner as _,
24    deterministic::{self, Context, Runner},
25};
26use commonware_utils::{SystemTimeExt as _, quorum, set::OrderedAssociated};
27use futures::future::join_all;
28use itertools::Itertools as _;
29use reth_node_metrics::recorder::PrometheusRecorder;
30use tempo_commonware_node::consensus;
31
32pub mod execution_runtime;
33pub use execution_runtime::ExecutionNodeConfig;
34pub mod testing_node;
35pub use execution_runtime::ExecutionRuntime;
36pub use testing_node::TestingNode;
37
38#[cfg(test)]
39mod tests;
40
41pub const CONSENSUS_NODE_PREFIX: &str = "consensus";
42pub const EXECUTION_NODE_PREFIX: &str = "execution";
43
44/// The test setup run by [`run`].
45#[derive(Clone)]
46pub struct Setup {
47    /// How many signing validators to launch.
48    pub how_many_signers: u32,
49
50    /// How many non-signing validators (verifiers) to launch.
51    /// These nodes participate in consensus but don't have shares.
52    pub how_many_verifiers: u32,
53
54    /// The seed used for setting up the deterministic runtime.
55    pub seed: u64,
56
57    /// The linkage between individual validators.
58    pub linkage: Link,
59
60    /// The number of heights in an epoch.
61    pub epoch_length: u64,
62
63    /// Whether to connect execution layer nodes directly.
64    pub connect_execution_layer_nodes: bool,
65
66    /// A specific value to set allegretto_time to in chainspec.
67    ///
68    /// Mutually exclusive with `allegretto_in_seconds`.
69    pub allegretto_time: Option<u64>,
70
71    /// The value to add to the current time (the system time the
72    /// test is run at), which will be used for allegretto_time in
73    /// chainspec.
74    ///
75    /// Mutually exclusive with `allegretto_in_seconds`.
76    pub allegretto_in_seconds: Option<u64>,
77
78    /// Whether validators should be written into the genesis block.
79    pub no_validators_in_genesis: bool,
80}
81
82impl Setup {
83    pub fn new() -> Self {
84        Self {
85            how_many_signers: 4,
86            how_many_verifiers: 0,
87            seed: 0,
88            linkage: Link {
89                latency: Duration::from_millis(10),
90                jitter: Duration::from_millis(1),
91                success_rate: 1.0,
92            },
93            epoch_length: 20,
94            connect_execution_layer_nodes: false,
95            allegretto_time: None,
96            allegretto_in_seconds: None,
97            no_validators_in_genesis: false,
98        }
99    }
100
101    pub fn how_many_signers(self, how_many_signers: u32) -> Self {
102        Self {
103            how_many_signers,
104            ..self
105        }
106    }
107
108    pub fn how_many_verifiers(self, how_many_verifiers: u32) -> Self {
109        Self {
110            how_many_verifiers,
111            ..self
112        }
113    }
114
115    pub fn seed(self, seed: u64) -> Self {
116        Self { seed, ..self }
117    }
118
119    pub fn linkage(self, linkage: Link) -> Self {
120        Self { linkage, ..self }
121    }
122
123    pub fn epoch_length(self, epoch_length: u64) -> Self {
124        Self {
125            epoch_length,
126            ..self
127        }
128    }
129
130    pub fn connect_execution_layer_nodes(self, connect_execution_layer_nodes: bool) -> Self {
131        Self {
132            connect_execution_layer_nodes,
133            ..self
134        }
135    }
136
137    /// Instructs setup to set chainspec allegretto time to `seconds` from now.
138    ///
139    /// Do not provide `allegretto_time` together with this option.
140    pub fn allegretto_in_seconds(self, seconds: u64) -> Self {
141        Self {
142            allegretto_in_seconds: Some(seconds),
143            ..self
144        }
145    }
146
147    /// Sets `allegretto_time`.
148    ///
149    /// If the allegretto hardfork is supposed to be active at genesis, pass
150    /// `allegretto_time = 0`.
151    ///
152    /// Do not provide `allegretto_in_seconds` together with this option.
153    pub fn allegretto_time(self, allegretto_time: u64) -> Self {
154        Self {
155            allegretto_time: Some(allegretto_time),
156            ..self
157        }
158    }
159
160    pub fn no_validators_in_genesis(self) -> Self {
161        Self {
162            no_validators_in_genesis: true,
163            ..self
164        }
165    }
166}
167
168impl Default for Setup {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174/// Sets up validators and returns the nodes and execution runtime.
175///
176/// The execution runtime is created internally with a chainspec configured
177/// according to the Setup parameters (epoch_length, allegretto, validators, polynomial).
178///
179/// The oracle is accessible via `TestingNode::oracle()` if needed for dynamic linking.
180pub async fn setup_validators(
181    mut context: Context,
182    Setup {
183        how_many_signers,
184        how_many_verifiers,
185        seed,
186        connect_execution_layer_nodes,
187        linkage,
188        epoch_length,
189        allegretto_in_seconds,
190        allegretto_time,
191        no_validators_in_genesis,
192    }: Setup,
193) -> (Vec<TestingNode>, ExecutionRuntime) {
194    let (network, mut oracle) = Network::new(
195        context.with_label("network"),
196        simulated::Config {
197            max_size: 1024 * 1024,
198            disconnect_on_block: true,
199            tracked_peer_sets: Some(3),
200        },
201    );
202    network.start();
203
204    let mut private_keys = Vec::new();
205
206    for i in 0..(how_many_signers + how_many_verifiers) {
207        let signer = PrivateKey::from_seed(seed + u64::from(i));
208        private_keys.push(signer);
209    }
210    private_keys.sort_by_key(|s| s.public_key());
211
212    let threshold = quorum(how_many_signers);
213    let (polynomial, shares) =
214        ops::generate_shares::<_, MinSig>(&mut context, None, how_many_signers, threshold);
215
216    let mut nodes = Vec::new();
217
218    // The actual port here does not matter because in the simulated p2p
219    // oracle it will be ignored. But it's nice because the nodes can be
220    // more easily identified in some logs..
221    let peers: OrderedAssociated<_, _> = private_keys
222        .iter()
223        .take(how_many_signers as usize)
224        .cloned()
225        .enumerate()
226        .map(|(i, signer)| {
227            (
228                signer.public_key(),
229                SocketAddr::from(([127, 0, 0, 1], i as u16 + 1)),
230            )
231        })
232        .collect::<Vec<_>>()
233        .into();
234
235    let allegretto_time = match (allegretto_time, allegretto_in_seconds) {
236        (Some(_), Some(_)) => {
237            panic!("allegretto_time and allegretto_in_seconds are mutually exclusive")
238        }
239        (time @ Some(_), None) => time,
240        (None, Some(secs)) => Some(context.current().epoch().as_secs() + secs),
241        (None, None) => None,
242    };
243
244    let execution_runtime = ExecutionRuntime::builder()
245        .with_epoch_length(epoch_length)
246        .with_public_polynomial(polynomial)
247        .with_validators(peers)
248        .set_allegretto_time(allegretto_time)
249        .set_write_validators_into_genesis(!no_validators_in_genesis)
250        .launch()
251        .unwrap();
252
253    // Extend shares with None for verifiers
254    let shares: Vec<_> = shares
255        .into_iter()
256        .map(Some)
257        .chain(std::iter::repeat_n(None, how_many_verifiers as usize))
258        .collect();
259
260    let execution_configs = ExecutionNodeConfig::generator()
261        .with_count(how_many_signers + how_many_verifiers)
262        .with_peers(connect_execution_layer_nodes)
263        .generate();
264
265    for ((private_key, share), execution_config) in private_keys
266        .into_iter()
267        .zip_eq(shares)
268        .zip_eq(execution_configs)
269    {
270        let oracle = oracle.clone();
271        let uid = format!("{CONSENSUS_NODE_PREFIX}-{}", private_key.public_key());
272
273        let engine_config = consensus::Builder {
274            context: context.with_label(&uid),
275            fee_recipient: alloy_primitives::Address::ZERO,
276            execution_node: None,
277            blocker: oracle.control(private_key.public_key()),
278            peer_manager: oracle.socket_manager(),
279            partition_prefix: uid.clone(),
280            share,
281            signer: private_key.clone(),
282            mailbox_size: 1024,
283            deque_size: 10,
284            time_to_propose: Duration::from_secs(2),
285            time_to_collect_notarizations: Duration::from_secs(3),
286            time_to_retry_nullify_broadcast: Duration::from_secs(10),
287            time_for_peer_response: Duration::from_secs(2),
288            views_to_track: 10,
289            views_until_leader_skip: 5,
290            new_payload_wait_time: Duration::from_millis(200),
291            time_to_build_subblock: Duration::from_millis(100),
292            subblock_broadcast_interval: Duration::from_millis(50),
293        };
294
295        nodes.push(TestingNode::new(
296            uid,
297            private_key.public_key(),
298            oracle.clone(),
299            engine_config,
300            execution_runtime.handle(),
301            execution_config,
302        ));
303    }
304
305    link_validators(&mut oracle, &nodes, linkage, None).await;
306
307    (nodes, execution_runtime)
308}
309
310/// Runs a test configured by [`Setup`].
311pub fn run(setup: Setup, mut stop_condition: impl FnMut(&str, &str) -> bool) -> String {
312    let cfg = deterministic::Config::default().with_seed(setup.seed);
313    let executor = Runner::from(cfg);
314
315    executor.start(|context| async move {
316        // Setup and run all validators.
317        let (mut nodes, _execution_runtime) = setup_validators(context.clone(), setup).await;
318
319        join_all(nodes.iter_mut().map(|node| node.start())).await;
320
321        let pat = format!("{CONSENSUS_NODE_PREFIX}-");
322        loop {
323            let metrics = context.encode();
324
325            let mut success = false;
326            for line in metrics.lines() {
327                if !line.starts_with(&pat) {
328                    continue;
329                }
330
331                let mut parts = line.split_whitespace();
332                let metric = parts.next().unwrap();
333                let value = parts.next().unwrap();
334
335                if metrics.ends_with("_peers_blocked") {
336                    let value = value.parse::<u64>().unwrap();
337                    assert_eq!(value, 0);
338                }
339
340                if stop_condition(metric, value) {
341                    success = true;
342                    break;
343                }
344            }
345
346            if success {
347                break;
348            }
349
350            context.sleep(Duration::from_secs(1)).await;
351        }
352
353        context.auditor().state()
354    })
355}
356
357/// Links (or unlinks) validators using the oracle.
358///
359/// The `restrict_to` function can be used to restrict the linking to certain connections,
360/// otherwise all validators will be linked to all other validators.
361pub async fn link_validators(
362    oracle: &mut Oracle<PublicKey>,
363    validators: &[TestingNode],
364    link: Link,
365    restrict_to: Option<fn(usize, usize, usize) -> bool>,
366) {
367    for (i1, v1) in validators.iter().enumerate() {
368        for (i2, v2) in validators.iter().enumerate() {
369            // Ignore self
370            if v1.public_key() == v2.public_key() {
371                continue;
372            }
373
374            // Restrict to certain connections
375            if let Some(f) = restrict_to
376                && !f(validators.len(), i1, i2)
377            {
378                continue;
379            }
380
381            // Add link
382            match oracle
383                .add_link(
384                    v1.public_key().clone(),
385                    v2.public_key().clone(),
386                    link.clone(),
387                )
388                .await
389            {
390                Ok(()) => (),
391                // TODO: it should be possible to remove the below if Commonware simulated network exposes list of registered peers.
392                //
393                // This is fine because some of the peers might be registered later
394                Err(commonware_p2p::simulated::Error::PeerMissing) => (),
395                // This is fine because we might call this multiple times as peers are joining the network.
396                Err(commonware_p2p::simulated::Error::LinkExists) => (),
397                res @ Err(_) => res.unwrap(),
398            }
399        }
400    }
401}
402
403/// Get the number of pipeline runs from the Prometheus metrics recorder
404pub fn get_pipeline_runs(recorder: &PrometheusRecorder) -> u64 {
405    recorder
406        .handle()
407        .render()
408        .lines()
409        .find(|line| line.starts_with("reth_consensus_engine_beacon_pipeline_runs"))
410        .and_then(|line| line.split_whitespace().nth(1)?.parse().ok())
411        .unwrap_or(0)
412}