use std::{sync::Arc, time::Duration};

use alloy_consensus::BlockHeader;
use alloy_primitives::{B256, Bytes};
use alloy_rpc_types_engine::PayloadId;
use commonware_codec::{Encode as _, ReadExt as _};
use commonware_consensus::{
    Heightable as _,
    types::{Epoch, Epocher as _, FixedEpocher, Height, HeightDelta, Round, View},
};
use commonware_cryptography::{certificate::Provider as _, ed25519::PublicKey};
use commonware_macros::select;
use commonware_runtime::{
    ContextCell, FutureExt as _, Handle, Metrics, Pacer, Spawner, Storage, spawn_cell,
};
use commonware_utils::{SystemTimeExt, channel::oneshot};
use eyre::{OptionExt as _, WrapErr as _, bail, ensure, eyre};
use futures::{
    StreamExt as _, TryFutureExt as _,
    channel::mpsc,
    future::{ready, try_join},
};
use rand_08::{CryptoRng, Rng};
use reth_ethereum::chainspec::EthChainSpec as _;
use reth_node_builder::{Block as _, BuiltPayload, ConsensusEngineHandle};
use reth_provider::{BlockHashReader as _, BlockReader as _};
use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
use tempo_node::{TempoExecutionData, TempoFullNode, TempoPayloadTypes};
use tempo_payload_types::TempoPayloadBuilderAttributes;
use tokio::sync::RwLock;
use tracing::{Level, debug, error, error_span, info, info_span, instrument, warn};

use super::{
    Mailbox,
    ingress::{Broadcast, Genesis, Message, Propose, Verify},
};
use crate::{
    consensus::{Digest, block::Block},
    epoch::SchemeProvider,
    subblocks,
};

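/// Drives block proposal, verification, and broadcast for the consensus
/// engine.
///
/// The actor reads `Broadcast`, `Genesis`, `Propose`, and `Verify` messages
/// from its mailbox and services each one on its own task. `TState` encodes
/// at the type level whether the actor has been initialized.
///
/// Typical wiring (a sketch; `config` and `dkg_manager` construction are
/// elided):
///
/// ```ignore
/// let actor = Actor::init(config).await?;
/// let handle = actor.start(dkg_manager);
/// ```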
pub(in crate::consensus) struct Actor<TContext, TState = Uninit> {
    context: ContextCell<TContext>,
    mailbox: mpsc::Receiver<Message>,

    inner: Inner<TState>,
}

impl<TContext, TState> Actor<TContext, TState> {
    pub(super) fn mailbox(&self) -> &Mailbox {
        &self.inner.my_mailbox
    }
}

impl<TContext> Actor<TContext, Uninit>
where
    TContext: Pacer + governor::clock::Clock + Rng + CryptoRng + Spawner + Storage + Metrics,
{
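    /// Constructs the actor and its mailbox from `config`. The returned
    /// actor is uninitialized; call [`Self::start`] to spawn it.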
    pub(super) async fn init(config: super::Config<TContext>) -> eyre::Result<Self> {
        let (tx, rx) = mpsc::channel(config.mailbox_size);
        let my_mailbox = Mailbox::from_sender(tx);

        Ok(Self {
            context: ContextCell::new(config.context),
            mailbox: rx,

            inner: Inner {
                fee_recipient: config.fee_recipient,
                epoch_strategy: config.epoch_strategy,

                payload_resolve_time: config.payload_resolve_time,
                payload_return_time: config.payload_return_time,

                my_mailbox,
                marshal: config.marshal,

                execution_node: config.execution_node,
                executor: config.executor,

                subblocks: config.subblocks,

                scheme_provider: config.scheme_provider,

                state: Uninit(()),
            },
        })
    }

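    /// Converts the inner state into its initialized form and enters the
    /// message loop. On initialization failure the error has already been
    /// logged (via `#[instrument(err)]` on `into_initialized`), so the
    /// actor simply exits.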
    async fn run_until_stopped(self, dkg_manager: crate::dkg::manager::Mailbox) {
        let Self {
            context,
            mailbox,
            inner,
        } = self;
        let Ok(initialized) = inner.into_initialized(dkg_manager).await else {
            return;
        };

        Actor {
            context,
            mailbox,
            inner: initialized,
        }
        .run_until_stopped()
        .await
    }

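    /// Spawns the actor onto its runtime context, returning a handle to the
    /// running task.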
    pub(in crate::consensus) fn start(
        mut self,
        dkg_manager: crate::dkg::manager::Mailbox,
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run_until_stopped(dkg_manager).await)
    }
}

impl<TContext> Actor<TContext, Init>
where
    TContext: Pacer + governor::clock::Clock + Rng + CryptoRng + Spawner + Storage + Metrics,
{
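    /// Services mailbox messages until the channel closes or dispatching a
    /// message fails.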
    async fn run_until_stopped(mut self) {
        while let Some(msg) = self.mailbox.next().await {
            if let Err(error) = self.handle_message(msg) {
                error_span!("handle message").in_scope(|| {
                    error!(
                        %error,
                        "critical error occurred while handling message; exiting"
                    )
                });
                break;
            }
        }
    }

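    /// Dispatches a message to its handler on a freshly spawned, labeled
    /// task, so the message loop itself never blocks on a single handler.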
    fn handle_message(&mut self, msg: Message) -> eyre::Result<()> {
        match msg {
            Message::Broadcast(broadcast) => {
                self.context.with_label("broadcast").spawn({
                    let inner = self.inner.clone();
                    move |_| inner.handle_broadcast(broadcast)
                });
            }
            Message::Genesis(genesis) => {
                self.context.with_label("genesis").spawn({
                    let inner = self.inner.clone();
                    move |context| inner.handle_genesis(genesis, context)
                });
            }
            Message::Propose(propose) => {
                self.context.with_label("propose").spawn({
                    let inner = self.inner.clone();
                    move |context| inner.handle_propose(propose, context)
                });
            }
            Message::Verify(verify) => {
                self.context.with_label("verify").spawn({
                    let inner = self.inner.clone();
                    move |context| inner.handle_verify(*verify, context)
                });
            }
        }
        Ok(())
    }
}

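/// The actor's core, cloned into each per-message handler task.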
#[derive(Clone)]
struct Inner<TState> {
    fee_recipient: alloy_primitives::Address,
    epoch_strategy: FixedEpocher,
    payload_resolve_time: Duration,
    payload_return_time: Duration,

    my_mailbox: Mailbox,

    marshal: crate::alias::marshal::Mailbox,

    execution_node: TempoFullNode,
    executor: crate::executor::Mailbox,
    subblocks: Option<subblocks::Mailbox>,
    scheme_provider: SchemeProvider,

    state: TState,
}

impl Inner<Init> {
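    /// Hands the most recently proposed block to the marshal for broadcast.
    ///
    /// Fails if no block has been proposed yet or if `broadcast.payload`
    /// does not match the digest of the latest proposed block.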
    #[instrument(
        skip_all,
        fields(%broadcast.payload),
        err(level = Level::ERROR),
    )]
    async fn handle_broadcast(self, broadcast: Broadcast) -> eyre::Result<()> {
        let Some((round, latest_proposed)) = self.state.latest_proposed_block.read().await.clone()
        else {
            return Err(eyre!("there was no latest block to broadcast"));
        };
        ensure!(
            broadcast.payload == latest_proposed.digest(),
            "broadcast of payload `{}` was requested, but digest of latest proposed block is `{}`",
            broadcast.payload,
            latest_proposed.digest(),
        );

        self.marshal.proposed(round, latest_proposed).await;
        Ok(())
    }

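    /// Determines the genesis digest for `genesis.epoch`: the digest of the
    /// boundary block of the previous epoch, or of the block at height zero
    /// for the very first epoch.
    ///
    /// Retries every two seconds until either the execution layer or the
    /// marshal knows the boundary block, or the request is cancelled.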
    #[instrument(
        skip_all,
        fields(
            epoch = %genesis.epoch,
        ),
        ret(Display),
        err(level = Level::ERROR)
    )]
    async fn handle_genesis<TContext: commonware_runtime::Clock>(
        self,
        mut genesis: Genesis,
        context: TContext,
    ) -> eyre::Result<Digest> {
        let boundary = match genesis.epoch.previous() {
            None => Height::zero(),
            Some(previous_epoch) => self
                .epoch_strategy
                .last(previous_epoch)
                .expect("epoch strategy is for all epochs"),
        };

        let mut attempts = 0;
        let epoch_genesis = loop {
            attempts += 1;
            if let Ok(Some(hash)) = self.execution_node.provider.block_hash(boundary.get()) {
                break Digest(hash);
            } else if let Some((_, digest)) = self.marshal.get_info(boundary).await {
                break digest;
            } else {
                info_span!("fetch_genesis_digest").in_scope(|| {
                    info!(
                        boundary.height = %boundary,
                        attempts,
                        "neither marshal actor nor execution layer had the \
                        boundary block of the previous epoch available; \
                        waiting 2s before trying again"
                    );
                });
                select!(
                    () = genesis.response.closed() => {
                        return Err(eyre!("genesis request was cancelled"));
                    },

                    _ = context.sleep(Duration::from_secs(2)) => {
                        continue;
                    },
                );
            }
        };
        genesis.response.send(epoch_genesis).map_err(|_| {
            eyre!("failed returning parent digest for epoch: return channel was already closed")
        })?;
        Ok(epoch_genesis)
    }

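    /// Builds a new proposal on top of `request.parent`, returns its digest
    /// to the consensus engine, and remembers the block so that a subsequent
    /// `Broadcast` request can hand it to the marshal. Nothing is remembered
    /// when the parent itself was re-proposed.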
    #[instrument(
        skip_all,
        fields(
            epoch = %request.round.epoch(),
            view = %request.round.view(),
            parent.view = %request.parent.0,
            parent.digest = %request.parent.1,
        ),
        err(level = Level::WARN),
    )]
    async fn handle_propose<TContext: Pacer>(
        self,
        request: Propose,
        context: TContext,
    ) -> eyre::Result<()> {
        let Propose {
            parent: (parent_view, parent_digest),
            mut response,
            round,
        } = request;

        let proposal = select!(
            () = response.closed() => {
                Err(eyre!(
                    "proposal return channel was closed by consensus \
                    engine before block could be proposed; aborting"
                ))
            },

            res = self.clone().propose(
                context.clone(),
                parent_view,
                parent_digest,
                round
            ) => {
                res.wrap_err("failed creating a proposal")
            }
        )?;

        let proposal_digest = proposal.digest();
        let proposal_height = proposal.height();

        info!(
            proposal.digest = %proposal_digest,
            proposal.height = %proposal_height,
            "constructed proposal",
        );

        response.send(proposal_digest).map_err(|_| {
            eyre!(
                "failed returning proposal to consensus engine: response \
                channel was already closed"
            )
        })?;

        if proposal_digest == parent_digest {
            return Ok(());
        }

        {
            let mut lock = self.state.latest_proposed_block.write().await;
            *lock = Some((round, proposal.clone()));
        }

        Ok(())
    }

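    /// Verifies a proposed block and reports the verdict to the consensus
    /// engine. A block that passes verification is forwarded to the marshal
    /// and, unless the parent was re-proposed, made the head of the
    /// canonical chain.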
    #[instrument(
        skip_all,
        fields(
            epoch = %verify.round.epoch(),
            view = %verify.round.view(),
            digest = %verify.payload,
            parent.view = %verify.parent.0,
            parent.digest = %verify.parent.1,
            proposer = %verify.proposer,
        ),
    )]
    async fn handle_verify<TContext: Pacer>(self, verify: Verify, context: TContext) {
        let Verify {
            parent,
            payload,
            proposer,
            mut response,
            round,
        } = verify;
        let result = select!(
            () = response.closed() => {
                Err(eyre!(
                    "verification return channel was closed by consensus \
                    engine before block could be validated; aborting"
                ))
            },

            res = self.clone().verify(context, parent, payload, proposer, round) => {
                res.wrap_err("block verification failed")
            }
        );

        let _ = report_verification_result(response, &result);

        if let Ok((block, true)) = result {
            if parent.1 != payload
                && let Err(error) = self
                    .state
                    .executor
                    .canonicalize_head(block.height(), block.digest())
            {
                tracing::warn!(
                    %error,
                    "failed making the verified proposal the head of the canonical chain",
                );
            }
            self.marshal.verified(round, block).await;
        }
    }

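    /// Builds a block on top of `parent_digest`.
    ///
    /// If the parent is the last block of the current epoch it is re-proposed
    /// as-is. Otherwise the parent is verified and made the canonical head,
    /// and the execution layer's payload builder is asked for a new payload:
    /// the builder is interrupted after `payload_resolve_time` and the result
    /// is returned no earlier than `payload_return_time` after the request.
    /// The header `extra_data` carries the DKG outcome on the epoch's final
    /// block and, when available, a signed dealer log on all other blocks.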
    async fn propose<TContext: Pacer>(
        self,
        context: TContext,
        parent_view: View,
        parent_digest: Digest,
        round: Round,
    ) -> eyre::Result<Block> {
        let parent = get_parent(
            &self.execution_node,
            round,
            parent_digest,
            parent_view,
            &self.marshal,
        )
        .await?;
        debug!(height = %parent.height(), "retrieved parent block");

        let parent_epoch_info = self
            .epoch_strategy
            .containing(parent.height())
            .expect("epoch strategy is for all heights");
        if parent_epoch_info.last() == parent.height() && parent_epoch_info.epoch() == round.epoch()
        {
            info!("parent is last height of epoch; re-proposing parent");
            return Ok(parent);
        }

        if !verify_block(
            context.clone(),
            parent_epoch_info.epoch(),
            &self.epoch_strategy,
            self.execution_node
                .add_ons_handle
                .beacon_engine_handle
                .clone(),
            &parent,
            parent.parent_digest(),
            &self.scheme_provider,
        )
        .await
        .wrap_err("failed verifying block against execution layer")?
        {
            eyre::bail!("the proposal parent block is not valid");
        }

        ready(
            self.state
                .executor
                .canonicalize_head(parent.height(), parent.digest()),
        )
        .and_then(|ack| ack.map_err(eyre::Report::new))
        .await
        .wrap_err("failed updating canonical head to parent")?;

        let extra_data = if parent_epoch_info.last() == parent.height().next()
            && parent_epoch_info.epoch() == round.epoch()
        {
            let outcome = self
                .state
                .dkg_manager
                .get_dkg_outcome(parent_digest, parent.height())
                .await
                .wrap_err("failed getting public dkg ceremony outcome")?;
            ensure!(
                round.epoch().next() == outcome.epoch,
                "outcome is for epoch `{}`, but we are trying to include the \
                outcome for epoch `{}`",
                outcome.epoch,
                round.epoch().next(),
            );
            info!(
                %outcome.epoch,
                outcome.network_identity = %outcome.network_identity(),
                outcome.dealers = ?outcome.dealers(),
                outcome.players = ?outcome.players(),
                outcome.next_players = ?outcome.next_players(),
                "received DKG outcome; will include in payload builder attributes",
            );
            outcome.encode().into()
        } else {
            match self.state.dkg_manager.get_dealer_log(round.epoch()).await {
                Err(error) => {
                    warn!(
                        %error,
                        "failed getting signed dealer log for current epoch \
                        because actor dropped response channel",
                    );
                    Bytes::default()
                }
                Ok(None) => Bytes::default(),
                Ok(Some(log)) => {
                    info!(
                        "received signed dealer log; will include in payload \
                        builder attributes"
                    );
                    log.encode().into()
                }
            }
        };

        let attrs = TempoPayloadBuilderAttributes::new(
            payload_id_from_block_hash(&parent.block_hash()),
            parent.block_hash(),
            self.fee_recipient,
            context.current().epoch_millis(),
            extra_data,
            move || {
                self.subblocks
                    .as_ref()
                    .and_then(|s| s.get_subblocks(parent.block_hash()).ok())
                    .unwrap_or_default()
            },
        );

        let interrupt_handle = attrs.interrupt_handle().clone();

        let payload_id = self
            .execution_node
            .payload_builder_handle
            .send_new_payload(attrs)
            .pace(&context, Duration::from_millis(20))
            .await
            .map_err(|_| eyre!("channel was closed before a response was returned"))
            .and_then(|ret| ret.wrap_err("execution layer rejected request"))
            .wrap_err("failed requesting new payload from the execution layer")?;

        debug!(
            resolve_time_ms = self.payload_resolve_time.as_millis(),
            return_time_ms = self.payload_return_time.as_millis(),
            "sleeping before payload builder resolving"
        );

        let payload_return_time = context.current() + self.payload_return_time;

        context.sleep(self.payload_resolve_time).await;

        interrupt_handle.interrupt();

        let payload = self
            .execution_node
            .payload_builder_handle
            .resolve_kind(payload_id, reth_node_builder::PayloadKind::WaitForPending)
            .pace(&context, Duration::from_millis(20))
            .await
            .ok_or_eyre("no payload found under provided id")
            .and_then(|rsp| rsp.map_err(Into::<eyre::Report>::into))
            .wrap_err_with(|| format!("failed getting payload for payload ID `{payload_id}`"))?;

        context.sleep_until(payload_return_time).await;

        Ok(Block::from_execution_block(payload.block().clone()))
    }

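    /// Fetches the block behind `payload` and its parent from the marshal,
    /// checks the header `extra_data` against the DKG manager, and validates
    /// the block against the execution layer, returning the block together
    /// with its verdict. A re-proposed parent is valid only as the final
    /// block of the current epoch.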
    async fn verify<TContext: Pacer>(
        self,
        context: TContext,
        (parent_view, parent_digest): (View, Digest),
        payload: Digest,
        proposer: PublicKey,
        round: Round,
    ) -> eyre::Result<(Block, bool)> {
        let block_request = self
            .marshal
            .subscribe_by_digest(None, payload)
            .await
            .map_err(|_| eyre!("syncer dropped channel before the block to be verified was sent"));

        let (block, parent) = try_join(
            block_request,
            get_parent(
                &self.execution_node,
                round,
                parent_digest,
                parent_view,
                &self.marshal,
            ),
        )
        .await
        .wrap_err("failed getting required blocks from syncer")?;

        if payload == parent_digest {
            let epoch_info = self
                .epoch_strategy
                .containing(block.height())
                .expect("epoch strategy is for all heights");
            let is_boundary_repropose =
                epoch_info.last() == block.height() && epoch_info.epoch() == round.epoch();
            return Ok((block, is_boundary_repropose));
        }

        if let Err(reason) = verify_header_extra_data(
            &block,
            (parent_view, parent_digest),
            round,
            &self.state.dkg_manager,
            &self.epoch_strategy,
            &proposer,
        )
        .await
        {
            warn!(
                %reason,
                "header extra data could not be verified; failing block",
            );
            return Ok((block, false));
        }

        if let Err(error) = self
            .state
            .executor
            .canonicalize_head(parent.height(), parent.digest())
        {
            tracing::warn!(
                %error,
                parent.height = %parent.height(),
                parent.digest = %parent.digest(),
                "failed updating canonical head to parent",
            );
        }

        let is_good = verify_block(
            context,
            round.epoch(),
            &self.epoch_strategy,
            self.execution_node
                .add_ons_handle
                .beacon_engine_handle
                .clone(),
            &block,
            parent_digest,
            &self.scheme_provider,
        )
        .await
        .wrap_err("failed verifying block against execution layer")?;

        Ok((block, is_good))
    }
}

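// One-time initialization: wires in the DKG manager mailbox and creates the
// slot that tracks the latest proposed block.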
impl Inner<Uninit> {
    #[instrument(skip_all, err)]
    async fn into_initialized(
        self,
        dkg_manager: crate::dkg::manager::Mailbox,
    ) -> eyre::Result<Inner<Init>> {
        let initialized = Inner {
            fee_recipient: self.fee_recipient,
            epoch_strategy: self.epoch_strategy,
            payload_resolve_time: self.payload_resolve_time,
            payload_return_time: self.payload_return_time,
            my_mailbox: self.my_mailbox,
            marshal: self.marshal,
            execution_node: self.execution_node,
            executor: self.executor.clone(),
            state: Init {
                latest_proposed_block: Arc::new(RwLock::new(None)),
                dkg_manager,
                executor: self.executor,
            },
            subblocks: self.subblocks,
            scheme_provider: self.scheme_provider,
        };

        Ok(initialized)
    }
}

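/// Marker state of an [`Actor`] (and its [`Inner`]) before initialization.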
#[derive(Clone, Debug)]
pub(in crate::consensus) struct Uninit(());

#[derive(Clone, Debug)]
struct Init {
    latest_proposed_block: Arc<RwLock<Option<(Round, Block)>>>,
    dkg_manager: crate::dkg::manager::Mailbox,
    executor: crate::executor::Mailbox,
}

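/// Validates `block` against the execution layer via `engine.new_payload`.
///
/// Returns `Ok(false)` for blocks that are well-formed but invalid (wrong
/// epoch, mismatched parent digest, or rejected by the execution layer), and
/// an error when the execution layer could not fully execute the payload.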
#[instrument(
    skip_all,
    fields(
        %epoch,
        epoch_length,
        block.parent_digest = %block.parent_digest(),
        block.digest = %block.digest(),
        block.height = %block.height(),
        block.timestamp = block.timestamp(),
        parent.digest = %parent_digest,
    )
)]
async fn verify_block<TContext: Pacer>(
    context: TContext,
    epoch: Epoch,
    epoch_strategy: &FixedEpocher,
    engine: ConsensusEngineHandle<TempoPayloadTypes>,
    block: &Block,
    parent_digest: Digest,
    scheme_provider: &SchemeProvider,
) -> eyre::Result<bool> {
    use alloy_rpc_types_engine::PayloadStatusEnum;

    let epoch_info = epoch_strategy
        .containing(block.height())
        .expect("epoch strategy is for all heights");
    if epoch_info.epoch() != epoch {
        info!("block does not belong to this epoch");
        return Ok(false);
    }
    if block.parent_hash() != *parent_digest {
        info!(
            "parent digest stored in block must match the digest of the parent \
            argument but doesn't"
        );
        return Ok(false);
    }

    let validator_set = scheme_provider.scoped(epoch).map(|scheme| {
        scheme
            .participants()
            .into_iter()
            .map(|p| B256::from_slice(p))
            .collect()
    });
    let block = block.clone().into_inner();
    let execution_data = TempoExecutionData {
        block: Arc::new(block),
        validator_set,
    };
    let payload_status = engine
        .new_payload(execution_data)
        .pace(&context, Duration::from_millis(50))
        .await
        .wrap_err("failed sending `new payload` message to execution layer to validate block")?;
    match payload_status.status {
        PayloadStatusEnum::Valid => Ok(true),
        PayloadStatusEnum::Invalid { validation_error } => {
            info!(
                validation_error,
                "execution layer returned that the block was invalid"
            );
            Ok(false)
        }
        PayloadStatusEnum::Accepted => {
            bail!(
                "failed validating block because the payload was merely accepted, \
                meaning the execution layer did not actually execute it"
            )
        }
        PayloadStatusEnum::Syncing => {
            bail!(
                "failed validating block because the payload is still syncing; \
                this means the parent block was available to the consensus \
                layer but not the execution layer"
            )
        }
    }
}

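/// Checks the header `extra_data` field: the final block of an epoch must
/// carry the DKG outcome this node computed, and any other non-empty
/// `extra_data` must be a dealer log signed by the block's proposer.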
#[instrument(skip_all, err(Display))]
async fn verify_header_extra_data(
    block: &Block,
    parent: (View, Digest),
    round: Round,
    dkg_manager: &crate::dkg::manager::Mailbox,
    epoch_strategy: &FixedEpocher,
    proposer: &PublicKey,
) -> eyre::Result<()> {
    let epoch_info = epoch_strategy
        .containing(block.height())
        .expect("epoch strategy is for all heights");
    if epoch_info.last() == block.height() {
        info!(
            "on last block of epoch; verifying that the boundary block \
            contains the correct DKG outcome",
        );
        let our_outcome = dkg_manager
            .get_dkg_outcome(parent.1, block.height().saturating_sub(HeightDelta::new(1)))
            .await
            .wrap_err(
                "failed getting public dkg ceremony outcome; cannot verify end \
                of epoch block",
            )?;
        let block_outcome = OnchainDkgOutcome::read(&mut block.header().extra_data().as_ref())
            .wrap_err(
                "failed decoding header extra data as DKG ceremony \
                outcome; cannot verify end of epoch block",
            )?;
        if our_outcome != block_outcome {
            warn!(
                our.epoch = %our_outcome.epoch,
                our.players = ?our_outcome.players(),
                our.next_players = ?our_outcome.next_players(),
                our.sharing = ?our_outcome.sharing(),
                our.is_next_full_dkg = ?our_outcome.is_next_full_dkg,
                block.epoch = %block_outcome.epoch,
                block.players = ?block_outcome.players(),
                block.next_players = ?block_outcome.next_players(),
                block.sharing = ?block_outcome.sharing(),
                block.is_next_full_dkg = ?block_outcome.is_next_full_dkg,
                "our public dkg outcome does not match what's stored \
                in the block",
            );
            return Err(eyre!(
                "our public dkg outcome does not match what's \
                stored in the block header extra_data field; they must \
                match for the end-of-epoch block to be valid",
            ));
        }
    } else if !block.header().extra_data().is_empty() {
        let bytes = block.header().extra_data().to_vec();
        let dealer = dkg_manager
            .verify_dealer_log(round.epoch(), bytes)
            .await
            .wrap_err("failed request to verify DKG dealing")?;
        ensure!(
            &dealer == proposer,
            "proposer `{proposer}` is not the dealer `{dealer}` of the dealing \
            in the block",
        );
    }

    Ok(())
}

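/// Derives the payload ID from the first eight bytes of the parent block
/// hash, making the ID deterministic for a given parent.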
fn payload_id_from_block_hash(block_hash: &B256) -> PayloadId {
    PayloadId::new(
        <[u8; 8]>::try_from(&block_hash[0..8])
            .expect("a 32 byte array always has at least 8 bytes"),
    )
}

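/// Sends the verification verdict back to the consensus engine; when
/// verification itself failed, the response channel is dropped instead,
/// signaling that no decision could be made.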
#[instrument(skip_all, err)]
fn report_verification_result(
    response: oneshot::Sender<bool>,
    verification_result: &eyre::Result<(Block, bool)>,
) -> eyre::Result<()> {
    match verification_result {
        Ok((_, is_good)) => {
            info!(
                proposal_valid = is_good,
                "returning proposal verification result to consensus",
            );
            response.send(*is_good).map_err(|_| {
                eyre!(
                    "attempted to return verification result, but \
                    receiver already dropped the channel"
                )
            })?;
        }
        Err(error) => {
            info!(
                %error,
                "could not decide proposal, dropping response channel",
            );
        }
    }
    Ok(())
}

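/// Fetches the parent block: directly from the execution layer if the parent
/// is the genesis block, otherwise by subscribing to the marshal.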
async fn get_parent(
    execution_node: &TempoFullNode,
    round: Round,
    parent_digest: Digest,
    parent_view: View,
    marshal: &crate::alias::marshal::Mailbox,
) -> eyre::Result<Block> {
    let genesis_digest = execution_node.chain_spec().genesis_hash();
    if parent_digest == Digest(genesis_digest) {
        let genesis_block = Block::from_execution_block(
            execution_node
                .provider
                .block_by_number(0)
                .map_or_else(
                    |e| Err(eyre::Report::new(e)),
                    |block| block.ok_or_eyre("execution layer did not have block"),
                )
                .wrap_err("execution layer did not have the genesis block")?
                .seal(),
        );
        Ok(genesis_block)
    } else {
        marshal
            .subscribe_by_digest(Some(Round::new(round.epoch(), parent_view)), parent_digest)
            .await
            .await
            .map_err(|_| eyre!("syncer dropped channel before the parent block was sent"))
    }
}