1use std::{sync::Arc, time::Duration};
14
15use commonware_broadcast::buffered;
16use commonware_consensus::{Reporters, types::FixedEpocher};
17use commonware_cryptography::ed25519::PublicKey;
18use commonware_runtime::{
19 BufferPooler, Clock, ContextCell, Handle, Metrics, Pacer, Spawner, Storage,
20 buffer::paged::CacheRef, spawn_cell,
21};
22use commonware_utils::{NZUsize, channel::mpsc};
23use eyre::{WrapErr as _, eyre};
24use futures::{StreamExt as _, stream::FuturesUnordered};
25use rand_08::{CryptoRng, Rng};
26use tempo_chainspec::NetworkIdentity;
27use tempo_node::TempoFullNode;
28use tracing::{info, info_span};
29
30use super::{driver, resolver, resolver::Resolver, stubs};
31use crate::{
32 alias,
33 consensus::{Digest, block::Block},
34 epoch::SchemeProvider,
35 executor,
36 feed::{self, FeedStateHandle},
37 follow::upstream,
38 storage,
39};
40
41#[derive(Clone)]
43pub struct Config<TUpstream> {
44 pub execution_node: Arc<TempoFullNode>,
46
47 pub feed_state: FeedStateHandle,
49
50 pub partition_prefix: String,
52
53 pub epoch_strategy: FixedEpocher,
55
56 pub network_identity: NetworkIdentity,
58
59 pub mailbox_size: usize,
61
62 pub fcu_heartbeat_interval: Duration,
64
65 pub upstream: TUpstream,
67
68 pub upstream_mailbox: upstream::Mailbox,
70
71 pub finalized_blocks_retention: u64,
74}
75
76impl<TUpstream> Config<TUpstream> {
77 pub async fn try_init<TContext>(
79 self,
80 context: TContext,
81 ) -> eyre::Result<Engine<TContext, TUpstream>>
82 where
83 TContext: Clock
84 + Rng
85 + CryptoRng
86 + Metrics
87 + Pacer
88 + Spawner
89 + Storage
90 + BufferPooler
91 + Clone
92 + Send
93 + 'static,
94 {
95 let scheme_provider = SchemeProvider::new();
96
97 let page_cache_ref = CacheRef::from_pooler(
98 &context,
99 storage::BUFFER_POOL_PAGE_SIZE,
100 storage::BUFFER_POOL_CAPACITY,
101 );
102
103 let epoch_strategy = self.epoch_strategy.clone();
104
105 let alias::marshal::Initialized {
106 actor: marshal_actor,
107 mailbox: marshal_mailbox,
108 last_finalized_height,
109 } = alias::marshal::init(
110 context.clone(),
111 page_cache_ref,
112 self.execution_node.clone(),
113 alias::marshal::Config {
114 partition_prefix: self.partition_prefix.clone(),
115 mailbox_size: self.mailbox_size,
116 view_retention_timeout: commonware_consensus::types::ViewDelta::new(1),
117 max_pending_acks: NZUsize!(1),
118 finalized_blocks_retention: self.finalized_blocks_retention,
119 epoch_strategy: epoch_strategy.clone(),
120 scheme_provider: scheme_provider.clone(),
121 },
122 )
123 .await
124 .wrap_err("failed to initialize marshal")?;
125
126 info_span!("follow_engine").in_scope(|| {
127 info!(
128 last_finalized_height = last_finalized_height.get(),
129 "initialized marshal"
130 )
131 });
132
133 let (resolver, resolver_mailbox, resolver_rx) = resolver::try_init(
134 context.with_label("resolver"),
135 resolver::Config {
136 execution_node: self.execution_node.clone(),
137 upstream: self.upstream_mailbox.clone(),
138 mailbox_size: self.mailbox_size,
139 },
140 );
141
142 let (feed_actor, feed_mailbox) = feed::init(
143 context.with_label("feed"),
144 marshal_mailbox.clone(),
145 epoch_strategy.clone(),
146 self.execution_node.clone(),
147 self.feed_state,
148 );
149
150 let (executor_actor, executor_mailbox) = executor::init(
151 context.with_label("executor"),
152 executor::Config {
153 execution_node: self.execution_node.clone(),
154 last_finalized_height,
155 marshal: marshal_mailbox.clone(),
156 fcu_heartbeat_interval: self.fcu_heartbeat_interval,
157 public_key: None,
158 },
159 )
160 .wrap_err("failed to initialize executor")?;
161
162 let broadcast = stubs::null_broadcast(context.with_label("broadcast"), self.mailbox_size);
164
165 let (driver, driver_mailbox) = driver::try_init(
166 context.with_label("driver"),
167 driver::Config {
168 execution_node: self.execution_node.clone(),
169 scheme_provider: scheme_provider.clone(),
170 network_identity: self.network_identity,
171 last_finalized_height,
172 marshal: marshal_mailbox,
173 feed: feed_mailbox,
174 epoch_strategy: epoch_strategy.clone(),
175 },
176 )
177 .wrap_err("failed initializing driver actor")?;
178
179 Ok(Engine {
180 context: ContextCell::new(context),
181 driver,
182 driver_mailbox,
183 resolver,
184 resolver_mailbox,
185 resolver_rx,
186 marshal: marshal_actor,
187 executor: executor_actor,
188 executor_mailbox,
189 feed: feed_actor,
190 broadcast,
191 upstream: self.upstream,
192 })
193 }
194}
195
196pub struct Engine<TContext, TUpstreamActor>
197where
198 TContext: Clock + Rng + CryptoRng + Metrics + Pacer + Spawner + Storage + BufferPooler,
199 TUpstreamActor:,
200{
201 context: ContextCell<TContext>,
202 driver: driver::Driver<TContext>,
203 driver_mailbox: driver::Mailbox,
204 resolver: Resolver<TContext>,
205 resolver_mailbox: resolver::Mailbox,
206 resolver_rx: mpsc::Receiver<commonware_consensus::marshal::resolver::handler::Message<Digest>>,
207 marshal: crate::alias::marshal::Actor<TContext>,
208 executor: executor::Actor<TContext>,
209 executor_mailbox: executor::Mailbox,
210 feed: feed::Actor<TContext>,
211 broadcast: buffered::Mailbox<PublicKey, Block>,
212 upstream: TUpstreamActor,
213}
214
215impl<TContext, TUpstreamActor> Engine<TContext, TUpstreamActor>
216where
217 TContext: Clock
218 + Rng
219 + CryptoRng
220 + Metrics
221 + Pacer
222 + Spawner
223 + Storage
224 + BufferPooler
225 + Clone
226 + Send
227 + 'static,
228 TUpstreamActor: upstream::UpstreamActor,
229{
230 pub fn start(mut self) -> Handle<eyre::Result<()>> {
231 spawn_cell!(self.context, self.run())
232 }
233
234 async fn run(self) -> eyre::Result<()> {
235 let Self {
236 upstream,
237 driver,
238 driver_mailbox,
239 resolver,
240 resolver_mailbox,
241 resolver_rx,
242 marshal,
243 executor,
244 executor_mailbox,
245 feed,
246 broadcast,
247 ..
248 } = self;
249
250 let actors = vec![
251 driver.start(),
252 executor.start(),
253 feed.start(),
254 marshal.start(
255 Reporters::from((
256 executor_mailbox.clone(),
257 driver_mailbox.to_marshal_reporter(),
258 )),
259 broadcast,
260 (resolver_rx, resolver_mailbox),
261 ),
262 resolver.start(),
263 upstream.start(driver_mailbox.to_event_reporter()),
264 ];
265
266 if FuturesUnordered::from_iter(actors).next().await.is_some() {
268 return Err(eyre!("one critical subsystem exited unexpectedly"));
269 }
270
271 Ok(())
272 }
273}