cfxcore/pos/protocol/request_manager/
mod.rs1use crate::{
6 pos::protocol::sync_protocol::RpcResponse,
7 sync::{Error, ProtocolConfiguration},
8};
9use cfx_parameters::sync::REQUEST_START_WAITING_TIME;
10use diem_logger::prelude::diem_debug;
11use futures::{channel::oneshot, future::Future};
12use network::{node_table::NodeId, NetworkContext};
13use parking_lot::Mutex;
14pub use request_handler::{
15 AsAny, Request, RequestHandler, RequestMessage, SynchronizationPeerRequest,
16};
17use std::{
18 cmp::Ordering,
19 collections::binary_heap::BinaryHeap,
20 sync::Arc,
21 time::{Duration, Instant},
22};
23
24pub mod request_handler;
25
26#[derive(Debug)]
27struct WaitingRequest(Box<dyn Request>, Duration); pub struct RequestManager {
30 waiting_requests: Mutex<BinaryHeap<TimedWaitingRequest>>,
32
33 request_handler: Arc<RequestHandler>,
35}
36
37impl RequestManager {
38 pub fn new(protocol_config: &ProtocolConfiguration) -> Self {
39 Self {
40 waiting_requests: Default::default(),
41 request_handler: Arc::new(RequestHandler::new(protocol_config)),
42 }
43 }
44
45 pub async fn unary_rpc<'a>(
47 &'a self, io: &'a dyn NetworkContext, recipient: Option<NodeId>,
48 mut request: Box<dyn Request>,
49 ) -> impl Future<Output = Result<Box<dyn RpcResponse>, Error>> + 'a {
50 async move {
51 let (res_tx, res_rx) = oneshot::channel();
53 request.set_response_notification(res_tx);
54
55 self.request_with_delay(io, request, recipient, None);
56
57 let response = res_rx.await??;
59 Ok(response)
60 }
61 }
62
63 pub fn request_with_delay(
66 &self, io: &dyn NetworkContext, mut request: Box<dyn Request>,
67 peer: Option<NodeId>, delay: Option<Duration>,
68 ) {
69 let (cur_delay, next_delay) = match delay {
71 Some(d) => (d, d + *REQUEST_START_WAITING_TIME),
72 None => (*REQUEST_START_WAITING_TIME, *REQUEST_START_WAITING_TIME),
73 };
74
75 if peer.is_none() {
76 request.notify_error(Error::RpcCancelledByDisconnection.into());
77 return;
78 }
79
80 if delay.is_some() {
82 diem_debug!("request_with_delay: add request to waiting_requests, peer={:?}, request={:?}, delay={:?}", peer, request, cur_delay);
86 self.waiting_requests.lock().push(TimedWaitingRequest::new(
87 Instant::now() + cur_delay,
88 WaitingRequest(request, next_delay),
89 peer.unwrap(),
90 ));
91
92 return;
93 }
94
95 if let Err(mut req) = self.request_handler.send_request(
96 io,
97 peer,
98 request,
99 Some(next_delay),
100 ) {
101 debug!("request_with_delay: send_request fails, peer={:?}, request={:?}", peer, req);
102 req.notify_error(Error::RpcCancelledByDisconnection.into());
103 }
104 }
105
106 pub fn match_request(
109 &self, io: &dyn NetworkContext, peer_id: &NodeId, request_id: u64,
110 ) -> Result<RequestMessage, Error> {
111 self.request_handler.match_request(io, peer_id, request_id)
112 }
113
114 pub fn process_timeout_requests(&self, io: &dyn NetworkContext) {
115 trace!("process_timeout_requests: start");
116 let timeout_requests = self.request_handler.get_timeout_requests(io);
117 for mut req in timeout_requests {
118 debug!("Timeout requests: {:?}", req);
119 req.request.notify_error(Error::RpcTimeout.into());
120 }
121 }
122
123 pub fn resend_waiting_requests(&self, io: &dyn NetworkContext) {
125 debug!("resend_waiting_requests: start");
126 let mut waiting_requests = self.waiting_requests.lock();
127 let now = Instant::now();
128
129 while let Some(req) = waiting_requests.pop() {
130 if req.time_to_send >= now {
131 waiting_requests.push(req);
132 break;
133 }
134
135 let chosen_peer = req.peer;
136 debug!("Send waiting req {:?} to peer={}", req, chosen_peer);
137
138 let WaitingRequest(request, delay) = req.request;
139 let next_delay = delay + *REQUEST_START_WAITING_TIME;
140
141 if let Err(mut req) = self.request_handler.send_request(
142 io,
143 Some(chosen_peer),
144 request,
145 Some(next_delay),
146 ) {
147 req.notify_error(Error::RpcCancelledByDisconnection.into());
148 }
149 }
150 }
151
152 pub fn on_peer_connected(&self, peer: &NodeId) {
153 self.request_handler.add_peer(*peer);
154 }
155
156 pub fn on_peer_disconnected(
157 &self, _io: &dyn NetworkContext, peer: &NodeId,
158 ) {
159 if let Some(unfinished_requests) =
160 self.request_handler.remove_peer(peer)
161 {
162 for mut msg in unfinished_requests {
163 msg.request
164 .notify_error(Error::RpcCancelledByDisconnection.into());
165 }
166 } else {
167 debug!("Peer already removed form request manager when disconnected peer={}", peer);
168 }
169 }
170}
171
172#[derive(Debug)]
173struct TimedWaitingRequest {
174 time_to_send: Instant,
175 request: WaitingRequest,
176 peer: NodeId,
177}
178
179impl TimedWaitingRequest {
180 fn new(
181 time_to_send: Instant, request: WaitingRequest, peer: NodeId,
182 ) -> Self {
183 Self {
184 time_to_send,
185 request,
186 peer,
187 }
188 }
189}
190
191impl Ord for TimedWaitingRequest {
192 fn cmp(&self, other: &Self) -> Ordering {
193 other.time_to_send.cmp(&self.time_to_send)
194 }
195}
196impl PartialOrd for TimedWaitingRequest {
197 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
198 Some(self.cmp(other))
199 }
200}
201impl Eq for TimedWaitingRequest {}
202impl PartialEq for TimedWaitingRequest {
203 fn eq(&self, other: &Self) -> bool {
204 self.time_to_send == other.time_to_send
205 }
206}