network/
lib.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5#![allow(deprecated)]
6use cfxkey as keylib;
7use io as iolib;
8use keccak_hash as hash;
9
/// Number of bytes in a [`ProtocolId`].
pub const PROTOCOL_ID_SIZE: usize = 3;
/// Fixed-length byte string identifying a protocol.
pub type ProtocolId = [u8; PROTOCOL_ID_SIZE];
/// Tag describing a kind of work; the value type accepted by
/// `NetworkContext::dispatch_work` and
/// `NetworkProtocolHandler::on_work_dispatch`.
pub type HandlerWorkType = u8;
/// Numeric identifier of a peer, as carried by `PeerInfo::id` and
/// `NetworkIoMessage::HandleProtocolMessage::peer`.
pub type PeerId = usize;
14
15mod connection;
16mod discovery;
17mod error;
18mod handshake;
19mod ip;
20mod ip_utils;
21mod node_database;
22pub mod node_table;
23pub mod service;
24mod session;
25mod session_manager;
26pub mod throttling;
27
28pub use crate::{
29    error::{DisconnectReason, Error, ThrottlingReason},
30    ip::SessionIpLimitConfig,
31    node_table::Node,
32    service::NetworkService,
33    session::SessionDetails,
34};
35pub use io::TimerToken;
36
37use crate::{
38    node_table::NodeId,
39    service::{
40        ProtocolVersion, DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION,
41        DEFAULT_DISCOVERY_REFRESH_TIMEOUT, DEFAULT_DISCOVERY_ROUND_TIMEOUT,
42        DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT, DEFAULT_HOUSEKEEPING_TIMEOUT,
43        DEFAULT_NODE_TABLE_TIMEOUT,
44    },
45};
46use cfx_addr::Network;
47use diem_types::validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey};
48use ipnetwork::{IpNetwork, IpNetworkError};
49use keylib::Secret;
50use priority_send_queue::SendQueuePriority;
51use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
52use serde_derive::{Deserialize, Serialize};
53use std::{
54    cmp::Ordering,
55    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
56    str::{self, FromStr},
57    sync::Arc,
58    time::{Duration, SystemTime, UNIX_EPOCH},
59};
60
// NOTE(review): these strings are presumably the key/value arguments passed to
// `NetworkContext::insert_peer_node_tag` -- confirm against the callers.

/// Tag key under which a node's type is recorded.
pub const NODE_TAG_NODE_TYPE: &str = "node_type";
/// Node-type tag value for an archive node.
pub const NODE_TAG_ARCHIVE: &str = "archive";
/// Node-type tag value for a full node.
pub const NODE_TAG_FULL: &str = "full";
64
/// Settings for the P2P network layer.
///
/// Create one with `NetworkConfiguration::new` (or `new_with_port`) and then
/// override the public fields as needed.
#[derive(Debug, Clone, PartialEq)]
pub struct NetworkConfiguration {
    /// NOTE(review): presumably marks a consortium deployment -- confirm
    /// against the code that reads this flag. `new` sets it to `false`.
    pub is_consortium: bool,
    /// Network identifier
    pub id: u64,
    /// Network type computed from `id` by `network_id_to_known_cfx_network`
    /// when the configuration is built by `new`. It is private and is not
    /// recomputed if `id` is changed afterwards; read it through
    /// `get_network_type`.
    network_type: Network,
    /// Directory path to store general network configuration. None means
    /// nothing will be saved
    pub config_path: Option<String>,
    /// Socket address to listen on. `new` leaves it unset; `new_with_port`
    /// sets it to `0.0.0.0:<port>`.
    pub listen_address: Option<SocketAddr>,
    /// IP address to advertise. Detected automatically if none.
    pub public_address: Option<SocketAddr>,
    /// UDP port. NOTE(review): presumably the port used by the discovery
    /// protocol -- confirm in the service code.
    pub udp_port: Option<u16>,
    /// Enable NAT configuration
    pub nat_enabled: bool,
    /// Enable discovery
    pub discovery_enabled: bool,
    /// Boot node addresses, kept as unparsed strings.
    pub boot_nodes: Vec<String>,
    /// Use provided node key instead of default
    pub use_secret: Option<Secret>,
    /// Maximum number of outgoing peers
    pub max_outgoing_peers: usize,
    /// Maximum number of outgoing connections to archive nodes. 0 means
    /// connecting to archive nodes is not required, e.g. a light node or a
    /// full node need not connect to archive nodes.
    pub max_outgoing_peers_archive: usize,
    /// Maximum number of incoming peers
    pub max_incoming_peers: usize,
    /// Maximum number of ongoing handshakes
    pub max_handshakes: usize,
    /// List of reserved node addresses.
    pub reserved_nodes: Vec<String>,
    /// IP filter
    pub ip_filter: IpFilter,
    /// Timeout duration for initiating peer connection management
    pub housekeeping_timeout: Duration,
    /// Timeout duration for refreshing discovery protocol
    /// when there are enough outgoing connections
    pub discovery_refresh_timeout: Duration,
    /// Timeout duration for refreshing discovery protocol
    /// when there are NOT enough outgoing connections
    pub fast_discovery_refresh_timeout: Duration,
    /// Period between consecutive rounds of the same current discovery process
    pub discovery_round_timeout: Duration,
    /// Timeout duration for persisting node table
    pub node_table_timeout: Duration,
    /// Connection lifetime threshold for promotion
    pub connection_lifetime_for_promotion: Duration,
    /// NOTE(review): presumably relaxes checks for test deployments --
    /// confirm against the code that reads this flag. `new` sets it to
    /// `false`.
    pub test_mode: bool,
    /// Maximum number of P2P nodes for subnet B (ip/16).
    pub subnet_quota: usize,
    /// Per-IP session limits; `new` uses `SessionIpLimitConfig::default()`.
    pub session_ip_limit_config: SessionIpLimitConfig,

    /// Discovery protocol parameters, supplied by the caller of `new`.
    pub discovery_config: DiscoveryConfiguration,
}
120
121impl NetworkConfiguration {
122    pub fn new(id: u64, discovery_config: DiscoveryConfiguration) -> Self {
123        let network_type = Self::network_id_to_known_cfx_network(id);
124
125        NetworkConfiguration {
126            is_consortium: false,
127            id,
128            network_type,
129            config_path: Some("./net_config".to_string()),
130            listen_address: None,
131            public_address: None,
132            udp_port: None,
133            nat_enabled: true,
134            discovery_enabled: false,
135            boot_nodes: Vec::new(),
136            use_secret: None,
137            max_outgoing_peers: 0,
138            max_outgoing_peers_archive: 0,
139            max_incoming_peers: 0,
140            max_handshakes: 0,
141            reserved_nodes: Vec::new(),
142            ip_filter: IpFilter::default(),
143            housekeeping_timeout: DEFAULT_HOUSEKEEPING_TIMEOUT,
144            discovery_refresh_timeout: DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
145            fast_discovery_refresh_timeout:
146                DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT,
147            discovery_round_timeout: DEFAULT_DISCOVERY_ROUND_TIMEOUT,
148            node_table_timeout: DEFAULT_NODE_TABLE_TIMEOUT,
149            connection_lifetime_for_promotion:
150                DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION,
151            test_mode: false,
152            subnet_quota: 32,
153            session_ip_limit_config: SessionIpLimitConfig::default(),
154            discovery_config,
155        }
156    }
157
158    pub fn new_with_port(
159        id: u64, port: u16, discovery_config: DiscoveryConfiguration,
160    ) -> NetworkConfiguration {
161        let mut config = NetworkConfiguration::new(id, discovery_config);
162        config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(
163            Ipv4Addr::new(0, 0, 0, 0),
164            port,
165        )));
166        config
167    }
168
169    pub fn get_network_type(&self) -> &Network { &self.network_type }
170
171    pub fn network_id_to_known_cfx_network(id: u64) -> Network {
172        match id {
173            1 => Network::Test,
174            1029 => Network::Main,
175            n => Network::Id(n),
176        }
177    }
178}
179
/// Parameters of the node discovery protocol.
///
/// NOTE(review): apart from `expire_time`, the fields are not read in this
/// file; the per-field notes below are inferred from the names and should be
/// confirmed against the discovery module.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct DiscoveryConfiguration {
    /// Presumably the number of nodes to look for per discovery round.
    pub discover_node_count: u32,
    /// Lifetime added to the current time by `expire_timestamp`.
    pub expire_time: Duration,
    /// Presumably how long to wait for a find-node reply.
    pub find_node_timeout: Duration,
    /// Presumably the maximum number of nodes pinged at once.
    pub max_nodes_ping: usize,
    /// Presumably how long to wait for a ping reply.
    pub ping_timeout: Duration,
    /// Presumably the length of one throttling window.
    pub throttling_interval: Duration,
    /// Presumably the number of pings accepted per throttling window.
    pub throttling_limit_ping: usize,
    /// Presumably the number of find-node requests accepted per throttling
    /// window.
    pub throttling_limit_find_nodes: usize,
}

impl DiscoveryConfiguration {
    /// Returns the UNIX timestamp (whole seconds) that lies `expire_time`
    /// after the current system time.
    ///
    /// This never panics:
    /// - if `now + expire_time` cannot be represented by `SystemTime`, the
    ///   result saturates to `u64::MAX`;
    /// - if the resulting time is before the UNIX epoch, the result is 0.
    fn expire_timestamp(&self) -> u64 {
        // `SystemTime + Duration` panics on overflow, so use the checked form
        // and saturate instead.
        match SystemTime::now().checked_add(self.expire_time) {
            Some(deadline) => deadline
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs(),
            None => u64::MAX,
        }
    }
}
200
/// Type of NAT resolving method
///
/// NOTE(review): the variants are not used in this file; the per-variant
/// notes are inferred from the names -- confirm against the NAT code.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum NatType {
    /// Presumably: do not attempt NAT resolution.
    Nothing,
    /// Presumably: use whichever method is available.
    Any,
    /// Presumably: Universal Plug and Play.
    UPnP,
    /// Presumably: NAT Port Mapping Protocol.
    NatPMP,
}
209
/// Messages understood by the network service's IO handler.
///
/// NOTE(review): the handling of each variant lives outside this file; the
/// descriptions below are based on the variant and field names only.
#[derive(Clone)]
pub enum NetworkIoMessage {
    /// Presumably starts the network service.
    Start,
    /// Registers a protocol handler.
    AddHandler {
        /// Handler to register.
        handler: Arc<dyn NetworkProtocolHandler + Sync>,
        /// Protocol Id the handler is registered under.
        protocol: ProtocolId,
        /// Protocol version of the handler.
        version: ProtocolVersion,
        /// Channel for a unit signal; presumably sent once the handler has
        /// been registered -- confirm in the service.
        callback: std::sync::mpsc::SyncSender<()>,
    },
    /// Register a new protocol timer
    AddTimer {
        /// Protocol Id.
        protocol: ProtocolId,
        /// Timer token.
        token: TimerToken,
        /// Timer delay.
        delay: Duration,
    },
    /// Dispatches a unit of work to a protocol.
    DispatchWork {
        /// Protocol Id.
        protocol: ProtocolId,
        /// Work type.
        work_type: HandlerWorkType,
    },
    /// Delivers a protocol message for handling.
    HandleProtocolMessage {
        /// Protocol Id.
        protocol: ProtocolId,
        /// Numeric id of the peer.
        peer: PeerId,
        /// Node id of the peer.
        node_id: NodeId,
        /// Raw message bytes.
        data: Vec<u8>,
    },
}
241
/// Callbacks implemented by a protocol that runs on top of the network layer.
///
/// Implementations must be `Sync + Send`. Every method receives the
/// `NetworkContext` through which the handler can talk back to the network
/// layer.
///
/// NOTE(review): when each callback is invoked is decided by the service code,
/// which is not part of this file; the descriptions follow the method names
/// and signatures.
pub trait NetworkProtocolHandler: Sync + Send {
    /// Returns the minimum protocol version this handler supports.
    fn minimum_supported_version(&self) -> ProtocolVersion;

    /// Initializes the handler.
    fn initialize(&self, _io: &dyn NetworkContext);

    /// Handles the raw message `data` associated with peer `node_id`.
    fn on_message(
        &self, io: &dyn NetworkContext, node_id: &NodeId, data: &[u8],
    );

    /// Notifies the handler about a connected peer.
    ///
    /// `peer_protocol_version` is the peer's version of this protocol.
    /// `pos_public_key` optionally carries the peer's consensus public key
    /// and consensus VRF public key.
    fn on_peer_connected(
        &self, io: &dyn NetworkContext, node_id: &NodeId,
        peer_protocol_version: ProtocolVersion,
        pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
    );

    /// Notifies the handler that peer `node_id` has disconnected.
    fn on_peer_disconnected(&self, io: &dyn NetworkContext, node_id: &NodeId);

    /// Notifies the handler about the timer identified by `timer`.
    fn on_timeout(&self, io: &dyn NetworkContext, timer: TimerToken);

    /// Passes a locally produced raw message to the handler.
    fn send_local_message(&self, _io: &dyn NetworkContext, _message: Vec<u8>);

    /// Runs dispatched work of the given type.
    fn on_work_dispatch(
        &self, _io: &dyn NetworkContext, _work_type: HandlerWorkType,
    );
}
267
/// Operation to apply to a node's record; accepted as an optional argument by
/// `NetworkContext::disconnect_peer`.
///
/// NOTE(review): the effect of each variant is implemented outside this file;
/// the notes below are inferred from the names.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum UpdateNodeOperation {
    /// Presumably records a failure for the node.
    Failure,
    /// Presumably demotes the node.
    Demotion,
    /// Presumably removes the node.
    Remove,
}
274
/// Interface through which a protocol handler interacts with the network
/// layer; passed to every `NetworkProtocolHandler` callback.
///
/// NOTE(review): the implementation is outside this file; the descriptions
/// follow the method names and signatures.
pub trait NetworkContext {
    /// Returns the id of the protocol this context belongs to.
    fn get_protocol(&self) -> ProtocolId;

    /// Returns the connection origin flag for peer `node_id`.
    ///
    /// NOTE(review): presumably `None` means the peer is unknown and the
    /// boolean mirrors `SessionMetadata::originated` -- confirm in the
    /// implementation.
    fn get_peer_connection_origin(&self, node_id: &NodeId) -> Option<bool>;

    /// Sends the raw message `msg` to peer `node_id` with the given queue
    /// priority.
    ///
    /// NOTE(review): `min_protocol_version` and `version_valid_till`
    /// presumably bound the peer protocol versions the message applies to --
    /// confirm in the implementation.
    fn send(
        &self, node_id: &NodeId, msg: Vec<u8>,
        min_protocol_version: ProtocolVersion,
        version_valid_till: ProtocolVersion, priority: SendQueuePriority,
    ) -> Result<(), Error>;

    /// Disconnects peer `node_id`, optionally applying `op` to its node
    /// record; `reason` is a free-form description.
    fn disconnect_peer(
        &self, node_id: &NodeId, op: Option<UpdateNodeOperation>, reason: &str,
    );

    /// Register a new IO timer. 'IoHandler::timeout' will be called with the
    /// token.
    fn register_timer(
        &self, token: TimerToken, delay: Duration,
    ) -> Result<(), Error>;

    /// Dispatches work of the given type.
    fn dispatch_work(&self, work_type: HandlerWorkType);

    /// Attaches the tag `key` = `value` to peer `peer`.
    fn insert_peer_node_tag(&self, peer: NodeId, key: &str, value: &str);

    /// Returns whether `_node_id` identifies the local node.
    fn is_peer_self(&self, _node_id: &NodeId) -> bool;

    /// Returns the node id of the local node.
    fn self_node_id(&self) -> NodeId;
}
304
/// Metadata describing a session with a peer.
#[derive(Debug, Clone)]
pub struct SessionMetadata {
    /// Node id of the peer, if there is one.
    /// NOTE(review): presumably `None` until the peer has been identified --
    /// confirm in the session code.
    pub id: Option<NodeId>,
    /// There won't be many protocols so it's faster to use Vec than Map.
    pub peer_protocols: Vec<ProtocolInfo>,
    /// NOTE(review): presumably `true` when the local node initiated the
    /// connection -- confirm in the session code.
    pub originated: bool,
    /// Packet header version of the peer.
    pub peer_header_version: u8,
}
314
/// A protocol identifier together with a protocol version.
///
/// The derived equality compares both fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProtocolInfo {
    /// Protocol identifier.
    pub protocol: ProtocolId,
    /// Protocol version.
    pub version: ProtocolVersion,
}
320
321impl Encodable for ProtocolInfo {
322    fn rlp_append(&self, rlp: &mut RlpStream) {
323        rlp.begin_list(2);
324        rlp.append(&&self.protocol[..]);
325        rlp.append(&self.version);
326    }
327}
328
329impl Decodable for ProtocolInfo {
330    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
331        let p: Vec<u8> = rlp.val_at(0)?;
332        if p.len() != 3 {
333            return Err(DecoderError::Custom(
334                "Invalid subprotocol string length",
335            ));
336        }
337        let mut protocol: ProtocolId = [0u8; 3];
338        protocol.clone_from_slice(&p);
339        Ok(ProtocolInfo {
340            protocol,
341            version: rlp.val_at(1)?,
342        })
343    }
344}
345
346impl PartialOrd for ProtocolInfo {
347    fn partial_cmp(&self, other: &ProtocolInfo) -> Option<Ordering> {
348        Some(self.cmp(other))
349    }
350}
351
352impl Ord for ProtocolInfo {
353    fn cmp(&self, other: &ProtocolInfo) -> Ordering {
354        self.protocol.cmp(&other.protocol)
355    }
356}
357
/// Serializable description of a peer.
#[derive(Serialize, Deserialize)]
pub struct PeerInfo {
    /// Numeric peer id.
    pub id: PeerId,
    /// Socket address of the peer.
    pub addr: SocketAddr,
    /// Node id of the peer.
    pub nodeid: NodeId,
    /// Protocols (id and version) associated with the peer.
    pub protocols: Vec<ProtocolInfo>,
}
365
/// IP address filter: a predefined policy plus custom allow and block lists.
///
/// NOTE(review): how the three parts are combined when checking an address is
/// implemented outside this file.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IpFilter {
    /// Predefined policy; `IpFilter::parse` sets it from the keywords
    /// `all`, `private`, `public` and `none`.
    pub predefined: AllowIP,
    /// Custom networks; `IpFilter::parse` adds tokens that have no leading
    /// `-` here.
    pub custom_allow: Vec<IpNetwork>,
    /// Custom networks; `IpFilter::parse` adds tokens that have a leading
    /// `-` here.
    pub custom_block: Vec<IpNetwork>,
}
372
373impl Default for IpFilter {
374    fn default() -> Self {
375        IpFilter {
376            predefined: AllowIP::All,
377            custom_allow: vec![],
378            custom_block: vec![],
379        }
380    }
381}
382
383impl IpFilter {
384    /// Attempt to parse the peer mode from a string.
385    pub fn parse(s: &str) -> Result<IpFilter, IpNetworkError> {
386        let mut filter = IpFilter::default();
387        for f in s.split_whitespace() {
388            match f {
389                "all" => filter.predefined = AllowIP::All,
390                "private" => filter.predefined = AllowIP::Private,
391                "public" => filter.predefined = AllowIP::Public,
392                "none" => filter.predefined = AllowIP::None,
393                custom => {
394                    if custom.starts_with('-') {
395                        filter.custom_block.push(IpNetwork::from_str(
396                            &custom.to_owned().split_off(1),
397                        )?)
398                    } else {
399                        filter.custom_allow.push(IpNetwork::from_str(custom)?)
400                    }
401                }
402            }
403        }
404        Ok(filter)
405    }
406}
407
/// Predefined IP filter policy, stored in `IpFilter::predefined`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AllowIP {
    /// Connect to any address
    All,
    /// Connect to private network only
    Private,
    /// Connect to public network only
    Public,
    /// Block all addresses
    None,
}
420
/// Reads a message id stored in the trailing one or two bytes of `msg` and
/// shrinks `msg` so that it no longer contains those bytes.
///
/// The last byte holds the low 7 bits of the id. If its high bit (`0x80`) is
/// set, the byte before it supplies the next 7 bits; the high bit of that
/// second byte is ignored, so at most two bytes are consumed and the result
/// is at most `0x3fff`.
///
/// Truncated input is handled without panicking:
/// - empty `msg`: returns 0 and leaves `msg` unchanged;
/// - a single byte with the continuation bit set: returns its low 7 bits and
///   leaves `msg` empty.
///
/// Callers that need to reject such input must check the length themselves.
pub fn parse_msg_id_leb128_2_bytes_at_most(msg: &mut &[u8]) -> u16 {
    let buf = *msg;

    let (low, before_low) = match buf.split_last() {
        Some((&byte, rest)) => (u16::from(byte), rest),
        None => return 0,
    };

    let mut id = low & 0x7f;
    let mut remaining = before_low;

    // Continuation bit set: the preceding byte, if any, holds bits 7..14.
    if low & 0x80 != 0 {
        if let Some((&byte, rest)) = before_low.split_last() {
            id |= (u16::from(byte) & 0x7f) << 7;
            remaining = rest;
        }
    }

    *msg = remaining;

    id
}