// tempo_sidecar/cmd/tx_latency.rs
tx_latency.rs1use crate::monitor::prometheus_metrics;
2use alloy::{
3 primitives::map::{B256Map, B256Set},
4 providers::{Provider, ProviderBuilder, WsConnect},
5};
6use clap::Parser;
7use eyre::{Context, Result};
8use futures::StreamExt;
9use metrics::{describe_gauge, describe_histogram, gauge, histogram};
10use metrics_exporter_prometheus::PrometheusBuilder;
11use poem::{EndpointExt, Route, Server, get, listener::TcpListener};
12use reqwest::Url;
13use std::time::{Duration, SystemTime, UNIX_EPOCH};
14use tempo_alloy::{TempoNetwork, primitives::TempoHeader};
15use tokio::signal;
16use tracing::{debug, error, warn};
17
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct TxLatencyArgs {
    // NOTE: plain `//` comments are used on these fields on purpose — `///`
    // doc comments would be picked up by clap as CLI help text and change the
    // program's `--help` output.

    // Websocket RPC endpoint of the node to monitor (consumed via WsConnect).
    #[arg(short, long, required = true)]
    rpc_url: Url,

    // Chain id attached as a global Prometheus label to every exported metric.
    #[arg(short, long, required = true)]
    chain_id: String,

    // TCP port the `/metrics` HTTP endpoint listens on (bound on 0.0.0.0).
    #[arg(short, long, required = true)]
    port: u16,

    // Maximum age in seconds a transaction may stay in the pending map before
    // being evicted as stale (default 600 = 10 minutes).
    #[arg(long, default_value_t = 600)]
    max_pending_age_secs: u64,
}
37
/// Tracks transactions observed in the mempool and measures how long they
/// take to land in a mined block.
struct TransactionLatencyMonitor {
    /// RPC endpoint used for the websocket provider connection.
    rpc_url: Url,
    /// How long an entry may remain in `pending` before it is evicted as stale.
    max_pending_age: Duration,
    /// Transaction hash -> unix-millis timestamp at which the hash was first
    /// observed in the pending-transaction stream.
    pending: B256Map<u128>,
}
44
45impl TransactionLatencyMonitor {
46 fn new(rpc_url: Url, max_pending_age: Duration) -> Self {
47 Self {
48 rpc_url,
49 max_pending_age,
50 pending: Default::default(),
51 }
52 }
53
54 async fn watch_transactions(&mut self) -> Result<()> {
55 let rpc_url = self.rpc_url.to_string();
56 let mut provider = ProviderBuilder::new_with_network::<TempoNetwork>()
57 .connect_ws(WsConnect::new(rpc_url.clone()))
58 .await
59 .context("failed to connect websocket provider")?;
60 let mut pending_txs_sub = provider
61 .subscribe_pending_transactions()
62 .await
63 .context("failed to subscribe to pending transactions")?;
64
65 let mut block_subscription = provider
66 .subscribe_full_blocks()
67 .channel_size(1000)
68 .into_stream()
69 .await
70 .context("failed to create block stream")?;
71
72 let mut stream = pending_txs_sub.into_stream();
73
74 loop {
75 tokio::select! {
76 maybe_hash = stream.next() => {
77 match maybe_hash {
78 Some(hash) => { self.pending.entry(hash).or_insert_with(Self::now_millis); }
79 None => {
80 warn!("pending transaction stream ended; reconnecting");
81 provider = ProviderBuilder::new_with_network::<TempoNetwork>()
82 .connect_ws(WsConnect::new(rpc_url.clone()))
83 .await
84 .context("failed to reconnect websocket provider")?;
85 pending_txs_sub = provider
86 .subscribe_pending_transactions()
87 .await
88 .context("failed to resubscribe to pending transactions")?;
89 stream = pending_txs_sub.into_stream();
90 continue;
91 }
92 }
93 },
94 maybe_block = block_subscription.next() => {
95 if let Some(Ok(block)) = maybe_block {
96 self.on_mined_block(block.header.inner.into_consensus(), block.transactions.hashes().collect());
97 }
98 }
99 }
100 }
101 }
102
103 fn on_mined_block(&mut self, header: TempoHeader, mined_txs: B256Set) {
104 gauge!("tempo_tx_latency_pending_observed").set(self.pending.len() as f64);
105 if self.pending.is_empty() {
106 return;
107 }
108 self.pending.retain(|hash, seen_at| {
109 if mined_txs.contains(hash) {
110 let latency_secs =
111 Self::latency_seconds(*seen_at, header.timestamp_millis() as u128);
112 histogram!("tempo_tx_landing_latency_seconds").record(latency_secs);
113 false
114 } else {
115 true
116 }
117 });
118
119 let now = Self::now_millis();
120 let max_age_millis = self.max_pending_age.as_millis();
121 let before_cleanup = self.pending.len();
122 self.pending
123 .retain(|_, seen_at| now.saturating_sub(*seen_at) <= max_age_millis);
124
125 if self.pending.len() < before_cleanup {
126 debug!(
127 removed = before_cleanup - self.pending.len(),
128 "dropped stale pending transactions"
129 );
130 }
131 }
132
133 fn now_millis() -> u128 {
134 SystemTime::now()
135 .duration_since(UNIX_EPOCH)
136 .map(|duration| duration.as_millis())
137 .unwrap_or_default()
138 }
139
140 fn latency_seconds(seen_at_millis: u128, landing_millis: u128) -> f64 {
141 landing_millis.saturating_sub(seen_at_millis) as f64 / 1000.0
142 }
143}
144
145impl TxLatencyArgs {
146 pub async fn run(self) -> Result<()> {
147 tracing_subscriber::FmtSubscriber::builder()
148 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
149 .init();
150
151 let builder = PrometheusBuilder::new().add_global_label("chain_id", self.chain_id.clone());
152 let metrics_handle = builder
153 .install_recorder()
154 .context("failed to install recorder")?;
155
156 describe_histogram!(
157 "tempo_tx_landing_latency_seconds",
158 "Latency between seeing a transaction in the pool and it landing in a block"
159 );
160 describe_gauge!(
161 "tempo_tx_latency_pending_observed",
162 "Number of observed pending transactions awaiting inclusion"
163 );
164
165 let app = Route::new().at(
166 "/metrics",
167 get(prometheus_metrics).data(metrics_handle.clone()),
168 );
169
170 let addr = format!("0.0.0.0:{}", self.port);
171
172 let mut monitor = TransactionLatencyMonitor::new(
173 self.rpc_url,
174 Duration::from_secs(self.max_pending_age_secs),
175 );
176
177 let monitor_handle = tokio::spawn(async move {
178 if let Err(err) = monitor.watch_transactions().await {
179 error!(err = %err, "tx latency monitor exited with error");
180 }
181 });
182
183 let server = Server::new(TcpListener::bind(addr));
184 let server_handle = tokio::spawn(async move { server.run(app).await });
185
186 let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
187 .context("failed to install SIGTERM handler")?;
188 let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
189 .context("failed to install SIGINT handler")?;
190
191 tokio::select! {
192 _ = sigterm.recv() => tracing::info!("Received SIGTERM, shutting down gracefully"),
193 _ = sigint.recv() => tracing::info!("Received SIGINT, shutting down gracefully"),
194 }
195
196 monitor_handle.abort();
197 server_handle.abort();
198
199 tracing::info!("Shutdown complete");
200 Ok(())
201 }
202}