Skip to main content

tempo_transaction_pool/
paused.rs

1//! Pool for transactions whose fee token is temporarily paused.
2//!
3//! When a TIP20 fee token emits `PauseStateUpdate(isPaused=true)`, transactions
4//! using that fee token are moved here instead of being evicted entirely.
5//! When the token is unpaused, transactions are moved back to the main pool
6//! and re-validated.
7
8use crate::{RevokedKeys, SpendingLimitUpdates, transaction::TempoPooledTransaction};
9use alloy_primitives::{Address, TxHash, map::HashMap};
10use reth_transaction_pool::{PoolTransaction, ValidPoolTransaction};
11use std::{sync::Arc, time::Instant};
12
13/// Duration after which paused transactions are expired and removed.
14/// If a token isn't unpaused within this time, we clear all pending transactions.
15pub const PAUSED_TX_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30 * 60); // 30 minutes
16
17/// Global cap on the total number of paused transactions across all tokens.
18///
19/// Without this cap, an attacker could repeatedly fill the main pool, trigger a pause, and shift
20/// transactions into the paused pool indefinitely. This bounds memory usage regardless of how many
21/// tokens are paused or how frequently pause events occur.
22pub const PAUSED_POOL_GLOBAL_CAP: usize = 10_000;
23
24/// Entry in the paused pool.
25#[derive(Debug, Clone)]
26pub struct PausedEntry {
27    /// The valid pool transaction that was paused (Arc to avoid expensive clones).
28    pub tx: Arc<ValidPoolTransaction<TempoPooledTransaction>>,
29    /// The `valid_before` timestamp, if any (for expiry tracking).
30    pub valid_before: Option<u64>,
31}
32
33/// Metadata for a paused fee token.
34#[derive(Debug, Clone)]
35struct PausedTokenMeta {
36    /// When this token was paused.
37    paused_at: Instant,
38    /// Transactions waiting for this token to be unpaused.
39    entries: Vec<PausedEntry>,
40}
41
42/// Pool for transactions whose fee token is temporarily paused.
43///
44/// Transactions are indexed by fee token address for efficient batch operations.
45/// Since all transactions for a token are paused/unpaused together, we track
46/// the pause timestamp at the token level rather than per-transaction.
47#[derive(Debug, Default)]
48pub struct PausedFeeTokenPool {
49    /// Fee token -> metadata including pause time and entries
50    by_token: HashMap<Address, PausedTokenMeta>,
51}
52
53impl PausedFeeTokenPool {
54    /// Creates a new empty paused pool.
55    pub fn new() -> Self {
56        Self::default()
57    }
58
59    /// Returns the total number of paused transactions across all tokens.
60    pub fn len(&self) -> usize {
61        self.by_token.values().map(|m| m.entries.len()).sum()
62    }
63
64    /// Returns true if there are no paused transactions.
65    pub fn is_empty(&self) -> bool {
66        self.by_token.is_empty()
67    }
68
69    /// Inserts transactions for a fee token into the paused pool.
70    ///
71    /// Takes the full batch at once since all transactions for a token
72    /// are paused together. The pause timestamp is recorded at insertion time.
73    ///
74    /// Enforces [`PAUSED_POOL_GLOBAL_CAP`]: if adding the batch would exceed the cap,
75    /// the oldest-paused tokens are evicted first to make room. If the batch itself
76    /// exceeds the cap, it is truncated.
77    ///
78    /// Returns the number of existing entries that were evicted to make room.
79    pub fn insert_batch(&mut self, fee_token: Address, entries: Vec<PausedEntry>) -> usize {
80        if entries.is_empty() {
81            return 0;
82        }
83
84        let current = self.len();
85        let incoming = entries.len();
86        let available = PAUSED_POOL_GLOBAL_CAP.saturating_sub(current);
87        let mut evicted = 0;
88
89        if incoming > available {
90            let need = incoming - available;
91            evicted = self.evict_oldest(need);
92        }
93
94        let remaining_capacity = PAUSED_POOL_GLOBAL_CAP.saturating_sub(self.len());
95        let to_insert = if incoming > remaining_capacity {
96            entries.into_iter().take(remaining_capacity).collect()
97        } else {
98            entries
99        };
100
101        self.by_token
102            .entry(fee_token)
103            .or_insert_with(|| PausedTokenMeta {
104                paused_at: Instant::now(),
105                entries: Vec::new(),
106            })
107            .entries
108            .extend(to_insert);
109
110        evicted
111    }
112
113    /// Evicts at least `need` entries from the oldest-paused tokens.
114    ///
115    /// Returns the total number of entries evicted.
116    fn evict_oldest(&mut self, need: usize) -> usize {
117        let mut tokens_by_age: Vec<_> = self
118            .by_token
119            .iter()
120            .map(|(addr, meta)| (*addr, meta.paused_at))
121            .collect();
122        tokens_by_age.sort_unstable_by_key(|(_, paused_at)| *paused_at);
123
124        let mut evicted = 0;
125        for (token, _) in tokens_by_age {
126            if evicted >= need {
127                break;
128            }
129            if let Some(meta) = self.by_token.remove(&token) {
130                evicted += meta.entries.len();
131            }
132        }
133        evicted
134    }
135
136    /// Drains all transactions for a given fee token.
137    ///
138    /// Returns the list of paused entries for that token.
139    pub fn drain_token(&mut self, fee_token: &Address) -> Vec<PausedEntry> {
140        self.by_token
141            .remove(fee_token)
142            .map(|m| m.entries)
143            .unwrap_or_default()
144    }
145
146    /// Returns the number of transactions paused for a given fee token.
147    pub fn count_for_token(&self, fee_token: &Address) -> usize {
148        self.by_token.get(fee_token).map_or(0, |m| m.entries.len())
149    }
150
151    /// Returns true if a transaction with the given hash is in the paused pool.
152    pub fn contains(&self, tx_hash: &TxHash) -> bool {
153        self.by_token
154            .values()
155            .any(|m| m.entries.iter().any(|e| e.tx.hash() == tx_hash))
156    }
157
158    /// Evicts expired transactions based on `valid_before` timestamp.
159    ///
160    /// Returns the number of transactions removed.
161    pub fn evict_expired(&mut self, tip_timestamp: u64) -> usize {
162        let mut count = 0;
163        for meta in self.by_token.values_mut() {
164            let before = meta.entries.len();
165            meta.entries
166                .retain(|e| e.valid_before.is_none_or(|vb| vb > tip_timestamp));
167            count += before - meta.entries.len();
168        }
169        // Clean up empty token entries
170        self.by_token.retain(|_, m| !m.entries.is_empty());
171        count
172    }
173
174    /// Evicts all transactions for tokens that have been paused for too long (timeout).
175    ///
176    /// Since all transactions for a token are paused together, we evict the entire
177    /// token's transactions when the token-level timeout expires.
178    ///
179    /// Returns the number of transactions removed.
180    pub fn evict_timed_out(&mut self) -> usize {
181        let now = Instant::now();
182        let mut count = 0;
183        self.by_token.retain(|_, meta| {
184            if now.duration_since(meta.paused_at) >= PAUSED_TX_TIMEOUT {
185                count += meta.entries.len();
186                false
187            } else {
188                true
189            }
190        });
191        count
192    }
193
194    /// Removes transactions matching invalidation criteria from the paused pool.
195    ///
196    /// This handles revoked keys, spending limit updates, and spending limit spends
197    /// in a single pass. The `spending_limit_spends` parameter captures (account, key_id,
198    /// fee_token) combos from keychain txs that were included in the block and decremented
199    /// limits via `verify_and_update_spending()`.
200    /// Uses account-keyed indexes for O(1) account lookup per transaction.
201    /// Returns the number of transactions removed.
202    pub fn evict_invalidated(
203        &mut self,
204        revoked_keys: &RevokedKeys,
205        spending_limit_updates: &SpendingLimitUpdates,
206        spending_limit_spends: &SpendingLimitUpdates,
207    ) -> usize {
208        if revoked_keys.is_empty()
209            && spending_limit_updates.is_empty()
210            && spending_limit_spends.is_empty()
211        {
212            return 0;
213        }
214
215        let mut count = 0;
216        for meta in self.by_token.values_mut() {
217            let before = meta.entries.len();
218            meta.entries.retain(|entry| {
219                let Some(subject) = entry.tx.transaction.keychain_subject() else {
220                    return true;
221                };
222                let matches_limit_update =
223                    subject.matches_spending_limit_update(spending_limit_updates);
224                let matches_limit_spend =
225                    subject.matches_spending_limit_update(spending_limit_spends);
226                let sender_paid = if matches_limit_update || matches_limit_spend {
227                    let sender = *entry.tx.transaction.sender_ref();
228                    entry
229                        .tx
230                        .transaction
231                        .inner()
232                        .fee_payer(sender)
233                        .map_or(true, |fee_payer| fee_payer == sender)
234                } else {
235                    false
236                };
237
238                let invalidated = subject.matches_revoked(revoked_keys)
239                    || (sender_paid && (matches_limit_update || matches_limit_spend));
240
241                !invalidated
242            });
243            count += before - meta.entries.len();
244        }
245        // Clean up empty token entries
246        self.by_token.retain(|_, m| !m.entries.is_empty());
247        count
248    }
249
250    /// Returns an iterator over all paused entries across all tokens.
251    pub fn all_entries(&self) -> impl Iterator<Item = &PausedEntry> {
252        self.by_token.values().flat_map(|m| &m.entries)
253    }
254}
255
256#[cfg(test)]
257mod tests {
258    use super::*;
259    use crate::test_utils::{TxBuilder, wrap_valid_tx};
260    use alloy_signer::SignerSync;
261    use alloy_signer_local::PrivateKeySigner;
262    use reth_primitives_traits::Recovered;
263    use reth_transaction_pool::TransactionOrigin;
264    use tempo_primitives::{TempoTxEnvelope, transaction::tt_signed::AASigned};
265
266    fn create_valid_tx(sender: Address) -> Arc<ValidPoolTransaction<TempoPooledTransaction>> {
267        let pooled = TxBuilder::aa(sender).build();
268        Arc::new(wrap_valid_tx(pooled, TransactionOrigin::External))
269    }
270
271    fn create_valid_keychain_tx(
272        sender: Address,
273        fee_token: Address,
274        sponsored: bool,
275    ) -> Arc<ValidPoolTransaction<TempoPooledTransaction>> {
276        let access_key_signer = PrivateKeySigner::random();
277        let pooled = TxBuilder::aa(sender)
278            .fee_token(fee_token)
279            .build_keychain(sender, &access_key_signer);
280
281        let pooled = if sponsored {
282            let sponsor = PrivateKeySigner::random();
283            let aa = pooled
284                .inner()
285                .as_aa()
286                .expect("builder should produce AA tx");
287            let mut tx = aa.tx().clone();
288            tx.fee_payer_signature = Some(alloy_primitives::Signature::new(
289                alloy_primitives::U256::ZERO,
290                alloy_primitives::U256::ZERO,
291                false,
292            ));
293            let fee_payer_hash = tx.fee_payer_signature_hash(sender);
294            tx.fee_payer_signature = Some(
295                sponsor
296                    .sign_hash_sync(&fee_payer_hash)
297                    .expect("sponsor signing should succeed"),
298            );
299
300            let aa_signed = AASigned::new_unhashed(tx, aa.signature().clone());
301            let envelope: TempoTxEnvelope = aa_signed.into();
302            TempoPooledTransaction::new(Recovered::new_unchecked(envelope, sender))
303        } else {
304            pooled
305        };
306
307        Arc::new(wrap_valid_tx(pooled, TransactionOrigin::External))
308    }
309
310    #[test]
311    fn test_insert_and_drain() {
312        let mut pool = PausedFeeTokenPool::new();
313        let fee_token = Address::random();
314
315        let entries: Vec<_> = (0..3)
316            .map(|_| PausedEntry {
317                tx: create_valid_tx(Address::random()),
318                valid_before: None,
319            })
320            .collect();
321
322        assert!(pool.is_empty());
323        pool.insert_batch(fee_token, entries);
324
325        assert_eq!(pool.len(), 3);
326        assert_eq!(pool.count_for_token(&fee_token), 3);
327
328        let drained = pool.drain_token(&fee_token);
329        assert_eq!(drained.len(), 3);
330        assert!(pool.is_empty());
331    }
332
333    #[test]
334    fn test_evict_expired() {
335        let mut pool = PausedFeeTokenPool::new();
336        let fee_token = Address::random();
337
338        let entries = vec![
339            PausedEntry {
340                tx: create_valid_tx(Address::random()),
341                valid_before: Some(100), // Will expire
342            },
343            PausedEntry {
344                tx: create_valid_tx(Address::random()),
345                valid_before: Some(200), // Won't expire
346            },
347            PausedEntry {
348                tx: create_valid_tx(Address::random()),
349                valid_before: None, // No expiry
350            },
351        ];
352
353        pool.insert_batch(fee_token, entries);
354        assert_eq!(pool.len(), 3);
355
356        let evicted = pool.evict_expired(150);
357        assert_eq!(evicted, 1);
358        assert_eq!(pool.len(), 2);
359    }
360
361    #[test]
362    fn test_global_cap_evicts_oldest() {
363        let mut pool = PausedFeeTokenPool::new();
364
365        let token_a = Address::random();
366        let token_b = Address::random();
367
368        let make_entries = |n: usize| -> Vec<PausedEntry> {
369            (0..n)
370                .map(|_| PausedEntry {
371                    tx: create_valid_tx(Address::random()),
372                    valid_before: None,
373                })
374                .collect()
375        };
376
377        // Fill to cap
378        let evicted = pool.insert_batch(token_a, make_entries(PAUSED_POOL_GLOBAL_CAP));
379        assert_eq!(evicted, 0);
380        assert_eq!(pool.len(), PAUSED_POOL_GLOBAL_CAP);
381
382        // Inserting more should evict token_a (oldest) to make room
383        let evicted = pool.insert_batch(token_b, make_entries(100));
384        assert!(evicted > 0);
385        assert!(pool.len() <= PAUSED_POOL_GLOBAL_CAP);
386        assert_eq!(pool.count_for_token(&token_b), 100);
387    }
388
389    #[test]
390    fn test_global_cap_truncates_oversized_batch() {
391        let mut pool = PausedFeeTokenPool::new();
392        let token = Address::random();
393
394        let entries: Vec<_> = (0..PAUSED_POOL_GLOBAL_CAP + 500)
395            .map(|_| PausedEntry {
396                tx: create_valid_tx(Address::random()),
397                valid_before: None,
398            })
399            .collect();
400
401        let evicted = pool.insert_batch(token, entries);
402        assert_eq!(evicted, 0);
403        assert_eq!(pool.len(), PAUSED_POOL_GLOBAL_CAP);
404    }
405
406    #[test]
407    fn test_evict_invalidated_with_spending_limit_spends() {
408        let mut pool = PausedFeeTokenPool::new();
409        let user_address = Address::random();
410        let fee_token = Address::random();
411
412        // Create a keychain-signed tx using the builder
413        let access_key_signer = alloy_signer_local::PrivateKeySigner::random();
414        let key_id = alloy_signer::Signer::address(&access_key_signer);
415        let tx = TxBuilder::aa(user_address)
416            .fee_token(fee_token)
417            .build_keychain(user_address, &access_key_signer);
418        let tx = Arc::new(wrap_valid_tx(
419            tx,
420            reth_transaction_pool::TransactionOrigin::External,
421        ));
422
423        // Also add a non-keychain tx that should NOT be evicted
424        let other_tx = create_valid_tx(Address::random());
425
426        pool.insert_batch(
427            fee_token,
428            vec![
429                PausedEntry {
430                    tx,
431                    valid_before: None,
432                },
433                PausedEntry {
434                    tx: other_tx,
435                    valid_before: None,
436                },
437            ],
438        );
439        assert_eq!(pool.len(), 2);
440
441        // Build spending_limit_spends matching the keychain tx
442        let mut spends = SpendingLimitUpdates::new();
443        spends.insert(user_address, key_id, Some(fee_token));
444
445        let evicted =
446            pool.evict_invalidated(&RevokedKeys::new(), &SpendingLimitUpdates::new(), &spends);
447
448        assert_eq!(
449            evicted, 1,
450            "Should evict the keychain tx matching the spend"
451        );
452        assert_eq!(pool.len(), 1, "Non-keychain tx should remain");
453    }
454
455    #[test]
456    fn test_evict_invalidated_keeps_sponsored_keychain_for_spending_limit_spends() {
457        let mut pool = PausedFeeTokenPool::new();
458        let user_address = Address::random();
459        let fee_token = Address::random();
460
461        let sponsored_keychain_tx = create_valid_keychain_tx(user_address, fee_token, true);
462        pool.insert_batch(
463            fee_token,
464            vec![PausedEntry {
465                tx: sponsored_keychain_tx,
466                valid_before: None,
467            }],
468        );
469
470        let key_id = pool
471            .all_entries()
472            .next()
473            .and_then(|entry| entry.tx.transaction.keychain_subject())
474            .map(|subject| subject.key_id)
475            .expect("sponsored keychain tx should have keychain subject");
476
477        let mut spends = SpendingLimitUpdates::new();
478        spends.insert(user_address, key_id, Some(fee_token));
479
480        let evicted =
481            pool.evict_invalidated(&RevokedKeys::new(), &SpendingLimitUpdates::new(), &spends);
482
483        assert_eq!(evicted, 0, "Sponsored keychain tx should not be evicted");
484        assert_eq!(pool.len(), 1);
485    }
486
487    #[test]
488    fn test_evict_invalidated_keeps_sponsored_keychain_for_spending_limit_updates() {
489        let mut pool = PausedFeeTokenPool::new();
490        let user_address = Address::random();
491        let fee_token = Address::random();
492
493        let sponsored_keychain_tx = create_valid_keychain_tx(user_address, fee_token, true);
494        pool.insert_batch(
495            fee_token,
496            vec![PausedEntry {
497                tx: sponsored_keychain_tx,
498                valid_before: None,
499            }],
500        );
501
502        let key_id = pool
503            .all_entries()
504            .next()
505            .and_then(|entry| entry.tx.transaction.keychain_subject())
506            .map(|subject| subject.key_id)
507            .expect("sponsored keychain tx should have keychain subject");
508
509        let mut updates = SpendingLimitUpdates::new();
510        updates.insert(user_address, key_id, Some(fee_token));
511
512        let evicted =
513            pool.evict_invalidated(&RevokedKeys::new(), &updates, &SpendingLimitUpdates::new());
514
515        assert_eq!(evicted, 0, "Sponsored keychain tx should not be evicted");
516        assert_eq!(pool.len(), 1);
517    }
518
519    #[test]
520    fn test_contains() {
521        let mut pool = PausedFeeTokenPool::new();
522        let fee_token = Address::random();
523
524        let tx = create_valid_tx(Address::random());
525        let tx_hash = *tx.hash();
526
527        let entry = PausedEntry {
528            tx,
529            valid_before: None,
530        };
531
532        assert!(!pool.contains(&tx_hash));
533        pool.insert_batch(fee_token, vec![entry]);
534        assert!(pool.contains(&tx_hash));
535    }
536}