Skip to main content

tempo_consensus/storage/
mod.rs

1//! This module defines consensus archive formats.
2//!
3//! Finalized blocks are stored in a [`Hybrid`] store which
4//! merges a prunable archive (holding the most recently finalized blocks) with
5//! a lookup into the execution layer (used for blocks below the prunable
6//! retention window).
7
8use std::time::Instant;
9
10use alloy_consensus::Sealable as _;
11use commonware_consensus::simplex::{scheme::bls12381_threshold::vrf::Scheme, types::Finalization};
12use commonware_cryptography::{
13    bls12381::primitives::variant::MinSig, certificate::Scheme as _, ed25519::PublicKey,
14};
15use commonware_runtime::{BufferPooler, Clock, Metrics, Spawner, Storage, buffer::paged::CacheRef};
16use commonware_storage::{
17    archive::{Archive as _, Identifier, immutable, prunable},
18    translator::TwoCap,
19};
20use commonware_utils::{NZU16, NZU64, NZUsize};
21use eyre::{WrapErr as _, ensure};
22use reth_provider::{BlockIdReader, BlockReader};
23use tracing::{info, instrument};
24
25use crate::{
26    config::BLOCKS_FREEZER_TABLE_INITIAL_SIZE_BYTES,
27    consensus::{Digest, block::Block},
28};
29
30pub(crate) mod hybrid;
31
32pub(crate) use hybrid::{FinalizedBlocksProvider, Hybrid};
33
34const FINALIZATIONS_BY_HEIGHT: &str = "finalizations-by-height";
35const PRUNABLE_FINALIZED_BLOCKS: &str = "finalized-blocks-prunable";
36
37pub(in crate::storage) const IMMUTABLE_ITEMS_PER_SECTION: std::num::NonZeroU64 = NZU64!(262_144);
38pub(in crate::storage) const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
39pub(in crate::storage) const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16); // 64KB chunks
40pub(in crate::storage) const FREEZER_VALUE_TARGET_SIZE: u64 = 1024 * 1024 * 1024; // 1GB
41pub(in crate::storage) const FREEZER_VALUE_COMPRESSION: Option<u8> = Some(3);
42
43pub(crate) const REPLAY_BUFFER: std::num::NonZeroUsize = NZUsize!(8 * 1024 * 1024); // 8MB
44pub(crate) const WRITE_BUFFER: std::num::NonZeroUsize = NZUsize!(1024 * 1024); // 1MB
45pub(crate) const PRUNABLE_ITEMS_PER_SECTION: std::num::NonZeroU64 = NZU64!(4_096);
46pub(crate) const MAX_REPAIR: std::num::NonZeroUsize = NZUsize!(20);
47pub(crate) const BUFFER_POOL_PAGE_SIZE: std::num::NonZeroU16 = NZU16!(4_096); // 4KB
48pub(crate) const BUFFER_POOL_CAPACITY: std::num::NonZeroUsize = NZUsize!(8_192); // 32MB (8k page slots)
49
50/// Default number of finalized blocks (relative to reth's finalized
51/// watermark) to keep cached in the prunable archive.
52///
53/// Beyond this depth, [`Hybrid`] falls back to looking up blocks from the
54/// execution layer.
55///
56/// The prunable archive evicts in `PRUNABLE_ITEMS_PER_SECTION`-sized
57/// batches (see [`hybrid`]'s "Section-rounding" docs). When reth is
58/// caught up to the marshal's tip the cache holds between `RETENTION`
59/// and `RETENTION + PRUNABLE_ITEMS_PER_SECTION − 1` items; if reth is
60/// lagging the marshal, the cache can hold more (it never drops blocks
61/// reth doesn't yet have). The assertion below keeps the section
62/// overshoot small relative to `RETENTION` (current ratio: 4×).
63pub(crate) const DEFAULT_FINALIZED_BLOCKS_RETENTION: u64 = 16_384;
64
65const _: () = assert!(
66    DEFAULT_FINALIZED_BLOCKS_RETENTION >= 2 * PRUNABLE_ITEMS_PER_SECTION.get(),
67    "DEFAULT_FINALIZED_BLOCKS_RETENTION must be at least 2 * PRUNABLE_ITEMS_PER_SECTION; \
68     otherwise the section-rounding overshoot dominates the working set",
69);
70
71pub(crate) async fn init_finalizations_archive<TContext>(
72    context: &TContext,
73    partition_prefix: &str,
74    page_cache: CacheRef,
75) -> Result<
76    immutable::Archive<TContext, Digest, Finalization<Scheme<PublicKey, MinSig>, Digest>>,
77    commonware_storage::archive::Error,
78>
79where
80    TContext: Clock + Metrics + Spawner + Storage + BufferPooler + Clone + Send + 'static,
81{
82    let start = Instant::now();
83    let archive = immutable::Archive::init(
84        context.with_label("finalizations_by_height"),
85        immutable::Config {
86            metadata_partition: format!("{partition_prefix}-{FINALIZATIONS_BY_HEIGHT}-metadata"),
87            freezer_table_partition: format!(
88                "{partition_prefix}-{FINALIZATIONS_BY_HEIGHT}-freezer-table"
89            ),
90            freezer_table_initial_size: BLOCKS_FREEZER_TABLE_INITIAL_SIZE_BYTES,
91            freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
92            freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
93            freezer_key_partition: format!(
94                "{partition_prefix}-{FINALIZATIONS_BY_HEIGHT}-freezer-key"
95            ),
96            freezer_key_page_cache: page_cache.clone(),
97            freezer_value_partition: format!(
98                "{partition_prefix}-{FINALIZATIONS_BY_HEIGHT}-freezer-value"
99            ),
100            freezer_value_target_size: FREEZER_VALUE_TARGET_SIZE,
101            freezer_value_compression: FREEZER_VALUE_COMPRESSION,
102            ordinal_partition: format!("{partition_prefix}-{FINALIZATIONS_BY_HEIGHT}-ordinal"),
103            items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
104            codec_config: Scheme::<PublicKey, MinSig>::certificate_codec_config_unbounded(),
105            replay_buffer: REPLAY_BUFFER,
106            freezer_key_write_buffer: WRITE_BUFFER,
107            freezer_value_write_buffer: WRITE_BUFFER,
108            ordinal_write_buffer: WRITE_BUFFER,
109        },
110    )
111    .await;
112
113    info!(
114        elapsed = %tempo_telemetry_util::display_duration(start.elapsed()),
115        "restored finalizations by height archive"
116    );
117
118    archive
119}
120
121/// Initialize the [`Hybrid`] finalized blocks store backed by a prunable
122/// archive (for `retention_blocks` recent items) and a reth provider lookup
123/// (for everything older).
124#[instrument(skip_all, fields(partition_prefix, retention_blocks), err(Display))]
125pub(crate) async fn init_finalized_blocks<TContext, P>(
126    context: &TContext,
127    partition_prefix: &str,
128    page_cache: CacheRef,
129    provider: P,
130    retention_blocks: u64,
131) -> eyre::Result<Hybrid<TContext, P>>
132where
133    TContext: Clock + Metrics + Spawner + Storage + BufferPooler + Clone + Send + 'static,
134    P: FinalizedBlocksProvider + 'static,
135{
136    ensure!(
137        retention_blocks > 0,
138        "finalized blocks retention must be greater than zero",
139    );
140
141    let prunable =
142        init_prunable_finalized_blocks_archive(context, partition_prefix, page_cache.clone())
143            .await
144            .wrap_err("failed to initialize prunable finalized blocks archive")?;
145
146    Ok(Hybrid::new(hybrid::Config {
147        prunable,
148        execution_block_provider: provider,
149        retention_blocks,
150    }))
151}
152
153/// Initialize the prunable archive that holds recently finalized blocks.
154///
155/// This archive only holds at most `retention_blocks` items at any time;
156/// older blocks are removed by the prune step in
157/// [`Hybrid`].
158async fn init_prunable_finalized_blocks_archive<TContext>(
159    context: &TContext,
160    partition_prefix: &str,
161    page_cache: CacheRef,
162) -> Result<prunable::Archive<TwoCap, TContext, Digest, Block>, commonware_storage::archive::Error>
163where
164    TContext: Clock + Metrics + Spawner + Storage + BufferPooler + Clone + Send + 'static,
165{
166    let start = Instant::now();
167    let archive = prunable::Archive::init(
168        context.with_label("finalized_blocks_prunable"),
169        prunable::Config {
170            translator: TwoCap,
171            key_partition: format!("{partition_prefix}-{PRUNABLE_FINALIZED_BLOCKS}-key"),
172            key_page_cache: page_cache,
173            value_partition: format!("{partition_prefix}-{PRUNABLE_FINALIZED_BLOCKS}-value"),
174            compression: FREEZER_VALUE_COMPRESSION,
175            codec_config: (),
176            items_per_section: PRUNABLE_ITEMS_PER_SECTION,
177            key_write_buffer: WRITE_BUFFER,
178            value_write_buffer: WRITE_BUFFER,
179            replay_buffer: REPLAY_BUFFER,
180        },
181    )
182    .await;
183
184    info!(
185        elapsed = %tempo_telemetry_util::display_duration(start.elapsed()),
186        "restored prunable finalized blocks archive",
187    );
188
189    archive
190}
191
192/// Finds the latest finalization certificate backed by finalized execution storage.
193///
194/// Searches backwards from the execution provider's finalized tip. At
195/// most `max_depth` blocks behind that starting height are inspected.
196///
197/// Returns `None` if no persisted finalization certificate has a matching
198/// finalized execution block.
199pub async fn find_last_finalized_marker<TContext, P>(
200    context: &TContext,
201    execution_provider: &P,
202    max_depth: u64,
203) -> eyre::Result<Option<(u64, Finalization<Scheme<PublicKey, MinSig>, Digest>)>>
204where
205    TContext: Clock + Metrics + Spawner + Storage + BufferPooler + Clone + Send + 'static,
206    P: BlockIdReader + BlockReader<Block = tempo_primitives::Block> + Send + Sync + ?Sized,
207{
208    let page_cache = CacheRef::from_pooler(context, BUFFER_POOL_PAGE_SIZE, BUFFER_POOL_CAPACITY);
209    let archive = init_finalizations_archive(context, crate::PARTITION_PREFIX, page_cache)
210        .await
211        .wrap_err("failed to open finalizations-by-height archive")?;
212
213    if archive.last_index().is_none() {
214        return Ok(None);
215    }
216    let Some(finalized_tip) = execution_provider
217        .finalized_block_number()
218        .wrap_err("failed reading finalized block number from execution provider")?
219    else {
220        return Ok(None);
221    };
222
223    let search_end = finalized_tip.saturating_sub(max_depth);
224    for height in (search_end..=finalized_tip).rev() {
225        let Some(finalization) = archive
226            .get(Identifier::Index(height))
227            .await
228            .wrap_err_with(|| format!("failed reading finalization at height {height}"))?
229        else {
230            continue;
231        };
232
233        let Some(block) = execution_provider
234            .block_by_number(height)
235            .wrap_err_with(|| format!("failed reading block at height {height}"))?
236        else {
237            continue;
238        };
239
240        let finalization_digest = finalization.proposal.payload;
241        let block_digest = Digest(block.header.hash_slow());
242        ensure!(
243            finalization_digest == block_digest,
244            "digest mismatch at height `{height}`. finalization: {finalization_digest}, execution: {block_digest}",
245        );
246
247        return Ok(Some((height, finalization)));
248    }
249
250    Ok(None)
251}