1use crate::{
6 amm::AmmLiquidityCache, best::MergeBestTransactions, transaction::TempoPooledTransaction,
7 tt_2d_pool::AA2dPool, validator::TempoTransactionValidator,
8};
9use alloy_consensus::Transaction;
10use alloy_primitives::{Address, B256, map::HashMap};
11use parking_lot::RwLock;
12use reth_chainspec::ChainSpecProvider;
13use reth_eth_wire_types::HandleMempoolData;
14use reth_primitives_traits::Block;
15use reth_provider::{ChangedAccount, StateProviderFactory};
16use reth_transaction_pool::{
17 AddedTransactionOutcome, AllPoolTransactions, BestTransactions, BestTransactionsAttributes,
18 BlockInfo, CanonicalStateUpdate, CoinbaseTipOrdering, GetPooledTransactionLimit,
19 NewBlobSidecar, Pool, PoolResult, PoolSize, PoolTransaction, PropagatedTransactions,
20 TransactionEvents, TransactionOrigin, TransactionPool, TransactionPoolExt,
21 TransactionValidationOutcome, TransactionValidationTaskExecutor, TransactionValidator,
22 ValidPoolTransaction,
23 blobstore::DiskFileBlobStore,
24 error::{PoolError, PoolErrorKind},
25 identifier::TransactionId,
26};
27use revm::database::BundleAccount;
28use std::{collections::HashSet, sync::Arc, time::Instant};
29use tempo_chainspec::TempoChainSpec;
30
31pub struct TempoTransactionPool<Client> {
33 protocol_pool: Pool<
35 TransactionValidationTaskExecutor<TempoTransactionValidator<Client>>,
36 CoinbaseTipOrdering<TempoPooledTransaction>,
37 DiskFileBlobStore,
38 >,
39 aa_2d_pool: Arc<RwLock<AA2dPool>>,
41}
42
43impl<Client> TempoTransactionPool<Client> {
44 pub fn new(
45 protocol_pool: Pool<
46 TransactionValidationTaskExecutor<TempoTransactionValidator<Client>>,
47 CoinbaseTipOrdering<TempoPooledTransaction>,
48 DiskFileBlobStore,
49 >,
50 aa_2d_pool: AA2dPool,
51 ) -> Self {
52 Self {
53 protocol_pool,
54 aa_2d_pool: Arc::new(RwLock::new(aa_2d_pool)),
55 }
56 }
57}
58impl<Client> TempoTransactionPool<Client>
59where
60 Client: StateProviderFactory + ChainSpecProvider<ChainSpec = TempoChainSpec> + 'static,
61{
62 pub fn amm_liquidity_cache(&self) -> AmmLiquidityCache {
64 self.protocol_pool
65 .validator()
66 .validator()
67 .amm_liquidity_cache()
68 }
69
70 pub fn client(&self) -> &Client {
72 self.protocol_pool.validator().validator().client()
73 }
74
75 pub(crate) fn notify_aa_pool_on_state_updates(&self, state: &HashMap<Address, BundleAccount>) {
77 let (promoted, _mined) = self.aa_2d_pool.write().on_state_updates(state);
78 self.protocol_pool
80 .inner()
81 .notify_on_transaction_updates(promoted, Vec::new());
82 }
83
84 fn add_validated_transactions(
85 &self,
86 origin: TransactionOrigin,
87 transactions: Vec<TransactionValidationOutcome<TempoPooledTransaction>>,
88 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
89 if transactions.iter().any(|outcome| {
90 outcome
91 .as_valid_transaction()
92 .map(|tx| tx.transaction().is_aa_2d())
93 .unwrap_or(false)
94 }) {
95 let mut results = Vec::with_capacity(transactions.len());
97 for tx in transactions {
98 results.push(self.add_validated_transaction(origin, tx));
99 }
100 return results;
101 }
102
103 self.protocol_pool
104 .inner()
105 .add_transactions(origin, transactions)
106 }
107
108 fn add_validated_transaction(
109 &self,
110 origin: TransactionOrigin,
111 transaction: TransactionValidationOutcome<TempoPooledTransaction>,
112 ) -> PoolResult<AddedTransactionOutcome> {
113 match transaction {
114 TransactionValidationOutcome::Valid {
115 balance,
116 state_nonce,
117 bytecode_hash,
118 transaction,
119 propagate,
120 authorities,
121 } => {
122 if transaction.transaction().is_aa_2d() {
123 let transaction = transaction.into_transaction();
124 let sender_id = self
125 .protocol_pool
126 .inner()
127 .get_sender_id(transaction.sender());
128 let transaction_id = TransactionId::new(sender_id, transaction.nonce());
129 let tx = ValidPoolTransaction {
130 transaction,
131 transaction_id,
132 propagate,
133 timestamp: Instant::now(),
134 origin,
135 authority_ids: authorities
136 .map(|auths| self.protocol_pool.inner().get_sender_ids(auths)),
137 };
138 let added = self
139 .aa_2d_pool
140 .write()
141 .add_transaction(Arc::new(tx), state_nonce)?;
142 let hash = *added.hash();
143 if let Some(pending) = added.as_pending() {
144 self.protocol_pool
145 .inner()
146 .on_new_pending_transaction(pending);
147 }
148
149 let state = added.transaction_state();
150 self.protocol_pool.inner().notify_event_listeners(&added);
152 self.protocol_pool
153 .inner()
154 .on_new_transaction(added.into_new_transaction_event());
155
156 Ok(AddedTransactionOutcome { hash, state })
157 } else {
158 self.protocol_pool
159 .inner()
160 .add_transactions(
161 origin,
162 std::iter::once(TransactionValidationOutcome::Valid {
163 balance,
164 state_nonce,
165 bytecode_hash,
166 transaction,
167 propagate,
168 authorities,
169 }),
170 )
171 .pop()
172 .unwrap()
173 }
174 }
175 invalid => {
176 self.protocol_pool
178 .inner()
179 .add_transactions(origin, Some(invalid))
180 .pop()
181 .unwrap()
182 }
183 }
184 }
185}
186
187impl<Client> Clone for TempoTransactionPool<Client> {
189 fn clone(&self) -> Self {
190 Self {
191 protocol_pool: self.protocol_pool.clone(),
192 aa_2d_pool: Arc::clone(&self.aa_2d_pool),
193 }
194 }
195}
196
197impl<Client> std::fmt::Debug for TempoTransactionPool<Client> {
199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 f.debug_struct("TempoTransactionPool")
201 .field("protocol_pool", &"Pool<...>")
202 .field("aa_2d_nonce_pool", &"AA2dPool<...>")
203 .finish_non_exhaustive()
204 }
205}
206
207impl<Client> TransactionPool for TempoTransactionPool<Client>
209where
210 Client: StateProviderFactory
211 + ChainSpecProvider<ChainSpec = TempoChainSpec>
212 + Send
213 + Sync
214 + 'static,
215 TempoPooledTransaction: reth_transaction_pool::EthPoolTransaction,
216{
217 type Transaction = TempoPooledTransaction;
218
219 fn pool_size(&self) -> PoolSize {
220 let mut size = self.protocol_pool.pool_size();
221 let (pending, queued) = self.aa_2d_pool.read().pending_and_queued_txn_count();
222 size.pending += pending;
223 size.queued += queued;
224 size
225 }
226
227 fn block_info(&self) -> BlockInfo {
228 self.protocol_pool.block_info()
229 }
230
231 async fn add_transaction_and_subscribe(
232 &self,
233 origin: TransactionOrigin,
234 transaction: Self::Transaction,
235 ) -> PoolResult<TransactionEvents> {
236 let tx = self
237 .protocol_pool
238 .validator()
239 .validate_transaction(origin, transaction)
240 .await;
241 let res = self.add_validated_transaction(origin, tx)?;
242 self.transaction_event_listener(res.hash)
243 .ok_or_else(|| PoolError::new(res.hash, PoolErrorKind::DiscardedOnInsert))
244 }
245
246 async fn add_transaction(
247 &self,
248 origin: TransactionOrigin,
249 transaction: Self::Transaction,
250 ) -> PoolResult<AddedTransactionOutcome> {
251 let tx = self
252 .protocol_pool
253 .validator()
254 .validate_transaction(origin, transaction)
255 .await;
256 self.add_validated_transaction(origin, tx)
257 }
258
259 async fn add_transactions(
260 &self,
261 origin: TransactionOrigin,
262 transactions: Vec<Self::Transaction>,
263 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
264 if transactions.is_empty() {
265 return Vec::new();
266 }
267 let validated = self
268 .protocol_pool
269 .validator()
270 .validate_transactions_with_origin(origin, transactions)
271 .await;
272
273 self.add_validated_transactions(origin, validated)
274 }
275
276 fn transaction_event_listener(&self, tx_hash: B256) -> Option<TransactionEvents> {
277 self.protocol_pool.transaction_event_listener(tx_hash)
278 }
279
280 fn all_transactions_event_listener(
281 &self,
282 ) -> reth_transaction_pool::AllTransactionsEvents<Self::Transaction> {
283 self.protocol_pool.all_transactions_event_listener()
284 }
285
286 fn pending_transactions_listener_for(
287 &self,
288 kind: reth_transaction_pool::TransactionListenerKind,
289 ) -> tokio::sync::mpsc::Receiver<B256> {
290 self.protocol_pool.pending_transactions_listener_for(kind)
291 }
292
293 fn blob_transaction_sidecars_listener(&self) -> tokio::sync::mpsc::Receiver<NewBlobSidecar> {
294 self.protocol_pool.blob_transaction_sidecars_listener()
295 }
296
297 fn new_transactions_listener_for(
298 &self,
299 kind: reth_transaction_pool::TransactionListenerKind,
300 ) -> tokio::sync::mpsc::Receiver<reth_transaction_pool::NewTransactionEvent<Self::Transaction>>
301 {
302 self.protocol_pool.new_transactions_listener_for(kind)
303 }
304
305 fn pooled_transaction_hashes(&self) -> Vec<B256> {
306 let mut hashes = self.protocol_pool.pooled_transaction_hashes();
307 hashes.extend(self.aa_2d_pool.read().pooled_transactions_hashes_iter());
308 hashes
309 }
310
311 fn pooled_transaction_hashes_max(&self, max: usize) -> Vec<B256> {
312 let protocol_hashes = self.protocol_pool.pooled_transaction_hashes_max(max);
313 if protocol_hashes.len() >= max {
314 return protocol_hashes;
315 }
316 let remaining = max - protocol_hashes.len();
317 let mut hashes = protocol_hashes;
318 hashes.extend(
319 self.aa_2d_pool
320 .read()
321 .pooled_transactions_hashes_iter()
322 .take(remaining),
323 );
324 hashes
325 }
326
327 fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
328 let mut txs = self.protocol_pool.pooled_transactions();
329 txs.extend(self.aa_2d_pool.read().pooled_transactions_iter());
330 txs
331 }
332
333 fn pooled_transactions_max(
334 &self,
335 max: usize,
336 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
337 let mut txs = self.protocol_pool.pooled_transactions_max(max);
338 if txs.len() >= max {
339 return txs;
340 }
341
342 let remaining = max - txs.len();
343 txs.extend(
344 self.aa_2d_pool
345 .read()
346 .pooled_transactions_iter()
347 .take(remaining),
348 );
349 txs
350 }
351
352 fn get_pooled_transaction_elements(
353 &self,
354 tx_hashes: Vec<B256>,
355 limit: GetPooledTransactionLimit,
356 ) -> Vec<<Self::Transaction as PoolTransaction>::Pooled> {
357 let mut txs = self
358 .aa_2d_pool
359 .read()
360 .get_all_iter(&tx_hashes)
361 .filter_map(|tx| {
362 tx.transaction
363 .clone_into_pooled()
364 .ok()
365 .map(|tx| tx.into_inner())
366 })
367 .collect::<Vec<_>>();
368 txs.extend(
369 self.protocol_pool
370 .get_pooled_transaction_elements(tx_hashes, limit),
371 );
372
373 txs
374 }
375
376 fn get_pooled_transaction_element(
377 &self,
378 tx_hash: B256,
379 ) -> Option<reth_primitives_traits::Recovered<<Self::Transaction as PoolTransaction>::Pooled>>
380 {
381 self.protocol_pool
382 .get_pooled_transaction_element(tx_hash)
383 .or_else(|| {
384 self.aa_2d_pool
385 .read()
386 .get(&tx_hash)
387 .and_then(|tx| tx.transaction.clone_into_pooled().ok())
388 })
389 }
390
391 fn best_transactions(
392 &self,
393 ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
394 let left = self.protocol_pool.inner().best_transactions();
395 let right = self.aa_2d_pool.read().best_transactions();
396 Box::new(MergeBestTransactions::new(left, right))
397 }
398
399 fn best_transactions_with_attributes(
400 &self,
401 _attributes: BestTransactionsAttributes,
402 ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
403 self.best_transactions()
404 }
405
406 fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
407 let mut pending = self.protocol_pool.pending_transactions();
408 pending.extend(self.aa_2d_pool.read().pending_transactions());
409 pending
410 }
411
412 fn pending_transactions_max(
413 &self,
414 max: usize,
415 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
416 let protocol_txs = self.protocol_pool.pending_transactions_max(max);
417 if protocol_txs.len() >= max {
418 return protocol_txs;
419 }
420 let remaining = max - protocol_txs.len();
421 let mut txs = protocol_txs;
422 txs.extend(
423 self.aa_2d_pool
424 .read()
425 .pending_transactions()
426 .take(remaining),
427 );
428 txs
429 }
430
431 fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
432 let mut queued = self.protocol_pool.queued_transactions();
433 queued.extend(self.aa_2d_pool.read().queued_transactions());
434 queued
435 }
436
437 fn pending_and_queued_txn_count(&self) -> (usize, usize) {
438 let (protocol_pending, protocol_queued) = self.protocol_pool.pending_and_queued_txn_count();
439 let (aa_pending, aa_queued) = self.aa_2d_pool.read().pending_and_queued_txn_count();
440 (protocol_pending + aa_pending, protocol_queued + aa_queued)
441 }
442
443 fn all_transactions(&self) -> AllPoolTransactions<Self::Transaction> {
444 let mut transactions = self.protocol_pool.all_transactions();
445 {
446 let aa_2d_pool = self.aa_2d_pool.read();
447 transactions
448 .pending
449 .extend(aa_2d_pool.pending_transactions());
450 transactions.queued.extend(aa_2d_pool.queued_transactions());
451 }
452 transactions
453 }
454
455 fn all_transaction_hashes(&self) -> Vec<B256> {
456 let mut hashes = self.protocol_pool.all_transaction_hashes();
457 hashes.extend(self.aa_2d_pool.read().all_transaction_hashes_iter());
458 hashes
459 }
460
461 fn remove_transactions(
462 &self,
463 hashes: Vec<B256>,
464 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
465 let mut txs = self.aa_2d_pool.write().remove_transactions(hashes.iter());
466 txs.extend(self.protocol_pool.remove_transactions(hashes));
467 txs
468 }
469
470 fn remove_transactions_and_descendants(
471 &self,
472 hashes: Vec<B256>,
473 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
474 let mut txs = self
475 .aa_2d_pool
476 .write()
477 .remove_transactions_and_descendants(hashes.iter());
478 txs.extend(
479 self.protocol_pool
480 .remove_transactions_and_descendants(hashes),
481 );
482 txs
483 }
484
485 fn remove_transactions_by_sender(
486 &self,
487 sender: Address,
488 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
489 let mut txs = self
490 .aa_2d_pool
491 .write()
492 .remove_transactions_by_sender(sender);
493 txs.extend(self.protocol_pool.remove_transactions_by_sender(sender));
494 txs
495 }
496
497 fn retain_unknown<A: HandleMempoolData>(&self, announcement: &mut A) {
498 self.protocol_pool.retain_unknown(announcement);
499 let aa_pool = self.aa_2d_pool.read();
500 announcement.retain_by_hash(|tx| !aa_pool.contains(tx))
501 }
502
503 fn contains(&self, tx_hash: &B256) -> bool {
504 self.protocol_pool.contains(tx_hash) || self.aa_2d_pool.read().contains(tx_hash)
505 }
506
507 fn get(&self, tx_hash: &B256) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
508 self.protocol_pool
509 .get(tx_hash)
510 .or_else(|| self.aa_2d_pool.read().get(tx_hash))
511 }
512
513 fn get_all(&self, txs: Vec<B256>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
514 let mut result = self.aa_2d_pool.read().get_all(txs.iter());
515 result.extend(self.protocol_pool.get_all(txs));
516 result
517 }
518
519 fn on_propagated(&self, txs: PropagatedTransactions) {
520 self.protocol_pool.on_propagated(txs);
521 }
522
523 fn get_transactions_by_sender(
524 &self,
525 sender: Address,
526 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
527 let mut txs = self.protocol_pool.get_transactions_by_sender(sender);
528 txs.extend(
529 self.aa_2d_pool
530 .read()
531 .get_transactions_by_sender_iter(sender),
532 );
533 txs
534 }
535
536 fn get_pending_transactions_with_predicate(
537 &self,
538 predicate: impl FnMut(&ValidPoolTransaction<Self::Transaction>) -> bool,
539 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
540 self.protocol_pool
542 .get_pending_transactions_with_predicate(predicate)
543 }
544
545 fn get_pending_transactions_by_sender(
546 &self,
547 sender: Address,
548 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
549 let mut txs = self
550 .protocol_pool
551 .get_pending_transactions_by_sender(sender);
552 txs.extend(
553 self.aa_2d_pool
554 .read()
555 .pending_transactions()
556 .filter(|tx| tx.sender() == sender),
557 );
558
559 txs
560 }
561
562 fn get_queued_transactions_by_sender(
563 &self,
564 sender: Address,
565 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
566 self.protocol_pool.get_queued_transactions_by_sender(sender)
567 }
568
569 fn get_highest_transaction_by_sender(
570 &self,
571 sender: Address,
572 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
573 self.protocol_pool.get_highest_transaction_by_sender(sender)
576 }
577
578 fn get_highest_consecutive_transaction_by_sender(
579 &self,
580 sender: Address,
581 on_chain_nonce: u64,
582 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
583 self.protocol_pool
585 .get_highest_consecutive_transaction_by_sender(sender, on_chain_nonce)
586 }
587
588 fn get_transaction_by_sender_and_nonce(
589 &self,
590 sender: Address,
591 nonce: u64,
592 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
593 self.protocol_pool
595 .get_transaction_by_sender_and_nonce(sender, nonce)
596 }
597
598 fn get_transactions_by_origin(
599 &self,
600 origin: TransactionOrigin,
601 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
602 let mut txs = self.protocol_pool.get_transactions_by_origin(origin);
603 txs.extend(
604 self.aa_2d_pool
605 .read()
606 .get_transactions_by_origin_iter(origin),
607 );
608 txs
609 }
610
611 fn get_pending_transactions_by_origin(
612 &self,
613 origin: TransactionOrigin,
614 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
615 let mut txs = self
616 .protocol_pool
617 .get_pending_transactions_by_origin(origin);
618 txs.extend(
619 self.aa_2d_pool
620 .read()
621 .get_pending_transactions_by_origin_iter(origin),
622 );
623 txs
624 }
625
626 fn unique_senders(&self) -> HashSet<Address> {
627 let mut senders = self.protocol_pool.unique_senders();
628 senders.extend(self.aa_2d_pool.read().senders_iter().copied());
629 senders
630 }
631
632 fn get_blob(
633 &self,
634 tx_hash: B256,
635 ) -> Result<
636 Option<Arc<alloy_eips::eip7594::BlobTransactionSidecarVariant>>,
637 reth_transaction_pool::blobstore::BlobStoreError,
638 > {
639 self.protocol_pool.get_blob(tx_hash)
640 }
641
642 fn get_all_blobs(
643 &self,
644 tx_hashes: Vec<B256>,
645 ) -> Result<
646 Vec<(
647 B256,
648 Arc<alloy_eips::eip7594::BlobTransactionSidecarVariant>,
649 )>,
650 reth_transaction_pool::blobstore::BlobStoreError,
651 > {
652 self.protocol_pool.get_all_blobs(tx_hashes)
653 }
654
655 fn get_all_blobs_exact(
656 &self,
657 tx_hashes: Vec<B256>,
658 ) -> Result<
659 Vec<Arc<alloy_eips::eip7594::BlobTransactionSidecarVariant>>,
660 reth_transaction_pool::blobstore::BlobStoreError,
661 > {
662 self.protocol_pool.get_all_blobs_exact(tx_hashes)
663 }
664
665 fn get_blobs_for_versioned_hashes_v1(
666 &self,
667 versioned_hashes: &[B256],
668 ) -> Result<
669 Vec<Option<alloy_eips::eip4844::BlobAndProofV1>>,
670 reth_transaction_pool::blobstore::BlobStoreError,
671 > {
672 self.protocol_pool
673 .get_blobs_for_versioned_hashes_v1(versioned_hashes)
674 }
675
676 fn get_blobs_for_versioned_hashes_v2(
677 &self,
678 versioned_hashes: &[B256],
679 ) -> Result<
680 Option<Vec<alloy_eips::eip4844::BlobAndProofV2>>,
681 reth_transaction_pool::blobstore::BlobStoreError,
682 > {
683 self.protocol_pool
684 .get_blobs_for_versioned_hashes_v2(versioned_hashes)
685 }
686}
687
688impl<Client> TransactionPoolExt for TempoTransactionPool<Client>
689where
690 Client: StateProviderFactory + ChainSpecProvider<ChainSpec = TempoChainSpec> + 'static,
691{
692 fn set_block_info(&self, info: BlockInfo) {
693 self.protocol_pool.set_block_info(info)
694 }
695
696 fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
697 where
698 B: Block,
699 {
700 self.protocol_pool.on_canonical_state_change(update)
701 }
702
703 fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
704 self.protocol_pool.update_accounts(accounts)
705 }
706
707 fn delete_blob(&self, tx: B256) {
708 self.protocol_pool.delete_blob(tx)
709 }
710
711 fn delete_blobs(&self, txs: Vec<B256>) {
712 self.protocol_pool.delete_blobs(txs)
713 }
714
715 fn cleanup_blobs(&self) {
716 self.protocol_pool.cleanup_blobs()
717 }
718}