tempo_transaction_pool/
maintain.rs

1//! Transaction pool maintenance tasks.
2
3use crate::{TempoTransactionPool, transaction::TempoPooledTransaction};
4use alloy_primitives::TxHash;
5use futures::StreamExt;
6use reth_chainspec::ChainSpecProvider;
7use reth_primitives_traits::AlloyBlockHeader;
8use reth_provider::{CanonStateNotification, CanonStateSubscriptions};
9use reth_storage_api::StateProviderFactory;
10use reth_transaction_pool::TransactionPool;
11use std::collections::BTreeMap;
12use tempo_chainspec::TempoChainSpec;
13use tempo_primitives::{AASigned, TempoPrimitives};
14use tracing::{debug, error};
15
16/// Spawns a background task that evicts expired AA transactions.
17///
18/// - Listens for new blocks to get chain timestamp
19/// - Listens for new transactions to track `valid_before` timestamps
20/// - Evicts transactions when `valid_before <= tip_timestamp`
21pub async fn evict_expired_aa_txs<P, C>(pool: P, client: C)
22where
23    P: TransactionPool<Transaction = TempoPooledTransaction> + 'static,
24    C: CanonStateSubscriptions + 'static,
25{
26    // Helper to track AA transactions with `valid_before` timestamps
27    let track_expiry = |map: &mut BTreeMap<u64, Vec<TxHash>>, maybe_aa_tx: Option<&AASigned>| {
28        if let Some(aa_tx) = maybe_aa_tx
29            && let Some(valid_before) = aa_tx.tx().valid_before
30        {
31            let hash = *aa_tx.hash();
32            map.entry(valid_before).or_default().push(hash);
33        }
34    };
35
36    // Track valid_before timestamp -> Vec<TxHash>
37    let mut expiry_map: BTreeMap<u64, Vec<TxHash>> = BTreeMap::new();
38
39    // Small delay to allow backup tasks to initialize
40    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
41
42    // Subscribe to new transactions and blocks
43    let mut new_txs = pool.new_transactions_listener();
44    let mut chain_events = client.canonical_state_stream();
45
46    // Populate expiry map to prevent race condition at start-up
47    pool.all_transactions()
48        .all()
49        .for_each(|tx| track_expiry(&mut expiry_map, tx.inner().as_aa()));
50
51    loop {
52        tokio::select! {
53            // Update cache when a txs are added to the mempool
54            tx_event = new_txs.recv() => {
55                let Some(tx_event) = tx_event else {
56                    break;
57                };
58
59                // Check if it's an AA tx with `valid_before`
60                let tx = &tx_event.transaction.transaction;
61                track_expiry(&mut expiry_map, tx.inner().as_aa());
62            }
63
64            // Check for expired txs when a new block is committed
65            Some(event) = chain_events.next() => {
66                let CanonStateNotification::Commit { new } = event else {
67                    continue;
68                };
69
70                let tip_timestamp = new.tip().header().timestamp();
71
72                // Gather expired tx hashes and evict them
73                let mut to_remove = Vec::new();
74                while let Some(entry) = expiry_map.first_entry() && *entry.key() <= tip_timestamp {
75                    to_remove.extend(entry.remove());
76                }
77
78                if !to_remove.is_empty() {
79                    debug!(
80                        target: "txpool",
81                        count = to_remove.len(),
82                        tip_timestamp,
83                        "Evicting expired AA transactions"
84                    );
85                    // Note: txs already mined or evicted by other means are ignored
86                    pool.remove_transactions(to_remove);
87                }
88            }
89        }
90    }
91}
92
93/// An endless future that maintains the [`TempoTransactionPool`] 2d nonce pool based on the storage changes of the `NonceManager` precompile.
94///
95/// The `NonceManager` contains
96///
97/// ```solidity
98///  mapping(address => mapping(uint256 => uint64)) public nonces
99/// ```
100///
101/// where each slot tracks the current nonce for a nonce key assigned to the transaction.
102/// The next executable nonce is the current value of in the contract's state.
103pub async fn maintain_2d_nonce_pool<Client>(pool: TempoTransactionPool<Client>)
104where
105    Client: StateProviderFactory
106        + ChainSpecProvider<ChainSpec = TempoChainSpec>
107        + CanonStateSubscriptions<Primitives = TempoPrimitives>
108        + 'static,
109{
110    let mut events = pool.client().canonical_state_stream();
111    while let Some(notification) = events.next().await {
112        pool.notify_aa_pool_on_state_updates(
113            notification.committed().execution_outcome().state().state(),
114        );
115    }
116}
117
118/// An endless future that updates the [`crate::amm::AmmLiquidityCache`] based
119/// on the storage changes of the `FeeManager` precompile.
120pub async fn maintain_amm_cache<Client>(pool: TempoTransactionPool<Client>)
121where
122    Client: StateProviderFactory
123        + ChainSpecProvider<ChainSpec = TempoChainSpec>
124        + CanonStateSubscriptions<Primitives = TempoPrimitives>
125        + 'static,
126{
127    let amm_cache = pool.amm_liquidity_cache();
128    let mut events = pool.client().canonical_state_stream();
129
130    while let Some(notification) = events.next().await {
131        let tip = notification.committed();
132
133        amm_cache.on_new_state(tip.execution_outcome());
134        for block in tip.blocks_iter() {
135            if let Err(err) = amm_cache.on_new_block(block.sealed_header(), pool.client()) {
136                error!(target: "txpool", ?err, "AMM liquidity cache update failed");
137            }
138        }
139    }
140}