network/ip/
node_limit.rs

1use crate::{
2    ip::{bucket::NodeBucket, sample::SampleHashMap, util::SubnetType},
3    node_database::NodeDatabase,
4    node_table::NodeId,
5};
6use rand::thread_rng;
7use std::{
8    collections::{HashMap, HashSet},
9    net::IpAddr,
10    time::Duration,
11};
12
13/// Default maximum duration of last node contact time that used to evict a node
14/// when `NodeBucket` is full.
15const DEFAULT_EVICT_TIMEOUT: Duration = Duration::from_secs(7 * 24 * 3600); // 7 days
16
17/// Validation result before adding a node.
18#[derive(Debug, PartialEq)]
19pub enum ValidateInsertResult {
20    /// Node already exists and IP address not changed
21    AlreadyExists,
22    /// Node will be new added, and occupy the IP address used by existing
23    /// node. In this case, the existing node will be evicted before adding
24    /// the new node.
25    OccupyIp(NodeId),
26    /// Node is allowed to add or update with new IP address, because the
27    /// corresponding `NodeBucket` has enough quota.
28    QuotaEnough,
29    /// Node is allowed to add or update, but need to evict the specified
30    /// existing node first.
31    Evict(NodeId),
32    /// Node is not allowed to add or update, because the corresponding
33    /// `NodeBucket` is full, and no node could be evicted. E.g. all nodes in
34    /// the bucket are in connecting status.
35    QuotaNotEnough,
36}
37
38/// NodeLimit is used to limit the number of nodes stored in database via IP
39/// address and subnet. Basically, one IP address only allow one node stored
40/// in database, and a subnet allow configured N nodes stored in database.
41///
42/// When adding a new node, and the number of nodes for the subnet reaches the
43/// quota limitation, any node may be evicted from database according to
44/// a pre-defined rule.
45#[derive(Debug)]
46pub struct NodeIpLimit {
47    subnet_type: SubnetType,
48    subnet_quota: usize,     // quota for a subnet
49    evict_timeout: Duration, // used to evict out-of-date node
50
51    // all trusted nodes grouped by subnet
52    trusted_buckets: SampleHashMap<u32, NodeBucket>,
53    // all untrusted nodes grouped by subnet
54    untrusted_buckets: SampleHashMap<u32, NodeBucket>,
55
56    // helpful indices
57    ip_index: HashMap<IpAddr, NodeId>,
58    node_index: HashMap<NodeId, IpAddr>,
59}
60
61impl NodeIpLimit {
62    pub fn new(subnet_quota: usize) -> Self {
63        NodeIpLimit {
64            subnet_type: SubnetType::C,
65            subnet_quota,
66            evict_timeout: DEFAULT_EVICT_TIMEOUT,
67            trusted_buckets: Default::default(),
68            untrusted_buckets: Default::default(),
69            ip_index: HashMap::new(),
70            node_index: HashMap::new(),
71        }
72    }
73
74    #[inline]
75    pub fn is_enabled(&self) -> bool { self.subnet_quota > 0 }
76
77    /// Get the subnet of specified node `id`.
78    pub fn subnet(&self, id: &NodeId) -> Option<u32> {
79        let ip = self.node_index.get(id)?;
80        Some(self.subnet_type.subnet(ip))
81    }
82
83    /// Remove the specified node `id` and return `true` if removed
84    /// successfully. If not found, return `false`.
85    pub fn remove(&mut self, id: &NodeId) -> bool {
86        if !self.is_enabled() {
87            return true;
88        }
89
90        let ip = match self.node_index.remove(id) {
91            Some(ip) => ip,
92            None => return false,
93        };
94
95        self.ip_index.remove(&ip);
96
97        let subnet = self.subnet_type.subnet(&ip);
98        if !Self::remove_with_buckets(&mut self.trusted_buckets, subnet, id) {
99            Self::remove_with_buckets(&mut self.untrusted_buckets, subnet, id);
100        }
101
102        true
103    }
104
105    /// Remove node from specified buckets.
106    fn remove_with_buckets(
107        buckets: &mut SampleHashMap<u32, NodeBucket>, subnet: u32, id: &NodeId,
108    ) -> bool {
109        let bucket = match buckets.get_mut(&subnet) {
110            Some(bucket) => bucket,
111            None => return false,
112        };
113
114        if !bucket.remove(id) {
115            return false;
116        }
117
118        // remove bucket on empty
119        if bucket.count() == 0 {
120            buckets.remove(&subnet);
121        }
122
123        true
124    }
125
126    /// Randomly select `n` trusted nodes. Note, it may return less than `n`
127    /// nodes. Note, the time complexity is O(n), where n is the number of
128    /// sampled nodes.
129    pub fn sample_trusted(&self, n: u32) -> HashSet<NodeId> {
130        if !self.is_enabled() {
131            return HashSet::new();
132        }
133
134        let mut sampled = HashSet::new();
135        if self.trusted_buckets.is_empty() {
136            return sampled;
137        }
138
139        let mut rng = thread_rng();
140
141        for _ in 0..n {
142            if let Some(bucket) = self.trusted_buckets.sample(&mut rng) {
143                if let Some(id) = bucket.sample(&mut rng) {
144                    sampled.insert(id);
145                }
146            }
147        }
148
149        sampled
150    }
151
152    /// Validate before inserting a node with specified `id` and `ip`.
153    /// The returned result indicates whether insertion is allowed,
154    /// and possible evictee before insertion.
155    ///
156    /// When subnet quota is not enough before insertion:
157    /// 1. If node IP changed and still in the same subnet,
158    /// just evict the node itself.
159    /// 2. Otherwise, someone in the subnet of `ip` may be evicted.
160    ///
161    /// There are 2 cases that insertion is not allowed:
162    /// 1. Node already exists and ip not changed;
163    /// 2. Subnet quota is not enough, and no evictee found.
164    pub fn validate_insertion(
165        &self, id: &NodeId, ip: &IpAddr, db: &NodeDatabase,
166    ) -> ValidateInsertResult {
167        if !self.is_enabled() {
168            return ValidateInsertResult::QuotaEnough;
169        }
170
171        // node exists and ip not changed.
172        let maybe_cur_ip = self.node_index.get(&id);
173        if let Some(cur_ip) = maybe_cur_ip {
174            if cur_ip == ip {
175                return ValidateInsertResult::AlreadyExists;
176            }
177        }
178
179        // ip already in use by other node
180        if let Some(old_id) = self.ip_index.get(&ip) {
181            return ValidateInsertResult::OccupyIp(old_id.clone());
182        }
183
184        // quota enough
185        if self.is_quota_allowed(ip) {
186            return ValidateInsertResult::QuotaEnough;
187        }
188
189        // Node ip changed, but still in the same subnet.
190        // So, just evict the node itself.
191        if let Some(cur_ip) = maybe_cur_ip {
192            let cur_subnet = self.subnet_type.subnet(cur_ip);
193            let new_subnet = self.subnet_type.subnet(ip);
194            if cur_subnet == new_subnet {
195                return ValidateInsertResult::Evict(id.clone());
196            }
197        }
198
199        // quota not enough, try to evict one.
200        if let Some(evictee) = self.select_evictee(ip, db) {
201            return ValidateInsertResult::Evict(evictee);
202        }
203
204        ValidateInsertResult::QuotaNotEnough
205    }
206
207    /// Insert a node with specified `id` and `ip` as trusted or untrusted.
208    /// If evictee specified, then remove that one before insert new one.
209    /// Returns `true` if insert successfully, otherwise `false`.
210    pub fn insert(
211        &mut self, id: NodeId, ip: IpAddr, trusted: bool,
212        evictee: Option<NodeId>,
213    ) -> bool {
214        if !self.is_enabled() {
215            return true;
216        }
217
218        // node exists and ip not changed.
219        if let Some(cur_ip) = self.node_index.get(&id) {
220            if *cur_ip == ip {
221                return false;
222            }
223        }
224
225        // remove evictee before insertion
226        if let Some(id) = evictee {
227            self.remove(&id);
228        }
229
230        // ip already in use by other node
231        if self.ip_index.contains_key(&ip) {
232            return false;
233        }
234
235        if self.is_quota_allowed(&ip) {
236            self.add_or_update(ip, id, trusted);
237            return true;
238        }
239
240        false
241    }
242
243    /// Demote `id` from trusted to untrusted
244    pub fn demote(&mut self, id: &NodeId) {
245        if let Some(ip) = self.node_index.get(id) {
246            let subnet = self.subnet_type.subnet(ip);
247            if let Some(b) = self.trusted_buckets.get_mut(&subnet) {
248                if b.remove(id) {
249                    if let Some(b) = self.untrusted_buckets.get_mut(&subnet) {
250                        b.add(id.clone());
251                    }
252                }
253            }
254        }
255    }
256
257    /// Add or update a node with new IP address. It assumes that the bucket
258    /// of corresponding subnet have enough quota.
259    fn add_or_update(&mut self, ip: IpAddr, id: NodeId, trusted: bool) {
260        // clear the old ip information at first
261        self.remove(&id);
262
263        self.node_index.insert(id.clone(), ip);
264        self.ip_index.insert(ip, id.clone());
265
266        let subnet = self.subnet_type.subnet(&ip);
267        if trusted {
268            self.trusted_buckets
269                .get_mut_or_insert_with(subnet, NodeBucket::default)
270                .add(id);
271        } else {
272            self.untrusted_buckets
273                .get_mut_or_insert_with(subnet, NodeBucket::default)
274                .add(id);
275        }
276    }
277
278    /// Check whether the subnet quota is enough for the specified IP address .
279    fn is_quota_allowed(&self, ip: &IpAddr) -> bool {
280        let subnet = self.subnet_type.subnet(ip);
281
282        let num_trusted = self
283            .trusted_buckets
284            .get(&subnet)
285            .map_or(0, |bucket| bucket.count());
286
287        let num_untrusted = self
288            .untrusted_buckets
289            .get(&subnet)
290            .map_or(0, |bucket| bucket.count());
291
292        num_trusted + num_untrusted < self.subnet_quota
293    }
294
295    /// Select a node to evict.
296    fn select_evictee(&self, ip: &IpAddr, db: &NodeDatabase) -> Option<NodeId> {
297        let subnet = self.subnet_type.subnet(&ip);
298
299        // evict untrusted node prior to trusted node
300        self.untrusted_buckets
301            .get(&subnet)
302            .and_then(|bucket| bucket.select_evictee(db, self.evict_timeout))
303            .or_else(|| {
304                self.trusted_buckets.get(&subnet).and_then(|bucket| {
305                    bucket.select_evictee(db, self.evict_timeout)
306                })
307            })
308    }
309}
310
311#[cfg(test)]
312mod tests {
313    use super::{NodeDatabase, NodeId, NodeIpLimit, ValidateInsertResult};
314    use std::{net::IpAddr, str::FromStr};
315
316    fn new_ip(ip: &'static str) -> IpAddr { IpAddr::from_str(ip).unwrap() }
317
318    #[test]
319    fn test_remove() {
320        let mut limit = NodeIpLimit::new(2);
321
322        // remove non-exist node
323        assert_eq!(limit.remove(&NodeId::random()), false);
324
325        // add 2 new nodes
326        let n1 = NodeId::random();
327        let ip1 = new_ip("127.0.0.1");
328        assert_eq!(limit.insert(n1.clone(), ip1, true, None), true);
329
330        let n2 = NodeId::random();
331        let ip2 = new_ip("127.0.0.2");
332        assert_eq!(limit.insert(n2.clone(), ip2, true, None), true);
333
334        // remove those 2 nodes
335        validate_node(&limit, &n1, &ip1, true /* exists */);
336        assert_eq!(limit.remove(&n1), true);
337        validate_node(&limit, &n1, &ip1, false /* exists */);
338
339        validate_node(&limit, &n2, &ip2, true /* exists */);
340        assert_eq!(limit.remove(&n2), true);
341        validate_node(&limit, &n2, &ip2, false /* exists */);
342    }
343
344    #[test]
345    fn test_sample() {
346        let mut limit = NodeIpLimit::new(2);
347
348        // empty case
349        assert_eq!(limit.sample_trusted(3).is_empty(), true);
350
351        // only untrusted nodes case
352        let n1 = NodeId::random();
353        let ip1 = new_ip("127.0.0.1");
354        assert_eq!(limit.insert(n1, ip1, false, None), true);
355        assert_eq!(limit.sample_trusted(3).is_empty(), true);
356
357        // trusted nodes case
358        let n2 = NodeId::random();
359        let ip2 = new_ip("127.0.0.2");
360        assert_eq!(limit.insert(n2, ip2, true, None), true);
361        assert_eq!(limit.sample_trusted(0).len(), 0);
362        assert_eq!(limit.sample_trusted(1).len(), 1);
363        assert_eq!(limit.sample_trusted(3).len(), 1);
364    }
365
366    fn validate_node(
367        limit: &NodeIpLimit, id: &NodeId, ip: &IpAddr, exists: bool,
368    ) {
369        if !exists {
370            assert_eq!(limit.node_index.contains_key(id), false);
371        } else {
372            assert_eq!(limit.node_index.contains_key(id), true);
373            assert_eq!(limit.node_index[id], *ip);
374        }
375    }
376
377    #[test]
378    fn test_insert_duplicate_id_ip() {
379        let mut limit = NodeIpLimit::new(2);
380        let db = NodeDatabase::new(None, 2);
381
382        // quota is enough
383        let n = NodeId::random();
384        let ip = new_ip("127.0.0.1");
385        assert_eq!(
386            limit.validate_insertion(&n, &ip, &db),
387            ValidateInsertResult::QuotaEnough
388        );
389        assert_eq!(limit.insert(n.clone(), ip, true, None), true);
390        validate_node(&limit, &n, &ip, true /* exists */);
391
392        // cannot insert with same id and ip as trusted or untrusted
393        assert_eq!(
394            limit.validate_insertion(&n, &ip, &db),
395            ValidateInsertResult::AlreadyExists
396        );
397        assert_eq!(limit.insert(n.clone(), ip, true, None), false);
398        assert_eq!(limit.insert(n.clone(), ip, false, None), false);
399        validate_node(&limit, &n, &ip, true /* exists */);
400    }
401
402    #[test]
403    fn test_insert_occupy_ip_new_node() {
404        let mut limit = NodeIpLimit::new(2);
405        let db = NodeDatabase::new(None, 2);
406
407        // insert n1
408        let n1 = NodeId::random();
409        let ip = new_ip("127.0.0.1");
410        assert_eq!(
411            limit.validate_insertion(&n1, &ip, &db),
412            ValidateInsertResult::QuotaEnough
413        );
414        assert_eq!(limit.insert(n1.clone(), ip, true, None), true);
415        validate_node(&limit, &n1, &ip, true /* exists */);
416
417        // add n2 with existing ip which need to evict the n1
418        let n2 = NodeId::random();
419        assert_eq!(
420            limit.validate_insertion(&n2, &ip, &db),
421            ValidateInsertResult::OccupyIp(n1.clone())
422        );
423
424        // add n2 without evicting n1
425        assert_eq!(limit.insert(n2.clone(), ip, true, None), false);
426        validate_node(&limit, &n1, &ip, true /* exists */); // n1 not evicted
427        validate_node(&limit, &n2, &ip, false /* exists */); // n2 not inserted
428
429        // add n2 with evicting n1
430        assert_eq!(limit.insert(n2.clone(), ip, true, Some(n1.clone())), true);
431        validate_node(&limit, &n1, &ip, false /* exists */); // n1 evicted
432        validate_node(&limit, &n2, &ip, true /* exists */); // n2 inserted
433    }
434
435    #[test]
436    fn test_insert_occupy_ip_update_node() {
437        let mut limit = NodeIpLimit::new(2);
438        let db = NodeDatabase::new(None, 2);
439
440        // insert n1 and n2
441        let n1 = NodeId::random();
442        let ip1 = new_ip("127.0.0.1");
443        assert_eq!(
444            limit.validate_insertion(&n1, &ip1, &db),
445            ValidateInsertResult::QuotaEnough
446        );
447        assert_eq!(limit.insert(n1.clone(), ip1, true, None), true);
448        validate_node(&limit, &n1, &ip1, true /* exists */);
449
450        let n2 = NodeId::random();
451        let ip2 = new_ip("127.0.0.2");
452        assert_eq!(
453            limit.validate_insertion(&n2, &ip2, &db),
454            ValidateInsertResult::QuotaEnough
455        );
456        assert_eq!(limit.insert(n2.clone(), ip2, true, None), true);
457        validate_node(&limit, &n2, &ip2, true /* exists */);
458
459        // change n2's ip from ip2 to ip1
460        assert_eq!(
461            limit.validate_insertion(&n2, &ip1, &db),
462            ValidateInsertResult::OccupyIp(n1.clone())
463        );
464
465        // update n2 without evicting n1
466        assert_eq!(limit.insert(n2.clone(), ip1, true, None), false);
467        validate_node(&limit, &n1, &ip1, true /* exists */); // n1 not evicted
468        validate_node(&limit, &n2, &ip2, true /* exists */); // n2 not updated
469
470        // update n2 with evicting n1
471        assert_eq!(limit.insert(n2.clone(), ip1, true, Some(n1.clone())), true);
472        validate_node(&limit, &n1, &ip1, false /* exists */); // n1 evicted
473        validate_node(&limit, &n2, &ip1, true /* exists */); // n2 updated
474    }
475
476    #[test]
477    fn test_is_quota_allowed() {
478        let mut limit = NodeIpLimit::new(2);
479
480        // add n1
481        let n1 = NodeId::random();
482        let ip1 = new_ip("127.0.0.1");
483        assert_eq!(limit.insert(n1, ip1, true, None), true);
484
485        // add n2
486        let n2 = NodeId::random();
487        let ip2 = new_ip("127.0.0.2");
488        assert_eq!(limit.insert(n2, ip2, true, None), true);
489
490        // same subnet
491        assert_eq!(limit.is_quota_allowed(&new_ip("127.0.0.3")), false);
492
493        // different subnet
494        assert_eq!(limit.is_quota_allowed(&new_ip("127.0.1.1")), true);
495    }
496
497    #[test]
498    fn test_select_evictee() {
499        let limit = NodeIpLimit::new(2);
500        let db = NodeDatabase::new(None, 2);
501
502        // select from empty bucket
503        assert_eq!(limit.select_evictee(&new_ip("127.0.0.1"), &db), None);
504    }
505}