network/
node_table.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{ip_utils::*, AllowIP, Error, IpFilter};
6use cfx_types::H512;
7use cfx_util_macros::bail;
8use enum_map::{Enum, EnumMap};
9use io::StreamToken;
10use log::{debug, warn};
11use rand::{self, prelude::SliceRandom, Rng};
12use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
13use serde::Deserialize;
14use serde_derive::Serialize;
15use serde_json;
16use std::{
17    collections::{HashMap, HashSet},
18    fmt::{self, Display, Formatter},
19    fs,
20    hash::{Hash, Hasher},
21    net::{
22        Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6,
23        ToSocketAddrs,
24    },
25    path::{Path, PathBuf},
26    slice,
27    str::FromStr,
28    time::{self, Duration, SystemTime},
29};
30use strum::IntoEnumIterator;
31use strum_macros::EnumIter;
32
33/// Node public key
34pub type NodeId = H512;
35
36#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
37#[serde(rename_all = "camelCase")]
38/// Node address info
39pub struct NodeEndpoint {
40    /// IP(V4 or V6) address
41    pub address: SocketAddr,
42    /// Connection port.
43    pub udp_port: u16,
44}
45
46impl NodeEndpoint {
47    pub fn udp_address(&self) -> SocketAddr {
48        match self.address {
49            SocketAddr::V4(a) => {
50                SocketAddr::V4(SocketAddrV4::new(*a.ip(), self.udp_port))
51            }
52            SocketAddr::V6(a) => SocketAddr::V6(SocketAddrV6::new(
53                *a.ip(),
54                self.udp_port,
55                a.flowinfo(),
56                a.scope_id(),
57            )),
58        }
59    }
60
61    pub fn is_allowed(&self, filter: &IpFilter) -> bool {
62        (self.is_allowed_by_predefined(&filter.predefined)
63            || filter
64                .custom_allow
65                .iter()
66                .any(|ipnet| self.address.ip().is_within(ipnet)))
67            && !filter
68                .custom_block
69                .iter()
70                .any(|ipnet| self.address.ip().is_within(ipnet))
71    }
72
73    pub fn is_allowed_by_predefined(&self, filter: &AllowIP) -> bool {
74        match filter {
75            AllowIP::All => true,
76            AllowIP::Private => self.address.ip().is_usable_private(),
77            AllowIP::Public => self.address.ip().is_usable_public(),
78            AllowIP::None => false,
79        }
80    }
81
82    pub fn from_rlp(rlp: &Rlp) -> Result<Self, DecoderError> {
83        let tcp_port = rlp.val_at::<u16>(2)?;
84        let udp_port = rlp.val_at::<u16>(1)?;
85        let addr_bytes = rlp.at(0)?.data()?;
86        let address = match addr_bytes.len() {
87            4 => Ok(SocketAddr::V4(SocketAddrV4::new(
88                Ipv4Addr::new(
89                    addr_bytes[0],
90                    addr_bytes[1],
91                    addr_bytes[2],
92                    addr_bytes[3],
93                ),
94                tcp_port,
95            ))),
96            16 => {
97                let mut o: [u16; 8] = [0; 8];
98                for i in 0..8 {
99                    o[i] = ((addr_bytes[2 * i + 1] as u16) << 8)
100                        | (addr_bytes[2 * i] as u16);
101                }
102                Ok(SocketAddr::V6(SocketAddrV6::new(
103                    Ipv6Addr::new(
104                        o[0], o[1], o[2], o[3], o[4], o[5], o[6], o[7],
105                    ),
106                    tcp_port,
107                    0,
108                    0,
109                )))
110            }
111            _ => Err(DecoderError::RlpInconsistentLengthAndData),
112        }?;
113        Ok(NodeEndpoint { address, udp_port })
114    }
115
116    pub fn to_rlp(&self, rlp: &mut RlpStream) {
117        match self.address {
118            SocketAddr::V4(a) => {
119                rlp.append(&(&a.ip().octets()[..]));
120            }
121            SocketAddr::V6(a) => unsafe {
122                let segments = a.ip().segments();
123                let o: *const u8 = segments.as_ptr() as *const u8;
124                rlp.append(&slice::from_raw_parts(o, 16));
125            },
126        };
127        rlp.append(&self.udp_port);
128        rlp.append(&self.address.port());
129    }
130
131    pub fn to_rlp_list(&self, rlp: &mut RlpStream) {
132        rlp.begin_list(3);
133        self.to_rlp(rlp);
134    }
135
136    /// Validates that the port is not 0 and address IP is specified
137    pub fn is_valid(&self) -> bool {
138        self.udp_port != 0
139            && self.address.port() != 0
140            && match self.address {
141                SocketAddr::V4(a) => !a.ip().is_unspecified(),
142                SocketAddr::V6(a) => !a.ip().is_unspecified(),
143            }
144    }
145}
146
147impl FromStr for NodeEndpoint {
148    type Err = Error;
149
150    /// Create endpoint from string. Performs name resolution if given a host
151    /// name.
152    fn from_str(s: &str) -> Result<NodeEndpoint, Error> {
153        let address = s.to_socket_addrs().map(|mut i| i.next());
154        match address {
155            Ok(Some(a)) => Ok(NodeEndpoint {
156                address: a,
157                udp_port: a.port(),
158            }),
159            Ok(None) => bail!(Error::AddressResolve(None)),
160            Err(_) => Err(Error::AddressParse), /* always an io::Error
161                                                 * of InvalidInput
162                                                 * kind */
163        }
164    }
165}
166
167#[derive(Clone, Debug)]
168pub struct NodeEntry {
169    pub id: NodeId,
170    pub endpoint: NodeEndpoint,
171}
172
173impl Encodable for NodeEntry {
174    fn rlp_append(&self, s: &mut RlpStream) {
175        s.begin_list(4);
176        self.endpoint.to_rlp(s);
177        s.append(&self.id);
178    }
179}
180
181impl Decodable for NodeEntry {
182    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
183        Ok(NodeEntry {
184            id: rlp.val_at(3)?,
185            endpoint: NodeEndpoint::from_rlp(rlp)?,
186        })
187    }
188}
189
190#[derive(Debug, PartialEq, Eq, Copy, Clone)]
191pub enum PeerType {
192    _Required,
193    Optional,
194}
195
196/// A type for representing an interaction (contact) with a node at a given time
197/// that was either a success or a failure.
198#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
199#[serde(rename_all = "camelCase")]
200pub enum NodeContact {
201    Success(SystemTime),
202    Failure(SystemTime),
203    Demoted(SystemTime),
204}
205
206impl NodeContact {
207    pub fn success() -> NodeContact { NodeContact::Success(SystemTime::now()) }
208
209    pub fn failure() -> NodeContact { NodeContact::Failure(SystemTime::now()) }
210
211    pub fn demoted() -> NodeContact { NodeContact::Demoted(SystemTime::now()) }
212
213    pub fn is_demoted(&self) -> bool { matches!(self, NodeContact::Demoted(_)) }
214
215    pub fn time(&self) -> SystemTime {
216        match *self {
217            NodeContact::Success(t)
218            | NodeContact::Failure(t)
219            | NodeContact::Demoted(t) => t,
220        }
221    }
222
223    pub fn success_for_duration(&self, due: Duration) -> bool {
224        if let NodeContact::Success(t) = *self {
225            if let Ok(d) = t.elapsed() {
226                return d > due;
227            }
228        }
229        false
230    }
231
232    /// Filters and old contact, returning `None` if it happened longer than a
233    /// week ago.
234    #[allow(dead_code)]
235    fn recent(&self) -> Option<&NodeContact> {
236        let t = self.time();
237        if let Ok(d) = t.elapsed() {
238            if d < Duration::from_secs(60 * 60 * 24 * 7) {
239                return Some(self);
240            }
241        }
242
243        None
244    }
245}
246
247#[derive(Debug, Clone, Serialize, Deserialize)]
248#[serde(rename_all = "camelCase")]
249pub struct Node {
250    pub id: NodeId,
251    pub endpoint: NodeEndpoint,
252
253    // Updated by both udp ping/pong message in discovery protocol
254    // and tcp connection event.
255    // This metric can be used in prioritizing selection of peers
256    // to establish outgoing connections.
257    // It can also be used in considering demoting a
258    // trusted peer to untrusted.
259    pub last_contact: Option<NodeContact>,
260    // Updated by tcp connection event.
261    // This metric is used to consider when to promote untrusted
262    // peers to trusted. This is a runtime information which
263    // does not need to be made persistent.
264    pub last_connected: Option<NodeContact>,
265    pub stream_token: Option<StreamToken>,
266    // Generally, it is used by protocol handler layer to attach
267    // some tags to node, so as to:
268    // 1. Sampling nodes with special tags, e.g.
269    //     - archive nodes first
270    //     - good credit nodes first
271    //     - good network nodes first
272    // 2. Refuse incoming connection from node with special tags.
273    pub tags: HashMap<String, String>,
274}
275
276impl Node {
277    pub fn new(id: NodeId, endpoint: NodeEndpoint) -> Node {
278        Node {
279            id,
280            endpoint,
281            last_contact: None,
282            last_connected: None,
283            stream_token: None,
284            tags: Default::default(),
285        }
286    }
287}
288
289impl Display for Node {
290    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
291        if self.endpoint.udp_port != self.endpoint.address.port() {
292            write!(
293                f,
294                "cfxnode://{:x}@{}+{}",
295                self.id, self.endpoint.address, self.endpoint.udp_port
296            )?;
297        } else {
298            write!(f, "cfxnode://{:x}@{}", self.id, self.endpoint.address)?;
299        }
300        Ok(())
301    }
302}
303
304impl FromStr for Node {
305    type Err = Error;
306
307    fn from_str(s: &str) -> Result<Self, Self::Err> {
308        let (id, endpoint) =
309            if let Some(id_and_address_str) = s.strip_prefix("cfxnode://") {
310                // A node url with format "cfxnode://ID@IP:PORT"
311                let delimiter_index =
312                    id_and_address_str.find("@").ok_or(Error::AddressParse)?;
313                (
314                    id_and_address_str[..delimiter_index]
315                        .parse()
316                        .map_err(|_| Error::InvalidNodeId)?,
317                    NodeEndpoint::from_str(
318                        &id_and_address_str[delimiter_index + 1..],
319                    )?,
320                )
321            } else {
322                // A simple address without node id.
323                (NodeId::default(), NodeEndpoint::from_str(s)?)
324            };
325
326        Ok(Node {
327            id,
328            endpoint,
329            last_contact: None,
330            last_connected: None,
331            stream_token: None,
332            tags: Default::default(),
333        })
334    }
335}
336
337impl PartialEq for Node {
338    fn eq(&self, other: &Self) -> bool { self.id == other.id }
339}
340
341impl Eq for Node {}
342
343impl Hash for Node {
344    fn hash<H>(&self, state: &mut H)
345    where H: Hasher {
346        self.id.hash(state)
347    }
348}
349
350const MAX_NODES: usize = 4096;
351
352#[derive(
353    Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Enum, EnumIter, Default,
354)]
355enum NodeReputation {
356    Success = 0,
357    #[default]
358    Unknown = 1,
359    Failure = 2,
360    Demoted = 3,
361}
362
363const NODE_REPUTATION_LEVEL_COUNT: usize = 3;
364
365#[derive(Default, Clone, Copy)]
366struct NodeReputationIndex(NodeReputation, usize);
367
368/// Node table backed by disk file.
369pub struct NodeTable {
370    /// A vector list of nodes for each reputation level
371    node_reputation_table: EnumMap<NodeReputation, Vec<Node>>,
372    /// Map node id to the reputation level and the index in the above table
373    node_index: HashMap<NodeId, NodeReputationIndex>,
374    useless_nodes: HashSet<NodeId>,
375    path: Option<PathBuf>,
376}
377
378impl NodeTable {
379    pub fn new(dir: Option<String>, filename: &str) -> NodeTable {
380        let path = dir.map(|dir| {
381            let mut buf = PathBuf::from(dir);
382            buf.push(filename);
383            buf
384        });
385
386        let mut node_table = NodeTable {
387            node_reputation_table: EnumMap::default(),
388            node_index: HashMap::new(),
389            path,
390            useless_nodes: HashSet::new(),
391        };
392
393        node_table.load_from_file();
394        node_table
395    }
396
397    fn node_reputation(contact: &Option<NodeContact>) -> NodeReputation {
398        if let Some(contact) = contact {
399            match contact {
400                NodeContact::Success(_) => NodeReputation::Success,
401                NodeContact::Failure(_) => NodeReputation::Failure,
402                NodeContact::Demoted(_) => NodeReputation::Demoted,
403                //_ => panic!("Unknown contact information!"),
404            }
405        } else {
406            NodeReputation::Unknown
407        }
408    }
409
410    fn load_from_file(&mut self) {
411        let path = match self.path {
412            Some(ref path) => path,
413            None => return,
414        };
415
416        let file = match fs::File::open(path) {
417            Ok(file) => file,
418            Err(e) => {
419                debug!("node table file not found: {:?}", e);
420                return;
421            }
422        };
423        let res: Result<json::NodeTable, _> = serde_json::from_reader(file);
424        match res {
425            Ok(table) => {
426                for n in table.nodes {
427                    let node = n.into_node();
428                    if let Some(node) = node {
429                        if !self.node_index.contains_key(&node.id) {
430                            let node_rep =
431                                Self::node_reputation(&node.last_contact);
432                            self.add_to_reputation_level(node_rep, node);
433                        } else {
434                            warn!("There exist multiple entries for same node id: {:?}", node.id);
435                        }
436                    }
437                }
438            }
439            Err(e) => {
440                warn!("Error reading node table file: {:?}", e);
441            }
442        }
443    }
444
445    pub fn sample_nodes(
446        &self, count: u32, _filter: &IpFilter,
447    ) -> Vec<NodeEntry> {
448        let mut nodes: Vec<NodeEntry> = Vec::new();
449        for _i in 0..count {
450            let mut rng = rand::rng();
451            let node_rep_idx = rng.random_range(0..NODE_REPUTATION_LEVEL_COUNT);
452            let node_rep = NodeReputation::iter().nth(node_rep_idx).unwrap();
453            let node_rep_vec = &self.node_reputation_table[node_rep];
454            if !node_rep_vec.is_empty() {
455                let idx = rng.random_range(0..node_rep_vec.len());
456                let n = &node_rep_vec[idx];
457                nodes.push(NodeEntry {
458                    id: n.id,
459                    endpoint: n.endpoint.clone(),
460                });
461            }
462        }
463        let mut unique_nodes: Vec<NodeEntry> = Vec::new();
464        let mut nodes_set: HashSet<NodeId> = HashSet::new();
465        for n in nodes {
466            if !nodes_set.contains(&n.id) {
467                nodes_set.insert(n.id);
468                unique_nodes.push(n);
469            }
470        }
471        unique_nodes
472    }
473
474    /// Return a random sample set of nodes inside the table
475    pub fn sample_node_ids(
476        &self, count: u32, _filter: &IpFilter,
477    ) -> HashSet<NodeId> {
478        let mut node_id_set: HashSet<NodeId> = HashSet::new();
479        let mut rng = rand::rng();
480        for _i in 0..count {
481            let node_rep_idx = rng.random_range(0..NODE_REPUTATION_LEVEL_COUNT);
482            let node_rep = NodeReputation::iter().nth(node_rep_idx).unwrap();
483            let node_rep_vec = &self.node_reputation_table[node_rep];
484            if !node_rep_vec.is_empty() {
485                let idx = rng.random_range(0..node_rep_vec.len());
486                let n = &node_rep_vec[idx];
487                if !node_id_set.contains(&n.id) {
488                    node_id_set.insert(n.id);
489                }
490            }
491        }
492
493        node_id_set
494    }
495
496    // If node exists, update last contact, insert otherwise.
497    // Endpoint will be updated if node exists.
498    pub fn update_last_contact(&mut self, node: Node) {
499        let mut _index = NodeReputationIndex::default();
500        let mut exist = false;
501        if let Some(index) = self.node_index.get_mut(&node.id) {
502            _index = *index;
503            exist = true;
504        }
505
506        let target_node_rep = Self::node_reputation(&node.last_contact);
507
508        if !exist {
509            self.add_to_reputation_level(target_node_rep, node);
510            return;
511        }
512
513        // check whether the node position will change
514        if target_node_rep == _index.0 {
515            let old_node = &mut self.node_reputation_table[_index.0][_index.1];
516            old_node.last_contact = node.last_contact;
517            old_node.endpoint = node.endpoint;
518        } else {
519            let mut removed_node =
520                self.remove_from_reputation_level(&_index).unwrap();
521            removed_node.last_contact = node.last_contact;
522            removed_node.endpoint = node.endpoint;
523            self.add_to_reputation_level(target_node_rep, removed_node);
524        }
525    }
526
527    // This function does not preserve runtime connection information
528    pub fn add_node(&mut self, mut node: Node, preserve_last_contact: bool) {
529        debug!("NodeTable {:?} add_node {:?}", self.path, node);
530        let mut _index = NodeReputationIndex::default();
531        let mut exist = false;
532        if let Some(index) = self.node_index.get_mut(&node.id) {
533            _index = *index;
534            exist = true;
535        }
536
537        if !exist {
538            let target_node_rep = Self::node_reputation(&node.last_contact);
539            self.add_to_reputation_level(target_node_rep, node);
540            return;
541        }
542
543        if preserve_last_contact {
544            let node_vec = &mut self.node_reputation_table[_index.0];
545            node.last_contact = node_vec[_index.1].last_contact;
546            node_vec[_index.1] = node;
547        } else {
548            let target_node_rep = Self::node_reputation(&node.last_contact);
549            // check whether the node position will change
550            if target_node_rep == _index.0 {
551                self.node_reputation_table[_index.0][_index.1] = node;
552            } else {
553                self.remove_from_reputation_level(&_index);
554                self.add_to_reputation_level(target_node_rep, node);
555            }
556        }
557    }
558
559    fn is_reputation_level_demoted(&self, index: &NodeReputationIndex) -> bool {
560        index.0 == NodeReputation::Demoted
561    }
562
563    fn remove_from_reputation_level(
564        &mut self, index: &NodeReputationIndex,
565    ) -> Option<Node> {
566        let node_rep_vec = &mut self.node_reputation_table[index.0];
567
568        if node_rep_vec.is_empty() || index.1 >= node_rep_vec.len() {
569            return None;
570        }
571
572        if node_rep_vec.len() - 1 == index.1 {
573            // to remove the last item
574            let node_id = node_rep_vec[node_rep_vec.len() - 1].id;
575            self.node_index.remove(&node_id);
576            return node_rep_vec.pop();
577        }
578
579        let tail_node = node_rep_vec.pop();
580        if let Some(tail_node) = tail_node {
581            let removed_node = node_rep_vec[index.1].clone();
582            self.node_index.remove(&removed_node.id);
583            if let Some(node_idx) = self.node_index.get_mut(&tail_node.id) {
584                node_rep_vec[index.1] = tail_node;
585                *node_idx = *index;
586                Some(removed_node)
587            } else {
588                panic!("Should not happen!");
589            }
590        } else {
591            panic!("Should not happen!");
592        }
593    }
594
595    fn add_to_reputation_level(
596        &mut self, node_rep: NodeReputation, node: Node,
597    ) {
598        let node_idx = self.node_reputation_table[node_rep].len();
599        let node_table_idx = NodeReputationIndex(node_rep, node_idx);
600        self.node_index.insert(node.id, node_table_idx);
601        self.node_reputation_table[node_rep].push(node);
602    }
603
604    /// Returns a list of ordered nodes according to their most recent contact
605    /// and filtering useless nodes. The algorithm for creating the sorted nodes
606    /// is:
607    /// - Contacts that aren't recent (older than 1 week) are discarded
608    /// - (1) Nodes with a successful contact are ordered (most recent success
609    ///   first)
610    /// - (2) Nodes with unknown contact (older than 1 week or new nodes) are
611    ///   randomly shuffled
612    /// - (3) Nodes with a failed contact are ordered (oldest failure first)
613    /// - The final result is the concatenation of (1), (2) and (3)
614    fn ordered_entries(&self) -> Vec<&Node> {
615        let mut success = Vec::new();
616        let mut failures = Vec::new();
617        let mut unknown = Vec::new();
618
619        for n in self.node_reputation_table[NodeReputation::Success].iter() {
620            if !self.useless_nodes.contains(&n.id) {
621                success.push(n);
622            }
623        }
624
625        for n in self.node_reputation_table[NodeReputation::Failure].iter() {
626            if !self.useless_nodes.contains(&n.id) {
627                failures.push(n);
628            }
629        }
630
631        for n in self.node_reputation_table[NodeReputation::Unknown].iter() {
632            if !self.useless_nodes.contains(&n.id) {
633                unknown.push(n);
634            }
635        }
636
637        success.sort_by(|a, b| {
638            let a = a.last_contact.expect(
639                "vector only contains values with defined last_contact; qed",
640            );
641            let b = b.last_contact.expect(
642                "vector only contains values with defined last_contact; qed",
643            );
644            // inverse ordering, most recent successes come first
645            b.time().cmp(&a.time())
646        });
647
648        failures.sort_by(|a, b| {
649            let a = a.last_contact.expect(
650                "vector only contains values with defined last_contact; qed",
651            );
652            let b = b.last_contact.expect(
653                "vector only contains values with defined last_contact; qed",
654            );
655            // normal ordering, most distant failures come first
656            a.time().cmp(&b.time())
657        });
658
659        unknown.shuffle(&mut rand::rng());
660
661        success.append(&mut unknown);
662        success.append(&mut failures);
663        success
664    }
665
666    /// Returns node ids sorted by failure percentage, for nodes with the same
667    /// failure percentage the absolute number of failures is considered.
668    pub fn nodes(&self, filter: &IpFilter) -> Vec<NodeId> {
669        self.ordered_entries()
670            .iter()
671            .filter(|n| n.endpoint.is_allowed(filter))
672            .map(|n| n.id)
673            .collect()
674    }
675
676    pub fn entries_with_filter(&self, filter: &IpFilter) -> Vec<NodeEntry> {
677        self.ordered_entries()
678            .iter()
679            .filter(|n| n.endpoint.is_allowed(filter))
680            .map(|n| NodeEntry {
681                endpoint: n.endpoint.clone(),
682                id: n.id,
683            })
684            .collect()
685    }
686
687    /// Ordered list of all entries by failure percentage, for nodes with the
688    /// same failure percentage the absolute number of failures is
689    /// considered.
690    pub fn entries(&self) -> Vec<NodeEntry> {
691        self.ordered_entries()
692            .iter()
693            .map(|n| NodeEntry {
694                endpoint: n.endpoint.clone(),
695                id: n.id,
696            })
697            .collect()
698    }
699
700    /// Get particular node
701    pub fn get_mut(&mut self, id: &NodeId) -> Option<&mut Node> {
702        let index = self.node_index.get(id);
703        if let Some(index) = index {
704            Some(&mut self.node_reputation_table[index.0][index.1])
705        } else {
706            None
707        }
708    }
709
710    /// Get particular node
711    pub fn get(&self, id: &NodeId) -> Option<&Node> {
712        let index = self.node_index.get(id);
713        if let Some(index) = index {
714            Some(&self.node_reputation_table[index.0][index.1])
715        } else {
716            None
717        }
718    }
719
720    /// Check if a node exists in the table.
721    pub fn contains(&self, id: &NodeId) -> bool {
722        self.node_index.contains_key(id)
723    }
724
725    pub fn remove_with_id(&mut self, id: &NodeId) -> Option<Node> {
726        let mut _index;
727        if let Some(index) = self.node_index.get(id) {
728            _index = *index;
729        } else {
730            return None;
731        }
732
733        self.remove_from_reputation_level(&_index)
734    }
735
736    /// Set last contact as failure or demoted for a node
737    pub fn note_unsuccess_contact(
738        &mut self, id: &NodeId, by_connection: bool,
739        last_contact: Option<NodeContact>,
740    ) {
741        let mut _index;
742        if let Some(index) = self.node_index.get(id) {
743            _index = *index;
744        } else {
745            return;
746        }
747
748        let target_node_rep = Self::node_reputation(&last_contact);
749        if target_node_rep == _index.0 {
750            let node = &mut self.node_reputation_table[_index.0][_index.1];
751            node.last_contact = last_contact;
752            if by_connection {
753                node.last_connected = last_contact;
754            }
755        } else if self.is_reputation_level_demoted(&_index) {
756            // Only update node.last_connected
757            if by_connection {
758                let node = &mut self.node_reputation_table[_index.0][_index.1];
759                node.last_connected = last_contact;
760            }
761        } else if let Some(mut node) =
762            self.remove_from_reputation_level(&_index)
763        {
764            node.last_contact = last_contact;
765            if by_connection {
766                node.last_connected = last_contact;
767            }
768            self.add_to_reputation_level(target_node_rep, node);
769        } else {
770            panic!("Should not happen!");
771        }
772    }
773
774    /// Set last contact as success for a node
775    pub fn note_success(
776        &mut self, id: &NodeId, by_connection: bool, token: Option<StreamToken>,
777    ) {
778        let mut _index;
779        if let Some(index) = self.node_index.get(id) {
780            _index = *index;
781        } else {
782            return;
783        }
784
785        let target_node_rep = NodeReputation::Success;
786        if target_node_rep == _index.0 {
787            let node = &mut self.node_reputation_table[_index.0][_index.1];
788            node.last_contact = Some(NodeContact::success());
789            if by_connection {
790                node.last_connected = Some(NodeContact::success());
791                if token.is_some() {
792                    node.stream_token = token;
793                }
794            }
795        } else if self.is_reputation_level_demoted(&_index) {
796            // Only update node.last_connected
797            if by_connection {
798                let node = &mut self.node_reputation_table[_index.0][_index.1];
799                node.last_connected = Some(NodeContact::success());
800                if token.is_some() {
801                    node.stream_token = token;
802                }
803            }
804        } else if let Some(mut node) =
805            self.remove_from_reputation_level(&_index)
806        {
807            node.last_contact = Some(NodeContact::success());
808            if by_connection {
809                node.last_connected = Some(NodeContact::success());
810                if token.is_some() {
811                    node.stream_token = token;
812                }
813            }
814            self.add_to_reputation_level(target_node_rep, node);
815        } else {
816            panic!("Should not happen!");
817        }
818    }
819
820    /// Mark as useless, no further attempts to connect until next call to
821    /// `clear_useless`.
822    pub fn mark_as_useless(&mut self, id: &NodeId) {
823        self.useless_nodes.insert(*id);
824    }
825
826    /// Attempt to connect to useless nodes again.
827    pub fn clear_useless(&mut self) { self.useless_nodes.clear(); }
828
829    /// Save the (un)trusted_nodes.json file.
830    pub fn save(&self) {
831        let path = match self.path {
832            Some(ref path) => Path::new(path),
833            None => return,
834        };
835
836        if let Some(dir) = path.parent() {
837            if let Err(e) = fs::create_dir_all(dir) {
838                warn!("Error creating node table directory: {:?}", e);
839                return;
840            }
841        }
842
843        let node_ids = self.nodes(&IpFilter::default());
844        let nodes = node_ids
845            .into_iter()
846            .map(|id| {
847                let index = &self.node_index[&id];
848                &self.node_reputation_table[index.0][index.1]
849            })
850            .take(MAX_NODES)
851            .map(Into::into)
852            .collect();
853        let table = json::NodeTable { nodes };
854
855        match fs::File::create(path) {
856            Ok(file) => {
857                if let Err(e) = serde_json::to_writer_pretty(file, &table) {
858                    warn!("Error writing node table file: {:?}", e);
859                }
860            }
861            Err(e) => {
862                warn!("Error creating node table file: {:?}", e);
863            }
864        }
865    }
866
867    pub fn all(&self) -> Vec<NodeId> {
868        self.node_index.keys().copied().collect()
869    }
870}
871
872impl Drop for NodeTable {
873    fn drop(&mut self) { self.save(); }
874}
875
876/// Check if node url is valid
877pub fn validate_node_url(url: &str) -> Option<Error> {
878    Node::from_str(url).err()
879}
880
881mod json {
882    use super::*;
883
884    #[derive(Serialize, Deserialize)]
885    pub struct NodeTable {
886        pub nodes: Vec<Node>,
887    }
888
889    #[derive(Serialize, Deserialize)]
890    pub enum NodeContact {
891        #[serde(rename = "success")]
892        Success(u64),
893        #[serde(rename = "failure")]
894        Failure(u64),
895        #[serde(rename = "demoted")]
896        Demoted(u64),
897    }
898
899    impl NodeContact {
900        pub fn into_node_contact(self) -> super::NodeContact {
901            match self {
902                NodeContact::Success(s) => super::NodeContact::Success(
903                    time::UNIX_EPOCH + Duration::from_secs(s),
904                ),
905                NodeContact::Failure(s) => super::NodeContact::Failure(
906                    time::UNIX_EPOCH + Duration::from_secs(s),
907                ),
908                NodeContact::Demoted(s) => super::NodeContact::Demoted(
909                    time::UNIX_EPOCH + Duration::from_secs(s),
910                ),
911            }
912        }
913    }
914
915    #[derive(Serialize, Deserialize)]
916    pub struct Node {
917        pub url: String,
918        pub last_contact: Option<NodeContact>,
919        pub tags: HashMap<String, String>,
920    }
921
922    impl Node {
923        pub fn into_node(self) -> Option<super::Node> {
924            match super::Node::from_str(&self.url) {
925                Ok(mut node) => {
926                    node.last_contact =
927                        self.last_contact.map(NodeContact::into_node_contact);
928                    node.tags = self.tags;
929                    Some(node)
930                }
931                _ => None,
932            }
933        }
934    }
935
936    impl<'a> From<&'a super::Node> for Node {
937        fn from(node: &'a super::Node) -> Self {
938            let last_contact = node.last_contact.and_then(|c| match c {
939                super::NodeContact::Success(t) => t
940                    .duration_since(time::UNIX_EPOCH)
941                    .ok()
942                    .map(|d| NodeContact::Success(d.as_secs())),
943                super::NodeContact::Failure(t) => t
944                    .duration_since(time::UNIX_EPOCH)
945                    .ok()
946                    .map(|d| NodeContact::Failure(d.as_secs())),
947                super::NodeContact::Demoted(t) => t
948                    .duration_since(time::UNIX_EPOCH)
949                    .ok()
950                    .map(|d| NodeContact::Demoted(d.as_secs())),
951            });
952
953            Node {
954                url: format!("{}", node),
955                last_contact,
956                tags: node.tags.clone(),
957            }
958        }
959    }
960}