cfxcore/sync/request_manager/
mod.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use super::{
6    synchronization_protocol_handler::ProtocolConfiguration,
7    synchronization_state::SynchronizationState,
8};
9use crate::{
10    sync::{
11        message::{
12            msgid, GetBlockHashesByEpoch, GetBlockHeaders, GetBlockTxn,
13            GetBlocks, GetCompactBlocks, GetTransactions,
14            GetTransactionsFromTxHashes, Key, KeyContainer, TransactionDigests,
15        },
16        request_manager::request_batcher::RequestBatcher,
17        synchronization_protocol_handler::{AsyncTaskQueue, RecoverPublicTask},
18        synchronization_state::PeerFilter,
19        Error,
20    },
21    NodeType,
22};
23use cfx_parameters::sync::REQUEST_START_WAITING_TIME;
24use cfx_types::H256;
25use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
26use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
27use metrics::{
28    register_meter_with_group, Gauge, GaugeUsize, Meter, MeterTimer,
29};
30use network::{node_table::NodeId, NetworkContext};
31use parking_lot::{Mutex, RwLock};
32use primitives::{SignedTransaction, TransactionWithSignature};
33pub use request_handler::{
34    AsAny, Request, RequestHandler, RequestMessage, SynchronizationPeerRequest,
35};
36use std::{
37    cmp::Ordering,
38    collections::{binary_heap::BinaryHeap, HashSet},
39    sync::Arc,
40    time::{Duration, Instant},
41};
42use tx_handler::{
43    InflightPendingTransactionContainer, InflightPendingTransactionItem,
44    ReceivedTransactionContainer, SentTransactionContainer,
45};
46
47mod request_batcher;
48mod request_handler;
49pub mod tx_handler;
50
51lazy_static! {
52    static ref TX_REQUEST_METER: Arc<dyn Meter> =
53        register_meter_with_group("system_metrics", "tx_diff_set_size");
54    static ref TX_REQUEST_TX_HASHES_METER: Arc<dyn Meter> =
55        register_meter_with_group(
56            "system_metrics",
57            "tx_request_tx_hashes_size"
58        );
59    static ref REQUEST_MANAGER_TIMER: Arc<dyn Meter> =
60        register_meter_with_group("timer", "request_manager::request_not_tx");
61    static ref REQUEST_MANAGER_TX_TIMER: Arc<dyn Meter> =
62        register_meter_with_group("timer", "request_manager::request_tx");
63    static ref TX_RECEIVED_POOL_METER: Arc<dyn Meter> =
64        register_meter_with_group("system_metrics", "tx_received_pool_size");
65    static ref INFLIGHT_TX_POOL_GAUGE: Arc<dyn Gauge<usize>> =
66        GaugeUsize::register_with_group(
67            "system_metrics",
68            "inflight_tx_pool_size"
69        );
70    static ref TX_HASHES_INFLIGHT_TX_POOL_GAUGE: Arc<dyn Gauge<usize>> =
71        GaugeUsize::register_with_group(
72            "system_metrics",
73            "tx_hashes_inflight_tx_pool_size"
74        );
75    static ref INFLIGHT_TX_PENDING_POOL_METER: Arc<dyn Meter> =
76        register_meter_with_group(
77            "system_metrics",
78            "inflight_tx_pending_pool_size"
79        );
80    static ref INFLIGHT_TX_REJECT_METER: Arc<dyn Meter> =
81        register_meter_with_group("system_metrics", "inflight_tx_reject_size");
82    static ref REQUEST_TX_FROM_INFLIGHT_PENDING_POOL_METER: Arc<dyn Meter> =
83        register_meter_with_group(
84            "system_metrics",
85            "request_tx_from_inflight_pending_pool"
86        );
87
88    /// Delay is increased by 1 second each time, so it costs at least 600*601/2 (about 50h) to reach
89    /// this upper bound. And requests will be discarded after reaching this upper bound.
90    ///
91    /// This upper bound is set so that a block should have been before the current checkpoint
92    /// in our experience, so it will not be referred to by a new block and will likely not
93    /// affect the sync process.
94    /// TODO: We may want to decrease this dynamically in case some active attackers are
95    /// sending non-existent hashes to better limit the resource exhaustion.
96    static ref DEFAULT_REQUEST_DELAY_UPPER_BOUND: Duration =
97        Duration::from_secs(600);
98    static ref DEFAULT_REQUEST_BATCH_BUCKET_SIZE: Duration =
99        Duration::from_secs(2);
100}
101
102#[derive(Debug)]
103struct WaitingRequest(Box<dyn Request>, Duration); // (request, delay)
104
105impl MallocSizeOf for WaitingRequest {
106    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
107        self.0.size_of(ops) + self.1.size_of(ops)
108    }
109}
110
111/// When a header or block is requested by the `RequestManager`, it is ensured
112/// that if it's not fully received, its hash exists
113/// in `in_flight` after every function call.
114///
115/// The thread who removes a hash from `in_flight` is responsible to request it
116/// again if it's not received.
117///
118/// No lock is held when we call another function in this struct, and all locks
119/// are acquired in the same order, so there should exist no deadlocks.
120#[derive(DeriveMallocSizeOf)]
121pub struct RequestManager {
122    // used to avoid send duplicated requests.
123    inflight_keys: KeyContainer,
124
125    /// Each element is (timeout_time, request, chosen_peer)
126    waiting_requests: Mutex<BinaryHeap<TimedWaitingRequest>>,
127
128    /// The following fields are used to control how to
129    /// propagate transactions in normal case.
130    /// Holds a set of transactions recently sent to this peer to avoid
131    /// spamming.
132    sent_transactions: RwLock<SentTransactionContainer>,
133    pub received_transactions: Arc<RwLock<ReceivedTransactionContainer>>,
134    // used to avoid send duplicated requests.
135    pub inflight_pending_transactions:
136        Arc<RwLock<InflightPendingTransactionContainer>>,
137
138    /// This is used to handle request_id matching
139    request_handler: Arc<RequestHandler>,
140
141    syn: Arc<SynchronizationState>,
142
143    #[ignore_malloc_size_of = "channels are not handled in MallocSizeOf"]
144    recover_public_queue: Arc<AsyncTaskQueue<RecoverPublicTask>>,
145}
146
147impl RequestManager {
148    pub fn new(
149        protocol_config: &ProtocolConfiguration,
150        syn: Arc<SynchronizationState>,
151        recover_public_queue: Arc<AsyncTaskQueue<RecoverPublicTask>>,
152    ) -> Self {
153        let received_tx_index_maintain_timeout =
154            protocol_config.received_tx_index_maintain_timeout;
155        let inflight_pending_tx_index_maintain_timeout =
156            protocol_config.inflight_pending_tx_index_maintain_timeout;
157
158        // FIXME: make sent_transaction_window_size to be 2^pow.
159        let sent_transaction_window_size =
160            protocol_config.tx_maintained_for_peer_timeout.as_millis()
161                / protocol_config.send_tx_period.as_millis();
162        Self {
163            received_transactions: Arc::new(RwLock::new(
164                ReceivedTransactionContainer::new(
165                    received_tx_index_maintain_timeout.as_secs(),
166                ),
167            )),
168            inflight_pending_transactions: Arc::new(RwLock::new(
169                InflightPendingTransactionContainer::new(
170                    inflight_pending_tx_index_maintain_timeout.as_secs(),
171                ),
172            )),
173            sent_transactions: RwLock::new(SentTransactionContainer::new(
174                sent_transaction_window_size as usize,
175            )),
176            inflight_keys: Default::default(),
177            waiting_requests: Default::default(),
178            request_handler: Arc::new(RequestHandler::new(protocol_config)),
179            syn,
180            recover_public_queue,
181        }
182    }
183
184    pub fn num_epochs_in_flight(&self) -> u64 {
185        self.inflight_keys
186            .read(msgid::GET_BLOCK_HASHES_BY_EPOCH)
187            .len() as u64
188    }
189
190    pub fn in_flight_blocks(&self) -> HashSet<H256> {
191        self.inflight_keys
192            .read(msgid::GET_BLOCKS)
193            .iter()
194            .map(|key| match key {
195                Key::Hash(h) => *h,
196                Key::Num(_) | Key::Id(_) => {
197                    unreachable!("GET_BLOCKS only has hash as key");
198                }
199            })
200            .collect()
201    }
202
203    /// Send request to remote peer with delay mechanism. If failed,
204    /// add the request to waiting queue to resend later.
205    pub fn request_with_delay(
206        &self, io: &dyn NetworkContext, mut request: Box<dyn Request>,
207        mut peer: Option<NodeId>, delay: Option<Duration>,
208    ) {
209        // retain the request items that not in flight.
210        request.with_inflight(&self.inflight_keys);
211
212        if request.is_empty() {
213            request.notify_empty();
214            return;
215        }
216        // Check block-related requests, and put them into waiting_requests
217        // if we cannot process it.
218        if peer.is_some()
219            && delay.is_none()
220            && !self.check_and_update_net_inflight_blocks(&request)
221        {
222            peer = None;
223        }
224
225        // increase delay for resent request.
226        let (cur_delay, next_delay) = match delay {
227            Some(d) => (d, d + *REQUEST_START_WAITING_TIME),
228            None => (*REQUEST_START_WAITING_TIME, *REQUEST_START_WAITING_TIME),
229        };
230
231        // delay if no peer available or delay required
232        if peer.is_none() || delay.is_some() {
233            debug!("request_with_delay: add request to waiting_requests, peer={:?}, request={:?}, delay={:?}", peer, request, cur_delay);
234            self.waiting_requests.lock().push(TimedWaitingRequest::new(
235                Instant::now() + cur_delay,
236                WaitingRequest(request, next_delay),
237                peer,
238            ));
239            return;
240        }
241
242        if let Err(e) = self.request_handler.send_request(
243            io,
244            peer,
245            request,
246            Some(next_delay),
247        ) {
248            debug!("request_with_delay: send_request fails, peer={:?}, request={:?}", peer, e);
249            // These requests are not actually sent,
250            // and they will not be inserted into requests_queue,
251            // so remove them from net_inflight_blocks.
252            if let Some(hashes) = try_get_block_hashes(&e) {
253                self.remove_net_inflight_blocks(hashes.iter())
254            }
255            self.waiting_requests.lock().push(TimedWaitingRequest::new(
256                Instant::now() + cur_delay,
257                WaitingRequest(e, next_delay),
258                None,
259            ));
260        }
261    }
262
263    pub fn request_block_headers(
264        &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
265        hashes: Vec<H256>, delay: Option<Duration>,
266    ) {
267        let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
268
269        debug!("request_block_headers: {:?}, peer {:?}", hashes, peer_id);
270
271        let request = GetBlockHeaders {
272            request_id: 0,
273            hashes,
274        };
275
276        self.request_with_delay(io, Box::new(request), peer_id, delay);
277    }
278
279    pub fn request_epoch_hashes(
280        &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
281        epochs: Vec<u64>, delay: Option<Duration>,
282    ) {
283        let request = GetBlockHashesByEpoch {
284            request_id: 0,
285            epochs,
286        };
287
288        self.request_with_delay(io, Box::new(request), peer_id, delay);
289    }
290
291    pub fn request_blocks(
292        &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
293        hashes: Vec<H256>, with_public: bool, delay: Option<Duration>,
294        preferred_node_type: Option<NodeType>,
295    ) {
296        let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
297        debug!("request_blocks: hashes={:?}", hashes);
298
299        let request = GetBlocks {
300            request_id: 0,
301            with_public,
302            hashes,
303            preferred_node_type,
304        };
305
306        self.request_with_delay(io, Box::new(request), peer_id, delay);
307    }
308
309    pub fn request_transactions_from_digest(
310        &self, io: &dyn NetworkContext, peer_id: NodeId,
311        transaction_digests: &TransactionDigests,
312    ) {
313        let _timer = MeterTimer::time_func(REQUEST_MANAGER_TX_TIMER.as_ref());
314
315        let window_index: usize = transaction_digests.window_index;
316        let key1 = transaction_digests.key1;
317        let key2 = transaction_digests.key2;
318        let (random_byte_vector, fixed_bytes_vector) =
319            transaction_digests.get_decomposed_short_ids();
320
321        if fixed_bytes_vector.is_empty()
322            && transaction_digests.tx_hashes.is_empty()
323        {
324            return;
325        }
326
327        let mut tx_from_short_id_inflight_keys =
328            self.inflight_keys.write(msgid::GET_TRANSACTIONS);
329        let mut tx_from_hashes_inflight_keys = self
330            .inflight_keys
331            .write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
332        let received_transactions = self.received_transactions.read();
333
334        INFLIGHT_TX_POOL_GAUGE.update(tx_from_short_id_inflight_keys.len());
335        TX_HASHES_INFLIGHT_TX_POOL_GAUGE
336            .update(tx_from_hashes_inflight_keys.len());
337        TX_RECEIVED_POOL_METER.mark(received_transactions.get_length());
338
339        let (
340            short_ids,
341            tx_hashes,
342            tx_request_indices,
343            hashes_request_indices,
344            inflight_pending_items,
345        ) = {
346            let mut short_ids = HashSet::new();
347            let mut tx_hashes = HashSet::new();
348            let mut tx_request_indices = Vec::new();
349            let mut hashes_request_indices = Vec::new();
350            let mut inflight_pending_items: Vec<
351                InflightPendingTransactionItem,
352            > = Vec::new();
353
354            //process short ids
355            for i in 0..fixed_bytes_vector.len() {
356                let fixed_bytes = fixed_bytes_vector[i];
357                let random_bytes = random_byte_vector[i];
358
359                if received_transactions.contains_short_id(
360                    fixed_bytes,
361                    random_bytes,
362                    key1,
363                    key2,
364                ) {
365                    if received_transactions.group_overflow(&fixed_bytes) {
366                        hashes_request_indices.push(i);
367                    }
368                    // Already received or need to request long id
369                    continue;
370                }
371
372                if tx_from_short_id_inflight_keys.insert(Key::Id(fixed_bytes)) {
373                    tx_request_indices.push(i);
374                    short_ids.insert(fixed_bytes);
375                } else {
376                    // Already being requested, put in inflight pending queue
377                    inflight_pending_items.push(
378                        InflightPendingTransactionItem::new(
379                            fixed_bytes,
380                            random_bytes,
381                            window_index,
382                            key1,
383                            key2,
384                            i,
385                            peer_id.clone(),
386                        ),
387                    );
388                    INFLIGHT_TX_PENDING_POOL_METER.mark(1);
389                }
390            }
391
392            //process tx hashes
393            let base_index = fixed_bytes_vector.len();
394            for i in 0..transaction_digests.tx_hashes.len() {
395                let tx_hash = transaction_digests.tx_hashes[i];
396                if received_transactions.contains_tx_hash(&tx_hash) {
397                    continue;
398                }
399                if tx_from_hashes_inflight_keys.insert(Key::Hash(tx_hash)) {
400                    tx_request_indices.push(base_index + i);
401                    tx_hashes.insert(tx_hash);
402                } else {
403                    // Already being requested
404                    INFLIGHT_TX_REJECT_METER.mark(1);
405                }
406            }
407
408            (
409                short_ids,
410                tx_hashes,
411                tx_request_indices,
412                hashes_request_indices,
413                inflight_pending_items,
414            )
415        };
416        TX_REQUEST_METER.mark(tx_request_indices.len());
417        TX_REQUEST_TX_HASHES_METER.mark(hashes_request_indices.len());
418        debug!(
419            "Request {} tx and {} tx hashes from peer={}",
420            tx_request_indices.len(),
421            hashes_request_indices.len(),
422            peer_id.clone()
423        );
424
425        let request = GetTransactions {
426            request_id: 0,
427            window_index,
428            indices: tx_request_indices,
429            tx_hashes_indices: hashes_request_indices,
430            short_ids: short_ids.clone(),
431            tx_hashes: tx_hashes.clone(),
432        };
433
434        if request.is_empty() {
435            return;
436        }
437
438        if self
439            .request_handler
440            .send_request(io, Some(peer_id), Box::new(request), None)
441            .is_err()
442        {
443            for id in short_ids {
444                tx_from_short_id_inflight_keys.remove(&Key::Id(id));
445            }
446            for id in tx_hashes {
447                tx_from_hashes_inflight_keys.remove(&Key::Hash(id));
448            }
449            return;
450        }
451
452        self.inflight_pending_transactions
453            .write()
454            .append_inflight_pending_items(inflight_pending_items);
455    }
456
457    pub fn request_transactions_from_tx_hashes(
458        &self, io: &dyn NetworkContext, peer_id: NodeId,
459        responded_tx_hashes: Vec<H256>, window_index: usize,
460        tx_hashes_indices: &Vec<usize>,
461    ) {
462        let _timer = MeterTimer::time_func(REQUEST_MANAGER_TX_TIMER.as_ref());
463
464        if responded_tx_hashes.is_empty() {
465            return;
466        }
467
468        let mut tx_from_hashes_inflight_keys = self
469            .inflight_keys
470            .write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
471        let received_transactions = self.received_transactions.read();
472
473        TX_HASHES_INFLIGHT_TX_POOL_GAUGE
474            .update(tx_from_hashes_inflight_keys.len());
475        TX_RECEIVED_POOL_METER.mark(received_transactions.get_length());
476
477        let (tx_hashes, indices) = {
478            let mut tx_hashes = HashSet::new();
479            let mut indices = Vec::new();
480
481            for i in 0..responded_tx_hashes.len() {
482                let tx_hash = responded_tx_hashes[i];
483                if received_transactions.contains_tx_hash(&tx_hash) {
484                    // Already received
485                    continue;
486                }
487
488                if tx_from_hashes_inflight_keys.insert(Key::Hash(tx_hash)) {
489                    indices.push(tx_hashes_indices[i]);
490                    tx_hashes.insert(tx_hash);
491                } else {
492                    // Already being requested
493                    INFLIGHT_TX_REJECT_METER.mark(1);
494                }
495            }
496
497            (tx_hashes, indices)
498        };
499        TX_REQUEST_METER.mark(tx_hashes.len());
500        debug!(
501            "Request {} tx using tx hashes from peer={}",
502            indices.len(),
503            peer_id
504        );
505
506        let request = GetTransactionsFromTxHashes {
507            request_id: 0,
508            window_index,
509            indices,
510            tx_hashes: tx_hashes.clone(),
511        };
512
513        if request.is_empty() {
514            return;
515        }
516
517        if self
518            .request_handler
519            .send_request(io, Some(peer_id), Box::new(request), None)
520            .is_err()
521        {
522            for id in tx_hashes {
523                tx_from_hashes_inflight_keys.remove(&Key::Hash(id));
524            }
525        }
526    }
527
528    pub fn request_compact_blocks(
529        &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
530        hashes: Vec<H256>, delay: Option<Duration>,
531    ) {
532        let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
533        debug!("request_compact_blocks: hashes={:?}", hashes);
534
535        let request = GetCompactBlocks {
536            request_id: 0,
537            hashes,
538        };
539
540        self.request_with_delay(io, Box::new(request), peer_id, delay);
541    }
542
543    pub fn request_blocktxn(
544        &self, io: &dyn NetworkContext, peer_id: NodeId, block_hash: H256,
545        index_skips: Vec<usize>, delay: Option<Duration>,
546    ) {
547        let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
548
549        let request = GetBlockTxn {
550            request_id: 0,
551            block_hash: block_hash.clone(),
552            index_skips,
553        };
554
555        self.request_with_delay(io, Box::new(request), Some(peer_id), delay);
556    }
557
558    fn send_request_again(
559        &self, io: &dyn NetworkContext, msg: &RequestMessage,
560    ) {
561        debug!("send_request_again, request={:?}", msg.request);
562        if let Some(request) = msg.request.resend() {
563            let mut filter = PeerFilter::new(request.msg_id());
564            if let Some(preferred_node_type) = request.preferred_node_type() {
565                filter = filter.with_preferred_node_type(preferred_node_type);
566            }
567            if let Some(cap) = request.required_capability() {
568                filter = filter.with_cap(cap);
569            }
570            let chosen_peer = filter.select(&self.syn);
571            debug!("send_request_again with new request, peer={:?}, new request={:?}", chosen_peer, request);
572            self.request_with_delay(io, request, chosen_peer, msg.delay);
573        }
574    }
575
576    pub fn send_pending_requests(
577        &self, io: &dyn NetworkContext, peer: &NodeId,
578    ) {
579        self.request_handler.send_pending_requests(io, peer)
580    }
581
582    pub fn resend_request_to_another_peer(
583        &self, io: &dyn NetworkContext, req: &RequestMessage,
584    ) {
585        req.request.on_removed(&self.inflight_keys);
586        self.send_request_again(io, req);
587    }
588
589    // Match request with given response.
590    // No need to let caller handle request resending.
591    pub fn match_request(
592        &self, peer_id: &NodeId, request_id: u64,
593    ) -> Result<RequestMessage, Error> {
594        self.request_handler.match_request(peer_id, request_id)
595    }
596
597    /// Remove inflight keys when a header is received.
598    ///
599    /// If a request is removed from `req_hashes`, it's the caller's
600    /// responsibility to ensure that the removed request either has already
601    /// received or will be requested by the caller again.
602    pub fn headers_received(
603        &self, io: &dyn NetworkContext, req_hashes: HashSet<H256>,
604        mut received_headers: HashSet<H256>, delay: Option<Duration>,
605    ) {
606        let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
607        debug!(
608            "headers_received: req_hashes={:?} received_headers={:?}",
609            req_hashes, received_headers
610        );
611        let missing_headers = {
612            let mut inflight_keys =
613                self.inflight_keys.write(msgid::GET_BLOCK_HEADERS);
614            let mut missing_headers = Vec::new();
615            for req_hash in &req_hashes {
616                if !received_headers.remove(req_hash) {
617                    // If `req_hash` is not in `headers_in_flight`, it may has
618                    // been received or requested
619                    // again by another thread, so we do not need to request it
620                    // in that case
621                    if inflight_keys.remove(&Key::Hash(*req_hash)) {
622                        missing_headers.push(*req_hash);
623                    }
624                } else {
625                    inflight_keys.remove(&Key::Hash(*req_hash));
626                }
627            }
628            for h in &received_headers {
629                inflight_keys.remove(&Key::Hash(*h));
630            }
631            missing_headers
632        };
633        if !missing_headers.is_empty() {
634            let chosen_peer =
635                PeerFilter::new(msgid::GET_BLOCK_HEADERS).select(&self.syn);
636            self.request_block_headers(io, chosen_peer, missing_headers, delay);
637        }
638    }
639
640    /// Remove from inflight keys when a epoch is received.
641    pub fn epochs_received(
642        &self, io: &dyn NetworkContext, req_epochs: HashSet<u64>,
643        mut received_epochs: HashSet<u64>, delay: Option<Duration>,
644    ) {
645        debug!(
646            "epochs_received: req_epochs={:?} received_epochs={:?}",
647            req_epochs, received_epochs
648        );
649        let missing_epochs = {
650            let mut inflight_keys =
651                self.inflight_keys.write(msgid::GET_BLOCK_HASHES_BY_EPOCH);
652            let mut missing_epochs = Vec::new();
653            for epoch_number in &req_epochs {
654                if !received_epochs.remove(epoch_number) {
655                    // If `epoch_number` is not in `epochs_in_flight`, it may
656                    // has been received or requested
657                    // again by another thread, so we do not need to request it
658                    // in that case
659                    if inflight_keys.remove(&Key::Num(*epoch_number)) {
660                        missing_epochs.push(*epoch_number);
661                    }
662                } else {
663                    inflight_keys.remove(&Key::Num(*epoch_number));
664                }
665            }
666            for epoch_number in &received_epochs {
667                inflight_keys.remove(&Key::Num(*epoch_number));
668            }
669            missing_epochs
670        };
671        if !missing_epochs.is_empty() {
672            let chosen_peer = PeerFilter::new(msgid::GET_BLOCK_HASHES_BY_EPOCH)
673                .select(&self.syn);
674            self.request_epoch_hashes(io, chosen_peer, missing_epochs, delay);
675        }
676    }
677
678    /// Remove from inflight keys when a block is received.
679    ///
680    /// If a request is removed from `req_hashes`, it's the caller's
681    /// responsibility to ensure that the removed request either has already
682    /// received or will be requested by the caller again (the case for
683    /// `Blocktxn`).
684    pub fn blocks_received(
685        &self, io: &dyn NetworkContext, requested_hashes: HashSet<H256>,
686        mut received_blocks: HashSet<H256>, ask_full_block: bool,
687        peer: Option<NodeId>, with_public: bool, delay: Option<Duration>,
688        preferred_node_type_for_block_request: Option<NodeType>,
689    ) {
690        let _timer = MeterTimer::time_func(REQUEST_MANAGER_TIMER.as_ref());
691        debug!(
692            "blocks_received: req_hashes={:?} received_blocks={:?} peer={:?}",
693            requested_hashes, received_blocks, peer
694        );
695        let missing_blocks = {
696            let mut inflight_blocks =
697                self.inflight_keys.write(msgid::GET_BLOCKS);
698            let mut net_inflight_blocks =
699                self.inflight_keys.write(msgid::NET_INFLIGHT_BLOCKS);
700            let mut missing_blocks = Vec::new();
701            for req_hash in &requested_hashes {
702                net_inflight_blocks.remove(&Key::Hash(*req_hash));
703                if !received_blocks.remove(req_hash) {
704                    // If `req_hash` is not in `blocks_in_flight`, it may has
705                    // been received or requested
706                    // again by another thread, so we do not need to request it
707                    // in that case
708                    if inflight_blocks.remove(&Key::Hash(*req_hash)) {
709                        missing_blocks.push(*req_hash);
710                    }
711                } else {
712                    inflight_blocks.remove(&Key::Hash(*req_hash));
713                }
714            }
715            for h in &received_blocks {
716                net_inflight_blocks.remove(&Key::Hash(*h));
717                inflight_blocks.remove(&Key::Hash(*h));
718            }
719            missing_blocks
720        };
721        if !missing_blocks.is_empty() {
722            // `peer` is passed in for the case that a compact block is received
723            // and a full block is reconstructed, but the full block
724            // is incorrect. We should ask the same peer for the
725            // full block instead of choosing a random peer.
726            let chosen_peer = peer.or_else(|| {
727                let msg_id = if ask_full_block {
728                    msgid::GET_BLOCKS
729                } else {
730                    msgid::GET_CMPCT_BLOCKS
731                };
732
733                PeerFilter::new(msg_id).select(&self.syn)
734            });
735            if ask_full_block {
736                self.request_blocks(
737                    io,
738                    chosen_peer,
739                    missing_blocks,
740                    with_public,
741                    delay,
742                    preferred_node_type_for_block_request,
743                );
744            } else {
745                self.request_compact_blocks(
746                    io,
747                    chosen_peer,
748                    missing_blocks,
749                    delay,
750                );
751            }
752        }
753    }
754
755    pub fn transactions_received_from_digests(
756        &self, io: &dyn NetworkContext,
757        get_transactions_request: &GetTransactions,
758        signed_transactions: Vec<Arc<SignedTransaction>>,
759    ) {
760        let mut short_id_inflight_keys =
761            self.inflight_keys.write(msgid::GET_TRANSACTIONS);
762        let mut tx_hash_inflight_keys = self
763            .inflight_keys
764            .write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
765
766        let (requests, keeped_short_ids) = self
767            .inflight_pending_transactions
768            .write()
769            .generate_tx_requests_from_inflight_pending_pool(
770                &signed_transactions,
771            );
772
773        self.append_received_transactions(signed_transactions);
774        for tx in &get_transactions_request.short_ids {
775            if !keeped_short_ids.contains(tx) {
776                short_id_inflight_keys.remove(&Key::Id(*tx));
777            }
778        }
779        for tx in &get_transactions_request.tx_hashes {
780            tx_hash_inflight_keys.remove(&Key::Hash(*tx));
781        }
782
783        //request transactions from inflight pending pool
784        if requests.is_empty() {
785            return;
786        }
787        REQUEST_TX_FROM_INFLIGHT_PENDING_POOL_METER.mark(requests.len());
788        for request in requests {
789            let tx_request = GetTransactions {
790                request_id: 0,
791                window_index: request.window_index,
792                indices: vec![request.index],
793                tx_hashes_indices: vec![],
794                short_ids: {
795                    let mut set = HashSet::new();
796                    set.insert(request.fixed_byte_part);
797                    set
798                },
799                tx_hashes: HashSet::new(),
800            };
801            if self
802                .request_handler
803                .send_request(
804                    io,
805                    Some(request.peer_id),
806                    Box::new(tx_request),
807                    None,
808                )
809                .is_err()
810            {
811                short_id_inflight_keys
812                    .remove(&Key::Id(request.fixed_byte_part));
813            }
814        }
815    }
816
817    pub fn transactions_received_from_tx_hashes(
818        &self, get_transactions_request: &GetTransactionsFromTxHashes,
819        signed_transactions: Vec<Arc<SignedTransaction>>,
820    ) {
821        let mut tx_hash_inflight_keys = self
822            .inflight_keys
823            .write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
824        for tx in &get_transactions_request.tx_hashes {
825            tx_hash_inflight_keys.remove(&Key::Hash(*tx));
826        }
827        self.append_received_transactions(signed_transactions);
828    }
829
830    pub fn get_sent_transactions(
831        &self, window_index: usize, indices: &Vec<usize>,
832    ) -> Vec<TransactionWithSignature> {
833        let sent_transactions = self.sent_transactions.read();
834        let mut txs = Vec::with_capacity(indices.len());
835        for index in indices {
836            if let Some(tx) =
837                sent_transactions.get_transaction(window_index, *index)
838            {
839                txs.push(tx.transaction.clone());
840            }
841        }
842        txs
843    }
844
845    pub fn append_sent_transactions(
846        &self, transactions: Vec<Arc<SignedTransaction>>,
847    ) -> usize {
848        self.sent_transactions
849            .write()
850            .append_transactions(transactions)
851    }
852
853    pub fn append_received_transactions(
854        &self, transactions: Vec<Arc<SignedTransaction>>,
855    ) {
856        self.received_transactions
857            .write()
858            .append_transactions(transactions)
859    }
860
861    pub fn resend_timeout_requests(&self, io: &dyn NetworkContext) {
862        debug!("resend_timeout_requests: start");
863        let timeout_requests =
864            self.request_handler.process_timeout_requests(io);
865        for req in timeout_requests {
866            debug!("Timeout requests: {:?}", req);
867            self.resend_request_to_another_peer(io, &req);
868        }
869    }
870
871    /// Send waiting requests that their backoff delay have passes.
872    /// Return the cancelled requests that have timeout too many times.
873    pub fn resend_waiting_requests(
874        &self, io: &dyn NetworkContext, remove_timeout_requests: bool,
875        prefer_archive_node_for_blocks: bool,
876    ) -> Vec<Box<dyn Request>> {
877        debug!("resend_waiting_requests: start");
878        let mut waiting_requests = self.waiting_requests.lock();
879        let now = Instant::now();
880        let mut batcher =
881            RequestBatcher::new(*DEFAULT_REQUEST_BATCH_BUCKET_SIZE);
882
883        let mut cancelled_requests = Vec::new();
884        while let Some(req) = waiting_requests.pop() {
885            if req.time_to_send >= now {
886                waiting_requests.push(req);
887                break;
888            } else if remove_timeout_requests
889                && req.request.1 > *DEFAULT_REQUEST_DELAY_UPPER_BOUND
890            {
891                // Discard stale requests
892                warn!("Request is in-flight for over an hour: {:?}", req);
893                req.request.0.on_removed(&self.inflight_keys);
894                cancelled_requests.push(req.request.0);
895                continue;
896            }
897
898            // Waiting requests are already in-flight, so send them without
899            // checking
900            let WaitingRequest(request, delay) = req.request;
901            let request = match request.resend() {
902                Some(r) => r,
903                None => continue,
904            };
905            if !self.check_and_update_net_inflight_blocks(&request) {
906                // Keep GetBlocks requests in queue
907                // when we do not have the capability to process them.
908                // We resend GetCompactBlocks as GetBlocks, so only check
909                // GET_BLOCKS here.
910                waiting_requests.push(TimedWaitingRequest::new(
911                    now + delay,
912                    // Do not increase delay because this is not a failure.
913                    WaitingRequest(request, delay),
914                    None,
915                ));
916                continue;
917            }
918            batcher.insert(delay, request);
919        }
920
921        for (next_delay, request) in
922            batcher.get_batched_requests(prefer_archive_node_for_blocks)
923        {
924            let mut filter = PeerFilter::new(request.msg_id());
925            if let Some(cap) = request.required_capability() {
926                filter = filter.with_cap(cap);
927            }
928            if let Some(preferred_node_type) = request.preferred_node_type() {
929                filter = filter.with_preferred_node_type(preferred_node_type);
930            }
931            let chosen_peer = match filter.select(&self.syn) {
932                Some(p) => p,
933                None => {
934                    debug!("No peer to send request, wait for next time");
935                    // These requests are not actually sent,
936                    // and they will not be inserted into requests_queue,
937                    // so remove them from net_inflight_blocks.
938                    if let Some(hashes) = try_get_block_hashes(&request) {
939                        self.remove_net_inflight_blocks(hashes.iter())
940                    }
941                    waiting_requests.push(TimedWaitingRequest::new(
942                        Instant::now() + next_delay,
943                        WaitingRequest(request, next_delay),
944                        None,
945                    ));
946                    continue;
947                }
948            };
949            debug!(
950                "Send waiting req {:?} to peer={} with next_delay={:?}",
951                request, chosen_peer, next_delay
952            );
953
954            if let Err(request) = self.request_handler.send_request(
955                io,
956                Some(chosen_peer),
957                request,
958                Some(next_delay),
959            ) {
960                waiting_requests.push(TimedWaitingRequest::new(
961                    Instant::now() + next_delay,
962                    WaitingRequest(request, next_delay),
963                    None,
964                ));
965            }
966        }
967        cancelled_requests
968    }
969
970    pub fn on_peer_connected(&self, peer: &NodeId) {
971        self.request_handler.add_peer(*peer);
972    }
973
974    pub fn on_peer_disconnected(&self, io: &dyn NetworkContext, peer: &NodeId) {
975        if let Some(unfinished_requests) =
976            self.request_handler.remove_peer(peer)
977        {
978            for msg in unfinished_requests {
979                self.resend_request_to_another_peer(io, &msg);
980            }
981        } else {
982            debug!("Peer already removed form request manager when disconnected peer={}", peer);
983        }
984    }
985
986    fn check_and_update_net_inflight_blocks(
987        &self, request: &Box<dyn Request>,
988    ) -> bool {
989        if let Some(hashes) = try_get_block_hashes(request) {
990            // Insert the request into waiting queue when the queue is
991            // already full, to avoid requesting more blocks
992            // than we can process. Requests will be
993            // inserted to waiting queue if peer_id is None.
994            let mut net_inflight_blocks =
995                self.inflight_keys.write(msgid::NET_INFLIGHT_BLOCKS);
996            if net_inflight_blocks.len()
997                >= self.recover_public_queue.estimated_available_count()
998            {
999                trace!("queue is full, send block request later: inflight={} req={:?}",
1000                           net_inflight_blocks.len(), request);
1001                return false;
1002            } else {
1003                for hash in hashes {
1004                    net_inflight_blocks.insert(Key::Hash(*hash));
1005                }
1006                trace!("queue is not full, send block request now: inflight={} req={:?}",
1007                           net_inflight_blocks.len(), request);
1008            }
1009        }
1010        true
1011    }
1012
1013    pub fn remove_net_inflight_blocks<'a, I: Iterator<Item = &'a H256>>(
1014        &self, blocks: I,
1015    ) {
1016        let mut net_inflight_blocks =
1017            self.inflight_keys.write(msgid::NET_INFLIGHT_BLOCKS);
1018        for block_hash in blocks {
1019            net_inflight_blocks.remove(&Key::Hash(*block_hash));
1020        }
1021    }
1022}
1023
1024/// Return block hashes in `request` if it's requesting blocks.
1025/// Return None otherwise.
1026pub fn try_get_block_hashes(request: &Box<dyn Request>) -> Option<&Vec<H256>> {
1027    match request.msg_id() {
1028        msgid::GET_BLOCKS | msgid::GET_CMPCT_BLOCKS => {
1029            let hashes = if let Some(req) =
1030                request.as_any().downcast_ref::<GetBlocks>()
1031            {
1032                &req.hashes
1033            } else if let Some(req) =
1034                request.as_any().downcast_ref::<GetCompactBlocks>()
1035            {
1036                &req.hashes
1037            } else {
1038                panic!(
1039                    "MessageId and Request not match, request={:?}",
1040                    request
1041                );
1042            };
1043            Some(hashes)
1044        }
1045        _ => None,
1046    }
1047}
1048
1049#[derive(Debug, DeriveMallocSizeOf)]
1050struct TimedWaitingRequest {
1051    time_to_send: Instant,
1052    request: WaitingRequest,
1053    peer: Option<NodeId>,
1054}
1055
1056impl TimedWaitingRequest {
1057    fn new(
1058        time_to_send: Instant, request: WaitingRequest, peer: Option<NodeId>,
1059    ) -> Self {
1060        Self {
1061            time_to_send,
1062            request,
1063            peer,
1064        }
1065    }
1066}
1067
1068impl Ord for TimedWaitingRequest {
1069    fn cmp(&self, other: &Self) -> Ordering {
1070        other.time_to_send.cmp(&self.time_to_send)
1071    }
1072}
1073impl PartialOrd for TimedWaitingRequest {
1074    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1075        Some(self.cmp(other))
1076    }
1077}
1078impl Eq for TimedWaitingRequest {}
1079impl PartialEq for TimedWaitingRequest {
1080    fn eq(&self, other: &Self) -> bool {
1081        self.time_to_send == other.time_to_send
1082    }
1083}