1use crate::execution_runtime::{self, ExecutionNode, ExecutionNodeConfig, ExecutionRuntimeHandle};
4use commonware_cryptography::ed25519::PublicKey;
5use commonware_p2p::simulated::{Control, Oracle, SocketManager};
6use commonware_runtime::{Handle, deterministic::Context};
7use reth_db::{Database, DatabaseEnv, mdbx::DatabaseArguments, open_db_read_only};
8use reth_ethereum::{
9 provider::{
10 DatabaseProviderFactory, ProviderFactory,
11 providers::{BlockchainProvider, StaticFileProvider},
12 },
13 storage::BlockNumReader,
14};
15use reth_node_builder::NodeTypesWithDBAdapter;
16use std::{path::PathBuf, sync::Arc};
17use tempo_commonware_node::consensus;
18use tempo_node::node::TempoNode;
19use tracing::{debug, instrument};
20
/// A single validator under test: a consensus engine paired with a reth
/// execution node, each of which can be stopped and restarted independently
/// while preserving on-disk state.
pub struct TestingNode {
    /// Human-readable identifier used in logs and panic messages.
    uid: String,
    /// The node's network identity on the simulated p2p network.
    public_key: PublicKey,
    /// Handle to the simulated network used to register message channels.
    oracle: Oracle<PublicKey>,
    /// Builder for the consensus engine; re-pointed at the execution node on
    /// every (re)start.
    consensus_config: consensus::Builder<Control<PublicKey>, Context, SocketManager<PublicKey>>,
    /// Running consensus task, `None` while consensus is stopped.
    consensus_handle: Option<Handle<eyre::Result<()>>>,
    /// Data directory for the execution node (db + static files).
    execution_node_datadir: PathBuf,
    /// Running execution node, `None` while execution is stopped.
    execution_node: Option<ExecutionNode>,
    /// Runtime used to spawn execution nodes.
    execution_runtime: ExecutionRuntimeHandle,
    /// Configuration cloned into every spawned execution node.
    execution_config: ExecutionNodeConfig,
    /// Database environment, created lazily on first start and reused across
    /// restarts so the node resumes from its persisted state.
    execution_database: Option<Arc<DatabaseEnv>>,
    /// Highest block persisted at the last stop; checked on restart to ensure
    /// the database did not regress.
    last_db_block_on_stop: Option<u64>,
}
46
47impl TestingNode {
48 pub fn new(
52 uid: String,
53 public_key: PublicKey,
54 oracle: Oracle<PublicKey>,
55 consensus_config: consensus::Builder<Control<PublicKey>, Context, SocketManager<PublicKey>>,
56 execution_runtime: ExecutionRuntimeHandle,
57 execution_config: ExecutionNodeConfig,
58 ) -> Self {
59 let execution_node_datadir = execution_runtime
60 .nodes_dir()
61 .join(execution_runtime::execution_node_name(&public_key));
62
63 Self {
64 uid,
65 public_key,
66 oracle,
67 consensus_config,
68 consensus_handle: None,
69 execution_node: None,
70 execution_node_datadir,
71 execution_runtime,
72 execution_config,
73 execution_database: None,
74 last_db_block_on_stop: None,
75 }
76 }
77
78 pub fn public_key(&self) -> &PublicKey {
80 &self.public_key
81 }
82
83 pub fn uid(&self) -> &str {
85 &self.uid
86 }
87
88 pub fn consensus_config(
90 &self,
91 ) -> &consensus::Builder<Control<PublicKey>, Context, SocketManager<PublicKey>> {
92 &self.consensus_config
93 }
94
95 pub fn consensus_config_mut(
97 &mut self,
98 ) -> &mut consensus::Builder<Control<PublicKey>, Context, SocketManager<PublicKey>> {
99 &mut self.consensus_config
100 }
101
102 pub fn oracle(&self) -> &Oracle<PublicKey> {
104 &self.oracle
105 }
106
107 pub async fn start(&mut self) {
113 self.start_execution().await;
114 self.start_consensus().await;
115 }
116
117 #[instrument(skip_all, fields(last_db_block = self.last_db_block_on_stop))]
122 async fn start_execution(&mut self) {
123 assert!(
124 self.execution_node.is_none(),
125 "execution node is already running for {}",
126 self.uid
127 );
128
129 if self.execution_database.is_none() {
131 let db_path = self.execution_node_datadir.join("db");
132 self.execution_database = Some(Arc::new(
133 reth_db::init_db(db_path, DatabaseArguments::default())
134 .expect("failed to init database")
135 .with_metrics(),
136 ));
137 }
138
139 let execution_node = self
140 .execution_runtime
141 .spawn_node(
142 &execution_runtime::execution_node_name(&self.public_key),
143 self.execution_config.clone(),
144 self.execution_database.as_ref().unwrap().clone(),
145 )
146 .await
147 .expect("must be able to spawn execution node");
148
149 if let Some(expected_block) = self.last_db_block_on_stop {
151 let current_db_block = execution_node
152 .node
153 .provider
154 .database_provider_ro()
155 .expect("failed to get database provider")
156 .last_block_number()
157 .expect("failed to get last block number from database");
158
159 assert!(current_db_block >= expected_block,);
160 }
161
162 self.consensus_config = self
164 .consensus_config
165 .clone()
166 .with_execution_node(execution_node.node.clone());
167 self.execution_node = Some(execution_node);
168 debug!(%self.uid, "started execution node for testing node");
169 }
170
171 async fn start_consensus(&mut self) {
176 assert!(
177 self.consensus_handle.is_none(),
178 "consensus is already running for {}",
179 self.uid
180 );
181 let engine = self
182 .consensus_config
183 .clone()
184 .try_init()
185 .await
186 .expect("must be able to start the engine");
187
188 let pending = self
189 .oracle
190 .control(self.public_key.clone())
191 .register(0)
192 .await
193 .unwrap();
194 let recovered = self
195 .oracle
196 .control(self.public_key.clone())
197 .register(1)
198 .await
199 .unwrap();
200 let resolver = self
201 .oracle
202 .control(self.public_key.clone())
203 .register(2)
204 .await
205 .unwrap();
206 let broadcast = self
207 .oracle
208 .control(self.public_key.clone())
209 .register(3)
210 .await
211 .unwrap();
212 let marshal = self
213 .oracle
214 .control(self.public_key.clone())
215 .register(4)
216 .await
217 .unwrap();
218 let dkg = self
219 .oracle
220 .control(self.public_key.clone())
221 .register(5)
222 .await
223 .unwrap();
224 let boundary_certs = self
225 .oracle
226 .control(self.public_key.clone())
227 .register(6)
228 .await
229 .unwrap();
230 let subblocks = self
231 .oracle
232 .control(self.public_key.clone())
233 .register(7)
234 .await
235 .unwrap();
236
237 let consensus_handle = engine.start(
238 pending,
239 recovered,
240 resolver,
241 broadcast,
242 marshal,
243 dkg,
244 boundary_certs,
245 subblocks,
246 );
247
248 self.consensus_handle = Some(consensus_handle);
249 debug!(%self.uid, "started consensus for testing node");
250 }
251
252 pub async fn stop(&mut self) {
257 self.stop_consensus().await;
258 self.stop_execution().await
259 }
260
261 #[instrument(skip_all)]
266 async fn stop_consensus(&mut self) {
267 let handle = self
268 .consensus_handle
269 .take()
270 .unwrap_or_else(|| panic!("consensus is not running for {}, cannot stop", self.uid));
271 handle.abort();
272
273 let _ = handle.await;
275
276 debug!(%self.uid, "stopped consensus for testing node");
277 }
278
279 #[instrument(skip_all)]
287 async fn stop_execution(&mut self) {
288 debug!(%self.uid, "stopping execution node for testing node");
289 let execution_node = self.execution_node.take().unwrap_or_else(|| {
290 panic!(
291 "execution node is not running for {}, cannot stop",
292 self.uid
293 )
294 });
295
296 let last_db_block = execution_node
297 .node
298 .provider
299 .database_provider_ro()
300 .expect("failed to get database provider")
301 .last_block_number()
302 .expect("failed to get last block number from database");
303 tracing::debug!(
304 last_db_block,
305 "storing last block block number to verify restart"
306 );
307 self.last_db_block_on_stop = Some(last_db_block);
308
309 execution_node.shutdown().await;
310
311 drop(
317 self.execution_database
318 .as_ref()
319 .expect("database should exist")
320 .tx_mut()
321 .expect("failed to acquire rw transaction"),
322 );
323
324 debug!(%self.uid, "stopped execution node for testing node");
325 }
326
327 pub fn is_running(&self) -> bool {
329 self.consensus_handle.is_some() && self.execution_node.is_some()
330 }
331
332 pub fn is_consensus_running(&self) -> bool {
334 self.consensus_handle.is_some()
335 }
336
337 pub fn is_execution_running(&self) -> bool {
339 self.execution_node.is_some()
340 }
341
342 pub fn execution(&self) -> &tempo_node::TempoFullNode {
347 &self
348 .execution_node
349 .as_ref()
350 .expect("execution node is not running")
351 .node
352 }
353
354 pub fn consensus(&self) -> &Handle<eyre::Result<()>> {
359 self.consensus_handle
360 .as_ref()
361 .expect("consensus is not running")
362 }
363
364 pub fn execution_provider(
369 &self,
370 ) -> BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, Arc<DatabaseEnv>>> {
371 self.execution().provider.clone()
372 }
373
374 pub fn execution_provider_offline(
378 &self,
379 ) -> BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, Arc<DatabaseEnv>>> {
380 let database = Arc::new(
384 open_db_read_only(
385 self.execution_node_datadir.join("db"),
386 DatabaseArguments::default(),
387 )
388 .expect("failed to open execution node database")
389 .with_metrics(),
390 );
391
392 let static_file_provider =
393 StaticFileProvider::read_only(self.execution_node_datadir.join("static_files"), true)
394 .expect("failed to open static files");
395
396 let provider_factory = ProviderFactory::<NodeTypesWithDBAdapter<TempoNode, _>>::new(
397 database,
398 Arc::new(execution_runtime::chainspec()),
399 static_file_provider,
400 )
401 .expect("failed to create provider factory");
402
403 BlockchainProvider::new(provider_factory).expect("failed to create blockchain provider")
404 }
405}
406
#[cfg(test)]
mod tests {
    use crate::{Setup, setup_validators};
    use alloy::providers::{Provider, ProviderBuilder};
    use commonware_p2p::simulated::Link;
    use commonware_runtime::{
        Runner as _,
        deterministic::{Config, Runner},
    };
    use std::time::Duration;
    use tokio::sync::{oneshot, oneshot::Sender};

    /// Commands sent from the tokio test task to the dedicated thread that
    /// owns the node inside the deterministic runtime. Each variant carries a
    /// oneshot sender used to acknowledge completion.
    enum Message {
        // Stop the node; ack with `()` once stopped.
        Stop(Sender<()>),
        // Start the node; reply with the HTTP RPC listen address.
        Start(Sender<std::net::SocketAddr>),
    }

    /// Asks the runtime thread to start the node, then verifies its HTTP RPC
    /// endpoint answers a `get_block_number` request. Returns the RPC URL so
    /// the caller can probe it again after a stop.
    async fn start_and_verify(tx_msg: &tokio::sync::mpsc::UnboundedSender<Message>) -> String {
        let (tx_rpc_addr, rx_rpc_addr) = oneshot::channel();
        let _ = tx_msg.send(Message::Start(tx_rpc_addr));
        let rpc_addr = rx_rpc_addr.await.unwrap();
        let rpc_url = format!("http://{rpc_addr}");

        let provider = ProviderBuilder::new().connect_http(rpc_url.parse().unwrap());
        let block_number = provider.get_block_number().await;
        assert!(block_number.is_ok(), "RPC should be accessible after start");

        rpc_url
    }

    /// End-to-end restart test: start a single-validator node, stop it,
    /// verify its RPC goes down, then start it again and verify the RPC
    /// comes back.
    #[tokio::test]
    async fn test_restart() {
        let _ = tempo_eyre::install();

        // Deterministic runtime with a fixed seed for reproducibility.
        let runner = Runner::from(Config::default().with_seed(0));
        let (tx_msg, mut rx_msg) = tokio::sync::mpsc::unbounded_channel::<Message>();

        // The deterministic runner blocks its thread, so the node lives on a
        // dedicated OS thread and is driven via the message channel.
        std::thread::spawn(move || {
            runner.start(|context| async move {
                let setup = Setup::new()
                    .how_many_signers(1)
                    .linkage(Link {
                        latency: Duration::from_millis(10),
                        jitter: Duration::from_millis(1),
                        success_rate: 1.0,
                    })
                    .epoch_length(100);

                let (mut nodes, _execution_runtime) =
                    setup_validators(context.clone(), setup).await;

                let mut node = nodes.pop().unwrap();

                // Process commands until the sender side is dropped.
                // NOTE(review): `blocking_recv` inside the runner's async
                // block — presumably safe because the deterministic runtime
                // drives this future on its own thread; verify if changed.
                loop {
                    match rx_msg.blocking_recv() {
                        Some(Message::Stop(tx_stopped)) => {
                            node.stop().await;
                            assert!(!node.is_running(), "node should not be running after stop");
                            assert!(
                                !node.is_consensus_running(),
                                "consensus should not be running after stop"
                            );
                            assert!(
                                !node.is_execution_running(),
                                "execution should not be running after stop"
                            );

                            let _ = tx_stopped.send(());
                        }
                        Some(Message::Start(tx_rpc_addr)) => {
                            node.start().await;
                            assert!(node.is_running(), "node should be running after start");

                            let rpc_addr = node
                                .execution()
                                .rpc_server_handles
                                .rpc
                                .http_local_addr()
                                .expect("http rpc server should be running");

                            let _ = tx_rpc_addr.send(rpc_addr);
                        }
                        None => {
                            break;
                        }
                    }
                }
            });
        });

        // First start: RPC must be reachable.
        let rpc_url = start_and_verify(&tx_msg).await;

        // Stop and wait for acknowledgement.
        let (tx_stopped, rx_stopped) = oneshot::channel();
        let _ = tx_msg.send(Message::Stop(tx_stopped));
        rx_stopped.await.unwrap();

        // After stop, the RPC endpoint must be unreachable (either the
        // request errors or it times out).
        let provider = ProviderBuilder::new().connect_http(rpc_url.parse().unwrap());
        let result =
            tokio::time::timeout(Duration::from_millis(500), provider.get_block_number()).await;
        assert!(
            result.is_err() || result.unwrap().is_err(),
            "RPC should not be accessible after stopping"
        );

        // Restart: RPC must come back up.
        start_and_verify(&tx_msg).await;
    }
}