client/rpc/
informant.rs

1// Copyright 2015-2019 Parity Technologies (UK) Ltd.
2// This file is part of Parity Ethereum.
3
4// Parity Ethereum is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Ethereum is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Ethereum.  If not, see <http://www.gnu.org/licenses/>.
16
17//! RPC Requests Statistics
18
19use jsonrpc_core as core;
20use jsonrpc_core::futures::future::{Either, FutureExt};
21use log::debug;
22use order_stat;
23use parking_lot::RwLock;
24use std::{
25    fmt,
26    sync::{
27        atomic::{self, AtomicUsize},
28        Arc,
29    },
30    time,
31};
32
33const RATE_SECONDS: usize = 10;
34const STATS_SAMPLES: usize = 60;
35
36struct RateCalculator {
37    era: time::Instant,
38    samples: [u16; RATE_SECONDS],
39}
40
41impl fmt::Debug for RateCalculator {
42    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
43        write!(fmt, "{} req/s", self.rate())
44    }
45}
46
47impl Default for RateCalculator {
48    fn default() -> Self {
49        RateCalculator {
50            era: time::Instant::now(),
51            samples: [0; RATE_SECONDS],
52        }
53    }
54}
55
56impl RateCalculator {
57    fn elapsed(&self) -> u64 { self.era.elapsed().as_secs() }
58
59    pub fn tick(&mut self) -> u16 {
60        if self.elapsed() >= RATE_SECONDS as u64 {
61            self.era = time::Instant::now();
62            self.samples[0] = 0;
63        }
64
65        let pos = self.elapsed() as usize % RATE_SECONDS;
66        let next = (pos + 1) % RATE_SECONDS;
67        self.samples[next] = 0;
68        self.samples[pos] = self.samples[pos].saturating_add(1);
69        self.samples[pos]
70    }
71
72    fn current_rate(&self) -> usize {
73        let now = match self.elapsed() {
74            i if i >= RATE_SECONDS as u64 => RATE_SECONDS,
75            i => i as usize + 1,
76        };
77        let sum: usize = self.samples[0..now].iter().map(|x| *x as usize).sum();
78        sum / now
79    }
80
81    pub fn rate(&self) -> usize {
82        if self.elapsed() > RATE_SECONDS as u64 {
83            0
84        } else {
85            self.current_rate()
86        }
87    }
88}
89
90struct StatsCalculator<T = u32> {
91    filled: bool,
92    idx: usize,
93    samples: [T; STATS_SAMPLES],
94}
95
96impl<T: Default + Copy> Default for StatsCalculator<T> {
97    fn default() -> Self {
98        StatsCalculator {
99            filled: false,
100            idx: 0,
101            samples: [T::default(); STATS_SAMPLES],
102        }
103    }
104}
105
106impl<T: fmt::Display + Default + Copy + Ord> fmt::Debug for StatsCalculator<T> {
107    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
108        write!(fmt, "median: {} ms", self.approximated_median())
109    }
110}
111
112impl<T: Default + Copy + Ord> StatsCalculator<T> {
113    pub fn add(&mut self, sample: T) {
114        self.idx += 1;
115        if self.idx >= STATS_SAMPLES {
116            self.filled = true;
117            self.idx = 0;
118        }
119
120        self.samples[self.idx] = sample;
121    }
122
123    /// Returns approximate of media
124    pub fn approximated_median(&self) -> T {
125        let mut copy = [T::default(); STATS_SAMPLES];
126        copy.copy_from_slice(&self.samples);
127        let bound = if self.filled {
128            STATS_SAMPLES
129        } else {
130            self.idx + 1
131        };
132
133        let (_, &mut median) =
134            order_stat::median_of_medians(&mut copy[0..bound]);
135        median
136    }
137}
138
139/// RPC Statistics
140#[derive(Default, Debug)]
141pub struct RpcStats {
142    requests: RwLock<RateCalculator>,
143    roundtrips: RwLock<StatsCalculator<u128>>,
144    active_sessions: AtomicUsize,
145}
146
147impl RpcStats {
148    /// Count session opened
149    pub fn open_session(&self) {
150        self.active_sessions.fetch_add(1, atomic::Ordering::SeqCst);
151    }
152
153    /// Count session closed.
154    /// Silently overflows if closing unopened session.
155    pub fn close_session(&self) {
156        self.active_sessions.fetch_sub(1, atomic::Ordering::SeqCst);
157    }
158
159    /// Count request. Returns number of requests in current second.
160    pub fn count_request(&self) -> u16 { self.requests.write().tick() }
161
162    /// Add roundtrip time (microseconds)
163    pub fn add_roundtrip(&self, microseconds: u128) {
164        self.roundtrips.write().add(microseconds)
165    }
166
167    /// Returns number of open sessions
168    pub fn sessions(&self) -> usize {
169        self.active_sessions.load(atomic::Ordering::Relaxed)
170    }
171
172    /// Returns requests rate
173    pub fn requests_rate(&self) -> usize { self.requests.read().rate() }
174
175    /// Returns approximated roundtrip in microseconds
176    pub fn approximated_roundtrip(&self) -> u128 {
177        self.roundtrips.read().approximated_median()
178    }
179}
180
181/// Notifies about RPC activity.
182pub trait ActivityNotifier: Send + Sync + 'static {
183    /// Activity on RPC interface
184    fn active(&self);
185}
186
187/// Stats-counting RPC middleware
188pub struct Middleware<T: ActivityNotifier = ClientNotifier> {
189    stats: Arc<RpcStats>,
190    notifier: T,
191}
192
193impl<T: ActivityNotifier> Middleware<T> {
194    /// Create new Middleware with stats counter and activity notifier.
195    pub fn new(stats: Arc<RpcStats>, notifier: T) -> Self {
196        Middleware { stats, notifier }
197    }
198}
199
200impl<M: core::Metadata, T: ActivityNotifier> core::Middleware<M>
201    for Middleware<T>
202{
203    type CallFuture = core::middleware::NoopCallFuture;
204    type Future = core::FutureResponse;
205
206    fn on_request<F, X>(
207        &self, request: core::Request, meta: M, process: F,
208    ) -> Either<Self::Future, X>
209    where
210        F: FnOnce(core::Request, M) -> X,
211        X: core::futures::Future<Output = Option<core::Response>>
212            + Send
213            + 'static,
214    {
215        let start = time::Instant::now();
216
217        self.notifier.active();
218        self.stats.count_request();
219
220        let id = match request {
221            core::Request::Single(core::Call::MethodCall(ref call)) => {
222                Some(call.id.clone())
223            }
224            _ => None,
225        };
226        let stats = self.stats.clone();
227
228        let future = process(request, meta).map(move |res| {
229            let time = start.elapsed().as_micros();
230            if time > 10_000 {
231                debug!(target: "rpc", "[{:?}] Took {}ms", id, time / 1_000);
232            }
233            stats.add_roundtrip(time);
234            res
235        });
236
237        Either::Left(future.boxed())
238    }
239}
240
241/// Client Notifier
242pub struct ClientNotifier {
243    //    pub client: Arc<::ethcore::client::Client>,
244}
245
246impl ActivityNotifier for ClientNotifier {
247    fn active(&self) {
248        //        self.client.keep_alive()
249    }
250}
251
252#[cfg(test)]
253mod tests {
254    use super::{RateCalculator, RpcStats, StatsCalculator};
255
256    #[test]
257    fn should_calculate_rate() {
258        // given
259        let mut avg = RateCalculator::default();
260
261        // when
262        avg.tick();
263        avg.tick();
264        avg.tick();
265        let rate = avg.rate();
266
267        // then
268        assert_eq!(rate, 3usize);
269    }
270
271    #[test]
272    fn should_approximate_median() {
273        // given
274        let mut stats = StatsCalculator::default();
275        stats.add(5);
276        stats.add(100);
277        stats.add(3);
278        stats.add(15);
279        stats.add(20);
280        stats.add(6);
281
282        // when
283        let median = stats.approximated_median();
284
285        // then
286        assert_eq!(median, 5);
287    }
288
289    #[test]
290    fn should_count_rpc_stats() {
291        // given
292        let stats = RpcStats::default();
293        assert_eq!(stats.sessions(), 0);
294        assert_eq!(stats.requests_rate(), 0);
295        assert_eq!(stats.approximated_roundtrip(), 0);
296
297        // when
298        stats.open_session();
299        stats.close_session();
300        stats.open_session();
301        stats.count_request();
302        stats.count_request();
303        stats.add_roundtrip(125);
304
305        // then
306        assert_eq!(stats.sessions(), 1);
307        assert_eq!(stats.requests_rate(), 2);
308        assert_eq!(stats.approximated_roundtrip(), 125);
309    }
310
311    #[test]
312    fn should_be_sync_and_send() {
313        let stats = RpcStats::default();
314        is_sync(stats);
315    }
316
317    fn is_sync<F: Send + Sync>(x: F) { drop(x) }
318}