tempo_sidecar/cmd/
monitor.rs

1use crate::monitor::{Monitor, prometheus_metrics};
2use clap::Parser;
3use eyre::Context;
4use metrics::{describe_counter, describe_gauge};
5use metrics_exporter_prometheus::PrometheusBuilder;
6use poem::{EndpointExt, Route, Server, get, listener::TcpListener};
7use reqwest::Url;
8use tokio::signal;
9use tracing_subscriber::EnvFilter;
10
11#[derive(Parser, Debug)]
12#[command(author, version, about, long_about = None)]
13pub struct MonitorArgs {
14    #[arg(short, long, required = true)]
15    rpc_url: Url,
16
17    #[arg(long, default_value_t = 5)]
18    poll_interval: u64,
19
20    #[arg(short, long, required = true)]
21    chain_id: String,
22
23    #[arg(short, long, required = true)]
24    port: u16,
25}
26
27impl MonitorArgs {
28    pub async fn run(self) -> eyre::Result<()> {
29        tracing_subscriber::FmtSubscriber::builder()
30            .with_env_filter(EnvFilter::from_default_env())
31            .init();
32
33        let builder = PrometheusBuilder::new().add_global_label("chain_id", self.chain_id.clone());
34        let metrics_handle = builder
35            .install_recorder()
36            .context("failed to install recorder")?;
37
38        let mut monitor = Monitor::new(self.rpc_url, self.poll_interval);
39
40        describe_gauge!(
41            "tempo_fee_amm_user_reserves",
42            "User token reserves in the FeeAMM pool"
43        );
44        describe_gauge!(
45            "tempo_fee_amm_validator_reserves",
46            "Validator token reserves in the FeeAMM pool"
47        );
48
49        describe_counter!(
50            "tempo_fee_amm_errors",
51            "Number of errors encountered while fetching FeeAMM data"
52        );
53
54        let app = Route::new().at(
55            "/metrics",
56            get(prometheus_metrics).data(metrics_handle.clone()),
57        );
58
59        let addr = format!("0.0.0.0:{}", self.port);
60
61        let monitor_handle = tokio::spawn(async move {
62            monitor.worker().await;
63        });
64
65        let server = Server::new(TcpListener::bind(addr));
66        let server_handle = tokio::spawn(async move { server.run(app).await });
67
68        let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
69            .context("failed to install SIGTERM handler")?;
70        let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
71            .context("failed to install SIGINT handler")?;
72
73        tokio::select! {
74            _ = sigterm.recv() => tracing::info!("Received SIGTERM, shutting down gracefully"),
75            _ = sigint.recv() => tracing::info!("Received SIGINT, shutting down gracefully"),
76        }
77
78        // Abort tasks
79        monitor_handle.abort();
80        server_handle.abort();
81
82        tracing::info!("Shutdown complete");
83        Ok(())
84    }
85}