1use std::time::Instant;
9
10use alloy_consensus::Sealable as _;
11use commonware_consensus::simplex::{scheme::bls12381_threshold::vrf::Scheme, types::Finalization};
12use commonware_cryptography::{
13 bls12381::primitives::variant::MinSig, certificate::Scheme as _, ed25519::PublicKey,
14};
15use commonware_runtime::{BufferPooler, Clock, Metrics, Spawner, Storage, buffer::paged::CacheRef};
16use commonware_storage::{
17 archive::{Archive as _, Identifier, immutable, prunable},
18 translator::TwoCap,
19};
20use commonware_utils::{NZU16, NZU64, NZUsize};
21use eyre::{WrapErr as _, ensure};
22use reth_provider::{BlockIdReader, BlockReader};
23use tracing::{info, instrument};
24
25use crate::{
26 config::BLOCKS_FREEZER_TABLE_INITIAL_SIZE_BYTES,
27 consensus::{Digest, block::Block},
28};
29
30pub(crate) mod hybrid;
31
32pub(crate) use hybrid::{FinalizedBlocksProvider, Hybrid};
33
/// Name component of every partition backing the prunable finalized-blocks archive; it is
/// interpolated between the partition prefix and a per-store suffix.
const PRUNABLE_FINALIZED_BLOCKS: &str = "finalized-blocks-prunable";

/// Name component of every partition backing the finalizations-by-height archive; it is
/// interpolated between the partition prefix and a per-store suffix.
const FINALIZATIONS_BY_HEIGHT: &str = "finalizations-by-height";
36
37pub(in crate::storage) const IMMUTABLE_ITEMS_PER_SECTION: std::num::NonZeroU64 = NZU64!(262_144);
38pub(in crate::storage) const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
39pub(in crate::storage) const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16); pub(in crate::storage) const FREEZER_VALUE_TARGET_SIZE: u64 = 1024 * 1024 * 1024; pub(in crate::storage) const FREEZER_VALUE_COMPRESSION: Option<u8> = Some(3);
42
43pub(crate) const REPLAY_BUFFER: std::num::NonZeroUsize = NZUsize!(8 * 1024 * 1024); pub(crate) const WRITE_BUFFER: std::num::NonZeroUsize = NZUsize!(1024 * 1024); pub(crate) const PRUNABLE_ITEMS_PER_SECTION: std::num::NonZeroU64 = NZU64!(4_096);
46pub(crate) const MAX_REPAIR: std::num::NonZeroUsize = NZUsize!(20);
47pub(crate) const BUFFER_POOL_PAGE_SIZE: std::num::NonZeroU16 = NZU16!(4_096); pub(crate) const BUFFER_POOL_CAPACITY: std::num::NonZeroUsize = NZUsize!(8_192); pub(crate) const DEFAULT_FINALIZED_BLOCKS_RETENTION: u64 = 16_384;
64
65const _: () = assert!(
66 DEFAULT_FINALIZED_BLOCKS_RETENTION >= 2 * PRUNABLE_ITEMS_PER_SECTION.get(),
67 "DEFAULT_FINALIZED_BLOCKS_RETENTION must be at least 2 * PRUNABLE_ITEMS_PER_SECTION; \
68 otherwise the section-rounding overshoot dominates the working set",
69);
70
71pub(crate) async fn init_finalizations_archive<TContext>(
72 context: &TContext,
73 partition_prefix: &str,
74 page_cache: CacheRef,
75) -> Result<
76 immutable::Archive<TContext, Digest, Finalization<Scheme<PublicKey, MinSig>, Digest>>,
77 commonware_storage::archive::Error,
78>
79where
80 TContext: Clock + Metrics + Spawner + Storage + BufferPooler + Clone + Send + 'static,
81{
82 let start = Instant::now();
83 let archive = immutable::Archive::init(
84 context.with_label("finalizations_by_height"),
85 immutable::Config {
86 metadata_partition: format!("{partition_prefix}-{FINALIZATIONS_BY_HEIGHT}-metadata"),
87 freezer_table_partition: format!(
88 "{partition_prefix}-{FINALIZATIONS_BY_HEIGHT}-freezer-table"
89 ),
90 freezer_table_initial_size: BLOCKS_FREEZER_TABLE_INITIAL_SIZE_BYTES,
91 freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
92 freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
93 freezer_key_partition: format!(
94 "{partition_prefix}-{FINALIZATIONS_BY_HEIGHT}-freezer-key"
95 ),
96 freezer_key_page_cache: page_cache.clone(),
97 freezer_value_partition: format!(
98 "{partition_prefix}-{FINALIZATIONS_BY_HEIGHT}-freezer-value"
99 ),
100 freezer_value_target_size: FREEZER_VALUE_TARGET_SIZE,
101 freezer_value_compression: FREEZER_VALUE_COMPRESSION,
102 ordinal_partition: format!("{partition_prefix}-{FINALIZATIONS_BY_HEIGHT}-ordinal"),
103 items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
104 codec_config: Scheme::<PublicKey, MinSig>::certificate_codec_config_unbounded(),
105 replay_buffer: REPLAY_BUFFER,
106 freezer_key_write_buffer: WRITE_BUFFER,
107 freezer_value_write_buffer: WRITE_BUFFER,
108 ordinal_write_buffer: WRITE_BUFFER,
109 },
110 )
111 .await;
112
113 info!(
114 elapsed = %tempo_telemetry_util::display_duration(start.elapsed()),
115 "restored finalizations by height archive"
116 );
117
118 archive
119}
120
121#[instrument(skip_all, fields(partition_prefix, retention_blocks), err(Display))]
125pub(crate) async fn init_finalized_blocks<TContext, P>(
126 context: &TContext,
127 partition_prefix: &str,
128 page_cache: CacheRef,
129 provider: P,
130 retention_blocks: u64,
131) -> eyre::Result<Hybrid<TContext, P>>
132where
133 TContext: Clock + Metrics + Spawner + Storage + BufferPooler + Clone + Send + 'static,
134 P: FinalizedBlocksProvider + 'static,
135{
136 ensure!(
137 retention_blocks > 0,
138 "finalized blocks retention must be greater than zero",
139 );
140
141 let prunable =
142 init_prunable_finalized_blocks_archive(context, partition_prefix, page_cache.clone())
143 .await
144 .wrap_err("failed to initialize prunable finalized blocks archive")?;
145
146 Ok(Hybrid::new(hybrid::Config {
147 prunable,
148 execution_block_provider: provider,
149 retention_blocks,
150 }))
151}
152
153async fn init_prunable_finalized_blocks_archive<TContext>(
159 context: &TContext,
160 partition_prefix: &str,
161 page_cache: CacheRef,
162) -> Result<prunable::Archive<TwoCap, TContext, Digest, Block>, commonware_storage::archive::Error>
163where
164 TContext: Clock + Metrics + Spawner + Storage + BufferPooler + Clone + Send + 'static,
165{
166 let start = Instant::now();
167 let archive = prunable::Archive::init(
168 context.with_label("finalized_blocks_prunable"),
169 prunable::Config {
170 translator: TwoCap,
171 key_partition: format!("{partition_prefix}-{PRUNABLE_FINALIZED_BLOCKS}-key"),
172 key_page_cache: page_cache,
173 value_partition: format!("{partition_prefix}-{PRUNABLE_FINALIZED_BLOCKS}-value"),
174 compression: FREEZER_VALUE_COMPRESSION,
175 codec_config: (),
176 items_per_section: PRUNABLE_ITEMS_PER_SECTION,
177 key_write_buffer: WRITE_BUFFER,
178 value_write_buffer: WRITE_BUFFER,
179 replay_buffer: REPLAY_BUFFER,
180 },
181 )
182 .await;
183
184 info!(
185 elapsed = %tempo_telemetry_util::display_duration(start.elapsed()),
186 "restored prunable finalized blocks archive",
187 );
188
189 archive
190}
191
192pub async fn find_last_finalized_marker<TContext, P>(
200 context: &TContext,
201 execution_provider: &P,
202 max_depth: u64,
203) -> eyre::Result<Option<(u64, Finalization<Scheme<PublicKey, MinSig>, Digest>)>>
204where
205 TContext: Clock + Metrics + Spawner + Storage + BufferPooler + Clone + Send + 'static,
206 P: BlockIdReader + BlockReader<Block = tempo_primitives::Block> + Send + Sync + ?Sized,
207{
208 let page_cache = CacheRef::from_pooler(context, BUFFER_POOL_PAGE_SIZE, BUFFER_POOL_CAPACITY);
209 let archive = init_finalizations_archive(context, crate::PARTITION_PREFIX, page_cache)
210 .await
211 .wrap_err("failed to open finalizations-by-height archive")?;
212
213 if archive.last_index().is_none() {
214 return Ok(None);
215 }
216 let Some(finalized_tip) = execution_provider
217 .finalized_block_number()
218 .wrap_err("failed reading finalized block number from execution provider")?
219 else {
220 return Ok(None);
221 };
222
223 let search_end = finalized_tip.saturating_sub(max_depth);
224 for height in (search_end..=finalized_tip).rev() {
225 let Some(finalization) = archive
226 .get(Identifier::Index(height))
227 .await
228 .wrap_err_with(|| format!("failed reading finalization at height {height}"))?
229 else {
230 continue;
231 };
232
233 let Some(block) = execution_provider
234 .block_by_number(height)
235 .wrap_err_with(|| format!("failed reading block at height {height}"))?
236 else {
237 continue;
238 };
239
240 let finalization_digest = finalization.proposal.payload;
241 let block_digest = Digest(block.header.hash_slow());
242 ensure!(
243 finalization_digest == block_digest,
244 "digest mismatch at height `{height}`. finalization: {finalization_digest}, execution: {block_digest}",
245 );
246
247 return Ok(Some((height, finalization)));
248 }
249
250 Ok(None)
251}