1use super::{
6 synchronization_protocol_handler::ProtocolConfiguration,
7 synchronization_state::SynchronizationState,
8};
9use crate::{
10 sync::{
11 message::{
12 msgid, GetBlockHashesByEpoch, GetBlockHeaders, GetBlockTxn,
13 GetBlocks, GetCompactBlocks, GetTransactions,
14 GetTransactionsFromTxHashes, Key, KeyContainer, TransactionDigests,
15 },
16 request_manager::request_batcher::RequestBatcher,
17 synchronization_protocol_handler::{AsyncTaskQueue, RecoverPublicTask},
18 synchronization_state::PeerFilter,
19 Error,
20 },
21 NodeType,
22};
23use cfx_parameters::sync::REQUEST_START_WAITING_TIME;
24use cfx_types::H256;
25use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
26use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
27use metrics::{
28 register_meter_with_group, Gauge, GaugeUsize, Meter, MeterTimer,
29};
30use network::{node_table::NodeId, NetworkContext};
31use parking_lot::{Mutex, RwLock};
32use primitives::{SignedTransaction, TransactionWithSignature};
33pub use request_handler::{
34 AsAny, Request, RequestHandler, RequestMessage, SynchronizationPeerRequest,
35};
36use std::{
37 cmp::Ordering,
38 collections::{binary_heap::BinaryHeap, HashSet},
39 sync::Arc,
40 time::{Duration, Instant},
41};
42use tx_handler::{
43 InflightPendingTransactionContainer, InflightPendingTransactionItem,
44 ReceivedTransactionContainer, SentTransactionContainer,
45};
46
47mod request_batcher;
48mod request_handler;
49pub mod tx_handler;
50
51lazy_static! {
52 static ref TX_REQUEST_METER: Arc<dyn Meter> =
53 register_meter_with_group("system_metrics", "tx_diff_set_size");
54 static ref TX_REQUEST_TX_HASHES_METER: Arc<dyn Meter> =
55 register_meter_with_group(
56 "system_metrics",
57 "tx_request_tx_hashes_size"
58 );
59 static ref REQUEST_MANAGER_TIMER: Arc<dyn Meter> =
60 register_meter_with_group("timer", "request_manager::request_not_tx");
61 static ref REQUEST_MANAGER_TX_TIMER: Arc<dyn Meter> =
62 register_meter_with_group("timer", "request_manager::request_tx");
63 static ref TX_RECEIVED_POOL_METER: Arc<dyn Meter> =
64 register_meter_with_group("system_metrics", "tx_received_pool_size");
65 static ref INFLIGHT_TX_POOL_GAUGE: Arc<dyn Gauge<usize>> =
66 GaugeUsize::register_with_group(
67 "system_metrics",
68 "inflight_tx_pool_size"
69 );
70 static ref TX_HASHES_INFLIGHT_TX_POOL_GAUGE: Arc<dyn Gauge<usize>> =
71 GaugeUsize::register_with_group(
72 "system_metrics",
73 "tx_hashes_inflight_tx_pool_size"
74 );
75 static ref INFLIGHT_TX_PENDING_POOL_METER: Arc<dyn Meter> =
76 register_meter_with_group(
77 "system_metrics",
78 "inflight_tx_pending_pool_size"
79 );
80 static ref INFLIGHT_TX_REJECT_METER: Arc<dyn Meter> =
81 register_meter_with_group("system_metrics", "inflight_tx_reject_size");
82 static ref REQUEST_TX_FROM_INFLIGHT_PENDING_POOL_METER: Arc<dyn Meter> =
83 register_meter_with_group(
84 "system_metrics",
85 "request_tx_from_inflight_pending_pool"
86 );
87
88 static ref DEFAULT_REQUEST_DELAY_UPPER_BOUND: Duration =
97 Duration::from_secs(600);
98 static ref DEFAULT_REQUEST_BATCH_BUCKET_SIZE: Duration =
99 Duration::from_secs(2);
100}
101
102#[derive(Debug)]
103struct WaitingRequest(Box<dyn Request>, Duration); impl MallocSizeOf for WaitingRequest {
106 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
107 self.0.size_of(ops) + self.1.size_of(ops)
108 }
109}
110
111#[derive(DeriveMallocSizeOf)]
121pub struct RequestManager {
122 inflight_keys: KeyContainer,
124
125 waiting_requests: Mutex<BinaryHeap<TimedWaitingRequest>>,
127
128 sent_transactions: RwLock<SentTransactionContainer>,
133 pub received_transactions: Arc<RwLock<ReceivedTransactionContainer>>,
134 pub inflight_pending_transactions:
136 Arc<RwLock<InflightPendingTransactionContainer>>,
137
138 request_handler: Arc<RequestHandler>,
140
141 syn: Arc<SynchronizationState>,
142
143 #[ignore_malloc_size_of = "channels are not handled in MallocSizeOf"]
144 recover_public_queue: Arc<AsyncTaskQueue<RecoverPublicTask>>,
145}
146
147impl RequestManager {
148 pub fn new(
149 protocol_config: &ProtocolConfiguration,
150 syn: Arc<SynchronizationState>,
151 recover_public_queue: Arc<AsyncTaskQueue<RecoverPublicTask>>,
152 ) -> Self {
153 let received_tx_index_maintain_timeout =
154 protocol_config.received_tx_index_maintain_timeout;
155 let inflight_pending_tx_index_maintain_timeout =
156 protocol_config.inflight_pending_tx_index_maintain_timeout;
157
158 let sent_transaction_window_size =
160 protocol_config.tx_maintained_for_peer_timeout.as_millis()
161 / protocol_config.send_tx_period.as_millis();
162 Self {
163 received_transactions: Arc::new(RwLock::new(
164 ReceivedTransactionContainer::new(
165 received_tx_index_maintain_timeout.as_secs(),
166 ),
167 )),
168 inflight_pending_transactions: Arc::new(RwLock::new(
169 InflightPendingTransactionContainer::new(
170 inflight_pending_tx_index_maintain_timeout.as_secs(),
171 ),
172 )),
173 sent_transactions: RwLock::new(SentTransactionContainer::new(
174 sent_transaction_window_size as usize,
175 )),
176 inflight_keys: Default::default(),
177 waiting_requests: Default::default(),
178 request_handler: Arc::new(RequestHandler::new(protocol_config)),
179 syn,
180 recover_public_queue,
181 }
182 }
183
184 pub fn num_epochs_in_flight(&self) -> u64 {
185 self.inflight_keys
186 .read(msgid::GET_BLOCK_HASHES_BY_EPOCH)
187 .len() as u64
188 }
189
190 pub fn in_flight_blocks(&self) -> HashSet<H256> {
191 self.inflight_keys
192 .read(msgid::GET_BLOCKS)
193 .iter()
194 .map(|key| match key {
195 Key::Hash(h) => *h,
196 Key::Num(_) | Key::Id(_) => {
197 unreachable!("GET_BLOCKS only has hash as key");
198 }
199 })
200 .collect()
201 }
202
203 pub fn request_with_delay(
206 &self, io: &dyn NetworkContext, mut request: Box<dyn Request>,
207 mut peer: Option<NodeId>, delay: Option<Duration>,
208 ) {
209 request.with_inflight(&self.inflight_keys);
211
212 if request.is_empty() {
213 request.notify_empty();
214 return;
215 }
216 if peer.is_some()
219 && delay.is_none()
220 && !self.check_and_update_net_inflight_blocks(&request)
221 {
222 peer = None;
223 }
224
225 let (cur_delay, next_delay) = match delay {
227 Some(d) => (d, d + *REQUEST_START_WAITING_TIME),
228 None => (*REQUEST_START_WAITING_TIME, *REQUEST_START_WAITING_TIME),
229 };
230
231 if peer.is_none() || delay.is_some() {
233 debug!("request_with_delay: add request to waiting_requests, peer={:?}, request={:?}, delay={:?}", peer, request, cur_delay);
234 self.waiting_requests.lock().push(TimedWaitingRequest::new(
235 Instant::now() + cur_delay,
236 WaitingRequest(request, next_delay),
237 peer,
238 ));
239 return;
240 }
241
242 if let Err(e) = self.request_handler.send_request(
243 io,
244 peer,
245 request,
246 Some(next_delay),
247 ) {
248 debug!("request_with_delay: send_request fails, peer={:?}, request={:?}", peer, e);
249 if let Some(hashes) = try_get_block_hashes(&e) {
253 self.remove_net_inflight_blocks(hashes.iter())
254 }
255 self.waiting_requests.lock().push(TimedWaitingRequest::new(
256 Instant::now() + cur_delay,
257 WaitingRequest(e, next_delay),
258 None,
259 ));
260 }
261 }
262
263 pub fn request_block_headers(
264 &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
265 hashes: Vec<H256>, delay: Option<Duration>,
266 ) {
267 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
268
269 debug!("request_block_headers: {:?}, peer {:?}", hashes, peer_id);
270
271 let request = GetBlockHeaders {
272 request_id: 0,
273 hashes,
274 };
275
276 self.request_with_delay(io, Box::new(request), peer_id, delay);
277 }
278
279 pub fn request_epoch_hashes(
280 &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
281 epochs: Vec<u64>, delay: Option<Duration>,
282 ) {
283 let request = GetBlockHashesByEpoch {
284 request_id: 0,
285 epochs,
286 };
287
288 self.request_with_delay(io, Box::new(request), peer_id, delay);
289 }
290
291 pub fn request_blocks(
292 &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
293 hashes: Vec<H256>, with_public: bool, delay: Option<Duration>,
294 preferred_node_type: Option<NodeType>,
295 ) {
296 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
297 debug!("request_blocks: hashes={:?}", hashes);
298
299 let request = GetBlocks {
300 request_id: 0,
301 with_public,
302 hashes,
303 preferred_node_type,
304 };
305
306 self.request_with_delay(io, Box::new(request), peer_id, delay);
307 }
308
309 pub fn request_transactions_from_digest(
310 &self, io: &dyn NetworkContext, peer_id: NodeId,
311 transaction_digests: &TransactionDigests,
312 ) {
313 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TX_TIMER.as_ref());
314
315 let window_index: usize = transaction_digests.window_index;
316 let key1 = transaction_digests.key1;
317 let key2 = transaction_digests.key2;
318 let (random_byte_vector, fixed_bytes_vector) =
319 transaction_digests.get_decomposed_short_ids();
320
321 if fixed_bytes_vector.is_empty()
322 && transaction_digests.tx_hashes.is_empty()
323 {
324 return;
325 }
326
327 let mut tx_from_short_id_inflight_keys =
328 self.inflight_keys.write(msgid::GET_TRANSACTIONS);
329 let mut tx_from_hashes_inflight_keys = self
330 .inflight_keys
331 .write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
332 let received_transactions = self.received_transactions.read();
333
334 INFLIGHT_TX_POOL_GAUGE.update(tx_from_short_id_inflight_keys.len());
335 TX_HASHES_INFLIGHT_TX_POOL_GAUGE
336 .update(tx_from_hashes_inflight_keys.len());
337 TX_RECEIVED_POOL_METER.mark(received_transactions.get_length());
338
339 let (
340 short_ids,
341 tx_hashes,
342 tx_request_indices,
343 hashes_request_indices,
344 inflight_pending_items,
345 ) = {
346 let mut short_ids = HashSet::new();
347 let mut tx_hashes = HashSet::new();
348 let mut tx_request_indices = Vec::new();
349 let mut hashes_request_indices = Vec::new();
350 let mut inflight_pending_items: Vec<
351 InflightPendingTransactionItem,
352 > = Vec::new();
353
354 for i in 0..fixed_bytes_vector.len() {
356 let fixed_bytes = fixed_bytes_vector[i];
357 let random_bytes = random_byte_vector[i];
358
359 if received_transactions.contains_short_id(
360 fixed_bytes,
361 random_bytes,
362 key1,
363 key2,
364 ) {
365 if received_transactions.group_overflow(&fixed_bytes) {
366 hashes_request_indices.push(i);
367 }
368 continue;
370 }
371
372 if tx_from_short_id_inflight_keys.insert(Key::Id(fixed_bytes)) {
373 tx_request_indices.push(i);
374 short_ids.insert(fixed_bytes);
375 } else {
376 inflight_pending_items.push(
378 InflightPendingTransactionItem::new(
379 fixed_bytes,
380 random_bytes,
381 window_index,
382 key1,
383 key2,
384 i,
385 peer_id.clone(),
386 ),
387 );
388 INFLIGHT_TX_PENDING_POOL_METER.mark(1);
389 }
390 }
391
392 let base_index = fixed_bytes_vector.len();
394 for i in 0..transaction_digests.tx_hashes.len() {
395 let tx_hash = transaction_digests.tx_hashes[i];
396 if received_transactions.contains_tx_hash(&tx_hash) {
397 continue;
398 }
399 if tx_from_hashes_inflight_keys.insert(Key::Hash(tx_hash)) {
400 tx_request_indices.push(base_index + i);
401 tx_hashes.insert(tx_hash);
402 } else {
403 INFLIGHT_TX_REJECT_METER.mark(1);
405 }
406 }
407
408 (
409 short_ids,
410 tx_hashes,
411 tx_request_indices,
412 hashes_request_indices,
413 inflight_pending_items,
414 )
415 };
416 TX_REQUEST_METER.mark(tx_request_indices.len());
417 TX_REQUEST_TX_HASHES_METER.mark(hashes_request_indices.len());
418 debug!(
419 "Request {} tx and {} tx hashes from peer={}",
420 tx_request_indices.len(),
421 hashes_request_indices.len(),
422 peer_id.clone()
423 );
424
425 let request = GetTransactions {
426 request_id: 0,
427 window_index,
428 indices: tx_request_indices,
429 tx_hashes_indices: hashes_request_indices,
430 short_ids: short_ids.clone(),
431 tx_hashes: tx_hashes.clone(),
432 };
433
434 if request.is_empty() {
435 return;
436 }
437
438 if self
439 .request_handler
440 .send_request(io, Some(peer_id), Box::new(request), None)
441 .is_err()
442 {
443 for id in short_ids {
444 tx_from_short_id_inflight_keys.remove(&Key::Id(id));
445 }
446 for id in tx_hashes {
447 tx_from_hashes_inflight_keys.remove(&Key::Hash(id));
448 }
449 return;
450 }
451
452 self.inflight_pending_transactions
453 .write()
454 .append_inflight_pending_items(inflight_pending_items);
455 }
456
457 pub fn request_transactions_from_tx_hashes(
458 &self, io: &dyn NetworkContext, peer_id: NodeId,
459 responded_tx_hashes: Vec<H256>, window_index: usize,
460 tx_hashes_indices: &Vec<usize>,
461 ) -> Result<(), Error> {
462 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TX_TIMER.as_ref());
463
464 if responded_tx_hashes.is_empty() {
465 return Ok(());
466 }
467
468 if responded_tx_hashes.len() > tx_hashes_indices.len() {
469 return Err(Error::UnexpectedResponse.into());
470 }
471
472 let mut tx_from_hashes_inflight_keys = self
473 .inflight_keys
474 .write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
475 let received_transactions = self.received_transactions.read();
476
477 TX_HASHES_INFLIGHT_TX_POOL_GAUGE
478 .update(tx_from_hashes_inflight_keys.len());
479 TX_RECEIVED_POOL_METER.mark(received_transactions.get_length());
480
481 let (tx_hashes, indices) = {
482 let mut tx_hashes = HashSet::new();
483 let mut indices = Vec::new();
484
485 for i in 0..responded_tx_hashes.len() {
486 let tx_hash = responded_tx_hashes[i];
487 if received_transactions.contains_tx_hash(&tx_hash) {
488 continue;
490 }
491
492 if tx_from_hashes_inflight_keys.insert(Key::Hash(tx_hash)) {
493 indices.push(tx_hashes_indices[i]);
494 tx_hashes.insert(tx_hash);
495 } else {
496 INFLIGHT_TX_REJECT_METER.mark(1);
498 }
499 }
500
501 (tx_hashes, indices)
502 };
503 TX_REQUEST_METER.mark(tx_hashes.len());
504 debug!(
505 "Request {} tx using tx hashes from peer={}",
506 indices.len(),
507 peer_id
508 );
509
510 let request = GetTransactionsFromTxHashes {
511 request_id: 0,
512 window_index,
513 indices,
514 tx_hashes: tx_hashes.clone(),
515 };
516
517 if request.is_empty() {
518 return Ok(());
519 }
520
521 if self
522 .request_handler
523 .send_request(io, Some(peer_id), Box::new(request), None)
524 .is_err()
525 {
526 for id in tx_hashes {
527 tx_from_hashes_inflight_keys.remove(&Key::Hash(id));
528 }
529 }
530
531 Ok(())
532 }
533
534 pub fn request_compact_blocks(
535 &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
536 hashes: Vec<H256>, delay: Option<Duration>,
537 ) {
538 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
539 debug!("request_compact_blocks: hashes={:?}", hashes);
540
541 let request = GetCompactBlocks {
542 request_id: 0,
543 hashes,
544 };
545
546 self.request_with_delay(io, Box::new(request), peer_id, delay);
547 }
548
549 pub fn request_blocktxn(
550 &self, io: &dyn NetworkContext, peer_id: NodeId, block_hash: H256,
551 index_skips: Vec<usize>, delay: Option<Duration>,
552 ) {
553 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
554
555 let request = GetBlockTxn {
556 request_id: 0,
557 block_hash: block_hash.clone(),
558 index_skips,
559 };
560
561 self.request_with_delay(io, Box::new(request), Some(peer_id), delay);
562 }
563
564 fn send_request_again(
565 &self, io: &dyn NetworkContext, msg: &RequestMessage,
566 ) {
567 debug!("send_request_again, request={:?}", msg.request);
568 if let Some(request) = msg.request.resend() {
569 let mut filter = PeerFilter::new(request.msg_id());
570 if let Some(preferred_node_type) = request.preferred_node_type() {
571 filter = filter.with_preferred_node_type(preferred_node_type);
572 }
573 if let Some(cap) = request.required_capability() {
574 filter = filter.with_cap(cap);
575 }
576 let chosen_peer = filter.select(&self.syn);
577 debug!("send_request_again with new request, peer={:?}, new request={:?}", chosen_peer, request);
578 self.request_with_delay(io, request, chosen_peer, msg.delay);
579 }
580 }
581
582 pub fn send_pending_requests(
583 &self, io: &dyn NetworkContext, peer: &NodeId,
584 ) {
585 self.request_handler.send_pending_requests(io, peer)
586 }
587
588 pub fn resend_request_to_another_peer(
589 &self, io: &dyn NetworkContext, req: &RequestMessage,
590 ) {
591 req.request.on_removed(&self.inflight_keys);
592 self.send_request_again(io, req);
593 }
594
595 pub fn match_request(
598 &self, peer_id: &NodeId, request_id: u64,
599 ) -> Result<RequestMessage, Error> {
600 self.request_handler.match_request(peer_id, request_id)
601 }
602
603 pub fn headers_received(
609 &self, io: &dyn NetworkContext, req_hashes: HashSet<H256>,
610 mut received_headers: HashSet<H256>, delay: Option<Duration>,
611 ) {
612 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
613 debug!(
614 "headers_received: req_hashes={:?} received_headers={:?}",
615 req_hashes, received_headers
616 );
617 let missing_headers = {
618 let mut inflight_keys =
619 self.inflight_keys.write(msgid::GET_BLOCK_HEADERS);
620 let mut missing_headers = Vec::new();
621 for req_hash in &req_hashes {
622 if !received_headers.remove(req_hash) {
623 if inflight_keys.remove(&Key::Hash(*req_hash)) {
628 missing_headers.push(*req_hash);
629 }
630 } else {
631 inflight_keys.remove(&Key::Hash(*req_hash));
632 }
633 }
634 for h in &received_headers {
635 inflight_keys.remove(&Key::Hash(*h));
636 }
637 missing_headers
638 };
639 if !missing_headers.is_empty() {
640 let chosen_peer =
641 PeerFilter::new(msgid::GET_BLOCK_HEADERS).select(&self.syn);
642 self.request_block_headers(io, chosen_peer, missing_headers, delay);
643 }
644 }
645
646 pub fn epochs_received(
648 &self, io: &dyn NetworkContext, req_epochs: HashSet<u64>,
649 mut received_epochs: HashSet<u64>, delay: Option<Duration>,
650 ) {
651 debug!(
652 "epochs_received: req_epochs={:?} received_epochs={:?}",
653 req_epochs, received_epochs
654 );
655 let missing_epochs = {
656 let mut inflight_keys =
657 self.inflight_keys.write(msgid::GET_BLOCK_HASHES_BY_EPOCH);
658 let mut missing_epochs = Vec::new();
659 for epoch_number in &req_epochs {
660 if !received_epochs.remove(epoch_number) {
661 if inflight_keys.remove(&Key::Num(*epoch_number)) {
666 missing_epochs.push(*epoch_number);
667 }
668 } else {
669 inflight_keys.remove(&Key::Num(*epoch_number));
670 }
671 }
672 for epoch_number in &received_epochs {
673 inflight_keys.remove(&Key::Num(*epoch_number));
674 }
675 missing_epochs
676 };
677 if !missing_epochs.is_empty() {
678 let chosen_peer = PeerFilter::new(msgid::GET_BLOCK_HASHES_BY_EPOCH)
679 .select(&self.syn);
680 self.request_epoch_hashes(io, chosen_peer, missing_epochs, delay);
681 }
682 }
683
684 pub fn blocks_received(
691 &self, io: &dyn NetworkContext, requested_hashes: HashSet<H256>,
692 mut received_blocks: HashSet<H256>, ask_full_block: bool,
693 peer: Option<NodeId>, with_public: bool, delay: Option<Duration>,
694 preferred_node_type_for_block_request: Option<NodeType>,
695 ) {
696 let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
697 debug!(
698 "blocks_received: req_hashes={:?} received_blocks={:?} peer={:?}",
699 requested_hashes, received_blocks, peer
700 );
701 let missing_blocks = {
702 let mut inflight_blocks =
703 self.inflight_keys.write(msgid::GET_BLOCKS);
704 let mut net_inflight_blocks =
705 self.inflight_keys.write(msgid::NET_INFLIGHT_BLOCKS);
706 let mut missing_blocks = Vec::new();
707 for req_hash in &requested_hashes {
708 net_inflight_blocks.remove(&Key::Hash(*req_hash));
709 if !received_blocks.remove(req_hash) {
710 if inflight_blocks.remove(&Key::Hash(*req_hash)) {
715 missing_blocks.push(*req_hash);
716 }
717 } else {
718 inflight_blocks.remove(&Key::Hash(*req_hash));
719 }
720 }
721 for h in &received_blocks {
722 net_inflight_blocks.remove(&Key::Hash(*h));
723 inflight_blocks.remove(&Key::Hash(*h));
724 }
725 missing_blocks
726 };
727 if !missing_blocks.is_empty() {
728 let chosen_peer = peer.or_else(|| {
733 let msg_id = if ask_full_block {
734 msgid::GET_BLOCKS
735 } else {
736 msgid::GET_CMPCT_BLOCKS
737 };
738
739 PeerFilter::new(msg_id).select(&self.syn)
740 });
741 if ask_full_block {
742 self.request_blocks(
743 io,
744 chosen_peer,
745 missing_blocks,
746 with_public,
747 delay,
748 preferred_node_type_for_block_request,
749 );
750 } else {
751 self.request_compact_blocks(
752 io,
753 chosen_peer,
754 missing_blocks,
755 delay,
756 );
757 }
758 }
759 }
760
761 pub fn transactions_received_from_digests(
762 &self, io: &dyn NetworkContext,
763 get_transactions_request: &GetTransactions,
764 signed_transactions: Vec<Arc<SignedTransaction>>,
765 ) {
766 let mut short_id_inflight_keys =
767 self.inflight_keys.write(msgid::GET_TRANSACTIONS);
768 let mut tx_hash_inflight_keys = self
769 .inflight_keys
770 .write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
771
772 let (requests, keeped_short_ids) = self
773 .inflight_pending_transactions
774 .write()
775 .generate_tx_requests_from_inflight_pending_pool(
776 &signed_transactions,
777 );
778
779 self.append_received_transactions(signed_transactions);
780 for tx in &get_transactions_request.short_ids {
781 if !keeped_short_ids.contains(tx) {
782 short_id_inflight_keys.remove(&Key::Id(*tx));
783 }
784 }
785 for tx in &get_transactions_request.tx_hashes {
786 tx_hash_inflight_keys.remove(&Key::Hash(*tx));
787 }
788
789 if requests.is_empty() {
791 return;
792 }
793 REQUEST_TX_FROM_INFLIGHT_PENDING_POOL_METER.mark(requests.len());
794 for request in requests {
795 let tx_request = GetTransactions {
796 request_id: 0,
797 window_index: request.window_index,
798 indices: vec![request.index],
799 tx_hashes_indices: vec![],
800 short_ids: {
801 let mut set = HashSet::new();
802 set.insert(request.fixed_byte_part);
803 set
804 },
805 tx_hashes: HashSet::new(),
806 };
807 if self
808 .request_handler
809 .send_request(
810 io,
811 Some(request.peer_id),
812 Box::new(tx_request),
813 None,
814 )
815 .is_err()
816 {
817 short_id_inflight_keys
818 .remove(&Key::Id(request.fixed_byte_part));
819 }
820 }
821 }
822
823 pub fn transactions_received_from_tx_hashes(
824 &self, get_transactions_request: &GetTransactionsFromTxHashes,
825 signed_transactions: Vec<Arc<SignedTransaction>>,
826 ) {
827 let mut tx_hash_inflight_keys = self
828 .inflight_keys
829 .write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
830 for tx in &get_transactions_request.tx_hashes {
831 tx_hash_inflight_keys.remove(&Key::Hash(*tx));
832 }
833 self.append_received_transactions(signed_transactions);
834 }
835
836 pub fn get_sent_transactions(
837 &self, window_index: usize, indices: &Vec<usize>,
838 ) -> Vec<TransactionWithSignature> {
839 let sent_transactions = self.sent_transactions.read();
840 let mut txs = Vec::with_capacity(indices.len());
841 for index in indices {
842 if let Some(tx) =
843 sent_transactions.get_transaction(window_index, *index)
844 {
845 txs.push(tx.transaction.clone());
846 }
847 }
848 txs
849 }
850
851 pub fn append_sent_transactions(
852 &self, transactions: Vec<Arc<SignedTransaction>>,
853 ) -> usize {
854 self.sent_transactions
855 .write()
856 .append_transactions(transactions)
857 }
858
859 pub fn append_received_transactions(
860 &self, transactions: Vec<Arc<SignedTransaction>>,
861 ) {
862 self.received_transactions
863 .write()
864 .append_transactions(transactions)
865 }
866
867 pub fn resend_timeout_requests(&self, io: &dyn NetworkContext) {
868 debug!("resend_timeout_requests: start");
869 let timeout_requests =
870 self.request_handler.process_timeout_requests(io);
871 for req in timeout_requests {
872 debug!("Timeout requests: {:?}", req);
873 self.resend_request_to_another_peer(io, &req);
874 }
875 }
876
877 pub fn resend_waiting_requests(
880 &self, io: &dyn NetworkContext, remove_timeout_requests: bool,
881 prefer_archive_node_for_blocks: bool,
882 ) -> Vec<Box<dyn Request>> {
883 debug!("resend_waiting_requests: start");
884 let mut waiting_requests = self.waiting_requests.lock();
885 let now = Instant::now();
886 let mut batcher =
887 RequestBatcher::new(*DEFAULT_REQUEST_BATCH_BUCKET_SIZE);
888
889 let mut cancelled_requests = Vec::new();
890 while let Some(req) = waiting_requests.pop() {
891 if req.time_to_send >= now {
892 waiting_requests.push(req);
893 break;
894 } else if remove_timeout_requests
895 && req.request.1 > *DEFAULT_REQUEST_DELAY_UPPER_BOUND
896 {
897 warn!("Request is in-flight for over an hour: {:?}", req);
899 req.request.0.on_removed(&self.inflight_keys);
900 cancelled_requests.push(req.request.0);
901 continue;
902 }
903
904 let WaitingRequest(request, delay) = req.request;
907 let request = match request.resend() {
908 Some(r) => r,
909 None => continue,
910 };
911 if !self.check_and_update_net_inflight_blocks(&request) {
912 waiting_requests.push(TimedWaitingRequest::new(
917 now + delay,
918 WaitingRequest(request, delay),
920 None,
921 ));
922 continue;
923 }
924 batcher.insert(delay, request);
925 }
926
927 for (next_delay, request) in
928 batcher.get_batched_requests(prefer_archive_node_for_blocks)
929 {
930 let mut filter = PeerFilter::new(request.msg_id());
931 if let Some(cap) = request.required_capability() {
932 filter = filter.with_cap(cap);
933 }
934 if let Some(preferred_node_type) = request.preferred_node_type() {
935 filter = filter.with_preferred_node_type(preferred_node_type);
936 }
937 let chosen_peer = match filter.select(&self.syn) {
938 Some(p) => p,
939 None => {
940 debug!("No peer to send request, wait for next time");
941 if let Some(hashes) = try_get_block_hashes(&request) {
945 self.remove_net_inflight_blocks(hashes.iter())
946 }
947 waiting_requests.push(TimedWaitingRequest::new(
948 Instant::now() + next_delay,
949 WaitingRequest(request, next_delay),
950 None,
951 ));
952 continue;
953 }
954 };
955 debug!(
956 "Send waiting req {:?} to peer={} with next_delay={:?}",
957 request, chosen_peer, next_delay
958 );
959
960 if let Err(request) = self.request_handler.send_request(
961 io,
962 Some(chosen_peer),
963 request,
964 Some(next_delay),
965 ) {
966 waiting_requests.push(TimedWaitingRequest::new(
967 Instant::now() + next_delay,
968 WaitingRequest(request, next_delay),
969 None,
970 ));
971 }
972 }
973 cancelled_requests
974 }
975
976 pub fn on_peer_connected(&self, peer: &NodeId) {
977 self.request_handler.add_peer(*peer);
978 }
979
980 pub fn on_peer_disconnected(&self, io: &dyn NetworkContext, peer: &NodeId) {
981 if let Some(unfinished_requests) =
982 self.request_handler.remove_peer(peer)
983 {
984 for msg in unfinished_requests {
985 self.resend_request_to_another_peer(io, &msg);
986 }
987 } else {
988 debug!("Peer already removed form request manager when disconnected peer={}", peer);
989 }
990 }
991
992 fn check_and_update_net_inflight_blocks(
993 &self, request: &Box<dyn Request>,
994 ) -> bool {
995 if let Some(hashes) = try_get_block_hashes(request) {
996 let mut net_inflight_blocks =
1001 self.inflight_keys.write(msgid::NET_INFLIGHT_BLOCKS);
1002 if net_inflight_blocks.len()
1003 >= self.recover_public_queue.estimated_available_count()
1004 {
1005 trace!("queue is full, send block request later: inflight={} req={:?}",
1006 net_inflight_blocks.len(), request);
1007 return false;
1008 } else {
1009 for hash in hashes {
1010 net_inflight_blocks.insert(Key::Hash(*hash));
1011 }
1012 trace!("queue is not full, send block request now: inflight={} req={:?}",
1013 net_inflight_blocks.len(), request);
1014 }
1015 }
1016 true
1017 }
1018
1019 pub fn remove_net_inflight_blocks<'a, I: Iterator<Item = &'a H256>>(
1020 &self, blocks: I,
1021 ) {
1022 let mut net_inflight_blocks =
1023 self.inflight_keys.write(msgid::NET_INFLIGHT_BLOCKS);
1024 for block_hash in blocks {
1025 net_inflight_blocks.remove(&Key::Hash(*block_hash));
1026 }
1027 }
1028}
1029
1030pub fn try_get_block_hashes(request: &Box<dyn Request>) -> Option<&Vec<H256>> {
1033 match request.msg_id() {
1034 msgid::GET_BLOCKS | msgid::GET_CMPCT_BLOCKS => {
1035 let hashes = if let Some(req) =
1036 request.as_any().downcast_ref::<GetBlocks>()
1037 {
1038 &req.hashes
1039 } else if let Some(req) =
1040 request.as_any().downcast_ref::<GetCompactBlocks>()
1041 {
1042 &req.hashes
1043 } else {
1044 panic!(
1045 "MessageId and Request not match, request={:?}",
1046 request
1047 );
1048 };
1049 Some(hashes)
1050 }
1051 _ => None,
1052 }
1053}
1054
1055#[derive(Debug, DeriveMallocSizeOf)]
1056struct TimedWaitingRequest {
1057 time_to_send: Instant,
1058 request: WaitingRequest,
1059 peer: Option<NodeId>,
1060}
1061
1062impl TimedWaitingRequest {
1063 fn new(
1064 time_to_send: Instant, request: WaitingRequest, peer: Option<NodeId>,
1065 ) -> Self {
1066 Self {
1067 time_to_send,
1068 request,
1069 peer,
1070 }
1071 }
1072}
1073
1074impl Ord for TimedWaitingRequest {
1075 fn cmp(&self, other: &Self) -> Ordering {
1076 other.time_to_send.cmp(&self.time_to_send)
1077 }
1078}
1079impl PartialOrd for TimedWaitingRequest {
1080 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1081 Some(self.cmp(other))
1082 }
1083}
1084impl Eq for TimedWaitingRequest {}
1085impl PartialEq for TimedWaitingRequest {
1086 fn eq(&self, other: &Self) -> bool {
1087 self.time_to_send == other.time_to_send
1088 }
1089}