1use crate::{
6 ewma::EWMA,
7 metrics::{is_enabled, Metric, ORDER},
8 registry::{DEFAULT_GROUPING_REGISTRY, DEFAULT_REGISTRY},
9};
10use chrono::Duration;
11use lazy_static::lazy_static;
12use parking_lot::{Mutex, RwLock};
13use std::{
14 collections::HashMap,
15 sync::{atomic::AtomicBool, Arc},
16 time::Instant,
17};
18use timer::Timer;
19
20const TICK_INTERVAL_SECS: i64 = 5;
22
23pub trait Meter: Send + Sync {
26 fn count(&self) -> usize { 0 }
27 fn mark(&self, _n: usize) {}
28 fn rate1(&self) -> f64 { 0.0 }
29 fn rate5(&self) -> f64 { 0.0 }
30 fn rate15(&self) -> f64 { 0.0 }
31 fn rate_mean(&self) -> f64 { 0.0 }
32 fn rate_m0(&self) -> f64 { 0.0 }
35 fn snapshot(&self) -> Arc<dyn Meter> { Arc::new(MeterSnapshot::default()) }
36 fn stop(&self) {}
37}
38
39struct NoopMeter;
40impl Meter for NoopMeter {}
41
42pub fn register_meter(name: &str) -> Arc<dyn Meter> {
43 if !is_enabled() {
44 return Arc::new(NoopMeter);
45 }
46
47 let meter = Arc::new(StandardMeter::new(name.into()));
48 DEFAULT_REGISTRY
49 .write()
50 .register(name.into(), meter.clone());
51 ARBITER.meters.lock().insert(name.into(), meter.clone());
52
53 meter
54}
55
56pub fn register_meter_with_group(group: &str, name: &str) -> Arc<dyn Meter> {
57 if !is_enabled() {
58 return Arc::new(NoopMeter);
59 }
60
61 let mut full_meter_name = String::from(group);
62 full_meter_name.push('_');
63 full_meter_name.push_str(name);
64
65 let meter = Arc::new(StandardMeter::new(full_meter_name.clone()));
66 DEFAULT_GROUPING_REGISTRY.write().register(
67 group.into(),
68 name.into(),
69 meter.clone(),
70 );
71
72 let mut meters = ARBITER.meters.lock();
73 assert_eq!(meters.contains_key(&full_meter_name), false);
74 meters.insert(full_meter_name, meter.clone());
75
76 meter
77}
78
79#[derive(Default, Clone)]
80struct MeterSnapshot {
81 count: usize,
82 rates: [u64; 5],
84 last_tick_count: usize,
86}
87
88impl Meter for MeterSnapshot {
89 fn count(&self) -> usize { self.count }
90
91 fn rate1(&self) -> f64 { f64::from_bits(self.rates[0]) }
92
93 fn rate5(&self) -> f64 { f64::from_bits(self.rates[1]) }
94
95 fn rate15(&self) -> f64 { f64::from_bits(self.rates[2]) }
96
97 fn rate_mean(&self) -> f64 { f64::from_bits(self.rates[3]) }
98
99 fn rate_m0(&self) -> f64 { f64::from_bits(self.rates[4]) }
100
101 fn snapshot(&self) -> Arc<dyn Meter> { Arc::new(self.clone()) }
102}
103
104pub struct StandardMeter {
105 name: String,
106 snapshot: RwLock<MeterSnapshot>,
107 ewmas: [EWMA; 3],
108 start_time: Instant,
109 stopped: AtomicBool,
110}
111
112impl StandardMeter {
113 pub fn new(name: String) -> Self {
114 StandardMeter {
115 name,
116 snapshot: RwLock::new(MeterSnapshot::default()),
117 ewmas: [EWMA::new(1.0), EWMA::new(5.0), EWMA::new(15.0)],
118 start_time: Instant::now(),
119 stopped: AtomicBool::new(false),
120 }
121 }
122
123 fn tick(&self) {
124 let mut snapshot = self.snapshot.write();
125
126 for i in 0..3 {
127 self.ewmas[i].tick();
128 snapshot.rates[i] = f64::to_bits(self.ewmas[i].rate());
129 }
130
131 let rate_mean_nano =
132 snapshot.count as f64 / self.start_time.elapsed().as_nanos() as f64;
133 snapshot.rates[3] = f64::to_bits(rate_mean_nano * 1e9);
134
135 let m0_rate = (snapshot.count - snapshot.last_tick_count) as f64
137 / TICK_INTERVAL_SECS as f64;
138 snapshot.last_tick_count = snapshot.count;
139 snapshot.rates[4] = f64::to_bits(m0_rate);
140 }
141}
142
143impl Meter for StandardMeter {
144 fn count(&self) -> usize { self.snapshot.read().count }
145
146 fn mark(&self, n: usize) {
147 if self.stopped.load(ORDER) {
148 return;
149 }
150
151 let mut snapshot = self.snapshot.write();
152 snapshot.count += n;
153
154 self.ewmas[0].update(n);
155 self.ewmas[1].update(n);
156 self.ewmas[2].update(n);
157
158 let rate_mean_nano =
159 snapshot.count as f64 / self.start_time.elapsed().as_nanos() as f64;
160 snapshot.rates[3] = f64::to_bits(rate_mean_nano * 1e9);
161 }
162
163 fn rate1(&self) -> f64 { f64::from_bits(self.snapshot.read().rates[0]) }
164
165 fn rate5(&self) -> f64 { f64::from_bits(self.snapshot.read().rates[1]) }
166
167 fn rate15(&self) -> f64 { f64::from_bits(self.snapshot.read().rates[2]) }
168
169 fn rate_mean(&self) -> f64 { f64::from_bits(self.snapshot.read().rates[3]) }
170
171 fn rate_m0(&self) -> f64 { f64::from_bits(self.snapshot.read().rates[4]) }
172
173 fn snapshot(&self) -> Arc<dyn Meter> {
174 Arc::new(self.snapshot.read().clone())
175 }
176
177 fn stop(&self) {
178 if let Ok(false) =
179 self.stopped.compare_exchange(false, true, ORDER, ORDER)
180 {
181 ARBITER.meters.lock().remove(&self.name);
182 }
183 }
184}
185
186impl Metric for StandardMeter {
187 fn get_type(&self) -> &str { "Meter" }
188}
189
190impl Drop for StandardMeter {
191 fn drop(&mut self) { self.stop(); }
192}
193
194lazy_static! {
195 static ref ARBITER: MeterArbiter = MeterArbiter::default();
196}
197
198struct MeterArbiter {
201 meters: Arc<Mutex<HashMap<String, Arc<StandardMeter>>>>,
202 timer: Timer,
203}
204
205unsafe impl Send for MeterArbiter {}
206unsafe impl Sync for MeterArbiter {}
207
208impl Default for MeterArbiter {
209 fn default() -> Self {
210 let arbiter = MeterArbiter {
211 meters: Arc::new(Mutex::new(HashMap::new())),
212 timer: Timer::new(),
213 };
214
215 let meters = arbiter.meters.clone();
216 arbiter
217 .timer
218 .schedule_repeating(
219 Duration::seconds(TICK_INTERVAL_SECS),
220 move || {
221 for (_, meter) in meters.lock().iter() {
222 meter.tick();
223 }
224 },
225 )
226 .ignore();
227
228 arbiter
229 }
230}
231
232pub struct MeterTimer {
234 meter: &'static dyn Meter,
235 start: Instant,
236}
237
238impl MeterTimer {
239 pub fn time_func(meter: &'static dyn Meter) -> Self {
243 Self {
244 meter,
245 start: Instant::now(),
246 }
247 }
248}
249
250impl Drop for MeterTimer {
251 fn drop(&mut self) {
252 self.meter
253 .mark((Instant::now() - self.start).as_nanos() as usize)
254 }
255}