tempo_sidecar/monitor/
mod.rs1use alloy::{primitives::Address, providers::ProviderBuilder};
2use eyre::{Result, eyre};
3use itertools::Itertools;
4use metrics::{counter, gauge};
5use metrics_exporter_prometheus::PrometheusHandle;
6use poem::{Response, handler};
7use reqwest::Url;
8use std::collections::HashMap;
9use tempo_precompiles::{
10 TIP_FEE_MANAGER_ADDRESS, TIP20_FACTORY_ADDRESS,
11 tip_fee_manager::ITIPFeeAMM::{self, ITIPFeeAMMInstance, Pool},
12 tip20::{ITIP20, token_id_to_address},
13 tip20_factory::ITIP20Factory,
14};
15use tracing::{debug, error, info, instrument};
16
/// Cached metadata for a single TIP-20 token.
///
/// Both fields are filled from the token contract's `decimals()` and `name()`
/// calls when the monitor first discovers the token.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TIP20Token {
    /// Value returned by the token's `decimals()` call; used to scale raw
    /// reserves before they are exported as metrics.
    decimals: u8,
    /// Value returned by the token's `name()` call; exported as a metric label.
    name: String,
}
21
22pub struct Monitor {
23 rpc_url: Url,
24 poll_interval: u64,
25 tokens: HashMap<Address, TIP20Token>,
26 pools: HashMap<(Address, Address), Pool>,
27}
28
29impl Monitor {
30 pub fn new(rpc_url: Url, poll_interval: u64) -> Self {
31 Self {
32 rpc_url,
33 poll_interval,
34 tokens: HashMap::new(),
35 pools: HashMap::new(),
36 }
37 }
38
39 #[instrument(name = "monitor::update_tip20_tokens", skip(self))]
40 async fn update_tip20_tokens(&mut self) -> Result<()> {
41 let provider = ProviderBuilder::new()
42 .connect(self.rpc_url.as_str())
43 .await?;
44 let tip20_factory = ITIP20Factory::new(TIP20_FACTORY_ADDRESS, provider.clone());
45
46 let last_token_id = tip20_factory
47 .tokenIdCounter()
48 .call()
49 .await
50 .map_err(|e| eyre!("{}", e))?
51 .to::<u64>();
52
53 info!(count = last_token_id + 1, "fetching tokens");
54
55 for token_address in (0..last_token_id).map(token_id_to_address) {
56 debug!("fetching token at address {}", token_address);
57 if self.tokens.contains_key(&token_address) {
58 debug!("token already exists, skipping");
59 continue;
60 }
61
62 let token = ITIP20::new(token_address, provider.clone());
63 let decimals = token.decimals().call().await.map_err(|e| {
64 counter!("tempo_fee_amm_errors", "request" => "decimals").increment(1);
65 eyre!(
66 "failed to fetch token decimals for {}: {}",
67 token_address,
68 e
69 )
70 })?;
71
72 let name = token.name().call().await.map_err(|e| {
73 counter!("tempo_fee_amm_errors", "request" => "decimals").increment(1);
74 eyre!("failed to fetch token name for {}: {}", token_address, e)
75 })?;
76
77 self.tokens
78 .insert(token_address, TIP20Token { decimals, name });
79 }
80
81 Ok(())
82 }
83
84 #[instrument(name = "monitor::update_tip20_pools", skip(self))]
85 async fn update_tip20_pools(&mut self) -> Result<()> {
86 let provider = ProviderBuilder::new()
87 .connect(self.rpc_url.as_str())
88 .await?;
89
90 let fee_amm: ITIPFeeAMMInstance<_, _> = ITIPFeeAMM::new(TIP_FEE_MANAGER_ADDRESS, provider);
91
92 for pool_addresses in self.tokens.keys().permutations(2) {
93 let (&token_a, &token_b) = (pool_addresses[0], pool_addresses[1]);
94 debug!(%token_a, %token_b, "fetching pool");
95
96 let pool: Result<Pool, _> = fee_amm.getPool(token_a, token_b).call().await;
97 match pool {
98 Ok(pool) => {
99 self.pools.insert((token_a, token_b), pool);
100 }
101 Err(e) => {
102 if e.as_revert_data().is_some() {
104 continue;
105 };
106
107 counter!("tempo_fee_amm_errors", "request" => "pool").increment(1);
108
109 return Err(eyre!(
110 "failed to fetch pool {} -> {}: {}",
111 token_a,
112 token_b,
113 e
114 ));
115 }
116 }
117 }
118
119 Ok(())
120 }
121
122 #[instrument(name = "monitor::update_metrics", skip(self))]
123 async fn update_metrics(&self) {
124 for ((token_a_address, token_b_address), pool) in self.pools.iter() {
125 let (token_a_balance, token_b_balance) =
126 (pool.reserveUserToken, pool.reserveValidatorToken);
127
128 let token_a = match self.tokens.get(token_a_address) {
129 Some(token) => token,
130 None => continue,
131 };
132
133 let token_b = match self.tokens.get(token_b_address) {
134 Some(token) => token,
135 None => continue,
136 };
137
138 gauge!(
139 "tempo_fee_amm_user_token_reserves",
140 "token_a" => token_a_address.to_string(),
141 "token_b" => token_b_address.to_string(),
142 "token_a_name" => token_a.name.to_string(),
143 "token_b_name" => token_b.name.to_string()
144 )
145 .set((token_a_balance / 10u128.pow(token_a.decimals as u32)) as f64);
146
147 gauge!(
148 "tempo_fee_amm_validator_token_reserves",
149 "token_a" => token_a_address.to_string(),
150 "token_b" => token_b_address.to_string(),
151 "token_a_name" => token_a.name.to_string(),
152 "token_b_name" => token_b.name.to_string()
153 )
154 .set((token_b_balance / 10u128.pow(token_b.decimals as u32)) as f64);
155 }
156 }
157
158 #[instrument(name = "monitor::worker", skip(self))]
159 pub async fn worker(&mut self) {
160 loop {
161 info!("updating pools and tokens");
162 if let Err(e) = self.update_tip20_tokens().await {
163 error!("failed to update pools: {}", e);
164 };
165 if let Err(e) = self.update_tip20_pools().await {
166 error!("failed to update pools: {}", e);
167 };
168 self.update_metrics().await;
169 tokio::time::sleep(std::time::Duration::from_secs(self.poll_interval)).await;
170 }
171 }
172}
173
174#[handler]
175pub async fn prometheus_metrics(handle: poem::web::Data<&PrometheusHandle>) -> Response {
176 let metrics = handle.render();
177 Response::builder()
178 .header("content-type", "text/plain")
179 .body(metrics)
180}