tempo_sidecar/cmd/
simple_arb.rs

1use alloy::{
2    network::EthereumWallet,
3    primitives::{Address, U256},
4    providers::{Provider, ProviderBuilder},
5    signers::local::PrivateKeySigner,
6};
7use clap::Parser;
8use eyre::Context;
9use itertools::Itertools;
10use metrics::{counter, describe_counter};
11use metrics_exporter_prometheus::PrometheusBuilder;
12use poem::{EndpointExt as _, Route, Server, get, listener::TcpListener};
13use std::{collections::HashSet, time::Duration};
14use tempo_precompiles::{
15    TIP_FEE_MANAGER_ADDRESS, TIP20_FACTORY_ADDRESS, tip_fee_manager::ITIPFeeAMM,
16    tip20::token_id_to_address, tip20_factory::ITIP20Factory,
17};
18use tempo_telemetry_util::error_field;
19use tracing::{debug, error, info, instrument};
20
21use crate::monitor;
22
23#[derive(Parser, Debug)]
24#[command(author, version, about, long_about = None)]
25pub struct SimpleArbArgs {
26    /// RPC endpoint for the node
27    #[arg(short, long, required = true)]
28    rpc_url: String,
29
30    /// Private key of the tx sender
31    #[arg(short, long, required = true)]
32    private_key: String,
33
34    /// Interval between checking pools for rebalancing. This should be set to the block time.
35    #[arg(long, default_value_t = 2)]
36    poll_interval: u64,
37
38    /// Prometheus port for metrics
39    #[arg(long, default_value_t = 8000)]
40    metrics_port: u64,
41}
42
43#[instrument(skip(provider))]
44async fn fetch_all_pairs<P: Provider>(provider: P) -> eyre::Result<HashSet<(Address, Address)>> {
45    let tip20_factory = ITIP20Factory::new(TIP20_FACTORY_ADDRESS, provider);
46    let last_token_id = tip20_factory.tokenIdCounter().call().await?.to::<u64>();
47
48    let tokens = (0..last_token_id)
49        .map(token_id_to_address)
50        .collect::<Vec<_>>();
51
52    let mut pairs = HashSet::new();
53    for pair in tokens.iter().permutations(2) {
54        let (token_a, token_b) = (*pair[0], *pair[1]);
55        pairs.insert((token_a, token_b));
56    }
57
58    info!(
59        token_count = tokens.len(),
60        pair_count = pairs.len(),
61        "Fetched token pairs"
62    );
63
64    Ok(pairs)
65}
66
67impl SimpleArbArgs {
68    pub async fn run(self) -> eyre::Result<()> {
69        tracing_subscriber::FmtSubscriber::builder()
70            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
71            .init();
72
73        let builder = PrometheusBuilder::new();
74        let metrics_handle = builder
75            .install_recorder()
76            .context("failed to install recorder")?;
77
78        describe_counter!(
79            "tempo_arb_bot_successful_transactions",
80            "Number of successful transactions executed by the arb bot"
81        );
82        describe_counter!(
83            "tempo_arb_bot_failed_transactions",
84            "Number of failed transactions executed by the arb bot"
85        );
86
87        let app = Route::new().at(
88            "/metrics",
89            get(monitor::prometheus_metrics).data(metrics_handle.clone()),
90        );
91
92        let addr = format!("0.0.0.0:{}", self.metrics_port);
93
94        tokio::spawn(async move {
95            Server::new(TcpListener::bind(addr))
96                .run(app)
97                .await
98                .context("failed to run poem server")
99        });
100
101        let signer = PrivateKeySigner::from_slice(
102            &hex::decode(&self.private_key).context("failed to decode private key")?,
103        )
104        .context("failed to parse private key")?;
105
106        let signer_address = signer.address();
107        let wallet = EthereumWallet::from(signer);
108        let provider = ProviderBuilder::new()
109            .wallet(wallet)
110            .connect_http(self.rpc_url.parse().context("failed to parse RPC URL")?);
111
112        let fee_amm = ITIPFeeAMM::new(TIP_FEE_MANAGER_ADDRESS, provider.clone());
113
114        info!("Fetching all pairs...");
115        let pairs = fetch_all_pairs(provider.clone()).await?;
116
117        info!("Rebalancing initial pools...");
118        for pair in pairs.iter() {
119            // Get current pool state
120            let pool = fee_amm
121                .getPool(pair.0, pair.1)
122                .call()
123                .await
124                .wrap_err_with(|| {
125                    format!("failed to fetch pool for tokens {}, {}", pair.0, pair.1)
126                })?;
127
128            if pool.reserveUserToken > 0
129                && let Err(e) = fee_amm
130                    .rebalanceSwap(
131                        pair.0,
132                        pair.1,
133                        U256::from(pool.reserveUserToken),
134                        signer_address,
135                    )
136                    .send()
137                    .await
138            {
139                error!(
140                    token_a = %pair.0,
141                    token_b = %pair.1,
142                    amount = %pool.reserveUserToken,
143                    err = error_field(&e),
144                    "Failed to send initial rebalance transaction"
145                );
146            }
147        }
148
149        // NOTE: currently this is a very simple approach that checks all pools every `n`
150        // milliseconds. While this should ensure pools are always balanced within a few blocks,
151        // this can be updated to listen to events and only rebalance pools that have been swapped.
152        loop {
153            for pair in pairs.iter() {
154                // Get current pool state
155                let pool = fee_amm
156                    .getPool(pair.0, pair.1)
157                    .call()
158                    .await
159                    .wrap_err_with(|| {
160                        format!("failed to fetch pool for tokens {:?}, {:?}", pair.0, pair.1)
161                    })?;
162
163                if pool.reserveUserToken > 0 {
164                    let mut pending_txs = vec![];
165
166                    match fee_amm
167                        .rebalanceSwap(
168                            pair.0,
169                            pair.1,
170                            U256::from(pool.reserveUserToken),
171                            signer_address,
172                        )
173                        .send()
174                        .await
175                    {
176                        Ok(tx) => {
177                            pending_txs.push(tx);
178                        }
179
180                        Err(e) => {
181                            error!(
182                                token_a = %pair.0,
183                                token_b = %pair.1,
184                                amount = %pool.reserveUserToken,
185                                err = error_field(&e),
186                                "Failed to send rebalance transaction"
187                            );
188
189                            counter!("tempo_arb_bot_failed_transactions", "error" => "tx_send")
190                                .increment(1);
191                        }
192                    }
193
194                    // Await all receipts with timeout
195                    for tx in pending_txs {
196                        match tokio::time::timeout(
197                            Duration::from_secs(self.poll_interval * 2),
198                            tx.get_receipt(),
199                        )
200                        .await
201                        {
202                            Ok(Ok(_)) => {
203                                debug!("Tx receipt received successfully");
204                                counter!("tempo_arb_bot_successful_transactions").increment(1);
205                            }
206                            Ok(Err(e)) => {
207                                error!(err = error_field(&e), "Failed to get tx receipt");
208                                counter!("tempo_arb_bot_failed_transactions", "error" => "fetch_receipt")
209                                    .increment(1);
210                            }
211                            Err(_) => {
212                                error!("Timeout waiting for tx receipt");
213                                counter!("tempo_arb_bot_failed_transactions", "error" => "receipt_timeout")
214                                    .increment(1);
215                            }
216                        }
217                    }
218                }
219            }
220
221            tokio::time::sleep(Duration::from_secs(self.poll_interval)).await;
222            debug!("Polling interval elapsed, checking pools for rebalancing");
223        }
224    }
225}