Skip to main content

tempo_consensus/
subblocks.rs

1use crate::{consensus::Digest, epoch::SchemeProvider};
2use alloy_consensus::{BlockHeader, Transaction, transaction::TxHashRef};
3use alloy_primitives::{Address, B256, BlockHash, Bytes, TxHash};
4use alloy_rlp::Decodable;
5use commonware_codec::DecodeExt;
6use commonware_consensus::{
7    Epochable, Reporter, Viewable,
8    simplex::{
9        elector::Random,
10        scheme::bls12381_threshold::vrf::{Certificate, Scheme},
11        types::Activity,
12    },
13    types::{Epocher as _, FixedEpocher, Height, Round, View},
14};
15use commonware_cryptography::{
16    Signer, Verifier,
17    bls12381::primitives::variant::MinSig,
18    certificate::Provider,
19    ed25519,
20    ed25519::{PrivateKey, PublicKey},
21};
22use commonware_p2p::{Receiver, Recipients, Sender};
23use commonware_runtime::{Handle, IoBuf, Metrics, Pacer, Spawner};
24use eyre::{Context, OptionExt};
25use futures::{FutureExt as _, StreamExt, channel::mpsc};
26use indexmap::IndexMap;
27use parking_lot::Mutex;
28use reth_consensus_common::validation::MAX_RLP_BLOCK_SIZE;
29use reth_evm::{Evm, revm::database::State};
30use reth_node_builder::ConfigureEvm;
31use reth_primitives_traits::Recovered;
32use reth_provider::{
33    BlockReader, BlockSource, ProviderError, StateProviderBox, StateProviderFactory,
34};
35use reth_revm::database::StateProviderDatabase;
36use std::{
37    pin::Pin,
38    sync::{Arc, mpsc::RecvError},
39    time::{Duration, Instant},
40};
41use tempo_chainspec::hardfork::TempoHardforks;
42use tempo_node::{TempoFullNode, evm::evm::TempoEvm};
43use tempo_primitives::{
44    RecoveredSubBlock, SignedSubBlock, SubBlock, SubBlockVersion, TempoTxEnvelope,
45};
46use tokio::sync::broadcast;
47use tracing::{Instrument, Level, Span, debug, error, instrument, warn};
48
/// Maximum number of stored subblock transactions. Used to prevent DoS attacks.
///
/// Once this many candidate transactions are stored, newly received subblock
/// transactions are dropped instead of being stored.
///
/// NOTE: included txs are organically cleared when building the next subblock
/// because they become invalid once their nonce is used.
const MAX_SUBBLOCK_TXS: usize = 100_000;
54
/// Configuration used to construct the subblocks [`Actor`].
///
/// Every field is moved as-is into the actor by [`Actor::new`].
pub(crate) struct Config<TContext> {
    /// Commonware runtime context.
    pub(crate) context: TContext,
    /// ed25519 private key used for consensus.
    pub(crate) signer: PrivateKey,
    /// Scheme provider to track participants of each epoch.
    pub(crate) scheme_provider: SchemeProvider,
    /// Execution layer node.
    pub(crate) node: Arc<TempoFullNode>,
    /// Fee recipient address to set for subblocks.
    pub(crate) fee_recipient: Address,
    /// Timeout for building a subblock.
    pub(crate) time_to_build_subblock: Duration,
    /// How often to re-broadcast a built subblock to the next proposer.
    pub(crate) subblock_broadcast_interval: Duration,
    /// The epoch strategy used by tempo.
    pub(crate) epoch_strategy: FixedEpocher,
}
65
/// Task managing collected subblocks.
///
/// This actor is responsible for tracking consensus events and determining
/// current tip of the chain and next block's proposer.
///
/// Once next block proposer is known, we immediately start building a new subblock.
/// Once it's built, we broadcast it to the next proposer directly.
///
/// Upon receiving a subblock from the network, we check that it is built on top
/// of our current tip and then validate it against the state at that tip.
///
/// NOTE(review): earlier documentation stated that we also ensure that we are
/// the proposer before accepting a subblock; no such check is visible in this
/// file — confirm whether it is enforced elsewhere.
pub(crate) struct Actor<TContext> {
    /// Sender of messages to the service.
    ///
    /// Cloned into every [`Mailbox`] and into spawned validation tasks.
    actions_tx: mpsc::UnboundedSender<Message>,
    /// Receiver of events to the service.
    actions_rx: mpsc::UnboundedReceiver<Message>,
    /// Stream of subblock transactions from RPC.
    subblock_transactions_rx: broadcast::Receiver<Recovered<TempoTxEnvelope>>,
    /// State of our own subblock: nothing pending, a build task in flight, or a
    /// built subblock that is being broadcast.
    our_subblock: PendingSubblock,

    /// Scheme provider to track participants of each epoch.
    scheme_provider: SchemeProvider,
    /// Commonware runtime context.
    context: TContext,
    /// ed25519 private key used for consensus.
    signer: PrivateKey,
    /// Execution layer node.
    node: Arc<TempoFullNode>,
    /// Fee recipient address to set for subblocks.
    fee_recipient: Address,
    /// Timeout for building a subblock.
    time_to_build_subblock: Duration,
    /// How often to broadcast subblocks to the current proposer.
    subblock_broadcast_interval: Duration,
    /// The epoch strategy used by tempo.
    epoch_strategy: FixedEpocher,

    /// Current consensus tip. Includes highest observed round, digest and certificate.
    ///
    /// `None` until the first notarization or finalization is observed.
    consensus_tip: Option<(Round, BlockHash, Certificate<MinSig>)>,

    /// Collected subblocks keyed by validator public key.
    ///
    /// Cleared whenever the tip block hash changes.
    subblocks: IndexMap<B256, RecoveredSubBlock>,
    /// Subblock candidate transactions.
    ///
    /// Shared with subblock building tasks, which prune invalid entries.
    subblock_transactions: Arc<Mutex<IndexMap<TxHash, Arc<Recovered<TempoTxEnvelope>>>>>,
}
111
112impl<TContext: Spawner + Metrics + Pacer> Actor<TContext> {
113    pub(crate) fn new(
114        Config {
115            context,
116            signer,
117            scheme_provider,
118            node,
119            fee_recipient,
120            time_to_build_subblock,
121            subblock_broadcast_interval,
122            epoch_strategy,
123        }: Config<TContext>,
124    ) -> Self {
125        let (actions_tx, actions_rx) = mpsc::unbounded();
126        Self {
127            our_subblock: PendingSubblock::None,
128            subblock_transactions_rx: node.add_ons_handle.eth_api().subblock_transactions_rx(),
129            scheme_provider,
130            actions_tx,
131            actions_rx,
132            context,
133            signer,
134            node,
135            fee_recipient,
136            time_to_build_subblock,
137            subblock_broadcast_interval,
138            epoch_strategy,
139            consensus_tip: None,
140            subblocks: Default::default(),
141            subblock_transactions: Default::default(),
142        }
143    }
144
145    /// Returns a handle to the subblocks service.
146    pub(crate) fn mailbox(&self) -> Mailbox {
147        Mailbox {
148            tx: self.actions_tx.clone(),
149        }
150    }
151
    /// Runs the actor's event loop.
    ///
    /// `network_tx` / `network_rx` are the p2p channel halves used to exchange
    /// [`SubblocksMessage`]s with other validators.
    ///
    /// On every iteration exactly one ready event is handled. Because the
    /// `select!` is `biased`, branches are polled in declaration order:
    /// 1. mailbox messages (consensus events, subblock requests, validated subblocks),
    /// 2. new subblock transactions from RPC,
    /// 3. messages from the network,
    /// 4. completion of our subblock build task,
    /// 5. the broadcast timer of our built subblock.
    ///
    /// The loop only exits when the subblock transactions channel is closed.
    pub(crate) async fn run(
        mut self,
        (mut network_tx, mut network_rx): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        loop {
            // At most one of these is `Some`, depending on the state of our subblock:
            // a build task to await, or a broadcast timer to wait on.
            let (subblock_task, broadcast_interval) = match &mut self.our_subblock {
                PendingSubblock::None => (None, None),
                PendingSubblock::Task(task) => (Some(task), None),
                PendingSubblock::Built(built) => (None, Some(&mut built.broadcast_interval)),
            };

            tokio::select! {
                biased;

                // Handle messages from consensus engine and service handle.
                Some(action) = self.actions_rx.next() => {
                    self.on_new_message(action);
                },
                // Handle new subblock transactions.
                result = self.subblock_transactions_rx.recv() => {
                    match result {
                        Ok(transaction) => {
                            self.on_new_subblock_transaction(transaction);
                        }
                        // Lagging only loses candidate transactions; keep running.
                        Err(broadcast::error::RecvError::Lagged(count)) => {
                            warn!(
                                lagged_count = count,
                                "subblock transaction receiver lagged, {} messages dropped",
                                count
                            );
                        }
                        // The only exit path of the event loop.
                        Err(broadcast::error::RecvError::Closed) => {
                            error!("subblock transactions channel closed unexpectedly");
                            break;
                        }
                    }
                },
                // Handle messages from the network.
                //
                // Handler errors are intentionally discarded here.
                Ok((sender, message)) = network_rx.recv() => {
                    let _ = self.on_network_message(sender, message, &mut network_tx).await;
                },
                // Handle built subblocks.
                //
                // Without a build task this is a terminated `Fuse`, which never completes.
                subblock = if let Some(task) = subblock_task {
                    (&mut task.handle).fuse()
                } else {
                    futures::future::Fuse::terminated()
                } => {
                    // This branch can only complete while the state is `Task`.
                    let task = self.our_subblock.take_task().unwrap();
                    self.on_built_subblock(subblock, task.proposer).await;
                }
                // Handle subblocks broadcast.
                //
                // Without a built subblock this is a terminated `Fuse`, which never completes.
                _ = if let Some(broadcast_interval) = broadcast_interval {
                    broadcast_interval.fuse()
                } else {
                    futures::future::Fuse::terminated()
                } => {
                    self.broadcast_built_subblock(&mut network_tx).await;
                }
            }
        }
    }
216
217    /// Returns the current consensus tip.
218    fn tip(&self) -> Option<BlockHash> {
219        self.consensus_tip.as_ref().map(|(_, tip, _)| *tip)
220    }
221
222    fn on_new_message(&mut self, action: Message) {
223        match action {
224            Message::GetSubBlocks { parent, response } => {
225                // This should never happen, but just in case.
226                if self.tip() != Some(parent) {
227                    let _ = response.send(Vec::new());
228                    return;
229                }
230                // Return all subblocks we've collected for this block.
231                let subblocks = self.subblocks.values().cloned().collect();
232                let _ = response.send(subblocks);
233            }
234            Message::Consensus(activity) => self.on_consensus_event(*activity),
235            Message::ValidatedSubblock(subblock) => self.on_validated_subblock(subblock),
236        }
237    }
238
239    #[instrument(skip_all, fields(transaction.tx_hash = %transaction.tx_hash()))]
240    fn on_new_subblock_transaction(&self, transaction: Recovered<TempoTxEnvelope>) {
241        if !transaction
242            .subblock_proposer()
243            .is_some_and(|k| k.matches(self.signer.public_key()))
244        {
245            return;
246        }
247        let mut txs = self.subblock_transactions.lock();
248        if txs.len() >= MAX_SUBBLOCK_TXS {
249            return;
250        }
251        txs.insert(*transaction.tx_hash(), Arc::new(transaction));
252    }
253
    /// Tracks the current consensus state by listening to notarizations,
    /// finalizations and nullifications.
    ///
    /// After updating the tracked tip, the proposer of the next round is
    /// derived and, if either the tip or that proposer differs from what our
    /// pending subblock assumes, a new subblock build is started.
    ///
    /// All other activity kinds are ignored.
    #[instrument(skip_all, fields(event.epoch = %event.epoch(), event.view = %event.view()))]
    fn on_consensus_event(&mut self, event: Activity<Scheme<PublicKey, MinSig>, Digest>) {
        // Nullifications carry no block, so they can advance the round but not the tip.
        let (new_tip, new_round, new_cert) = match event {
            Activity::Notarization(n) => {
                (Some(n.proposal.payload.0), n.proposal.round, n.certificate)
            }
            Activity::Finalization(n) => {
                (Some(n.proposal.payload.0), n.proposal.round, n.certificate)
            }
            Activity::Nullification(n) => (None, n.round, n.certificate),
            _ => return,
        };

        // Only events for the same or a newer round update the tracked state.
        if let Some((round, tip, cert)) = &mut self.consensus_tip
            && *round <= new_round
        {
            *round = new_round;
            *cert = new_cert;

            if let Some(new_tip) = new_tip
                && *tip != new_tip
            {
                // Clear collected subblocks if we have a new tip.
                self.subblocks.clear();
                *tip = new_tip;
            }
        } else if self.consensus_tip.is_none()
            && let Some(new_tip) = new_tip
        {
            // Initialize consensus tip once we know the tip block hash.
            self.consensus_tip = Some((new_round, new_tip, new_cert));
        }

        let Some((round, tip, certificate)) = &self.consensus_tip else {
            return;
        };

        // The tip block is needed to learn the height of the next block.
        let Ok(Some(header)) = self
            .node
            .provider
            .find_block_by_hash(*tip, BlockSource::Any)
        else {
            debug!(?tip, "missing header for the tip block at {tip}");
            return;
        };

        let epoch_of_next_block = self
            .epoch_strategy
            .containing(Height::new(header.number() + 1))
            .expect("epoch strategy covers all epochs")
            .epoch();

        // Can't proceed without knowing a validator set for the current epoch.
        //
        // TODO(hamdi): When finalizing a boundary block, the scheme for the next epoch is not yet registered meaning
        // we skip the subblock building task. This issue is scoped to the boundary and will be fixed.
        let Some(scheme) = self.scheme_provider.scoped(epoch_of_next_block) else {
            debug!(%epoch_of_next_block, "scheme not found for epoch");
            return;
        };

        // Within the same epoch the next round is the next view; when the next
        // block belongs to a different epoch, view 1 of that epoch is used.
        let next_round = if round.epoch() == epoch_of_next_block {
            Round::new(round.epoch(), round.view().next())
        } else {
            Round::new(epoch_of_next_block, View::new(1))
        };

        // Leader selection is seeded with the seed signature of the tracked certificate.
        let next_proposer = Random::select_leader::<MinSig>(
            next_round,
            scheme.participants().len() as u32,
            certificate.get().map(|signature| signature.seed_signature),
        );
        let next_proposer = scheme.participants()[next_proposer.get() as usize].clone();

        debug!(?next_proposer, ?next_round, "determined next proposer");

        // Spawn new subblock building task if the current one is assuming different proposer or parent hash.
        if self.our_subblock.parent_hash() != Some(*tip)
            || self.our_subblock.target_proposer() != Some(&next_proposer)
        {
            debug!(%tip, %next_proposer, "building new subblock");
            self.build_new_subblock(*tip, next_proposer, scheme);
        }
    }
339
340    fn build_new_subblock(
341        &mut self,
342        parent_hash: BlockHash,
343        next_proposer: PublicKey,
344        scheme: Arc<Scheme<PublicKey, MinSig>>,
345    ) {
346        let transactions = self.subblock_transactions.clone();
347        let node = self.node.clone();
348        let num_validators = scheme.participants().len();
349        let signer = self.signer.clone();
350        let fee_recipient = self.fee_recipient;
351        let timeout = self.time_to_build_subblock;
352        let span = Span::current();
353        let handle = self
354            .context
355            .with_label("validate_subblock")
356            .shared(true)
357            .spawn(move |_| {
358                build_subblock(
359                    transactions,
360                    node,
361                    parent_hash,
362                    num_validators,
363                    signer,
364                    fee_recipient,
365                    timeout,
366                )
367                .instrument(span)
368            });
369
370        self.our_subblock = PendingSubblock::Task(BuildSubblockTask {
371            handle,
372            parent_hash,
373            proposer: next_proposer,
374        });
375    }
376
377    #[instrument(skip_all, err(level = Level::DEBUG), fields(sender = %sender, msg_bytes = message.len()))]
378    async fn on_network_message(
379        &mut self,
380        sender: PublicKey,
381        message: IoBuf,
382        network_tx: &mut impl Sender<PublicKey = PublicKey>,
383    ) -> eyre::Result<()> {
384        let message =
385            SubblocksMessage::decode(message).wrap_err("failed to decode network message")?;
386
387        let subblock = match message {
388            SubblocksMessage::Subblock(subblock) => subblock,
389            // Process acknowledgements
390            SubblocksMessage::Ack(ack) => {
391                if let PendingSubblock::Built(built) = &mut self.our_subblock
392                    && built.proposer == sender
393                    && ack == built.subblock.signature_hash()
394                {
395                    debug!("received acknowledgement from the next proposer");
396                    built.stop_broadcasting();
397                } else {
398                    warn!(%ack, "received invalid acknowledgement");
399                }
400
401                return Ok(());
402            }
403        };
404
405        let Some(tip) = self.tip() else {
406            return Err(eyre::eyre!("missing tip of the chain"));
407        };
408
409        // Skip subblocks that are not built on top of the tip.
410        eyre::ensure!(
411            subblock.parent_hash == tip,
412            "invalid subblock parent, expected {tip}, got {}",
413            subblock.parent_hash
414        );
415
416        // Send acknowledgement to the sender.
417        //
418        // We only send it after we've validated the tip to make sure that our view
419        // of the chain matches the one of the view of subblock sender. Otherwise,
420        // we expect to receive the subblock again.
421        let _ = network_tx
422            .send(
423                Recipients::One(sender.clone()),
424                SubblocksMessage::Ack(subblock.signature_hash()).encode(),
425                true,
426            )
427            .await;
428
429        debug!("validating new subblock");
430
431        // Spawn task to validate the subblock.
432        let node = self.node.clone();
433        let validated_subblocks_tx = self.actions_tx.clone();
434        let scheme_provider = self.scheme_provider.clone();
435        let epoch_strategy = self.epoch_strategy.clone();
436        let span = Span::current();
437        self.context.clone().shared(true).spawn(move |_| {
438            validate_subblock(
439                sender.clone(),
440                node,
441                subblock,
442                validated_subblocks_tx,
443                scheme_provider,
444                epoch_strategy,
445            )
446            .instrument(span)
447        });
448
449        Ok(())
450    }
451
452    #[instrument(skip_all, fields(subblock.validator = %subblock.validator(), subblock.parent_hash = %subblock.parent_hash))]
453    fn on_validated_subblock(&mut self, subblock: RecoveredSubBlock) {
454        // Skip subblock if we are already past its parent
455        if Some(subblock.parent_hash) != self.tip() {
456            return;
457        }
458
459        debug!(subblock = ?subblock, "validated subblock");
460
461        self.subblocks.insert(subblock.validator(), subblock);
462    }
463
464    #[instrument(skip_all)]
465    async fn on_built_subblock(
466        &mut self,
467        subblock: Result<RecoveredSubBlock, commonware_runtime::Error>,
468        next_proposer: PublicKey,
469    ) {
470        let subblock = match subblock {
471            Ok(subblock) => subblock,
472            Err(error) => {
473                warn!(%error, "failed to build subblock");
474                return;
475            }
476        };
477
478        if Some(subblock.parent_hash) != self.tip() {
479            return;
480        }
481
482        self.our_subblock = PendingSubblock::Built(BuiltSubblock {
483            subblock,
484            proposer: next_proposer,
485            // ticks immediately
486            broadcast_interval: Box::pin(futures::future::ready(())),
487        });
488    }
489
490    #[instrument(skip_all)]
491    async fn broadcast_built_subblock(
492        &mut self,
493        network_tx: &mut impl Sender<PublicKey = PublicKey>,
494    ) {
495        let PendingSubblock::Built(built) = &mut self.our_subblock else {
496            return;
497        };
498
499        // Schedule next broadcast in `subblock_broadcast_interval`
500        built.broadcast_interval = Box::pin(self.context.sleep(self.subblock_broadcast_interval));
501
502        debug!(
503            ?built.subblock,
504            next_proposer = %built.proposer,
505            "sending subblock to the next proposer"
506        );
507
508        if built.proposer != self.signer.public_key() {
509            let _ = network_tx
510                .send(
511                    Recipients::One(built.proposer.clone()),
512                    SubblocksMessage::Subblock((*built.subblock).clone()).encode(),
513                    true,
514                )
515                .await;
516        } else {
517            let subblock = built.subblock.clone();
518            built.stop_broadcasting();
519            self.on_validated_subblock(subblock);
520        }
521    }
522}
523
/// Actions processed by the subblocks service.
///
/// Messages are sent through a [`Mailbox`] or by spawned validation tasks.
#[derive(Debug)]
enum Message {
    /// Returns all subblocks collected so far.
    ///
    /// This will return nothing if parent hash does not match the current chain view
    /// of the service or if no subblocks have been collected yet.
    GetSubBlocks {
        /// Parent block to return subblocks for.
        parent: BlockHash,
        /// Response channel.
        response: std::sync::mpsc::SyncSender<Vec<RecoveredSubBlock>>,
    },

    /// Reports a new consensus event.
    ///
    /// The activity is stored behind a `Box`.
    Consensus(Box<Activity<Scheme<PublicKey, MinSig>, Digest>>),

    /// Reports a new validated subblock.
    ValidatedSubblock(RecoveredSubBlock),
}
544
/// The current state of our subblock.
///
/// The state moves from [`PendingSubblock::Task`] to [`PendingSubblock::Built`]
/// when the build task completes for the current tip, and is replaced by a
/// fresh task whenever a new build is started.
#[derive(Default)]
enum PendingSubblock {
    /// No subblock is available.
    #[default]
    None,
    /// Subblock is currently being built.
    Task(BuildSubblockTask),
    /// Subblock has been built and is ready to be sent.
    Built(BuiltSubblock),
}
556
557impl PendingSubblock {
558    /// Returns the current [`BuildSubblockTask`] if it exists and switches state to [`PendingSubblock::None`].
559    fn take_task(&mut self) -> Option<BuildSubblockTask> {
560        if let Self::Task(task) = std::mem::take(self) {
561            Some(task)
562        } else {
563            None
564        }
565    }
566
567    /// Returns the parent hash of the subblock that was built or is being built.
568    fn parent_hash(&self) -> Option<BlockHash> {
569        match self {
570            Self::Task(task) => Some(task.parent_hash),
571            Self::Built(built) => Some(built.subblock.parent_hash),
572            Self::None => None,
573        }
574    }
575
576    /// Returns the proposer we are going to send the subblock to.
577    fn target_proposer(&self) -> Option<&PublicKey> {
578        match self {
579            Self::Task(task) => Some(&task.proposer),
580            Self::Built(built) => Some(&built.proposer),
581            Self::None => None,
582        }
583    }
584}
585
/// Task for building a subblock.
///
/// Tracks the spawned build together with the assumptions it was started
/// under, so that a change of tip or proposer can be detected.
struct BuildSubblockTask {
    /// Handle to the spawned task.
    handle: Handle<RecoveredSubBlock>,
    /// Parent hash subblock is being built on top of.
    parent_hash: BlockHash,
    /// Proposer we are going to send the subblock to.
    proposer: PublicKey,
}
595
/// A built subblock ready to be sent.
struct BuiltSubblock {
    /// Subblock that has been built.
    subblock: RecoveredSubBlock,
    /// Proposer we are going to send the subblock to.
    proposer: PublicKey,
    /// Timer for the next subblock broadcast.
    ///
    /// A future that completes when the next broadcast is due; it is replaced
    /// after every broadcast and swapped for a never-completing future once
    /// broadcasting is stopped.
    broadcast_interval: Pin<Box<dyn Future<Output = ()> + Send>>,
}
605
606impl BuiltSubblock {
607    /// Stops broadcasting the subblock once the acknowledgement is received.
608    fn stop_broadcasting(&mut self) {
609        self.broadcast_interval = Box::pin(futures::future::pending());
610    }
611}
612
/// Network messages used in the subblocks service.
///
/// On the wire an acknowledgement is the bare 32-byte hash, while a subblock
/// is its RLP encoding; the two are told apart by message length.
#[derive(Debug)]
enum SubblocksMessage {
    /// A new subblock sent to the proposer.
    Subblock(SignedSubBlock),
    /// Acknowledgment about receiving a subblock with given hash.
    ///
    /// The hash is the subblock's signature hash.
    Ack(B256),
}
621
622impl SubblocksMessage {
623    /// Encodes the message into a [`bytes::Bytes`].
624    fn encode(self) -> bytes::Bytes {
625        match self {
626            Self::Subblock(subblock) => alloy_rlp::encode(&subblock).into(),
627            Self::Ack(hash) => bytes::Bytes::copy_from_slice(hash.as_ref()),
628        }
629    }
630
631    /// Decodes a message from the given [`bytes::Bytes`].
632    fn decode(message: IoBuf) -> alloy_rlp::Result<Self> {
633        if message.len() == 32 {
634            let hash = B256::from_slice(message.as_ref());
635            Ok(Self::Ack(hash))
636        } else {
637            let subblock = SignedSubBlock::decode(&mut message.as_ref())?;
638            Ok(Self::Subblock(subblock))
639        }
640    }
641}
642
/// Handle to the spawned subblocks service.
///
/// Cheap to clone; every clone sends to the same actor.
#[derive(Clone)]
pub(crate) struct Mailbox {
    /// Sender half of the actor's message channel.
    tx: mpsc::UnboundedSender<Message>,
}
648
649impl Mailbox {
650    pub(crate) fn get_subblocks(
651        &self,
652        parent: BlockHash,
653    ) -> Result<Vec<RecoveredSubBlock>, RecvError> {
654        let (tx, rx) = std::sync::mpsc::sync_channel(1);
655        let _ = self.tx.unbounded_send(Message::GetSubBlocks {
656            parent,
657            response: tx,
658        });
659        rx.recv()
660    }
661}
662
663impl Reporter for Mailbox {
664    type Activity = Activity<Scheme<PublicKey, MinSig>, Digest>;
665
666    async fn report(&mut self, activity: Self::Activity) -> () {
667        let _ = self
668            .tx
669            .unbounded_send(Message::Consensus(Box::new(activity)));
670    }
671}
672
673fn evm_at_block(
674    node: &TempoFullNode,
675    hash: BlockHash,
676) -> eyre::Result<TempoEvm<State<StateProviderDatabase<StateProviderBox>>>> {
677    let db = State::builder()
678        .with_database(StateProviderDatabase::new(
679            node.provider.state_by_block_hash(hash)?,
680        ))
681        .build();
682    let header = node
683        .provider
684        .find_block_by_hash(hash, BlockSource::Any)?
685        .ok_or(ProviderError::BestBlockNotFound)?;
686
687    Ok(node.evm_config.evm_for_block(db, &header)?)
688}
689
690/// Builds a subblock from candidate transactions we've collected so far.
691///
692/// This will include as many valid transactions as possible within the given timeout.
693#[instrument(skip_all, fields(parent_hash = %parent_hash))]
694async fn build_subblock(
695    transactions: Arc<Mutex<IndexMap<TxHash, Arc<Recovered<TempoTxEnvelope>>>>>,
696    node: Arc<TempoFullNode>,
697    parent_hash: BlockHash,
698    num_validators: usize,
699    signer: PrivateKey,
700    fee_recipient: Address,
701    timeout: Duration,
702) -> RecoveredSubBlock {
703    let start = Instant::now();
704
705    let (transactions, senders) = match evm_at_block(&node, parent_hash) {
706        Ok(mut evm) => {
707            let (mut selected, mut senders, mut to_remove) = (Vec::new(), Vec::new(), Vec::new());
708            let shared_gas_limit = node
709                .config
710                .chain
711                .shared_gas_limit_at(evm.block().timestamp.saturating_to(), evm.block().gas_limit);
712            let gas_budget = shared_gas_limit
713                .checked_div(num_validators as u64)
714                .expect("validator set must not be empty");
715
716            let mut gas_left = gas_budget;
717            let txs = transactions.lock().clone();
718
719            for (tx_hash, tx) in txs {
720                let max_regular_gas =
721                    core::cmp::min(tx.gas_limit(), evm.cfg.tx_gas_limit_cap.unwrap_or(u64::MAX));
722                // Remove transactions over subblock gas budget
723                if max_regular_gas > gas_budget {
724                    warn!(
725                        %tx_hash,
726                        tx_gas_limit = tx.gas_limit(),
727                        max_regular_gas,
728                        gas_budget,
729                        "removing transaction with gas limit exceeding maximum subblock gas budget"
730                    );
731                    to_remove.push(tx_hash);
732                    continue;
733                }
734
735                // Skip transactions that don't fit in remaining budget (may fit in future rounds)
736                if max_regular_gas > gas_left {
737                    continue;
738                }
739
740                if let Err(err) = evm.transact_commit(&*tx) {
741                    warn!(%err, tx_hash = %tx_hash, "invalid subblock candidate transaction");
742                    to_remove.push(tx_hash);
743                    continue;
744                }
745
746                gas_left -= max_regular_gas;
747                selected.push(tx.inner().clone());
748                senders.push(tx.signer());
749
750                if start.elapsed() > timeout {
751                    break;
752                }
753            }
754
755            // If necessary, acquire lock and drop all invalid txs
756            if !to_remove.is_empty() {
757                let mut txs = transactions.lock();
758                for hash in to_remove {
759                    txs.swap_remove(&hash);
760                }
761            }
762
763            (selected, senders)
764        }
765        Err(err) => {
766            warn!(%err, "failed to build an evm at block, building an empty subblock");
767
768            Default::default()
769        }
770    };
771
772    let subblock = SubBlock {
773        version: SubBlockVersion::V1,
774        fee_recipient,
775        parent_hash,
776        transactions,
777    };
778
779    // TODO: Use a namespace for these signatures?
780    let signature = signer.sign(&[], subblock.signature_hash().as_slice());
781    let signed_subblock = SignedSubBlock {
782        inner: subblock,
783        signature: Bytes::copy_from_slice(signature.as_ref()),
784    };
785
786    RecoveredSubBlock::new_unchecked(
787        signed_subblock,
788        senders,
789        B256::from_slice(&signer.public_key()),
790    )
791}
792
793/// Validates a subblock and reports it to the subblocks service.
794///
795/// Validation checks include:
796/// 1. Signature verification
797/// 2. Ensuring that sender is a validator for the block's epoch
798/// 3. Ensuring that all transactions have corresponding nonce key set.
799/// 4. Ensuring that all transactions are valid.
800#[instrument(skip_all, err(level = Level::WARN), fields(sender = %sender))]
801async fn validate_subblock(
802    sender: PublicKey,
803    node: Arc<TempoFullNode>,
804    subblock: SignedSubBlock,
805    actions_tx: mpsc::UnboundedSender<Message>,
806    scheme_provider: SchemeProvider,
807    epoch_strategy: FixedEpocher,
808) -> eyre::Result<()> {
809    let Ok(signature) =
810        ed25519::Signature::decode(&mut subblock.signature.as_ref()).wrap_err("invalid signature")
811    else {
812        return Err(eyre::eyre!("invalid signature"));
813    };
814
815    // TODO: use a namespace for these signatures?
816    if !sender.verify(&[], subblock.signature_hash().as_slice(), &signature) {
817        return Err(eyre::eyre!("invalid signature"));
818    }
819
820    if subblock.transactions.iter().any(|tx| {
821        tx.subblock_proposer()
822            .is_none_or(|proposer| !proposer.matches(&sender))
823    }) {
824        return Err(eyre::eyre!(
825            "all transactions must specify the subblock validator"
826        ));
827    }
828
829    // Recover subblock transactions and convert it into a `RecoveredSubBlock`.
830    let subblock = subblock.try_into_recovered(B256::from_slice(&sender))?;
831
832    let mut evm = evm_at_block(&node, subblock.parent_hash)?;
833
834    let epoch = epoch_strategy
835        .containing(Height::new(evm.block().number.to::<u64>() + 1))
836        .expect("epoch strategy covers all epochs")
837        .epoch();
838    let scheme = scheme_provider
839        .scoped(epoch)
840        .ok_or_eyre("scheme not found")?;
841    let participants = scheme.participants().len() as usize;
842
843    eyre::ensure!(
844        scheme.participants().iter().any(|p| p == &sender),
845        "sender is not a validator"
846    );
847
848    let shared_gas_limit = node
849        .config
850        .chain
851        .shared_gas_limit_at(evm.block().timestamp.saturating_to(), evm.block().gas_limit);
852
853    // Bound subblock size at a value proportional to shared_gas_limit.
854    //
855    // This ensures we never collect too many subblocks to fit into a new proposal.
856    let max_size = (MAX_RLP_BLOCK_SIZE as u128 * u128::from(shared_gas_limit)
857        / u128::from(evm.block().gas_limit)
858        / participants as u128) as usize;
859    if subblock.total_tx_size() > max_size {
860        warn!(
861            size = subblock.total_tx_size(),
862            max_size, "subblock is too large, skipping"
863        );
864        return Ok(());
865    }
866
867    // Bound subblock gas at the per-validator allocation.
868    let gas_budget = shared_gas_limit / participants as u64;
869    let mut total_gas = 0u64;
870    for tx in subblock.transactions_recovered() {
871        let max_regular_gas =
872            core::cmp::min(tx.gas_limit(), evm.cfg.tx_gas_limit_cap.unwrap_or(u64::MAX));
873        total_gas = total_gas.saturating_add(max_regular_gas);
874        if total_gas > gas_budget {
875            warn!(
876                total_gas,
877                gas_budget, "subblock exceeds gas budget, skipping"
878            );
879            return Ok(());
880        }
881    }
882
883    // Ensure all transactions can be committed
884    for tx in subblock.transactions_recovered() {
885        if let Err(err) = evm.transact_commit(tx) {
886            return Err(eyre::eyre!("transaction failed to execute: {err:?}"));
887        }
888    }
889
890    let _ = actions_tx.unbounded_send(Message::ValidatedSubblock(subblock));
891
892    Ok(())
893}