1use crate::{
4 RevokedKeys, SpendingLimitUpdates, TempoTransactionPool,
5 metrics::TempoPoolMaintenanceMetrics,
6 paused::{PausedEntry, PausedFeeTokenPool},
7 transaction::TempoPooledTransaction,
8};
9use alloy_consensus::transaction::TxHashRef;
10use alloy_primitives::{
11 Address, TxHash,
12 map::{AddressMap, HashMap, HashSet},
13};
14use alloy_sol_types::SolEvent;
15use futures::StreamExt;
16use itertools::{Either, Itertools};
17use reth_chainspec::ChainSpecProvider;
18use reth_primitives_traits::AlloyBlockHeader;
19use reth_provider::{CanonStateNotification, CanonStateSubscriptions, Chain, HeaderProvider};
20use reth_storage_api::StateProviderFactory;
21use reth_transaction_pool::{PoolTransaction, TransactionPool};
22use std::{
23 collections::{BTreeMap, btree_map::Entry},
24 time::Instant,
25};
26use tempo_chainspec::TempoChainSpec;
27use tempo_contracts::precompiles::{IAccountKeychain, IFeeManager, ITIP20, ITIP403Registry};
28use tempo_precompiles::{
29 ACCOUNT_KEYCHAIN_ADDRESS, TIP_FEE_MANAGER_ADDRESS, TIP403_REGISTRY_ADDRESS,
30};
31use tempo_primitives::{TempoAddressExt, TempoHeader, TempoPrimitives};
32use tracing::{debug, error};
33
34const EVICTION_BUFFER_SECS: u64 = 3;
37
38#[derive(Debug, Default)]
43pub struct TempoPoolUpdates {
44 pub expired_txs: Vec<TxHash>,
46 pub revoked_keys: RevokedKeys,
49 pub spending_limit_changes: SpendingLimitUpdates,
54 pub validator_token_changes: AddressMap<Address>,
58 pub user_token_changes: HashSet<Address>,
65 pub blacklist_additions: Vec<(u64, Address)>,
67 pub whitelist_removals: Vec<(u64, Address)>,
69 pub pause_events: Vec<(Address, bool)>,
71 pub transfer_policy_updates: HashSet<Address>,
75 pub fee_balance_changes: AddressMap<HashSet<Address>>,
80 pub spending_limit_spends: SpendingLimitUpdates,
88}
89
90impl TempoPoolUpdates {
91 pub fn new() -> Self {
93 Self::default()
94 }
95
96 pub fn is_empty(&self) -> bool {
98 self.expired_txs.is_empty()
99 && self.revoked_keys.is_empty()
100 && self.spending_limit_changes.is_empty()
101 && self.validator_token_changes.is_empty()
102 && self.user_token_changes.is_empty()
103 && self.blacklist_additions.is_empty()
104 && self.whitelist_removals.is_empty()
105 && self.pause_events.is_empty()
106 && self.transfer_policy_updates.is_empty()
107 && self.fee_balance_changes.is_empty()
108 && self.spending_limit_spends.is_empty()
109 }
110
111 pub fn from_chain(chain: &Chain<TempoPrimitives>) -> Self {
116 let mut updates = Self::new();
117
118 for log in chain
120 .execution_outcome()
121 .receipts()
122 .iter()
123 .flatten()
124 .flat_map(|receipt| &receipt.logs)
125 {
126 if log.address == ACCOUNT_KEYCHAIN_ADDRESS {
128 if let Ok(event) = IAccountKeychain::KeyRevoked::decode_log(log) {
129 updates.revoked_keys.insert(event.account, event.publicKey);
130 } else if let Ok(event) = IAccountKeychain::SpendingLimitUpdated::decode_log(log) {
131 updates.spending_limit_changes.insert(
132 event.account,
133 event.publicKey,
134 Some(event.token),
135 );
136 } else if let Ok(event) = IAccountKeychain::AccessKeySpend::decode_log(log) {
137 updates.spending_limit_spends.insert(
138 event.account,
139 event.publicKey,
140 Some(event.token),
141 );
142 }
143 }
144 else if log.address == TIP_FEE_MANAGER_ADDRESS {
146 if let Ok(event) = IFeeManager::ValidatorTokenSet::decode_log(log) {
147 updates
148 .validator_token_changes
149 .insert(event.validator, event.token);
150 } else if let Ok(event) = IFeeManager::UserTokenSet::decode_log(log) {
151 updates.user_token_changes.insert(event.user);
152 }
153 }
154 else if log.address == TIP403_REGISTRY_ADDRESS {
156 if let Ok(event) = ITIP403Registry::BlacklistUpdated::decode_log(log)
157 && event.restricted
158 {
159 updates
160 .blacklist_additions
161 .push((event.policyId, event.account));
162 } else if let Ok(event) = ITIP403Registry::WhitelistUpdated::decode_log(log)
163 && !event.allowed
164 {
165 updates
166 .whitelist_removals
167 .push((event.policyId, event.account));
168 }
169 }
170 else if log.address.is_tip20() {
172 if let Ok(event) = ITIP20::PauseStateUpdate::decode_log(log) {
173 updates.pause_events.push((log.address, event.isPaused));
174 } else if ITIP20::TransferPolicyUpdate::decode_log(log).is_ok() {
175 updates.transfer_policy_updates.insert(log.address);
176 } else if let Ok(event) = ITIP20::Transfer::decode_log(log) {
177 updates
178 .fee_balance_changes
179 .entry(log.address)
180 .or_default()
181 .insert(event.from);
182 }
183 }
184 }
185
186 updates
187 }
188
189 pub fn has_invalidation_events(&self) -> bool {
191 !self.revoked_keys.is_empty()
192 || !self.spending_limit_changes.is_empty()
193 || !self.spending_limit_spends.is_empty()
194 || !self.validator_token_changes.is_empty()
195 || !self.user_token_changes.is_empty()
196 || !self.blacklist_additions.is_empty()
197 || !self.whitelist_removals.is_empty()
198 || !self.fee_balance_changes.is_empty()
199 }
200}
201
202#[derive(Default)]
210struct TempoPoolState {
211 expiry_map: BTreeMap<u64, Vec<TxHash>>,
213 tx_to_expiry: HashMap<TxHash, u64>,
215 paused_pool: PausedFeeTokenPool,
217 pending_staleness: PendingStalenessTracker,
219}
220
221impl TempoPoolState {
222 fn track(&mut self, tx: &TempoPooledTransaction) {
224 let valid_before = tx
225 .inner()
226 .as_aa()
227 .and_then(|tx| tx.tx().valid_before.map(|value| value.get()));
228 let key_expiry = tx.key_expiry();
229
230 let expiry = [valid_before, key_expiry].into_iter().flatten().min();
231
232 if let Some(expiry) = expiry {
233 self.expiry_map.entry(expiry).or_default().push(*tx.hash());
234 self.tx_to_expiry.insert(*tx.hash(), expiry);
235 }
236 }
237
238 fn untrack(&mut self, hash: &TxHash) {
240 if let Some(expiry) = self.tx_to_expiry.remove(hash)
241 && let Entry::Occupied(mut entry) = self.expiry_map.entry(expiry)
242 {
243 entry.get_mut().retain(|h| *h != *hash);
244 if entry.get().is_empty() {
245 entry.remove();
246 }
247 }
248 }
249
250 fn drain_expired(&mut self, tip_timestamp: u64) -> Vec<TxHash> {
253 let mut expired = Vec::new();
254 while let Some(entry) = self.expiry_map.first_entry()
255 && *entry.key() <= tip_timestamp
256 {
257 let expired_hashes = entry.remove();
258 for tx_hash in &expired_hashes {
259 self.tx_to_expiry.remove(tx_hash);
260 }
261 expired.extend(expired_hashes);
262 }
263 expired
264 }
265}
266
267const DEFAULT_PENDING_STALENESS_INTERVAL: u64 = 30 * 60;
270
271#[derive(Debug)]
278struct PendingStalenessTracker {
279 previous_pending: HashSet<TxHash>,
281 last_snapshot_time: Option<u64>,
283 interval_secs: u64,
285}
286
287impl PendingStalenessTracker {
288 fn new(interval_secs: u64) -> Self {
290 Self {
291 previous_pending: HashSet::default(),
292 last_snapshot_time: None,
293 interval_secs,
294 }
295 }
296
297 fn should_check(&self, now: u64) -> bool {
299 self.last_snapshot_time
300 .is_none_or(|last| now.saturating_sub(last) >= self.interval_secs)
301 }
302
303 fn check_and_update(&mut self, current_pending: HashSet<TxHash>, now: u64) -> Vec<TxHash> {
310 let previous_pending = std::mem::take(&mut self.previous_pending);
311
312 let (stale, next_pending): (Vec<TxHash>, HashSet<TxHash>) =
316 current_pending.into_iter().partition_map(|hash| {
317 if previous_pending.contains(&hash) {
318 Either::Left(hash)
319 } else {
320 Either::Right(hash)
321 }
322 });
323
324 self.previous_pending = next_pending;
325 self.last_snapshot_time = Some(now);
326
327 stale
328 }
329}
330
331impl Default for PendingStalenessTracker {
332 fn default() -> Self {
333 Self::new(DEFAULT_PENDING_STALENESS_INTERVAL)
334 }
335}
336
337pub async fn maintain_tempo_pool<Client>(pool: TempoTransactionPool<Client>)
350where
351 Client: StateProviderFactory
352 + HeaderProvider<Header = TempoHeader>
353 + ChainSpecProvider<ChainSpec = TempoChainSpec>
354 + CanonStateSubscriptions<Primitives = TempoPrimitives>
355 + 'static,
356{
357 let mut state = TempoPoolState::default();
358 let metrics = TempoPoolMaintenanceMetrics::default();
359
360 let mut new_txs = pool.new_transactions_listener();
362 let mut chain_events = pool.client().canonical_state_stream();
363
364 let all_txs = pool.all_transactions();
366 for tx in all_txs.pending.iter().chain(all_txs.queued.iter()) {
367 state.track(&tx.transaction);
368 }
369
370 let amm_cache = pool.amm_liquidity_cache();
371
372 loop {
373 tokio::select! {
374 tx_event = new_txs.recv() => {
376 let Some(tx_event) = tx_event else {
377 break;
378 };
379
380 state.track(&tx_event.transaction.transaction);
381 }
382
383 Some(event) = chain_events.next() => {
385 let new = match event {
386 CanonStateNotification::Reorg { old: _, new } => {
387 if let Err(err) = amm_cache.repopulate(pool.client()) {
390 error!(target: "txpool", ?err, "AMM liquidity cache repopulate after reorg failed");
391 }
392
393 new
394 }
395 CanonStateNotification::Commit { new } => new,
396 };
397
398 let block_update_start = Instant::now();
399
400 let tip = &new;
401 let bundle_state = tip.execution_outcome().state().state();
402 let tip_timestamp = tip.tip().header().timestamp();
403
404 let mut updates = TempoPoolUpdates::from_chain(tip);
406
407 tip.blocks_iter()
409 .flat_map(|block| block.body().transactions())
410 .for_each(|tx| {
411 state.untrack(tx.tx_hash())
412 });
413
414 let max_expiry = tip_timestamp.saturating_add(EVICTION_BUFFER_SECS);
417
418 let expired = state.drain_expired(max_expiry);
420 updates.expired_txs = expired.into_iter().filter(|h| pool.contains(h)).collect();
421
422 let expired_start = Instant::now();
424 let expired_count = updates.expired_txs.len();
425 if expired_count > 0 {
426 debug!(
427 target: "txpool",
428 count = expired_count,
429 tip_timestamp,
430 "Evicting expired AA transactions (valid_before)"
431 );
432 pool.remove_transactions(updates.expired_txs.clone());
433 metrics.expired_transactions_evicted.increment(expired_count as u64);
434 }
435 metrics.expired_eviction_duration_seconds.record(expired_start.elapsed());
436
437 let pause_start = Instant::now();
439
440 let pause_tokens: Vec<Address> = updates
444 .pause_events
445 .iter()
446 .filter_map(|(token, is_paused)| is_paused.then_some(*token))
447 .collect();
448
449 if !pause_tokens.is_empty() {
452 let all_txs = pool.all_transactions();
453
454 let mut by_token: AddressMap<Vec<TxHash>> = AddressMap::default();
457 for tx in all_txs.pending.iter().chain(all_txs.queued.iter()) {
458 if let Some(fee_token) = tx.transaction.inner().fee_token() {
459 by_token.entry(fee_token).or_default().push(*tx.hash());
460 }
461 }
462
463 for token in pause_tokens {
465 let Some(hashes_to_pause) = by_token.remove(&token) else {
466 continue;
468 };
469
470 let removed_txs = pool.remove_transactions(hashes_to_pause);
471 let count = removed_txs.len();
472
473 if count > 0 {
474 for tx in &removed_txs {
476 state.untrack(tx.hash());
477 }
478
479 let entries: Vec<_> = removed_txs
480 .into_iter()
481 .map(|tx| {
482 let valid_before = tx
483 .transaction
484 .inner()
485 .as_aa()
486 .and_then(|aa| aa.tx().valid_before.map(|value| value.get()));
487 PausedEntry { tx, valid_before }
488 })
489 .collect();
490
491 let cap_evicted = state.paused_pool.insert_batch(token, entries);
492 metrics.transactions_paused.increment(count as u64);
493 if cap_evicted > 0 {
494 metrics.paused_pool_cap_evicted.increment(cap_evicted as u64);
495 debug!(
496 target: "txpool",
497 cap_evicted,
498 "Evicted oldest paused transactions due to global cap"
499 );
500 }
501 debug!(
502 target: "txpool",
503 %token,
504 count,
505 "Moved transactions to paused pool (fee token paused)"
506 );
507 }
508 }
509 }
510
511 for (token, is_paused) in &updates.pause_events {
513 if *is_paused {
514 continue; }
516
517 let paused_entries = state.paused_pool.drain_token(token);
519 if !paused_entries.is_empty() {
520 let count = paused_entries.len();
521 metrics.transactions_unpaused.increment(count as u64);
522 let pool_clone = pool.clone();
523 let token = *token;
524 tokio::spawn(async move {
525 let txs: Vec<_> = paused_entries
526 .into_iter()
527 .map(|e| e.tx.transaction.clone())
528 .collect();
529
530 let results = pool_clone
531 .add_external_transactions(txs)
532 .await;
533
534 let success = results.iter().filter(|r| r.is_ok()).count();
535 debug!(
536 target: "txpool",
537 %token,
538 total = count,
539 success,
540 "Restored transactions from paused pool (fee token unpaused)"
541 );
542 });
543 }
544 }
545
546 let paused_expired = state.paused_pool.evict_expired(tip_timestamp);
548 let paused_timed_out = state.paused_pool.evict_timed_out();
549 let total_paused_evicted = paused_expired + paused_timed_out;
550 if total_paused_evicted > 0 {
551 debug!(
552 target: "txpool",
553 count = total_paused_evicted,
554 tip_timestamp,
555 "Evicted expired transactions from paused pool"
556 );
557 }
558
559 if !updates.revoked_keys.is_empty()
561 || !updates.spending_limit_changes.is_empty()
562 || !updates.spending_limit_spends.is_empty()
563 {
564 state.paused_pool.evict_invalidated(
565 &updates.revoked_keys,
566 &updates.spending_limit_changes,
567 &updates.spending_limit_spends,
568 );
569 }
570 metrics.pause_events_duration_seconds.record(pause_start.elapsed());
571
572 if !updates.transfer_policy_updates.is_empty() {
577 let all_txs = pool.all_transactions();
578 let hashes: Vec<TxHash> = all_txs
579 .pending
580 .iter()
581 .chain(all_txs.queued.iter())
582 .filter(|tx| {
583 tx.transaction
584 .resolved_fee_token()
585 .is_some_and(|t| updates.transfer_policy_updates.contains(&t))
586 })
587 .map(|tx| *tx.hash())
588 .collect();
589
590 if !hashes.is_empty() {
591 let removed_txs = pool.remove_transactions(hashes);
592 let count = removed_txs.len();
593
594 for tx in &removed_txs {
595 state.untrack(tx.hash());
596 }
597
598 metrics
599 .transfer_policy_revalidated
600 .increment(count as u64);
601
602 let pool_clone = pool.clone();
603 tokio::spawn(async move {
604 let txs: Vec<_> = removed_txs
605 .into_iter()
606 .map(|tx| (tx.origin, tx.transaction.clone()))
607 .collect();
608
609 let results =
610 pool_clone.add_transactions_with_origins(txs).await;
611
612 let success =
613 results.iter().filter(|r| r.is_ok()).count();
614 debug!(
615 target: "txpool",
616 total = count,
617 success,
618 "Re-validated transactions after transfer policy update"
619 );
620 });
621 }
622 }
623
624 let nonce_pool_start = Instant::now();
627 pool.notify_aa_pool_on_state_updates(bundle_state);
628 metrics.nonce_pool_update_duration_seconds.record(nonce_pool_start.elapsed());
629
630 let amm_start = Instant::now();
632 amm_cache.on_new_state(tip.execution_outcome());
633 if let Err(err) = amm_cache.on_new_blocks(tip.blocks_iter().map(|block| block.sealed_header()), pool.client()) {
634 error!(target: "txpool", ?err, "AMM liquidity cache update failed");
635 }
636 metrics.amm_cache_update_duration_seconds.record(amm_start.elapsed());
637
638 if updates.has_invalidation_events() {
643 let invalidation_start = Instant::now();
644 debug!(
645 target: "txpool",
646 revoked_keys = updates.revoked_keys.len(),
647 spending_limit_changes = updates.spending_limit_changes.len(),
648 spending_limit_spends = updates.spending_limit_spends.len(),
649 validator_token_changes = updates.validator_token_changes.len(),
650 user_token_changes = updates.user_token_changes.len(),
651 blacklist_additions = updates.blacklist_additions.len(),
652 whitelist_removals = updates.whitelist_removals.len(),
653 "Processing transaction invalidation events"
654 );
655 let evicted = pool.evict_invalidated_transactions(&updates);
656 for hash in &evicted {
657 state.untrack(hash);
658 }
659 metrics.transactions_invalidated.increment(evicted.len() as u64);
660 metrics
661 .invalidation_eviction_duration_seconds
662 .record(invalidation_start.elapsed());
663 }
664
665 if state.pending_staleness.should_check(tip_timestamp) {
669 let current_pending: HashSet<TxHash> =
670 pool.pending_transactions().iter().map(|tx| *tx.hash()).collect();
671 let stale_to_evict =
672 state.pending_staleness.check_and_update(current_pending, tip_timestamp);
673
674 if !stale_to_evict.is_empty() {
675 debug!(
676 target: "txpool",
677 count = stale_to_evict.len(),
678 tip_timestamp,
679 "Evicting stale pending transactions"
680 );
681 for hash in &stale_to_evict {
683 state.untrack(hash);
684 }
685 pool.remove_transactions(stale_to_evict);
686 }
687 }
688
689 metrics.block_update_duration_seconds.record(block_update_start.elapsed());
691 }
692 }
693 }
694}
695
696#[cfg(test)]
697mod tests {
698 use super::*;
699 use crate::test_utils::TxBuilder;
700 use alloy_primitives::{Address, TxHash};
701 use reth_primitives_traits::RecoveredBlock;
702 use std::sync::Arc;
703 use tempo_primitives::{Block, BlockBody, TempoHeader, TempoTxEnvelope};
704
705 mod pending_staleness_tracker_tests {
706 use super::*;
707
708 #[test]
709 fn no_eviction_on_first_snapshot() {
710 let mut tracker = PendingStalenessTracker::new(100);
711 let tx1 = TxHash::random();
712
713 let stale = tracker.check_and_update([tx1].into_iter().collect(), 100);
715 assert!(stale.is_empty());
716 assert!(tracker.previous_pending.contains(&tx1));
717 }
718
719 #[test]
720 fn evicts_transactions_present_in_both_snapshots() {
721 let mut tracker = PendingStalenessTracker::new(100);
722 let tx_stale = TxHash::random();
723 let tx_new = TxHash::random();
724
725 tracker.check_and_update([tx_stale].into_iter().collect(), 0);
727
728 let stale = tracker.check_and_update([tx_stale, tx_new].into_iter().collect(), 100);
730
731 assert_eq!(stale.len(), 1);
733 assert!(stale.contains(&tx_stale));
734
735 assert!(tracker.previous_pending.contains(&tx_new));
737 assert!(!tracker.previous_pending.contains(&tx_stale));
739 }
740
741 #[test]
742 fn should_check_returns_false_before_interval_elapsed() {
743 let mut tracker = PendingStalenessTracker::new(100);
744 let tx = TxHash::random();
745
746 assert!(tracker.should_check(0));
748 tracker.check_and_update([tx].into_iter().collect(), 0);
749
750 assert!(!tracker.should_check(50));
752 assert_eq!(tracker.last_snapshot_time, Some(0));
753
754 assert!(tracker.should_check(100));
756 }
757
758 #[test]
759 fn removes_transactions_no_longer_pending_from_snapshot() {
760 let mut tracker = PendingStalenessTracker::new(100);
761 let tx1 = TxHash::random();
762 let tx2 = TxHash::random();
763
764 tracker.check_and_update([tx1, tx2].into_iter().collect(), 0);
766 assert_eq!(tracker.previous_pending.len(), 2);
767
768 let stale = tracker.check_and_update([tx1].into_iter().collect(), 100);
771 assert_eq!(stale.len(), 1);
772 assert!(stale.contains(&tx1));
773
774 assert!(tracker.previous_pending.is_empty());
776 }
777 }
778
779 #[test]
780 fn test_remove_mined() {
781 let mut state = TempoPoolState::default();
782 let hash_a = TxHash::random();
783 let hash_b = TxHash::random();
784 let hash_unknown = TxHash::random();
785
786 state.expiry_map.entry(1000).or_default().push(hash_a);
788 state.tx_to_expiry.insert(hash_a, 1000);
789 state.expiry_map.entry(1000).or_default().push(hash_b);
790 state.tx_to_expiry.insert(hash_b, 1000);
791
792 state.untrack(&hash_a);
794 state.untrack(&hash_unknown);
795
796 assert!(!state.tx_to_expiry.contains_key(&hash_a));
798 assert_eq!(state.expiry_map[&1000], vec![hash_b]);
799
800 state.untrack(&hash_b);
802 assert!(!state.tx_to_expiry.contains_key(&hash_b));
803 assert!(!state.expiry_map.contains_key(&1000));
804 }
805
806 fn create_test_chain(
807 blocks: Vec<reth_primitives_traits::RecoveredBlock<Block>>,
808 ) -> Arc<Chain<TempoPrimitives>> {
809 create_test_chain_with_receipts(blocks, Vec::new())
810 }
811
812 fn create_test_chain_with_receipts(
813 blocks: Vec<reth_primitives_traits::RecoveredBlock<Block>>,
814 receipts: Vec<Vec<tempo_primitives::TempoReceipt>>,
815 ) -> Arc<Chain<TempoPrimitives>> {
816 use reth_provider::{Chain, ExecutionOutcome};
817
818 Arc::new(Chain::new(
819 blocks,
820 ExecutionOutcome {
821 receipts,
822 ..Default::default()
823 },
824 Default::default(),
825 ))
826 }
827
828 fn create_block_with_txs(
830 block_number: u64,
831 transactions: Vec<TempoTxEnvelope>,
832 senders: Vec<Address>,
833 ) -> RecoveredBlock<Block> {
834 let header = TempoHeader {
835 inner: alloy_consensus::Header {
836 number: block_number,
837 ..Default::default()
838 },
839 ..Default::default()
840 };
841 let body = BlockBody {
842 transactions,
843 ..Default::default()
844 };
845 let block = Block::new(header, body);
846 RecoveredBlock::new_unhashed(block, senders)
847 }
848
849 fn extract_envelope(tx: &crate::transaction::TempoPooledTransaction) -> TempoTxEnvelope {
851 tx.inner().clone().into_inner()
852 }
853
854 mod from_chain_spending_limit_spends {
855 use super::*;
856 use alloy_primitives::{IntoLogData, Log, U256};
857 use alloy_signer_local::PrivateKeySigner;
858 use tempo_primitives::{TempoReceipt, TempoTxType};
859
860 #[test]
863 fn extracts_access_key_spend_events() {
864 let user_address = Address::random();
865 let access_key_signer = PrivateKeySigner::random();
866 let key_id = access_key_signer.address();
867 let fee_token = Address::random();
868 let spent_token = Address::random();
869
870 let keychain_tx = TxBuilder::aa(user_address)
871 .fee_token(fee_token)
872 .build_keychain(user_address, &access_key_signer);
873 let envelope = extract_envelope(&keychain_tx);
874
875 let spend_log = alloy_primitives::Log::new_from_event_unchecked(
876 ACCOUNT_KEYCHAIN_ADDRESS,
877 IAccountKeychain::AccessKeySpend {
878 account: user_address,
879 publicKey: key_id,
880 token: spent_token,
881 amount: U256::from(25),
882 remainingLimit: U256::from(75),
883 },
884 )
885 .reserialize();
886 let receipt = tempo_primitives::TempoReceipt {
887 tx_type: tempo_primitives::TempoTxType::AA,
888 success: true,
889 cumulative_gas_used: 1,
890 logs: vec![spend_log],
891 };
892
893 let block = create_block_with_txs(1, vec![envelope], vec![user_address]);
894 let chain = create_test_chain_with_receipts(vec![block], vec![vec![receipt]]);
895
896 let updates = TempoPoolUpdates::from_chain(&chain);
897
898 assert!(
899 updates
900 .spending_limit_spends
901 .contains(user_address, key_id, spent_token),
902 "Should contain the AccessKeySpend event's (account, key_id, token)"
903 );
904 assert!(
905 !updates
906 .spending_limit_spends
907 .contains(user_address, key_id, fee_token),
908 "Should not infer spends from the tx fee token"
909 );
910 assert_eq!(updates.spending_limit_spends.len(), 1);
911 }
912
913 #[test]
916 fn ignores_keychain_transactions_without_access_key_spend_logs() {
917 let user_address = Address::random();
918 let access_key_signer = PrivateKeySigner::random();
919 let fee_token = Address::random();
920
921 let keychain_tx = TxBuilder::aa(user_address)
922 .fee_token(fee_token)
923 .build_keychain(user_address, &access_key_signer);
924 let envelope = extract_envelope(&keychain_tx);
925
926 let block = create_block_with_txs(1, vec![envelope], vec![user_address]);
927 let chain = create_test_chain(vec![block]);
928
929 let updates = TempoPoolUpdates::from_chain(&chain);
930 assert!(updates.spending_limit_spends.is_empty());
931 }
932
933 #[test]
935 fn ignores_non_keychain_aa_transactions() {
936 let sender = Address::random();
937 let tx = TxBuilder::aa(sender).fee_token(Address::random()).build();
938 let envelope = extract_envelope(&tx);
939
940 let block = create_block_with_txs(1, vec![envelope], vec![sender]);
941 let chain = create_test_chain(vec![block]);
942
943 let updates = TempoPoolUpdates::from_chain(&chain);
944 assert!(updates.spending_limit_spends.is_empty());
945 }
946
947 #[test]
949 fn ignores_eip1559_transactions() {
950 let sender = Address::random();
951 let tx = TxBuilder::eip1559(Address::random()).build_eip1559();
952 let envelope = extract_envelope(&tx);
953
954 let block = create_block_with_txs(1, vec![envelope], vec![sender]);
955 let chain = create_test_chain(vec![block]);
956
957 let updates = TempoPoolUpdates::from_chain(&chain);
958 assert!(updates.spending_limit_spends.is_empty());
959 }
960
961 #[test]
963 fn has_invalidation_events_includes_spending_limit_spends() {
964 let mut updates = TempoPoolUpdates::new();
965 assert!(!updates.has_invalidation_events());
966
967 updates.spending_limit_spends.insert(
968 Address::random(),
969 Address::random(),
970 Some(Address::random()),
971 );
972 assert!(updates.has_invalidation_events());
973 }
974
975 #[test]
976 fn extracts_fee_balance_changes_from_tip20_transfer_logs() {
977 let fee_token = tempo_precompiles::PATH_USD_ADDRESS;
978 let from = Address::random();
979 let to = Address::random();
980 let amount = U256::from(42_u64);
981 let log_data = ITIP20::Transfer { from, to, amount }.into_log_data();
982 let log =
983 Log::new_unchecked(fee_token, log_data.topics().to_vec(), log_data.data.clone());
984 let receipt = TempoReceipt {
985 tx_type: TempoTxType::Legacy,
986 success: true,
987 cumulative_gas_used: 21_000,
988 logs: vec![log],
989 };
990
991 let block = create_block_with_txs(1, vec![], vec![]);
992 let chain = create_test_chain_with_receipts(vec![block], vec![vec![receipt]]);
993 let updates = TempoPoolUpdates::from_chain(&chain);
994
995 assert!(
996 updates
997 .fee_balance_changes
998 .get(&fee_token)
999 .is_some_and(|accounts| accounts.len() == 1 && accounts.contains(&from)),
1000 "TIP20 transfer logs should only mark the debited sender as balance-changed"
1001 );
1002 assert!(updates.has_invalidation_events());
1003 }
1004
1005 #[test]
1007 fn extracts_transfer_policy_updates() {
1008 let fee_token = tempo_precompiles::PATH_USD_ADDRESS;
1009 let updater = Address::random();
1010 let new_policy_id = 42u64;
1011 let log_data = ITIP20::TransferPolicyUpdate {
1012 updater,
1013 newPolicyId: new_policy_id,
1014 }
1015 .into_log_data();
1016 let log =
1017 Log::new_unchecked(fee_token, log_data.topics().to_vec(), log_data.data.clone());
1018 let receipt = TempoReceipt {
1019 tx_type: TempoTxType::Legacy,
1020 success: true,
1021 cumulative_gas_used: 21_000,
1022 logs: vec![log],
1023 };
1024
1025 let block = create_block_with_txs(1, vec![], vec![]);
1026 let chain = create_test_chain_with_receipts(vec![block], vec![vec![receipt]]);
1027 let updates = TempoPoolUpdates::from_chain(&chain);
1028
1029 assert!(
1030 updates.transfer_policy_updates.contains(&fee_token),
1031 "TransferPolicyUpdate should be tracked by token address"
1032 );
1033 }
1034
1035 #[test]
1037 fn transfer_policy_updates_deduplicates_by_token() {
1038 let fee_token = tempo_precompiles::PATH_USD_ADDRESS;
1039
1040 let log_data_1 = ITIP20::TransferPolicyUpdate {
1041 updater: Address::random(),
1042 newPolicyId: 1,
1043 }
1044 .into_log_data();
1045 let log_data_2 = ITIP20::TransferPolicyUpdate {
1046 updater: Address::random(),
1047 newPolicyId: 2,
1048 }
1049 .into_log_data();
1050 let log1 = Log::new_unchecked(
1051 fee_token,
1052 log_data_1.topics().to_vec(),
1053 log_data_1.data.clone(),
1054 );
1055 let log2 = Log::new_unchecked(
1056 fee_token,
1057 log_data_2.topics().to_vec(),
1058 log_data_2.data.clone(),
1059 );
1060 let receipt = TempoReceipt {
1061 tx_type: TempoTxType::Legacy,
1062 success: true,
1063 cumulative_gas_used: 21_000,
1064 logs: vec![log1, log2],
1065 };
1066
1067 let block = create_block_with_txs(1, vec![], vec![]);
1068 let chain = create_test_chain_with_receipts(vec![block], vec![vec![receipt]]);
1069 let updates = TempoPoolUpdates::from_chain(&chain);
1070
1071 assert_eq!(
1072 updates.transfer_policy_updates.len(),
1073 1,
1074 "duplicate policy updates for the same token should be deduplicated"
1075 );
1076 }
1077
1078 #[test]
1080 fn validator_token_changes_deduplicates_by_validator() {
1081 let validator = Address::random();
1082 let token_a = Address::random();
1083 let token_b = Address::random();
1084
1085 let mut updates = TempoPoolUpdates::new();
1086 updates.validator_token_changes.insert(validator, token_a);
1087 updates.validator_token_changes.insert(validator, token_b);
1088
1089 assert_eq!(
1090 updates.validator_token_changes.len(),
1091 1,
1092 "duplicate validator entries must be deduplicated"
1093 );
1094 assert_eq!(
1095 updates.validator_token_changes.get(&validator).copied(),
1096 Some(token_b),
1097 "last-write-wins: second token should overwrite the first"
1098 );
1099 }
1100 }
1101}