network/
lib.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use cfxkey as keylib;
6use io as iolib;
7use keccak_hash as hash;
8
/// Length, in bytes, of a [`ProtocolId`].
pub const PROTOCOL_ID_SIZE: usize = 3;
/// Fixed-width byte identifier of a sub-protocol.
pub type ProtocolId = [u8; PROTOCOL_ID_SIZE];
/// Tag identifying a kind of handler work; carried by
/// `NetworkIoMessage::DispatchWork` and passed to
/// `NetworkProtocolHandler::on_work_dispatch`.
pub type HandlerWorkType = u8;
/// Numeric handle of a peer, as carried by `PeerInfo::id` and
/// `NetworkIoMessage::HandleProtocolMessage`.
pub type PeerId = usize;
13
14mod connection;
15mod discovery;
16mod error;
17mod handshake;
18mod ip;
19mod ip_utils;
20mod node_database;
21pub mod node_table;
22pub mod service;
23mod session;
24mod session_manager;
25pub mod throttling;
26
27pub use crate::{
28    error::{DisconnectReason, Error, ThrottlingReason},
29    ip::SessionIpLimitConfig,
30    node_table::Node,
31    service::NetworkService,
32    session::SessionDetails,
33};
34pub use io::TimerToken;
35
36use crate::{
37    node_table::NodeId,
38    service::{
39        ProtocolVersion, DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION,
40        DEFAULT_DISCOVERY_REFRESH_TIMEOUT, DEFAULT_DISCOVERY_ROUND_TIMEOUT,
41        DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT, DEFAULT_HOUSEKEEPING_TIMEOUT,
42        DEFAULT_NODE_TABLE_TIMEOUT,
43    },
44};
45use cfx_addr::Network;
46use diem_types::validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey};
47use ipnetwork::{IpNetwork, IpNetworkError};
48use keylib::Secret;
49use priority_send_queue::SendQueuePriority;
50use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
51use serde_derive::{Deserialize, Serialize};
52use std::{
53    cmp::Ordering,
54    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
55    str::{self, FromStr},
56    sync::Arc,
57    time::{Duration, SystemTime, UNIX_EPOCH},
58};
59
/// Node-tag key naming the type of a node.
/// NOTE(review): presumably used as the `key` argument of
/// `NetworkContext::insert_peer_node_tag` — confirm against callers.
pub const NODE_TAG_NODE_TYPE: &str = "node_type";
/// Node-tag value denoting an archive node.
pub const NODE_TAG_ARCHIVE: &str = "archive";
/// Node-tag value denoting a full node.
pub const NODE_TAG_FULL: &str = "full";
63
64#[derive(Debug, Clone, PartialEq)]
65pub struct NetworkConfiguration {
66    pub is_consortium: bool,
67    /// Network identifier
68    pub id: u64,
69    network_type: Network,
70    /// Directory path to store general network configuration. None means
71    /// nothing will be saved
72    pub config_path: Option<String>,
73    pub listen_address: Option<SocketAddr>,
74    /// IP address to advertise. Detected automatically if none.
75    pub public_address: Option<SocketAddr>,
76    pub udp_port: Option<u16>,
77    /// Enable NAT configuration
78    pub nat_enabled: bool,
79    /// Enable discovery
80    pub discovery_enabled: bool,
81    pub boot_nodes: Vec<String>,
82    /// Use provided node key instead of default
83    pub use_secret: Option<Secret>,
84    /// Maximum number of outgoing peers
85    pub max_outgoing_peers: usize,
86    /// Maximum number of outgoing connections to archive nodes. 0 represents
87    /// not required to connect to archive nodes. E.g. light node or full node
88    /// need not to connect to archive nodes.
89    pub max_outgoing_peers_archive: usize,
90    /// Maximum number of incoming peers
91    pub max_incoming_peers: usize,
92    /// Maximum number of ongoing handshakes
93    pub max_handshakes: usize,
94    /// List of reserved node addresses.
95    pub reserved_nodes: Vec<String>,
96    /// IP filter
97    pub ip_filter: IpFilter,
98    /// Timeout duration for initiating peer connection management
99    pub housekeeping_timeout: Duration,
100    /// Timeout duration for refreshing discovery protocol
101    /// when there are enough outgoing connections
102    pub discovery_refresh_timeout: Duration,
103    /// Timeout duration for refreshing discovery protocol
104    /// when there are NOT enough outgoing connections
105    pub fast_discovery_refresh_timeout: Duration,
106    /// Period between consecutive rounds of the same current discovery process
107    pub discovery_round_timeout: Duration,
108    /// Timeout duration for persisting node table
109    pub node_table_timeout: Duration,
110    /// Connection lifetime threshold for promotion
111    pub connection_lifetime_for_promotion: Duration,
112    pub test_mode: bool,
113    /// Maximum number of P2P nodes for subnet B (ip/16).
114    pub subnet_quota: usize,
115    pub session_ip_limit_config: SessionIpLimitConfig,
116
117    pub discovery_config: DiscoveryConfiguration,
118}
119
120impl NetworkConfiguration {
121    pub fn new(id: u64, discovery_config: DiscoveryConfiguration) -> Self {
122        let network_type = Self::network_id_to_known_cfx_network(id);
123
124        NetworkConfiguration {
125            is_consortium: false,
126            id,
127            network_type,
128            config_path: Some("./net_config".to_string()),
129            listen_address: None,
130            public_address: None,
131            udp_port: None,
132            nat_enabled: true,
133            discovery_enabled: false,
134            boot_nodes: Vec::new(),
135            use_secret: None,
136            max_outgoing_peers: 0,
137            max_outgoing_peers_archive: 0,
138            max_incoming_peers: 0,
139            max_handshakes: 0,
140            reserved_nodes: Vec::new(),
141            ip_filter: IpFilter::default(),
142            housekeeping_timeout: DEFAULT_HOUSEKEEPING_TIMEOUT,
143            discovery_refresh_timeout: DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
144            fast_discovery_refresh_timeout:
145                DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT,
146            discovery_round_timeout: DEFAULT_DISCOVERY_ROUND_TIMEOUT,
147            node_table_timeout: DEFAULT_NODE_TABLE_TIMEOUT,
148            connection_lifetime_for_promotion:
149                DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION,
150            test_mode: false,
151            subnet_quota: 32,
152            session_ip_limit_config: SessionIpLimitConfig::default(),
153            discovery_config,
154        }
155    }
156
157    pub fn new_with_port(
158        id: u64, port: u16, discovery_config: DiscoveryConfiguration,
159    ) -> NetworkConfiguration {
160        let mut config = NetworkConfiguration::new(id, discovery_config);
161        config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(
162            Ipv4Addr::new(0, 0, 0, 0),
163            port,
164        )));
165        config
166    }
167
168    pub fn get_network_type(&self) -> &Network { &self.network_type }
169
170    pub fn network_id_to_known_cfx_network(id: u64) -> Network {
171        match id {
172            1 => Network::Test,
173            1029 => Network::Main,
174            n => Network::Id(n),
175        }
176    }
177}
178
/// Tunables of the node discovery protocol.
///
/// NOTE(review): the fields are consumed by the `discovery` module, which is
/// not visible here; the per-field notes below follow the field names and
/// should be confirmed against that module.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct DiscoveryConfiguration {
    /// Node count used by discovery (see NOTE above).
    pub discover_node_count: u32,
    /// Validity period added to the current time by `expire_timestamp`.
    pub expire_time: Duration,
    /// Timeout applied to find-node requests (see NOTE above).
    pub find_node_timeout: Duration,
    /// Limit on the number of nodes to ping (see NOTE above).
    pub max_nodes_ping: usize,
    /// Timeout applied to ping requests (see NOTE above).
    pub ping_timeout: Duration,
    /// Length of one throttling window (see NOTE above).
    pub throttling_interval: Duration,
    /// Throttling limit for ping messages (see NOTE above).
    pub throttling_limit_ping: usize,
    /// Throttling limit for find-node messages (see NOTE above).
    pub throttling_limit_find_nodes: usize,
}

impl DiscoveryConfiguration {
    /// Returns the Unix timestamp, in whole seconds, of the instant
    /// `expire_time` from now.
    ///
    /// `SystemTime + Duration` panics when the sum cannot be represented, so
    /// a very large configured `expire_time` would crash the caller. The
    /// addition is therefore checked, and an unrepresentable deadline
    /// saturates to `u64::MAX` ("never expires"). A deadline that falls
    /// before the Unix epoch yields `0`, as before.
    fn expire_timestamp(&self) -> u64 {
        SystemTime::now()
            .checked_add(self.expire_time)
            .map_or(u64::MAX, |deadline| {
                deadline
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs()
            })
    }
}
199
/// Method used to resolve NAT.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum NatType {
    /// No NAT resolving method.
    Nothing,
    /// Any NAT resolving method.
    Any,
    /// The UPnP method.
    UPnP,
    /// The NAT-PMP method.
    NatPMP,
}
208
209#[derive(Clone)]
210pub enum NetworkIoMessage {
211    Start,
212    AddHandler {
213        handler: Arc<dyn NetworkProtocolHandler + Sync>,
214        protocol: ProtocolId,
215        version: ProtocolVersion,
216        callback: std::sync::mpsc::SyncSender<()>,
217    },
218    /// Register a new protocol timer
219    AddTimer {
220        /// Protocol Id.
221        protocol: ProtocolId,
222        /// Timer token.
223        token: TimerToken,
224        /// Timer delay.
225        delay: Duration,
226    },
227    DispatchWork {
228        /// Protocol Id.
229        protocol: ProtocolId,
230        /// Work type.
231        work_type: HandlerWorkType,
232    },
233    HandleProtocolMessage {
234        protocol: ProtocolId,
235        peer: PeerId,
236        node_id: NodeId,
237        data: Vec<u8>,
238    },
239}
240
241pub trait NetworkProtocolHandler: Sync + Send {
242    fn minimum_supported_version(&self) -> ProtocolVersion;
243
244    fn initialize(&self, _io: &dyn NetworkContext);
245
246    fn on_message(
247        &self, io: &dyn NetworkContext, node_id: &NodeId, data: &[u8],
248    );
249
250    fn on_peer_connected(
251        &self, io: &dyn NetworkContext, node_id: &NodeId,
252        peer_protocol_version: ProtocolVersion,
253        pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
254    );
255
256    fn on_peer_disconnected(&self, io: &dyn NetworkContext, node_id: &NodeId);
257
258    fn on_timeout(&self, io: &dyn NetworkContext, timer: TimerToken);
259
260    fn send_local_message(&self, _io: &dyn NetworkContext, _message: Vec<u8>);
261
262    fn on_work_dispatch(
263        &self, _io: &dyn NetworkContext, _work_type: HandlerWorkType,
264    );
265}
266
267#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
268pub enum UpdateNodeOperation {
269    Failure,
270    Demotion,
271    Remove,
272}
273
274pub trait NetworkContext {
275    fn get_protocol(&self) -> ProtocolId;
276
277    fn get_peer_connection_origin(&self, node_id: &NodeId) -> Option<bool>;
278
279    fn send(
280        &self, node_id: &NodeId, msg: Vec<u8>,
281        min_protocol_version: ProtocolVersion,
282        version_valid_till: ProtocolVersion, priority: SendQueuePriority,
283    ) -> Result<(), Error>;
284
285    fn disconnect_peer(
286        &self, node_id: &NodeId, op: Option<UpdateNodeOperation>, reason: &str,
287    );
288
289    /// Register a new IO timer. 'IoHandler::timeout' will be called with the
290    /// token.
291    fn register_timer(
292        &self, token: TimerToken, delay: Duration,
293    ) -> Result<(), Error>;
294
295    fn dispatch_work(&self, work_type: HandlerWorkType);
296
297    fn insert_peer_node_tag(&self, peer: NodeId, key: &str, value: &str);
298
299    fn is_peer_self(&self, _node_id: &NodeId) -> bool;
300
301    fn self_node_id(&self) -> NodeId;
302}
303
304#[derive(Debug, Clone)]
305pub struct SessionMetadata {
306    pub id: Option<NodeId>,
307    /// There won't be many protocols so it's faster to use Vec than Map.
308    pub peer_protocols: Vec<ProtocolInfo>,
309    pub originated: bool,
310    /// Packet header version of the peer.
311    pub peer_header_version: u8,
312}
313
314#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
315pub struct ProtocolInfo {
316    pub protocol: ProtocolId,
317    pub version: ProtocolVersion,
318}
319
320impl Encodable for ProtocolInfo {
321    fn rlp_append(&self, rlp: &mut RlpStream) {
322        rlp.begin_list(2);
323        rlp.append(&&self.protocol[..]);
324        rlp.append(&self.version);
325    }
326}
327
328impl Decodable for ProtocolInfo {
329    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
330        let p: Vec<u8> = rlp.val_at(0)?;
331        if p.len() != 3 {
332            return Err(DecoderError::Custom(
333                "Invalid subprotocol string length",
334            ));
335        }
336        let mut protocol: ProtocolId = [0u8; 3];
337        protocol.clone_from_slice(&p);
338        Ok(ProtocolInfo {
339            protocol,
340            version: rlp.val_at(1)?,
341        })
342    }
343}
344
345impl PartialOrd for ProtocolInfo {
346    fn partial_cmp(&self, other: &ProtocolInfo) -> Option<Ordering> {
347        Some(self.cmp(other))
348    }
349}
350
351impl Ord for ProtocolInfo {
352    fn cmp(&self, other: &ProtocolInfo) -> Ordering {
353        self.protocol.cmp(&other.protocol)
354    }
355}
356
357#[derive(Serialize, Deserialize, Clone)]
358pub struct PeerInfo {
359    pub id: PeerId,
360    pub addr: SocketAddr,
361    pub nodeid: NodeId,
362    pub protocols: Vec<ProtocolInfo>,
363}
364
365#[derive(Clone, Debug, PartialEq, Eq)]
366pub struct IpFilter {
367    pub predefined: AllowIP,
368    pub custom_allow: Vec<IpNetwork>,
369    pub custom_block: Vec<IpNetwork>,
370}
371
372impl Default for IpFilter {
373    fn default() -> Self {
374        IpFilter {
375            predefined: AllowIP::All,
376            custom_allow: vec![],
377            custom_block: vec![],
378        }
379    }
380}
381
382impl IpFilter {
383    /// Attempt to parse the peer mode from a string.
384    pub fn parse(s: &str) -> Result<IpFilter, IpNetworkError> {
385        let mut filter = IpFilter::default();
386        for f in s.split_whitespace() {
387            match f {
388                "all" => filter.predefined = AllowIP::All,
389                "private" => filter.predefined = AllowIP::Private,
390                "public" => filter.predefined = AllowIP::Public,
391                "none" => filter.predefined = AllowIP::None,
392                custom => {
393                    if custom.starts_with('-') {
394                        filter.custom_block.push(IpNetwork::from_str(
395                            &custom.to_owned().split_off(1),
396                        )?)
397                    } else {
398                        filter.custom_allow.push(IpNetwork::from_str(custom)?)
399                    }
400                }
401            }
402        }
403        Ok(filter)
404    }
405}
406
/// Predefined IP filter policy.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum AllowIP {
    /// Connect to any address.
    All,
    /// Connect to private network only.
    Private,
    /// Connect to public network only.
    Public,
    /// Block all addresses.
    None,
}
419
/// Reads a message id of at most two bytes from the END of `msg` and
/// shrinks `msg` so that it no longer includes the consumed bytes.
///
/// The last byte supplies the low 7 bits. If its top bit (`0x80`) is set,
/// the byte before it supplies the next 7 bits; that byte's own top bit is
/// masked off and otherwise ignored.
///
/// # Errors
///
/// * `"empty buffer"` when `msg` is empty.
/// * `"leb128 continuation bit set but no second byte"` when the only byte
///   has its continuation bit set.
///
/// `msg` is left untouched when an error is returned.
pub fn parse_msg_id_leb128_2_bytes_at_most(
    msg: &mut &[u8],
) -> Result<u16, &'static str> {
    let data = *msg;

    let (&low, head) = data.split_last().ok_or("empty buffer")?;
    let low_bits = u16::from(low & 0x7f);

    // Single-byte id: no continuation bit on the last byte.
    if low & 0x80 == 0 {
        *msg = head;
        return Ok(low_bits);
    }

    let (&high, head) = head
        .split_last()
        .ok_or("leb128 continuation bit set but no second byte")?;
    *msg = head;

    Ok(low_bits | (u16::from(high & 0x7f) << 7))
}