cfxcore/light_protocol/handler/sync/common/
sync_manager.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use super::{HasKey, PriorityQueue};
6use crate::{
7    light_protocol::{
8        common::{FullPeerFilter, FullPeerState, Peers},
9        Error,
10    },
11    message::{MsgId, RequestId},
12};
13use network::node_table::NodeId;
14use parking_lot::{Mutex, RwLock};
15use std::{
16    cmp::Ord,
17    collections::HashMap,
18    fmt::Debug,
19    hash::Hash,
20    sync::Arc,
21    time::{Duration, Instant},
22};
23use throttling::token_bucket::ThrottleResult;
24
25#[derive(Debug)]
26struct InFlightRequest<T> {
27    pub item: T,
28    pub request_id: RequestId,
29    pub sent_at: Instant,
30}
31
32impl<T> InFlightRequest<T> {
33    pub fn new(item: T, request_id: RequestId) -> Self {
34        InFlightRequest {
35            item,
36            request_id,
37            sent_at: Instant::now(),
38        }
39    }
40}
41
42pub struct SyncManager<Key, Item> {
43    // headers requested but not received yet
44    in_flight: RwLock<HashMap<Key, InFlightRequest<Item>>>,
45
46    // collection of all peers available
47    peers: Arc<Peers<FullPeerState>>,
48
49    // mutex used to make sure at most one thread drives sync at any given time
50    sync_lock: Mutex<()>,
51
52    // priority queue of headers we need excluding the ones in `in_flight`
53    waiting: RwLock<PriorityQueue<Key, Item>>,
54
55    // used to filter peer to send request
56    request_msg_id: MsgId,
57}
58
59impl<Key, Item> SyncManager<Key, Item>
60where
61    Key: Clone + Eq + Hash,
62    Item: Debug + Clone + HasKey<Key> + Ord,
63{
64    pub fn new(
65        peers: Arc<Peers<FullPeerState>>, request_msg_id: MsgId,
66    ) -> Self {
67        let in_flight = RwLock::new(HashMap::new());
68        let sync_lock = Default::default();
69        let waiting = RwLock::new(PriorityQueue::new());
70
71        SyncManager {
72            in_flight,
73            peers,
74            sync_lock,
75            waiting,
76            request_msg_id,
77        }
78    }
79
80    #[inline]
81    pub fn num_waiting(&self) -> usize { self.waiting.read().len() }
82
83    #[inline]
84    pub fn num_in_flight(&self) -> usize { self.in_flight.read().len() }
85
86    #[inline]
87    #[allow(dead_code)]
88    pub fn contains(&self, key: &Key) -> bool {
89        self.in_flight.read().contains_key(key)
90            || self.waiting.read().contains(&key)
91    }
92
93    #[inline]
94    fn get_existing_peer_state(
95        &self, peer: &NodeId,
96    ) -> Result<Arc<RwLock<FullPeerState>>, Error> {
97        match self.peers.get(peer) {
98            Some(state) => Ok(state),
99            None => {
100                bail!(Error::InternalError(format!(
101                    "Received message from unknown peer={:?}",
102                    peer
103                )));
104            }
105        }
106    }
107
108    #[inline]
109    pub fn check_if_requested(
110        &self, peer: &NodeId, request_id: RequestId, key: &Key,
111    ) -> Result<Option<RequestId>, Error> {
112        let id = match self.in_flight.read().get(&key).map(|req| req.request_id)
113        {
114            Some(id) if id == request_id => return Ok(Some(id)),
115            x => x,
116        };
117
118        let peer = self.get_existing_peer_state(peer)?;
119
120        let bucket_name = self.request_msg_id.to_string();
121        let bucket = match peer.read().unexpected_msgs.get(&bucket_name) {
122            Some(bucket) => bucket,
123            None => return Ok(id),
124        };
125
126        let result = bucket.lock().throttle_default();
127
128        match result {
129            ThrottleResult::Success => Ok(id),
130            ThrottleResult::Throttled(_) => Ok(id),
131            ThrottleResult::AlreadyThrottled => {
132                bail!(Error::UnexpectedResponse {
133                    expected: id,
134                    received: request_id,
135                });
136            }
137        }
138    }
139
140    #[inline]
141    pub fn remove_in_flight(&self, key: &Key) {
142        self.in_flight.write().remove(&key);
143    }
144
145    #[inline]
146    pub fn insert_waiting<I>(&self, items: I)
147    where I: Iterator<Item = Item> {
148        let in_flight = self.in_flight.read();
149        let mut waiting = self.waiting.write();
150        let missing = items.filter(|item| !in_flight.contains_key(&item.key()));
151        waiting.extend(missing);
152    }
153
154    pub fn sync(
155        &self, max_in_flight: usize, batch_size: usize,
156        request: impl Fn(&NodeId, Vec<Key>) -> Result<Option<RequestId>, Error>,
157    ) {
158        let _guard = match self.sync_lock.try_lock() {
159            None => return,
160            Some(g) => g,
161        };
162
163        // check if there are any peers available
164        if self.peers.is_empty() {
165            debug!("No peers available; aborting sync");
166            return;
167        }
168
169        loop {
170            // unlock after each batch so that we do not block other threads
171            let mut in_flight = self.in_flight.write();
172            let mut waiting = self.waiting.write();
173
174            // collect batch
175            let max_to_request = max_in_flight.saturating_sub(in_flight.len());
176            let num_to_request = std::cmp::min(max_to_request, batch_size);
177
178            if num_to_request == 0 {
179                return;
180            }
181
182            let mut batch: Vec<Item> = vec![];
183
184            // NOTE: cannot use iterator on BinaryHeap as
185            // it returns elements in arbitrary order!
186            while let Some(item) = waiting.pop() {
187                // skip occasional items already in flight
188                // this can happen if an item is inserted using
189                // `insert_waiting`, then inserted again using
190                // `request_now`
191                if !in_flight.contains_key(&item.key()) {
192                    batch.push(item);
193                }
194
195                if batch.len() == num_to_request {
196                    break;
197                }
198            }
199
200            // we're finished when there's nothing more to request
201            if batch.is_empty() {
202                return;
203            }
204
205            // select peer for batch
206            let peer = match FullPeerFilter::new(self.request_msg_id)
207                .select(self.peers.clone())
208            {
209                Some(peer) => peer,
210                None => {
211                    warn!("No peers available");
212                    waiting.extend(batch.to_owned().into_iter());
213                    return;
214                }
215            };
216
217            let keys = batch.iter().map(|h| h.key()).collect();
218
219            match request(&peer, keys) {
220                Ok(None) => {}
221                Ok(Some(request_id)) => {
222                    let new_in_flight =
223                        batch.to_owned().into_iter().map(|item| {
224                            (item.key(), InFlightRequest::new(item, request_id))
225                        });
226
227                    in_flight.extend(new_in_flight);
228                }
229                Err(e) => {
230                    warn!(
231                        "Failed to request items {:?} from peer {:?}: {:?}",
232                        batch, peer, e
233                    );
234
235                    waiting.extend(batch.to_owned().into_iter());
236                }
237            }
238        }
239    }
240
241    #[inline]
242    pub fn remove_timeout_requests(&self, timeout: Duration) -> Vec<Item> {
243        let mut in_flight = self.in_flight.write();
244
245        // collect timed-out requests
246        let items: Vec<_> = in_flight
247            .iter()
248            .filter_map(|(_hash, req)| match req.sent_at {
249                t if t.elapsed() < timeout => None,
250                _ => Some(req.item.clone()),
251            })
252            .collect();
253
254        // remove requests from `in_flight`
255        for item in &items {
256            in_flight.remove(&item.key());
257        }
258
259        items
260    }
261
262    #[inline]
263    pub fn request_now<I>(
264        &self, items: I,
265        request: impl Fn(&NodeId, Vec<Key>) -> Result<Option<RequestId>, Error>,
266    ) where
267        I: Iterator<Item = Item>,
268    {
269        let peer = match FullPeerFilter::new(self.request_msg_id)
270            .select(self.peers.clone())
271        {
272            Some(peer) => peer,
273            None => {
274                warn!("No peers available");
275                self.insert_waiting(items);
276                return;
277            }
278        };
279
280        self.request_now_from_peer(items, &peer, request);
281    }
282
283    #[inline]
284    pub fn request_now_from_peer<I>(
285        &self, items: I, peer: &NodeId,
286        request: impl Fn(&NodeId, Vec<Key>) -> Result<Option<RequestId>, Error>,
287    ) where
288        I: Iterator<Item = Item>,
289    {
290        let mut in_flight = self.in_flight.write();
291
292        let missing = items
293            .filter(|item| !in_flight.contains_key(&item.key()))
294            .collect::<Vec<_>>();
295
296        let keys = missing.iter().map(|h| h.key()).collect();
297
298        match request(peer, keys) {
299            Ok(None) => {}
300            Ok(Some(request_id)) => {
301                let new_in_flight = missing.into_iter().map(|item| {
302                    (item.key(), InFlightRequest::new(item, request_id))
303                });
304
305                in_flight.extend(new_in_flight);
306            }
307            Err(e) => {
308                warn!(
309                    "Failed to request {:?} from {:?}: {:?}",
310                    missing, peer, e
311                );
312
313                drop(in_flight);
314                self.insert_waiting(missing.into_iter());
315            }
316        }
317    }
318}