1#![cfg_attr(not(test), warn(unused_crate_dependencies))]
11#![cfg_attr(docsrs, feature(doc_cfg))]
12
13use std::{net::SocketAddr, time::Duration};
14
15use commonware_cryptography::{
16 PrivateKeyExt as _, Signer as _,
17 bls12381::{dkg::ops, primitives::variant::MinSig},
18 ed25519::{PrivateKey, PublicKey},
19};
20use commonware_p2p::simulated::{self, Link, Network, Oracle};
21
22use commonware_runtime::{
23 Clock, Metrics as _, Runner as _,
24 deterministic::{self, Context, Runner},
25};
26use commonware_utils::{SystemTimeExt as _, quorum, set::OrderedAssociated};
27use futures::future::join_all;
28use itertools::Itertools as _;
29use reth_node_metrics::recorder::PrometheusRecorder;
30use tempo_commonware_node::consensus;
31
32pub mod execution_runtime;
33pub use execution_runtime::ExecutionNodeConfig;
34pub mod testing_node;
35pub use execution_runtime::ExecutionRuntime;
36pub use testing_node::TestingNode;
37
38#[cfg(test)]
39mod tests;
40
/// Prefix of the label (and storage partition prefix) given to every consensus node;
/// the full identifier is `"{CONSENSUS_NODE_PREFIX}-{public_key}"`.
pub const CONSENSUS_NODE_PREFIX: &str = "consensus";

/// Prefix used to name execution-layer nodes.
// NOTE(review): not referenced in this file; presumably consumed by the
// `execution_runtime` / `testing_node` modules — confirm.
pub const EXECUTION_NODE_PREFIX: &str = "execution";
43
44#[derive(Clone)]
46pub struct Setup {
47 pub how_many_signers: u32,
49
50 pub how_many_verifiers: u32,
53
54 pub seed: u64,
56
57 pub linkage: Link,
59
60 pub epoch_length: u64,
62
63 pub connect_execution_layer_nodes: bool,
65
66 pub allegretto_time: Option<u64>,
70
71 pub allegretto_in_seconds: Option<u64>,
77
78 pub no_validators_in_genesis: bool,
80}
81
82impl Setup {
83 pub fn new() -> Self {
84 Self {
85 how_many_signers: 4,
86 how_many_verifiers: 0,
87 seed: 0,
88 linkage: Link {
89 latency: Duration::from_millis(10),
90 jitter: Duration::from_millis(1),
91 success_rate: 1.0,
92 },
93 epoch_length: 20,
94 connect_execution_layer_nodes: false,
95 allegretto_time: None,
96 allegretto_in_seconds: None,
97 no_validators_in_genesis: false,
98 }
99 }
100
101 pub fn how_many_signers(self, how_many_signers: u32) -> Self {
102 Self {
103 how_many_signers,
104 ..self
105 }
106 }
107
108 pub fn how_many_verifiers(self, how_many_verifiers: u32) -> Self {
109 Self {
110 how_many_verifiers,
111 ..self
112 }
113 }
114
115 pub fn seed(self, seed: u64) -> Self {
116 Self { seed, ..self }
117 }
118
119 pub fn linkage(self, linkage: Link) -> Self {
120 Self { linkage, ..self }
121 }
122
123 pub fn epoch_length(self, epoch_length: u64) -> Self {
124 Self {
125 epoch_length,
126 ..self
127 }
128 }
129
130 pub fn connect_execution_layer_nodes(self, connect_execution_layer_nodes: bool) -> Self {
131 Self {
132 connect_execution_layer_nodes,
133 ..self
134 }
135 }
136
137 pub fn allegretto_in_seconds(self, seconds: u64) -> Self {
141 Self {
142 allegretto_in_seconds: Some(seconds),
143 ..self
144 }
145 }
146
147 pub fn allegretto_time(self, allegretto_time: u64) -> Self {
154 Self {
155 allegretto_time: Some(allegretto_time),
156 ..self
157 }
158 }
159
160 pub fn no_validators_in_genesis(self) -> Self {
161 Self {
162 no_validators_in_genesis: true,
163 ..self
164 }
165 }
166}
167
168impl Default for Setup {
169 fn default() -> Self {
170 Self::new()
171 }
172}
173
174pub async fn setup_validators(
181 mut context: Context,
182 Setup {
183 how_many_signers,
184 how_many_verifiers,
185 seed,
186 connect_execution_layer_nodes,
187 linkage,
188 epoch_length,
189 allegretto_in_seconds,
190 allegretto_time,
191 no_validators_in_genesis,
192 }: Setup,
193) -> (Vec<TestingNode>, ExecutionRuntime) {
194 let (network, mut oracle) = Network::new(
195 context.with_label("network"),
196 simulated::Config {
197 max_size: 1024 * 1024,
198 disconnect_on_block: true,
199 tracked_peer_sets: Some(3),
200 },
201 );
202 network.start();
203
204 let mut private_keys = Vec::new();
205
206 for i in 0..(how_many_signers + how_many_verifiers) {
207 let signer = PrivateKey::from_seed(seed + u64::from(i));
208 private_keys.push(signer);
209 }
210 private_keys.sort_by_key(|s| s.public_key());
211
212 let threshold = quorum(how_many_signers);
213 let (polynomial, shares) =
214 ops::generate_shares::<_, MinSig>(&mut context, None, how_many_signers, threshold);
215
216 let mut nodes = Vec::new();
217
218 let peers: OrderedAssociated<_, _> = private_keys
222 .iter()
223 .take(how_many_signers as usize)
224 .cloned()
225 .enumerate()
226 .map(|(i, signer)| {
227 (
228 signer.public_key(),
229 SocketAddr::from(([127, 0, 0, 1], i as u16 + 1)),
230 )
231 })
232 .collect::<Vec<_>>()
233 .into();
234
235 let allegretto_time = match (allegretto_time, allegretto_in_seconds) {
236 (Some(_), Some(_)) => {
237 panic!("allegretto_time and allegretto_in_seconds are mutually exclusive")
238 }
239 (time @ Some(_), None) => time,
240 (None, Some(secs)) => Some(context.current().epoch().as_secs() + secs),
241 (None, None) => None,
242 };
243
244 let execution_runtime = ExecutionRuntime::builder()
245 .with_epoch_length(epoch_length)
246 .with_public_polynomial(polynomial)
247 .with_validators(peers)
248 .set_allegretto_time(allegretto_time)
249 .set_write_validators_into_genesis(!no_validators_in_genesis)
250 .launch()
251 .unwrap();
252
253 let shares: Vec<_> = shares
255 .into_iter()
256 .map(Some)
257 .chain(std::iter::repeat_n(None, how_many_verifiers as usize))
258 .collect();
259
260 let execution_configs = ExecutionNodeConfig::generator()
261 .with_count(how_many_signers + how_many_verifiers)
262 .with_peers(connect_execution_layer_nodes)
263 .generate();
264
265 for ((private_key, share), execution_config) in private_keys
266 .into_iter()
267 .zip_eq(shares)
268 .zip_eq(execution_configs)
269 {
270 let oracle = oracle.clone();
271 let uid = format!("{CONSENSUS_NODE_PREFIX}-{}", private_key.public_key());
272
273 let engine_config = consensus::Builder {
274 context: context.with_label(&uid),
275 fee_recipient: alloy_primitives::Address::ZERO,
276 execution_node: None,
277 blocker: oracle.control(private_key.public_key()),
278 peer_manager: oracle.socket_manager(),
279 partition_prefix: uid.clone(),
280 share,
281 signer: private_key.clone(),
282 mailbox_size: 1024,
283 deque_size: 10,
284 time_to_propose: Duration::from_secs(2),
285 time_to_collect_notarizations: Duration::from_secs(3),
286 time_to_retry_nullify_broadcast: Duration::from_secs(10),
287 time_for_peer_response: Duration::from_secs(2),
288 views_to_track: 10,
289 views_until_leader_skip: 5,
290 new_payload_wait_time: Duration::from_millis(200),
291 time_to_build_subblock: Duration::from_millis(100),
292 subblock_broadcast_interval: Duration::from_millis(50),
293 };
294
295 nodes.push(TestingNode::new(
296 uid,
297 private_key.public_key(),
298 oracle.clone(),
299 engine_config,
300 execution_runtime.handle(),
301 execution_config,
302 ));
303 }
304
305 link_validators(&mut oracle, &nodes, linkage, None).await;
306
307 (nodes, execution_runtime)
308}
309
310pub fn run(setup: Setup, mut stop_condition: impl FnMut(&str, &str) -> bool) -> String {
312 let cfg = deterministic::Config::default().with_seed(setup.seed);
313 let executor = Runner::from(cfg);
314
315 executor.start(|context| async move {
316 let (mut nodes, _execution_runtime) = setup_validators(context.clone(), setup).await;
318
319 join_all(nodes.iter_mut().map(|node| node.start())).await;
320
321 let pat = format!("{CONSENSUS_NODE_PREFIX}-");
322 loop {
323 let metrics = context.encode();
324
325 let mut success = false;
326 for line in metrics.lines() {
327 if !line.starts_with(&pat) {
328 continue;
329 }
330
331 let mut parts = line.split_whitespace();
332 let metric = parts.next().unwrap();
333 let value = parts.next().unwrap();
334
335 if metrics.ends_with("_peers_blocked") {
336 let value = value.parse::<u64>().unwrap();
337 assert_eq!(value, 0);
338 }
339
340 if stop_condition(metric, value) {
341 success = true;
342 break;
343 }
344 }
345
346 if success {
347 break;
348 }
349
350 context.sleep(Duration::from_secs(1)).await;
351 }
352
353 context.auditor().state()
354 })
355}
356
357pub async fn link_validators(
362 oracle: &mut Oracle<PublicKey>,
363 validators: &[TestingNode],
364 link: Link,
365 restrict_to: Option<fn(usize, usize, usize) -> bool>,
366) {
367 for (i1, v1) in validators.iter().enumerate() {
368 for (i2, v2) in validators.iter().enumerate() {
369 if v1.public_key() == v2.public_key() {
371 continue;
372 }
373
374 if let Some(f) = restrict_to
376 && !f(validators.len(), i1, i2)
377 {
378 continue;
379 }
380
381 match oracle
383 .add_link(
384 v1.public_key().clone(),
385 v2.public_key().clone(),
386 link.clone(),
387 )
388 .await
389 {
390 Ok(()) => (),
391 Err(commonware_p2p::simulated::Error::PeerMissing) => (),
395 Err(commonware_p2p::simulated::Error::LinkExists) => (),
397 res @ Err(_) => res.unwrap(),
398 }
399 }
400 }
401}
402
403pub fn get_pipeline_runs(recorder: &PrometheusRecorder) -> u64 {
405 recorder
406 .handle()
407 .render()
408 .lines()
409 .find(|line| line.starts_with("reth_consensus_engine_beacon_pipeline_runs"))
410 .and_then(|line| line.split_whitespace().nth(1)?.parse().ok())
411 .unwrap_or(0)
412}