1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
// Copyright 2019 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/

use super::{HasKey, PriorityQueue};
use crate::{
    light_protocol::{
        common::{FullPeerFilter, FullPeerState, Peers},
        Error,
    },
    message::{MsgId, RequestId},
};
use network::node_table::NodeId;
use parking_lot::{Mutex, RwLock};
use std::{
    cmp::Ord,
    collections::HashMap,
    fmt::Debug,
    hash::Hash,
    sync::Arc,
    time::{Duration, Instant},
};
use throttling::token_bucket::ThrottleResult;

#[derive(Debug)]
struct InFlightRequest<T> {
    pub item: T,
    pub request_id: RequestId,
    pub sent_at: Instant,
}

impl<T> InFlightRequest<T> {
    pub fn new(item: T, request_id: RequestId) -> Self {
        InFlightRequest {
            item,
            request_id,
            sent_at: Instant::now(),
        }
    }
}

pub struct SyncManager<Key, Item> {
    // headers requested but not received yet
    in_flight: RwLock<HashMap<Key, InFlightRequest<Item>>>,

    // collection of all peers available
    peers: Arc<Peers<FullPeerState>>,

    // mutex used to make sure at most one thread drives sync at any given time
    sync_lock: Mutex<()>,

    // priority queue of headers we need excluding the ones in `in_flight`
    waiting: RwLock<PriorityQueue<Key, Item>>,

    // used to filter peer to send request
    request_msg_id: MsgId,
}

impl<Key, Item> SyncManager<Key, Item>
where
    Key: Clone + Eq + Hash,
    Item: Debug + Clone + HasKey<Key> + Ord,
{
    pub fn new(
        peers: Arc<Peers<FullPeerState>>, request_msg_id: MsgId,
    ) -> Self {
        let in_flight = RwLock::new(HashMap::new());
        let sync_lock = Default::default();
        let waiting = RwLock::new(PriorityQueue::new());

        SyncManager {
            in_flight,
            peers,
            sync_lock,
            waiting,
            request_msg_id,
        }
    }

    #[inline]
    pub fn num_waiting(&self) -> usize { self.waiting.read().len() }

    #[inline]
    pub fn num_in_flight(&self) -> usize { self.in_flight.read().len() }

    #[inline]
    #[allow(dead_code)]
    pub fn contains(&self, key: &Key) -> bool {
        self.in_flight.read().contains_key(key)
            || self.waiting.read().contains(&key)
    }

    #[inline]
    fn get_existing_peer_state(
        &self, peer: &NodeId,
    ) -> Result<Arc<RwLock<FullPeerState>>, Error> {
        match self.peers.get(peer) {
            Some(state) => Ok(state),
            None => {
                bail!(Error::InternalError(format!(
                    "Received message from unknown peer={:?}",
                    peer
                )));
            }
        }
    }

    #[inline]
    pub fn check_if_requested(
        &self, peer: &NodeId, request_id: RequestId, key: &Key,
    ) -> Result<Option<RequestId>, Error> {
        let id = match self.in_flight.read().get(&key).map(|req| req.request_id)
        {
            Some(id) if id == request_id => return Ok(Some(id)),
            x => x,
        };

        let peer = self.get_existing_peer_state(peer)?;

        let bucket_name = self.request_msg_id.to_string();
        let bucket = match peer.read().unexpected_msgs.get(&bucket_name) {
            Some(bucket) => bucket,
            None => return Ok(id),
        };

        let result = bucket.lock().throttle_default();

        match result {
            ThrottleResult::Success => Ok(id),
            ThrottleResult::Throttled(_) => Ok(id),
            ThrottleResult::AlreadyThrottled => {
                bail!(Error::UnexpectedResponse {
                    expected: id,
                    received: request_id,
                });
            }
        }
    }

    #[inline]
    pub fn remove_in_flight(&self, key: &Key) {
        self.in_flight.write().remove(&key);
    }

    #[inline]
    pub fn insert_waiting<I>(&self, items: I)
    where I: Iterator<Item = Item> {
        let in_flight = self.in_flight.read();
        let mut waiting = self.waiting.write();
        let missing = items.filter(|item| !in_flight.contains_key(&item.key()));
        waiting.extend(missing);
    }

    pub fn sync(
        &self, max_in_flight: usize, batch_size: usize,
        request: impl Fn(&NodeId, Vec<Key>) -> Result<Option<RequestId>, Error>,
    ) {
        let _guard = match self.sync_lock.try_lock() {
            None => return,
            Some(g) => g,
        };

        // check if there are any peers available
        if self.peers.is_empty() {
            debug!("No peers available; aborting sync");
            return;
        }

        loop {
            // unlock after each batch so that we do not block other threads
            let mut in_flight = self.in_flight.write();
            let mut waiting = self.waiting.write();

            // collect batch
            let max_to_request = max_in_flight.saturating_sub(in_flight.len());
            let num_to_request = std::cmp::min(max_to_request, batch_size);

            if num_to_request == 0 {
                return;
            }

            let mut batch: Vec<Item> = vec![];

            // NOTE: cannot use iterator on BinaryHeap as
            // it returns elements in arbitrary order!
            while let Some(item) = waiting.pop() {
                // skip occasional items already in flight
                // this can happen if an item is inserted using
                // `insert_waiting`, then inserted again using
                // `request_now`
                if !in_flight.contains_key(&item.key()) {
                    batch.push(item);
                }

                if batch.len() == num_to_request {
                    break;
                }
            }

            // we're finished when there's nothing more to request
            if batch.is_empty() {
                return;
            }

            // select peer for batch
            let peer = match FullPeerFilter::new(self.request_msg_id)
                .select(self.peers.clone())
            {
                Some(peer) => peer,
                None => {
                    warn!("No peers available");
                    waiting.extend(batch.to_owned().into_iter());
                    return;
                }
            };

            let keys = batch.iter().map(|h| h.key()).collect();

            match request(&peer, keys) {
                Ok(None) => {}
                Ok(Some(request_id)) => {
                    let new_in_flight =
                        batch.to_owned().into_iter().map(|item| {
                            (item.key(), InFlightRequest::new(item, request_id))
                        });

                    in_flight.extend(new_in_flight);
                }
                Err(e) => {
                    warn!(
                        "Failed to request items {:?} from peer {:?}: {:?}",
                        batch, peer, e
                    );

                    waiting.extend(batch.to_owned().into_iter());
                }
            }
        }
    }

    #[inline]
    pub fn remove_timeout_requests(&self, timeout: Duration) -> Vec<Item> {
        let mut in_flight = self.in_flight.write();

        // collect timed-out requests
        let items: Vec<_> = in_flight
            .iter()
            .filter_map(|(_hash, req)| match req.sent_at {
                t if t.elapsed() < timeout => None,
                _ => Some(req.item.clone()),
            })
            .collect();

        // remove requests from `in_flight`
        for item in &items {
            in_flight.remove(&item.key());
        }

        items
    }

    #[inline]
    pub fn request_now<I>(
        &self, items: I,
        request: impl Fn(&NodeId, Vec<Key>) -> Result<Option<RequestId>, Error>,
    ) where
        I: Iterator<Item = Item>,
    {
        let peer = match FullPeerFilter::new(self.request_msg_id)
            .select(self.peers.clone())
        {
            Some(peer) => peer,
            None => {
                warn!("No peers available");
                self.insert_waiting(items);
                return;
            }
        };

        self.request_now_from_peer(items, &peer, request);
    }

    #[inline]
    pub fn request_now_from_peer<I>(
        &self, items: I, peer: &NodeId,
        request: impl Fn(&NodeId, Vec<Key>) -> Result<Option<RequestId>, Error>,
    ) where
        I: Iterator<Item = Item>,
    {
        let mut in_flight = self.in_flight.write();

        let missing = items
            .filter(|item| !in_flight.contains_key(&item.key()))
            .collect::<Vec<_>>();

        let keys = missing.iter().map(|h| h.key()).collect();

        match request(peer, keys) {
            Ok(None) => {}
            Ok(Some(request_id)) => {
                let new_in_flight = missing.into_iter().map(|item| {
                    (item.key(), InFlightRequest::new(item, request_id))
                });

                in_flight.extend(new_in_flight);
            }
            Err(e) => {
                warn!(
                    "Failed to request {:?} from {:?}: {:?}",
                    missing, peer, e
                );

                drop(in_flight);
                self.insert_waiting(missing.into_iter());
            }
        }
    }
}