// tempo_transaction_pool/maintain.rs
1//! Transaction pool maintenance tasks.
2
3use crate::{
4    RevokedKeys, SpendingLimitUpdates, TempoTransactionPool,
5    metrics::TempoPoolMaintenanceMetrics,
6    paused::{PausedEntry, PausedFeeTokenPool},
7    transaction::TempoPooledTransaction,
8};
9use alloy_consensus::transaction::TxHashRef;
10use alloy_primitives::{
11    Address, TxHash,
12    map::{AddressMap, HashMap, HashSet},
13};
14use alloy_sol_types::SolEvent;
15use futures::StreamExt;
16use itertools::{Either, Itertools};
17use reth_chainspec::ChainSpecProvider;
18use reth_primitives_traits::AlloyBlockHeader;
19use reth_provider::{CanonStateNotification, CanonStateSubscriptions, Chain, HeaderProvider};
20use reth_storage_api::StateProviderFactory;
21use reth_transaction_pool::{PoolTransaction, TransactionPool};
22use std::{
23    collections::{BTreeMap, btree_map::Entry},
24    time::Instant,
25};
26use tempo_chainspec::TempoChainSpec;
27use tempo_contracts::precompiles::{IAccountKeychain, IFeeManager, ITIP20, ITIP403Registry};
28use tempo_precompiles::{
29    ACCOUNT_KEYCHAIN_ADDRESS, TIP_FEE_MANAGER_ADDRESS, TIP403_REGISTRY_ADDRESS,
30};
31use tempo_primitives::{TempoAddressExt, TempoHeader, TempoPrimitives};
32use tracing::{debug, error};
33
/// Evict transactions this many seconds before they expire to reduce propagation
/// of near-expiry transactions that are likely to fail validation on peers.
///
/// Applied when draining the expiry map on each new block: the cutoff used is
/// `tip_timestamp + EVICTION_BUFFER_SECS` rather than `tip_timestamp` itself.
const EVICTION_BUFFER_SECS: u64 = 3;
37
/// Aggregated block-level invalidation events for the transaction pool.
///
/// Collects all invalidation events from a block into a single structure,
/// allowing efficient batch processing of pool updates.
///
/// Populated from receipt logs by [`TempoPoolUpdates::from_chain`] and consumed
/// by the pool maintenance loop once per canonical-chain notification.
#[derive(Debug, Default)]
pub struct TempoPoolUpdates {
    /// Transaction hashes that have expired (valid_before <= tip_timestamp).
    pub expired_txs: Vec<TxHash>,
    /// Revoked keychain keys.
    /// Indexed by account for efficient lookup.
    pub revoked_keys: RevokedKeys,
    /// Spending limit changes.
    /// When a spending limit changes, transactions from that key paying with that token
    /// may become unexecutable if the new limit is below their value.
    /// Indexed by account for efficient lookup.
    pub spending_limit_changes: SpendingLimitUpdates,
    /// Validator token preference changes: validator to new_token (last-write-wins).
    /// Uses `AddressMap` to deduplicate by validator, preventing resource amplification
    /// when a validator emits multiple `ValidatorTokenSet` events in the same block.
    pub validator_token_changes: AddressMap<Address>,
    /// User token preference changes.
    /// When a user changes their fee token preference via `setUserToken()`, pending
    /// transactions from that user that don't have an explicit fee_token set may now
    /// resolve to a different token at execution time, causing fee payment failures.
    /// Uses a set since a user can emit multiple events in the same block; we only need to
    /// process each user once. No cleanup needed as this is ephemeral per-block data.
    pub user_token_changes: HashSet<Address>,
    /// TIP403 blacklist additions: (policy_id, account).
    pub blacklist_additions: Vec<(u64, Address)>,
    /// TIP403 whitelist removals: (policy_id, account).
    pub whitelist_removals: Vec<(u64, Address)>,
    /// Fee token pause state changes: (token, is_paused).
    /// Kept as a `Vec` (not a map) so repeated pause/unpause flips within one block are
    /// replayed in emission order.
    pub pause_events: Vec<(Address, bool)>,
    /// Tokens whose transfer policy was changed via `changeTransferPolicyId()`.
    /// Pending transactions using these tokens as fee tokens need to be re-validated
    /// because the new policy may forbid the fee payer or fee manager.
    pub transfer_policy_updates: HashSet<Address>,
    /// Fee token balance changes keyed by token.
    ///
    /// We only track the debited `from` account from TIP20 `Transfer` logs because credits to the
    /// `to` account cannot make an already-admitted transaction newly invalid.
    pub fee_balance_changes: AddressMap<HashSet<Address>>,
    /// Spending-limit spends emitted by the account keychain during execution.
    ///
    /// We record the exact `(account, key_id, token)` triples emitted by `AccessKeySpend`
    /// events. During eviction, the pool re-reads the remaining limit from state for these
    /// triples and compares against pending tx fee costs. This keeps maintenance aligned
    /// with the runtime's actual spending-limit decrements instead of inferring them from
    /// the mined transaction body.
    pub spending_limit_spends: SpendingLimitUpdates,
}
89
90impl TempoPoolUpdates {
91    /// Creates a new empty `TempoPoolUpdates`.
92    pub fn new() -> Self {
93        Self::default()
94    }
95
96    /// Returns true if there are no updates to process.
97    pub fn is_empty(&self) -> bool {
98        self.expired_txs.is_empty()
99            && self.revoked_keys.is_empty()
100            && self.spending_limit_changes.is_empty()
101            && self.validator_token_changes.is_empty()
102            && self.user_token_changes.is_empty()
103            && self.blacklist_additions.is_empty()
104            && self.whitelist_removals.is_empty()
105            && self.pause_events.is_empty()
106            && self.transfer_policy_updates.is_empty()
107            && self.fee_balance_changes.is_empty()
108            && self.spending_limit_spends.is_empty()
109    }
110
111    /// Extracts pool updates from a committed chain segment.
112    ///
113    /// Parses receipts for relevant events (key revocations, validator token changes,
114    /// blacklist additions, pause events).
115    pub fn from_chain(chain: &Chain<TempoPrimitives>) -> Self {
116        let mut updates = Self::new();
117
118        // Parse events from receipts
119        for log in chain
120            .execution_outcome()
121            .receipts()
122            .iter()
123            .flatten()
124            .flat_map(|receipt| &receipt.logs)
125        {
126            // Key revocations and spending limit changes
127            if log.address == ACCOUNT_KEYCHAIN_ADDRESS {
128                if let Ok(event) = IAccountKeychain::KeyRevoked::decode_log(log) {
129                    updates.revoked_keys.insert(event.account, event.publicKey);
130                } else if let Ok(event) = IAccountKeychain::SpendingLimitUpdated::decode_log(log) {
131                    updates.spending_limit_changes.insert(
132                        event.account,
133                        event.publicKey,
134                        Some(event.token),
135                    );
136                } else if let Ok(event) = IAccountKeychain::AccessKeySpend::decode_log(log) {
137                    updates.spending_limit_spends.insert(
138                        event.account,
139                        event.publicKey,
140                        Some(event.token),
141                    );
142                }
143            }
144            // Validator and user token changes
145            else if log.address == TIP_FEE_MANAGER_ADDRESS {
146                if let Ok(event) = IFeeManager::ValidatorTokenSet::decode_log(log) {
147                    updates
148                        .validator_token_changes
149                        .insert(event.validator, event.token);
150                } else if let Ok(event) = IFeeManager::UserTokenSet::decode_log(log) {
151                    updates.user_token_changes.insert(event.user);
152                }
153            }
154            // TIP403 blacklist additions and whitelist removals
155            else if log.address == TIP403_REGISTRY_ADDRESS {
156                if let Ok(event) = ITIP403Registry::BlacklistUpdated::decode_log(log)
157                    && event.restricted
158                {
159                    updates
160                        .blacklist_additions
161                        .push((event.policyId, event.account));
162                } else if let Ok(event) = ITIP403Registry::WhitelistUpdated::decode_log(log)
163                    && !event.allowed
164                {
165                    updates
166                        .whitelist_removals
167                        .push((event.policyId, event.account));
168                }
169            }
170            // Fee token pause events and balance changes
171            else if log.address.is_tip20() {
172                if let Ok(event) = ITIP20::PauseStateUpdate::decode_log(log) {
173                    updates.pause_events.push((log.address, event.isPaused));
174                } else if ITIP20::TransferPolicyUpdate::decode_log(log).is_ok() {
175                    updates.transfer_policy_updates.insert(log.address);
176                } else if let Ok(event) = ITIP20::Transfer::decode_log(log) {
177                    updates
178                        .fee_balance_changes
179                        .entry(log.address)
180                        .or_default()
181                        .insert(event.from);
182                }
183            }
184        }
185
186        updates
187    }
188
189    /// Returns true if there are any invalidation events that require scanning the pool.
190    pub fn has_invalidation_events(&self) -> bool {
191        !self.revoked_keys.is_empty()
192            || !self.spending_limit_changes.is_empty()
193            || !self.spending_limit_spends.is_empty()
194            || !self.validator_token_changes.is_empty()
195            || !self.user_token_changes.is_empty()
196            || !self.blacklist_additions.is_empty()
197            || !self.whitelist_removals.is_empty()
198            || !self.fee_balance_changes.is_empty()
199    }
200}
201
/// Tracking state for pool maintenance operations.
///
/// Tracks AA transaction expiry (`valid_before` timestamps) for eviction.
///
/// Note: Stale entries (transactions no longer in the pool) are cleaned up lazily
/// when we check `pool.contains()` before eviction. This avoids the overhead of
/// subscribing to all transaction lifecycle events.
///
/// Invariant: `expiry_map` and `tx_to_expiry` are kept in sync by
/// `track`/`untrack`/`drain_expired` — every hash in a bucket has a reverse entry
/// pointing at that bucket's timestamp.
#[derive(Default)]
struct TempoPoolState {
    /// Maps timestamp to transactions that are going to be invalidated at that time (due to `valid_after` or keychain-related expiry).
    expiry_map: BTreeMap<u64, Vec<TxHash>>,
    /// Reverse mapping: tx_hash -> valid_before timestamp (for cleanup during drain).
    tx_to_expiry: HashMap<TxHash, u64>,
    /// Pool for transactions whose fee token is temporarily paused.
    paused_pool: PausedFeeTokenPool,
    /// Tracks pending transaction staleness for DoS mitigation.
    pending_staleness: PendingStalenessTracker,
}
220
221impl TempoPoolState {
222    /// Tracks an AA transaction with a `valid_before` timestamp.
223    fn track(&mut self, tx: &TempoPooledTransaction) {
224        let valid_before = tx
225            .inner()
226            .as_aa()
227            .and_then(|tx| tx.tx().valid_before.map(|value| value.get()));
228        let key_expiry = tx.key_expiry();
229
230        let expiry = [valid_before, key_expiry].into_iter().flatten().min();
231
232        if let Some(expiry) = expiry {
233            self.expiry_map.entry(expiry).or_default().push(*tx.hash());
234            self.tx_to_expiry.insert(*tx.hash(), expiry);
235        }
236    }
237
238    /// Removes expiry and key-expiry tracking for a single transaction.
239    fn untrack(&mut self, hash: &TxHash) {
240        if let Some(expiry) = self.tx_to_expiry.remove(hash)
241            && let Entry::Occupied(mut entry) = self.expiry_map.entry(expiry)
242        {
243            entry.get_mut().retain(|h| *h != *hash);
244            if entry.get().is_empty() {
245                entry.remove();
246            }
247        }
248    }
249
250    /// Collects and removes all expired transactions up to the given timestamp.
251    /// Returns the list of expired transaction hashes.
252    fn drain_expired(&mut self, tip_timestamp: u64) -> Vec<TxHash> {
253        let mut expired = Vec::new();
254        while let Some(entry) = self.expiry_map.first_entry()
255            && *entry.key() <= tip_timestamp
256        {
257            let expired_hashes = entry.remove();
258            for tx_hash in &expired_hashes {
259                self.tx_to_expiry.remove(tx_hash);
260            }
261            expired.extend(expired_hashes);
262        }
263        expired
264    }
265}
266
/// Default interval for pending transaction staleness checks (30 minutes).
/// Transactions that remain pending across two consecutive snapshots will be evicted,
/// so the effective maximum pending lifetime is roughly one to two intervals.
const DEFAULT_PENDING_STALENESS_INTERVAL: u64 = 30 * 60;
270
/// Tracks pending transactions across snapshots to detect stale transactions.
///
/// Uses a simple snapshot comparison approach:
/// - Every interval, take a snapshot of current pending transactions
/// - Transactions present in both the previous and current snapshot are considered stale
/// - Stale transactions are evicted since they've been pending for at least one full interval
#[derive(Debug)]
struct PendingStalenessTracker {
    /// Previous snapshot of pending transaction hashes.
    previous_pending: HashSet<TxHash>,
    /// Timestamp of the last snapshot.
    /// `None` until the first check, which makes `should_check` fire immediately.
    last_snapshot_time: Option<u64>,
    /// Interval in seconds between staleness checks.
    interval_secs: u64,
}
286
287impl PendingStalenessTracker {
288    /// Creates a new tracker with the given check interval.
289    fn new(interval_secs: u64) -> Self {
290        Self {
291            previous_pending: HashSet::default(),
292            last_snapshot_time: None,
293            interval_secs,
294        }
295    }
296
297    /// Returns true if the staleness check interval has elapsed and a snapshot should be taken.
298    fn should_check(&self, now: u64) -> bool {
299        self.last_snapshot_time
300            .is_none_or(|last| now.saturating_sub(last) >= self.interval_secs)
301    }
302
303    /// Checks for stale transactions and updates the snapshot.
304    ///
305    /// Returns transactions that have been pending across two consecutive snapshots
306    /// (i.e., pending for at least one full interval).
307    ///
308    /// Call `should_check` first to avoid collecting the pending set on every block.
309    fn check_and_update(&mut self, current_pending: HashSet<TxHash>, now: u64) -> Vec<TxHash> {
310        let previous_pending = std::mem::take(&mut self.previous_pending);
311
312        // Split the current snapshot into stale transactions to evict and fresh
313        // transactions to track. A transaction is stale if it appears in both
314        // the previous and current pending snapshots.
315        let (stale, next_pending): (Vec<TxHash>, HashSet<TxHash>) =
316            current_pending.into_iter().partition_map(|hash| {
317                if previous_pending.contains(&hash) {
318                    Either::Left(hash)
319                } else {
320                    Either::Right(hash)
321                }
322            });
323
324        self.previous_pending = next_pending;
325        self.last_snapshot_time = Some(now);
326
327        stale
328    }
329}
330
impl Default for PendingStalenessTracker {
    /// Uses [`DEFAULT_PENDING_STALENESS_INTERVAL`] (30 minutes) as the check interval.
    fn default() -> Self {
        Self::new(DEFAULT_PENDING_STALENESS_INTERVAL)
    }
}
336
/// Unified maintenance task for the Tempo transaction pool.
///
/// Handles:
/// - Evicting expired AA transactions (`valid_before <= tip_timestamp`)
/// - Evicting transactions using expired keychain keys (`AuthorizedKey.expiry <= tip_timestamp`)
/// - Updating the AA 2D nonce pool from `NonceManager` changes
/// - Refreshing the AMM liquidity cache from `FeeManager` updates
/// - Removing transactions signed with revoked keychain keys
/// - Moving transactions to/from the paused pool when fee tokens are paused/unpaused
///
/// Consolidates these operations into a single event loop to avoid multiple tasks
/// competing for canonical state updates and to minimize contention on pool locks.
///
/// Runs until the new-transaction listener channel closes. The numbered steps in
/// the block-commit branch are order-sensitive — see the inline comments on
/// steps 7 and 9 in particular.
pub async fn maintain_tempo_pool<Client>(pool: TempoTransactionPool<Client>)
where
    Client: StateProviderFactory
        + HeaderProvider<Header = TempoHeader>
        + ChainSpecProvider<ChainSpec = TempoChainSpec>
        + CanonStateSubscriptions<Primitives = TempoPrimitives>
        + 'static,
{
    let mut state = TempoPoolState::default();
    let metrics = TempoPoolMaintenanceMetrics::default();

    // Subscribe to new transactions and chain events
    let mut new_txs = pool.new_transactions_listener();
    let mut chain_events = pool.client().canonical_state_stream();

    // Populate expiry tracking with existing transactions to prevent race conditions at start-up
    let all_txs = pool.all_transactions();
    for tx in all_txs.pending.iter().chain(all_txs.queued.iter()) {
        state.track(&tx.transaction);
    }

    let amm_cache = pool.amm_liquidity_cache();

    loop {
        tokio::select! {
            // Track new transactions for expiry (valid_before and key expiry)
            tx_event = new_txs.recv() => {
                let Some(tx_event) = tx_event else {
                    // Listener channel closed: no more transactions will arrive,
                    // so the maintenance loop terminates.
                    break;
                };

                state.track(&tx_event.transaction.transaction);
            }

            // Process all maintenance operations on new block commit or reorg
            Some(event) = chain_events.next() => {
                let new = match event {
                    CanonStateNotification::Reorg { old: _, new } => {
                        // Repopulate AMM liquidity cache from the new canonical chain
                        // to invalidate stale entries from orphaned blocks.
                        if let Err(err) = amm_cache.repopulate(pool.client()) {
                            error!(target: "txpool", ?err, "AMM liquidity cache repopulate after reorg failed");
                        }

                        new
                    }
                    CanonStateNotification::Commit { new } => new,
                };

                let block_update_start = Instant::now();

                let tip = &new;
                let bundle_state = tip.execution_outcome().state().state();
                let tip_timestamp = tip.tip().header().timestamp();

                // 1. Collect all block-level invalidation events
                let mut updates = TempoPoolUpdates::from_chain(tip);

                // Remove expiry tracking for mined transactions.
                tip.blocks_iter()
                    .flat_map(|block| block.body().transactions())
                    .for_each(|tx| {
                    state.untrack(tx.tx_hash())
                });

                // Evict transactions slightly before they expire to prevent
                // broadcasting near-expiry txs that peers would reject.
                let max_expiry = tip_timestamp.saturating_add(EVICTION_BUFFER_SECS);

                // Add expired transactions (from local tracking state).
                // The contains() filter drops hashes that already left the pool,
                // which is also how stale tracking entries are lazily cleaned up.
                let expired = state.drain_expired(max_expiry);
                updates.expired_txs = expired.into_iter().filter(|h| pool.contains(h)).collect();

                // 2. Evict expired AA transactions
                let expired_start = Instant::now();
                let expired_count = updates.expired_txs.len();
                if expired_count > 0 {
                    debug!(
                        target: "txpool",
                        count = expired_count,
                        tip_timestamp,
                        "Evicting expired AA transactions (valid_before)"
                    );
                    pool.remove_transactions(updates.expired_txs.clone());
                    metrics.expired_transactions_evicted.increment(expired_count as u64);
                }
                metrics.expired_eviction_duration_seconds.record(expired_start.elapsed());

                // 3. Handle fee token pause/unpause events
                let pause_start = Instant::now();

                // Collect pause tokens that need pool scanning.
                // For pause events, we need to scan the pool. For unpause events, we
                // only need to check the paused_pool (O(1) lookup by token).
                let pause_tokens: Vec<Address> = updates
                    .pause_events
                    .iter()
                    .filter_map(|(token, is_paused)| is_paused.then_some(*token))
                    .collect();

                // Process pause events: fetch pool transactions once for all pause tokens.
                // This avoids the O(pause_events * pool_size) cost of fetching per event.
                if !pause_tokens.is_empty() {
                    let all_txs = pool.all_transactions();

                    // Group transactions by fee token for efficient batch processing.
                    // This single pass over all transactions handles all pause events.
                    // NOTE(review): grouping keys on the explicit fee_token() only,
                    // while step 5b below uses resolved_fee_token(); txs whose fee
                    // token resolves implicitly are not moved to the paused pool
                    // here — confirm this asymmetry is intentional.
                    let mut by_token: AddressMap<Vec<TxHash>> = AddressMap::default();
                    for tx in all_txs.pending.iter().chain(all_txs.queued.iter()) {
                        if let Some(fee_token) = tx.transaction.inner().fee_token() {
                            by_token.entry(fee_token).or_default().push(*tx.hash());
                        }
                    }

                    // Process each pause token
                    for token in pause_tokens {
                        let Some(hashes_to_pause) = by_token.remove(&token) else {
                            // No transactions use this fee token - skip
                            continue;
                        };

                        let removed_txs = pool.remove_transactions(hashes_to_pause);
                        let count = removed_txs.len();

                        if count > 0 {
                            // Clean up expiry tracking for paused txs
                            for tx in &removed_txs {
                                state.untrack(tx.hash());
                            }

                            // Carry each tx's valid_before into the paused pool so it
                            // can be expired there too (step 4).
                            let entries: Vec<_> = removed_txs
                                .into_iter()
                                .map(|tx| {
                                    let valid_before = tx
                                        .transaction
                                        .inner()
                                        .as_aa()
                                        .and_then(|aa| aa.tx().valid_before.map(|value| value.get()));
                                    PausedEntry { tx, valid_before }
                                })
                                .collect();

                            let cap_evicted = state.paused_pool.insert_batch(token, entries);
                            metrics.transactions_paused.increment(count as u64);
                            if cap_evicted > 0 {
                                metrics.paused_pool_cap_evicted.increment(cap_evicted as u64);
                                debug!(
                                    target: "txpool",
                                    cap_evicted,
                                    "Evicted oldest paused transactions due to global cap"
                                );
                            }
                            debug!(
                                target: "txpool",
                                %token,
                                count,
                                "Moved transactions to paused pool (fee token paused)"
                            );
                        }
                    }
                }

                // Process unpause events: O(1) lookup per token in paused_pool
                for (token, is_paused) in &updates.pause_events {
                    if *is_paused {
                        continue; // Already handled above
                    }

                    // Unpause: drain from paused pool and re-add to main pool.
                    // Re-insertion runs on a spawned task so revalidation does not
                    // block the maintenance loop.
                    // NOTE(review): restored txs are re-added via
                    // add_external_transactions, so their original origin is not
                    // preserved — unlike the transfer-policy path (step 5b), which
                    // uses add_transactions_with_origins. Confirm intended.
                    let paused_entries = state.paused_pool.drain_token(token);
                    if !paused_entries.is_empty() {
                        let count = paused_entries.len();
                        metrics.transactions_unpaused.increment(count as u64);
                        let pool_clone = pool.clone();
                        let token = *token;
                        tokio::spawn(async move {
                            let txs: Vec<_> = paused_entries
                                .into_iter()
                                .map(|e| e.tx.transaction.clone())
                                .collect();

                            let results = pool_clone
                                .add_external_transactions(txs)
                                .await;

                            let success = results.iter().filter(|r| r.is_ok()).count();
                            debug!(
                                target: "txpool",
                                %token,
                                total = count,
                                success,
                                "Restored transactions from paused pool (fee token unpaused)"
                            );
                        });
                    }
                }

                // 4. Evict expired transactions from the paused pool
                let paused_expired = state.paused_pool.evict_expired(tip_timestamp);
                let paused_timed_out = state.paused_pool.evict_timed_out();
                let total_paused_evicted = paused_expired + paused_timed_out;
                if total_paused_evicted > 0 {
                    debug!(
                        target: "txpool",
                        count = total_paused_evicted,
                        tip_timestamp,
                        "Evicted expired transactions from paused pool"
                    );
                }

                // 5. Evict revoked keys and spending limit updates from paused pool
                if !updates.revoked_keys.is_empty()
                    || !updates.spending_limit_changes.is_empty()
                    || !updates.spending_limit_spends.is_empty()
                {
                    state.paused_pool.evict_invalidated(
                        &updates.revoked_keys,
                        &updates.spending_limit_changes,
                        &updates.spending_limit_spends,
                    );
                }
                metrics.pause_events_duration_seconds.record(pause_start.elapsed());

                // 5b. Handle transfer policy updates
                // When a token's transfer policy changes, pending transactions using that
                // token may become invalid under the new policy. We remove them and re-add
                // so they go through full validation against the updated policy.
                if !updates.transfer_policy_updates.is_empty() {
                    let all_txs = pool.all_transactions();
                    let hashes: Vec<TxHash> = all_txs
                        .pending
                        .iter()
                        .chain(all_txs.queued.iter())
                        .filter(|tx| {
                            tx.transaction
                                .resolved_fee_token()
                                .is_some_and(|t| updates.transfer_policy_updates.contains(&t))
                        })
                        .map(|tx| *tx.hash())
                        .collect();

                    if !hashes.is_empty() {
                        let removed_txs = pool.remove_transactions(hashes);
                        let count = removed_txs.len();

                        for tx in &removed_txs {
                            state.untrack(tx.hash());
                        }

                        metrics
                            .transfer_policy_revalidated
                            .increment(count as u64);

                        // Re-add on a spawned task, preserving each tx's origin.
                        let pool_clone = pool.clone();
                        tokio::spawn(async move {
                            let txs: Vec<_> = removed_txs
                                .into_iter()
                                .map(|tx| (tx.origin, tx.transaction.clone()))
                                .collect();

                            let results =
                                pool_clone.add_transactions_with_origins(txs).await;

                            let success =
                                results.iter().filter(|r| r.is_ok()).count();
                            debug!(
                                target: "txpool",
                                total = count,
                                success,
                                "Re-validated transactions after transfer policy update"
                            );
                        });
                    }
                }

                // 6. Update 2D nonce pool (also removes included expiring nonce txs
                // via slot changes on the nonce precompile)
                let nonce_pool_start = Instant::now();
                pool.notify_aa_pool_on_state_updates(bundle_state);
                metrics.nonce_pool_update_duration_seconds.record(nonce_pool_start.elapsed());

                // 7. Update AMM liquidity cache (must happen before validator token eviction)
                let amm_start = Instant::now();
                amm_cache.on_new_state(tip.execution_outcome());
                if let Err(err) = amm_cache.on_new_blocks(tip.blocks_iter().map(|block| block.sealed_header()), pool.client()) {
                    error!(target: "txpool", ?err, "AMM liquidity cache update failed");
                }
                metrics.amm_cache_update_duration_seconds.record(amm_start.elapsed());

                // 8. Evict invalidated transactions in a single pool scan
                // This checks revoked keys, spending limit changes, validator token changes,
                // blacklist additions, and whitelist removals together to avoid scanning
                // all transactions multiple times per block.
                if updates.has_invalidation_events() {
                    let invalidation_start = Instant::now();
                    debug!(
                        target: "txpool",
                        revoked_keys = updates.revoked_keys.len(),
                        spending_limit_changes = updates.spending_limit_changes.len(),
                        spending_limit_spends = updates.spending_limit_spends.len(),
                        validator_token_changes = updates.validator_token_changes.len(),
                        user_token_changes = updates.user_token_changes.len(),
                        blacklist_additions = updates.blacklist_additions.len(),
                        whitelist_removals = updates.whitelist_removals.len(),
                        "Processing transaction invalidation events"
                    );
                    let evicted = pool.evict_invalidated_transactions(&updates);
                    for hash in &evicted {
                        state.untrack(hash);
                    }
                    metrics.transactions_invalidated.increment(evicted.len() as u64);
                    metrics
                        .invalidation_eviction_duration_seconds
                        .record(invalidation_start.elapsed());
                }

                // 9. Evict stale pending transactions (must happen after AA pool promotions in step 6)
                // Only runs once per interval (~30 min) to avoid overhead on every block.
                // Transactions pending across two consecutive snapshots are considered stale.
                if state.pending_staleness.should_check(tip_timestamp) {
                    let current_pending: HashSet<TxHash> =
                        pool.pending_transactions().iter().map(|tx| *tx.hash()).collect();
                    let stale_to_evict =
                        state.pending_staleness.check_and_update(current_pending, tip_timestamp);

                    if !stale_to_evict.is_empty() {
                        debug!(
                            target: "txpool",
                            count = stale_to_evict.len(),
                            tip_timestamp,
                            "Evicting stale pending transactions"
                        );
                        // Clean up expiry tracking for stale txs to prevent orphaned entries
                        for hash in &stale_to_evict {
                            state.untrack(hash);
                        }
                        pool.remove_transactions(stale_to_evict);
                    }
                }

                // Record total block update duration
                metrics.block_update_duration_seconds.record(block_update_start.elapsed());
            }
        }
    }
}
695
696#[cfg(test)]
697mod tests {
698    use super::*;
699    use crate::test_utils::TxBuilder;
700    use alloy_primitives::{Address, TxHash};
701    use reth_primitives_traits::RecoveredBlock;
702    use std::sync::Arc;
703    use tempo_primitives::{Block, BlockBody, TempoHeader, TempoTxEnvelope};
704
705    mod pending_staleness_tracker_tests {
706        use super::*;
707
708        #[test]
709        fn no_eviction_on_first_snapshot() {
710            let mut tracker = PendingStalenessTracker::new(100);
711            let tx1 = TxHash::random();
712
713            // First snapshot should not evict anything (no previous snapshot to compare)
714            let stale = tracker.check_and_update([tx1].into_iter().collect(), 100);
715            assert!(stale.is_empty());
716            assert!(tracker.previous_pending.contains(&tx1));
717        }
718
719        #[test]
720        fn evicts_transactions_present_in_both_snapshots() {
721            let mut tracker = PendingStalenessTracker::new(100);
722            let tx_stale = TxHash::random();
723            let tx_new = TxHash::random();
724
725            // First snapshot at t=0
726            tracker.check_and_update([tx_stale].into_iter().collect(), 0);
727
728            // Second snapshot at t=100: tx_stale still pending, tx_new is new
729            let stale = tracker.check_and_update([tx_stale, tx_new].into_iter().collect(), 100);
730
731            // tx_stale was in both snapshots -> evicted
732            assert_eq!(stale.len(), 1);
733            assert!(stale.contains(&tx_stale));
734
735            // tx_new should be tracked for the next snapshot
736            assert!(tracker.previous_pending.contains(&tx_new));
737            // tx_stale should NOT be in the snapshot (it was evicted)
738            assert!(!tracker.previous_pending.contains(&tx_stale));
739        }
740
741        #[test]
742        fn should_check_returns_false_before_interval_elapsed() {
743            let mut tracker = PendingStalenessTracker::new(100);
744            let tx = TxHash::random();
745
746            // First snapshot at t=0
747            assert!(tracker.should_check(0));
748            tracker.check_and_update([tx].into_iter().collect(), 0);
749
750            // At t=50 (before interval elapsed) - should_check returns false
751            assert!(!tracker.should_check(50));
752            assert_eq!(tracker.last_snapshot_time, Some(0));
753
754            // At t=100 (interval elapsed) - should_check returns true
755            assert!(tracker.should_check(100));
756        }
757
758        #[test]
759        fn removes_transactions_no_longer_pending_from_snapshot() {
760            let mut tracker = PendingStalenessTracker::new(100);
761            let tx1 = TxHash::random();
762            let tx2 = TxHash::random();
763
764            // First snapshot with both txs at t=0
765            tracker.check_and_update([tx1, tx2].into_iter().collect(), 0);
766            assert_eq!(tracker.previous_pending.len(), 2);
767
768            // Second snapshot at t=100: only tx1 still pending
769            // tx1 was in both -> stale, tx2 not in current -> removed from tracking
770            let stale = tracker.check_and_update([tx1].into_iter().collect(), 100);
771            assert_eq!(stale.len(), 1);
772            assert!(stale.contains(&tx1));
773
774            // Neither should be in the snapshot now
775            assert!(tracker.previous_pending.is_empty());
776        }
777    }
778
779    #[test]
780    fn test_remove_mined() {
781        let mut state = TempoPoolState::default();
782        let hash_a = TxHash::random();
783        let hash_b = TxHash::random();
784        let hash_unknown = TxHash::random();
785
786        // Track two txs at the same valid_before
787        state.expiry_map.entry(1000).or_default().push(hash_a);
788        state.tx_to_expiry.insert(hash_a, 1000);
789        state.expiry_map.entry(1000).or_default().push(hash_b);
790        state.tx_to_expiry.insert(hash_b, 1000);
791
792        // Mine hash_a and an unknown hash
793        state.untrack(&hash_a);
794        state.untrack(&hash_unknown);
795
796        // hash_a removed from both maps
797        assert!(!state.tx_to_expiry.contains_key(&hash_a));
798        assert_eq!(state.expiry_map[&1000], vec![hash_b]);
799
800        // Mine hash_b should remove the expiry_map entry entirely
801        state.untrack(&hash_b);
802        assert!(!state.tx_to_expiry.contains_key(&hash_b));
803        assert!(!state.expiry_map.contains_key(&1000));
804    }
805
806    fn create_test_chain(
807        blocks: Vec<reth_primitives_traits::RecoveredBlock<Block>>,
808    ) -> Arc<Chain<TempoPrimitives>> {
809        create_test_chain_with_receipts(blocks, Vec::new())
810    }
811
812    fn create_test_chain_with_receipts(
813        blocks: Vec<reth_primitives_traits::RecoveredBlock<Block>>,
814        receipts: Vec<Vec<tempo_primitives::TempoReceipt>>,
815    ) -> Arc<Chain<TempoPrimitives>> {
816        use reth_provider::{Chain, ExecutionOutcome};
817
818        Arc::new(Chain::new(
819            blocks,
820            ExecutionOutcome {
821                receipts,
822                ..Default::default()
823            },
824            Default::default(),
825        ))
826    }
827
828    /// Helper to create a recovered block containing the given transactions.
829    fn create_block_with_txs(
830        block_number: u64,
831        transactions: Vec<TempoTxEnvelope>,
832        senders: Vec<Address>,
833    ) -> RecoveredBlock<Block> {
834        let header = TempoHeader {
835            inner: alloy_consensus::Header {
836                number: block_number,
837                ..Default::default()
838            },
839            ..Default::default()
840        };
841        let body = BlockBody {
842            transactions,
843            ..Default::default()
844        };
845        let block = Block::new(header, body);
846        RecoveredBlock::new_unhashed(block, senders)
847    }
848
849    /// Helper to extract a TempoTxEnvelope from a TempoPooledTransaction.
850    fn extract_envelope(tx: &crate::transaction::TempoPooledTransaction) -> TempoTxEnvelope {
851        tx.inner().clone().into_inner()
852    }
853
854    mod from_chain_spending_limit_spends {
855        use super::*;
856        use alloy_primitives::{IntoLogData, Log, U256};
857        use alloy_signer_local::PrivateKeySigner;
858        use tempo_primitives::{TempoReceipt, TempoTxType};
859
860        /// Verify from_chain uses AccessKeySpend logs so it can track the actually spent token
861        /// even when it differs from the mined tx's fee token.
862        #[test]
863        fn extracts_access_key_spend_events() {
864            let user_address = Address::random();
865            let access_key_signer = PrivateKeySigner::random();
866            let key_id = access_key_signer.address();
867            let fee_token = Address::random();
868            let spent_token = Address::random();
869
870            let keychain_tx = TxBuilder::aa(user_address)
871                .fee_token(fee_token)
872                .build_keychain(user_address, &access_key_signer);
873            let envelope = extract_envelope(&keychain_tx);
874
875            let spend_log = alloy_primitives::Log::new_from_event_unchecked(
876                ACCOUNT_KEYCHAIN_ADDRESS,
877                IAccountKeychain::AccessKeySpend {
878                    account: user_address,
879                    publicKey: key_id,
880                    token: spent_token,
881                    amount: U256::from(25),
882                    remainingLimit: U256::from(75),
883                },
884            )
885            .reserialize();
886            let receipt = tempo_primitives::TempoReceipt {
887                tx_type: tempo_primitives::TempoTxType::AA,
888                success: true,
889                cumulative_gas_used: 1,
890                logs: vec![spend_log],
891            };
892
893            let block = create_block_with_txs(1, vec![envelope], vec![user_address]);
894            let chain = create_test_chain_with_receipts(vec![block], vec![vec![receipt]]);
895
896            let updates = TempoPoolUpdates::from_chain(&chain);
897
898            assert!(
899                updates
900                    .spending_limit_spends
901                    .contains(user_address, key_id, spent_token),
902                "Should contain the AccessKeySpend event's (account, key_id, token)"
903            );
904            assert!(
905                !updates
906                    .spending_limit_spends
907                    .contains(user_address, key_id, fee_token),
908                "Should not infer spends from the tx fee token"
909            );
910            assert_eq!(updates.spending_limit_spends.len(), 1);
911        }
912
913        /// The pool should only track actual AccessKeySpend events, not infer spends from the
914        /// mined transaction body.
915        #[test]
916        fn ignores_keychain_transactions_without_access_key_spend_logs() {
917            let user_address = Address::random();
918            let access_key_signer = PrivateKeySigner::random();
919            let fee_token = Address::random();
920
921            let keychain_tx = TxBuilder::aa(user_address)
922                .fee_token(fee_token)
923                .build_keychain(user_address, &access_key_signer);
924            let envelope = extract_envelope(&keychain_tx);
925
926            let block = create_block_with_txs(1, vec![envelope], vec![user_address]);
927            let chain = create_test_chain(vec![block]);
928
929            let updates = TempoPoolUpdates::from_chain(&chain);
930            assert!(updates.spending_limit_spends.is_empty());
931        }
932
933        /// Non-keychain AA txs should NOT produce spending limit spends.
934        #[test]
935        fn ignores_non_keychain_aa_transactions() {
936            let sender = Address::random();
937            let tx = TxBuilder::aa(sender).fee_token(Address::random()).build();
938            let envelope = extract_envelope(&tx);
939
940            let block = create_block_with_txs(1, vec![envelope], vec![sender]);
941            let chain = create_test_chain(vec![block]);
942
943            let updates = TempoPoolUpdates::from_chain(&chain);
944            assert!(updates.spending_limit_spends.is_empty());
945        }
946
947        /// EIP-1559 txs should NOT produce spending limit spends.
948        #[test]
949        fn ignores_eip1559_transactions() {
950            let sender = Address::random();
951            let tx = TxBuilder::eip1559(Address::random()).build_eip1559();
952            let envelope = extract_envelope(&tx);
953
954            let block = create_block_with_txs(1, vec![envelope], vec![sender]);
955            let chain = create_test_chain(vec![block]);
956
957            let updates = TempoPoolUpdates::from_chain(&chain);
958            assert!(updates.spending_limit_spends.is_empty());
959        }
960
961        /// has_invalidation_events returns true when spending_limit_spends is non-empty.
962        #[test]
963        fn has_invalidation_events_includes_spending_limit_spends() {
964            let mut updates = TempoPoolUpdates::new();
965            assert!(!updates.has_invalidation_events());
966
967            updates.spending_limit_spends.insert(
968                Address::random(),
969                Address::random(),
970                Some(Address::random()),
971            );
972            assert!(updates.has_invalidation_events());
973        }
974
975        #[test]
976        fn extracts_fee_balance_changes_from_tip20_transfer_logs() {
977            let fee_token = tempo_precompiles::PATH_USD_ADDRESS;
978            let from = Address::random();
979            let to = Address::random();
980            let amount = U256::from(42_u64);
981            let log_data = ITIP20::Transfer { from, to, amount }.into_log_data();
982            let log =
983                Log::new_unchecked(fee_token, log_data.topics().to_vec(), log_data.data.clone());
984            let receipt = TempoReceipt {
985                tx_type: TempoTxType::Legacy,
986                success: true,
987                cumulative_gas_used: 21_000,
988                logs: vec![log],
989            };
990
991            let block = create_block_with_txs(1, vec![], vec![]);
992            let chain = create_test_chain_with_receipts(vec![block], vec![vec![receipt]]);
993            let updates = TempoPoolUpdates::from_chain(&chain);
994
995            assert!(
996                updates
997                    .fee_balance_changes
998                    .get(&fee_token)
999                    .is_some_and(|accounts| accounts.len() == 1 && accounts.contains(&from)),
1000                "TIP20 transfer logs should only mark the debited sender as balance-changed"
1001            );
1002            assert!(updates.has_invalidation_events());
1003        }
1004
1005        /// TransferPolicyUpdate events are parsed from TIP20 token logs.
1006        #[test]
1007        fn extracts_transfer_policy_updates() {
1008            let fee_token = tempo_precompiles::PATH_USD_ADDRESS;
1009            let updater = Address::random();
1010            let new_policy_id = 42u64;
1011            let log_data = ITIP20::TransferPolicyUpdate {
1012                updater,
1013                newPolicyId: new_policy_id,
1014            }
1015            .into_log_data();
1016            let log =
1017                Log::new_unchecked(fee_token, log_data.topics().to_vec(), log_data.data.clone());
1018            let receipt = TempoReceipt {
1019                tx_type: TempoTxType::Legacy,
1020                success: true,
1021                cumulative_gas_used: 21_000,
1022                logs: vec![log],
1023            };
1024
1025            let block = create_block_with_txs(1, vec![], vec![]);
1026            let chain = create_test_chain_with_receipts(vec![block], vec![vec![receipt]]);
1027            let updates = TempoPoolUpdates::from_chain(&chain);
1028
1029            assert!(
1030                updates.transfer_policy_updates.contains(&fee_token),
1031                "TransferPolicyUpdate should be tracked by token address"
1032            );
1033        }
1034
1035        /// Duplicate TransferPolicyUpdate events for the same token are deduplicated.
1036        #[test]
1037        fn transfer_policy_updates_deduplicates_by_token() {
1038            let fee_token = tempo_precompiles::PATH_USD_ADDRESS;
1039
1040            let log_data_1 = ITIP20::TransferPolicyUpdate {
1041                updater: Address::random(),
1042                newPolicyId: 1,
1043            }
1044            .into_log_data();
1045            let log_data_2 = ITIP20::TransferPolicyUpdate {
1046                updater: Address::random(),
1047                newPolicyId: 2,
1048            }
1049            .into_log_data();
1050            let log1 = Log::new_unchecked(
1051                fee_token,
1052                log_data_1.topics().to_vec(),
1053                log_data_1.data.clone(),
1054            );
1055            let log2 = Log::new_unchecked(
1056                fee_token,
1057                log_data_2.topics().to_vec(),
1058                log_data_2.data.clone(),
1059            );
1060            let receipt = TempoReceipt {
1061                tx_type: TempoTxType::Legacy,
1062                success: true,
1063                cumulative_gas_used: 21_000,
1064                logs: vec![log1, log2],
1065            };
1066
1067            let block = create_block_with_txs(1, vec![], vec![]);
1068            let chain = create_test_chain_with_receipts(vec![block], vec![vec![receipt]]);
1069            let updates = TempoPoolUpdates::from_chain(&chain);
1070
1071            assert_eq!(
1072                updates.transfer_policy_updates.len(),
1073                1,
1074                "duplicate policy updates for the same token should be deduplicated"
1075            );
1076        }
1077
1078        /// Duplicate validator token changes must be deduplicated (last-write-wins).
1079        #[test]
1080        fn validator_token_changes_deduplicates_by_validator() {
1081            let validator = Address::random();
1082            let token_a = Address::random();
1083            let token_b = Address::random();
1084
1085            let mut updates = TempoPoolUpdates::new();
1086            updates.validator_token_changes.insert(validator, token_a);
1087            updates.validator_token_changes.insert(validator, token_b);
1088
1089            assert_eq!(
1090                updates.validator_token_changes.len(),
1091                1,
1092                "duplicate validator entries must be deduplicated"
1093            );
1094            assert_eq!(
1095                updates.validator_token_changes.get(&validator).copied(),
1096                Some(token_b),
1097                "last-write-wins: second token should overwrite the first"
1098            );
1099        }
1100    }
1101}