1use crate::{RevokedKeys, SpendingLimitUpdates, transaction::TempoPooledTransaction};
9use alloy_primitives::{Address, TxHash, map::HashMap};
10use reth_transaction_pool::{PoolTransaction, ValidPoolTransaction};
11use std::{sync::Arc, time::Instant};
12
13pub const PAUSED_TX_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30 * 60); pub const PAUSED_POOL_GLOBAL_CAP: usize = 10_000;
23
24#[derive(Debug, Clone)]
26pub struct PausedEntry {
27 pub tx: Arc<ValidPoolTransaction<TempoPooledTransaction>>,
29 pub valid_before: Option<u64>,
31}
32
33#[derive(Debug, Clone)]
35struct PausedTokenMeta {
36 paused_at: Instant,
38 entries: Vec<PausedEntry>,
40}
41
42#[derive(Debug, Default)]
48pub struct PausedFeeTokenPool {
49 by_token: HashMap<Address, PausedTokenMeta>,
51}
52
53impl PausedFeeTokenPool {
54 pub fn new() -> Self {
56 Self::default()
57 }
58
59 pub fn len(&self) -> usize {
61 self.by_token.values().map(|m| m.entries.len()).sum()
62 }
63
64 pub fn is_empty(&self) -> bool {
66 self.by_token.is_empty()
67 }
68
69 pub fn insert_batch(&mut self, fee_token: Address, entries: Vec<PausedEntry>) -> usize {
80 if entries.is_empty() {
81 return 0;
82 }
83
84 let current = self.len();
85 let incoming = entries.len();
86 let available = PAUSED_POOL_GLOBAL_CAP.saturating_sub(current);
87 let mut evicted = 0;
88
89 if incoming > available {
90 let need = incoming - available;
91 evicted = self.evict_oldest(need);
92 }
93
94 let remaining_capacity = PAUSED_POOL_GLOBAL_CAP.saturating_sub(self.len());
95 let to_insert = if incoming > remaining_capacity {
96 entries.into_iter().take(remaining_capacity).collect()
97 } else {
98 entries
99 };
100
101 self.by_token
102 .entry(fee_token)
103 .or_insert_with(|| PausedTokenMeta {
104 paused_at: Instant::now(),
105 entries: Vec::new(),
106 })
107 .entries
108 .extend(to_insert);
109
110 evicted
111 }
112
113 fn evict_oldest(&mut self, need: usize) -> usize {
117 let mut tokens_by_age: Vec<_> = self
118 .by_token
119 .iter()
120 .map(|(addr, meta)| (*addr, meta.paused_at))
121 .collect();
122 tokens_by_age.sort_unstable_by_key(|(_, paused_at)| *paused_at);
123
124 let mut evicted = 0;
125 for (token, _) in tokens_by_age {
126 if evicted >= need {
127 break;
128 }
129 if let Some(meta) = self.by_token.remove(&token) {
130 evicted += meta.entries.len();
131 }
132 }
133 evicted
134 }
135
136 pub fn drain_token(&mut self, fee_token: &Address) -> Vec<PausedEntry> {
140 self.by_token
141 .remove(fee_token)
142 .map(|m| m.entries)
143 .unwrap_or_default()
144 }
145
146 pub fn count_for_token(&self, fee_token: &Address) -> usize {
148 self.by_token.get(fee_token).map_or(0, |m| m.entries.len())
149 }
150
151 pub fn contains(&self, tx_hash: &TxHash) -> bool {
153 self.by_token
154 .values()
155 .any(|m| m.entries.iter().any(|e| e.tx.hash() == tx_hash))
156 }
157
158 pub fn evict_expired(&mut self, tip_timestamp: u64) -> usize {
162 let mut count = 0;
163 for meta in self.by_token.values_mut() {
164 let before = meta.entries.len();
165 meta.entries
166 .retain(|e| e.valid_before.is_none_or(|vb| vb > tip_timestamp));
167 count += before - meta.entries.len();
168 }
169 self.by_token.retain(|_, m| !m.entries.is_empty());
171 count
172 }
173
174 pub fn evict_timed_out(&mut self) -> usize {
181 let now = Instant::now();
182 let mut count = 0;
183 self.by_token.retain(|_, meta| {
184 if now.duration_since(meta.paused_at) >= PAUSED_TX_TIMEOUT {
185 count += meta.entries.len();
186 false
187 } else {
188 true
189 }
190 });
191 count
192 }
193
194 pub fn evict_invalidated(
202 &mut self,
203 revoked_keys: &RevokedKeys,
204 spending_limit_updates: &SpendingLimitUpdates,
205 spending_limit_spends: &SpendingLimitUpdates,
206 ) -> usize {
207 if revoked_keys.is_empty()
208 && spending_limit_updates.is_empty()
209 && spending_limit_spends.is_empty()
210 {
211 return 0;
212 }
213
214 let mut count = 0;
215 for meta in self.by_token.values_mut() {
216 let before = meta.entries.len();
217 meta.entries.retain(|entry| {
218 let Some(subject) = entry.tx.transaction.keychain_subject() else {
219 return true;
220 };
221 let matches_limit_update =
222 subject.matches_spending_limit_update(spending_limit_updates);
223 let matches_limit_spend =
224 subject.matches_spending_limit_update(spending_limit_spends);
225 let sender_paid = if matches_limit_update || matches_limit_spend {
226 let sender = *entry.tx.transaction.sender_ref();
227 entry
228 .tx
229 .transaction
230 .inner()
231 .fee_payer(sender)
232 .map_or(true, |fee_payer| fee_payer == sender)
233 } else {
234 false
235 };
236
237 let invalidated = subject.matches_revoked(revoked_keys)
238 || (sender_paid && (matches_limit_update || matches_limit_spend));
239
240 !invalidated
241 });
242 count += before - meta.entries.len();
243 }
244 self.by_token.retain(|_, m| !m.entries.is_empty());
246 count
247 }
248
249 pub fn all_entries(&self) -> impl Iterator<Item = &PausedEntry> {
251 self.by_token.values().flat_map(|m| &m.entries)
252 }
253}
254
255#[cfg(test)]
256mod tests {
257 use super::*;
258 use crate::test_utils::{TxBuilder, wrap_valid_tx};
259 use alloy_signer::SignerSync;
260 use alloy_signer_local::PrivateKeySigner;
261 use reth_primitives_traits::Recovered;
262 use reth_transaction_pool::TransactionOrigin;
263 use tempo_primitives::{TempoTxEnvelope, transaction::tt_signed::AASigned};
264
265 fn create_valid_tx(sender: Address) -> Arc<ValidPoolTransaction<TempoPooledTransaction>> {
266 let pooled = TxBuilder::aa(sender).build();
267 Arc::new(wrap_valid_tx(pooled, TransactionOrigin::External))
268 }
269
270 fn create_valid_keychain_tx(
271 sender: Address,
272 fee_token: Address,
273 sponsored: bool,
274 ) -> Arc<ValidPoolTransaction<TempoPooledTransaction>> {
275 let access_key_signer = PrivateKeySigner::random();
276 let pooled = TxBuilder::aa(sender)
277 .fee_token(fee_token)
278 .build_keychain(sender, &access_key_signer);
279
280 let pooled = if sponsored {
281 let sponsor = PrivateKeySigner::random();
282 let aa = pooled
283 .inner()
284 .as_aa()
285 .expect("builder should produce AA tx");
286 let mut tx = aa.tx().clone();
287 tx.fee_payer_signature = Some(alloy_primitives::Signature::new(
288 alloy_primitives::U256::ZERO,
289 alloy_primitives::U256::ZERO,
290 false,
291 ));
292 let fee_payer_hash = tx.fee_payer_signature_hash(sender);
293 tx.fee_payer_signature = Some(
294 sponsor
295 .sign_hash_sync(&fee_payer_hash)
296 .expect("sponsor signing should succeed"),
297 );
298
299 let aa_signed = AASigned::new_unhashed(tx, aa.signature().clone());
300 let envelope: TempoTxEnvelope = aa_signed.into();
301 TempoPooledTransaction::new(Recovered::new_unchecked(envelope, sender))
302 } else {
303 pooled
304 };
305
306 Arc::new(wrap_valid_tx(pooled, TransactionOrigin::External))
307 }
308
309 #[test]
310 fn test_insert_and_drain() {
311 let mut pool = PausedFeeTokenPool::new();
312 let fee_token = Address::random();
313
314 let entries: Vec<_> = (0..3)
315 .map(|_| PausedEntry {
316 tx: create_valid_tx(Address::random()),
317 valid_before: None,
318 })
319 .collect();
320
321 assert!(pool.is_empty());
322 pool.insert_batch(fee_token, entries);
323
324 assert_eq!(pool.len(), 3);
325 assert_eq!(pool.count_for_token(&fee_token), 3);
326
327 let drained = pool.drain_token(&fee_token);
328 assert_eq!(drained.len(), 3);
329 assert!(pool.is_empty());
330 }
331
332 #[test]
333 fn test_evict_expired() {
334 let mut pool = PausedFeeTokenPool::new();
335 let fee_token = Address::random();
336
337 let entries = vec![
338 PausedEntry {
339 tx: create_valid_tx(Address::random()),
340 valid_before: Some(100), },
342 PausedEntry {
343 tx: create_valid_tx(Address::random()),
344 valid_before: Some(200), },
346 PausedEntry {
347 tx: create_valid_tx(Address::random()),
348 valid_before: None, },
350 ];
351
352 pool.insert_batch(fee_token, entries);
353 assert_eq!(pool.len(), 3);
354
355 let evicted = pool.evict_expired(150);
356 assert_eq!(evicted, 1);
357 assert_eq!(pool.len(), 2);
358 }
359
360 #[test]
361 fn test_global_cap_evicts_oldest() {
362 let mut pool = PausedFeeTokenPool::new();
363
364 let token_a = Address::random();
365 let token_b = Address::random();
366
367 let make_entries = |n: usize| -> Vec<PausedEntry> {
368 (0..n)
369 .map(|_| PausedEntry {
370 tx: create_valid_tx(Address::random()),
371 valid_before: None,
372 })
373 .collect()
374 };
375
376 let evicted = pool.insert_batch(token_a, make_entries(PAUSED_POOL_GLOBAL_CAP));
378 assert_eq!(evicted, 0);
379 assert_eq!(pool.len(), PAUSED_POOL_GLOBAL_CAP);
380
381 let evicted = pool.insert_batch(token_b, make_entries(100));
383 assert!(evicted > 0);
384 assert!(pool.len() <= PAUSED_POOL_GLOBAL_CAP);
385 assert_eq!(pool.count_for_token(&token_b), 100);
386 }
387
388 #[test]
389 fn test_global_cap_truncates_oversized_batch() {
390 let mut pool = PausedFeeTokenPool::new();
391 let token = Address::random();
392
393 let entries: Vec<_> = (0..PAUSED_POOL_GLOBAL_CAP + 500)
394 .map(|_| PausedEntry {
395 tx: create_valid_tx(Address::random()),
396 valid_before: None,
397 })
398 .collect();
399
400 let evicted = pool.insert_batch(token, entries);
401 assert_eq!(evicted, 0);
402 assert_eq!(pool.len(), PAUSED_POOL_GLOBAL_CAP);
403 }
404
405 #[test]
406 fn test_evict_invalidated_with_spending_limit_spends() {
407 let mut pool = PausedFeeTokenPool::new();
408 let user_address = Address::random();
409 let fee_token = Address::random();
410
411 let access_key_signer = alloy_signer_local::PrivateKeySigner::random();
413 let key_id = alloy_signer::Signer::address(&access_key_signer);
414 let tx = TxBuilder::aa(user_address)
415 .fee_token(fee_token)
416 .build_keychain(user_address, &access_key_signer);
417 let tx = Arc::new(wrap_valid_tx(
418 tx,
419 reth_transaction_pool::TransactionOrigin::External,
420 ));
421
422 let other_tx = create_valid_tx(Address::random());
424
425 pool.insert_batch(
426 fee_token,
427 vec![
428 PausedEntry {
429 tx,
430 valid_before: None,
431 },
432 PausedEntry {
433 tx: other_tx,
434 valid_before: None,
435 },
436 ],
437 );
438 assert_eq!(pool.len(), 2);
439
440 let mut spends = SpendingLimitUpdates::new();
442 spends.insert(user_address, key_id, Some(fee_token));
443
444 let evicted =
445 pool.evict_invalidated(&RevokedKeys::new(), &SpendingLimitUpdates::new(), &spends);
446
447 assert_eq!(
448 evicted, 1,
449 "Should evict the keychain tx matching the spend"
450 );
451 assert_eq!(pool.len(), 1, "Non-keychain tx should remain");
452 }
453
454 #[test]
455 fn test_evict_invalidated_keeps_sponsored_keychain_for_spending_limit_spends() {
456 let mut pool = PausedFeeTokenPool::new();
457 let user_address = Address::random();
458 let fee_token = Address::random();
459
460 let sponsored_keychain_tx = create_valid_keychain_tx(user_address, fee_token, true);
461 pool.insert_batch(
462 fee_token,
463 vec![PausedEntry {
464 tx: sponsored_keychain_tx,
465 valid_before: None,
466 }],
467 );
468
469 let key_id = pool
470 .all_entries()
471 .next()
472 .and_then(|entry| entry.tx.transaction.keychain_subject())
473 .map(|subject| subject.key_id)
474 .expect("sponsored keychain tx should have keychain subject");
475
476 let mut spends = SpendingLimitUpdates::new();
477 spends.insert(user_address, key_id, Some(fee_token));
478
479 let evicted =
480 pool.evict_invalidated(&RevokedKeys::new(), &SpendingLimitUpdates::new(), &spends);
481
482 assert_eq!(evicted, 0, "Sponsored keychain tx should not be evicted");
483 assert_eq!(pool.len(), 1);
484 }
485
486 #[test]
487 fn test_evict_invalidated_keeps_sponsored_keychain_for_spending_limit_updates() {
488 let mut pool = PausedFeeTokenPool::new();
489 let user_address = Address::random();
490 let fee_token = Address::random();
491
492 let sponsored_keychain_tx = create_valid_keychain_tx(user_address, fee_token, true);
493 pool.insert_batch(
494 fee_token,
495 vec![PausedEntry {
496 tx: sponsored_keychain_tx,
497 valid_before: None,
498 }],
499 );
500
501 let key_id = pool
502 .all_entries()
503 .next()
504 .and_then(|entry| entry.tx.transaction.keychain_subject())
505 .map(|subject| subject.key_id)
506 .expect("sponsored keychain tx should have keychain subject");
507
508 let mut updates = SpendingLimitUpdates::new();
509 updates.insert(user_address, key_id, Some(fee_token));
510
511 let evicted =
512 pool.evict_invalidated(&RevokedKeys::new(), &updates, &SpendingLimitUpdates::new());
513
514 assert_eq!(evicted, 0, "Sponsored keychain tx should not be evicted");
515 assert_eq!(pool.len(), 1);
516 }
517
518 #[test]
519 fn test_contains() {
520 let mut pool = PausedFeeTokenPool::new();
521 let fee_token = Address::random();
522
523 let tx = create_valid_tx(Address::random());
524 let tx_hash = *tx.hash();
525
526 let entry = PausedEntry {
527 tx,
528 valid_before: None,
529 };
530
531 assert!(!pool.contains(&tx_hash));
532 pool.insert_batch(fee_token, vec![entry]);
533 assert!(pool.contains(&tx_hash));
534 }
535}