1use crate::{consensus::Digest, epoch::SchemeProvider};
2use alloy_consensus::{BlockHeader, Transaction, transaction::TxHashRef};
3use alloy_primitives::{Address, B256, BlockHash, Bytes, TxHash};
4use alloy_rlp::Decodable;
5use commonware_codec::DecodeExt;
6use commonware_consensus::{
7 Epochable, Reporter, Viewable,
8 simplex::{
9 elector::Random,
10 scheme::bls12381_threshold::vrf::{Certificate, Scheme},
11 types::Activity,
12 },
13 types::{Epocher as _, FixedEpocher, Height, Round, View},
14};
15use commonware_cryptography::{
16 Signer, Verifier,
17 bls12381::primitives::variant::MinSig,
18 certificate::Provider,
19 ed25519,
20 ed25519::{PrivateKey, PublicKey},
21};
22use commonware_p2p::{Receiver, Recipients, Sender};
23use commonware_runtime::{Handle, IoBuf, Metrics, Pacer, Spawner};
24use eyre::{Context, OptionExt};
25use futures::{FutureExt as _, StreamExt, channel::mpsc};
26use indexmap::IndexMap;
27use parking_lot::Mutex;
28use reth_consensus_common::validation::MAX_RLP_BLOCK_SIZE;
29use reth_evm::{Evm, revm::database::State};
30use reth_node_builder::ConfigureEvm;
31use reth_primitives_traits::Recovered;
32use reth_provider::{
33 BlockReader, BlockSource, ProviderError, StateProviderBox, StateProviderFactory,
34};
35use reth_revm::database::StateProviderDatabase;
36use std::{
37 pin::Pin,
38 sync::{Arc, mpsc::RecvError},
39 time::{Duration, Instant},
40};
41use tempo_node::{TempoFullNode, consensus::TEMPO_SHARED_GAS_DIVISOR, evm::evm::TempoEvm};
42use tempo_primitives::{
43 RecoveredSubBlock, SignedSubBlock, SubBlock, SubBlockVersion, TempoTxEnvelope,
44};
45use tokio::sync::broadcast;
46use tracing::{Instrument, Level, Span, debug, error, instrument, warn};
47
/// Maximum number of entries kept in the pool of pending subblock
/// transactions. Once the pool holds this many entries, newly received
/// transactions are dropped instead of being inserted.
const MAX_SUBBLOCK_TXS: usize = 100_000;
53
/// Configuration consumed by [`Actor::new`].
pub(crate) struct Config<TContext> {
    /// Runtime context used to spawn tasks and to sleep between broadcasts.
    pub(crate) context: TContext,
    /// Key used to sign our subblocks; its public key identifies us as a
    /// subblock proposer.
    pub(crate) signer: PrivateKey,
    /// Source of the signing scheme (and validator set) for a given epoch.
    pub(crate) scheme_provider: SchemeProvider,
    /// Handle to the execution node (state provider, EVM config, eth API).
    pub(crate) node: TempoFullNode,
    /// Fee recipient written into the subblocks we build.
    pub(crate) fee_recipient: Address,
    /// Time budget passed to the subblock builder.
    pub(crate) time_to_build_subblock: Duration,
    /// Delay between repeated sends of a built subblock to the next proposer.
    pub(crate) subblock_broadcast_interval: Duration,
    /// Maps block heights to epochs.
    pub(crate) epoch_strategy: FixedEpocher,
}
64
/// Actor that builds our own subblock, exchanges subblocks with other
/// validators and collects validated subblocks for the current tip.
pub(crate) struct Actor<TContext> {
    /// Sender half of the actions channel; cloned into [`Mailbox`] and into
    /// spawned validation tasks.
    actions_tx: mpsc::UnboundedSender<Message>,
    /// Receiver half of the actions channel, drained by the event loop.
    actions_rx: mpsc::UnboundedReceiver<Message>,
    /// Stream of recovered subblock transactions obtained from the node's eth API.
    subblock_transactions_rx: broadcast::Receiver<Recovered<TempoTxEnvelope>>,
    /// State of the subblock we are building or have built.
    our_subblock: PendingSubblock,

    /// Source of the signing scheme (and validator set) for a given epoch.
    scheme_provider: SchemeProvider,
    /// Runtime context used to spawn tasks and to sleep.
    context: TContext,
    /// Key used to sign our subblocks.
    signer: PrivateKey,
    /// Handle to the execution node.
    node: TempoFullNode,
    /// Fee recipient written into the subblocks we build.
    fee_recipient: Address,
    /// Time budget passed to the subblock builder.
    time_to_build_subblock: Duration,
    /// Delay between repeated sends of a built subblock.
    subblock_broadcast_interval: Duration,
    /// Maps block heights to epochs.
    epoch_strategy: FixedEpocher,

    /// Latest observed round, the tip block hash and the certificate of that
    /// round. `None` until the first notarization or finalization is seen.
    consensus_tip: Option<(Round, BlockHash, Certificate<MinSig>)>,

    /// Validated subblocks collected for the current tip, keyed by the value
    /// returned from `RecoveredSubBlock::validator()`.
    subblocks: IndexMap<B256, RecoveredSubBlock>,
    /// Pool of transactions that name us as their subblock proposer, keyed by
    /// transaction hash. Shared with the subblock building task.
    subblock_transactions: Arc<Mutex<IndexMap<TxHash, Arc<Recovered<TempoTxEnvelope>>>>>,
}
110
111impl<TContext: Spawner + Metrics + Pacer> Actor<TContext> {
112 pub(crate) fn new(
113 Config {
114 context,
115 signer,
116 scheme_provider,
117 node,
118 fee_recipient,
119 time_to_build_subblock,
120 subblock_broadcast_interval,
121 epoch_strategy,
122 }: Config<TContext>,
123 ) -> Self {
124 let (actions_tx, actions_rx) = mpsc::unbounded();
125 Self {
126 our_subblock: PendingSubblock::None,
127 subblock_transactions_rx: node.add_ons_handle.eth_api().subblock_transactions_rx(),
128 scheme_provider,
129 actions_tx,
130 actions_rx,
131 context,
132 signer,
133 node,
134 fee_recipient,
135 time_to_build_subblock,
136 subblock_broadcast_interval,
137 epoch_strategy,
138 consensus_tip: None,
139 subblocks: Default::default(),
140 subblock_transactions: Default::default(),
141 }
142 }
143
144 pub(crate) fn mailbox(&self) -> Mailbox {
146 Mailbox {
147 tx: self.actions_tx.clone(),
148 }
149 }
150
    /// Runs the actor's event loop.
    ///
    /// `network_tx` / `network_rx` are the two halves of the p2p channel used
    /// to exchange subblocks and acknowledgements with other validators. The
    /// loop only exits when the subblock transactions channel is closed.
    pub(crate) async fn run(
        mut self,
        (mut network_tx, mut network_rx): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        loop {
            // At most one of the two is `Some`, depending on the state of our subblock:
            // a running build task to await, or a broadcast timer to wait on.
            let (subblock_task, broadcast_interval) = match &mut self.our_subblock {
                PendingSubblock::None => (None, None),
                PendingSubblock::Task(task) => (Some(task), None),
                PendingSubblock::Built(built) => (None, Some(&mut built.broadcast_interval)),
            };

            tokio::select! {
                // Poll branches in declaration order instead of randomly.
                biased;

                // Messages sent through the mailbox or by validation tasks.
                Some(action) = self.actions_rx.next() => {
                    self.on_new_message(action);
                },
                // New subblock transactions from the node.
                result = self.subblock_transactions_rx.recv() => {
                    match result {
                        Ok(transaction) => {
                            self.on_new_subblock_transaction(transaction);
                        }
                        Err(broadcast::error::RecvError::Lagged(count)) => {
                            warn!(
                                lagged_count = count,
                                "subblock transaction receiver lagged, {} messages dropped",
                                count
                            );
                        }
                        Err(broadcast::error::RecvError::Closed) => {
                            error!("subblock transactions channel closed unexpectedly");
                            break;
                        }
                    }
                },
                // Messages from other validators. The result is ignored here; the
                // handler is instrumented to log its own errors.
                Ok((sender, message)) = network_rx.recv() => {
                    let _ = self.on_network_message(sender, message, &mut network_tx).await;
                },
                // Completion of our subblock build task, if one is running.
                subblock = if let Some(task) = subblock_task {
                    (&mut task.handle).fuse()
                } else {
                    futures::future::Fuse::terminated()
                } => {
                    // This branch can only complete while the state is `Task`.
                    let task = self.our_subblock.take_task().unwrap();
                    self.on_built_subblock(subblock, task.proposer).await;
                }
                // Broadcast timer of our built subblock, if there is one.
                _ = if let Some(broadcast_interval) = broadcast_interval {
                    broadcast_interval.fuse()
                } else {
                    futures::future::Fuse::terminated()
                } => {
                    self.broadcast_built_subblock(&mut network_tx).await;
                }
            }
        }
    }
215
216 fn tip(&self) -> Option<BlockHash> {
218 self.consensus_tip.as_ref().map(|(_, tip, _)| *tip)
219 }
220
221 fn on_new_message(&mut self, action: Message) {
222 match action {
223 Message::GetSubBlocks { parent, response } => {
224 if self.tip() != Some(parent) {
226 let _ = response.send(Vec::new());
227 return;
228 }
229 let subblocks = self.subblocks.values().cloned().collect();
231 let _ = response.send(subblocks);
232 }
233 Message::Consensus(activity) => self.on_consensus_event(*activity),
234 Message::ValidatedSubblock(subblock) => self.on_validated_subblock(subblock),
235 }
236 }
237
238 #[instrument(skip_all, fields(transaction.tx_hash = %transaction.tx_hash()))]
239 fn on_new_subblock_transaction(&self, transaction: Recovered<TempoTxEnvelope>) {
240 if !transaction
241 .subblock_proposer()
242 .is_some_and(|k| k.matches(self.signer.public_key()))
243 {
244 return;
245 }
246 let mut txs = self.subblock_transactions.lock();
247 if txs.len() >= MAX_SUBBLOCK_TXS {
248 return;
249 }
250 txs.insert(*transaction.tx_hash(), Arc::new(transaction));
251 }
252
    /// Handles a consensus activity.
    ///
    /// Updates the tracked consensus tip, determines the proposer of the next
    /// round and starts building a new subblock when either the tip or the
    /// target proposer differs from what our pending subblock was built for.
    #[instrument(skip_all, fields(event.epoch = %event.epoch(), event.view = %event.view()))]
    fn on_consensus_event(&mut self, event: Activity<Scheme<PublicKey, MinSig>, Digest>) {
        // Notarizations and finalizations carry a payload (a new tip candidate);
        // nullifications only carry a round and certificate. Everything else is ignored.
        let (new_tip, new_round, new_cert) = match event {
            Activity::Notarization(n) => {
                (Some(n.proposal.payload.0), n.proposal.round, n.certificate)
            }
            Activity::Finalization(n) => {
                (Some(n.proposal.payload.0), n.proposal.round, n.certificate)
            }
            Activity::Nullification(n) => (None, n.round, n.certificate),
            _ => return,
        };

        // Advance the tracked round/certificate unless the event is for an older round.
        if let Some((round, tip, cert)) = &mut self.consensus_tip
            && *round <= new_round
        {
            *round = new_round;
            *cert = new_cert;

            if let Some(new_tip) = new_tip
                && *tip != new_tip
            {
                // Subblocks collected so far were built on the previous tip.
                self.subblocks.clear();
                *tip = new_tip;
            }
        } else if self.consensus_tip.is_none()
            && let Some(new_tip) = new_tip
        {
            // First event that tells us a tip block hash.
            self.consensus_tip = Some((new_round, new_tip, new_cert));
        }

        let Some((round, tip, certificate)) = &self.consensus_tip else {
            return;
        };

        let Ok(Some(header)) = self
            .node
            .provider
            .find_block_by_hash(*tip, BlockSource::Any)
        else {
            debug!(?tip, "missing header for the tip block at {tip}");
            return;
        };

        // The subblock is built for the block following the tip.
        let epoch_of_next_block = self
            .epoch_strategy
            .containing(Height::new(header.number() + 1))
            .expect("epoch strategy covers all epochs")
            .epoch();

        let Some(scheme) = self.scheme_provider.scoped(epoch_of_next_block) else {
            debug!(%epoch_of_next_block, "scheme not found for epoch");
            return;
        };

        // Within the same epoch the next round is the next view; when the next
        // block belongs to a different epoch, view 1 of that epoch is used.
        let next_round = if round.epoch() == epoch_of_next_block {
            Round::new(round.epoch(), round.view().next())
        } else {
            Round::new(epoch_of_next_block, View::new(1))
        };

        // Leader selection takes the seed signature from the tracked certificate (if any).
        let next_proposer = Random::select_leader::<MinSig>(
            next_round,
            scheme.participants().len() as u32,
            certificate.get().map(|signature| signature.seed_signature),
        );
        let next_proposer = scheme.participants()[next_proposer.get() as usize].clone();

        debug!(?next_proposer, ?next_round, "determined next proposer");

        // Rebuild only if our pending subblock targets a different parent or proposer.
        if self.our_subblock.parent_hash() != Some(*tip)
            || self.our_subblock.target_proposer() != Some(&next_proposer)
        {
            debug!(%tip, %next_proposer, "building new subblock");
            self.build_new_subblock(*tip, next_proposer, scheme);
        }
    }
338
339 fn build_new_subblock(
340 &mut self,
341 parent_hash: BlockHash,
342 next_proposer: PublicKey,
343 scheme: Arc<Scheme<PublicKey, MinSig>>,
344 ) {
345 let transactions = self.subblock_transactions.clone();
346 let node = self.node.clone();
347 let num_validators = scheme.participants().len();
348 let signer = self.signer.clone();
349 let fee_recipient = self.fee_recipient;
350 let timeout = self.time_to_build_subblock;
351 let span = Span::current();
352 let handle = self
353 .context
354 .with_label("validate_subblock")
355 .shared(true)
356 .spawn(move |_| {
357 build_subblock(
358 transactions,
359 node,
360 parent_hash,
361 num_validators,
362 signer,
363 fee_recipient,
364 timeout,
365 )
366 .instrument(span)
367 });
368
369 self.our_subblock = PendingSubblock::Task(BuildSubblockTask {
370 handle,
371 parent_hash,
372 proposer: next_proposer,
373 });
374 }
375
376 #[instrument(skip_all, err(level = Level::DEBUG), fields(sender = %sender, msg_bytes = message.len()))]
377 async fn on_network_message(
378 &mut self,
379 sender: PublicKey,
380 message: IoBuf,
381 network_tx: &mut impl Sender<PublicKey = PublicKey>,
382 ) -> eyre::Result<()> {
383 let message =
384 SubblocksMessage::decode(message).wrap_err("failed to decode network message")?;
385
386 let subblock = match message {
387 SubblocksMessage::Subblock(subblock) => subblock,
388 SubblocksMessage::Ack(ack) => {
390 if let PendingSubblock::Built(built) = &mut self.our_subblock
391 && built.proposer == sender
392 && ack == built.subblock.signature_hash()
393 {
394 debug!("received acknowledgement from the next proposer");
395 built.stop_broadcasting();
396 } else {
397 warn!(%ack, "received invalid acknowledgement");
398 }
399
400 return Ok(());
401 }
402 };
403
404 let Some(tip) = self.tip() else {
405 return Err(eyre::eyre!("missing tip of the chain"));
406 };
407
408 eyre::ensure!(
410 subblock.parent_hash == tip,
411 "invalid subblock parent, expected {tip}, got {}",
412 subblock.parent_hash
413 );
414
415 let _ = network_tx
421 .send(
422 Recipients::One(sender.clone()),
423 SubblocksMessage::Ack(subblock.signature_hash()).encode(),
424 true,
425 )
426 .await;
427
428 debug!("validating new subblock");
429
430 let node = self.node.clone();
432 let validated_subblocks_tx = self.actions_tx.clone();
433 let scheme_provider = self.scheme_provider.clone();
434 let epoch_strategy = self.epoch_strategy.clone();
435 let span = Span::current();
436 self.context.clone().shared(true).spawn(move |_| {
437 validate_subblock(
438 sender.clone(),
439 node,
440 subblock,
441 validated_subblocks_tx,
442 scheme_provider,
443 epoch_strategy,
444 )
445 .instrument(span)
446 });
447
448 Ok(())
449 }
450
451 #[instrument(skip_all, fields(subblock.validator = %subblock.validator(), subblock.parent_hash = %subblock.parent_hash))]
452 fn on_validated_subblock(&mut self, subblock: RecoveredSubBlock) {
453 if Some(subblock.parent_hash) != self.tip() {
455 return;
456 }
457
458 debug!(subblock = ?subblock, "validated subblock");
459
460 self.subblocks.insert(subblock.validator(), subblock);
461 }
462
463 #[instrument(skip_all)]
464 async fn on_built_subblock(
465 &mut self,
466 subblock: Result<RecoveredSubBlock, commonware_runtime::Error>,
467 next_proposer: PublicKey,
468 ) {
469 let subblock = match subblock {
470 Ok(subblock) => subblock,
471 Err(error) => {
472 warn!(%error, "failed to build subblock");
473 return;
474 }
475 };
476
477 if Some(subblock.parent_hash) != self.tip() {
478 return;
479 }
480
481 self.our_subblock = PendingSubblock::Built(BuiltSubblock {
482 subblock,
483 proposer: next_proposer,
484 broadcast_interval: Box::pin(futures::future::ready(())),
486 });
487 }
488
489 #[instrument(skip_all)]
490 async fn broadcast_built_subblock(
491 &mut self,
492 network_tx: &mut impl Sender<PublicKey = PublicKey>,
493 ) {
494 let PendingSubblock::Built(built) = &mut self.our_subblock else {
495 return;
496 };
497
498 built.broadcast_interval = Box::pin(self.context.sleep(self.subblock_broadcast_interval));
500
501 debug!(
502 ?built.subblock,
503 next_proposer = %built.proposer,
504 "sending subblock to the next proposer"
505 );
506
507 if built.proposer != self.signer.public_key() {
508 let _ = network_tx
509 .send(
510 Recipients::One(built.proposer.clone()),
511 SubblocksMessage::Subblock((*built.subblock).clone()).encode(),
512 true,
513 )
514 .await;
515 } else {
516 let subblock = built.subblock.clone();
517 built.stop_broadcasting();
518 self.on_validated_subblock(subblock);
519 }
520 }
521}
522
/// Messages processed by the actor's event loop.
#[derive(Debug)]
enum Message {
    /// Request for the subblocks collected on top of `parent`.
    GetSubBlocks {
        /// Parent block hash the requester is interested in. An empty list
        /// is returned when it differs from the actor's tip.
        parent: BlockHash,
        /// Channel on which the collected subblocks are sent back.
        response: std::sync::mpsc::SyncSender<Vec<RecoveredSubBlock>>,
    },

    /// A consensus activity reported through the `Reporter` implementation.
    Consensus(Box<Activity<Scheme<PublicKey, MinSig>, Digest>>),

    /// A subblock that passed validation in a spawned validation task.
    ValidatedSubblock(RecoveredSubBlock),
}
543
/// State of the subblock this node contributes.
#[derive(Default)]
enum PendingSubblock {
    /// Nothing is being built and nothing is built.
    #[default]
    None,
    /// A build task is running.
    Task(BuildSubblockTask),
    /// A subblock is built and is (re-)sent to the next proposer on a timer.
    Built(BuiltSubblock),
}
555
556impl PendingSubblock {
557 fn take_task(&mut self) -> Option<BuildSubblockTask> {
559 if let Self::Task(task) = std::mem::take(self) {
560 Some(task)
561 } else {
562 None
563 }
564 }
565
566 fn parent_hash(&self) -> Option<BlockHash> {
568 match self {
569 Self::Task(task) => Some(task.parent_hash),
570 Self::Built(built) => Some(built.subblock.parent_hash),
571 Self::None => None,
572 }
573 }
574
575 fn target_proposer(&self) -> Option<&PublicKey> {
577 match self {
578 Self::Task(task) => Some(&task.proposer),
579 Self::Built(built) => Some(&built.proposer),
580 Self::None => None,
581 }
582 }
583}
584
/// A running subblock build.
struct BuildSubblockTask {
    /// Handle of the spawned build task; resolves to the built subblock.
    handle: Handle<RecoveredSubBlock>,
    /// Parent block hash the subblock is built on.
    parent_hash: BlockHash,
    /// Proposer the subblock will be sent to.
    proposer: PublicKey,
}
594
/// A built subblock waiting to be delivered to the next proposer.
struct BuiltSubblock {
    /// The built and signed subblock.
    subblock: RecoveredSubBlock,
    /// Proposer the subblock is sent to.
    proposer: PublicKey,
    /// Timer that triggers the next send when it completes; replaced by a
    /// never-completing future once broadcasting is stopped.
    broadcast_interval: Pin<Box<dyn Future<Output = ()> + Send>>,
}
604
605impl BuiltSubblock {
606 fn stop_broadcasting(&mut self) {
608 self.broadcast_interval = Box::pin(futures::future::pending());
609 }
610}
611
/// Messages exchanged between validators on the subblocks p2p channel.
#[derive(Debug)]
enum SubblocksMessage {
    /// A signed subblock, RLP-encoded on the wire.
    Subblock(SignedSubBlock),
    /// Acknowledgement of a received subblock, carrying its signature hash
    /// as 32 raw bytes on the wire.
    Ack(B256),
}
620
621impl SubblocksMessage {
622 fn encode(self) -> bytes::Bytes {
624 match self {
625 Self::Subblock(subblock) => alloy_rlp::encode(&subblock).into(),
626 Self::Ack(hash) => bytes::Bytes::copy_from_slice(hash.as_ref()),
627 }
628 }
629
630 fn decode(message: IoBuf) -> alloy_rlp::Result<Self> {
632 if message.len() == 32 {
633 let hash = B256::from_slice(message.as_ref());
634 Ok(Self::Ack(hash))
635 } else {
636 let subblock = SignedSubBlock::decode(&mut message.as_ref())?;
637 Ok(Self::Subblock(subblock))
638 }
639 }
640}
641
/// Cloneable handle for sending messages to the subblocks actor.
#[derive(Clone)]
pub(crate) struct Mailbox {
    /// Sender half of the actor's actions channel.
    tx: mpsc::UnboundedSender<Message>,
}
647
648impl Mailbox {
649 pub(crate) fn get_subblocks(
650 &self,
651 parent: BlockHash,
652 ) -> Result<Vec<RecoveredSubBlock>, RecvError> {
653 let (tx, rx) = std::sync::mpsc::sync_channel(1);
654 let _ = self.tx.unbounded_send(Message::GetSubBlocks {
655 parent,
656 response: tx,
657 });
658 rx.recv()
659 }
660}
661
662impl Reporter for Mailbox {
663 type Activity = Activity<Scheme<PublicKey, MinSig>, Digest>;
664
665 async fn report(&mut self, activity: Self::Activity) -> () {
666 let _ = self
667 .tx
668 .unbounded_send(Message::Consensus(Box::new(activity)));
669 }
670}
671
672fn evm_at_block(
673 node: &TempoFullNode,
674 hash: BlockHash,
675) -> eyre::Result<TempoEvm<State<StateProviderDatabase<StateProviderBox>>>> {
676 let db = State::builder()
677 .with_database(StateProviderDatabase::new(
678 node.provider.state_by_block_hash(hash)?,
679 ))
680 .build();
681 let header = node
682 .provider
683 .find_block_by_hash(hash, BlockSource::Any)?
684 .ok_or(ProviderError::BestBlockNotFound)?;
685
686 Ok(node.evm_config.evm_for_block(db, &header)?)
687}
688
689#[instrument(skip_all, fields(parent_hash = %parent_hash))]
693async fn build_subblock(
694 transactions: Arc<Mutex<IndexMap<TxHash, Arc<Recovered<TempoTxEnvelope>>>>>,
695 node: TempoFullNode,
696 parent_hash: BlockHash,
697 num_validators: usize,
698 signer: PrivateKey,
699 fee_recipient: Address,
700 timeout: Duration,
701) -> RecoveredSubBlock {
702 let start = Instant::now();
703
704 let (transactions, senders) = match evm_at_block(&node, parent_hash) {
705 Ok(mut evm) => {
706 let (mut selected, mut senders, mut to_remove) = (Vec::new(), Vec::new(), Vec::new());
707 let gas_budget = (evm.block().gas_limit / TEMPO_SHARED_GAS_DIVISOR)
708 .checked_div(num_validators as u64)
709 .expect("validator set must not be empty");
710
711 let mut gas_left = gas_budget;
712 let txs = transactions.lock().clone();
713
714 for (tx_hash, tx) in txs {
715 if tx.gas_limit() > gas_budget {
717 warn!(
718 %tx_hash,
719 tx_gas_limit = tx.gas_limit(),
720 gas_budget,
721 "removing transaction with gas limit exceeding maximum subblock gas budget"
722 );
723 to_remove.push(tx_hash);
724 continue;
725 }
726
727 if tx.gas_limit() > gas_left {
729 continue;
730 }
731
732 if let Err(err) = evm.transact_commit(&*tx) {
733 warn!(%err, tx_hash = %tx_hash, "invalid subblock candidate transaction");
734 to_remove.push(tx_hash);
735 continue;
736 }
737
738 gas_left -= tx.gas_limit();
739 selected.push(tx.inner().clone());
740 senders.push(tx.signer());
741
742 if start.elapsed() > timeout {
743 break;
744 }
745 }
746
747 if !to_remove.is_empty() {
749 let mut txs = transactions.lock();
750 for hash in to_remove {
751 txs.swap_remove(&hash);
752 }
753 }
754
755 (selected, senders)
756 }
757 Err(err) => {
758 warn!(%err, "failed to build an evm at block, building an empty subblock");
759
760 Default::default()
761 }
762 };
763
764 let subblock = SubBlock {
765 version: SubBlockVersion::V1,
766 fee_recipient,
767 parent_hash,
768 transactions,
769 };
770
771 let signature = signer.sign(&[], subblock.signature_hash().as_slice());
773 let signed_subblock = SignedSubBlock {
774 inner: subblock,
775 signature: Bytes::copy_from_slice(signature.as_ref()),
776 };
777
778 RecoveredSubBlock::new_unchecked(
779 signed_subblock,
780 senders,
781 B256::from_slice(&signer.public_key()),
782 )
783}
784
785#[instrument(skip_all, err(level = Level::WARN), fields(sender = %sender))]
793async fn validate_subblock(
794 sender: PublicKey,
795 node: TempoFullNode,
796 subblock: SignedSubBlock,
797 actions_tx: mpsc::UnboundedSender<Message>,
798 scheme_provider: SchemeProvider,
799 epoch_strategy: FixedEpocher,
800) -> eyre::Result<()> {
801 let Ok(signature) =
802 ed25519::Signature::decode(&mut subblock.signature.as_ref()).wrap_err("invalid signature")
803 else {
804 return Err(eyre::eyre!("invalid signature"));
805 };
806
807 if !sender.verify(&[], subblock.signature_hash().as_slice(), &signature) {
809 return Err(eyre::eyre!("invalid signature"));
810 }
811
812 if subblock.transactions.iter().any(|tx| {
813 tx.subblock_proposer()
814 .is_none_or(|proposer| !proposer.matches(&sender))
815 }) {
816 return Err(eyre::eyre!(
817 "all transactions must specify the subblock validator"
818 ));
819 }
820
821 let subblock = subblock.try_into_recovered(B256::from_slice(&sender))?;
823
824 let mut evm = evm_at_block(&node, subblock.parent_hash)?;
825
826 let epoch = epoch_strategy
827 .containing(Height::new(evm.block().number.to::<u64>() + 1))
828 .expect("epoch strategy covers all epochs")
829 .epoch();
830 let scheme = scheme_provider
831 .scoped(epoch)
832 .ok_or_eyre("scheme not found")?;
833 let participants = scheme.participants().len() as usize;
834
835 eyre::ensure!(
836 scheme.participants().iter().any(|p| p == &sender),
837 "sender is not a validator"
838 );
839
840 let max_size = MAX_RLP_BLOCK_SIZE / TEMPO_SHARED_GAS_DIVISOR as usize / participants;
844 if subblock.total_tx_size() > max_size {
845 warn!(
846 size = subblock.total_tx_size(),
847 max_size, "subblock is too large, skipping"
848 );
849 return Ok(());
850 }
851
852 let gas_budget = evm.block().gas_limit / TEMPO_SHARED_GAS_DIVISOR / participants as u64;
854 let mut total_gas = 0u64;
855 for tx in subblock.transactions_recovered() {
856 total_gas = total_gas.saturating_add(tx.gas_limit());
857 if total_gas > gas_budget {
858 warn!(
859 total_gas,
860 gas_budget, "subblock exceeds gas budget, skipping"
861 );
862 return Ok(());
863 }
864 }
865
866 for tx in subblock.transactions_recovered() {
868 if let Err(err) = evm.transact_commit(tx) {
869 return Err(eyre::eyre!("transaction failed to execute: {err:?}"));
870 }
871 }
872
873 let _ = actions_tx.unbounded_send(Message::ValidatedSubblock(subblock));
874
875 Ok(())
876}