Skip to main content

tempo_transaction_pool/
maintain.rs

1//! Transaction pool maintenance tasks.
2
3use crate::{
4    RevokedKeys, SpendingLimitUpdates, TempoTransactionPool,
5    metrics::TempoPoolMaintenanceMetrics,
6    paused::{PausedEntry, PausedFeeTokenPool},
7    transaction::TempoPooledTransaction,
8};
9use alloy_primitives::{
10    Address, B256, Log, TxHash,
11    map::{AddressMap, AddressSet, B256Map, B256Set},
12};
13use alloy_sol_types::SolEvent;
14use futures::StreamExt;
15use itertools::{Either, Itertools};
16use reth_chainspec::ChainSpecProvider;
17use reth_primitives_traits::AlloyBlockHeader;
18use reth_provider::{CanonStateNotification, CanonStateSubscriptions, Chain, HeaderProvider};
19use reth_storage_api::StateProviderFactory;
20use reth_transaction_pool::{AllPoolTransactions, PoolTransaction, TransactionPool};
21use std::{
22    collections::{BTreeMap, btree_map::Entry},
23    time::Instant,
24};
25use tempo_chainspec::TempoChainSpec;
26use tempo_contracts::precompiles::{IAccountKeychain, IFeeManager, ITIP20, ITIP403Registry};
27use tempo_precompiles::{
28    ACCOUNT_KEYCHAIN_ADDRESS, TIP_FEE_MANAGER_ADDRESS, TIP403_REGISTRY_ADDRESS,
29};
30use tempo_primitives::{TempoAddressExt, TempoHeader, TempoPrimitives};
31use tracing::{debug, error};
32
/// Number of seconds added to the tip timestamp when selecting transactions to evict.
///
/// Evicting transactions this many seconds before they expire reduces propagation
/// of near-expiry transactions that are likely to fail validation on peers.
const EVICTION_BUFFER_SECS: u64 = 3;

/// Maximum number of new-transaction events to receive in a single maintenance wakeup
/// before yielding back to the event loop. Bounds the per-wakeup work so a sustained
/// burst of new transactions cannot starve block-commit processing.
///
/// Also used as the initial capacity of the buffer the events are received into.
const NEW_TX_DRAIN_LIMIT: usize = 4096;
41
/// Aggregated block-level invalidation events for the transaction pool.
///
/// Collects all invalidation events from a block into a single structure,
/// allowing efficient batch processing of pool updates. Instances are built from
/// receipt logs by [`TempoPoolUpdates::from_chain`].
#[derive(Debug, Default)]
pub struct TempoPoolUpdates {
    /// Revoked keychain keys, recorded from `KeyRevoked` events.
    /// Indexed by account for efficient lookup.
    pub revoked_keys: RevokedKeys,
    /// Inline key authorization target-key status changes, recorded from `KeyRevoked`,
    /// `KeyAuthorized` and `AdminKeyAuthorized` events.
    ///
    /// A pending inline authorization for `(account, key)` is stale once another transaction
    /// authorizes, admin-authorizes, or revokes that same key.
    pub key_authorization_target_changes: RevokedKeys,
    /// Spending limit changes, recorded from `SpendingLimitUpdated` events.
    /// When a spending limit changes, transactions from that key paying with that token
    /// may become unexecutable if the new limit is below their value.
    /// Indexed by account for efficient lookup.
    pub spending_limit_changes: SpendingLimitUpdates,
    /// Validator token preference changes: validator to new_token (last-write-wins).
    /// Uses `AddressMap` to deduplicate by validator, preventing resource amplification
    /// when a validator emits multiple `ValidatorTokenSet` events in the same block.
    pub validator_token_changes: AddressMap<Address>,
    /// User token preference changes, recorded from `UserTokenSet` events.
    /// When a user changes their fee token preference via `setUserToken()`, pending
    /// transactions from that user that don't have an explicit fee_token set may now
    /// resolve to a different token at execution time, causing fee payment failures.
    /// Uses a set since a user can emit multiple events in the same block; we only need to
    /// process each user once. No cleanup needed as this is ephemeral per-block data.
    pub user_token_changes: AddressSet,
    /// TIP403 blacklist additions: (policy_id, account).
    /// Only `BlacklistUpdated` events with `restricted == true` are recorded.
    pub blacklist_additions: Vec<(u64, Address)>,
    /// TIP403 whitelist removals: (policy_id, account).
    /// Only `WhitelistUpdated` events with `allowed == false` are recorded.
    pub whitelist_removals: Vec<(u64, Address)>,
    /// Fee token pause state changes: (token, is_paused).
    /// Entries are appended in log order and are not deduplicated, so a token may appear
    /// more than once.
    pub pause_events: Vec<(Address, bool)>,
    /// Tokens whose transfer policy was changed via `changeTransferPolicyId()`.
    /// Pending transactions using these tokens as fee tokens need to be re-validated
    /// because the new policy may forbid the fee payer or fee manager.
    pub transfer_policy_updates: AddressSet,
    /// Tokens whose `quoteToken` was updated via `completeQuoteTokenUpdate()`.
    /// Pending transactions paying in these tokens need to be re-validated because the new
    /// quote token may invalidate the old route.
    pub quote_token_updates: AddressSet,
    /// Fee token balance changes keyed by token.
    ///
    /// We only track the debited `from` account from TIP20 `Transfer` logs because credits to the
    /// `to` account cannot make an already-admitted transaction newly invalid.
    pub fee_balance_changes: AddressMap<AddressSet>,
    /// Spending-limit spends emitted by the account keychain during execution.
    ///
    /// We record the exact `(account, key_id, token)` triples emitted by `AccessKeySpend`
    /// events. During eviction, the pool re-reads the remaining limit from state for these
    /// triples and compares against pending tx fee costs. This keeps maintenance aligned
    /// with the runtime's actual spending-limit decrements instead of inferring them from
    /// the mined transaction body.
    pub spending_limit_spends: SpendingLimitUpdates,
    /// TIP-1053 key-authorization witness burns, keyed by account.
    ///
    /// Pending AA transactions carrying the same `(account, witness)` key authorization are no
    /// longer executable once the account explicitly burns that witness.
    pub key_authorization_witness_burns: AddressMap<B256Set>,
}
105
106impl TempoPoolUpdates {
107    /// Creates a new empty `TempoPoolUpdates`.
108    pub fn new() -> Self {
109        Self::default()
110    }
111
112    /// Returns true if there are no updates to process.
113    pub fn is_empty(&self) -> bool {
114        self.revoked_keys.is_empty()
115            && self.key_authorization_target_changes.is_empty()
116            && self.spending_limit_changes.is_empty()
117            && self.validator_token_changes.is_empty()
118            && self.user_token_changes.is_empty()
119            && self.blacklist_additions.is_empty()
120            && self.whitelist_removals.is_empty()
121            && self.pause_events.is_empty()
122            && self.transfer_policy_updates.is_empty()
123            && self.quote_token_updates.is_empty()
124            && self.fee_balance_changes.is_empty()
125            && self.spending_limit_spends.is_empty()
126            && self.key_authorization_witness_burns.is_empty()
127    }
128
129    /// Extracts pool updates from a committed chain segment.
130    ///
131    /// Parses receipts for relevant events (key revocations, validator token changes,
132    /// blacklist additions, pause events).
133    pub fn from_chain(chain: &Chain<TempoPrimitives>) -> Self {
134        let mut updates = Self::new();
135
136        // Parse events from receipts
137        for log in chain
138            .execution_outcome()
139            .receipts()
140            .iter()
141            .flatten()
142            .flat_map(|receipt| &receipt.logs)
143        {
144            // Fee token pause events and balance changes.
145            //
146            // Checked first because TIP-20 `Transfer` logs dominate block receipts; this avoids
147            // three address comparisons per transfer before reaching the matching branch.
148            if log.address.is_tip20() {
149                match Tip20PoolEvent::decode(log) {
150                    Some(Tip20PoolEvent::PauseStateUpdate(event)) => {
151                        updates.pause_events.push((log.address, event.isPaused));
152                    }
153                    Some(Tip20PoolEvent::TransferPolicyUpdate) => {
154                        updates.transfer_policy_updates.insert(log.address);
155                    }
156                    Some(Tip20PoolEvent::QuoteTokenUpdate) => {
157                        updates.quote_token_updates.insert(log.address);
158                    }
159                    Some(Tip20PoolEvent::Transfer { from }) => {
160                        updates
161                            .fee_balance_changes
162                            .entry(log.address)
163                            .or_default()
164                            .insert(from);
165                    }
166                    None => {}
167                }
168            }
169            // Key revocations and spending limit changes
170            else if log.address == ACCOUNT_KEYCHAIN_ADDRESS {
171                match AccountKeychainPoolEvent::decode(log) {
172                    Some(AccountKeychainPoolEvent::KeyRevoked(event)) => {
173                        updates.revoked_keys.insert(event.account, event.publicKey);
174                        updates
175                            .key_authorization_target_changes
176                            .insert(event.account, event.publicKey);
177                    }
178                    Some(AccountKeychainPoolEvent::KeyAuthorized(event)) => {
179                        updates
180                            .key_authorization_target_changes
181                            .insert(event.account, event.publicKey);
182                    }
183                    Some(AccountKeychainPoolEvent::AdminKeyAuthorized(event)) => {
184                        updates
185                            .key_authorization_target_changes
186                            .insert(event.account, event.publicKey);
187                    }
188                    Some(AccountKeychainPoolEvent::SpendingLimitUpdated(event)) => {
189                        updates.spending_limit_changes.insert(
190                            event.account,
191                            event.publicKey,
192                            Some(event.token),
193                        );
194                    }
195                    Some(AccountKeychainPoolEvent::AccessKeySpend(event)) => {
196                        updates.spending_limit_spends.insert(
197                            event.account,
198                            event.publicKey,
199                            Some(event.token),
200                        );
201                    }
202                    Some(AccountKeychainPoolEvent::KeyAuthorizationWitnessBurned(event)) => {
203                        updates
204                            .key_authorization_witness_burns
205                            .entry(event.account)
206                            .or_default()
207                            .insert(event.witness);
208                    }
209                    None => {}
210                }
211            }
212            // Validator and user token changes
213            else if log.address == TIP_FEE_MANAGER_ADDRESS {
214                match FeeManagerPoolEvent::decode(log) {
215                    Some(FeeManagerPoolEvent::ValidatorTokenSet(event)) => {
216                        updates
217                            .validator_token_changes
218                            .insert(event.validator, event.token);
219                    }
220                    Some(FeeManagerPoolEvent::UserTokenSet(event)) => {
221                        updates.user_token_changes.insert(event.user);
222                    }
223                    None => {}
224                }
225            }
226            // TIP403 blacklist additions and whitelist removals
227            else if log.address == TIP403_REGISTRY_ADDRESS {
228                match Tip403PoolEvent::decode(log) {
229                    Some(Tip403PoolEvent::BlacklistUpdated(event)) if event.restricted => {
230                        updates
231                            .blacklist_additions
232                            .push((event.policyId, event.account));
233                    }
234                    Some(Tip403PoolEvent::WhitelistUpdated(event)) if !event.allowed => {
235                        updates
236                            .whitelist_removals
237                            .push((event.policyId, event.account));
238                    }
239                    Some(_) | None => {}
240                }
241            }
242        }
243
244        updates
245    }
246
247    /// Returns true if there are any invalidation events that require scanning the pool.
248    pub fn has_invalidation_events(&self) -> bool {
249        self.has_keychain_subject_updates()
250            || !self.key_authorization_target_changes.is_empty()
251            || !self.validator_token_changes.is_empty()
252            || !self.user_token_changes.is_empty()
253            || !self.blacklist_additions.is_empty()
254            || !self.whitelist_removals.is_empty()
255            || !self.fee_balance_changes.is_empty()
256            || !self.key_authorization_witness_burns.is_empty()
257    }
258
259    /// Returns true if updates may invalidate keychain-signature transactions.
260    pub fn has_keychain_subject_updates(&self) -> bool {
261        !self.revoked_keys.is_empty()
262            || !self.spending_limit_changes.is_empty()
263            || !self.spending_limit_spends.is_empty()
264    }
265}
266
267/// Transaction-pool relevant subset of `IAccountKeychain::IAccountKeychainEvents`.
268enum AccountKeychainPoolEvent {
269    /// [`IAccountKeychain::KeyAuthorized`] log.
270    KeyAuthorized(IAccountKeychain::KeyAuthorized),
271    /// [`IAccountKeychain::AdminKeyAuthorized`] log.
272    AdminKeyAuthorized(IAccountKeychain::AdminKeyAuthorized),
273    /// [`IAccountKeychain::KeyRevoked`] log.
274    KeyRevoked(IAccountKeychain::KeyRevoked),
275    /// [`IAccountKeychain::SpendingLimitUpdated`] log.
276    SpendingLimitUpdated(IAccountKeychain::SpendingLimitUpdated),
277    /// [`IAccountKeychain::AccessKeySpend`] log.
278    AccessKeySpend(IAccountKeychain::AccessKeySpend),
279    /// [`IAccountKeychain::KeyAuthorizationWitnessBurned`] log.
280    KeyAuthorizationWitnessBurned(IAccountKeychain::KeyAuthorizationWitnessBurned),
281}
282
283impl AccountKeychainPoolEvent {
284    /// Decodes only account-keychain events used by transaction-pool maintenance.
285    fn decode(log: &Log) -> Option<Self> {
286        match first_topic(log)? {
287            IAccountKeychain::KeyAuthorized::SIGNATURE_HASH => {
288                decode_event(log).map(Self::KeyAuthorized)
289            }
290            IAccountKeychain::AdminKeyAuthorized::SIGNATURE_HASH => {
291                decode_event(log).map(Self::AdminKeyAuthorized)
292            }
293            IAccountKeychain::KeyRevoked::SIGNATURE_HASH => decode_event(log).map(Self::KeyRevoked),
294            IAccountKeychain::SpendingLimitUpdated::SIGNATURE_HASH => {
295                decode_event(log).map(Self::SpendingLimitUpdated)
296            }
297            IAccountKeychain::AccessKeySpend::SIGNATURE_HASH => {
298                decode_event(log).map(Self::AccessKeySpend)
299            }
300            IAccountKeychain::KeyAuthorizationWitnessBurned::SIGNATURE_HASH => {
301                decode_event(log).map(Self::KeyAuthorizationWitnessBurned)
302            }
303            _ => None,
304        }
305    }
306}
307
308/// Transaction-pool relevant subset of `IFeeManager::IFeeManagerEvents`.
309enum FeeManagerPoolEvent {
310    /// [`IFeeManager::ValidatorTokenSet`] log.
311    ValidatorTokenSet(IFeeManager::ValidatorTokenSet),
312    /// [`IFeeManager::UserTokenSet`] log.
313    UserTokenSet(IFeeManager::UserTokenSet),
314}
315
316impl FeeManagerPoolEvent {
317    /// Decodes only fee-manager events used by transaction-pool maintenance.
318    fn decode(log: &Log) -> Option<Self> {
319        match first_topic(log)? {
320            IFeeManager::ValidatorTokenSet::SIGNATURE_HASH => {
321                decode_event(log).map(Self::ValidatorTokenSet)
322            }
323            IFeeManager::UserTokenSet::SIGNATURE_HASH => decode_event(log).map(Self::UserTokenSet),
324            _ => None,
325        }
326    }
327}
328
329/// Transaction-pool relevant subset of `ITIP403Registry::ITIP403RegistryEvents`.
330enum Tip403PoolEvent {
331    /// [`ITIP403Registry::BlacklistUpdated`] log.
332    BlacklistUpdated(ITIP403Registry::BlacklistUpdated),
333    /// [`ITIP403Registry::WhitelistUpdated`] log.
334    WhitelistUpdated(ITIP403Registry::WhitelistUpdated),
335}
336
337impl Tip403PoolEvent {
338    /// Decodes only TIP-403 registry events used by transaction-pool maintenance.
339    fn decode(log: &Log) -> Option<Self> {
340        match first_topic(log)? {
341            ITIP403Registry::BlacklistUpdated::SIGNATURE_HASH => {
342                decode_event(log).map(Self::BlacklistUpdated)
343            }
344            ITIP403Registry::WhitelistUpdated::SIGNATURE_HASH => {
345                decode_event(log).map(Self::WhitelistUpdated)
346            }
347            _ => None,
348        }
349    }
350}
351
352/// Transaction-pool relevant subset of `ITIP20::ITIP20Events`.
353enum Tip20PoolEvent {
354    /// [`ITIP20::PauseStateUpdate`] log.
355    PauseStateUpdate(ITIP20::PauseStateUpdate),
356    /// [`ITIP20::TransferPolicyUpdate`] log.
357    TransferPolicyUpdate,
358    /// [`ITIP20::QuoteTokenUpdate`] log.
359    QuoteTokenUpdate,
360    /// [`ITIP20::Transfer`] log; only the debited `from` account is retained.
361    Transfer { from: Address },
362}
363
364impl Tip20PoolEvent {
365    /// Decodes only TIP-20 events used by transaction-pool maintenance.
366    fn decode(log: &Log) -> Option<Self> {
367        match first_topic(log)? {
368            // `Transfer` is by far the most common TIP-20 log, so avoid a full event decode
369            // and read the indexed `from` directly from `topics[1]`. We only need the debited
370            // account for `fee_balance_changes`; `to` and `amount` are unused.
371            ITIP20::Transfer::SIGNATURE_HASH => log.topics().get(1).map(|topic| Self::Transfer {
372                from: Address::from_word(*topic),
373            }),
374            ITIP20::PauseStateUpdate::SIGNATURE_HASH => {
375                decode_event(log).map(Self::PauseStateUpdate)
376            }
377            ITIP20::TransferPolicyUpdate::SIGNATURE_HASH => {
378                decode_event::<ITIP20::TransferPolicyUpdate>(log)
379                    .map(|_| Self::TransferPolicyUpdate)
380            }
381            ITIP20::QuoteTokenUpdate::SIGNATURE_HASH => {
382                decode_event::<ITIP20::QuoteTokenUpdate>(log).map(|_| Self::QuoteTokenUpdate)
383            }
384            _ => None,
385        }
386    }
387}
388
389fn first_topic(log: &Log) -> Option<B256> {
390    log.topics().first().copied()
391}
392
393/// Decodes after the caller has matched `topic0`, avoiding the allocating
394/// invalid-signature error path for unrelated events.
395fn decode_event<T: SolEvent>(log: &Log) -> Option<T> {
396    T::decode_log(log).ok().map(|event| event.data)
397}
398
/// Tracking state for pool maintenance operations.
///
/// Tracks transaction expiry (AA `valid_before` timestamps and keychain key expiry) for
/// eviction.
///
/// Note: Stale entries (transactions no longer in the pool) are cleaned up lazily
/// when we check `pool.contains()` before eviction. This avoids the overhead of
/// subscribing to all transaction lifecycle events.
#[derive(Default)]
struct TempoPoolState {
    /// Maps an expiry timestamp to the transactions that become invalid at that time.
    /// The timestamp is the earlier of the transaction's `valid_before` and its keychain
    /// key expiry.
    expiry_map: BTreeMap<u64, B256Set>,
    /// Reverse mapping: tx_hash -> tracked expiry timestamp, i.e. the key of the
    /// `expiry_map` bucket holding the hash (used to find that bucket when untracking).
    tx_to_expiry: B256Map<u64>,
    /// Pool for transactions whose fee token is temporarily paused.
    paused_pool: PausedFeeTokenPool,
    /// Tracks pending transaction staleness for DoS mitigation.
    pending_staleness: PendingStalenessTracker,
}
417
418impl TempoPoolState {
419    /// Tracks an AA transaction with a `valid_before` timestamp.
420    fn track(&mut self, tx: &TempoPooledTransaction) {
421        let valid_before = tx
422            .inner()
423            .as_aa()
424            .and_then(|tx| tx.tx().valid_before.map(|value| value.get()));
425        let key_expiry = tx.key_expiry();
426
427        let expiry = [valid_before, key_expiry].into_iter().flatten().min();
428
429        if let Some(expiry) = expiry {
430            self.expiry_map
431                .entry(expiry)
432                .or_default()
433                .insert(*tx.hash());
434            self.tx_to_expiry.insert(*tx.hash(), expiry);
435        }
436    }
437
438    /// Removes expiry and key-expiry tracking for a single transaction.
439    fn untrack(&mut self, hash: &TxHash) {
440        if let Some(expiry) = self.tx_to_expiry.remove(hash)
441            && let Entry::Occupied(mut entry) = self.expiry_map.entry(expiry)
442        {
443            entry.get_mut().remove(hash);
444            if entry.get().is_empty() {
445                entry.remove();
446            }
447        }
448    }
449
450    /// Removes expiry and key-expiry tracking for a batch of transactions.
451    ///
452    /// Mined transactions often share the same expiry timestamp, so first group
453    /// hashes by their recorded expiry and then touch each expiry bucket once.
454    /// This avoids repeating the `expiry_map` lookup for every mined hash while
455    /// preserving O(1)-ish removal from each `B256Set` bucket.
456    fn untrack_many<'a>(&mut self, hashes: impl IntoIterator<Item = &'a TxHash>) {
457        // Skip iterating the mined hashes if nothing is tracked for expiry.
458        if self.tx_to_expiry.is_empty() {
459            return;
460        }
461
462        let mut hashes_by_expiry: BTreeMap<u64, B256Set> = BTreeMap::new();
463
464        for hash in hashes {
465            if let Some(expiry) = self.tx_to_expiry.remove(hash) {
466                hashes_by_expiry.entry(expiry).or_default().insert(*hash);
467            }
468        }
469
470        for (expiry, hashes) in hashes_by_expiry {
471            if let Entry::Occupied(mut entry) = self.expiry_map.entry(expiry) {
472                let bucket = entry.get_mut();
473                for hash in hashes {
474                    bucket.remove(&hash);
475                }
476                if bucket.is_empty() {
477                    entry.remove();
478                }
479            }
480        }
481    }
482
483    /// Collects and removes all expired transactions up to the given timestamp.
484    /// Returns the list of expired transaction hashes.
485    fn drain_expired(&mut self, tip_timestamp: u64) -> Vec<TxHash> {
486        let mut expired = Vec::new();
487        while let Some(entry) = self.expiry_map.first_entry()
488            && *entry.key() <= tip_timestamp
489        {
490            let expired_hashes = entry.remove();
491            expired.reserve(expired_hashes.len());
492            for tx_hash in expired_hashes {
493                self.tx_to_expiry.remove(&tx_hash);
494                expired.push(tx_hash);
495            }
496        }
497        expired
498    }
499}
500
/// Default interval, in seconds, for pending transaction staleness checks (30 minutes).
/// Transactions that remain pending across two consecutive snapshots will be evicted.
const DEFAULT_PENDING_STALENESS_INTERVAL: u64 = 30 * 60;
504
/// Tracks pending transactions across snapshots to detect stale transactions.
///
/// Uses a simple snapshot comparison approach:
/// - Every interval, take a snapshot of current pending transactions
/// - Transactions present in both the previous and current snapshot are considered stale
/// - Stale transactions are reported to the caller for eviction since they've been pending
///   for at least one full interval; they are not carried over into the stored snapshot
#[derive(Debug)]
struct PendingStalenessTracker {
    /// Pending transaction hashes kept from the previous snapshot (those that were not
    /// reported as stale at that time).
    previous_pending: B256Set,
    /// Timestamp of the last snapshot, or `None` if no snapshot has been taken yet.
    last_snapshot_time: Option<u64>,
    /// Interval in seconds between staleness checks.
    interval_secs: u64,
}
520
521impl PendingStalenessTracker {
522    /// Creates a new tracker with the given check interval.
523    fn new(interval_secs: u64) -> Self {
524        Self {
525            previous_pending: B256Set::default(),
526            last_snapshot_time: None,
527            interval_secs,
528        }
529    }
530
531    /// Returns true if the staleness check interval has elapsed and a snapshot should be taken.
532    fn should_check(&self, now: u64) -> bool {
533        self.last_snapshot_time
534            .is_none_or(|last| now.saturating_sub(last) >= self.interval_secs)
535    }
536
537    /// Checks for stale transactions and updates the snapshot.
538    ///
539    /// Returns transactions that have been pending across two consecutive snapshots
540    /// (i.e., pending for at least one full interval).
541    ///
542    /// Call `should_check` first to avoid collecting the pending set on every block.
543    fn check_and_update(&mut self, current_pending: B256Set, now: u64) -> Vec<TxHash> {
544        let previous_pending = std::mem::take(&mut self.previous_pending);
545
546        // Split the current snapshot into stale transactions to evict and fresh
547        // transactions to track. A transaction is stale if it appears in both
548        // the previous and current pending snapshots.
549        let (stale, next_pending): (Vec<TxHash>, B256Set) =
550            current_pending.into_iter().partition_map(|hash| {
551                if previous_pending.contains(&hash) {
552                    Either::Left(hash)
553                } else {
554                    Either::Right(hash)
555                }
556            });
557
558        self.previous_pending = next_pending;
559        self.last_snapshot_time = Some(now);
560
561        stale
562    }
563}
564
565impl Default for PendingStalenessTracker {
566    fn default() -> Self {
567        Self::new(DEFAULT_PENDING_STALENESS_INTERVAL)
568    }
569}
570
571/// Unified maintenance task for the Tempo transaction pool.
572///
573/// Handles:
574/// - Evicting expired AA transactions (`valid_before <= tip_timestamp`)
575/// - Evicting transactions using expired keychain keys (`AuthorizedKey.expiry <= tip_timestamp`)
576/// - Updating the AA 2D nonce pool from `NonceManager` changes
577/// - Refreshing the AMM liquidity cache from `FeeManager` updates
578/// - Removing transactions signed with revoked keychain keys
579/// - Moving transactions to/from the paused pool when fee tokens are paused/unpaused
580///
581/// Consolidates these operations into a single event loop to avoid multiple tasks
582/// competing for canonical state updates and to minimize contention on pool locks.
583pub async fn maintain_tempo_pool<Client>(pool: TempoTransactionPool<Client>)
584where
585    Client: StateProviderFactory
586        + HeaderProvider<Header = TempoHeader>
587        + ChainSpecProvider<ChainSpec = TempoChainSpec>
588        + CanonStateSubscriptions<Primitives = TempoPrimitives>
589        + 'static,
590{
591    let mut state = TempoPoolState::default();
592    let metrics = TempoPoolMaintenanceMetrics::default();
593
594    // Subscribe to new transactions and chain events
595    let mut new_txs = pool.new_transactions_listener();
596    let mut chain_events = pool.client().canonical_state_stream();
597
598    // Populate expiry tracking with existing transactions to prevent race conditions at start-up
599    let all_txs = pool.all_transactions();
600    for tx in all_txs.iter() {
601        state.track(&tx.transaction);
602    }
603
604    let amm_cache = pool.amm_liquidity_cache();
605    let mut new_tx_events = Vec::with_capacity(NEW_TX_DRAIN_LIMIT);
606
607    loop {
608        tokio::select! {
609            // Track new transactions for expiry (valid_before and key expiry)
610            n = new_txs.recv_many(&mut new_tx_events, NEW_TX_DRAIN_LIMIT) => {
611                if n == 0 {
612                    break;
613                }
614
615                // Batch already-buffered events to amortize select/poll overhead while bounding
616                // per-wakeup work so block processing can still make progress.
617                for tx_event in new_tx_events.drain(..) {
618                    state.track(&tx_event.transaction.transaction);
619                }
620            }
621
622            // Process all maintenance operations on new block commit or reorg
623            Some(event) = chain_events.next() => {
624                let new = match event {
625                    CanonStateNotification::Reorg { old: _, new } => {
626                        // Repopulate AMM liquidity cache from the new canonical chain
627                        // to invalidate stale entries from orphaned blocks.
628                        if let Err(err) = amm_cache.repopulate(pool.client()) {
629                            error!(target: "txpool", ?err, "AMM liquidity cache repopulate after reorg failed");
630                        }
631
632                        new
633                    }
634                    CanonStateNotification::Commit { new } => new,
635                };
636
637                let block_update_start = Instant::now();
638
639                let tip = &new;
640                let bundle_state = tip.execution_outcome().state().state();
641                let tip_timestamp = tip.tip().header().timestamp();
642
643                // Removed transactions are collected here and dropped at the end of the
644                // iteration: deallocating them (input data, signatures, allocator work) is
645                // expensive and there is a block time of slack after the updates are done.
646                let mut removed_txs: Vec<Vec<_>> = Vec::with_capacity(1);
647
648                // 1. Update 2D nonce pool before scan-based maintenance.
649                // This removes mined 2D nonce transactions and promotes newly
650                // unblocked transactions before later pool scans.
651                let nonce_pool_start = Instant::now();
652                removed_txs.push(pool.notify_aa_pool_on_state_updates(bundle_state));
653                metrics.nonce_pool_update_duration_seconds.record(nonce_pool_start.elapsed());
654
655                // 2. Update AMM liquidity cache before revalidation/invalidation scans.
656                let amm_start = Instant::now();
657                amm_cache.on_new_state(tip.execution_outcome());
658                if let Err(err) = amm_cache
659                    .on_new_blocks(tip.blocks_iter().map(|block| block.sealed_header()), pool.client())
660                {
661                    error!(target: "txpool", ?err, "AMM liquidity cache update failed");
662                }
663                metrics.amm_cache_update_duration_seconds.record(amm_start.elapsed());
664
665                // 3. Collect all block-level invalidation events
666                let updates = TempoPoolUpdates::from_chain(tip);
667
668                // Remove expiry tracking for mined transactions.
669                state.untrack_many(tip.transaction_hashes());
670
671                // Evict transactions slightly before they expire to prevent
672                // broadcasting near-expiry txs that peers would reject.
673                let max_expiry = tip_timestamp.saturating_add(EVICTION_BUFFER_SECS);
674
675                // Collect expired transactions from local tracking state. Mined transactions
676                // were untracked above so they cannot be drained here, and hashes that have
677                // since left the pool are no-ops for `remove_transactions`.
678                let expired_txs = state.drain_expired(max_expiry);
679
680                // 4. Evict expired AA transactions
681                let expired_start = Instant::now();
682                if !expired_txs.is_empty() {
683                    let evicted = pool.remove_transactions(expired_txs);
684                    debug!(
685                        target: "txpool",
686                        count = evicted.len(),
687                        tip_timestamp,
688                        "Evicting expired AA transactions (valid_before)"
689                    );
690                    metrics.expired_transactions_evicted.increment(evicted.len() as u64);
691                    removed_txs.push(evicted);
692                }
693                metrics.expired_eviction_duration_seconds.record(expired_start.elapsed());
694
695                let mut all_txs: Option<AllPoolTransactions<TempoPooledTransaction>> = None;
696                let mut removed_this_iteration = B256Set::default();
697
698                // 5. Handle fee token pause/unpause events
699                let pause_start = Instant::now();
700
701                // Collect pause tokens that need pool scanning.
702                // For pause events, we need to scan the pool. For unpause events, we
703                // only need to check the paused_pool (O(1) lookup by token).
704                let pause_tokens: Vec<Address> = updates
705                    .pause_events
706                    .iter()
707                    .filter_map(|(token, is_paused)| is_paused.then_some(*token))
708                    .collect();
709
710                // Process pause events: fetch pool transactions once for all pause tokens.
711                // This avoids the O(pause_events * pool_size) cost of fetching per event.
712                if !pause_tokens.is_empty() {
713                    // Group transactions by effective fee token for efficient batch processing.
714                    // This single pass over all transactions handles all pause events.
715                    let mut by_token = {
716                        let all_txs = all_txs.get_or_insert_with(|| pool.all_transactions());
717                        all_txs.iter()
718                            .filter(|tx| !removed_this_iteration.contains(tx.hash()))
719                            .fold(
720                                AddressMap::<Vec<TxHash>>::default(),
721                                |mut by_token, tx| {
722                                    by_token
723                                        .entry(tx.transaction.effective_fee_token())
724                                        .or_default()
725                                        .push(*tx.hash());
726                                    by_token
727                                },
728                            )
729                    };
730
731                    // Process each pause token
732                    for token in pause_tokens {
733                        let Some(hashes_to_pause) = by_token.remove(&token) else {
734                            // No transactions use this fee token - skip
735                            continue;
736                        };
737
738                        let removed_txs = pool.remove_transactions(hashes_to_pause);
739                        let count = removed_txs.len();
740
741                        if count > 0 {
742                            // Clean up expiry tracking for paused txs
743                            for tx in &removed_txs {
744                                state.untrack(tx.hash());
745                                removed_this_iteration.insert(*tx.hash());
746                            }
747
748                            let entries: Vec<_> = removed_txs
749                                .into_iter()
750                                .map(|tx| {
751                                    let valid_before = tx
752                                        .transaction
753                                        .inner()
754                                        .as_aa()
755                                        .and_then(|aa| aa.tx().valid_before.map(|value| value.get()));
756                                    PausedEntry { tx, valid_before }
757                                })
758                                .collect();
759
760                            let cap_evicted = state.paused_pool.insert_batch(token, entries);
761                            metrics.transactions_paused.increment(count as u64);
762                            if cap_evicted > 0 {
763                                metrics.paused_pool_cap_evicted.increment(cap_evicted as u64);
764                                debug!(
765                                    target: "txpool",
766                                    cap_evicted,
767                                    "Evicted oldest paused transactions due to global cap"
768                                );
769                            }
770                            debug!(
771                                target: "txpool",
772                                %token,
773                                count,
774                                "Moved transactions to paused pool (fee token paused)"
775                            );
776                        }
777                    }
778                }
779
780                // Process unpause events: O(1) lookup per token in paused_pool
781                for (token, is_paused) in &updates.pause_events {
782                    if *is_paused {
783                        continue; // Already handled above
784                    }
785
786                    // Unpause: drain from paused pool and re-add to main pool
787                    let paused_entries = state.paused_pool.drain_token(token);
788                    if !paused_entries.is_empty() {
789                        let count = paused_entries.len();
790                        metrics.transactions_unpaused.increment(count as u64);
791                        let pool_clone = pool.clone();
792                        let token = *token;
793                        tokio::spawn(async move {
794                            let txs: Vec<_> = paused_entries
795                                .into_iter()
796                                .map(|e| e.tx.transaction.clone())
797                                .collect();
798
799                            let results = pool_clone
800                                .add_external_transactions(txs)
801                                .await;
802
803                            let success = results.iter().filter(|r| r.is_ok()).count();
804                            debug!(
805                                target: "txpool",
806                                %token,
807                                total = count,
808                                success,
809                                "Restored transactions from paused pool (fee token unpaused)"
810                            );
811                        });
812                    }
813                }
814
815                // 6. Evict expired transactions from the paused pool
816                let paused_expired = state.paused_pool.evict_expired(tip_timestamp);
817                let paused_timed_out = state.paused_pool.evict_timed_out();
818                let total_paused_evicted = paused_expired + paused_timed_out;
819                if total_paused_evicted > 0 {
820                    debug!(
821                        target: "txpool",
822                        count = total_paused_evicted,
823                        tip_timestamp,
824                        "Evicted expired transactions from paused pool"
825                    );
826                }
827
828                // 7. Evict hard keychain invalidations from paused pool
829                // Ignore spending_limit_spends here: AccessKeySpend only proves partial limit consumption, and paused txs are fully revalidated on unpause.
830                if !updates.revoked_keys.is_empty()
831                    || !updates.key_authorization_target_changes.is_empty()
832                    || !updates.spending_limit_changes.is_empty()
833                    || !updates.key_authorization_witness_burns.is_empty()
834                {
835                    state.paused_pool.evict_invalidated(
836                        &updates.revoked_keys,
837                        &updates.key_authorization_target_changes,
838                        &updates.spending_limit_changes,
839                        &updates.key_authorization_witness_burns,
840                    );
841                }
842                metrics.pause_events_duration_seconds.record(pause_start.elapsed());
843
844                // 8. Handle potentially invalidating updates
845                // When a cached value of a token (transfer policy or quote token) changes,
846                // pending transactions using that token may become invalid. We need to remove them
847                // and re-add so they go through full validation against the updated state.
848                for (updated, counter, reason) in [
849                    (
850                        &updates.transfer_policy_updates,
851                        &metrics.transfer_policy_revalidated,
852                        "transfer policy update",
853                    ),
854                    (
855                        &updates.quote_token_updates,
856                        &metrics.quote_token_revalidated,
857                        "quote token update",
858                    ),
859                ] {
860                    if updated.is_empty() {
861                        continue;
862                    }
863
864                    let hashes: Vec<TxHash> = {
865                        let all_txs = all_txs.get_or_insert_with(|| pool.all_transactions());
866                        all_txs
867                            .iter()
868                            .filter(|tx| !removed_this_iteration.contains(tx.hash()))
869                            .filter(|tx| {
870                                tx.transaction
871                                    .resolved_fee_token()
872                                    .is_some_and(|t| updated.contains(&t))
873                            })
874                            .map(|tx| *tx.hash())
875                            .collect()
876                    };
877                    if !hashes.is_empty() {
878                        let removed_txs = pool.remove_transactions(hashes);
879                        let count = removed_txs.len();
880
881                        for tx in &removed_txs {
882                            state.untrack(tx.hash());
883                            removed_this_iteration.insert(*tx.hash());
884                        }
885
886                        counter.increment(count as u64);
887
888                        let pool_clone = pool.clone();
889                        tokio::spawn(async move {
890                            let txs: Vec<_> = removed_txs
891                                .into_iter()
892                                .map(|tx| (tx.origin, tx.transaction.clone()))
893                                .collect();
894
895                            let results = pool_clone.add_transactions_with_origins(txs).await;
896                            let success = results.iter().filter(|r| r.is_ok()).count();
897                            debug!(
898                                target: "txpool",
899                                total = count,
900                                success,
901                                reason,
902                                "Re-validated transactions"
903                            );
904                        });
905                    }
906                }
907
908                // 9. Evict invalidated transactions in a single pool scan
909                // This checks revoked keys, spending limit changes, validator token changes,
910                // blacklist additions, and whitelist removals together to avoid scanning
911                // all transactions multiple times per block.
912                if updates.has_invalidation_events() {
913                    let invalidation_start = Instant::now();
914                    debug!(
915                        target: "txpool",
916                        revoked_keys = updates.revoked_keys.len(),
917                        key_authorization_target_changes =
918                            updates.key_authorization_target_changes.len(),
919                        spending_limit_changes = updates.spending_limit_changes.len(),
920                        spending_limit_spends = updates.spending_limit_spends.len(),
921                        validator_token_changes = updates.validator_token_changes.len(),
922                        user_token_changes = updates.user_token_changes.len(),
923                        blacklist_additions = updates.blacklist_additions.len(),
924                        whitelist_removals = updates.whitelist_removals.len(),
925                        "Processing transaction invalidation events"
926                    );
927                    let evicted = {
928                        let all_txs = all_txs.get_or_insert_with(|| pool.all_transactions());
929                        pool.evict_invalidated_transactions_from(
930                            &updates,
931                            all_txs
932                                .iter()
933                                .filter(|tx| !removed_this_iteration.contains(tx.hash())),
934                        )
935                    };
936                    for tx in &evicted {
937                        state.untrack(tx.hash());
938                    }
939                    metrics.transactions_invalidated.increment(evicted.len() as u64);
940                    removed_txs.push(evicted);
941                    metrics
942                        .invalidation_eviction_duration_seconds
943                        .record(invalidation_start.elapsed());
944                }
945
946                // 10. Evict stale pending transactions (must happen after AA pool promotions in step 1)
947                // Only runs once per interval (~30 min) to avoid overhead on every block.
948                // Transactions pending across two consecutive snapshots are considered stale.
949                if state.pending_staleness.should_check(tip_timestamp) {
950                    let current_pending: B256Set =
951                        pool.pending_transactions().iter().map(|tx| *tx.hash()).collect();
952                    let stale_to_evict =
953                        state.pending_staleness.check_and_update(current_pending, tip_timestamp);
954
955                    if !stale_to_evict.is_empty() {
956                        debug!(
957                            target: "txpool",
958                            count = stale_to_evict.len(),
959                            tip_timestamp,
960                            "Evicting stale pending transactions"
961                        );
962                        // Clean up expiry tracking for stale txs to prevent orphaned entries
963                        for hash in &stale_to_evict {
964                            state.untrack(hash);
965                        }
966                        removed_txs.push(pool.remove_transactions(stale_to_evict));
967                    }
968                }
969
970                // Record total block update duration
971                metrics.block_update_duration_seconds.record(block_update_start.elapsed());
972
973                // Deallocating removed transactions is expensive, so do it after all updates are done.
974                drop(removed_txs);
975            }
976        }
977    }
978}
979
980#[cfg(test)]
981mod tests {
982    use super::*;
983    use crate::test_utils::TxBuilder;
984    use alloy_primitives::{Address, B256, TxHash};
985    use reth_primitives_traits::RecoveredBlock;
986    use std::{collections::HashSet, sync::Arc};
987    use tempo_primitives::{Block, BlockBody, TempoHeader, TempoTxEnvelope};
988
mod pending_staleness_tracker_tests {
    use super::*;

    /// The first snapshot has no predecessor to compare against, so it must report
    /// nothing as stale while still recording the pending hash for the next round.
    #[test]
    fn no_eviction_on_first_snapshot() {
        let only_tx = TxHash::random();
        let mut tracker = PendingStalenessTracker::new(100);

        let evicted = tracker.check_and_update(std::iter::once(only_tx).collect(), 100);

        assert!(evicted.is_empty());
        assert!(tracker.previous_pending.contains(&only_tx));
    }

    /// A hash that is pending in two consecutive snapshots is returned as stale and
    /// is absent from the retained snapshot; a hash seen for the first time is kept.
    #[test]
    fn evicts_transactions_present_in_both_snapshots() {
        let lingering = TxHash::random();
        let newcomer = TxHash::random();
        let mut tracker = PendingStalenessTracker::new(100);

        // t=0: only the lingering hash is pending.
        tracker.check_and_update(std::iter::once(lingering).collect(), 0);

        // t=100: the lingering hash is still pending and a newcomer has joined it.
        let evicted = tracker.check_and_update([lingering, newcomer].into_iter().collect(), 100);

        // Exactly the hash seen in both snapshots is reported.
        assert_eq!(evicted.len(), 1);
        assert!(evicted.contains(&lingering));

        // The retained snapshot holds the newcomer but not the evicted hash.
        let retained = &tracker.previous_pending;
        assert!(retained.contains(&newcomer));
        assert!(!retained.contains(&lingering));
    }

    /// `should_check` is true before any snapshot exists, false while the interval
    /// has not yet elapsed, and true again once it has.
    #[test]
    fn should_check_returns_false_before_interval_elapsed() {
        let pending_tx = TxHash::random();
        let mut tracker = PendingStalenessTracker::new(100);

        // No snapshot yet: a check is due, and taking it records t=0.
        assert!(tracker.should_check(0));
        tracker.check_and_update(std::iter::once(pending_tx).collect(), 0);

        // t=50 is inside the interval, and the recorded snapshot time is untouched.
        assert!(!tracker.should_check(50));
        assert_eq!(tracker.last_snapshot_time, Some(0));

        // t=100 reaches the interval boundary.
        assert!(tracker.should_check(100));
    }

    /// A hash that drops out of the pending set between snapshots is forgotten,
    /// while the one that stayed pending across both is reported as stale.
    #[test]
    fn removes_transactions_no_longer_pending_from_snapshot() {
        let still_pending = TxHash::random();
        let gone = TxHash::random();
        let mut tracker = PendingStalenessTracker::new(100);

        // t=0: both hashes are pending and both get recorded.
        tracker.check_and_update([still_pending, gone].into_iter().collect(), 0);
        assert_eq!(tracker.previous_pending.len(), 2);

        // t=100: only one of them is still pending, so only it is stale.
        let evicted = tracker.check_and_update(std::iter::once(still_pending).collect(), 100);
        assert_eq!(evicted.len(), 1);
        assert!(evicted.contains(&still_pending));

        // One hash was evicted and the other left on its own: nothing remains tracked.
        assert!(tracker.previous_pending.is_empty());
    }
}
1062
/// Tracking the same transaction twice must not duplicate it: two distinct
/// transactions with the same `valid_before` end up in one bucket of size two.
#[test]
fn track_groups_duplicate_expiries() {
    let first = TxBuilder::aa(Address::random()).nonce(1).valid_before(1000).build();
    let second = TxBuilder::aa(Address::random()).nonce(2).valid_before(1000).build();

    let mut state = TempoPoolState::default();
    // `first` is tracked twice on purpose to exercise the duplicate path.
    for tx in [&first, &second, &first] {
        state.track(tx);
    }

    // Forward map: one bucket at 1000 holding each hash exactly once.
    let bucket = state.expiry_map.get(&1000).unwrap();
    assert_eq!(bucket.len(), 2);
    for tx in [&first, &second] {
        assert!(bucket.contains(tx.hash()));
    }

    // Reverse map: each hash points back at its expiry.
    for tx in [&first, &second] {
        assert_eq!(state.tx_to_expiry.get(tx.hash()), Some(&1000));
    }
}
1086
/// `untrack` removes a hash from both tracking maps, tolerates hashes it has
/// never seen, and drops the expiry bucket once its last hash is removed.
#[test]
fn untrack_removes_hash_and_empty_bucket() {
    let mut state = TempoPoolState::default();
    let mined_first = TxHash::random();
    let mined_last = TxHash::random();
    let never_tracked = TxHash::random();

    // Both tracked hashes share the same valid_before bucket.
    for hash in [mined_first, mined_last] {
        insert_tracked_hash(&mut state, hash, 1000);
    }

    // Untrack one tracked hash and one hash that was never tracked.
    for hash in [&mined_first, &never_tracked] {
        state.untrack(hash);
    }

    // The first hash is gone from both maps; the bucket still holds the other one.
    assert!(!state.tx_to_expiry.contains_key(&mined_first));
    let bucket = state.expiry_map.get(&1000).unwrap();
    assert_eq!(bucket.len(), 1);
    assert!(bucket.contains(&mined_last));

    // Removing the last hash must also remove the now-empty bucket.
    state.untrack(&mined_last);
    assert!(!state.tx_to_expiry.contains_key(&mined_last));
    assert!(!state.expiry_map.contains_key(&1000));
}
1113
/// `untrack_many` removes every listed hash across several expiry buckets in one
/// call, skips unknown hashes, and leaves unlisted hashes tracked.
#[test]
fn untrack_many_removes_hashes_by_expiry_bucket() {
    let early_a = TxHash::random();
    let early_b = TxHash::random();
    let survivor = TxHash::random();
    let late = TxHash::random();
    let unknown = TxHash::random();

    // Three hashes share the 1000 bucket; one sits alone in the 2000 bucket.
    let mut state = TempoPoolState::default();
    for (hash, expiry) in [(early_a, 1000), (early_b, 1000), (survivor, 1000), (late, 2000)] {
        insert_tracked_hash(&mut state, hash, expiry);
    }

    state.untrack_many([&early_a, &early_b, &unknown, &late]);

    // Reverse map: listed hashes are gone, the unlisted one keeps its expiry.
    for removed in [&early_a, &early_b, &late] {
        assert!(!state.tx_to_expiry.contains_key(removed));
    }
    assert_eq!(state.tx_to_expiry.get(&survivor), Some(&1000));

    // Forward map: the 1000 bucket shrank to the survivor, the 2000 bucket vanished.
    let bucket = state.expiry_map.get(&1000).unwrap();
    assert_eq!(bucket.len(), 1);
    assert!(bucket.contains(&survivor));
    assert!(!state.expiry_map.contains_key(&2000));
}
1140
/// `drain_expired(2000)` returns the hashes of the 1000 and 2000 buckets, removes
/// them from both tracking maps, and leaves the 3000 bucket untouched.
#[test]
fn drain_expired_removes_expired_buckets_and_returns_hashes() {
    let due_a = TxHash::random();
    let due_b = TxHash::random();
    let due_at_cutoff = TxHash::random();
    let not_due = TxHash::random();

    let mut state = TempoPoolState::default();
    for (hash, expiry) in [(due_a, 1000), (due_b, 1000), (due_at_cutoff, 2000), (not_due, 3000)] {
        insert_tracked_hash(&mut state, hash, expiry);
    }

    let drained = state.drain_expired(2000);

    // Everything at or below the cutoff is handed back to the caller.
    assert_hashes_eq(drained, &[due_a, due_b, due_at_cutoff]);

    // Forward map: drained buckets are gone, the later bucket is intact.
    assert!(!state.expiry_map.contains_key(&1000));
    assert!(!state.expiry_map.contains_key(&2000));
    assert!(state.expiry_map[&3000].contains(&not_due));

    // Reverse map: drained hashes are forgotten, the remaining one keeps its expiry.
    for hash in [&due_a, &due_b, &due_at_cutoff] {
        assert!(!state.tx_to_expiry.contains_key(hash));
    }
    assert_eq!(state.tx_to_expiry.get(&not_due), Some(&3000));
}
1165
1166    fn insert_tracked_hash(state: &mut TempoPoolState, hash: TxHash, expiry: u64) {
1167        state.expiry_map.entry(expiry).or_default().insert(hash);
1168        state.tx_to_expiry.insert(hash, expiry);
1169    }
1170
1171    fn assert_hashes_eq(actual: Vec<TxHash>, expected: &[TxHash]) {
1172        assert_eq!(actual.len(), expected.len());
1173        let actual: HashSet<TxHash> = actual.into_iter().collect();
1174        let expected: HashSet<TxHash> = expected.iter().copied().collect();
1175        assert_eq!(actual, expected);
1176    }
1177
1178    mod narrow_event_decoding {
1179        use super::*;
1180        use alloy_primitives::U256;
1181
// Asserts that `$enum_ty::decode` turns `$log` into `$enum_ty::$variant(payload)` and
// that `payload` equals what the generated decoder for `$event_ty` produces for the
// same log. Panics with "unexpected decoded event" on any other decode result.
macro_rules! assert_decodes_like_generated {
    ($enum_ty:ident, $variant:ident, $event_ty:ty, $log:expr) => {{
        // Run the generated decoder first so a malformed log fails there.
        let reference = generated_decode::<$event_ty>(&$log);
        let Some($enum_ty::$variant(decoded)) = $enum_ty::decode(&$log) else {
            panic!("unexpected decoded event");
        };
        assert_eq!(decoded, reference)
    }};
}
1191
// Variant of the check above for payload-free variants: the generated decoder for
// `$event_ty` must accept `$log`, and `$enum_ty::decode` must yield the unit variant
// `$enum_ty::$variant`. Panics with "unexpected decoded event" otherwise.
macro_rules! assert_decodes_unit_like_generated {
    ($enum_ty:ident, $variant:ident, $event_ty:ty, $log:expr) => {{
        // Only the success of the generated decode matters here; its value is unused.
        let _generated = generated_decode::<$event_ty>(&$log);
        match $enum_ty::decode(&$log) {
            Some($enum_ty::$variant) => {}
            _ => panic!("unexpected decoded event"),
        }
    }};
}
1201
1202        fn event_log<T>(address: Address, event: T) -> Log
1203        where
1204            T: SolEvent,
1205            for<'a> &'a T: Into<alloy_primitives::LogData>,
1206        {
1207            Log::new_from_event_unchecked(address, event).reserialize()
1208        }
1209
1210        fn generated_decode<T: SolEvent>(log: &Log) -> T {
1211            T::decode_log(log)
1212                .expect("generated event decode should succeed")
1213                .data
1214        }
1215
/// Each `IAccountKeychain` event built here must decode through
/// `AccountKeychainPoolEvent::decode` to the same payload the generated decoder yields.
#[test]
fn account_keychain_decode_matches_generated_event_decoders() {
    // KeyAuthorized
    let key_authorized = event_log(
        ACCOUNT_KEYCHAIN_ADDRESS,
        IAccountKeychain::KeyAuthorized {
            account: Address::random(),
            publicKey: Address::random(),
            signatureType: 0,
            expiry: u64::MAX,
        },
    );
    assert_decodes_like_generated!(
        AccountKeychainPoolEvent,
        KeyAuthorized,
        IAccountKeychain::KeyAuthorized,
        key_authorized
    );

    // AdminKeyAuthorized
    let admin_key_authorized = event_log(
        ACCOUNT_KEYCHAIN_ADDRESS,
        IAccountKeychain::AdminKeyAuthorized {
            account: Address::random(),
            publicKey: Address::random(),
        },
    );
    assert_decodes_like_generated!(
        AccountKeychainPoolEvent,
        AdminKeyAuthorized,
        IAccountKeychain::AdminKeyAuthorized,
        admin_key_authorized
    );

    // KeyRevoked
    let key_revoked = event_log(
        ACCOUNT_KEYCHAIN_ADDRESS,
        IAccountKeychain::KeyRevoked {
            account: Address::random(),
            publicKey: Address::random(),
        },
    );
    assert_decodes_like_generated!(
        AccountKeychainPoolEvent,
        KeyRevoked,
        IAccountKeychain::KeyRevoked,
        key_revoked
    );

    // SpendingLimitUpdated
    let spending_limit_updated = event_log(
        ACCOUNT_KEYCHAIN_ADDRESS,
        IAccountKeychain::SpendingLimitUpdated {
            account: Address::random(),
            publicKey: Address::random(),
            token: Address::random(),
            newLimit: U256::from(12_345),
        },
    );
    assert_decodes_like_generated!(
        AccountKeychainPoolEvent,
        SpendingLimitUpdated,
        IAccountKeychain::SpendingLimitUpdated,
        spending_limit_updated
    );

    // AccessKeySpend
    let access_key_spend = event_log(
        ACCOUNT_KEYCHAIN_ADDRESS,
        IAccountKeychain::AccessKeySpend {
            account: Address::random(),
            publicKey: Address::random(),
            token: Address::random(),
            amount: U256::from(25),
            remainingLimit: U256::from(75),
        },
    );
    assert_decodes_like_generated!(
        AccountKeychainPoolEvent,
        AccessKeySpend,
        IAccountKeychain::AccessKeySpend,
        access_key_spend
    );

    // KeyAuthorizationWitnessBurned
    let witness_burned = event_log(
        ACCOUNT_KEYCHAIN_ADDRESS,
        IAccountKeychain::KeyAuthorizationWitnessBurned {
            account: Address::random(),
            witness: B256::random(),
        },
    );
    assert_decodes_like_generated!(
        AccountKeychainPoolEvent,
        KeyAuthorizationWitnessBurned,
        IAccountKeychain::KeyAuthorizationWitnessBurned,
        witness_burned
    );
}
1309
/// Both `IFeeManager` events built here must decode through
/// `FeeManagerPoolEvent::decode` to the same payload the generated decoder yields.
#[test]
fn fee_manager_decode_matches_generated_event_decoders() {
    // ValidatorTokenSet
    let validator_token_set = event_log(
        TIP_FEE_MANAGER_ADDRESS,
        IFeeManager::ValidatorTokenSet {
            validator: Address::random(),
            token: Address::random(),
        },
    );
    assert_decodes_like_generated!(
        FeeManagerPoolEvent,
        ValidatorTokenSet,
        IFeeManager::ValidatorTokenSet,
        validator_token_set
    );

    // UserTokenSet
    let user_token_set = event_log(
        TIP_FEE_MANAGER_ADDRESS,
        IFeeManager::UserTokenSet {
            user: Address::random(),
            token: Address::random(),
        },
    );
    assert_decodes_like_generated!(
        FeeManagerPoolEvent,
        UserTokenSet,
        IFeeManager::UserTokenSet,
        user_token_set
    );
}
1340
/// Both `ITIP403Registry` events built here must decode through
/// `Tip403PoolEvent::decode` to the same payload the generated decoder yields.
#[test]
fn tip403_decode_matches_generated_event_decoders() {
    // BlacklistUpdated, with the account being restricted.
    let blacklist_updated = event_log(
        TIP403_REGISTRY_ADDRESS,
        ITIP403Registry::BlacklistUpdated {
            policyId: 7,
            updater: Address::random(),
            account: Address::random(),
            restricted: true,
        },
    );
    assert_decodes_like_generated!(
        Tip403PoolEvent,
        BlacklistUpdated,
        ITIP403Registry::BlacklistUpdated,
        blacklist_updated
    );

    // WhitelistUpdated, with the account no longer allowed.
    let whitelist_updated = event_log(
        TIP403_REGISTRY_ADDRESS,
        ITIP403Registry::WhitelistUpdated {
            policyId: 9,
            updater: Address::random(),
            account: Address::random(),
            allowed: false,
        },
    );
    assert_decodes_like_generated!(
        Tip403PoolEvent,
        WhitelistUpdated,
        ITIP403Registry::WhitelistUpdated,
        whitelist_updated
    );
}
1375
/// Exercises `Tip20PoolEvent::decode` against the generated `ITIP20` event decoders for
/// `PauseStateUpdate`, `TransferPolicyUpdate`, `QuoteTokenUpdate` and `Transfer` logs
/// emitted from the PATH_USD token address.
#[test]
fn tip20_decode_matches_generated_event_decoders() {
    let token = tempo_precompiles::PATH_USD_ADDRESS;

    // Pause toggle.
    {
        let log = event_log(
            token,
            ITIP20::PauseStateUpdate {
                updater: Address::random(),
                isPaused: true,
            },
        );
        assert_decodes_like_generated!(
            Tip20PoolEvent,
            PauseStateUpdate,
            ITIP20::PauseStateUpdate,
            log
        );
    }

    // Transfer policy change, checked with the unit-variant flavour of the macro.
    {
        let log = event_log(
            token,
            ITIP20::TransferPolicyUpdate {
                updater: Address::random(),
                newPolicyId: 11,
            },
        );
        assert_decodes_unit_like_generated!(
            Tip20PoolEvent,
            TransferPolicyUpdate,
            ITIP20::TransferPolicyUpdate,
            log
        );
    }

    // Quote token change, checked with the unit-variant flavour of the macro.
    {
        let log = event_log(
            token,
            ITIP20::QuoteTokenUpdate {
                updater: Address::random(),
                newQuoteToken: Address::random(),
            },
        );
        assert_decodes_unit_like_generated!(
            Tip20PoolEvent,
            QuoteTokenUpdate,
            ITIP20::QuoteTokenUpdate,
            log
        );
    }

    // Plain transfer.
    {
        let log = event_log(
            token,
            ITIP20::Transfer {
                from: Address::random(),
                to: Address::random(),
                amount: U256::from(42),
            },
        );
        // `Transfer` decoding is specialized to read only the indexed `from` topic, so
        // only that field is compared against what a full event decode produces.
        let expected = generated_decode::<ITIP20::Transfer>(&log);
        let Some(Tip20PoolEvent::Transfer { from }) = Tip20PoolEvent::decode(&log) else {
            panic!("unexpected decoded event");
        };
        assert_eq!(from, expected.from);
    }
}
1437    }
1438
1439    fn create_test_chain(
1440        blocks: Vec<reth_primitives_traits::RecoveredBlock<Block>>,
1441    ) -> Arc<Chain<TempoPrimitives>> {
1442        create_test_chain_with_receipts(blocks, Vec::new())
1443    }
1444
1445    fn create_test_chain_with_receipts(
1446        blocks: Vec<reth_primitives_traits::RecoveredBlock<Block>>,
1447        receipts: Vec<Vec<tempo_primitives::TempoReceipt>>,
1448    ) -> Arc<Chain<TempoPrimitives>> {
1449        use reth_provider::{Chain, ExecutionOutcome};
1450
1451        Arc::new(Chain::new(
1452            blocks,
1453            ExecutionOutcome {
1454                receipts,
1455                ..Default::default()
1456            },
1457            Default::default(),
1458        ))
1459    }
1460
1461    /// Helper to create a recovered block containing the given transactions.
1462    fn create_block_with_txs(
1463        block_number: u64,
1464        transactions: Vec<TempoTxEnvelope>,
1465        senders: Vec<Address>,
1466    ) -> RecoveredBlock<Block> {
1467        let header = TempoHeader {
1468            inner: alloy_consensus::Header {
1469                number: block_number,
1470                ..Default::default()
1471            },
1472            ..Default::default()
1473        };
1474        let body = BlockBody {
1475            transactions,
1476            ..Default::default()
1477        };
1478        let block = Block::new(header, body);
1479        RecoveredBlock::new_unhashed(block, senders)
1480    }
1481
1482    /// Helper to extract a TempoTxEnvelope from a TempoPooledTransaction.
1483    fn extract_envelope(tx: &crate::transaction::TempoPooledTransaction) -> TempoTxEnvelope {
1484        tx.inner().clone().into_inner()
1485    }
1486
1487    mod from_chain_spending_limit_spends {
1488        use super::*;
1489        use alloy_primitives::{IntoLogData, Log, U256};
1490        use alloy_signer_local::PrivateKeySigner;
1491        use tempo_primitives::{TempoReceipt, TempoTxType};
1492
/// `from_chain` must record spends from `AccessKeySpend` logs, so the token that was
/// actually spent is tracked even when it differs from the mined tx's fee token.
#[test]
fn extracts_access_key_spend_events() {
    let user_address = Address::random();
    let access_key_signer = PrivateKeySigner::random();
    let key_id = access_key_signer.address();
    let fee_token = Address::random();
    let spent_token = Address::random();

    // Mined keychain transaction paying fees in `fee_token`.
    let keychain_tx = TxBuilder::aa(user_address)
        .fee_token(fee_token)
        .build_keychain(user_address, &access_key_signer);
    let envelope = extract_envelope(&keychain_tx);

    // Receipt log reporting a spend of a *different* token than the fee token.
    let spend_event = IAccountKeychain::AccessKeySpend {
        account: user_address,
        publicKey: key_id,
        token: spent_token,
        amount: U256::from(25),
        remainingLimit: U256::from(75),
    };
    let spend_log =
        Log::new_from_event_unchecked(ACCOUNT_KEYCHAIN_ADDRESS, spend_event).reserialize();
    let receipt = TempoReceipt {
        tx_type: TempoTxType::AA,
        success: true,
        cumulative_gas_used: 1,
        logs: vec![spend_log],
    };

    let block = create_block_with_txs(1, vec![envelope], vec![user_address]);
    let chain = create_test_chain_with_receipts(vec![block], vec![vec![receipt]]);

    let updates = TempoPoolUpdates::from_chain(&chain);
    let spends = &updates.spending_limit_spends;

    assert!(
        spends.contains(user_address, key_id, spent_token),
        "Should contain the AccessKeySpend event's (account, key_id, token)"
    );
    assert!(
        !spends.contains(user_address, key_id, fee_token),
        "Should not infer spends from the tx fee token"
    );
    assert_eq!(spends.len(), 1);
}
1545
/// A `KeyAuthorizationWitnessBurned` log in a receipt must show up in
/// `key_authorization_witness_burns` and count as an invalidation event.
#[test]
fn extracts_key_authorization_witness_burned_events() {
    let account = Address::random();
    let witness = B256::random();

    let burn_event = IAccountKeychain::KeyAuthorizationWitnessBurned { account, witness };
    let log = Log::new_from_event_unchecked(ACCOUNT_KEYCHAIN_ADDRESS, burn_event).reserialize();
    let receipt = TempoReceipt {
        tx_type: TempoTxType::AA,
        success: true,
        cumulative_gas_used: 1,
        logs: vec![log],
    };

    // The block body is empty; only the receipt log is relevant here.
    let block = create_block_with_txs(1, vec![], vec![]);
    let chain = create_test_chain_with_receipts(vec![block], vec![vec![receipt]]);

    let updates = TempoPoolUpdates::from_chain(&chain);

    let burn_recorded = updates
        .key_authorization_witness_burns
        .get(&account)
        .is_some_and(|burned| burned.contains(&witness));
    assert!(burn_recorded, "Should contain the burned (account, witness)");
    assert!(updates.has_invalidation_events());
}
1577
/// A mined keychain transaction without any `AccessKeySpend` log must not produce
/// spending limit spends: spends are never inferred from the transaction body.
#[test]
fn ignores_keychain_transactions_without_access_key_spend_logs() {
    let user_address = Address::random();
    let access_key_signer = PrivateKeySigner::random();
    let fee_token = Address::random();

    let envelope = extract_envelope(
        &TxBuilder::aa(user_address)
            .fee_token(fee_token)
            .build_keychain(user_address, &access_key_signer),
    );

    // Chain built without receipts, hence without logs.
    let chain = create_test_chain(vec![create_block_with_txs(
        1,
        vec![envelope],
        vec![user_address],
    )]);

    let updates = TempoPoolUpdates::from_chain(&chain);
    assert!(updates.spending_limit_spends.is_empty());
}
1597
/// An AA transaction built without a keychain signature must leave
/// `spending_limit_spends` empty.
#[test]
fn ignores_non_keychain_aa_transactions() {
    let sender = Address::random();
    let envelope = extract_envelope(&TxBuilder::aa(sender).fee_token(Address::random()).build());

    let chain = create_test_chain(vec![create_block_with_txs(
        1,
        vec![envelope],
        vec![sender],
    )]);

    let updates = TempoPoolUpdates::from_chain(&chain);
    assert!(updates.spending_limit_spends.is_empty());
}
1611
/// A mined EIP-1559 transaction must leave `spending_limit_spends` empty.
#[test]
fn ignores_eip1559_transactions() {
    let sender = Address::random();
    let envelope = extract_envelope(&TxBuilder::eip1559(Address::random()).build_eip1559());

    let chain = create_test_chain(vec![create_block_with_txs(
        1,
        vec![envelope],
        vec![sender],
    )]);

    let updates = TempoPoolUpdates::from_chain(&chain);
    assert!(updates.spending_limit_spends.is_empty());
}
1625
/// `has_invalidation_events` is false for fresh updates and becomes true once a single
/// entry is inserted into `spending_limit_spends`.
#[test]
fn has_invalidation_events_includes_spending_limit_spends() {
    let mut updates = TempoPoolUpdates::new();
    assert!(!updates.has_invalidation_events());

    let account = Address::random();
    let key_id = Address::random();
    let token = Some(Address::random());
    updates.spending_limit_spends.insert(account, key_id, token);

    assert!(updates.has_invalidation_events());
}
1639
/// A TIP20 `Transfer` log must register only the `from` account under the emitting
/// token in `fee_balance_changes`, and must count as an invalidation event.
#[test]
fn extracts_fee_balance_changes_from_tip20_transfer_logs() {
    let fee_token = tempo_precompiles::PATH_USD_ADDRESS;
    let from = Address::random();
    let to = Address::random();
    let amount = U256::from(42_u64);

    let log_data = ITIP20::Transfer { from, to, amount }.into_log_data();
    let topics = log_data.topics().to_vec();
    let data = log_data.data.clone();
    let receipt = TempoReceipt {
        tx_type: TempoTxType::Legacy,
        success: true,
        cumulative_gas_used: 21_000,
        logs: vec![Log::new_unchecked(fee_token, topics, data)],
    };

    let block = create_block_with_txs(1, vec![], vec![]);
    let chain = create_test_chain_with_receipts(vec![block], vec![vec![receipt]]);
    let updates = TempoPoolUpdates::from_chain(&chain);

    let only_sender_marked = updates
        .fee_balance_changes
        .get(&fee_token)
        .is_some_and(|changed| changed.len() == 1 && changed.contains(&from));
    assert!(
        only_sender_marked,
        "TIP20 transfer logs should only mark the debited sender as balance-changed"
    );
    assert!(updates.has_invalidation_events());
}
1669
/// A `TransferPolicyUpdate` log emitted by a TIP20 token must add that token's address
/// to `transfer_policy_updates`.
#[test]
fn extracts_transfer_policy_updates() {
    let fee_token = tempo_precompiles::PATH_USD_ADDRESS;
    let updater = Address::random();
    let new_policy_id = 42u64;

    let event = ITIP20::TransferPolicyUpdate {
        updater,
        newPolicyId: new_policy_id,
    };
    let log_data = event.into_log_data();
    let topics = log_data.topics().to_vec();
    let data = log_data.data.clone();
    let receipt = TempoReceipt {
        tx_type: TempoTxType::Legacy,
        success: true,
        cumulative_gas_used: 21_000,
        logs: vec![Log::new_unchecked(fee_token, topics, data)],
    };

    let block = create_block_with_txs(1, vec![], vec![]);
    let chain = create_test_chain_with_receipts(vec![block], vec![vec![receipt]]);
    let updates = TempoPoolUpdates::from_chain(&chain);

    let tracked = updates.transfer_policy_updates.contains(&fee_token);
    assert!(
        tracked,
        "TransferPolicyUpdate should be tracked by token address"
    );
}
1699
/// Two `TransferPolicyUpdate` events from the same token in one receipt must collapse
/// into a single `transfer_policy_updates` entry, and that entry must be the token.
#[test]
fn transfer_policy_updates_deduplicates_by_token() {
    let fee_token = tempo_precompiles::PATH_USD_ADDRESS;

    // Builds a `TransferPolicyUpdate` log emitted by `fee_token` with a fresh random
    // updater, so the two logs differ in everything except the emitting token.
    let policy_update_log = |new_policy_id: u64| {
        let log_data = ITIP20::TransferPolicyUpdate {
            updater: Address::random(),
            newPolicyId: new_policy_id,
        }
        .into_log_data();
        Log::new_unchecked(fee_token, log_data.topics().to_vec(), log_data.data.clone())
    };

    let receipt = TempoReceipt {
        tx_type: TempoTxType::Legacy,
        success: true,
        cumulative_gas_used: 21_000,
        logs: vec![policy_update_log(1), policy_update_log(2)],
    };

    let block = create_block_with_txs(1, vec![], vec![]);
    let chain = create_test_chain_with_receipts(vec![block], vec![vec![receipt]]);
    let updates = TempoPoolUpdates::from_chain(&chain);

    assert_eq!(
        updates.transfer_policy_updates.len(),
        1,
        "duplicate policy updates for the same token should be deduplicated"
    );
    // A length check alone would also pass if the single entry were some other
    // address, so pin the identity of the surviving entry as well.
    assert!(
        updates.transfer_policy_updates.contains(&fee_token),
        "the deduplicated entry should be the token that emitted the events"
    );
}
1742
/// Inserting two tokens for the same validator must keep a single entry holding the
/// most recently inserted token (last-write-wins).
#[test]
fn validator_token_changes_deduplicates_by_validator() {
    let validator = Address::random();
    let token_a = Address::random();
    let token_b = Address::random();

    let mut updates = TempoPoolUpdates::new();
    for token in [token_a, token_b] {
        updates.validator_token_changes.insert(validator, token);
    }

    let entry_count = updates.validator_token_changes.len();
    assert_eq!(
        entry_count, 1,
        "duplicate validator entries must be deduplicated"
    );

    let recorded = updates.validator_token_changes.get(&validator).copied();
    assert_eq!(
        recorded,
        Some(token_b),
        "last-write-wins: second token should overwrite the first"
    );
}
1765    }
1766}