Skip to main content

tempo_commonware_node/consensus/application/
actor.rs

1//! The actor running the application event loop.
2//!
3//! # On the usage of the commonware-pacer
4//!
5//! The actor will contain `Pacer::pace` calls for all interactions
6//! with the execution layer. This is a no-op in production because the
7//! commonware tokio runtime ignores these. However, these are critical in
8//! e2e tests using the commonware deterministic runtime: since the execution
9//! layer is still running on the tokio runtime, these calls signal the
10//! deterministic runtime to spend real life time to wait for the execution
11//! layer calls to complete.
12
13use std::{sync::Arc, time::Duration};
14
15use alloy_consensus::BlockHeader;
16use alloy_primitives::{B256, Bytes};
17use alloy_rpc_types_engine::PayloadId;
18use commonware_codec::{Encode as _, ReadExt as _};
19use commonware_consensus::{
20    Heightable as _,
21    types::{Epoch, Epocher as _, FixedEpocher, Height, HeightDelta, Round, View},
22};
23use commonware_cryptography::{certificate::Provider as _, ed25519::PublicKey};
24use commonware_macros::select;
25use commonware_runtime::{
26    ContextCell, FutureExt as _, Handle, Metrics, Pacer, Spawner, Storage, spawn_cell,
27};
28
29use commonware_utils::{SystemTimeExt, channel::oneshot};
30use eyre::{OptionExt as _, WrapErr as _, bail, ensure, eyre};
31use futures::{
32    StreamExt as _, TryFutureExt as _,
33    channel::mpsc,
34    future::{ready, try_join},
35};
36use rand_08::{CryptoRng, Rng};
37use reth_ethereum::chainspec::EthChainSpec as _;
38use reth_node_builder::{Block as _, BuiltPayload, ConsensusEngineHandle};
39use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
40use tempo_node::{TempoExecutionData, TempoFullNode, TempoPayloadTypes};
41
42use reth_provider::{BlockHashReader as _, BlockReader as _};
43use tokio::sync::RwLock;
44use tracing::{Level, debug, error, error_span, info, info_span, instrument, warn};
45
46use tempo_payload_types::TempoPayloadBuilderAttributes;
47
48use super::{
49    Mailbox,
50    ingress::{Broadcast, Genesis, Message, Propose, Verify},
51};
52use crate::{
53    consensus::{Digest, block::Block},
54    epoch::SchemeProvider,
55    subblocks,
56};
57
58pub(in crate::consensus) struct Actor<TContext, TState = Uninit> {
59    context: ContextCell<TContext>,
60    mailbox: mpsc::Receiver<Message>,
61
62    inner: Inner<TState>,
63}
64
65impl<TContext, TState> Actor<TContext, TState> {
66    pub(super) fn mailbox(&self) -> &Mailbox {
67        &self.inner.my_mailbox
68    }
69}
70
71impl<TContext> Actor<TContext, Uninit>
72where
73    TContext: Pacer + governor::clock::Clock + Rng + CryptoRng + Spawner + Storage + Metrics,
74{
75    pub(super) async fn init(config: super::Config<TContext>) -> eyre::Result<Self> {
76        let (tx, rx) = mpsc::channel(config.mailbox_size);
77        let my_mailbox = Mailbox::from_sender(tx);
78
79        Ok(Self {
80            context: ContextCell::new(config.context),
81            mailbox: rx,
82
83            inner: Inner {
84                fee_recipient: config.fee_recipient,
85                epoch_strategy: config.epoch_strategy,
86
87                payload_resolve_time: config.payload_resolve_time,
88                payload_return_time: config.payload_return_time,
89
90                my_mailbox,
91                marshal: config.marshal,
92
93                execution_node: config.execution_node,
94                executor: config.executor,
95
96                subblocks: config.subblocks,
97
98                scheme_provider: config.scheme_provider,
99
100                state: Uninit(()),
101            },
102        })
103    }
104
105    /// Runs the actor until it is externally stopped.
106    async fn run_until_stopped(self, dkg_manager: crate::dkg::manager::Mailbox) {
107        let Self {
108            context,
109            mailbox,
110            inner,
111        } = self;
112        // TODO(janis): should be placed under a shutdown signal so we don't
113        // just stall on startup.
114        let Ok(initialized) = inner.into_initialized(dkg_manager).await else {
115            // Drop the error because into_initialized generates an error event.
116            return;
117        };
118
119        Actor {
120            context,
121            mailbox,
122            inner: initialized,
123        }
124        .run_until_stopped()
125        .await
126    }
127
128    pub(in crate::consensus) fn start(
129        mut self,
130        dkg_manager: crate::dkg::manager::Mailbox,
131    ) -> Handle<()> {
132        spawn_cell!(self.context, self.run_until_stopped(dkg_manager).await)
133    }
134}
135
136impl<TContext> Actor<TContext, Init>
137where
138    TContext: Pacer + governor::clock::Clock + Rng + CryptoRng + Spawner + Storage + Metrics,
139{
140    async fn run_until_stopped(mut self) {
141        while let Some(msg) = self.mailbox.next().await {
142            if let Err(error) = self.handle_message(msg) {
143                error_span!("handle message").in_scope(|| {
144                    error!(
145                        %error,
146                        "critical error occurred while handling message; exiting"
147                    )
148                });
149                break;
150            }
151        }
152    }
153
154    fn handle_message(&mut self, msg: Message) -> eyre::Result<()> {
155        match msg {
156            Message::Broadcast(broadcast) => {
157                self.context.with_label("broadcast").spawn({
158                    let inner = self.inner.clone();
159                    move |_| inner.handle_broadcast(broadcast)
160                });
161            }
162            Message::Genesis(genesis) => {
163                self.context.with_label("genesis").spawn({
164                    let inner = self.inner.clone();
165                    move |context| inner.handle_genesis(genesis, context)
166                });
167            }
168            Message::Propose(propose) => {
169                self.context.with_label("propose").spawn({
170                    let inner = self.inner.clone();
171                    move |context| inner.handle_propose(propose, context)
172                });
173            }
174            Message::Verify(verify) => {
175                self.context.with_label("verify").spawn({
176                    let inner = self.inner.clone();
177                    move |context| inner.handle_verify(*verify, context)
178                });
179            }
180        }
181        Ok(())
182    }
183}
184
185#[derive(Clone)]
186struct Inner<TState> {
187    fee_recipient: alloy_primitives::Address,
188    epoch_strategy: FixedEpocher,
189    payload_resolve_time: Duration,
190    payload_return_time: Duration,
191
192    my_mailbox: Mailbox,
193
194    marshal: crate::alias::marshal::Mailbox,
195
196    execution_node: TempoFullNode,
197    executor: crate::executor::Mailbox,
198    subblocks: Option<subblocks::Mailbox>,
199    scheme_provider: SchemeProvider,
200
201    state: TState,
202}
203
204impl Inner<Init> {
205    #[instrument(
206        skip_all,
207        fields(%broadcast.payload),
208        err(level = Level::ERROR),
209    )]
210    async fn handle_broadcast(self, broadcast: Broadcast) -> eyre::Result<()> {
211        let Some((round, latest_proposed)) = self.state.latest_proposed_block.read().await.clone()
212        else {
213            return Err(eyre!("there was no latest block to broadcast"));
214        };
215        ensure!(
216            broadcast.payload == latest_proposed.digest(),
217            "broadcast of payload `{}` was requested, but digest of latest proposed block is `{}`",
218            broadcast.payload,
219            latest_proposed.digest(),
220        );
221
222        self.marshal.proposed(round, latest_proposed).await;
223        Ok(())
224    }
225
226    #[instrument(
227        skip_all,
228        fields(
229            epoch = %genesis.epoch,
230        ),
231        ret(Display),
232        err(level = Level::ERROR)
233    )]
234    async fn handle_genesis<TContext: commonware_runtime::Clock>(
235        self,
236        mut genesis: Genesis,
237        context: TContext,
238    ) -> eyre::Result<Digest> {
239        // The last block of the previous epoch is the genesis of the current
240        // epoch. Only epoch 0/height 0 is special cased because first height
241        // of epoch 0 == genesis of epoch 0.
242        let boundary = match genesis.epoch.previous() {
243            None => Height::zero(),
244            Some(previous_epoch) => self
245                .epoch_strategy
246                .last(previous_epoch)
247                .expect("epoch strategy is for all epochs"),
248        };
249
250        let mut attempts = 0;
251        let epoch_genesis = loop {
252            attempts += 1;
253            if let Ok(Some(hash)) = self.execution_node.provider.block_hash(boundary.get()) {
254                break Digest(hash);
255            } else if let Some((_, digest)) = self.marshal.get_info(boundary).await {
256                break digest;
257            } else {
258                info_span!("fetch_genesis_digest").in_scope(|| {
259                    info!(
260                        boundary.height = %boundary,
261                        attempts,
262                        "neither marshal actor nor execution layer had the \
263                        boundary block of the previous epoch available; \
264                        waiting 2s before trying again"
265                    );
266                });
267                select!(
268                    () = genesis.response.closed() => {
269                        return Err(eyre!("genesis request was cancelled"));
270                    },
271
272                    _ = context.sleep(Duration::from_secs(2)) => {
273                        continue;
274                    },
275                );
276            }
277        };
278        genesis.response.send(epoch_genesis).map_err(|_| {
279            eyre!("failed returning parent digest for epoch: return channel was already closed")
280        })?;
281        Ok(epoch_genesis)
282    }
283
284    /// Handles a [`Propose`] request.
285    #[instrument(
286        skip_all,
287        fields(
288            epoch = %request.round.epoch(),
289            view = %request.round.view(),
290            parent.view = %request.parent.0,
291            parent.digest = %request.parent.1,
292        ),
293        err(level = Level::WARN),
294    )]
295    async fn handle_propose<TContext: Pacer>(
296        self,
297        request: Propose,
298        context: TContext,
299    ) -> eyre::Result<()> {
300        let Propose {
301            parent: (parent_view, parent_digest),
302            mut response,
303            round,
304        } = request;
305
306        let proposal = select!(
307            () = response.closed() => {
308                Err(eyre!(
309                    "proposal return channel was closed by consensus \
310                    engine before block could be proposed; aborting"
311                ))
312           },
313
314            res = self.clone().propose(
315                context.clone(),
316                parent_view,
317                parent_digest,
318                round
319            ) => {
320                res.wrap_err("failed creating a proposal")
321            }
322        )?;
323
324        let proposal_digest = proposal.digest();
325        let proposal_height = proposal.height();
326
327        info!(
328            proposal.digest = %proposal_digest,
329            proposal.height = %proposal_height,
330            "constructed proposal",
331        );
332
333        response.send(proposal_digest).map_err(|_| {
334            eyre!(
335                "failed returning proposal to consensus engine: response \
336                channel was already closed"
337            )
338        })?;
339
340        // If re-proposing, then don't store the parent for broadcasting and
341        // don't touch the execution layer.
342        if proposal_digest == parent_digest {
343            return Ok(());
344        }
345
346        {
347            let mut lock = self.state.latest_proposed_block.write().await;
348            *lock = Some((round, proposal.clone()));
349        }
350
351        Ok(())
352    }
353
354    /// Verifies a [`Verify`] request.
355    ///
356    /// this method only renders a decision on the `verify.response`
357    /// channel if it was able to come to a boolean decision. If it was
358    /// unable to refute or prove the validity of the block it will
359    /// return an error and drop the response channel.
360    ///
361    /// Conditions for which no decision could be made are usually:
362    /// no block could be read from the syncer or communication with the
363    /// execution layer failed.
364    #[instrument(
365        skip_all,
366        fields(
367            epoch = %verify.round.epoch(),
368            view = %verify.round.view(),
369            digest = %verify.payload,
370            parent.view = %verify.parent.0,
371            parent.digest = %verify.parent.1,
372            proposer = %verify.proposer,
373        ),
374    )]
375    async fn handle_verify<TContext: Pacer>(self, verify: Verify, context: TContext) {
376        let Verify {
377            parent,
378            payload,
379            proposer,
380            mut response,
381            round,
382        } = verify;
383        let result = select!(
384            () = response.closed() => {
385                Err(eyre!(
386                    "verification return channel was closed by consensus \
387                    engine before block could be validated; aborting"
388                ))
389            },
390
391            res = self.clone().verify(context, parent, payload, proposer, round) => {
392                res.wrap_err("block verification failed")
393            }
394        );
395
396        // Respond with the verification result ASAP. Also generates
397        // the event reporting the result of the verification.
398        let _ = report_verification_result(response, &result);
399
400        // 2. make the forkchoice state available && cache the block
401        if let Ok((block, true)) = result {
402            // Only make the verified block canonical when not doing a
403            // re-propose at the end of an epoch.
404            if parent.1 != payload
405                && let Err(error) = self
406                    .state
407                    .executor
408                    .canonicalize_head(block.height(), block.digest())
409            {
410                tracing::warn!(
411                    %error,
412                    "failed making the verified proposal the head of the canonical chain",
413                );
414            }
415            self.marshal.verified(round, block).await;
416        }
417    }
418
419    async fn propose<TContext: Pacer>(
420        self,
421        context: TContext,
422        parent_view: View,
423        parent_digest: Digest,
424        round: Round,
425    ) -> eyre::Result<Block> {
426        let parent = get_parent(
427            &self.execution_node,
428            round,
429            parent_digest,
430            parent_view,
431            &self.marshal,
432        )
433        .await?;
434        debug!(height = %parent.height(), "retrieved parent block",);
435
436        let parent_epoch_info = self
437            .epoch_strategy
438            .containing(parent.height())
439            .expect("epoch strategy is for all heights");
440        // XXX: Re-propose the parent if the parent is the last height of the
441        // epoch. parent.height+1 should be proposed as the first block of the
442        // next epoch.
443        if parent_epoch_info.last() == parent.height() && parent_epoch_info.epoch() == round.epoch()
444        {
445            info!("parent is last height of epoch; re-proposing parent");
446            return Ok(parent);
447        }
448
449        // Send the proposal parent to reth to cover edge cases when we were not asked to verify it directly.
450        if !verify_block(
451            context.clone(),
452            parent_epoch_info.epoch(),
453            &self.epoch_strategy,
454            self.execution_node
455                .add_ons_handle
456                .beacon_engine_handle
457                .clone(),
458            &parent,
459            // It is safe to not verify the parent of the parent because this block is already notarized.
460            parent.parent_digest(),
461            &self.scheme_provider,
462        )
463        .await
464        .wrap_err("failed verifying block against execution layer")?
465        {
466            eyre::bail!("the proposal parent block is not valid");
467        }
468
469        ready(
470            self.state
471                .executor
472                .canonicalize_head(parent.height(), parent.digest()),
473        )
474        .and_then(|ack| ack.map_err(eyre::Report::new))
475        .await
476        .wrap_err("failed updating canonical head to parent")?;
477
478        // Query DKG manager for ceremony data before building payload
479        // This data will be passed to the payload builder via attributes
480        let extra_data = if parent_epoch_info.last() == parent.height().next()
481            && parent_epoch_info.epoch() == round.epoch()
482        {
483            // At epoch boundary: include public ceremony outcome
484            let outcome = self
485                .state
486                .dkg_manager
487                .get_dkg_outcome(parent_digest, parent.height())
488                .await
489                .wrap_err("failed getting public dkg ceremony outcome")?;
490            ensure!(
491                round.epoch().next() == outcome.epoch,
492                "outcome is for epoch `{}`, but we are trying to include the \
493                outcome for epoch `{}`",
494                outcome.epoch,
495                round.epoch().next(),
496            );
497            info!(
498                %outcome.epoch,
499                outcome.network_identity = %outcome.network_identity(),
500                outcome.dealers = ?outcome.dealers(),
501                outcome.players = ?outcome.players(),
502                outcome.next_players = ?outcome.next_players(),
503                "received DKG outcome; will include in payload builder attributes",
504            );
505            outcome.encode().into()
506        } else {
507            // Regular block: try to include DKG dealer log.
508            match self.state.dkg_manager.get_dealer_log(round.epoch()).await {
509                Err(error) => {
510                    warn!(
511                        %error,
512                        "failed getting signed dealer log for current epoch \
513                        because actor dropped response channel",
514                    );
515                    Bytes::default()
516                }
517                Ok(None) => Bytes::default(),
518                Ok(Some(log)) => {
519                    info!(
520                        "received signed dealer log; will include in payload \
521                        builder attributes"
522                    );
523                    log.encode().into()
524                }
525            }
526        };
527
528        let attrs = TempoPayloadBuilderAttributes::new(
529            // XXX: derives the payload ID from the parent so that
530            // overlong payload builds will eventually succeed on the
531            // next iteration: if all other nodes take equally as long,
532            // the consensus engine will kill the proposal task (see
533            // also `response.cancellation` below). Then eventually
534            // consensus will circle back to an earlier node, which then
535            // has the chance of picking up the old payload.
536            payload_id_from_block_hash(&parent.block_hash()),
537            parent.block_hash(),
538            self.fee_recipient,
539            context.current().epoch_millis(),
540            extra_data,
541            move || {
542                self.subblocks
543                    .as_ref()
544                    .and_then(|s| s.get_subblocks(parent.block_hash()).ok())
545                    .unwrap_or_default()
546            },
547        );
548
549        let interrupt_handle = attrs.interrupt_handle().clone();
550
551        let payload_id = self
552            .execution_node
553            .payload_builder_handle
554            .send_new_payload(attrs)
555            .pace(&context, Duration::from_millis(20))
556            .await
557            .map_err(|_| eyre!("channel was closed before a response was returned"))
558            .and_then(|ret| ret.wrap_err("execution layer rejected request"))
559            .wrap_err("failed requesting new payload from the execution layer")?;
560
561        debug!(
562            resolve_time_ms = self.payload_resolve_time.as_millis(),
563            return_time_ms = self.payload_return_time.as_millis(),
564            "sleeping before payload builder resolving"
565        );
566
567        // Start the timer for `self.payload_return_time`
568        //
569        // This guarantees that we will not propose the block too early, and waits for at least `self.payload_return_time`,
570        // plus whatever time is needed to finish building the block.
571        let payload_return_time = context.current() + self.payload_return_time;
572
573        // Give payload builder at least `self.payload_resolve_time` until we interrupt it.
574        //
575        // The interrupt doesn't mean we'll immediately get the payload back,
576        // but only signals the builder to stop executing transactions,
577        // and start calculating the state root and sealing the block.
578        context.sleep(self.payload_resolve_time).await;
579
580        interrupt_handle.interrupt();
581
582        let payload = self
583            .execution_node
584            .payload_builder_handle
585            .resolve_kind(payload_id, reth_node_builder::PayloadKind::WaitForPending)
586            .pace(&context, Duration::from_millis(20))
587            .await
588            // XXX: this returns Option<Result<_, _>>; drilling into
589            // resolve_kind this really seems to resolve to None if no
590            // payload_id was found.
591            .ok_or_eyre("no payload found under provided id")
592            .and_then(|rsp| rsp.map_err(Into::<eyre::Report>::into))
593            .wrap_err_with(|| format!("failed getting payload for payload ID `{payload_id}`"))?;
594
595        // Keep waiting for `self.payload_return_time`, if there's anything left after building the block.
596        context.sleep_until(payload_return_time).await;
597
598        Ok(Block::from_execution_block(payload.block().clone()))
599    }
600
601    async fn verify<TContext: Pacer>(
602        self,
603        context: TContext,
604        (parent_view, parent_digest): (View, Digest),
605        payload: Digest,
606        proposer: PublicKey,
607        round: Round,
608    ) -> eyre::Result<(Block, bool)> {
609        let block_request = self
610            .marshal
611            .subscribe_by_digest(None, payload)
612            .await
613            .map_err(|_| eyre!("syncer dropped channel before the block-to-verified was sent"));
614
615        let (block, parent) = try_join(
616            block_request,
617            get_parent(
618                &self.execution_node,
619                round,
620                parent_digest,
621                parent_view,
622                &self.marshal,
623            ),
624        )
625        .await
626        .wrap_err("failed getting required blocks from syncer")?;
627
628        // Can only repropose at the end of an epoch.
629        //
630        // NOTE: fetching block and parent twice (in the case block == parent)
631        // seems wasteful, but both run concurrently, should finish almost
632        // immediately, and happen very rarely. It's better to optimize for the
633        // general case.
634        if payload == parent_digest {
635            let epoch_info = self
636                .epoch_strategy
637                .containing(block.height())
638                .expect("epoch strategy is for all heights");
639            if epoch_info.last() == block.height() && epoch_info.epoch() == round.epoch() {
640                return Ok((block, true));
641            } else {
642                return Ok((block, false));
643            }
644        }
645
646        if let Err(reason) = verify_header_extra_data(
647            &block,
648            (parent_view, parent_digest),
649            round,
650            &self.state.dkg_manager,
651            &self.epoch_strategy,
652            &proposer,
653        )
654        .await
655        {
656            warn!(
657                %reason,
658                "header extra data could not be verified; failing block",
659            );
660            return Ok((block, false));
661        }
662
663        if let Err(error) = self
664            .state
665            .executor
666            .canonicalize_head(parent.height(), parent.digest())
667        {
668            tracing::warn!(
669                %error,
670                parent.height = %parent.height(),
671                parent.digest = %parent.digest(),
672                "failed updating canonical head to parent",
673            );
674        }
675
676        let is_good = verify_block(
677            context,
678            round.epoch(),
679            &self.epoch_strategy,
680            self.execution_node
681                .add_ons_handle
682                .beacon_engine_handle
683                .clone(),
684            &block,
685            parent_digest,
686            &self.scheme_provider,
687        )
688        .await
689        .wrap_err("failed verifying block against execution layer")?;
690
691        Ok((block, is_good))
692    }
693}
694
695impl Inner<Uninit> {
696    /// Returns a fully initialized actor using runtime information.
697    ///
698    /// This includes:
699    ///
700    /// 1. reading the last finalized digest from the consensus marshaller.
701    /// 2. starting the canonical chain engine and storing its handle.
702    #[instrument(skip_all, err)]
703    async fn into_initialized(
704        self,
705        dkg_manager: crate::dkg::manager::Mailbox,
706    ) -> eyre::Result<Inner<Init>> {
707        let initialized = Inner {
708            fee_recipient: self.fee_recipient,
709            epoch_strategy: self.epoch_strategy,
710            payload_resolve_time: self.payload_resolve_time,
711            payload_return_time: self.payload_return_time,
712            my_mailbox: self.my_mailbox,
713            marshal: self.marshal,
714            execution_node: self.execution_node,
715            executor: self.executor.clone(),
716            state: Init {
717                latest_proposed_block: Arc::new(RwLock::new(None)),
718                dkg_manager,
719                executor: self.executor.clone(),
720            },
721            subblocks: self.subblocks,
722            scheme_provider: self.scheme_provider,
723        };
724
725        Ok(initialized)
726    }
727}
728
729/// Marker type to signal that the actor is not fully initialized.
730#[derive(Clone, Debug)]
731pub(in crate::consensus) struct Uninit(());
732
733/// Carries the runtime initialized state of the application.
734#[derive(Clone, Debug)]
735struct Init {
736    latest_proposed_block: Arc<RwLock<Option<(Round, Block)>>>,
737    dkg_manager: crate::dkg::manager::Mailbox,
738    /// The communication channel to the executor agent.
739    executor: crate::executor::Mailbox,
740}
741
742/// Verifies `block` given its `parent` against the execution layer.
743///
744/// Returns whether the block is valid or not. Returns an error if validation
745/// was not possible, for example if communication with the execution layer
746/// failed.
747///
748/// Reason the reason for why a block was not valid is communicated as a
749/// tracing event.
750#[instrument(
751    skip_all,
752    fields(
753        %epoch,
754        epoch_length,
755        block.parent_digest = %block.parent_digest(),
756        block.digest = %block.digest(),
757        block.height = %block.height(),
758        block.timestamp = block.timestamp(),
759        parent.digest = %parent_digest,
760    )
761)]
762async fn verify_block<TContext: Pacer>(
763    context: TContext,
764    epoch: Epoch,
765    epoch_strategy: &FixedEpocher,
766    engine: ConsensusEngineHandle<TempoPayloadTypes>,
767    block: &Block,
768    parent_digest: Digest,
769    scheme_provider: &SchemeProvider,
770) -> eyre::Result<bool> {
771    use alloy_rpc_types_engine::PayloadStatusEnum;
772
773    let epoch_info = epoch_strategy
774        .containing(block.height())
775        .expect("epoch strategy is for all heights");
776    if epoch_info.epoch() != epoch {
777        info!("block does not belong to this epoch");
778        return Ok(false);
779    }
780    if block.parent_hash() != *parent_digest {
781        info!(
782            "parent digest stored in block must match the digest of the parent \
783            argument but doesn't"
784        );
785        return Ok(false);
786    }
787
788    // FIXME: in cases where validate_block is called on the boundary block,
789    // the scheme might not be available.
790    //
791    // The fix is to track directly track notarizations and feed them to the
792    // EL that way, instead of doing this at the automaton/proposal level.
793    //
794    // https://github.com/tempoxyz/tempo/issues/1411
795    //
796    // let scheme = scheme_provider
797    //     .scoped(epoch)
798    //     .ok_or_eyre("cannot determine participants in the current epoch")?;
799    let validator_set = scheme_provider.scoped(epoch).map(|scheme| {
800        scheme
801            .participants()
802            .into_iter()
803            .map(|p| B256::from_slice(p))
804            .collect()
805    });
806    let block = block.clone().into_inner();
807    let execution_data = TempoExecutionData {
808        block: Arc::new(block),
809        validator_set,
810    };
811    let payload_status = engine
812        .new_payload(execution_data)
813        .pace(&context, Duration::from_millis(50))
814        .await
815        .wrap_err("failed sending `new payload` message to execution layer to validate block")?;
816    match payload_status.status {
817        PayloadStatusEnum::Valid => Ok(true),
818        PayloadStatusEnum::Invalid { validation_error } => {
819            info!(
820                validation_error,
821                "execution layer returned that the block was invalid"
822            );
823            Ok(false)
824        }
825        PayloadStatusEnum::Accepted => {
826            bail!(
827                "failed validating block because payload was accepted, meaning \
828                that this was not actually executed by the execution layer for some reason"
829            )
830        }
831        PayloadStatusEnum::Syncing => {
832            bail!(
833                "failed validating block because payload is still syncing, \
834                this means the parent block was available to the consensus \
835                layer but not the execution layer"
836            )
837        }
838    }
839}
840
841#[instrument(skip_all, err(Display))]
842async fn verify_header_extra_data(
843    block: &Block,
844    parent: (View, Digest),
845    round: Round,
846    dkg_manager: &crate::dkg::manager::Mailbox,
847    epoch_strategy: &FixedEpocher,
848    proposer: &PublicKey,
849) -> eyre::Result<()> {
850    let epoch_info = epoch_strategy
851        .containing(block.height())
852        .expect("epoch strategy is for all heights");
853    if epoch_info.last() == block.height() {
854        info!(
855            "on last block of epoch; verifying that the boundary block \
856            contains the correct DKG outcome",
857        );
858        let our_outcome = dkg_manager
859            .get_dkg_outcome(parent.1, block.height().saturating_sub(HeightDelta::new(1)))
860            .await
861            .wrap_err(
862                "failed getting public dkg ceremony outcome; cannot verify end \
863                of epoch block",
864            )?;
865        let block_outcome = OnchainDkgOutcome::read(&mut block.header().extra_data().as_ref())
866            .wrap_err(
867                "failed decoding extra data header as DKG ceremony \
868                outcome; cannot verify end of epoch block",
869            )?;
870        if our_outcome != block_outcome {
871            // Emit the log here so that it's structured. The error would be annoying to read.
872            warn!(
873                our.epoch = %our_outcome.epoch,
874                our.players = ?our_outcome.players(),
875                our.next_players = ?our_outcome.next_players(),
876                our.sharing = ?our_outcome.sharing(),
877                our.is_next_full_dkg = ?our_outcome.is_next_full_dkg,
878                block.epoch = %block_outcome.epoch,
879                block.players = ?block_outcome.players(),
880                block.next_players = ?block_outcome.next_players(),
881                block.sharing = ?block_outcome.sharing(),
882                block.is_next_full_dkg = ?block_outcome.is_next_full_dkg,
883                "our public dkg outcome does not match what's stored \
884                in the block",
885            );
886            return Err(eyre!(
887                "our public dkg outcome does not match what's \
888                stored in the block header extra_data field; they must \
889                match so that the end-of-block is valid",
890            ));
891        }
892    } else if !block.header().extra_data().is_empty() {
893        let bytes = block.header().extra_data().to_vec();
894        let dealer = dkg_manager
895            .verify_dealer_log(round.epoch(), bytes)
896            .await
897            .wrap_err("failed request to verify DKG dealing")?;
898        ensure!(
899            &dealer == proposer,
900            "proposer `{proposer}` is not the dealer `{dealer}` of the dealing \
901            in the block",
902        );
903    }
904
905    Ok(())
906}
907
908/// Constructs a [`PayloadId`] from the first 8 bytes of `block_hash`.
909fn payload_id_from_block_hash(block_hash: &B256) -> PayloadId {
910    PayloadId::new(
911        <[u8; 8]>::try_from(&block_hash[0..8])
912            .expect("a 32 byte array always has more than 8 bytes"),
913    )
914}
915
916/// Reports the verification result as a tracing event and consensus response.
917///
918/// This means either sending true/false if a decision could be rendered, or
919/// dropping the channel, if not.
920#[instrument(skip_all, err)]
921fn report_verification_result(
922    response: oneshot::Sender<bool>,
923    verification_result: &eyre::Result<(Block, bool)>,
924) -> eyre::Result<()> {
925    match &verification_result {
926        Ok((_, is_good)) => {
927            info!(
928                proposal_valid = is_good,
929                "returning proposal verification result to consensus",
930            );
931            response.send(*is_good).map_err(|_| {
932                eyre!(
933                    "attempted to send return verification result, but \
934                        receiver already dropped the channel"
935                )
936            })?;
937        }
938        Err(error) => {
939            info!(
940                %error,
941                "could not decide proposal, dropping response channel",
942            );
943        }
944    }
945    Ok(())
946}
947
948async fn get_parent(
949    execution_node: &TempoFullNode,
950    round: Round,
951    parent_digest: Digest,
952    parent_view: View,
953    marshal: &crate::alias::marshal::Mailbox,
954) -> eyre::Result<Block> {
955    let genesis_digest = execution_node.chain_spec().genesis_hash();
956    if parent_digest == Digest(genesis_digest) {
957        let genesis_block = Block::from_execution_block(
958            execution_node
959                .provider
960                .block_by_number(0)
961                .map_or_else(
962                    |e| Err(eyre::Report::new(e)),
963                    |block| block.ok_or_eyre("execution layer did not have block"),
964                )
965                .wrap_err("execution layer did not have the genesis block")?
966                .seal(),
967        );
968        Ok(genesis_block)
969    } else {
970        marshal
971            .subscribe_by_digest(Some(Round::new(round.epoch(), parent_view)), parent_digest)
972            .await
973            .await
974            .map_err(|_| eyre!("syncer dropped channel before the parent block was sent"))
975    }
976}