Skip to main content

tempo_e2e/
testing_node.rs

1//! A testing node that can start and stop both consensus and execution layers.
2
3use crate::execution_runtime::{
4    self, ExecutionNode, ExecutionNodeConfig, ExecutionRuntimeHandle, test_db_args,
5};
6use alloy_primitives::{Address, B256};
7use commonware_cryptography::{
8    Signer as _,
9    ed25519::{PrivateKey, PublicKey},
10};
11use commonware_p2p::simulated::{Control, Oracle, SocketManager};
12use commonware_runtime::{Handle, Metrics as _, deterministic::Context};
13use reth_config::config::StageConfig;
14use reth_db::{Database, DatabaseEnv, open_db_read_only};
15use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
16use reth_ethereum::{
17    consensus::noop::NoopConsensus,
18    provider::{
19        DatabaseProviderFactory, ProviderFactory, RocksDBProviderFactory,
20        providers::{BlockchainProvider, RocksDBProvider, StaticFileProvider},
21    },
22    storage::BlockNumReader,
23};
24use reth_node_builder::NodeTypesWithDBAdapter;
25use reth_prune_types::PruneModes;
26use reth_stages::{Pipeline, sets::DefaultStages};
27use reth_static_file::StaticFileProducer;
28use std::{
29    net::{IpAddr, SocketAddr},
30    path::PathBuf,
31    sync::Arc,
32};
33use tempo_consensus::{
34    BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT, CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT,
35    DKG_CHANNEL_IDENT, DKG_LIMIT, MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT, RESOLVER_CHANNEL_IDENT,
36    RESOLVER_LIMIT, SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT, VOTES_CHANNEL_IDENT, VOTES_LIMIT,
37    consensus,
38};
39use tempo_evm::TempoEvmConfig;
40use tempo_node::node::TempoNode;
41use tracing::{debug, instrument};
42
43/// A testing node that can start and stop both consensus and execution layers.
44pub struct TestingNode<TClock>
45where
46    TClock: commonware_runtime::Clock,
47{
48    /// Unique identifier for this node
49    pub uid: String,
50    /// Public key of the validator
51    pub private_key: PrivateKey,
52    /// Simulated network oracle for test environments
53    pub oracle: Oracle<PublicKey, TClock>,
54    /// Consensus configuration used to start the consensus engine
55    pub consensus_config:
56        consensus::Builder<Control<PublicKey, TClock>, SocketManager<PublicKey, TClock>>,
57    /// Running consensus handle (None if consensus is stopped)
58    pub consensus_handle: Option<Handle<eyre::Result<()>>>,
59    /// Path to the execution node's data directory
60    pub execution_node_datadir: PathBuf,
61    /// Running execution node (None if execution is stopped)
62    pub execution_node: Option<ExecutionNode>,
63    /// Handle to the execution runtime for spawning new execution nodes
64    pub execution_runtime: ExecutionRuntimeHandle,
65    /// Configuration for the execution node
66    pub execution_config: ExecutionNodeConfig,
67    /// Database instance for the execution node
68    pub execution_database: Option<DatabaseEnv>,
69    /// RocksDB provider for the execution node
70    pub execution_rocksdb: Option<RocksDBProvider>,
71    /// The execution node name assigned at initialization. Important when
72    /// constructing the datadir at which to find the node.
73    pub execution_node_name: String,
74    /// Last block number in database when stopped (used for restart verification)
75    pub last_db_block_on_stop: Option<u64>,
76    /// Network address of the node. Used for execution the validator-config
77    /// addValidator contract call.
78    pub network_address: SocketAddr,
79    /// The chain address of the node. Used for executing validator-config smart
80    /// contract calls.
81    pub chain_address: Address,
82
83    n_starts: u32,
84}
85
86impl<TClock> TestingNode<TClock>
87where
88    TClock: commonware_runtime::Clock,
89{
90    /// Create a new TestingNode without spawning execution or starting consensus.
91    ///
92    /// Call `start()` to start both consensus and execution.
93    // FIXME: replace this by a `Config` to make this more digestible.
94    #[expect(clippy::too_many_arguments, reason = "quickly threw this together")]
95    pub fn new(
96        uid: String,
97        private_key: PrivateKey,
98        oracle: Oracle<PublicKey, TClock>,
99        consensus_config: consensus::Builder<
100            Control<PublicKey, TClock>,
101            SocketManager<PublicKey, TClock>,
102        >,
103        execution_runtime: ExecutionRuntimeHandle,
104        execution_config: ExecutionNodeConfig,
105        network_address: SocketAddr,
106        chain_address: Address,
107    ) -> Self {
108        let public_key = private_key.public_key();
109        let execution_node_datadir = execution_runtime
110            .nodes_dir()
111            .join(execution_runtime::execution_node_name(&public_key));
112
113        let execution_node_name = execution_runtime::execution_node_name(&public_key);
114        Self {
115            uid,
116            private_key,
117            oracle,
118            consensus_config,
119            consensus_handle: None,
120            execution_node: None,
121            execution_node_datadir,
122            execution_runtime,
123            execution_config,
124            execution_node_name,
125            execution_database: None,
126            execution_rocksdb: None,
127            last_db_block_on_stop: None,
128            network_address,
129            chain_address,
130
131            n_starts: 0,
132        }
133    }
134
    /// Get the fee recipient address of this node.
    ///
    /// This always returns the zero address.
    pub fn fee_recipient(&self) -> Address {
        Address::ZERO
    }
138
    /// Get a reference to the validator private key of this node.
    pub fn private_key(&self) -> &PrivateKey {
        &self.private_key
    }
142
143    /// Get the validator public key of this node.
144    pub fn public_key(&self) -> PublicKey {
145        self.private_key.public_key()
146    }
147
148    /// Get the unique identifier of this node.
149    pub fn uid(&self) -> &str {
150        &self.uid
151    }
152
153    /// Get the metric prefix used by the most recently started instance.
154    ///
155    /// # Panics
156    /// Panics if the node has was never started.
157    pub fn metric_prefix(&self) -> String {
158        assert!(self.n_starts > 0, "node has never been started");
159        format!("{}_{}", self.uid, self.n_starts - 1)
160    }
161
    /// Get a reference to the consensus config used to start the consensus
    /// engine.
    pub fn consensus_config(
        &self,
    ) -> &consensus::Builder<Control<PublicKey, TClock>, SocketManager<PublicKey, TClock>> {
        &self.consensus_config
    }
168
    /// Get a mutable reference to the consensus config.
    ///
    /// The config is cloned each time consensus is started, so changes made
    /// here are picked up by the next start.
    pub fn consensus_config_mut(
        &mut self,
    ) -> &mut consensus::Builder<Control<PublicKey, TClock>, SocketManager<PublicKey, TClock>> {
        &mut self.consensus_config
    }
175
    /// Get a reference to the simulated network oracle.
    pub fn oracle(&self) -> &Oracle<PublicKey, TClock> {
        &self.oracle
    }
180
    /// Get the ingress address of this node, which is its full network
    /// address (IP and port).
    pub fn ingress(&self) -> SocketAddr {
        self.network_address
    }
184
185    pub fn egress(&self) -> IpAddr {
186        self.network_address.ip()
187    }
188
189    /// A verifier is a node that has a share.
190    pub fn is_signer(&self) -> bool {
191        self.consensus_config.share.is_some()
192    }
193
194    /// A verifier is a node that has no share.
195    pub fn is_verifier(&self) -> bool {
196        self.consensus_config.share.is_none()
197    }
198
    /// Start both consensus and execution layers.
    ///
    /// The execution layer is started first, then consensus.
    ///
    /// # Panics
    /// Panics if either consensus or execution is already running.
    pub async fn start(&mut self, context: &Context) {
        // NOTE(review): the inner future is boxed, presumably to keep the
        // future returned by `start` small - confirm.
        Box::pin(self.start_inner(context)).await
    }
207
    /// Start execution, then consensus, then record that a start happened.
    ///
    /// The start counter is bumped last, so consensus is labelled with the
    /// counter value from before this start.
    async fn start_inner(&mut self, context: &Context) {
        self.start_execution().await;
        self.start_consensus(context).await;
        self.n_starts += 1;
    }
213
214    /// Start the execution node and update consensus config to reference it.
215    ///
216    /// # Panics
217    /// Panics if execution node is already running.
218    #[instrument(skip_all, fields(last_db_block = self.last_db_block_on_stop))]
219    async fn start_execution(&mut self) {
220        assert!(
221            self.execution_node.is_none(),
222            "execution node is already running for {}",
223            self.uid
224        );
225
226        // Create database if not exists
227        if self.execution_database.is_none() {
228            let db_path = self.execution_node_datadir.join("db");
229            self.execution_database = Some(
230                reth_db::init_db(db_path, test_db_args())
231                    .expect("failed to init database")
232                    .with_metrics(),
233            );
234        }
235
236        let execution_node = self
237            .execution_runtime
238            .spawn_node(
239                &self.execution_node_name,
240                self.execution_config.clone(),
241                self.execution_database.as_ref().unwrap().clone(),
242                self.execution_rocksdb.clone(),
243            )
244            .await
245            .expect("must be able to spawn execution node");
246
247        if self.execution_rocksdb.is_none() {
248            self.execution_rocksdb = Some(execution_node.node.provider().rocksdb_provider());
249        }
250
251        // verify database persistence on restart
252        if let Some(expected_block) = self.last_db_block_on_stop {
253            let current_db_block = execution_node
254                .node
255                .provider
256                .database_provider_ro()
257                .expect("failed to get database provider")
258                .last_block_number()
259                .expect("failed to get last block number from database");
260
261            assert!(current_db_block >= expected_block,);
262        }
263
264        // Update consensus config to point to the new execution node
265        self.consensus_config = self
266            .consensus_config
267            .clone()
268            .with_execution_node(execution_node.node.clone().into());
269        self.execution_node = Some(execution_node);
270        debug!(%self.uid, "started execution node for testing node");
271    }
272
273    /// Start the consensus engine with oracle registration.
274    ///
275    /// # Panics
276    /// Panics if consensus is already running.
277    async fn start_consensus(&mut self, context: &Context) {
278        assert!(
279            self.consensus_handle.is_none(),
280            "consensus is already running for {}",
281            self.uid
282        );
283        let engine = self
284            .consensus_config
285            .clone()
286            .try_init(context.with_label(&format!("{}_{}", self.uid, self.n_starts)))
287            .await
288            .expect("must be able to start the engine");
289
290        let votes = self
291            .oracle
292            .control(self.public_key())
293            .register(VOTES_CHANNEL_IDENT, VOTES_LIMIT)
294            .await
295            .unwrap();
296        let certificates = self
297            .oracle
298            .control(self.public_key())
299            .register(CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT)
300            .await
301            .unwrap();
302        let resolver = self
303            .oracle
304            .control(self.public_key())
305            .register(RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT)
306            .await
307            .unwrap();
308        let broadcast = self
309            .oracle
310            .control(self.public_key())
311            .register(BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT)
312            .await
313            .unwrap();
314        let marshal = self
315            .oracle
316            .control(self.public_key())
317            .register(MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT)
318            .await
319            .unwrap();
320        let dkg = self
321            .oracle
322            .control(self.public_key())
323            .register(DKG_CHANNEL_IDENT, DKG_LIMIT)
324            .await
325            .unwrap();
326        let subblocks = self
327            .oracle
328            .control(self.public_key())
329            .register(SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT)
330            .await
331            .unwrap();
332
333        let consensus_handle = engine.start(
334            votes,
335            certificates,
336            resolver,
337            broadcast,
338            marshal,
339            dkg,
340            subblocks,
341        );
342
343        self.consensus_handle = Some(consensus_handle);
344        debug!(%self.uid, "started consensus for testing node");
345    }
346
    /// Stop both consensus and execution layers.
    ///
    /// Consensus is stopped first, then execution.
    ///
    /// # Panics
    /// Panics if either consensus or execution is not running.
    pub async fn stop(&mut self) {
        self.stop_consensus().await;
        self.stop_execution().await;
    }
355
356    /// Stop only the consensus engine.
357    ///
358    /// # Panics
359    /// Panics if consensus is not running.
360    #[instrument(skip_all)]
361    pub async fn stop_consensus(&mut self) {
362        let handle = self
363            .consensus_handle
364            .take()
365            .unwrap_or_else(|| panic!("consensus is not running for {}, cannot stop", self.uid));
366        handle.abort();
367
368        // Wait for the consensus handle to actually finish
369        let _ = handle.await;
370
371        debug!(%self.uid, "stopped consensus for testing node");
372    }
373
374    /// Stop only the execution node.
375    ///
376    /// This triggers a critical task failure which will cause the execution node's
377    /// executor to shutdown.
378    ///
379    /// # Panics
380    /// Panics if execution node is not running.
381    #[instrument(skip_all)]
382    async fn stop_execution(&mut self) {
383        debug!(%self.uid, "stopping execution node for testing node");
384        let execution_node = self.execution_node.take().unwrap_or_else(|| {
385            panic!(
386                "execution node is not running for {}, cannot stop",
387                self.uid
388            )
389        });
390
391        let last_db_block = execution_node
392            .node
393            .provider
394            .database_provider_ro()
395            .expect("failed to get database provider")
396            .last_block_number()
397            .expect("failed to get last block number from database");
398        tracing::debug!(
399            last_db_block,
400            "storing last block block number to verify restart"
401        );
402        self.last_db_block_on_stop = Some(last_db_block);
403
404        execution_node.shutdown().await;
405
406        // Acquire a RW transaction and immediately drop it. This blocks until any
407        // pending write transaction completes, ensuring all database writes are
408        // fully flushed. Without this, a pending write could still be in-flight
409        // after shutdown returns, leading to database/static-file inconsistencies
410        // when the node restarts.
411        drop(
412            self.execution_database
413                .as_ref()
414                .expect("database should exist")
415                .tx_mut()
416                .expect("failed to acquire rw transaction"),
417        );
418
419        debug!(%self.uid, "stopped execution node for testing node");
420    }
421
422    /// Check if both consensus and execution are running
423    pub fn is_running(&self) -> bool {
424        self.consensus_handle.is_some() && self.execution_node.is_some()
425    }
426
    /// Check if consensus is running, i.e. a consensus handle is currently
    /// stored.
    pub fn is_consensus_running(&self) -> bool {
        self.consensus_handle.is_some()
    }
431
    /// Check if execution is running, i.e. an execution node is currently
    /// stored.
    pub fn is_execution_running(&self) -> bool {
        self.execution_node.is_some()
    }
436
437    /// Get a reference to the running execution node.
438    ///
439    /// # Panics
440    /// Panics if the execution node is not running.
441    pub fn execution(&self) -> &tempo_node::TempoFullNode {
442        &self
443            .execution_node
444            .as_ref()
445            .expect("execution node is not running")
446            .node
447    }
448
449    /// Get a reference to the running consensus handle.
450    ///
451    /// # Panics
452    /// Panics if the consensus engine is not running.
453    pub fn consensus(&self) -> &Handle<eyre::Result<()>> {
454        self.consensus_handle
455            .as_ref()
456            .expect("consensus is not running")
457    }
458
    /// Get a blockchain provider for the execution node.
    ///
    /// The returned provider is a clone of the running node's provider.
    ///
    /// # Panics
    /// Panics if the execution node is not running.
    pub fn execution_provider(
        &self,
    ) -> BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, DatabaseEnv>> {
        self.execution().provider.clone()
    }
468
469    /// Get a blockchain provider for when the execution node is down.
470    ///
471    /// This provider MUST BE DROPPED before starting the node again.
472    pub fn execution_provider_offline(
473        &self,
474    ) -> BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, DatabaseEnv>> {
475        // Open a read-only provider to the database
476        // Note: MDBX allows multiple readers, so this is safe even if another process
477        // has the database open for reading
478        let database = open_db_read_only(self.execution_node_datadir.join("db"), test_db_args())
479            .expect("failed to open execution node database")
480            .with_metrics();
481
482        let static_file_provider =
483            StaticFileProvider::read_only(self.execution_node_datadir.join("static_files"))
484                .expect("failed to open static files");
485
486        let rocksdb = RocksDBProvider::builder(self.execution_node_datadir.join("rocksdb"))
487            .build()
488            .unwrap();
489
490        let provider_factory = ProviderFactory::<NodeTypesWithDBAdapter<TempoNode, _>>::new(
491            database,
492            Arc::new(execution_runtime::chainspec()),
493            static_file_provider,
494            rocksdb,
495            reth_ethereum::tasks::Runtime::test(),
496        )
497        .expect("failed to create provider factory");
498
499        BlockchainProvider::new(provider_factory).expect("failed to create blockchain provider")
500    }
501
502    /// Simulates a crash by unwinding the execution layer database by `n` blocks.
503    /// This creates a gap between CL (untouched) and EL state, triggering
504    /// `backfill_on_start` on the next restart.
505    ///
506    /// Returns `(height_before, height_after)`.
507    ///
508    /// # Panics
509    /// Panics if the execution node is currently running.
510    pub fn unwind(&mut self, n: u64) -> (u64, u64) {
511        assert!(
512            self.execution_node.is_none(),
513            "execution node must be stopped before unwinding for {}",
514            self.uid
515        );
516
517        let db = self
518            .execution_database
519            .as_ref()
520            .expect("database should exist")
521            .clone();
522
523        let static_file_provider =
524            StaticFileProvider::read_write(self.execution_node_datadir.join("static_files"))
525                .expect("failed to open static files for rw");
526
527        let rocksdb = self
528            .execution_rocksdb
529            .as_ref()
530            .expect("rocksdb should exist")
531            .clone();
532
533        let provider_factory = ProviderFactory::<NodeTypesWithDBAdapter<TempoNode, _>>::new(
534            db,
535            Arc::new(execution_runtime::chainspec()),
536            static_file_provider,
537            rocksdb,
538            reth_ethereum::tasks::Runtime::test(),
539        )
540        .expect("failed to create provider factory");
541
542        let current = provider_factory
543            .provider()
544            .expect("failed to get provider")
545            .last_block_number()
546            .expect("failed to get last block number");
547
548        let target = current.saturating_sub(n);
549        debug!(
550            %self.uid, current, target, n,
551            "unwinding execution layer to simulate crash"
552        );
553
554        let evm_config = TempoEvmConfig::new(Arc::new(execution_runtime::chainspec()));
555        let prune_modes = PruneModes::default();
556        let (_tip_tx, tip_rx) = tokio::sync::watch::channel(B256::ZERO);
557
558        let mut pipeline = Pipeline::<NodeTypesWithDBAdapter<TempoNode, _>>::builder()
559            .add_stages(DefaultStages::new(
560                provider_factory.clone(),
561                tip_rx,
562                NoopConsensus::arc(),
563                NoopHeaderDownloader::default(),
564                NoopBodiesDownloader::default(),
565                evm_config,
566                StageConfig::default(),
567                prune_modes.clone(),
568                None,
569            ))
570            .build(
571                provider_factory.clone(),
572                StaticFileProducer::new(provider_factory, prune_modes),
573            );
574
575        pipeline
576            .unwind(target, None)
577            .expect("failed to unwind pipeline");
578
579        // Update to the unwound height so the restart assertion still checks
580        // that the DB didn't regress further.
581        self.last_db_block_on_stop = Some(target);
582
583        debug!(%self.uid, target, "execution layer unwound successfully");
584        (current, target)
585    }
586}
587
588#[cfg(test)]
589mod tests {
590    use crate::{Setup, setup_validators};
591    use alloy::providers::{Provider, ProviderBuilder};
592    use commonware_p2p::simulated::Link;
593    use commonware_runtime::{
594        Runner as _,
595        deterministic::{Config, Runner},
596    };
597    use std::time::Duration;
598    use tokio::sync::{oneshot, oneshot::Sender};
599
    /// Commands sent from the test body to the thread that owns the node.
    enum Message {
        /// Stop the node; the sender is signalled once the node is stopped.
        Stop(Sender<()>),
        /// Start the node; the sender receives the HTTP RPC address of the
        /// started node.
        Start(Sender<std::net::SocketAddr>),
    }
604
605    /// Start node and verify RPC is accessible
606    async fn start_and_verify(tx_msg: &tokio::sync::mpsc::UnboundedSender<Message>) -> String {
607        let (tx_rpc_addr, rx_rpc_addr) = oneshot::channel();
608        let _ = tx_msg.send(Message::Start(tx_rpc_addr));
609        let rpc_addr = rx_rpc_addr.await.unwrap();
610        let rpc_url = format!("http://{rpc_addr}");
611
612        // Verify RPC is accessible
613        let provider = ProviderBuilder::new().connect_http(rpc_url.parse().unwrap());
614        let block_number = provider.get_block_number().await;
615        assert!(block_number.is_ok(), "RPC should be accessible after start");
616
617        rpc_url
618    }
619
620    #[tokio::test]
621    async fn just_restart() {
622        // Ensures that the node can be stopped completely and brought up inside a test.
623        let _ = tempo_eyre::install();
624
625        let runner = Runner::from(Config::default().with_seed(0));
626        let (tx_msg, mut rx_msg) = tokio::sync::mpsc::unbounded_channel::<Message>();
627
628        std::thread::spawn(move || {
629            runner.start(|mut context| async move {
630                let setup = Setup::new()
631                    .how_many_signers(1)
632                    .linkage(Link {
633                        latency: Duration::from_millis(10),
634                        jitter: Duration::from_millis(1),
635                        success_rate: 1.0,
636                    })
637                    .epoch_length(100);
638
639                let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup).await;
640
641                let mut node = nodes.pop().unwrap();
642
643                loop {
644                    match rx_msg.blocking_recv() {
645                        Some(Message::Stop(tx_stopped)) => {
646                            node.stop().await;
647                            assert!(!node.is_running(), "node should not be running after stop");
648                            assert!(
649                                !node.is_consensus_running(),
650                                "consensus should not be running after stop"
651                            );
652                            assert!(
653                                !node.is_execution_running(),
654                                "execution should not be running after stop"
655                            );
656
657                            let _ = tx_stopped.send(());
658                        }
659                        Some(Message::Start(tx_rpc_addr)) => {
660                            node.start(&context).await;
661                            assert!(node.is_running(), "node should be running after start");
662
663                            // Get the RPC HTTP address while running
664                            let rpc_addr = node
665                                .execution()
666                                .rpc_server_handles
667                                .rpc
668                                .http_local_addr()
669                                .expect("http rpc server should be running");
670
671                            let _ = tx_rpc_addr.send(rpc_addr);
672                        }
673                        None => {
674                            break;
675                        }
676                    }
677                }
678            });
679        });
680
681        // Start the node initially
682        let rpc_url = start_and_verify(&tx_msg).await;
683
684        // Signal to stop the node
685        let (tx_stopped, rx_stopped) = oneshot::channel();
686        let _ = tx_msg.send(Message::Stop(tx_stopped));
687        rx_stopped.await.unwrap();
688
689        // Verify RPC is no longer accessible after stopping
690        let provider = ProviderBuilder::new().connect_http(rpc_url.parse().unwrap());
691        let result =
692            tokio::time::timeout(Duration::from_millis(500), provider.get_block_number()).await;
693        assert!(
694            result.is_err() || result.unwrap().is_err(),
695            "RPC should not be accessible after stopping"
696        );
697
698        // Start the node again
699        start_and_verify(&tx_msg).await;
700    }
701}