Skip to main content

tempo_node/
telemetry.rs

1//! Unified telemetry module for exporting metrics from both consensus and execution layers.
2//!
3//! This module pushes Prometheus-format metrics directly to Victoria Metrics by polling:
4//! - Commonware's runtime context (`context.encode()`)
5//! - Reth's prometheus recorder (`handle.render()`)
6
7use commonware_runtime::{Metrics as _, Spawner as _, tokio::Context};
8use eyre::WrapErr as _;
9use jiff::SignedDuration;
10use reth_node_metrics::recorder::install_prometheus_recorder;
11use reth_tracing::tracing;
12use url::Url;
13
14/// Configuration for Prometheus metrics push export.
15pub struct PrometheusMetricsConfig {
16    /// The Prometheus export endpoint.
17    pub endpoint: Url,
18    /// The interval at which to push metrics.
19    pub interval: SignedDuration,
20    /// Optional Authorization header value
21    pub auth_header: Option<String>,
22    /// Consensus Identifier for this node.
23    pub consensus_pubkey: Option<String>,
24    /// Peer Id for this node.
25    pub peer_id: String,
26}
27
28/// Spawns a task that periodically pushes both consensus and execution metrics to Victoria Metrics.
29///
30/// This concatenates Prometheus-format metrics from both sources and pushes them directly
31/// to Victoria Metrics' Prometheus import endpoint.
32///
33/// The task runs for the lifetime of the consensus runtime.
34pub fn install_prometheus_metrics(
35    context: Context,
36    config: PrometheusMetricsConfig,
37) -> eyre::Result<()> {
38    let interval: std::time::Duration = config
39        .interval
40        .try_into()
41        .wrap_err("invalid metrics duration")?;
42
43    let mut extra_label = format!("peer_id={}", config.peer_id);
44    if let Some(pubkey) = config.consensus_pubkey {
45        extra_label.push_str(&format!(",consensus_pubkey={pubkey}"));
46    }
47
48    let mut endpoint = config.endpoint;
49    endpoint
50        .query_pairs_mut()
51        .append_pair("extra_label", &extra_label);
52
53    let url = endpoint.to_string();
54    let client = reqwest::Client::new();
55    let auth_header = config.auth_header;
56
57    let reth_recorder = install_prometheus_recorder();
58    context.spawn(move |context| async move {
59        use commonware_runtime::Clock as _;
60
61        tracing::info_span!("metrics_exporter", %url).in_scope(|| tracing::info!("started"));
62
63        loop {
64            context.sleep(interval).await;
65
66            let consensus_metrics = context.encode();
67            let reth_metrics = reth_recorder.handle().render();
68            let body = format!("{consensus_metrics}\n{reth_metrics}");
69
70            // Push to Victoria Metrics
71            let mut request = client
72                .post(&url)
73                .header("Content-Type", "text/plain")
74                .body(body);
75
76            if let Some(ref auth) = auth_header {
77                request = request.header("Authorization", auth);
78            }
79
80            let res = request.send().await;
81            tracing::info_span!("metrics_exporter", %url).in_scope(|| match res {
82                Ok(response) if !response.status().is_success() => {
83                    tracing::warn!(status = %response.status(), "metrics endpoint returned failure")
84                }
85                Err(reason) => tracing::warn!(%reason, "metrics export failed"),
86                _ => {}
87            });
88        }
89    });
90
91    Ok(())
92}