1use crate::{
6 message::{Message, SetRequestId},
7 sync::{
8 message::{DynamicCapability, KeyContainer},
9 request_manager::RequestManager,
10 synchronization_protocol_handler::ProtocolConfiguration,
11 Error,
12 },
13 NodeType,
14};
15use cfx_parameters::sync::FAILED_REQUEST_RESEND_WAIT;
16use malloc_size_of::MallocSizeOf;
17use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
18use network::{
19 node_table::NodeId, Error as NetworkError, NetworkContext,
20 UpdateNodeOperation,
21};
22use parking_lot::Mutex;
23use std::{
24 any::Any,
25 cmp::Ordering,
26 collections::{BinaryHeap, HashMap, HashSet, VecDeque},
27 fmt::Debug,
28 mem,
29 sync::{
30 atomic::{AtomicBool, Ordering as AtomicOrdering},
31 Arc,
32 },
33 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
34};
35
36#[derive(DeriveMallocSizeOf)]
37pub struct RequestHandler {
38 protocol_config: ProtocolConfiguration,
39 peers: Mutex<HashMap<NodeId, RequestContainer>>,
40 requests_queue: Mutex<BinaryHeap<Arc<TimedSyncRequests>>>,
41}
42
43impl RequestHandler {
44 pub fn new(protocol_config: &ProtocolConfiguration) -> Self {
45 Self {
46 protocol_config: protocol_config.clone(),
47 peers: Mutex::new(HashMap::new()),
48 requests_queue: Default::default(),
49 }
50 }
51
52 pub fn add_peer(&self, peer_id: NodeId) {
53 self.peers.lock().insert(
54 peer_id,
55 RequestContainer {
56 peer_id,
57 inflight_requests: HashMap::new(),
58 next_request_id: rand::random(),
62 max_inflight_request_count: self
63 .protocol_config
64 .max_inflight_request_count,
65 ..Default::default()
66 },
67 );
68 }
69
70 pub fn match_request(
76 &self, peer_id: &NodeId, request_id: u64,
77 ) -> Result<RequestMessage, Error> {
78 let mut peers = self.peers.lock();
79 if let Some(peer) = peers.get_mut(peer_id) {
80 peer.match_request(request_id)
81 } else {
82 bail!(Error::UnknownPeer);
83 }
84 }
85
86 pub fn send_pending_requests(
87 &self, io: &dyn NetworkContext, peer: &NodeId,
88 ) {
89 if let Some(peer_info) = self.peers.lock().get_mut(peer) {
90 peer_info.send_pending_requests(
91 io,
92 &mut *self.requests_queue.lock(),
93 &self.protocol_config,
94 );
95 }
96 }
97
98 pub fn send_request(
101 &self, io: &dyn NetworkContext, peer: Option<NodeId>,
102 request: Box<dyn Request>, delay: Option<Duration>,
103 ) -> Result<(), Box<dyn Request>> {
104 let peer = match peer {
105 Some(peer) => peer,
106 None => return Err(request),
107 };
108
109 let peers = &mut *self.peers.lock();
110 let peer_info = match peers.get_mut(&peer) {
111 Some(peer) => peer,
112 None => return Err(request),
113 };
114
115 let msg = RequestMessage::new(request, delay);
116
117 let request_id = match peer_info.get_next_request_id() {
118 Some(id) => id,
119 None => {
120 peer_info.append_pending_request(msg);
121 return Ok(());
122 }
123 };
124
125 peer_info.immediate_send_request_to_peer(
126 io,
127 request_id,
128 msg,
129 &mut *self.requests_queue.lock(),
130 &self.protocol_config,
131 );
132
133 Ok(())
134 }
135
136 fn get_timeout_sync_requests(&self) -> Vec<Arc<TimedSyncRequests>> {
137 let mut requests = self.requests_queue.lock();
138 let mut timeout_requests = Vec::new();
139 let now = Instant::now();
140 loop {
141 if requests.is_empty() {
142 break;
143 }
144 let sync_req = requests.pop().expect("queue not empty");
145 if sync_req.removed.load(AtomicOrdering::Relaxed) == true {
146 continue;
147 }
148 if sync_req.timeout_time >= now {
149 requests.push(sync_req);
150 break;
151 } else {
152 debug!("Timeout request {:?}", sync_req);
153 timeout_requests.push(sync_req);
154 }
155 }
156 timeout_requests
157 }
158
159 pub fn process_timeout_requests(
160 &self, io: &dyn NetworkContext,
161 ) -> Vec<RequestMessage> {
162 let mut timeout_requests = Vec::new();
164 let mut peers_to_disconnect = HashSet::new();
165 let mut peers_to_send_pending_requests = HashSet::new();
166 for sync_req in self.get_timeout_sync_requests() {
167 if let Ok(mut req) =
168 self.match_request(&sync_req.peer_id, sync_req.request_id)
169 {
170 let peer_id = sync_req.peer_id.clone();
171 if let Some(request_container) =
172 self.peers.lock().get_mut(&peer_id)
173 {
174 if request_container
175 .on_timeout_should_disconnect(&self.protocol_config)
176 {
177 peers_to_disconnect.insert(peer_id);
178 } else {
179 peers_to_send_pending_requests.insert(peer_id);
180 }
181 }
182 req.request.notify_timeout();
183 timeout_requests.push(req);
184 } else {
185 debug!("Timeout a removed request {:?}", sync_req);
186 }
187 }
188 let op = if self.protocol_config.demote_peer_for_timeout {
189 Some(UpdateNodeOperation::Demotion)
190 } else {
191 Some(UpdateNodeOperation::Failure)
192 };
193 for peer_id in peers_to_disconnect {
194 io.disconnect_peer(
197 &peer_id,
198 op,
199 "too many timeout requests", );
201 }
202 for peer_id in peers_to_send_pending_requests {
203 self.send_pending_requests(io, &peer_id);
204 }
205
206 timeout_requests
207 }
208
209 pub fn remove_peer(&self, peer_id: &NodeId) -> Option<Vec<RequestMessage>> {
211 self.peers
212 .lock()
213 .remove(peer_id)
214 .map(|mut p| p.get_unfinished_requests())
215 }
216}
217
218#[derive(Default, DeriveMallocSizeOf)]
219struct RequestContainer {
220 peer_id: NodeId,
221 pub inflight_requests: HashMap<u64, SynchronizationPeerRequest>,
222 pub next_request_id: u64,
223 pub max_inflight_request_count: u64,
224 pub pending_requests: VecDeque<RequestMessage>,
225 pub timeout_statistics: VecDeque<u64>,
226}
227
228impl RequestContainer {
229 pub fn on_timeout_should_disconnect(
230 &mut self, config: &ProtocolConfiguration,
231 ) -> bool {
232 let now = SystemTime::now()
233 .duration_since(UNIX_EPOCH)
234 .unwrap()
235 .as_secs();
236 if self.timeout_statistics.is_empty() {
237 self.timeout_statistics.push_back(now);
238 return false;
239 }
240
241 self.timeout_statistics.push_back(now);
242 loop {
243 let old_time = *self.timeout_statistics.front().unwrap();
244 if now - old_time <= config.timeout_observing_period_s {
245 break;
246 }
247 self.timeout_statistics.pop_front();
248 }
249
250 if self.timeout_statistics.len()
251 <= config.max_allowed_timeout_in_observing_period as usize
252 {
253 return false;
254 } else {
255 return true;
256 }
257 }
258
259 pub fn get_next_request_id(&mut self) -> Option<u64> {
263 if self.inflight_requests.len()
264 < self.max_inflight_request_count as usize
265 {
266 let id = self.next_request_id;
267 self.next_request_id += 1;
268 Some(id)
269 } else {
270 None
271 }
272 }
273
274 pub fn append_inflight_request(
275 &mut self, request_id: u64, message: RequestMessage,
276 timed_req: Arc<TimedSyncRequests>,
277 ) {
278 self.inflight_requests.insert(
279 request_id,
280 SynchronizationPeerRequest { message, timed_req },
281 );
282 }
283
284 pub fn append_pending_request(&mut self, msg: RequestMessage) {
285 self.pending_requests.push_back(msg);
286 }
287
288 pub fn has_pending_requests(&self) -> bool {
289 !self.pending_requests.is_empty()
290 }
291
292 pub fn pop_pending_request(&mut self) -> Option<RequestMessage> {
293 self.pending_requests.pop_front()
294 }
295
296 pub fn remove_inflight_request(
297 &mut self, request_id: u64,
298 ) -> Option<SynchronizationPeerRequest> {
299 if let Some(save_req) = self.inflight_requests.remove(&request_id) {
300 Some(save_req)
301 } else {
302 debug!(
303 "Remove out of bound request peer={} request_id={} next={}",
304 self.peer_id, request_id, self.next_request_id
305 );
306 None
307 }
308 }
309
310 fn immediate_send_request_to_peer(
311 &mut self, io: &dyn NetworkContext, request_id: u64,
312 mut request_message: RequestMessage,
313 requests_queue: &mut BinaryHeap<Arc<TimedSyncRequests>>,
314 protocol_config: &ProtocolConfiguration,
315 ) {
316 request_message.request.set_request_id(request_id);
317 let res = request_message.request.send(io, &self.peer_id);
318 let is_send_error = if let Err(e) = res {
319 match e {
320 NetworkError::OversizedPacket => {
321 panic!("Request packet should not be oversized!")
322 }
323 _ => {}
324 }
325 true
326 } else {
327 false
328 };
329
330 let timed_req = Arc::new(TimedSyncRequests::from_request(
331 self.peer_id,
332 request_id,
333 &request_message,
334 protocol_config,
335 is_send_error,
336 ));
337 self.append_inflight_request(
338 request_id,
339 request_message,
340 timed_req.clone(),
341 );
342 requests_queue.push(timed_req);
343 }
344
345 pub fn send_pending_requests(
349 &mut self, io: &dyn NetworkContext,
350 requests_queue: &mut BinaryHeap<Arc<TimedSyncRequests>>,
351 protocol_config: &ProtocolConfiguration,
352 ) {
353 trace!("send_pending_requests: len={}", self.pending_requests.len());
354 while self.has_pending_requests() {
355 if let Some(new_request_id) = self.get_next_request_id() {
356 let pending_msg = self.pop_pending_request().unwrap();
357
358 self.immediate_send_request_to_peer(
359 io,
360 new_request_id,
361 pending_msg,
362 requests_queue,
363 protocol_config,
364 );
365 } else {
366 break;
367 }
368 }
369 }
370
371 pub fn match_request(
376 &mut self, request_id: u64,
377 ) -> Result<RequestMessage, Error> {
378 let removed_req = self.remove_inflight_request(request_id);
379 if let Some(removed_req) = removed_req {
380 removed_req
381 .timed_req
382 .removed
383 .store(true, AtomicOrdering::Relaxed);
384 Ok(removed_req.message)
385 } else {
386 bail!(Error::RequestNotFound)
387 }
388 }
389
390 pub fn get_unfinished_requests(&mut self) -> Vec<RequestMessage> {
391 let mut unfinished_requests = Vec::new();
392 let mut new_map = HashMap::new();
393 mem::swap(&mut self.inflight_requests, &mut new_map);
394 for (_, req) in new_map {
395 req.timed_req.removed.store(true, AtomicOrdering::Relaxed);
396 unfinished_requests.push(req.message);
397 }
398
399 while let Some(req) = self.pending_requests.pop_front() {
400 unfinished_requests.push(req);
401 }
402 unfinished_requests
403 }
404}
405
406#[derive(Debug, DeriveMallocSizeOf)]
407pub struct SynchronizationPeerRequest {
408 pub message: RequestMessage,
409 pub timed_req: Arc<TimedSyncRequests>,
410}
411
412pub trait AsAny {
414 fn as_any(&self) -> &dyn Any;
415 fn as_any_mut(&mut self) -> &mut dyn Any;
416}
417
418pub trait Request:
420 Send + Debug + AsAny + Message + SetRequestId + MallocSizeOf
421{
422 fn timeout(&self, conf: &ProtocolConfiguration) -> Duration;
424
425 fn on_removed(&self, inflight_keys: &KeyContainer);
428 fn with_inflight(&mut self, inflight_keys: &KeyContainer);
432 fn is_empty(&self) -> bool;
435 fn notify_empty(&mut self) {}
437 fn resend(&self) -> Option<Box<dyn Request>>;
447
448 fn required_capability(&self) -> Option<DynamicCapability> { None }
450
451 fn notify_timeout(&mut self) {}
453
454 fn preferred_node_type(&self) -> Option<NodeType> { None }
456}
457
458#[derive(Debug, DeriveMallocSizeOf)]
459pub struct RequestMessage {
460 pub request: Box<dyn Request>,
461 pub delay: Option<Duration>,
462}
463
464impl RequestMessage {
465 pub fn new(request: Box<dyn Request>, delay: Option<Duration>) -> Self {
466 RequestMessage { request, delay }
467 }
468
469 pub fn set_request_id(&mut self, request_id: u64) {
470 self.request.set_request_id(request_id);
471 }
472
473 pub fn downcast_ref<T: Request + Any>(
477 &self, io: &dyn NetworkContext, request_manager: &RequestManager,
478 ) -> Result<&T, Error> {
479 match self.request.as_any().downcast_ref::<T>() {
480 Some(req) => Ok(req),
481 None => {
482 warn!("failed to downcast general request to concrete request type, message = {:?}", self);
483 if let Some(resent_request) = self.request.resend() {
484 request_manager.resend_request_to_another_peer(
485 io,
486 &RequestMessage::new(resent_request, self.delay),
487 );
488 }
489 Err(Error::UnexpectedResponse.into())
490 }
491 }
492 }
493
494 pub fn downcast_mut<T: Request + Any>(
495 &mut self, _io: &dyn NetworkContext, _request_manager: &RequestManager,
496 ) -> Result<&mut T, Error> {
497 match self.request.as_any_mut().downcast_mut::<T>() {
498 Some(req) => Ok(req),
499 None => {
500 warn!("failed to downcast general request to concrete request type");
501 Err(Error::UnexpectedResponse.into())
502 }
503 }
504 }
505}
506
507#[derive(Debug, DeriveMallocSizeOf)]
508pub struct TimedSyncRequests {
509 pub peer_id: NodeId,
510 pub timeout_time: Instant,
511 pub request_id: u64,
512 pub removed: AtomicBool,
513}
514
515impl TimedSyncRequests {
516 pub fn new(
517 peer_id: NodeId, timeout: Duration, request_id: u64,
518 ) -> TimedSyncRequests {
519 TimedSyncRequests {
520 peer_id,
521 timeout_time: Instant::now() + timeout,
522 request_id,
523 removed: AtomicBool::new(false),
524 }
525 }
526
527 pub fn from_request(
528 peer_id: NodeId, request_id: u64, msg: &RequestMessage,
529 conf: &ProtocolConfiguration, is_send_error: bool,
530 ) -> TimedSyncRequests {
531 let timeout = if is_send_error {
532 FAILED_REQUEST_RESEND_WAIT.clone()
533 } else {
534 msg.request.timeout(conf)
535 };
536 TimedSyncRequests::new(peer_id, timeout, request_id)
537 }
538}
539
540impl Ord for TimedSyncRequests {
541 fn cmp(&self, other: &Self) -> Ordering {
542 other.timeout_time.cmp(&self.timeout_time)
543 }
544}
545
546impl PartialOrd for TimedSyncRequests {
547 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
548 Some(self.cmp(other))
549 }
550}
551
552impl Eq for TimedSyncRequests {}
553
554impl PartialEq for TimedSyncRequests {
555 fn eq(&self, other: &Self) -> bool {
556 self.timeout_time == other.timeout_time
557 }
558}