Skip to main content

tempo_sidecar/cmd/
monitor.rs

1use crate::monitor::{Monitor, prometheus_metrics};
2use alloy::primitives::Address;
3use clap::Parser;
4use eyre::{Context, eyre};
5use metrics::{describe_counter, describe_gauge};
6use metrics_exporter_prometheus::PrometheusBuilder;
7use poem::{EndpointExt, Route, Server, get, listener::TcpListener};
8use reqwest::Url;
9use tempo_precompiles::tip20::is_tip20_prefix;
10use tokio::signal;
11use tracing_subscriber::EnvFilter;
12
13#[derive(Parser, Debug)]
14#[command(author, version, about, long_about = None)]
15pub struct MonitorArgs {
16    #[arg(short, long, required = true)]
17    rpc_url: Url,
18
19    #[arg(long, default_value_t = 5)]
20    poll_interval: u64,
21
22    #[arg(short, long, required = true)]
23    chain_id: String,
24
25    #[arg(short, long, required = true)]
26    port: u16,
27
28    /// Comma-separated list of token addresses to monitor.
29    ///
30    /// NOTE: Only pools with both tokens whitelisted will be monitored.
31    #[arg(short, long, value_delimiter = ',', num_args = 2.., required = true)]
32    tokens: Vec<Address>,
33}
34
35impl MonitorArgs {
36    pub async fn run(self) -> eyre::Result<()> {
37        tracing_subscriber::FmtSubscriber::builder()
38            .with_env_filter(EnvFilter::from_default_env())
39            .init();
40
41        let builder = PrometheusBuilder::new().add_global_label("chain_id", self.chain_id.clone());
42        let metrics_handle = builder
43            .install_recorder()
44            .context("failed to install recorder")?;
45
46        if self.tokens.iter().any(|t| !is_tip20_prefix(*t)) {
47            return Err(eyre!("Invalid input. Pools require TIP20 tokens."));
48        }
49
50        let mut monitor = Monitor::new(
51            self.rpc_url,
52            self.poll_interval,
53            self.tokens.iter().copied().collect(),
54        )
55        .await
56        .context("failed to initialize monitor")?;
57
58        describe_gauge!(
59            "tempo_fee_amm_user_reserves",
60            "User token reserves in the FeeAMM pool"
61        );
62        describe_gauge!(
63            "tempo_fee_amm_validator_reserves",
64            "Validator token reserves in the FeeAMM pool"
65        );
66
67        describe_counter!(
68            "tempo_fee_amm_errors",
69            "Number of errors encountered while fetching FeeAMM data"
70        );
71
72        let app = Route::new().at(
73            "/metrics",
74            get(prometheus_metrics).data(metrics_handle.clone()),
75        );
76
77        let addr = format!("0.0.0.0:{}", self.port);
78
79        let monitor_handle = tokio::spawn(async move {
80            monitor.worker().await;
81        });
82
83        let server = Server::new(TcpListener::bind(addr));
84        let server_handle = tokio::spawn(async move { server.run(app).await });
85
86        let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
87            .context("failed to install SIGTERM handler")?;
88        let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
89            .context("failed to install SIGINT handler")?;
90
91        tokio::select! {
92            _ = sigterm.recv() => tracing::info!("Received SIGTERM, shutting down gracefully"),
93            _ = sigint.recv() => tracing::info!("Received SIGINT, shutting down gracefully"),
94        }
95
96        // Abort tasks
97        monitor_handle.abort();
98        server_handle.abort();
99
100        tracing::info!("Shutdown complete");
101        Ok(())
102    }
103}