1use crate::execution_runtime::{
4 self, ExecutionNode, ExecutionNodeConfig, ExecutionRuntimeHandle, test_db_args,
5};
6use alloy_primitives::{Address, B256};
7use commonware_cryptography::{
8 Signer as _,
9 ed25519::{PrivateKey, PublicKey},
10};
11use commonware_p2p::simulated::{Control, Oracle, SocketManager};
12use commonware_runtime::{Handle, Metrics as _, deterministic::Context};
13use reth_config::config::StageConfig;
14use reth_db::{Database, DatabaseEnv, open_db_read_only};
15use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
16use reth_ethereum::{
17 consensus::noop::NoopConsensus,
18 provider::{
19 DatabaseProviderFactory, ProviderFactory, RocksDBProviderFactory,
20 providers::{BlockchainProvider, RocksDBProvider, StaticFileProvider},
21 },
22 storage::BlockNumReader,
23};
24use reth_node_builder::NodeTypesWithDBAdapter;
25use reth_prune_types::PruneModes;
26use reth_stages::{Pipeline, sets::DefaultStages};
27use reth_static_file::StaticFileProducer;
28use std::{
29 net::{IpAddr, SocketAddr},
30 path::PathBuf,
31 sync::Arc,
32};
33use tempo_consensus::{
34 BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT, CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT,
35 DKG_CHANNEL_IDENT, DKG_LIMIT, MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT, RESOLVER_CHANNEL_IDENT,
36 RESOLVER_LIMIT, SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT, VOTES_CHANNEL_IDENT, VOTES_LIMIT,
37 consensus,
38};
39use tempo_evm::TempoEvmConfig;
40use tempo_node::node::TempoNode;
41use tracing::{debug, instrument};
42
43pub struct TestingNode<TClock>
45where
46 TClock: commonware_runtime::Clock,
47{
48 pub uid: String,
50 pub private_key: PrivateKey,
52 pub oracle: Oracle<PublicKey, TClock>,
54 pub consensus_config:
56 consensus::Builder<Control<PublicKey, TClock>, SocketManager<PublicKey, TClock>>,
57 pub consensus_handle: Option<Handle<eyre::Result<()>>>,
59 pub execution_node_datadir: PathBuf,
61 pub execution_node: Option<ExecutionNode>,
63 pub execution_runtime: ExecutionRuntimeHandle,
65 pub execution_config: ExecutionNodeConfig,
67 pub execution_database: Option<DatabaseEnv>,
69 pub execution_rocksdb: Option<RocksDBProvider>,
71 pub execution_node_name: String,
74 pub last_db_block_on_stop: Option<u64>,
76 pub network_address: SocketAddr,
79 pub chain_address: Address,
82
83 n_starts: u32,
84}
85
86impl<TClock> TestingNode<TClock>
87where
88 TClock: commonware_runtime::Clock,
89{
90 #[expect(clippy::too_many_arguments, reason = "quickly threw this together")]
95 pub fn new(
96 uid: String,
97 private_key: PrivateKey,
98 oracle: Oracle<PublicKey, TClock>,
99 consensus_config: consensus::Builder<
100 Control<PublicKey, TClock>,
101 SocketManager<PublicKey, TClock>,
102 >,
103 execution_runtime: ExecutionRuntimeHandle,
104 execution_config: ExecutionNodeConfig,
105 network_address: SocketAddr,
106 chain_address: Address,
107 ) -> Self {
108 let public_key = private_key.public_key();
109 let execution_node_datadir = execution_runtime
110 .nodes_dir()
111 .join(execution_runtime::execution_node_name(&public_key));
112
113 let execution_node_name = execution_runtime::execution_node_name(&public_key);
114 Self {
115 uid,
116 private_key,
117 oracle,
118 consensus_config,
119 consensus_handle: None,
120 execution_node: None,
121 execution_node_datadir,
122 execution_runtime,
123 execution_config,
124 execution_node_name,
125 execution_database: None,
126 execution_rocksdb: None,
127 last_db_block_on_stop: None,
128 network_address,
129 chain_address,
130
131 n_starts: 0,
132 }
133 }
134
135 pub fn fee_recipient(&self) -> Address {
136 Address::ZERO
137 }
138
139 pub fn private_key(&self) -> &PrivateKey {
140 &self.private_key
141 }
142
143 pub fn public_key(&self) -> PublicKey {
145 self.private_key.public_key()
146 }
147
148 pub fn uid(&self) -> &str {
150 &self.uid
151 }
152
153 pub fn metric_prefix(&self) -> String {
158 assert!(self.n_starts > 0, "node has never been started");
159 format!("{}_{}", self.uid, self.n_starts - 1)
160 }
161
162 pub fn consensus_config(
164 &self,
165 ) -> &consensus::Builder<Control<PublicKey, TClock>, SocketManager<PublicKey, TClock>> {
166 &self.consensus_config
167 }
168
169 pub fn consensus_config_mut(
171 &mut self,
172 ) -> &mut consensus::Builder<Control<PublicKey, TClock>, SocketManager<PublicKey, TClock>> {
173 &mut self.consensus_config
174 }
175
176 pub fn oracle(&self) -> &Oracle<PublicKey, TClock> {
178 &self.oracle
179 }
180
181 pub fn ingress(&self) -> SocketAddr {
182 self.network_address
183 }
184
185 pub fn egress(&self) -> IpAddr {
186 self.network_address.ip()
187 }
188
189 pub fn is_signer(&self) -> bool {
191 self.consensus_config.share.is_some()
192 }
193
194 pub fn is_verifier(&self) -> bool {
196 self.consensus_config.share.is_none()
197 }
198
199 pub async fn start(&mut self, context: &Context) {
205 Box::pin(self.start_inner(context)).await
206 }
207
208 async fn start_inner(&mut self, context: &Context) {
209 self.start_execution().await;
210 self.start_consensus(context).await;
211 self.n_starts += 1;
212 }
213
214 #[instrument(skip_all, fields(last_db_block = self.last_db_block_on_stop))]
219 async fn start_execution(&mut self) {
220 assert!(
221 self.execution_node.is_none(),
222 "execution node is already running for {}",
223 self.uid
224 );
225
226 if self.execution_database.is_none() {
228 let db_path = self.execution_node_datadir.join("db");
229 self.execution_database = Some(
230 reth_db::init_db(db_path, test_db_args())
231 .expect("failed to init database")
232 .with_metrics(),
233 );
234 }
235
236 let execution_node = self
237 .execution_runtime
238 .spawn_node(
239 &self.execution_node_name,
240 self.execution_config.clone(),
241 self.execution_database.as_ref().unwrap().clone(),
242 self.execution_rocksdb.clone(),
243 )
244 .await
245 .expect("must be able to spawn execution node");
246
247 if self.execution_rocksdb.is_none() {
248 self.execution_rocksdb = Some(execution_node.node.provider().rocksdb_provider());
249 }
250
251 if let Some(expected_block) = self.last_db_block_on_stop {
253 let current_db_block = execution_node
254 .node
255 .provider
256 .database_provider_ro()
257 .expect("failed to get database provider")
258 .last_block_number()
259 .expect("failed to get last block number from database");
260
261 assert!(current_db_block >= expected_block,);
262 }
263
264 self.consensus_config = self
266 .consensus_config
267 .clone()
268 .with_execution_node(execution_node.node.clone().into());
269 self.execution_node = Some(execution_node);
270 debug!(%self.uid, "started execution node for testing node");
271 }
272
273 async fn start_consensus(&mut self, context: &Context) {
278 assert!(
279 self.consensus_handle.is_none(),
280 "consensus is already running for {}",
281 self.uid
282 );
283 let engine = self
284 .consensus_config
285 .clone()
286 .try_init(context.with_label(&format!("{}_{}", self.uid, self.n_starts)))
287 .await
288 .expect("must be able to start the engine");
289
290 let votes = self
291 .oracle
292 .control(self.public_key())
293 .register(VOTES_CHANNEL_IDENT, VOTES_LIMIT)
294 .await
295 .unwrap();
296 let certificates = self
297 .oracle
298 .control(self.public_key())
299 .register(CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT)
300 .await
301 .unwrap();
302 let resolver = self
303 .oracle
304 .control(self.public_key())
305 .register(RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT)
306 .await
307 .unwrap();
308 let broadcast = self
309 .oracle
310 .control(self.public_key())
311 .register(BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT)
312 .await
313 .unwrap();
314 let marshal = self
315 .oracle
316 .control(self.public_key())
317 .register(MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT)
318 .await
319 .unwrap();
320 let dkg = self
321 .oracle
322 .control(self.public_key())
323 .register(DKG_CHANNEL_IDENT, DKG_LIMIT)
324 .await
325 .unwrap();
326 let subblocks = self
327 .oracle
328 .control(self.public_key())
329 .register(SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT)
330 .await
331 .unwrap();
332
333 let consensus_handle = engine.start(
334 votes,
335 certificates,
336 resolver,
337 broadcast,
338 marshal,
339 dkg,
340 subblocks,
341 );
342
343 self.consensus_handle = Some(consensus_handle);
344 debug!(%self.uid, "started consensus for testing node");
345 }
346
347 pub async fn stop(&mut self) {
352 self.stop_consensus().await;
353 self.stop_execution().await;
354 }
355
356 #[instrument(skip_all)]
361 pub async fn stop_consensus(&mut self) {
362 let handle = self
363 .consensus_handle
364 .take()
365 .unwrap_or_else(|| panic!("consensus is not running for {}, cannot stop", self.uid));
366 handle.abort();
367
368 let _ = handle.await;
370
371 debug!(%self.uid, "stopped consensus for testing node");
372 }
373
374 #[instrument(skip_all)]
382 async fn stop_execution(&mut self) {
383 debug!(%self.uid, "stopping execution node for testing node");
384 let execution_node = self.execution_node.take().unwrap_or_else(|| {
385 panic!(
386 "execution node is not running for {}, cannot stop",
387 self.uid
388 )
389 });
390
391 let last_db_block = execution_node
392 .node
393 .provider
394 .database_provider_ro()
395 .expect("failed to get database provider")
396 .last_block_number()
397 .expect("failed to get last block number from database");
398 tracing::debug!(
399 last_db_block,
400 "storing last block block number to verify restart"
401 );
402 self.last_db_block_on_stop = Some(last_db_block);
403
404 execution_node.shutdown().await;
405
406 drop(
412 self.execution_database
413 .as_ref()
414 .expect("database should exist")
415 .tx_mut()
416 .expect("failed to acquire rw transaction"),
417 );
418
419 debug!(%self.uid, "stopped execution node for testing node");
420 }
421
422 pub fn is_running(&self) -> bool {
424 self.consensus_handle.is_some() && self.execution_node.is_some()
425 }
426
427 pub fn is_consensus_running(&self) -> bool {
429 self.consensus_handle.is_some()
430 }
431
432 pub fn is_execution_running(&self) -> bool {
434 self.execution_node.is_some()
435 }
436
437 pub fn execution(&self) -> &tempo_node::TempoFullNode {
442 &self
443 .execution_node
444 .as_ref()
445 .expect("execution node is not running")
446 .node
447 }
448
449 pub fn consensus(&self) -> &Handle<eyre::Result<()>> {
454 self.consensus_handle
455 .as_ref()
456 .expect("consensus is not running")
457 }
458
459 pub fn execution_provider(
464 &self,
465 ) -> BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, DatabaseEnv>> {
466 self.execution().provider.clone()
467 }
468
469 pub fn execution_provider_offline(
473 &self,
474 ) -> BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, DatabaseEnv>> {
475 let database = open_db_read_only(self.execution_node_datadir.join("db"), test_db_args())
479 .expect("failed to open execution node database")
480 .with_metrics();
481
482 let static_file_provider =
483 StaticFileProvider::read_only(self.execution_node_datadir.join("static_files"))
484 .expect("failed to open static files");
485
486 let rocksdb = RocksDBProvider::builder(self.execution_node_datadir.join("rocksdb"))
487 .build()
488 .unwrap();
489
490 let provider_factory = ProviderFactory::<NodeTypesWithDBAdapter<TempoNode, _>>::new(
491 database,
492 Arc::new(execution_runtime::chainspec()),
493 static_file_provider,
494 rocksdb,
495 reth_ethereum::tasks::Runtime::test(),
496 )
497 .expect("failed to create provider factory");
498
499 BlockchainProvider::new(provider_factory).expect("failed to create blockchain provider")
500 }
501
502 pub fn unwind(&mut self, n: u64) -> (u64, u64) {
511 assert!(
512 self.execution_node.is_none(),
513 "execution node must be stopped before unwinding for {}",
514 self.uid
515 );
516
517 let db = self
518 .execution_database
519 .as_ref()
520 .expect("database should exist")
521 .clone();
522
523 let static_file_provider =
524 StaticFileProvider::read_write(self.execution_node_datadir.join("static_files"))
525 .expect("failed to open static files for rw");
526
527 let rocksdb = self
528 .execution_rocksdb
529 .as_ref()
530 .expect("rocksdb should exist")
531 .clone();
532
533 let provider_factory = ProviderFactory::<NodeTypesWithDBAdapter<TempoNode, _>>::new(
534 db,
535 Arc::new(execution_runtime::chainspec()),
536 static_file_provider,
537 rocksdb,
538 reth_ethereum::tasks::Runtime::test(),
539 )
540 .expect("failed to create provider factory");
541
542 let current = provider_factory
543 .provider()
544 .expect("failed to get provider")
545 .last_block_number()
546 .expect("failed to get last block number");
547
548 let target = current.saturating_sub(n);
549 debug!(
550 %self.uid, current, target, n,
551 "unwinding execution layer to simulate crash"
552 );
553
554 let evm_config = TempoEvmConfig::new(Arc::new(execution_runtime::chainspec()));
555 let prune_modes = PruneModes::default();
556 let (_tip_tx, tip_rx) = tokio::sync::watch::channel(B256::ZERO);
557
558 let mut pipeline = Pipeline::<NodeTypesWithDBAdapter<TempoNode, _>>::builder()
559 .add_stages(DefaultStages::new(
560 provider_factory.clone(),
561 tip_rx,
562 NoopConsensus::arc(),
563 NoopHeaderDownloader::default(),
564 NoopBodiesDownloader::default(),
565 evm_config,
566 StageConfig::default(),
567 prune_modes.clone(),
568 None,
569 ))
570 .build(
571 provider_factory.clone(),
572 StaticFileProducer::new(provider_factory, prune_modes),
573 );
574
575 pipeline
576 .unwind(target, None)
577 .expect("failed to unwind pipeline");
578
579 self.last_db_block_on_stop = Some(target);
582
583 debug!(%self.uid, target, "execution layer unwound successfully");
584 (current, target)
585 }
586}
587
#[cfg(test)]
mod tests {
    use crate::{Setup, setup_validators};
    use alloy::providers::{Provider, ProviderBuilder};
    use commonware_p2p::simulated::Link;
    use commonware_runtime::{
        Runner as _,
        deterministic::{Config, Runner},
    };
    use std::time::Duration;
    use tokio::sync::{oneshot, oneshot::Sender};

    /// Commands sent from the test body to the thread that owns the node.
    enum Message {
        /// Stop the node, then acknowledge on the sender.
        Stop(Sender<()>),
        /// Start the node, then reply with its HTTP RPC address.
        Start(Sender<std::net::SocketAddr>),
    }

    /// Asks the node thread to start the node and checks that its HTTP RPC answers.
    ///
    /// Returns the RPC URL so the caller can probe it again later.
    async fn start_and_verify(tx_msg: &tokio::sync::mpsc::UnboundedSender<Message>) -> String {
        let (reply_tx, reply_rx) = oneshot::channel();
        let _ = tx_msg.send(Message::Start(reply_tx));

        let url = format!("http://{}", reply_rx.await.unwrap());
        let client = ProviderBuilder::new().connect_http(url.parse().unwrap());
        assert!(
            client.get_block_number().await.is_ok(),
            "RPC should be accessible after start"
        );

        url
    }

    /// Starts a single-validator network, stops it, checks its RPC goes away, and restarts it.
    #[tokio::test]
    async fn just_restart() {
        let _ = tempo_eyre::install();

        let runner = Runner::from(Config::default().with_seed(0));
        let (tx_msg, mut rx_msg) = tokio::sync::mpsc::unbounded_channel::<Message>();

        // The deterministic runtime owns the node on its own OS thread; the test body drives it
        // through `tx_msg` and receives replies on the oneshot carried by each message.
        std::thread::spawn(move || {
            runner.start(|mut context| async move {
                let link = Link {
                    latency: Duration::from_millis(10),
                    jitter: Duration::from_millis(1),
                    success_rate: 1.0,
                };
                let setup = Setup::new()
                    .how_many_signers(1)
                    .linkage(link)
                    .epoch_length(100);

                let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup).await;
                let mut node = nodes.pop().unwrap();

                // Serve commands until the test body drops its sender.
                while let Some(message) = rx_msg.blocking_recv() {
                    match message {
                        Message::Stop(ack) => {
                            node.stop().await;
                            assert!(!node.is_running(), "node should not be running after stop");
                            assert!(
                                !node.is_consensus_running(),
                                "consensus should not be running after stop"
                            );
                            assert!(
                                !node.is_execution_running(),
                                "execution should not be running after stop"
                            );

                            let _ = ack.send(());
                        }
                        Message::Start(reply) => {
                            node.start(&context).await;
                            assert!(node.is_running(), "node should be running after start");

                            let http_addr = node
                                .execution()
                                .rpc_server_handles
                                .rpc
                                .http_local_addr()
                                .expect("http rpc server should be running");

                            let _ = reply.send(http_addr);
                        }
                    }
                }
            });
        });

        let rpc_url = start_and_verify(&tx_msg).await;

        let (ack_tx, ack_rx) = oneshot::channel();
        let _ = tx_msg.send(Message::Stop(ack_tx));
        ack_rx.await.unwrap();

        // With the node stopped, the RPC must either time out or answer with an error.
        let client = ProviderBuilder::new().connect_http(rpc_url.parse().unwrap());
        let probe =
            tokio::time::timeout(Duration::from_millis(500), client.get_block_number()).await;
        assert!(
            !matches!(probe, Ok(Ok(_))),
            "RPC should not be accessible after stopping"
        );

        start_and_verify(&tx_msg).await;
    }
}