Skip to main content

tempo_payload_builder/
prewarming.rs

1use std::sync::{
2    Arc,
3    atomic::{AtomicBool, Ordering},
4    mpsc::{self, Receiver, Sender},
5};
6
7use alloy_primitives::B256;
8use reth_engine_tree::tree::{CachedStateProvider, SavedCache};
9use reth_evm::{Evm, EvmEnvFor};
10use reth_revm::database::StateProviderDatabase;
11use reth_storage_api::{StateProviderBox, StateProviderFactory};
12use reth_tasks::{TaskExecutor, WorkerPool};
13use reth_transaction_pool::{
14    BestTransactions, PoolTransaction, error::InvalidPoolTransactionError,
15};
16use tempo_evm::{TempoEvmConfig, evm::TempoEvm};
17use tempo_transaction_pool::best::BestTransaction;
18use tracing::trace;
19
/// EVM slot kept in the worker state of the prewarming pool.
///
/// The slot holds `None` when the EVM could not be built (the state provider for the
/// parent block failed to resolve); transactions are then simply not prewarmed.
type PrewarmEvmState = Option<TempoEvm<StateProviderDatabase<StateProviderBox>>>;
21
/// Prewarming orchestrator that consumes source [`BestTransactions`] with bounded
/// lookahead, prewarms buffered transactions in parallel, and produces a new
/// [`BestTransactions`] iterator with the source order and invalidations triggered
/// by [`Self::mark_invalid`] preserved.
///
/// Dropping the value raises the shared stop flag and sends a stop command to the
/// coordinator.
pub(crate) struct BestTransactionsPrewarming {
    /// Transactions forwarded by the coordinator in source order. A `None` item is sent
    /// whenever the source iterator had nothing to yield for an advance.
    transactions_rx: Receiver<Option<BestTransaction>>,
    /// Channel used to send [`BestTransactionsCommand`]s to the coordinator.
    commands_tx: Sender<BestTransactionsCommand>,
    /// Stop flag shared with the prewarming execution context; set when this value is dropped.
    stop: Arc<AtomicBool>,
}
31
32impl BestTransactionsPrewarming {
33    /// Spawns prewarming for `best_txs` and returns a new [`BestTransactions`] iterator.
34    pub(crate) fn new<Txs, Provider>(
35        executor: TaskExecutor,
36        provider: Provider,
37        cache: Option<SavedCache>,
38        parent_hash: B256,
39        evm_env: EvmEnvFor<TempoEvmConfig>,
40        best_txs: Txs,
41    ) -> Self
42    where
43        Txs: BestTransactions<Item = BestTransaction> + Send + 'static,
44        Provider: StateProviderFactory + Clone + 'static,
45    {
46        let (transactions_tx, transactions_rx) = mpsc::channel();
47        let (commands_tx, commands_rx) = mpsc::channel();
48        let stop = Arc::new(AtomicBool::new(false));
49        let prewarm = PrewarmingExecutionContext {
50            provider,
51            parent_hash,
52            cache,
53            evm_env,
54            stop: stop.clone(),
55        };
56
57        let this = Self {
58            transactions_rx,
59            commands_tx: commands_tx.clone(),
60            stop,
61        };
62
63        let prewarm_executor = executor.clone();
64        executor.spawn_blocking_named("builder-prewarm", move || {
65            Self::start_prewarming(
66                prewarm_executor,
67                BestTransactionsPrewarmingContext {
68                    best_txs,
69                    transactions_tx,
70                    commands_rx,
71                    commands_tx,
72                    prewarm,
73                    next_expiring_nonce_offset: 0,
74                },
75            );
76        });
77
78        this
79    }
80
81    /// Runs the coordinator side of prewarming for a payload build.
82    ///
83    /// See [`BestTransactionsPrewarming`] for details.
84    fn start_prewarming<Txs, Provider>(
85        executor: TaskExecutor,
86        mut ctx: BestTransactionsPrewarmingContext<Txs, Provider>,
87    ) where
88        Txs: BestTransactions<Item = BestTransaction>,
89        Provider: StateProviderFactory + Clone + 'static,
90    {
91        let pool = executor.prewarming_pool();
92
93        pool.in_place_scope(|scope| {
94            let prewarm = ctx.prewarm.clone();
95            scope.spawn(move |_| {
96                pool.init::<PrewarmEvmState>(|_| prewarm.evm_for_ctx());
97            });
98
99            let advance = |ctx: &mut BestTransactionsPrewarmingContext<Txs, Provider>| {
100                let Some(tx) = ctx.best_txs.next() else {
101                    let _ = ctx.transactions_tx.send(None);
102                    return;
103                };
104                let expiring_nonce_offset = if tx.transaction.is_expiring_nonce() {
105                    let offset = ctx.next_expiring_nonce_offset;
106                    ctx.next_expiring_nonce_offset += 1;
107                    Some(offset)
108                } else {
109                    None
110                };
111                let _ = ctx.transactions_tx.send(Some(tx.clone()));
112
113                let prewarm = ctx.prewarm.clone();
114                let commands_tx = ctx.commands_tx.clone();
115                scope.spawn(move |_| {
116                    Self::prewarm_transaction(prewarm, tx.clone(), expiring_nonce_offset);
117                    let _ = commands_tx.send(BestTransactionsCommand::Advance);
118                });
119            };
120
121            // Fill the initial batch of transactions to execute and prewarm.
122            //
123            // We schedule 2x the number of threads to make sure that workers are never idle.
124            for _ in 0..pool.current_num_threads() * 2 {
125                advance(&mut ctx);
126            }
127
128            while let Ok(command) = ctx.commands_rx.recv() {
129                match command {
130                    BestTransactionsCommand::Advance => {
131                        advance(&mut ctx);
132                    }
133                    BestTransactionsCommand::Invalid {
134                        invalid,
135                        old_rx,
136                        new_tx,
137                    } => {
138                        ctx.best_txs.mark_invalid(&invalid.tx, invalid.kind);
139                        ctx.transactions_tx = new_tx;
140
141                        for tx in old_rx {
142                            if let Some(tx) = tx
143                                && !is_invalidated_buffered_transaction(&invalid.tx, &tx)
144                            {
145                                let _ = ctx.transactions_tx.send(Some(tx));
146                            }
147                        }
148                    }
149                    BestTransactionsCommand::NoUpdates => {
150                        ctx.best_txs.no_updates();
151                    }
152                    BestTransactionsCommand::SkipBlobs(skip_blobs) => {
153                        ctx.best_txs.set_skip_blobs(skip_blobs);
154                    }
155                    BestTransactionsCommand::Stop { drain_rx } => {
156                        ctx.prewarm.stop();
157                        drop(drain_rx);
158                        return;
159                    }
160                }
161            }
162        });
163
164        pool.clear();
165    }
166
167    fn prewarm_transaction<Provider>(
168        prewarm: PrewarmingExecutionContext<Provider>,
169        tx: BestTransaction,
170        expiring_nonce_offset: Option<usize>,
171    ) where
172        Provider: StateProviderFactory + Clone + 'static,
173    {
174        if prewarm.is_stopped() {
175            return;
176        }
177
178        WorkerPool::with_worker_mut(|worker| {
179            let Some(evm) = worker.get_or_init::<PrewarmEvmState>(|| prewarm.evm_for_ctx()) else {
180                return;
181            };
182
183            let tx_hash = *tx.hash();
184
185            if prewarm.is_stopped() {
186                return;
187            }
188
189            let mut tx_env = tx.transaction.clone_tx_env();
190            if let Some(tempo_tx_env) = tx_env.tempo_tx_env.as_mut() {
191                tempo_tx_env.expiring_nonce_idx = expiring_nonce_offset;
192            }
193
194            if let Err(err) = evm.transact_raw(tx_env) {
195                trace!(
196                    target: "payload_builder",
197                    %err,
198                    ?tx_hash,
199                    "Failed to prewarm transaction by execution"
200                );
201                return;
202            }
203
204            trace!(
205                target: "payload_builder",
206                ?tx_hash,
207                "Prewarmed transaction"
208            );
209        });
210    }
211}
212
213impl Drop for BestTransactionsPrewarming {
214    fn drop(&mut self) {
215        self.stop.store(true, Ordering::Relaxed);
216        // Move buffered transaction cleanup to the prewarm coordinator instead of this builder thread.
217        let (_drain_tx, replacement_rx) = mpsc::channel();
218        let drain_rx = core::mem::replace(&mut self.transactions_rx, replacement_rx);
219        let _ = self
220            .commands_tx
221            .send(BestTransactionsCommand::Stop { drain_rx });
222    }
223}
224
225impl Iterator for BestTransactionsPrewarming {
226    type Item = BestTransaction;
227
228    fn next(&mut self) -> Option<Self::Item> {
229        if let Ok(Some(tx)) = self.transactions_rx.try_recv() {
230            return Some(tx);
231        }
232        self.commands_tx
233            .send(BestTransactionsCommand::Advance)
234            .ok()?;
235        self.transactions_rx.recv().ok().flatten()
236    }
237}
238
239impl BestTransactions for BestTransactionsPrewarming {
240    fn mark_invalid(&mut self, transaction: &Self::Item, kind: InvalidPoolTransactionError) {
241        let (new_tx, new_rx) = mpsc::channel();
242        let old_rx = core::mem::replace(&mut self.transactions_rx, new_rx);
243        let _ = self.commands_tx.send(BestTransactionsCommand::Invalid {
244            invalid: InvalidTransaction {
245                tx: transaction.clone(),
246                kind,
247            },
248            old_rx,
249            new_tx,
250        });
251    }
252
253    fn no_updates(&mut self) {
254        let _ = self.commands_tx.send(BestTransactionsCommand::NoUpdates);
255    }
256
257    fn set_skip_blobs(&mut self, skip_blobs: bool) {
258        let _ = self
259            .commands_tx
260            .send(BestTransactionsCommand::SkipBlobs(skip_blobs));
261    }
262}
263
/// Context for prewarming best transactions for a payload build.
///
/// Owned by the coordinator task for the whole duration of the build.
struct BestTransactionsPrewarmingContext<Txs, Provider> {
    /// Source iterator the transactions are pulled from.
    best_txs: Txs,
    /// Sender towards the consumer; replaced whenever an invalidation swaps the channel.
    transactions_tx: Sender<Option<BestTransaction>>,
    /// Command sender cloned into every prewarm task so it can request the next advance.
    commands_tx: Sender<BestTransactionsCommand>,
    /// Commands coming from the consumer and from finished prewarm tasks.
    commands_rx: Receiver<BestTransactionsCommand>,
    /// Everything needed to build an EVM and to observe the stop flag.
    prewarm: PrewarmingExecutionContext<Provider>,
    /// Offset assigned to the next expiring nonce transaction pulled from the source.
    next_expiring_nonce_offset: usize,
}
273
/// Context needed to prewarm transaction storage independently of the real builder.
#[derive(Clone)]
struct PrewarmingExecutionContext<Provider> {
    /// Factory used to obtain the state provider for `parent_hash`.
    provider: Provider,
    /// Hash of the block whose state the prewarming EVM is built on.
    parent_hash: B256,
    /// Optional saved cache; when present the state provider is wrapped in a
    /// [`CachedStateProvider`] created in prewarm mode.
    cache: Option<SavedCache>,
    /// EVM environment the prewarming EVM is created with (nonce and balance checks are
    /// disabled on a copy of it).
    evm_env: EvmEnvFor<TempoEvmConfig>,
    /// Shared flag telling prewarm tasks to stop doing work.
    stop: Arc<AtomicBool>,
}
283
284impl<Provider> PrewarmingExecutionContext<Provider>
285where
286    Provider: StateProviderFactory + Clone + 'static,
287{
288    fn evm_for_ctx(&self) -> PrewarmEvmState {
289        let mut state_provider = match self.provider.state_by_block_hash(self.parent_hash) {
290            Ok(provider) => provider,
291            Err(err) => {
292                trace!(
293                    target: "payload_builder",
294                    %err,
295                    parent_hash = ?self.parent_hash,
296                    "failed to build state provider for transaction prewarming"
297                );
298                return None;
299            }
300        };
301
302        if let Some(cache) = &self.cache {
303            state_provider = Box::new(CachedStateProvider::new_prewarm(
304                state_provider,
305                cache.cache().clone(),
306            ));
307        }
308
309        let state_provider = StateProviderDatabase::new(state_provider);
310        let mut evm_env = self.evm_env.clone();
311        evm_env.cfg_env.disable_nonce_check = true;
312        evm_env.cfg_env.disable_balance_check = true;
313
314        Some(TempoEvm::new(state_provider, evm_env))
315    }
316
317    fn is_stopped(&self) -> bool {
318        self.stop.load(Ordering::Relaxed)
319    }
320
321    fn stop(&self) {
322        self.stop.store(true, Ordering::Relaxed);
323    }
324}
325
/// Command sent by [`BestTransactionsPrewarming`] consumer.
///
/// Prewarm tasks also send [`Self::Advance`] once they have finished a transaction.
#[derive(Debug)]
enum BestTransactionsCommand {
    /// Pull the next transaction from the source iterator and forward it to the consumer.
    Advance,
    /// Mark a transaction invalid on the source iterator and re-filter buffered transactions.
    Invalid {
        /// The transaction reported as invalid together with the reason.
        invalid: InvalidTransaction,
        /// Receiver the consumer was reading from; its remaining items are drained and the
        /// ones that are still valid are re-sent.
        old_rx: Receiver<Option<BestTransaction>>,
        /// Sender of the replacement channel the consumer reads from after the invalidation.
        new_tx: Sender<Option<BestTransaction>>,
    },
    /// Forwarded to the source iterator's `no_updates`.
    NoUpdates,
    /// Forwarded to the source iterator's `set_skip_blobs` with the carried value.
    SkipBlobs(bool),
    /// Stop prewarming and terminate the coordinator loop.
    Stop {
        /// Receiver moved out of the builder thread so queued transactions drain on the coordinator.
        drain_rx: Receiver<Option<BestTransaction>>,
    },
}
342
/// Invalid transaction encountered during execution.
#[derive(Debug)]
struct InvalidTransaction {
    /// The transaction that was reported as invalid.
    tx: BestTransaction,
    /// Reason passed on to the source iterator's `mark_invalid`.
    kind: InvalidPoolTransactionError,
}
349
350/// Returns whether the candidate transaction is invalidated by the given invalid transaction.
351fn is_invalidated_buffered_transaction(
352    invalid: &BestTransaction,
353    candidate: &BestTransaction,
354) -> bool {
355    // Skip invalidation for expiring nonce transactions - they are independent
356    // and should not block other expiring nonce txs from the same sender
357    if invalid.transaction.is_expiring_nonce() {
358        return false;
359    }
360
361    if invalid.transaction.is_aa_2d() {
362        candidate
363            .transaction
364            .aa_transaction_id()
365            .zip(invalid.transaction.aa_transaction_id())
366            .is_some_and(|(candidate_id, invalid_id)| candidate_id.seq_id() == invalid_id.seq_id())
367    } else {
368        !candidate.transaction.is_aa_2d()
369            && candidate.transaction.sender() == invalid.transaction.sender()
370    }
371}
372
373#[cfg(test)]
374mod tests {
375    use super::*;
376    use alloy_consensus::{BlockHeader, Header, Signed, TxLegacy};
377    use alloy_primitives::{Address, Bytes, Signature, TxKind, U256};
378    use reth_evm::{ConfigureEvm, NextBlockEnvAttributes};
379    use reth_primitives_traits::{
380        Recovered, SealedHeader, transaction::error::InvalidTransactionError,
381    };
382    use reth_storage_api::noop::NoopProvider;
383    use reth_transaction_pool::{
384        TransactionOrigin, ValidPoolTransaction, identifier::TransactionId,
385    };
386    use std::{
387        collections::VecDeque,
388        sync::{Arc, Mutex},
389        thread,
390        time::{Duration, Instant},
391    };
392    use tempo_chainspec::TempoChainSpec;
393    use tempo_evm::{TempoEvmConfig, TempoNextBlockEnvAttributes};
394    use tempo_primitives::{TempoHeader, TempoPrimitives, TempoTxEnvelope};
395    use tempo_transaction_pool::transaction::TempoPooledTransaction;
396
    /// Counters recording how the coordinator drove [`TestBestTransactions`].
    #[derive(Debug, Default)]
    struct TestLog {
        /// Number of `next()` calls that returned a transaction.
        yielded: usize,
        /// Number of `next()` calls that returned `None`.
        empty_polls: usize,
        /// Number of `mark_invalid` calls.
        invalid: usize,
        /// Number of `no_updates` calls.
        no_updates: usize,
        /// Arguments of every `set_skip_blobs` call, in call order.
        skip_blobs: Vec<bool>,
    }
405
    /// Source iterator backed by a fixed queue that records every call in a shared [`TestLog`].
    struct TestBestTransactions {
        /// Transactions still to be yielded, front first.
        txs: VecDeque<BestTransaction>,
        /// Call log shared with the test body.
        log: Arc<Mutex<TestLog>>,
    }
410
411    impl TestBestTransactions {
412        fn new(txs: Vec<BestTransaction>, log: Arc<Mutex<TestLog>>) -> Self {
413            Self {
414                txs: txs.into(),
415                log,
416            }
417        }
418    }
419
420    impl Iterator for TestBestTransactions {
421        type Item = BestTransaction;
422
423        fn next(&mut self) -> Option<Self::Item> {
424            let tx = self.txs.pop_front();
425            {
426                let mut log = self.log.lock().unwrap();
427                if tx.is_some() {
428                    log.yielded += 1;
429                } else {
430                    log.empty_polls += 1;
431                }
432            }
433            if tx.is_none() {
434                thread::sleep(Duration::from_millis(1));
435            }
436            tx
437        }
438    }
439
440    impl BestTransactions for TestBestTransactions {
441        fn mark_invalid(&mut self, transaction: &Self::Item, _kind: InvalidPoolTransactionError) {
442            self.log.lock().unwrap().invalid += 1;
443            self.txs
444                .retain(|tx| !is_invalidated_buffered_transaction(transaction, tx));
445        }
446
447        fn no_updates(&mut self) {
448            self.log.lock().unwrap().no_updates += 1;
449        }
450
451        fn set_skip_blobs(&mut self, skip_blobs: bool) {
452            self.log.lock().unwrap().skip_blobs.push(skip_blobs);
453        }
454    }
455
456    fn test_tx(sender: Address, nonce: u64) -> BestTransaction {
457        test_tx_with_gas_limit(sender, nonce, 21_000)
458    }
459
460    fn test_tx_with_gas_limit(sender: Address, nonce: u64, gas_limit: u64) -> BestTransaction {
461        let tx = TxLegacy {
462            chain_id: Some(42431),
463            nonce,
464            gas_price: 20_000_000_000,
465            gas_limit,
466            to: TxKind::Call(Address::random()),
467            value: U256::ZERO,
468            input: Bytes::new(),
469        };
470        let envelope =
471            TempoTxEnvelope::Legacy(Signed::new_unhashed(tx, Signature::test_signature()));
472        let pooled = TempoPooledTransaction::new(Recovered::new_unchecked(envelope, sender));
473        Arc::new(ValidPoolTransaction {
474            transaction_id: TransactionId::new(0u64.into(), nonce),
475            transaction: pooled,
476            propagate: true,
477            timestamp: Instant::now(),
478            origin: TransactionOrigin::External,
479            authority_ids: None,
480        })
481    }
482
    /// Test wrapper around [`BestTransactionsPrewarming`] that keeps the executor it was
    /// spawned on.
    struct TestPrewarming {
        /// The iterator under test; taken out and dropped first when the wrapper is dropped.
        prewarming: Option<BestTransactionsPrewarming>,
        /// Executor the coordinator task was spawned on.
        executor: TaskExecutor,
    }
487
488    impl Drop for TestPrewarming {
489        fn drop(&mut self) {
490            drop(self.prewarming.take());
491            self.executor
492                .spawn_blocking_named("builder-prewarm", || {})
493                .get();
494        }
495    }
496
497    impl std::ops::Deref for TestPrewarming {
498        type Target = BestTransactionsPrewarming;
499
500        fn deref(&self) -> &Self::Target {
501            self.prewarming.as_ref().expect("prewarming exists")
502        }
503    }
504
505    impl std::ops::DerefMut for TestPrewarming {
506        fn deref_mut(&mut self) -> &mut Self::Target {
507            self.prewarming.as_mut().expect("prewarming exists")
508        }
509    }
510
511    fn prewarming(txs: Vec<BestTransaction>, log: Arc<Mutex<TestLog>>) -> TestPrewarming {
512        let executor = TaskExecutor::test();
513        prewarming_with_executor(executor, txs, log)
514    }
515
516    fn prewarming_with_executor(
517        executor: TaskExecutor,
518        txs: Vec<BestTransaction>,
519        log: Arc<Mutex<TestLog>>,
520    ) -> TestPrewarming {
521        let evm_config = TempoEvmConfig::moderato();
522        let provider =
523            NoopProvider::<TempoChainSpec, TempoPrimitives>::new(evm_config.chain_spec().clone());
524        let parent_header = SealedHeader::seal_slow(TempoHeader {
525            inner: Header {
526                number: 0,
527                timestamp: 1,
528                gas_limit: 30_000_000,
529                base_fee_per_gas: Some(1),
530                ..Default::default()
531            },
532            general_gas_limit: 30_000_000,
533            timestamp_millis_part: 0,
534            shared_gas_limit: 0,
535            ..Default::default()
536        });
537        let attributes = TempoNextBlockEnvAttributes {
538            inner: NextBlockEnvAttributes {
539                timestamp: 2,
540                suggested_fee_recipient: Address::ZERO,
541                prev_randao: B256::ZERO,
542                gas_limit: parent_header.gas_limit(),
543                parent_beacon_block_root: None,
544                withdrawals: None,
545                extra_data: Default::default(),
546                slot_number: None,
547            },
548            general_gas_limit: 30_000_000,
549            shared_gas_limit: 0,
550            timestamp_millis_part: 0,
551            consensus_context: None,
552            subblock_fee_recipients: Default::default(),
553        };
554        let evm_env = evm_config
555            .next_evm_env(&parent_header, &attributes)
556            .expect("test next block env");
557        let prewarming = BestTransactionsPrewarming::new(
558            executor.clone(),
559            provider,
560            None,
561            parent_header.hash(),
562            evm_env,
563            TestBestTransactions::new(txs, log),
564        );
565        TestPrewarming {
566            prewarming: Some(prewarming),
567            executor,
568        }
569    }
570
571    fn wait_until(mut condition: impl FnMut() -> bool) {
572        let deadline = Instant::now() + Duration::from_secs(1);
573        while Instant::now() < deadline {
574            if condition() {
575                return;
576            }
577            thread::sleep(Duration::from_millis(5));
578        }
579        assert!(condition(), "condition did not become true before timeout");
580    }
581
582    #[test]
583    fn source_ordering_is_unchanged_when_prewarming_is_enabled() {
584        let sender = Address::random();
585        let txs = vec![test_tx(sender, 0), test_tx(sender, 1), test_tx(sender, 2)];
586        let expected = txs.iter().map(|tx| *tx.hash()).collect::<Vec<_>>();
587        let log = Arc::new(Mutex::new(TestLog::default()));
588
589        let mut prewarming = prewarming(txs, log);
590        let actual = (0..expected.len())
591            .map(|_| *prewarming.next().expect("transaction").hash())
592            .collect::<Vec<_>>();
593
594        assert_eq!(actual, expected);
595    }
596
597    #[test]
598    fn prewarming_eagerly_drains_source_iterator() {
599        let sender = Address::random();
600        let executor = TaskExecutor::test();
601        let txs = (0..executor.prewarming_pool().current_num_threads() * 2 + 4)
602            .map(|nonce| test_tx(sender, nonce as u64))
603            .collect::<Vec<_>>();
604        let expected = txs.iter().map(|tx| *tx.hash()).collect::<Vec<_>>();
605        let log = Arc::new(Mutex::new(TestLog::default()));
606
607        let mut prewarming = prewarming_with_executor(executor, txs, log.clone());
608        wait_until(|| log.lock().unwrap().yielded == expected.len());
609
610        let actual = (0..expected.len())
611            .map(|_| *prewarming.next().expect("transaction").hash())
612            .collect::<Vec<_>>();
613        assert_eq!(actual, expected);
614    }
615
616    #[test]
617    fn empty_source_is_polled_for_eager_advances_and_each_consumer_advance() {
618        let executor = TaskExecutor::test();
619        let eager_advances = executor.prewarming_pool().current_num_threads() * 2;
620        let log = Arc::new(Mutex::new(TestLog::default()));
621        let mut prewarming = prewarming_with_executor(executor, Vec::new(), log.clone());
622
623        wait_until(|| log.lock().unwrap().empty_polls == eager_advances);
624
625        assert!(prewarming.next().is_none());
626        wait_until(|| log.lock().unwrap().empty_polls == eager_advances + 1);
627
628        assert!(prewarming.next().is_none());
629        wait_until(|| log.lock().unwrap().empty_polls == eager_advances + 2);
630    }
631
632    #[test]
633    fn mark_invalid_filters_already_buffered_invalidated_transactions() {
634        let sender = Address::random();
635        let mut sender_nonces = 0..;
636        let tx1 = test_tx(sender, sender_nonces.next().expect("first nonce"));
637        let tx2 = test_tx(sender, sender_nonces.next().expect("second nonce"));
638        let tx3 = test_tx(
639            Address::random(),
640            sender_nonces.next().expect("third nonce"),
641        );
642        let log = Arc::new(Mutex::new(TestLog::default()));
643
644        let mut prewarming = prewarming(vec![tx1.clone(), tx2.clone(), tx3.clone()], log.clone());
645        assert_eq!(
646            prewarming.next().as_ref().map(|tx| tx.hash()),
647            Some(tx1.hash())
648        );
649
650        wait_until(|| log.lock().unwrap().yielded == 3);
651        prewarming.mark_invalid(
652            &tx1,
653            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
654        );
655
656        let next = prewarming.next().expect("non-invalidated transaction");
657        assert_eq!(next.hash(), tx3.hash());
658        assert_ne!(next.hash(), tx2.hash());
659        wait_until(|| log.lock().unwrap().invalid == 1);
660    }
661
662    #[test]
663    fn commands_are_forwarded_to_source_iterator() {
664        let log = Arc::new(Mutex::new(TestLog::default()));
665        let mut prewarming = prewarming(Vec::new(), log.clone());
666
667        prewarming.no_updates();
668        prewarming.set_skip_blobs(true);
669
670        wait_until(|| {
671            let log = log.lock().unwrap();
672            log.no_updates == 1 && log.skip_blobs == vec![true]
673        });
674    }
675
676    #[test]
677    fn prewarming_does_not_use_shared_worker_state_slot() {
678        let executor = TaskExecutor::test();
679        let pool = executor.prewarming_pool();
680        pool.init::<usize>(|existing| existing.map(|value| *value).unwrap_or(1));
681
682        let sender = Address::random();
683        let txs = vec![test_tx(sender, 0)];
684        let log = Arc::new(Mutex::new(TestLog::default()));
685        let mut prewarming = prewarming_with_executor(executor.clone(), txs, log);
686
687        assert!(prewarming.next().is_some());
688
689        pool.broadcast(pool.current_num_threads(), |worker| {
690            assert_eq!(*worker.get::<usize>(), 1);
691        });
692    }
693}