1use std::{sync::Arc, time::Duration};
14
15use alloy_consensus::BlockHeader;
16use alloy_primitives::{B256, Bytes};
17use alloy_rpc_types_engine::PayloadId;
18use commonware_codec::{DecodeExt as _, Encode as _};
19use commonware_consensus::{
20 Block as _,
21 marshal::SchemeProvider as _,
22 types::{Epoch, Round, View},
23 utils,
24};
25use commonware_cryptography::ed25519::PublicKey;
26use commonware_macros::select;
27use commonware_runtime::{
28 ContextCell, FutureExt as _, Handle, Metrics, Pacer, Spawner, Storage, spawn_cell,
29};
30
31use commonware_utils::SystemTimeExt;
32use eyre::{OptionExt as _, WrapErr as _, bail, ensure, eyre};
33use futures::{
34 StreamExt as _, TryFutureExt as _,
35 channel::{mpsc, oneshot},
36 future::{Either, always_ready, ready, try_join},
37};
38use rand::{CryptoRng, Rng};
39use reth_node_builder::ConsensusEngineHandle;
40use reth_primitives_traits::SealedBlock;
41use tempo_dkg_onchain_artifacts::PublicOutcome;
42use tempo_node::{TempoExecutionData, TempoFullNode, TempoPayloadTypes};
43
44use reth_provider::BlockReader as _;
45use tokio::sync::RwLock;
46use tracing::{Level, debug, error, error_span, info, instrument, warn};
47
48use tempo_payload_types::TempoPayloadBuilderAttributes;
49
50use super::{
51 Mailbox, executor,
52 executor::ExecutorMailbox,
53 ingress::{Broadcast, Finalized, Genesis, Message, Propose, Verify},
54};
55use crate::{
56 consensus::{Digest, block::Block},
57 epoch::SchemeProvider,
58 subblocks,
59};
60
/// The application actor: receives [`Message`]s on its mailbox and acts on
/// them.
///
/// `TState` is a typestate marker. An actor is created as `Actor<_, Uninit>`
/// via `init` and is turned into an `Actor<_, Init>` when it is started; only
/// the latter processes messages.
pub(in crate::consensus) struct Actor<TContext, TState = Uninit> {
    /// Runtime context used to spawn the actor itself and its per-message
    /// tasks.
    context: ContextCell<TContext>,
    /// Receiving half of the channel whose sending half is wrapped by
    /// `inner.my_mailbox`.
    mailbox: mpsc::Receiver<Message>,

    /// Cloneable state handed to every spawned message handler.
    inner: Inner<TState>,
}
67
68impl<TContext, TState> Actor<TContext, TState> {
69 pub(super) fn mailbox(&self) -> &Mailbox {
70 &self.inner.my_mailbox
71 }
72}
73
74impl<TContext> Actor<TContext, Uninit>
75where
76 TContext: Pacer + governor::clock::Clock + Rng + CryptoRng + Spawner + Storage + Metrics,
77{
78 pub(super) async fn init(config: super::Config<TContext>) -> eyre::Result<Self> {
79 let (tx, rx) = mpsc::channel(config.mailbox_size);
80 let my_mailbox = Mailbox::from_sender(tx);
81
82 let block = config
83 .execution_node
84 .provider
85 .block_by_number(0)
86 .map_err(Into::<eyre::Report>::into)
87 .and_then(|maybe| maybe.ok_or_eyre("block reader returned empty genesis block"))
88 .wrap_err("failed reading genesis block from execution node")?;
89
90 Ok(Self {
91 context: ContextCell::new(config.context),
92 mailbox: rx,
93
94 inner: Inner {
95 fee_recipient: config.fee_recipient,
96 epoch_length: config.epoch_length,
97 new_payload_wait_time: config.new_payload_wait_time,
98
99 my_mailbox,
100 marshal: config.marshal,
101
102 genesis_block: Arc::new(Block::from_execution_block(SealedBlock::seal_slow(block))),
103
104 execution_node: config.execution_node,
105 subblocks: config.subblocks,
106
107 scheme_provider: config.scheme_provider,
108
109 state: Uninit(()),
110 },
111 })
112 }
113
114 async fn run_until_stopped(self, dkg_manager: crate::dkg::manager::Mailbox) {
116 let Self {
117 context,
118 mailbox,
119 inner,
120 } = self;
121 let Ok(initialized) = inner.into_initialized(context.clone(), dkg_manager).await else {
124 return;
126 };
127
128 Actor {
129 context,
130 mailbox,
131 inner: initialized,
132 }
133 .run_until_stopped()
134 .await
135 }
136
137 pub(in crate::consensus) fn start(
138 mut self,
139 dkg_manager: crate::dkg::manager::Mailbox,
140 ) -> Handle<()> {
141 spawn_cell!(self.context, self.run_until_stopped(dkg_manager).await)
142 }
143}
144
145impl<TContext> Actor<TContext, Init>
146where
147 TContext: Pacer + governor::clock::Clock + Rng + CryptoRng + Spawner + Storage + Metrics,
148{
149 async fn run_until_stopped(mut self) {
150 while let Some(msg) = self.mailbox.next().await {
151 if let Err(error) = self.handle_message(msg) {
152 error_span!("handle message").in_scope(|| {
153 error!(
154 %error,
155 "critical error occurred while handling message; exiting"
156 )
157 });
158 break;
159 }
160 }
161 }
162
163 fn handle_message(&mut self, msg: Message) -> eyre::Result<()> {
164 match msg {
165 Message::Broadcast(broadcast) => {
166 self.context.with_label("broadcast").spawn({
167 let inner = self.inner.clone();
168 move |_| inner.handle_broadcast(broadcast)
169 });
170 }
171 Message::Finalized(finalized) => {
172 self.inner
175 .handle_finalized(*finalized)
176 .wrap_err("failed finalizing block")?;
177 }
178 Message::Genesis(genesis) => {
179 self.context.with_label("genesis").spawn({
180 let inner = self.inner.clone();
181 move |_| inner.handle_genesis(genesis)
182 });
183 }
184 Message::Propose(propose) => {
185 self.context.with_label("propose").spawn({
186 let inner = self.inner.clone();
187 move |context| inner.handle_propose(propose, context)
188 });
189 }
190 Message::Verify(verify) => {
191 self.context.with_label("verify").spawn({
192 let inner = self.inner.clone();
193 move |context| inner.handle_verify(*verify, context)
194 });
195 }
196 }
197 Ok(())
198 }
199}
200
/// State shared by all message handlers of the actor.
///
/// It is cloned into every spawned handler task. `TState` holds the parts
/// that only exist once the actor is initialized.
#[derive(Clone)]
struct Inner<TState> {
    /// Fee recipient passed to the payload builder attributes when proposing.
    fee_recipient: alloy_primitives::Address,
    /// Epoch length passed to the `utils` epoch helpers together with block
    /// heights.
    epoch_length: u64,
    /// How long `propose` sleeps before interrupting the payload builder and
    /// resolving the payload.
    new_payload_wait_time: Duration,

    /// Mailbox through which messages are sent to this actor.
    my_mailbox: Mailbox,

    /// Mailbox of the marshal, used to subscribe to, broadcast, and report
    /// verified blocks.
    marshal: crate::alias::marshal::Mailbox,

    /// Block number 0 as read from the execution node during `init`.
    genesis_block: Arc<Block>,
    /// Handle to the execution node (payload builder, engine handle,
    /// provider).
    execution_node: TempoFullNode,
    /// Mailbox queried for subblocks when building payload attributes.
    subblocks: subblocks::Mailbox,
    /// Supplies the signing scheme, and through it the participants, of an
    /// epoch.
    scheme_provider: SchemeProvider,

    /// Typestate-specific data: `Uninit` or `Init`.
    state: TState,
}
218
219impl Inner<Init> {
220 #[instrument(
221 skip_all,
222 fields(%broadcast.payload),
223 err(level = Level::ERROR),
224 )]
225 async fn handle_broadcast(mut self, broadcast: Broadcast) -> eyre::Result<()> {
226 let Some(latest_proposed) = self.state.latest_proposed_block.read().await.clone() else {
227 return Err(eyre!("there was no latest block to broadcast"));
228 };
229 ensure!(
230 broadcast.payload == latest_proposed.digest(),
231 "broadcast of payload `{}` was requested, but digest of latest proposed block is `{}`",
232 broadcast.payload,
233 latest_proposed.digest(),
234 );
235
236 self.marshal.broadcast(latest_proposed).await;
237 Ok(())
238 }
239
240 #[instrument(skip_all)]
241 fn handle_finalized(&self, finalized: Finalized) -> eyre::Result<()> {
243 self.state.executor_mailbox.forward_finalized(finalized)
244 }
245
246 #[instrument(
247 skip_all,
248 fields(
249 epoch = genesis.epoch,
250 ),
251 ret(Display),
252 err(level = Level::ERROR)
253 )]
254 async fn handle_genesis(mut self, genesis: Genesis) -> eyre::Result<Digest> {
255 let source = if genesis.epoch == 0 {
256 self.genesis_block.digest()
257 } else {
258 let height =
261 utils::last_block_in_epoch(self.epoch_length, genesis.epoch.saturating_sub(1));
262
263 let Some((_, digest)) = self.marshal.get_info(height).await else {
264 bail!(
274 "no information on the source block at height `{height}` \
275 exists yet; this is a problem and will likely cause the \
276 consensus engine to not start"
277 );
278 };
279 digest
280 };
281 genesis.response.send(source).map_err(|_| {
282 eyre!("failed returning parent digest for epoch: return channel was already closed")
283 })?;
284 Ok(source)
285 }
286
287 #[instrument(
289 skip_all,
290 fields(
291 epoch = request.round.epoch(),
292 view = request.round.view(),
293 parent.view = request.parent.0,
294 parent.digest = %request.parent.1,
295 ),
296 err(level = Level::WARN),
297 )]
298 async fn handle_propose<TContext: Pacer>(
299 self,
300 request: Propose,
301 context: TContext,
302 ) -> eyre::Result<()> {
303 let Propose {
304 parent: (parent_view, parent_digest),
305 mut response,
306 round,
307 } = request;
308
309 let proposal = select!(
310 () = response.cancellation() => {
311 Err(eyre!(
312 "proposal return channel was closed by consensus \
313 engine before block could be proposed; aborting"
314 ))
315 },
316
317 res = self.clone().propose(
318 context.clone(),
319 parent_view,
320 parent_digest,
321 round
322 ) => {
323 res.wrap_err("failed creating a proposal")
324 }
325 )?;
326
327 let proposal_digest = proposal.digest();
328 let proposal_height = proposal.height();
329
330 info!(
331 proposal.digest = %proposal_digest,
332 proposal.height = %proposal_height,
333 "constructed proposal",
334 );
335
336 response.send(proposal_digest).map_err(|_| {
337 eyre!(
338 "failed returning proposal to consensus engine: response \
339 channel was already closed"
340 )
341 })?;
342
343 if proposal_digest == parent_digest {
346 return Ok(());
347 }
348
349 {
350 let mut lock = self.state.latest_proposed_block.write().await;
351 *lock = Some(proposal.clone());
352 }
353
354 let is_good = verify_block(
357 context,
358 round.epoch(),
359 self.epoch_length,
360 self.execution_node
361 .add_ons_handle
362 .beacon_engine_handle
363 .clone(),
364 &proposal,
365 parent_digest,
366 &self.scheme_provider,
367 )
368 .await
369 .wrap_err("failed verifying block against execution layer")?;
370
371 if !is_good {
372 eyre::bail!("validation reported that that just-proposed block is invalid");
373 }
374
375 Ok(())
376 }
377
378 #[instrument(
389 skip_all,
390 fields(
391 epoch = verify.round.epoch(),
392 view = verify.round.view(),
393 digest = %verify.payload,
394 parent.view = verify.parent.0,
395 parent.digest = %verify.parent.1,
396 proposer = %verify.proposer,
397 ),
398 )]
399 async fn handle_verify<TContext: Pacer>(mut self, verify: Verify, context: TContext) {
400 let Verify {
401 parent,
402 payload,
403 proposer,
404 mut response,
405 round,
406 } = verify;
407 let result = select!(
408 () = response.cancellation() => {
409 Err(eyre!(
410 "verification return channel was closed by consensus \
411 engine before block could be validated; aborting"
412 ))
413 },
414
415 res = self.clone().verify(context, parent, payload, proposer, round) => {
416 res.wrap_err("block verification failed")
417 }
418 );
419
420 let _ = report_verification_result(response, &result);
423
424 if let Ok((block, true)) = result {
426 if parent.1 != payload
429 && let Err(error) = self
430 .state
431 .executor_mailbox
432 .canonicalize_head(block.height(), block.digest())
433 {
434 tracing::warn!(
435 %error,
436 "failed making the verified proposal the head of the canonical chain",
437 );
438 }
439 self.marshal.verified(round, block).await;
440 }
441 }
442
443 async fn propose<TContext: Pacer>(
444 mut self,
445 context: TContext,
446 parent_view: View,
447 parent_digest: Digest,
448 round: Round,
449 ) -> eyre::Result<Block> {
450 let genesis_block = self.genesis_block.clone();
451 let parent_request = if parent_digest == genesis_block.digest() {
452 Either::Left(always_ready(|| Ok((*genesis_block).clone())))
453 } else {
454 Either::Right(
455 self.marshal
456 .subscribe(Some(Round::new(round.epoch(), parent_view)), parent_digest)
457 .await,
458 )
459 };
460 let parent = parent_request
461 .await
462 .map_err(|_| eyre!(
463 "failed getting parent block from syncer; syncer dropped channel before request was fulfilled"
464 ))?;
465
466 debug!(height = parent.height(), "retrieved parent block",);
467
468 if utils::is_last_block_in_epoch(self.epoch_length, parent.height())
472 .is_some_and(|e| e == round.epoch())
473 {
474 info!("parent is last height of epoch; re-proposing parent");
475 return Ok(parent);
476 }
477
478 if !verify_block(
480 context.clone(),
481 utils::epoch(self.epoch_length, parent.height()),
482 self.epoch_length,
483 self.execution_node
484 .add_ons_handle
485 .beacon_engine_handle
486 .clone(),
487 &parent,
488 parent.parent_digest(),
490 &self.scheme_provider,
491 )
492 .await
493 .wrap_err("failed verifying block against execution layer")?
494 {
495 eyre::bail!("the proposal parent block is not valid");
496 }
497
498 ready(
499 self.state
500 .executor_mailbox
501 .canonicalize_head(parent.height(), parent.digest()),
502 )
503 .and_then(|ack| ack.map_err(eyre::Report::new))
504 .await
505 .wrap_err("failed updating canonical head to parent")?;
506
507 let extra_data = if utils::is_last_block_in_epoch(self.epoch_length, parent.height() + 1)
510 .is_some_and(|e| e == round.epoch())
511 {
512 let outcome = self
514 .state
515 .dkg_manager
516 .get_public_ceremony_outcome()
517 .await
518 .wrap_err("failed getting public dkg ceremony outcome")?;
519 ensure!(
520 round.epoch() + 1 == outcome.epoch,
521 "outcome is for epoch `{}`, but we are trying to include the \
522 outcome for epoch `{}`",
523 outcome.epoch,
524 round.epoch() + 1,
525 );
526 info!(
527 outcome.epoch,
528 "received DKG outcome; will include in payload builder attributes",
529 );
530 outcome.encode().freeze().into()
531 } else {
532 match self
534 .state
535 .dkg_manager
536 .get_intermediate_dealing(round.epoch())
537 .await
538 {
539 Err(error) => {
540 warn!(
541 %error,
542 "failed getting ceremony deal for current epoch because DKG manager went away",
543 );
544 Bytes::default()
545 }
546 Ok(None) => Bytes::default(),
547 Ok(Some(deal_outcome)) => {
548 info!(
549 "found ceremony deal outcome; will include in payload builder attributes"
550 );
551 deal_outcome.encode().freeze().into()
552 }
553 }
554 };
555
556 let attrs = TempoPayloadBuilderAttributes::new(
557 payload_id_from_block_hash(&parent.block_hash()),
565 parent.block_hash(),
566 self.fee_recipient,
567 context.current().epoch_millis(),
568 extra_data,
569 move || {
570 self.subblocks
571 .get_subblocks(parent.block_hash())
572 .unwrap_or_default()
573 },
574 );
575
576 let interrupt_handle = attrs.interrupt_handle().clone();
577
578 let payload_id = self
579 .execution_node
580 .payload_builder_handle
581 .send_new_payload(attrs)
582 .pace(&context, Duration::from_millis(20))
583 .await
584 .map_err(|_| eyre!("channel was closed before a response was returned"))
585 .and_then(|ret| ret.wrap_err("execution layer rejected request"))
586 .wrap_err("failed requesting new payload from the execution layer")?;
587
588 debug!(
589 timeout_ms = self.new_payload_wait_time.as_millis(),
590 "sleeping for payload builder timeout"
591 );
592 context.sleep(self.new_payload_wait_time).await;
593
594 interrupt_handle.interrupt();
595
596 let payload = self
597 .execution_node
598 .payload_builder_handle
599 .resolve_kind(payload_id, reth_node_builder::PayloadKind::WaitForPending)
600 .pace(&context, Duration::from_millis(20))
601 .await
602 .ok_or_eyre("no payload found under provided id")
606 .and_then(|rsp| rsp.map_err(Into::<eyre::Report>::into))
607 .wrap_err_with(|| format!("failed getting payload for payload ID `{payload_id}`"))?;
608
609 Ok(Block::from_execution_block(payload.block().clone()))
610 }
611
612 async fn verify<TContext: Pacer>(
613 mut self,
614 context: TContext,
615 (parent_view, parent_digest): (View, Digest),
616 payload: Digest,
617 proposer: PublicKey,
618 round: Round,
619 ) -> eyre::Result<(Block, bool)> {
620 let genesis_block = self.genesis_block.clone();
621 let parent_request = if parent_digest == genesis_block.digest() {
622 Either::Left(always_ready(|| Ok((*genesis_block).clone())))
623 } else {
624 Either::Right(
625 self.marshal
626 .subscribe(Some(Round::new(round.epoch(), parent_view)), parent_digest)
627 .await
628 .map_err(|_| eyre!("syncer dropped channel before the parent block was sent")),
629 )
630 };
631 let block_request = self
632 .marshal
633 .subscribe(None, payload)
634 .await
635 .map_err(|_| eyre!("syncer dropped channel before the block-to-verified was sent"));
636
637 let (block, parent) = try_join(block_request, parent_request)
638 .await
639 .wrap_err("failed getting required blocks from syncer")?;
640
641 if payload == parent_digest {
648 if utils::is_last_block_in_epoch(self.epoch_length, block.height())
649 .is_some_and(|e| e == round.epoch())
650 {
651 return Ok((block, true));
652 } else {
653 return Ok((block, false));
654 }
655 }
656
657 if let Err(reason) = verify_header_extra_data(
658 &block,
659 &self.state.dkg_manager,
660 self.epoch_length,
661 &proposer,
662 )
663 .await
664 {
665 warn!(
666 %reason,
667 "header extra data could not be verified; failing block",
668 );
669 return Ok((block, false));
670 }
671
672 if let Err(error) = self
673 .state
674 .executor_mailbox
675 .canonicalize_head(parent.height(), parent.digest())
676 {
677 tracing::warn!(
678 %error,
679 parent.height = parent.height(),
680 parent.digest = %parent.digest(),
681 "failed updating canonical head to parent",
682 );
683 }
684
685 let is_good = verify_block(
686 context,
687 round.epoch(),
688 self.epoch_length,
689 self.execution_node
690 .add_ons_handle
691 .beacon_engine_handle
692 .clone(),
693 &block,
694 parent_digest,
695 &self.scheme_provider,
696 )
697 .await
698 .wrap_err("failed verifying block against execution layer")?;
699
700 Ok((block, is_good))
701 }
702}
703
704impl Inner<Uninit> {
705 #[instrument(skip_all, err)]
712 async fn into_initialized<TContext: Metrics + Spawner + Pacer>(
713 self,
714 context: TContext,
715 dkg_manager: crate::dkg::manager::Mailbox,
716 ) -> eyre::Result<Inner<Init>> {
717 let executor = executor::Builder {
718 execution_node: self.execution_node.clone(),
719 genesis_block: self.genesis_block.clone(),
720 marshal: self.marshal.clone(),
721 }
722 .build(context.with_label("executor"));
723
724 let executor_mailbox = executor.mailbox().clone();
725 let executor_handle = executor.start();
726
727 let initialized = Inner {
728 fee_recipient: self.fee_recipient,
729 epoch_length: self.epoch_length,
730 new_payload_wait_time: self.new_payload_wait_time,
731 my_mailbox: self.my_mailbox,
732 marshal: self.marshal,
733 genesis_block: self.genesis_block,
734 execution_node: self.execution_node,
735 state: Init {
736 latest_proposed_block: Arc::new(RwLock::new(None)),
737 dkg_manager,
738 executor_mailbox,
739 _executor_handle: AbortOnDrop(executor_handle).into(),
740 },
741 subblocks: self.subblocks,
742 scheme_provider: self.scheme_provider,
743 };
744
745 Ok(initialized)
746 }
747}
748
/// Typestate marker for an actor whose executor has not been started yet.
///
/// The private unit field keeps the marker from being constructed outside
/// this module.
#[derive(Clone, Debug)]
pub(in crate::consensus) struct Uninit(());
752
/// State that only exists once the actor has been initialized.
#[derive(Clone, Debug)]
struct Init {
    /// The block most recently built by `handle_propose`; read by
    /// `handle_broadcast`. Shared between all clones of the state.
    latest_proposed_block: Arc<RwLock<Option<Block>>>,
    /// Mailbox of the DKG manager, queried for ceremony outcomes and
    /// dealings.
    dkg_manager: crate::dkg::manager::Mailbox,
    /// Mailbox of the executor, used to forward finalizations and to request
    /// canonical head updates.
    executor_mailbox: ExecutorMailbox,
    /// Handle of the executor task. The task is aborted when the last clone
    /// of this state is dropped.
    _executor_handle: Arc<AbortOnDrop>,
}
766
767#[instrument(
776 skip_all,
777 fields(
778 epoch,
779 epoch_length,
780 block.parent_digest = %block.parent_digest(),
781 block.digest = %block.digest(),
782 block.height = block.height(),
783 block.timestamp = block.timestamp(),
784 parent.digest = %parent_digest,
785 )
786)]
787async fn verify_block<TContext: Pacer>(
788 context: TContext,
789 epoch: Epoch,
790 epoch_length: u64,
791 engine: ConsensusEngineHandle<TempoPayloadTypes>,
792 block: &Block,
793 parent_digest: Digest,
794 scheme_provider: &SchemeProvider,
795) -> eyre::Result<bool> {
796 use alloy_rpc_types_engine::PayloadStatusEnum;
797
798 if utils::epoch(epoch_length, block.height()) != epoch {
799 info!("block does not belong to this epoch");
800 return Ok(false);
801 }
802 if block.parent_hash() != *parent_digest {
803 info!(
804 "parent digest stored in block must match the digest of the parent \
805 argument but doesn't"
806 );
807 return Ok(false);
808 }
809 let scheme = scheme_provider
810 .scheme(epoch)
811 .ok_or_eyre("cannot determine participants in the current epoch")?;
812 let block = block.clone().into_inner();
813 let execution_data = TempoExecutionData {
814 block: Arc::new(block),
815 validator_set: Some(
816 scheme
817 .participants()
818 .into_iter()
819 .map(|p| B256::from_slice(p))
820 .collect(),
821 ),
822 };
823 let payload_status = engine
824 .new_payload(execution_data)
825 .pace(&context, Duration::from_millis(50))
826 .await
827 .wrap_err("failed sending `new payload` message to execution layer to validate block")?;
828 match payload_status.status {
829 PayloadStatusEnum::Valid | PayloadStatusEnum::Accepted => Ok(true),
830 PayloadStatusEnum::Invalid { validation_error } => {
831 info!(
832 validation_error,
833 "execution layer returned that the block was invalid"
834 );
835 Ok(false)
836 }
837 PayloadStatusEnum::Syncing => {
838 bail!(
840 "failed validating block because payload is still syncing, \
841 this means the parent block was available to the consensus
842 layer but not the execution layer"
843 )
844 }
845 }
846}
847
848#[instrument(skip_all, err(Display))]
849async fn verify_header_extra_data(
850 block: &Block,
851 dkg_manager: &crate::dkg::manager::Mailbox,
852 epoch_length: u64,
853 proposer: &PublicKey,
854) -> eyre::Result<()> {
855 if utils::is_last_block_in_epoch(epoch_length, block.height()).is_some() {
856 info!(
857 "on last block of epoch; verifying that the boundary block \
858 contains the correct DKG outcome",
859 );
860 let our_outcome = dkg_manager.get_public_ceremony_outcome().await.wrap_err(
861 "failed getting public dkg ceremony outcome; cannot verify end \
862 of epoch block",
863 )?;
864 let block_outcome = PublicOutcome::decode(block.header().extra_data().as_ref()).wrap_err(
865 "failed decoding extra data header as DKG ceremony \
866 outcome; cannot verify end of epoch block",
867 )?;
868 if our_outcome != block_outcome {
869 warn!(
871 our.epoch = our_outcome.epoch,
872 our.participants = ?our_outcome.participants,
873 our.public = ?our_outcome.public,
874 block.epoch = block_outcome.epoch,
875 block.participants = ?block_outcome.participants,
876 block.public = ?block_outcome.public,
877 "our public dkg ceremony outcome does not match what's stored \
878 in the block",
879 );
880 return Err(eyre!(
881 "our public dkg ceremony outcome does not match what's \
882 stored in the block header extra_data field; they must \
883 match so that the end-of-block is valid",
884 ));
885 }
886 } else if !block.header().extra_data().is_empty()
887 && let Ok(dealing) = block.try_read_ceremony_deal_outcome()
888 {
889 info!("block header extra_data header contained intermediate DKG dealing; verifying it");
890 ensure!(
891 dealing.dealer() == proposer,
892 "proposer `{proposer}` is not the dealer `{}` recorded in the \
893 intermediate DKG dealing",
894 dealing.dealer(),
895 );
896
897 ensure!(
898 dkg_manager
899 .verify_intermediate_dealings(dealing)
900 .await
901 .wrap_err("failed request to verify DKG dealing")?,
902 "signature of intermediate DKG outcome could not be verified",
903 );
904 }
905
906 Ok(())
907}
908
909fn payload_id_from_block_hash(block_hash: &B256) -> PayloadId {
911 PayloadId::new(
912 <[u8; 8]>::try_from(&block_hash[0..8])
913 .expect("a 32 byte array always has more than 8 bytes"),
914 )
915}
916
917#[instrument(skip_all, err)]
922fn report_verification_result(
923 response: oneshot::Sender<bool>,
924 verification_result: &eyre::Result<(Block, bool)>,
925) -> eyre::Result<()> {
926 match &verification_result {
927 Ok((_, is_good)) => {
928 info!(
929 proposal_valid = is_good,
930 "returning proposal verification result to consensus",
931 );
932 response.send(*is_good).map_err(|_| {
933 eyre!(
934 "attempted to send return verification result, but \
935 receiver already dropped the channel"
936 )
937 })?;
938 }
939 Err(error) => {
940 info!(
941 %error,
942 "could not decide proposal, dropping response channel",
943 );
944 }
945 }
946 Ok(())
947}
948
/// Wraps a task handle and aborts the task when the wrapper is dropped.
struct AbortOnDrop(Handle<()>);
951
952impl Drop for AbortOnDrop {
953 fn drop(&mut self) {
954 self.0.abort();
955 }
956}
957
958impl std::fmt::Debug for AbortOnDrop {
959 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
960 f.debug_struct("AbortOnDrop").finish_non_exhaustive()
961 }
962}