1use crate::{RevokedKeys, SpendingLimitUpdates, transaction::TempoPooledTransaction};
9use alloy_primitives::{Address, TxHash, map::HashMap};
10use reth_transaction_pool::{PoolTransaction, ValidPoolTransaction};
11use std::{sync::Arc, time::Instant};
12
/// Maximum age of a paused token batch (30 minutes).
///
/// `PausedFeeTokenPool::evict_timed_out` drops every batch whose pause
/// timestamp is at least this old.
pub const PAUSED_TX_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30 * 60);

/// Upper bound on the total number of entries held across all fee tokens.
///
/// `PausedFeeTokenPool::insert_batch` evicts the oldest token batches and, if
/// still necessary, truncates the incoming batch to stay within this limit.
pub const PAUSED_POOL_GLOBAL_CAP: usize = 10_000;
23
24#[derive(Debug, Clone)]
26pub struct PausedEntry {
27 pub tx: Arc<ValidPoolTransaction<TempoPooledTransaction>>,
29 pub valid_before: Option<u64>,
31}
32
33#[derive(Debug, Clone)]
35struct PausedTokenMeta {
36 paused_at: Instant,
38 entries: Vec<PausedEntry>,
40}
41
42#[derive(Debug, Default)]
48pub struct PausedFeeTokenPool {
49 by_token: HashMap<Address, PausedTokenMeta>,
51}
52
53impl PausedFeeTokenPool {
54 pub fn new() -> Self {
56 Self::default()
57 }
58
59 pub fn len(&self) -> usize {
61 self.by_token.values().map(|m| m.entries.len()).sum()
62 }
63
64 pub fn is_empty(&self) -> bool {
66 self.by_token.is_empty()
67 }
68
69 pub fn insert_batch(&mut self, fee_token: Address, entries: Vec<PausedEntry>) -> usize {
80 if entries.is_empty() {
81 return 0;
82 }
83
84 let current = self.len();
85 let incoming = entries.len();
86 let available = PAUSED_POOL_GLOBAL_CAP.saturating_sub(current);
87 let mut evicted = 0;
88
89 if incoming > available {
90 let need = incoming - available;
91 evicted = self.evict_oldest(need);
92 }
93
94 let remaining_capacity = PAUSED_POOL_GLOBAL_CAP.saturating_sub(self.len());
95 let to_insert = if incoming > remaining_capacity {
96 entries.into_iter().take(remaining_capacity).collect()
97 } else {
98 entries
99 };
100
101 self.by_token
102 .entry(fee_token)
103 .or_insert_with(|| PausedTokenMeta {
104 paused_at: Instant::now(),
105 entries: Vec::new(),
106 })
107 .entries
108 .extend(to_insert);
109
110 evicted
111 }
112
113 fn evict_oldest(&mut self, need: usize) -> usize {
117 let mut tokens_by_age: Vec<_> = self
118 .by_token
119 .iter()
120 .map(|(addr, meta)| (*addr, meta.paused_at))
121 .collect();
122 tokens_by_age.sort_unstable_by_key(|(_, paused_at)| *paused_at);
123
124 let mut evicted = 0;
125 for (token, _) in tokens_by_age {
126 if evicted >= need {
127 break;
128 }
129 if let Some(meta) = self.by_token.remove(&token) {
130 evicted += meta.entries.len();
131 }
132 }
133 evicted
134 }
135
136 pub fn drain_token(&mut self, fee_token: &Address) -> Vec<PausedEntry> {
140 self.by_token
141 .remove(fee_token)
142 .map(|m| m.entries)
143 .unwrap_or_default()
144 }
145
146 pub fn count_for_token(&self, fee_token: &Address) -> usize {
148 self.by_token.get(fee_token).map_or(0, |m| m.entries.len())
149 }
150
151 pub fn contains(&self, tx_hash: &TxHash) -> bool {
153 self.by_token
154 .values()
155 .any(|m| m.entries.iter().any(|e| e.tx.hash() == tx_hash))
156 }
157
158 pub fn evict_expired(&mut self, tip_timestamp: u64) -> usize {
162 let mut count = 0;
163 for meta in self.by_token.values_mut() {
164 let before = meta.entries.len();
165 meta.entries
166 .retain(|e| e.valid_before.is_none_or(|vb| vb > tip_timestamp));
167 count += before - meta.entries.len();
168 }
169 self.by_token.retain(|_, m| !m.entries.is_empty());
171 count
172 }
173
174 pub fn evict_timed_out(&mut self) -> usize {
181 let now = Instant::now();
182 let mut count = 0;
183 self.by_token.retain(|_, meta| {
184 if now.duration_since(meta.paused_at) >= PAUSED_TX_TIMEOUT {
185 count += meta.entries.len();
186 false
187 } else {
188 true
189 }
190 });
191 count
192 }
193
194 pub fn evict_invalidated(
203 &mut self,
204 revoked_keys: &RevokedKeys,
205 spending_limit_updates: &SpendingLimitUpdates,
206 spending_limit_spends: &SpendingLimitUpdates,
207 ) -> usize {
208 if revoked_keys.is_empty()
209 && spending_limit_updates.is_empty()
210 && spending_limit_spends.is_empty()
211 {
212 return 0;
213 }
214
215 let mut count = 0;
216 for meta in self.by_token.values_mut() {
217 let before = meta.entries.len();
218 meta.entries.retain(|entry| {
219 let Some(subject) = entry.tx.transaction.keychain_subject() else {
220 return true;
221 };
222 let matches_limit_update =
223 subject.matches_spending_limit_update(spending_limit_updates);
224 let matches_limit_spend =
225 subject.matches_spending_limit_update(spending_limit_spends);
226 let sender_paid = if matches_limit_update || matches_limit_spend {
227 let sender = *entry.tx.transaction.sender_ref();
228 entry
229 .tx
230 .transaction
231 .inner()
232 .fee_payer(sender)
233 .map_or(true, |fee_payer| fee_payer == sender)
234 } else {
235 false
236 };
237
238 let invalidated = subject.matches_revoked(revoked_keys)
239 || (sender_paid && (matches_limit_update || matches_limit_spend));
240
241 !invalidated
242 });
243 count += before - meta.entries.len();
244 }
245 self.by_token.retain(|_, m| !m.entries.is_empty());
247 count
248 }
249
250 pub fn all_entries(&self) -> impl Iterator<Item = &PausedEntry> {
252 self.by_token.values().flat_map(|m| &m.entries)
253 }
254}
255
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{TxBuilder, wrap_valid_tx};
    use alloy_signer::SignerSync;
    use alloy_signer_local::PrivateKeySigner;
    use reth_primitives_traits::Recovered;
    use reth_transaction_pool::TransactionOrigin;
    use tempo_primitives::{TempoTxEnvelope, transaction::tt_signed::AASigned};

    /// Shorthand for the transaction handle stored in a [`PausedEntry`].
    type ValidTx = Arc<ValidPoolTransaction<TempoPooledTransaction>>;

    /// Builds a plain AA transaction from `sender`, wrapped as an external pool tx.
    fn create_valid_tx(sender: Address) -> ValidTx {
        Arc::new(wrap_valid_tx(
            TxBuilder::aa(sender).build(),
            TransactionOrigin::External,
        ))
    }

    /// Builds a keychain-signed AA transaction, optionally carrying a fee payer
    /// signature from a random sponsor.
    fn create_valid_keychain_tx(
        sender: Address,
        fee_token: Address,
        sponsored: bool,
    ) -> ValidTx {
        let access_key_signer = PrivateKeySigner::random();
        let pooled = TxBuilder::aa(sender)
            .fee_token(fee_token)
            .build_keychain(sender, &access_key_signer);

        if !sponsored {
            return Arc::new(wrap_valid_tx(pooled, TransactionOrigin::External));
        }

        let sponsor = PrivateKeySigner::random();
        let aa = pooled
            .inner()
            .as_aa()
            .expect("builder should produce AA tx");
        let mut tx = aa.tx().clone();

        // A placeholder signature is set before hashing, then replaced by the
        // real sponsor signature.
        // NOTE(review): presumably the fee payer hash depends on the field
        // being `Some` — confirm against `fee_payer_signature_hash`.
        tx.fee_payer_signature = Some(alloy_primitives::Signature::new(
            alloy_primitives::U256::ZERO,
            alloy_primitives::U256::ZERO,
            false,
        ));
        let fee_payer_hash = tx.fee_payer_signature_hash(sender);
        let sponsor_signature = sponsor
            .sign_hash_sync(&fee_payer_hash)
            .expect("sponsor signing should succeed");
        tx.fee_payer_signature = Some(sponsor_signature);

        let envelope: TempoTxEnvelope = AASigned::new_unhashed(tx, aa.signature().clone()).into();
        let sponsored_tx = TempoPooledTransaction::new(Recovered::new_unchecked(envelope, sender));
        Arc::new(wrap_valid_tx(sponsored_tx, TransactionOrigin::External))
    }

    /// Wraps `tx` in an entry without an expiry bound.
    fn never_expiring(tx: ValidTx) -> PausedEntry {
        PausedEntry {
            tx,
            valid_before: None,
        }
    }

    /// Produces `count` non-expiring entries, each from a fresh random sender.
    fn random_entries(count: usize) -> Vec<PausedEntry> {
        (0..count)
            .map(|_| never_expiring(create_valid_tx(Address::random())))
            .collect()
    }

    /// Shared body of the two "sponsored keychain tx survives" tests.
    ///
    /// The matching limit record is passed as `spending_limit_updates` when
    /// `as_limit_update` is true and as `spending_limit_spends` otherwise.
    fn assert_sponsored_keychain_tx_survives(as_limit_update: bool) {
        let mut pool = PausedFeeTokenPool::new();
        let user_address = Address::random();
        let fee_token = Address::random();

        let sponsored_keychain_tx = create_valid_keychain_tx(user_address, fee_token, true);
        pool.insert_batch(fee_token, vec![never_expiring(sponsored_keychain_tx)]);

        let key_id = pool
            .all_entries()
            .next()
            .and_then(|entry| entry.tx.transaction.keychain_subject())
            .map(|subject| subject.key_id)
            .expect("sponsored keychain tx should have keychain subject");

        let mut matching = SpendingLimitUpdates::new();
        matching.insert(user_address, key_id, Some(fee_token));
        let untouched = SpendingLimitUpdates::new();

        let (updates, spends) = if as_limit_update {
            (&matching, &untouched)
        } else {
            (&untouched, &matching)
        };
        let evicted = pool.evict_invalidated(&RevokedKeys::new(), updates, spends);

        assert_eq!(evicted, 0, "Sponsored keychain tx should not be evicted");
        assert_eq!(pool.len(), 1);
    }

    #[test]
    fn test_insert_and_drain() {
        let mut pool = PausedFeeTokenPool::new();
        let fee_token = Address::random();
        let entries = random_entries(3);

        assert!(pool.is_empty());
        pool.insert_batch(fee_token, entries);

        assert_eq!(pool.len(), 3);
        assert_eq!(pool.count_for_token(&fee_token), 3);

        let drained = pool.drain_token(&fee_token);
        assert_eq!(drained.len(), 3);
        assert!(pool.is_empty());
    }

    #[test]
    fn test_evict_expired() {
        let mut pool = PausedFeeTokenPool::new();
        let fee_token = Address::random();

        // One bound below the tip, one above it, and one entry without a bound.
        let entries: Vec<_> = [Some(100), Some(200), None]
            .into_iter()
            .map(|valid_before| PausedEntry {
                tx: create_valid_tx(Address::random()),
                valid_before,
            })
            .collect();

        pool.insert_batch(fee_token, entries);
        assert_eq!(pool.len(), 3);

        let evicted = pool.evict_expired(150);
        assert_eq!(evicted, 1);
        assert_eq!(pool.len(), 2);
    }

    #[test]
    fn test_global_cap_evicts_oldest() {
        let mut pool = PausedFeeTokenPool::new();
        let token_a = Address::random();
        let token_b = Address::random();

        // Fill the pool to the cap with a single token.
        let evicted = pool.insert_batch(token_a, random_entries(PAUSED_POOL_GLOBAL_CAP));
        assert_eq!(evicted, 0);
        assert_eq!(pool.len(), PAUSED_POOL_GLOBAL_CAP);

        // A second token must push older entries out.
        let evicted = pool.insert_batch(token_b, random_entries(100));
        assert!(evicted > 0);
        assert!(pool.len() <= PAUSED_POOL_GLOBAL_CAP);
        assert_eq!(pool.count_for_token(&token_b), 100);
    }

    #[test]
    fn test_global_cap_truncates_oversized_batch() {
        let mut pool = PausedFeeTokenPool::new();
        let token = Address::random();

        let evicted = pool.insert_batch(token, random_entries(PAUSED_POOL_GLOBAL_CAP + 500));
        assert_eq!(evicted, 0);
        assert_eq!(pool.len(), PAUSED_POOL_GLOBAL_CAP);
    }

    #[test]
    fn test_evict_invalidated_with_spending_limit_spends() {
        let mut pool = PausedFeeTokenPool::new();
        let user_address = Address::random();
        let fee_token = Address::random();

        // The signer is created here so its address can be used as the key id.
        let access_key_signer = PrivateKeySigner::random();
        let key_id = alloy_signer::Signer::address(&access_key_signer);
        let keychain_tx = Arc::new(wrap_valid_tx(
            TxBuilder::aa(user_address)
                .fee_token(fee_token)
                .build_keychain(user_address, &access_key_signer),
            TransactionOrigin::External,
        ));

        let plain_tx = create_valid_tx(Address::random());

        pool.insert_batch(
            fee_token,
            vec![never_expiring(keychain_tx), never_expiring(plain_tx)],
        );
        assert_eq!(pool.len(), 2);

        let mut spends = SpendingLimitUpdates::new();
        spends.insert(user_address, key_id, Some(fee_token));

        let evicted =
            pool.evict_invalidated(&RevokedKeys::new(), &SpendingLimitUpdates::new(), &spends);

        assert_eq!(
            evicted, 1,
            "Should evict the keychain tx matching the spend"
        );
        assert_eq!(pool.len(), 1, "Non-keychain tx should remain");
    }

    #[test]
    fn test_evict_invalidated_keeps_sponsored_keychain_for_spending_limit_spends() {
        assert_sponsored_keychain_tx_survives(false);
    }

    #[test]
    fn test_evict_invalidated_keeps_sponsored_keychain_for_spending_limit_updates() {
        assert_sponsored_keychain_tx_survives(true);
    }

    #[test]
    fn test_contains() {
        let mut pool = PausedFeeTokenPool::new();
        let fee_token = Address::random();

        let tx = create_valid_tx(Address::random());
        let tx_hash = *tx.hash();

        assert!(!pool.contains(&tx_hash));
        pool.insert_batch(fee_token, vec![never_expiring(tx)]);
        assert!(pool.contains(&tx_hash));
    }
}