Skip to main content

tempo_consensus/follow/
engine.rs

1//! Follow mode engine that syncs from upstream via RPC.
2//!
3//! This module provides a minimal consensus-layer stack for follow mode:
4//! - Marshal for storage and verification
5//! - Executor for driving Reth
6//! - FeedState for RPC serving
7//! - Resolver for marshal's gap-repair
8//! - Tip tracker for push-based finalization events
9//!
10//! The archive format is shared with the consensus engine running in validator mode
11//! so nodes can switch between validator and follower modes without data migration.
12
13use std::{sync::Arc, time::Duration};
14
15use commonware_broadcast::buffered;
16use commonware_consensus::{Reporters, types::FixedEpocher};
17use commonware_cryptography::ed25519::PublicKey;
18use commonware_runtime::{
19    BufferPooler, Clock, ContextCell, Handle, Metrics, Pacer, Spawner, Storage,
20    buffer::paged::CacheRef, spawn_cell,
21};
22use commonware_utils::{NZUsize, channel::mpsc};
23use eyre::{WrapErr as _, eyre};
24use futures::{StreamExt as _, stream::FuturesUnordered};
25use rand_08::{CryptoRng, Rng};
26use tempo_chainspec::NetworkIdentity;
27use tempo_node::TempoFullNode;
28use tracing::{info, info_span};
29
30use super::{driver, resolver, resolver::Resolver, stubs};
31use crate::{
32    alias,
33    consensus::{Digest, block::Block},
34    epoch::SchemeProvider,
35    executor,
36    feed::{self, FeedStateHandle},
37    follow::upstream,
38    storage,
39};
40
41/// Builder for the follow engine.
42#[derive(Clone)]
43pub struct Config<TUpstream> {
44    /// The execution node to drive.
45    pub execution_node: Arc<TempoFullNode>,
46
47    /// Feed state handle for RPC serving.
48    pub feed_state: FeedStateHandle,
49
50    /// Partition prefix for storage.
51    pub partition_prefix: String,
52
53    /// Epoch strategy.
54    pub epoch_strategy: FixedEpocher,
55
56    /// Latest network Identity of the chain.
57    pub network_identity: NetworkIdentity,
58
59    /// Mailbox size for async channels.
60    pub mailbox_size: usize,
61
62    /// FCU heartbeat interval.
63    pub fcu_heartbeat_interval: Duration,
64
65    /// An actor that can be started with reporters listening to consensus events.
66    pub upstream: TUpstream,
67
68    /// Mailbox to an upstream actor running outside of the follower engine.
69    pub upstream_mailbox: upstream::Mailbox,
70
71    /// Number of recently finalized blocks retained in the prunable archive
72    /// passed to the marshal actor. Older blocks are served from reth.
73    pub finalized_blocks_retention: u64,
74}
75
76impl<TUpstream> Config<TUpstream> {
77    /// Initialize all components and return an [`Engine`] ready to start.
78    pub async fn try_init<TContext>(
79        self,
80        context: TContext,
81    ) -> eyre::Result<Engine<TContext, TUpstream>>
82    where
83        TContext: Clock
84            + Rng
85            + CryptoRng
86            + Metrics
87            + Pacer
88            + Spawner
89            + Storage
90            + BufferPooler
91            + Clone
92            + Send
93            + 'static,
94    {
95        let scheme_provider = SchemeProvider::new();
96
97        let page_cache_ref = CacheRef::from_pooler(
98            &context,
99            storage::BUFFER_POOL_PAGE_SIZE,
100            storage::BUFFER_POOL_CAPACITY,
101        );
102
103        let epoch_strategy = self.epoch_strategy.clone();
104
105        let alias::marshal::Initialized {
106            actor: marshal_actor,
107            mailbox: marshal_mailbox,
108            last_finalized_height,
109        } = alias::marshal::init(
110            context.clone(),
111            page_cache_ref,
112            self.execution_node.clone(),
113            alias::marshal::Config {
114                partition_prefix: self.partition_prefix.clone(),
115                mailbox_size: self.mailbox_size,
116                view_retention_timeout: commonware_consensus::types::ViewDelta::new(1),
117                max_pending_acks: NZUsize!(1),
118                finalized_blocks_retention: self.finalized_blocks_retention,
119                epoch_strategy: epoch_strategy.clone(),
120                scheme_provider: scheme_provider.clone(),
121            },
122        )
123        .await
124        .wrap_err("failed to initialize marshal")?;
125
126        info_span!("follow_engine").in_scope(|| {
127            info!(
128                last_finalized_height = last_finalized_height.get(),
129                "initialized marshal"
130            )
131        });
132
133        let (resolver, resolver_mailbox, resolver_rx) = resolver::try_init(
134            context.with_label("resolver"),
135            resolver::Config {
136                execution_node: self.execution_node.clone(),
137                upstream: self.upstream_mailbox.clone(),
138                mailbox_size: self.mailbox_size,
139            },
140        );
141
142        let (feed_actor, feed_mailbox) = feed::init(
143            context.with_label("feed"),
144            marshal_mailbox.clone(),
145            epoch_strategy.clone(),
146            self.execution_node.clone(),
147            self.feed_state,
148        );
149
150        let (executor_actor, executor_mailbox) = executor::init(
151            context.with_label("executor"),
152            executor::Config {
153                execution_node: self.execution_node.clone(),
154                last_finalized_height,
155                marshal: marshal_mailbox.clone(),
156                fcu_heartbeat_interval: self.fcu_heartbeat_interval,
157                public_key: None,
158            },
159        )
160        .wrap_err("failed to initialize executor")?;
161
162        // No broadcast is needed in follow mode.
163        let broadcast = stubs::null_broadcast(context.with_label("broadcast"), self.mailbox_size);
164
165        let (driver, driver_mailbox) = driver::try_init(
166            context.with_label("driver"),
167            driver::Config {
168                execution_node: self.execution_node.clone(),
169                scheme_provider: scheme_provider.clone(),
170                network_identity: self.network_identity,
171                last_finalized_height,
172                marshal: marshal_mailbox,
173                feed: feed_mailbox,
174                epoch_strategy: epoch_strategy.clone(),
175            },
176        )
177        .wrap_err("failed initializing driver actor")?;
178
179        Ok(Engine {
180            context: ContextCell::new(context),
181            driver,
182            driver_mailbox,
183            resolver,
184            resolver_mailbox,
185            resolver_rx,
186            marshal: marshal_actor,
187            executor: executor_actor,
188            executor_mailbox,
189            feed: feed_actor,
190            broadcast,
191            upstream: self.upstream,
192        })
193    }
194}
195
196pub struct Engine<TContext, TUpstreamActor>
197where
198    TContext: Clock + Rng + CryptoRng + Metrics + Pacer + Spawner + Storage + BufferPooler,
199    TUpstreamActor:,
200{
201    context: ContextCell<TContext>,
202    driver: driver::Driver<TContext>,
203    driver_mailbox: driver::Mailbox,
204    resolver: Resolver<TContext>,
205    resolver_mailbox: resolver::Mailbox,
206    resolver_rx: mpsc::Receiver<commonware_consensus::marshal::resolver::handler::Message<Digest>>,
207    marshal: crate::alias::marshal::Actor<TContext>,
208    executor: executor::Actor<TContext>,
209    executor_mailbox: executor::Mailbox,
210    feed: feed::Actor<TContext>,
211    broadcast: buffered::Mailbox<PublicKey, Block>,
212    upstream: TUpstreamActor,
213}
214
215impl<TContext, TUpstreamActor> Engine<TContext, TUpstreamActor>
216where
217    TContext: Clock
218        + Rng
219        + CryptoRng
220        + Metrics
221        + Pacer
222        + Spawner
223        + Storage
224        + BufferPooler
225        + Clone
226        + Send
227        + 'static,
228    TUpstreamActor: upstream::UpstreamActor,
229{
230    pub fn start(mut self) -> Handle<eyre::Result<()>> {
231        spawn_cell!(self.context, self.run())
232    }
233
234    async fn run(self) -> eyre::Result<()> {
235        let Self {
236            upstream,
237            driver,
238            driver_mailbox,
239            resolver,
240            resolver_mailbox,
241            resolver_rx,
242            marshal,
243            executor,
244            executor_mailbox,
245            feed,
246            broadcast,
247            ..
248        } = self;
249
250        let actors = vec![
251            driver.start(),
252            executor.start(),
253            feed.start(),
254            marshal.start(
255                Reporters::from((
256                    executor_mailbox.clone(),
257                    driver_mailbox.to_marshal_reporter(),
258                )),
259                broadcast,
260                (resolver_rx, resolver_mailbox),
261            ),
262            resolver.start(),
263            upstream.start(driver_mailbox.to_event_reporter()),
264        ];
265
266        // TODO: report which actor failed and why.
267        if FuturesUnordered::from_iter(actors).next().await.is_some() {
268            return Err(eyre!("one critical subsystem exited unexpectedly"));
269        }
270
271        Ok(())
272    }
273}