Skip to main content

tempo_e2e/
metrics.rs

1//! Metrics parsing and assertion helpers for e2e tests.
2
3use std::{collections::HashSet, fmt::Debug, str::FromStr, time::Duration};
4
5use commonware_runtime::{Clock as _, deterministic::Context};
6
7use crate::TestingNode;
8
/// Metric-name suffix inspected by [`Metrics::assert_no_blocked_peers`].
const PEERS_BLOCKED: &str = "peers_blocked";
/// Metric-name suffix read by the epoch helpers on [`Metrics`]
/// (`consensus_at_epoch`, `latest_consensus_epoch`, `consensus_before_epoch`).
const LATEST_EPOCH: &str = "epoch_manager_latest_epoch";
/// Metric-name suffix read by [`Metrics::has_consensus_participants`].
const LATEST_PARTICIPANTS: &str = "epoch_manager_latest_participants";
/// Metric-name suffix read by the height helpers on [`Metrics`]
/// (`consensus_at_height`, `latest_consensus_height`).
const PROCESSED_HEIGHT: &str = "marshal_processed_height";
/// Metric-name suffix inspected by [`Metrics::assert_no_dkg_failures`].
const DKG_FAILURES: &str = "dkg_manager_ceremony_failures_total";
/// Metric-name suffix inspected by [`Metrics::assert_any_rounds_skipped`].
const ROUNDS_SKIPPED: &str = "rounds_skipped_total";

/// Polling interval used by the `wait_for_*` helpers that take no explicit interval.
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(100);
17
/// A single sample line taken from text-encoded metrics output.
///
/// Only the metric name (with any `{...}` label set removed) and the raw,
/// still-unparsed value token are retained.
#[derive(Clone, Debug, PartialEq)]
struct Sample {
    /// Metric name without its label set.
    name: String,
    /// Value token exactly as it appeared in the encoded output.
    value: String,
}

impl Sample {
    /// Parses one line of encoded metrics into a [`Sample`].
    ///
    /// Returns `None` for blank lines and for lines starting with `#`.
    ///
    /// A label set directly following the name is skipped by looking for its
    /// closing `}` outside of quoted label values, so whitespace or braces
    /// inside a quoted value are not mistaken for the value separator. Any
    /// token after the value (such as a timestamp) is ignored.
    ///
    /// # Panics
    ///
    /// Panics if the line has a name but no value token.
    fn parse(line: &str) -> Option<Self> {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            return None;
        }

        // The name ends at the label set or at the first whitespace,
        // whichever comes first.
        let name_end = line
            .find(|c: char| c == '{' || c.is_whitespace())
            .unwrap_or(line.len());
        let (name, rest) = line.split_at(name_end);

        let after_labels = rest.strip_prefix('{').and_then(Self::skip_labels);
        let token = match after_labels {
            Some(tail) => tail.split_whitespace().next(),
            // No label set, or one that is never closed: the value is simply
            // the second whitespace-separated token of the line.
            None => line.split_whitespace().nth(1),
        };
        let value = token.unwrap_or_else(|| panic!("metric sample `{line}` has no value"));

        Some(Self {
            name: name.to_owned(),
            value: value.to_owned(),
        })
    }

    /// Returns the text that follows the `}` closing a label set.
    ///
    /// `labels` is the text immediately after the opening `{`. Quoted label
    /// values are honoured, including backslash escapes inside them. Returns
    /// `None` when the label set is never closed.
    fn skip_labels(labels: &str) -> Option<&str> {
        let mut in_quotes = false;
        let mut escaped = false;
        for (idx, ch) in labels.char_indices() {
            if escaped {
                // The previous character was a backslash inside a quoted
                // value; this character is literal.
                escaped = false;
                continue;
            }
            match ch {
                '\\' if in_quotes => escaped = true,
                '"' => in_quotes = !in_quotes,
                // `}` is one byte wide, so `idx + 1` is a char boundary.
                '}' if !in_quotes => return Some(&labels[idx + 1..]),
                _ => {}
            }
        }
        None
    }

    /// Parses the raw value token into `T`.
    ///
    /// # Panics
    ///
    /// Panics if the value token does not parse as `T`; the message names the
    /// metric and shows the raw value.
    fn value<T>(&self) -> T
    where
        T: FromStr,
        T::Err: Debug,
    {
        self.value.parse().unwrap_or_else(|err| {
            panic!(
                "metric `{}` has value `{}` which does not parse into the requested type: {err:?}",
                self.name, self.value
            )
        })
    }
}
53
/// A snapshot of parsed metric samples.
///
/// Built by [`Metrics::from_context`] and narrowed by [`Metrics::for_scope`].
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Metrics {
    // Parsed sample lines; `from_context` collects them in encoded-line order.
    samples: Vec<Sample>,
}
58
/// A source of a metric-name prefix.
///
/// [`Metrics::for_scope`] keeps only samples whose names start with this
/// prefix followed by `_`.
pub trait MetricScope {
    /// Returns the prefix, without the trailing `_` that `for_scope` appends.
    fn metric_prefix(&self) -> String;
}
62
/// Extension trait for taking a [`Metrics`] snapshot from a value.
pub trait MetricsExt {
    /// Returns a [`Metrics`] snapshot of `self`.
    fn to_metrics(&self) -> Metrics;
}
66
67impl<T> MetricsExt for T
68where
69    T: commonware_runtime::Metrics,
70{
71    fn to_metrics(&self) -> Metrics {
72        Metrics::from_context(self)
73    }
74}
75
76impl<TClock> MetricScope for TestingNode<TClock>
77where
78    TClock: commonware_runtime::Clock,
79{
80    fn metric_prefix(&self) -> String {
81        Self::metric_prefix(self)
82    }
83}
84
85impl Metrics {
86    /// Samples metrics from a Commonware runtime context.
87    pub fn from_context(context: &impl commonware_runtime::Metrics) -> Self {
88        let samples = context.encode().lines().filter_map(Sample::parse).collect();
89        Self { samples }
90    }
91
92    pub fn value<T>(&self, metric_suffix: &str) -> Option<T>
93    where
94        T: FromStr,
95        T::Err: Debug,
96    {
97        self.samples
98            .iter()
99            .find(|s| s.name.ends_with(metric_suffix))
100            .map(Sample::value)
101    }
102
103    pub fn values<'a, T>(&'a self, metric_suffix: &'a str) -> impl Iterator<Item = T> + 'a
104    where
105        T: FromStr + 'a,
106        T::Err: Debug,
107    {
108        self.samples
109            .iter()
110            .filter(move |s| s.name.ends_with(metric_suffix))
111            .map(Sample::value)
112    }
113
114    /// Returns metrics for a metric-emitting runtime scope.
115    pub fn for_scope(&self, scope: &impl MetricScope) -> Self {
116        let prefix = format!("{}_", scope.metric_prefix());
117        let samples = self
118            .samples
119            .iter()
120            .filter(|s| s.name.starts_with(&prefix))
121            .cloned()
122            .collect();
123        Self { samples }
124    }
125
126    /// Counts consensus instances whose processed height is at least `target_height`.
127    #[track_caller]
128    pub fn consensus_at_height(&self, target_height: u64) -> usize {
129        self.values::<u64>(PROCESSED_HEIGHT)
130            .filter(|height| *height >= target_height)
131            .count()
132    }
133
134    /// Counts consensus instances whose latest epoch is at least `target_epoch`.
135    #[track_caller]
136    pub fn consensus_at_epoch(&self, target_epoch: u64) -> usize {
137        self.values::<u64>(LATEST_EPOCH)
138            .filter(|epoch| *epoch >= target_epoch)
139            .count()
140    }
141
142    pub fn latest_consensus_epoch(&self) -> Option<u64> {
143        self.value::<u64>(LATEST_EPOCH)
144    }
145
146    pub fn latest_consensus_height(&self) -> Option<u64> {
147        self.value::<u64>(PROCESSED_HEIGHT)
148    }
149
150    pub fn consensus_before_epoch(&self, upper_bound: u64) -> bool {
151        self.values::<u64>(LATEST_EPOCH)
152            .all(|epoch| epoch < upper_bound)
153    }
154
155    pub fn has_consensus_participants(&self, target: u64) -> bool {
156        self.values::<u64>(LATEST_PARTICIPANTS)
157            .any(|participants| participants == target)
158    }
159
160    /// Asserts that all `peers_blocked` metrics are zero.
161    #[track_caller]
162    pub fn assert_no_blocked_peers(&self) {
163        assert!(
164            self.values::<u64>(PEERS_BLOCKED)
165                .all(|blocked_peers| blocked_peers == 0)
166        );
167    }
168
169    /// Asserts that all DKG ceremony failure counters are zero.
170    #[track_caller]
171    pub fn assert_no_dkg_failures(&self) {
172        assert!(
173            self.values::<u64>(DKG_FAILURES)
174                .all(|failures| failures == 0)
175        );
176    }
177
178    /// Asserts that at least one consensus instance skipped rounds.
179    #[track_caller]
180    pub fn assert_any_rounds_skipped(&self) {
181        assert!(
182            self.values::<u64>(ROUNDS_SKIPPED)
183                .any(|skipped_rounds| skipped_rounds > 0),
184            "expected at least one consensus instance to have skipped rounds"
185        );
186    }
187}
188
189pub fn assert_no_duplicate_definitions(context: &impl commonware_runtime::Metrics) {
190    let mut definitions = HashSet::new();
191    let metrics = context.encode();
192
193    for definition in metrics.lines().filter(|line| line.starts_with('#')) {
194        assert!(
195            definitions.insert(definition),
196            "metric `{definition}` is duplicate"
197        );
198    }
199}
200
201/// Polls context metrics until `predicate` returns true.
202pub async fn wait_for_metrics(context: &Context, predicate: impl FnMut(&Metrics) -> bool) {
203    wait_for_metrics_with_interval(context, DEFAULT_POLL_INTERVAL, predicate).await;
204}
205
206/// Polls context metrics at `poll_interval` until `predicate` returns true.
207pub async fn wait_for_metrics_with_interval(
208    context: &Context,
209    poll_interval: Duration,
210    mut predicate: impl FnMut(&Metrics) -> bool,
211) {
212    loop {
213        let metrics = context.to_metrics();
214        if predicate(&metrics) {
215            return;
216        }
217
218        context.sleep(poll_interval).await;
219    }
220}
221
222/// Polls until a metric scope reaches `target_height`.
223pub async fn wait_for_height(context: &Context, scope: &impl MetricScope, target_height: u64) {
224    wait_for_height_with_interval(context, scope, target_height, DEFAULT_POLL_INTERVAL).await;
225}
226
227/// Polls at `poll_interval` until a metric scope reaches `target_height`.
228pub async fn wait_for_height_with_interval(
229    context: &Context,
230    scope: &impl MetricScope,
231    target_height: u64,
232    poll_interval: Duration,
233) {
234    wait_for_metrics_with_interval(context, poll_interval, |metrics| {
235        metrics.for_scope(scope).consensus_at_height(target_height) > 0
236    })
237    .await;
238}
239
240pub async fn wait_for_participants(context: &Context, target: u64) {
241    wait_for_participants_with_interval(context, target, DEFAULT_POLL_INTERVAL).await;
242}
243
244pub async fn wait_for_participants_with_interval(
245    context: &Context,
246    target: u64,
247    poll_interval: Duration,
248) {
249    wait_for_metrics_with_interval(context, poll_interval, |metrics| {
250        metrics.has_consensus_participants(target)
251    })
252    .await;
253}