1mod dex;
2mod erc20;
3mod mpp;
4
5use alloy_consensus::Transaction;
6use itertools::Itertools;
7use reth_tracing::{
8 RethTracer, Tracer,
9 tracing::{debug, error, info},
10};
11use tempo_alloy::{
12 TempoNetwork, fillers::ExpiringNonceFiller, provider::ext::TempoProviderBuilderExt,
13};
14
15use alloy::{
16 consensus::BlockHeader,
17 eips::Encodable2718,
18 network::{EthereumWallet, ReceiptResponse, TransactionBuilder, TxSignerSync},
19 primitives::{Address, B256, BlockNumber, U256},
20 providers::{
21 DynProvider, PendingTransactionBuilder, PendingTransactionError, Provider, ProviderBuilder,
22 SendableTx, WatchTxError, fillers::TxFiller,
23 },
24 rpc::client::NoParams,
25 signers::local::{
26 Secp256k1Signer,
27 coins_bip39::{English, Mnemonic, MnemonicError},
28 },
29 transports::http::reqwest::Url,
30};
31use clap::Parser;
32use eyre::{Context, OptionExt};
33use futures::{
34 FutureExt, Stream, StreamExt, TryStreamExt,
35 future::BoxFuture,
36 stream::{self},
37};
38use governor::{Quota, RateLimiter, state::StreamRateLimitExt};
39use indicatif::{ProgressBar, ProgressIterator};
40use rand::{random_range, seq::IndexedRandom};
41use rlimit::Resource;
42use serde::Serialize;
43use std::{
44 collections::VecDeque,
45 fs::File,
46 io::BufWriter,
47 num::{NonZeroU32, NonZeroU64},
48 str::FromStr,
49 sync::{
50 Arc, OnceLock,
51 atomic::{AtomicUsize, Ordering},
52 },
53 time::Duration,
54};
55use tempo_contracts::precompiles::{
56 IFeeManager::IFeeManagerInstance,
57 IRolesAuth,
58 IStablecoinDEX::IStablecoinDEXInstance,
59 ITIP20::{self, ITIP20Instance},
60 ITIP20Factory, STABLECOIN_DEX_ADDRESS, TIP20_FACTORY_ADDRESS,
61};
62use tempo_precompiles::{
63 TIP_FEE_MANAGER_ADDRESS,
64 stablecoin_dex::{MAX_TICK, MIN_ORDER_AMOUNT, MIN_TICK, TICK_SPACING},
65 tip_fee_manager::DEFAULT_FEE_TOKEN,
66 tip20::ISSUER_ROLE,
67};
68use tokio::{
69 select,
70 time::{interval, sleep},
71};
72use tokio_util::sync::CancellationToken;
73
74use crate::cmd::signer_providers::SignerProviderManager;
75
/// Command-line arguments for the max-TPS benchmark: sends a weighted mix of transactions
/// at a target rate for a fixed duration and writes a per-block report.
#[derive(Parser, Debug)]
pub struct MaxTpsArgs {
    /// Target number of transactions to send per second.
    #[arg(short, long)]
    tps: u64,

    /// How long to keep sending transactions, in seconds.
    #[arg(short, long, default_value_t = 30)]
    duration: u64,

    /// Number of accounts handed to the signer/provider manager.
    #[arg(short, long, default_value_t = NonZeroU64::new(100).unwrap())]
    accounts: NonZeroU64,

    /// Mnemonic the signers are created from; `random` generates a fresh one.
    #[arg(short, long, default_value = "random")]
    mnemonic: MnemonicArg,

    /// Mnemonic index passed to the signer/provider manager (presumably the first
    /// derivation index to use).
    #[arg(short, long, default_value_t = 0)]
    from_mnemonic_index: u32,

    /// Token set as every account's fee token; also the token transferred when no DEX
    /// setup is performed.
    #[arg(long, default_value_t = DEFAULT_FEE_TOKEN)]
    fee_token: Address,

    /// RPC endpoints to target; comma-separated or given multiple times.
    #[arg(long, value_delimiter = ',', action = clap::ArgAction::Append, default_values_t = vec!["http://localhost:8545".parse::<Url>().unwrap()])]
    target_urls: Vec<Url>,

    /// Maximum number of requests kept in flight at the same time.
    #[arg(long, default_value_t = 100)]
    max_concurrent_requests: usize,

    /// Batch size for setup transactions: a batch is sent, then its receipts are awaited
    /// before the next batch starts.
    #[arg(long, default_value_t = 10000)]
    max_concurrent_transactions: usize,

    /// If set, the minimum file descriptor limit required; the nofile limit is raised at
    /// startup.
    #[arg(long)]
    fd_limit: Option<u64>,

    /// Node commit SHA recorded in the report metadata.
    #[arg(long)]
    node_commit_sha: Option<String>,

    /// Build profile recorded in the report metadata.
    #[arg(long)]
    build_profile: Option<String>,

    /// Benchmark mode recorded in the report metadata as `mode`.
    #[arg(long)]
    benchmark_mode: Option<String>,

    /// Relative weight of TIP-20 transfers in the transaction mix.
    #[arg(long, default_value_t = 1.0)]
    tip20_weight: f64,

    /// Relative weight of DEX order placements in the transaction mix.
    #[arg(long, default_value_t = 0.0)]
    place_order_weight: f64,

    /// Relative weight of DEX swaps in the transaction mix.
    #[arg(long, default_value_t = 0.0)]
    swap_weight: f64,

    /// Relative weight of ERC-20 transfers in the transaction mix.
    #[arg(long, default_value_t = 0.0)]
    erc20_weight: f64,

    /// Relative weight of MPP open-and-close transactions in the transaction mix.
    #[arg(long, default_value_t = 0.0)]
    mpp_weight: f64,

    /// Address of the MPP contract used when `--mpp-weight` is greater than zero.
    #[arg(long, default_value = "0x33b901018174ddabe4841042ab76ba85d4e24f25")]
    mpp_contract_address: Option<Address>,

    /// Use the signer addresses as transfer recipients instead of random addresses.
    #[arg(long)]
    existing_recipients: bool,

    /// Maximum number of sent transactions whose receipts are sampled after the run.
    #[arg(long, default_value_t = 100)]
    sample_size: usize,

    /// Fund all accounts through the `tempo_fundAddress` RPC method before the run.
    #[arg(long)]
    faucet: bool,

    /// Endpoint to use for faucet requests; defaults to the first signer's provider.
    #[arg(long)]
    faucet_url: Option<Url>,

    /// Call `admin_clearTxpool` on every target URL before the run.
    #[arg(long)]
    clear_txpool: bool,

    /// Use random 2D nonces instead of the default expiring nonces.
    #[arg(long)]
    use_2d_nonces: bool,

    /// Use standard cached nonces instead of the default expiring nonces. Ignored when
    /// `--use-2d-nonces` is also set.
    #[arg(long)]
    use_standard_nonces: bool,
}
203
204impl MaxTpsArgs {
    /// Factor the floating-point `*_weight` arguments are multiplied by before being
    /// truncated to integer weights, i.e. weights are resolved to three decimal places.
    const WEIGHT_PRECISION: f64 = 1000.0;
206
207 pub async fn run(self) -> eyre::Result<()> {
208 RethTracer::new().init()?;
209
210 let accounts = self.accounts.get();
211
212 if let Some(fd_limit) = self.fd_limit {
214 increase_nofile_limit(fd_limit).context("Failed to increase nofile limit")?;
215 }
216
217 let signer_provider_factory = Box::new(|signer, target_url, cached_nonce_manager| {
218 ProviderBuilder::default()
219 .fetch_chain_id()
220 .with_gas_estimation()
221 .with_nonce_management(cached_nonce_manager)
222 .wallet(EthereumWallet::from(signer))
223 .connect_http(target_url)
224 .erased()
225 });
226
227 if self.use_2d_nonces {
228 info!(
229 accounts = self.accounts,
230 "Creating signers (with 2D nonces)"
231 );
232 let signer_provider_manager = SignerProviderManager::new(
233 self.mnemonic.resolve(),
234 self.from_mnemonic_index,
235 accounts,
236 self.target_urls.clone(),
237 Box::new(|target_url, _cached_nonce_manager| {
238 ProviderBuilder::new_with_network::<TempoNetwork>()
239 .with_random_2d_nonces()
240 .connect_http(target_url)
241 }),
242 signer_provider_factory,
243 );
244 self.run_with_manager(signer_provider_manager).await
245 } else if self.use_standard_nonces {
246 info!(
247 accounts = self.accounts,
248 "Creating signers (with standard nonces)"
249 );
250 let signer_provider_manager = SignerProviderManager::new(
251 self.mnemonic.resolve(),
252 self.from_mnemonic_index,
253 accounts,
254 self.target_urls.clone(),
255 Box::new(|target_url, cached_nonce_manager| {
256 ProviderBuilder::default()
257 .fetch_chain_id()
258 .with_gas_estimation()
259 .with_nonce_management(cached_nonce_manager)
260 .connect_http(target_url)
261 }),
262 signer_provider_factory,
263 );
264 self.run_with_manager(signer_provider_manager).await
265 } else {
266 let expiry_secs = ExpiringNonceFiller::DEFAULT_EXPIRY_SECS;
269 info!(
270 accounts = self.accounts,
271 expiry_secs, "Creating signers (with expiring nonces - TIP-1009)"
272 );
273 let signer_provider_manager = SignerProviderManager::new(
274 self.mnemonic.resolve(),
275 self.from_mnemonic_index,
276 accounts,
277 self.target_urls.clone(),
278 Box::new(move |target_url, _cached_nonce_manager| {
279 ProviderBuilder::default()
280 .filler(ExpiringNonceFiller::with_expiry_secs(expiry_secs))
281 .with_gas_estimation()
282 .fetch_chain_id()
283 .connect_http(target_url)
284 }),
285 signer_provider_factory,
286 );
287 self.run_with_manager(signer_provider_manager).await
288 }
289 }
290
291 async fn run_with_manager<F: TxFiller<TempoNetwork> + 'static>(
292 self,
293 signer_provider_manager: SignerProviderManager<F>,
294 ) -> eyre::Result<()> {
295 eyre::ensure!(
297 self.mpp_weight == 0.0 || self.mpp_contract_address.is_some(),
298 "--mpp-contract-address is required when --mpp-weight > 0"
299 );
300
301 let signer_providers = signer_provider_manager.signer_providers();
302
303 if self.clear_txpool {
304 for (target_url, provider) in signer_provider_manager.target_url_providers() {
305 let transactions: u64 = provider
306 .raw_request("admin_clearTxpool".into(), NoParams::default())
307 .await
308 .context(
309 format!("Failed to clear transaction pool for {target_url}. Is `admin_clearTxpool` RPC method available?"),
310 )?;
311 info!(%target_url, transactions, "Cleared transaction pool");
312 }
313 }
314
315 let provider = signer_providers[0].1.clone();
317
318 if self.faucet {
320 let faucet_provider: DynProvider<TempoNetwork> =
321 if let Some(ref faucet_url) = self.faucet_url {
322 info!(%faucet_url, "Using custom faucet URL");
323 ProviderBuilder::new_with_network::<TempoNetwork>()
324 .connect_http(faucet_url.clone())
325 .erased()
326 } else {
327 provider.clone()
328 };
329 fund_accounts(
330 &faucet_provider,
331 &signer_providers
332 .iter()
333 .map(|(signer, _)| signer.address())
334 .collect::<Vec<_>>(),
335 self.max_concurrent_requests,
336 self.max_concurrent_transactions,
337 )
338 .await
339 .context("Failed to fund accounts from faucet")?;
340 }
341
342 info!(fee_token = %self.fee_token, "Setting default fee token");
343 join_all(
344 signer_providers
345 .iter()
346 .map(async |(_, provider)| {
347 IFeeManagerInstance::new(TIP_FEE_MANAGER_ADDRESS, provider.clone())
348 .setUserToken(self.fee_token)
349 .send()
350 .await
351 })
352 .progress(),
353 self.max_concurrent_requests,
354 self.max_concurrent_transactions,
355 )
356 .await
357 .context("Failed to set default fee token")?;
358
359 let (quote_token, user_tokens) = if self.place_order_weight > 0.0 || self.swap_weight > 0.0
362 {
363 let user_tokens = 2;
364 info!(user_tokens, "Setting up DEX");
365 let (quote_token, user_tokens) = dex::setup(
366 signer_providers,
367 user_tokens,
368 self.max_concurrent_requests,
369 self.max_concurrent_transactions,
370 )
371 .await?;
372 (Some(quote_token), user_tokens)
373 } else {
374 info!(fee_token = %self.fee_token, "Using fee token for TIP-20 transfers");
375 (None, vec![self.fee_token])
376 };
377
378 let erc20_tokens = if self.erc20_weight > 0.0 {
379 let num_erc20_tokens = 1;
380 info!(num_erc20_tokens, "Setting up ERC-20 tokens");
381 erc20::setup(
382 signer_providers,
383 num_erc20_tokens,
384 self.max_concurrent_requests,
385 self.max_concurrent_transactions,
386 )
387 .await?
388 } else {
389 Vec::new()
390 };
391
392 let mpp_contract_address = if self.mpp_weight > 0.0 {
393 let addr = self.mpp_contract_address.expect("validated above");
394 mpp::setup(
395 signer_providers,
396 addr,
397 self.fee_token,
398 self.max_concurrent_requests,
399 self.max_concurrent_transactions,
400 )
401 .await?;
402 Some(addr)
403 } else {
404 None
405 };
406
407 let total_txs = self.tps * self.duration;
409 let tip20_weight = (self.tip20_weight * Self::WEIGHT_PRECISION).trunc() as u64;
410 let place_order_weight = (self.place_order_weight * Self::WEIGHT_PRECISION).trunc() as u64;
411 let swap_weight = (self.swap_weight * Self::WEIGHT_PRECISION).trunc() as u64;
412 let erc20_weight = (self.erc20_weight * Self::WEIGHT_PRECISION).trunc() as u64;
413 let mpp_weight = (self.mpp_weight * Self::WEIGHT_PRECISION).trunc() as u64;
414
415 let recipients = if self.existing_recipients {
416 let addrs: Vec<Address> = signer_providers
417 .iter()
418 .map(|(signer, _)| signer.address())
419 .collect();
420 info!(
421 recipients = addrs.len(),
422 "Using existing signer addresses as recipients"
423 );
424 Some(addrs)
425 } else {
426 None
427 };
428
429 let use_expiring_nonces = !self.use_2d_nonces && !self.use_standard_nonces;
430 let expiry_secs = if use_expiring_nonces {
431 Some(ExpiringNonceFiller::DEFAULT_EXPIRY_SECS)
432 } else {
433 None
434 };
435
436 let chain_id = provider.get_chain_id().await?;
437
438 let gen_input = GenerateTransactionsInput {
439 tip20_weight,
440 place_order_weight,
441 swap_weight,
442 erc20_weight,
443 mpp_weight,
444 quote_token,
445 user_tokens,
446 erc20_tokens,
447 mpp_contract_address,
448 chain_id,
449 recipients,
450 expiry_secs,
451 };
452
453 info!(total_txs, "Generating and sending transactions");
454
455 let counters = TransactionCounters::default();
456 let target_count = total_txs as usize;
457 let cancel_token = CancellationToken::new();
458
459 tokio::spawn(monitor_tps(
461 counters.clone(),
462 target_count,
463 cancel_token.clone(),
464 ));
465
466 let rate_limiter =
467 RateLimiter::direct(Quota::per_second(NonZeroU32::new(self.tps as u32).unwrap()));
468 let start_block_number = provider.get_block_number().await?;
469
470 let mut pending_txs =
471 generate_transactions(signer_provider_manager.clone(), gen_input, counters.clone())
472 .take(target_count)
474 .take_until(sleep(Duration::from_secs(self.duration)))
476 .buffer_unordered(self.tps as usize)
478 .filter_map(|result| async {
480 match result {
481 Ok(bytes) => Some(bytes),
482 Err(e) => {
483 debug!(?e, "Transaction generation failed");
484 None
485 }
486 }
487 })
488 .boxed()
489 .ratelimit_stream(&rate_limiter)
490 .zip(stream::repeat_with(|| {
492 signer_provider_manager.random_unsigned_provider()
493 }))
494 .map(|(bytes, provider)| async move {
496 tokio::time::timeout(
497 Duration::from_secs(1),
498 provider.send_raw_transaction(&bytes),
499 )
500 .await
501 })
502 .buffer_unordered(self.max_concurrent_requests)
504 .filter_map({
505 let counters = counters.clone();
506 move |result| {
507 let counters = counters.clone();
508 async move {
509 match result {
510 Ok(Ok(pending_tx)) => {
511 counters.sent.fetch_add(1, Ordering::Relaxed);
512 counters.success.fetch_add(1, Ordering::Relaxed);
513 Some(pending_tx)
514 }
515 Ok(Err(err)) => {
516 counters.sent.fetch_add(1, Ordering::Relaxed);
517 counters.failed.fetch_add(1, Ordering::Relaxed);
518 debug!(?err, "Failed to send transaction");
519 None
520 }
521 Err(_) => {
522 counters.sent.fetch_add(1, Ordering::Relaxed);
523 counters.timed_out.fetch_add(1, Ordering::Relaxed);
524 debug!("Transaction sending timed out");
525 None
526 }
527 }
528 }
529 }
530 })
531 .collect::<VecDeque<_>>()
532 .await;
533
534 cancel_token.cancel();
535
536 info!(
537 tip20_transfers = counters.tip20_transfers.load(Ordering::Relaxed),
538 swaps = counters.swaps.load(Ordering::Relaxed),
539 orders = counters.orders.load(Ordering::Relaxed),
540 erc20_transfers = counters.erc20_transfers.load(Ordering::Relaxed),
541 mpp_open_close = counters.mpp_open_close.load(Ordering::Relaxed),
542 success = counters.success.load(Ordering::Relaxed),
543 failed = counters.failed.load(Ordering::Relaxed),
544 timed_out = counters.timed_out.load(Ordering::Relaxed),
545 "Finished sending transactions"
546 );
547
548 let end_block_number = provider.get_block_number().await?;
549
550 let sample_size = pending_txs.len().min(self.sample_size);
552 let successful = Arc::new(AtomicUsize::new(0));
553 let timeout = Arc::new(AtomicUsize::new(0));
554 let failed = Arc::new(AtomicUsize::new(0));
555 info!(sample_size, "Collecting a sample of receipts");
556 stream::iter(0..sample_size)
557 .map(|_| {
558 let idx = random_range(0..pending_txs.len());
559 pending_txs.remove(idx).expect("index is in range")
560 })
561 .map(|pending_tx| {
562 let hash = *pending_tx.tx_hash();
563 pending_tx
564 .with_timeout(Some(Duration::from_secs(5)))
565 .get_receipt()
566 .map(move |result| (hash, result))
567 })
568 .buffered(self.max_concurrent_requests)
569 .for_each(async |(hash, result)| match result {
570 Ok(_) => {
571 successful.fetch_add(1, Ordering::Relaxed);
572 }
573 Err(PendingTransactionError::TxWatcher(WatchTxError::Timeout)) => {
574 timeout.fetch_add(1, Ordering::Relaxed);
575 error!(?hash, "Transaction receipt retrieval timed out");
576 }
577 Err(err) => {
578 failed.fetch_add(1, Ordering::Relaxed);
579 error!(?hash, "Transaction receipt retrieval failed: {}", err);
580 }
581 })
582 .await;
583 info!(
584 successful = successful.load(Ordering::Relaxed),
585 timeout = timeout.load(Ordering::Relaxed),
586 failed = failed.load(Ordering::Relaxed),
587 "Collected a sample of receipts"
588 );
589
590 generate_report(provider, start_block_number, end_block_number, &self).await?;
591
592 Ok(())
593 }
594}
595
/// Value of the `--mnemonic` argument: either an explicit phrase or a request to
/// generate a random one.
#[derive(Debug, Clone)]
enum MnemonicArg {
    /// A mnemonic phrase that was validated as an English mnemonic when parsed.
    Mnemonic(String),
    /// Generate a new random mnemonic each time [`MnemonicArg::resolve`] is called.
    Random,
}
601
602impl FromStr for MnemonicArg {
603 type Err = MnemonicError;
604
605 fn from_str(s: &str) -> Result<Self, Self::Err> {
606 match s {
607 "random" => Ok(MnemonicArg::Random),
608 mnemonic => Ok(MnemonicArg::Mnemonic(
609 Mnemonic::<English>::from_str(mnemonic)?.to_phrase(),
610 )),
611 }
612 }
613}
614
615impl MnemonicArg {
616 fn resolve(&self) -> String {
617 match self {
618 MnemonicArg::Mnemonic(mnemonic) => mnemonic.clone(),
619 MnemonicArg::Random => Mnemonic::<English>::new(&mut rand_08::thread_rng()).to_phrase(),
620 }
621 }
622}
623
/// Shared counters updated while transactions are generated and sent. Cloning shares the
/// underlying atomics.
#[derive(Clone, Default)]
struct TransactionCounters {
    /// Number of TIP-20 transfer transactions generated.
    tip20_transfers: Arc<AtomicUsize>,
    /// Number of swap transactions generated.
    swaps: Arc<AtomicUsize>,
    /// Number of order placement transactions generated.
    orders: Arc<AtomicUsize>,
    /// Number of ERC-20 transfer transactions generated.
    erc20_transfers: Arc<AtomicUsize>,
    /// Number of MPP open-and-close transactions generated.
    mpp_open_close: Arc<AtomicUsize>,
    /// Number of send attempts that completed, regardless of outcome.
    sent: Arc<AtomicUsize>,
    /// Number of send attempts accepted by the node.
    success: Arc<AtomicUsize>,
    /// Number of send attempts that returned an error.
    failed: Arc<AtomicUsize>,
    /// Number of send attempts that hit the send timeout.
    timed_out: Arc<AtomicUsize>,
}
638
/// Static configuration for [`generate_transactions`].
#[derive(Clone)]
struct GenerateTransactionsInput {
    /// Integer weight of TIP-20 transfers.
    tip20_weight: u64,
    /// Integer weight of DEX order placements.
    place_order_weight: u64,
    /// Integer weight of DEX swaps.
    swap_weight: u64,
    /// Integer weight of ERC-20 transfers.
    erc20_weight: u64,
    /// Integer weight of MPP open-and-close transactions.
    mpp_weight: u64,
    /// Quote token for swaps; must be set when `swap_weight > 0`.
    quote_token: Option<Address>,
    /// Tokens one of which is picked at random for every generated transaction.
    user_tokens: Vec<Address>,
    /// ERC-20 tokens to pick from for ERC-20 transfers.
    erc20_tokens: Vec<Address>,
    /// MPP contract address; must be set when `mpp_weight > 0`.
    mpp_contract_address: Option<Address>,
    /// Chain ID used when computing MPP channel IDs.
    chain_id: u64,
    /// Transfer recipients to pick from; `None` means a random address per transfer.
    recipients: Option<Vec<Address>>,
    /// If set, every transaction gets `valid_before = now + expiry_secs`.
    expiry_secs: Option<u64>,
}
657
/// Returns an endless stream of futures, each of which builds, fills and signs one
/// transaction and resolves to its EIP-2718 encoded bytes.
///
/// The transaction type is picked at random according to the weights in `input`, and the
/// matching counter in `counters` is incremented when the transaction is built. Nothing
/// is sent here; the caller is responsible for polling the futures and submitting the
/// encoded transactions.
fn generate_transactions<F: TxFiller<TempoNetwork> + 'static>(
    signer_provider_manager: SignerProviderManager<F>,
    input: GenerateTransactionsInput,
    counters: TransactionCounters,
) -> impl Stream<Item = impl Future<Output = eyre::Result<Vec<u8>>>> {
    let GenerateTransactionsInput {
        tip20_weight,
        place_order_weight,
        swap_weight,
        erc20_weight,
        mpp_weight,
        quote_token,
        user_tokens,
        erc20_tokens,
        mpp_contract_address,
        chain_id,
        recipients,
        expiry_secs,
    } = input;

    const TX_TYPES: usize = 5;
    // The position of each weight is the transaction type index matched on below.
    let tx_weights: [u64; TX_TYPES] = [
        tip20_weight,
        swap_weight,
        place_order_weight,
        erc20_weight,
        mpp_weight,
    ];
    // Per transaction type cache of (max fee per gas, max priority fee per gas, gas limit),
    // populated from the first filled transaction of that type.
    let gas_estimates: [Arc<OnceLock<(u128, u128, u64)>>; TX_TYPES] = Default::default();
    // Running transaction counter, added to the fee fields of every transaction below.
    let tx_id = Arc::new(AtomicUsize::new(0));

    stream::repeat_with(move || {
        // Clone the shared state so each future owns what it needs.
        let signer_provider_manager = signer_provider_manager.clone();
        let gas_estimates = gas_estimates.clone();
        let tx_id = tx_id.clone();
        let recipients = recipients.clone();
        let user_tokens = user_tokens.clone();
        let erc20_tokens = erc20_tokens.clone();
        let counters = counters.clone();

        async move {
            let provider = signer_provider_manager.random_unsigned_provider();
            let signer = signer_provider_manager.random_signer();
            // Panics if `user_tokens` is empty.
            let token = user_tokens.choose(&mut rand::rng()).copied().unwrap();

            // Weighted random choice of the transaction type; fails (and thereby fails
            // this future) if the weights cannot be sampled, e.g. when all are zero.
            let tx_index = tx_weights
                .iter()
                .enumerate()
                .collect::<Vec<_>>()
                .choose_weighted(&mut rand::rng(), |(_, weight)| *weight)?
                .0;

            let recipient = match &recipients {
                Some(addrs) => *addrs.choose(&mut rand::rng()).unwrap(),
                None => Address::random(),
            };

            let mut tx = match tx_index {
                0 => {
                    // TIP-20 transfer of the smallest possible amount.
                    counters.tip20_transfers.fetch_add(1, Ordering::Relaxed);
                    let token = ITIP20Instance::new(token, provider.clone());

                    token
                        .transfer(recipient, U256::ONE)
                        .into_transaction_request()
                }
                1 => {
                    counters.swaps.fetch_add(1, Ordering::Relaxed);
                    let exchange =
                        IStablecoinDEXInstance::new(STABLECOIN_DEX_ADDRESS, provider.clone());

                    let quote_token =
                        quote_token.expect("quote_token must be set when swap_weight > 0");
                    // NOTE(review): this sends a `quoteSwapExactAmountIn` call, whose name
                    // suggests a quote rather than an actual swap — confirm this is the
                    // intended call for the "swaps" workload.
                    exchange
                        .quoteSwapExactAmountIn(token, quote_token, 1)
                        .into_transaction_request()
                }
                2 => {
                    counters.orders.fetch_add(1, Ordering::Relaxed);
                    let exchange =
                        IStablecoinDEXInstance::new(STABLECOIN_DEX_ADDRESS, provider.clone());

                    // Random tick that is a multiple of `TICK_SPACING` derived from the
                    // `MIN_TICK..=MAX_TICK` bounds.
                    let tick = random_range(MIN_TICK / TICK_SPACING..=MAX_TICK / TICK_SPACING)
                        * TICK_SPACING;
                    exchange
                        .place(token, MIN_ORDER_AMOUNT, true, tick)
                        .into_transaction_request()
                }
                3 => {
                    // ERC-20 transfer of the smallest possible amount.
                    counters.erc20_transfers.fetch_add(1, Ordering::Relaxed);
                    let token_address = erc20_tokens.choose(&mut rand::rng()).copied().unwrap();
                    let token = erc20::MockERC20::new(token_address, provider.clone());

                    token
                        .transfer(recipient, U256::ONE)
                        .into_transaction_request()
                }
                4 => {
                    counters.mpp_open_close.fetch_add(1, Ordering::Relaxed);
                    let contract = mpp_contract_address
                        .expect("mpp_channel_address must be set when mpp_weight > 0");

                    // A random salt per transaction feeds into the channel ID.
                    let salt = B256::random();
                    // The signer's address is passed twice to `compute_channel_id`
                    // (presumably as both payer and payee — confirm against `mpp`).
                    let payer = signer.address();
                    let channel_id = mpp::compute_channel_id(
                        payer,
                        payer,
                        token,
                        salt,
                        Address::ZERO,
                        contract,
                        chain_id,
                    );
                    mpp::build_open_and_close(contract, payer, token, salt, channel_id)
                }
                _ => unreachable!("Only {TX_TYPES} transaction types are supported"),
            };

            tx.inner.set_from(signer.address());

            let gas = &gas_estimates[tx_index];
            // Pre-populate the gas fields from the cache so that filling does not have
            // to determine them again (presumably skipping the estimation requests).
            if let Some((max_fee_per_gas, max_priority_fee_per_gas, gas_limit)) = gas.get() {
                tx.inner.set_max_fee_per_gas(*max_fee_per_gas);
                tx.inner
                    .set_max_priority_fee_per_gas(*max_priority_fee_per_gas);
                tx.inner.set_gas_limit(*gas_limit);
            }

            let filled = provider.fill(tx).await?;

            // First filled transaction of this type: remember its gas values. A failed
            // `set` (another future got there first) is deliberately ignored.
            if gas.get().is_none() {
                let _ = gas.set(match &filled {
                    SendableTx::Builder(builder) => (
                        builder
                            .max_fee_per_gas()
                            .ok_or_eyre("max fee per gas should be filled")?,
                        builder
                            .max_priority_fee_per_gas()
                            .ok_or_eyre("max priority fee per gas should be filled")?,
                        builder
                            .gas_limit()
                            .ok_or_eyre("gas limit should be filled")?,
                    ),
                    SendableTx::Envelope(envelope) => (
                        envelope.max_fee_per_gas(),
                        envelope
                            .max_priority_fee_per_gas()
                            .ok_or_eyre("max priority fee per gas should be filled")?,
                        envelope.gas_limit(),
                    ),
                });
            }

            let mut req = filled.try_into_request()?;

            // Add the running transaction counter to both fee fields, so fee values
            // differ between transactions (presumably to keep otherwise identical
            // transactions distinct).
            let id = tx_id.fetch_add(1, Ordering::Relaxed) as u128;
            if let Some(fee) = req.max_priority_fee_per_gas() {
                req.inner.set_max_priority_fee_per_gas(fee + id);
            }
            if let Some(fee) = req.max_fee_per_gas() {
                req.inner.set_max_fee_per_gas(fee + id);
            }

            if let Some(expiry_secs) = expiry_secs {
                // Set the validity deadline relative to the current wall-clock time;
                // a clock before the Unix epoch is treated as time zero.
                let now = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .map(|d| d.as_secs())
                    .unwrap_or(0);
                req.set_valid_before(now + expiry_secs);
            }

            // Sign locally and return the EIP-2718 encoding.
            let mut unsigned = req.build_unsigned()?;
            let sig = signer.sign_transaction_sync(unsigned.as_dyn_signable_mut())?;
            eyre::Ok(unsigned.into_envelope(sig).encoded_2718())
        }
    })
}
859
860async fn fund_accounts(
862 provider: &DynProvider<TempoNetwork>,
863 addresses: &[Address],
864 max_concurrent_requests: usize,
865 max_concurrent_transactions: usize,
866) -> eyre::Result<()> {
867 info!(accounts = addresses.len(), "Funding accounts from faucet");
868 let progress = ProgressBar::new(addresses.len() as u64);
869
870 let chunks = addresses
871 .iter()
872 .map(|address| {
873 let address = *address;
874 provider.raw_request::<_, Vec<B256>>("tempo_fundAddress".into(), (address,))
875 })
876 .chunks(max_concurrent_transactions);
877
878 for chunk in chunks.into_iter() {
879 let tx_hashes = stream::iter(chunk)
880 .buffer_unordered(max_concurrent_requests)
881 .try_collect::<Vec<_>>()
882 .await?
883 .into_iter()
884 .inspect(|_| progress.inc(1))
885 .flatten()
886 .map(async |hash| {
887 Ok(
888 PendingTransactionBuilder::new(provider.root().clone(), hash)
889 .get_receipt()
890 .await?,
891 )
892 });
893 assert_receipts(tx_hashes, max_concurrent_requests)
894 .await
895 .expect("Failed to fund accounts");
896 }
897 Ok(())
898}
899
900pub fn increase_nofile_limit(min_limit: u64) -> eyre::Result<u64> {
901 let (soft, hard) = Resource::NOFILE.get()?;
902 info!(soft, hard, "File descriptor limit at startup");
903
904 if hard < min_limit {
905 panic!(
906 "File descriptor hard limit is too low. Please increase it to at least {min_limit}."
907 );
908 }
909
910 if soft != hard {
911 Resource::NOFILE.set(hard, hard)?; let (soft, hard) = Resource::NOFILE.get()?;
913 info!(soft, hard, "After increasing file descriptor limit");
914 }
915
916 Ok(soft)
917}
918
/// Per-block entry of the benchmark report.
#[derive(Serialize)]
struct BenchmarkedBlock {
    /// Block number.
    number: BlockNumber,
    /// Number of receipts (transactions) in the block.
    tx_count: usize,
    /// Number of receipts with a successful status.
    ok_count: usize,
    /// Number of receipts with a failed status.
    err_count: usize,
    /// Gas used according to the block header.
    gas_used: u64,
    /// Block timestamp in milliseconds, as reported by the header.
    timestamp: u64,
    /// Milliseconds since the previous block's timestamp; `None` for the first reported
    /// block.
    latency_ms: Option<u64>,
}
929
/// Run-level metadata of the benchmark report, mostly copied from the command-line
/// arguments.
#[derive(Serialize)]
struct BenchmarkMetadata {
    /// Requested transactions per second.
    target_tps: u64,
    /// Requested run duration in seconds.
    run_duration_secs: u64,
    /// Number of accounts used.
    accounts: u64,
    /// Chain ID reported by the node.
    chain_id: u64,
    /// Value of `--max-concurrent-requests`.
    total_connections: usize,
    /// Block number observed just before sending started.
    start_block: BlockNumber,
    /// Block number observed after sending finished.
    end_block: BlockNumber,
    /// Node commit SHA, if provided; omitted from the output otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    node_commit_sha: Option<String>,
    /// Build profile, if provided; omitted from the output otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    build_profile: Option<String>,
    /// Benchmark mode, if provided; omitted from the output otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    mode: Option<String>,
    /// Weight of TIP-20 transfers as given on the command line.
    tip20_weight: f64,
    /// Weight of order placements as given on the command line.
    place_order_weight: f64,
    /// Weight of swaps as given on the command line.
    swap_weight: f64,
    /// Weight of ERC-20 transfers as given on the command line.
    erc20_weight: f64,
    /// Weight of MPP open-and-close transactions as given on the command line.
    mpp_weight: f64,
}
951
/// Top-level structure serialized to the JSON report file.
#[derive(Serialize)]
struct BenchmarkReport {
    /// Run-level metadata.
    metadata: BenchmarkMetadata,
    /// Per-block statistics, in ascending block order.
    blocks: Vec<BenchmarkedBlock>,
}
957
958pub async fn generate_report(
959 provider: DynProvider<TempoNetwork>,
960 start_block: BlockNumber,
961 end_block: BlockNumber,
962 args: &MaxTpsArgs,
963) -> eyre::Result<()> {
964 info!(start_block, end_block, "Generating report");
965
966 let mut last_block_timestamp: Option<u64> = None;
967
968 let mut benchmarked_blocks = Vec::new();
969
970 for number in (start_block + 1)..=end_block {
972 let block = provider
973 .get_block(number.into())
974 .await?
975 .ok_or_eyre("Block {number} not found")?;
976 let receipts = provider
977 .get_block_receipts(number.into())
978 .await?
979 .ok_or_eyre("Receipts for block {number} not found")?;
980
981 let timestamp = block.header.timestamp_millis();
982
983 let latency_ms = last_block_timestamp.map(|last| timestamp - last);
984 let (ok_count, err_count) =
985 receipts
986 .iter()
987 .fold((0, 0), |(successes, failures), receipt| {
988 if receipt.status() {
989 (successes + 1, failures)
990 } else {
991 (successes, failures + 1)
992 }
993 });
994
995 benchmarked_blocks.push(BenchmarkedBlock {
996 number,
997 tx_count: receipts.len(),
998 ok_count,
999 err_count,
1000 gas_used: block.header.gas_used(),
1001 timestamp: block.header.timestamp_millis(),
1002 latency_ms,
1003 });
1004
1005 last_block_timestamp = Some(timestamp);
1006 }
1007
1008 while benchmarked_blocks.first().is_some_and(|b| b.gas_used == 0) {
1010 benchmarked_blocks.remove(0);
1011 }
1012
1013 if let Some(first) = benchmarked_blocks.first_mut() {
1015 first.latency_ms = None;
1016 }
1017
1018 let metadata = BenchmarkMetadata {
1019 target_tps: args.tps,
1020 run_duration_secs: args.duration,
1021 accounts: args.accounts.get(),
1022 chain_id: provider.get_chain_id().await?,
1023 total_connections: args.max_concurrent_requests,
1024 start_block,
1025 end_block,
1026 node_commit_sha: args.node_commit_sha.clone(),
1027 build_profile: args.build_profile.clone(),
1028 mode: args.benchmark_mode.clone(),
1029 tip20_weight: args.tip20_weight,
1030 place_order_weight: args.place_order_weight,
1031 swap_weight: args.swap_weight,
1032 erc20_weight: args.erc20_weight,
1033 mpp_weight: args.mpp_weight,
1034 };
1035
1036 let report = BenchmarkReport {
1037 metadata,
1038 blocks: benchmarked_blocks,
1039 };
1040
1041 let path = "report.json";
1042 let file = File::create(path)?;
1043 let writer = BufWriter::new(file);
1044 serde_json::to_writer_pretty(writer, &report)?;
1045
1046 info!(path, "Generated report");
1047
1048 Ok(())
1049}
1050
1051async fn monitor_tps(counters: TransactionCounters, target_count: usize, token: CancellationToken) {
1052 let mut last_count = 0;
1053 let mut ticker = interval(Duration::from_secs(1));
1054
1055 loop {
1056 select! {
1057 _ = ticker.tick() => {
1058 let current_count = counters.sent.load(Ordering::Relaxed);
1059 let tps = current_count - last_count;
1060 last_count = current_count;
1061
1062 info!(
1063 tps,
1064 total = current_count,
1065 failed = counters.failed.load(Ordering::Relaxed),
1066 timed_out = counters.timed_out.load(Ordering::Relaxed),
1067 "Status"
1068 );
1069 tokio::time::sleep(Duration::from_secs(1)).await;
1070
1071 if current_count == target_count {
1072 break;
1073 }
1074 }
1075 _ = token.cancelled() => {
1076 break;
1077 }
1078 }
1079 }
1080}
1081
1082async fn join_all<
1083 T: Future<Output = alloy::contract::Result<PendingTransactionBuilder<TempoNetwork>>>,
1084>(
1085 futures: impl IntoIterator<Item = T>,
1086 max_concurrent_requests: usize,
1087 max_concurrent_transactions: usize,
1088) -> eyre::Result<()> {
1089 let chunks = futures.into_iter().chunks(max_concurrent_transactions);
1090
1091 for chunk in chunks.into_iter() {
1092 let pending_txs = stream::iter(chunk)
1094 .buffer_unordered(max_concurrent_requests)
1095 .try_collect::<Vec<_>>()
1096 .await?;
1097
1098 assert_receipts(
1100 pending_txs
1101 .into_iter()
1102 .map(|tx| async move { Ok(tx.get_receipt().await?) }),
1103 max_concurrent_requests,
1104 )
1105 .await?;
1106 }
1107
1108 Ok(())
1109}
1110
1111async fn assert_receipts<R: ReceiptResponse, F: Future<Output = eyre::Result<R>>>(
1112 receipts: impl IntoIterator<Item = F>,
1113 max_concurrent_requests: usize,
1114) -> eyre::Result<()> {
1115 stream::iter(receipts)
1116 .buffer_unordered(max_concurrent_requests)
1117 .try_for_each(|receipt| assert_receipt(receipt))
1118 .await
1119}
1120
1121async fn assert_receipt<R: ReceiptResponse>(receipt: R) -> eyre::Result<()> {
1122 eyre::ensure!(
1123 receipt.status(),
1124 "Transaction {} failed",
1125 receipt.transaction_hash()
1126 );
1127 Ok(())
1128}