Skip to main content

tempo_consensus/storage/hybrid/
mod.rs

1//! [`Hybrid`] is a prunable archive of finalized blocks fronting reth.
2//!
3//! Reth is the source of truth. The prunable archive is a hot cache of
4//! the most-recently finalized blocks that lets the marshal serve
5//! gap-repair without round-tripping to reth on every read. The marshal
6//! only sees the [`Blocks`] interface and is unaware which side served
7//! a given read.
8//!
9//! # Eviction
10//!
11//! Eviction is reth-driven, not marshal-driven. Every [`Blocks::put`]
12//! reads reth's finalized watermark and prunes the cache below
13//! `reth.finalized − retention_blocks + 1`. [`Blocks::prune`] from the
14//! marshal is a no-op.
15//!
16//! As a consequence, the cache never drops a block reth doesn't yet have.
17//! If reth is lagging the marshal actor, the cache temporarily holds more than
18//! `retention_blocks` items.
19//!
20//! # Section-rounding
21//!
22//! The prunable archive prunes in whole sections of
23//! `items_per_section` items (4096 in production); `prune(min)` rounds
24//! `min` down to a section boundary.
25//!
26//! Consequences:
27//!
28//! - Retention is approximate: the cache holds between `retention_blocks`
29//!   and `retention_blocks + items_per_section − 1` items, evicted in
30//!   one-section batches.
31//! - The cache/reth boundary is the section-aligned `oldest_allowed`,
32//!   not `tip − retention`. Reads in `[oldest_allowed, tip − retention)`
33//!   still hit the cache. Same answer, slightly wasteful path.
34//! - A re-`put` below the requested floor may still land in the cache
35//!   (live tail section) or be silently absorbed (see "Stale puts"
36//!   below).
37//!
38//! Pick `retention_blocks` as a few multiples of `items_per_section` so
39//! section overshoot is a small fraction of the working set; see
40//! [`super::DEFAULT_FINALIZED_BLOCKS_RETENTION`].
41//!
42//! # Stale puts
43//!
44//! [`Hybrid::put`] absorbs the prunable archive's
45//! [`archive::Error::AlreadyPrunedTo`] as a silent success. The
46//! eviction invariant
47//! `oldest_allowed ≤ section_aligned(reth.finalized − retention + 1)
48//! ≤ reth.finalized` guarantees that a put at `H < oldest_allowed`
49//! also has `H ≤ reth.finalized`, so the block is durable in reth and
50//! a subsequent [`Blocks::get`] will hit the reth fallback. Surfacing
51//! the error would crash the node on a recoverable condition (e.g.
52//! follow-mode catching up while reth has synced past the cache
53//! window).
54//!
55//! # Why reth pruning is not a concern
56//!
57//! Reth may be configured to retain only a window of recent history,
58//! creating a `reth.pruned_below ≤ reth.finalized` watermark. The
59//! marshal can never issue a panic-on-miss request for a block below
60//! `reth.pruned_below`:
61//!
62//! - **`Blocks::put(H)`**: marshal's `last_processed_height` is floored
63//!   to `max(stored_height, reth.finalized)` at startup
64//!   ([`alias::marshal::init`]), so every put has
65//!   `H > reth.finalized ≥ reth.pruned_below`.
66//! - **`Blocks::get(Index(H))`**: only ever asks for the next
67//!   contiguous height or a `gap_end` already in the cache.
68//! - **`Blocks::get(Key(digest))`**: gap-repair parent walks may ask
69//!   for a digest below `reth.pruned_below`; on miss we return
70//!   `Ok(None)` and the marshal falls back to peer resolution.
71//!
72//! Configuring reth with a smaller retention window than
73//! `retention_blocks` is a perf concern (more peer fetches), not a
74//! correctness one.
75//!
76//! [`alias::marshal::init`]: crate::alias::marshal::init
77
78use alloy_primitives::B256;
79use commonware_consensus::{Heightable as _, marshal::store::Blocks, types::Height};
80use commonware_runtime::{BufferPooler, Clock, Metrics, Storage};
81use commonware_storage::{
82    archive::{self, Identifier, prunable},
83    translator::TwoCap,
84};
85use reth_node_core::primitives::SealedBlock;
86use reth_provider::{
87    BlockReader, BlockSource, ProviderError, ProviderResult,
88    providers::{BlockchainProvider, ProviderNodeTypes},
89};
90use tracing::{debug, info, instrument, warn};
91
92use crate::consensus::{Digest, block::Block};
93
94#[cfg(test)]
95mod test;
96
97/// Narrow view of reth that [`Hybrid`] needs: a finalized watermark and
98/// canonical-by-height / canonical-by-hash block reads.
99///
100/// Exists to make unit testing easier. [`BlockchainProvider`] is used in
101/// production.
102pub(crate) trait FinalizedBlocksProvider: Send + Sync {
103    /// Reth's last finalized block height, or `None` if reth has not yet
104    /// finalized anything (fresh chain).
105    fn finalized_height(&self) -> Option<u64>;
106
107    /// Look up a finalized block by height in reth.
108    ///
109    /// Implementations MUST return `None` for any `height` above
110    /// [`Self::finalized_height`] (and unconditionally when
111    /// [`Self::finalized_height`] is `None`); the marshal relies on
112    /// [`Hybrid`] only serving blocks reth has marked as finalized.
113    fn block_by_height(&self, height: u64) -> ProviderResult<Option<Block>>;
114
115    /// Look up a finalized block by hash in reth.
116    ///
117    /// Implementations MUST restrict the lookup to canonical blocks
118    /// only — pending/in-flight blocks must never be returned, otherwise
119    /// the marshal could be handed a non-finalized block.
120    fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<Block>>;
121}
122
123/// Production impl over reth's [`BlockchainProvider`] — the only type
124/// passed to [`Hybrid`] in production. Generic over `N` only so the
125/// impl works for any concrete `BlockchainProvider<N>` whose primitive
126/// block type is [`tempo_primitives::Block`] (e.g. the
127/// `BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, DatabaseEnv>>`
128/// alias used by `tempo_node::TempoFullNode`).
129impl<N> FinalizedBlocksProvider for BlockchainProvider<N>
130where
131    N: ProviderNodeTypes,
132    Self: BlockReader<Block = tempo_primitives::Block>,
133{
134    fn finalized_height(&self) -> Option<u64> {
135        // Direct read of `canonical_in_memory_state` — equivalent to
136        // `BlockchainProvider`'s `BlockIdReader::finalized_block_num_hash`,
137        // but typed as the genuinely infallible `Option<u64>` rather
138        // than the `Result<Option<_>>` reth's trait demands.
139        self.canonical_in_memory_state()
140            .get_finalized_num_hash()
141            .map(|nh| nh.number)
142    }
143
144    #[instrument(skip_all, fields(height), err)]
145    fn block_by_height(&self, height: u64) -> ProviderResult<Option<Block>> {
146        // Gate the lookup on reth's finalized watermark so the marshal can
147        // never be served a block that reth has not yet marked as
148        // finalized. Without this, the canonical-by-number read would
149        // happily return a block that is canonical-but-not-yet-finalized
150        // (or even a block reth has accepted past its finalized tip),
151        // violating [`Blocks`]'s "finalized only" contract.
152        let Some(finalized) = self.finalized_height() else {
153            // Reth has not finalized anything yet — nothing below the
154            // (nonexistent) finalized watermark is reachable.
155            return Ok(None);
156        };
157        if height > finalized {
158            return Ok(None);
159        }
160        match self.block_by_number(height) {
161            Ok(maybe_block) => Ok(maybe_block.map(|block| {
162                Block::from_execution_block_unchecked(SealedBlock::seal_slow(block), None)
163            })),
164            Err(err @ ProviderError::BlockExpired { .. }) => {
165                info!(error = %eyre::Report::new(err), "cannot find block");
166                Ok(None)
167            }
168            Err(bad_error) => Err(bad_error),
169        }
170    }
171
172    #[instrument(skip_all, fields(hash), err)]
173    fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<Block>> {
174        // `Canonical` (not `Any`) so the marshal can never be served a
175        // block that lives only in reth's pending in-memory tree — see
176        // [`Blocks::get`] on [`Hybrid`].
177        match self.find_block_by_hash(hash, BlockSource::Canonical) {
178            Ok(maybe_block) => Ok(maybe_block.map(|block| {
179                Block::from_execution_block_unchecked(SealedBlock::seal_slow(block), None)
180            })),
181            Err(err @ ProviderError::BlockExpired { .. }) => {
182                info!(error = %eyre::Report::new(err), "cannot find block");
183                Ok(None)
184            }
185            Err(bad_error) => Err(bad_error),
186        }
187    }
188}
189
190/// Error returned by [`Hybrid`]'s [`Blocks`] impl.
191#[derive(Debug, thiserror::Error)]
192pub(crate) enum Error {
193    #[error(transparent)]
194    Archive(#[from] archive::Error),
195
196    #[error(transparent)]
197    Provider(#[from] reth_provider::ProviderError),
198}
199
200/// Backing prunable archive type.
201// TODO: Look into whether to take TwoCap or another translator. TwoCap is used
202// by commonware itself, but higher caps can reduce the likelihood of collisions.
203pub(crate) type Prunable<TContext> = prunable::Archive<TwoCap, TContext, Digest, Block>;
204
205/// Configuration for [`Hybrid`].
206pub(crate) struct Config<TContext, TExecutionBlockProvider>
207where
208    TContext: BufferPooler + Storage + Metrics + Clock,
209{
210    /// Prunable archive backing the most recently finalized blocks.
211    pub(crate) prunable: Prunable<TContext>,
212
213    /// Execution layer block provider used to look up finalized blocks below
214    /// the cache window and to read reth's finalized watermark for cache
215    /// eviction.
216    pub(crate) execution_block_provider: TExecutionBlockProvider,
217
218    /// Number of most-recently-finalized blocks (relative to the EL's
219    /// finalized watermark) to keep in the prunable cache. Anything
220    /// older is dropped from the cache and served out of
221    /// [`Self::execution_block_provider`] instead.
222    pub(crate) retention_blocks: u64,
223}
224
225/// Finalized blocks store backed by a prunable archive (a hot cache of
226/// the most recently finalized blocks) and reth (the source of truth for
227/// finalized blocks).
228pub(crate) struct Hybrid<TContext, TExecutionBlockProvider>
229where
230    TContext: BufferPooler + Storage + Metrics + Clock,
231{
232    /// Hot cache of recently finalized blocks. Bounded to roughly
233    /// `retention_blocks` items above reth's finalized watermark via
234    /// [`Self::evict_below_execution_finalized_floor`].
235    prunable: Prunable<TContext>,
236
237    /// Execution layer block provider used to look up finalized blocks below
238    /// the cache window and to read reth's finalized watermark for cache
239    /// eviction.
240    execution_block_provider: TExecutionBlockProvider,
241
242    /// Number of most-recently-finalized blocks (relative to the EL's
243    /// finalized watermark) to keep in the prunable cache. Anything
244    /// older is dropped from the cache and served out of
245    /// [`Self::execution_block_provider`] instead.
246    retention_blocks: u64,
247}
248
249impl<TContext, P> Hybrid<TContext, P>
250where
251    TContext: BufferPooler + Storage + Metrics + Clock,
252    P: FinalizedBlocksProvider + 'static,
253{
254    pub(crate) fn new(config: Config<TContext, P>) -> Self {
255        let Config {
256            prunable,
257            execution_block_provider: provider,
258            retention_blocks,
259        } = config;
260        Self {
261            prunable,
262            execution_block_provider: provider,
263            retention_blocks,
264        }
265    }
266
267    /// Drops blocks below the execution layer's finalized watermark.
268    ///
269    /// The cache is sized relative to the EL's finalized boundary, not to
270    /// the height the marshal happens to put. This keeps two invariants:
271    ///
272    /// - We never evict a block the EL doesn't yet have. If the EL is lagging
273    ///   the marshal, no eviction happens; the cache temporarily grows past
274    ///   `retention_blocks`.
275    /// - Eviction tracks the EL's progress monotonically. Once the EL
276    ///   finalizes height `H`, the cache may drop everything below
277    ///   `H - retention_blocks + 1` on the next put.
278    async fn evict_below_execution_finalized_floor(&mut self) -> Result<(), archive::Error> {
279        // Reth hasn't finalized anything yet (fresh chain) — nothing is
280        // safe to evict.
281        let Some(execution_finalized) = self.execution_block_provider.finalized_height() else {
282            return Ok(());
283        };
284        let Some(min_to_keep) = execution_finalized.checked_sub(self.retention_blocks) else {
285            return Ok(());
286        };
287        // `prune(min)` keeps `min` and above, and the archive rounds `min`
288        // *down* to a section boundary, so the actual retained window can
289        // exceed `retention_blocks` by up to `items_per_section − 1` items.
290        // See the module docs ("Section-rounding") for the full story.
291        let prune_floor = min_to_keep.saturating_add(1);
292        prunable::Archive::prune(&mut self.prunable, prune_floor).await
293    }
294}
295
296impl<TContext, TExecutionBlockProvider> Blocks for Hybrid<TContext, TExecutionBlockProvider>
297where
298    TContext: BufferPooler + Storage + Metrics + Clock + Send + Sync + 'static,
299    TExecutionBlockProvider: FinalizedBlocksProvider + 'static,
300{
301    type Block = Block;
302    type Error = Error;
303
304    #[instrument(skip_all, err)]
305    async fn put(&mut self, block: Self::Block) -> Result<(), Self::Error> {
306        let height = block.height();
307        let digest = block.digest();
308        match archive::Archive::put(&mut self.prunable, height.get(), digest, block).await {
309            Ok(()) => {}
310            // The prunable cache has already evicted this height — but
311            // by the cache's eviction invariant
312            // (`oldest_allowed ≤ section_aligned(execution_finalized − retention + 1)
313            // ≤ execution_finalized`), `height < oldest_allowed` implies
314            // `height ≤ execution_finalized`. The EL's finality contract
315            // guarantees every block at or below `execution_finalized` is
316            // durably persisted, so the marshal's subsequent
317            // `Blocks::get(height)` will be served out of the execution
318            // layer fallback path. We can't write to EL ourselves (it owns
319            // its own storage), but we don't have to — the block is
320            // already durable. Treat the put as a successful no-op so
321            // we don't trip the marshal's "failed to finalize" panic
322            // on a perfectly recoverable condition.
323            Err(archive::Error::AlreadyPrunedTo(oldest_allowed)) => {
324                debug!(
325                    %height,
326                    oldest_allowed,
327                    execution_finalized = ?self.execution_block_provider.finalized_height(),
328                    "finalized block below prunable cache window; trusting the \
329                    execution layer's finalized storage and treating put as a \
330                    no-op"
331                );
332            }
333            Err(other) => return Err(other.into()),
334        }
335
336        if let Err(err) = self.evict_below_execution_finalized_floor().await {
337            // Eviction failures are not fatal; the next put will retry.
338            // We log because they may indicate disk-level issues.
339            warn!(
340                %err,
341                %height,
342                retention = self.retention_blocks,
343                "failed to evict prunable finalized blocks cache after put"
344            );
345        }
346        Ok(())
347    }
348
349    async fn sync(&mut self) -> Result<(), Self::Error> {
350        archive::Archive::sync(&mut self.prunable).await?;
351        Ok(())
352    }
353
354    /// Attempts to read `id` from the prunable archive, falling back to EL on miss.
355    async fn get(&self, id: Identifier<'_, Digest>) -> Result<Option<Self::Block>, Self::Error> {
356        // EL read errors propagate to the marshal — see this module's
357        // doc comment ("Marshal panic behavior") for what happens then.
358        match id {
359            Identifier::Index(height) => {
360                if let Some(block) =
361                    archive::Archive::get(&self.prunable, Identifier::Index(height)).await?
362                {
363                    return Ok(Some(block));
364                }
365                Ok(self.execution_block_provider.block_by_height(height)?)
366            }
367            Identifier::Key(digest) => {
368                if let Some(block) =
369                    archive::Archive::get(&self.prunable, Identifier::Key(digest)).await?
370                {
371                    return Ok(Some(block));
372                }
373                Ok(self.execution_block_provider.block_by_hash(digest.0)?)
374            }
375        }
376    }
377
378    /// No-op: Cache eviction is EL-driven (see [`Self::evict_below_execution_finalized_floor`]).
379    async fn prune(&mut self, _min: Height) -> Result<(), Self::Error> {
380        Ok(())
381    }
382
383    fn missing_items(&self, start: Height, max: usize) -> Vec<Height> {
384        // EL is treated as a contiguous source; gaps can only exist in the
385        // prunable archive. The marshal only ever asks about heights at or
386        // above `last_processed_height`, which the prunable archive must
387        // cover.
388        archive::Archive::missing_items(&self.prunable, start.get(), max)
389            .into_iter()
390            .map(Height::new)
391            .collect()
392    }
393
394    fn next_gap(&self, value: Height) -> (Option<Height>, Option<Height>) {
395        let (a, b) = archive::Archive::next_gap(&self.prunable, value.get());
396        (a.map(Height::new), b.map(Height::new))
397    }
398
399    fn last_index(&self) -> Option<Height> {
400        // Only report what lives in the prunable archive. The marshal uses
401        // this to drive repair against the certificates archive; reflecting
402        // EL here would mask gaps the marshal needs to fill via the
403        // resolver.
404        archive::Archive::last_index(&self.prunable).map(Height::new)
405    }
406}