1#![cfg_attr(not(test), warn(unused_crate_dependencies))]
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6pub(crate) mod alias;
7mod args;
8pub(crate) mod config;
9pub mod consensus;
10pub(crate) mod dkg;
11pub(crate) mod epoch;
12pub(crate) mod executor;
13pub mod feed;
14pub mod follow;
15pub mod metrics;
16pub(crate) mod network_identity;
17pub(crate) mod peer_manager;
18pub(crate) mod storage;
19pub(crate) mod subblocks;
20pub(crate) mod utils;
21pub(crate) mod validators;
22
23use std::sync::Arc;
24
25use commonware_consensus::types::FixedEpocher;
26use commonware_cryptography::ed25519::{PrivateKey, PublicKey};
27use commonware_p2p::authenticated::lookup;
28use commonware_runtime::Metrics as _;
29use commonware_utils::NZU64;
30use eyre::{OptionExt, WrapErr as _, eyre};
31use tempo_consensus_config::SigningShare;
32use tempo_node::TempoFullNode;
33use tracing::info;
34
35pub use crate::config::{
36 BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT, CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT,
37 DKG_CHANNEL_IDENT, DKG_LIMIT, MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT, NAMESPACE,
38 RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT, SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT,
39 VOTES_CHANNEL_IDENT, VOTES_LIMIT,
40};
41
42pub use args::{Args, PositiveDuration};
43pub use storage::find_last_finalized_marker;
44
/// Prefix passed as `partition_prefix` to both the consensus engine builder
/// (`run_consensus_stack`) and the follow engine configuration
/// (`run_follow_stack`).
const PARTITION_PREFIX: &str = "engine";
48
49pub async fn run_consensus_stack(
50 context: commonware_runtime::tokio::Context,
51 config: Args,
52 execution_node: Arc<TempoFullNode>,
53 feed_state: feed::FeedStateHandle,
54) -> eyre::Result<()> {
55 let share = config
56 .signing_share
57 .as_ref()
58 .map(|share| {
59 SigningShare::read_from_file(share).wrap_err_with(|| {
60 format!(
61 "failed reading private bls12-381 key share from file `{}`",
62 share.display()
63 )
64 })
65 })
66 .transpose()?
67 .map(|signing_share| signing_share.into_inner());
68
69 let signing_key = config
70 .signing_key()
71 .await?
72 .ok_or_eyre("required option `consensus.signing-key` not set")?;
73
74 let backfill_quota = commonware_runtime::Quota::per_second(config.backfill_frequency);
75
76 let (mut network, oracle) =
77 instantiate_network(&context, &config, signing_key.clone().into_inner())
78 .await
79 .wrap_err("failed to start network")?;
80
81 let message_backlog = config.message_backlog;
82 let votes = network.register(VOTES_CHANNEL_IDENT, VOTES_LIMIT, message_backlog);
83 let certificates = network.register(
84 CERTIFICATES_CHANNEL_IDENT,
85 CERTIFICATES_LIMIT,
86 message_backlog,
87 );
88 let resolver = network.register(RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT, message_backlog);
89 let broadcaster = network.register(
90 BROADCASTER_CHANNEL_IDENT,
91 BROADCASTER_LIMIT,
92 message_backlog,
93 );
94 let marshal = network.register(MARSHAL_CHANNEL_IDENT, backfill_quota, message_backlog);
95 let dkg = network.register(DKG_CHANNEL_IDENT, DKG_LIMIT, message_backlog);
96 let subblocks = network.register(SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT, message_backlog);
100
101 let target_block_time = config.target_block_time.into_duration();
102 let proposal_return_budget =
106 target_block_time.saturating_sub(config.network_budget.into_duration());
107
108 let consensus_engine = crate::consensus::engine::Builder {
109 execution_node: Some(execution_node),
110 blocker: oracle.clone(),
111 peer_manager: oracle.clone(),
112
113 partition_prefix: PARTITION_PREFIX.into(),
115 signer: signing_key.into_inner(),
116 share,
117
118 mailbox_size: config.mailbox_size,
119 deque_size: config.deque_size,
120
121 time_to_propose: config.wait_for_proposal.into_duration(),
122 time_to_collect_notarizations: config.wait_for_notarizations.into_duration(),
123 time_to_retry_nullify_broadcast: config.wait_to_rebroadcast_nullify.into_duration(),
124 time_for_peer_response: config.wait_for_peer_response.into_duration(),
125 views_to_track: config.views_to_track,
126 views_until_leader_skip: config.inactive_views_until_leader_skip,
127 proposal_return_budget,
128 time_to_build_subblock: config.time_to_build_subblock.into_duration(),
129 subblock_broadcast_interval: config.subblock_broadcast_interval.into_duration(),
130 fcu_heartbeat_interval: config.fcu_heartbeat_interval.into_duration(),
131 with_subblocks: false,
132
133 feed_state,
134
135 finalized_blocks_retention: config.finalized_blocks_retention,
136 }
137 .try_init(context.with_label("engine"))
138 .await
139 .wrap_err("failed initializing consensus engine")?;
140
141 let (network, consensus_engine) = (
142 network.start(),
143 consensus_engine.start(
144 votes,
145 certificates,
146 resolver,
147 broadcaster,
148 marshal,
149 dkg,
150 subblocks,
151 ),
152 );
153
154 tokio::select! {
155 ret = network => {
156 ret.map_err(eyre::Report::from)
157 .and_then(|()| Err(eyre!("exited unexpectedly")))
158 .wrap_err("network task failed")
159 }
160
161 ret = consensus_engine => {
162 ret.map_err(eyre::Report::from)
163 .and_then(|ret| ret.and_then(|()| Err(eyre!("exited unexpectedly"))))
164 .wrap_err("consensus engine task failed")
165 }
166 }
167}
168
169pub async fn run_follow_stack(
172 context: commonware_runtime::tokio::Context,
173 config: Args,
174 upstream_url: String,
175 execution_node: Arc<TempoFullNode>,
176 feed_state: feed::FeedStateHandle,
177) -> eyre::Result<()> {
178 let chain_spec = execution_node.chain_spec();
179
180 let epoch_length = chain_spec
181 .info
182 .epoch_length()
183 .ok_or_eyre("chainspec did not contain epochLength")?;
184
185 let chain_spec_network_identity = chain_spec
186 .network_identity
187 .clone()
188 .ok_or_eyre("chainspec has no dkg outcome in genesis header")?;
189
190 let network_identity = config
191 .network_identity()
192 .unwrap_or(chain_spec_network_identity);
193
194 info!(%network_identity.from_epoch, %network_identity.identity, "registered network identity");
195
196 let (upstream, upstream_mailbox) = crate::follow::upstream::init(
197 context.with_label("upstream"),
198 crate::follow::upstream::Config { upstream_url },
199 );
200
201 let config = follow::Config {
202 execution_node,
203 feed_state,
204 upstream,
205 upstream_mailbox,
206 network_identity,
207 partition_prefix: PARTITION_PREFIX.into(),
208 epoch_strategy: FixedEpocher::new(NZU64!(epoch_length)),
209 mailbox_size: config.mailbox_size,
210 fcu_heartbeat_interval: config.fcu_heartbeat_interval.into_duration(),
211 finalized_blocks_retention: config.finalized_blocks_retention,
212 };
213
214 let ret = config
215 .try_init(context.with_label("engine"))
216 .await
217 .wrap_err("failed initializing follow engine")?
218 .start()
219 .await;
220
221 ret.map_err(eyre::Report::from)
222 .and_then(|ret| ret.and_then(|()| Err(eyre!("exited unexpectedly"))))
223 .wrap_err("follow engine task failed")
224}
225
226async fn instantiate_network(
227 context: &commonware_runtime::tokio::Context,
228 config: &Args,
229 signing_key: PrivateKey,
230) -> eyre::Result<(
231 lookup::Network<commonware_runtime::tokio::Context, PrivateKey>,
232 lookup::Oracle<PublicKey>,
233)> {
234 let namespace = commonware_utils::union_unique(crate::config::NAMESPACE, b"_P2P");
237 let cfg = lookup::Config {
238 namespace,
239 crypto: signing_key,
240 listen: config.listen_address,
241 max_message_size: config.max_message_size_bytes,
242 mailbox_size: config.mailbox_size,
243 send_batch_size: commonware_utils::NZUsize!(8),
244 bypass_ip_check: config.bypass_ip_check,
245 allow_private_ips: config.allow_private_ips,
246 allow_dns: config.allow_dns,
247 tracked_peer_sets: crate::config::PEERSETS_TO_TRACK,
248 synchrony_bound: config.synchrony_bound.into_duration(),
249 max_handshake_age: config.handshake_stale_after.into_duration(),
250 handshake_timeout: config.handshake_timeout.into_duration(),
251 max_concurrent_handshakes: config.max_concurrent_handshakes,
252 block_duration: config.time_to_unblock_byzantine_peer.into_duration(),
253 dial_frequency: config.wait_before_peers_redial.into_duration(),
254 ping_frequency: config.wait_before_peers_reping.into_duration(),
255 peer_connection_cooldown: config.connection_per_peer_min_period.into_duration(),
256 allowed_handshake_rate_per_ip: commonware_runtime::Quota::with_period(
257 config.handshake_per_ip_min_period.into_duration(),
258 )
259 .ok_or_eyre("handshake per ip min period must be non-zero")?,
260 allowed_handshake_rate_per_subnet: commonware_runtime::Quota::with_period(
261 config.handshake_per_subnet_min_period.into_duration(),
262 )
263 .ok_or_eyre("handshake per subnet min period must be non-zero")?,
264 };
265
266 Ok(lookup::Network::new(context.with_label("network"), cfg))
267}