network/
node_table.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{ip_utils::*, AllowIP, Error, IpFilter};
6use cfx_types::H512;
7use cfx_util_macros::bail;
8use enum_map::{Enum, EnumMap};
9use io::StreamToken;
10use log::{debug, warn};
11use rand::{self, prelude::SliceRandom, Rng};
12use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
13use serde::Deserialize;
14use serde_derive::Serialize;
15use serde_json;
16use std::{
17    collections::{HashMap, HashSet},
18    fmt::{self, Display, Formatter},
19    fs,
20    hash::{Hash, Hasher},
21    net::{
22        Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6,
23        ToSocketAddrs,
24    },
25    path::{Path, PathBuf},
26    slice,
27    str::FromStr,
28    time::{self, Duration, SystemTime},
29};
30use strum::IntoEnumIterator;
31use strum_macros::EnumIter;
32
33/// Node public key
34pub type NodeId = H512;
35
36#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
37#[serde(rename_all = "camelCase")]
38/// Node address info
39pub struct NodeEndpoint {
40    /// IP(V4 or V6) address
41    pub address: SocketAddr,
42    /// Connection port.
43    pub udp_port: u16,
44}
45
46impl NodeEndpoint {
47    pub fn udp_address(&self) -> SocketAddr {
48        match self.address {
49            SocketAddr::V4(a) => {
50                SocketAddr::V4(SocketAddrV4::new(*a.ip(), self.udp_port))
51            }
52            SocketAddr::V6(a) => SocketAddr::V6(SocketAddrV6::new(
53                *a.ip(),
54                self.udp_port,
55                a.flowinfo(),
56                a.scope_id(),
57            )),
58        }
59    }
60
61    pub fn is_allowed(&self, filter: &IpFilter) -> bool {
62        (self.is_allowed_by_predefined(&filter.predefined)
63            || filter
64                .custom_allow
65                .iter()
66                .any(|ipnet| self.address.ip().is_within(ipnet)))
67            && !filter
68                .custom_block
69                .iter()
70                .any(|ipnet| self.address.ip().is_within(ipnet))
71    }
72
73    pub fn is_allowed_by_predefined(&self, filter: &AllowIP) -> bool {
74        match filter {
75            AllowIP::All => true,
76            AllowIP::Private => self.address.ip().is_usable_private(),
77            AllowIP::Public => self.address.ip().is_usable_public(),
78            AllowIP::None => false,
79        }
80    }
81
82    pub fn from_rlp(rlp: &Rlp) -> Result<Self, DecoderError> {
83        let tcp_port = rlp.val_at::<u16>(2)?;
84        let udp_port = rlp.val_at::<u16>(1)?;
85        let addr_bytes = rlp.at(0)?.data()?;
86        let address = match addr_bytes.len() {
87            4 => Ok(SocketAddr::V4(SocketAddrV4::new(
88                Ipv4Addr::new(
89                    addr_bytes[0],
90                    addr_bytes[1],
91                    addr_bytes[2],
92                    addr_bytes[3],
93                ),
94                tcp_port,
95            ))),
96            16 => {
97                let mut o: [u16; 8] = [0; 8];
98                for i in 0..8 {
99                    o[i] = ((addr_bytes[2 * i + 1] as u16) << 8)
100                        | (addr_bytes[2 * i] as u16);
101                }
102                Ok(SocketAddr::V6(SocketAddrV6::new(
103                    Ipv6Addr::new(
104                        o[0], o[1], o[2], o[3], o[4], o[5], o[6], o[7],
105                    ),
106                    tcp_port,
107                    0,
108                    0,
109                )))
110            }
111            _ => Err(DecoderError::RlpInconsistentLengthAndData),
112        }?;
113        Ok(NodeEndpoint { address, udp_port })
114    }
115
116    pub fn to_rlp(&self, rlp: &mut RlpStream) {
117        match self.address {
118            SocketAddr::V4(a) => {
119                rlp.append(&(&a.ip().octets()[..]));
120            }
121            SocketAddr::V6(a) => unsafe {
122                let segments = a.ip().segments();
123                let o: *const u8 = segments.as_ptr() as *const u8;
124                rlp.append(&slice::from_raw_parts(o, 16));
125            },
126        };
127        rlp.append(&self.udp_port);
128        rlp.append(&self.address.port());
129    }
130
131    pub fn to_rlp_list(&self, rlp: &mut RlpStream) {
132        rlp.begin_list(3);
133        self.to_rlp(rlp);
134    }
135
136    /// Validates that the port is not 0 and address IP is specified
137    pub fn is_valid(&self) -> bool {
138        self.udp_port != 0
139            && self.address.port() != 0
140            && match self.address {
141                SocketAddr::V4(a) => !a.ip().is_unspecified(),
142                SocketAddr::V6(a) => !a.ip().is_unspecified(),
143            }
144    }
145}
146
147impl FromStr for NodeEndpoint {
148    type Err = Error;
149
150    /// Create endpoint from string. Performs name resolution if given a host
151    /// name.
152    fn from_str(s: &str) -> Result<NodeEndpoint, Error> {
153        let address = s.to_socket_addrs().map(|mut i| i.next());
154        match address {
155            Ok(Some(a)) => Ok(NodeEndpoint {
156                address: a,
157                udp_port: a.port(),
158            }),
159            Ok(None) => bail!(Error::AddressResolve(None)),
160            Err(_) => Err(Error::AddressParse.into()), /* always an io::Error
161                                                        * of InvalidInput
162                                                        * kind */
163        }
164    }
165}
166
167#[derive(Clone, Debug)]
168pub struct NodeEntry {
169    pub id: NodeId,
170    pub endpoint: NodeEndpoint,
171}
172
173impl Encodable for NodeEntry {
174    fn rlp_append(&self, s: &mut RlpStream) {
175        s.begin_list(4);
176        self.endpoint.to_rlp(s);
177        s.append(&self.id);
178    }
179}
180
181impl Decodable for NodeEntry {
182    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
183        Ok(NodeEntry {
184            id: rlp.val_at(3)?,
185            endpoint: NodeEndpoint::from_rlp(rlp)?,
186        })
187    }
188}
189
190#[derive(Debug, PartialEq, Eq, Copy, Clone)]
191pub enum PeerType {
192    _Required,
193    Optional,
194}
195
196/// A type for representing an interaction (contact) with a node at a given time
197/// that was either a success or a failure.
198#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
199#[serde(rename_all = "camelCase")]
200pub enum NodeContact {
201    Success(SystemTime),
202    Failure(SystemTime),
203    Demoted(SystemTime),
204}
205
206impl NodeContact {
207    pub fn success() -> NodeContact { NodeContact::Success(SystemTime::now()) }
208
209    pub fn failure() -> NodeContact { NodeContact::Failure(SystemTime::now()) }
210
211    pub fn demoted() -> NodeContact { NodeContact::Demoted(SystemTime::now()) }
212
213    pub fn is_demoted(&self) -> bool { matches!(self, NodeContact::Demoted(_)) }
214
215    pub fn time(&self) -> SystemTime {
216        match *self {
217            NodeContact::Success(t)
218            | NodeContact::Failure(t)
219            | NodeContact::Demoted(t) => t,
220        }
221    }
222
223    pub fn success_for_duration(&self, due: Duration) -> bool {
224        let mut res = false;
225        match *self {
226            NodeContact::Success(t) => {
227                if let Ok(d) = t.elapsed() {
228                    if d > due {
229                        res = true;
230                    }
231                }
232            }
233            _ => {}
234        };
235
236        res
237    }
238
239    /// Filters and old contact, returning `None` if it happened longer than a
240    /// week ago.
241    #[allow(dead_code)]
242    fn recent(&self) -> Option<&NodeContact> {
243        let t = self.time();
244        if let Ok(d) = t.elapsed() {
245            if d < Duration::from_secs(60 * 60 * 24 * 7) {
246                return Some(self);
247            }
248        }
249
250        None
251    }
252}
253
254#[derive(Debug, Clone, Serialize, Deserialize)]
255#[serde(rename_all = "camelCase")]
256pub struct Node {
257    pub id: NodeId,
258    pub endpoint: NodeEndpoint,
259
260    // Updated by both udp ping/pong message in discovery protocol
261    // and tcp connection event.
262    // This metric can be used in prioritizing selection of peers
263    // to establish outgoing connections.
264    // It can also be used in considering demoting a
265    // trusted peer to untrusted.
266    pub last_contact: Option<NodeContact>,
267    // Updated by tcp connection event.
268    // This metric is used to consider when to promote untrusted
269    // peers to trusted. This is a runtime information which
270    // does not need to be made persistent.
271    pub last_connected: Option<NodeContact>,
272    pub stream_token: Option<StreamToken>,
273    // Generally, it is used by protocol handler layer to attach
274    // some tags to node, so as to:
275    // 1. Sampling nodes with special tags, e.g.
276    //     - archive nodes first
277    //     - good credit nodes first
278    //     - good network nodes first
279    // 2. Refuse incoming connection from node with special tags.
280    pub tags: HashMap<String, String>,
281}
282
283impl Node {
284    pub fn new(id: NodeId, endpoint: NodeEndpoint) -> Node {
285        Node {
286            id,
287            endpoint,
288            last_contact: None,
289            last_connected: None,
290            stream_token: None,
291            tags: Default::default(),
292        }
293    }
294}
295
296impl Display for Node {
297    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
298        if self.endpoint.udp_port != self.endpoint.address.port() {
299            write!(
300                f,
301                "cfxnode://{:x}@{}+{}",
302                self.id, self.endpoint.address, self.endpoint.udp_port
303            )?;
304        } else {
305            write!(f, "cfxnode://{:x}@{}", self.id, self.endpoint.address)?;
306        }
307        Ok(())
308    }
309}
310
311impl FromStr for Node {
312    type Err = Error;
313
314    fn from_str(s: &str) -> Result<Self, Self::Err> {
315        let (id, endpoint) =
316            if let Some(id_and_address_str) = s.strip_prefix("cfxnode://") {
317                // A node url with format "cfxnode://ID@IP:PORT"
318                let delimiter_index =
319                    id_and_address_str.find("@").ok_or(Error::AddressParse)?;
320                (
321                    id_and_address_str[..delimiter_index]
322                        .parse()
323                        .map_err(|_| Error::InvalidNodeId)?,
324                    NodeEndpoint::from_str(
325                        &id_and_address_str[delimiter_index + 1..],
326                    )?,
327                )
328            } else {
329                // A simple address without node id.
330                (NodeId::default(), NodeEndpoint::from_str(s)?)
331            };
332
333        Ok(Node {
334            id,
335            endpoint,
336            last_contact: None,
337            last_connected: None,
338            stream_token: None,
339            tags: Default::default(),
340        })
341    }
342}
343
344impl PartialEq for Node {
345    fn eq(&self, other: &Self) -> bool { self.id == other.id }
346}
347
348impl Eq for Node {}
349
350impl Hash for Node {
351    fn hash<H>(&self, state: &mut H)
352    where H: Hasher {
353        self.id.hash(state)
354    }
355}
356
357const MAX_NODES: usize = 4096;
358
359#[derive(
360    Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Enum, EnumIter,
361)]
362enum NodeReputation {
363    Success = 0,
364    Unknown = 1,
365    Failure = 2,
366    Demoted = 3,
367}
368
369const NODE_REPUTATION_LEVEL_COUNT: usize = 3;
370
371impl Default for NodeReputation {
372    fn default() -> Self { NodeReputation::Unknown }
373}
374
375#[derive(Default, Clone, Copy)]
376struct NodeReputationIndex(NodeReputation, usize);
377
378/// Node table backed by disk file.
379pub struct NodeTable {
380    /// A vector list of nodes for each reputation level
381    node_reputation_table: EnumMap<NodeReputation, Vec<Node>>,
382    /// Map node id to the reputation level and the index in the above table
383    node_index: HashMap<NodeId, NodeReputationIndex>,
384    useless_nodes: HashSet<NodeId>,
385    path: Option<PathBuf>,
386}
387
388impl NodeTable {
389    pub fn new(dir: Option<String>, filename: &str) -> NodeTable {
390        let path = dir.and_then(|dir| {
391            let mut buf = PathBuf::from(dir);
392            buf.push(filename);
393            Some(buf)
394        });
395
396        let mut node_table = NodeTable {
397            node_reputation_table: EnumMap::default(),
398            node_index: HashMap::new(),
399            path,
400            useless_nodes: HashSet::new(),
401        };
402
403        node_table.load_from_file();
404        node_table
405    }
406
407    fn node_reputation(contact: &Option<NodeContact>) -> NodeReputation {
408        if let Some(contact) = contact {
409            match contact {
410                NodeContact::Success(_) => NodeReputation::Success,
411                NodeContact::Failure(_) => NodeReputation::Failure,
412                NodeContact::Demoted(_) => NodeReputation::Demoted,
413                //_ => panic!("Unknown contact information!"),
414            }
415        } else {
416            NodeReputation::Unknown
417        }
418    }
419
420    fn load_from_file(&mut self) {
421        let path = match self.path {
422            Some(ref path) => path,
423            None => return,
424        };
425
426        let file = match fs::File::open(path) {
427            Ok(file) => file,
428            Err(e) => {
429                debug!("node table file not found: {:?}", e);
430                return;
431            }
432        };
433        let res: Result<json::NodeTable, _> = serde_json::from_reader(file);
434        match res {
435            Ok(table) => {
436                for n in table.nodes {
437                    let node = n.into_node();
438                    if let Some(node) = node {
439                        if !self.node_index.contains_key(&node.id) {
440                            let node_rep =
441                                Self::node_reputation(&node.last_contact);
442                            self.add_to_reputation_level(node_rep, node);
443                        } else {
444                            warn!("There exist multiple entries for same node id: {:?}", node.id);
445                        }
446                    }
447                }
448            }
449            Err(e) => {
450                warn!("Error reading node table file: {:?}", e);
451            }
452        }
453    }
454
455    pub fn sample_nodes(
456        &self, count: u32, _filter: &IpFilter,
457    ) -> Vec<NodeEntry> {
458        let mut nodes: Vec<NodeEntry> = Vec::new();
459        for _i in 0..count {
460            let mut rng = rand::thread_rng();
461            let node_rep_idx = rng.random_range(0..NODE_REPUTATION_LEVEL_COUNT);
462            let node_rep = NodeReputation::iter().nth(node_rep_idx).unwrap();
463            let node_rep_vec = &self.node_reputation_table[node_rep];
464            if !node_rep_vec.is_empty() {
465                let idx = rng.random_range(0..node_rep_vec.len());
466                let n = &node_rep_vec[idx];
467                nodes.push(NodeEntry {
468                    id: n.id,
469                    endpoint: n.endpoint.clone(),
470                });
471            }
472        }
473        let mut unique_nodes: Vec<NodeEntry> = Vec::new();
474        let mut nodes_set: HashSet<NodeId> = HashSet::new();
475        for n in nodes {
476            if !nodes_set.contains(&n.id) {
477                nodes_set.insert(n.id);
478                unique_nodes.push(n);
479            }
480        }
481        unique_nodes
482    }
483
484    /// Return a random sample set of nodes inside the table
485    pub fn sample_node_ids(
486        &self, count: u32, _filter: &IpFilter,
487    ) -> HashSet<NodeId> {
488        let mut node_id_set: HashSet<NodeId> = HashSet::new();
489        let mut rng = rand::thread_rng();
490        for _i in 0..count {
491            let node_rep_idx = rng.random_range(0..NODE_REPUTATION_LEVEL_COUNT);
492            let node_rep = NodeReputation::iter().nth(node_rep_idx).unwrap();
493            let node_rep_vec = &self.node_reputation_table[node_rep];
494            if !node_rep_vec.is_empty() {
495                let idx = rng.random_range(0..node_rep_vec.len());
496                let n = &node_rep_vec[idx];
497                if !node_id_set.contains(&n.id) {
498                    node_id_set.insert(n.id);
499                }
500            }
501        }
502
503        node_id_set
504    }
505
506    // If node exists, update last contact, insert otherwise.
507    // Endpoint will be updated if node exists.
508    pub fn update_last_contact(&mut self, node: Node) {
509        let mut _index = NodeReputationIndex::default();
510        let mut exist = false;
511        if let Some(index) = self.node_index.get_mut(&node.id) {
512            _index = *index;
513            exist = true;
514        }
515
516        let target_node_rep = Self::node_reputation(&node.last_contact);
517
518        if !exist {
519            self.add_to_reputation_level(target_node_rep, node);
520            return;
521        }
522
523        // check whether the node position will change
524        if target_node_rep == _index.0 {
525            let old_node = &mut self.node_reputation_table[_index.0][_index.1];
526            old_node.last_contact = node.last_contact;
527            old_node.endpoint = node.endpoint;
528        } else {
529            let mut removed_node =
530                self.remove_from_reputation_level(&_index).unwrap();
531            removed_node.last_contact = node.last_contact;
532            removed_node.endpoint = node.endpoint;
533            self.add_to_reputation_level(target_node_rep, removed_node);
534        }
535    }
536
537    // This function does not preserve runtime connection information
538    pub fn add_node(&mut self, mut node: Node, preserve_last_contact: bool) {
539        debug!("NodeTable {:?} add_node {:?}", self.path, node);
540        let mut _index = NodeReputationIndex::default();
541        let mut exist = false;
542        if let Some(index) = self.node_index.get_mut(&node.id) {
543            _index = *index;
544            exist = true;
545        }
546
547        if !exist {
548            let target_node_rep = Self::node_reputation(&node.last_contact);
549            self.add_to_reputation_level(target_node_rep, node);
550            return;
551        }
552
553        if preserve_last_contact {
554            let node_vec = &mut self.node_reputation_table[_index.0];
555            node.last_contact = node_vec[_index.1].last_contact;
556            node_vec[_index.1] = node;
557        } else {
558            let target_node_rep = Self::node_reputation(&node.last_contact);
559            // check whether the node position will change
560            if target_node_rep == _index.0 {
561                self.node_reputation_table[_index.0][_index.1] = node;
562            } else {
563                self.remove_from_reputation_level(&_index);
564                self.add_to_reputation_level(target_node_rep, node);
565            }
566        }
567    }
568
569    fn is_reputation_level_demoted(&self, index: &NodeReputationIndex) -> bool {
570        index.0 == NodeReputation::Demoted
571    }
572
573    fn remove_from_reputation_level(
574        &mut self, index: &NodeReputationIndex,
575    ) -> Option<Node> {
576        let node_rep_vec = &mut self.node_reputation_table[index.0];
577
578        if node_rep_vec.is_empty() || index.1 >= node_rep_vec.len() {
579            return None;
580        }
581
582        if node_rep_vec.len() - 1 == index.1 {
583            // to remove the last item
584            let node_id = node_rep_vec[node_rep_vec.len() - 1].id;
585            self.node_index.remove(&node_id);
586            return node_rep_vec.pop();
587        }
588
589        let tail_node = node_rep_vec.pop();
590        if let Some(tail_node) = tail_node {
591            let removed_node = node_rep_vec[index.1].clone();
592            self.node_index.remove(&removed_node.id);
593            if let Some(node_idx) = self.node_index.get_mut(&tail_node.id) {
594                node_rep_vec[index.1] = tail_node;
595                *node_idx = *index;
596                Some(removed_node)
597            } else {
598                panic!("Should not happen!");
599            }
600        } else {
601            panic!("Should not happen!");
602        }
603    }
604
605    fn add_to_reputation_level(
606        &mut self, node_rep: NodeReputation, node: Node,
607    ) {
608        let node_idx = self.node_reputation_table[node_rep].len();
609        let node_table_idx = NodeReputationIndex(node_rep, node_idx);
610        self.node_index.insert(node.id, node_table_idx);
611        self.node_reputation_table[node_rep].push(node);
612    }
613
614    /// Returns a list of ordered nodes according to their most recent contact
615    /// and filtering useless nodes. The algorithm for creating the sorted nodes
616    /// is:
617    /// - Contacts that aren't recent (older than 1 week) are discarded
618    /// - (1) Nodes with a successful contact are ordered (most recent success
619    ///   first)
620    /// - (2) Nodes with unknown contact (older than 1 week or new nodes) are
621    ///   randomly shuffled
622    /// - (3) Nodes with a failed contact are ordered (oldest failure first)
623    /// - The final result is the concatenation of (1), (2) and (3)
624    fn ordered_entries(&self) -> Vec<&Node> {
625        let mut success = Vec::new();
626        let mut failures = Vec::new();
627        let mut unknown = Vec::new();
628
629        for n in self.node_reputation_table[NodeReputation::Success].iter() {
630            if !self.useless_nodes.contains(&n.id) {
631                success.push(n);
632            }
633        }
634
635        for n in self.node_reputation_table[NodeReputation::Failure].iter() {
636            if !self.useless_nodes.contains(&n.id) {
637                failures.push(n);
638            }
639        }
640
641        for n in self.node_reputation_table[NodeReputation::Unknown].iter() {
642            if !self.useless_nodes.contains(&n.id) {
643                unknown.push(n);
644            }
645        }
646
647        success.sort_by(|a, b| {
648            let a = a.last_contact.expect(
649                "vector only contains values with defined last_contact; qed",
650            );
651            let b = b.last_contact.expect(
652                "vector only contains values with defined last_contact; qed",
653            );
654            // inverse ordering, most recent successes come first
655            b.time().cmp(&a.time())
656        });
657
658        failures.sort_by(|a, b| {
659            let a = a.last_contact.expect(
660                "vector only contains values with defined last_contact; qed",
661            );
662            let b = b.last_contact.expect(
663                "vector only contains values with defined last_contact; qed",
664            );
665            // normal ordering, most distant failures come first
666            a.time().cmp(&b.time())
667        });
668
669        unknown.shuffle(&mut rand::thread_rng());
670
671        success.append(&mut unknown);
672        success.append(&mut failures);
673        success
674    }
675
676    /// Returns node ids sorted by failure percentage, for nodes with the same
677    /// failure percentage the absolute number of failures is considered.
678    pub fn nodes(&self, filter: &IpFilter) -> Vec<NodeId> {
679        self.ordered_entries()
680            .iter()
681            .filter(|n| n.endpoint.is_allowed(&filter))
682            .map(|n| n.id)
683            .collect()
684    }
685
686    pub fn entries_with_filter(&self, filter: &IpFilter) -> Vec<NodeEntry> {
687        self.ordered_entries()
688            .iter()
689            .filter(|n| n.endpoint.is_allowed(&filter))
690            .map(|n| NodeEntry {
691                endpoint: n.endpoint.clone(),
692                id: n.id,
693            })
694            .collect()
695    }
696
697    /// Ordered list of all entries by failure percentage, for nodes with the
698    /// same failure percentage the absolute number of failures is
699    /// considered.
700    pub fn entries(&self) -> Vec<NodeEntry> {
701        self.ordered_entries()
702            .iter()
703            .map(|n| NodeEntry {
704                endpoint: n.endpoint.clone(),
705                id: n.id,
706            })
707            .collect()
708    }
709
710    /// Get particular node
711    pub fn get_mut(&mut self, id: &NodeId) -> Option<&mut Node> {
712        let index = self.node_index.get(id);
713        if let Some(index) = index {
714            Some(&mut self.node_reputation_table[index.0][index.1])
715        } else {
716            None
717        }
718    }
719
720    /// Get particular node
721    pub fn get(&self, id: &NodeId) -> Option<&Node> {
722        let index = self.node_index.get(id);
723        if let Some(index) = index {
724            Some(&self.node_reputation_table[index.0][index.1])
725        } else {
726            None
727        }
728    }
729
730    /// Check if a node exists in the table.
731    pub fn contains(&self, id: &NodeId) -> bool {
732        self.node_index.contains_key(id)
733    }
734
735    pub fn remove_with_id(&mut self, id: &NodeId) -> Option<Node> {
736        let mut _index;
737        if let Some(index) = self.node_index.get(id) {
738            _index = *index;
739        } else {
740            return None;
741        }
742
743        self.remove_from_reputation_level(&_index)
744    }
745
746    /// Set last contact as failure or demoted for a node
747    pub fn note_unsuccess_contact(
748        &mut self, id: &NodeId, by_connection: bool,
749        last_contact: Option<NodeContact>,
750    ) {
751        let mut _index;
752        if let Some(index) = self.node_index.get(id) {
753            _index = *index;
754        } else {
755            return;
756        }
757
758        let target_node_rep = Self::node_reputation(&last_contact);
759        if target_node_rep == _index.0 {
760            let node = &mut self.node_reputation_table[_index.0][_index.1];
761            node.last_contact = last_contact.clone();
762            if by_connection {
763                node.last_connected = last_contact.clone();
764            }
765        } else if self.is_reputation_level_demoted(&_index) {
766            // Only update node.last_connected
767            if by_connection {
768                let node = &mut self.node_reputation_table[_index.0][_index.1];
769                node.last_connected = last_contact.clone();
770            }
771        } else if let Some(mut node) =
772            self.remove_from_reputation_level(&_index)
773        {
774            node.last_contact = last_contact.clone();
775            if by_connection {
776                node.last_connected = last_contact.clone();
777            }
778            self.add_to_reputation_level(target_node_rep, node);
779        } else {
780            panic!("Should not happen!");
781        }
782    }
783
784    /// Set last contact as success for a node
785    pub fn note_success(
786        &mut self, id: &NodeId, by_connection: bool, token: Option<StreamToken>,
787    ) {
788        let mut _index;
789        if let Some(index) = self.node_index.get(id) {
790            _index = *index;
791        } else {
792            return;
793        }
794
795        let target_node_rep = NodeReputation::Success;
796        if target_node_rep == _index.0 {
797            let node = &mut self.node_reputation_table[_index.0][_index.1];
798            node.last_contact = Some(NodeContact::success());
799            if by_connection {
800                node.last_connected = Some(NodeContact::success());
801                if token != None {
802                    node.stream_token = token;
803                }
804            }
805        } else if self.is_reputation_level_demoted(&_index) {
806            // Only update node.last_connected
807            if by_connection {
808                let node = &mut self.node_reputation_table[_index.0][_index.1];
809                node.last_connected = Some(NodeContact::success());
810                if token != None {
811                    node.stream_token = token;
812                }
813            }
814        } else if let Some(mut node) =
815            self.remove_from_reputation_level(&_index)
816        {
817            node.last_contact = Some(NodeContact::success());
818            if by_connection {
819                node.last_connected = Some(NodeContact::success());
820                if token != None {
821                    node.stream_token = token;
822                }
823            }
824            self.add_to_reputation_level(target_node_rep, node);
825        } else {
826            panic!("Should not happen!");
827        }
828    }
829
830    /// Mark as useless, no further attempts to connect until next call to
831    /// `clear_useless`.
832    pub fn mark_as_useless(&mut self, id: &NodeId) {
833        self.useless_nodes.insert(id.clone());
834    }
835
836    /// Attempt to connect to useless nodes again.
837    pub fn clear_useless(&mut self) { self.useless_nodes.clear(); }
838
839    /// Save the (un)trusted_nodes.json file.
840    pub fn save(&self) {
841        let path = match self.path {
842            Some(ref path) => Path::new(path),
843            None => return,
844        };
845
846        if let Some(dir) = path.parent() {
847            if let Err(e) = fs::create_dir_all(dir) {
848                warn!("Error creating node table directory: {:?}", e);
849                return;
850            }
851        }
852
853        let node_ids = self.nodes(&IpFilter::default());
854        let nodes = node_ids
855            .into_iter()
856            .map(|id| {
857                let index = &self.node_index[&id];
858                &self.node_reputation_table[index.0][index.1]
859            })
860            .take(MAX_NODES)
861            .map(Into::into)
862            .collect();
863        let table = json::NodeTable { nodes };
864
865        match fs::File::create(&path) {
866            Ok(file) => {
867                if let Err(e) = serde_json::to_writer_pretty(file, &table) {
868                    warn!("Error writing node table file: {:?}", e);
869                }
870            }
871            Err(e) => {
872                warn!("Error creating node table file: {:?}", e);
873            }
874        }
875    }
876
877    pub fn all(&self) -> Vec<NodeId> {
878        self.node_index.keys().copied().collect()
879    }
880}
881
882impl Drop for NodeTable {
883    fn drop(&mut self) { self.save(); }
884}
885
886/// Check if node url is valid
887pub fn validate_node_url(url: &str) -> Option<Error> {
888    match Node::from_str(url) {
889        Ok(_) => None,
890        Err(e) => Some(e),
891    }
892}
893
894mod json {
895    use super::*;
896
897    #[derive(Serialize, Deserialize)]
898    pub struct NodeTable {
899        pub nodes: Vec<Node>,
900    }
901
902    #[derive(Serialize, Deserialize)]
903    pub enum NodeContact {
904        #[serde(rename = "success")]
905        Success(u64),
906        #[serde(rename = "failure")]
907        Failure(u64),
908        #[serde(rename = "demoted")]
909        Demoted(u64),
910    }
911
912    impl NodeContact {
913        pub fn into_node_contact(self) -> super::NodeContact {
914            match self {
915                NodeContact::Success(s) => super::NodeContact::Success(
916                    time::UNIX_EPOCH + Duration::from_secs(s),
917                ),
918                NodeContact::Failure(s) => super::NodeContact::Failure(
919                    time::UNIX_EPOCH + Duration::from_secs(s),
920                ),
921                NodeContact::Demoted(s) => super::NodeContact::Demoted(
922                    time::UNIX_EPOCH + Duration::from_secs(s),
923                ),
924            }
925        }
926    }
927
928    #[derive(Serialize, Deserialize)]
929    pub struct Node {
930        pub url: String,
931        pub last_contact: Option<NodeContact>,
932        pub tags: HashMap<String, String>,
933    }
934
935    impl Node {
936        pub fn into_node(self) -> Option<super::Node> {
937            match super::Node::from_str(&self.url) {
938                Ok(mut node) => {
939                    node.last_contact =
940                        self.last_contact.map(NodeContact::into_node_contact);
941                    node.tags = self.tags;
942                    Some(node)
943                }
944                _ => None,
945            }
946        }
947    }
948
949    impl<'a> From<&'a super::Node> for Node {
950        fn from(node: &'a super::Node) -> Self {
951            let last_contact = node.last_contact.and_then(|c| match c {
952                super::NodeContact::Success(t) => t
953                    .duration_since(time::UNIX_EPOCH)
954                    .ok()
955                    .map(|d| NodeContact::Success(d.as_secs())),
956                super::NodeContact::Failure(t) => t
957                    .duration_since(time::UNIX_EPOCH)
958                    .ok()
959                    .map(|d| NodeContact::Failure(d.as_secs())),
960                super::NodeContact::Demoted(t) => t
961                    .duration_since(time::UNIX_EPOCH)
962                    .ok()
963                    .map(|d| NodeContact::Demoted(d.as_secs())),
964            });
965
966            Node {
967                url: format!("{}", node),
968                last_contact,
969                tags: node.tags.clone(),
970            }
971        }
972    }
973}