tempo_sidecar/cmd/tx_latency.rs

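//! Sidecar command that measures how long transactions take to land on chain: it
//! subscribes to the node's pending-transaction and full-block websocket streams,
//! timestamps each pending hash when first seen, and records the delay until the
//! hash shows up in a mined block as a Prometheus histogram served on `/metrics`.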
use crate::monitor::prometheus_metrics;
use alloy::{
    primitives::B256,
    providers::{Provider, ProviderBuilder, WsConnect},
};
use clap::Parser;
use eyre::{Context, Result};
use futures::StreamExt;
use metrics::{describe_gauge, describe_histogram, gauge, histogram};
use metrics_exporter_prometheus::PrometheusBuilder;
use poem::{EndpointExt, Route, Server, get, listener::TcpListener};
use reqwest::Url;
use std::{
    collections::{HashMap, HashSet},
    time::{Duration, SystemTime, UNIX_EPOCH},
};
use tempo_alloy::{TempoNetwork, primitives::TempoHeader};
use tokio::signal;
use tracing::{debug, error, warn};

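// Illustrative invocation (the binary and subcommand names are hypothetical; the
// long flags are the ones clap derives from the fields below):
//
//   tempo-sidecar tx-latency \
//       --rpc-url ws://localhost:8546 \
//       --chain-id tempo-devnet \
//       --port 9100 \
//       --max-pending-age-secs 600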
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct TxLatencyArgs {
    /// RPC endpoint for the node.
    #[arg(short, long, required = true)]
    rpc_url: Url,

    /// Chain identifier for labeling metrics.
    #[arg(short, long, required = true)]
    chain_id: String,

    /// Port to expose Prometheus metrics on.
    #[arg(short, long, required = true)]
    port: u16,

    /// Maximum age (seconds) to track pending transactions before expiring them.
    #[arg(long, default_value_t = 600)]
    max_pending_age_secs: u64,
}

struct TransactionLatencyMonitor {
    rpc_url: Url,
    max_pending_age: Duration,
    /// Pending transaction hashes mapped to the Unix-millisecond timestamp at which
    /// they were first observed on the pending event stream.
    pending: HashMap<B256, u128>,
}

impl TransactionLatencyMonitor {
    fn new(rpc_url: Url, max_pending_age: Duration) -> Self {
        Self {
            rpc_url,
            max_pending_age,
            pending: HashMap::new(),
        }
    }

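    /// Connects to the node over websocket, subscribes to pending transactions and
    /// full blocks, and drives the monitor loop: pending hashes are timestamped on
    /// first sight, and each mined block settles or expires the tracked entries. If
    /// the pending stream ends, the provider is rebuilt and the subscription renewed.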
    async fn watch_transactions(&mut self) -> Result<()> {
        let rpc_url = self.rpc_url.to_string();
        let mut provider = ProviderBuilder::new_with_network::<TempoNetwork>()
            .connect_ws(WsConnect::new(rpc_url.clone()))
            .await
            .context("failed to connect websocket provider")?;
        let mut pending_txs_sub = provider
            .subscribe_pending_transactions()
            .await
            .context("failed to subscribe to pending transactions")?;

        let mut block_subscription = provider
            .subscribe_full_blocks()
            .channel_size(1000)
            .into_stream()
            .await
            .context("failed to create block stream")?;

        let mut stream = pending_txs_sub.into_stream();

        loop {
            tokio::select! {
                maybe_hash = stream.next() => {
                    match maybe_hash {
                        // Record the first time we see each pending hash.
                        Some(hash) => { self.pending.entry(hash).or_insert_with(Self::now_millis); }
                        None => {
                            // The node closed the pending stream; rebuild the
                            // websocket provider and resubscribe.
                            warn!("pending transaction stream ended; reconnecting");
                            provider = ProviderBuilder::new_with_network::<TempoNetwork>()
                                .connect_ws(WsConnect::new(rpc_url.clone()))
                                .await
                                .context("failed to reconnect websocket provider")?;
                            pending_txs_sub = provider
                                .subscribe_pending_transactions()
                                .await
                                .context("failed to resubscribe to pending transactions")?;
                            stream = pending_txs_sub.into_stream();
                            continue;
                        }
                    }
                },
                maybe_block = block_subscription.next() => {
                    // A mined block settles any tracked hashes it includes.
                    if let Some(Ok(block)) = maybe_block {
                        self.on_mined_block(
                            block.header.inner.into_consensus(),
                            block.transactions.hashes().collect(),
                        );
                    }
                }
            }
        }
    }

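    /// Records a landing-latency sample for every tracked transaction included in
    /// this block, then expires entries older than `max_pending_age` so hashes that
    /// never land cannot grow the map without bound.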
    fn on_mined_block(&mut self, header: TempoHeader, mined_txs: HashSet<B256>) {
        gauge!("tempo_tx_latency_pending_observed").set(self.pending.len() as f64);
        if self.pending.is_empty() {
            return;
        }
        self.pending.retain(|hash, seen_at| {
            if mined_txs.contains(hash) {
                let latency_secs =
                    Self::latency_seconds(*seen_at, header.timestamp_millis() as u128);
                histogram!("tempo_tx_landing_latency_seconds").record(latency_secs);
                false
            } else {
                true
            }
        });

        let now = Self::now_millis();
        let max_age_millis = self.max_pending_age.as_millis();
        let before_cleanup = self.pending.len();
        self.pending
            .retain(|_, seen_at| now.saturating_sub(*seen_at) <= max_age_millis);

        if self.pending.len() < before_cleanup {
            debug!(
                removed = before_cleanup - self.pending.len(),
                "dropped stale pending transactions"
            );
        }
    }

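    /// Current wall-clock time in milliseconds since the Unix epoch, falling back to
    /// zero if the system clock is set before the epoch.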
    fn now_millis() -> u128 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|duration| duration.as_millis())
            .unwrap_or_default()
    }

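    /// Converts the observation and landing timestamps (both in milliseconds) into a
    /// latency in seconds, clamping to zero if the block timestamp precedes the
    /// observation.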
    fn latency_seconds(seen_at_millis: u128, landing_millis: u128) -> f64 {
        landing_millis.saturating_sub(seen_at_millis) as f64 / 1000.0
    }
}

impl TxLatencyArgs {
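    /// Installs the Prometheus recorder, serves `/metrics` over HTTP, runs the
    /// latency monitor as a background task, and shuts both down on SIGINT/SIGTERM.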
    pub async fn run(self) -> Result<()> {
        tracing_subscriber::FmtSubscriber::builder()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .init();

        let builder = PrometheusBuilder::new().add_global_label("chain_id", self.chain_id.clone());
        let metrics_handle = builder
            .install_recorder()
            .context("failed to install recorder")?;

        describe_histogram!(
            "tempo_tx_landing_latency_seconds",
            "Latency between seeing a transaction in the pool and it landing in a block"
        );
        describe_gauge!(
            "tempo_tx_latency_pending_observed",
            "Number of observed pending transactions awaiting inclusion"
        );

        let app = Route::new().at(
            "/metrics",
            get(prometheus_metrics).data(metrics_handle.clone()),
        );

        let addr = format!("0.0.0.0:{}", self.port);

        let mut monitor = TransactionLatencyMonitor::new(
            self.rpc_url,
            Duration::from_secs(self.max_pending_age_secs),
        );

        let monitor_handle = tokio::spawn(async move {
            if let Err(err) = monitor.watch_transactions().await {
                error!(err = %err, "tx latency monitor exited with error");
            }
        });

        let server = Server::new(TcpListener::bind(addr));
        let server_handle = tokio::spawn(async move { server.run(app).await });

        let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
            .context("failed to install SIGTERM handler")?;
        let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
            .context("failed to install SIGINT handler")?;

        tokio::select! {
            _ = sigterm.recv() => tracing::info!("Received SIGTERM, shutting down gracefully"),
            _ = sigint.recv() => tracing::info!("Received SIGINT, shutting down gracefully"),
        }

        monitor_handle.abort();
        server_handle.abort();

        tracing::info!("Shutdown complete");
        Ok(())
    }
}
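
// A minimal sketch of unit coverage for the pure latency helper; the timestamps used
// below are illustrative values, not taken from the original code.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn latency_is_difference_in_seconds() {
        // A transaction first seen at t = 1_000 ms that lands at t = 3_500 ms took 2.5 s.
        assert_eq!(TransactionLatencyMonitor::latency_seconds(1_000, 3_500), 2.5);
    }

    #[test]
    fn latency_clamps_to_zero_when_block_timestamp_precedes_observation() {
        // Clock skew between the sidecar and the block producer must not underflow
        // into a huge (wrapped) latency; saturating_sub keeps the sample at zero.
        assert_eq!(TransactionLatencyMonitor::latency_seconds(5_000, 1_000), 0.0);
    }
}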