Skip to main content

tempo_commonware_node/follow/
engine.rs

1//! Follow mode engine that syncs from upstream via RPC.
2//!
3//! This module provides a minimal consensus-layer stack for follow mode:
4//! - Marshal for storage and verification
5//! - Executor for driving Reth
6//! - FeedState for RPC serving
7//! - Resolver for marshal's gap-repair
8//! - Tip tracker for push-based finalization events
9//!
10//! The archive format is shared with the consensus engine running in validator mode
11//! so nodes can switch between validator and follower modes without data migration.
12
13use std::{sync::Arc, time::Duration};
14
15use commonware_broadcast::buffered;
16use commonware_consensus::{Reporters, marshal, types::FixedEpocher};
17use commonware_cryptography::ed25519::PublicKey;
18use commonware_parallel::Sequential;
19use commonware_runtime::{
20    BufferPooler, Clock, ContextCell, Handle, Metrics, Pacer, Spawner, Storage,
21    buffer::paged::CacheRef, spawn_cell,
22};
23use commonware_utils::{NZUsize, channel::mpsc};
24use eyre::{WrapErr as _, eyre};
25use futures::{StreamExt as _, stream::FuturesUnordered};
26use rand_08::{CryptoRng, Rng};
27use tempo_node::TempoFullNode;
28use tracing::{info, info_span};
29
30use super::{driver, resolver, resolver::Resolver, stubs};
31use crate::{
32    consensus::{Digest, block::Block},
33    epoch::SchemeProvider,
34    executor,
35    feed::{self, FeedStateHandle},
36    follow::upstream,
37    storage,
38};
39
40/// Builder for the follow engine.
41#[derive(Clone)]
42pub struct Config<TUpstream> {
43    /// The execution node to drive.
44    pub execution_node: Arc<TempoFullNode>,
45
46    /// Feed state handle for RPC serving.
47    pub feed_state: FeedStateHandle,
48
49    /// Partition prefix for storage.
50    pub partition_prefix: String,
51
52    /// Epoch strategy.
53    pub epoch_strategy: FixedEpocher,
54
55    /// Mailbox size for async channels.
56    pub mailbox_size: usize,
57
58    /// FCU heartbeat interval.
59    pub fcu_heartbeat_interval: Duration,
60
61    /// An actor that can be started with reporters listening to consensus events.
62    pub upstream: TUpstream,
63
64    /// Mailbox to an upstream actor running outside of the follower engine.
65    pub upstream_mailbox: upstream::Mailbox,
66}
67
68impl<TUpstream> Config<TUpstream> {
69    /// Initialize all components and return an [`Engine`] ready to start.
70    pub async fn try_init<TContext>(
71        self,
72        context: TContext,
73    ) -> eyre::Result<Engine<TContext, TUpstream>>
74    where
75        TContext: Clock
76            + Rng
77            + CryptoRng
78            + Metrics
79            + Pacer
80            + Spawner
81            + Storage
82            + BufferPooler
83            + Clone
84            + Send
85            + 'static,
86    {
87        let scheme_provider = SchemeProvider::new();
88
89        let page_cache_ref = CacheRef::from_pooler(
90            &context,
91            storage::BUFFER_POOL_PAGE_SIZE,
92            storage::BUFFER_POOL_CAPACITY,
93        );
94
95        let finalizations_by_height = storage::init_finalizations_archive(
96            &context,
97            &self.partition_prefix,
98            page_cache_ref.clone(),
99        )
100        .await
101        .wrap_err("failed to initialize finalizations by height archive")?;
102
103        let finalized_blocks = storage::init_finalized_blocks_archive(
104            &context,
105            &self.partition_prefix,
106            page_cache_ref.clone(),
107        )
108        .await
109        .wrap_err("failed to initialize finalized blocks archive")?;
110
111        let epoch_strategy = self.epoch_strategy.clone();
112
113        let (marshal_actor, marshal_mailbox, last_finalized_height): (
114            crate::alias::marshal::Actor<TContext>,
115            crate::alias::marshal::Mailbox,
116            _,
117        ) = marshal::core::Actor::init(
118            context.with_label("marshal"),
119            finalizations_by_height,
120            finalized_blocks,
121            marshal::Config {
122                provider: scheme_provider.clone(),
123                epocher: epoch_strategy.clone(),
124                partition_prefix: self.partition_prefix.clone(),
125                mailbox_size: self.mailbox_size,
126                view_retention_timeout: commonware_consensus::types::ViewDelta::new(1),
127                prunable_items_per_section: storage::PRUNABLE_ITEMS_PER_SECTION,
128                page_cache: page_cache_ref,
129                replay_buffer: storage::REPLAY_BUFFER,
130                key_write_buffer: storage::WRITE_BUFFER,
131                value_write_buffer: storage::WRITE_BUFFER,
132                max_repair: storage::MAX_REPAIR,
133                max_pending_acks: NZUsize!(1),
134                block_codec_config: (),
135                strategy: Sequential,
136            },
137        )
138        .await;
139
140        info_span!("follow_engine").in_scope(|| {
141            info!(
142                last_finalized_height = last_finalized_height.get(),
143                "initialized marshal"
144            )
145        });
146
147        let (resolver, resolver_mailbox, resolver_rx) = resolver::try_init(
148            context.with_label("resolver"),
149            resolver::Config {
150                execution_node: self.execution_node.clone(),
151                upstream: self.upstream_mailbox.clone(),
152                mailbox_size: self.mailbox_size,
153            },
154        );
155
156        let (feed_actor, feed_mailbox) = feed::init(
157            context.with_label("feed"),
158            marshal_mailbox.clone(),
159            epoch_strategy.clone(),
160            self.execution_node.clone(),
161            self.feed_state,
162        );
163
164        let (executor_actor, executor_mailbox) = executor::init(
165            context.with_label("executor"),
166            executor::Config {
167                execution_node: self.execution_node.clone(),
168                last_finalized_height,
169                marshal: marshal_mailbox.clone(),
170                fcu_heartbeat_interval: self.fcu_heartbeat_interval,
171                public_key: None,
172            },
173        )
174        .wrap_err("failed to initialize executor")?;
175
176        // No broadcast is needed in follow mode.
177        let broadcast = stubs::null_broadcast(context.with_label("broadcast"), self.mailbox_size);
178
179        let (driver, driver_mailbox) = driver::try_init(
180            context.with_label("driver"),
181            driver::Config {
182                execution_node: self.execution_node.clone(),
183                scheme_provider: scheme_provider.clone(),
184                last_finalized_height,
185                marshal: marshal_mailbox,
186                feed: feed_mailbox,
187                epoch_strategy: epoch_strategy.clone(),
188            },
189        )
190        .wrap_err("failed initializing driver actor")?;
191
192        Ok(Engine {
193            context: ContextCell::new(context),
194            driver,
195            driver_mailbox,
196            resolver,
197            resolver_mailbox,
198            resolver_rx,
199            marshal: marshal_actor,
200            executor: executor_actor,
201            executor_mailbox,
202            feed: feed_actor,
203            broadcast,
204            upstream: self.upstream,
205        })
206    }
207}
208
209pub struct Engine<TContext, TUpstreamActor>
210where
211    TContext: Clock + Rng + CryptoRng + Metrics + Pacer + Spawner + Storage + BufferPooler,
212    TUpstreamActor:,
213{
214    context: ContextCell<TContext>,
215    driver: driver::Driver<TContext>,
216    driver_mailbox: driver::Mailbox,
217    resolver: Resolver<TContext>,
218    resolver_mailbox: resolver::Mailbox,
219    resolver_rx: mpsc::Receiver<commonware_consensus::marshal::resolver::handler::Message<Digest>>,
220    marshal: crate::alias::marshal::Actor<TContext>,
221    executor: executor::Actor<TContext>,
222    executor_mailbox: executor::Mailbox,
223    feed: feed::Actor<TContext>,
224    broadcast: buffered::Mailbox<PublicKey, Block>,
225    upstream: TUpstreamActor,
226}
227
228impl<TContext, TUpstreamActor> Engine<TContext, TUpstreamActor>
229where
230    TContext: Clock
231        + Rng
232        + CryptoRng
233        + Metrics
234        + Pacer
235        + Spawner
236        + Storage
237        + BufferPooler
238        + Clone
239        + Send
240        + 'static,
241    TUpstreamActor: upstream::UpstreamActor,
242{
243    pub fn start(mut self) -> Handle<eyre::Result<()>> {
244        spawn_cell!(self.context, self.run())
245    }
246
247    async fn run(self) -> eyre::Result<()> {
248        let Self {
249            upstream,
250            driver,
251            driver_mailbox,
252            resolver,
253            resolver_mailbox,
254            resolver_rx,
255            marshal,
256            executor,
257            executor_mailbox,
258            feed,
259            broadcast,
260            ..
261        } = self;
262
263        let actors = vec![
264            driver.start(),
265            executor.start(),
266            feed.start(),
267            marshal.start(
268                Reporters::from((
269                    executor_mailbox.clone(),
270                    driver_mailbox.to_marshal_reporter(),
271                )),
272                broadcast,
273                (resolver_rx, resolver_mailbox),
274            ),
275            resolver.start(),
276            upstream.start(driver_mailbox.to_event_reporter()),
277        ];
278
279        // TODO: report which actor failed and why.
280        if FuturesUnordered::from_iter(actors).next().await.is_some() {
281            return Err(eyre!("one critical subsystem exited unexpectedly"));
282        }
283
284        Ok(())
285    }
286}