tempo_commonware_node/dkg/manager/
validators.rs1use std::{collections::HashMap, net::SocketAddr};
2
3use alloy_primitives::Address;
4use commonware_codec::{DecodeExt as _, EncodeSize, RangeCfg, Read, Write, varint::UInt};
5use commonware_consensus::{types::Epoch, utils};
6use commonware_cryptography::ed25519::PublicKey;
7use commonware_utils::set::{Ordered, OrderedAssociated};
8use eyre::{OptionExt as _, WrapErr as _};
9use reth_ethereum::evm::revm::{State, database::StateProviderDatabase};
10use reth_node_builder::{Block as _, ConfigureEvm as _};
11use reth_provider::{
12 BlockHashReader, BlockIdReader as _, BlockReader as _, BlockSource, StateProviderFactory as _,
13};
14use tempo_node::TempoFullNode;
15use tempo_precompiles::{
16 storage::StorageCtx,
17 validator_config::{IValidatorConfig, ValidatorConfig},
18};
19
20use tracing::{Level, info, instrument, warn};
21
22#[instrument(
27 skip_all,
28 fields(
29 attempt = _attempt,
30 for_epoch,
31 from_block = last_height_before_epoch(for_epoch, epoch_length),
32 ),
33 err
34)]
35pub(super) async fn read_from_contract(
36 _attempt: u32,
37 node: &TempoFullNode,
38 for_epoch: Epoch,
39 epoch_length: u64,
40) -> eyre::Result<OrderedAssociated<PublicKey, DecodedValidator>> {
41 let last_height = last_height_before_epoch(for_epoch, epoch_length);
42
43 let block_hash = if let Some(hash) = node
49 .provider
50 .block_hash(last_height)
51 .wrap_err_with(|| format!("failed reading block hash at height `{last_height}`"))?
52 {
53 hash
54 } else if let Some(pending) = node
55 .provider
56 .pending_block_num_hash()
57 .wrap_err("failed reading pending block state")?
58 && pending.number == last_height
59 {
60 pending.hash
61 } else {
62 return Err(eyre::eyre!("block not found at height `{last_height}`"));
63 };
64
65 let block = node
66 .provider
67 .find_block_by_hash(block_hash, BlockSource::Any)
68 .map_err(Into::<eyre::Report>::into)
69 .and_then(|maybe| maybe.ok_or_eyre("execution layer returned empty block"))
70 .wrap_err_with(|| format!("failed reading block with hash `{block_hash}`"))?;
71
72 let db = State::builder()
73 .with_database(StateProviderDatabase::new(
74 node.provider
75 .state_by_block_hash(block_hash)
76 .wrap_err_with(|| {
77 format!("failed to get state from node provider for hash `{block_hash}`")
78 })?,
79 ))
80 .build();
81
82 let raw_validators = {
85 let mut evm = node
86 .evm_config
87 .evm_for_block(db, block.header())
88 .wrap_err("failed instantiating evm for genesis block")?;
89
90 let ctx = evm.ctx_mut();
91 StorageCtx::enter_evm(&mut ctx.journaled_state, &ctx.block, &ctx.cfg, || {
92 let validator_config = ValidatorConfig::new();
93 validator_config
94 .get_validators()
95 .wrap_err("failed to query contract for validator config")
96 })
97 }?;
98
99 info!(?raw_validators, "read validators from contract",);
100
101 Ok(decode_from_contract(raw_validators).await)
102}
103
104#[instrument(skip_all, fields(validators_to_decode = contract_vals.len()))]
105async fn decode_from_contract(
106 contract_vals: Vec<IValidatorConfig::Validator>,
107) -> OrderedAssociated<PublicKey, DecodedValidator> {
108 let mut decoded = HashMap::new();
109 for val in contract_vals.into_iter().filter(|val| val.active) {
110 if let Ok(val) = DecodedValidator::decode_from_contract(val)
113 && let Some(old) = decoded.insert(val.public_key.clone(), val)
114 {
115 warn!(
116 %old,
117 new = %decoded.get(&old.public_key).expect("just inserted it"),
118 "replaced peer because public keys were duplicated",
119 );
120 }
121 }
122 decoded.into_iter().collect::<_>()
123}
124
125#[derive(Clone, Debug)]
133pub(super) struct ValidatorState {
134 dealers: OrderedAssociated<PublicKey, DecodedValidator>,
135 players: OrderedAssociated<PublicKey, DecodedValidator>,
136 syncing_players: OrderedAssociated<PublicKey, DecodedValidator>,
137}
138
139impl ValidatorState {
140 pub(super) fn new(validators: OrderedAssociated<PublicKey, DecodedValidator>) -> Self {
141 Self {
142 dealers: validators.clone(),
143 players: validators.clone(),
144 syncing_players: validators,
145 }
146 }
147
148 pub(super) fn with_unknown_contract_state(
152 validators: OrderedAssociated<PublicKey, SocketAddr>,
153 ) -> Self {
154 let validators = validators
155 .iter_pairs()
156 .map(|(key, addr)| {
157 let key = key.clone();
158 let validator = DecodedValidator {
159 public_key: key.clone(),
160 inbound: *addr,
161 outbound: SocketAddr::from(([0, 0, 0, 0], 0)),
162 index: 0,
163 address: Address::ZERO,
164 };
165 (key, validator)
166 })
167 .collect();
168 Self::new(validators)
169 }
170
171 pub(super) fn dealers(&self) -> &OrderedAssociated<PublicKey, DecodedValidator> {
172 &self.dealers
173 }
174
175 pub(super) fn players(&self) -> &OrderedAssociated<PublicKey, DecodedValidator> {
176 &self.players
177 }
178
179 pub(super) fn syncing_players(&self) -> &OrderedAssociated<PublicKey, DecodedValidator> {
180 &self.syncing_players
181 }
182
183 pub(super) fn dealer_pubkeys(&self) -> Ordered<PublicKey> {
184 self.dealers.keys().clone()
185 }
186
187 pub(super) fn player_pubkeys(&self) -> Ordered<PublicKey> {
188 self.players.keys().clone()
189 }
190
191 pub(super) fn resolve_addresses_and_merge_peers(
201 &self,
202 ) -> OrderedAssociated<PublicKey, SocketAddr> {
203 self.syncing_players()
208 .iter_pairs()
209 .chain(self.players().iter_pairs())
210 .chain(self.dealers().iter_pairs())
211 .map(|(pubkey, validator)| (pubkey.clone(), validator.inbound_socket_addr()))
212 .collect()
213 }
214
215 pub(super) fn push_on_success(
223 &mut self,
224 syncing_players: OrderedAssociated<PublicKey, DecodedValidator>,
225 ) -> OrderedAssociated<PublicKey, DecodedValidator> {
226 let players = std::mem::replace(&mut self.syncing_players, syncing_players);
227 let dealers = std::mem::replace(&mut self.players, players);
228 std::mem::replace(&mut self.dealers, dealers)
229 }
230
231 pub(super) fn push_on_failure(
238 &mut self,
239 syncing_players: OrderedAssociated<PublicKey, DecodedValidator>,
240 ) -> OrderedAssociated<PublicKey, DecodedValidator> {
241 let players = std::mem::replace(&mut self.syncing_players, syncing_players);
242 std::mem::replace(&mut self.players, players)
245 }
246}
247
248impl Write for ValidatorState {
249 fn write(&self, buf: &mut impl bytes::BufMut) {
250 self.dealers().write(buf);
251 self.players().write(buf);
252 self.syncing_players().write(buf);
253 }
254}
255
256impl EncodeSize for ValidatorState {
257 fn encode_size(&self) -> usize {
258 self.dealers().encode_size()
259 + self.players().encode_size()
260 + self.syncing_players().encode_size()
261 }
262}
263
264impl Read for ValidatorState {
265 type Cfg = ();
266
267 fn read_cfg(
268 buf: &mut impl bytes::Buf,
269 _cfg: &Self::Cfg,
270 ) -> Result<Self, commonware_codec::Error> {
271 let dealers = OrderedAssociated::read_cfg(buf, &(RangeCfg::from(0..=usize::MAX), (), ()))?;
274 let players = OrderedAssociated::read_cfg(buf, &(RangeCfg::from(0..=usize::MAX), (), ()))?;
275 let syncing_players =
276 OrderedAssociated::read_cfg(buf, &(RangeCfg::from(0..=usize::MAX), (), ()))?;
277 Ok(Self {
278 dealers,
279 players,
280 syncing_players,
281 })
282 }
283}
284
285#[derive(Clone, Debug, PartialEq, Eq)]
292pub(super) struct DecodedValidator {
293 pub(super) public_key: PublicKey,
298 pub(super) inbound: SocketAddr,
302 pub(super) outbound: SocketAddr,
306 pub(super) index: u64,
310 pub(super) address: Address,
314}
315
316impl DecodedValidator {
317 #[instrument(ret(Display, level = Level::INFO), err(level = Level::WARN))]
323 pub(super) fn decode_from_contract(
324 IValidatorConfig::Validator {
325 publicKey,
326 index,
327 validatorAddress,
328 inboundAddress,
329 outboundAddress,
330 ..
331 }: IValidatorConfig::Validator,
332 ) -> eyre::Result<Self> {
333 let public_key = PublicKey::decode(publicKey.as_ref())
334 .wrap_err("failed decoding publicKey field as ed25519 public key")?;
335 let inbound = inboundAddress
336 .parse()
337 .wrap_err("inboundAddress was not valid")?;
338 let outbound = outboundAddress
339 .parse()
340 .wrap_err("outboundAddress was not valid")?;
341 Ok(Self {
342 public_key,
343 inbound,
344 outbound,
345 index,
346 address: validatorAddress,
347 })
348 }
349
350 fn inbound_socket_addr(&self) -> SocketAddr {
351 self.inbound
352 }
353}
354
355impl std::fmt::Display for DecodedValidator {
356 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
357 f.write_fmt(format_args!(
358 "public key = `{}`, inbound = `{}`, outbound = `{}`, index = `{}`, address = `{}`",
359 self.public_key, self.inbound, self.outbound, self.index, self.address
360 ))
361 }
362}
363
364impl Write for DecodedValidator {
365 fn write(&self, buf: &mut impl bytes::BufMut) {
366 self.public_key.write(buf);
367 self.inbound.to_string().as_bytes().write(buf);
368 self.outbound.to_string().as_bytes().write(buf);
369 UInt(self.index).write(buf);
370 self.address.0.write(buf);
371 }
372}
373
374impl EncodeSize for DecodedValidator {
375 fn encode_size(&self) -> usize {
376 self.public_key.encode_size()
377 + self.inbound.to_string().as_bytes().encode_size()
378 + self.outbound.to_string().as_bytes().encode_size()
379 + UInt(self.index).encode_size()
380 + self.address.0.encode_size()
381 }
382}
383
384impl Read for DecodedValidator {
385 type Cfg = ();
386
387 fn read_cfg(
388 mut buf: &mut impl bytes::Buf,
389 _cfg: &Self::Cfg,
390 ) -> Result<Self, commonware_codec::Error> {
391 let public_key = PublicKey::read_cfg(buf, &())?;
392 let inbound = {
393 let bytes = Vec::<u8>::read_cfg(buf, &(RangeCfg::new(0..=253usize), ()))?;
395 String::from_utf8(bytes).map_err(|_| {
396 commonware_codec::Error::Invalid("decode inbound address", "not utf8")
397 })?
398 }
399 .parse()
400 .map_err(|_| {
401 commonware_codec::Error::Invalid("decode inbound address", "not <ip>:<port>")
402 })?;
403 let outbound = {
404 let bytes = Vec::<u8>::read_cfg(buf, &(RangeCfg::new(0..=253usize), ()))?;
406 String::from_utf8(bytes).map_err(|_| {
407 commonware_codec::Error::Invalid("decode outbound address", "not utf8")
408 })?
409 }
410 .parse()
411 .map_err(|_| {
412 commonware_codec::Error::Invalid("decode outbound address", "not <ip>:<port>")
413 })?;
414 let index = UInt::read_cfg(&mut buf, &())?.into();
415 let address = Address::new(<[u8; 20]>::read_cfg(&mut buf, &())?);
416 Ok(Self {
417 public_key,
418 inbound,
419 outbound,
420 index,
421 address,
422 })
423 }
424}
425
426fn last_height_before_epoch(epoch: Epoch, epoch_length: u64) -> u64 {
427 epoch
428 .checked_sub(1)
429 .map_or(0, |epoch| utils::last_block_in_epoch(epoch_length, epoch))
430}
431
#[cfg(test)]
mod tests {
    use commonware_codec::{DecodeExt as _, Encode as _};
    use commonware_cryptography::{PrivateKeyExt, Signer, ed25519::PrivateKey};

    use crate::dkg::manager::DecodedValidator;

    /// Encoding a validator and decoding the result must yield an equal value.
    #[test]
    fn roundtrip_decoded_validator() {
        let signer = PrivateKey::from_seed(42);
        let original = DecodedValidator {
            public_key: signer.public_key(),
            inbound: "192.168.0.1:1234".parse().unwrap(),
            outbound: "127.0.0.1:4321".parse().unwrap(),
            index: 42,
            address: alloy_primitives::Address::ZERO,
        };

        let mut encoded = original.encode().freeze();
        let roundtripped = DecodedValidator::decode(&mut encoded).unwrap();

        assert_eq!(original, roundtripped);
    }
}