Skip to main content

tempo_payload_builder/
lib.rs

1//! Tempo Payload Builder.
2
3#![cfg_attr(not(test), warn(unused_crate_dependencies))]
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6mod budget;
7mod encode;
8mod metrics;
9mod prewarming;
10
11pub use budget::DEFAULT_BUILD_TIME_MULTIPLIER;
12use crossbeam_channel::Sender;
13use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
14
15use crate::{
16    budget::{
17        BUILD_TIME_MULTIPLIER_SCALE, decay_build_time_multiplier, observed_build_time_multiplier,
18        payload_budget_decision, scaled_build_time_multiplier,
19    },
20    encode::{EncodedBlockTransactionList, EncodedBlockTransactionsBuilder, ExecutionBlockEncoder},
21    metrics::{BlockBuildStopReason, InstrumentedFinishProvider, TempoPayloadBuilderMetrics},
22    prewarming::BestTransactionsPrewarming,
23};
24use alloy_consensus::{BlockHeader as _, Signed, Transaction as _, TxLegacy, TxReceipt};
25use alloy_eip7928::bal::Bal;
26use alloy_eips::eip2718::Encodable2718;
27use alloy_primitives::{Address, B256, Bloom, Bytes, U256, keccak256};
28use alloy_rlp::{Decodable, Encodable};
29use reth_basic_payload_builder::{
30    BuildArguments, BuildOutcome, MissingPayloadBehaviour, PayloadBuilder, PayloadConfig,
31    is_better_payload,
32};
33use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
34use reth_consensus_common::validation::MAX_RLP_BLOCK_SIZE;
35use reth_engine_tree::tree::{
36    CachedStateMetrics, CachedStateMetricsSource, CachedStateProvider,
37    instrumented_state::InstrumentedStateProvider,
38};
39use reth_errors::{ConsensusError, ProviderError};
40use reth_evm::{
41    ConfigureEvm, Database, Evm, NextBlockEnvAttributes, OnStateHook,
42    block::{BlockExecutionError, BlockExecutor, BlockValidationError},
43    execute::BlockAssemblerInput,
44};
45use reth_execution_types::BlockExecutionOutput;
46use reth_payload_builder::{EthBuiltPayload, PayloadBuilderError};
47use reth_payload_primitives::{BuiltPayload, BuiltPayloadExecutedBlock};
48use reth_primitives_traits::{
49    Recovered, RecoveredBlock, transaction::error::InvalidTransactionError,
50};
51use reth_revm::{
52    State, context::Block, database::StateProviderDatabase,
53    db::states::bundle_state::BundleRetention, state::EvmState,
54};
55use reth_storage_api::{HashedPostStateProvider, StateProviderFactory, StateRootProvider};
56use reth_tasks::TaskExecutor;
57use reth_transaction_pool::{
58    BestTransactions, BestTransactionsAttributes, PoolTransaction, TransactionPool,
59    ValidPoolTransaction, error::InvalidPoolTransactionError,
60};
61use std::{
62    sync::{
63        Arc,
64        atomic::{AtomicU64, Ordering},
65        mpsc,
66    },
67    time::{Duration, Instant},
68};
69use tempo_chainspec::{TempoChainSpec, hardfork::TempoHardforks};
70use tempo_evm::{TempoEvmConfig, TempoNextBlockEnvAttributes, TempoStateAccess, evm::TempoEvm};
71use tempo_payload_types::{
72    TempoBuiltPayload, TempoPayloadAttributes, ValidationLatencyWorkload, marshal_persist_estimate,
73};
74use tempo_precompiles::{storage::StorageActions, validator_config_v2::ValidatorConfigV2};
75use tempo_primitives::{
76    RecoveredSubBlock, SubBlockMetadata, TempoHeader, TempoReceipt, TempoTxEnvelope,
77    subblock::PartialValidatorKey,
78    transaction::envelope::{TEMPO_SYSTEM_TX_SENDER, TEMPO_SYSTEM_TX_SIGNATURE},
79};
80use tempo_transaction_pool::{
81    StateAwareBestTransactions, TempoTransactionPool,
82    transaction::{TempoPoolTransactionError, TempoPooledTransaction},
83};
84use tokio::sync::oneshot;
85use tracing::{Level, debug, debug_span, error, info, instrument, trace, warn};
86
87/// Conservative estimate for non-transaction execution block RLP bytes.
88///
89/// Exact block RLP length is computed asynchronously after payload construction, so the builder uses
90/// this margin together with known transaction, withdrawal, and extra-data lengths for Osaka size
91/// checks and pacing estimates.
92const NON_TRANSACTION_SIZE_ESTIMATE: usize = 2048;
93
94#[derive(Debug, Clone)]
95pub struct TempoPayloadBuilder<Provider> {
96    pool: TempoTransactionPool<Provider>,
97    provider: Provider,
98    executor: TaskExecutor,
99    evm_config: TempoEvmConfig,
100    metrics: TempoPayloadBuilderMetrics,
101    cache_metrics: CachedStateMetrics,
102    /// Height at which we've seen an invalid subblock.
103    ///
104    /// We pre-validate all of the subblock transactions when collecting subblocks, so this
105    /// should never be set because subblocks with invalid transactions should never make it to the payload builder.
106    ///
107    /// However, due to disruptive nature of subblock-related bugs (invalid subblock
108    /// we're continuously failing to apply halts block building), we protect against this by tracking
109    /// last height at which we've seen an invalid subblock, and not including any subblocks
110    /// at this height for any payloads.
111    highest_invalid_subblock: Arc<AtomicU64>,
112    /// Whether the node is configured in `--dev` miner mode.
113    is_dev: bool,
114    /// Whether to enable state provider metrics.
115    state_provider_metrics: bool,
116    /// Whether to enable prewarming of best transactions.
117    enable_prewarming: bool,
118    /// Whether to include block access lists in built execution payloads.
119    enable_bal: bool,
120    /// Learned estimate of total replayable build work divided by work at tx cutoff.
121    ///
122    /// This lets the builder reserve time for non-interruptible
123    /// `builder_finish` without a fixed duration.
124    build_time_multiplier: Arc<AtomicU64>,
125}
126
127/// Runtime settings for the Tempo payload builder.
128#[derive(Debug, Clone, Copy)]
129pub struct TempoPayloadBuilderConfig {
130    /// Whether the node is configured in `--dev` miner mode.
131    pub is_dev: bool,
132    /// Whether to enable state provider metrics.
133    pub state_provider_metrics: bool,
134    /// Whether to enable prewarming of best transactions.
135    pub enable_prewarming: bool,
136    /// Initial estimate of total replayable build work divided by work at tx cutoff.
137    ///
138    /// `1.0` means no finish-work headroom beyond observed work so far. Values
139    /// above `1.0` stop transaction execution earlier to leave room for
140    /// `builder_finish`, which validators also repeat.
141    pub build_time_multiplier: f64,
142}
143
144impl Default for TempoPayloadBuilderConfig {
145    fn default() -> Self {
146        Self {
147            is_dev: false,
148            state_provider_metrics: false,
149            enable_prewarming: true,
150            build_time_multiplier: DEFAULT_BUILD_TIME_MULTIPLIER,
151        }
152    }
153}
154
155impl<Provider> TempoPayloadBuilder<Provider> {
156    pub fn new(
157        pool: TempoTransactionPool<Provider>,
158        provider: Provider,
159        executor: TaskExecutor,
160        evm_config: TempoEvmConfig,
161        config: TempoPayloadBuilderConfig,
162    ) -> Self {
163        Self {
164            pool,
165            provider,
166            executor,
167            evm_config,
168            metrics: TempoPayloadBuilderMetrics::default(),
169            cache_metrics: CachedStateMetrics::zeroed(CachedStateMetricsSource::Builder),
170            highest_invalid_subblock: Default::default(),
171            is_dev: config.is_dev,
172            state_provider_metrics: config.state_provider_metrics,
173            enable_prewarming: config.enable_prewarming,
174            enable_bal: cfg!(feature = "bal"),
175            build_time_multiplier: Arc::new(AtomicU64::new(scaled_build_time_multiplier(
176                config.build_time_multiplier,
177            ))),
178        }
179    }
180
181    fn build_time_multiplier(&self) -> u64 {
182        self.build_time_multiplier.load(Ordering::Relaxed)
183    }
184
185    fn update_build_time_multiplier(&self, total_work: Duration, work_at_tx_cutoff: Duration) {
186        let Some(observed) = observed_build_time_multiplier(total_work, work_at_tx_cutoff) else {
187            return;
188        };
189        let _ = self.build_time_multiplier.fetch_update(
190            Ordering::Relaxed,
191            Ordering::Relaxed,
192            |current| Some(decay_build_time_multiplier(current, observed)),
193        );
194    }
195}
196
197impl<Provider: ChainSpecProvider<ChainSpec = TempoChainSpec>> TempoPayloadBuilder<Provider> {
198    /// Builds system transactions to seal the block.
199    ///
200    /// Returns a vector of system transactions that must be executed at the end of each block:
201    /// - Subblocks signatures - validates subblock signatures
202    fn build_seal_block_txs(
203        &self,
204        evm: &TempoEvm<impl Database>,
205        subblocks: &[RecoveredSubBlock],
206    ) -> Vec<Recovered<TempoTxEnvelope>> {
207        if subblocks.is_empty() && evm.cfg.spec.is_t4() {
208            // Post-T4, omit the subblocks metadata transaction if there are no subblocks
209            return vec![];
210        }
211
212        let chain_spec = self.provider.chain_spec();
213        let chain_id = Some(chain_spec.chain().id());
214
215        // Build subblocks signatures system transaction
216        let subblocks_metadata = subblocks
217            .iter()
218            .map(|s| s.metadata())
219            .collect::<Vec<SubBlockMetadata>>();
220        let subblocks_input = alloy_rlp::encode(&subblocks_metadata)
221            .into_iter()
222            .chain(evm.block.number.to_be_bytes_vec())
223            .collect();
224
225        let subblocks_signatures_tx = Recovered::new_unchecked(
226            TempoTxEnvelope::Legacy(Signed::new_unhashed(
227                TxLegacy {
228                    chain_id,
229                    nonce: 0,
230                    gas_price: 0,
231                    gas_limit: 0,
232                    to: Address::ZERO.into(),
233                    value: U256::ZERO,
234                    input: subblocks_input,
235                },
236                TEMPO_SYSTEM_TX_SIGNATURE,
237            )),
238            TEMPO_SYSTEM_TX_SENDER,
239        );
240
241        vec![subblocks_signatures_tx]
242    }
243}
244
245impl<Provider> PayloadBuilder for TempoPayloadBuilder<Provider>
246where
247    Provider:
248        StateProviderFactory + ChainSpecProvider<ChainSpec = TempoChainSpec> + Clone + 'static,
249{
250    type Attributes = TempoPayloadAttributes;
251    type BuiltPayload = TempoBuiltPayload;
252
253    fn try_build(
254        &self,
255        args: BuildArguments<Self::Attributes, Self::BuiltPayload>,
256    ) -> Result<BuildOutcome<Self::BuiltPayload>, PayloadBuilderError> {
257        self.build_payload(
258            args,
259            |attributes| self.pool.best_transactions_with_attributes(attributes),
260            false,
261        )
262    }
263
264    fn on_missing_payload(
265        &self,
266        _args: BuildArguments<Self::Attributes, Self::BuiltPayload>,
267    ) -> MissingPayloadBehaviour<Self::BuiltPayload> {
268        MissingPayloadBehaviour::AwaitInProgress
269    }
270
271    fn build_empty_payload(
272        &self,
273        config: PayloadConfig<Self::Attributes, TempoHeader>,
274    ) -> Result<Self::BuiltPayload, PayloadBuilderError> {
275        self.build_payload(
276            BuildArguments::new(
277                Default::default(),
278                None,
279                None,
280                config,
281                Default::default(),
282                Default::default(),
283            ),
284            |_| core::iter::empty(),
285            true,
286        )?
287        .into_payload()
288        .ok_or_else(|| PayloadBuilderError::MissingPayload)
289    }
290}
291
292impl<Provider> TempoPayloadBuilder<Provider>
293where
294    Provider:
295        StateProviderFactory + ChainSpecProvider<ChainSpec = TempoChainSpec> + Clone + 'static,
296{
297    #[instrument(
298        target = "payload_builder",
299        skip_all,
300        fields(
301            id = %args.config.payload_id,
302            parent_number = %args.config.parent_header.number(),
303            parent_hash = %args.config.parent_header.hash()
304        )
305    )]
306    fn build_payload<Txs>(
307        &self,
308        args: BuildArguments<TempoPayloadAttributes, TempoBuiltPayload>,
309        best_txs: impl FnOnce(BestTransactionsAttributes) -> Txs,
310        empty: bool,
311    ) -> Result<BuildOutcome<TempoBuiltPayload>, PayloadBuilderError>
312    where
313        Txs: BestTransactions<Item = Arc<ValidPoolTransaction<TempoPooledTransaction>>>
314            + Send
315            + 'static,
316    {
317        let BuildArguments {
318            cached_reads,
319            execution_cache,
320            mut trie_handle,
321            config,
322            cancel,
323            best_payload,
324            ..
325        } = args;
326        let PayloadConfig {
327            parent_header,
328            attributes,
329            payload_id,
330            ..
331        } = config;
332        let build_once_with_shared_trie =
333            // When trie handle is provided, we build the payload once so the shared trie can be reused.
334            trie_handle.is_some()
335            // `--dev` mode does not use the shared-trie builder flow.
336            && !self.is_dev;
337
338        macro_rules! check_cancel {
339            () => {
340                if cancel.is_cancelled() {
341                    return Ok(BuildOutcome::Cancelled);
342                }
343            };
344        }
345
346        check_cancel!();
347
348        let start = Instant::now();
349
350        let block_time_millis =
351            (attributes.timestamp_millis() - parent_header.timestamp_millis()) as f64;
352        self.metrics.block_time_millis.record(block_time_millis);
353        self.metrics.block_time_millis_last.set(block_time_millis);
354
355        let state_setup_start = Instant::now();
356        let _state_setup_span = debug_span!(target: "payload_builder", "state_setup").entered();
357        let mut state_provider = self.provider.state_by_block_hash(parent_header.hash())?;
358        if let Some(execution_cache) = &execution_cache {
359            state_provider = Box::new(CachedStateProvider::new(
360                state_provider,
361                execution_cache.cache().clone(),
362                Some(self.cache_metrics.clone()),
363            ));
364        }
365        if self.state_provider_metrics {
366            state_provider = Box::new(InstrumentedStateProvider::new(state_provider, "builder"));
367        }
368
369        let state = StateProviderDatabase::new(&state_provider);
370        let mut db = State::builder()
371            .with_database(Box::new(state) as Box<dyn Database<Error = ProviderError>>)
372            .with_bundle_update()
373            .build();
374        drop(_state_setup_span);
375        self.metrics
376            .state_setup_duration_seconds
377            .record(state_setup_start.elapsed());
378
379        check_cancel!();
380
381        let chain_spec = self.provider.chain_spec();
382        let is_osaka = self
383            .provider
384            .chain_spec()
385            .is_osaka_active_at_timestamp(attributes.timestamp);
386
387        let block_gas_limit: u64 = parent_header.gas_limit();
388        let shared_gas_limit =
389            chain_spec.shared_gas_limit_at(attributes.timestamp, block_gas_limit);
390        // Non-shared gas limit is the maximum gas available for proposer's pool transactions.
391        // The remaining `shared_gas_limit` is reserved for validator subblocks.
392        let non_shared_gas_limit = block_gas_limit - shared_gas_limit;
393        let general_gas_limit = chain_spec.general_gas_limit_at(
394            attributes.timestamp,
395            block_gas_limit,
396            shared_gas_limit,
397        );
398        let hardfork = chain_spec.tempo_hardfork_at(attributes.timestamp);
399
400        let mut cumulative_gas_used = 0;
401        let mut cumulative_state_gas_used = 0u64;
402        let mut non_payment_gas_used = 0;
403        let mut estimated_rlp_block_size = attributes
404            .withdrawals
405            .as_ref()
406            .map(|w| w.length())
407            .unwrap_or(0)
408            + NON_TRANSACTION_SIZE_ESTIMATE
409            + attributes.extra_data().length();
410        let mut payment_transactions = 0u64;
411        let mut pool_transactions_yielded = 0u64;
412        let mut pool_transactions_included = 0u64;
413        let mut total_fees = U256::ZERO;
414
415        // If building an empty payload, don't include any subblocks
416        //
417        // Also don't include any subblocks if we've seen an invalid subblock
418        // at this height or above.
419        let mut subblocks = if empty
420            || self.highest_invalid_subblock.load(Ordering::Relaxed) > parent_header.number()
421        {
422            vec![]
423        } else {
424            attributes.subblocks()
425        };
426
427        subblocks.retain(|subblock| {
428            // Edge case: remove subblocks with expired transactions
429            //
430            // We pre-validate all of the subblocks on top of parent state in subblocks service
431            // which leaves the only reason for transactions to get invalidated by expiry of
432            // `valid_before` field.
433            if subblock.has_expired_transactions(attributes.timestamp) {
434                self.metrics.inc_subblocks_expired();
435                return false;
436            }
437
438            // Account for the subblock's size
439            estimated_rlp_block_size += subblock.total_tx_size();
440
441            true
442        });
443
444        let subblock_fee_recipients = subblocks
445            .iter()
446            .map(|subblock| {
447                (
448                    PartialValidatorKey::from_slice(&subblock.validator()[..15]),
449                    subblock.fee_recipient,
450                )
451            })
452            .collect();
453
454        let next_attributes = TempoNextBlockEnvAttributes {
455            inner: NextBlockEnvAttributes {
456                timestamp: attributes.timestamp,
457                suggested_fee_recipient: attributes.suggested_fee_recipient,
458                prev_randao: attributes.prev_randao,
459                gas_limit: block_gas_limit,
460                parent_beacon_block_root: attributes.parent_beacon_block_root,
461                withdrawals: attributes.withdrawals.clone().map(Into::into),
462                extra_data: attributes.extra_data().clone(),
463                slot_number: attributes.slot_number,
464            },
465            general_gas_limit,
466            shared_gas_limit,
467            timestamp_millis_part: attributes.timestamp_millis_part(),
468            consensus_context: attributes.consensus_context(),
469            subblock_fee_recipients,
470        };
471        let evm_env = self
472            .evm_config
473            .next_evm_env(&parent_header, &next_attributes)
474            .map_err(PayloadBuilderError::other)?;
475        let ctx = self
476            .evm_config
477            .context_for_next_block(&parent_header, next_attributes)
478            .map_err(PayloadBuilderError::other)?;
479
480        let evm = self.evm_config.evm_with_env(&mut db, evm_env);
481        let mut executor = self.evm_config.create_executor(evm, ctx.clone());
482
483        check_cancel!();
484
485        // Override the fee recipient with the on-chain value from the V2
486        // validator config contract, if available.
487        maybe_override_fee_recipient(&mut executor, &attributes);
488
489        let bal_task_handle = if self.enable_bal {
490            let bal_task_handle =
491                self.spawn_bal_task(trie_handle.as_ref().map(|handle| handle.state_hook()));
492            executor
493                .evm_mut()
494                .db_mut()
495                .set_state_hook(Some(Box::new(bal_task_handle.state_hook())));
496            Some(bal_task_handle)
497        } else {
498            if let Some(ref handle) = trie_handle {
499                executor
500                    .evm_mut()
501                    .db_mut()
502                    .set_state_hook(Some(Box::new(handle.state_hook())));
503            }
504            None
505        };
506
507        executor.apply_pre_execution_changes().map_err(|err| {
508            warn!(%err, "failed to apply pre-execution changes");
509            PayloadBuilderError::Internal(err.into())
510        })?;
511        if let Some(bal_task_handle) = &bal_task_handle {
512            bal_task_handle.bump_bal_index();
513        }
514
515        check_cancel!();
516
517        debug!("building new payload");
518
519        let (roots_tx, roots_rx) = self.spawn_roots_task();
520
521        // Prepare system transactions before actual block building and account for their size.
522        let prepare_system_txs_start = Instant::now();
523        let system_txs = self.build_seal_block_txs(executor.evm(), &subblocks);
524        for tx in &system_txs {
525            estimated_rlp_block_size += tx.inner().length();
526        }
527        let prepare_system_txs_elapsed = prepare_system_txs_start.elapsed();
528        self.metrics
529            .prepare_system_transactions_duration_seconds
530            .record(prepare_system_txs_elapsed);
531
532        if is_osaka && estimated_rlp_block_size > MAX_RLP_BLOCK_SIZE {
533            return Err(PayloadBuilderError::other(ConsensusError::BlockTooLarge {
534                rlp_length: estimated_rlp_block_size,
535                max_rlp_length: MAX_RLP_BLOCK_SIZE,
536            }));
537        }
538
539        let base_fee = executor.evm().block().basefee;
540        let pool_fetch_start = Instant::now();
541        let best_txs = best_txs(BestTransactionsAttributes::new(
542            base_fee,
543            executor
544                .evm()
545                .block()
546                .blob_gasprice()
547                .map(|gasprice| gasprice as u64),
548        ));
549        // Wrap best transactions into state-aware wrapper to skip transactions that
550        // get invalidated by already-executed ones.
551        let mut best_txs = StateAwareBestTransactions::new(if self.enable_prewarming {
552            Box::new(BestTransactionsPrewarming::new(
553                self.executor.clone(),
554                self.provider.clone(),
555                execution_cache,
556                parent_header.hash(),
557                executor.evm().evm_env(),
558                best_txs,
559            )) as Box<dyn BestTransactions<Item = _>>
560        } else {
561            Box::new(best_txs)
562        });
563        self.metrics
564            .pool_fetch_duration_seconds
565            .record(pool_fetch_start.elapsed());
566
567        let execution_start = Instant::now();
568        let _block_fill_span = debug_span!(target: "payload_builder", "block_fill").entered();
569        let mut skipped_oversized_block = false;
570        let mut invalid_pool_transaction_execution_attempts = 0u64;
571        let mut normal_transaction_fill_idle_elapsed = Duration::ZERO;
572        // Consensus builds carry a remaining proposal budget. When present, the
573        // builder stops pool tx execution before projected proposer and validator
574        // work would consume that window.
575        let payload_build_budget = attributes.payload_build_budget();
576        let build_time_multiplier = self.build_time_multiplier();
577        let marshal_persist = marshal_persist_estimate();
578        let validation_latency = attributes.validation_latency_estimate();
579        let block_build_stop_reason = loop {
580            check_cancel!();
581
582            if let Some(build_budget) = payload_build_budget {
583                let elapsed = start.elapsed();
584                let current_workload = ValidationLatencyWorkload::new(
585                    cumulative_gas_used,
586                    pool_transactions_included as usize,
587                );
588                let budget_decision = payload_budget_decision(
589                    elapsed,
590                    normal_transaction_fill_idle_elapsed,
591                    build_time_multiplier,
592                    marshal_persist,
593                    estimated_rlp_block_size,
594                    validation_latency,
595                    current_workload,
596                );
597                if budget_decision.total_reserved >= build_budget {
598                    debug!(
599                        target: "payload_builder",
600                        ?elapsed,
601                        ?normal_transaction_fill_idle_elapsed,
602                        ?build_budget,
603                        predicted_builder_work = ?budget_decision.predicted_builder_work,
604                        predicted_validator_work = ?budget_decision.predicted_validator_work,
605                        total_reserved = ?budget_decision.total_reserved,
606                        marshal_persist = ?budget_decision.marshal_persist,
607                        ?current_workload,
608                        gas_used = cumulative_gas_used,
609                        transactions = pool_transactions_included,
610                        estimated_rlp_block_size,
611                        build_time_multiplier = build_time_multiplier as f64
612                            / BUILD_TIME_MULTIPLIER_SCALE as f64,
613                        "stopping pool transaction execution before payload build budget is exhausted"
614                    );
615                    break BlockBuildStopReason::BuildBudget;
616                }
617            }
618
619            let Some(pool_tx) = best_txs.next() else {
620                if build_once_with_shared_trie
621                    && payload_build_budget.is_some()
622                    && cumulative_gas_used < non_shared_gas_limit
623                {
624                    std::thread::sleep(Duration::from_millis(1));
625                    normal_transaction_fill_idle_elapsed += Duration::from_millis(1);
626                    continue;
627                }
628                let stop_reason = if cumulative_gas_used >= non_shared_gas_limit {
629                    BlockBuildStopReason::GasLimit
630                } else if skipped_oversized_block {
631                    BlockBuildStopReason::RlpBlockSizeLimit
632                } else {
633                    BlockBuildStopReason::TxPoolEmpty
634                };
635                break stop_reason;
636            };
637            pool_transactions_yielded += 1;
638
639            let max_regular_gas_used = core::cmp::min(
640                pool_tx.gas_limit(),
641                executor.evm().cfg.tx_gas_limit_cap.unwrap_or(u64::MAX),
642            );
643
644            // Ensure we still have capacity for this transaction within the non-shared gas limit.
645            // The remaining `shared_gas_limit` is reserved for validator subblocks and must not
646            // be consumed by proposer's pool transactions.
647            if cumulative_gas_used + max_regular_gas_used > non_shared_gas_limit {
648                // Mark this transaction as invalid since it doesn't fit
649                // The iterator will handle lane switching internally when appropriate
650                best_txs.mark_invalid(
651                    &pool_tx,
652                    InvalidPoolTransactionError::ExceedsGasLimit(
653                        pool_tx.gas_limit(),
654                        non_shared_gas_limit - cumulative_gas_used,
655                    ),
656                );
657                self.metrics
658                    .inc_pool_tx_skipped("exceeds_non_shared_gas_limit");
659                continue;
660            }
661
662            let is_payment = if hardfork.is_t5() {
663                pool_tx.transaction.is_payment()
664            } else {
665                pool_tx.transaction.inner().is_payment_v1()
666            };
667
668            // If the tx is not a payment and will exceed the general gas limit
669            // mark the tx as invalid and continue
670            if !is_payment && non_payment_gas_used + max_regular_gas_used > general_gas_limit {
671                best_txs.mark_invalid(
672                    &pool_tx,
673                    InvalidPoolTransactionError::Other(Box::new(
674                        TempoPoolTransactionError::ExceedsNonPaymentLimit,
675                    )),
676                );
677                self.metrics
678                    .inc_pool_tx_skipped("exceeds_general_gas_limit");
679                continue;
680            }
681
682            check_cancel!();
683            if is_payment {
684                payment_transactions += 1;
685            }
686
687            let tx_rlp_length = pool_tx.transaction.encoded_length();
688            let estimated_block_size_with_tx = estimated_rlp_block_size + tx_rlp_length;
689
690            if is_osaka && estimated_block_size_with_tx > MAX_RLP_BLOCK_SIZE {
691                best_txs.mark_invalid(
692                    &pool_tx,
693                    InvalidPoolTransactionError::OversizedData {
694                        size: estimated_block_size_with_tx,
695                        limit: MAX_RLP_BLOCK_SIZE,
696                    },
697                );
698                self.metrics.inc_pool_tx_skipped("oversized_block");
699                skipped_oversized_block = true;
700                continue;
701            }
702
703            let tx_debug_repr = tracing::enabled!(Level::TRACE)
704                .then(|| format!("{:?}", pool_tx.transaction))
705                .unwrap_or_default();
706
707            let execution_result = executor.execute_transaction_with_result_closure(
708                pool_tx.transaction.executable(),
709                |result| {
710                    cumulative_gas_used += result.block_gas_used();
711                    cumulative_state_gas_used += result.state_gas_used();
712                    if !is_payment {
713                        non_payment_gas_used += result.block_gas_used();
714                    }
715
716                    // Score payload value by the validator-credited fee amount that the
717                    // FeeManager precompile actually wrote during this transaction.
718                    total_fees += result.validator_fee();
719
720                    // Notify transactions iterator about the new state.
721                    best_txs.on_new_result(result);
722                },
723            );
724            if let Err(err) = execution_result {
725                if let BlockExecutionError::Validation(BlockValidationError::InvalidTx {
726                    error,
727                    ..
728                }) = &err
729                {
730                    invalid_pool_transaction_execution_attempts += 1;
731
732                    if error.is_nonce_too_low() {
733                        // if the nonce is too low, we can skip this transaction
734                        trace!(%error, tx = %tx_debug_repr, "skipping nonce too low transaction");
735                        self.metrics.inc_pool_tx_skipped("nonce_too_low");
736                    } else {
737                        // if the transaction is invalid, we can skip it and all of its
738                        // descendants
739                        trace!(%error, tx = %tx_debug_repr, "skipping invalid transaction and its descendants");
740                        best_txs.mark_invalid(
741                            &pool_tx,
742                            InvalidPoolTransactionError::Consensus(
743                                InvalidTransactionError::TxTypeNotSupported,
744                            ),
745                        );
746                        self.metrics.inc_pool_tx_skipped("invalid_tx");
747                    }
748                    continue;
749                } else {
750                    return Err(PayloadBuilderError::evm(err));
751                }
752            };
753            trace!("Transaction executed");
754            if let Some(bal_task_handle) = &bal_task_handle {
755                bal_task_handle.bump_bal_index();
756            }
757
758            pool_transactions_included += 1;
759            estimated_rlp_block_size += tx_rlp_length;
760            let _ = roots_tx.send((
761                BuilderTx::Pooled(pool_tx),
762                executor.receipts().last().unwrap().clone(),
763            ));
764        };
765
766        // cancel pre-warming, if any, by dropping the iter
767        drop(best_txs);
768
769        let elapsed_at_tx_cutoff = start.elapsed();
770        let validation_work_at_tx_cutoff =
771            elapsed_at_tx_cutoff.saturating_sub(normal_transaction_fill_idle_elapsed);
772        drop(_block_fill_span);
773        self.metrics
774            .inc_block_build_stop_reason(block_build_stop_reason);
775        let normal_transaction_fill_elapsed = execution_start.elapsed();
776        self.metrics
777            .total_normal_transaction_fill_duration_seconds
778            .record(normal_transaction_fill_elapsed);
779        self.metrics
780            .normal_transaction_fill_idle_duration_seconds
781            .record(normal_transaction_fill_idle_elapsed);
782        self.metrics
783            .payment_transactions
784            .record(payment_transactions as f64);
785        self.metrics
786            .payment_transactions_last
787            .set(payment_transactions as f64);
788
789        check_cancel!();
790
791        // check if we have a better block or received more subblocks
792        if !is_better_payload(best_payload.as_ref(), total_fees)
793            && !is_more_subblocks(best_payload.as_ref(), &subblocks)
794        {
795            // Release db
796            drop(executor);
797            drop(db);
798            // can skip building the block
799            return Ok(BuildOutcome::Aborted {
800                fees: total_fees,
801                cached_reads,
802            });
803        }
804
805        let subblocks_start = Instant::now();
806        let _subblock_txs_span =
807            debug_span!(target: "payload_builder", "execute_subblock_txs").entered();
808        let subblocks_count = subblocks.len() as f64;
809        let mut subblock_transactions = 0f64;
810        // Apply subblock transactions
811        for subblock in subblocks {
812            let subblock_start = Instant::now();
813            let mut subblock_tx_count = 0f64;
814
815            for tx in subblock.into_recovered_iter() {
816                if let Err(err) = executor.execute_transaction(&tx) {
817                    if let BlockExecutionError::Validation(BlockValidationError::InvalidTx {
818                        ..
819                    }) = &err
820                    {
821                        error!(
822                            ?err,
823                            "subblock transaction failed execution, aborting payload building"
824                        );
825                        self.highest_invalid_subblock
826                            .store(executor.evm().block().number.to(), Ordering::Relaxed);
827                        self.metrics.inc_build_failure("subblock_invalid_tx");
828                        return Err(PayloadBuilderError::evm(err));
829                    } else {
830                        return Err(PayloadBuilderError::evm(err));
831                    }
832                }
833                if let Some(bal_task_handle) = &bal_task_handle {
834                    bal_task_handle.bump_bal_index();
835                }
836
837                subblock_tx_count += 1.0;
838                let _ = roots_tx.send((
839                    BuilderTx::Owned(Box::new(tx)),
840                    executor.receipts().last().unwrap().clone(),
841                ));
842            }
843
844            self.metrics
845                .subblock_execution_duration_seconds
846                .record(subblock_start.elapsed());
847            self.metrics
848                .subblock_transaction_count
849                .record(subblock_tx_count);
850            subblock_transactions += subblock_tx_count;
851        }
852        drop(_subblock_txs_span);
853        let total_subblock_transaction_execution_elapsed = subblocks_start.elapsed();
854        self.metrics
855            .total_subblock_transaction_execution_duration_seconds
856            .record(total_subblock_transaction_execution_elapsed);
857        self.metrics.subblocks.record(subblocks_count);
858        self.metrics.subblocks_last.set(subblocks_count);
859        self.metrics
860            .subblock_transactions
861            .record(subblock_transactions);
862        self.metrics
863            .subblock_transactions_last
864            .set(subblock_transactions);
865
866        // Apply system transactions
867        let system_txs_execution_start = Instant::now();
868        let _system_txs_span =
869            debug_span!(target: "payload_builder", "execute_system_txs").entered();
870        for system_tx in system_txs {
871            executor
872                .execute_transaction(&system_tx)
873                .map_err(PayloadBuilderError::evm)?;
874            if let Some(bal_task_handle) = &bal_task_handle {
875                bal_task_handle.bump_bal_index();
876            }
877
878            let _ = roots_tx.send((
879                BuilderTx::Owned(Box::new(system_tx)),
880                executor.receipts().last().unwrap().clone(),
881            ));
882        }
883        drop(_system_txs_span);
884        let system_txs_execution_elapsed = system_txs_execution_start.elapsed();
885        self.metrics
886            .system_transactions_execution_duration_seconds
887            .record(system_txs_execution_elapsed);
888
889        let total_transaction_execution_elapsed = normal_transaction_fill_elapsed
890            + total_subblock_transaction_execution_elapsed
891            + system_txs_execution_elapsed;
892        self.metrics
893            .total_transaction_execution_duration_seconds
894            .record(total_transaction_execution_elapsed);
895
896        let payload_finalization_start = Instant::now();
897        let _finish_span = debug_span!(target: "payload_builder", "finish_block").entered();
898        let finish_provider = InstrumentedFinishProvider {
899            inner: &*state_provider,
900            metrics: self.metrics.clone(),
901        };
902
903        check_cancel!();
904
905        let builder_finish_start = Instant::now();
906
907        // Drop the roots task handle to trigger finalization
908        drop(roots_tx);
909
910        let (evm, execution_result) = executor.finish()?;
911        let evm_env = evm.into_env();
912
913        // merge all transitions into bundle state before deriving the hashed post-state
914        db.merge_transitions(BundleRetention::Reverts);
915
916        // Drop the state hook to signal that execution is complete and the sparse trie task can
917        // finalize the state root.
918        db.set_state_hook(None);
919
920        // Drop the BAL task sender to trigger finalization.
921        let bal_rx = bal_task_handle.map(|handle| handle.into_bal_rx());
922
923        let hashed_state = if let Some(Ok(hashed_state)) = trie_handle
924            .as_mut()
925            .map(|handle| handle.take_hashed_state_rx().recv())
926        {
927            hashed_state
928        } else {
929            finish_provider.hashed_post_state(&db.bundle_state)
930        };
931
932        let (state_root_outcome, sparse_trie_state_root_wait_elapsed) =
933            if let Some(mut handle) = trie_handle {
934                let state_root_wait_start = Instant::now();
935                let _span = debug_span!(target: "payload_builder", "await_state_root").entered();
936                match handle.state_root() {
937                    Ok(outcome) => {
938                        let elapsed = state_root_wait_start.elapsed();
939                        self.metrics
940                            .sparse_trie_state_root_wait_duration_seconds
941                            .record(elapsed);
942                        debug!(
943                            target: "payload_builder",
944                            id = %payload_id,
945                            state_root = ?outcome.state_root,
946                            "received state root from sparse trie"
947                        );
948                        Some((outcome, elapsed))
949                    }
950                    Err(err) => {
951                        warn!(
952                            target: "payload_builder",
953                            id = %payload_id,
954                            %err,
955                            "sparse trie failed, falling back to sync state root"
956                        );
957                        None
958                    }
959                }
960            } else {
961                None
962            }
963            .unzip();
964
965        let (block_access_list, block_access_list_hash) = if let Some(bal_rx) = bal_rx {
966            let (bal, bal_hash) = bal_rx.blocking_recv().map_err(PayloadBuilderError::other)?;
967            (Some(bal), Some(bal_hash))
968        } else {
969            (None, None)
970        };
971
972        let (state_root, trie_updates) = if let Some(outcome) = state_root_outcome {
973            (outcome.state_root, outcome.trie_updates)
974        } else {
975            let (state_root, trie_updates) = finish_provider
976                .state_root_with_updates(hashed_state.clone())
977                .map_err(BlockExecutionError::other)?;
978
979            (state_root, Arc::new(trie_updates))
980        };
981
982        let RootsTaskResult {
983            transactions_root,
984            receipts_root,
985            receipts_bloom,
986            transactions,
987            senders,
988            encoded_block_transactions,
989        } = roots_rx
990            .blocking_recv()
991            .map_err(PayloadBuilderError::other)?;
992
993        let block = self.evm_config.block_assembler.assemble_block(
994            BlockAssemblerInput::new(
995                evm_env,
996                ctx,
997                &parent_header,
998                transactions,
999                &execution_result,
1000                &db.bundle_state,
1001                &finish_provider,
1002                state_root,
1003                block_access_list_hash,
1004            ),
1005            Some(transactions_root),
1006            Some(receipts_root),
1007            Some(receipts_bloom),
1008        )?;
1009
1010        let block = RecoveredBlock::new_unhashed(block, senders);
1011
1012        let builder_finish_elapsed = builder_finish_start.elapsed();
1013        self.metrics
1014            .builder_finish_duration_seconds
1015            .record(builder_finish_elapsed);
1016        drop(_finish_span);
1017        let payload_finalization_elapsed = payload_finalization_start.elapsed();
1018        self.metrics
1019            .payload_finalization_duration_seconds
1020            .record(payload_finalization_elapsed);
1021
1022        let total_transactions = block.transaction_count();
1023        self.metrics
1024            .total_transactions
1025            .record(total_transactions as f64);
1026        self.metrics
1027            .total_transactions_last
1028            .set(total_transactions as f64);
1029
1030        let gas_used = block.gas_used();
1031        self.metrics.gas_used.record(gas_used as f64);
1032        self.metrics.gas_used_last.set(gas_used as f64);
1033        self.metrics
1034            .state_gas_used
1035            .record(cumulative_state_gas_used as f64);
1036        self.metrics
1037            .state_gas_used_last
1038            .set(cumulative_state_gas_used as f64);
1039        self.metrics
1040            .general_gas_used_last
1041            .set(non_payment_gas_used as f64);
1042        self.metrics
1043            .payment_gas_used_last
1044            .set(cumulative_gas_used as f64 - non_payment_gas_used as f64);
1045        self.metrics
1046            .general_gas_limit_last
1047            .set(general_gas_limit as f64);
1048        self.metrics
1049            .payment_gas_limit_last
1050            .set(non_shared_gas_limit as f64 - general_gas_limit as f64);
1051        self.metrics
1052            .shared_gas_limit_last
1053            .set(shared_gas_limit as f64);
1054
1055        let requests = chain_spec
1056            .is_prague_active_at_timestamp(attributes.timestamp)
1057            .then(|| execution_result.requests.clone());
1058
1059        let pool_transactions_inclusion_ratio = if pool_transactions_yielded == 0 {
1060            0.0
1061        } else {
1062            pool_transactions_included as f64 / pool_transactions_yielded as f64
1063        };
1064        self.metrics
1065            .pool_transactions_yielded
1066            .record(pool_transactions_yielded as f64);
1067        self.metrics
1068            .pool_transactions_yielded_last
1069            .set(pool_transactions_yielded as f64);
1070        self.metrics
1071            .pool_transactions_included
1072            .record(pool_transactions_included as f64);
1073        self.metrics
1074            .pool_transactions_included_last
1075            .set(pool_transactions_included as f64);
1076        self.metrics
1077            .invalid_pool_transaction_execution_attempts
1078            .record(invalid_pool_transaction_execution_attempts as f64);
1079        self.metrics
1080            .pool_transactions_inclusion_ratio
1081            .record(pool_transactions_inclusion_ratio);
1082        self.metrics
1083            .pool_transactions_inclusion_ratio_last
1084            .set(pool_transactions_inclusion_ratio);
1085
1086        let elapsed = start.elapsed();
1087        let validation_work_duration = elapsed.saturating_sub(normal_transaction_fill_idle_elapsed);
1088        if payload_build_budget.is_some() {
1089            self.update_build_time_multiplier(
1090                validation_work_duration,
1091                validation_work_at_tx_cutoff,
1092            );
1093        }
1094        if is_osaka && estimated_rlp_block_size > MAX_RLP_BLOCK_SIZE {
1095            return Err(PayloadBuilderError::other(ConsensusError::BlockTooLarge {
1096                rlp_length: estimated_rlp_block_size,
1097                max_rlp_length: MAX_RLP_BLOCK_SIZE,
1098            }));
1099        }
1100        let recorded_block_size_bytes =
1101            estimated_rlp_block_size + block_access_list.as_ref().map_or(0, Encodable::length);
1102        let final_workload = ValidationLatencyWorkload::new(gas_used, total_transactions);
1103        let validation_latency_duration = validation_latency
1104            .and_then(|estimate| estimate.estimate(final_workload))
1105            .unwrap_or(validation_work_duration);
1106
1107        self.metrics.payload_build_duration_seconds.record(elapsed);
1108        let gas_per_second = block.gas_used() as f64 / elapsed.as_secs_f64();
1109        self.metrics.gas_per_second.record(gas_per_second);
1110        self.metrics.gas_per_second_last.set(gas_per_second);
1111        self.metrics
1112            .rlp_block_size_bytes
1113            .record(recorded_block_size_bytes as f64);
1114        self.metrics
1115            .rlp_block_size_bytes_last
1116            .set(recorded_block_size_bytes as f64);
1117
1118        info!(
1119            parent_hash = ?block.parent_hash(),
1120            number = block.number(),
1121            hash = ?block.hash(),
1122            timestamp = block.timestamp_millis(),
1123            gas_limit = block.gas_limit(),
1124            gas_used,
1125            cumulative_state_gas_used,
1126            extra_data = %block.extra_data(),
1127            subblocks_count,
1128            payment_transactions,
1129            pool_transactions_yielded,
1130            pool_transactions_included,
1131            invalid_pool_transaction_execution_attempts,
1132            pool_transactions_inclusion_ratio,
1133            subblock_transactions,
1134            total_transactions,
1135            ?elapsed,
1136            ?validation_work_duration,
1137            ?validation_latency_duration,
1138            ?normal_transaction_fill_elapsed,
1139            ?normal_transaction_fill_idle_elapsed,
1140            ?total_subblock_transaction_execution_elapsed,
1141            ?system_txs_execution_elapsed,
1142            ?total_transaction_execution_elapsed,
1143            ?sparse_trie_state_root_wait_elapsed,
1144            ?builder_finish_elapsed,
1145            "Built payload"
1146        );
1147
1148        let block = Arc::new(block);
1149        let execution_block_encoder = ExecutionBlockEncoder::new(
1150            block.clone(),
1151            estimated_rlp_block_size,
1152            encoded_block_transactions,
1153        );
1154        // Clone the shared cache handle into the payload before the encoder is dropped.
1155        let execution_block_encoded = execution_block_encoder.encoded_block();
1156        // Drop the encoder off-thread so its `Drop` impl can populate the cache in the background.
1157        self.executor.spawn_drop(execution_block_encoder);
1158        let eth_payload = EthBuiltPayload::new(block.clone(), total_fees, requests, None);
1159
1160        let execution_output = BlockExecutionOutput {
1161            result: execution_result,
1162            state: db.take_bundle(),
1163        };
1164
1165        let executed_block = BuiltPayloadExecutedBlock {
1166            recovered_block: block,
1167            execution_output: Arc::new(execution_output),
1168            hashed_state: Arc::new(hashed_state),
1169            trie_updates,
1170        };
1171
1172        let payload = TempoBuiltPayload::new(
1173            eth_payload,
1174            block_access_list,
1175            Some(executed_block),
1176            validation_work_duration,
1177            validation_latency_duration,
1178            estimated_rlp_block_size,
1179            execution_block_encoded,
1180        );
1181
1182        drop(db);
1183        self.executor.spawn_drop(state_provider);
1184        if build_once_with_shared_trie {
1185            Ok(BuildOutcome::Freeze(payload))
1186        } else {
1187            Ok(BuildOutcome::Better {
1188                payload,
1189                cached_reads,
1190            })
1191        }
1192    }
1193
1194    fn spawn_roots_task(
1195        &self,
1196    ) -> (
1197        Sender<(BuilderTx, TempoReceipt)>,
1198        oneshot::Receiver<RootsTaskResult>,
1199    ) {
1200        let (transactions_tx, transactions_rx) =
1201            crossbeam_channel::unbounded::<(BuilderTx, TempoReceipt)>();
1202        let (result_tx, result_rx) = oneshot::channel();
1203
1204        self.executor
1205            .spawn_blocking_named("builder-roots-task", || {
1206                let mut transactions = Vec::new();
1207                let mut senders = Vec::new();
1208
1209                let mut transactions_root = OrderedTrieRootEncodedBuilder::new();
1210                let mut receipts_root = OrderedTrieRootEncodedBuilder::new();
1211                let mut receipts_bloom = Bloom::ZERO;
1212                let mut encoded_block_transactions = EncodedBlockTransactionsBuilder::default();
1213
1214                let mut buf = Vec::new();
1215
1216                for (tx, receipt) in transactions_rx.into_iter() {
1217                    let (tx, sender) = tx.into_parts();
1218                    buf.clear();
1219                    tx.encode_2718(&mut buf);
1220                    transactions_root.push_next(&buf);
1221                    encoded_block_transactions.push(&tx, &buf);
1222                    transactions.push(tx);
1223                    senders.push(sender);
1224
1225                    let receipt = receipt.with_bloom_ref();
1226
1227                    buf.clear();
1228                    receipt.encode_2718(&mut buf);
1229                    receipts_root.push_next(&buf);
1230                    receipts_bloom |= receipt.bloom();
1231                }
1232                let transactions_root = transactions_root.finalize();
1233                let receipts_root = receipts_root.finalize();
1234                let _ = result_tx.send(RootsTaskResult {
1235                    transactions_root,
1236                    receipts_root,
1237                    receipts_bloom,
1238                    transactions,
1239                    senders,
1240                    encoded_block_transactions: encoded_block_transactions.finish(),
1241                });
1242            });
1243
1244        (transactions_tx, result_rx)
1245    }
1246
1247    fn spawn_bal_task(&self, mut state_root_task_hook: Option<impl OnStateHook>) -> BalTaskHandle {
1248        let (task_tx, task_rx) = mpsc::channel::<BalMessage>();
1249        let (bal_tx, bal_rx) = oneshot::channel();
1250        self.executor.spawn_blocking_named("builder-bal-task", || {
1251            let mut bal_state =
1252                reth_revm::database_interface::bal::BalState::new().with_bal_builder();
1253            for msg in task_rx {
1254                match msg {
1255                    BalMessage::BumpIndex => {
1256                        bal_state.bump_bal_index();
1257                    }
1258                    BalMessage::State(state) => {
1259                        bal_state.commit(&state);
1260                        if let Some(state_root_task_hook) = &mut state_root_task_hook {
1261                            state_root_task_hook.on_state(state);
1262                        }
1263                    }
1264                }
1265            }
1266
1267            drop(state_root_task_hook);
1268            let bal: Bal = bal_state.take_built_alloy_bal().unwrap().into();
1269            let mut encoded = Vec::new();
1270            bal.encode(&mut encoded);
1271            let bal_hash = keccak256(&encoded);
1272
1273            let _ = bal_tx.send((encoded.into(), bal_hash));
1274        });
1275
1276        BalTaskHandle {
1277            msg_tx: task_tx,
1278            bal_rx,
1279        }
1280    }
1281}
1282
1283struct BalTaskHandle {
1284    msg_tx: mpsc::Sender<BalMessage>,
1285    bal_rx: oneshot::Receiver<(Bytes, B256)>,
1286}
1287
1288impl BalTaskHandle {
1289    fn state_hook(&self) -> impl OnStateHook {
1290        let msg_tx = self.msg_tx.clone();
1291        move |state: EvmState| {
1292            let _ = msg_tx.send(BalMessage::State(state));
1293        }
1294    }
1295
1296    fn bump_bal_index(&self) {
1297        let _ = self.msg_tx.send(BalMessage::BumpIndex);
1298    }
1299
1300    fn into_bal_rx(self) -> oneshot::Receiver<(Bytes, B256)> {
1301        self.bal_rx
1302    }
1303}
1304
1305enum BalMessage {
1306    State(EvmState),
1307    BumpIndex,
1308}
1309
1310pub fn is_more_subblocks(
1311    best_payload: Option<&TempoBuiltPayload>,
1312    subblocks: &[RecoveredSubBlock],
1313) -> bool {
1314    let Some(best_payload) = best_payload else {
1315        return false;
1316    };
1317    let Some(best_metadata) = best_payload
1318        .block()
1319        .body()
1320        .transactions
1321        .iter()
1322        .rev()
1323        .filter(|tx| tx.is_system_tx())
1324        .find_map(|tx| Vec::<SubBlockMetadata>::decode(&mut tx.input().as_ref()).ok())
1325    else {
1326        return false;
1327    };
1328
1329    subblocks.len() > best_metadata.len()
1330}
1331
1332/// Overrides the block's fee recipient (beneficiary) with the value from the
1333/// V2 validator config contract, if the contract is active and returns a
1334/// non-zero address for the given `public_key`.
1335fn maybe_override_fee_recipient<DB: Database>(
1336    executor: &mut impl BlockExecutor<Evm = TempoEvm<DB>>,
1337    attributes: &TempoPayloadAttributes,
1338) {
1339    let Some(public_key) = attributes.proposer_public_key() else {
1340        return;
1341    };
1342    let ctx = executor.evm_mut().ctx_mut();
1343    if !ctx.cfg.spec.is_t2() {
1344        return;
1345    }
1346
1347    // We are using the database as a read-only storage context to avoid modifying the journal state.
1348    // Reading slots here might be dangerous because they would end up being warmed and might affect gas accounting.
1349    match ctx.journaled_state.database.with_read_only_storage_ctx(
1350        ctx.cfg.spec,
1351        StorageActions::disabled(),
1352        || -> Result<Option<Address>, PayloadBuilderError> {
1353            let parent_number = ctx.block.number.saturating_to::<u64>() - 1;
1354
1355            let config = ValidatorConfigV2::default();
1356            if !config
1357                .is_initialized()
1358                .map_err(PayloadBuilderError::other)?
1359            {
1360                return Ok(None);
1361            }
1362            let init_height = config
1363                .get_initialized_at_height()
1364                .map_err(PayloadBuilderError::other)?;
1365            if init_height > parent_number {
1366                return Ok(None);
1367            }
1368            let on_chain = config
1369                .validator_by_public_key(*public_key)
1370                .map(|v| v.feeRecipient)
1371                .map_err(PayloadBuilderError::other)?;
1372            Ok((!on_chain.is_zero()).then_some(on_chain))
1373        },
1374    ) {
1375        Ok(Some(fee_recipient)) => {
1376            debug!(%fee_recipient, "resolved fee recipient from contract");
1377            executor.evm_mut().ctx_mut().block.beneficiary = fee_recipient;
1378        }
1379        Ok(None) => {}
1380        Err(err) => {
1381            warn!(%err, "failed resolving fee recipient from contract; using fallback");
1382        }
1383    }
1384}
1385
1386#[derive(Debug)]
1387enum BuilderTx {
1388    Pooled(Arc<ValidPoolTransaction<TempoPooledTransaction>>),
1389    Owned(Box<Recovered<TempoTxEnvelope>>),
1390}
1391
1392impl BuilderTx {
1393    fn into_parts(self) -> (TempoTxEnvelope, Address) {
1394        match self {
1395            Self::Pooled(tx) => tx.transaction.inner().clone().into_parts(),
1396            Self::Owned(tx) => tx.into_parts(),
1397        }
1398    }
1399}
1400
1401/// Result produced by the roots task while finalizing payload block data.
1402#[derive(Debug)]
1403pub(crate) struct RootsTaskResult {
1404    /// The root hash of the transaction trie.
1405    transactions_root: B256,
1406    /// The root hash of the receipts trie.
1407    receipts_root: B256,
1408    /// The receipts bloom filter.
1409    receipts_bloom: Bloom,
1410    /// The transactions included in the block.
1411    transactions: Vec<TempoTxEnvelope>,
1412    /// The senders of the transactions.
1413    senders: Vec<Address>,
1414    /// The RLP encoded transaction list for the block body.
1415    ///
1416    /// Since roots task already encodes every transaction for the transaction trie,
1417    /// we can reuse those bytes for the [`ExecutionBlockEncoder`].
1418    encoded_block_transactions: EncodedBlockTransactionList,
1419}
1420
1421#[cfg(test)]
1422mod tests {
1423    use super::*;
1424    use alloy_consensus::BlockBody;
1425    use alloy_primitives::{Address, B256, Bytes};
1426    use core::num::NonZeroU64;
1427    use reth_primitives_traits::Block as _;
1428    use tempo_payload_types::EncodedBlock;
1429    use tempo_primitives::{
1430        AASigned, Block, SignedSubBlock, SubBlock, SubBlockVersion, TempoSignature,
1431        TempoTransaction,
1432    };
1433
1434    fn nz(value: u64) -> NonZeroU64 {
1435        NonZeroU64::new(value).expect("test valid_before must be non-zero")
1436    }
1437
1438    trait TestExt {
1439        fn random() -> Self;
1440        fn with_valid_before(_: Option<NonZeroU64>) -> Self
1441        where
1442            Self: Sized,
1443        {
1444            Self::random()
1445        }
1446    }
1447
1448    impl TestExt for SubBlockMetadata {
1449        fn random() -> Self {
1450            Self {
1451                version: SubBlockVersion::V1,
1452                validator: B256::random(),
1453                fee_recipient: Address::random(),
1454                signature: Bytes::new(),
1455            }
1456        }
1457    }
1458
1459    impl TestExt for RecoveredSubBlock {
1460        fn random() -> Self {
1461            Self::with_valid_before(None)
1462        }
1463
1464        fn with_valid_before(valid_before: Option<NonZeroU64>) -> Self {
1465            let tx = TempoTxEnvelope::AA(AASigned::new_unhashed(
1466                TempoTransaction {
1467                    valid_before,
1468                    ..Default::default()
1469                },
1470                TempoSignature::default(),
1471            ));
1472            let signed = SignedSubBlock {
1473                inner: SubBlock {
1474                    version: SubBlockVersion::V1,
1475                    parent_hash: B256::random(),
1476                    fee_recipient: Address::random(),
1477                    transactions: vec![tx],
1478                },
1479                signature: Bytes::new(),
1480            };
1481            Self::new_unchecked(signed, vec![Address::ZERO], B256::ZERO)
1482        }
1483    }
1484
1485    fn payload_with_metadata(count: usize) -> TempoBuiltPayload {
1486        let metadata: Vec<_> = (0..count).map(|_| SubBlockMetadata::random()).collect();
1487        let input: Bytes = alloy_rlp::encode(&metadata).into();
1488        let tx = TempoTxEnvelope::Legacy(Signed::new_unhashed(
1489            TxLegacy {
1490                chain_id: None,
1491                nonce: 0,
1492                gas_price: 0,
1493                gas_limit: 0,
1494                to: Address::random().into(),
1495                value: U256::ZERO,
1496                input,
1497            },
1498            TEMPO_SYSTEM_TX_SIGNATURE,
1499        ));
1500        let block = Block {
1501            header: TempoHeader::default(),
1502            body: BlockBody {
1503                transactions: vec![tx],
1504                ommers: vec![],
1505                withdrawals: None,
1506            },
1507        }
1508        .try_into_recovered()
1509        .unwrap();
1510        let eth = EthBuiltPayload::new(Arc::new(block), U256::ZERO, None, None);
1511        TempoBuiltPayload::new(
1512            eth,
1513            None,
1514            None,
1515            Duration::ZERO,
1516            Duration::ZERO,
1517            NON_TRANSACTION_SIZE_ESTIMATE,
1518            EncodedBlock::default(),
1519        )
1520    }
1521
1522    #[test]
1523    fn test_is_more_subblocks() {
1524        // None payload always returns false
1525        assert!(!is_more_subblocks(None, &[]));
1526        assert!(!is_more_subblocks(None, &[RecoveredSubBlock::random()]));
1527
1528        // Equal count returns false (1 == 1)
1529        let payload = payload_with_metadata(1);
1530        assert!(!is_more_subblocks(
1531            Some(&payload),
1532            &[RecoveredSubBlock::random()]
1533        ));
1534
1535        // More subblocks returns true (2 > 1)
1536        assert!(is_more_subblocks(
1537            Some(&payload),
1538            &[RecoveredSubBlock::random(), RecoveredSubBlock::random()]
1539        ));
1540
1541        // Fewer subblocks returns false (1 < 2)
1542        let payload = payload_with_metadata(2);
1543        assert!(!is_more_subblocks(
1544            Some(&payload),
1545            &[RecoveredSubBlock::random()]
1546        ));
1547
1548        // Empty metadata, empty subblocks returns false (0 > 0 is false)
1549        let payload = payload_with_metadata(0);
1550        assert!(!is_more_subblocks(Some(&payload), &[]));
1551
1552        // Empty metadata, one subblock returns true (1 > 0)
1553        assert!(is_more_subblocks(
1554            Some(&payload),
1555            &[RecoveredSubBlock::random()]
1556        ));
1557    }
1558
1559    #[test]
1560    fn test_extra_data_flow_in_attributes() {
1561        // Test that extra_data in attributes can be accessed correctly
1562        let extra_data = Bytes::from(vec![42, 43, 44, 45, 46]);
1563
1564        let attrs = TempoPayloadAttributes::new(None, 1, 0, extra_data.clone(), None, Vec::new);
1565
1566        assert_eq!(attrs.extra_data(), &extra_data);
1567
1568        // Verify the data is as expected
1569        let injected_data = attrs.extra_data().clone();
1570
1571        assert_eq!(injected_data, extra_data);
1572    }
1573
1574    #[test]
1575    fn test_recovered_subblock_has_expired_transactions_boundary() {
1576        // valid_before == timestamp → expired
1577        let subblock = RecoveredSubBlock::with_valid_before(Some(nz(1000)));
1578        assert!(subblock.has_expired_transactions(1000));
1579
1580        // valid_before < timestamp → expired
1581        assert!(subblock.has_expired_transactions(1001));
1582
1583        // valid_before > timestamp → NOT expired
1584        assert!(!subblock.has_expired_transactions(999));
1585
1586        // No valid_before → NOT expired
1587        let subblock_no_expiry = RecoveredSubBlock::with_valid_before(None);
1588        assert!(!subblock_no_expiry.has_expired_transactions(1000));
1589    }
1590}