cfxcore/pos/protocol/request_manager/
request_handler.rs1use crate::{
6 message::{Message, SetRequestId},
7 pos::protocol::{
8 request_manager::RequestManager, sync_protocol::RpcResponse,
9 },
10 sync::{Error, ProtocolConfiguration},
11};
12use futures::channel::oneshot;
13use network::{
14 node_table::NodeId, Error as NetworkError, NetworkContext,
15 UpdateNodeOperation,
16};
17use parking_lot::Mutex;
18use std::{
19 any::Any,
20 cmp::Ordering,
21 collections::{BinaryHeap, HashMap, HashSet, VecDeque},
22 fmt::Debug,
23 mem,
24 sync::{
25 atomic::{AtomicBool, Ordering as AtomicOrdering},
26 Arc,
27 },
28 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
29};
30
/// Tracks per-peer request bookkeeping plus a global timeout queue for the
/// PoS protocol request manager.
pub struct RequestHandler {
    // Static protocol parameters (in-flight limits, timeout windows,
    // disconnect policy) copied at construction time.
    protocol_config: ProtocolConfiguration,
    // Per-peer in-flight/pending request state, keyed by node id.
    peers: Mutex<HashMap<NodeId, RequestContainer>>,
    // Heap of timeout entries for all in-flight requests. `TimedSyncRequests`
    // implements a reversed `Ord` (see below), so the std max-heap pops the
    // earliest deadline first.
    requests_queue: Mutex<BinaryHeap<Arc<TimedSyncRequests>>>,
}
36
37impl RequestHandler {
38 pub fn new(protocol_config: &ProtocolConfiguration) -> Self {
39 Self {
40 protocol_config: protocol_config.clone(),
41 peers: Mutex::new(HashMap::new()),
42 requests_queue: Default::default(),
43 }
44 }
45
46 pub fn add_peer(&self, peer_id: NodeId) {
47 self.peers.lock().insert(
48 peer_id,
49 RequestContainer {
50 peer_id,
51 inflight_requests: HashMap::new(),
52 next_request_id: 0,
53 max_inflight_request_count: self
54 .protocol_config
55 .max_inflight_request_count,
56 ..Default::default()
57 },
58 );
59 }
60
61 pub fn match_request(
67 &self, io: &dyn NetworkContext, peer_id: &NodeId, request_id: u64,
68 ) -> Result<RequestMessage, Error> {
69 let mut peers = self.peers.lock();
70 let mut requests_queue = self.requests_queue.lock();
71 if let Some(peer) = peers.get_mut(peer_id) {
72 peer.match_request(
73 io,
74 request_id,
75 &mut *requests_queue,
76 &self.protocol_config,
77 )
78 } else {
79 bail!(Error::UnknownPeer);
80 }
81 }
82
83 pub fn send_request(
86 &self, io: &dyn NetworkContext, peer: Option<NodeId>,
87 mut request: Box<dyn Request>, delay: Option<Duration>,
88 ) -> Result<(), Box<dyn Request>> {
89 let peer = match peer {
90 Some(peer) => peer,
91 None => return Err(request),
92 };
93
94 let mut peers = self.peers.lock();
95 let mut requests_queue = self.requests_queue.lock();
96
97 let peer_info = match peers.get_mut(&peer) {
98 Some(peer) => peer,
99 None => return Err(request),
100 };
101
102 let request_id = match peer_info.get_next_request_id() {
103 Some(id) => id,
104 None => {
105 peer_info.append_pending_request(RequestMessage::new(
106 request, delay,
107 ));
108 return Ok(());
109 }
110 };
111
112 request.set_request_id(request_id);
113 let send_res = request.send(io, &peer);
114 let is_send_error = if let Err(e) = send_res {
115 match e {
116 NetworkError::OversizedPacket => {
117 panic!("Request packet should not be oversized!")
118 }
119 _ => {}
120 }
121 true
122 } else {
123 false
124 };
125
126 let msg = RequestMessage::new(request, delay);
127
128 let timed_req = Arc::new(TimedSyncRequests::from_request(
129 peer,
130 request_id,
131 &msg,
132 &self.protocol_config,
133 is_send_error,
134 ));
135 peer_info.append_inflight_request(request_id, msg, timed_req.clone());
136 requests_queue.push(timed_req);
137
138 Ok(())
139 }
140
141 fn get_timeout_sync_requests(&self) -> Vec<Arc<TimedSyncRequests>> {
142 let mut requests = self.requests_queue.lock();
143 let mut timeout_requests = Vec::new();
144 let now = Instant::now();
145 loop {
146 if requests.is_empty() {
147 break;
148 }
149 let sync_req = requests.pop().expect("queue not empty");
150 if sync_req.removed.load(AtomicOrdering::Relaxed) == true {
151 continue;
152 }
153 if sync_req.timeout_time >= now {
154 requests.push(sync_req);
155 break;
156 } else {
157 debug!("Timeout request {:?}", sync_req);
158 timeout_requests.push(sync_req);
159 }
160 }
161 timeout_requests
162 }
163
164 pub fn get_timeout_requests(
165 &self, io: &dyn NetworkContext,
166 ) -> Vec<RequestMessage> {
167 let mut timeout_requests = Vec::new();
169 let mut peers_to_disconnect = HashSet::new();
170 for sync_req in self.get_timeout_sync_requests() {
171 if let Ok(req) =
172 self.match_request(io, &sync_req.peer_id, sync_req.request_id)
173 {
174 let peer_id = &sync_req.peer_id;
175 if let Some(request_container) =
176 self.peers.lock().get_mut(peer_id)
177 {
178 if request_container
179 .on_timeout_should_disconnect(&self.protocol_config)
180 {
181 peers_to_disconnect.insert(*peer_id);
182 }
183 }
184 timeout_requests.push(req);
185 } else {
186 debug!("Timeout a removed request {:?}", sync_req);
187 }
188 }
189 let op = if self.protocol_config.demote_peer_for_timeout {
190 Some(UpdateNodeOperation::Demotion)
191 } else {
192 Some(UpdateNodeOperation::Failure)
193 };
194 for peer_id in peers_to_disconnect {
195 io.disconnect_peer(
198 &peer_id,
199 op,
200 "too many timeout requests", );
202 }
203
204 timeout_requests
205 }
206
207 pub fn remove_peer(&self, peer_id: &NodeId) -> Option<Vec<RequestMessage>> {
209 self.peers
210 .lock()
211 .remove(peer_id)
212 .map(|mut p| p.get_unfinished_requests())
213 }
214}
215
/// Per-peer request bookkeeping: in-flight requests, pending (queued)
/// requests, and a sliding window of recent timeout timestamps.
#[derive(Default)]
struct RequestContainer {
    peer_id: NodeId,
    // In-flight requests keyed by their assigned request id.
    pub inflight_requests: HashMap<u64, SynchronizationPeerRequest>,
    // Monotonically increasing id source for this peer.
    pub next_request_id: u64,
    // Upper bound on simultaneously in-flight requests (from config).
    pub max_inflight_request_count: u64,
    // FIFO of requests waiting for an in-flight slot to free up.
    pub pending_requests: VecDeque<RequestMessage>,
    // Unix timestamps (seconds) of recent timeouts; pruned to the configured
    // observation window in `on_timeout_should_disconnect`.
    pub timeout_statistics: VecDeque<u64>,
}
225
226impl RequestContainer {
227 pub fn on_timeout_should_disconnect(
228 &mut self, config: &ProtocolConfiguration,
229 ) -> bool {
230 let now = SystemTime::now()
231 .duration_since(UNIX_EPOCH)
232 .unwrap()
233 .as_secs();
234 if self.timeout_statistics.is_empty() {
235 self.timeout_statistics.push_back(now);
236 return false;
237 }
238
239 self.timeout_statistics.push_back(now);
240 loop {
241 let old_time = *self.timeout_statistics.front().unwrap();
242 if now - old_time <= config.timeout_observing_period_s {
243 break;
244 }
245 self.timeout_statistics.pop_front();
246 }
247
248 if self.timeout_statistics.len()
249 <= config.max_allowed_timeout_in_observing_period as usize
250 {
251 return false;
252 } else {
253 return true;
254 }
255 }
256
257 pub fn get_next_request_id(&mut self) -> Option<u64> {
261 if self.inflight_requests.len()
262 < self.max_inflight_request_count as usize
263 {
264 let id = self.next_request_id;
265 self.next_request_id += 1;
266 Some(id)
267 } else {
268 None
269 }
270 }
271
272 pub fn append_inflight_request(
273 &mut self, request_id: u64, message: RequestMessage,
274 timed_req: Arc<TimedSyncRequests>,
275 ) {
276 self.inflight_requests.insert(
277 request_id,
278 SynchronizationPeerRequest { message, timed_req },
279 );
280 }
281
282 pub fn append_pending_request(&mut self, msg: RequestMessage) {
283 self.pending_requests.push_back(msg);
284 }
285
286 pub fn has_pending_requests(&self) -> bool {
287 !self.pending_requests.is_empty()
288 }
289
290 pub fn pop_pending_request(&mut self) -> Option<RequestMessage> {
291 self.pending_requests.pop_front()
292 }
293
294 pub fn remove_inflight_request(
295 &mut self, request_id: u64,
296 ) -> Option<SynchronizationPeerRequest> {
297 if let Some(save_req) = self.inflight_requests.remove(&request_id) {
298 Some(save_req)
299 } else {
300 debug!(
301 "Remove out of bound request peer={} request_id={} next={}",
302 self.peer_id, request_id, self.next_request_id
303 );
304 None
305 }
306 }
307
308 pub fn match_request(
315 &mut self, io: &dyn NetworkContext, request_id: u64,
316 requests_queue: &mut BinaryHeap<Arc<TimedSyncRequests>>,
317 protocol_config: &ProtocolConfiguration,
318 ) -> Result<RequestMessage, Error> {
319 let removed_req = self.remove_inflight_request(request_id);
320 if let Some(removed_req) = removed_req {
321 removed_req
322 .timed_req
323 .removed
324 .store(true, AtomicOrdering::Relaxed);
325 while self.has_pending_requests() {
326 if let Some(new_request_id) = self.get_next_request_id() {
327 let mut pending_msg = self.pop_pending_request().unwrap();
328 pending_msg.set_request_id(new_request_id);
329 let send_res = pending_msg.request.send(io, &self.peer_id);
330 let is_send_error = if let Err(e) = send_res {
331 match e {
332 NetworkError::OversizedPacket => panic!(
333 "Request packet should not be oversized!"
334 ),
335 _ => {}
336 }
337 true
338 } else {
339 false
340 };
341
342 let timed_req = Arc::new(TimedSyncRequests::from_request(
343 self.peer_id,
344 new_request_id,
345 &pending_msg,
346 protocol_config,
347 is_send_error,
348 ));
349 self.append_inflight_request(
350 new_request_id,
351 pending_msg,
352 timed_req.clone(),
353 );
354 requests_queue.push(timed_req);
355 } else {
356 break;
357 }
358 }
359 Ok(removed_req.message)
360 } else {
361 bail!(Error::RequestNotFound)
362 }
363 }
364
365 pub fn get_unfinished_requests(&mut self) -> Vec<RequestMessage> {
366 let mut unfinished_requests = Vec::new();
367 let mut new_map = HashMap::new();
368 mem::swap(&mut self.inflight_requests, &mut new_map);
369 for (_, req) in new_map {
370 req.timed_req.removed.store(true, AtomicOrdering::Relaxed);
371 unfinished_requests.push(req.message);
372 }
373
374 while let Some(req) = self.pending_requests.pop_front() {
375 unfinished_requests.push(req);
376 }
377 unfinished_requests
378 }
379}
380
/// An in-flight request paired with its entry in the timeout queue.
#[derive(Debug)]
pub struct SynchronizationPeerRequest {
    pub message: RequestMessage,
    // Shared with `RequestHandler::requests_queue`; its `removed` flag is
    // set when the request completes so the stale heap entry can be skipped.
    pub timed_req: Arc<TimedSyncRequests>,
}
386
/// Upcast helper so boxed `Request` trait objects can be downcast to their
/// concrete type (see `RequestMessage::downcast_mut`).
pub trait AsAny {
    fn as_any(&self) -> &dyn Any;
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
392
/// A sendable protocol request with a settable id, a per-request timeout,
/// and a channel for delivering the response (or an error) to the waiter.
pub trait Request: Send + Debug + AsAny + Message + SetRequestId {
    /// Request-specific timeout derived from the protocol configuration.
    fn timeout(&self, conf: &ProtocolConfiguration) -> Duration;

    /// Notifies the response waiter that the request failed.
    fn notify_error(&mut self, error: Error);

    /// Installs the oneshot sender used to deliver the RPC response.
    fn set_response_notification(
        &mut self, res_tx: oneshot::Sender<Result<Box<dyn RpcResponse>, Error>>,
    );
}
406
/// A boxed request together with an optional delay.
#[derive(Debug)]
pub struct RequestMessage {
    pub request: Box<dyn Request>,
    // NOTE(review): `delay` is only stored and carried along here; it is
    // presumably a resend/scheduling delay — confirm at the call sites.
    pub delay: Option<Duration>,
}
412
413impl RequestMessage {
414 pub fn new(request: Box<dyn Request>, delay: Option<Duration>) -> Self {
415 RequestMessage { request, delay }
416 }
417
418 pub fn set_request_id(&mut self, request_id: u64) {
419 self.request.set_request_id(request_id);
420 }
421
422 pub fn downcast_mut<T: Request + Any>(
423 &mut self, _io: &dyn NetworkContext, _request_manager: &RequestManager,
424 ) -> Result<&mut T, Error> {
425 match self.request.as_any_mut().downcast_mut::<T>() {
426 Some(req) => Ok(req),
427 None => {
428 warn!("failed to downcast general request to concrete request type");
429 Err(Error::UnexpectedResponse.into())
430 }
431 }
432 }
433}
434
/// Timeout-queue entry for a single in-flight request. Ordered by
/// `timeout_time` with a reversed `Ord` (below) so `BinaryHeap` pops the
/// earliest deadline first.
#[derive(Debug)]
pub struct TimedSyncRequests {
    pub peer_id: NodeId,
    pub timeout_time: Instant,
    pub request_id: u64,
    // Set to true when the request is matched or its peer is removed, so
    // stale heap entries can be skipped instead of removed in place.
    pub removed: AtomicBool,
}
442
443impl TimedSyncRequests {
444 pub fn new(
445 peer_id: NodeId, timeout: Duration, request_id: u64,
446 ) -> TimedSyncRequests {
447 TimedSyncRequests {
448 peer_id,
449 timeout_time: Instant::now() + timeout,
450 request_id,
451 removed: AtomicBool::new(false),
452 }
453 }
454
455 pub fn from_request(
456 peer_id: NodeId, request_id: u64, msg: &RequestMessage,
457 conf: &ProtocolConfiguration, is_send_error: bool,
458 ) -> TimedSyncRequests {
459 let timeout = if is_send_error {
460 Duration::from_secs(1)
461 } else {
462 msg.request.timeout(conf)
463 };
464 TimedSyncRequests::new(peer_id, timeout, request_id)
465 }
466}
467
468impl Ord for TimedSyncRequests {
469 fn cmp(&self, other: &Self) -> Ordering {
470 other.timeout_time.cmp(&self.timeout_time)
471 }
472}
473
impl PartialOrd for TimedSyncRequests {
    // Delegates to `cmp`; the ordering is total, so this is always `Some`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
479
// Marker impl required by `Ord`; the equality relation is defined solely on
// `timeout_time` by `PartialEq`.
impl Eq for TimedSyncRequests {}
481
// Equality (like ordering) considers only `timeout_time`: two distinct
// requests sharing a deadline compare equal, which is sufficient for the
// heap ordering this type exists for.
impl PartialEq for TimedSyncRequests {
    fn eq(&self, other: &Self) -> bool {
        self.timeout_time == other.timeout_time
    }
}