1#![cfg_attr(not(test), warn(unused_crate_dependencies))]
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6pub(crate) mod alias;
7mod args;
8pub(crate) mod config;
9pub mod consensus;
10pub(crate) mod dkg;
11pub(crate) mod epoch;
12pub(crate) mod executor;
13pub mod feed;
14pub mod follow;
15pub mod metrics;
16pub(crate) mod peer_manager;
17pub(crate) mod storage;
18pub(crate) mod utils;
19pub(crate) mod validators;
20
21pub(crate) mod subblocks;
22
23use std::sync::Arc;
24
25use commonware_consensus::types::FixedEpocher;
26use commonware_cryptography::ed25519::{PrivateKey, PublicKey};
27use commonware_p2p::authenticated::lookup;
28use commonware_runtime::Metrics as _;
29use commonware_utils::NZU64;
30use eyre::{OptionExt, WrapErr as _, eyre};
31use tempo_commonware_node_config::SigningShare;
32use tempo_node::TempoFullNode;
33
34pub use crate::config::{
35 BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT, CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT,
36 DKG_CHANNEL_IDENT, DKG_LIMIT, MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT, NAMESPACE,
37 RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT, SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT,
38 VOTES_CHANNEL_IDENT, VOTES_LIMIT,
39};
40
41pub use args::{Args, PositiveDuration};
42
43const PARTITION_PREFIX: &str = "engine";
46
47pub async fn run_consensus_stack(
48 context: commonware_runtime::tokio::Context,
49 config: Args,
50 execution_node: Arc<TempoFullNode>,
51 feed_state: feed::FeedStateHandle,
52) -> eyre::Result<()> {
53 let share = config
54 .signing_share
55 .as_ref()
56 .map(|share| {
57 SigningShare::read_from_file(share).wrap_err_with(|| {
58 format!(
59 "failed reading private bls12-381 key share from file `{}`",
60 share.display()
61 )
62 })
63 })
64 .transpose()?
65 .map(|signing_share| signing_share.into_inner());
66
67 let signing_key = config
68 .signing_key()?
69 .ok_or_eyre("required option `consensus.signing-key` not set")?;
70
71 let backfill_quota = commonware_runtime::Quota::per_second(config.backfill_frequency);
72
73 let (mut network, oracle) =
74 instantiate_network(&context, &config, signing_key.clone().into_inner())
75 .await
76 .wrap_err("failed to start network")?;
77
78 let message_backlog = config.message_backlog;
79 let votes = network.register(VOTES_CHANNEL_IDENT, VOTES_LIMIT, message_backlog);
80 let certificates = network.register(
81 CERTIFICATES_CHANNEL_IDENT,
82 CERTIFICATES_LIMIT,
83 message_backlog,
84 );
85 let resolver = network.register(RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT, message_backlog);
86 let broadcaster = network.register(
87 BROADCASTER_CHANNEL_IDENT,
88 BROADCASTER_LIMIT,
89 message_backlog,
90 );
91 let marshal = network.register(MARSHAL_CHANNEL_IDENT, backfill_quota, message_backlog);
92 let dkg = network.register(DKG_CHANNEL_IDENT, DKG_LIMIT, message_backlog);
93 let subblocks = network.register(SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT, message_backlog);
97
98 let consensus_engine = crate::consensus::engine::Builder {
99 execution_node: Some(execution_node),
100 blocker: oracle.clone(),
101 peer_manager: oracle.clone(),
102
103 partition_prefix: PARTITION_PREFIX.into(),
105 signer: signing_key.into_inner(),
106 share,
107
108 mailbox_size: config.mailbox_size,
109 deque_size: config.deque_size,
110
111 time_to_propose: config.wait_for_proposal.into_duration(),
112 time_to_collect_notarizations: config.wait_for_notarizations.into_duration(),
113 time_to_retry_nullify_broadcast: config.wait_to_rebroadcast_nullify.into_duration(),
114 time_for_peer_response: config.wait_for_peer_response.into_duration(),
115 views_to_track: config.views_to_track,
116 views_until_leader_skip: config.inactive_views_until_leader_skip,
117 payload_interrupt_time: config.time_to_prepare_proposal_transactions.into_duration(),
118 new_payload_wait_time: config.minimum_time_before_propose.into_duration(),
119 time_to_build_subblock: config.time_to_build_subblock.into_duration(),
120 subblock_broadcast_interval: config.subblock_broadcast_interval.into_duration(),
121 fcu_heartbeat_interval: config.fcu_heartbeat_interval.into_duration(),
122 with_subblocks: false,
123
124 feed_state,
125 }
126 .try_init(context.with_label("engine"))
127 .await
128 .wrap_err("failed initializing consensus engine")?;
129
130 let (network, consensus_engine) = (
131 network.start(),
132 consensus_engine.start(
133 votes,
134 certificates,
135 resolver,
136 broadcaster,
137 marshal,
138 dkg,
139 subblocks,
140 ),
141 );
142
143 tokio::select! {
144 ret = network => {
145 ret.map_err(eyre::Report::from)
146 .and_then(|()| Err(eyre!("exited unexpectedly")))
147 .wrap_err("network task failed")
148 }
149
150 ret = consensus_engine => {
151 ret.map_err(eyre::Report::from)
152 .and_then(|ret| ret.and_then(|()| Err(eyre!("exited unexpectedly"))))
153 .wrap_err("consensus engine task failed")
154 }
155 }
156}
157
158pub async fn run_follow_stack(
161 context: commonware_runtime::tokio::Context,
162 config: Args,
163 upstream_url: String,
164 execution_node: Arc<TempoFullNode>,
165 feed_state: feed::FeedStateHandle,
166) -> eyre::Result<()> {
167 let epoch_length = execution_node
168 .chain_spec()
169 .info
170 .epoch_length()
171 .ok_or_eyre("chainspec did not contain epochLength")?;
172
173 let (upstream, upstream_mailbox) = crate::follow::upstream::init(
174 context.with_label("upstream"),
175 crate::follow::upstream::Config { upstream_url },
176 );
177
178 let config = follow::Config {
179 execution_node,
180 feed_state,
181 upstream,
182 upstream_mailbox,
183 partition_prefix: PARTITION_PREFIX.into(),
184 epoch_strategy: FixedEpocher::new(NZU64!(epoch_length)),
185 mailbox_size: config.mailbox_size,
186 fcu_heartbeat_interval: config.fcu_heartbeat_interval.into_duration(),
187 };
188
189 let ret = config
190 .try_init(context.with_label("engine"))
191 .await
192 .wrap_err("failed initializing follow engine")?
193 .start()
194 .await;
195
196 ret.map_err(eyre::Report::from)
197 .and_then(|ret| ret.and_then(|()| Err(eyre!("exited unexpectedly"))))
198 .wrap_err("follow engine task failed")
199}
200
201async fn instantiate_network(
202 context: &commonware_runtime::tokio::Context,
203 config: &Args,
204 signing_key: PrivateKey,
205) -> eyre::Result<(
206 lookup::Network<commonware_runtime::tokio::Context, PrivateKey>,
207 lookup::Oracle<PublicKey>,
208)> {
209 let namespace = commonware_utils::union_unique(crate::config::NAMESPACE, b"_P2P");
212 let cfg = lookup::Config {
213 namespace,
214 crypto: signing_key,
215 listen: config.listen_address,
216 max_message_size: config.max_message_size_bytes,
217 mailbox_size: config.mailbox_size,
218 send_batch_size: commonware_utils::NZUsize!(8),
219 bypass_ip_check: config.bypass_ip_check,
220 allow_private_ips: config.allow_private_ips,
221 allow_dns: config.allow_dns,
222 tracked_peer_sets: crate::config::PEERSETS_TO_TRACK,
223 synchrony_bound: config.synchrony_bound.into_duration(),
224 max_handshake_age: config.handshake_stale_after.into_duration(),
225 handshake_timeout: config.handshake_timeout.into_duration(),
226 max_concurrent_handshakes: config.max_concurrent_handshakes,
227 block_duration: config.time_to_unblock_byzantine_peer.into_duration(),
228 dial_frequency: config.wait_before_peers_redial.into_duration(),
229 ping_frequency: config.wait_before_peers_reping.into_duration(),
230 peer_connection_cooldown: config.connection_per_peer_min_period.into_duration(),
231 allowed_handshake_rate_per_ip: commonware_runtime::Quota::with_period(
232 config.handshake_per_ip_min_period.into_duration(),
233 )
234 .ok_or_eyre("handshake per ip min period must be non-zero")?,
235 allowed_handshake_rate_per_subnet: commonware_runtime::Quota::with_period(
236 config.handshake_per_subnet_min_period.into_duration(),
237 )
238 .ok_or_eyre("handshake per subnet min period must be non-zero")?,
239 };
240
241 Ok(lookup::Network::new(context.with_label("network"), cfg))
242}