1use crate::{Error, ThrottlingReason};
6use byte_unit::n_mb_bytes;
7use cfx_util_macros::bail;
8use lazy_static::lazy_static;
9use log::{debug, error, info, trace};
10use metrics::{Gauge, GaugeUsize};
11use parking_lot::RwLock;
12use serde::Deserialize;
13use serde_derive::Serialize;
14use std::sync::Arc;
15
lazy_static! {
    // Global throttling state shared through a read/write lock; it starts
    // out with the defaults produced by `Service::new()`.
    pub static ref THROTTLING_SERVICE: RwLock<Service> =
        RwLock::new(Service::new());
    // Gauge that `on_enqueue` / `on_dequeue` update with the total queue
    // size (`cur_queue_size`).
    static ref QUEUE_SIZE_GAUGE: Arc<dyn Gauge<usize>> =
        GaugeUsize::register_with_group(
            "network_system_data",
            "network_throttling_queue_size"
        );
    // Gauge that `on_enqueue` / `on_dequeue` update with the size accounted
    // to high-priority data (`high_queue_size`).
    static ref HIGH_QUEUE_SIZE_GAUGE: Arc<dyn Gauge<usize>> =
        GaugeUsize::register_with_group(
            "network_system_data",
            "high_throttling_queue_size"
        );
    // Gauge that `on_enqueue` / `on_dequeue` update with the size accounted
    // to low-priority data (`low_queue_size`).
    static ref LOW_QUEUE_SIZE_GAUGE: Arc<dyn Gauge<usize>> =
        GaugeUsize::register_with_group(
            "network_system_data",
            "low_throttling_queue_size"
        );
}
35
/// Bookkeeping for the size of queued data, used to decide when and how
/// strongly to throttle.
///
/// All sizes are stored in the unit produced by `n_mb_bytes!` (see
/// `Service::new` and `Service::initialize`). Field names are rendered in
/// camelCase when the struct is (de)serialized.
#[derive(Debug, Serialize, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Service {
    /// Hard upper bound on `cur_queue_size`; `on_enqueue` rejects data that
    /// would push the total above it.
    queue_capacity: usize,
    /// At or below this total size `get_throttling_ratio` returns `1.0`
    /// (no throttling).
    min_throttle_queue_size: usize,
    /// At or above this total size `get_throttling_ratio` returns `0.0`;
    /// strictly above it `check_throttling` returns an error.
    max_throttle_queue_size: usize,
    /// Total size currently accounted as queued.
    cur_queue_size: usize,
    /// Part of the total that was enqueued with `is_high_priority == true`.
    high_queue_size: usize,
    /// Part of the total that was enqueued with `is_high_priority == false`.
    low_queue_size: usize,
}
72
73impl Service {
74 fn new() -> Service {
75 Service {
76 queue_capacity: n_mb_bytes!(256) as usize,
77 min_throttle_queue_size: n_mb_bytes!(10) as usize,
78 max_throttle_queue_size: n_mb_bytes!(64) as usize,
79 cur_queue_size: 0,
80 high_queue_size: 0,
81 low_queue_size: 0,
82 }
83 }
84
85 pub fn initialize(
87 &mut self, cap_mb: usize, min_throttle_mb: usize,
88 max_throttle_mb: usize,
89 ) {
90 assert!(cap_mb > max_throttle_mb);
92 assert!(max_throttle_mb > min_throttle_mb);
93 assert!(min_throttle_mb > 0);
94
95 let mb = n_mb_bytes!(1) as usize;
97 assert!(std::usize::MAX / mb >= cap_mb);
98
99 let cap = cap_mb * mb;
101 assert!(self.cur_queue_size <= cap);
102
103 self.queue_capacity = cap;
104 self.min_throttle_queue_size = min_throttle_mb * mb;
105 self.max_throttle_queue_size = max_throttle_mb * mb;
106
107 info!(
108 "throttling.initialize: min = {}M, max = {}M, cap = {}M",
109 min_throttle_mb, max_throttle_mb, cap_mb
110 );
111 }
112
113 pub(crate) fn on_enqueue(
116 &mut self, data_size: usize, is_high_priority: bool,
117 ) -> Result<usize, Error> {
118 if data_size > self.queue_capacity {
119 debug!("throttling.on_enqueue: enqueue too large data, data size = {}, queue capacity = {}", data_size, self.queue_capacity);
120 bail!(Error::Throttling(ThrottlingReason::QueueFull));
121 }
122
123 if self.cur_queue_size > self.queue_capacity - data_size {
124 debug!("throttling.on_enqueue: queue size not enough, data size = {}, queue size = {}", data_size, self.cur_queue_size);
125 bail!(Error::Throttling(ThrottlingReason::QueueFull));
126 }
127
128 self.cur_queue_size += data_size;
129 if is_high_priority {
130 self.high_queue_size += data_size
131 } else {
132 self.low_queue_size += data_size
133 }
134 trace!(
135 "throttling.on_enqueue: queue size = {}",
136 self.cur_queue_size
137 );
138
139 QUEUE_SIZE_GAUGE.update(self.cur_queue_size);
140 HIGH_QUEUE_SIZE_GAUGE.update(self.high_queue_size);
141 LOW_QUEUE_SIZE_GAUGE.update(self.low_queue_size);
142
143 Ok(self.cur_queue_size)
144 }
145
146 pub(crate) fn on_dequeue(
149 &mut self, data_size: usize, is_high_priority: bool,
150 ) -> usize {
151 if data_size > self.cur_queue_size {
152 error!("throttling.on_dequeue: dequeue too much data, data size = {}, queue size = {}", data_size, self.cur_queue_size);
153 self.cur_queue_size = 0;
154 self.high_queue_size = 0;
155 self.low_queue_size = 0;
156 } else {
157 self.cur_queue_size -= data_size;
158 if is_high_priority {
159 self.high_queue_size -= data_size
160 } else {
161 self.low_queue_size -= data_size
162 }
163 }
164
165 trace!(
166 "throttling.on_dequeue: queue size = {}",
167 self.cur_queue_size
168 );
169
170 QUEUE_SIZE_GAUGE.update(self.cur_queue_size);
171 HIGH_QUEUE_SIZE_GAUGE.update(self.high_queue_size);
172 LOW_QUEUE_SIZE_GAUGE.update(self.low_queue_size);
173
174 self.cur_queue_size
175 }
176
177 pub fn check_throttling(&self) -> Result<(), Error> {
181 if self.cur_queue_size > self.max_throttle_queue_size {
182 debug!("throttling.check_throttling: throttled, queue size = {}, max throttling size = {}", self.cur_queue_size, self.max_throttle_queue_size);
183 bail!(Error::Throttling(ThrottlingReason::Throttled));
184 }
185
186 Ok(())
187 }
188
189 pub fn get_throttling_ratio(&self) -> f64 {
201 if self.cur_queue_size <= self.min_throttle_queue_size {
202 return 1.0;
203 }
204
205 if self.cur_queue_size >= self.max_throttle_queue_size {
206 debug!("throttling.get_throttling_ratio: fully throttled, queue size = {}, max throttling size = {}", self.cur_queue_size, self.max_throttle_queue_size);
207 return 0.0;
208 }
209
210 let ratio = (self.max_throttle_queue_size - self.cur_queue_size) as f64
211 / (self.max_throttle_queue_size - self.min_throttle_queue_size)
212 as f64;
213
214 debug!("throttling.get_throttling_ratio: partially throttled, queue size = {}, throttling ratio = {}", self.cur_queue_size, ratio);
215
216 ratio
217 }
218}
219
#[cfg(test)]
mod tests {
    use super::Service;

    /// Enqueued sizes accumulate; a zero-sized enqueue is accepted and
    /// leaves the total unchanged.
    #[test]
    fn test_enqueue() {
        let mut svc = Service::new();

        let after_first = svc.on_enqueue(10, false).unwrap();
        assert_eq!(after_first, 10);

        let after_second = svc.on_enqueue(20, false).unwrap();
        assert_eq!(after_second, 30);

        let after_empty = svc.on_enqueue(0, false).unwrap();
        assert_eq!(after_empty, 30);
    }

    /// A single item larger than the whole capacity is rejected.
    #[test]
    fn test_enqueue_too_large_data() {
        let mut svc = Service::new();
        assert!(svc.queue_capacity < std::usize::MAX);

        let oversized = svc.queue_capacity + 1;
        let outcome = svc.on_enqueue(oversized, false);
        assert!(outcome.is_err());
    }

    /// Filling the queue exactly succeeds; one more unit is rejected.
    #[test]
    fn test_enqueue_full() {
        let mut svc = Service::new();

        let whole_capacity = svc.queue_capacity;
        assert!(svc.on_enqueue(whole_capacity, false).is_ok());

        let overflow = svc.on_enqueue(1, false);
        assert!(overflow.is_err());
    }

    /// Dequeuing shrinks the total; dequeuing more than is queued resets
    /// the total to zero.
    #[test]
    fn test_dequeue() {
        let mut svc = Service::new();
        assert_eq!(svc.on_enqueue(10, false).unwrap(), 10);

        let remaining = svc.on_dequeue(6, false);
        assert_eq!(remaining, 4);

        let remaining = svc.on_dequeue(3, false);
        assert_eq!(remaining, 1);

        // Only 1 is left, so asking for 2 takes the reset path.
        let remaining = svc.on_dequeue(2, false);
        assert_eq!(remaining, 0);
    }

    /// `check_throttling` fails only while the total is strictly above
    /// the upper threshold.
    #[test]
    fn test_throttle() {
        let mut svc = Service::new();

        // Empty queue: not throttled.
        assert!(svc.check_throttling().is_ok());

        // One unit above the threshold: throttled.
        let limit = svc.max_throttle_queue_size;
        let total = svc.on_enqueue(limit + 1, false).unwrap();
        assert_eq!(total, limit + 1);
        assert!(svc.check_throttling().is_err());

        // Exactly at the threshold: no longer throttled.
        let total = svc.on_dequeue(1, false);
        assert_eq!(total, limit);
        assert!(svc.check_throttling().is_ok());
    }

    /// The ratio is 100% up to the lower threshold, 0% from the upper
    /// threshold on, and linear in between.
    #[test]
    fn test_get_throttling_ratio() {
        let mut svc = Service::new();

        // Empty queue.
        assert_throttling_ratio(&svc, 100);

        // Just below, then exactly at, the lower threshold.
        let lower = svc.min_throttle_queue_size;
        let total = svc.on_enqueue(lower - 1, false).unwrap();
        assert_eq!(total, lower - 1);
        assert_throttling_ratio(&svc, 100);
        let total = svc.on_enqueue(1, false).unwrap();
        assert_eq!(total, lower);
        assert_throttling_ratio(&svc, 100);

        // Drain, then sit exactly at, and just above, the upper threshold.
        assert_eq!(svc.on_dequeue(lower, false), 0);
        let upper = svc.max_throttle_queue_size;
        let total = svc.on_enqueue(upper, false).unwrap();
        assert_eq!(total, upper);
        assert_throttling_ratio(&svc, 0);
        let total = svc.on_enqueue(1, false).unwrap();
        assert_eq!(total, upper + 1);
        assert_throttling_ratio(&svc, 0);

        // Drain, then sit halfway between the two thresholds.
        assert_eq!(svc.on_dequeue(upper + 1, false), 0);
        let midpoint = lower + (upper - lower) / 2;
        assert!(svc.on_enqueue(midpoint, false).is_ok());
        assert_throttling_ratio(&svc, 50);
    }

    /// Asserts that the service's throttling ratio, expressed as a whole
    /// percentage (truncated), equals `percentage`.
    fn assert_throttling_ratio(service: &Service, percentage: usize) {
        assert!(percentage <= 100);
        let scaled = service.get_throttling_ratio() * 100.0;
        let actual = scaled as usize;
        assert_eq!(actual, percentage);
    }
}