1#![cfg_attr(not(test), warn(unused_crate_dependencies))]
16#![cfg_attr(docsrs, feature(doc_cfg))]
17
18#[cfg(feature = "tracy")]
21use tracy_client as _;
22
23#[cfg(feature = "otlp")]
26use opentelemetry_otlp as _;
27
28pub mod cli;
29mod defaults;
30mod follow;
31pub mod init_state;
32mod overrides;
33pub mod p2p_proxy;
34pub mod regenesis;
35mod snapshot_download;
36mod snapshot_manifest;
37pub mod tempo_cmd;
38mod utils;
39
40pub use crate::{
41 cli::{TempoArgs, TempoCli, TempoRpcModuleValidator},
42 overrides::{TempoNodeMapper, TempoOverrides},
43};
44pub use reth_cli_util as cli_util;
45pub use tempo_node;
46pub use tempo_node as node;
47
48use crate::utils::{
49 block_on_consensus_public_key, fetch_bootnodes, install_crypto_provider,
50 print_extensions_footer,
51};
52use clap::{CommandFactory, FromArgMatches};
53use commonware_runtime::{Metrics, Runner};
54use eyre::{OptionExt, WrapErr as _};
55use futures::{
56 FutureExt as _,
57 future::{Either, FusedFuture as _},
58};
59use reth_cli_runner::CliRunner;
60use reth_ethereum::{chainspec::EthChainSpec as _, cli::Commands, evm::revm::primitives::B256};
61use reth_network_api::Peers;
62use reth_node_builder::{NodeHandle, WithLaunchContext};
63use std::{sync::Arc, thread};
64use tempo_chainspec::spec::TempoChainSpec;
65use tempo_consensus::{feed as consensus_feed, run_consensus_stack, run_follow_stack};
66use tempo_evm::{TempoEvmConfig, consensus::TempoConsensus};
67use tempo_faucet::faucet::{TempoFaucetExt, TempoFaucetExtApiServer};
68pub use tempo_node::{
69 AccountInfoReader, InvalidPoolTransactionError, PoolTransaction, PoolTransactionError,
70 StatefulValidationFn, StatelessValidationFn, TempoNode, TempoNodeArgs,
71 TempoPayloadBuilderBuilder, TempoPoolBuilder, TempoPoolTransactionError,
72 TempoPooledTransaction, TransactionOrigin,
73};
74use tempo_node::{
75 TempoFullNode,
76 rpc::consensus::{TempoConsensusApiServer, TempoConsensusRpc},
77 telemetry::{PrometheusMetricsConfig, install_prometheus_metrics},
78};
79use tokio::sync::oneshot;
80use tracing::{debug, info, info_span, warn, warn_span};
81
82fn apply_tempo_cli_overrides(cli: &mut TempoCli) {
83 if let Commands::Node(node_cmd) = &mut cli.command
84 && node_cmd
85 .ext
86 .node_args
87 .engine_disable_execution_cache_sharing_with_builder
88 {
89 node_cmd.engine.share_execution_cache_with_payload_builder = false;
90 }
91}
92
93pub fn tempo_main() -> eyre::Result<()> {
95 tempo_main_with(TempoOverrides::default())
96}
97
98pub fn tempo_main_with(mut overrides: TempoOverrides) -> eyre::Result<()> {
109 install_crypto_provider();
110
111 reth_cli_util::sigsegv_handler::install();
112
113 tempo_eyre::install()
121 .expect("must install the eyre error hook before constructing any eyre reports");
122
123 if std::env::var_os("RUST_BACKTRACE").is_none() {
125 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
126 }
127
128 tempo_node::init_version_metadata();
129 defaults::init_defaults();
130
131 let matches = match TempoCli::command()
134 .about("Tempo")
135 .mut_subcommand("snapshot-manifest", |_| snapshot_manifest::Args::command())
136 .mut_subcommand("download", |_| snapshot_download::Args::command())
137 .try_get_matches_from(std::env::args_os())
138 {
139 Ok(matches) => matches,
140 Err(err) => {
141 if err.kind() == clap::error::ErrorKind::InvalidSubcommand {
142 let code = match tempo_ext::run(std::env::args_os()) {
144 Ok(code) => code,
145 Err(e) => {
146 eprintln!("{e}");
147 1
148 }
149 };
150 std::process::exit(code);
151 }
152
153 if matches!(
154 err.kind(),
155 clap::error::ErrorKind::DisplayHelp
156 | clap::error::ErrorKind::DisplayHelpOnMissingArgumentOrSubcommand
157 ) {
158 let _ = err.print();
159 print_extensions_footer();
160 std::process::exit(0);
161 }
162
163 err.exit();
164 }
165 };
166
167 match matches.subcommand() {
170 Some(("snapshot-manifest", sub)) => return snapshot_manifest::run(sub),
171 Some(("download", sub)) => {
172 let runner =
173 CliRunner::try_default_runtime().wrap_err("failed to build download runtime")?;
174 let mut cli = match TempoCli::from_arg_matches(&matches) {
175 Ok(cli) => cli,
176 Err(err) => err.exit(),
177 };
178
179 if let Some(chain_spec) = cli.command.chain_spec() {
180 cli.logs.log_file_directory = cli
181 .logs
182 .log_file_directory
183 .join(chain_spec.chain().to_string());
184 }
185
186 let mut tracing_app = cli.configure();
187 tracing_app
188 .init_tracing(&runner)
189 .wrap_err("failed to initialize tracing")?;
190
191 return snapshot_download::run_with_runner(sub, runner);
192 }
193 _ => {}
194 }
195
196 let mut cli = match TempoCli::from_arg_matches(&matches) {
197 Ok(cli) => cli,
198 Err(err) => err.exit(),
199 };
200
201 apply_tempo_cli_overrides(&mut cli);
202
203 if let Commands::Node(node_cmd) = &cli.command
204 && node_cmd.engine.share_sparse_trie_with_payload_builder
205 && node_cmd.builder.max_payload_tasks != 1
206 {
207 eyre::bail!(
208 "--engine.share-sparse-trie-with-payload-builder requires --builder.max-tasks to be 1 (got {})",
209 node_cmd.builder.max_payload_tasks
210 );
211 }
212
213 let mut telemetry_config = None;
215 if let Commands::Node(node_cmd) = &cli.command
216 && let Some(config) = node_cmd
217 .ext
218 .telemetry
219 .try_to_config()
220 .wrap_err("failed to parse telemetry config")?
221 {
222 let consensus_pubkey = block_on_consensus_public_key(&node_cmd.ext.consensus)
223 .wrap_err("failed parsing consensus key")?
224 .map(|k| k.to_string());
225
226 let peer_id = format!(
227 "{:x}",
228 node_cmd.peer_id().wrap_err("failed to derive peer id")?
229 );
230
231 let mut extra_attrs = vec![format!("peer_id={peer_id}")];
235 if let Some(pubkey) = &consensus_pubkey {
236 extra_attrs.push(format!("consensus_pubkey={pubkey}"));
237 }
238
239 let current = std::env::var("OTEL_RESOURCE_ATTRIBUTES").unwrap_or_default();
240 let new_attrs = if current.is_empty() {
241 extra_attrs.join(",")
242 } else {
243 format!("{current},{}", extra_attrs.join(","))
244 };
245
246 unsafe {
248 std::env::set_var("OTEL_RESOURCE_ATTRIBUTES", &new_attrs);
249 }
250
251 cli.traces.logs_otlp = Some(config.logs_otlp_url.clone());
253 cli.traces.logs_otlp_filter = config
254 .logs_otlp_filter
255 .parse()
256 .wrap_err("invalid default logs filter")?;
257
258 telemetry_config = Some(config);
259 }
260
261 let is_node = matches!(cli.command, Commands::Node(_));
262
263 let (args_and_node_handle_tx, args_and_node_handle_rx) =
264 oneshot::channel::<(TempoFullNode, TempoArgs)>();
265 let (consensus_dead_tx, mut consensus_dead_rx) = oneshot::channel();
266
267 let shutdown_token = tokio_util::sync::CancellationToken::new();
268 let cl_feed_state = consensus_feed::FeedStateHandle::new();
269
270 let shutdown_token_clone = shutdown_token.clone();
271 let cl_feed_state_clone = cl_feed_state.clone();
272
273 let consensus_handle = thread::spawn(move || {
274 if !is_node {
276 return Ok(());
277 }
278
279 let (node, args) = args_and_node_handle_rx.blocking_recv().wrap_err(
280 "channel closed before consensus-relevant command line args \
281 and a handle to the execution node could be received",
282 )?;
283
284 if !args.has_consensus_engine(node.config.dev.dev) {
285 return futures::executor::block_on(async move {
286 shutdown_token_clone.cancelled().await;
287 Ok(())
288 });
289 }
290
291 let consensus_storage = args.consensus.storage_dir.clone().unwrap_or_else(|| {
292 node.config
293 .datadir
294 .clone()
295 .resolve_datadir(node.chain_spec().chain())
296 .data_dir()
297 .join("consensus")
298 });
299
300 info_span!("prepare_consensus").in_scope(|| {
301 info!(
302 path = %consensus_storage.display(),
303 "determined directory for consensus data",
304 )
305 });
306
307 let runtime_config = commonware_runtime::tokio::Config::default()
308 .with_tcp_nodelay(Some(true))
309 .with_worker_threads(args.consensus.worker_threads)
310 .with_storage_directory(consensus_storage)
311 .with_catch_panics(true);
312
313 let runner = commonware_runtime::tokio::Runner::new(runtime_config);
314 let ret = runner.start(async move |ctx| {
315 let mut metrics_server = tempo_consensus::metrics::install(
316 ctx.with_label("metrics"),
317 args.consensus.metrics_address,
318 )
319 .fuse();
320
321 if let Some(config) = telemetry_config {
323 let consensus_pubkey = args
324 .consensus
325 .public_key()
326 .await
327 .wrap_err("failed parsing consensus key")?
328 .map(|k| k.to_string());
329
330 let prometheus_config = PrometheusMetricsConfig {
331 endpoint: config.metrics_prometheus_url,
332 interval: config.metrics_prometheus_interval,
333 auth_header: config.metrics_auth_header,
334 consensus_pubkey,
335 peer_id: format!("{:x}", node.network.peer_id()),
336 };
337
338 install_prometheus_metrics(ctx.with_label("telemetry_metrics"), prometheus_config)
339 .wrap_err("failed to start Prometheus metrics exporter")?;
340 }
341
342 let consensus_stack = if let Some(follow) = args.follow {
343 let follow_url = follow
344 .resolve_url(&node.chain_spec())
345 .ok_or_eyre("No default follow URL for this chain")?;
346
347 Either::Left(run_follow_stack(
348 ctx.with_label("follow"),
349 args.consensus,
350 follow_url,
351 Arc::new(node),
352 cl_feed_state_clone,
353 ))
354 } else {
355 Either::Right(run_consensus_stack(
356 ctx.with_label("consensus"),
357 args.consensus,
358 Arc::new(node),
359 cl_feed_state_clone,
360 ))
361 };
362
363 tokio::pin!(consensus_stack);
364 loop {
365 tokio::select!(
366 biased;
367
368 () = shutdown_token_clone.cancelled() => {
369 break Ok(());
370 }
371
372 ret = &mut consensus_stack => {
373 break ret.and_then(|()| Err(eyre::eyre!(
374 "consensus stack exited unexpectedly"))
375 )
376 .wrap_err("consensus stack failed");
377 }
378
379 ret = &mut metrics_server, if !metrics_server.is_terminated() => {
380 let reason = match ret.wrap_err("task_panicked") {
381 Ok(Ok(())) => "unexpected regular exit".to_string(),
382 Ok(Err(err)) | Err(err) => format!("{err}"),
383 };
384
385 warn_span!("consensus_metrics").in_scope(|| {
386 warn!(reason, "the metrics server exited");
387 })
388 }
389 )
390 }
391 });
392
393 let _ = consensus_dead_tx.send(());
394 ret
395 });
396
397 let components =
398 |spec: Arc<TempoChainSpec>| (TempoEvmConfig::new(spec.clone()), TempoConsensus::new(spec));
399
400 cli.run_with_components::<TempoNode>(components, async move |builder, args| {
401 let faucet_args = args.faucet_args.clone();
402 let validator_key = args
403 .consensus
404 .public_key()
405 .await?
406 .map(|key| B256::from_slice(key.as_ref()));
407
408 #[cfg(feature = "pyroscope")]
410 let pyroscope_agent = if args.pyroscope_args.pyroscope_enabled {
411 let agent = pyroscope::PyroscopeAgent::builder(
412 &args.pyroscope_args.server_url,
413 &args.pyroscope_args.application_name,
414 )
415 .backend(pyroscope_pprofrs::pprof_backend(
416 pyroscope_pprofrs::PprofConfig::new()
417 .sample_rate(args.pyroscope_args.sample_rate)
418 .report_thread_id()
419 .report_thread_name(),
420 ))
421 .build()
422 .wrap_err("failed to build Pyroscope agent")?;
423
424 let agent = agent.start().wrap_err("failed to start Pyroscope agent")?;
425 info!(
426 server_url = %args.pyroscope_args.server_url,
427 application_name = %args.pyroscope_args.application_name,
428 "Pyroscope profiling enabled"
429 );
430
431 Some(agent)
432 } else {
433 None
434 };
435 let chain_id = builder.config().chain.chain().id();
436
437 let bootnodes_endpoint = match args.bootnodes_endpoint.trim() {
441 value if value.eq_ignore_ascii_case("none") => None,
442 url => Some(url.to_string()),
443 };
444
445 let NodeHandle {
446 node,
447 node_exit_future,
448 } = builder
449 .node(overrides.apply_tempo_node(TempoNode::new(
450 &args.node_args,
451 validator_key,
452 )))
453 .apply(|mut builder: WithLaunchContext<_>| {
454 builder
456 .config_mut()
457 .network
458 .discovery
459 .enable_discv5_discovery = true;
460
461 if args.is_following_uncertified() {
463 let follow_url = args
464 .follow
465 .as_ref()
466 .and_then(|follow| follow.resolve_url(&builder.config().chain));
467 builder.config_mut().debug.rpc_consensus_url = follow_url;
468 }
469
470 let has_consensus_engine =
471 args.has_consensus_engine(builder.config().dev.dev);
472
473 builder.extend_rpc_modules(move |ctx| {
474 if faucet_args.enabled {
475 let faucet_ext = TempoFaucetExt::new(
476 faucet_args.addresses(),
477 faucet_args.amount(),
478 faucet_args.provider(),
479 );
480
481 ctx.modules.merge_configured(faucet_ext.into_rpc())
482 .wrap_err("failed to register faucet rpc module")?;
483 }
484
485 if has_consensus_engine {
486 let consensus_rpc = TempoConsensusRpc::new(cl_feed_state);
487 ctx.modules.merge_configured(consensus_rpc.into_rpc())
488 .wrap_err("failed to register consensus rpc module")?;
489 }
490
491 Ok(())
492 })
493 })
494 .launch_with_debug_capabilities()
495 .await
496 .wrap_err("failed launching execution node")?;
497
498 if let Some(endpoint) = bootnodes_endpoint {
501 let network = node.network.clone();
502 node.tasks().spawn_task(async move {
503 match fetch_bootnodes(&endpoint, chain_id).await {
504 Ok(nodes) if nodes.is_empty() => {}
505 Ok(nodes) => {
506 info!(
507 chain_id,
508 count = nodes.len(),
509 endpoint,
510 "fetched bootnodes from endpoint"
511 );
512 for node in &nodes {
513 if let Some(discv4) = network.discv4() {
514 discv4.add_node(*node);
515 }
516 network.add_peer_kind(
517 node.id,
518 None,
519 node.tcp_addr(),
520 Some(node.udp_addr()),
521 );
522 }
523 if let Some(discv5) = network.discv5() {
524 let enr_requests = nodes.iter().filter_map(|node| {
525 match reth_discv5::BootNode::from_unsigned(*node) {
526 Ok(boot_node) => Some(async move {
527 if let Err(err) = discv5
528 .with_discv5(|d| {
529 d.request_enr(boot_node.to_string())
530 })
531 .await
532 {
533 debug!(%err, %node, "failed adding boot node to discv5");
534 }
535 }),
536 Err(err) => {
537 warn!(%err, %node, "failed converting boot node for discv5");
538 None
539 }
540 }
541 });
542 futures::future::join_all(enr_requests).await;
543 }
544 }
545 Err(err) => {
546 warn!(%err, endpoint, "failed to fetch bootnodes from endpoint");
547 }
548 }
549 });
550 }
551
552 let _ = args_and_node_handle_tx.send((node, args));
553
554 tokio::select! {
556 _ = node_exit_future => {
557 tracing::info!("execution node exited");
558 }
559 _ = &mut consensus_dead_rx => {
560 tracing::info!("consensus node exited");
561 }
562 _ = tokio::signal::ctrl_c() => {
563 tracing::info!("received shutdown signal");
564 }
565 }
566
567 #[cfg(feature = "pyroscope")]
568 if let Some(agent) = pyroscope_agent {
569 agent.shutdown();
570 }
571
572 Ok(())
573 })
574 .wrap_err("execution node failed")?;
575
576 shutdown_token.cancel();
577
578 match consensus_handle.join() {
579 Ok(Ok(())) => {}
580 Ok(Err(err)) => eprintln!("consensus task exited with error:\n{err:?}"),
581 Err(unwind) => std::panic::resume_unwind(unwind),
582 }
583 Ok(())
584}
585
586#[cfg(test)]
587mod tests {
588 use std::{sync::Once, time::Duration};
589
590 use clap::{CommandFactory, FromArgMatches, Parser};
591
592 use super::{
593 TempoCli, apply_tempo_cli_overrides, defaults, follow::FollowMode, snapshot_download,
594 };
595 use reth_ethereum::cli::Commands;
596
597 fn init_defaults_once() {
598 static INIT: Once = Once::new();
599 INIT.call_once(defaults::init_defaults);
600 }
601
602 fn parse_follow(args: &[&str]) -> Option<FollowMode> {
603 let cli = TempoCli::try_parse_from(args).unwrap();
604 let Commands::Node(node_cmd) = cli.command else {
605 panic!("expected node command");
606 };
607 node_cmd.ext.follow
608 }
609
610 #[test]
611 fn wrapped_download_matches_parse_for_tracing() {
612 init_defaults_once();
613
614 let matches = TempoCli::command()
615 .mut_subcommand("download", |_| snapshot_download::Args::command())
616 .try_get_matches_from([
617 "tempo",
618 "download",
619 "--manifest-url",
620 "https://snap/manifest.json",
621 "--datadir",
622 "/d",
623 "--skip-consensus=false",
624 "--log.stdout.filter",
625 "debug",
626 ])
627 .unwrap();
628
629 let cli = TempoCli::from_arg_matches(&matches).unwrap();
630
631 assert!(matches!(cli.command, Commands::Download(_)));
632 assert_eq!(cli.logs.log_stdout_filter, "debug");
633 }
634
635 #[test]
636 fn follow_arg_parses_to_expected_mode() {
637 init_defaults_once();
638
639 assert_eq!(parse_follow(&["tempo", "node", "--dev"]), None);
640 assert_eq!(
642 parse_follow(&["tempo", "node", "--dev", "--follow"]),
643 Some(FollowMode::Auto)
644 );
645 assert_eq!(
646 parse_follow(&["tempo", "node", "--dev", "--follow", "auto"]),
647 Some(FollowMode::Auto)
648 );
649 assert_eq!(
650 parse_follow(&["tempo", "node", "--dev", "--follow", "ws://upstream:8546"]),
651 Some(FollowMode::Url("ws://upstream:8546".to_string()))
652 );
653 }
654
655 #[test]
656 fn follow_certification_defaults() {
657 init_defaults_once();
658
659 let cli = TempoCli::try_parse_from(["tempo", "node", "--follow"]).unwrap();
660 let Commands::Node(node_cmd) = cli.command else {
661 panic!("expected node command");
662 };
663
664 assert!(!node_cmd.ext.is_following_uncertified());
665 assert!(node_cmd.ext.has_consensus_engine(false));
666 }
667
668 #[test]
669 fn follow_certification_disable() {
670 init_defaults_once();
671
672 let cli =
673 TempoCli::try_parse_from(["tempo", "node", "--follow", "--follow.nocertify"]).unwrap();
674
675 let Commands::Node(node_cmd) = cli.command else {
676 panic!("expected node command");
677 };
678
679 assert!(node_cmd.ext.is_following_uncertified());
680 assert!(!node_cmd.ext.has_consensus_engine(false));
681 }
682
683 #[test]
684 fn deprecated_follow_certification_flag_is_noop() {
685 init_defaults_once();
686
687 let cli = TempoCli::try_parse_from([
688 "tempo",
689 "node",
690 "--follow",
691 "--follow.experimental.certify",
692 ])
693 .unwrap();
694
695 let Commands::Node(node_cmd) = cli.command else {
696 panic!("expected node command");
697 };
698
699 assert!(!node_cmd.ext.is_following_uncertified());
700 assert!(node_cmd.ext.has_consensus_engine(false));
701 }
702
703 #[test]
704 fn consensus_block_budget_defaults_are_stable() {
705 init_defaults_once();
706
707 let cli = TempoCli::try_parse_from(["tempo", "node", "--dev"]).unwrap();
708 let Commands::Node(node_cmd) = cli.command else {
709 panic!("expected node command");
710 };
711 assert!(node_cmd.engine.share_sparse_trie_with_payload_builder);
712 assert!(
713 !node_cmd
714 .ext
715 .node_args
716 .engine_disable_execution_cache_sharing_with_builder
717 );
718 assert_eq!(node_cmd.builder.max_payload_tasks, 1);
719 assert!(!node_cmd.ext.node_args.builder_disable_prewarming);
720 assert!(node_cmd.ext.node_args.builder_enable_prewarming);
721 assert_eq!(
722 node_cmd.ext.consensus.target_block_time.into_duration(),
723 Duration::from_millis(550)
724 );
725 assert_eq!(
726 node_cmd.ext.consensus.wait_for_proposal.into_duration(),
727 Duration::from_millis(1200)
728 );
729 assert_eq!(
730 node_cmd.ext.consensus.network_budget.into_duration(),
731 Duration::from_millis(50)
732 );
733 assert_eq!(node_cmd.ext.node_args.builder_build_time_multiplier, 1.35);
734
735 let mut cli = TempoCli::try_parse_from([
736 "tempo",
737 "node",
738 "--dev",
739 "--engine.disable-execution-cache-sharing-with-builder",
740 ])
741 .unwrap();
742 apply_tempo_cli_overrides(&mut cli);
743 let Commands::Node(node_cmd) = cli.command else {
744 panic!("expected node command");
745 };
746 assert!(
747 node_cmd
748 .ext
749 .node_args
750 .engine_disable_execution_cache_sharing_with_builder
751 );
752 assert!(!node_cmd.engine.share_execution_cache_with_payload_builder);
753
754 let cli = TempoCli::try_parse_from([
755 "tempo",
756 "node",
757 "--dev",
758 "--engine.share-sparse-trie-with-payload-builder",
759 ])
760 .unwrap();
761 let Commands::Node(node_cmd) = cli.command else {
762 panic!("expected node command");
763 };
764 assert_eq!(
765 node_cmd.ext.consensus.target_block_time.into_duration(),
766 Duration::from_millis(550)
767 );
768 assert_eq!(
769 node_cmd.ext.consensus.wait_for_proposal.into_duration(),
770 Duration::from_millis(1200)
771 );
772 assert_eq!(
773 node_cmd.ext.consensus.network_budget.into_duration(),
774 Duration::from_millis(50)
775 );
776
777 let cli =
778 TempoCli::try_parse_from(["tempo", "node", "--dev", "--builder.disable-prewarming"])
779 .unwrap();
780 let Commands::Node(node_cmd) = cli.command else {
781 panic!("expected node command");
782 };
783 assert!(node_cmd.ext.node_args.builder_disable_prewarming);
784
785 let cli = TempoCli::try_parse_from([
786 "tempo",
787 "node",
788 "--dev",
789 "--builder.enable-prewarming",
790 "--builder.disable-prewarming",
791 ])
792 .unwrap();
793 let Commands::Node(node_cmd) = cli.command else {
794 panic!("expected node command");
795 };
796 assert!(node_cmd.ext.node_args.builder_enable_prewarming);
797 assert!(node_cmd.ext.node_args.builder_disable_prewarming);
798 assert!(
799 !node_cmd
800 .ext
801 .node_args
802 .payload_builder_builder()
803 .enable_prewarming
804 );
805 }
806}