//! tempo_sidecar/cmd/tx_latency.rs — sidecar command that measures transaction landing latency.

1use crate::monitor::prometheus_metrics;
2use alloy::{
3    primitives::map::{B256Map, B256Set},
4    providers::{Provider, ProviderBuilder, WsConnect},
5};
6use clap::Parser;
7use eyre::{Context, Result};
8use futures::StreamExt;
9use metrics::{describe_gauge, describe_histogram, gauge, histogram};
10use metrics_exporter_prometheus::PrometheusBuilder;
11use poem::{EndpointExt, Route, Server, get, listener::TcpListener};
12use reqwest::Url;
13use std::time::{Duration, SystemTime, UNIX_EPOCH};
14use tempo_alloy::{TempoNetwork, primitives::TempoHeader};
15use tokio::signal;
16use tracing::{debug, error, warn};
17
/// Command-line arguments for the transaction landing-latency monitor sidecar.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct TxLatencyArgs {
    /// RPC endpoint for the node.
    #[arg(short, long, required = true)]
    rpc_url: Url,

    /// Chain identifier for labeling metrics.
    // Attached as a global Prometheus label (`chain_id`) on every metric.
    #[arg(short, long, required = true)]
    chain_id: String,

    /// Port to expose Prometheus metrics on.
    // The HTTP server binds 0.0.0.0:<port> and serves /metrics.
    #[arg(short, long, required = true)]
    port: u16,

    /// Maximum age (seconds) to track pending transactions before expiring them.
    #[arg(long, default_value_t = 600)]
    max_pending_age_secs: u64,
}
37
/// Tracks when transactions are first observed in the mempool so their
/// block-inclusion latency can be recorded once they appear in a mined block.
struct TransactionLatencyMonitor {
    // Websocket RPC endpoint the provider (re)connects to.
    rpc_url: Url,
    // Entries older than this are expired without recording a latency.
    max_pending_age: Duration,
    /// Keeps track of the transactions that were emitted over the pending event stream.
    // Maps transaction hash -> unix-epoch milliseconds at which it was first seen.
    pending: B256Map<u128>,
}
44
45impl TransactionLatencyMonitor {
46    fn new(rpc_url: Url, max_pending_age: Duration) -> Self {
47        Self {
48            rpc_url,
49            max_pending_age,
50            pending: Default::default(),
51        }
52    }
53
54    async fn watch_transactions(&mut self) -> Result<()> {
55        let rpc_url = self.rpc_url.to_string();
56        let mut provider = ProviderBuilder::new_with_network::<TempoNetwork>()
57            .connect_ws(WsConnect::new(rpc_url.clone()))
58            .await
59            .context("failed to connect websocket provider")?;
60        let mut pending_txs_sub = provider
61            .subscribe_pending_transactions()
62            .await
63            .context("failed to subscribe to pending transactions")?;
64
65        let mut block_subscription = provider
66            .subscribe_full_blocks()
67            .channel_size(1000)
68            .into_stream()
69            .await
70            .context("failed to create block stream")?;
71
72        let mut stream = pending_txs_sub.into_stream();
73
74        loop {
75            tokio::select! {
76                maybe_hash = stream.next() => {
77                    match maybe_hash {
78                        Some(hash) => { self.pending.entry(hash).or_insert_with(Self::now_millis); }
79                        None => {
80                            warn!("pending transaction stream ended; reconnecting");
81                            provider = ProviderBuilder::new_with_network::<TempoNetwork>()
82                                .connect_ws(WsConnect::new(rpc_url.clone()))
83                                .await
84                                .context("failed to reconnect websocket provider")?;
85                            pending_txs_sub = provider
86                                .subscribe_pending_transactions()
87                                .await
88                                .context("failed to resubscribe to pending transactions")?;
89                            stream = pending_txs_sub.into_stream();
90                            continue;
91                        }
92                    }
93                },
94                maybe_block = block_subscription.next() => {
95                    if let Some(Ok(block)) = maybe_block {
96                         self.on_mined_block(block.header.inner.into_consensus(), block.transactions.hashes().collect());
97                    }
98                }
99            }
100        }
101    }
102
103    fn on_mined_block(&mut self, header: TempoHeader, mined_txs: B256Set) {
104        gauge!("tempo_tx_latency_pending_observed").set(self.pending.len() as f64);
105        if self.pending.is_empty() {
106            return;
107        }
108        self.pending.retain(|hash, seen_at| {
109            if mined_txs.contains(hash) {
110                let latency_secs =
111                    Self::latency_seconds(*seen_at, header.timestamp_millis() as u128);
112                histogram!("tempo_tx_landing_latency_seconds").record(latency_secs);
113                false
114            } else {
115                true
116            }
117        });
118
119        let now = Self::now_millis();
120        let max_age_millis = self.max_pending_age.as_millis();
121        let before_cleanup = self.pending.len();
122        self.pending
123            .retain(|_, seen_at| now.saturating_sub(*seen_at) <= max_age_millis);
124
125        if self.pending.len() < before_cleanup {
126            debug!(
127                removed = before_cleanup - self.pending.len(),
128                "dropped stale pending transactions"
129            );
130        }
131    }
132
133    fn now_millis() -> u128 {
134        SystemTime::now()
135            .duration_since(UNIX_EPOCH)
136            .map(|duration| duration.as_millis())
137            .unwrap_or_default()
138    }
139
140    fn latency_seconds(seen_at_millis: u128, landing_millis: u128) -> f64 {
141        landing_millis.saturating_sub(seen_at_millis) as f64 / 1000.0
142    }
143}
144
145impl TxLatencyArgs {
146    pub async fn run(self) -> Result<()> {
147        tracing_subscriber::FmtSubscriber::builder()
148            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
149            .init();
150
151        let builder = PrometheusBuilder::new().add_global_label("chain_id", self.chain_id.clone());
152        let metrics_handle = builder
153            .install_recorder()
154            .context("failed to install recorder")?;
155
156        describe_histogram!(
157            "tempo_tx_landing_latency_seconds",
158            "Latency between seeing a transaction in the pool and it landing in a block"
159        );
160        describe_gauge!(
161            "tempo_tx_latency_pending_observed",
162            "Number of observed pending transactions awaiting inclusion"
163        );
164
165        let app = Route::new().at(
166            "/metrics",
167            get(prometheus_metrics).data(metrics_handle.clone()),
168        );
169
170        let addr = format!("0.0.0.0:{}", self.port);
171
172        let mut monitor = TransactionLatencyMonitor::new(
173            self.rpc_url,
174            Duration::from_secs(self.max_pending_age_secs),
175        );
176
177        let monitor_handle = tokio::spawn(async move {
178            if let Err(err) = monitor.watch_transactions().await {
179                error!(err = %err, "tx latency monitor exited with error");
180            }
181        });
182
183        let server = Server::new(TcpListener::bind(addr));
184        let server_handle = tokio::spawn(async move { server.run(app).await });
185
186        let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
187            .context("failed to install SIGTERM handler")?;
188        let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
189            .context("failed to install SIGINT handler")?;
190
191        tokio::select! {
192            _ = sigterm.recv() => tracing::info!("Received SIGTERM, shutting down gracefully"),
193            _ = sigint.recv() => tracing::info!("Received SIGINT, shutting down gracefully"),
194        }
195
196        monitor_handle.abort();
197        server_handle.abort();
198
199        tracing::info!("Shutdown complete");
200        Ok(())
201    }
202}