// cfxcore/sync/synchronization_protocol_handler.rs

// Copyright 2019 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
use super::{
    random, request_manager::RequestManager, Error, SharedSynchronizationGraph,
    SynchronizationState,
};
use crate::{
    block_data_manager::BlockStatus,
    light_protocol::Provider as LightProvider,
    message::{decode_msg, Message, MsgId},
    sync::{
        message::{
            handle_rlp_message, msgid, Context, DynamicCapability,
            GetBlockHeadersResponse, Heartbeat, NewBlockHashes, StatusV2,
            StatusV3, TransactionDigests,
        },
        request_manager::{try_get_block_hashes, Request},
        state::SnapshotChunkSync,
        synchronization_phases::{SyncPhaseType, SynchronizationPhaseManager},
        synchronization_state::PeerFilter,
        StateSyncConfiguration,
        SYNCHRONIZATION_PROTOCOL_OLD_VERSIONS_TO_SUPPORT,
        SYNCHRONIZATION_PROTOCOL_VERSION, SYNC_PROTO_V1, SYNC_PROTO_V2,
    },
    ConsensusGraph, NodeType,
};
use cfx_internal_common::ChainIdParamsDeprecated;
use cfx_parameters::{block::MAX_BLOCK_SIZE_IN_BYTES, sync::*};
use cfx_types::H256;
use diem_types::validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey};
use malloc_size_of::{new_malloc_size_ops, MallocSizeOf};
use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
use metrics::{register_meter_with_group, Meter, MeterTimer};
use network::{
    node_table::NodeId, service::ProtocolVersion,
    throttling::THROTTLING_SERVICE, Error as NetworkError, HandlerWorkType,
    NetworkContext, NetworkProtocolHandler, UpdateNodeOperation,
};
use parking_lot::{Mutex, RwLock};
use primitives::{Block, BlockHeader, EpochId, SignedTransaction};
use rand::{prelude::SliceRandom, Rng};
use rlp::Rlp;
use std::{
    cmp::{self, min},
    collections::{BTreeMap, HashMap, HashSet, VecDeque},
    sync::Arc,
    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
51
// Sync-related meters/timers; names mirror the registered metric keys.
lazy_static! {
    // Size of the transaction set selected per propagation round.
    static ref TX_PROPAGATE_METER: Arc<dyn Meter> =
        register_meter_with_group("system_metrics", "tx_propagate_set_size");
    // Size of the transaction-hash set selected per propagation round.
    static ref TX_HASHES_PROPAGATE_METER: Arc<dyn Meter> =
        register_meter_with_group(
            "system_metrics",
            "tx_hashes_propagate_set_size"
        );
    // Timer metric for block (public-key) recovery work.
    static ref BLOCK_RECOVER_TIMER: Arc<dyn Meter> =
        register_meter_with_group("timer", "sync:recover_block");
    // Timer metric for transaction propagation.
    static ref PROPAGATE_TX_TIMER: Arc<dyn Meter> =
        register_meter_with_group("timer", "sync:propagate_tx_timer");
}
65
/// Identifier for a periodic timer registered with the network service.
type TimerToken = usize;

// Tokens for the handler's periodic tasks. Each value must be unique.
const TX_TIMER: TimerToken = 0;
const CHECK_REQUEST_TIMER: TimerToken = 1;
const BLOCK_CACHE_GC_TIMER: TimerToken = 2;
const CHECK_CATCH_UP_MODE_TIMER: TimerToken = 3;
const LOG_STATISTIC_TIMER: TimerToken = 4;
const TOTAL_WEIGHT_IN_PAST_TIMER: TimerToken = 5;
const CHECK_PEER_HEARTBEAT_TIMER: TimerToken = 6;
const CHECK_FUTURE_BLOCK_TIMER: TimerToken = 7;
const EXPIRE_BLOCK_GC_TIMER: TimerToken = 8;
const HEARTBEAT_TIMER: TimerToken = 9;
// NOTE(review): token 10 is skipped — presumably reserved or used
// elsewhere; confirm before reusing it.
pub const CHECK_RPC_REQUEST_TIMER: TimerToken = 11;
79
// Byte budget for one transaction-propagation round (name-derived;
// see the propagation logic later in this file).
const MAX_TXS_BYTES_TO_PROPAGATE: usize = 1024 * 1024; // 1MB

/// The maximum allowed gap between `best_epoch` and `latest_epoch_requested`.
const EPOCH_SYNC_MAX_GAP_START: u64 = 20000;
/// The max gap is increased if our best_epoch does not change after timeout.
const EPOCH_SYNC_MAX_GAP_INCREASE: u64 = 5000;
/// After 20 retries, the gap becomes 120000 epochs = 6 eras. This is usually
/// larger than the number of epochs after a checkpoint and gives a bound
/// of the memory usage to maintain downloaded blocks in Sync/Consensus Graph.
const EPOCH_SYNC_MAX_RETRY_COUNT: u64 = 20;
/// If no future epochs can be requested because of `EPOCH_SYNC_MAX_GAP`,
/// after waiting this timeout we'll request from `best_epoch` again.
const EPOCH_SYNC_RESTART_TIMEOUT_S: u64 = 60 * 10;
/// Cap on concurrently in-flight epoch requests (used by `request_epochs`).
const EPOCH_SYNC_MAX_INFLIGHT: u64 = 300;
/// Number of epochs requested in a single batch.
const EPOCH_SYNC_BATCH_SIZE: u64 = 30;
/// Cap on concurrently in-flight block(-body) requests
/// (used by `request_block_bodies`).
const BLOCK_SYNC_MAX_INFLIGHT: usize = 1000;
96
/// Work types dispatched to background workers via
/// `NetworkContext::dispatch_work`; the discriminant is the raw
/// `HandlerWorkType` value (see `AsyncTaskQueue::new`).
#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub enum SyncHandlerWorkType {
    RecoverPublic = 1,
    LocalMessage = 2,
}
102
/// Size accounting for tasks held in an `AsyncTaskQueue`.
///
/// The defaults describe a task that occupies no measurable bytes and
/// represents exactly one item.
pub trait TaskSize {
    /// Approximate size of this task, in bytes.
    fn size(&self) -> usize {
        0
    }

    /// Number of items carried by this task.
    fn count(&self) -> usize {
        1
    }
}
108
/// FIFO queue to async execute tasks.
pub struct AsyncTaskQueue<T: TaskSize> {
    // All mutable queue state, guarded by a single lock.
    inner: RwLock<AsyncTaskQueueInner<T>>,
    // The `HandlerWorkType` signalled to the network service on dispatch.
    work_type: HandlerWorkType,

    // The maximum total `TaskSize::size` (not element count) allowed in
    // the queue; see `is_full`/`size`.
    // Note we do not drop elements even when the queue is full to
    // keep the behavior of this queue consistent.
    max_capacity: usize,

    // Alpha for computing moving average.
    alpha: f64,
}
122
/// Mutable state of an `AsyncTaskQueue`, kept behind one `RwLock`.
struct AsyncTaskQueueInner<T: TaskSize> {
    // Pending tasks in FIFO order.
    tasks: VecDeque<T>,
    // Sum of `TaskSize::size` over all queued tasks.
    size: usize,
    // Exponential moving average of per-item task size.
    moving_average: f64,
}
128
129impl<T: TaskSize> AsyncTaskQueue<T> {
130    fn new(work_type: SyncHandlerWorkType, max_capacity: usize) -> Self {
131        AsyncTaskQueue {
132            inner: RwLock::new(AsyncTaskQueueInner {
133                tasks: VecDeque::new(),
134                size: 0,
135                // Set to max value at start to avoid sending too many requests
136                // at the start.
137                moving_average: MAX_BLOCK_SIZE_IN_BYTES as f64,
138            }),
139            work_type: work_type as HandlerWorkType,
140            max_capacity,
141            // TODO: set a proper value.
142            alpha: 0.001,
143        }
144    }
145
146    pub fn dispatch(&self, io: &dyn NetworkContext, task: T) {
147        let mut inner = self.inner.write();
148        inner.size += task.size();
149        // Compute moving average.
150        if task.count() != 0 {
151            inner.moving_average = self.alpha
152                * (task.size() / task.count()) as f64
153                + (1.0 - self.alpha) * inner.moving_average;
154        }
155        io.dispatch_work(self.work_type);
156        inner.tasks.push_back(task);
157        trace!(
158            "AsyncTaskQueue dispatch: size={} average={}",
159            inner.size,
160            inner.moving_average,
161        );
162    }
163
164    fn pop(&self) -> Option<T> {
165        let mut inner = self.inner.write();
166        let task = inner.tasks.pop_front();
167        task.as_ref().map(|task| {
168            inner.size -= task.size();
169        });
170        trace!(
171            "AsyncTaskQueue pop: size={} average={}",
172            inner.size,
173            inner.moving_average,
174        );
175        task
176    }
177
178    fn size(&self) -> usize { self.inner.read().size }
179
180    pub fn is_full(&self) -> bool { self.size() >= self.max_capacity }
181
182    /// Return `true` if inflight insertion is successful.
183    pub fn estimated_available_count(&self) -> usize {
184        let inner = self.inner.read();
185        if inner.size >= self.max_capacity {
186            0
187        } else if inner.moving_average != 0.0 {
188            ((self.max_capacity - inner.size) as f64 / inner.moving_average)
189                as usize
190        } else {
191            // This should never happen.
192            self.max_capacity
193        }
194    }
195}
196
/// Worker task: recover the transaction senders ("publics") of a batch of
/// downloaded blocks.
#[derive(DeriveMallocSizeOf)]
pub struct RecoverPublicTask {
    // Blocks whose transactions still need sender recovery.
    blocks: Vec<Block>,
    // Hashes of the blocks originally requested for this batch.
    requested: HashSet<H256>,
    // Optional delay before the task is processed.
    delay: Option<Duration>,
    // Presumably the peer to blame if recovery fails — TODO confirm at the
    // task-processing site.
    failed_peer: NodeId,
    // Whether the blocks came from compact-block responses.
    compact: bool,
}
205
206impl RecoverPublicTask {
207    pub fn new(
208        blocks: Vec<Block>, requested: HashSet<H256>, failed_peer: NodeId,
209        compact: bool, delay: Option<Duration>,
210    ) -> Self {
211        RecoverPublicTask {
212            blocks,
213            requested,
214            failed_peer,
215            compact,
216            delay,
217        }
218    }
219}
220
221impl TaskSize for RecoverPublicTask {
222    fn size(&self) -> usize {
223        let mut ops = new_malloc_size_ops();
224        self.size_of(&mut ops) + std::mem::size_of::<Self>()
225    }
226
227    fn count(&self) -> usize { self.blocks.len() }
228}
229
/// Worker task: a raw, encoded protocol message generated by the local
/// node itself (dispatched as `SyncHandlerWorkType::LocalMessage`).
pub struct LocalMessageTask {
    // Encoded message bytes.
    message: Vec<u8>,
}
233
// Uses the `TaskSize` defaults (size 0, count 1), so local messages do not
// consume the queue's byte capacity.
impl TaskSize for LocalMessageTask {}
235
/// State of a `FutureBlockContainer`: headers with timestamps ahead of
/// local time, indexed by timestamp.
struct FutureBlockContainerInner {
    // Maximum number of stored headers; `insert` evicts from the largest
    // timestamp once exceeded.
    capacity: usize,
    // Current number of stored headers.
    size: usize,
    // timestamp -> hashes of the headers carrying that timestamp.
    container: BTreeMap<u64, HashSet<H256>>,

    // The value is a tuple of the header corresponding to a hash and the peer
    // that we receive this header from. Since a header is only broadcast
    // after receiving its body, we should be able to receive the block body
    // from the peer successfully.
    hash_to_header_and_peer: HashMap<H256, (BlockHeader, NodeId)>,
}
247
248impl FutureBlockContainerInner {
249    pub fn new(capacity: usize) -> Self {
250        FutureBlockContainerInner {
251            capacity,
252            size: 0,
253            container: BTreeMap::new(),
254            hash_to_header_and_peer: Default::default(),
255        }
256    }
257}
258
/// Thread-safe buffer for "future" block headers — headers whose timestamps
/// are ahead of our clock — to be drained later via `get_before`.
pub struct FutureBlockContainer {
    inner: RwLock<FutureBlockContainerInner>,
}
262
263impl FutureBlockContainer {
264    pub fn new(capacity: usize) -> Self {
265        FutureBlockContainer {
266            inner: RwLock::new(FutureBlockContainerInner::new(capacity)),
267        }
268    }
269
270    pub fn insert(&self, header: BlockHeader, peer: NodeId) {
271        let inner = &mut *self.inner.write();
272        let header_hash = header.hash();
273        if inner.hash_to_header_and_peer.contains_key(&header_hash) {
274            return;
275        }
276        let entry = inner
277            .container
278            .entry(header.timestamp())
279            .or_insert(HashSet::new());
280        if !entry.contains(&header_hash) {
281            entry.insert(header_hash);
282            inner
283                .hash_to_header_and_peer
284                .insert(header_hash, (header, peer));
285            inner.size += 1;
286        }
287
288        if inner.size > inner.capacity {
289            let mut removed = false;
290            let mut empty_slots = Vec::new();
291            for entry in inner.container.iter_mut().rev() {
292                if entry.1.is_empty() {
293                    empty_slots.push(*entry.0);
294                    continue;
295                }
296
297                let hash = *entry.1.iter().next().unwrap();
298                entry.1.remove(&hash);
299                inner.hash_to_header_and_peer.remove(&hash);
300                removed = true;
301
302                if entry.1.is_empty() {
303                    empty_slots.push(*entry.0);
304                }
305                break;
306            }
307
308            if removed {
309                inner.size -= 1;
310            }
311
312            for slot in empty_slots {
313                inner.container.remove(&slot);
314            }
315        }
316    }
317
318    pub fn get_before(&self, timestamp: u64) -> Vec<(BlockHeader, NodeId)> {
319        let mut inner = self.inner.write();
320        let mut result = Vec::new();
321
322        loop {
323            let slot = if let Some(entry) = inner.container.iter().next() {
324                Some(*entry.0)
325            } else {
326                None
327            };
328
329            if slot.is_none() || slot.unwrap() > timestamp {
330                break;
331            }
332
333            let entry = inner.container.remove(&slot.unwrap()).unwrap();
334
335            for header_hash in entry {
336                result.push(inner.hash_to_header_and_peer.remove(&header_hash).expect(
337                    "hash and header are inserted/removed together atomically",
338                ));
339            }
340        }
341
342        result
343    }
344
345    pub fn contains(&self, header_hash: &H256) -> bool {
346        self.inner
347            .read()
348            .hash_to_header_and_peer
349            .contains_key(header_hash)
350    }
351}
352
/// The block/transaction synchronization protocol handler: owns the sync
/// state machines, the request tracking, and the worker task queues.
#[derive(DeriveMallocSizeOf)]
pub struct SynchronizationProtocolHandler {
    // Sync protocol version spoken by this node.
    pub protocol_version: ProtocolVersion,

    // Tunable protocol parameters (see `ProtocolConfiguration`).
    pub protocol_config: ProtocolConfiguration,
    // Shared synchronization graph.
    pub graph: SharedSynchronizationGraph,
    // Shared per-peer synchronization state.
    pub syn: Arc<SynchronizationState>,
    // Tracks outstanding requests and their timeouts.
    pub request_manager: Arc<RequestManager>,
    /// The latest `(requested_epoch_number, request_time, old_best_epoch,
    /// retry_count)`
    pub latest_epoch_requested: Mutex<(u64, Instant, u64, u64)>,
    #[ignore_malloc_size_of = "only stores reference to others"]
    pub phase_manager: SynchronizationPhaseManager,
    // Serializes phase transitions; the `u32` payload is unused.
    pub phase_manager_lock: Mutex<u32>,

    // Worker task queue for recover public
    #[ignore_malloc_size_of = "channels are not handled in MallocSizeOf"]
    pub recover_public_queue: Arc<AsyncTaskQueue<RecoverPublicTask>>,

    // Worker task queue for local message
    #[ignore_malloc_size_of = "channels are not handled in MallocSizeOf"]
    local_message: AsyncTaskQueue<LocalMessageTask>,

    // state sync for any checkpoint
    #[ignore_malloc_size_of = "not used on archive nodes"]
    pub state_sync: Arc<SnapshotChunkSync>,

    /// The epoch id of the remotely synchronized state.
    /// This is always `None` for archive nodes.
    pub synced_epoch_id: Mutex<Option<EpochId>>,

    // provider for serving light protocol queries
    light_provider: Arc<LightProvider>,
}
387
/// Tunable parameters of the synchronization protocol, typically populated
/// from the node configuration.
#[derive(Clone, Default, DeriveMallocSizeOf)]
pub struct ProtocolConfiguration {
    pub is_consortium: bool,
    // Periods of the handler's recurring timers.
    pub send_tx_period: Duration,
    pub check_request_period: Duration,
    pub check_phase_change_period: Duration,
    pub heartbeat_period_interval: Duration,
    pub heartbeat_timeout: Duration,
    pub block_cache_gc_period: Duration,
    pub expire_block_gc_period: Duration,
    pub sync_expire_block_timeout: Duration,
    // Per-request-type timeouts.
    pub headers_request_timeout: Duration,
    pub blocks_request_timeout: Duration,
    pub transaction_request_timeout: Duration,
    pub snapshot_candidate_request_timeout: Duration,
    pub snapshot_manifest_request_timeout: Duration,
    pub snapshot_chunk_request_timeout: Duration,
    pub tx_maintained_for_peer_timeout: Duration,
    pub max_inflight_request_count: u64,
    // Transaction-index bookkeeping.
    pub received_tx_index_maintain_timeout: Duration,
    pub inflight_pending_tx_index_maintain_timeout: Duration,
    pub request_block_with_public: bool,
    pub max_trans_count_received_in_catch_up: u64,
    // Transaction propagation fan-out bounds.
    pub min_peers_tx_propagation: usize,
    pub max_peers_tx_propagation: usize,
    // Snapshot/state-sync limits.
    pub max_downloading_chunks: usize,
    pub max_downloading_chunk_attempts: usize,
    pub test_mode: bool,
    pub dev_mode: bool,
    pub throttling_config_file: Option<String>,
    pub chunk_size_byte: u64,
    // Peer timeout observation / demotion policy.
    pub timeout_observing_period_s: u64,
    pub max_allowed_timeout_in_observing_period: u64,
    pub demote_peer_for_timeout: bool,
    pub max_unprocessed_block_size: usize,
    pub max_chunk_number_in_manifest: usize,
    // Phase-change policy.
    pub allow_phase_change_without_peer: bool,
    pub min_phase_change_normal_peer_count: usize,
    // PoS-related settings.
    pub pos_genesis_pivot_decision: H256,
    pub check_status_genesis: bool,

    pub pos_started_as_voter: bool,
}
431
432impl SynchronizationProtocolHandler {
433    pub fn new(
434        node_type: NodeType, protocol_config: ProtocolConfiguration,
435        state_sync_config: StateSyncConfiguration,
436        initial_sync_phase: SyncPhaseType,
437        sync_graph: SharedSynchronizationGraph,
438        light_provider: Arc<LightProvider>, consensus: Arc<ConsensusGraph>,
439    ) -> Self {
440        let sync_state = Arc::new(SynchronizationState::new(
441            protocol_config.is_consortium,
442            node_type,
443            protocol_config.allow_phase_change_without_peer,
444            protocol_config.min_phase_change_normal_peer_count,
445        ));
446        let recover_public_queue = Arc::new(AsyncTaskQueue::new(
447            SyncHandlerWorkType::RecoverPublic,
448            protocol_config.max_unprocessed_block_size,
449        ));
450        let request_manager = Arc::new(RequestManager::new(
451            &protocol_config,
452            sync_state.clone(),
453            recover_public_queue.clone(),
454        ));
455
456        let state_sync = Arc::new(SnapshotChunkSync::new(state_sync_config));
457
458        Self {
459            protocol_version: SYNCHRONIZATION_PROTOCOL_VERSION,
460            protocol_config,
461            graph: sync_graph.clone(),
462            syn: sync_state.clone(),
463            request_manager,
464            latest_epoch_requested: Mutex::new((0, Instant::now(), 0, 0)),
465            phase_manager: SynchronizationPhaseManager::new(
466                initial_sync_phase,
467                sync_state.clone(),
468                sync_graph.clone(),
469                state_sync.clone(),
470                consensus,
471            ),
472            phase_manager_lock: Mutex::new(0),
473            recover_public_queue,
474            local_message: AsyncTaskQueue::new(
475                SyncHandlerWorkType::LocalMessage,
476                10000000000, // TODO: Set a better capacity.
477            ),
478            state_sync,
479            synced_epoch_id: Default::default(),
480            light_provider,
481        }
482    }
483
484    pub fn node_type(&self) -> NodeType {
485        if self.syn.is_full_node() {
486            NodeType::Full
487        } else {
488            NodeType::Archive
489        }
490    }
491
492    pub fn is_consortium(&self) -> bool { self.protocol_config.is_consortium }
493
    /// Fetch the transactions pending propagation, keyed by hash
    /// (delegates to the sync graph).
    fn get_to_propagate_trans(&self) -> HashMap<H256, Arc<SignedTransaction>> {
        self.graph.get_to_propagate_trans()
    }
497
    /// Store `transactions` as the pending to-propagate set
    /// (delegates to the sync graph).
    fn set_to_propagate_trans(
        &self, transactions: HashMap<H256, Arc<SignedTransaction>>,
    ) {
        self.graph.set_to_propagate_trans(transactions);
    }
503
504    pub fn catch_up_mode(&self) -> bool {
505        self.phase_manager.get_current_phase().phase_type()
506            != SyncPhaseType::Normal
507    }
508
509    pub fn in_recover_from_db_phase(&self) -> bool {
510        let current_phase = self.phase_manager.get_current_phase();
511        current_phase.phase_type()
512            == SyncPhaseType::CatchUpRecoverBlockHeaderFromDB
513            || current_phase.phase_type()
514                == SyncPhaseType::CatchUpFillBlockBodyPhase
515    }
516
517    pub fn need_requesting_blocks(&self) -> bool {
518        let current_phase = self.phase_manager.get_current_phase();
519        current_phase.phase_type() == SyncPhaseType::CatchUpSyncBlock
520            || current_phase.phase_type() == SyncPhaseType::Normal
521    }
522
523    pub fn need_block_from_archive_node(&self) -> bool {
524        let current_phase = self.phase_manager.get_current_phase();
525        current_phase.phase_type() == SyncPhaseType::CatchUpSyncBlock
526            && !self.syn.is_full_node()
527    }
528
529    pub fn preferred_peer_node_type_for_get_block(&self) -> Option<NodeType> {
530        if self.need_block_from_archive_node() {
531            Some(NodeType::Archive)
532        } else {
533            None
534        }
535    }
536
    /// A shared handle to the synchronization graph.
    pub fn get_synchronization_graph(&self) -> SharedSynchronizationGraph {
        self.graph.clone()
    }
540
541    pub fn get_request_manager(&self) -> Arc<RequestManager> {
542        self.request_manager.clone()
543    }
544
    /// Record `transactions` as received (delegates to the request
    /// manager).
    pub fn append_received_transactions(
        &self, transactions: Vec<Arc<SignedTransaction>>,
    ) {
        self.request_manager
            .append_received_transactions(transactions);
    }
551
552    fn dispatch_message(
553        &self, io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, rlp: Rlp,
554    ) -> Result<(), Error> {
555        trace!("Dispatching message: peer={:?}, msg_id={:?}", peer, msg_id);
556        if !io.is_peer_self(peer) {
557            if !self.syn.contains_peer(peer) {
558                debug!(
559                    "dispatch_message: Peer does not exist: peer={} msg_id={}",
560                    peer, msg_id
561                );
562                // We may only receive status message from a peer not in
563                // `syn.peers`, and this peer should be in
564                // `syn.handshaking_peers`
565                if !self.syn.handshaking_peers.read().contains_key(peer)
566                    || (msg_id != msgid::STATUS_V3
567                        && msg_id != msgid::STATUS_V2)
568                {
569                    debug!("Message from unknown peer {:?}", msg_id);
570                    return Ok(());
571                }
572            } else {
573                self.syn.update_heartbeat(peer);
574            }
575        }
576
577        let ctx = Context {
578            node_id: *peer,
579            io,
580            manager: self,
581        };
582
583        if !handle_rlp_message(msg_id, &ctx, &rlp)? {
584            warn!("Unknown message: peer={:?} msgid={:?}", peer, msg_id);
585            let reason =
586                format!("unknown sync protocol message id {:?}", msg_id);
587            io.disconnect_peer(
588                peer,
589                Some(UpdateNodeOperation::Remove),
590                reason.as_str(),
591            );
592        }
593
594        Ok(())
595    }
596
    /// Error handling for dispatched messages.
    ///
    /// Maps each `Error` variant onto: the log level (`warn` vs `debug`),
    /// whether the peer is disconnected, and which `UpdateNodeOperation`
    /// (if any) is applied on disconnect.
    fn handle_error(
        &self, io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, e: Error,
    ) {
        // Defaults: disconnect with a warning and no node-table operation;
        // individual variants below override these.
        let mut disconnect = true;
        let mut warn = true;
        let reason = format!("{}", e);
        let error_reason = format!("{:?}", e);
        let mut op = None;

        // NOTE, DO NOT USE WILDCARD IN THE FOLLOWING MATCH STATEMENT!
        // COMPILER WILL HELP TO FIND UNHANDLED ERROR CASES.
        match e {
            Error::InvalidBlock => op = Some(UpdateNodeOperation::Failure),
            Error::InvalidGetBlockTxn(_) => {
                op = Some(UpdateNodeOperation::Demotion)
            }
            Error::InvalidStatus(_) => op = Some(UpdateNodeOperation::Demotion),
            Error::InvalidMessageFormat => {
                // TODO: Shall we blacklist a node when the message format is
                // wrong? maybe it's a different version of sync protocol?
                op = Some(UpdateNodeOperation::Remove)
            }
            Error::UnknownPeer => {
                warn = false;
                op = Some(UpdateNodeOperation::Failure)
            }
            // TODO handle the unexpected response case (timeout or real invalid
            // message type)
            Error::UnexpectedResponse => {
                op = Some(UpdateNodeOperation::Demotion)
            }
            Error::RequestNotFound => {
                disconnect = false;
                warn = false;
            }
            Error::InCatchUpMode(_) => {
                disconnect = false;
                warn = false;
            }
            Error::TooManyTrans => {}
            Error::InvalidTimestamp => op = Some(UpdateNodeOperation::Demotion),
            Error::InvalidSnapshotManifest(_) => {
                op = Some(UpdateNodeOperation::Demotion)
            }
            Error::InvalidSnapshotChunk(_) => {
                op = Some(UpdateNodeOperation::Demotion)
            }
            Error::EmptySnapshotChunk => disconnect = false,
            Error::AlreadyThrottled(_) => {
                op = Some(UpdateNodeOperation::Remove)
            }
            Error::Throttled(_, msg) => {
                disconnect = false;

                if let Err(e) = msg.send(io, peer) {
                    error!("failed to send throttled packet: {:?}", e);
                    disconnect = true;
                }
            }
            Error::Decoder(_) => op = Some(UpdateNodeOperation::Remove),
            Error::Io(_) => disconnect = false,
            Error::Network(kind) => match kind {
                network::Error::SendUnsupportedMessage { .. } => {
                    unreachable!(
                        "This is a bug in protocol version maintenance. {:?}",
                        kind
                    );
                }

                network::Error::MessageDeprecated { .. } => {
                    op = Some(UpdateNodeOperation::Failure);
                    error!(
                        "Peer sent us a deprecated message {:?}. Either it's a bug \
                        in protocol version maintenance or the peer is malicious.",
                        kind
                    );
                }

                network::Error::AddressParse => disconnect = false,
                network::Error::AddressResolve(_) => disconnect = false,
                network::Error::Auth => disconnect = false,
                network::Error::BadProtocol => {
                    op = Some(UpdateNodeOperation::Remove)
                }
                network::Error::BadAddr => disconnect = false,
                network::Error::Decoder(_) => {
                    op = Some(UpdateNodeOperation::Remove)
                }
                network::Error::Expired => disconnect = false,
                network::Error::Disconnect(_) => disconnect = false,
                network::Error::InvalidNodeId => disconnect = false,
                network::Error::OversizedPacket => disconnect = false,
                network::Error::Io(_) => disconnect = false,
                network::Error::Throttling(_) => disconnect = false,
                network::Error::SocketIo(_) => {
                    op = Some(UpdateNodeOperation::Failure)
                }
                network::Error::Msg(_) => {
                    op = Some(UpdateNodeOperation::Failure)
                }
            },
            Error::Storage(_) => disconnect = false,
            Error::Msg(_) => op = Some(UpdateNodeOperation::Failure),
            Error::InternalError(_) => {}
            Error::RpcCancelledByDisconnection => {}
            Error::RpcTimeout => {}
            Error::UnexpectedMessage(_) => {
                op = Some(UpdateNodeOperation::Remove)
            }
            Error::NotSupported(_) => disconnect = false,
        }

        if warn {
            warn!(
                "Error while handling message, peer={}, msgid={:?}, error={}",
                peer, msg_id, error_reason
            );
        } else {
            debug!(
                "Minor error while handling message, peer={}, msgid={:?}, error={}",
                peer, msg_id, error_reason
            );
        }

        if disconnect {
            io.disconnect_peer(peer, op, reason.as_str());
        }
    }
726
727    pub fn start_sync(&self, io: &dyn NetworkContext) {
728        let current_phase_type =
729            self.phase_manager.get_current_phase().phase_type();
730        if current_phase_type == SyncPhaseType::CatchUpRecoverBlockHeaderFromDB
731            || current_phase_type == SyncPhaseType::CatchUpFillBlockBodyPhase
732        {
733            return;
734        }
735
736        if current_phase_type != SyncPhaseType::Normal {
737            self.request_epochs(io);
738            let best_peer_epoch = self.syn.best_peer_epoch().unwrap_or(0);
739            let my_best_epoch = self.graph.consensus.best_epoch_number();
740            if my_best_epoch + REQUEST_TERMINAL_EPOCH_LAG_THRESHOLD
741                >= best_peer_epoch
742            {
743                self.request_missing_terminals(io);
744            }
745        } else {
746            self.request_missing_terminals(io);
747        }
748    }
749
750    pub fn request_missing_terminals(&self, io: &dyn NetworkContext) {
751        let peers: Vec<NodeId> =
752            self.syn.peers.read().keys().cloned().collect();
753
754        let mut requested = HashSet::new();
755
756        let (_, era_genesis_height) =
757            self.graph.get_genesis_hash_and_height_in_current_era();
758        for peer in peers {
759            if let Ok(info) = self.syn.get_peer_info(&peer) {
760                if info.read().best_epoch < era_genesis_height {
761                    // This peer is probably in catch-up mode, so we do not need
762                    // to request these old terminal blocks.
763                    continue;
764                }
765                let terminals = {
766                    let mut info = info.write();
767                    let ts = info.latest_block_hashes.clone();
768                    info.latest_block_hashes.clear();
769                    ts
770                };
771
772                let to_request = terminals
773                    .difference(&requested)
774                    // We cannot filter out block headers with `data_man` here,
775                    // otherwise if we crash before inserting a terminal into
776                    // consensus, we will never process it
777                    // after restarting in the tests where
778                    // no new blocks are generated.
779                    .filter(|h| !self.graph.contains_block_header(&h))
780                    .cloned()
781                    .collect::<Vec<H256>>();
782
783                if terminals.len() > 0 {
784                    debug!("Requesting terminals {:?}", to_request);
785                }
786
787                self.request_block_headers(
788                    io,
789                    Some(peer),
790                    to_request.clone(),
791                    true, /* ignore_db */
792                );
793
794                requested.extend(to_request);
795            }
796        }
797
798        if requested.len() > 0 {
799            debug!("{:?} missing terminal block(s) requested", requested.len());
800        }
801    }
802
803    /// Request missing block bodies from random peers in batches.
804    pub fn request_block_bodies(&self, io: &dyn NetworkContext) {
805        let in_flight_blocks = self.request_manager.in_flight_blocks();
806        let to_request_blocks: Vec<_> = self
807            .graph
808            .inner
809            .read()
810            .block_to_fill_set
811            .difference(&in_flight_blocks)
812            .copied()
813            .collect();
814        let n_blocks_to_request = min(
815            BLOCK_SYNC_MAX_INFLIGHT - in_flight_blocks.len(),
816            to_request_blocks.len(),
817        );
818
819        // Use `MAX_BLOCKS_TO_SEND` as the batch size so the peer can respond
820        // with all blocks.
821        for block_chunk in to_request_blocks[0..n_blocks_to_request]
822            .chunks(MAX_BLOCKS_TO_SEND as usize)
823        {
824            self.request_blocks_without_check(io, None, block_chunk.to_vec());
825        }
826    }
827
    // FIXME Use another function for block catch up. It should only use local
    // epoch set and end with all consensus block retrieved, not related to
    // median peer epoch.
    /// Drive epoch-based synchronization: request epoch hash sets for epochs
    /// beyond our current best epoch, recovering them from db when possible
    /// and otherwise from peers, bounded by the median best epoch of normal
    /// peers and an adaptive gap limit that grows when sync appears stuck.
    pub fn request_epochs(&self, io: &dyn NetworkContext) {
        // make sure only one thread can request new epochs at a time
        let mut latest_requested = self.latest_epoch_requested.lock();

        // We use median here instead of max, so w.h.p. we won't request all
        // epoch sets from malicious peer.
        // See https://github.com/Conflux-Chain/conflux-rust/issues/1466.
        let median_peer_epoch =
            self.syn.median_epoch_from_normal_peers().unwrap_or(0);
        let my_best_epoch = self.graph.consensus.best_epoch_number();
        // Persistent state tuple: (last epoch requested, time of that
        // request, our best epoch at that time, restart retry counter).
        let (
            mut latest_requested_epoch,
            latest_request_time,
            old_best_epoch,
            mut retry_count,
        ) = *latest_requested;
        // We have switched to the correct pivot chain, so we can reset epoch
        // sync.
        if old_best_epoch != my_best_epoch {
            retry_count = 0;
        }

        // It's possible that there is a malicious heavy subtree that is heavier
        // than the pivot chain within the next EPOCH_SYNC_MAX_GAP_START epochs.
        // In this case, if we do not try to download more epochs, our
        // best_epoch will the tip of the malicious subtree and will remain
        // unchanged, meaning the syncing process will be blocked forever.
        // Increase the syncing gap if our pivot chain remain unchanged.
        let sync_max_gap = EPOCH_SYNC_MAX_GAP_START
            + EPOCH_SYNC_MAX_GAP_INCREASE * retry_count;
        // If the gap is too large, it means that the next epoch of
        // `my_best_epoch` is missing, either because received
        // epoch_set is wrong or we have too many epochs with
        // blocks not received.
        if latest_requested_epoch >= my_best_epoch + sync_max_gap {
            if latest_request_time.elapsed()
                < Duration::from_secs(EPOCH_SYNC_RESTART_TIMEOUT_S)
            {
                // Wait for in-flight requests before restarting.
                return;
            } else {
                // Restart from `my_best_epoch` to fix possible problems.
                latest_requested_epoch = my_best_epoch;
                if retry_count < EPOCH_SYNC_MAX_RETRY_COUNT {
                    retry_count += 1;
                }
            }
        }

        // Request epochs until the in-flight budget is used up, the gap limit
        // is hit, or we pass the median peer epoch (0 means "no peers").
        while self.request_manager.num_epochs_in_flight()
            < EPOCH_SYNC_MAX_INFLIGHT
            && latest_requested_epoch < my_best_epoch + sync_max_gap
            && (latest_requested_epoch < median_peer_epoch
                || median_peer_epoch == 0)
        {
            let from = cmp::max(my_best_epoch, latest_requested_epoch) + 1;
            // Check epochs from db
            if let Some(epoch_hashes) =
                self.graph.data_man.all_epoch_set_hashes_from_db(from)
            {
                debug!("Recovered epoch {} from db", from);
                if self.need_requesting_blocks() {
                    self.request_blocks(io, None, epoch_hashes);
                } else {
                    self.request_block_headers(
                        io,
                        None,
                        epoch_hashes,
                        true, /* ignore_db */
                    );
                }
                latest_requested_epoch = from;
                continue;
            } else if median_peer_epoch == 0 {
                // We have recovered all epochs from db, and there is no peer to
                // request new epochs, so we should enter `Latest` phase
                break;
            }

            // Epoch hashes are not in db, so should be requested from another
            // peer
            let peer = PeerFilter::new(msgid::GET_BLOCK_HASHES_BY_EPOCH)
                .with_min_best_epoch(from)
                .select(&self.syn);

            // no peer has the epoch we need; try later
            if peer.is_none() {
                break;
            }

            // Compute the exclusive end of the epoch range: limited by the
            // remaining in-flight budget, the batch size, and the chosen
            // peer's best epoch.
            let until = {
                let max_to_send = EPOCH_SYNC_MAX_INFLIGHT.saturating_sub(
                    self.request_manager.num_epochs_in_flight(),
                );
                let maybe_peer_info = self.syn.get_peer_info(&peer.unwrap());
                if maybe_peer_info.is_err() {
                    // The peer is disconnected after we chose it.
                    // `latest_requested` is not updated, so we just continue to
                    // try another peer.
                    continue;
                }

                let best_of_this_peer =
                    maybe_peer_info.unwrap().read().best_epoch;

                let until = from + cmp::min(EPOCH_SYNC_BATCH_SIZE, max_to_send);
                cmp::min(until, best_of_this_peer + 1)
            };

            let epochs = (from..until).collect::<Vec<u64>>();

            debug!(
                "requesting epochs [{}..{}]/{:?} from peer {:?}",
                from,
                until - 1,
                median_peer_epoch,
                peer
            );

            self.request_manager
                .request_epoch_hashes(io, peer, epochs, None);
            latest_requested_epoch = until - 1;
        }
        // Persist the updated epoch-sync state for the next invocation.
        *latest_requested = (
            latest_requested_epoch,
            Instant::now(),
            my_best_epoch,
            retry_count,
        );
    }
960
961    pub fn request_block_headers(
962        &self, io: &dyn NetworkContext, peer: Option<NodeId>,
963        mut header_hashes: Vec<H256>, ignore_db: bool,
964    ) {
965        if !ignore_db {
966            header_hashes
967                .retain(|hash| !self.try_request_header_from_db(io, hash));
968        }
969        // Headers may have been inserted into sync graph before as dependent
970        // blocks
971        header_hashes.retain(|h| !self.graph.contains_block_header(h));
972        self.request_manager.request_block_headers(
973            io,
974            peer,
975            header_hashes,
976            None,
977        );
978    }
979
980    /// Try to get the block header from db. Return `true` if the block header
981    /// exists in db or is inserted before. Handle the block header if its
982    /// seq_num is less than that of the current era genesis.
983    fn try_request_header_from_db(
984        &self, io: &dyn NetworkContext, hash: &H256,
985    ) -> bool {
986        if self.graph.contains_block_header(hash) {
987            return true;
988        }
989
990        if let Some(info) = self.graph.data_man.local_block_info_by_hash(hash) {
991            if info.get_status() == BlockStatus::Invalid {
992                // this block was invalid before
993                return true;
994            }
995            if info.get_seq_num()
996                < self.graph.consensus.current_era_genesis_seq_num()
997            {
998                debug!("Ignore header in old era hash={:?}, seq={}, cur_era_seq={}", hash, info.get_seq_num(), self.graph.consensus.current_era_genesis_seq_num());
999                // The block is ordered before current era genesis, so we do
1000                // not need to process it
1001                return true;
1002            }
1003
1004            if info.get_instance_id() == self.graph.data_man.get_instance_id() {
1005                // This block header has already entered consensus
1006                // graph in this run.
1007                return true;
1008            }
1009        }
1010
1011        // FIXME: If there is no block info in db, whether we need to fetch
1012        // block header from db?
1013        if let Some(header) = self.graph.data_man.block_header_by_hash(hash) {
1014            debug!("Recovered header {:?} from db", hash);
1015            // Process headers from db
1016            let mut block_headers_resp = GetBlockHeadersResponse::default();
1017            block_headers_resp.request_id = 0;
1018            let mut headers = Vec::new();
1019            headers.push((*header).clone());
1020            block_headers_resp.headers = headers;
1021
1022            let ctx = Context {
1023                node_id: io.self_node_id(),
1024                io,
1025                manager: self,
1026            };
1027
1028            ctx.send_response(&block_headers_resp)
1029                .expect("send response should not be error");
1030            return true;
1031        } else {
1032            return false;
1033        }
1034    }
1035
    /// Process a batch of recovered blocks from `task`: insert them (and, if
    /// needed, their headers) into the sync graph, settle request-manager
    /// bookkeeping, request missing dependencies, and relay new valid blocks.
    fn on_blocks_inner(
        &self, io: &dyn NetworkContext, task: RecoverPublicTask,
    ) -> Result<(), Error> {
        // Hashes of blocks that became valid and should be announced to peers.
        let mut need_to_relay = Vec::new();
        // Hashes considered successfully received for request bookkeeping.
        let mut received_blocks = HashSet::new();
        // Parent/referee hashes missing from the sync graph.
        let mut dependent_hashes = HashSet::new();
        for mut block in task.blocks {
            let hash = block.hash();
            if self.graph.contains_block(&hash) {
                // A block might be loaded from db and sent to the local queue
                // multiple times, but we should only process it and request its
                // dependence once.
                received_blocks.insert(hash);
                continue;
            }
            if !task.requested.contains(&hash) {
                warn!("Response has not requested block {:?}", hash);
                continue;
            }
            if let Err(e) = self.graph.data_man.recover_block(&mut block) {
                warn!("Recover block {:?} with error {:?}", hash, e);
                continue;
            }

            match self.graph.block_header_by_hash(&hash) {
                // Reuse the header instance already known to the sync graph.
                Some(header) => block.block_header = header,
                None => {
                    // Blocks may be synced directly without inserting headers
                    // before. We can only enter this case if we are catching
                    // up. We do not need to relay headers
                    // during catch-up.
                    let (insert_result, _) = self.graph.insert_block_header(
                        &mut block.block_header,
                        true,  // need_to_verify
                        false, // bench_mode
                        false, // insert_into_consensus
                        true,  // persistent
                    );
                    if !insert_result.should_process_body() {
                        // If the header is invalid or the block has been
                        // processed in consensus, we do not need to request the
                        // block, so just mark it
                        // received.
                        received_blocks.insert(hash);
                        continue;
                    }

                    // Request missing dependent blocks. This is needed because
                    // they may not be in any epoch_set because of out of stable
                    // era, so they will not be retrieved by
                    // request_epochs.
                    let parent = block.block_header.parent_hash();
                    if !self.graph.contains_block(parent) {
                        dependent_hashes.insert(*parent);
                    }
                    for referee in block.block_header.referee_hashes() {
                        if !self.graph.contains_block(referee) {
                            dependent_hashes.insert(*referee);
                        }
                    }
                }
            }
            let insert_result = self.graph.insert_block(
                block, true,  /* need_to_verify */
                true,  /* persistent */
                false, /* recover_from_db */
            );
            if insert_result.is_valid() {
                // The requested block is correctly received
                received_blocks.insert(hash);
            }
            if insert_result.should_relay() {
                need_to_relay.push(hash);
            }
        }
        // Pick a fallback peer for retrying blocks that were requested but
        // not received, excluding the peer whose response failed us.
        let mut filter =
            PeerFilter::new(msgid::GET_BLOCKS).exclude(task.failed_peer);
        if let Some(preferred_note_type) =
            self.preferred_peer_node_type_for_get_block()
        {
            filter = filter.with_preferred_node_type(preferred_note_type);
        }
        let chosen_peer = filter.select(&self.syn);
        self.blocks_received(
            io,
            task.requested,
            received_blocks.clone(),
            !task.compact,
            chosen_peer.clone(),
            task.delay,
            self.preferred_peer_node_type_for_get_block(),
        );
        if self.graph.inner.read().locked_for_catchup {
            // While locked for catch-up, only block bodies are fetched;
            // no dependency requests or relaying.
            self.request_block_bodies(io);
            Ok(())
        } else {
            // Only request dependencies that were not satisfied by this batch.
            let missing_dependencies = dependent_hashes
                .difference(&received_blocks)
                .map(Clone::clone)
                .collect();
            self.request_blocks(io, chosen_peer, missing_dependencies);
            self.relay_blocks(io, need_to_relay)
        }
    }
1140
1141    fn on_blocks_inner_task(
1142        &self, io: &dyn NetworkContext,
1143    ) -> Result<(), Error> {
1144        let task = self.recover_public_queue.pop().unwrap();
1145        let received_blocks: Vec<H256> =
1146            task.blocks.iter().map(|block| block.hash()).collect();
1147        self.request_manager
1148            .remove_net_inflight_blocks(received_blocks.iter());
1149        self.request_manager
1150            .remove_net_inflight_blocks(task.requested.iter());
1151        self.on_blocks_inner(io, task)
1152    }
1153
1154    fn on_local_message_task(&self, io: &dyn NetworkContext) {
1155        let task = self.local_message.pop().unwrap();
1156        self.on_message(io, &io.self_node_id(), task.message.as_slice());
1157    }
1158
1159    pub fn on_mined_block(&self, mut block: Block) {
1160        let hash = block.block_header.hash();
1161        info!("Mined block {:?} header={:?}", hash, block.block_header);
1162        let parent_hash = *block.block_header.parent_hash();
1163
1164        assert!(self.graph.contains_block_header(&parent_hash));
1165        if self.graph.contains_block_header(&hash) {
1166            warn!("Mined an duplicate block, the mining power is wasted!");
1167            return;
1168        }
1169        self.graph.insert_block_header(
1170            &mut block.block_header,
1171            false,
1172            false,
1173            false,
1174            true,
1175        );
1176        // Do not need to look at the result since this new block will be
1177        // broadcast to peers.
1178        self.graph.insert_block(
1179            block, false, /* need_to_verify */
1180            true,  /* persistent */
1181            false, /* recover_from_db */
1182        );
1183    }
1184
1185    fn broadcast_message(
1186        &self, io: &dyn NetworkContext, skip_id: &NodeId, msg: &dyn Message,
1187    ) -> Result<(), NetworkError> {
1188        let mut peer_ids: Vec<NodeId> = self
1189            .syn
1190            .peers
1191            .read()
1192            .keys()
1193            .filter(|&id| *id != *skip_id)
1194            .map(|x| *x)
1195            .collect();
1196
1197        let throttle_ratio = THROTTLING_SERVICE.read().get_throttling_ratio();
1198        let num_total = peer_ids.len();
1199        let num_allowed = (num_total as f64 * throttle_ratio) as usize;
1200
1201        if num_total > num_allowed {
1202            debug!("apply throttling for broadcast_message, total: {}, allowed: {}", num_total, num_allowed);
1203            peer_ids.shuffle(&mut random::new());
1204            peer_ids.truncate(num_allowed);
1205        }
1206
1207        // We only broadcast message which version matches the peer.
1208        // When there two version of the same message to broadcast,
1209        // and their valid versions are disjoint, each peer will receive
1210        // at most one of the message.
1211        let msg_version_introduced = msg.version_introduced();
1212        let mut msg_version_valid_till = msg.version_valid_till();
1213        if msg_version_valid_till == self.protocol_version {
1214            msg_version_valid_till = ProtocolVersion(std::u8::MAX);
1215        }
1216        for id in peer_ids {
1217            let peer_version = self.syn.get_peer_version(&id)?;
1218            if peer_version >= msg_version_introduced
1219                && peer_version <= msg_version_valid_till
1220            {
1221                msg.send(io, &id)?;
1222            }
1223        }
1224
1225        Ok(())
1226    }
1227
1228    fn produce_status_message_v2(&self) -> StatusV2 {
1229        let best_info = self.graph.consensus.best_info();
1230        let chain_id = ChainIdParamsDeprecated {
1231            chain_id: best_info.best_chain_id().in_native_space(),
1232        };
1233        let terminal_hashes = best_info.bounded_terminal_block_hashes.clone();
1234
1235        StatusV2 {
1236            chain_id,
1237            genesis_hash: self.graph.data_man.true_genesis.hash(),
1238            best_epoch: best_info.best_epoch_number,
1239            terminal_block_hashes: terminal_hashes,
1240        }
1241    }
1242
1243    fn produce_status_message_v3(&self) -> StatusV3 {
1244        let best_info = self.graph.consensus.best_info();
1245        let chain_id = ChainIdParamsDeprecated {
1246            chain_id: best_info.best_chain_id().in_native_space(),
1247        };
1248        let terminal_hashes = best_info.bounded_terminal_block_hashes.clone();
1249
1250        StatusV3 {
1251            chain_id,
1252            node_type: self.node_type(),
1253            genesis_hash: self.graph.data_man.true_genesis.hash(),
1254            best_epoch: best_info.best_epoch_number,
1255            terminal_block_hashes: terminal_hashes,
1256        }
1257    }
1258
1259    fn produce_heartbeat_message(&self) -> Heartbeat {
1260        let best_info = self.graph.consensus.best_info();
1261        let terminal_hashes = best_info.bounded_terminal_block_hashes.clone();
1262
1263        Heartbeat {
1264            best_epoch: best_info.best_epoch_number,
1265            terminal_block_hashes: terminal_hashes,
1266        }
1267    }
1268
1269    fn send_status(
1270        &self, io: &dyn NetworkContext, peer: &NodeId,
1271        peer_protocol_version: ProtocolVersion,
1272    ) -> Result<(), NetworkError> {
1273        if peer_protocol_version == SYNC_PROTO_V2 {
1274            let status_message = self.produce_status_message_v2();
1275            debug!("Sending status message to {}: {:?}", peer, status_message);
1276            status_message.send(io, peer)
1277        } else {
1278            let status_message = self.produce_status_message_v3();
1279            debug!("Sending status message to {}: {:?}", peer, status_message);
1280            status_message.send(io, peer)
1281        }
1282    }
1283
1284    fn broadcast_heartbeat(&self, io: &dyn NetworkContext) {
1285        let status_message = self.produce_status_message_v2();
1286        let heartbeat_message = self.produce_heartbeat_message();
1287        debug!("Broadcasting heartbeat message: {:?}", heartbeat_message);
1288
1289        if self
1290            .broadcast_message(io, &Default::default(), &heartbeat_message)
1291            .is_err()
1292        {
1293            warn!("Error broadcasting heartbeat message");
1294        }
1295        if self
1296            .broadcast_message(io, &Default::default(), &status_message)
1297            .is_err()
1298        {
1299            warn!("Error broadcasting status message");
1300        }
1301    }
1302
1303    pub fn relay_blocks(
1304        &self, io: &dyn NetworkContext, need_to_relay: Vec<H256>,
1305    ) -> Result<(), Error> {
1306        if !need_to_relay.is_empty() && !self.catch_up_mode() {
1307            let new_block_hash_msg: Box<dyn Message> =
1308                Box::new(NewBlockHashes {
1309                    block_hashes: need_to_relay.clone(),
1310                });
1311            self.broadcast_message(
1312                io,
1313                &Default::default(),
1314                new_block_hash_msg.as_ref(),
1315            )
1316            .unwrap_or_else(|e| {
1317                warn!("Error broadcasting blocks, err={:?}", e);
1318            });
1319
1320            self.light_provider
1321                .relay_hashes(need_to_relay)
1322                .unwrap_or_else(|e| {
1323                    warn!("Error relaying blocks to light provider: {:?}", e);
1324                });
1325        }
1326
1327        Ok(())
1328    }
1329
1330    fn select_peers_for_transactions(&self) -> Vec<NodeId> {
1331        let num_peers = self.syn.peers.read().len() as f64;
1332        let throttle_ratio = THROTTLING_SERVICE.read().get_throttling_ratio();
1333
1334        // min(sqrt(x)/x, throttle_ratio)
1335        let chosen_size = (num_peers.powf(-0.5).min(throttle_ratio) * num_peers)
1336            .round() as usize;
1337
1338        let num_peers = chosen_size
1339            .max(self.protocol_config.min_peers_tx_propagation)
1340            .min(self.protocol_config.max_peers_tx_propagation);
1341
1342        PeerFilter::new(msgid::TRANSACTION_DIGESTS)
1343            .with_cap(DynamicCapability::NormalPhase(true))
1344            .select_n(num_peers, &self.syn)
1345    }
1346
    /// Encode pending transactions as short ids (or full hashes for
    /// overflow groups) and send one `TransactionDigests` message to each of
    /// `lucky_peers`. On send failure, the transactions are re-queued for a
    /// later propagation round.
    fn propagate_transactions_to_peers(
        &self, io: &dyn NetworkContext, lucky_peers: Vec<NodeId>,
    ) {
        let _timer = MeterTimer::time_func(PROPAGATE_TX_TIMER.as_ref());
        if lucky_peers.is_empty() {
            return;
        }

        // Per-peer random key pair used when building short ids.
        // NOTE(review): the original comment here read "29 since the
        // remaining bytes is 29" — presumably a reference to the short-id
        // wire layout; verify against `TransactionDigests::append_short_id`.
        let mut nonces: Vec<(u64, u64)> = (0..lucky_peers.len())
            .map(|_| (rand::rng().random(), rand::rng().random()))
            .collect();

        // One short-id byte buffer per peer; the full-hash part is shared.
        let mut short_ids_part: Vec<Vec<u8>> = vec![vec![]; lucky_peers.len()];
        let mut tx_hashes_part: Vec<H256> = vec![];
        let (short_ids_transactions, tx_hashes_transactions) = {
            let mut transactions = self.get_to_propagate_trans();
            if transactions.is_empty() {
                return;
            }

            // Split candidates into short-id vs full-hash encoding, capped
            // by total serialized size.
            let mut total_tx_bytes = 0;
            let mut short_ids_transactions: Vec<Arc<SignedTransaction>> =
                Vec::new();
            let mut tx_hashes_transactions: Vec<Arc<SignedTransaction>> =
                Vec::new();

            let received_pool =
                self.request_manager.received_transactions.read();
            for (_, tx) in transactions.iter() {
                total_tx_bytes += tx.rlp_size();
                if total_tx_bytes >= MAX_TXS_BYTES_TO_PROPAGATE {
                    break;
                }
                // Transactions whose short-id group overflows are sent as
                // full hashes to avoid ambiguity.
                if received_pool.group_overflow_from_tx_hash(&tx.hash()) {
                    tx_hashes_transactions.push(tx.clone());
                } else {
                    short_ids_transactions.push(tx.clone());
                }
            }

            // If the size cap left some transactions unsent, keep only those
            // leftovers queued for the next round.
            if short_ids_transactions.len() + tx_hashes_transactions.len()
                != transactions.len()
            {
                for tx in short_ids_transactions.iter() {
                    transactions.remove(&tx.hash);
                }
                for tx in tx_hashes_transactions.iter() {
                    transactions.remove(&tx.hash);
                }
                self.set_to_propagate_trans(transactions);
            }

            (short_ids_transactions, tx_hashes_transactions)
        };
        debug!(
            "Send short ids:{}, Send tx hashes:{}",
            short_ids_transactions.len(),
            tx_hashes_transactions.len()
        );
        // Build each peer's short-id buffer with that peer's nonce pair.
        for tx in &short_ids_transactions {
            for i in 0..lucky_peers.len() {
                //consist of [one random position byte, and last three
                // bytes]
                TransactionDigests::append_short_id(
                    &mut short_ids_part[i],
                    nonces[i].0,
                    nonces[i].1,
                    &tx.hash(),
                );
            }
        }
        let mut sent_transactions = short_ids_transactions.clone();
        if !tx_hashes_transactions.is_empty() {
            TX_HASHES_PROPAGATE_METER.mark(tx_hashes_transactions.len());
            for tx in &tx_hashes_transactions {
                TransactionDigests::append_tx_hash(
                    &mut tx_hashes_part,
                    tx.hash(),
                );
            }
            sent_transactions.extend(tx_hashes_transactions.clone());
        }

        TX_PROPAGATE_METER.mark(sent_transactions.len());

        if sent_transactions.len() == 0 {
            return;
        }

        debug!(
            "Sent {} transaction ids to {} peers.",
            sent_transactions.len(),
            lucky_peers.len()
        );

        // Record the sent set so peers can later request full bodies by
        // (window_index, short id).
        let window_index = self
            .request_manager
            .append_sent_transactions(sent_transactions);

        let mut resend_flag = false;
        for i in 0..lucky_peers.len() {
            let peer_id = lucky_peers[i];
            // `nonces` and `short_ids_part` are popped from the back in the
            // same order, so key pair and short-id buffer stay paired (peer
            // `i` receives the entry built for index `len - 1 - i`).
            let (key1, key2) = nonces.pop().unwrap();
            let tx_msg = TransactionDigests::new(
                window_index,
                key1,
                key2,
                short_ids_part.pop().unwrap(),
                tx_hashes_part.clone(),
            );
            match tx_msg.send(io, &peer_id) {
                Ok(_) => {
                    trace!(
                        "{:02} <- Transactions ({} entries)",
                        peer_id,
                        tx_msg.len()
                    );
                }
                Err(e) => {
                    warn!(
                        "failed to propagate transaction ids to peer, id: {}, err: {}",
                        peer_id, e
                    );
                    resend_flag = true;
                }
            }
        }

        // If any send failed, re-queue everything for another attempt.
        if resend_flag {
            let mut resend_transactions: HashMap<H256, Arc<SignedTransaction>> =
                HashMap::new();
            for tx in short_ids_transactions {
                resend_transactions.insert(tx.hash, tx.clone());
            }
            for tx in tx_hashes_transactions {
                resend_transactions.insert(tx.hash, tx.clone());
            }
            self.set_to_propagate_trans(resend_transactions);
        }
    }
1488
    /// Process buffered future blocks whose timestamps have now been reached:
    /// insert their headers into the sync graph, request missing bodies from
    /// the peers that sent the headers, and relay newly valid headers.
    pub fn check_future_blocks(&self, io: &dyn NetworkContext) {
        let now_timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        let mut missed_body_block_hashes = HashMap::new();
        let mut need_to_relay = HashSet::new();
        // Headers whose timestamps are no longer in the future.
        let headers = self.graph.future_blocks.get_before(now_timestamp);

        if headers.is_empty() {
            return;
        }

        for (mut header, peer) in headers {
            let hash = header.hash();
            // Argument names follow the other `insert_block_header` call
            // sites in this file.
            let (insert_result, to_relay) = self.graph.insert_block_header(
                &mut header,
                true,  // need_to_verify
                false, // bench_mode
                self.insert_header_to_consensus(), // insert_into_consensus
                true,  // persistent
            );
            if insert_result.is_new_valid() {
                need_to_relay.extend(to_relay);

                // check block body
                if !self.graph.contains_block(&hash) {
                    // There are no duplicate headers in `future_blocks`, so
                    // using Vec for each peer is enough.
                    missed_body_block_hashes
                        .entry(peer)
                        .or_insert(Vec::new())
                        .push(hash);
                }
            }
        }

        for (peer, missing_hashes) in missed_body_block_hashes {
            // request missing blocks from the peer where we receive their
            // headers.
            self.request_missing_blocks(io, Some(peer), missing_hashes);
        }

        // relay if necessary
        self.relay_blocks(io, need_to_relay.into_iter().collect())
            .ok();
    }
1537
1538    /// If we are in `SyncHeaders` or `CatchUpCheckpoint` phase, we should
1539    /// insert graph-ready block headers to sync graph directly.
1540    /// For `CatchUpCheckpoint`, this is needed to move the target checkpoint,
1541    /// and avoid being blocked on a stale checkpoint that no peers can serve.
1542    pub fn insert_header_to_consensus(&self) -> bool {
1543        let current_phase = self.phase_manager.get_current_phase();
1544        matches!(
1545            current_phase.phase_type(),
1546            SyncPhaseType::CatchUpSyncBlockHeader
1547                | SyncPhaseType::CatchUpCheckpoint
1548        )
1549    }
1550
1551    pub fn propagate_new_transactions(&self, io: &dyn NetworkContext) {
1552        if self.syn.peers.read().is_empty() || self.catch_up_mode() {
1553            if self.protocol_config.dev_mode {
1554                // In single-node dev mode, just clear to_propagate_trans.
1555                self.get_to_propagate_trans();
1556            }
1557            return;
1558        }
1559
1560        let peers = self.select_peers_for_transactions();
1561        self.propagate_transactions_to_peers(io, peers);
1562    }
1563
1564    pub fn remove_expired_flying_request(&self, io: &dyn NetworkContext) {
1565        self.request_manager.resend_timeout_requests(io);
1566        let cancelled_requests = self.request_manager.resend_waiting_requests(
1567            io,
1568            !self.catch_up_mode(),
1569            self.need_block_from_archive_node(),
1570        );
1571        self.handle_cancelled_requests(cancelled_requests);
1572    }
1573
1574    /// Remove the blocks in `cancelled_requests` and their future set from sync
1575    /// graph, so if they are needed in the future, they will be requested
1576    /// again.
1577    fn handle_cancelled_requests(
1578        &self, cancelled_requests: Vec<Box<dyn Request>>,
1579    ) {
1580        let mut to_remove_blocks = HashSet::new();
1581        for request in cancelled_requests {
1582            // We do not need to handle `GetBlockHeaders` because if a header is
1583            // not received, the block will not exist in our sync
1584            // graph.
1585            if let Some(block_hashes) = try_get_block_hashes(&request) {
1586                for hash in block_hashes {
1587                    to_remove_blocks.insert(*hash);
1588                }
1589            }
1590        }
1591        self.graph.remove_blocks_and_future(&to_remove_blocks);
1592    }
1593
    /// Timer-driven entry point: broadcast heartbeat and status to all peers.
    pub fn send_heartbeat(&self, io: &dyn NetworkContext) {
        self.broadcast_heartbeat(io);
    }
1597
1598    fn gc(&self) {
1599        self.graph.data_man.cache_gc();
1600        self.graph
1601            .data_man
1602            .database_gc(self.graph.consensus.best_epoch_number())
1603    }
1604
1605    fn log_statistics(&self) { self.graph.log_statistics(); }
1606
    /// Forward the heartbeat-driven total-weight-delta update to the sync
    /// graph (driven by `TOTAL_WEIGHT_IN_PAST_TIMER`).
    fn update_total_weight_delta_heartbeat(&self) {
        self.graph.update_total_weight_delta_heartbeat();
    }
1610
1611    pub fn update_sync_phase(&self, io: &dyn NetworkContext) {
1612        match self.phase_manager_lock.try_lock() {
1613            Some(_pm_lock) => {
1614                self.phase_manager.try_initialize(io, self);
1615                loop {
1616                    // Allow multiple phase changes in one round.
1617                    let current_phase = self.phase_manager.get_current_phase();
1618                    let next_phase_type = current_phase.next(io, self);
1619                    if current_phase.phase_type() != next_phase_type {
1620                        // Phase changed
1621                        self.phase_manager.change_phase_to(
1622                            next_phase_type,
1623                            io,
1624                            self,
1625                        );
1626                    } else {
1627                        break;
1628                    }
1629                }
1630            }
1631            None => {
1632                debug!("update_sync_phase: phase_manager locked by another IO Worker");
1633                return;
1634            }
1635        }
1636
1637        let catch_up_mode = self.catch_up_mode();
1638        let mut need_notify = Vec::new();
1639        for (peer, state) in self.syn.peers.read().iter() {
1640            let mut state = state.write();
1641            if !state
1642                .notified_capabilities
1643                .contains(DynamicCapability::NormalPhase(!catch_up_mode))
1644            {
1645                state.received_transaction_count = 0;
1646                state
1647                    .notified_capabilities
1648                    .insert(DynamicCapability::NormalPhase(!catch_up_mode));
1649                need_notify.push(*peer);
1650            }
1651        }
1652        info!(
1653            "Catch-up mode: {}, latest epoch: {} missing_bodies: {}",
1654            catch_up_mode,
1655            self.graph.consensus.best_epoch_number(),
1656            self.graph.inner.read().block_to_fill_set.len()
1657        );
1658
1659        DynamicCapability::NormalPhase(!catch_up_mode)
1660            .broadcast_with_peers(io, need_notify);
1661    }
1662
1663    pub fn request_missing_blocks(
1664        &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
1665        hashes: Vec<H256>,
1666    ) {
1667        let catch_up_mode = self.catch_up_mode();
1668        if catch_up_mode {
1669            self.request_blocks(io, peer_id, hashes);
1670        } else {
1671            self.request_manager
1672                .request_compact_blocks(io, peer_id, hashes, None);
1673        }
1674    }
1675
1676    pub fn request_blocks(
1677        &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
1678        mut hashes: Vec<H256>,
1679    ) {
1680        hashes.retain(|hash| !self.already_processed(hash));
1681        self.request_blocks_without_check(io, peer_id, hashes)
1682    }
1683
1684    pub fn request_blocks_without_check(
1685        &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
1686        hashes: Vec<H256>,
1687    ) {
1688        let preferred_node_type = self.preferred_peer_node_type_for_get_block();
1689        self.request_manager.request_blocks(
1690            io,
1691            peer_id,
1692            hashes,
1693            self.request_block_need_public(),
1694            None,
1695            preferred_node_type,
1696        );
1697    }
1698
1699    /// Try to get the block from db. Return `true` if the block exists in db or
1700    /// is inserted before. Handle the block if its seq_num is less
1701    /// than that of the current era genesis.
1702    fn already_processed(&self, hash: &H256) -> bool {
1703        if self.graph.contains_block(hash) {
1704            return true;
1705        }
1706
1707        if let Some(info) = self.graph.data_man.local_block_info_by_hash(hash) {
1708            if info.get_status() == BlockStatus::Invalid {
1709                // this block is invalid before
1710                return true;
1711            }
1712            if info.get_seq_num()
1713                < self.graph.consensus.current_era_genesis_seq_num()
1714            {
1715                debug!(
1716                    "Ignore block in old era hash={:?}, seq={}, cur_era_seq={}",
1717                    hash,
1718                    info.get_seq_num(),
1719                    self.graph.consensus.current_era_genesis_seq_num()
1720                );
1721                // The block is ordered before current era genesis, so we do
1722                // not need to process it
1723                return true;
1724            }
1725
1726            if info.get_instance_id() == self.graph.data_man.get_instance_id() {
1727                // This block has already entered consensus graph
1728                // in this run.
1729                return true;
1730            }
1731        }
1732        return false;
1733    }
1734
    /// Thin wrapper over `RequestManager::blocks_received`, additionally
    /// supplying whether block requests should include sender public keys
    /// (see `request_block_need_public`).
    pub fn blocks_received(
        &self, io: &dyn NetworkContext, requested_hashes: HashSet<H256>,
        returned_blocks: HashSet<H256>, ask_full_block: bool,
        peer: Option<NodeId>, delay: Option<Duration>,
        preferred_node_type_for_block_request: Option<NodeType>,
    ) {
        self.request_manager.blocks_received(
            io,
            requested_hashes,
            returned_blocks,
            ask_full_block,
            peer,
            self.request_block_need_public(),
            delay,
            preferred_node_type_for_block_request,
        )
    }
1752
1753    fn request_block_need_public(&self) -> bool {
1754        self.catch_up_mode() && self.protocol_config.request_block_with_public
1755    }
1756
1757    pub fn expire_block_gc(
1758        &self, _io: &dyn NetworkContext, timeout: u64,
1759    ) -> Result<(), Error> {
1760        if self.in_recover_from_db_phase() {
1761            // In recover_from_db phase, this will be done at the end of
1762            // recovery.
1763            return Ok(());
1764        }
1765        self.graph.remove_expire_blocks(timeout);
1766        Ok(())
1767    }
1768
    /// Whether the queue of blocks waiting for public recovery is full.
    pub fn is_block_queue_full(&self) -> bool {
        self.recover_public_queue.is_full()
    }
1772}
1773
impl NetworkProtocolHandler for SynchronizationProtocolHandler {
    /// Oldest sync protocol version still accepted: current version minus
    /// `SYNCHRONIZATION_PROTOCOL_OLD_VERSIONS_TO_SUPPORT`, clamped below at
    /// `SYNC_PROTO_V1`.
    fn minimum_supported_version(&self) -> ProtocolVersion {
        let my_version = self.protocol_version.0;
        if my_version > SYNCHRONIZATION_PROTOCOL_OLD_VERSIONS_TO_SUPPORT {
            ProtocolVersion(
                my_version - SYNCHRONIZATION_PROTOCOL_OLD_VERSIONS_TO_SUPPORT,
            )
        } else {
            SYNC_PROTO_V1
        }
    }

    /// Register the periodic timers that drive the sync protocol. Each timer
    /// token registered here is dispatched in `on_timeout`.
    fn initialize(&self, io: &dyn NetworkContext) {
        io.register_timer(TX_TIMER, self.protocol_config.send_tx_period)
            .expect("Error registering transactions timer");
        io.register_timer(
            CHECK_REQUEST_TIMER,
            self.protocol_config.check_request_period,
        )
        .expect("Error registering check request timer");
        io.register_timer(
            HEARTBEAT_TIMER,
            self.protocol_config.heartbeat_period_interval,
        )
        .expect("Error registering heartbeat timer");
        io.register_timer(
            BLOCK_CACHE_GC_TIMER,
            self.protocol_config.block_cache_gc_period,
        )
        .expect("Error registering block_cache_gc timer");
        io.register_timer(
            CHECK_CATCH_UP_MODE_TIMER,
            self.protocol_config.check_phase_change_period,
        )
        .expect("Error registering check_catch_up_mode timer");
        io.register_timer(LOG_STATISTIC_TIMER, Duration::from_millis(5000))
            .expect("Error registering log_statistics timer");
        io.register_timer(
            TOTAL_WEIGHT_IN_PAST_TIMER,
            Duration::from_secs(BLOCK_PROPAGATION_DELAY * 2),
        )
        .expect("Error registering total_weight_in_past timer");
        io.register_timer(CHECK_PEER_HEARTBEAT_TIMER, Duration::from_secs(60))
            .expect("Error registering CHECK_PEER_HEARTBEAT_TIMER");
        io.register_timer(
            CHECK_FUTURE_BLOCK_TIMER,
            Duration::from_millis(1000),
        )
        .expect("Error registering CHECK_FUTURE_BLOCK_TIMER");
        io.register_timer(
            EXPIRE_BLOCK_GC_TIMER,
            self.protocol_config.expire_block_gc_period,
        )
        .expect("Error registering EXPIRE_BLOCK_GC_TIMER");
    }

    /// Queue a message originating from this node itself; it is processed
    /// asynchronously as a `LocalMessageTask` (see `on_work_dispatch`).
    fn send_local_message(&self, io: &dyn NetworkContext, message: Vec<u8>) {
        self.local_message
            .dispatch(io, LocalMessageTask { message });
    }

    /// Entry point for every raw network message from `peer`: decode the
    /// message id and payload, dispatch to the matching handler, and route
    /// any failure (including undecodable input) through `handle_error`.
    fn on_message(&self, io: &dyn NetworkContext, peer: &NodeId, raw: &[u8]) {
        let (msg_id, rlp) = match decode_msg(raw) {
            Some(msg) => msg,
            None => {
                return self.handle_error(
                    io,
                    peer,
                    msgid::INVALID,
                    Error::InvalidMessageFormat.into(),
                )
            }
        };

        debug!("on_message: peer={}, msgid={:?}", peer, msg_id);

        self.dispatch_message(io, peer, msg_id.into(), rlp)
            .unwrap_or_else(|e| self.handle_error(io, peer, msg_id.into(), e));

        // TODO: Only call when the message is a Response. But maybe not worth
        // doing since the check for available request_id is cheap.
        self.request_manager.send_pending_requests(io, peer);
    }

    /// Execute a task previously dispatched to a worker thread, selected by
    /// `work_type` (public recovery of blocks, or a local message).
    fn on_work_dispatch(
        &self, io: &dyn NetworkContext, work_type: HandlerWorkType,
    ) {
        if work_type == SyncHandlerWorkType::RecoverPublic as HandlerWorkType {
            if let Err(e) = self.on_blocks_inner_task(io) {
                warn!("Error processing RecoverPublic task: {:?}", e);
            }
        } else if work_type
            == SyncHandlerWorkType::LocalMessage as HandlerWorkType
        {
            self.on_local_message_task(io);
        } else {
            warn!("Unknown SyncHandlerWorkType");
        }
    }

    /// Start the handshake with a newly connected peer: send our status and,
    /// on success, record the peer in `handshaking_peers` with its protocol
    /// version and the current time; on failure, disconnect it.
    fn on_peer_connected(
        &self, io: &dyn NetworkContext, node_id: &NodeId,
        peer_protocol_version: ProtocolVersion,
        _pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
    ) {
        debug!(
            "Peer connected: peer={:?}, version={}",
            node_id, peer_protocol_version
        );
        if let Err(e) = self.send_status(io, node_id, peer_protocol_version) {
            debug!("Error sending status message: {:?}", e);
            io.disconnect_peer(
                node_id,
                Some(UpdateNodeOperation::Failure),
                "send status failed", /* reason */
            );
        } else {
            self.syn
                .handshaking_peers
                .write()
                .insert(*node_id, (peer_protocol_version, Instant::now()));
        }
    }

    /// Drop all bookkeeping for a disconnected peer: sync state, handshake
    /// state, in-flight requests, and state-sync tracking.
    fn on_peer_disconnected(&self, io: &dyn NetworkContext, peer: &NodeId) {
        debug!("Peer disconnected: peer={}", peer);
        self.syn.peers.write().remove(peer);
        self.syn.handshaking_peers.write().remove(peer);
        self.request_manager.on_peer_disconnected(io, peer);
        self.state_sync.on_peer_disconnected(&peer);
    }

    /// Dispatch a fired timer (registered in `initialize`) to its periodic
    /// task.
    fn on_timeout(&self, io: &dyn NetworkContext, timer: TimerToken) {
        trace!("Timeout: timer={:?}", timer);
        match timer {
            TX_TIMER => {
                self.propagate_new_transactions(io);
            }
            CHECK_FUTURE_BLOCK_TIMER => {
                self.check_future_blocks(io);
                self.graph.check_not_ready_frontier(
                    self.insert_header_to_consensus(),
                );
            }
            CHECK_REQUEST_TIMER => {
                self.remove_expired_flying_request(io);
            }
            HEARTBEAT_TIMER => {
                self.send_heartbeat(io);
            }
            BLOCK_CACHE_GC_TIMER => {
                self.gc();
            }
            CHECK_CATCH_UP_MODE_TIMER => {
                self.update_sync_phase(io);
            }
            LOG_STATISTIC_TIMER => {
                self.log_statistics();
            }
            TOTAL_WEIGHT_IN_PAST_TIMER => {
                self.update_total_weight_delta_heartbeat();
            }
            CHECK_PEER_HEARTBEAT_TIMER => {
                // Disconnect peers that have not sent a heartbeat within the
                // configured timeout.
                let timeout_peers = self.syn.get_heartbeat_timeout_peers(
                    self.protocol_config.heartbeat_timeout,
                );
                for peer in timeout_peers {
                    io.disconnect_peer(
                        &peer,
                        Some(UpdateNodeOperation::Failure),
                        "sync heartbeat timeout", /* reason */
                    );
                }
            }
            EXPIRE_BLOCK_GC_TIMER => {
                // remove expire blocks every `expire_block_gc_period`
                // TODO Parameterize this timeout.
                // Set to twice expire period to ensure that stale blocks will
                // exist in the frontier across two consecutive GC.
                self.expire_block_gc(
                    io,
                    self.protocol_config.sync_expire_block_timeout.as_secs(),
                )
                .ok();
            }
            _ => warn!("Unknown timer {} triggered.", timer),
        }
    }
}