tempo_commonware_node/consensus/application/
executor.rs

1//! Drives the actual execution forwarding blocks and setting forkchoice state.
2//!
3//! This agent forwards finalized blocks from the consensus layer to the
4//! execution layer and tracks the digest of the latest finalized block.
5//! It also advances the canonical chain by sending forkchoice-updates.
6//!
7//! If the agent detects that the execution layer is missing blocks it attempts
8//! to backfill them from the consensus layer.
9
10use std::{sync::Arc, time::Duration};
11
12use alloy_primitives::B256;
13use alloy_rpc_types_engine::ForkchoiceState;
14use commonware_consensus::{Block as _, marshal::Update};
15
16use commonware_macros::select;
17use commonware_runtime::{ContextCell, FutureExt, Handle, Metrics, Pacer, Spawner, spawn_cell};
18use commonware_utils::{Acknowledgement, acknowledgement::Exact};
19use eyre::{Report, WrapErr as _, ensure, eyre};
20use futures::{
21    StreamExt as _,
22    channel::{mpsc, oneshot},
23};
24use reth_provider::BlockNumReader as _;
25use tempo_node::{TempoExecutionData, TempoFullNode};
26use tracing::{Level, Span, debug, info, instrument, warn};
27
28use crate::consensus::{Digest, block::Block};
29
30pub(super) struct Builder {
31    /// A handle to the execution node layer. Used to forward finalized blocks
32    /// and to update the canonical chain by sending forkchoice updates.
33    pub(super) execution_node: TempoFullNode,
34
35    /// The genesis block of the network. Used to populate fields on
36    /// the send the initial forkchoice state.
37    pub(super) genesis_block: Arc<Block>,
38
39    /// The mailbox of the marshal actor. Used to backfill blocks.
40    pub(super) marshal: crate::alias::marshal::Mailbox,
41}
42
43impl Builder {
44    /// Constructs the [`Executor`].
45    pub(super) fn build<TContext>(self, context: TContext) -> Executor<TContext>
46    where
47        TContext: Spawner,
48    {
49        let Self {
50            execution_node,
51            genesis_block,
52            marshal,
53        } = self;
54
55        let (to_me, from_app) = mpsc::unbounded();
56
57        let my_mailbox = ExecutorMailbox { inner: to_me };
58
59        let genesis_hash = genesis_block.block_hash();
60        Executor {
61            context: ContextCell::new(context),
62            execution_node,
63            mailbox: from_app,
64            marshal,
65            my_mailbox,
66            last_canonicalized: LastCanonicalized {
67                forkchoice: ForkchoiceState {
68                    head_block_hash: genesis_hash,
69                    safe_block_hash: genesis_hash,
70                    finalized_block_hash: genesis_hash,
71                },
72                head_height: 0,
73                finalized_height: 0,
74            },
75        }
76    }
77}
78
79/// Tracks the last forkchoice state that the executor sent to the execution layer.
80///
81/// Also tracks the corresponding heights corresponding to
82/// `forkchoice_state.head_block_hash` and
83/// `forkchoice_state.finalized_block_hash`, respectively.
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
85struct LastCanonicalized {
86    forkchoice: ForkchoiceState,
87    head_height: u64,
88    finalized_height: u64,
89}
90
91impl LastCanonicalized {
92    /// Updates the finalized height and finalized block hash to `height` and `hash`.
93    ///
94    /// `height` must be ahead of the latest canonicalized finalized height. If
95    /// it is not, then this is a no-op.
96    ///
97    /// Similarly, if `height` is ahead or the same as the latest canonicalized
98    /// head height, it also updates the head height.
99    ///
100    /// This is to ensure that the finalized block hash is never ahead of the
101    /// head hash.
102    fn update_finalized(self, height: u64, hash: B256) -> Self {
103        let mut this = self;
104        if height > this.finalized_height {
105            this.finalized_height = height;
106            this.forkchoice.safe_block_hash = hash;
107            this.forkchoice.finalized_block_hash = hash;
108        }
109        if height >= this.head_height {
110            this.head_height = height;
111            this.forkchoice.head_block_hash = hash;
112        }
113        this
114    }
115
116    /// Updates the head height and head block hash to `height` and `hash`.
117    ///
118    /// If `height > self.finalized_height`, this method will return a new
119    /// canonical state with `self.head_height = height` and
120    /// `self.forkchoice.head = hash`.
121    ///
122    /// If `height <= self.finalized_height`, then this method will return
123    /// `self` unchanged.
124    fn update_head(self, height: u64, hash: B256) -> Self {
125        let mut this = self;
126        if height > this.finalized_height {
127            this.head_height = height;
128            this.forkchoice.head_block_hash = hash;
129        }
130        this
131    }
132}
133
134pub(super) struct Executor<TContext> {
135    context: ContextCell<TContext>,
136
137    /// A handle to the execution node layer. Used to forward finalized blocks
138    /// and to update the canonical chain by sending forkchoice updates.
139    execution_node: TempoFullNode,
140
141    /// The channel over which the agent will receive new commands from the
142    /// application actor.
143    mailbox: mpsc::UnboundedReceiver<Message>,
144
145    /// The mailbox of the marshal actor. Used to backfill blocks.
146    marshal: crate::alias::marshal::Mailbox,
147
148    /// The mailbox passed to other parts of the system to forward messages to
149    /// the agent.
150    my_mailbox: ExecutorMailbox,
151
152    last_canonicalized: LastCanonicalized,
153}
154
155impl<TContext> Executor<TContext>
156where
157    TContext: Metrics + Pacer + Spawner,
158{
159    pub(super) fn mailbox(&self) -> &ExecutorMailbox {
160        &self.my_mailbox
161    }
162
163    pub(super) fn start(mut self) -> Handle<()> {
164        spawn_cell!(self.context, self.run().await)
165    }
166
167    async fn run(mut self) {
168        loop {
169            select! {
170                msg = self.mailbox.next() => {
171                    let Some(msg) = msg else { break; };
172                    // XXX: updating forkchoice and finalizing blocks must
173                    // happen sequentially, so blocking the event loop on await
174                    // is desired.
175                    //
176                    // Backfills will be spawned as tasks and will also send
177                    // resolved the blocks to this queue.
178                    self.handle_message(msg).await;
179                },
180            }
181        }
182    }
183
184    async fn handle_message(&mut self, message: Message) {
185        let cause = message.cause;
186        match message.command {
187            Command::CanonicalizeHead {
188                height,
189                digest,
190                ack,
191            } => {
192                let _ = self
193                    .canonicalize(cause, HeadOrFinalized::Head, height, digest, ack)
194                    .await;
195            }
196            Command::Finalize(finalized) => {
197                let _ = self.finalize(cause, *finalized).await;
198            }
199        }
200    }
201
202    /// Canonicalizes `digest` by sending a forkchoice update to the execution layer.
203    #[instrument(
204        skip_all,
205        parent = &cause,
206        fields(
207            head.height = height,
208            head.digest = %digest,
209            %head_or_finalized,
210        ),
211        err,
212    )]
213    async fn canonicalize(
214        &mut self,
215        cause: Span,
216        head_or_finalized: HeadOrFinalized,
217        height: u64,
218        digest: Digest,
219        ack: oneshot::Sender<()>,
220    ) -> eyre::Result<()> {
221        let new_canonicalized = match head_or_finalized {
222            HeadOrFinalized::Head => self.last_canonicalized.update_head(height, digest.0),
223            HeadOrFinalized::Finalized => {
224                self.last_canonicalized.update_finalized(height, digest.0)
225            }
226        };
227
228        if new_canonicalized == self.last_canonicalized {
229            info!("would not change forkchoice state; not sending it to the execution layer");
230            let _ = ack.send(());
231            return Ok(());
232        }
233
234        info!(
235            head_block_hash = %new_canonicalized.forkchoice.head_block_hash,
236            head_block_height = new_canonicalized.head_height,
237            finalized_block_hash = %new_canonicalized.forkchoice.finalized_block_hash,
238            finalized_block_height = new_canonicalized.finalized_height,
239            "sending forkchoice-update",
240        );
241        let fcu_response = self
242            .execution_node
243            .add_ons_handle
244            .beacon_engine_handle
245            .fork_choice_updated(
246                new_canonicalized.forkchoice,
247                None,
248                reth_node_builder::EngineApiMessageVersion::V3,
249            )
250            .pace(&self.context, Duration::from_millis(20))
251            .await
252            .wrap_err("failed requesting execution layer to update forkchoice state")?;
253
254        debug!(
255            payload_status = %fcu_response.payload_status,
256            "execution layer reported FCU status",
257        );
258
259        if fcu_response.is_invalid() {
260            return Err(Report::msg(fcu_response.payload_status)
261                .wrap_err("execution layer responded with error for forkchoice-update"));
262        }
263
264        let _ = ack.send(());
265        self.last_canonicalized = new_canonicalized;
266
267        Ok(())
268    }
269
270    #[instrument(parent = &cause, skip_all)]
271    /// Handles finalization events.
272    async fn finalize(&mut self, cause: Span, finalized: super::ingress::Finalized) {
273        match finalized.inner {
274            Update::Tip(height, digest) => {
275                let _: Result<_, _> = self
276                    .canonicalize(
277                        Span::current(),
278                        HeadOrFinalized::Finalized,
279                        height,
280                        digest,
281                        oneshot::channel().0,
282                    )
283                    .await;
284            }
285            Update::Block(block, acknowledgment) => {
286                let _: Result<_, _> = self
287                    .forward_finalized(Span::current(), block, acknowledgment)
288                    .await;
289            }
290        }
291    }
292
293    /// Finalizes `block` by sending it to the execution layer.
294    ///
295    /// If `response` is set, `block` is considered to at the tip of the
296    /// finalized chain. The agent will also confirm the finalization  by
297    /// responding on that channel and set the digest as the latest finalized
298    /// head.
299    ///
300    /// The agent will also cache `digest` as the latest finalized digest.
301    /// The agent does not update the forkchoice state of the execution layer
302    /// here but upon serving a `Command::Canonicalize` request.
303    ///
304    /// If `response` is not set the agent assumes that `block` is an older
305    /// block backfilled from the consensus layer.
306    ///
307    /// # Invariants
308    ///
309    /// It is critical that a newer finalized block is always send after an
310    /// older finalized block. This is standard behavior of the commonmware
311    /// marshal agent.
312    #[instrument(
313        skip_all,
314        parent = &cause,
315        fields(
316            block.digest = %block.digest(),
317            block.height = block.height(),
318        ),
319        err(level = Level::WARN),
320        ret,
321    )]
322    async fn forward_finalized(
323        &mut self,
324        cause: Span,
325        block: Block,
326        acknowledgment: Exact,
327    ) -> eyre::Result<()> {
328        if let Err(error) = self
329            .canonicalize(
330                Span::current(),
331                HeadOrFinalized::Finalized,
332                block.height(),
333                block.digest(),
334                oneshot::channel().0,
335            )
336            .await
337        {
338            warn!(
339                %error,
340                "failed canonicalizing finalized block; will still attempt \
341                forwarding it to the execution layer",
342            );
343        }
344
345        if let Ok(execution_height) = self
346            .execution_node
347            .provider
348            .last_block_number()
349            .map_err(Report::new)
350            .inspect_err(|error| {
351                warn!(
352                    %error,
353                    "failed getting last finalized block from execution layer, will \
354                    finalize forward block to execution layer without extra checks, \
355                    but it might fail"
356                )
357            })
358            && execution_height + 1 < block.height()
359        {
360            info!(
361                execution.finalized_height = execution_height,
362                "hole detected; consensus attempts to finalize block with gaps \
363                on the execution layer; filling them in first",
364            );
365            let _ = self.fill_holes(execution_height + 1, block.height()).await;
366        }
367
368        let block = block.into_inner();
369        let payload_status = self
370            .execution_node
371            .add_ons_handle
372            .beacon_engine_handle
373            .new_payload(TempoExecutionData {
374                block: Arc::new(block),
375                // can be omitted for finalized blocks
376                validator_set: None,
377            })
378            .pace(&self.context, Duration::from_millis(20))
379            .await
380            .wrap_err(
381                "failed sending new-payload request to execution engine to \
382                query payload status of finalized block",
383            )?;
384
385        ensure!(
386            payload_status.is_valid() || payload_status.is_syncing(),
387            "this is a problem: payload status of block-to-be-finalized was \
388            neither valid nor syncing: `{payload_status}`"
389        );
390
391        acknowledgment.acknowledge();
392
393        Ok(())
394    }
395
396    /// Reads all blocks heights `from..to` and forwards them to the execution layer.
397    #[instrument(
398        skip_all,
399        fields(from, to),
400        err(level = Level::WARN),
401    )]
402    async fn fill_holes(&mut self, from: u64, to: u64) -> eyre::Result<()> {
403        ensure!(from <= to, "backfill range is negative");
404
405        for height in from..to {
406            let block = self.marshal.get_block(height).await.ok_or_else(|| {
407                eyre!(
408                    "marshal actor does not know about block `{height}`, but \
409                    this function expects that it has all blocks in the provided \
410                    range",
411                )
412            })?;
413
414            let digest = block.digest();
415
416            let payload_status = self
417                .execution_node
418                .add_ons_handle
419                .beacon_engine_handle
420                .new_payload(TempoExecutionData {
421                    block: Arc::new(block.into_inner()),
422                    // can be omitted for finalized blocks
423                    validator_set: None,
424                })
425                .pace(&self.context, Duration::from_millis(20))
426                .await
427                .wrap_err(
428                    "failed sending new-payload request to execution engine to \
429                    query payload status of finalized block",
430                )?;
431
432            ensure!(
433                payload_status.is_valid() || payload_status.is_syncing(),
434                "this is a problem: payload status of block `{digest}` we are \
435                trying to backfill is neither valid nor syncing: \
436                `{payload_status}`"
437            );
438        }
439        Ok(())
440    }
441}
442
443/// Marker to indicate whether the head hash or finalized hash should be updated.
444#[derive(Debug, Clone, Copy, PartialEq, Eq)]
445enum HeadOrFinalized {
446    Head,
447    Finalized,
448}
449
450impl std::fmt::Display for HeadOrFinalized {
451    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
452        let msg = match self {
453            Self::Head => "head",
454            Self::Finalized => "finalized",
455        };
456        f.write_str(msg)
457    }
458}
459
460#[derive(Clone, Debug)]
461pub(super) struct ExecutorMailbox {
462    inner: mpsc::UnboundedSender<Message>,
463}
464
465impl ExecutorMailbox {
466    /// Requests the agent to update the head of the canonical chain to `digest`.
467    pub(super) fn canonicalize_head(
468        &self,
469        height: u64,
470        digest: Digest,
471    ) -> eyre::Result<oneshot::Receiver<()>> {
472        let (tx, rx) = oneshot::channel();
473        self.inner
474            .unbounded_send(Message {
475                cause: Span::current(),
476                command: Command::CanonicalizeHead {
477                    height,
478                    digest,
479                    ack: tx,
480                },
481            })
482            .wrap_err("failed sending canonicalize request to agent, this means it exited")?;
483
484        Ok(rx)
485    }
486
487    /// Requests the agent to forward a `finalized` block to the execution layer.
488    pub(super) fn forward_finalized(
489        &self,
490        finalized: super::ingress::Finalized,
491    ) -> eyre::Result<()> {
492        self.inner
493            .unbounded_send(Message {
494                cause: Span::current(),
495                command: Command::Finalize(finalized.into()),
496            })
497            .wrap_err("failed sending finalization request to agent, this means it exited")
498    }
499}
500
501#[derive(Debug)]
502struct Message {
503    cause: Span,
504    command: Command,
505}
506
507#[derive(Debug)]
508enum Command {
509    /// Requests the agent to set the head of the canonical chain to `digest`.
510    CanonicalizeHead {
511        height: u64,
512        digest: Digest,
513        ack: oneshot::Sender<()>,
514    },
515    /// Requests the agent to forward a finalization event to the execution layer.
516    Finalize(Box<super::ingress::Finalized>),
517}