1use std::{
14 sync::{Arc, Mutex},
15 time::{Duration, Instant, SystemTime},
16};
17
18use alloy_consensus::BlockHeader;
19use alloy_primitives::{B256, Bytes};
20use commonware_codec::{Encode as _, EncodeSize as _, ReadExt as _};
21use commonware_consensus::{
22 Heightable as _,
23 simplex::Plan,
24 types::{Epoch, Epocher as _, FixedEpocher, Height, HeightDelta, Round, View},
25};
26use commonware_cryptography::{certificate::Provider as _, ed25519::PublicKey};
27use commonware_macros::select;
28use commonware_p2p::Recipients;
29use commonware_runtime::{
30 ContextCell, FutureExt as _, Handle, Metrics as _, Pacer, Spawner, Storage, spawn_cell,
31};
32use prometheus_client::metrics::counter::Counter;
33
34use commonware_utils::SystemTimeExt;
35use eyre::{OptionExt as _, WrapErr as _, bail, ensure, eyre};
36use futures::{StreamExt as _, channel::mpsc, future::try_join};
37use rand_08::{CryptoRng, Rng};
38use reth_node_builder::{Block as _, ConsensusEngineHandle};
39use reth_primitives_traits::BlockBody as _;
40use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
41use tempo_node::{TempoExecutionData, TempoFullNode, TempoPayloadTypes};
42use tempo_telemetry_util::display_duration;
43
44use reth_provider::{BlockHashReader as _, BlockReader as _, BlockSource};
45use tempo_payload_types::{
46 TempoPayloadAttributes, ValidationLatencyEstimator, ValidationLatencyWorkload,
47 marshal_persist_estimate, observe_marshal_persist,
48};
49use tempo_primitives::TempoConsensusContext;
50use tracing::{Level, debug, info, info_span, instrument, warn};
51
52use super::{
53 Mailbox,
54 ingress::{Broadcast, Genesis, Message, Propose, Verify},
55};
56use crate::{
57 consensus::{Digest, block::Block},
58 epoch::SchemeProvider,
59 subblocks,
60 utils::OptionFuture,
61};
62
63pub(in crate::consensus) struct Actor<TContext, TState = Uninit> {
64 context: ContextCell<TContext>,
65 mailbox: mpsc::Receiver<Message>,
66
67 inner: Inner<TState>,
68}
69
70struct BuildProposalArgs {
71 propose_start: Instant,
72 parent_view: View,
73 parent_digest: Digest,
74 round: Round,
75 leader: PublicKey,
76}
77
/// Bookkeeping for a freshly built proposal, produced by `Inner::propose`.
struct ProposalReturn {
    /// Point in time until which `handle_propose` sleeps before returning
    /// the proposal digest.
    time: SystemTime,
    /// Encoded size of the proposed consensus block; reported together with
    /// the measured marshal persist duration.
    block_size_bytes: usize,
}
82
83impl<TContext, TState> Actor<TContext, TState> {
84 pub(super) fn mailbox(&self) -> &Mailbox {
85 &self.inner.my_mailbox
86 }
87}
88
89impl<TContext> Actor<TContext, Uninit>
90where
91 TContext: Pacer
92 + governor::clock::Clock
93 + Rng
94 + CryptoRng
95 + Spawner
96 + Storage
97 + commonware_runtime::Metrics,
98{
99 pub(super) async fn init(config: super::Config<TContext>) -> eyre::Result<Self> {
100 let (tx, rx) = mpsc::channel(config.mailbox_size);
101 let my_mailbox = Mailbox::from_sender(tx);
102
103 let metrics = Metrics::init(&config.context);
104
105 Ok(Self {
106 context: ContextCell::new(config.context),
107 mailbox: rx,
108
109 inner: Inner {
110 public_key: config.public_key,
111 epoch_strategy: config.epoch_strategy,
112
113 proposal_return_budget: config.proposal_return_budget,
114
115 my_mailbox,
116 marshal: config.marshal,
117
118 execution_node: config.execution_node,
119 executor: config.executor,
120
121 subblocks: config.subblocks,
122
123 scheme_provider: config.scheme_provider,
124 validation_latency_estimator: Default::default(),
125
126 metrics,
127
128 state: Uninit(()),
129 },
130 })
131 }
132
133 async fn run_until_stopped(self, dkg_manager: crate::dkg::manager::Mailbox) {
135 let Self {
136 context,
137 mailbox,
138 inner,
139 } = self;
140 let Ok(initialized) = inner.into_initialized(dkg_manager).await else {
143 return;
145 };
146
147 Actor {
148 context,
149 mailbox,
150 inner: initialized,
151 }
152 .run_until_stopped()
153 .await
154 }
155
156 pub(in crate::consensus) fn start(
157 mut self,
158 dkg_manager: crate::dkg::manager::Mailbox,
159 ) -> Handle<()> {
160 spawn_cell!(self.context, self.run_until_stopped(dkg_manager))
161 }
162}
163
164impl<TContext> Actor<TContext, Init>
165where
166 TContext: Pacer
167 + governor::clock::Clock
168 + Rng
169 + CryptoRng
170 + Spawner
171 + Storage
172 + commonware_runtime::Metrics,
173{
174 async fn run_until_stopped(mut self) {
175 while let Some(msg) = self.mailbox.next().await {
176 self.handle_message(msg);
177 }
178 }
179
180 fn handle_message(&mut self, msg: Message) {
181 match msg {
182 Message::Broadcast(broadcast) => {
183 self.context.with_label("broadcast").spawn({
184 let inner = self.inner.clone();
185 move |_| inner.handle_broadcast(*broadcast)
186 });
187 }
188 Message::Genesis(genesis) => {
189 self.context.with_label("genesis").spawn({
190 let inner = self.inner.clone();
191 move |context| inner.handle_genesis(genesis, context)
192 });
193 }
194 Message::Propose(propose) => {
195 self.context.with_label("propose").spawn({
196 let inner = self.inner.clone();
197 move |context| inner.handle_propose(*propose, context)
198 });
199 }
200 Message::Verify(verify) => {
201 self.context.with_label("verify").spawn({
202 let inner = self.inner.clone();
203 move |context| inner.handle_verify(*verify, context)
204 });
205 }
206 }
207 }
208}
209
210#[derive(Clone)]
211struct Inner<TState> {
212 public_key: PublicKey,
213 epoch_strategy: FixedEpocher,
214 proposal_return_budget: Duration,
216
217 my_mailbox: Mailbox,
218
219 marshal: crate::alias::marshal::Mailbox,
220
221 execution_node: Arc<TempoFullNode>,
222 executor: crate::executor::Mailbox,
223 subblocks: Option<subblocks::Mailbox>,
224 scheme_provider: SchemeProvider,
225 validation_latency_estimator: Arc<Mutex<ValidationLatencyEstimator>>,
226
227 metrics: Metrics,
228
229 state: TState,
230}
231
232impl Inner<Init> {
233 #[instrument(
234 skip_all,
235 fields(%digest),
236 )]
237 async fn handle_broadcast(self, Broadcast { digest, plan }: Broadcast) {
238 let (round, recipients) = match plan {
239 Plan::Propose { round } => (round, Recipients::All),
240 Plan::Forward { round, recipients } => (round, recipients),
241 };
242 self.marshal.forward(round, digest, recipients).await;
243 }
244
245 #[instrument(
246 skip_all,
247 fields(
248 epoch = %genesis.epoch,
249 ),
250 ret(Display),
251 err(level = Level::ERROR)
252 )]
253 async fn handle_genesis<TContext: commonware_runtime::Clock>(
254 self,
255 mut genesis: Genesis,
256 context: TContext,
257 ) -> eyre::Result<Digest> {
258 let boundary = match genesis.epoch.previous() {
262 None => Height::zero(),
263 Some(previous_epoch) => self
264 .epoch_strategy
265 .last(previous_epoch)
266 .expect("epoch strategy is for all epochs"),
267 };
268
269 let mut attempts = 0;
270 let epoch_genesis = loop {
271 attempts += 1;
272 if let Ok(Some(hash)) = self.execution_node.provider.block_hash(boundary.get()) {
273 break Digest(hash);
274 } else if let Some((_, digest)) = self.marshal.get_info(boundary).await {
275 break digest;
276 } else {
277 info_span!("fetch_genesis_digest").in_scope(|| {
278 info!(
279 boundary.height = %boundary,
280 attempts,
281 "neither marshal actor nor execution layer had the \
282 boundary block of the previous epoch available; \
283 waiting 2s before trying again"
284 );
285 });
286 select!(
287 () = genesis.response.closed() => {
288 return Err(eyre!("genesis request was cancelled"));
289 },
290
291 _ = context.sleep(Duration::from_secs(2)) => {
292 continue;
293 },
294 );
295 }
296 };
297 genesis.response.send(epoch_genesis).map_err(|_| {
298 eyre!("failed returning parent digest for epoch: return channel was already closed")
299 })?;
300 Ok(epoch_genesis)
301 }
302
303 #[instrument(
305 skip_all,
306 fields(
307 epoch = %request.round.epoch(),
308 view = %request.round.view(),
309 parent.view = %request.parent.0,
310 parent.digest = %request.parent.1,
311 ),
312 err(level = Level::WARN),
313 )]
314 async fn handle_propose<TContext: Pacer>(
315 self,
316 request: Propose,
317 context: TContext,
318 ) -> eyre::Result<()> {
319 let Propose {
320 parent: (parent_view, parent_digest),
321 mut response,
322 round,
323 leader,
324 started_at: propose_start,
325 } = request;
326
327 let proposal_digest = {
328 let mut proposal = Box::pin(async {
329 let already_verified = OptionFuture::some(self.marshal.get_verified(round));
351 futures::pin_mut!(already_verified);
352
353 let mut proposal = Box::pin(self.clone().propose(
354 context.clone(),
355 BuildProposalArgs {
356 propose_start,
357 parent_view,
358 parent_digest,
359 round,
360 leader,
361 },
362 ));
363
364 let proposal_result = tokio::select! {
365 biased;
366
367 Some(block) = &mut already_verified => {
368 debug!("skipping proposal: verified block already exists for round on restart");
369 Ok((block, None))
370 },
371
372 res = &mut proposal => {
373 res.wrap_err("failed creating a proposal")
374 },
375 };
376
377 let (block, proposal_return) = if already_verified.is_some()
380 && let Some(block) = already_verified.await
381 {
382 debug!("skipping proposal: verified block already exists for round on restart");
383 (block, None)
384 } else {
385 proposal_result?
386 };
387
388 let digest = block.digest();
389 if let Some(proposal_return) = proposal_return {
390 let persist_start = Instant::now();
391 if !self.marshal.proposed(round, block).await {
392 bail!("marshal actor rejected persisting proposal");
393 }
394 observe_marshal_persist(
395 proposal_return.block_size_bytes,
396 persist_start.elapsed(),
397 );
398
399 context.sleep_until(proposal_return.time).await;
401 }
402
403 eyre::Ok(digest)
404 });
405
406 tokio::select! {
407 () = response.closed() => {
408 return Err(eyre!(
409 "proposal return channel was closed by consensus \
410 engine before block could be proposed; aborting"
411 ))
412 },
413
414 res = &mut proposal => {
415 res?
416 },
417 }
418 };
419
420 info!(
421 proposal.digest = %proposal_digest,
422 "constructed proposal",
423 );
424
425 response.send(proposal_digest).map_err(|_| {
426 eyre!(
427 "failed returning proposal to consensus engine: response \
428 channel was already closed"
429 )
430 })?;
431
432 Ok(())
433 }
434
435 #[instrument(
446 skip_all,
447 fields(
448 epoch = %verify.round.epoch(),
449 view = %verify.round.view(),
450 digest = %verify.payload,
451 parent.view = %verify.parent.0,
452 parent.digest = %verify.parent.1,
453 proposer = %verify.proposer,
454 ),
455 err,
456 )]
457 async fn handle_verify<TContext: Pacer>(
458 self,
459 verify: Verify,
460 context: TContext,
461 ) -> eyre::Result<()> {
462 let Verify {
463 parent,
464 payload,
465 proposer,
466 mut response,
467 round,
468 } = verify;
469 let VerifyResult {
470 result,
471 block,
472 parent,
473 } = select!(
474 () = response.closed() => {
475 Err(eyre!(
476 "verification return channel was closed by consensus \
477 engine before block could be validated; aborting"
478 ))
479 },
480
481 res = self.clone().verify(context, parent, payload, proposer, round) => {
482 res.wrap_err("block verification failed")
483 }
484 )?;
485
486 if response.send(result).is_err() {
487 warn!("received dropped channel before verification result could be returned");
488 }
489 drop((block, parent));
491
492 Ok(())
493 }
494
495 async fn propose<TContext: Pacer>(
496 self,
497 context: TContext,
498 args: BuildProposalArgs,
499 ) -> eyre::Result<(Block, Option<ProposalReturn>)> {
500 let BuildProposalArgs {
501 propose_start,
502 parent_view,
503 parent_digest,
504 round,
505 leader,
506 } = args;
507
508 let parent = subscribe(
509 &self.execution_node,
510 Round::new(round.epoch(), parent_view),
511 parent_digest,
512 &self.marshal,
513 )
514 .await?;
515
516 debug!(height = %parent.height(), "retrieved parent block",);
517
518 let parent_epoch_info = self
519 .epoch_strategy
520 .containing(parent.height())
521 .expect("epoch strategy is for all heights");
522
523 if parent_epoch_info.last() == parent.height() && parent_epoch_info.epoch() == round.epoch()
527 {
528 let parent = if parent.block().header().block_access_list_hash().is_some()
532 && parent.block_access_list().is_none()
533 {
534 self.marshal
535 .subscribe_by_digest(
536 Some(Round::new(round.epoch(), parent_view)),
537 parent_digest,
538 )
539 .await
540 .await
541 .map_err(|_| eyre!("syncer dropped channel before the parent block was sent"))?
542 } else {
543 parent
544 };
545 if !self.marshal.verified(round, parent.clone()).await {
546 bail!("marshal rejected re-proposed boundary block");
547 }
548 info!("parent is last height of epoch; re-proposing parent");
549 return Ok((parent, None));
550 }
551
552 let is_genesis_parent = parent.height().is_zero()
553 || parent_epoch_info.last() == parent.height()
554 && parent_epoch_info.epoch().next() == round.epoch();
555
556 if !is_genesis_parent
564 && verify_block(
565 context.clone(),
566 parent_epoch_info.epoch(),
567 &self.epoch_strategy,
568 self.execution_node
569 .add_ons_handle
570 .beacon_engine_handle
571 .clone(),
572 &parent,
573 parent.parent_digest(),
575 &self.scheme_provider,
576 )
577 .await
578 .wrap_err("failed verifying block against execution layer")?
579 .is_none()
580 {
581 bail!("the proposal parent block is not valid");
582 }
583
584 let extra_data = if parent_epoch_info.last() == parent.height().next()
587 && parent_epoch_info.epoch() == round.epoch()
588 {
589 let outcome = self
591 .state
592 .dkg_manager
593 .get_dkg_outcome(parent_digest, parent.height())
594 .await
595 .wrap_err("failed getting public dkg ceremony outcome")?;
596 ensure!(
597 round.epoch().next() == outcome.epoch,
598 "outcome is for epoch `{}`, but we are trying to include the \
599 outcome for epoch `{}`",
600 outcome.epoch,
601 round.epoch().next(),
602 );
603 info!(
604 %outcome.epoch,
605 outcome.network_identity = %outcome.network_identity(),
606 outcome.dealers = ?outcome.dealers(),
607 outcome.players = ?outcome.players(),
608 outcome.next_players = ?outcome.next_players(),
609 "received DKG outcome; will include in payload builder attributes",
610 );
611 outcome.encode().into()
612 } else {
613 match self.state.dkg_manager.get_dealer_log(round.epoch()).await {
615 Err(error) => {
616 warn!(
617 %error,
618 "failed getting signed dealer log for current epoch \
619 because actor dropped response channel",
620 );
621 Bytes::default()
622 }
623 Ok(None) => Bytes::default(),
624 Ok(Some(log)) => {
625 info!(
626 "received signed dealer log; will include in payload \
627 builder attributes"
628 );
629 log.encode().into()
630 }
631 }
632 };
633
634 let mut epoch_millis = context.current().epoch_millis();
639 if epoch_millis <= parent.timestamp_millis() {
640 self.metrics.parent_ahead_of_local_time.inc();
641 epoch_millis = parent.timestamp_millis() + 1
642 };
643
644 let (timestamp, timestamp_millis_part) = (epoch_millis / 1000, epoch_millis % 1000);
645
646 let consensus_context = Some(TempoConsensusContext {
647 epoch: round.epoch().get(),
648 view: round.view().get(),
649 parent_view: parent_view.get(),
650 proposer: crate::utils::public_key_to_tempo_primitive(&leader),
651 });
652
653 let parent_hash = parent.block_hash();
654 let proposer_public_key = crate::utils::public_key_to_b256(&self.public_key);
655 let marshal_persist = marshal_persist_estimate();
656 let build_budget = self
660 .proposal_return_budget
661 .saturating_sub(propose_start.elapsed());
662 let validation_latency_estimate = self
663 .validation_latency_estimator
664 .lock()
665 .ok()
666 .and_then(|estimator| estimator.estimate());
667 let attrs = TempoPayloadAttributes::new(
668 Some(proposer_public_key),
669 timestamp,
670 timestamp_millis_part,
671 extra_data,
672 consensus_context,
673 move || {
674 self.subblocks
675 .as_ref()
676 .and_then(|s| s.get_subblocks(parent_hash).ok())
677 .unwrap_or_default()
678 },
679 )
680 .with_payload_build_budget(build_budget)
681 .with_validation_latency_estimate(validation_latency_estimate);
682
683 let payload_build_start = Instant::now();
688 let payload = self
689 .state
690 .executor
691 .canonicalize_and_build(parent.height(), parent.digest(), attrs)?
692 .await
693 .wrap_err(
694 "executor dropped the payload channel: the build failed (the \
695 executor logs the cause) or the executor shut down",
696 )?;
697
698 let payload_build_elapsed = payload_build_start.elapsed();
699 let payload_validation_work_elapsed = payload.validation_work_duration();
700 let validation_latency_elapsed = payload.validation_latency_duration();
701 let (block, block_access_list) = payload.into_execution_payload();
702 let execution_block_rlp_size_bytes = block.rlp_length();
703 let proposal = Block::from_execution_block_with_encoded_size(
704 block,
705 block_access_list,
706 execution_block_rlp_size_bytes,
707 )
708 .wrap_err("payload builder produced an invalid block access list")?;
709 let consensus_block_size_bytes = proposal.encode_size();
710 let validator_marshal_persist = marshal_persist.estimate(consensus_block_size_bytes);
711 let proposal_elapsed = propose_start.elapsed();
712 let return_delay = self
716 .proposal_return_budget
717 .saturating_sub(proposal_elapsed)
718 .saturating_sub(validation_latency_elapsed)
719 .saturating_sub(validator_marshal_persist);
720 debug!(
721 proposal_elapsed = %display_duration(proposal_elapsed),
722 build_time = %display_duration(payload_build_elapsed),
723 payload_validation_work = %display_duration(payload_validation_work_elapsed),
724 validation_latency_time = %display_duration(validation_latency_elapsed),
725 validator_marshal_persist = %display_duration(validator_marshal_persist),
726 return_time = %display_duration(return_delay),
727 execution_block_rlp_size_bytes,
728 consensus_block_size_bytes,
729 "sleeping before returning proposal"
730 );
731 let proposal_return_time = context.current() + return_delay;
732
733 Ok((
734 proposal,
735 Some(ProposalReturn {
736 time: proposal_return_time,
737 block_size_bytes: consensus_block_size_bytes,
738 }),
739 ))
740 }
741
742 async fn verify<TContext: Pacer>(
743 self,
744 context: TContext,
745 (parent_view, parent_digest): (View, Digest),
746 payload: Digest,
747 proposer: PublicKey,
748 round: Round,
749 ) -> eyre::Result<VerifyResult> {
750 let (block, parent) = try_join(
751 subscribe(&self.execution_node, round, payload, &self.marshal),
752 subscribe(
753 &self.execution_node,
754 Round::new(round.epoch(), parent_view),
755 parent_digest,
756 &self.marshal,
757 ),
758 )
759 .await
760 .wrap_err("failed getting required blocks")?;
761
762 if payload == parent_digest {
769 let epoch_info = self
770 .epoch_strategy
771 .containing(block.height())
772 .expect("epoch strategy is for all heights");
773 if epoch_info.last() == block.height() && epoch_info.epoch() == round.epoch() {
774 if !self.marshal.verified(round, block).await {
775 bail!("marshal actor refused to persist verified re-proposed block");
776 }
777 return Ok(VerifyResult {
778 result: true,
779 block: None,
780 parent: Some(parent),
781 });
782 } else {
783 return Ok(VerifyResult {
784 result: false,
785 block: Some(block),
786 parent: Some(parent),
787 });
788 }
789 }
790
791 if let Err(reason) = verify_header(
792 &block,
793 (parent_view, parent_digest),
794 round,
795 &self.state.dkg_manager,
796 &self.epoch_strategy,
797 &proposer,
798 )
799 .await
800 {
801 warn!(%reason, "header could not be verified; failing block");
802 return Ok(VerifyResult {
803 result: false,
804 block: Some(block),
805 parent: Some(parent),
806 });
807 }
808
809 if let Err(error) = self
810 .state
811 .executor
812 .canonicalize_head(parent.height(), parent.digest())
813 .await
814 {
815 tracing::warn!(
816 %error,
817 parent.height = %parent.height(),
818 parent.digest = %parent.digest(),
819 "failed updating canonical head to parent; trying to go on",
820 );
821 }
822
823 let validation_duration = verify_block(
824 context,
825 round.epoch(),
826 &self.epoch_strategy,
827 self.execution_node
828 .add_ons_handle
829 .beacon_engine_handle
830 .clone(),
831 &block,
832 parent_digest,
833 &self.scheme_provider,
834 )
835 .await
836 .wrap_err("failed verifying block against execution layer")?;
837 if let Some(duration) = validation_duration
838 && let Ok(mut estimator) = self.validation_latency_estimator.lock()
839 {
840 estimator.observe(
841 block.height().get(),
842 ValidationLatencyWorkload::new(
843 block.block().gas_used(),
844 block.block().body().transaction_count(),
845 ),
846 duration,
847 );
848 }
849 let is_good = validation_duration.is_some();
850
851 let block_height = block.height();
852 let block_digest = block.digest();
853
854 if is_good {
855 if !self.marshal.verified(round, block).await {
857 bail!("marshal actor refused to persist verified block");
858 }
859
860 self.state
862 .executor
863 .canonicalize_head(block_height, block_digest)
864 .await
865 .wrap_err("failed making the verified proposal the head of the canonical chain")?;
866
867 return Ok(VerifyResult {
868 result: true,
869 block: None,
870 parent: Some(parent),
871 });
872 }
873
874 Ok(VerifyResult {
875 result: false,
876 block: Some(block),
877 parent: Some(parent),
878 })
879 }
880}
881
882impl Inner<Uninit> {
883 #[instrument(skip_all, err)]
890 async fn into_initialized(
891 self,
892 dkg_manager: crate::dkg::manager::Mailbox,
893 ) -> eyre::Result<Inner<Init>> {
894 let initialized = Inner {
895 public_key: self.public_key,
896 epoch_strategy: self.epoch_strategy,
897 proposal_return_budget: self.proposal_return_budget,
898 my_mailbox: self.my_mailbox,
899 marshal: self.marshal,
900 execution_node: self.execution_node,
901 executor: self.executor.clone(),
902 state: Init {
903 dkg_manager,
904 executor: self.executor.clone(),
905 },
906 subblocks: self.subblocks,
907 scheme_provider: self.scheme_provider,
908 validation_latency_estimator: self.validation_latency_estimator,
909 metrics: self.metrics,
910 };
911
912 Ok(initialized)
913 }
914}
915
916#[derive(Clone, Debug)]
918pub(in crate::consensus) struct Uninit(());
919
920#[derive(Clone, Debug)]
922struct Init {
923 dkg_manager: crate::dkg::manager::Mailbox,
924 executor: crate::executor::Mailbox,
926}
927
928struct VerifyResult {
929 result: bool,
934 block: Option<Block>,
936 parent: Option<Block>,
938}
939
940#[instrument(
950 skip_all,
951 fields(
952 %epoch,
953 epoch_length,
954 block.parent_digest = %block.parent_digest(),
955 block.digest = %block.digest(),
956 block.height = %block.height(),
957 block.timestamp = block.timestamp(),
958 parent.digest = %parent_digest,
959 )
960)]
961async fn verify_block<TContext: Pacer>(
962 context: TContext,
963 epoch: Epoch,
964 epoch_strategy: &FixedEpocher,
965 engine: ConsensusEngineHandle<TempoPayloadTypes>,
966 block: &Block,
967 parent_digest: Digest,
968 scheme_provider: &SchemeProvider,
969) -> eyre::Result<Option<Duration>> {
970 use alloy_rpc_types_engine::PayloadStatusEnum;
971
972 let epoch_info = epoch_strategy
973 .containing(block.height())
974 .expect("epoch strategy is for all heights");
975 if epoch_info.epoch() != epoch {
976 info!("block does not belong to this epoch");
977 return Ok(None);
978 }
979 if block.parent_hash() != *parent_digest {
980 info!(
981 "parent digest stored in block must match the digest of the parent \
982 argument but doesn't"
983 );
984 return Ok(None);
985 }
986
987 let scheme = scheme_provider
989 .scoped(epoch)
990 .ok_or_eyre("cannot determine participants in the current epoch")?;
991
992 let validator_set = Some(
993 scheme
994 .participants()
995 .into_iter()
996 .map(|p| B256::from_slice(p))
997 .collect(),
998 );
999 let (block, block_access_list) = block.clone().into_parts();
1000 let execution_data = TempoExecutionData {
1001 block: Arc::new(block),
1002 block_access_list,
1003 validator_set,
1004 };
1005 let validation_start = Instant::now();
1006 let payload_status = engine
1007 .new_payload(execution_data)
1008 .pace(&context, Duration::from_millis(50))
1009 .await
1010 .wrap_err("failed sending `new payload` message to execution layer to validate block")?;
1011 match payload_status.status {
1012 PayloadStatusEnum::Valid => Ok(Some(validation_start.elapsed())),
1013 PayloadStatusEnum::Invalid { validation_error } => {
1014 info!(
1015 validation_error,
1016 "execution layer returned that the block was invalid"
1017 );
1018 Ok(None)
1019 }
1020 PayloadStatusEnum::Accepted => {
1021 bail!(
1022 "failed validating block because payload was accepted, meaning \
1023 that this was not actually executed by the execution layer for some reason"
1024 )
1025 }
1026 PayloadStatusEnum::Syncing => {
1027 bail!(
1028 "failed validating block because payload is still syncing, \
1029 this means the parent block was available to the consensus \
1030 layer but not the execution layer"
1031 )
1032 }
1033 }
1034}
1035
1036#[instrument(skip_all, err(Display))]
1037async fn verify_header(
1038 block: &Block,
1039 parent: (View, Digest),
1040 round: Round,
1041 dkg_manager: &crate::dkg::manager::Mailbox,
1042 epoch_strategy: &FixedEpocher,
1043 proposer: &PublicKey,
1044) -> eyre::Result<()> {
1045 let epoch_info = epoch_strategy
1046 .containing(block.height())
1047 .expect("epoch strategy is for all heights");
1048
1049 let ctx = block
1050 .header()
1051 .consensus_context
1052 .ok_or_eyre("missing consensus context")?;
1053
1054 let expected_ctx = TempoConsensusContext {
1055 epoch: round.epoch().get(),
1056 view: round.view().get(),
1057 parent_view: parent.0.get(),
1058 proposer: crate::utils::public_key_to_tempo_primitive(proposer),
1059 };
1060
1061 ensure!(
1062 ctx == expected_ctx,
1063 "mismatch in consensus context for block `{}`. expected `{expected_ctx:?}`. got `{ctx:?}`",
1064 block.digest()
1065 );
1066
1067 if epoch_info.last() == block.height() {
1068 info!(
1069 "on last block of epoch; verifying that the boundary block \
1070 contains the correct DKG outcome",
1071 );
1072 let our_outcome = dkg_manager
1073 .get_dkg_outcome(parent.1, block.height().saturating_sub(HeightDelta::new(1)))
1074 .await
1075 .wrap_err(
1076 "failed getting public dkg ceremony outcome; cannot verify end \
1077 of epoch block",
1078 )?;
1079 let block_outcome = OnchainDkgOutcome::read(&mut block.header().extra_data().as_ref())
1080 .wrap_err(
1081 "failed decoding extra data header as DKG ceremony \
1082 outcome; cannot verify end of epoch block",
1083 )?;
1084 if our_outcome != block_outcome {
1085 warn!(
1087 our.epoch = %our_outcome.epoch,
1088 our.players = ?our_outcome.players(),
1089 our.next_players = ?our_outcome.next_players(),
1090 our.sharing = ?our_outcome.sharing(),
1091 our.is_next_full_dkg = ?our_outcome.is_next_full_dkg,
1092 block.epoch = %block_outcome.epoch,
1093 block.players = ?block_outcome.players(),
1094 block.next_players = ?block_outcome.next_players(),
1095 block.sharing = ?block_outcome.sharing(),
1096 block.is_next_full_dkg = ?block_outcome.is_next_full_dkg,
1097 "our public dkg outcome does not match what's stored \
1098 in the block",
1099 );
1100 return Err(eyre!(
1101 "our public dkg outcome does not match what's \
1102 stored in the block header extra_data field; they must \
1103 match so that the end-of-block is valid",
1104 ));
1105 }
1106 } else if !block.header().extra_data().is_empty() {
1107 let bytes = block.header().extra_data().to_vec();
1108 let dealer = dkg_manager
1109 .verify_dealer_log(round.epoch(), bytes)
1110 .await
1111 .wrap_err("failed request to verify DKG dealing")?;
1112 ensure!(
1113 &dealer == proposer,
1114 "proposer `{proposer}` is not the dealer `{dealer}` of the dealing \
1115 in the block",
1116 );
1117 }
1118
1119 Ok(())
1120}
1121
1122#[instrument(skip_all, fields(%round, %digest), err, ret(Display))]
1124async fn subscribe(
1125 execution_node: &TempoFullNode,
1126 round: Round,
1127 digest: Digest,
1128 marshal: &crate::alias::marshal::Mailbox,
1129) -> eyre::Result<Block> {
1130 let block = if let Some(block) = execution_node
1131 .provider
1132 .find_block_by_hash(digest.0, BlockSource::Any)
1133 .wrap_err_with(|| format!("failed querying execution layer for parent block `{digest}`"))?
1134 {
1135 Block::from_execution_block_unchecked(block.seal(), None)
1137 } else {
1138 marshal
1139 .subscribe_by_digest(Some(round), digest)
1140 .await
1141 .await
1142 .map_err(|_| eyre!("syncer dropped channel before the parent block was sent"))?
1143 };
1144 Ok(block)
1145}
1146
1147#[derive(Clone)]
1148struct Metrics {
1149 parent_ahead_of_local_time: Counter,
1150}
1151
1152impl Metrics {
1153 fn init<TContext>(context: &TContext) -> Self
1154 where
1155 TContext: commonware_runtime::Metrics,
1156 {
1157 let parent_ahead_of_local_time = Counter::default();
1158 context.register(
1159 "parent_ahead_of_local_time",
1160 "number of times the parent block timestamp was ahead of local time",
1161 parent_ahead_of_local_time.clone(),
1162 );
1163
1164 Self {
1165 parent_ahead_of_local_time,
1166 }
1167 }
1168}