tempo_bench/cmd/
max_tps.rs

1mod dex;
2mod erc20;
3
4use alloy_consensus::Transaction;
5use itertools::Itertools;
6use rayon::iter::{IntoParallelIterator, ParallelIterator};
7use reth_tracing::{
8    RethTracer, Tracer,
9    tracing::{debug, error, info},
10};
11use tempo_alloy::{
12    TempoNetwork, primitives::TempoTxEnvelope, provider::ext::TempoProviderBuilderExt,
13};
14
15use alloy::{
16    consensus::BlockHeader,
17    eips::Encodable2718,
18    network::{ReceiptResponse, TransactionBuilder, TxSignerSync},
19    primitives::{Address, B256, BlockNumber, U256},
20    providers::{
21        DynProvider, PendingTransactionBuilder, PendingTransactionError, Provider, ProviderBuilder,
22        SendableTx, WatchTxError, fillers::TxFiller,
23    },
24    rpc::client::NoParams,
25    signers::local::{
26        PrivateKeySigner,
27        coins_bip39::{English, Mnemonic, MnemonicError},
28    },
29    transports::http::reqwest::Url,
30};
31use clap::Parser;
32use eyre::{Context, OptionExt, ensure};
33use futures::{
34    FutureExt, StreamExt, TryStreamExt,
35    future::BoxFuture,
36    stream::{self},
37};
38use governor::{Quota, RateLimiter, state::StreamRateLimitExt};
39use indicatif::{ParallelProgressIterator, ProgressBar, ProgressIterator};
40use rand::{random_range, seq::IndexedRandom};
41use rlimit::Resource;
42use serde::Serialize;
43use std::{
44    collections::VecDeque,
45    fs::File,
46    io::BufWriter,
47    num::{NonZeroU32, NonZeroU64},
48    str::FromStr,
49    sync::{
50        Arc, OnceLock,
51        atomic::{AtomicUsize, Ordering},
52    },
53    thread,
54    time::Duration,
55};
56use tempo_contracts::precompiles::{
57    IFeeManager::IFeeManagerInstance,
58    IRolesAuth,
59    IStablecoinExchange::IStablecoinExchangeInstance,
60    ITIP20::{self, ITIP20Instance},
61    ITIP20Factory, STABLECOIN_EXCHANGE_ADDRESS, TIP20_FACTORY_ADDRESS,
62};
63use tempo_precompiles::{
64    DEFAULT_FEE_TOKEN_PRE_ALLEGRETTO, TIP_FEE_MANAGER_ADDRESS,
65    stablecoin_exchange::{MAX_TICK, MIN_ORDER_AMOUNT, MIN_TICK, TICK_SPACING},
66    tip20::{ISSUER_ROLE, token_id_to_address},
67};
68use tokio::{
69    select,
70    time::{Sleep, interval, sleep},
71};
72use tokio_util::sync::CancellationToken;
73
74use crate::cmd::signer_providers::SignerProviderManager;
75
76/// Run maximum TPS throughput benchmarking
77#[derive(Parser, Debug)]
78pub struct MaxTpsArgs {
79    /// Target transactions per second
80    #[arg(short, long)]
81    tps: u64,
82
83    /// Test duration in seconds
84    #[arg(short, long, default_value_t = 30)]
85    duration: u64,
86
87    /// Number of accounts for pre-generation
88    #[arg(short, long, default_value_t = NonZeroU64::new(100).unwrap())]
89    accounts: NonZeroU64,
90
91    /// Mnemonic for generating accounts
92    #[arg(short, long, default_value = "random")]
93    mnemonic: MnemonicArg,
94
95    #[arg(short, long, default_value_t = 0)]
96    from_mnemonic_index: u32,
97
98    #[arg(long, default_value_t = DEFAULT_FEE_TOKEN_PRE_ALLEGRETTO)]
99    fee_token: Address,
100
101    /// Target URLs for network connections
102    #[arg(long, default_values_t = vec!["http://localhost:8545".parse::<Url>().unwrap()])]
103    target_urls: Vec<Url>,
104
105    /// A limit of the maximum number of concurrent requests, prevents issues with too many
106    /// connections open at once.
107    #[arg(long, default_value_t = 100)]
108    max_concurrent_requests: usize,
109
110    /// A number of transaction to send, before waiting for their receipts, that should be likely
111    /// safe.
112    ///
113    /// Large amount of transactions in a block will result in system transaction OutOfGas error.
114    #[arg(long, default_value_t = 10000)]
115    max_concurrent_transactions: usize,
116
117    /// File descriptor limit to set
118    #[arg(long)]
119    fd_limit: Option<u64>,
120
121    /// Node commit SHA for metadata
122    #[arg(long)]
123    node_commit_sha: Option<String>,
124
125    /// Build profile for metadata (e.g., "release", "debug", "maxperf")
126    #[arg(long)]
127    build_profile: Option<String>,
128
129    /// Benchmark mode for metadata (e.g., "max_tps", "stress_test")
130    #[arg(long)]
131    benchmark_mode: Option<String>,
132
133    /// A weight that determines the likelihood of generating a TIP-20 transfer transaction.
134    #[arg(long, default_value_t = 0.8)]
135    tip20_weight: f64,
136
137    /// A weight that determines the likelihood of generating a DEX place transaction.
138    #[arg(long, default_value_t = 0.01)]
139    place_order_weight: f64,
140
141    /// A weight that determines the likelihood of generating a DEX swapExactAmountIn transaction.
142    #[arg(long, default_value_t = 0.19)]
143    swap_weight: f64,
144
145    /// A weight that determines the likelihood of generating an ERC-20 transfer transaction.
146    #[arg(long, default_value_t = 0.0)]
147    erc20_weight: f64,
148
149    /// An amount of receipts to wait for after sending all the transactions.
150    #[arg(long, default_value_t = 100)]
151    sample_size: usize,
152
153    /// Fund accounts from the faucet before running the benchmark.
154    ///
155    /// Calls tempo_fundAddress for each account.
156    #[arg(long)]
157    faucet: bool,
158
159    /// Clear the transaction pool before running the benchmark.
160    ///
161    /// Calls admin_clearTxpool.
162    #[arg(long)]
163    clear_txpool: bool,
164
165    /// Disable 2D nonces
166    #[arg(long)]
167    disable_2d_nonces: bool,
168}
169
170impl MaxTpsArgs {
171    const WEIGHT_PRECISION: f64 = 1000.0;
172
173    pub async fn run(self) -> eyre::Result<()> {
174        RethTracer::new().init()?;
175
176        let accounts = self.accounts.get();
177
178        // Set file descriptor limit if provided
179        if let Some(fd_limit) = self.fd_limit {
180            increase_nofile_limit(fd_limit).context("Failed to increase nofile limit")?;
181        }
182
183        let signer_provider_factory = Box::new(|signer, target_url, cached_nonce_manager| {
184            ProviderBuilder::default()
185                .fetch_chain_id()
186                .with_gas_estimation()
187                .with_nonce_management(cached_nonce_manager)
188                .wallet(signer)
189                .connect_http(target_url)
190                .erased()
191        });
192
193        if self.disable_2d_nonces {
194            info!(
195                accounts = self.accounts,
196                "Creating signers (with standard nonces)"
197            );
198            let signer_provider_manager = SignerProviderManager::new(
199                self.mnemonic.resolve(),
200                self.from_mnemonic_index,
201                accounts,
202                self.target_urls.clone(),
203                Box::new(|target_url, cached_nonce_manager| {
204                    ProviderBuilder::default()
205                        .fetch_chain_id()
206                        .with_gas_estimation()
207                        .with_nonce_management(cached_nonce_manager)
208                        .connect_http(target_url)
209                }),
210                signer_provider_factory,
211            );
212            self.run_with_manager(signer_provider_manager).await
213        } else {
214            info!(
215                accounts = self.accounts,
216                "Creating signers (with 2D nonces)"
217            );
218            let signer_provider_manager = SignerProviderManager::new(
219                self.mnemonic.resolve(),
220                self.from_mnemonic_index,
221                accounts,
222                self.target_urls.clone(),
223                Box::new(|target_url, _cached_nonce_manager| {
224                    ProviderBuilder::new_with_network::<TempoNetwork>()
225                        .with_random_2d_nonces()
226                        .connect_http(target_url)
227                }),
228                signer_provider_factory,
229            );
230            self.run_with_manager(signer_provider_manager).await
231        }
232    }
233
234    async fn run_with_manager<F: TxFiller<TempoNetwork> + 'static>(
235        self,
236        signer_provider_manager: SignerProviderManager<F>,
237    ) -> eyre::Result<()> {
238        let accounts = self.accounts.get();
239        let signer_providers = signer_provider_manager.signer_providers();
240
241        if self.clear_txpool {
242            for (target_url, provider) in signer_provider_manager.target_url_providers() {
243                let transactions: u64 = provider
244                    .raw_request("admin_clearTxpool".into(), NoParams::default())
245                    .await
246                    .context(
247                        format!("Failed to clear transaction pool for {target_url}. Is `admin_clearTxpool` RPC method available?"),
248                    )?;
249                info!(%target_url, transactions, "Cleared transaction pool");
250            }
251        }
252
253        // Grab first provider to call some RPC methods
254        let provider = signer_providers[0].1.clone();
255
256        // Fund accounts from faucet if requested
257        if self.faucet {
258            fund_accounts(
259                &provider,
260                &signer_providers
261                    .iter()
262                    .map(|(signer, _)| signer.address())
263                    .collect::<Vec<_>>(),
264                self.max_concurrent_requests,
265                self.max_concurrent_transactions,
266            )
267            .await
268            .context("Failed to fund accounts from faucet")?;
269        }
270
271        info!(fee_token = %self.fee_token, "Setting default fee token");
272        join_all(
273            signer_providers
274                .iter()
275                .map(async |(_, provider)| {
276                    IFeeManagerInstance::new(TIP_FEE_MANAGER_ADDRESS, provider.clone())
277                        .setUserToken(self.fee_token)
278                        .send()
279                        .await
280                })
281                .progress(),
282            self.max_concurrent_requests,
283            self.max_concurrent_transactions,
284        )
285        .await
286        .context("Failed to set default fee token")?;
287
288        // Setup DEX
289        let user_tokens = 2;
290        info!(user_tokens, "Setting up DEX");
291        let (quote_token, user_tokens) = dex::setup(
292            signer_providers,
293            user_tokens,
294            self.max_concurrent_requests,
295            self.max_concurrent_transactions,
296        )
297        .await?;
298
299        let erc20_tokens = if self.erc20_weight > 0.0 {
300            let num_erc20_tokens = 1;
301            info!(num_erc20_tokens, "Setting up ERC-20 tokens");
302            erc20::setup(
303                signer_providers,
304                num_erc20_tokens,
305                self.max_concurrent_requests,
306                self.max_concurrent_transactions,
307            )
308            .await?
309        } else {
310            Vec::new()
311        };
312
313        // Generate all transactions
314        let total_txs = self.tps * self.duration;
315        let tip20_weight = (self.tip20_weight * Self::WEIGHT_PRECISION).trunc() as u64;
316        let place_order_weight = (self.place_order_weight * Self::WEIGHT_PRECISION).trunc() as u64;
317        let swap_weight = (self.swap_weight * Self::WEIGHT_PRECISION).trunc() as u64;
318        let erc20_weight = (self.erc20_weight * Self::WEIGHT_PRECISION).trunc() as u64;
319        let transactions = generate_transactions(GenerateTransactionsInput {
320            total_txs,
321            accounts,
322            signer_provider_manager: signer_provider_manager.clone(),
323            max_concurrent_requests: self.max_concurrent_requests,
324            tip20_weight,
325            place_order_weight,
326            swap_weight,
327            erc20_weight,
328            quote_token,
329            user_tokens,
330            erc20_tokens,
331        })
332        .await
333        .context("Failed to generate transactions")?;
334
335        // Send transactions
336        let mut pending_txs = send_transactions(
337            transactions,
338            signer_provider_manager.clone(),
339            self.max_concurrent_requests,
340            self.tps,
341            sleep(Duration::from_secs(self.duration)),
342        )
343        .await;
344        let end_block_number = provider.get_block_number().await?;
345
346        info!("Retrieving first block number from sent transactions");
347        let start_block_number = loop {
348            if let Some(first_tx) = pending_txs.pop_front() {
349                debug!(hash = %first_tx.tx_hash(), "Retrieving transaction receipt for first block number");
350                if let Ok(first_tx_receipt) = first_tx
351                    .with_timeout(Some(Duration::from_secs(5)))
352                    .get_receipt()
353                    .await
354                {
355                    break first_tx_receipt.block_number;
356                }
357            } else {
358                break None;
359            }
360        };
361        let Some(start_block_number) = start_block_number else {
362            eyre::bail!("Failed to retrieve start block number")
363        };
364
365        // Collect a sample of receipts and print the stats
366        let sample_size = pending_txs.len().min(self.sample_size);
367        let successful = Arc::new(AtomicUsize::new(0));
368        let timeout = Arc::new(AtomicUsize::new(0));
369        let failed = Arc::new(AtomicUsize::new(0));
370        info!(sample_size, "Collecting a sample of receipts");
371        stream::iter(0..sample_size)
372            .map(|_| {
373                let idx = random_range(0..pending_txs.len());
374                pending_txs.remove(idx).expect("index is in range")
375            })
376            .map(|pending_tx| {
377                let hash = *pending_tx.tx_hash();
378                pending_tx
379                    .with_timeout(Some(Duration::from_secs(5)))
380                    .get_receipt()
381                    .map(move |result| (hash, result))
382            })
383            .buffered(self.max_concurrent_requests)
384            .for_each(async |(hash, result)| match result {
385                Ok(_) => {
386                    successful.fetch_add(1, Ordering::Relaxed);
387                }
388                Err(PendingTransactionError::TxWatcher(WatchTxError::Timeout)) => {
389                    timeout.fetch_add(1, Ordering::Relaxed);
390                    error!(?hash, "Transaction receipt retrieval timed out");
391                }
392                Err(err) => {
393                    failed.fetch_add(1, Ordering::Relaxed);
394                    error!(?hash, "Transaction receipt retrieval failed: {}", err);
395                }
396            })
397            .await;
398        info!(
399            successful = successful.load(Ordering::Relaxed),
400            timeout = timeout.load(Ordering::Relaxed),
401            failed = failed.load(Ordering::Relaxed),
402            "Collected a sample of receipts"
403        );
404
405        generate_report(provider, start_block_number, end_block_number, &self).await?;
406
407        Ok(())
408    }
409}
410
411#[derive(Debug, Clone)]
412enum MnemonicArg {
413    Mnemonic(String),
414    Random,
415}
416
417impl FromStr for MnemonicArg {
418    type Err = MnemonicError;
419
420    fn from_str(s: &str) -> Result<Self, Self::Err> {
421        match s {
422            "random" => Ok(MnemonicArg::Random),
423            mnemonic => Ok(MnemonicArg::Mnemonic(
424                Mnemonic::<English>::from_str(mnemonic)?.to_phrase(),
425            )),
426        }
427    }
428}
429
430impl MnemonicArg {
431    fn resolve(&self) -> String {
432        match self {
433            MnemonicArg::Mnemonic(mnemonic) => mnemonic.clone(),
434            MnemonicArg::Random => Mnemonic::<English>::new(&mut rand_08::thread_rng()).to_phrase(),
435        }
436    }
437}
438
439/// Awaits pending transactions with up to `tps` per second and `max_concurrent_requests` simultaneous in-flight requests. Stops when `deadline` future resolves.
440async fn send_transactions<F: TxFiller<TempoNetwork> + 'static>(
441    transactions: Vec<Vec<u8>>,
442    signer_provider_manager: SignerProviderManager<F>,
443    max_concurrent_requests: usize,
444    tps: u64,
445    deadline: Sleep,
446) -> VecDeque<PendingTransactionBuilder<TempoNetwork>> {
447    info!(
448        transactions = transactions.len(),
449        max_concurrent_requests, tps, "Sending transactions"
450    );
451
452    // Create shared transaction counter and monitoring
453    let tx_counter = Arc::new(AtomicUsize::new(0));
454
455    // Spawn monitoring task for TPS reporting
456    let cancel = CancellationToken::new();
457    let _drop_guard = cancel.clone().drop_guard();
458    tokio::spawn(monitor_tps(
459        tx_counter.clone(),
460        transactions.len(),
461        cancel.clone(),
462    ));
463
464    // Create a rate limiter
465    let rate_limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(tps as u32).unwrap()));
466
467    let failed = Arc::new(AtomicUsize::new(0));
468    let timeout = Arc::new(AtomicUsize::new(0));
469    let transactions = stream::iter(transactions)
470        .ratelimit_stream(&rate_limiter)
471        .zip(stream::repeat_with(|| {
472            signer_provider_manager.random_unsigned_provider()
473        }))
474        .map(|(bytes, provider)| async move {
475            tokio::time::timeout(
476                Duration::from_secs(1),
477                provider.send_raw_transaction(&bytes),
478            )
479            .await
480        })
481        .buffer_unordered(max_concurrent_requests)
482        .filter_map(|result| async {
483            match result {
484                Ok(Ok(pending_tx)) => {
485                    tx_counter.fetch_add(1, Ordering::Relaxed);
486                    Some(pending_tx)
487                }
488                Ok(Err(err)) => {
489                    failed.fetch_add(1, Ordering::Relaxed);
490                    debug!(?err, "Failed to send transaction");
491                    None
492                }
493                Err(_) => {
494                    timeout.fetch_add(1, Ordering::Relaxed);
495                    debug!("Transaction sending timed out");
496                    None
497                }
498            }
499        })
500        .take_until(deadline)
501        .collect()
502        .await;
503
504    info!(
505        success = tx_counter.load(Ordering::Relaxed),
506        failed = failed.load(Ordering::Relaxed),
507        timeout = timeout.load(Ordering::Relaxed),
508        "Finished sending transactions"
509    );
510
511    transactions
512}
513
514async fn generate_transactions<F: TxFiller<TempoNetwork> + 'static>(
515    input: GenerateTransactionsInput<F>,
516) -> eyre::Result<Vec<Vec<u8>>> {
517    let GenerateTransactionsInput {
518        total_txs,
519        accounts,
520        signer_provider_manager,
521        max_concurrent_requests,
522        tip20_weight,
523        place_order_weight,
524        swap_weight,
525        erc20_weight,
526        quote_token,
527        user_tokens,
528        erc20_tokens,
529    } = input;
530
531    let txs_per_sender = total_txs / accounts;
532    ensure!(
533        txs_per_sender > 0,
534        "txs per sender is 0, increase tps or decrease senders"
535    );
536
537    info!(transactions = total_txs, "Generating transactions");
538
539    const TX_TYPES: usize = 4;
540    // Weights for random sampling for each transaction type
541    let tx_weights: [u64; TX_TYPES] = [tip20_weight, swap_weight, place_order_weight, erc20_weight];
542    // Cached gas estimates for each transaction type
543    let gas_estimates: [Arc<OnceLock<(u128, u128, u64)>>; TX_TYPES] = Default::default();
544
545    // Counters for number of transactions of each type
546    let tip20_transfers = Arc::new(AtomicUsize::new(0));
547    let swaps = Arc::new(AtomicUsize::new(0));
548    let orders = Arc::new(AtomicUsize::new(0));
549    let erc20_transfers = Arc::new(AtomicUsize::new(0));
550
551    let builders = ProgressBar::new(total_txs)
552        .wrap_stream(stream::iter(
553            std::iter::repeat_with(|| signer_provider_manager.random_unsigned_provider())
554                .take(total_txs as usize),
555        ))
556        .map(async |provider| {
557            let token = user_tokens.choose(&mut rand::rng()).copied().unwrap();
558
559            // TODO: can be improved with an enum per transaction type
560            let tx_index = tx_weights
561                .iter()
562                .enumerate()
563                .collect::<Vec<_>>()
564                .choose_weighted(&mut rand::rng(), |(_, weight)| *weight)?
565                .0;
566
567            let mut tx = match tx_index {
568                0 => {
569                    tip20_transfers.fetch_add(1, Ordering::Relaxed);
570                    let token = ITIP20Instance::new(token, provider.clone());
571
572                    // Transfer minimum possible amount
573                    token
574                        .transfer(Address::random(), U256::ONE)
575                        .into_transaction_request()
576                }
577                1 => {
578                    swaps.fetch_add(1, Ordering::Relaxed);
579                    let exchange = IStablecoinExchangeInstance::new(
580                        STABLECOIN_EXCHANGE_ADDRESS,
581                        provider.clone(),
582                    );
583
584                    // Swap minimum possible amount
585                    exchange
586                        .quoteSwapExactAmountIn(token, quote_token, 1)
587                        .into_transaction_request()
588                }
589                2 => {
590                    orders.fetch_add(1, Ordering::Relaxed);
591                    let exchange = IStablecoinExchangeInstance::new(
592                        STABLECOIN_EXCHANGE_ADDRESS,
593                        provider.clone(),
594                    );
595
596                    // Place an order at a random tick that's a multiple of `TICK_SPACING`
597                    let tick =
598                        rand::random_range(MIN_TICK / TICK_SPACING..=MAX_TICK / TICK_SPACING)
599                            * TICK_SPACING;
600                    exchange
601                        .place(token, MIN_ORDER_AMOUNT, true, tick)
602                        .into_transaction_request()
603                }
604                3 => {
605                    erc20_transfers.fetch_add(1, Ordering::Relaxed);
606                    let token_address = erc20_tokens.choose(&mut rand::rng()).copied().unwrap();
607                    let token = erc20::MockERC20::new(token_address, provider.clone());
608
609                    // Transfer minimum possible amount
610                    token
611                        .transfer(Address::random(), U256::ONE)
612                        .into_transaction_request()
613                }
614                _ => unreachable!("Only {TX_TYPES} transaction types are supported"),
615            };
616
617            // Get a random signer and set it as the sender of the transaction.
618            let signer = signer_provider_manager.random_signer();
619            tx.inner.set_from(signer.address());
620
621            let gas = &gas_estimates[tx_index];
622            // If we already filled the gas fields once for that transaction type, use it.
623            // This will skip the gas filler.
624            if let Some((max_fee_per_gas, max_priority_fee_per_gas, gas_limit)) = gas.get() {
625                tx.inner.set_max_fee_per_gas(*max_fee_per_gas);
626                tx.inner
627                    .set_max_priority_fee_per_gas(*max_priority_fee_per_gas);
628                tx.inner.set_gas_limit(*gas_limit);
629            }
630
631            // Fill the rest of transaction. In case we already filled the gas fields,
632            // it will only fill the chain ID and nonce that are efficiently cached inside
633            // the fillers.
634            let tx = provider.fill(tx).await?;
635
636            // If we never filled the gas fields for that transaction type, cache the estimated
637            // gas.
638            if gas.get().is_none() {
639                let _ = gas.set(match &tx {
640                    SendableTx::Builder(builder) => (
641                        builder
642                            .max_fee_per_gas()
643                            .ok_or_eyre("max fee per gas should be filled")?,
644                        builder
645                            .max_priority_fee_per_gas()
646                            .ok_or_eyre("max priority fee per gas should be filled")?,
647                        builder
648                            .gas_limit()
649                            .ok_or_eyre("gas limit should be filled")?,
650                    ),
651                    SendableTx::Envelope(envelope) => (
652                        envelope.max_fee_per_gas(),
653                        envelope
654                            .max_priority_fee_per_gas()
655                            .ok_or_eyre("max priority fee per gas should be filled")?,
656                        envelope.gas_limit(),
657                    ),
658                });
659            }
660
661            eyre::Ok((tx.try_into_request()?, signer))
662        })
663        .buffer_unordered(max_concurrent_requests)
664        .try_collect::<Vec<_>>()
665        .await?;
666    info!(
667        transactions = builders.len(),
668        tip20_transfers = tip20_transfers.load(Ordering::Relaxed),
669        swaps = swaps.load(Ordering::Relaxed),
670        orders = orders.load(Ordering::Relaxed),
671        erc20_transfers = erc20_transfers.load(Ordering::Relaxed),
672        "Generated transactions",
673    );
674
675    info!(transactions = builders.len(), "Signing transactions");
676    // Sign transactions in parallel using signers directly, so it doesn't require async
677    let transactions = builders
678        .into_par_iter()
679        .progress()
680        .map(|(tx, signer)| -> eyre::Result<TempoTxEnvelope> {
681            let mut tx = tx.build_unsigned()?;
682            let sig = signer.sign_transaction_sync(tx.as_dyn_signable_mut())?;
683            Ok(tx.into_envelope(sig))
684        })
685        .map(|result| result.map(|tx| tx.encoded_2718()))
686        .collect::<eyre::Result<Vec<_>>>()?;
687
688    Ok(transactions)
689}
690
691/// Funds accounts from the faucet using `temp_fundAddress` RPC.
692async fn fund_accounts(
693    provider: &DynProvider<TempoNetwork>,
694    addresses: &[Address],
695    max_concurrent_requests: usize,
696    max_concurrent_transactions: usize,
697) -> eyre::Result<()> {
698    info!(accounts = addresses.len(), "Funding accounts from faucet");
699    let progress = ProgressBar::new(addresses.len() as u64);
700
701    let chunks = addresses
702        .iter()
703        .map(|address| {
704            let address = *address;
705            provider.raw_request::<_, Vec<B256>>("tempo_fundAddress".into(), (address,))
706        })
707        .chunks(max_concurrent_transactions);
708
709    for chunk in chunks.into_iter() {
710        let tx_hashes = stream::iter(chunk)
711            .buffer_unordered(max_concurrent_requests)
712            .try_collect::<Vec<_>>()
713            .await?
714            .into_iter()
715            .inspect(|_| progress.inc(1))
716            .flatten()
717            .map(async |hash| {
718                Ok(
719                    PendingTransactionBuilder::new(provider.root().clone(), hash)
720                        .get_receipt()
721                        .await?,
722                )
723            });
724        assert_receipts(tx_hashes, max_concurrent_requests)
725            .await
726            .expect("Failed to fund accounts");
727    }
728    Ok(())
729}
730
731pub fn increase_nofile_limit(min_limit: u64) -> eyre::Result<u64> {
732    let (soft, hard) = Resource::NOFILE.get()?;
733    info!(soft, hard, "File descriptor limit at startup");
734
735    if hard < min_limit {
736        panic!(
737            "File descriptor hard limit is too low. Please increase it to at least {min_limit}."
738        );
739    }
740
741    if soft != hard {
742        Resource::NOFILE.set(hard, hard)?; // Just max things out to give us plenty of overhead.
743        let (soft, hard) = Resource::NOFILE.get()?;
744        info!(soft, hard, "After increasing file descriptor limit");
745    }
746
747    Ok(soft)
748}
749
750#[derive(Serialize)]
751struct BenchmarkedBlock {
752    number: BlockNumber,
753    tx_count: usize,
754    ok_count: usize,
755    err_count: usize,
756    gas_used: u64,
757    timestamp: u64,
758    latency_ms: Option<u64>,
759}
760
761#[derive(Serialize)]
762struct BenchmarkMetadata {
763    target_tps: u64,
764    run_duration_secs: u64,
765    accounts: u64,
766    chain_id: u64,
767    total_connections: usize,
768    start_block: BlockNumber,
769    end_block: BlockNumber,
770    #[serde(skip_serializing_if = "Option::is_none")]
771    node_commit_sha: Option<String>,
772    #[serde(skip_serializing_if = "Option::is_none")]
773    build_profile: Option<String>,
774    #[serde(skip_serializing_if = "Option::is_none")]
775    mode: Option<String>,
776    tip20_weight: f64,
777    place_order_weight: f64,
778    swap_weight: f64,
779    erc20_weight: f64,
780}
781
782#[derive(Serialize)]
783struct BenchmarkReport {
784    metadata: BenchmarkMetadata,
785    blocks: Vec<BenchmarkedBlock>,
786}
787
788pub async fn generate_report(
789    provider: DynProvider<TempoNetwork>,
790    start_block: BlockNumber,
791    end_block: BlockNumber,
792    args: &MaxTpsArgs,
793) -> eyre::Result<()> {
794    info!(start_block, end_block, "Generating report");
795
796    let mut last_block_timestamp: Option<u64> = None;
797
798    let mut benchmarked_blocks = Vec::new();
799
800    for number in start_block..=end_block {
801        let block = provider
802            .get_block(number.into())
803            .await?
804            .ok_or_eyre("Block {number} not found")?;
805        let receipts = provider
806            .get_block_receipts(number.into())
807            .await?
808            .ok_or_eyre("Receipts for block {number} not found")?;
809        let timestamp = block.header.timestamp_millis();
810
811        let latency_ms = last_block_timestamp.map(|last| timestamp - last);
812        let (ok_count, err_count) =
813            receipts
814                .iter()
815                .fold((0, 0), |(successes, failures), receipt| {
816                    if receipt.status() {
817                        (successes + 1, failures)
818                    } else {
819                        (successes, failures + 1)
820                    }
821                });
822
823        benchmarked_blocks.push(BenchmarkedBlock {
824            number,
825            tx_count: receipts.len(),
826            ok_count,
827            err_count,
828            gas_used: block.header.gas_used(),
829            timestamp: block.header.timestamp_millis(),
830            latency_ms,
831        });
832
833        last_block_timestamp = Some(timestamp);
834    }
835
836    let metadata = BenchmarkMetadata {
837        target_tps: args.tps,
838        run_duration_secs: args.duration,
839        accounts: args.accounts.get(),
840        chain_id: provider.get_chain_id().await?,
841        total_connections: args.max_concurrent_requests,
842        start_block,
843        end_block,
844        node_commit_sha: args.node_commit_sha.clone(),
845        build_profile: args.build_profile.clone(),
846        mode: args.benchmark_mode.clone(),
847        tip20_weight: args.tip20_weight,
848        place_order_weight: args.place_order_weight,
849        swap_weight: args.swap_weight,
850        erc20_weight: args.erc20_weight,
851    };
852
853    let report = BenchmarkReport {
854        metadata,
855        blocks: benchmarked_blocks,
856    };
857
858    let path = "report.json";
859    let file = File::create(path)?;
860    let writer = BufWriter::new(file);
861    serde_json::to_writer_pretty(writer, &report)?;
862
863    info!(path, "Generated report");
864
865    Ok(())
866}
867
868async fn monitor_tps(tx_counter: Arc<AtomicUsize>, target_count: usize, token: CancellationToken) {
869    let mut last_count = 0;
870    let mut ticker = interval(Duration::from_secs(1));
871
872    loop {
873        select! {
874            _ = ticker.tick() => {
875                let current_count = tx_counter.load(Ordering::Relaxed);
876                let tps = current_count - last_count;
877                last_count = current_count;
878
879                info!(tps, total = current_count, "Status");
880                thread::sleep(Duration::from_secs(1));
881
882                if current_count == target_count {
883                    break;
884                }
885            }
886            _ = token.cancelled() => {
887                break;
888            }
889        }
890    }
891}
892
893async fn join_all<
894    T: Future<Output = alloy::contract::Result<PendingTransactionBuilder<TempoNetwork>>>,
895>(
896    futures: impl IntoIterator<Item = T>,
897    max_concurrent_requests: usize,
898    max_concurrent_transactions: usize,
899) -> eyre::Result<()> {
900    let chunks = futures.into_iter().chunks(max_concurrent_transactions);
901
902    for chunk in chunks.into_iter() {
903        // Send transactions and collect pending builders
904        let pending_txs = stream::iter(chunk)
905            .buffer_unordered(max_concurrent_requests)
906            .try_collect::<Vec<_>>()
907            .await?;
908
909        // Fetch receipts and assert status
910        assert_receipts(
911            pending_txs
912                .into_iter()
913                .map(|tx| async move { Ok(tx.get_receipt().await?) }),
914            max_concurrent_requests,
915        )
916        .await?;
917    }
918
919    Ok(())
920}
921
922async fn assert_receipts<R: ReceiptResponse, F: Future<Output = eyre::Result<R>>>(
923    receipts: impl IntoIterator<Item = F>,
924    max_concurrent_requests: usize,
925) -> eyre::Result<()> {
926    stream::iter(receipts.into_iter())
927        .buffer_unordered(max_concurrent_requests)
928        .try_for_each(|receipt| assert_receipt(receipt))
929        .await
930}
931
932async fn assert_receipt<R: ReceiptResponse>(receipt: R) -> eyre::Result<()> {
933    eyre::ensure!(
934        receipt.status(),
935        "Transaction {} failed",
936        receipt.transaction_hash()
937    );
938    Ok(())
939}
940
941struct GenerateTransactionsInput<F: TxFiller<TempoNetwork>> {
942    total_txs: u64,
943    accounts: u64,
944    signer_provider_manager: SignerProviderManager<F>,
945    max_concurrent_requests: usize,
946    tip20_weight: u64,
947    place_order_weight: u64,
948    swap_weight: u64,
949    erc20_weight: u64,
950    quote_token: Address,
951    user_tokens: Vec<Address>,
952    erc20_tokens: Vec<Address>,
953}