Skip to main content

tempo_consensus/executor/
actor.rs

1//! Drives the actual execution forwarding blocks and setting forkchoice state.
2//!
3//! This agent forwards finalized blocks from the consensus layer to the
4//! execution layer and tracks the digest of the latest finalized block.
5//! It also advances the canonical chain by sending forkchoice-updates.
6
7use std::{collections::VecDeque, ops::RangeInclusive, sync::Arc, time::Duration};
8
9use alloy_rpc_types_engine::{ForkchoiceState, PayloadId};
10use commonware_consensus::{Heightable as _, marshal::Update, types::Height};
11use commonware_cryptography::ed25519::PublicKey;
12use commonware_runtime::{
13    Clock, ContextCell, FutureExt, Handle, Metrics as RuntimeMetrics, Pacer, Spawner, spawn_cell,
14};
15use commonware_utils::{Acknowledgement, acknowledgement::Exact};
16use eyre::{Report, WrapErr as _, ensure};
17use futures::{
18    FutureExt as _, StreamExt as _,
19    channel::{
20        mpsc::{self, UnboundedReceiver},
21        oneshot,
22    },
23    future::BoxFuture,
24    stream::FuturesUnordered,
25};
26use prometheus_client::metrics::counter::Counter;
27use reth_ethereum::{chainspec::EthChainSpec, rpc::eth::primitives::BlockNumHash};
28use reth_node_builder::PayloadKind;
29use tempo_node::{TempoExecutionData, TempoFullNode};
30use tempo_payload_types::{TempoBuiltPayload, TempoPayloadAttributes};
31use tokio::select;
32use tracing::{
33    Level, Span, debug, error, error_span, info, info_span, instrument, warn, warn_span,
34};
35
36use super::{
37    Config,
38    ingress::{CanonicalizeAndBuild, CanonicalizeHead, Command, Message},
39};
40use crate::{
41    consensus::{Digest, block::Block},
42    utils::OptionFuture,
43};
44
/// Tracks the latest forkchoice state accepted by the execution layer.
///
/// Also tracks the heights of the blocks referenced by
/// `forkchoice.head_block_hash` and `forkchoice.finalized_block_hash`,
/// respectively.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct LastCanonicalized {
    /// The head, safe, and finalized block hashes that make up the forkchoice
    /// state.
    forkchoice: ForkchoiceState,
    /// Height of the block referenced by `forkchoice.head_block_hash`.
    head_height: Height,
    /// Height of the block referenced by `forkchoice.finalized_block_hash`.
    finalized_height: Height,
}
56
57impl LastCanonicalized {
58    /// Updates the finalized height and finalized block hash to `height` and `digest`.
59    ///
60    /// `height` must be ahead of the latest canonicalized finalized height. If
61    /// it is not, then this is a no-op.
62    ///
63    /// Similarly, if `height` is ahead or the same as the latest canonicalized
64    /// head height, it also updates the head height.
65    ///
66    /// This is to ensure that the finalized block hash is never ahead of the
67    /// head hash.
68    fn update_finalized(self, height: Height, digest: Digest) -> Self {
69        let mut this = self;
70        if height > this.finalized_height {
71            this.finalized_height = height;
72            this.forkchoice.safe_block_hash = digest.0;
73            this.forkchoice.finalized_block_hash = digest.0;
74        }
75        if height >= this.head_height {
76            this.head_height = height;
77            this.forkchoice.head_block_hash = digest.0;
78        }
79        this
80    }
81
82    /// Updates the head height and head block hash to `height` and `digest`.
83    ///
84    /// If `height > self.finalized_height` or `digest` is the same as the finalized block hash,
85    /// this method will return a new canonical state with `self.head_height = height` and
86    /// `self.forkchoice.head = hash`.
87    ///
88    /// If `height <= self.finalized_height`, then this method will return
89    /// `self` unchanged.
90    fn update_head(self, height: Height, digest: Digest) -> Self {
91        let mut this = self;
92        if height > this.finalized_height || digest.0 == this.forkchoice.finalized_block_hash {
93            this.head_height = height;
94            this.forkchoice.head_block_hash = digest.0;
95        }
96        this
97    }
98}
99
/// The executor actor: owns the queue of execution-layer requests and all
/// state needed to drive them one at a time.
pub(crate) struct Actor<TContext> {
    /// The runtime context used to create timers and to spawn the actor.
    context: ContextCell<TContext>,

    /// A handle to the execution node layer. Used to forward finalized blocks
    /// and to update the canonical chain by sending forkchoice updates.
    execution_node: Arc<TempoFullNode>,

    /// The last finalized height as supplied through the actor's
    /// configuration. Upper bound of the startup backfill range.
    last_consensus_finalized_height: Height,
    /// The last finalized height reported by the execution layer when the
    /// actor was initialized (genesis if it reported none).
    last_execution_finalized_height: Height,

    /// The channel over which the agent will receive new commands from the
    /// application actor.
    mailbox: mpsc::UnboundedReceiver<Message>,

    /// The mailbox of the marshal actor. Used to backfill blocks.
    marshal: crate::alias::marshal::Mailbox,

    /// The forkchoice state of the last successfully completed execution
    /// task. Handed by value to every new execution task.
    last_canonicalized: LastCanonicalized,

    /// The interval at which to send a forkchoice update heartbeat to the
    /// execution layer.
    fcu_heartbeat_interval: Duration,

    /// The timer for the next FCU heartbeat.
    ///
    /// Armed only when no backfill is in progress and no execution request is
    /// active or queued.
    fcu_heartbeat_timer: OptionFuture<BoxFuture<'static, ()>>,

    /// Gap between the last finalized block on the consensus and execution
    /// layers. Needs to be handled on startup because the execution layer does
    /// not reliably flush all blocks.
    finalized_heights_to_backfill: RangeInclusive<u64>,

    /// The backfill fetch that is currently in-flight and awaiting
    /// resolution. Resolves to the requested height and the block, if the
    /// marshal actor had it.
    pending_backfill: OptionFuture<BoxFuture<'static, (u64, Option<Block>)>>,

    /// Execution-layer requests waiting for the active execution task to finish.
    execution_queue: VecDeque<ExecutionRequest>,
    /// The single execution-layer request currently being driven in the background.
    execution_task: OptionFuture<BoxFuture<'static, ExecutionTaskResult>>,

    /// Payload build jobs currently being driven to completion.
    ///
    /// Each job resolves a payload from the execution layer's payload builder
    /// and delivers it to the subscriber that requested the build. If the
    /// subscriber dropped its receiver in the meantime, the built payload is
    /// discarded.
    payload_jobs: FuturesUnordered<BoxFuture<'static, ()>>,

    /// The most recent finalized tip (height and digest) reported through a
    /// finalization update. Canonicalized once the actor is otherwise idle.
    latest_observed_finalized_tip: Option<(Height, Digest)>,

    /// The node's ed25519 public key if the node is participating in
    /// consensus. Not set if not, for example for followers.
    public_key: Option<PublicKey>,

    /// Metrics reported by this actor.
    metrics: Metrics,
}
157
/// Metrics reported by the executor actor.
///
/// Cloned into every execution task so that tasks can update the counters.
#[derive(Clone)]
struct Metrics {
    /// Number of finalized blocks whose proposer matches this node's public key.
    finalized_blocks_proposed_by_self: Counter,
}
163
164impl Metrics {
165    fn init<TContext>(context: &TContext) -> Self
166    where
167        TContext: RuntimeMetrics,
168    {
169        let finalized_blocks_proposed_by_self = Counter::default();
170        context.register(
171            "finalized_blocks_proposed_by_self",
172            "number of finalized blocks whose proposer matches this node's public key",
173            finalized_blocks_proposed_by_self.clone(),
174        );
175        Self {
176            finalized_blocks_proposed_by_self,
177        }
178    }
179}
180
181impl<TContext> Actor<TContext>
182where
183    TContext: Clock + RuntimeMetrics + Pacer + Spawner,
184{
185    pub(super) fn init(
186        context: TContext,
187        config: super::Config,
188        mailbox: UnboundedReceiver<super::ingress::Message>,
189    ) -> eyre::Result<Self> {
190        let Config {
191            execution_node,
192            last_finalized_height,
193            marshal,
194            fcu_heartbeat_interval,
195            public_key,
196        } = config;
197        let metrics = Metrics::init(&context);
198        let canonical_state = execution_node.provider.canonical_in_memory_state();
199        let finalized_num_hash = canonical_state
200            .get_finalized_num_hash()
201            .unwrap_or_else(|| BlockNumHash::new(0, execution_node.chain_spec().genesis_hash()));
202        let head_num_hash: BlockNumHash = canonical_state.chain_info().into();
203
204        let fcu_heartbeat_timer = OptionFuture::some(context.sleep(fcu_heartbeat_interval).boxed());
205        let last_execution_finalized_height = Height::new(finalized_num_hash.number);
206        let finalized_heights_to_backfill =
207            (last_execution_finalized_height.get() + 1)..=last_finalized_height.get();
208        Ok(Self {
209            context: ContextCell::new(context),
210            execution_node,
211            last_consensus_finalized_height: last_finalized_height,
212            last_execution_finalized_height,
213            mailbox,
214            marshal,
215            last_canonicalized: LastCanonicalized {
216                forkchoice: ForkchoiceState {
217                    head_block_hash: head_num_hash.hash,
218                    safe_block_hash: finalized_num_hash.hash,
219                    finalized_block_hash: finalized_num_hash.hash,
220                },
221                head_height: Height::new(head_num_hash.number),
222                finalized_height: Height::new(finalized_num_hash.number),
223            },
224            fcu_heartbeat_interval,
225            fcu_heartbeat_timer,
226
227            finalized_heights_to_backfill,
228            pending_backfill: OptionFuture::none(),
229            execution_queue: VecDeque::new(),
230            execution_task: OptionFuture::none(),
231            payload_jobs: FuturesUnordered::new(),
232
233            latest_observed_finalized_tip: None,
234
235            public_key,
236            metrics,
237        })
238    }
239
    /// Consumes the actor and starts its event loop (`run`) through
    /// `spawn_cell!` on the actor's context, returning the resulting handle.
    pub(crate) fn start(mut self) -> Handle<()> {
        spawn_cell!(self.context, self.run())
    }
243
    /// The actor's event loop.
    ///
    /// Every iteration first schedules work (the next backfill fetch, the
    /// next execution task, the heartbeat timer) and then waits for exactly
    /// one event. The loop ends, and with it the actor, when an execution
    /// task reports a fatal error, when handling a message fails, or when the
    /// mailbox is closed.
    async fn run(mut self) {
        info_span!("start").in_scope(|| {
            info!(
                last_finalized_consensus_height = %self.last_consensus_finalized_height,
                last_finalized_execution_height = %self.last_execution_finalized_height,
                "consensus and execution layers reported last finalized heights; \
                backfilling blocks from consensus to execution if necessary",
            );
        });

        loop {
            // At most one backfill fetch is in flight: the next height is only
            // taken off the range once the previous fetch has resolved.
            if self.pending_backfill.is_none()
                && let Some(height) = self.finalized_heights_to_backfill.next()
            {
                self.pending_backfill.replace({
                    let marshal = self.marshal.clone();
                    async move { (height, marshal.get_block(Height::new(height)).await) }.boxed()
                });
            }

            self.start_next_execution_task();
            self.update_fcu_heartbeat_timer();

            select! {
                // Branches are polled in the order they are written.
                biased;

                task_result = &mut self.execution_task => {
                    match task_result {
                        ExecutionTaskResult::Completed { canonicalized, payload_job } => {
                            if let Some(canonicalized) = canonicalized {
                                // There is only one execution task running at
                                // a time, and `last_canonicalized` is only
                                // mutated here to keep a consistent view.
                                self.last_canonicalized = canonicalized;
                            }
                            // Payload builds are driven separately so that
                            // they do not hold up the next execution task.
                            if let Some(job) = payload_job {
                                self.payload_jobs.push(
                                    run_payload_job(
                                        self.context.clone(),
                                        self.execution_node.clone(),
                                        job,
                                    )
                                    .boxed(),
                                );
                            }
                        }
                        ExecutionTaskResult::Fatal { error } => {
                            error_span!("shutdown").in_scope(|| error!(
                                %error,
                                "executor encountered fatal execution-layer update error; \
                                shutting down to prevent consensus-execution divergence"
                            ));
                            break;
                        }
                    }
                }

                block = &mut self.pending_backfill => {
                    match block {
                        (height, Some(block)) => {
                            // Nothing waits on the acknowledgement of a
                            // backfilled block; the waiter half is unused.
                            let (ack, _wait) = Exact::handle();
                            let span = info_span!("backfill_on_start", %height);
                            self.enqueue_execution_request(ExecutionRequest::FinalizeBlock(
                                Box::new(FinalizedBlockRequest {
                                    cause: span,
                                    block,
                                    acknowledgment: ack,
                                    is_backfill: true,
                                }),
                            ));
                        }
                        // A missing block is only logged; backfilling moves
                        // on to the next height on the next iteration.
                        (height, None) => {
                            warn_span!("backfill_on_start", %height)
                            .in_scope(|| warn!(
                                "marshal actor did not have block even though \
                                it must have finalized it previously",
                            ));
                        }
                    }
                }

                // Payload jobs deliver their result themselves; completion
                // only needs to be observed to remove them from the set.
                Some(()) = self.payload_jobs.next() => {}

                msg = self.mailbox.next() => {
                    // A closed mailbox means no more commands can arrive.
                    let Some(msg) = msg else { break; };
                    if let Err(error) = self.handle_message(msg) {
                        error_span!("shutdown").in_scope(|| error!(
                            %error,
                            "executor failed handling message; \
                            shutting down to prevent consensus-execution divergence"
                        ));
                        break;
                    }
                },

                _ = (&mut self.fcu_heartbeat_timer).fuse() => {
                    self.send_forkchoice_update_heartbeat();
                },
            }
        }
    }
345
346    fn arm_fcu_heartbeat_timer(&mut self) {
347        if !self.fcu_heartbeat_timer.is_none() {
348            return;
349        }
350        self.fcu_heartbeat_timer
351            .replace(self.context.sleep(self.fcu_heartbeat_interval).boxed());
352    }
353
    /// Clears the heartbeat timer, discarding any timer that was set.
    fn disarm_fcu_heartbeat_timer(&mut self) {
        self.fcu_heartbeat_timer = OptionFuture::none();
    }
357
358    fn update_fcu_heartbeat_timer(&mut self) {
359        if !self.is_backfilling()
360            && self.execution_task.is_none()
361            && self.execution_queue.is_empty()
362        {
363            self.arm_fcu_heartbeat_timer();
364        } else {
365            self.disarm_fcu_heartbeat_timer();
366        }
367    }
368
    /// Enqueues a heartbeat forkchoice update.
    ///
    /// The span created by `#[instrument]` for this call is attached to the
    /// request as its cause. The request is dropped again by
    /// `enqueue_execution_request` if the actor is busy.
    #[instrument(skip_all)]
    fn send_forkchoice_update_heartbeat(&mut self) {
        self.enqueue_execution_request(ExecutionRequest::Heartbeat {
            cause: Span::current(),
        });
    }
375
376    fn handle_message(&mut self, message: Message) -> eyre::Result<()> {
377        let cause = message.cause;
378        let is_backfilling = self.is_backfilling();
379        match message.command {
380            Command::CanonicalizeHead(CanonicalizeHead {
381                height,
382                digest,
383                response,
384            }) => {
385                if is_backfilling {
386                    info_span!("handle_message")
387                        .in_scope(|| info!("request to canonicalize deferred while backfilling"));
388                }
389                self.enqueue_execution_request(ExecutionRequest::Canonicalize(Box::new(
390                    Canonicalize {
391                        cause,
392                        head_or_finalized: HeadOrFinalized::Head,
393                        height,
394                        digest,
395                        response: Some(response),
396                        build_attributes: None,
397                    },
398                )));
399            }
400            Command::CanonicalizeAndBuild(CanonicalizeAndBuild {
401                height,
402                digest,
403                attributes,
404                response,
405            }) => {
406                if is_backfilling {
407                    info_span!("handle_message").in_scope(|| {
408                        info!("request to canonicalize and build deferred while backfilling")
409                    });
410                }
411                self.enqueue_execution_request(ExecutionRequest::Canonicalize(Box::new(
412                    Canonicalize {
413                        cause,
414                        head_or_finalized: HeadOrFinalized::Head,
415                        height,
416                        digest,
417                        response: None,
418                        build_attributes: Some((*attributes, response)),
419                    },
420                )));
421            }
422            Command::Finalize(finalized) => match *finalized {
423                Update::Tip(_, height, digest) => {
424                    self.latest_observed_finalized_tip.replace((height, digest));
425                }
426                Update::Block(block, acknowledgement) => {
427                    self.enqueue_execution_request(ExecutionRequest::FinalizeBlock(Box::new(
428                        FinalizedBlockRequest {
429                            cause,
430                            block,
431                            acknowledgment: acknowledgement,
432                            is_backfill: false,
433                        },
434                    )));
435                }
436            },
437        }
438        Ok(())
439    }
440
441    fn enqueue_execution_request(&mut self, request: ExecutionRequest) {
442        if matches!(&request, ExecutionRequest::Heartbeat { .. })
443            && (!self.execution_queue.is_empty()
444                || !self.execution_task.is_none()
445                || self.is_backfilling())
446        {
447            return;
448        }
449
450        if request.is_backfill() {
451            let insert_at = self
452                .execution_queue
453                .iter()
454                .position(|request| !request.is_backfill())
455                .unwrap_or(self.execution_queue.len());
456            self.execution_queue.insert(insert_at, request);
457        } else {
458            self.execution_queue.push_back(request);
459        }
460    }
461
462    fn start_next_execution_task(&mut self) {
463        if !self.execution_task.is_none() {
464            return;
465        }
466
467        // If nothing is currently scheduled and a newer finalized tip was
468        // observed, push it into the queue so that it will be picked up next.
469        if self.execution_queue.is_empty()
470            && !self.is_backfilling()
471            && let Some((height, digest)) = self.latest_observed_finalized_tip
472            && let new_canonicalized = self.last_canonicalized.update_finalized(height, digest)
473            && new_canonicalized != self.last_canonicalized
474        {
475            self.execution_queue
476                .push_back(ExecutionRequest::Canonicalize(Box::new(Canonicalize {
477                    cause: Span::current(),
478                    head_or_finalized: HeadOrFinalized::Finalized,
479                    height,
480                    digest,
481                    response: None,
482                    build_attributes: None,
483                })));
484        }
485
486        let Some(request) = self.execution_queue.front() else {
487            return;
488        };
489        if self.is_backfilling() && !request.is_backfill() {
490            return;
491        }
492        let request = self.execution_queue.pop_front().expect("front exists");
493
494        let task = execute_request(
495            self.context.clone(),
496            self.execution_node.clone(),
497            self.public_key.clone(),
498            self.metrics.clone(),
499            self.last_canonicalized,
500            request,
501        );
502        self.execution_task.replace(task.boxed());
503    }
504
    /// Returns `true` while a backfill fetch is pending or heights remain to
    /// be backfilled.
    fn is_backfilling(&self) -> bool {
        self.pending_backfill.is_some() || !self.finalized_heights_to_backfill.is_empty()
    }
508}
509
/// A unit of work for the execution layer, processed one at a time.
enum ExecutionRequest {
    /// Re-submit the current forkchoice state without changing it.
    Heartbeat { cause: Span },
    /// Move the head or the finalized block, optionally starting a payload
    /// build.
    Canonicalize(Box<Canonicalize>),
    /// Forward a finalized block to the execution layer.
    FinalizeBlock(Box<FinalizedBlockRequest>),
}
515
516impl ExecutionRequest {
517    fn is_backfill(&self) -> bool {
518        let Self::FinalizeBlock(req) = self else {
519            return false;
520        };
521        req.is_backfill
522    }
523}
524
/// A finalized block that is to be forwarded to the execution layer.
struct FinalizedBlockRequest {
    /// The span that caused this request; used as the parent span when the
    /// block is forwarded.
    cause: Span,
    /// The finalized block.
    block: Block,
    /// Acknowledged once the block was forwarded successfully.
    acknowledgment: Exact,
    /// Whether the block is forwarded as part of the backfill on startup.
    /// Failures are fatal for non-backfill blocks only.
    is_backfill: bool,
}
531
/// The reason a forkchoice update is submitted. Recorded on the tracing span
/// and used to pick the level at which the response status is logged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ForkchoiceUpdateKind {
    /// A periodic re-submission of the current forkchoice state.
    Heartbeat,
    /// An update of the head or of the finalized block.
    Canonicalize { head_or_finalized: HeadOrFinalized },
}
537
/// The outcome of a single execution task.
enum ExecutionTaskResult {
    /// The task ran to completion (non-fatal failures included).
    Completed {
        /// The new canonicalized state to adopt, if the task produced one.
        canonicalized: Option<LastCanonicalized>,
        /// A payload build that the forkchoice update kicked off on the
        /// execution layer and that still needs to be driven to completion.
        payload_job: Option<StartPayloadJob>,
    },
    /// The task failed in a way that makes the actor shut down.
    Fatal {
        error: Report,
    },
}
549
/// A request to move the head or the finalized block to a given block,
/// optionally starting a payload build on top of it.
struct Canonicalize {
    /// The span that caused this request; used as the parent span of the task.
    cause: Span,
    /// Whether the head or the finalized block is to be updated.
    head_or_finalized: HeadOrFinalized,
    /// Height of the target block.
    height: Height,
    /// Digest of the target block.
    digest: Digest,
    /// Acknowledges to the requester that the execution layer accepted the
    /// forkchoice update.
    response: Option<oneshot::Sender<()>>,
    /// Payload attributes to register a build job with the forkchoice
    /// update, paired with the subscriber awaiting the built payload.
    build_attributes: Option<(TempoPayloadAttributes, oneshot::Sender<TempoBuiltPayload>)>,
}
562
/// A payload build registered on the execution layer whose result still needs
/// to be delivered to the subscriber that requested it.
struct StartPayloadJob {
    /// The span that caused the build; used as the parent span of the job.
    cause: Span,
    /// The ID the execution layer returned for the build.
    payload_id: PayloadId,
    /// The channel on which the built payload is delivered.
    response: oneshot::Sender<TempoBuiltPayload>,
}
570
571async fn execute_request<TContext>(
572    context: ContextCell<TContext>,
573    execution_node: Arc<TempoFullNode>,
574    public_key: Option<PublicKey>,
575    metrics: Metrics,
576    canonicalized: LastCanonicalized,
577    request: ExecutionRequest,
578) -> ExecutionTaskResult
579where
580    TContext: Pacer,
581{
582    match request {
583        ExecutionRequest::Heartbeat { cause } => {
584            if let Err(error) = submit_forkchoice_update(
585                &execution_node,
586                &context,
587                cause,
588                canonicalized,
589                None,
590                ForkchoiceUpdateKind::Heartbeat,
591            )
592            .await
593            {
594                warn!(%error, "queued forkchoice update failed");
595            }
596            ExecutionTaskResult::Completed {
597                canonicalized: None,
598                payload_job: None,
599            }
600        }
601        ExecutionRequest::Canonicalize(request) => {
602            let (canonicalized, payload_job) =
603                run_canonicalize_task(&context, execution_node, canonicalized, *request).await;
604            ExecutionTaskResult::Completed {
605                canonicalized,
606                payload_job,
607            }
608        }
609        ExecutionRequest::FinalizeBlock(request) => {
610            let fatal_on_error = !request.is_backfill;
611            match forward_finalized(
612                &context,
613                execution_node,
614                public_key,
615                metrics,
616                canonicalized,
617                *request,
618            )
619            .await
620            {
621                Ok(canonicalized) => ExecutionTaskResult::Completed {
622                    canonicalized,
623                    payload_job: None,
624                },
625                Err(error) if fatal_on_error => ExecutionTaskResult::Fatal { error },
626                Err(error) => {
627                    warn!(%error, "failed forwarding backfilled finalized block to execution layer");
628                    ExecutionTaskResult::Completed {
629                        canonicalized: None,
630                        payload_job: None,
631                    }
632                }
633            }
634        }
635    }
636}
637
/// Submits a forkchoice update that moves the head or the finalized block to
/// the block at `height` with `digest`, optionally registering a payload
/// build with it.
///
/// Returns the new canonicalized state and, if a build was registered and
/// the execution layer returned a payload ID for it, the payload job that
/// still needs to be driven to completion. Both are `None` if the forkchoice
/// update failed.
///
/// Failures are never returned as errors: they are logged, and the response
/// channels are dropped so that the requesters observe the failure.
#[instrument(
    skip_all,
    parent = &cause,
    fields(
        %height,
        %digest,
        head_or_finalized = %head_or_finalized,
    ),
)]
async fn run_canonicalize_task<TContext: Pacer>(
    context: &TContext,
    execution_node: Arc<TempoFullNode>,
    canonicalized: LastCanonicalized,
    Canonicalize {
        cause,
        head_or_finalized,
        height,
        digest,
        response,
        mut build_attributes,
    }: Canonicalize,
) -> (Option<LastCanonicalized>, Option<StartPayloadJob>) {
    let new_canonicalized = match head_or_finalized {
        HeadOrFinalized::Head => canonicalized.update_head(height, digest),
        HeadOrFinalized::Finalized => canonicalized.update_finalized(height, digest),
    };

    // The request may have waited in the queue; do not start a build that
    // nobody is waiting for anymore.
    if build_attributes
        .as_ref()
        .is_some_and(|(_, response)| response.is_canceled())
    {
        info!("dropping payload build request: the subscriber went away while it was queued");
        build_attributes.take();
    }

    // Only build on top of the most recent head. If the requested parent
    // could not be made the head (because a block above it was already
    // finalized), the build is stale, and submitting its attributes anyway
    // would register a build on top of the wrong block. Taking the
    // attributes drops the response channel, which signals the failure to
    // the subscriber.
    if build_attributes.is_some() && new_canonicalized.forkchoice.head_block_hash != digest.0 {
        info!("dropping payload build request: its parent cannot be made the head");
        build_attributes.take();
    }

    let (attributes, payload_response) = build_attributes.unzip();

    // The forkchoice update is submitted even if it would not change the
    // forkchoice state: the execution layer treats it as a no-op (the FCU
    // heartbeat relies on this).
    match submit_forkchoice_update(
        &execution_node,
        context,
        cause.clone(),
        new_canonicalized,
        attributes,
        ForkchoiceUpdateKind::Canonicalize { head_or_finalized },
    )
    .await
    {
        Ok(payload_id) => {
            // The requester may no longer be listening; that is not an error.
            if let Some(response) = response {
                let _ = response.send(());
            }
            let payload_job = match (payload_response, payload_id) {
                (Some(response), Some(payload_id)) => Some(StartPayloadJob {
                    cause,
                    payload_id,
                    response,
                }),
                (Some(_dropped_to_signal_failure), None) => {
                    warn!("execution layer did not return a payload id for the build request");
                    None
                }
                (None, _) => None,
            };
            (Some(new_canonicalized), payload_job)
        }
        Err(error) => {
            // Dropping the response channels signals the failure to the
            // subscribers; the cause is only logged here.
            warn!(%error, "forkchoice update failed");
            (None, None)
        }
    }
}
725
/// Drives a payload build on the execution layer to completion.
///
/// Resolves the payload registered under `payload_id` from the execution
/// layer's payload builder and delivers it on `response`. If the subscriber
/// goes away before the payload is resolved (for example because the
/// consensus engine cancelled the proposal request that triggered the
/// build), the in-flight resolve future is dropped, which deregisters the
/// build job from the payload builder and aborts the build.
///
/// Nothing is returned: a failed or missing build is logged, and `response`
/// is dropped without a payload having been sent.
#[instrument(
    skip_all,
    parent = &cause,
    fields(%payload_id),
)]
async fn run_payload_job<TContext: Pacer>(
    context: TContext,
    execution_node: Arc<TempoFullNode>,
    StartPayloadJob {
        cause,
        payload_id,
        mut response,
    }: StartPayloadJob,
) {
    // Whichever completes first wins: the resolved payload, or the
    // subscriber dropping its receiver.
    let payload = select! {
        payload = execution_node
            .payload_builder_handle
            .resolve_kind(payload_id, PayloadKind::WaitForPending)
            .pace(&context, Duration::from_millis(20))
        => payload,

        // Drops the in-flight payload-resolution, killing payload build.
        () = response.cancellation() => {
            info!("payload subscriber went away before the payload was resolved; killing the payload build");
            return;
        }
    };

    // In the failure branches, dropping the response channel signals the
    // failure to the subscriber; the cause is only logged here.
    match payload {
        Some(Ok(payload)) => {
            if response.send(payload).is_err() {
                info!(
                    "payload subscriber went away before the payload could be delivered; discarding it"
                );
            }
        }
        Some(Err(error)) => {
            warn!(
                error = %eyre::Report::new(error),
                "payload build job failed",
            );
        }
        None => {
            warn!("no payload build job found under the payload ID");
        }
    }
}
783
784#[instrument(
785    skip_all,
786    parent = &cause,
787    fields(
788        head_block_hash = %canonicalized.forkchoice.head_block_hash,
789        head_block_height = %canonicalized.head_height,
790        finalized_block_hash = %canonicalized.forkchoice.finalized_block_hash,
791        finalized_block_height = %canonicalized.finalized_height,
792        ?kind,
793    ),
794)]
795async fn submit_forkchoice_update<TContext: Pacer>(
796    execution_node: &TempoFullNode,
797    context: &TContext,
798    cause: Span,
799    canonicalized: LastCanonicalized,
800    attrs: Option<TempoPayloadAttributes>,
801    kind: ForkchoiceUpdateKind,
802) -> eyre::Result<Option<PayloadId>> {
803    let fcu_response = execution_node
804        .add_ons_handle
805        .beacon_engine_handle
806        .fork_choice_updated(canonicalized.forkchoice, attrs)
807        .pace(context, Duration::from_millis(20))
808        .await
809        .wrap_err("failed requesting execution layer to update forkchoice state")?;
810
811    if kind == ForkchoiceUpdateKind::Heartbeat {
812        if fcu_response.is_invalid() {
813            warn!(
814                payload_status = %fcu_response.payload_status,
815                "execution layer reported FCU status",
816            );
817        } else {
818            info!(
819                payload_status = %fcu_response.payload_status,
820                "execution layer reported FCU status",
821            );
822        }
823    } else {
824        debug!(
825            payload_status = %fcu_response.payload_status,
826            "execution layer reported FCU status",
827        );
828    }
829
830    if fcu_response.is_invalid() {
831        return Err(Report::msg(fcu_response.payload_status)
832            .wrap_err("execution layer responded with error for forkchoice-update"));
833    }
834
835    Ok(fcu_response.payload_id)
836}
837
/// Forwards a finalized block to the execution layer.
///
/// In order:
///
/// 1. If finalizing the block changes the canonicalized state, a forkchoice
///    update with the new state is submitted.
/// 2. The block is sent to the execution layer as a new payload. The
///    reported status must be valid or syncing.
/// 3. The `finalized_blocks_proposed_by_self` counter is incremented if the
///    block's proposer matches `public_key`.
/// 4. The request's acknowledgment is acknowledged.
///
/// Returns the new canonicalized state if step 1 submitted a forkchoice
/// update, and `None` if the state was left unchanged.
///
/// # Errors
///
/// Returns an error if the forkchoice update fails, if the new-payload
/// request fails, or if the payload status is neither valid nor syncing. In
/// all of these cases the function returns before the acknowledgment is
/// acknowledged.
#[instrument(
    skip_all,
    parent = &request.cause,
    fields(
        block.digest = %request.block.digest(),
        block.height = %request.block.height(),
    ),
    err(level = Level::WARN),
    ret,
)]
async fn forward_finalized<TContext: Pacer>(
    context: &TContext,
    execution_node: Arc<TempoFullNode>,
    public_key: Option<PublicKey>,
    metrics: Metrics,
    canonicalized: LastCanonicalized,
    request: FinalizedBlockRequest,
) -> eyre::Result<Option<LastCanonicalized>> {
    let FinalizedBlockRequest {
        cause,
        block,
        acknowledgment,
        is_backfill: _,
    } = request;

    // `forkchoice` is `Some` only if finalizing this block actually changes
    // the canonicalized state.
    let new_canonicalized = canonicalized.update_finalized(block.height(), block.digest());
    let forkchoice = (new_canonicalized != canonicalized).then_some(new_canonicalized);

    if let Some(canonicalized) = forkchoice {
        submit_forkchoice_update(
            &execution_node,
            context,
            cause.clone(),
            canonicalized,
            None,
            ForkchoiceUpdateKind::Canonicalize {
                head_or_finalized: HeadOrFinalized::Finalized,
            },
        )
        .await?;
    }

    let (block, block_access_list) = block.into_parts();
    // Read before the block is moved into the new-payload request.
    let consensus_context = block.header().consensus_context;
    let payload_status = execution_node
        .add_ons_handle
        .beacon_engine_handle
        .new_payload(TempoExecutionData {
            block: Arc::new(block),
            block_access_list,
            // can be omitted for finalized blocks
            validator_set: None,
        })
        .pace(context, Duration::from_millis(20))
        .await
        .wrap_err(
            "failed sending new-payload request to execution engine to \
                query payload status of finalized block",
        )?;

    ensure!(
        payload_status.is_valid() || payload_status.is_syncing(),
        "this is a problem: payload status of block-to-be-finalized was \
            neither valid nor syncing: `{payload_status}`"
    );

    // Only counted if this node has a public key and the block records a
    // proposer.
    if let Some(public_key) = public_key.as_ref()
        && consensus_context
            .is_some_and(|context| &PublicKey::from(context.proposer.get()) == public_key)
    {
        metrics.finalized_blocks_proposed_by_self.inc();
    }

    acknowledgment.acknowledge();

    Ok(forkchoice)
}
915
/// Selects which part of the forkchoice state a canonicalization targets:
/// the head hash or the finalized hash.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HeadOrFinalized {
    /// Update the head block.
    Head,
    /// Update the finalized block.
    Finalized,
}

impl std::fmt::Display for HeadOrFinalized {
    /// Writes the lowercase name of the variant (`head` or `finalized`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Head => f.write_str("head"),
            Self::Finalized => f.write_str("finalized"),
        }
    }
}
931}