1#![cfg_attr(not(test), warn(unused_crate_dependencies))]
11#![cfg_attr(docsrs, feature(doc_cfg))]
12
13use std::{iter::repeat_with, net::SocketAddr, time::Duration};
14
15use alloy::signers::k256::schnorr::CryptoRngCore;
16use alloy_primitives::Address;
17use commonware_consensus::types::Epoch;
18use commonware_cryptography::{
19 Signer as _,
20 bls12381::{
21 dkg::{self},
22 primitives::{group::Share, sharing::Mode},
23 },
24 ed25519::{PrivateKey, PublicKey},
25};
26use commonware_math::algebra::Random as _;
27use commonware_p2p::simulated::{self, Link, Network, Oracle};
28
29use commonware_codec::Encode;
30use commonware_runtime::{
31 Clock, Metrics as _, Runner as _,
32 deterministic::{self, Context, Runner},
33};
34use commonware_utils::{N3f1, TryFromIterator as _, ordered};
35use futures::future::join_all;
36use itertools::Itertools as _;
37use reth_node_metrics::recorder::PrometheusRecorder;
38use tempo_commonware_node::{consensus, feed::FeedStateHandle};
39
40pub mod execution_runtime;
41pub use execution_runtime::ExecutionNodeConfig;
42pub mod testing_node;
43pub use execution_runtime::ExecutionRuntime;
44use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
45pub use testing_node::TestingNode;
46
47#[cfg(test)]
48mod tests;
49
50pub const CONSENSUS_NODE_PREFIX: &str = "consensus";
51pub const EXECUTION_NODE_PREFIX: &str = "execution";
52
53fn generate_consensus_node_config(
54 rng: &mut impl CryptoRngCore,
55 signers: u32,
56 verifiers: u32,
57) -> (
58 OnchainDkgOutcome,
59 ordered::Map<PublicKey, ConsensusNodeConfig>,
60) {
61 let signer_keys = repeat_with(|| PrivateKey::random(&mut *rng))
62 .take(signers as usize)
63 .collect::<Vec<_>>();
64
65 let (initial_dkg_outcome, shares) = dkg::deal::<_, _, N3f1>(
66 &mut *rng,
67 Mode::NonZeroCounter,
68 ordered::Set::try_from_iter(signer_keys.iter().map(|key| key.public_key())).unwrap(),
69 )
70 .unwrap();
71
72 let onchain_dkg_outcome = OnchainDkgOutcome {
73 epoch: Epoch::zero(),
74 output: initial_dkg_outcome,
75 next_players: shares.keys().clone(),
76 is_next_full_dkg: false,
77 };
78
79 let verifier_keys = repeat_with(|| PrivateKey::random(&mut *rng))
80 .take(verifiers as usize)
81 .collect::<Vec<_>>();
82
83 let validators = ordered::Map::try_from_iter(
84 signer_keys
85 .into_iter()
86 .chain(verifier_keys)
87 .enumerate()
88 .map(|(i, private_key)| {
89 let public_key = private_key.public_key();
90 let config = ConsensusNodeConfig {
91 address: crate::execution_runtime::validator(i as u32),
92 ingress: SocketAddr::from(([127, 0, 0, (i + 1) as u8], 8000)),
93 egress: SocketAddr::from(([127, 0, 0, (i + 1) as u8], 0)),
94 fee_recipient: Address::ZERO,
95 private_key,
96 share: shares.get_value(&public_key).cloned(),
97 };
98 (public_key, config)
99 }),
100 )
101 .unwrap();
102
103 (onchain_dkg_outcome, validators)
104}
105
106#[derive(Clone, Debug)]
108pub struct ConsensusNodeConfig {
109 pub address: Address,
110 pub ingress: SocketAddr,
111 pub egress: SocketAddr,
112 pub fee_recipient: Address,
113 pub private_key: PrivateKey,
114 pub share: Option<Share>,
115}
116
117#[derive(Clone)]
119pub struct Setup {
120 pub how_many_signers: u32,
122
123 pub how_many_verifiers: u32,
126
127 pub seed: u64,
129
130 pub linkage: Link,
132
133 pub epoch_length: u64,
135
136 pub connect_execution_layer_nodes: bool,
138
139 pub new_payload_wait_time: Duration,
142
143 pub t2_time: u64,
149
150 pub with_subblocks: bool,
152}
153
154impl Setup {
155 pub fn new() -> Self {
156 Self {
157 how_many_signers: 4,
158 how_many_verifiers: 0,
159 seed: 0,
160 linkage: Link {
161 latency: Duration::from_millis(10),
162 jitter: Duration::from_millis(1),
163 success_rate: 1.0,
164 },
165 epoch_length: 20,
166 connect_execution_layer_nodes: false,
167 new_payload_wait_time: Duration::from_millis(300),
168 t2_time: 1,
169 with_subblocks: false,
170 }
171 }
172
173 pub fn how_many_signers(self, how_many_signers: u32) -> Self {
174 Self {
175 how_many_signers,
176 ..self
177 }
178 }
179
180 pub fn how_many_verifiers(self, how_many_verifiers: u32) -> Self {
181 Self {
182 how_many_verifiers,
183 ..self
184 }
185 }
186
187 pub fn seed(self, seed: u64) -> Self {
188 Self { seed, ..self }
189 }
190
191 pub fn linkage(self, linkage: Link) -> Self {
192 Self { linkage, ..self }
193 }
194
195 pub fn epoch_length(self, epoch_length: u64) -> Self {
196 Self {
197 epoch_length,
198 ..self
199 }
200 }
201
202 pub fn connect_execution_layer_nodes(self, connect_execution_layer_nodes: bool) -> Self {
203 Self {
204 connect_execution_layer_nodes,
205 ..self
206 }
207 }
208
209 pub fn new_payload_wait_time(self, new_payload_wait_time: Duration) -> Self {
210 Self {
211 new_payload_wait_time,
212 ..self
213 }
214 }
215
216 pub fn subblocks(self, with_subblocks: bool) -> Self {
217 Self {
218 with_subblocks,
219 ..self
220 }
221 }
222
223 pub fn t2_time(self, t2_time: u64) -> Self {
224 Self { t2_time, ..self }
225 }
226}
227
228impl Default for Setup {
229 fn default() -> Self {
230 Self::new()
231 }
232}
233
234pub async fn setup_validators(
241 context: &mut Context,
242 Setup {
243 epoch_length,
244 how_many_signers,
245 how_many_verifiers,
246 connect_execution_layer_nodes,
247 linkage,
248 new_payload_wait_time,
249 t2_time,
250 with_subblocks,
251 ..
252 }: Setup,
253) -> (Vec<TestingNode<Context>>, ExecutionRuntime) {
254 let (network, mut oracle) = Network::new(
255 context.with_label("network"),
256 simulated::Config {
257 max_size: 1024 * 1024,
258 disconnect_on_block: true,
259 tracked_peer_sets: Some(3),
260 },
261 );
262 network.start();
263
264 let (onchain_dkg_outcome, validators) =
265 generate_consensus_node_config(context, how_many_signers, how_many_verifiers);
266
267 let execution_runtime = ExecutionRuntime::builder()
268 .with_epoch_length(epoch_length)
269 .with_initial_dkg_outcome(onchain_dkg_outcome)
270 .with_t2_time(t2_time)
271 .with_validators(validators.clone())
272 .launch()
273 .unwrap();
274
275 let execution_configs = ExecutionNodeConfig::generator()
276 .with_count(how_many_signers + how_many_verifiers)
277 .with_peers(connect_execution_layer_nodes)
278 .generate();
279
280 let mut nodes = vec![];
281
282 for ((public_key, consensus_node_config), mut execution_config) in
283 validators.into_iter().zip_eq(execution_configs)
284 {
285 let ConsensusNodeConfig {
286 address,
287 ingress,
288 private_key,
289 share,
290 ..
291 } = consensus_node_config;
292 let oracle = oracle.clone();
293 let uid = format!("{CONSENSUS_NODE_PREFIX}_{public_key}");
294 let feed_state = FeedStateHandle::new();
295
296 execution_config.validator_key = Some(public_key.encode().as_ref().try_into().unwrap());
297 execution_config.feed_state = Some(feed_state.clone());
298
299 let engine_config = consensus::Builder {
300 fee_recipient: alloy_primitives::Address::ZERO,
301 execution_node: None,
302 blocker: oracle.control(private_key.public_key()),
303 peer_manager: oracle.socket_manager(),
304 partition_prefix: uid.clone(),
305 share,
306 signer: private_key.clone(),
307 mailbox_size: 1024,
308 deque_size: 10,
309 time_to_propose: Duration::from_secs(2),
310 time_to_collect_notarizations: Duration::from_secs(3),
311 time_to_retry_nullify_broadcast: Duration::from_secs(10),
312 time_for_peer_response: Duration::from_secs(2),
313 views_to_track: 10,
314 views_until_leader_skip: 5,
315 payload_interrupt_time: Duration::from_millis(200),
316 new_payload_wait_time,
317 time_to_build_subblock: Duration::from_millis(100),
318 subblock_broadcast_interval: Duration::from_millis(50),
319 fcu_heartbeat_interval: Duration::from_secs(3),
320 feed_state,
321 with_subblocks,
322 };
323
324 nodes.push(TestingNode::new(
325 uid,
326 private_key,
327 oracle.clone(),
328 engine_config,
329 execution_runtime.handle(),
330 execution_config,
331 ingress,
332 address,
333 ));
334 }
335
336 link_validators(&mut oracle, &nodes, linkage, None).await;
337
338 (nodes, execution_runtime)
339}
340
341pub fn run(setup: Setup, mut stop_condition: impl FnMut(&str, &str) -> bool) -> String {
343 let cfg = deterministic::Config::default().with_seed(setup.seed);
344 let executor = Runner::from(cfg);
345
346 executor.start(|mut context| async move {
347 let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup.clone()).await;
349
350 join_all(nodes.iter_mut().map(|node| node.start(&context))).await;
351
352 loop {
353 let metrics = context.encode();
354
355 let mut success = false;
356 for line in metrics.lines() {
357 if !line.starts_with(CONSENSUS_NODE_PREFIX) {
358 continue;
359 }
360
361 let mut parts = line.split_whitespace();
362 let metric = parts.next().unwrap();
363 let value = parts.next().unwrap();
364
365 if metric.ends_with("_peers_blocked") {
366 let value = value.parse::<u64>().unwrap();
367 assert_eq!(value, 0);
368 }
369
370 if setup.t2_time == 0 {
371 if metric.ends_with("_dkg_manager_read_players_from_v1_contract_total") {
372 assert_eq!(0, value.parse::<u64>().unwrap());
373 }
374 if metric.ends_with("_dkg_manager_syncing_players") {
375 assert_eq!(0, value.parse::<u64>().unwrap());
376 }
377 if metric.ends_with("_dkg_manager_read_re_dkg_epoch_from_v1_contract_total") {
378 assert_eq!(0, value.parse::<u64>().unwrap());
379 }
380 }
381
382 if stop_condition(metric, value) {
383 success = true;
384 break;
385 }
386 }
387
388 if success {
389 break;
390 }
391
392 context.sleep(Duration::from_secs(1)).await;
393 }
394
395 context.auditor().state()
396 })
397}
398
399pub async fn link_validators<TClock: commonware_runtime::Clock>(
404 oracle: &mut Oracle<PublicKey, TClock>,
405 validators: &[TestingNode<TClock>],
406 link: Link,
407 restrict_to: Option<fn(usize, usize, usize) -> bool>,
408) {
409 for (i1, v1) in validators.iter().enumerate() {
410 for (i2, v2) in validators.iter().enumerate() {
411 if v1.public_key() == v2.public_key() {
413 continue;
414 }
415
416 if let Some(f) = restrict_to
418 && !f(validators.len(), i1, i2)
419 {
420 continue;
421 }
422
423 match oracle
425 .add_link(
426 v1.public_key().clone(),
427 v2.public_key().clone(),
428 link.clone(),
429 )
430 .await
431 {
432 Ok(()) => (),
433 Err(commonware_p2p::simulated::Error::PeerMissing) => (),
437 Err(commonware_p2p::simulated::Error::LinkExists) => (),
439 res @ Err(_) => res.unwrap(),
440 }
441 }
442 }
443}
444
445pub fn get_pipeline_runs(recorder: &PrometheusRecorder) -> u64 {
447 recorder
448 .handle()
449 .render()
450 .lines()
451 .find(|line| line.starts_with("reth_consensus_engine_beacon_pipeline_runs"))
452 .and_then(|line| line.split_whitespace().nth(1)?.parse().ok())
453 .unwrap_or(0)
454}