1use std::{sync::Arc, time::Duration};
11
12use alloy_primitives::B256;
13use alloy_rpc_types_engine::ForkchoiceState;
14use commonware_consensus::{Block as _, marshal::Update};
15
16use commonware_macros::select;
17use commonware_runtime::{ContextCell, FutureExt, Handle, Metrics, Pacer, Spawner, spawn_cell};
18use commonware_utils::{Acknowledgement, acknowledgement::Exact};
19use eyre::{Report, WrapErr as _, ensure, eyre};
20use futures::{
21 StreamExt as _,
22 channel::{mpsc, oneshot},
23};
24use reth_provider::BlockNumReader as _;
25use tempo_node::{TempoExecutionData, TempoFullNode};
26use tracing::{Level, Span, debug, info, instrument, warn};
27
28use crate::consensus::{Digest, block::Block};
29
30pub(super) struct Builder {
31 pub(super) execution_node: TempoFullNode,
34
35 pub(super) genesis_block: Arc<Block>,
38
39 pub(super) marshal: crate::alias::marshal::Mailbox,
41}
42
43impl Builder {
44 pub(super) fn build<TContext>(self, context: TContext) -> Executor<TContext>
46 where
47 TContext: Spawner,
48 {
49 let Self {
50 execution_node,
51 genesis_block,
52 marshal,
53 } = self;
54
55 let (to_me, from_app) = mpsc::unbounded();
56
57 let my_mailbox = ExecutorMailbox { inner: to_me };
58
59 let genesis_hash = genesis_block.block_hash();
60 Executor {
61 context: ContextCell::new(context),
62 execution_node,
63 mailbox: from_app,
64 marshal,
65 my_mailbox,
66 last_canonicalized: LastCanonicalized {
67 forkchoice: ForkchoiceState {
68 head_block_hash: genesis_hash,
69 safe_block_hash: genesis_hash,
70 finalized_block_hash: genesis_hash,
71 },
72 head_height: 0,
73 finalized_height: 0,
74 },
75 }
76 }
77}
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
85struct LastCanonicalized {
86 forkchoice: ForkchoiceState,
87 head_height: u64,
88 finalized_height: u64,
89}
90
91impl LastCanonicalized {
92 fn update_finalized(self, height: u64, hash: B256) -> Self {
103 let mut this = self;
104 if height > this.finalized_height {
105 this.finalized_height = height;
106 this.forkchoice.safe_block_hash = hash;
107 this.forkchoice.finalized_block_hash = hash;
108 }
109 if height >= this.head_height {
110 this.head_height = height;
111 this.forkchoice.head_block_hash = hash;
112 }
113 this
114 }
115
116 fn update_head(self, height: u64, hash: B256) -> Self {
125 let mut this = self;
126 if height > this.finalized_height {
127 this.head_height = height;
128 this.forkchoice.head_block_hash = hash;
129 }
130 this
131 }
132}
133
134pub(super) struct Executor<TContext> {
135 context: ContextCell<TContext>,
136
137 execution_node: TempoFullNode,
140
141 mailbox: mpsc::UnboundedReceiver<Message>,
144
145 marshal: crate::alias::marshal::Mailbox,
147
148 my_mailbox: ExecutorMailbox,
151
152 last_canonicalized: LastCanonicalized,
153}
154
155impl<TContext> Executor<TContext>
156where
157 TContext: Metrics + Pacer + Spawner,
158{
159 pub(super) fn mailbox(&self) -> &ExecutorMailbox {
160 &self.my_mailbox
161 }
162
163 pub(super) fn start(mut self) -> Handle<()> {
164 spawn_cell!(self.context, self.run().await)
165 }
166
167 async fn run(mut self) {
168 loop {
169 select! {
170 msg = self.mailbox.next() => {
171 let Some(msg) = msg else { break; };
172 self.handle_message(msg).await;
179 },
180 }
181 }
182 }
183
184 async fn handle_message(&mut self, message: Message) {
185 let cause = message.cause;
186 match message.command {
187 Command::CanonicalizeHead {
188 height,
189 digest,
190 ack,
191 } => {
192 let _ = self
193 .canonicalize(cause, HeadOrFinalized::Head, height, digest, ack)
194 .await;
195 }
196 Command::Finalize(finalized) => {
197 let _ = self.finalize(cause, *finalized).await;
198 }
199 }
200 }
201
202 #[instrument(
204 skip_all,
205 parent = &cause,
206 fields(
207 head.height = height,
208 head.digest = %digest,
209 %head_or_finalized,
210 ),
211 err,
212 )]
213 async fn canonicalize(
214 &mut self,
215 cause: Span,
216 head_or_finalized: HeadOrFinalized,
217 height: u64,
218 digest: Digest,
219 ack: oneshot::Sender<()>,
220 ) -> eyre::Result<()> {
221 let new_canonicalized = match head_or_finalized {
222 HeadOrFinalized::Head => self.last_canonicalized.update_head(height, digest.0),
223 HeadOrFinalized::Finalized => {
224 self.last_canonicalized.update_finalized(height, digest.0)
225 }
226 };
227
228 if new_canonicalized == self.last_canonicalized {
229 info!("would not change forkchoice state; not sending it to the execution layer");
230 let _ = ack.send(());
231 return Ok(());
232 }
233
234 info!(
235 head_block_hash = %new_canonicalized.forkchoice.head_block_hash,
236 head_block_height = new_canonicalized.head_height,
237 finalized_block_hash = %new_canonicalized.forkchoice.finalized_block_hash,
238 finalized_block_height = new_canonicalized.finalized_height,
239 "sending forkchoice-update",
240 );
241 let fcu_response = self
242 .execution_node
243 .add_ons_handle
244 .beacon_engine_handle
245 .fork_choice_updated(
246 new_canonicalized.forkchoice,
247 None,
248 reth_node_builder::EngineApiMessageVersion::V3,
249 )
250 .pace(&self.context, Duration::from_millis(20))
251 .await
252 .wrap_err("failed requesting execution layer to update forkchoice state")?;
253
254 debug!(
255 payload_status = %fcu_response.payload_status,
256 "execution layer reported FCU status",
257 );
258
259 if fcu_response.is_invalid() {
260 return Err(Report::msg(fcu_response.payload_status)
261 .wrap_err("execution layer responded with error for forkchoice-update"));
262 }
263
264 let _ = ack.send(());
265 self.last_canonicalized = new_canonicalized;
266
267 Ok(())
268 }
269
270 #[instrument(parent = &cause, skip_all)]
271 async fn finalize(&mut self, cause: Span, finalized: super::ingress::Finalized) {
273 match finalized.inner {
274 Update::Tip(height, digest) => {
275 let _: Result<_, _> = self
276 .canonicalize(
277 Span::current(),
278 HeadOrFinalized::Finalized,
279 height,
280 digest,
281 oneshot::channel().0,
282 )
283 .await;
284 }
285 Update::Block(block, acknowledgment) => {
286 let _: Result<_, _> = self
287 .forward_finalized(Span::current(), block, acknowledgment)
288 .await;
289 }
290 }
291 }
292
293 #[instrument(
313 skip_all,
314 parent = &cause,
315 fields(
316 block.digest = %block.digest(),
317 block.height = block.height(),
318 ),
319 err(level = Level::WARN),
320 ret,
321 )]
322 async fn forward_finalized(
323 &mut self,
324 cause: Span,
325 block: Block,
326 acknowledgment: Exact,
327 ) -> eyre::Result<()> {
328 if let Err(error) = self
329 .canonicalize(
330 Span::current(),
331 HeadOrFinalized::Finalized,
332 block.height(),
333 block.digest(),
334 oneshot::channel().0,
335 )
336 .await
337 {
338 warn!(
339 %error,
340 "failed canonicalizing finalized block; will still attempt \
341 forwarding it to the execution layer",
342 );
343 }
344
345 if let Ok(execution_height) = self
346 .execution_node
347 .provider
348 .last_block_number()
349 .map_err(Report::new)
350 .inspect_err(|error| {
351 warn!(
352 %error,
353 "failed getting last finalized block from execution layer, will \
354 finalize forward block to execution layer without extra checks, \
355 but it might fail"
356 )
357 })
358 && execution_height + 1 < block.height()
359 {
360 info!(
361 execution.finalized_height = execution_height,
362 "hole detected; consensus attempts to finalize block with gaps \
363 on the execution layer; filling them in first",
364 );
365 let _ = self.fill_holes(execution_height + 1, block.height()).await;
366 }
367
368 let block = block.into_inner();
369 let payload_status = self
370 .execution_node
371 .add_ons_handle
372 .beacon_engine_handle
373 .new_payload(TempoExecutionData {
374 block: Arc::new(block),
375 validator_set: None,
377 })
378 .pace(&self.context, Duration::from_millis(20))
379 .await
380 .wrap_err(
381 "failed sending new-payload request to execution engine to \
382 query payload status of finalized block",
383 )?;
384
385 ensure!(
386 payload_status.is_valid() || payload_status.is_syncing(),
387 "this is a problem: payload status of block-to-be-finalized was \
388 neither valid nor syncing: `{payload_status}`"
389 );
390
391 acknowledgment.acknowledge();
392
393 Ok(())
394 }
395
396 #[instrument(
398 skip_all,
399 fields(from, to),
400 err(level = Level::WARN),
401 )]
402 async fn fill_holes(&mut self, from: u64, to: u64) -> eyre::Result<()> {
403 ensure!(from <= to, "backfill range is negative");
404
405 for height in from..to {
406 let block = self.marshal.get_block(height).await.ok_or_else(|| {
407 eyre!(
408 "marshal actor does not know about block `{height}`, but \
409 this function expects that it has all blocks in the provided \
410 range",
411 )
412 })?;
413
414 let digest = block.digest();
415
416 let payload_status = self
417 .execution_node
418 .add_ons_handle
419 .beacon_engine_handle
420 .new_payload(TempoExecutionData {
421 block: Arc::new(block.into_inner()),
422 validator_set: None,
424 })
425 .pace(&self.context, Duration::from_millis(20))
426 .await
427 .wrap_err(
428 "failed sending new-payload request to execution engine to \
429 query payload status of finalized block",
430 )?;
431
432 ensure!(
433 payload_status.is_valid() || payload_status.is_syncing(),
434 "this is a problem: payload status of block `{digest}` we are \
435 trying to backfill is neither valid nor syncing: \
436 `{payload_status}`"
437 );
438 }
439 Ok(())
440 }
441}
442
/// Selects which part of the forkchoice state a canonicalization request
/// is meant to move.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum HeadOrFinalized {
    /// Move the head of the chain.
    Head,
    /// Move the finalized block (and the head too, if it would otherwise
    /// fall behind).
    Finalized,
}
449
450impl std::fmt::Display for HeadOrFinalized {
451 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
452 let msg = match self {
453 Self::Head => "head",
454 Self::Finalized => "finalized",
455 };
456 f.write_str(msg)
457 }
458}
459
460#[derive(Clone, Debug)]
461pub(super) struct ExecutorMailbox {
462 inner: mpsc::UnboundedSender<Message>,
463}
464
465impl ExecutorMailbox {
466 pub(super) fn canonicalize_head(
468 &self,
469 height: u64,
470 digest: Digest,
471 ) -> eyre::Result<oneshot::Receiver<()>> {
472 let (tx, rx) = oneshot::channel();
473 self.inner
474 .unbounded_send(Message {
475 cause: Span::current(),
476 command: Command::CanonicalizeHead {
477 height,
478 digest,
479 ack: tx,
480 },
481 })
482 .wrap_err("failed sending canonicalize request to agent, this means it exited")?;
483
484 Ok(rx)
485 }
486
487 pub(super) fn forward_finalized(
489 &self,
490 finalized: super::ingress::Finalized,
491 ) -> eyre::Result<()> {
492 self.inner
493 .unbounded_send(Message {
494 cause: Span::current(),
495 command: Command::Finalize(finalized.into()),
496 })
497 .wrap_err("failed sending finalization request to agent, this means it exited")
498 }
499}
500
501#[derive(Debug)]
502struct Message {
503 cause: Span,
504 command: Command,
505}
506
507#[derive(Debug)]
508enum Command {
509 CanonicalizeHead {
511 height: u64,
512 digest: Digest,
513 ack: oneshot::Sender<()>,
514 },
515 Finalize(Box<super::ingress::Finalized>),
517}