Skip to main content

tempo_commonware_node/executor/
actor.rs

1//! Drives the actual execution forwarding blocks and setting forkchoice state.
2//!
3//! This agent forwards finalized blocks from the consensus layer to the
4//! execution layer and tracks the digest of the latest finalized block.
5//! It also advances the canonical chain by sending forkchoice-updates.
6
7use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};
8
9use alloy_rpc_types_engine::ForkchoiceState;
10use commonware_consensus::{Heightable as _, marshal::Update, types::Height};
11
12use commonware_runtime::{
13    Clock, ContextCell, FutureExt, Handle, Metrics, Pacer, Spawner, spawn_cell,
14};
15use commonware_utils::{Acknowledgement, acknowledgement::Exact};
16use eyre::{OptionExt as _, Report, WrapErr as _, ensure};
17use futures::{
18    FutureExt as _, StreamExt as _,
19    channel::{
20        mpsc::{self, UnboundedReceiver},
21        oneshot,
22    },
23    select_biased,
24};
25use reth_provider::{BlockHashReader, BlockNumReader as _};
26use tempo_node::{TempoExecutionData, TempoFullNode};
27use tracing::{
28    Level, Span, debug, error, error_span, info, info_span, instrument, warn, warn_span,
29};
30
31use super::{
32    Config,
33    ingress::{CanonicalizeHead, Command, Message, SubscribeFinalized},
34};
35use crate::consensus::{Digest, block::Block};
36
37/// Tracks the last forkchoice state that the executor sent to the execution layer.
38///
39/// Also tracks the corresponding heights corresponding to
40/// `forkchoice_state.head_block_hash` and
41/// `forkchoice_state.finalized_block_hash`, respectively.
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43struct LastCanonicalized {
44    forkchoice: ForkchoiceState,
45    head_height: Height,
46    finalized_height: Height,
47}
48
49impl LastCanonicalized {
50    /// Updates the finalized height and finalized block hash to `height` and `digest`.
51    ///
52    /// `height` must be ahead of the latest canonicalized finalized height. If
53    /// it is not, then this is a no-op.
54    ///
55    /// Similarly, if `height` is ahead or the same as the latest canonicalized
56    /// head height, it also updates the head height.
57    ///
58    /// This is to ensure that the finalized block hash is never ahead of the
59    /// head hash.
60    fn update_finalized(self, height: Height, digest: Digest) -> Self {
61        let mut this = self;
62        if height > this.finalized_height {
63            this.finalized_height = height;
64            this.forkchoice.safe_block_hash = digest.0;
65            this.forkchoice.finalized_block_hash = digest.0;
66        }
67        if height >= this.head_height {
68            this.head_height = height;
69            this.forkchoice.head_block_hash = digest.0;
70        }
71        this
72    }
73
74    /// Updates the head height and head block hash to `height` and `digest`.
75    ///
76    /// If `height > self.finalized_height`, this method will return a new
77    /// canonical state with `self.head_height = height` and
78    /// `self.forkchoice.head = hash`.
79    ///
80    /// If `height <= self.finalized_height`, then this method will return
81    /// `self` unchanged.
82    fn update_head(self, height: Height, digest: Digest) -> Self {
83        let mut this = self;
84        if height > this.finalized_height {
85            this.head_height = height;
86            this.forkchoice.head_block_hash = digest.0;
87        }
88        this
89    }
90}
91
92pub(crate) struct Actor<TContext> {
93    context: ContextCell<TContext>,
94
95    /// A handle to the execution node layer. Used to forward finalized blocks
96    /// and to update the canonical chain by sending forkchoice updates.
97    execution_node: TempoFullNode,
98
99    last_consensus_finalized_height: Height,
100    last_execution_finalized_height: Height,
101
102    /// The channel over which the agent will receive new commands from the
103    /// application actor.
104    mailbox: mpsc::UnboundedReceiver<Message>,
105
106    /// The mailbox of the marshal actor. Used to backfill blocks.
107    marshal: crate::alias::marshal::Mailbox,
108
109    last_canonicalized: LastCanonicalized,
110
111    /// The interval at which to send a forkchoice update heartbeat to the
112    /// execution layer.
113    fcu_heartbeat_interval: Duration,
114
115    /// The timer for the next FCU heartbeat. Reset whenever an FCU is sent.
116    fcu_heartbeat_timer: Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
117
118    /// A list of subscriptions waiting for the executor to finalize a block
119    /// at a given height.
120    pending_finalized_subscriptions: BTreeMap<Height, Vec<oneshot::Sender<()>>>,
121}
122
123impl<TContext> Actor<TContext>
124where
125    TContext: Clock + Metrics + Pacer + Spawner,
126{
127    pub(super) fn init(
128        context: TContext,
129        config: super::Config,
130        mailbox: UnboundedReceiver<super::ingress::Message>,
131    ) -> eyre::Result<Self> {
132        let Config {
133            execution_node,
134            last_finalized_height,
135            marshal,
136            fcu_heartbeat_interval,
137        } = config;
138        let last_execution_finalized_height = execution_node
139            .provider
140            .last_block_number()
141            .wrap_err("unable to read latest block number from execution layer")?;
142        let last_finalized_block_hash = execution_node
143            .provider
144            .block_hash(last_execution_finalized_height)
145            .map_or_else(
146                |e| Err(Report::new(e)),
147                |hash| hash.ok_or_eyre("execution layer does not have the block hash"),
148            )
149            .wrap_err("failed to read the last finalized block hash")?;
150        let fcu_heartbeat_timer = Box::pin(context.sleep(fcu_heartbeat_interval));
151        Ok(Self {
152            context: ContextCell::new(context),
153            execution_node,
154            last_consensus_finalized_height: last_finalized_height,
155            last_execution_finalized_height: Height::new(last_execution_finalized_height),
156            mailbox,
157            marshal,
158            last_canonicalized: LastCanonicalized {
159                forkchoice: ForkchoiceState {
160                    head_block_hash: last_finalized_block_hash,
161                    safe_block_hash: last_finalized_block_hash,
162                    finalized_block_hash: last_finalized_block_hash,
163                },
164                head_height: Height::zero(),
165                finalized_height: Height::zero(),
166            },
167            fcu_heartbeat_interval,
168            fcu_heartbeat_timer,
169            pending_finalized_subscriptions: BTreeMap::new(),
170        })
171    }
172
173    pub(crate) fn start(mut self) -> Handle<()> {
174        spawn_cell!(self.context, self.run().await)
175    }
176
177    async fn run(mut self) {
178        info_span!("start").in_scope(|| {
179            info!(
180                last_finalized_consensus_height = %self.last_consensus_finalized_height,
181                last_finalized_execution_height = %self.last_execution_finalized_height,
182                "consensus and execution layers reported last finalized heights; \
183                backfilling blocks from consensus to execution if necessary",
184            );
185        });
186
187        let mut backfill_on_start = {
188            let marshal = self.marshal.clone();
189            std::pin::pin!(
190                futures::stream::iter(
191                    self.last_execution_finalized_height.get() + 1
192                        ..=self.last_consensus_finalized_height.get(),
193                )
194                .then(move |height| {
195                    let marshal = marshal.clone();
196                    async move { (height, marshal.get_block(Height::new(height)).await) }
197                })
198                .fuse()
199            )
200        };
201
202        loop {
203            select_biased! {
204                backfill = backfill_on_start.next() => {
205                    match backfill {
206                        Some((height, Some(block))) => {
207                            let (ack, _wait) = Exact::handle();
208                            let span = info_span!("backfill_on_start", height);
209                            let _ = self.forward_finalized(
210                                span,
211                                block,
212                                ack,
213                            ).await;
214                        }
215                        Some((height, None)) => {
216                            warn_span!("backfill_on_start", height)
217                            .in_scope(|| warn!(
218                                "marshal actor did not have block even though \
219                                it must have finalized it previously",
220                            ));
221                        }
222                        None => {
223                            info_span!("backfill_on_start")
224                            .in_scope(|| info!(
225                                "no more blocks to backfill from consensus to \
226                                execution layer")
227                            );
228                        }
229                    }
230                },
231
232                msg = self.mailbox.next() => {
233                    let Some(msg) = msg else { break; };
234                    // XXX: updating forkchoice and finalizing blocks must
235                    // happen sequentially, so blocking the event loop on await
236                    // is desired.
237                    //
238                    // Backfills will be spawned as tasks and will also send
239                    // resolved the blocks to this queue.
240                    if let Err(error) = self.handle_message(msg).await {
241                        error_span!("shutdown").in_scope(|| error!(
242                            %error,
243                            "executor encountered fatal fork choice update error; \
244                            shutting down to prevent consensus-execution divergence"
245                        ));
246                        break;
247                    }
248                },
249
250                _ = (&mut self.fcu_heartbeat_timer).fuse() => {
251                    self.send_forkchoice_update_heartbeat().await;
252                    self.reset_fcu_heartbeat_timer();
253                },
254            }
255        }
256    }
257
258    fn reset_fcu_heartbeat_timer(&mut self) {
259        self.fcu_heartbeat_timer = Box::pin(self.context.sleep(self.fcu_heartbeat_interval));
260    }
261
262    #[instrument(skip_all)]
263    async fn send_forkchoice_update_heartbeat(&mut self) {
264        info!(
265            head_block_hash = %self.last_canonicalized.forkchoice.head_block_hash,
266            head_block_height = %self.last_canonicalized.head_height,
267            finalized_block_hash = %self.last_canonicalized.forkchoice.finalized_block_hash,
268            finalized_block_height = %self.last_canonicalized.finalized_height,
269            "sending FCU",
270        );
271
272        let fcu_response = self
273            .execution_node
274            .add_ons_handle
275            .beacon_engine_handle
276            .fork_choice_updated(
277                self.last_canonicalized.forkchoice,
278                None,
279                reth_node_builder::EngineApiMessageVersion::V3,
280            )
281            .pace(&self.context, Duration::from_millis(20))
282            .await;
283
284        match fcu_response {
285            Ok(response) if response.is_invalid() => {
286                warn!(
287                    payload_status = %response.payload_status,
288                    "execution layer reported FCU status",
289                );
290            }
291            Ok(response) => {
292                info!(
293                    payload_status = %response.payload_status,
294                    "execution layer reported FCU status",
295                );
296            }
297            Err(error) => {
298                warn!(
299                    error = %Report::new(error),
300                    "failed sending FCU to execution layer",
301                );
302            }
303        }
304    }
305
306    async fn handle_message(&mut self, message: Message) -> eyre::Result<()> {
307        let cause = message.cause;
308        match message.command {
309            Command::CanonicalizeHead(CanonicalizeHead {
310                height,
311                digest,
312                ack,
313            }) => {
314                // Errors are logged inside canonicalize; head canonicalization failures
315                // are non-fatal and will be retried on the next block.
316                let _ = self
317                    .canonicalize(cause, HeadOrFinalized::Head, height, digest, ack)
318                    .await;
319            }
320            Command::Finalize(finalized) => {
321                self.finalize(cause, *finalized)
322                    .await
323                    .wrap_err("failed handling finalization")?;
324            }
325            Command::SubscribeFinalized(SubscribeFinalized { height, response }) => {
326                if self.last_canonicalized.finalized_height >= height {
327                    let _ = response.send(());
328                } else {
329                    self.pending_finalized_subscriptions
330                        .entry(height)
331                        .or_default()
332                        .push(response);
333                }
334            }
335        }
336        Ok(())
337    }
338
339    /// Canonicalizes `digest` by sending a forkchoice update to the execution layer.
340    #[instrument(
341        skip_all,
342        parent = &cause,
343        fields(
344            head.height = %height,
345            head.digest = %digest,
346            %head_or_finalized,
347        ),
348        err,
349    )]
350    async fn canonicalize(
351        &mut self,
352        cause: Span,
353        head_or_finalized: HeadOrFinalized,
354        height: Height,
355        digest: Digest,
356        ack: oneshot::Sender<()>,
357    ) -> eyre::Result<()> {
358        let new_canonicalized = match head_or_finalized {
359            HeadOrFinalized::Head => self.last_canonicalized.update_head(height, digest),
360            HeadOrFinalized::Finalized => self.last_canonicalized.update_finalized(height, digest),
361        };
362
363        if new_canonicalized == self.last_canonicalized {
364            info!("would not change forkchoice state; not sending it to the execution layer");
365            let _ = ack.send(());
366            return Ok(());
367        }
368
369        info!(
370            head_block_hash = %new_canonicalized.forkchoice.head_block_hash,
371            head_block_height = %new_canonicalized.head_height,
372            finalized_block_hash = %new_canonicalized.forkchoice.finalized_block_hash,
373            finalized_block_height = %new_canonicalized.finalized_height,
374            "sending forkchoice-update",
375        );
376        let fcu_response = self
377            .execution_node
378            .add_ons_handle
379            .beacon_engine_handle
380            .fork_choice_updated(
381                new_canonicalized.forkchoice,
382                None,
383                reth_node_builder::EngineApiMessageVersion::V3,
384            )
385            .pace(&self.context, Duration::from_millis(20))
386            .await
387            .wrap_err("failed requesting execution layer to update forkchoice state")?;
388
389        debug!(
390            payload_status = %fcu_response.payload_status,
391            "execution layer reported FCU status",
392        );
393
394        if fcu_response.is_invalid() {
395            return Err(Report::msg(fcu_response.payload_status)
396                .wrap_err("execution layer responded with error for forkchoice-update"));
397        }
398
399        let _ = ack.send(());
400        self.last_canonicalized = new_canonicalized;
401        self.reset_fcu_heartbeat_timer();
402
403        Ok(())
404    }
405
406    #[instrument(parent = &cause, skip_all)]
407    /// Handles finalization events.
408    async fn finalize(&mut self, cause: Span, finalized: Update<Block>) -> eyre::Result<()> {
409        match finalized {
410            Update::Tip(_, height, digest) => {
411                self.canonicalize(
412                    Span::current(),
413                    HeadOrFinalized::Finalized,
414                    height,
415                    digest,
416                    oneshot::channel().0,
417                )
418                .await
419                .wrap_err("failed canonicalizing finalization tip")?;
420            }
421            Update::Block(block, acknowledgment) => {
422                self.forward_finalized(Span::current(), block, acknowledgment)
423                    .await
424                    .wrap_err("failed forwarding finalized block to execution layer")?;
425            }
426        }
427        Ok(())
428    }
429
430    /// Finalizes `block` by sending it to the execution layer.
431    ///
432    /// If `response` is set, `block` is considered to at the tip of the
433    /// finalized chain. The agent will also confirm the finalization  by
434    /// responding on that channel and set the digest as the latest finalized
435    /// head.
436    ///
437    /// The agent will also cache `digest` as the latest finalized digest.
438    /// The agent does not update the forkchoice state of the execution layer
439    /// here but upon serving a `Command::Canonicalize` request.
440    ///
441    /// If `response` is not set the agent assumes that `block` is an older
442    /// block backfilled from the consensus layer.
443    ///
444    /// # Invariants
445    ///
446    /// It is critical that a newer finalized block is always send after an
447    /// older finalized block. This is standard behavior of the commonmware
448    /// marshal agent.
449    #[instrument(
450        skip_all,
451        parent = &cause,
452        fields(
453            block.digest = %block.digest(),
454            block.height = %block.height(),
455        ),
456        err(level = Level::WARN),
457        ret,
458    )]
459    async fn forward_finalized(
460        &mut self,
461        cause: Span,
462        block: Block,
463        acknowledgment: Exact,
464    ) -> eyre::Result<()> {
465        let height = block.height();
466        self.canonicalize(
467            Span::current(),
468            HeadOrFinalized::Finalized,
469            block.height(),
470            block.digest(),
471            oneshot::channel().0,
472        )
473        .await
474        .wrap_err("failed canonicalizing finalized block")?;
475
476        let block = block.into_inner();
477        let payload_status = self
478            .execution_node
479            .add_ons_handle
480            .beacon_engine_handle
481            .new_payload(TempoExecutionData {
482                block: Arc::new(block),
483                // can be omitted for finalized blocks
484                validator_set: None,
485            })
486            .pace(&self.context, Duration::from_millis(20))
487            .await
488            .wrap_err(
489                "failed sending new-payload request to execution engine to \
490                query payload status of finalized block",
491            )?;
492
493        ensure!(
494            payload_status.is_valid() || payload_status.is_syncing(),
495            "this is a problem: payload status of block-to-be-finalized was \
496            neither valid nor syncing: `{payload_status}`"
497        );
498
499        acknowledgment.acknowledge();
500
501        self.pending_finalized_subscriptions.retain(|&key, value| {
502            let retain = key > height;
503            if !retain {
504                value.drain(..).for_each(|tx| {
505                    let _ = tx.send(());
506                });
507            }
508            retain
509        });
510
511        Ok(())
512    }
513}
514
515/// Marker to indicate whether the head hash or finalized hash should be updated.
516#[derive(Debug, Clone, Copy, PartialEq, Eq)]
517enum HeadOrFinalized {
518    Head,
519    Finalized,
520}
521
522impl std::fmt::Display for HeadOrFinalized {
523    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
524        let msg = match self {
525            Self::Head => "head",
526            Self::Finalized => "finalized",
527        };
528        f.write_str(msg)
529    }
530}