1use std::{
3 net::{IpAddr, SocketAddr},
4 path::{Path, PathBuf},
5 sync::Arc,
6 time::Duration,
7};
8
9use alloy::{
10 providers::ProviderBuilder,
11 rpc::types::TransactionReceipt,
12 signers::{local::MnemonicBuilder, utils::secret_key_to_address},
13 transports::http::reqwest::Url,
14};
15use alloy_evm::{EvmFactory as _, revm::inspector::JournalExt as _};
16use alloy_genesis::{Genesis, GenesisAccount};
17use alloy_primitives::{Address, B256, Keccak256, U256};
18use commonware_codec::Encode;
19use commonware_cryptography::{
20 Signer,
21 ed25519::{PrivateKey, PublicKey, Signature},
22};
23use commonware_runtime::Clock;
24use commonware_utils::ordered;
25use eyre::{OptionExt as _, WrapErr as _};
26use futures::{StreamExt, future::BoxFuture};
27use reth_chainspec::EthChainSpec;
28use reth_db::mdbx::DatabaseEnv;
29use reth_ethereum::{
30 evm::{
31 primitives::EvmEnv,
32 revm::db::{CacheDB, EmptyDB},
33 },
34 network::{
35 Peers as _,
36 api::{
37 NetworkEventListenerProvider, PeersInfo,
38 events::{NetworkEvent, PeerEvent},
39 },
40 },
41 provider::providers::RocksDBProvider,
42 tasks::Runtime,
43};
44use reth_network_peers::{NodeRecord, TrustedPeer};
45use reth_node_builder::{NodeBuilder, NodeConfig};
46use reth_node_core::{
47 args::{DatadirArgs, PayloadBuilderArgs, RpcServerArgs, StorageArgs},
48 exit::NodeExitFuture,
49};
50use reth_rpc_builder::RpcModuleSelection;
51use secp256k1::SecretKey;
52use std::net::TcpListener;
53use tempfile::TempDir;
54use tempo_chainspec::TempoChainSpec;
55use tempo_commonware_node::feed::FeedStateHandle;
56use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
57use tempo_node::{
58 TempoFullNode,
59 evm::{TempoEvmFactory, evm::TempoEvm},
60 node::TempoNode,
61 rpc::consensus::{TempoConsensusApiServer, TempoConsensusRpc},
62};
63use tempo_precompiles::{
64 VALIDATOR_CONFIG_ADDRESS, VALIDATOR_CONFIG_V2_ADDRESS,
65 storage::StorageCtx,
66 validator_config::{IValidatorConfig, ValidatorConfig},
67 validator_config_v2::{
68 IValidatorConfigV2, VALIDATOR_NS_ADD, VALIDATOR_NS_ROTATE, ValidatorConfigV2,
69 },
70};
71use tokio::sync::oneshot;
72
73use crate::{ConsensusNodeConfig, TestingNode};
74
75const ADMIN_INDEX: u32 = 0;
76const VALIDATOR_START_INDEX: u32 = 1;
77
78pub const TEST_MNEMONIC: &str = "test test test test test test test test test test test junk";
80
81#[derive(Default, Debug)]
82pub struct Builder {
83 epoch_length: Option<u64>,
84 initial_dkg_outcome: Option<OnchainDkgOutcome>,
85 t2_time: Option<u64>,
86 validators: Option<ordered::Map<PublicKey, ConsensusNodeConfig>>,
87}
88
89impl Builder {
90 pub fn new() -> Self {
91 Self {
92 epoch_length: None,
93 initial_dkg_outcome: None,
94 t2_time: None,
95 validators: None,
96 }
97 }
98
99 pub fn with_epoch_length(self, epoch_length: u64) -> Self {
100 Self {
101 epoch_length: Some(epoch_length),
102 ..self
103 }
104 }
105
106 pub fn with_initial_dkg_outcome(self, initial_dkg_outcome: OnchainDkgOutcome) -> Self {
107 Self {
108 initial_dkg_outcome: Some(initial_dkg_outcome),
109 ..self
110 }
111 }
112
113 pub fn with_validators(self, validators: ordered::Map<PublicKey, ConsensusNodeConfig>) -> Self {
114 Self {
115 validators: Some(validators),
116 ..self
117 }
118 }
119
120 pub fn with_t2_time(self, t2_time: u64) -> Self {
121 Self {
122 t2_time: Some(t2_time),
123 ..self
124 }
125 }
126
127 pub fn launch(self) -> eyre::Result<ExecutionRuntime> {
128 let Self {
129 epoch_length,
130 initial_dkg_outcome,
131 t2_time,
132 validators,
133 } = self;
134
135 let epoch_length = epoch_length.ok_or_eyre("must specify epoch length")?;
136 let initial_dkg_outcome =
137 initial_dkg_outcome.ok_or_eyre("must specify initial DKG outcome")?;
138 let t2_time = t2_time.ok_or_eyre("must specify t2 time")?;
139 let validators = validators.ok_or_eyre("must specify validators")?;
140
141 assert_eq!(
142 initial_dkg_outcome.next_players(),
143 &ordered::Set::from_iter_dedup(
144 validators
145 .iter_pairs()
146 .filter_map(|(key, val)| val.share.is_some().then_some(key.clone()))
147 )
148 );
149
150 let mut genesis = genesis();
151 genesis
152 .config
153 .extra_fields
154 .insert_value("epochLength".to_string(), epoch_length)
155 .unwrap();
156 genesis
157 .config
158 .extra_fields
159 .insert_value("t2Time".to_string(), t2_time)
160 .unwrap();
161
162 genesis.extra_data = initial_dkg_outcome.encode().to_vec().into();
163
164 genesis.alloc.remove(&VALIDATOR_CONFIG_ADDRESS);
166 genesis.alloc.remove(&VALIDATOR_CONFIG_V2_ADDRESS);
167
168 let mut evm = setup_tempo_evm(genesis.config.chain_id);
169 {
170 let cx = evm.ctx_mut();
171 StorageCtx::enter_evm(&mut cx.journaled_state, &cx.block, &cx.cfg, &cx.tx, || {
172 let mut validator_config = ValidatorConfig::new();
174 validator_config
175 .initialize(admin())
176 .wrap_err("failed to initialize validator config v1")
177 .unwrap();
178
179 let mut validator_config_v2 = ValidatorConfigV2::new();
180 if t2_time == 0 {
181 validator_config_v2
182 .initialize(admin())
183 .wrap_err("failed to initialize validator config v2")
184 .unwrap();
185 }
186
187 for (public_key, validator) in validators {
188 if let ConsensusNodeConfig {
189 address,
190 ingress,
191 egress,
192 fee_recipient,
193 private_key,
194 share: Some(_),
195 } = validator
196 {
197 validator_config
198 .add_validator(
199 admin(),
200 IValidatorConfig::addValidatorCall {
201 newValidatorAddress: address,
202 publicKey: public_key.encode().as_ref().try_into().unwrap(),
203 active: true,
204 inboundAddress: ingress.to_string(),
205 outboundAddress: egress.to_string(),
206 },
207 )
208 .unwrap();
209
210 if t2_time == 0 {
211 validator_config_v2
212 .add_validator(
213 admin(),
214 IValidatorConfigV2::addValidatorCall {
215 validatorAddress: address,
216 publicKey: public_key.encode().as_ref().try_into().unwrap(),
217 ingress: ingress.to_string(),
218 egress: egress.ip().to_string(),
219 feeRecipient: fee_recipient,
220 signature: sign_add_validator_args(
221 genesis.config.chain_id,
222 &private_key,
223 address,
224 ingress,
225 egress.ip(),
226 fee_recipient,
227 )
228 .encode()
229 .to_vec()
230 .into(),
231 },
232 )
233 .unwrap();
234 }
235 }
236 }
237 })
238 }
239
240 let evm_state = evm.ctx_mut().journaled_state.evm_state();
241 for (address, account) in evm_state.iter() {
242 let storage = if !account.storage.is_empty() {
243 Some(
244 account
245 .storage
246 .iter()
247 .map(|(key, val)| ((*key).into(), val.present_value.into()))
248 .collect(),
249 )
250 } else {
251 None
252 };
253 genesis.alloc.insert(
254 *address,
255 GenesisAccount {
256 nonce: Some(account.info.nonce),
257 code: account.info.code.as_ref().map(|c| c.original_bytes()),
258 storage,
259 ..Default::default()
260 },
261 );
262 }
263
264 Ok(ExecutionRuntime::with_chain_spec(
265 TempoChainSpec::from_genesis(genesis),
266 ))
267 }
268}
269
270#[derive(Clone, Debug)]
272pub struct ExecutionNodeConfig {
273 pub secret_key: B256,
275 pub trusted_peers: Vec<String>,
277 pub port: u16,
279 pub validator_key: Option<B256>,
281 pub feed_state: Option<FeedStateHandle>,
283}
284
285impl ExecutionNodeConfig {
286 pub fn generator() -> ExecutionNodeConfigGenerator {
288 ExecutionNodeConfigGenerator::default()
289 }
290}
291
292#[derive(Default)]
294pub struct ExecutionNodeConfigGenerator {
295 count: u32,
296 connect_peers: bool,
297}
298
299impl ExecutionNodeConfigGenerator {
300 pub fn with_count(mut self, count: u32) -> Self {
302 self.count = count;
303 self
304 }
305
306 pub fn with_peers(mut self, connect: bool) -> Self {
308 self.connect_peers = connect;
309 self
310 }
311
312 pub fn generate(self) -> Vec<ExecutionNodeConfig> {
314 if !self.connect_peers {
315 return (0..self.count)
317 .map(|_| ExecutionNodeConfig {
318 secret_key: B256::random(),
319 trusted_peers: vec![],
320 port: 0,
321 validator_key: None,
322 feed_state: None,
323 })
324 .collect();
325 }
326
327 let ports: Vec<u16> = (0..self.count)
329 .map(|_| {
330 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
332 let port = listener
333 .local_addr()
334 .expect("failed to get local addr")
335 .port();
336 drop(listener);
337 port
338 })
339 .collect();
340
341 let mut configs: Vec<ExecutionNodeConfig> = ports
342 .into_iter()
343 .map(|port| ExecutionNodeConfig {
344 secret_key: B256::random(),
345 trusted_peers: vec![],
346 port,
347 validator_key: None,
348 feed_state: None,
349 })
350 .collect();
351
352 let enode_urls: Vec<String> = configs
353 .iter()
354 .map(|config| {
355 let secret_key =
356 SecretKey::from_slice(config.secret_key.as_slice()).expect("valid secret key");
357 let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
358 NodeRecord::from_secret_key(addr, &secret_key).to_string()
359 })
360 .collect();
361
362 for (i, config) in configs.iter_mut().enumerate() {
363 for (j, enode_url) in enode_urls.iter().enumerate() {
364 if i != j {
365 config.trusted_peers.push(enode_url.clone());
366 }
367 }
368 }
369
370 configs
371 }
372}
373
374pub struct ExecutionRuntime {
381 rt: std::thread::JoinHandle<()>,
383
384 _tempdir: TempDir,
386
387 to_runtime: tokio::sync::mpsc::UnboundedSender<Message>,
389}
390
391impl ExecutionRuntime {
392 pub fn builder() -> Builder {
393 Builder::new()
394 }
395
396 pub fn with_chain_spec(chain_spec: TempoChainSpec) -> Self {
398 let tempdir = tempfile::Builder::new()
399 .prefix("tempo_e2e_test")
401 .disable_cleanup(true)
402 .tempdir()
403 .expect("must be able to create a temp directory run tun tests");
404
405 let (to_runtime, mut from_handle) = tokio::sync::mpsc::unbounded_channel();
406
407 let datadir = tempdir.path().to_path_buf();
408 let rt = std::thread::spawn(move || {
409 let rt = tokio::runtime::Runtime::new()
410 .expect("must be able to initialize a runtime to run execution/reth nodes");
411 let wallet = MnemonicBuilder::from_phrase(crate::execution_runtime::TEST_MNEMONIC)
412 .build()
413 .unwrap();
414 rt.block_on(async move {
415 while let Some(msg) = from_handle.recv().await {
416 let runtime = Runtime::test();
418 match msg {
419 Message::AddValidator(add_validator) => {
420 let AddValidator {
421 http_url,
422 address,
423 public_key,
424 addr,
425 response,
426 } = add_validator;
427 let provider = ProviderBuilder::new()
428 .wallet(wallet.clone())
429 .connect_http(http_url);
430 let validator_config =
431 IValidatorConfig::new(VALIDATOR_CONFIG_ADDRESS, provider);
432 let receipt = validator_config
433 .addValidator(
434 address,
435 public_key.encode().as_ref().try_into().unwrap(),
436 true,
437 addr.to_string(),
438 addr.to_string(),
439 )
440 .send()
441 .await
442 .unwrap()
443 .get_receipt()
444 .await
445 .unwrap();
446 let _ = response.send(receipt);
447 }
448 Message::AddValidatorV2(add_validator_v2) => {
449 let AddValidatorV2 {
450 http_url,
451 private_key,
452 address,
453 ingress,
454 egress,
455 fee_recipient,
456 response,
457 } = add_validator_v2;
458 let provider = ProviderBuilder::new()
459 .wallet(wallet.clone())
460 .connect_http(http_url);
461 let validator_config =
462 IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
463 let receipt = validator_config
464 .addValidator(
465 address,
466 private_key
467 .public_key()
468 .encode()
469 .as_ref()
470 .try_into()
471 .unwrap(),
472 ingress.to_string(),
473 egress.to_string(),
474 fee_recipient,
475 sign_add_validator_args(
476 EthChainSpec::chain(&chain_spec).id(),
477 &private_key,
478 address,
479 ingress,
480 egress,
481 fee_recipient,
482 )
483 .encode()
484 .to_vec()
485 .into(),
486 )
487 .send()
488 .await
489 .unwrap()
490 .get_receipt()
491 .await
492 .unwrap();
493 let _ = response.send(receipt);
494 }
495 Message::ChangeValidatorStatus(change_validator_status) => {
496 let ChangeValidatorStatus {
497 http_url,
498 active,
499 index,
500 response,
501 } = change_validator_status;
502 let provider = ProviderBuilder::new()
503 .wallet(wallet.clone())
504 .connect_http(http_url);
505 let validator_config =
506 IValidatorConfig::new(VALIDATOR_CONFIG_ADDRESS, provider);
507 let receipt = validator_config
508 .changeValidatorStatusByIndex(index, active)
509 .send()
510 .await
511 .unwrap()
512 .get_receipt()
513 .await
514 .unwrap();
515 let _ = response.send(receipt);
516 }
517 Message::DeactivateValidatorV2(deacivate_validator_v2) => {
518 let DeactivateValidatorV2 {
519 http_url,
520 address,
521 response,
522 } = deacivate_validator_v2;
523 let provider = ProviderBuilder::new()
524 .wallet(wallet.clone())
525 .connect_http(http_url);
526 let validator_config_v2 =
527 IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
528 let id = validator_config_v2
529 .validatorByAddress(address)
530 .call()
531 .await
532 .unwrap()
533 .index;
534 let receipt = validator_config_v2
535 .deactivateValidator(id)
536 .send()
537 .await
538 .unwrap()
539 .get_receipt()
540 .await
541 .unwrap();
542 let _ = response.send(receipt);
543 }
544 Message::GetV1Validators(get_v1_validators) => {
545 let GetV1Validators { http_url, response } = get_v1_validators;
546 let provider = ProviderBuilder::new()
547 .wallet(wallet.clone())
548 .connect_http(http_url);
549 let validator_config =
550 IValidatorConfig::new(VALIDATOR_CONFIG_ADDRESS, provider);
551 let validators = validator_config.getValidators().call().await.unwrap();
552 let _ = response.send(validators);
553 }
554 Message::GetV2Validators(get_v2_validators) => {
555 let GetV2Validators { http_url, response } = get_v2_validators;
556 let provider = ProviderBuilder::new()
557 .wallet(wallet.clone())
558 .connect_http(http_url);
559 let validator_config =
560 IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
561 let validators =
562 validator_config.getActiveValidators().call().await.unwrap();
563 let _ = response.send(validators);
564 }
565 Message::InitializeIfMigrated(InitializeIfMigrated {
566 http_url,
567 response,
568 }) => {
569 let provider = ProviderBuilder::new()
570 .wallet(wallet.clone())
571 .connect_http(http_url);
572 let validator_config_v2 =
573 IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
574 let receipt = validator_config_v2
575 .initializeIfMigrated()
576 .send()
577 .await
578 .unwrap()
579 .get_receipt()
580 .await
581 .unwrap();
582 let _ = response.send(receipt);
583 }
584 Message::MigrateValidator(migrate_validator) => {
585 let MigrateValidator {
586 http_url,
587 index,
588 response,
589 } = migrate_validator;
590 let provider = ProviderBuilder::new()
591 .wallet(wallet.clone())
592 .connect_http(http_url);
593 let validator_config_v2 =
594 IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
595 let receipt = validator_config_v2
596 .migrateValidator(index)
597 .send()
598 .await
599 .unwrap()
600 .get_receipt()
601 .await
602 .unwrap();
603 let _ = response.send(receipt);
604 }
605 Message::RotateValidator(rotate_validator) => {
606 let RotateValidator {
607 http_url,
608 private_key,
609 address,
610 ingress,
611 egress,
612 response,
613 } = rotate_validator;
614 let provider = ProviderBuilder::new()
615 .wallet(wallet.clone())
616 .connect_http(http_url);
617 let validator_config =
618 IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
619 let id = validator_config
620 .validatorByAddress(address)
621 .call()
622 .await
623 .unwrap()
624 .index;
625 let receipt = validator_config
626 .rotateValidator(
627 id,
628 private_key
629 .public_key()
630 .encode()
631 .as_ref()
632 .try_into()
633 .unwrap(),
634 ingress.to_string(),
635 egress.to_string(),
636 sign_rotate_validator_args(
637 EthChainSpec::chain(&chain_spec).id(),
638 &private_key,
639 address,
640 ingress,
641 egress,
642 )
643 .encode()
644 .to_vec()
645 .into(),
646 )
647 .send()
648 .await
649 .unwrap()
650 .get_receipt()
651 .await
652 .unwrap();
653 let _ = response.send(receipt);
654 }
655 Message::SetNextFullDkgCeremony(set_next_full_dkg_ceremony) => {
656 let SetNextFullDkgCeremony {
657 http_url,
658 epoch,
659 response,
660 } = set_next_full_dkg_ceremony;
661 let provider = ProviderBuilder::new()
662 .wallet(wallet.clone())
663 .connect_http(http_url);
664 let validator_config =
665 IValidatorConfig::new(VALIDATOR_CONFIG_ADDRESS, provider);
666 let receipt = validator_config
667 .setNextFullDkgCeremony(epoch)
668 .send()
669 .await
670 .unwrap()
671 .get_receipt()
672 .await
673 .unwrap();
674 let _ = response.send(receipt);
675 }
676 Message::SetNextFullDkgCeremonyV2(set_next_full_dkg_ceremony_v2) => {
677 let SetNextFullDkgCeremonyV2 {
678 http_url,
679 epoch,
680 response,
681 } = set_next_full_dkg_ceremony_v2;
682 let provider = ProviderBuilder::new()
683 .wallet(wallet.clone())
684 .connect_http(http_url);
685 let validator_config =
686 IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
687 let receipt = validator_config
688 .setNetworkIdentityRotationEpoch(epoch)
689 .send()
690 .await
691 .unwrap()
692 .get_receipt()
693 .await
694 .unwrap();
695 let _ = response.send(receipt);
696 }
697 Message::SpawnNode {
698 name,
699 config,
700 database,
701 rocksdb,
702 response,
703 } => {
704 let node = launch_execution_node(
705 runtime,
706 chain_spec.clone(),
707 datadir.join(name),
708 *config,
709 database,
710 rocksdb,
711 )
712 .await
713 .expect("must be able to launch execution nodes");
714 response.send(node).expect(
715 "receiver must hold the return channel until the node is returned",
716 );
717 }
718 Message::RunAsync(fut) => {
719 fut.await;
720 }
721 Message::Stop => {
722 break;
723 }
724 }
725 }
726 })
727 });
728
729 Self {
730 rt,
731 _tempdir: tempdir,
732 to_runtime,
733 }
734 }
735
736 pub fn handle(&self) -> ExecutionRuntimeHandle {
740 ExecutionRuntimeHandle {
741 to_runtime: self.to_runtime.clone(),
742 nodes_dir: self._tempdir.path().to_path_buf(),
743 }
744 }
745
746 pub async fn add_validator(
747 &self,
748 http_url: Url,
749 address: Address,
750 public_key: PublicKey,
751 addr: SocketAddr,
752 ) -> eyre::Result<TransactionReceipt> {
753 let (tx, rx) = oneshot::channel();
754 self.to_runtime
755 .send(
756 AddValidator {
757 http_url,
758 address,
759 public_key,
760 addr,
761 response: tx,
762 }
763 .into(),
764 )
765 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
766 rx.await
767 .wrap_err("the execution runtime dropped the response channel before sending a receipt")
768 }
769
770 pub async fn add_validator_v2<C: Clock>(
771 &self,
772 http_url: Url,
773 validator: &TestingNode<C>,
774 ) -> eyre::Result<TransactionReceipt> {
775 let (tx, rx) = oneshot::channel();
776 self.to_runtime
777 .send(
778 AddValidatorV2 {
779 http_url,
780 private_key: validator.private_key().clone(),
781 address: validator.chain_address,
782 ingress: validator.ingress(),
783 egress: validator.egress(),
784 fee_recipient: validator.fee_recipient(),
785 response: tx,
786 }
787 .into(),
788 )
789 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
790 rx.await
791 .wrap_err("the execution runtime dropped the response channel before sending a receipt")
792 }
793
794 pub async fn change_validator_status(
795 &self,
796 http_url: Url,
797 index: u64,
798 active: bool,
799 ) -> eyre::Result<TransactionReceipt> {
800 let (tx, rx) = oneshot::channel();
801 self.to_runtime
802 .send(
803 ChangeValidatorStatus {
804 index,
805 active,
806 http_url,
807 response: tx,
808 }
809 .into(),
810 )
811 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
812 rx.await
813 .wrap_err("the execution runtime dropped the response channel before sending a receipt")
814 }
815
816 pub async fn deactivate_validator_v2<C: Clock>(
817 &self,
818 http_url: Url,
819 validator: &TestingNode<C>,
820 ) -> eyre::Result<TransactionReceipt> {
821 let (tx, rx) = oneshot::channel();
822 self.to_runtime
823 .send(
824 DeactivateValidatorV2 {
825 http_url,
826 address: validator.chain_address,
827 response: tx,
828 }
829 .into(),
830 )
831 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
832 rx.await
833 .wrap_err("the execution runtime dropped the response channel before sending a receipt")
834 }
835
836 pub async fn get_v1_validators(
837 &self,
838 http_url: Url,
839 ) -> eyre::Result<Vec<IValidatorConfig::Validator>> {
840 let (tx, rx) = oneshot::channel();
841 self.to_runtime
842 .send(
843 GetV1Validators {
844 http_url,
845 response: tx,
846 }
847 .into(),
848 )
849 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
850 rx.await
851 .wrap_err("the execution runtime dropped the response channel before sending a receipt")
852 }
853
854 pub async fn get_v2_validators(
855 &self,
856 http_url: Url,
857 ) -> eyre::Result<Vec<IValidatorConfigV2::Validator>> {
858 let (tx, rx) = oneshot::channel();
859 self.to_runtime
860 .send(
861 GetV2Validators {
862 http_url,
863 response: tx,
864 }
865 .into(),
866 )
867 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
868 rx.await
869 .wrap_err("the execution runtime dropped the response channel before sending a receipt")
870 }
871
872 pub async fn initialize_if_migrated(&self, http_url: Url) -> eyre::Result<TransactionReceipt> {
873 let (response, rx) = oneshot::channel();
874 self.to_runtime
875 .send(InitializeIfMigrated { http_url, response }.into())
876 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
877 rx.await
878 .wrap_err("the execution runtime dropped the response channel before sending a receipt")
879 }
880
881 pub async fn migrate_validator(
882 &self,
883 http_url: Url,
884 index: u64,
885 ) -> eyre::Result<TransactionReceipt> {
886 let (response, rx) = oneshot::channel();
887 self.to_runtime
888 .send(
889 MigrateValidator {
890 http_url,
891 index,
892 response,
893 }
894 .into(),
895 )
896 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
897 rx.await
898 .wrap_err("the execution runtime dropped the response channel before sending a receipt")
899 }
900
901 pub async fn rotate_validator<C: Clock>(
902 &self,
903 http_url: Url,
904 validator: &TestingNode<C>,
905 ) -> eyre::Result<TransactionReceipt> {
906 let (response, rx) = oneshot::channel();
907 self.to_runtime
908 .send(
909 RotateValidator {
910 http_url,
911 private_key: validator.private_key().clone(),
912 address: validator.chain_address,
913 ingress: validator.ingress(),
914 egress: validator.egress(),
915 response,
916 }
917 .into(),
918 )
919 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
920 rx.await
921 .wrap_err("the execution runtime dropped the response channel before sending a receipt")
922 }
923
924 pub async fn set_next_full_dkg_ceremony(
925 &self,
926 http_url: Url,
927 epoch: u64,
928 ) -> eyre::Result<TransactionReceipt> {
929 let (tx, rx) = oneshot::channel();
930 self.to_runtime
931 .send(
932 SetNextFullDkgCeremony {
933 http_url,
934 epoch,
935 response: tx,
936 }
937 .into(),
938 )
939 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
940 rx.await
941 .wrap_err("the execution runtime dropped the response channel before sending a receipt")
942 }
943
944 pub async fn set_next_full_dkg_ceremony_v2(
945 &self,
946 http_url: Url,
947 epoch: u64,
948 ) -> eyre::Result<TransactionReceipt> {
949 let (tx, rx) = oneshot::channel();
950 self.to_runtime
951 .send(
952 SetNextFullDkgCeremonyV2 {
953 http_url,
954 epoch,
955 response: tx,
956 }
957 .into(),
958 )
959 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
960 rx.await
961 .wrap_err("the execution runtime dropped the response channel before sending a receipt")
962 }
963
964 pub async fn remove_validator(
965 &self,
966 http_url: Url,
967 address: Address,
968 public_key: PublicKey,
969 addr: SocketAddr,
970 ) -> eyre::Result<TransactionReceipt> {
971 let (tx, rx) = oneshot::channel();
972 self.to_runtime
973 .send(
974 AddValidator {
975 http_url,
976 address,
977 public_key,
978 addr,
979 response: tx,
980 }
981 .into(),
982 )
983 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
984 rx.await
985 .wrap_err("the execution runtime dropped the response channel before sending a receipt")
986 }
987
988 pub async fn run_async<Fut, T>(&self, fut: Fut) -> eyre::Result<T>
993 where
994 Fut: std::future::Future<Output = T> + Send + 'static,
995 T: Send + 'static,
996 {
997 let (tx, rx) = oneshot::channel();
998 self.to_runtime
999 .send(Message::RunAsync(Box::pin(async move {
1000 let result = fut.await;
1001 let _ = tx.send(result);
1002 })))
1003 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
1004 rx.await
1005 .wrap_err("the execution runtime dropped the response channel")
1006 }
1007
1008 pub fn stop(self) -> eyre::Result<()> {
1010 self.to_runtime
1011 .send(Message::Stop)
1012 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
1013 match self.rt.join() {
1014 Ok(()) => Ok(()),
1015 Err(e) => std::panic::resume_unwind(e),
1016 }
1017 }
1018}
1019
1020#[derive(Clone)]
1024pub struct ExecutionRuntimeHandle {
1025 to_runtime: tokio::sync::mpsc::UnboundedSender<Message>,
1026 nodes_dir: PathBuf,
1027}
1028
1029impl ExecutionRuntimeHandle {
1030 pub fn nodes_dir(&self) -> &Path {
1032 &self.nodes_dir
1033 }
1034
1035 pub async fn spawn_node(
1037 &self,
1038 name: &str,
1039 config: ExecutionNodeConfig,
1040 database: DatabaseEnv,
1041 rocksdb: Option<RocksDBProvider>,
1042 ) -> eyre::Result<ExecutionNode> {
1043 let (tx, rx) = oneshot::channel();
1044 self.to_runtime
1045 .send(Message::SpawnNode {
1046 name: name.to_string(),
1047 config: Box::new(config),
1048 database,
1049 rocksdb,
1050 response: tx,
1051 })
1052 .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
1053 rx.await.wrap_err(
1054 "the execution runtime dropped the response channel before sending an execution node",
1055 )
1056 }
1057}
1058
1059pub struct ExecutionNode {
1064 pub node: TempoFullNode,
1066 pub runtime: Runtime,
1068 pub exit_fut: NodeExitFuture,
1070}
1071
1072impl ExecutionNode {
1073 pub async fn connect_peer(&self, other: &Self) {
1075 let self_record = self.node.network.local_node_record();
1076 let other_record = other.node.network.local_node_record();
1077 let mut events = self.node.network.event_listener();
1078
1079 self.node
1080 .network
1081 .add_trusted_peer(other_record.id, other_record.tcp_addr());
1082
1083 match events.next().await {
1084 Some(NetworkEvent::Peer(PeerEvent::PeerAdded(_))) => (),
1085 ev => panic!("Expected a peer added event, got: {ev:?}"),
1086 }
1087
1088 match events.next().await {
1089 Some(NetworkEvent::ActivePeerSession { .. }) => (),
1090 ev => panic!("Expected an active peer session event, got: {ev:?}"),
1091 }
1092
1093 tracing::debug!(
1094 "Connected peers: {:?} -> {:?}",
1095 self_record.id,
1096 other_record.id
1097 );
1098 }
1099
1100 pub async fn shutdown(self) {
1102 let _ = self.node.rpc_server_handle().clone().stop();
1103 self.runtime
1104 .graceful_shutdown_with_timeout(Duration::from_secs(10));
1105 let _ = self.exit_fut.await;
1106 }
1107}
1108
1109pub fn chainspec() -> TempoChainSpec {
1113 TempoChainSpec::from_genesis(genesis())
1114}
1115
1116pub fn execution_node_name(public_key: &PublicKey) -> String {
1118 format!("{}-{}", crate::EXECUTION_NODE_PREFIX, public_key)
1119}
1120
1121impl std::fmt::Debug for ExecutionNode {
1123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1124 f.debug_struct("ExecutionNode")
1125 .field("node", &"<TempoFullNode>")
1126 .field("exit_fut", &"<NodeExitFuture>")
1127 .finish()
1128 }
1129}
1130
1131pub fn genesis() -> Genesis {
1132 serde_json::from_str(include_str!("../../node/tests/assets/test-genesis.json")).unwrap()
1133}
1134
1135pub async fn launch_execution_node<P: AsRef<Path>>(
1144 runtime: Runtime,
1145 chain_spec: TempoChainSpec,
1146 datadir: P,
1147 config: ExecutionNodeConfig,
1148 database: DatabaseEnv,
1149 rocksdb: Option<RocksDBProvider>,
1150) -> eyre::Result<ExecutionNode> {
1151 println!("launching node at {}", datadir.as_ref().display());
1152 let node_config = NodeConfig::new(Arc::new(chain_spec))
1153 .with_rpc(
1154 RpcServerArgs::default()
1155 .with_unused_ports()
1156 .with_http()
1157 .with_http_api(RpcModuleSelection::All)
1158 .with_ws()
1159 .with_ws_api(RpcModuleSelection::All),
1160 )
1161 .with_datadir_args(DatadirArgs {
1162 datadir: datadir.as_ref().to_path_buf().into(),
1163 ..DatadirArgs::default()
1164 })
1165 .with_payload_builder(PayloadBuilderArgs {
1166 interval: Duration::from_millis(100),
1167 ..Default::default()
1168 })
1169 .with_storage(StorageArgs { v2: false })
1170 .apply(|mut c| {
1171 c.network.discovery.disable_discovery = true;
1172 c.network.trusted_peers = config
1173 .trusted_peers
1174 .into_iter()
1175 .map(|s| {
1176 s.parse::<TrustedPeer>()
1177 .expect("invalid trusted peer enode")
1178 })
1179 .collect();
1180 c.network.port = config.port;
1181 c.network.p2p_secret_key_hex = Some(config.secret_key);
1182 c
1183 });
1184
1185 let tempo_node = TempoNode::default().with_validator_key(config.validator_key);
1186 let feed_state = config.feed_state;
1187
1188 let node_handle = if let Some(rocksdb) = rocksdb {
1189 NodeBuilder::new(node_config)
1190 .with_database(database)
1191 .with_rocksdb_provider(rocksdb)
1192 } else {
1193 NodeBuilder::new(node_config).with_database(database)
1194 }
1195 .with_launch_context(runtime.clone())
1196 .node(tempo_node)
1197 .extend_rpc_modules(move |ctx| {
1198 if let Some(feed_state) = feed_state {
1199 ctx.modules
1200 .merge_configured(TempoConsensusRpc::new(feed_state).into_rpc())?;
1201 }
1202 Ok(())
1203 })
1204 .launch()
1205 .await
1206 .wrap_err_with(|| {
1207 format!(
1208 "failed launching node; databasedir: `{}`",
1209 datadir.as_ref().display()
1210 )
1211 })?;
1212
1213 Ok(ExecutionNode {
1214 node: node_handle.node,
1215 runtime,
1216 exit_fut: node_handle.node_exit_future,
1217 })
1218}
1219
1220enum Message {
1221 AddValidator(AddValidator),
1222 AddValidatorV2(AddValidatorV2),
1223 ChangeValidatorStatus(ChangeValidatorStatus),
1224 DeactivateValidatorV2(DeactivateValidatorV2),
1225 GetV1Validators(GetV1Validators),
1226 GetV2Validators(GetV2Validators),
1227 InitializeIfMigrated(InitializeIfMigrated),
1228 MigrateValidator(MigrateValidator),
1229 RotateValidator(RotateValidator),
1230 SetNextFullDkgCeremony(SetNextFullDkgCeremony),
1231 SetNextFullDkgCeremonyV2(SetNextFullDkgCeremonyV2),
1232 SpawnNode {
1233 name: String,
1234 config: Box<ExecutionNodeConfig>,
1235 database: DatabaseEnv,
1236 rocksdb: Option<RocksDBProvider>,
1237 response: tokio::sync::oneshot::Sender<ExecutionNode>,
1238 },
1239 RunAsync(BoxFuture<'static, ()>),
1240 Stop,
1241}
1242
1243impl From<AddValidator> for Message {
1244 fn from(value: AddValidator) -> Self {
1245 Self::AddValidator(value)
1246 }
1247}
1248
1249impl From<AddValidatorV2> for Message {
1250 fn from(value: AddValidatorV2) -> Self {
1251 Self::AddValidatorV2(value)
1252 }
1253}
1254
1255impl From<ChangeValidatorStatus> for Message {
1256 fn from(value: ChangeValidatorStatus) -> Self {
1257 Self::ChangeValidatorStatus(value)
1258 }
1259}
1260
1261impl From<DeactivateValidatorV2> for Message {
1262 fn from(value: DeactivateValidatorV2) -> Self {
1263 Self::DeactivateValidatorV2(value)
1264 }
1265}
1266
1267impl From<GetV1Validators> for Message {
1268 fn from(value: GetV1Validators) -> Self {
1269 Self::GetV1Validators(value)
1270 }
1271}
1272
1273impl From<GetV2Validators> for Message {
1274 fn from(value: GetV2Validators) -> Self {
1275 Self::GetV2Validators(value)
1276 }
1277}
1278
1279impl From<InitializeIfMigrated> for Message {
1280 fn from(value: InitializeIfMigrated) -> Self {
1281 Self::InitializeIfMigrated(value)
1282 }
1283}
1284
1285impl From<MigrateValidator> for Message {
1286 fn from(value: MigrateValidator) -> Self {
1287 Self::MigrateValidator(value)
1288 }
1289}
1290
1291impl From<RotateValidator> for Message {
1292 fn from(value: RotateValidator) -> Self {
1293 Self::RotateValidator(value)
1294 }
1295}
1296
1297impl From<SetNextFullDkgCeremony> for Message {
1298 fn from(value: SetNextFullDkgCeremony) -> Self {
1299 Self::SetNextFullDkgCeremony(value)
1300 }
1301}
1302
1303impl From<SetNextFullDkgCeremonyV2> for Message {
1304 fn from(value: SetNextFullDkgCeremonyV2) -> Self {
1305 Self::SetNextFullDkgCeremonyV2(value)
1306 }
1307}
1308
1309#[derive(Debug)]
1310struct AddValidator {
1311 http_url: Url,
1313 address: Address,
1314 public_key: PublicKey,
1315 addr: SocketAddr,
1316 response: oneshot::Sender<TransactionReceipt>,
1317}
1318
1319#[derive(Debug)]
1320struct AddValidatorV2 {
1321 http_url: Url,
1323 private_key: PrivateKey,
1324 address: Address,
1325 ingress: SocketAddr,
1326 egress: IpAddr,
1327 fee_recipient: Address,
1328 response: oneshot::Sender<TransactionReceipt>,
1329}
1330
1331#[derive(Debug)]
1332struct ChangeValidatorStatus {
1333 http_url: Url,
1335 index: u64,
1336 active: bool,
1337 response: oneshot::Sender<TransactionReceipt>,
1338}
1339
1340#[derive(Debug)]
1341struct DeactivateValidatorV2 {
1342 http_url: Url,
1344 address: Address,
1345 response: oneshot::Sender<TransactionReceipt>,
1346}
1347
1348struct GetV1Validators {
1349 http_url: Url,
1350 response: oneshot::Sender<Vec<IValidatorConfig::Validator>>,
1351}
1352
1353struct GetV2Validators {
1354 http_url: Url,
1355 response: oneshot::Sender<Vec<IValidatorConfigV2::Validator>>,
1356}
1357
1358#[derive(Debug)]
1359struct InitializeIfMigrated {
1360 http_url: Url,
1362 response: oneshot::Sender<TransactionReceipt>,
1363}
1364
1365#[derive(Debug)]
1366struct MigrateValidator {
1367 http_url: Url,
1369 index: u64,
1370 response: oneshot::Sender<TransactionReceipt>,
1371}
1372
1373#[derive(Debug)]
1374struct RotateValidator {
1375 http_url: Url,
1377 private_key: PrivateKey,
1378 address: Address,
1379 ingress: SocketAddr,
1380 egress: IpAddr,
1381 response: oneshot::Sender<TransactionReceipt>,
1382}
1383
1384#[derive(Debug)]
1385struct SetNextFullDkgCeremony {
1386 http_url: Url,
1388 epoch: u64,
1389 response: oneshot::Sender<TransactionReceipt>,
1390}
1391
1392#[derive(Debug)]
1393struct SetNextFullDkgCeremonyV2 {
1394 http_url: Url,
1396 epoch: u64,
1397 response: oneshot::Sender<TransactionReceipt>,
1398}
1399
1400pub fn admin() -> Address {
1401 address(ADMIN_INDEX)
1402}
1403
1404pub fn validator(idx: u32) -> Address {
1405 address(VALIDATOR_START_INDEX + idx)
1406}
1407
1408pub fn address(index: u32) -> Address {
1409 secret_key_to_address(MnemonicBuilder::from_phrase_nth(TEST_MNEMONIC, index).credential())
1410}
1411
1412fn setup_tempo_evm(chain_id: u64) -> TempoEvm<CacheDB<EmptyDB>> {
1413 let db = CacheDB::default();
1414 let mut env = EvmEnv::default().with_timestamp(U256::ZERO);
1416 env.cfg_env.chain_id = chain_id;
1417
1418 let factory = TempoEvmFactory::default();
1419 factory.create_evm(db, env)
1420}
1421
1422fn sign_add_validator_args(
1423 chain_id: u64,
1424 key: &PrivateKey,
1425 address: Address,
1426 ingress: SocketAddr,
1427 egress: IpAddr,
1428 fee_recipient: Address,
1429) -> Signature {
1430 let mut hasher = Keccak256::new();
1431 hasher.update(chain_id.to_be_bytes());
1432 hasher.update(VALIDATOR_CONFIG_V2_ADDRESS.as_slice());
1433 hasher.update(address.as_slice());
1434 hasher.update([ingress.to_string().len() as u8]);
1435 hasher.update(ingress.to_string().as_bytes());
1436 hasher.update([egress.to_string().len() as u8]);
1437 hasher.update(egress.to_string().as_bytes());
1438 hasher.update(fee_recipient.as_slice());
1439 let msg = hasher.finalize();
1440 key.sign(VALIDATOR_NS_ADD, msg.as_slice())
1441}
1442
1443fn sign_rotate_validator_args(
1444 chain_id: u64,
1445 key: &PrivateKey,
1446 address: Address,
1447 ingress: SocketAddr,
1448 egress: IpAddr,
1449) -> Signature {
1450 let mut hasher = Keccak256::new();
1451 hasher.update(chain_id.to_be_bytes());
1452 hasher.update(VALIDATOR_CONFIG_V2_ADDRESS.as_slice());
1453 hasher.update(address.as_slice());
1454 hasher.update([ingress.to_string().len() as u8]);
1455 hasher.update(ingress.to_string().as_bytes());
1456 hasher.update([egress.to_string().len() as u8]);
1457 hasher.update(egress.to_string().as_bytes());
1458 let msg = hasher.finalize();
1459 key.sign(VALIDATOR_NS_ROTATE, msg.as_slice())
1460}