1use crate::{
4 RevokedKeys, SpendingLimitUpdates, TempoTransactionPool,
5 metrics::TempoPoolMaintenanceMetrics,
6 paused::{PausedEntry, PausedFeeTokenPool},
7 transaction::TempoPooledTransaction,
8 tt_2d_pool::AASequenceId,
9};
10use alloy_consensus::transaction::TxHashRef;
11use alloy_primitives::{
12 Address, TxHash,
13 map::{AddressMap, B256Set, HashMap, HashSet},
14};
15use alloy_sol_types::SolEvent;
16use futures::StreamExt;
17use reth_chainspec::ChainSpecProvider;
18use reth_primitives_traits::AlloyBlockHeader;
19use reth_provider::{CanonStateNotification, CanonStateSubscriptions, Chain};
20use reth_storage_api::StateProviderFactory;
21use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool};
22use std::{
23 collections::{BTreeMap, btree_map::Entry},
24 sync::Arc,
25 time::Instant,
26};
27use tempo_chainspec::{TempoChainSpec, hardfork::TempoHardforks, spec::TEMPO_T1_BASE_FEE};
28use tempo_contracts::precompiles::{IAccountKeychain, IFeeManager, ITIP20, ITIP403Registry};
29use tempo_precompiles::{
30 ACCOUNT_KEYCHAIN_ADDRESS, TIP_FEE_MANAGER_ADDRESS, TIP403_REGISTRY_ADDRESS,
31 tip20::is_tip20_prefix,
32};
33use tempo_primitives::{AASigned, TempoPrimitives};
34use tracing::{debug, error};
35
/// Number of seconds added to the tip timestamp when `maintain_tempo_pool`
/// computes the expiry cutoff, so transactions (and keychain keys) whose
/// deadline falls within this window past the tip are evicted early.
const EVICTION_BUFFER_SECS: u64 = 3;
39
/// Pool-relevant changes collected for one canonical chain update.
///
/// Most fields are filled by [`TempoPoolUpdates::from_chain`] from receipt
/// logs and included transactions; `expired_txs` is filled separately by the
/// maintenance loop.
#[derive(Debug, Default)]
pub struct TempoPoolUpdates {
    /// Hashes of transactions whose `valid_before` deadline was reached and
    /// that are still present in the pool.
    pub expired_txs: Vec<TxHash>,
    /// `(account, publicKey)` pairs taken from `KeyRevoked` events.
    pub revoked_keys: RevokedKeys,
    /// `(account, publicKey, token)` triples taken from `SpendingLimitUpdated`
    /// events.
    pub spending_limit_changes: SpendingLimitUpdates,
    /// `(validator, token)` pairs taken from `ValidatorTokenSet` events.
    pub validator_token_changes: Vec<(Address, Address)>,
    /// Users taken from `UserTokenSet` events.
    pub user_token_changes: HashSet<Address>,
    /// `(policyId, account)` pairs from `BlacklistUpdated` events whose
    /// `restricted` flag is set.
    pub blacklist_additions: Vec<(u64, Address)>,
    /// `(policyId, account)` pairs from `WhitelistUpdated` events whose
    /// `allowed` flag is cleared.
    pub whitelist_removals: Vec<(u64, Address)>,
    /// `(token address, isPaused)` pairs from `PauseStateUpdate` events emitted
    /// by TIP20-prefixed addresses.
    pub pause_events: Vec<(Address, bool)>,
    /// `(user address, key id, fee token)` entries for included AA transactions
    /// that were signed with a keychain key whose id is non-zero.
    pub spending_limit_spends: SpendingLimitUpdates,
}
80
81impl TempoPoolUpdates {
82 pub fn new() -> Self {
84 Self::default()
85 }
86
87 pub fn is_empty(&self) -> bool {
89 self.expired_txs.is_empty()
90 && self.revoked_keys.is_empty()
91 && self.spending_limit_changes.is_empty()
92 && self.validator_token_changes.is_empty()
93 && self.user_token_changes.is_empty()
94 && self.blacklist_additions.is_empty()
95 && self.whitelist_removals.is_empty()
96 && self.pause_events.is_empty()
97 && self.spending_limit_spends.is_empty()
98 }
99
100 pub fn from_chain(chain: &Chain<TempoPrimitives>) -> Self {
105 let mut updates = Self::new();
106
107 for log in chain
109 .execution_outcome()
110 .receipts()
111 .iter()
112 .flatten()
113 .flat_map(|receipt| &receipt.logs)
114 {
115 if log.address == ACCOUNT_KEYCHAIN_ADDRESS {
117 if let Ok(event) = IAccountKeychain::KeyRevoked::decode_log(log) {
118 updates.revoked_keys.insert(event.account, event.publicKey);
119 } else if let Ok(event) = IAccountKeychain::SpendingLimitUpdated::decode_log(log) {
120 updates.spending_limit_changes.insert(
121 event.account,
122 event.publicKey,
123 Some(event.token),
124 );
125 }
126 }
127 else if log.address == TIP_FEE_MANAGER_ADDRESS {
129 if let Ok(event) = IFeeManager::ValidatorTokenSet::decode_log(log) {
130 updates
131 .validator_token_changes
132 .push((event.validator, event.token));
133 } else if let Ok(event) = IFeeManager::UserTokenSet::decode_log(log) {
134 updates.user_token_changes.insert(event.user);
135 }
136 }
137 else if log.address == TIP403_REGISTRY_ADDRESS {
139 if let Ok(event) = ITIP403Registry::BlacklistUpdated::decode_log(log)
140 && event.restricted
141 {
142 updates
143 .blacklist_additions
144 .push((event.policyId, event.account));
145 } else if let Ok(event) = ITIP403Registry::WhitelistUpdated::decode_log(log)
146 && !event.allowed
147 {
148 updates
149 .whitelist_removals
150 .push((event.policyId, event.account));
151 }
152 }
153 else if is_tip20_prefix(log.address)
155 && let Ok(event) = ITIP20::PauseStateUpdate::decode_log(log)
156 {
157 updates.pause_events.push((log.address, event.isPaused));
158 }
159 }
160
161 for tx in chain
166 .blocks_iter()
167 .flat_map(|block| block.body().transactions())
168 {
169 let Some(aa_tx) = tx.as_aa() else {
170 continue;
171 };
172 let Some(keychain_sig) = aa_tx.signature().as_keychain() else {
173 continue;
174 };
175 let Ok(key_id) = keychain_sig.key_id(&aa_tx.signature_hash()) else {
176 continue;
177 };
178 if key_id.is_zero() {
180 continue;
181 }
182 updates.spending_limit_spends.insert(
187 keychain_sig.user_address,
188 key_id,
189 aa_tx.tx().fee_token,
190 );
191 }
192
193 updates
194 }
195
196 pub fn has_invalidation_events(&self) -> bool {
198 !self.revoked_keys.is_empty()
199 || !self.spending_limit_changes.is_empty()
200 || !self.spending_limit_spends.is_empty()
201 || !self.validator_token_changes.is_empty()
202 || !self.user_token_changes.is_empty()
203 || !self.blacklist_additions.is_empty()
204 || !self.whitelist_removals.is_empty()
205 }
206}
207
/// Mutable bookkeeping owned by the pool maintenance task.
#[derive(Default)]
struct TempoPoolState {
    /// `valid_before` timestamp -> hashes of tracked transactions with that
    /// deadline; ordered so the earliest deadline is drained first.
    expiry_map: BTreeMap<u64, Vec<TxHash>>,
    /// Reverse index: transaction hash -> its `valid_before` timestamp.
    tx_to_expiry: HashMap<TxHash, u64>,
    /// Transactions pulled out of the pool while their fee token is paused.
    paused_pool: PausedFeeTokenPool,
    /// Snapshot tracker used to find transactions that stay pending across
    /// two consecutive checks.
    pending_staleness: PendingStalenessTracker,
    /// Set once the one-off T1 underpriced-transaction eviction has run.
    t1_transition_cleanup_done: bool,
    /// Tracks keychain key expiries and the transactions that depend on them.
    key_expiry: KeyExpiryTracker,
}
232
233impl TempoPoolState {
234 fn track_expiry(&mut self, maybe_aa_tx: Option<&AASigned>) {
236 if let Some(aa_tx) = maybe_aa_tx
237 && let Some(valid_before) = aa_tx.tx().valid_before
238 {
239 let hash = *aa_tx.hash();
240 self.expiry_map.entry(valid_before).or_default().push(hash);
241 self.tx_to_expiry.insert(hash, valid_before);
242 }
243 }
244
245 fn untrack_expiry(&mut self, hash: &TxHash) {
247 if let Some(valid_before) = self.tx_to_expiry.remove(hash)
248 && let Entry::Occupied(mut entry) = self.expiry_map.entry(valid_before)
249 {
250 entry.get_mut().retain(|h| *h != *hash);
251 if entry.get().is_empty() {
252 entry.remove();
253 }
254 }
255
256 self.key_expiry.untrack(hash);
257 }
258
259 fn drain_expired(&mut self, tip_timestamp: u64) -> Vec<TxHash> {
262 let mut expired = Vec::new();
263 while let Some(entry) = self.expiry_map.first_entry()
264 && *entry.key() <= tip_timestamp
265 {
266 let expired_hashes = entry.remove();
267 for tx_hash in &expired_hashes {
268 self.tx_to_expiry.remove(tx_hash);
269 }
270 expired.extend(expired_hashes);
271 }
272 expired
273 }
274
275 fn track_key_expiry(&mut self, tx: &TempoPooledTransaction) {
280 let Some(expiry) = tx.key_expiry() else {
281 return;
282 };
283
284 let Some(subject) = tx.keychain_subject() else {
285 return;
286 };
287
288 self.key_expiry
289 .track(subject.account, subject.key_id, expiry, *tx.hash());
290 }
291}
292
/// Default interval, in seconds (30 minutes), between two pending-staleness
/// snapshots taken by `PendingStalenessTracker`.
const DEFAULT_PENDING_STALENESS_INTERVAL: u64 = 30 * 60;
296
/// Detects transactions that remain pending across two consecutive snapshots.
#[derive(Debug)]
struct PendingStalenessTracker {
    /// Pending hashes recorded by the previous snapshot, minus those that were
    /// reported as stale at that time.
    previous_pending: HashSet<TxHash>,
    /// Timestamp passed to the most recent snapshot, or `None` before the
    /// first one.
    last_snapshot_time: Option<u64>,
    /// Minimum number of seconds between two snapshots.
    interval_secs: u64,
}
312
313impl PendingStalenessTracker {
314 fn new(interval_secs: u64) -> Self {
316 Self {
317 previous_pending: HashSet::default(),
318 last_snapshot_time: None,
319 interval_secs,
320 }
321 }
322
323 fn should_check(&self, now: u64) -> bool {
325 self.last_snapshot_time
326 .is_none_or(|last| now.saturating_sub(last) >= self.interval_secs)
327 }
328
329 fn check_and_update(&mut self, current_pending: HashSet<TxHash>, now: u64) -> Vec<TxHash> {
336 let stale: Vec<TxHash> = self
338 .previous_pending
339 .intersection(¤t_pending)
340 .copied()
341 .collect();
342
343 self.previous_pending = current_pending
345 .into_iter()
346 .filter(|hash| !stale.contains(hash))
347 .collect();
348 self.last_snapshot_time = Some(now);
349
350 stale
351 }
352}
353
354impl Default for PendingStalenessTracker {
355 fn default() -> Self {
356 Self::new(DEFAULT_PENDING_STALENESS_INTERVAL)
357 }
358}
359
/// Identifies a keychain key by the owning account and the key's id; used as
/// the lookup key inside `KeyExpiryTracker`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct KeyId {
    /// Account the key belongs to.
    account: Address,
    /// Id of the key within that account.
    key_id: Address,
}
366
/// Tracks which transactions depend on which keychain key and when each key
/// expires, so all of a key's transactions can be drained once it expires.
#[derive(Debug, Default)]
struct KeyExpiryTracker {
    /// Expiry timestamp -> keys expiring at that time; ordered so the earliest
    /// expiry is drained first.
    expiry_map: BTreeMap<u64, HashSet<KeyId>>,
    /// Key -> (expiry timestamp, hashes of the tracked transactions using it).
    key_to_txs: HashMap<KeyId, (u64, HashSet<TxHash>)>,
    /// Reverse index: transaction hash -> the key it was tracked under.
    tx_to_key: HashMap<TxHash, KeyId>,
}
384
385impl KeyExpiryTracker {
386 fn track(&mut self, account: Address, key_id: Address, expiry: u64, tx_hash: TxHash) {
391 let key = KeyId { account, key_id };
392
393 match self.key_to_txs.entry(key) {
394 alloy_primitives::map::Entry::Occupied(mut entry) => {
395 let (existing_expiry, txs) = entry.get_mut();
396 debug_assert_eq!(
397 *existing_expiry, expiry,
398 "Key expiry changed unexpectedly - this shouldn't happen"
399 );
400 txs.insert(tx_hash);
401 }
402 alloy_primitives::map::Entry::Vacant(entry) => {
403 entry.insert((expiry, [tx_hash].into_iter().collect()));
404 self.expiry_map.entry(expiry).or_default().insert(key);
405 }
406 }
407 self.tx_to_key.insert(tx_hash, key);
408 }
409
410 fn untrack(&mut self, hash: &TxHash) {
412 let Some(key) = self.tx_to_key.remove(hash) else {
413 return;
414 };
415
416 let Some((expiry, txs)) = self.key_to_txs.get_mut(&key) else {
417 return;
418 };
419 txs.remove(hash);
420
421 if txs.is_empty() {
422 let expiry = *expiry;
423 self.key_to_txs.remove(&key);
424 if let Some(keys) = self.expiry_map.get_mut(&expiry) {
425 keys.remove(&key);
426 if keys.is_empty() {
427 self.expiry_map.remove(&expiry);
428 }
429 }
430 }
431 }
432
433 fn drain_expired(&mut self, tip_timestamp: u64) -> Vec<TxHash> {
437 let mut expired_txs = Vec::new();
438
439 while let Some(entry) = self.expiry_map.first_entry()
440 && *entry.key() <= tip_timestamp
441 {
442 let expired_keys = entry.remove();
443 for key in expired_keys {
444 if let Some((_, txs)) = self.key_to_txs.remove(&key) {
445 for tx in &txs {
446 self.tx_to_key.remove(tx);
447 }
448 expired_txs.extend(txs);
449 }
450 }
451 }
452
453 expired_txs
454 }
455}
456
/// Long-running maintenance task for the Tempo transaction pool.
///
/// Tracks expiry information for transactions already in the pool, then loops
/// over two event sources:
///
/// * the pool's new-transaction listener — every new transaction is registered
///   for `valid_before` and key-expiry tracking; the loop ends when this
///   listener closes;
/// * the canonical state stream — reorgs re-inject orphaned AA 2D-nonce
///   transactions and refresh caches, commits run the per-block eviction,
///   pause/unpause, cache update and invalidation steps below.
pub async fn maintain_tempo_pool<Client>(pool: TempoTransactionPool<Client>)
where
    Client: StateProviderFactory
        + reth_provider::HeaderProvider<Header: reth_primitives_traits::BlockHeader>
        + ChainSpecProvider<ChainSpec = TempoChainSpec>
        + CanonStateSubscriptions<Primitives = TempoPrimitives>
        + 'static,
{
    let mut state = TempoPoolState::default();
    let metrics = TempoPoolMaintenanceMetrics::default();

    // Subscribe before snapshotting the pool so transactions arriving in
    // between are not missed.
    let mut new_txs = pool.new_transactions_listener();
    let mut chain_events = pool.client().canonical_state_stream();

    // Seed the trackers with everything that is already pooled.
    let all_txs = pool.all_transactions();
    for tx in all_txs.pending.iter().chain(all_txs.queued.iter()) {
        state.track_expiry(tx.transaction.inner().as_aa());
        state.track_key_expiry(&tx.transaction);
    }

    let amm_cache = pool.amm_liquidity_cache();

    loop {
        tokio::select! {
            tx_event = new_txs.recv() => {
                // A closed listener terminates the maintenance task.
                let Some(tx_event) = tx_event else {
                    break;
                };

                let tx = &tx_event.transaction.transaction;
                state.track_expiry(tx.inner().as_aa());
                state.track_key_expiry(tx);
            }

            Some(event) = chain_events.next() => {
                let new = match event {
                    CanonStateNotification::Reorg { old, new } => {
                        // Find AA 2D-nonce transactions that were mined in the
                        // old chain but are neither in the new chain nor in
                        // the pool.
                        let (orphaned_txs, affected_seq_ids) =
                            handle_reorg(old, new.clone(), |hash| pool.contains(hash));

                        if !affected_seq_ids.is_empty() {
                            let new_tip_hash = new.tip().hash();
                            if let Err(err) = pool.reset_2d_nonces_from_state(
                                affected_seq_ids.into_iter().collect(),
                                new_tip_hash,
                            ) {
                                error!(
                                    target: "txpool",
                                    ?err,
                                    "Failed to reset 2D nonce state after reorg"
                                );
                            }
                        }

                        if !orphaned_txs.is_empty() {
                            let count = orphaned_txs.len();
                            debug!(
                                target: "txpool",
                                count,
                                "Re-injecting orphaned AA 2D transactions after reorg"
                            );

                            // Re-insertion is awaited on a spawned task so the
                            // maintenance loop is not blocked by it.
                            let pool_clone = pool.clone();
                            tokio::spawn(async move {
                                let results = pool_clone
                                    .add_transactions(TransactionOrigin::Local, orphaned_txs)
                                    .await;
                                let failed = results.iter().filter(|r| r.is_err()).count();
                                if failed > 0 {
                                    debug!(
                                        target: "txpool",
                                        failed,
                                        "Some orphaned AA 2D transactions failed to re-inject"
                                    );
                                }
                            });
                        }

                        pool.notify_aa_pool_on_state_updates(new.execution_outcome().state().state());

                        if let Err(err) = amm_cache.repopulate(pool.client()) {
                            error!(target: "txpool", ?err, "AMM liquidity cache repopulate after reorg failed");
                        }

                        // None of the commit-path steps below run for a reorg
                        // notification.
                        continue;
                    }
                    CanonStateNotification::Commit { new } => new,
                };

                let block_update_start = Instant::now();

                let tip = &new;
                let bundle_state = tip.execution_outcome().state().state();
                let tip_timestamp = tip.tip().header().timestamp();

                // One-off cleanup: the first time T1 is active at the tip
                // timestamp, evict transactions priced below the T1 base fee.
                if !state.t1_transition_cleanup_done {
                    let chain_spec = pool.client().chain_spec();
                    if chain_spec.is_t1_active_at_timestamp(tip_timestamp) {
                        let evicted = evict_underpriced_transactions_for_t1(&pool);
                        if evicted > 0 {
                            debug!(
                                target: "txpool",
                                count = evicted,
                                tip_timestamp,
                                "T1 transition: evicted underpriced transactions (max_fee_per_gas < 20 billion attodollars)"
                            );
                        }
                        state.t1_transition_cleanup_done = true;
                    }
                }

                let mut updates = TempoPoolUpdates::from_chain(tip);

                // Included transactions no longer need expiry tracking.
                tip.blocks_iter()
                    .flat_map(|block| block.body().transactions())
                    .for_each(|tx| {
                        state.untrack_expiry(tx.tx_hash())
                    });

                // Expiry cutoff lies EVICTION_BUFFER_SECS past the tip.
                let max_expiry = tip_timestamp.saturating_add(EVICTION_BUFFER_SECS);

                // Only hashes still in the pool are reported and evicted.
                let expired = state.drain_expired(max_expiry);
                updates.expired_txs = expired.into_iter().filter(|h| pool.contains(h)).collect();

                let key_expired = state.key_expiry.drain_expired(max_expiry);
                let key_expired: Vec<TxHash> =
                    key_expired.into_iter().filter(|h| pool.contains(h)).collect();

                let expired_start = Instant::now();
                let expired_count = updates.expired_txs.len();
                if expired_count > 0 {
                    debug!(
                        target: "txpool",
                        count = expired_count,
                        tip_timestamp,
                        "Evicting expired AA transactions (valid_before)"
                    );
                    pool.remove_transactions(updates.expired_txs.clone());
                    metrics.expired_transactions_evicted.increment(expired_count as u64);
                }

                let key_expired_count = key_expired.len();
                if key_expired_count > 0 {
                    debug!(
                        target: "txpool",
                        count = key_expired_count,
                        tip_timestamp,
                        "Evicting transactions with expired keychain keys"
                    );
                    pool.remove_transactions(key_expired);
                    metrics.expired_transactions_evicted.increment(key_expired_count as u64);
                }
                metrics.expired_eviction_duration_seconds.record(expired_start.elapsed());

                let pause_start = Instant::now();

                // Tokens that were paused in this update.
                let pause_tokens: Vec<Address> = updates
                    .pause_events
                    .iter()
                    .filter_map(|(token, is_paused)| is_paused.then_some(*token))
                    .collect();

                if !pause_tokens.is_empty() {
                    let all_txs = pool.all_transactions();

                    // Group pooled transaction hashes by fee token once, so
                    // each paused token is a single lookup.
                    let mut by_token: AddressMap<Vec<TxHash>> = AddressMap::default();
                    for tx in all_txs.pending.iter().chain(all_txs.queued.iter()) {
                        if let Some(fee_token) = tx.transaction.inner().fee_token() {
                            by_token.entry(fee_token).or_default().push(*tx.hash());
                        }
                    }

                    for token in pause_tokens {
                        let Some(hashes_to_pause) = by_token.remove(&token) else {
                            continue;
                        };

                        let removed_txs = pool.remove_transactions(hashes_to_pause);
                        let count = removed_txs.len();

                        if count > 0 {
                            for tx in &removed_txs {
                                state.untrack_expiry(tx.hash());
                            }

                            // Keep `valid_before` alongside each paused
                            // transaction for the paused pool's own expiry
                            // eviction.
                            let entries: Vec<_> = removed_txs
                                .into_iter()
                                .map(|tx| {
                                    let valid_before = tx
                                        .transaction
                                        .inner()
                                        .as_aa()
                                        .and_then(|aa| aa.tx().valid_before);
                                    PausedEntry { tx, valid_before }
                                })
                                .collect();

                            let cap_evicted = state.paused_pool.insert_batch(token, entries);
                            metrics.transactions_paused.increment(count as u64);
                            if cap_evicted > 0 {
                                metrics.paused_pool_cap_evicted.increment(cap_evicted as u64);
                                debug!(
                                    target: "txpool",
                                    cap_evicted,
                                    "Evicted oldest paused transactions due to global cap"
                                );
                            }
                            debug!(
                                target: "txpool",
                                %token,
                                count,
                                "Moved transactions to paused pool (fee token paused)"
                            );
                        }
                    }
                }

                // Unpause events: move the token's paused transactions back
                // into the pool on a spawned task.
                for (token, is_paused) in &updates.pause_events {
                    if *is_paused {
                        continue; }

                    let paused_entries = state.paused_pool.drain_token(token);
                    if !paused_entries.is_empty() {
                        let count = paused_entries.len();
                        metrics.transactions_unpaused.increment(count as u64);
                        let pool_clone = pool.clone();
                        let token = *token;
                        tokio::spawn(async move {
                            let txs: Vec<_> = paused_entries
                                .into_iter()
                                .map(|e| e.tx.transaction.clone())
                                .collect();

                            let results = pool_clone
                                .add_external_transactions(txs)
                                .await;

                            let success = results.iter().filter(|r| r.is_ok()).count();
                            debug!(
                                target: "txpool",
                                %token,
                                total = count,
                                success,
                                "Restored transactions from paused pool (fee token unpaused)"
                            );
                        });
                    }
                }

                // NOTE(review): the paused pool is checked against the raw tip
                // timestamp, not `max_expiry` — confirm this is intended.
                let paused_expired = state.paused_pool.evict_expired(tip_timestamp);
                let paused_timed_out = state.paused_pool.evict_timed_out();
                let total_paused_evicted = paused_expired + paused_timed_out;
                if total_paused_evicted > 0 {
                    debug!(
                        target: "txpool",
                        count = total_paused_evicted,
                        tip_timestamp,
                        "Evicted expired transactions from paused pool"
                    );
                }

                // Key revocations and spending-limit changes/spends are also
                // applied to the transactions held in the paused pool.
                if !updates.revoked_keys.is_empty()
                    || !updates.spending_limit_changes.is_empty()
                    || !updates.spending_limit_spends.is_empty()
                {
                    state.paused_pool.evict_invalidated(
                        &updates.revoked_keys,
                        &updates.spending_limit_changes,
                        &updates.spending_limit_spends,
                    );
                }
                metrics.pause_events_duration_seconds.record(pause_start.elapsed());

                let nonce_pool_start = Instant::now();
                pool.notify_aa_pool_on_state_updates(bundle_state);

                pool.remove_included_expiring_nonce_txs(
                    tip.blocks_iter()
                        .flat_map(|block| block.body().transactions())
                        .map(|tx| tx.tx_hash()),
                );
                metrics.nonce_pool_update_duration_seconds.record(nonce_pool_start.elapsed());

                let amm_start = Instant::now();
                amm_cache.on_new_state(tip.execution_outcome());
                for block in tip.blocks_iter() {
                    if let Err(err) = amm_cache.on_new_block(block.sealed_header(), pool.client()) {
                        error!(target: "txpool", ?err, "AMM liquidity cache update failed");
                    }
                }
                metrics.amm_cache_update_duration_seconds.record(amm_start.elapsed());

                // Evict pooled transactions invalidated by this update and
                // stop tracking them.
                if updates.has_invalidation_events() {
                    let invalidation_start = Instant::now();
                    debug!(
                        target: "txpool",
                        revoked_keys = updates.revoked_keys.len(),
                        spending_limit_changes = updates.spending_limit_changes.len(),
                        spending_limit_spends = updates.spending_limit_spends.len(),
                        validator_token_changes = updates.validator_token_changes.len(),
                        user_token_changes = updates.user_token_changes.len(),
                        blacklist_additions = updates.blacklist_additions.len(),
                        whitelist_removals = updates.whitelist_removals.len(),
                        "Processing transaction invalidation events"
                    );
                    let evicted = pool.evict_invalidated_transactions(&updates);
                    for hash in &evicted {
                        state.untrack_expiry(hash);
                    }
                    metrics.transactions_invalidated.increment(evicted.len() as u64);
                    metrics
                        .invalidation_eviction_duration_seconds
                        .record(invalidation_start.elapsed());
                }

                // Periodic staleness check: transactions pending in two
                // consecutive snapshots are evicted.
                if state.pending_staleness.should_check(tip_timestamp) {
                    let current_pending: HashSet<TxHash> =
                        pool.pending_transactions().iter().map(|tx| *tx.hash()).collect();
                    let stale_to_evict =
                        state.pending_staleness.check_and_update(current_pending, tip_timestamp);

                    if !stale_to_evict.is_empty() {
                        debug!(
                            target: "txpool",
                            count = stale_to_evict.len(),
                            tip_timestamp,
                            "Evicting stale pending transactions"
                        );
                        for hash in &stale_to_evict {
                            state.untrack_expiry(hash);
                        }
                        pool.remove_transactions(stale_to_evict);
                    }
                }

                metrics.block_update_duration_seconds.record(block_update_start.elapsed());
            }
        }
    }
}
861
862fn evict_underpriced_transactions_for_t1<Pool>(pool: &Pool) -> usize
871where
872 Pool: TransactionPool,
873{
874 let all_txs = pool.all_transactions();
875 let t1_base_fee = TEMPO_T1_BASE_FEE as u128;
876
877 let underpriced_hashes: Vec<TxHash> = all_txs
878 .pending
879 .iter()
880 .chain(all_txs.queued.iter())
881 .filter(|tx| tx.max_fee_per_gas() < t1_base_fee)
882 .map(|tx| *tx.hash())
883 .collect();
884
885 let count = underpriced_hashes.len();
886 if count > 0 {
887 pool.remove_transactions(underpriced_hashes);
888 }
889
890 count
891}
892
893pub fn handle_reorg<F>(
900 old_chain: Arc<Chain<TempoPrimitives>>,
901 new_chain: Arc<Chain<TempoPrimitives>>,
902 is_in_pool: F,
903) -> (Vec<TempoPooledTransaction>, HashSet<AASequenceId>)
904where
905 F: Fn(&TxHash) -> bool,
906{
907 let (new_blocks, _) = new_chain.inner();
909 let (old_blocks, _) = old_chain.inner();
910
911 let new_mined_hashes: B256Set = new_blocks.transaction_hashes().collect();
913
914 let mut orphaned_txs = Vec::new();
915 let mut affected_seq_ids = HashSet::default();
916
917 for tx in old_blocks.transactions_ecrecovered() {
919 if new_mined_hashes.contains(tx.tx_hash()) {
921 continue;
922 }
923
924 let Some(aa_tx) = tx.as_aa() else {
925 continue;
926 };
927
928 if aa_tx.tx().nonce_key.is_zero() {
930 continue;
931 }
932
933 let seq_id = AASequenceId::new(tx.signer(), aa_tx.tx().nonce_key);
934
935 affected_seq_ids.insert(seq_id);
938
939 let pooled_tx = TempoPooledTransaction::new(tx);
940 if is_in_pool(pooled_tx.hash()) {
941 continue;
942 }
943
944 orphaned_txs.push(pooled_tx);
945 }
946
947 (orphaned_txs, affected_seq_ids)
948}
949
// Unit tests for the maintenance helpers: staleness tracking, expiry
// tracking, key-expiry tracking, reorg handling and update extraction.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::TxBuilder;
    use alloy_primitives::{Address, TxHash, U256};
    use reth_primitives_traits::RecoveredBlock;
    use reth_transaction_pool::PoolTransaction;
    use std::collections::HashSet;
    use tempo_primitives::{Block, BlockBody, TempoHeader, TempoTxEnvelope};

    // Tests for `PendingStalenessTracker`.
    mod pending_staleness_tracker_tests {
        use super::*;

        /// The first snapshot has nothing to compare against, so nothing is
        /// stale and the hash is remembered.
        #[test]
        fn no_eviction_on_first_snapshot() {
            let mut tracker = PendingStalenessTracker::new(100);
            let tx1 = TxHash::random();

            let stale = tracker.check_and_update([tx1].into_iter().collect(), 100);
            assert!(stale.is_empty());
            assert!(tracker.previous_pending.contains(&tx1));
        }

        /// A hash present in two consecutive snapshots is reported as stale
        /// and dropped from the stored snapshot; a new hash is kept.
        #[test]
        fn evicts_transactions_present_in_both_snapshots() {
            let mut tracker = PendingStalenessTracker::new(100);
            let tx_stale = TxHash::random();
            let tx_new = TxHash::random();

            tracker.check_and_update([tx_stale].into_iter().collect(), 0);

            let stale = tracker.check_and_update([tx_stale, tx_new].into_iter().collect(), 100);

            assert_eq!(stale.len(), 1);
            assert!(stale.contains(&tx_stale));

            assert!(tracker.previous_pending.contains(&tx_new));
            assert!(!tracker.previous_pending.contains(&tx_stale));
        }

        /// `should_check` is true before any snapshot, false inside the
        /// interval, and true again once the interval has elapsed.
        #[test]
        fn should_check_returns_false_before_interval_elapsed() {
            let mut tracker = PendingStalenessTracker::new(100);
            let tx = TxHash::random();

            assert!(tracker.should_check(0));
            tracker.check_and_update([tx].into_iter().collect(), 0);

            assert!(!tracker.should_check(50));
            assert_eq!(tracker.last_snapshot_time, Some(0));

            assert!(tracker.should_check(100));
        }

        /// Hashes that disappeared from the pending set are dropped from the
        /// snapshot, and the remaining stale hash is reported.
        #[test]
        fn removes_transactions_no_longer_pending_from_snapshot() {
            let mut tracker = PendingStalenessTracker::new(100);
            let tx1 = TxHash::random();
            let tx2 = TxHash::random();

            tracker.check_and_update([tx1, tx2].into_iter().collect(), 0);
            assert_eq!(tracker.previous_pending.len(), 2);

            let stale = tracker.check_and_update([tx1].into_iter().collect(), 100);
            assert_eq!(stale.len(), 1);
            assert!(stale.contains(&tx1));

            assert!(tracker.previous_pending.is_empty());
        }
    }

    /// `untrack_expiry` removes a hash from both expiry indexes, ignores
    /// unknown hashes, and drops the bucket once it is empty.
    #[test]
    fn test_remove_mined() {
        let mut state = TempoPoolState::default();
        let hash_a = TxHash::random();
        let hash_b = TxHash::random();
        let hash_unknown = TxHash::random();

        state.expiry_map.entry(1000).or_default().push(hash_a);
        state.tx_to_expiry.insert(hash_a, 1000);
        state.expiry_map.entry(1000).or_default().push(hash_b);
        state.tx_to_expiry.insert(hash_b, 1000);

        state.untrack_expiry(&hash_a);
        state.untrack_expiry(&hash_unknown);

        assert!(!state.tx_to_expiry.contains_key(&hash_a));
        assert_eq!(state.expiry_map[&1000], vec![hash_b]);

        state.untrack_expiry(&hash_b);
        assert!(!state.tx_to_expiry.contains_key(&hash_b));
        assert!(!state.expiry_map.contains_key(&1000));
    }

    // Tests for `KeyExpiryTracker`.
    mod key_expiry_tracker_tests {
        use super::*;

        /// Tracking one transaction populates all three indexes.
        #[test]
        fn tracks_single_key_single_tx() {
            let mut tracker = KeyExpiryTracker::default();
            let account = Address::random();
            let key_id = Address::random();
            let tx_hash = TxHash::random();
            let expiry = 1000;

            tracker.track(account, key_id, expiry, tx_hash);

            let key = KeyId { account, key_id };
            assert!(tracker.key_to_txs.contains_key(&key));
            assert!(tracker.expiry_map.contains_key(&expiry));
            assert_eq!(tracker.tx_to_key.get(&tx_hash), Some(&key));
        }

        /// Two transactions under the same key share one key entry.
        #[test]
        fn tracks_multiple_txs_for_same_key() {
            let mut tracker = KeyExpiryTracker::default();
            let account = Address::random();
            let key_id = Address::random();
            let expiry = 1000;
            let tx1 = TxHash::random();
            let tx2 = TxHash::random();

            tracker.track(account, key_id, expiry, tx1);
            tracker.track(account, key_id, expiry, tx2);

            let key = KeyId { account, key_id };
            let (_, txs) = tracker.key_to_txs.get(&key).unwrap();
            assert_eq!(txs.len(), 2);
            assert!(txs.contains(&tx1));
            assert!(txs.contains(&tx2));
            assert_eq!(tracker.tx_to_key.len(), 2);
        }

        /// Draining is inclusive of the expiry timestamp and clears every
        /// index for the expired key.
        #[test]
        fn drain_expired_returns_txs_for_expired_keys() {
            let mut tracker = KeyExpiryTracker::default();
            let account = Address::random();
            let key_id = Address::random();
            let tx1 = TxHash::random();
            let tx2 = TxHash::random();

            tracker.track(account, key_id, 1000, tx1);
            tracker.track(account, key_id, 1000, tx2);

            let expired = tracker.drain_expired(999);
            assert!(expired.is_empty());

            let expired = tracker.drain_expired(1000);
            assert_eq!(expired.len(), 2);
            assert!(expired.contains(&tx1));
            assert!(expired.contains(&tx2));

            assert!(tracker.key_to_txs.is_empty());
            assert!(tracker.expiry_map.is_empty());
            assert!(tracker.tx_to_key.is_empty());
        }

        /// Keys with different expiries are drained independently.
        #[test]
        fn drain_expired_handles_multiple_keys_with_different_expiries() {
            let mut tracker = KeyExpiryTracker::default();
            let account = Address::random();
            let key1 = Address::random();
            let key2 = Address::random();
            let tx1 = TxHash::random();
            let tx2 = TxHash::random();

            tracker.track(account, key1, 1000, tx1);
            tracker.track(account, key2, 2000, tx2);

            let expired = tracker.drain_expired(1500);
            assert_eq!(expired.len(), 1);
            assert!(expired.contains(&tx1));

            let expired = tracker.drain_expired(2000);
            assert_eq!(expired.len(), 1);
            assert!(expired.contains(&tx2));
        }

        /// Untracking keeps the key while other transactions use it and
        /// removes the key together with its last transaction.
        #[test]
        fn remove_tx_cleans_up_tx_from_key() {
            let mut tracker = KeyExpiryTracker::default();
            let account = Address::random();
            let key_id = Address::random();
            let tx1 = TxHash::random();
            let tx2 = TxHash::random();

            tracker.track(account, key_id, 1000, tx1);
            tracker.track(account, key_id, 1000, tx2);

            tracker.untrack(&tx1);
            let key = KeyId { account, key_id };
            let (_, txs) = tracker.key_to_txs.get(&key).unwrap();
            assert_eq!(txs.len(), 1);
            assert!(txs.contains(&tx2));
            assert!(tracker.expiry_map.contains_key(&1000));
            assert!(!tracker.tx_to_key.contains_key(&tx1));
            assert!(tracker.tx_to_key.contains_key(&tx2));

            tracker.untrack(&tx2);
            assert!(!tracker.key_to_txs.contains_key(&key));
            assert!(!tracker.expiry_map.contains_key(&1000));
            assert!(tracker.tx_to_key.is_empty());
        }

        /// Untracking an unknown hash leaves the tracker untouched.
        #[test]
        fn remove_tx_ignores_unknown_hashes() {
            let mut tracker = KeyExpiryTracker::default();
            let account = Address::random();
            let key_id = Address::random();
            let tx = TxHash::random();
            let unknown = TxHash::random();

            tracker.track(account, key_id, 1000, tx);
            tracker.untrack(&unknown);

            let key = KeyId { account, key_id };
            assert!(tracker.key_to_txs.contains_key(&key));
            assert!(tracker.expiry_map.contains_key(&1000));
            assert_eq!(tracker.tx_to_key.len(), 1);
        }

        /// A transaction untracked before expiry is not returned by the drain.
        #[test]
        fn remove_tx_then_drain_expired() {
            let mut tracker = KeyExpiryTracker::default();
            let account = Address::random();
            let key_id = Address::random();
            let tx1 = TxHash::random();
            let tx2 = TxHash::random();

            tracker.track(account, key_id, 1000, tx1);
            tracker.track(account, key_id, 1000, tx2);

            tracker.untrack(&tx1);
            let expired = tracker.drain_expired(1000);
            assert_eq!(expired.len(), 1);
            assert!(expired.contains(&tx2));

            assert!(tracker.key_to_txs.is_empty());
            assert!(tracker.expiry_map.is_empty());
            assert!(tracker.tx_to_key.is_empty());
        }
    }

    /// Wraps `blocks` in a chain with a default (empty) execution outcome.
    fn create_test_chain(
        blocks: Vec<reth_primitives_traits::RecoveredBlock<Block>>,
    ) -> Arc<Chain<TempoPrimitives>> {
        use reth_provider::{Chain, ExecutionOutcome};

        Arc::new(Chain::new(
            blocks,
            ExecutionOutcome::default(),
            Default::default(),
        ))
    }

    /// Builds a recovered block with the given number, transactions and
    /// senders; every other header and body field is left at its default.
    fn create_block_with_txs(
        block_number: u64,
        transactions: Vec<TempoTxEnvelope>,
        senders: Vec<Address>,
    ) -> RecoveredBlock<Block> {
        let header = TempoHeader {
            inner: alloy_consensus::Header {
                number: block_number,
                ..Default::default()
            },
            ..Default::default()
        };
        let body = BlockBody {
            transactions,
            ..Default::default()
        };
        let block = Block::new(header, body);
        RecoveredBlock::new_unhashed(block, senders)
    }

    /// Extracts the transaction envelope from a pooled transaction.
    fn extract_envelope(tx: &crate::transaction::TempoPooledTransaction) -> TempoTxEnvelope {
        tx.inner().clone().into_inner()
    }

    /// The old block holds five transactions: an orphaned 2D tx, a 2D tx that
    /// is re-included in the new chain, a 2D tx already in the pool, an AA tx
    /// with nonce key zero, and an EIP-1559 tx. Only the first is returned as
    /// orphaned; the affected sequence ids cover the orphaned and in-pool ones.
    #[test]
    fn handle_reorg_correctly_identifies_orphaned_aa_2d_transactions() {
        let sender_2d = Address::random();

        let tx_2d_orphaned = TxBuilder::aa(sender_2d).nonce_key(U256::from(1)).build();
        let hash_2d_orphaned = *tx_2d_orphaned.hash();
        let envelope_2d_orphaned = extract_envelope(&tx_2d_orphaned);

        let tx_2d_reincluded = TxBuilder::aa(sender_2d).nonce_key(U256::from(2)).build();
        let envelope_2d_reincluded = extract_envelope(&tx_2d_reincluded);

        let tx_2d_in_pool = TxBuilder::aa(sender_2d).nonce_key(U256::from(3)).build();
        let hash_2d_in_pool = *tx_2d_in_pool.hash();
        let envelope_2d_in_pool = extract_envelope(&tx_2d_in_pool);

        let tx_non_2d = TxBuilder::aa(sender_2d).nonce_key(U256::ZERO).build();
        let envelope_non_2d = extract_envelope(&tx_non_2d);

        // NOTE(review): this uses `.build()` while `ignores_eip1559_transactions`
        // below uses `.build_eip1559()` — confirm which builder call is intended.
        let tx_eip1559 = TxBuilder::eip1559(Address::random()).build();
        let envelope_eip1559 = extract_envelope(&tx_eip1559);

        let old_block = create_block_with_txs(
            1,
            vec![
                envelope_2d_orphaned,
                envelope_2d_reincluded.clone(),
                envelope_2d_in_pool,
                envelope_non_2d,
                envelope_eip1559,
            ],
            vec![sender_2d; 5],
        );
        let old_chain = create_test_chain(vec![old_block]);

        let new_block = create_block_with_txs(1, vec![envelope_2d_reincluded], vec![sender_2d]);
        let new_chain = create_test_chain(vec![new_block]);

        let pool_hashes: HashSet<TxHash> = [hash_2d_in_pool].into_iter().collect();

        let (orphaned, affected_seq_ids) =
            handle_reorg(old_chain, new_chain, |hash| pool_hashes.contains(hash));

        assert_eq!(
            orphaned.len(),
            1,
            "Expected exactly 1 orphaned tx, got {}",
            orphaned.len()
        );
        assert_eq!(
            *orphaned[0].hash(),
            hash_2d_orphaned,
            "Wrong transaction was identified as orphaned"
        );

        assert_eq!(
            affected_seq_ids.len(),
            2,
            "Expected 2 affected seq_ids, got {}",
            affected_seq_ids.len()
        );
        assert!(
            affected_seq_ids.contains(&AASequenceId::new(sender_2d, U256::from(1))),
            "Should contain orphaned tx's seq_id (nonce_key=1)"
        );
        assert!(
            affected_seq_ids.contains(&AASequenceId::new(sender_2d, U256::from(3))),
            "Should contain in-pool tx's seq_id (nonce_key=3)"
        );
        assert!(
            !affected_seq_ids.contains(&AASequenceId::new(sender_2d, U256::from(2))),
            "Should NOT contain re-included tx's seq_id (nonce_key=2) - tx is in new chain"
        );
    }

    // Tests for the `spending_limit_spends` extraction in
    // `TempoPoolUpdates::from_chain`.
    mod from_chain_spending_limit_spends {
        use super::*;
        use alloy_signer_local::PrivateKeySigner;

        /// A keychain-signed AA transaction yields exactly one spend entry
        /// keyed by (account, key id, fee token).
        #[test]
        fn extracts_keychain_tx_spending_limit_spends() {
            let user_address = Address::random();
            let access_key_signer = PrivateKeySigner::random();
            let key_id = access_key_signer.address();
            let fee_token = Address::random();

            let keychain_tx = TxBuilder::aa(user_address)
                .fee_token(fee_token)
                .build_keychain(user_address, &access_key_signer);
            let envelope = extract_envelope(&keychain_tx);

            let block = create_block_with_txs(1, vec![envelope], vec![user_address]);
            let chain = create_test_chain(vec![block]);

            let updates = TempoPoolUpdates::from_chain(&chain);

            assert!(
                updates
                    .spending_limit_spends
                    .contains(user_address, key_id, fee_token),
                "Should contain the keychain tx's (account, key_id, fee_token)"
            );
            assert_eq!(updates.spending_limit_spends.len(), 1);
        }

        /// AA transactions without a keychain signature produce no entry.
        #[test]
        fn ignores_non_keychain_aa_transactions() {
            let sender = Address::random();
            let tx = TxBuilder::aa(sender).fee_token(Address::random()).build();
            let envelope = extract_envelope(&tx);

            let block = create_block_with_txs(1, vec![envelope], vec![sender]);
            let chain = create_test_chain(vec![block]);

            let updates = TempoPoolUpdates::from_chain(&chain);
            assert!(updates.spending_limit_spends.is_empty());
        }

        /// Non-AA (EIP-1559) transactions produce no entry.
        #[test]
        fn ignores_eip1559_transactions() {
            let sender = Address::random();
            let tx = TxBuilder::eip1559(Address::random()).build_eip1559();
            let envelope = extract_envelope(&tx);

            let block = create_block_with_txs(1, vec![envelope], vec![sender]);
            let chain = create_test_chain(vec![block]);

            let updates = TempoPoolUpdates::from_chain(&chain);
            assert!(updates.spending_limit_spends.is_empty());
        }

        /// With no fee token set on the transaction, a lookup with an
        /// arbitrary token address still matches — presumably the entry is
        /// stored as a wildcard; verify against `SpendingLimitUpdates`.
        #[test]
        fn uses_wildcard_fee_token_when_none_set() {
            let user_address = Address::random();
            let access_key_signer = PrivateKeySigner::random();
            let key_id = access_key_signer.address();

            let keychain_tx =
                TxBuilder::aa(user_address).build_keychain(user_address, &access_key_signer);
            let envelope = extract_envelope(&keychain_tx);

            let block = create_block_with_txs(1, vec![envelope], vec![user_address]);
            let chain = create_test_chain(vec![block]);

            let updates = TempoPoolUpdates::from_chain(&chain);

            assert!(updates.spending_limit_spends.contains(
                user_address,
                key_id,
                Address::random(),
            ));
        }

        /// A spend entry alone is enough to count as an invalidation event.
        #[test]
        fn has_invalidation_events_includes_spending_limit_spends() {
            let mut updates = TempoPoolUpdates::new();
            assert!(!updates.has_invalidation_events());

            updates.spending_limit_spends.insert(
                Address::random(),
                Address::random(),
                Some(Address::random()),
            );
            assert!(updates.has_invalidation_events());
        }
    }
}