use crate::{Error, ThrottlingReason};
use byte_unit::n_mb_bytes;
use cfx_util_macros::bail;
use lazy_static::lazy_static;
use log::{debug, error, info, trace};
use metrics::{Gauge, GaugeUsize};
use parking_lot::RwLock;
use serde::Deserialize;
use serde_derive::Serialize;
use std::sync::Arc;
lazy_static! {
pub static ref THROTTLING_SERVICE: RwLock<Service> =
RwLock::new(Service::new());
static ref QUEUE_SIZE_GAUGE: Arc<dyn Gauge<usize>> =
GaugeUsize::register_with_group(
"network_system_data",
"network_throttling_queue_size"
);
static ref HIGH_QUEUE_SIZE_GAUGE: Arc<dyn Gauge<usize>> =
GaugeUsize::register_with_group(
"network_system_data",
"high_throttling_queue_size"
);
static ref LOW_QUEUE_SIZE_GAUGE: Arc<dyn Gauge<usize>> =
GaugeUsize::register_with_group(
"network_system_data",
"low_throttling_queue_size"
);
}
#[derive(Debug, Serialize, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Service {
queue_capacity: usize,
min_throttle_queue_size: usize,
max_throttle_queue_size: usize,
cur_queue_size: usize,
high_queue_size: usize,
low_queue_size: usize,
}
impl Service {
fn new() -> Service {
Service {
queue_capacity: n_mb_bytes!(256) as usize,
min_throttle_queue_size: n_mb_bytes!(10) as usize,
max_throttle_queue_size: n_mb_bytes!(64) as usize,
cur_queue_size: 0,
high_queue_size: 0,
low_queue_size: 0,
}
}
pub fn initialize(
&mut self, cap_mb: usize, min_throttle_mb: usize,
max_throttle_mb: usize,
) {
assert!(cap_mb > max_throttle_mb);
assert!(max_throttle_mb > min_throttle_mb);
assert!(min_throttle_mb > 0);
let mb = n_mb_bytes!(1) as usize;
assert!(std::usize::MAX / mb >= cap_mb);
let cap = cap_mb * mb;
assert!(self.cur_queue_size <= cap);
self.queue_capacity = cap;
self.min_throttle_queue_size = min_throttle_mb * mb;
self.max_throttle_queue_size = max_throttle_mb * mb;
info!(
"throttling.initialize: min = {}M, max = {}M, cap = {}M",
min_throttle_mb, max_throttle_mb, cap_mb
);
}
pub(crate) fn on_enqueue(
&mut self, data_size: usize, is_high_priority: bool,
) -> Result<usize, Error> {
if data_size > self.queue_capacity {
debug!("throttling.on_enqueue: enqueue too large data, data size = {}, queue capacity = {}", data_size, self.queue_capacity);
bail!(Error::Throttling(ThrottlingReason::QueueFull));
}
if self.cur_queue_size > self.queue_capacity - data_size {
debug!("throttling.on_enqueue: queue size not enough, data size = {}, queue size = {}", data_size, self.cur_queue_size);
bail!(Error::Throttling(ThrottlingReason::QueueFull));
}
self.cur_queue_size += data_size;
if is_high_priority {
self.high_queue_size += data_size
} else {
self.low_queue_size += data_size
}
trace!(
"throttling.on_enqueue: queue size = {}",
self.cur_queue_size
);
QUEUE_SIZE_GAUGE.update(self.cur_queue_size);
HIGH_QUEUE_SIZE_GAUGE.update(self.high_queue_size);
LOW_QUEUE_SIZE_GAUGE.update(self.low_queue_size);
Ok(self.cur_queue_size)
}
pub(crate) fn on_dequeue(
&mut self, data_size: usize, is_high_priority: bool,
) -> usize {
if data_size > self.cur_queue_size {
error!("throttling.on_dequeue: dequeue too much data, data size = {}, queue size = {}", data_size, self.cur_queue_size);
self.cur_queue_size = 0;
self.high_queue_size = 0;
self.low_queue_size = 0;
} else {
self.cur_queue_size -= data_size;
if is_high_priority {
self.high_queue_size -= data_size
} else {
self.low_queue_size -= data_size
}
}
trace!(
"throttling.on_dequeue: queue size = {}",
self.cur_queue_size
);
QUEUE_SIZE_GAUGE.update(self.cur_queue_size);
HIGH_QUEUE_SIZE_GAUGE.update(self.high_queue_size);
LOW_QUEUE_SIZE_GAUGE.update(self.low_queue_size);
self.cur_queue_size
}
pub fn check_throttling(&self) -> Result<(), Error> {
if self.cur_queue_size > self.max_throttle_queue_size {
debug!("throttling.check_throttling: throttled, queue size = {}, max throttling size = {}", self.cur_queue_size, self.max_throttle_queue_size);
bail!(Error::Throttling(ThrottlingReason::Throttled));
}
Ok(())
}
pub fn get_throttling_ratio(&self) -> f64 {
if self.cur_queue_size <= self.min_throttle_queue_size {
return 1.0;
}
if self.cur_queue_size >= self.max_throttle_queue_size {
debug!("throttling.get_throttling_ratio: fully throttled, queue size = {}, max throttling size = {}", self.cur_queue_size, self.max_throttle_queue_size);
return 0.0;
}
let ratio = (self.max_throttle_queue_size - self.cur_queue_size) as f64
/ (self.max_throttle_queue_size - self.min_throttle_queue_size)
as f64;
debug!("throttling.get_throttling_ratio: partially throttled, queue size = {}, throttling ratio = {}", self.cur_queue_size, ratio);
ratio
}
}
#[cfg(test)]
mod tests {
#[test]
fn test_enqueue() {
let mut service = super::Service::new();
assert_eq!(service.on_enqueue(10, false).unwrap(), 10);
assert_eq!(service.on_enqueue(20, false).unwrap(), 30);
assert_eq!(service.on_enqueue(0, false).unwrap(), 30);
}
#[test]
fn test_enqueue_too_large_data() {
let mut service = super::Service::new();
assert!(service.queue_capacity < std::usize::MAX);
assert!(service
.on_enqueue(service.queue_capacity + 1, false)
.is_err());
}
#[test]
fn test_enqueue_full() {
let mut service = super::Service::new();
assert!(service.on_enqueue(service.queue_capacity, false).is_ok());
assert!(service.on_enqueue(1, false).is_err());
}
#[test]
fn test_dequeue() {
let mut service = super::Service::new();
assert_eq!(service.on_enqueue(10, false).unwrap(), 10);
assert_eq!(service.on_dequeue(6, false), 4);
assert_eq!(service.on_dequeue(3, false), 1);
assert_eq!(service.on_dequeue(2, false), 0);
}
#[test]
fn test_throttle() {
let mut service = super::Service::new();
assert!(service.check_throttling().is_ok());
let max = service.max_throttle_queue_size;
assert_eq!(service.on_enqueue(max + 1, false).unwrap(), max + 1);
assert!(service.check_throttling().is_err());
assert_eq!(service.on_dequeue(1, false), max);
assert!(service.check_throttling().is_ok());
}
#[test]
fn test_get_throttling_ratio() {
let mut service = super::Service::new();
assert_throttling_ratio(&service, 100);
let min = service.min_throttle_queue_size;
assert_eq!(service.on_enqueue(min - 1, false).unwrap(), min - 1);
assert_throttling_ratio(&service, 100);
assert_eq!(service.on_enqueue(1, false).unwrap(), min);
assert_throttling_ratio(&service, 100);
assert_eq!(service.on_dequeue(min, false), 0);
let max = service.max_throttle_queue_size;
assert_eq!(service.on_enqueue(max, false).unwrap(), max);
assert_throttling_ratio(&service, 0);
assert_eq!(service.on_enqueue(1, false).unwrap(), max + 1);
assert_throttling_ratio(&service, 0);
assert_eq!(service.on_dequeue(max + 1, false), 0);
assert!(service.on_enqueue(min + (max - min) / 2, false).is_ok());
assert_throttling_ratio(&service, 50);
}
fn assert_throttling_ratio(service: &super::Service, percentage: usize) {
assert!(percentage <= 100);
let ratio = service.get_throttling_ratio() * 100.0;
assert_eq!(ratio as usize, percentage);
}
}