1use cfxkey as keylib;
6use io as iolib;
7use keccak_hash as hash;
8
/// Number of bytes in a [`ProtocolId`].
pub const PROTOCOL_ID_SIZE: usize = 3;
/// Fixed-size byte identifier of a sub-protocol.
pub type ProtocolId = [u8; PROTOCOL_ID_SIZE];
/// Work-type tag used by `NetworkContext::dispatch_work`,
/// `NetworkIoMessage::DispatchWork` and
/// `NetworkProtocolHandler::on_work_dispatch`.
pub type HandlerWorkType = u8;
/// Numeric peer identifier, used by `PeerInfo::id` and
/// `NetworkIoMessage::HandleProtocolMessage::peer`.
pub type PeerId = usize;
13
14mod connection;
15mod discovery;
16mod error;
17mod handshake;
18mod ip;
19mod ip_utils;
20mod node_database;
21pub mod node_table;
22pub mod service;
23mod session;
24mod session_manager;
25pub mod throttling;
26
27pub use crate::{
28 error::{DisconnectReason, Error, ThrottlingReason},
29 ip::SessionIpLimitConfig,
30 node_table::Node,
31 service::NetworkService,
32 session::SessionDetails,
33};
34pub use io::TimerToken;
35
36use crate::{
37 node_table::NodeId,
38 service::{
39 ProtocolVersion, DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION,
40 DEFAULT_DISCOVERY_REFRESH_TIMEOUT, DEFAULT_DISCOVERY_ROUND_TIMEOUT,
41 DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT, DEFAULT_HOUSEKEEPING_TIMEOUT,
42 DEFAULT_NODE_TABLE_TIMEOUT,
43 },
44};
45use cfx_addr::Network;
46use diem_types::validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey};
47use ipnetwork::{IpNetwork, IpNetworkError};
48use keylib::Secret;
49use priority_send_queue::SendQueuePriority;
50use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
51use serde_derive::{Deserialize, Serialize};
52use std::{
53 cmp::Ordering,
54 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
55 str::{self, FromStr},
56 sync::Arc,
57 time::{Duration, SystemTime, UNIX_EPOCH},
58};
59
// NOTE(review): these look like the key and the two values meant for
// `NetworkContext::insert_peer_node_tag` — confirm at the call sites.

/// Node-tag key under which the node type is stored.
pub const NODE_TAG_NODE_TYPE: &str = "node_type";
/// Node-tag value for an archive node.
pub const NODE_TAG_ARCHIVE: &str = "archive";
/// Node-tag value for a full node.
pub const NODE_TAG_FULL: &str = "full";
63
/// Network configuration; `NetworkConfiguration::new` supplies the defaults.
///
/// `network_type` is private and is computed from `id` inside `new`. Because
/// `id` is public, changing it afterwards does not update `network_type`.
///
/// NOTE(review): the per-field descriptions below are inferred from the field
/// names and types. The code that consumes them lives outside this file, so
/// confirm the exact semantics there.
#[derive(Debug, Clone, PartialEq)]
pub struct NetworkConfiguration {
    /// Consortium-mode flag (`false` by default).
    pub is_consortium: bool,
    /// Numeric network id; also the input for `network_type`.
    pub id: u64,
    /// Address-format network kind derived from `id` (see
    /// `network_id_to_known_cfx_network`); read via `get_network_type`.
    network_type: Network,
    /// Directory for persisted network data (`"./net_config"` by default).
    pub config_path: Option<String>,
    /// Local address to listen on; `None` by default, `0.0.0.0:<port>` when
    /// built with `new_with_port`.
    pub listen_address: Option<SocketAddr>,
    /// Address advertised to other nodes, if any.
    pub public_address: Option<SocketAddr>,
    /// UDP port, if different from the default choice.
    pub udp_port: Option<u16>,
    /// Whether NAT handling is enabled (`true` by default).
    pub nat_enabled: bool,
    /// Whether node discovery is enabled (`false` by default).
    pub discovery_enabled: bool,
    /// Boot nodes, as strings.
    pub boot_nodes: Vec<String>,
    /// Secret key to use instead of a generated/stored one.
    pub use_secret: Option<Secret>,
    /// Limit on outgoing peers (`0` by default).
    pub max_outgoing_peers: usize,
    /// Limit on outgoing peers that are archive nodes (`0` by default).
    pub max_outgoing_peers_archive: usize,
    /// Limit on incoming peers (`0` by default).
    pub max_incoming_peers: usize,
    /// Limit on concurrent handshakes (`0` by default).
    pub max_handshakes: usize,
    /// Reserved nodes, as strings.
    pub reserved_nodes: Vec<String>,
    /// IP filtering rules (`IpFilter::default()` by default).
    pub ip_filter: IpFilter,
    /// Defaults to `DEFAULT_HOUSEKEEPING_TIMEOUT`.
    pub housekeeping_timeout: Duration,
    /// Defaults to `DEFAULT_DISCOVERY_REFRESH_TIMEOUT`.
    pub discovery_refresh_timeout: Duration,
    /// Defaults to `DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT`.
    pub fast_discovery_refresh_timeout: Duration,
    /// Defaults to `DEFAULT_DISCOVERY_ROUND_TIMEOUT`.
    pub discovery_round_timeout: Duration,
    /// Defaults to `DEFAULT_NODE_TABLE_TIMEOUT`.
    pub node_table_timeout: Duration,
    /// Defaults to `DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION`.
    pub connection_lifetime_for_promotion: Duration,
    /// Test-mode flag (`false` by default).
    pub test_mode: bool,
    /// Per-subnet quota (`32` by default).
    pub subnet_quota: usize,
    /// Per-IP session limits (`SessionIpLimitConfig::default()` by default).
    pub session_ip_limit_config: SessionIpLimitConfig,

    /// Discovery parameters, passed in by the caller of `new`.
    pub discovery_config: DiscoveryConfiguration,
}
119
120impl NetworkConfiguration {
121 pub fn new(id: u64, discovery_config: DiscoveryConfiguration) -> Self {
122 let network_type = Self::network_id_to_known_cfx_network(id);
123
124 NetworkConfiguration {
125 is_consortium: false,
126 id,
127 network_type,
128 config_path: Some("./net_config".to_string()),
129 listen_address: None,
130 public_address: None,
131 udp_port: None,
132 nat_enabled: true,
133 discovery_enabled: false,
134 boot_nodes: Vec::new(),
135 use_secret: None,
136 max_outgoing_peers: 0,
137 max_outgoing_peers_archive: 0,
138 max_incoming_peers: 0,
139 max_handshakes: 0,
140 reserved_nodes: Vec::new(),
141 ip_filter: IpFilter::default(),
142 housekeeping_timeout: DEFAULT_HOUSEKEEPING_TIMEOUT,
143 discovery_refresh_timeout: DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
144 fast_discovery_refresh_timeout:
145 DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT,
146 discovery_round_timeout: DEFAULT_DISCOVERY_ROUND_TIMEOUT,
147 node_table_timeout: DEFAULT_NODE_TABLE_TIMEOUT,
148 connection_lifetime_for_promotion:
149 DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION,
150 test_mode: false,
151 subnet_quota: 32,
152 session_ip_limit_config: SessionIpLimitConfig::default(),
153 discovery_config,
154 }
155 }
156
157 pub fn new_with_port(
158 id: u64, port: u16, discovery_config: DiscoveryConfiguration,
159 ) -> NetworkConfiguration {
160 let mut config = NetworkConfiguration::new(id, discovery_config);
161 config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(
162 Ipv4Addr::new(0, 0, 0, 0),
163 port,
164 )));
165 config
166 }
167
168 pub fn get_network_type(&self) -> &Network { &self.network_type }
169
170 pub fn network_id_to_known_cfx_network(id: u64) -> Network {
171 match id {
172 1 => Network::Test,
173 1029 => Network::Main,
174 n => Network::Id(n),
175 }
176 }
177}
178
/// Tunables for node discovery.
///
/// NOTE(review): the per-field descriptions are inferred from the field
/// names; the discovery implementation is outside this file — confirm there.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct DiscoveryConfiguration {
    /// Number of nodes to look for.
    pub discover_node_count: u32,
    /// Validity period added to "now" by `expire_timestamp`.
    pub expire_time: Duration,
    /// Timeout for a find-node request.
    pub find_node_timeout: Duration,
    /// Upper bound on nodes pinged.
    pub max_nodes_ping: usize,
    /// Timeout for a ping request.
    pub ping_timeout: Duration,
    /// Length of one throttling window.
    pub throttling_interval: Duration,
    /// Ping limit per throttling window.
    pub throttling_limit_ping: usize,
    /// Find-nodes limit per throttling window.
    pub throttling_limit_find_nodes: usize,
}

impl DiscoveryConfiguration {
    /// Returns `now + expire_time` as whole seconds since the Unix epoch.
    ///
    /// * If the resulting instant lies before the epoch, `0` is returned
    ///   (the `duration_since` error is replaced by a zero duration).
    /// * If `now + expire_time` is not representable as a `SystemTime`
    ///   (e.g. an absurdly large configured `expire_time`), the result
    ///   saturates to `u64::MAX` instead of panicking, which is what the
    ///   `+` operator on `SystemTime` would do.
    fn expire_timestamp(&self) -> u64 {
        SystemTime::now()
            .checked_add(self.expire_time)
            .map_or(u64::MAX, |deadline| {
                deadline
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs()
            })
    }
}
199
/// NAT traversal mode.
///
/// NOTE(review): variant meanings are inferred from their names; the code
/// that interprets them is not in this file — confirm there.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum NatType {
    /// No NAT handling.
    Nothing,
    /// Any available NAT mechanism.
    Any,
    /// UPnP.
    UPnP,
    /// NAT-PMP.
    NatPMP,
}
208
/// Messages exchanged with the network IO loop.
///
/// NOTE(review): the handling of each variant is implemented outside this
/// file; the descriptions below follow the variant and field names — confirm
/// against the service code.
#[derive(Clone)]
pub enum NetworkIoMessage {
    /// Start the network.
    Start,
    /// Register a protocol handler.
    AddHandler {
        /// Handler to register. The trait already requires `Sync + Send`, so
        /// the extra `+ Sync` bound is redundant but harmless.
        handler: Arc<dyn NetworkProtocolHandler + Sync>,
        /// Protocol the handler serves.
        protocol: ProtocolId,
        /// Protocol version of the handler.
        version: ProtocolVersion,
        /// Channel presumably signalled once registration is done — confirm.
        callback: std::sync::mpsc::SyncSender<()>,
    },
    /// Register a timer for a protocol.
    AddTimer {
        /// Protocol that owns the timer.
        protocol: ProtocolId,
        /// Timer token.
        token: TimerToken,
        /// Timer delay.
        delay: Duration,
    },
    /// Dispatch a unit of work to a protocol handler.
    DispatchWork {
        /// Target protocol.
        protocol: ProtocolId,
        /// Work-type tag.
        work_type: HandlerWorkType,
    },
    /// Deliver a protocol message to a handler.
    HandleProtocolMessage {
        /// Target protocol.
        protocol: ProtocolId,
        /// Peer identifier.
        peer: PeerId,
        /// Node id of the peer.
        node_id: NodeId,
        /// Raw message bytes.
        data: Vec<u8>,
    },
}
240
/// Callbacks implemented by a sub-protocol.
///
/// Every method is required (there are no default bodies) and implementors
/// must be `Sync + Send`.
///
/// NOTE(review): when and from which thread each hook is invoked is decided
/// by code outside this file; the one-line descriptions below follow the
/// method names — confirm against the service implementation.
pub trait NetworkProtocolHandler: Sync + Send {
    /// Lowest protocol version this handler supports.
    fn minimum_supported_version(&self) -> ProtocolVersion;

    /// Initialization hook.
    fn initialize(&self, _io: &dyn NetworkContext);

    /// Hook for a message `data` associated with peer `node_id`.
    fn on_message(
        &self, io: &dyn NetworkContext, node_id: &NodeId, data: &[u8],
    );

    /// Hook for a newly connected peer, with the peer's protocol version and
    /// an optional pair of consensus public keys.
    fn on_peer_connected(
        &self, io: &dyn NetworkContext, node_id: &NodeId,
        peer_protocol_version: ProtocolVersion,
        pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
    );

    /// Hook for a disconnected peer.
    fn on_peer_disconnected(&self, io: &dyn NetworkContext, node_id: &NodeId);

    /// Hook for a timer identified by `timer`.
    fn on_timeout(&self, io: &dyn NetworkContext, timer: TimerToken);

    /// Hook for a locally produced message.
    fn send_local_message(&self, _io: &dyn NetworkContext, _message: Vec<u8>);

    /// Hook for dispatched work of type `_work_type`.
    fn on_work_dispatch(
        &self, _io: &dyn NetworkContext, _work_type: HandlerWorkType,
    );
}
266
/// Node-table update that can accompany `NetworkContext::disconnect_peer`
/// (passed there as `Option<UpdateNodeOperation>`).
///
/// NOTE(review): the effect of each variant is implemented outside this file;
/// the descriptions follow the variant names — confirm.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum UpdateNodeOperation {
    /// Record a failure for the node.
    Failure,
    /// Demote the node.
    Demotion,
    /// Remove the node.
    Remove,
}
273
/// Network-side API handed to `NetworkProtocolHandler` callbacks as
/// `&dyn NetworkContext`.
///
/// NOTE(review): implementations live outside this file; the descriptions
/// below follow the method names and signatures — confirm details there.
pub trait NetworkContext {
    /// Protocol this context is associated with.
    fn get_protocol(&self) -> ProtocolId;

    /// Connection origin of the peer. Presumably `None` for an unknown peer
    /// and `Some(true)` for a locally originated connection — confirm.
    fn get_peer_connection_origin(&self, node_id: &NodeId) -> Option<bool>;

    /// Sends `msg` to `node_id` with the given queue `priority`.
    ///
    /// `min_protocol_version` and `version_valid_till` bound the protocol
    /// versions for which the message is meant — TODO confirm whether the
    /// upper bound is inclusive.
    fn send(
        &self, node_id: &NodeId, msg: Vec<u8>,
        min_protocol_version: ProtocolVersion,
        version_valid_till: ProtocolVersion, priority: SendQueuePriority,
    ) -> Result<(), Error>;

    /// Disconnects a peer, optionally with a node-table operation `op`, and
    /// a textual `reason`.
    fn disconnect_peer(
        &self, node_id: &NodeId, op: Option<UpdateNodeOperation>, reason: &str,
    );

    /// Registers a timer with the given `token` and `delay`.
    fn register_timer(
        &self, token: TimerToken, delay: Duration,
    ) -> Result<(), Error>;

    /// Requests dispatch of work tagged `work_type`.
    fn dispatch_work(&self, work_type: HandlerWorkType);

    /// Attaches the tag `key = value` to `peer`.
    fn insert_peer_node_tag(&self, peer: NodeId, key: &str, value: &str);

    /// Whether `_node_id` refers to the local node.
    fn is_peer_self(&self, _node_id: &NodeId) -> bool;

    /// Node id of the local node.
    fn self_node_id(&self) -> NodeId;
}
303
/// Metadata describing a session with a peer.
///
/// NOTE(review): field semantics are inferred from names; the session code is
/// outside this file — confirm there.
#[derive(Debug, Clone)]
pub struct SessionMetadata {
    /// Node id of the peer, when known.
    pub id: Option<NodeId>,
    /// Protocols (and versions) reported for the peer.
    pub peer_protocols: Vec<ProtocolInfo>,
    /// Whether the session was originated locally — TODO confirm direction.
    pub originated: bool,
    /// Header version reported by the peer.
    pub peer_header_version: u8,
}
313
/// A sub-protocol identifier together with a protocol version.
///
/// Equality (derived) compares both fields, whereas the hand-written `Ord`
/// for this type compares `protocol` only.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProtocolInfo {
    /// Sub-protocol identifier.
    pub protocol: ProtocolId,
    /// Version of that sub-protocol.
    pub version: ProtocolVersion,
}
319
320impl Encodable for ProtocolInfo {
321 fn rlp_append(&self, rlp: &mut RlpStream) {
322 rlp.begin_list(2);
323 rlp.append(&&self.protocol[..]);
324 rlp.append(&self.version);
325 }
326}
327
328impl Decodable for ProtocolInfo {
329 fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
330 let p: Vec<u8> = rlp.val_at(0)?;
331 if p.len() != 3 {
332 return Err(DecoderError::Custom(
333 "Invalid subprotocol string length",
334 ));
335 }
336 let mut protocol: ProtocolId = [0u8; 3];
337 protocol.clone_from_slice(&p);
338 Ok(ProtocolInfo {
339 protocol,
340 version: rlp.val_at(1)?,
341 })
342 }
343}
344
345impl PartialOrd for ProtocolInfo {
346 fn partial_cmp(&self, other: &ProtocolInfo) -> Option<Ordering> {
347 Some(self.cmp(other))
348 }
349}
350
351impl Ord for ProtocolInfo {
352 fn cmp(&self, other: &ProtocolInfo) -> Ordering {
353 self.protocol.cmp(&other.protocol)
354 }
355}
356
/// Serializable summary of a connected peer.
///
/// NOTE(review): field semantics are inferred from names and types — confirm
/// against the code that builds this value.
#[derive(Serialize, Deserialize, Clone)]
pub struct PeerInfo {
    /// Local numeric identifier of the peer.
    pub id: PeerId,
    /// Socket address of the peer.
    pub addr: SocketAddr,
    /// Node id of the peer.
    pub nodeid: NodeId,
    /// Protocols (and versions) associated with the peer.
    pub protocols: Vec<ProtocolInfo>,
}
364
/// IP filter made of one predefined class plus custom allow/block networks.
///
/// Built either by `Default` (everything allowed, no custom networks) or by
/// `IpFilter::parse`.
///
/// NOTE(review): how the three parts are combined when checking an address
/// is implemented outside this file — confirm there.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IpFilter {
    /// Predefined address class.
    pub predefined: AllowIP,
    /// Custom networks from tokens without a leading `-` in `parse`.
    pub custom_allow: Vec<IpNetwork>,
    /// Custom networks from tokens with a leading `-` in `parse`.
    pub custom_block: Vec<IpNetwork>,
}
371
372impl Default for IpFilter {
373 fn default() -> Self {
374 IpFilter {
375 predefined: AllowIP::All,
376 custom_allow: vec![],
377 custom_block: vec![],
378 }
379 }
380}
381
382impl IpFilter {
383 pub fn parse(s: &str) -> Result<IpFilter, IpNetworkError> {
385 let mut filter = IpFilter::default();
386 for f in s.split_whitespace() {
387 match f {
388 "all" => filter.predefined = AllowIP::All,
389 "private" => filter.predefined = AllowIP::Private,
390 "public" => filter.predefined = AllowIP::Public,
391 "none" => filter.predefined = AllowIP::None,
392 custom => {
393 if custom.starts_with('-') {
394 filter.custom_block.push(IpNetwork::from_str(
395 &custom.to_owned().split_off(1),
396 )?)
397 } else {
398 filter.custom_allow.push(IpNetwork::from_str(custom)?)
399 }
400 }
401 }
402 }
403 Ok(filter)
404 }
405}
406
/// Predefined address class of an `IpFilter`.
///
/// `IpFilter::parse` selects the variants with the keywords `all`,
/// `private`, `public` and `none` respectively.
///
/// NOTE(review): what counts as "private" or "public" is decided by code
/// outside this file — confirm there.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AllowIP {
    /// Keyword `all`.
    All,
    /// Keyword `private`.
    Private,
    /// Keyword `public`.
    Public,
    /// Keyword `none`.
    None,
}
419
/// Reads a message id stored at the END of `msg` and strips it off.
///
/// The last byte carries the low 7 bits. If its top bit (`0x80`) is set, the
/// byte before it carries the next 7 bits, so the id is at most 14 bits
/// wide. The top bit of that second byte is not inspected.
///
/// On success `*msg` is shortened to exclude the one or two id bytes and the
/// id is returned. On failure `*msg` is left untouched.
///
/// # Errors
///
/// * `"empty buffer"` when `msg` is empty;
/// * `"leb128 continuation bit set but no second byte"` when the only byte
///   has its top bit set.
pub fn parse_msg_id_leb128_2_bytes_at_most(
    msg: &mut &[u8],
) -> Result<u16, &'static str> {
    let whole: &[u8] = *msg;

    let (low, before_low) = match whole.split_last() {
        Some((&byte, rest)) => (u16::from(byte), rest),
        None => return Err("empty buffer"),
    };

    let (id, remaining) = if low & 0x80 == 0 {
        // Single-byte id: the top bit is clear, so `low` is the id itself.
        (low, before_low)
    } else {
        match before_low.split_last() {
            Some((&byte, rest)) => {
                let high = (u16::from(byte) & 0x7f) << 7;
                ((low & 0x7f) | high, rest)
            }
            None => {
                return Err("leb128 continuation bit set but no second byte")
            }
        }
    };

    *msg = remaining;
    Ok(id)
}