1#![cfg_attr(not(test), warn(unused_crate_dependencies))]
16#![cfg_attr(docsrs, feature(doc_cfg))]
17
18#[cfg(feature = "tracy")]
21use tracy_client as _;
22
// Install the allocator provided by reth-cli-util as the process-wide
// global allocator (its concrete backend is feature-selected in that crate).
#[global_allocator]
static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator();

// When jemalloc profiling is enabled on unix, export jemalloc's malloc-conf
// symbol so heap profiling is active from process start.
// NOTE(review): lg_prof_sample:19 samples roughly every 512 KiB — confirm
// against jemalloc's MALLCTL docs before changing.
#[cfg(all(feature = "jemalloc-prof", unix))]
#[unsafe(export_name = "_rjem_malloc_conf")]
static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
36
37mod defaults;
38mod init_state;
39mod tempo_cmd;
40
41use clap::{CommandFactory, FromArgMatches};
42use commonware_runtime::{Metrics, Runner};
43use eyre::WrapErr as _;
44use futures::{FutureExt as _, future::FusedFuture as _};
45use reth_ethereum::{chainspec::EthChainSpec as _, cli::Commands, evm::revm::primitives::B256};
46use reth_ethereum_cli::Cli;
47use reth_node_builder::{NodeHandle, WithLaunchContext};
48use reth_rpc_server_types::DefaultRpcModuleValidator;
49use std::{sync::Arc, thread};
50use tempo_chainspec::spec::{TempoChainSpec, TempoChainSpecParser};
51use tempo_commonware_node::{feed as consensus_feed, run_consensus_stack};
52use tempo_consensus::TempoConsensus;
53use tempo_evm::TempoEvmConfig;
54use tempo_faucet::{
55 args::FaucetArgs,
56 faucet::{TempoFaucetExt, TempoFaucetExtApiServer},
57};
58use tempo_node::{
59 TempoFullNode, TempoNodeArgs,
60 node::TempoNode,
61 rpc::consensus::{TempoConsensusApiServer, TempoConsensusRpc},
62 telemetry::{PrometheusMetricsConfig, install_prometheus_metrics},
63};
64use tokio::sync::oneshot;
65use tracing::{info, info_span};
66
/// The Tempo CLI: reth's `Cli` parameterized with Tempo's chain-spec parser,
/// the extra [`TempoArgs`], the default RPC module validator, and Tempo's
/// custom subcommands.
type TempoCli =
    Cli<TempoChainSpecParser, TempoArgs, DefaultRpcModuleValidator, tempo_cmd::TempoSubcommand>;
69
// Extra CLI arguments Tempo layers on top of the standard reth node options.
// Plain `//` comments are used on purpose: `///` doc comments would be picked
// up by clap derive and change the generated --help text.
#[derive(Debug, Clone, clap::Args)]
struct TempoArgs {
    // Follow another node's consensus over RPC instead of running a local
    // consensus stack. A bare `--follow` (no value) means "auto", which is
    // resolved to the chain's default follow URL at launch time (see main).
    #[arg(long, value_name = "URL", default_missing_value = "auto", num_args(0..=1), env = "TEMPO_FOLLOW")]
    pub follow: Option<String>,

    // Telemetry options (OTLP log export, Prometheus metrics push).
    #[command(flatten)]
    pub telemetry: defaults::TelemetryArgs,

    // Consensus-stack options (key material, storage dir, worker threads,
    // metrics address — see their uses in `main`).
    #[command(flatten)]
    pub consensus: tempo_commonware_node::Args,

    // Built-in faucet RPC extension options.
    #[command(flatten)]
    pub faucet_args: FaucetArgs,

    // Execution-node specific options.
    #[command(flatten)]
    pub node_args: TempoNodeArgs,

    // Pyroscope continuous-profiling options (compiled in only with the
    // `pyroscope` feature).
    #[command(flatten)]
    #[cfg(feature = "pyroscope")]
    pub pyroscope_args: PyroscopeArgs,
}
94
// CLI options controlling the optional Pyroscope profiling agent.
// Plain `//` comments on purpose: `///` would alter clap's --help output.
#[cfg(feature = "pyroscope")]
#[derive(Debug, Clone, PartialEq, Eq, clap::Args)]
struct PyroscopeArgs {
    // Opt-in switch; profiling stays off unless explicitly enabled.
    #[arg(long = "pyroscope.enabled", default_value_t = false)]
    pub pyroscope_enabled: bool,

    // Pyroscope server profiles are shipped to.
    #[arg(long = "pyroscope.server-url", default_value = "http://localhost:4040")]
    pub server_url: String,

    // Application name under which profiles are reported.
    #[arg(long = "pyroscope.application-name", default_value = "tempo")]
    pub application_name: String,

    // Profiling sample rate — presumably samples per second; TODO confirm
    // units against the pyroscope crate documentation.
    #[arg(long = "pyroscope.sample-rate", default_value_t = 100)]
    pub sample_rate: u32,
}
115
116fn install_crypto_provider() {
127 rustls::crypto::ring::default_provider()
129 .install_default()
130 .expect("Failed to install default rustls crypto provider");
131}
132
133fn print_extensions_footer() {
136 let is_subcommand_help = std::env::args()
137 .skip(1)
138 .any(|a| !a.starts_with('-') && a != "help");
139 if is_subcommand_help {
140 return;
141 }
142
143 let extensions = match tempo_ext::installed_extensions() {
144 Ok(e) => e,
145 Err(_) => return,
146 };
147 if extensions.is_empty() {
148 return;
149 }
150 let use_color = std::io::IsTerminal::is_terminal(&std::io::stdout());
151 let (b, bu, r) = if use_color {
152 ("\x1b[1m", "\x1b[1m\x1b[4m", "\x1b[0m")
153 } else {
154 ("", "", "")
155 };
156 println!("\n{bu}Extensions:{r}");
157 for (name, desc) in &extensions {
158 if desc.is_empty() {
159 println!(" {b}{name}{r}");
160 } else {
161 println!(" {b}{name:<22}{r} {desc}");
162 }
163 }
164}
165
166fn main() -> eyre::Result<()> {
167 install_crypto_provider();
168
169 reth_cli_util::sigsegv_handler::install();
170
171 tempo_eyre::install()
179 .expect("must install the eyre error hook before constructing any eyre reports");
180
181 if std::env::var_os("RUST_BACKTRACE").is_none() {
183 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
184 }
185
186 tempo_node::init_version_metadata();
187 defaults::init_defaults();
188
189 let mut cli = match TempoCli::command()
190 .about("Tempo")
191 .try_get_matches_from(std::env::args_os())
192 .and_then(|matches| TempoCli::from_arg_matches(&matches))
193 {
194 Ok(cli) => cli,
195 Err(err) => {
196 if err.kind() == clap::error::ErrorKind::InvalidSubcommand {
197 let code = match tempo_ext::run(std::env::args_os()) {
199 Ok(code) => code,
200 Err(e) => {
201 eprintln!("{e}");
202 1
203 }
204 };
205 std::process::exit(code);
206 }
207
208 if matches!(
209 err.kind(),
210 clap::error::ErrorKind::DisplayHelp
211 | clap::error::ErrorKind::DisplayHelpOnMissingArgumentOrSubcommand
212 ) {
213 let _ = err.print();
214 print_extensions_footer();
215 std::process::exit(0);
216 }
217
218 err.exit();
219 }
220 };
221
222 let mut telemetry_config = None;
224 if let Commands::Node(node_cmd) = &cli.command
225 && let Some(config) = node_cmd
226 .ext
227 .telemetry
228 .try_to_config()
229 .wrap_err("failed to parse telemetry config")?
230 {
231 let consensus_pubkey = node_cmd
232 .ext
233 .consensus
234 .public_key()
235 .wrap_err("failed parsing consensus key")?
236 .map(|k| k.to_string());
237
238 if let Some(pubkey) = &consensus_pubkey {
239 let current = std::env::var("OTEL_RESOURCE_ATTRIBUTES").unwrap_or_default();
243 let new_attrs = if current.is_empty() {
244 format!("consensus_pubkey={pubkey}")
245 } else {
246 format!("{current},consensus_pubkey={pubkey}")
247 };
248
249 unsafe {
251 std::env::set_var("OTEL_RESOURCE_ATTRIBUTES", &new_attrs);
252 }
253 }
254
255 cli.traces.logs_otlp = Some(config.logs_otlp_url.clone());
257 cli.traces.logs_otlp_filter = config
258 .logs_otlp_filter
259 .parse()
260 .wrap_err("invalid default logs filter")?;
261
262 telemetry_config.replace(config);
263 }
264
265 let is_node = matches!(cli.command, Commands::Node(_));
266
267 let (args_and_node_handle_tx, args_and_node_handle_rx) =
268 oneshot::channel::<(TempoFullNode, TempoArgs)>();
269 let (consensus_dead_tx, mut consensus_dead_rx) = oneshot::channel();
270
271 let shutdown_token = tokio_util::sync::CancellationToken::new();
272 let cl_feed_state = consensus_feed::FeedStateHandle::new();
273
274 let shutdown_token_clone = shutdown_token.clone();
275 let cl_feed_state_clone = cl_feed_state.clone();
276 let consensus_handle = thread::spawn(move || {
277 if !is_node {
279 return Ok(());
280 }
281
282 let (node, args) = args_and_node_handle_rx.blocking_recv().wrap_err(
283 "channel closed before consensus-relevant command line args \
284 and a handle to the execution node could be received",
285 )?;
286
287 let ret = if node.config.dev.dev || args.follow.is_some() {
288 futures::executor::block_on(async move {
290 shutdown_token_clone.cancelled().await;
291 Ok(())
292 })
293 } else {
294 let consensus_storage = args.consensus.storage_dir.clone().unwrap_or_else(|| {
295 node.config
296 .datadir
297 .clone()
298 .resolve_datadir(node.chain_spec().chain())
299 .data_dir()
300 .join("consensus")
301 });
302
303 info_span!("prepare_consensus").in_scope(|| {
304 info!(
305 path = %consensus_storage.display(),
306 "determined directory for consensus data",
307 )
308 });
309
310 let runtime_config = commonware_runtime::tokio::Config::default()
311 .with_tcp_nodelay(Some(true))
312 .with_worker_threads(args.consensus.worker_threads)
313 .with_storage_directory(consensus_storage)
314 .with_catch_panics(true);
315
316 let runner = commonware_runtime::tokio::Runner::new(runtime_config);
317
318 runner.start(async move |ctx| {
319 let ctx = ctx.with_label("consensus");
322
323 let mut metrics_server = tempo_commonware_node::metrics::install(
324 ctx.with_label("metrics"),
325 args.consensus.metrics_address,
326 )
327 .fuse();
328
329 if let Some(config) = telemetry_config {
331 let consensus_pubkey = args
332 .consensus
333 .public_key()
334 .wrap_err("failed parsing consensus key")?
335 .map(|k| k.to_string());
336
337 let prometheus_config = PrometheusMetricsConfig {
338 endpoint: config.metrics_prometheus_url,
339 interval: config.metrics_prometheus_interval,
340 auth_header: config.metrics_auth_header,
341 consensus_pubkey,
342 };
343
344 install_prometheus_metrics(
345 ctx.with_label("telemetry_metrics"),
346 prometheus_config,
347 )
348 .wrap_err("failed to start Prometheus metrics exporter")?;
349 }
350
351 let consensus_stack =
352 run_consensus_stack(&ctx, args.consensus, node, cl_feed_state_clone);
353 tokio::pin!(consensus_stack);
354 loop {
355 tokio::select!(
356 biased;
357
358 () = shutdown_token_clone.cancelled() => {
359 break Ok(());
360 }
361
362 ret = &mut consensus_stack => {
363 break ret.and_then(|()| Err(eyre::eyre!(
364 "consensus stack exited unexpectedly"))
365 )
366 .wrap_err("consensus stack failed");
367 }
368
369 ret = &mut metrics_server, if !metrics_server.is_terminated() => {
370 let reason = match ret.wrap_err("task_panicked") {
371 Ok(Ok(())) => "unexpected regular exit".to_string(),
372 Ok(Err(err)) | Err(err) => format!("{err}"),
373 };
374 tracing::warn!(reason, "the metrics server exited");
375 }
376 )
377 }
378 })
379 };
380 let _ = consensus_dead_tx.send(());
381 ret
382 });
383
384 let components =
385 |spec: Arc<TempoChainSpec>| (TempoEvmConfig::new(spec.clone()), TempoConsensus::new(spec));
386
387 cli.run_with_components::<TempoNode>(components, async move |builder, args| {
388 let faucet_args = args.faucet_args.clone();
389 let validator_key = args
390 .consensus
391 .public_key()?
392 .map(|key| B256::from_slice(key.as_ref()));
393
394 #[cfg(feature = "pyroscope")]
396 let pyroscope_agent = if args.pyroscope_args.pyroscope_enabled {
397 let agent = pyroscope::PyroscopeAgent::builder(
398 &args.pyroscope_args.server_url,
399 &args.pyroscope_args.application_name,
400 )
401 .backend(pyroscope_pprofrs::pprof_backend(
402 pyroscope_pprofrs::PprofConfig::new()
403 .sample_rate(args.pyroscope_args.sample_rate)
404 .report_thread_id()
405 .report_thread_name(),
406 ))
407 .build()
408 .wrap_err("failed to build Pyroscope agent")?;
409
410 let agent = agent.start().wrap_err("failed to start Pyroscope agent")?;
411 info!(
412 server_url = %args.pyroscope_args.server_url,
413 application_name = %args.pyroscope_args.application_name,
414 "Pyroscope profiling enabled"
415 );
416
417 Some(agent)
418 } else {
419 None
420 };
421
422 let NodeHandle {
423 node,
424 node_exit_future,
425 } = builder
426 .node(TempoNode::new(&args.node_args, validator_key))
427 .apply(|mut builder: WithLaunchContext<_>| {
428 builder
430 .config_mut()
431 .network
432 .discovery
433 .enable_discv5_discovery = true;
434
435 if let Some(follow) = &args.follow {
439 let follow_url = if follow == "auto" {
440 builder
441 .config()
442 .chain
443 .default_follow_url()
444 .map(|s| s.to_string())
445 } else {
446 Some(follow.clone())
447 };
448 builder.config_mut().debug.rpc_consensus_url = follow_url;
449 }
450
451 builder
452 })
453 .extend_rpc_modules(move |ctx| {
454 if faucet_args.enabled {
455 let ext = TempoFaucetExt::new(
456 faucet_args.addresses(),
457 faucet_args.amount(),
458 faucet_args.provider(),
459 );
460
461 ctx.modules.merge_configured(ext.into_rpc())?;
462 }
463
464 if validator_key.is_some() {
465 ctx.modules
466 .merge_configured(TempoConsensusRpc::new(cl_feed_state).into_rpc())?;
467 }
468
469 Ok(())
470 })
471 .launch_with_debug_capabilities()
472 .await
473 .wrap_err("failed launching execution node")?;
474
475 let _ = args_and_node_handle_tx.send((node, args));
476
477 tokio::select! {
479 _ = node_exit_future => {
480 tracing::info!("execution node exited");
481 }
482 _ = &mut consensus_dead_rx => {
483 tracing::info!("consensus node exited");
484 }
485 _ = tokio::signal::ctrl_c() => {
486 tracing::info!("received shutdown signal");
487 }
488 }
489
490 #[cfg(feature = "pyroscope")]
491 if let Some(agent) = pyroscope_agent {
492 agent.shutdown();
493 }
494
495 Ok(())
496 })
497 .wrap_err("execution node failed")?;
498
499 shutdown_token.cancel();
500
501 match consensus_handle.join() {
502 Ok(Ok(())) => {}
503 Ok(Err(err)) => eprintln!("consensus task exited with error:\n{err:?}"),
504 Err(unwind) => std::panic::resume_unwind(unwind),
505 }
506 Ok(())
507}