1use std::{collections::HashSet, fmt::Debug, str::FromStr, time::Duration};
4
5use commonware_runtime::{Clock as _, deterministic::Context};
6
7use crate::TestingNode;
8
9const PEERS_BLOCKED: &str = "peers_blocked";
10const LATEST_EPOCH: &str = "epoch_manager_latest_epoch";
11const LATEST_PARTICIPANTS: &str = "epoch_manager_latest_participants";
12const PROCESSED_HEIGHT: &str = "marshal_processed_height";
13const DKG_FAILURES: &str = "dkg_manager_ceremony_failures_total";
14const ROUNDS_SKIPPED: &str = "rounds_skipped_total";
15
16const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(100);
17
18#[derive(Clone, Debug, PartialEq)]
19struct Sample {
20 name: String,
21 value: String,
22}
23
24impl Sample {
25 fn parse(line: &str) -> Option<Self> {
26 let line = line.trim();
27 if line.is_empty() || line.starts_with('#') {
28 return None;
29 }
30
31 let mut parts = line.split_whitespace();
32 let key = parts.next().expect("metric sample has no name");
33 let value = parts
34 .next()
35 .unwrap_or_else(|| panic!("metric sample `{key}` has no value"));
36
37 let name = key.split_once('{').map_or(key, |(name, _)| name);
38
39 Some(Self {
40 name: name.to_owned(),
41 value: value.to_owned(),
42 })
43 }
44
45 fn value<T>(&self) -> T
46 where
47 T: FromStr,
48 T::Err: Debug,
49 {
50 self.value.parse().expect("metrics parses into type")
51 }
52}
53
54#[derive(Clone, Debug, Default, PartialEq)]
55pub struct Metrics {
56 samples: Vec<Sample>,
57}
58
59pub trait MetricScope {
60 fn metric_prefix(&self) -> String;
61}
62
63pub trait MetricsExt {
64 fn to_metrics(&self) -> Metrics;
65}
66
67impl<T> MetricsExt for T
68where
69 T: commonware_runtime::Metrics,
70{
71 fn to_metrics(&self) -> Metrics {
72 Metrics::from_context(self)
73 }
74}
75
76impl<TClock> MetricScope for TestingNode<TClock>
77where
78 TClock: commonware_runtime::Clock,
79{
80 fn metric_prefix(&self) -> String {
81 Self::metric_prefix(self)
82 }
83}
84
85impl Metrics {
86 pub fn from_context(context: &impl commonware_runtime::Metrics) -> Self {
88 let samples = context.encode().lines().filter_map(Sample::parse).collect();
89 Self { samples }
90 }
91
92 pub fn value<T>(&self, metric_suffix: &str) -> Option<T>
93 where
94 T: FromStr,
95 T::Err: Debug,
96 {
97 self.samples
98 .iter()
99 .find(|s| s.name.ends_with(metric_suffix))
100 .map(Sample::value)
101 }
102
103 pub fn values<'a, T>(&'a self, metric_suffix: &'a str) -> impl Iterator<Item = T> + 'a
104 where
105 T: FromStr + 'a,
106 T::Err: Debug,
107 {
108 self.samples
109 .iter()
110 .filter(move |s| s.name.ends_with(metric_suffix))
111 .map(Sample::value)
112 }
113
114 pub fn for_scope(&self, scope: &impl MetricScope) -> Self {
116 let prefix = format!("{}_", scope.metric_prefix());
117 let samples = self
118 .samples
119 .iter()
120 .filter(|s| s.name.starts_with(&prefix))
121 .cloned()
122 .collect();
123 Self { samples }
124 }
125
126 #[track_caller]
128 pub fn consensus_at_height(&self, target_height: u64) -> usize {
129 self.values::<u64>(PROCESSED_HEIGHT)
130 .filter(|height| *height >= target_height)
131 .count()
132 }
133
134 #[track_caller]
136 pub fn consensus_at_epoch(&self, target_epoch: u64) -> usize {
137 self.values::<u64>(LATEST_EPOCH)
138 .filter(|epoch| *epoch >= target_epoch)
139 .count()
140 }
141
142 pub fn latest_consensus_epoch(&self) -> Option<u64> {
143 self.value::<u64>(LATEST_EPOCH)
144 }
145
146 pub fn latest_consensus_height(&self) -> Option<u64> {
147 self.value::<u64>(PROCESSED_HEIGHT)
148 }
149
150 pub fn consensus_before_epoch(&self, upper_bound: u64) -> bool {
151 self.values::<u64>(LATEST_EPOCH)
152 .all(|epoch| epoch < upper_bound)
153 }
154
155 pub fn has_consensus_participants(&self, target: u64) -> bool {
156 self.values::<u64>(LATEST_PARTICIPANTS)
157 .any(|participants| participants == target)
158 }
159
160 #[track_caller]
162 pub fn assert_no_blocked_peers(&self) {
163 assert!(
164 self.values::<u64>(PEERS_BLOCKED)
165 .all(|blocked_peers| blocked_peers == 0)
166 );
167 }
168
169 #[track_caller]
171 pub fn assert_no_dkg_failures(&self) {
172 assert!(
173 self.values::<u64>(DKG_FAILURES)
174 .all(|failures| failures == 0)
175 );
176 }
177
178 #[track_caller]
180 pub fn assert_any_rounds_skipped(&self) {
181 assert!(
182 self.values::<u64>(ROUNDS_SKIPPED)
183 .any(|skipped_rounds| skipped_rounds > 0),
184 "expected at least one consensus instance to have skipped rounds"
185 );
186 }
187}
188
189pub fn assert_no_duplicate_definitions(context: &impl commonware_runtime::Metrics) {
190 let mut definitions = HashSet::new();
191 let metrics = context.encode();
192
193 for definition in metrics.lines().filter(|line| line.starts_with('#')) {
194 assert!(
195 definitions.insert(definition),
196 "metric `{definition}` is duplicate"
197 );
198 }
199}
200
201pub async fn wait_for_metrics(context: &Context, predicate: impl FnMut(&Metrics) -> bool) {
203 wait_for_metrics_with_interval(context, DEFAULT_POLL_INTERVAL, predicate).await;
204}
205
206pub async fn wait_for_metrics_with_interval(
208 context: &Context,
209 poll_interval: Duration,
210 mut predicate: impl FnMut(&Metrics) -> bool,
211) {
212 loop {
213 let metrics = context.to_metrics();
214 if predicate(&metrics) {
215 return;
216 }
217
218 context.sleep(poll_interval).await;
219 }
220}
221
222pub async fn wait_for_height(context: &Context, scope: &impl MetricScope, target_height: u64) {
224 wait_for_height_with_interval(context, scope, target_height, DEFAULT_POLL_INTERVAL).await;
225}
226
227pub async fn wait_for_height_with_interval(
229 context: &Context,
230 scope: &impl MetricScope,
231 target_height: u64,
232 poll_interval: Duration,
233) {
234 wait_for_metrics_with_interval(context, poll_interval, |metrics| {
235 metrics.for_scope(scope).consensus_at_height(target_height) > 0
236 })
237 .await;
238}
239
240pub async fn wait_for_participants(context: &Context, target: u64) {
241 wait_for_participants_with_interval(context, target, DEFAULT_POLL_INTERVAL).await;
242}
243
244pub async fn wait_for_participants_with_interval(
245 context: &Context,
246 target: u64,
247 poll_interval: Duration,
248) {
249 wait_for_metrics_with_interval(context, poll_interval, |metrics| {
250 metrics.has_consensus_participants(target)
251 })
252 .await;
253}