// tempo_commonware_node/subblocks.rs

1use crate::{consensus::Digest, epoch::SchemeProvider};
2use alloy_consensus::{BlockHeader, Transaction, transaction::TxHashRef};
3use alloy_primitives::{Address, B256, BlockHash, Bytes, TxHash};
4use alloy_rlp::Decodable;
5use commonware_codec::DecodeExt;
6use commonware_consensus::{
7    Epochable, Reporter, Viewable,
8    simplex::{
9        elector::Random,
10        scheme::bls12381_threshold::vrf::{Certificate, Scheme},
11        types::Activity,
12    },
13    types::{Epocher as _, FixedEpocher, Height, Round, View},
14};
15use commonware_cryptography::{
16    Signer, Verifier,
17    bls12381::primitives::variant::MinSig,
18    certificate::Provider,
19    ed25519,
20    ed25519::{PrivateKey, PublicKey},
21};
22use commonware_p2p::{Receiver, Recipients, Sender};
23use commonware_runtime::{Handle, IoBuf, Metrics, Pacer, Spawner};
24use eyre::{Context, OptionExt};
25use futures::{FutureExt as _, StreamExt, channel::mpsc};
26use indexmap::IndexMap;
27use parking_lot::Mutex;
28use reth_consensus_common::validation::MAX_RLP_BLOCK_SIZE;
29use reth_evm::{Evm, revm::database::State};
30use reth_node_builder::ConfigureEvm;
31use reth_primitives_traits::Recovered;
32use reth_provider::{
33    BlockReader, BlockSource, ProviderError, StateProviderBox, StateProviderFactory,
34};
35use reth_revm::database::StateProviderDatabase;
36use std::{
37    pin::Pin,
38    sync::{Arc, mpsc::RecvError},
39    time::{Duration, Instant},
40};
41use tempo_node::{TempoFullNode, consensus::TEMPO_SHARED_GAS_DIVISOR, evm::evm::TempoEvm};
42use tempo_primitives::{
43    RecoveredSubBlock, SignedSubBlock, SubBlock, SubBlockVersion, TempoTxEnvelope,
44};
45use tokio::sync::broadcast;
46use tracing::{Instrument, Level, Span, debug, error, instrument, warn};
47
48/// Maximum number of stored subblock transactions. Used to prevent DOS attacks.
49///
50/// NOTE: included txs are organically cleared when building the next subblock
51/// because they become invalid once their nonce is used.
52const MAX_SUBBLOCK_TXS: usize = 100_000;
53
/// Configuration for the subblocks [`Actor`].
pub(crate) struct Config<TContext> {
    /// Commonware runtime context used to spawn tasks.
    pub(crate) context: TContext,
    /// ed25519 private key used for consensus.
    pub(crate) signer: PrivateKey,
    /// Scheme provider to track participants of each epoch.
    pub(crate) scheme_provider: SchemeProvider,
    /// Execution layer node.
    pub(crate) node: TempoFullNode,
    /// Fee recipient address to set for subblocks.
    pub(crate) fee_recipient: Address,
    /// Timeout for building a subblock.
    pub(crate) time_to_build_subblock: Duration,
    /// How often to broadcast subblocks to the current proposer.
    pub(crate) subblock_broadcast_interval: Duration,
    /// The epoch strategy used by tempo.
    pub(crate) epoch_strategy: FixedEpocher,
}
64
/// Task managing collected subblocks.
///
/// This actor is responsible for tracking consensus events and determining
/// current tip of the chain and next block's proposer.
///
/// Once next block proposer is known, we immediately start building a new subblock.
/// Once it's built, we broadcast it to the next proposer directly.
///
/// Upon receiving a subblock from the network, we ensure that we are
/// the proposer and verify the block on top of latest state.
pub(crate) struct Actor<TContext> {
    /// Sender of messages to the service. Cloned into [`Mailbox`] handles.
    actions_tx: mpsc::UnboundedSender<Message>,
    /// Receiver of events to the service.
    actions_rx: mpsc::UnboundedReceiver<Message>,
    /// Stream of subblock transactions from RPC.
    subblock_transactions_rx: broadcast::Receiver<Recovered<TempoTxEnvelope>>,
    /// Handle to a task building a new subblock (or the built result).
    our_subblock: PendingSubblock,

    /// Scheme provider to track participants of each epoch.
    scheme_provider: SchemeProvider,
    /// Commonware runtime context.
    context: TContext,
    /// ed25519 private key used for consensus.
    signer: PrivateKey,
    /// Execution layer node.
    node: TempoFullNode,
    /// Fee recipient address to set for subblocks.
    fee_recipient: Address,
    /// Timeout for building a subblock.
    time_to_build_subblock: Duration,
    /// How often to broadcast subblocks to the current proposer.
    subblock_broadcast_interval: Duration,
    /// The epoch strategy used by tempo.
    epoch_strategy: FixedEpocher,

    /// Current consensus tip. Includes highest observed round, digest and certificate.
    consensus_tip: Option<(Round, BlockHash, Certificate<MinSig>)>,

    /// Collected subblocks keyed by validator public key (as a 32-byte hash).
    subblocks: IndexMap<B256, RecoveredSubBlock>,
    /// Subblock candidate transactions, shared with the build task.
    subblock_transactions: Arc<Mutex<IndexMap<TxHash, Arc<Recovered<TempoTxEnvelope>>>>>,
}
110
impl<TContext: Spawner + Metrics + Pacer> Actor<TContext> {
    /// Creates a new actor from the given config, wiring up the internal
    /// action channel and subscribing to the RPC subblock-transaction stream.
    pub(crate) fn new(
        Config {
            context,
            signer,
            scheme_provider,
            node,
            fee_recipient,
            time_to_build_subblock,
            subblock_broadcast_interval,
            epoch_strategy,
        }: Config<TContext>,
    ) -> Self {
        let (actions_tx, actions_rx) = mpsc::unbounded();
        Self {
            our_subblock: PendingSubblock::None,
            subblock_transactions_rx: node.add_ons_handle.eth_api().subblock_transactions_rx(),
            scheme_provider,
            actions_tx,
            actions_rx,
            context,
            signer,
            node,
            fee_recipient,
            time_to_build_subblock,
            subblock_broadcast_interval,
            epoch_strategy,
            consensus_tip: None,
            subblocks: Default::default(),
            subblock_transactions: Default::default(),
        }
    }

    /// Returns a handle to the subblocks service.
    pub(crate) fn mailbox(&self) -> Mailbox {
        Mailbox {
            tx: self.actions_tx.clone(),
        }
    }

    /// Main event loop.
    ///
    /// Multiplexes (in `biased` priority order): actor messages, incoming RPC
    /// subblock transactions, network messages, completion of the subblock
    /// build task, and the periodic re-broadcast timer. Exits only when the
    /// subblock transaction channel closes.
    pub(crate) async fn run(
        mut self,
        (mut network_tx, mut network_rx): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        loop {
            // Project the current subblock state into the two optional futures
            // the select below may wait on (at most one is `Some`).
            let (subblock_task, broadcast_interval) = match &mut self.our_subblock {
                PendingSubblock::None => (None, None),
                PendingSubblock::Task(task) => (Some(task), None),
                PendingSubblock::Built(built) => (None, Some(&mut built.broadcast_interval)),
            };

            tokio::select! {
                biased;

                // Handle messages from consensus engine and service handle.
                Some(action) = self.actions_rx.next() => {
                    self.on_new_message(action);
                },
                // Handle new subblock transactions.
                result = self.subblock_transactions_rx.recv() => {
                    match result {
                        Ok(transaction) => {
                            self.on_new_subblock_transaction(transaction);
                        }
                        Err(broadcast::error::RecvError::Lagged(count)) => {
                            warn!(
                                lagged_count = count,
                                "subblock transaction receiver lagged, {} messages dropped",
                                count
                            );
                        }
                        Err(broadcast::error::RecvError::Closed) => {
                            error!("subblock transactions channel closed unexpectedly");
                            break;
                        }
                    }
                },
                // Handle messages from the network.
                Ok((sender, message)) = network_rx.recv() => {
                    let _ = self.on_network_message(sender, message, &mut network_tx).await;
                },
                // Handle built subblocks.
                subblock = if let Some(task) = subblock_task {
                    (&mut task.handle).fuse()
                } else {
                    futures::future::Fuse::terminated()
                } => {
                    // The task arm only fires when state is `Task`, so `take_task` succeeds.
                    let task = self.our_subblock.take_task().unwrap();
                    self.on_built_subblock(subblock, task.proposer).await;
                }
                // Handle subblocks broadcast.
                _ = if let Some(broadcast_interval) = broadcast_interval {
                    broadcast_interval.fuse()
                } else {
                    futures::future::Fuse::terminated()
                } => {
                    self.broadcast_built_subblock(&mut network_tx).await;
                }
            }
        }
    }

    /// Returns the current consensus tip.
    fn tip(&self) -> Option<BlockHash> {
        self.consensus_tip.as_ref().map(|(_, tip, _)| *tip)
    }

    /// Dispatches a message received on the internal action channel.
    fn on_new_message(&mut self, action: Message) {
        match action {
            Message::GetSubBlocks { parent, response } => {
                // This should never happen, but just in case.
                if self.tip() != Some(parent) {
                    let _ = response.send(Vec::new());
                    return;
                }
                // Return all subblocks we've collected for this block.
                let subblocks = self.subblocks.values().cloned().collect();
                let _ = response.send(subblocks);
            }
            Message::Consensus(activity) => self.on_consensus_event(*activity),
            Message::ValidatedSubblock(subblock) => self.on_validated_subblock(subblock),
        }
    }

    /// Stores a candidate transaction if it targets us as subblock proposer.
    ///
    /// The candidate set is capped at [`MAX_SUBBLOCK_TXS`]; transactions beyond
    /// the cap are silently dropped.
    #[instrument(skip_all, fields(transaction.tx_hash = %transaction.tx_hash()))]
    fn on_new_subblock_transaction(&self, transaction: Recovered<TempoTxEnvelope>) {
        if !transaction
            .subblock_proposer()
            .is_some_and(|k| k.matches(self.signer.public_key()))
        {
            return;
        }
        let mut txs = self.subblock_transactions.lock();
        if txs.len() >= MAX_SUBBLOCK_TXS {
            return;
        }
        txs.insert(*transaction.tx_hash(), Arc::new(transaction));
    }

    /// Tracking of the current consensus state by listening to notarizations and nullifications.
    ///
    /// Updates the consensus tip, determines the next proposer, and kicks off a
    /// new subblock build whenever the tip or the target proposer changed.
    #[instrument(skip_all, fields(event.epoch = %event.epoch(), event.view = %event.view()))]
    fn on_consensus_event(&mut self, event: Activity<Scheme<PublicKey, MinSig>, Digest>) {
        // Nullifications advance the round/certificate but carry no new tip.
        let (new_tip, new_round, new_cert) = match event {
            Activity::Notarization(n) => {
                (Some(n.proposal.payload.0), n.proposal.round, n.proposal.certificate)
            }
            Activity::Finalization(n) => {
                (Some(n.proposal.payload.0), n.proposal.round, n.certificate)
            }
            Activity::Nullification(n) => (None, n.round, n.certificate),
            _ => return,
        };

        if let Some((round, tip, cert)) = &mut self.consensus_tip
            && *round <= new_round
        {
            *round = new_round;
            *cert = new_cert;

            if let Some(new_tip) = new_tip
                && *tip != new_tip
            {
                // Clear collected subblocks if we have a new tip.
                self.subblocks.clear();
                *tip = new_tip;
            }
        } else if self.consensus_tip.is_none()
            && let Some(new_tip) = new_tip
        {
            // Initialize consensus tip once we know the tip block hash.
            self.consensus_tip = Some((new_round, new_tip, new_cert));
        }

        let Some((round, tip, certificate)) = &self.consensus_tip else {
            return;
        };

        let Ok(Some(header)) = self
            .node
            .provider
            .find_block_by_hash(*tip, BlockSource::Any)
        else {
            debug!(?tip, "missing header for the tip block at {tip}");
            return;
        };

        let epoch_of_next_block = self
            .epoch_strategy
            .containing(Height::new(header.number() + 1))
            .expect("epoch strategy covers all epochs")
            .epoch();

        // Can't proceed without knowing a validator set for the current epoch.
        //
        // TODO(hamdi): When finalizing a boundary block, the scheme for the next epoch is not yet registered meaning
        // we skip the subblock building task. This issue is scoped to the boundary and will be fixed.
        let Some(scheme) = self.scheme_provider.scoped(epoch_of_next_block) else {
            debug!(%epoch_of_next_block, "scheme not found for epoch");
            return;
        };

        // Next round stays in the same epoch, or starts at view 1 of the new one.
        let next_round = if round.epoch() == epoch_of_next_block {
            Round::new(round.epoch(), round.view().next())
        } else {
            Round::new(epoch_of_next_block, View::new(1))
        };

        // Leader election is seeded by the VRF seed signature of the tip certificate.
        let next_proposer = Random::select_leader::<MinSig>(
            next_round,
            scheme.participants().len() as u32,
            certificate.get().map(|signature| signature.seed_signature),
        );
        let next_proposer = scheme.participants()[next_proposer.get() as usize].clone();

        debug!(?next_proposer, ?next_round, "determined next proposer");

        // Spawn new subblock building task if the current one is assuming different proposer or parent hash.
        if self.our_subblock.parent_hash() != Some(*tip)
            || self.our_subblock.target_proposer() != Some(&next_proposer)
        {
            debug!(%tip, %next_proposer, "building new subblock");
            self.build_new_subblock(*tip, next_proposer, scheme);
        }
    }

    /// Spawns a background task that builds a subblock on top of `parent_hash`,
    /// replacing any previously pending subblock state.
    fn build_new_subblock(
        &mut self,
        parent_hash: BlockHash,
        next_proposer: PublicKey,
        scheme: Arc<Scheme<PublicKey, MinSig>>,
    ) {
        let transactions = self.subblock_transactions.clone();
        let node = self.node.clone();
        let num_validators = scheme.participants().len();
        let signer = self.signer.clone();
        let fee_recipient = self.fee_recipient;
        let timeout = self.time_to_build_subblock;
        let span = Span::current();
        let handle = self
            .context
            .with_label("validate_subblock")
            .shared(true)
            .spawn(move |_| {
                build_subblock(
                    transactions,
                    node,
                    parent_hash,
                    num_validators,
                    signer,
                    fee_recipient,
                    timeout,
                )
                .instrument(span)
            });

        self.our_subblock = PendingSubblock::Task(BuildSubblockTask {
            handle,
            parent_hash,
            proposer: next_proposer,
        });
    }

    /// Handles a raw network message: either an acknowledgement for our own
    /// subblock, or a peer's subblock which is validated on a background task.
    #[instrument(skip_all, err(level = Level::DEBUG), fields(sender = %sender, msg_bytes = message.len()))]
    async fn on_network_message(
        &mut self,
        sender: PublicKey,
        message: IoBuf,
        network_tx: &mut impl Sender<PublicKey = PublicKey>,
    ) -> eyre::Result<()> {
        let message =
            SubblocksMessage::decode(message).wrap_err("failed to decode network message")?;

        let subblock = match message {
            SubblocksMessage::Subblock(subblock) => subblock,
            // Process acknowledgements
            SubblocksMessage::Ack(ack) => {
                if let PendingSubblock::Built(built) = &mut self.our_subblock
                    && built.proposer == sender
                    && ack == built.subblock.signature_hash()
                {
                    debug!("received acknowledgement from the next proposer");
                    built.stop_broadcasting();
                } else {
                    warn!(%ack, "received invalid acknowledgement");
                }

                return Ok(());
            }
        };

        let Some(tip) = self.tip() else {
            return Err(eyre::eyre!("missing tip of the chain"));
        };

        // Skip subblocks that are not built on top of the tip.
        eyre::ensure!(
            subblock.parent_hash == tip,
            "invalid subblock parent, expected {tip}, got {}",
            subblock.parent_hash
        );

        // Send acknowledgement to the sender.
        //
        // We only send it after we've validated the tip to make sure that our view
        // of the chain matches the one of the view of subblock sender. Otherwise,
        // we expect to receive the subblock again.
        let _ = network_tx
            .send(
                Recipients::One(sender.clone()),
                SubblocksMessage::Ack(subblock.signature_hash()).encode(),
                true,
            )
            .await;

        debug!("validating new subblock");

        // Spawn task to validate the subblock.
        let node = self.node.clone();
        let validated_subblocks_tx = self.actions_tx.clone();
        let scheme_provider = self.scheme_provider.clone();
        let epoch_strategy = self.epoch_strategy.clone();
        let span = Span::current();
        self.context.clone().shared(true).spawn(move |_| {
            validate_subblock(
                sender.clone(),
                node,
                subblock,
                validated_subblocks_tx,
                scheme_provider,
                epoch_strategy,
            )
            .instrument(span)
        });

        Ok(())
    }

    /// Stores a subblock that passed validation, keyed by its validator.
    #[instrument(skip_all, fields(subblock.validator = %subblock.validator(), subblock.parent_hash = %subblock.parent_hash))]
    fn on_validated_subblock(&mut self, subblock: RecoveredSubBlock) {
        // Skip subblock if we are already past its parent
        if Some(subblock.parent_hash) != self.tip() {
            return;
        }

        debug!(subblock = ?subblock, "validated subblock");

        self.subblocks.insert(subblock.validator(), subblock);
    }

    /// Handles the result of the subblock build task, transitioning to the
    /// `Built` state (with an immediately-firing broadcast timer) on success.
    #[instrument(skip_all)]
    async fn on_built_subblock(
        &mut self,
        subblock: Result<RecoveredSubBlock, commonware_runtime::Error>,
        next_proposer: PublicKey,
    ) {
        let subblock = match subblock {
            Ok(subblock) => subblock,
            Err(error) => {
                warn!(%error, "failed to build subblock");
                return;
            }
        };

        // Discard the result if the tip moved while we were building.
        if Some(subblock.parent_hash) != self.tip() {
            return;
        }

        self.our_subblock = PendingSubblock::Built(BuiltSubblock {
            subblock,
            proposer: next_proposer,
            // ticks immediately
            broadcast_interval: Box::pin(futures::future::ready(())),
        });
    }

    /// Sends the built subblock to the next proposer (or validates it locally
    /// if we are the proposer) and re-arms the broadcast timer.
    #[instrument(skip_all)]
    async fn broadcast_built_subblock(
        &mut self,
        network_tx: &mut impl Sender<PublicKey = PublicKey>,
    ) {
        let PendingSubblock::Built(built) = &mut self.our_subblock else {
            return;
        };

        // Schedule next broadcast in `subblock_broadcast_interval`
        built.broadcast_interval = Box::pin(self.context.sleep(self.subblock_broadcast_interval));

        debug!(
            ?built.subblock,
            next_proposer = %built.proposer,
            "sending subblock to the next proposer"
        );

        if built.proposer != self.signer.public_key() {
            let _ = network_tx
                .send(
                    Recipients::One(built.proposer.clone()),
                    SubblocksMessage::Subblock((*built.subblock).clone()).encode(),
                    true,
                )
                .await;
        } else {
            // We are the next proposer ourselves: skip the network round-trip.
            let subblock = built.subblock.clone();
            built.stop_broadcasting();
            self.on_validated_subblock(subblock);
        }
    }
}
522
/// Actions processed by the subblocks service.
#[derive(Debug)]
enum Message {
    /// Returns all subblocks collected so far.
    ///
    /// This will return nothing if parent hash does not match the current chain view
    /// of the service or if no subblocks have been collected yet.
    GetSubBlocks {
        /// Parent block to return subblocks for.
        parent: BlockHash,
        /// Response channel. A sync channel so a blocking caller can wait on it.
        response: std::sync::mpsc::SyncSender<Vec<RecoveredSubBlock>>,
    },

    /// Reports a new consensus event.
    ///
    /// Boxed to keep this enum's size small.
    Consensus(Box<Activity<Scheme<PublicKey, MinSig>, Digest>>),

    /// Reports a new validated subblock.
    ValidatedSubblock(RecoveredSubBlock),
}
543
/// The current state of our subblock.
#[derive(Default)]
enum PendingSubblock {
    /// No subblock is available.
    #[default]
    None,
    /// Subblock is currently being built.
    Task(BuildSubblockTask),
    /// Subblock has been built and is ready to be sent.
    Built(BuiltSubblock),
}
555
556impl PendingSubblock {
557    /// Returns the current [`BuildSubblockTask`] if it exists and switches state to [`PendingSubblock::None`].
558    fn take_task(&mut self) -> Option<BuildSubblockTask> {
559        if let Self::Task(task) = std::mem::take(self) {
560            Some(task)
561        } else {
562            None
563        }
564    }
565
566    /// Returns the parent hash of the subblock that was built or is being built.
567    fn parent_hash(&self) -> Option<BlockHash> {
568        match self {
569            Self::Task(task) => Some(task.parent_hash),
570            Self::Built(built) => Some(built.subblock.parent_hash),
571            Self::None => None,
572        }
573    }
574
575    /// Returns the proposer we are going to send the subblock to.
576    fn target_proposer(&self) -> Option<&PublicKey> {
577        match self {
578            Self::Task(task) => Some(&task.proposer),
579            Self::Built(built) => Some(&built.proposer),
580            Self::None => None,
581        }
582    }
583}
584
/// Task for building a subblock.
struct BuildSubblockTask {
    /// Handle to the spawned task; resolves to the built subblock.
    handle: Handle<RecoveredSubBlock>,
    /// Parent hash subblock is being built on top of.
    parent_hash: BlockHash,
    /// Proposer we are going to send the subblock to.
    proposer: PublicKey,
}
594
/// A built subblock ready to be sent.
struct BuiltSubblock {
    /// Subblock that has been built.
    subblock: RecoveredSubBlock,
    /// Proposer we are going to send the subblock to.
    proposer: PublicKey,
    /// Interval for subblock broadcast; a pending future disables broadcasting.
    broadcast_interval: Pin<Box<dyn Future<Output = ()> + Send>>,
}
604
605impl BuiltSubblock {
606    /// Stops broadcasting the subblock once the acknowledgement is received.
607    fn stop_broadcasting(&mut self) {
608        self.broadcast_interval = Box::pin(futures::future::pending());
609    }
610}
611
/// Network messages used in the subblocks service.
#[derive(Debug)]
enum SubblocksMessage {
    /// A new subblock sent to the proposer.
    Subblock(SignedSubBlock),
    /// Acknowledgment about receiving a subblock with given hash.
    Ack(B256),
}
620
621impl SubblocksMessage {
622    /// Encodes the message into a [`bytes::Bytes`].
623    fn encode(self) -> bytes::Bytes {
624        match self {
625            Self::Subblock(subblock) => alloy_rlp::encode(&subblock).into(),
626            Self::Ack(hash) => bytes::Bytes::copy_from_slice(hash.as_ref()),
627        }
628    }
629
630    /// Decodes a message from the given [`bytes::Bytes`].
631    fn decode(message: IoBuf) -> alloy_rlp::Result<Self> {
632        if message.len() == 32 {
633            let hash = B256::from_slice(message.as_ref());
634            Ok(Self::Ack(hash))
635        } else {
636            let subblock = SignedSubBlock::decode(&mut message.as_ref())?;
637            Ok(Self::Subblock(subblock))
638        }
639    }
640}
641
/// Handle to the spawned subblocks service.
#[derive(Clone)]
pub(crate) struct Mailbox {
    /// Channel into the actor's event loop.
    tx: mpsc::UnboundedSender<Message>,
}
647
648impl Mailbox {
649    pub(crate) fn get_subblocks(
650        &self,
651        parent: BlockHash,
652    ) -> Result<Vec<RecoveredSubBlock>, RecvError> {
653        let (tx, rx) = std::sync::mpsc::sync_channel(1);
654        let _ = self.tx.unbounded_send(Message::GetSubBlocks {
655            parent,
656            response: tx,
657        });
658        rx.recv()
659    }
660}
661
662impl Reporter for Mailbox {
663    type Activity = Activity<Scheme<PublicKey, MinSig>, Digest>;
664
665    async fn report(&mut self, activity: Self::Activity) -> () {
666        let _ = self
667            .tx
668            .unbounded_send(Message::Consensus(Box::new(activity)));
669    }
670}
671
672fn evm_at_block(
673    node: &TempoFullNode,
674    hash: BlockHash,
675) -> eyre::Result<TempoEvm<State<StateProviderDatabase<StateProviderBox>>>> {
676    let db = State::builder()
677        .with_database(StateProviderDatabase::new(
678            node.provider.state_by_block_hash(hash)?,
679        ))
680        .build();
681    let header = node
682        .provider
683        .find_block_by_hash(hash, BlockSource::Any)?
684        .ok_or(ProviderError::BestBlockNotFound)?;
685
686    Ok(node.evm_config.evm_for_block(db, &header)?)
687}
688
689/// Builds a subblock from candidate transactions we've collected so far.
690///
691/// This will include as many valid transactions as possible within the given timeout.
692#[instrument(skip_all, fields(parent_hash = %parent_hash))]
693async fn build_subblock(
694    transactions: Arc<Mutex<IndexMap<TxHash, Arc<Recovered<TempoTxEnvelope>>>>>,
695    node: TempoFullNode,
696    parent_hash: BlockHash,
697    num_validators: usize,
698    signer: PrivateKey,
699    fee_recipient: Address,
700    timeout: Duration,
701) -> RecoveredSubBlock {
702    let start = Instant::now();
703
704    let (transactions, senders) = match evm_at_block(&node, parent_hash) {
705        Ok(mut evm) => {
706            let (mut selected, mut senders, mut to_remove) = (Vec::new(), Vec::new(), Vec::new());
707            let gas_budget = (evm.block().gas_limit / TEMPO_SHARED_GAS_DIVISOR)
708                .checked_div(num_validators as u64)
709                .expect("validator set must not be empty");
710
711            let mut gas_left = gas_budget;
712            let txs = transactions.lock().clone();
713
714            for (tx_hash, tx) in txs {
715                // Remove transactions over subblock gas budget
716                if tx.gas_limit() > gas_budget {
717                    warn!(
718                        %tx_hash,
719                        tx_gas_limit = tx.gas_limit(),
720                        gas_budget,
721                        "removing transaction with gas limit exceeding maximum subblock gas budget"
722                    );
723                    to_remove.push(tx_hash);
724                    continue;
725                }
726
727                // Skip transactions that don't fit in remaining budget (may fit in future rounds)
728                if tx.gas_limit() > gas_left {
729                    continue;
730                }
731
732                if let Err(err) = evm.transact_commit(&*tx) {
733                    warn!(%err, tx_hash = %tx_hash, "invalid subblock candidate transaction");
734                    to_remove.push(tx_hash);
735                    continue;
736                }
737
738                gas_left -= tx.gas_limit();
739                selected.push(tx.inner().clone());
740                senders.push(tx.signer());
741
742                if start.elapsed() > timeout {
743                    break;
744                }
745            }
746
747            // If necessary, acquire lock and drop all invalid txs
748            if !to_remove.is_empty() {
749                let mut txs = transactions.lock();
750                for hash in to_remove {
751                    txs.swap_remove(&hash);
752                }
753            }
754
755            (selected, senders)
756        }
757        Err(err) => {
758            warn!(%err, "failed to build an evm at block, building an empty subblock");
759
760            Default::default()
761        }
762    };
763
764    let subblock = SubBlock {
765        version: SubBlockVersion::V1,
766        fee_recipient,
767        parent_hash,
768        transactions,
769    };
770
771    // TODO: Use a namespace for these signatures?
772    let signature = signer.sign(&[], subblock.signature_hash().as_slice());
773    let signed_subblock = SignedSubBlock {
774        inner: subblock,
775        signature: Bytes::copy_from_slice(signature.as_ref()),
776    };
777
778    RecoveredSubBlock::new_unchecked(
779        signed_subblock,
780        senders,
781        B256::from_slice(&signer.public_key()),
782    )
783}
784
785/// Validates a subblock and reports it to the subblocks service.
786///
787/// Validation checks include:
788/// 1. Signature verification
789/// 2. Ensuring that sender is a validator for the block's epoch
790/// 3. Ensuring that all transactions have corresponding nonce key set.
791/// 4. Ensuring that all transactions are valid.
792#[instrument(skip_all, err(level = Level::WARN), fields(sender = %sender))]
793async fn validate_subblock(
794    sender: PublicKey,
795    node: TempoFullNode,
796    subblock: SignedSubBlock,
797    actions_tx: mpsc::UnboundedSender<Message>,
798    scheme_provider: SchemeProvider,
799    epoch_strategy: FixedEpocher,
800) -> eyre::Result<()> {
801    let Ok(signature) =
802        ed25519::Signature::decode(&mut subblock.signature.as_ref()).wrap_err("invalid signature")
803    else {
804        return Err(eyre::eyre!("invalid signature"));
805    };
806
807    // TODO: use a namespace for these signatures?
808    if !sender.verify(&[], subblock.signature_hash().as_slice(), &signature) {
809        return Err(eyre::eyre!("invalid signature"));
810    }
811
812    if subblock.transactions.iter().any(|tx| {
813        tx.subblock_proposer()
814            .is_none_or(|proposer| !proposer.matches(&sender))
815    }) {
816        return Err(eyre::eyre!(
817            "all transactions must specify the subblock validator"
818        ));
819    }
820
821    // Recover subblock transactions and convert it into a `RecoveredSubBlock`.
822    let subblock = subblock.try_into_recovered(B256::from_slice(&sender))?;
823
824    let mut evm = evm_at_block(&node, subblock.parent_hash)?;
825
826    let epoch = epoch_strategy
827        .containing(Height::new(evm.block().number.to::<u64>() + 1))
828        .expect("epoch strategy covers all epochs")
829        .epoch();
830    let scheme = scheme_provider
831        .scoped(epoch)
832        .ok_or_eyre("scheme not found")?;
833    let participants = scheme.participants().len() as usize;
834
835    eyre::ensure!(
836        scheme.participants().iter().any(|p| p == &sender),
837        "sender is not a validator"
838    );
839
840    // Bound subblock size at a value proportional to `TEMPO_SHARED_GAS_DIVISOR`.
841    //
842    // This ensures we never collect too many subblocks to fit into a new proposal.
843    let max_size = MAX_RLP_BLOCK_SIZE / TEMPO_SHARED_GAS_DIVISOR as usize / participants;
844    if subblock.total_tx_size() > max_size {
845        warn!(
846            size = subblock.total_tx_size(),
847            max_size, "subblock is too large, skipping"
848        );
849        return Ok(());
850    }
851
852    // Bound subblock gas at the per-validator allocation.
853    let gas_budget = evm.block().gas_limit / TEMPO_SHARED_GAS_DIVISOR / participants as u64;
854    let mut total_gas = 0u64;
855    for tx in subblock.transactions_recovered() {
856        total_gas = total_gas.saturating_add(tx.gas_limit());
857        if total_gas > gas_budget {
858            warn!(
859                total_gas,
860                gas_budget, "subblock exceeds gas budget, skipping"
861            );
862            return Ok(());
863        }
864    }
865
866    // Ensure all transactions can be committed
867    for tx in subblock.transactions_recovered() {
868        if let Err(err) = evm.transact_commit(tx) {
869            return Err(eyre::eyre!("transaction failed to execute: {err:?}"));
870        }
871    }
872
873    let _ = actions_tx.unbounded_send(Message::ValidatedSubblock(subblock));
874
875    Ok(())
876}