1use alloy::{
2 consensus::BlockHeader,
3 eips::{BlockHashOrNumber, BlockNumberOrTag},
4 network::{Network, primitives::HeaderResponse},
5 primitives::B256,
6 providers::{Provider, ProviderBuilder},
7 rpc::client::BatchRequest,
8};
9use alloy_rlp::Encodable;
10use clap::Parser;
11use eyre::{Context, Result};
12use futures::StreamExt;
13use reth_chainspec::Head;
14use reth_cli_util::get_secret_key;
15use reth_eth_wire_types::{
16 HeadersDirection, PooledTransactions, primitives::BasicNetworkPrimitives,
17};
18use reth_ethereum::{
19 network::{
20 NetworkConfig, NetworkEventListenerProvider, NetworkInfo, NetworkManager, PeersConfig,
21 PeersInfo, eth_requests::IncomingEthRequest, transactions::NetworkTransactionEvent,
22 },
23 tasks::Runtime,
24};
25use std::{
26 collections::{BTreeMap, HashMap},
27 path::PathBuf,
28 sync::{
29 Arc,
30 atomic::{AtomicU64, Ordering},
31 },
32 time::Duration,
33};
34use tempo_alloy::TempoNetwork;
35use tempo_chainspec::spec::{TempoChainSpec, chain_value_parser};
36use tempo_primitives::{TempoHeader, TempoPrimitives, TempoTxEnvelope};
37use tokio::sync::{mpsc, oneshot};
38use tracing::{debug, error, info};
39
40type TempoNetPrimitives = BasicNetworkPrimitives<TempoPrimitives, TempoTxEnvelope>;
42type TempoRpcBlock = <TempoNetwork as Network>::BlockResponse;
43
44const CACHE_CAPACITY: u64 = 60 * 60 * 6; const HEADER_RPC_BATCH_SIZE: usize = 128;
48const MAX_HEADERS_SERVE: usize = 1024;
50const SOFT_BODY_RESPONSE_SIZE_LIMIT: usize = 1024 * 1024; #[derive(Parser, Debug)]
54#[command(
55 about = "Run a proxy P2P node that serves cached block data fetched from an RPC endpoint"
56)]
57pub struct P2pProxyArgs {
58 #[arg(long, required = true)]
60 rpc_url: String,
61
62 #[arg(long, default_value = "mainnet")]
64 chain: String,
65
66 #[arg(long, default_value_t = 30303)]
68 port: u16,
69
70 #[arg(long)]
72 discovery_port: Option<u16>,
73
74 #[arg(long, default_value_t = 100)]
76 max_inbound: usize,
77
78 #[arg(long, default_value_t = 30)]
80 max_concurrent_inbound: usize,
81
82 #[arg(long)]
84 cache_blocks: Option<u64>,
85
86 #[arg(long)]
89 p2p_secret_key: Option<PathBuf>,
90}
91
92impl P2pProxyArgs {
93 pub(crate) async fn run(self) -> Result<()> {
94 let chain_spec = chain_value_parser(&self.chain)?;
95
96 let provider = ProviderBuilder::new_with_network::<TempoNetwork>()
98 .connect(&self.rpc_url)
99 .await
100 .context("failed to connect to RPC")?;
101 let latest_block = provider
102 .get_block_by_number(Default::default())
103 .await
104 .context("failed to fetch latest block")?
105 .ok_or_else(|| eyre::eyre!("latest block not found"))?;
106 let head = Head {
107 number: latest_block.header.number(),
108 hash: latest_block.header.hash(),
109 difficulty: latest_block.header.difficulty(),
110 total_difficulty: latest_block.header.difficulty(),
111 timestamp: latest_block.header.timestamp(),
112 };
113 info!(number = head.number, hash = %head.hash, "fetched latest head");
114
115 let (fetch_tx, fetch_rx) = mpsc::channel::<FetchRequest>(256);
117
118 let rpc_url = self.rpc_url.clone();
120 let cache_blocks = self.cache_blocks.unwrap_or(CACHE_CAPACITY);
121 tokio::spawn(async move {
122 if let Err(err) = run_fetcher_service(rpc_url, fetch_rx, cache_blocks).await {
123 error!(%err, "block fetcher service exited with error");
124 }
125 });
126
127 let secret_key = match &self.p2p_secret_key {
129 Some(path) => get_secret_key(path).context("failed to load P2P secret key")?,
130 None => reth_cli_util::load_secret_key::rng_secret_key(),
131 };
132
133 let net_cfg = NetConfig {
135 port: self.port,
136 discovery_port: self.discovery_port,
137 max_inbound: self.max_inbound,
138 max_concurrent_inbound: self.max_concurrent_inbound,
139 head,
140 };
141 run_p2p_network(chain_spec, net_cfg, fetch_tx, secret_key).await
142 }
143}
144
145struct NetConfig {
147 port: u16,
148 discovery_port: Option<u16>,
149 max_inbound: usize,
150 max_concurrent_inbound: usize,
151 head: Head,
152}
153
/// Counters for the periodic "proxy stats" log line.
struct RequestStats {
    /// Number of `GetBlockHeaders` requests received.
    header_requests_received: AtomicU64,
    /// Number of `GetBlockBodies` requests received.
    body_requests_received: AtomicU64,
    /// Number of headers handed back to peers.
    headers_served: AtomicU64,
    /// Number of bodies handed back to peers.
    bodies_served: AtomicU64,
}
161
162enum FetchRequest {
164 GetHeaders {
165 request: reth_eth_wire_types::GetBlockHeaders,
166 response: oneshot::Sender<Vec<TempoHeader>>,
167 },
168 GetBodies {
169 hashes: Vec<B256>,
170 response: oneshot::Sender<Vec<tempo_primitives::BlockBody>>,
171 },
172}
173
174struct BlockCache {
176 by_number: BTreeMap<u64, CachedBlock>,
178 by_hash: HashMap<B256, u64>,
180 capacity: u64,
181}
182
183impl BlockCache {
184 fn new(capacity: u64) -> Self {
185 Self {
186 by_number: BTreeMap::new(),
187 by_hash: HashMap::new(),
188 capacity,
189 }
190 }
191
192 fn insert_header(&mut self, number: u64, hash: B256, header: TempoHeader) {
193 self.upsert(number, hash, header, None);
194 }
195
196 fn insert_block(
197 &mut self,
198 number: u64,
199 hash: B256,
200 header: TempoHeader,
201 body: tempo_primitives::BlockBody,
202 ) {
203 self.upsert(number, hash, header, Some(body));
204 }
205
206 fn upsert(
207 &mut self,
208 number: u64,
209 hash: B256,
210 header: TempoHeader,
211 body: Option<tempo_primitives::BlockBody>,
212 ) {
213 if let Some(old_hash) = self.by_number.get(&number).map(|block| block.hash)
214 && old_hash != hash
215 {
216 self.by_hash.remove(&old_hash);
217 }
218
219 let body = match self.by_number.remove(&number) {
220 Some(existing) => body.or(existing.body),
221 None => body,
222 };
223
224 self.by_number
225 .insert(number, CachedBlock { header, body, hash });
226 self.by_hash.insert(hash, number);
227 self.evict();
228 }
229
230 fn evict(&mut self) {
231 while self.by_number.len() as u64 > self.capacity {
232 if let Some((_, block)) = self.by_number.pop_first() {
233 self.by_hash.remove(&block.hash);
234 }
235 }
236 }
237
238 fn get_by_number(&self, number: u64) -> Option<&CachedBlock> {
239 self.by_number.get(&number)
240 }
241
242 fn get_by_hash(&self, hash: &B256) -> Option<&CachedBlock> {
243 self.by_hash
244 .get(hash)
245 .and_then(|num| self.by_number.get(num))
246 }
247}
248
249#[derive(Clone)]
250struct CachedBlock {
251 header: TempoHeader,
252 body: Option<tempo_primitives::BlockBody>,
253 hash: B256,
254}
255
256async fn run_fetcher_service(
258 rpc_url: String,
259 mut fetch_rx: mpsc::Receiver<FetchRequest>,
260 cache_blocks: u64,
261) -> Result<()> {
262 let provider = ProviderBuilder::new_with_network::<TempoNetwork>()
263 .connect(&rpc_url)
264 .await
265 .context("failed to connect to RPC")?;
266
267 let mut cache = BlockCache::new(cache_blocks);
268
269 while let Some(req) = fetch_rx.recv().await {
271 match req {
272 FetchRequest::GetHeaders { request, response } => {
273 let headers = resolve_headers(&provider, &mut cache, &request).await;
274 let _ = response.send(headers);
275 }
276 FetchRequest::GetBodies { hashes, response } => {
277 let bodies = resolve_bodies(&provider, &mut cache, &hashes).await;
278 let _ = response.send(bodies);
279 }
280 }
281 }
282
283 Ok(())
284}
285
286async fn run_p2p_network(
288 chain_spec: Arc<TempoChainSpec>,
289 cfg: NetConfig,
290 fetch_tx: mpsc::Sender<FetchRequest>,
291 secret_key: secp256k1::SecretKey,
292) -> Result<()> {
293 let peers_config = PeersConfig::default()
294 .with_max_inbound(cfg.max_inbound)
295 .with_max_outbound(0);
296
297 let mut builder = NetworkConfig::<_, TempoNetPrimitives>::builder(secret_key, Runtime::test())
298 .listener_port(cfg.port)
299 .disable_dns_discovery()
300 .disable_tx_gossip(true)
301 .peer_config(peers_config)
302 .set_head(cfg.head);
303
304 if let Some(dp) = cfg.discovery_port {
305 builder = builder.discovery_port(dp);
306 }
307
308 let mut config = builder.build_with_noop_provider(chain_spec);
309 config.sessions_config.session_event_buffer = cfg.max_concurrent_inbound;
310
311 let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1024);
312 let (transactions_tx, mut transactions_rx) = reth_metrics::common::mpsc::memory_bounded_channel(
313 config
314 .transactions_manager_config
315 .tx_channel_memory_limit_bytes,
316 "p2p-proxy.tx",
317 );
318
319 let network = NetworkManager::new(config)
320 .await
321 .context("failed to create network manager")?
322 .with_eth_request_handler(requests_tx)
323 .with_transactions(transactions_tx);
324
325 let handle = network.handle().clone();
326 info!(
327 peer_id = %handle.peer_id(),
328 local_addr = %handle.local_addr(),
329 max_inbound = cfg.max_inbound,
330 "P2P proxy node started"
331 );
332
333 let events_handle = handle.clone();
335 tokio::spawn(async move {
336 let mut events = events_handle.event_listener();
337 while let Some(event) = events.next().await {
338 debug!(?event, "network event");
339 }
340 });
341
342 tokio::spawn(async move {
344 while let Some(event) = transactions_rx.recv().await {
345 if let NetworkTransactionEvent::GetPooledTransactions { response, .. } = event {
346 let _ = response.send(Ok(PooledTransactions(vec![])));
347 }
348 }
349 });
350
351 tokio::spawn(network);
353
354 let stats = Arc::new(RequestStats {
356 header_requests_received: AtomicU64::new(0),
357 body_requests_received: AtomicU64::new(0),
358 headers_served: AtomicU64::new(0),
359 bodies_served: AtomicU64::new(0),
360 });
361
362 let stats_log = Arc::clone(&stats);
364 let stats_handle = handle.clone();
365 tokio::spawn(async move {
366 let mut interval = tokio::time::interval(Duration::from_secs(30));
367 interval.tick().await; loop {
369 interval.tick().await;
370 let header_requests_received =
371 stats_log.header_requests_received.load(Ordering::Relaxed);
372 let body_requests_received = stats_log.body_requests_received.load(Ordering::Relaxed);
373 let headers_served = stats_log.headers_served.load(Ordering::Relaxed);
374 let bodies_served = stats_log.bodies_served.load(Ordering::Relaxed);
375 let peers = stats_handle.num_connected_peers();
376 info!(
377 peers,
378 header_requests_received,
379 body_requests_received,
380 headers_served,
381 bodies_served,
382 "proxy stats"
383 );
384 }
385 });
386
387 while let Some(eth_request) = requests_rx.recv().await {
389 match eth_request {
390 IncomingEthRequest::GetBlockHeaders {
391 peer_id,
392 request,
393 response,
394 } => {
395 debug!(%peer_id, ?request, "received GetBlockHeaders");
396 stats
397 .header_requests_received
398 .fetch_add(1, Ordering::Relaxed);
399 let fetch_tx = fetch_tx.clone();
400 let stats = Arc::clone(&stats);
401 tokio::spawn(async move {
402 let headers = async {
403 let (tx, rx) = oneshot::channel();
404 fetch_tx
405 .send(FetchRequest::GetHeaders {
406 request,
407 response: tx,
408 })
409 .await
410 .ok()?;
411 rx.await.ok()
412 }
413 .await
414 .unwrap_or_default();
415 let headers_served = headers.len() as u64;
416 if response.send(Ok(headers.into())).is_ok() {
417 stats
418 .headers_served
419 .fetch_add(headers_served, Ordering::Relaxed);
420 }
421 });
422 }
423 IncomingEthRequest::GetBlockBodies {
424 peer_id,
425 request,
426 response,
427 } => {
428 debug!(%peer_id, ?request, "received GetBlockBodies");
429 stats.body_requests_received.fetch_add(1, Ordering::Relaxed);
430 let fetch_tx = fetch_tx.clone();
431 let stats = Arc::clone(&stats);
432 tokio::spawn(async move {
433 let bodies = async {
434 let (tx, rx) = oneshot::channel();
435 fetch_tx
436 .send(FetchRequest::GetBodies {
437 hashes: request.0,
438 response: tx,
439 })
440 .await
441 .ok()?;
442 rx.await.ok()
443 }
444 .await
445 .unwrap_or_default();
446 let bodies_served = bodies.len() as u64;
447 if response.send(Ok(bodies.into())).is_ok() {
448 stats
449 .bodies_served
450 .fetch_add(bodies_served, Ordering::Relaxed);
451 }
452 });
453 }
454 IncomingEthRequest::GetNodeData { response, .. } => {
456 let _ = response.send(Ok(Default::default()));
457 }
458 IncomingEthRequest::GetReceipts { response, .. } => {
459 let _ = response.send(Ok(reth_eth_wire_types::Receipts(vec![])));
460 }
461 IncomingEthRequest::GetReceipts69 { response, .. } => {
462 let _ = response.send(Ok(reth_eth_wire_types::Receipts69(vec![])));
463 }
464 IncomingEthRequest::GetReceipts70 { response, .. } => {
465 let _ = response.send(Ok(reth_eth_wire_types::Receipts70 {
466 last_block_incomplete: false,
467 receipts: vec![],
468 }));
469 }
470 IncomingEthRequest::GetBlockAccessLists { response, .. } => {
471 let _ = response.send(Ok(Default::default()));
472 }
473 IncomingEthRequest::GetCells { response, .. } => {
474 let _ = response.send(Ok(Default::default()));
475 }
476 }
477 }
478
479 Ok(())
480}
481
/// Test helper: fetches the full block `number` over RPC and caches header and body.
///
/// # Errors
///
/// Fails when the RPC request fails or the block does not exist.
#[cfg(test)]
async fn fetch_and_cache_block(
    provider: &impl Provider<TempoNetwork>,
    cache: &mut BlockCache,
    number: u64,
) -> Result<()> {
    let lookup = provider
        .get_block_by_number(number.into())
        .full()
        .await
        .context("rpc request failed")?;
    let Some(block) = lookup else {
        return Err(eyre::eyre!("block {number} not found"));
    };

    let hash = block.header.hash();
    let header: TempoHeader = block.header.inner.inner.clone();

    let transactions = block
        .transactions
        .into_transactions()
        .map(|tx| tx.into_inner())
        .collect();
    let body = tempo_primitives::BlockBody {
        transactions,
        ommers: vec![],
        withdrawals: block.withdrawals,
    };

    cache.insert_block(number, hash, header, body);
    Ok(())
}
511
512async fn fetch_and_cache_header_by_hash(
513 provider: &impl Provider<TempoNetwork>,
514 cache: &mut BlockCache,
515 hash: B256,
516) -> Result<u64> {
517 let block = provider
518 .get_block_by_hash(hash)
519 .await
520 .context("rpc request failed")?
521 .ok_or_else(|| eyre::eyre!("block {hash} not found"))?;
522
523 let number = block.header.number();
524 let header: TempoHeader = block.header.inner.inner.clone();
525 cache.insert_header(number, block.header.hash(), header);
526 Ok(number)
527}
528
529async fn fetch_and_cache_header_by_number(
530 provider: &impl Provider<TempoNetwork>,
531 cache: &mut BlockCache,
532 number: u64,
533) -> Result<()> {
534 let block = provider
535 .get_block_by_number(number.into())
536 .await
537 .context("rpc request failed")?
538 .ok_or_else(|| eyre::eyre!("block {number} not found"))?;
539
540 let header: TempoHeader = block.header.inner.inner.clone();
541 cache.insert_header(block.header.number(), block.header.hash(), header);
542 Ok(())
543}
544
545async fn fetch_and_cache_header_batch(
546 provider: &impl Provider<TempoNetwork>,
547 cache: &mut BlockCache,
548 numbers: &[u64],
549) -> Result<()> {
550 let mut batch = BatchRequest::new(provider.client());
551 let mut waiters = Vec::with_capacity(numbers.len());
552
553 for &number in numbers {
554 let waiter = batch.add_call::<_, Option<TempoRpcBlock>>(
555 "eth_getBlockByNumber",
556 &(BlockNumberOrTag::Number(number), false),
557 )?;
558 waiters.push((number, waiter));
559 }
560
561 batch.send().await.context("failed to fetch header batch")?;
562
563 for (number, waiter) in waiters {
564 match waiter.await {
565 Ok(Some(block)) => {
566 let header: TempoHeader = block.header.inner.inner.clone();
567 cache.insert_header(block.header.number(), block.header.hash(), header);
568 }
569 Ok(None) => {
570 debug!(number, "header batch returned no block");
571 }
572 Err(err) => {
573 debug!(number, %err, "header batch waiter failed; falling back to single request");
574 let _ = fetch_and_cache_header_by_number(provider, cache, number).await;
575 }
576 }
577 }
578
579 Ok(())
580}
581
582async fn fetch_and_cache_headers(
583 provider: &impl Provider<TempoNetwork>,
584 cache: &mut BlockCache,
585 numbers: &[u64],
586) {
587 let missing_numbers: Vec<u64> = numbers
588 .iter()
589 .copied()
590 .filter(|number| cache.get_by_number(*number).is_none())
591 .collect();
592
593 for chunk in missing_numbers.chunks(HEADER_RPC_BATCH_SIZE) {
594 if fetch_and_cache_header_batch(provider, cache, chunk)
595 .await
596 .is_err()
597 {
598 for &number in chunk {
599 let _ = fetch_and_cache_header_by_number(provider, cache, number).await;
600 }
601 }
602 }
603}
604
605async fn resolve_start_block_number(
606 provider: &impl Provider<TempoNetwork>,
607 cache: &mut BlockCache,
608 start_block: BlockHashOrNumber,
609) -> Option<u64> {
610 match start_block {
611 BlockHashOrNumber::Number(number) => Some(number),
612 BlockHashOrNumber::Hash(hash) => {
613 if let Some(block) = cache.get_by_hash(&hash) {
614 return Some(block.header.number());
615 }
616
617 fetch_and_cache_header_by_hash(provider, cache, hash)
618 .await
619 .ok()
620 }
621 }
622}
623
624fn requested_header_numbers(
625 mut current: u64,
626 request: &reth_eth_wire_types::GetBlockHeaders,
627) -> Vec<u64> {
628 let limit = request.limit.min(MAX_HEADERS_SERVE as u64) as usize;
629 let mut numbers = Vec::with_capacity(limit);
630 let step = u64::from(request.skip) + 1;
631
632 for _ in 0..limit {
633 numbers.push(current);
634
635 match request.direction {
636 HeadersDirection::Rising => match current.checked_add(step) {
637 Some(next) => current = next,
638 None => break,
639 },
640 HeadersDirection::Falling => match current.checked_sub(step) {
641 Some(next) => current = next,
642 None => break,
643 },
644 }
645 }
646
647 numbers
648}
649
650async fn resolve_headers(
652 provider: &impl Provider<TempoNetwork>,
653 cache: &mut BlockCache,
654 request: &reth_eth_wire_types::GetBlockHeaders,
655) -> Vec<TempoHeader> {
656 let Some(start_num) = resolve_start_block_number(provider, cache, request.start_block).await
657 else {
658 return Vec::new();
659 };
660
661 let requested_numbers = requested_header_numbers(start_num, request);
662 fetch_and_cache_headers(provider, cache, &requested_numbers).await;
663
664 let mut headers = Vec::with_capacity(requested_numbers.len());
665 for number in requested_numbers {
666 let Some(block) = cache.get_by_number(number) else {
667 break;
668 };
669 headers.push(block.header.clone());
670 }
671
672 headers
673}
674
675async fn fetch_body_by_hash(
676 provider: &impl Provider<TempoNetwork>,
677 cache: &mut BlockCache,
678 hash: B256,
679) -> Option<tempo_primitives::BlockBody> {
680 let block = provider
681 .get_block_by_hash(hash)
682 .full()
683 .await
684 .ok()
685 .flatten()?;
686 let number = block.header.number();
687 let header: TempoHeader = block.header.inner.inner.clone();
688 let body = tempo_primitives::BlockBody {
689 transactions: block
690 .transactions
691 .into_transactions()
692 .map(|tx| tx.into_inner())
693 .collect(),
694 ommers: vec![],
695 withdrawals: block.withdrawals,
696 };
697
698 cache.insert_block(number, hash, header, body.clone());
699 Some(body)
700}
701
702async fn resolve_bodies(
704 provider: &impl Provider<TempoNetwork>,
705 cache: &mut BlockCache,
706 hashes: &[B256],
707) -> Vec<tempo_primitives::BlockBody> {
708 let mut bodies = Vec::new();
709 let mut total_bytes = 0usize;
710
711 for &hash in hashes {
712 let body = match cache
713 .get_by_hash(&hash)
714 .and_then(|block| block.body.clone())
715 {
716 Some(body) => body,
717 None => match fetch_body_by_hash(provider, cache, hash).await {
718 Some(body) => body,
719 None => break,
720 },
721 };
722
723 total_bytes = total_bytes.saturating_add(body.length());
725 bodies.push(body);
726
727 if total_bytes >= SOFT_BODY_RESPONSE_SIZE_LIMIT {
728 break;
729 }
730 }
731
732 bodies
733}
734
#[cfg(test)]
mod tests {
    use super::*;
    use alloy::{
        consensus::{BlockHeader, Header},
        primitives::Sealable,
    };
    use reth_eth_wire_types::GetBlockHeaders;

    /// RPC endpoint used by the tests. Tests that touch it need network access.
    const MODERATO_RPC: &str = "https://rpc.moderato.tempo.xyz";

    /// Builds an HTTP provider for [`MODERATO_RPC`].
    fn moderato_provider() -> impl Provider<TempoNetwork> {
        let url = MODERATO_RPC.parse().unwrap();
        ProviderBuilder::new_with_network::<TempoNetwork>().connect_http(url)
    }

    /// Builds a body whose encoded length is at least `min_size` by padding it with
    /// default ommer headers.
    fn cached_body_with_min_size(min_size: usize) -> tempo_primitives::BlockBody {
        let mut body = tempo_primitives::BlockBody::default();
        loop {
            if body.length() >= min_size {
                break body;
            }
            body.ommers.push(TempoHeader::default());
        }
    }

    /// Deterministic fake hash: the block number in the last eight bytes, big-endian.
    fn numbered_hash(number: u64) -> B256 {
        let mut raw = [0u8; 32];
        let (_, tail) = raw.split_at_mut(24);
        tail.copy_from_slice(&number.to_be_bytes());
        B256::from(raw)
    }

    /// Caches a synthetic header for `number` whose parent hash points at `number - 1`.
    fn insert_test_header(cache: &mut BlockCache, number: u64) {
        let parent_hash = match number.checked_sub(1) {
            Some(parent) => numbered_hash(parent),
            None => B256::default(),
        };
        let header = TempoHeader {
            inner: Header {
                number,
                parent_hash,
                ..Default::default()
            },
            ..Default::default()
        };
        cache.insert_header(number, numbered_hash(number), header);
    }

    #[test]
    fn requested_header_numbers_allows_reth_default_request_limit() {
        let req = GetBlockHeaders {
            start_block: BlockHashOrNumber::Number(10),
            limit: 1_000,
            skip: 0,
            direction: HeadersDirection::Rising,
        };

        let listed = requested_header_numbers(10, &req);

        assert_eq!(listed.len(), 1_000);
        assert_eq!(listed[0], 10);
        assert_eq!(listed[999], 1_009);
    }

    #[test]
    fn requested_header_numbers_caps_at_protocol_header_limit() {
        let req = GetBlockHeaders {
            start_block: BlockHashOrNumber::Number(0),
            limit: MAX_HEADERS_SERVE as u64 + 1,
            skip: 0,
            direction: HeadersDirection::Rising,
        };

        let listed = requested_header_numbers(0, &req);

        assert_eq!(listed.len(), MAX_HEADERS_SERVE);
        assert_eq!(listed[MAX_HEADERS_SERVE - 1], MAX_HEADERS_SERVE as u64 - 1);
    }

    #[tokio::test]
    async fn resolve_headers_serves_more_than_rpc_batch_size_when_cached() {
        let rpc = moderato_provider();
        let mut store = BlockCache::new(MAX_HEADERS_SERVE as u64);

        // Everything requested below is cached up front.
        (1..=1_000).for_each(|number| insert_test_header(&mut store, number));

        let req = GetBlockHeaders {
            start_block: BlockHashOrNumber::Number(1),
            limit: 1_000,
            skip: 0,
            direction: HeadersDirection::Rising,
        };
        let served = resolve_headers(&rpc, &mut store, &req).await;

        assert_eq!(served.len(), 1_000);
        assert_eq!(served[0].number(), 1);
        assert_eq!(served[999].number(), 1_000);
    }

    #[tokio::test]
    async fn resolve_bodies_stops_after_reaching_soft_size_limit() {
        let rpc = moderato_provider();
        let mut store = BlockCache::new(100);
        // Two of these bodies together cross the soft limit.
        let big_body = cached_body_with_min_size(SOFT_BODY_RESPONSE_SIZE_LIMIT / 2 + 1);
        let first_hash = B256::with_last_byte(1);
        let second_hash = B256::with_last_byte(2);
        let third_hash = B256::with_last_byte(3);

        store.insert_block(1, first_hash, TempoHeader::default(), big_body.clone());
        store.insert_block(2, second_hash, TempoHeader::default(), big_body.clone());
        store.insert_block(3, third_hash, TempoHeader::default(), big_body);

        let wanted = [first_hash, second_hash, third_hash];
        let served = resolve_bodies(&rpc, &mut store, &wanted).await;
        assert_eq!(served.len(), 2);
    }

    #[tokio::test]
    async fn resolve_bodies_serves_body_exceeding_soft_size_limit() {
        let rpc = moderato_provider();
        let mut store = BlockCache::new(100);
        let huge_body = cached_body_with_min_size(8 * SOFT_BODY_RESPONSE_SIZE_LIMIT);
        let first_hash = B256::with_last_byte(1);
        let second_hash = B256::with_last_byte(2);

        store.insert_block(1, first_hash, TempoHeader::default(), huge_body.clone());
        store.insert_block(2, second_hash, TempoHeader::default(), huge_body);

        let served = resolve_bodies(&rpc, &mut store, &[first_hash, second_hash]).await;
        assert_eq!(served.len(), 1);
        assert!(served[0].length() > SOFT_BODY_RESPONSE_SIZE_LIMIT);
    }

    #[tokio::test]
    async fn fetch_headers_and_bodies() {
        let rpc = moderato_provider();
        let mut store = BlockCache::new(100);

        let latest = rpc.get_block_number().await.unwrap();
        let start = latest.saturating_sub(4);

        let req = GetBlockHeaders {
            start_block: BlockHashOrNumber::Number(start),
            limit: 5,
            skip: 0,
            direction: HeadersDirection::Rising,
        };
        let served = resolve_headers(&rpc, &mut store, &req).await;
        assert_eq!(served.len(), 5);
        for (offset, header) in served.iter().enumerate() {
            assert_eq!(header.number(), start + offset as u64);
        }
        // Consecutive headers must link through their parent hash.
        for link in served.windows(2) {
            assert_eq!(link[1].parent_hash(), link[0].hash_slow());
        }

        let hashes: Vec<B256> = (start..=latest)
            .map(|number| store.get_by_number(number).unwrap().hash)
            .collect();
        let bodies = resolve_bodies(&rpc, &mut store, &hashes).await;
        assert_eq!(bodies.len(), 5);
    }

    #[tokio::test]
    async fn fetch_body_by_hash_from_rpc() {
        let rpc = moderato_provider();
        let mut store = BlockCache::new(100);

        let latest = rpc.get_block_number().await.unwrap();
        fetch_and_cache_block(&rpc, &mut store, latest)
            .await
            .unwrap();
        let hash = store.get_by_number(latest).unwrap().hash;
        // Start over with an empty cache so the body has to come from RPC.
        store = BlockCache::new(100);

        let bodies = resolve_bodies(&rpc, &mut store, &[hash]).await;
        assert_eq!(bodies.len(), 1);
        assert!(
            store.get_by_hash(&hash).is_some(),
            "should be cached after fetch"
        );
    }

    #[tokio::test]
    async fn fetch_headers_by_hash_from_rpc_when_not_cached() {
        let rpc = moderato_provider();
        let mut store = BlockCache::new(100);

        let latest = rpc.get_block_number().await.unwrap();
        let start = latest.saturating_sub(2);
        let start_block = rpc
            .get_block_by_number(start.into())
            .await
            .unwrap()
            .unwrap();
        let start_hash = start_block.header.hash();

        let req = GetBlockHeaders {
            start_block: BlockHashOrNumber::Hash(start_hash),
            limit: 3,
            skip: 0,
            direction: HeadersDirection::Rising,
        };
        let served = resolve_headers(&rpc, &mut store, &req).await;

        assert_eq!(served.len(), 3);
        assert_eq!(served[0].number(), start);
        assert_eq!(served[0].hash_slow(), start_hash);
        assert!(store.get_by_hash(&start_hash).is_some());
    }
}