tempo_commonware_node/dkg/manager/actor/
pre_allegretto.rs1use std::net::SocketAddr;
2
3use commonware_codec::{EncodeSize, RangeCfg, Read, ReadExt as _, Write, varint::UInt};
4use commonware_consensus::{Block as _, Reporter as _, types::Epoch, utils};
5use commonware_cryptography::{
6 bls12381::primitives::{group::Share, poly::Public, variant::MinSig},
7 ed25519::PublicKey,
8};
9use commonware_p2p::{Receiver, Sender, utils::mux::MuxHandle};
10use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, Storage};
11use commonware_storage::metadata::Metadata;
12use commonware_utils::{
13 quorum,
14 sequence::U64,
15 set::{Ordered, OrderedAssociated},
16};
17use eyre::{OptionExt as _, WrapErr as _};
18use rand_core::CryptoRngCore;
19use tempo_chainspec::hardfork::TempoHardforks;
20use tempo_dkg_onchain_artifacts::PublicOutcome;
21use tracing::{Span, info, instrument, warn};
22
23use crate::{
24 consensus::block::Block,
25 dkg::{
26 HardforkRegime,
27 ceremony::{self, Ceremony},
28 manager::validators::ValidatorState,
29 },
30 epoch,
31};
32
/// Key in the epoch metadata store under which the state of the current epoch
/// is kept.
const CURRENT_EPOCH_KEY: U64 = U64::new(0);
/// Key in the epoch metadata store under which the state of the previous epoch
/// is kept.
const PREVIOUS_EPOCH_KEY: U64 = U64::new(1);
35
36impl<TContext, TPeerManager> super::Actor<TContext, TPeerManager>
37where
38 TContext: Clock + CryptoRngCore + commonware_runtime::Metrics + Spawner + Storage,
39 TPeerManager: commonware_p2p::Manager<
40 PublicKey = PublicKey,
41 Peers = OrderedAssociated<PublicKey, SocketAddr>,
42 >,
43{
44 #[instrument(skip_all, err)]
53 pub(super) async fn pre_allegretto_init(&mut self) -> eyre::Result<()> {
54 if !self.post_allegretto_metadatas.exists() {
55 let spec = self.config.execution_node.chain_spec();
56 let public_polynomial = spec
57 .info
58 .public_polynomial()
59 .clone()
60 .ok_or_eyre("chainspec did not contain publicPolynomial; cannot go on without it")?
61 .into_inner();
62
63 let validators = spec
64 .info
65 .validators()
66 .clone()
67 .ok_or_eyre("chainspec did not contain validators; cannot go on without them")?
68 .into_inner();
69
70 if self
71 .pre_allegretto_metadatas
72 .epoch_metadata
73 .get(&CURRENT_EPOCH_KEY)
74 .is_none()
75 {
76 self.pre_allegretto_metadatas
77 .epoch_metadata
78 .put_sync(
79 CURRENT_EPOCH_KEY,
80 EpochState {
81 epoch: 0,
82 participants: validators.keys().clone(),
83 public: public_polynomial,
84 share: self.config.initial_share.clone(),
85 },
86 )
87 .await
88 .expect("must always be able to persists state");
89 }
90
91 let current_epoch = self
96 .pre_allegretto_metadatas
97 .epoch_metadata
98 .get(&CURRENT_EPOCH_KEY)
99 .expect("we ensured above that the epoch state is initialized")
100 .epoch();
101 self.validators_metadata
102 .put_sync(
103 current_epoch.saturating_sub(1).into(),
108 ValidatorState::with_unknown_contract_state(validators.clone()),
109 )
110 .await
111 .expect("must always be able to write state");
112 }
113 Ok(())
114 }
115
/// Handles a finalized block while the node is still in the pre-allegretto
/// regime.
///
/// What happens depends on the height of `block` relative to the epoch
/// length:
///
/// + If `utils::is_last_block_in_epoch` reports the block as the last one of
///   an epoch and that epoch immediately precedes the current epoch state:
///   when allegretto is active at the block's timestamp, a hand-over to
///   dynamic validator sets is attempted. On success the returned ceremony
///   is stored in `maybe_ceremony`, the current pre-allegretto epoch state
///   is deleted, and the function returns. If allegretto is not active or
///   the hand-over failed, a pre-allegretto ceremony is started unless one
///   for the current epoch already exists, and
///   `register_current_epoch_state` is called.
/// + If the height is a multiple of the epoch length and a previous epoch
///   state is still stored, that state is removed and an `epoch::Exit` for
///   it is reported to the epoch manager.
/// + For every block that is not the last of an epoch, the ceremony is
///   advanced according to `epoch::relative_position`.
/// + If the next height is the last one of the epoch, the ceremony is
///   finalized, the stored current epoch state becomes the previous one, the
///   ceremony outcome becomes the new current one, and the ceremony metadata
///   of the epoch two before the new one is removed.
///
/// # Panics
///
/// Panics if `maybe_ceremony` is empty for a block that is not the last of an
/// epoch, if no current epoch state is stored when rotating epoch states, or
/// if state cannot be persisted.
#[instrument(
    parent = &cause,
    skip_all,
    fields(
        block.derived_epoch = utils::epoch(self.config.epoch_length, block.height()),
        block.height = block.height(),
        block.timestamp = block.timestamp(),
        latest_epoch = self.current_epoch_state().epoch(),
    ),
)]
pub(super) async fn handle_finalized_pre_allegretto<TReceiver, TSender>(
    &mut self,
    cause: Span,
    block: Block,
    maybe_ceremony: &mut Option<Ceremony<ContextCell<TContext>, TReceiver, TSender>>,
    ceremony_mux: &mut MuxHandle<TSender, TReceiver>,
) where
    TReceiver: Receiver<PublicKey = PublicKey>,
    TSender: Sender<PublicKey = PublicKey>,
{
    // Boundary block: the last block of an epoch. This whole branch returns
    // without touching the ceremony state machine further below.
    if let Some(block_epoch) =
        utils::is_last_block_in_epoch(self.config.epoch_length, block.height())
    {
        let epoch_state = self.current_epoch_state();
        // Only act on the boundary block of the epoch directly before the
        // epoch we currently hold state for.
        if block_epoch + 1 == epoch_state.epoch() {
            if self
                .config
                .execution_node
                .chain_spec()
                .is_allegretto_active_at_timestamp(block.timestamp())
            {
                info!(
                    "block timestamp is after allegretto hardfork; attempting \
                    to transition to dynamic validator sets by reading validators \
                    from smart contract",
                );
                match self
                    .transition_to_dynamic_validator_sets(ceremony_mux)
                    .await
                {
                    Ok(ceremony) => {
                        maybe_ceremony.replace(ceremony);
                        info!(
                            "transitioning to dynamic validator sets was successful; \
                            deleting current pre-allegretto epoch state and leaving \
                            DKG logic to the post-hardfork routines",
                        );
                        self.pre_allegretto_metadatas
                            .delete_current_epoch_state()
                            .await;
                        return;
                    }
                    Err(error) => {
                        // A failed hand-over is not fatal: fall through and
                        // keep running the pre-allegretto logic for this epoch.
                        self.metrics.failed_allegretto_transitions.inc();
                        warn!(
                            %error,
                            "transitioning to dynamic validator sets was not \
                            successful; will attempt again next epoch"
                        );
                    }
                }
            }

            // Start a ceremony unless one for the current epoch already
            // exists.
            if maybe_ceremony.is_none()
                || maybe_ceremony
                    .as_ref()
                    .is_some_and(|ceremony| ceremony.epoch() != epoch_state.epoch())
            {
                maybe_ceremony.replace(self.start_pre_allegretto_ceremony(ceremony_mux).await);
            }
            self.register_current_epoch_state().await;
        } else {
            warn!(
                "block was a boundary block, but not the last block of the \
                previous epoch; ignoring it"
            );
        }
        return;
    }

    // On a height that is a multiple of the epoch length (presumably the
    // first block of a new epoch — confirm against `utils::epoch`), drop the
    // previous epoch state if it is still stored and report its exit.
    if block.height().is_multiple_of(self.config.epoch_length)
        && let Some(old_epoch_state) = self
            .pre_allegretto_metadatas
            .epoch_metadata
            .remove(&PREVIOUS_EPOCH_KEY)
    {
        self.config
            .epoch_manager
            .report(
                epoch::Exit {
                    epoch: old_epoch_state.epoch,
                }
                .into(),
            )
            .await;
        self.pre_allegretto_metadatas
            .epoch_metadata
            .sync()
            .await
            .expect("must always be able to persist state");
    }

    // The ceremony is taken out of the option while it is being driven; it
    // is put back below unless it gets finalized.
    let mut ceremony = maybe_ceremony
        .take()
        .expect("a ceremony must always exist except for the last block");

    // Results of the individual ceremony steps are deliberately discarded
    // here.
    match epoch::relative_position(block.height(), self.config.epoch_length) {
        epoch::RelativePosition::FirstHalf => {
            let _ = ceremony.distribute_shares().await;
            let _ = ceremony.process_messages().await;
        }
        epoch::RelativePosition::Middle => {
            let _ = ceremony.process_messages().await;
            let _ = ceremony
                .construct_intermediate_outcome(HardforkRegime::PreAllegretto)
                .await;
        }
        epoch::RelativePosition::SecondHalf => {
            let _ = ceremony
                .process_dealings_in_block(&block, HardforkRegime::PreAllegretto)
                .await;
        }
    }

    // True if the next height is the last block of the epoch.
    let is_one_before_boundary =
        utils::is_last_block_in_epoch(self.config.epoch_length, block.height() + 1).is_some();

    if !is_one_before_boundary {
        assert!(
            maybe_ceremony.replace(ceremony).is_none(),
            "ceremony was taken out of the option and is being put back"
        );
        return;
    }

    info!("on pre-to-last height of epoch; finalizing ceremony");

    let next_epoch = ceremony.epoch() + 1;

    // Both arms yield an outcome; per the log messages, the failure arm
    // carries the old participants, polynomial and secret key.
    let ceremony_outcome = match ceremony.finalize() {
        Ok(outcome) => {
            self.metrics.ceremony.one_more_success();
            info!(
                "ceremony was successful; using the new participants, polynomial and secret key"
            );
            outcome
        }
        Err(outcome) => {
            self.metrics.ceremony.one_more_failure();
            warn!(
                "ceremony was a failure; using the old participants, polynomial and secret key"
            );
            outcome
        }
    };
    let (public, share) = ceremony_outcome.role.into_key_pair();

    // Rotate the stored states: current becomes previous, and the ceremony
    // outcome becomes the new current state. Both writes are persisted by
    // the single `sync` below.
    let old_epoch_state = self
        .pre_allegretto_metadatas
        .epoch_metadata
        .remove(&CURRENT_EPOCH_KEY)
        .expect("there must always be a current epoch state");

    self.pre_allegretto_metadatas
        .epoch_metadata
        .put(PREVIOUS_EPOCH_KEY, old_epoch_state);

    let new_epoch_state = EpochState {
        epoch: next_epoch,
        participants: ceremony_outcome.participants,
        public,
        share,
    };
    self.pre_allegretto_metadatas
        .epoch_metadata
        .put(CURRENT_EPOCH_KEY, new_epoch_state.clone());

    self.pre_allegretto_metadatas
        .epoch_metadata
        .sync()
        .await
        .expect("must always be able to write epoch state to disk");

    // Remove the ceremony metadata of the epoch two before the new one, if
    // such an epoch exists.
    if let Some(epoch) = new_epoch_state.epoch.checked_sub(2) {
        let mut ceremony_metadata = self.ceremony_metadata.lock().await;
        ceremony_metadata.remove(&epoch.into());
        ceremony_metadata.sync().await.expect("metadata must sync");
    }
}
342
343 #[instrument(skip_all, fields(epoch = self.pre_allegretto_metadatas.current_epoch_state().unwrap().epoch()))]
344 pub(super) async fn start_pre_allegretto_ceremony<TReceiver, TSender>(
345 &mut self,
346 mux: &mut MuxHandle<TSender, TReceiver>,
347 ) -> Ceremony<ContextCell<TContext>, TReceiver, TSender>
348 where
349 TReceiver: Receiver<PublicKey = PublicKey>,
350 TSender: Sender<PublicKey = PublicKey>,
351 {
352 let epoch_state = self
353 .pre_allegretto_metadatas
354 .epoch_metadata
355 .get(&CURRENT_EPOCH_KEY)
356 .expect("the epoch state must always during the lifetime of the actor");
357 let config = ceremony::Config {
358 namespace: self.config.namespace.clone(),
359 me: self.config.me.clone(),
360 public: epoch_state.public.clone(),
361 share: epoch_state.share.clone(),
362 epoch: epoch_state.epoch,
363 dealers: epoch_state.participants.clone(),
364 players: epoch_state.participants.clone(),
365 };
366
367 let ceremony = ceremony::Ceremony::init(
368 &mut self.context,
369 mux,
370 self.ceremony_metadata.clone(),
371 config,
372 self.metrics.ceremony.clone(),
373 )
374 .await
375 .expect("must always be able to initialize ceremony");
376
377 info!(
378 us = %self.config.me,
379 n_dealers = ceremony.dealers().len(),
380 dealers = ?ceremony.dealers(),
381 n_players = ceremony.players().len(),
382 players = ?ceremony.players(),
383 as_player = ceremony.is_player(),
384 as_dealer = ceremony.is_dealer(),
385 "started a ceremony",
386 );
387
388 self.metrics.pre_allegretto_ceremonies.inc();
389 ceremony
390 }
391
392 async fn transition_to_dynamic_validator_sets<TReceiver, TSender>(
393 &mut self,
394 mux: &mut MuxHandle<TSender, TReceiver>,
395 ) -> eyre::Result<Ceremony<ContextCell<TContext>, TReceiver, TSender>>
396 where
397 TReceiver: Receiver<PublicKey = PublicKey>,
398 TSender: Sender<PublicKey = PublicKey>,
399 {
400 let epoch_state = self
401 .pre_allegretto_metadatas
402 .epoch_metadata
403 .get(&CURRENT_EPOCH_KEY)
404 .cloned()
405 .expect(
406 "when transitioning from pre-allegretto static validator sets to \
407 post-allegretto dynamic validator sets the pre-allegretto epoch \
408 state must exist",
409 );
410
411 self.transition_from_static_validator_sets(epoch_state, mux)
412 .await
413 .wrap_err("hand-over to post-allegretto dynamic validator set logic failed")
414 }
415}
416
/// Storage for the epoch states used under the pre-allegretto regime.
pub(super) struct Metadatas<TContext>
where
    TContext: Clock + Metrics + Storage,
{
    /// Holds the current and the previous epoch state, keyed by
    /// `CURRENT_EPOCH_KEY` and `PREVIOUS_EPOCH_KEY` respectively.
    epoch_metadata: Metadata<TContext, U64, EpochState>,
}
425
426impl<TContext> Metadatas<TContext>
427where
428 TContext: Clock + Metrics + Storage,
429{
430 pub(super) async fn init(context: &TContext, partition_prefix: &str) -> Self
431 where
432 TContext: Metrics,
433 {
434 let epoch_metadata = Metadata::init(
435 context.with_label("post_allegretto_epoch_metadata"),
436 commonware_storage::metadata::Config {
437 partition: format!("{partition_prefix}_current_epoch"),
440 codec_config: (),
441 },
442 )
443 .await
444 .expect("must be able to initialize metadata on disk to function");
445
446 Self { epoch_metadata }
447 }
448
449 pub(super) fn dkg_outcome(&self) -> Option<PublicOutcome> {
450 let epoch_state = self.current_epoch_state()?;
451 Some(PublicOutcome {
452 epoch: epoch_state.epoch(),
453 participants: epoch_state.participants().clone(),
454 public: epoch_state.public_polynomial().clone(),
455 })
456 }
457
458 pub(super) fn previous_epoch_state(&self) -> Option<&EpochState> {
459 self.epoch_metadata.get(&PREVIOUS_EPOCH_KEY)
460 }
461
462 pub(super) fn current_epoch_state(&self) -> Option<&EpochState> {
463 self.epoch_metadata.get(&CURRENT_EPOCH_KEY)
464 }
465
466 async fn delete_current_epoch_state(&mut self) -> Option<EpochState> {
471 let current_state = self.epoch_metadata.remove(&CURRENT_EPOCH_KEY);
472 self.epoch_metadata
473 .sync()
474 .await
475 .expect("must always be able to sync state to disk");
476 current_state
477 }
478
479 pub(super) async fn delete_previous_epoch_state(&mut self) -> Option<EpochState> {
480 let previous_state = self.epoch_metadata.remove(&PREVIOUS_EPOCH_KEY);
481 self.epoch_metadata
482 .sync()
483 .await
484 .expect("must always be able to sync state to disk");
485 previous_state
486 }
487}
488
/// The state of a single epoch under the pre-allegretto regime, as kept in
/// the epoch metadata store.
#[derive(Clone)]
pub(super) struct EpochState {
    /// The epoch this state belongs to.
    epoch: Epoch,
    /// The participants of the epoch; they act as both dealers and players
    /// when a pre-allegretto ceremony is started for this epoch.
    participants: Ordered<PublicKey>,
    /// The public polynomial of the epoch.
    public: Public<MinSig>,
    /// The private share held for this epoch, if any. It is redacted in the
    /// `Debug` output.
    share: Option<Share>,
}
497
498impl std::fmt::Debug for EpochState {
499 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
500 f.debug_struct("EpochState")
501 .field("epoch", &self.epoch)
502 .field("participants", &self.participants)
503 .field("public", &self.public)
504 .field("share", &self.share.as_ref().map(|_| "<private share>"))
505 .finish()
506 }
507}
508
impl EpochState {
    /// Returns the epoch this state belongs to.
    pub(super) fn epoch(&self) -> Epoch {
        self.epoch
    }

    /// Returns the participants of the epoch.
    pub(super) fn participants(&self) -> &Ordered<PublicKey> {
        &self.participants
    }

    /// Returns the public polynomial of the epoch.
    pub(super) fn public_polynomial(&self) -> &Public<MinSig> {
        &self.public
    }

    /// Returns the private share stored for the epoch; `None` if no share is
    /// stored.
    pub(super) fn private_share(&self) -> &Option<Share> {
        &self.share
    }
}
526
527impl Write for EpochState {
528 fn write(&self, buf: &mut impl bytes::BufMut) {
529 UInt(self.epoch).write(buf);
530 self.participants.write(buf);
531 self.public.write(buf);
532 self.share.write(buf);
533 }
534}
535
536impl EncodeSize for EpochState {
537 fn encode_size(&self) -> usize {
538 UInt(self.epoch).encode_size()
539 + self.participants.encode_size()
540 + self.public.encode_size()
541 + self.share.encode_size()
542 }
543}
544
545impl Read for EpochState {
546 type Cfg = ();
547
548 fn read_cfg(
549 buf: &mut impl bytes::Buf,
550 _cfg: &Self::Cfg,
551 ) -> Result<Self, commonware_codec::Error> {
552 let epoch = UInt::read(buf)?.into();
553 let participants = Ordered::read_cfg(buf, &(RangeCfg::from(0..=usize::MAX), ()))?;
554 let public =
555 Public::<MinSig>::read_cfg(buf, &(quorum(participants.len() as u32) as usize))?;
556 let share = Option::<Share>::read_cfg(buf, &())?;
557 Ok(Self {
558 epoch,
559 participants,
560 public,
561 share,
562 })
563 }
564}