metrics/
meter.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    ewma::EWMA,
7    metrics::{is_enabled, Metric, ORDER},
8    registry::{DEFAULT_GROUPING_REGISTRY, DEFAULT_REGISTRY},
9};
10use chrono::Duration;
11use lazy_static::lazy_static;
12use parking_lot::{Mutex, RwLock};
13use std::{
14    collections::HashMap,
15    sync::{atomic::AtomicBool, Arc},
16    time::Instant,
17};
18use timer::Timer;
19
20/// The tick interval in seconds used by `MeterArbiter` to update rates.
21const TICK_INTERVAL_SECS: i64 = 5;
22
23// Meters count events to produce exponentially-weighted moving average rates
24// at one-, five-, and fifteen-minutes, a mean rate, and a raw rate (m0).
25pub trait Meter: Send + Sync {
26    fn count(&self) -> usize { 0 }
27    fn mark(&self, _n: usize) {}
28    fn rate1(&self) -> f64 { 0.0 }
29    fn rate5(&self) -> f64 { 0.0 }
30    fn rate15(&self) -> f64 { 0.0 }
31    fn rate_mean(&self) -> f64 { 0.0 }
32    /// Returns the raw rate (events/sec) measured over the last 5-second
33    /// tick window, without any smoothing.
34    fn rate_m0(&self) -> f64 { 0.0 }
35    fn snapshot(&self) -> Arc<dyn Meter> { Arc::new(MeterSnapshot::default()) }
36    fn stop(&self) {}
37}
38
39struct NoopMeter;
40impl Meter for NoopMeter {}
41
42pub fn register_meter(name: &str) -> Arc<dyn Meter> {
43    if !is_enabled() {
44        return Arc::new(NoopMeter);
45    }
46
47    let meter = Arc::new(StandardMeter::new(name.into()));
48    DEFAULT_REGISTRY
49        .write()
50        .register(name.into(), meter.clone());
51    ARBITER.meters.lock().insert(name.into(), meter.clone());
52
53    meter
54}
55
56pub fn register_meter_with_group(group: &str, name: &str) -> Arc<dyn Meter> {
57    if !is_enabled() {
58        return Arc::new(NoopMeter);
59    }
60
61    let mut full_meter_name = String::from(group);
62    full_meter_name.push('_');
63    full_meter_name.push_str(name);
64
65    let meter = Arc::new(StandardMeter::new(full_meter_name.clone()));
66    DEFAULT_GROUPING_REGISTRY.write().register(
67        group.into(),
68        name.into(),
69        meter.clone(),
70    );
71
72    let mut meters = ARBITER.meters.lock();
73    assert_eq!(meters.contains_key(&full_meter_name), false);
74    meters.insert(full_meter_name, meter.clone());
75
76    meter
77}
78
79#[derive(Default, Clone)]
80struct MeterSnapshot {
81    count: usize,
82    /* m1, m5, m15, mean, m0 (raw rate over the last-tick window) */
83    rates: [u64; 5],
84    /// Count at the previous tick, used to derive m0 rate.
85    last_tick_count: usize,
86}
87
88impl Meter for MeterSnapshot {
89    fn count(&self) -> usize { self.count }
90
91    fn rate1(&self) -> f64 { f64::from_bits(self.rates[0]) }
92
93    fn rate5(&self) -> f64 { f64::from_bits(self.rates[1]) }
94
95    fn rate15(&self) -> f64 { f64::from_bits(self.rates[2]) }
96
97    fn rate_mean(&self) -> f64 { f64::from_bits(self.rates[3]) }
98
99    fn rate_m0(&self) -> f64 { f64::from_bits(self.rates[4]) }
100
101    fn snapshot(&self) -> Arc<dyn Meter> { Arc::new(self.clone()) }
102}
103
104pub struct StandardMeter {
105    name: String,
106    snapshot: RwLock<MeterSnapshot>,
107    ewmas: [EWMA; 3],
108    start_time: Instant,
109    stopped: AtomicBool,
110}
111
112impl StandardMeter {
113    pub fn new(name: String) -> Self {
114        StandardMeter {
115            name,
116            snapshot: RwLock::new(MeterSnapshot::default()),
117            ewmas: [EWMA::new(1.0), EWMA::new(5.0), EWMA::new(15.0)],
118            start_time: Instant::now(),
119            stopped: AtomicBool::new(false),
120        }
121    }
122
123    fn tick(&self) {
124        let mut snapshot = self.snapshot.write();
125
126        for i in 0..3 {
127            self.ewmas[i].tick();
128            snapshot.rates[i] = f64::to_bits(self.ewmas[i].rate());
129        }
130
131        let rate_mean_nano =
132            snapshot.count as f64 / self.start_time.elapsed().as_nanos() as f64;
133        snapshot.rates[3] = f64::to_bits(rate_mean_nano * 1e9);
134
135        // m0 rate: events since last tick / 5 seconds.
136        let m0_rate = (snapshot.count - snapshot.last_tick_count) as f64
137            / TICK_INTERVAL_SECS as f64;
138        snapshot.last_tick_count = snapshot.count;
139        snapshot.rates[4] = f64::to_bits(m0_rate);
140    }
141}
142
143impl Meter for StandardMeter {
144    fn count(&self) -> usize { self.snapshot.read().count }
145
146    fn mark(&self, n: usize) {
147        if self.stopped.load(ORDER) {
148            return;
149        }
150
151        let mut snapshot = self.snapshot.write();
152        snapshot.count += n;
153
154        self.ewmas[0].update(n);
155        self.ewmas[1].update(n);
156        self.ewmas[2].update(n);
157
158        let rate_mean_nano =
159            snapshot.count as f64 / self.start_time.elapsed().as_nanos() as f64;
160        snapshot.rates[3] = f64::to_bits(rate_mean_nano * 1e9);
161    }
162
163    fn rate1(&self) -> f64 { f64::from_bits(self.snapshot.read().rates[0]) }
164
165    fn rate5(&self) -> f64 { f64::from_bits(self.snapshot.read().rates[1]) }
166
167    fn rate15(&self) -> f64 { f64::from_bits(self.snapshot.read().rates[2]) }
168
169    fn rate_mean(&self) -> f64 { f64::from_bits(self.snapshot.read().rates[3]) }
170
171    fn rate_m0(&self) -> f64 { f64::from_bits(self.snapshot.read().rates[4]) }
172
173    fn snapshot(&self) -> Arc<dyn Meter> {
174        Arc::new(self.snapshot.read().clone())
175    }
176
177    fn stop(&self) {
178        if let Ok(false) =
179            self.stopped.compare_exchange(false, true, ORDER, ORDER)
180        {
181            ARBITER.meters.lock().remove(&self.name);
182        }
183    }
184}
185
186impl Metric for StandardMeter {
187    fn get_type(&self) -> &str { "Meter" }
188}
189
190impl Drop for StandardMeter {
191    fn drop(&mut self) { self.stop(); }
192}
193
194lazy_static! {
195    static ref ARBITER: MeterArbiter = MeterArbiter::default();
196}
197
198/// MeterArbiter ticks meters every 5s from a single thread.
199/// meters are references in a set for future stopping.
200struct MeterArbiter {
201    meters: Arc<Mutex<HashMap<String, Arc<StandardMeter>>>>,
202    timer: Timer,
203}
204
205unsafe impl Send for MeterArbiter {}
206unsafe impl Sync for MeterArbiter {}
207
208impl Default for MeterArbiter {
209    fn default() -> Self {
210        let arbiter = MeterArbiter {
211            meters: Arc::new(Mutex::new(HashMap::new())),
212            timer: Timer::new(),
213        };
214
215        let meters = arbiter.meters.clone();
216        arbiter
217            .timer
218            .schedule_repeating(
219                Duration::seconds(TICK_INTERVAL_SECS),
220                move || {
221                    for (_, meter) in meters.lock().iter() {
222                        meter.tick();
223                    }
224                },
225            )
226            .ignore();
227
228        arbiter
229    }
230}
231
232/// A struct used to measure time in metrics.
233pub struct MeterTimer {
234    meter: &'static dyn Meter,
235    start: Instant,
236}
237
238impl MeterTimer {
239    /// Call this to measure the time to run to the end of the current scope.
240    /// It will add the time from the function called till the returned
241    /// instance is dropped to `meter`.
242    pub fn time_func(meter: &'static dyn Meter) -> Self {
243        Self {
244            meter,
245            start: Instant::now(),
246        }
247    }
248}
249
250impl Drop for MeterTimer {
251    fn drop(&mut self) {
252        self.meter
253            .mark((Instant::now() - self.start).as_nanos() as usize)
254    }
255}