Skip to main content

tempo_bench/cmd/max_tps.rs

1mod dex;
2mod erc20;
3mod mpp;
4
5use alloy_consensus::Transaction;
6use itertools::Itertools;
7use reth_tracing::{
8    RethTracer, Tracer,
9    tracing::{debug, error, info},
10};
11use tempo_alloy::{
12    TempoNetwork, fillers::ExpiringNonceFiller, provider::ext::TempoProviderBuilderExt,
13};
14
15use alloy::{
16    consensus::BlockHeader,
17    eips::Encodable2718,
18    network::{EthereumWallet, ReceiptResponse, TransactionBuilder, TxSignerSync},
19    primitives::{Address, B256, BlockNumber, U256},
20    providers::{
21        DynProvider, PendingTransactionBuilder, PendingTransactionError, Provider, ProviderBuilder,
22        SendableTx, WatchTxError, fillers::TxFiller,
23    },
24    rpc::client::NoParams,
25    signers::local::{
26        Secp256k1Signer,
27        coins_bip39::{English, Mnemonic, MnemonicError},
28    },
29    transports::http::reqwest::Url,
30};
31use clap::Parser;
32use eyre::{Context, OptionExt};
33use futures::{
34    FutureExt, Stream, StreamExt, TryStreamExt,
35    future::BoxFuture,
36    stream::{self},
37};
38use governor::{Quota, RateLimiter, state::StreamRateLimitExt};
39use indicatif::{ProgressBar, ProgressIterator};
40use rand::{random_range, seq::IndexedRandom};
41use rlimit::Resource;
42use serde::Serialize;
43use std::{
44    collections::VecDeque,
45    fs::File,
46    io::BufWriter,
47    num::{NonZeroU32, NonZeroU64},
48    str::FromStr,
49    sync::{
50        Arc, OnceLock,
51        atomic::{AtomicUsize, Ordering},
52    },
53    time::Duration,
54};
55use tempo_contracts::precompiles::{
56    IFeeManager::IFeeManagerInstance,
57    IRolesAuth,
58    IStablecoinDEX::IStablecoinDEXInstance,
59    ITIP20::{self, ITIP20Instance},
60    ITIP20Factory, STABLECOIN_DEX_ADDRESS, TIP20_FACTORY_ADDRESS,
61};
62use tempo_precompiles::{
63    TIP_FEE_MANAGER_ADDRESS,
64    stablecoin_dex::{MAX_TICK, MIN_ORDER_AMOUNT, MIN_TICK, TICK_SPACING},
65    tip_fee_manager::DEFAULT_FEE_TOKEN,
66    tip20::ISSUER_ROLE,
67};
68use tokio::{
69    select,
70    time::{interval, sleep},
71};
72use tokio_util::sync::CancellationToken;
73
74use crate::cmd::signer_providers::SignerProviderManager;
75
76/// Run maximum TPS throughput benchmarking
77#[derive(Parser, Debug)]
78pub struct MaxTpsArgs {
79    /// Target transactions per second
80    #[arg(short, long)]
81    tps: u64,
82
83    /// Test duration in seconds
84    #[arg(short, long, default_value_t = 30)]
85    duration: u64,
86
87    /// Number of accounts for pre-generation
88    #[arg(short, long, default_value_t = NonZeroU64::new(100).unwrap())]
89    accounts: NonZeroU64,
90
91    /// Mnemonic for generating accounts
92    #[arg(short, long, default_value = "random")]
93    mnemonic: MnemonicArg,
94
95    #[arg(short, long, default_value_t = 0)]
96    from_mnemonic_index: u32,
97
98    #[arg(long, default_value_t = DEFAULT_FEE_TOKEN)]
99    fee_token: Address,
100
101    /// Target URLs for network connections (comma-separated or multiple --target-urls)
102    #[arg(long, value_delimiter = ',', action = clap::ArgAction::Append, default_values_t = vec!["http://localhost:8545".parse::<Url>().unwrap()])]
103    target_urls: Vec<Url>,
104
105    /// A limit of the maximum number of concurrent requests, prevents issues with too many
106    /// connections open at once.
107    #[arg(long, default_value_t = 100)]
108    max_concurrent_requests: usize,
109
110    /// A number of transaction to send, before waiting for their receipts, that should be likely
111    /// safe.
112    ///
113    /// Large amount of transactions in a block will result in system transaction OutOfGas error.
114    #[arg(long, default_value_t = 10000)]
115    max_concurrent_transactions: usize,
116
117    /// File descriptor limit to set
118    #[arg(long)]
119    fd_limit: Option<u64>,
120
121    /// Node commit SHA for metadata
122    #[arg(long)]
123    node_commit_sha: Option<String>,
124
125    /// Build profile for metadata (e.g., "release", "debug", "maxperf")
126    #[arg(long)]
127    build_profile: Option<String>,
128
129    /// Benchmark mode for metadata (e.g., "max_tps", "stress_test")
130    #[arg(long)]
131    benchmark_mode: Option<String>,
132
133    /// A weight that determines the likelihood of generating a TIP-20 transfer transaction.
134    #[arg(long, default_value_t = 1.0)]
135    tip20_weight: f64,
136
137    /// A weight that determines the likelihood of generating a DEX place transaction.
138    #[arg(long, default_value_t = 0.0)]
139    place_order_weight: f64,
140
141    /// A weight that determines the likelihood of generating a DEX swapExactAmountIn transaction.
142    #[arg(long, default_value_t = 0.0)]
143    swap_weight: f64,
144
145    /// A weight that determines the likelihood of generating an ERC-20 transfer transaction.
146    #[arg(long, default_value_t = 0.0)]
147    erc20_weight: f64,
148
149    /// A weight that determines the likelihood of generating an MPP channel open+close
150    /// transaction. Requires `--mpp-contract-address`.
151    #[arg(long, default_value_t = 0.0)]
152    mpp_weight: f64,
153
154    /// Address of a deployed TempoStreamChannel contract. Required when `--mpp-weight` > 0.
155    #[arg(long, default_value = "0x33b901018174ddabe4841042ab76ba85d4e24f25")]
156    mpp_contract_address: Option<Address>,
157
158    /// Send transfers to existing signer accounts instead of random new addresses.
159    ///
160    /// When enabled, TIP-20 and ERC-20 transfers are sent to the bench's own signer addresses
161    /// (which already exist on-chain), avoiding cold SSTORE for account creation. This tests
162    /// pure transfer throughput without state growth.
163    #[arg(long)]
164    existing_recipients: bool,
165
166    /// An amount of receipts to wait for after sending all the transactions.
167    #[arg(long, default_value_t = 100)]
168    sample_size: usize,
169
170    /// Fund accounts from the faucet before running the benchmark.
171    ///
172    /// Calls tempo_fundAddress for each account.
173    #[arg(long)]
174    faucet: bool,
175
176    /// URL for the faucet service. If not provided, uses the first target URL.
177    #[arg(long)]
178    faucet_url: Option<Url>,
179
180    /// Clear the transaction pool before running the benchmark.
181    ///
182    /// Calls admin_clearTxpool.
183    #[arg(long)]
184    clear_txpool: bool,
185
186    /// Use 2D nonces instead of expiring nonces.
187    ///
188    /// By default, tempo-bench uses expiring nonces ([TIP-1009]) which use a circular buffer
189    /// for replay protection, avoiding state bloat. Use this flag to switch to 2D nonces
190    /// which store nonce state per (address, nonce_key) pair.
191    ///
192    /// [TIP-1009]: <https://docs.tempo.xyz/protocol/tips/tip-1009>
193    #[arg(long)]
194    use_2d_nonces: bool,
195
196    /// Use standard sequential nonces instead of expiring nonces.
197    ///
198    /// This disables both expiring nonces and 2D nonces, using traditional sequential
199    /// nonce management.
200    #[arg(long)]
201    use_standard_nonces: bool,
202}
203
204impl MaxTpsArgs {
205    const WEIGHT_PRECISION: f64 = 1000.0;
206
207    pub async fn run(self) -> eyre::Result<()> {
208        RethTracer::new().init()?;
209
210        let accounts = self.accounts.get();
211
212        // Set file descriptor limit if provided
213        if let Some(fd_limit) = self.fd_limit {
214            increase_nofile_limit(fd_limit).context("Failed to increase nofile limit")?;
215        }
216
217        let signer_provider_factory = Box::new(|signer, target_url, cached_nonce_manager| {
218            ProviderBuilder::default()
219                .fetch_chain_id()
220                .with_gas_estimation()
221                .with_nonce_management(cached_nonce_manager)
222                .wallet(EthereumWallet::from(signer))
223                .connect_http(target_url)
224                .erased()
225        });
226
227        if self.use_2d_nonces {
228            info!(
229                accounts = self.accounts,
230                "Creating signers (with 2D nonces)"
231            );
232            let signer_provider_manager = SignerProviderManager::new(
233                self.mnemonic.resolve(),
234                self.from_mnemonic_index,
235                accounts,
236                self.target_urls.clone(),
237                Box::new(|target_url, _cached_nonce_manager| {
238                    ProviderBuilder::new_with_network::<TempoNetwork>()
239                        .with_random_2d_nonces()
240                        .connect_http(target_url)
241                }),
242                signer_provider_factory,
243            );
244            self.run_with_manager(signer_provider_manager).await
245        } else if self.use_standard_nonces {
246            info!(
247                accounts = self.accounts,
248                "Creating signers (with standard nonces)"
249            );
250            let signer_provider_manager = SignerProviderManager::new(
251                self.mnemonic.resolve(),
252                self.from_mnemonic_index,
253                accounts,
254                self.target_urls.clone(),
255                Box::new(|target_url, cached_nonce_manager| {
256                    ProviderBuilder::default()
257                        .fetch_chain_id()
258                        .with_gas_estimation()
259                        .with_nonce_management(cached_nonce_manager)
260                        .connect_http(target_url)
261                }),
262                signer_provider_factory,
263            );
264            self.run_with_manager(signer_provider_manager).await
265        } else {
266            // Default: Use expiring nonces (TIP-1009)
267            // Use the default 25-second expiry window (protocol max is 30s).
268            let expiry_secs = ExpiringNonceFiller::DEFAULT_EXPIRY_SECS;
269            info!(
270                accounts = self.accounts,
271                expiry_secs, "Creating signers (with expiring nonces - TIP-1009)"
272            );
273            let signer_provider_manager = SignerProviderManager::new(
274                self.mnemonic.resolve(),
275                self.from_mnemonic_index,
276                accounts,
277                self.target_urls.clone(),
278                Box::new(move |target_url, _cached_nonce_manager| {
279                    ProviderBuilder::default()
280                        .filler(ExpiringNonceFiller::with_expiry_secs(expiry_secs))
281                        .with_gas_estimation()
282                        .fetch_chain_id()
283                        .connect_http(target_url)
284                }),
285                signer_provider_factory,
286            );
287            self.run_with_manager(signer_provider_manager).await
288        }
289    }
290
291    async fn run_with_manager<F: TxFiller<TempoNetwork> + 'static>(
292        self,
293        signer_provider_manager: SignerProviderManager<F>,
294    ) -> eyre::Result<()> {
295        // Validate required addresses before sending any transactions
296        eyre::ensure!(
297            self.mpp_weight == 0.0 || self.mpp_contract_address.is_some(),
298            "--mpp-contract-address is required when --mpp-weight > 0"
299        );
300
301        let signer_providers = signer_provider_manager.signer_providers();
302
303        if self.clear_txpool {
304            for (target_url, provider) in signer_provider_manager.target_url_providers() {
305                let transactions: u64 = provider
306                    .raw_request("admin_clearTxpool".into(), NoParams::default())
307                    .await
308                    .context(
309                        format!("Failed to clear transaction pool for {target_url}. Is `admin_clearTxpool` RPC method available?"),
310                    )?;
311                info!(%target_url, transactions, "Cleared transaction pool");
312            }
313        }
314
315        // Grab first provider to call some RPC methods
316        let provider = signer_providers[0].1.clone();
317
318        // Fund accounts from faucet if requested
319        if self.faucet {
320            let faucet_provider: DynProvider<TempoNetwork> =
321                if let Some(ref faucet_url) = self.faucet_url {
322                    info!(%faucet_url, "Using custom faucet URL");
323                    ProviderBuilder::new_with_network::<TempoNetwork>()
324                        .connect_http(faucet_url.clone())
325                        .erased()
326                } else {
327                    provider.clone()
328                };
329            fund_accounts(
330                &faucet_provider,
331                &signer_providers
332                    .iter()
333                    .map(|(signer, _)| signer.address())
334                    .collect::<Vec<_>>(),
335                self.max_concurrent_requests,
336                self.max_concurrent_transactions,
337            )
338            .await
339            .context("Failed to fund accounts from faucet")?;
340        }
341
342        info!(fee_token = %self.fee_token, "Setting default fee token");
343        join_all(
344            signer_providers
345                .iter()
346                .map(async |(_, provider)| {
347                    IFeeManagerInstance::new(TIP_FEE_MANAGER_ADDRESS, provider.clone())
348                        .setUserToken(self.fee_token)
349                        .send()
350                        .await
351                })
352                .progress(),
353            self.max_concurrent_requests,
354            self.max_concurrent_transactions,
355        )
356        .await
357        .context("Failed to set default fee token")?;
358
359        // Setup DEX tokens, pairs, and liquidity only if any DEX transaction type has non-zero
360        // weight. Otherwise, use the fee token for TIP-20 transfers directly.
361        let (quote_token, user_tokens) = if self.place_order_weight > 0.0 || self.swap_weight > 0.0
362        {
363            let user_tokens = 2;
364            info!(user_tokens, "Setting up DEX");
365            let (quote_token, user_tokens) = dex::setup(
366                signer_providers,
367                user_tokens,
368                self.max_concurrent_requests,
369                self.max_concurrent_transactions,
370            )
371            .await?;
372            (Some(quote_token), user_tokens)
373        } else {
374            info!(fee_token = %self.fee_token, "Using fee token for TIP-20 transfers");
375            (None, vec![self.fee_token])
376        };
377
378        let erc20_tokens = if self.erc20_weight > 0.0 {
379            let num_erc20_tokens = 1;
380            info!(num_erc20_tokens, "Setting up ERC-20 tokens");
381            erc20::setup(
382                signer_providers,
383                num_erc20_tokens,
384                self.max_concurrent_requests,
385                self.max_concurrent_transactions,
386            )
387            .await?
388        } else {
389            Vec::new()
390        };
391
392        let mpp_contract_address = if self.mpp_weight > 0.0 {
393            let addr = self.mpp_contract_address.expect("validated above");
394            mpp::setup(
395                signer_providers,
396                addr,
397                self.fee_token,
398                self.max_concurrent_requests,
399                self.max_concurrent_transactions,
400            )
401            .await?;
402            Some(addr)
403        } else {
404            None
405        };
406
407        // Generate and send transactions
408        let total_txs = self.tps * self.duration;
409        let tip20_weight = (self.tip20_weight * Self::WEIGHT_PRECISION).trunc() as u64;
410        let place_order_weight = (self.place_order_weight * Self::WEIGHT_PRECISION).trunc() as u64;
411        let swap_weight = (self.swap_weight * Self::WEIGHT_PRECISION).trunc() as u64;
412        let erc20_weight = (self.erc20_weight * Self::WEIGHT_PRECISION).trunc() as u64;
413        let mpp_weight = (self.mpp_weight * Self::WEIGHT_PRECISION).trunc() as u64;
414
415        let recipients = if self.existing_recipients {
416            let addrs: Vec<Address> = signer_providers
417                .iter()
418                .map(|(signer, _)| signer.address())
419                .collect();
420            info!(
421                recipients = addrs.len(),
422                "Using existing signer addresses as recipients"
423            );
424            Some(addrs)
425        } else {
426            None
427        };
428
429        let use_expiring_nonces = !self.use_2d_nonces && !self.use_standard_nonces;
430        let expiry_secs = if use_expiring_nonces {
431            Some(ExpiringNonceFiller::DEFAULT_EXPIRY_SECS)
432        } else {
433            None
434        };
435
436        let chain_id = provider.get_chain_id().await?;
437
438        let gen_input = GenerateTransactionsInput {
439            tip20_weight,
440            place_order_weight,
441            swap_weight,
442            erc20_weight,
443            mpp_weight,
444            quote_token,
445            user_tokens,
446            erc20_tokens,
447            mpp_contract_address,
448            chain_id,
449            recipients,
450            expiry_secs,
451        };
452
453        info!(total_txs, "Generating and sending transactions");
454
455        let counters = TransactionCounters::default();
456        let target_count = total_txs as usize;
457        let cancel_token = CancellationToken::new();
458
459        // Start TPS monitor
460        tokio::spawn(monitor_tps(
461            counters.clone(),
462            target_count,
463            cancel_token.clone(),
464        ));
465
466        let rate_limiter =
467            RateLimiter::direct(Quota::per_second(NonZeroU32::new(self.tps as u32).unwrap()));
468        let start_block_number = provider.get_block_number().await?;
469
470        let mut pending_txs =
471            generate_transactions(signer_provider_manager.clone(), gen_input, counters.clone())
472                // Take exactly the required number of transactions to not over-send
473                .take(target_count)
474                // Stop when duration exceeded, no matter if we sent all transactions or not
475                .take_until(sleep(Duration::from_secs(self.duration)))
476                // Keep a buffer of pre-generated transactions to send as fast as possible
477                .buffer_unordered(self.tps as usize)
478                // Filter only successfully generated transactions
479                .filter_map(|result| async {
480                    match result {
481                        Ok(bytes) => Some(bytes),
482                        Err(e) => {
483                            debug!(?e, "Transaction generation failed");
484                            None
485                        }
486                    }
487                })
488                .boxed()
489                .ratelimit_stream(&rate_limiter)
490                // Pair each transaction with a random provider to send it
491                .zip(stream::repeat_with(|| {
492                    signer_provider_manager.random_unsigned_provider()
493                }))
494                // Prepare transaction sending futures
495                .map(|(bytes, provider)| async move {
496                    tokio::time::timeout(
497                        Duration::from_secs(1),
498                        provider.send_raw_transaction(&bytes),
499                    )
500                    .await
501                })
502                // Send transactions in parallel with up to the specified concurrency limit
503                .buffer_unordered(self.max_concurrent_requests)
504                .filter_map({
505                    let counters = counters.clone();
506                    move |result| {
507                        let counters = counters.clone();
508                        async move {
509                            match result {
510                                Ok(Ok(pending_tx)) => {
511                                    counters.sent.fetch_add(1, Ordering::Relaxed);
512                                    counters.success.fetch_add(1, Ordering::Relaxed);
513                                    Some(pending_tx)
514                                }
515                                Ok(Err(err)) => {
516                                    counters.sent.fetch_add(1, Ordering::Relaxed);
517                                    counters.failed.fetch_add(1, Ordering::Relaxed);
518                                    debug!(?err, "Failed to send transaction");
519                                    None
520                                }
521                                Err(_) => {
522                                    counters.sent.fetch_add(1, Ordering::Relaxed);
523                                    counters.timed_out.fetch_add(1, Ordering::Relaxed);
524                                    debug!("Transaction sending timed out");
525                                    None
526                                }
527                            }
528                        }
529                    }
530                })
531                .collect::<VecDeque<_>>()
532                .await;
533
534        cancel_token.cancel();
535
536        info!(
537            tip20_transfers = counters.tip20_transfers.load(Ordering::Relaxed),
538            swaps = counters.swaps.load(Ordering::Relaxed),
539            orders = counters.orders.load(Ordering::Relaxed),
540            erc20_transfers = counters.erc20_transfers.load(Ordering::Relaxed),
541            mpp_open_close = counters.mpp_open_close.load(Ordering::Relaxed),
542            success = counters.success.load(Ordering::Relaxed),
543            failed = counters.failed.load(Ordering::Relaxed),
544            timed_out = counters.timed_out.load(Ordering::Relaxed),
545            "Finished sending transactions"
546        );
547
548        let end_block_number = provider.get_block_number().await?;
549
550        // Collect a sample of receipts and print the stats
551        let sample_size = pending_txs.len().min(self.sample_size);
552        let successful = Arc::new(AtomicUsize::new(0));
553        let timeout = Arc::new(AtomicUsize::new(0));
554        let failed = Arc::new(AtomicUsize::new(0));
555        info!(sample_size, "Collecting a sample of receipts");
556        stream::iter(0..sample_size)
557            .map(|_| {
558                let idx = random_range(0..pending_txs.len());
559                pending_txs.remove(idx).expect("index is in range")
560            })
561            .map(|pending_tx| {
562                let hash = *pending_tx.tx_hash();
563                pending_tx
564                    .with_timeout(Some(Duration::from_secs(5)))
565                    .get_receipt()
566                    .map(move |result| (hash, result))
567            })
568            .buffered(self.max_concurrent_requests)
569            .for_each(async |(hash, result)| match result {
570                Ok(_) => {
571                    successful.fetch_add(1, Ordering::Relaxed);
572                }
573                Err(PendingTransactionError::TxWatcher(WatchTxError::Timeout)) => {
574                    timeout.fetch_add(1, Ordering::Relaxed);
575                    error!(?hash, "Transaction receipt retrieval timed out");
576                }
577                Err(err) => {
578                    failed.fetch_add(1, Ordering::Relaxed);
579                    error!(?hash, "Transaction receipt retrieval failed: {}", err);
580                }
581            })
582            .await;
583        info!(
584            successful = successful.load(Ordering::Relaxed),
585            timeout = timeout.load(Ordering::Relaxed),
586            failed = failed.load(Ordering::Relaxed),
587            "Collected a sample of receipts"
588        );
589
590        generate_report(provider, start_block_number, end_block_number, &self).await?;
591
592        Ok(())
593    }
594}
595
596#[derive(Debug, Clone)]
597enum MnemonicArg {
598    Mnemonic(String),
599    Random,
600}
601
602impl FromStr for MnemonicArg {
603    type Err = MnemonicError;
604
605    fn from_str(s: &str) -> Result<Self, Self::Err> {
606        match s {
607            "random" => Ok(MnemonicArg::Random),
608            mnemonic => Ok(MnemonicArg::Mnemonic(
609                Mnemonic::<English>::from_str(mnemonic)?.to_phrase(),
610            )),
611        }
612    }
613}
614
615impl MnemonicArg {
616    fn resolve(&self) -> String {
617        match self {
618            MnemonicArg::Mnemonic(mnemonic) => mnemonic.clone(),
619            MnemonicArg::Random => Mnemonic::<English>::new(&mut rand_08::thread_rng()).to_phrase(),
620        }
621    }
622}
623
/// Shared benchmark counters.
///
/// Every field is an `Arc`, so cloning the struct yields another handle to the same counters
/// rather than an independent copy.
#[derive(Clone, Default)]
struct TransactionCounters {
    // --- Generation counters, one per transaction type ---
    /// TIP-20 transfers generated.
    tip20_transfers: Arc<AtomicUsize>,
    /// DEX swap transactions generated.
    swaps: Arc<AtomicUsize>,
    /// DEX place-order transactions generated.
    orders: Arc<AtomicUsize>,
    /// ERC-20 transfers generated.
    erc20_transfers: Arc<AtomicUsize>,
    /// MPP channel open+close transactions generated.
    mpp_open_close: Arc<AtomicUsize>,

    // --- Sending counters ---
    /// Send attempts that completed, whatever their outcome.
    sent: Arc<AtomicUsize>,
    /// Send attempts accepted by the node.
    success: Arc<AtomicUsize>,
    /// Send attempts that returned an error.
    failed: Arc<AtomicUsize>,
    /// Send attempts that hit the send timeout.
    timed_out: Arc<AtomicUsize>,
}
638
639#[derive(Clone)]
640struct GenerateTransactionsInput {
641    tip20_weight: u64,
642    place_order_weight: u64,
643    swap_weight: u64,
644    erc20_weight: u64,
645    mpp_weight: u64,
646    quote_token: Option<Address>,
647    user_tokens: Vec<Address>,
648    erc20_tokens: Vec<Address>,
649    mpp_contract_address: Option<Address>,
650    chain_id: u64,
651    /// When set, transfers go to these existing addresses instead of `Address::random()`.
652    recipients: Option<Vec<Address>>,
653    /// When `Some`, sets `valid_before` on each transaction right before signing
654    /// to keep it fresh for expiring nonces.
655    expiry_secs: Option<u64>,
656}
657
658/// Returns an infinite stream of futures, each generating, signing, and encoding one transaction.
659fn generate_transactions<F: TxFiller<TempoNetwork> + 'static>(
660    signer_provider_manager: SignerProviderManager<F>,
661    input: GenerateTransactionsInput,
662    counters: TransactionCounters,
663) -> impl Stream<Item = impl Future<Output = eyre::Result<Vec<u8>>>> {
664    let GenerateTransactionsInput {
665        tip20_weight,
666        place_order_weight,
667        swap_weight,
668        erc20_weight,
669        mpp_weight,
670        quote_token,
671        user_tokens,
672        erc20_tokens,
673        mpp_contract_address,
674        chain_id,
675        recipients,
676        expiry_secs,
677    } = input;
678
679    const TX_TYPES: usize = 5;
680    // Weights for random sampling for each transaction type
681    let tx_weights: [u64; TX_TYPES] = [
682        tip20_weight,
683        swap_weight,
684        place_order_weight,
685        erc20_weight,
686        mpp_weight,
687    ];
688    // Cached gas estimates for each transaction type
689    let gas_estimates: [Arc<OnceLock<(u128, u128, u64)>>; TX_TYPES] = Default::default();
690    // Global tx counter used to bump priority fee, ensuring unique tx hashes
691    // when using expiring nonces (which share nonce=0).
692    let tx_id = Arc::new(AtomicUsize::new(0));
693
694    stream::repeat_with(move || {
695        let signer_provider_manager = signer_provider_manager.clone();
696        let gas_estimates = gas_estimates.clone();
697        let tx_id = tx_id.clone();
698        let recipients = recipients.clone();
699        let user_tokens = user_tokens.clone();
700        let erc20_tokens = erc20_tokens.clone();
701        let counters = counters.clone();
702
703        async move {
704            let provider = signer_provider_manager.random_unsigned_provider();
705            let signer = signer_provider_manager.random_signer();
706            let token = user_tokens.choose(&mut rand::rng()).copied().unwrap();
707
708            // TODO: can be improved with an enum per transaction type
709            let tx_index = tx_weights
710                .iter()
711                .enumerate()
712                .collect::<Vec<_>>()
713                .choose_weighted(&mut rand::rng(), |(_, weight)| *weight)?
714                .0;
715
716            let recipient = match &recipients {
717                Some(addrs) => *addrs.choose(&mut rand::rng()).unwrap(),
718                None => Address::random(),
719            };
720
721            let mut tx = match tx_index {
722                0 => {
723                    counters.tip20_transfers.fetch_add(1, Ordering::Relaxed);
724                    let token = ITIP20Instance::new(token, provider.clone());
725
726                    // Transfer minimum possible amount
727                    token
728                        .transfer(recipient, U256::ONE)
729                        .into_transaction_request()
730                }
731                1 => {
732                    counters.swaps.fetch_add(1, Ordering::Relaxed);
733                    let exchange =
734                        IStablecoinDEXInstance::new(STABLECOIN_DEX_ADDRESS, provider.clone());
735
736                    // Swap minimum possible amount
737                    let quote_token =
738                        quote_token.expect("quote_token must be set when swap_weight > 0");
739                    exchange
740                        .quoteSwapExactAmountIn(token, quote_token, 1)
741                        .into_transaction_request()
742                }
743                2 => {
744                    counters.orders.fetch_add(1, Ordering::Relaxed);
745                    let exchange =
746                        IStablecoinDEXInstance::new(STABLECOIN_DEX_ADDRESS, provider.clone());
747
748                    // Place an order at a random tick that's a multiple of `TICK_SPACING`
749                    let tick = random_range(MIN_TICK / TICK_SPACING..=MAX_TICK / TICK_SPACING)
750                        * TICK_SPACING;
751                    exchange
752                        .place(token, MIN_ORDER_AMOUNT, true, tick)
753                        .into_transaction_request()
754                }
755                3 => {
756                    counters.erc20_transfers.fetch_add(1, Ordering::Relaxed);
757                    let token_address = erc20_tokens.choose(&mut rand::rng()).copied().unwrap();
758                    let token = erc20::MockERC20::new(token_address, provider.clone());
759
760                    // Transfer minimum possible amount
761                    token
762                        .transfer(recipient, U256::ONE)
763                        .into_transaction_request()
764                }
765                4 => {
766                    counters.mpp_open_close.fetch_add(1, Ordering::Relaxed);
767                    let contract = mpp_contract_address
768                        .expect("mpp_channel_address must be set when mpp_weight > 0");
769
770                    // Single Tempo tx with two calls: open a channel, then close it.
771                    // Signer is both payer and payee, so close succeeds immediately.
772                    let salt = B256::random();
773                    let payer = signer.address();
774                    let channel_id = mpp::compute_channel_id(
775                        payer,
776                        payer,
777                        token,
778                        salt,
779                        Address::ZERO,
780                        contract,
781                        chain_id,
782                    );
783                    mpp::build_open_and_close(contract, payer, token, salt, channel_id)
784                }
785                _ => unreachable!("Only {TX_TYPES} transaction types are supported"),
786            };
787
788            // Get a random signer and set it as the sender of the transaction.
789            tx.inner.set_from(signer.address());
790
791            let gas = &gas_estimates[tx_index];
792            // If we already filled the gas fields once for that transaction type, use it.
793            // This will skip the gas filler.
794            if let Some((max_fee_per_gas, max_priority_fee_per_gas, gas_limit)) = gas.get() {
795                tx.inner.set_max_fee_per_gas(*max_fee_per_gas);
796                tx.inner
797                    .set_max_priority_fee_per_gas(*max_priority_fee_per_gas);
798                tx.inner.set_gas_limit(*gas_limit);
799            }
800
801            // Fill the rest of transaction. In case we already filled the gas fields,
802            // it will only fill the chain ID and nonce that are efficiently cached inside
803            // the fillers.
804            let filled = provider.fill(tx).await?;
805
806            // If we never filled the gas fields for that transaction type, cache the estimated
807            // gas.
808            if gas.get().is_none() {
809                let _ = gas.set(match &filled {
810                    SendableTx::Builder(builder) => (
811                        builder
812                            .max_fee_per_gas()
813                            .ok_or_eyre("max fee per gas should be filled")?,
814                        builder
815                            .max_priority_fee_per_gas()
816                            .ok_or_eyre("max priority fee per gas should be filled")?,
817                        builder
818                            .gas_limit()
819                            .ok_or_eyre("gas limit should be filled")?,
820                    ),
821                    SendableTx::Envelope(envelope) => (
822                        envelope.max_fee_per_gas(),
823                        envelope
824                            .max_priority_fee_per_gas()
825                            .ok_or_eyre("max priority fee per gas should be filled")?,
826                        envelope.gas_limit(),
827                    ),
828                });
829            }
830
831            let mut req = filled.try_into_request()?;
832
833            // Bump priority fee by a unique counter to ensure unique tx hashes
834            // when using expiring nonces (which share nonce=0).
835            let id = tx_id.fetch_add(1, Ordering::Relaxed) as u128;
836            if let Some(fee) = req.max_priority_fee_per_gas() {
837                req.inner.set_max_priority_fee_per_gas(fee + id);
838            }
839            if let Some(fee) = req.max_fee_per_gas() {
840                req.inner.set_max_fee_per_gas(fee + id);
841            }
842
843            // Set valid_before right before signing to keep it fresh (expiring nonces only)
844            if let Some(expiry_secs) = expiry_secs {
845                let now = std::time::SystemTime::now()
846                    .duration_since(std::time::UNIX_EPOCH)
847                    .map(|d| d.as_secs())
848                    .unwrap_or(0);
849                req.set_valid_before(now + expiry_secs);
850            }
851
852            // Sign and encode
853            let mut unsigned = req.build_unsigned()?;
854            let sig = signer.sign_transaction_sync(unsigned.as_dyn_signable_mut())?;
855            eyre::Ok(unsigned.into_envelope(sig).encoded_2718())
856        }
857    })
858}
859
860/// Funds accounts from the faucet using `temp_fundAddress` RPC.
861async fn fund_accounts(
862    provider: &DynProvider<TempoNetwork>,
863    addresses: &[Address],
864    max_concurrent_requests: usize,
865    max_concurrent_transactions: usize,
866) -> eyre::Result<()> {
867    info!(accounts = addresses.len(), "Funding accounts from faucet");
868    let progress = ProgressBar::new(addresses.len() as u64);
869
870    let chunks = addresses
871        .iter()
872        .map(|address| {
873            let address = *address;
874            provider.raw_request::<_, Vec<B256>>("tempo_fundAddress".into(), (address,))
875        })
876        .chunks(max_concurrent_transactions);
877
878    for chunk in chunks.into_iter() {
879        let tx_hashes = stream::iter(chunk)
880            .buffer_unordered(max_concurrent_requests)
881            .try_collect::<Vec<_>>()
882            .await?
883            .into_iter()
884            .inspect(|_| progress.inc(1))
885            .flatten()
886            .map(async |hash| {
887                Ok(
888                    PendingTransactionBuilder::new(provider.root().clone(), hash)
889                        .get_receipt()
890                        .await?,
891                )
892            });
893        assert_receipts(tx_hashes, max_concurrent_requests)
894            .await
895            .expect("Failed to fund accounts");
896    }
897    Ok(())
898}
899
900pub fn increase_nofile_limit(min_limit: u64) -> eyre::Result<u64> {
901    let (soft, hard) = Resource::NOFILE.get()?;
902    info!(soft, hard, "File descriptor limit at startup");
903
904    if hard < min_limit {
905        panic!(
906            "File descriptor hard limit is too low. Please increase it to at least {min_limit}."
907        );
908    }
909
910    if soft != hard {
911        Resource::NOFILE.set(hard, hard)?; // Just max things out to give us plenty of overhead.
912        let (soft, hard) = Resource::NOFILE.get()?;
913        info!(soft, hard, "After increasing file descriptor limit");
914    }
915
916    Ok(soft)
917}
918
919#[derive(Serialize)]
920struct BenchmarkedBlock {
921    number: BlockNumber,
922    tx_count: usize,
923    ok_count: usize,
924    err_count: usize,
925    gas_used: u64,
926    timestamp: u64,
927    latency_ms: Option<u64>,
928}
929
930#[derive(Serialize)]
931struct BenchmarkMetadata {
932    target_tps: u64,
933    run_duration_secs: u64,
934    accounts: u64,
935    chain_id: u64,
936    total_connections: usize,
937    start_block: BlockNumber,
938    end_block: BlockNumber,
939    #[serde(skip_serializing_if = "Option::is_none")]
940    node_commit_sha: Option<String>,
941    #[serde(skip_serializing_if = "Option::is_none")]
942    build_profile: Option<String>,
943    #[serde(skip_serializing_if = "Option::is_none")]
944    mode: Option<String>,
945    tip20_weight: f64,
946    place_order_weight: f64,
947    swap_weight: f64,
948    erc20_weight: f64,
949    mpp_weight: f64,
950}
951
952#[derive(Serialize)]
953struct BenchmarkReport {
954    metadata: BenchmarkMetadata,
955    blocks: Vec<BenchmarkedBlock>,
956}
957
958pub async fn generate_report(
959    provider: DynProvider<TempoNetwork>,
960    start_block: BlockNumber,
961    end_block: BlockNumber,
962    args: &MaxTpsArgs,
963) -> eyre::Result<()> {
964    info!(start_block, end_block, "Generating report");
965
966    let mut last_block_timestamp: Option<u64> = None;
967
968    let mut benchmarked_blocks = Vec::new();
969
970    // Skip start_block — it was the chain head before any transactions were sent
971    for number in (start_block + 1)..=end_block {
972        let block = provider
973            .get_block(number.into())
974            .await?
975            .ok_or_eyre("Block {number} not found")?;
976        let receipts = provider
977            .get_block_receipts(number.into())
978            .await?
979            .ok_or_eyre("Receipts for block {number} not found")?;
980
981        let timestamp = block.header.timestamp_millis();
982
983        let latency_ms = last_block_timestamp.map(|last| timestamp - last);
984        let (ok_count, err_count) =
985            receipts
986                .iter()
987                .fold((0, 0), |(successes, failures), receipt| {
988                    if receipt.status() {
989                        (successes + 1, failures)
990                    } else {
991                        (successes, failures + 1)
992                    }
993                });
994
995        benchmarked_blocks.push(BenchmarkedBlock {
996            number,
997            tx_count: receipts.len(),
998            ok_count,
999            err_count,
1000            gas_used: block.header.gas_used(),
1001            timestamp: block.header.timestamp_millis(),
1002            latency_ms,
1003        });
1004
1005        last_block_timestamp = Some(timestamp);
1006    }
1007
1008    // Remove leading ramp-up blocks (system-only, no gas used)
1009    while benchmarked_blocks.first().is_some_and(|b| b.gas_used == 0) {
1010        benchmarked_blocks.remove(0);
1011    }
1012
1013    // Reset latency_ms for the new first block (its latency was relative to a stripped block)
1014    if let Some(first) = benchmarked_blocks.first_mut() {
1015        first.latency_ms = None;
1016    }
1017
1018    let metadata = BenchmarkMetadata {
1019        target_tps: args.tps,
1020        run_duration_secs: args.duration,
1021        accounts: args.accounts.get(),
1022        chain_id: provider.get_chain_id().await?,
1023        total_connections: args.max_concurrent_requests,
1024        start_block,
1025        end_block,
1026        node_commit_sha: args.node_commit_sha.clone(),
1027        build_profile: args.build_profile.clone(),
1028        mode: args.benchmark_mode.clone(),
1029        tip20_weight: args.tip20_weight,
1030        place_order_weight: args.place_order_weight,
1031        swap_weight: args.swap_weight,
1032        erc20_weight: args.erc20_weight,
1033        mpp_weight: args.mpp_weight,
1034    };
1035
1036    let report = BenchmarkReport {
1037        metadata,
1038        blocks: benchmarked_blocks,
1039    };
1040
1041    let path = "report.json";
1042    let file = File::create(path)?;
1043    let writer = BufWriter::new(file);
1044    serde_json::to_writer_pretty(writer, &report)?;
1045
1046    info!(path, "Generated report");
1047
1048    Ok(())
1049}
1050
1051async fn monitor_tps(counters: TransactionCounters, target_count: usize, token: CancellationToken) {
1052    let mut last_count = 0;
1053    let mut ticker = interval(Duration::from_secs(1));
1054
1055    loop {
1056        select! {
1057            _ = ticker.tick() => {
1058                let current_count = counters.sent.load(Ordering::Relaxed);
1059                let tps = current_count - last_count;
1060                last_count = current_count;
1061
1062                info!(
1063                    tps,
1064                    total = current_count,
1065                    failed = counters.failed.load(Ordering::Relaxed),
1066                    timed_out = counters.timed_out.load(Ordering::Relaxed),
1067                    "Status"
1068                );
1069                tokio::time::sleep(Duration::from_secs(1)).await;
1070
1071                if current_count == target_count {
1072                    break;
1073                }
1074            }
1075            _ = token.cancelled() => {
1076                break;
1077            }
1078        }
1079    }
1080}
1081
1082async fn join_all<
1083    T: Future<Output = alloy::contract::Result<PendingTransactionBuilder<TempoNetwork>>>,
1084>(
1085    futures: impl IntoIterator<Item = T>,
1086    max_concurrent_requests: usize,
1087    max_concurrent_transactions: usize,
1088) -> eyre::Result<()> {
1089    let chunks = futures.into_iter().chunks(max_concurrent_transactions);
1090
1091    for chunk in chunks.into_iter() {
1092        // Send transactions and collect pending builders
1093        let pending_txs = stream::iter(chunk)
1094            .buffer_unordered(max_concurrent_requests)
1095            .try_collect::<Vec<_>>()
1096            .await?;
1097
1098        // Fetch receipts and assert status
1099        assert_receipts(
1100            pending_txs
1101                .into_iter()
1102                .map(|tx| async move { Ok(tx.get_receipt().await?) }),
1103            max_concurrent_requests,
1104        )
1105        .await?;
1106    }
1107
1108    Ok(())
1109}
1110
1111async fn assert_receipts<R: ReceiptResponse, F: Future<Output = eyre::Result<R>>>(
1112    receipts: impl IntoIterator<Item = F>,
1113    max_concurrent_requests: usize,
1114) -> eyre::Result<()> {
1115    stream::iter(receipts)
1116        .buffer_unordered(max_concurrent_requests)
1117        .try_for_each(|receipt| assert_receipt(receipt))
1118        .await
1119}
1120
1121async fn assert_receipt<R: ReceiptResponse>(receipt: R) -> eyre::Result<()> {
1122    eyre::ensure!(
1123        receipt.status(),
1124        "Transaction {} failed",
1125        receipt.transaction_hash()
1126    );
1127    Ok(())
1128}