1use jsonrpc_core as core;
20use jsonrpc_core::futures::future::{Either, FutureExt};
21use log::debug;
22use order_stat;
23use parking_lot::RwLock;
24use std::{
25 fmt,
26 sync::{
27 atomic::{self, AtomicUsize},
28 Arc,
29 },
30 time,
31};
32
33const RATE_SECONDS: usize = 10;
34const STATS_SAMPLES: usize = 60;
35
36struct RateCalculator {
37 era: time::Instant,
38 samples: [u16; RATE_SECONDS],
39}
40
41impl fmt::Debug for RateCalculator {
42 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
43 write!(fmt, "{} req/s", self.rate())
44 }
45}
46
47impl Default for RateCalculator {
48 fn default() -> Self {
49 RateCalculator {
50 era: time::Instant::now(),
51 samples: [0; RATE_SECONDS],
52 }
53 }
54}
55
56impl RateCalculator {
57 fn elapsed(&self) -> u64 { self.era.elapsed().as_secs() }
58
59 pub fn tick(&mut self) -> u16 {
60 if self.elapsed() >= RATE_SECONDS as u64 {
61 self.era = time::Instant::now();
62 self.samples[0] = 0;
63 }
64
65 let pos = self.elapsed() as usize % RATE_SECONDS;
66 let next = (pos + 1) % RATE_SECONDS;
67 self.samples[next] = 0;
68 self.samples[pos] = self.samples[pos].saturating_add(1);
69 self.samples[pos]
70 }
71
72 fn current_rate(&self) -> usize {
73 let now = match self.elapsed() {
74 i if i >= RATE_SECONDS as u64 => RATE_SECONDS,
75 i => i as usize + 1,
76 };
77 let sum: usize = self.samples[0..now].iter().map(|x| *x as usize).sum();
78 sum / now
79 }
80
81 pub fn rate(&self) -> usize {
82 if self.elapsed() > RATE_SECONDS as u64 {
83 0
84 } else {
85 self.current_rate()
86 }
87 }
88}
89
90struct StatsCalculator<T = u32> {
91 filled: bool,
92 idx: usize,
93 samples: [T; STATS_SAMPLES],
94}
95
96impl<T: Default + Copy> Default for StatsCalculator<T> {
97 fn default() -> Self {
98 StatsCalculator {
99 filled: false,
100 idx: 0,
101 samples: [T::default(); STATS_SAMPLES],
102 }
103 }
104}
105
106impl<T: fmt::Display + Default + Copy + Ord> fmt::Debug for StatsCalculator<T> {
107 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
108 write!(fmt, "median: {} ms", self.approximated_median())
109 }
110}
111
112impl<T: Default + Copy + Ord> StatsCalculator<T> {
113 pub fn add(&mut self, sample: T) {
114 self.idx += 1;
115 if self.idx >= STATS_SAMPLES {
116 self.filled = true;
117 self.idx = 0;
118 }
119
120 self.samples[self.idx] = sample;
121 }
122
123 pub fn approximated_median(&self) -> T {
125 let mut copy = [T::default(); STATS_SAMPLES];
126 copy.copy_from_slice(&self.samples);
127 let bound = if self.filled {
128 STATS_SAMPLES
129 } else {
130 self.idx + 1
131 };
132
133 let (_, &mut median) =
134 order_stat::median_of_medians(&mut copy[0..bound]);
135 median
136 }
137}
138
139#[derive(Default, Debug)]
141pub struct RpcStats {
142 requests: RwLock<RateCalculator>,
143 roundtrips: RwLock<StatsCalculator<u128>>,
144 active_sessions: AtomicUsize,
145}
146
147impl RpcStats {
148 pub fn open_session(&self) {
150 self.active_sessions.fetch_add(1, atomic::Ordering::SeqCst);
151 }
152
153 pub fn close_session(&self) {
156 self.active_sessions.fetch_sub(1, atomic::Ordering::SeqCst);
157 }
158
159 pub fn count_request(&self) -> u16 { self.requests.write().tick() }
161
162 pub fn add_roundtrip(&self, microseconds: u128) {
164 self.roundtrips.write().add(microseconds)
165 }
166
167 pub fn sessions(&self) -> usize {
169 self.active_sessions.load(atomic::Ordering::Relaxed)
170 }
171
172 pub fn requests_rate(&self) -> usize { self.requests.read().rate() }
174
175 pub fn approximated_roundtrip(&self) -> u128 {
177 self.roundtrips.read().approximated_median()
178 }
179}
180
181pub trait ActivityNotifier: Send + Sync + 'static {
183 fn active(&self);
185}
186
187pub struct Middleware<T: ActivityNotifier = ClientNotifier> {
189 stats: Arc<RpcStats>,
190 notifier: T,
191}
192
193impl<T: ActivityNotifier> Middleware<T> {
194 pub fn new(stats: Arc<RpcStats>, notifier: T) -> Self {
196 Middleware { stats, notifier }
197 }
198}
199
200impl<M: core::Metadata, T: ActivityNotifier> core::Middleware<M>
201 for Middleware<T>
202{
203 type CallFuture = core::middleware::NoopCallFuture;
204 type Future = core::FutureResponse;
205
206 fn on_request<F, X>(
207 &self, request: core::Request, meta: M, process: F,
208 ) -> Either<Self::Future, X>
209 where
210 F: FnOnce(core::Request, M) -> X,
211 X: core::futures::Future<Output = Option<core::Response>>
212 + Send
213 + 'static,
214 {
215 let start = time::Instant::now();
216
217 self.notifier.active();
218 self.stats.count_request();
219
220 let id = match request {
221 core::Request::Single(core::Call::MethodCall(ref call)) => {
222 Some(call.id.clone())
223 }
224 _ => None,
225 };
226 let stats = self.stats.clone();
227
228 let future = process(request, meta).map(move |res| {
229 let time = start.elapsed().as_micros();
230 if time > 10_000 {
231 debug!(target: "rpc", "[{:?}] Took {}ms", id, time / 1_000);
232 }
233 stats.add_roundtrip(time);
234 res
235 });
236
237 Either::Left(future.boxed())
238 }
239}
240
241pub struct ClientNotifier {
243 }
245
246impl ActivityNotifier for ClientNotifier {
247 fn active(&self) {
248 }
250}
251
252#[cfg(test)]
253mod tests {
254 use super::{RateCalculator, RpcStats, StatsCalculator};
255
256 #[test]
257 fn should_calculate_rate() {
258 let mut avg = RateCalculator::default();
260
261 avg.tick();
263 avg.tick();
264 avg.tick();
265 let rate = avg.rate();
266
267 assert_eq!(rate, 3usize);
269 }
270
271 #[test]
272 fn should_approximate_median() {
273 let mut stats = StatsCalculator::default();
275 stats.add(5);
276 stats.add(100);
277 stats.add(3);
278 stats.add(15);
279 stats.add(20);
280 stats.add(6);
281
282 let median = stats.approximated_median();
284
285 assert_eq!(median, 5);
287 }
288
289 #[test]
290 fn should_count_rpc_stats() {
291 let stats = RpcStats::default();
293 assert_eq!(stats.sessions(), 0);
294 assert_eq!(stats.requests_rate(), 0);
295 assert_eq!(stats.approximated_roundtrip(), 0);
296
297 stats.open_session();
299 stats.close_session();
300 stats.open_session();
301 stats.count_request();
302 stats.count_request();
303 stats.add_roundtrip(125);
304
305 assert_eq!(stats.sessions(), 1);
307 assert_eq!(stats.requests_rate(), 2);
308 assert_eq!(stats.approximated_roundtrip(), 125);
309 }
310
311 #[test]
312 fn should_be_sync_and_send() {
313 let stats = RpcStats::default();
314 is_sync(stats);
315 }
316
317 fn is_sync<F: Send + Sync>(x: F) { drop(x) }
318}