Skip to main content

tempo_node/
telemetry.rs

1//! Unified telemetry module for exporting metrics from both consensus and execution layers.
2//!
3//! This module pushes Prometheus-format metrics directly to Victoria Metrics by polling:
4//! - Commonware's runtime context (`context.encode()`)
5//! - Reth's prometheus recorder (`handle.render()`)
6
7use commonware_runtime::{Metrics as _, Spawner as _, tokio::Context};
8use eyre::WrapErr as _;
9use jiff::SignedDuration;
10use reth_node_metrics::recorder::install_prometheus_recorder;
11use reth_tracing::tracing;
12use url::Url;
13
14/// Configuration for Prometheus metrics push export.
15pub struct PrometheusMetricsConfig {
16    /// The Prometheus export endpoint.
17    pub endpoint: Url,
18    /// The interval at which to push metrics.
19    pub interval: SignedDuration,
20    /// Optional Authorization header value
21    pub auth_header: Option<String>,
22    /// Consensus Identifier for this node.
23    pub consensus_pubkey: Option<String>,
24}
25
26/// Spawns a task that periodically pushes both consensus and execution metrics to Victoria Metrics.
27///
28/// This concatenates Prometheus-format metrics from both sources and pushes them directly
29/// to Victoria Metrics' Prometheus import endpoint.
30///
31/// The task runs for the lifetime of the consensus runtime.
32pub fn install_prometheus_metrics(
33    context: Context,
34    config: PrometheusMetricsConfig,
35) -> eyre::Result<()> {
36    let interval: std::time::Duration = config
37        .interval
38        .try_into()
39        .wrap_err("invalid metrics duration")?;
40
41    let mut endpoint = config.endpoint;
42    if let Some(pubkey) = config.consensus_pubkey {
43        endpoint
44            .query_pairs_mut()
45            .append_pair("extra_label", &format!("consensus_pubkey={pubkey}"));
46    }
47
48    let url = endpoint.to_string();
49    let client = reqwest::Client::new();
50    let auth_header = config.auth_header;
51
52    let reth_recorder = install_prometheus_recorder();
53    context.spawn(move |context| async move {
54        use commonware_runtime::Clock as _;
55
56        tracing::info_span!("metrics_exporter", %url).in_scope(|| tracing::info!("started"));
57
58        loop {
59            context.sleep(interval).await;
60
61            let consensus_metrics = context.encode();
62            let reth_metrics = reth_recorder.handle().render();
63            let body = format!("{consensus_metrics}\n{reth_metrics}");
64
65            // Push to Victoria Metrics
66            let mut request = client
67                .post(&url)
68                .header("Content-Type", "text/plain")
69                .body(body);
70
71            if let Some(ref auth) = auth_header {
72                request = request.header("Authorization", auth);
73            }
74
75            let res = request.send().await;
76            tracing::info_span!("metrics_exporter", %url).in_scope(|| match res {
77                Ok(response) if !response.status().is_success() => {
78                    tracing::warn!(status = %response.status(), "metrics endpoint returned failure")
79                }
80                Err(reason) => tracing::warn!(%reason, "metrics export failed"),
81                _ => {}
82            });
83        }
84    });
85
86    Ok(())
87}