// tempo/main.rs
//! Main executable for the Reth-Commonware node.
//!
//! This binary launches a blockchain node that combines:
//! - Reth's execution layer for transaction processing and state management
//! - Commonware's consensus engine for block agreement
//!
//! The node operates by:
//! 1. Starting the Reth node infrastructure (database, networking, RPC)
//! 2. Creating the application state that bridges Reth and Commonware
//! 3. Launching the Commonware consensus engine on a dedicated OS thread running its own tokio runtime
//! 4. Running both components until shutdown
//!
//! Configuration can be provided via command-line arguments or configuration files.

15#![cfg_attr(not(test), warn(unused_crate_dependencies))]
16#![cfg_attr(docsrs, feature(doc_cfg))]
17
18// tracy-client is an optional dependency activated by the `tracy` feature.
19// It is not used directly but must be present for the `ondemand` feature flag.
20#[cfg(feature = "tracy")]
21use tracy_client as _;
22
23// opentelemetry-otlp is an optional dependency activated by the `otlp` feature.
24// It is not used directly but must be present to enable reqwest rustls support.
25#[cfg(feature = "otlp")]
26use opentelemetry_otlp as _;
27
28#[global_allocator]
29static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator();
30
31/// Compile-time jemalloc configuration for heap profiling.
32///
33/// tikv-jemallocator uses prefixed symbols, so the runtime `MALLOC_CONF` env var is ignored.
34/// This exported symbol is read by jemalloc at init time to enable profiling unconditionally
35/// when the `jemalloc-prof` feature is active.
36///
37/// See <https://github.com/jemalloc/jemalloc/wiki/Getting-Started>
38#[cfg(all(feature = "jemalloc-prof", unix))]
39#[unsafe(export_name = "_rjem_malloc_conf")]
40static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
41
42mod defaults;
43mod init_state;
44mod p2p_proxy;
45mod tempo_cmd;
46
47use clap::{CommandFactory, FromArgMatches};
48use commonware_runtime::{Metrics, Runner};
49use eyre::{OptionExt, WrapErr as _};
50use futures::{
51    FutureExt as _,
52    future::{Either, FusedFuture as _},
53};
54use reth_ethereum::{chainspec::EthChainSpec as _, cli::Commands, evm::revm::primitives::B256};
55use reth_ethereum_cli::Cli;
56use reth_network_api::Peers;
57use reth_network_peers::pk2id;
58use reth_node_builder::{NodeHandle, WithLaunchContext};
59use reth_rpc_server_types::{RethRpcModule, RpcModuleSelection, RpcModuleValidator};
60use std::{sync::Arc, thread, time::Duration};
61use tempo_chainspec::spec::{TempoChainSpec, TempoChainSpecParser};
62use tempo_commonware_node::{feed as consensus_feed, run_consensus_stack, run_follow_stack};
63use tempo_consensus::TempoConsensus;
64use tempo_evm::TempoEvmConfig;
65use tempo_faucet::{
66    args::FaucetArgs,
67    faucet::{TempoFaucetExt, TempoFaucetExtApiServer},
68};
69use tempo_node::{
70    TempoFullNode, TempoNodeArgs,
71    node::TempoNode,
72    rpc::consensus::{TempoConsensusApiServer, TempoConsensusRpc},
73    telemetry::{PrometheusMetricsConfig, install_prometheus_metrics},
74};
75use tokio::sync::oneshot;
76use tracing::{debug, info, info_span, warn, warn_span};
77
78type TempoCli =
79    Cli<TempoChainSpecParser, TempoArgs, TempoRpcModuleValidator, tempo_cmd::TempoSubcommand>;
80
81const TEMPO_CUSTOM_RPC_MODULES: &[&str] = &["consensus", "operator", "tempo", "token"];
82
83#[derive(Debug, Clone, Copy)]
84struct TempoRpcModuleValidator;
85
86impl RpcModuleValidator for TempoRpcModuleValidator {
87    fn parse_selection(s: &str) -> Result<RpcModuleSelection, String> {
88        let selection = s
89            .parse::<RpcModuleSelection>()
90            .map_err(|e| format!("Failed to parse RPC modules: {e}"))?;
91
92        if let RpcModuleSelection::Selection(modules) = &selection {
93            for module in modules {
94                let RethRpcModule::Other(name) = module else {
95                    continue;
96                };
97
98                if !TEMPO_CUSTOM_RPC_MODULES.contains(&name.as_str()) {
99                    return Err(format!("Unknown RPC module: '{name}'"));
100                }
101            }
102        }
103
104        Ok(selection)
105    }
106}
107
108// TODO: migrate this to tempo_node eventually.
109#[derive(Debug, Clone, clap::Args)]
110struct TempoArgs {
111    /// Run in follow mode from an upstream node.
112    /// If provided without a value, defaults to the RPC URL for the selected chain.
113    #[arg(long, value_name = "WEBSOCKET_URL", default_missing_value = "auto", num_args(0..=1), env = "TEMPO_FOLLOW")]
114    pub follow: Option<String>,
115
116    /// Disable consensus certification in follow mode. The follower syncs execution
117    /// state from the upstream node without validating consensus state.
118    /// DO NOT USE IN PRODUCTION.
119    #[arg(
120        long = "follow.experimental.certify",
121        requires = "follow",
122        default_value_t = false
123    )]
124    pub follow_certify: bool,
125
126    /// HTTP endpoint that returns a JSON object mapping chain IDs to bootnode lists.
127    ///
128    /// The endpoint must return JSON in the format:
129    /// `{ "<chain_id>": ["enode://...", ...] }`
130    ///
131    /// Bootnodes for the current chain are added as peer hints to the discovery service.
132    ///
133    /// Set to "none" to disable.
134    #[arg(
135        long = "tempo.bootnodes-endpoint",
136        value_name = "URL",
137        default_value = "https://peers.tempo.xyz",
138        env = "TEMPO_BOOTNODES_ENDPOINT"
139    )]
140    pub bootnodes_endpoint: String,
141
142    #[command(flatten)]
143    pub telemetry: defaults::TelemetryArgs,
144
145    #[command(flatten)]
146    pub consensus: tempo_commonware_node::Args,
147
148    #[command(flatten)]
149    pub faucet_args: FaucetArgs,
150
151    #[command(flatten)]
152    pub node_args: TempoNodeArgs,
153
154    #[command(flatten)]
155    #[cfg(feature = "pyroscope")]
156    pub pyroscope_args: PyroscopeArgs,
157}
158
159impl TempoArgs {
160    fn is_following_uncertified(&self) -> bool {
161        self.follow.is_some() && !self.follow_certify
162    }
163
164    /// Whether the consensus engine should be active.
165    ///
166    /// The engine runs when not in dev mode and not following uncertified.
167    fn has_consensus_engine(&self, dev: bool) -> bool {
168        !dev && !self.is_following_uncertified()
169    }
170}
171
172/// Command line arguments for configuring Pyroscope continuous profiling.
173#[cfg(feature = "pyroscope")]
174#[derive(Debug, Clone, PartialEq, Eq, clap::Args)]
175struct PyroscopeArgs {
176    /// Enable Pyroscope continuous profiling
177    #[arg(long = "pyroscope.enabled", default_value_t = false)]
178    pub pyroscope_enabled: bool,
179
180    /// Pyroscope server URL
181    #[arg(long = "pyroscope.server-url", default_value = "http://localhost:4040")]
182    pub server_url: String,
183
184    /// Application name for Pyroscope
185    #[arg(long = "pyroscope.application-name", default_value = "tempo")]
186    pub application_name: String,
187
188    /// Sample rate for profiling (default: 100 Hz)
189    #[arg(long = "pyroscope.sample-rate", default_value_t = 100)]
190    pub sample_rate: u32,
191}
192
193/// Force-install the default crypto provider.
194///
195/// This is necessary in case there are more than one available backends enabled in rustls (ring,
196/// aws-lc-rs).
197///
198/// This should be called high in the main fn.
199///
200/// See also:
201///   <https://github.com/snapview/tokio-tungstenite/issues/353#issuecomment-2455100010>
202///   <https://github.com/awslabs/aws-sdk-rust/discussions/1257>
203fn install_crypto_provider() {
204    // https://github.com/snapview/tokio-tungstenite/issues/353
205    rustls::crypto::ring::default_provider()
206        .install_default()
207        .expect("Failed to install default rustls crypto provider");
208}
209
210trait NodeCommandExt {
211    /// Derive the peer id from the p2p secret key without starting the network.
212    fn peer_id(&self) -> reth_network_peers::PeerId;
213}
214
215impl NodeCommandExt for reth_cli_commands::node::NodeCommand<TempoChainSpecParser, TempoArgs> {
216    fn peer_id(&self) -> reth_network_peers::PeerId {
217        let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
218        let sk = self
219            .network
220            .secret_key(data_dir.p2p_secret())
221            .expect("unable to derive peer id from p2p secret");
222
223        pk2id(&sk.public_key(secp256k1::SECP256K1))
224    }
225}
226
227/// Print installed extensions as a footer after root help output.
228/// Skips printing when help is for a subcommand (e.g. `tempo node --help`).
229fn print_extensions_footer() {
230    let is_subcommand_help = std::env::args()
231        .skip(1)
232        .any(|a| !a.starts_with('-') && a != "help");
233    if is_subcommand_help {
234        return;
235    }
236
237    let extensions = match tempo_ext::installed_extensions() {
238        Ok(e) => e,
239        Err(_) => return,
240    };
241    if extensions.is_empty() {
242        return;
243    }
244    let use_color = std::io::IsTerminal::is_terminal(&std::io::stdout());
245    let (b, bu, r) = if use_color {
246        ("\x1b[1m", "\x1b[1m\x1b[4m", "\x1b[0m")
247    } else {
248        ("", "", "")
249    };
250    println!("\n{bu}Extensions:{r}");
251    for (name, desc) in &extensions {
252        if desc.is_empty() {
253            println!("  {b}{name}{r}");
254        } else {
255            println!("  {b}{name:<22}{r} {desc}");
256        }
257    }
258}
259
260/// Fetches bootnodes from the given endpoint for the specified chain ID.
261///
262/// The endpoint must return JSON in the format:
263/// `{ "<chain_id>": ["enode://...", ...] }`
264async fn fetch_bootnodes(
265    endpoint: &str,
266    chain_id: u64,
267) -> eyre::Result<Vec<reth_network_peers::NodeRecord>> {
268    let client = reqwest::Client::builder()
269        .timeout(Duration::from_secs(5))
270        .build()
271        .wrap_err("failed to build HTTP client")?;
272
273    let resp: std::collections::HashMap<String, Vec<String>> = client
274        .get(endpoint)
275        .send()
276        .await
277        .wrap_err("request failed")?
278        .error_for_status()
279        .wrap_err("endpoint returned error status")?
280        .json()
281        .await
282        .wrap_err("failed to parse response as JSON")?;
283
284    let key = chain_id.to_string();
285    let enodes = match resp.get(&key) {
286        Some(enodes) => enodes,
287        None => return Ok(Vec::new()),
288    };
289
290    Ok(reth_network_peers::parse_nodes(enodes))
291}
292
293fn main() -> eyre::Result<()> {
294    install_crypto_provider();
295
296    reth_cli_util::sigsegv_handler::install();
297
298    // XXX: ensures that the error source chain is preserved in
299    // tracing-instrument generated error events. That is, this hook ensures
300    // that functions instrumented like `#[instrument(err)]` will emit an event
301    // that contains the entire error source chain.
302    //
303    // TODO: Can remove this if https://github.com/tokio-rs/tracing/issues/2648
304    // ever gets addressed.
305    tempo_eyre::install()
306        .expect("must install the eyre error hook before constructing any eyre reports");
307
308    // Enable backtraces unless a RUST_BACKTRACE value has already been explicitly provided.
309    if std::env::var_os("RUST_BACKTRACE").is_none() {
310        unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
311    }
312
313    tempo_node::init_version_metadata();
314    defaults::init_defaults();
315
316    let mut cli = match TempoCli::command()
317        .about("Tempo")
318        .try_get_matches_from(std::env::args_os())
319        .and_then(|matches| TempoCli::from_arg_matches(&matches))
320    {
321        Ok(cli) => cli,
322        Err(err) => {
323            if err.kind() == clap::error::ErrorKind::InvalidSubcommand {
324                // Unknown subcommand — try the extension launcher.
325                let code = match tempo_ext::run(std::env::args_os()) {
326                    Ok(code) => code,
327                    Err(e) => {
328                        eprintln!("{e}");
329                        1
330                    }
331                };
332                std::process::exit(code);
333            }
334
335            if matches!(
336                err.kind(),
337                clap::error::ErrorKind::DisplayHelp
338                    | clap::error::ErrorKind::DisplayHelpOnMissingArgumentOrSubcommand
339            ) {
340                let _ = err.print();
341                print_extensions_footer();
342                std::process::exit(0);
343            }
344
345            err.exit();
346        }
347    };
348
349    if let Commands::Node(node_cmd) = &cli.command
350        && node_cmd.engine.share_sparse_trie_with_payload_builder
351        && node_cmd.builder.max_payload_tasks != 1
352    {
353        eyre::bail!(
354            "--engine.share-sparse-trie-with-payload-builder requires --builder.max-tasks to be 1 (got {})",
355            node_cmd.builder.max_payload_tasks
356        );
357    }
358
359    // If telemetry is enabled, set logs OTLP (conflicts_with in TelemetryArgs prevents both being set)
360    let mut telemetry_config = None;
361    if let Commands::Node(node_cmd) = &cli.command
362        && let Some(config) = node_cmd
363            .ext
364            .telemetry
365            .try_to_config()
366            .wrap_err("failed to parse telemetry config")?
367    {
368        let consensus_pubkey = node_cmd
369            .ext
370            .consensus
371            .public_key()
372            .wrap_err("failed parsing consensus key")?
373            .map(|k| k.to_string());
374
375        let peer_id = format!("{:x}", node_cmd.peer_id());
376
377        // VictoriaMetrics does not support merging `extra_fields` query args like `extra_labels` for
378        // metrics. A workaround for now is to directly hook into the `OTEL_RESOURCE_ATTRIBUTES` env var
379        // used at startup to capture contextual information.
380        let mut extra_attrs = vec![format!("peer_id={peer_id}")];
381        if let Some(pubkey) = &consensus_pubkey {
382            extra_attrs.push(format!("consensus_pubkey={pubkey}"));
383        }
384
385        if !extra_attrs.is_empty() {
386            let current = std::env::var("OTEL_RESOURCE_ATTRIBUTES").unwrap_or_default();
387            let new_attrs = if current.is_empty() {
388                extra_attrs.join(",")
389            } else {
390                format!("{current},{}", extra_attrs.join(","))
391            };
392
393            // SAFETY: called at startup before the OTEL SDK is initialised
394            unsafe {
395                std::env::set_var("OTEL_RESOURCE_ATTRIBUTES", &new_attrs);
396            }
397        }
398
399        // Set Reth logs OTLP. Consensus logs are exported as well via the same tracing system.
400        cli.traces.logs_otlp = Some(config.logs_otlp_url.clone());
401        cli.traces.logs_otlp_filter = config
402            .logs_otlp_filter
403            .parse()
404            .wrap_err("invalid default logs filter")?;
405
406        telemetry_config.replace(config);
407    }
408
409    let is_node = matches!(cli.command, Commands::Node(_));
410
411    let (args_and_node_handle_tx, args_and_node_handle_rx) =
412        oneshot::channel::<(TempoFullNode, TempoArgs)>();
413    let (consensus_dead_tx, mut consensus_dead_rx) = oneshot::channel();
414
415    let shutdown_token = tokio_util::sync::CancellationToken::new();
416    let cl_feed_state = consensus_feed::FeedStateHandle::new();
417
418    let shutdown_token_clone = shutdown_token.clone();
419    let cl_feed_state_clone = cl_feed_state.clone();
420
421    let consensus_handle = thread::spawn(move || {
422        // Exit early if we are not executing `tempo node` command.
423        if !is_node {
424            return Ok(());
425        }
426
427        let (node, args) = args_and_node_handle_rx.blocking_recv().wrap_err(
428            "channel closed before consensus-relevant command line args \
429                and a handle to the execution node could be received",
430        )?;
431
432        if !args.has_consensus_engine(node.config.dev.dev) {
433            return futures::executor::block_on(async move {
434                shutdown_token_clone.cancelled().await;
435                Ok(())
436            });
437        }
438
439        let consensus_storage = args.consensus.storage_dir.clone().unwrap_or_else(|| {
440            node.config
441                .datadir
442                .clone()
443                .resolve_datadir(node.chain_spec().chain())
444                .data_dir()
445                .join("consensus")
446        });
447
448        info_span!("prepare_consensus").in_scope(|| {
449            info!(
450                path = %consensus_storage.display(),
451                "determined directory for consensus data",
452            )
453        });
454
455        let runtime_config = commonware_runtime::tokio::Config::default()
456            .with_tcp_nodelay(Some(true))
457            .with_worker_threads(args.consensus.worker_threads)
458            .with_storage_directory(consensus_storage)
459            .with_catch_panics(true);
460
461        let runner = commonware_runtime::tokio::Runner::new(runtime_config);
462        let ret = runner.start(async move |ctx| {
463            let mut metrics_server = tempo_commonware_node::metrics::install(
464                ctx.with_label("metrics"),
465                args.consensus.metrics_address,
466            )
467            .fuse();
468
469            // Start the unified metrics exporter if configured
470            if let Some(config) = telemetry_config {
471                let consensus_pubkey = args
472                    .consensus
473                    .public_key()
474                    .wrap_err("failed parsing consensus key")?
475                    .map(|k| k.to_string());
476
477                let prometheus_config = PrometheusMetricsConfig {
478                    endpoint: config.metrics_prometheus_url,
479                    interval: config.metrics_prometheus_interval,
480                    auth_header: config.metrics_auth_header,
481                    consensus_pubkey,
482                    peer_id: format!("{:x}", node.network.peer_id()),
483                };
484
485                install_prometheus_metrics(ctx.with_label("telemetry_metrics"), prometheus_config)
486                    .wrap_err("failed to start Prometheus metrics exporter")?;
487            }
488
489            let consensus_stack = if let Some(follow) = args.follow {
490                let follow_url = if follow == "auto" {
491                    node.chain_spec()
492                        .default_follow_url()
493                        .map(|s| s.to_string())
494                        .ok_or_eyre("No default follow URL for this chain")?
495                } else {
496                    follow
497                };
498
499                Either::Left(run_follow_stack(
500                    ctx.with_label("follow"),
501                    args.consensus,
502                    follow_url,
503                    Arc::new(node),
504                    cl_feed_state_clone,
505                ))
506            } else {
507                Either::Right(run_consensus_stack(
508                    ctx.with_label("consensus"),
509                    args.consensus,
510                    Arc::new(node),
511                    cl_feed_state_clone,
512                ))
513            };
514
515            tokio::pin!(consensus_stack);
516            loop {
517                tokio::select!(
518                    biased;
519
520                    () = shutdown_token_clone.cancelled() => {
521                        break Ok(());
522                    }
523
524                    ret = &mut consensus_stack => {
525                        break ret.and_then(|()| Err(eyre::eyre!(
526                            "consensus stack exited unexpectedly"))
527                        )
528                        .wrap_err("consensus stack failed");
529                    }
530
531                    ret = &mut metrics_server, if !metrics_server.is_terminated() => {
532                        let reason = match ret.wrap_err("task_panicked") {
533                            Ok(Ok(())) => "unexpected regular exit".to_string(),
534                            Ok(Err(err)) | Err(err) => format!("{err}"),
535                        };
536
537                        warn_span!("consensus_metrics").in_scope(|| {
538                            warn!(reason, "the metrics server exited");
539                        })
540                    }
541                )
542            }
543        });
544
545        let _ = consensus_dead_tx.send(());
546        ret
547    });
548
549    let components =
550        |spec: Arc<TempoChainSpec>| (TempoEvmConfig::new(spec.clone()), TempoConsensus::new(spec));
551
552    cli.run_with_components::<TempoNode>(components, async move |builder, args| {
553        let faucet_args = args.faucet_args.clone();
554        let validator_key = args
555            .consensus
556            .public_key()?
557            .map(|key| B256::from_slice(key.as_ref()));
558
559        // Initialize Pyroscope profiling if enabled
560        #[cfg(feature = "pyroscope")]
561        let pyroscope_agent = if args.pyroscope_args.pyroscope_enabled {
562            let agent = pyroscope::PyroscopeAgent::builder(
563                &args.pyroscope_args.server_url,
564                &args.pyroscope_args.application_name,
565            )
566            .backend(pyroscope_pprofrs::pprof_backend(
567                pyroscope_pprofrs::PprofConfig::new()
568                    .sample_rate(args.pyroscope_args.sample_rate)
569                    .report_thread_id()
570                    .report_thread_name(),
571            ))
572            .build()
573            .wrap_err("failed to build Pyroscope agent")?;
574
575            let agent = agent.start().wrap_err("failed to start Pyroscope agent")?;
576            info!(
577                server_url = %args.pyroscope_args.server_url,
578                application_name = %args.pyroscope_args.application_name,
579                "Pyroscope profiling enabled"
580            );
581
582            Some(agent)
583        } else {
584            None
585        };
586        let chain_id = builder.config().chain.chain().id();
587
588        // Resolve the bootnodes endpoint:
589        // --tempo.bootnodes-endpoint=none -> disabled
590        // otherwise -> use the provided/default URL
591        let bootnodes_endpoint = match args.bootnodes_endpoint.trim() {
592            value if value.eq_ignore_ascii_case("none") => None,
593            url => Some(url.to_string()),
594        };
595
596        let NodeHandle {
597            node,
598            node_exit_future,
599        } = builder
600            .node(TempoNode::new(&args.node_args, validator_key))
601            .apply(|mut builder: WithLaunchContext<_>| {
602                // Enable discv5 peer discovery
603                builder
604                    .config_mut()
605                    .network
606                    .discovery
607                    .enable_discv5_discovery = true;
608
609                // Uncertified follower mode: set debug RPC when certification is off
610                if args.is_following_uncertified() {
611                    let follow_url = args.follow.clone().and_then(|v| {
612                        if v != "auto" {
613                            Some(v)
614                        } else {
615                            builder
616                                .config()
617                                .chain
618                                .default_follow_url()
619                                .map(|s| s.to_string())
620                        }
621                    });
622
623                    builder.config_mut().debug.rpc_consensus_url = follow_url;
624                }
625
626
627                let has_consensus_engine =
628                    args.has_consensus_engine(builder.config().dev.dev);
629
630                builder.extend_rpc_modules(move |ctx| {
631                    if faucet_args.enabled {
632                        let faucet_ext = TempoFaucetExt::new(
633                            faucet_args.addresses(),
634                            faucet_args.amount(),
635                            faucet_args.provider(),
636                        );
637
638                        ctx.modules.merge_configured(faucet_ext.into_rpc())
639                            .wrap_err("failed to register faucet rpc module")?;
640                    }
641
642                    if has_consensus_engine {
643                        let consensus_rpc = TempoConsensusRpc::new(cl_feed_state);
644                        ctx.modules.merge_configured(consensus_rpc.into_rpc())
645                            .wrap_err("failed to register consensus rpc module")?;
646                    }
647
648                    Ok(())
649                })
650            })
651            .launch_with_debug_capabilities()
652            .await
653            .wrap_err("failed launching execution node")?;
654
655        // Fetch bootnodes from the endpoint in a background task and inject
656        // them into the already-running discovery services.
657        if let Some(endpoint) = bootnodes_endpoint {
658            let network = node.network.clone();
659            node.tasks().spawn_task(async move {
660                match fetch_bootnodes(&endpoint, chain_id).await {
661                    Ok(nodes) if nodes.is_empty() => {}
662                    Ok(nodes) => {
663                        info!(
664                            chain_id,
665                            count = nodes.len(),
666                            endpoint,
667                            "fetched bootnodes from endpoint"
668                        );
669                        for node in &nodes {
670                            if let Some(discv4) = network.discv4() {
671                                discv4.add_node(*node);
672                            }
673                            network.add_peer_kind(
674                                node.id,
675                                None,
676                                node.tcp_addr(),
677                                Some(node.udp_addr()),
678                            );
679                        }
680                        if let Some(discv5) = network.discv5() {
681                            let enr_requests = nodes.iter().filter_map(|node| {
682                                match reth_discv5::BootNode::from_unsigned(*node) {
683                                    Ok(boot_node) => Some(async move {
684                                        if let Err(err) = discv5
685                                            .with_discv5(|d| {
686                                                d.request_enr(boot_node.to_string())
687                                            })
688                                            .await
689                                        {
690                                            debug!(%err, %node, "failed adding boot node to discv5");
691                                        }
692                                    }),
693                                    Err(err) => {
694                                        warn!(%err, %node, "failed converting boot node for discv5");
695                                        None
696                                    }
697                                }
698                            });
699                            futures::future::join_all(enr_requests).await;
700                        }
701                    }
702                    Err(err) => {
703                        warn!(%err, endpoint, "failed to fetch bootnodes from endpoint");
704                    }
705                }
706            });
707        }
708
709        let _ = args_and_node_handle_tx.send((node, args));
710
711        // TODO: emit these inside a span
712        tokio::select! {
713            _ = node_exit_future => {
714                tracing::info!("execution node exited");
715            }
716            _ = &mut consensus_dead_rx => {
717                tracing::info!("consensus node exited");
718            }
719            _ = tokio::signal::ctrl_c() => {
720                tracing::info!("received shutdown signal");
721            }
722        }
723
724        #[cfg(feature = "pyroscope")]
725        if let Some(agent) = pyroscope_agent {
726            agent.shutdown();
727        }
728
729        Ok(())
730    })
731    .wrap_err("execution node failed")?;
732
733    shutdown_token.cancel();
734
735    match consensus_handle.join() {
736        Ok(Ok(())) => {}
737        Ok(Err(err)) => eprintln!("consensus task exited with error:\n{err:?}"),
738        Err(unwind) => std::panic::resume_unwind(unwind),
739    }
740    Ok(())
741}