metrics/
histogram.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    metrics::{is_enabled, Metric},
7    registry::{DEFAULT_GROUPING_REGISTRY, DEFAULT_REGISTRY},
8};
9use parking_lot::RwLock;
10use rand::{rng, Rng};
11use std::{
12    cmp::Ordering,
13    collections::BinaryHeap,
14    sync::Arc,
15    time::{Duration, Instant},
16};
17
18pub trait Histogram: Send + Sync {
19    fn count(&self) -> usize { 0 }
20    fn max(&self) -> u64 { 0 }
21    fn mean(&self) -> f64 { 0.0 }
22    fn min(&self) -> u64 { 0 }
23    fn percentile(&self, _p: f64) -> u64 { 0 }
24    fn snapshot(&self) -> Arc<dyn Histogram> { Arc::new(Snapshot::default()) }
25    fn stddev(&self) -> f64 { self.variance().sqrt() }
26    fn sum(&self) -> u64 { 0 }
27    fn update(&self, _v: u64) {}
28    fn variance(&self) -> f64 { 0.0 }
29    fn update_since(&self, start_time: Instant) {
30        self.update(
31            Instant::now()
32                .saturating_duration_since(start_time)
33                .as_nanos() as u64,
34        );
35    }
36}
37
/// Sampling strategy that selects which histogram implementation
/// `Sample::register` and `Sample::register_with_group` create.
pub enum Sample {
    /// Creates a `UniformSample`.
    Uniform,
    /// Creates an `ExpDecaySample`; the payload is passed to
    /// `ExpDecaySample::new` as its `alpha` argument.
    ExpDecay(f64),
}
42
43impl Sample {
44    pub fn register(
45        &self, name: &str, reservoir_size: usize,
46    ) -> Arc<dyn Histogram> {
47        if !is_enabled() {
48            return Arc::new(NoopHistogram);
49        }
50
51        assert!(reservoir_size > 0);
52
53        match *self {
54            Sample::Uniform => {
55                let sample = Arc::new(UniformSample::new(reservoir_size));
56                DEFAULT_REGISTRY
57                    .write()
58                    .register(name.into(), sample.clone());
59                sample
60            }
61            Sample::ExpDecay(alpha) => {
62                let sample =
63                    Arc::new(ExpDecaySample::new(alpha, reservoir_size));
64                DEFAULT_REGISTRY
65                    .write()
66                    .register(name.into(), sample.clone());
67                sample
68            }
69        }
70    }
71
72    pub fn register_with_group(
73        &self, group: &str, name: &str, reservoir_size: usize,
74    ) -> Arc<dyn Histogram> {
75        if !is_enabled() {
76            return Arc::new(NoopHistogram);
77        }
78
79        assert!(reservoir_size > 0);
80
81        match *self {
82            Sample::Uniform => {
83                let sample = Arc::new(UniformSample::new(reservoir_size));
84                DEFAULT_GROUPING_REGISTRY.write().register(
85                    group.into(),
86                    name.into(),
87                    sample.clone(),
88                );
89                sample
90            }
91            Sample::ExpDecay(alpha) => {
92                let sample =
93                    Arc::new(ExpDecaySample::new(alpha, reservoir_size));
94                DEFAULT_GROUPING_REGISTRY.write().register(
95                    group.into(),
96                    name.into(),
97                    sample.clone(),
98                );
99                sample
100            }
101        }
102    }
103}
104
/// Histogram handed out when metrics are disabled.
///
/// It overrides nothing, so every call falls through to the `Histogram`
/// trait's default bodies.
struct NoopHistogram;
impl Histogram for NoopHistogram {}
107
/// Plain copy of a histogram's state: the update counter plus the sampled
/// values.
#[derive(Clone, Default)]
struct Snapshot {
    /// Value reported by `count()`.
    count: usize,
    /// Sampled values that all statistics are computed from.
    values: Vec<u64>,
}
113
114impl Histogram for Snapshot {
115    fn count(&self) -> usize { self.count }
116
117    fn max(&self) -> u64 { self.values.iter().max().cloned().unwrap_or(0) }
118
119    fn mean(&self) -> f64 {
120        if self.values.is_empty() {
121            0.0
122        } else {
123            self.sum() as f64 / self.values.len() as f64
124        }
125    }
126
127    fn min(&self) -> u64 { self.values.iter().min().cloned().unwrap_or(0) }
128
129    fn percentile(&self, p: f64) -> u64 { sample_percentile(&self.values, p) }
130
131    fn snapshot(&self) -> Arc<dyn Histogram> { Arc::new(self.clone()) }
132
133    fn sum(&self) -> u64 { self.values.iter().sum() }
134
135    fn variance(&self) -> f64 { sample_variance(&self.values) }
136}
137
/// Returns the value at percentile `p` of an ascending-sorted sample.
///
/// The position is `(len - 1) * p` truncated toward zero; no interpolation
/// between neighbouring values is performed. An empty sample yields `0`.
///
/// The caller is responsible for sorting: the slice is indexed as given.
///
/// # Panics
///
/// Panics unless `0.0 < p < 1.0` (this also rejects NaN).
fn sample_percentile(sorted_values: &[u64], p: f64) -> u64 {
    assert!(p > 0.0 && p < 1.0);
    if sorted_values.is_empty() {
        return 0;
    }

    // Truncating cast: the fractional part of the position is discarded.
    let rank = ((sorted_values.len() - 1) as f64 * p) as usize;
    sorted_values.get(rank).copied().unwrap_or(0)
}
147
/// Returns the population variance of `values` (sum of squared deviations
/// divided by the number of values), or `0.0` for an empty sample.
fn sample_variance(values: &[u64]) -> f64 {
    if values.is_empty() {
        return 0.0;
    }

    let len = values.len() as f64;

    // Accumulate in u128 so that a sample of large values cannot overflow the
    // way a u64 sum would; the result is identical whenever the u64 sum fits.
    let total: u128 = values.iter().map(|&v| u128::from(v)).sum();
    let mean = total as f64 / len;

    let squared_deviations = values.iter().fold(0.0, |acc, &v| {
        let d = v as f64 - mean;
        acc + d * d
    });

    squared_deviations / len
}
164
/// A uniform sample using Vitter's Algorithm R. (http://www.cs.umd.edu/~samir/498/vitter.pdf)
pub struct UniformSample {
    /// Maximum number of values kept in the reservoir.
    reservoir_size: usize,
    /// Update counter and reservoir contents, guarded by a read-write lock.
    data: RwLock<Snapshot>,
}
170
171impl UniformSample {
172    pub fn new(reservoir_size: usize) -> Self {
173        UniformSample {
174            reservoir_size,
175            data: RwLock::new(Snapshot {
176                count: 0,
177                values: Vec::with_capacity(reservoir_size),
178            }),
179        }
180    }
181}
182
183impl Histogram for UniformSample {
184    fn count(&self) -> usize { self.data.read().count() }
185
186    fn max(&self) -> u64 { self.data.read().max() }
187
188    fn mean(&self) -> f64 { self.data.read().mean() }
189
190    fn min(&self) -> u64 { self.data.read().min() }
191
192    fn percentile(&self, p: f64) -> u64 {
193        let mut data = self.data.write();
194        data.values.sort();
195        sample_percentile(&data.values, p)
196    }
197
198    fn snapshot(&self) -> Arc<dyn Histogram> {
199        Arc::new(self.data.read().clone())
200    }
201
202    fn sum(&self) -> u64 { self.data.read().sum() }
203
204    fn update(&self, v: u64) {
205        let mut data = self.data.write();
206
207        data.count += 1;
208
209        if data.values.len() < self.reservoir_size {
210            data.values.push(v);
211        } else {
212            let mut rng = rng();
213            let r = rng.random_range(0..data.count);
214
215            // replace probability is reservoir_size/1+count
216            if let Some(replaced) = data.values.get_mut(r) {
217                *replaced = v;
218            }
219        }
220    }
221
222    fn variance(&self) -> f64 { self.data.read().variance() }
223}
224
impl Metric for UniformSample {
    /// Returns the fixed metric type label `"Histogram"`.
    fn get_type(&self) -> &str { "Histogram" }
}
228
/// How far past `t0` an `ExpDecaySample` sets its rescale deadline `t1`
/// (3600 seconds); an update arriving after `t1` rescales the stored
/// priorities.
const RESCALE_THRESHOLD: Duration = Duration::from_secs(3600);
230
/// Mutable state of an `ExpDecaySample`, kept together behind one lock.
struct ExpDecaySampleData {
    /// Number of `update` calls seen so far.
    count: usize,
    /// Reference instant that item priorities are measured from.
    t0: Instant,
    /// Deadline after which the next update rescales priorities and resets
    /// `t0`.
    t1: Instant,
    /// Reservoir of sampled items, ordered by `ExpDecaySampleItem`'s `Ord`.
    values: BinaryHeap<ExpDecaySampleItem>,
}
237
/// ExpDecaySample is an exponentially-decaying sample using a forward-decaying
/// priority reservoir. See Cormode et al's "Forward Decay: A Practical Time
/// Decay Model for Streaming Systems".
///
/// <http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf>
struct ExpDecaySample {
    /// Decay factor multiplied by the elapsed seconds when computing item
    /// priorities.
    alpha: f64,
    /// Maximum number of items kept in the reservoir.
    reservoir_size: usize,
    /// Counter, time landmarks and reservoir, guarded by a read-write lock.
    data: RwLock<ExpDecaySampleData>,
}
248
249impl ExpDecaySample {
250    fn new(alpha: f64, reservoir_size: usize) -> Self {
251        let now = Instant::now();
252        ExpDecaySample {
253            alpha,
254            reservoir_size,
255            data: RwLock::new(ExpDecaySampleData {
256                count: 0,
257                t0: now,
258                t1: now + RESCALE_THRESHOLD,
259                values: BinaryHeap::with_capacity(reservoir_size),
260            }),
261        }
262    }
263}
264
265impl Histogram for ExpDecaySample {
266    fn count(&self) -> usize { self.data.read().count }
267
268    fn max(&self) -> u64 {
269        let data = self.data.read();
270        data.values.iter().map(|item| item.v).max().unwrap_or(0)
271    }
272
273    fn mean(&self) -> f64 {
274        let data = self.data.read();
275
276        if data.values.is_empty() {
277            return 0.0;
278        }
279
280        let sum: u64 = data.values.iter().map(|item| item.v).sum();
281        sum as f64 / data.values.len() as f64
282    }
283
284    fn min(&self) -> u64 {
285        let data = self.data.read();
286        data.values.iter().map(|item| item.v).min().unwrap_or(0)
287    }
288
289    fn percentile(&self, p: f64) -> u64 {
290        let data = self.data.read();
291        let mut values: Vec<u64> =
292            data.values.iter().map(|item| item.v).collect();
293        values.sort();
294        sample_percentile(&values, p)
295    }
296
297    fn snapshot(&self) -> Arc<dyn Histogram> {
298        let data = self.data.read();
299        let mut values: Vec<u64> =
300            data.values.iter().map(|item| item.v).collect();
301        values.sort();
302        Arc::new(Snapshot {
303            count: data.count,
304            values,
305        })
306    }
307
308    fn sum(&self) -> u64 {
309        let data = self.data.read();
310        data.values.iter().map(|item| item.v).sum()
311    }
312
313    fn update(&self, v: u64) {
314        let mut data = self.data.write();
315
316        data.count += 1;
317
318        if data.values.len() == self.reservoir_size {
319            data.values.pop();
320        }
321
322        let now = Instant::now();
323        let k = (now - data.t0).as_nanos() as f64
324            / Duration::from_secs(1).as_nanos() as f64
325            * self.alpha;
326        let k = k.exp() * rand::rng().random_range(0.0..1.0);
327        if k.is_normal() {
328            data.values.push(ExpDecaySampleItem { k, v });
329        }
330
331        if now > data.t1 {
332            let items: Vec<ExpDecaySampleItem> = data.values.drain().collect();
333            let t0 = data.t0;
334
335            data.t0 = now;
336            data.t1 = now + RESCALE_THRESHOLD;
337            for mut item in items {
338                let k = (now - t0).as_nanos() as f64
339                    / Duration::from_secs(1).as_nanos() as f64
340                    * (-self.alpha);
341                item.k *= k.exp();
342                if k.is_normal() {
343                    data.values.push(item);
344                }
345            }
346        }
347    }
348
349    fn variance(&self) -> f64 {
350        let data = self.data.read();
351        let values: Vec<u64> = data.values.iter().map(|item| item.v).collect();
352        sample_variance(&values)
353    }
354}
355
impl Metric for ExpDecaySample {
    /// Returns the fixed metric type label `"Histogram"`.
    fn get_type(&self) -> &str { "Histogram" }
}
359
/// One reservoir entry: a recorded value `v` together with its sampling
/// priority `k`.
///
/// Equality and ordering look at `k` only, and the ordering is reversed: the
/// item with the smaller `k` compares as greater. In a max-heap such as
/// `BinaryHeap`, `pop` therefore removes the item with the smallest priority.
struct ExpDecaySampleItem {
    k: f64,
    v: u64,
}

impl PartialEq for ExpDecaySampleItem {
    /// Two items are equal when their priorities are equal under the total
    /// order used by `cmp`, which keeps `Eq` reflexive even for NaN.
    fn eq(&self, other: &Self) -> bool { self.cmp(other) == Ordering::Equal }
}

impl Eq for ExpDecaySampleItem {}

impl PartialOrd for ExpDecaySampleItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for ExpDecaySampleItem {
    /// Reversed comparison on `k`: the smaller the priority, the greater the
    /// item.
    ///
    /// Uses `f64::total_cmp`, so every pair of priorities is comparable and
    /// the comparison can never panic.
    fn cmp(&self, other: &Self) -> Ordering { other.k.total_cmp(&self.k) }
}
385}