Skip to main content

tempo/
p2p_proxy.rs

1use alloy::{
2    consensus::BlockHeader,
3    eips::{BlockHashOrNumber, BlockNumberOrTag},
4    network::{Network, primitives::HeaderResponse},
5    primitives::B256,
6    providers::{Provider, ProviderBuilder},
7    rpc::client::BatchRequest,
8};
9use clap::Parser;
10use eyre::{Context, Result};
11use futures::StreamExt;
12use reth_chainspec::Head;
13use reth_cli_util::get_secret_key;
14use reth_eth_wire_types::{
15    HeadersDirection, PooledTransactions, primitives::BasicNetworkPrimitives,
16};
17use reth_ethereum::{
18    network::{
19        NetworkConfig, NetworkEventListenerProvider, NetworkInfo, NetworkManager, PeersConfig,
20        PeersInfo, eth_requests::IncomingEthRequest, transactions::NetworkTransactionEvent,
21    },
22    tasks::Runtime,
23};
24use std::{
25    collections::{BTreeMap, HashMap},
26    path::PathBuf,
27    sync::{
28        Arc,
29        atomic::{AtomicU64, Ordering},
30    },
31    time::Duration,
32};
33use tempo_alloy::TempoNetwork;
34use tempo_chainspec::spec::{TempoChainSpec, chain_value_parser};
35use tempo_primitives::{TempoHeader, TempoPrimitives, TempoTxEnvelope};
36use tokio::sync::{mpsc, oneshot};
37use tracing::{debug, error, info};
38
39/// Tempo-specific network primitives for the proxy node.
40type TempoNetPrimitives = BasicNetworkPrimitives<TempoPrimitives, TempoTxEnvelope>;
41type TempoRpcBlock = <TempoNetwork as Network>::BlockResponse;
42
43/// 3 hrs of blocks at 500ms block time.
44const CACHE_CAPACITY: u64 = 60 * 60 * 6; // 21600
45const HEADER_BATCH_SIZE: usize = 512;
46
47#[derive(Parser, Debug)]
48#[command(
49    about = "Run a proxy P2P node that serves cached block data fetched from an RPC endpoint"
50)]
51pub(crate) struct P2pProxyArgs {
52    /// RPC endpoint to fetch blocks from (HTTP or WebSocket).
53    #[arg(long, required = true)]
54    rpc_url: String,
55
56    /// Chain to connect to.
57    #[arg(long, default_value = "mainnet")]
58    chain: String,
59
60    /// Port for the P2P listener.
61    #[arg(long, default_value_t = 30303)]
62    port: u16,
63
64    /// Discovery port.
65    #[arg(long)]
66    discovery_port: Option<u16>,
67
68    /// Maximum number of inbound peer connections.
69    #[arg(long, default_value_t = 100)]
70    max_inbound: usize,
71
72    /// Maximum number of concurrent incoming connection attempts.
73    #[arg(long, default_value_t = 30)]
74    max_concurrent_inbound: usize,
75
76    /// Number of blocks to cache.
77    #[arg(long)]
78    cache_blocks: Option<u64>,
79
80    /// Path to the P2P secret key file. If the file doesn't exist, a new key is generated
81    /// and saved. If omitted, an ephemeral key is generated on each start.
82    #[arg(long)]
83    p2p_secret_key: Option<PathBuf>,
84}
85
86impl P2pProxyArgs {
87    pub(crate) async fn run(self) -> Result<()> {
88        let chain_spec = chain_value_parser(&self.chain)?;
89
90        // Fetch latest head from RPC for the network status handshake
91        let provider = ProviderBuilder::new_with_network::<TempoNetwork>()
92            .connect(&self.rpc_url)
93            .await
94            .context("failed to connect to RPC")?;
95        let latest_block = provider
96            .get_block_by_number(Default::default())
97            .await
98            .context("failed to fetch latest block")?
99            .ok_or_else(|| eyre::eyre!("latest block not found"))?;
100        let head = Head {
101            number: latest_block.header.number(),
102            hash: latest_block.header.hash(),
103            difficulty: latest_block.header.difficulty(),
104            total_difficulty: latest_block.header.difficulty(),
105            timestamp: latest_block.header.timestamp(),
106        };
107        info!(number = head.number, hash = %head.hash, "fetched latest head");
108
109        // Channel for the single fetcher service
110        let (fetch_tx, fetch_rx) = mpsc::channel::<FetchRequest>(256);
111
112        // Spawn the block fetcher service
113        let rpc_url = self.rpc_url.clone();
114        let cache_blocks = self.cache_blocks.unwrap_or(CACHE_CAPACITY);
115        tokio::spawn(async move {
116            if let Err(err) = run_fetcher_service(rpc_url, fetch_rx, cache_blocks).await {
117                error!(%err, "block fetcher service exited with error");
118            }
119        });
120
121        // Resolve the P2P secret key: load from file (creating if needed), or ephemeral
122        let secret_key = match &self.p2p_secret_key {
123            Some(path) => get_secret_key(path).context("failed to load P2P secret key")?,
124            None => reth_cli_util::load_secret_key::rng_secret_key(),
125        };
126
127        // Launch the P2P network
128        let net_cfg = NetConfig {
129            port: self.port,
130            discovery_port: self.discovery_port,
131            max_inbound: self.max_inbound,
132            max_concurrent_inbound: self.max_concurrent_inbound,
133            head,
134        };
135        run_p2p_network(chain_spec, net_cfg, fetch_tx, secret_key).await
136    }
137}
138
139/// Resolved network configuration passed to `run_p2p_network`.
140struct NetConfig {
141    port: u16,
142    discovery_port: Option<u16>,
143    max_inbound: usize,
144    max_concurrent_inbound: usize,
145    head: Head,
146}
147
148/// Shared request counters for periodic stats logging.
149struct RequestStats {
150    headers: AtomicU64,
151    bodies: AtomicU64,
152}
153
154/// Messages from the request handler to the single block-fetcher service.
155enum FetchRequest {
156    GetHeaders {
157        request: reth_eth_wire_types::GetBlockHeaders,
158        response: oneshot::Sender<Vec<TempoHeader>>,
159    },
160    GetBodies {
161        hashes: Vec<B256>,
162        response: oneshot::Sender<Vec<tempo_primitives::BlockBody>>,
163    },
164}
165
166/// A cached block: header + body, indexed by number and hash.
167struct BlockCache {
168    /// Blocks ordered by number.
169    by_number: BTreeMap<u64, CachedBlock>,
170    /// Hash -> block number index.
171    by_hash: HashMap<B256, u64>,
172    capacity: u64,
173}
174
175impl BlockCache {
176    fn new(capacity: u64) -> Self {
177        Self {
178            by_number: BTreeMap::new(),
179            by_hash: HashMap::new(),
180            capacity,
181        }
182    }
183
184    fn insert_header(&mut self, number: u64, hash: B256, header: TempoHeader) {
185        self.upsert(number, hash, header, None);
186    }
187
188    fn insert_block(
189        &mut self,
190        number: u64,
191        hash: B256,
192        header: TempoHeader,
193        body: tempo_primitives::BlockBody,
194    ) {
195        self.upsert(number, hash, header, Some(body));
196    }
197
198    fn upsert(
199        &mut self,
200        number: u64,
201        hash: B256,
202        header: TempoHeader,
203        body: Option<tempo_primitives::BlockBody>,
204    ) {
205        if let Some(old_hash) = self.by_number.get(&number).map(|block| block.hash)
206            && old_hash != hash
207        {
208            self.by_hash.remove(&old_hash);
209        }
210
211        let body = match self.by_number.remove(&number) {
212            Some(existing) => body.or(existing.body),
213            None => body,
214        };
215
216        self.by_number
217            .insert(number, CachedBlock { header, body, hash });
218        self.by_hash.insert(hash, number);
219        self.evict();
220    }
221
222    fn evict(&mut self) {
223        while self.by_number.len() as u64 > self.capacity {
224            if let Some((&oldest_num, _)) = self.by_number.iter().next()
225                && let Some(block) = self.by_number.remove(&oldest_num)
226            {
227                self.by_hash.remove(&block.hash);
228            }
229        }
230    }
231
232    fn get_by_number(&self, number: u64) -> Option<&CachedBlock> {
233        self.by_number.get(&number)
234    }
235
236    fn get_by_hash(&self, hash: &B256) -> Option<&CachedBlock> {
237        self.by_hash
238            .get(hash)
239            .and_then(|num| self.by_number.get(num))
240    }
241}
242
243#[derive(Clone)]
244struct CachedBlock {
245    header: TempoHeader,
246    body: Option<tempo_primitives::BlockBody>,
247    hash: B256,
248}
249
250/// Single block-fetcher service that owns the cache and handles all fetch requests.
251async fn run_fetcher_service(
252    rpc_url: String,
253    mut fetch_rx: mpsc::Receiver<FetchRequest>,
254    cache_blocks: u64,
255) -> Result<()> {
256    let provider = ProviderBuilder::new_with_network::<TempoNetwork>()
257        .connect(&rpc_url)
258        .await
259        .context("failed to connect to RPC")?;
260
261    let mut cache = BlockCache::new(cache_blocks);
262
263    // Process incoming requests
264    while let Some(req) = fetch_rx.recv().await {
265        match req {
266            FetchRequest::GetHeaders { request, response } => {
267                let headers = resolve_headers(&provider, &mut cache, &request).await;
268                let _ = response.send(headers);
269            }
270            FetchRequest::GetBodies { hashes, response } => {
271                let bodies = resolve_bodies(&provider, &mut cache, &hashes).await;
272                let _ = response.send(bodies);
273            }
274        }
275    }
276
277    Ok(())
278}
279
280/// Launch the P2P network and handle incoming eth requests.
281async fn run_p2p_network(
282    chain_spec: Arc<TempoChainSpec>,
283    cfg: NetConfig,
284    fetch_tx: mpsc::Sender<FetchRequest>,
285    secret_key: secp256k1::SecretKey,
286) -> Result<()> {
287    let peers_config = PeersConfig::default()
288        .with_max_inbound(cfg.max_inbound)
289        .with_max_outbound(0);
290
291    let mut builder = NetworkConfig::<_, TempoNetPrimitives>::builder(secret_key, Runtime::test())
292        .listener_port(cfg.port)
293        .disable_dns_discovery()
294        .disable_tx_gossip(true)
295        .peer_config(peers_config)
296        .set_head(cfg.head);
297
298    if let Some(dp) = cfg.discovery_port {
299        builder = builder.discovery_port(dp);
300    }
301
302    let mut config = builder.build_with_noop_provider(chain_spec);
303    config.sessions_config.session_event_buffer = cfg.max_concurrent_inbound;
304
305    let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1024);
306    let (transactions_tx, mut transactions_rx) = reth_metrics::common::mpsc::memory_bounded_channel(
307        config
308            .transactions_manager_config
309            .tx_channel_memory_limit_bytes,
310        "p2p-proxy.tx",
311    );
312
313    let network = NetworkManager::new(config)
314        .await
315        .context("failed to create network manager")?
316        .with_eth_request_handler(requests_tx)
317        .with_transactions(transactions_tx);
318
319    let handle = network.handle().clone();
320    info!(
321        peer_id = %handle.peer_id(),
322        local_addr = %handle.local_addr(),
323        max_inbound = cfg.max_inbound,
324        "P2P proxy node started"
325    );
326
327    // Print network events
328    let events_handle = handle.clone();
329    tokio::spawn(async move {
330        let mut events = events_handle.event_listener();
331        while let Some(event) = events.next().await {
332            debug!(?event, "network event");
333        }
334    });
335
336    // Drain transaction events — respond empty to all requests
337    tokio::spawn(async move {
338        while let Some(event) = transactions_rx.recv().await {
339            if let NetworkTransactionEvent::GetPooledTransactions { response, .. } = event {
340                let _ = response.send(Ok(PooledTransactions(vec![])));
341            }
342        }
343    });
344
345    // Spawn the network
346    tokio::spawn(network);
347
348    // Request stats for periodic logging
349    let stats = Arc::new(RequestStats {
350        headers: AtomicU64::new(0),
351        bodies: AtomicU64::new(0),
352    });
353
354    // Periodic stats logging
355    let stats_log = Arc::clone(&stats);
356    let stats_handle = handle.clone();
357    tokio::spawn(async move {
358        let mut interval = tokio::time::interval(Duration::from_secs(30));
359        interval.tick().await; // skip immediate first tick
360        loop {
361            interval.tick().await;
362            let h = stats_log.headers.load(Ordering::Relaxed);
363            let b = stats_log.bodies.load(Ordering::Relaxed);
364            let peers = stats_handle.num_connected_peers();
365            info!(peers, headers_served = h, bodies_served = b, "proxy stats");
366        }
367    });
368
369    // Handle incoming eth requests
370    while let Some(eth_request) = requests_rx.recv().await {
371        match eth_request {
372            IncomingEthRequest::GetBlockHeaders {
373                peer_id,
374                request,
375                response,
376            } => {
377                debug!(%peer_id, ?request, "received GetBlockHeaders");
378                stats.headers.fetch_add(1, Ordering::Relaxed);
379                let fetch_tx = fetch_tx.clone();
380                tokio::spawn(async move {
381                    let headers = async {
382                        let (tx, rx) = oneshot::channel();
383                        fetch_tx
384                            .send(FetchRequest::GetHeaders {
385                                request,
386                                response: tx,
387                            })
388                            .await
389                            .ok()?;
390                        rx.await.ok()
391                    }
392                    .await
393                    .unwrap_or_default();
394                    let _ = response.send(Ok(headers.into()));
395                });
396            }
397            IncomingEthRequest::GetBlockBodies {
398                peer_id,
399                request,
400                response,
401            } => {
402                debug!(%peer_id, ?request, "received GetBlockBodies");
403                stats.bodies.fetch_add(1, Ordering::Relaxed);
404                let fetch_tx = fetch_tx.clone();
405                tokio::spawn(async move {
406                    let bodies = async {
407                        let (tx, rx) = oneshot::channel();
408                        fetch_tx
409                            .send(FetchRequest::GetBodies {
410                                hashes: request.0,
411                                response: tx,
412                            })
413                            .await
414                            .ok()?;
415                        rx.await.ok()
416                    }
417                    .await
418                    .unwrap_or_default();
419                    let _ = response.send(Ok(bodies.into()));
420                });
421            }
422            // All other requests get empty responses
423            IncomingEthRequest::GetNodeData { response, .. } => {
424                let _ = response.send(Ok(Default::default()));
425            }
426            IncomingEthRequest::GetReceipts { response, .. } => {
427                let _ = response.send(Ok(reth_eth_wire_types::Receipts(vec![])));
428            }
429            IncomingEthRequest::GetReceipts69 { response, .. } => {
430                let _ = response.send(Ok(reth_eth_wire_types::Receipts69(vec![])));
431            }
432            IncomingEthRequest::GetReceipts70 { response, .. } => {
433                let _ = response.send(Ok(reth_eth_wire_types::Receipts70 {
434                    last_block_incomplete: false,
435                    receipts: vec![],
436                }));
437            }
438            IncomingEthRequest::GetBlockAccessLists { response, .. } => {
439                let _ = response.send(Ok(Default::default()));
440            }
441        }
442    }
443
444    Ok(())
445}
446
447/// Fetch a single block by number and insert it into the cache.
448#[cfg(test)]
449async fn fetch_and_cache_block(
450    provider: &impl Provider<TempoNetwork>,
451    cache: &mut BlockCache,
452    number: u64,
453) -> Result<()> {
454    let block = provider
455        .get_block_by_number(number.into())
456        .full()
457        .await
458        .context("rpc request failed")?
459        .ok_or_else(|| eyre::eyre!("block {number} not found"))?;
460
461    let hash = block.header.hash();
462    let header: TempoHeader = block.header.inner.inner.clone();
463    let body = tempo_primitives::BlockBody {
464        transactions: block
465            .transactions
466            .into_transactions()
467            .map(|tx| tx.into_inner())
468            .collect(),
469        ommers: vec![],
470        withdrawals: block.withdrawals,
471    };
472
473    cache.insert_block(number, hash, header, body);
474    Ok(())
475}
476
477async fn fetch_and_cache_header_by_hash(
478    provider: &impl Provider<TempoNetwork>,
479    cache: &mut BlockCache,
480    hash: B256,
481) -> Result<u64> {
482    let block = provider
483        .get_block_by_hash(hash)
484        .await
485        .context("rpc request failed")?
486        .ok_or_else(|| eyre::eyre!("block {hash} not found"))?;
487
488    let number = block.header.number();
489    let header: TempoHeader = block.header.inner.inner.clone();
490    cache.insert_header(number, block.header.hash(), header);
491    Ok(number)
492}
493
494async fn fetch_and_cache_header_by_number(
495    provider: &impl Provider<TempoNetwork>,
496    cache: &mut BlockCache,
497    number: u64,
498) -> Result<()> {
499    let block = provider
500        .get_block_by_number(number.into())
501        .await
502        .context("rpc request failed")?
503        .ok_or_else(|| eyre::eyre!("block {number} not found"))?;
504
505    let header: TempoHeader = block.header.inner.inner.clone();
506    cache.insert_header(block.header.number(), block.header.hash(), header);
507    Ok(())
508}
509
510async fn fetch_and_cache_header_batch(
511    provider: &impl Provider<TempoNetwork>,
512    cache: &mut BlockCache,
513    numbers: &[u64],
514) -> Result<()> {
515    let mut batch = BatchRequest::new(provider.client());
516    let mut waiters = Vec::with_capacity(numbers.len());
517
518    for &number in numbers {
519        let waiter = batch.add_call::<_, Option<TempoRpcBlock>>(
520            "eth_getBlockByNumber",
521            &(BlockNumberOrTag::Number(number), false),
522        )?;
523        waiters.push((number, waiter));
524    }
525
526    batch.send().await.context("failed to fetch header batch")?;
527
528    for (number, waiter) in waiters {
529        match waiter.await {
530            Ok(Some(block)) => {
531                let header: TempoHeader = block.header.inner.inner.clone();
532                cache.insert_header(block.header.number(), block.header.hash(), header);
533            }
534            Ok(None) => {
535                debug!(number, "header batch returned no block");
536            }
537            Err(err) => {
538                debug!(number, %err, "header batch waiter failed; falling back to single request");
539                let _ = fetch_and_cache_header_by_number(provider, cache, number).await;
540            }
541        }
542    }
543
544    Ok(())
545}
546
547async fn fetch_and_cache_headers(
548    provider: &impl Provider<TempoNetwork>,
549    cache: &mut BlockCache,
550    numbers: &[u64],
551) {
552    let missing_numbers: Vec<u64> = numbers
553        .iter()
554        .copied()
555        .filter(|number| cache.get_by_number(*number).is_none())
556        .collect();
557
558    for chunk in missing_numbers.chunks(HEADER_BATCH_SIZE) {
559        if fetch_and_cache_header_batch(provider, cache, chunk)
560            .await
561            .is_err()
562        {
563            for &number in chunk {
564                let _ = fetch_and_cache_header_by_number(provider, cache, number).await;
565            }
566        }
567    }
568}
569
570async fn resolve_start_block_number(
571    provider: &impl Provider<TempoNetwork>,
572    cache: &mut BlockCache,
573    start_block: BlockHashOrNumber,
574) -> Option<u64> {
575    match start_block {
576        BlockHashOrNumber::Number(number) => Some(number),
577        BlockHashOrNumber::Hash(hash) => {
578            if let Some(block) = cache.get_by_hash(&hash) {
579                return Some(block.header.number());
580            }
581
582            fetch_and_cache_header_by_hash(provider, cache, hash)
583                .await
584                .ok()
585        }
586    }
587}
588
589fn requested_header_numbers(
590    mut current: u64,
591    request: &reth_eth_wire_types::GetBlockHeaders,
592) -> Vec<u64> {
593    let limit = request.limit.min(HEADER_BATCH_SIZE as u64) as usize;
594    let mut numbers = Vec::with_capacity(limit);
595    let step = u64::from(request.skip) + 1;
596
597    for _ in 0..limit {
598        numbers.push(current);
599
600        match request.direction {
601            HeadersDirection::Rising => match current.checked_add(step) {
602                Some(next) => current = next,
603                None => break,
604            },
605            HeadersDirection::Falling => match current.checked_sub(step) {
606                Some(next) => current = next,
607                None => break,
608            },
609        }
610    }
611
612    numbers
613}
614
615/// Resolve a GetBlockHeaders request from cache, fetching missing blocks from RPC as needed.
616async fn resolve_headers(
617    provider: &impl Provider<TempoNetwork>,
618    cache: &mut BlockCache,
619    request: &reth_eth_wire_types::GetBlockHeaders,
620) -> Vec<TempoHeader> {
621    let Some(start_num) = resolve_start_block_number(provider, cache, request.start_block).await
622    else {
623        return Vec::new();
624    };
625
626    let requested_numbers = requested_header_numbers(start_num, request);
627    fetch_and_cache_headers(provider, cache, &requested_numbers).await;
628
629    let mut headers = Vec::with_capacity(requested_numbers.len());
630    for number in requested_numbers {
631        let Some(block) = cache.get_by_number(number) else {
632            break;
633        };
634        headers.push(block.header.clone());
635    }
636
637    headers
638}
639
640async fn fetch_body_by_hash(
641    provider: &impl Provider<TempoNetwork>,
642    cache: &mut BlockCache,
643    hash: B256,
644) -> Option<tempo_primitives::BlockBody> {
645    let block = provider
646        .get_block_by_hash(hash)
647        .full()
648        .await
649        .ok()
650        .flatten()?;
651    let number = block.header.number();
652    let header: TempoHeader = block.header.inner.inner.clone();
653    let body = tempo_primitives::BlockBody {
654        transactions: block
655            .transactions
656            .into_transactions()
657            .map(|tx| tx.into_inner())
658            .collect(),
659        ommers: vec![],
660        withdrawals: block.withdrawals,
661    };
662
663    cache.insert_block(number, hash, header, body.clone());
664    Some(body)
665}
666
667/// Resolve a GetBlockBodies request from cache, fetching missing blocks from RPC as needed.
668async fn resolve_bodies(
669    provider: &impl Provider<TempoNetwork>,
670    cache: &mut BlockCache,
671    hashes: &[B256],
672) -> Vec<tempo_primitives::BlockBody> {
673    let mut bodies = Vec::new();
674
675    for &hash in hashes {
676        if let Some(body) = cache
677            .get_by_hash(&hash)
678            .and_then(|block| block.body.clone())
679        {
680            bodies.push(body);
681            continue;
682        }
683
684        if let Some(body) = fetch_body_by_hash(provider, cache, hash).await {
685            bodies.push(body);
686        }
687    }
688
689    bodies
690}
691
692#[cfg(test)]
693mod tests {
694    use super::*;
695    use alloy::{consensus::BlockHeader, primitives::Sealable};
696    use reth_eth_wire_types::GetBlockHeaders;
697
698    const MODERATO_RPC: &str = "https://rpc.moderato.tempo.xyz";
699
700    fn moderato_provider() -> impl Provider<TempoNetwork> {
701        ProviderBuilder::new_with_network::<TempoNetwork>()
702            .connect_http(MODERATO_RPC.parse().unwrap())
703    }
704
705    #[tokio::test]
706    async fn fetch_headers_and_bodies() {
707        let provider = moderato_provider();
708        let mut cache = BlockCache::new(100);
709
710        let latest = provider.get_block_number().await.unwrap();
711        let start = latest.saturating_sub(4);
712
713        // Fetch 5 rising headers
714        let request = GetBlockHeaders {
715            start_block: BlockHashOrNumber::Number(start),
716            limit: 5,
717            skip: 0,
718            direction: HeadersDirection::Rising,
719        };
720        let headers = resolve_headers(&provider, &mut cache, &request).await;
721        assert_eq!(headers.len(), 5);
722        for (i, header) in headers.iter().enumerate() {
723            assert_eq!(header.number(), start + i as u64);
724        }
725        // Parent hash chain should be consistent
726        for pair in headers.windows(2) {
727            assert_eq!(pair[1].parent_hash(), pair[0].hash_slow());
728        }
729
730        // Fetch bodies for the cached blocks
731        let hashes: Vec<B256> = (start..=latest)
732            .map(|n| cache.get_by_number(n).unwrap().hash)
733            .collect();
734        let bodies = resolve_bodies(&provider, &mut cache, &hashes).await;
735        assert_eq!(bodies.len(), 5);
736    }
737
738    #[tokio::test]
739    async fn fetch_body_by_hash_from_rpc() {
740        let provider = moderato_provider();
741        let mut cache = BlockCache::new(100);
742
743        // Learn a hash, then clear cache to force RPC fetch
744        let latest = provider.get_block_number().await.unwrap();
745        fetch_and_cache_block(&provider, &mut cache, latest)
746            .await
747            .unwrap();
748        let hash = cache.get_by_number(latest).unwrap().hash;
749        cache = BlockCache::new(100);
750
751        let bodies = resolve_bodies(&provider, &mut cache, &[hash]).await;
752        assert_eq!(bodies.len(), 1);
753        assert!(
754            cache.get_by_hash(&hash).is_some(),
755            "should be cached after fetch"
756        );
757    }
758
759    #[tokio::test]
760    async fn fetch_headers_by_hash_from_rpc_when_not_cached() {
761        let provider = moderato_provider();
762        let mut cache = BlockCache::new(100);
763
764        let latest = provider.get_block_number().await.unwrap();
765        let start = latest.saturating_sub(2);
766        let start_block = provider
767            .get_block_by_number(start.into())
768            .await
769            .unwrap()
770            .unwrap();
771        let start_hash = start_block.header.hash();
772
773        let request = GetBlockHeaders {
774            start_block: BlockHashOrNumber::Hash(start_hash),
775            limit: 3,
776            skip: 0,
777            direction: HeadersDirection::Rising,
778        };
779        let headers = resolve_headers(&provider, &mut cache, &request).await;
780
781        assert_eq!(headers.len(), 3);
782        assert_eq!(headers[0].number(), start);
783        assert_eq!(headers[0].hash_slow(), start_hash);
784        assert!(cache.get_by_hash(&start_hash).is_some());
785    }
786}