Skip to main content

tempo_payload_types/
budget.rs

1use std::{
2    collections::BTreeMap,
3    sync::atomic::{AtomicU64, Ordering},
4    time::Duration,
5};
6use tracing::debug;
7
/// Divisor applied to the gap between the learned marshal persistence rate and
/// a cheaper observation; each such observation closes `1 / RATE_DECAY` of it.
const RATE_DECAY: u64 = 8;
/// Smallest encoded block, in bytes, whose persistence timing is learned from.
/// Tiny blocks are skipped so fixed archive overhead is not mistaken for a
/// large-block per-byte cost.
const MIN_SAMPLE_BYTES: usize = 128 * 1024;
/// How many recent successful EL validation timings are retained.
const VALIDATION_LATENCY_SAMPLE_WINDOW: usize = 64;
/// Fixed-point unit for validation workload multipliers: this value means 1.0x.
const VALIDATION_LATENCY_WORKLOAD_SCALE: u128 = 1_000_000;

/// Learned marshal persistence cost in nanoseconds per encoded byte; zero until
/// the first qualifying observation is recorded.
static MARSHAL_PERSIST_NS_PER_BYTE: AtomicU64 = AtomicU64::new(0);
18
19/// Returns the current estimate of consensus marshal persistence cost.
20///
21/// This is a point-in-time snapshot. Callers use it before building or
22/// returning a proposal so the same estimate is applied consistently to that
23/// decision.
24pub fn marshal_persist_estimate() -> MarshalPersistEstimator {
25    MarshalPersistEstimator::from_ns_per_byte(MARSHAL_PERSIST_NS_PER_BYTE.load(Ordering::Relaxed))
26}
27
28/// Records time spent persisting an encoded block through consensus marshal.
29///
30/// The observation is stored as nanoseconds per encoded block byte. Large
31/// blocks teach future build and return budgets how much size-dependent
32/// persistence time to reserve for both proposers and validators.
33/// Consensus records this from local `marshal.proposed` time after persisting a
34/// proposal.
35pub fn observe_marshal_persist(block_size_bytes: usize, elapsed: Duration) {
36    if block_size_bytes < MIN_SAMPLE_BYTES || elapsed == Duration::ZERO {
37        return;
38    }
39
40    let block_size = block_size_bytes as u128;
41    let observed = elapsed
42        .as_nanos()
43        .saturating_add(block_size.saturating_sub(1))
44        / block_size;
45    let observed = observed.min(u128::from(u64::MAX)) as u64;
46
47    let _ =
48        MARSHAL_PERSIST_NS_PER_BYTE.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
49            Some(if current == 0 || observed >= current {
50                observed
51            } else {
52                let decay = ((current - observed) / RATE_DECAY).max(1);
53                current.saturating_sub(decay).max(observed)
54            })
55        });
56    debug!(
57        block_size_bytes,
58        elapsed = ?elapsed,
59        observed_ns_per_byte = observed,
60        estimated_ns_per_byte = MARSHAL_PERSIST_NS_PER_BYTE.load(Ordering::Relaxed),
61        "updated marshal persistence estimate"
62    );
63}
64
/// Point-in-time marshal persistence cost per encoded block byte.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct MarshalPersistEstimator {
    /// Cost in nanoseconds per encoded byte; a rate of zero estimates every
    /// block as free to persist.
    ns_per_byte: u64,
}

impl MarshalPersistEstimator {
    /// Wraps a raw nanoseconds-per-byte rate.
    pub fn from_ns_per_byte(ns_per_byte: u64) -> Self {
        MarshalPersistEstimator { ns_per_byte }
    }

    /// Returns the expected persistence time for a block of
    /// `block_size_bytes` encoded bytes, saturating at `u64::MAX` nanoseconds.
    pub fn estimate(self, block_size_bytes: usize) -> Duration {
        let rate = u128::from(self.ns_per_byte);
        let bytes = block_size_bytes as u128;
        let total_nanos = rate.saturating_mul(bytes);
        Duration::from_nanos(u64::try_from(total_nanos).unwrap_or(u64::MAX))
    }
}
83
/// Gas and transaction count used to estimate validation latency.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ValidationLatencyWorkload {
    /// Gas used by the workload.
    gas_used: u64,
    /// Number of transactions in the workload.
    transaction_count: usize,
}

impl ValidationLatencyWorkload {
    /// Bundles a gas amount and a transaction count into one workload.
    pub fn new(gas_used: u64, transaction_count: usize) -> Self {
        ValidationLatencyWorkload {
            gas_used,
            transaction_count,
        }
    }
}
100
/// One retained validation timing together with the workload it measured.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ValidationLatencySample {
    /// Gas and transaction count of the validated block.
    workload: ValidationLatencyWorkload,
    /// Local time the validation took (never zero once stored via `observe`).
    elapsed: Duration,
}
106
/// Increments the occurrence count for `value`, starting it at one if absent.
fn insert_count<T: Copy + Ord>(counts: &mut BTreeMap<T, usize>, value: T) {
    counts.entry(value).and_modify(|n| *n += 1).or_insert(1);
}
110
/// Decrements the occurrence count for `value`, dropping the key at zero.
///
/// # Panics
///
/// Panics if `value` has no count, which means the sample window and its
/// count index have diverged.
fn remove_count<T: Copy + Ord>(counts: &mut BTreeMap<T, usize>, value: T) {
    let remaining = {
        let slot = counts
            .get_mut(&value)
            .expect("validation latency sample index out of sync");
        *slot -= 1;
        *slot
    };
    if remaining == 0 {
        counts.remove(&value);
    }
}
120
/// Returns the 1-based rank of the `numerator / denominator` percentile in a
/// population of `len` samples, rounding up; `None` for an empty population.
fn percentile_rank(len: usize, numerator: usize, denominator: usize) -> Option<usize> {
    debug_assert!(numerator > 0);
    debug_assert!(denominator > 0);
    debug_assert!(numerator <= denominator);

    if len == 0 {
        None
    } else {
        Some((len * numerator).div_ceil(denominator))
    }
}

/// Reads the `numerator / denominator` percentile value out of an ordered
/// occurrence-count map describing `len` samples.
///
/// Walks the map in ascending key order, accumulating counts, and returns the
/// first key whose running total reaches the percentile rank.
fn percentile_from_counts<T: Copy + Ord>(
    counts: &BTreeMap<T, usize>,
    len: usize,
    numerator: usize,
    denominator: usize,
) -> Option<T> {
    let target = percentile_rank(len, numerator, denominator)?;
    let found = counts
        .iter()
        .scan(0usize, |running, (value, count)| {
            *running += *count;
            Some((*value, *running))
        })
        .find(|&(_, running)| running >= target)
        .map(|(value, _)| value);
    // Reaching the end without hitting the rank means `len` disagrees with
    // the counts stored in the map.
    debug_assert!(found.is_some(), "validation latency sample index out of sync");
    found
}
149
150fn scale_above_baseline(current: u128, baseline: u128) -> Option<u128> {
151    if current == 0 {
152        return Some(VALIDATION_LATENCY_WORKLOAD_SCALE);
153    }
154    if baseline == 0 {
155        return None;
156    }
157    if current <= baseline {
158        return Some(VALIDATION_LATENCY_WORKLOAD_SCALE);
159    }
160
161    Some(current.saturating_mul(VALIDATION_LATENCY_WORKLOAD_SCALE) / baseline)
162}
163
164fn scale_duration(elapsed: Duration, scale: u128) -> Duration {
165    let nanos = elapsed
166        .as_nanos()
167        .saturating_mul(scale)
168        .saturating_add(VALIDATION_LATENCY_WORKLOAD_SCALE.saturating_sub(1))
169        / VALIDATION_LATENCY_WORKLOAD_SCALE;
170    Duration::from_nanos(nanos.min(u128::from(u64::MAX)) as u64)
171}
172
173/// Point-in-time validation latency estimate from recent proposal validation.
174#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
175pub struct ValidationLatencyEstimate {
176    elapsed: Duration,
177    p90_gas_used: u64,
178    p90_transaction_count: usize,
179}
180
181impl ValidationLatencyEstimate {
182    /// Estimates validation latency for the supplied workload.
183    ///
184    /// Recent elapsed validation feedback is the floor so faster replay feedback
185    /// still reclaims budget without shrinking smaller current blocks. If the
186    /// current block carries more gas or transactions than the recent P90 workload,
187    /// the estimate scales up by that excess. Encoded bytes are intentionally
188    /// not used here because BAL sidecar bytes are charged through marshal
189    /// persistence, not execution-layer validation work.
190    pub fn estimate(self, workload: ValidationLatencyWorkload) -> Option<Duration> {
191        if self.elapsed == Duration::ZERO {
192            return None;
193        }
194
195        let scale = [
196            scale_above_baseline(u128::from(workload.gas_used), u128::from(self.p90_gas_used)),
197            scale_above_baseline(
198                workload.transaction_count as u128,
199                self.p90_transaction_count as u128,
200            ),
201        ]
202        .into_iter()
203        .flatten()
204        .max()?;
205        Some(scale_duration(self.elapsed, scale))
206    }
207}
208
209/// Tracks recent local execution-layer block validation latency.
210///
211/// The validation latency estimate uses the recent P90 successful proposal
212/// validation as an absolute floor, then scales that floor up when the current
213/// workload exceeds the recent P90 gas or transaction count. This avoids
214/// combining independent per-unit rates from different workloads while still
215/// reserving validator headroom when the builder grows beyond the workloads
216/// that produced the feedback.
217#[derive(Clone, Debug, Default)]
218pub struct ValidationLatencyEstimator {
219    /// Samples are kept in id order for retention; count maps are keyed by
220    /// observed values so estimate snapshots can read percentiles without
221    /// sorting.
222    sample_window: Vec<(u64, ValidationLatencySample)>,
223    elapsed_counts: BTreeMap<Duration, usize>,
224    gas_used_counts: BTreeMap<u64, usize>,
225    transaction_count_counts: BTreeMap<usize, usize>,
226}
227
228impl ValidationLatencyEstimator {
229    fn insert_sample_counts(&mut self, sample: ValidationLatencySample) {
230        insert_count(&mut self.elapsed_counts, sample.elapsed);
231        insert_count(&mut self.gas_used_counts, sample.workload.gas_used);
232        insert_count(
233            &mut self.transaction_count_counts,
234            sample.workload.transaction_count,
235        );
236    }
237
238    fn remove_sample_counts(&mut self, sample: ValidationLatencySample) {
239        remove_count(&mut self.elapsed_counts, sample.elapsed);
240        remove_count(&mut self.gas_used_counts, sample.workload.gas_used);
241        remove_count(
242            &mut self.transaction_count_counts,
243            sample.workload.transaction_count,
244        );
245    }
246
247    fn insert_sample(&mut self, sample_id: u64, sample: ValidationLatencySample) {
248        let insert_index = match self
249            .sample_window
250            .binary_search_by_key(&sample_id, |(id, _)| *id)
251        {
252            Ok(index) => {
253                let (_, replaced) = self.sample_window.remove(index);
254                self.remove_sample_counts(replaced);
255                index
256            }
257            Err(index) => index,
258        };
259
260        self.insert_sample_counts(sample);
261        self.sample_window.insert(insert_index, (sample_id, sample));
262        while self.sample_window.len() > VALIDATION_LATENCY_SAMPLE_WINDOW {
263            let (_, evicted) = self.sample_window.remove(0);
264            self.remove_sample_counts(evicted);
265        }
266    }
267
268    /// Records local time spent validating a block through the execution layer.
269    pub fn observe(
270        &mut self,
271        sample_id: u64,
272        workload: ValidationLatencyWorkload,
273        elapsed: Duration,
274    ) {
275        if elapsed == Duration::ZERO {
276            return;
277        }
278
279        let sample = ValidationLatencySample { workload, elapsed };
280        self.insert_sample(sample_id, sample);
281
282        debug!(
283            sample_id,
284            workload = ?workload,
285            elapsed = ?elapsed,
286            estimate = ?self.estimate(),
287            samples = self.sample_window.len(),
288            "updated validation latency estimate"
289        );
290    }
291
292    /// Returns the current estimate for execution-layer block validation work.
293    ///
294    /// `None` means this node has not yet observed any successful validations.
295    /// Callers should fall back to their conservative validator-work estimate in
296    /// that case.
297    pub fn estimate(&self) -> Option<ValidationLatencyEstimate> {
298        let sample_count = self.sample_window.len();
299        let p90_elapsed = percentile_from_counts(&self.elapsed_counts, sample_count, 9, 10)?;
300        Some(ValidationLatencyEstimate {
301            elapsed: p90_elapsed,
302            p90_gas_used: percentile_from_counts(&self.gas_used_counts, sample_count, 9, 10)
303                .unwrap_or_default(),
304            p90_transaction_count: percentile_from_counts(
305                &self.transaction_count_counts,
306                sample_count,
307                9,
308                10,
309            )
310            .unwrap_or_default(),
311        })
312    }
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// Snapshots `estimator` and applies the snapshot to `workload`.
    fn estimate_for(
        estimator: &ValidationLatencyEstimator,
        workload: ValidationLatencyWorkload,
    ) -> Option<Duration> {
        estimator.estimate()?.estimate(workload)
    }

    /// Builds an estimator from one 100 ms sample of `sample_workload` and
    /// asks it to estimate `current_workload`.
    fn estimate_with_sample(
        sample_workload: ValidationLatencyWorkload,
        current_workload: ValidationLatencyWorkload,
    ) -> Option<Duration> {
        let mut estimator = ValidationLatencyEstimator::default();
        estimator.observe(1, sample_workload, Duration::from_millis(100));
        estimate_for(&estimator, current_workload)
    }

    #[test]
    fn observes_large_blocks_and_ignores_tiny_samples() {
        // The learned rate is process-wide; start from a known unset state.
        MARSHAL_PERSIST_NS_PER_BYTE.store(0, Ordering::Relaxed);
        // 13 ms over 128 KiB rounds up to 100 ns/byte.
        let expected = Duration::from_nanos(13_107_200);

        observe_marshal_persist(MIN_SAMPLE_BYTES, Duration::from_millis(13));
        assert_eq!(marshal_persist_estimate().estimate(MIN_SAMPLE_BYTES), expected);

        // One byte under the size floor, and a zero timing: both are ignored.
        observe_marshal_persist(MIN_SAMPLE_BYTES - 1, Duration::from_millis(1));
        observe_marshal_persist(1_000_000, Duration::ZERO);
        assert_eq!(marshal_persist_estimate().estimate(MIN_SAMPLE_BYTES), expected);
    }

    #[test]
    fn validation_latency_estimate_uses_recent_p90_elapsed() {
        let workload = ValidationLatencyWorkload::new(100, 0);
        let mut estimator = ValidationLatencyEstimator::default();
        for (sample_id, nanos) in [(1, 10), (2, 20), (3, 30), (4, 40)] {
            estimator.observe(sample_id, workload, Duration::from_nanos(nanos));
        }
        assert_eq!(
            estimate_for(&estimator, workload),
            Some(Duration::from_nanos(40))
        );

        // Fill the window, then add one more sample. The lowest id (1 ns) is
        // evicted, and the P90 of 2..=64 plus 10_000 is 59 ns.
        let unit = ValidationLatencyWorkload::new(1, 0);
        let mut estimator = ValidationLatencyEstimator::default();
        for id in 1..=VALIDATION_LATENCY_SAMPLE_WINDOW as u64 {
            estimator.observe(id, unit, Duration::from_nanos(id));
        }
        estimator.observe(10_000, unit, Duration::from_nanos(10_000));
        assert_eq!(
            estimate_for(&estimator, unit),
            Some(Duration::from_nanos(59))
        );
    }

    #[test]
    fn validation_latency_estimate_replaces_existing_sample_id() {
        let workload = ValidationLatencyWorkload::new(100, 0);
        let mut estimator = ValidationLatencyEstimator::default();

        estimator.observe(1, workload, Duration::from_millis(100));
        estimator.observe(1, workload, Duration::from_millis(200));

        assert_eq!(
            estimate_for(&estimator, workload),
            Some(Duration::from_millis(200))
        );
    }

    #[test]
    fn validation_latency_estimate_does_not_scale_down() {
        let sample = ValidationLatencyWorkload::new(1_000, 10);
        let smaller = ValidationLatencyWorkload::new(400, 4);
        assert_eq!(
            estimate_with_sample(sample, smaller),
            Some(Duration::from_millis(100))
        );
    }

    #[test]
    fn validation_latency_estimate_scales_up_by_gas_or_transactions() {
        let sample = ValidationLatencyWorkload::new(1_000, 10);
        let expected = Some(Duration::from_millis(150));
        // 1.5x more gas, or 1.5x more transactions: either scales the floor.
        for current in [
            ValidationLatencyWorkload::new(1_500, 10),
            ValidationLatencyWorkload::new(1_000, 15),
        ] {
            assert_eq!(estimate_with_sample(sample, current), expected);
        }
    }

    #[test]
    fn validation_latency_estimate_requires_non_empty_workload_feedback() {
        let empty = ValidationLatencyWorkload::new(0, 0);

        assert_eq!(
            estimate_with_sample(empty, empty),
            Some(Duration::from_millis(100))
        );
        assert_eq!(
            estimate_with_sample(empty, ValidationLatencyWorkload::new(1_000, 10)),
            None
        );
    }
}