1#![cfg_attr(not(test), warn(unused_crate_dependencies))]
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6pub(crate) mod alias;
7mod args;
8pub(crate) mod config;
9pub mod consensus;
10pub(crate) mod dkg;
11pub(crate) mod epoch;
12pub(crate) mod executor;
13pub mod feed;
14pub mod metrics;
15pub(crate) mod peer_manager;
16pub(crate) mod utils;
17pub(crate) mod validators;
18
19pub(crate) mod subblocks;
20
21use commonware_cryptography::ed25519::{PrivateKey, PublicKey};
22use commonware_p2p::authenticated::lookup;
23use commonware_runtime::Metrics as _;
24use eyre::{OptionExt, WrapErr as _, eyre};
25use tempo_commonware_node_config::SigningShare;
26use tempo_node::TempoFullNode;
27
28pub use crate::config::{
29 BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT, CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT,
30 DKG_CHANNEL_IDENT, DKG_LIMIT, MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT, NAMESPACE,
31 RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT, SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT,
32 VOTES_CHANNEL_IDENT, VOTES_LIMIT,
33};
34
35pub use args::{Args, PositiveDuration};
36
37pub async fn run_consensus_stack(
38 context: &commonware_runtime::tokio::Context,
39 config: Args,
40 execution_node: TempoFullNode,
41 feed_state: feed::FeedStateHandle,
42) -> eyre::Result<()> {
43 let share = config
44 .signing_share
45 .as_ref()
46 .map(|share| {
47 SigningShare::read_from_file(share).wrap_err_with(|| {
48 format!(
49 "failed reading private bls12-381 key share from file `{}`",
50 share.display()
51 )
52 })
53 })
54 .transpose()?
55 .map(|signing_share| signing_share.into_inner());
56
57 let signing_key = config
58 .signing_key()?
59 .ok_or_eyre("required option `consensus.signing-key` not set")?;
60
61 let backfill_quota = commonware_runtime::Quota::per_second(config.backfill_frequency);
62
63 let (mut network, oracle) =
64 instantiate_network(context, &config, signing_key.clone().into_inner())
65 .await
66 .wrap_err("failed to start network")?;
67
68 let message_backlog = config.message_backlog;
69 let votes = network.register(VOTES_CHANNEL_IDENT, VOTES_LIMIT, message_backlog);
70 let certificates = network.register(
71 CERTIFICATES_CHANNEL_IDENT,
72 CERTIFICATES_LIMIT,
73 message_backlog,
74 );
75 let resolver = network.register(RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT, message_backlog);
76 let broadcaster = network.register(
77 BROADCASTER_CHANNEL_IDENT,
78 BROADCASTER_LIMIT,
79 message_backlog,
80 );
81 let marshal = network.register(MARSHAL_CHANNEL_IDENT, backfill_quota, message_backlog);
82 let dkg = network.register(DKG_CHANNEL_IDENT, DKG_LIMIT, message_backlog);
83 let subblocks = network.register(SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT, message_backlog);
87
88 let fee_recipient = config
89 .fee_recipient
90 .ok_or_eyre("required option `consensus.fee-recipient` not set")?;
91
92 let consensus_engine = crate::consensus::engine::Builder {
93 fee_recipient,
94
95 execution_node: Some(execution_node),
96 blocker: oracle.clone(),
97 peer_manager: oracle.clone(),
98
99 partition_prefix: "engine".into(),
101 signer: signing_key.into_inner(),
102 share,
103
104 mailbox_size: config.mailbox_size,
105 deque_size: config.deque_size,
106
107 time_to_propose: config.wait_for_proposal.into_duration(),
108 time_to_collect_notarizations: config.wait_for_notarizations.into_duration(),
109 time_to_retry_nullify_broadcast: config.wait_to_rebroadcast_nullify.into_duration(),
110 time_for_peer_response: config.wait_for_peer_response.into_duration(),
111 views_to_track: config.views_to_track,
112 views_until_leader_skip: config.inactive_views_until_leader_skip,
113 payload_interrupt_time: config.time_to_prepare_proposal_transactions.into_duration(),
114 new_payload_wait_time: config.minimum_time_before_propose.into_duration(),
115 time_to_build_subblock: config.time_to_build_subblock.into_duration(),
116 subblock_broadcast_interval: config.subblock_broadcast_interval.into_duration(),
117 fcu_heartbeat_interval: config.fcu_heartbeat_interval.into_duration(),
118 with_subblocks: config.enable_subblocks,
119
120 feed_state,
121 }
122 .try_init(context.with_label("engine"))
123 .await
124 .wrap_err("failed initializing consensus engine")?;
125
126 let (network, consensus_engine) = (
127 network.start(),
128 consensus_engine.start(
129 votes,
130 certificates,
131 resolver,
132 broadcaster,
133 marshal,
134 dkg,
135 subblocks,
136 ),
137 );
138
139 tokio::select! {
140 ret = network => {
141 ret.map_err(eyre::Report::from)
142 .and_then(|()| Err(eyre!("exited unexpectedly")))
143 .wrap_err("network task failed")
144 }
145
146 ret = consensus_engine => {
147 ret.map_err(eyre::Report::from)
148 .and_then(|ret| ret.and_then(|()| Err(eyre!("exited unexpectedly"))))
149 .wrap_err("consensus engine task failed")
150 }
151 }
152}
153
154async fn instantiate_network(
155 context: &commonware_runtime::tokio::Context,
156 config: &Args,
157 signing_key: PrivateKey,
158) -> eyre::Result<(
159 lookup::Network<commonware_runtime::tokio::Context, PrivateKey>,
160 lookup::Oracle<PublicKey>,
161)> {
162 let namespace = commonware_utils::union_unique(crate::config::NAMESPACE, b"_P2P");
165 let cfg = lookup::Config {
166 namespace,
167 crypto: signing_key,
168 listen: config.listen_address,
169 max_message_size: config.max_message_size_bytes,
170 mailbox_size: config.mailbox_size,
171 bypass_ip_check: config.bypass_ip_check,
172 allow_private_ips: config.allow_private_ips,
173 allow_dns: config.allow_dns,
174 tracked_peer_sets: crate::config::PEERSETS_TO_TRACK,
175 synchrony_bound: config.synchrony_bound.into_duration(),
176 max_handshake_age: config.handshake_stale_after.into_duration(),
177 handshake_timeout: config.handshake_timeout.into_duration(),
178 max_concurrent_handshakes: config.max_concurrent_handshakes,
179 block_duration: config.time_to_unblock_byzantine_peer.into_duration(),
180 dial_frequency: config.wait_before_peers_redial.into_duration(),
181 query_frequency: config.wait_before_peers_discovery.into_duration(),
182 ping_frequency: config.wait_before_peers_reping.into_duration(),
183 allowed_connection_rate_per_peer: commonware_runtime::Quota::with_period(
184 config.connection_per_peer_min_period.into_duration(),
185 )
186 .ok_or_eyre("connection min period must be non-zero")?,
187 allowed_handshake_rate_per_ip: commonware_runtime::Quota::with_period(
188 config.handshake_per_ip_min_period.into_duration(),
189 )
190 .ok_or_eyre("handshake per ip min period must be non-zero")?,
191 allowed_handshake_rate_per_subnet: commonware_runtime::Quota::with_period(
192 config.handshake_per_subnet_min_period.into_duration(),
193 )
194 .ok_or_eyre("handshake per subnet min period must be non-zero")?,
195 };
196
197 Ok(lookup::Network::new(context.with_label("network"), cfg))
198}