1use std::sync::{
2 Arc,
3 atomic::{AtomicBool, Ordering},
4 mpsc::{self, Receiver, Sender},
5};
6
7use alloy_primitives::B256;
8use reth_engine_tree::tree::{CachedStateProvider, SavedCache};
9use reth_evm::{Evm, EvmEnvFor};
10use reth_revm::database::StateProviderDatabase;
11use reth_storage_api::{StateProviderBox, StateProviderFactory};
12use reth_tasks::{TaskExecutor, WorkerPool};
13use reth_transaction_pool::{
14 BestTransactions, PoolTransaction, error::InvalidPoolTransactionError,
15};
16use tempo_evm::{TempoEvmConfig, evm::TempoEvm};
17use tempo_transaction_pool::best::BestTransaction;
18use tracing::trace;
19
20type PrewarmEvmState = Option<TempoEvm<StateProviderDatabase<StateProviderBox>>>;
21
22pub(crate) struct BestTransactionsPrewarming {
27 transactions_rx: Receiver<Option<BestTransaction>>,
28 commands_tx: Sender<BestTransactionsCommand>,
29 stop: Arc<AtomicBool>,
30}
31
32impl BestTransactionsPrewarming {
33 pub(crate) fn new<Txs, Provider>(
35 executor: TaskExecutor,
36 provider: Provider,
37 cache: Option<SavedCache>,
38 parent_hash: B256,
39 evm_env: EvmEnvFor<TempoEvmConfig>,
40 best_txs: Txs,
41 ) -> Self
42 where
43 Txs: BestTransactions<Item = BestTransaction> + Send + 'static,
44 Provider: StateProviderFactory + Clone + 'static,
45 {
46 let (transactions_tx, transactions_rx) = mpsc::channel();
47 let (commands_tx, commands_rx) = mpsc::channel();
48 let stop = Arc::new(AtomicBool::new(false));
49 let prewarm = PrewarmingExecutionContext {
50 provider,
51 parent_hash,
52 cache,
53 evm_env,
54 stop: stop.clone(),
55 };
56
57 let this = Self {
58 transactions_rx,
59 commands_tx: commands_tx.clone(),
60 stop,
61 };
62
63 let prewarm_executor = executor.clone();
64 executor.spawn_blocking_named("builder-prewarm", move || {
65 Self::start_prewarming(
66 prewarm_executor,
67 BestTransactionsPrewarmingContext {
68 best_txs,
69 transactions_tx,
70 commands_rx,
71 commands_tx,
72 prewarm,
73 next_expiring_nonce_offset: 0,
74 },
75 );
76 });
77
78 this
79 }
80
81 fn start_prewarming<Txs, Provider>(
85 executor: TaskExecutor,
86 mut ctx: BestTransactionsPrewarmingContext<Txs, Provider>,
87 ) where
88 Txs: BestTransactions<Item = BestTransaction>,
89 Provider: StateProviderFactory + Clone + 'static,
90 {
91 let pool = executor.prewarming_pool();
92
93 pool.in_place_scope(|scope| {
94 let prewarm = ctx.prewarm.clone();
95 scope.spawn(move |_| {
96 pool.init::<PrewarmEvmState>(|_| prewarm.evm_for_ctx());
97 });
98
99 let advance = |ctx: &mut BestTransactionsPrewarmingContext<Txs, Provider>| {
100 let Some(tx) = ctx.best_txs.next() else {
101 let _ = ctx.transactions_tx.send(None);
102 return;
103 };
104 let expiring_nonce_offset = if tx.transaction.is_expiring_nonce() {
105 let offset = ctx.next_expiring_nonce_offset;
106 ctx.next_expiring_nonce_offset += 1;
107 Some(offset)
108 } else {
109 None
110 };
111 let _ = ctx.transactions_tx.send(Some(tx.clone()));
112
113 let prewarm = ctx.prewarm.clone();
114 let commands_tx = ctx.commands_tx.clone();
115 scope.spawn(move |_| {
116 Self::prewarm_transaction(prewarm, tx.clone(), expiring_nonce_offset);
117 let _ = commands_tx.send(BestTransactionsCommand::Advance);
118 });
119 };
120
121 for _ in 0..pool.current_num_threads() * 2 {
125 advance(&mut ctx);
126 }
127
128 while let Ok(command) = ctx.commands_rx.recv() {
129 match command {
130 BestTransactionsCommand::Advance => {
131 advance(&mut ctx);
132 }
133 BestTransactionsCommand::Invalid {
134 invalid,
135 old_rx,
136 new_tx,
137 } => {
138 ctx.best_txs.mark_invalid(&invalid.tx, invalid.kind);
139 ctx.transactions_tx = new_tx;
140
141 for tx in old_rx {
142 if let Some(tx) = tx
143 && !is_invalidated_buffered_transaction(&invalid.tx, &tx)
144 {
145 let _ = ctx.transactions_tx.send(Some(tx));
146 }
147 }
148 }
149 BestTransactionsCommand::NoUpdates => {
150 ctx.best_txs.no_updates();
151 }
152 BestTransactionsCommand::SkipBlobs(skip_blobs) => {
153 ctx.best_txs.set_skip_blobs(skip_blobs);
154 }
155 BestTransactionsCommand::Stop { drain_rx } => {
156 ctx.prewarm.stop();
157 drop(drain_rx);
158 return;
159 }
160 }
161 }
162 });
163
164 pool.clear();
165 }
166
167 fn prewarm_transaction<Provider>(
168 prewarm: PrewarmingExecutionContext<Provider>,
169 tx: BestTransaction,
170 expiring_nonce_offset: Option<usize>,
171 ) where
172 Provider: StateProviderFactory + Clone + 'static,
173 {
174 if prewarm.is_stopped() {
175 return;
176 }
177
178 WorkerPool::with_worker_mut(|worker| {
179 let Some(evm) = worker.get_or_init::<PrewarmEvmState>(|| prewarm.evm_for_ctx()) else {
180 return;
181 };
182
183 let tx_hash = *tx.hash();
184
185 if prewarm.is_stopped() {
186 return;
187 }
188
189 let mut tx_env = tx.transaction.clone_tx_env();
190 if let Some(tempo_tx_env) = tx_env.tempo_tx_env.as_mut() {
191 tempo_tx_env.expiring_nonce_idx = expiring_nonce_offset;
192 }
193
194 if let Err(err) = evm.transact_raw(tx_env) {
195 trace!(
196 target: "payload_builder",
197 %err,
198 ?tx_hash,
199 "Failed to prewarm transaction by execution"
200 );
201 return;
202 }
203
204 trace!(
205 target: "payload_builder",
206 ?tx_hash,
207 "Prewarmed transaction"
208 );
209 });
210 }
211}
212
213impl Drop for BestTransactionsPrewarming {
214 fn drop(&mut self) {
215 self.stop.store(true, Ordering::Relaxed);
216 let (_drain_tx, replacement_rx) = mpsc::channel();
218 let drain_rx = core::mem::replace(&mut self.transactions_rx, replacement_rx);
219 let _ = self
220 .commands_tx
221 .send(BestTransactionsCommand::Stop { drain_rx });
222 }
223}
224
225impl Iterator for BestTransactionsPrewarming {
226 type Item = BestTransaction;
227
228 fn next(&mut self) -> Option<Self::Item> {
229 if let Ok(Some(tx)) = self.transactions_rx.try_recv() {
230 return Some(tx);
231 }
232 self.commands_tx
233 .send(BestTransactionsCommand::Advance)
234 .ok()?;
235 self.transactions_rx.recv().ok().flatten()
236 }
237}
238
239impl BestTransactions for BestTransactionsPrewarming {
240 fn mark_invalid(&mut self, transaction: &Self::Item, kind: InvalidPoolTransactionError) {
241 let (new_tx, new_rx) = mpsc::channel();
242 let old_rx = core::mem::replace(&mut self.transactions_rx, new_rx);
243 let _ = self.commands_tx.send(BestTransactionsCommand::Invalid {
244 invalid: InvalidTransaction {
245 tx: transaction.clone(),
246 kind,
247 },
248 old_rx,
249 new_tx,
250 });
251 }
252
253 fn no_updates(&mut self) {
254 let _ = self.commands_tx.send(BestTransactionsCommand::NoUpdates);
255 }
256
257 fn set_skip_blobs(&mut self, skip_blobs: bool) {
258 let _ = self
259 .commands_tx
260 .send(BestTransactionsCommand::SkipBlobs(skip_blobs));
261 }
262}
263
264struct BestTransactionsPrewarmingContext<Txs, Provider> {
266 best_txs: Txs,
267 transactions_tx: Sender<Option<BestTransaction>>,
268 commands_tx: Sender<BestTransactionsCommand>,
269 commands_rx: Receiver<BestTransactionsCommand>,
270 prewarm: PrewarmingExecutionContext<Provider>,
271 next_expiring_nonce_offset: usize,
272}
273
274#[derive(Clone)]
276struct PrewarmingExecutionContext<Provider> {
277 provider: Provider,
278 parent_hash: B256,
279 cache: Option<SavedCache>,
280 evm_env: EvmEnvFor<TempoEvmConfig>,
281 stop: Arc<AtomicBool>,
282}
283
284impl<Provider> PrewarmingExecutionContext<Provider>
285where
286 Provider: StateProviderFactory + Clone + 'static,
287{
288 fn evm_for_ctx(&self) -> PrewarmEvmState {
289 let mut state_provider = match self.provider.state_by_block_hash(self.parent_hash) {
290 Ok(provider) => provider,
291 Err(err) => {
292 trace!(
293 target: "payload_builder",
294 %err,
295 parent_hash = ?self.parent_hash,
296 "failed to build state provider for transaction prewarming"
297 );
298 return None;
299 }
300 };
301
302 if let Some(cache) = &self.cache {
303 state_provider = Box::new(CachedStateProvider::new_prewarm(
304 state_provider,
305 cache.cache().clone(),
306 ));
307 }
308
309 let state_provider = StateProviderDatabase::new(state_provider);
310 let mut evm_env = self.evm_env.clone();
311 evm_env.cfg_env.disable_nonce_check = true;
312 evm_env.cfg_env.disable_balance_check = true;
313
314 Some(TempoEvm::new(state_provider, evm_env))
315 }
316
317 fn is_stopped(&self) -> bool {
318 self.stop.load(Ordering::Relaxed)
319 }
320
321 fn stop(&self) {
322 self.stop.store(true, Ordering::Relaxed);
323 }
324}
325
326#[derive(Debug)]
328enum BestTransactionsCommand {
329 Advance,
330 Invalid {
331 invalid: InvalidTransaction,
332 old_rx: Receiver<Option<BestTransaction>>,
333 new_tx: Sender<Option<BestTransaction>>,
334 },
335 NoUpdates,
336 SkipBlobs(bool),
337 Stop {
338 drain_rx: Receiver<Option<BestTransaction>>,
340 },
341}
342
343#[derive(Debug)]
345struct InvalidTransaction {
346 tx: BestTransaction,
347 kind: InvalidPoolTransactionError,
348}
349
350fn is_invalidated_buffered_transaction(
352 invalid: &BestTransaction,
353 candidate: &BestTransaction,
354) -> bool {
355 if invalid.transaction.is_expiring_nonce() {
358 return false;
359 }
360
361 if invalid.transaction.is_aa_2d() {
362 candidate
363 .transaction
364 .aa_transaction_id()
365 .zip(invalid.transaction.aa_transaction_id())
366 .is_some_and(|(candidate_id, invalid_id)| candidate_id.seq_id() == invalid_id.seq_id())
367 } else {
368 !candidate.transaction.is_aa_2d()
369 && candidate.transaction.sender() == invalid.transaction.sender()
370 }
371}
372
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::{BlockHeader, Header, Signed, TxLegacy};
    use alloy_primitives::{Address, Bytes, Signature, TxKind, U256};
    use reth_evm::{ConfigureEvm, NextBlockEnvAttributes};
    use reth_primitives_traits::{
        Recovered, SealedHeader, transaction::error::InvalidTransactionError,
    };
    use reth_storage_api::noop::NoopProvider;
    use reth_transaction_pool::{
        TransactionOrigin, ValidPoolTransaction, identifier::TransactionId,
    };
    use std::{
        collections::VecDeque,
        sync::{Arc, Mutex},
        thread,
        time::{Duration, Instant},
    };
    use tempo_chainspec::TempoChainSpec;
    use tempo_evm::{TempoEvmConfig, TempoNextBlockEnvAttributes};
    use tempo_primitives::{TempoHeader, TempoPrimitives, TempoTxEnvelope};
    use tempo_transaction_pool::transaction::TempoPooledTransaction;

    /// Counters recording how the test source was driven.
    #[derive(Debug, Default)]
    struct TestLog {
        /// Polls that produced a transaction.
        yielded: usize,
        /// Polls that produced nothing.
        empty_polls: usize,
        /// Number of `mark_invalid` calls.
        invalid: usize,
        /// Number of `no_updates` calls.
        no_updates: usize,
        /// Arguments of every `set_skip_blobs` call, in order.
        skip_blobs: Vec<bool>,
    }

    /// Queue-backed `BestTransactions` source that logs every interaction.
    struct TestBestTransactions {
        txs: VecDeque<BestTransaction>,
        log: Arc<Mutex<TestLog>>,
    }

    impl TestBestTransactions {
        fn new(txs: Vec<BestTransaction>, log: Arc<Mutex<TestLog>>) -> Self {
            let txs = VecDeque::from(txs);
            Self { txs, log }
        }
    }

    impl Iterator for TestBestTransactions {
        type Item = BestTransaction;

        fn next(&mut self) -> Option<Self::Item> {
            let popped = self.txs.pop_front();

            // Scope the guard so the lock is released before the sleep below.
            {
                let mut log = self.log.lock().unwrap();
                match &popped {
                    Some(_) => log.yielded += 1,
                    None => log.empty_polls += 1,
                }
            }

            if popped.is_none() {
                thread::sleep(Duration::from_millis(1));
            }
            popped
        }
    }

    impl BestTransactions for TestBestTransactions {
        fn mark_invalid(&mut self, transaction: &Self::Item, _kind: InvalidPoolTransactionError) {
            self.log.lock().unwrap().invalid += 1;
            self.txs
                .retain(|queued| !is_invalidated_buffered_transaction(transaction, queued));
        }

        fn no_updates(&mut self) {
            self.log.lock().unwrap().no_updates += 1;
        }

        fn set_skip_blobs(&mut self, skip_blobs: bool) {
            self.log.lock().unwrap().skip_blobs.push(skip_blobs);
        }
    }

    /// Legacy transaction from `sender` with the default 21_000 gas limit.
    fn test_tx(sender: Address, nonce: u64) -> BestTransaction {
        test_tx_with_gas_limit(sender, nonce, 21_000)
    }

    /// Legacy transaction from `sender` to a random address, using the test signature.
    fn test_tx_with_gas_limit(sender: Address, nonce: u64, gas_limit: u64) -> BestTransaction {
        let legacy = TxLegacy {
            chain_id: Some(42431),
            nonce,
            gas_price: 20_000_000_000,
            gas_limit,
            to: TxKind::Call(Address::random()),
            value: U256::ZERO,
            input: Bytes::new(),
        };
        let signed = Signed::new_unhashed(legacy, Signature::test_signature());
        let envelope = TempoTxEnvelope::Legacy(signed);
        let transaction = TempoPooledTransaction::new(Recovered::new_unchecked(envelope, sender));

        Arc::new(ValidPoolTransaction {
            transaction_id: TransactionId::new(0u64.into(), nonce),
            transaction,
            propagate: true,
            timestamp: Instant::now(),
            origin: TransactionOrigin::External,
            authority_ids: None,
        })
    }

    /// Owns the handle under test together with the executor that runs its background task.
    struct TestPrewarming {
        prewarming: Option<BestTransactionsPrewarming>,
        executor: TaskExecutor,
    }

    impl Drop for TestPrewarming {
        fn drop(&mut self) {
            // Drop the handle first so the background task receives `Stop`.
            drop(self.prewarming.take());
            // NOTE(review): presumably blocks until the earlier `builder-prewarm` task has
            // finished; confirm how same-named blocking tasks are scheduled.
            self.executor
                .spawn_blocking_named("builder-prewarm", || {})
                .get();
        }
    }

    impl std::ops::Deref for TestPrewarming {
        type Target = BestTransactionsPrewarming;

        fn deref(&self) -> &Self::Target {
            self.prewarming.as_ref().expect("prewarming exists")
        }
    }

    impl std::ops::DerefMut for TestPrewarming {
        fn deref_mut(&mut self) -> &mut Self::Target {
            self.prewarming.as_mut().expect("prewarming exists")
        }
    }

    /// Builds the pipeline over `txs` on a fresh test executor.
    fn prewarming(txs: Vec<BestTransaction>, log: Arc<Mutex<TestLog>>) -> TestPrewarming {
        prewarming_with_executor(TaskExecutor::test(), txs, log)
    }

    /// Builds the pipeline over `txs` on `executor`, using a no-op provider and no cache.
    fn prewarming_with_executor(
        executor: TaskExecutor,
        txs: Vec<BestTransaction>,
        log: Arc<Mutex<TestLog>>,
    ) -> TestPrewarming {
        let evm_config = TempoEvmConfig::moderato();
        let provider =
            NoopProvider::<TempoChainSpec, TempoPrimitives>::new(evm_config.chain_spec().clone());

        let parent_header = SealedHeader::seal_slow(TempoHeader {
            inner: Header {
                number: 0,
                timestamp: 1,
                gas_limit: 30_000_000,
                base_fee_per_gas: Some(1),
                ..Default::default()
            },
            general_gas_limit: 30_000_000,
            timestamp_millis_part: 0,
            shared_gas_limit: 0,
            ..Default::default()
        });

        let attributes = TempoNextBlockEnvAttributes {
            inner: NextBlockEnvAttributes {
                timestamp: 2,
                suggested_fee_recipient: Address::ZERO,
                prev_randao: B256::ZERO,
                gas_limit: parent_header.gas_limit(),
                parent_beacon_block_root: None,
                withdrawals: None,
                extra_data: Default::default(),
                slot_number: None,
            },
            general_gas_limit: 30_000_000,
            shared_gas_limit: 0,
            timestamp_millis_part: 0,
            consensus_context: None,
            subblock_fee_recipients: Default::default(),
        };

        let evm_env = evm_config
            .next_evm_env(&parent_header, &attributes)
            .expect("test next block env");

        let source = TestBestTransactions::new(txs, log);
        let handle = BestTransactionsPrewarming::new(
            executor.clone(),
            provider,
            None,
            parent_header.hash(),
            evm_env,
            source,
        );

        TestPrewarming {
            prewarming: Some(handle),
            executor,
        }
    }

    /// Polls `condition` every 5 ms for up to one second and panics if it never holds.
    fn wait_until(mut condition: impl FnMut() -> bool) {
        let deadline = Instant::now() + Duration::from_secs(1);
        loop {
            if Instant::now() >= deadline {
                break;
            }
            if condition() {
                return;
            }
            thread::sleep(Duration::from_millis(5));
        }
        // One last check after the deadline before giving up.
        assert!(condition(), "condition did not become true before timeout");
    }

    #[test]
    fn source_ordering_is_unchanged_when_prewarming_is_enabled() {
        let sender = Address::random();
        let txs = vec![test_tx(sender, 0), test_tx(sender, 1), test_tx(sender, 2)];
        let expected: Vec<_> = txs.iter().map(|tx| *tx.hash()).collect();
        let log = Arc::new(Mutex::new(TestLog::default()));

        let mut pipeline = prewarming(txs, log);
        let actual: Vec<_> = (0..expected.len())
            .map(|_| *pipeline.next().expect("transaction").hash())
            .collect();

        assert_eq!(actual, expected);
    }

    #[test]
    fn prewarming_eagerly_drains_source_iterator() {
        let sender = Address::random();
        let executor = TaskExecutor::test();
        // Four more transactions than the initial eager advances cover.
        let tx_count = executor.prewarming_pool().current_num_threads() * 2 + 4;
        let txs: Vec<_> = (0..tx_count)
            .map(|nonce| test_tx(sender, nonce as u64))
            .collect();
        let expected: Vec<_> = txs.iter().map(|tx| *tx.hash()).collect();
        let log = Arc::new(Mutex::new(TestLog::default()));

        let mut pipeline = prewarming_with_executor(executor, txs, log.clone());
        // The source must be fully drained before the consumer asks for anything.
        wait_until(|| log.lock().unwrap().yielded == expected.len());

        let actual: Vec<_> = (0..expected.len())
            .map(|_| *pipeline.next().expect("transaction").hash())
            .collect();
        assert_eq!(actual, expected);
    }

    #[test]
    fn empty_source_is_polled_for_eager_advances_and_each_consumer_advance() {
        let executor = TaskExecutor::test();
        let eager_advances = executor.prewarming_pool().current_num_threads() * 2;
        let log = Arc::new(Mutex::new(TestLog::default()));
        let mut pipeline = prewarming_with_executor(executor, Vec::new(), log.clone());

        wait_until(|| log.lock().unwrap().empty_polls == eager_advances);

        // Every consumer poll adds exactly one more poll of the source.
        assert!(pipeline.next().is_none());
        wait_until(|| log.lock().unwrap().empty_polls == eager_advances + 1);

        assert!(pipeline.next().is_none());
        wait_until(|| log.lock().unwrap().empty_polls == eager_advances + 2);
    }

    #[test]
    fn mark_invalid_filters_already_buffered_invalidated_transactions() {
        let sender = Address::random();
        let mut sender_nonces = 0..;
        let tx1 = test_tx(sender, sender_nonces.next().expect("first nonce"));
        let tx2 = test_tx(sender, sender_nonces.next().expect("second nonce"));
        let tx3 = test_tx(
            Address::random(),
            sender_nonces.next().expect("third nonce"),
        );
        let log = Arc::new(Mutex::new(TestLog::default()));

        let source_txs = vec![tx1.clone(), tx2.clone(), tx3.clone()];
        let mut pipeline = prewarming(source_txs, log.clone());
        assert_eq!(
            pipeline.next().as_ref().map(|tx| tx.hash()),
            Some(tx1.hash())
        );

        // Wait until the remaining transactions are pulled from the source.
        wait_until(|| log.lock().unwrap().yielded == 3);
        pipeline.mark_invalid(
            &tx1,
            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
        );

        let next = pipeline.next().expect("non-invalidated transaction");
        assert_eq!(next.hash(), tx3.hash());
        assert_ne!(next.hash(), tx2.hash());
        wait_until(|| log.lock().unwrap().invalid == 1);
    }

    #[test]
    fn commands_are_forwarded_to_source_iterator() {
        let log = Arc::new(Mutex::new(TestLog::default()));
        let mut pipeline = prewarming(Vec::new(), log.clone());

        pipeline.no_updates();
        pipeline.set_skip_blobs(true);

        wait_until(|| {
            let log = log.lock().unwrap();
            log.no_updates == 1 && log.skip_blobs == vec![true]
        });
    }

    #[test]
    fn prewarming_does_not_use_shared_worker_state_slot() {
        let executor = TaskExecutor::test();
        let pool = executor.prewarming_pool();
        // Seed an unrelated `usize` state on the pool before prewarming runs.
        pool.init::<usize>(|existing| existing.map(|value| *value).unwrap_or(1));

        let sender = Address::random();
        let txs = vec![test_tx(sender, 0)];
        let log = Arc::new(Mutex::new(TestLog::default()));
        let mut pipeline = prewarming_with_executor(executor.clone(), txs, log);

        assert!(pipeline.next().is_some());

        // The seeded value must still be there on every worker.
        pool.broadcast(pool.current_num_threads(), |worker| {
            assert_eq!(*worker.get::<usize>(), 1);
        });
    }
}