use crate::{
ip::{bucket::NodeBucket, sample::SampleHashMap, util::SubnetType},
node_database::NodeDatabase,
node_table::NodeId,
};
use rand::thread_rng;
use std::{
collections::{HashMap, HashSet},
net::IpAddr,
time::Duration,
};
const DEFAULT_EVICT_TIMEOUT: Duration = Duration::from_secs(7 * 24 * 3600); #[derive(Debug, PartialEq)]
pub enum ValidateInsertResult {
AlreadyExists,
OccupyIp(NodeId),
QuotaEnough,
Evict(NodeId),
QuotaNotEnough,
}
#[derive(Debug)]
pub struct NodeIpLimit {
subnet_type: SubnetType,
subnet_quota: usize, evict_timeout: Duration, trusted_buckets: SampleHashMap<u32, NodeBucket>,
untrusted_buckets: SampleHashMap<u32, NodeBucket>,
ip_index: HashMap<IpAddr, NodeId>,
node_index: HashMap<NodeId, IpAddr>,
}
impl NodeIpLimit {
pub fn new(subnet_quota: usize) -> Self {
NodeIpLimit {
subnet_type: SubnetType::C,
subnet_quota,
evict_timeout: DEFAULT_EVICT_TIMEOUT,
trusted_buckets: Default::default(),
untrusted_buckets: Default::default(),
ip_index: HashMap::new(),
node_index: HashMap::new(),
}
}
#[inline]
pub fn is_enabled(&self) -> bool { self.subnet_quota > 0 }
pub fn subnet(&self, id: &NodeId) -> Option<u32> {
let ip = self.node_index.get(id)?;
Some(self.subnet_type.subnet(ip))
}
pub fn remove(&mut self, id: &NodeId) -> bool {
if !self.is_enabled() {
return true;
}
let ip = match self.node_index.remove(id) {
Some(ip) => ip,
None => return false,
};
self.ip_index.remove(&ip);
let subnet = self.subnet_type.subnet(&ip);
if !Self::remove_with_buckets(&mut self.trusted_buckets, subnet, id) {
Self::remove_with_buckets(&mut self.untrusted_buckets, subnet, id);
}
true
}
fn remove_with_buckets(
buckets: &mut SampleHashMap<u32, NodeBucket>, subnet: u32, id: &NodeId,
) -> bool {
let bucket = match buckets.get_mut(&subnet) {
Some(bucket) => bucket,
None => return false,
};
if !bucket.remove(id) {
return false;
}
if bucket.count() == 0 {
buckets.remove(&subnet);
}
true
}
pub fn sample_trusted(&self, n: u32) -> HashSet<NodeId> {
if !self.is_enabled() {
return HashSet::new();
}
let mut sampled = HashSet::new();
if self.trusted_buckets.is_empty() {
return sampled;
}
let mut rng = thread_rng();
for _ in 0..n {
if let Some(bucket) = self.trusted_buckets.sample(&mut rng) {
if let Some(id) = bucket.sample(&mut rng) {
sampled.insert(id);
}
}
}
sampled
}
pub fn validate_insertion(
&self, id: &NodeId, ip: &IpAddr, db: &NodeDatabase,
) -> ValidateInsertResult {
if !self.is_enabled() {
return ValidateInsertResult::QuotaEnough;
}
let maybe_cur_ip = self.node_index.get(&id);
if let Some(cur_ip) = maybe_cur_ip {
if cur_ip == ip {
return ValidateInsertResult::AlreadyExists;
}
}
if let Some(old_id) = self.ip_index.get(&ip) {
return ValidateInsertResult::OccupyIp(old_id.clone());
}
if self.is_quota_allowed(ip) {
return ValidateInsertResult::QuotaEnough;
}
if let Some(cur_ip) = maybe_cur_ip {
let cur_subnet = self.subnet_type.subnet(cur_ip);
let new_subnet = self.subnet_type.subnet(ip);
if cur_subnet == new_subnet {
return ValidateInsertResult::Evict(id.clone());
}
}
if let Some(evictee) = self.select_evictee(ip, db) {
return ValidateInsertResult::Evict(evictee);
}
ValidateInsertResult::QuotaNotEnough
}
pub fn insert(
&mut self, id: NodeId, ip: IpAddr, trusted: bool,
evictee: Option<NodeId>,
) -> bool {
if !self.is_enabled() {
return true;
}
if let Some(cur_ip) = self.node_index.get(&id) {
if *cur_ip == ip {
return false;
}
}
if let Some(id) = evictee {
self.remove(&id);
}
if self.ip_index.contains_key(&ip) {
return false;
}
if self.is_quota_allowed(&ip) {
self.add_or_update(ip, id, trusted);
return true;
}
false
}
pub fn demote(&mut self, id: &NodeId) {
if let Some(ip) = self.node_index.get(id) {
let subnet = self.subnet_type.subnet(ip);
if let Some(b) = self.trusted_buckets.get_mut(&subnet) {
if b.remove(id) {
if let Some(b) = self.untrusted_buckets.get_mut(&subnet) {
b.add(id.clone());
}
}
}
}
}
fn add_or_update(&mut self, ip: IpAddr, id: NodeId, trusted: bool) {
self.remove(&id);
self.node_index.insert(id.clone(), ip);
self.ip_index.insert(ip, id.clone());
let subnet = self.subnet_type.subnet(&ip);
if trusted {
self.trusted_buckets
.get_mut_or_insert_with(subnet, NodeBucket::default)
.add(id);
} else {
self.untrusted_buckets
.get_mut_or_insert_with(subnet, NodeBucket::default)
.add(id);
}
}
fn is_quota_allowed(&self, ip: &IpAddr) -> bool {
let subnet = self.subnet_type.subnet(ip);
let num_trusted = self
.trusted_buckets
.get(&subnet)
.map_or(0, |bucket| bucket.count());
let num_untrusted = self
.untrusted_buckets
.get(&subnet)
.map_or(0, |bucket| bucket.count());
num_trusted + num_untrusted < self.subnet_quota
}
fn select_evictee(&self, ip: &IpAddr, db: &NodeDatabase) -> Option<NodeId> {
let subnet = self.subnet_type.subnet(&ip);
self.untrusted_buckets
.get(&subnet)
.and_then(|bucket| bucket.select_evictee(db, self.evict_timeout))
.or_else(|| {
self.trusted_buckets.get(&subnet).and_then(|bucket| {
bucket.select_evictee(db, self.evict_timeout)
})
})
}
}
#[cfg(test)]
mod tests {
use super::{NodeDatabase, NodeId, NodeIpLimit, ValidateInsertResult};
use std::{net::IpAddr, str::FromStr};
fn new_ip(ip: &'static str) -> IpAddr { IpAddr::from_str(ip).unwrap() }
#[test]
fn test_remove() {
let mut limit = NodeIpLimit::new(2);
assert_eq!(limit.remove(&NodeId::random()), false);
let n1 = NodeId::random();
let ip1 = new_ip("127.0.0.1");
assert_eq!(limit.insert(n1.clone(), ip1, true, None), true);
let n2 = NodeId::random();
let ip2 = new_ip("127.0.0.2");
assert_eq!(limit.insert(n2.clone(), ip2, true, None), true);
validate_node(&limit, &n1, &ip1, true );
assert_eq!(limit.remove(&n1), true);
validate_node(&limit, &n1, &ip1, false );
validate_node(&limit, &n2, &ip2, true );
assert_eq!(limit.remove(&n2), true);
validate_node(&limit, &n2, &ip2, false );
}
#[test]
fn test_sample() {
let mut limit = NodeIpLimit::new(2);
assert_eq!(limit.sample_trusted(3).is_empty(), true);
let n1 = NodeId::random();
let ip1 = new_ip("127.0.0.1");
assert_eq!(limit.insert(n1, ip1, false, None), true);
assert_eq!(limit.sample_trusted(3).is_empty(), true);
let n2 = NodeId::random();
let ip2 = new_ip("127.0.0.2");
assert_eq!(limit.insert(n2, ip2, true, None), true);
assert_eq!(limit.sample_trusted(0).len(), 0);
assert_eq!(limit.sample_trusted(1).len(), 1);
assert_eq!(limit.sample_trusted(3).len(), 1);
}
fn validate_node(
limit: &NodeIpLimit, id: &NodeId, ip: &IpAddr, exists: bool,
) {
if !exists {
assert_eq!(limit.node_index.contains_key(id), false);
} else {
assert_eq!(limit.node_index.contains_key(id), true);
assert_eq!(limit.node_index[id], *ip);
}
}
#[test]
fn test_insert_duplicate_id_ip() {
let mut limit = NodeIpLimit::new(2);
let db = NodeDatabase::new(None, 2);
let n = NodeId::random();
let ip = new_ip("127.0.0.1");
assert_eq!(
limit.validate_insertion(&n, &ip, &db),
ValidateInsertResult::QuotaEnough
);
assert_eq!(limit.insert(n.clone(), ip, true, None), true);
validate_node(&limit, &n, &ip, true );
assert_eq!(
limit.validate_insertion(&n, &ip, &db),
ValidateInsertResult::AlreadyExists
);
assert_eq!(limit.insert(n.clone(), ip, true, None), false);
assert_eq!(limit.insert(n.clone(), ip, false, None), false);
validate_node(&limit, &n, &ip, true );
}
#[test]
fn test_insert_occupy_ip_new_node() {
let mut limit = NodeIpLimit::new(2);
let db = NodeDatabase::new(None, 2);
let n1 = NodeId::random();
let ip = new_ip("127.0.0.1");
assert_eq!(
limit.validate_insertion(&n1, &ip, &db),
ValidateInsertResult::QuotaEnough
);
assert_eq!(limit.insert(n1.clone(), ip, true, None), true);
validate_node(&limit, &n1, &ip, true );
let n2 = NodeId::random();
assert_eq!(
limit.validate_insertion(&n2, &ip, &db),
ValidateInsertResult::OccupyIp(n1.clone())
);
assert_eq!(limit.insert(n2.clone(), ip, true, None), false);
validate_node(&limit, &n1, &ip, true ); validate_node(&limit, &n2, &ip, false ); assert_eq!(limit.insert(n2.clone(), ip, true, Some(n1.clone())), true);
validate_node(&limit, &n1, &ip, false ); validate_node(&limit, &n2, &ip, true ); }
#[test]
fn test_insert_occupy_ip_update_node() {
let mut limit = NodeIpLimit::new(2);
let db = NodeDatabase::new(None, 2);
let n1 = NodeId::random();
let ip1 = new_ip("127.0.0.1");
assert_eq!(
limit.validate_insertion(&n1, &ip1, &db),
ValidateInsertResult::QuotaEnough
);
assert_eq!(limit.insert(n1.clone(), ip1, true, None), true);
validate_node(&limit, &n1, &ip1, true );
let n2 = NodeId::random();
let ip2 = new_ip("127.0.0.2");
assert_eq!(
limit.validate_insertion(&n2, &ip2, &db),
ValidateInsertResult::QuotaEnough
);
assert_eq!(limit.insert(n2.clone(), ip2, true, None), true);
validate_node(&limit, &n2, &ip2, true );
assert_eq!(
limit.validate_insertion(&n2, &ip1, &db),
ValidateInsertResult::OccupyIp(n1.clone())
);
assert_eq!(limit.insert(n2.clone(), ip1, true, None), false);
validate_node(&limit, &n1, &ip1, true ); validate_node(&limit, &n2, &ip2, true ); assert_eq!(limit.insert(n2.clone(), ip1, true, Some(n1.clone())), true);
validate_node(&limit, &n1, &ip1, false ); validate_node(&limit, &n2, &ip1, true ); }
#[test]
fn test_is_quota_allowed() {
let mut limit = NodeIpLimit::new(2);
let n1 = NodeId::random();
let ip1 = new_ip("127.0.0.1");
assert_eq!(limit.insert(n1, ip1, true, None), true);
let n2 = NodeId::random();
let ip2 = new_ip("127.0.0.2");
assert_eq!(limit.insert(n2, ip2, true, None), true);
assert_eq!(limit.is_quota_allowed(&new_ip("127.0.0.3")), false);
assert_eq!(limit.is_quota_allowed(&new_ip("127.0.1.1")), true);
}
#[test]
fn test_select_evictee() {
let limit = NodeIpLimit::new(2);
let db = NodeDatabase::new(None, 2);
assert_eq!(limit.select_evictee(&new_ip("127.0.0.1"), &db), None);
}
}