tempo_sidecar/monitor/
mod.rs

1use alloy::{primitives::Address, providers::ProviderBuilder};
2use eyre::{Result, eyre};
3use itertools::Itertools;
4use metrics::{counter, gauge};
5use metrics_exporter_prometheus::PrometheusHandle;
6use poem::{Response, handler};
7use reqwest::Url;
8use std::collections::HashMap;
9use tempo_precompiles::{
10    TIP_FEE_MANAGER_ADDRESS, TIP20_FACTORY_ADDRESS,
11    tip_fee_manager::ITIPFeeAMM::{self, ITIPFeeAMMInstance, Pool},
12    tip20::{ITIP20, token_id_to_address},
13    tip20_factory::ITIP20Factory,
14};
15use tracing::{debug, error, info, instrument};
16
/// Cached metadata for a single TIP-20 token.
///
/// Values are fetched once from the token contract and reused when labelling
/// and scaling reserve metrics.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TIP20Token {
    /// Value returned by the token's `decimals()` call; used to scale raw reserves.
    decimals: u8,
    /// Value returned by the token's `name()` call; used as a metric label.
    name: String,
}
21
22pub struct Monitor {
23    rpc_url: Url,
24    poll_interval: u64,
25    tokens: HashMap<Address, TIP20Token>,
26    pools: HashMap<(Address, Address), Pool>,
27}
28
29impl Monitor {
30    pub fn new(rpc_url: Url, poll_interval: u64) -> Self {
31        Self {
32            rpc_url,
33            poll_interval,
34            tokens: HashMap::new(),
35            pools: HashMap::new(),
36        }
37    }
38
39    #[instrument(name = "monitor::update_tip20_tokens", skip(self))]
40    async fn update_tip20_tokens(&mut self) -> Result<()> {
41        let provider = ProviderBuilder::new()
42            .connect(self.rpc_url.as_str())
43            .await?;
44        let tip20_factory = ITIP20Factory::new(TIP20_FACTORY_ADDRESS, provider.clone());
45
46        let last_token_id = tip20_factory
47            .tokenIdCounter()
48            .call()
49            .await
50            .map_err(|e| eyre!("{}", e))?
51            .to::<u64>();
52
53        info!(count = last_token_id + 1, "fetching tokens");
54
55        for token_address in (0..last_token_id).map(token_id_to_address) {
56            debug!("fetching token at address {}", token_address);
57            if self.tokens.contains_key(&token_address) {
58                debug!("token already exists, skipping");
59                continue;
60            }
61
62            let token = ITIP20::new(token_address, provider.clone());
63            let decimals = token.decimals().call().await.map_err(|e| {
64                counter!("tempo_fee_amm_errors", "request" => "decimals").increment(1);
65                eyre!(
66                    "failed to fetch token decimals for {}: {}",
67                    token_address,
68                    e
69                )
70            })?;
71
72            let name = token.name().call().await.map_err(|e| {
73                counter!("tempo_fee_amm_errors", "request" => "decimals").increment(1);
74                eyre!("failed to fetch token name for {}: {}", token_address, e)
75            })?;
76
77            self.tokens
78                .insert(token_address, TIP20Token { decimals, name });
79        }
80
81        Ok(())
82    }
83
84    #[instrument(name = "monitor::update_tip20_pools", skip(self))]
85    async fn update_tip20_pools(&mut self) -> Result<()> {
86        let provider = ProviderBuilder::new()
87            .connect(self.rpc_url.as_str())
88            .await?;
89
90        let fee_amm: ITIPFeeAMMInstance<_, _> = ITIPFeeAMM::new(TIP_FEE_MANAGER_ADDRESS, provider);
91
92        for pool_addresses in self.tokens.keys().permutations(2) {
93            let (&token_a, &token_b) = (pool_addresses[0], pool_addresses[1]);
94            debug!(%token_a, %token_b, "fetching pool");
95
96            let pool: Result<Pool, _> = fee_amm.getPool(token_a, token_b).call().await;
97            match pool {
98                Ok(pool) => {
99                    self.pools.insert((token_a, token_b), pool);
100                }
101                Err(e) => {
102                    // skip if pool is non existent
103                    if e.as_revert_data().is_some() {
104                        continue;
105                    };
106
107                    counter!("tempo_fee_amm_errors", "request" => "pool").increment(1);
108
109                    return Err(eyre!(
110                        "failed to fetch pool {} -> {}: {}",
111                        token_a,
112                        token_b,
113                        e
114                    ));
115                }
116            }
117        }
118
119        Ok(())
120    }
121
122    #[instrument(name = "monitor::update_metrics", skip(self))]
123    async fn update_metrics(&self) {
124        for ((token_a_address, token_b_address), pool) in self.pools.iter() {
125            let (token_a_balance, token_b_balance) =
126                (pool.reserveUserToken, pool.reserveValidatorToken);
127
128            let token_a = match self.tokens.get(token_a_address) {
129                Some(token) => token,
130                None => continue,
131            };
132
133            let token_b = match self.tokens.get(token_b_address) {
134                Some(token) => token,
135                None => continue,
136            };
137
138            gauge!(
139                "tempo_fee_amm_user_token_reserves",
140                "token_a" => token_a_address.to_string(),
141                "token_b" => token_b_address.to_string(),
142                "token_a_name" => token_a.name.to_string(),
143                "token_b_name" => token_b.name.to_string()
144            )
145            .set((token_a_balance / 10u128.pow(token_a.decimals as u32)) as f64);
146
147            gauge!(
148                "tempo_fee_amm_validator_token_reserves",
149                "token_a" => token_a_address.to_string(),
150                "token_b" => token_b_address.to_string(),
151                "token_a_name" => token_a.name.to_string(),
152                "token_b_name" => token_b.name.to_string()
153            )
154            .set((token_b_balance / 10u128.pow(token_b.decimals as u32)) as f64);
155        }
156    }
157
158    #[instrument(name = "monitor::worker", skip(self))]
159    pub async fn worker(&mut self) {
160        loop {
161            info!("updating pools and tokens");
162            if let Err(e) = self.update_tip20_tokens().await {
163                error!("failed to update pools: {}", e);
164            };
165            if let Err(e) = self.update_tip20_pools().await {
166                error!("failed to update pools: {}", e);
167            };
168            self.update_metrics().await;
169            tokio::time::sleep(std::time::Duration::from_secs(self.poll_interval)).await;
170        }
171    }
172}
173
174#[handler]
175pub async fn prometheus_metrics(handle: poem::web::Data<&PrometheusHandle>) -> Response {
176    let metrics = handle.render();
177    Response::builder()
178        .header("content-type", "text/plain")
179        .body(metrics)
180}