tempo_sidecar/cmd/
tx_latency.rs1use crate::monitor::prometheus_metrics;
2use alloy::{
3 primitives::B256,
4 providers::{Provider, ProviderBuilder, WsConnect},
5};
6use clap::Parser;
7use eyre::{Context, Result};
8use futures::StreamExt;
9use metrics::{describe_gauge, describe_histogram, gauge, histogram};
10use metrics_exporter_prometheus::PrometheusBuilder;
11use poem::{EndpointExt, Route, Server, get, listener::TcpListener};
12use reqwest::Url;
13use std::{
14 collections::{HashMap, HashSet},
15 time::{Duration, SystemTime, UNIX_EPOCH},
16};
17use tempo_alloy::{TempoNetwork, primitives::TempoHeader};
18use tokio::signal;
19use tracing::{debug, error, warn};
20
// Command-line arguments for the transaction-latency sidecar.
//
// NOTE(review): plain `//` comments are used on purpose — `///` doc comments
// on clap-derived fields are emitted as CLI help text and would change the
// binary's runtime output.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct TxLatencyArgs {
    // Websocket RPC endpoint of the node to monitor (used for both the
    // pending-transaction and full-block subscriptions).
    #[arg(short, long, required = true)]
    rpc_url: Url,

    // Chain identifier; attached to every exported metric as the global
    // `chain_id` label.
    #[arg(short, long, required = true)]
    chain_id: String,

    // TCP port the Prometheus `/metrics` endpoint listens on (bound to 0.0.0.0).
    #[arg(short, long, required = true)]
    port: u16,

    // Pending transactions tracked for longer than this many seconds are
    // dropped without recording a latency sample (default: 600 = 10 minutes).
    #[arg(long, default_value_t = 600)]
    max_pending_age_secs: u64,
}
40
// Tracks transactions from first sighting in the mempool until inclusion in a
// block, recording the elapsed time as a Prometheus histogram.
struct TransactionLatencyMonitor {
    // Websocket endpoint for the pending-tx and block subscriptions.
    rpc_url: Url,
    // Entries pending longer than this are evicted without a sample.
    max_pending_age: Duration,
    // Tx hash -> unix timestamp (milliseconds) when the hash was first seen.
    pending: HashMap<B256, u128>,
}
47
48impl TransactionLatencyMonitor {
49 fn new(rpc_url: Url, max_pending_age: Duration) -> Self {
50 Self {
51 rpc_url,
52 max_pending_age,
53 pending: HashMap::new(),
54 }
55 }
56
57 async fn watch_transactions(&mut self) -> Result<()> {
58 let rpc_url = self.rpc_url.to_string();
59 let mut provider = ProviderBuilder::new_with_network::<TempoNetwork>()
60 .connect_ws(WsConnect::new(rpc_url.clone()))
61 .await
62 .context("failed to connect websocket provider")?;
63 let mut pending_txs_sub = provider
64 .subscribe_pending_transactions()
65 .await
66 .context("failed to subscribe to pending transactions")?;
67
68 let mut block_subscription = provider
69 .subscribe_full_blocks()
70 .channel_size(1000)
71 .into_stream()
72 .await
73 .context("failed to create block stream")?;
74
75 let mut stream = pending_txs_sub.into_stream();
76
77 loop {
78 tokio::select! {
79 maybe_hash = stream.next() => {
80 match maybe_hash {
81 Some(hash) => { self.pending.entry(hash).or_insert_with(Self::now_millis); }
82 None => {
83 warn!("pending transaction stream ended; reconnecting");
84 provider = ProviderBuilder::new_with_network::<TempoNetwork>()
85 .connect_ws(WsConnect::new(rpc_url.clone()))
86 .await
87 .context("failed to reconnect websocket provider")?;
88 pending_txs_sub = provider
89 .subscribe_pending_transactions()
90 .await
91 .context("failed to resubscribe to pending transactions")?;
92 stream = pending_txs_sub.into_stream();
93 continue;
94 }
95 }
96 },
97 maybe_block = block_subscription.next() => {
98 if let Some(Ok(block)) = maybe_block {
99 self.on_mined_block(block.header.inner.into_consensus(), block.transactions.hashes().collect());
100 }
101 }
102 }
103 }
104 }
105
106 fn on_mined_block(&mut self, header: TempoHeader, mined_txs: HashSet<B256>) {
107 gauge!("tempo_tx_latency_pending_observed").set(self.pending.len() as f64);
108 if self.pending.is_empty() {
109 return;
110 }
111 self.pending.retain(|hash, seen_at| {
112 if mined_txs.contains(hash) {
113 let latency_secs =
114 Self::latency_seconds(*seen_at, header.timestamp_millis() as u128);
115 histogram!("tempo_tx_landing_latency_seconds").record(latency_secs);
116 false
117 } else {
118 true
119 }
120 });
121
122 let now = Self::now_millis();
123 let max_age_millis = self.max_pending_age.as_millis();
124 let before_cleanup = self.pending.len();
125 self.pending
126 .retain(|_, seen_at| now.saturating_sub(*seen_at) <= max_age_millis);
127
128 if self.pending.len() < before_cleanup {
129 debug!(
130 removed = before_cleanup - self.pending.len(),
131 "dropped stale pending transactions"
132 );
133 }
134 }
135
136 fn now_millis() -> u128 {
137 SystemTime::now()
138 .duration_since(UNIX_EPOCH)
139 .map(|duration| duration.as_millis())
140 .unwrap_or_default()
141 }
142
143 fn latency_seconds(seen_at_millis: u128, landing_millis: u128) -> f64 {
144 landing_millis.saturating_sub(seen_at_millis) as f64 / 1000.0
145 }
146}
147
148impl TxLatencyArgs {
149 pub async fn run(self) -> Result<()> {
150 tracing_subscriber::FmtSubscriber::builder()
151 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
152 .init();
153
154 let builder = PrometheusBuilder::new().add_global_label("chain_id", self.chain_id.clone());
155 let metrics_handle = builder
156 .install_recorder()
157 .context("failed to install recorder")?;
158
159 describe_histogram!(
160 "tempo_tx_landing_latency_seconds",
161 "Latency between seeing a transaction in the pool and it landing in a block"
162 );
163 describe_gauge!(
164 "tempo_tx_latency_pending_observed",
165 "Number of observed pending transactions awaiting inclusion"
166 );
167
168 let app = Route::new().at(
169 "/metrics",
170 get(prometheus_metrics).data(metrics_handle.clone()),
171 );
172
173 let addr = format!("0.0.0.0:{}", self.port);
174
175 let mut monitor = TransactionLatencyMonitor::new(
176 self.rpc_url,
177 Duration::from_secs(self.max_pending_age_secs),
178 );
179
180 let monitor_handle = tokio::spawn(async move {
181 if let Err(err) = monitor.watch_transactions().await {
182 error!(err = %err, "tx latency monitor exited with error");
183 }
184 });
185
186 let server = Server::new(TcpListener::bind(addr));
187 let server_handle = tokio::spawn(async move { server.run(app).await });
188
189 let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
190 .context("failed to install SIGTERM handler")?;
191 let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
192 .context("failed to install SIGINT handler")?;
193
194 tokio::select! {
195 _ = sigterm.recv() => tracing::info!("Received SIGTERM, shutting down gracefully"),
196 _ = sigint.recv() => tracing::info!("Received SIGINT, shutting down gracefully"),
197 }
198
199 monitor_handle.abort();
200 server_handle.abort();
201
202 tracing::info!("Shutdown complete");
203 Ok(())
204 }
205}