cfxcore/sync/request_manager/
request_batcher.rs1use crate::{
2 sync::{
3 message::{GetBlockHashesByEpoch, GetBlockHeaders, GetBlocks},
4 request_manager::Request,
5 },
6 NodeType,
7};
8use cfx_types::H256;
9use std::{cmp::min, time::Duration};
10
/// Number of delay buckets each `DelayBucket` owned by a `RequestBatcher`
/// is created with.
const DEFAULT_REQUEST_BATCHER_BUCKET_NUMBER: usize = 30;

/// Maximum number of block hashes put into one batched `GetBlockHeaders`
/// request.
const DEFAULT_REQUEST_HEADER_BATCH_SIZE: usize = 100;

/// Maximum number of block hashes put into one batched `GetBlocks` request.
const DEFAULT_REQUEST_BLOCK_BATCH_SIZE: usize = 50;

/// Maximum number of epoch numbers put into one batched
/// `GetBlockHashesByEpoch` request.
const DEFAULT_REQUEST_EPOCH_BATCH_SIZE: usize = 10;
15
16pub struct RequestBatcher {
22 headers: DelayBucket<H256>,
24
25 blocks: DelayBucket<H256>,
28
29 epochs: DelayBucket<u64>,
31
32 original_requests: Vec<(Duration, Box<dyn Request>)>,
33}
34
35impl RequestBatcher {
36 pub fn new(bucket_size: Duration) -> Self {
37 Self {
38 headers: DelayBucket::new(
39 DEFAULT_REQUEST_BATCHER_BUCKET_NUMBER,
40 bucket_size,
41 ),
42 blocks: DelayBucket::new(
43 DEFAULT_REQUEST_BATCHER_BUCKET_NUMBER,
44 bucket_size,
45 ),
46 epochs: DelayBucket::new(
47 DEFAULT_REQUEST_BATCHER_BUCKET_NUMBER,
48 bucket_size,
49 ),
50 original_requests: Vec::new(),
51 }
52 }
53
54 pub fn insert(&mut self, delay: Duration, mut request: Box<dyn Request>) {
57 if let Some(header_req) =
58 request.as_any_mut().downcast_mut::<GetBlockHeaders>()
59 {
60 self.headers.insert(&mut header_req.hashes, delay);
61 } else if let Some(block_req) =
62 request.as_any_mut().downcast_mut::<GetBlocks>()
63 {
64 self.blocks.insert(&mut block_req.hashes, delay);
65 } else if let Some(epoch_req) =
66 request.as_any_mut().downcast_mut::<GetBlockHashesByEpoch>()
67 {
68 self.epochs.insert(&mut epoch_req.epochs, delay);
69 } else {
70 self.original_requests.push((delay, request));
71 }
72 }
73
74 pub fn get_batched_requests(
77 mut self, prefer_archive_node_for_blocks: bool,
78 ) -> impl Iterator<Item = (Duration, Box<dyn Request>)> {
79 let mut requests = Vec::new();
80 for (delay, hashes) in
81 self.headers.batch_iter(DEFAULT_REQUEST_HEADER_BATCH_SIZE)
82 {
83 requests.push((
84 delay,
85 Box::new(GetBlockHeaders {
86 request_id: 0,
87 hashes,
88 }) as Box<dyn Request>,
89 ));
90 }
91 let preferred_node_type = if prefer_archive_node_for_blocks {
92 Some(NodeType::Archive)
93 } else {
94 None
95 };
96 for (delay, hashes) in
97 self.blocks.batch_iter(DEFAULT_REQUEST_BLOCK_BATCH_SIZE)
98 {
99 requests.push((
100 delay,
101 Box::new(GetBlocks {
102 request_id: 0,
103 with_public: false,
104 hashes,
105 preferred_node_type: preferred_node_type.clone(),
106 }) as Box<dyn Request>,
107 ));
108 }
109 for (delay, epochs) in
110 self.epochs.batch_iter(DEFAULT_REQUEST_EPOCH_BATCH_SIZE)
111 {
112 requests.push((
113 delay,
114 Box::new(GetBlockHashesByEpoch {
115 request_id: 0,
116 epochs,
117 }) as Box<dyn Request>,
118 ));
119 }
120 requests.append(&mut self.original_requests);
121 requests.into_iter()
122 }
123}
124
/// Groups keys by delay into fixed-width buckets so that keys with similar
/// delays can later be re-packed into batches.
struct DelayBucket<T> {
    /// One entry per bucket: the sum of the delays of every key stored in
    /// the bucket, and the keys themselves. Bucket `i` holds keys whose
    /// delay falls in `[i * bucket_size, (i + 1) * bucket_size)`; the last
    /// bucket additionally holds everything beyond that range.
    buckets: Vec<(Duration, Vec<T>)>,

    /// Width of the delay range covered by one bucket.
    bucket_size: Duration,
}

impl<T> DelayBucket<T> {
    /// Creates `bucket_number` empty buckets, each `bucket_size` wide.
    ///
    /// At least one bucket is always allocated (a `bucket_number` of 0 is
    /// treated as 1) so that `insert` always has a bucket to write to.
    fn new(bucket_number: usize, bucket_size: Duration) -> Self {
        let buckets: Vec<(Duration, Vec<T>)> = (0..bucket_number.max(1))
            .map(|_| (Duration::default(), Vec::new()))
            .collect();
        Self {
            buckets,
            bucket_size,
        }
    }

    /// Moves every key out of `new_requests` (leaving it empty) into the
    /// bucket selected by `delay`, and adds `delay` once per key to that
    /// bucket's delay sum.
    ///
    /// Delays past the covered range go to the last bucket. A zero
    /// `bucket_size` defines no usable partition, so in that case every key
    /// goes to the last bucket as well.
    ///
    /// # Panics
    ///
    /// Panics if the accumulated delay overflows `Duration`.
    fn insert(&mut self, new_requests: &mut Vec<T>, delay: Duration) {
        // `new` guarantees at least one bucket, so this cannot underflow.
        let last_index = self.buckets.len() - 1;

        // Divide in nanoseconds: the millisecond value of a sub-millisecond
        // bucket size is 0 and would cause a division by zero. For
        // whole-millisecond sizes the resulting index is the same.
        let width = self.bucket_size.as_nanos();
        let bucket_index = if width == 0 {
            last_index
        } else {
            // Clamp while still in u128 so the narrowing cast cannot
            // truncate a huge quotient into a wrong, smaller index.
            min(delay.as_nanos() / width, last_index as u128) as usize
        };

        let (delay_sum, requests) = &mut self.buckets[bucket_index];
        *delay_sum += delay * new_requests.len() as u32;
        requests.append(new_requests);
    }
}

impl<T: Clone> DelayBucket<T> {
    /// Returns the stored keys as batches of at most `batch_size` keys,
    /// bucket by bucket in ascending bucket order. Each batch is paired with
    /// the average delay of the bucket it came from. Keys are cloned; the
    /// bucket contents are left in place.
    ///
    /// A `batch_size` of 0 is treated as 1, because `slice::chunks` panics
    /// on a zero chunk size.
    fn batch_iter(
        &self, batch_size: usize,
    ) -> impl Iterator<Item = (Duration, Vec<T>)> {
        let batch_size = batch_size.max(1);
        let mut batches = Vec::new();
        for (delay_sum, keys) in &self.buckets {
            // Skipping empty buckets also avoids dividing by zero below.
            if keys.is_empty() {
                continue;
            }
            let average_delay = *delay_sum / keys.len() as u32;
            batches.extend(
                keys.chunks(batch_size)
                    .map(|chunk| (average_delay, chunk.to_vec())),
            );
        }
        batches.into_iter()
    }
}
178
/// Five keys sharing a 500 ms delay land in one bucket and are split into
/// batches of two: `[1, 2]`, a middle batch that is not inspected, and the
/// remainder `[5]`.
#[test]
fn test_bucket_batch() {
    let mut bucket = DelayBucket::<usize>::new(5, Duration::from_secs(1));
    let mut keys: Vec<usize> = (1..=5).collect();
    bucket.insert(&mut keys, Duration::from_millis(500));

    let batches: Vec<(Duration, Vec<usize>)> = bucket.batch_iter(2).collect();

    // Exactly three batches: a third one exists and nothing follows it.
    assert_eq!(batches.len(), 3);

    let (first_delay, first_keys) = &batches[0];
    assert_eq!(*first_delay, Duration::from_millis(500));
    assert_eq!(*first_keys, vec![1, 2]);

    assert_eq!(batches[2].1, vec![5]);
}