tempo_commonware_node/consensus/application/actor.rs

//! The actor running the application event loop.
//!
//! # On the usage of the commonware-pacer
//!
//! The actor contains `Pacer::pace` calls for all interactions
//! with the execution layer. These are no-ops in production because the
//! commonware tokio runtime ignores them. However, they are critical in
//! e2e tests using the commonware deterministic runtime: since the execution
//! layer is still running on the tokio runtime, these calls signal the
//! deterministic runtime to spend real wall-clock time waiting for the
//! execution layer calls to complete.
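//!
//! For example, the payload build request in `Inner::propose` is issued as
//! `send_new_payload(attrs).pace(&context, Duration::from_millis(20)).await`,
//! and similar `pace` calls wrap the other execution layer interactions below.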

use std::{sync::Arc, time::Duration};

use alloy_consensus::BlockHeader;
use alloy_primitives::{B256, Bytes};
use alloy_rpc_types_engine::PayloadId;
use commonware_codec::{DecodeExt as _, Encode as _};
use commonware_consensus::{
    Block as _,
    marshal::SchemeProvider as _,
    types::{Epoch, Round, View},
    utils,
};
use commonware_cryptography::ed25519::PublicKey;
use commonware_macros::select;
use commonware_runtime::{
    ContextCell, FutureExt as _, Handle, Metrics, Pacer, Spawner, Storage, spawn_cell,
};

use commonware_utils::SystemTimeExt;
use eyre::{OptionExt as _, WrapErr as _, bail, ensure, eyre};
use futures::{
    StreamExt as _, TryFutureExt as _,
    channel::{mpsc, oneshot},
    future::{Either, always_ready, ready, try_join},
};
use rand::{CryptoRng, Rng};
use reth_node_builder::ConsensusEngineHandle;
use reth_primitives_traits::SealedBlock;
use tempo_dkg_onchain_artifacts::PublicOutcome;
use tempo_node::{TempoExecutionData, TempoFullNode, TempoPayloadTypes};

use reth_provider::BlockReader as _;
use tokio::sync::RwLock;
use tracing::{Level, debug, error, error_span, info, instrument, warn};

use tempo_payload_types::TempoPayloadBuilderAttributes;

use super::{
    Mailbox, executor,
    executor::ExecutorMailbox,
    ingress::{Broadcast, Finalized, Genesis, Message, Propose, Verify},
};
use crate::{
    consensus::{Digest, block::Block},
    epoch::SchemeProvider,
    subblocks,
};

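/// The consensus application actor.
///
/// `TState` acts as a typestate marker: the actor is constructed as
/// `Actor<_, Uninit>` and only becomes `Actor<_, Init>` (with a running
/// executor task and a DKG manager mailbox) after `Inner::into_initialized`
/// succeeds.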
pub(in crate::consensus) struct Actor<TContext, TState = Uninit> {
    context: ContextCell<TContext>,
    mailbox: mpsc::Receiver<Message>,

    inner: Inner<TState>,
}

impl<TContext, TState> Actor<TContext, TState> {
    pub(super) fn mailbox(&self) -> &Mailbox {
        &self.inner.my_mailbox
    }
}

impl<TContext> Actor<TContext, Uninit>
where
    TContext: Pacer + governor::clock::Clock + Rng + CryptoRng + Spawner + Storage + Metrics,
{
    pub(super) async fn init(config: super::Config<TContext>) -> eyre::Result<Self> {
        let (tx, rx) = mpsc::channel(config.mailbox_size);
        let my_mailbox = Mailbox::from_sender(tx);

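        // The consensus genesis block is derived from the execution layer's
        // block 0, which is sealed below and shared via an `Arc`.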
        let block = config
            .execution_node
            .provider
            .block_by_number(0)
            .map_err(Into::<eyre::Report>::into)
            .and_then(|maybe| maybe.ok_or_eyre("block reader returned empty genesis block"))
            .wrap_err("failed reading genesis block from execution node")?;

        Ok(Self {
            context: ContextCell::new(config.context),
            mailbox: rx,

            inner: Inner {
                fee_recipient: config.fee_recipient,
                epoch_length: config.epoch_length,
                new_payload_wait_time: config.new_payload_wait_time,

                my_mailbox,
                marshal: config.marshal,

                genesis_block: Arc::new(Block::from_execution_block(SealedBlock::seal_slow(block))),

                execution_node: config.execution_node,
                subblocks: config.subblocks,

                scheme_provider: config.scheme_provider,

                state: Uninit(()),
            },
        })
    }

    /// Runs the actor until it is externally stopped.
    async fn run_until_stopped(self, dkg_manager: crate::dkg::manager::Mailbox) {
        let Self {
            context,
            mailbox,
            inner,
        } = self;
        // TODO(janis): should be placed under a shutdown signal so we don't
        // just stall on startup.
        let Ok(initialized) = inner.into_initialized(context.clone(), dkg_manager).await else {
            // XXX: relies on into_initialized generating an error event before exit.
            return;
        };

        Actor {
            context,
            mailbox,
            inner: initialized,
        }
        .run_until_stopped()
        .await
    }

    pub(in crate::consensus) fn start(
        mut self,
        dkg_manager: crate::dkg::manager::Mailbox,
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run_until_stopped(dkg_manager).await)
    }
}

impl<TContext> Actor<TContext, Init>
where
    TContext: Pacer + governor::clock::Clock + Rng + CryptoRng + Spawner + Storage + Metrics,
{
    async fn run_until_stopped(mut self) {
        while let Some(msg) = self.mailbox.next().await {
            if let Err(error) = self.handle_message(msg) {
                error_span!("handle message").in_scope(|| {
                    error!(
                        %error,
                        "critical error occurred while handling message; exiting"
                    )
                });
                break;
            }
        }
    }

    fn handle_message(&mut self, msg: Message) -> eyre::Result<()> {
        match msg {
            Message::Broadcast(broadcast) => {
                self.context.with_label("broadcast").spawn({
                    let inner = self.inner.clone();
                    move |_| inner.handle_broadcast(broadcast)
                });
            }
            Message::Finalized(finalized) => {
                // XXX: being able to finalize is the only stop condition.
                // There is no point continuing if this doesn't work.
                self.inner
                    .handle_finalized(*finalized)
                    .wrap_err("failed finalizing block")?;
            }
            Message::Genesis(genesis) => {
                self.context.with_label("genesis").spawn({
                    let inner = self.inner.clone();
                    move |_| inner.handle_genesis(genesis)
                });
            }
            Message::Propose(propose) => {
                self.context.with_label("propose").spawn({
                    let inner = self.inner.clone();
                    move |context| inner.handle_propose(propose, context)
                });
            }
            Message::Verify(verify) => {
                self.context.with_label("verify").spawn({
                    let inner = self.inner.clone();
                    move |context| inner.handle_verify(*verify, context)
                });
            }
        }
        Ok(())
    }
}

#[derive(Clone)]
struct Inner<TState> {
    fee_recipient: alloy_primitives::Address,
    epoch_length: u64,
    new_payload_wait_time: Duration,

    my_mailbox: Mailbox,

    marshal: crate::alias::marshal::Mailbox,

    genesis_block: Arc<Block>,
    execution_node: TempoFullNode,
    subblocks: subblocks::Mailbox,
    scheme_provider: SchemeProvider,

    state: TState,
}

impl Inner<Init> {
    #[instrument(
        skip_all,
        fields(%broadcast.payload),
        err(level = Level::ERROR),
    )]
    async fn handle_broadcast(mut self, broadcast: Broadcast) -> eyre::Result<()> {
        let Some(latest_proposed) = self.state.latest_proposed_block.read().await.clone() else {
            return Err(eyre!("there was no latest block to broadcast"));
        };
        ensure!(
            broadcast.payload == latest_proposed.digest(),
            "broadcast of payload `{}` was requested, but digest of latest proposed block is `{}`",
            broadcast.payload,
            latest_proposed.digest(),
        );

        self.marshal.broadcast(latest_proposed).await;
        Ok(())
    }

    /// Pushes a `finalized` request to the back of the finalization queue.
    #[instrument(skip_all)]
    fn handle_finalized(&self, finalized: Finalized) -> eyre::Result<()> {
        self.state.executor_mailbox.forward_finalized(finalized)
    }

    #[instrument(
        skip_all,
        fields(
            epoch = genesis.epoch,
        ),
        ret(Display),
        err(level = Level::ERROR)
    )]
    async fn handle_genesis(mut self, genesis: Genesis) -> eyre::Result<Digest> {
        let source = if genesis.epoch == 0 {
            self.genesis_block.digest()
        } else {
            // The last block of the *previous* epoch provides the "genesis"
            // of the *current* epoch. Only epoch 0 is special cased above.
            let height =
                utils::last_block_in_epoch(self.epoch_length, genesis.epoch.saturating_sub(1));

            let Some((_, digest)) = self.marshal.get_info(height).await else {
                // XXX: the None case here should not be hit:
                // 1. an epoch transition is triggered by the application
                // finalizing the last block of the outgoing epoch.
                // 2. the finalized block is received from the marshal actor,
                // so we know it must be available and indexed
                // by the marshaller.
                // 3. this means this call should always succeed.
                //
                // TODO(janis): should we panic instead?
                bail!(
                    "no information on the source block at height `{height}` \
                    exists yet; this is a problem and will likely cause the \
                    consensus engine to not start"
                );
            };
            digest
        };
        genesis.response.send(source).map_err(|_| {
            eyre!("failed returning parent digest for epoch: return channel was already closed")
        })?;
        Ok(source)
    }

    /// Handles a [`Propose`] request.
    #[instrument(
        skip_all,
        fields(
            epoch = request.round.epoch(),
            view = request.round.view(),
            parent.view = request.parent.0,
            parent.digest = %request.parent.1,
        ),
        err(level = Level::WARN),
    )]
    async fn handle_propose<TContext: Pacer>(
        self,
        request: Propose,
        context: TContext,
    ) -> eyre::Result<()> {
        let Propose {
            parent: (parent_view, parent_digest),
            mut response,
            round,
        } = request;

        let proposal = select!(
            () = response.cancellation() => {
                Err(eyre!(
                    "proposal return channel was closed by consensus \
                    engine before block could be proposed; aborting"
                ))
            },

            res = self.clone().propose(
                context.clone(),
                parent_view,
                parent_digest,
                round
            ) => {
                res.wrap_err("failed creating a proposal")
            }
        )?;

        let proposal_digest = proposal.digest();
        let proposal_height = proposal.height();

        info!(
            proposal.digest = %proposal_digest,
            proposal.height = %proposal_height,
            "constructed proposal",
        );

        response.send(proposal_digest).map_err(|_| {
            eyre!(
                "failed returning proposal to consensus engine: response \
                channel was already closed"
            )
        })?;

        // If re-proposing, then don't store the parent for broadcasting and
        // don't touch the execution layer.
        if proposal_digest == parent_digest {
            return Ok(());
        }

        {
            let mut lock = self.state.latest_proposed_block.write().await;
            *lock = Some(proposal.clone());
        }

        // Make sure reth sees the new payload so that in the next round we can
        // verify blocks on top of it.
        let is_good = verify_block(
            context,
            round.epoch(),
            self.epoch_length,
            self.execution_node
                .add_ons_handle
                .beacon_engine_handle
                .clone(),
            &proposal,
            parent_digest,
            &self.scheme_provider,
        )
        .await
        .wrap_err("failed verifying block against execution layer")?;

        if !is_good {
            eyre::bail!("validation reported that the just-proposed block is invalid");
        }

        Ok(())
    }

    /// Handles a [`Verify`] request.
    ///
    /// This method only renders a decision on the `verify.response`
    /// channel if it was able to come to a boolean decision. If it was
    /// unable to refute or prove the validity of the block, it returns
    /// an error and drops the response channel.
    ///
    /// Conditions under which no decision can be made are usually that
    /// no block could be read from the syncer, or that communication with
    /// the execution layer failed.
    #[instrument(
        skip_all,
        fields(
            epoch = verify.round.epoch(),
            view = verify.round.view(),
            digest = %verify.payload,
            parent.view = verify.parent.0,
            parent.digest = %verify.parent.1,
            proposer = %verify.proposer,
        ),
    )]
    async fn handle_verify<TContext: Pacer>(mut self, verify: Verify, context: TContext) {
        let Verify {
            parent,
            payload,
            proposer,
            mut response,
            round,
        } = verify;
        let result = select!(
            () = response.cancellation() => {
                Err(eyre!(
                    "verification return channel was closed by consensus \
                    engine before block could be validated; aborting"
                ))
            },

            res = self.clone().verify(context, parent, payload, proposer, round) => {
                res.wrap_err("block verification failed")
            }
        );

        // Respond with the verification result ASAP. This also emits the
        // event reporting the result of the verification.
        let _ = report_verification_result(response, &result);

        // Then make the forkchoice state available and cache the block.
        if let Ok((block, true)) = result {
            // Only make the verified block canonical when not doing a
            // re-propose at the end of an epoch.
            if parent.1 != payload
                && let Err(error) = self
                    .state
                    .executor_mailbox
                    .canonicalize_head(block.height(), block.digest())
            {
                tracing::warn!(
                    %error,
                    "failed making the verified proposal the head of the canonical chain",
                );
            }
            self.marshal.verified(round, block).await;
        }
    }

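    /// Builds a proposal on top of `parent_digest` for the given `round`.
    ///
    /// Fetches the parent from the marshal (or uses the genesis block),
    /// re-proposes the parent at an epoch boundary, and otherwise asks the
    /// execution layer's payload builder for a new payload, attaching any
    /// pending DKG data to the payload builder attributes.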
    async fn propose<TContext: Pacer>(
        mut self,
        context: TContext,
        parent_view: View,
        parent_digest: Digest,
        round: Round,
    ) -> eyre::Result<Block> {
        let genesis_block = self.genesis_block.clone();
        let parent_request = if parent_digest == genesis_block.digest() {
            Either::Left(always_ready(|| Ok((*genesis_block).clone())))
        } else {
            Either::Right(
                self.marshal
                    .subscribe(Some(Round::new(round.epoch(), parent_view)), parent_digest)
                    .await,
            )
        };
        let parent = parent_request
            .await
            .map_err(|_| eyre!(
                "failed getting parent block from syncer; syncer dropped channel before request was fulfilled"
            ))?;

        debug!(height = parent.height(), "retrieved parent block");

        // XXX: Re-propose the parent if the parent is the last height of the
        // epoch. parent.height+1 should be proposed as the first block of the
        // next epoch.
        if utils::is_last_block_in_epoch(self.epoch_length, parent.height())
            .is_some_and(|e| e == round.epoch())
        {
            info!("parent is last height of epoch; re-proposing parent");
            return Ok(parent);
        }

        // Send the proposal parent to reth to cover edge cases when we were not asked to verify it directly.
        if !verify_block(
            context.clone(),
            utils::epoch(self.epoch_length, parent.height()),
            self.epoch_length,
            self.execution_node
                .add_ons_handle
                .beacon_engine_handle
                .clone(),
            &parent,
            // It is safe to not verify the parent of the parent because this block is already notarized.
            parent.parent_digest(),
            &self.scheme_provider,
        )
        .await
        .wrap_err("failed verifying block against execution layer")?
        {
            eyre::bail!("the proposal parent block is not valid");
        }

        ready(
            self.state
                .executor_mailbox
                .canonicalize_head(parent.height(), parent.digest()),
        )
        .and_then(|ack| ack.map_err(eyre::Report::new))
        .await
        .wrap_err("failed updating canonical head to parent")?;

        // Query DKG manager for ceremony data before building payload.
        // This data will be passed to the payload builder via attributes.
        let extra_data = if utils::is_last_block_in_epoch(self.epoch_length, parent.height() + 1)
            .is_some_and(|e| e == round.epoch())
        {
            // At epoch boundary: include public ceremony outcome
            let outcome = self
                .state
                .dkg_manager
                .get_public_ceremony_outcome()
                .await
                .wrap_err("failed getting public dkg ceremony outcome")?;
            ensure!(
                round.epoch() + 1 == outcome.epoch,
                "outcome is for epoch `{}`, but we are trying to include the \
                outcome for epoch `{}`",
                outcome.epoch,
                round.epoch() + 1,
            );
            info!(
                outcome.epoch,
                "received DKG outcome; will include in payload builder attributes",
            );
            outcome.encode().freeze().into()
        } else {
            // Regular block: try to include intermediate dealing
            match self
                .state
                .dkg_manager
                .get_intermediate_dealing(round.epoch())
                .await
            {
                Err(error) => {
                    warn!(
                        %error,
                        "failed getting ceremony deal for current epoch because DKG manager went away",
                    );
                    Bytes::default()
                }
                Ok(None) => Bytes::default(),
                Ok(Some(deal_outcome)) => {
                    info!(
                        "found ceremony deal outcome; will include in payload builder attributes"
                    );
                    deal_outcome.encode().freeze().into()
                }
            }
        };

        let attrs = TempoPayloadBuilderAttributes::new(
            // XXX: derives the payload ID from the parent so that
            // overlong payload builds will eventually succeed on the
            // next iteration: if all other nodes take equally as long,
            // the consensus engine will kill the proposal task (see
            // also `response.cancellation` below). Then eventually
            // consensus will circle back to an earlier node, which then
            // has the chance of picking up the old payload.
            payload_id_from_block_hash(&parent.block_hash()),
            parent.block_hash(),
            self.fee_recipient,
            context.current().epoch_millis(),
            extra_data,
            move || {
                self.subblocks
                    .get_subblocks(parent.block_hash())
                    .unwrap_or_default()
            },
        );

        let interrupt_handle = attrs.interrupt_handle().clone();

        let payload_id = self
            .execution_node
            .payload_builder_handle
            .send_new_payload(attrs)
            .pace(&context, Duration::from_millis(20))
            .await
            .map_err(|_| eyre!("channel was closed before a response was returned"))
            .and_then(|ret| ret.wrap_err("execution layer rejected request"))
            .wrap_err("failed requesting new payload from the execution layer")?;

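        // Give the payload builder a fixed time budget, then interrupt it and
        // resolve the best payload it has produced so far.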
        debug!(
            timeout_ms = self.new_payload_wait_time.as_millis(),
            "sleeping for payload builder timeout"
        );
        context.sleep(self.new_payload_wait_time).await;

        interrupt_handle.interrupt();

        let payload = self
            .execution_node
            .payload_builder_handle
            .resolve_kind(payload_id, reth_node_builder::PayloadKind::WaitForPending)
            .pace(&context, Duration::from_millis(20))
            .await
            // XXX: this returns Option<Result<_, _>>; drilling into
            // resolve_kind, this really does seem to resolve to None if no
            // payload_id was found.
            .ok_or_eyre("no payload found under provided id")
            .and_then(|rsp| rsp.map_err(Into::<eyre::Report>::into))
            .wrap_err_with(|| format!("failed getting payload for payload ID `{payload_id}`"))?;

        Ok(Block::from_execution_block(payload.block().clone()))
    }

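    /// Verifies the block identified by `payload` against its parent.
    ///
    /// Fetches both blocks from the marshal, handles the end-of-epoch
    /// re-propose case, checks the DKG-related header extra data, and finally
    /// validates the block against the execution layer. Returns the block
    /// together with the verification verdict.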
    async fn verify<TContext: Pacer>(
        mut self,
        context: TContext,
        (parent_view, parent_digest): (View, Digest),
        payload: Digest,
        proposer: PublicKey,
        round: Round,
    ) -> eyre::Result<(Block, bool)> {
        let genesis_block = self.genesis_block.clone();
        let parent_request = if parent_digest == genesis_block.digest() {
            Either::Left(always_ready(|| Ok((*genesis_block).clone())))
        } else {
            Either::Right(
                self.marshal
                    .subscribe(Some(Round::new(round.epoch(), parent_view)), parent_digest)
                    .await
                    .map_err(|_| eyre!("syncer dropped channel before the parent block was sent")),
            )
        };
        let block_request = self
            .marshal
            .subscribe(None, payload)
            .await
            .map_err(|_| eyre!("syncer dropped channel before the block to be verified was sent"));

        let (block, parent) = try_join(block_request, parent_request)
            .await
            .wrap_err("failed getting required blocks from syncer")?;

        // Can only repropose at the end of an epoch.
        //
        // NOTE: fetching block and parent twice (in the case block == parent)
        // seems wasteful, but both run concurrently, should finish almost
        // immediately, and happen very rarely. It's better to optimize for the
        // general case.
        if payload == parent_digest {
            if utils::is_last_block_in_epoch(self.epoch_length, block.height())
                .is_some_and(|e| e == round.epoch())
            {
                return Ok((block, true));
            } else {
                return Ok((block, false));
            }
        }

        if let Err(reason) = verify_header_extra_data(
            &block,
            &self.state.dkg_manager,
            self.epoch_length,
            &proposer,
        )
        .await
        {
            warn!(
                %reason,
                "header extra data could not be verified; failing block",
            );
            return Ok((block, false));
        }

        if let Err(error) = self
            .state
            .executor_mailbox
            .canonicalize_head(parent.height(), parent.digest())
        {
            tracing::warn!(
                %error,
                parent.height = parent.height(),
                parent.digest = %parent.digest(),
                "failed updating canonical head to parent",
            );
        }

        let is_good = verify_block(
            context,
            round.epoch(),
            self.epoch_length,
            self.execution_node
                .add_ons_handle
                .beacon_engine_handle
                .clone(),
            &block,
            parent_digest,
            &self.scheme_provider,
        )
        .await
        .wrap_err("failed verifying block against execution layer")?;

        Ok((block, is_good))
    }
}

impl Inner<Uninit> {
    /// Returns a fully initialized actor using runtime information.
    ///
    /// This includes:
    ///
    /// 1. reading the last finalized digest from the consensus marshaller.
    /// 2. starting the canonical chain engine and storing its handle.
    #[instrument(skip_all, err)]
    async fn into_initialized<TContext: Metrics + Spawner + Pacer>(
        self,
        context: TContext,
        dkg_manager: crate::dkg::manager::Mailbox,
    ) -> eyre::Result<Inner<Init>> {
        let executor = executor::Builder {
            execution_node: self.execution_node.clone(),
            genesis_block: self.genesis_block.clone(),
            marshal: self.marshal.clone(),
        }
        .build(context.with_label("executor"));

        let executor_mailbox = executor.mailbox().clone();
        let executor_handle = executor.start();

        let initialized = Inner {
            fee_recipient: self.fee_recipient,
            epoch_length: self.epoch_length,
            new_payload_wait_time: self.new_payload_wait_time,
            my_mailbox: self.my_mailbox,
            marshal: self.marshal,
            genesis_block: self.genesis_block,
            execution_node: self.execution_node,
            state: Init {
                latest_proposed_block: Arc::new(RwLock::new(None)),
                dkg_manager,
                executor_mailbox,
                _executor_handle: AbortOnDrop(executor_handle).into(),
            },
            subblocks: self.subblocks,
            scheme_provider: self.scheme_provider,
        };

        Ok(initialized)
    }
}

/// Marker type to signal that the actor is not fully initialized.
#[derive(Clone, Debug)]
pub(in crate::consensus) struct Uninit(());

/// Carries the runtime initialized state of the application.
#[derive(Clone, Debug)]
struct Init {
    latest_proposed_block: Arc<RwLock<Option<Block>>>,
    dkg_manager: crate::dkg::manager::Mailbox,
    /// The communication channel to the [`executor::Executor`] task.
    executor_mailbox: ExecutorMailbox,
    /// The handle to the spawned executor task.
    ///
    /// If the last instance of this is dropped (the application task is aborted),
    /// this ensures that the task is aborted as well.
    _executor_handle: Arc<AbortOnDrop>,
}

/// Verifies `block` given its `parent` against the execution layer.
///
/// Returns whether the block is valid or not. Returns an error if validation
/// was not possible, for example if communication with the execution layer
/// failed.
///
/// The reason why a block was not valid is communicated as a
/// tracing event.
#[instrument(
    skip_all,
    fields(
        epoch,
        epoch_length,
        block.parent_digest = %block.parent_digest(),
        block.digest = %block.digest(),
        block.height = block.height(),
        block.timestamp = block.timestamp(),
        parent.digest = %parent_digest,
    )
)]
async fn verify_block<TContext: Pacer>(
    context: TContext,
    epoch: Epoch,
    epoch_length: u64,
    engine: ConsensusEngineHandle<TempoPayloadTypes>,
    block: &Block,
    parent_digest: Digest,
    scheme_provider: &SchemeProvider,
) -> eyre::Result<bool> {
    use alloy_rpc_types_engine::PayloadStatusEnum;

    if utils::epoch(epoch_length, block.height()) != epoch {
        info!("block does not belong to this epoch");
        return Ok(false);
    }
    if block.parent_hash() != *parent_digest {
        info!(
            "parent digest stored in block must match the digest of the parent \
            argument but doesn't"
        );
        return Ok(false);
    }
    let scheme = scheme_provider
        .scheme(epoch)
        .ok_or_eyre("cannot determine participants in the current epoch")?;
    let block = block.clone().into_inner();
    let execution_data = TempoExecutionData {
        block: Arc::new(block),
        validator_set: Some(
            scheme
                .participants()
                .into_iter()
                .map(|p| B256::from_slice(p))
                .collect(),
        ),
    };
    let payload_status = engine
        .new_payload(execution_data)
        .pace(&context, Duration::from_millis(50))
        .await
        .wrap_err("failed sending `new payload` message to execution layer to validate block")?;
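    // Map the execution layer's payload status onto the consensus verdict:
    // Valid/Accepted => valid, Invalid => invalid, Syncing => no decision (error).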
    match payload_status.status {
        PayloadStatusEnum::Valid | PayloadStatusEnum::Accepted => Ok(true),
        PayloadStatusEnum::Invalid { validation_error } => {
            info!(
                validation_error,
                "execution layer returned that the block was invalid"
            );
            Ok(false)
        }
        PayloadStatusEnum::Syncing => {
            // FIXME: is this error message correct?
            bail!(
                "failed validating block because payload is still syncing, \
                this means the parent block was available to the consensus \
                layer but not the execution layer"
            )
        }
    }
}

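/// Verifies the DKG-related contents of the block header's `extra_data` field.
///
/// On the last block of an epoch the field must contain the public DKG
/// ceremony outcome matching this node's own outcome; on other blocks a
/// non-empty field may carry an intermediate dealing, which must have been
/// dealt by `proposer` and carry a valid signature.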
#[instrument(skip_all, err(Display))]
async fn verify_header_extra_data(
    block: &Block,
    dkg_manager: &crate::dkg::manager::Mailbox,
    epoch_length: u64,
    proposer: &PublicKey,
) -> eyre::Result<()> {
    if utils::is_last_block_in_epoch(epoch_length, block.height()).is_some() {
        info!(
            "on last block of epoch; verifying that the boundary block \
            contains the correct DKG outcome",
        );
        let our_outcome = dkg_manager.get_public_ceremony_outcome().await.wrap_err(
            "failed getting public dkg ceremony outcome; cannot verify end \
                of epoch block",
        )?;
        let block_outcome = PublicOutcome::decode(block.header().extra_data().as_ref()).wrap_err(
            "failed decoding header extra data as DKG ceremony \
                outcome; cannot verify end of epoch block",
        )?;
        if our_outcome != block_outcome {
            // Emit the log here so that it's structured. The error would be annoying to read.
            warn!(
                our.epoch = our_outcome.epoch,
                our.participants = ?our_outcome.participants,
                our.public = ?our_outcome.public,
                block.epoch = block_outcome.epoch,
                block.participants = ?block_outcome.participants,
                block.public = ?block_outcome.public,
                "our public dkg ceremony outcome does not match what's stored \
                in the block",
            );
            return Err(eyre!(
                "our public dkg ceremony outcome does not match what's \
                stored in the block header extra_data field; they must \
                match so that the end-of-epoch block is valid",
            ));
        }
    } else if !block.header().extra_data().is_empty()
        && let Ok(dealing) = block.try_read_ceremony_deal_outcome()
    {
        info!("block header extra_data contained an intermediate DKG dealing; verifying it");
        ensure!(
            dealing.dealer() == proposer,
            "proposer `{proposer}` is not the dealer `{}` recorded in the \
            intermediate DKG dealing",
            dealing.dealer(),
        );

        ensure!(
            dkg_manager
                .verify_intermediate_dealings(dealing)
                .await
                .wrap_err("failed request to verify DKG dealing")?,
            "signature of intermediate DKG outcome could not be verified",
        );
    }

    Ok(())
}

/// Constructs a [`PayloadId`] from the first 8 bytes of `block_hash`.
fn payload_id_from_block_hash(block_hash: &B256) -> PayloadId {
    PayloadId::new(
        <[u8; 8]>::try_from(&block_hash[0..8])
            .expect("a 32 byte array always has more than 8 bytes"),
    )
}

/// Reports the verification result as a tracing event and consensus response.
///
/// This means either sending true/false if a decision could be rendered, or
/// dropping the channel, if not.
#[instrument(skip_all, err)]
fn report_verification_result(
    response: oneshot::Sender<bool>,
    verification_result: &eyre::Result<(Block, bool)>,
) -> eyre::Result<()> {
    match &verification_result {
        Ok((_, is_good)) => {
            info!(
                proposal_valid = is_good,
                "returning proposal verification result to consensus",
            );
            response.send(*is_good).map_err(|_| {
                eyre!(
                    "attempted to return the verification result, but the \
                        receiver already dropped the channel"
                )
            })?;
        }
        Err(error) => {
            info!(
                %error,
                "could not decide proposal, dropping response channel",
            );
        }
    }
    Ok(())
}

/// Ensures the task associated with the [`Handle`] is aborted (via
/// [`Handle::abort`]) when this instance is dropped.
struct AbortOnDrop(Handle<()>);

impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        self.0.abort();
    }
}

impl std::fmt::Debug for AbortOnDrop {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AbortOnDrop").finish_non_exhaustive()
    }
}