use jsonrpc_core as core;
use jsonrpc_core::futures::future::{Either, FutureExt};
use log::debug;
use order_stat;
use parking_lot::RwLock;
use std::{
fmt,
sync::{
atomic::{self, AtomicUsize},
Arc,
},
time,
};
const RATE_SECONDS: usize = 10;
const STATS_SAMPLES: usize = 60;
struct RateCalculator {
era: time::Instant,
samples: [u16; RATE_SECONDS],
}
impl fmt::Debug for RateCalculator {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{} req/s", self.rate())
}
}
impl Default for RateCalculator {
fn default() -> Self {
RateCalculator {
era: time::Instant::now(),
samples: [0; RATE_SECONDS],
}
}
}
impl RateCalculator {
fn elapsed(&self) -> u64 { self.era.elapsed().as_secs() }
pub fn tick(&mut self) -> u16 {
if self.elapsed() >= RATE_SECONDS as u64 {
self.era = time::Instant::now();
self.samples[0] = 0;
}
let pos = self.elapsed() as usize % RATE_SECONDS;
let next = (pos + 1) % RATE_SECONDS;
self.samples[next] = 0;
self.samples[pos] = self.samples[pos].saturating_add(1);
self.samples[pos]
}
fn current_rate(&self) -> usize {
let now = match self.elapsed() {
i if i >= RATE_SECONDS as u64 => RATE_SECONDS,
i => i as usize + 1,
};
let sum: usize = self.samples[0..now].iter().map(|x| *x as usize).sum();
sum / now
}
pub fn rate(&self) -> usize {
if self.elapsed() > RATE_SECONDS as u64 {
0
} else {
self.current_rate()
}
}
}
struct StatsCalculator<T = u32> {
filled: bool,
idx: usize,
samples: [T; STATS_SAMPLES],
}
impl<T: Default + Copy> Default for StatsCalculator<T> {
fn default() -> Self {
StatsCalculator {
filled: false,
idx: 0,
samples: [T::default(); STATS_SAMPLES],
}
}
}
impl<T: fmt::Display + Default + Copy + Ord> fmt::Debug for StatsCalculator<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "median: {} ms", self.approximated_median())
}
}
impl<T: Default + Copy + Ord> StatsCalculator<T> {
pub fn add(&mut self, sample: T) {
self.idx += 1;
if self.idx >= STATS_SAMPLES {
self.filled = true;
self.idx = 0;
}
self.samples[self.idx] = sample;
}
pub fn approximated_median(&self) -> T {
let mut copy = [T::default(); STATS_SAMPLES];
copy.copy_from_slice(&self.samples);
let bound = if self.filled {
STATS_SAMPLES
} else {
self.idx + 1
};
let (_, &mut median) =
order_stat::median_of_medians(&mut copy[0..bound]);
median
}
}
#[derive(Default, Debug)]
pub struct RpcStats {
requests: RwLock<RateCalculator>,
roundtrips: RwLock<StatsCalculator<u128>>,
active_sessions: AtomicUsize,
}
impl RpcStats {
pub fn open_session(&self) {
self.active_sessions.fetch_add(1, atomic::Ordering::SeqCst);
}
pub fn close_session(&self) {
self.active_sessions.fetch_sub(1, atomic::Ordering::SeqCst);
}
pub fn count_request(&self) -> u16 { self.requests.write().tick() }
pub fn add_roundtrip(&self, microseconds: u128) {
self.roundtrips.write().add(microseconds)
}
pub fn sessions(&self) -> usize {
self.active_sessions.load(atomic::Ordering::Relaxed)
}
pub fn requests_rate(&self) -> usize { self.requests.read().rate() }
pub fn approximated_roundtrip(&self) -> u128 {
self.roundtrips.read().approximated_median()
}
}
pub trait ActivityNotifier: Send + Sync + 'static {
fn active(&self);
}
pub struct Middleware<T: ActivityNotifier = ClientNotifier> {
stats: Arc<RpcStats>,
notifier: T,
}
impl<T: ActivityNotifier> Middleware<T> {
pub fn new(stats: Arc<RpcStats>, notifier: T) -> Self {
Middleware { stats, notifier }
}
}
impl<M: core::Metadata, T: ActivityNotifier> core::Middleware<M>
for Middleware<T>
{
type CallFuture = core::middleware::NoopCallFuture;
type Future = core::FutureResponse;
fn on_request<F, X>(
&self, request: core::Request, meta: M, process: F,
) -> Either<Self::Future, X>
where
F: FnOnce(core::Request, M) -> X,
X: core::futures::Future<Output = Option<core::Response>>
+ Send
+ 'static,
{
let start = time::Instant::now();
self.notifier.active();
self.stats.count_request();
let id = match request {
core::Request::Single(core::Call::MethodCall(ref call)) => {
Some(call.id.clone())
}
_ => None,
};
let stats = self.stats.clone();
let future = process(request, meta).map(move |res| {
let time = start.elapsed().as_micros();
if time > 10_000 {
debug!(target: "rpc", "[{:?}] Took {}ms", id, time / 1_000);
}
stats.add_roundtrip(time);
res
});
Either::Left(future.boxed())
}
}
pub struct ClientNotifier {
}
impl ActivityNotifier for ClientNotifier {
fn active(&self) {
}
}
#[cfg(test)]
mod tests {
use super::{RateCalculator, RpcStats, StatsCalculator};
#[test]
fn should_calculate_rate() {
let mut avg = RateCalculator::default();
avg.tick();
avg.tick();
avg.tick();
let rate = avg.rate();
assert_eq!(rate, 3usize);
}
#[test]
fn should_approximate_median() {
let mut stats = StatsCalculator::default();
stats.add(5);
stats.add(100);
stats.add(3);
stats.add(15);
stats.add(20);
stats.add(6);
let median = stats.approximated_median();
assert_eq!(median, 5);
}
#[test]
fn should_count_rpc_stats() {
let stats = RpcStats::default();
assert_eq!(stats.sessions(), 0);
assert_eq!(stats.requests_rate(), 0);
assert_eq!(stats.approximated_roundtrip(), 0);
stats.open_session();
stats.close_session();
stats.open_session();
stats.count_request();
stats.count_request();
stats.add_roundtrip(125);
assert_eq!(stats.sessions(), 1);
assert_eq!(stats.requests_rate(), 2);
assert_eq!(stats.approximated_roundtrip(), 125);
}
#[test]
fn should_be_sync_and_send() {
let stats = RpcStats::default();
is_sync(stats);
}
fn is_sync<F: Send + Sync>(x: F) { drop(x) }
}