Skip to main content

tempo_consensus/
lib.rs

1//! A Tempo node using commonware's threshold simplex as consensus.
2
3#![cfg_attr(not(test), warn(unused_crate_dependencies))]
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6pub(crate) mod alias;
7mod args;
8pub(crate) mod config;
9pub mod consensus;
10pub(crate) mod dkg;
11pub(crate) mod epoch;
12pub(crate) mod executor;
13pub mod feed;
14pub mod follow;
15pub mod metrics;
16pub(crate) mod network_identity;
17pub(crate) mod peer_manager;
18pub(crate) mod storage;
19pub(crate) mod subblocks;
20pub(crate) mod utils;
21pub(crate) mod validators;
22
23use std::sync::Arc;
24
25use commonware_consensus::types::FixedEpocher;
26use commonware_cryptography::ed25519::{PrivateKey, PublicKey};
27use commonware_p2p::authenticated::lookup;
28use commonware_runtime::Metrics as _;
29use commonware_utils::NZU64;
30use eyre::{OptionExt, WrapErr as _, eyre};
31use tempo_consensus_config::SigningShare;
32use tempo_node::TempoFullNode;
33use tracing::info;
34
35pub use crate::config::{
36    BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT, CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT,
37    DKG_CHANNEL_IDENT, DKG_LIMIT, MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT, NAMESPACE,
38    RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT, SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT,
39    VOTES_CHANNEL_IDENT, VOTES_LIMIT,
40};
41
42pub use args::{Args, PositiveDuration};
43pub use storage::find_last_finalized_marker;
44
// Partition prefix handed to both the consensus engine and the follow engine.
// The two deliberately share the same value so that snapshots for overlapping
// archives can be reused.
const PARTITION_PREFIX: &str = "engine";
48
49pub async fn run_consensus_stack(
50    context: commonware_runtime::tokio::Context,
51    config: Args,
52    execution_node: Arc<TempoFullNode>,
53    feed_state: feed::FeedStateHandle,
54) -> eyre::Result<()> {
55    let share = config
56        .signing_share
57        .as_ref()
58        .map(|share| {
59            SigningShare::read_from_file(share).wrap_err_with(|| {
60                format!(
61                    "failed reading private bls12-381 key share from file `{}`",
62                    share.display()
63                )
64            })
65        })
66        .transpose()?
67        .map(|signing_share| signing_share.into_inner());
68
69    let signing_key = config
70        .signing_key()
71        .await?
72        .ok_or_eyre("required option `consensus.signing-key` not set")?;
73
74    let backfill_quota = commonware_runtime::Quota::per_second(config.backfill_frequency);
75
76    let (mut network, oracle) =
77        instantiate_network(&context, &config, signing_key.clone().into_inner())
78            .await
79            .wrap_err("failed to start network")?;
80
81    let message_backlog = config.message_backlog;
82    let votes = network.register(VOTES_CHANNEL_IDENT, VOTES_LIMIT, message_backlog);
83    let certificates = network.register(
84        CERTIFICATES_CHANNEL_IDENT,
85        CERTIFICATES_LIMIT,
86        message_backlog,
87    );
88    let resolver = network.register(RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT, message_backlog);
89    let broadcaster = network.register(
90        BROADCASTER_CHANNEL_IDENT,
91        BROADCASTER_LIMIT,
92        message_backlog,
93    );
94    let marshal = network.register(MARSHAL_CHANNEL_IDENT, backfill_quota, message_backlog);
95    let dkg = network.register(DKG_CHANNEL_IDENT, DKG_LIMIT, message_backlog);
96    // We create the subblocks channel even though it might not be used to make
97    // sure that we don't ban peers that activate subblocks and send messages
98    // through this subchannel.
99    let subblocks = network.register(SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT, message_backlog);
100
101    let target_block_time = config.target_block_time.into_duration();
102    // Consensus owns the end-to-end local proposal window. The network budget
103    // is reserved for propagation, and the remaining time is passed down to
104    // proposal handling and local payload building.
105    let proposal_return_budget =
106        target_block_time.saturating_sub(config.network_budget.into_duration());
107
108    let consensus_engine = crate::consensus::engine::Builder {
109        execution_node: Some(execution_node),
110        blocker: oracle.clone(),
111        peer_manager: oracle.clone(),
112
113        // TODO: Set this through config?
114        partition_prefix: PARTITION_PREFIX.into(),
115        signer: signing_key.into_inner(),
116        share,
117
118        mailbox_size: config.mailbox_size,
119        deque_size: config.deque_size,
120
121        time_to_propose: config.wait_for_proposal.into_duration(),
122        time_to_collect_notarizations: config.wait_for_notarizations.into_duration(),
123        time_to_retry_nullify_broadcast: config.wait_to_rebroadcast_nullify.into_duration(),
124        time_for_peer_response: config.wait_for_peer_response.into_duration(),
125        views_to_track: config.views_to_track,
126        views_until_leader_skip: config.inactive_views_until_leader_skip,
127        proposal_return_budget,
128        time_to_build_subblock: config.time_to_build_subblock.into_duration(),
129        subblock_broadcast_interval: config.subblock_broadcast_interval.into_duration(),
130        fcu_heartbeat_interval: config.fcu_heartbeat_interval.into_duration(),
131        with_subblocks: false,
132
133        feed_state,
134
135        finalized_blocks_retention: config.finalized_blocks_retention,
136    }
137    .try_init(context.with_label("engine"))
138    .await
139    .wrap_err("failed initializing consensus engine")?;
140
141    let (network, consensus_engine) = (
142        network.start(),
143        consensus_engine.start(
144            votes,
145            certificates,
146            resolver,
147            broadcaster,
148            marshal,
149            dkg,
150            subblocks,
151        ),
152    );
153
154    tokio::select! {
155        ret = network => {
156            ret.map_err(eyre::Report::from)
157                .and_then(|()| Err(eyre!("exited unexpectedly")))
158                .wrap_err("network task failed")
159        }
160
161        ret = consensus_engine => {
162            ret.map_err(eyre::Report::from)
163                .and_then(|ret| ret.and_then(|()| Err(eyre!("exited unexpectedly"))))
164                .wrap_err("consensus engine task failed")
165        }
166    }
167}
168
169/// Run the follower stack. This uses RPC to sync consensus state and drive
170/// the execution layer from the upstream node.
171pub async fn run_follow_stack(
172    context: commonware_runtime::tokio::Context,
173    config: Args,
174    upstream_url: String,
175    execution_node: Arc<TempoFullNode>,
176    feed_state: feed::FeedStateHandle,
177) -> eyre::Result<()> {
178    let chain_spec = execution_node.chain_spec();
179
180    let epoch_length = chain_spec
181        .info
182        .epoch_length()
183        .ok_or_eyre("chainspec did not contain epochLength")?;
184
185    let chain_spec_network_identity = chain_spec
186        .network_identity
187        .clone()
188        .ok_or_eyre("chainspec has no dkg outcome in genesis header")?;
189
190    let network_identity = config
191        .network_identity()
192        .unwrap_or(chain_spec_network_identity);
193
194    info!(%network_identity.from_epoch, %network_identity.identity, "registered network identity");
195
196    let (upstream, upstream_mailbox) = crate::follow::upstream::init(
197        context.with_label("upstream"),
198        crate::follow::upstream::Config { upstream_url },
199    );
200
201    let config = follow::Config {
202        execution_node,
203        feed_state,
204        upstream,
205        upstream_mailbox,
206        network_identity,
207        partition_prefix: PARTITION_PREFIX.into(),
208        epoch_strategy: FixedEpocher::new(NZU64!(epoch_length)),
209        mailbox_size: config.mailbox_size,
210        fcu_heartbeat_interval: config.fcu_heartbeat_interval.into_duration(),
211        finalized_blocks_retention: config.finalized_blocks_retention,
212    };
213
214    let ret = config
215        .try_init(context.with_label("engine"))
216        .await
217        .wrap_err("failed initializing follow engine")?
218        .start()
219        .await;
220
221    ret.map_err(eyre::Report::from)
222        .and_then(|ret| ret.and_then(|()| Err(eyre!("exited unexpectedly"))))
223        .wrap_err("follow engine task failed")
224}
225
226async fn instantiate_network(
227    context: &commonware_runtime::tokio::Context,
228    config: &Args,
229    signing_key: PrivateKey,
230) -> eyre::Result<(
231    lookup::Network<commonware_runtime::tokio::Context, PrivateKey>,
232    lookup::Oracle<PublicKey>,
233)> {
234    // TODO: Find out why `union_unique` should be used. This is the only place
235    // where `NAMESPACE` is used at all. We follow alto's example for now.
236    let namespace = commonware_utils::union_unique(crate::config::NAMESPACE, b"_P2P");
237    let cfg = lookup::Config {
238        namespace,
239        crypto: signing_key,
240        listen: config.listen_address,
241        max_message_size: config.max_message_size_bytes,
242        mailbox_size: config.mailbox_size,
243        send_batch_size: commonware_utils::NZUsize!(8),
244        bypass_ip_check: config.bypass_ip_check,
245        allow_private_ips: config.allow_private_ips,
246        allow_dns: config.allow_dns,
247        tracked_peer_sets: crate::config::PEERSETS_TO_TRACK,
248        synchrony_bound: config.synchrony_bound.into_duration(),
249        max_handshake_age: config.handshake_stale_after.into_duration(),
250        handshake_timeout: config.handshake_timeout.into_duration(),
251        max_concurrent_handshakes: config.max_concurrent_handshakes,
252        block_duration: config.time_to_unblock_byzantine_peer.into_duration(),
253        dial_frequency: config.wait_before_peers_redial.into_duration(),
254        ping_frequency: config.wait_before_peers_reping.into_duration(),
255        peer_connection_cooldown: config.connection_per_peer_min_period.into_duration(),
256        allowed_handshake_rate_per_ip: commonware_runtime::Quota::with_period(
257            config.handshake_per_ip_min_period.into_duration(),
258        )
259        .ok_or_eyre("handshake per ip min period must be non-zero")?,
260        allowed_handshake_rate_per_subnet: commonware_runtime::Quota::with_period(
261            config.handshake_per_subnet_min_period.into_duration(),
262        )
263        .ok_or_eyre("handshake per subnet min period must be non-zero")?,
264    };
265
266    Ok(lookup::Network::new(context.with_label("network"), cfg))
267}