1#![cfg_attr(not(test), warn(unused_crate_dependencies))]
16#![cfg_attr(docsrs, feature(doc_cfg))]
17
18#[cfg(feature = "tracy")]
21use tracy_client as _;
22
23#[cfg(feature = "otlp")]
26use opentelemetry_otlp as _;
27
28#[global_allocator]
29static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator();
30
/// NUL-terminated jemalloc option string, exported under the symbol name
/// `_rjem_malloc_conf`.
///
/// Only compiled on unix targets with the `jemalloc-prof` feature. The options
/// switch heap profiling on (`prof`, `prof_active`) and set `lg_prof_sample` to
/// 19 (presumably one sample per 2^19 bytes on average — see the jemalloc
/// manual).
// SAFETY: `export_name` is an unsafe attribute because a clashing symbol is
// undefined behaviour. NOTE(review): this assumes nothing else linked into the
// binary defines `_rjem_malloc_conf` — confirm against the jemalloc sys crate.
#[cfg(all(feature = "jemalloc-prof", unix))]
#[unsafe(export_name = "_rjem_malloc_conf")]
static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
41
42mod defaults;
43mod init_state;
44mod p2p_proxy;
45mod tempo_cmd;
46
47use clap::{CommandFactory, FromArgMatches};
48use commonware_runtime::{Metrics, Runner};
49use eyre::{OptionExt, WrapErr as _};
50use futures::{
51 FutureExt as _,
52 future::{Either, FusedFuture as _},
53};
54use reth_ethereum::{chainspec::EthChainSpec as _, cli::Commands, evm::revm::primitives::B256};
55use reth_ethereum_cli::Cli;
56use reth_network_api::Peers;
57use reth_network_peers::pk2id;
58use reth_node_builder::{NodeHandle, WithLaunchContext};
59use reth_rpc_server_types::{RethRpcModule, RpcModuleSelection, RpcModuleValidator};
60use std::{sync::Arc, thread, time::Duration};
61use tempo_chainspec::spec::{TempoChainSpec, TempoChainSpecParser};
62use tempo_commonware_node::{feed as consensus_feed, run_consensus_stack, run_follow_stack};
63use tempo_consensus::TempoConsensus;
64use tempo_evm::TempoEvmConfig;
65use tempo_faucet::{
66 args::FaucetArgs,
67 faucet::{TempoFaucetExt, TempoFaucetExtApiServer},
68};
69use tempo_node::{
70 TempoFullNode, TempoNodeArgs,
71 node::TempoNode,
72 rpc::consensus::{TempoConsensusApiServer, TempoConsensusRpc},
73 telemetry::{PrometheusMetricsConfig, install_prometheus_metrics},
74};
75use tokio::sync::oneshot;
76use tracing::{debug, info, info_span, warn, warn_span};
77
78type TempoCli =
79 Cli<TempoChainSpecParser, TempoArgs, TempoRpcModuleValidator, tempo_cmd::TempoSubcommand>;
80
81const TEMPO_CUSTOM_RPC_MODULES: &[&str] = &["consensus", "operator", "tempo", "token"];
82
83#[derive(Debug, Clone, Copy)]
84struct TempoRpcModuleValidator;
85
86impl RpcModuleValidator for TempoRpcModuleValidator {
87 fn parse_selection(s: &str) -> Result<RpcModuleSelection, String> {
88 let selection = s
89 .parse::<RpcModuleSelection>()
90 .map_err(|e| format!("Failed to parse RPC modules: {e}"))?;
91
92 if let RpcModuleSelection::Selection(modules) = &selection {
93 for module in modules {
94 let RethRpcModule::Other(name) = module else {
95 continue;
96 };
97
98 if !TEMPO_CUSTOM_RPC_MODULES.contains(&name.as_str()) {
99 return Err(format!("Unknown RPC module: '{name}'"));
100 }
101 }
102 }
103
104 Ok(selection)
105 }
106}
107
108#[derive(Debug, Clone, clap::Args)]
110struct TempoArgs {
111 #[arg(long, value_name = "WEBSOCKET_URL", default_missing_value = "auto", num_args(0..=1), env = "TEMPO_FOLLOW")]
114 pub follow: Option<String>,
115
116 #[arg(
120 long = "follow.experimental.certify",
121 requires = "follow",
122 default_value_t = false
123 )]
124 pub follow_certify: bool,
125
126 #[arg(
135 long = "tempo.bootnodes-endpoint",
136 value_name = "URL",
137 default_value = "https://peers.tempo.xyz",
138 env = "TEMPO_BOOTNODES_ENDPOINT"
139 )]
140 pub bootnodes_endpoint: String,
141
142 #[command(flatten)]
143 pub telemetry: defaults::TelemetryArgs,
144
145 #[command(flatten)]
146 pub consensus: tempo_commonware_node::Args,
147
148 #[command(flatten)]
149 pub faucet_args: FaucetArgs,
150
151 #[command(flatten)]
152 pub node_args: TempoNodeArgs,
153
154 #[command(flatten)]
155 #[cfg(feature = "pyroscope")]
156 pub pyroscope_args: PyroscopeArgs,
157}
158
159impl TempoArgs {
160 fn is_following_uncertified(&self) -> bool {
161 self.follow.is_some() && !self.follow_certify
162 }
163
164 fn has_consensus_engine(&self, dev: bool) -> bool {
168 !dev && !self.is_following_uncertified()
169 }
170}
171
// Command line options for the optional Pyroscope profiling agent.
//
// NOTE: plain `//` comments are used on purpose. clap's derive turns `///`
// doc comments into help text, which would change the `--help` output.
#[cfg(feature = "pyroscope")]
#[derive(Debug, Clone, PartialEq, Eq, clap::Args)]
struct PyroscopeArgs {
    // Master switch; the agent is only built and started when this is set.
    #[arg(
        long = "pyroscope.enabled",
        default_value_t = false
    )]
    pub pyroscope_enabled: bool,

    // Address of the Pyroscope server the agent is built against.
    #[arg(
        long = "pyroscope.server-url",
        default_value = "http://localhost:4040"
    )]
    pub server_url: String,

    // Application name handed to the agent builder.
    #[arg(
        long = "pyroscope.application-name",
        default_value = "tempo"
    )]
    pub application_name: String,

    // Forwarded to the pprof backend's `sample_rate` (presumably in Hz —
    // confirm against the pyroscope_pprofrs documentation).
    #[arg(
        long = "pyroscope.sample-rate",
        default_value_t = 100
    )]
    pub sample_rate: u32,
}
192
193fn install_crypto_provider() {
204 rustls::crypto::ring::default_provider()
206 .install_default()
207 .expect("Failed to install default rustls crypto provider");
208}
209
210trait NodeCommandExt {
211 fn peer_id(&self) -> reth_network_peers::PeerId;
213}
214
215impl NodeCommandExt for reth_cli_commands::node::NodeCommand<TempoChainSpecParser, TempoArgs> {
216 fn peer_id(&self) -> reth_network_peers::PeerId {
217 let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
218 let sk = self
219 .network
220 .secret_key(data_dir.p2p_secret())
221 .expect("unable to derive peer id from p2p secret");
222
223 pk2id(&sk.public_key(secp256k1::SECP256K1))
224 }
225}
226
227fn print_extensions_footer() {
230 let is_subcommand_help = std::env::args()
231 .skip(1)
232 .any(|a| !a.starts_with('-') && a != "help");
233 if is_subcommand_help {
234 return;
235 }
236
237 let extensions = match tempo_ext::installed_extensions() {
238 Ok(e) => e,
239 Err(_) => return,
240 };
241 if extensions.is_empty() {
242 return;
243 }
244 let use_color = std::io::IsTerminal::is_terminal(&std::io::stdout());
245 let (b, bu, r) = if use_color {
246 ("\x1b[1m", "\x1b[1m\x1b[4m", "\x1b[0m")
247 } else {
248 ("", "", "")
249 };
250 println!("\n{bu}Extensions:{r}");
251 for (name, desc) in &extensions {
252 if desc.is_empty() {
253 println!(" {b}{name}{r}");
254 } else {
255 println!(" {b}{name:<22}{r} {desc}");
256 }
257 }
258}
259
260async fn fetch_bootnodes(
265 endpoint: &str,
266 chain_id: u64,
267) -> eyre::Result<Vec<reth_network_peers::NodeRecord>> {
268 let client = reqwest::Client::builder()
269 .timeout(Duration::from_secs(5))
270 .build()
271 .wrap_err("failed to build HTTP client")?;
272
273 let resp: std::collections::HashMap<String, Vec<String>> = client
274 .get(endpoint)
275 .send()
276 .await
277 .wrap_err("request failed")?
278 .error_for_status()
279 .wrap_err("endpoint returned error status")?
280 .json()
281 .await
282 .wrap_err("failed to parse response as JSON")?;
283
284 let key = chain_id.to_string();
285 let enodes = match resp.get(&key) {
286 Some(enodes) => enodes,
287 None => return Ok(Vec::new()),
288 };
289
290 Ok(reth_network_peers::parse_nodes(enodes))
291}
292
293fn main() -> eyre::Result<()> {
294 install_crypto_provider();
295
296 reth_cli_util::sigsegv_handler::install();
297
298 tempo_eyre::install()
306 .expect("must install the eyre error hook before constructing any eyre reports");
307
308 if std::env::var_os("RUST_BACKTRACE").is_none() {
310 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
311 }
312
313 tempo_node::init_version_metadata();
314 defaults::init_defaults();
315
316 let mut cli = match TempoCli::command()
317 .about("Tempo")
318 .try_get_matches_from(std::env::args_os())
319 .and_then(|matches| TempoCli::from_arg_matches(&matches))
320 {
321 Ok(cli) => cli,
322 Err(err) => {
323 if err.kind() == clap::error::ErrorKind::InvalidSubcommand {
324 let code = match tempo_ext::run(std::env::args_os()) {
326 Ok(code) => code,
327 Err(e) => {
328 eprintln!("{e}");
329 1
330 }
331 };
332 std::process::exit(code);
333 }
334
335 if matches!(
336 err.kind(),
337 clap::error::ErrorKind::DisplayHelp
338 | clap::error::ErrorKind::DisplayHelpOnMissingArgumentOrSubcommand
339 ) {
340 let _ = err.print();
341 print_extensions_footer();
342 std::process::exit(0);
343 }
344
345 err.exit();
346 }
347 };
348
349 if let Commands::Node(node_cmd) = &cli.command
350 && node_cmd.engine.share_sparse_trie_with_payload_builder
351 && node_cmd.builder.max_payload_tasks != 1
352 {
353 eyre::bail!(
354 "--engine.share-sparse-trie-with-payload-builder requires --builder.max-tasks to be 1 (got {})",
355 node_cmd.builder.max_payload_tasks
356 );
357 }
358
359 let mut telemetry_config = None;
361 if let Commands::Node(node_cmd) = &cli.command
362 && let Some(config) = node_cmd
363 .ext
364 .telemetry
365 .try_to_config()
366 .wrap_err("failed to parse telemetry config")?
367 {
368 let consensus_pubkey = node_cmd
369 .ext
370 .consensus
371 .public_key()
372 .wrap_err("failed parsing consensus key")?
373 .map(|k| k.to_string());
374
375 let peer_id = format!("{:x}", node_cmd.peer_id());
376
377 let mut extra_attrs = vec![format!("peer_id={peer_id}")];
381 if let Some(pubkey) = &consensus_pubkey {
382 extra_attrs.push(format!("consensus_pubkey={pubkey}"));
383 }
384
385 if !extra_attrs.is_empty() {
386 let current = std::env::var("OTEL_RESOURCE_ATTRIBUTES").unwrap_or_default();
387 let new_attrs = if current.is_empty() {
388 extra_attrs.join(",")
389 } else {
390 format!("{current},{}", extra_attrs.join(","))
391 };
392
393 unsafe {
395 std::env::set_var("OTEL_RESOURCE_ATTRIBUTES", &new_attrs);
396 }
397 }
398
399 cli.traces.logs_otlp = Some(config.logs_otlp_url.clone());
401 cli.traces.logs_otlp_filter = config
402 .logs_otlp_filter
403 .parse()
404 .wrap_err("invalid default logs filter")?;
405
406 telemetry_config.replace(config);
407 }
408
409 let is_node = matches!(cli.command, Commands::Node(_));
410
411 let (args_and_node_handle_tx, args_and_node_handle_rx) =
412 oneshot::channel::<(TempoFullNode, TempoArgs)>();
413 let (consensus_dead_tx, mut consensus_dead_rx) = oneshot::channel();
414
415 let shutdown_token = tokio_util::sync::CancellationToken::new();
416 let cl_feed_state = consensus_feed::FeedStateHandle::new();
417
418 let shutdown_token_clone = shutdown_token.clone();
419 let cl_feed_state_clone = cl_feed_state.clone();
420
421 let consensus_handle = thread::spawn(move || {
422 if !is_node {
424 return Ok(());
425 }
426
427 let (node, args) = args_and_node_handle_rx.blocking_recv().wrap_err(
428 "channel closed before consensus-relevant command line args \
429 and a handle to the execution node could be received",
430 )?;
431
432 if !args.has_consensus_engine(node.config.dev.dev) {
433 return futures::executor::block_on(async move {
434 shutdown_token_clone.cancelled().await;
435 Ok(())
436 });
437 }
438
439 let consensus_storage = args.consensus.storage_dir.clone().unwrap_or_else(|| {
440 node.config
441 .datadir
442 .clone()
443 .resolve_datadir(node.chain_spec().chain())
444 .data_dir()
445 .join("consensus")
446 });
447
448 info_span!("prepare_consensus").in_scope(|| {
449 info!(
450 path = %consensus_storage.display(),
451 "determined directory for consensus data",
452 )
453 });
454
455 let runtime_config = commonware_runtime::tokio::Config::default()
456 .with_tcp_nodelay(Some(true))
457 .with_worker_threads(args.consensus.worker_threads)
458 .with_storage_directory(consensus_storage)
459 .with_catch_panics(true);
460
461 let runner = commonware_runtime::tokio::Runner::new(runtime_config);
462 let ret = runner.start(async move |ctx| {
463 let mut metrics_server = tempo_commonware_node::metrics::install(
464 ctx.with_label("metrics"),
465 args.consensus.metrics_address,
466 )
467 .fuse();
468
469 if let Some(config) = telemetry_config {
471 let consensus_pubkey = args
472 .consensus
473 .public_key()
474 .wrap_err("failed parsing consensus key")?
475 .map(|k| k.to_string());
476
477 let prometheus_config = PrometheusMetricsConfig {
478 endpoint: config.metrics_prometheus_url,
479 interval: config.metrics_prometheus_interval,
480 auth_header: config.metrics_auth_header,
481 consensus_pubkey,
482 peer_id: format!("{:x}", node.network.peer_id()),
483 };
484
485 install_prometheus_metrics(ctx.with_label("telemetry_metrics"), prometheus_config)
486 .wrap_err("failed to start Prometheus metrics exporter")?;
487 }
488
489 let consensus_stack = if let Some(follow) = args.follow {
490 let follow_url = if follow == "auto" {
491 node.chain_spec()
492 .default_follow_url()
493 .map(|s| s.to_string())
494 .ok_or_eyre("No default follow URL for this chain")?
495 } else {
496 follow
497 };
498
499 Either::Left(run_follow_stack(
500 ctx.with_label("follow"),
501 args.consensus,
502 follow_url,
503 Arc::new(node),
504 cl_feed_state_clone,
505 ))
506 } else {
507 Either::Right(run_consensus_stack(
508 ctx.with_label("consensus"),
509 args.consensus,
510 Arc::new(node),
511 cl_feed_state_clone,
512 ))
513 };
514
515 tokio::pin!(consensus_stack);
516 loop {
517 tokio::select!(
518 biased;
519
520 () = shutdown_token_clone.cancelled() => {
521 break Ok(());
522 }
523
524 ret = &mut consensus_stack => {
525 break ret.and_then(|()| Err(eyre::eyre!(
526 "consensus stack exited unexpectedly"))
527 )
528 .wrap_err("consensus stack failed");
529 }
530
531 ret = &mut metrics_server, if !metrics_server.is_terminated() => {
532 let reason = match ret.wrap_err("task_panicked") {
533 Ok(Ok(())) => "unexpected regular exit".to_string(),
534 Ok(Err(err)) | Err(err) => format!("{err}"),
535 };
536
537 warn_span!("consensus_metrics").in_scope(|| {
538 warn!(reason, "the metrics server exited");
539 })
540 }
541 )
542 }
543 });
544
545 let _ = consensus_dead_tx.send(());
546 ret
547 });
548
549 let components =
550 |spec: Arc<TempoChainSpec>| (TempoEvmConfig::new(spec.clone()), TempoConsensus::new(spec));
551
552 cli.run_with_components::<TempoNode>(components, async move |builder, args| {
553 let faucet_args = args.faucet_args.clone();
554 let validator_key = args
555 .consensus
556 .public_key()?
557 .map(|key| B256::from_slice(key.as_ref()));
558
559 #[cfg(feature = "pyroscope")]
561 let pyroscope_agent = if args.pyroscope_args.pyroscope_enabled {
562 let agent = pyroscope::PyroscopeAgent::builder(
563 &args.pyroscope_args.server_url,
564 &args.pyroscope_args.application_name,
565 )
566 .backend(pyroscope_pprofrs::pprof_backend(
567 pyroscope_pprofrs::PprofConfig::new()
568 .sample_rate(args.pyroscope_args.sample_rate)
569 .report_thread_id()
570 .report_thread_name(),
571 ))
572 .build()
573 .wrap_err("failed to build Pyroscope agent")?;
574
575 let agent = agent.start().wrap_err("failed to start Pyroscope agent")?;
576 info!(
577 server_url = %args.pyroscope_args.server_url,
578 application_name = %args.pyroscope_args.application_name,
579 "Pyroscope profiling enabled"
580 );
581
582 Some(agent)
583 } else {
584 None
585 };
586 let chain_id = builder.config().chain.chain().id();
587
588 let bootnodes_endpoint = match args.bootnodes_endpoint.trim() {
592 value if value.eq_ignore_ascii_case("none") => None,
593 url => Some(url.to_string()),
594 };
595
596 let NodeHandle {
597 node,
598 node_exit_future,
599 } = builder
600 .node(TempoNode::new(&args.node_args, validator_key))
601 .apply(|mut builder: WithLaunchContext<_>| {
602 builder
604 .config_mut()
605 .network
606 .discovery
607 .enable_discv5_discovery = true;
608
609 if args.is_following_uncertified() {
611 let follow_url = args.follow.clone().and_then(|v| {
612 if v != "auto" {
613 Some(v)
614 } else {
615 builder
616 .config()
617 .chain
618 .default_follow_url()
619 .map(|s| s.to_string())
620 }
621 });
622
623 builder.config_mut().debug.rpc_consensus_url = follow_url;
624 }
625
626
627 let has_consensus_engine =
628 args.has_consensus_engine(builder.config().dev.dev);
629
630 builder.extend_rpc_modules(move |ctx| {
631 if faucet_args.enabled {
632 let faucet_ext = TempoFaucetExt::new(
633 faucet_args.addresses(),
634 faucet_args.amount(),
635 faucet_args.provider(),
636 );
637
638 ctx.modules.merge_configured(faucet_ext.into_rpc())
639 .wrap_err("failed to register faucet rpc module")?;
640 }
641
642 if has_consensus_engine {
643 let consensus_rpc = TempoConsensusRpc::new(cl_feed_state);
644 ctx.modules.merge_configured(consensus_rpc.into_rpc())
645 .wrap_err("failed to register consensus rpc module")?;
646 }
647
648 Ok(())
649 })
650 })
651 .launch_with_debug_capabilities()
652 .await
653 .wrap_err("failed launching execution node")?;
654
655 if let Some(endpoint) = bootnodes_endpoint {
658 let network = node.network.clone();
659 node.tasks().spawn_task(async move {
660 match fetch_bootnodes(&endpoint, chain_id).await {
661 Ok(nodes) if nodes.is_empty() => {}
662 Ok(nodes) => {
663 info!(
664 chain_id,
665 count = nodes.len(),
666 endpoint,
667 "fetched bootnodes from endpoint"
668 );
669 for node in &nodes {
670 if let Some(discv4) = network.discv4() {
671 discv4.add_node(*node);
672 }
673 network.add_peer_kind(
674 node.id,
675 None,
676 node.tcp_addr(),
677 Some(node.udp_addr()),
678 );
679 }
680 if let Some(discv5) = network.discv5() {
681 let enr_requests = nodes.iter().filter_map(|node| {
682 match reth_discv5::BootNode::from_unsigned(*node) {
683 Ok(boot_node) => Some(async move {
684 if let Err(err) = discv5
685 .with_discv5(|d| {
686 d.request_enr(boot_node.to_string())
687 })
688 .await
689 {
690 debug!(%err, %node, "failed adding boot node to discv5");
691 }
692 }),
693 Err(err) => {
694 warn!(%err, %node, "failed converting boot node for discv5");
695 None
696 }
697 }
698 });
699 futures::future::join_all(enr_requests).await;
700 }
701 }
702 Err(err) => {
703 warn!(%err, endpoint, "failed to fetch bootnodes from endpoint");
704 }
705 }
706 });
707 }
708
709 let _ = args_and_node_handle_tx.send((node, args));
710
711 tokio::select! {
713 _ = node_exit_future => {
714 tracing::info!("execution node exited");
715 }
716 _ = &mut consensus_dead_rx => {
717 tracing::info!("consensus node exited");
718 }
719 _ = tokio::signal::ctrl_c() => {
720 tracing::info!("received shutdown signal");
721 }
722 }
723
724 #[cfg(feature = "pyroscope")]
725 if let Some(agent) = pyroscope_agent {
726 agent.shutdown();
727 }
728
729 Ok(())
730 })
731 .wrap_err("execution node failed")?;
732
733 shutdown_token.cancel();
734
735 match consensus_handle.join() {
736 Ok(Ok(())) => {}
737 Ok(Err(err)) => eprintln!("consensus task exited with error:\n{err:?}"),
738 Err(unwind) => std::panic::resume_unwind(unwind),
739 }
740 Ok(())
741}