Skip to main content

tempo_consensus/consensus/application/
actor.rs

//! The actor running the application event loop.
//!
//! # On the usage of the commonware-pacer
//!
//! The actor will contain `Pacer::pace` calls for all interactions
//! with the execution layer. These calls are no-ops in production because the
//! commonware tokio runtime ignores them. However, they are critical in
//! e2e tests using the commonware deterministic runtime: since the execution
//! layer is still running on the tokio runtime, these calls signal the
//! deterministic runtime to spend real (wall-clock) time waiting for the
//! execution layer calls to complete.
12
13use std::{
14    sync::{Arc, Mutex},
15    time::{Duration, Instant, SystemTime},
16};
17
18use alloy_consensus::BlockHeader;
19use alloy_primitives::{B256, Bytes};
20use commonware_codec::{Encode as _, EncodeSize as _, ReadExt as _};
21use commonware_consensus::{
22    Heightable as _,
23    simplex::Plan,
24    types::{Epoch, Epocher as _, FixedEpocher, Height, HeightDelta, Round, View},
25};
26use commonware_cryptography::{certificate::Provider as _, ed25519::PublicKey};
27use commonware_macros::select;
28use commonware_p2p::Recipients;
29use commonware_runtime::{
30    ContextCell, FutureExt as _, Handle, Metrics as _, Pacer, Spawner, Storage, spawn_cell,
31};
32use prometheus_client::metrics::counter::Counter;
33
34use commonware_utils::SystemTimeExt;
35use eyre::{OptionExt as _, WrapErr as _, bail, ensure, eyre};
36use futures::{StreamExt as _, channel::mpsc, future::try_join};
37use rand_08::{CryptoRng, Rng};
38use reth_node_builder::{Block as _, ConsensusEngineHandle};
39use reth_primitives_traits::BlockBody as _;
40use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
41use tempo_node::{TempoExecutionData, TempoFullNode, TempoPayloadTypes};
42use tempo_telemetry_util::display_duration;
43
44use reth_provider::{BlockHashReader as _, BlockReader as _, BlockSource};
45use tempo_payload_types::{
46    TempoPayloadAttributes, ValidationLatencyEstimator, ValidationLatencyWorkload,
47    marshal_persist_estimate, observe_marshal_persist,
48};
49use tempo_primitives::TempoConsensusContext;
50use tracing::{Level, debug, info, info_span, instrument, warn};
51
52use super::{
53    Mailbox,
54    ingress::{Broadcast, Genesis, Message, Propose, Verify},
55};
56use crate::{
57    consensus::{Digest, block::Block},
58    epoch::SchemeProvider,
59    subblocks,
60    utils::OptionFuture,
61};
62
/// The application actor.
///
/// Owns the runtime context and the receiving end of its mailbox, and hands
/// every received [`Message`] to a freshly spawned task that works on a clone
/// of the shared `Inner` state.
///
/// `TState` is a type-state marker: `init` produces an `Actor<_, Uninit>`,
/// which becomes an `Actor<_, Init>` once `Inner::into_initialized` succeeds
/// inside the task spawned by `start`.
pub(in crate::consensus) struct Actor<TContext, TState = Uninit> {
    /// Runtime context; used to spawn the labelled handler tasks.
    context: ContextCell<TContext>,
    /// Receiving half of the channel whose sender is wrapped in [`Mailbox`].
    mailbox: mpsc::Receiver<Message>,

    /// State cloned into every handler task.
    inner: Inner<TState>,
}
69
/// Inputs for `Inner::propose`, extracted from a [`Propose`] request.
struct BuildProposalArgs {
    /// When the propose request started; the proposal return budget is
    /// measured from this instant.
    propose_start: Instant,
    /// View in which the parent block was proposed.
    parent_view: View,
    /// Digest of the parent block to build on.
    parent_digest: Digest,
    /// Round (epoch and view) the proposal is for.
    round: Round,
    /// Leader of `round`; recorded as the proposer in the block's consensus
    /// context.
    leader: PublicKey,
}
77
/// Pacing information returned by `Inner::propose` alongside a newly built
/// block. It is absent when an existing block is re-proposed.
struct ProposalReturn {
    /// Time (from the runtime clock) at which the proposal may be handed back
    /// to consensus; `handle_propose` sleeps until then after persisting it.
    time: SystemTime,
    /// Encoded size of the consensus block, used when recording marshal
    /// persistence latency.
    block_size_bytes: usize,
}
82
83impl<TContext, TState> Actor<TContext, TState> {
84    pub(super) fn mailbox(&self) -> &Mailbox {
85        &self.inner.my_mailbox
86    }
87}
88
89impl<TContext> Actor<TContext, Uninit>
90where
91    TContext: Pacer
92        + governor::clock::Clock
93        + Rng
94        + CryptoRng
95        + Spawner
96        + Storage
97        + commonware_runtime::Metrics,
98{
99    pub(super) async fn init(config: super::Config<TContext>) -> eyre::Result<Self> {
100        let (tx, rx) = mpsc::channel(config.mailbox_size);
101        let my_mailbox = Mailbox::from_sender(tx);
102
103        let metrics = Metrics::init(&config.context);
104
105        Ok(Self {
106            context: ContextCell::new(config.context),
107            mailbox: rx,
108
109            inner: Inner {
110                public_key: config.public_key,
111                epoch_strategy: config.epoch_strategy,
112
113                proposal_return_budget: config.proposal_return_budget,
114
115                my_mailbox,
116                marshal: config.marshal,
117
118                execution_node: config.execution_node,
119                executor: config.executor,
120
121                subblocks: config.subblocks,
122
123                scheme_provider: config.scheme_provider,
124                validation_latency_estimator: Default::default(),
125
126                metrics,
127
128                state: Uninit(()),
129            },
130        })
131    }
132
133    /// Runs the actor until it is externally stopped.
134    async fn run_until_stopped(self, dkg_manager: crate::dkg::manager::Mailbox) {
135        let Self {
136            context,
137            mailbox,
138            inner,
139        } = self;
140        // TODO(janis): should be placed under a shutdown signal so we don't
141        // just stall on startup.
142        let Ok(initialized) = inner.into_initialized(dkg_manager).await else {
143            // Drop the error because into_initialized generates an error event.
144            return;
145        };
146
147        Actor {
148            context,
149            mailbox,
150            inner: initialized,
151        }
152        .run_until_stopped()
153        .await
154    }
155
156    pub(in crate::consensus) fn start(
157        mut self,
158        dkg_manager: crate::dkg::manager::Mailbox,
159    ) -> Handle<()> {
160        spawn_cell!(self.context, self.run_until_stopped(dkg_manager))
161    }
162}
163
164impl<TContext> Actor<TContext, Init>
165where
166    TContext: Pacer
167        + governor::clock::Clock
168        + Rng
169        + CryptoRng
170        + Spawner
171        + Storage
172        + commonware_runtime::Metrics,
173{
174    async fn run_until_stopped(mut self) {
175        while let Some(msg) = self.mailbox.next().await {
176            self.handle_message(msg);
177        }
178    }
179
180    fn handle_message(&mut self, msg: Message) {
181        match msg {
182            Message::Broadcast(broadcast) => {
183                self.context.with_label("broadcast").spawn({
184                    let inner = self.inner.clone();
185                    move |_| inner.handle_broadcast(*broadcast)
186                });
187            }
188            Message::Genesis(genesis) => {
189                self.context.with_label("genesis").spawn({
190                    let inner = self.inner.clone();
191                    move |context| inner.handle_genesis(genesis, context)
192                });
193            }
194            Message::Propose(propose) => {
195                self.context.with_label("propose").spawn({
196                    let inner = self.inner.clone();
197                    move |context| inner.handle_propose(*propose, context)
198                });
199            }
200            Message::Verify(verify) => {
201                self.context.with_label("verify").spawn({
202                    let inner = self.inner.clone();
203                    move |context| inner.handle_verify(*verify, context)
204                });
205            }
206        }
207    }
208}
209
/// State shared by all message handlers of the application actor.
///
/// `handle_message` clones this struct for every incoming message and moves
/// the clone into the spawned handler task.
#[derive(Clone)]
struct Inner<TState> {
    /// This node's public key; used as the proposer key in the payload
    /// attributes when building a block.
    public_key: PublicKey,
    /// Maps heights to epochs and epoch boundaries.
    epoch_strategy: FixedEpocher,
    /// Local proposal window after reserving network propagation time.
    proposal_return_budget: Duration,

    /// Sender side of this actor's own mailbox (exposed via `Actor::mailbox`).
    my_mailbox: Mailbox,

    /// Marshal actor: serves blocks and persists proposed/verified blocks.
    marshal: crate::alias::marshal::Mailbox,

    /// Execution node; used for block-hash lookups and for its beacon engine
    /// handle during block verification.
    execution_node: Arc<TempoFullNode>,
    // NOTE(review): the visible propose/verify paths use `state.executor`,
    // not this field; presumably it is consumed during initialization —
    // confirm against `into_initialized`.
    executor: crate::executor::Mailbox,
    /// Optional subblocks service, queried for subblocks when building a
    /// proposal.
    subblocks: Option<subblocks::Mailbox>,
    /// Passed through to block verification.
    scheme_provider: SchemeProvider,
    /// Fed with observed validation latencies during verification; its
    /// estimate is handed to the payload builder when proposing.
    validation_latency_estimator: Arc<Mutex<ValidationLatencyEstimator>>,

    metrics: Metrics,

    /// Type-state marker. It is `Uninit` until initialization and `Init`
    /// afterwards; `Init` carries the DKG manager and executor handles used
    /// when proposing and verifying.
    state: TState,
}
231
232impl Inner<Init> {
233    #[instrument(
234        skip_all,
235        fields(%digest),
236    )]
237    async fn handle_broadcast(self, Broadcast { digest, plan }: Broadcast) {
238        let (round, recipients) = match plan {
239            Plan::Propose { round } => (round, Recipients::All),
240            Plan::Forward { round, recipients } => (round, recipients),
241        };
242        self.marshal.forward(round, digest, recipients).await;
243    }
244
    /// Resolves the digest of the genesis block for `genesis.epoch` and sends
    /// it on `genesis.response`.
    ///
    /// The genesis of epoch 0 is the block at height 0. For every later epoch,
    /// it is the last block of the previous epoch. The digest is looked up in
    /// the execution layer first and in the marshal actor second. If neither
    /// has it, the lookup is retried every 2 seconds until it succeeds or the
    /// requester closes its response channel.
    ///
    /// # Errors
    ///
    /// Returns an error if the response channel is closed while waiting, or if
    /// sending the resolved digest fails.
    #[instrument(
        skip_all,
        fields(
            epoch = %genesis.epoch,
        ),
        ret(Display),
        err(level = Level::ERROR)
    )]
    async fn handle_genesis<TContext: commonware_runtime::Clock>(
        self,
        mut genesis: Genesis,
        context: TContext,
    ) -> eyre::Result<Digest> {
        // The last block of the previous epoch is the genesis of the current
        // epoch. Only epoch 0/height 0 is special cased because first height
        // of epoch 0 == genesis of epoch 0.
        let boundary = match genesis.epoch.previous() {
            None => Height::zero(),
            Some(previous_epoch) => self
                .epoch_strategy
                .last(previous_epoch)
                .expect("epoch strategy is for all epochs"),
        };

        // Number of lookup rounds so far; only used for logging.
        let mut attempts = 0;
        let epoch_genesis = loop {
            attempts += 1;
            // An execution-layer lookup error is treated like "not found" and
            // falls through to the marshal lookup.
            if let Ok(Some(hash)) = self.execution_node.provider.block_hash(boundary.get()) {
                break Digest(hash);
            } else if let Some((_, digest)) = self.marshal.get_info(boundary).await {
                break digest;
            } else {
                info_span!("fetch_genesis_digest").in_scope(|| {
                    info!(
                        boundary.height = %boundary,
                        attempts,
                        "neither marshal actor nor execution layer had the \
                        boundary block of the previous epoch available; \
                        waiting 2s before trying again"
                    );
                });
                // Stop retrying as soon as the requester stops waiting.
                select!(
                    () = genesis.response.closed() => {
                        return Err(eyre!("genesis request was cancelled"));
                    },

                    _ = context.sleep(Duration::from_secs(2)) => {
                        continue;
                    },
                );
            }
        };
        genesis.response.send(epoch_genesis).map_err(|_| {
            eyre!("failed returning parent digest for epoch: return channel was already closed")
        })?;
        Ok(epoch_genesis)
    }
302
    /// Handles a [`Propose`] request.
    ///
    /// Produces a block digest for `round` and returns it on the request's
    /// response channel. Building a fresh proposal races against
    /// `marshal.get_verified(round)`. If marshal already holds a verified
    /// block for this round, that block's digest is returned and the freshly
    /// built block is discarded. A freshly built block is first persisted via
    /// `marshal.proposed`; the function then sleeps until the pacing time
    /// computed by `propose` before responding.
    ///
    /// # Errors
    ///
    /// Returns an error if the consensus engine closes the response channel
    /// before a digest is ready, if building fails and no verified block
    /// exists, if marshal rejects persisting the proposal, or if the digest
    /// cannot be delivered.
    #[instrument(
        skip_all,
        fields(
            epoch = %request.round.epoch(),
            view = %request.round.view(),
            parent.view = %request.parent.0,
            parent.digest = %request.parent.1,
        ),
        err(level = Level::WARN),
    )]
    async fn handle_propose<TContext: Pacer>(
        self,
        request: Propose,
        context: TContext,
    ) -> eyre::Result<()> {
        let Propose {
            parent: (parent_view, parent_digest),
            mut response,
            round,
            leader,
            started_at: propose_start,
        } = request;

        // The whole proposal pipeline (build/lookup, persist, pacing sleep)
        // is raced against the consensus engine abandoning the request.
        let proposal_digest = {
            let mut proposal = Box::pin(async {
                // Follow the commonware marshal::standard::inline application:
                //
                // >On leader recovery, marshal may already hold a verified block
                // >for this round (persisted by a pre-crash propose whose
                // >notarize vote never reached the journal).
                //
                // >The parent context recovered by simplex may differ from the one
                // >the cached block was built against, so the stored block is not safe to reuse
                // >and building a fresh block would land on the same prunable
                // >archive index and be silently dropped.
                //
                // >Skip this view and let the voter nullify it via timeout.
                //
                // TODO: we are diverging from commonware in that we return the digest
                // here. Is that ok or can that cause problems?
                //
                // `marshal.get_verified` can take a long time if marshal is busy
                // persisting the parent block, so we race it with payload building to
                // avoid delaying the usual proposal path. If it finds a verified block,
                // we always prefer that block and skip the newly built proposal,
                // even when payload construction finishes first.
                let already_verified = OptionFuture::some(self.marshal.get_verified(round));
                futures::pin_mut!(already_verified);

                let mut proposal = Box::pin(self.clone().propose(
                    context.clone(),
                    BuildProposalArgs {
                        propose_start,
                        parent_view,
                        parent_digest,
                        round,
                        leader,
                    },
                ));

                // `biased` polls the branches in order, so an already verified
                // block wins if both are ready. A `None` from the lookup
                // disables its branch and the build result is awaited instead.
                let proposal_result = tokio::select! {
                    biased;

                    Some(block) = &mut already_verified => {
                        debug!("skipping proposal: verified block already exists for round on restart");
                        Ok((block, None))
                    },

                    res = &mut proposal => {
                        res.wrap_err("failed creating a proposal")
                    },
                };

                // already_verified blocks are always preferred, even if
                // building a block failed.
                //
                // NOTE(review): this assumes `OptionFuture::is_some` reports
                // whether the lookup is still pending (i.e. was not consumed
                // by the select above) — confirm against
                // `crate::utils::OptionFuture`.
                let (block, proposal_return) = if already_verified.is_some()
                    && let Some(block) = already_verified.await
                {
                    debug!("skipping proposal: verified block already exists for round on restart");
                    (block, None)
                } else {
                    proposal_result?
                };

                let digest = block.digest();
                // Only freshly built blocks carry pacing info; they still need
                // to be persisted before being announced.
                if let Some(proposal_return) = proposal_return {
                    let persist_start = Instant::now();
                    if !self.marshal.proposed(round, block).await {
                        bail!("marshal actor rejected persisting proposal");
                    }
                    observe_marshal_persist(
                        proposal_return.block_size_bytes,
                        persist_start.elapsed(),
                    );

                    // Keep waiting for the remaining return time, if there's anything left after building the block.
                    context.sleep_until(proposal_return.time).await;
                }

                eyre::Ok(digest)
            });

            tokio::select! {
                () = response.closed() => {
                    return Err(eyre!(
                        "proposal return channel was closed by consensus \
                        engine before block could be proposed; aborting"
                    ))
                },

                res = &mut proposal => {
                    res?
                },
            }
        };

        info!(
            proposal.digest = %proposal_digest,
            "constructed proposal",
        );

        response.send(proposal_digest).map_err(|_| {
            eyre!(
                "failed returning proposal to consensus engine: response \
                channel was already closed"
            )
        })?;

        Ok(())
    }
434
435    /// Verifies a [`Verify`] request.
436    ///
437    /// this method only renders a decision on the `verify.response`
438    /// channel if it was able to come to a boolean decision. If it was
439    /// unable to refute or prove the validity of the block it will
440    /// return an error and drop the response channel.
441    ///
442    /// Conditions for which no decision could be made are usually:
443    /// no block could be read from the syncer or communication with the
444    /// execution layer failed.
445    #[instrument(
446        skip_all,
447        fields(
448            epoch = %verify.round.epoch(),
449            view = %verify.round.view(),
450            digest = %verify.payload,
451            parent.view = %verify.parent.0,
452            parent.digest = %verify.parent.1,
453            proposer = %verify.proposer,
454        ),
455        err,
456    )]
457    async fn handle_verify<TContext: Pacer>(
458        self,
459        verify: Verify,
460        context: TContext,
461    ) -> eyre::Result<()> {
462        let Verify {
463            parent,
464            payload,
465            proposer,
466            mut response,
467            round,
468        } = verify;
469        let VerifyResult {
470            result,
471            block,
472            parent,
473        } = select!(
474            () = response.closed() => {
475                Err(eyre!(
476                    "verification return channel was closed by consensus \
477                    engine before block could be validated; aborting"
478                ))
479            },
480
481            res = self.clone().verify(context, parent, payload, proposer, round) => {
482                res.wrap_err("block verification failed")
483            }
484        )?;
485
486        if response.send(result).is_err() {
487            warn!("received dropped channel before verification result could be returned");
488        }
489        // Keep large block drops out of the pre-response path.
490        drop((block, parent));
491
492        Ok(())
493    }
494
    /// Builds a proposal for `args.round` on top of the parent identified by
    /// `args.parent_view` / `args.parent_digest`.
    ///
    /// If the parent is the last block of `round`'s epoch, the parent itself
    /// is re-proposed. It is first registered with marshal as verified, and no
    /// [`ProposalReturn`] is produced. Otherwise:
    /// 1. The parent is validated against the execution layer, unless it is an
    ///    epoch genesis.
    /// 2. DKG data (the ceremony outcome at the epoch's last block, a dealer
    ///    log otherwise) is collected as the payload's extra data.
    /// 3. The executor is asked to canonicalize the parent and build a payload
    ///    within the remaining proposal budget.
    ///
    /// The built block is returned with the time at which it may be handed
    /// back to consensus.
    ///
    /// # Errors
    ///
    /// Returns an error if the parent cannot be fetched or is invalid, if
    /// marshal rejects a re-proposed boundary block, if the DKG outcome cannot
    /// be obtained or is for the wrong epoch, or if payload building fails.
    async fn propose<TContext: Pacer>(
        self,
        context: TContext,
        args: BuildProposalArgs,
    ) -> eyre::Result<(Block, Option<ProposalReturn>)> {
        let BuildProposalArgs {
            propose_start,
            parent_view,
            parent_digest,
            round,
            leader,
        } = args;

        let parent = subscribe(
            &self.execution_node,
            Round::new(round.epoch(), parent_view),
            parent_digest,
            &self.marshal,
        )
        .await?;

        debug!(height = %parent.height(), "retrieved parent block",);

        let parent_epoch_info = self
            .epoch_strategy
            .containing(parent.height())
            .expect("epoch strategy is for all heights");

        // If in the same epoch, re-propose the parent if the parent is the last height
        // of the epoch. parent.height+1 should be proposed as the first block of the
        // next epoch.
        if parent_epoch_info.last() == parent.height() && parent_epoch_info.epoch() == round.epoch()
        {
            // If the header has a block access list hash but the block itself doesn't,
            // it likely means that the block was fetched from the reth database and we need to
            // additionally fetch the BAL from commonware.
            let parent = if parent.block().header().block_access_list_hash().is_some()
                && parent.block_access_list().is_none()
            {
                self.marshal
                    .subscribe_by_digest(
                        Some(Round::new(round.epoch(), parent_view)),
                        parent_digest,
                    )
                    .await
                    .await
                    .map_err(|_| eyre!("syncer dropped channel before the parent block was sent"))?
            } else {
                parent
            };
            if !self.marshal.verified(round, parent.clone()).await {
                bail!("marshal rejected re-proposed boundary block");
            }
            info!("parent is last height of epoch; re-proposing parent");
            return Ok((parent, None));
        }

        // The parent is an epoch genesis if it is block 0, or if it is the
        // last block of the epoch immediately preceding `round`'s epoch.
        // (`&&` binds tighter than `||`.)
        let is_genesis_parent = parent.height().is_zero()
            || parent_epoch_info.last() == parent.height()
                && parent_epoch_info.epoch().next() == round.epoch();

        // Send the proposal parent to the execution layer to cover edge cases
        // when we were not asked to verify it (and hence are missing it in the
        // EL).
        //
        // If proposing the first block of an epoch, its parent
        // (genesis/boundary block) must exist and be finalized, so we can skip
        // it.
        if !is_genesis_parent
            && verify_block(
                context.clone(),
                parent_epoch_info.epoch(),
                &self.epoch_strategy,
                self.execution_node
                    .add_ons_handle
                    .beacon_engine_handle
                    .clone(),
                &parent,
                // It is safe to not verify the parent of the parent because this block is already notarized.
                parent.parent_digest(),
                &self.scheme_provider,
            )
            .await
            .wrap_err("failed verifying block against execution layer")?
            .is_none()
        {
            bail!("the proposal parent block is not valid");
        }

        // Query the DKG manager for ceremony data before building the payload.
        // This data is passed to the payload builder via the attributes.
        //
        // The first branch applies when the block being built is the last
        // block of the current epoch: it carries the DKG outcome for the next
        // epoch.
        let extra_data = if parent_epoch_info.last() == parent.height().next()
            && parent_epoch_info.epoch() == round.epoch()
        {
            // At epoch boundary: include public ceremony outcome
            let outcome = self
                .state
                .dkg_manager
                .get_dkg_outcome(parent_digest, parent.height())
                .await
                .wrap_err("failed getting public dkg ceremony outcome")?;
            ensure!(
                round.epoch().next() == outcome.epoch,
                "outcome is for epoch `{}`, but we are trying to include the \
                outcome for epoch `{}`",
                outcome.epoch,
                round.epoch().next(),
            );
            info!(
                %outcome.epoch,
                outcome.network_identity = %outcome.network_identity(),
                outcome.dealers = ?outcome.dealers(),
                outcome.players = ?outcome.players(),
                outcome.next_players = ?outcome.next_players(),
                "received DKG outcome; will include in payload builder attributes",
            );
            outcome.encode().into()
        } else {
            // Regular block: try to include DKG dealer log. Failures are
            // best-effort: the block is built with empty extra data.
            match self.state.dkg_manager.get_dealer_log(round.epoch()).await {
                Err(error) => {
                    warn!(
                        %error,
                        "failed getting signed dealer log for current epoch \
                        because actor dropped response channel",
                    );
                    Bytes::default()
                }
                Ok(None) => Bytes::default(),
                Ok(Some(log)) => {
                    info!(
                        "received signed dealer log; will include in payload \
                        builder attributes"
                    );
                    log.encode().into()
                }
            }
        };

        // Use current timestamp but make sure that if parent's timestamp is in the future, we account for that.
        //
        // We don't expect this being hit in practice because we validate the
        // timestamp is not in the future during EL validation.
        let mut epoch_millis = context.current().epoch_millis();
        if epoch_millis <= parent.timestamp_millis() {
            self.metrics.parent_ahead_of_local_time.inc();
            epoch_millis = parent.timestamp_millis() + 1
        };

        // Split into whole seconds and the sub-second millisecond remainder.
        let (timestamp, timestamp_millis_part) = (epoch_millis / 1000, epoch_millis % 1000);

        let consensus_context = Some(TempoConsensusContext {
            epoch: round.epoch().get(),
            view: round.view().get(),
            parent_view: parent_view.get(),
            proposer: crate::utils::public_key_to_tempo_primitive(&leader),
        });

        let parent_hash = parent.block_hash();
        let proposer_public_key = crate::utils::public_key_to_b256(&self.public_key);
        let marshal_persist = marshal_persist_estimate();
        // Give the builder only the proposal window that remains when payload
        // construction is requested. This accounts for a late `handle_propose`
        // start instead of resetting the budget at builder entry.
        let build_budget = self
            .proposal_return_budget
            .saturating_sub(propose_start.elapsed());
        // A poisoned estimator lock is treated as "no estimate available".
        let validation_latency_estimate = self
            .validation_latency_estimator
            .lock()
            .ok()
            .and_then(|estimator| estimator.estimate());
        let attrs = TempoPayloadAttributes::new(
            Some(proposer_public_key),
            timestamp,
            timestamp_millis_part,
            extra_data,
            consensus_context,
            // Subblocks are fetched lazily by the builder; any failure (or a
            // missing subblocks service) yields the default (empty) set.
            move || {
                self.subblocks
                    .as_ref()
                    .and_then(|s| s.get_subblocks(parent_hash).ok())
                    .unwrap_or_default()
            },
        )
        .with_payload_build_budget(build_budget)
        .with_validation_latency_estimate(validation_latency_estimate);

        // Subscribe to the payload build. The executor owns the build job
        // and runs it to completion; dropping the receiver (for example
        // because the proposal was cancelled) tells it that the payload is
        // no longer wanted.
        let payload_build_start = Instant::now();
        let payload = self
            .state
            .executor
            .canonicalize_and_build(parent.height(), parent.digest(), attrs)?
            .await
            .wrap_err(
                "executor dropped the payload channel: the build failed (the \
                executor logs the cause) or the executor shut down",
            )?;

        let payload_build_elapsed = payload_build_start.elapsed();
        let payload_validation_work_elapsed = payload.validation_work_duration();
        let validation_latency_elapsed = payload.validation_latency_duration();
        let (block, block_access_list) = payload.into_execution_payload();
        let execution_block_rlp_size_bytes = block.rlp_length();
        let proposal = Block::from_execution_block_with_encoded_size(
            block,
            block_access_list,
            execution_block_rlp_size_bytes,
        )
        .wrap_err("payload builder produced an invalid block access list")?;
        let consensus_block_size_bytes = proposal.encode_size();
        let validator_marshal_persist = marshal_persist.estimate(consensus_block_size_bytes);
        let proposal_elapsed = propose_start.elapsed();
        // Pace proposal return from the original propose start. Validators still
        // need to repeat replayable build work and marshal persistence, so leave
        // room for those costs before returning the proposal.
        let return_delay = self
            .proposal_return_budget
            .saturating_sub(proposal_elapsed)
            .saturating_sub(validation_latency_elapsed)
            .saturating_sub(validator_marshal_persist);
        debug!(
            proposal_elapsed = %display_duration(proposal_elapsed),
            build_time = %display_duration(payload_build_elapsed),
            payload_validation_work = %display_duration(payload_validation_work_elapsed),
            validation_latency_time = %display_duration(validation_latency_elapsed),
            validator_marshal_persist = %display_duration(validator_marshal_persist),
            return_time = %display_duration(return_delay),
            execution_block_rlp_size_bytes,
            consensus_block_size_bytes,
            "sleeping before returning proposal"
        );
        // Absolute deadline, so the caller's persistence time counts against it.
        let proposal_return_time = context.current() + return_delay;

        Ok((
            proposal,
            Some(ProposalReturn {
                time: proposal_return_time,
                block_size_bytes: consensus_block_size_bytes,
            }),
        ))
    }
741
742    async fn verify<TContext: Pacer>(
743        self,
744        context: TContext,
745        (parent_view, parent_digest): (View, Digest),
746        payload: Digest,
747        proposer: PublicKey,
748        round: Round,
749    ) -> eyre::Result<VerifyResult> {
750        let (block, parent) = try_join(
751            subscribe(&self.execution_node, round, payload, &self.marshal),
752            subscribe(
753                &self.execution_node,
754                Round::new(round.epoch(), parent_view),
755                parent_digest,
756                &self.marshal,
757            ),
758        )
759        .await
760        .wrap_err("failed getting required blocks")?;
761
762        // Can only repropose at the end of an epoch.
763        //
764        // NOTE: fetching block and parent twice (in the case block == parent)
765        // seems wasteful, but both run concurrently, should finish almost
766        // immediately, and happen very rarely. It's better to optimize for the
767        // general case.
768        if payload == parent_digest {
769            let epoch_info = self
770                .epoch_strategy
771                .containing(block.height())
772                .expect("epoch strategy is for all heights");
773            if epoch_info.last() == block.height() && epoch_info.epoch() == round.epoch() {
774                if !self.marshal.verified(round, block).await {
775                    bail!("marshal actor refused to persist verified re-proposed block");
776                }
777                return Ok(VerifyResult {
778                    result: true,
779                    block: None,
780                    parent: Some(parent),
781                });
782            } else {
783                return Ok(VerifyResult {
784                    result: false,
785                    block: Some(block),
786                    parent: Some(parent),
787                });
788            }
789        }
790
791        if let Err(reason) = verify_header(
792            &block,
793            (parent_view, parent_digest),
794            round,
795            &self.state.dkg_manager,
796            &self.epoch_strategy,
797            &proposer,
798        )
799        .await
800        {
801            warn!(%reason, "header could not be verified; failing block");
802            return Ok(VerifyResult {
803                result: false,
804                block: Some(block),
805                parent: Some(parent),
806            });
807        }
808
809        if let Err(error) = self
810            .state
811            .executor
812            .canonicalize_head(parent.height(), parent.digest())
813            .await
814        {
815            tracing::warn!(
816                %error,
817                parent.height = %parent.height(),
818                parent.digest = %parent.digest(),
819                "failed updating canonical head to parent; trying to go on",
820            );
821        }
822
823        let validation_duration = verify_block(
824            context,
825            round.epoch(),
826            &self.epoch_strategy,
827            self.execution_node
828                .add_ons_handle
829                .beacon_engine_handle
830                .clone(),
831            &block,
832            parent_digest,
833            &self.scheme_provider,
834        )
835        .await
836        .wrap_err("failed verifying block against execution layer")?;
837        if let Some(duration) = validation_duration
838            && let Ok(mut estimator) = self.validation_latency_estimator.lock()
839        {
840            estimator.observe(
841                block.height().get(),
842                ValidationLatencyWorkload::new(
843                    block.block().gas_used(),
844                    block.block().body().transaction_count(),
845                ),
846                duration,
847            );
848        }
849        let is_good = validation_duration.is_some();
850
851        let block_height = block.height();
852        let block_digest = block.digest();
853
854        if is_good {
855            // Persist the block in the marshal actor and execution layer.
856            if !self.marshal.verified(round, block).await {
857                bail!("marshal actor refused to persist verified block");
858            }
859
860            // FIXME: move this into the certification step?
861            self.state
862                .executor
863                .canonicalize_head(block_height, block_digest)
864                .await
865                .wrap_err("failed making the verified proposal the head of the canonical chain")?;
866
867            return Ok(VerifyResult {
868                result: true,
869                block: None,
870                parent: Some(parent),
871            });
872        }
873
874        Ok(VerifyResult {
875            result: false,
876            block: Some(block),
877            parent: Some(parent),
878        })
879    }
880}
881
882impl Inner<Uninit> {
883    /// Returns a fully initialized actor using runtime information.
884    ///
885    /// This includes:
886    ///
887    /// 1. reading the last finalized digest from the consensus marshaller.
888    /// 2. starting the canonical chain engine and storing its handle.
889    #[instrument(skip_all, err)]
890    async fn into_initialized(
891        self,
892        dkg_manager: crate::dkg::manager::Mailbox,
893    ) -> eyre::Result<Inner<Init>> {
894        let initialized = Inner {
895            public_key: self.public_key,
896            epoch_strategy: self.epoch_strategy,
897            proposal_return_budget: self.proposal_return_budget,
898            my_mailbox: self.my_mailbox,
899            marshal: self.marshal,
900            execution_node: self.execution_node,
901            executor: self.executor.clone(),
902            state: Init {
903                dkg_manager,
904                executor: self.executor.clone(),
905            },
906            subblocks: self.subblocks,
907            scheme_provider: self.scheme_provider,
908            validation_latency_estimator: self.validation_latency_estimator,
909            metrics: self.metrics,
910        };
911
912        Ok(initialized)
913    }
914}
915
916/// Marker type to signal that the actor is not fully initialized.
917#[derive(Clone, Debug)]
918pub(in crate::consensus) struct Uninit(());
919
920/// Carries the runtime initialized state of the application.
921#[derive(Clone, Debug)]
922struct Init {
923    dkg_manager: crate::dkg::manager::Mailbox,
924    /// The communication channel to the executor agent.
925    executor: crate::executor::Mailbox,
926}
927
928struct VerifyResult {
929    /// Whether consensus should accept the verified proposal.
930    ///
931    /// This is the value sent through `Verify::response`: `true` accepts the
932    /// proposal, `false` rejects it.
933    result: bool,
934    /// The proposed block when it was not moved into the verified marshal state.
935    block: Option<Block>,
936    /// The parent block fetched to verify the proposal.
937    parent: Option<Block>,
938}
939
940/// Verifies `block` given its `parent` against the execution layer.
941///
942/// Returns EL validation duration when validation reached the execution layer
943/// and succeeded, or `None` if the block is invalid. Returns an error if
944/// validation was not possible, for example if communication with the execution
945/// layer failed.
946///
947/// Reason the reason for why a block was not valid is communicated as a
948/// tracing event.
949#[instrument(
950    skip_all,
951    fields(
952        %epoch,
953        epoch_length,
954        block.parent_digest = %block.parent_digest(),
955        block.digest = %block.digest(),
956        block.height = %block.height(),
957        block.timestamp = block.timestamp(),
958        parent.digest = %parent_digest,
959    )
960)]
961async fn verify_block<TContext: Pacer>(
962    context: TContext,
963    epoch: Epoch,
964    epoch_strategy: &FixedEpocher,
965    engine: ConsensusEngineHandle<TempoPayloadTypes>,
966    block: &Block,
967    parent_digest: Digest,
968    scheme_provider: &SchemeProvider,
969) -> eyre::Result<Option<Duration>> {
970    use alloy_rpc_types_engine::PayloadStatusEnum;
971
972    let epoch_info = epoch_strategy
973        .containing(block.height())
974        .expect("epoch strategy is for all heights");
975    if epoch_info.epoch() != epoch {
976        info!("block does not belong to this epoch");
977        return Ok(None);
978    }
979    if block.parent_hash() != *parent_digest {
980        info!(
981            "parent digest stored in block must match the digest of the parent \
982            argument but doesn't"
983        );
984        return Ok(None);
985    }
986
987    // Scheme registration precedes engine creation, so the scheme must exist
988    let scheme = scheme_provider
989        .scoped(epoch)
990        .ok_or_eyre("cannot determine participants in the current epoch")?;
991
992    let validator_set = Some(
993        scheme
994            .participants()
995            .into_iter()
996            .map(|p| B256::from_slice(p))
997            .collect(),
998    );
999    let (block, block_access_list) = block.clone().into_parts();
1000    let execution_data = TempoExecutionData {
1001        block: Arc::new(block),
1002        block_access_list,
1003        validator_set,
1004    };
1005    let validation_start = Instant::now();
1006    let payload_status = engine
1007        .new_payload(execution_data)
1008        .pace(&context, Duration::from_millis(50))
1009        .await
1010        .wrap_err("failed sending `new payload` message to execution layer to validate block")?;
1011    match payload_status.status {
1012        PayloadStatusEnum::Valid => Ok(Some(validation_start.elapsed())),
1013        PayloadStatusEnum::Invalid { validation_error } => {
1014            info!(
1015                validation_error,
1016                "execution layer returned that the block was invalid"
1017            );
1018            Ok(None)
1019        }
1020        PayloadStatusEnum::Accepted => {
1021            bail!(
1022                "failed validating block because payload was accepted, meaning \
1023                that this was not actually executed by the execution layer for some reason"
1024            )
1025        }
1026        PayloadStatusEnum::Syncing => {
1027            bail!(
1028                "failed validating block because payload is still syncing, \
1029                this means the parent block was available to the consensus \
1030                layer but not the execution layer"
1031            )
1032        }
1033    }
1034}
1035
1036#[instrument(skip_all, err(Display))]
1037async fn verify_header(
1038    block: &Block,
1039    parent: (View, Digest),
1040    round: Round,
1041    dkg_manager: &crate::dkg::manager::Mailbox,
1042    epoch_strategy: &FixedEpocher,
1043    proposer: &PublicKey,
1044) -> eyre::Result<()> {
1045    let epoch_info = epoch_strategy
1046        .containing(block.height())
1047        .expect("epoch strategy is for all heights");
1048
1049    let ctx = block
1050        .header()
1051        .consensus_context
1052        .ok_or_eyre("missing consensus context")?;
1053
1054    let expected_ctx = TempoConsensusContext {
1055        epoch: round.epoch().get(),
1056        view: round.view().get(),
1057        parent_view: parent.0.get(),
1058        proposer: crate::utils::public_key_to_tempo_primitive(proposer),
1059    };
1060
1061    ensure!(
1062        ctx == expected_ctx,
1063        "mismatch in consensus context for block `{}`. expected `{expected_ctx:?}`. got `{ctx:?}`",
1064        block.digest()
1065    );
1066
1067    if epoch_info.last() == block.height() {
1068        info!(
1069            "on last block of epoch; verifying that the boundary block \
1070            contains the correct DKG outcome",
1071        );
1072        let our_outcome = dkg_manager
1073            .get_dkg_outcome(parent.1, block.height().saturating_sub(HeightDelta::new(1)))
1074            .await
1075            .wrap_err(
1076                "failed getting public dkg ceremony outcome; cannot verify end \
1077                of epoch block",
1078            )?;
1079        let block_outcome = OnchainDkgOutcome::read(&mut block.header().extra_data().as_ref())
1080            .wrap_err(
1081                "failed decoding extra data header as DKG ceremony \
1082                outcome; cannot verify end of epoch block",
1083            )?;
1084        if our_outcome != block_outcome {
1085            // Emit the log here so that it's structured. The error would be annoying to read.
1086            warn!(
1087                our.epoch = %our_outcome.epoch,
1088                our.players = ?our_outcome.players(),
1089                our.next_players = ?our_outcome.next_players(),
1090                our.sharing = ?our_outcome.sharing(),
1091                our.is_next_full_dkg = ?our_outcome.is_next_full_dkg,
1092                block.epoch = %block_outcome.epoch,
1093                block.players = ?block_outcome.players(),
1094                block.next_players = ?block_outcome.next_players(),
1095                block.sharing = ?block_outcome.sharing(),
1096                block.is_next_full_dkg = ?block_outcome.is_next_full_dkg,
1097                "our public dkg outcome does not match what's stored \
1098                in the block",
1099            );
1100            return Err(eyre!(
1101                "our public dkg outcome does not match what's \
1102                stored in the block header extra_data field; they must \
1103                match so that the end-of-block is valid",
1104            ));
1105        }
1106    } else if !block.header().extra_data().is_empty() {
1107        let bytes = block.header().extra_data().to_vec();
1108        let dealer = dkg_manager
1109            .verify_dealer_log(round.epoch(), bytes)
1110            .await
1111            .wrap_err("failed request to verify DKG dealing")?;
1112        ensure!(
1113            &dealer == proposer,
1114            "proposer `{proposer}` is not the dealer `{dealer}` of the dealing \
1115            in the block",
1116        );
1117    }
1118
1119    Ok(())
1120}
1121
1122/// Read a block from the execution layer or fetches it from consensus p2p.
1123#[instrument(skip_all, fields(%round, %digest), err, ret(Display))]
1124async fn subscribe(
1125    execution_node: &TempoFullNode,
1126    round: Round,
1127    digest: Digest,
1128    marshal: &crate::alias::marshal::Mailbox,
1129) -> eyre::Result<Block> {
1130    let block = if let Some(block) = execution_node
1131        .provider
1132        .find_block_by_hash(digest.0, BlockSource::Any)
1133        .wrap_err_with(|| format!("failed querying execution layer for parent block `{digest}`"))?
1134    {
1135        // EL database reads do not include commonware sidecars.
1136        Block::from_execution_block_unchecked(block.seal(), None)
1137    } else {
1138        marshal
1139            .subscribe_by_digest(Some(round), digest)
1140            .await
1141            .await
1142            .map_err(|_| eyre!("syncer dropped channel before the parent block was sent"))?
1143    };
1144    Ok(block)
1145}
1146
1147#[derive(Clone)]
1148struct Metrics {
1149    parent_ahead_of_local_time: Counter,
1150}
1151
1152impl Metrics {
1153    fn init<TContext>(context: &TContext) -> Self
1154    where
1155        TContext: commonware_runtime::Metrics,
1156    {
1157        let parent_ahead_of_local_time = Counter::default();
1158        context.register(
1159            "parent_ahead_of_local_time",
1160            "number of times the parent block timestamp was ahead of local time",
1161            parent_ahead_of_local_time.clone(),
1162        );
1163
1164        Self {
1165            parent_ahead_of_local_time,
1166        }
1167    }
1168}