tempo_transaction_pool/
maintain.rs1use crate::{TempoTransactionPool, transaction::TempoPooledTransaction};
4use alloy_primitives::TxHash;
5use futures::StreamExt;
6use reth_chainspec::ChainSpecProvider;
7use reth_primitives_traits::AlloyBlockHeader;
8use reth_provider::{CanonStateNotification, CanonStateSubscriptions};
9use reth_storage_api::StateProviderFactory;
10use reth_transaction_pool::TransactionPool;
11use std::collections::BTreeMap;
12use tempo_chainspec::TempoChainSpec;
13use tempo_primitives::{AASigned, TempoPrimitives};
14use tracing::{debug, error};
15
16pub async fn evict_expired_aa_txs<P, C>(pool: P, client: C)
22where
23 P: TransactionPool<Transaction = TempoPooledTransaction> + 'static,
24 C: CanonStateSubscriptions + 'static,
25{
26 let track_expiry = |map: &mut BTreeMap<u64, Vec<TxHash>>, maybe_aa_tx: Option<&AASigned>| {
28 if let Some(aa_tx) = maybe_aa_tx
29 && let Some(valid_before) = aa_tx.tx().valid_before
30 {
31 let hash = *aa_tx.hash();
32 map.entry(valid_before).or_default().push(hash);
33 }
34 };
35
36 let mut expiry_map: BTreeMap<u64, Vec<TxHash>> = BTreeMap::new();
38
39 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
41
42 let mut new_txs = pool.new_transactions_listener();
44 let mut chain_events = client.canonical_state_stream();
45
46 pool.all_transactions()
48 .all()
49 .for_each(|tx| track_expiry(&mut expiry_map, tx.inner().as_aa()));
50
51 loop {
52 tokio::select! {
53 tx_event = new_txs.recv() => {
55 let Some(tx_event) = tx_event else {
56 break;
57 };
58
59 let tx = &tx_event.transaction.transaction;
61 track_expiry(&mut expiry_map, tx.inner().as_aa());
62 }
63
64 Some(event) = chain_events.next() => {
66 let CanonStateNotification::Commit { new } = event else {
67 continue;
68 };
69
70 let tip_timestamp = new.tip().header().timestamp();
71
72 let mut to_remove = Vec::new();
74 while let Some(entry) = expiry_map.first_entry() && *entry.key() <= tip_timestamp {
75 to_remove.extend(entry.remove());
76 }
77
78 if !to_remove.is_empty() {
79 debug!(
80 target: "txpool",
81 count = to_remove.len(),
82 tip_timestamp,
83 "Evicting expired AA transactions"
84 );
85 pool.remove_transactions(to_remove);
87 }
88 }
89 }
90 }
91}
92
93pub async fn maintain_2d_nonce_pool<Client>(pool: TempoTransactionPool<Client>)
104where
105 Client: StateProviderFactory
106 + ChainSpecProvider<ChainSpec = TempoChainSpec>
107 + CanonStateSubscriptions<Primitives = TempoPrimitives>
108 + 'static,
109{
110 let mut events = pool.client().canonical_state_stream();
111 while let Some(notification) = events.next().await {
112 pool.notify_aa_pool_on_state_updates(
113 notification.committed().execution_outcome().state().state(),
114 );
115 }
116}
117
118pub async fn maintain_amm_cache<Client>(pool: TempoTransactionPool<Client>)
121where
122 Client: StateProviderFactory
123 + ChainSpecProvider<ChainSpec = TempoChainSpec>
124 + CanonStateSubscriptions<Primitives = TempoPrimitives>
125 + 'static,
126{
127 let amm_cache = pool.amm_liquidity_cache();
128 let mut events = pool.client().canonical_state_stream();
129
130 while let Some(notification) = events.next().await {
131 let tip = notification.committed();
132
133 amm_cache.on_new_state(tip.execution_outcome());
134 for block in tip.blocks_iter() {
135 if let Err(err) = amm_cache.on_new_block(block.sealed_header(), pool.client()) {
136 error!(target: "txpool", ?err, "AMM liquidity cache update failed");
137 }
138 }
139 }
140}