Skip to main content

tempo_sidecar/monitor/
mod.rs

1use alloy::{
2    primitives::{
3        Address,
4        map::{AddressMap, AddressSet, HashMap, HashSet},
5    },
6    providers::{Provider, ProviderBuilder},
7    rpc::types::{Filter, Log},
8    sol_types::SolEvent,
9};
10use eyre::{Result, eyre};
11use futures::future::{join_all, try_join_all};
12use itertools::Itertools;
13use metrics::{counter, gauge};
14use metrics_exporter_prometheus::PrometheusHandle;
15use poem::{Response, handler};
16use rand_distr::num_traits::Zero;
17use reqwest::Url;
18use std::sync::Arc;
19use tempo_precompiles::{
20    TIP_FEE_MANAGER_ADDRESS,
21    tip_fee_manager::ITIPFeeAMM::{self, ITIPFeeAMMInstance, Mint, Pool},
22    tip20::ITIP20,
23};
24use tracing::{debug, error, info, instrument};
25
26pub struct TIP20Token {
27    decimals: u8,
28    name: String,
29}
30
31/// Configuration for the monitor.
32struct MonitorConfig {
33    rpc_url: Url,
34    poll_interval: u64,
35    target_tokens: AddressSet,
36}
37
38/// Initialized monitor with fetched token metadata.
39pub struct Monitor {
40    rpc_url: Url,
41    poll_interval: u64,
42    tokens: AddressMap<TIP20Token>,
43    pools: HashMap<(Address, Address), Pool>,
44    known_pairs: HashSet<(Address, Address)>,
45    last_processed_block: u64,
46}
47
48trait FilterExt {
49    fn with_minted_tokens<'a>(self, tokens: impl Iterator<Item = &'a Address>) -> Self;
50}
51
52impl FilterExt for Filter {
53    /// Restricts the filter to events where both, topic 2 and topic 3, are among the input tokens.
54    ///
55    /// WARNING: Caller must ensure that the filter targets fee AMM mint events:
56    /// - `Mint(address indexed sender, address indexed userToken, address indexed validatorToken, ..)`
57    fn with_minted_tokens<'a>(mut self, tokens: impl Iterator<Item = &'a Address>) -> Self {
58        for addr in tokens {
59            let b256 = addr.into_word();
60            self.topics[2].insert(b256);
61            self.topics[3].insert(b256);
62        }
63        self
64    }
65}
66
67impl MonitorConfig {
68    pub fn new(rpc_url: Url, poll_interval: u64, target_tokens: AddressSet) -> Self {
69        Self {
70            rpc_url,
71            poll_interval,
72            target_tokens,
73        }
74    }
75
76    /// Fetches token metadata, discovers existing pools, and returns an initialized `Monitor`.
77    #[instrument(name = "monitor::init", skip(self))]
78    pub async fn init(self) -> Result<Monitor> {
79        let provider = Arc::new(
80            ProviderBuilder::new()
81                .connect(self.rpc_url.as_str())
82                .await?,
83        );
84
85        // Fetch metadata for all whitelisted tokens
86        let tokens = self.fetch_token_metadata(&provider).await?;
87
88        // Discover existing pools by querying all token permutations
89        let last_processed_block = provider.get_block_number().await?;
90        let known_pairs = self.discover_pools(&provider).await?;
91
92        info!(
93            pools_discovered = known_pairs.len(),
94            last_block = last_processed_block,
95            "pool discovery complete"
96        );
97
98        Ok(Monitor {
99            rpc_url: self.rpc_url,
100            poll_interval: self.poll_interval,
101            tokens,
102            pools: Default::default(),
103            known_pairs,
104            last_processed_block,
105        })
106    }
107
108    /// Fetches metadata for all whitelisted tokens.
109    async fn fetch_token_metadata<P: Provider + Clone>(
110        &self,
111        provider: &Arc<P>,
112    ) -> Result<AddressMap<TIP20Token>> {
113        let get_token_metadata: Vec<_> = self
114            .target_tokens
115            .iter()
116            .map(|addr| {
117                debug!(%addr, "fetching token metadata");
118                let token = ITIP20::new(*addr, provider.clone());
119                async move {
120                    let decimals = token.decimals().call().await.map_err(|e| {
121                        counter!("tempo_fee_amm_errors", "request" => "decimals").increment(1);
122                        eyre!("failed to fetch token decimals for {}: {}", addr, e)
123                    })?;
124                    let name = token.name().call().await.map_err(|e| {
125                        counter!("tempo_fee_amm_errors", "request" => "name").increment(1);
126                        eyre!("failed to fetch token name for {}: {}", addr, e)
127                    })?;
128                    Ok::<_, eyre::Error>((*addr, TIP20Token { decimals, name }))
129                }
130            })
131            .collect();
132
133        try_join_all(get_token_metadata)
134            .await
135            .map(|v| v.into_iter().collect())
136    }
137
138    /// Discovers existing pools by querying all token permutations in parallel.
139    async fn discover_pools<P: Provider + Clone>(
140        &self,
141        provider: &Arc<P>,
142    ) -> Result<HashSet<(Address, Address)>> {
143        let check_pool_futures: Vec<_> = self
144            .target_tokens
145            .iter()
146            .permutations(2)
147            .map(|pair| {
148                let (token_a, token_b) = (*pair[0], *pair[1]);
149                let fee_amm = ITIPFeeAMM::new(TIP_FEE_MANAGER_ADDRESS, provider.clone());
150                async move {
151                    match fee_amm.getPool(token_a, token_b).call().await {
152                        Ok(pool) => {
153                            // Skip if pool isn't initialized.
154                            if pool.reserveUserToken.is_zero() {
155                                None
156                            } else {
157                                debug!(%token_a, %token_b, "discovered pool");
158                                Some((token_a, token_b))
159                            }
160                        }
161                        Err(e) => {
162                            counter!("tempo_fee_amm_errors", "request" => "pool").increment(1);
163                            error!(%token_a, %token_b, "failed to fetch pool: {}", e);
164                            None
165                        }
166                    }
167                }
168            })
169            .collect();
170
171        let results = join_all(check_pool_futures).await;
172        Ok(results.into_iter().flatten().collect())
173    }
174}
175
176impl Monitor {
177    /// Creates a new `Monitor` by fetching token metadata and discovering historical pools.
178    pub async fn new(rpc_url: Url, poll_interval: u64, target_tokens: AddressSet) -> Result<Self> {
179        MonitorConfig::new(rpc_url, poll_interval, target_tokens)
180            .init()
181            .await
182    }
183
184    /// Checks for new pools by querying `Mint` events since last processed block.
185    #[instrument(name = "monitor::check_for_new_pools", skip(self))]
186    async fn check_for_new_pools(&mut self) -> Result<()> {
187        let provider = ProviderBuilder::new()
188            .connect(self.rpc_url.as_str())
189            .await?;
190
191        let current_block = provider.get_block_number().await?;
192
193        if current_block <= self.last_processed_block {
194            return Ok(());
195        }
196
197        let filter = Filter::new()
198            .address(TIP_FEE_MANAGER_ADDRESS)
199            .event_signature(Mint::SIGNATURE_HASH)
200            .from_block(self.last_processed_block + 1)
201            .to_block(current_block)
202            .with_minted_tokens(self.tokens.keys());
203
204        let logs = provider.get_logs(&filter).await?;
205
206        let mut new_pools = 0;
207        for log in logs {
208            let (user_token, validator_token) = parse_mint_tokens(&log);
209            if self.known_pairs.insert((user_token, validator_token)) {
210                new_pools += 1;
211            }
212        }
213
214        self.last_processed_block = current_block;
215        if new_pools > 0 {
216            info!(new_pools, "discovered new pools");
217        }
218
219        Ok(())
220    }
221
222    #[instrument(name = "monitor::update_tip20_pools", skip(self))]
223    async fn update_tip20_pools(&mut self) -> Result<()> {
224        let provider = ProviderBuilder::new()
225            .connect(self.rpc_url.as_str())
226            .await?;
227
228        let fee_amm: ITIPFeeAMMInstance<_, _> = ITIPFeeAMM::new(TIP_FEE_MANAGER_ADDRESS, provider);
229
230        for &(token_a, token_b) in &self.known_pairs {
231            debug!(%token_a, %token_b, "fetching pool");
232
233            let pool: Result<Pool, _> = fee_amm.getPool(token_a, token_b).call().await;
234            match pool {
235                Ok(pool) => {
236                    // Skip if pool isn't initialized.
237                    if pool.reserveUserToken.is_zero() {
238                        continue;
239                    }
240
241                    self.pools.insert((token_a, token_b), pool);
242                }
243                Err(e) => {
244                    counter!("tempo_fee_amm_errors", "request" => "pool").increment(1);
245
246                    return Err(eyre!(
247                        "failed to fetch pool {} -> {}: {}",
248                        token_a,
249                        token_b,
250                        e
251                    ));
252                }
253            }
254        }
255
256        Ok(())
257    }
258
259    #[instrument(name = "monitor::update_metrics", skip(self))]
260    fn update_metrics(&self) {
261        for ((token_a_address, token_b_address), pool) in self.pools.iter() {
262            let (token_a_balance, token_b_balance) =
263                (pool.reserveUserToken, pool.reserveValidatorToken);
264
265            let token_a = match self.tokens.get(token_a_address) {
266                Some(token) => token,
267                None => continue,
268            };
269
270            let token_b = match self.tokens.get(token_b_address) {
271                Some(token) => token,
272                None => continue,
273            };
274
275            gauge!(
276                "tempo_fee_amm_user_token_reserves",
277                "token_a" => token_a_address.to_string(),
278                "token_b" => token_b_address.to_string(),
279                "token_a_name" => token_a.name.to_string(),
280                "token_b_name" => token_b.name.to_string()
281            )
282            .set((token_a_balance / 10u128.pow(token_a.decimals as u32)) as f64);
283
284            gauge!(
285                "tempo_fee_amm_validator_token_reserves",
286                "token_a" => token_a_address.to_string(),
287                "token_b" => token_b_address.to_string(),
288                "token_a_name" => token_a.name.to_string(),
289                "token_b_name" => token_b.name.to_string()
290            )
291            .set((token_b_balance / 10u128.pow(token_b.decimals as u32)) as f64);
292        }
293    }
294
295    #[instrument(name = "monitor::worker", skip(self))]
296    pub async fn worker(&mut self) {
297        loop {
298            info!("updating pools");
299            if let Err(e) = self.check_for_new_pools().await {
300                error!("failed to check for new pools: {}", e);
301            }
302            if let Err(e) = self.update_tip20_pools().await {
303                error!("failed to update pools: {}", e);
304            }
305            self.update_metrics();
306            tokio::time::sleep(std::time::Duration::from_secs(self.poll_interval)).await;
307        }
308    }
309}
310
311#[handler]
312pub async fn prometheus_metrics(handle: poem::web::Data<&PrometheusHandle>) -> Response {
313    let metrics = handle.render();
314    Response::builder()
315        .header("content-type", "text/plain")
316        .body(metrics)
317}
318
319/// Parses user and validator token addresses from a `FeeAMM::Mint` event log.
320///
321/// WARNING: Caller is responsible for ensuring the input is a `FeeAMM::Mint` event.
322fn parse_mint_tokens(log: &Log) -> (Address, Address) {
323    (
324        Address::from_word(log.topics()[2]),
325        Address::from_word(log.topics()[3]),
326    )
327}