Skip to main content

tempo_commonware_node/
lib.rs

1//! A Tempo node using commonware's threshold simplex as consensus.
2
3#![cfg_attr(not(test), warn(unused_crate_dependencies))]
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6pub(crate) mod alias;
7mod args;
8pub(crate) mod config;
9pub mod consensus;
10pub(crate) mod dkg;
11pub(crate) mod epoch;
12pub(crate) mod executor;
13pub mod feed;
14pub mod follow;
15pub mod metrics;
16pub(crate) mod peer_manager;
17pub(crate) mod storage;
18pub(crate) mod utils;
19pub(crate) mod validators;
20
21pub(crate) mod subblocks;
22
23use std::sync::Arc;
24
25use commonware_consensus::types::FixedEpocher;
26use commonware_cryptography::ed25519::{PrivateKey, PublicKey};
27use commonware_p2p::authenticated::lookup;
28use commonware_runtime::Metrics as _;
29use commonware_utils::NZU64;
30use eyre::{OptionExt, WrapErr as _, eyre};
31use tempo_commonware_node_config::SigningShare;
32use tempo_node::TempoFullNode;
33
34pub use crate::config::{
35    BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT, CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT,
36    DKG_CHANNEL_IDENT, DKG_LIMIT, MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT, NAMESPACE,
37    RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT, SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT,
38    VOTES_CHANNEL_IDENT, VOTES_LIMIT,
39};
40
41pub use args::{Args, PositiveDuration};
42
/// Storage partition prefix handed to both the consensus engine builder and
/// the follow engine config.
///
/// Shared by both the consensus and follow engines such that
/// snapshots for overlapping archives can be reused.
const PARTITION_PREFIX: &str = "engine";
46
47pub async fn run_consensus_stack(
48    context: commonware_runtime::tokio::Context,
49    config: Args,
50    execution_node: Arc<TempoFullNode>,
51    feed_state: feed::FeedStateHandle,
52) -> eyre::Result<()> {
53    let share = config
54        .signing_share
55        .as_ref()
56        .map(|share| {
57            SigningShare::read_from_file(share).wrap_err_with(|| {
58                format!(
59                    "failed reading private bls12-381 key share from file `{}`",
60                    share.display()
61                )
62            })
63        })
64        .transpose()?
65        .map(|signing_share| signing_share.into_inner());
66
67    let signing_key = config
68        .signing_key()?
69        .ok_or_eyre("required option `consensus.signing-key` not set")?;
70
71    let backfill_quota = commonware_runtime::Quota::per_second(config.backfill_frequency);
72
73    let (mut network, oracle) =
74        instantiate_network(&context, &config, signing_key.clone().into_inner())
75            .await
76            .wrap_err("failed to start network")?;
77
78    let message_backlog = config.message_backlog;
79    let votes = network.register(VOTES_CHANNEL_IDENT, VOTES_LIMIT, message_backlog);
80    let certificates = network.register(
81        CERTIFICATES_CHANNEL_IDENT,
82        CERTIFICATES_LIMIT,
83        message_backlog,
84    );
85    let resolver = network.register(RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT, message_backlog);
86    let broadcaster = network.register(
87        BROADCASTER_CHANNEL_IDENT,
88        BROADCASTER_LIMIT,
89        message_backlog,
90    );
91    let marshal = network.register(MARSHAL_CHANNEL_IDENT, backfill_quota, message_backlog);
92    let dkg = network.register(DKG_CHANNEL_IDENT, DKG_LIMIT, message_backlog);
93    // We create the subblocks channel even though it might not be used to make
94    // sure that we don't ban peers that activate subblocks and send messages
95    // through this subchannel.
96    let subblocks = network.register(SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT, message_backlog);
97
98    let consensus_engine = crate::consensus::engine::Builder {
99        execution_node: Some(execution_node),
100        blocker: oracle.clone(),
101        peer_manager: oracle.clone(),
102
103        // TODO: Set this through config?
104        partition_prefix: PARTITION_PREFIX.into(),
105        signer: signing_key.into_inner(),
106        share,
107
108        mailbox_size: config.mailbox_size,
109        deque_size: config.deque_size,
110
111        time_to_propose: config.wait_for_proposal.into_duration(),
112        time_to_collect_notarizations: config.wait_for_notarizations.into_duration(),
113        time_to_retry_nullify_broadcast: config.wait_to_rebroadcast_nullify.into_duration(),
114        time_for_peer_response: config.wait_for_peer_response.into_duration(),
115        views_to_track: config.views_to_track,
116        views_until_leader_skip: config.inactive_views_until_leader_skip,
117        payload_interrupt_time: config.time_to_prepare_proposal_transactions.into_duration(),
118        new_payload_wait_time: config.minimum_time_before_propose.into_duration(),
119        time_to_build_subblock: config.time_to_build_subblock.into_duration(),
120        subblock_broadcast_interval: config.subblock_broadcast_interval.into_duration(),
121        fcu_heartbeat_interval: config.fcu_heartbeat_interval.into_duration(),
122        with_subblocks: false,
123
124        feed_state,
125    }
126    .try_init(context.with_label("engine"))
127    .await
128    .wrap_err("failed initializing consensus engine")?;
129
130    let (network, consensus_engine) = (
131        network.start(),
132        consensus_engine.start(
133            votes,
134            certificates,
135            resolver,
136            broadcaster,
137            marshal,
138            dkg,
139            subblocks,
140        ),
141    );
142
143    tokio::select! {
144        ret = network => {
145            ret.map_err(eyre::Report::from)
146                .and_then(|()| Err(eyre!("exited unexpectedly")))
147                .wrap_err("network task failed")
148        }
149
150        ret = consensus_engine => {
151            ret.map_err(eyre::Report::from)
152                .and_then(|ret| ret.and_then(|()| Err(eyre!("exited unexpectedly"))))
153                .wrap_err("consensus engine task failed")
154        }
155    }
156}
157
158/// Run the follower stack. This uses RPC to sync consensus state and drive
159/// the execution layer from the upstream node.
160pub async fn run_follow_stack(
161    context: commonware_runtime::tokio::Context,
162    config: Args,
163    upstream_url: String,
164    execution_node: Arc<TempoFullNode>,
165    feed_state: feed::FeedStateHandle,
166) -> eyre::Result<()> {
167    let epoch_length = execution_node
168        .chain_spec()
169        .info
170        .epoch_length()
171        .ok_or_eyre("chainspec did not contain epochLength")?;
172
173    let (upstream, upstream_mailbox) = crate::follow::upstream::init(
174        context.with_label("upstream"),
175        crate::follow::upstream::Config { upstream_url },
176    );
177
178    let config = follow::Config {
179        execution_node,
180        feed_state,
181        upstream,
182        upstream_mailbox,
183        partition_prefix: PARTITION_PREFIX.into(),
184        epoch_strategy: FixedEpocher::new(NZU64!(epoch_length)),
185        mailbox_size: config.mailbox_size,
186        fcu_heartbeat_interval: config.fcu_heartbeat_interval.into_duration(),
187    };
188
189    let ret = config
190        .try_init(context.with_label("engine"))
191        .await
192        .wrap_err("failed initializing follow engine")?
193        .start()
194        .await;
195
196    ret.map_err(eyre::Report::from)
197        .and_then(|ret| ret.and_then(|()| Err(eyre!("exited unexpectedly"))))
198        .wrap_err("follow engine task failed")
199}
200
201async fn instantiate_network(
202    context: &commonware_runtime::tokio::Context,
203    config: &Args,
204    signing_key: PrivateKey,
205) -> eyre::Result<(
206    lookup::Network<commonware_runtime::tokio::Context, PrivateKey>,
207    lookup::Oracle<PublicKey>,
208)> {
209    // TODO: Find out why `union_unique` should be used. This is the only place
210    // where `NAMESPACE` is used at all. We follow alto's example for now.
211    let namespace = commonware_utils::union_unique(crate::config::NAMESPACE, b"_P2P");
212    let cfg = lookup::Config {
213        namespace,
214        crypto: signing_key,
215        listen: config.listen_address,
216        max_message_size: config.max_message_size_bytes,
217        mailbox_size: config.mailbox_size,
218        send_batch_size: commonware_utils::NZUsize!(8),
219        bypass_ip_check: config.bypass_ip_check,
220        allow_private_ips: config.allow_private_ips,
221        allow_dns: config.allow_dns,
222        tracked_peer_sets: crate::config::PEERSETS_TO_TRACK,
223        synchrony_bound: config.synchrony_bound.into_duration(),
224        max_handshake_age: config.handshake_stale_after.into_duration(),
225        handshake_timeout: config.handshake_timeout.into_duration(),
226        max_concurrent_handshakes: config.max_concurrent_handshakes,
227        block_duration: config.time_to_unblock_byzantine_peer.into_duration(),
228        dial_frequency: config.wait_before_peers_redial.into_duration(),
229        ping_frequency: config.wait_before_peers_reping.into_duration(),
230        peer_connection_cooldown: config.connection_per_peer_min_period.into_duration(),
231        allowed_handshake_rate_per_ip: commonware_runtime::Quota::with_period(
232            config.handshake_per_ip_min_period.into_duration(),
233        )
234        .ok_or_eyre("handshake per ip min period must be non-zero")?,
235        allowed_handshake_rate_per_subnet: commonware_runtime::Quota::with_period(
236            config.handshake_per_subnet_min_period.into_duration(),
237        )
238        .ok_or_eyre("handshake per subnet min period must be non-zero")?,
239    };
240
241    Ok(lookup::Network::new(context.with_label("network"), cfg))
242}