1use alloy::{
2 consensus::BlockHeader,
3 eips::{BlockHashOrNumber, BlockNumberOrTag},
4 network::{Network, primitives::HeaderResponse},
5 primitives::B256,
6 providers::{Provider, ProviderBuilder},
7 rpc::client::BatchRequest,
8};
9use clap::Parser;
10use eyre::{Context, Result};
11use futures::StreamExt;
12use reth_chainspec::Head;
13use reth_cli_util::get_secret_key;
14use reth_eth_wire_types::{
15 HeadersDirection, PooledTransactions, primitives::BasicNetworkPrimitives,
16};
17use reth_ethereum::{
18 network::{
19 NetworkConfig, NetworkEventListenerProvider, NetworkInfo, NetworkManager, PeersConfig,
20 PeersInfo, eth_requests::IncomingEthRequest, transactions::NetworkTransactionEvent,
21 },
22 tasks::Runtime,
23};
24use std::{
25 collections::{BTreeMap, HashMap},
26 path::PathBuf,
27 sync::{
28 Arc,
29 atomic::{AtomicU64, Ordering},
30 },
31 time::Duration,
32};
33use tempo_alloy::TempoNetwork;
34use tempo_chainspec::spec::{TempoChainSpec, chain_value_parser};
35use tempo_primitives::{TempoHeader, TempoPrimitives, TempoTxEnvelope};
36use tokio::sync::{mpsc, oneshot};
37use tracing::{debug, error, info};
38
/// Network primitives used by the P2P stack: `BasicNetworkPrimitives` specialised with
/// `TempoPrimitives` and `TempoTxEnvelope`.
type TempoNetPrimitives = BasicNetworkPrimitives<TempoPrimitives, TempoTxEnvelope>;
/// Block type returned by RPC calls made on `TempoNetwork`.
type TempoRpcBlock = <TempoNetwork as Network>::BlockResponse;

// `CACHE_CAPACITY`: number of blocks the cache keeps when `--cache-blocks` is not given
// (60 * 60 * 6 = 21,600).
// `HEADER_BATCH_SIZE`: upper bound on the headers resolved for one `GetBlockHeaders` request,
// and the number of `eth_getBlockByNumber` calls grouped into one JSON-RPC batch.
const CACHE_CAPACITY: u64 = 60 * 60 * 6; const HEADER_BATCH_SIZE: usize = 512;
46
// Command-line arguments of the proxy P2P node.
//
// NOTE: plain `//` comments are used here on purpose. clap turns `///` doc comments on the
// struct and its fields into help text, so adding them would change the `--help` output.
#[derive(Parser, Debug)]
#[command(
    about = "Run a proxy P2P node that serves cached block data fetched from an RPC endpoint"
)]
pub(crate) struct P2pProxyArgs {
    // RPC endpoint the proxy connects to: once for the initial head and once inside the
    // fetcher service.
    #[arg(long, required = true)]
    rpc_url: String,

    // Chain identifier handed to `chain_value_parser` to obtain the chain spec.
    #[arg(long, default_value = "mainnet")]
    chain: String,

    // Listener port of the P2P network.
    #[arg(long, default_value_t = 30303)]
    port: u16,

    // Discovery port; only applied to the network builder when given.
    #[arg(long)]
    discovery_port: Option<u16>,

    // Maximum number of inbound peers (the outbound limit is fixed to zero elsewhere).
    #[arg(long, default_value_t = 100)]
    max_inbound: usize,

    // Written to the session config's `session_event_buffer`.
    // NOTE(review): the name suggests a concurrency limit, the code only sizes a buffer.
    #[arg(long, default_value_t = 30)]
    max_concurrent_inbound: usize,

    // Number of blocks kept in the cache; falls back to `CACHE_CAPACITY` when absent.
    #[arg(long)]
    cache_blocks: Option<u64>,

    // Path to the P2P secret key; a random key is generated when absent.
    #[arg(long)]
    p2p_secret_key: Option<PathBuf>,
}
85
86impl P2pProxyArgs {
87 pub(crate) async fn run(self) -> Result<()> {
88 let chain_spec = chain_value_parser(&self.chain)?;
89
90 let provider = ProviderBuilder::new_with_network::<TempoNetwork>()
92 .connect(&self.rpc_url)
93 .await
94 .context("failed to connect to RPC")?;
95 let latest_block = provider
96 .get_block_by_number(Default::default())
97 .await
98 .context("failed to fetch latest block")?
99 .ok_or_else(|| eyre::eyre!("latest block not found"))?;
100 let head = Head {
101 number: latest_block.header.number(),
102 hash: latest_block.header.hash(),
103 difficulty: latest_block.header.difficulty(),
104 total_difficulty: latest_block.header.difficulty(),
105 timestamp: latest_block.header.timestamp(),
106 };
107 info!(number = head.number, hash = %head.hash, "fetched latest head");
108
109 let (fetch_tx, fetch_rx) = mpsc::channel::<FetchRequest>(256);
111
112 let rpc_url = self.rpc_url.clone();
114 let cache_blocks = self.cache_blocks.unwrap_or(CACHE_CAPACITY);
115 tokio::spawn(async move {
116 if let Err(err) = run_fetcher_service(rpc_url, fetch_rx, cache_blocks).await {
117 error!(%err, "block fetcher service exited with error");
118 }
119 });
120
121 let secret_key = match &self.p2p_secret_key {
123 Some(path) => get_secret_key(path).context("failed to load P2P secret key")?,
124 None => reth_cli_util::load_secret_key::rng_secret_key(),
125 };
126
127 let net_cfg = NetConfig {
129 port: self.port,
130 discovery_port: self.discovery_port,
131 max_inbound: self.max_inbound,
132 max_concurrent_inbound: self.max_concurrent_inbound,
133 head,
134 };
135 run_p2p_network(chain_spec, net_cfg, fetch_tx, secret_key).await
136 }
137}
138
/// Settings handed from the CLI arguments to `run_p2p_network`.
struct NetConfig {
    /// Listener port of the P2P network.
    port: u16,
    /// Discovery port; only applied to the network builder when present.
    discovery_port: Option<u16>,
    /// Maximum number of inbound peers.
    max_inbound: usize,
    /// Value written to the session config's `session_event_buffer`.
    max_concurrent_inbound: usize,
    /// Chain head the network is configured with.
    head: Head,
}
147
/// Request counters shared between the request loop and the periodic stats logger.
///
/// Each counter is incremented when a request is received, before it is answered.
struct RequestStats {
    /// Number of `GetBlockHeaders` requests received.
    headers: AtomicU64,
    /// Number of `GetBlockBodies` requests received.
    bodies: AtomicU64,
}
153
/// Work item sent from the P2P request loop to the fetcher service.
///
/// Every variant carries a oneshot sender on which the fetcher returns its result.
enum FetchRequest {
    /// Resolve the headers described by a `GetBlockHeaders` request.
    GetHeaders {
        /// The peer's request, forwarded unchanged.
        request: reth_eth_wire_types::GetBlockHeaders,
        /// Receives the resolved headers.
        response: oneshot::Sender<Vec<TempoHeader>>,
    },
    /// Resolve the bodies of the blocks with the given hashes.
    GetBodies {
        /// Hashes of the requested blocks.
        hashes: Vec<B256>,
        /// Receives the resolved bodies.
        response: oneshot::Sender<Vec<tempo_primitives::BlockBody>>,
    },
}
165
/// Bounded in-memory cache of blocks, addressable by number and by hash.
struct BlockCache {
    /// Cached blocks keyed by block number.
    by_number: BTreeMap<u64, CachedBlock>,
    /// Index from block hash to the number under which the block is stored in `by_number`.
    by_hash: HashMap<B256, u64>,
    /// Maximum number of blocks kept in `by_number`.
    capacity: u64,
}
174
175impl BlockCache {
176 fn new(capacity: u64) -> Self {
177 Self {
178 by_number: BTreeMap::new(),
179 by_hash: HashMap::new(),
180 capacity,
181 }
182 }
183
184 fn insert_header(&mut self, number: u64, hash: B256, header: TempoHeader) {
185 self.upsert(number, hash, header, None);
186 }
187
188 fn insert_block(
189 &mut self,
190 number: u64,
191 hash: B256,
192 header: TempoHeader,
193 body: tempo_primitives::BlockBody,
194 ) {
195 self.upsert(number, hash, header, Some(body));
196 }
197
198 fn upsert(
199 &mut self,
200 number: u64,
201 hash: B256,
202 header: TempoHeader,
203 body: Option<tempo_primitives::BlockBody>,
204 ) {
205 if let Some(old_hash) = self.by_number.get(&number).map(|block| block.hash)
206 && old_hash != hash
207 {
208 self.by_hash.remove(&old_hash);
209 }
210
211 let body = match self.by_number.remove(&number) {
212 Some(existing) => body.or(existing.body),
213 None => body,
214 };
215
216 self.by_number
217 .insert(number, CachedBlock { header, body, hash });
218 self.by_hash.insert(hash, number);
219 self.evict();
220 }
221
222 fn evict(&mut self) {
223 while self.by_number.len() as u64 > self.capacity {
224 if let Some((&oldest_num, _)) = self.by_number.iter().next()
225 && let Some(block) = self.by_number.remove(&oldest_num)
226 {
227 self.by_hash.remove(&block.hash);
228 }
229 }
230 }
231
232 fn get_by_number(&self, number: u64) -> Option<&CachedBlock> {
233 self.by_number.get(&number)
234 }
235
236 fn get_by_hash(&self, hash: &B256) -> Option<&CachedBlock> {
237 self.by_hash
238 .get(hash)
239 .and_then(|num| self.by_number.get(num))
240 }
241}
242
/// One cache entry: a header, its hash and, once fetched, the block body.
#[derive(Clone)]
struct CachedBlock {
    /// Header of the block.
    header: TempoHeader,
    /// Body of the block; `None` while only the header has been cached.
    body: Option<tempo_primitives::BlockBody>,
    /// Hash of the block, as supplied when the entry was inserted.
    hash: B256,
}
249
250async fn run_fetcher_service(
252 rpc_url: String,
253 mut fetch_rx: mpsc::Receiver<FetchRequest>,
254 cache_blocks: u64,
255) -> Result<()> {
256 let provider = ProviderBuilder::new_with_network::<TempoNetwork>()
257 .connect(&rpc_url)
258 .await
259 .context("failed to connect to RPC")?;
260
261 let mut cache = BlockCache::new(cache_blocks);
262
263 while let Some(req) = fetch_rx.recv().await {
265 match req {
266 FetchRequest::GetHeaders { request, response } => {
267 let headers = resolve_headers(&provider, &mut cache, &request).await;
268 let _ = response.send(headers);
269 }
270 FetchRequest::GetBodies { hashes, response } => {
271 let bodies = resolve_bodies(&provider, &mut cache, &hashes).await;
272 let _ = response.send(bodies);
273 }
274 }
275 }
276
277 Ok(())
278}
279
/// Builds and runs the P2P network and answers incoming eth requests.
///
/// Header and body requests are forwarded to the fetcher service through `fetch_tx`; every
/// other request kind is answered with an empty response. The function returns `Ok(())`
/// once the request channel of the network is closed.
///
/// # Errors
///
/// Returns an error when the network manager cannot be created.
async fn run_p2p_network(
    chain_spec: Arc<TempoChainSpec>,
    cfg: NetConfig,
    fetch_tx: mpsc::Sender<FetchRequest>,
    secret_key: secp256k1::SecretKey,
) -> Result<()> {
    // Inbound peers are limited by the configuration; the outbound limit is zero.
    let peers_config = PeersConfig::default()
        .with_max_inbound(cfg.max_inbound)
        .with_max_outbound(0);

    // NOTE(review): `Runtime::test()` is used outside of tests here — confirm it is intended.
    let mut builder = NetworkConfig::<_, TempoNetPrimitives>::builder(secret_key, Runtime::test())
        .listener_port(cfg.port)
        .disable_dns_discovery()
        .disable_tx_gossip(true)
        .peer_config(peers_config)
        .set_head(cfg.head);

    if let Some(dp) = cfg.discovery_port {
        builder = builder.discovery_port(dp);
    }

    let mut config = builder.build_with_noop_provider(chain_spec);
    config.sessions_config.session_event_buffer = cfg.max_concurrent_inbound;

    // Channels on which the network manager hands over eth requests and transaction events.
    let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1024);
    let (transactions_tx, mut transactions_rx) = reth_metrics::common::mpsc::memory_bounded_channel(
        config
            .transactions_manager_config
            .tx_channel_memory_limit_bytes,
        "p2p-proxy.tx",
    );

    let network = NetworkManager::new(config)
        .await
        .context("failed to create network manager")?
        .with_eth_request_handler(requests_tx)
        .with_transactions(transactions_tx);

    let handle = network.handle().clone();
    info!(
        peer_id = %handle.peer_id(),
        local_addr = %handle.local_addr(),
        max_inbound = cfg.max_inbound,
        "P2P proxy node started"
    );

    // Log every network event at debug level.
    let events_handle = handle.clone();
    tokio::spawn(async move {
        let mut events = events_handle.event_listener();
        while let Some(event) = events.next().await {
            debug!(?event, "network event");
        }
    });

    // Drain transaction events: pooled-transaction requests get an empty answer, every
    // other event is dropped.
    tokio::spawn(async move {
        while let Some(event) = transactions_rx.recv().await {
            if let NetworkTransactionEvent::GetPooledTransactions { response, .. } = event {
                let _ = response.send(Ok(PooledTransactions(vec![])));
            }
        }
    });

    // Drive the network manager itself on its own task.
    tokio::spawn(network);

    let stats = Arc::new(RequestStats {
        headers: AtomicU64::new(0),
        bodies: AtomicU64::new(0),
    });

    // Periodically log the peer count and the request counters.
    let stats_log = Arc::clone(&stats);
    let stats_handle = handle.clone();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(30));
        // The first tick of a tokio interval completes immediately; it is consumed before
        // the loop so that the first stats line appears after one full period.
        interval.tick().await; loop {
            interval.tick().await;
            let h = stats_log.headers.load(Ordering::Relaxed);
            let b = stats_log.bodies.load(Ordering::Relaxed);
            let peers = stats_handle.num_connected_peers();
            info!(peers, headers_served = h, bodies_served = b, "proxy stats");
        }
    });

    while let Some(eth_request) = requests_rx.recv().await {
        match eth_request {
            IncomingEthRequest::GetBlockHeaders {
                peer_id,
                request,
                response,
            } => {
                debug!(%peer_id, ?request, "received GetBlockHeaders");
                stats.headers.fetch_add(1, Ordering::Relaxed);
                let fetch_tx = fetch_tx.clone();
                // Answer on a separate task so a slow lookup does not stall this loop.
                tokio::spawn(async move {
                    // `None` (fetcher gone or reply dropped) becomes an empty header list.
                    let headers = async {
                        let (tx, rx) = oneshot::channel();
                        fetch_tx
                            .send(FetchRequest::GetHeaders {
                                request,
                                response: tx,
                            })
                            .await
                            .ok()?;
                        rx.await.ok()
                    }
                    .await
                    .unwrap_or_default();
                    let _ = response.send(Ok(headers.into()));
                });
            }
            IncomingEthRequest::GetBlockBodies {
                peer_id,
                request,
                response,
            } => {
                debug!(%peer_id, ?request, "received GetBlockBodies");
                stats.bodies.fetch_add(1, Ordering::Relaxed);
                let fetch_tx = fetch_tx.clone();
                // Same forwarding scheme as for headers.
                tokio::spawn(async move {
                    // `None` (fetcher gone or reply dropped) becomes an empty body list.
                    let bodies = async {
                        let (tx, rx) = oneshot::channel();
                        fetch_tx
                            .send(FetchRequest::GetBodies {
                                hashes: request.0,
                                response: tx,
                            })
                            .await
                            .ok()?;
                        rx.await.ok()
                    }
                    .await
                    .unwrap_or_default();
                    let _ = response.send(Ok(bodies.into()));
                });
            }
            // The remaining request kinds are not backed by any data: reply with nothing.
            IncomingEthRequest::GetNodeData { response, .. } => {
                let _ = response.send(Ok(Default::default()));
            }
            IncomingEthRequest::GetReceipts { response, .. } => {
                let _ = response.send(Ok(reth_eth_wire_types::Receipts(vec![])));
            }
            IncomingEthRequest::GetReceipts69 { response, .. } => {
                let _ = response.send(Ok(reth_eth_wire_types::Receipts69(vec![])));
            }
            IncomingEthRequest::GetReceipts70 { response, .. } => {
                let _ = response.send(Ok(reth_eth_wire_types::Receipts70 {
                    last_block_incomplete: false,
                    receipts: vec![],
                }));
            }
            IncomingEthRequest::GetBlockAccessLists { response, .. } => {
                let _ = response.send(Ok(Default::default()));
            }
        }
    }

    Ok(())
}
446
/// Test helper: fetches the full block `number` over RPC and caches header and body.
///
/// # Errors
///
/// Fails when the RPC request fails or the block does not exist.
#[cfg(test)]
async fn fetch_and_cache_block(
    provider: &impl Provider<TempoNetwork>,
    cache: &mut BlockCache,
    number: u64,
) -> Result<()> {
    let maybe_block = provider
        .get_block_by_number(number.into())
        .full()
        .await
        .context("rpc request failed")?;
    let Some(block) = maybe_block else {
        return Err(eyre::eyre!("block {number} not found"));
    };

    let hash = block.header.hash();
    let header: TempoHeader = block.header.inner.inner.clone();

    // Unwrap the RPC transaction wrappers to obtain the plain transactions.
    let transactions = block
        .transactions
        .into_transactions()
        .map(|tx| tx.into_inner())
        .collect();
    let body = tempo_primitives::BlockBody {
        transactions,
        ommers: vec![],
        withdrawals: block.withdrawals,
    };

    cache.insert_block(number, hash, header, body);
    Ok(())
}
476
477async fn fetch_and_cache_header_by_hash(
478 provider: &impl Provider<TempoNetwork>,
479 cache: &mut BlockCache,
480 hash: B256,
481) -> Result<u64> {
482 let block = provider
483 .get_block_by_hash(hash)
484 .await
485 .context("rpc request failed")?
486 .ok_or_else(|| eyre::eyre!("block {hash} not found"))?;
487
488 let number = block.header.number();
489 let header: TempoHeader = block.header.inner.inner.clone();
490 cache.insert_header(number, block.header.hash(), header);
491 Ok(number)
492}
493
494async fn fetch_and_cache_header_by_number(
495 provider: &impl Provider<TempoNetwork>,
496 cache: &mut BlockCache,
497 number: u64,
498) -> Result<()> {
499 let block = provider
500 .get_block_by_number(number.into())
501 .await
502 .context("rpc request failed")?
503 .ok_or_else(|| eyre::eyre!("block {number} not found"))?;
504
505 let header: TempoHeader = block.header.inner.inner.clone();
506 cache.insert_header(block.header.number(), block.header.hash(), header);
507 Ok(())
508}
509
510async fn fetch_and_cache_header_batch(
511 provider: &impl Provider<TempoNetwork>,
512 cache: &mut BlockCache,
513 numbers: &[u64],
514) -> Result<()> {
515 let mut batch = BatchRequest::new(provider.client());
516 let mut waiters = Vec::with_capacity(numbers.len());
517
518 for &number in numbers {
519 let waiter = batch.add_call::<_, Option<TempoRpcBlock>>(
520 "eth_getBlockByNumber",
521 &(BlockNumberOrTag::Number(number), false),
522 )?;
523 waiters.push((number, waiter));
524 }
525
526 batch.send().await.context("failed to fetch header batch")?;
527
528 for (number, waiter) in waiters {
529 match waiter.await {
530 Ok(Some(block)) => {
531 let header: TempoHeader = block.header.inner.inner.clone();
532 cache.insert_header(block.header.number(), block.header.hash(), header);
533 }
534 Ok(None) => {
535 debug!(number, "header batch returned no block");
536 }
537 Err(err) => {
538 debug!(number, %err, "header batch waiter failed; falling back to single request");
539 let _ = fetch_and_cache_header_by_number(provider, cache, number).await;
540 }
541 }
542 }
543
544 Ok(())
545}
546
547async fn fetch_and_cache_headers(
548 provider: &impl Provider<TempoNetwork>,
549 cache: &mut BlockCache,
550 numbers: &[u64],
551) {
552 let missing_numbers: Vec<u64> = numbers
553 .iter()
554 .copied()
555 .filter(|number| cache.get_by_number(*number).is_none())
556 .collect();
557
558 for chunk in missing_numbers.chunks(HEADER_BATCH_SIZE) {
559 if fetch_and_cache_header_batch(provider, cache, chunk)
560 .await
561 .is_err()
562 {
563 for &number in chunk {
564 let _ = fetch_and_cache_header_by_number(provider, cache, number).await;
565 }
566 }
567 }
568}
569
570async fn resolve_start_block_number(
571 provider: &impl Provider<TempoNetwork>,
572 cache: &mut BlockCache,
573 start_block: BlockHashOrNumber,
574) -> Option<u64> {
575 match start_block {
576 BlockHashOrNumber::Number(number) => Some(number),
577 BlockHashOrNumber::Hash(hash) => {
578 if let Some(block) = cache.get_by_hash(&hash) {
579 return Some(block.header.number());
580 }
581
582 fetch_and_cache_header_by_hash(provider, cache, hash)
583 .await
584 .ok()
585 }
586 }
587}
588
589fn requested_header_numbers(
590 mut current: u64,
591 request: &reth_eth_wire_types::GetBlockHeaders,
592) -> Vec<u64> {
593 let limit = request.limit.min(HEADER_BATCH_SIZE as u64) as usize;
594 let mut numbers = Vec::with_capacity(limit);
595 let step = u64::from(request.skip) + 1;
596
597 for _ in 0..limit {
598 numbers.push(current);
599
600 match request.direction {
601 HeadersDirection::Rising => match current.checked_add(step) {
602 Some(next) => current = next,
603 None => break,
604 },
605 HeadersDirection::Falling => match current.checked_sub(step) {
606 Some(next) => current = next,
607 None => break,
608 },
609 }
610 }
611
612 numbers
613}
614
615async fn resolve_headers(
617 provider: &impl Provider<TempoNetwork>,
618 cache: &mut BlockCache,
619 request: &reth_eth_wire_types::GetBlockHeaders,
620) -> Vec<TempoHeader> {
621 let Some(start_num) = resolve_start_block_number(provider, cache, request.start_block).await
622 else {
623 return Vec::new();
624 };
625
626 let requested_numbers = requested_header_numbers(start_num, request);
627 fetch_and_cache_headers(provider, cache, &requested_numbers).await;
628
629 let mut headers = Vec::with_capacity(requested_numbers.len());
630 for number in requested_numbers {
631 let Some(block) = cache.get_by_number(number) else {
632 break;
633 };
634 headers.push(block.header.clone());
635 }
636
637 headers
638}
639
640async fn fetch_body_by_hash(
641 provider: &impl Provider<TempoNetwork>,
642 cache: &mut BlockCache,
643 hash: B256,
644) -> Option<tempo_primitives::BlockBody> {
645 let block = provider
646 .get_block_by_hash(hash)
647 .full()
648 .await
649 .ok()
650 .flatten()?;
651 let number = block.header.number();
652 let header: TempoHeader = block.header.inner.inner.clone();
653 let body = tempo_primitives::BlockBody {
654 transactions: block
655 .transactions
656 .into_transactions()
657 .map(|tx| tx.into_inner())
658 .collect(),
659 ommers: vec![],
660 withdrawals: block.withdrawals,
661 };
662
663 cache.insert_block(number, hash, header, body.clone());
664 Some(body)
665}
666
667async fn resolve_bodies(
669 provider: &impl Provider<TempoNetwork>,
670 cache: &mut BlockCache,
671 hashes: &[B256],
672) -> Vec<tempo_primitives::BlockBody> {
673 let mut bodies = Vec::new();
674
675 for &hash in hashes {
676 if let Some(body) = cache
677 .get_by_hash(&hash)
678 .and_then(|block| block.body.clone())
679 {
680 bodies.push(body);
681 continue;
682 }
683
684 if let Some(body) = fetch_body_by_hash(provider, cache, hash).await {
685 bodies.push(body);
686 }
687 }
688
689 bodies
690}
691
// Integration-style tests: every test talks to the live RPC endpoint `MODERATO_RPC` and
// therefore needs network access.
#[cfg(test)]
mod tests {
    use super::*;
    use alloy::{consensus::BlockHeader, primitives::Sealable};
    use reth_eth_wire_types::GetBlockHeaders;

    /// RPC endpoint the tests run against.
    const MODERATO_RPC: &str = "https://rpc.moderato.tempo.xyz";

    /// Builds an HTTP provider for `MODERATO_RPC`.
    fn moderato_provider() -> impl Provider<TempoNetwork> {
        ProviderBuilder::new_with_network::<TempoNetwork>()
            .connect_http(MODERATO_RPC.parse().unwrap())
    }

    /// Resolves the five most recent headers, checks that they form a chain, then resolves
    /// the matching bodies.
    #[tokio::test]
    async fn fetch_headers_and_bodies() {
        let provider = moderato_provider();
        let mut cache = BlockCache::new(100);

        let latest = provider.get_block_number().await.unwrap();
        let start = latest.saturating_sub(4);

        let request = GetBlockHeaders {
            start_block: BlockHashOrNumber::Number(start),
            limit: 5,
            skip: 0,
            direction: HeadersDirection::Rising,
        };
        let headers = resolve_headers(&provider, &mut cache, &request).await;
        assert_eq!(headers.len(), 5);
        for (i, header) in headers.iter().enumerate() {
            assert_eq!(header.number(), start + i as u64);
        }
        // Each header must reference its predecessor as parent.
        for pair in headers.windows(2) {
            assert_eq!(pair[1].parent_hash(), pair[0].hash_slow());
        }

        // The headers are cached by now, so their hashes can be read from the cache.
        let hashes: Vec<B256> = (start..=latest)
            .map(|n| cache.get_by_number(n).unwrap().hash)
            .collect();
        let bodies = resolve_bodies(&provider, &mut cache, &hashes).await;
        assert_eq!(bodies.len(), 5);
    }

    /// A body that is not cached is fetched over RPC and cached afterwards.
    #[tokio::test]
    async fn fetch_body_by_hash_from_rpc() {
        let provider = moderato_provider();
        let mut cache = BlockCache::new(100);

        let latest = provider.get_block_number().await.unwrap();
        // Fetch once only to learn the hash, then start over with an empty cache.
        fetch_and_cache_block(&provider, &mut cache, latest)
            .await
            .unwrap();
        let hash = cache.get_by_number(latest).unwrap().hash;
        cache = BlockCache::new(100);

        let bodies = resolve_bodies(&provider, &mut cache, &[hash]).await;
        assert_eq!(bodies.len(), 1);
        assert!(
            cache.get_by_hash(&hash).is_some(),
            "should be cached after fetch"
        );
    }

    /// A header request that starts at a hash unknown to the cache is resolved over RPC.
    #[tokio::test]
    async fn fetch_headers_by_hash_from_rpc_when_not_cached() {
        let provider = moderato_provider();
        let mut cache = BlockCache::new(100);

        let latest = provider.get_block_number().await.unwrap();
        let start = latest.saturating_sub(2);
        let start_block = provider
            .get_block_by_number(start.into())
            .await
            .unwrap()
            .unwrap();
        let start_hash = start_block.header.hash();

        let request = GetBlockHeaders {
            start_block: BlockHashOrNumber::Hash(start_hash),
            limit: 3,
            skip: 0,
            direction: HeadersDirection::Rising,
        };
        let headers = resolve_headers(&provider, &mut cache, &request).await;

        assert_eq!(headers.len(), 3);
        assert_eq!(headers[0].number(), start);
        assert_eq!(headers[0].hash_slow(), start_hash);
        assert!(cache.get_by_hash(&start_hash).is_some());
    }
}