1use crate::{
4 RevokedKeys, SpendingLimitUpdates, TempoTransactionPool,
5 metrics::TempoPoolMaintenanceMetrics,
6 paused::{PausedEntry, PausedFeeTokenPool},
7 transaction::TempoPooledTransaction,
8};
9use alloy_primitives::{
10 Address, B256, Log, TxHash,
11 map::{AddressMap, AddressSet, B256Map, B256Set},
12};
13use alloy_sol_types::SolEvent;
14use futures::StreamExt;
15use itertools::{Either, Itertools};
16use reth_chainspec::ChainSpecProvider;
17use reth_primitives_traits::AlloyBlockHeader;
18use reth_provider::{CanonStateNotification, CanonStateSubscriptions, Chain, HeaderProvider};
19use reth_storage_api::StateProviderFactory;
20use reth_transaction_pool::{AllPoolTransactions, PoolTransaction, TransactionPool};
21use std::{
22 collections::{BTreeMap, btree_map::Entry},
23 time::Instant,
24};
25use tempo_chainspec::TempoChainSpec;
26use tempo_contracts::precompiles::{IAccountKeychain, IFeeManager, ITIP20, ITIP403Registry};
27use tempo_precompiles::{
28 ACCOUNT_KEYCHAIN_ADDRESS, TIP_FEE_MANAGER_ADDRESS, TIP403_REGISTRY_ADDRESS,
29};
30use tempo_primitives::{TempoAddressExt, TempoHeader, TempoPrimitives};
31use tracing::{debug, error};
32
/// Number of seconds added to the tip timestamp when selecting tracked
/// transactions to evict as expired.
///
/// The maintenance loop drains every tracked transaction whose expiry is at or
/// before `tip_timestamp + EVICTION_BUFFER_SECS`, i.e. transactions are evicted
/// this many seconds before the tip timestamp actually reaches their expiry.
const EVICTION_BUFFER_SECS: u64 = 3;
36
/// Maximum number of new-transaction events received per `recv_many` call in
/// the maintenance loop; also used as the initial capacity of the reusable
/// event buffer.
const NEW_TX_DRAIN_LIMIT: usize = 4096;
41
/// Pool-relevant state changes extracted from the receipt logs of a chain
/// segment (see [`TempoPoolUpdates::from_chain`]).
///
/// Each field accumulates one category of event; the maintenance task uses
/// them to pause, re-validate or evict pooled transactions.
#[derive(Debug, Default)]
pub struct TempoPoolUpdates {
    /// (account, key) pairs reported by `KeyRevoked` events.
    pub revoked_keys: RevokedKeys,
    /// (account, key) pairs reported by `KeyRevoked`, `KeyAuthorized` or
    /// `AdminKeyAuthorized` events.
    pub key_authorization_target_changes: RevokedKeys,
    /// (account, key, token) triples reported by `SpendingLimitUpdated` events.
    pub spending_limit_changes: SpendingLimitUpdates,
    /// Validator -> token, as reported by `ValidatorTokenSet` events. A later
    /// event for the same validator overwrites the earlier entry.
    pub validator_token_changes: AddressMap<Address>,
    /// Users that emitted a `UserTokenSet` event.
    pub user_token_changes: AddressSet,
    /// `(policy id, account)` pairs from `BlacklistUpdated` events whose
    /// `restricted` flag is `true`.
    pub blacklist_additions: Vec<(u64, Address)>,
    /// `(policy id, account)` pairs from `WhitelistUpdated` events whose
    /// `allowed` flag is `false`.
    pub whitelist_removals: Vec<(u64, Address)>,
    /// `(token, is_paused)` pairs from TIP-20 `PauseStateUpdate` events, in log
    /// order.
    pub pause_events: Vec<(Address, bool)>,
    /// TIP-20 tokens that emitted a `TransferPolicyUpdate` event.
    pub transfer_policy_updates: AddressSet,
    /// TIP-20 tokens that emitted a `QuoteTokenUpdate` event.
    pub quote_token_updates: AddressSet,
    /// TIP-20 token -> set of `from` addresses seen in its `Transfer` events.
    pub fee_balance_changes: AddressMap<AddressSet>,
    /// (account, key, token) triples reported by `AccessKeySpend` events.
    pub spending_limit_spends: SpendingLimitUpdates,
    /// Account -> set of witnesses reported by `KeyAuthorizationWitnessBurned`
    /// events.
    pub key_authorization_witness_burns: AddressMap<B256Set>,
}
105
106impl TempoPoolUpdates {
107 pub fn new() -> Self {
109 Self::default()
110 }
111
112 pub fn is_empty(&self) -> bool {
114 self.revoked_keys.is_empty()
115 && self.key_authorization_target_changes.is_empty()
116 && self.spending_limit_changes.is_empty()
117 && self.validator_token_changes.is_empty()
118 && self.user_token_changes.is_empty()
119 && self.blacklist_additions.is_empty()
120 && self.whitelist_removals.is_empty()
121 && self.pause_events.is_empty()
122 && self.transfer_policy_updates.is_empty()
123 && self.quote_token_updates.is_empty()
124 && self.fee_balance_changes.is_empty()
125 && self.spending_limit_spends.is_empty()
126 && self.key_authorization_witness_burns.is_empty()
127 }
128
129 pub fn from_chain(chain: &Chain<TempoPrimitives>) -> Self {
134 let mut updates = Self::new();
135
136 for log in chain
138 .execution_outcome()
139 .receipts()
140 .iter()
141 .flatten()
142 .flat_map(|receipt| &receipt.logs)
143 {
144 if log.address.is_tip20() {
149 match Tip20PoolEvent::decode(log) {
150 Some(Tip20PoolEvent::PauseStateUpdate(event)) => {
151 updates.pause_events.push((log.address, event.isPaused));
152 }
153 Some(Tip20PoolEvent::TransferPolicyUpdate) => {
154 updates.transfer_policy_updates.insert(log.address);
155 }
156 Some(Tip20PoolEvent::QuoteTokenUpdate) => {
157 updates.quote_token_updates.insert(log.address);
158 }
159 Some(Tip20PoolEvent::Transfer { from }) => {
160 updates
161 .fee_balance_changes
162 .entry(log.address)
163 .or_default()
164 .insert(from);
165 }
166 None => {}
167 }
168 }
169 else if log.address == ACCOUNT_KEYCHAIN_ADDRESS {
171 match AccountKeychainPoolEvent::decode(log) {
172 Some(AccountKeychainPoolEvent::KeyRevoked(event)) => {
173 updates.revoked_keys.insert(event.account, event.publicKey);
174 updates
175 .key_authorization_target_changes
176 .insert(event.account, event.publicKey);
177 }
178 Some(AccountKeychainPoolEvent::KeyAuthorized(event)) => {
179 updates
180 .key_authorization_target_changes
181 .insert(event.account, event.publicKey);
182 }
183 Some(AccountKeychainPoolEvent::AdminKeyAuthorized(event)) => {
184 updates
185 .key_authorization_target_changes
186 .insert(event.account, event.publicKey);
187 }
188 Some(AccountKeychainPoolEvent::SpendingLimitUpdated(event)) => {
189 updates.spending_limit_changes.insert(
190 event.account,
191 event.publicKey,
192 Some(event.token),
193 );
194 }
195 Some(AccountKeychainPoolEvent::AccessKeySpend(event)) => {
196 updates.spending_limit_spends.insert(
197 event.account,
198 event.publicKey,
199 Some(event.token),
200 );
201 }
202 Some(AccountKeychainPoolEvent::KeyAuthorizationWitnessBurned(event)) => {
203 updates
204 .key_authorization_witness_burns
205 .entry(event.account)
206 .or_default()
207 .insert(event.witness);
208 }
209 None => {}
210 }
211 }
212 else if log.address == TIP_FEE_MANAGER_ADDRESS {
214 match FeeManagerPoolEvent::decode(log) {
215 Some(FeeManagerPoolEvent::ValidatorTokenSet(event)) => {
216 updates
217 .validator_token_changes
218 .insert(event.validator, event.token);
219 }
220 Some(FeeManagerPoolEvent::UserTokenSet(event)) => {
221 updates.user_token_changes.insert(event.user);
222 }
223 None => {}
224 }
225 }
226 else if log.address == TIP403_REGISTRY_ADDRESS {
228 match Tip403PoolEvent::decode(log) {
229 Some(Tip403PoolEvent::BlacklistUpdated(event)) if event.restricted => {
230 updates
231 .blacklist_additions
232 .push((event.policyId, event.account));
233 }
234 Some(Tip403PoolEvent::WhitelistUpdated(event)) if !event.allowed => {
235 updates
236 .whitelist_removals
237 .push((event.policyId, event.account));
238 }
239 Some(_) | None => {}
240 }
241 }
242 }
243
244 updates
245 }
246
247 pub fn has_invalidation_events(&self) -> bool {
249 self.has_keychain_subject_updates()
250 || !self.key_authorization_target_changes.is_empty()
251 || !self.validator_token_changes.is_empty()
252 || !self.user_token_changes.is_empty()
253 || !self.blacklist_additions.is_empty()
254 || !self.whitelist_removals.is_empty()
255 || !self.fee_balance_changes.is_empty()
256 || !self.key_authorization_witness_burns.is_empty()
257 }
258
259 pub fn has_keychain_subject_updates(&self) -> bool {
261 !self.revoked_keys.is_empty()
262 || !self.spending_limit_changes.is_empty()
263 || !self.spending_limit_spends.is_empty()
264 }
265}
266
/// The subset of account keychain events the pool reacts to, each carrying the
/// fully decoded event.
enum AccountKeychainPoolEvent {
    /// A `KeyAuthorized` event.
    KeyAuthorized(IAccountKeychain::KeyAuthorized),
    /// An `AdminKeyAuthorized` event.
    AdminKeyAuthorized(IAccountKeychain::AdminKeyAuthorized),
    /// A `KeyRevoked` event.
    KeyRevoked(IAccountKeychain::KeyRevoked),
    /// A `SpendingLimitUpdated` event.
    SpendingLimitUpdated(IAccountKeychain::SpendingLimitUpdated),
    /// An `AccessKeySpend` event.
    AccessKeySpend(IAccountKeychain::AccessKeySpend),
    /// A `KeyAuthorizationWitnessBurned` event.
    KeyAuthorizationWitnessBurned(IAccountKeychain::KeyAuthorizationWitnessBurned),
}
282
283impl AccountKeychainPoolEvent {
284 fn decode(log: &Log) -> Option<Self> {
286 match first_topic(log)? {
287 IAccountKeychain::KeyAuthorized::SIGNATURE_HASH => {
288 decode_event(log).map(Self::KeyAuthorized)
289 }
290 IAccountKeychain::AdminKeyAuthorized::SIGNATURE_HASH => {
291 decode_event(log).map(Self::AdminKeyAuthorized)
292 }
293 IAccountKeychain::KeyRevoked::SIGNATURE_HASH => decode_event(log).map(Self::KeyRevoked),
294 IAccountKeychain::SpendingLimitUpdated::SIGNATURE_HASH => {
295 decode_event(log).map(Self::SpendingLimitUpdated)
296 }
297 IAccountKeychain::AccessKeySpend::SIGNATURE_HASH => {
298 decode_event(log).map(Self::AccessKeySpend)
299 }
300 IAccountKeychain::KeyAuthorizationWitnessBurned::SIGNATURE_HASH => {
301 decode_event(log).map(Self::KeyAuthorizationWitnessBurned)
302 }
303 _ => None,
304 }
305 }
306}
307
/// The subset of fee manager events the pool reacts to, each carrying the
/// fully decoded event.
enum FeeManagerPoolEvent {
    /// A `ValidatorTokenSet` event.
    ValidatorTokenSet(IFeeManager::ValidatorTokenSet),
    /// A `UserTokenSet` event.
    UserTokenSet(IFeeManager::UserTokenSet),
}
315
316impl FeeManagerPoolEvent {
317 fn decode(log: &Log) -> Option<Self> {
319 match first_topic(log)? {
320 IFeeManager::ValidatorTokenSet::SIGNATURE_HASH => {
321 decode_event(log).map(Self::ValidatorTokenSet)
322 }
323 IFeeManager::UserTokenSet::SIGNATURE_HASH => decode_event(log).map(Self::UserTokenSet),
324 _ => None,
325 }
326 }
327}
328
/// The subset of TIP-403 registry events the pool reacts to, each carrying the
/// fully decoded event.
enum Tip403PoolEvent {
    /// A `BlacklistUpdated` event (either direction; callers inspect
    /// `restricted`).
    BlacklistUpdated(ITIP403Registry::BlacklistUpdated),
    /// A `WhitelistUpdated` event (either direction; callers inspect
    /// `allowed`).
    WhitelistUpdated(ITIP403Registry::WhitelistUpdated),
}
336
337impl Tip403PoolEvent {
338 fn decode(log: &Log) -> Option<Self> {
340 match first_topic(log)? {
341 ITIP403Registry::BlacklistUpdated::SIGNATURE_HASH => {
342 decode_event(log).map(Self::BlacklistUpdated)
343 }
344 ITIP403Registry::WhitelistUpdated::SIGNATURE_HASH => {
345 decode_event(log).map(Self::WhitelistUpdated)
346 }
347 _ => None,
348 }
349 }
350}
351
/// The subset of TIP-20 token events the pool reacts to.
///
/// Only `PauseStateUpdate` keeps its decoded payload; the other variants keep
/// just what the pool needs.
enum Tip20PoolEvent {
    /// A fully decoded `PauseStateUpdate` event.
    PauseStateUpdate(ITIP20::PauseStateUpdate),
    /// A `TransferPolicyUpdate` event was emitted (payload discarded).
    TransferPolicyUpdate,
    /// A `QuoteTokenUpdate` event was emitted (payload discarded).
    QuoteTokenUpdate,
    /// A `Transfer` event; `from` is the address read from the log's second
    /// topic.
    Transfer { from: Address },
}
363
364impl Tip20PoolEvent {
365 fn decode(log: &Log) -> Option<Self> {
367 match first_topic(log)? {
368 ITIP20::Transfer::SIGNATURE_HASH => log.topics().get(1).map(|topic| Self::Transfer {
372 from: Address::from_word(*topic),
373 }),
374 ITIP20::PauseStateUpdate::SIGNATURE_HASH => {
375 decode_event(log).map(Self::PauseStateUpdate)
376 }
377 ITIP20::TransferPolicyUpdate::SIGNATURE_HASH => {
378 decode_event::<ITIP20::TransferPolicyUpdate>(log)
379 .map(|_| Self::TransferPolicyUpdate)
380 }
381 ITIP20::QuoteTokenUpdate::SIGNATURE_HASH => {
382 decode_event::<ITIP20::QuoteTokenUpdate>(log).map(|_| Self::QuoteTokenUpdate)
383 }
384 _ => None,
385 }
386 }
387}
388
389fn first_topic(log: &Log) -> Option<B256> {
390 log.topics().first().copied()
391}
392
393fn decode_event<T: SolEvent>(log: &Log) -> Option<T> {
396 T::decode_log(log).ok().map(|event| event.data)
397}
398
/// Bookkeeping owned by the pool maintenance task.
#[derive(Default)]
struct TempoPoolState {
    /// Expiry timestamp -> hashes of tracked transactions expiring at that
    /// timestamp. Ordered so the earliest expiries can be drained first.
    expiry_map: BTreeMap<u64, B256Set>,
    /// Reverse index: transaction hash -> the expiry it is tracked under in
    /// `expiry_map`.
    tx_to_expiry: B256Map<u64>,
    /// Holding area for transactions removed from the pool because their fee
    /// token was paused.
    paused_pool: PausedFeeTokenPool,
    /// Tracker used to detect transactions that stay pending across snapshots.
    pending_staleness: PendingStalenessTracker,
}
417
418impl TempoPoolState {
419 fn track(&mut self, tx: &TempoPooledTransaction) {
421 let valid_before = tx
422 .inner()
423 .as_aa()
424 .and_then(|tx| tx.tx().valid_before.map(|value| value.get()));
425 let key_expiry = tx.key_expiry();
426
427 let expiry = [valid_before, key_expiry].into_iter().flatten().min();
428
429 if let Some(expiry) = expiry {
430 self.expiry_map
431 .entry(expiry)
432 .or_default()
433 .insert(*tx.hash());
434 self.tx_to_expiry.insert(*tx.hash(), expiry);
435 }
436 }
437
438 fn untrack(&mut self, hash: &TxHash) {
440 if let Some(expiry) = self.tx_to_expiry.remove(hash)
441 && let Entry::Occupied(mut entry) = self.expiry_map.entry(expiry)
442 {
443 entry.get_mut().remove(hash);
444 if entry.get().is_empty() {
445 entry.remove();
446 }
447 }
448 }
449
450 fn untrack_many<'a>(&mut self, hashes: impl IntoIterator<Item = &'a TxHash>) {
457 if self.tx_to_expiry.is_empty() {
459 return;
460 }
461
462 let mut hashes_by_expiry: BTreeMap<u64, B256Set> = BTreeMap::new();
463
464 for hash in hashes {
465 if let Some(expiry) = self.tx_to_expiry.remove(hash) {
466 hashes_by_expiry.entry(expiry).or_default().insert(*hash);
467 }
468 }
469
470 for (expiry, hashes) in hashes_by_expiry {
471 if let Entry::Occupied(mut entry) = self.expiry_map.entry(expiry) {
472 let bucket = entry.get_mut();
473 for hash in hashes {
474 bucket.remove(&hash);
475 }
476 if bucket.is_empty() {
477 entry.remove();
478 }
479 }
480 }
481 }
482
483 fn drain_expired(&mut self, tip_timestamp: u64) -> Vec<TxHash> {
486 let mut expired = Vec::new();
487 while let Some(entry) = self.expiry_map.first_entry()
488 && *entry.key() <= tip_timestamp
489 {
490 let expired_hashes = entry.remove();
491 expired.reserve(expired_hashes.len());
492 for tx_hash in expired_hashes {
493 self.tx_to_expiry.remove(&tx_hash);
494 expired.push(tx_hash);
495 }
496 }
497 expired
498 }
499}
500
/// Default interval between pending-staleness snapshots: 30 minutes, expressed
/// in seconds. Used by `PendingStalenessTracker::default`.
const DEFAULT_PENDING_STALENESS_INTERVAL: u64 = 30 * 60;
504
/// Detects transactions that remain pending from one snapshot to the next.
///
/// A hash reported as pending in two consecutive snapshots is returned as
/// stale by `check_and_update`.
#[derive(Debug)]
struct PendingStalenessTracker {
    /// Hashes that were pending at the last snapshot and were not already
    /// present in the snapshot before it.
    previous_pending: B256Set,
    /// The `now` value passed to the most recent `check_and_update`, or `None`
    /// if no snapshot has been taken yet.
    last_snapshot_time: Option<u64>,
    /// Minimum time that must elapse between snapshots, in the same unit as
    /// the `now` values (seconds, going by the field name).
    interval_secs: u64,
}
520
521impl PendingStalenessTracker {
522 fn new(interval_secs: u64) -> Self {
524 Self {
525 previous_pending: B256Set::default(),
526 last_snapshot_time: None,
527 interval_secs,
528 }
529 }
530
531 fn should_check(&self, now: u64) -> bool {
533 self.last_snapshot_time
534 .is_none_or(|last| now.saturating_sub(last) >= self.interval_secs)
535 }
536
537 fn check_and_update(&mut self, current_pending: B256Set, now: u64) -> Vec<TxHash> {
544 let previous_pending = std::mem::take(&mut self.previous_pending);
545
546 let (stale, next_pending): (Vec<TxHash>, B256Set) =
550 current_pending.into_iter().partition_map(|hash| {
551 if previous_pending.contains(&hash) {
552 Either::Left(hash)
553 } else {
554 Either::Right(hash)
555 }
556 });
557
558 self.previous_pending = next_pending;
559 self.last_snapshot_time = Some(now);
560
561 stale
562 }
563}
564
565impl Default for PendingStalenessTracker {
566 fn default() -> Self {
567 Self::new(DEFAULT_PENDING_STALENESS_INTERVAL)
568 }
569}
570
/// Long-running maintenance task for the Tempo transaction pool.
///
/// The task multiplexes two event sources:
///
/// * newly added pool transactions, which are registered for expiry tracking;
/// * canonical chain notifications (commits and reorgs), on which the task
///   updates the AA nonce pool and AMM liquidity cache and then, in order:
///   evicts expired transactions, moves transactions of paused fee tokens into
///   the paused pool, restores transactions of unpaused fee tokens, prunes the
///   paused pool, re-validates transactions affected by transfer-policy or
///   quote-token updates, evicts invalidated transactions, and periodically
///   evicts stale pending transactions.
///
/// The future completes when the new-transaction listener yields no events
/// (presumably because the channel was closed).
pub async fn maintain_tempo_pool<Client>(pool: TempoTransactionPool<Client>)
where
    Client: StateProviderFactory
        + HeaderProvider<Header = TempoHeader>
        + ChainSpecProvider<ChainSpec = TempoChainSpec>
        + CanonStateSubscriptions<Primitives = TempoPrimitives>
        + 'static,
{
    let mut state = TempoPoolState::default();
    let metrics = TempoPoolMaintenanceMetrics::default();

    // Subscribe before taking the initial snapshot below, so transactions added
    // in between are seen by at least one of the two.
    let mut new_txs = pool.new_transactions_listener();
    let mut chain_events = pool.client().canonical_state_stream();

    // Seed expiry tracking with everything already in the pool.
    let all_txs = pool.all_transactions();
    for tx in all_txs.iter() {
        state.track(&tx.transaction);
    }

    let amm_cache = pool.amm_liquidity_cache();
    // Reused across iterations; `drain(..)` below keeps the allocation.
    let mut new_tx_events = Vec::with_capacity(NEW_TX_DRAIN_LIMIT);

    loop {
        tokio::select! {
            // Receive up to NEW_TX_DRAIN_LIMIT new-transaction events at once.
            n = new_txs.recv_many(&mut new_tx_events, NEW_TX_DRAIN_LIMIT) => {
                // Zero events received: stop the maintenance task
                // (presumably the listener channel is closed).
                if n == 0 {
                    break;
                }

                for tx_event in new_tx_events.drain(..) {
                    state.track(&tx_event.transaction.transaction);
                }
            }

            // If the stream ends (`None`), the pattern does not match and this
            // branch is skipped.
            Some(event) = chain_events.next() => {
                // Only the new chain segment is processed; on a reorg the old
                // segment is discarded after the AMM cache has been repopulated.
                let new = match event {
                    CanonStateNotification::Reorg { old: _, new } => {
                        if let Err(err) = amm_cache.repopulate(pool.client()) {
                            error!(target: "txpool", ?err, "AMM liquidity cache repopulate after reorg failed");
                        }

                        new
                    }
                    CanonStateNotification::Commit { new } => new,
                };

                let block_update_start = Instant::now();

                let tip = &new;
                let bundle_state = tip.execution_outcome().state().state();
                let tip_timestamp = tip.tip().header().timestamp();

                // Removed transactions are collected here and only dropped at
                // the very end, after the block update duration was recorded.
                let mut removed_txs: Vec<Vec<_>> = Vec::with_capacity(1);

                // 1. Let the AA pool react to the state changes of the new blocks.
                let nonce_pool_start = Instant::now();
                removed_txs.push(pool.notify_aa_pool_on_state_updates(bundle_state));
                metrics.nonce_pool_update_duration_seconds.record(nonce_pool_start.elapsed());

                // 2. Update the AMM liquidity cache; failures are logged, not fatal.
                let amm_start = Instant::now();
                amm_cache.on_new_state(tip.execution_outcome());
                if let Err(err) = amm_cache
                    .on_new_blocks(tip.blocks_iter().map(|block| block.sealed_header()), pool.client())
                {
                    error!(target: "txpool", ?err, "AMM liquidity cache update failed");
                }
                metrics.amm_cache_update_duration_seconds.record(amm_start.elapsed());

                // 3. Extract pool-relevant events from the receipt logs.
                let updates = TempoPoolUpdates::from_chain(tip);

                // Transactions included in the new blocks no longer need expiry tracking.
                state.untrack_many(tip.transaction_hashes());

                // Evict slightly ahead of time: anything expiring within
                // EVICTION_BUFFER_SECS of the tip timestamp counts as expired.
                let max_expiry = tip_timestamp.saturating_add(EVICTION_BUFFER_SECS);

                let expired_txs = state.drain_expired(max_expiry);

                // 4. Evict expired transactions.
                // NOTE(review): the log message below mentions only `valid_before`,
                // but the tracked expiry is the minimum of `valid_before` and the
                // key expiry (see `TempoPoolState::track`).
                let expired_start = Instant::now();
                if !expired_txs.is_empty() {
                    let evicted = pool.remove_transactions(expired_txs);
                    debug!(
                        target: "txpool",
                        count = evicted.len(),
                        tip_timestamp,
                        "Evicting expired AA transactions (valid_before)"
                    );
                    metrics.expired_transactions_evicted.increment(evicted.len() as u64);
                    removed_txs.push(evicted);
                }
                metrics.expired_eviction_duration_seconds.record(expired_start.elapsed());

                // Snapshot of all pool transactions, fetched lazily at most once
                // per notification and shared by the sections below. Because the
                // snapshot can be stale, hashes removed by an earlier section are
                // recorded in `removed_this_iteration` and filtered out by later
                // sections.
                let mut all_txs: Option<AllPoolTransactions<TempoPooledTransaction>> = None;
                let mut removed_this_iteration = B256Set::default();

                let pause_start = Instant::now();

                // 5. Fee tokens that were paused in this chain segment.
                let pause_tokens: Vec<Address> = updates
                    .pause_events
                    .iter()
                    .filter_map(|(token, is_paused)| is_paused.then_some(*token))
                    .collect();

                if !pause_tokens.is_empty() {
                    // Group the hashes of all pooled transactions by effective
                    // fee token in a single pass.
                    let mut by_token = {
                        let all_txs = all_txs.get_or_insert_with(|| pool.all_transactions());
                        all_txs.iter()
                            .filter(|tx| !removed_this_iteration.contains(tx.hash()))
                            .fold(
                                AddressMap::<Vec<TxHash>>::default(),
                                |mut by_token, tx| {
                                    by_token
                                        .entry(tx.transaction.effective_fee_token())
                                        .or_default()
                                        .push(*tx.hash());
                                    by_token
                                },
                            )
                    };

                    for token in pause_tokens {
                        // `remove` also makes a token that was paused more than
                        // once in this segment a no-op the second time.
                        let Some(hashes_to_pause) = by_token.remove(&token) else {
                            continue;
                        };

                        // Shadows the outer `removed_txs`: paused transactions are
                        // moved into the paused pool, not into the outer list.
                        let removed_txs = pool.remove_transactions(hashes_to_pause);
                        let count = removed_txs.len();

                        if count > 0 {
                            for tx in &removed_txs {
                                state.untrack(tx.hash());
                                removed_this_iteration.insert(*tx.hash());
                            }

                            // Keep `valid_before` alongside each paused transaction.
                            let entries: Vec<_> = removed_txs
                                .into_iter()
                                .map(|tx| {
                                    let valid_before = tx
                                        .transaction
                                        .inner()
                                        .as_aa()
                                        .and_then(|aa| aa.tx().valid_before.map(|value| value.get()));
                                    PausedEntry { tx, valid_before }
                                })
                                .collect();

                            let cap_evicted = state.paused_pool.insert_batch(token, entries);
                            metrics.transactions_paused.increment(count as u64);
                            if cap_evicted > 0 {
                                metrics.paused_pool_cap_evicted.increment(cap_evicted as u64);
                                debug!(
                                    target: "txpool",
                                    cap_evicted,
                                    "Evicted oldest paused transactions due to global cap"
                                );
                            }
                            debug!(
                                target: "txpool",
                                %token,
                                count,
                                "Moved transactions to paused pool (fee token paused)"
                            );
                        }
                    }
                }

                // 6. Fee tokens that were unpaused: re-submit their paused transactions.
                for (token, is_paused) in &updates.pause_events {
                    // Pause events were handled in the section above.
                    if *is_paused {
                        continue;
                    }

                    let paused_entries = state.paused_pool.drain_token(token);
                    if !paused_entries.is_empty() {
                        let count = paused_entries.len();
                        metrics.transactions_unpaused.increment(count as u64);
                        let pool_clone = pool.clone();
                        let token = *token;
                        // Re-insertion is awaited in a detached task so it does
                        // not block the maintenance loop.
                        tokio::spawn(async move {
                            let txs: Vec<_> = paused_entries
                                .into_iter()
                                .map(|e| e.tx.transaction.clone())
                                .collect();

                            let results = pool_clone
                                .add_external_transactions(txs)
                                .await;

                            let success = results.iter().filter(|r| r.is_ok()).count();
                            debug!(
                                target: "txpool",
                                %token,
                                total = count,
                                success,
                                "Restored transactions from paused pool (fee token unpaused)"
                            );
                        });
                    }
                }

                // 7. Prune the paused pool: expired and timed-out entries first,
                // then entries invalidated by keychain events.
                // NOTE(review): this uses `tip_timestamp`, not `max_expiry`, so the
                // eviction buffer is not applied to paused entries — confirm intended.
                let paused_expired = state.paused_pool.evict_expired(tip_timestamp);
                let paused_timed_out = state.paused_pool.evict_timed_out();
                let total_paused_evicted = paused_expired + paused_timed_out;
                if total_paused_evicted > 0 {
                    debug!(
                        target: "txpool",
                        count = total_paused_evicted,
                        tip_timestamp,
                        "Evicted expired transactions from paused pool"
                    );
                }

                if !updates.revoked_keys.is_empty()
                    || !updates.key_authorization_target_changes.is_empty()
                    || !updates.spending_limit_changes.is_empty()
                    || !updates.key_authorization_witness_burns.is_empty()
                {
                    state.paused_pool.evict_invalidated(
                        &updates.revoked_keys,
                        &updates.key_authorization_target_changes,
                        &updates.spending_limit_changes,
                        &updates.key_authorization_witness_burns,
                    );
                }
                metrics.pause_events_duration_seconds.record(pause_start.elapsed());

                // 8. Re-validate transactions whose resolved fee token had its
                // transfer policy or quote token updated: remove them from the
                // pool and re-submit them with their original origin.
                for (updated, counter, reason) in [
                    (
                        &updates.transfer_policy_updates,
                        &metrics.transfer_policy_revalidated,
                        "transfer policy update",
                    ),
                    (
                        &updates.quote_token_updates,
                        &metrics.quote_token_revalidated,
                        "quote token update",
                    ),
                ] {
                    if updated.is_empty() {
                        continue;
                    }

                    let hashes: Vec<TxHash> = {
                        let all_txs = all_txs.get_or_insert_with(|| pool.all_transactions());
                        all_txs
                            .iter()
                            .filter(|tx| !removed_this_iteration.contains(tx.hash()))
                            .filter(|tx| {
                                tx.transaction
                                    .resolved_fee_token()
                                    .is_some_and(|t| updated.contains(&t))
                            })
                            .map(|tx| *tx.hash())
                            .collect()
                    };
                    if !hashes.is_empty() {
                        // Shadows the outer `removed_txs`; these transactions are
                        // handed to the re-validation task below.
                        let removed_txs = pool.remove_transactions(hashes);
                        let count = removed_txs.len();

                        for tx in &removed_txs {
                            state.untrack(tx.hash());
                            removed_this_iteration.insert(*tx.hash());
                        }

                        counter.increment(count as u64);

                        let pool_clone = pool.clone();
                        // Re-submission is awaited in a detached task.
                        tokio::spawn(async move {
                            let txs: Vec<_> = removed_txs
                                .into_iter()
                                .map(|tx| (tx.origin, tx.transaction.clone()))
                                .collect();

                            let results = pool_clone.add_transactions_with_origins(txs).await;
                            let success = results.iter().filter(|r| r.is_ok()).count();
                            debug!(
                                target: "txpool",
                                total = count,
                                success,
                                reason,
                                "Re-validated transactions"
                            );
                        });
                    }
                }

                // 9. Evict transactions invalidated by keychain, fee-token,
                // policy-list or fee-balance events.
                if updates.has_invalidation_events() {
                    let invalidation_start = Instant::now();
                    debug!(
                        target: "txpool",
                        revoked_keys = updates.revoked_keys.len(),
                        key_authorization_target_changes =
                            updates.key_authorization_target_changes.len(),
                        spending_limit_changes = updates.spending_limit_changes.len(),
                        spending_limit_spends = updates.spending_limit_spends.len(),
                        validator_token_changes = updates.validator_token_changes.len(),
                        user_token_changes = updates.user_token_changes.len(),
                        blacklist_additions = updates.blacklist_additions.len(),
                        whitelist_removals = updates.whitelist_removals.len(),
                        "Processing transaction invalidation events"
                    );
                    let evicted = {
                        let all_txs = all_txs.get_or_insert_with(|| pool.all_transactions());
                        pool.evict_invalidated_transactions_from(
                            &updates,
                            all_txs
                                .iter()
                                .filter(|tx| !removed_this_iteration.contains(tx.hash())),
                        )
                    };
                    for tx in &evicted {
                        state.untrack(tx.hash());
                    }
                    metrics.transactions_invalidated.increment(evicted.len() as u64);
                    removed_txs.push(evicted);
                    metrics
                        .invalidation_eviction_duration_seconds
                        .record(invalidation_start.elapsed());
                }

                // 10. Periodically evict transactions that were pending in two
                // consecutive staleness snapshots. The pending set is read fresh
                // from the pool here, not from the cached `all_txs` snapshot.
                if state.pending_staleness.should_check(tip_timestamp) {
                    let current_pending: B256Set =
                        pool.pending_transactions().iter().map(|tx| *tx.hash()).collect();
                    let stale_to_evict =
                        state.pending_staleness.check_and_update(current_pending, tip_timestamp);

                    if !stale_to_evict.is_empty() {
                        debug!(
                            target: "txpool",
                            count = stale_to_evict.len(),
                            tip_timestamp,
                            "Evicting stale pending transactions"
                        );
                        for hash in &stale_to_evict {
                            state.untrack(hash);
                        }
                        removed_txs.push(pool.remove_transactions(stale_to_evict));
                    }
                }

                metrics.block_update_duration_seconds.record(block_update_start.elapsed());

                // Explicitly dropped after the duration has been recorded, so
                // releasing the removed transactions is not part of the measurement.
                drop(removed_txs);
            }
        }
    }
}
979
980#[cfg(test)]
981mod tests {
982 use super::*;
983 use crate::test_utils::TxBuilder;
984 use alloy_primitives::{Address, B256, TxHash};
985 use reth_primitives_traits::RecoveredBlock;
986 use std::{collections::HashSet, sync::Arc};
987 use tempo_primitives::{Block, BlockBody, TempoHeader, TempoTxEnvelope};
988
989 mod pending_staleness_tracker_tests {
990 use super::*;
991
992 #[test]
993 fn no_eviction_on_first_snapshot() {
994 let mut tracker = PendingStalenessTracker::new(100);
995 let tx1 = TxHash::random();
996
997 let stale = tracker.check_and_update([tx1].into_iter().collect(), 100);
999 assert!(stale.is_empty());
1000 assert!(tracker.previous_pending.contains(&tx1));
1001 }
1002
1003 #[test]
1004 fn evicts_transactions_present_in_both_snapshots() {
1005 let mut tracker = PendingStalenessTracker::new(100);
1006 let tx_stale = TxHash::random();
1007 let tx_new = TxHash::random();
1008
1009 tracker.check_and_update([tx_stale].into_iter().collect(), 0);
1011
1012 let stale = tracker.check_and_update([tx_stale, tx_new].into_iter().collect(), 100);
1014
1015 assert_eq!(stale.len(), 1);
1017 assert!(stale.contains(&tx_stale));
1018
1019 assert!(tracker.previous_pending.contains(&tx_new));
1021 assert!(!tracker.previous_pending.contains(&tx_stale));
1023 }
1024
1025 #[test]
1026 fn should_check_returns_false_before_interval_elapsed() {
1027 let mut tracker = PendingStalenessTracker::new(100);
1028 let tx = TxHash::random();
1029
1030 assert!(tracker.should_check(0));
1032 tracker.check_and_update([tx].into_iter().collect(), 0);
1033
1034 assert!(!tracker.should_check(50));
1036 assert_eq!(tracker.last_snapshot_time, Some(0));
1037
1038 assert!(tracker.should_check(100));
1040 }
1041
1042 #[test]
1043 fn removes_transactions_no_longer_pending_from_snapshot() {
1044 let mut tracker = PendingStalenessTracker::new(100);
1045 let tx1 = TxHash::random();
1046 let tx2 = TxHash::random();
1047
1048 tracker.check_and_update([tx1, tx2].into_iter().collect(), 0);
1050 assert_eq!(tracker.previous_pending.len(), 2);
1051
1052 let stale = tracker.check_and_update([tx1].into_iter().collect(), 100);
1055 assert_eq!(stale.len(), 1);
1056 assert!(stale.contains(&tx1));
1057
1058 assert!(tracker.previous_pending.is_empty());
1060 }
1061 }
1062
1063 #[test]
1064 fn track_groups_duplicate_expiries() {
1065 let mut state = TempoPoolState::default();
1066 let tx_a = TxBuilder::aa(Address::random())
1067 .nonce(1)
1068 .valid_before(1000)
1069 .build();
1070 let tx_b = TxBuilder::aa(Address::random())
1071 .nonce(2)
1072 .valid_before(1000)
1073 .build();
1074
1075 state.track(&tx_a);
1076 state.track(&tx_b);
1077 state.track(&tx_a);
1078
1079 let bucket = state.expiry_map.get(&1000).unwrap();
1080 assert_eq!(bucket.len(), 2);
1081 assert!(bucket.contains(tx_a.hash()));
1082 assert!(bucket.contains(tx_b.hash()));
1083 assert_eq!(state.tx_to_expiry.get(tx_a.hash()), Some(&1000));
1084 assert_eq!(state.tx_to_expiry.get(tx_b.hash()), Some(&1000));
1085 }
1086
1087 #[test]
1088 fn untrack_removes_hash_and_empty_bucket() {
1089 let mut state = TempoPoolState::default();
1090 let hash_a = TxHash::random();
1091 let hash_b = TxHash::random();
1092 let hash_unknown = TxHash::random();
1093
1094 insert_tracked_hash(&mut state, hash_a, 1000);
1096 insert_tracked_hash(&mut state, hash_b, 1000);
1097
1098 state.untrack(&hash_a);
1100 state.untrack(&hash_unknown);
1101
1102 assert!(!state.tx_to_expiry.contains_key(&hash_a));
1104 let bucket = state.expiry_map.get(&1000).unwrap();
1105 assert_eq!(bucket.len(), 1);
1106 assert!(bucket.contains(&hash_b));
1107
1108 state.untrack(&hash_b);
1110 assert!(!state.tx_to_expiry.contains_key(&hash_b));
1111 assert!(!state.expiry_map.contains_key(&1000));
1112 }
1113
1114 #[test]
1115 fn untrack_many_removes_hashes_by_expiry_bucket() {
1116 let mut state = TempoPoolState::default();
1117 let hash_a = TxHash::random();
1118 let hash_b = TxHash::random();
1119 let hash_c = TxHash::random();
1120 let hash_d = TxHash::random();
1121 let hash_unknown = TxHash::random();
1122
1123 insert_tracked_hash(&mut state, hash_a, 1000);
1124 insert_tracked_hash(&mut state, hash_b, 1000);
1125 insert_tracked_hash(&mut state, hash_c, 1000);
1126 insert_tracked_hash(&mut state, hash_d, 2000);
1127
1128 state.untrack_many([&hash_a, &hash_b, &hash_unknown, &hash_d]);
1129
1130 assert!(!state.tx_to_expiry.contains_key(&hash_a));
1131 assert!(!state.tx_to_expiry.contains_key(&hash_b));
1132 assert!(!state.tx_to_expiry.contains_key(&hash_d));
1133 assert_eq!(state.tx_to_expiry.get(&hash_c), Some(&1000));
1134
1135 let bucket = state.expiry_map.get(&1000).unwrap();
1136 assert_eq!(bucket.len(), 1);
1137 assert!(bucket.contains(&hash_c));
1138 assert!(!state.expiry_map.contains_key(&2000));
1139 }
1140
1141 #[test]
1142 fn drain_expired_removes_expired_buckets_and_returns_hashes() {
1143 let mut state = TempoPoolState::default();
1144 let hash_a = TxHash::random();
1145 let hash_b = TxHash::random();
1146 let hash_c = TxHash::random();
1147 let hash_d = TxHash::random();
1148
1149 insert_tracked_hash(&mut state, hash_a, 1000);
1150 insert_tracked_hash(&mut state, hash_b, 1000);
1151 insert_tracked_hash(&mut state, hash_c, 2000);
1152 insert_tracked_hash(&mut state, hash_d, 3000);
1153
1154 let expired = state.drain_expired(2000);
1155
1156 assert_hashes_eq(expired, &[hash_a, hash_b, hash_c]);
1157 assert!(!state.expiry_map.contains_key(&1000));
1158 assert!(!state.expiry_map.contains_key(&2000));
1159 assert!(state.expiry_map[&3000].contains(&hash_d));
1160 assert!(!state.tx_to_expiry.contains_key(&hash_a));
1161 assert!(!state.tx_to_expiry.contains_key(&hash_b));
1162 assert!(!state.tx_to_expiry.contains_key(&hash_c));
1163 assert_eq!(state.tx_to_expiry.get(&hash_d), Some(&3000));
1164 }
1165
1166 fn insert_tracked_hash(state: &mut TempoPoolState, hash: TxHash, expiry: u64) {
1167 state.expiry_map.entry(expiry).or_default().insert(hash);
1168 state.tx_to_expiry.insert(hash, expiry);
1169 }
1170
1171 fn assert_hashes_eq(actual: Vec<TxHash>, expected: &[TxHash]) {
1172 assert_eq!(actual.len(), expected.len());
1173 let actual: HashSet<TxHash> = actual.into_iter().collect();
1174 let expected: HashSet<TxHash> = expected.iter().copied().collect();
1175 assert_eq!(actual, expected);
1176 }
1177
1178 mod narrow_event_decoding {
1179 use super::*;
1180 use alloy_primitives::U256;
1181
1182 macro_rules! assert_decodes_like_generated {
1183 ($enum_ty:ident, $variant:ident, $event_ty:ty, $log:expr) => {{
1184 let expected = generated_decode::<$event_ty>(&$log);
1185 match $enum_ty::decode(&$log) {
1186 Some($enum_ty::$variant(event)) => assert_eq!(event, expected),
1187 _ => panic!("unexpected decoded event"),
1188 }
1189 }};
1190 }
1191
1192 macro_rules! assert_decodes_unit_like_generated {
1193 ($enum_ty:ident, $variant:ident, $event_ty:ty, $log:expr) => {{
1194 let _expected = generated_decode::<$event_ty>(&$log);
1195 assert!(
1196 matches!($enum_ty::decode(&$log), Some($enum_ty::$variant)),
1197 "unexpected decoded event"
1198 );
1199 }};
1200 }
1201
1202 fn event_log<T>(address: Address, event: T) -> Log
1203 where
1204 T: SolEvent,
1205 for<'a> &'a T: Into<alloy_primitives::LogData>,
1206 {
1207 Log::new_from_event_unchecked(address, event).reserialize()
1208 }
1209
1210 fn generated_decode<T: SolEvent>(log: &Log) -> T {
1211 T::decode_log(log)
1212 .expect("generated event decode should succeed")
1213 .data
1214 }
1215
#[test]
/// Builds one log per `IAccountKeychain` event and runs
/// `assert_decodes_like_generated!` on it against the matching
/// `AccountKeychainPoolEvent` variant.
fn account_keychain_decode_matches_generated_event_decoders() {
    // Every case lives in its own scope so each `log` binding ends with its assertion.
    {
        let event = IAccountKeychain::KeyAuthorized {
            account: Address::random(),
            publicKey: Address::random(),
            signatureType: 0,
            expiry: u64::MAX,
        };
        let log = event_log(ACCOUNT_KEYCHAIN_ADDRESS, event);
        assert_decodes_like_generated!(
            AccountKeychainPoolEvent,
            KeyAuthorized,
            IAccountKeychain::KeyAuthorized,
            log
        );
    }

    {
        let event = IAccountKeychain::AdminKeyAuthorized {
            account: Address::random(),
            publicKey: Address::random(),
        };
        let log = event_log(ACCOUNT_KEYCHAIN_ADDRESS, event);
        assert_decodes_like_generated!(
            AccountKeychainPoolEvent,
            AdminKeyAuthorized,
            IAccountKeychain::AdminKeyAuthorized,
            log
        );
    }

    {
        let event = IAccountKeychain::KeyRevoked {
            account: Address::random(),
            publicKey: Address::random(),
        };
        let log = event_log(ACCOUNT_KEYCHAIN_ADDRESS, event);
        assert_decodes_like_generated!(
            AccountKeychainPoolEvent,
            KeyRevoked,
            IAccountKeychain::KeyRevoked,
            log
        );
    }

    {
        let event = IAccountKeychain::SpendingLimitUpdated {
            account: Address::random(),
            publicKey: Address::random(),
            token: Address::random(),
            newLimit: U256::from(12_345),
        };
        let log = event_log(ACCOUNT_KEYCHAIN_ADDRESS, event);
        assert_decodes_like_generated!(
            AccountKeychainPoolEvent,
            SpendingLimitUpdated,
            IAccountKeychain::SpendingLimitUpdated,
            log
        );
    }

    {
        let event = IAccountKeychain::AccessKeySpend {
            account: Address::random(),
            publicKey: Address::random(),
            token: Address::random(),
            amount: U256::from(25),
            remainingLimit: U256::from(75),
        };
        let log = event_log(ACCOUNT_KEYCHAIN_ADDRESS, event);
        assert_decodes_like_generated!(
            AccountKeychainPoolEvent,
            AccessKeySpend,
            IAccountKeychain::AccessKeySpend,
            log
        );
    }

    {
        let event = IAccountKeychain::KeyAuthorizationWitnessBurned {
            account: Address::random(),
            witness: B256::random(),
        };
        let log = event_log(ACCOUNT_KEYCHAIN_ADDRESS, event);
        assert_decodes_like_generated!(
            AccountKeychainPoolEvent,
            KeyAuthorizationWitnessBurned,
            IAccountKeychain::KeyAuthorizationWitnessBurned,
            log
        );
    }
}
1309
#[test]
/// Builds one log per `IFeeManager` event emitted from `TIP_FEE_MANAGER_ADDRESS` and runs
/// `assert_decodes_like_generated!` on it against the matching `FeeManagerPoolEvent` variant.
fn fee_manager_decode_matches_generated_event_decoders() {
    {
        let event = IFeeManager::ValidatorTokenSet {
            validator: Address::random(),
            token: Address::random(),
        };
        let log = event_log(TIP_FEE_MANAGER_ADDRESS, event);
        assert_decodes_like_generated!(
            FeeManagerPoolEvent,
            ValidatorTokenSet,
            IFeeManager::ValidatorTokenSet,
            log
        );
    }

    {
        let event = IFeeManager::UserTokenSet {
            user: Address::random(),
            token: Address::random(),
        };
        let log = event_log(TIP_FEE_MANAGER_ADDRESS, event);
        assert_decodes_like_generated!(
            FeeManagerPoolEvent,
            UserTokenSet,
            IFeeManager::UserTokenSet,
            log
        );
    }
}
1340
#[test]
/// Builds one log per `ITIP403Registry` list-update event and runs
/// `assert_decodes_like_generated!` on it against the matching `Tip403PoolEvent` variant.
fn tip403_decode_matches_generated_event_decoders() {
    {
        let event = ITIP403Registry::BlacklistUpdated {
            policyId: 7,
            updater: Address::random(),
            account: Address::random(),
            restricted: true,
        };
        let log = event_log(TIP403_REGISTRY_ADDRESS, event);
        assert_decodes_like_generated!(
            Tip403PoolEvent,
            BlacklistUpdated,
            ITIP403Registry::BlacklistUpdated,
            log
        );
    }

    {
        let event = ITIP403Registry::WhitelistUpdated {
            policyId: 9,
            updater: Address::random(),
            account: Address::random(),
            allowed: false,
        };
        let log = event_log(TIP403_REGISTRY_ADDRESS, event);
        assert_decodes_like_generated!(
            Tip403PoolEvent,
            WhitelistUpdated,
            ITIP403Registry::WhitelistUpdated,
            log
        );
    }
}
1375
#[test]
/// Builds one log per `ITIP20` event emitted from `PATH_USD_ADDRESS` and checks the
/// `Tip20PoolEvent` decoding of each against the generated decoder.
///
/// `PauseStateUpdate` goes through `assert_decodes_like_generated!`, the policy and
/// quote-token updates through `assert_decodes_unit_like_generated!`, and `Transfer` is
/// compared by hand on its `from` field.
fn tip20_decode_matches_generated_event_decoders() {
    let token = tempo_precompiles::PATH_USD_ADDRESS;

    {
        let event = ITIP20::PauseStateUpdate {
            updater: Address::random(),
            isPaused: true,
        };
        let log = event_log(token, event);
        assert_decodes_like_generated!(
            Tip20PoolEvent,
            PauseStateUpdate,
            ITIP20::PauseStateUpdate,
            log
        );
    }

    {
        let event = ITIP20::TransferPolicyUpdate {
            updater: Address::random(),
            newPolicyId: 11,
        };
        let log = event_log(token, event);
        assert_decodes_unit_like_generated!(
            Tip20PoolEvent,
            TransferPolicyUpdate,
            ITIP20::TransferPolicyUpdate,
            log
        );
    }

    {
        let event = ITIP20::QuoteTokenUpdate {
            updater: Address::random(),
            newQuoteToken: Address::random(),
        };
        let log = event_log(token, event);
        assert_decodes_unit_like_generated!(
            Tip20PoolEvent,
            QuoteTokenUpdate,
            ITIP20::QuoteTokenUpdate,
            log
        );
    }

    {
        let event = ITIP20::Transfer {
            from: Address::random(),
            to: Address::random(),
            amount: U256::from(42),
        };
        let log = event_log(token, event);
        let expected = generated_decode::<ITIP20::Transfer>(&log);

        // The pool-side variant binds only `from`, so that is the only field compared.
        let Some(Tip20PoolEvent::Transfer { from }) = Tip20PoolEvent::decode(&log) else {
            panic!("unexpected decoded event")
        };
        assert_eq!(from, expected.from);
    }
}
1437 }
1438
1439 fn create_test_chain(
1440 blocks: Vec<reth_primitives_traits::RecoveredBlock<Block>>,
1441 ) -> Arc<Chain<TempoPrimitives>> {
1442 create_test_chain_with_receipts(blocks, Vec::new())
1443 }
1444
1445 fn create_test_chain_with_receipts(
1446 blocks: Vec<reth_primitives_traits::RecoveredBlock<Block>>,
1447 receipts: Vec<Vec<tempo_primitives::TempoReceipt>>,
1448 ) -> Arc<Chain<TempoPrimitives>> {
1449 use reth_provider::{Chain, ExecutionOutcome};
1450
1451 Arc::new(Chain::new(
1452 blocks,
1453 ExecutionOutcome {
1454 receipts,
1455 ..Default::default()
1456 },
1457 Default::default(),
1458 ))
1459 }
1460
1461 fn create_block_with_txs(
1463 block_number: u64,
1464 transactions: Vec<TempoTxEnvelope>,
1465 senders: Vec<Address>,
1466 ) -> RecoveredBlock<Block> {
1467 let header = TempoHeader {
1468 inner: alloy_consensus::Header {
1469 number: block_number,
1470 ..Default::default()
1471 },
1472 ..Default::default()
1473 };
1474 let body = BlockBody {
1475 transactions,
1476 ..Default::default()
1477 };
1478 let block = Block::new(header, body);
1479 RecoveredBlock::new_unhashed(block, senders)
1480 }
1481
1482 fn extract_envelope(tx: &crate::transaction::TempoPooledTransaction) -> TempoTxEnvelope {
1484 tx.inner().clone().into_inner()
1485 }
1486
1487 mod from_chain_spending_limit_spends {
1488 use super::*;
1489 use alloy_primitives::{IntoLogData, Log, U256};
1490 use alloy_signer_local::PrivateKeySigner;
1491 use tempo_primitives::{TempoReceipt, TempoTxType};
1492
#[test]
/// Feeds `from_chain` a block whose receipt carries an `AccessKeySpend` log and expects
/// exactly one `spending_limit_spends` entry: the log's `(account, key, token)`.
/// No entry may be derived from the transaction's fee token.
fn extracts_access_key_spend_events() {
    let user_address = Address::random();
    let access_key_signer = PrivateKeySigner::random();
    let key_id = access_key_signer.address();
    let fee_token = Address::random();
    let spent_token = Address::random();

    let envelope = extract_envelope(
        &TxBuilder::aa(user_address)
            .fee_token(fee_token)
            .build_keychain(user_address, &access_key_signer),
    );

    let spend_event = IAccountKeychain::AccessKeySpend {
        account: user_address,
        publicKey: key_id,
        token: spent_token,
        amount: U256::from(25),
        remainingLimit: U256::from(75),
    };
    let receipt = TempoReceipt {
        tx_type: TempoTxType::AA,
        success: true,
        cumulative_gas_used: 1,
        logs: vec![
            Log::new_from_event_unchecked(ACCOUNT_KEYCHAIN_ADDRESS, spend_event).reserialize(),
        ],
    };

    let chain = create_test_chain_with_receipts(
        vec![create_block_with_txs(1, vec![envelope], vec![user_address])],
        vec![vec![receipt]],
    );

    let updates = TempoPoolUpdates::from_chain(&chain);
    let spends = &updates.spending_limit_spends;

    assert!(
        spends.contains(user_address, key_id, spent_token),
        "Should contain the AccessKeySpend event's (account, key_id, token)"
    );
    assert!(
        !spends.contains(user_address, key_id, fee_token),
        "Should not infer spends from the tx fee token"
    );
    assert_eq!(spends.len(), 1);
}
1545
#[test]
/// Feeds `from_chain` a receipt carrying a `KeyAuthorizationWitnessBurned` log and expects
/// the `(account, witness)` pair in `key_authorization_witness_burns`, with the updates
/// reporting invalidation events.
fn extracts_key_authorization_witness_burned_events() {
    let account = Address::random();
    let witness = B256::random();

    let burn_event = IAccountKeychain::KeyAuthorizationWitnessBurned { account, witness };
    let receipt = TempoReceipt {
        tx_type: TempoTxType::AA,
        success: true,
        cumulative_gas_used: 1,
        logs: vec![
            Log::new_from_event_unchecked(ACCOUNT_KEYCHAIN_ADDRESS, burn_event).reserialize(),
        ],
    };

    // The block itself carries no transactions; only the receipt log is under test.
    let chain = create_test_chain_with_receipts(
        vec![create_block_with_txs(1, vec![], vec![])],
        vec![vec![receipt]],
    );

    let updates = TempoPoolUpdates::from_chain(&chain);

    let burn_recorded = updates
        .key_authorization_witness_burns
        .get(&account)
        .is_some_and(|witnesses| witnesses.contains(&witness));
    assert!(
        burn_recorded,
        "Should contain the burned (account, witness)"
    );
    assert!(updates.has_invalidation_events());
}
1577
#[test]
/// A keychain-signed AA transaction in a chain without receipts must leave
/// `spending_limit_spends` empty.
fn ignores_keychain_transactions_without_access_key_spend_logs() {
    let user_address = Address::random();
    let access_key_signer = PrivateKeySigner::random();
    let fee_token = Address::random();

    let envelope = extract_envelope(
        &TxBuilder::aa(user_address)
            .fee_token(fee_token)
            .build_keychain(user_address, &access_key_signer),
    );

    let chain = create_test_chain(vec![create_block_with_txs(
        1,
        vec![envelope],
        vec![user_address],
    )]);

    let updates = TempoPoolUpdates::from_chain(&chain);
    assert!(updates.spending_limit_spends.is_empty());
}
1597
#[test]
/// An AA transaction built with plain `build()` in a chain without receipts must leave
/// `spending_limit_spends` empty.
fn ignores_non_keychain_aa_transactions() {
    let sender = Address::random();
    let envelope =
        extract_envelope(&TxBuilder::aa(sender).fee_token(Address::random()).build());

    let chain = create_test_chain(vec![create_block_with_txs(
        1,
        vec![envelope],
        vec![sender],
    )]);

    let updates = TempoPoolUpdates::from_chain(&chain);
    assert!(updates.spending_limit_spends.is_empty());
}
1611
#[test]
/// An EIP-1559 transaction in a chain without receipts must leave
/// `spending_limit_spends` empty.
fn ignores_eip1559_transactions() {
    let sender = Address::random();
    let envelope = extract_envelope(&TxBuilder::eip1559(Address::random()).build_eip1559());

    let chain = create_test_chain(vec![create_block_with_txs(
        1,
        vec![envelope],
        vec![sender],
    )]);

    let updates = TempoPoolUpdates::from_chain(&chain);
    assert!(updates.spending_limit_spends.is_empty());
}
1625
#[test]
/// `has_invalidation_events` is false for fresh updates and becomes true once a single
/// entry is inserted into `spending_limit_spends`.
fn has_invalidation_events_includes_spending_limit_spends() {
    let mut updates = TempoPoolUpdates::new();
    assert!(!updates.has_invalidation_events());

    let (account, key_id, token) = (Address::random(), Address::random(), Address::random());
    updates
        .spending_limit_spends
        .insert(account, key_id, Some(token));

    assert!(updates.has_invalidation_events());
}
1639
#[test]
/// Feeds `from_chain` a receipt carrying a TIP20 `Transfer` log and expects
/// `fee_balance_changes` for that token to hold exactly one account, the transfer's `from`.
fn extracts_fee_balance_changes_from_tip20_transfer_logs() {
    let fee_token = tempo_precompiles::PATH_USD_ADDRESS;
    let from = Address::random();
    let to = Address::random();
    let amount = U256::from(42_u64);

    // The encoded event data is moved straight into the log emitted by `fee_token`.
    let transfer_log = Log {
        address: fee_token,
        data: ITIP20::Transfer { from, to, amount }.into_log_data(),
    };
    let receipt = TempoReceipt {
        tx_type: TempoTxType::Legacy,
        success: true,
        cumulative_gas_used: 21_000,
        logs: vec![transfer_log],
    };

    let chain = create_test_chain_with_receipts(
        vec![create_block_with_txs(1, vec![], vec![])],
        vec![vec![receipt]],
    );
    let updates = TempoPoolUpdates::from_chain(&chain);

    let changed_accounts = updates.fee_balance_changes.get(&fee_token);
    assert!(
        changed_accounts.is_some_and(|accounts| accounts.contains(&from) && accounts.len() == 1),
        "TIP20 transfer logs should only mark the debited sender as balance-changed"
    );
    assert!(updates.has_invalidation_events());
}
1669
#[test]
/// Feeds `from_chain` a receipt carrying a `TransferPolicyUpdate` log and expects the
/// emitting token address in `transfer_policy_updates`.
fn extracts_transfer_policy_updates() {
    let fee_token = tempo_precompiles::PATH_USD_ADDRESS;
    let updater = Address::random();
    let new_policy_id = 42u64;

    let policy_log = Log {
        address: fee_token,
        data: ITIP20::TransferPolicyUpdate {
            updater,
            newPolicyId: new_policy_id,
        }
        .into_log_data(),
    };
    let receipt = TempoReceipt {
        tx_type: TempoTxType::Legacy,
        success: true,
        cumulative_gas_used: 21_000,
        logs: vec![policy_log],
    };

    let chain = create_test_chain_with_receipts(
        vec![create_block_with_txs(1, vec![], vec![])],
        vec![vec![receipt]],
    );
    let updates = TempoPoolUpdates::from_chain(&chain);

    assert!(
        updates.transfer_policy_updates.contains(&fee_token),
        "TransferPolicyUpdate should be tracked by token address"
    );
}
1699
#[test]
/// Two `TransferPolicyUpdate` logs from the same token, with policy ids 1 and 2, must
/// produce a single `transfer_policy_updates` entry.
fn transfer_policy_updates_deduplicates_by_token() {
    let fee_token = tempo_precompiles::PATH_USD_ADDRESS;

    // One log per policy id, each from a different random updater but the same token.
    let logs: Vec<Log> = [1, 2]
        .into_iter()
        .map(|policy_id| Log {
            address: fee_token,
            data: ITIP20::TransferPolicyUpdate {
                updater: Address::random(),
                newPolicyId: policy_id,
            }
            .into_log_data(),
        })
        .collect();
    let receipt = TempoReceipt {
        tx_type: TempoTxType::Legacy,
        success: true,
        cumulative_gas_used: 21_000,
        logs,
    };

    let chain = create_test_chain_with_receipts(
        vec![create_block_with_txs(1, vec![], vec![])],
        vec![vec![receipt]],
    );
    let updates = TempoPoolUpdates::from_chain(&chain);

    assert_eq!(
        updates.transfer_policy_updates.len(),
        1,
        "duplicate policy updates for the same token should be deduplicated"
    );
}
1742
#[test]
/// Inserting two tokens for the same validator into `validator_token_changes` must keep
/// one entry, holding the token inserted last.
fn validator_token_changes_deduplicates_by_validator() {
    let validator = Address::random();
    let token_a = Address::random();
    let token_b = Address::random();

    let mut updates = TempoPoolUpdates::new();
    // Insertion order matters: `token_b` goes in last and is expected to win.
    for token in [token_a, token_b] {
        updates.validator_token_changes.insert(validator, token);
    }

    assert_eq!(
        updates.validator_token_changes.len(),
        1,
        "duplicate validator entries must be deduplicated"
    );
    assert_eq!(
        updates.validator_token_changes.get(&validator),
        Some(&token_b),
        "last-write-wins: second token should overwrite the first"
    );
}
1765 }
1766}