tempo_sidecar/cmd/
simple_arb.rs1use alloy::{
2 network::EthereumWallet,
3 primitives::{Address, U256},
4 providers::{Provider, ProviderBuilder},
5 rpc::types::Filter,
6 signers::local::PrivateKeySigner,
7 sol_types::SolEvent,
8};
9use clap::Parser;
10use eyre::Context;
11use itertools::Itertools;
12use metrics::{counter, describe_counter};
13use metrics_exporter_prometheus::PrometheusBuilder;
14use poem::{EndpointExt as _, Route, Server, get, listener::TcpListener};
15use std::{collections::HashSet, time::Duration};
16use tempo_precompiles::{
17 TIP_FEE_MANAGER_ADDRESS, TIP20_FACTORY_ADDRESS, tip_fee_manager::ITIPFeeAMM,
18 tip20_factory::ITIP20Factory,
19};
20use tempo_telemetry_util::error_field;
21use tracing::{debug, error, info, instrument};
22
23use crate::monitor;
24
25#[derive(Parser, Debug)]
26#[command(author, version, about, long_about = None)]
27pub struct SimpleArbArgs {
28 #[arg(short, long, required = true)]
30 rpc_url: String,
31
32 #[arg(short, long, required = true)]
34 private_key: String,
35
36 #[arg(long, default_value_t = 2)]
38 poll_interval: u64,
39
40 #[arg(long, default_value_t = 8000)]
42 metrics_port: u64,
43}
44
45#[instrument(skip(provider))]
46async fn fetch_all_pairs<P: Provider>(provider: P) -> eyre::Result<HashSet<(Address, Address)>> {
47 let filter = Filter::new()
48 .address(TIP20_FACTORY_ADDRESS)
49 .event_signature(ITIP20Factory::TokenCreated::SIGNATURE_HASH);
50
51 let logs = provider.get_logs(&filter).await?;
52
53 let tokens: Vec<Address> = logs
54 .iter()
55 .filter_map(|log| {
56 log.log_decode::<ITIP20Factory::TokenCreated>()
57 .ok()
58 .map(|event| event.inner.token)
59 })
60 .collect();
61
62 let mut pairs = HashSet::new();
63 for pair in tokens.iter().permutations(2) {
64 let (token_a, token_b) = (*pair[0], *pair[1]);
65 pairs.insert((token_a, token_b));
66 }
67
68 info!(
69 token_count = tokens.len(),
70 pair_count = pairs.len(),
71 "Fetched token pairs"
72 );
73
74 Ok(pairs)
75}
76
77impl SimpleArbArgs {
78 pub async fn run(self) -> eyre::Result<()> {
79 tracing_subscriber::FmtSubscriber::builder()
80 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
81 .init();
82
83 let builder = PrometheusBuilder::new();
84 let metrics_handle = builder
85 .install_recorder()
86 .context("failed to install recorder")?;
87
88 describe_counter!(
89 "tempo_arb_bot_successful_transactions",
90 "Number of successful transactions executed by the arb bot"
91 );
92 describe_counter!(
93 "tempo_arb_bot_failed_transactions",
94 "Number of failed transactions executed by the arb bot"
95 );
96
97 let app = Route::new().at(
98 "/metrics",
99 get(monitor::prometheus_metrics).data(metrics_handle.clone()),
100 );
101
102 let addr = format!("0.0.0.0:{}", self.metrics_port);
103
104 tokio::spawn(async move {
105 Server::new(TcpListener::bind(addr))
106 .run(app)
107 .await
108 .context("failed to run poem server")
109 });
110
111 let signer = PrivateKeySigner::from_slice(
112 &hex::decode(&self.private_key).context("failed to decode private key")?,
113 )
114 .context("failed to parse private key")?;
115
116 let signer_address = signer.address();
117 let wallet = EthereumWallet::from(signer);
118 let provider = ProviderBuilder::new()
119 .wallet(wallet)
120 .connect_http(self.rpc_url.parse().context("failed to parse RPC URL")?);
121
122 let fee_amm = ITIPFeeAMM::new(TIP_FEE_MANAGER_ADDRESS, provider.clone());
123
124 info!("Fetching all pairs...");
125 let pairs = fetch_all_pairs(provider.clone()).await?;
126
127 info!("Rebalancing initial pools...");
128 for pair in pairs.iter() {
129 let pool = fee_amm
131 .getPool(pair.0, pair.1)
132 .call()
133 .await
134 .wrap_err_with(|| {
135 format!("failed to fetch pool for tokens {}, {}", pair.0, pair.1)
136 })?;
137
138 if pool.reserveUserToken > 0
139 && let Err(e) = fee_amm
140 .rebalanceSwap(
141 pair.0,
142 pair.1,
143 U256::from(pool.reserveUserToken),
144 signer_address,
145 )
146 .send()
147 .await
148 {
149 error!(
150 token_a = %pair.0,
151 token_b = %pair.1,
152 amount = %pool.reserveUserToken,
153 err = error_field(&e),
154 "Failed to send initial rebalance transaction"
155 );
156 }
157 }
158
159 loop {
163 for pair in pairs.iter() {
164 let pool = fee_amm
166 .getPool(pair.0, pair.1)
167 .call()
168 .await
169 .wrap_err_with(|| {
170 format!("failed to fetch pool for tokens {:?}, {:?}", pair.0, pair.1)
171 })?;
172
173 if pool.reserveUserToken > 0 {
174 let mut pending_txs = vec![];
175
176 match fee_amm
177 .rebalanceSwap(
178 pair.0,
179 pair.1,
180 U256::from(pool.reserveUserToken),
181 signer_address,
182 )
183 .send()
184 .await
185 {
186 Ok(tx) => {
187 pending_txs.push(tx);
188 }
189
190 Err(e) => {
191 error!(
192 token_a = %pair.0,
193 token_b = %pair.1,
194 amount = %pool.reserveUserToken,
195 err = error_field(&e),
196 "Failed to send rebalance transaction"
197 );
198
199 counter!("tempo_arb_bot_failed_transactions", "error" => "tx_send")
200 .increment(1);
201 }
202 }
203
204 for tx in pending_txs {
206 match tokio::time::timeout(
207 Duration::from_secs(self.poll_interval * 2),
208 tx.get_receipt(),
209 )
210 .await
211 {
212 Ok(Ok(_)) => {
213 debug!("Tx receipt received successfully");
214 counter!("tempo_arb_bot_successful_transactions").increment(1);
215 }
216 Ok(Err(e)) => {
217 error!(err = error_field(&e), "Failed to get tx receipt");
218 counter!("tempo_arb_bot_failed_transactions", "error" => "fetch_receipt")
219 .increment(1);
220 }
221 Err(_) => {
222 error!("Timeout waiting for tx receipt");
223 counter!("tempo_arb_bot_failed_transactions", "error" => "receipt_timeout")
224 .increment(1);
225 }
226 }
227 }
228 }
229 }
230
231 tokio::time::sleep(Duration::from_secs(self.poll_interval)).await;
232 debug!("Polling interval elapsed, checking pools for rebalancing");
233 }
234 }
235}