use cfx_types::H256;
use crate::{
message::MsgId,
sync::{
message::{DynamicCapability, DynamicCapabilitySet},
random, Error,
},
NodeType,
};
use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
use network::{
node_table::NodeId, service::ProtocolVersion, Error as NetworkError,
};
use parking_lot::RwLock;
use rand::prelude::SliceRandom;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
};
use throttling::token_bucket::{ThrottledManager, TokenBucketManager};
#[derive(DeriveMallocSizeOf)]
pub struct SynchronizationPeerState {
pub node_id: NodeId,
pub node_type: NodeType,
pub is_validator: bool,
pub protocol_version: ProtocolVersion,
pub genesis_hash: H256,
pub best_epoch: u64,
pub latest_block_hashes: HashSet<H256>,
pub received_transaction_count: usize,
pub heartbeat: Instant,
pub capabilities: DynamicCapabilitySet,
pub notified_capabilities: DynamicCapabilitySet,
pub throttling: TokenBucketManager,
pub throttled_msgs: ThrottledManager<MsgId>,
}
impl SynchronizationPeerState {
pub fn update(
&mut self, node_type: Option<NodeType>,
latest_block_hashes: HashSet<H256>, best_epoch: u64,
) -> bool {
if let Some(node_type) = node_type {
self.node_type = node_type;
}
self.heartbeat = Instant::now();
let updated = best_epoch != self.best_epoch
|| latest_block_hashes != self.latest_block_hashes;
if updated {
self.best_epoch = best_epoch;
self.latest_block_hashes = latest_block_hashes;
}
updated
}
}
pub type SynchronizationPeers =
HashMap<NodeId, Arc<RwLock<SynchronizationPeerState>>>;
#[derive(DeriveMallocSizeOf)]
pub struct SynchronizationState {
is_consortium: bool,
node_type: NodeType,
allow_phase_change_without_peer: bool,
min_phase_change_normal_peer_count: usize,
pub peers: RwLock<SynchronizationPeers>,
pub handshaking_peers: RwLock<HashMap<NodeId, (ProtocolVersion, Instant)>>,
pub last_sent_transaction_hashes: RwLock<HashSet<H256>>,
}
impl SynchronizationState {
pub fn new(
is_consortium: bool, node_type: NodeType,
allow_phase_change_without_peer: bool,
min_phase_change_normal_peer_count: usize,
) -> Self {
SynchronizationState {
is_consortium,
node_type,
allow_phase_change_without_peer,
min_phase_change_normal_peer_count,
peers: Default::default(),
handshaking_peers: Default::default(),
last_sent_transaction_hashes: Default::default(),
}
}
pub fn is_consortium(&self) -> bool { self.is_consortium }
pub fn on_status_in_handshaking(
&self, node_id: &NodeId,
) -> Option<ProtocolVersion> {
let peers = self.peers.read();
let mut handshaking_peers = self.handshaking_peers.write();
if !peers.contains_key(node_id) {
handshaking_peers.remove(node_id).map(|(v, _)| v)
} else {
None
}
}
pub fn peer_connected(
&self, node_id: NodeId, state: SynchronizationPeerState,
) {
let mut peers = self.peers.write();
if self.is_consortium() {
unimplemented!();
} else {
peers.insert(node_id, Arc::new(RwLock::new(state)));
}
}
pub fn contains_peer(&self, node_id: &NodeId) -> bool {
self.peers.read().contains_key(node_id)
}
pub fn get_peer_info(
&self, node_id: &NodeId,
) -> Result<Arc<RwLock<SynchronizationPeerState>>, Error> {
Ok(self
.peers
.read()
.get(node_id)
.ok_or(Error::UnknownPeer)?
.clone())
}
pub fn get_peer_version(
&self, peer: &NodeId,
) -> Result<ProtocolVersion, NetworkError> {
match self.get_peer_info(peer) {
Err(_) => bail!(NetworkError::InvalidNodeId),
Ok(info) => Ok(info.read().protocol_version),
}
}
pub fn update_heartbeat(&self, node_id: &NodeId) {
if let Some(state) = self.peers.read().get(node_id) {
state.write().heartbeat = Instant::now();
}
}
pub fn get_heartbeat_timeout_peers(
&self, timeout: Duration,
) -> Vec<NodeId> {
let mut timeout_peers = Vec::new();
for (peer, (_, handshake_time)) in self.handshaking_peers.read().iter()
{
if handshake_time.elapsed() > timeout {
timeout_peers.push(*peer);
}
}
for (peer, state) in self.peers.read().iter() {
if state.read().heartbeat.elapsed() > timeout {
timeout_peers.push(*peer);
}
}
timeout_peers
}
pub fn is_full_node(&self) -> bool { self.node_type == NodeType::Full }
pub fn allow_phase_change_without_peer(&self) -> bool {
self.allow_phase_change_without_peer
}
pub fn median_epoch_from_normal_peers(&self) -> Option<u64> {
let mut fresh_start = true;
let mut peer_best_epoches = Vec::new();
{
let peers = self.peers.read();
if peers.is_empty() {
debug!("median_epoch_from_normal_peers: no connected peers");
fresh_start = false;
}
for (_, state_lock) in &*peers {
let state = state_lock.read();
if state
.capabilities
.contains(DynamicCapability::NormalPhase(true))
{
fresh_start = false;
peer_best_epoches.push(state.best_epoch);
} else if state.best_epoch != 0 {
fresh_start = false;
debug!("median_epoch_from_normal_peers: not fresh start");
}
}
};
if peer_best_epoches.len() < self.min_phase_change_normal_peer_count
|| peer_best_epoches.is_empty()
{
return if fresh_start {
debug!("median_epoch_from_normal_peers: fresh start");
Some(0)
} else {
debug!(
"median_epoch_from_normal_peers: no enough peers in normal phase, have {}, require {}",
peer_best_epoches.len(), self.min_phase_change_normal_peer_count
);
None
};
}
peer_best_epoches.sort();
Some(peer_best_epoches[peer_best_epoches.len() / 2])
}
pub fn best_peer_epoch(&self) -> Option<u64> {
self.peers
.read()
.iter()
.map(|(_, state)| state.read().best_epoch)
.fold(None, |max, x| match max {
None => Some(x),
Some(max) => Some(if x > max { x } else { max }),
})
}
}
#[derive(Default)]
pub struct PeerFilter<'a> {
throttle_msg_ids: Option<HashSet<MsgId>>,
preferred_node_type: Option<NodeType>,
excludes: Option<HashSet<NodeId>>,
choose_from: Option<&'a HashSet<NodeId>>,
cap: Option<DynamicCapability>,
min_best_epoch: Option<u64>,
}
impl<'a> PeerFilter<'a> {
pub fn new(msg_id: MsgId) -> Self { PeerFilter::default().throttle(msg_id) }
pub fn with_preferred_node_type(mut self, node_type: NodeType) -> Self {
self.preferred_node_type = Some(node_type);
self
}
pub fn throttle(mut self, msg_id: MsgId) -> Self {
self.throttle_msg_ids
.get_or_insert_with(|| HashSet::new())
.insert(msg_id);
self
}
pub fn exclude(mut self, node_id: NodeId) -> Self {
self.excludes
.get_or_insert_with(|| HashSet::new())
.insert(node_id);
self
}
pub fn choose_from(mut self, peer_set: &'a HashSet<NodeId>) -> Self {
self.choose_from = Some(peer_set);
self
}
pub fn with_cap(mut self, cap: DynamicCapability) -> Self {
self.cap.replace(cap);
self
}
pub fn with_min_best_epoch(mut self, min_best_epoch: u64) -> Self {
self.min_best_epoch.replace(min_best_epoch);
self
}
pub fn select_all(self, syn: &SynchronizationState) -> Vec<NodeId> {
let mut peers = Vec::new();
let check_state = self.throttle_msg_ids.is_some()
|| self.cap.is_some()
|| self.min_best_epoch.is_some();
for (id, peer) in syn.peers.read().iter() {
let peer_node_type = peer.read().node_type.clone();
if let Some(ref preferred_node_type) = self.preferred_node_type {
if *preferred_node_type != peer_node_type {
continue;
}
}
if let Some(ref excludes) = self.excludes {
if excludes.contains(id) {
continue;
}
}
if let Some(ref choose_from) = self.choose_from {
if !choose_from.contains(id) {
continue;
}
}
if check_state {
let mut peer = peer.write();
if syn.is_consortium() {
if !peer.is_validator {
continue;
}
}
if let Some(ref ids) = self.throttle_msg_ids {
if ids
.iter()
.any(|id| peer.throttled_msgs.check_throttled(id))
{
continue;
}
}
if let Some(cap) = self.cap {
if !peer.capabilities.contains(cap) {
continue;
}
}
if let Some(min) = self.min_best_epoch {
if peer.best_epoch < min {
continue;
}
}
}
peers.push(*id);
}
peers
}
pub fn select(self, syn: &SynchronizationState) -> Option<NodeId> {
self.select_all(syn).choose(&mut random::new()).cloned()
}
pub fn select_n(self, n: usize, syn: &SynchronizationState) -> Vec<NodeId> {
let mut peers = self.select_all(syn);
peers.shuffle(&mut random::new());
peers.truncate(n);
peers
}
}