network/
throttling.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{Error, ThrottlingReason};
6use byte_unit::n_mb_bytes;
7use cfx_util_macros::bail;
8use lazy_static::lazy_static;
9use log::{debug, error, info, trace};
10use metrics::{Gauge, GaugeUsize};
11use parking_lot::RwLock;
12use serde::Deserialize;
13use serde_derive::Serialize;
14use std::sync::Arc;
15
16lazy_static! {
17    pub static ref THROTTLING_SERVICE: RwLock<Service> =
18        RwLock::new(Service::new());
19    static ref QUEUE_SIZE_GAUGE: Arc<dyn Gauge<usize>> =
20        GaugeUsize::register_with_group(
21            "network_system_data",
22            "network_throttling_queue_size"
23        );
24    static ref HIGH_QUEUE_SIZE_GAUGE: Arc<dyn Gauge<usize>> =
25        GaugeUsize::register_with_group(
26            "network_system_data",
27            "high_throttling_queue_size"
28        );
29    static ref LOW_QUEUE_SIZE_GAUGE: Arc<dyn Gauge<usize>> =
30        GaugeUsize::register_with_group(
31            "network_system_data",
32            "low_throttling_queue_size"
33        );
34}
35
36/// Throttling service is used to control the egress bandwidth, so as to avoid
37/// too much egress data cached in buffer.
38///
39/// The throttling is achieved by monitoring the message send queue size of all
40/// TCP sockets. Basically, the throttling is used in 2 ways:
41///
42/// 1. When the queue size reached the configured threshold, the synchronization
43/// layer will reduce the number of peers to broadcast messages, e.g. new block
44/// hashes, transaction digests.
45///
46/// 2. On the other hand, synchronization layer will also refuse to respond any
47/// size sensitive message, e.g. blocks.
48#[derive(Debug, Serialize, Clone, Deserialize)]
49#[serde(rename_all = "camelCase")]
50pub struct Service {
51    /// Maximum queue size.
52    /// When reached, the queue will refuse any new data.
53    queue_capacity: usize,
54    /// Minimum queue size for throttling in manner of ratio.
55    /// If queue size is less than `min_throttle_queue_size`, the throttling
56    /// does not work. Once queue size exceeds the `min_throttle_queue_size`,
57    /// the throttling begins to work in manner of linear ratio. Then, the
58    /// synchronization layer will broadcast messages to less peers.
59    min_throttle_queue_size: usize,
60    /// Maximum queue size for throttling in manner of ratio.
61    /// If queue size is between `min_throttle_queue_size` and
62    /// `max_throttle_queue_size`, the throttling works in manner of linear
63    /// ratio. Once queue size exceeds the `max_throttle_queue_size`, the
64    /// throttling not only works in ratio manner, but also blocks any
65    /// message size sensitive operations.
66    max_throttle_queue_size: usize,
67    /// Current queue size.
68    cur_queue_size: usize,
69    high_queue_size: usize,
70    low_queue_size: usize,
71}
72
73impl Service {
74    fn new() -> Service {
75        Service {
76            queue_capacity: n_mb_bytes!(256) as usize,
77            min_throttle_queue_size: n_mb_bytes!(10) as usize,
78            max_throttle_queue_size: n_mb_bytes!(64) as usize,
79            cur_queue_size: 0,
80            high_queue_size: 0,
81            low_queue_size: 0,
82        }
83    }
84
85    /// Initialize the throttling service.
86    pub fn initialize(
87        &mut self, cap_mb: usize, min_throttle_mb: usize,
88        max_throttle_mb: usize,
89    ) {
90        // 0 < min_throttle_mb < max_throttle_mb < cap_mb
91        assert!(cap_mb > max_throttle_mb);
92        assert!(max_throttle_mb > min_throttle_mb);
93        assert!(min_throttle_mb > 0);
94
95        // capacity cannot overflow with usize type.
96        let mb = n_mb_bytes!(1) as usize;
97        assert!(std::usize::MAX / mb >= cap_mb);
98
99        // ensure the current queue size will not exceed the capacity.
100        let cap = cap_mb * mb;
101        assert!(self.cur_queue_size <= cap);
102
103        self.queue_capacity = cap;
104        self.min_throttle_queue_size = min_throttle_mb * mb;
105        self.max_throttle_queue_size = max_throttle_mb * mb;
106
107        info!(
108            "throttling.initialize: min = {}M, max = {}M, cap = {}M",
109            min_throttle_mb, max_throttle_mb, cap_mb
110        );
111    }
112
113    /// Mark data enqueued with specified `data_size`, and return the new queue
114    /// size. If exceeds the queue capacity, return error with concrete reason.
115    pub(crate) fn on_enqueue(
116        &mut self, data_size: usize, is_high_priority: bool,
117    ) -> Result<usize, Error> {
118        if data_size > self.queue_capacity {
119            debug!("throttling.on_enqueue: enqueue too large data, data size = {}, queue capacity = {}", data_size, self.queue_capacity);
120            bail!(Error::Throttling(ThrottlingReason::QueueFull));
121        }
122
123        if self.cur_queue_size > self.queue_capacity - data_size {
124            debug!("throttling.on_enqueue: queue size not enough, data size = {}, queue size = {}", data_size, self.cur_queue_size);
125            bail!(Error::Throttling(ThrottlingReason::QueueFull));
126        }
127
128        self.cur_queue_size += data_size;
129        if is_high_priority {
130            self.high_queue_size += data_size
131        } else {
132            self.low_queue_size += data_size
133        }
134        trace!(
135            "throttling.on_enqueue: queue size = {}",
136            self.cur_queue_size
137        );
138
139        QUEUE_SIZE_GAUGE.update(self.cur_queue_size);
140        HIGH_QUEUE_SIZE_GAUGE.update(self.high_queue_size);
141        LOW_QUEUE_SIZE_GAUGE.update(self.low_queue_size);
142
143        Ok(self.cur_queue_size)
144    }
145
146    /// Mark data dequeued with specified `data_size`, and return the updated
147    /// queue size.
148    pub(crate) fn on_dequeue(
149        &mut self, data_size: usize, is_high_priority: bool,
150    ) -> usize {
151        if data_size > self.cur_queue_size {
152            error!("throttling.on_dequeue: dequeue too much data, data size = {}, queue size = {}", data_size, self.cur_queue_size);
153            self.cur_queue_size = 0;
154            self.high_queue_size = 0;
155            self.low_queue_size = 0;
156        } else {
157            self.cur_queue_size -= data_size;
158            if is_high_priority {
159                self.high_queue_size -= data_size
160            } else {
161                self.low_queue_size -= data_size
162            }
163        }
164
165        trace!(
166            "throttling.on_dequeue: queue size = {}",
167            self.cur_queue_size
168        );
169
170        QUEUE_SIZE_GAUGE.update(self.cur_queue_size);
171        HIGH_QUEUE_SIZE_GAUGE.update(self.high_queue_size);
172        LOW_QUEUE_SIZE_GAUGE.update(self.low_queue_size);
173
174        self.cur_queue_size
175    }
176
177    /// Validate the throttling queue size for any data size sensitive
178    /// operations. If the queue size exceeds the `max_throttle_queue_size`,
179    /// return error with concrete reason.
180    pub fn check_throttling(&self) -> Result<(), Error> {
181        if self.cur_queue_size > self.max_throttle_queue_size {
182            debug!("throttling.check_throttling: throttled, queue size = {}, max throttling size = {}", self.cur_queue_size, self.max_throttle_queue_size);
183            bail!(Error::Throttling(ThrottlingReason::Throttled));
184        }
185
186        Ok(())
187    }
188
189    /// Get the throttling ratio according to the current queue size.
190    ///
191    /// If the queue size is smaller than `min_throttle_queue_size`, return 1.0
192    /// as throttling ratio. Then, it allows to broadcast messages to all peers.
193    ///
194    /// If the queue size is larger than `max_throttle_queue_size`, return 0 as
195    /// throttling ratio. Then, it allows to broadcast messages to configured
196    /// minimum peers.
197    ///
198    /// Otherwise, the throttling works in manner of linear ratio (0, 1), in
199    /// which case it allows to broadcast messages to partial peers.
200    pub fn get_throttling_ratio(&self) -> f64 {
201        if self.cur_queue_size <= self.min_throttle_queue_size {
202            return 1.0;
203        }
204
205        if self.cur_queue_size >= self.max_throttle_queue_size {
206            debug!("throttling.get_throttling_ratio: fully throttled, queue size = {}, max throttling size = {}", self.cur_queue_size, self.max_throttle_queue_size);
207            return 0.0;
208        }
209
210        let ratio = (self.max_throttle_queue_size - self.cur_queue_size) as f64
211            / (self.max_throttle_queue_size - self.min_throttle_queue_size)
212                as f64;
213
214        debug!("throttling.get_throttling_ratio: partially throttled, queue size = {}, throttling ratio = {}", self.cur_queue_size, ratio);
215
216        ratio
217    }
218}
219
#[cfg(test)]
mod tests {
    use super::Service;

    /// Checks that the current throttling ratio, converted to a percentage
    /// and truncated to an integer, equals `percentage`.
    fn assert_throttling_ratio(service: &Service, percentage: usize) {
        assert!(percentage <= 100);
        let actual = (service.get_throttling_ratio() * 100.0) as usize;
        assert_eq!(actual, percentage);
    }

    /// Enqueued sizes accumulate; enqueuing nothing changes nothing.
    #[test]
    fn test_enqueue() {
        let mut svc = Service::new();

        let after_first = svc.on_enqueue(10, false).unwrap();
        assert_eq!(after_first, 10);

        let after_second = svc.on_enqueue(20, false).unwrap();
        assert_eq!(after_second, 30);

        // enqueue data size is 0.
        let after_empty = svc.on_enqueue(0, false).unwrap();
        assert_eq!(after_empty, 30);
    }

    /// A single item larger than the capacity is rejected.
    #[test]
    fn test_enqueue_too_large_data() {
        let mut svc = Service::new();
        assert!(svc.queue_capacity < std::usize::MAX);

        let oversized = svc.queue_capacity + 1;
        let outcome = svc.on_enqueue(oversized, false);
        assert!(outcome.is_err());
    }

    /// The queue can be filled exactly up to capacity, but not beyond.
    #[test]
    fn test_enqueue_full() {
        let mut svc = Service::new();

        let capacity = svc.queue_capacity;
        assert!(svc.on_enqueue(capacity, false).is_ok());

        let overflow = svc.on_enqueue(1, false);
        assert!(overflow.is_err());
    }

    /// Dequeued sizes are subtracted; dequeuing too much empties the queue.
    #[test]
    fn test_dequeue() {
        let mut svc = Service::new();
        assert_eq!(svc.on_enqueue(10, false).unwrap(), 10);

        let remaining = svc.on_dequeue(6, false);
        assert_eq!(remaining, 4);

        let remaining = svc.on_dequeue(3, false);
        assert_eq!(remaining, 1);

        // queue size not enough.
        let remaining = svc.on_dequeue(2, false);
        assert_eq!(remaining, 0);
    }

    /// `check_throttling` fails only while the queue size exceeds the
    /// maximum throttling threshold.
    #[test]
    fn test_throttle() {
        let mut svc = Service::new();

        // not throttled by default.
        assert!(svc.check_throttling().is_ok());

        // throttled once more than max_throttle_queue_size data queued.
        let threshold = svc.max_throttle_queue_size;
        let queued = svc.on_enqueue(threshold + 1, false).unwrap();
        assert_eq!(queued, threshold + 1);
        assert!(svc.check_throttling().is_err());

        // not throttled after some data dequeued.
        let queued = svc.on_dequeue(1, false);
        assert_eq!(queued, threshold);
        assert!(svc.check_throttling().is_ok());
    }

    /// The ratio is 100% up to the minimum threshold, 0% from the maximum
    /// threshold on, and linear in between.
    #[test]
    fn test_get_throttling_ratio() {
        let mut svc = Service::new();

        // default ratio is 1.0
        assert_throttling_ratio(&svc, 100);

        // no more than min_throttle_queue_size queued.
        let lower = svc.min_throttle_queue_size;
        assert_eq!(svc.on_enqueue(lower - 1, false).unwrap(), lower - 1);
        assert_throttling_ratio(&svc, 100);
        assert_eq!(svc.on_enqueue(1, false).unwrap(), lower);
        assert_throttling_ratio(&svc, 100);

        // more than max_throttle_queue_size queued.
        assert_eq!(svc.on_dequeue(lower, false), 0);
        let upper = svc.max_throttle_queue_size;
        assert_eq!(svc.on_enqueue(upper, false).unwrap(), upper);
        assert_throttling_ratio(&svc, 0);
        assert_eq!(svc.on_enqueue(1, false).unwrap(), upper + 1);
        assert_throttling_ratio(&svc, 0);

        // partial throttled
        assert_eq!(svc.on_dequeue(upper + 1, false), 0);
        let midpoint = lower + (upper - lower) / 2;
        assert!(svc.on_enqueue(midpoint, false).is_ok());
        assert_throttling_ratio(&svc, 50);
    }
}