cfxcore/light_protocol/handler/sync/common/
sync_manager.rs1use super::{HasKey, PriorityQueue};
6use crate::{
7 light_protocol::{
8 common::{FullPeerFilter, FullPeerState, Peers},
9 Error,
10 },
11 message::{MsgId, RequestId},
12};
13use network::node_table::NodeId;
14use parking_lot::{Mutex, RwLock};
15use std::{
16 cmp::Ord,
17 collections::HashMap,
18 fmt::Debug,
19 hash::Hash,
20 sync::Arc,
21 time::{Duration, Instant},
22};
23use throttling::token_bucket::ThrottleResult;
24
25#[derive(Debug)]
26struct InFlightRequest<T> {
27 pub item: T,
28 pub request_id: RequestId,
29 pub sent_at: Instant,
30}
31
32impl<T> InFlightRequest<T> {
33 pub fn new(item: T, request_id: RequestId) -> Self {
34 InFlightRequest {
35 item,
36 request_id,
37 sent_at: Instant::now(),
38 }
39 }
40}
41
42pub struct SyncManager<Key, Item> {
43 in_flight: RwLock<HashMap<Key, InFlightRequest<Item>>>,
45
46 peers: Arc<Peers<FullPeerState>>,
48
49 sync_lock: Mutex<()>,
51
52 waiting: RwLock<PriorityQueue<Key, Item>>,
54
55 request_msg_id: MsgId,
57}
58
59impl<Key, Item> SyncManager<Key, Item>
60where
61 Key: Clone + Eq + Hash,
62 Item: Debug + Clone + HasKey<Key> + Ord,
63{
64 pub fn new(
65 peers: Arc<Peers<FullPeerState>>, request_msg_id: MsgId,
66 ) -> Self {
67 let in_flight = RwLock::new(HashMap::new());
68 let sync_lock = Default::default();
69 let waiting = RwLock::new(PriorityQueue::new());
70
71 SyncManager {
72 in_flight,
73 peers,
74 sync_lock,
75 waiting,
76 request_msg_id,
77 }
78 }
79
80 #[inline]
81 pub fn num_waiting(&self) -> usize { self.waiting.read().len() }
82
83 #[inline]
84 pub fn num_in_flight(&self) -> usize { self.in_flight.read().len() }
85
86 #[inline]
87 #[allow(dead_code)]
88 pub fn contains(&self, key: &Key) -> bool {
89 self.in_flight.read().contains_key(key)
90 || self.waiting.read().contains(&key)
91 }
92
93 #[inline]
94 fn get_existing_peer_state(
95 &self, peer: &NodeId,
96 ) -> Result<Arc<RwLock<FullPeerState>>, Error> {
97 match self.peers.get(peer) {
98 Some(state) => Ok(state),
99 None => {
100 bail!(Error::InternalError(format!(
101 "Received message from unknown peer={:?}",
102 peer
103 )));
104 }
105 }
106 }
107
108 #[inline]
109 pub fn check_if_requested(
110 &self, peer: &NodeId, request_id: RequestId, key: &Key,
111 ) -> Result<Option<RequestId>, Error> {
112 let id = match self.in_flight.read().get(&key).map(|req| req.request_id)
113 {
114 Some(id) if id == request_id => return Ok(Some(id)),
115 x => x,
116 };
117
118 let peer = self.get_existing_peer_state(peer)?;
119
120 let bucket_name = self.request_msg_id.to_string();
121 let bucket = match peer.read().unexpected_msgs.get(&bucket_name) {
122 Some(bucket) => bucket,
123 None => return Ok(id),
124 };
125
126 let result = bucket.lock().throttle_default();
127
128 match result {
129 ThrottleResult::Success => Ok(id),
130 ThrottleResult::Throttled(_) => Ok(id),
131 ThrottleResult::AlreadyThrottled => {
132 bail!(Error::UnexpectedResponse {
133 expected: id,
134 received: request_id,
135 });
136 }
137 }
138 }
139
140 #[inline]
141 pub fn remove_in_flight(&self, key: &Key) {
142 self.in_flight.write().remove(&key);
143 }
144
145 #[inline]
146 pub fn insert_waiting<I>(&self, items: I)
147 where I: Iterator<Item = Item> {
148 let in_flight = self.in_flight.read();
149 let mut waiting = self.waiting.write();
150 let missing = items.filter(|item| !in_flight.contains_key(&item.key()));
151 waiting.extend(missing);
152 }
153
154 pub fn sync(
155 &self, max_in_flight: usize, batch_size: usize,
156 request: impl Fn(&NodeId, Vec<Key>) -> Result<Option<RequestId>, Error>,
157 ) {
158 let _guard = match self.sync_lock.try_lock() {
159 None => return,
160 Some(g) => g,
161 };
162
163 if self.peers.is_empty() {
165 debug!("No peers available; aborting sync");
166 return;
167 }
168
169 loop {
170 let mut in_flight = self.in_flight.write();
172 let mut waiting = self.waiting.write();
173
174 let max_to_request = max_in_flight.saturating_sub(in_flight.len());
176 let num_to_request = std::cmp::min(max_to_request, batch_size);
177
178 if num_to_request == 0 {
179 return;
180 }
181
182 let mut batch: Vec<Item> = vec![];
183
184 while let Some(item) = waiting.pop() {
187 if !in_flight.contains_key(&item.key()) {
192 batch.push(item);
193 }
194
195 if batch.len() == num_to_request {
196 break;
197 }
198 }
199
200 if batch.is_empty() {
202 return;
203 }
204
205 let peer = match FullPeerFilter::new(self.request_msg_id)
207 .select(self.peers.clone())
208 {
209 Some(peer) => peer,
210 None => {
211 warn!("No peers available");
212 waiting.extend(batch.to_owned().into_iter());
213 return;
214 }
215 };
216
217 let keys = batch.iter().map(|h| h.key()).collect();
218
219 match request(&peer, keys) {
220 Ok(None) => {}
221 Ok(Some(request_id)) => {
222 let new_in_flight =
223 batch.to_owned().into_iter().map(|item| {
224 (item.key(), InFlightRequest::new(item, request_id))
225 });
226
227 in_flight.extend(new_in_flight);
228 }
229 Err(e) => {
230 warn!(
231 "Failed to request items {:?} from peer {:?}: {:?}",
232 batch, peer, e
233 );
234
235 waiting.extend(batch.to_owned().into_iter());
236 }
237 }
238 }
239 }
240
241 #[inline]
242 pub fn remove_timeout_requests(&self, timeout: Duration) -> Vec<Item> {
243 let mut in_flight = self.in_flight.write();
244
245 let items: Vec<_> = in_flight
247 .iter()
248 .filter_map(|(_hash, req)| match req.sent_at {
249 t if t.elapsed() < timeout => None,
250 _ => Some(req.item.clone()),
251 })
252 .collect();
253
254 for item in &items {
256 in_flight.remove(&item.key());
257 }
258
259 items
260 }
261
262 #[inline]
263 pub fn request_now<I>(
264 &self, items: I,
265 request: impl Fn(&NodeId, Vec<Key>) -> Result<Option<RequestId>, Error>,
266 ) where
267 I: Iterator<Item = Item>,
268 {
269 let peer = match FullPeerFilter::new(self.request_msg_id)
270 .select(self.peers.clone())
271 {
272 Some(peer) => peer,
273 None => {
274 warn!("No peers available");
275 self.insert_waiting(items);
276 return;
277 }
278 };
279
280 self.request_now_from_peer(items, &peer, request);
281 }
282
283 #[inline]
284 pub fn request_now_from_peer<I>(
285 &self, items: I, peer: &NodeId,
286 request: impl Fn(&NodeId, Vec<Key>) -> Result<Option<RequestId>, Error>,
287 ) where
288 I: Iterator<Item = Item>,
289 {
290 let mut in_flight = self.in_flight.write();
291
292 let missing = items
293 .filter(|item| !in_flight.contains_key(&item.key()))
294 .collect::<Vec<_>>();
295
296 let keys = missing.iter().map(|h| h.key()).collect();
297
298 match request(peer, keys) {
299 Ok(None) => {}
300 Ok(Some(request_id)) => {
301 let new_in_flight = missing.into_iter().map(|item| {
302 (item.key(), InFlightRequest::new(item, request_id))
303 });
304
305 in_flight.extend(new_in_flight);
306 }
307 Err(e) => {
308 warn!(
309 "Failed to request {:?} from {:?}: {:?}",
310 missing, peer, e
311 );
312
313 drop(in_flight);
314 self.insert_waiting(missing.into_iter());
315 }
316 }
317 }
318}