1use crate::{consensus::Digest, epoch::SchemeProvider};
2use alloy_consensus::{BlockHeader, Transaction, transaction::TxHashRef};
3use alloy_primitives::{Address, B256, BlockHash, Bytes, TxHash};
4use alloy_rlp::Decodable;
5use commonware_codec::DecodeExt;
6use commonware_consensus::{
7 Epochable, Reporter, Viewable,
8 simplex::{
9 elector::Random,
10 scheme::bls12381_threshold::vrf::{Certificate, Scheme},
11 types::Activity,
12 },
13 types::{Epocher as _, FixedEpocher, Height, Round, View},
14};
15use commonware_cryptography::{
16 Signer, Verifier,
17 bls12381::primitives::variant::MinSig,
18 certificate::Provider,
19 ed25519,
20 ed25519::{PrivateKey, PublicKey},
21};
22use commonware_p2p::{Receiver, Recipients, Sender};
23use commonware_runtime::{Handle, IoBuf, Metrics, Pacer, Spawner};
24use eyre::{Context, OptionExt};
25use futures::{FutureExt as _, StreamExt, channel::mpsc};
26use indexmap::IndexMap;
27use parking_lot::Mutex;
28use reth_consensus_common::validation::MAX_RLP_BLOCK_SIZE;
29use reth_evm::{Evm, revm::database::State};
30use reth_node_builder::ConfigureEvm;
31use reth_primitives_traits::Recovered;
32use reth_provider::{
33 BlockReader, BlockSource, ProviderError, StateProviderBox, StateProviderFactory,
34};
35use reth_revm::database::StateProviderDatabase;
36use std::{
37 pin::Pin,
38 sync::{Arc, mpsc::RecvError},
39 time::{Duration, Instant},
40};
41use tempo_chainspec::hardfork::TempoHardforks;
42use tempo_node::{TempoFullNode, evm::evm::TempoEvm};
43use tempo_primitives::{
44 RecoveredSubBlock, SignedSubBlock, SubBlock, SubBlockVersion, TempoTxEnvelope,
45};
46use tokio::sync::broadcast;
47use tracing::{Instrument, Level, Span, debug, error, instrument, warn};
48
49const MAX_SUBBLOCK_TXS: usize = 100_000;
54
55pub(crate) struct Config<TContext> {
56 pub(crate) context: TContext,
57 pub(crate) signer: PrivateKey,
58 pub(crate) scheme_provider: SchemeProvider,
59 pub(crate) node: Arc<TempoFullNode>,
60 pub(crate) fee_recipient: Address,
61 pub(crate) time_to_build_subblock: Duration,
62 pub(crate) subblock_broadcast_interval: Duration,
63 pub(crate) epoch_strategy: FixedEpocher,
64}
65
66pub(crate) struct Actor<TContext> {
77 actions_tx: mpsc::UnboundedSender<Message>,
79 actions_rx: mpsc::UnboundedReceiver<Message>,
81 subblock_transactions_rx: broadcast::Receiver<Recovered<TempoTxEnvelope>>,
83 our_subblock: PendingSubblock,
85
86 scheme_provider: SchemeProvider,
88 context: TContext,
90 signer: PrivateKey,
92 node: Arc<TempoFullNode>,
94 fee_recipient: Address,
96 time_to_build_subblock: Duration,
98 subblock_broadcast_interval: Duration,
100 epoch_strategy: FixedEpocher,
102
103 consensus_tip: Option<(Round, BlockHash, Certificate<MinSig>)>,
105
106 subblocks: IndexMap<B256, RecoveredSubBlock>,
108 subblock_transactions: Arc<Mutex<IndexMap<TxHash, Arc<Recovered<TempoTxEnvelope>>>>>,
110}
111
112impl<TContext: Spawner + Metrics + Pacer> Actor<TContext> {
113 pub(crate) fn new(
114 Config {
115 context,
116 signer,
117 scheme_provider,
118 node,
119 fee_recipient,
120 time_to_build_subblock,
121 subblock_broadcast_interval,
122 epoch_strategy,
123 }: Config<TContext>,
124 ) -> Self {
125 let (actions_tx, actions_rx) = mpsc::unbounded();
126 Self {
127 our_subblock: PendingSubblock::None,
128 subblock_transactions_rx: node.add_ons_handle.eth_api().subblock_transactions_rx(),
129 scheme_provider,
130 actions_tx,
131 actions_rx,
132 context,
133 signer,
134 node,
135 fee_recipient,
136 time_to_build_subblock,
137 subblock_broadcast_interval,
138 epoch_strategy,
139 consensus_tip: None,
140 subblocks: Default::default(),
141 subblock_transactions: Default::default(),
142 }
143 }
144
145 pub(crate) fn mailbox(&self) -> Mailbox {
147 Mailbox {
148 tx: self.actions_tx.clone(),
149 }
150 }
151
152 pub(crate) async fn run(
153 mut self,
154 (mut network_tx, mut network_rx): (
155 impl Sender<PublicKey = PublicKey>,
156 impl Receiver<PublicKey = PublicKey>,
157 ),
158 ) {
159 loop {
160 let (subblock_task, broadcast_interval) = match &mut self.our_subblock {
161 PendingSubblock::None => (None, None),
162 PendingSubblock::Task(task) => (Some(task), None),
163 PendingSubblock::Built(built) => (None, Some(&mut built.broadcast_interval)),
164 };
165
166 tokio::select! {
167 biased;
168
169 Some(action) = self.actions_rx.next() => {
171 self.on_new_message(action);
172 },
173 result = self.subblock_transactions_rx.recv() => {
175 match result {
176 Ok(transaction) => {
177 self.on_new_subblock_transaction(transaction);
178 }
179 Err(broadcast::error::RecvError::Lagged(count)) => {
180 warn!(
181 lagged_count = count,
182 "subblock transaction receiver lagged, {} messages dropped",
183 count
184 );
185 }
186 Err(broadcast::error::RecvError::Closed) => {
187 error!("subblock transactions channel closed unexpectedly");
188 break;
189 }
190 }
191 },
192 Ok((sender, message)) = network_rx.recv() => {
194 let _ = self.on_network_message(sender, message, &mut network_tx).await;
195 },
196 subblock = if let Some(task) = subblock_task {
198 (&mut task.handle).fuse()
199 } else {
200 futures::future::Fuse::terminated()
201 } => {
202 let task = self.our_subblock.take_task().unwrap();
203 self.on_built_subblock(subblock, task.proposer).await;
204 }
205 _ = if let Some(broadcast_interval) = broadcast_interval {
207 broadcast_interval.fuse()
208 } else {
209 futures::future::Fuse::terminated()
210 } => {
211 self.broadcast_built_subblock(&mut network_tx).await;
212 }
213 }
214 }
215 }
216
217 fn tip(&self) -> Option<BlockHash> {
219 self.consensus_tip.as_ref().map(|(_, tip, _)| *tip)
220 }
221
222 fn on_new_message(&mut self, action: Message) {
223 match action {
224 Message::GetSubBlocks { parent, response } => {
225 if self.tip() != Some(parent) {
227 let _ = response.send(Vec::new());
228 return;
229 }
230 let subblocks = self.subblocks.values().cloned().collect();
232 let _ = response.send(subblocks);
233 }
234 Message::Consensus(activity) => self.on_consensus_event(*activity),
235 Message::ValidatedSubblock(subblock) => self.on_validated_subblock(subblock),
236 }
237 }
238
239 #[instrument(skip_all, fields(transaction.tx_hash = %transaction.tx_hash()))]
240 fn on_new_subblock_transaction(&self, transaction: Recovered<TempoTxEnvelope>) {
241 if !transaction
242 .subblock_proposer()
243 .is_some_and(|k| k.matches(self.signer.public_key()))
244 {
245 return;
246 }
247 let mut txs = self.subblock_transactions.lock();
248 if txs.len() >= MAX_SUBBLOCK_TXS {
249 return;
250 }
251 txs.insert(*transaction.tx_hash(), Arc::new(transaction));
252 }
253
254 #[instrument(skip_all, fields(event.epoch = %event.epoch(), event.view = %event.view()))]
256 fn on_consensus_event(&mut self, event: Activity<Scheme<PublicKey, MinSig>, Digest>) {
257 let (new_tip, new_round, new_cert) = match event {
258 Activity::Notarization(n) => {
259 (Some(n.proposal.payload.0), n.proposal.round, n.certificate)
260 }
261 Activity::Finalization(n) => {
262 (Some(n.proposal.payload.0), n.proposal.round, n.certificate)
263 }
264 Activity::Nullification(n) => (None, n.round, n.certificate),
265 _ => return,
266 };
267
268 if let Some((round, tip, cert)) = &mut self.consensus_tip
269 && *round <= new_round
270 {
271 *round = new_round;
272 *cert = new_cert;
273
274 if let Some(new_tip) = new_tip
275 && *tip != new_tip
276 {
277 self.subblocks.clear();
279 *tip = new_tip;
280 }
281 } else if self.consensus_tip.is_none()
282 && let Some(new_tip) = new_tip
283 {
284 self.consensus_tip = Some((new_round, new_tip, new_cert));
286 }
287
288 let Some((round, tip, certificate)) = &self.consensus_tip else {
289 return;
290 };
291
292 let Ok(Some(header)) = self
293 .node
294 .provider
295 .find_block_by_hash(*tip, BlockSource::Any)
296 else {
297 debug!(?tip, "missing header for the tip block at {tip}");
298 return;
299 };
300
301 let epoch_of_next_block = self
302 .epoch_strategy
303 .containing(Height::new(header.number() + 1))
304 .expect("epoch strategy covers all epochs")
305 .epoch();
306
307 let Some(scheme) = self.scheme_provider.scoped(epoch_of_next_block) else {
312 debug!(%epoch_of_next_block, "scheme not found for epoch");
313 return;
314 };
315
316 let next_round = if round.epoch() == epoch_of_next_block {
317 Round::new(round.epoch(), round.view().next())
318 } else {
319 Round::new(epoch_of_next_block, View::new(1))
320 };
321
322 let next_proposer = Random::select_leader::<MinSig>(
323 next_round,
324 scheme.participants().len() as u32,
325 certificate.get().map(|signature| signature.seed_signature),
326 );
327 let next_proposer = scheme.participants()[next_proposer.get() as usize].clone();
328
329 debug!(?next_proposer, ?next_round, "determined next proposer");
330
331 if self.our_subblock.parent_hash() != Some(*tip)
333 || self.our_subblock.target_proposer() != Some(&next_proposer)
334 {
335 debug!(%tip, %next_proposer, "building new subblock");
336 self.build_new_subblock(*tip, next_proposer, scheme);
337 }
338 }
339
340 fn build_new_subblock(
341 &mut self,
342 parent_hash: BlockHash,
343 next_proposer: PublicKey,
344 scheme: Arc<Scheme<PublicKey, MinSig>>,
345 ) {
346 let transactions = self.subblock_transactions.clone();
347 let node = self.node.clone();
348 let num_validators = scheme.participants().len();
349 let signer = self.signer.clone();
350 let fee_recipient = self.fee_recipient;
351 let timeout = self.time_to_build_subblock;
352 let span = Span::current();
353 let handle = self
354 .context
355 .with_label("validate_subblock")
356 .shared(true)
357 .spawn(move |_| {
358 build_subblock(
359 transactions,
360 node,
361 parent_hash,
362 num_validators,
363 signer,
364 fee_recipient,
365 timeout,
366 )
367 .instrument(span)
368 });
369
370 self.our_subblock = PendingSubblock::Task(BuildSubblockTask {
371 handle,
372 parent_hash,
373 proposer: next_proposer,
374 });
375 }
376
377 #[instrument(skip_all, err(level = Level::DEBUG), fields(sender = %sender, msg_bytes = message.len()))]
378 async fn on_network_message(
379 &mut self,
380 sender: PublicKey,
381 message: IoBuf,
382 network_tx: &mut impl Sender<PublicKey = PublicKey>,
383 ) -> eyre::Result<()> {
384 let message =
385 SubblocksMessage::decode(message).wrap_err("failed to decode network message")?;
386
387 let subblock = match message {
388 SubblocksMessage::Subblock(subblock) => subblock,
389 SubblocksMessage::Ack(ack) => {
391 if let PendingSubblock::Built(built) = &mut self.our_subblock
392 && built.proposer == sender
393 && ack == built.subblock.signature_hash()
394 {
395 debug!("received acknowledgement from the next proposer");
396 built.stop_broadcasting();
397 } else {
398 warn!(%ack, "received invalid acknowledgement");
399 }
400
401 return Ok(());
402 }
403 };
404
405 let Some(tip) = self.tip() else {
406 return Err(eyre::eyre!("missing tip of the chain"));
407 };
408
409 eyre::ensure!(
411 subblock.parent_hash == tip,
412 "invalid subblock parent, expected {tip}, got {}",
413 subblock.parent_hash
414 );
415
416 let _ = network_tx
422 .send(
423 Recipients::One(sender.clone()),
424 SubblocksMessage::Ack(subblock.signature_hash()).encode(),
425 true,
426 )
427 .await;
428
429 debug!("validating new subblock");
430
431 let node = self.node.clone();
433 let validated_subblocks_tx = self.actions_tx.clone();
434 let scheme_provider = self.scheme_provider.clone();
435 let epoch_strategy = self.epoch_strategy.clone();
436 let span = Span::current();
437 self.context.clone().shared(true).spawn(move |_| {
438 validate_subblock(
439 sender.clone(),
440 node,
441 subblock,
442 validated_subblocks_tx,
443 scheme_provider,
444 epoch_strategy,
445 )
446 .instrument(span)
447 });
448
449 Ok(())
450 }
451
452 #[instrument(skip_all, fields(subblock.validator = %subblock.validator(), subblock.parent_hash = %subblock.parent_hash))]
453 fn on_validated_subblock(&mut self, subblock: RecoveredSubBlock) {
454 if Some(subblock.parent_hash) != self.tip() {
456 return;
457 }
458
459 debug!(subblock = ?subblock, "validated subblock");
460
461 self.subblocks.insert(subblock.validator(), subblock);
462 }
463
464 #[instrument(skip_all)]
465 async fn on_built_subblock(
466 &mut self,
467 subblock: Result<RecoveredSubBlock, commonware_runtime::Error>,
468 next_proposer: PublicKey,
469 ) {
470 let subblock = match subblock {
471 Ok(subblock) => subblock,
472 Err(error) => {
473 warn!(%error, "failed to build subblock");
474 return;
475 }
476 };
477
478 if Some(subblock.parent_hash) != self.tip() {
479 return;
480 }
481
482 self.our_subblock = PendingSubblock::Built(BuiltSubblock {
483 subblock,
484 proposer: next_proposer,
485 broadcast_interval: Box::pin(futures::future::ready(())),
487 });
488 }
489
490 #[instrument(skip_all)]
491 async fn broadcast_built_subblock(
492 &mut self,
493 network_tx: &mut impl Sender<PublicKey = PublicKey>,
494 ) {
495 let PendingSubblock::Built(built) = &mut self.our_subblock else {
496 return;
497 };
498
499 built.broadcast_interval = Box::pin(self.context.sleep(self.subblock_broadcast_interval));
501
502 debug!(
503 ?built.subblock,
504 next_proposer = %built.proposer,
505 "sending subblock to the next proposer"
506 );
507
508 if built.proposer != self.signer.public_key() {
509 let _ = network_tx
510 .send(
511 Recipients::One(built.proposer.clone()),
512 SubblocksMessage::Subblock((*built.subblock).clone()).encode(),
513 true,
514 )
515 .await;
516 } else {
517 let subblock = built.subblock.clone();
518 built.stop_broadcasting();
519 self.on_validated_subblock(subblock);
520 }
521 }
522}
523
524#[derive(Debug)]
526enum Message {
527 GetSubBlocks {
532 parent: BlockHash,
534 response: std::sync::mpsc::SyncSender<Vec<RecoveredSubBlock>>,
536 },
537
538 Consensus(Box<Activity<Scheme<PublicKey, MinSig>, Digest>>),
540
541 ValidatedSubblock(RecoveredSubBlock),
543}
544
545#[derive(Default)]
547enum PendingSubblock {
548 #[default]
550 None,
551 Task(BuildSubblockTask),
553 Built(BuiltSubblock),
555}
556
557impl PendingSubblock {
558 fn take_task(&mut self) -> Option<BuildSubblockTask> {
560 if let Self::Task(task) = std::mem::take(self) {
561 Some(task)
562 } else {
563 None
564 }
565 }
566
567 fn parent_hash(&self) -> Option<BlockHash> {
569 match self {
570 Self::Task(task) => Some(task.parent_hash),
571 Self::Built(built) => Some(built.subblock.parent_hash),
572 Self::None => None,
573 }
574 }
575
576 fn target_proposer(&self) -> Option<&PublicKey> {
578 match self {
579 Self::Task(task) => Some(&task.proposer),
580 Self::Built(built) => Some(&built.proposer),
581 Self::None => None,
582 }
583 }
584}
585
586struct BuildSubblockTask {
588 handle: Handle<RecoveredSubBlock>,
590 parent_hash: BlockHash,
592 proposer: PublicKey,
594}
595
596struct BuiltSubblock {
598 subblock: RecoveredSubBlock,
600 proposer: PublicKey,
602 broadcast_interval: Pin<Box<dyn Future<Output = ()> + Send>>,
604}
605
606impl BuiltSubblock {
607 fn stop_broadcasting(&mut self) {
609 self.broadcast_interval = Box::pin(futures::future::pending());
610 }
611}
612
613#[derive(Debug)]
615enum SubblocksMessage {
616 Subblock(SignedSubBlock),
618 Ack(B256),
620}
621
622impl SubblocksMessage {
623 fn encode(self) -> bytes::Bytes {
625 match self {
626 Self::Subblock(subblock) => alloy_rlp::encode(&subblock).into(),
627 Self::Ack(hash) => bytes::Bytes::copy_from_slice(hash.as_ref()),
628 }
629 }
630
631 fn decode(message: IoBuf) -> alloy_rlp::Result<Self> {
633 if message.len() == 32 {
634 let hash = B256::from_slice(message.as_ref());
635 Ok(Self::Ack(hash))
636 } else {
637 let subblock = SignedSubBlock::decode(&mut message.as_ref())?;
638 Ok(Self::Subblock(subblock))
639 }
640 }
641}
642
643#[derive(Clone)]
645pub(crate) struct Mailbox {
646 tx: mpsc::UnboundedSender<Message>,
647}
648
649impl Mailbox {
650 pub(crate) fn get_subblocks(
651 &self,
652 parent: BlockHash,
653 ) -> Result<Vec<RecoveredSubBlock>, RecvError> {
654 let (tx, rx) = std::sync::mpsc::sync_channel(1);
655 let _ = self.tx.unbounded_send(Message::GetSubBlocks {
656 parent,
657 response: tx,
658 });
659 rx.recv()
660 }
661}
662
663impl Reporter for Mailbox {
664 type Activity = Activity<Scheme<PublicKey, MinSig>, Digest>;
665
666 async fn report(&mut self, activity: Self::Activity) -> () {
667 let _ = self
668 .tx
669 .unbounded_send(Message::Consensus(Box::new(activity)));
670 }
671}
672
673fn evm_at_block(
674 node: &TempoFullNode,
675 hash: BlockHash,
676) -> eyre::Result<TempoEvm<State<StateProviderDatabase<StateProviderBox>>>> {
677 let db = State::builder()
678 .with_database(StateProviderDatabase::new(
679 node.provider.state_by_block_hash(hash)?,
680 ))
681 .build();
682 let header = node
683 .provider
684 .find_block_by_hash(hash, BlockSource::Any)?
685 .ok_or(ProviderError::BestBlockNotFound)?;
686
687 Ok(node.evm_config.evm_for_block(db, &header)?)
688}
689
690#[instrument(skip_all, fields(parent_hash = %parent_hash))]
694async fn build_subblock(
695 transactions: Arc<Mutex<IndexMap<TxHash, Arc<Recovered<TempoTxEnvelope>>>>>,
696 node: Arc<TempoFullNode>,
697 parent_hash: BlockHash,
698 num_validators: usize,
699 signer: PrivateKey,
700 fee_recipient: Address,
701 timeout: Duration,
702) -> RecoveredSubBlock {
703 let start = Instant::now();
704
705 let (transactions, senders) = match evm_at_block(&node, parent_hash) {
706 Ok(mut evm) => {
707 let (mut selected, mut senders, mut to_remove) = (Vec::new(), Vec::new(), Vec::new());
708 let shared_gas_limit = node
709 .config
710 .chain
711 .shared_gas_limit_at(evm.block().timestamp.saturating_to(), evm.block().gas_limit);
712 let gas_budget = shared_gas_limit
713 .checked_div(num_validators as u64)
714 .expect("validator set must not be empty");
715
716 let mut gas_left = gas_budget;
717 let txs = transactions.lock().clone();
718
719 for (tx_hash, tx) in txs {
720 let max_regular_gas =
721 core::cmp::min(tx.gas_limit(), evm.cfg.tx_gas_limit_cap.unwrap_or(u64::MAX));
722 if max_regular_gas > gas_budget {
724 warn!(
725 %tx_hash,
726 tx_gas_limit = tx.gas_limit(),
727 max_regular_gas,
728 gas_budget,
729 "removing transaction with gas limit exceeding maximum subblock gas budget"
730 );
731 to_remove.push(tx_hash);
732 continue;
733 }
734
735 if max_regular_gas > gas_left {
737 continue;
738 }
739
740 if let Err(err) = evm.transact_commit(&*tx) {
741 warn!(%err, tx_hash = %tx_hash, "invalid subblock candidate transaction");
742 to_remove.push(tx_hash);
743 continue;
744 }
745
746 gas_left -= max_regular_gas;
747 selected.push(tx.inner().clone());
748 senders.push(tx.signer());
749
750 if start.elapsed() > timeout {
751 break;
752 }
753 }
754
755 if !to_remove.is_empty() {
757 let mut txs = transactions.lock();
758 for hash in to_remove {
759 txs.swap_remove(&hash);
760 }
761 }
762
763 (selected, senders)
764 }
765 Err(err) => {
766 warn!(%err, "failed to build an evm at block, building an empty subblock");
767
768 Default::default()
769 }
770 };
771
772 let subblock = SubBlock {
773 version: SubBlockVersion::V1,
774 fee_recipient,
775 parent_hash,
776 transactions,
777 };
778
779 let signature = signer.sign(&[], subblock.signature_hash().as_slice());
781 let signed_subblock = SignedSubBlock {
782 inner: subblock,
783 signature: Bytes::copy_from_slice(signature.as_ref()),
784 };
785
786 RecoveredSubBlock::new_unchecked(
787 signed_subblock,
788 senders,
789 B256::from_slice(&signer.public_key()),
790 )
791}
792
793#[instrument(skip_all, err(level = Level::WARN), fields(sender = %sender))]
801async fn validate_subblock(
802 sender: PublicKey,
803 node: Arc<TempoFullNode>,
804 subblock: SignedSubBlock,
805 actions_tx: mpsc::UnboundedSender<Message>,
806 scheme_provider: SchemeProvider,
807 epoch_strategy: FixedEpocher,
808) -> eyre::Result<()> {
809 let Ok(signature) =
810 ed25519::Signature::decode(&mut subblock.signature.as_ref()).wrap_err("invalid signature")
811 else {
812 return Err(eyre::eyre!("invalid signature"));
813 };
814
815 if !sender.verify(&[], subblock.signature_hash().as_slice(), &signature) {
817 return Err(eyre::eyre!("invalid signature"));
818 }
819
820 if subblock.transactions.iter().any(|tx| {
821 tx.subblock_proposer()
822 .is_none_or(|proposer| !proposer.matches(&sender))
823 }) {
824 return Err(eyre::eyre!(
825 "all transactions must specify the subblock validator"
826 ));
827 }
828
829 let subblock = subblock.try_into_recovered(B256::from_slice(&sender))?;
831
832 let mut evm = evm_at_block(&node, subblock.parent_hash)?;
833
834 let epoch = epoch_strategy
835 .containing(Height::new(evm.block().number.to::<u64>() + 1))
836 .expect("epoch strategy covers all epochs")
837 .epoch();
838 let scheme = scheme_provider
839 .scoped(epoch)
840 .ok_or_eyre("scheme not found")?;
841 let participants = scheme.participants().len() as usize;
842
843 eyre::ensure!(
844 scheme.participants().iter().any(|p| p == &sender),
845 "sender is not a validator"
846 );
847
848 let shared_gas_limit = node
849 .config
850 .chain
851 .shared_gas_limit_at(evm.block().timestamp.saturating_to(), evm.block().gas_limit);
852
853 let max_size = (MAX_RLP_BLOCK_SIZE as u128 * u128::from(shared_gas_limit)
857 / u128::from(evm.block().gas_limit)
858 / participants as u128) as usize;
859 if subblock.total_tx_size() > max_size {
860 warn!(
861 size = subblock.total_tx_size(),
862 max_size, "subblock is too large, skipping"
863 );
864 return Ok(());
865 }
866
867 let gas_budget = shared_gas_limit / participants as u64;
869 let mut total_gas = 0u64;
870 for tx in subblock.transactions_recovered() {
871 let max_regular_gas =
872 core::cmp::min(tx.gas_limit(), evm.cfg.tx_gas_limit_cap.unwrap_or(u64::MAX));
873 total_gas = total_gas.saturating_add(max_regular_gas);
874 if total_gas > gas_budget {
875 warn!(
876 total_gas,
877 gas_budget, "subblock exceeds gas budget, skipping"
878 );
879 return Ok(());
880 }
881 }
882
883 for tx in subblock.transactions_recovered() {
885 if let Err(err) = evm.transact_commit(tx) {
886 return Err(eyre::eyre!("transaction failed to execute: {err:?}"));
887 }
888 }
889
890 let _ = actions_tx.unbounded_send(Message::ValidatedSubblock(subblock));
891
892 Ok(())
893}