1use std::{collections::VecDeque, ops::RangeInclusive, sync::Arc, time::Duration};
8
9use alloy_rpc_types_engine::{ForkchoiceState, PayloadId};
10use commonware_consensus::{Heightable as _, marshal::Update, types::Height};
11use commonware_cryptography::ed25519::PublicKey;
12use commonware_runtime::{
13 Clock, ContextCell, FutureExt, Handle, Metrics as RuntimeMetrics, Pacer, Spawner, spawn_cell,
14};
15use commonware_utils::{Acknowledgement, acknowledgement::Exact};
16use eyre::{Report, WrapErr as _, ensure};
17use futures::{
18 FutureExt as _, StreamExt as _,
19 channel::{
20 mpsc::{self, UnboundedReceiver},
21 oneshot,
22 },
23 future::BoxFuture,
24 stream::FuturesUnordered,
25};
26use prometheus_client::metrics::counter::Counter;
27use reth_ethereum::{chainspec::EthChainSpec, rpc::eth::primitives::BlockNumHash};
28use reth_node_builder::PayloadKind;
29use tempo_node::{TempoExecutionData, TempoFullNode};
30use tempo_payload_types::{TempoBuiltPayload, TempoPayloadAttributes};
31use tokio::select;
32use tracing::{
33 Level, Span, debug, error, error_span, info, info_span, instrument, warn, warn_span,
34};
35
36use super::{
37 Config,
38 ingress::{CanonicalizeAndBuild, CanonicalizeHead, Command, Message},
39};
40use crate::{
41 consensus::{Digest, block::Block},
42 utils::OptionFuture,
43};
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51struct LastCanonicalized {
52 forkchoice: ForkchoiceState,
53 head_height: Height,
54 finalized_height: Height,
55}
56
57impl LastCanonicalized {
58 fn update_finalized(self, height: Height, digest: Digest) -> Self {
69 let mut this = self;
70 if height > this.finalized_height {
71 this.finalized_height = height;
72 this.forkchoice.safe_block_hash = digest.0;
73 this.forkchoice.finalized_block_hash = digest.0;
74 }
75 if height >= this.head_height {
76 this.head_height = height;
77 this.forkchoice.head_block_hash = digest.0;
78 }
79 this
80 }
81
82 fn update_head(self, height: Height, digest: Digest) -> Self {
91 let mut this = self;
92 if height > this.finalized_height || digest.0 == this.forkchoice.finalized_block_hash {
93 this.head_height = height;
94 this.forkchoice.head_block_hash = digest.0;
95 }
96 this
97 }
98}
99
100pub(crate) struct Actor<TContext> {
101 context: ContextCell<TContext>,
102
103 execution_node: Arc<TempoFullNode>,
106
107 last_consensus_finalized_height: Height,
108 last_execution_finalized_height: Height,
109
110 mailbox: mpsc::UnboundedReceiver<Message>,
113
114 marshal: crate::alias::marshal::Mailbox,
116
117 last_canonicalized: LastCanonicalized,
118
119 fcu_heartbeat_interval: Duration,
122
123 fcu_heartbeat_timer: OptionFuture<BoxFuture<'static, ()>>,
127
128 finalized_heights_to_backfill: RangeInclusive<u64>,
132
133 pending_backfill: OptionFuture<BoxFuture<'static, (u64, Option<Block>)>>,
135
136 execution_queue: VecDeque<ExecutionRequest>,
138 execution_task: OptionFuture<BoxFuture<'static, ExecutionTaskResult>>,
140
141 payload_jobs: FuturesUnordered<BoxFuture<'static, ()>>,
148
149 latest_observed_finalized_tip: Option<(Height, Digest)>,
150
151 public_key: Option<PublicKey>,
154
155 metrics: Metrics,
156}
157
158#[derive(Clone)]
159struct Metrics {
160 finalized_blocks_proposed_by_self: Counter,
162}
163
164impl Metrics {
165 fn init<TContext>(context: &TContext) -> Self
166 where
167 TContext: RuntimeMetrics,
168 {
169 let finalized_blocks_proposed_by_self = Counter::default();
170 context.register(
171 "finalized_blocks_proposed_by_self",
172 "number of finalized blocks whose proposer matches this node's public key",
173 finalized_blocks_proposed_by_self.clone(),
174 );
175 Self {
176 finalized_blocks_proposed_by_self,
177 }
178 }
179}
180
181impl<TContext> Actor<TContext>
182where
183 TContext: Clock + RuntimeMetrics + Pacer + Spawner,
184{
185 pub(super) fn init(
186 context: TContext,
187 config: super::Config,
188 mailbox: UnboundedReceiver<super::ingress::Message>,
189 ) -> eyre::Result<Self> {
190 let Config {
191 execution_node,
192 last_finalized_height,
193 marshal,
194 fcu_heartbeat_interval,
195 public_key,
196 } = config;
197 let metrics = Metrics::init(&context);
198 let canonical_state = execution_node.provider.canonical_in_memory_state();
199 let finalized_num_hash = canonical_state
200 .get_finalized_num_hash()
201 .unwrap_or_else(|| BlockNumHash::new(0, execution_node.chain_spec().genesis_hash()));
202 let head_num_hash: BlockNumHash = canonical_state.chain_info().into();
203
204 let fcu_heartbeat_timer = OptionFuture::some(context.sleep(fcu_heartbeat_interval).boxed());
205 let last_execution_finalized_height = Height::new(finalized_num_hash.number);
206 let finalized_heights_to_backfill =
207 (last_execution_finalized_height.get() + 1)..=last_finalized_height.get();
208 Ok(Self {
209 context: ContextCell::new(context),
210 execution_node,
211 last_consensus_finalized_height: last_finalized_height,
212 last_execution_finalized_height,
213 mailbox,
214 marshal,
215 last_canonicalized: LastCanonicalized {
216 forkchoice: ForkchoiceState {
217 head_block_hash: head_num_hash.hash,
218 safe_block_hash: finalized_num_hash.hash,
219 finalized_block_hash: finalized_num_hash.hash,
220 },
221 head_height: Height::new(head_num_hash.number),
222 finalized_height: Height::new(finalized_num_hash.number),
223 },
224 fcu_heartbeat_interval,
225 fcu_heartbeat_timer,
226
227 finalized_heights_to_backfill,
228 pending_backfill: OptionFuture::none(),
229 execution_queue: VecDeque::new(),
230 execution_task: OptionFuture::none(),
231 payload_jobs: FuturesUnordered::new(),
232
233 latest_observed_finalized_tip: None,
234
235 public_key,
236 metrics,
237 })
238 }
239
240 pub(crate) fn start(mut self) -> Handle<()> {
241 spawn_cell!(self.context, self.run())
242 }
243
244 async fn run(mut self) {
245 info_span!("start").in_scope(|| {
246 info!(
247 last_finalized_consensus_height = %self.last_consensus_finalized_height,
248 last_finalized_execution_height = %self.last_execution_finalized_height,
249 "consensus and execution layers reported last finalized heights; \
250 backfilling blocks from consensus to execution if necessary",
251 );
252 });
253
254 loop {
255 if self.pending_backfill.is_none()
256 && let Some(height) = self.finalized_heights_to_backfill.next()
257 {
258 self.pending_backfill.replace({
259 let marshal = self.marshal.clone();
260 async move { (height, marshal.get_block(Height::new(height)).await) }.boxed()
261 });
262 }
263
264 self.start_next_execution_task();
265 self.update_fcu_heartbeat_timer();
266
267 select! {
268 biased;
269
270 task_result = &mut self.execution_task => {
271 match task_result {
272 ExecutionTaskResult::Completed { canonicalized, payload_job } => {
273 if let Some(canonicalized) = canonicalized {
274 self.last_canonicalized = canonicalized;
278 }
279 if let Some(job) = payload_job {
280 self.payload_jobs.push(
281 run_payload_job(
282 self.context.clone(),
283 self.execution_node.clone(),
284 job,
285 )
286 .boxed(),
287 );
288 }
289 }
290 ExecutionTaskResult::Fatal { error } => {
291 error_span!("shutdown").in_scope(|| error!(
292 %error,
293 "executor encountered fatal execution-layer update error; \
294 shutting down to prevent consensus-execution divergence"
295 ));
296 break;
297 }
298 }
299 }
300
301 block = &mut self.pending_backfill => {
302 match block {
303 (height, Some(block)) => {
304 let (ack, _wait) = Exact::handle();
305 let span = info_span!("backfill_on_start", %height);
306 self.enqueue_execution_request(ExecutionRequest::FinalizeBlock(
307 Box::new(FinalizedBlockRequest {
308 cause: span,
309 block,
310 acknowledgment: ack,
311 is_backfill: true,
312 }),
313 ));
314 }
315 (height, None) => {
316 warn_span!("backfill_on_start", %height)
317 .in_scope(|| warn!(
318 "marshal actor did not have block even though \
319 it must have finalized it previously",
320 ));
321 }
322 }
323 }
324
325 Some(()) = self.payload_jobs.next() => {}
326
327 msg = self.mailbox.next() => {
328 let Some(msg) = msg else { break; };
329 if let Err(error) = self.handle_message(msg) {
330 error_span!("shutdown").in_scope(|| error!(
331 %error,
332 "executor failed handling message; \
333 shutting down to prevent consensus-execution divergence"
334 ));
335 break;
336 }
337 },
338
339 _ = (&mut self.fcu_heartbeat_timer).fuse() => {
340 self.send_forkchoice_update_heartbeat();
341 },
342 }
343 }
344 }
345
346 fn arm_fcu_heartbeat_timer(&mut self) {
347 if !self.fcu_heartbeat_timer.is_none() {
348 return;
349 }
350 self.fcu_heartbeat_timer
351 .replace(self.context.sleep(self.fcu_heartbeat_interval).boxed());
352 }
353
354 fn disarm_fcu_heartbeat_timer(&mut self) {
355 self.fcu_heartbeat_timer = OptionFuture::none();
356 }
357
358 fn update_fcu_heartbeat_timer(&mut self) {
359 if !self.is_backfilling()
360 && self.execution_task.is_none()
361 && self.execution_queue.is_empty()
362 {
363 self.arm_fcu_heartbeat_timer();
364 } else {
365 self.disarm_fcu_heartbeat_timer();
366 }
367 }
368
369 #[instrument(skip_all)]
370 fn send_forkchoice_update_heartbeat(&mut self) {
371 self.enqueue_execution_request(ExecutionRequest::Heartbeat {
372 cause: Span::current(),
373 });
374 }
375
376 fn handle_message(&mut self, message: Message) -> eyre::Result<()> {
377 let cause = message.cause;
378 let is_backfilling = self.is_backfilling();
379 match message.command {
380 Command::CanonicalizeHead(CanonicalizeHead {
381 height,
382 digest,
383 response,
384 }) => {
385 if is_backfilling {
386 info_span!("handle_message")
387 .in_scope(|| info!("request to canonicalize deferred while backfilling"));
388 }
389 self.enqueue_execution_request(ExecutionRequest::Canonicalize(Box::new(
390 Canonicalize {
391 cause,
392 head_or_finalized: HeadOrFinalized::Head,
393 height,
394 digest,
395 response: Some(response),
396 build_attributes: None,
397 },
398 )));
399 }
400 Command::CanonicalizeAndBuild(CanonicalizeAndBuild {
401 height,
402 digest,
403 attributes,
404 response,
405 }) => {
406 if is_backfilling {
407 info_span!("handle_message").in_scope(|| {
408 info!("request to canonicalize and build deferred while backfilling")
409 });
410 }
411 self.enqueue_execution_request(ExecutionRequest::Canonicalize(Box::new(
412 Canonicalize {
413 cause,
414 head_or_finalized: HeadOrFinalized::Head,
415 height,
416 digest,
417 response: None,
418 build_attributes: Some((*attributes, response)),
419 },
420 )));
421 }
422 Command::Finalize(finalized) => match *finalized {
423 Update::Tip(_, height, digest) => {
424 self.latest_observed_finalized_tip.replace((height, digest));
425 }
426 Update::Block(block, acknowledgement) => {
427 self.enqueue_execution_request(ExecutionRequest::FinalizeBlock(Box::new(
428 FinalizedBlockRequest {
429 cause,
430 block,
431 acknowledgment: acknowledgement,
432 is_backfill: false,
433 },
434 )));
435 }
436 },
437 }
438 Ok(())
439 }
440
441 fn enqueue_execution_request(&mut self, request: ExecutionRequest) {
442 if matches!(&request, ExecutionRequest::Heartbeat { .. })
443 && (!self.execution_queue.is_empty()
444 || !self.execution_task.is_none()
445 || self.is_backfilling())
446 {
447 return;
448 }
449
450 if request.is_backfill() {
451 let insert_at = self
452 .execution_queue
453 .iter()
454 .position(|request| !request.is_backfill())
455 .unwrap_or(self.execution_queue.len());
456 self.execution_queue.insert(insert_at, request);
457 } else {
458 self.execution_queue.push_back(request);
459 }
460 }
461
462 fn start_next_execution_task(&mut self) {
463 if !self.execution_task.is_none() {
464 return;
465 }
466
467 if self.execution_queue.is_empty()
470 && !self.is_backfilling()
471 && let Some((height, digest)) = self.latest_observed_finalized_tip
472 && let new_canonicalized = self.last_canonicalized.update_finalized(height, digest)
473 && new_canonicalized != self.last_canonicalized
474 {
475 self.execution_queue
476 .push_back(ExecutionRequest::Canonicalize(Box::new(Canonicalize {
477 cause: Span::current(),
478 head_or_finalized: HeadOrFinalized::Finalized,
479 height,
480 digest,
481 response: None,
482 build_attributes: None,
483 })));
484 }
485
486 let Some(request) = self.execution_queue.front() else {
487 return;
488 };
489 if self.is_backfilling() && !request.is_backfill() {
490 return;
491 }
492 let request = self.execution_queue.pop_front().expect("front exists");
493
494 let task = execute_request(
495 self.context.clone(),
496 self.execution_node.clone(),
497 self.public_key.clone(),
498 self.metrics.clone(),
499 self.last_canonicalized,
500 request,
501 );
502 self.execution_task.replace(task.boxed());
503 }
504
505 fn is_backfilling(&self) -> bool {
506 self.pending_backfill.is_some() || !self.finalized_heights_to_backfill.is_empty()
507 }
508}
509
510enum ExecutionRequest {
511 Heartbeat { cause: Span },
512 Canonicalize(Box<Canonicalize>),
513 FinalizeBlock(Box<FinalizedBlockRequest>),
514}
515
516impl ExecutionRequest {
517 fn is_backfill(&self) -> bool {
518 let Self::FinalizeBlock(req) = self else {
519 return false;
520 };
521 req.is_backfill
522 }
523}
524
525struct FinalizedBlockRequest {
526 cause: Span,
527 block: Block,
528 acknowledgment: Exact,
529 is_backfill: bool,
530}
531
532#[derive(Debug, Clone, Copy, PartialEq, Eq)]
533enum ForkchoiceUpdateKind {
534 Heartbeat,
535 Canonicalize { head_or_finalized: HeadOrFinalized },
536}
537
538enum ExecutionTaskResult {
539 Completed {
540 canonicalized: Option<LastCanonicalized>,
541 payload_job: Option<StartPayloadJob>,
544 },
545 Fatal {
546 error: Report,
547 },
548}
549
550struct Canonicalize {
551 cause: Span,
552 head_or_finalized: HeadOrFinalized,
553 height: Height,
554 digest: Digest,
555 response: Option<oneshot::Sender<()>>,
558 build_attributes: Option<(TempoPayloadAttributes, oneshot::Sender<TempoBuiltPayload>)>,
561}
562
563struct StartPayloadJob {
566 cause: Span,
567 payload_id: PayloadId,
568 response: oneshot::Sender<TempoBuiltPayload>,
569}
570
571async fn execute_request<TContext>(
572 context: ContextCell<TContext>,
573 execution_node: Arc<TempoFullNode>,
574 public_key: Option<PublicKey>,
575 metrics: Metrics,
576 canonicalized: LastCanonicalized,
577 request: ExecutionRequest,
578) -> ExecutionTaskResult
579where
580 TContext: Pacer,
581{
582 match request {
583 ExecutionRequest::Heartbeat { cause } => {
584 if let Err(error) = submit_forkchoice_update(
585 &execution_node,
586 &context,
587 cause,
588 canonicalized,
589 None,
590 ForkchoiceUpdateKind::Heartbeat,
591 )
592 .await
593 {
594 warn!(%error, "queued forkchoice update failed");
595 }
596 ExecutionTaskResult::Completed {
597 canonicalized: None,
598 payload_job: None,
599 }
600 }
601 ExecutionRequest::Canonicalize(request) => {
602 let (canonicalized, payload_job) =
603 run_canonicalize_task(&context, execution_node, canonicalized, *request).await;
604 ExecutionTaskResult::Completed {
605 canonicalized,
606 payload_job,
607 }
608 }
609 ExecutionRequest::FinalizeBlock(request) => {
610 let fatal_on_error = !request.is_backfill;
611 match forward_finalized(
612 &context,
613 execution_node,
614 public_key,
615 metrics,
616 canonicalized,
617 *request,
618 )
619 .await
620 {
621 Ok(canonicalized) => ExecutionTaskResult::Completed {
622 canonicalized,
623 payload_job: None,
624 },
625 Err(error) if fatal_on_error => ExecutionTaskResult::Fatal { error },
626 Err(error) => {
627 warn!(%error, "failed forwarding backfilled finalized block to execution layer");
628 ExecutionTaskResult::Completed {
629 canonicalized: None,
630 payload_job: None,
631 }
632 }
633 }
634 }
635 }
636}
637
638#[instrument(
639 skip_all,
640 parent = &cause,
641 fields(
642 %height,
643 %digest,
644 head_or_finalized = %head_or_finalized,
645 ),
646)]
647async fn run_canonicalize_task<TContext: Pacer>(
648 context: &TContext,
649 execution_node: Arc<TempoFullNode>,
650 canonicalized: LastCanonicalized,
651 Canonicalize {
652 cause,
653 head_or_finalized,
654 height,
655 digest,
656 response,
657 mut build_attributes,
658 }: Canonicalize,
659) -> (Option<LastCanonicalized>, Option<StartPayloadJob>) {
660 let new_canonicalized = match head_or_finalized {
661 HeadOrFinalized::Head => canonicalized.update_head(height, digest),
662 HeadOrFinalized::Finalized => canonicalized.update_finalized(height, digest),
663 };
664
665 if build_attributes
666 .as_ref()
667 .is_some_and(|(_, response)| response.is_canceled())
668 {
669 info!("dropping payload build request: the subscriber went away while it was queued");
670 build_attributes.take();
671 }
672
673 if build_attributes.is_some() && new_canonicalized.forkchoice.head_block_hash != digest.0 {
680 info!("dropping payload build request: its parent cannot be made the head");
681 build_attributes.take();
682 }
683
684 let (attributes, payload_response) = build_attributes.unzip();
685
686 match submit_forkchoice_update(
690 &execution_node,
691 context,
692 cause.clone(),
693 new_canonicalized,
694 attributes,
695 ForkchoiceUpdateKind::Canonicalize { head_or_finalized },
696 )
697 .await
698 {
699 Ok(payload_id) => {
700 if let Some(response) = response {
701 let _ = response.send(());
702 }
703 let payload_job = match (payload_response, payload_id) {
704 (Some(response), Some(payload_id)) => Some(StartPayloadJob {
705 cause,
706 payload_id,
707 response,
708 }),
709 (Some(_dropped_to_signal_failure), None) => {
710 warn!("execution layer did not return a payload id for the build request");
711 None
712 }
713 (None, _) => None,
714 };
715 (Some(new_canonicalized), payload_job)
716 }
717 Err(error) => {
718 warn!(%error, "forkchoice update failed");
721 (None, None)
722 }
723 }
724}
725
726#[instrument(
735 skip_all,
736 parent = &cause,
737 fields(%payload_id),
738)]
739async fn run_payload_job<TContext: Pacer>(
740 context: TContext,
741 execution_node: Arc<TempoFullNode>,
742 StartPayloadJob {
743 cause,
744 payload_id,
745 mut response,
746 }: StartPayloadJob,
747) {
748 let payload = select! {
749 payload = execution_node
750 .payload_builder_handle
751 .resolve_kind(payload_id, PayloadKind::WaitForPending)
752 .pace(&context, Duration::from_millis(20))
753 => payload,
754
755 () = response.cancellation() => {
757 info!("payload subscriber went away before the payload was resolved; killing the payload build");
758 return;
759 }
760 };
761
762 match payload {
765 Some(Ok(payload)) => {
766 if response.send(payload).is_err() {
767 info!(
768 "payload subscriber went away before the payload could be delivered; discarding it"
769 );
770 }
771 }
772 Some(Err(error)) => {
773 warn!(
774 error = %eyre::Report::new(error),
775 "payload build job failed",
776 );
777 }
778 None => {
779 warn!("no payload build job found under the payload ID");
780 }
781 }
782}
783
784#[instrument(
785 skip_all,
786 parent = &cause,
787 fields(
788 head_block_hash = %canonicalized.forkchoice.head_block_hash,
789 head_block_height = %canonicalized.head_height,
790 finalized_block_hash = %canonicalized.forkchoice.finalized_block_hash,
791 finalized_block_height = %canonicalized.finalized_height,
792 ?kind,
793 ),
794)]
795async fn submit_forkchoice_update<TContext: Pacer>(
796 execution_node: &TempoFullNode,
797 context: &TContext,
798 cause: Span,
799 canonicalized: LastCanonicalized,
800 attrs: Option<TempoPayloadAttributes>,
801 kind: ForkchoiceUpdateKind,
802) -> eyre::Result<Option<PayloadId>> {
803 let fcu_response = execution_node
804 .add_ons_handle
805 .beacon_engine_handle
806 .fork_choice_updated(canonicalized.forkchoice, attrs)
807 .pace(context, Duration::from_millis(20))
808 .await
809 .wrap_err("failed requesting execution layer to update forkchoice state")?;
810
811 if kind == ForkchoiceUpdateKind::Heartbeat {
812 if fcu_response.is_invalid() {
813 warn!(
814 payload_status = %fcu_response.payload_status,
815 "execution layer reported FCU status",
816 );
817 } else {
818 info!(
819 payload_status = %fcu_response.payload_status,
820 "execution layer reported FCU status",
821 );
822 }
823 } else {
824 debug!(
825 payload_status = %fcu_response.payload_status,
826 "execution layer reported FCU status",
827 );
828 }
829
830 if fcu_response.is_invalid() {
831 return Err(Report::msg(fcu_response.payload_status)
832 .wrap_err("execution layer responded with error for forkchoice-update"));
833 }
834
835 Ok(fcu_response.payload_id)
836}
837
838#[instrument(
839 skip_all,
840 parent = &request.cause,
841 fields(
842 block.digest = %request.block.digest(),
843 block.height = %request.block.height(),
844 ),
845 err(level = Level::WARN),
846 ret,
847)]
848async fn forward_finalized<TContext: Pacer>(
849 context: &TContext,
850 execution_node: Arc<TempoFullNode>,
851 public_key: Option<PublicKey>,
852 metrics: Metrics,
853 canonicalized: LastCanonicalized,
854 request: FinalizedBlockRequest,
855) -> eyre::Result<Option<LastCanonicalized>> {
856 let FinalizedBlockRequest {
857 cause,
858 block,
859 acknowledgment,
860 is_backfill: _,
861 } = request;
862
863 let new_canonicalized = canonicalized.update_finalized(block.height(), block.digest());
864 let forkchoice = (new_canonicalized != canonicalized).then_some(new_canonicalized);
865
866 if let Some(canonicalized) = forkchoice {
867 submit_forkchoice_update(
868 &execution_node,
869 context,
870 cause.clone(),
871 canonicalized,
872 None,
873 ForkchoiceUpdateKind::Canonicalize {
874 head_or_finalized: HeadOrFinalized::Finalized,
875 },
876 )
877 .await?;
878 }
879
880 let (block, block_access_list) = block.into_parts();
881 let consensus_context = block.header().consensus_context;
882 let payload_status = execution_node
883 .add_ons_handle
884 .beacon_engine_handle
885 .new_payload(TempoExecutionData {
886 block: Arc::new(block),
887 block_access_list,
888 validator_set: None,
890 })
891 .pace(context, Duration::from_millis(20))
892 .await
893 .wrap_err(
894 "failed sending new-payload request to execution engine to \
895 query payload status of finalized block",
896 )?;
897
898 ensure!(
899 payload_status.is_valid() || payload_status.is_syncing(),
900 "this is a problem: payload status of block-to-be-finalized was \
901 neither valid nor syncing: `{payload_status}`"
902 );
903
904 if let Some(public_key) = public_key.as_ref()
905 && consensus_context
906 .is_some_and(|context| &PublicKey::from(context.proposer.get()) == public_key)
907 {
908 metrics.finalized_blocks_proposed_by_self.inc();
909 }
910
911 acknowledgment.acknowledge();
912
913 Ok(forkchoice)
914}
915
916#[derive(Debug, Clone, Copy, PartialEq, Eq)]
918enum HeadOrFinalized {
919 Head,
920 Finalized,
921}
922
923impl std::fmt::Display for HeadOrFinalized {
924 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
925 let msg = match self {
926 Self::Head => "head",
927 Self::Finalized => "finalized",
928 };
929 f.write_str(msg)
930 }
931}