1use std::{ops::RangeInclusive, pin::Pin, sync::Arc, time::Duration};
8
9use alloy_rpc_types_engine::{ForkchoiceState, PayloadId};
10use commonware_consensus::{Heightable as _, marshal::Update, types::Height};
11use commonware_cryptography::ed25519::PublicKey;
12use commonware_runtime::{Clock, ContextCell, FutureExt, Handle, Pacer, Spawner, spawn_cell};
13use commonware_utils::{Acknowledgement, acknowledgement::Exact};
14use eyre::{Report, WrapErr as _, ensure};
15use futures::{
16 FutureExt as _, StreamExt as _,
17 channel::{
18 mpsc::{self, UnboundedReceiver},
19 oneshot,
20 },
21 future::{BoxFuture, Ready, ready},
22 stream::FuturesOrdered,
23};
24use prometheus_client::metrics::counter::Counter;
25use reth_ethereum::{chainspec::EthChainSpec, rpc::eth::primitives::BlockNumHash};
26use reth_provider::BlockNumReader as _;
27use tempo_node::{TempoExecutionData, TempoFullNode};
28use tempo_payload_types::TempoPayloadAttributes;
29use tokio::select;
30use tracing::{
31 Level, Span, debug, error, error_span, info, info_span, instrument, warn, warn_span,
32};
33
34use super::{
35 Config,
36 ingress::{CanonicalizeHead, Command, Message},
37};
38use crate::{
39 consensus::{Digest, block::Block},
40 executor::ingress::CanonicalizeAndBuild,
41 utils::OptionFuture,
42};
43
/// The forkchoice state this actor currently treats as canonical, together
/// with the heights of the blocks its head and finalized hashes refer to.
///
/// The type is `Copy`: its update methods take `self` by value and return the
/// (possibly modified) copy, which lets callers compare the result against the
/// current value to find out whether an update would change anything.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct LastCanonicalized {
    /// Head, safe and finalized block hashes as sent in forkchoice updates.
    forkchoice: ForkchoiceState,
    /// Height tracked alongside `forkchoice.head_block_hash`.
    head_height: Height,
    /// Height tracked alongside `forkchoice.finalized_block_hash`.
    finalized_height: Height,
}
55
56impl LastCanonicalized {
57 fn update_finalized(self, height: Height, digest: Digest) -> Self {
68 let mut this = self;
69 if height > this.finalized_height {
70 this.finalized_height = height;
71 this.forkchoice.safe_block_hash = digest.0;
72 this.forkchoice.finalized_block_hash = digest.0;
73 }
74 if height >= this.head_height {
75 this.head_height = height;
76 this.forkchoice.head_block_hash = digest.0;
77 }
78 this
79 }
80
81 fn update_head(self, height: Height, digest: Digest) -> Self {
90 let mut this = self;
91 if height > this.finalized_height || digest.0 == this.forkchoice.finalized_block_hash {
92 this.head_height = height;
93 this.forkchoice.head_block_hash = digest.0;
94 }
95 this
96 }
97}
98
/// Actor that relays forkchoice updates and finalized blocks to the execution
/// layer.
///
/// It is constructed with `init`, started with `start`, and then processes
/// messages from its mailbox until the mailbox closes or a fatal error occurs.
pub(crate) struct Actor<TContext> {
    /// Runtime context; used for sleeping, pacing engine calls and spawning
    /// the actor's task.
    context: ContextCell<TContext>,

    /// Handle to the execution node; provides the beacon engine handle and the
    /// chain provider.
    execution_node: Arc<TempoFullNode>,

    /// Last finalized height passed in through the config. Only logged on
    /// start.
    last_consensus_finalized_height: Height,
    /// Last block number read from the execution layer provider during
    /// `init`. Only logged on start.
    last_execution_finalized_height: Height,

    /// Incoming messages for this actor.
    mailbox: mpsc::UnboundedReceiver<Message>,

    /// Mailbox of the marshal actor; used to fetch blocks by height while
    /// backfilling.
    marshal: crate::alias::marshal::Mailbox,

    /// Forkchoice state currently considered canonical. Replaced only after
    /// the execution layer accepted a forkchoice update.
    last_canonicalized: LastCanonicalized,

    /// Duration the heartbeat timer is (re)armed with.
    fcu_heartbeat_interval: Duration,

    /// Timer that triggers re-sending the current forkchoice state. It is
    /// re-armed after each heartbeat and after each accepted forkchoice update.
    fcu_heartbeat_timer: Pin<Box<dyn std::future::Future<Output = ()> + Send>>,

    /// Heights that still have to be requested from the marshal actor and
    /// forwarded to the execution layer; consumed one at a time.
    finalized_heights_to_backfill: RangeInclusive<u64>,

    /// The backfill request currently in flight, if any. Resolves to the
    /// requested height and the block the marshal actor returned for it.
    pending_backfill: OptionFuture<BoxFuture<'static, (u64, Option<Block>)>>,

    /// Finalized blocks received through the mailbox, queued in arrival order
    /// together with their originating span and acknowledgement handle. They
    /// are only processed while no backfill request is in flight.
    pending_finalizations: FuturesOrdered<Ready<(Span, Block, Exact)>>,

    /// Most recent finalized tip (height and digest) reported through the
    /// mailbox.
    latest_observed_finalized_tip: Option<(Height, Digest)>,

    /// Public key of this node, if configured; compared against the proposer
    /// of finalized blocks to drive the self-proposed counter.
    public_key: Option<PublicKey>,

    /// Metrics exported by this actor.
    metrics: Metrics,
}
146
/// Metrics exported by the executor actor.
#[derive(Clone)]
struct Metrics {
    /// Counts finalized blocks whose proposer matches this node's public key.
    finalized_blocks_proposed_by_self: Counter,
}
152
153impl Metrics {
154 fn init<TContext>(context: &TContext) -> Self
155 where
156 TContext: commonware_runtime::Metrics,
157 {
158 let finalized_blocks_proposed_by_self = Counter::default();
159 context.register(
160 "finalized_blocks_proposed_by_self",
161 "number of finalized blocks whose proposer matches this node's public key",
162 finalized_blocks_proposed_by_self.clone(),
163 );
164 Self {
165 finalized_blocks_proposed_by_self,
166 }
167 }
168}
169
170impl<TContext> Actor<TContext>
171where
172 TContext: Clock + commonware_runtime::Metrics + Pacer + Spawner,
173{
174 pub(super) fn init(
175 context: TContext,
176 config: super::Config,
177 mailbox: UnboundedReceiver<super::ingress::Message>,
178 ) -> eyre::Result<Self> {
179 let Config {
180 execution_node,
181 last_finalized_height,
182 marshal,
183 fcu_heartbeat_interval,
184 public_key,
185 } = config;
186 let metrics = Metrics::init(&context);
187 let last_execution_finalized_height = execution_node
188 .provider
189 .last_block_number()
190 .wrap_err("unable to read latest block number from execution layer")?;
191
192 let canonical_state = execution_node.provider.canonical_in_memory_state();
193 let finalized_num_hash = canonical_state
194 .get_finalized_num_hash()
195 .unwrap_or_else(|| BlockNumHash::new(0, execution_node.chain_spec().genesis_hash()));
196 let head_num_hash: BlockNumHash = canonical_state.chain_info().into();
197
198 let fcu_heartbeat_timer = Box::pin(context.sleep(fcu_heartbeat_interval));
199 let finalized_heights_to_backfill =
200 (last_execution_finalized_height + 1)..=last_finalized_height.get();
201 let last_execution_finalized_height = Height::new(last_execution_finalized_height);
202 Ok(Self {
203 context: ContextCell::new(context),
204 execution_node,
205 last_consensus_finalized_height: last_finalized_height,
206 last_execution_finalized_height,
207 mailbox,
208 marshal,
209 last_canonicalized: LastCanonicalized {
210 forkchoice: ForkchoiceState {
211 head_block_hash: head_num_hash.hash,
212 safe_block_hash: finalized_num_hash.hash,
213 finalized_block_hash: finalized_num_hash.hash,
214 },
215 head_height: Height::new(head_num_hash.number),
216 finalized_height: Height::new(finalized_num_hash.number),
217 },
218 fcu_heartbeat_interval,
219 fcu_heartbeat_timer,
220
221 finalized_heights_to_backfill,
222 pending_backfill: OptionFuture::none(),
223 pending_finalizations: FuturesOrdered::new(),
224
225 latest_observed_finalized_tip: None,
226
227 public_key,
228 metrics,
229 })
230 }
231
    /// Consumes the actor and spawns its event loop (`run`) through
    /// `spawn_cell!` on the actor's context, returning the handle of the
    /// spawned task.
    pub(crate) fn start(mut self) -> Handle<()> {
        spawn_cell!(self.context, self.run())
    }
235
    /// The actor's event loop.
    ///
    /// Each iteration first makes sure a backfill request is in flight while
    /// heights remain to be backfilled, then waits on a `biased` `select!`,
    /// i.e. branches are polled in the order written: backfill result, queued
    /// finalized blocks, finalized tip, mailbox, heartbeat timer.
    ///
    /// The loop ends when the mailbox is closed, or when forwarding a queued
    /// finalized block or handling a message returns an error.
    async fn run(mut self) {
        info_span!("start").in_scope(|| {
            info!(
                last_finalized_consensus_height = %self.last_consensus_finalized_height,
                last_finalized_execution_height = %self.last_execution_finalized_height,
                "consensus and execution layers reported last finalized heights; \
                backfilling blocks from consensus to execution if necessary",
            );
        });

        loop {
            // Request the next block to backfill, one height at a time: a new
            // request is only issued once no other request is pending.
            if self.pending_backfill.is_none()
                && let Some(height) = self.finalized_heights_to_backfill.next()
            {
                self.pending_backfill.replace({
                    let marshal = self.marshal.clone();
                    async move { (height, marshal.get_block(Height::new(height)).await) }.boxed()
                });
            }

            // True if applying the latest observed finalized tip would change
            // the current forkchoice state.
            let finalized_tip_has_moved =
                self.latest_observed_finalized_tip
                    .is_some_and(|(height, digest)| {
                        self.last_canonicalized
                            != self.last_canonicalized.update_finalized(height, digest)
                    });

            select! {
                biased;

                // A backfill request completed.
                // NOTE(review): presumably `OptionFuture` resets itself to the
                // empty state once it has yielded — confirm in `utils`.
                block = &mut self.pending_backfill => {
                    match block {
                        (height, Some(block)) => {
                            // Nobody waits on this acknowledgement; the result
                            // of forwarding the block is discarded as well.
                            let (ack, _wait) = Exact::handle();
                            let span = info_span!("backfill_on_start", %height);
                            let _ = self.forward_finalized(
                                span,
                                block,
                                ack,
                            ).await;
                        }
                        (height, None) => {
                            warn_span!("backfill_on_start", %height)
                                .in_scope(|| warn!(
                                    "marshal actor did not have block even though \
                                    it must have finalized it previously",
                                ));
                        }
                    }
                }

                // Queued finalized blocks are only forwarded while no backfill
                // request is in flight.
                Some((cause, block, ack)) = self.pending_finalizations.next()
                , if self.pending_backfill.is_none()
                => {
                    if let Err(error) = self.forward_finalized(cause, block, ack).await
                    {
                        error_span!("shutdown").in_scope(|| error!(
                            %error,
                            "executor encountered fatal fork choice update error; \
                            shutting down to prevent consensus-execution divergence"
                        ));
                        break;
                    }
                }

                // Canonicalize the latest observed finalized tip, but only if
                // doing so would change the forkchoice state and no backfill
                // request is in flight.
                Some((height, digest)) = ready(self.latest_observed_finalized_tip)
                , if finalized_tip_has_moved
                && self.pending_backfill.is_none()
                => {
                    // The outcome is not inspected: the receiver is never
                    // awaited.
                    let (response, _rx) = oneshot::channel();
                    self.canonicalize(
                        Span::current(),
                        HeadOrFinalized::Finalized,
                        height,
                        digest,
                        JustCanonicalizeOrAlsoBuild::JustCanonicalize { response },
                    )
                    .await;
                }

                msg = self.mailbox.next() => {
                    // `None` means the mailbox stream ended; stop the actor.
                    let Some(msg) = msg else { break; };
                    if let Err(error) = self.handle_message(msg).await {
                        error_span!("shutdown").in_scope(|| error!(
                            %error,
                            "executor encountered fatal fork choice update error; \
                            shutting down to prevent consensus-execution divergence"
                        ));
                        break;
                    }
                },

                // Heartbeat: re-send the current forkchoice state and re-arm
                // the timer.
                _ = (&mut self.fcu_heartbeat_timer).fuse() => {
                    self.send_forkchoice_update_heartbeat().await;
                    self.reset_fcu_heartbeat_timer();
                },
            }
        }
    }
346
347 fn reset_fcu_heartbeat_timer(&mut self) {
348 self.fcu_heartbeat_timer = Box::pin(self.context.sleep(self.fcu_heartbeat_interval));
349 }
350
351 #[instrument(skip_all)]
352 async fn send_forkchoice_update_heartbeat(&mut self) {
353 info!(
354 head_block_hash = %self.last_canonicalized.forkchoice.head_block_hash,
355 head_block_height = %self.last_canonicalized.head_height,
356 finalized_block_hash = %self.last_canonicalized.forkchoice.finalized_block_hash,
357 finalized_block_height = %self.last_canonicalized.finalized_height,
358 "sending FCU",
359 );
360
361 let fcu_response = self
362 .execution_node
363 .add_ons_handle
364 .beacon_engine_handle
365 .fork_choice_updated(self.last_canonicalized.forkchoice, None)
366 .pace(&self.context, Duration::from_millis(20))
367 .await;
368
369 match fcu_response {
370 Ok(response) if response.is_invalid() => {
371 warn!(
372 payload_status = %response.payload_status,
373 "execution layer reported FCU status",
374 );
375 }
376 Ok(response) => {
377 info!(
378 payload_status = %response.payload_status,
379 "execution layer reported FCU status",
380 );
381 }
382 Err(error) => {
383 warn!(
384 error = %Report::new(error),
385 "failed sending FCU to execution layer",
386 );
387 }
388 }
389 }
390
391 async fn handle_message(&mut self, message: Message) -> eyre::Result<()> {
392 let cause = message.cause;
393 let is_backfilling =
394 self.pending_backfill.is_some() || !self.finalized_heights_to_backfill.is_empty();
395 match message.command {
396 Command::CanonicalizeHead(..) | Command::CanonicalizeAndBuild(..) if is_backfilling => {
397 info_span!("handle_message")
398 .in_scope(|| info!("request to canonicalize dropped while backfilling"));
399 }
400 Command::CanonicalizeHead(CanonicalizeHead {
401 height,
402 digest,
403 response,
404 }) => {
405 self.canonicalize(
406 cause,
407 HeadOrFinalized::Head,
408 height,
409 digest,
410 JustCanonicalizeOrAlsoBuild::JustCanonicalize { response },
411 )
412 .await;
413 }
414 Command::CanonicalizeAndBuild(CanonicalizeAndBuild {
415 height,
416 digest,
417 attributes,
418 response,
419 }) => {
420 self.canonicalize(
421 cause,
422 HeadOrFinalized::Head,
423 height,
424 digest,
425 JustCanonicalizeOrAlsoBuild::AlsoBuild {
426 response,
427 attributes: Box::new(*attributes),
428 },
429 )
430 .await;
431 }
432 Command::Finalize(finalized) => match *finalized {
433 Update::Tip(_, height, digest) => {
434 self.latest_observed_finalized_tip.replace((height, digest));
435 }
436 Update::Block(block, acknowledgement) => {
437 self.pending_finalizations
438 .push_back(ready((cause, block, acknowledgement)));
439 }
440 },
441 }
442 Ok(())
443 }
444
445 #[instrument(
447 skip_all,
448 parent = &cause,
449 fields(
450 head.height = %height,
451 head.digest = %digest,
452 %head_or_finalized,
453 ),
454 )]
455 async fn canonicalize(
456 &mut self,
457 cause: Span,
458 head_or_finalized: HeadOrFinalized,
459 height: Height,
460 digest: Digest,
461 maybe_build: JustCanonicalizeOrAlsoBuild,
462 ) {
463 let new_canonicalized = match head_or_finalized {
464 HeadOrFinalized::Head => self.last_canonicalized.update_head(height, digest),
465 HeadOrFinalized::Finalized => self.last_canonicalized.update_finalized(height, digest),
466 };
467
468 if new_canonicalized == self.last_canonicalized
469 && let JustCanonicalizeOrAlsoBuild::JustCanonicalize { response } = maybe_build
470 {
471 info!("would not change forkchoice state; not sending it to the execution layer");
472 let _ = response.send(Ok(()));
473 return;
474 }
475
476 info!(
477 head_block_hash = %new_canonicalized.forkchoice.head_block_hash,
478 head_block_height = %new_canonicalized.head_height,
479 finalized_block_hash = %new_canonicalized.forkchoice.finalized_block_hash,
480 finalized_block_height = %new_canonicalized.finalized_height,
481 "sending forkchoice-update",
482 );
483 let fcu_response = match self
484 .execution_node
485 .add_ons_handle
486 .beacon_engine_handle
487 .fork_choice_updated(
488 new_canonicalized.forkchoice,
489 maybe_build.attributes().cloned(),
490 )
491 .pace(&self.context, Duration::from_millis(20))
492 .await
493 .wrap_err("failed requesting execution layer to update forkchoice state")
494 {
495 Err(error) => {
496 maybe_build.send_error(error);
497 return;
498 }
499 Ok(response) => response,
500 };
501
502 debug!(
503 payload_status = %fcu_response.payload_status,
504 "execution layer reported FCU status",
505 );
506
507 if fcu_response.is_invalid() {
508 maybe_build.send_error(
509 Report::msg(fcu_response.payload_status)
510 .wrap_err("execution layer responded with error for forkchoice-update"),
511 );
512 return;
513 }
514
515 match maybe_build {
516 JustCanonicalizeOrAlsoBuild::JustCanonicalize { response } => {
517 let _ = response.send(Ok(()));
518 }
519 JustCanonicalizeOrAlsoBuild::AlsoBuild { response, .. } => {
520 if let Some(payload_id) = fcu_response.payload_id {
521 let _ = response.send(Ok(payload_id));
522 }
523 }
524 }
525 self.last_canonicalized = new_canonicalized;
526 self.reset_fcu_heartbeat_timer();
527 }
528
529 #[instrument(
549 skip_all,
550 parent = &cause,
551 fields(
552 block.digest = %block.digest(),
553 block.height = %block.height(),
554 ),
555 err(level = Level::WARN),
556 ret,
557 )]
558 async fn forward_finalized(
559 &mut self,
560 cause: Span,
561 block: Block,
562 acknowledgment: Exact,
563 ) -> eyre::Result<()> {
564 let (response, rx) = oneshot::channel();
565 self.canonicalize(
566 Span::current(),
567 HeadOrFinalized::Finalized,
568 block.height(),
569 block.digest(),
570 JustCanonicalizeOrAlsoBuild::JustCanonicalize { response },
571 )
572 .await;
573 rx.await
574 .wrap_err("executor dropped channel")
575 .and_then(|res| res)?;
576
577 let block = block.into_inner();
578 let consensus_context = block.header().consensus_context;
579 let payload_status = self
580 .execution_node
581 .add_ons_handle
582 .beacon_engine_handle
583 .new_payload(TempoExecutionData {
584 block: Arc::new(block),
585 validator_set: None,
587 })
588 .pace(&self.context, Duration::from_millis(20))
589 .await
590 .wrap_err(
591 "failed sending new-payload request to execution engine to \
592 query payload status of finalized block",
593 )?;
594
595 ensure!(
596 payload_status.is_valid() || payload_status.is_syncing(),
597 "this is a problem: payload status of block-to-be-finalized was \
598 neither valid nor syncing: `{payload_status}`"
599 );
600
601 if let Some(public_key) = self.public_key.as_ref()
602 && consensus_context
603 .is_some_and(|context| &PublicKey::from(context.proposer.get()) == public_key)
604 {
605 self.metrics.finalized_blocks_proposed_by_self.inc();
606 }
607
608 acknowledgment.acknowledge();
609
610 Ok(())
611 }
612}
613
/// What a canonicalization request should do beyond updating the forkchoice
/// state, bundled with the channel its outcome is reported on.
enum JustCanonicalizeOrAlsoBuild {
    /// Only update the forkchoice state.
    JustCanonicalize {
        /// Receives `Ok(())` on success or the error that occurred.
        response: oneshot::Sender<eyre::Result<()>>,
    },
    /// Update the forkchoice state and pass payload attributes along with it.
    AlsoBuild {
        /// Receives the payload ID returned by the execution layer, or the
        /// error that occurred.
        response: oneshot::Sender<eyre::Result<PayloadId>>,
        /// Payload attributes sent together with the forkchoice update.
        attributes: Box<TempoPayloadAttributes>,
    },
}
624
625impl JustCanonicalizeOrAlsoBuild {
626 fn attributes(&self) -> Option<&TempoPayloadAttributes> {
627 match self {
628 Self::JustCanonicalize { .. } => None,
629 Self::AlsoBuild { attributes, .. } => Some(attributes),
630 }
631 }
632 fn send_error(self, error: eyre::Report) {
633 match self {
634 Self::JustCanonicalize { response } => {
635 let _ = response.send(Err(error));
636 }
637 Self::AlsoBuild { response, .. } => {
638 let _ = response.send(Err(error));
639 }
640 }
641 }
642}
643
/// Whether a block passed to canonicalization is applied as the new head or
/// as a newly finalized block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HeadOrFinalized {
    /// The block becomes the head of the chain.
    Head,
    /// The block is finalized.
    Finalized,
}

impl std::fmt::Display for HeadOrFinalized {
    /// Writes `head` or `finalized`.
    ///
    /// Uses [`std::fmt::Formatter::pad`] so that width, fill, alignment and
    /// precision flags given by the caller are honored; with a plain `{}` the
    /// output is exactly the bare word.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(match self {
            Self::Head => "head",
            Self::Finalized => "finalized",
        })
    }
}