tempo_sidecar/cmd/
simple_arb.rs1use alloy::{
2 network::EthereumWallet,
3 primitives::{Address, U256},
4 providers::{Provider, ProviderBuilder},
5 signers::local::PrivateKeySigner,
6};
7use clap::Parser;
8use eyre::Context;
9use itertools::Itertools;
10use metrics::{counter, describe_counter};
11use metrics_exporter_prometheus::PrometheusBuilder;
12use poem::{EndpointExt as _, Route, Server, get, listener::TcpListener};
13use std::{collections::HashSet, time::Duration};
14use tempo_precompiles::{
15 TIP_FEE_MANAGER_ADDRESS, TIP20_FACTORY_ADDRESS, tip_fee_manager::ITIPFeeAMM,
16 tip20::token_id_to_address, tip20_factory::ITIP20Factory,
17};
18use tempo_telemetry_util::error_field;
19use tracing::{debug, error, info, instrument};
20
21use crate::monitor;
22
23#[derive(Parser, Debug)]
24#[command(author, version, about, long_about = None)]
25pub struct SimpleArbArgs {
26 #[arg(short, long, required = true)]
28 rpc_url: String,
29
30 #[arg(short, long, required = true)]
32 private_key: String,
33
34 #[arg(long, default_value_t = 2)]
36 poll_interval: u64,
37
38 #[arg(long, default_value_t = 8000)]
40 metrics_port: u64,
41}
42
43#[instrument(skip(provider))]
44async fn fetch_all_pairs<P: Provider>(provider: P) -> eyre::Result<HashSet<(Address, Address)>> {
45 let tip20_factory = ITIP20Factory::new(TIP20_FACTORY_ADDRESS, provider);
46 let last_token_id = tip20_factory.tokenIdCounter().call().await?.to::<u64>();
47
48 let tokens = (0..last_token_id)
49 .map(token_id_to_address)
50 .collect::<Vec<_>>();
51
52 let mut pairs = HashSet::new();
53 for pair in tokens.iter().permutations(2) {
54 let (token_a, token_b) = (*pair[0], *pair[1]);
55 pairs.insert((token_a, token_b));
56 }
57
58 info!(
59 token_count = tokens.len(),
60 pair_count = pairs.len(),
61 "Fetched token pairs"
62 );
63
64 Ok(pairs)
65}
66
67impl SimpleArbArgs {
68 pub async fn run(self) -> eyre::Result<()> {
69 tracing_subscriber::FmtSubscriber::builder()
70 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
71 .init();
72
73 let builder = PrometheusBuilder::new();
74 let metrics_handle = builder
75 .install_recorder()
76 .context("failed to install recorder")?;
77
78 describe_counter!(
79 "tempo_arb_bot_successful_transactions",
80 "Number of successful transactions executed by the arb bot"
81 );
82 describe_counter!(
83 "tempo_arb_bot_failed_transactions",
84 "Number of failed transactions executed by the arb bot"
85 );
86
87 let app = Route::new().at(
88 "/metrics",
89 get(monitor::prometheus_metrics).data(metrics_handle.clone()),
90 );
91
92 let addr = format!("0.0.0.0:{}", self.metrics_port);
93
94 tokio::spawn(async move {
95 Server::new(TcpListener::bind(addr))
96 .run(app)
97 .await
98 .context("failed to run poem server")
99 });
100
101 let signer = PrivateKeySigner::from_slice(
102 &hex::decode(&self.private_key).context("failed to decode private key")?,
103 )
104 .context("failed to parse private key")?;
105
106 let signer_address = signer.address();
107 let wallet = EthereumWallet::from(signer);
108 let provider = ProviderBuilder::new()
109 .wallet(wallet)
110 .connect_http(self.rpc_url.parse().context("failed to parse RPC URL")?);
111
112 let fee_amm = ITIPFeeAMM::new(TIP_FEE_MANAGER_ADDRESS, provider.clone());
113
114 info!("Fetching all pairs...");
115 let pairs = fetch_all_pairs(provider.clone()).await?;
116
117 info!("Rebalancing initial pools...");
118 for pair in pairs.iter() {
119 let pool = fee_amm
121 .getPool(pair.0, pair.1)
122 .call()
123 .await
124 .wrap_err_with(|| {
125 format!("failed to fetch pool for tokens {}, {}", pair.0, pair.1)
126 })?;
127
128 if pool.reserveUserToken > 0
129 && let Err(e) = fee_amm
130 .rebalanceSwap(
131 pair.0,
132 pair.1,
133 U256::from(pool.reserveUserToken),
134 signer_address,
135 )
136 .send()
137 .await
138 {
139 error!(
140 token_a = %pair.0,
141 token_b = %pair.1,
142 amount = %pool.reserveUserToken,
143 err = error_field(&e),
144 "Failed to send initial rebalance transaction"
145 );
146 }
147 }
148
149 loop {
153 for pair in pairs.iter() {
154 let pool = fee_amm
156 .getPool(pair.0, pair.1)
157 .call()
158 .await
159 .wrap_err_with(|| {
160 format!("failed to fetch pool for tokens {:?}, {:?}", pair.0, pair.1)
161 })?;
162
163 if pool.reserveUserToken > 0 {
164 let mut pending_txs = vec![];
165
166 match fee_amm
167 .rebalanceSwap(
168 pair.0,
169 pair.1,
170 U256::from(pool.reserveUserToken),
171 signer_address,
172 )
173 .send()
174 .await
175 {
176 Ok(tx) => {
177 pending_txs.push(tx);
178 }
179
180 Err(e) => {
181 error!(
182 token_a = %pair.0,
183 token_b = %pair.1,
184 amount = %pool.reserveUserToken,
185 err = error_field(&e),
186 "Failed to send rebalance transaction"
187 );
188
189 counter!("tempo_arb_bot_failed_transactions", "error" => "tx_send")
190 .increment(1);
191 }
192 }
193
194 for tx in pending_txs {
196 match tokio::time::timeout(
197 Duration::from_secs(self.poll_interval * 2),
198 tx.get_receipt(),
199 )
200 .await
201 {
202 Ok(Ok(_)) => {
203 debug!("Tx receipt received successfully");
204 counter!("tempo_arb_bot_successful_transactions").increment(1);
205 }
206 Ok(Err(e)) => {
207 error!(err = error_field(&e), "Failed to get tx receipt");
208 counter!("tempo_arb_bot_failed_transactions", "error" => "fetch_receipt")
209 .increment(1);
210 }
211 Err(_) => {
212 error!("Timeout waiting for tx receipt");
213 counter!("tempo_arb_bot_failed_transactions", "error" => "receipt_timeout")
214 .increment(1);
215 }
216 }
217 }
218 }
219 }
220
221 tokio::time::sleep(Duration::from_secs(self.poll_interval)).await;
222 debug!("Polling interval elapsed, checking pools for rebalancing");
223 }
224 }
225}