Skip to main content

tempo/
p2p_proxy.rs

1use alloy::{
2    consensus::BlockHeader,
3    eips::{BlockHashOrNumber, BlockNumberOrTag},
4    network::{Network, primitives::HeaderResponse},
5    primitives::B256,
6    providers::{Provider, ProviderBuilder},
7    rpc::client::BatchRequest,
8};
9use alloy_rlp::Encodable;
10use clap::Parser;
11use eyre::{Context, Result};
12use futures::StreamExt;
13use reth_chainspec::Head;
14use reth_cli_util::get_secret_key;
15use reth_eth_wire_types::{
16    HeadersDirection, PooledTransactions, primitives::BasicNetworkPrimitives,
17};
18use reth_ethereum::{
19    network::{
20        NetworkConfig, NetworkEventListenerProvider, NetworkInfo, NetworkManager, PeersConfig,
21        PeersInfo, eth_requests::IncomingEthRequest, transactions::NetworkTransactionEvent,
22    },
23    tasks::Runtime,
24};
25use std::{
26    collections::{BTreeMap, HashMap},
27    path::PathBuf,
28    sync::{
29        Arc,
30        atomic::{AtomicU64, Ordering},
31    },
32    time::Duration,
33};
34use tempo_alloy::TempoNetwork;
35use tempo_chainspec::spec::{TempoChainSpec, chain_value_parser};
36use tempo_primitives::{TempoHeader, TempoPrimitives, TempoTxEnvelope};
37use tokio::sync::{mpsc, oneshot};
38use tracing::{debug, error, info};
39
40/// Tempo-specific network primitives for the proxy node.
41type TempoNetPrimitives = BasicNetworkPrimitives<TempoPrimitives, TempoTxEnvelope>;
42type TempoRpcBlock = <TempoNetwork as Network>::BlockResponse;
43
44/// 3 hrs of blocks at 500ms block time.
45const CACHE_CAPACITY: u64 = 60 * 60 * 6; // 21600
46/// Maximum number of headers to fetch per RPC batch request.
47const HEADER_RPC_BATCH_SIZE: usize = 128;
48/// Maximum number of block headers to serve in a `GetBlockHeaders` response.
49const MAX_HEADERS_SERVE: usize = 1024;
50/// Soft cap on the total encoded body size in a `GetBlockBodies` response.
51const SOFT_BODY_RESPONSE_SIZE_LIMIT: usize = 1024 * 1024; // 1 MiB
52
53#[derive(Parser, Debug)]
54#[command(
55    about = "Run a proxy P2P node that serves cached block data fetched from an RPC endpoint"
56)]
57pub struct P2pProxyArgs {
58    /// RPC endpoint to fetch blocks from (HTTP or WebSocket).
59    #[arg(long, required = true)]
60    rpc_url: String,
61
62    /// Chain to connect to.
63    #[arg(long, default_value = "mainnet")]
64    chain: String,
65
66    /// Port for the P2P listener.
67    #[arg(long, default_value_t = 30303)]
68    port: u16,
69
70    /// Discovery port.
71    #[arg(long)]
72    discovery_port: Option<u16>,
73
74    /// Maximum number of inbound peer connections.
75    #[arg(long, default_value_t = 100)]
76    max_inbound: usize,
77
78    /// Maximum number of concurrent incoming connection attempts.
79    #[arg(long, default_value_t = 30)]
80    max_concurrent_inbound: usize,
81
82    /// Number of blocks to cache.
83    #[arg(long)]
84    cache_blocks: Option<u64>,
85
86    /// Path to the P2P secret key file. If the file doesn't exist, a new key is generated
87    /// and saved. If omitted, an ephemeral key is generated on each start.
88    #[arg(long)]
89    p2p_secret_key: Option<PathBuf>,
90}
91
92impl P2pProxyArgs {
93    pub(crate) async fn run(self) -> Result<()> {
94        let chain_spec = chain_value_parser(&self.chain)?;
95
96        // Fetch latest head from RPC for the network status handshake
97        let provider = ProviderBuilder::new_with_network::<TempoNetwork>()
98            .connect(&self.rpc_url)
99            .await
100            .context("failed to connect to RPC")?;
101        let latest_block = provider
102            .get_block_by_number(Default::default())
103            .await
104            .context("failed to fetch latest block")?
105            .ok_or_else(|| eyre::eyre!("latest block not found"))?;
106        let head = Head {
107            number: latest_block.header.number(),
108            hash: latest_block.header.hash(),
109            difficulty: latest_block.header.difficulty(),
110            total_difficulty: latest_block.header.difficulty(),
111            timestamp: latest_block.header.timestamp(),
112        };
113        info!(number = head.number, hash = %head.hash, "fetched latest head");
114
115        // Channel for the single fetcher service
116        let (fetch_tx, fetch_rx) = mpsc::channel::<FetchRequest>(256);
117
118        // Spawn the block fetcher service
119        let rpc_url = self.rpc_url.clone();
120        let cache_blocks = self.cache_blocks.unwrap_or(CACHE_CAPACITY);
121        tokio::spawn(async move {
122            if let Err(err) = run_fetcher_service(rpc_url, fetch_rx, cache_blocks).await {
123                error!(%err, "block fetcher service exited with error");
124            }
125        });
126
127        // Resolve the P2P secret key: load from file (creating if needed), or ephemeral
128        let secret_key = match &self.p2p_secret_key {
129            Some(path) => get_secret_key(path).context("failed to load P2P secret key")?,
130            None => reth_cli_util::load_secret_key::rng_secret_key(),
131        };
132
133        // Launch the P2P network
134        let net_cfg = NetConfig {
135            port: self.port,
136            discovery_port: self.discovery_port,
137            max_inbound: self.max_inbound,
138            max_concurrent_inbound: self.max_concurrent_inbound,
139            head,
140        };
141        run_p2p_network(chain_spec, net_cfg, fetch_tx, secret_key).await
142    }
143}
144
145/// Resolved network configuration passed to `run_p2p_network`.
146struct NetConfig {
147    port: u16,
148    discovery_port: Option<u16>,
149    max_inbound: usize,
150    max_concurrent_inbound: usize,
151    head: Head,
152}
153
154/// Shared counters for periodic proxy stats logging.
155struct RequestStats {
156    header_requests_received: AtomicU64,
157    body_requests_received: AtomicU64,
158    headers_served: AtomicU64,
159    bodies_served: AtomicU64,
160}
161
162/// Messages from the request handler to the single block-fetcher service.
163enum FetchRequest {
164    GetHeaders {
165        request: reth_eth_wire_types::GetBlockHeaders,
166        response: oneshot::Sender<Vec<TempoHeader>>,
167    },
168    GetBodies {
169        hashes: Vec<B256>,
170        response: oneshot::Sender<Vec<tempo_primitives::BlockBody>>,
171    },
172}
173
174/// A cached block: header + body, indexed by number and hash.
175struct BlockCache {
176    /// Blocks ordered by number.
177    by_number: BTreeMap<u64, CachedBlock>,
178    /// Hash -> block number index.
179    by_hash: HashMap<B256, u64>,
180    capacity: u64,
181}
182
183impl BlockCache {
184    fn new(capacity: u64) -> Self {
185        Self {
186            by_number: BTreeMap::new(),
187            by_hash: HashMap::new(),
188            capacity,
189        }
190    }
191
192    fn insert_header(&mut self, number: u64, hash: B256, header: TempoHeader) {
193        self.upsert(number, hash, header, None);
194    }
195
196    fn insert_block(
197        &mut self,
198        number: u64,
199        hash: B256,
200        header: TempoHeader,
201        body: tempo_primitives::BlockBody,
202    ) {
203        self.upsert(number, hash, header, Some(body));
204    }
205
206    fn upsert(
207        &mut self,
208        number: u64,
209        hash: B256,
210        header: TempoHeader,
211        body: Option<tempo_primitives::BlockBody>,
212    ) {
213        if let Some(old_hash) = self.by_number.get(&number).map(|block| block.hash)
214            && old_hash != hash
215        {
216            self.by_hash.remove(&old_hash);
217        }
218
219        let body = match self.by_number.remove(&number) {
220            Some(existing) => body.or(existing.body),
221            None => body,
222        };
223
224        self.by_number
225            .insert(number, CachedBlock { header, body, hash });
226        self.by_hash.insert(hash, number);
227        self.evict();
228    }
229
230    fn evict(&mut self) {
231        while self.by_number.len() as u64 > self.capacity {
232            if let Some((_, block)) = self.by_number.pop_first() {
233                self.by_hash.remove(&block.hash);
234            }
235        }
236    }
237
238    fn get_by_number(&self, number: u64) -> Option<&CachedBlock> {
239        self.by_number.get(&number)
240    }
241
242    fn get_by_hash(&self, hash: &B256) -> Option<&CachedBlock> {
243        self.by_hash
244            .get(hash)
245            .and_then(|num| self.by_number.get(num))
246    }
247}
248
249#[derive(Clone)]
250struct CachedBlock {
251    header: TempoHeader,
252    body: Option<tempo_primitives::BlockBody>,
253    hash: B256,
254}
255
256/// Single block-fetcher service that owns the cache and handles all fetch requests.
257async fn run_fetcher_service(
258    rpc_url: String,
259    mut fetch_rx: mpsc::Receiver<FetchRequest>,
260    cache_blocks: u64,
261) -> Result<()> {
262    let provider = ProviderBuilder::new_with_network::<TempoNetwork>()
263        .connect(&rpc_url)
264        .await
265        .context("failed to connect to RPC")?;
266
267    let mut cache = BlockCache::new(cache_blocks);
268
269    // Process incoming requests
270    while let Some(req) = fetch_rx.recv().await {
271        match req {
272            FetchRequest::GetHeaders { request, response } => {
273                let headers = resolve_headers(&provider, &mut cache, &request).await;
274                let _ = response.send(headers);
275            }
276            FetchRequest::GetBodies { hashes, response } => {
277                let bodies = resolve_bodies(&provider, &mut cache, &hashes).await;
278                let _ = response.send(bodies);
279            }
280        }
281    }
282
283    Ok(())
284}
285
286/// Launch the P2P network and handle incoming eth requests.
287async fn run_p2p_network(
288    chain_spec: Arc<TempoChainSpec>,
289    cfg: NetConfig,
290    fetch_tx: mpsc::Sender<FetchRequest>,
291    secret_key: secp256k1::SecretKey,
292) -> Result<()> {
293    let peers_config = PeersConfig::default()
294        .with_max_inbound(cfg.max_inbound)
295        .with_max_outbound(0);
296
297    let mut builder = NetworkConfig::<_, TempoNetPrimitives>::builder(secret_key, Runtime::test())
298        .listener_port(cfg.port)
299        .disable_dns_discovery()
300        .disable_tx_gossip(true)
301        .peer_config(peers_config)
302        .set_head(cfg.head);
303
304    if let Some(dp) = cfg.discovery_port {
305        builder = builder.discovery_port(dp);
306    }
307
308    let mut config = builder.build_with_noop_provider(chain_spec);
309    config.sessions_config.session_event_buffer = cfg.max_concurrent_inbound;
310
311    let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1024);
312    let (transactions_tx, mut transactions_rx) = reth_metrics::common::mpsc::memory_bounded_channel(
313        config
314            .transactions_manager_config
315            .tx_channel_memory_limit_bytes,
316        "p2p-proxy.tx",
317    );
318
319    let network = NetworkManager::new(config)
320        .await
321        .context("failed to create network manager")?
322        .with_eth_request_handler(requests_tx)
323        .with_transactions(transactions_tx);
324
325    let handle = network.handle().clone();
326    info!(
327        peer_id = %handle.peer_id(),
328        local_addr = %handle.local_addr(),
329        max_inbound = cfg.max_inbound,
330        "P2P proxy node started"
331    );
332
333    // Print network events
334    let events_handle = handle.clone();
335    tokio::spawn(async move {
336        let mut events = events_handle.event_listener();
337        while let Some(event) = events.next().await {
338            debug!(?event, "network event");
339        }
340    });
341
342    // Drain transaction events — respond empty to all requests
343    tokio::spawn(async move {
344        while let Some(event) = transactions_rx.recv().await {
345            if let NetworkTransactionEvent::GetPooledTransactions { response, .. } = event {
346                let _ = response.send(Ok(PooledTransactions(vec![])));
347            }
348        }
349    });
350
351    // Spawn the network
352    tokio::spawn(network);
353
354    // Proxy stats for periodic logging
355    let stats = Arc::new(RequestStats {
356        header_requests_received: AtomicU64::new(0),
357        body_requests_received: AtomicU64::new(0),
358        headers_served: AtomicU64::new(0),
359        bodies_served: AtomicU64::new(0),
360    });
361
362    // Periodic stats logging
363    let stats_log = Arc::clone(&stats);
364    let stats_handle = handle.clone();
365    tokio::spawn(async move {
366        let mut interval = tokio::time::interval(Duration::from_secs(30));
367        interval.tick().await; // skip immediate first tick
368        loop {
369            interval.tick().await;
370            let header_requests_received =
371                stats_log.header_requests_received.load(Ordering::Relaxed);
372            let body_requests_received = stats_log.body_requests_received.load(Ordering::Relaxed);
373            let headers_served = stats_log.headers_served.load(Ordering::Relaxed);
374            let bodies_served = stats_log.bodies_served.load(Ordering::Relaxed);
375            let peers = stats_handle.num_connected_peers();
376            info!(
377                peers,
378                header_requests_received,
379                body_requests_received,
380                headers_served,
381                bodies_served,
382                "proxy stats"
383            );
384        }
385    });
386
387    // Handle incoming eth requests
388    while let Some(eth_request) = requests_rx.recv().await {
389        match eth_request {
390            IncomingEthRequest::GetBlockHeaders {
391                peer_id,
392                request,
393                response,
394            } => {
395                debug!(%peer_id, ?request, "received GetBlockHeaders");
396                stats
397                    .header_requests_received
398                    .fetch_add(1, Ordering::Relaxed);
399                let fetch_tx = fetch_tx.clone();
400                let stats = Arc::clone(&stats);
401                tokio::spawn(async move {
402                    let headers = async {
403                        let (tx, rx) = oneshot::channel();
404                        fetch_tx
405                            .send(FetchRequest::GetHeaders {
406                                request,
407                                response: tx,
408                            })
409                            .await
410                            .ok()?;
411                        rx.await.ok()
412                    }
413                    .await
414                    .unwrap_or_default();
415                    let headers_served = headers.len() as u64;
416                    if response.send(Ok(headers.into())).is_ok() {
417                        stats
418                            .headers_served
419                            .fetch_add(headers_served, Ordering::Relaxed);
420                    }
421                });
422            }
423            IncomingEthRequest::GetBlockBodies {
424                peer_id,
425                request,
426                response,
427            } => {
428                debug!(%peer_id, ?request, "received GetBlockBodies");
429                stats.body_requests_received.fetch_add(1, Ordering::Relaxed);
430                let fetch_tx = fetch_tx.clone();
431                let stats = Arc::clone(&stats);
432                tokio::spawn(async move {
433                    let bodies = async {
434                        let (tx, rx) = oneshot::channel();
435                        fetch_tx
436                            .send(FetchRequest::GetBodies {
437                                hashes: request.0,
438                                response: tx,
439                            })
440                            .await
441                            .ok()?;
442                        rx.await.ok()
443                    }
444                    .await
445                    .unwrap_or_default();
446                    let bodies_served = bodies.len() as u64;
447                    if response.send(Ok(bodies.into())).is_ok() {
448                        stats
449                            .bodies_served
450                            .fetch_add(bodies_served, Ordering::Relaxed);
451                    }
452                });
453            }
454            // All other requests get empty responses
455            IncomingEthRequest::GetNodeData { response, .. } => {
456                let _ = response.send(Ok(Default::default()));
457            }
458            IncomingEthRequest::GetReceipts { response, .. } => {
459                let _ = response.send(Ok(reth_eth_wire_types::Receipts(vec![])));
460            }
461            IncomingEthRequest::GetReceipts69 { response, .. } => {
462                let _ = response.send(Ok(reth_eth_wire_types::Receipts69(vec![])));
463            }
464            IncomingEthRequest::GetReceipts70 { response, .. } => {
465                let _ = response.send(Ok(reth_eth_wire_types::Receipts70 {
466                    last_block_incomplete: false,
467                    receipts: vec![],
468                }));
469            }
470            IncomingEthRequest::GetBlockAccessLists { response, .. } => {
471                let _ = response.send(Ok(Default::default()));
472            }
473            IncomingEthRequest::GetCells { response, .. } => {
474                let _ = response.send(Ok(Default::default()));
475            }
476        }
477    }
478
479    Ok(())
480}
481
482/// Fetch a single block by number and insert it into the cache.
483#[cfg(test)]
484async fn fetch_and_cache_block(
485    provider: &impl Provider<TempoNetwork>,
486    cache: &mut BlockCache,
487    number: u64,
488) -> Result<()> {
489    let block = provider
490        .get_block_by_number(number.into())
491        .full()
492        .await
493        .context("rpc request failed")?
494        .ok_or_else(|| eyre::eyre!("block {number} not found"))?;
495
496    let hash = block.header.hash();
497    let header: TempoHeader = block.header.inner.inner.clone();
498    let body = tempo_primitives::BlockBody {
499        transactions: block
500            .transactions
501            .into_transactions()
502            .map(|tx| tx.into_inner())
503            .collect(),
504        ommers: vec![],
505        withdrawals: block.withdrawals,
506    };
507
508    cache.insert_block(number, hash, header, body);
509    Ok(())
510}
511
512async fn fetch_and_cache_header_by_hash(
513    provider: &impl Provider<TempoNetwork>,
514    cache: &mut BlockCache,
515    hash: B256,
516) -> Result<u64> {
517    let block = provider
518        .get_block_by_hash(hash)
519        .await
520        .context("rpc request failed")?
521        .ok_or_else(|| eyre::eyre!("block {hash} not found"))?;
522
523    let number = block.header.number();
524    let header: TempoHeader = block.header.inner.inner.clone();
525    cache.insert_header(number, block.header.hash(), header);
526    Ok(number)
527}
528
529async fn fetch_and_cache_header_by_number(
530    provider: &impl Provider<TempoNetwork>,
531    cache: &mut BlockCache,
532    number: u64,
533) -> Result<()> {
534    let block = provider
535        .get_block_by_number(number.into())
536        .await
537        .context("rpc request failed")?
538        .ok_or_else(|| eyre::eyre!("block {number} not found"))?;
539
540    let header: TempoHeader = block.header.inner.inner.clone();
541    cache.insert_header(block.header.number(), block.header.hash(), header);
542    Ok(())
543}
544
545async fn fetch_and_cache_header_batch(
546    provider: &impl Provider<TempoNetwork>,
547    cache: &mut BlockCache,
548    numbers: &[u64],
549) -> Result<()> {
550    let mut batch = BatchRequest::new(provider.client());
551    let mut waiters = Vec::with_capacity(numbers.len());
552
553    for &number in numbers {
554        let waiter = batch.add_call::<_, Option<TempoRpcBlock>>(
555            "eth_getBlockByNumber",
556            &(BlockNumberOrTag::Number(number), false),
557        )?;
558        waiters.push((number, waiter));
559    }
560
561    batch.send().await.context("failed to fetch header batch")?;
562
563    for (number, waiter) in waiters {
564        match waiter.await {
565            Ok(Some(block)) => {
566                let header: TempoHeader = block.header.inner.inner.clone();
567                cache.insert_header(block.header.number(), block.header.hash(), header);
568            }
569            Ok(None) => {
570                debug!(number, "header batch returned no block");
571            }
572            Err(err) => {
573                debug!(number, %err, "header batch waiter failed; falling back to single request");
574                let _ = fetch_and_cache_header_by_number(provider, cache, number).await;
575            }
576        }
577    }
578
579    Ok(())
580}
581
582async fn fetch_and_cache_headers(
583    provider: &impl Provider<TempoNetwork>,
584    cache: &mut BlockCache,
585    numbers: &[u64],
586) {
587    let missing_numbers: Vec<u64> = numbers
588        .iter()
589        .copied()
590        .filter(|number| cache.get_by_number(*number).is_none())
591        .collect();
592
593    for chunk in missing_numbers.chunks(HEADER_RPC_BATCH_SIZE) {
594        if fetch_and_cache_header_batch(provider, cache, chunk)
595            .await
596            .is_err()
597        {
598            for &number in chunk {
599                let _ = fetch_and_cache_header_by_number(provider, cache, number).await;
600            }
601        }
602    }
603}
604
605async fn resolve_start_block_number(
606    provider: &impl Provider<TempoNetwork>,
607    cache: &mut BlockCache,
608    start_block: BlockHashOrNumber,
609) -> Option<u64> {
610    match start_block {
611        BlockHashOrNumber::Number(number) => Some(number),
612        BlockHashOrNumber::Hash(hash) => {
613            if let Some(block) = cache.get_by_hash(&hash) {
614                return Some(block.header.number());
615            }
616
617            fetch_and_cache_header_by_hash(provider, cache, hash)
618                .await
619                .ok()
620        }
621    }
622}
623
624fn requested_header_numbers(
625    mut current: u64,
626    request: &reth_eth_wire_types::GetBlockHeaders,
627) -> Vec<u64> {
628    let limit = request.limit.min(MAX_HEADERS_SERVE as u64) as usize;
629    let mut numbers = Vec::with_capacity(limit);
630    let step = u64::from(request.skip) + 1;
631
632    for _ in 0..limit {
633        numbers.push(current);
634
635        match request.direction {
636            HeadersDirection::Rising => match current.checked_add(step) {
637                Some(next) => current = next,
638                None => break,
639            },
640            HeadersDirection::Falling => match current.checked_sub(step) {
641                Some(next) => current = next,
642                None => break,
643            },
644        }
645    }
646
647    numbers
648}
649
650/// Resolve a GetBlockHeaders request from cache, fetching missing blocks from RPC as needed.
651async fn resolve_headers(
652    provider: &impl Provider<TempoNetwork>,
653    cache: &mut BlockCache,
654    request: &reth_eth_wire_types::GetBlockHeaders,
655) -> Vec<TempoHeader> {
656    let Some(start_num) = resolve_start_block_number(provider, cache, request.start_block).await
657    else {
658        return Vec::new();
659    };
660
661    let requested_numbers = requested_header_numbers(start_num, request);
662    fetch_and_cache_headers(provider, cache, &requested_numbers).await;
663
664    let mut headers = Vec::with_capacity(requested_numbers.len());
665    for number in requested_numbers {
666        let Some(block) = cache.get_by_number(number) else {
667            break;
668        };
669        headers.push(block.header.clone());
670    }
671
672    headers
673}
674
675async fn fetch_body_by_hash(
676    provider: &impl Provider<TempoNetwork>,
677    cache: &mut BlockCache,
678    hash: B256,
679) -> Option<tempo_primitives::BlockBody> {
680    let block = provider
681        .get_block_by_hash(hash)
682        .full()
683        .await
684        .ok()
685        .flatten()?;
686    let number = block.header.number();
687    let header: TempoHeader = block.header.inner.inner.clone();
688    let body = tempo_primitives::BlockBody {
689        transactions: block
690            .transactions
691            .into_transactions()
692            .map(|tx| tx.into_inner())
693            .collect(),
694        ommers: vec![],
695        withdrawals: block.withdrawals,
696    };
697
698    cache.insert_block(number, hash, header, body.clone());
699    Some(body)
700}
701
702/// Resolve a GetBlockBodies request from cache, fetching missing blocks from RPC as needed.
703async fn resolve_bodies(
704    provider: &impl Provider<TempoNetwork>,
705    cache: &mut BlockCache,
706    hashes: &[B256],
707) -> Vec<tempo_primitives::BlockBody> {
708    let mut bodies = Vec::new();
709    let mut total_bytes = 0usize;
710
711    for &hash in hashes {
712        let body = match cache
713            .get_by_hash(&hash)
714            .and_then(|block| block.body.clone())
715        {
716            Some(body) => body,
717            None => match fetch_body_by_hash(provider, cache, hash).await {
718                Some(body) => body,
719                None => break,
720            },
721        };
722
723        // At least one body is served as they can be up to ~8MiB.
724        total_bytes = total_bytes.saturating_add(body.length());
725        bodies.push(body);
726
727        if total_bytes >= SOFT_BODY_RESPONSE_SIZE_LIMIT {
728            break;
729        }
730    }
731
732    bodies
733}
734
735#[cfg(test)]
736mod tests {
737    use super::*;
738    use alloy::{
739        consensus::{BlockHeader, Header},
740        primitives::Sealable,
741    };
742    use reth_eth_wire_types::GetBlockHeaders;
743
744    const MODERATO_RPC: &str = "https://rpc.moderato.tempo.xyz";
745
746    fn moderato_provider() -> impl Provider<TempoNetwork> {
747        ProviderBuilder::new_with_network::<TempoNetwork>()
748            .connect_http(MODERATO_RPC.parse().unwrap())
749    }
750
751    fn cached_body_with_min_size(min_size: usize) -> tempo_primitives::BlockBody {
752        let mut body = tempo_primitives::BlockBody::default();
753        while body.length() < min_size {
754            body.ommers.push(TempoHeader::default());
755        }
756        body
757    }
758
759    fn numbered_hash(number: u64) -> B256 {
760        let mut hash = [0u8; 32];
761        hash[24..].copy_from_slice(&number.to_be_bytes());
762        B256::from(hash)
763    }
764
765    fn insert_test_header(cache: &mut BlockCache, number: u64) {
766        let header = TempoHeader {
767            inner: Header {
768                number,
769                parent_hash: number.checked_sub(1).map_or(B256::default(), numbered_hash),
770                ..Default::default()
771            },
772            ..Default::default()
773        };
774        cache.insert_header(number, numbered_hash(number), header);
775    }
776
777    #[test]
778    fn requested_header_numbers_allows_reth_default_request_limit() {
779        let request = GetBlockHeaders {
780            start_block: BlockHashOrNumber::Number(10),
781            limit: 1_000,
782            skip: 0,
783            direction: HeadersDirection::Rising,
784        };
785
786        let numbers = requested_header_numbers(10, &request);
787
788        assert_eq!(numbers.len(), 1_000);
789        assert_eq!(numbers[0], 10);
790        assert_eq!(numbers[999], 1_009);
791    }
792
793    #[test]
794    fn requested_header_numbers_caps_at_protocol_header_limit() {
795        let request = GetBlockHeaders {
796            start_block: BlockHashOrNumber::Number(0),
797            limit: MAX_HEADERS_SERVE as u64 + 1,
798            skip: 0,
799            direction: HeadersDirection::Rising,
800        };
801
802        let numbers = requested_header_numbers(0, &request);
803
804        assert_eq!(numbers.len(), MAX_HEADERS_SERVE);
805        assert_eq!(numbers[MAX_HEADERS_SERVE - 1], MAX_HEADERS_SERVE as u64 - 1);
806    }
807
808    #[tokio::test]
809    async fn resolve_headers_serves_more_than_rpc_batch_size_when_cached() {
810        let provider = moderato_provider();
811        let mut cache = BlockCache::new(MAX_HEADERS_SERVE as u64);
812
813        for number in 1..=1_000 {
814            insert_test_header(&mut cache, number);
815        }
816
817        let request = GetBlockHeaders {
818            start_block: BlockHashOrNumber::Number(1),
819            limit: 1_000,
820            skip: 0,
821            direction: HeadersDirection::Rising,
822        };
823        let headers = resolve_headers(&provider, &mut cache, &request).await;
824
825        assert_eq!(headers.len(), 1_000);
826        assert_eq!(headers[0].number(), 1);
827        assert_eq!(headers[999].number(), 1_000);
828    }
829
830    #[tokio::test]
831    async fn resolve_bodies_stops_after_reaching_soft_size_limit() {
832        let provider = moderato_provider();
833        let mut cache = BlockCache::new(100);
834        let body = cached_body_with_min_size(SOFT_BODY_RESPONSE_SIZE_LIMIT / 2 + 1);
835        let first_hash = B256::with_last_byte(1);
836        let second_hash = B256::with_last_byte(2);
837        let third_hash = B256::with_last_byte(3);
838
839        cache.insert_block(1, first_hash, TempoHeader::default(), body.clone());
840        cache.insert_block(2, second_hash, TempoHeader::default(), body.clone());
841        cache.insert_block(3, third_hash, TempoHeader::default(), body);
842
843        let bodies = resolve_bodies(
844            &provider,
845            &mut cache,
846            &[first_hash, second_hash, third_hash],
847        )
848        .await;
849        assert_eq!(bodies.len(), 2);
850    }
851
852    #[tokio::test]
853    async fn resolve_bodies_serves_body_exceeding_soft_size_limit() {
854        let provider = moderato_provider();
855        let mut cache = BlockCache::new(100);
856        let body = cached_body_with_min_size(8 * SOFT_BODY_RESPONSE_SIZE_LIMIT);
857        let first_hash = B256::with_last_byte(1);
858        let second_hash = B256::with_last_byte(2);
859
860        cache.insert_block(1, first_hash, TempoHeader::default(), body.clone());
861        cache.insert_block(2, second_hash, TempoHeader::default(), body);
862
863        let bodies = resolve_bodies(&provider, &mut cache, &[first_hash, second_hash]).await;
864        assert_eq!(bodies.len(), 1);
865        assert!(bodies[0].length() > SOFT_BODY_RESPONSE_SIZE_LIMIT);
866    }
867
868    #[tokio::test]
869    async fn fetch_headers_and_bodies() {
870        let provider = moderato_provider();
871        let mut cache = BlockCache::new(100);
872
873        let latest = provider.get_block_number().await.unwrap();
874        let start = latest.saturating_sub(4);
875
876        // Fetch 5 rising headers
877        let request = GetBlockHeaders {
878            start_block: BlockHashOrNumber::Number(start),
879            limit: 5,
880            skip: 0,
881            direction: HeadersDirection::Rising,
882        };
883        let headers = resolve_headers(&provider, &mut cache, &request).await;
884        assert_eq!(headers.len(), 5);
885        for (i, header) in headers.iter().enumerate() {
886            assert_eq!(header.number(), start + i as u64);
887        }
888        // Parent hash chain should be consistent
889        for pair in headers.windows(2) {
890            assert_eq!(pair[1].parent_hash(), pair[0].hash_slow());
891        }
892
893        // Fetch bodies for the cached blocks
894        let hashes: Vec<B256> = (start..=latest)
895            .map(|n| cache.get_by_number(n).unwrap().hash)
896            .collect();
897        let bodies = resolve_bodies(&provider, &mut cache, &hashes).await;
898        assert_eq!(bodies.len(), 5);
899    }
900
901    #[tokio::test]
902    async fn fetch_body_by_hash_from_rpc() {
903        let provider = moderato_provider();
904        let mut cache = BlockCache::new(100);
905
906        // Learn a hash, then clear cache to force RPC fetch
907        let latest = provider.get_block_number().await.unwrap();
908        fetch_and_cache_block(&provider, &mut cache, latest)
909            .await
910            .unwrap();
911        let hash = cache.get_by_number(latest).unwrap().hash;
912        cache = BlockCache::new(100);
913
914        let bodies = resolve_bodies(&provider, &mut cache, &[hash]).await;
915        assert_eq!(bodies.len(), 1);
916        assert!(
917            cache.get_by_hash(&hash).is_some(),
918            "should be cached after fetch"
919        );
920    }
921
922    #[tokio::test]
923    async fn fetch_headers_by_hash_from_rpc_when_not_cached() {
924        let provider = moderato_provider();
925        let mut cache = BlockCache::new(100);
926
927        let latest = provider.get_block_number().await.unwrap();
928        let start = latest.saturating_sub(2);
929        let start_block = provider
930            .get_block_by_number(start.into())
931            .await
932            .unwrap()
933            .unwrap();
934        let start_hash = start_block.header.hash();
935
936        let request = GetBlockHeaders {
937            start_block: BlockHashOrNumber::Hash(start_hash),
938            limit: 3,
939            skip: 0,
940            direction: HeadersDirection::Rising,
941        };
942        let headers = resolve_headers(&provider, &mut cache, &request).await;
943
944        assert_eq!(headers.len(), 3);
945        assert_eq!(headers[0].number(), start);
946        assert_eq!(headers[0].hash_slow(), start_hash);
947        assert!(cache.get_by_hash(&start_hash).is_some());
948    }
949}