1use crate::{
6 metrics::{is_enabled, Metric},
7 registry::{DEFAULT_GROUPING_REGISTRY, DEFAULT_REGISTRY},
8};
9use parking_lot::RwLock;
10use rand::{rng, Rng};
11use std::{
12 cmp::Ordering,
13 collections::BinaryHeap,
14 sync::Arc,
15 time::{Duration, Instant},
16};
17
18pub trait Histogram: Send + Sync {
19 fn count(&self) -> usize { 0 }
20 fn max(&self) -> u64 { 0 }
21 fn mean(&self) -> f64 { 0.0 }
22 fn min(&self) -> u64 { 0 }
23 fn percentile(&self, _p: f64) -> u64 { 0 }
24 fn snapshot(&self) -> Arc<dyn Histogram> { Arc::new(Snapshot::default()) }
25 fn stddev(&self) -> f64 { self.variance().sqrt() }
26 fn sum(&self) -> u64 { 0 }
27 fn update(&self, _v: u64) {}
28 fn variance(&self) -> f64 { 0.0 }
29 fn update_since(&self, start_time: Instant) {
30 self.update(
31 Instant::now()
32 .saturating_duration_since(start_time)
33 .as_nanos() as u64,
34 );
35 }
36}
37
38pub enum Sample {
39 Uniform,
40 ExpDecay(f64),
41}
42
43impl Sample {
44 pub fn register(
45 &self, name: &str, reservoir_size: usize,
46 ) -> Arc<dyn Histogram> {
47 if !is_enabled() {
48 return Arc::new(NoopHistogram);
49 }
50
51 assert!(reservoir_size > 0);
52
53 match *self {
54 Sample::Uniform => {
55 let sample = Arc::new(UniformSample::new(reservoir_size));
56 DEFAULT_REGISTRY
57 .write()
58 .register(name.into(), sample.clone());
59 sample
60 }
61 Sample::ExpDecay(alpha) => {
62 let sample =
63 Arc::new(ExpDecaySample::new(alpha, reservoir_size));
64 DEFAULT_REGISTRY
65 .write()
66 .register(name.into(), sample.clone());
67 sample
68 }
69 }
70 }
71
72 pub fn register_with_group(
73 &self, group: &str, name: &str, reservoir_size: usize,
74 ) -> Arc<dyn Histogram> {
75 if !is_enabled() {
76 return Arc::new(NoopHistogram);
77 }
78
79 assert!(reservoir_size > 0);
80
81 match *self {
82 Sample::Uniform => {
83 let sample = Arc::new(UniformSample::new(reservoir_size));
84 DEFAULT_GROUPING_REGISTRY.write().register(
85 group.into(),
86 name.into(),
87 sample.clone(),
88 );
89 sample
90 }
91 Sample::ExpDecay(alpha) => {
92 let sample =
93 Arc::new(ExpDecaySample::new(alpha, reservoir_size));
94 DEFAULT_GROUPING_REGISTRY.write().register(
95 group.into(),
96 name.into(),
97 sample.clone(),
98 );
99 sample
100 }
101 }
102 }
103}
104
105struct NoopHistogram;
106impl Histogram for NoopHistogram {}
107
108#[derive(Default, Clone)]
109struct Snapshot {
110 count: usize,
111 values: Vec<u64>,
112}
113
114impl Histogram for Snapshot {
115 fn count(&self) -> usize { self.count }
116
117 fn max(&self) -> u64 { self.values.iter().max().cloned().unwrap_or(0) }
118
119 fn mean(&self) -> f64 {
120 if self.values.is_empty() {
121 0.0
122 } else {
123 self.sum() as f64 / self.values.len() as f64
124 }
125 }
126
127 fn min(&self) -> u64 { self.values.iter().min().cloned().unwrap_or(0) }
128
129 fn percentile(&self, p: f64) -> u64 { sample_percentile(&self.values, p) }
130
131 fn snapshot(&self) -> Arc<dyn Histogram> { Arc::new(self.clone()) }
132
133 fn sum(&self) -> u64 { self.values.iter().sum() }
134
135 fn variance(&self) -> f64 { sample_variance(&self.values) }
136}
137
138fn sample_percentile(sorted_values: &Vec<u64>, p: f64) -> u64 {
139 assert!(p > 0.0 && p < 1.0);
140 if sorted_values.is_empty() {
141 return 0;
142 }
143
144 let pos = (sorted_values.len() - 1) as f64 * p;
145 sorted_values.get(pos as usize).cloned().unwrap_or(0)
146}
147
148fn sample_variance(values: &Vec<u64>) -> f64 {
149 if values.is_empty() {
150 return 0.0;
151 }
152
153 let sum: u64 = values.iter().sum();
154 let mean = sum as f64 / values.len() as f64;
155
156 let mut sum = 0.0;
157 for v in values {
158 let d = *v as f64 - mean;
159 sum += d * d;
160 }
161
162 sum / values.len() as f64
163}
164
165pub struct UniformSample {
167 reservoir_size: usize,
168 data: RwLock<Snapshot>,
169}
170
171impl UniformSample {
172 pub fn new(reservoir_size: usize) -> Self {
173 UniformSample {
174 reservoir_size,
175 data: RwLock::new(Snapshot {
176 count: 0,
177 values: Vec::with_capacity(reservoir_size),
178 }),
179 }
180 }
181}
182
183impl Histogram for UniformSample {
184 fn count(&self) -> usize { self.data.read().count() }
185
186 fn max(&self) -> u64 { self.data.read().max() }
187
188 fn mean(&self) -> f64 { self.data.read().mean() }
189
190 fn min(&self) -> u64 { self.data.read().min() }
191
192 fn percentile(&self, p: f64) -> u64 {
193 let mut data = self.data.write();
194 data.values.sort();
195 sample_percentile(&data.values, p)
196 }
197
198 fn snapshot(&self) -> Arc<dyn Histogram> {
199 Arc::new(self.data.read().clone())
200 }
201
202 fn sum(&self) -> u64 { self.data.read().sum() }
203
204 fn update(&self, v: u64) {
205 let mut data = self.data.write();
206
207 data.count += 1;
208
209 if data.values.len() < self.reservoir_size {
210 data.values.push(v);
211 } else {
212 let mut rng = rng();
213 let r = rng.random_range(0..data.count);
214
215 if let Some(replaced) = data.values.get_mut(r) {
217 *replaced = v;
218 }
219 }
220 }
221
222 fn variance(&self) -> f64 { self.data.read().variance() }
223}
224
225impl Metric for UniformSample {
226 fn get_type(&self) -> &str { "Histogram" }
227}
228
229const RESCALE_THRESHOLD: Duration = Duration::from_secs(3600);
230
231struct ExpDecaySampleData {
232 count: usize,
233 t0: Instant,
234 t1: Instant,
235 values: BinaryHeap<ExpDecaySampleItem>,
236}
237
238struct ExpDecaySample {
244 alpha: f64,
245 reservoir_size: usize,
246 data: RwLock<ExpDecaySampleData>,
247}
248
249impl ExpDecaySample {
250 fn new(alpha: f64, reservoir_size: usize) -> Self {
251 let now = Instant::now();
252 ExpDecaySample {
253 alpha,
254 reservoir_size,
255 data: RwLock::new(ExpDecaySampleData {
256 count: 0,
257 t0: now,
258 t1: now + RESCALE_THRESHOLD,
259 values: BinaryHeap::with_capacity(reservoir_size),
260 }),
261 }
262 }
263}
264
265impl Histogram for ExpDecaySample {
266 fn count(&self) -> usize { self.data.read().count }
267
268 fn max(&self) -> u64 {
269 let data = self.data.read();
270 data.values.iter().map(|item| item.v).max().unwrap_or(0)
271 }
272
273 fn mean(&self) -> f64 {
274 let data = self.data.read();
275
276 if data.values.is_empty() {
277 return 0.0;
278 }
279
280 let sum: u64 = data.values.iter().map(|item| item.v).sum();
281 sum as f64 / data.values.len() as f64
282 }
283
284 fn min(&self) -> u64 {
285 let data = self.data.read();
286 data.values.iter().map(|item| item.v).min().unwrap_or(0)
287 }
288
289 fn percentile(&self, p: f64) -> u64 {
290 let data = self.data.read();
291 let mut values: Vec<u64> =
292 data.values.iter().map(|item| item.v).collect();
293 values.sort();
294 sample_percentile(&values, p)
295 }
296
297 fn snapshot(&self) -> Arc<dyn Histogram> {
298 let data = self.data.read();
299 let mut values: Vec<u64> =
300 data.values.iter().map(|item| item.v).collect();
301 values.sort();
302 Arc::new(Snapshot {
303 count: data.count,
304 values,
305 })
306 }
307
308 fn sum(&self) -> u64 {
309 let data = self.data.read();
310 data.values.iter().map(|item| item.v).sum()
311 }
312
313 fn update(&self, v: u64) {
314 let mut data = self.data.write();
315
316 data.count += 1;
317
318 if data.values.len() == self.reservoir_size {
319 data.values.pop();
320 }
321
322 let now = Instant::now();
323 let k = (now - data.t0).as_nanos() as f64
324 / Duration::from_secs(1).as_nanos() as f64
325 * self.alpha;
326 let k = k.exp() * rand::rng().random_range(0.0..1.0);
327 if k.is_normal() {
328 data.values.push(ExpDecaySampleItem { k, v });
329 }
330
331 if now > data.t1 {
332 let items: Vec<ExpDecaySampleItem> = data.values.drain().collect();
333 let t0 = data.t0;
334
335 data.t0 = now;
336 data.t1 = now + RESCALE_THRESHOLD;
337 for mut item in items {
338 let k = (now - t0).as_nanos() as f64
339 / Duration::from_secs(1).as_nanos() as f64
340 * (-self.alpha);
341 item.k *= k.exp();
342 if k.is_normal() {
343 data.values.push(item);
344 }
345 }
346 }
347 }
348
349 fn variance(&self) -> f64 {
350 let data = self.data.read();
351 let values: Vec<u64> = data.values.iter().map(|item| item.v).collect();
352 sample_variance(&values)
353 }
354}
355
356impl Metric for ExpDecaySample {
357 fn get_type(&self) -> &str { "Histogram" }
358}
359
360struct ExpDecaySampleItem {
361 k: f64,
362 v: u64,
363}
364
365impl PartialEq for ExpDecaySampleItem {
366 fn eq(&self, other: &Self) -> bool { self.k.eq(&other.k) }
367}
368
369impl Eq for ExpDecaySampleItem {}
370
371impl PartialOrd for ExpDecaySampleItem {
372 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
373 Some(self.cmp(other))
374 }
375}
376
377impl Ord for ExpDecaySampleItem {
378 fn cmp(&self, other: &Self) -> Ordering {
380 other
381 .k
382 .partial_cmp(&self.k)
383 .expect("k should be comparable")
384 }
385}