Skip to main content

tempo_e2e/
execution_runtime.rs

1//! The environment to launch tempo execution nodes in.
2use std::{
3    net::{IpAddr, SocketAddr},
4    path::{Path, PathBuf},
5    sync::Arc,
6    time::Duration,
7};
8
9use alloy::{
10    providers::ProviderBuilder,
11    rpc::types::TransactionReceipt,
12    signers::{local::MnemonicBuilder, utils::secret_key_to_address},
13    transports::http::reqwest::Url,
14};
15use alloy_evm::{EvmFactory as _, revm::context::JournalTr};
16use alloy_genesis::{Genesis, GenesisAccount};
17use alloy_primitives::{Address, B256, Keccak256, U256};
18use commonware_codec::Encode;
19use commonware_cryptography::{
20    Signer,
21    ed25519::{PrivateKey, PublicKey, Signature},
22};
23use commonware_runtime::Clock;
24use commonware_utils::ordered;
25use eyre::{OptionExt as _, WrapErr as _};
26use futures::{StreamExt, future::BoxFuture};
27use reth_chainspec::EthChainSpec;
28use reth_db::mdbx::DatabaseEnv;
29use reth_ethereum::{
30    evm::{
31        primitives::EvmEnv,
32        revm::db::{CacheDB, EmptyDB},
33    },
34    network::{
35        Peers as _,
36        api::{NetworkEventListenerProvider, PeerKind, PeersInfo, events::NetworkEvent},
37    },
38    provider::providers::RocksDBProvider,
39    tasks::Runtime,
40};
41use reth_node_builder::{NodeBuilder, NodeConfig};
42use reth_node_core::{
43    args::{DatadirArgs, PayloadBuilderArgs, RpcServerArgs},
44    exit::NodeExitFuture,
45};
46use reth_rpc_builder::RpcModuleSelection;
47use tempfile::TempDir;
48use tempo_chainspec::TempoChainSpec;
49use tempo_consensus::feed::FeedStateHandle;
50use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
51use tempo_node::{
52    TempoFullNode,
53    evm::{TempoEvmFactory, evm::TempoEvm},
54    node::TempoNode,
55    rpc::consensus::{TempoConsensusApiServer, TempoConsensusRpc},
56};
57use tempo_precompiles::{
58    VALIDATOR_CONFIG_V2_ADDRESS,
59    storage::{StorageActions, StorageCtx},
60    validator_config_v2::{
61        IValidatorConfigV2, VALIDATOR_NS_ADD, VALIDATOR_NS_ROTATE, ValidatorConfigV2,
62    },
63};
64use tokio::sync::oneshot;
65
66use crate::{ConsensusNodeConfig, TestingNode};
67
// NOTE(review): presumably the derivation index of the admin account under
// `TEST_MNEMONIC` — confirm against the helper that derives the admin address.
const ADMIN_INDEX: u32 = 0;
// NOTE(review): presumably the first derivation index assigned to validators
// (right after the admin) — confirm against its users.
const VALIDATOR_START_INDEX: u32 = 1;

/// Mnemonic shared with the imported test-genesis and with the `tempo-node` integration tests.
pub const TEST_MNEMONIC: &str = "test test test test test test test test test test test junk";
73
74#[derive(Default, Debug)]
75pub struct Builder {
76    epoch_length: Option<u64>,
77    initial_dkg_outcome: Option<OnchainDkgOutcome>,
78    validators: Option<ordered::Map<PublicKey, ConsensusNodeConfig>>,
79}
80
81impl Builder {
82    pub fn new() -> Self {
83        Self {
84            epoch_length: None,
85            initial_dkg_outcome: None,
86            validators: None,
87        }
88    }
89
90    pub fn with_epoch_length(self, epoch_length: u64) -> Self {
91        Self {
92            epoch_length: Some(epoch_length),
93            ..self
94        }
95    }
96
97    pub fn with_initial_dkg_outcome(self, initial_dkg_outcome: OnchainDkgOutcome) -> Self {
98        Self {
99            initial_dkg_outcome: Some(initial_dkg_outcome),
100            ..self
101        }
102    }
103
104    pub fn with_validators(self, validators: ordered::Map<PublicKey, ConsensusNodeConfig>) -> Self {
105        Self {
106            validators: Some(validators),
107            ..self
108        }
109    }
110
111    pub fn launch(self) -> eyre::Result<ExecutionRuntime> {
112        let Self {
113            epoch_length,
114            initial_dkg_outcome,
115            validators,
116        } = self;
117
118        let epoch_length = epoch_length.ok_or_eyre("must specify epoch length")?;
119        let initial_dkg_outcome =
120            initial_dkg_outcome.ok_or_eyre("must specify initial DKG outcome")?;
121        let validators = validators.ok_or_eyre("must specify validators")?;
122
123        assert_eq!(
124            initial_dkg_outcome.next_players(),
125            &ordered::Set::from_iter_dedup(
126                validators
127                    .iter_pairs()
128                    .filter_map(|(key, val)| val.share.is_some().then_some(key.clone()))
129            )
130        );
131
132        let mut genesis = genesis();
133        genesis
134            .config
135            .extra_fields
136            .insert_value("epochLength".to_string(), epoch_length)
137            .unwrap();
138
139        genesis.extra_data = initial_dkg_outcome.encode().to_vec().into();
140
141        // Just remove whatever is already written into chainspec.
142        genesis.alloc.remove(&VALIDATOR_CONFIG_V2_ADDRESS);
143
144        let mut evm = setup_tempo_evm(genesis.config.chain_id);
145        {
146            let cx = evm.ctx_mut();
147            StorageCtx::enter_evm(
148                &mut cx.journaled_state,
149                &cx.block,
150                &cx.cfg,
151                &cx.tx,
152                StorageActions::disabled(),
153                || {
154                    let mut validator_config_v2 = ValidatorConfigV2::new();
155                    validator_config_v2
156                        .initialize(admin())
157                        .wrap_err("failed to initialize validator config v2")
158                        .unwrap();
159
160                    for (public_key, validator) in validators {
161                        if let ConsensusNodeConfig {
162                            address,
163                            ingress,
164                            egress,
165                            fee_recipient,
166                            private_key,
167                            share: Some(_),
168                        } = validator
169                        {
170                            validator_config_v2
171                                .add_validator(
172                                    admin(),
173                                    IValidatorConfigV2::addValidatorCall {
174                                        validatorAddress: address,
175                                        publicKey: public_key.encode().as_ref().try_into().unwrap(),
176                                        ingress: ingress.to_string(),
177                                        egress: egress.ip().to_string(),
178                                        feeRecipient: fee_recipient,
179                                        signature: sign_add_validator_args(
180                                            genesis.config.chain_id,
181                                            &private_key,
182                                            address,
183                                            ingress,
184                                            egress.ip(),
185                                            fee_recipient,
186                                        )
187                                        .encode()
188                                        .to_vec()
189                                        .into(),
190                                    },
191                                )
192                                .unwrap();
193                        }
194                    }
195                },
196            );
197        }
198
199        let evm_state = evm.ctx_mut().journaled_state.evm_state();
200        for (address, account) in evm_state.iter() {
201            let storage = if !account.storage.is_empty() {
202                Some(
203                    account
204                        .storage
205                        .iter()
206                        .map(|(key, val)| ((*key).into(), val.present_value.into()))
207                        .collect(),
208                )
209            } else {
210                None
211            };
212            genesis.alloc.insert(
213                *address,
214                GenesisAccount {
215                    nonce: Some(account.info.nonce),
216                    code: account.info.code.as_ref().map(|c| c.original_bytes()),
217                    storage,
218                    ..Default::default()
219                },
220            );
221        }
222
223        Ok(ExecutionRuntime::with_chain_spec(
224            TempoChainSpec::from_genesis(genesis),
225        ))
226    }
227}
228
229/// Configuration for launching an execution node.
230#[derive(Clone, Debug)]
231pub struct ExecutionNodeConfig {
232    /// Network secret key for the node's identity.
233    pub secret_key: B256,
234    /// Validator public key for filtering subblock transactions.
235    pub validator_key: Option<B256>,
236    /// Feed state handle for consensus RPC (if validator).
237    pub feed_state: Option<FeedStateHandle>,
238    /// Share the engine's sparse trie pipeline with the payload builder.
239    pub share_sparse_trie_with_payload_builder: bool,
240}
241
242impl ExecutionNodeConfig {
243    /// Create a default generator for building multiple execution node configs.
244    pub fn generator() -> ExecutionNodeConfigGenerator {
245        ExecutionNodeConfigGenerator::default()
246    }
247
248    pub fn generate() -> Self {
249        Self {
250            secret_key: B256::random(),
251            validator_key: None,
252            feed_state: None,
253            share_sparse_trie_with_payload_builder: false,
254        }
255    }
256}
257
/// Produces a batch of [`ExecutionNodeConfig`]s.
///
/// The default generator produces zero configs.
#[derive(Default)]
pub struct ExecutionNodeConfigGenerator {
    /// How many configs `generate` returns.
    count: u32,
}
263
264impl ExecutionNodeConfigGenerator {
265    /// Set the number of nodes to generate.
266    pub fn with_count(mut self, count: u32) -> Self {
267        self.count = count;
268        self
269    }
270
271    /// Generate the execution node configurations.
272    pub fn generate(self) -> Vec<ExecutionNodeConfig> {
273        (0..self.count)
274            .map(|_| ExecutionNodeConfig::generate())
275            .collect()
276    }
277}
278
279/// An execution runtime wrapping a thread running a [`tokio::runtime::Runtime`].
280///
281/// This is needed to spawn tempo execution nodes, which require a tokio runtime.
282///
283/// The commonware itself is launched in their
284/// [`commonware_runtime::deterministic`] and so this extra effort is necessary.
285pub struct ExecutionRuntime {
286    // The tokio runtime launched on a different thread.
287    rt: std::thread::JoinHandle<()>,
288
289    // Base directory where all reth databases will be initialized.
290    _tempdir: TempDir,
291
292    // Channel to request the runtime to launch new execution nodes.
293    to_runtime: tokio::sync::mpsc::UnboundedSender<Message>,
294}
295
296impl ExecutionRuntime {
297    pub fn builder() -> Builder {
298        Builder::new()
299    }
300
301    /// Constructs a new execution runtime to launch execution nodes.
302    pub fn with_chain_spec(chain_spec: TempoChainSpec) -> Self {
303        let tempdir = tempfile::Builder::new()
304            // TODO(janis): cargo manifest prefix?
305            .prefix("tempo_e2e_test")
306            .disable_cleanup(true)
307            .tempdir()
308            .expect("must be able to create a temp directory run tun tests");
309
310        let (to_runtime, mut from_handle) = tokio::sync::mpsc::unbounded_channel();
311
312        let datadir = tempdir.path().to_path_buf();
313        let rt = std::thread::spawn(move || {
314            let rt = tokio::runtime::Runtime::new()
315                .expect("must be able to initialize a runtime to run execution/reth nodes");
316            let wallet = MnemonicBuilder::from_phrase(crate::execution_runtime::TEST_MNEMONIC)
317                .build()
318                .unwrap();
319            rt.block_on(async move {
320                while let Some(msg) = from_handle.recv().await {
321                    // create a new task manager for the new node instance
322                    let runtime = Runtime::test();
323                    match msg {
324                        Message::AddValidatorV2(add_validator_v2) => {
325                            let AddValidatorV2 {
326                                http_url,
327                                private_key,
328                                address,
329                                ingress,
330                                egress,
331                                fee_recipient,
332                                response,
333                            } = add_validator_v2;
334                            let provider = ProviderBuilder::new()
335                                .wallet(wallet.clone())
336                                .connect_http(http_url);
337                            let validator_config =
338                                IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
339                            let receipt = validator_config
340                                .addValidator(
341                                    address,
342                                    private_key
343                                        .public_key()
344                                        .encode()
345                                        .as_ref()
346                                        .try_into()
347                                        .unwrap(),
348                                    ingress.to_string(),
349                                    egress.to_string(),
350                                    fee_recipient,
351                                    sign_add_validator_args(
352                                        EthChainSpec::chain(&chain_spec).id(),
353                                        &private_key,
354                                        address,
355                                        ingress,
356                                        egress,
357                                        fee_recipient,
358                                    )
359                                    .encode()
360                                    .to_vec()
361                                    .into(),
362                                )
363                                .send()
364                                .await
365                                .unwrap()
366                                .get_receipt()
367                                .await
368                                .unwrap();
369                            let _ = response.send(receipt);
370                        }
371                        Message::DeactivateValidatorV2(deacivate_validator_v2) => {
372                            let DeactivateValidatorV2 {
373                                http_url,
374                                address,
375                                response,
376                            } = deacivate_validator_v2;
377                            let provider = ProviderBuilder::new()
378                                .wallet(wallet.clone())
379                                .connect_http(http_url);
380                            let validator_config_v2 =
381                                IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
382                            let id = validator_config_v2
383                                .validatorByAddress(address)
384                                .call()
385                                .await
386                                .unwrap()
387                                .index;
388                            let receipt = validator_config_v2
389                                .deactivateValidator(id)
390                                .send()
391                                .await
392                                .unwrap()
393                                .get_receipt()
394                                .await
395                                .unwrap();
396                            let _ = response.send(receipt);
397                        }
398                        Message::GetV2Validators(get_v2_validators) => {
399                            let GetV2Validators { http_url, response } = get_v2_validators;
400                            let provider = ProviderBuilder::new()
401                                .wallet(wallet.clone())
402                                .connect_http(http_url);
403                            let validator_config =
404                                IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
405                            let validators =
406                                validator_config.getActiveValidators().call().await.unwrap();
407                            let _ = response.send(validators);
408                        }
409                        Message::RotateValidator(rotate_validator) => {
410                            let RotateValidator {
411                                http_url,
412                                private_key,
413                                address,
414                                ingress,
415                                egress,
416                                response,
417                            } = rotate_validator;
418                            let provider = ProviderBuilder::new()
419                                .wallet(wallet.clone())
420                                .connect_http(http_url);
421                            let validator_config =
422                                IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
423                            let id = validator_config
424                                .validatorByAddress(address)
425                                .call()
426                                .await
427                                .unwrap()
428                                .index;
429                            let receipt = validator_config
430                                .rotateValidator(
431                                    id,
432                                    private_key
433                                        .public_key()
434                                        .encode()
435                                        .as_ref()
436                                        .try_into()
437                                        .unwrap(),
438                                    ingress.to_string(),
439                                    egress.to_string(),
440                                    sign_rotate_validator_args(
441                                        EthChainSpec::chain(&chain_spec).id(),
442                                        &private_key,
443                                        address,
444                                        ingress,
445                                        egress,
446                                    )
447                                    .encode()
448                                    .to_vec()
449                                    .into(),
450                                )
451                                .send()
452                                .await
453                                .unwrap()
454                                .get_receipt()
455                                .await
456                                .unwrap();
457                            let _ = response.send(receipt);
458                        }
459                        Message::SetFeeRecipientV2(set_fee_recipient_v2) => {
460                            let SetFeeRecipientV2 {
461                                http_url,
462                                index,
463                                fee_recipient,
464                                response,
465                            } = set_fee_recipient_v2;
466                            let provider = ProviderBuilder::new()
467                                .wallet(wallet.clone())
468                                .connect_http(http_url);
469                            let validator_config_v2 =
470                                IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
471                            let receipt = validator_config_v2
472                                .setFeeRecipient(index, fee_recipient)
473                                .send()
474                                .await
475                                .unwrap()
476                                .get_receipt()
477                                .await
478                                .unwrap();
479                            let _ = response.send(receipt);
480                        }
481                        Message::SetNextFullDkgCeremonyV2(set_next_full_dkg_ceremony_v2) => {
482                            let SetNextFullDkgCeremonyV2 {
483                                http_url,
484                                epoch,
485                                response,
486                            } = set_next_full_dkg_ceremony_v2;
487                            let provider = ProviderBuilder::new()
488                                .wallet(wallet.clone())
489                                .connect_http(http_url);
490                            let validator_config =
491                                IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider);
492                            let receipt = validator_config
493                                .setNetworkIdentityRotationEpoch(epoch)
494                                .send()
495                                .await
496                                .unwrap()
497                                .get_receipt()
498                                .await
499                                .unwrap();
500                            let _ = response.send(receipt);
501                        }
502                        Message::SpawnNode {
503                            name,
504                            config,
505                            database,
506                            rocksdb,
507                            response,
508                        } => {
509                            let node = launch_execution_node(
510                                runtime,
511                                chain_spec.clone(),
512                                datadir.join(name),
513                                *config,
514                                database,
515                                rocksdb,
516                            )
517                            .await
518                            .expect("must be able to launch execution nodes");
519                            response.send(node).expect(
520                                "receiver must hold the return channel until the node is returned",
521                            );
522                        }
523                        Message::RunAsync(fut) => {
524                            fut.await;
525                        }
526                        Message::Stop => {
527                            break;
528                        }
529                    }
530                }
531            })
532        });
533
534        Self {
535            rt,
536            _tempdir: tempdir,
537            to_runtime,
538        }
539    }
540
541    /// Returns a handle to this runtime.
542    ///
543    /// Can be used to spawn nodes.
544    pub fn handle(&self) -> ExecutionRuntimeHandle {
545        ExecutionRuntimeHandle {
546            to_runtime: self.to_runtime.clone(),
547            nodes_dir: self._tempdir.path().to_path_buf(),
548        }
549    }
550
551    pub async fn add_validator_v2<C: Clock>(
552        &self,
553        http_url: Url,
554        validator: &TestingNode<C>,
555    ) -> eyre::Result<TransactionReceipt> {
556        let (tx, rx) = oneshot::channel();
557        self.to_runtime
558            .send(
559                AddValidatorV2 {
560                    http_url,
561                    private_key: validator.private_key().clone(),
562                    address: validator.chain_address,
563                    ingress: validator.ingress(),
564                    egress: validator.egress(),
565                    fee_recipient: validator.fee_recipient(),
566                    response: tx,
567                }
568                .into(),
569            )
570            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
571        rx.await
572            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
573    }
574
575    pub async fn deactivate_validator_v2<C: Clock>(
576        &self,
577        http_url: Url,
578        validator: &TestingNode<C>,
579    ) -> eyre::Result<TransactionReceipt> {
580        let (tx, rx) = oneshot::channel();
581        self.to_runtime
582            .send(
583                DeactivateValidatorV2 {
584                    http_url,
585                    address: validator.chain_address,
586                    response: tx,
587                }
588                .into(),
589            )
590            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
591        rx.await
592            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
593    }
594
595    pub async fn set_fee_recipient_v2(
596        &self,
597        http_url: Url,
598        index: u64,
599        fee_recipient: Address,
600    ) -> eyre::Result<TransactionReceipt> {
601        let (tx, rx) = oneshot::channel();
602        self.to_runtime
603            .send(
604                SetFeeRecipientV2 {
605                    http_url,
606                    index,
607                    fee_recipient,
608                    response: tx,
609                }
610                .into(),
611            )
612            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
613        rx.await
614            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
615    }
616
617    pub async fn get_v2_validators(
618        &self,
619        http_url: Url,
620    ) -> eyre::Result<Vec<IValidatorConfigV2::Validator>> {
621        let (tx, rx) = oneshot::channel();
622        self.to_runtime
623            .send(
624                GetV2Validators {
625                    http_url,
626                    response: tx,
627                }
628                .into(),
629            )
630            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
631        rx.await
632            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
633    }
634
635    pub async fn rotate_validator<C: Clock>(
636        &self,
637        http_url: Url,
638        validator: &TestingNode<C>,
639    ) -> eyre::Result<TransactionReceipt> {
640        let (response, rx) = oneshot::channel();
641        self.to_runtime
642            .send(
643                RotateValidator {
644                    http_url,
645                    private_key: validator.private_key().clone(),
646                    address: validator.chain_address,
647                    ingress: validator.ingress(),
648                    egress: validator.egress(),
649                    response,
650                }
651                .into(),
652            )
653            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
654        rx.await
655            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
656    }
657
658    pub async fn set_next_full_dkg_ceremony_v2(
659        &self,
660        http_url: Url,
661        epoch: u64,
662    ) -> eyre::Result<TransactionReceipt> {
663        let (tx, rx) = oneshot::channel();
664        self.to_runtime
665            .send(
666                SetNextFullDkgCeremonyV2 {
667                    http_url,
668                    epoch,
669                    response: tx,
670                }
671                .into(),
672            )
673            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
674        rx.await
675            .wrap_err("the execution runtime dropped the response channel before sending a receipt")
676    }
677
678    /// Run an async task on the execution runtime's tokio runtime.
679    ///
680    /// This is useful for running code that requires a tokio runtime (like jsonrpsee clients)
681    /// from within the deterministic executor context.
682    pub async fn run_async<Fut, T>(&self, fut: Fut) -> eyre::Result<T>
683    where
684        Fut: std::future::Future<Output = T> + Send + 'static,
685        T: Send + 'static,
686    {
687        let (tx, rx) = oneshot::channel();
688        self.to_runtime
689            .send(Message::RunAsync(Box::pin(async move {
690                let result = fut.await;
691                let _ = tx.send(result);
692            })))
693            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
694        rx.await
695            .wrap_err("the execution runtime dropped the response channel")
696    }
697
698    /// Instructs the runtime to stop and exit.
699    pub fn stop(self) -> eyre::Result<()> {
700        self.to_runtime
701            .send(Message::Stop)
702            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
703        match self.rt.join() {
704            Ok(()) => Ok(()),
705            Err(e) => std::panic::resume_unwind(e),
706        }
707    }
708}
709
710/// A handle to the execution runtime.
711///
712/// Can be used to spawn nodes.
713#[derive(Clone)]
714pub struct ExecutionRuntimeHandle {
715    to_runtime: tokio::sync::mpsc::UnboundedSender<Message>,
716    nodes_dir: PathBuf,
717}
718
719impl ExecutionRuntimeHandle {
720    /// Returns the base directory where execution node data is stored.
721    pub fn nodes_dir(&self) -> &Path {
722        &self.nodes_dir
723    }
724
725    /// Requests a new execution node and blocks until its returned.
726    pub async fn spawn_node(
727        &self,
728        name: &str,
729        config: ExecutionNodeConfig,
730        database: DatabaseEnv,
731        rocksdb: Option<RocksDBProvider>,
732    ) -> eyre::Result<ExecutionNode> {
733        let (tx, rx) = oneshot::channel();
734        self.to_runtime
735            .send(Message::SpawnNode {
736                name: name.to_string(),
737                config: Box::new(config),
738                database,
739                rocksdb,
740                response: tx,
741            })
742            .map_err(|_| eyre::eyre!("the execution runtime went away"))?;
743        rx.await.wrap_err(
744            "the execution runtime dropped the response channel before sending an execution node",
745        )
746    }
747}
748
749/// An execution node spawned by the execution runtime.
750///
751/// This is essentially the same as [`reth_node_builder::NodeHandle`], but
752/// avoids the type parameters.
753pub struct ExecutionNode {
754    /// All handles to interact with the launched node instances and services.
755    pub node: Box<TempoFullNode>,
756    /// The [`Runtime`] that drives the node's services.
757    pub runtime: Runtime,
758    /// The exist future that resolves when the node's engine future resolves.
759    pub exit_fut: NodeExitFuture,
760}
761
762impl ExecutionNode {
763    /// Connect peers bidirectionally.
764    pub async fn connect_peer(&self, other: &Self) {
765        let self_record = self.node.network.local_node_record();
766        let other_record = other.node.network.local_node_record();
767
768        // Skip if already connected
769        if let Ok(Some(_)) = self.node.network.get_peer_by_id(other_record.id).await {
770            return;
771        }
772
773        // Remove any stale peer entries on the other side if present.
774        other
775            .node
776            .network
777            .remove_peer(self_record.id, PeerKind::Basic);
778
779        let mut events = self.node.network.event_listener();
780        self.node.network.connect_peer_kind(
781            other_record.id,
782            PeerKind::Basic,
783            other_record.tcp_addr(),
784            None,
785        );
786
787        // Wait for the active session
788        'wait_for_session: loop {
789            match events.next().await {
790                Some(NetworkEvent::ActivePeerSession { info, .. })
791                    if info.peer_id == other_record.id =>
792                {
793                    break 'wait_for_session;
794                }
795                Some(_) => continue,
796                None => panic!("Network event stream ended unexpectedly"),
797            }
798        }
799
800        tracing::debug!("Connected: {:?} -> {:?}", self_record.id, other_record.id);
801    }
802
803    /// Shuts down the node and awaits until the node is terminated.
804    pub async fn shutdown(self) {
805        let _ = self.node.rpc_server_handle().clone().stop();
806        self.runtime
807            .graceful_shutdown_with_timeout(Duration::from_secs(10));
808        let _ = self.exit_fut.await;
809    }
810}
811
812/// Returns the chainspec used for e2e tests.
813///
814/// TODO(janis): allow configuring this.
815pub fn chainspec() -> TempoChainSpec {
816    TempoChainSpec::from_genesis(genesis())
817}
818
819/// Generate execution node name from public key.
820pub fn execution_node_name(public_key: &PublicKey) -> String {
821    format!("{}-{}", crate::EXECUTION_NODE_PREFIX, public_key)
822}
823
824// TODO(janis): would be nicer if we could identify the node somehow?
825impl std::fmt::Debug for ExecutionNode {
826    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
827        f.debug_struct("ExecutionNode")
828            .field("node", &"<TempoFullNode>")
829            .field("exit_fut", &"<NodeExitFuture>")
830            .finish()
831    }
832}
833
834pub fn genesis() -> Genesis {
835    serde_json::from_str(include_str!("../../node/tests/assets/test-genesis.json")).unwrap()
836}
837
838/// Returns MDBX DB args sized for tests (64 MB max with 4 MB growth step).
839///
840/// The default 8 TB geometry with 4 GB growth step both exhausts process
841/// virtual-address space when many databases are open concurrently across
842/// parallel test threads, and pre-allocates multi-GB files on disk that
843/// can fill the CI runner's disk.
844pub fn test_db_args() -> reth_db::mdbx::DatabaseArguments {
845    reth_db::mdbx::DatabaseArguments::test()
846}
847
848/// Launches a tempo execution node.
849///
850/// Difference compared to starting the node through the binary:
851///
852/// 1. faucet is always disabled
853/// 2. components are not provided (looking at the node command, the components
854///    are not passed to it).
855/// 3. consensus config is not necessary
856pub async fn launch_execution_node<P: AsRef<Path>>(
857    runtime: Runtime,
858    chain_spec: TempoChainSpec,
859    datadir: P,
860    config: ExecutionNodeConfig,
861    database: DatabaseEnv,
862    rocksdb: Option<RocksDBProvider>,
863) -> eyre::Result<ExecutionNode> {
864    println!("launching node at {}", datadir.as_ref().display());
865    let ExecutionNodeConfig {
866        secret_key,
867        validator_key,
868        feed_state,
869        share_sparse_trie_with_payload_builder,
870    } = config;
871    let node_config = NodeConfig::new(Arc::new(chain_spec))
872        .with_rpc(
873            RpcServerArgs::default()
874                .with_unused_ports()
875                .with_http()
876                .with_http_api(RpcModuleSelection::All)
877                .with_ws()
878                .with_ws_api(RpcModuleSelection::All),
879        )
880        .with_datadir_args(DatadirArgs {
881            datadir: datadir.as_ref().to_path_buf().into(),
882            ..DatadirArgs::default()
883        })
884        .with_payload_builder(PayloadBuilderArgs {
885            interval: Duration::from_millis(100),
886            ..Default::default()
887        })
888        .apply(|mut c| {
889            c.network.discovery.disable_discovery = true;
890            c.network = c.network.with_unused_ports();
891            c.network.p2p_secret_key_hex = Some(secret_key);
892            // Match Tempo's engine default for nodes launched by tests.
893            c.engine.suppress_persistence_during_build = true;
894            c.engine.share_sparse_trie_with_payload_builder =
895                share_sparse_trie_with_payload_builder;
896            c
897        });
898
899    let tempo_node = TempoNode::default().with_validator_key(validator_key);
900
901    let node_handle = if let Some(rocksdb) = rocksdb {
902        NodeBuilder::new(node_config)
903            .with_database(database)
904            .with_rocksdb_provider(rocksdb)
905    } else {
906        NodeBuilder::new(node_config).with_database(database)
907    }
908    .with_launch_context(runtime.clone())
909    .node(tempo_node)
910    .extend_rpc_modules(move |ctx| {
911        if let Some(feed_state) = feed_state {
912            ctx.modules
913                .merge_configured(TempoConsensusRpc::new(feed_state).into_rpc())?;
914        }
915        Ok(())
916    })
917    .launch()
918    .await
919    .wrap_err_with(|| {
920        format!(
921            "failed launching node; databasedir: `{}`",
922            datadir.as_ref().display()
923        )
924    })?;
925
926    Ok(ExecutionNode {
927        node: Box::new(node_handle.node),
928        runtime,
929        exit_fut: node_handle.node_exit_future,
930    })
931}
932
/// Messages sent to the execution runtime.
enum Message {
    /// Carries an [`AddValidatorV2`] request.
    AddValidatorV2(AddValidatorV2),
    /// Carries a [`DeactivateValidatorV2`] request.
    DeactivateValidatorV2(DeactivateValidatorV2),
    /// Carries a [`GetV2Validators`] request.
    GetV2Validators(GetV2Validators),
    /// Carries a [`RotateValidator`] request.
    RotateValidator(RotateValidator),
    /// Carries a [`SetFeeRecipientV2`] request.
    SetFeeRecipientV2(SetFeeRecipientV2),
    /// Carries a [`SetNextFullDkgCeremonyV2`] request.
    SetNextFullDkgCeremonyV2(SetNextFullDkgCeremonyV2),
    /// Request to spawn an execution node; the spawned [`ExecutionNode`] is
    /// returned over `response`.
    SpawnNode {
        /// Name of the node to spawn.
        name: String,
        /// Configuration of the node to spawn.
        config: Box<ExecutionNodeConfig>,
        /// Database passed along for the node.
        database: DatabaseEnv,
        /// Optional RocksDB provider passed along for the node.
        rocksdb: Option<RocksDBProvider>,
        /// Channel over which the spawned node is sent back to the requester.
        response: tokio::sync::oneshot::Sender<ExecutionNode>,
    },
    /// A boxed future handed to the runtime.
    // NOTE(review): presumably the runtime drives this future to completion —
    // confirm against the message handler.
    RunAsync(BoxFuture<'static, ()>),
    /// Stop request.
    // NOTE(review): presumably makes the runtime shut down — confirm against
    // the message handler.
    Stop,
}
950
951impl From<AddValidatorV2> for Message {
952    fn from(value: AddValidatorV2) -> Self {
953        Self::AddValidatorV2(value)
954    }
955}
956
957impl From<DeactivateValidatorV2> for Message {
958    fn from(value: DeactivateValidatorV2) -> Self {
959        Self::DeactivateValidatorV2(value)
960    }
961}
962
963impl From<GetV2Validators> for Message {
964    fn from(value: GetV2Validators) -> Self {
965        Self::GetV2Validators(value)
966    }
967}
968
969impl From<RotateValidator> for Message {
970    fn from(value: RotateValidator) -> Self {
971        Self::RotateValidator(value)
972    }
973}
974
975impl From<SetFeeRecipientV2> for Message {
976    fn from(value: SetFeeRecipientV2) -> Self {
977        Self::SetFeeRecipientV2(value)
978    }
979}
980
981impl From<SetNextFullDkgCeremonyV2> for Message {
982    fn from(value: SetNextFullDkgCeremonyV2) -> Self {
983        Self::SetNextFullDkgCeremonyV2(value)
984    }
985}
986
/// Payload of [`Message::AddValidatorV2`].
// NOTE(review): field meanings below are inferred from names and types; the
// code consuming this request is outside this section — confirm there.
#[derive(Debug)]
struct AddValidatorV2 {
    /// URL of the node to send this to.
    http_url: Url,
    /// Ed25519 private key; presumably the key of the validator being added.
    private_key: PrivateKey,
    /// Address; presumably that of the validator being added.
    address: Address,
    /// Ingress socket address (IP and port) for the validator.
    ingress: SocketAddr,
    /// Egress IP address for the validator.
    egress: IpAddr,
    /// Fee recipient address for the validator.
    fee_recipient: Address,
    /// Channel over which the resulting transaction receipt is sent back.
    response: oneshot::Sender<TransactionReceipt>,
}
998
/// Payload of [`Message::DeactivateValidatorV2`].
#[derive(Debug)]
struct DeactivateValidatorV2 {
    /// URL of the node to send this to.
    http_url: Url,
    /// Address; presumably that of the validator to deactivate — confirm
    /// against the code consuming this request.
    address: Address,
    /// Channel over which the resulting transaction receipt is sent back.
    response: oneshot::Sender<TransactionReceipt>,
}
1006
/// Payload of [`Message::GetV2Validators`].
struct GetV2Validators {
    /// URL of a node; presumably the node that is queried — confirm against
    /// the code consuming this request.
    http_url: Url,
    /// Channel over which the list of validators is sent back.
    response: oneshot::Sender<Vec<IValidatorConfigV2::Validator>>,
}
1011
/// Payload of [`Message::RotateValidator`].
// NOTE(review): field meanings below are inferred from names and types; the
// code consuming this request is outside this section — confirm there.
#[derive(Debug)]
struct RotateValidator {
    /// URL of the node to send this to.
    http_url: Url,
    /// Ed25519 private key; presumably the validator's new key.
    private_key: PrivateKey,
    /// Address; presumably that of the validator being rotated.
    address: Address,
    /// Ingress socket address (IP and port) for the validator.
    ingress: SocketAddr,
    /// Egress IP address for the validator.
    egress: IpAddr,
    /// Channel over which the resulting transaction receipt is sent back.
    response: oneshot::Sender<TransactionReceipt>,
}
1022
/// Payload of [`Message::SetFeeRecipientV2`].
#[derive(Debug)]
struct SetFeeRecipientV2 {
    /// URL of the node to send this to.
    http_url: Url,
    /// Index; presumably of the validator whose fee recipient is changed —
    /// confirm against the code consuming this request.
    index: u64,
    /// The fee recipient address to set.
    fee_recipient: Address,
    /// Channel over which the resulting transaction receipt is sent back.
    response: oneshot::Sender<TransactionReceipt>,
}
1031
/// Payload of [`Message::SetNextFullDkgCeremonyV2`].
#[derive(Debug)]
struct SetNextFullDkgCeremonyV2 {
    /// URL of the node to send this to.
    http_url: Url,
    /// Epoch; presumably the one in which the next full DKG ceremony should
    /// run — confirm against the code consuming this request.
    epoch: u64,
    /// Channel over which the resulting transaction receipt is sent back.
    response: oneshot::Sender<TransactionReceipt>,
}
1039
1040pub fn admin() -> Address {
1041    address(ADMIN_INDEX)
1042}
1043
1044pub fn validator(idx: u32) -> Address {
1045    address(VALIDATOR_START_INDEX + idx)
1046}
1047
1048pub fn address(index: u32) -> Address {
1049    secret_key_to_address(MnemonicBuilder::from_phrase_nth(TEST_MNEMONIC, index).credential())
1050}
1051
1052fn setup_tempo_evm(chain_id: u64) -> TempoEvm<CacheDB<EmptyDB>> {
1053    let db = CacheDB::default();
1054    // revm sets timestamp to 1 by default, override it to 0 for genesis initializations
1055    let mut env = EvmEnv::default().with_timestamp(U256::ZERO);
1056    env.cfg_env.chain_id = chain_id;
1057
1058    let factory = TempoEvmFactory::default();
1059    factory.create_evm(db, env)
1060}
1061
1062fn sign_add_validator_args(
1063    chain_id: u64,
1064    key: &PrivateKey,
1065    address: Address,
1066    ingress: SocketAddr,
1067    egress: IpAddr,
1068    fee_recipient: Address,
1069) -> Signature {
1070    let mut hasher = Keccak256::new();
1071    hasher.update(chain_id.to_be_bytes());
1072    hasher.update(VALIDATOR_CONFIG_V2_ADDRESS.as_slice());
1073    hasher.update(address.as_slice());
1074    hasher.update([ingress.to_string().len() as u8]);
1075    hasher.update(ingress.to_string().as_bytes());
1076    hasher.update([egress.to_string().len() as u8]);
1077    hasher.update(egress.to_string().as_bytes());
1078    hasher.update(fee_recipient.as_slice());
1079    let msg = hasher.finalize();
1080    key.sign(VALIDATOR_NS_ADD, msg.as_slice())
1081}
1082
1083fn sign_rotate_validator_args(
1084    chain_id: u64,
1085    key: &PrivateKey,
1086    address: Address,
1087    ingress: SocketAddr,
1088    egress: IpAddr,
1089) -> Signature {
1090    let mut hasher = Keccak256::new();
1091    hasher.update(chain_id.to_be_bytes());
1092    hasher.update(VALIDATOR_CONFIG_V2_ADDRESS.as_slice());
1093    hasher.update(address.as_slice());
1094    hasher.update([ingress.to_string().len() as u8]);
1095    hasher.update(ingress.to_string().as_bytes());
1096    hasher.update([egress.to_string().len() as u8]);
1097    hasher.update(egress.to_string().as_bytes());
1098    let msg = hasher.finalize();
1099    key.sign(VALIDATOR_NS_ROTATE, msg.as_slice())
1100}