1use std::{
7 collections::HashSet,
8 fs::File,
9 io::{BufReader, Read},
10 path::PathBuf,
11 sync::mpsc,
12 thread,
13 time::{Duration, Instant},
14};
15
16use alloy_primitives::{
17 B256, U256, keccak256,
18 map::{AddressMap, Entry},
19};
20use clap::Parser;
21use eyre::{Context as _, ensure};
22use reth_chainspec::EthereumHardforks;
23use reth_cli_commands::common::{AccessRights, CliNodeTypes, EnvironmentArgs};
24use reth_db_api::{
25 cursor::{DbCursorRO, DbDupCursorRW},
26 models::{CompactU256, StorageBeforeTx, storage_sharded_key::StorageShardedKey},
27 table::Decompress,
28 tables,
29 transaction::{DbTx, DbTxMut},
30};
31use reth_ethereum::{chainspec::EthChainSpec, tasks::Runtime};
32use reth_etl::Collector;
33use reth_primitives_traits::{Account, StorageEntry};
34use reth_provider::{
35 BlockNumReader, DBProvider, DatabaseProviderFactory, HashingWriter, RocksDBProviderFactory,
36 StaticFileProviderFactory, StaticFileSegment, StorageChangeSetReader, StorageSettingsCache,
37 TrieWriter,
38};
39use reth_trie::{IntermediateStateRootState, StateRootProgress};
40use reth_trie_db::DatabaseStateRoot;
41use tempo_chainspec::spec::TempoChainSpecParser;
42use tracing::info;
43
/// Magic bytes that must open every block header in the dump file; `execute` compares
/// the first 8 header bytes against this value.
const MAGIC: &[u8; 8] = b"TEMPOSB\x00";

/// The only binary format version this loader accepts. It is compared against the
/// big-endian `u16` stored in header bytes 8..10.
const VERSION: u16 = 1;

/// First argument handed to every ETL `Collector::new` call in this file (200 MiB).
/// NOTE(review): presumably the collector's in-memory buffer size before it spills to
/// disk — confirm against `reth_etl::Collector`.
const ETL_FILE_SIZE: usize = 200 * 1024 * 1024;

/// Number of `(address, slot, value)` entries batched into one message before it is
/// sent to the hash worker thread.
const WORKER_CHUNK_SIZE: usize = 100;

/// Capacity of the bounded channel feeding the hash worker; once this many chunks are
/// queued the reading thread blocks on `send`.
const HASH_WORKER_QUEUE_DEPTH: usize = 256;
58
// CLI command that loads a binary storage dump into a database that is still at block 0.
//
// Plain `//` comments are used here on purpose: clap's derive turns `///` doc comments
// into help text, so adding them would change the command's `--help` output.
#[derive(Debug, Parser)]
pub struct InitFromBinaryDump<C: reth_cli::chainspec::ChainSpecParser = TempoChainSpecParser> {
    // Shared environment arguments (data directory, chain, ...) flattened into this
    // command's own argument list.
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    // Path of the binary dump file to load; shown as `BINARY_DUMP_FILE` in usage output.
    #[arg(value_name = "BINARY_DUMP_FILE")]
    state: PathBuf,
}
71
72impl<C: reth_cli::chainspec::ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>>
73 InitFromBinaryDump<C>
74{
75 pub(crate) async fn execute<N>(self, runtime: Runtime) -> eyre::Result<()>
77 where
78 N: CliNodeTypes<ChainSpec = C::ChainSpec>,
79 {
80 info!(target: "tempo::cli", "Tempo init-from-binary-dump starting");
81
82 let etl_dir = self
83 .env
84 .datadir
85 .clone()
86 .resolve_datadir(self.env.chain.chain())
87 .data_dir()
88 .join("etl");
89 let environment = self.env.init::<N>(AccessRights::RW, runtime)?;
90 let provider_factory = environment.provider_factory;
91
92 let provider_rw = provider_factory.database_provider_rw()?;
93
94 let last_block = provider_rw.last_block_number()?;
96 ensure!(
97 last_block == 0,
98 "init-from-binary-dump must be run on a freshly initialized database at block 0, \
99 but found block {last_block}"
100 );
101
102 info!(target: "tempo::cli", path = %self.state.display(), "Loading binary state dump");
103
104 let file = File::open(&self.state)
105 .wrap_err_with(|| format!("failed to open {}", self.state.display()))?;
106 let mut reader = BufReader::with_capacity(64 * 1024 * 1024, file);
107
108 let mut total_entries = 0u64;
109 let mut total_blocks = 0u64;
110
111 let mut accounts_seen: AddressMap<Account> = AddressMap::default();
113 let mut genesis_storage_keys = HashSet::new();
114
115 let mut hash_chunk: Vec<(alloy_primitives::Address, B256, CompactU256)> =
117 Vec::with_capacity(WORKER_CHUNK_SIZE);
118 let mut storage_changeset_collector: Collector<Vec<u8>, CompactU256> =
119 Collector::new(ETL_FILE_SIZE, Some(etl_dir.clone()));
120 let mut storage_history_collector: Collector<Vec<u8>, CompactU256> =
121 Collector::new(ETL_FILE_SIZE, Some(etl_dir.clone()));
122
123 for (index, entry) in provider_rw.storage_changeset(0)? {
124 let raw_key = raw_storage_key(index.address(), entry.key);
125 genesis_storage_keys.insert(raw_key.clone());
126 storage_changeset_collector
127 .insert(raw_key, CompactU256::from(entry.value))
128 .wrap_err("storage changeset ETL insert of genesis storage failed")?;
129 }
130
131 let (hash_tx, hash_rx) = mpsc::sync_channel::<
134 Vec<(alloy_primitives::Address, B256, CompactU256)>,
135 >(HASH_WORKER_QUEUE_DEPTH);
136 let hashed_etl_dir = etl_dir;
137 let hash_worker =
138 thread::spawn(move || -> eyre::Result<Collector<Vec<u8>, CompactU256>> {
139 let mut hashed_collector: Collector<Vec<u8>, CompactU256> =
140 Collector::new(ETL_FILE_SIZE, Some(hashed_etl_dir));
141 while let Ok(chunk) = hash_rx.recv() {
142 let mut last_addr = alloy_primitives::Address::ZERO;
143 let mut hashed_addr = B256::ZERO;
144 for (address, slot, value) in chunk {
145 if address != last_addr {
146 last_addr = address;
147 hashed_addr = keccak256(address);
148 }
149 let mut hashed_key = Vec::with_capacity(64);
150 hashed_key.extend_from_slice(hashed_addr.as_slice());
151 hashed_key.extend_from_slice(keccak256(slot).as_slice());
152 hashed_collector
153 .insert(hashed_key, value)
154 .wrap_err("hashed ETL insert failed")?;
155 }
156 }
157 Ok(hashed_collector)
158 });
159
160 loop {
162 let mut header_buf = [0u8; 40];
164 match reader.read_exact(&mut header_buf) {
165 Ok(()) => {}
166 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
167 Err(e) => return Err(e).wrap_err("failed to read block header"),
168 }
169
170 ensure!(
172 &header_buf[..8] == MAGIC,
173 "invalid magic bytes in block header"
174 );
175
176 let version = u16::from_be_bytes([header_buf[8], header_buf[9]]);
178 ensure!(
179 version == VERSION,
180 "unsupported binary format version {version}, expected {VERSION}"
181 );
182
183 let mut address_bytes = [0u8; 20];
187 address_bytes.copy_from_slice(&header_buf[12..32]);
188 let address = alloy_primitives::Address::from(address_bytes);
189
190 let pair_count = u64::from_be_bytes(header_buf[32..40].try_into().unwrap());
192
193 info!(
194 target: "tempo::cli",
195 %address,
196 pair_count,
197 "Processing token storage block"
198 );
199
200 if let Entry::Vacant(e) = accounts_seen.entry(address) {
208 let hashed_address = keccak256(address);
209 let mut account_cursor = provider_rw
210 .tx_ref()
211 .cursor_read::<tables::HashedAccounts>()?;
212 let account = match account_cursor.seek_exact(hashed_address)? {
213 Some((_, account)) => account,
214 None => {
215 return Err(eyre::eyre!(
216 "state bloat references account {address} that is missing from genesis"
217 ));
218 }
219 };
220 e.insert(account);
221 }
222
223 let mut entry_buf = [0u8; 64];
225 let start = Instant::now();
226 let mut last_log = start;
227
228 for i in 0..pair_count {
229 reader
230 .read_exact(&mut entry_buf)
231 .wrap_err("failed to read storage entry")?;
232
233 let slot = B256::from_slice(&entry_buf[..32]);
234 let value = U256::from_be_bytes::<32>(entry_buf[32..64].try_into().unwrap());
235
236 if value.is_zero() {
238 continue;
239 }
240
241 let compact_value = CompactU256::from(value);
242
243 let raw_key = raw_storage_key(address, slot);
244 if !genesis_storage_keys.contains(&raw_key) {
245 storage_changeset_collector
246 .insert(raw_key.clone(), CompactU256::from(U256::ZERO))
247 .wrap_err("storage changeset ETL insert failed")?;
248 storage_history_collector
249 .insert(raw_key, CompactU256::from(U256::ZERO))
250 .wrap_err("storage history ETL insert failed")?;
251 }
252
253 hash_chunk.push((address, slot, compact_value));
255
256 if hash_chunk.len() >= WORKER_CHUNK_SIZE {
258 let chunk =
259 std::mem::replace(&mut hash_chunk, Vec::with_capacity(WORKER_CHUNK_SIZE));
260 hash_tx.send(chunk).wrap_err("hash worker disconnected")?;
261 }
262
263 total_entries += 1;
264
265 log_collection_progress(&address, i, pair_count, start, &mut last_log);
266 }
267
268 total_blocks += 1;
269 }
270
271 if !hash_chunk.is_empty() {
273 hash_tx
274 .send(std::mem::take(&mut hash_chunk))
275 .wrap_err("hash worker disconnected")?;
276 }
277 drop(hash_tx);
278 let mut hashed_collector = hash_worker
279 .join()
280 .map_err(|_| eyre::eyre!("hash worker panicked"))??;
281
282 info!(
283 target: "tempo::cli",
284 total_blocks,
285 total_entries,
286 "Entries collected, merging genesis hashed storage into ETL collector..."
287 );
288
289 {
291 let tx = provider_rw.tx_ref();
292 let mut cursor = tx.cursor_read::<tables::HashedStorages>()?;
293 let mut genesis_count = 0usize;
294 let walker = cursor.walk(None)?;
295 for row in walker {
296 let (hashed_address, entry) = row?;
297 let mut key = Vec::with_capacity(64);
298 key.extend_from_slice(hashed_address.as_slice());
299 key.extend_from_slice(entry.key.as_slice());
300 hashed_collector
301 .insert(key, CompactU256::from(entry.value))
302 .wrap_err("ETL insert of genesis hashed storage failed")?;
303 genesis_count += 1;
304 }
305 info!(
306 target: "tempo::cli",
307 genesis_count,
308 "Genesis hashed storage entries merged into collector"
309 );
310 }
311
312 let storage_settings = provider_rw.cached_storage_settings();
313 ensure!(
314 storage_settings.storage_v2,
315 "init-from-binary-dump only supports storage v2 databases"
316 );
317
318 let storage_changeset_factory = provider_factory.clone();
319 let storage_changeset_worker = thread::spawn(move || {
320 write_storage_changesets(storage_changeset_factory, storage_changeset_collector)
321 });
322
323 let storage_history_factory = provider_factory;
324 let storage_history_worker = thread::spawn(move || {
325 write_storage_history(storage_history_factory, storage_history_collector)
326 });
327
328 let total_hashes = hashed_collector.len();
333 provider_rw.tx_ref().clear::<tables::HashedStorages>()?;
334 let mut hashed_cursor = provider_rw
335 .tx_ref()
336 .cursor_dup_write::<tables::HashedStorages>()?;
337 load_etl_to_cursor(
338 &mut hashed_collector,
339 total_hashes,
340 "hashed storage",
341 |k, v| {
342 hashed_cursor.append_dup(
343 B256::from_slice(&k[..32]),
344 StorageEntry {
345 key: B256::from_slice(&k[32..]),
346 value: v,
347 },
348 )
349 },
350 )?;
351 drop(hashed_cursor);
352
353 info!(
354 target: "tempo::cli",
355 total_hashes,
356 "Storage written, writing hashed accounts..."
357 );
358
359 provider_rw.insert_account_for_hashing(
362 accounts_seen
363 .iter()
364 .map(|(addr, account)| (*addr, Some(*account))),
365 )?;
366
367 storage_changeset_worker
368 .join()
369 .map_err(|_| eyre::eyre!("storage changeset worker panicked"))??;
370 storage_history_worker
371 .join()
372 .map_err(|_| eyre::eyre!("storage history worker panicked"))??;
373
374 info!(
375 target: "tempo::cli",
376 addresses = accounts_seen.len(),
377 "Hashed accounts written, computing state root and trie nodes..."
378 );
379
380 let trie_start = Instant::now();
383 provider_rw.tx_ref().clear::<tables::AccountsTrie>()?;
384 provider_rw.tx_ref().clear::<tables::StoragesTrie>()?;
385
386 let mut resume: Option<IntermediateStateRootState> = None;
387 let mut trie_writes = 0usize;
388
389 let state_root = {
391 use reth_trie_db::{
392 DatabaseHashedCursorFactory, DatabaseTrieCursorFactory, PackedKeyAdapter,
393 };
394 type DbStateRoot<'a, TX> = reth_trie::StateRoot<
395 DatabaseTrieCursorFactory<&'a TX, PackedKeyAdapter>,
396 DatabaseHashedCursorFactory<&'a TX>,
397 >;
398
399 loop {
401 match DbStateRoot::<_>::from_tx(provider_rw.tx_ref())
402 .with_intermediate_state(resume)
403 .root_with_progress()?
404 {
405 StateRootProgress::Progress(state, _, updates) => {
406 trie_writes += provider_rw.write_trie_updates(updates)?;
407 info!(
408 target: "tempo::cli",
409 last_key = %state.account_root_state.last_hashed_key,
410 trie_writes,
411 elapsed = ?trie_start.elapsed(),
412 "Flushing trie updates"
413 );
414 resume = Some(*state);
415 }
416 StateRootProgress::Complete(root, _, updates) => {
417 trie_writes += provider_rw.write_trie_updates(updates)?;
418 break root;
419 }
420 }
421 }
422 };
423
424 info!(
425 target: "tempo::cli",
426 %state_root,
427 trie_writes,
428 elapsed = ?trie_start.elapsed(),
429 "State root computed"
430 );
431
432 provider_rw.commit()?;
434
435 info!(
436 target: "tempo::cli",
437 total_blocks,
438 total_entries,
439 %state_root,
440 "Binary state dump loaded successfully"
441 );
442
443 Ok(())
444 }
445}
446
447fn write_storage_changesets<P>(
448 provider: P,
449 mut collector: Collector<Vec<u8>, CompactU256>,
450) -> eyre::Result<()>
451where
452 P: StaticFileProviderFactory + Send + 'static,
453{
454 info!(target: "tempo::cli", "Writing storage changesets...");
455
456 provider
457 .static_file_provider()
458 .delete_segment(StaticFileSegment::StorageChangeSets)?;
459
460 let mut writer = provider.get_static_file_writer(0, StaticFileSegment::StorageChangeSets)?;
461 writer.begin_storage_changeset(0)?;
462 let total = collector.len();
463 load_storage_etl(
464 &mut collector,
465 total,
466 "storage changeset",
467 |address, key, value| {
468 writer.append_storage_changeset_entry(StorageBeforeTx {
469 address,
470 key,
471 value,
472 })?;
473 Ok(())
474 },
475 )?;
476 drop(writer);
477
478 Ok(())
479}
480
481fn write_storage_history<P>(
482 provider: P,
483 mut collector: Collector<Vec<u8>, CompactU256>,
484) -> eyre::Result<()>
485where
486 P: RocksDBProviderFactory + Send + 'static,
487{
488 info!(target: "tempo::cli", "Writing storage history...");
489
490 let rocksdb = provider.rocksdb_provider();
491 let mut batch = rocksdb.batch_with_auto_commit();
492 let block_zero_history =
493 tables::BlockNumberList::new([0]).expect("single block is always sorted");
494 let total = collector.len();
495 load_storage_etl(
496 &mut collector,
497 total,
498 "storage history",
499 |address, key, _| {
500 let history_key = StorageShardedKey::last(address, key);
501 if batch
502 .get::<tables::StoragesHistory>(history_key.clone())?
503 .is_none()
504 {
505 batch.put::<tables::StoragesHistory>(history_key, &block_zero_history)?;
506 }
507 Ok(())
508 },
509 )?;
510 batch.commit()?;
511
512 Ok(())
513}
514
515fn raw_storage_key(address: alloy_primitives::Address, slot: B256) -> Vec<u8> {
517 let mut key = Vec::with_capacity(52);
518 key.extend_from_slice(address.as_slice());
519 key.extend_from_slice(slot.as_slice());
520 key
521}
522
523fn decode_raw_storage_key(key: &[u8]) -> (alloy_primitives::Address, B256) {
524 (
525 alloy_primitives::Address::from_slice(&key[..20]),
526 B256::from_slice(&key[20..]),
527 )
528}
529
530fn load_storage_etl(
533 collector: &mut Collector<Vec<u8>, CompactU256>,
534 total: usize,
535 label: &str,
536 mut write: impl FnMut(alloy_primitives::Address, B256, U256) -> eyre::Result<()>,
537) -> eyre::Result<()> {
538 let total = total.max(1);
539 let interval = (total / 10).max(1);
540 let mut pending: Option<(Vec<u8>, Vec<u8>)> = None;
541 for (index, item) in collector.iter()?.enumerate() {
542 if index > 0 && index % interval == 0 {
543 info!(
544 target: "tempo::cli",
545 progress = format_args!("{:.2}%", (index as f64 / total as f64) * 100.0),
546 "Inserting {label}"
547 );
548 }
549
550 let (key, value) = item.wrap_err("ETL iteration failed")?;
551 if let Some((ref prev_key, ref prev_val)) = pending
552 && *prev_key != key
553 {
554 let (address, storage_key) = decode_raw_storage_key(prev_key);
555 write(
556 address,
557 storage_key,
558 CompactU256::decompress_owned(prev_val.clone())?.into(),
559 )?;
560 }
561 pending = Some((key, value));
562 }
563
564 if let Some((key, val)) = pending {
565 let (address, storage_key) = decode_raw_storage_key(&key);
566 write(
567 address,
568 storage_key,
569 CompactU256::decompress_owned(val)?.into(),
570 )?;
571 }
572
573 Ok(())
574}
575
576fn load_etl_to_cursor(
579 collector: &mut Collector<Vec<u8>, CompactU256>,
580 total: usize,
581 label: &str,
582 mut append: impl FnMut(&[u8], U256) -> Result<(), reth_db_api::DatabaseError>,
583) -> eyre::Result<()> {
584 let interval = (total / 10).max(1);
585 let mut pending: Option<(Vec<u8>, Vec<u8>)> = None;
586 for (index, item) in collector.iter()?.enumerate() {
587 if index > 0 && index % interval == 0 {
588 info!(
589 target: "tempo::cli",
590 progress = format_args!("{:.2}%", (index as f64 / total as f64) * 100.0),
591 "Inserting {label}"
592 );
593 }
594
595 let (key, value) = item.wrap_err("ETL iteration failed")?;
596 if let Some((ref prev_key, ref prev_val)) = pending
597 && *prev_key != key
598 {
599 append(
600 prev_key,
601 CompactU256::decompress_owned(prev_val.clone())?.into(),
602 )
603 .wrap_err("cursor append failed")?;
604 }
605 pending = Some((key, value));
606 }
607 if let Some((key, val)) = pending {
608 append(&key, CompactU256::decompress_owned(val)?.into())
609 .wrap_err("cursor append failed")?;
610 }
611 Ok(())
612}
613
614fn log_collection_progress(
616 address: &alloy_primitives::Address,
617 index: u64,
618 total: u64,
619 start: Instant,
620 last_log: &mut Instant,
621) {
622 if last_log.elapsed() >= Duration::from_secs(5) || index + 1 == total {
623 let pct = ((index + 1) as f64 / total as f64) * 100.0;
624 let elapsed = start.elapsed();
625 let pairs_per_sec = (index + 1) as f64 / elapsed.as_secs_f64();
626 info!(
627 target: "tempo::cli",
628 %address,
629 progress = format_args!("{}/{} ({pct:.0}%)", index + 1, total),
630 elapsed = ?elapsed,
631 pairs_per_sec = pairs_per_sec as u64,
632 "Collecting storage"
633 );
634 *last_log = Instant::now();
635 }
636}