tempo_commonware_node/
subblocks.rs

1use crate::{consensus::Digest, epoch::SchemeProvider};
2use alloy_consensus::{BlockHeader, Transaction, transaction::TxHashRef};
3use alloy_primitives::{Address, B256, BlockHash, Bytes, TxHash};
4use alloy_rlp::Decodable;
5use commonware_codec::DecodeExt;
6use commonware_consensus::{
7    Epochable, Reporter, Viewable,
8    marshal::SchemeProvider as _,
9    simplex::{
10        select_leader,
11        signing_scheme::{
12            Scheme as _,
13            bls12381_threshold::{self, Scheme},
14        },
15        types::Activity,
16    },
17    types::Round,
18    utils,
19};
20use commonware_cryptography::{
21    Signer, Verifier,
22    bls12381::primitives::variant::MinSig,
23    ed25519::{PrivateKey, PublicKey, Signature},
24};
25use commonware_p2p::{Receiver, Recipients, Sender};
26use commonware_runtime::{Handle, Metrics, Pacer, Spawner};
27use eyre::{Context, OptionExt};
28use futures::{FutureExt as _, StreamExt, channel::mpsc};
29use indexmap::IndexMap;
30use parking_lot::Mutex;
31use reth_consensus_common::validation::MAX_RLP_BLOCK_SIZE;
32use reth_evm::{Evm, revm::database::State};
33use reth_node_builder::ConfigureEvm;
34use reth_primitives_traits::Recovered;
35use reth_provider::{
36    BlockReader, BlockSource, ProviderError, StateProviderBox, StateProviderFactory,
37};
38use reth_revm::database::StateProviderDatabase;
39use std::{
40    pin::Pin,
41    sync::{Arc, mpsc::RecvError},
42    time::{Duration, Instant},
43};
44use tempo_node::{TempoFullNode, consensus::TEMPO_SHARED_GAS_DIVISOR, evm::evm::TempoEvm};
45use tempo_primitives::{
46    RecoveredSubBlock, SignedSubBlock, SubBlock, SubBlockVersion, TempoTxEnvelope,
47};
48use tokio::sync::broadcast;
49use tracing::{Instrument, Level, Span, debug, instrument, warn};
50
51/// Maximum number of stored subblock transactions. Used to prevent DOS attacks.
52///
53/// NOTE: included txs are organically cleared when building the next subblock
54/// because they become invalid once their nonce is used.
55const MAX_SUBBLOCK_TXS: usize = 100_000;
56
57pub(crate) struct Config<TContext> {
58    pub(crate) context: TContext,
59    pub(crate) signer: PrivateKey,
60    pub(crate) scheme_provider: SchemeProvider,
61    pub(crate) node: TempoFullNode,
62    pub(crate) fee_recipient: Address,
63    pub(crate) time_to_build_subblock: Duration,
64    pub(crate) subblock_broadcast_interval: Duration,
65    pub(crate) epoch_length: u64,
66}
67
68/// Task managing collected subblocks.
69///
70/// This actor is responsible for tracking consensus events and determining
71/// current tip of the chain and next block's proposer.
72///
73/// Once next block proposer is known, we immediately start building a new subblock.
74/// Once it's built, we broadcast it to the next proposer directly.
75///
76/// Upon receiving a subblock from the network, we ensure that we are
77/// the proposer and verify the block on top of latest state.
78pub(crate) struct Actor<TContext> {
79    /// Sender of messages to the service.
80    actions_tx: mpsc::UnboundedSender<Message>,
81    /// Receiver of events to the service.
82    actions_rx: mpsc::UnboundedReceiver<Message>,
83    /// Stream of subblock transactions from RPC.
84    subblock_transactions_rx: broadcast::Receiver<Recovered<TempoTxEnvelope>>,
85    /// Handle to a task building a new subblock.
86    our_subblock: PendingSubblock,
87
88    /// Scheme provider to track participants of each epoch.
89    scheme_provider: SchemeProvider,
90    /// Commonware runtime context.
91    context: TContext,
92    /// ed25519 private key used for consensus.
93    signer: PrivateKey,
94    /// Execution layer node.
95    node: TempoFullNode,
96    /// Fee recipient address to set for subblocks.
97    fee_recipient: Address,
98    /// Timeout for building a subblock.
99    time_to_build_subblock: Duration,
100    /// How often to broadcast subblocks to the current proposer.
101    subblock_broadcast_interval: Duration,
102    /// Length of an epoch in blocks.
103    epoch_length: u64,
104
105    /// Current consensus tip. Includes highest observed round, digest and certificate.
106    consensus_tip: Option<(Round, BlockHash, bls12381_threshold::Signature<MinSig>)>,
107
108    /// Collected subblocks keyed by validator public key.
109    subblocks: IndexMap<B256, RecoveredSubBlock>,
110    /// Subblock candidate transactions.
111    subblock_transactions: Arc<Mutex<IndexMap<TxHash, Arc<Recovered<TempoTxEnvelope>>>>>,
112}
113
114impl<TContext: Spawner + Metrics + Pacer> Actor<TContext> {
115    pub(crate) fn new(
116        Config {
117            context,
118            signer,
119            scheme_provider,
120            node,
121            fee_recipient,
122            time_to_build_subblock,
123            subblock_broadcast_interval,
124            epoch_length,
125        }: Config<TContext>,
126    ) -> Self {
127        let (actions_tx, actions_rx) = mpsc::unbounded();
128        Self {
129            our_subblock: PendingSubblock::None,
130            subblock_transactions_rx: node.add_ons_handle.eth_api().subblock_transactions_rx(),
131            scheme_provider,
132            actions_tx,
133            actions_rx,
134            context,
135            signer,
136            node,
137            fee_recipient,
138            time_to_build_subblock,
139            subblock_broadcast_interval,
140            epoch_length,
141            consensus_tip: None,
142            subblocks: Default::default(),
143            subblock_transactions: Default::default(),
144        }
145    }
146
147    /// Returns a handle to the subblocks service.
148    pub(crate) fn mailbox(&self) -> Mailbox {
149        Mailbox {
150            tx: self.actions_tx.clone(),
151        }
152    }
153
154    pub(crate) async fn run(
155        mut self,
156        (mut network_tx, mut network_rx): (
157            impl Sender<PublicKey = PublicKey>,
158            impl Receiver<PublicKey = PublicKey>,
159        ),
160    ) {
161        loop {
162            let (subblock_task, broadcast_interval) = match &mut self.our_subblock {
163                PendingSubblock::None => (None, None),
164                PendingSubblock::Task(task) => (Some(task), None),
165                PendingSubblock::Built(built) => (None, Some(&mut built.broadcast_interval)),
166            };
167
168            tokio::select! {
169                biased;
170
171                // Handle messages from consensus engine and service handle.
172                Some(action) = self.actions_rx.next() => {
173                    self.on_new_message(action);
174                },
175                // Handle new subblock transactions.
176                Ok(transaction) = self.subblock_transactions_rx.recv() => {
177                    self.on_new_subblock_transaction(transaction);
178                },
179                // Handle messages from the network.
180                Ok((sender, message)) = network_rx.recv() => {
181                    let _ = self.on_network_message(sender, message, &mut network_tx).await;
182                },
183                // Handle built subblocks.
184                subblock = if let Some(task) = subblock_task {
185                    (&mut task.handle).fuse()
186                } else {
187                    futures::future::Fuse::terminated()
188                } => {
189                    let task = self.our_subblock.take_task().unwrap();
190                    self.on_built_subblock(subblock, task.proposer).await;
191                }
192                // Handle subblocks broadcast.
193                _ = if let Some(broadcast_interval) = broadcast_interval {
194                    broadcast_interval.fuse()
195                } else {
196                    futures::future::Fuse::terminated()
197                } => {
198                    self.broadcast_built_subblock(&mut network_tx).await;
199                }
200            }
201        }
202    }
203
204    /// Returns the current consensus tip.
205    fn tip(&self) -> Option<BlockHash> {
206        self.consensus_tip.as_ref().map(|(_, tip, _)| *tip)
207    }
208
209    fn on_new_message(&mut self, action: Message) {
210        match action {
211            Message::GetSubBlocks { parent, response } => {
212                // This should never happen, but just in case.
213                if self.tip() != Some(parent) {
214                    let _ = response.send(Vec::new());
215                    return;
216                }
217                // Return all subblocks we've collected for this block.
218                let subblocks = self.subblocks.values().cloned().collect();
219                let _ = response.send(subblocks);
220            }
221            Message::Consensus(activity) => self.on_consensus_event(*activity),
222            Message::ValidatedSubblock(subblock) => self.on_validated_subblock(subblock),
223        }
224    }
225
226    #[instrument(skip_all, fields(transaction.tx_hash = %transaction.tx_hash()))]
227    fn on_new_subblock_transaction(&self, transaction: Recovered<TempoTxEnvelope>) {
228        if !transaction
229            .subblock_proposer()
230            .is_some_and(|k| k.matches(self.signer.public_key()))
231        {
232            return;
233        }
234        let mut txs = self.subblock_transactions.lock();
235        if txs.len() >= MAX_SUBBLOCK_TXS {
236            return;
237        }
238        txs.insert(*transaction.tx_hash(), Arc::new(transaction));
239    }
240
241    /// Tracking of the current sconsensus state by listening to notarizations and nullifications.
242    #[instrument(skip_all, fields(event.epoch = event.epoch(), event.view = event.view()))]
243    fn on_consensus_event(&mut self, event: Activity<Scheme<PublicKey, MinSig>, Digest>) {
244        let (new_tip, new_round, new_cert) = match event {
245            Activity::Notarization(n) => {
246                (Some(n.proposal.payload.0), n.proposal.round, n.certificate)
247            }
248            Activity::Finalization(n) => {
249                (Some(n.proposal.payload.0), n.proposal.round, n.certificate)
250            }
251            Activity::Nullification(n) => (None, n.round, n.certificate),
252            _ => return,
253        };
254
255        if let Some((round, tip, cert)) = &mut self.consensus_tip
256            && *round <= new_round
257        {
258            *round = new_round;
259            *cert = new_cert;
260
261            if let Some(new_tip) = new_tip
262                && *tip != new_tip
263            {
264                // Clear collected subblocks if we have a new tip.
265                self.subblocks.clear();
266                *tip = new_tip;
267            }
268        } else if self.consensus_tip.is_none()
269            && let Some(new_tip) = new_tip
270        {
271            // Initialize consensus tip once we know the tip block hash.
272            self.consensus_tip = Some((new_round, new_tip, new_cert));
273        }
274
275        let Some((round, tip, certificate)) = &self.consensus_tip else {
276            return;
277        };
278
279        let Ok(Some(header)) = self
280            .node
281            .provider
282            .find_block_by_hash(*tip, BlockSource::Any)
283        else {
284            debug!(?tip, "missing header for the tip block at {tip}");
285            return;
286        };
287
288        let epoch_of_next_block = utils::epoch(self.epoch_length, header.number() + 1);
289
290        // Can't proceed without knowing a validator set for the current epoch.
291        let Some(scheme) = self.scheme_provider.scheme(epoch_of_next_block) else {
292            debug!(epoch_of_next_block, "scheme not found for epoch");
293            return;
294        };
295
296        let next_round = if round.epoch() == epoch_of_next_block {
297            Round::new(round.epoch(), round.view() + 1)
298        } else {
299            Round::new(epoch_of_next_block, 1)
300        };
301
302        let seed = if next_round.view() == 1 {
303            // First view does not have a seed.
304            None
305        } else {
306            scheme.seed(*round, certificate)
307        };
308
309        let (next_proposer, _) = select_leader::<Scheme<PublicKey, MinSig>, _>(
310            scheme.participants().as_ref(),
311            next_round,
312            seed,
313        );
314
315        debug!(?next_proposer, ?next_round, "determined next proposer");
316
317        // Spawn new subblock building task if the current one is assuming different proposer or parent hash.
318        if self.our_subblock.parent_hash() != Some(*tip)
319            || self.our_subblock.target_proposer() != Some(&next_proposer)
320        {
321            debug!(%tip, %next_proposer, "building new subblock");
322            self.build_new_subblock(*tip, next_proposer, scheme);
323        }
324    }
325
326    fn build_new_subblock(
327        &mut self,
328        parent_hash: BlockHash,
329        next_proposer: PublicKey,
330        scheme: Arc<Scheme<PublicKey, MinSig>>,
331    ) {
332        let transactions = self.subblock_transactions.clone();
333        let node = self.node.clone();
334        let num_validators = scheme.participants().len();
335        let signer = self.signer.clone();
336        let fee_recipient = self.fee_recipient;
337        let timeout = self.time_to_build_subblock;
338        let span = Span::current();
339        let handle = self
340            .context
341            .with_label("validate_subblock")
342            .shared(true)
343            .spawn(move |_| {
344                build_subblock(
345                    transactions,
346                    node,
347                    parent_hash,
348                    num_validators,
349                    signer,
350                    fee_recipient,
351                    timeout,
352                )
353                .instrument(span)
354            });
355
356        self.our_subblock = PendingSubblock::Task(BuildSubblockTask {
357            handle,
358            parent_hash,
359            proposer: next_proposer,
360        });
361    }
362
363    #[instrument(skip_all, err(level = Level::WARN), fields(sender = %sender, msg_bytes = message.len()))]
364    async fn on_network_message(
365        &mut self,
366        sender: PublicKey,
367        message: bytes::Bytes,
368        network_tx: &mut impl Sender<PublicKey = PublicKey>,
369    ) -> eyre::Result<()> {
370        let message =
371            SubblocksMessage::decode(message).wrap_err("failed to decode network message")?;
372
373        let subblock = match message {
374            SubblocksMessage::Subblock(subblock) => subblock,
375            // Process acknowledgements
376            SubblocksMessage::Ack(ack) => {
377                if let PendingSubblock::Built(built) = &mut self.our_subblock
378                    && built.proposer == sender
379                    && ack == built.subblock.signature_hash()
380                {
381                    debug!("received acknowledgement from the next proposer");
382                    built.stop_broadcasting();
383                } else {
384                    warn!(%ack, "received invalid acknowledgement");
385                }
386
387                return Ok(());
388            }
389        };
390
391        let Some(tip) = self.tip() else {
392            return Err(eyre::eyre!("missing tip of the chain"));
393        };
394
395        // Skip subblocks that are not built on top of the tip.
396        eyre::ensure!(
397            subblock.parent_hash == tip,
398            "invalid subblock parent, expected {tip}, got {}",
399            subblock.parent_hash
400        );
401
402        // Send acknowledgement to the sender.
403        //
404        // We only send it after we've validated the tip to make sure that our view
405        // of the chain matches the one of the view of subblock sender. Otherwise,
406        // we expect to receive the subblock again.
407        let _ = network_tx
408            .send(
409                Recipients::One(sender.clone()),
410                SubblocksMessage::Ack(subblock.signature_hash()).encode(),
411                true,
412            )
413            .await;
414
415        debug!("validating new subblock");
416
417        // Spawn task to validate the subblock.
418        let node = self.node.clone();
419        let validated_subblocks_tx = self.actions_tx.clone();
420        let scheme_provider = self.scheme_provider.clone();
421        let epoch_length = self.epoch_length;
422        let span = Span::current();
423        self.context.clone().shared(true).spawn(move |_| {
424            validate_subblock(
425                sender.clone(),
426                node,
427                subblock,
428                validated_subblocks_tx,
429                scheme_provider,
430                epoch_length,
431            )
432            .instrument(span)
433        });
434
435        Ok(())
436    }
437
438    #[instrument(skip_all, fields(subblock.validator = %subblock.validator(), subblock.parent_hash = %subblock.parent_hash))]
439    fn on_validated_subblock(&mut self, subblock: RecoveredSubBlock) {
440        // Skip subblock if we are already past its parent
441        if Some(subblock.parent_hash) != self.tip() {
442            return;
443        }
444
445        debug!(subblock = ?subblock, "validated subblock");
446
447        self.subblocks.insert(subblock.validator(), subblock);
448    }
449
450    #[instrument(skip_all)]
451    async fn on_built_subblock(
452        &mut self,
453        subblock: Result<RecoveredSubBlock, commonware_runtime::Error>,
454        next_proposer: PublicKey,
455    ) {
456        let subblock = match subblock {
457            Ok(subblock) => subblock,
458            Err(error) => {
459                warn!(%error, "failed to build subblock");
460                return;
461            }
462        };
463
464        if Some(subblock.parent_hash) != self.tip() {
465            return;
466        }
467
468        self.our_subblock = PendingSubblock::Built(BuiltSubblock {
469            subblock,
470            proposer: next_proposer,
471            // ticks immediately
472            broadcast_interval: Box::pin(futures::future::ready(())),
473        });
474    }
475
476    #[instrument(skip_all)]
477    async fn broadcast_built_subblock(
478        &mut self,
479        network_tx: &mut impl Sender<PublicKey = PublicKey>,
480    ) {
481        let PendingSubblock::Built(built) = &mut self.our_subblock else {
482            return;
483        };
484
485        // Schedule next broadcast in `subblock_broadcast_interval`
486        built.broadcast_interval = Box::pin(self.context.sleep(self.subblock_broadcast_interval));
487
488        debug!(
489            ?built.subblock,
490            next_proposer = %built.proposer,
491            "sending subblock to the next proposer"
492        );
493
494        if built.proposer != self.signer.public_key() {
495            let _ = network_tx
496                .send(
497                    Recipients::One(built.proposer.clone()),
498                    SubblocksMessage::Subblock((*built.subblock).clone()).encode(),
499                    true,
500                )
501                .await;
502        } else {
503            let subblock = built.subblock.clone();
504            built.stop_broadcasting();
505            self.on_validated_subblock(subblock);
506        }
507    }
508}
509
510/// Actions processed by the subblocks service.
511#[derive(Debug)]
512enum Message {
513    /// Returns all subblocks collected so far.
514    ///
515    /// This will return nothing if parent hash does not match the current chain view
516    /// of the service or if no subblocks have been collected yet.
517    GetSubBlocks {
518        /// Parent block to return subblocks for.
519        parent: BlockHash,
520        /// Response channel.
521        response: std::sync::mpsc::SyncSender<Vec<RecoveredSubBlock>>,
522    },
523
524    /// Reports a new consensus event.
525    Consensus(Box<Activity<Scheme<PublicKey, MinSig>, Digest>>),
526
527    /// Reports a new validated subblock.
528    ValidatedSubblock(RecoveredSubBlock),
529}
530
531/// The current state of our subblock.
532#[derive(Default)]
533enum PendingSubblock {
534    /// No subblock is available.
535    #[default]
536    None,
537    /// Subblock is currently being built.
538    Task(BuildSubblockTask),
539    /// Subblock has been built and is ready to be sent.
540    Built(BuiltSubblock),
541}
542
543impl PendingSubblock {
544    /// Returns the current [`BuildSubblockTask`] if it exists and switches state to [`PendingSubblock::None`].
545    fn take_task(&mut self) -> Option<BuildSubblockTask> {
546        if let Self::Task(task) = std::mem::take(self) {
547            Some(task)
548        } else {
549            None
550        }
551    }
552
553    /// Returns the parent hash of the subblock that was built or is being built.
554    fn parent_hash(&self) -> Option<BlockHash> {
555        match self {
556            Self::Task(task) => Some(task.parent_hash),
557            Self::Built(built) => Some(built.subblock.parent_hash),
558            Self::None => None,
559        }
560    }
561
562    /// Returns the proposer we are going to send the subblock to.
563    fn target_proposer(&self) -> Option<&PublicKey> {
564        match self {
565            Self::Task(task) => Some(&task.proposer),
566            Self::Built(built) => Some(&built.proposer),
567            Self::None => None,
568        }
569    }
570}
571
572/// Task for building a subblock.
573struct BuildSubblockTask {
574    /// Handle to the spawned task.
575    handle: Handle<RecoveredSubBlock>,
576    /// Parent hash subblock is being built on top of.
577    parent_hash: BlockHash,
578    /// Proposer we are going to send the subblock to.
579    proposer: PublicKey,
580}
581
582/// A built subblock ready to be sent.
583struct BuiltSubblock {
584    /// Subblock that has been built.
585    subblock: RecoveredSubBlock,
586    /// Proposer we are going to send the subblock to.
587    proposer: PublicKey,
588    /// Interval for subblock broadcast.
589    broadcast_interval: Pin<Box<dyn Future<Output = ()> + Send>>,
590}
591
592impl BuiltSubblock {
593    /// Stops broadcasting the subblock once the acknowledgement is received.
594    fn stop_broadcasting(&mut self) {
595        self.broadcast_interval = Box::pin(futures::future::pending());
596    }
597}
598
599/// Network messages used in the subblocks service.
600#[derive(Debug)]
601enum SubblocksMessage {
602    /// A new subblock sent to the proposer.
603    Subblock(SignedSubBlock),
604    /// Acknowledgment about receiving a subblock with given hash.
605    Ack(B256),
606}
607
608impl SubblocksMessage {
609    /// Encodes the message into a [`bytes::Bytes`].
610    fn encode(self) -> bytes::Bytes {
611        match self {
612            Self::Subblock(subblock) => alloy_rlp::encode(&subblock).into(),
613            Self::Ack(hash) => bytes::Bytes::copy_from_slice(hash.as_ref()),
614        }
615    }
616
617    /// Decodes a message from the given [`bytes::Bytes`].
618    fn decode(message: bytes::Bytes) -> alloy_rlp::Result<Self> {
619        if message.len() == 32 {
620            let hash = B256::from_slice(&message);
621            Ok(Self::Ack(hash))
622        } else {
623            let subblock = SignedSubBlock::decode(&mut &*message)?;
624            Ok(Self::Subblock(subblock))
625        }
626    }
627}
628
629/// Handle to the spawned subblocks service.
630#[derive(Clone)]
631pub(crate) struct Mailbox {
632    tx: mpsc::UnboundedSender<Message>,
633}
634
635impl Mailbox {
636    pub(crate) fn get_subblocks(
637        &self,
638        parent: BlockHash,
639    ) -> Result<Vec<RecoveredSubBlock>, RecvError> {
640        let (tx, rx) = std::sync::mpsc::sync_channel(1);
641        let _ = self.tx.unbounded_send(Message::GetSubBlocks {
642            parent,
643            response: tx,
644        });
645        rx.recv()
646    }
647}
648
649impl Reporter for Mailbox {
650    type Activity = Activity<Scheme<PublicKey, MinSig>, Digest>;
651
652    async fn report(&mut self, activity: Self::Activity) -> () {
653        let _ = self
654            .tx
655            .unbounded_send(Message::Consensus(Box::new(activity)));
656    }
657}
658
659fn evm_at_block(
660    node: &TempoFullNode,
661    hash: BlockHash,
662) -> eyre::Result<TempoEvm<State<StateProviderDatabase<StateProviderBox>>>> {
663    let db = State::builder()
664        .with_database(StateProviderDatabase::new(
665            node.provider.state_by_block_hash(hash)?,
666        ))
667        .build();
668    let header = node
669        .provider
670        .find_block_by_hash(hash, BlockSource::Any)?
671        .ok_or(ProviderError::BestBlockNotFound)?;
672
673    Ok(node.evm_config.evm_for_block(db, &header)?)
674}
675
676/// Builds a subblock from candidate transactions we've collected so far.
677///
678/// This will include as many valid transactions as possible within the given timeout.
679#[instrument(skip_all, fields(parent_hash = %parent_hash))]
680async fn build_subblock(
681    transactions: Arc<Mutex<IndexMap<TxHash, Arc<Recovered<TempoTxEnvelope>>>>>,
682    node: TempoFullNode,
683    parent_hash: BlockHash,
684    num_validators: usize,
685    signer: PrivateKey,
686    fee_recipient: Address,
687    timeout: Duration,
688) -> RecoveredSubBlock {
689    let start = Instant::now();
690
691    let (transactions, senders) = match evm_at_block(&node, parent_hash) {
692        Ok(mut evm) => {
693            let mut selected_transactions = Vec::new();
694            let mut senders = Vec::new();
695            let mut gas_left =
696                evm.block().gas_limit / TEMPO_SHARED_GAS_DIVISOR / num_validators as u64;
697
698            let txs = transactions.lock().clone();
699            for (tx_hash, tx) in txs {
700                if tx.gas_limit() > gas_left {
701                    continue;
702                }
703                if let Err(err) = evm.transact_commit(&*tx) {
704                    warn!(%err, tx_hash = %tx_hash, "invalid subblock candidate transaction");
705                    // Remove invalid transactions from the set.
706                    transactions.lock().swap_remove(&tx_hash);
707                    continue;
708                }
709
710                gas_left -= tx.gas_limit();
711                selected_transactions.push(tx.inner().clone());
712                senders.push(tx.signer());
713
714                if start.elapsed() > timeout {
715                    break;
716                }
717            }
718
719            (selected_transactions, senders)
720        }
721        Err(err) => {
722            warn!(%err, "failed to build an evm at block, building an empty subblock");
723
724            Default::default()
725        }
726    };
727
728    let subblock = SubBlock {
729        version: SubBlockVersion::V1,
730        fee_recipient,
731        parent_hash,
732        transactions,
733    };
734
735    let signature = signer.sign(None, subblock.signature_hash().as_slice());
736    let signed_subblock = SignedSubBlock {
737        inner: subblock,
738        signature: Bytes::copy_from_slice(signature.as_ref()),
739    };
740
741    RecoveredSubBlock::new_unchecked(
742        signed_subblock,
743        senders,
744        B256::from_slice(&signer.public_key()),
745    )
746}
747
748/// Validates a subblock and reports it to the subblocks service.
749///
750/// Validation checks include:
751/// 1. Signature verification
752/// 2. Ensuring that sender is a validator for the block's epoch
753/// 3. Ensuring that all transactions have corresponding nonce key set.
754/// 4. Ensuring that all transactions are valid.
755#[instrument(skip_all, err(level = Level::WARN), fields(sender = %sender))]
756async fn validate_subblock(
757    sender: PublicKey,
758    node: TempoFullNode,
759    subblock: SignedSubBlock,
760    actions_tx: mpsc::UnboundedSender<Message>,
761    scheme_provider: SchemeProvider,
762    epoch_length: u64,
763) -> eyre::Result<()> {
764    let Ok(signature) =
765        Signature::decode(&mut subblock.signature.as_ref()).wrap_err("invalid signature")
766    else {
767        return Err(eyre::eyre!("invalid signature"));
768    };
769
770    if !sender.verify(None, subblock.signature_hash().as_slice(), &signature) {
771        return Err(eyre::eyre!("invalid signature"));
772    }
773
774    if subblock.transactions.iter().any(|tx| {
775        tx.subblock_proposer()
776            .is_none_or(|proposer| !proposer.matches(&sender))
777    }) {
778        return Err(eyre::eyre!(
779            "all transactions must specify the subblock validator"
780        ));
781    }
782
783    // Recover subblock transactions and convert it into a `RecoveredSubBlock`.
784    let subblock = subblock.try_into_recovered(B256::from_slice(&sender))?;
785
786    let mut evm = evm_at_block(&node, subblock.parent_hash)?;
787
788    let epoch = utils::epoch(epoch_length, evm.block().number.to::<u64>() + 1);
789    let scheme = scheme_provider
790        .scheme(epoch)
791        .ok_or_eyre("scheme not found")?;
792
793    eyre::ensure!(
794        scheme.participants().iter().any(|p| p == &sender),
795        "sender is not a validator"
796    );
797
798    // Bound subblock size at a value proportional to `TEMPO_SHARED_GAS_DIVISOR`.
799    //
800    // This ensures we never collect too many subblocks to fit into a new proposal.
801    let max_size = MAX_RLP_BLOCK_SIZE
802        / TEMPO_SHARED_GAS_DIVISOR as usize
803        / scheme.participants().len() as usize;
804    if subblock.total_tx_size() > max_size {
805        warn!(
806            size = subblock.total_tx_size(),
807            max_size, "subblock is too large, skipping"
808        );
809        return Ok(());
810    }
811
812    for tx in subblock.transactions_recovered() {
813        if let Err(err) = evm.transact_commit(tx) {
814            return Err(eyre::eyre!("transaction failed to execute: {err:?}"));
815        }
816    }
817
818    let _ = actions_tx.unbounded_send(Message::ValidatedSubblock(subblock));
819
820    Ok(())
821}