cfxcore/pos/protocol/request_manager/
mod.rs

// Copyright 2019-2020 Conflux Foundation. All rights reserved.
// TreeGraph is free software and distributed under Apache License 2.0.
// See https://www.apache.org/licenses/LICENSE-2.0
4
5use crate::{
6    pos::protocol::sync_protocol::RpcResponse,
7    sync::{Error, ProtocolConfiguration},
8};
9use cfx_parameters::sync::REQUEST_START_WAITING_TIME;
10use diem_logger::prelude::diem_debug;
11use futures::{channel::oneshot, future::Future};
12use network::{node_table::NodeId, NetworkContext};
13use parking_lot::Mutex;
14pub use request_handler::{
15    AsAny, Request, RequestHandler, RequestMessage, SynchronizationPeerRequest,
16};
17use std::{
18    cmp::Ordering,
19    collections::binary_heap::BinaryHeap,
20    sync::Arc,
21    time::{Duration, Instant},
22};
23
24pub mod request_handler;
25
26#[derive(Debug)]
27struct WaitingRequest(Box<dyn Request>, Duration); // (request, delay)
28
29pub struct RequestManager {
30    /// Each element is (timeout_time, request, chosen_peer)
31    waiting_requests: Mutex<BinaryHeap<TimedWaitingRequest>>,
32
33    /// This is used to handle request_id matching
34    request_handler: Arc<RequestHandler>,
35}
36
37impl RequestManager {
38    pub fn new(protocol_config: &ProtocolConfiguration) -> Self {
39        Self {
40            waiting_requests: Default::default(),
41            request_handler: Arc::new(RequestHandler::new(protocol_config)),
42        }
43    }
44
45    /// Send a unary rpc request to remote peer `recipient`.
46    pub async fn unary_rpc<'a>(
47        &'a self, io: &'a dyn NetworkContext, recipient: Option<NodeId>,
48        mut request: Box<dyn Request>,
49    ) -> impl Future<Output = Result<Box<dyn RpcResponse>, Error>> + 'a {
50        async move {
51            // ask network to fulfill rpc request
52            let (res_tx, res_rx) = oneshot::channel();
53            request.set_response_notification(res_tx);
54
55            self.request_with_delay(io, request, recipient, None);
56
57            // wait for response
58            let response = res_rx.await??;
59            Ok(response)
60        }
61    }
62
63    /// Send request to remote peer with delay mechanism. If failed,
64    /// add the request to waiting queue to resend later.
65    pub fn request_with_delay(
66        &self, io: &dyn NetworkContext, mut request: Box<dyn Request>,
67        peer: Option<NodeId>, delay: Option<Duration>,
68    ) {
69        // increase delay for resent request.
70        let (cur_delay, next_delay) = match delay {
71            Some(d) => (d, d + *REQUEST_START_WAITING_TIME),
72            None => (*REQUEST_START_WAITING_TIME, *REQUEST_START_WAITING_TIME),
73        };
74
75        if peer.is_none() {
76            request.notify_error(Error::RpcCancelledByDisconnection.into());
77            return;
78        }
79
80        // delay if no peer available or delay required
81        if delay.is_some() {
82            // todo remove the request if waiting time is too long?
83            // E.g. attacker may broadcast many many invalid block hashes,
84            // and no peer could return the corresponding block header.
85            diem_debug!("request_with_delay: add request to waiting_requests, peer={:?}, request={:?}, delay={:?}", peer, request, cur_delay);
86            self.waiting_requests.lock().push(TimedWaitingRequest::new(
87                Instant::now() + cur_delay,
88                WaitingRequest(request, next_delay),
89                peer.unwrap(),
90            ));
91
92            return;
93        }
94
95        if let Err(mut req) = self.request_handler.send_request(
96            io,
97            peer,
98            request,
99            Some(next_delay),
100        ) {
101            debug!("request_with_delay: send_request fails, peer={:?}, request={:?}", peer, req);
102            req.notify_error(Error::RpcCancelledByDisconnection.into());
103        }
104    }
105
106    // Match request with given response.
107    // No need to let caller handle request resending.
108    pub fn match_request(
109        &self, io: &dyn NetworkContext, peer_id: &NodeId, request_id: u64,
110    ) -> Result<RequestMessage, Error> {
111        self.request_handler.match_request(io, peer_id, request_id)
112    }
113
114    pub fn process_timeout_requests(&self, io: &dyn NetworkContext) {
115        trace!("process_timeout_requests: start");
116        let timeout_requests = self.request_handler.get_timeout_requests(io);
117        for mut req in timeout_requests {
118            debug!("Timeout requests: {:?}", req);
119            req.request.notify_error(Error::RpcTimeout.into());
120        }
121    }
122
123    /// Send waiting requests that their backoff delay have passes
124    pub fn resend_waiting_requests(&self, io: &dyn NetworkContext) {
125        debug!("resend_waiting_requests: start");
126        let mut waiting_requests = self.waiting_requests.lock();
127        let now = Instant::now();
128
129        while let Some(req) = waiting_requests.pop() {
130            if req.time_to_send >= now {
131                waiting_requests.push(req);
132                break;
133            }
134
135            let chosen_peer = req.peer;
136            debug!("Send waiting req {:?} to peer={}", req, chosen_peer);
137
138            let WaitingRequest(request, delay) = req.request;
139            let next_delay = delay + *REQUEST_START_WAITING_TIME;
140
141            if let Err(mut req) = self.request_handler.send_request(
142                io,
143                Some(chosen_peer),
144                request,
145                Some(next_delay),
146            ) {
147                req.notify_error(Error::RpcCancelledByDisconnection.into());
148            }
149        }
150    }
151
152    pub fn on_peer_connected(&self, peer: &NodeId) {
153        self.request_handler.add_peer(*peer);
154    }
155
156    pub fn on_peer_disconnected(
157        &self, _io: &dyn NetworkContext, peer: &NodeId,
158    ) {
159        if let Some(unfinished_requests) =
160            self.request_handler.remove_peer(peer)
161        {
162            for mut msg in unfinished_requests {
163                msg.request
164                    .notify_error(Error::RpcCancelledByDisconnection.into());
165            }
166        } else {
167            debug!("Peer already removed form request manager when disconnected peer={}", peer);
168        }
169    }
170}
171
172#[derive(Debug)]
173struct TimedWaitingRequest {
174    time_to_send: Instant,
175    request: WaitingRequest,
176    peer: NodeId,
177}
178
179impl TimedWaitingRequest {
180    fn new(
181        time_to_send: Instant, request: WaitingRequest, peer: NodeId,
182    ) -> Self {
183        Self {
184            time_to_send,
185            request,
186            peer,
187        }
188    }
189}
190
191impl Ord for TimedWaitingRequest {
192    fn cmp(&self, other: &Self) -> Ordering {
193        other.time_to_send.cmp(&self.time_to_send)
194    }
195}
196impl PartialOrd for TimedWaitingRequest {
197    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
198        Some(self.cmp(other))
199    }
200}
201impl Eq for TimedWaitingRequest {}
202impl PartialEq for TimedWaitingRequest {
203    fn eq(&self, other: &Self) -> bool {
204        self.time_to_send == other.time_to_send
205    }
206}