1use crate::execution_runtime::{self, ExecutionNode, ExecutionNodeConfig, ExecutionRuntimeHandle};
4use alloy_primitives::Address;
5use commonware_cryptography::{
6 Signer as _,
7 ed25519::{PrivateKey, PublicKey},
8};
9use commonware_p2p::simulated::{Control, Oracle, SocketManager};
10use commonware_runtime::{Handle, Metrics as _, deterministic::Context};
11use reth_db::{Database, DatabaseEnv, mdbx::DatabaseArguments, open_db_read_only};
12use reth_ethereum::{
13 provider::{
14 DatabaseProviderFactory, ProviderFactory, RocksDBProviderFactory,
15 providers::{BlockchainProvider, RocksDBProvider, StaticFileProvider},
16 },
17 storage::BlockNumReader,
18};
19use reth_node_builder::NodeTypesWithDBAdapter;
20use std::{
21 net::{IpAddr, SocketAddr},
22 path::PathBuf,
23 sync::Arc,
24};
25use tempo_commonware_node::{
26 BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT, CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT,
27 DKG_CHANNEL_IDENT, DKG_LIMIT, MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT, RESOLVER_CHANNEL_IDENT,
28 RESOLVER_LIMIT, SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT, VOTES_CHANNEL_IDENT, VOTES_LIMIT,
29 consensus,
30};
31use tempo_node::node::TempoNode;
32use tracing::{debug, instrument};
33
34pub struct TestingNode<TClock>
36where
37 TClock: commonware_runtime::Clock,
38{
39 pub uid: String,
41 pub private_key: PrivateKey,
43 pub oracle: Oracle<PublicKey, TClock>,
45 pub consensus_config:
47 consensus::Builder<Control<PublicKey, TClock>, SocketManager<PublicKey, TClock>>,
48 pub consensus_handle: Option<Handle<eyre::Result<()>>>,
50 pub execution_node_datadir: PathBuf,
52 pub execution_node: Option<ExecutionNode>,
54 pub execution_runtime: ExecutionRuntimeHandle,
56 pub execution_config: ExecutionNodeConfig,
58 pub execution_database: Option<DatabaseEnv>,
60 pub execution_rocksdb: Option<RocksDBProvider>,
62 pub execution_node_name: String,
65 pub last_db_block_on_stop: Option<u64>,
67 pub network_address: SocketAddr,
70 pub chain_address: Address,
73
74 n_starts: u32,
75}
76
77impl<TClock> TestingNode<TClock>
78where
79 TClock: commonware_runtime::Clock,
80{
81 #[expect(clippy::too_many_arguments, reason = "quickly threw this together")]
86 pub fn new(
87 uid: String,
88 private_key: PrivateKey,
89 oracle: Oracle<PublicKey, TClock>,
90 consensus_config: consensus::Builder<
91 Control<PublicKey, TClock>,
92 SocketManager<PublicKey, TClock>,
93 >,
94 execution_runtime: ExecutionRuntimeHandle,
95 execution_config: ExecutionNodeConfig,
96 network_address: SocketAddr,
97 chain_address: Address,
98 ) -> Self {
99 let public_key = private_key.public_key();
100 let execution_node_datadir = execution_runtime
101 .nodes_dir()
102 .join(execution_runtime::execution_node_name(&public_key));
103
104 let execution_node_name = execution_runtime::execution_node_name(&public_key);
105 Self {
106 uid,
107 private_key,
108 oracle,
109 consensus_config,
110 consensus_handle: None,
111 execution_node: None,
112 execution_node_datadir,
113 execution_runtime,
114 execution_config,
115 execution_node_name,
116 execution_database: None,
117 execution_rocksdb: None,
118 last_db_block_on_stop: None,
119 network_address,
120 chain_address,
121
122 n_starts: 0,
123 }
124 }
125
126 pub fn fee_recipient(&self) -> Address {
127 Address::ZERO
128 }
129
130 pub fn private_key(&self) -> &PrivateKey {
131 &self.private_key
132 }
133
134 pub fn public_key(&self) -> PublicKey {
136 self.private_key.public_key()
137 }
138
139 pub fn uid(&self) -> &str {
141 &self.uid
142 }
143
144 pub fn metric_prefix(&self) -> String {
149 assert!(self.n_starts > 0, "node has never been started");
150 format!("{}_{}", self.uid, self.n_starts - 1)
151 }
152
153 pub fn consensus_config(
155 &self,
156 ) -> &consensus::Builder<Control<PublicKey, TClock>, SocketManager<PublicKey, TClock>> {
157 &self.consensus_config
158 }
159
160 pub fn consensus_config_mut(
162 &mut self,
163 ) -> &mut consensus::Builder<Control<PublicKey, TClock>, SocketManager<PublicKey, TClock>> {
164 &mut self.consensus_config
165 }
166
167 pub fn oracle(&self) -> &Oracle<PublicKey, TClock> {
169 &self.oracle
170 }
171
172 pub fn ingress(&self) -> SocketAddr {
173 self.network_address
174 }
175
176 pub fn egress(&self) -> IpAddr {
177 self.network_address.ip()
178 }
179
180 pub fn is_signer(&self) -> bool {
182 self.consensus_config.share.is_some()
183 }
184
185 pub fn is_verifier(&self) -> bool {
187 self.consensus_config.share.is_none()
188 }
189
190 pub async fn start(&mut self, context: &Context) {
196 self.start_execution().await;
197 self.start_consensus(context).await;
198 self.n_starts += 1;
199 }
200
201 #[instrument(skip_all, fields(last_db_block = self.last_db_block_on_stop))]
206 async fn start_execution(&mut self) {
207 assert!(
208 self.execution_node.is_none(),
209 "execution node is already running for {}",
210 self.uid
211 );
212
213 if self.execution_database.is_none() {
215 let db_path = self.execution_node_datadir.join("db");
216 self.execution_database = Some(
217 reth_db::init_db(db_path, DatabaseArguments::default())
218 .expect("failed to init database")
219 .with_metrics(),
220 );
221 }
222
223 let execution_node = self
224 .execution_runtime
225 .spawn_node(
226 &self.execution_node_name,
227 self.execution_config.clone(),
228 self.execution_database.as_ref().unwrap().clone(),
229 self.execution_rocksdb.clone(),
230 )
231 .await
232 .expect("must be able to spawn execution node");
233
234 if self.execution_rocksdb.is_none() {
235 self.execution_rocksdb = Some(execution_node.node.provider().rocksdb_provider());
236 }
237
238 if let Some(expected_block) = self.last_db_block_on_stop {
240 let current_db_block = execution_node
241 .node
242 .provider
243 .database_provider_ro()
244 .expect("failed to get database provider")
245 .last_block_number()
246 .expect("failed to get last block number from database");
247
248 assert!(current_db_block >= expected_block,);
249 }
250
251 self.consensus_config = self
253 .consensus_config
254 .clone()
255 .with_execution_node(execution_node.node.clone());
256 self.execution_node = Some(execution_node);
257 debug!(%self.uid, "started execution node for testing node");
258 }
259
260 async fn start_consensus(&mut self, context: &Context) {
265 assert!(
266 self.consensus_handle.is_none(),
267 "consensus is already running for {}",
268 self.uid
269 );
270 let engine = self
271 .consensus_config
272 .clone()
273 .try_init(context.with_label(&format!("{}_{}", self.uid, self.n_starts)))
274 .await
275 .expect("must be able to start the engine");
276
277 let votes = self
278 .oracle
279 .control(self.public_key())
280 .register(VOTES_CHANNEL_IDENT, VOTES_LIMIT)
281 .await
282 .unwrap();
283 let certificates = self
284 .oracle
285 .control(self.public_key())
286 .register(CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT)
287 .await
288 .unwrap();
289 let resolver = self
290 .oracle
291 .control(self.public_key())
292 .register(RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT)
293 .await
294 .unwrap();
295 let broadcast = self
296 .oracle
297 .control(self.public_key())
298 .register(BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT)
299 .await
300 .unwrap();
301 let marshal = self
302 .oracle
303 .control(self.public_key())
304 .register(MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT)
305 .await
306 .unwrap();
307 let dkg = self
308 .oracle
309 .control(self.public_key())
310 .register(DKG_CHANNEL_IDENT, DKG_LIMIT)
311 .await
312 .unwrap();
313 let subblocks = self
314 .oracle
315 .control(self.public_key())
316 .register(SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT)
317 .await
318 .unwrap();
319
320 let consensus_handle = engine.start(
321 votes,
322 certificates,
323 resolver,
324 broadcast,
325 marshal,
326 dkg,
327 subblocks,
328 );
329
330 self.consensus_handle = Some(consensus_handle);
331 debug!(%self.uid, "started consensus for testing node");
332 }
333
334 pub async fn stop(&mut self) {
339 self.stop_consensus().await;
340 self.stop_execution().await;
341 }
342
343 #[instrument(skip_all)]
348 async fn stop_consensus(&mut self) {
349 let handle = self
350 .consensus_handle
351 .take()
352 .unwrap_or_else(|| panic!("consensus is not running for {}, cannot stop", self.uid));
353 handle.abort();
354
355 let _ = handle.await;
357
358 debug!(%self.uid, "stopped consensus for testing node");
359 }
360
361 #[instrument(skip_all)]
369 async fn stop_execution(&mut self) {
370 debug!(%self.uid, "stopping execution node for testing node");
371 let execution_node = self.execution_node.take().unwrap_or_else(|| {
372 panic!(
373 "execution node is not running for {}, cannot stop",
374 self.uid
375 )
376 });
377
378 let last_db_block = execution_node
379 .node
380 .provider
381 .database_provider_ro()
382 .expect("failed to get database provider")
383 .last_block_number()
384 .expect("failed to get last block number from database");
385 tracing::debug!(
386 last_db_block,
387 "storing last block block number to verify restart"
388 );
389 self.last_db_block_on_stop = Some(last_db_block);
390
391 execution_node.shutdown().await;
392
393 drop(
399 self.execution_database
400 .as_ref()
401 .expect("database should exist")
402 .tx_mut()
403 .expect("failed to acquire rw transaction"),
404 );
405
406 debug!(%self.uid, "stopped execution node for testing node");
407 }
408
409 pub fn is_running(&self) -> bool {
411 self.consensus_handle.is_some() && self.execution_node.is_some()
412 }
413
414 pub fn is_consensus_running(&self) -> bool {
416 self.consensus_handle.is_some()
417 }
418
419 pub fn is_execution_running(&self) -> bool {
421 self.execution_node.is_some()
422 }
423
424 pub fn execution(&self) -> &tempo_node::TempoFullNode {
429 &self
430 .execution_node
431 .as_ref()
432 .expect("execution node is not running")
433 .node
434 }
435
436 pub fn consensus(&self) -> &Handle<eyre::Result<()>> {
441 self.consensus_handle
442 .as_ref()
443 .expect("consensus is not running")
444 }
445
446 pub fn execution_provider(
451 &self,
452 ) -> BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, DatabaseEnv>> {
453 self.execution().provider.clone()
454 }
455
456 pub fn execution_provider_offline(
460 &self,
461 ) -> BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, DatabaseEnv>> {
462 let database = open_db_read_only(
466 self.execution_node_datadir.join("db"),
467 DatabaseArguments::default(),
468 )
469 .expect("failed to open execution node database")
470 .with_metrics();
471
472 let static_file_provider =
473 StaticFileProvider::read_only(self.execution_node_datadir.join("static_files"), true)
474 .expect("failed to open static files");
475
476 let rocksdb = RocksDBProvider::builder(self.execution_node_datadir.join("rocksdb"))
477 .build()
478 .unwrap();
479
480 let provider_factory = ProviderFactory::<NodeTypesWithDBAdapter<TempoNode, _>>::new(
481 database,
482 Arc::new(execution_runtime::chainspec()),
483 static_file_provider,
484 rocksdb,
485 reth_ethereum::tasks::Runtime::test(),
486 )
487 .expect("failed to create provider factory");
488
489 BlockchainProvider::new(provider_factory).expect("failed to create blockchain provider")
490 }
491}
492
493#[cfg(test)]
494mod tests {
495 use crate::{Setup, setup_validators};
496 use alloy::providers::{Provider, ProviderBuilder};
497 use commonware_p2p::simulated::Link;
498 use commonware_runtime::{
499 Runner as _,
500 deterministic::{Config, Runner},
501 };
502 use std::time::Duration;
503 use tokio::sync::{oneshot, oneshot::Sender};
504
505 enum Message {
506 Stop(Sender<()>),
507 Start(Sender<std::net::SocketAddr>),
508 }
509
510 async fn start_and_verify(tx_msg: &tokio::sync::mpsc::UnboundedSender<Message>) -> String {
512 let (tx_rpc_addr, rx_rpc_addr) = oneshot::channel();
513 let _ = tx_msg.send(Message::Start(tx_rpc_addr));
514 let rpc_addr = rx_rpc_addr.await.unwrap();
515 let rpc_url = format!("http://{rpc_addr}");
516
517 let provider = ProviderBuilder::new().connect_http(rpc_url.parse().unwrap());
519 let block_number = provider.get_block_number().await;
520 assert!(block_number.is_ok(), "RPC should be accessible after start");
521
522 rpc_url
523 }
524
525 #[tokio::test]
526 async fn just_restart() {
527 let _ = tempo_eyre::install();
529
530 let runner = Runner::from(Config::default().with_seed(0));
531 let (tx_msg, mut rx_msg) = tokio::sync::mpsc::unbounded_channel::<Message>();
532
533 std::thread::spawn(move || {
534 runner.start(|mut context| async move {
535 let setup = Setup::new()
536 .how_many_signers(1)
537 .linkage(Link {
538 latency: Duration::from_millis(10),
539 jitter: Duration::from_millis(1),
540 success_rate: 1.0,
541 })
542 .epoch_length(100);
543
544 let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup).await;
545
546 let mut node = nodes.pop().unwrap();
547
548 loop {
549 match rx_msg.blocking_recv() {
550 Some(Message::Stop(tx_stopped)) => {
551 node.stop().await;
552 assert!(!node.is_running(), "node should not be running after stop");
553 assert!(
554 !node.is_consensus_running(),
555 "consensus should not be running after stop"
556 );
557 assert!(
558 !node.is_execution_running(),
559 "execution should not be running after stop"
560 );
561
562 let _ = tx_stopped.send(());
563 }
564 Some(Message::Start(tx_rpc_addr)) => {
565 node.start(&context).await;
566 assert!(node.is_running(), "node should be running after start");
567
568 let rpc_addr = node
570 .execution()
571 .rpc_server_handles
572 .rpc
573 .http_local_addr()
574 .expect("http rpc server should be running");
575
576 let _ = tx_rpc_addr.send(rpc_addr);
577 }
578 None => {
579 break;
580 }
581 }
582 }
583 });
584 });
585
586 let rpc_url = start_and_verify(&tx_msg).await;
588
589 let (tx_stopped, rx_stopped) = oneshot::channel();
591 let _ = tx_msg.send(Message::Stop(tx_stopped));
592 rx_stopped.await.unwrap();
593
594 let provider = ProviderBuilder::new().connect_http(rpc_url.parse().unwrap());
596 let result =
597 tokio::time::timeout(Duration::from_millis(500), provider.get_block_number()).await;
598 assert!(
599 result.is_err() || result.unwrap().is_err(),
600 "RPC should not be accessible after stopping"
601 );
602
603 start_and_verify(&tx_msg).await;
605 }
606}