Skip to main content

tempo_e2e/
execution_runtime.rs

1//! The environment to launch tempo execution nodes in.
2use std::{
3    net::{IpAddr, SocketAddr},
4    path::{Path, PathBuf},
5    sync::Arc,
6    time::Duration,
7};
8
9use alloy::{
10    providers::ProviderBuilder,
11    rpc::types::TransactionReceipt,
12    signers::{local::MnemonicBuilder, utils::secret_key_to_address},
13    transports::http::reqwest::Url,
14};
15use alloy_evm::{EvmFactory as _, revm::inspector::JournalExt as _};
16use alloy_genesis::{Genesis, GenesisAccount};
17use alloy_primitives::{Address, B256, Keccak256, U256};
18use commonware_codec::Encode;
19use commonware_cryptography::{
20    Signer,
21    ed25519::{PrivateKey, PublicKey, Signature},
22};
23use commonware_runtime::Clock;
24use commonware_utils::ordered;
25use eyre::{OptionExt as _, WrapErr as _};
26use futures::{StreamExt, future::BoxFuture};
27use reth_chainspec::EthChainSpec;
28use reth_db::mdbx::DatabaseEnv;
29use reth_ethereum::{
30    evm::{
31        primitives::EvmEnv,
32        revm::db::{CacheDB, EmptyDB},
33    },
34    network::{
35        Peers as _,
36        api::{
37            NetworkEventListenerProvider, PeersInfo,
38            events::{NetworkEvent, PeerEvent},
39        },
40    },
41    provider::providers::RocksDBProvider,
42    tasks::Runtime,
43};
44use reth_network_peers::{NodeRecord, TrustedPeer};
45use reth_node_builder::{NodeBuilder, NodeConfig};
46use reth_node_core::{
47    args::{DatadirArgs, PayloadBuilderArgs, RpcServerArgs, StorageArgs},
48    exit::NodeExitFuture,
49};
50use reth_rpc_builder::RpcModuleSelection;
51use secp256k1::SecretKey;
52use std::net::TcpListener;
53use tempfile::TempDir;
54use tempo_chainspec::TempoChainSpec;
55use tempo_commonware_node::feed::FeedStateHandle;
56use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
57use tempo_node::{
58    TempoFullNode,
59    evm::{TempoEvmFactory, evm::TempoEvm},
60    node::TempoNode,
61    rpc::consensus::{TempoConsensusApiServer, TempoConsensusRpc},
62};
63use tempo_precompiles::{
64    VALIDATOR_CONFIG_ADDRESS, VALIDATOR_CONFIG_V2_ADDRESS,
65    storage::StorageCtx,
66    validator_config::{IValidatorConfig, ValidatorConfig},
67    validator_config_v2::{
68        IValidatorConfigV2, VALIDATOR_NS_ADD, VALIDATOR_NS_ROTATE, ValidatorConfigV2,
69    },
70};
71use tokio::sync::oneshot;
72
73use crate::{ConsensusNodeConfig, TestingNode};
74
75const ADMIN_INDEX: u32 = 0;
76const VALIDATOR_START_INDEX: u32 = 1;
77
78/// Same mnemonic as used in the imported test-genesis and in the `tempo-node` integration tests.
79pub const TEST_MNEMONIC: &str = "test test test test test test test test test test test junk";
80
81#[derive(Default, Debug)]
82pub struct Builder {
83    epoch_length: Option<u64>,
84    initial_dkg_outcome: Option<OnchainDkgOutcome>,
85    t2_time: Option<u64>,
86    validators: Option<ordered::Map<PublicKey, ConsensusNodeConfig>>,
87}
88
89impl Builder {
90    pub fn new() -> Self {
91        Self {
92            epoch_length: None,
93            initial_dkg_outcome: None,
94            t2_time: None,
95            validators: None,
96        }
97    }
98
99    pub fn with_epoch_length(self, epoch_length: u64) -> Self {
100        Self {
101            epoch_length: Some(epoch_length),
102            ..self
103        }
104    }
105
106    pub fn with_initial_dkg_outcome(self, initial_dkg_outcome: OnchainDkgOutcome) -> Self {
107        Self {
108            initial_dkg_outcome: Some(initial_dkg_outcome),
109            ..self
110        }
111    }
112
113    pub fn with_validators(self, validators: ordered::Map<PublicKey, ConsensusNodeConfig>) -> Self {
114        Self {
115            validators: Some(validators),
116            ..self
117        }
118    }
119
120    pub fn with_t2_time(self, t2_time: u64) -> Self {
121        Self {
122            t2_time: Some(t2_time),
123            ..self
124        }
125    }
126
127    pub fn launch(self) -> eyre::Result<ExecutionRuntime> {
128        let Self {
129            epoch_length,
130            initial_dkg_outcome,
131            t2_time,
132            validators,
133        } = self;
134
135        let epoch_length = epoch_length.ok_or_eyre("must specify epoch length")?;
136        let initial_dkg_outcome =
137            initial_dkg_outcome.ok_or_eyre("must specify initial DKG outcome")?;
138        let t2_time = t2_time.ok_or_eyre("must specify t2 time")?;
139        let validators = validators.ok_or_eyre("must specify validators")?;
140
141        assert_eq!(
142            initial_dkg_outcome.next_players(),
143            &ordered::Set::from_iter_dedup(
144                validators
145                    .iter_pairs()
146                    .filter_map(|(key, val)| val.share.is_some().then_some(key.clone()))
147            )
148        );
149
150        let mut genesis = genesis();
151        genesis
152            .config
153            .extra_fields
154            .insert_value("epochLength".to_string(), epoch_length)
155            .unwrap();
156        genesis
157            .config
158            .extra_fields
159            .insert_value("t2Time".to_string(), t2_time)
160            .unwrap();
161
162        genesis.extra_data = initial_dkg_outcome.encode().to_vec().into();
163
164        // Just remove whatever is already written into chainspec.
165        genesis.alloc.remove(&VALIDATOR_CONFIG_ADDRESS);
166        genesis.alloc.remove(&VALIDATOR_CONFIG_V2_ADDRESS);
167
168        let mut evm = setup_tempo_evm(genesis.config.chain_id);
169        {
170            let cx = evm.ctx_mut();
171            StorageCtx::enter_evm(&mut cx.journaled_state, &cx.block, &cx.cfg, &cx.tx, || {
172                // TODO(janis): figure out the owner of the test-genesis.json
173                let mut validator_config = ValidatorConfig::new();
174                validator_config
175                    .initialize(admin())
176                    .wrap_err("failed to initialize validator config v1")
177                    .unwrap();
178
179                let mut validator_config_v2 = ValidatorConfigV2::new();
180                if t2_time == 0 {
181                    validator_config_v2
182                        .initialize(admin())
183                        .wrap_err("failed to initialize validator config v2")
184                        .unwrap();
185                }
186
187                for (public_key, validator) in validators {
188                    if let ConsensusNodeConfig {
189                        address,
190                        ingress,
191                        egress,
192                        fee_recipient,
193                        private_key,
194                        share: Some(_),
195                    } = validator
196                    {
197                        validator_config
198                            .add_validator(
199                                admin(),
200                                IValidatorConfig::addValidatorCall {
201                                    newValidatorAddress: address,
202                                    publicKey: public_key.encode().as_ref().try_into().unwrap(),
203                                    active: true,
204                                    inboundAddress: ingress.to_string(),
205                                    outboundAddress: egress.to_string(),
206                                },
207                            )
208                            .unwrap();
209
210                        if t2_time == 0 {
211                            validator_config_v2
212                                .add_validator(
213                                    admin(),
214                                    IValidatorConfigV2::addValidatorCall {
215                                        validatorAddress: address,
216                                        publicKey: public_key.encode().as_ref().try_into().unwrap(),
217                                        ingress: ingress.to_string(),
218                                        egress: egress.ip().to_string(),
219                                        feeRecipient: fee_recipient,
220                                        signature: sign_add_validator_args(
221                                            genesis.config.chain_id,
222                                            &private_key,
223                                            address,
224                                            ingress,
225                                            egress.ip(),
226                                            fee_recipient,
227                                        )
228                                        .encode()
229                                        .to_vec()
230                                        .into(),
231                                    },
232                                )
233                                .unwrap();
234                        }
235                    }
236                }
237            })
238        }
239
240        let evm_state = evm.ctx_mut().journaled_state.evm_state();
241        for (address, account) in evm_state.iter() {
242            let storage = if !account.storage.is_empty() {
243                Some(
244                    account
245                        .storage
246                        .iter()
247                        .map(|(key, val)| ((*key).into(), val.present_value.into()))
248                        .collect(),
249                )
250            } else {
251                None
252            };
253            genesis.alloc.insert(
254                *address,
255                GenesisAccount {
256                    nonce: Some(account.info.nonce),
257                    code: account.info.code.as_ref().map(|c| c.original_bytes()),
258                    storage,
259                    ..Default::default()
260                },
261            );
262        }
263
264        Ok(ExecutionRuntime::with_chain_spec(
265            TempoChainSpec::from_genesis(genesis),
266        ))
267    }
268}
269
270/// Configuration for launching an execution node.
271#[derive(Clone, Debug)]
272pub struct ExecutionNodeConfig {
273    /// Network secret key for the node's identity.
274    pub secret_key: B256,
275    /// List of trusted peer enode URLs to connect to.
276    pub trusted_peers: Vec<String>,
277    /// Port for the network service.
278    pub port: u16,
279    /// Validator public key for filtering subblock transactions.
280    pub validator_key: Option<B256>,
281    /// Feed state handle for consensus RPC (if validator).
282    pub feed_state: Option<FeedStateHandle>,
283}
284
285impl ExecutionNodeConfig {
286    /// Create a default generator for building multiple execution node configs.
287    pub fn generator() -> ExecutionNodeConfigGenerator {
288        ExecutionNodeConfigGenerator::default()
289    }
290}
291
292/// Generator for creating multiple execution node configurations.
293#[derive(Default)]
294pub struct ExecutionNodeConfigGenerator {
295    count: u32,
296    connect_peers: bool,
297}
298
299impl ExecutionNodeConfigGenerator {
300    /// Set the number of nodes to generate.
301    pub fn with_count(mut self, count: u32) -> Self {
302        self.count = count;
303        self
304    }
305
306    /// Set whether to enable peer connections between all generated nodes.
307    pub fn with_peers(mut self, connect: bool) -> Self {
308        self.connect_peers = connect;
309        self
310    }
311
312    /// Generate the execution node configurations.
313    pub fn generate(self) -> Vec<ExecutionNodeConfig> {
314        if !self.connect_peers {
315            // No peer connections needed, use port 0 (OS will assign)
316            return (0..self.count)
317                .map(|_| ExecutionNodeConfig {
318                    secret_key: B256::random(),
319                    trusted_peers: vec![],
320                    port: 0,
321                    validator_key: None,
322                    feed_state: None,
323                })
324                .collect();
325        }
326
327        // Reserve ports by binding to them for peer connections
328        let ports: Vec<u16> = (0..self.count)
329            .map(|_| {
330                // This should work, but there's a chance that it results in flaky tests
331                let listener = TcpListener::bind("127.0.0.1:0").unwrap();
332                let port = listener
333                    .local_addr()
334                    .expect("failed to get local addr")
335                    .port();
336                drop(listener);
337                port
338            })
339            .collect();
340
341        let mut configs: Vec<ExecutionNodeConfig> = ports
342            .into_iter()
343            .map(|port| ExecutionNodeConfig {
344                secret_key: B256::random(),
345                trusted_peers: vec![],
346                port,
347                validator_key: None,
348                feed_state: None,
349            })
350            .collect();
351
352        let enode_urls: Vec<String> = configs
353            .iter()
354            .map(|config| {
355                let secret_key =
356                    SecretKey::from_slice(config.secret_key.as_slice()).expect("valid secret key");
357                let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
358                NodeRecord::from_secret_key(addr, &secret_key).to_string()
359            })
360            .collect();
361
362        for (i, config) in configs.iter_mut().enumerate() {
363            for (j, enode_url) in enode_urls.iter().enumerate() {
364                if i != j {
365                    config.trusted_peers.push(enode_url.clone());
366                }
367            }
368        }
369
370        configs
371    }
372}
373
374/// An execution runtime wrapping a thread running a [`tokio::runtime::Runtime`].
375///
376/// This is needed to spawn tempo execution nodes, which require a tokio runtime.
377///
378/// The commonware itself is launched in their
379/// [`commonware_runtime::deterministic`] and so this extra effort is necessary.
380pub struct ExecutionRuntime {
381    // The tokio runtime launched on a different thread.
382    rt: std::thread::JoinHandle<()>,
383
384    // Base directory where all reth databases will be initialized.
385    _tempdir: TempDir,
386
387    // Channel to request the runtime to launch new execution nodes.
388    to_runtime: tokio::sync::mpsc::UnboundedSender<Message>,
389}
390
391impl ExecutionRuntime {
392    pub fn builder() -> Builder {
393        Builder::new()
394    }
395
396    /// Constructs a new execution runtime to launch execution nodes.
397    pub fn with_chain_spec(chain_spec: TempoChainSpec) -> Self {
398        let tempdir = tempfile::Builder::new()
399            // TODO(janis): cargo manifest prefix?
400            .prefix("tempo_e2e_test")
401            .disable_cleanup(true)
402            .tempdir()
403            .expect("must be able to create a temp directory run tun tests");
404
405        let (to_runtime, mut from_handle) = tokio::sync::mpsc::unbounded_channel();
406
407        let datadir = tempdir.path().to_path_buf();
408        let rt = std::thread::spawn(move || {
409            let rt = tokio::runtime::Runtime::new()
410                .expect("must be able to initialize a runtime to run execution/reth nodes");
411            let wallet = MnemonicBuilder::from_phrase(crate::execution_runtime::TEST_MNEMONIC)
412                .build()
413                .unwrap();
414            rt.block_on(async move {
415                while let Some(msg) = from_handle.recv().await {
416                    // create a new task manager for the new node instance
417                    let runtime = Runtime::test();
418                    match msg {
419                        Message::AddValidator(add_validator) => {
420                            let AddValidator {
421                                http_url,
422                                address,
423                                public_key,
424                                addr,
425                                response,
426                            } = add_validator;
427                            let provider = ProviderBuilder::new()
428                                .wallet(wallet.clone())
429                                .connect_http(http_url);
430                            let validator_config =
431                                IValidatorConfig::new(VALIDATOR_CONFIG_ADDRESS, provider);
432                            let receipt = validator_config
433                                .addValidator(
434                                    address,
435                                    public_key.encode().as_ref().try_into().unwrap(),
436                                    true,
437                                    addr.to_string(),
438                                    addr.to_string(),
439                                )
440                                .send()
441                                .await
442                                .unwrap()
443                                .get_receipt()
444                                .await
445                                .unwrap();
446                            let _ = response.send(receipt);
447                        }
448                        Message::AddValidatorV2(add_validator_v2) => {
449                            let AddValidatorV2 {
450                                http_url,
451                                private_key,
452                                address,
453                                ingress,
454                                egress,
455                                fee_recipient,
456                                response,
457                            } = add_validator_v2;
458                            let provider = ProviderBuilder::new()
459                                .wallet(wallet.clone())
460                                .connect_http(http_url);
461                            let validator_config =
462                                IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
463                            let receipt = validator_config
464                                .addValidator(
465                                    address,
466                                    private_key
467                                        .public_key()
468                                        .encode()
469                                        .as_ref()
470                                        .try_into()
471                                        .unwrap(),
472                                    ingress.to_string(),
473                                    egress.to_string(),
474                                    fee_recipient,
475                                    sign_add_validator_args(
476                                        EthChainSpec::chain(&chain_spec).id(),
477                                        &private_key,
478                                        address,
479                                        ingress,
480                                        egress,
481                                        fee_recipient,
482                                    )
483                                    .encode()
484                                    .to_vec()
485                                    .into(),
486                                )
487                                .send()
488                                .await
489                                .unwrap()
490                                .get_receipt()
491                                .await
492                                .unwrap();
493                            let _ = response.send(receipt);
494                        }
495                        Message::ChangeValidatorStatus(change_validator_status) => {
496                            let ChangeValidatorStatus {
497                                http_url,
498                                active,
499                                index,
500                                response,
501                            } = change_validator_status;
502                            let provider = ProviderBuilder::new()
503                                .wallet(wallet.clone())
504                                .connect_http(http_url);
505                            let validator_config =
506                                IValidatorConfig::new(VALIDATOR_CONFIG_ADDRESS, provider);
507                            let receipt = validator_config
508                                .changeValidatorStatusByIndex(index, active)
509                                .send()
510                                .await
511                                .unwrap()
512                                .get_receipt()
513                                .await
514                                .unwrap();
515                            let _ = response.send(receipt);
516                        }
517                        Message::DeactivateValidatorV2(deacivate_validator_v2) => {
518                            let DeactivateValidatorV2 {
519                                http_url,
520                                address,
521                                response,
522                            } = deacivate_validator_v2;
523                            let provider = ProviderBuilder::new()
524                                .wallet(wallet.clone())
525                                .connect_http(http_url);
526                            let validator_config_v2 =
527                                IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
528                            let id = validator_config_v2
529                                .validatorByAddress(address)
530                                .call()
531                                .await
532                                .unwrap()
533                                .index;
534                            let receipt = validator_config_v2
535                                .deactivateValidator(id)
536                                .send()
537                                .await
538                                .unwrap()
539                                .get_receipt()
540                                .await
541                                .unwrap();
542                            let _ = response.send(receipt);
543                        }
544                        Message::GetV1Validators(get_v1_validators) => {
545                            let GetV1Validators { http_url, response } = get_v1_validators;
546                            let provider = ProviderBuilder::new()
547                                .wallet(wallet.clone())
548                                .connect_http(http_url);
549                            let validator_config =
550                                IValidatorConfig::new(VALIDATOR_CONFIG_ADDRESS, provider);
551                            let validators = validator_config.getValidators().call().await.unwrap();
552                            let _ = response.send(validators);
553                        }
554                        Message::GetV2Validators(get_v2_validators) => {
555                            let GetV2Validators { http_url, response } = get_v2_validators;
556                            let provider = ProviderBuilder::new()
557                                .wallet(wallet.clone())
558                                .connect_http(http_url);
559                            let validator_config =
560                                IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
561                            let validators =
562                                validator_config.getActiveValidators().call().await.unwrap();
563                            let _ = response.send(validators);
564                        }
565                        Message::InitializeIfMigrated(InitializeIfMigrated {
566                            http_url,
567                            response,
568                        }) => {
569                            let provider = ProviderBuilder::new()
570                                .wallet(wallet.clone())
571                                .connect_http(http_url);
572                            let validator_config_v2 =
573                                IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
574                            let receipt = validator_config_v2
575                                .initializeIfMigrated()
576                                .send()
577                                .await
578                                .unwrap()
579                                .get_receipt()
580                                .await
581                                .unwrap();
582                            let _ = response.send(receipt);
583                        }
584                        Message::MigrateValidator(migrate_validator) => {
585                            let MigrateValidator {
586                                http_url,
587                                index,
588                                response,
589                            } = migrate_validator;
590                            let provider = ProviderBuilder::new()
591                                .wallet(wallet.clone())
592                                .connect_http(http_url);
593                            let validator_config_v2 =
594                                IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
595                            let receipt = validator_config_v2
596                                .migrateValidator(index)
597                                .send()
598                                .await
599                                .unwrap()
600                                .get_receipt()
601                                .await
602                                .unwrap();
603                            let _ = response.send(receipt);
604                        }
605                        Message::RotateValidator(rotate_validator) => {
606                            let RotateValidator {
607                                http_url,
608                                private_key,
609                                address,
610                                ingress,
611                                egress,
612                                response,
613                            } = rotate_validator;
614                            let provider = ProviderBuilder::new()
615                                .wallet(wallet.clone())
616                                .connect_http(http_url);
617                            let validator_config =
618                                IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
619                            let id = validator_config
620                                .validatorByAddress(address)
621                                .call()
622                                .await
623                                .unwrap()
624                                .index;
625                            let receipt = validator_config
626                                .rotateValidator(
627                                    id,
628                                    private_key
629                                        .public_key()
630                                        .encode()
631                                        .as_ref()
632                                        .try_into()
633                                        .unwrap(),
634                                    ingress.to_string(),
635                                    egress.to_string(),
636                                    sign_rotate_validator_args(
637                                        EthChainSpec::chain(&chain_spec).id(),
638                                        &private_key,
639                                        address,
640                                        ingress,
641                                        egress,
642                                    )
643                                    .encode()
644                                    .to_vec()
645                                    .into(),
646                                )
647                                .send()
648                                .await
649                                .unwrap()
650                                .get_receipt()
651                                .await
652                                .unwrap();
653                            let _ = response.send(receipt);
654                        }
655                        Message::SetNextFullDkgCeremony(set_next_full_dkg_ceremony) => {
656                            let SetNextFullDkgCeremony {
657                                http_url,
658                                epoch,
659                                response,
660                            } = set_next_full_dkg_ceremony;
661                            let provider = ProviderBuilder::new()
662                                .wallet(wallet.clone())
663                                .connect_http(http_url);
664                            let validator_config =
665                                IValidatorConfig::new(VALIDATOR_CONFIG_ADDRESS, provider);
666                            let receipt = validator_config
667                                .setNextFullDkgCeremony(epoch)
668                                .send()
669                                .await
670                                .unwrap()
671                                .get_receipt()
672                                .await
673                                .unwrap();
674                            let _ = response.send(receipt);
675                        }
676                        Message::SetNextFullDkgCeremonyV2(set_next_full_dkg_ceremony_v2) => {
677                            let SetNextFullDkgCeremonyV2 {
678                                http_url,
679                                epoch,
680                                response,
681                            } = set_next_full_dkg_ceremony_v2;
682                            let provider = ProviderBuilder::new()
683                                .wallet(wallet.clone())
684                                .connect_http(http_url);
685                            let validator_config =
686                                IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
687                            let receipt = validator_config
688                                .setNetworkIdentityRotationEpoch(epoch)
689                                .send()
690                                .await
691                                .unwrap()
692                                .get_receipt()
693                                .await
694                                .unwrap();
695                            let _ = response.send(receipt);
696                        }
697                        Message::SpawnNode {
698                            name,
699                            config,
700                            database,
701                            rocksdb,
702                            response,
703                        } => {
704                            let node = launch_execution_node(
705                                runtime,
706                                chain_spec.clone(),
707                                datadir.join(name),
708                                *config,
709                                database,
710                                rocksdb,
711                            )
712                            .await
713                            .expect("must be able to launch execution nodes");
714                            response.send(node).expect(
715                                "receiver must hold the return channel until the node is returned",
716                            );
717                        }
718                        Message::RunAsync(fut) => {
719                            fut.await;
720                        }
721                        Message::Stop => {
722                            break;
723                        }
724                    }
725                }
726            })
727        });
728
729        Self {
730            rt,
731            _tempdir: tempdir,
732            to_runtime,
733        }
734    }
735
736    /// Returns a handle to this runtime.
737    ///
738    /// Can be used to spawn nodes.
739    pub fn handle(&self) -> ExecutionRuntimeHandle {
740        ExecutionRuntimeHandle {
741            to_runtime: self.to_runtime.clone(),
742            nodes_dir: self._tempdir.path().to_path_buf(),
743        }
744    }
745
746    pub async fn add_validator(
747        &self,
748        http_url: Url,
749        address: Address,
750        public_key: PublicKey,
751        addr: SocketAddr,
752    ) -> eyre::Result<TransactionReceipt> {
753        let (tx, rx) = oneshot::channel();
754        self.to_runtime
755            .send(
756                AddValidator {
757                    http_url,
758                    address,
759                    public_key,
760                    addr,
761                    response: tx,
762                }
763                .into(),
764            )
765            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
766        rx.await
767            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
768    }
769
770    pub async fn add_validator_v2<C: Clock>(
771        &self,
772        http_url: Url,
773        validator: &TestingNode<C>,
774    ) -> eyre::Result<TransactionReceipt> {
775        let (tx, rx) = oneshot::channel();
776        self.to_runtime
777            .send(
778                AddValidatorV2 {
779                    http_url,
780                    private_key: validator.private_key().clone(),
781                    address: validator.chain_address,
782                    ingress: validator.ingress(),
783                    egress: validator.egress(),
784                    fee_recipient: validator.fee_recipient(),
785                    response: tx,
786                }
787                .into(),
788            )
789            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
790        rx.await
791            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
792    }
793
794    pub async fn change_validator_status(
795        &self,
796        http_url: Url,
797        index: u64,
798        active: bool,
799    ) -> eyre::Result<TransactionReceipt> {
800        let (tx, rx) = oneshot::channel();
801        self.to_runtime
802            .send(
803                ChangeValidatorStatus {
804                    index,
805                    active,
806                    http_url,
807                    response: tx,
808                }
809                .into(),
810            )
811            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
812        rx.await
813            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
814    }
815
816    pub async fn deactivate_validator_v2<C: Clock>(
817        &self,
818        http_url: Url,
819        validator: &TestingNode<C>,
820    ) -> eyre::Result<TransactionReceipt> {
821        let (tx, rx) = oneshot::channel();
822        self.to_runtime
823            .send(
824                DeactivateValidatorV2 {
825                    http_url,
826                    address: validator.chain_address,
827                    response: tx,
828                }
829                .into(),
830            )
831            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
832        rx.await
833            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
834    }
835
836    pub async fn get_v1_validators(
837        &self,
838        http_url: Url,
839    ) -> eyre::Result<Vec<IValidatorConfig::Validator>> {
840        let (tx, rx) = oneshot::channel();
841        self.to_runtime
842            .send(
843                GetV1Validators {
844                    http_url,
845                    response: tx,
846                }
847                .into(),
848            )
849            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
850        rx.await
851            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
852    }
853
854    pub async fn get_v2_validators(
855        &self,
856        http_url: Url,
857    ) -> eyre::Result<Vec<IValidatorConfigV2::Validator>> {
858        let (tx, rx) = oneshot::channel();
859        self.to_runtime
860            .send(
861                GetV2Validators {
862                    http_url,
863                    response: tx,
864                }
865                .into(),
866            )
867            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
868        rx.await
869            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
870    }
871
872    pub async fn initialize_if_migrated(&self, http_url: Url) -> eyre::Result<TransactionReceipt> {
873        let (response, rx) = oneshot::channel();
874        self.to_runtime
875            .send(InitializeIfMigrated { http_url, response }.into())
876            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
877        rx.await
878            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
879    }
880
881    pub async fn migrate_validator(
882        &self,
883        http_url: Url,
884        index: u64,
885    ) -> eyre::Result<TransactionReceipt> {
886        let (response, rx) = oneshot::channel();
887        self.to_runtime
888            .send(
889                MigrateValidator {
890                    http_url,
891                    index,
892                    response,
893                }
894                .into(),
895            )
896            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
897        rx.await
898            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
899    }
900
901    pub async fn rotate_validator<C: Clock>(
902        &self,
903        http_url: Url,
904        validator: &TestingNode<C>,
905    ) -> eyre::Result<TransactionReceipt> {
906        let (response, rx) = oneshot::channel();
907        self.to_runtime
908            .send(
909                RotateValidator {
910                    http_url,
911                    private_key: validator.private_key().clone(),
912                    address: validator.chain_address,
913                    ingress: validator.ingress(),
914                    egress: validator.egress(),
915                    response,
916                }
917                .into(),
918            )
919            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
920        rx.await
921            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
922    }
923
924    pub async fn set_next_full_dkg_ceremony(
925        &self,
926        http_url: Url,
927        epoch: u64,
928    ) -> eyre::Result<TransactionReceipt> {
929        let (tx, rx) = oneshot::channel();
930        self.to_runtime
931            .send(
932                SetNextFullDkgCeremony {
933                    http_url,
934                    epoch,
935                    response: tx,
936                }
937                .into(),
938            )
939            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
940        rx.await
941            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
942    }
943
944    pub async fn set_next_full_dkg_ceremony_v2(
945        &self,
946        http_url: Url,
947        epoch: u64,
948    ) -> eyre::Result<TransactionReceipt> {
949        let (tx, rx) = oneshot::channel();
950        self.to_runtime
951            .send(
952                SetNextFullDkgCeremonyV2 {
953                    http_url,
954                    epoch,
955                    response: tx,
956                }
957                .into(),
958            )
959            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
960        rx.await
961            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
962    }
963
964    pub async fn remove_validator(
965        &self,
966        http_url: Url,
967        address: Address,
968        public_key: PublicKey,
969        addr: SocketAddr,
970    ) -> eyre::Result<TransactionReceipt> {
971        let (tx, rx) = oneshot::channel();
972        self.to_runtime
973            .send(
974                AddValidator {
975                    http_url,
976                    address,
977                    public_key,
978                    addr,
979                    response: tx,
980                }
981                .into(),
982            )
983            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
984        rx.await
985            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
986    }
987
988    /// Run an async task on the execution runtime's tokio runtime.
989    ///
990    /// This is useful for running code that requires a tokio runtime (like jsonrpsee clients)
991    /// from within the deterministic executor context.
992    pub async fn run_async<Fut, T>(&self, fut: Fut) -> eyre::Result<T>
993    where
994        Fut: std::future::Future<Output = T> + Send + 'static,
995        T: Send + 'static,
996    {
997        let (tx, rx) = oneshot::channel();
998        self.to_runtime
999            .send(Message::RunAsync(Box::pin(async move {
1000                let result = fut.await;
1001                let _ = tx.send(result);
1002            })))
1003            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
1004        rx.await
1005            .wrap_err("the execution runtime dropped the response channel")
1006    }
1007
1008    /// Instructs the runtime to stop and exit.
1009    pub fn stop(self) -> eyre::Result<()> {
1010        self.to_runtime
1011            .send(Message::Stop)
1012            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
1013        match self.rt.join() {
1014            Ok(()) => Ok(()),
1015            Err(e) => std::panic::resume_unwind(e),
1016        }
1017    }
1018}
1019
1020/// A handle to the execution runtime.
1021///
1022/// Can be used to spawn nodes.
1023#[derive(Clone)]
1024pub struct ExecutionRuntimeHandle {
1025    to_runtime: tokio::sync::mpsc::UnboundedSender<Message>,
1026    nodes_dir: PathBuf,
1027}
1028
1029impl ExecutionRuntimeHandle {
1030    /// Returns the base directory where execution node data is stored.
1031    pub fn nodes_dir(&self) -> &Path {
1032        &self.nodes_dir
1033    }
1034
1035    /// Requests a new execution node and blocks until its returned.
1036    pub async fn spawn_node(
1037        &self,
1038        name: &str,
1039        config: ExecutionNodeConfig,
1040        database: DatabaseEnv,
1041        rocksdb: Option<RocksDBProvider>,
1042    ) -> eyre::Result<ExecutionNode> {
1043        let (tx, rx) = oneshot::channel();
1044        self.to_runtime
1045            .send(Message::SpawnNode {
1046                name: name.to_string(),
1047                config: Box::new(config),
1048                database,
1049                rocksdb,
1050                response: tx,
1051            })
1052            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
1053        rx.await.wrap_err(
1054            "the execution runtime dropped the response channel before sending an execution node",
1055        )
1056    }
1057}
1058
1059/// An execution node spawned by the execution runtime.
1060///
1061/// This is essentially the same as [`reth_node_builder::NodeHandle`], but
1062/// avoids the type parameters.
1063pub struct ExecutionNode {
1064    /// All handles to interact with the launched node instances and services.
1065    pub node: TempoFullNode,
1066    /// The [`Runtime`] that drives the node's services.
1067    pub runtime: Runtime,
1068    /// The exist future that resolves when the node's engine future resolves.
1069    pub exit_fut: NodeExitFuture,
1070}
1071
1072impl ExecutionNode {
1073    /// Connect peers bidirectionally.
1074    pub async fn connect_peer(&self, other: &Self) {
1075        let self_record = self.node.network.local_node_record();
1076        let other_record = other.node.network.local_node_record();
1077        let mut events = self.node.network.event_listener();
1078
1079        self.node
1080            .network
1081            .add_trusted_peer(other_record.id, other_record.tcp_addr());
1082
1083        match events.next().await {
1084            Some(NetworkEvent::Peer(PeerEvent::PeerAdded(_))) => (),
1085            ev => panic!("Expected a peer added event, got: {ev:?}"),
1086        }
1087
1088        match events.next().await {
1089            Some(NetworkEvent::ActivePeerSession { .. }) => (),
1090            ev => panic!("Expected an active peer session event, got: {ev:?}"),
1091        }
1092
1093        tracing::debug!(
1094            "Connected peers: {:?} -> {:?}",
1095            self_record.id,
1096            other_record.id
1097        );
1098    }
1099
1100    /// Shuts down the node and awaits until the node is terminated.
1101    pub async fn shutdown(self) {
1102        let _ = self.node.rpc_server_handle().clone().stop();
1103        self.runtime
1104            .graceful_shutdown_with_timeout(Duration::from_secs(10));
1105        let _ = self.exit_fut.await;
1106    }
1107}
1108
1109/// Returns the chainspec used for e2e tests.
1110///
1111/// TODO(janis): allow configuring this.
1112pub fn chainspec() -> TempoChainSpec {
1113    TempoChainSpec::from_genesis(genesis())
1114}
1115
1116/// Generate execution node name from public key.
1117pub fn execution_node_name(public_key: &PublicKey) -> String {
1118    format!("{}-{}", crate::EXECUTION_NODE_PREFIX, public_key)
1119}
1120
1121// TODO(janis): would be nicer if we could identify the node somehow?
1122impl std::fmt::Debug for ExecutionNode {
1123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1124        f.debug_struct("ExecutionNode")
1125            .field("node", &"<TempoFullNode>")
1126            .field("exit_fut", &"<NodeExitFuture>")
1127            .finish()
1128    }
1129}
1130
1131pub fn genesis() -> Genesis {
1132    serde_json::from_str(include_str!("../../node/tests/assets/test-genesis.json")).unwrap()
1133}
1134
1135/// Launches a tempo execution node.
1136///
1137/// Difference compared to starting the node through the binary:
1138///
1139/// 1. faucet is always disabled
1140/// 2. components are not provided (looking at the node command, the components
1141///    are not passed to it).
1142/// 3. consensus config is not necessary
1143pub async fn launch_execution_node<P: AsRef<Path>>(
1144    runtime: Runtime,
1145    chain_spec: TempoChainSpec,
1146    datadir: P,
1147    config: ExecutionNodeConfig,
1148    database: DatabaseEnv,
1149    rocksdb: Option<RocksDBProvider>,
1150) -> eyre::Result<ExecutionNode> {
1151    println!("launching node at {}", datadir.as_ref().display());
1152    let node_config = NodeConfig::new(Arc::new(chain_spec))
1153        .with_rpc(
1154            RpcServerArgs::default()
1155                .with_unused_ports()
1156                .with_http()
1157                .with_http_api(RpcModuleSelection::All)
1158                .with_ws()
1159                .with_ws_api(RpcModuleSelection::All),
1160        )
1161        .with_datadir_args(DatadirArgs {
1162            datadir: datadir.as_ref().to_path_buf().into(),
1163            ..DatadirArgs::default()
1164        })
1165        .with_payload_builder(PayloadBuilderArgs {
1166            interval: Duration::from_millis(100),
1167            ..Default::default()
1168        })
1169        .with_storage(StorageArgs { v2: false })
1170        .apply(|mut c| {
1171            c.network.discovery.disable_discovery = true;
1172            c.network.trusted_peers = config
1173                .trusted_peers
1174                .into_iter()
1175                .map(|s| {
1176                    s.parse::<TrustedPeer>()
1177                        .expect("invalid trusted peer enode")
1178                })
1179                .collect();
1180            c.network.port = config.port;
1181            c.network.p2p_secret_key_hex = Some(config.secret_key);
1182            c
1183        });
1184
1185    let tempo_node = TempoNode::default().with_validator_key(config.validator_key);
1186    let feed_state = config.feed_state;
1187
1188    let node_handle = if let Some(rocksdb) = rocksdb {
1189        NodeBuilder::new(node_config)
1190            .with_database(database)
1191            .with_rocksdb_provider(rocksdb)
1192    } else {
1193        NodeBuilder::new(node_config).with_database(database)
1194    }
1195    .with_launch_context(runtime.clone())
1196    .node(tempo_node)
1197    .extend_rpc_modules(move |ctx| {
1198        if let Some(feed_state) = feed_state {
1199            ctx.modules
1200                .merge_configured(TempoConsensusRpc::new(feed_state).into_rpc())?;
1201        }
1202        Ok(())
1203    })
1204    .launch()
1205    .await
1206    .wrap_err_with(|| {
1207        format!(
1208            "failed launching node; databasedir: `{}`",
1209            datadir.as_ref().display()
1210        )
1211    })?;
1212
1213    Ok(ExecutionNode {
1214        node: node_handle.node,
1215        runtime,
1216        exit_fut: node_handle.node_exit_future,
1217    })
1218}
1219
1220enum Message {
1221    AddValidator(AddValidator),
1222    AddValidatorV2(AddValidatorV2),
1223    ChangeValidatorStatus(ChangeValidatorStatus),
1224    DeactivateValidatorV2(DeactivateValidatorV2),
1225    GetV1Validators(GetV1Validators),
1226    GetV2Validators(GetV2Validators),
1227    InitializeIfMigrated(InitializeIfMigrated),
1228    MigrateValidator(MigrateValidator),
1229    RotateValidator(RotateValidator),
1230    SetNextFullDkgCeremony(SetNextFullDkgCeremony),
1231    SetNextFullDkgCeremonyV2(SetNextFullDkgCeremonyV2),
1232    SpawnNode {
1233        name: String,
1234        config: Box<ExecutionNodeConfig>,
1235        database: DatabaseEnv,
1236        rocksdb: Option<RocksDBProvider>,
1237        response: tokio::sync::oneshot::Sender<ExecutionNode>,
1238    },
1239    RunAsync(BoxFuture<'static, ()>),
1240    Stop,
1241}
1242
1243impl From<AddValidator> for Message {
1244    fn from(value: AddValidator) -> Self {
1245        Self::AddValidator(value)
1246    }
1247}
1248
1249impl From<AddValidatorV2> for Message {
1250    fn from(value: AddValidatorV2) -> Self {
1251        Self::AddValidatorV2(value)
1252    }
1253}
1254
1255impl From<ChangeValidatorStatus> for Message {
1256    fn from(value: ChangeValidatorStatus) -> Self {
1257        Self::ChangeValidatorStatus(value)
1258    }
1259}
1260
1261impl From<DeactivateValidatorV2> for Message {
1262    fn from(value: DeactivateValidatorV2) -> Self {
1263        Self::DeactivateValidatorV2(value)
1264    }
1265}
1266
1267impl From<GetV1Validators> for Message {
1268    fn from(value: GetV1Validators) -> Self {
1269        Self::GetV1Validators(value)
1270    }
1271}
1272
1273impl From<GetV2Validators> for Message {
1274    fn from(value: GetV2Validators) -> Self {
1275        Self::GetV2Validators(value)
1276    }
1277}
1278
1279impl From<InitializeIfMigrated> for Message {
1280    fn from(value: InitializeIfMigrated) -> Self {
1281        Self::InitializeIfMigrated(value)
1282    }
1283}
1284
1285impl From<MigrateValidator> for Message {
1286    fn from(value: MigrateValidator) -> Self {
1287        Self::MigrateValidator(value)
1288    }
1289}
1290
1291impl From<RotateValidator> for Message {
1292    fn from(value: RotateValidator) -> Self {
1293        Self::RotateValidator(value)
1294    }
1295}
1296
1297impl From<SetNextFullDkgCeremony> for Message {
1298    fn from(value: SetNextFullDkgCeremony) -> Self {
1299        Self::SetNextFullDkgCeremony(value)
1300    }
1301}
1302
1303impl From<SetNextFullDkgCeremonyV2> for Message {
1304    fn from(value: SetNextFullDkgCeremonyV2) -> Self {
1305        Self::SetNextFullDkgCeremonyV2(value)
1306    }
1307}
1308
1309#[derive(Debug)]
1310struct AddValidator {
1311    /// URL of the node to send this to.
1312    http_url: Url,
1313    address: Address,
1314    public_key: PublicKey,
1315    addr: SocketAddr,
1316    response: oneshot::Sender<TransactionReceipt>,
1317}
1318
1319#[derive(Debug)]
1320struct AddValidatorV2 {
1321    /// URL of the node to send this to.
1322    http_url: Url,
1323    private_key: PrivateKey,
1324    address: Address,
1325    ingress: SocketAddr,
1326    egress: IpAddr,
1327    fee_recipient: Address,
1328    response: oneshot::Sender<TransactionReceipt>,
1329}
1330
1331#[derive(Debug)]
1332struct ChangeValidatorStatus {
1333    /// URL of the node to send this to.
1334    http_url: Url,
1335    index: u64,
1336    active: bool,
1337    response: oneshot::Sender<TransactionReceipt>,
1338}
1339
1340#[derive(Debug)]
1341struct DeactivateValidatorV2 {
1342    /// URL of the node to send this to.
1343    http_url: Url,
1344    address: Address,
1345    response: oneshot::Sender<TransactionReceipt>,
1346}
1347
1348struct GetV1Validators {
1349    http_url: Url,
1350    response: oneshot::Sender<Vec<IValidatorConfig::Validator>>,
1351}
1352
1353struct GetV2Validators {
1354    http_url: Url,
1355    response: oneshot::Sender<Vec<IValidatorConfigV2::Validator>>,
1356}
1357
1358#[derive(Debug)]
1359struct InitializeIfMigrated {
1360    /// URL of the node to send this to.
1361    http_url: Url,
1362    response: oneshot::Sender<TransactionReceipt>,
1363}
1364
1365#[derive(Debug)]
1366struct MigrateValidator {
1367    /// URL of the node to send this to.
1368    http_url: Url,
1369    index: u64,
1370    response: oneshot::Sender<TransactionReceipt>,
1371}
1372
1373#[derive(Debug)]
1374struct RotateValidator {
1375    /// URL of the node to send this to.
1376    http_url: Url,
1377    private_key: PrivateKey,
1378    address: Address,
1379    ingress: SocketAddr,
1380    egress: IpAddr,
1381    response: oneshot::Sender<TransactionReceipt>,
1382}
1383
1384#[derive(Debug)]
1385struct SetNextFullDkgCeremony {
1386    /// URL of the node to send this to.
1387    http_url: Url,
1388    epoch: u64,
1389    response: oneshot::Sender<TransactionReceipt>,
1390}
1391
1392#[derive(Debug)]
1393struct SetNextFullDkgCeremonyV2 {
1394    /// URL of the node to send this to.
1395    http_url: Url,
1396    epoch: u64,
1397    response: oneshot::Sender<TransactionReceipt>,
1398}
1399
1400pub fn admin() -> Address {
1401    address(ADMIN_INDEX)
1402}
1403
1404pub fn validator(idx: u32) -> Address {
1405    address(VALIDATOR_START_INDEX + idx)
1406}
1407
1408pub fn address(index: u32) -> Address {
1409    secret_key_to_address(MnemonicBuilder::from_phrase_nth(TEST_MNEMONIC, index).credential())
1410}
1411
1412fn setup_tempo_evm(chain_id: u64) -> TempoEvm<CacheDB<EmptyDB>> {
1413    let db = CacheDB::default();
1414    // revm sets timestamp to 1 by default, override it to 0 for genesis initializations
1415    let mut env = EvmEnv::default().with_timestamp(U256::ZERO);
1416    env.cfg_env.chain_id = chain_id;
1417
1418    let factory = TempoEvmFactory::default();
1419    factory.create_evm(db, env)
1420}
1421
1422fn sign_add_validator_args(
1423    chain_id: u64,
1424    key: &PrivateKey,
1425    address: Address,
1426    ingress: SocketAddr,
1427    egress: IpAddr,
1428    fee_recipient: Address,
1429) -> Signature {
1430    let mut hasher = Keccak256::new();
1431    hasher.update(chain_id.to_be_bytes());
1432    hasher.update(VALIDATOR_CONFIG_V2_ADDRESS.as_slice());
1433    hasher.update(address.as_slice());
1434    hasher.update([ingress.to_string().len() as u8]);
1435    hasher.update(ingress.to_string().as_bytes());
1436    hasher.update([egress.to_string().len() as u8]);
1437    hasher.update(egress.to_string().as_bytes());
1438    hasher.update(fee_recipient.as_slice());
1439    let msg = hasher.finalize();
1440    key.sign(VALIDATOR_NS_ADD, msg.as_slice())
1441}
1442
1443fn sign_rotate_validator_args(
1444    chain_id: u64,
1445    key: &PrivateKey,
1446    address: Address,
1447    ingress: SocketAddr,
1448    egress: IpAddr,
1449) -> Signature {
1450    let mut hasher = Keccak256::new();
1451    hasher.update(chain_id.to_be_bytes());
1452    hasher.update(VALIDATOR_CONFIG_V2_ADDRESS.as_slice());
1453    hasher.update(address.as_slice());
1454    hasher.update([ingress.to_string().len() as u8]);
1455    hasher.update(ingress.to_string().as_bytes());
1456    hasher.update([egress.to_string().len() as u8]);
1457    hasher.update(egress.to_string().as_bytes());
1458    let msg = hasher.finalize();
1459    key.sign(VALIDATOR_NS_ROTATE, msg.as_slice())
1460}