1use super::{
6 synchronization_protocol_handler::ProtocolConfiguration,
7 synchronization_state::SynchronizationState,
8};
9use crate::{
10 sync::{
11 message::{
12 msgid, GetBlockHashesByEpoch, GetBlockHeaders, GetBlockTxn,
13 GetBlocks, GetCompactBlocks, GetTransactions,
14 GetTransactionsFromTxHashes, Key, KeyContainer, TransactionDigests,
15 },
16 request_manager::request_batcher::RequestBatcher,
17 synchronization_protocol_handler::{AsyncTaskQueue, RecoverPublicTask},
18 synchronization_state::PeerFilter,
19 Error,
20 },
21 NodeType,
22};
23use cfx_parameters::sync::REQUEST_START_WAITING_TIME;
24use cfx_types::H256;
25use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
26use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
27use metrics::{
28 register_meter_with_group, Gauge, GaugeUsize, Meter, MeterTimer,
29};
30use network::{node_table::NodeId, NetworkContext};
31use parking_lot::{Mutex, RwLock};
32use primitives::{SignedTransaction, TransactionWithSignature};
33pub use request_handler::{
34 AsAny, Request, RequestHandler, RequestMessage, SynchronizationPeerRequest,
35};
36use std::{
37 cmp::Ordering,
38 collections::{binary_heap::BinaryHeap, HashSet},
39 sync::Arc,
40 time::{Duration, Instant},
41};
42use tx_handler::{
43 InflightPendingTransactionContainer, InflightPendingTransactionItem,
44 ReceivedTransactionContainer, SentTransactionContainer,
45};
46
47mod request_batcher;
48mod request_handler;
49pub mod tx_handler;
50
51lazy_static! {
52 static ref TX_REQUEST_METER: Arc<dyn Meter> =
53 register_meter_with_group("system_metrics", "tx_diff_set_size");
54 static ref TX_REQUEST_TX_HASHES_METER: Arc<dyn Meter> =
55 register_meter_with_group(
56 "system_metrics",
57 "tx_request_tx_hashes_size"
58 );
59 static ref REQUEST_MANAGER_TIMER: Arc<dyn Meter> =
60 register_meter_with_group("timer", "request_manager::request_not_tx");
61 static ref REQUEST_MANAGER_TX_TIMER: Arc<dyn Meter> =
62 register_meter_with_group("timer", "request_manager::request_tx");
63 static ref TX_RECEIVED_POOL_METER: Arc<dyn Meter> =
64 register_meter_with_group("system_metrics", "tx_received_pool_size");
65 static ref INFLIGHT_TX_POOL_GAUGE: Arc<dyn Gauge<usize>> =
66 GaugeUsize::register_with_group(
67 "system_metrics",
68 "inflight_tx_pool_size"
69 );
70 static ref TX_HASHES_INFLIGHT_TX_POOL_GAUGE: Arc<dyn Gauge<usize>> =
71 GaugeUsize::register_with_group(
72 "system_metrics",
73 "tx_hashes_inflight_tx_pool_size"
74 );
75 static ref INFLIGHT_TX_PENDING_POOL_METER: Arc<dyn Meter> =
76 register_meter_with_group(
77 "system_metrics",
78 "inflight_tx_pending_pool_size"
79 );
80 static ref INFLIGHT_TX_REJECT_METER: Arc<dyn Meter> =
81 register_meter_with_group("system_metrics", "inflight_tx_reject_size");
82 static ref REQUEST_TX_FROM_INFLIGHT_PENDING_POOL_METER: Arc<dyn Meter> =
83 register_meter_with_group(
84 "system_metrics",
85 "request_tx_from_inflight_pending_pool"
86 );
87
88 static ref DEFAULT_REQUEST_DELAY_UPPER_BOUND: Duration =
97 Duration::from_secs(600);
98 static ref DEFAULT_REQUEST_BATCH_BUCKET_SIZE: Duration =
99 Duration::from_secs(2);
100}
101
102#[derive(Debug)]
103struct WaitingRequest(Box<dyn Request>, Duration); impl MallocSizeOf for WaitingRequest {
106 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
107 self.0.size_of(ops) + self.1.size_of(ops)
108 }
109}
110
111#[derive(DeriveMallocSizeOf)]
121pub struct RequestManager {
122 inflight_keys: KeyContainer,
124
125 waiting_requests: Mutex<BinaryHeap<TimedWaitingRequest>>,
127
128 sent_transactions: RwLock<SentTransactionContainer>,
133 pub received_transactions: Arc<RwLock<ReceivedTransactionContainer>>,
134 pub inflight_pending_transactions:
136 Arc<RwLock<InflightPendingTransactionContainer>>,
137
138 request_handler: Arc<RequestHandler>,
140
141 syn: Arc<SynchronizationState>,
142
143 #[ignore_malloc_size_of = "channels are not handled in MallocSizeOf"]
144 recover_public_queue: Arc<AsyncTaskQueue<RecoverPublicTask>>,
145}
146
147impl RequestManager {
148 pub fn new(
149 protocol_config: &ProtocolConfiguration,
150 syn: Arc<SynchronizationState>,
151 recover_public_queue: Arc<AsyncTaskQueue<RecoverPublicTask>>,
152 ) -> Self {
153 let received_tx_index_maintain_timeout =
154 protocol_config.received_tx_index_maintain_timeout;
155 let inflight_pending_tx_index_maintain_timeout =
156 protocol_config.inflight_pending_tx_index_maintain_timeout;
157
158 let sent_transaction_window_size =
160 protocol_config.tx_maintained_for_peer_timeout.as_millis()
161 / protocol_config.send_tx_period.as_millis();
162 Self {
163 received_transactions: Arc::new(RwLock::new(
164 ReceivedTransactionContainer::new(
165 received_tx_index_maintain_timeout.as_secs(),
166 ),
167 )),
168 inflight_pending_transactions: Arc::new(RwLock::new(
169 InflightPendingTransactionContainer::new(
170 inflight_pending_tx_index_maintain_timeout.as_secs(),
171 ),
172 )),
173 sent_transactions: RwLock::new(SentTransactionContainer::new(
174 sent_transaction_window_size as usize,
175 )),
176 inflight_keys: Default::default(),
177 waiting_requests: Default::default(),
178 request_handler: Arc::new(RequestHandler::new(protocol_config)),
179 syn,
180 recover_public_queue,
181 }
182 }
183
184 pub fn num_epochs_in_flight(&self) -> u64 {
185 self.inflight_keys
186 .read(msgid::GET_BLOCK_HASHES_BY_EPOCH)
187 .len() as u64
188 }
189
190 pub fn in_flight_blocks(&self) -> HashSet<H256> {
191 self.inflight_keys
192 .read(msgid::GET_BLOCKS)
193 .iter()
194 .map(|key| match key {
195 Key::Hash(h) => *h,
196 Key::Num(_) | Key::Id(_) => {
197 unreachable!("GET_BLOCKS only has hash as key");
198 }
199 })
200 .collect()
201 }
202
203 pub fn request_with_delay(
206 &self, io: &dyn NetworkContext, mut request: Box<dyn Request>,
207 mut peer: Option<NodeId>, delay: Option<Duration>,
208 ) {
209 request.with_inflight(&self.inflight_keys);
211
212 if request.is_empty() {
213 request.notify_empty();
214 return;
215 }
216 if peer.is_some()
219 && delay.is_none()
220 && !self.check_and_update_net_inflight_blocks(&request)
221 {
222 peer = None;
223 }
224
225 let (cur_delay, next_delay) = match delay {
227 Some(d) => (d, d + *REQUEST_START_WAITING_TIME),
228 None => (*REQUEST_START_WAITING_TIME, *REQUEST_START_WAITING_TIME),
229 };
230
231 if peer.is_none() || delay.is_some() {
233 debug!("request_with_delay: add request to waiting_requests, peer={:?}, request={:?}, delay={:?}", peer, request, cur_delay);
234 self.waiting_requests.lock().push(TimedWaitingRequest::new(
235 Instant::now() + cur_delay,
236 WaitingRequest(request, next_delay),
237 peer,
238 ));
239 return;
240 }
241
242 if let Err(e) = self.request_handler.send_request(
243 io,
244 peer,
245 request,
246 Some(next_delay),
247 ) {
248 debug!("request_with_delay: send_request fails, peer={:?}, request={:?}", peer, e);
249 if let Some(hashes) = try_get_block_hashes(&e) {
253 self.remove_net_inflight_blocks(hashes.iter())
254 }
255 self.waiting_requests.lock().push(TimedWaitingRequest::new(
256 Instant::now() + cur_delay,
257 WaitingRequest(e, next_delay),
258 None,
259 ));
260 }
261 }
262
263 pub fn request_block_headers(
264 &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
265 hashes: Vec<H256>, delay: Option<Duration>,
266 ) {
267 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
268
269 debug!("request_block_headers: {:?}, peer {:?}", hashes, peer_id);
270
271 let request = GetBlockHeaders {
272 request_id: 0,
273 hashes,
274 };
275
276 self.request_with_delay(io, Box::new(request), peer_id, delay);
277 }
278
279 pub fn request_epoch_hashes(
280 &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
281 epochs: Vec<u64>, delay: Option<Duration>,
282 ) {
283 let request = GetBlockHashesByEpoch {
284 request_id: 0,
285 epochs,
286 };
287
288 self.request_with_delay(io, Box::new(request), peer_id, delay);
289 }
290
291 pub fn request_blocks(
292 &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
293 hashes: Vec<H256>, with_public: bool, delay: Option<Duration>,
294 preferred_node_type: Option<NodeType>,
295 ) {
296 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
297 debug!("request_blocks: hashes={:?}", hashes);
298
299 let request = GetBlocks {
300 request_id: 0,
301 with_public,
302 hashes,
303 preferred_node_type,
304 };
305
306 self.request_with_delay(io, Box::new(request), peer_id, delay);
307 }
308
309 pub fn request_transactions_from_digest(
310 &self, io: &dyn NetworkContext, peer_id: NodeId,
311 transaction_digests: &TransactionDigests,
312 ) {
313 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TX_TIMER.as_ref());
314
315 let window_index: usize = transaction_digests.window_index;
316 let key1 = transaction_digests.key1;
317 let key2 = transaction_digests.key2;
318 let (random_byte_vector, fixed_bytes_vector) =
319 transaction_digests.get_decomposed_short_ids();
320
321 if fixed_bytes_vector.is_empty()
322 && transaction_digests.tx_hashes.is_empty()
323 {
324 return;
325 }
326
327 let mut tx_from_short_id_inflight_keys =
328 self.inflight_keys.write(msgid::GET_TRANSACTIONS);
329 let mut tx_from_hashes_inflight_keys = self
330 .inflight_keys
331 .write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
332 let received_transactions = self.received_transactions.read();
333
334 INFLIGHT_TX_POOL_GAUGE.update(tx_from_short_id_inflight_keys.len());
335 TX_HASHES_INFLIGHT_TX_POOL_GAUGE
336 .update(tx_from_hashes_inflight_keys.len());
337 TX_RECEIVED_POOL_METER.mark(received_transactions.get_length());
338
339 let (
340 short_ids,
341 tx_hashes,
342 tx_request_indices,
343 hashes_request_indices,
344 inflight_pending_items,
345 ) = {
346 let mut short_ids = HashSet::new();
347 let mut tx_hashes = HashSet::new();
348 let mut tx_request_indices = Vec::new();
349 let mut hashes_request_indices = Vec::new();
350 let mut inflight_pending_items: Vec<
351 InflightPendingTransactionItem,
352 > = Vec::new();
353
354 for i in 0..fixed_bytes_vector.len() {
356 let fixed_bytes = fixed_bytes_vector[i];
357 let random_bytes = random_byte_vector[i];
358
359 if received_transactions.contains_short_id(
360 fixed_bytes,
361 random_bytes,
362 key1,
363 key2,
364 ) {
365 if received_transactions.group_overflow(&fixed_bytes) {
366 hashes_request_indices.push(i);
367 }
368 continue;
370 }
371
372 if tx_from_short_id_inflight_keys.insert(Key::Id(fixed_bytes)) {
373 tx_request_indices.push(i);
374 short_ids.insert(fixed_bytes);
375 } else {
376 inflight_pending_items.push(
378 InflightPendingTransactionItem::new(
379 fixed_bytes,
380 random_bytes,
381 window_index,
382 key1,
383 key2,
384 i,
385 peer_id.clone(),
386 ),
387 );
388 INFLIGHT_TX_PENDING_POOL_METER.mark(1);
389 }
390 }
391
392 let base_index = fixed_bytes_vector.len();
394 for i in 0..transaction_digests.tx_hashes.len() {
395 let tx_hash = transaction_digests.tx_hashes[i];
396 if received_transactions.contains_tx_hash(&tx_hash) {
397 continue;
398 }
399 if tx_from_hashes_inflight_keys.insert(Key::Hash(tx_hash)) {
400 tx_request_indices.push(base_index + i);
401 tx_hashes.insert(tx_hash);
402 } else {
403 INFLIGHT_TX_REJECT_METER.mark(1);
405 }
406 }
407
408 (
409 short_ids,
410 tx_hashes,
411 tx_request_indices,
412 hashes_request_indices,
413 inflight_pending_items,
414 )
415 };
416 TX_REQUEST_METER.mark(tx_request_indices.len());
417 TX_REQUEST_TX_HASHES_METER.mark(hashes_request_indices.len());
418 debug!(
419 "Request {} tx and {} tx hashes from peer={}",
420 tx_request_indices.len(),
421 hashes_request_indices.len(),
422 peer_id.clone()
423 );
424
425 let request = GetTransactions {
426 request_id: 0,
427 window_index,
428 indices: tx_request_indices,
429 tx_hashes_indices: hashes_request_indices,
430 short_ids: short_ids.clone(),
431 tx_hashes: tx_hashes.clone(),
432 };
433
434 if request.is_empty() {
435 return;
436 }
437
438 if self
439 .request_handler
440 .send_request(io, Some(peer_id), Box::new(request), None)
441 .is_err()
442 {
443 for id in short_ids {
444 tx_from_short_id_inflight_keys.remove(&Key::Id(id));
445 }
446 for id in tx_hashes {
447 tx_from_hashes_inflight_keys.remove(&Key::Hash(id));
448 }
449 return;
450 }
451
452 self.inflight_pending_transactions
453 .write()
454 .append_inflight_pending_items(inflight_pending_items);
455 }
456
457 pub fn request_transactions_from_tx_hashes(
458 &self, io: &dyn NetworkContext, peer_id: NodeId,
459 responded_tx_hashes: Vec<H256>, window_index: usize,
460 tx_hashes_indices: &Vec<usize>,
461 ) {
462 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TX_TIMER.as_ref());
463
464 if responded_tx_hashes.is_empty() {
465 return;
466 }
467
468 let mut tx_from_hashes_inflight_keys = self
469 .inflight_keys
470 .write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
471 let received_transactions = self.received_transactions.read();
472
473 TX_HASHES_INFLIGHT_TX_POOL_GAUGE
474 .update(tx_from_hashes_inflight_keys.len());
475 TX_RECEIVED_POOL_METER.mark(received_transactions.get_length());
476
477 let (tx_hashes, indices) = {
478 let mut tx_hashes = HashSet::new();
479 let mut indices = Vec::new();
480
481 for i in 0..responded_tx_hashes.len() {
482 let tx_hash = responded_tx_hashes[i];
483 if received_transactions.contains_tx_hash(&tx_hash) {
484 continue;
486 }
487
488 if tx_from_hashes_inflight_keys.insert(Key::Hash(tx_hash)) {
489 indices.push(tx_hashes_indices[i]);
490 tx_hashes.insert(tx_hash);
491 } else {
492 INFLIGHT_TX_REJECT_METER.mark(1);
494 }
495 }
496
497 (tx_hashes, indices)
498 };
499 TX_REQUEST_METER.mark(tx_hashes.len());
500 debug!(
501 "Request {} tx using tx hashes from peer={}",
502 indices.len(),
503 peer_id
504 );
505
506 let request = GetTransactionsFromTxHashes {
507 request_id: 0,
508 window_index,
509 indices,
510 tx_hashes: tx_hashes.clone(),
511 };
512
513 if request.is_empty() {
514 return;
515 }
516
517 if self
518 .request_handler
519 .send_request(io, Some(peer_id), Box::new(request), None)
520 .is_err()
521 {
522 for id in tx_hashes {
523 tx_from_hashes_inflight_keys.remove(&Key::Hash(id));
524 }
525 }
526 }
527
528 pub fn request_compact_blocks(
529 &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
530 hashes: Vec<H256>, delay: Option<Duration>,
531 ) {
532 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
533 debug!("request_compact_blocks: hashes={:?}", hashes);
534
535 let request = GetCompactBlocks {
536 request_id: 0,
537 hashes,
538 };
539
540 self.request_with_delay(io, Box::new(request), peer_id, delay);
541 }
542
543 pub fn request_blocktxn(
544 &self, io: &dyn NetworkContext, peer_id: NodeId, block_hash: H256,
545 index_skips: Vec<usize>, delay: Option<Duration>,
546 ) {
547 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
548
549 let request = GetBlockTxn {
550 request_id: 0,
551 block_hash: block_hash.clone(),
552 index_skips,
553 };
554
555 self.request_with_delay(io, Box::new(request), Some(peer_id), delay);
556 }
557
558 fn send_request_again(
559 &self, io: &dyn NetworkContext, msg: &RequestMessage,
560 ) {
561 debug!("send_request_again, request={:?}", msg.request);
562 if let Some(request) = msg.request.resend() {
563 let mut filter = PeerFilter::new(request.msg_id());
564 if let Some(preferred_node_type) = request.preferred_node_type() {
565 filter = filter.with_preferred_node_type(preferred_node_type);
566 }
567 if let Some(cap) = request.required_capability() {
568 filter = filter.with_cap(cap);
569 }
570 let chosen_peer = filter.select(&self.syn);
571 debug!("send_request_again with new request, peer={:?}, new request={:?}", chosen_peer, request);
572 self.request_with_delay(io, request, chosen_peer, msg.delay);
573 }
574 }
575
576 pub fn send_pending_requests(
577 &self, io: &dyn NetworkContext, peer: &NodeId,
578 ) {
579 self.request_handler.send_pending_requests(io, peer)
580 }
581
582 pub fn resend_request_to_another_peer(
583 &self, io: &dyn NetworkContext, req: &RequestMessage,
584 ) {
585 req.request.on_removed(&self.inflight_keys);
586 self.send_request_again(io, req);
587 }
588
589 pub fn match_request(
592 &self, peer_id: &NodeId, request_id: u64,
593 ) -> Result<RequestMessage, Error> {
594 self.request_handler.match_request(peer_id, request_id)
595 }
596
597 pub fn headers_received(
603 &self, io: &dyn NetworkContext, req_hashes: HashSet<H256>,
604 mut received_headers: HashSet<H256>, delay: Option<Duration>,
605 ) {
606 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
607 debug!(
608 "headers_received: req_hashes={:?} received_headers={:?}",
609 req_hashes, received_headers
610 );
611 let missing_headers = {
612 let mut inflight_keys =
613 self.inflight_keys.write(msgid::GET_BLOCK_HEADERS);
614 let mut missing_headers = Vec::new();
615 for req_hash in &req_hashes {
616 if !received_headers.remove(req_hash) {
617 if inflight_keys.remove(&Key::Hash(*req_hash)) {
622 missing_headers.push(*req_hash);
623 }
624 } else {
625 inflight_keys.remove(&Key::Hash(*req_hash));
626 }
627 }
628 for h in &received_headers {
629 inflight_keys.remove(&Key::Hash(*h));
630 }
631 missing_headers
632 };
633 if !missing_headers.is_empty() {
634 let chosen_peer =
635 PeerFilter::new(msgid::GET_BLOCK_HEADERS).select(&self.syn);
636 self.request_block_headers(io, chosen_peer, missing_headers, delay);
637 }
638 }
639
640 pub fn epochs_received(
642 &self, io: &dyn NetworkContext, req_epochs: HashSet<u64>,
643 mut received_epochs: HashSet<u64>, delay: Option<Duration>,
644 ) {
645 debug!(
646 "epochs_received: req_epochs={:?} received_epochs={:?}",
647 req_epochs, received_epochs
648 );
649 let missing_epochs = {
650 let mut inflight_keys =
651 self.inflight_keys.write(msgid::GET_BLOCK_HASHES_BY_EPOCH);
652 let mut missing_epochs = Vec::new();
653 for epoch_number in &req_epochs {
654 if !received_epochs.remove(epoch_number) {
655 if inflight_keys.remove(&Key::Num(*epoch_number)) {
660 missing_epochs.push(*epoch_number);
661 }
662 } else {
663 inflight_keys.remove(&Key::Num(*epoch_number));
664 }
665 }
666 for epoch_number in &received_epochs {
667 inflight_keys.remove(&Key::Num(*epoch_number));
668 }
669 missing_epochs
670 };
671 if !missing_epochs.is_empty() {
672 let chosen_peer = PeerFilter::new(msgid::GET_BLOCK_HASHES_BY_EPOCH)
673 .select(&self.syn);
674 self.request_epoch_hashes(io, chosen_peer, missing_epochs, delay);
675 }
676 }
677
678 pub fn blocks_received(
685 &self, io: &dyn NetworkContext, requested_hashes: HashSet<H256>,
686 mut received_blocks: HashSet<H256>, ask_full_block: bool,
687 peer: Option<NodeId>, with_public: bool, delay: Option<Duration>,
688 preferred_node_type_for_block_request: Option<NodeType>,
689 ) {
690 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
691 debug!(
692 "blocks_received: req_hashes={:?} received_blocks={:?} peer={:?}",
693 requested_hashes, received_blocks, peer
694 );
695 let missing_blocks = {
696 let mut inflight_blocks =
697 self.inflight_keys.write(msgid::GET_BLOCKS);
698 let mut net_inflight_blocks =
699 self.inflight_keys.write(msgid::NET_INFLIGHT_BLOCKS);
700 let mut missing_blocks = Vec::new();
701 for req_hash in &requested_hashes {
702 net_inflight_blocks.remove(&Key::Hash(*req_hash));
703 if !received_blocks.remove(req_hash) {
704 if inflight_blocks.remove(&Key::Hash(*req_hash)) {
709 missing_blocks.push(*req_hash);
710 }
711 } else {
712 inflight_blocks.remove(&Key::Hash(*req_hash));
713 }
714 }
715 for h in &received_blocks {
716 net_inflight_blocks.remove(&Key::Hash(*h));
717 inflight_blocks.remove(&Key::Hash(*h));
718 }
719 missing_blocks
720 };
721 if !missing_blocks.is_empty() {
722 let chosen_peer = peer.or_else(|| {
727 let msg_id = if ask_full_block {
728 msgid::GET_BLOCKS
729 } else {
730 msgid::GET_CMPCT_BLOCKS
731 };
732
733 PeerFilter::new(msg_id).select(&self.syn)
734 });
735 if ask_full_block {
736 self.request_blocks(
737 io,
738 chosen_peer,
739 missing_blocks,
740 with_public,
741 delay,
742 preferred_node_type_for_block_request,
743 );
744 } else {
745 self.request_compact_blocks(
746 io,
747 chosen_peer,
748 missing_blocks,
749 delay,
750 );
751 }
752 }
753 }
754
755 pub fn transactions_received_from_digests(
756 &self, io: &dyn NetworkContext,
757 get_transactions_request: &GetTransactions,
758 signed_transactions: Vec<Arc<SignedTransaction>>,
759 ) {
760 let mut short_id_inflight_keys =
761 self.inflight_keys.write(msgid::GET_TRANSACTIONS);
762 let mut tx_hash_inflight_keys = self
763 .inflight_keys
764 .write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
765
766 let (requests, keeped_short_ids) = self
767 .inflight_pending_transactions
768 .write()
769 .generate_tx_requests_from_inflight_pending_pool(
770 &signed_transactions,
771 );
772
773 self.append_received_transactions(signed_transactions);
774 for tx in &get_transactions_request.short_ids {
775 if !keeped_short_ids.contains(tx) {
776 short_id_inflight_keys.remove(&Key::Id(*tx));
777 }
778 }
779 for tx in &get_transactions_request.tx_hashes {
780 tx_hash_inflight_keys.remove(&Key::Hash(*tx));
781 }
782
783 if requests.is_empty() {
785 return;
786 }
787 REQUEST_TX_FROM_INFLIGHT_PENDING_POOL_METER.mark(requests.len());
788 for request in requests {
789 let tx_request = GetTransactions {
790 request_id: 0,
791 window_index: request.window_index,
792 indices: vec![request.index],
793 tx_hashes_indices: vec![],
794 short_ids: {
795 let mut set = HashSet::new();
796 set.insert(request.fixed_byte_part);
797 set
798 },
799 tx_hashes: HashSet::new(),
800 };
801 if self
802 .request_handler
803 .send_request(
804 io,
805 Some(request.peer_id),
806 Box::new(tx_request),
807 None,
808 )
809 .is_err()
810 {
811 short_id_inflight_keys
812 .remove(&Key::Id(request.fixed_byte_part));
813 }
814 }
815 }
816
817 pub fn transactions_received_from_tx_hashes(
818 &self, get_transactions_request: &GetTransactionsFromTxHashes,
819 signed_transactions: Vec<Arc<SignedTransaction>>,
820 ) {
821 let mut tx_hash_inflight_keys = self
822 .inflight_keys
823 .write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
824 for tx in &get_transactions_request.tx_hashes {
825 tx_hash_inflight_keys.remove(&Key::Hash(*tx));
826 }
827 self.append_received_transactions(signed_transactions);
828 }
829
830 pub fn get_sent_transactions(
831 &self, window_index: usize, indices: &Vec<usize>,
832 ) -> Vec<TransactionWithSignature> {
833 let sent_transactions = self.sent_transactions.read();
834 let mut txs = Vec::with_capacity(indices.len());
835 for index in indices {
836 if let Some(tx) =
837 sent_transactions.get_transaction(window_index, *index)
838 {
839 txs.push(tx.transaction.clone());
840 }
841 }
842 txs
843 }
844
845 pub fn append_sent_transactions(
846 &self, transactions: Vec<Arc<SignedTransaction>>,
847 ) -> usize {
848 self.sent_transactions
849 .write()
850 .append_transactions(transactions)
851 }
852
853 pub fn append_received_transactions(
854 &self, transactions: Vec<Arc<SignedTransaction>>,
855 ) {
856 self.received_transactions
857 .write()
858 .append_transactions(transactions)
859 }
860
861 pub fn resend_timeout_requests(&self, io: &dyn NetworkContext) {
862 debug!("resend_timeout_requests: start");
863 let timeout_requests =
864 self.request_handler.process_timeout_requests(io);
865 for req in timeout_requests {
866 debug!("Timeout requests: {:?}", req);
867 self.resend_request_to_another_peer(io, &req);
868 }
869 }
870
871 pub fn resend_waiting_requests(
874 &self, io: &dyn NetworkContext, remove_timeout_requests: bool,
875 prefer_archive_node_for_blocks: bool,
876 ) -> Vec<Box<dyn Request>> {
877 debug!("resend_waiting_requests: start");
878 let mut waiting_requests = self.waiting_requests.lock();
879 let now = Instant::now();
880 let mut batcher =
881 RequestBatcher::new(*DEFAULT_REQUEST_BATCH_BUCKET_SIZE);
882
883 let mut cancelled_requests = Vec::new();
884 while let Some(req) = waiting_requests.pop() {
885 if req.time_to_send >= now {
886 waiting_requests.push(req);
887 break;
888 } else if remove_timeout_requests
889 && req.request.1 > *DEFAULT_REQUEST_DELAY_UPPER_BOUND
890 {
891 warn!("Request is in-flight for over an hour: {:?}", req);
893 req.request.0.on_removed(&self.inflight_keys);
894 cancelled_requests.push(req.request.0);
895 continue;
896 }
897
898 let WaitingRequest(request, delay) = req.request;
901 let request = match request.resend() {
902 Some(r) => r,
903 None => continue,
904 };
905 if !self.check_and_update_net_inflight_blocks(&request) {
906 waiting_requests.push(TimedWaitingRequest::new(
911 now + delay,
912 WaitingRequest(request, delay),
914 None,
915 ));
916 continue;
917 }
918 batcher.insert(delay, request);
919 }
920
921 for (next_delay, request) in
922 batcher.get_batched_requests(prefer_archive_node_for_blocks)
923 {
924 let mut filter = PeerFilter::new(request.msg_id());
925 if let Some(cap) = request.required_capability() {
926 filter = filter.with_cap(cap);
927 }
928 if let Some(preferred_node_type) = request.preferred_node_type() {
929 filter = filter.with_preferred_node_type(preferred_node_type);
930 }
931 let chosen_peer = match filter.select(&self.syn) {
932 Some(p) => p,
933 None => {
934 debug!("No peer to send request, wait for next time");
935 if let Some(hashes) = try_get_block_hashes(&request) {
939 self.remove_net_inflight_blocks(hashes.iter())
940 }
941 waiting_requests.push(TimedWaitingRequest::new(
942 Instant::now() + next_delay,
943 WaitingRequest(request, next_delay),
944 None,
945 ));
946 continue;
947 }
948 };
949 debug!(
950 "Send waiting req {:?} to peer={} with next_delay={:?}",
951 request, chosen_peer, next_delay
952 );
953
954 if let Err(request) = self.request_handler.send_request(
955 io,
956 Some(chosen_peer),
957 request,
958 Some(next_delay),
959 ) {
960 waiting_requests.push(TimedWaitingRequest::new(
961 Instant::now() + next_delay,
962 WaitingRequest(request, next_delay),
963 None,
964 ));
965 }
966 }
967 cancelled_requests
968 }
969
970 pub fn on_peer_connected(&self, peer: &NodeId) {
971 self.request_handler.add_peer(*peer);
972 }
973
974 pub fn on_peer_disconnected(&self, io: &dyn NetworkContext, peer: &NodeId) {
975 if let Some(unfinished_requests) =
976 self.request_handler.remove_peer(peer)
977 {
978 for msg in unfinished_requests {
979 self.resend_request_to_another_peer(io, &msg);
980 }
981 } else {
982 debug!("Peer already removed form request manager when disconnected peer={}", peer);
983 }
984 }
985
986 fn check_and_update_net_inflight_blocks(
987 &self, request: &Box<dyn Request>,
988 ) -> bool {
989 if let Some(hashes) = try_get_block_hashes(request) {
990 let mut net_inflight_blocks =
995 self.inflight_keys.write(msgid::NET_INFLIGHT_BLOCKS);
996 if net_inflight_blocks.len()
997 >= self.recover_public_queue.estimated_available_count()
998 {
999 trace!("queue is full, send block request later: inflight={} req={:?}",
1000 net_inflight_blocks.len(), request);
1001 return false;
1002 } else {
1003 for hash in hashes {
1004 net_inflight_blocks.insert(Key::Hash(*hash));
1005 }
1006 trace!("queue is not full, send block request now: inflight={} req={:?}",
1007 net_inflight_blocks.len(), request);
1008 }
1009 }
1010 true
1011 }
1012
1013 pub fn remove_net_inflight_blocks<'a, I: Iterator<Item = &'a H256>>(
1014 &self, blocks: I,
1015 ) {
1016 let mut net_inflight_blocks =
1017 self.inflight_keys.write(msgid::NET_INFLIGHT_BLOCKS);
1018 for block_hash in blocks {
1019 net_inflight_blocks.remove(&Key::Hash(*block_hash));
1020 }
1021 }
1022}
1023
1024pub fn try_get_block_hashes(request: &Box<dyn Request>) -> Option<&Vec<H256>> {
1027 match request.msg_id() {
1028 msgid::GET_BLOCKS | msgid::GET_CMPCT_BLOCKS => {
1029 let hashes = if let Some(req) =
1030 request.as_any().downcast_ref::<GetBlocks>()
1031 {
1032 &req.hashes
1033 } else if let Some(req) =
1034 request.as_any().downcast_ref::<GetCompactBlocks>()
1035 {
1036 &req.hashes
1037 } else {
1038 panic!(
1039 "MessageId and Request not match, request={:?}",
1040 request
1041 );
1042 };
1043 Some(hashes)
1044 }
1045 _ => None,
1046 }
1047}
1048
1049#[derive(Debug, DeriveMallocSizeOf)]
1050struct TimedWaitingRequest {
1051 time_to_send: Instant,
1052 request: WaitingRequest,
1053 peer: Option<NodeId>,
1054}
1055
1056impl TimedWaitingRequest {
1057 fn new(
1058 time_to_send: Instant, request: WaitingRequest, peer: Option<NodeId>,
1059 ) -> Self {
1060 Self {
1061 time_to_send,
1062 request,
1063 peer,
1064 }
1065 }
1066}
1067
1068impl Ord for TimedWaitingRequest {
1069 fn cmp(&self, other: &Self) -> Ordering {
1070 other.time_to_send.cmp(&self.time_to_send)
1071 }
1072}
1073impl PartialOrd for TimedWaitingRequest {
1074 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1075 Some(self.cmp(other))
1076 }
1077}
1078impl Eq for TimedWaitingRequest {}
1079impl PartialEq for TimedWaitingRequest {
1080 fn eq(&self, other: &Self) -> bool {
1081 self.time_to_send == other.time_to_send
1082 }
1083}