Skip to main content

tempo/
init_state.rs

1//! Initialize state from a binary dump file.
2//!
3//! This command loads TIP20 storage slots from a binary file and applies them
4//! to the genesis state. The binary format is produced by `tempo-xtask generate-state-bloat`.
5
6use std::{
7    collections::HashSet,
8    fs::File,
9    io::{BufReader, Read},
10    path::PathBuf,
11    sync::mpsc,
12    thread,
13    time::{Duration, Instant},
14};
15
16use alloy_primitives::{
17    B256, U256, keccak256,
18    map::{AddressMap, Entry},
19};
20use clap::Parser;
21use eyre::{Context as _, ensure};
22use reth_chainspec::EthereumHardforks;
23use reth_cli_commands::common::{AccessRights, CliNodeTypes, EnvironmentArgs};
24use reth_db_api::{
25    cursor::{DbCursorRO, DbDupCursorRW},
26    models::{CompactU256, StorageBeforeTx, storage_sharded_key::StorageShardedKey},
27    table::Decompress,
28    tables,
29    transaction::{DbTx, DbTxMut},
30};
31use reth_ethereum::{chainspec::EthChainSpec, tasks::Runtime};
32use reth_etl::Collector;
33use reth_primitives_traits::{Account, StorageEntry};
34use reth_provider::{
35    BlockNumReader, DBProvider, DatabaseProviderFactory, HashingWriter, RocksDBProviderFactory,
36    StaticFileProviderFactory, StaticFileSegment, StorageChangeSetReader, StorageSettingsCache,
37    TrieWriter,
38};
39use reth_trie::{IntermediateStateRootState, StateRootProgress};
40use reth_trie_db::DatabaseStateRoot;
41use tempo_chainspec::spec::TempoChainSpecParser;
42use tracing::info;
43
/// Eight-byte signature that opens every block header in the state bloat binary format.
const MAGIC: &[u8; 8] = b"TEMPOSB\0";

/// The only format version this loader accepts.
const VERSION: u16 = 1;

/// Size threshold for one ETL temp file (200 MiB); a collector spills to a new file past this.
const ETL_FILE_SIZE: usize = 200 * (1 << 20);

/// Number of storage entries batched together before being handed to the hashing worker.
const WORKER_CHUNK_SIZE: usize = 100;

/// Capacity of the bounded channel feeding the hashing worker thread.
const HASH_WORKER_QUEUE_DEPTH: usize = 256;
58
59/// Initialize state from a binary dump file.
60#[derive(Debug, Parser)]
61pub struct InitFromBinaryDump<C: reth_cli::chainspec::ChainSpecParser = TempoChainSpecParser> {
62    #[command(flatten)]
63    env: EnvironmentArgs<C>,
64
65    /// Path to the binary state dump file.
66    ///
67    /// The file should be generated by `tempo-xtask generate-state-bloat`.
68    #[arg(value_name = "BINARY_DUMP_FILE")]
69    state: PathBuf,
70}
71
72impl<C: reth_cli::chainspec::ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>>
73    InitFromBinaryDump<C>
74{
75    /// Execute the init-from-binary-dump command.
76    pub(crate) async fn execute<N>(self, runtime: Runtime) -> eyre::Result<()>
77    where
78        N: CliNodeTypes<ChainSpec = C::ChainSpec>,
79    {
80        info!(target: "tempo::cli", "Tempo init-from-binary-dump starting");
81
82        let etl_dir = self
83            .env
84            .datadir
85            .clone()
86            .resolve_datadir(self.env.chain.chain())
87            .data_dir()
88            .join("etl");
89        let environment = self.env.init::<N>(AccessRights::RW, runtime)?;
90        let provider_factory = environment.provider_factory;
91
92        let provider_rw = provider_factory.database_provider_rw()?;
93
94        // Verify we're at genesis (block 0)
95        let last_block = provider_rw.last_block_number()?;
96        ensure!(
97            last_block == 0,
98            "init-from-binary-dump must be run on a freshly initialized database at block 0, \
99             but found block {last_block}"
100        );
101
102        info!(target: "tempo::cli", path = %self.state.display(), "Loading binary state dump");
103
104        let file = File::open(&self.state)
105            .wrap_err_with(|| format!("failed to open {}", self.state.display()))?;
106        let mut reader = BufReader::with_capacity(64 * 1024 * 1024, file);
107
108        let mut total_entries = 0u64;
109        let mut total_blocks = 0u64;
110
111        // Track addresses and their account data for hashing
112        let mut accounts_seen: AddressMap<Account> = AddressMap::default();
113        let mut genesis_storage_keys = HashSet::new();
114
115        // ETL collectors: accumulate entries sorted, spill to disk when full
116        let mut hash_chunk: Vec<(alloy_primitives::Address, B256, CompactU256)> =
117            Vec::with_capacity(WORKER_CHUNK_SIZE);
118        let mut storage_changeset_collector: Collector<Vec<u8>, CompactU256> =
119            Collector::new(ETL_FILE_SIZE, Some(etl_dir.clone()));
120        let mut storage_history_collector: Collector<Vec<u8>, CompactU256> =
121            Collector::new(ETL_FILE_SIZE, Some(etl_dir.clone()));
122
123        for (index, entry) in provider_rw.storage_changeset(0)? {
124            let raw_key = raw_storage_key(index.address(), entry.key);
125            genesis_storage_keys.insert(raw_key.clone());
126            storage_changeset_collector
127                .insert(raw_key, CompactU256::from(entry.value))
128                .wrap_err("storage changeset ETL insert of genesis storage failed")?;
129        }
130
131        // Single worker thread for keccak hashing: owns the hashed ETL collector, receives
132        // batches over a bounded channel, and returns the collector when the sender drops.
133        let (hash_tx, hash_rx) = mpsc::sync_channel::<
134            Vec<(alloy_primitives::Address, B256, CompactU256)>,
135        >(HASH_WORKER_QUEUE_DEPTH);
136        let hashed_etl_dir = etl_dir;
137        let hash_worker =
138            thread::spawn(move || -> eyre::Result<Collector<Vec<u8>, CompactU256>> {
139                let mut hashed_collector: Collector<Vec<u8>, CompactU256> =
140                    Collector::new(ETL_FILE_SIZE, Some(hashed_etl_dir));
141                while let Ok(chunk) = hash_rx.recv() {
142                    let mut last_addr = alloy_primitives::Address::ZERO;
143                    let mut hashed_addr = B256::ZERO;
144                    for (address, slot, value) in chunk {
145                        if address != last_addr {
146                            last_addr = address;
147                            hashed_addr = keccak256(address);
148                        }
149                        let mut hashed_key = Vec::with_capacity(64);
150                        hashed_key.extend_from_slice(hashed_addr.as_slice());
151                        hashed_key.extend_from_slice(keccak256(slot).as_slice());
152                        hashed_collector
153                            .insert(hashed_key, value)
154                            .wrap_err("hashed ETL insert failed")?;
155                    }
156                }
157                Ok(hashed_collector)
158            });
159
160        // Process blocks from binary file
161        loop {
162            // Read next block header; EOF means no more blocks.
163            let mut header_buf = [0u8; 40];
164            match reader.read_exact(&mut header_buf) {
165                Ok(()) => {}
166                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
167                Err(e) => return Err(e).wrap_err("failed to read block header"),
168            }
169
170            // Validate magic
171            ensure!(
172                &header_buf[..8] == MAGIC,
173                "invalid magic bytes in block header"
174            );
175
176            // Validate version
177            let version = u16::from_be_bytes([header_buf[8], header_buf[9]]);
178            ensure!(
179                version == VERSION,
180                "unsupported binary format version {version}, expected {VERSION}"
181            );
182
183            // Skip flags (2 bytes at offset 10)
184
185            // Read address (20 bytes at offset 12)
186            let mut address_bytes = [0u8; 20];
187            address_bytes.copy_from_slice(&header_buf[12..32]);
188            let address = alloy_primitives::Address::from(address_bytes);
189
190            // Read pair count (8 bytes at offset 32)
191            let pair_count = u64::from_be_bytes(header_buf[32..40].try_into().unwrap());
192
193            info!(
194                target: "tempo::cli",
195                %address,
196                pair_count,
197                "Processing token storage block"
198            );
199
200            // Ensure the account exists in the canonical state table (only on first encounter).
201            // The binary dump is chunked: generate-state-bloat writes one block per token
202            // per chunk, so the same token address appears in multiple blocks. This entry
203            // is Vacant on the first chunk and Occupied on subsequent ones.
204            // Preserving the genesis account is critical: TIP20 tokens have bytecode (0xEF)
205            // set during genesis, and overwriting with Account::default() would clear the
206            // code hash, making the token appear uninitialized.
207            if let Entry::Vacant(e) = accounts_seen.entry(address) {
208                let hashed_address = keccak256(address);
209                let mut account_cursor = provider_rw
210                    .tx_ref()
211                    .cursor_read::<tables::HashedAccounts>()?;
212                let account = match account_cursor.seek_exact(hashed_address)? {
213                    Some((_, account)) => account,
214                    None => {
215                        return Err(eyre::eyre!(
216                            "state bloat references account {address} that is missing from genesis"
217                        ));
218                    }
219                };
220                e.insert(account);
221            }
222
223            // Read entries into the hashed ETL collector.
224            let mut entry_buf = [0u8; 64];
225            let start = Instant::now();
226            let mut last_log = start;
227
228            for i in 0..pair_count {
229                reader
230                    .read_exact(&mut entry_buf)
231                    .wrap_err("failed to read storage entry")?;
232
233                let slot = B256::from_slice(&entry_buf[..32]);
234                let value = U256::from_be_bytes::<32>(entry_buf[32..64].try_into().unwrap());
235
236                // Skip zero values (they represent deletion)
237                if value.is_zero() {
238                    continue;
239                }
240
241                let compact_value = CompactU256::from(value);
242
243                let raw_key = raw_storage_key(address, slot);
244                if !genesis_storage_keys.contains(&raw_key) {
245                    storage_changeset_collector
246                        .insert(raw_key.clone(), CompactU256::from(U256::ZERO))
247                        .wrap_err("storage changeset ETL insert failed")?;
248                    storage_history_collector
249                        .insert(raw_key, CompactU256::from(U256::ZERO))
250                        .wrap_err("storage history ETL insert failed")?;
251                }
252
253                // Queue raw data for parallel hashing
254                hash_chunk.push((address, slot, compact_value));
255
256                // Send full batches to the hashing worker thread.
257                if hash_chunk.len() >= WORKER_CHUNK_SIZE {
258                    let chunk =
259                        std::mem::replace(&mut hash_chunk, Vec::with_capacity(WORKER_CHUNK_SIZE));
260                    hash_tx.send(chunk).wrap_err("hash worker disconnected")?;
261                }
262
263                total_entries += 1;
264
265                log_collection_progress(&address, i, pair_count, start, &mut last_log);
266            }
267
268            total_blocks += 1;
269        }
270
271        // Send any remaining entries to the worker and join.
272        if !hash_chunk.is_empty() {
273            hash_tx
274                .send(std::mem::take(&mut hash_chunk))
275                .wrap_err("hash worker disconnected")?;
276        }
277        drop(hash_tx);
278        let mut hashed_collector = hash_worker
279            .join()
280            .map_err(|_| eyre::eyre!("hash worker panicked"))??;
281
282        info!(
283            target: "tempo::cli",
284            total_blocks,
285            total_entries,
286            "Entries collected, merging genesis hashed storage into ETL collector..."
287        );
288
289        // Merge existing genesis hashed storage into the collector.
290        {
291            let tx = provider_rw.tx_ref();
292            let mut cursor = tx.cursor_read::<tables::HashedStorages>()?;
293            let mut genesis_count = 0usize;
294            let walker = cursor.walk(None)?;
295            for row in walker {
296                let (hashed_address, entry) = row?;
297                let mut key = Vec::with_capacity(64);
298                key.extend_from_slice(hashed_address.as_slice());
299                key.extend_from_slice(entry.key.as_slice());
300                hashed_collector
301                    .insert(key, CompactU256::from(entry.value))
302                    .wrap_err("ETL insert of genesis hashed storage failed")?;
303                genesis_count += 1;
304            }
305            info!(
306                target: "tempo::cli",
307                genesis_count,
308                "Genesis hashed storage entries merged into collector"
309            );
310        }
311
312        let storage_settings = provider_rw.cached_storage_settings();
313        ensure!(
314            storage_settings.storage_v2,
315            "init-from-binary-dump only supports storage v2 databases"
316        );
317
318        let storage_changeset_factory = provider_factory.clone();
319        let storage_changeset_worker = thread::spawn(move || {
320            write_storage_changesets(storage_changeset_factory, storage_changeset_collector)
321        });
322
323        let storage_history_factory = provider_factory;
324        let storage_history_worker = thread::spawn(move || {
325            write_storage_history(storage_history_factory, storage_history_collector)
326        });
327
328        // Load sorted entries from each ETL collector into its database table.
329        // Strategy: iterate the sorted collector, deduplicate consecutive entries with
330        // the same composite key, and bulk-insert via append_dup.
331        // The table is cleared first so append_dup ordering is guaranteed.
332        let total_hashes = hashed_collector.len();
333        provider_rw.tx_ref().clear::<tables::HashedStorages>()?;
334        let mut hashed_cursor = provider_rw
335            .tx_ref()
336            .cursor_dup_write::<tables::HashedStorages>()?;
337        load_etl_to_cursor(
338            &mut hashed_collector,
339            total_hashes,
340            "hashed storage",
341            |k, v| {
342                hashed_cursor.append_dup(
343                    B256::from_slice(&k[..32]),
344                    StorageEntry {
345                        key: B256::from_slice(&k[32..]),
346                        value: v,
347                    },
348                )
349            },
350        )?;
351        drop(hashed_cursor);
352
353        info!(
354            target: "tempo::cli",
355            total_hashes,
356            "Storage written, writing hashed accounts..."
357        );
358
359        // Write hashed account entries using the real account metadata from plain state.
360        // This preserves bytecode_hash for genesis accounts (e.g. TIP20 tokens with 0xEF code).
361        provider_rw.insert_account_for_hashing(
362            accounts_seen
363                .iter()
364                .map(|(addr, account)| (*addr, Some(*account))),
365        )?;
366
367        storage_changeset_worker
368            .join()
369            .map_err(|_| eyre::eyre!("storage changeset worker panicked"))??;
370        storage_history_worker
371            .join()
372            .map_err(|_| eyre::eyre!("storage history worker panicked"))??;
373
374        info!(
375            target: "tempo::cli",
376            addresses = accounts_seen.len(),
377            "Hashed accounts written, computing state root and trie nodes..."
378        );
379
380        // Rebuild the merkle trie from scratch so the sparse trie cache on
381        // block 1 doesn't hit stale genesis nodes and stall on a full rebuild.
382        let trie_start = Instant::now();
383        provider_rw.tx_ref().clear::<tables::AccountsTrie>()?;
384        provider_rw.tx_ref().clear::<tables::StoragesTrie>()?;
385
386        let mut resume: Option<IntermediateStateRootState> = None;
387        let mut trie_writes = 0usize;
388
389        // Incrementally compute the merkle root over all hashed accounts/storages.
390        let state_root = {
391            use reth_trie_db::{
392                DatabaseHashedCursorFactory, DatabaseTrieCursorFactory, PackedKeyAdapter,
393            };
394            type DbStateRoot<'a, TX> = reth_trie::StateRoot<
395                DatabaseTrieCursorFactory<&'a TX, PackedKeyAdapter>,
396                DatabaseHashedCursorFactory<&'a TX>,
397            >;
398
399            // Compute state root in chunks, flushing trie nodes to disk between iterations.
400            loop {
401                match DbStateRoot::<_>::from_tx(provider_rw.tx_ref())
402                    .with_intermediate_state(resume)
403                    .root_with_progress()?
404                {
405                    StateRootProgress::Progress(state, _, updates) => {
406                        trie_writes += provider_rw.write_trie_updates(updates)?;
407                        info!(
408                            target: "tempo::cli",
409                            last_key = %state.account_root_state.last_hashed_key,
410                            trie_writes,
411                            elapsed = ?trie_start.elapsed(),
412                            "Flushing trie updates"
413                        );
414                        resume = Some(*state);
415                    }
416                    StateRootProgress::Complete(root, _, updates) => {
417                        trie_writes += provider_rw.write_trie_updates(updates)?;
418                        break root;
419                    }
420                }
421            }
422        };
423
424        info!(
425            target: "tempo::cli",
426            %state_root,
427            trie_writes,
428            elapsed = ?trie_start.elapsed(),
429            "State root computed"
430        );
431
432        // Final commit
433        provider_rw.commit()?;
434
435        info!(
436            target: "tempo::cli",
437            total_blocks,
438            total_entries,
439            %state_root,
440            "Binary state dump loaded successfully"
441        );
442
443        Ok(())
444    }
445}
446
447fn write_storage_changesets<P>(
448    provider: P,
449    mut collector: Collector<Vec<u8>, CompactU256>,
450) -> eyre::Result<()>
451where
452    P: StaticFileProviderFactory + Send + 'static,
453{
454    info!(target: "tempo::cli", "Writing storage changesets...");
455
456    provider
457        .static_file_provider()
458        .delete_segment(StaticFileSegment::StorageChangeSets)?;
459
460    let mut writer = provider.get_static_file_writer(0, StaticFileSegment::StorageChangeSets)?;
461    writer.begin_storage_changeset(0)?;
462    let total = collector.len();
463    load_storage_etl(
464        &mut collector,
465        total,
466        "storage changeset",
467        |address, key, value| {
468            writer.append_storage_changeset_entry(StorageBeforeTx {
469                address,
470                key,
471                value,
472            })?;
473            Ok(())
474        },
475    )?;
476    drop(writer);
477
478    Ok(())
479}
480
481fn write_storage_history<P>(
482    provider: P,
483    mut collector: Collector<Vec<u8>, CompactU256>,
484) -> eyre::Result<()>
485where
486    P: RocksDBProviderFactory + Send + 'static,
487{
488    info!(target: "tempo::cli", "Writing storage history...");
489
490    let rocksdb = provider.rocksdb_provider();
491    let mut batch = rocksdb.batch_with_auto_commit();
492    let block_zero_history =
493        tables::BlockNumberList::new([0]).expect("single block is always sorted");
494    let total = collector.len();
495    load_storage_etl(
496        &mut collector,
497        total,
498        "storage history",
499        |address, key, _| {
500            let history_key = StorageShardedKey::last(address, key);
501            if batch
502                .get::<tables::StoragesHistory>(history_key.clone())?
503                .is_none()
504            {
505                batch.put::<tables::StoragesHistory>(history_key, &block_zero_history)?;
506            }
507            Ok(())
508        },
509    )?;
510    batch.commit()?;
511
512    Ok(())
513}
514
515/// Composite ETL key for unhashed storage, sorted by address then slot.
516fn raw_storage_key(address: alloy_primitives::Address, slot: B256) -> Vec<u8> {
517    let mut key = Vec::with_capacity(52);
518    key.extend_from_slice(address.as_slice());
519    key.extend_from_slice(slot.as_slice());
520    key
521}
522
523fn decode_raw_storage_key(key: &[u8]) -> (alloy_primitives::Address, B256) {
524    (
525        alloy_primitives::Address::from_slice(&key[..20]),
526        B256::from_slice(&key[20..]),
527    )
528}
529
530/// Iterate a raw storage ETL collector, deduplicate consecutive entries with the
531/// same `(address, slot)` key, and call `write` for each unique entry.
532fn load_storage_etl(
533    collector: &mut Collector<Vec<u8>, CompactU256>,
534    total: usize,
535    label: &str,
536    mut write: impl FnMut(alloy_primitives::Address, B256, U256) -> eyre::Result<()>,
537) -> eyre::Result<()> {
538    let total = total.max(1);
539    let interval = (total / 10).max(1);
540    let mut pending: Option<(Vec<u8>, Vec<u8>)> = None;
541    for (index, item) in collector.iter()?.enumerate() {
542        if index > 0 && index % interval == 0 {
543            info!(
544                target: "tempo::cli",
545                progress = format_args!("{:.2}%", (index as f64 / total as f64) * 100.0),
546                "Inserting {label}"
547            );
548        }
549
550        let (key, value) = item.wrap_err("ETL iteration failed")?;
551        if let Some((ref prev_key, ref prev_val)) = pending
552            && *prev_key != key
553        {
554            let (address, storage_key) = decode_raw_storage_key(prev_key);
555            write(
556                address,
557                storage_key,
558                CompactU256::decompress_owned(prev_val.clone())?.into(),
559            )?;
560        }
561        pending = Some((key, value));
562    }
563
564    if let Some((key, val)) = pending {
565        let (address, storage_key) = decode_raw_storage_key(&key);
566        write(
567            address,
568            storage_key,
569            CompactU256::decompress_owned(val)?.into(),
570        )?;
571    }
572
573    Ok(())
574}
575
576/// Iterate a sorted ETL collector, deduplicate consecutive entries with the same key
577/// (keeping the last value), and call `append` for each unique entry.
578fn load_etl_to_cursor(
579    collector: &mut Collector<Vec<u8>, CompactU256>,
580    total: usize,
581    label: &str,
582    mut append: impl FnMut(&[u8], U256) -> Result<(), reth_db_api::DatabaseError>,
583) -> eyre::Result<()> {
584    let interval = (total / 10).max(1);
585    let mut pending: Option<(Vec<u8>, Vec<u8>)> = None;
586    for (index, item) in collector.iter()?.enumerate() {
587        if index > 0 && index % interval == 0 {
588            info!(
589                target: "tempo::cli",
590                progress = format_args!("{:.2}%", (index as f64 / total as f64) * 100.0),
591                "Inserting {label}"
592            );
593        }
594
595        let (key, value) = item.wrap_err("ETL iteration failed")?;
596        if let Some((ref prev_key, ref prev_val)) = pending
597            && *prev_key != key
598        {
599            append(
600                prev_key,
601                CompactU256::decompress_owned(prev_val.clone())?.into(),
602            )
603            .wrap_err("cursor append failed")?;
604        }
605        pending = Some((key, value));
606    }
607    if let Some((key, val)) = pending {
608        append(&key, CompactU256::decompress_owned(val)?.into())
609            .wrap_err("cursor append failed")?;
610    }
611    Ok(())
612}
613
614/// Log collection progress every 5 seconds and on the final entry.
615fn log_collection_progress(
616    address: &alloy_primitives::Address,
617    index: u64,
618    total: u64,
619    start: Instant,
620    last_log: &mut Instant,
621) {
622    if last_log.elapsed() >= Duration::from_secs(5) || index + 1 == total {
623        let pct = ((index + 1) as f64 / total as f64) * 100.0;
624        let elapsed = start.elapsed();
625        let pairs_per_sec = (index + 1) as f64 / elapsed.as_secs_f64();
626        info!(
627            target: "tempo::cli",
628            %address,
629            progress = format_args!("{}/{} ({pct:.0}%)", index + 1, total),
630            elapsed = ?elapsed,
631            pairs_per_sec = pairs_per_sec as u64,
632            "Collecting storage"
633        );
634        *last_log = Instant::now();
635    }
636}