// tempo/main.rs
1//! Main executable for the Reth-Commonware node.
2//!
3//! This binary launches a blockchain node that combines:
4//! - Reth's execution layer for transaction processing and state management
5//! - Commonware's consensus engine for block agreement
6//!
7//! The node operates by:
8//! 1. Starting the Reth node infrastructure (database, networking, RPC)
9//! 2. Creating the application state that bridges Reth and Commonware
10//! 3. Launching the Commonware consensus engine via a separate task and a separate tokio runtime.
11//! 4. Running both components until shutdown
12//!
13//! Configuration can be provided via command-line arguments or configuration files.
14
15#![cfg_attr(not(test), warn(unused_crate_dependencies))]
16#![cfg_attr(docsrs, feature(doc_cfg))]
17
18// tracy-client is an optional dependency activated by the `tracy` feature.
19// It is not used directly but must be present for the `ondemand` feature flag.
20#[cfg(feature = "tracy")]
21use tracy_client as _;
22
/// Process-wide global allocator supplied by reth's CLI utilities.
/// The concrete allocator depends on `reth_cli_util`'s feature flags
/// (presumably jemalloc on unix when enabled — see reth docs to confirm).
#[global_allocator]
static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator();
25
/// Compile-time jemalloc configuration for heap profiling.
///
/// tikv-jemallocator uses prefixed symbols, so the runtime `MALLOC_CONF` env var is ignored.
/// This exported symbol is read by jemalloc at init time to enable profiling unconditionally
/// when the `jemalloc-prof` feature is active.
///
/// `lg_prof_sample:19` samples roughly one allocation per 2^19 bytes (~512 KiB).
/// The trailing `\0` is required because jemalloc reads this as a C string.
///
/// See <https://github.com/jemalloc/jemalloc/wiki/Getting-Started>
#[cfg(all(feature = "jemalloc-prof", unix))]
#[unsafe(export_name = "_rjem_malloc_conf")]
static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
36
37mod defaults;
38mod init_state;
39mod tempo_cmd;
40
41use clap::{CommandFactory, FromArgMatches};
42use commonware_runtime::{Metrics, Runner};
43use eyre::WrapErr as _;
44use futures::{FutureExt as _, future::FusedFuture as _};
45use reth_ethereum::{chainspec::EthChainSpec as _, cli::Commands, evm::revm::primitives::B256};
46use reth_ethereum_cli::Cli;
47use reth_node_builder::{NodeHandle, WithLaunchContext};
48use reth_rpc_server_types::DefaultRpcModuleValidator;
49use std::{sync::Arc, thread};
50use tempo_chainspec::spec::{TempoChainSpec, TempoChainSpecParser};
51use tempo_commonware_node::{feed as consensus_feed, run_consensus_stack};
52use tempo_consensus::TempoConsensus;
53use tempo_evm::TempoEvmConfig;
54use tempo_faucet::{
55    args::FaucetArgs,
56    faucet::{TempoFaucetExt, TempoFaucetExtApiServer},
57};
58use tempo_node::{
59    TempoFullNode, TempoNodeArgs,
60    node::TempoNode,
61    rpc::consensus::{TempoConsensusApiServer, TempoConsensusRpc},
62    telemetry::{PrometheusMetricsConfig, install_prometheus_metrics},
63};
64use tokio::sync::oneshot;
65use tracing::{info, info_span};
66
/// The Tempo CLI: reth's `Cli` parameterized with Tempo's chain-spec parser,
/// the extra [`TempoArgs`] node arguments, the default RPC module validator,
/// and Tempo's additional subcommands.
type TempoCli =
    Cli<TempoChainSpecParser, TempoArgs, DefaultRpcModuleValidator, tempo_cmd::TempoSubcommand>;
69
// TODO: migrate this to tempo_node eventually.
//
// Tempo-specific additions to the reth `node` command line. The flattened
// groups each contribute their own flags. NOTE: plain `//` comments are used
// on fields below (not `///`) so that clap's generated --help text is not
// altered by this documentation.
#[derive(Debug, Clone, clap::Args)]
struct TempoArgs {
    /// Follow this specific RPC node for block hashes.
    /// If provided without a value, defaults to the RPC URL for the selected chain.
    #[arg(long, value_name = "URL", default_missing_value = "auto", num_args(0..=1), env = "TEMPO_FOLLOW")]
    pub follow: Option<String>,

    // Telemetry / OTLP export configuration (consumed in `main` via
    // `try_to_config`).
    #[command(flatten)]
    pub telemetry: defaults::TelemetryArgs,

    // Commonware consensus engine configuration (storage dir, worker threads,
    // metrics address, keys — see `tempo_commonware_node::Args`).
    #[command(flatten)]
    pub consensus: tempo_commonware_node::Args,

    // Built-in faucet RPC extension configuration.
    #[command(flatten)]
    pub faucet_args: FaucetArgs,

    // Execution-node (reth) specific Tempo arguments.
    #[command(flatten)]
    pub node_args: TempoNodeArgs,

    // Pyroscope continuous-profiling flags; only compiled in when the
    // `pyroscope` feature is enabled.
    #[command(flatten)]
    #[cfg(feature = "pyroscope")]
    pub pyroscope_args: PyroscopeArgs,
}
94
/// Command line arguments for configuring Pyroscope continuous profiling.
//
// Consumed in `main` to build and start a `pyroscope::PyroscopeAgent` when
// `--pyroscope.enabled` is set. Only compiled with the `pyroscope` feature.
// (Field docs below are clap help text — kept verbatim.)
#[cfg(feature = "pyroscope")]
#[derive(Debug, Clone, PartialEq, Eq, clap::Args)]
struct PyroscopeArgs {
    /// Enable Pyroscope continuous profiling
    #[arg(long = "pyroscope.enabled", default_value_t = false)]
    pub pyroscope_enabled: bool,

    /// Pyroscope server URL
    #[arg(long = "pyroscope.server-url", default_value = "http://localhost:4040")]
    pub server_url: String,

    /// Application name for Pyroscope
    #[arg(long = "pyroscope.application-name", default_value = "tempo")]
    pub application_name: String,

    /// Sample rate for profiling (default: 100 Hz)
    #[arg(long = "pyroscope.sample-rate", default_value_t = 100)]
    pub sample_rate: u32,
}
115
116/// Force-install the default crypto provider.
117///
118/// This is necessary in case there are more than one available backends enabled in rustls (ring,
119/// aws-lc-rs).
120///
121/// This should be called high in the main fn.
122///
123/// See also:
124///   <https://github.com/snapview/tokio-tungstenite/issues/353#issuecomment-2455100010>
125///   <https://github.com/awslabs/aws-sdk-rust/discussions/1257>
126fn install_crypto_provider() {
127    // https://github.com/snapview/tokio-tungstenite/issues/353
128    rustls::crypto::ring::default_provider()
129        .install_default()
130        .expect("Failed to install default rustls crypto provider");
131}
132
133/// Print installed extensions as a footer after root help output.
134/// Skips printing when help is for a subcommand (e.g. `tempo node --help`).
135fn print_extensions_footer() {
136    let is_subcommand_help = std::env::args()
137        .skip(1)
138        .any(|a| !a.starts_with('-') && a != "help");
139    if is_subcommand_help {
140        return;
141    }
142
143    let extensions = match tempo_ext::installed_extensions() {
144        Ok(e) => e,
145        Err(_) => return,
146    };
147    if extensions.is_empty() {
148        return;
149    }
150    let use_color = std::io::IsTerminal::is_terminal(&std::io::stdout());
151    let (b, bu, r) = if use_color {
152        ("\x1b[1m", "\x1b[1m\x1b[4m", "\x1b[0m")
153    } else {
154        ("", "", "")
155    };
156    println!("\n{bu}Extensions:{r}");
157    for (name, desc) in &extensions {
158        if desc.is_empty() {
159            println!("  {b}{name}{r}");
160        } else {
161            println!("  {b}{name:<22}{r} {desc}");
162        }
163    }
164}
165
/// Entry point: launches the reth execution node on the main thread's runtime
/// and the Commonware consensus stack on a dedicated OS thread with its own
/// runtime, wiring them together via oneshot channels and a cancellation token.
fn main() -> eyre::Result<()> {
    // Must run before anything initialises rustls; see `install_crypto_provider`.
    install_crypto_provider();

    reth_cli_util::sigsegv_handler::install();

    // XXX: ensures that the error source chain is preserved in
    // tracing-instrument generated error events. That is, this hook ensures
    // that functions instrumented like `#[instrument(err)]` will emit an event
    // that contains the entire error source chain.
    //
    // TODO: Can remove this if https://github.com/tokio-rs/tracing/issues/2648
    // ever gets addressed.
    tempo_eyre::install()
        .expect("must install the eyre error hook before constructing any eyre reports");

    // Enable backtraces unless a RUST_BACKTRACE value has already been explicitly provided.
    // SAFETY of set_var relies on no other threads reading the environment yet;
    // NOTE(review): presumably nothing above spawns threads — confirm.
    if std::env::var_os("RUST_BACKTRACE").is_none() {
        unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
    }

    tempo_node::init_version_metadata();
    defaults::init_defaults();

    // Parse the CLI manually (rather than `parse()`) so that an unknown
    // subcommand can be forwarded to the external-extension launcher and the
    // root help output can be suffixed with the extensions footer.
    let mut cli = match TempoCli::command()
        .about("Tempo")
        .try_get_matches_from(std::env::args_os())
        .and_then(|matches| TempoCli::from_arg_matches(&matches))
    {
        Ok(cli) => cli,
        Err(err) => {
            if err.kind() == clap::error::ErrorKind::InvalidSubcommand {
                // Unknown subcommand — try the extension launcher.
                let code = match tempo_ext::run(std::env::args_os()) {
                    Ok(code) => code,
                    Err(e) => {
                        eprintln!("{e}");
                        1
                    }
                };
                std::process::exit(code);
            }

            if matches!(
                err.kind(),
                clap::error::ErrorKind::DisplayHelp
                    | clap::error::ErrorKind::DisplayHelpOnMissingArgumentOrSubcommand
            ) {
                let _ = err.print();
                print_extensions_footer();
                std::process::exit(0);
            }

            // Any other parse error: print and exit with clap's exit code.
            err.exit();
        }
    };

    // If telemetry is enabled, set logs OTLP (conflicts_with in TelemetryArgs prevents both being set)
    let mut telemetry_config = None;
    if let Commands::Node(node_cmd) = &cli.command
        && let Some(config) = node_cmd
            .ext
            .telemetry
            .try_to_config()
            .wrap_err("failed to parse telemetry config")?
    {
        let consensus_pubkey = node_cmd
            .ext
            .consensus
            .public_key()
            .wrap_err("failed parsing consensus key")?
            .map(|k| k.to_string());

        if let Some(pubkey) = &consensus_pubkey {
            // VictoriaMetrics does not support merging `extra_fields` query args like `extra_labels` for
            // metrics. A workaround for now is to directly hook into the `OTEL_RESOURCE_ATTRIBUTES` env var
            // used at startup to capture contextual information.
            let current = std::env::var("OTEL_RESOURCE_ATTRIBUTES").unwrap_or_default();
            let new_attrs = if current.is_empty() {
                format!("consensus_pubkey={pubkey}")
            } else {
                format!("{current},consensus_pubkey={pubkey}")
            };

            // SAFETY: called at startup before the OTEL SDK is initialised
            unsafe {
                std::env::set_var("OTEL_RESOURCE_ATTRIBUTES", &new_attrs);
            }
        }

        // Set Reth logs OTLP. Consensus logs are exported as well via the same tracing system.
        cli.traces.logs_otlp = Some(config.logs_otlp_url.clone());
        cli.traces.logs_otlp_filter = config
            .logs_otlp_filter
            .parse()
            .wrap_err("invalid default logs filter")?;

        telemetry_config.replace(config);
    }

    let is_node = matches!(cli.command, Commands::Node(_));

    // Bridges between the execution side (runs below in reth's runtime) and
    // the consensus thread: one channel hands over the launched node handle
    // plus parsed args; the other signals that the consensus side has exited.
    let (args_and_node_handle_tx, args_and_node_handle_rx) =
        oneshot::channel::<(TempoFullNode, TempoArgs)>();
    let (consensus_dead_tx, mut consensus_dead_rx) = oneshot::channel();

    let shutdown_token = tokio_util::sync::CancellationToken::new();
    let cl_feed_state = consensus_feed::FeedStateHandle::new();

    // The consensus stack runs on its own OS thread with a separate
    // commonware tokio runtime, independent of reth's runtime.
    let shutdown_token_clone = shutdown_token.clone();
    let cl_feed_state_clone = cl_feed_state.clone();
    let consensus_handle = thread::spawn(move || {
        // Exit early if we are not executing `tempo node` command.
        if !is_node {
            return Ok(());
        }

        // Blocks until the execution node has launched and sent its handle.
        let (node, args) = args_and_node_handle_rx.blocking_recv().wrap_err(
            "channel closed before consensus-relevant command line args \
                and a handle to the execution node could be received",
        )?;

        let ret = if node.config.dev.dev || args.follow.is_some() {
            // When --follow is used (with or without a URL), skip consensus stack
            // entirely and just park this thread until shutdown is requested.
            futures::executor::block_on(async move {
                shutdown_token_clone.cancelled().await;
                Ok(())
            })
        } else {
            // Consensus data lives under `<datadir>/consensus` unless an
            // explicit storage dir was supplied on the command line.
            let consensus_storage = args.consensus.storage_dir.clone().unwrap_or_else(|| {
                node.config
                    .datadir
                    .clone()
                    .resolve_datadir(node.chain_spec().chain())
                    .data_dir()
                    .join("consensus")
            });

            info_span!("prepare_consensus").in_scope(|| {
                info!(
                    path = %consensus_storage.display(),
                    "determined directory for consensus data",
                )
            });

            let runtime_config = commonware_runtime::tokio::Config::default()
                .with_tcp_nodelay(Some(true))
                .with_worker_threads(args.consensus.worker_threads)
                .with_storage_directory(consensus_storage)
                .with_catch_panics(true);

            let runner = commonware_runtime::tokio::Runner::new(runtime_config);

            runner.start(async move |ctx| {
                // Ensure all consensus metrics are prefixed. Shadow `ctx` to
                // not forget.
                let ctx = ctx.with_label("consensus");

                let mut metrics_server = tempo_commonware_node::metrics::install(
                    ctx.with_label("metrics"),
                    args.consensus.metrics_address,
                )
                .fuse();

                // Start the unified metrics exporter if configured
                if let Some(config) = telemetry_config {
                    let consensus_pubkey = args
                        .consensus
                        .public_key()
                        .wrap_err("failed parsing consensus key")?
                        .map(|k| k.to_string());

                    let prometheus_config = PrometheusMetricsConfig {
                        endpoint: config.metrics_prometheus_url,
                        interval: config.metrics_prometheus_interval,
                        auth_header: config.metrics_auth_header,
                        consensus_pubkey,
                    };

                    install_prometheus_metrics(
                        ctx.with_label("telemetry_metrics"),
                        prometheus_config,
                    )
                    .wrap_err("failed to start Prometheus metrics exporter")?;
                }

                let consensus_stack =
                    run_consensus_stack(&ctx, args.consensus, node, cl_feed_state_clone);
                tokio::pin!(consensus_stack);
                loop {
                    tokio::select!(
                        // `biased` gives the shutdown branch priority when
                        // multiple branches are ready simultaneously.
                        biased;

                        () = shutdown_token_clone.cancelled() => {
                            break Ok(());
                        }

                        // The consensus stack is not expected to return, so
                        // even an Ok(()) exit is converted to an error.
                        ret = &mut consensus_stack => {
                            break ret.and_then(|()| Err(eyre::eyre!(
                                "consensus stack exited unexpectedly"))
                            )
                            .wrap_err("consensus stack failed");
                        }

                        // A dead metrics server is logged but non-fatal; the
                        // `is_terminated` guard prevents polling the fused
                        // future again after it completes.
                        ret = &mut metrics_server, if !metrics_server.is_terminated() => {
                            let reason = match ret.wrap_err("task_panicked") {
                                Ok(Ok(())) => "unexpected regular exit".to_string(),
                                Ok(Err(err)) | Err(err) => format!("{err}"),
                            };
                            tracing::warn!(reason, "the metrics server exited");
                        }
                    )
                }
            })
        };
        // Tell the execution side that consensus is done. Send errors are
        // ignored: the receiver may already be gone during shutdown.
        let _ = consensus_dead_tx.send(());
        ret
    });

    // Factory for the execution-layer components (EVM config + consensus
    // validator) built from the resolved chain spec.
    let components =
        |spec: Arc<TempoChainSpec>| (TempoEvmConfig::new(spec.clone()), TempoConsensus::new(spec));

    cli.run_with_components::<TempoNode>(components, async move |builder, args| {
        let faucet_args = args.faucet_args.clone();
        let validator_key = args
            .consensus
            .public_key()?
            .map(|key| B256::from_slice(key.as_ref()));

        // Initialize Pyroscope profiling if enabled
        #[cfg(feature = "pyroscope")]
        let pyroscope_agent = if args.pyroscope_args.pyroscope_enabled {
            let agent = pyroscope::PyroscopeAgent::builder(
                &args.pyroscope_args.server_url,
                &args.pyroscope_args.application_name,
            )
            .backend(pyroscope_pprofrs::pprof_backend(
                pyroscope_pprofrs::PprofConfig::new()
                    .sample_rate(args.pyroscope_args.sample_rate)
                    .report_thread_id()
                    .report_thread_name(),
            ))
            .build()
            .wrap_err("failed to build Pyroscope agent")?;

            let agent = agent.start().wrap_err("failed to start Pyroscope agent")?;
            info!(
                server_url = %args.pyroscope_args.server_url,
                application_name = %args.pyroscope_args.application_name,
                "Pyroscope profiling enabled"
            );

            Some(agent)
        } else {
            None
        };

        let NodeHandle {
            node,
            node_exit_future,
        } = builder
            .node(TempoNode::new(&args.node_args, validator_key))
            .apply(|mut builder: WithLaunchContext<_>| {
                // Enable discv5 peer discovery
                builder
                    .config_mut()
                    .network
                    .discovery
                    .enable_discv5_discovery = true;

                // Resolve the follow URL:
                // --follow or --follow=auto -> use chain-specific default
                // --follow=URL -> use provided URL
                if let Some(follow) = &args.follow {
                    let follow_url = if follow == "auto" {
                        builder
                            .config()
                            .chain
                            .default_follow_url()
                            .map(|s| s.to_string())
                    } else {
                        Some(follow.clone())
                    };
                    builder.config_mut().debug.rpc_consensus_url = follow_url;
                }

                builder
            })
            .extend_rpc_modules(move |ctx| {
                if faucet_args.enabled {
                    let ext = TempoFaucetExt::new(
                        faucet_args.addresses(),
                        faucet_args.amount(),
                        faucet_args.provider(),
                    );

                    ctx.modules.merge_configured(ext.into_rpc())?;
                }

                // The consensus RPC module is only installed when this node
                // has a validator key configured.
                if validator_key.is_some() {
                    ctx.modules
                        .merge_configured(TempoConsensusRpc::new(cl_feed_state).into_rpc())?;
                }

                Ok(())
            })
            .launch_with_debug_capabilities()
            .await
            .wrap_err("failed launching execution node")?;

        // Hand the launched node + args over to the consensus thread. Send
        // errors are ignored: the thread exits immediately for non-node
        // commands, dropping the receiver.
        let _ = args_and_node_handle_tx.send((node, args));

        // TODO: emit these inside a span
        // Run until whichever happens first: execution node exit, consensus
        // thread exit, or ctrl-c.
        tokio::select! {
            _ = node_exit_future => {
                tracing::info!("execution node exited");
            }
            _ = &mut consensus_dead_rx => {
                tracing::info!("consensus node exited");
            }
            _ = tokio::signal::ctrl_c() => {
                tracing::info!("received shutdown signal");
            }
        }

        #[cfg(feature = "pyroscope")]
        if let Some(agent) = pyroscope_agent {
            agent.shutdown();
        }

        Ok(())
    })
    .wrap_err("execution node failed")?;

    // Ask the consensus thread to stop, then surface its result. A panic on
    // the consensus thread is re-raised on this one.
    shutdown_token.cancel();

    match consensus_handle.join() {
        Ok(Ok(())) => {}
        Ok(Err(err)) => eprintln!("consensus task exited with error:\n{err:?}"),
        Err(unwind) => std::panic::resume_unwind(unwind),
    }
    Ok(())
}