1#![cfg_attr(not(test), warn(unused_crate_dependencies))]
11#![cfg_attr(docsrs, feature(doc_cfg))]
12
13use std::{iter::repeat_with, net::SocketAddr, time::Duration};
14
15use alloy_primitives::Address;
16use commonware_consensus::types::Epoch;
17use commonware_cryptography::{
18 Signer as _,
19 bls12381::{
20 dkg::{self},
21 primitives::{group::Share, sharing::Mode},
22 },
23 ed25519::{PrivateKey, PublicKey},
24};
25use commonware_math::algebra::Random as _;
26use commonware_p2p::simulated::{self, Link, Network, Oracle};
27
28use commonware_codec::Encode;
29use commonware_runtime::{
30 Clock, Metrics as _, Runner as _,
31 deterministic::{self, Context, Runner},
32};
33use commonware_utils::{N3f1, TryFromIterator as _, ordered};
34use futures::future::join_all;
35use itertools::Itertools as _;
36use rand_core::CryptoRngCore;
37use reth_node_metrics::recorder::PrometheusRecorder;
38use tempo_commonware_node::{consensus, feed::FeedStateHandle};
39
40pub mod execution_runtime;
41pub use execution_runtime::ExecutionNodeConfig;
42pub mod testing_node;
43pub use execution_runtime::ExecutionRuntime;
44use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
45pub use testing_node::TestingNode;
46
47#[cfg(test)]
48mod tests;
49
/// Prefix of the unique identifier given to every consensus node
/// (`{CONSENSUS_NODE_PREFIX}_{public_key}`).
///
/// [`run`] only inspects metric lines that start with this prefix.
pub const CONSENSUS_NODE_PREFIX: &str = "consensus";

/// Counterpart prefix for execution nodes.
///
/// NOTE(review): not referenced by the top-level items of this module;
/// presumably consumed by the `execution_runtime` / `testing_node` submodules.
pub const EXECUTION_NODE_PREFIX: &str = "execution";
52
53fn generate_consensus_node_config(
54 rng: &mut impl CryptoRngCore,
55 signers: u32,
56 verifiers: u32,
57 fee_recipient: Address,
58) -> (
59 OnchainDkgOutcome,
60 ordered::Map<PublicKey, ConsensusNodeConfig>,
61) {
62 let signer_keys = repeat_with(|| PrivateKey::random(&mut *rng))
63 .take(signers as usize)
64 .collect::<Vec<_>>();
65
66 let (initial_dkg_outcome, shares) = dkg::deal::<_, _, N3f1>(
67 &mut *rng,
68 Mode::NonZeroCounter,
69 ordered::Set::try_from_iter(signer_keys.iter().map(|key| key.public_key())).unwrap(),
70 )
71 .unwrap();
72
73 let onchain_dkg_outcome = OnchainDkgOutcome {
74 epoch: Epoch::zero(),
75 output: initial_dkg_outcome,
76 next_players: shares.keys().clone(),
77 is_next_full_dkg: false,
78 };
79
80 let verifier_keys = repeat_with(|| PrivateKey::random(&mut *rng))
81 .take(verifiers as usize)
82 .collect::<Vec<_>>();
83
84 let validators = ordered::Map::try_from_iter(
85 signer_keys
86 .into_iter()
87 .chain(verifier_keys)
88 .enumerate()
89 .map(|(i, private_key)| {
90 let public_key = private_key.public_key();
91 let config = ConsensusNodeConfig {
92 address: crate::execution_runtime::validator(i as u32),
93 ingress: SocketAddr::from(([127, 0, 0, (i + 1) as u8], 8000)),
94 egress: SocketAddr::from(([127, 0, 0, (i + 1) as u8], 0)),
95 fee_recipient,
96 private_key,
97 share: shares.get_value(&public_key).cloned(),
98 };
99 (public_key, config)
100 }),
101 )
102 .unwrap();
103
104 (onchain_dkg_outcome, validators)
105}
106
107#[derive(Clone, Debug)]
109pub struct ConsensusNodeConfig {
110 pub address: Address,
111 pub ingress: SocketAddr,
112 pub egress: SocketAddr,
113 pub fee_recipient: Address,
114 pub private_key: PrivateKey,
115 pub share: Option<Share>,
116}
117
118#[derive(Clone)]
120pub struct Setup {
121 pub how_many_signers: u32,
123
124 pub how_many_verifiers: u32,
127
128 pub seed: u64,
130
131 pub linkage: Link,
133
134 pub epoch_length: u64,
136
137 pub new_payload_wait_time: Duration,
140
141 pub t4_time: Option<u64>,
145
146 pub with_subblocks: bool,
148
149 pub fee_recipient: Address,
151}
152
153impl Setup {
154 pub fn new() -> Self {
155 Self {
156 how_many_signers: 4,
157 how_many_verifiers: 0,
158 seed: 0,
159 linkage: Link {
160 latency: Duration::from_millis(10),
161 jitter: Duration::from_millis(1),
162 success_rate: 1.0,
163 },
164 epoch_length: 20,
165 new_payload_wait_time: Duration::from_millis(300),
166 t4_time: None,
167 with_subblocks: false,
168 fee_recipient: Address::ZERO,
169 }
170 }
171
172 pub fn how_many_signers(self, how_many_signers: u32) -> Self {
173 Self {
174 how_many_signers,
175 ..self
176 }
177 }
178
179 pub fn how_many_verifiers(self, how_many_verifiers: u32) -> Self {
180 Self {
181 how_many_verifiers,
182 ..self
183 }
184 }
185
186 pub fn seed(self, seed: u64) -> Self {
187 Self { seed, ..self }
188 }
189
190 pub fn linkage(self, linkage: Link) -> Self {
191 Self { linkage, ..self }
192 }
193
194 pub fn epoch_length(self, epoch_length: u64) -> Self {
195 Self {
196 epoch_length,
197 ..self
198 }
199 }
200
201 pub fn new_payload_wait_time(self, new_payload_wait_time: Duration) -> Self {
202 Self {
203 new_payload_wait_time,
204 ..self
205 }
206 }
207
208 pub fn subblocks(self, with_subblocks: bool) -> Self {
209 Self {
210 with_subblocks,
211 ..self
212 }
213 }
214
215 pub fn fee_recipient(self, fee_recipient: Address) -> Self {
216 Self {
217 fee_recipient,
218 ..self
219 }
220 }
221
222 pub fn t4_time(self, t4_time: u64) -> Self {
223 Self {
224 t4_time: Some(t4_time),
225 ..self
226 }
227 }
228}
229
230impl Default for Setup {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236pub async fn setup_validators(
243 context: &mut Context,
244 Setup {
245 epoch_length,
246 how_many_signers,
247 how_many_verifiers,
248 linkage,
249 new_payload_wait_time,
250 t4_time,
251 with_subblocks,
252 fee_recipient,
253 ..
254 }: Setup,
255) -> (Vec<TestingNode<Context>>, ExecutionRuntime) {
256 let (network, mut oracle) = Network::new(
257 context.with_label("network"),
258 simulated::Config {
259 max_size: 1024 * 1024,
260 disconnect_on_block: true,
261 tracked_peer_sets: commonware_utils::NZUsize!(3),
262 },
263 );
264 network.start();
265
266 let (onchain_dkg_outcome, validators) = generate_consensus_node_config(
267 context,
268 how_many_signers,
269 how_many_verifiers,
270 fee_recipient,
271 );
272
273 let execution_runtime = ExecutionRuntime::builder()
274 .with_epoch_length(epoch_length)
275 .with_initial_dkg_outcome(onchain_dkg_outcome)
276 .with_t4_time(t4_time)
277 .with_validators(validators.clone())
278 .launch()
279 .unwrap();
280
281 let execution_configs = ExecutionNodeConfig::generator()
282 .with_count(how_many_signers + how_many_verifiers)
283 .generate();
284
285 let mut nodes = vec![];
286
287 for ((public_key, consensus_node_config), mut execution_config) in
288 validators.into_iter().zip_eq(execution_configs)
289 {
290 let ConsensusNodeConfig {
291 address,
292 ingress,
293 private_key,
294 share,
295 ..
296 } = consensus_node_config;
297 let oracle = oracle.clone();
298 let uid = format!("{CONSENSUS_NODE_PREFIX}_{public_key}");
299 let feed_state = FeedStateHandle::new();
300
301 execution_config.validator_key = Some(public_key.encode().as_ref().try_into().unwrap());
302 execution_config.feed_state = Some(feed_state.clone());
303
304 let engine_config = consensus::Builder {
305 execution_node: None,
306 blocker: oracle.control(private_key.public_key()),
307 peer_manager: oracle.socket_manager(),
308 partition_prefix: uid.clone(),
309 share,
310 signer: private_key.clone(),
311 mailbox_size: 1024,
312 deque_size: 10,
313 time_to_propose: Duration::from_secs(2),
314 time_to_collect_notarizations: Duration::from_secs(3),
315 time_to_retry_nullify_broadcast: Duration::from_secs(10),
316 time_for_peer_response: Duration::from_secs(2),
317 views_to_track: 10,
318 views_until_leader_skip: 5,
319 payload_interrupt_time: Duration::from_millis(200),
320 new_payload_wait_time,
321 time_to_build_subblock: Duration::from_millis(100),
322 subblock_broadcast_interval: Duration::from_millis(50),
323 fcu_heartbeat_interval: Duration::from_secs(3),
324 feed_state,
325 with_subblocks,
326 };
327
328 nodes.push(TestingNode::new(
329 uid,
330 private_key,
331 oracle.clone(),
332 engine_config,
333 execution_runtime.handle(),
334 execution_config,
335 ingress,
336 address,
337 ));
338 }
339
340 link_validators(&mut oracle, &nodes, linkage, None).await;
341
342 (nodes, execution_runtime)
343}
344
345pub fn run(setup: Setup, mut stop_condition: impl FnMut(&str, &str) -> bool) -> String {
347 let cfg = deterministic::Config::default().with_seed(setup.seed);
348 let executor = Runner::from(cfg);
349
350 executor.start(|mut context| async move {
351 let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup.clone()).await;
353 join_all(nodes.iter_mut().map(|node| node.start(&context))).await;
354
355 loop {
356 let metrics = context.encode();
357
358 let mut success = false;
359 for line in metrics.lines() {
360 if !line.starts_with(CONSENSUS_NODE_PREFIX) {
361 continue;
362 }
363
364 let mut parts = line.split_whitespace();
365 let metric = parts.next().unwrap();
366 let value = parts.next().unwrap();
367
368 if metric.ends_with("_peers_blocked") {
369 let value = value.parse::<u64>().unwrap();
370 assert_eq!(value, 0);
371 }
372
373 if stop_condition(metric, value) {
374 success = true;
375 break;
376 }
377 }
378
379 if success {
380 break;
381 }
382
383 context.sleep(Duration::from_secs(1)).await;
384 }
385
386 context.auditor().state()
387 })
388}
389
390pub async fn connect_execution_to_peers<TClock: commonware_runtime::Clock>(
395 node: &TestingNode<TClock>,
396 nodes: &[TestingNode<TClock>],
397) {
398 for other in nodes {
399 if node.public_key() == other.public_key() {
400 continue;
401 }
402
403 if let (Some(a), Some(b)) = (node.execution_node.as_ref(), other.execution_node.as_ref()) {
404 a.connect_peer(b).await;
405 }
406 }
407}
408
409pub async fn connect_execution_peers<TClock: commonware_runtime::Clock>(
413 nodes: &[TestingNode<TClock>],
414) {
415 for i in 0..nodes.len() {
416 connect_execution_to_peers(&nodes[i], &nodes[(i + 1)..]).await;
417 }
418}
419
420pub async fn link_validators<TClock: commonware_runtime::Clock>(
425 oracle: &mut Oracle<PublicKey, TClock>,
426 validators: &[TestingNode<TClock>],
427 link: Link,
428 restrict_to: Option<fn(usize, usize, usize) -> bool>,
429) {
430 for (i1, v1) in validators.iter().enumerate() {
431 for (i2, v2) in validators.iter().enumerate() {
432 if v1.public_key() == v2.public_key() {
434 continue;
435 }
436
437 if let Some(f) = restrict_to
439 && !f(validators.len(), i1, i2)
440 {
441 continue;
442 }
443
444 match oracle
446 .add_link(
447 v1.public_key().clone(),
448 v2.public_key().clone(),
449 link.clone(),
450 )
451 .await
452 {
453 Ok(()) => (),
454 Err(commonware_p2p::simulated::Error::PeerMissing) => (),
458 Err(commonware_p2p::simulated::Error::LinkExists) => (),
460 res @ Err(_) => res.unwrap(),
461 }
462 }
463 }
464}
465
466pub fn get_pipeline_runs(recorder: &PrometheusRecorder) -> u64 {
468 recorder
469 .handle()
470 .render()
471 .lines()
472 .find(|line| line.starts_with("reth_consensus_engine_beacon_pipeline_runs"))
473 .and_then(|line| line.split_whitespace().nth(1)?.parse().ok())
474 .unwrap_or(0)
475}