tempo_consensus/storage/hybrid/mod.rs
//! [`Hybrid`] is a prunable archive of finalized blocks fronting reth.
//!
//! Reth is the source of truth. The prunable archive is a hot cache of
//! the most recently finalized blocks that lets the marshal serve
//! gap-repair without round-tripping to reth on every read. The marshal
//! only sees the [`Blocks`] interface and is unaware which side served
//! a given read.
//!
//! # Eviction
//!
//! Eviction is reth-driven, not marshal-driven. Every [`Blocks::put`]
//! reads reth's finalized watermark and prunes the cache below
//! `reth.finalized − retention_blocks + 1`. [`Blocks::prune`] from the
//! marshal is a no-op.
//!
//! As a consequence, the cache never drops a block reth doesn't yet have.
//! If reth lags behind the marshal actor, the cache temporarily holds more
//! than `retention_blocks` items.
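//!
//! A minimal sketch of that floor arithmetic follows. It assumes plain
//! `u64` heights. `eviction_floor` is a hypothetical helper that mirrors
//! the logic of `Hybrid::evict_below_execution_finalized_floor`. It is
//! not part of this module's API.
//!
//! ```rust
//! // Hypothetical helper: the lowest height the cache asks the archive to
//! // keep, or `None` when nothing may be evicted yet.
//! fn eviction_floor(reth_finalized: Option<u64>, retention_blocks: u64) -> Option<u64> {
//!     // Reth has not finalized anything: nothing is safe to evict.
//!     let finalized = reth_finalized?;
//!     // Fewer than `retention_blocks` finalized blocks: keep everything.
//!     let min_to_keep = finalized.checked_sub(retention_blocks)?;
//!     Some(min_to_keep + 1)
//! }
//! ```
//!
//! With `retention_blocks = 10` and reth finalized at 100, the floor is
//! 91, which keeps heights 91..=100. Suppose the marshal has already put
//! height 150 while reth is still at 100. The floor stays at 91, so the
//! cache temporarily exceeds `retention_blocks`.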
//!
//! # Section-rounding
//!
//! The prunable archive prunes in whole sections of
//! `items_per_section` items (4096 in production); `prune(min)` rounds
//! `min` down to a section boundary.
//!
//! Consequences:
//!
//! - Retention is approximate: the cache holds between `retention_blocks`
//!   and `retention_blocks + items_per_section − 1` items, evicted in
//!   one-section batches.
//! - The cache/reth boundary is the section-aligned `oldest_allowed`,
//!   not `reth.finalized − retention_blocks + 1`. Reads in
//!   `[oldest_allowed, reth.finalized − retention_blocks + 1)` still hit
//!   the cache. They return the same block reth would; the cache simply
//!   holds slightly more than it strictly needs to.
//! - A re-`put` below the requested floor may still land in the cache
//!   (live tail section) or be silently absorbed (see "Stale puts"
//!   below).
//!
//! Pick `retention_blocks` as a few multiples of `items_per_section` so
//! section overshoot is a small fraction of the working set; see
//! [`super::DEFAULT_FINALIZED_BLOCKS_RETENTION`].
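//!
//! The rounding can be modeled as below. The model assumes the archive's
//! section start for an index is `(index / items_per_section) *
//! items_per_section`. `oldest_allowed` and `retained_items` are
//! hypothetical helpers, not commonware APIs.
//!
//! ```rust
//! // Assumed model: start of the section that contains `prune_floor`.
//! fn oldest_allowed(prune_floor: u64, items_per_section: u64) -> u64 {
//!     (prune_floor / items_per_section) * items_per_section
//! }
//!
//! // Items kept in `[oldest_allowed, reth_finalized]` after pruning at
//! // `reth_finalized - retention_blocks + 1`.
//! // Assumes `1 <= retention_blocks <= reth_finalized`.
//! fn retained_items(reth_finalized: u64, retention_blocks: u64, items_per_section: u64) -> u64 {
//!     let prune_floor = reth_finalized - retention_blocks + 1;
//!     reth_finalized - oldest_allowed(prune_floor, items_per_section) + 1
//! }
//! ```
//!
//! Take `items_per_section = 4096` and `retention_blocks = 12288` (three
//! sections). If reth is finalized at 20479, the floor is 8192, which is
//! already a section start, so exactly 12288 items are kept. If reth is
//! finalized at 20478, the floor is 8191, which rounds down to 4096, so
//! 16383 items are kept. That is the `retention_blocks +
//! items_per_section − 1` upper bound.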
//!
//! # Stale puts
//!
//! [`Hybrid::put`] absorbs the prunable archive's
//! [`archive::Error::AlreadyPrunedTo`] as a silent success. The
//! eviction invariant
//! `oldest_allowed ≤ section_aligned(reth.finalized − retention + 1)
//! ≤ reth.finalized` guarantees that a put at `H < oldest_allowed`
//! also has `H ≤ reth.finalized`, so the block is durable in reth and
//! a subsequent [`Blocks::get`] will hit the reth fallback. Surfacing
//! the error would crash the node on a recoverable condition (e.g.
//! follow-mode catching up while reth has synced past the cache
//! window).
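//!
//! The implication can be spot-checked exhaustively over small
//! parameters. The sketch assumes the archive rounds the floor down to
//! `(floor / items_per_section) * items_per_section`. It uses reth's
//! *current* finalized height. That height only grows, so the archive's
//! real `oldest_allowed`, set by an earlier prune, is never higher than
//! the value computed here. `stale_puts_are_durable` is a hypothetical
//! helper.
//!
//! ```rust
//! // Returns true if, for every reth finalized height in range, each height
//! // the archive would reject as already pruned (`height < oldest_allowed`)
//! // is at or below reth's finalized height, i.e. durable in reth.
//! fn stale_puts_are_durable(max_finalized: u64, retention_blocks: u64, items_per_section: u64) -> bool {
//!     (retention_blocks..=max_finalized).all(|finalized| {
//!         let prune_floor = finalized - retention_blocks + 1;
//!         // Assumed model of the archive's round-down to a section start.
//!         let oldest_allowed = (prune_floor / items_per_section) * items_per_section;
//!         (0..oldest_allowed).all(|height| height <= finalized)
//!     })
//! }
//! ```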
//!
//! # Why reth pruning is not a concern
//!
//! Reth may be configured to retain only a window of recent history,
//! creating a `reth.pruned_below ≤ reth.finalized` watermark. The
//! marshal never issues a panic-on-miss read for a block below
//! `reth.pruned_below`:
//!
//! - **`Blocks::put(H)`**: marshal's `last_processed_height` is floored
//!   to `max(stored_height, reth.finalized)` at startup
//!   ([`alias::marshal::init`]), so every put has
//!   `H > reth.finalized ≥ reth.pruned_below`.
//! - **`Blocks::get(Index(H))`**: only ever asks for the next
//!   contiguous height or a `gap_end` already in the cache.
//! - **`Blocks::get(Key(digest))`**: gap-repair parent walks may ask
//!   for a digest below `reth.pruned_below`; on a miss we return
//!   `Ok(None)` and the marshal falls back to peer resolution.
//!
//! Configuring reth with a smaller retention window than
//! `retention_blocks` is a performance concern (more peer fetches), not a
//! correctness one.
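//!
//! Below is a minimal sketch of the by-digest read path. In-memory
//! `HashMap`s stand in for the prunable archive and reth, and `[u8; 32]`
//! and `String` stand in for the digest and block types. `SketchStore` is
//! hypothetical. The real [`Hybrid`] is read through [`Blocks::get`] and
//! can also return errors.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! struct SketchStore {
//!     cache: HashMap<[u8; 32], String>,
//!     reth: HashMap<[u8; 32], String>,
//! }
//!
//! impl SketchStore {
//!     // Cache first, then reth. A miss in both is `None`, not an error,
//!     // so the caller can fall back to peer resolution.
//!     fn get_by_digest(&self, digest: &[u8; 32]) -> Option<&String> {
//!         self.cache.get(digest).or_else(|| self.reth.get(digest))
//!     }
//! }
//! ```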
//!
//! [`alias::marshal::init`]: crate::alias::marshal::init

use alloy_primitives::B256;
use commonware_consensus::{Heightable as _, marshal::store::Blocks, types::Height};
use commonware_runtime::{BufferPooler, Clock, Metrics, Storage};
use commonware_storage::{
    archive::{self, Identifier, prunable},
    translator::TwoCap,
};
use reth_node_core::primitives::SealedBlock;
use reth_provider::{
    BlockReader, BlockSource, ProviderError, ProviderResult,
    providers::{BlockchainProvider, ProviderNodeTypes},
};
use tracing::{debug, info, instrument, warn};

use crate::consensus::{Digest, block::Block};

#[cfg(test)]
mod test;

/// Narrow view of reth that [`Hybrid`] needs: a finalized watermark and
/// canonical-by-height / canonical-by-hash block reads.
///
/// Exists to make unit testing easier. [`BlockchainProvider`] is used in
/// production.
101/// production.
102pub(crate) trait FinalizedBlocksProvider: Send + Sync {
103 /// Reth's last finalized block height, or `None` if reth has not yet
104 /// finalized anything (fresh chain).
105 fn finalized_height(&self) -> Option<u64>;
106
107 /// Look up a finalized block by height in reth.
108 ///
109 /// Implementations MUST return `None` for any `height` above
110 /// [`Self::finalized_height`] (and unconditionally when
111 /// [`Self::finalized_height`] is `None`); the marshal relies on
112 /// [`Hybrid`] only serving blocks reth has marked as finalized.
113 fn block_by_height(&self, height: u64) -> ProviderResult<Option<Block>>;
114
115 /// Look up a finalized block by hash in reth.
116 ///
117 /// Implementations MUST restrict the lookup to canonical blocks
118 /// only — pending/in-flight blocks must never be returned, otherwise
119 /// the marshal could be handed a non-finalized block.
120 fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<Block>>;
121}
122
/// Production impl over reth's [`BlockchainProvider`], the only type
/// passed to [`Hybrid`] in production. Generic over `N` only so the
/// impl works for any concrete `BlockchainProvider<N>` whose primitive
/// block type is [`tempo_primitives::Block`] (e.g. the
/// `BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, DatabaseEnv>>`
/// alias used by `tempo_node::TempoFullNode`).
impl<N> FinalizedBlocksProvider for BlockchainProvider<N>
where
    N: ProviderNodeTypes,
    Self: BlockReader<Block = tempo_primitives::Block>,
{
    fn finalized_height(&self) -> Option<u64> {
        // Direct read of `canonical_in_memory_state`, equivalent to
        // `BlockchainProvider`'s `BlockIdReader::finalized_block_num_hash`
        // but typed as the genuinely infallible `Option<u64>` rather
        // than the `Result<Option<_>>` reth's trait demands.
        self.canonical_in_memory_state()
            .get_finalized_num_hash()
            .map(|nh| nh.number)
    }

    #[instrument(skip_all, fields(height), err)]
    fn block_by_height(&self, height: u64) -> ProviderResult<Option<Block>> {
        // Gate the lookup on reth's finalized watermark so the marshal can
        // never be served a block that reth has not yet marked as
        // finalized. Without this, the canonical-by-number read would
        // happily return a block that is canonical-but-not-yet-finalized
        // (or even a block reth has accepted past its finalized tip),
        // violating [`Blocks`]'s "finalized only" contract.
        let Some(finalized) = self.finalized_height() else {
            // Reth has not finalized anything yet, so there is no
            // finalized watermark and no height is reachable.
            return Ok(None);
        };
        if height > finalized {
            return Ok(None);
        }
        match self.block_by_number(height) {
            Ok(maybe_block) => Ok(maybe_block.map(|block| {
                Block::from_execution_block_unchecked(SealedBlock::seal_slow(block), None)
            })),
            Err(err @ ProviderError::BlockExpired { .. }) => {
                info!(error = %eyre::Report::new(err), "cannot find block");
                Ok(None)
            }
            Err(bad_error) => Err(bad_error),
        }
    }

    #[instrument(skip_all, fields(hash), err)]
    fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<Block>> {
        // `Canonical` (not `Any`) so the marshal can never be served a
        // block that lives only in reth's pending in-memory tree; see
        // [`Blocks::get`] on [`Hybrid`].
        match self.find_block_by_hash(hash, BlockSource::Canonical) {
            Ok(maybe_block) => Ok(maybe_block.map(|block| {
                Block::from_execution_block_unchecked(SealedBlock::seal_slow(block), None)
            })),
            Err(err @ ProviderError::BlockExpired { .. }) => {
                info!(error = %eyre::Report::new(err), "cannot find block");
                Ok(None)
            }
            Err(bad_error) => Err(bad_error),
        }
    }
}

/// Error returned by [`Hybrid`]'s [`Blocks`] impl.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
    #[error(transparent)]
    Archive(#[from] archive::Error),

    #[error(transparent)]
    Provider(#[from] reth_provider::ProviderError),
}

/// Backing prunable archive type.
// TODO: Look into whether to take TwoCap or another translator. TwoCap is used
// by commonware itself, but higher caps can reduce the likelihood of collisions.
pub(crate) type Prunable<TContext> = prunable::Archive<TwoCap, TContext, Digest, Block>;

/// Configuration for [`Hybrid`].
pub(crate) struct Config<TContext, TExecutionBlockProvider>
where
    TContext: BufferPooler + Storage + Metrics + Clock,
{
    /// Prunable archive backing the most recently finalized blocks.
    pub(crate) prunable: Prunable<TContext>,

    /// Execution layer block provider used to look up finalized blocks below
    /// the cache window and to read reth's finalized watermark for cache
    /// eviction.
    pub(crate) execution_block_provider: TExecutionBlockProvider,

    /// Number of most-recently-finalized blocks (relative to the EL's
    /// finalized watermark) to keep in the prunable cache. Anything
    /// older is dropped from the cache and served out of
    /// [`Self::execution_block_provider`] instead.
    pub(crate) retention_blocks: u64,
}

/// Finalized blocks store backed by a prunable archive (a hot cache of
/// the most recently finalized blocks) and reth (the source of truth for
/// finalized blocks).
pub(crate) struct Hybrid<TContext, TExecutionBlockProvider>
where
    TContext: BufferPooler + Storage + Metrics + Clock,
{
    /// Hot cache of recently finalized blocks. Bounded to roughly the
    /// `retention_blocks` heights ending at reth's finalized watermark
    /// (plus anything the marshal has put above it) via
    /// [`Self::evict_below_execution_finalized_floor`].
    prunable: Prunable<TContext>,

    /// Execution layer block provider used to look up finalized blocks below
    /// the cache window and to read reth's finalized watermark for cache
    /// eviction.
    execution_block_provider: TExecutionBlockProvider,

    /// Number of most-recently-finalized blocks (relative to the EL's
    /// finalized watermark) to keep in the prunable cache. Anything
    /// older is dropped from the cache and served out of
    /// [`Self::execution_block_provider`] instead.
    retention_blocks: u64,
}

impl<TContext, P> Hybrid<TContext, P>
where
    TContext: BufferPooler + Storage + Metrics + Clock,
    P: FinalizedBlocksProvider + 'static,
{
    pub(crate) fn new(config: Config<TContext, P>) -> Self {
        let Config {
            prunable,
            execution_block_provider,
            retention_blocks,
        } = config;
        Self {
            prunable,
            execution_block_provider,
            retention_blocks,
        }
    }

    /// Drops cached blocks below `execution_finalized − retention_blocks + 1`,
    /// i.e. those at least `retention_blocks` heights below the execution
    /// layer's finalized watermark (subject to section rounding).
    ///
    /// The cache is sized relative to the EL's finalized boundary, not to
    /// the height the marshal happens to put. This maintains two invariants:
    ///
    /// - We never evict a block the EL doesn't yet have. If the EL lags
    ///   behind the marshal, the eviction floor does not advance and the
    ///   cache temporarily grows past `retention_blocks`.
    /// - Eviction tracks the EL's progress monotonically. Once the EL
    ///   finalizes height `H`, the cache may drop everything below
    ///   `H - retention_blocks + 1` on the next put.
    async fn evict_below_execution_finalized_floor(&mut self) -> Result<(), archive::Error> {
        // Reth hasn't finalized anything yet (fresh chain), so nothing is
        // safe to evict.
        let Some(execution_finalized) = self.execution_block_provider.finalized_height() else {
            return Ok(());
        };
        // Fewer than `retention_blocks` finalized blocks: keep everything.
        let Some(min_to_keep) = execution_finalized.checked_sub(self.retention_blocks) else {
            return Ok(());
        };
        // `prune(min)` keeps `min` and above, and the archive rounds `min`
        // *down* to a section boundary, so the actual retained window can
        // exceed `retention_blocks` by up to `items_per_section − 1` items.
        // See the module docs ("Section-rounding") for the full story.
        let prune_floor = min_to_keep.saturating_add(1);
        prunable::Archive::prune(&mut self.prunable, prune_floor).await
    }
}

impl<TContext, TExecutionBlockProvider> Blocks for Hybrid<TContext, TExecutionBlockProvider>
where
    TContext: BufferPooler + Storage + Metrics + Clock + Send + Sync + 'static,
    TExecutionBlockProvider: FinalizedBlocksProvider + 'static,
{
    type Block = Block;
    type Error = Error;

    #[instrument(skip_all, err)]
    async fn put(&mut self, block: Self::Block) -> Result<(), Self::Error> {
        let height = block.height();
        let digest = block.digest();
        match archive::Archive::put(&mut self.prunable, height.get(), digest, block).await {
            Ok(()) => {}
            // The prunable cache has already evicted this height, but
            // by the cache's eviction invariant
            // (`oldest_allowed ≤ section_aligned(execution_finalized − retention + 1)
            // ≤ execution_finalized`), `height < oldest_allowed` implies
            // `height ≤ execution_finalized`. The EL's finality contract
            // guarantees every block at or below `execution_finalized` is
            // durably persisted, so the marshal's subsequent
            // `Blocks::get(height)` will be served out of the execution
            // layer fallback path. We can't write to the EL ourselves (it
            // owns its own storage), but we don't have to: the block is
            // already durable. Treat the put as a successful no-op so
            // we don't trip the marshal's "failed to finalize" panic
            // on a perfectly recoverable condition.
            Err(archive::Error::AlreadyPrunedTo(oldest_allowed)) => {
                debug!(
                    %height,
                    oldest_allowed,
                    execution_finalized = ?self.execution_block_provider.finalized_height(),
                    "finalized block below prunable cache window; trusting the \
                     execution layer's finalized storage and treating put as a \
                     no-op"
                );
            }
            Err(other) => return Err(other.into()),
        }

        if let Err(err) = self.evict_below_execution_finalized_floor().await {
            // Eviction failures are not fatal; the next put will retry.
            // We log because they may indicate disk-level issues.
            warn!(
                %err,
                %height,
                retention = self.retention_blocks,
                "failed to evict prunable finalized blocks cache after put"
            );
        }
        Ok(())
    }

    async fn sync(&mut self) -> Result<(), Self::Error> {
        archive::Archive::sync(&mut self.prunable).await?;
        Ok(())
    }

    /// Attempts to read `id` from the prunable archive, falling back to EL on miss.
    async fn get(&self, id: Identifier<'_, Digest>) -> Result<Option<Self::Block>, Self::Error> {
        // EL read errors propagate to the marshal; a miss on both sides is
        // `Ok(None)`. See the module docs ("Why reth pruning is not a
        // concern") for why misses below reth's pruning watermark are safe.
        match id {
            Identifier::Index(height) => {
                if let Some(block) =
                    archive::Archive::get(&self.prunable, Identifier::Index(height)).await?
                {
                    return Ok(Some(block));
                }
                Ok(self.execution_block_provider.block_by_height(height)?)
            }
            Identifier::Key(digest) => {
                if let Some(block) =
                    archive::Archive::get(&self.prunable, Identifier::Key(digest)).await?
                {
                    return Ok(Some(block));
                }
                Ok(self.execution_block_provider.block_by_hash(digest.0)?)
            }
        }
    }

    /// No-op: cache eviction is EL-driven (see [`Self::evict_below_execution_finalized_floor`]).
    async fn prune(&mut self, _min: Height) -> Result<(), Self::Error> {
        Ok(())
    }

    fn missing_items(&self, start: Height, max: usize) -> Vec<Height> {
        // EL is treated as a contiguous source; gaps can only exist in the
        // prunable archive. The marshal only ever asks about heights at or
        // above `last_processed_height`, which the prunable archive must
        // cover.
        archive::Archive::missing_items(&self.prunable, start.get(), max)
            .into_iter()
            .map(Height::new)
            .collect()
    }

    fn next_gap(&self, value: Height) -> (Option<Height>, Option<Height>) {
        let (a, b) = archive::Archive::next_gap(&self.prunable, value.get());
        (a.map(Height::new), b.map(Height::new))
    }

    fn last_index(&self) -> Option<Height> {
        // Only report what lives in the prunable archive. The marshal uses
        // this to drive repair against the certificates archive; reflecting
        // EL here would mask gaps the marshal needs to fill via the
        // resolver.
        archive::Archive::last_index(&self.prunable).map(Height::new)
    }
}