1#![allow(deprecated)]
6use cfxkey as keylib;
7use io as iolib;
8use keccak_hash as hash;
9
/// Number of bytes in a [`ProtocolId`].
pub const PROTOCOL_ID_SIZE: usize = 3;
/// Fixed-width byte identifier of a sub-protocol.
pub type ProtocolId = [u8; PROTOCOL_ID_SIZE];
/// Tag describing a unit of handler work. The meaning of each value is
/// presumably defined by the individual protocol handler -- confirm.
pub type HandlerWorkType = u8;
/// Numeric peer identifier.
///
/// NOTE(review): presumably a local session/connection index rather than a
/// global node identity -- confirm against the session manager.
pub type PeerId = usize;
14
15mod connection;
16mod discovery;
17mod error;
18mod handshake;
19mod ip;
20mod ip_utils;
21mod node_database;
22pub mod node_table;
23pub mod service;
24mod session;
25mod session_manager;
26pub mod throttling;
27
28pub use crate::{
29 error::{DisconnectReason, Error, ThrottlingReason},
30 ip::SessionIpLimitConfig,
31 node_table::Node,
32 service::NetworkService,
33 session::SessionDetails,
34};
35pub use io::TimerToken;
36
37use crate::{
38 node_table::NodeId,
39 service::{
40 ProtocolVersion, DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION,
41 DEFAULT_DISCOVERY_REFRESH_TIMEOUT, DEFAULT_DISCOVERY_ROUND_TIMEOUT,
42 DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT, DEFAULT_HOUSEKEEPING_TIMEOUT,
43 DEFAULT_NODE_TABLE_TIMEOUT,
44 },
45};
46use cfx_addr::Network;
47use diem_types::validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey};
48use ipnetwork::{IpNetwork, IpNetworkError};
49use keylib::Secret;
50use priority_send_queue::SendQueuePriority;
51use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
52use serde_derive::{Deserialize, Serialize};
53use std::{
54 cmp::Ordering,
55 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
56 str::{self, FromStr},
57 sync::Arc,
58 time::{Duration, SystemTime, UNIX_EPOCH},
59};
60
/// Tag key under which a node's type is recorded.
pub const NODE_TAG_NODE_TYPE: &str = "node_type";
/// Tag value naming an archive node; presumably stored under
/// `NODE_TAG_NODE_TYPE` -- confirm.
pub const NODE_TAG_ARCHIVE: &str = "archive";
/// Tag value naming a full node; presumably stored under
/// `NODE_TAG_NODE_TYPE` -- confirm.
pub const NODE_TAG_FULL: &str = "full";
64
65#[derive(Debug, Clone, PartialEq)]
66pub struct NetworkConfiguration {
67 pub is_consortium: bool,
68 pub id: u64,
70 network_type: Network,
71 pub config_path: Option<String>,
74 pub listen_address: Option<SocketAddr>,
75 pub public_address: Option<SocketAddr>,
77 pub udp_port: Option<u16>,
78 pub nat_enabled: bool,
80 pub discovery_enabled: bool,
82 pub boot_nodes: Vec<String>,
83 pub use_secret: Option<Secret>,
85 pub max_outgoing_peers: usize,
87 pub max_outgoing_peers_archive: usize,
91 pub max_incoming_peers: usize,
93 pub max_handshakes: usize,
95 pub reserved_nodes: Vec<String>,
97 pub ip_filter: IpFilter,
99 pub housekeeping_timeout: Duration,
101 pub discovery_refresh_timeout: Duration,
104 pub fast_discovery_refresh_timeout: Duration,
107 pub discovery_round_timeout: Duration,
109 pub node_table_timeout: Duration,
111 pub connection_lifetime_for_promotion: Duration,
113 pub test_mode: bool,
114 pub subnet_quota: usize,
116 pub session_ip_limit_config: SessionIpLimitConfig,
117
118 pub discovery_config: DiscoveryConfiguration,
119}
120
121impl NetworkConfiguration {
122 pub fn new(id: u64, discovery_config: DiscoveryConfiguration) -> Self {
123 let network_type = Self::network_id_to_known_cfx_network(id);
124
125 NetworkConfiguration {
126 is_consortium: false,
127 id,
128 network_type,
129 config_path: Some("./net_config".to_string()),
130 listen_address: None,
131 public_address: None,
132 udp_port: None,
133 nat_enabled: true,
134 discovery_enabled: false,
135 boot_nodes: Vec::new(),
136 use_secret: None,
137 max_outgoing_peers: 0,
138 max_outgoing_peers_archive: 0,
139 max_incoming_peers: 0,
140 max_handshakes: 0,
141 reserved_nodes: Vec::new(),
142 ip_filter: IpFilter::default(),
143 housekeeping_timeout: DEFAULT_HOUSEKEEPING_TIMEOUT,
144 discovery_refresh_timeout: DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
145 fast_discovery_refresh_timeout:
146 DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT,
147 discovery_round_timeout: DEFAULT_DISCOVERY_ROUND_TIMEOUT,
148 node_table_timeout: DEFAULT_NODE_TABLE_TIMEOUT,
149 connection_lifetime_for_promotion:
150 DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION,
151 test_mode: false,
152 subnet_quota: 32,
153 session_ip_limit_config: SessionIpLimitConfig::default(),
154 discovery_config,
155 }
156 }
157
158 pub fn new_with_port(
159 id: u64, port: u16, discovery_config: DiscoveryConfiguration,
160 ) -> NetworkConfiguration {
161 let mut config = NetworkConfiguration::new(id, discovery_config);
162 config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(
163 Ipv4Addr::new(0, 0, 0, 0),
164 port,
165 )));
166 config
167 }
168
169 pub fn get_network_type(&self) -> &Network { &self.network_type }
170
171 pub fn network_id_to_known_cfx_network(id: u64) -> Network {
172 match id {
173 1 => Network::Test,
174 1029 => Network::Main,
175 n => Network::Id(n),
176 }
177 }
178}
179
/// Tunables for node discovery.
///
/// The derived `Default` zeroes every field. The field notes below are
/// based on names only where marked "presumably".
#[derive(Clone, Debug, PartialEq, Default)]
pub struct DiscoveryConfiguration {
    /// Presumably the number of nodes queried per discovery round --
    /// confirm.
    pub discover_node_count: u32,
    /// Validity window added to the current time by `expire_timestamp`.
    pub expire_time: Duration,
    /// Presumably how long to wait for a find-node reply -- confirm.
    pub find_node_timeout: Duration,
    /// Presumably the maximum number of nodes pinged at once -- confirm.
    pub max_nodes_ping: usize,
    /// Presumably how long to wait for a ping reply -- confirm.
    pub ping_timeout: Duration,
    /// Presumably the length of one throttling window -- confirm.
    pub throttling_interval: Duration,
    /// Presumably the ping limit per throttling window -- confirm.
    pub throttling_limit_ping: usize,
    /// Presumably the find-node limit per throttling window -- confirm.
    pub throttling_limit_find_nodes: usize,
}

impl DiscoveryConfiguration {
    /// Returns the Unix time, in whole seconds, that lies `expire_time`
    /// after now.
    ///
    /// `SystemTime + Duration` panics when the sum is not representable,
    /// so the addition is checked. An unrepresentable deadline (for
    /// example `Duration::MAX`) saturates to `u64::MAX`. A deadline before
    /// the Unix epoch yields 0, as before.
    fn expire_timestamp(&self) -> u64 {
        SystemTime::now()
            .checked_add(self.expire_time)
            .map_or(u64::MAX, |deadline| {
                deadline
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs()
            })
    }
}
200
/// NAT handling selection.
///
/// NOTE(review): variant meanings below are read from the names only --
/// confirm against the code that consumes this enum.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum NatType {
    /// Presumably: no NAT handling.
    Nothing,
    /// Presumably: use whichever mechanism is available.
    Any,
    /// Presumably: UPnP port mapping.
    UPnP,
    /// Presumably: NAT-PMP port mapping.
    NatPMP,
}
209
210#[derive(Clone)]
211pub enum NetworkIoMessage {
212 Start,
213 AddHandler {
214 handler: Arc<dyn NetworkProtocolHandler + Sync>,
215 protocol: ProtocolId,
216 version: ProtocolVersion,
217 callback: std::sync::mpsc::SyncSender<()>,
218 },
219 AddTimer {
221 protocol: ProtocolId,
223 token: TimerToken,
225 delay: Duration,
227 },
228 DispatchWork {
229 protocol: ProtocolId,
231 work_type: HandlerWorkType,
233 },
234 HandleProtocolMessage {
235 protocol: ProtocolId,
236 peer: PeerId,
237 node_id: NodeId,
238 data: Vec<u8>,
239 },
240}
241
242pub trait NetworkProtocolHandler: Sync + Send {
243 fn minimum_supported_version(&self) -> ProtocolVersion;
244
245 fn initialize(&self, _io: &dyn NetworkContext);
246
247 fn on_message(
248 &self, io: &dyn NetworkContext, node_id: &NodeId, data: &[u8],
249 );
250
251 fn on_peer_connected(
252 &self, io: &dyn NetworkContext, node_id: &NodeId,
253 peer_protocol_version: ProtocolVersion,
254 pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
255 );
256
257 fn on_peer_disconnected(&self, io: &dyn NetworkContext, node_id: &NodeId);
258
259 fn on_timeout(&self, io: &dyn NetworkContext, timer: TimerToken);
260
261 fn send_local_message(&self, _io: &dyn NetworkContext, _message: Vec<u8>);
262
263 fn on_work_dispatch(
264 &self, _io: &dyn NetworkContext, _work_type: HandlerWorkType,
265 );
266}
267
268#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
269pub enum UpdateNodeOperation {
270 Failure,
271 Demotion,
272 Remove,
273}
274
275pub trait NetworkContext {
276 fn get_protocol(&self) -> ProtocolId;
277
278 fn get_peer_connection_origin(&self, node_id: &NodeId) -> Option<bool>;
279
280 fn send(
281 &self, node_id: &NodeId, msg: Vec<u8>,
282 min_protocol_version: ProtocolVersion,
283 version_valid_till: ProtocolVersion, priority: SendQueuePriority,
284 ) -> Result<(), Error>;
285
286 fn disconnect_peer(
287 &self, node_id: &NodeId, op: Option<UpdateNodeOperation>, reason: &str,
288 );
289
290 fn register_timer(
293 &self, token: TimerToken, delay: Duration,
294 ) -> Result<(), Error>;
295
296 fn dispatch_work(&self, work_type: HandlerWorkType);
297
298 fn insert_peer_node_tag(&self, peer: NodeId, key: &str, value: &str);
299
300 fn is_peer_self(&self, _node_id: &NodeId) -> bool;
301
302 fn self_node_id(&self) -> NodeId;
303}
304
305#[derive(Debug, Clone)]
306pub struct SessionMetadata {
307 pub id: Option<NodeId>,
308 pub peer_protocols: Vec<ProtocolInfo>,
310 pub originated: bool,
311 pub peer_header_version: u8,
313}
314
315#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
316pub struct ProtocolInfo {
317 pub protocol: ProtocolId,
318 pub version: ProtocolVersion,
319}
320
321impl Encodable for ProtocolInfo {
322 fn rlp_append(&self, rlp: &mut RlpStream) {
323 rlp.begin_list(2);
324 rlp.append(&&self.protocol[..]);
325 rlp.append(&self.version);
326 }
327}
328
329impl Decodable for ProtocolInfo {
330 fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
331 let p: Vec<u8> = rlp.val_at(0)?;
332 if p.len() != 3 {
333 return Err(DecoderError::Custom(
334 "Invalid subprotocol string length",
335 ));
336 }
337 let mut protocol: ProtocolId = [0u8; 3];
338 protocol.clone_from_slice(&p);
339 Ok(ProtocolInfo {
340 protocol,
341 version: rlp.val_at(1)?,
342 })
343 }
344}
345
346impl PartialOrd for ProtocolInfo {
347 fn partial_cmp(&self, other: &ProtocolInfo) -> Option<Ordering> {
348 Some(self.cmp(other))
349 }
350}
351
352impl Ord for ProtocolInfo {
353 fn cmp(&self, other: &ProtocolInfo) -> Ordering {
354 self.protocol.cmp(&other.protocol)
355 }
356}
357
358#[derive(Serialize, Deserialize)]
359pub struct PeerInfo {
360 pub id: PeerId,
361 pub addr: SocketAddr,
362 pub nodeid: NodeId,
363 pub protocols: Vec<ProtocolInfo>,
364}
365
366#[derive(Clone, Debug, PartialEq, Eq)]
367pub struct IpFilter {
368 pub predefined: AllowIP,
369 pub custom_allow: Vec<IpNetwork>,
370 pub custom_block: Vec<IpNetwork>,
371}
372
373impl Default for IpFilter {
374 fn default() -> Self {
375 IpFilter {
376 predefined: AllowIP::All,
377 custom_allow: vec![],
378 custom_block: vec![],
379 }
380 }
381}
382
383impl IpFilter {
384 pub fn parse(s: &str) -> Result<IpFilter, IpNetworkError> {
386 let mut filter = IpFilter::default();
387 for f in s.split_whitespace() {
388 match f {
389 "all" => filter.predefined = AllowIP::All,
390 "private" => filter.predefined = AllowIP::Private,
391 "public" => filter.predefined = AllowIP::Public,
392 "none" => filter.predefined = AllowIP::None,
393 custom => {
394 if custom.starts_with('-') {
395 filter.custom_block.push(IpNetwork::from_str(
396 &custom.to_owned().split_off(1),
397 )?)
398 } else {
399 filter.custom_allow.push(IpNetwork::from_str(custom)?)
400 }
401 }
402 }
403 }
404 Ok(filter)
405 }
406}
407
/// Predefined IP policy; selected in `IpFilter::parse` by the keywords
/// `all`, `private`, `public` and `none`.
///
/// NOTE(review): the exact address ranges each variant covers are decided
/// by the consumer -- confirm there.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AllowIP {
    /// Keyword `all`.
    All,
    /// Keyword `private`.
    Private,
    /// Keyword `public`.
    Public,
    /// Keyword `none`.
    None,
}
420
/// Strips a message id from the END of `msg` and returns it.
///
/// The id occupies the last one or two bytes. The final byte holds the low
/// 7 bits; if its high bit (`0x80`) is set, the byte before it holds the
/// next 7 bits. Only the low 7 bits of that second byte are used, so at
/// most 14 bits are decoded. On return `msg` is shortened to exclude the
/// id bytes that were consumed.
///
/// Malformed input no longer panics (the previous index arithmetic
/// underflowed):
/// - an empty `msg` yields `0` and stays empty;
/// - a single byte with the continuation bit set but no preceding byte
///   yields just its low 7 bits.
pub fn parse_msg_id_leb128_2_bytes_at_most(msg: &mut &[u8]) -> u16 {
    let buf = *msg;

    let (low, before_low) = match buf.split_last() {
        Some((&low, rest)) => (low, rest),
        // Nothing to decode; leave `msg` untouched.
        None => return 0,
    };

    let mut id = u16::from(low & 0x7f);
    let mut remaining = before_low;

    if low & 0x80 != 0 {
        // Continuation bit: the preceding byte, when present, carries the
        // next 7 bits of the id.
        if let Some((&high, before_high)) = before_low.split_last() {
            id |= u16::from(high & 0x7f) << 7;
            remaining = before_high;
        }
    }

    *msg = remaining;

    id
}