cfxcore/pos/protocol/request_manager/request_handler.rs

1// Copyright 2019-2020 Conflux Foundation. All rights reserved.
2// TreeGraph is free software and distributed under Apache License 2.0.
3// See https://www.apache.org/licenses/LICENSE-2.0
4
5use crate::{
6    message::{Message, SetRequestId},
7    pos::protocol::{
8        request_manager::RequestManager, sync_protocol::RpcResponse,
9    },
10    sync::{Error, ProtocolConfiguration},
11};
12use futures::channel::oneshot;
13use network::{
14    node_table::NodeId, Error as NetworkError, NetworkContext,
15    UpdateNodeOperation,
16};
17use parking_lot::Mutex;
18use std::{
19    any::Any,
20    cmp::Ordering,
21    collections::{BinaryHeap, HashMap, HashSet, VecDeque},
22    fmt::Debug,
23    mem,
24    sync::{
25        atomic::{AtomicBool, Ordering as AtomicOrdering},
26        Arc,
27    },
28    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
29};
30
31pub struct RequestHandler {
32    protocol_config: ProtocolConfiguration,
33    peers: Mutex<HashMap<NodeId, RequestContainer>>,
34    requests_queue: Mutex<BinaryHeap<Arc<TimedSyncRequests>>>,
35}
36
37impl RequestHandler {
38    pub fn new(protocol_config: &ProtocolConfiguration) -> Self {
39        Self {
40            protocol_config: protocol_config.clone(),
41            peers: Mutex::new(HashMap::new()),
42            requests_queue: Default::default(),
43        }
44    }
45
46    pub fn add_peer(&self, peer_id: NodeId) {
47        self.peers.lock().insert(
48            peer_id,
49            RequestContainer {
50                peer_id,
51                inflight_requests: HashMap::new(),
52                next_request_id: 0,
53                max_inflight_request_count: self
54                    .protocol_config
55                    .max_inflight_request_count,
56                ..Default::default()
57            },
58        );
59    }
60
61    // Match request for given response.
62    // Could return the following error:
63    // 1. Error return from peer.match_request(): No need to let caller handle
64    //    request resending;
65    // 2. UnknownPeer: No need to let caller handle request resending;
66    pub fn match_request(
67        &self, io: &dyn NetworkContext, peer_id: &NodeId, request_id: u64,
68    ) -> Result<RequestMessage, Error> {
69        let mut peers = self.peers.lock();
70        let mut requests_queue = self.requests_queue.lock();
71        if let Some(peer) = peers.get_mut(peer_id) {
72            peer.match_request(
73                io,
74                request_id,
75                &mut *requests_queue,
76                &self.protocol_config,
77            )
78        } else {
79            bail!(Error::UnknownPeer);
80        }
81    }
82
83    /// Send request to the specified peer. If peer is `None` or send request
84    /// failed, return the request back to caller to handle in advance.
85    pub fn send_request(
86        &self, io: &dyn NetworkContext, peer: Option<NodeId>,
87        mut request: Box<dyn Request>, delay: Option<Duration>,
88    ) -> Result<(), Box<dyn Request>> {
89        let peer = match peer {
90            Some(peer) => peer,
91            None => return Err(request),
92        };
93
94        let mut peers = self.peers.lock();
95        let mut requests_queue = self.requests_queue.lock();
96
97        let peer_info = match peers.get_mut(&peer) {
98            Some(peer) => peer,
99            None => return Err(request),
100        };
101
102        let request_id = match peer_info.get_next_request_id() {
103            Some(id) => id,
104            None => {
105                peer_info.append_pending_request(RequestMessage::new(
106                    request, delay,
107                ));
108                return Ok(());
109            }
110        };
111
112        request.set_request_id(request_id);
113        let send_res = request.send(io, &peer);
114        let is_send_error = if let Err(e) = send_res {
115            match e {
116                NetworkError::OversizedPacket => {
117                    panic!("Request packet should not be oversized!")
118                }
119                _ => {}
120            }
121            true
122        } else {
123            false
124        };
125
126        let msg = RequestMessage::new(request, delay);
127
128        let timed_req = Arc::new(TimedSyncRequests::from_request(
129            peer,
130            request_id,
131            &msg,
132            &self.protocol_config,
133            is_send_error,
134        ));
135        peer_info.append_inflight_request(request_id, msg, timed_req.clone());
136        requests_queue.push(timed_req);
137
138        Ok(())
139    }
140
141    fn get_timeout_sync_requests(&self) -> Vec<Arc<TimedSyncRequests>> {
142        let mut requests = self.requests_queue.lock();
143        let mut timeout_requests = Vec::new();
144        let now = Instant::now();
145        loop {
146            if requests.is_empty() {
147                break;
148            }
149            let sync_req = requests.pop().expect("queue not empty");
150            if sync_req.removed.load(AtomicOrdering::Relaxed) == true {
151                continue;
152            }
153            if sync_req.timeout_time >= now {
154                requests.push(sync_req);
155                break;
156            } else {
157                debug!("Timeout request {:?}", sync_req);
158                timeout_requests.push(sync_req);
159            }
160        }
161        timeout_requests
162    }
163
164    pub fn get_timeout_requests(
165        &self, io: &dyn NetworkContext,
166    ) -> Vec<RequestMessage> {
167        // Check if in-flight requests timeout
168        let mut timeout_requests = Vec::new();
169        let mut peers_to_disconnect = HashSet::new();
170        for sync_req in self.get_timeout_sync_requests() {
171            if let Ok(req) =
172                self.match_request(io, &sync_req.peer_id, sync_req.request_id)
173            {
174                let peer_id = &sync_req.peer_id;
175                if let Some(request_container) =
176                    self.peers.lock().get_mut(peer_id)
177                {
178                    if request_container
179                        .on_timeout_should_disconnect(&self.protocol_config)
180                    {
181                        peers_to_disconnect.insert(*peer_id);
182                    }
183                }
184                timeout_requests.push(req);
185            } else {
186                debug!("Timeout a removed request {:?}", sync_req);
187            }
188        }
189        let op = if self.protocol_config.demote_peer_for_timeout {
190            Some(UpdateNodeOperation::Demotion)
191        } else {
192            Some(UpdateNodeOperation::Failure)
193        };
194        for peer_id in peers_to_disconnect {
195            // Note `self.peers` will be used in `disconnect_peer`, so we must
196            // call it without locking `self.peers`.
197            io.disconnect_peer(
198                &peer_id,
199                op,
200                "too many timeout requests", /* reason */
201            );
202        }
203
204        timeout_requests
205    }
206
207    /// Return unfinished_requests
208    pub fn remove_peer(&self, peer_id: &NodeId) -> Option<Vec<RequestMessage>> {
209        self.peers
210            .lock()
211            .remove(peer_id)
212            .map(|mut p| p.get_unfinished_requests())
213    }
214}
215
216#[derive(Default)]
217struct RequestContainer {
218    peer_id: NodeId,
219    pub inflight_requests: HashMap<u64, SynchronizationPeerRequest>,
220    pub next_request_id: u64,
221    pub max_inflight_request_count: u64,
222    pub pending_requests: VecDeque<RequestMessage>,
223    pub timeout_statistics: VecDeque<u64>,
224}
225
226impl RequestContainer {
227    pub fn on_timeout_should_disconnect(
228        &mut self, config: &ProtocolConfiguration,
229    ) -> bool {
230        let now = SystemTime::now()
231            .duration_since(UNIX_EPOCH)
232            .unwrap()
233            .as_secs();
234        if self.timeout_statistics.is_empty() {
235            self.timeout_statistics.push_back(now);
236            return false;
237        }
238
239        self.timeout_statistics.push_back(now);
240        loop {
241            let old_time = *self.timeout_statistics.front().unwrap();
242            if now - old_time <= config.timeout_observing_period_s {
243                break;
244            }
245            self.timeout_statistics.pop_front();
246        }
247
248        if self.timeout_statistics.len()
249            <= config.max_allowed_timeout_in_observing_period as usize
250        {
251            return false;
252        } else {
253            return true;
254        }
255    }
256
257    /// If new request will be allowed to send, advance the request id now,
258    /// otherwise, actual new request id will be given to this request
259    /// when it is moved from pending to inflight queue.
260    pub fn get_next_request_id(&mut self) -> Option<u64> {
261        if self.inflight_requests.len()
262            < self.max_inflight_request_count as usize
263        {
264            let id = self.next_request_id;
265            self.next_request_id += 1;
266            Some(id)
267        } else {
268            None
269        }
270    }
271
272    pub fn append_inflight_request(
273        &mut self, request_id: u64, message: RequestMessage,
274        timed_req: Arc<TimedSyncRequests>,
275    ) {
276        self.inflight_requests.insert(
277            request_id,
278            SynchronizationPeerRequest { message, timed_req },
279        );
280    }
281
282    pub fn append_pending_request(&mut self, msg: RequestMessage) {
283        self.pending_requests.push_back(msg);
284    }
285
286    pub fn has_pending_requests(&self) -> bool {
287        !self.pending_requests.is_empty()
288    }
289
290    pub fn pop_pending_request(&mut self) -> Option<RequestMessage> {
291        self.pending_requests.pop_front()
292    }
293
294    pub fn remove_inflight_request(
295        &mut self, request_id: u64,
296    ) -> Option<SynchronizationPeerRequest> {
297        if let Some(save_req) = self.inflight_requests.remove(&request_id) {
298            Some(save_req)
299        } else {
300            debug!(
301                "Remove out of bound request peer={} request_id={} next={}",
302                self.peer_id, request_id, self.next_request_id
303            );
304            None
305        }
306    }
307
308    // Match request with given response.
309    // Could return the following error:
310    // 1. RequestNotFound: In this case, no request is matched, so NO need to
311    //    handle the resending of the request for caller;
312    // 2. Error from send_message(): This also does NOT introduce needs to
313    //    handle request resending for caller;
314    pub fn match_request(
315        &mut self, io: &dyn NetworkContext, request_id: u64,
316        requests_queue: &mut BinaryHeap<Arc<TimedSyncRequests>>,
317        protocol_config: &ProtocolConfiguration,
318    ) -> Result<RequestMessage, Error> {
319        let removed_req = self.remove_inflight_request(request_id);
320        if let Some(removed_req) = removed_req {
321            removed_req
322                .timed_req
323                .removed
324                .store(true, AtomicOrdering::Relaxed);
325            while self.has_pending_requests() {
326                if let Some(new_request_id) = self.get_next_request_id() {
327                    let mut pending_msg = self.pop_pending_request().unwrap();
328                    pending_msg.set_request_id(new_request_id);
329                    let send_res = pending_msg.request.send(io, &self.peer_id);
330                    let is_send_error = if let Err(e) = send_res {
331                        match e {
332                            NetworkError::OversizedPacket => panic!(
333                                "Request packet should not be oversized!"
334                            ),
335                            _ => {}
336                        }
337                        true
338                    } else {
339                        false
340                    };
341
342                    let timed_req = Arc::new(TimedSyncRequests::from_request(
343                        self.peer_id,
344                        new_request_id,
345                        &pending_msg,
346                        protocol_config,
347                        is_send_error,
348                    ));
349                    self.append_inflight_request(
350                        new_request_id,
351                        pending_msg,
352                        timed_req.clone(),
353                    );
354                    requests_queue.push(timed_req);
355                } else {
356                    break;
357                }
358            }
359            Ok(removed_req.message)
360        } else {
361            bail!(Error::RequestNotFound)
362        }
363    }
364
365    pub fn get_unfinished_requests(&mut self) -> Vec<RequestMessage> {
366        let mut unfinished_requests = Vec::new();
367        let mut new_map = HashMap::new();
368        mem::swap(&mut self.inflight_requests, &mut new_map);
369        for (_, req) in new_map {
370            req.timed_req.removed.store(true, AtomicOrdering::Relaxed);
371            unfinished_requests.push(req.message);
372        }
373
374        while let Some(req) = self.pending_requests.pop_front() {
375            unfinished_requests.push(req);
376        }
377        unfinished_requests
378    }
379}
380
381#[derive(Debug)]
382pub struct SynchronizationPeerRequest {
383    pub message: RequestMessage,
384    pub timed_req: Arc<TimedSyncRequests>,
385}
386
/// Exposes a value as `Any`, so that a trait object (such as `dyn Request`)
/// can be downcast back to its concrete type.
pub trait AsAny {
    /// Borrows `self` as `&dyn Any`.
    fn as_any(&self) -> &dyn Any;

    /// Mutably borrows `self` as `&mut dyn Any`.
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
392
393/// Trait of request message
394pub trait Request: Send + Debug + AsAny + Message + SetRequestId {
395    /// Request timeout for resend purpose.
396    fn timeout(&self, conf: &ProtocolConfiguration) -> Duration;
397
398    /// Notify the handler when error happens for the request.
399    fn notify_error(&mut self, error: Error);
400
401    /// This is for RPC request. Set the notification handle for the request.
402    fn set_response_notification(
403        &mut self, res_tx: oneshot::Sender<Result<Box<dyn RpcResponse>, Error>>,
404    );
405}
406
407#[derive(Debug)]
408pub struct RequestMessage {
409    pub request: Box<dyn Request>,
410    pub delay: Option<Duration>,
411}
412
413impl RequestMessage {
414    pub fn new(request: Box<dyn Request>, delay: Option<Duration>) -> Self {
415        RequestMessage { request, delay }
416    }
417
418    pub fn set_request_id(&mut self, request_id: u64) {
419        self.request.set_request_id(request_id);
420    }
421
422    pub fn downcast_mut<T: Request + Any>(
423        &mut self, _io: &dyn NetworkContext, _request_manager: &RequestManager,
424    ) -> Result<&mut T, Error> {
425        match self.request.as_any_mut().downcast_mut::<T>() {
426            Some(req) => Ok(req),
427            None => {
428                warn!("failed to downcast general request to concrete request type");
429                Err(Error::UnexpectedResponse.into())
430            }
431        }
432    }
433}
434
435#[derive(Debug)]
436pub struct TimedSyncRequests {
437    pub peer_id: NodeId,
438    pub timeout_time: Instant,
439    pub request_id: u64,
440    pub removed: AtomicBool,
441}
442
443impl TimedSyncRequests {
444    pub fn new(
445        peer_id: NodeId, timeout: Duration, request_id: u64,
446    ) -> TimedSyncRequests {
447        TimedSyncRequests {
448            peer_id,
449            timeout_time: Instant::now() + timeout,
450            request_id,
451            removed: AtomicBool::new(false),
452        }
453    }
454
455    pub fn from_request(
456        peer_id: NodeId, request_id: u64, msg: &RequestMessage,
457        conf: &ProtocolConfiguration, is_send_error: bool,
458    ) -> TimedSyncRequests {
459        let timeout = if is_send_error {
460            Duration::from_secs(1)
461        } else {
462            msg.request.timeout(conf)
463        };
464        TimedSyncRequests::new(peer_id, timeout, request_id)
465    }
466}
467
468impl Ord for TimedSyncRequests {
469    fn cmp(&self, other: &Self) -> Ordering {
470        other.timeout_time.cmp(&self.timeout_time)
471    }
472}
473
474impl PartialOrd for TimedSyncRequests {
475    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
476        Some(self.cmp(other))
477    }
478}
479
480impl Eq for TimedSyncRequests {}
481
482impl PartialEq for TimedSyncRequests {
483    fn eq(&self, other: &Self) -> bool {
484        self.timeout_time == other.timeout_time
485    }
486}