1use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};
8
9use alloy_rpc_types_engine::ForkchoiceState;
10use commonware_consensus::{Heightable as _, marshal::Update, types::Height};
11
12use commonware_runtime::{
13 Clock, ContextCell, FutureExt, Handle, Metrics, Pacer, Spawner, spawn_cell,
14};
15use commonware_utils::{Acknowledgement, acknowledgement::Exact};
16use eyre::{OptionExt as _, Report, WrapErr as _, ensure};
17use futures::{
18 FutureExt as _, StreamExt as _,
19 channel::{
20 mpsc::{self, UnboundedReceiver},
21 oneshot,
22 },
23 select_biased,
24};
25use reth_provider::{BlockHashReader, BlockNumReader as _};
26use tempo_node::{TempoExecutionData, TempoFullNode};
27use tracing::{
28 Level, Span, debug, error, error_span, info, info_span, instrument, warn, warn_span,
29};
30
31use super::{
32 Config,
33 ingress::{CanonicalizeHead, Command, Message, SubscribeFinalized},
34};
35use crate::consensus::{Digest, block::Block};
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43struct LastCanonicalized {
44 forkchoice: ForkchoiceState,
45 head_height: Height,
46 finalized_height: Height,
47}
48
49impl LastCanonicalized {
50 fn update_finalized(self, height: Height, digest: Digest) -> Self {
61 let mut this = self;
62 if height > this.finalized_height {
63 this.finalized_height = height;
64 this.forkchoice.safe_block_hash = digest.0;
65 this.forkchoice.finalized_block_hash = digest.0;
66 }
67 if height >= this.head_height {
68 this.head_height = height;
69 this.forkchoice.head_block_hash = digest.0;
70 }
71 this
72 }
73
74 fn update_head(self, height: Height, digest: Digest) -> Self {
83 let mut this = self;
84 if height > this.finalized_height {
85 this.head_height = height;
86 this.forkchoice.head_block_hash = digest.0;
87 }
88 this
89 }
90}
91
92pub(crate) struct Actor<TContext> {
93 context: ContextCell<TContext>,
94
95 execution_node: TempoFullNode,
98
99 last_consensus_finalized_height: Height,
100 last_execution_finalized_height: Height,
101
102 mailbox: mpsc::UnboundedReceiver<Message>,
105
106 marshal: crate::alias::marshal::Mailbox,
108
109 last_canonicalized: LastCanonicalized,
110
111 fcu_heartbeat_interval: Duration,
114
115 fcu_heartbeat_timer: Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
117
118 pending_finalized_subscriptions: BTreeMap<Height, Vec<oneshot::Sender<()>>>,
121}
122
123impl<TContext> Actor<TContext>
124where
125 TContext: Clock + Metrics + Pacer + Spawner,
126{
127 pub(super) fn init(
128 context: TContext,
129 config: super::Config,
130 mailbox: UnboundedReceiver<super::ingress::Message>,
131 ) -> eyre::Result<Self> {
132 let Config {
133 execution_node,
134 last_finalized_height,
135 marshal,
136 fcu_heartbeat_interval,
137 } = config;
138 let last_execution_finalized_height = execution_node
139 .provider
140 .last_block_number()
141 .wrap_err("unable to read latest block number from execution layer")?;
142 let last_finalized_block_hash = execution_node
143 .provider
144 .block_hash(last_execution_finalized_height)
145 .map_or_else(
146 |e| Err(Report::new(e)),
147 |hash| hash.ok_or_eyre("execution layer does not have the block hash"),
148 )
149 .wrap_err("failed to read the last finalized block hash")?;
150 let fcu_heartbeat_timer = Box::pin(context.sleep(fcu_heartbeat_interval));
151 Ok(Self {
152 context: ContextCell::new(context),
153 execution_node,
154 last_consensus_finalized_height: last_finalized_height,
155 last_execution_finalized_height: Height::new(last_execution_finalized_height),
156 mailbox,
157 marshal,
158 last_canonicalized: LastCanonicalized {
159 forkchoice: ForkchoiceState {
160 head_block_hash: last_finalized_block_hash,
161 safe_block_hash: last_finalized_block_hash,
162 finalized_block_hash: last_finalized_block_hash,
163 },
164 head_height: Height::zero(),
165 finalized_height: Height::zero(),
166 },
167 fcu_heartbeat_interval,
168 fcu_heartbeat_timer,
169 pending_finalized_subscriptions: BTreeMap::new(),
170 })
171 }
172
173 pub(crate) fn start(mut self) -> Handle<()> {
174 spawn_cell!(self.context, self.run().await)
175 }
176
177 async fn run(mut self) {
178 info_span!("start").in_scope(|| {
179 info!(
180 last_finalized_consensus_height = %self.last_consensus_finalized_height,
181 last_finalized_execution_height = %self.last_execution_finalized_height,
182 "consensus and execution layers reported last finalized heights; \
183 backfilling blocks from consensus to execution if necessary",
184 );
185 });
186
187 let mut backfill_on_start = {
188 let marshal = self.marshal.clone();
189 std::pin::pin!(
190 futures::stream::iter(
191 self.last_execution_finalized_height.get() + 1
192 ..=self.last_consensus_finalized_height.get(),
193 )
194 .then(move |height| {
195 let marshal = marshal.clone();
196 async move { (height, marshal.get_block(Height::new(height)).await) }
197 })
198 .fuse()
199 )
200 };
201
202 loop {
203 select_biased! {
204 backfill = backfill_on_start.next() => {
205 match backfill {
206 Some((height, Some(block))) => {
207 let (ack, _wait) = Exact::handle();
208 let span = info_span!("backfill_on_start", height);
209 let _ = self.forward_finalized(
210 span,
211 block,
212 ack,
213 ).await;
214 }
215 Some((height, None)) => {
216 warn_span!("backfill_on_start", height)
217 .in_scope(|| warn!(
218 "marshal actor did not have block even though \
219 it must have finalized it previously",
220 ));
221 }
222 None => {
223 info_span!("backfill_on_start")
224 .in_scope(|| info!(
225 "no more blocks to backfill from consensus to \
226 execution layer")
227 );
228 }
229 }
230 },
231
232 msg = self.mailbox.next() => {
233 let Some(msg) = msg else { break; };
234 if let Err(error) = self.handle_message(msg).await {
241 error_span!("shutdown").in_scope(|| error!(
242 %error,
243 "executor encountered fatal fork choice update error; \
244 shutting down to prevent consensus-execution divergence"
245 ));
246 break;
247 }
248 },
249
250 _ = (&mut self.fcu_heartbeat_timer).fuse() => {
251 self.send_forkchoice_update_heartbeat().await;
252 self.reset_fcu_heartbeat_timer();
253 },
254 }
255 }
256 }
257
258 fn reset_fcu_heartbeat_timer(&mut self) {
259 self.fcu_heartbeat_timer = Box::pin(self.context.sleep(self.fcu_heartbeat_interval));
260 }
261
262 #[instrument(skip_all)]
263 async fn send_forkchoice_update_heartbeat(&mut self) {
264 info!(
265 head_block_hash = %self.last_canonicalized.forkchoice.head_block_hash,
266 head_block_height = %self.last_canonicalized.head_height,
267 finalized_block_hash = %self.last_canonicalized.forkchoice.finalized_block_hash,
268 finalized_block_height = %self.last_canonicalized.finalized_height,
269 "sending FCU",
270 );
271
272 let fcu_response = self
273 .execution_node
274 .add_ons_handle
275 .beacon_engine_handle
276 .fork_choice_updated(
277 self.last_canonicalized.forkchoice,
278 None,
279 reth_node_builder::EngineApiMessageVersion::V3,
280 )
281 .pace(&self.context, Duration::from_millis(20))
282 .await;
283
284 match fcu_response {
285 Ok(response) if response.is_invalid() => {
286 warn!(
287 payload_status = %response.payload_status,
288 "execution layer reported FCU status",
289 );
290 }
291 Ok(response) => {
292 info!(
293 payload_status = %response.payload_status,
294 "execution layer reported FCU status",
295 );
296 }
297 Err(error) => {
298 warn!(
299 error = %Report::new(error),
300 "failed sending FCU to execution layer",
301 );
302 }
303 }
304 }
305
306 async fn handle_message(&mut self, message: Message) -> eyre::Result<()> {
307 let cause = message.cause;
308 match message.command {
309 Command::CanonicalizeHead(CanonicalizeHead {
310 height,
311 digest,
312 ack,
313 }) => {
314 let _ = self
317 .canonicalize(cause, HeadOrFinalized::Head, height, digest, ack)
318 .await;
319 }
320 Command::Finalize(finalized) => {
321 self.finalize(cause, *finalized)
322 .await
323 .wrap_err("failed handling finalization")?;
324 }
325 Command::SubscribeFinalized(SubscribeFinalized { height, response }) => {
326 if self.last_canonicalized.finalized_height >= height {
327 let _ = response.send(());
328 } else {
329 self.pending_finalized_subscriptions
330 .entry(height)
331 .or_default()
332 .push(response);
333 }
334 }
335 }
336 Ok(())
337 }
338
339 #[instrument(
341 skip_all,
342 parent = &cause,
343 fields(
344 head.height = %height,
345 head.digest = %digest,
346 %head_or_finalized,
347 ),
348 err,
349 )]
350 async fn canonicalize(
351 &mut self,
352 cause: Span,
353 head_or_finalized: HeadOrFinalized,
354 height: Height,
355 digest: Digest,
356 ack: oneshot::Sender<()>,
357 ) -> eyre::Result<()> {
358 let new_canonicalized = match head_or_finalized {
359 HeadOrFinalized::Head => self.last_canonicalized.update_head(height, digest),
360 HeadOrFinalized::Finalized => self.last_canonicalized.update_finalized(height, digest),
361 };
362
363 if new_canonicalized == self.last_canonicalized {
364 info!("would not change forkchoice state; not sending it to the execution layer");
365 let _ = ack.send(());
366 return Ok(());
367 }
368
369 info!(
370 head_block_hash = %new_canonicalized.forkchoice.head_block_hash,
371 head_block_height = %new_canonicalized.head_height,
372 finalized_block_hash = %new_canonicalized.forkchoice.finalized_block_hash,
373 finalized_block_height = %new_canonicalized.finalized_height,
374 "sending forkchoice-update",
375 );
376 let fcu_response = self
377 .execution_node
378 .add_ons_handle
379 .beacon_engine_handle
380 .fork_choice_updated(
381 new_canonicalized.forkchoice,
382 None,
383 reth_node_builder::EngineApiMessageVersion::V3,
384 )
385 .pace(&self.context, Duration::from_millis(20))
386 .await
387 .wrap_err("failed requesting execution layer to update forkchoice state")?;
388
389 debug!(
390 payload_status = %fcu_response.payload_status,
391 "execution layer reported FCU status",
392 );
393
394 if fcu_response.is_invalid() {
395 return Err(Report::msg(fcu_response.payload_status)
396 .wrap_err("execution layer responded with error for forkchoice-update"));
397 }
398
399 let _ = ack.send(());
400 self.last_canonicalized = new_canonicalized;
401 self.reset_fcu_heartbeat_timer();
402
403 Ok(())
404 }
405
406 #[instrument(parent = &cause, skip_all)]
407 async fn finalize(&mut self, cause: Span, finalized: Update<Block>) -> eyre::Result<()> {
409 match finalized {
410 Update::Tip(_, height, digest) => {
411 self.canonicalize(
412 Span::current(),
413 HeadOrFinalized::Finalized,
414 height,
415 digest,
416 oneshot::channel().0,
417 )
418 .await
419 .wrap_err("failed canonicalizing finalization tip")?;
420 }
421 Update::Block(block, acknowledgment) => {
422 self.forward_finalized(Span::current(), block, acknowledgment)
423 .await
424 .wrap_err("failed forwarding finalized block to execution layer")?;
425 }
426 }
427 Ok(())
428 }
429
430 #[instrument(
450 skip_all,
451 parent = &cause,
452 fields(
453 block.digest = %block.digest(),
454 block.height = %block.height(),
455 ),
456 err(level = Level::WARN),
457 ret,
458 )]
459 async fn forward_finalized(
460 &mut self,
461 cause: Span,
462 block: Block,
463 acknowledgment: Exact,
464 ) -> eyre::Result<()> {
465 let height = block.height();
466 self.canonicalize(
467 Span::current(),
468 HeadOrFinalized::Finalized,
469 block.height(),
470 block.digest(),
471 oneshot::channel().0,
472 )
473 .await
474 .wrap_err("failed canonicalizing finalized block")?;
475
476 let block = block.into_inner();
477 let payload_status = self
478 .execution_node
479 .add_ons_handle
480 .beacon_engine_handle
481 .new_payload(TempoExecutionData {
482 block: Arc::new(block),
483 validator_set: None,
485 })
486 .pace(&self.context, Duration::from_millis(20))
487 .await
488 .wrap_err(
489 "failed sending new-payload request to execution engine to \
490 query payload status of finalized block",
491 )?;
492
493 ensure!(
494 payload_status.is_valid() || payload_status.is_syncing(),
495 "this is a problem: payload status of block-to-be-finalized was \
496 neither valid nor syncing: `{payload_status}`"
497 );
498
499 acknowledgment.acknowledge();
500
501 self.pending_finalized_subscriptions.retain(|&key, value| {
502 let retain = key > height;
503 if !retain {
504 value.drain(..).for_each(|tx| {
505 let _ = tx.send(());
506 });
507 }
508 retain
509 });
510
511 Ok(())
512 }
513}
514
515#[derive(Debug, Clone, Copy, PartialEq, Eq)]
517enum HeadOrFinalized {
518 Head,
519 Finalized,
520}
521
522impl std::fmt::Display for HeadOrFinalized {
523 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
524 let msg = match self {
525 Self::Head => "head",
526 Self::Finalized => "finalized",
527 };
528 f.write_str(msg)
529 }
530}