tempo_sidecar/cmd/
monitor.rs1use crate::monitor::{Monitor, prometheus_metrics};
2use alloy::primitives::Address;
3use clap::Parser;
4use eyre::{Context, eyre};
5use metrics::{describe_counter, describe_gauge};
6use metrics_exporter_prometheus::PrometheusBuilder;
7use poem::{EndpointExt, Route, Server, get, listener::TcpListener};
8use reqwest::Url;
9use tempo_precompiles::tip20::is_tip20_prefix;
10use tokio::signal;
11use tracing_subscriber::EnvFilter;
12
13#[derive(Parser, Debug)]
14#[command(author, version, about, long_about = None)]
15pub struct MonitorArgs {
16 #[arg(short, long, required = true)]
17 rpc_url: Url,
18
19 #[arg(long, default_value_t = 5)]
20 poll_interval: u64,
21
22 #[arg(short, long, required = true)]
23 chain_id: String,
24
25 #[arg(short, long, required = true)]
26 port: u16,
27
28 #[arg(short, long, value_delimiter = ',', num_args = 2.., required = true)]
32 tokens: Vec<Address>,
33}
34
35impl MonitorArgs {
36 pub async fn run(self) -> eyre::Result<()> {
37 tracing_subscriber::FmtSubscriber::builder()
38 .with_env_filter(EnvFilter::from_default_env())
39 .init();
40
41 let builder = PrometheusBuilder::new().add_global_label("chain_id", self.chain_id.clone());
42 let metrics_handle = builder
43 .install_recorder()
44 .context("failed to install recorder")?;
45
46 if self.tokens.iter().any(|t| !is_tip20_prefix(*t)) {
47 return Err(eyre!("Invalid input. Pools require TIP20 tokens."));
48 }
49
50 let mut monitor = Monitor::new(
51 self.rpc_url,
52 self.poll_interval,
53 self.tokens.iter().copied().collect(),
54 )
55 .await
56 .context("failed to initialize monitor")?;
57
58 describe_gauge!(
59 "tempo_fee_amm_user_reserves",
60 "User token reserves in the FeeAMM pool"
61 );
62 describe_gauge!(
63 "tempo_fee_amm_validator_reserves",
64 "Validator token reserves in the FeeAMM pool"
65 );
66
67 describe_counter!(
68 "tempo_fee_amm_errors",
69 "Number of errors encountered while fetching FeeAMM data"
70 );
71
72 let app = Route::new().at(
73 "/metrics",
74 get(prometheus_metrics).data(metrics_handle.clone()),
75 );
76
77 let addr = format!("0.0.0.0:{}", self.port);
78
79 let monitor_handle = tokio::spawn(async move {
80 monitor.worker().await;
81 });
82
83 let server = Server::new(TcpListener::bind(addr));
84 let server_handle = tokio::spawn(async move { server.run(app).await });
85
86 let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
87 .context("failed to install SIGTERM handler")?;
88 let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
89 .context("failed to install SIGINT handler")?;
90
91 tokio::select! {
92 _ = sigterm.recv() => tracing::info!("Received SIGTERM, shutting down gracefully"),
93 _ = sigint.recv() => tracing::info!("Received SIGINT, shutting down gracefully"),
94 }
95
96 monitor_handle.abort();
98 server_handle.abort();
99
100 tracing::info!("Shutdown complete");
101 Ok(())
102 }
103}