use super::{
random, request_manager::RequestManager, Error, SharedSynchronizationGraph,
SynchronizationState,
};
use crate::{
block_data_manager::BlockStatus,
light_protocol::Provider as LightProvider,
message::{decode_msg, Message, MsgId},
sync::{
message::{
handle_rlp_message, msgid, Context, DynamicCapability,
GetBlockHeadersResponse, Heartbeat, NewBlockHashes, StatusV2,
StatusV3, TransactionDigests,
},
request_manager::{try_get_block_hashes, Request},
state::SnapshotChunkSync,
synchronization_phases::{SyncPhaseType, SynchronizationPhaseManager},
synchronization_state::PeerFilter,
StateSyncConfiguration,
SYNCHRONIZATION_PROTOCOL_OLD_VERSIONS_TO_SUPPORT,
SYNCHRONIZATION_PROTOCOL_VERSION, SYNC_PROTO_V1, SYNC_PROTO_V2,
},
ConsensusGraph, NodeType,
};
use cfx_internal_common::ChainIdParamsDeprecated;
use cfx_parameters::{block::MAX_BLOCK_SIZE_IN_BYTES, sync::*};
use cfx_types::H256;
use diem_types::validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey};
use io::TimerToken;
use malloc_size_of::{new_malloc_size_ops, MallocSizeOf};
use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
use metrics::{register_meter_with_group, Meter, MeterTimer};
use network::{
node_table::NodeId, service::ProtocolVersion,
throttling::THROTTLING_SERVICE, Error as NetworkError, HandlerWorkType,
NetworkContext, NetworkProtocolHandler, UpdateNodeOperation,
};
use parking_lot::{Mutex, RwLock};
use primitives::{Block, BlockHeader, EpochId, SignedTransaction};
use rand::{prelude::SliceRandom, Rng};
use rlp::Rlp;
use std::{
cmp::{self, min},
collections::{BTreeMap, HashMap, HashSet, VecDeque},
sync::Arc,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
lazy_static! {
static ref TX_PROPAGATE_METER: Arc<dyn Meter> =
register_meter_with_group("system_metrics", "tx_propagate_set_size");
static ref TX_HASHES_PROPAGATE_METER: Arc<dyn Meter> =
register_meter_with_group(
"system_metrics",
"tx_hashes_propagate_set_size"
);
static ref BLOCK_RECOVER_TIMER: Arc<dyn Meter> =
register_meter_with_group("timer", "sync:recover_block");
static ref PROPAGATE_TX_TIMER: Arc<dyn Meter> =
register_meter_with_group("timer", "sync:propagate_tx_timer");
}
const TX_TIMER: TimerToken = 0;
const CHECK_REQUEST_TIMER: TimerToken = 1;
const BLOCK_CACHE_GC_TIMER: TimerToken = 2;
const CHECK_CATCH_UP_MODE_TIMER: TimerToken = 3;
const LOG_STATISTIC_TIMER: TimerToken = 4;
const TOTAL_WEIGHT_IN_PAST_TIMER: TimerToken = 5;
const CHECK_PEER_HEARTBEAT_TIMER: TimerToken = 6;
const CHECK_FUTURE_BLOCK_TIMER: TimerToken = 7;
const EXPIRE_BLOCK_GC_TIMER: TimerToken = 8;
const HEARTBEAT_TIMER: TimerToken = 9;
pub const CHECK_RPC_REQUEST_TIMER: TimerToken = 11;
const MAX_TXS_BYTES_TO_PROPAGATE: usize = 1024 * 1024; const EPOCH_SYNC_MAX_GAP_START: u64 = 20000;
const EPOCH_SYNC_MAX_GAP_INCREASE: u64 = 5000;
const EPOCH_SYNC_MAX_RETRY_COUNT: u64 = 20;
const EPOCH_SYNC_RESTART_TIMEOUT_S: u64 = 60 * 10;
const EPOCH_SYNC_MAX_INFLIGHT: u64 = 300;
const EPOCH_SYNC_BATCH_SIZE: u64 = 30;
const BLOCK_SYNC_MAX_INFLIGHT: usize = 1000;
#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub enum SyncHandlerWorkType {
RecoverPublic = 1,
LocalMessage = 2,
}
pub trait TaskSize {
fn size(&self) -> usize { 0 }
fn count(&self) -> usize { 1 }
}
pub struct AsyncTaskQueue<T: TaskSize> {
inner: RwLock<AsyncTaskQueueInner<T>>,
work_type: HandlerWorkType,
max_capacity: usize,
alpha: f64,
}
struct AsyncTaskQueueInner<T: TaskSize> {
tasks: VecDeque<T>,
size: usize,
moving_average: f64,
}
impl<T: TaskSize> AsyncTaskQueue<T> {
fn new(work_type: SyncHandlerWorkType, max_capacity: usize) -> Self {
AsyncTaskQueue {
inner: RwLock::new(AsyncTaskQueueInner {
tasks: VecDeque::new(),
size: 0,
moving_average: MAX_BLOCK_SIZE_IN_BYTES as f64,
}),
work_type: work_type as HandlerWorkType,
max_capacity,
alpha: 0.001,
}
}
pub fn dispatch(&self, io: &dyn NetworkContext, task: T) {
let mut inner = self.inner.write();
inner.size += task.size();
if task.count() != 0 {
inner.moving_average = self.alpha
* (task.size() / task.count()) as f64
+ (1.0 - self.alpha) * inner.moving_average;
}
io.dispatch_work(self.work_type);
inner.tasks.push_back(task);
trace!(
"AsyncTaskQueue dispatch: size={} average={}",
inner.size,
inner.moving_average,
);
}
fn pop(&self) -> Option<T> {
let mut inner = self.inner.write();
let task = inner.tasks.pop_front();
task.as_ref().map(|task| {
inner.size -= task.size();
});
trace!(
"AsyncTaskQueue pop: size={} average={}",
inner.size,
inner.moving_average,
);
task
}
fn size(&self) -> usize { self.inner.read().size }
pub fn is_full(&self) -> bool { self.size() >= self.max_capacity }
pub fn estimated_available_count(&self) -> usize {
let inner = self.inner.read();
if inner.size >= self.max_capacity {
0
} else if inner.moving_average != 0.0 {
((self.max_capacity - inner.size) as f64 / inner.moving_average)
as usize
} else {
self.max_capacity
}
}
}
#[derive(DeriveMallocSizeOf)]
pub struct RecoverPublicTask {
blocks: Vec<Block>,
requested: HashSet<H256>,
delay: Option<Duration>,
failed_peer: NodeId,
compact: bool,
}
impl RecoverPublicTask {
pub fn new(
blocks: Vec<Block>, requested: HashSet<H256>, failed_peer: NodeId,
compact: bool, delay: Option<Duration>,
) -> Self {
RecoverPublicTask {
blocks,
requested,
failed_peer,
compact,
delay,
}
}
}
impl TaskSize for RecoverPublicTask {
fn size(&self) -> usize {
let mut ops = new_malloc_size_ops();
self.size_of(&mut ops) + std::mem::size_of::<Self>()
}
fn count(&self) -> usize { self.blocks.len() }
}
pub struct LocalMessageTask {
message: Vec<u8>,
}
impl TaskSize for LocalMessageTask {}
struct FutureBlockContainerInner {
capacity: usize,
size: usize,
container: BTreeMap<u64, HashSet<H256>>,
hash_to_header_and_peer: HashMap<H256, (BlockHeader, NodeId)>,
}
impl FutureBlockContainerInner {
pub fn new(capacity: usize) -> Self {
FutureBlockContainerInner {
capacity,
size: 0,
container: BTreeMap::new(),
hash_to_header_and_peer: Default::default(),
}
}
}
pub struct FutureBlockContainer {
inner: RwLock<FutureBlockContainerInner>,
}
impl FutureBlockContainer {
pub fn new(capacity: usize) -> Self {
FutureBlockContainer {
inner: RwLock::new(FutureBlockContainerInner::new(capacity)),
}
}
pub fn insert(&self, header: BlockHeader, peer: NodeId) {
let inner = &mut *self.inner.write();
let header_hash = header.hash();
if inner.hash_to_header_and_peer.contains_key(&header_hash) {
return;
}
let entry = inner
.container
.entry(header.timestamp())
.or_insert(HashSet::new());
if !entry.contains(&header_hash) {
entry.insert(header_hash);
inner
.hash_to_header_and_peer
.insert(header_hash, (header, peer));
inner.size += 1;
}
if inner.size > inner.capacity {
let mut removed = false;
let mut empty_slots = Vec::new();
for entry in inner.container.iter_mut().rev() {
if entry.1.is_empty() {
empty_slots.push(*entry.0);
continue;
}
let hash = *entry.1.iter().next().unwrap();
entry.1.remove(&hash);
inner.hash_to_header_and_peer.remove(&hash);
removed = true;
if entry.1.is_empty() {
empty_slots.push(*entry.0);
}
break;
}
if removed {
inner.size -= 1;
}
for slot in empty_slots {
inner.container.remove(&slot);
}
}
}
pub fn get_before(&self, timestamp: u64) -> Vec<(BlockHeader, NodeId)> {
let mut inner = self.inner.write();
let mut result = Vec::new();
loop {
let slot = if let Some(entry) = inner.container.iter().next() {
Some(*entry.0)
} else {
None
};
if slot.is_none() || slot.unwrap() > timestamp {
break;
}
let entry = inner.container.remove(&slot.unwrap()).unwrap();
for header_hash in entry {
result.push(inner.hash_to_header_and_peer.remove(&header_hash).expect(
"hash and header are inserted/removed together atomically",
));
}
}
result
}
pub fn contains(&self, header_hash: &H256) -> bool {
self.inner
.read()
.hash_to_header_and_peer
.contains_key(header_hash)
}
}
#[derive(DeriveMallocSizeOf)]
pub struct SynchronizationProtocolHandler {
pub protocol_version: ProtocolVersion,
pub protocol_config: ProtocolConfiguration,
pub graph: SharedSynchronizationGraph,
pub syn: Arc<SynchronizationState>,
pub request_manager: Arc<RequestManager>,
pub latest_epoch_requested: Mutex<(u64, Instant, u64, u64)>,
#[ignore_malloc_size_of = "only stores reference to others"]
pub phase_manager: SynchronizationPhaseManager,
pub phase_manager_lock: Mutex<u32>,
#[ignore_malloc_size_of = "channels are not handled in MallocSizeOf"]
pub recover_public_queue: Arc<AsyncTaskQueue<RecoverPublicTask>>,
#[ignore_malloc_size_of = "channels are not handled in MallocSizeOf"]
local_message: AsyncTaskQueue<LocalMessageTask>,
#[ignore_malloc_size_of = "not used on archive nodes"]
pub state_sync: Arc<SnapshotChunkSync>,
pub synced_epoch_id: Mutex<Option<EpochId>>,
light_provider: Arc<LightProvider>,
}
#[derive(Clone, Default, DeriveMallocSizeOf)]
pub struct ProtocolConfiguration {
pub is_consortium: bool,
pub send_tx_period: Duration,
pub check_request_period: Duration,
pub check_phase_change_period: Duration,
pub heartbeat_period_interval: Duration,
pub heartbeat_timeout: Duration,
pub block_cache_gc_period: Duration,
pub expire_block_gc_period: Duration,
pub sync_expire_block_timeout: Duration,
pub headers_request_timeout: Duration,
pub blocks_request_timeout: Duration,
pub transaction_request_timeout: Duration,
pub snapshot_candidate_request_timeout: Duration,
pub snapshot_manifest_request_timeout: Duration,
pub snapshot_chunk_request_timeout: Duration,
pub tx_maintained_for_peer_timeout: Duration,
pub max_inflight_request_count: u64,
pub received_tx_index_maintain_timeout: Duration,
pub inflight_pending_tx_index_maintain_timeout: Duration,
pub request_block_with_public: bool,
pub max_trans_count_received_in_catch_up: u64,
pub min_peers_tx_propagation: usize,
pub max_peers_tx_propagation: usize,
pub max_downloading_chunks: usize,
pub max_downloading_chunk_attempts: usize,
pub test_mode: bool,
pub dev_mode: bool,
pub throttling_config_file: Option<String>,
pub chunk_size_byte: u64,
pub timeout_observing_period_s: u64,
pub max_allowed_timeout_in_observing_period: u64,
pub demote_peer_for_timeout: bool,
pub max_unprocessed_block_size: usize,
pub max_chunk_number_in_manifest: usize,
pub allow_phase_change_without_peer: bool,
pub min_phase_change_normal_peer_count: usize,
pub pos_genesis_pivot_decision: H256,
pub check_status_genesis: bool,
pub pos_started_as_voter: bool,
}
impl SynchronizationProtocolHandler {
pub fn new(
node_type: NodeType, protocol_config: ProtocolConfiguration,
state_sync_config: StateSyncConfiguration,
initial_sync_phase: SyncPhaseType,
sync_graph: SharedSynchronizationGraph,
light_provider: Arc<LightProvider>, consensus: Arc<ConsensusGraph>,
) -> Self {
let sync_state = Arc::new(SynchronizationState::new(
protocol_config.is_consortium,
node_type,
protocol_config.allow_phase_change_without_peer,
protocol_config.min_phase_change_normal_peer_count,
));
let recover_public_queue = Arc::new(AsyncTaskQueue::new(
SyncHandlerWorkType::RecoverPublic,
protocol_config.max_unprocessed_block_size,
));
let request_manager = Arc::new(RequestManager::new(
&protocol_config,
sync_state.clone(),
recover_public_queue.clone(),
));
let state_sync = Arc::new(SnapshotChunkSync::new(state_sync_config));
Self {
protocol_version: SYNCHRONIZATION_PROTOCOL_VERSION,
protocol_config,
graph: sync_graph.clone(),
syn: sync_state.clone(),
request_manager,
latest_epoch_requested: Mutex::new((0, Instant::now(), 0, 0)),
phase_manager: SynchronizationPhaseManager::new(
initial_sync_phase,
sync_state.clone(),
sync_graph.clone(),
state_sync.clone(),
consensus,
),
phase_manager_lock: Mutex::new(0),
recover_public_queue,
local_message: AsyncTaskQueue::new(
SyncHandlerWorkType::LocalMessage,
10000000000, ),
state_sync,
synced_epoch_id: Default::default(),
light_provider,
}
}
pub fn node_type(&self) -> NodeType {
if self.syn.is_full_node() {
NodeType::Full
} else {
NodeType::Archive
}
}
pub fn is_consortium(&self) -> bool { self.protocol_config.is_consortium }
fn get_to_propagate_trans(&self) -> HashMap<H256, Arc<SignedTransaction>> {
self.graph.get_to_propagate_trans()
}
fn set_to_propagate_trans(
&self, transactions: HashMap<H256, Arc<SignedTransaction>>,
) {
self.graph.set_to_propagate_trans(transactions);
}
pub fn catch_up_mode(&self) -> bool {
self.phase_manager.get_current_phase().phase_type()
!= SyncPhaseType::Normal
}
pub fn in_recover_from_db_phase(&self) -> bool {
let current_phase = self.phase_manager.get_current_phase();
current_phase.phase_type()
== SyncPhaseType::CatchUpRecoverBlockHeaderFromDB
|| current_phase.phase_type()
== SyncPhaseType::CatchUpFillBlockBodyPhase
}
pub fn need_requesting_blocks(&self) -> bool {
let current_phase = self.phase_manager.get_current_phase();
current_phase.phase_type() == SyncPhaseType::CatchUpSyncBlock
|| current_phase.phase_type() == SyncPhaseType::Normal
}
pub fn need_block_from_archive_node(&self) -> bool {
let current_phase = self.phase_manager.get_current_phase();
current_phase.phase_type() == SyncPhaseType::CatchUpSyncBlock
&& !self.syn.is_full_node()
}
pub fn preferred_peer_node_type_for_get_block(&self) -> Option<NodeType> {
if self.need_block_from_archive_node() {
Some(NodeType::Archive)
} else {
None
}
}
pub fn get_synchronization_graph(&self) -> SharedSynchronizationGraph {
self.graph.clone()
}
pub fn get_request_manager(&self) -> Arc<RequestManager> {
self.request_manager.clone()
}
pub fn append_received_transactions(
&self, transactions: Vec<Arc<SignedTransaction>>,
) {
self.request_manager
.append_received_transactions(transactions);
}
fn dispatch_message(
&self, io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, rlp: Rlp,
) -> Result<(), Error> {
trace!("Dispatching message: peer={:?}, msg_id={:?}", peer, msg_id);
if !io.is_peer_self(peer) {
if !self.syn.contains_peer(peer) {
debug!(
"dispatch_message: Peer does not exist: peer={} msg_id={}",
peer, msg_id
);
if !self.syn.handshaking_peers.read().contains_key(peer)
|| (msg_id != msgid::STATUS_V3
&& msg_id != msgid::STATUS_V2)
{
debug!("Message from unknown peer {:?}", msg_id);
return Ok(());
}
} else {
self.syn.update_heartbeat(peer);
}
}
let ctx = Context {
node_id: *peer,
io,
manager: self,
};
if !handle_rlp_message(msg_id, &ctx, &rlp)? {
warn!("Unknown message: peer={:?} msgid={:?}", peer, msg_id);
let reason =
format!("unknown sync protocol message id {:?}", msg_id);
io.disconnect_peer(
peer,
Some(UpdateNodeOperation::Remove),
reason.as_str(),
);
}
Ok(())
}
fn handle_error(
&self, io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, e: Error,
) {
let mut disconnect = true;
let mut warn = true;
let reason = format!("{}", e);
let error_reason = format!("{:?}", e);
let mut op = None;
match e {
Error::InvalidBlock => op = Some(UpdateNodeOperation::Failure),
Error::InvalidGetBlockTxn(_) => {
op = Some(UpdateNodeOperation::Demotion)
}
Error::InvalidStatus(_) => op = Some(UpdateNodeOperation::Demotion),
Error::InvalidMessageFormat => {
op = Some(UpdateNodeOperation::Remove)
}
Error::UnknownPeer => {
warn = false;
op = Some(UpdateNodeOperation::Failure)
}
Error::UnexpectedResponse => {
op = Some(UpdateNodeOperation::Demotion)
}
Error::RequestNotFound => {
disconnect = false;
warn = false;
}
Error::InCatchUpMode(_) => {
disconnect = false;
warn = false;
}
Error::TooManyTrans => {}
Error::InvalidTimestamp => op = Some(UpdateNodeOperation::Demotion),
Error::InvalidSnapshotManifest(_) => {
op = Some(UpdateNodeOperation::Demotion)
}
Error::InvalidSnapshotChunk(_) => {
op = Some(UpdateNodeOperation::Demotion)
}
Error::EmptySnapshotChunk => disconnect = false,
Error::AlreadyThrottled(_) => {
op = Some(UpdateNodeOperation::Remove)
}
Error::Throttled(_, msg) => {
disconnect = false;
if let Err(e) = msg.send(io, peer) {
error!("failed to send throttled packet: {:?}", e);
disconnect = true;
}
}
Error::Decoder(_) => op = Some(UpdateNodeOperation::Remove),
Error::Io(_) => disconnect = false,
Error::Network(kind) => match kind {
network::Error::SendUnsupportedMessage { .. } => {
unreachable!(
"This is a bug in protocol version maintenance. {:?}",
kind
);
}
network::Error::MessageDeprecated { .. } => {
op = Some(UpdateNodeOperation::Failure);
error!(
"Peer sent us a deprecated message {:?}. Either it's a bug \
in protocol version maintenance or the peer is malicious.",
kind
);
}
network::Error::AddressParse => disconnect = false,
network::Error::AddressResolve(_) => disconnect = false,
network::Error::Auth => disconnect = false,
network::Error::BadProtocol => {
op = Some(UpdateNodeOperation::Remove)
}
network::Error::BadAddr => disconnect = false,
network::Error::Decoder(_) => {
op = Some(UpdateNodeOperation::Remove)
}
network::Error::Expired => disconnect = false,
network::Error::Disconnect(_) => disconnect = false,
network::Error::InvalidNodeId => disconnect = false,
network::Error::OversizedPacket => disconnect = false,
network::Error::Io(_) => disconnect = false,
network::Error::Throttling(_) => disconnect = false,
network::Error::SocketIo(_) => {
op = Some(UpdateNodeOperation::Failure)
}
network::Error::Msg(_) => {
op = Some(UpdateNodeOperation::Failure)
}
},
Error::Storage(_) => disconnect = false,
Error::Msg(_) => op = Some(UpdateNodeOperation::Failure),
Error::InternalError(_) => {}
Error::RpcCancelledByDisconnection => {}
Error::RpcTimeout => {}
Error::UnexpectedMessage(_) => {
op = Some(UpdateNodeOperation::Remove)
}
Error::NotSupported(_) => disconnect = false,
}
if warn {
warn!(
"Error while handling message, peer={}, msgid={:?}, error={}",
peer, msg_id, error_reason
);
} else {
debug!(
"Minor error while handling message, peer={}, msgid={:?}, error={}",
peer, msg_id, error_reason
);
}
if disconnect {
io.disconnect_peer(peer, op, reason.as_str());
}
}
pub fn start_sync(&self, io: &dyn NetworkContext) {
let current_phase_type =
self.phase_manager.get_current_phase().phase_type();
if current_phase_type == SyncPhaseType::CatchUpRecoverBlockHeaderFromDB
|| current_phase_type == SyncPhaseType::CatchUpFillBlockBodyPhase
{
return;
}
if current_phase_type != SyncPhaseType::Normal {
self.request_epochs(io);
let best_peer_epoch = self.syn.best_peer_epoch().unwrap_or(0);
let my_best_epoch = self.graph.consensus.best_epoch_number();
if my_best_epoch + REQUEST_TERMINAL_EPOCH_LAG_THRESHOLD
>= best_peer_epoch
{
self.request_missing_terminals(io);
}
} else {
self.request_missing_terminals(io);
}
}
pub fn request_missing_terminals(&self, io: &dyn NetworkContext) {
let peers: Vec<NodeId> =
self.syn.peers.read().keys().cloned().collect();
let mut requested = HashSet::new();
let (_, era_genesis_height) =
self.graph.get_genesis_hash_and_height_in_current_era();
for peer in peers {
if let Ok(info) = self.syn.get_peer_info(&peer) {
if info.read().best_epoch < era_genesis_height {
continue;
}
let terminals = {
let mut info = info.write();
let ts = info.latest_block_hashes.clone();
info.latest_block_hashes.clear();
ts
};
let to_request = terminals
.difference(&requested)
.filter(|h| !self.graph.contains_block_header(&h))
.cloned()
.collect::<Vec<H256>>();
if terminals.len() > 0 {
debug!("Requesting terminals {:?}", to_request);
}
self.request_block_headers(
io,
Some(peer),
to_request.clone(),
true, );
requested.extend(to_request);
}
}
if requested.len() > 0 {
debug!("{:?} missing terminal block(s) requested", requested.len());
}
}
pub fn request_block_bodies(&self, io: &dyn NetworkContext) {
let in_flight_blocks = self.request_manager.in_flight_blocks();
let to_request_blocks: Vec<_> = self
.graph
.inner
.read()
.block_to_fill_set
.difference(&in_flight_blocks)
.copied()
.collect();
let n_blocks_to_request = min(
BLOCK_SYNC_MAX_INFLIGHT - in_flight_blocks.len(),
to_request_blocks.len(),
);
for block_chunk in to_request_blocks[0..n_blocks_to_request]
.chunks(MAX_BLOCKS_TO_SEND as usize)
{
self.request_blocks_without_check(io, None, block_chunk.to_vec());
}
}
pub fn request_epochs(&self, io: &dyn NetworkContext) {
let mut latest_requested = self.latest_epoch_requested.lock();
let median_peer_epoch =
self.syn.median_epoch_from_normal_peers().unwrap_or(0);
let my_best_epoch = self.graph.consensus.best_epoch_number();
let (
mut latest_requested_epoch,
latest_request_time,
old_best_epoch,
mut retry_count,
) = *latest_requested;
if old_best_epoch != my_best_epoch {
retry_count = 0;
}
let sync_max_gap = EPOCH_SYNC_MAX_GAP_START
+ EPOCH_SYNC_MAX_GAP_INCREASE * retry_count;
if latest_requested_epoch >= my_best_epoch + sync_max_gap {
if latest_request_time.elapsed()
< Duration::from_secs(EPOCH_SYNC_RESTART_TIMEOUT_S)
{
return;
} else {
latest_requested_epoch = my_best_epoch;
if retry_count < EPOCH_SYNC_MAX_RETRY_COUNT {
retry_count += 1;
}
}
}
while self.request_manager.num_epochs_in_flight()
< EPOCH_SYNC_MAX_INFLIGHT
&& latest_requested_epoch < my_best_epoch + sync_max_gap
&& (latest_requested_epoch < median_peer_epoch
|| median_peer_epoch == 0)
{
let from = cmp::max(my_best_epoch, latest_requested_epoch) + 1;
if let Some(epoch_hashes) =
self.graph.data_man.all_epoch_set_hashes_from_db(from)
{
debug!("Recovered epoch {} from db", from);
if self.need_requesting_blocks() {
self.request_blocks(io, None, epoch_hashes);
} else {
self.request_block_headers(
io,
None,
epoch_hashes,
true, );
}
latest_requested_epoch = from;
continue;
} else if median_peer_epoch == 0 {
break;
}
let peer = PeerFilter::new(msgid::GET_BLOCK_HASHES_BY_EPOCH)
.with_min_best_epoch(from)
.select(&self.syn);
if peer.is_none() {
break;
}
let until = {
let max_to_send = EPOCH_SYNC_MAX_INFLIGHT.saturating_sub(
self.request_manager.num_epochs_in_flight(),
);
let maybe_peer_info = self.syn.get_peer_info(&peer.unwrap());
if maybe_peer_info.is_err() {
continue;
}
let best_of_this_peer =
maybe_peer_info.unwrap().read().best_epoch;
let until = from + cmp::min(EPOCH_SYNC_BATCH_SIZE, max_to_send);
cmp::min(until, best_of_this_peer + 1)
};
let epochs = (from..until).collect::<Vec<u64>>();
debug!(
"requesting epochs [{}..{}]/{:?} from peer {:?}",
from,
until - 1,
median_peer_epoch,
peer
);
self.request_manager
.request_epoch_hashes(io, peer, epochs, None);
latest_requested_epoch = until - 1;
}
*latest_requested = (
latest_requested_epoch,
Instant::now(),
my_best_epoch,
retry_count,
);
}
pub fn request_block_headers(
&self, io: &dyn NetworkContext, peer: Option<NodeId>,
mut header_hashes: Vec<H256>, ignore_db: bool,
) {
if !ignore_db {
header_hashes
.retain(|hash| !self.try_request_header_from_db(io, hash));
}
header_hashes.retain(|h| !self.graph.contains_block_header(h));
self.request_manager.request_block_headers(
io,
peer,
header_hashes,
None,
);
}
fn try_request_header_from_db(
&self, io: &dyn NetworkContext, hash: &H256,
) -> bool {
if self.graph.contains_block_header(hash) {
return true;
}
if let Some(info) = self.graph.data_man.local_block_info_by_hash(hash) {
if info.get_status() == BlockStatus::Invalid {
return true;
}
if info.get_seq_num()
< self.graph.consensus.current_era_genesis_seq_num()
{
debug!("Ignore header in old era hash={:?}, seq={}, cur_era_seq={}", hash, info.get_seq_num(), self.graph.consensus.current_era_genesis_seq_num());
return true;
}
if info.get_instance_id() == self.graph.data_man.get_instance_id() {
return true;
}
}
if let Some(header) = self.graph.data_man.block_header_by_hash(hash) {
debug!("Recovered header {:?} from db", hash);
let mut block_headers_resp = GetBlockHeadersResponse::default();
block_headers_resp.request_id = 0;
let mut headers = Vec::new();
headers.push((*header).clone());
block_headers_resp.headers = headers;
let ctx = Context {
node_id: io.self_node_id(),
io,
manager: self,
};
ctx.send_response(&block_headers_resp)
.expect("send response should not be error");
return true;
} else {
return false;
}
}
fn on_blocks_inner(
&self, io: &dyn NetworkContext, task: RecoverPublicTask,
) -> Result<(), Error> {
let mut need_to_relay = Vec::new();
let mut received_blocks = HashSet::new();
let mut dependent_hashes = HashSet::new();
for mut block in task.blocks {
let hash = block.hash();
if self.graph.contains_block(&hash) {
received_blocks.insert(hash);
continue;
}
if !task.requested.contains(&hash) {
warn!("Response has not requested block {:?}", hash);
continue;
}
if let Err(e) = self.graph.data_man.recover_block(&mut block) {
warn!("Recover block {:?} with error {:?}", hash, e);
continue;
}
match self.graph.block_header_by_hash(&hash) {
Some(header) => block.block_header = header,
None => {
let (insert_result, _) = self.graph.insert_block_header(
&mut block.block_header,
true, false, false, true, );
if !insert_result.should_process_body() {
received_blocks.insert(hash);
continue;
}
let parent = block.block_header.parent_hash();
if !self.graph.contains_block(parent) {
dependent_hashes.insert(*parent);
}
for referee in block.block_header.referee_hashes() {
if !self.graph.contains_block(referee) {
dependent_hashes.insert(*referee);
}
}
}
}
let insert_result = self.graph.insert_block(
block, true, true, false, );
if insert_result.is_valid() {
received_blocks.insert(hash);
}
if insert_result.should_relay() {
need_to_relay.push(hash);
}
}
let mut filter =
PeerFilter::new(msgid::GET_BLOCKS).exclude(task.failed_peer);
if let Some(preferred_note_type) =
self.preferred_peer_node_type_for_get_block()
{
filter = filter.with_preferred_node_type(preferred_note_type);
}
let chosen_peer = filter.select(&self.syn);
self.blocks_received(
io,
task.requested,
received_blocks.clone(),
!task.compact,
chosen_peer.clone(),
task.delay,
self.preferred_peer_node_type_for_get_block(),
);
if self.graph.inner.read().locked_for_catchup {
self.request_block_bodies(io);
Ok(())
} else {
let missing_dependencies = dependent_hashes
.difference(&received_blocks)
.map(Clone::clone)
.collect();
self.request_blocks(io, chosen_peer, missing_dependencies);
self.relay_blocks(io, need_to_relay)
}
}
fn on_blocks_inner_task(
&self, io: &dyn NetworkContext,
) -> Result<(), Error> {
let task = self.recover_public_queue.pop().unwrap();
let received_blocks: Vec<H256> =
task.blocks.iter().map(|block| block.hash()).collect();
self.request_manager
.remove_net_inflight_blocks(received_blocks.iter());
self.request_manager
.remove_net_inflight_blocks(task.requested.iter());
self.on_blocks_inner(io, task)
}
fn on_local_message_task(&self, io: &dyn NetworkContext) {
let task = self.local_message.pop().unwrap();
self.on_message(io, &io.self_node_id(), task.message.as_slice());
}
pub fn on_mined_block(&self, mut block: Block) {
let hash = block.block_header.hash();
info!("Mined block {:?} header={:?}", hash, block.block_header);
let parent_hash = *block.block_header.parent_hash();
assert!(self.graph.contains_block_header(&parent_hash));
if self.graph.contains_block_header(&hash) {
warn!("Mined an duplicate block, the mining power is wasted!");
return;
}
self.graph.insert_block_header(
&mut block.block_header,
false,
false,
false,
true,
);
self.graph.insert_block(
block, false, true, false, );
}
fn broadcast_message(
&self, io: &dyn NetworkContext, skip_id: &NodeId, msg: &dyn Message,
) -> Result<(), NetworkError> {
let mut peer_ids: Vec<NodeId> = self
.syn
.peers
.read()
.keys()
.filter(|&id| *id != *skip_id)
.map(|x| *x)
.collect();
let throttle_ratio = THROTTLING_SERVICE.read().get_throttling_ratio();
let num_total = peer_ids.len();
let num_allowed = (num_total as f64 * throttle_ratio) as usize;
if num_total > num_allowed {
debug!("apply throttling for broadcast_message, total: {}, allowed: {}", num_total, num_allowed);
peer_ids.shuffle(&mut random::new());
peer_ids.truncate(num_allowed);
}
let msg_version_introduced = msg.version_introduced();
let mut msg_version_valid_till = msg.version_valid_till();
if msg_version_valid_till == self.protocol_version {
msg_version_valid_till = ProtocolVersion(std::u8::MAX);
}
for id in peer_ids {
let peer_version = self.syn.get_peer_version(&id)?;
if peer_version >= msg_version_introduced
&& peer_version <= msg_version_valid_till
{
msg.send(io, &id)?;
}
}
Ok(())
}
fn produce_status_message_v2(&self) -> StatusV2 {
let best_info = self.graph.consensus.best_info();
let chain_id = ChainIdParamsDeprecated {
chain_id: best_info.best_chain_id().in_native_space(),
};
let terminal_hashes = best_info.bounded_terminal_block_hashes.clone();
StatusV2 {
chain_id,
genesis_hash: self.graph.data_man.true_genesis.hash(),
best_epoch: best_info.best_epoch_number,
terminal_block_hashes: terminal_hashes,
}
}
fn produce_status_message_v3(&self) -> StatusV3 {
let best_info = self.graph.consensus.best_info();
let chain_id = ChainIdParamsDeprecated {
chain_id: best_info.best_chain_id().in_native_space(),
};
let terminal_hashes = best_info.bounded_terminal_block_hashes.clone();
StatusV3 {
chain_id,
node_type: self.node_type(),
genesis_hash: self.graph.data_man.true_genesis.hash(),
best_epoch: best_info.best_epoch_number,
terminal_block_hashes: terminal_hashes,
}
}
fn produce_heartbeat_message(&self) -> Heartbeat {
let best_info = self.graph.consensus.best_info();
let terminal_hashes = best_info.bounded_terminal_block_hashes.clone();
Heartbeat {
best_epoch: best_info.best_epoch_number,
terminal_block_hashes: terminal_hashes,
}
}
fn send_status(
&self, io: &dyn NetworkContext, peer: &NodeId,
peer_protocol_version: ProtocolVersion,
) -> Result<(), NetworkError> {
if peer_protocol_version == SYNC_PROTO_V2 {
let status_message = self.produce_status_message_v2();
debug!("Sending status message to {}: {:?}", peer, status_message);
status_message.send(io, peer)
} else {
let status_message = self.produce_status_message_v3();
debug!("Sending status message to {}: {:?}", peer, status_message);
status_message.send(io, peer)
}
}
fn broadcast_heartbeat(&self, io: &dyn NetworkContext) {
let status_message = self.produce_status_message_v2();
let heartbeat_message = self.produce_heartbeat_message();
debug!("Broadcasting heartbeat message: {:?}", heartbeat_message);
if self
.broadcast_message(io, &Default::default(), &heartbeat_message)
.is_err()
{
warn!("Error broadcasting heartbeat message");
}
if self
.broadcast_message(io, &Default::default(), &status_message)
.is_err()
{
warn!("Error broadcasting status message");
}
}
pub fn relay_blocks(
&self, io: &dyn NetworkContext, need_to_relay: Vec<H256>,
) -> Result<(), Error> {
if !need_to_relay.is_empty() && !self.catch_up_mode() {
let new_block_hash_msg: Box<dyn Message> =
Box::new(NewBlockHashes {
block_hashes: need_to_relay.clone(),
});
self.broadcast_message(
io,
&Default::default(),
new_block_hash_msg.as_ref(),
)
.unwrap_or_else(|e| {
warn!("Error broadcasting blocks, err={:?}", e);
});
self.light_provider
.relay_hashes(need_to_relay)
.unwrap_or_else(|e| {
warn!("Error relaying blocks to light provider: {:?}", e);
});
}
Ok(())
}
fn select_peers_for_transactions(&self) -> Vec<NodeId> {
let num_peers = self.syn.peers.read().len() as f64;
let throttle_ratio = THROTTLING_SERVICE.read().get_throttling_ratio();
let chosen_size = (num_peers.powf(-0.5).min(throttle_ratio) * num_peers)
.round() as usize;
let num_peers = chosen_size
.max(self.protocol_config.min_peers_tx_propagation)
.min(self.protocol_config.max_peers_tx_propagation);
PeerFilter::new(msgid::TRANSACTION_DIGESTS)
.with_cap(DynamicCapability::NormalPhase(true))
.select_n(num_peers, &self.syn)
}
fn propagate_transactions_to_peers(
&self, io: &dyn NetworkContext, lucky_peers: Vec<NodeId>,
) {
let _timer = MeterTimer::time_func(PROPAGATE_TX_TIMER.as_ref());
if lucky_peers.is_empty() {
return;
}
let mut nonces: Vec<(u64, u64)> = (0..lucky_peers.len())
.map(|_| (rand::thread_rng().gen(), rand::thread_rng().gen()))
.collect();
let mut short_ids_part: Vec<Vec<u8>> = vec![vec![]; lucky_peers.len()];
let mut tx_hashes_part: Vec<H256> = vec![];
let (short_ids_transactions, tx_hashes_transactions) = {
let mut transactions = self.get_to_propagate_trans();
if transactions.is_empty() {
return;
}
let mut total_tx_bytes = 0;
let mut short_ids_transactions: Vec<Arc<SignedTransaction>> =
Vec::new();
let mut tx_hashes_transactions: Vec<Arc<SignedTransaction>> =
Vec::new();
let received_pool =
self.request_manager.received_transactions.read();
for (_, tx) in transactions.iter() {
total_tx_bytes += tx.rlp_size();
if total_tx_bytes >= MAX_TXS_BYTES_TO_PROPAGATE {
break;
}
if received_pool.group_overflow_from_tx_hash(&tx.hash()) {
tx_hashes_transactions.push(tx.clone());
} else {
short_ids_transactions.push(tx.clone());
}
}
if short_ids_transactions.len() + tx_hashes_transactions.len()
!= transactions.len()
{
for tx in short_ids_transactions.iter() {
transactions.remove(&tx.hash);
}
for tx in tx_hashes_transactions.iter() {
transactions.remove(&tx.hash);
}
self.set_to_propagate_trans(transactions);
}
(short_ids_transactions, tx_hashes_transactions)
};
debug!(
"Send short ids:{}, Send tx hashes:{}",
short_ids_transactions.len(),
tx_hashes_transactions.len()
);
for tx in &short_ids_transactions {
for i in 0..lucky_peers.len() {
TransactionDigests::append_short_id(
&mut short_ids_part[i],
nonces[i].0,
nonces[i].1,
&tx.hash(),
);
}
}
let mut sent_transactions = short_ids_transactions.clone();
if !tx_hashes_transactions.is_empty() {
TX_HASHES_PROPAGATE_METER.mark(tx_hashes_transactions.len());
for tx in &tx_hashes_transactions {
TransactionDigests::append_tx_hash(
&mut tx_hashes_part,
tx.hash(),
);
}
sent_transactions.extend(tx_hashes_transactions.clone());
}
TX_PROPAGATE_METER.mark(sent_transactions.len());
if sent_transactions.len() == 0 {
return;
}
debug!(
"Sent {} transaction ids to {} peers.",
sent_transactions.len(),
lucky_peers.len()
);
let window_index = self
.request_manager
.append_sent_transactions(sent_transactions);
let mut resend_flag = false;
for i in 0..lucky_peers.len() {
let peer_id = lucky_peers[i];
let (key1, key2) = nonces.pop().unwrap();
let tx_msg = TransactionDigests::new(
window_index,
key1,
key2,
short_ids_part.pop().unwrap(),
tx_hashes_part.clone(),
);
match tx_msg.send(io, &peer_id) {
Ok(_) => {
trace!(
"{:02} <- Transactions ({} entries)",
peer_id,
tx_msg.len()
);
}
Err(e) => {
warn!(
"failed to propagate transaction ids to peer, id: {}, err: {}",
peer_id, e
);
resend_flag = true;
}
}
}
if resend_flag {
let mut resend_transactions: HashMap<H256, Arc<SignedTransaction>> =
HashMap::new();
for tx in short_ids_transactions {
resend_transactions.insert(tx.hash, tx.clone());
}
for tx in tx_hashes_transactions {
resend_transactions.insert(tx.hash, tx.clone());
}
self.set_to_propagate_trans(resend_transactions);
}
}
pub fn check_future_blocks(&self, io: &dyn NetworkContext) {
let now_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let mut missed_body_block_hashes = HashMap::new();
let mut need_to_relay = HashSet::new();
let headers = self.graph.future_blocks.get_before(now_timestamp);
if headers.is_empty() {
return;
}
for (mut header, peer) in headers {
let hash = header.hash();
let (insert_result, to_relay) = self.graph.insert_block_header(
&mut header,
true,
false,
self.insert_header_to_consensus(),
true,
);
if insert_result.is_new_valid() {
need_to_relay.extend(to_relay);
if !self.graph.contains_block(&hash) {
missed_body_block_hashes
.entry(peer)
.or_insert(Vec::new())
.push(hash);
}
}
}
for (peer, missing_hashes) in missed_body_block_hashes {
self.request_missing_blocks(io, Some(peer), missing_hashes);
}
self.relay_blocks(io, need_to_relay.into_iter().collect())
.ok();
}
pub fn insert_header_to_consensus(&self) -> bool {
let current_phase = self.phase_manager.get_current_phase();
matches!(
current_phase.phase_type(),
SyncPhaseType::CatchUpSyncBlockHeader
| SyncPhaseType::CatchUpCheckpoint
)
}
pub fn propagate_new_transactions(&self, io: &dyn NetworkContext) {
if self.syn.peers.read().is_empty() || self.catch_up_mode() {
if self.protocol_config.dev_mode {
self.get_to_propagate_trans();
}
return;
}
let peers = self.select_peers_for_transactions();
self.propagate_transactions_to_peers(io, peers);
}
pub fn remove_expired_flying_request(&self, io: &dyn NetworkContext) {
self.request_manager.resend_timeout_requests(io);
let cancelled_requests = self.request_manager.resend_waiting_requests(
io,
!self.catch_up_mode(),
self.need_block_from_archive_node(),
);
self.handle_cancelled_requests(cancelled_requests);
}
fn handle_cancelled_requests(
&self, cancelled_requests: Vec<Box<dyn Request>>,
) {
let mut to_remove_blocks = HashSet::new();
for request in cancelled_requests {
if let Some(block_hashes) = try_get_block_hashes(&request) {
for hash in block_hashes {
to_remove_blocks.insert(*hash);
}
}
}
self.graph.remove_blocks_and_future(&to_remove_blocks);
}
pub fn send_heartbeat(&self, io: &dyn NetworkContext) {
self.broadcast_heartbeat(io);
}
fn gc(&self) {
self.graph.data_man.cache_gc();
self.graph
.data_man
.database_gc(self.graph.consensus.best_epoch_number())
}
fn log_statistics(&self) { self.graph.log_statistics(); }
fn update_total_weight_delta_heartbeat(&self) {
self.graph.update_total_weight_delta_heartbeat();
}
pub fn update_sync_phase(&self, io: &dyn NetworkContext) {
match self.phase_manager_lock.try_lock() {
Some(_pm_lock) => {
self.phase_manager.try_initialize(io, self);
loop {
let current_phase = self.phase_manager.get_current_phase();
let next_phase_type = current_phase.next(io, self);
if current_phase.phase_type() != next_phase_type {
self.phase_manager.change_phase_to(
next_phase_type,
io,
self,
);
} else {
break;
}
}
}
None => {
debug!("update_sync_phase: phase_manager locked by another IO Worker");
return;
}
}
let catch_up_mode = self.catch_up_mode();
let mut need_notify = Vec::new();
for (peer, state) in self.syn.peers.read().iter() {
let mut state = state.write();
if !state
.notified_capabilities
.contains(DynamicCapability::NormalPhase(!catch_up_mode))
{
state.received_transaction_count = 0;
state
.notified_capabilities
.insert(DynamicCapability::NormalPhase(!catch_up_mode));
need_notify.push(*peer);
}
}
info!(
"Catch-up mode: {}, latest epoch: {} missing_bodies: {}",
catch_up_mode,
self.graph.consensus.best_epoch_number(),
self.graph.inner.read().block_to_fill_set.len()
);
DynamicCapability::NormalPhase(!catch_up_mode)
.broadcast_with_peers(io, need_notify);
}
pub fn request_missing_blocks(
&self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
hashes: Vec<H256>,
) {
let catch_up_mode = self.catch_up_mode();
if catch_up_mode {
self.request_blocks(io, peer_id, hashes);
} else {
self.request_manager
.request_compact_blocks(io, peer_id, hashes, None);
}
}
pub fn request_blocks(
&self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
mut hashes: Vec<H256>,
) {
hashes.retain(|hash| !self.already_processed(hash));
self.request_blocks_without_check(io, peer_id, hashes)
}
pub fn request_blocks_without_check(
&self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
hashes: Vec<H256>,
) {
let preferred_node_type = self.preferred_peer_node_type_for_get_block();
self.request_manager.request_blocks(
io,
peer_id,
hashes,
self.request_block_need_public(),
None,
preferred_node_type,
);
}
fn already_processed(&self, hash: &H256) -> bool {
if self.graph.contains_block(hash) {
return true;
}
if let Some(info) = self.graph.data_man.local_block_info_by_hash(hash) {
if info.get_status() == BlockStatus::Invalid {
return true;
}
if info.get_seq_num()
< self.graph.consensus.current_era_genesis_seq_num()
{
debug!(
"Ignore block in old era hash={:?}, seq={}, cur_era_seq={}",
hash,
info.get_seq_num(),
self.graph.consensus.current_era_genesis_seq_num()
);
return true;
}
if info.get_instance_id() == self.graph.data_man.get_instance_id() {
return true;
}
}
return false;
}
pub fn blocks_received(
&self, io: &dyn NetworkContext, requested_hashes: HashSet<H256>,
returned_blocks: HashSet<H256>, ask_full_block: bool,
peer: Option<NodeId>, delay: Option<Duration>,
preferred_node_type_for_block_request: Option<NodeType>,
) {
self.request_manager.blocks_received(
io,
requested_hashes,
returned_blocks,
ask_full_block,
peer,
self.request_block_need_public(),
delay,
preferred_node_type_for_block_request,
)
}
fn request_block_need_public(&self) -> bool {
self.catch_up_mode() && self.protocol_config.request_block_with_public
}
pub fn expire_block_gc(
&self, _io: &dyn NetworkContext, timeout: u64,
) -> Result<(), Error> {
if self.in_recover_from_db_phase() {
return Ok(());
}
self.graph.remove_expire_blocks(timeout);
Ok(())
}
pub fn is_block_queue_full(&self) -> bool {
self.recover_public_queue.is_full()
}
}
impl NetworkProtocolHandler for SynchronizationProtocolHandler {
fn minimum_supported_version(&self) -> ProtocolVersion {
let my_version = self.protocol_version.0;
if my_version > SYNCHRONIZATION_PROTOCOL_OLD_VERSIONS_TO_SUPPORT {
ProtocolVersion(
my_version - SYNCHRONIZATION_PROTOCOL_OLD_VERSIONS_TO_SUPPORT,
)
} else {
SYNC_PROTO_V1
}
}
fn initialize(&self, io: &dyn NetworkContext) {
io.register_timer(TX_TIMER, self.protocol_config.send_tx_period)
.expect("Error registering transactions timer");
io.register_timer(
CHECK_REQUEST_TIMER,
self.protocol_config.check_request_period,
)
.expect("Error registering check request timer");
io.register_timer(
HEARTBEAT_TIMER,
self.protocol_config.heartbeat_period_interval,
)
.expect("Error registering heartbeat timer");
io.register_timer(
BLOCK_CACHE_GC_TIMER,
self.protocol_config.block_cache_gc_period,
)
.expect("Error registering block_cache_gc timer");
io.register_timer(
CHECK_CATCH_UP_MODE_TIMER,
self.protocol_config.check_phase_change_period,
)
.expect("Error registering check_catch_up_mode timer");
io.register_timer(LOG_STATISTIC_TIMER, Duration::from_millis(5000))
.expect("Error registering log_statistics timer");
io.register_timer(
TOTAL_WEIGHT_IN_PAST_TIMER,
Duration::from_secs(BLOCK_PROPAGATION_DELAY * 2),
)
.expect("Error registering total_weight_in_past timer");
io.register_timer(CHECK_PEER_HEARTBEAT_TIMER, Duration::from_secs(60))
.expect("Error registering CHECK_PEER_HEARTBEAT_TIMER");
io.register_timer(
CHECK_FUTURE_BLOCK_TIMER,
Duration::from_millis(1000),
)
.expect("Error registering CHECK_FUTURE_BLOCK_TIMER");
io.register_timer(
EXPIRE_BLOCK_GC_TIMER,
self.protocol_config.expire_block_gc_period,
)
.expect("Error registering EXPIRE_BLOCK_GC_TIMER");
}
fn send_local_message(&self, io: &dyn NetworkContext, message: Vec<u8>) {
self.local_message
.dispatch(io, LocalMessageTask { message });
}
fn on_message(&self, io: &dyn NetworkContext, peer: &NodeId, raw: &[u8]) {
let (msg_id, rlp) = match decode_msg(raw) {
Some(msg) => msg,
None => {
return self.handle_error(
io,
peer,
msgid::INVALID,
Error::InvalidMessageFormat.into(),
)
}
};
debug!("on_message: peer={}, msgid={:?}", peer, msg_id);
self.dispatch_message(io, peer, msg_id.into(), rlp)
.unwrap_or_else(|e| self.handle_error(io, peer, msg_id.into(), e));
self.request_manager.send_pending_requests(io, peer);
}
fn on_work_dispatch(
&self, io: &dyn NetworkContext, work_type: HandlerWorkType,
) {
if work_type == SyncHandlerWorkType::RecoverPublic as HandlerWorkType {
if let Err(e) = self.on_blocks_inner_task(io) {
warn!("Error processing RecoverPublic task: {:?}", e);
}
} else if work_type
== SyncHandlerWorkType::LocalMessage as HandlerWorkType
{
self.on_local_message_task(io);
} else {
warn!("Unknown SyncHandlerWorkType");
}
}
fn on_peer_connected(
&self, io: &dyn NetworkContext, node_id: &NodeId,
peer_protocol_version: ProtocolVersion,
_pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
) {
debug!(
"Peer connected: peer={:?}, version={}",
node_id, peer_protocol_version
);
if let Err(e) = self.send_status(io, node_id, peer_protocol_version) {
debug!("Error sending status message: {:?}", e);
io.disconnect_peer(
node_id,
Some(UpdateNodeOperation::Failure),
"send status failed", );
} else {
self.syn
.handshaking_peers
.write()
.insert(*node_id, (peer_protocol_version, Instant::now()));
}
}
fn on_peer_disconnected(&self, io: &dyn NetworkContext, peer: &NodeId) {
debug!("Peer disconnected: peer={}", peer);
self.syn.peers.write().remove(peer);
self.syn.handshaking_peers.write().remove(peer);
self.request_manager.on_peer_disconnected(io, peer);
self.state_sync.on_peer_disconnected(&peer);
}
fn on_timeout(&self, io: &dyn NetworkContext, timer: TimerToken) {
trace!("Timeout: timer={:?}", timer);
match timer {
TX_TIMER => {
self.propagate_new_transactions(io);
}
CHECK_FUTURE_BLOCK_TIMER => {
self.check_future_blocks(io);
self.graph.check_not_ready_frontier(
self.insert_header_to_consensus(),
);
}
CHECK_REQUEST_TIMER => {
self.remove_expired_flying_request(io);
}
HEARTBEAT_TIMER => {
self.send_heartbeat(io);
}
BLOCK_CACHE_GC_TIMER => {
self.gc();
}
CHECK_CATCH_UP_MODE_TIMER => {
self.update_sync_phase(io);
}
LOG_STATISTIC_TIMER => {
self.log_statistics();
}
TOTAL_WEIGHT_IN_PAST_TIMER => {
self.update_total_weight_delta_heartbeat();
}
CHECK_PEER_HEARTBEAT_TIMER => {
let timeout_peers = self.syn.get_heartbeat_timeout_peers(
self.protocol_config.heartbeat_timeout,
);
for peer in timeout_peers {
io.disconnect_peer(
&peer,
Some(UpdateNodeOperation::Failure),
"sync heartbeat timeout", );
}
}
EXPIRE_BLOCK_GC_TIMER => {
self.expire_block_gc(
io,
self.protocol_config.sync_expire_block_timeout.as_secs(),
)
.ok();
}
_ => warn!("Unknown timer {} triggered.", timer),
}
}
}