Skip to main content

tempo_commonware_node/executor/
actor.rs

1//! Drives execution by forwarding blocks to the execution layer and setting its forkchoice state.
2//!
3//! This agent forwards finalized blocks from the consensus layer to the
4//! execution layer and tracks the digest of the latest finalized block.
5//! It also advances the canonical chain by sending forkchoice-updates.
6
7use std::{ops::RangeInclusive, pin::Pin, sync::Arc, time::Duration};
8
9use alloy_rpc_types_engine::{ForkchoiceState, PayloadId};
10use commonware_consensus::{Heightable as _, marshal::Update, types::Height};
11use commonware_cryptography::ed25519::PublicKey;
12use commonware_runtime::{Clock, ContextCell, FutureExt, Handle, Pacer, Spawner, spawn_cell};
13use commonware_utils::{Acknowledgement, acknowledgement::Exact};
14use eyre::{Report, WrapErr as _, ensure};
15use futures::{
16    FutureExt as _, StreamExt as _,
17    channel::{
18        mpsc::{self, UnboundedReceiver},
19        oneshot,
20    },
21    future::{BoxFuture, Ready, ready},
22    stream::FuturesOrdered,
23};
24use prometheus_client::metrics::counter::Counter;
25use reth_ethereum::{chainspec::EthChainSpec, rpc::eth::primitives::BlockNumHash};
26use reth_provider::BlockNumReader as _;
27use tempo_node::{TempoExecutionData, TempoFullNode};
28use tempo_payload_types::TempoPayloadAttributes;
29use tokio::select;
30use tracing::{
31    Level, Span, debug, error, error_span, info, info_span, instrument, warn, warn_span,
32};
33
34use super::{
35    Config,
36    ingress::{CanonicalizeHead, Command, Message},
37};
38use crate::{
39    consensus::{Digest, block::Block},
40    executor::ingress::CanonicalizeAndBuild,
41    utils::OptionFuture,
42};
43
/// Tracks the last forkchoice state that the executor sent to the execution
/// layer (or, right after initialization, read from it).
///
/// Also tracks the heights of the blocks identified by
/// `forkchoice.head_block_hash` and `forkchoice.finalized_block_hash`,
/// respectively.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct LastCanonicalized {
    /// The head, safe, and finalized block hashes of the forkchoice state.
    forkchoice: ForkchoiceState,
    /// Height of the block identified by `forkchoice.head_block_hash`.
    head_height: Height,
    /// Height of the block identified by `forkchoice.finalized_block_hash`.
    finalized_height: Height,
}
55
56impl LastCanonicalized {
57    /// Updates the finalized height and finalized block hash to `height` and `digest`.
58    ///
59    /// `height` must be ahead of the latest canonicalized finalized height. If
60    /// it is not, then this is a no-op.
61    ///
62    /// Similarly, if `height` is ahead or the same as the latest canonicalized
63    /// head height, it also updates the head height.
64    ///
65    /// This is to ensure that the finalized block hash is never ahead of the
66    /// head hash.
67    fn update_finalized(self, height: Height, digest: Digest) -> Self {
68        let mut this = self;
69        if height > this.finalized_height {
70            this.finalized_height = height;
71            this.forkchoice.safe_block_hash = digest.0;
72            this.forkchoice.finalized_block_hash = digest.0;
73        }
74        if height >= this.head_height {
75            this.head_height = height;
76            this.forkchoice.head_block_hash = digest.0;
77        }
78        this
79    }
80
81    /// Updates the head height and head block hash to `height` and `digest`.
82    ///
83    /// If `height > self.finalized_height` or `digest` is the same as the finalized block hash,
84    /// this method will return a new canonical state with `self.head_height = height` and
85    /// `self.forkchoice.head = hash`.
86    ///
87    /// If `height <= self.finalized_height`, then this method will return
88    /// `self` unchanged.
89    fn update_head(self, height: Height, digest: Digest) -> Self {
90        let mut this = self;
91        if height > this.finalized_height || digest.0 == this.forkchoice.finalized_block_hash {
92            this.head_height = height;
93            this.forkchoice.head_block_hash = digest.0;
94        }
95        this
96    }
97}
98
/// The executor actor: forwards finalized blocks to the execution layer and
/// keeps its forkchoice state up to date.
pub(crate) struct Actor<TContext> {
    /// The runtime context of the actor. Used to spawn the event loop, to
    /// create the heartbeat timer, and to pace requests to the execution layer.
    context: ContextCell<TContext>,

    /// A handle to the execution node layer. Used to forward finalized blocks
    /// and to update the canonical chain by sending forkchoice updates.
    execution_node: Arc<TempoFullNode>,

    /// The last finalized height supplied through the actor's config. Logged
    /// at startup as the consensus layer's last finalized height.
    last_consensus_finalized_height: Height,
    /// The latest block number read from the execution layer when the actor
    /// was initialized.
    last_execution_finalized_height: Height,

    /// The channel over which the agent will receive new commands from the
    /// application actor.
    mailbox: mpsc::UnboundedReceiver<Message>,

    /// The mailbox of the marshal actor. Used to backfill blocks.
    marshal: crate::alias::marshal::Mailbox,

    /// The forkchoice state (and matching heights) that was last accepted by
    /// the execution layer; initialized from the execution layer's state.
    last_canonicalized: LastCanonicalized,

    /// The interval at which to send a forkchoice update heartbeat to the
    /// execution layer.
    fcu_heartbeat_interval: Duration,

    /// The timer for the next FCU heartbeat. Reset whenever an FCU is sent.
    fcu_heartbeat_timer: Pin<Box<dyn std::future::Future<Output = ()> + Send>>,

    /// Gap between the last finalized block on the consensus and execution
    /// layers. Needs to be handled on startup because the execution layer does
    /// not reliably flush all blocks.
    finalized_heights_to_backfill: RangeInclusive<u64>,

    /// The backfill request that is currently in flight, if any. Resolves to
    /// the requested height and the block, if the marshal actor returned one.
    pending_backfill: OptionFuture<BoxFuture<'static, (u64, Option<Block>)>>,

    /// Blocks received from the marshal actor that are awaiting execution and
    /// acknowledgement. FuturesOrdered because it is nicer to use as a stream
    /// in a select-loop.
    pending_finalizations: FuturesOrdered<Ready<(Span, Block, Exact)>>,

    /// Height and digest of the most recent finalized tip update received on
    /// the mailbox. `None` until the first such update arrives.
    latest_observed_finalized_tip: Option<(Height, Digest)>,

    /// The node's ed25519 public key if the node is participating in
    /// consensus. Not set if not, for example for followers.
    public_key: Option<PublicKey>,

    /// Metrics exported by this actor.
    metrics: Metrics,
}
146
/// Metrics exported by the executor actor.
#[derive(Clone)]
struct Metrics {
    /// Number of finalized blocks whose proposer matches this node's public key.
    finalized_blocks_proposed_by_self: Counter,
}
152
153impl Metrics {
154    fn init<TContext>(context: &TContext) -> Self
155    where
156        TContext: commonware_runtime::Metrics,
157    {
158        let finalized_blocks_proposed_by_self = Counter::default();
159        context.register(
160            "finalized_blocks_proposed_by_self",
161            "number of finalized blocks whose proposer matches this node's public key",
162            finalized_blocks_proposed_by_self.clone(),
163        );
164        Self {
165            finalized_blocks_proposed_by_self,
166        }
167    }
168}
169
170impl<TContext> Actor<TContext>
171where
172    TContext: Clock + commonware_runtime::Metrics + Pacer + Spawner,
173{
174    pub(super) fn init(
175        context: TContext,
176        config: super::Config,
177        mailbox: UnboundedReceiver<super::ingress::Message>,
178    ) -> eyre::Result<Self> {
179        let Config {
180            execution_node,
181            last_finalized_height,
182            marshal,
183            fcu_heartbeat_interval,
184            public_key,
185        } = config;
186        let metrics = Metrics::init(&context);
187        let last_execution_finalized_height = execution_node
188            .provider
189            .last_block_number()
190            .wrap_err("unable to read latest block number from execution layer")?;
191
192        let canonical_state = execution_node.provider.canonical_in_memory_state();
193        let finalized_num_hash = canonical_state
194            .get_finalized_num_hash()
195            .unwrap_or_else(|| BlockNumHash::new(0, execution_node.chain_spec().genesis_hash()));
196        let head_num_hash: BlockNumHash = canonical_state.chain_info().into();
197
198        let fcu_heartbeat_timer = Box::pin(context.sleep(fcu_heartbeat_interval));
199        let finalized_heights_to_backfill =
200            (last_execution_finalized_height + 1)..=last_finalized_height.get();
201        let last_execution_finalized_height = Height::new(last_execution_finalized_height);
202        Ok(Self {
203            context: ContextCell::new(context),
204            execution_node,
205            last_consensus_finalized_height: last_finalized_height,
206            last_execution_finalized_height,
207            mailbox,
208            marshal,
209            last_canonicalized: LastCanonicalized {
210                forkchoice: ForkchoiceState {
211                    head_block_hash: head_num_hash.hash,
212                    safe_block_hash: finalized_num_hash.hash,
213                    finalized_block_hash: finalized_num_hash.hash,
214                },
215                head_height: Height::new(head_num_hash.number),
216                finalized_height: Height::new(finalized_num_hash.number),
217            },
218            fcu_heartbeat_interval,
219            fcu_heartbeat_timer,
220
221            finalized_heights_to_backfill,
222            pending_backfill: OptionFuture::none(),
223            pending_finalizations: FuturesOrdered::new(),
224
225            latest_observed_finalized_tip: None,
226
227            public_key,
228            metrics,
229        })
230    }
231
    /// Spawns the actor's event loop on its runtime context and returns the
    /// resulting handle.
    pub(crate) fn start(mut self) -> Handle<()> {
        spawn_cell!(self.context, self.run())
    }
235
    /// The event loop of the actor.
    ///
    /// Each iteration first schedules the next backfill (if none is in flight
    /// and heights remain), then waits on the following sources, polled in
    /// declaration order (`biased`):
    ///
    /// 1. the in-flight backfill; errors while forwarding a backfilled block
    ///    are ignored;
    /// 2. queued finalized blocks (only while no backfill is in flight);
    /// 3. the latest observed finalized tip, if canonicalizing it would change
    ///    the last canonicalized state (only while no backfill is in flight);
    /// 4. messages arriving on the mailbox;
    /// 5. the forkchoice-update heartbeat timer.
    ///
    /// The loop exits when the mailbox is closed, or when forwarding a queued
    /// finalized block or handling a message returns an error.
    async fn run(mut self) {
        info_span!("start").in_scope(|| {
            info!(
                last_finalized_consensus_height = %self.last_consensus_finalized_height,
                last_finalized_execution_height = %self.last_execution_finalized_height,
                "consensus and execution layers reported last finalized heights; \
                backfilling blocks from consensus to execution if necessary",
            );
        });

        loop {
            // Only one backfill is in flight at a time: request the next
            // height once the previous request has resolved.
            if self.pending_backfill.is_none()
                && let Some(height) = self.finalized_heights_to_backfill.next()
            {
                self.pending_backfill.replace({
                    let marshal = self.marshal.clone();
                    async move { (height, marshal.get_block(Height::new(height)).await) }.boxed()
                });
            }

            // The tip counts as moved if applying it to the last canonicalized
            // state would yield a different state.
            let finalized_tip_has_moved =
                self.latest_observed_finalized_tip
                    .is_some_and(|(height, digest)| {
                        self.last_canonicalized
                            != self.last_canonicalized.update_finalized(height, digest)
                    });

            select! {
                biased;

                // Complete all backfills first.
                block = &mut self.pending_backfill => {
                    match block {
                        (height, Some(block)) => {
                            // Nobody waits on this acknowledgement; the
                            // waiter half is dropped right away.
                            let (ack, _wait) = Exact::handle();
                            let span = info_span!("backfill_on_start", %height);
                            // Errors while forwarding a backfilled block are
                            // deliberately not acted upon here.
                            let _ = self.forward_finalized(
                                span,
                                block,
                                ack,
                            ).await;
                        }
                        (height, None) => {
                            warn_span!("backfill_on_start", %height)
                            .in_scope(|| warn!(
                                "marshal actor did not have block even though \
                                it must have finalized it previously",
                            ));
                        }
                    }
                }

                // Then forward all finalizations.
                Some((cause, block, ack)) = self.pending_finalizations.next()
                , if self.pending_backfill.is_none()
                => {
                    // Error is emitted on function return.
                    if let Err(error) = self.forward_finalized(cause, block, ack).await
                    {
                        error_span!("shutdown").in_scope(|| error!(
                            %error,
                            "executor encountered fatal fork choice update error; \
                            shutting down to prevent consensus-execution divergence"
                        ));
                        break;
                    }
                }

                // Update the finalized tip if it has moved.
                //
                // NOTE(review): the outcome of this canonicalization is not
                // inspected (the receiver is dropped). If it fails, the last
                // canonicalized state stays unchanged, so this branch is
                // presumably ready again on the next iteration, ahead of the
                // mailbox. Confirm this retry behaviour is intended.
                Some((height, digest)) = ready(self.latest_observed_finalized_tip)
                , if finalized_tip_has_moved
                && self.pending_backfill.is_none()
                => {
                    let (response, _rx) = oneshot::channel();
                    self.canonicalize(
                        Span::current(),
                        HeadOrFinalized::Finalized,
                        height,
                        digest,
                        JustCanonicalizeOrAlsoBuild::JustCanonicalize { response },
                    )
                    .await;
                }

                // Serve requests last.
                msg = self.mailbox.next() => {
                    // A closed mailbox ends the event loop.
                    let Some(msg) = msg else { break; };
                    // XXX: updating forkchoice and finalizing blocks must
                    // happen sequentially, so blocking the event loop on await
                    // is desired.
                    //
                    // Backfilled blocks do not arrive through this mailbox;
                    // they are requested and awaited via `pending_backfill`
                    // above.
                    if let Err(error) = self.handle_message(msg).await {
                        error_span!("shutdown").in_scope(|| error!(
                            %error,
                            "executor encountered fatal fork choice update error; \
                            shutting down to prevent consensus-execution divergence"
                        ));
                        break;
                    }
                },

                // Re-send the last forkchoice state if no FCU was sent for a
                // full heartbeat interval.
                _ = (&mut self.fcu_heartbeat_timer).fuse() => {
                    self.send_forkchoice_update_heartbeat().await;
                    self.reset_fcu_heartbeat_timer();
                },
            }
        }
    }
346
347    fn reset_fcu_heartbeat_timer(&mut self) {
348        self.fcu_heartbeat_timer = Box::pin(self.context.sleep(self.fcu_heartbeat_interval));
349    }
350
351    #[instrument(skip_all)]
352    async fn send_forkchoice_update_heartbeat(&mut self) {
353        info!(
354            head_block_hash = %self.last_canonicalized.forkchoice.head_block_hash,
355            head_block_height = %self.last_canonicalized.head_height,
356            finalized_block_hash = %self.last_canonicalized.forkchoice.finalized_block_hash,
357            finalized_block_height = %self.last_canonicalized.finalized_height,
358            "sending FCU",
359        );
360
361        let fcu_response = self
362            .execution_node
363            .add_ons_handle
364            .beacon_engine_handle
365            .fork_choice_updated(self.last_canonicalized.forkchoice, None)
366            .pace(&self.context, Duration::from_millis(20))
367            .await;
368
369        match fcu_response {
370            Ok(response) if response.is_invalid() => {
371                warn!(
372                    payload_status = %response.payload_status,
373                    "execution layer reported FCU status",
374                );
375            }
376            Ok(response) => {
377                info!(
378                    payload_status = %response.payload_status,
379                    "execution layer reported FCU status",
380                );
381            }
382            Err(error) => {
383                warn!(
384                    error = %Report::new(error),
385                    "failed sending FCU to execution layer",
386                );
387            }
388        }
389    }
390
391    async fn handle_message(&mut self, message: Message) -> eyre::Result<()> {
392        let cause = message.cause;
393        let is_backfilling =
394            self.pending_backfill.is_some() || !self.finalized_heights_to_backfill.is_empty();
395        match message.command {
396            Command::CanonicalizeHead(..) | Command::CanonicalizeAndBuild(..) if is_backfilling => {
397                info_span!("handle_message")
398                    .in_scope(|| info!("request to canonicalize dropped while backfilling"));
399            }
400            Command::CanonicalizeHead(CanonicalizeHead {
401                height,
402                digest,
403                response,
404            }) => {
405                self.canonicalize(
406                    cause,
407                    HeadOrFinalized::Head,
408                    height,
409                    digest,
410                    JustCanonicalizeOrAlsoBuild::JustCanonicalize { response },
411                )
412                .await;
413            }
414            Command::CanonicalizeAndBuild(CanonicalizeAndBuild {
415                height,
416                digest,
417                attributes,
418                response,
419            }) => {
420                self.canonicalize(
421                    cause,
422                    HeadOrFinalized::Head,
423                    height,
424                    digest,
425                    JustCanonicalizeOrAlsoBuild::AlsoBuild {
426                        response,
427                        attributes: Box::new(*attributes),
428                    },
429                )
430                .await;
431            }
432            Command::Finalize(finalized) => match *finalized {
433                Update::Tip(_, height, digest) => {
434                    self.latest_observed_finalized_tip.replace((height, digest));
435                }
436                Update::Block(block, acknowledgement) => {
437                    self.pending_finalizations
438                        .push_back(ready((cause, block, acknowledgement)));
439                }
440            },
441        }
442        Ok(())
443    }
444
445    /// Canonicalizes `digest` by sending a forkchoice update to the execution layer.
446    #[instrument(
447        skip_all,
448        parent = &cause,
449        fields(
450            head.height = %height,
451            head.digest = %digest,
452            %head_or_finalized,
453        ),
454    )]
455    async fn canonicalize(
456        &mut self,
457        cause: Span,
458        head_or_finalized: HeadOrFinalized,
459        height: Height,
460        digest: Digest,
461        maybe_build: JustCanonicalizeOrAlsoBuild,
462    ) {
463        let new_canonicalized = match head_or_finalized {
464            HeadOrFinalized::Head => self.last_canonicalized.update_head(height, digest),
465            HeadOrFinalized::Finalized => self.last_canonicalized.update_finalized(height, digest),
466        };
467
468        if new_canonicalized == self.last_canonicalized
469            && let JustCanonicalizeOrAlsoBuild::JustCanonicalize { response } = maybe_build
470        {
471            info!("would not change forkchoice state; not sending it to the execution layer");
472            let _ = response.send(Ok(()));
473            return;
474        }
475
476        info!(
477            head_block_hash = %new_canonicalized.forkchoice.head_block_hash,
478            head_block_height = %new_canonicalized.head_height,
479            finalized_block_hash = %new_canonicalized.forkchoice.finalized_block_hash,
480            finalized_block_height = %new_canonicalized.finalized_height,
481            "sending forkchoice-update",
482        );
483        let fcu_response = match self
484            .execution_node
485            .add_ons_handle
486            .beacon_engine_handle
487            .fork_choice_updated(
488                new_canonicalized.forkchoice,
489                maybe_build.attributes().cloned(),
490            )
491            .pace(&self.context, Duration::from_millis(20))
492            .await
493            .wrap_err("failed requesting execution layer to update forkchoice state")
494        {
495            Err(error) => {
496                maybe_build.send_error(error);
497                return;
498            }
499            Ok(response) => response,
500        };
501
502        debug!(
503            payload_status = %fcu_response.payload_status,
504            "execution layer reported FCU status",
505        );
506
507        if fcu_response.is_invalid() {
508            maybe_build.send_error(
509                Report::msg(fcu_response.payload_status)
510                    .wrap_err("execution layer responded with error for forkchoice-update"),
511            );
512            return;
513        }
514
515        match maybe_build {
516            JustCanonicalizeOrAlsoBuild::JustCanonicalize { response } => {
517                let _ = response.send(Ok(()));
518            }
519            JustCanonicalizeOrAlsoBuild::AlsoBuild { response, .. } => {
520                if let Some(payload_id) = fcu_response.payload_id {
521                    let _ = response.send(Ok(payload_id));
522                }
523            }
524        }
525        self.last_canonicalized = new_canonicalized;
526        self.reset_fcu_heartbeat_timer();
527    }
528
529    /// Finalizes `block` by sending it to the execution layer.
530    ///
531    /// If `response` is set, `block` is considered to at the tip of the
532    /// finalized chain. The agent will also confirm the finalization  by
533    /// responding on that channel and set the digest as the latest finalized
534    /// head.
535    ///
536    /// The agent will also cache `digest` as the latest finalized digest.
537    /// The agent does not update the forkchoice state of the execution layer
538    /// here but upon serving a `Command::Canonicalize` request.
539    ///
540    /// If `response` is not set the agent assumes that `block` is an older
541    /// block backfilled from the consensus layer.
542    ///
543    /// # Invariants
544    ///
545    /// It is critical that a newer finalized block is always send after an
546    /// older finalized block. This is standard behavior of the commonmware
547    /// marshal agent.
548    #[instrument(
549        skip_all,
550        parent = &cause,
551        fields(
552            block.digest = %block.digest(),
553            block.height = %block.height(),
554        ),
555        err(level = Level::WARN),
556        ret,
557    )]
558    async fn forward_finalized(
559        &mut self,
560        cause: Span,
561        block: Block,
562        acknowledgment: Exact,
563    ) -> eyre::Result<()> {
564        let (response, rx) = oneshot::channel();
565        self.canonicalize(
566            Span::current(),
567            HeadOrFinalized::Finalized,
568            block.height(),
569            block.digest(),
570            JustCanonicalizeOrAlsoBuild::JustCanonicalize { response },
571        )
572        .await;
573        rx.await
574            .wrap_err("executor dropped channel")
575            .and_then(|res| res)?;
576
577        let block = block.into_inner();
578        let consensus_context = block.header().consensus_context;
579        let payload_status = self
580            .execution_node
581            .add_ons_handle
582            .beacon_engine_handle
583            .new_payload(TempoExecutionData {
584                block: Arc::new(block),
585                // can be omitted for finalized blocks
586                validator_set: None,
587            })
588            .pace(&self.context, Duration::from_millis(20))
589            .await
590            .wrap_err(
591                "failed sending new-payload request to execution engine to \
592                query payload status of finalized block",
593            )?;
594
595        ensure!(
596            payload_status.is_valid() || payload_status.is_syncing(),
597            "this is a problem: payload status of block-to-be-finalized was \
598            neither valid nor syncing: `{payload_status}`"
599        );
600
601        if let Some(public_key) = self.public_key.as_ref()
602            && consensus_context
603                .is_some_and(|context| &PublicKey::from(context.proposer.get()) == public_key)
604        {
605            self.metrics.finalized_blocks_proposed_by_self.inc();
606        }
607
608        acknowledgment.acknowledge();
609
610        Ok(())
611    }
612}
613
/// Controls canonicalization: if attributes are sent, the FCU also builds a payload.
///
/// Each variant carries the channel on which the outcome of the forkchoice
/// update is reported back to the requester.
enum JustCanonicalizeOrAlsoBuild {
    /// Only update the forkchoice state.
    JustCanonicalize {
        /// Receives `Ok(())` on success, or the error that prevented the update.
        response: oneshot::Sender<eyre::Result<()>>,
    },
    /// Update the forkchoice state and send payload attributes along with it.
    AlsoBuild {
        /// Receives the payload ID reported by the execution layer, or the
        /// error that prevented the update.
        response: oneshot::Sender<eyre::Result<PayloadId>>,
        /// The payload attributes sent together with the forkchoice update.
        attributes: Box<TempoPayloadAttributes>,
    },
}
624
625impl JustCanonicalizeOrAlsoBuild {
626    fn attributes(&self) -> Option<&TempoPayloadAttributes> {
627        match self {
628            Self::JustCanonicalize { .. } => None,
629            Self::AlsoBuild { attributes, .. } => Some(attributes),
630        }
631    }
632    fn send_error(self, error: eyre::Report) {
633        match self {
634            Self::JustCanonicalize { response } => {
635                let _ = response.send(Err(error));
636            }
637            Self::AlsoBuild { response, .. } => {
638                let _ = response.send(Err(error));
639            }
640        }
641    }
642}
643
/// Selects which part of the forkchoice state a canonicalization targets:
/// the head hash or the finalized hash.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HeadOrFinalized {
    Head,
    Finalized,
}

/// Renders the marker as the lowercase word `head` or `finalized`.
impl std::fmt::Display for HeadOrFinalized {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Head => f.write_str("head"),
            Self::Finalized => f.write_str("finalized"),
        }
    }
}