// cfxcore/sync/request_manager/request_batcher.rs

1use crate::{
2    sync::{
3        message::{GetBlockHashesByEpoch, GetBlockHeaders, GetBlocks},
4        request_manager::Request,
5    },
6    NodeType,
7};
8use cfx_types::H256;
9use std::{cmp::min, time::Duration};
10
// Number of delay buckets per request kind; delays past the last bucket are
// clamped into it (see `DelayBucket::insert`).
const DEFAULT_REQUEST_BATCHER_BUCKET_NUMBER: usize = 30;
// Maximum number of items packed into one resent request, per request kind.
const DEFAULT_REQUEST_HEADER_BATCH_SIZE: usize = 100;
const DEFAULT_REQUEST_BLOCK_BATCH_SIZE: usize = 50;
const DEFAULT_REQUEST_EPOCH_BATCH_SIZE: usize = 10;
15
/// Batch requests that are going to be resent.
/// Resent requests include: `GetBlockHeaders`, `GetBlockTxn`,
/// `GetBlockHashesByEpoch`, and `GetBlocks`. `GetBlockTxn` only includes one
/// block so cannot be batched; it is kept whole in `original_requests`.
/// TODO: Exclude failing peers for requests and batch based on peers.
pub struct RequestBatcher {
    /// Block hashes drained from `GetBlockHeaders` requests, bucketed by
    /// resend delay.
    headers: DelayBucket<H256>,

    /// Block hashes drained from `GetBlocks` requests, bucketed by resend
    /// delay.
    /// TODO Handle with_public.
    blocks: DelayBucket<H256>,

    /// Epoch numbers drained from `GetBlockHashesByEpoch` requests, bucketed
    /// by resend delay.
    epochs: DelayBucket<u64>,

    /// Requests of any other type, kept whole with their original delay and
    /// resent unchanged.
    original_requests: Vec<(Duration, Box<dyn Request>)>,
}
34
35impl RequestBatcher {
36    pub fn new(bucket_size: Duration) -> Self {
37        Self {
38            headers: DelayBucket::new(
39                DEFAULT_REQUEST_BATCHER_BUCKET_NUMBER,
40                bucket_size,
41            ),
42            blocks: DelayBucket::new(
43                DEFAULT_REQUEST_BATCHER_BUCKET_NUMBER,
44                bucket_size,
45            ),
46            epochs: DelayBucket::new(
47                DEFAULT_REQUEST_BATCHER_BUCKET_NUMBER,
48                bucket_size,
49            ),
50            original_requests: Vec::new(),
51        }
52    }
53
54    /// Insert request and its delay into this batcher
55    /// TODO Remove these downcast.
56    pub fn insert(&mut self, delay: Duration, mut request: Box<dyn Request>) {
57        if let Some(header_req) =
58            request.as_any_mut().downcast_mut::<GetBlockHeaders>()
59        {
60            self.headers.insert(&mut header_req.hashes, delay);
61        } else if let Some(block_req) =
62            request.as_any_mut().downcast_mut::<GetBlocks>()
63        {
64            self.blocks.insert(&mut block_req.hashes, delay);
65        } else if let Some(epoch_req) =
66            request.as_any_mut().downcast_mut::<GetBlockHashesByEpoch>()
67        {
68            self.epochs.insert(&mut epoch_req.epochs, delay);
69        } else {
70            self.original_requests.push((delay, request));
71        }
72    }
73
74    /// Batch inserted requests according to their request types.
75    /// Requests with close delays are batched together.
76    pub fn get_batched_requests(
77        mut self, prefer_archive_node_for_blocks: bool,
78    ) -> impl Iterator<Item = (Duration, Box<dyn Request>)> {
79        let mut requests = Vec::new();
80        for (delay, hashes) in
81            self.headers.batch_iter(DEFAULT_REQUEST_HEADER_BATCH_SIZE)
82        {
83            requests.push((
84                delay,
85                Box::new(GetBlockHeaders {
86                    request_id: 0,
87                    hashes,
88                }) as Box<dyn Request>,
89            ));
90        }
91        let preferred_node_type = if prefer_archive_node_for_blocks {
92            Some(NodeType::Archive)
93        } else {
94            None
95        };
96        for (delay, hashes) in
97            self.blocks.batch_iter(DEFAULT_REQUEST_BLOCK_BATCH_SIZE)
98        {
99            requests.push((
100                delay,
101                Box::new(GetBlocks {
102                    request_id: 0,
103                    with_public: false,
104                    hashes,
105                    preferred_node_type: preferred_node_type.clone(),
106                }) as Box<dyn Request>,
107            ));
108        }
109        for (delay, epochs) in
110            self.epochs.batch_iter(DEFAULT_REQUEST_EPOCH_BATCH_SIZE)
111        {
112            requests.push((
113                delay,
114                Box::new(GetBlockHashesByEpoch {
115                    request_id: 0,
116                    epochs,
117                }) as Box<dyn Request>,
118            ));
119        }
120        requests.append(&mut self.original_requests);
121        requests.into_iter()
122    }
123}
124
/// Store requests with close delay in the same buckets.
struct DelayBucket<T> {
    /// Each bucket keeps the sum of request delays and the flattened request
    /// set in the bucket. Keeping only the sum lets `batch_iter` report an
    /// average delay without storing one delay per request.
    buckets: Vec<(Duration, Vec<T>)>,

    /// Requests with delay in `[i * bucket_size, (i + 1) * bucket_size)` are
    /// stored in bucket `i`; larger delays are clamped into the last bucket.
    bucket_size: Duration,
}

impl<T> DelayBucket<T> {
    /// Create `bucket_number` empty buckets, each spanning `bucket_size`.
    ///
    /// # Panics
    /// Panics if `bucket_number` is zero or `bucket_size` is shorter than one
    /// millisecond: either would later surface as an opaque index underflow
    /// or divide-by-zero panic inside `insert`, so fail fast here with a
    /// clear message instead.
    fn new(bucket_number: usize, bucket_size: Duration) -> Self {
        assert!(
            bucket_number > 0,
            "DelayBucket requires at least one bucket"
        );
        assert!(
            bucket_size.as_millis() > 0,
            "DelayBucket bucket_size must be at least one millisecond"
        );
        let mut buckets = Vec::with_capacity(bucket_number);
        for _ in 0..bucket_number {
            buckets.push((Duration::default(), Vec::new()));
        }
        Self {
            buckets,
            bucket_size,
        }
    }

    /// Move all `new_requests` into the bucket matching `delay`, draining
    /// the input vector. Delays past the last bucket are clamped into it.
    fn insert(&mut self, new_requests: &mut Vec<T>, delay: Duration) {
        let bucket_index = min(
            (delay.as_millis() / self.bucket_size.as_millis()) as usize,
            self.buckets.len() - 1,
        );
        let (delay_sum, requests) = &mut self.buckets[bucket_index];
        // Accumulate the total delay so the bucket average can be computed
        // later in `batch_iter`.
        *delay_sum += delay * new_requests.len() as u32;
        requests.append(new_requests);
    }
}
158
159impl<T: Clone> DelayBucket<T> {
160    /// Return the batched requests with the given batch_size.
161    /// The delay is the average of all requests in this bucket.
162    fn batch_iter(
163        &self, batch_size: usize,
164    ) -> impl Iterator<Item = (Duration, Vec<T>)> {
165        let mut batches = Vec::new();
166        for (delay_sum, bucket) in &self.buckets {
167            if bucket.is_empty() {
168                continue;
169            }
170            let delay = *delay_sum / bucket.len() as u32;
171            for batch in bucket.chunks(batch_size) {
172                batches.push((delay, batch.to_vec()));
173            }
174        }
175        batches.into_iter()
176    }
177}
178
#[test]
fn test_bucket_batch() {
    // Five items with a 500ms delay all land in bucket 0 (bucket size 1s).
    let mut bucket = DelayBucket::<usize>::new(5, Duration::from_secs(1));
    bucket.insert(&mut vec![1, 2, 3, 4, 5], Duration::from_millis(500));

    // Batch size 2 splits the bucket into [1, 2], [3, 4], [5]; each batch
    // reports the bucket's average delay.
    let batches: Vec<_> = bucket.batch_iter(2).collect();
    assert_eq!(batches.len(), 3);
    assert_eq!(batches[0], (Duration::from_millis(500), vec![1, 2]));
    assert_eq!(batches[2].1, vec![5]);
}
195}