#![allow(deprecated)]
use cfxkey as keylib;
use io as iolib;
use keccak_hash as hash;
/// Number of bytes in a protocol identifier.
pub const PROTOCOL_ID_SIZE: usize = 3;
/// Fixed-size protocol identifier: an array of `PROTOCOL_ID_SIZE` bytes.
pub type ProtocolId = [u8; PROTOCOL_ID_SIZE];
/// Tag carried by `NetworkIoMessage::DispatchWork` and handed to
/// `NetworkProtocolHandler::on_work_dispatch` /
/// `NetworkContext::dispatch_work`.
pub type HandlerWorkType = u8;
/// Peer identifier, used by `PeerInfo::id` and by the `peer` field of
/// `NetworkIoMessage::HandleProtocolMessage`.
pub type PeerId = usize;
mod connection;
mod discovery;
mod error;
mod handshake;
mod ip;
mod ip_utils;
mod node_database;
pub mod node_table;
pub mod service;
mod session;
mod session_manager;
pub mod throttling;
pub use crate::{
error::{DisconnectReason, Error, ThrottlingReason},
ip::SessionIpLimitConfig,
node_table::Node,
service::NetworkService,
session::SessionDetails,
};
pub use io::TimerToken;
use crate::{
node_table::NodeId,
service::{
ProtocolVersion, DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION,
DEFAULT_DISCOVERY_REFRESH_TIMEOUT, DEFAULT_DISCOVERY_ROUND_TIMEOUT,
DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT, DEFAULT_HOUSEKEEPING_TIMEOUT,
DEFAULT_NODE_TABLE_TIMEOUT,
},
};
use cfx_addr::Network;
use diem_types::validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey};
use ipnetwork::{IpNetwork, IpNetworkError};
use keylib::Secret;
use priority_send_queue::SendQueuePriority;
use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
use serde_derive::{Deserialize, Serialize};
use std::{
cmp::Ordering,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
str::{self, FromStr},
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
/// Node tag key `"node_type"`.
///
/// NOTE(review): presumably used as the `key` of
/// `NetworkContext::insert_peer_node_tag`, with `NODE_TAG_ARCHIVE` or
/// `NODE_TAG_FULL` as the value -- confirm against callers.
pub const NODE_TAG_NODE_TYPE: &str = "node_type";
/// Node tag value `"archive"`.
pub const NODE_TAG_ARCHIVE: &str = "archive";
/// Node tag value `"full"`.
pub const NODE_TAG_FULL: &str = "full";
/// Configuration of the network layer.
///
/// `NetworkConfiguration::new` fills every field with a fixed default except
/// `id` and `discovery_config` (taken from its arguments) and `network_type`
/// (derived from `id`). Field meanings that this file does not establish are
/// marked below as assumptions to be confirmed.
#[derive(Debug, Clone, PartialEq)]
pub struct NetworkConfiguration {
/// `new` sets this to `false`.
/// NOTE(review): presumably marks a consortium deployment -- confirm.
pub is_consortium: bool,
/// Numeric network id; `new` maps it to `network_type`.
pub id: u64,
/// Network derived from `id` by `network_id_to_known_cfx_network` when the
/// value is built by `new`. Private; read it through `get_network_type`.
/// Nothing in the visible code re-derives it if `id` is modified later.
network_type: Network,
/// `new` sets this to `Some("./net_config")`.
/// NOTE(review): presumably where network state is persisted -- confirm.
pub config_path: Option<String>,
/// `None` after `new`; `new_with_port` sets it to `0.0.0.0:<port>`.
pub listen_address: Option<SocketAddr>,
/// `None` after `new`.
/// NOTE(review): presumably the externally advertised address -- confirm.
pub public_address: Option<SocketAddr>,
/// `None` after `new`.
/// NOTE(review): presumably the discovery UDP port -- confirm.
pub udp_port: Option<u16>,
/// `true` after `new`.
pub nat_enabled: bool,
/// `false` after `new`.
pub discovery_enabled: bool,
/// Empty after `new`.
/// NOTE(review): presumably node URLs used for bootstrapping -- confirm the
/// expected string format against the node table.
pub boot_nodes: Vec<String>,
/// `None` after `new`.
/// NOTE(review): presumably a fixed node secret key -- confirm.
pub use_secret: Option<Secret>,
/// `0` after `new`; callers are expected to set the peer limits they need.
pub max_outgoing_peers: usize,
/// `0` after `new`.
pub max_outgoing_peers_archive: usize,
/// `0` after `new`.
pub max_incoming_peers: usize,
/// `0` after `new`.
pub max_handshakes: usize,
/// Empty after `new`.
pub reserved_nodes: Vec<String>,
/// IP filter; `new` uses `IpFilter::default()` (predefined `AllowIP::All`,
/// no custom entries).
pub ip_filter: IpFilter,
/// `new` uses `DEFAULT_HOUSEKEEPING_TIMEOUT`.
pub housekeeping_timeout: Duration,
/// `new` uses `DEFAULT_DISCOVERY_REFRESH_TIMEOUT`.
pub discovery_refresh_timeout: Duration,
/// `new` uses `DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT`.
pub fast_discovery_refresh_timeout: Duration,
/// `new` uses `DEFAULT_DISCOVERY_ROUND_TIMEOUT`.
pub discovery_round_timeout: Duration,
/// `new` uses `DEFAULT_NODE_TABLE_TIMEOUT`.
pub node_table_timeout: Duration,
/// `new` uses `DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION`.
pub connection_lifetime_for_promotion: Duration,
/// `false` after `new`.
pub test_mode: bool,
/// `32` after `new`.
/// NOTE(review): presumably a per-subnet connection quota -- confirm.
pub subnet_quota: usize,
/// `new` uses `SessionIpLimitConfig::default()`.
pub session_ip_limit_config: SessionIpLimitConfig,
/// Discovery settings, passed through unchanged from the constructor.
pub discovery_config: DiscoveryConfiguration,
}
impl NetworkConfiguration {
    /// Builds a configuration for network `id` with the given discovery
    /// settings and fixed defaults for everything else.
    ///
    /// Notable defaults: no listen/public address, NAT enabled, discovery
    /// disabled, all peer limits zero, `config_path` of `"./net_config"`,
    /// `subnet_quota` of 32, and the `DEFAULT_*` timeouts imported from
    /// `service`. `network_type` is derived from `id`.
    pub fn new(id: u64, discovery_config: DiscoveryConfiguration) -> Self {
        Self {
            is_consortium: false,
            id,
            network_type: Self::network_id_to_known_cfx_network(id),
            config_path: Some(String::from("./net_config")),
            listen_address: None,
            public_address: None,
            udp_port: None,
            nat_enabled: true,
            discovery_enabled: false,
            boot_nodes: vec![],
            use_secret: None,
            max_outgoing_peers: 0,
            max_outgoing_peers_archive: 0,
            max_incoming_peers: 0,
            max_handshakes: 0,
            reserved_nodes: vec![],
            ip_filter: IpFilter::default(),
            housekeeping_timeout: DEFAULT_HOUSEKEEPING_TIMEOUT,
            discovery_refresh_timeout: DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
            fast_discovery_refresh_timeout:
                DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT,
            discovery_round_timeout: DEFAULT_DISCOVERY_ROUND_TIMEOUT,
            node_table_timeout: DEFAULT_NODE_TABLE_TIMEOUT,
            connection_lifetime_for_promotion:
                DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION,
            test_mode: false,
            subnet_quota: 32,
            session_ip_limit_config: SessionIpLimitConfig::default(),
            discovery_config,
        }
    }

    /// Same as [`NetworkConfiguration::new`], but with `listen_address` set
    /// to the IPv4 wildcard address `0.0.0.0` on `port`.
    pub fn new_with_port(
        id: u64, port: u16, discovery_config: DiscoveryConfiguration,
    ) -> NetworkConfiguration {
        let wildcard_v4 = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
        NetworkConfiguration {
            listen_address: Some(SocketAddr::V4(wildcard_v4)),
            ..NetworkConfiguration::new(id, discovery_config)
        }
    }

    /// Returns the network that was derived from `id` at construction time.
    pub fn get_network_type(&self) -> &Network { &self.network_type }

    /// Maps a numeric network id to a `Network`: `1` is `Network::Test`,
    /// `1029` is `Network::Main`, any other value is wrapped as
    /// `Network::Id`.
    pub fn network_id_to_known_cfx_network(id: u64) -> Network {
        if id == 1029 {
            Network::Main
        } else if id == 1 {
            Network::Test
        } else {
            Network::Id(id)
        }
    }
}
/// Tunables for node discovery.
///
/// `Default` yields zero for every counter and a zero `Duration` for every
/// time span. Apart from `expire_time`, the fields are only declared here;
/// their descriptions are assumptions to confirm against the discovery
/// module.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct DiscoveryConfiguration {
    /// NOTE(review): presumably how many nodes one discovery round targets
    /// -- confirm.
    pub discover_node_count: u32,
    /// Time span added to "now" by `expire_timestamp`.
    pub expire_time: Duration,
    /// NOTE(review): presumably how long to wait for a find-node reply --
    /// confirm.
    pub find_node_timeout: Duration,
    /// NOTE(review): presumably the cap on nodes pinged at once -- confirm.
    pub max_nodes_ping: usize,
    /// NOTE(review): presumably how long to wait for a ping reply --
    /// confirm.
    pub ping_timeout: Duration,
    /// NOTE(review): presumably the window the two throttling limits below
    /// apply to -- confirm.
    pub throttling_interval: Duration,
    pub throttling_limit_ping: usize,
    pub throttling_limit_find_nodes: usize,
}

impl DiscoveryConfiguration {
    /// Returns `now + expire_time` as whole seconds since the Unix epoch.
    ///
    /// If that instant lies before the epoch the result is `0`. The
    /// `SystemTime + Duration` addition itself may panic if the sum cannot
    /// be represented.
    fn expire_timestamp(&self) -> u64 {
        let deadline = SystemTime::now() + self.expire_time;
        match deadline.duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_secs(),
            // Deadline before the epoch: fall back to the zero duration.
            Err(_) => Duration::default().as_secs(),
        }
    }
}
/// NAT handling option.
///
/// NOTE(review): the variant names suggest "no NAT traversal", "any
/// available method", UPnP and NAT-PMP respectively, but the type is not
/// used anywhere in the visible part of this file -- confirm against its
/// consumers.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum NatType {
Nothing,
Any,
UPnP,
NatPMP,
}
/// Messages for the network IO layer.
///
/// NOTE(review): presumably consumed by the IO handler in `service` --
/// confirm; the receiving side is not visible in this file.
#[derive(Clone)]
pub enum NetworkIoMessage {
/// Carries no payload.
/// NOTE(review): presumably starts the network service -- confirm.
Start,
/// Registers `handler` for `protocol` at `version`.
/// NOTE(review): `callback` is presumably signalled once registration has
/// completed -- confirm.
AddHandler {
handler: Arc<dyn NetworkProtocolHandler + Sync>,
protocol: ProtocolId,
version: ProtocolVersion,
callback: std::sync::mpsc::SyncSender<()>,
},
/// Requests a timer identified by `token` for `protocol`, with the given
/// `delay`.
AddTimer {
protocol: ProtocolId,
token: TimerToken,
delay: Duration,
},
/// Requests that work of kind `work_type` be dispatched for `protocol`.
DispatchWork {
protocol: ProtocolId,
work_type: HandlerWorkType,
},
/// Hands the raw bytes `data` associated with `peer` / `node_id` to
/// `protocol`.
HandleProtocolMessage {
protocol: ProtocolId,
peer: PeerId,
node_id: NodeId,
data: Vec<u8>,
},
}
/// Callbacks a protocol implementation provides to the network layer.
///
/// Implementors must be `Sync + Send`. No method has a default body, so each
/// one must be implemented. When each callback fires is decided by code
/// outside this file; the per-method notes are assumptions based on the
/// signatures.
pub trait NetworkProtocolHandler: Sync + Send {
/// Returns the lowest protocol version this handler supports.
fn minimum_supported_version(&self) -> ProtocolVersion;
/// NOTE(review): presumably called once after the handler is registered
/// -- confirm.
fn initialize(&self, _io: &dyn NetworkContext);
/// Receives the raw message bytes `data` attributed to `node_id`.
fn on_message(
&self, io: &dyn NetworkContext, node_id: &NodeId, data: &[u8],
);
/// Receives the peer's id, its protocol version and, when available, its
/// consensus public key and VRF public key.
fn on_peer_connected(
&self, io: &dyn NetworkContext, node_id: &NodeId,
peer_protocol_version: ProtocolVersion,
pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
);
/// Receives the id of a peer that is no longer connected.
fn on_peer_disconnected(&self, io: &dyn NetworkContext, node_id: &NodeId);
/// Receives the token of a timer.
/// NOTE(review): presumably one registered through
/// `NetworkContext::register_timer` -- confirm.
fn on_timeout(&self, io: &dyn NetworkContext, timer: TimerToken);
/// Receives a message that originates locally rather than from a peer
/// (no node id is passed).
fn send_local_message(&self, _io: &dyn NetworkContext, _message: Vec<u8>);
/// Receives a work item kind.
/// NOTE(review): presumably one requested through
/// `NetworkContext::dispatch_work` -- confirm.
fn on_work_dispatch(
&self, _io: &dyn NetworkContext, _work_type: HandlerWorkType,
);
}
/// Optional operation passed to `NetworkContext::disconnect_peer` alongside
/// the disconnect reason.
///
/// NOTE(review): presumably tells the node table how to treat the
/// disconnected node (record a failure, demote it, or remove it) -- confirm
/// against the node table implementation.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum UpdateNodeOperation {
Failure,
Demotion,
Remove,
}
/// Interface through which a `NetworkProtocolHandler` talks back to the
/// network layer. Every callback of that trait receives a
/// `&dyn NetworkContext`.
pub trait NetworkContext {
/// Returns the protocol id this context is bound to.
fn get_protocol(&self) -> ProtocolId;
/// NOTE(review): presumably `Some(true)` when the local node originated
/// the connection to `node_id` and `None` when the peer is unknown --
/// confirm against the implementation.
fn get_peer_connection_origin(&self, node_id: &NodeId) -> Option<bool>;
/// Sends `msg` to `node_id` with the given queue `priority`.
///
/// NOTE(review): `min_protocol_version` / `version_valid_till` presumably
/// bound the peer protocol versions the message is valid for -- confirm.
fn send(
&self, node_id: &NodeId, msg: Vec<u8>,
min_protocol_version: ProtocolVersion,
version_valid_till: ProtocolVersion, priority: SendQueuePriority,
) -> Result<(), Error>;
/// Disconnects `node_id`, with an optional node-table operation `op` and
/// a textual `reason`.
fn disconnect_peer(
&self, node_id: &NodeId, op: Option<UpdateNodeOperation>, reason: &str,
);
/// Registers a timer identified by `token` with the given `delay`.
fn register_timer(
&self, token: TimerToken, delay: Duration,
) -> Result<(), Error>;
/// Requests dispatch of a work item of kind `work_type`.
fn dispatch_work(&self, work_type: HandlerWorkType);
/// Attaches the tag `key` = `value` to `peer`.
fn insert_peer_node_tag(&self, peer: NodeId, key: &str, value: &str);
/// Returns whether `_node_id` is the local node.
fn is_peer_self(&self, _node_id: &NodeId) -> bool;
/// Returns the local node's id.
fn self_node_id(&self) -> NodeId;
}
/// Metadata describing a session with a peer.
#[derive(Debug, Clone)]
pub struct SessionMetadata {
/// Peer node id, if known.
/// NOTE(review): presumably `None` until the handshake has identified the
/// peer -- confirm.
pub id: Option<NodeId>,
/// Protocols (id and version) associated with the peer.
pub peer_protocols: Vec<ProtocolInfo>,
/// NOTE(review): presumably `true` when the local node initiated the
/// session -- confirm.
pub originated: bool,
/// Header version reported for the peer.
pub peer_header_version: u8,
}
/// A protocol identifier paired with a protocol version.
///
/// The derived `PartialEq`/`Eq` compare both fields, whereas the manual
/// `Ord`/`PartialOrd` implementations in this file compare `protocol` only.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProtocolInfo {
/// Protocol identifier (`PROTOCOL_ID_SIZE` bytes).
pub protocol: ProtocolId,
/// Version of that protocol.
pub version: ProtocolVersion,
}
impl Encodable for ProtocolInfo {
    /// Appends this value as a two-item RLP list: the protocol id as a byte
    /// string, followed by the version.
    fn rlp_append(&self, rlp: &mut RlpStream) {
        // View the fixed-size id as a byte slice so it is encoded as a byte
        // string, exactly as `&self.protocol[..]` would be.
        let id_bytes: &[u8] = &self.protocol;
        rlp.begin_list(2).append(&id_bytes).append(&self.version);
    }
}
impl Decodable for ProtocolInfo {
    /// Decodes a `ProtocolInfo` from an RLP list: item 0 is the protocol id
    /// as a byte string, item 1 is the version.
    ///
    /// # Errors
    ///
    /// Returns `DecoderError::Custom("Invalid subprotocol string length")`
    /// when the id is not exactly as long as a `ProtocolId`, and forwards any
    /// error from reading either item. The id is validated before the
    /// version is read.
    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
        let id_bytes: Vec<u8> = rlp.val_at(0)?;
        let mut protocol: ProtocolId = [0u8; 3];
        if id_bytes.len() != protocol.len() {
            return Err(DecoderError::Custom(
                "Invalid subprotocol string length",
            ));
        }
        // Lengths are equal at this point, so the copy cannot panic.
        protocol.copy_from_slice(&id_bytes);
        let version = rlp.val_at(1)?;
        Ok(ProtocolInfo { protocol, version })
    }
}
impl PartialOrd for ProtocolInfo {
    /// Always `Some`: delegates to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &ProtocolInfo) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
impl Ord for ProtocolInfo {
    /// Orders by `protocol` only; `version` does not take part.
    ///
    /// NOTE(review): the derived `PartialEq`/`Eq` on `ProtocolInfo` also
    /// compare `version`, so two values can compare as `Ordering::Equal`
    /// here while being `!=`. Confirm this is intended before using the type
    /// as a key in an ordered collection or deduplicating after a sort.
    fn cmp(&self, other: &ProtocolInfo) -> Ordering {
        let (lhs, rhs) = (&self.protocol, &other.protocol);
        lhs.cmp(rhs)
    }
}
/// Serializable summary of a peer.
#[derive(Serialize, Deserialize)]
pub struct PeerInfo {
/// Numeric peer id.
pub id: PeerId,
/// Socket address of the peer.
pub addr: SocketAddr,
/// Node id of the peer.
pub nodeid: NodeId,
/// Protocols (id and version) listed for the peer.
pub protocols: Vec<ProtocolInfo>,
}
/// IP filter made of one predefined category plus custom allow/block lists.
///
/// Built by `IpFilter::default()` (predefined `AllowIP::All`, empty lists) or
/// by `IpFilter::parse`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IpFilter {
/// Predefined category; `parse` sets it from the keywords `all`,
/// `private`, `public` and `none`.
pub predefined: AllowIP,
/// Networks `parse` collected from entries without a leading `-`.
pub custom_allow: Vec<IpNetwork>,
/// Networks `parse` collected from entries with a leading `-`.
pub custom_block: Vec<IpNetwork>,
}
impl Default for IpFilter {
    /// The default filter: predefined category `AllowIP::All` with empty
    /// custom allow and block lists.
    fn default() -> Self {
        let predefined = AllowIP::All;
        Self {
            predefined,
            custom_allow: Vec::new(),
            custom_block: Vec::new(),
        }
    }
}
impl IpFilter {
    /// Parses a whitespace-separated filter specification.
    ///
    /// Parsing starts from `IpFilter::default()`, and each entry is handled
    /// in order:
    ///
    /// * `all`, `private`, `public`, `none` set the predefined category (the
    ///   last one seen wins);
    /// * an entry starting with `-` is parsed, without the `-`, as an
    ///   `IpNetwork` and appended to `custom_block`;
    /// * any other entry is parsed as an `IpNetwork` and appended to
    ///   `custom_allow`.
    ///
    /// An empty or all-whitespace string therefore yields the default
    /// filter.
    ///
    /// # Errors
    ///
    /// Returns the `IpNetworkError` of the first custom entry that is not a
    /// valid network (this includes a bare `-`).
    pub fn parse(s: &str) -> Result<IpFilter, IpNetworkError> {
        let mut filter = IpFilter::default();
        for entry in s.split_whitespace() {
            match entry {
                "all" => filter.predefined = AllowIP::All,
                "private" => filter.predefined = AllowIP::Private,
                "public" => filter.predefined = AllowIP::Public,
                "none" => filter.predefined = AllowIP::None,
                // `strip_prefix` borrows the remainder instead of allocating
                // an owned copy just to drop the leading '-'.
                custom => match custom.strip_prefix('-') {
                    Some(blocked) => filter
                        .custom_block
                        .push(IpNetwork::from_str(blocked)?),
                    None => filter
                        .custom_allow
                        .push(IpNetwork::from_str(custom)?),
                },
            }
        }
        Ok(filter)
    }
}
/// Predefined IP categories of an `IpFilter`.
///
/// `IpFilter::parse` selects them with the keywords `all`, `private`,
/// `public` and `none`.
///
/// NOTE(review): how each category classifies an address is implemented
/// outside this file -- confirm there before relying on the exact meaning of
/// `Private` and `Public`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AllowIP {
All,
Private,
Public,
None,
}
/// Strips a message id from the END of `msg` and returns it.
///
/// The id occupies the trailing one or two bytes, LEB128-style but read back
/// to front: the last byte carries the low 7 bits, and if its high bit
/// (`0x80`) is set, the byte before it carries the next 7 bits. The high bit
/// of that second byte is masked off, so an id has at most 14 bits.
///
/// On return `msg` has been shortened to exclude the consumed byte(s).
///
/// Malformed input does not panic:
///
/// * an empty `msg` yields `0` and stays empty;
/// * a single byte with the continuation bit set yields its low 7 bits and
///   consumes that byte.
pub fn parse_msg_id_leb128_2_bytes_at_most(msg: &mut &[u8]) -> u16 {
    let buf: &[u8] = *msg;

    let (low, mut remaining) = match buf.split_last() {
        Some((&byte, rest)) => (u16::from(byte), rest),
        // Nothing to parse; leave `msg` untouched.
        None => return 0,
    };
    let mut id = low & 0x7f;

    // Continuation bit set: the preceding byte, if any, holds bits 7..14.
    if low & 0x80 != 0 {
        if let Some((&byte, rest)) = remaining.split_last() {
            id |= (u16::from(byte) & 0x7f) << 7;
            remaining = rest;
        }
    }

    *msg = remaining;
    id
}