tempo_commonware_node/follow/
engine.rs1use std::{sync::Arc, time::Duration};
14
15use commonware_broadcast::buffered;
16use commonware_consensus::{Reporters, marshal, types::FixedEpocher};
17use commonware_cryptography::ed25519::PublicKey;
18use commonware_parallel::Sequential;
19use commonware_runtime::{
20 BufferPooler, Clock, ContextCell, Handle, Metrics, Pacer, Spawner, Storage,
21 buffer::paged::CacheRef, spawn_cell,
22};
23use commonware_utils::{NZUsize, channel::mpsc};
24use eyre::{WrapErr as _, eyre};
25use futures::{StreamExt as _, stream::FuturesUnordered};
26use rand_08::{CryptoRng, Rng};
27use tempo_node::TempoFullNode;
28use tracing::{info, info_span};
29
30use super::{driver, resolver, resolver::Resolver, stubs};
31use crate::{
32 consensus::{Digest, block::Block},
33 epoch::SchemeProvider,
34 executor,
35 feed::{self, FeedStateHandle},
36 follow::upstream,
37 storage,
38};
39
40#[derive(Clone)]
42pub struct Config<TUpstream> {
43 pub execution_node: Arc<TempoFullNode>,
45
46 pub feed_state: FeedStateHandle,
48
49 pub partition_prefix: String,
51
52 pub epoch_strategy: FixedEpocher,
54
55 pub mailbox_size: usize,
57
58 pub fcu_heartbeat_interval: Duration,
60
61 pub upstream: TUpstream,
63
64 pub upstream_mailbox: upstream::Mailbox,
66}
67
68impl<TUpstream> Config<TUpstream> {
69 pub async fn try_init<TContext>(
71 self,
72 context: TContext,
73 ) -> eyre::Result<Engine<TContext, TUpstream>>
74 where
75 TContext: Clock
76 + Rng
77 + CryptoRng
78 + Metrics
79 + Pacer
80 + Spawner
81 + Storage
82 + BufferPooler
83 + Clone
84 + Send
85 + 'static,
86 {
87 let scheme_provider = SchemeProvider::new();
88
89 let page_cache_ref = CacheRef::from_pooler(
90 &context,
91 storage::BUFFER_POOL_PAGE_SIZE,
92 storage::BUFFER_POOL_CAPACITY,
93 );
94
95 let finalizations_by_height = storage::init_finalizations_archive(
96 &context,
97 &self.partition_prefix,
98 page_cache_ref.clone(),
99 )
100 .await
101 .wrap_err("failed to initialize finalizations by height archive")?;
102
103 let finalized_blocks = storage::init_finalized_blocks_archive(
104 &context,
105 &self.partition_prefix,
106 page_cache_ref.clone(),
107 )
108 .await
109 .wrap_err("failed to initialize finalized blocks archive")?;
110
111 let epoch_strategy = self.epoch_strategy.clone();
112
113 let (marshal_actor, marshal_mailbox, last_finalized_height): (
114 crate::alias::marshal::Actor<TContext>,
115 crate::alias::marshal::Mailbox,
116 _,
117 ) = marshal::core::Actor::init(
118 context.with_label("marshal"),
119 finalizations_by_height,
120 finalized_blocks,
121 marshal::Config {
122 provider: scheme_provider.clone(),
123 epocher: epoch_strategy.clone(),
124 partition_prefix: self.partition_prefix.clone(),
125 mailbox_size: self.mailbox_size,
126 view_retention_timeout: commonware_consensus::types::ViewDelta::new(1),
127 prunable_items_per_section: storage::PRUNABLE_ITEMS_PER_SECTION,
128 page_cache: page_cache_ref,
129 replay_buffer: storage::REPLAY_BUFFER,
130 key_write_buffer: storage::WRITE_BUFFER,
131 value_write_buffer: storage::WRITE_BUFFER,
132 max_repair: storage::MAX_REPAIR,
133 max_pending_acks: NZUsize!(1),
134 block_codec_config: (),
135 strategy: Sequential,
136 },
137 )
138 .await;
139
140 info_span!("follow_engine").in_scope(|| {
141 info!(
142 last_finalized_height = last_finalized_height.get(),
143 "initialized marshal"
144 )
145 });
146
147 let (resolver, resolver_mailbox, resolver_rx) = resolver::try_init(
148 context.with_label("resolver"),
149 resolver::Config {
150 execution_node: self.execution_node.clone(),
151 upstream: self.upstream_mailbox.clone(),
152 mailbox_size: self.mailbox_size,
153 },
154 );
155
156 let (feed_actor, feed_mailbox) = feed::init(
157 context.with_label("feed"),
158 marshal_mailbox.clone(),
159 epoch_strategy.clone(),
160 self.execution_node.clone(),
161 self.feed_state,
162 );
163
164 let (executor_actor, executor_mailbox) = executor::init(
165 context.with_label("executor"),
166 executor::Config {
167 execution_node: self.execution_node.clone(),
168 last_finalized_height,
169 marshal: marshal_mailbox.clone(),
170 fcu_heartbeat_interval: self.fcu_heartbeat_interval,
171 public_key: None,
172 },
173 )
174 .wrap_err("failed to initialize executor")?;
175
176 let broadcast = stubs::null_broadcast(context.with_label("broadcast"), self.mailbox_size);
178
179 let (driver, driver_mailbox) = driver::try_init(
180 context.with_label("driver"),
181 driver::Config {
182 execution_node: self.execution_node.clone(),
183 scheme_provider: scheme_provider.clone(),
184 last_finalized_height,
185 marshal: marshal_mailbox,
186 feed: feed_mailbox,
187 epoch_strategy: epoch_strategy.clone(),
188 },
189 )
190 .wrap_err("failed initializing driver actor")?;
191
192 Ok(Engine {
193 context: ContextCell::new(context),
194 driver,
195 driver_mailbox,
196 resolver,
197 resolver_mailbox,
198 resolver_rx,
199 marshal: marshal_actor,
200 executor: executor_actor,
201 executor_mailbox,
202 feed: feed_actor,
203 broadcast,
204 upstream: self.upstream,
205 })
206 }
207}
208
209pub struct Engine<TContext, TUpstreamActor>
210where
211 TContext: Clock + Rng + CryptoRng + Metrics + Pacer + Spawner + Storage + BufferPooler,
212 TUpstreamActor:,
213{
214 context: ContextCell<TContext>,
215 driver: driver::Driver<TContext>,
216 driver_mailbox: driver::Mailbox,
217 resolver: Resolver<TContext>,
218 resolver_mailbox: resolver::Mailbox,
219 resolver_rx: mpsc::Receiver<commonware_consensus::marshal::resolver::handler::Message<Digest>>,
220 marshal: crate::alias::marshal::Actor<TContext>,
221 executor: executor::Actor<TContext>,
222 executor_mailbox: executor::Mailbox,
223 feed: feed::Actor<TContext>,
224 broadcast: buffered::Mailbox<PublicKey, Block>,
225 upstream: TUpstreamActor,
226}
227
228impl<TContext, TUpstreamActor> Engine<TContext, TUpstreamActor>
229where
230 TContext: Clock
231 + Rng
232 + CryptoRng
233 + Metrics
234 + Pacer
235 + Spawner
236 + Storage
237 + BufferPooler
238 + Clone
239 + Send
240 + 'static,
241 TUpstreamActor: upstream::UpstreamActor,
242{
243 pub fn start(mut self) -> Handle<eyre::Result<()>> {
244 spawn_cell!(self.context, self.run())
245 }
246
247 async fn run(self) -> eyre::Result<()> {
248 let Self {
249 upstream,
250 driver,
251 driver_mailbox,
252 resolver,
253 resolver_mailbox,
254 resolver_rx,
255 marshal,
256 executor,
257 executor_mailbox,
258 feed,
259 broadcast,
260 ..
261 } = self;
262
263 let actors = vec![
264 driver.start(),
265 executor.start(),
266 feed.start(),
267 marshal.start(
268 Reporters::from((
269 executor_mailbox.clone(),
270 driver_mailbox.to_marshal_reporter(),
271 )),
272 broadcast,
273 (resolver_rx, resolver_mailbox),
274 ),
275 resolver.start(),
276 upstream.start(driver_mailbox.to_event_reporter()),
277 ];
278
279 if FuturesUnordered::from_iter(actors).next().await.is_some() {
281 return Err(eyre!("one critical subsystem exited unexpectedly"));
282 }
283
284 Ok(())
285 }
286}