// Source file: tempo_sidecar/cmd/simple_arb.rs

1use alloy::{
2    network::EthereumWallet,
3    primitives::{Address, U256},
4    providers::{Provider, ProviderBuilder},
5    rpc::types::Filter,
6    signers::local::PrivateKeySigner,
7    sol_types::SolEvent,
8};
9use clap::Parser;
10use eyre::Context;
11use itertools::Itertools;
12use metrics::{counter, describe_counter};
13use metrics_exporter_prometheus::PrometheusBuilder;
14use poem::{EndpointExt as _, Route, Server, get, listener::TcpListener};
15use std::{collections::HashSet, time::Duration};
16use tempo_precompiles::{
17    TIP_FEE_MANAGER_ADDRESS, TIP20_FACTORY_ADDRESS, tip_fee_manager::ITIPFeeAMM,
18    tip20_factory::ITIP20Factory,
19};
20use tempo_telemetry_util::error_field;
21use tracing::{debug, error, info, instrument};
22
23use crate::monitor;
24
25#[derive(Parser, Debug)]
26#[command(author, version, about, long_about = None)]
27pub struct SimpleArbArgs {
28    /// RPC endpoint for the node
29    #[arg(short, long, required = true)]
30    rpc_url: String,
31
32    /// Private key of the tx sender
33    #[arg(short, long, required = true)]
34    private_key: String,
35
36    /// Interval between checking pools for rebalancing. This should be set to the block time.
37    #[arg(long, default_value_t = 2)]
38    poll_interval: u64,
39
40    /// Prometheus port for metrics
41    #[arg(long, default_value_t = 8000)]
42    metrics_port: u64,
43}
44
45#[instrument(skip(provider))]
46async fn fetch_all_pairs<P: Provider>(provider: P) -> eyre::Result<HashSet<(Address, Address)>> {
47    let filter = Filter::new()
48        .address(TIP20_FACTORY_ADDRESS)
49        .event_signature(ITIP20Factory::TokenCreated::SIGNATURE_HASH);
50
51    let logs = provider.get_logs(&filter).await?;
52
53    let tokens: Vec<Address> = logs
54        .iter()
55        .filter_map(|log| {
56            log.log_decode::<ITIP20Factory::TokenCreated>()
57                .ok()
58                .map(|event| event.inner.token)
59        })
60        .collect();
61
62    let mut pairs = HashSet::new();
63    for pair in tokens.iter().permutations(2) {
64        let (token_a, token_b) = (*pair[0], *pair[1]);
65        pairs.insert((token_a, token_b));
66    }
67
68    info!(
69        token_count = tokens.len(),
70        pair_count = pairs.len(),
71        "Fetched token pairs"
72    );
73
74    Ok(pairs)
75}
76
77impl SimpleArbArgs {
78    pub async fn run(self) -> eyre::Result<()> {
79        tracing_subscriber::FmtSubscriber::builder()
80            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
81            .init();
82
83        let builder = PrometheusBuilder::new();
84        let metrics_handle = builder
85            .install_recorder()
86            .context("failed to install recorder")?;
87
88        describe_counter!(
89            "tempo_arb_bot_successful_transactions",
90            "Number of successful transactions executed by the arb bot"
91        );
92        describe_counter!(
93            "tempo_arb_bot_failed_transactions",
94            "Number of failed transactions executed by the arb bot"
95        );
96
97        let app = Route::new().at(
98            "/metrics",
99            get(monitor::prometheus_metrics).data(metrics_handle.clone()),
100        );
101
102        let addr = format!("0.0.0.0:{}", self.metrics_port);
103
104        tokio::spawn(async move {
105            Server::new(TcpListener::bind(addr))
106                .run(app)
107                .await
108                .context("failed to run poem server")
109        });
110
111        let signer = PrivateKeySigner::from_slice(
112            &hex::decode(&self.private_key).context("failed to decode private key")?,
113        )
114        .context("failed to parse private key")?;
115
116        let signer_address = signer.address();
117        let wallet = EthereumWallet::from(signer);
118        let provider = ProviderBuilder::new()
119            .wallet(wallet)
120            .connect_http(self.rpc_url.parse().context("failed to parse RPC URL")?);
121
122        let fee_amm = ITIPFeeAMM::new(TIP_FEE_MANAGER_ADDRESS, provider.clone());
123
124        info!("Fetching all pairs...");
125        let pairs = fetch_all_pairs(provider.clone()).await?;
126
127        info!("Rebalancing initial pools...");
128        for pair in pairs.iter() {
129            // Get current pool state
130            let pool = fee_amm
131                .getPool(pair.0, pair.1)
132                .call()
133                .await
134                .wrap_err_with(|| {
135                    format!("failed to fetch pool for tokens {}, {}", pair.0, pair.1)
136                })?;
137
138            if pool.reserveUserToken > 0
139                && let Err(e) = fee_amm
140                    .rebalanceSwap(
141                        pair.0,
142                        pair.1,
143                        U256::from(pool.reserveUserToken),
144                        signer_address,
145                    )
146                    .send()
147                    .await
148            {
149                error!(
150                    token_a = %pair.0,
151                    token_b = %pair.1,
152                    amount = %pool.reserveUserToken,
153                    err = error_field(&e),
154                    "Failed to send initial rebalance transaction"
155                );
156            }
157        }
158
159        // NOTE: currently this is a very simple approach that checks all pools every `n`
160        // milliseconds. While this should ensure pools are always balanced within a few blocks,
161        // this can be updated to listen to events and only rebalance pools that have been swapped.
162        loop {
163            for pair in pairs.iter() {
164                // Get current pool state
165                let pool = fee_amm
166                    .getPool(pair.0, pair.1)
167                    .call()
168                    .await
169                    .wrap_err_with(|| {
170                        format!("failed to fetch pool for tokens {:?}, {:?}", pair.0, pair.1)
171                    })?;
172
173                if pool.reserveUserToken > 0 {
174                    let mut pending_txs = vec![];
175
176                    match fee_amm
177                        .rebalanceSwap(
178                            pair.0,
179                            pair.1,
180                            U256::from(pool.reserveUserToken),
181                            signer_address,
182                        )
183                        .send()
184                        .await
185                    {
186                        Ok(tx) => {
187                            pending_txs.push(tx);
188                        }
189
190                        Err(e) => {
191                            error!(
192                                token_a = %pair.0,
193                                token_b = %pair.1,
194                                amount = %pool.reserveUserToken,
195                                err = error_field(&e),
196                                "Failed to send rebalance transaction"
197                            );
198
199                            counter!("tempo_arb_bot_failed_transactions", "error" => "tx_send")
200                                .increment(1);
201                        }
202                    }
203
204                    // Await all receipts with timeout
205                    for tx in pending_txs {
206                        match tokio::time::timeout(
207                            Duration::from_secs(self.poll_interval * 2),
208                            tx.get_receipt(),
209                        )
210                        .await
211                        {
212                            Ok(Ok(_)) => {
213                                debug!("Tx receipt received successfully");
214                                counter!("tempo_arb_bot_successful_transactions").increment(1);
215                            }
216                            Ok(Err(e)) => {
217                                error!(err = error_field(&e), "Failed to get tx receipt");
218                                counter!("tempo_arb_bot_failed_transactions", "error" => "fetch_receipt")
219                                    .increment(1);
220                            }
221                            Err(_) => {
222                                error!("Timeout waiting for tx receipt");
223                                counter!("tempo_arb_bot_failed_transactions", "error" => "receipt_timeout")
224                                    .increment(1);
225                            }
226                        }
227                    }
228                }
229            }
230
231            tokio::time::sleep(Duration::from_secs(self.poll_interval)).await;
232            debug!("Polling interval elapsed, checking pools for rebalancing");
233        }
234    }
235}