cfxcore/sync/request_manager/
request_handler.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    message::{Message, SetRequestId},
7    sync::{
8        message::{DynamicCapability, KeyContainer},
9        request_manager::RequestManager,
10        synchronization_protocol_handler::ProtocolConfiguration,
11        Error,
12    },
13    NodeType,
14};
15use cfx_parameters::sync::FAILED_REQUEST_RESEND_WAIT;
16use malloc_size_of::MallocSizeOf;
17use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
18use network::{
19    node_table::NodeId, Error as NetworkError, NetworkContext,
20    UpdateNodeOperation,
21};
22use parking_lot::Mutex;
23use std::{
24    any::Any,
25    cmp::Ordering,
26    collections::{BinaryHeap, HashMap, HashSet, VecDeque},
27    fmt::Debug,
28    mem,
29    sync::{
30        atomic::{AtomicBool, Ordering as AtomicOrdering},
31        Arc,
32    },
33    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
34};
35
/// Tracks all outbound sync requests: per-peer inflight/pending state plus a
/// global deadline queue used to detect request timeouts.
#[derive(DeriveMallocSizeOf)]
pub struct RequestHandler {
    protocol_config: ProtocolConfiguration,
    // Per-peer request bookkeeping; entries are created in `add_peer` and
    // dropped in `remove_peer`.
    peers: Mutex<HashMap<NodeId, RequestContainer>>,
    // Deadline-ordered queue of inflight requests. `TimedSyncRequests`'
    // reversed `Ord` makes this max-heap behave as a min-heap on
    // `timeout_time`, so the earliest deadline surfaces first.
    requests_queue: Mutex<BinaryHeap<Arc<TimedSyncRequests>>>,
}
42
43impl RequestHandler {
44    pub fn new(protocol_config: &ProtocolConfiguration) -> Self {
45        Self {
46            protocol_config: protocol_config.clone(),
47            peers: Mutex::new(HashMap::new()),
48            requests_queue: Default::default(),
49        }
50    }
51
52    pub fn add_peer(&self, peer_id: NodeId) {
53        self.peers.lock().insert(
54            peer_id,
55            RequestContainer {
56                peer_id,
57                inflight_requests: HashMap::new(),
58                // Initialize request_id randomly to prevent responses from a
59                // peer to interfere with requests of the same
60                // peer after reconnection.
61                next_request_id: rand::random(),
62                max_inflight_request_count: self
63                    .protocol_config
64                    .max_inflight_request_count,
65                ..Default::default()
66            },
67        );
68    }
69
70    // Match request for given response.
71    // Could return the following error:
72    // 1. Error return from peer.match_request(): No need to let caller handle
73    //    request resending;
74    // 2. UnknownPeer: No need to let caller handle request resending;
75    pub fn match_request(
76        &self, peer_id: &NodeId, request_id: u64,
77    ) -> Result<RequestMessage, Error> {
78        let mut peers = self.peers.lock();
79        if let Some(peer) = peers.get_mut(peer_id) {
80            peer.match_request(request_id)
81        } else {
82            bail!(Error::UnknownPeer);
83        }
84    }
85
86    pub fn send_pending_requests(
87        &self, io: &dyn NetworkContext, peer: &NodeId,
88    ) {
89        if let Some(peer_info) = self.peers.lock().get_mut(peer) {
90            peer_info.send_pending_requests(
91                io,
92                &mut *self.requests_queue.lock(),
93                &self.protocol_config,
94            );
95        }
96    }
97
98    /// Send request to the specified peer. If peer is `None` or send request
99    /// failed, return the request back to caller to handle in advance.
100    pub fn send_request(
101        &self, io: &dyn NetworkContext, peer: Option<NodeId>,
102        request: Box<dyn Request>, delay: Option<Duration>,
103    ) -> Result<(), Box<dyn Request>> {
104        let peer = match peer {
105            Some(peer) => peer,
106            None => return Err(request),
107        };
108
109        let peers = &mut *self.peers.lock();
110        let peer_info = match peers.get_mut(&peer) {
111            Some(peer) => peer,
112            None => return Err(request),
113        };
114
115        let msg = RequestMessage::new(request, delay);
116
117        let request_id = match peer_info.get_next_request_id() {
118            Some(id) => id,
119            None => {
120                peer_info.append_pending_request(msg);
121                return Ok(());
122            }
123        };
124
125        peer_info.immediate_send_request_to_peer(
126            io,
127            request_id,
128            msg,
129            &mut *self.requests_queue.lock(),
130            &self.protocol_config,
131        );
132
133        Ok(())
134    }
135
136    fn get_timeout_sync_requests(&self) -> Vec<Arc<TimedSyncRequests>> {
137        let mut requests = self.requests_queue.lock();
138        let mut timeout_requests = Vec::new();
139        let now = Instant::now();
140        loop {
141            if requests.is_empty() {
142                break;
143            }
144            let sync_req = requests.pop().expect("queue not empty");
145            if sync_req.removed.load(AtomicOrdering::Relaxed) == true {
146                continue;
147            }
148            if sync_req.timeout_time >= now {
149                requests.push(sync_req);
150                break;
151            } else {
152                debug!("Timeout request {:?}", sync_req);
153                timeout_requests.push(sync_req);
154            }
155        }
156        timeout_requests
157    }
158
159    pub fn process_timeout_requests(
160        &self, io: &dyn NetworkContext,
161    ) -> Vec<RequestMessage> {
162        // Check if in-flight requests timeout
163        let mut timeout_requests = Vec::new();
164        let mut peers_to_disconnect = HashSet::new();
165        let mut peers_to_send_pending_requests = HashSet::new();
166        for sync_req in self.get_timeout_sync_requests() {
167            if let Ok(mut req) =
168                self.match_request(&sync_req.peer_id, sync_req.request_id)
169            {
170                let peer_id = sync_req.peer_id.clone();
171                if let Some(request_container) =
172                    self.peers.lock().get_mut(&peer_id)
173                {
174                    if request_container
175                        .on_timeout_should_disconnect(&self.protocol_config)
176                    {
177                        peers_to_disconnect.insert(peer_id);
178                    } else {
179                        peers_to_send_pending_requests.insert(peer_id);
180                    }
181                }
182                req.request.notify_timeout();
183                timeout_requests.push(req);
184            } else {
185                debug!("Timeout a removed request {:?}", sync_req);
186            }
187        }
188        let op = if self.protocol_config.demote_peer_for_timeout {
189            Some(UpdateNodeOperation::Demotion)
190        } else {
191            Some(UpdateNodeOperation::Failure)
192        };
193        for peer_id in peers_to_disconnect {
194            // Note `self.peers` will be used in `disconnect_peer`, so we must
195            // call it without locking `self.peers`.
196            io.disconnect_peer(
197                &peer_id,
198                op,
199                "too many timeout requests", /* reason */
200            );
201        }
202        for peer_id in peers_to_send_pending_requests {
203            self.send_pending_requests(io, &peer_id);
204        }
205
206        timeout_requests
207    }
208
209    /// Return unfinished_requests
210    pub fn remove_peer(&self, peer_id: &NodeId) -> Option<Vec<RequestMessage>> {
211        self.peers
212            .lock()
213            .remove(peer_id)
214            .map(|mut p| p.get_unfinished_requests())
215    }
216}
217
/// Per-peer request bookkeeping: inflight window, overflow queue of pending
/// requests, and a sliding window of timeout events.
#[derive(Default, DeriveMallocSizeOf)]
struct RequestContainer {
    peer_id: NodeId,
    // Requests sent and awaiting a response, keyed by request id.
    pub inflight_requests: HashMap<u64, SynchronizationPeerRequest>,
    // Next id to assign; initialized randomly in `RequestHandler::add_peer`.
    pub next_request_id: u64,
    // Upper bound on `inflight_requests.len()`; overflow goes to
    // `pending_requests`.
    pub max_inflight_request_count: u64,
    // Requests waiting for a free slot in the inflight window (FIFO).
    pub pending_requests: VecDeque<RequestMessage>,
    // Unix timestamps (seconds) of recent timeouts, used to decide whether
    // the peer should be disconnected.
    pub timeout_statistics: VecDeque<u64>,
}
227
228impl RequestContainer {
229    pub fn on_timeout_should_disconnect(
230        &mut self, config: &ProtocolConfiguration,
231    ) -> bool {
232        let now = SystemTime::now()
233            .duration_since(UNIX_EPOCH)
234            .unwrap()
235            .as_secs();
236        if self.timeout_statistics.is_empty() {
237            self.timeout_statistics.push_back(now);
238            return false;
239        }
240
241        self.timeout_statistics.push_back(now);
242        loop {
243            let old_time = *self.timeout_statistics.front().unwrap();
244            if now - old_time <= config.timeout_observing_period_s {
245                break;
246            }
247            self.timeout_statistics.pop_front();
248        }
249
250        if self.timeout_statistics.len()
251            <= config.max_allowed_timeout_in_observing_period as usize
252        {
253            return false;
254        } else {
255            return true;
256        }
257    }
258
259    /// If new request will be allowed to send, advance the request id now,
260    /// otherwise, actual new request id will be given to this request
261    /// when it is moved from pending to inflight queue.
262    pub fn get_next_request_id(&mut self) -> Option<u64> {
263        if self.inflight_requests.len()
264            < self.max_inflight_request_count as usize
265        {
266            let id = self.next_request_id;
267            self.next_request_id += 1;
268            Some(id)
269        } else {
270            None
271        }
272    }
273
274    pub fn append_inflight_request(
275        &mut self, request_id: u64, message: RequestMessage,
276        timed_req: Arc<TimedSyncRequests>,
277    ) {
278        self.inflight_requests.insert(
279            request_id,
280            SynchronizationPeerRequest { message, timed_req },
281        );
282    }
283
284    pub fn append_pending_request(&mut self, msg: RequestMessage) {
285        self.pending_requests.push_back(msg);
286    }
287
288    pub fn has_pending_requests(&self) -> bool {
289        !self.pending_requests.is_empty()
290    }
291
292    pub fn pop_pending_request(&mut self) -> Option<RequestMessage> {
293        self.pending_requests.pop_front()
294    }
295
296    pub fn remove_inflight_request(
297        &mut self, request_id: u64,
298    ) -> Option<SynchronizationPeerRequest> {
299        if let Some(save_req) = self.inflight_requests.remove(&request_id) {
300            Some(save_req)
301        } else {
302            debug!(
303                "Remove out of bound request peer={} request_id={} next={}",
304                self.peer_id, request_id, self.next_request_id
305            );
306            None
307        }
308    }
309
310    fn immediate_send_request_to_peer(
311        &mut self, io: &dyn NetworkContext, request_id: u64,
312        mut request_message: RequestMessage,
313        requests_queue: &mut BinaryHeap<Arc<TimedSyncRequests>>,
314        protocol_config: &ProtocolConfiguration,
315    ) {
316        request_message.request.set_request_id(request_id);
317        let res = request_message.request.send(io, &self.peer_id);
318        let is_send_error = if let Err(e) = res {
319            match e {
320                NetworkError::OversizedPacket => {
321                    panic!("Request packet should not be oversized!")
322                }
323                _ => {}
324            }
325            true
326        } else {
327            false
328        };
329
330        let timed_req = Arc::new(TimedSyncRequests::from_request(
331            self.peer_id,
332            request_id,
333            &request_message,
334            protocol_config,
335            is_send_error,
336        ));
337        self.append_inflight_request(
338            request_id,
339            request_message,
340            timed_req.clone(),
341        );
342        requests_queue.push(timed_req);
343    }
344
345    // Error from send_message():
346    //      This also does NOT introduce needs to handle request
347    //      resending for caller;
348    pub fn send_pending_requests(
349        &mut self, io: &dyn NetworkContext,
350        requests_queue: &mut BinaryHeap<Arc<TimedSyncRequests>>,
351        protocol_config: &ProtocolConfiguration,
352    ) {
353        trace!("send_pending_requests: len={}", self.pending_requests.len());
354        while self.has_pending_requests() {
355            if let Some(new_request_id) = self.get_next_request_id() {
356                let pending_msg = self.pop_pending_request().unwrap();
357
358                self.immediate_send_request_to_peer(
359                    io,
360                    new_request_id,
361                    pending_msg,
362                    requests_queue,
363                    protocol_config,
364                );
365            } else {
366                break;
367            }
368        }
369    }
370
371    // Match request with given response.
372    // Could return the following error:
373    // 1. RequestNotFound: In this case, no request is matched, so NO need to
374    //    handle the resending of the request for caller;
375    pub fn match_request(
376        &mut self, request_id: u64,
377    ) -> Result<RequestMessage, Error> {
378        let removed_req = self.remove_inflight_request(request_id);
379        if let Some(removed_req) = removed_req {
380            removed_req
381                .timed_req
382                .removed
383                .store(true, AtomicOrdering::Relaxed);
384            Ok(removed_req.message)
385        } else {
386            bail!(Error::RequestNotFound)
387        }
388    }
389
390    pub fn get_unfinished_requests(&mut self) -> Vec<RequestMessage> {
391        let mut unfinished_requests = Vec::new();
392        let mut new_map = HashMap::new();
393        mem::swap(&mut self.inflight_requests, &mut new_map);
394        for (_, req) in new_map {
395            req.timed_req.removed.store(true, AtomicOrdering::Relaxed);
396            unfinished_requests.push(req.message);
397        }
398
399        while let Some(req) = self.pending_requests.pop_front() {
400            unfinished_requests.push(req);
401        }
402        unfinished_requests
403    }
404}
405
/// An inflight request together with its entry in the timeout queue; the
/// shared `timed_req` lets a matched response cancel the timer lazily.
#[derive(Debug, DeriveMallocSizeOf)]
pub struct SynchronizationPeerRequest {
    pub message: RequestMessage,
    // Shared with `RequestHandler::requests_queue`; its `removed` flag is
    // set when the request is matched or discarded.
    pub timed_req: Arc<TimedSyncRequests>,
}
411
/// Support to downcast trait to concrete request type.
pub trait AsAny {
    /// Borrow as `Any` for immutable downcasting.
    fn as_any(&self) -> &dyn Any;
    /// Borrow as `Any` for mutable downcasting.
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
417
/// Trait of request message
pub trait Request:
    Send + Debug + AsAny + Message + SetRequestId + MallocSizeOf
{
    /// Request timeout for resend purpose.
    fn timeout(&self, conf: &ProtocolConfiguration) -> Duration;

    /// Cleanup the inflight request items when peer disconnected or invalid
    /// message received.
    fn on_removed(&self, inflight_keys: &KeyContainer);
    /// Before send a request, check if its items already in flight.
    /// If in flight, do not request duplicated items.
    /// Otherwise, insert the item key into `inflight_keys`.
    fn with_inflight(&mut self, inflight_keys: &KeyContainer);
    /// If all requested items are already in flight, then do not send request
    /// to remote peer.
    fn is_empty(&self) -> bool;
    /// Notify the handler when the request gets cancelled by empty.
    fn notify_empty(&mut self) {}
    /// When a request failed (send fail, invalid response or timeout), it will
    /// be resend automatically.
    ///
    /// For some kind of requests, it will resend other kind of request other
    /// than the original one. E.g. when get compact block failed, it will
    /// request the whole block instead.
    ///
    /// If resend is not required, return `None`, e.g. request transactions
    /// failed.
    fn resend(&self) -> Option<Box<dyn Request>>;

    /// Required peer capability to send this request
    fn required_capability(&self) -> Option<DynamicCapability> { None }

    /// Notify the handler when the request gets timeout.
    fn notify_timeout(&mut self) {}

    /// Preferred node type of the peer to serve this request; `None` means
    /// no preference. (NOTE(review): the previous comment said
    /// "Epoch-gap-limit required by this request", which looks like a
    /// copy-paste leftover — confirm against callers.)
    fn preferred_node_type(&self) -> Option<NodeType> { None }
}
457
/// A boxed request together with an optional delay applied before (re)send.
#[derive(Debug, DeriveMallocSizeOf)]
pub struct RequestMessage {
    pub request: Box<dyn Request>,
    // Delay before sending; `None` means send immediately.
    pub delay: Option<Duration>,
}
463
464impl RequestMessage {
465    pub fn new(request: Box<dyn Request>, delay: Option<Duration>) -> Self {
466        RequestMessage { request, delay }
467    }
468
469    pub fn set_request_id(&mut self, request_id: u64) {
470        self.request.set_request_id(request_id);
471    }
472
473    /// Download cast request to specified request type.
474    /// If downcast failed, resend the request again and return
475    /// `UnexpectedResponse` error.
476    pub fn downcast_ref<T: Request + Any>(
477        &self, io: &dyn NetworkContext, request_manager: &RequestManager,
478    ) -> Result<&T, Error> {
479        match self.request.as_any().downcast_ref::<T>() {
480            Some(req) => Ok(req),
481            None => {
482                warn!("failed to downcast general request to concrete request type, message = {:?}", self);
483                if let Some(resent_request) = self.request.resend() {
484                    request_manager.resend_request_to_another_peer(
485                        io,
486                        &RequestMessage::new(resent_request, self.delay),
487                    );
488                }
489                Err(Error::UnexpectedResponse.into())
490            }
491        }
492    }
493
494    pub fn downcast_mut<T: Request + Any>(
495        &mut self, _io: &dyn NetworkContext, _request_manager: &RequestManager,
496    ) -> Result<&mut T, Error> {
497        match self.request.as_any_mut().downcast_mut::<T>() {
498            Some(req) => Ok(req),
499            None => {
500                warn!("failed to downcast general request to concrete request type");
501                Err(Error::UnexpectedResponse.into())
502            }
503        }
504    }
505}
506
/// Timer entry for one inflight request; lives in the deadline heap and is
/// shared with the peer's inflight table via `Arc`.
#[derive(Debug, DeriveMallocSizeOf)]
pub struct TimedSyncRequests {
    pub peer_id: NodeId,
    // Absolute deadline; the heap ordering below is keyed on this field.
    pub timeout_time: Instant,
    pub request_id: u64,
    // Set to true once the request is matched/cancelled, so the timeout
    // queue can discard this entry lazily instead of removing it from the
    // heap.
    pub removed: AtomicBool,
}
514
515impl TimedSyncRequests {
516    pub fn new(
517        peer_id: NodeId, timeout: Duration, request_id: u64,
518    ) -> TimedSyncRequests {
519        TimedSyncRequests {
520            peer_id,
521            timeout_time: Instant::now() + timeout,
522            request_id,
523            removed: AtomicBool::new(false),
524        }
525    }
526
527    pub fn from_request(
528        peer_id: NodeId, request_id: u64, msg: &RequestMessage,
529        conf: &ProtocolConfiguration, is_send_error: bool,
530    ) -> TimedSyncRequests {
531        let timeout = if is_send_error {
532            FAILED_REQUEST_RESEND_WAIT.clone()
533        } else {
534            msg.request.timeout(conf)
535        };
536        TimedSyncRequests::new(peer_id, timeout, request_id)
537    }
538}
539
540impl Ord for TimedSyncRequests {
541    fn cmp(&self, other: &Self) -> Ordering {
542        other.timeout_time.cmp(&self.timeout_time)
543    }
544}
545
impl PartialOrd for TimedSyncRequests {
    // Delegates to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
551
552impl Eq for TimedSyncRequests {}
553
// NOTE: equality compares only `timeout_time`, matching the `Ord` impl used
// by the timeout `BinaryHeap`; two distinct requests with identical
// deadlines compare equal here, which is fine for heap ordering purposes.
impl PartialEq for TimedSyncRequests {
    fn eq(&self, other: &Self) -> bool {
        self.timeout_time == other.timeout_time
    }
}