tempo_commonware_node/dkg/manager/
validators.rs

1use std::{collections::HashMap, net::SocketAddr};
2
3use alloy_primitives::Address;
4use commonware_codec::{DecodeExt as _, EncodeSize, RangeCfg, Read, Write, varint::UInt};
5use commonware_consensus::{types::Epoch, utils};
6use commonware_cryptography::ed25519::PublicKey;
7use commonware_utils::set::{Ordered, OrderedAssociated};
8use eyre::{OptionExt as _, WrapErr as _};
9use reth_ethereum::evm::revm::{State, database::StateProviderDatabase};
10use reth_node_builder::{Block as _, ConfigureEvm as _};
11use reth_provider::{
12    BlockHashReader, BlockIdReader as _, BlockReader as _, BlockSource, StateProviderFactory as _,
13};
14use tempo_node::TempoFullNode;
15use tempo_precompiles::{
16    storage::StorageCtx,
17    validator_config::{IValidatorConfig, ValidatorConfig},
18};
19
20use tracing::{Level, info, instrument, warn};
21
22/// Reads the validator config of `epoch`.
23///
24/// The validator config for `epoch` is always read from the last height of
25/// `epoch-1`.
26#[instrument(
27    skip_all,
28    fields(
29        attempt = _attempt,
30        for_epoch,
31        from_block = last_height_before_epoch(for_epoch, epoch_length),
32    ),
33    err
34)]
35pub(super) async fn read_from_contract(
36    _attempt: u32,
37    node: &TempoFullNode,
38    for_epoch: Epoch,
39    epoch_length: u64,
40) -> eyre::Result<OrderedAssociated<PublicKey, DecodedValidator>> {
41    let last_height = last_height_before_epoch(for_epoch, epoch_length);
42
43    // Try mapping the block height to a hash tracked by reth.
44    //
45    // First check the canonical chain, then fallback to pending block state.
46    //
47    // Necessary because the DKG and application actors process finalized block concurrently.
48    let block_hash = if let Some(hash) = node
49        .provider
50        .block_hash(last_height)
51        .wrap_err_with(|| format!("failed reading block hash at height `{last_height}`"))?
52    {
53        hash
54    } else if let Some(pending) = node
55        .provider
56        .pending_block_num_hash()
57        .wrap_err("failed reading pending block state")?
58        && pending.number == last_height
59    {
60        pending.hash
61    } else {
62        return Err(eyre::eyre!("block not found at height `{last_height}`"));
63    };
64
65    let block = node
66        .provider
67        .find_block_by_hash(block_hash, BlockSource::Any)
68        .map_err(Into::<eyre::Report>::into)
69        .and_then(|maybe| maybe.ok_or_eyre("execution layer returned empty block"))
70        .wrap_err_with(|| format!("failed reading block with hash `{block_hash}`"))?;
71
72    let db = State::builder()
73        .with_database(StateProviderDatabase::new(
74            node.provider
75                .state_by_block_hash(block_hash)
76                .wrap_err_with(|| {
77                    format!("failed to get state from node provider for hash `{block_hash}`")
78                })?,
79        ))
80        .build();
81
82    // XXX: Ensure that evm and internals go out of scope before the await point
83    // below.
84    let raw_validators = {
85        let mut evm = node
86            .evm_config
87            .evm_for_block(db, block.header())
88            .wrap_err("failed instantiating evm for genesis block")?;
89
90        let ctx = evm.ctx_mut();
91        StorageCtx::enter_evm(&mut ctx.journaled_state, &ctx.block, &ctx.cfg, || {
92            let validator_config = ValidatorConfig::new();
93            validator_config
94                .get_validators()
95                .wrap_err("failed to query contract for validator config")
96        })
97    }?;
98
99    info!(?raw_validators, "read validators from contract",);
100
101    Ok(decode_from_contract(raw_validators).await)
102}
103
104#[instrument(skip_all, fields(validators_to_decode = contract_vals.len()))]
105async fn decode_from_contract(
106    contract_vals: Vec<IValidatorConfig::Validator>,
107) -> OrderedAssociated<PublicKey, DecodedValidator> {
108    let mut decoded = HashMap::new();
109    for val in contract_vals.into_iter().filter(|val| val.active) {
110        // NOTE: not reporting errors because `decode_from_contract` emits
111        // events on success and error
112        if let Ok(val) = DecodedValidator::decode_from_contract(val)
113            && let Some(old) = decoded.insert(val.public_key.clone(), val)
114        {
115            warn!(
116                %old,
117                new = %decoded.get(&old.public_key).expect("just inserted it"),
118                "replaced peer because public keys were duplicated",
119            );
120        }
121    }
122    decoded.into_iter().collect::<_>()
123}
124
125/// Tracks the participants of each DKG ceremony, and, by extension, the p2p network.
126///
127/// The participants tracked here are in order:
128///
129/// 1. the dealers, that will drop out of the next ceremony
130/// 2. the player, that will become dealers in the next ceremony
131/// 3. the syncing players, that will become players in the next ceremony
132#[derive(Clone, Debug)]
133pub(super) struct ValidatorState {
134    dealers: OrderedAssociated<PublicKey, DecodedValidator>,
135    players: OrderedAssociated<PublicKey, DecodedValidator>,
136    syncing_players: OrderedAssociated<PublicKey, DecodedValidator>,
137}
138
139impl ValidatorState {
140    pub(super) fn new(validators: OrderedAssociated<PublicKey, DecodedValidator>) -> Self {
141        Self {
142            dealers: validators.clone(),
143            players: validators.clone(),
144            syncing_players: validators,
145        }
146    }
147
148    /// Returns a validator state with only public key and inbound address set.
149    ///
150    /// All other values take default values.
151    pub(super) fn with_unknown_contract_state(
152        validators: OrderedAssociated<PublicKey, SocketAddr>,
153    ) -> Self {
154        let validators = validators
155            .iter_pairs()
156            .map(|(key, addr)| {
157                let key = key.clone();
158                let validator = DecodedValidator {
159                    public_key: key.clone(),
160                    inbound: *addr,
161                    outbound: SocketAddr::from(([0, 0, 0, 0], 0)),
162                    index: 0,
163                    address: Address::ZERO,
164                };
165                (key, validator)
166            })
167            .collect();
168        Self::new(validators)
169    }
170
171    pub(super) fn dealers(&self) -> &OrderedAssociated<PublicKey, DecodedValidator> {
172        &self.dealers
173    }
174
175    pub(super) fn players(&self) -> &OrderedAssociated<PublicKey, DecodedValidator> {
176        &self.players
177    }
178
179    pub(super) fn syncing_players(&self) -> &OrderedAssociated<PublicKey, DecodedValidator> {
180        &self.syncing_players
181    }
182
183    pub(super) fn dealer_pubkeys(&self) -> Ordered<PublicKey> {
184        self.dealers.keys().clone()
185    }
186
187    pub(super) fn player_pubkeys(&self) -> Ordered<PublicKey> {
188        self.players.keys().clone()
189    }
190
191    /// Constructs a peerset to register on the peer manager.
192    ///
193    /// The peerset is constructed by merging the participants of all the
194    /// validator sets tracked in this queue, and resolving each of their
195    /// addresses (parsing socket address or looking up domain name).
196    ///
197    /// If a validator has entries across the tracked sets, then then its entry
198    /// for the latest pushed set is taken. For those cases where looking up
199    /// domain names failed, the last successfully looked up name is taken.
200    pub(super) fn resolve_addresses_and_merge_peers(
201        &self,
202    ) -> OrderedAssociated<PublicKey, SocketAddr> {
203        // IMPORTANT: Starting with the syncing players to ensure that the
204        // latest address for a validator with a given pubkey is used.
205        // OrderedAssociated takes the first instance of a key it sees and
206        // drops the later instances.
207        self.syncing_players()
208            .iter_pairs()
209            .chain(self.players().iter_pairs())
210            .chain(self.dealers().iter_pairs())
211            .map(|(pubkey, validator)| (pubkey.clone(), validator.inbound_socket_addr()))
212            .collect()
213    }
214
215    /// Pushes `syncing_players` into the participants queue.
216    ///
217    /// This method is called on successful DKG ceremonies: the current players
218    /// will become the next dealers, and the current syncing players will become
219    /// the next regular players.
220    ///
221    /// Removes and returns the old dealers.
222    pub(super) fn push_on_success(
223        &mut self,
224        syncing_players: OrderedAssociated<PublicKey, DecodedValidator>,
225    ) -> OrderedAssociated<PublicKey, DecodedValidator> {
226        let players = std::mem::replace(&mut self.syncing_players, syncing_players);
227        let dealers = std::mem::replace(&mut self.players, players);
228        std::mem::replace(&mut self.dealers, dealers)
229    }
230
231    /// Pushes `syncing_players` into the participants queue.
232    ///
233    /// This method is called on failed DKG ceremonies: the current dealers
234    /// will remain dealers for the next epoch, the current players are dropped
235    /// (since for them, the ceremony failed), and the current syncing players
236    /// will become the next regular players.
237    pub(super) fn push_on_failure(
238        &mut self,
239        syncing_players: OrderedAssociated<PublicKey, DecodedValidator>,
240    ) -> OrderedAssociated<PublicKey, DecodedValidator> {
241        let players = std::mem::replace(&mut self.syncing_players, syncing_players);
242        // The previous players are dropped/returned - these are for who the
243        // ceremony failed for.
244        std::mem::replace(&mut self.players, players)
245    }
246}
247
248impl Write for ValidatorState {
249    fn write(&self, buf: &mut impl bytes::BufMut) {
250        self.dealers().write(buf);
251        self.players().write(buf);
252        self.syncing_players().write(buf);
253    }
254}
255
256impl EncodeSize for ValidatorState {
257    fn encode_size(&self) -> usize {
258        self.dealers().encode_size()
259            + self.players().encode_size()
260            + self.syncing_players().encode_size()
261    }
262}
263
264impl Read for ValidatorState {
265    type Cfg = ();
266
267    fn read_cfg(
268        buf: &mut impl bytes::Buf,
269        _cfg: &Self::Cfg,
270    ) -> Result<Self, commonware_codec::Error> {
271        // The range 0..=usize::MAX here is ok: what we are writing to disk
272        // is completely under our control and there is no danger of DoS.
273        let dealers = OrderedAssociated::read_cfg(buf, &(RangeCfg::from(0..=usize::MAX), (), ()))?;
274        let players = OrderedAssociated::read_cfg(buf, &(RangeCfg::from(0..=usize::MAX), (), ()))?;
275        let syncing_players =
276            OrderedAssociated::read_cfg(buf, &(RangeCfg::from(0..=usize::MAX), (), ()))?;
277        Ok(Self {
278            dealers,
279            players,
280            syncing_players,
281        })
282    }
283}
284
285/// A ContractValidator is a peer read from the validator config smart const.
286///
287/// The inbound and outbound addresses stored herein are guaranteed to be of the
288/// form `<host>:<port>` for inbound, and `<ip>:<port>` for outbound. Here,
289/// `<host>` is either an IPv4 or IPV6 address, or a fully qualified domain name.
290/// `<ip>` is an IPv4 or IPv6 address.
291#[derive(Clone, Debug, PartialEq, Eq)]
292pub(super) struct DecodedValidator {
293    /// The `publicKey` field of the contract. Used by other validators to
294    /// identify a peer by verifying the signatures of its p2p messages and
295    /// as a dealer/player/participant in DKG ceremonies and consensus for a
296    /// given epoch. Part of the set registered with the lookup p2p manager.
297    pub(super) public_key: PublicKey,
298    /// The `inboundAddress` field of the contract. Used by other validators
299    /// to dial a peer and ensure that messages from that peer are coming from
300    /// this address. Part of the set registered with the lookup p2p manager.
301    pub(super) inbound: SocketAddr,
302    /// The `outboundAddress` field of the contract. Currently ignored because
303    /// all p2p communication is symmetric (outbound and inbound) via the
304    /// `inboundAddress` field.
305    pub(super) outbound: SocketAddr,
306    /// The `index` field of the contract. Not used by consensus and just here
307    /// for debugging purposes to identify the contract entry. Emitted in
308    /// tracing events.
309    pub(super) index: u64,
310    /// The `address` field of the contract. Not used by consensus and just here
311    /// for debugging purposes to identify the contract entry. Emitted in
312    /// tracing events.
313    pub(super) address: Address,
314}
315
316impl DecodedValidator {
317    /// Attempts to decode a single validator from the values read in the smart contract.
318    ///
319    /// This function does not perform hostname lookup on either of the addresses.
320    /// Instead, only the shape of the addresses are checked for whether they are
321    /// socket addresses (IP:PORT pairs), or fully qualified domain names.
322    #[instrument(ret(Display, level = Level::INFO), err(level = Level::WARN))]
323    pub(super) fn decode_from_contract(
324        IValidatorConfig::Validator {
325            publicKey,
326            index,
327            validatorAddress,
328            inboundAddress,
329            outboundAddress,
330            ..
331        }: IValidatorConfig::Validator,
332    ) -> eyre::Result<Self> {
333        let public_key = PublicKey::decode(publicKey.as_ref())
334            .wrap_err("failed decoding publicKey field as ed25519 public key")?;
335        let inbound = inboundAddress
336            .parse()
337            .wrap_err("inboundAddress was not valid")?;
338        let outbound = outboundAddress
339            .parse()
340            .wrap_err("outboundAddress was not valid")?;
341        Ok(Self {
342            public_key,
343            inbound,
344            outbound,
345            index,
346            address: validatorAddress,
347        })
348    }
349
350    fn inbound_socket_addr(&self) -> SocketAddr {
351        self.inbound
352    }
353}
354
355impl std::fmt::Display for DecodedValidator {
356    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
357        f.write_fmt(format_args!(
358            "public key = `{}`, inbound = `{}`, outbound = `{}`, index = `{}`, address = `{}`",
359            self.public_key, self.inbound, self.outbound, self.index, self.address
360        ))
361    }
362}
363
364impl Write for DecodedValidator {
365    fn write(&self, buf: &mut impl bytes::BufMut) {
366        self.public_key.write(buf);
367        self.inbound.to_string().as_bytes().write(buf);
368        self.outbound.to_string().as_bytes().write(buf);
369        UInt(self.index).write(buf);
370        self.address.0.write(buf);
371    }
372}
373
374impl EncodeSize for DecodedValidator {
375    fn encode_size(&self) -> usize {
376        self.public_key.encode_size()
377            + self.inbound.to_string().as_bytes().encode_size()
378            + self.outbound.to_string().as_bytes().encode_size()
379            + UInt(self.index).encode_size()
380            + self.address.0.encode_size()
381    }
382}
383
384impl Read for DecodedValidator {
385    type Cfg = ();
386
387    fn read_cfg(
388        mut buf: &mut impl bytes::Buf,
389        _cfg: &Self::Cfg,
390    ) -> Result<Self, commonware_codec::Error> {
391        let public_key = PublicKey::read_cfg(buf, &())?;
392        let inbound = {
393            // 253 is the maximum length of a fqdn.
394            let bytes = Vec::<u8>::read_cfg(buf, &(RangeCfg::new(0..=253usize), ()))?;
395            String::from_utf8(bytes).map_err(|_| {
396                commonware_codec::Error::Invalid("decode inbound address", "not utf8")
397            })?
398        }
399        .parse()
400        .map_err(|_| {
401            commonware_codec::Error::Invalid("decode inbound address", "not <ip>:<port>")
402        })?;
403        let outbound = {
404            // 253 is the maximum length of a fqdn.
405            let bytes = Vec::<u8>::read_cfg(buf, &(RangeCfg::new(0..=253usize), ()))?;
406            String::from_utf8(bytes).map_err(|_| {
407                commonware_codec::Error::Invalid("decode outbound address", "not utf8")
408            })?
409        }
410        .parse()
411        .map_err(|_| {
412            commonware_codec::Error::Invalid("decode outbound address", "not <ip>:<port>")
413        })?;
414        let index = UInt::read_cfg(&mut buf, &())?.into();
415        let address = Address::new(<[u8; 20]>::read_cfg(&mut buf, &())?);
416        Ok(Self {
417            public_key,
418            inbound,
419            outbound,
420            index,
421            address,
422        })
423    }
424}
425
426fn last_height_before_epoch(epoch: Epoch, epoch_length: u64) -> u64 {
427    epoch
428        .checked_sub(1)
429        .map_or(0, |epoch| utils::last_block_in_epoch(epoch_length, epoch))
430}
431
432#[cfg(test)]
433mod tests {
434    use commonware_codec::{DecodeExt as _, Encode as _};
435    use commonware_cryptography::{PrivateKeyExt, Signer, ed25519::PrivateKey};
436
437    use crate::dkg::manager::DecodedValidator;
438
439    #[test]
440    fn roundtrip_decoded_validator() {
441        let private_key = PrivateKey::from_seed(42);
442        let decoded_validator = DecodedValidator {
443            public_key: private_key.public_key(),
444            inbound: "192.168.0.1:1234".parse().unwrap(),
445            outbound: "127.0.0.1:4321".parse().unwrap(),
446            index: 42,
447            address: alloy_primitives::Address::ZERO,
448        };
449        assert_eq!(
450            decoded_validator,
451            DecodedValidator::decode(&mut decoded_validator.encode().freeze()).unwrap()
452        );
453    }
454}