1use std::{
2 collections::BTreeMap,
3 sync::atomic::{AtomicU64, Ordering},
4 time::Duration,
5};
6use tracing::debug;
7
/// Divisor applied to the gap between the current persistence estimate and a
/// faster observation: each faster sample lowers the estimate by roughly one
/// eighth of that gap (and by at least 1).
const RATE_DECAY: u64 = 8;

/// Persistence samples for blocks smaller than this many bytes are ignored by
/// `observe_marshal_persist`.
const MIN_SAMPLE_BYTES: usize = 128 * 1024;

/// Maximum number of samples retained by `ValidationLatencyEstimator`.
const VALIDATION_LATENCY_SAMPLE_WINDOW: usize = 64;

/// Fixed-point denominator for workload scale factors; this value itself
/// represents a scale of exactly 1.0.
const VALIDATION_LATENCY_WORKLOAD_SCALE: u128 = 1_000_000;

/// Global persistence cost estimate in nanoseconds per byte. A value of `0`
/// is treated as "no sample recorded yet" by `observe_marshal_persist`.
static MARSHAL_PERSIST_NS_PER_BYTE: AtomicU64 = AtomicU64::new(0);
18
19pub fn marshal_persist_estimate() -> MarshalPersistEstimator {
25 MarshalPersistEstimator::from_ns_per_byte(MARSHAL_PERSIST_NS_PER_BYTE.load(Ordering::Relaxed))
26}
27
28pub fn observe_marshal_persist(block_size_bytes: usize, elapsed: Duration) {
36 if block_size_bytes < MIN_SAMPLE_BYTES || elapsed == Duration::ZERO {
37 return;
38 }
39
40 let block_size = block_size_bytes as u128;
41 let observed = elapsed
42 .as_nanos()
43 .saturating_add(block_size.saturating_sub(1))
44 / block_size;
45 let observed = observed.min(u128::from(u64::MAX)) as u64;
46
47 let _ =
48 MARSHAL_PERSIST_NS_PER_BYTE.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
49 Some(if current == 0 || observed >= current {
50 observed
51 } else {
52 let decay = ((current - observed) / RATE_DECAY).max(1);
53 current.saturating_sub(decay).max(observed)
54 })
55 });
56 debug!(
57 block_size_bytes,
58 elapsed = ?elapsed,
59 observed_ns_per_byte = observed,
60 estimated_ns_per_byte = MARSHAL_PERSIST_NS_PER_BYTE.load(Ordering::Relaxed),
61 "updated marshal persistence estimate"
62 );
63}
64
/// Linear persistence-time model: a fixed cost in nanoseconds per byte.
///
/// The default value has a rate of zero and therefore estimates zero time for
/// every block size.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct MarshalPersistEstimator {
    ns_per_byte: u64,
}

impl MarshalPersistEstimator {
    /// Builds an estimator from an explicit nanoseconds-per-byte rate.
    pub fn from_ns_per_byte(ns_per_byte: u64) -> Self {
        Self { ns_per_byte }
    }

    /// Predicts how long persisting `block_size_bytes` bytes takes.
    ///
    /// The product is computed in 128 bits and clamped to `u64::MAX`
    /// nanoseconds, so the call never overflows.
    pub fn estimate(self, block_size_bytes: usize) -> Duration {
        let rate = u128::from(self.ns_per_byte);
        let total_ns = rate.saturating_mul(block_size_bytes as u128);
        let clamped_ns = u64::try_from(total_ns).unwrap_or(u64::MAX);
        Duration::from_nanos(clamped_ns)
    }
}
83
/// Workload of a single validation run, described by its `gas_used` and
/// `transaction_count`.
///
/// The default value is the empty workload (both dimensions zero).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ValidationLatencyWorkload {
    gas_used: u64,
    transaction_count: usize,
}

impl ValidationLatencyWorkload {
    /// Creates a workload descriptor from its two dimensions.
    pub fn new(gas_used: u64, transaction_count: usize) -> Self {
        Self {
            gas_used,
            transaction_count,
        }
    }
}
100
101#[derive(Clone, Copy, Debug, PartialEq, Eq)]
102struct ValidationLatencySample {
103 workload: ValidationLatencyWorkload,
104 elapsed: Duration,
105}
106
/// Records one more occurrence of `value` in the multiset `counts`, creating
/// the entry when the value has not been seen before.
fn insert_count<T: Copy + Ord>(counts: &mut BTreeMap<T, usize>, value: T) {
    let occurrences = counts.entry(value).or_insert(0);
    *occurrences += 1;
}
110
/// Removes one occurrence of `value` from the multiset `counts`, dropping the
/// entry entirely once its multiplicity reaches zero.
///
/// # Panics
///
/// Panics if `value` is not present: callers only remove values they inserted
/// earlier, so a missing entry means the index has diverged from the samples.
fn remove_count<T: Copy + Ord>(counts: &mut BTreeMap<T, usize>, value: T) {
    let Some(remaining) = counts.get_mut(&value) else {
        panic!("validation latency sample index out of sync");
    };
    *remaining -= 1;
    if *remaining == 0 {
        counts.remove(&value);
    }
}
120
/// Returns the 1-based nearest-rank position of the `numerator / denominator`
/// percentile within `len` ordered samples, i.e.
/// `ceil(len * numerator / denominator)`.
///
/// Returns `None` when `len` is zero. Callers are expected to pass
/// `0 < numerator <= denominator`; this is checked in debug builds only.
fn percentile_rank(len: usize, numerator: usize, denominator: usize) -> Option<usize> {
    debug_assert!(numerator > 0);
    debug_assert!(denominator > 0);
    debug_assert!(numerator <= denominator);

    if len == 0 {
        return None;
    }

    // Widen before multiplying: `len * numerator` can exceed `usize::MAX`,
    // which would panic in debug builds and silently wrap to a wrong rank in
    // release builds. The product of two `usize` values always fits in `u128`.
    let rank = (len as u128 * numerator as u128).div_ceil(denominator as u128);
    // With `numerator <= denominator` the rank never exceeds `len`; clamp
    // anyway so a violated precondition cannot truncate in release builds.
    Some(usize::try_from(rank).unwrap_or(usize::MAX))
}
131
132fn percentile_from_counts<T: Copy + Ord>(
133 counts: &BTreeMap<T, usize>,
134 len: usize,
135 numerator: usize,
136 denominator: usize,
137) -> Option<T> {
138 let target = percentile_rank(len, numerator, denominator)?;
139 let mut seen = 0;
140 for (value, count) in counts {
141 seen += *count;
142 if seen >= target {
143 return Some(*value);
144 }
145 }
146 debug_assert!(false, "validation latency sample index out of sync");
147 None
148}
149
150fn scale_above_baseline(current: u128, baseline: u128) -> Option<u128> {
151 if current == 0 {
152 return Some(VALIDATION_LATENCY_WORKLOAD_SCALE);
153 }
154 if baseline == 0 {
155 return None;
156 }
157 if current <= baseline {
158 return Some(VALIDATION_LATENCY_WORKLOAD_SCALE);
159 }
160
161 Some(current.saturating_mul(VALIDATION_LATENCY_WORKLOAD_SCALE) / baseline)
162}
163
164fn scale_duration(elapsed: Duration, scale: u128) -> Duration {
165 let nanos = elapsed
166 .as_nanos()
167 .saturating_mul(scale)
168 .saturating_add(VALIDATION_LATENCY_WORKLOAD_SCALE.saturating_sub(1))
169 / VALIDATION_LATENCY_WORKLOAD_SCALE;
170 Duration::from_nanos(nanos.min(u128::from(u64::MAX)) as u64)
171}
172
173#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
175pub struct ValidationLatencyEstimate {
176 elapsed: Duration,
177 p90_gas_used: u64,
178 p90_transaction_count: usize,
179}
180
181impl ValidationLatencyEstimate {
182 pub fn estimate(self, workload: ValidationLatencyWorkload) -> Option<Duration> {
191 if self.elapsed == Duration::ZERO {
192 return None;
193 }
194
195 let scale = [
196 scale_above_baseline(u128::from(workload.gas_used), u128::from(self.p90_gas_used)),
197 scale_above_baseline(
198 workload.transaction_count as u128,
199 self.p90_transaction_count as u128,
200 ),
201 ]
202 .into_iter()
203 .flatten()
204 .max()?;
205 Some(scale_duration(self.elapsed, scale))
206 }
207}
208
209#[derive(Clone, Debug, Default)]
218pub struct ValidationLatencyEstimator {
219 sample_window: Vec<(u64, ValidationLatencySample)>,
223 elapsed_counts: BTreeMap<Duration, usize>,
224 gas_used_counts: BTreeMap<u64, usize>,
225 transaction_count_counts: BTreeMap<usize, usize>,
226}
227
228impl ValidationLatencyEstimator {
229 fn insert_sample_counts(&mut self, sample: ValidationLatencySample) {
230 insert_count(&mut self.elapsed_counts, sample.elapsed);
231 insert_count(&mut self.gas_used_counts, sample.workload.gas_used);
232 insert_count(
233 &mut self.transaction_count_counts,
234 sample.workload.transaction_count,
235 );
236 }
237
238 fn remove_sample_counts(&mut self, sample: ValidationLatencySample) {
239 remove_count(&mut self.elapsed_counts, sample.elapsed);
240 remove_count(&mut self.gas_used_counts, sample.workload.gas_used);
241 remove_count(
242 &mut self.transaction_count_counts,
243 sample.workload.transaction_count,
244 );
245 }
246
247 fn insert_sample(&mut self, sample_id: u64, sample: ValidationLatencySample) {
248 let insert_index = match self
249 .sample_window
250 .binary_search_by_key(&sample_id, |(id, _)| *id)
251 {
252 Ok(index) => {
253 let (_, replaced) = self.sample_window.remove(index);
254 self.remove_sample_counts(replaced);
255 index
256 }
257 Err(index) => index,
258 };
259
260 self.insert_sample_counts(sample);
261 self.sample_window.insert(insert_index, (sample_id, sample));
262 while self.sample_window.len() > VALIDATION_LATENCY_SAMPLE_WINDOW {
263 let (_, evicted) = self.sample_window.remove(0);
264 self.remove_sample_counts(evicted);
265 }
266 }
267
268 pub fn observe(
270 &mut self,
271 sample_id: u64,
272 workload: ValidationLatencyWorkload,
273 elapsed: Duration,
274 ) {
275 if elapsed == Duration::ZERO {
276 return;
277 }
278
279 let sample = ValidationLatencySample { workload, elapsed };
280 self.insert_sample(sample_id, sample);
281
282 debug!(
283 sample_id,
284 workload = ?workload,
285 elapsed = ?elapsed,
286 estimate = ?self.estimate(),
287 samples = self.sample_window.len(),
288 "updated validation latency estimate"
289 );
290 }
291
292 pub fn estimate(&self) -> Option<ValidationLatencyEstimate> {
298 let sample_count = self.sample_window.len();
299 let p90_elapsed = percentile_from_counts(&self.elapsed_counts, sample_count, 9, 10)?;
300 Some(ValidationLatencyEstimate {
301 elapsed: p90_elapsed,
302 p90_gas_used: percentile_from_counts(&self.gas_used_counts, sample_count, 9, 10)
303 .unwrap_or_default(),
304 p90_transaction_count: percentile_from_counts(
305 &self.transaction_count_counts,
306 sample_count,
307 9,
308 10,
309 )
310 .unwrap_or_default(),
311 })
312 }
313}
314
315#[cfg(test)]
316mod tests {
317 use super::*;
318
319 fn estimate_with_sample(
320 sample_workload: ValidationLatencyWorkload,
321 current_workload: ValidationLatencyWorkload,
322 ) -> Option<Duration> {
323 let mut estimator = ValidationLatencyEstimator::default();
324 estimator.observe(1, sample_workload, Duration::from_millis(100));
325 estimator
326 .estimate()
327 .and_then(|estimate| estimate.estimate(current_workload))
328 }
329
330 #[test]
331 fn observes_large_blocks_and_ignores_tiny_samples() {
332 MARSHAL_PERSIST_NS_PER_BYTE.store(0, Ordering::Relaxed);
333 observe_marshal_persist(MIN_SAMPLE_BYTES, Duration::from_millis(13));
334
335 assert_eq!(
336 marshal_persist_estimate().estimate(MIN_SAMPLE_BYTES),
337 Duration::from_nanos(13_107_200)
338 );
339
340 observe_marshal_persist(MIN_SAMPLE_BYTES - 1, Duration::from_millis(1));
341 observe_marshal_persist(1_000_000, Duration::ZERO);
342
343 assert_eq!(
344 marshal_persist_estimate().estimate(MIN_SAMPLE_BYTES),
345 Duration::from_nanos(13_107_200)
346 );
347 }
348
349 #[test]
350 fn validation_latency_estimate_uses_recent_p90_elapsed() {
351 let mut estimator = ValidationLatencyEstimator::default();
352 let sample_workload = ValidationLatencyWorkload::new(100, 0);
353 let current_workload = ValidationLatencyWorkload::new(100, 0);
354 for (sample_id, elapsed) in [(1, 10), (2, 20), (3, 30), (4, 40)] {
355 estimator.observe(sample_id, sample_workload, Duration::from_nanos(elapsed));
356 }
357 assert_eq!(
358 estimator
359 .estimate()
360 .and_then(|estimate| estimate.estimate(current_workload)),
361 Some(Duration::from_nanos(40))
362 );
363
364 estimator = ValidationLatencyEstimator::default();
365 for elapsed in 1..=VALIDATION_LATENCY_SAMPLE_WINDOW as u64 {
366 estimator.observe(
367 elapsed,
368 ValidationLatencyWorkload::new(1, 0),
369 Duration::from_nanos(elapsed),
370 );
371 }
372 estimator.observe(
373 10_000,
374 ValidationLatencyWorkload::new(1, 0),
375 Duration::from_nanos(10_000),
376 );
377 assert_eq!(
378 estimator
379 .estimate()
380 .and_then(|estimate| estimate.estimate(ValidationLatencyWorkload::new(1, 0))),
381 Some(Duration::from_nanos(59))
382 );
383 }
384
385 #[test]
386 fn validation_latency_estimate_replaces_existing_sample_id() {
387 let mut estimator = ValidationLatencyEstimator::default();
388 let workload = ValidationLatencyWorkload::new(100, 0);
389
390 estimator.observe(1, workload, Duration::from_millis(100));
391 estimator.observe(1, workload, Duration::from_millis(200));
392
393 assert_eq!(
394 estimator
395 .estimate()
396 .and_then(|estimate| estimate.estimate(workload)),
397 Some(Duration::from_millis(200))
398 );
399 }
400
401 #[test]
402 fn validation_latency_estimate_does_not_scale_down() {
403 assert_eq!(
404 estimate_with_sample(
405 ValidationLatencyWorkload::new(1_000, 10),
406 ValidationLatencyWorkload::new(400, 4)
407 ),
408 Some(Duration::from_millis(100))
409 );
410 }
411
412 #[test]
413 fn validation_latency_estimate_scales_up_by_gas_or_transactions() {
414 let sample = ValidationLatencyWorkload::new(1_000, 10);
415
416 assert_eq!(
417 estimate_with_sample(sample, ValidationLatencyWorkload::new(1_500, 10)),
418 Some(Duration::from_millis(150))
419 );
420 assert_eq!(
421 estimate_with_sample(sample, ValidationLatencyWorkload::new(1_000, 15)),
422 Some(Duration::from_millis(150))
423 );
424 }
425
426 #[test]
427 fn validation_latency_estimate_requires_non_empty_workload_feedback() {
428 let empty = ValidationLatencyWorkload::new(0, 0);
429
430 assert_eq!(
431 estimate_with_sample(empty, ValidationLatencyWorkload::new(0, 0)),
432 Some(Duration::from_millis(100))
433 );
434 assert_eq!(
435 estimate_with_sample(empty, ValidationLatencyWorkload::new(1_000, 10)),
436 None
437 );
438 }
439}