1#![cfg_attr(not(test), warn(unused_crate_dependencies))]
11#![cfg_attr(docsrs, feature(doc_cfg))]
12
13use std::{iter::repeat_with, net::SocketAddr, time::Duration};
14
15use alloy_primitives::Address;
16use commonware_consensus::types::Epoch;
17use commonware_cryptography::{
18 Signer as _,
19 bls12381::{
20 dkg::{self},
21 primitives::{group::Share, sharing::Mode},
22 },
23 ed25519::{PrivateKey, PublicKey},
24};
25use commonware_math::algebra::Random as _;
26use commonware_p2p::simulated::{self, Link, Network, Oracle};
27
28use commonware_codec::Encode;
29use commonware_runtime::{
30 Metrics as _, Runner as _,
31 deterministic::{self, Context, Runner},
32};
33use commonware_utils::{N3f1, TryFromIterator as _, ordered};
34use futures::future::join_all;
35use itertools::Itertools as _;
36use rand_core::CryptoRngCore;
37use reth_node_metrics::recorder::PrometheusRecorder;
38use tempo_consensus::{consensus, feed::FeedStateHandle};
39
40pub mod execution_runtime;
41pub mod metrics;
42pub use execution_runtime::ExecutionNodeConfig;
43pub mod testing_node;
44pub use execution_runtime::ExecutionRuntime;
45use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
46pub use testing_node::TestingNode;
47
48#[cfg(test)]
49mod tests;
50
/// Label prefix for consensus nodes.
///
/// `setup_validators` builds every node's uid as
/// `"{CONSENSUS_NODE_PREFIX}_{public_key}"`.
pub const CONSENSUS_NODE_PREFIX: &str = "consensus";
/// Label prefix for execution nodes.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// consumed by the submodules or by tests — confirm.
pub const EXECUTION_NODE_PREFIX: &str = "execution";
53
54fn generate_consensus_node_config(
55 rng: &mut impl CryptoRngCore,
56 signers: u32,
57 verifiers: u32,
58 fee_recipient: Address,
59) -> (
60 OnchainDkgOutcome,
61 ordered::Map<PublicKey, ConsensusNodeConfig>,
62) {
63 let signer_keys = repeat_with(|| PrivateKey::random(&mut *rng))
64 .take(signers as usize)
65 .collect::<Vec<_>>();
66
67 let (initial_dkg_outcome, shares) = dkg::deal::<_, _, N3f1>(
68 &mut *rng,
69 Mode::NonZeroCounter,
70 ordered::Set::try_from_iter(signer_keys.iter().map(|key| key.public_key())).unwrap(),
71 )
72 .unwrap();
73
74 let onchain_dkg_outcome = OnchainDkgOutcome {
75 epoch: Epoch::zero(),
76 output: initial_dkg_outcome,
77 next_players: shares.keys().clone(),
78 is_next_full_dkg: false,
79 };
80
81 let verifier_keys = repeat_with(|| PrivateKey::random(&mut *rng))
82 .take(verifiers as usize)
83 .collect::<Vec<_>>();
84
85 let validators = ordered::Map::try_from_iter(
86 signer_keys
87 .into_iter()
88 .chain(verifier_keys)
89 .enumerate()
90 .map(|(i, private_key)| {
91 let public_key = private_key.public_key();
92 let config = ConsensusNodeConfig {
93 address: crate::execution_runtime::validator(i as u32),
94 ingress: SocketAddr::from(([127, 0, 0, (i + 1) as u8], 8000)),
95 egress: SocketAddr::from(([127, 0, 0, (i + 1) as u8], 0)),
96 fee_recipient,
97 private_key,
98 share: shares.get_value(&public_key).cloned(),
99 };
100 (public_key, config)
101 }),
102 )
103 .unwrap();
104
105 (onchain_dkg_outcome, validators)
106}
107
108#[derive(Clone, Debug)]
110pub struct ConsensusNodeConfig {
111 pub address: Address,
112 pub ingress: SocketAddr,
113 pub egress: SocketAddr,
114 pub fee_recipient: Address,
115 pub private_key: PrivateKey,
116 pub share: Option<Share>,
117}
118
119#[derive(Clone)]
121pub struct Setup {
122 pub how_many_signers: u32,
124
125 pub how_many_verifiers: u32,
128
129 pub seed: u64,
131
132 pub linkage: Link,
134
135 pub epoch_length: u64,
137
138 pub proposal_return_budget: Duration,
140
141 pub with_subblocks: bool,
143
144 pub fee_recipient: Address,
146}
147
148impl Setup {
149 pub fn new() -> Self {
150 Self {
151 how_many_signers: 4,
152 how_many_verifiers: 0,
153 seed: 0,
154 linkage: Link {
155 latency: Duration::from_millis(10),
156 jitter: Duration::from_millis(1),
157 success_rate: 1.0,
158 },
159 epoch_length: 20,
160 proposal_return_budget: Duration::from_millis(300),
161 with_subblocks: false,
162 fee_recipient: Address::ZERO,
163 }
164 }
165
166 pub fn how_many_signers(self, how_many_signers: u32) -> Self {
167 Self {
168 how_many_signers,
169 ..self
170 }
171 }
172
173 pub fn how_many_verifiers(self, how_many_verifiers: u32) -> Self {
174 Self {
175 how_many_verifiers,
176 ..self
177 }
178 }
179
180 pub fn seed(self, seed: u64) -> Self {
181 Self { seed, ..self }
182 }
183
184 pub fn linkage(self, linkage: Link) -> Self {
185 Self { linkage, ..self }
186 }
187
188 pub fn epoch_length(self, epoch_length: u64) -> Self {
189 Self {
190 epoch_length,
191 ..self
192 }
193 }
194
195 pub fn proposal_return_budget(self, proposal_return_budget: Duration) -> Self {
196 Self {
197 proposal_return_budget,
198 ..self
199 }
200 }
201
202 pub fn subblocks(self, with_subblocks: bool) -> Self {
203 Self {
204 with_subblocks,
205 ..self
206 }
207 }
208
209 pub fn fee_recipient(self, fee_recipient: Address) -> Self {
210 Self {
211 fee_recipient,
212 ..self
213 }
214 }
215}
216
217impl Default for Setup {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
223pub async fn setup_validators(
230 context: &mut Context,
231 Setup {
232 epoch_length,
233 how_many_signers,
234 how_many_verifiers,
235 linkage,
236 proposal_return_budget,
237 with_subblocks,
238 fee_recipient,
239 ..
240 }: Setup,
241) -> (Vec<TestingNode<Context>>, ExecutionRuntime) {
242 let (network, mut oracle) = Network::new(
243 context.with_label("network"),
244 simulated::Config {
245 max_size: 1024 * 1024,
246 disconnect_on_block: true,
247 tracked_peer_sets: commonware_utils::NZUsize!(3),
248 },
249 );
250 network.start();
251
252 let (onchain_dkg_outcome, validators) = generate_consensus_node_config(
253 context,
254 how_many_signers,
255 how_many_verifiers,
256 fee_recipient,
257 );
258
259 let execution_runtime = ExecutionRuntime::builder()
260 .with_epoch_length(epoch_length)
261 .with_initial_dkg_outcome(onchain_dkg_outcome)
262 .with_validators(validators.clone())
263 .launch()
264 .unwrap();
265
266 let execution_configs = ExecutionNodeConfig::generator()
267 .with_count(how_many_signers + how_many_verifiers)
268 .generate();
269
270 let mut nodes = vec![];
271
272 for ((public_key, consensus_node_config), mut execution_config) in
273 validators.into_iter().zip_eq(execution_configs)
274 {
275 let ConsensusNodeConfig {
276 address,
277 ingress,
278 private_key,
279 share,
280 ..
281 } = consensus_node_config;
282 let oracle = oracle.clone();
283 let uid = format!("{CONSENSUS_NODE_PREFIX}_{public_key}");
284 let feed_state = FeedStateHandle::new();
285
286 execution_config.validator_key = Some(public_key.encode().as_ref().try_into().unwrap());
287 execution_config.feed_state = Some(feed_state.clone());
288
289 let engine_config = consensus::Builder {
290 execution_node: None,
291 blocker: oracle.control(private_key.public_key()),
292 peer_manager: oracle.socket_manager(),
293 partition_prefix: uid.clone(),
294 share,
295 signer: private_key.clone(),
296 mailbox_size: 1024,
297 deque_size: 10,
298 time_to_propose: Duration::from_secs(2),
299 time_to_collect_notarizations: Duration::from_secs(3),
300 time_to_retry_nullify_broadcast: Duration::from_secs(10),
301 time_for_peer_response: Duration::from_secs(2),
302 views_to_track: 10,
303 views_until_leader_skip: 5,
304 proposal_return_budget,
305 time_to_build_subblock: Duration::from_millis(100),
306 subblock_broadcast_interval: Duration::from_millis(50),
307 fcu_heartbeat_interval: Duration::from_secs(3),
308 feed_state,
309 with_subblocks,
310 finalized_blocks_retention: 1024,
313 };
314
315 nodes.push(TestingNode::new(
316 uid,
317 private_key,
318 oracle.clone(),
319 engine_config,
320 execution_runtime.handle(),
321 execution_config,
322 ingress,
323 address,
324 ));
325 }
326
327 link_validators(&mut oracle, &nodes, linkage, None).await;
328
329 (nodes, execution_runtime)
330}
331
332pub fn run(setup: Setup, mut stop_condition: impl FnMut(&metrics::Metrics) -> bool) -> String {
334 let cfg = deterministic::Config::default().with_seed(setup.seed);
335 let executor = Runner::from(cfg);
336
337 executor.start(|mut context| async move {
338 let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup.clone()).await;
340 join_all(nodes.iter_mut().map(|node| node.start(&context))).await;
341
342 metrics::wait_for_metrics(&context, |metrics| {
343 metrics.assert_no_blocked_peers();
344 stop_condition(metrics)
345 })
346 .await;
347
348 context.auditor().state()
349 })
350}
351
352pub async fn connect_execution_to_peers<TClock: commonware_runtime::Clock>(
357 node: &TestingNode<TClock>,
358 nodes: &[TestingNode<TClock>],
359) {
360 for other in nodes {
361 if node.public_key() == other.public_key() {
362 continue;
363 }
364
365 if let (Some(a), Some(b)) = (node.execution_node.as_ref(), other.execution_node.as_ref()) {
366 a.connect_peer(b).await;
367 }
368 }
369}
370
371pub async fn connect_execution_peers<TClock: commonware_runtime::Clock>(
375 nodes: &[TestingNode<TClock>],
376) {
377 for i in 0..nodes.len() {
378 connect_execution_to_peers(&nodes[i], &nodes[(i + 1)..]).await;
379 }
380}
381
382pub async fn link_validators<TClock: commonware_runtime::Clock>(
387 oracle: &mut Oracle<PublicKey, TClock>,
388 validators: &[TestingNode<TClock>],
389 link: Link,
390 restrict_to: Option<fn(usize, usize, usize) -> bool>,
391) {
392 for (i1, v1) in validators.iter().enumerate() {
393 for (i2, v2) in validators.iter().enumerate() {
394 if v1.public_key() == v2.public_key() {
396 continue;
397 }
398
399 if let Some(f) = restrict_to
401 && !f(validators.len(), i1, i2)
402 {
403 continue;
404 }
405
406 match oracle
408 .add_link(
409 v1.public_key().clone(),
410 v2.public_key().clone(),
411 link.clone(),
412 )
413 .await
414 {
415 Ok(()) => (),
416 Err(commonware_p2p::simulated::Error::PeerMissing) => (),
420 Err(commonware_p2p::simulated::Error::LinkExists) => (),
422 res @ Err(_) => res.unwrap(),
423 }
424 }
425 }
426}
427
428pub fn get_pipeline_runs(recorder: &PrometheusRecorder) -> u64 {
430 recorder
431 .handle()
432 .render()
433 .lines()
434 .find(|line| line.starts_with("reth_consensus_engine_beacon_pipeline_runs"))
435 .and_then(|line| line.split_whitespace().nth(1)?.parse().ok())
436 .unwrap_or(0)
437}