tempo_e2e/testing_node.rs

//! A testing node that can start and stop both consensus and execution layers.

use crate::execution_runtime::{self, ExecutionNode, ExecutionNodeConfig, ExecutionRuntimeHandle};
use commonware_cryptography::ed25519::PublicKey;
use commonware_p2p::simulated::{Control, Oracle, SocketManager};
use commonware_runtime::{Handle, deterministic::Context};
use reth_db::{Database, DatabaseEnv, mdbx::DatabaseArguments, open_db_read_only};
use reth_ethereum::{
    provider::{
        DatabaseProviderFactory, ProviderFactory,
        providers::{BlockchainProvider, StaticFileProvider},
    },
    storage::BlockNumReader,
};
use reth_node_builder::NodeTypesWithDBAdapter;
use std::{path::PathBuf, sync::Arc};
use tempo_commonware_node::consensus;
use tempo_node::node::TempoNode;
use tracing::{debug, instrument};

/// A testing node that can start and stop both consensus and execution layers.
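///
/// # Example
///
/// Illustrative lifecycle sketch (not a compiled doctest), mirroring the restart test
/// below; `Setup` and `setup_validators` are this crate's test helpers.
///
/// ```ignore
/// let setup = Setup::new().how_many_signers(1);
/// let (mut nodes, _execution_runtime) = setup_validators(context, setup).await;
/// let mut node = nodes.pop().unwrap();
/// node.start().await;
/// assert!(node.is_running());
/// node.stop().await;
/// node.start().await; // database state persists across the restart
/// ```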
pub struct TestingNode {
    /// Unique identifier for this node
    uid: String,
    /// Public key of the validator
    public_key: PublicKey,
    /// Simulated network oracle for test environments
    oracle: Oracle<PublicKey>,
    /// Consensus configuration used to start the consensus engine
    consensus_config: consensus::Builder<Control<PublicKey>, Context, SocketManager<PublicKey>>,
    /// Running consensus handle (None if consensus is stopped)
    consensus_handle: Option<Handle<eyre::Result<()>>>,
    /// Path to the execution node's data directory
    execution_node_datadir: PathBuf,
    /// Running execution node (None if execution is stopped)
    execution_node: Option<ExecutionNode>,
    /// Handle to the execution runtime for spawning new execution nodes
    execution_runtime: ExecutionRuntimeHandle,
    /// Configuration for the execution node
    execution_config: ExecutionNodeConfig,
    /// Database instance for the execution node
    execution_database: Option<Arc<DatabaseEnv>>,
    /// Last block number in database when stopped (used for restart verification)
    last_db_block_on_stop: Option<u64>,
}

impl TestingNode {
    /// Create a new TestingNode without spawning execution or starting consensus.
    ///
    /// Call `start()` to start both consensus and execution.
    pub fn new(
        uid: String,
        public_key: PublicKey,
        oracle: Oracle<PublicKey>,
        consensus_config: consensus::Builder<Control<PublicKey>, Context, SocketManager<PublicKey>>,
        execution_runtime: ExecutionRuntimeHandle,
        execution_config: ExecutionNodeConfig,
    ) -> Self {
        let execution_node_datadir = execution_runtime
            .nodes_dir()
            .join(execution_runtime::execution_node_name(&public_key));

        Self {
            uid,
            public_key,
            oracle,
            consensus_config,
            consensus_handle: None,
            execution_node: None,
            execution_node_datadir,
            execution_runtime,
            execution_config,
            execution_database: None,
            last_db_block_on_stop: None,
        }
    }

    /// Get the validator public key of this node.
    pub fn public_key(&self) -> &PublicKey {
        &self.public_key
    }

    /// Get the unique identifier of this node.
    pub fn uid(&self) -> &str {
        &self.uid
    }

    /// Get a reference to the consensus config.
    pub fn consensus_config(
        &self,
    ) -> &consensus::Builder<Control<PublicKey>, Context, SocketManager<PublicKey>> {
        &self.consensus_config
    }

    /// Get a mutable reference to the consensus config.
    pub fn consensus_config_mut(
        &mut self,
    ) -> &mut consensus::Builder<Control<PublicKey>, Context, SocketManager<PublicKey>> {
        &mut self.consensus_config
    }

    /// Get a reference to the oracle.
    pub fn oracle(&self) -> &Oracle<PublicKey> {
        &self.oracle
    }

    /// Start both consensus and execution layers.
    ///
    /// # Panics
    /// Panics if either consensus or execution is already running.
    pub async fn start(&mut self) {
        self.start_execution().await;
        self.start_consensus().await;
    }

    /// Start the execution node and update consensus config to reference it.
    ///
    /// # Panics
    /// Panics if execution node is already running.
    #[instrument(skip_all, fields(last_db_block = self.last_db_block_on_stop))]
    async fn start_execution(&mut self) {
        assert!(
            self.execution_node.is_none(),
            "execution node is already running for {}",
            self.uid
        );

        // Create the database if it does not exist yet
        if self.execution_database.is_none() {
            let db_path = self.execution_node_datadir.join("db");
            self.execution_database = Some(Arc::new(
                reth_db::init_db(db_path, DatabaseArguments::default())
                    .expect("failed to init database")
                    .with_metrics(),
            ));
        }

        let execution_node = self
            .execution_runtime
            .spawn_node(
                &execution_runtime::execution_node_name(&self.public_key),
                self.execution_config.clone(),
                self.execution_database.as_ref().unwrap().clone(),
            )
            .await
            .expect("must be able to spawn execution node");

        // Verify database persistence on restart
        if let Some(expected_block) = self.last_db_block_on_stop {
            let current_db_block = execution_node
                .node
                .provider
                .database_provider_ro()
                .expect("failed to get database provider")
                .last_block_number()
                .expect("failed to get last block number from database");

            assert!(
                current_db_block >= expected_block,
                "database lost blocks across restart: {current_db_block} < {expected_block}"
            );
        }

        // Update consensus config to point to the new execution node
        self.consensus_config = self
            .consensus_config
            .clone()
            .with_execution_node(execution_node.node.clone());
        self.execution_node = Some(execution_node);
        debug!(%self.uid, "started execution node for testing node");
    }

    /// Start the consensus engine with oracle registration.
    ///
    /// # Panics
    /// Panics if consensus is already running.
    async fn start_consensus(&mut self) {
        assert!(
            self.consensus_handle.is_none(),
            "consensus is already running for {}",
            self.uid
        );
        let engine = self
            .consensus_config
            .clone()
            .try_init()
            .await
            .expect("must be able to start the engine");

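        // Register one simulated P2P channel per engine subsystem (channels 0..=7);
        // the resulting handles are passed positionally to `engine.start` below.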
        let pending = self
            .oracle
            .control(self.public_key.clone())
            .register(0)
            .await
            .unwrap();
        let recovered = self
            .oracle
            .control(self.public_key.clone())
            .register(1)
            .await
            .unwrap();
        let resolver = self
            .oracle
            .control(self.public_key.clone())
            .register(2)
            .await
            .unwrap();
        let broadcast = self
            .oracle
            .control(self.public_key.clone())
            .register(3)
            .await
            .unwrap();
        let marshal = self
            .oracle
            .control(self.public_key.clone())
            .register(4)
            .await
            .unwrap();
        let dkg = self
            .oracle
            .control(self.public_key.clone())
            .register(5)
            .await
            .unwrap();
        let boundary_certs = self
            .oracle
            .control(self.public_key.clone())
            .register(6)
            .await
            .unwrap();
        let subblocks = self
            .oracle
            .control(self.public_key.clone())
            .register(7)
            .await
            .unwrap();

        let consensus_handle = engine.start(
            pending,
            recovered,
            resolver,
            broadcast,
            marshal,
            dkg,
            boundary_certs,
            subblocks,
        );

        self.consensus_handle = Some(consensus_handle);
        debug!(%self.uid, "started consensus for testing node");
    }

    /// Stop both consensus and execution layers.
    ///
    /// # Panics
    /// Panics if either consensus or execution is not running.
    pub async fn stop(&mut self) {
        self.stop_consensus().await;
        self.stop_execution().await
    }

    /// Stop only the consensus engine.
    ///
    /// # Panics
    /// Panics if consensus is not running.
    #[instrument(skip_all)]
    async fn stop_consensus(&mut self) {
        let handle = self
            .consensus_handle
            .take()
            .unwrap_or_else(|| panic!("consensus is not running for {}, cannot stop", self.uid));
        handle.abort();

        // Wait for the consensus handle to actually finish
        let _ = handle.await;

        debug!(%self.uid, "stopped consensus for testing node");
    }

    /// Stop only the execution node.
    ///
    /// This triggers a critical task failure, which causes the execution node's
    /// executor to shut down.
    ///
    /// # Panics
    /// Panics if execution node is not running.
    #[instrument(skip_all)]
    async fn stop_execution(&mut self) {
        debug!(%self.uid, "stopping execution node for testing node");
        let execution_node = self.execution_node.take().unwrap_or_else(|| {
            panic!(
                "execution node is not running for {}, cannot stop",
                self.uid
            )
        });

        let last_db_block = execution_node
            .node
            .provider
            .database_provider_ro()
            .expect("failed to get database provider")
            .last_block_number()
            .expect("failed to get last block number from database");
        debug!(
            last_db_block,
            "storing last db block number to verify persistence on restart"
        );
        self.last_db_block_on_stop = Some(last_db_block);

        execution_node.shutdown().await;

        // Acquire a RW transaction and immediately drop it. This blocks until any
        // pending write transaction completes, ensuring all database writes are
        // fully flushed. Without this, a pending write could still be in-flight
        // after shutdown returns, leading to database/static-file inconsistencies
        // when the node restarts.
        drop(
            self.execution_database
                .as_ref()
                .expect("database should exist")
                .tx_mut()
                .expect("failed to acquire rw transaction"),
        );

        debug!(%self.uid, "stopped execution node for testing node");
    }

    /// Check if both consensus and execution are running
    pub fn is_running(&self) -> bool {
        self.consensus_handle.is_some() && self.execution_node.is_some()
    }

    /// Check if consensus is running
    pub fn is_consensus_running(&self) -> bool {
        self.consensus_handle.is_some()
    }

    /// Check if execution is running
    pub fn is_execution_running(&self) -> bool {
        self.execution_node.is_some()
    }

    /// Get a reference to the running execution node.
    ///
    /// # Panics
    /// Panics if the execution node is not running.
    pub fn execution(&self) -> &tempo_node::TempoFullNode {
        &self
            .execution_node
            .as_ref()
            .expect("execution node is not running")
            .node
    }

    /// Get a reference to the running consensus handle.
    ///
    /// # Panics
    /// Panics if the consensus engine is not running.
    pub fn consensus(&self) -> &Handle<eyre::Result<()>> {
        self.consensus_handle
            .as_ref()
            .expect("consensus is not running")
    }

    /// Get a blockchain provider for the execution node.
    ///
    /// # Panics
    /// Panics if the execution node is not running.
    pub fn execution_provider(
        &self,
    ) -> BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, Arc<DatabaseEnv>>> {
        self.execution().provider.clone()
    }

    /// Get a blockchain provider for when the execution node is down.
    ///
    /// This provider MUST BE DROPPED before starting the node again.
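    ///
    /// Illustrative usage (sketch, not a compiled doctest); `last_block_number` comes
    /// from the `BlockNumReader` trait imported above.
    ///
    /// ```ignore
    /// node.stop().await;
    /// let provider = node.execution_provider_offline();
    /// let tip = provider.last_block_number().unwrap();
    /// drop(provider); // must be dropped before restarting the node
    /// node.start().await;
    /// ```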
    pub fn execution_provider_offline(
        &self,
    ) -> BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, Arc<DatabaseEnv>>> {
        // Open a read-only provider to the database
        // Note: MDBX allows multiple readers, so this is safe even if another process
        // has the database open for reading
        let database = Arc::new(
            open_db_read_only(
                self.execution_node_datadir.join("db"),
                DatabaseArguments::default(),
            )
            .expect("failed to open execution node database")
            .with_metrics(),
        );

        let static_file_provider =
            StaticFileProvider::read_only(self.execution_node_datadir.join("static_files"), true)
                .expect("failed to open static files");

        let provider_factory = ProviderFactory::<NodeTypesWithDBAdapter<TempoNode, _>>::new(
            database,
            Arc::new(execution_runtime::chainspec()),
            static_file_provider,
        )
        .expect("failed to create provider factory");

        BlockchainProvider::new(provider_factory).expect("failed to create blockchain provider")
    }
}

#[cfg(test)]
mod tests {
    use crate::{Setup, setup_validators};
    use alloy::providers::{Provider, ProviderBuilder};
    use commonware_p2p::simulated::Link;
    use commonware_runtime::{
        Runner as _,
        deterministic::{Config, Runner},
    };
    use std::time::Duration;
    use tokio::sync::{oneshot, oneshot::Sender};

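    /// Commands sent from the tokio test to the thread driving the deterministic runner.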
    enum Message {
        Stop(Sender<()>),
        Start(Sender<std::net::SocketAddr>),
    }

    /// Start node and verify RPC is accessible
    async fn start_and_verify(tx_msg: &tokio::sync::mpsc::UnboundedSender<Message>) -> String {
        let (tx_rpc_addr, rx_rpc_addr) = oneshot::channel();
        let _ = tx_msg.send(Message::Start(tx_rpc_addr));
        let rpc_addr = rx_rpc_addr.await.unwrap();
        let rpc_url = format!("http://{rpc_addr}");

        // Verify RPC is accessible
        let provider = ProviderBuilder::new().connect_http(rpc_url.parse().unwrap());
        let block_number = provider.get_block_number().await;
        assert!(block_number.is_ok(), "RPC should be accessible after start");

        rpc_url
    }

    #[tokio::test]
    async fn test_restart() {
        // Ensures that the node can be stopped completely and brought up inside a test.
        let _ = tempo_eyre::install();

        let runner = Runner::from(Config::default().with_seed(0));
        let (tx_msg, mut rx_msg) = tokio::sync::mpsc::unbounded_channel::<Message>();

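        // The deterministic runner blocks the thread it runs on (and the loop below
        // uses `blocking_recv`), so drive it from a dedicated OS thread and talk to
        // it over channels from the tokio test.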
        std::thread::spawn(move || {
            runner.start(|context| async move {
                let setup = Setup::new()
                    .how_many_signers(1)
                    .linkage(Link {
                        latency: Duration::from_millis(10),
                        jitter: Duration::from_millis(1),
                        success_rate: 1.0,
                    })
                    .epoch_length(100);

                let (mut nodes, _execution_runtime) =
                    setup_validators(context.clone(), setup).await;

                let mut node = nodes.pop().unwrap();

                loop {
                    match rx_msg.blocking_recv() {
                        Some(Message::Stop(tx_stopped)) => {
                            node.stop().await;
                            assert!(!node.is_running(), "node should not be running after stop");
                            assert!(
                                !node.is_consensus_running(),
                                "consensus should not be running after stop"
                            );
                            assert!(
                                !node.is_execution_running(),
                                "execution should not be running after stop"
                            );

                            let _ = tx_stopped.send(());
                        }
                        Some(Message::Start(tx_rpc_addr)) => {
                            node.start().await;
                            assert!(node.is_running(), "node should be running after start");

                            // Get the RPC HTTP address while running
                            let rpc_addr = node
                                .execution()
                                .rpc_server_handles
                                .rpc
                                .http_local_addr()
                                .expect("http rpc server should be running");

                            let _ = tx_rpc_addr.send(rpc_addr);
                        }
                        None => {
                            break;
                        }
                    }
                }
            });
        });

        // Start the node initially
        let rpc_url = start_and_verify(&tx_msg).await;

        // Signal to stop the node
        let (tx_stopped, rx_stopped) = oneshot::channel();
        let _ = tx_msg.send(Message::Stop(tx_stopped));
        rx_stopped.await.unwrap();

        // Verify RPC is no longer accessible after stopping
        let provider = ProviderBuilder::new().connect_http(rpc_url.parse().unwrap());
        let result =
            tokio::time::timeout(Duration::from_millis(500), provider.get_block_number()).await;
        assert!(
            result.is_err() || result.unwrap().is_err(),
            "RPC should not be accessible after stopping"
        );

        // Start the node again
        start_and_verify(&tx_msg).await;
    }
}