1mod dex;
2mod erc20;
3
4use alloy_consensus::Transaction;
5use itertools::Itertools;
6use rayon::iter::{IntoParallelIterator, ParallelIterator};
7use reth_tracing::{
8 RethTracer, Tracer,
9 tracing::{debug, error, info},
10};
11use tempo_alloy::{
12 TempoNetwork, primitives::TempoTxEnvelope, provider::ext::TempoProviderBuilderExt,
13};
14
15use alloy::{
16 consensus::BlockHeader,
17 eips::Encodable2718,
18 network::{ReceiptResponse, TransactionBuilder, TxSignerSync},
19 primitives::{Address, B256, BlockNumber, U256},
20 providers::{
21 DynProvider, PendingTransactionBuilder, PendingTransactionError, Provider, ProviderBuilder,
22 SendableTx, WatchTxError, fillers::TxFiller,
23 },
24 rpc::client::NoParams,
25 signers::local::{
26 PrivateKeySigner,
27 coins_bip39::{English, Mnemonic, MnemonicError},
28 },
29 transports::http::reqwest::Url,
30};
31use clap::Parser;
32use eyre::{Context, OptionExt, ensure};
33use futures::{
34 FutureExt, StreamExt, TryStreamExt,
35 future::BoxFuture,
36 stream::{self},
37};
38use governor::{Quota, RateLimiter, state::StreamRateLimitExt};
39use indicatif::{ParallelProgressIterator, ProgressBar, ProgressIterator};
40use rand::{random_range, seq::IndexedRandom};
41use rlimit::Resource;
42use serde::Serialize;
43use std::{
44 collections::VecDeque,
45 fs::File,
46 io::BufWriter,
47 num::{NonZeroU32, NonZeroU64},
48 str::FromStr,
49 sync::{
50 Arc, OnceLock,
51 atomic::{AtomicUsize, Ordering},
52 },
53 thread,
54 time::Duration,
55};
56use tempo_contracts::precompiles::{
57 IFeeManager::IFeeManagerInstance,
58 IRolesAuth,
59 IStablecoinExchange::IStablecoinExchangeInstance,
60 ITIP20::{self, ITIP20Instance},
61 ITIP20Factory, STABLECOIN_EXCHANGE_ADDRESS, TIP20_FACTORY_ADDRESS,
62};
63use tempo_precompiles::{
64 DEFAULT_FEE_TOKEN_PRE_ALLEGRETTO, TIP_FEE_MANAGER_ADDRESS,
65 stablecoin_exchange::{MAX_TICK, MIN_ORDER_AMOUNT, MIN_TICK, TICK_SPACING},
66 tip20::{ISSUER_ROLE, token_id_to_address},
67};
68use tokio::{
69 select,
70 time::{Sleep, interval, sleep},
71};
72use tokio_util::sync::CancellationToken;
73
74use crate::cmd::signer_providers::SignerProviderManager;
75
76#[derive(Parser, Debug)]
78pub struct MaxTpsArgs {
79 #[arg(short, long)]
81 tps: u64,
82
83 #[arg(short, long, default_value_t = 30)]
85 duration: u64,
86
87 #[arg(short, long, default_value_t = NonZeroU64::new(100).unwrap())]
89 accounts: NonZeroU64,
90
91 #[arg(short, long, default_value = "random")]
93 mnemonic: MnemonicArg,
94
95 #[arg(short, long, default_value_t = 0)]
96 from_mnemonic_index: u32,
97
98 #[arg(long, default_value_t = DEFAULT_FEE_TOKEN_PRE_ALLEGRETTO)]
99 fee_token: Address,
100
101 #[arg(long, default_values_t = vec!["http://localhost:8545".parse::<Url>().unwrap()])]
103 target_urls: Vec<Url>,
104
105 #[arg(long, default_value_t = 100)]
108 max_concurrent_requests: usize,
109
110 #[arg(long, default_value_t = 10000)]
115 max_concurrent_transactions: usize,
116
117 #[arg(long)]
119 fd_limit: Option<u64>,
120
121 #[arg(long)]
123 node_commit_sha: Option<String>,
124
125 #[arg(long)]
127 build_profile: Option<String>,
128
129 #[arg(long)]
131 benchmark_mode: Option<String>,
132
133 #[arg(long, default_value_t = 0.8)]
135 tip20_weight: f64,
136
137 #[arg(long, default_value_t = 0.01)]
139 place_order_weight: f64,
140
141 #[arg(long, default_value_t = 0.19)]
143 swap_weight: f64,
144
145 #[arg(long, default_value_t = 0.0)]
147 erc20_weight: f64,
148
149 #[arg(long, default_value_t = 100)]
151 sample_size: usize,
152
153 #[arg(long)]
157 faucet: bool,
158
159 #[arg(long)]
163 clear_txpool: bool,
164
165 #[arg(long)]
167 disable_2d_nonces: bool,
168}
169
170impl MaxTpsArgs {
171 const WEIGHT_PRECISION: f64 = 1000.0;
172
173 pub async fn run(self) -> eyre::Result<()> {
174 RethTracer::new().init()?;
175
176 let accounts = self.accounts.get();
177
178 if let Some(fd_limit) = self.fd_limit {
180 increase_nofile_limit(fd_limit).context("Failed to increase nofile limit")?;
181 }
182
183 let signer_provider_factory = Box::new(|signer, target_url, cached_nonce_manager| {
184 ProviderBuilder::default()
185 .fetch_chain_id()
186 .with_gas_estimation()
187 .with_nonce_management(cached_nonce_manager)
188 .wallet(signer)
189 .connect_http(target_url)
190 .erased()
191 });
192
193 if self.disable_2d_nonces {
194 info!(
195 accounts = self.accounts,
196 "Creating signers (with standard nonces)"
197 );
198 let signer_provider_manager = SignerProviderManager::new(
199 self.mnemonic.resolve(),
200 self.from_mnemonic_index,
201 accounts,
202 self.target_urls.clone(),
203 Box::new(|target_url, cached_nonce_manager| {
204 ProviderBuilder::default()
205 .fetch_chain_id()
206 .with_gas_estimation()
207 .with_nonce_management(cached_nonce_manager)
208 .connect_http(target_url)
209 }),
210 signer_provider_factory,
211 );
212 self.run_with_manager(signer_provider_manager).await
213 } else {
214 info!(
215 accounts = self.accounts,
216 "Creating signers (with 2D nonces)"
217 );
218 let signer_provider_manager = SignerProviderManager::new(
219 self.mnemonic.resolve(),
220 self.from_mnemonic_index,
221 accounts,
222 self.target_urls.clone(),
223 Box::new(|target_url, _cached_nonce_manager| {
224 ProviderBuilder::new_with_network::<TempoNetwork>()
225 .with_random_2d_nonces()
226 .connect_http(target_url)
227 }),
228 signer_provider_factory,
229 );
230 self.run_with_manager(signer_provider_manager).await
231 }
232 }
233
234 async fn run_with_manager<F: TxFiller<TempoNetwork> + 'static>(
235 self,
236 signer_provider_manager: SignerProviderManager<F>,
237 ) -> eyre::Result<()> {
238 let accounts = self.accounts.get();
239 let signer_providers = signer_provider_manager.signer_providers();
240
241 if self.clear_txpool {
242 for (target_url, provider) in signer_provider_manager.target_url_providers() {
243 let transactions: u64 = provider
244 .raw_request("admin_clearTxpool".into(), NoParams::default())
245 .await
246 .context(
247 format!("Failed to clear transaction pool for {target_url}. Is `admin_clearTxpool` RPC method available?"),
248 )?;
249 info!(%target_url, transactions, "Cleared transaction pool");
250 }
251 }
252
253 let provider = signer_providers[0].1.clone();
255
256 if self.faucet {
258 fund_accounts(
259 &provider,
260 &signer_providers
261 .iter()
262 .map(|(signer, _)| signer.address())
263 .collect::<Vec<_>>(),
264 self.max_concurrent_requests,
265 self.max_concurrent_transactions,
266 )
267 .await
268 .context("Failed to fund accounts from faucet")?;
269 }
270
271 info!(fee_token = %self.fee_token, "Setting default fee token");
272 join_all(
273 signer_providers
274 .iter()
275 .map(async |(_, provider)| {
276 IFeeManagerInstance::new(TIP_FEE_MANAGER_ADDRESS, provider.clone())
277 .setUserToken(self.fee_token)
278 .send()
279 .await
280 })
281 .progress(),
282 self.max_concurrent_requests,
283 self.max_concurrent_transactions,
284 )
285 .await
286 .context("Failed to set default fee token")?;
287
288 let user_tokens = 2;
290 info!(user_tokens, "Setting up DEX");
291 let (quote_token, user_tokens) = dex::setup(
292 signer_providers,
293 user_tokens,
294 self.max_concurrent_requests,
295 self.max_concurrent_transactions,
296 )
297 .await?;
298
299 let erc20_tokens = if self.erc20_weight > 0.0 {
300 let num_erc20_tokens = 1;
301 info!(num_erc20_tokens, "Setting up ERC-20 tokens");
302 erc20::setup(
303 signer_providers,
304 num_erc20_tokens,
305 self.max_concurrent_requests,
306 self.max_concurrent_transactions,
307 )
308 .await?
309 } else {
310 Vec::new()
311 };
312
313 let total_txs = self.tps * self.duration;
315 let tip20_weight = (self.tip20_weight * Self::WEIGHT_PRECISION).trunc() as u64;
316 let place_order_weight = (self.place_order_weight * Self::WEIGHT_PRECISION).trunc() as u64;
317 let swap_weight = (self.swap_weight * Self::WEIGHT_PRECISION).trunc() as u64;
318 let erc20_weight = (self.erc20_weight * Self::WEIGHT_PRECISION).trunc() as u64;
319 let transactions = generate_transactions(GenerateTransactionsInput {
320 total_txs,
321 accounts,
322 signer_provider_manager: signer_provider_manager.clone(),
323 max_concurrent_requests: self.max_concurrent_requests,
324 tip20_weight,
325 place_order_weight,
326 swap_weight,
327 erc20_weight,
328 quote_token,
329 user_tokens,
330 erc20_tokens,
331 })
332 .await
333 .context("Failed to generate transactions")?;
334
335 let mut pending_txs = send_transactions(
337 transactions,
338 signer_provider_manager.clone(),
339 self.max_concurrent_requests,
340 self.tps,
341 sleep(Duration::from_secs(self.duration)),
342 )
343 .await;
344 let end_block_number = provider.get_block_number().await?;
345
346 info!("Retrieving first block number from sent transactions");
347 let start_block_number = loop {
348 if let Some(first_tx) = pending_txs.pop_front() {
349 debug!(hash = %first_tx.tx_hash(), "Retrieving transaction receipt for first block number");
350 if let Ok(first_tx_receipt) = first_tx
351 .with_timeout(Some(Duration::from_secs(5)))
352 .get_receipt()
353 .await
354 {
355 break first_tx_receipt.block_number;
356 }
357 } else {
358 break None;
359 }
360 };
361 let Some(start_block_number) = start_block_number else {
362 eyre::bail!("Failed to retrieve start block number")
363 };
364
365 let sample_size = pending_txs.len().min(self.sample_size);
367 let successful = Arc::new(AtomicUsize::new(0));
368 let timeout = Arc::new(AtomicUsize::new(0));
369 let failed = Arc::new(AtomicUsize::new(0));
370 info!(sample_size, "Collecting a sample of receipts");
371 stream::iter(0..sample_size)
372 .map(|_| {
373 let idx = random_range(0..pending_txs.len());
374 pending_txs.remove(idx).expect("index is in range")
375 })
376 .map(|pending_tx| {
377 let hash = *pending_tx.tx_hash();
378 pending_tx
379 .with_timeout(Some(Duration::from_secs(5)))
380 .get_receipt()
381 .map(move |result| (hash, result))
382 })
383 .buffered(self.max_concurrent_requests)
384 .for_each(async |(hash, result)| match result {
385 Ok(_) => {
386 successful.fetch_add(1, Ordering::Relaxed);
387 }
388 Err(PendingTransactionError::TxWatcher(WatchTxError::Timeout)) => {
389 timeout.fetch_add(1, Ordering::Relaxed);
390 error!(?hash, "Transaction receipt retrieval timed out");
391 }
392 Err(err) => {
393 failed.fetch_add(1, Ordering::Relaxed);
394 error!(?hash, "Transaction receipt retrieval failed: {}", err);
395 }
396 })
397 .await;
398 info!(
399 successful = successful.load(Ordering::Relaxed),
400 timeout = timeout.load(Ordering::Relaxed),
401 failed = failed.load(Ordering::Relaxed),
402 "Collected a sample of receipts"
403 );
404
405 generate_report(provider, start_block_number, end_block_number, &self).await?;
406
407 Ok(())
408 }
409}
410
411#[derive(Debug, Clone)]
412enum MnemonicArg {
413 Mnemonic(String),
414 Random,
415}
416
417impl FromStr for MnemonicArg {
418 type Err = MnemonicError;
419
420 fn from_str(s: &str) -> Result<Self, Self::Err> {
421 match s {
422 "random" => Ok(MnemonicArg::Random),
423 mnemonic => Ok(MnemonicArg::Mnemonic(
424 Mnemonic::<English>::from_str(mnemonic)?.to_phrase(),
425 )),
426 }
427 }
428}
429
430impl MnemonicArg {
431 fn resolve(&self) -> String {
432 match self {
433 MnemonicArg::Mnemonic(mnemonic) => mnemonic.clone(),
434 MnemonicArg::Random => Mnemonic::<English>::new(&mut rand_08::thread_rng()).to_phrase(),
435 }
436 }
437}
438
439async fn send_transactions<F: TxFiller<TempoNetwork> + 'static>(
441 transactions: Vec<Vec<u8>>,
442 signer_provider_manager: SignerProviderManager<F>,
443 max_concurrent_requests: usize,
444 tps: u64,
445 deadline: Sleep,
446) -> VecDeque<PendingTransactionBuilder<TempoNetwork>> {
447 info!(
448 transactions = transactions.len(),
449 max_concurrent_requests, tps, "Sending transactions"
450 );
451
452 let tx_counter = Arc::new(AtomicUsize::new(0));
454
455 let cancel = CancellationToken::new();
457 let _drop_guard = cancel.clone().drop_guard();
458 tokio::spawn(monitor_tps(
459 tx_counter.clone(),
460 transactions.len(),
461 cancel.clone(),
462 ));
463
464 let rate_limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(tps as u32).unwrap()));
466
467 let failed = Arc::new(AtomicUsize::new(0));
468 let timeout = Arc::new(AtomicUsize::new(0));
469 let transactions = stream::iter(transactions)
470 .ratelimit_stream(&rate_limiter)
471 .zip(stream::repeat_with(|| {
472 signer_provider_manager.random_unsigned_provider()
473 }))
474 .map(|(bytes, provider)| async move {
475 tokio::time::timeout(
476 Duration::from_secs(1),
477 provider.send_raw_transaction(&bytes),
478 )
479 .await
480 })
481 .buffer_unordered(max_concurrent_requests)
482 .filter_map(|result| async {
483 match result {
484 Ok(Ok(pending_tx)) => {
485 tx_counter.fetch_add(1, Ordering::Relaxed);
486 Some(pending_tx)
487 }
488 Ok(Err(err)) => {
489 failed.fetch_add(1, Ordering::Relaxed);
490 debug!(?err, "Failed to send transaction");
491 None
492 }
493 Err(_) => {
494 timeout.fetch_add(1, Ordering::Relaxed);
495 debug!("Transaction sending timed out");
496 None
497 }
498 }
499 })
500 .take_until(deadline)
501 .collect()
502 .await;
503
504 info!(
505 success = tx_counter.load(Ordering::Relaxed),
506 failed = failed.load(Ordering::Relaxed),
507 timeout = timeout.load(Ordering::Relaxed),
508 "Finished sending transactions"
509 );
510
511 transactions
512}
513
514async fn generate_transactions<F: TxFiller<TempoNetwork> + 'static>(
515 input: GenerateTransactionsInput<F>,
516) -> eyre::Result<Vec<Vec<u8>>> {
517 let GenerateTransactionsInput {
518 total_txs,
519 accounts,
520 signer_provider_manager,
521 max_concurrent_requests,
522 tip20_weight,
523 place_order_weight,
524 swap_weight,
525 erc20_weight,
526 quote_token,
527 user_tokens,
528 erc20_tokens,
529 } = input;
530
531 let txs_per_sender = total_txs / accounts;
532 ensure!(
533 txs_per_sender > 0,
534 "txs per sender is 0, increase tps or decrease senders"
535 );
536
537 info!(transactions = total_txs, "Generating transactions");
538
539 const TX_TYPES: usize = 4;
540 let tx_weights: [u64; TX_TYPES] = [tip20_weight, swap_weight, place_order_weight, erc20_weight];
542 let gas_estimates: [Arc<OnceLock<(u128, u128, u64)>>; TX_TYPES] = Default::default();
544
545 let tip20_transfers = Arc::new(AtomicUsize::new(0));
547 let swaps = Arc::new(AtomicUsize::new(0));
548 let orders = Arc::new(AtomicUsize::new(0));
549 let erc20_transfers = Arc::new(AtomicUsize::new(0));
550
551 let builders = ProgressBar::new(total_txs)
552 .wrap_stream(stream::iter(
553 std::iter::repeat_with(|| signer_provider_manager.random_unsigned_provider())
554 .take(total_txs as usize),
555 ))
556 .map(async |provider| {
557 let token = user_tokens.choose(&mut rand::rng()).copied().unwrap();
558
559 let tx_index = tx_weights
561 .iter()
562 .enumerate()
563 .collect::<Vec<_>>()
564 .choose_weighted(&mut rand::rng(), |(_, weight)| *weight)?
565 .0;
566
567 let mut tx = match tx_index {
568 0 => {
569 tip20_transfers.fetch_add(1, Ordering::Relaxed);
570 let token = ITIP20Instance::new(token, provider.clone());
571
572 token
574 .transfer(Address::random(), U256::ONE)
575 .into_transaction_request()
576 }
577 1 => {
578 swaps.fetch_add(1, Ordering::Relaxed);
579 let exchange = IStablecoinExchangeInstance::new(
580 STABLECOIN_EXCHANGE_ADDRESS,
581 provider.clone(),
582 );
583
584 exchange
586 .quoteSwapExactAmountIn(token, quote_token, 1)
587 .into_transaction_request()
588 }
589 2 => {
590 orders.fetch_add(1, Ordering::Relaxed);
591 let exchange = IStablecoinExchangeInstance::new(
592 STABLECOIN_EXCHANGE_ADDRESS,
593 provider.clone(),
594 );
595
596 let tick =
598 rand::random_range(MIN_TICK / TICK_SPACING..=MAX_TICK / TICK_SPACING)
599 * TICK_SPACING;
600 exchange
601 .place(token, MIN_ORDER_AMOUNT, true, tick)
602 .into_transaction_request()
603 }
604 3 => {
605 erc20_transfers.fetch_add(1, Ordering::Relaxed);
606 let token_address = erc20_tokens.choose(&mut rand::rng()).copied().unwrap();
607 let token = erc20::MockERC20::new(token_address, provider.clone());
608
609 token
611 .transfer(Address::random(), U256::ONE)
612 .into_transaction_request()
613 }
614 _ => unreachable!("Only {TX_TYPES} transaction types are supported"),
615 };
616
617 let signer = signer_provider_manager.random_signer();
619 tx.inner.set_from(signer.address());
620
621 let gas = &gas_estimates[tx_index];
622 if let Some((max_fee_per_gas, max_priority_fee_per_gas, gas_limit)) = gas.get() {
625 tx.inner.set_max_fee_per_gas(*max_fee_per_gas);
626 tx.inner
627 .set_max_priority_fee_per_gas(*max_priority_fee_per_gas);
628 tx.inner.set_gas_limit(*gas_limit);
629 }
630
631 let tx = provider.fill(tx).await?;
635
636 if gas.get().is_none() {
639 let _ = gas.set(match &tx {
640 SendableTx::Builder(builder) => (
641 builder
642 .max_fee_per_gas()
643 .ok_or_eyre("max fee per gas should be filled")?,
644 builder
645 .max_priority_fee_per_gas()
646 .ok_or_eyre("max priority fee per gas should be filled")?,
647 builder
648 .gas_limit()
649 .ok_or_eyre("gas limit should be filled")?,
650 ),
651 SendableTx::Envelope(envelope) => (
652 envelope.max_fee_per_gas(),
653 envelope
654 .max_priority_fee_per_gas()
655 .ok_or_eyre("max priority fee per gas should be filled")?,
656 envelope.gas_limit(),
657 ),
658 });
659 }
660
661 eyre::Ok((tx.try_into_request()?, signer))
662 })
663 .buffer_unordered(max_concurrent_requests)
664 .try_collect::<Vec<_>>()
665 .await?;
666 info!(
667 transactions = builders.len(),
668 tip20_transfers = tip20_transfers.load(Ordering::Relaxed),
669 swaps = swaps.load(Ordering::Relaxed),
670 orders = orders.load(Ordering::Relaxed),
671 erc20_transfers = erc20_transfers.load(Ordering::Relaxed),
672 "Generated transactions",
673 );
674
675 info!(transactions = builders.len(), "Signing transactions");
676 let transactions = builders
678 .into_par_iter()
679 .progress()
680 .map(|(tx, signer)| -> eyre::Result<TempoTxEnvelope> {
681 let mut tx = tx.build_unsigned()?;
682 let sig = signer.sign_transaction_sync(tx.as_dyn_signable_mut())?;
683 Ok(tx.into_envelope(sig))
684 })
685 .map(|result| result.map(|tx| tx.encoded_2718()))
686 .collect::<eyre::Result<Vec<_>>>()?;
687
688 Ok(transactions)
689}
690
691async fn fund_accounts(
693 provider: &DynProvider<TempoNetwork>,
694 addresses: &[Address],
695 max_concurrent_requests: usize,
696 max_concurrent_transactions: usize,
697) -> eyre::Result<()> {
698 info!(accounts = addresses.len(), "Funding accounts from faucet");
699 let progress = ProgressBar::new(addresses.len() as u64);
700
701 let chunks = addresses
702 .iter()
703 .map(|address| {
704 let address = *address;
705 provider.raw_request::<_, Vec<B256>>("tempo_fundAddress".into(), (address,))
706 })
707 .chunks(max_concurrent_transactions);
708
709 for chunk in chunks.into_iter() {
710 let tx_hashes = stream::iter(chunk)
711 .buffer_unordered(max_concurrent_requests)
712 .try_collect::<Vec<_>>()
713 .await?
714 .into_iter()
715 .inspect(|_| progress.inc(1))
716 .flatten()
717 .map(async |hash| {
718 Ok(
719 PendingTransactionBuilder::new(provider.root().clone(), hash)
720 .get_receipt()
721 .await?,
722 )
723 });
724 assert_receipts(tx_hashes, max_concurrent_requests)
725 .await
726 .expect("Failed to fund accounts");
727 }
728 Ok(())
729}
730
731pub fn increase_nofile_limit(min_limit: u64) -> eyre::Result<u64> {
732 let (soft, hard) = Resource::NOFILE.get()?;
733 info!(soft, hard, "File descriptor limit at startup");
734
735 if hard < min_limit {
736 panic!(
737 "File descriptor hard limit is too low. Please increase it to at least {min_limit}."
738 );
739 }
740
741 if soft != hard {
742 Resource::NOFILE.set(hard, hard)?; let (soft, hard) = Resource::NOFILE.get()?;
744 info!(soft, hard, "After increasing file descriptor limit");
745 }
746
747 Ok(soft)
748}
749
750#[derive(Serialize)]
751struct BenchmarkedBlock {
752 number: BlockNumber,
753 tx_count: usize,
754 ok_count: usize,
755 err_count: usize,
756 gas_used: u64,
757 timestamp: u64,
758 latency_ms: Option<u64>,
759}
760
761#[derive(Serialize)]
762struct BenchmarkMetadata {
763 target_tps: u64,
764 run_duration_secs: u64,
765 accounts: u64,
766 chain_id: u64,
767 total_connections: usize,
768 start_block: BlockNumber,
769 end_block: BlockNumber,
770 #[serde(skip_serializing_if = "Option::is_none")]
771 node_commit_sha: Option<String>,
772 #[serde(skip_serializing_if = "Option::is_none")]
773 build_profile: Option<String>,
774 #[serde(skip_serializing_if = "Option::is_none")]
775 mode: Option<String>,
776 tip20_weight: f64,
777 place_order_weight: f64,
778 swap_weight: f64,
779 erc20_weight: f64,
780}
781
782#[derive(Serialize)]
783struct BenchmarkReport {
784 metadata: BenchmarkMetadata,
785 blocks: Vec<BenchmarkedBlock>,
786}
787
788pub async fn generate_report(
789 provider: DynProvider<TempoNetwork>,
790 start_block: BlockNumber,
791 end_block: BlockNumber,
792 args: &MaxTpsArgs,
793) -> eyre::Result<()> {
794 info!(start_block, end_block, "Generating report");
795
796 let mut last_block_timestamp: Option<u64> = None;
797
798 let mut benchmarked_blocks = Vec::new();
799
800 for number in start_block..=end_block {
801 let block = provider
802 .get_block(number.into())
803 .await?
804 .ok_or_eyre("Block {number} not found")?;
805 let receipts = provider
806 .get_block_receipts(number.into())
807 .await?
808 .ok_or_eyre("Receipts for block {number} not found")?;
809 let timestamp = block.header.timestamp_millis();
810
811 let latency_ms = last_block_timestamp.map(|last| timestamp - last);
812 let (ok_count, err_count) =
813 receipts
814 .iter()
815 .fold((0, 0), |(successes, failures), receipt| {
816 if receipt.status() {
817 (successes + 1, failures)
818 } else {
819 (successes, failures + 1)
820 }
821 });
822
823 benchmarked_blocks.push(BenchmarkedBlock {
824 number,
825 tx_count: receipts.len(),
826 ok_count,
827 err_count,
828 gas_used: block.header.gas_used(),
829 timestamp: block.header.timestamp_millis(),
830 latency_ms,
831 });
832
833 last_block_timestamp = Some(timestamp);
834 }
835
836 let metadata = BenchmarkMetadata {
837 target_tps: args.tps,
838 run_duration_secs: args.duration,
839 accounts: args.accounts.get(),
840 chain_id: provider.get_chain_id().await?,
841 total_connections: args.max_concurrent_requests,
842 start_block,
843 end_block,
844 node_commit_sha: args.node_commit_sha.clone(),
845 build_profile: args.build_profile.clone(),
846 mode: args.benchmark_mode.clone(),
847 tip20_weight: args.tip20_weight,
848 place_order_weight: args.place_order_weight,
849 swap_weight: args.swap_weight,
850 erc20_weight: args.erc20_weight,
851 };
852
853 let report = BenchmarkReport {
854 metadata,
855 blocks: benchmarked_blocks,
856 };
857
858 let path = "report.json";
859 let file = File::create(path)?;
860 let writer = BufWriter::new(file);
861 serde_json::to_writer_pretty(writer, &report)?;
862
863 info!(path, "Generated report");
864
865 Ok(())
866}
867
868async fn monitor_tps(tx_counter: Arc<AtomicUsize>, target_count: usize, token: CancellationToken) {
869 let mut last_count = 0;
870 let mut ticker = interval(Duration::from_secs(1));
871
872 loop {
873 select! {
874 _ = ticker.tick() => {
875 let current_count = tx_counter.load(Ordering::Relaxed);
876 let tps = current_count - last_count;
877 last_count = current_count;
878
879 info!(tps, total = current_count, "Status");
880 thread::sleep(Duration::from_secs(1));
881
882 if current_count == target_count {
883 break;
884 }
885 }
886 _ = token.cancelled() => {
887 break;
888 }
889 }
890 }
891}
892
893async fn join_all<
894 T: Future<Output = alloy::contract::Result<PendingTransactionBuilder<TempoNetwork>>>,
895>(
896 futures: impl IntoIterator<Item = T>,
897 max_concurrent_requests: usize,
898 max_concurrent_transactions: usize,
899) -> eyre::Result<()> {
900 let chunks = futures.into_iter().chunks(max_concurrent_transactions);
901
902 for chunk in chunks.into_iter() {
903 let pending_txs = stream::iter(chunk)
905 .buffer_unordered(max_concurrent_requests)
906 .try_collect::<Vec<_>>()
907 .await?;
908
909 assert_receipts(
911 pending_txs
912 .into_iter()
913 .map(|tx| async move { Ok(tx.get_receipt().await?) }),
914 max_concurrent_requests,
915 )
916 .await?;
917 }
918
919 Ok(())
920}
921
922async fn assert_receipts<R: ReceiptResponse, F: Future<Output = eyre::Result<R>>>(
923 receipts: impl IntoIterator<Item = F>,
924 max_concurrent_requests: usize,
925) -> eyre::Result<()> {
926 stream::iter(receipts.into_iter())
927 .buffer_unordered(max_concurrent_requests)
928 .try_for_each(|receipt| assert_receipt(receipt))
929 .await
930}
931
932async fn assert_receipt<R: ReceiptResponse>(receipt: R) -> eyre::Result<()> {
933 eyre::ensure!(
934 receipt.status(),
935 "Transaction {} failed",
936 receipt.transaction_hash()
937 );
938 Ok(())
939}
940
941struct GenerateTransactionsInput<F: TxFiller<TempoNetwork>> {
942 total_txs: u64,
943 accounts: u64,
944 signer_provider_manager: SignerProviderManager<F>,
945 max_concurrent_requests: usize,
946 tip20_weight: u64,
947 place_order_weight: u64,
948 swap_weight: u64,
949 erc20_weight: u64,
950 quote_token: Address,
951 user_tokens: Vec<Address>,
952 erc20_tokens: Vec<Address>,
953}