Skip to main content

tempo_e2e/
testing_node.rs

1//! A testing node that can start and stop both consensus and execution layers.
2
3use crate::execution_runtime::{self, ExecutionNode, ExecutionNodeConfig, ExecutionRuntimeHandle};
4use alloy_primitives::Address;
5use commonware_cryptography::{
6    Signer as _,
7    ed25519::{PrivateKey, PublicKey},
8};
9use commonware_p2p::simulated::{Control, Oracle, SocketManager};
10use commonware_runtime::{Handle, Metrics as _, deterministic::Context};
11use reth_db::{Database, DatabaseEnv, mdbx::DatabaseArguments, open_db_read_only};
12use reth_ethereum::{
13    provider::{
14        DatabaseProviderFactory, ProviderFactory, RocksDBProviderFactory,
15        providers::{BlockchainProvider, RocksDBProvider, StaticFileProvider},
16    },
17    storage::BlockNumReader,
18};
19use reth_node_builder::NodeTypesWithDBAdapter;
20use std::{
21    net::{IpAddr, SocketAddr},
22    path::PathBuf,
23    sync::Arc,
24};
25use tempo_commonware_node::{
26    BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT, CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT,
27    DKG_CHANNEL_IDENT, DKG_LIMIT, MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT, RESOLVER_CHANNEL_IDENT,
28    RESOLVER_LIMIT, SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT, VOTES_CHANNEL_IDENT, VOTES_LIMIT,
29    consensus,
30};
31use tempo_node::node::TempoNode;
32use tracing::{debug, instrument};
33
/// A testing node that can start and stop both consensus and execution layers.
///
/// Holds the consensus configuration/handle and the execution node together with
/// its storage handles (database, RocksDB provider), so tests can stop and
/// restart either layer repeatedly while state persists on disk.
pub struct TestingNode<TClock>
where
    TClock: commonware_runtime::Clock,
{
    /// Unique identifier for this node
    pub uid: String,
    /// Private key of the validator (the public key is derived from it on demand)
    pub private_key: PrivateKey,
    /// Simulated network oracle for test environments
    pub oracle: Oracle<PublicKey, TClock>,
    /// Consensus configuration used to start the consensus engine
    pub consensus_config:
        consensus::Builder<Control<PublicKey, TClock>, SocketManager<PublicKey, TClock>>,
    /// Running consensus handle (None if consensus is stopped)
    pub consensus_handle: Option<Handle<eyre::Result<()>>>,
    /// Path to the execution node's data directory
    pub execution_node_datadir: PathBuf,
    /// Running execution node (None if execution is stopped)
    pub execution_node: Option<ExecutionNode>,
    /// Handle to the execution runtime for spawning new execution nodes
    pub execution_runtime: ExecutionRuntimeHandle,
    /// Configuration for the execution node
    pub execution_config: ExecutionNodeConfig,
    /// Database instance for the execution node
    pub execution_database: Option<DatabaseEnv>,
    /// RocksDB provider for the execution node
    pub execution_rocksdb: Option<RocksDBProvider>,
    /// The execution node name assigned at initialization. Important when
    /// constructing the datadir at which to find the node.
    pub execution_node_name: String,
    /// Last block number in database when stopped (used for restart verification)
    pub last_db_block_on_stop: Option<u64>,
    /// Network address of the node. Used for executing the validator-config
    /// addValidator contract call.
    pub network_address: SocketAddr,
    /// The chain address of the node. Used for executing validator-config smart
    /// contract calls.
    pub chain_address: Address,

    // Number of times `start()` has completed; also feeds the metric prefix so
    // metrics from successive restarts do not collide.
    n_starts: u32,
}
76
77impl<TClock> TestingNode<TClock>
78where
79    TClock: commonware_runtime::Clock,
80{
81    /// Create a new TestingNode without spawning execution or starting consensus.
82    ///
83    /// Call `start()` to start both consensus and execution.
84    // FIXME: replace this by a `Config` to make this more digestible.
85    #[expect(clippy::too_many_arguments, reason = "quickly threw this together")]
86    pub fn new(
87        uid: String,
88        private_key: PrivateKey,
89        oracle: Oracle<PublicKey, TClock>,
90        consensus_config: consensus::Builder<
91            Control<PublicKey, TClock>,
92            SocketManager<PublicKey, TClock>,
93        >,
94        execution_runtime: ExecutionRuntimeHandle,
95        execution_config: ExecutionNodeConfig,
96        network_address: SocketAddr,
97        chain_address: Address,
98    ) -> Self {
99        let public_key = private_key.public_key();
100        let execution_node_datadir = execution_runtime
101            .nodes_dir()
102            .join(execution_runtime::execution_node_name(&public_key));
103
104        let execution_node_name = execution_runtime::execution_node_name(&public_key);
105        Self {
106            uid,
107            private_key,
108            oracle,
109            consensus_config,
110            consensus_handle: None,
111            execution_node: None,
112            execution_node_datadir,
113            execution_runtime,
114            execution_config,
115            execution_node_name,
116            execution_database: None,
117            execution_rocksdb: None,
118            last_db_block_on_stop: None,
119            network_address,
120            chain_address,
121
122            n_starts: 0,
123        }
124    }
125
    /// Fee recipient address used by this testing node; always the zero address.
    pub fn fee_recipient(&self) -> Address {
        Address::ZERO
    }
129
    /// Get a reference to the validator private key of this node.
    pub fn private_key(&self) -> &PrivateKey {
        &self.private_key
    }
133
    /// Get the validator public key of this node (derived from the private key).
    pub fn public_key(&self) -> PublicKey {
        self.private_key.public_key()
    }
138
    /// Get the unique identifier of this node.
    pub fn uid(&self) -> &str {
        &self.uid
    }
143
144    /// Get the metric prefix used by the most recently started instance.
145    ///
146    /// # Panics
147    /// Panics if the node has was never started.
148    pub fn metric_prefix(&self) -> String {
149        assert!(self.n_starts > 0, "node has never been started");
150        format!("{}_{}", self.uid, self.n_starts - 1)
151    }
152
    /// Get a reference to the consensus config.
    pub fn consensus_config(
        &self,
    ) -> &consensus::Builder<Control<PublicKey, TClock>, SocketManager<PublicKey, TClock>> {
        &self.consensus_config
    }
159
    /// Get a mutable reference to the consensus config.
    ///
    /// Changes take effect the next time consensus is started.
    pub fn consensus_config_mut(
        &mut self,
    ) -> &mut consensus::Builder<Control<PublicKey, TClock>, SocketManager<PublicKey, TClock>> {
        &mut self.consensus_config
    }
166
    /// Get a reference to the simulated network oracle.
    pub fn oracle(&self) -> &Oracle<PublicKey, TClock> {
        &self.oracle
    }
171
    /// The full socket address other nodes use to reach this node.
    pub fn ingress(&self) -> SocketAddr {
        self.network_address
    }
175
    /// The IP portion of this node's network address (outbound identity).
    pub fn egress(&self) -> IpAddr {
        self.network_address.ip()
    }
179
    /// A signer is a node that has a share.
    pub fn is_signer(&self) -> bool {
        self.consensus_config.share.is_some()
    }
184
    /// A verifier is a node that has no share.
    pub fn is_verifier(&self) -> bool {
        self.consensus_config.share.is_none()
    }
189
    /// Start both consensus and execution layers.
    ///
    /// Execution is started first so that the consensus config can be pointed at
    /// the freshly spawned execution node before consensus initializes.
    ///
    /// # Panics
    /// Panics if either consensus or execution is already running.
    pub async fn start(&mut self, context: &Context) {
        self.start_execution().await;
        self.start_consensus(context).await;
        self.n_starts += 1;
    }
200
201    /// Start the execution node and update consensus config to reference it.
202    ///
203    /// # Panics
204    /// Panics if execution node is already running.
205    #[instrument(skip_all, fields(last_db_block = self.last_db_block_on_stop))]
206    async fn start_execution(&mut self) {
207        assert!(
208            self.execution_node.is_none(),
209            "execution node is already running for {}",
210            self.uid
211        );
212
213        // Create database if not exists
214        if self.execution_database.is_none() {
215            let db_path = self.execution_node_datadir.join("db");
216            self.execution_database = Some(
217                reth_db::init_db(db_path, DatabaseArguments::default())
218                    .expect("failed to init database")
219                    .with_metrics(),
220            );
221        }
222
223        let execution_node = self
224            .execution_runtime
225            .spawn_node(
226                &self.execution_node_name,
227                self.execution_config.clone(),
228                self.execution_database.as_ref().unwrap().clone(),
229                self.execution_rocksdb.clone(),
230            )
231            .await
232            .expect("must be able to spawn execution node");
233
234        if self.execution_rocksdb.is_none() {
235            self.execution_rocksdb = Some(execution_node.node.provider().rocksdb_provider());
236        }
237
238        // verify database persistence on restart
239        if let Some(expected_block) = self.last_db_block_on_stop {
240            let current_db_block = execution_node
241                .node
242                .provider
243                .database_provider_ro()
244                .expect("failed to get database provider")
245                .last_block_number()
246                .expect("failed to get last block number from database");
247
248            assert!(current_db_block >= expected_block,);
249        }
250
251        // Update consensus config to point to the new execution node
252        self.consensus_config = self
253            .consensus_config
254            .clone()
255            .with_execution_node(execution_node.node.clone());
256        self.execution_node = Some(execution_node);
257        debug!(%self.uid, "started execution node for testing node");
258    }
259
    /// Start the consensus engine with oracle registration.
    ///
    /// Initializes the engine from a clone of the consensus config, labelling the
    /// context with `{uid}_{n_starts}` so metrics from successive restarts are
    /// distinguishable, then registers each consensus channel with the simulated
    /// network oracle and hands all channels to the engine.
    ///
    /// # Panics
    /// Panics if consensus is already running, if engine init fails, or if any
    /// channel registration fails.
    async fn start_consensus(&mut self, context: &Context) {
        assert!(
            self.consensus_handle.is_none(),
            "consensus is already running for {}",
            self.uid
        );
        let engine = self
            .consensus_config
            .clone()
            .try_init(context.with_label(&format!("{}_{}", self.uid, self.n_starts)))
            .await
            .expect("must be able to start the engine");

        // Register every consensus channel with the oracle. Each registration is
        // keyed by this node's public key plus a channel identifier and rate limit.
        let votes = self
            .oracle
            .control(self.public_key())
            .register(VOTES_CHANNEL_IDENT, VOTES_LIMIT)
            .await
            .unwrap();
        let certificates = self
            .oracle
            .control(self.public_key())
            .register(CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT)
            .await
            .unwrap();
        let resolver = self
            .oracle
            .control(self.public_key())
            .register(RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT)
            .await
            .unwrap();
        let broadcast = self
            .oracle
            .control(self.public_key())
            .register(BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT)
            .await
            .unwrap();
        let marshal = self
            .oracle
            .control(self.public_key())
            .register(MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT)
            .await
            .unwrap();
        let dkg = self
            .oracle
            .control(self.public_key())
            .register(DKG_CHANNEL_IDENT, DKG_LIMIT)
            .await
            .unwrap();
        let subblocks = self
            .oracle
            .control(self.public_key())
            .register(SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT)
            .await
            .unwrap();

        // Start the engine over the registered channels; the handle is kept so
        // the engine can later be aborted by `stop_consensus`.
        let consensus_handle = engine.start(
            votes,
            certificates,
            resolver,
            broadcast,
            marshal,
            dkg,
            subblocks,
        );

        self.consensus_handle = Some(consensus_handle);
        debug!(%self.uid, "started consensus for testing node");
    }
333
    /// Stop both consensus and execution layers.
    ///
    /// Consensus is stopped first so it cannot act on an execution node that is
    /// shutting down.
    ///
    /// # Panics
    /// Panics if either consensus or execution is not running.
    pub async fn stop(&mut self) {
        self.stop_consensus().await;
        self.stop_execution().await;
    }
342
343    /// Stop only the consensus engine.
344    ///
345    /// # Panics
346    /// Panics if consensus is not running.
347    #[instrument(skip_all)]
348    async fn stop_consensus(&mut self) {
349        let handle = self
350            .consensus_handle
351            .take()
352            .unwrap_or_else(|| panic!("consensus is not running for {}, cannot stop", self.uid));
353        handle.abort();
354
355        // Wait for the consensus handle to actually finish
356        let _ = handle.await;
357
358        debug!(%self.uid, "stopped consensus for testing node");
359    }
360
    /// Stop only the execution node.
    ///
    /// This triggers a critical task failure which will cause the execution node's
    /// executor to shutdown. Before shutting down, the last database block number
    /// is recorded so `start_execution` can verify persistence on restart.
    ///
    /// # Panics
    /// Panics if execution node is not running, or if the database cannot be read.
    #[instrument(skip_all)]
    async fn stop_execution(&mut self) {
        debug!(%self.uid, "stopping execution node for testing node");
        let execution_node = self.execution_node.take().unwrap_or_else(|| {
            panic!(
                "execution node is not running for {}, cannot stop",
                self.uid
            )
        });

        // Snapshot the current database tip for the restart check.
        let last_db_block = execution_node
            .node
            .provider
            .database_provider_ro()
            .expect("failed to get database provider")
            .last_block_number()
            .expect("failed to get last block number from database");
        tracing::debug!(
            last_db_block,
            "storing last block block number to verify restart"
        );
        self.last_db_block_on_stop = Some(last_db_block);

        execution_node.shutdown().await;

        // Acquire a RW transaction and immediately drop it. This blocks until any
        // pending write transaction completes, ensuring all database writes are
        // fully flushed. Without this, a pending write could still be in-flight
        // after shutdown returns, leading to database/static-file inconsistencies
        // when the node restarts.
        drop(
            self.execution_database
                .as_ref()
                .expect("database should exist")
                .tx_mut()
                .expect("failed to acquire rw transaction"),
        );

        debug!(%self.uid, "stopped execution node for testing node");
    }
408
409    /// Check if both consensus and execution are running
410    pub fn is_running(&self) -> bool {
411        self.consensus_handle.is_some() && self.execution_node.is_some()
412    }
413
    /// Check if consensus is running.
    pub fn is_consensus_running(&self) -> bool {
        self.consensus_handle.is_some()
    }
418
    /// Check if execution is running.
    pub fn is_execution_running(&self) -> bool {
        self.execution_node.is_some()
    }
423
424    /// Get a reference to the running execution node.
425    ///
426    /// # Panics
427    /// Panics if the execution node is not running.
428    pub fn execution(&self) -> &tempo_node::TempoFullNode {
429        &self
430            .execution_node
431            .as_ref()
432            .expect("execution node is not running")
433            .node
434    }
435
436    /// Get a reference to the running consensus handle.
437    ///
438    /// # Panics
439    /// Panics if the consensus engine is not running.
440    pub fn consensus(&self) -> &Handle<eyre::Result<()>> {
441        self.consensus_handle
442            .as_ref()
443            .expect("consensus is not running")
444    }
445
446    /// Get a blockchain provider for the execution node.
447    ///
448    /// # Panics
449    /// Panics if the execution node is not running.
450    pub fn execution_provider(
451        &self,
452    ) -> BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, DatabaseEnv>> {
453        self.execution().provider.clone()
454    }
455
    /// Get a blockchain provider for when the execution node is down.
    ///
    /// Opens the node's MDBX database read-only, its static files read-only, and
    /// a fresh RocksDB handle, then wires them into a standalone provider.
    ///
    /// This provider MUST BE DROPPED before starting the node again.
    ///
    /// # Panics
    /// Panics if any of the underlying stores cannot be opened.
    pub fn execution_provider_offline(
        &self,
    ) -> BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, DatabaseEnv>> {
        // Open a read-only provider to the database
        // Note: MDBX allows multiple readers, so this is safe even if another process
        // has the database open for reading
        let database = open_db_read_only(
            self.execution_node_datadir.join("db"),
            DatabaseArguments::default(),
        )
        .expect("failed to open execution node database")
        .with_metrics();

        let static_file_provider =
            StaticFileProvider::read_only(self.execution_node_datadir.join("static_files"), true)
                .expect("failed to open static files");

        let rocksdb = RocksDBProvider::builder(self.execution_node_datadir.join("rocksdb"))
            .build()
            .unwrap();

        let provider_factory = ProviderFactory::<NodeTypesWithDBAdapter<TempoNode, _>>::new(
            database,
            Arc::new(execution_runtime::chainspec()),
            static_file_provider,
            rocksdb,
            reth_ethereum::tasks::Runtime::test(),
        )
        .expect("failed to create provider factory");

        BlockchainProvider::new(provider_factory).expect("failed to create blockchain provider")
    }
491}
492
493#[cfg(test)]
494mod tests {
495    use crate::{Setup, setup_validators};
496    use alloy::providers::{Provider, ProviderBuilder};
497    use commonware_p2p::simulated::Link;
498    use commonware_runtime::{
499        Runner as _,
500        deterministic::{Config, Runner},
501    };
502    use std::time::Duration;
503    use tokio::sync::{oneshot, oneshot::Sender};
504
    /// Control messages sent from the test body to the node-driving thread.
    enum Message {
        // Stop the node; the sender is signalled once the node has stopped.
        Stop(Sender<()>),
        // Start the node; the sender receives the RPC socket address once up.
        Start(Sender<std::net::SocketAddr>),
    }
509
    /// Start node and verify RPC is accessible.
    ///
    /// Sends a `Start` message to the node-driving thread, waits for the RPC
    /// address it reports, and asserts an HTTP RPC request succeeds. Returns the
    /// RPC URL for later accessibility checks.
    async fn start_and_verify(tx_msg: &tokio::sync::mpsc::UnboundedSender<Message>) -> String {
        let (tx_rpc_addr, rx_rpc_addr) = oneshot::channel();
        let _ = tx_msg.send(Message::Start(tx_rpc_addr));
        let rpc_addr = rx_rpc_addr.await.unwrap();
        let rpc_url = format!("http://{rpc_addr}");

        // Verify RPC is accessible
        let provider = ProviderBuilder::new().connect_http(rpc_url.parse().unwrap());
        let block_number = provider.get_block_number().await;
        assert!(block_number.is_ok(), "RPC should be accessible after start");

        rpc_url
    }
524
    #[tokio::test]
    async fn just_restart() {
        // Ensures that the node can be stopped completely and brought up inside a test.
        let _ = tempo_eyre::install();

        let runner = Runner::from(Config::default().with_seed(0));
        let (tx_msg, mut rx_msg) = tokio::sync::mpsc::unbounded_channel::<Message>();

        // Drive the deterministic runtime on its own OS thread; the tokio test
        // body communicates with it purely via the message channel.
        std::thread::spawn(move || {
            runner.start(|mut context| async move {
                let setup = Setup::new()
                    .how_many_signers(1)
                    .linkage(Link {
                        latency: Duration::from_millis(10),
                        jitter: Duration::from_millis(1),
                        success_rate: 1.0,
                    })
                    .epoch_length(100);

                let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup).await;

                let mut node = nodes.pop().unwrap();

                // NOTE(review): `blocking_recv` inside an async block — presumably
                // acceptable because this future runs on the deterministic runner's
                // dedicated thread; confirm it cannot deadlock the runtime.
                loop {
                    match rx_msg.blocking_recv() {
                        Some(Message::Stop(tx_stopped)) => {
                            node.stop().await;
                            assert!(!node.is_running(), "node should not be running after stop");
                            assert!(
                                !node.is_consensus_running(),
                                "consensus should not be running after stop"
                            );
                            assert!(
                                !node.is_execution_running(),
                                "execution should not be running after stop"
                            );

                            let _ = tx_stopped.send(());
                        }
                        Some(Message::Start(tx_rpc_addr)) => {
                            node.start(&context).await;
                            assert!(node.is_running(), "node should be running after start");

                            // Get the RPC HTTP address while running
                            let rpc_addr = node
                                .execution()
                                .rpc_server_handles
                                .rpc
                                .http_local_addr()
                                .expect("http rpc server should be running");

                            let _ = tx_rpc_addr.send(rpc_addr);
                        }
                        None => {
                            // Channel closed: the test body is done; exit the loop.
                            break;
                        }
                    }
                }
            });
        });

        // Start the node initially
        let rpc_url = start_and_verify(&tx_msg).await;

        // Signal to stop the node
        let (tx_stopped, rx_stopped) = oneshot::channel();
        let _ = tx_msg.send(Message::Stop(tx_stopped));
        rx_stopped.await.unwrap();

        // Verify RPC is no longer accessible after stopping. Either the timeout
        // fires or the request itself errors out.
        let provider = ProviderBuilder::new().connect_http(rpc_url.parse().unwrap());
        let result =
            tokio::time::timeout(Duration::from_millis(500), provider.get_block_number()).await;
        assert!(
            result.is_err() || result.unwrap().is_err(),
            "RPC should not be accessible after stopping"
        );

        // Start the node again
        start_and_verify(&tx_msg).await;
    }
606}