1use alloy::{
2 primitives::{
3 Address,
4 map::{AddressMap, AddressSet, HashMap, HashSet},
5 },
6 providers::{Provider, ProviderBuilder},
7 rpc::types::{Filter, Log},
8 sol_types::SolEvent,
9};
10use eyre::{Result, eyre};
11use futures::future::{join_all, try_join_all};
12use itertools::Itertools;
13use metrics::{counter, gauge};
14use metrics_exporter_prometheus::PrometheusHandle;
15use poem::{Response, handler};
16use rand_distr::num_traits::Zero;
17use reqwest::Url;
18use std::sync::Arc;
19use tempo_precompiles::{
20 TIP_FEE_MANAGER_ADDRESS,
21 tip_fee_manager::ITIPFeeAMM::{self, ITIPFeeAMMInstance, Mint, Pool},
22 tip20::ITIP20,
23};
24use tracing::{debug, error, info, instrument};
25
/// Metadata describing a single TIP-20 token tracked by the monitor.
pub struct TIP20Token {
    /// Number of decimal places; used to scale raw reserves when exporting metrics.
    decimals: u8,
    /// Token name; attached to exported metrics as a label.
    name: String,
}
30
31struct MonitorConfig {
33 rpc_url: Url,
34 poll_interval: u64,
35 target_tokens: AddressSet,
36}
37
38pub struct Monitor {
40 rpc_url: Url,
41 poll_interval: u64,
42 tokens: AddressMap<TIP20Token>,
43 pools: HashMap<(Address, Address), Pool>,
44 known_pairs: HashSet<(Address, Address)>,
45 last_processed_block: u64,
46}
47
48trait FilterExt {
49 fn with_minted_tokens<'a>(self, tokens: impl Iterator<Item = &'a Address>) -> Self;
50}
51
52impl FilterExt for Filter {
53 fn with_minted_tokens<'a>(mut self, tokens: impl Iterator<Item = &'a Address>) -> Self {
58 for addr in tokens {
59 let b256 = addr.into_word();
60 self.topics[2].insert(b256);
61 self.topics[3].insert(b256);
62 }
63 self
64 }
65}
66
67impl MonitorConfig {
68 pub fn new(rpc_url: Url, poll_interval: u64, target_tokens: AddressSet) -> Self {
69 Self {
70 rpc_url,
71 poll_interval,
72 target_tokens,
73 }
74 }
75
76 #[instrument(name = "monitor::init", skip(self))]
78 pub async fn init(self) -> Result<Monitor> {
79 let provider = Arc::new(
80 ProviderBuilder::new()
81 .connect(self.rpc_url.as_str())
82 .await?,
83 );
84
85 let tokens = self.fetch_token_metadata(&provider).await?;
87
88 let last_processed_block = provider.get_block_number().await?;
90 let known_pairs = self.discover_pools(&provider).await?;
91
92 info!(
93 pools_discovered = known_pairs.len(),
94 last_block = last_processed_block,
95 "pool discovery complete"
96 );
97
98 Ok(Monitor {
99 rpc_url: self.rpc_url,
100 poll_interval: self.poll_interval,
101 tokens,
102 pools: Default::default(),
103 known_pairs,
104 last_processed_block,
105 })
106 }
107
108 async fn fetch_token_metadata<P: Provider + Clone>(
110 &self,
111 provider: &Arc<P>,
112 ) -> Result<AddressMap<TIP20Token>> {
113 let get_token_metadata: Vec<_> = self
114 .target_tokens
115 .iter()
116 .map(|addr| {
117 debug!(%addr, "fetching token metadata");
118 let token = ITIP20::new(*addr, provider.clone());
119 async move {
120 let decimals = token.decimals().call().await.map_err(|e| {
121 counter!("tempo_fee_amm_errors", "request" => "decimals").increment(1);
122 eyre!("failed to fetch token decimals for {}: {}", addr, e)
123 })?;
124 let name = token.name().call().await.map_err(|e| {
125 counter!("tempo_fee_amm_errors", "request" => "name").increment(1);
126 eyre!("failed to fetch token name for {}: {}", addr, e)
127 })?;
128 Ok::<_, eyre::Error>((*addr, TIP20Token { decimals, name }))
129 }
130 })
131 .collect();
132
133 try_join_all(get_token_metadata)
134 .await
135 .map(|v| v.into_iter().collect())
136 }
137
138 async fn discover_pools<P: Provider + Clone>(
140 &self,
141 provider: &Arc<P>,
142 ) -> Result<HashSet<(Address, Address)>> {
143 let check_pool_futures: Vec<_> = self
144 .target_tokens
145 .iter()
146 .permutations(2)
147 .map(|pair| {
148 let (token_a, token_b) = (*pair[0], *pair[1]);
149 let fee_amm = ITIPFeeAMM::new(TIP_FEE_MANAGER_ADDRESS, provider.clone());
150 async move {
151 match fee_amm.getPool(token_a, token_b).call().await {
152 Ok(pool) => {
153 if pool.reserveUserToken.is_zero() {
155 None
156 } else {
157 debug!(%token_a, %token_b, "discovered pool");
158 Some((token_a, token_b))
159 }
160 }
161 Err(e) => {
162 counter!("tempo_fee_amm_errors", "request" => "pool").increment(1);
163 error!(%token_a, %token_b, "failed to fetch pool: {}", e);
164 None
165 }
166 }
167 }
168 })
169 .collect();
170
171 let results = join_all(check_pool_futures).await;
172 Ok(results.into_iter().flatten().collect())
173 }
174}
175
176impl Monitor {
177 pub async fn new(rpc_url: Url, poll_interval: u64, target_tokens: AddressSet) -> Result<Self> {
179 MonitorConfig::new(rpc_url, poll_interval, target_tokens)
180 .init()
181 .await
182 }
183
184 #[instrument(name = "monitor::check_for_new_pools", skip(self))]
186 async fn check_for_new_pools(&mut self) -> Result<()> {
187 let provider = ProviderBuilder::new()
188 .connect(self.rpc_url.as_str())
189 .await?;
190
191 let current_block = provider.get_block_number().await?;
192
193 if current_block <= self.last_processed_block {
194 return Ok(());
195 }
196
197 let filter = Filter::new()
198 .address(TIP_FEE_MANAGER_ADDRESS)
199 .event_signature(Mint::SIGNATURE_HASH)
200 .from_block(self.last_processed_block + 1)
201 .to_block(current_block)
202 .with_minted_tokens(self.tokens.keys());
203
204 let logs = provider.get_logs(&filter).await?;
205
206 let mut new_pools = 0;
207 for log in logs {
208 let (user_token, validator_token) = parse_mint_tokens(&log);
209 if self.known_pairs.insert((user_token, validator_token)) {
210 new_pools += 1;
211 }
212 }
213
214 self.last_processed_block = current_block;
215 if new_pools > 0 {
216 info!(new_pools, "discovered new pools");
217 }
218
219 Ok(())
220 }
221
222 #[instrument(name = "monitor::update_tip20_pools", skip(self))]
223 async fn update_tip20_pools(&mut self) -> Result<()> {
224 let provider = ProviderBuilder::new()
225 .connect(self.rpc_url.as_str())
226 .await?;
227
228 let fee_amm: ITIPFeeAMMInstance<_, _> = ITIPFeeAMM::new(TIP_FEE_MANAGER_ADDRESS, provider);
229
230 for &(token_a, token_b) in &self.known_pairs {
231 debug!(%token_a, %token_b, "fetching pool");
232
233 let pool: Result<Pool, _> = fee_amm.getPool(token_a, token_b).call().await;
234 match pool {
235 Ok(pool) => {
236 if pool.reserveUserToken.is_zero() {
238 continue;
239 }
240
241 self.pools.insert((token_a, token_b), pool);
242 }
243 Err(e) => {
244 counter!("tempo_fee_amm_errors", "request" => "pool").increment(1);
245
246 return Err(eyre!(
247 "failed to fetch pool {} -> {}: {}",
248 token_a,
249 token_b,
250 e
251 ));
252 }
253 }
254 }
255
256 Ok(())
257 }
258
259 #[instrument(name = "monitor::update_metrics", skip(self))]
260 fn update_metrics(&self) {
261 for ((token_a_address, token_b_address), pool) in self.pools.iter() {
262 let (token_a_balance, token_b_balance) =
263 (pool.reserveUserToken, pool.reserveValidatorToken);
264
265 let token_a = match self.tokens.get(token_a_address) {
266 Some(token) => token,
267 None => continue,
268 };
269
270 let token_b = match self.tokens.get(token_b_address) {
271 Some(token) => token,
272 None => continue,
273 };
274
275 gauge!(
276 "tempo_fee_amm_user_token_reserves",
277 "token_a" => token_a_address.to_string(),
278 "token_b" => token_b_address.to_string(),
279 "token_a_name" => token_a.name.to_string(),
280 "token_b_name" => token_b.name.to_string()
281 )
282 .set((token_a_balance / 10u128.pow(token_a.decimals as u32)) as f64);
283
284 gauge!(
285 "tempo_fee_amm_validator_token_reserves",
286 "token_a" => token_a_address.to_string(),
287 "token_b" => token_b_address.to_string(),
288 "token_a_name" => token_a.name.to_string(),
289 "token_b_name" => token_b.name.to_string()
290 )
291 .set((token_b_balance / 10u128.pow(token_b.decimals as u32)) as f64);
292 }
293 }
294
295 #[instrument(name = "monitor::worker", skip(self))]
296 pub async fn worker(&mut self) {
297 loop {
298 info!("updating pools");
299 if let Err(e) = self.check_for_new_pools().await {
300 error!("failed to check for new pools: {}", e);
301 }
302 if let Err(e) = self.update_tip20_pools().await {
303 error!("failed to update pools: {}", e);
304 }
305 self.update_metrics();
306 tokio::time::sleep(std::time::Duration::from_secs(self.poll_interval)).await;
307 }
308 }
309}
310
311#[handler]
312pub async fn prometheus_metrics(handle: poem::web::Data<&PrometheusHandle>) -> Response {
313 let metrics = handle.render();
314 Response::builder()
315 .header("content-type", "text/plain")
316 .body(metrics)
317}
318
319fn parse_mint_tokens(log: &Log) -> (Address, Address) {
323 (
324 Address::from_word(log.topics()[2]),
325 Address::from_word(log.topics()[3]),
326 )
327}