tempo/
main.rs

1//! Main executable for the Reth-Commonware node.
2//!
3//! This binary launches a blockchain node that combines:
4//! - Reth's execution layer for transaction processing and state management
5//! - Commonware's consensus engine for block agreement
6//!
7//! The node operates by:
8//! 1. Starting the Reth node infrastructure (database, networking, RPC)
9//! 2. Creating the application state that bridges Reth and Commonware
10//! 3. Launching the Commonware consensus engine via a separate task and a separate tokio runtime.
11//! 4. Running both components until shutdown
12//!
13//! Configuration can be provided via command-line arguments or configuration files.
14
15#![cfg_attr(not(test), warn(unused_crate_dependencies))]
16#![cfg_attr(docsrs, feature(doc_cfg))]
17
18mod defaults;
19
20use clap::Parser;
21use commonware_runtime::{Metrics, Runner};
22use eyre::WrapErr as _;
23use futures::{FutureExt as _, future::FusedFuture as _};
24use reth_ethereum::{
25    chainspec::EthChainSpec as _,
26    cli::{Cli, Commands},
27    evm::revm::primitives::B256,
28};
29use reth_ethereum_cli as _;
30use reth_node_builder::{NodeHandle, WithLaunchContext};
31use std::{sync::Arc, thread};
32use tempo_chainspec::spec::{TempoChainSpec, TempoChainSpecParser};
33use tempo_commonware_node::run_consensus_stack;
34use tempo_consensus::TempoConsensus;
35use tempo_evm::{TempoEvmConfig, TempoEvmFactory};
36use tempo_faucet::{
37    args::FaucetArgs,
38    faucet::{TempoFaucetExt, TempoFaucetExtApiServer},
39};
40use tempo_node::{TempoFullNode, TempoNodeArgs, node::TempoNode};
41use tokio::sync::oneshot;
42use tracing::{info, info_span};
43
44// TODO: migrate this to tempo_node eventually.
45#[derive(Debug, Clone, PartialEq, Eq, clap::Args)]
46struct TempoArgs {
47    /// Follow this specific RPC node for block hashes
48    #[arg(
49        long,
50        value_name = "URL",
51        default_missing_value = "wss://rpc.testnet.tempo.xyz",
52        num_args(0..=1)
53    )]
54    pub follow: Option<String>,
55
56    #[command(flatten)]
57    pub consensus: tempo_commonware_node::Args,
58
59    #[command(flatten)]
60    pub faucet_args: FaucetArgs,
61
62    #[command(flatten)]
63    pub node_args: TempoNodeArgs,
64}
65
66fn main() -> eyre::Result<()> {
67    reth_cli_util::sigsegv_handler::install();
68
69    // XXX: ensures that the error source chain is preserved in
70    // tracing-instrument generated error events. That is, this hook ensures
71    // that functions instrumented like `#[instrument(err)]` will emit an event
72    // that contains the entire error source chain.
73    //
74    // TODO: Can remove this if https://github.com/tokio-rs/tracing/issues/2648
75    // ever gets addressed.
76    tempo_eyre::install()
77        .expect("must install the eyre error hook before constructing any eyre reports");
78
79    // Enable backtraces unless a RUST_BACKTRACE value has already been explicitly provided.
80    if std::env::var_os("RUST_BACKTRACE").is_none() {
81        unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
82    }
83
84    tempo_node::init_version_metadata();
85    defaults::init_defaults();
86
87    let cli = Cli::<TempoChainSpecParser, TempoArgs>::parse();
88    let is_node = matches!(cli.command, Commands::Node(_));
89
90    let (args_and_node_handle_tx, args_and_node_handle_rx) =
91        oneshot::channel::<(TempoFullNode, TempoArgs)>();
92    let (consensus_dead_tx, mut consensus_dead_rx) = oneshot::channel();
93
94    let shutdown_token = tokio_util::sync::CancellationToken::new();
95
96    let shutdown_token_clone = shutdown_token.clone();
97    let consensus_handle = thread::spawn(move || {
98        // Exit early if we are not executing `tempo node` command.
99        if !is_node {
100            return Ok(());
101        }
102
103        let (node, args) = args_and_node_handle_rx.blocking_recv().wrap_err(
104            "channel closed before consensus-relevant command line args \
105                and a handle to the execution node could be received",
106        )?;
107
108        let ret = if node.config.dev.dev || args.follow.is_some() {
109            futures::executor::block_on(async move {
110                shutdown_token_clone.cancelled().await;
111                Ok(())
112            })
113        } else {
114            let consensus_storage = &node
115                .config
116                .datadir
117                .clone()
118                .resolve_datadir(node.chain_spec().chain())
119                .data_dir()
120                .join("consensus");
121            let runtime_config = commonware_runtime::tokio::Config::default()
122                .with_tcp_nodelay(Some(true))
123                .with_worker_threads(args.consensus.worker_threads)
124                .with_storage_directory(consensus_storage)
125                .with_catch_panics(true);
126
127            info_span!("prepare_consensus").in_scope(|| {
128                info!(
129                    path = %consensus_storage.display(),
130                    "determined directory for consensus data",
131                )
132            });
133
134            let runner = commonware_runtime::tokio::Runner::new(runtime_config);
135
136            runner.start(async move |ctx| {
137                // Ensure all consensus metrics are prefixed. Shadow `ctx` to
138                // not forget.
139                let ctx = ctx.with_label("consensus");
140
141                let mut metrics_server = tempo_commonware_node::metrics::install(
142                    ctx.with_label("metrics"),
143                    args.consensus.metrics_address,
144                )
145                .fuse();
146                let consensus_stack = run_consensus_stack(&ctx, args.consensus, node);
147                tokio::pin!(consensus_stack);
148                loop {
149                    tokio::select!(
150                        biased;
151
152                        () = shutdown_token_clone.cancelled() => {
153                            break Ok(());
154                        }
155
156                        ret = &mut consensus_stack => {
157                            break ret.and_then(|()| Err(eyre::eyre!(
158                                "consensus stack exited unexpectedly"))
159                            )
160                            .wrap_err("consensus stack failed");
161                        }
162
163                        ret = &mut metrics_server, if !metrics_server.is_terminated() => {
164                            let reason = match ret.wrap_err("task_panicked") {
165                                Ok(Ok(())) => "unexpected regular exit".to_string(),
166                                Ok(Err(err)) | Err(err) => format!("{err}"),
167                            };
168                            tracing::warn!(reason, "the metrics server exited");
169                        }
170                    )
171                }
172            })
173        };
174        let _ = consensus_dead_tx.send(());
175        ret
176    });
177
178    let components = |spec: Arc<TempoChainSpec>| {
179        (
180            TempoEvmConfig::new(spec.clone(), TempoEvmFactory::default()),
181            TempoConsensus::new(spec),
182        )
183    };
184
185    cli.run_with_components::<TempoNode>(components, async move |builder, args| {
186        let faucet_args = args.faucet_args.clone();
187        let validator_key = args
188            .consensus
189            .public_key()?
190            .map(|key| B256::from_slice(key.as_ref()));
191
192        let NodeHandle {
193            node,
194            node_exit_future,
195        } = builder
196            .node(TempoNode::new(&args.node_args, validator_key))
197            .apply(|mut builder: WithLaunchContext<_>| {
198                if let Some(follow_url) = &args.follow {
199                    builder.config_mut().debug.rpc_consensus_url = Some(follow_url.clone());
200                }
201
202                builder
203            })
204            .extend_rpc_modules(move |ctx| {
205                if faucet_args.enabled {
206                    let ext = TempoFaucetExt::new(
207                        faucet_args.addresses(),
208                        faucet_args.amount(),
209                        faucet_args.provider(),
210                    );
211
212                    ctx.modules.merge_configured(ext.into_rpc())?;
213                }
214
215                Ok(())
216            })
217            .launch_with_debug_capabilities()
218            .await
219            .wrap_err("failed launching execution node")?;
220
221        let _ = args_and_node_handle_tx.send((node, args));
222
223        // TODO: emit these inside a span
224        tokio::select! {
225            _ = node_exit_future => {
226                tracing::info!("execution node exited");
227            }
228            _ = &mut consensus_dead_rx => {
229                tracing::info!("consensus node exited");
230            }
231            _ = tokio::signal::ctrl_c() => {
232                tracing::info!("received shutdown signal");
233            }
234        }
235        Ok(())
236    })
237    .wrap_err("execution node failed")?;
238
239    shutdown_token.cancel();
240
241    match consensus_handle.join() {
242        Ok(Ok(())) => {}
243        Ok(Err(err)) => eprintln!("consensus task exited with error:\n{err:?}"),
244        Err(unwind) => std::panic::resume_unwind(unwind),
245    }
246    Ok(())
247}