1use super::{
6 random, request_manager::RequestManager, Error, SharedSynchronizationGraph,
7 SynchronizationState,
8};
9use crate::{
10 block_data_manager::BlockStatus,
11 light_protocol::Provider as LightProvider,
12 message::{decode_msg, Message, MsgId},
13 sync::{
14 message::{
15 handle_rlp_message, msgid, Context, DynamicCapability,
16 GetBlockHeadersResponse, Heartbeat, NewBlockHashes, StatusV2,
17 StatusV3, TransactionDigests,
18 },
19 request_manager::{try_get_block_hashes, Request},
20 state::SnapshotChunkSync,
21 synchronization_phases::{SyncPhaseType, SynchronizationPhaseManager},
22 synchronization_state::PeerFilter,
23 StateSyncConfiguration,
24 SYNCHRONIZATION_PROTOCOL_OLD_VERSIONS_TO_SUPPORT,
25 SYNCHRONIZATION_PROTOCOL_VERSION, SYNC_PROTO_V1, SYNC_PROTO_V2,
26 },
27 ConsensusGraph, NodeType,
28};
29use cfx_internal_common::ChainIdParamsDeprecated;
30use cfx_parameters::{block::MAX_BLOCK_SIZE_IN_BYTES, sync::*};
31use cfx_types::H256;
32use diem_types::validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey};
33use malloc_size_of::{new_malloc_size_ops, MallocSizeOf};
34use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
35use metrics::{register_meter_with_group, Meter, MeterTimer};
36use network::{
37 node_table::NodeId, service::ProtocolVersion,
38 throttling::THROTTLING_SERVICE, Error as NetworkError, HandlerWorkType,
39 NetworkContext, NetworkProtocolHandler, UpdateNodeOperation,
40};
41use parking_lot::{Mutex, RwLock};
42use primitives::{Block, BlockHeader, EpochId, SignedTransaction};
43use rand::{prelude::SliceRandom, Rng};
44use rlp::Rlp;
45use std::{
46 cmp::{self, min},
47 collections::{BTreeMap, HashMap, HashSet, VecDeque},
48 sync::Arc,
49 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
50};
51
// Process-wide meters for transaction propagation set sizes and timers for
// block recovery / transaction propagation work.
lazy_static! {
    // Number of full transactions selected for propagation per round.
    static ref TX_PROPAGATE_METER: Arc<dyn Meter> =
        register_meter_with_group("system_metrics", "tx_propagate_set_size");
    // Number of transaction hashes/digests selected for propagation per round.
    static ref TX_HASHES_PROPAGATE_METER: Arc<dyn Meter> =
        register_meter_with_group(
            "system_metrics",
            "tx_hashes_propagate_set_size"
        );
    // Timer around recovering public fields of received blocks.
    static ref BLOCK_RECOVER_TIMER: Arc<dyn Meter> =
        register_meter_with_group("timer", "sync:recover_block");
    // Timer around the periodic transaction propagation routine.
    static ref PROPAGATE_TX_TIMER: Arc<dyn Meter> =
        register_meter_with_group("timer", "sync:propagate_tx_timer");
}
65
/// Identifier for a network timer registered by this protocol handler.
type TimerToken = usize;

/// Periodic transaction propagation.
const TX_TIMER: TimerToken = 0;
/// Periodic check for timed-out requests.
const CHECK_REQUEST_TIMER: TimerToken = 1;
/// Periodic garbage collection of the block cache.
const BLOCK_CACHE_GC_TIMER: TimerToken = 2;
/// Periodic check whether the node is still in catch-up mode.
const CHECK_CATCH_UP_MODE_TIMER: TimerToken = 3;
/// Periodic statistics logging.
const LOG_STATISTIC_TIMER: TimerToken = 4;
/// Periodic total-weight-in-past bookkeeping.
const TOTAL_WEIGHT_IN_PAST_TIMER: TimerToken = 5;
/// Periodic peer heartbeat check.
const CHECK_PEER_HEARTBEAT_TIMER: TimerToken = 6;
/// Periodic check for future blocks whose timestamp became valid.
const CHECK_FUTURE_BLOCK_TIMER: TimerToken = 7;
/// Periodic garbage collection of expired blocks.
const EXPIRE_BLOCK_GC_TIMER: TimerToken = 8;
/// Periodic heartbeat broadcast.
const HEARTBEAT_TIMER: TimerToken = 9;
// Token 10 is skipped here — presumably reserved or removed; value 11 is
// kept unchanged for compatibility. TODO(review): confirm.
pub const CHECK_RPC_REQUEST_TIMER: TimerToken = 11;

/// Upper bound on the total bytes of transactions propagated per round.
const MAX_TXS_BYTES_TO_PROPAGATE: usize = 1024 * 1024; // 1 MB

/// Initial allowed gap (in epochs) between our best epoch and the highest
/// epoch requested during catch-up.
const EPOCH_SYNC_MAX_GAP_START: u64 = 20000;
/// Gap increase applied on each epoch-sync retry.
const EPOCH_SYNC_MAX_GAP_INCREASE: u64 = 5000;
/// Maximum number of epoch-sync retries counted for gap widening.
const EPOCH_SYNC_MAX_RETRY_COUNT: u64 = 20;
/// Seconds without progress before epoch sync restarts from the best epoch.
const EPOCH_SYNC_RESTART_TIMEOUT_S: u64 = 60 * 10;
/// Maximum number of in-flight epoch hash requests.
const EPOCH_SYNC_MAX_INFLIGHT: u64 = 300;
/// Number of epochs requested from a single peer per request.
const EPOCH_SYNC_BATCH_SIZE: u64 = 30;
/// Maximum number of in-flight block body requests.
const BLOCK_SYNC_MAX_INFLIGHT: usize = 1000;
96
/// Kinds of background work this handler dispatches to the network
/// service's worker threads. The discriminants are cast to
/// `HandlerWorkType` when constructing an `AsyncTaskQueue`.
#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub enum SyncHandlerWorkType {
    RecoverPublic = 1,
    LocalMessage = 2,
}
102
/// Size accounting for tasks placed in an `AsyncTaskQueue`.
pub trait TaskSize {
    /// Approximate in-memory size of the task in bytes, used for queue
    /// capacity accounting. Defaults to 0 (not counted against capacity).
    fn size(&self) -> usize { 0 }

    /// Number of items the task carries, used for the per-item moving
    /// average of task size. Defaults to 1.
    fn count(&self) -> usize { 1 }
}
108
/// A capacity-bounded FIFO queue of background tasks. Pushing a task also
/// notifies the network service so a worker thread picks it up.
pub struct AsyncTaskQueue<T: TaskSize> {
    inner: RwLock<AsyncTaskQueueInner<T>>,
    // Worker type passed to `NetworkContext::dispatch_work` on every push.
    work_type: HandlerWorkType,

    // Upper bound on the sum of queued task sizes (bytes); see `is_full`.
    max_capacity: usize,

    // Smoothing factor of the exponential moving average of per-item size.
    alpha: f64,
}
122
/// Mutable state of an `AsyncTaskQueue`, guarded by the queue's `RwLock`.
struct AsyncTaskQueueInner<T: TaskSize> {
    // Pending tasks in FIFO order.
    tasks: VecDeque<T>,
    // Sum of `TaskSize::size()` over all queued tasks, in bytes.
    size: usize,
    // Exponential moving average of per-item task size, in bytes.
    moving_average: f64,
}
128
129impl<T: TaskSize> AsyncTaskQueue<T> {
130 fn new(work_type: SyncHandlerWorkType, max_capacity: usize) -> Self {
131 AsyncTaskQueue {
132 inner: RwLock::new(AsyncTaskQueueInner {
133 tasks: VecDeque::new(),
134 size: 0,
135 moving_average: MAX_BLOCK_SIZE_IN_BYTES as f64,
138 }),
139 work_type: work_type as HandlerWorkType,
140 max_capacity,
141 alpha: 0.001,
143 }
144 }
145
146 pub fn dispatch(&self, io: &dyn NetworkContext, task: T) {
147 let mut inner = self.inner.write();
148 inner.size += task.size();
149 if task.count() != 0 {
151 inner.moving_average = self.alpha
152 * (task.size() / task.count()) as f64
153 + (1.0 - self.alpha) * inner.moving_average;
154 }
155 io.dispatch_work(self.work_type);
156 inner.tasks.push_back(task);
157 trace!(
158 "AsyncTaskQueue dispatch: size={} average={}",
159 inner.size,
160 inner.moving_average,
161 );
162 }
163
164 fn pop(&self) -> Option<T> {
165 let mut inner = self.inner.write();
166 let task = inner.tasks.pop_front();
167 task.as_ref().map(|task| {
168 inner.size -= task.size();
169 });
170 trace!(
171 "AsyncTaskQueue pop: size={} average={}",
172 inner.size,
173 inner.moving_average,
174 );
175 task
176 }
177
178 fn size(&self) -> usize { self.inner.read().size }
179
180 pub fn is_full(&self) -> bool { self.size() >= self.max_capacity }
181
182 pub fn estimated_available_count(&self) -> usize {
184 let inner = self.inner.read();
185 if inner.size >= self.max_capacity {
186 0
187 } else if inner.moving_average != 0.0 {
188 ((self.max_capacity - inner.size) as f64 / inner.moving_average)
189 as usize
190 } else {
191 self.max_capacity
193 }
194 }
195}
196
/// Background task carrying blocks received from a peer whose contents
/// still need recovery (via `data_man.recover_block`) before insertion
/// into the sync graph; processed in `on_blocks_inner`.
#[derive(DeriveMallocSizeOf)]
pub struct RecoverPublicTask {
    blocks: Vec<Block>,
    // Hashes originally requested from the peer; used to reject
    // unsolicited blocks and to re-request whatever is still missing.
    requested: HashSet<H256>,
    // Optional delay applied when re-requesting missing blocks.
    delay: Option<Duration>,
    // Peer that produced this (possibly incomplete) response; excluded
    // from follow-up requests.
    failed_peer: NodeId,
    // Whether the blocks came from compact-block responses.
    compact: bool,
}
205
206impl RecoverPublicTask {
207 pub fn new(
208 blocks: Vec<Block>, requested: HashSet<H256>, failed_peer: NodeId,
209 compact: bool, delay: Option<Duration>,
210 ) -> Self {
211 RecoverPublicTask {
212 blocks,
213 requested,
214 failed_peer,
215 compact,
216 delay,
217 }
218 }
219}
220
impl TaskSize for RecoverPublicTask {
    /// Measured heap size of the task plus its inline struct size, so the
    /// queue's byte-capacity accounting reflects actual memory use.
    fn size(&self) -> usize {
        let mut ops = new_malloc_size_ops();
        self.size_of(&mut ops) + std::mem::size_of::<Self>()
    }

    /// One item per carried block.
    fn count(&self) -> usize { self.blocks.len() }
}
229
/// A raw protocol message generated by the local node itself, queued so it
/// flows through the same handling pipeline as remote messages.
pub struct LocalMessageTask {
    // Encoded message bytes, fed back into `on_message`.
    message: Vec<u8>,
}

// Uses the `TaskSize` defaults: size 0 (not counted against the queue's
// byte capacity) and count 1.
impl TaskSize for LocalMessageTask {}
235
/// State of `FutureBlockContainer`: headers buffered until their block
/// timestamp becomes valid.
struct FutureBlockContainerInner {
    // Maximum number of buffered headers before eviction kicks in.
    capacity: usize,
    // Current number of buffered headers.
    size: usize,
    // Header hashes grouped by block timestamp.
    container: BTreeMap<u64, HashSet<H256>>,

    // Header and originating peer per hash; kept consistent with
    // `container` (inserted/removed together).
    hash_to_header_and_peer: HashMap<H256, (BlockHeader, NodeId)>,
}
247
248impl FutureBlockContainerInner {
249 pub fn new(capacity: usize) -> Self {
250 FutureBlockContainerInner {
251 capacity,
252 size: 0,
253 container: BTreeMap::new(),
254 hash_to_header_and_peer: Default::default(),
255 }
256 }
257}
258
/// Thread-safe buffer for block headers whose timestamps lie in the
/// future; they are drained via `get_before` once their time has come.
pub struct FutureBlockContainer {
    inner: RwLock<FutureBlockContainerInner>,
}
262
impl FutureBlockContainer {
    /// Creates a container buffering at most `capacity` headers.
    pub fn new(capacity: usize) -> Self {
        FutureBlockContainer {
            inner: RwLock::new(FutureBlockContainerInner::new(capacity)),
        }
    }

    /// Buffers `header` (received from `peer`) keyed by its timestamp.
    /// Duplicates are ignored. When over capacity, one header from the
    /// largest timestamp slot is evicted — presumably because the block
    /// furthest in the future is the least urgent to keep.
    pub fn insert(&self, header: BlockHeader, peer: NodeId) {
        let inner = &mut *self.inner.write();
        let header_hash = header.hash();
        if inner.hash_to_header_and_peer.contains_key(&header_hash) {
            return;
        }
        let entry = inner
            .container
            .entry(header.timestamp())
            .or_insert(HashSet::new());
        if !entry.contains(&header_hash) {
            entry.insert(header_hash);
            inner
                .hash_to_header_and_peer
                .insert(header_hash, (header, peer));
            inner.size += 1;
        }

        if inner.size > inner.capacity {
            // Walk timestamp slots from largest to smallest, evict one
            // header from the first non-empty slot, and record any empty
            // slots encountered for cleanup below.
            let mut removed = false;
            let mut empty_slots = Vec::new();
            for entry in inner.container.iter_mut().rev() {
                if entry.1.is_empty() {
                    empty_slots.push(*entry.0);
                    continue;
                }

                let hash = *entry.1.iter().next().unwrap();
                entry.1.remove(&hash);
                inner.hash_to_header_and_peer.remove(&hash);
                removed = true;

                if entry.1.is_empty() {
                    empty_slots.push(*entry.0);
                }
                break;
            }

            if removed {
                inner.size -= 1;
            }

            // The BTreeMap cannot be mutated while iterating above, so
            // empty slots are removed in a second pass.
            for slot in empty_slots {
                inner.container.remove(&slot);
            }
        }
    }

    /// Drains and returns all buffered headers with timestamp
    /// `<= timestamp`, paired with the peer that delivered each.
    pub fn get_before(&self, timestamp: u64) -> Vec<(BlockHeader, NodeId)> {
        let mut inner = self.inner.write();
        let mut result = Vec::new();

        loop {
            // Peek at the smallest remaining timestamp slot.
            let slot = if let Some(entry) = inner.container.iter().next() {
                Some(*entry.0)
            } else {
                None
            };

            if slot.is_none() || slot.unwrap() > timestamp {
                break;
            }

            let entry = inner.container.remove(&slot.unwrap()).unwrap();

            for header_hash in entry {
                result.push(inner.hash_to_header_and_peer.remove(&header_hash).expect(
                    "hash and header are inserted/removed together atomically",
                ));
            }
        }

        result
    }

    /// Whether a header with `header_hash` is currently buffered.
    pub fn contains(&self, header_hash: &H256) -> bool {
        self.inner
            .read()
            .hash_to_header_and_peer
            .contains_key(header_hash)
    }
}
352
/// Top-level handler for the block synchronization protocol: owns the sync
/// graph handle, peer state, request manager, worker queues and the
/// synchronization phase machinery.
#[derive(DeriveMallocSizeOf)]
pub struct SynchronizationProtocolHandler {
    pub protocol_version: ProtocolVersion,

    pub protocol_config: ProtocolConfiguration,
    pub graph: SharedSynchronizationGraph,
    pub syn: Arc<SynchronizationState>,
    pub request_manager: Arc<RequestManager>,
    // (latest requested epoch, time of that request, our best epoch at
    // that time, retry count) — bookkeeping used by `request_epochs`.
    pub latest_epoch_requested: Mutex<(u64, Instant, u64, u64)>,
    #[ignore_malloc_size_of = "only stores reference to others"]
    pub phase_manager: SynchronizationPhaseManager,
    // Mutex used purely as a lock around phase changes; the u32 payload
    // appears unused in this file. NOTE(review): confirm at call sites.
    pub phase_manager_lock: Mutex<u32>,

    // Queue of received blocks awaiting public-field recovery; shared with
    // the request manager.
    #[ignore_malloc_size_of = "channels are not handled in MallocSizeOf"]
    pub recover_public_queue: Arc<AsyncTaskQueue<RecoverPublicTask>>,

    // Queue of messages generated by the local node itself.
    #[ignore_malloc_size_of = "channels are not handled in MallocSizeOf"]
    local_message: AsyncTaskQueue<LocalMessageTask>,

    #[ignore_malloc_size_of = "not used on archive nodes"]
    pub state_sync: Arc<SnapshotChunkSync>,

    // Epoch id associated with a completed state sync, if any.
    // NOTE(review): inferred from the name — confirm where it is set.
    pub synced_epoch_id: Mutex<Option<EpochId>>,

    light_provider: Arc<LightProvider>,
}
387
/// Tunable parameters of the synchronization protocol, typically populated
/// from node configuration. Field meanings follow their names; see the
/// sites where each is read for exact semantics.
#[derive(Clone, Default, DeriveMallocSizeOf)]
pub struct ProtocolConfiguration {
    pub is_consortium: bool,
    // Periods for the various registered timers.
    pub send_tx_period: Duration,
    pub check_request_period: Duration,
    pub check_phase_change_period: Duration,
    pub heartbeat_period_interval: Duration,
    pub heartbeat_timeout: Duration,
    pub block_cache_gc_period: Duration,
    pub expire_block_gc_period: Duration,
    pub sync_expire_block_timeout: Duration,
    // Per-request-type timeouts.
    pub headers_request_timeout: Duration,
    pub blocks_request_timeout: Duration,
    pub transaction_request_timeout: Duration,
    pub snapshot_candidate_request_timeout: Duration,
    pub snapshot_manifest_request_timeout: Duration,
    pub snapshot_chunk_request_timeout: Duration,
    pub tx_maintained_for_peer_timeout: Duration,
    pub max_inflight_request_count: u64,
    pub received_tx_index_maintain_timeout: Duration,
    pub inflight_pending_tx_index_maintain_timeout: Duration,
    pub request_block_with_public: bool,
    pub max_trans_count_received_in_catch_up: u64,
    // Transaction propagation fan-out bounds.
    pub min_peers_tx_propagation: usize,
    pub max_peers_tx_propagation: usize,
    // State (snapshot chunk) sync limits.
    pub max_downloading_chunks: usize,
    pub max_downloading_chunk_attempts: usize,
    pub test_mode: bool,
    pub dev_mode: bool,
    pub throttling_config_file: Option<String>,
    pub chunk_size_byte: u64,
    // Peer timeout observation / demotion policy.
    pub timeout_observing_period_s: u64,
    pub max_allowed_timeout_in_observing_period: u64,
    pub demote_peer_for_timeout: bool,
    pub max_unprocessed_block_size: usize,
    pub max_chunk_number_in_manifest: usize,
    pub allow_phase_change_without_peer: bool,
    pub min_phase_change_normal_peer_count: usize,
    pub pos_genesis_pivot_decision: H256,
    pub check_status_genesis: bool,

    pub pos_started_as_voter: bool,
}
431
432impl SynchronizationProtocolHandler {
    /// Constructs the handler together with its shared sub-components:
    /// synchronization state, recover-public queue, request manager,
    /// snapshot-chunk state sync and the phase manager.
    pub fn new(
        node_type: NodeType, protocol_config: ProtocolConfiguration,
        state_sync_config: StateSyncConfiguration,
        initial_sync_phase: SyncPhaseType,
        sync_graph: SharedSynchronizationGraph,
        light_provider: Arc<LightProvider>, consensus: Arc<ConsensusGraph>,
    ) -> Self {
        let sync_state = Arc::new(SynchronizationState::new(
            protocol_config.is_consortium,
            node_type,
            protocol_config.allow_phase_change_without_peer,
            protocol_config.min_phase_change_normal_peer_count,
        ));
        // Shared with the request manager so it can hand received blocks
        // to the recover-public worker.
        let recover_public_queue = Arc::new(AsyncTaskQueue::new(
            SyncHandlerWorkType::RecoverPublic,
            protocol_config.max_unprocessed_block_size,
        ));
        let request_manager = Arc::new(RequestManager::new(
            &protocol_config,
            sync_state.clone(),
            recover_public_queue.clone(),
        ));

        let state_sync = Arc::new(SnapshotChunkSync::new(state_sync_config));

        Self {
            protocol_version: SYNCHRONIZATION_PROTOCOL_VERSION,
            protocol_config,
            graph: sync_graph.clone(),
            syn: sync_state.clone(),
            request_manager,
            latest_epoch_requested: Mutex::new((0, Instant::now(), 0, 0)),
            phase_manager: SynchronizationPhaseManager::new(
                initial_sync_phase,
                sync_state.clone(),
                sync_graph.clone(),
                state_sync.clone(),
                consensus,
            ),
            phase_manager_lock: Mutex::new(0),
            recover_public_queue,
            // Effectively unbounded capacity: local messages should never
            // be dropped for queue pressure.
            local_message: AsyncTaskQueue::new(
                SyncHandlerWorkType::LocalMessage,
                10000000000,
            ),
            state_sync,
            synced_epoch_id: Default::default(),
            light_provider,
        }
    }
483
484 pub fn node_type(&self) -> NodeType {
485 if self.syn.is_full_node() {
486 NodeType::Full
487 } else {
488 NodeType::Archive
489 }
490 }
491
492 pub fn is_consortium(&self) -> bool { self.protocol_config.is_consortium }
493
    /// Takes the set of transactions currently pending propagation from
    /// the sync graph.
    fn get_to_propagate_trans(&self) -> HashMap<H256, Arc<SignedTransaction>> {
        self.graph.get_to_propagate_trans()
    }
497
    /// Stores `transactions` as the set pending propagation in the sync
    /// graph (e.g. leftovers from a throttled propagation round).
    fn set_to_propagate_trans(
        &self, transactions: HashMap<H256, Arc<SignedTransaction>>,
    ) {
        self.graph.set_to_propagate_trans(transactions);
    }
503
504 pub fn catch_up_mode(&self) -> bool {
505 self.phase_manager.get_current_phase().phase_type()
506 != SyncPhaseType::Normal
507 }
508
509 pub fn in_recover_from_db_phase(&self) -> bool {
510 let current_phase = self.phase_manager.get_current_phase();
511 current_phase.phase_type()
512 == SyncPhaseType::CatchUpRecoverBlockHeaderFromDB
513 || current_phase.phase_type()
514 == SyncPhaseType::CatchUpFillBlockBodyPhase
515 }
516
517 pub fn need_requesting_blocks(&self) -> bool {
518 let current_phase = self.phase_manager.get_current_phase();
519 current_phase.phase_type() == SyncPhaseType::CatchUpSyncBlock
520 || current_phase.phase_type() == SyncPhaseType::Normal
521 }
522
523 pub fn need_block_from_archive_node(&self) -> bool {
524 let current_phase = self.phase_manager.get_current_phase();
525 current_phase.phase_type() == SyncPhaseType::CatchUpSyncBlock
526 && !self.syn.is_full_node()
527 }
528
529 pub fn preferred_peer_node_type_for_get_block(&self) -> Option<NodeType> {
530 if self.need_block_from_archive_node() {
531 Some(NodeType::Archive)
532 } else {
533 None
534 }
535 }
536
    /// Returns a shared handle to the synchronization graph.
    pub fn get_synchronization_graph(&self) -> SharedSynchronizationGraph {
        self.graph.clone()
    }
540
    /// Returns a shared handle to the request manager.
    pub fn get_request_manager(&self) -> Arc<RequestManager> {
        self.request_manager.clone()
    }
544
    /// Forwards transactions received from peers to the request manager's
    /// received-transaction bookkeeping.
    pub fn append_received_transactions(
        &self, transactions: Vec<Arc<SignedTransaction>>,
    ) {
        self.request_manager
            .append_received_transactions(transactions);
    }
551
    /// Routes a decoded message to its protocol handler.
    ///
    /// Messages from unknown, non-handshaking peers are silently dropped,
    /// except for the STATUS handshake messages themselves; messages from
    /// known peers refresh that peer's heartbeat. An unrecognized message
    /// id results in disconnecting the sender.
    fn dispatch_message(
        &self, io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, rlp: Rlp,
    ) -> Result<(), Error> {
        trace!("Dispatching message: peer={:?}, msg_id={:?}", peer, msg_id);
        if !io.is_peer_self(peer) {
            if !self.syn.contains_peer(peer) {
                debug!(
                    "dispatch_message: Peer does not exist: peer={} msg_id={}",
                    peer, msg_id
                );
                // Only STATUS messages are acceptable from a peer that is
                // still handshaking; anything else is ignored.
                if !self.syn.handshaking_peers.read().contains_key(peer)
                    || (msg_id != msgid::STATUS_V3
                        && msg_id != msgid::STATUS_V2)
                {
                    debug!("Message from unknown peer {:?}", msg_id);
                    return Ok(());
                }
            } else {
                self.syn.update_heartbeat(peer);
            }
        }

        let ctx = Context {
            node_id: *peer,
            io,
            manager: self,
        };

        // `handle_rlp_message` returns false when the id is not a known
        // sync-protocol message.
        if !handle_rlp_message(msg_id, &ctx, &rlp)? {
            warn!("Unknown message: peer={:?} msgid={:?}", peer, msg_id);
            let reason =
                format!("unknown sync protocol message id {:?}", msg_id);
            io.disconnect_peer(
                peer,
                Some(UpdateNodeOperation::Remove),
                reason.as_str(),
            );
        }

        Ok(())
    }
596
    /// Maps a message-handling error onto logging and peer management.
    ///
    /// Each error variant decides three things: whether to log at `warn`
    /// (vs `debug`), whether to disconnect the peer, and which
    /// `UpdateNodeOperation` (Failure / Demotion / Remove) the node table
    /// should apply on disconnect.
    fn handle_error(
        &self, io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, e: Error,
    ) {
        // Defaults: disconnect with a warning and no node-table operation;
        // the match below overrides these per error variant.
        let mut disconnect = true;
        let mut warn = true;
        let reason = format!("{}", e);
        let error_reason = format!("{:?}", e);
        let mut op = None;

        match e {
            Error::InvalidBlock => op = Some(UpdateNodeOperation::Failure),
            Error::InvalidGetBlockTxn(_) => {
                op = Some(UpdateNodeOperation::Demotion)
            }
            Error::InvalidStatus(_) => op = Some(UpdateNodeOperation::Demotion),
            Error::InvalidMessageFormat => {
                op = Some(UpdateNodeOperation::Remove)
            }
            Error::UnknownPeer => {
                warn = false;
                op = Some(UpdateNodeOperation::Failure)
            }
            Error::UnexpectedResponse => {
                op = Some(UpdateNodeOperation::Demotion)
            }
            // Benign conditions (e.g. a response arriving after its
            // request expired) neither warn nor disconnect.
            Error::RequestNotFound => {
                disconnect = false;
                warn = false;
            }
            Error::InCatchUpMode(_) => {
                disconnect = false;
                warn = false;
            }
            Error::TooManyTrans => {}
            Error::InvalidTimestamp => op = Some(UpdateNodeOperation::Demotion),
            Error::InvalidSnapshotManifest(_) => {
                op = Some(UpdateNodeOperation::Demotion)
            }
            Error::InvalidSnapshotChunk(_) => {
                op = Some(UpdateNodeOperation::Demotion)
            }
            Error::EmptySnapshotChunk => disconnect = false,
            Error::AlreadyThrottled(_) => {
                op = Some(UpdateNodeOperation::Remove)
            }
            Error::Throttled(_, msg) => {
                disconnect = false;

                // Notify the peer it is throttled; if even that fails,
                // fall back to disconnecting it.
                if let Err(e) = msg.send(io, peer) {
                    error!("failed to send throttled packet: {:?}", e);
                    disconnect = true;
                }
            }
            Error::Decoder(_) => op = Some(UpdateNodeOperation::Remove),
            Error::Io(_) => disconnect = false,
            Error::Network(kind) => match kind {
                network::Error::SendUnsupportedMessage { .. } => {
                    unreachable!(
                        "This is a bug in protocol version maintenance. {:?}",
                        kind
                    );
                }

                network::Error::MessageDeprecated { .. } => {
                    op = Some(UpdateNodeOperation::Failure);
                    error!(
                        "Peer sent us a deprecated message {:?}. Either it's a bug \
                        in protocol version maintenance or the peer is malicious.",
                        kind
                    );
                }

                network::Error::AddressParse => disconnect = false,
                network::Error::AddressResolve(_) => disconnect = false,
                network::Error::Auth => disconnect = false,
                network::Error::BadProtocol => {
                    op = Some(UpdateNodeOperation::Remove)
                }
                network::Error::BadAddr => disconnect = false,
                network::Error::Decoder(_) => {
                    op = Some(UpdateNodeOperation::Remove)
                }
                network::Error::Expired => disconnect = false,
                network::Error::Disconnect(_) => disconnect = false,
                network::Error::InvalidNodeId => disconnect = false,
                network::Error::OversizedPacket => disconnect = false,
                network::Error::Io(_) => disconnect = false,
                network::Error::Throttling(_) => disconnect = false,
                network::Error::SocketIo(_) => {
                    op = Some(UpdateNodeOperation::Failure)
                }
                network::Error::Msg(_) => {
                    op = Some(UpdateNodeOperation::Failure)
                }
            },
            Error::Storage(_) => disconnect = false,
            Error::Msg(_) => op = Some(UpdateNodeOperation::Failure),
            Error::InternalError(_) => {}
            Error::RpcCancelledByDisconnection => {}
            Error::RpcTimeout => {}
            Error::UnexpectedMessage(_) => {
                op = Some(UpdateNodeOperation::Remove)
            }
            Error::NotSupported(_) => disconnect = false,
        }

        if warn {
            warn!(
                "Error while handling message, peer={}, msgid={:?}, error={}",
                peer, msg_id, error_reason
            );
        } else {
            debug!(
                "Minor error while handling message, peer={}, msgid={:?}, error={}",
                peer, msg_id, error_reason
            );
        }

        if disconnect {
            io.disconnect_peer(peer, op, reason.as_str());
        }
    }
726
727 pub fn start_sync(&self, io: &dyn NetworkContext) {
728 let current_phase_type =
729 self.phase_manager.get_current_phase().phase_type();
730 if current_phase_type == SyncPhaseType::CatchUpRecoverBlockHeaderFromDB
731 || current_phase_type == SyncPhaseType::CatchUpFillBlockBodyPhase
732 {
733 return;
734 }
735
736 if current_phase_type != SyncPhaseType::Normal {
737 self.request_epochs(io);
738 let best_peer_epoch = self.syn.best_peer_epoch().unwrap_or(0);
739 let my_best_epoch = self.graph.consensus.best_epoch_number();
740 if my_best_epoch + REQUEST_TERMINAL_EPOCH_LAG_THRESHOLD
741 >= best_peer_epoch
742 {
743 self.request_missing_terminals(io);
744 }
745 } else {
746 self.request_missing_terminals(io);
747 }
748 }
749
    /// Requests, from each connected peer, the terminal block headers that
    /// peer has announced and that we do not yet have locally.
    pub fn request_missing_terminals(&self, io: &dyn NetworkContext) {
        let peers: Vec<NodeId> =
            self.syn.peers.read().keys().cloned().collect();

        // Hashes already requested from an earlier peer in this round.
        let mut requested = HashSet::new();

        let (_, era_genesis_height) =
            self.graph.get_genesis_hash_and_height_in_current_era();
        for peer in peers {
            if let Ok(info) = self.syn.get_peer_info(&peer) {
                // Peers behind our current era genesis can only offer
                // old-era terminals; skip them.
                if info.read().best_epoch < era_genesis_height {
                    continue;
                }
                // Take (and clear) the hashes this peer has announced.
                let terminals = {
                    let mut info = info.write();
                    let ts = info.latest_block_hashes.clone();
                    info.latest_block_hashes.clear();
                    ts
                };

                let to_request = terminals
                    .difference(&requested)
                    .filter(|h| !self.graph.contains_block_header(&h))
                    .cloned()
                    .collect::<Vec<H256>>();

                if terminals.len() > 0 {
                    debug!("Requesting terminals {:?}", to_request);
                }

                // ignore_db=true: skip the local-db lookup for these
                // headers. NOTE(review): presumably because announced
                // terminals are new tips that cannot be local — confirm.
                self.request_block_headers(
                    io,
                    Some(peer),
                    to_request.clone(),
                    true,
                );

                requested.extend(to_request);
            }
        }

        if requested.len() > 0 {
            debug!("{:?} missing terminal block(s) requested", requested.len());
        }
    }
802
803 pub fn request_block_bodies(&self, io: &dyn NetworkContext) {
805 let in_flight_blocks = self.request_manager.in_flight_blocks();
806 let to_request_blocks: Vec<_> = self
807 .graph
808 .inner
809 .read()
810 .block_to_fill_set
811 .difference(&in_flight_blocks)
812 .copied()
813 .collect();
814 let n_blocks_to_request = min(
815 BLOCK_SYNC_MAX_INFLIGHT - in_flight_blocks.len(),
816 to_request_blocks.len(),
817 );
818
819 for block_chunk in to_request_blocks[0..n_blocks_to_request]
822 .chunks(MAX_BLOCKS_TO_SEND as usize)
823 {
824 self.request_blocks_without_check(io, None, block_chunk.to_vec());
825 }
826 }
827
    /// Requests epoch hash sets from peers (or recovers them from the
    /// local db) to drive catch-up synchronization.
    ///
    /// Bookkeeping lives in `latest_epoch_requested`: (latest requested
    /// epoch, time of last request, our best epoch at that time, retry
    /// count). The allowed request window above our best epoch widens with
    /// the retry count, and the whole process restarts from our best epoch
    /// after `EPOCH_SYNC_RESTART_TIMEOUT_S` without progress.
    pub fn request_epochs(&self, io: &dyn NetworkContext) {
        let mut latest_requested = self.latest_epoch_requested.lock();

        let median_peer_epoch =
            self.syn.median_epoch_from_normal_peers().unwrap_or(0);
        let my_best_epoch = self.graph.consensus.best_epoch_number();
        let (
            mut latest_requested_epoch,
            latest_request_time,
            old_best_epoch,
            mut retry_count,
        ) = *latest_requested;
        // Progress since the last call resets the retry counter.
        if old_best_epoch != my_best_epoch {
            retry_count = 0;
        }

        // Allowed gap between our best epoch and the highest epoch we may
        // request; widens with each retry.
        let sync_max_gap = EPOCH_SYNC_MAX_GAP_START
            + EPOCH_SYNC_MAX_GAP_INCREASE * retry_count;
        if latest_requested_epoch >= my_best_epoch + sync_max_gap {
            if latest_request_time.elapsed()
                < Duration::from_secs(EPOCH_SYNC_RESTART_TIMEOUT_S)
            {
                // Window exhausted and the last request is still recent;
                // wait for responses before asking for more.
                return;
            } else {
                // Timed out without progress: restart from our best epoch
                // with a wider window next time.
                latest_requested_epoch = my_best_epoch;
                if retry_count < EPOCH_SYNC_MAX_RETRY_COUNT {
                    retry_count += 1;
                }
            }
        }

        while self.request_manager.num_epochs_in_flight()
            < EPOCH_SYNC_MAX_INFLIGHT
            && latest_requested_epoch < my_best_epoch + sync_max_gap
            && (latest_requested_epoch < median_peer_epoch
                || median_peer_epoch == 0)
        {
            let from = cmp::max(my_best_epoch, latest_requested_epoch) + 1;
            // Prefer locally stored epoch sets over network requests.
            if let Some(epoch_hashes) =
                self.graph.data_man.all_epoch_set_hashes_from_db(from)
            {
                debug!("Recovered epoch {} from db", from);
                if self.need_requesting_blocks() {
                    self.request_blocks(io, None, epoch_hashes);
                } else {
                    self.request_block_headers(
                        io,
                        None,
                        epoch_hashes,
                        true,
                    );
                }
                latest_requested_epoch = from;
                continue;
            } else if median_peer_epoch == 0 {
                // No normal peers to ask and nothing more in the db.
                break;
            }

            let peer = PeerFilter::new(msgid::GET_BLOCK_HASHES_BY_EPOCH)
                .with_min_best_epoch(from)
                .select(&self.syn);

            if peer.is_none() {
                break;
            }

            let until = {
                let max_to_send = EPOCH_SYNC_MAX_INFLIGHT.saturating_sub(
                    self.request_manager.num_epochs_in_flight(),
                );
                let maybe_peer_info = self.syn.get_peer_info(&peer.unwrap());
                if maybe_peer_info.is_err() {
                    // Peer disconnected between selection and lookup; try
                    // another one.
                    continue;
                }

                let best_of_this_peer =
                    maybe_peer_info.unwrap().read().best_epoch;

                // Bound the batch by in-flight budget and by what this
                // peer can actually serve.
                let until = from + cmp::min(EPOCH_SYNC_BATCH_SIZE, max_to_send);
                cmp::min(until, best_of_this_peer + 1)
            };

            let epochs = (from..until).collect::<Vec<u64>>();

            debug!(
                "requesting epochs [{}..{}]/{:?} from peer {:?}",
                from,
                until - 1,
                median_peer_epoch,
                peer
            );

            self.request_manager
                .request_epoch_hashes(io, peer, epochs, None);
            latest_requested_epoch = until - 1;
        }
        *latest_requested = (
            latest_requested_epoch,
            Instant::now(),
            my_best_epoch,
            retry_count,
        );
    }
960
961 pub fn request_block_headers(
962 &self, io: &dyn NetworkContext, peer: Option<NodeId>,
963 mut header_hashes: Vec<H256>, ignore_db: bool,
964 ) {
965 if !ignore_db {
966 header_hashes
967 .retain(|hash| !self.try_request_header_from_db(io, hash));
968 }
969 header_hashes.retain(|h| !self.graph.contains_block_header(h));
972 self.request_manager.request_block_headers(
973 io,
974 peer,
975 header_hashes,
976 None,
977 );
978 }
979
980 fn try_request_header_from_db(
984 &self, io: &dyn NetworkContext, hash: &H256,
985 ) -> bool {
986 if self.graph.contains_block_header(hash) {
987 return true;
988 }
989
990 if let Some(info) = self.graph.data_man.local_block_info_by_hash(hash) {
991 if info.get_status() == BlockStatus::Invalid {
992 return true;
994 }
995 if info.get_seq_num()
996 < self.graph.consensus.current_era_genesis_seq_num()
997 {
998 debug!("Ignore header in old era hash={:?}, seq={}, cur_era_seq={}", hash, info.get_seq_num(), self.graph.consensus.current_era_genesis_seq_num());
999 return true;
1002 }
1003
1004 if info.get_instance_id() == self.graph.data_man.get_instance_id() {
1005 return true;
1008 }
1009 }
1010
1011 if let Some(header) = self.graph.data_man.block_header_by_hash(hash) {
1014 debug!("Recovered header {:?} from db", hash);
1015 let mut block_headers_resp = GetBlockHeadersResponse::default();
1017 block_headers_resp.request_id = 0;
1018 let mut headers = Vec::new();
1019 headers.push((*header).clone());
1020 block_headers_resp.headers = headers;
1021
1022 let ctx = Context {
1023 node_id: io.self_node_id(),
1024 io,
1025 manager: self,
1026 };
1027
1028 ctx.send_response(&block_headers_resp)
1029 .expect("send response should not be error");
1030 return true;
1031 } else {
1032 return false;
1033 }
1034 }
1035
    /// Processes one `RecoverPublicTask`: inserts the recovered blocks
    /// (and any missing headers) into the sync graph, reports what was
    /// received so the request manager can re-request the rest, requests
    /// missing parent/referee dependencies, and relays newly valid blocks.
    fn on_blocks_inner(
        &self, io: &dyn NetworkContext, task: RecoverPublicTask,
    ) -> Result<(), Error> {
        let mut need_to_relay = Vec::new();
        let mut received_blocks = HashSet::new();
        let mut dependent_hashes = HashSet::new();
        for mut block in task.blocks {
            let hash = block.hash();
            if self.graph.contains_block(&hash) {
                // Already have this body; count it as received so it is
                // not re-requested below.
                received_blocks.insert(hash);
                continue;
            }
            if !task.requested.contains(&hash) {
                warn!("Response has not requested block {:?}", hash);
                continue;
            }
            if let Err(e) = self.graph.data_man.recover_block(&mut block) {
                warn!("Recover block {:?} with error {:?}", hash, e);
                continue;
            }

            match self.graph.block_header_by_hash(&hash) {
                // Prefer the canonical header instance already stored.
                Some(header) => block.block_header = header,
                None => {
                    // Header is new too; insert it first.
                    // NOTE(review): boolean flag meanings are not visible
                    // here — confirm against `insert_block_header`.
                    let (insert_result, _) = self.graph.insert_block_header(
                        &mut block.block_header,
                        true, false, false, true,
                    );
                    if !insert_result.should_process_body() {
                        received_blocks.insert(hash);
                        continue;
                    }

                    // Track parent/referees we do not have yet so they can
                    // be requested afterwards.
                    let parent = block.block_header.parent_hash();
                    if !self.graph.contains_block(parent) {
                        dependent_hashes.insert(*parent);
                    }
                    for referee in block.block_header.referee_hashes() {
                        if !self.graph.contains_block(referee) {
                            dependent_hashes.insert(*referee);
                        }
                    }
                }
            }
            let insert_result = self.graph.insert_block(
                block, true, true, false,
            );
            if insert_result.is_valid() {
                received_blocks.insert(hash);
            }
            if insert_result.should_relay() {
                need_to_relay.push(hash);
            }
        }
        // For follow-up requests, pick any peer except the one whose
        // response produced this task.
        let mut filter =
            PeerFilter::new(msgid::GET_BLOCKS).exclude(task.failed_peer);
        if let Some(preferred_note_type) =
            self.preferred_peer_node_type_for_get_block()
        {
            filter = filter.with_preferred_node_type(preferred_note_type);
        }
        let chosen_peer = filter.select(&self.syn);
        // Report received vs requested; the request manager re-requests
        // whatever is still missing (after `task.delay`, if set).
        self.blocks_received(
            io,
            task.requested,
            received_blocks.clone(),
            !task.compact,
            chosen_peer.clone(),
            task.delay,
            self.preferred_peer_node_type_for_get_block(),
        );
        if self.graph.inner.read().locked_for_catchup {
            // Body-fill catch-up drives its own requesting and does not
            // relay.
            self.request_block_bodies(io);
            Ok(())
        } else {
            let missing_dependencies = dependent_hashes
                .difference(&received_blocks)
                .map(Clone::clone)
                .collect();
            self.request_blocks(io, chosen_peer, missing_dependencies);
            self.relay_blocks(io, need_to_relay)
        }
    }
1140
    /// Pops one task from the recover-public queue and processes it.
    /// The queue is expected to be non-empty here: `dispatch` issues
    /// exactly one work notification per queued task.
    fn on_blocks_inner_task(
        &self, io: &dyn NetworkContext,
    ) -> Result<(), Error> {
        let task = self.recover_public_queue.pop().unwrap();
        let received_blocks: Vec<H256> =
            task.blocks.iter().map(|block| block.hash()).collect();
        // Whatever the outcome, these hashes are no longer in flight at
        // the network layer.
        self.request_manager
            .remove_net_inflight_blocks(received_blocks.iter());
        self.request_manager
            .remove_net_inflight_blocks(task.requested.iter());
        self.on_blocks_inner(io, task)
    }
1153
    /// Pops one locally generated message and handles it as if it arrived
    /// from the network, with our own node id as the sender.
    fn on_local_message_task(&self, io: &dyn NetworkContext) {
        let task = self.local_message.pop().unwrap();
        self.on_message(io, &io.self_node_id(), task.message.as_slice());
    }
1158
    /// Inserts a block mined by the local node into the sync graph.
    ///
    /// The parent header must already be known (asserted). If the block's
    /// header already exists, the block is dropped — the mining work was
    /// duplicated and wasted.
    pub fn on_mined_block(&self, mut block: Block) {
        let hash = block.block_header.hash();
        info!("Mined block {:?} header={:?}", hash, block.block_header);
        let parent_hash = *block.block_header.parent_hash();

        assert!(self.graph.contains_block_header(&parent_hash));
        if self.graph.contains_block_header(&hash) {
            warn!("Mined an duplicate block, the mining power is wasted!");
            return;
        }
        // NOTE(review): boolean flag meanings are not visible here —
        // confirm against `insert_block_header`/`insert_block` signatures.
        self.graph.insert_block_header(
            &mut block.block_header,
            false,
            false,
            false,
            true,
        );
        self.graph.insert_block(
            block, false, true, false,
        );
    }
1184
1185 fn broadcast_message(
1186 &self, io: &dyn NetworkContext, skip_id: &NodeId, msg: &dyn Message,
1187 ) -> Result<(), NetworkError> {
1188 let mut peer_ids: Vec<NodeId> = self
1189 .syn
1190 .peers
1191 .read()
1192 .keys()
1193 .filter(|&id| *id != *skip_id)
1194 .map(|x| *x)
1195 .collect();
1196
1197 let throttle_ratio = THROTTLING_SERVICE.read().get_throttling_ratio();
1198 let num_total = peer_ids.len();
1199 let num_allowed = (num_total as f64 * throttle_ratio) as usize;
1200
1201 if num_total > num_allowed {
1202 debug!("apply throttling for broadcast_message, total: {}, allowed: {}", num_total, num_allowed);
1203 peer_ids.shuffle(&mut random::new());
1204 peer_ids.truncate(num_allowed);
1205 }
1206
1207 let msg_version_introduced = msg.version_introduced();
1212 let mut msg_version_valid_till = msg.version_valid_till();
1213 if msg_version_valid_till == self.protocol_version {
1214 msg_version_valid_till = ProtocolVersion(std::u8::MAX);
1215 }
1216 for id in peer_ids {
1217 let peer_version = self.syn.get_peer_version(&id)?;
1218 if peer_version >= msg_version_introduced
1219 && peer_version <= msg_version_valid_till
1220 {
1221 msg.send(io, &id)?;
1222 }
1223 }
1224
1225 Ok(())
1226 }
1227
1228 fn produce_status_message_v2(&self) -> StatusV2 {
1229 let best_info = self.graph.consensus.best_info();
1230 let chain_id = ChainIdParamsDeprecated {
1231 chain_id: best_info.best_chain_id().in_native_space(),
1232 };
1233 let terminal_hashes = best_info.bounded_terminal_block_hashes.clone();
1234
1235 StatusV2 {
1236 chain_id,
1237 genesis_hash: self.graph.data_man.true_genesis.hash(),
1238 best_epoch: best_info.best_epoch_number,
1239 terminal_block_hashes: terminal_hashes,
1240 }
1241 }
1242
1243 fn produce_status_message_v3(&self) -> StatusV3 {
1244 let best_info = self.graph.consensus.best_info();
1245 let chain_id = ChainIdParamsDeprecated {
1246 chain_id: best_info.best_chain_id().in_native_space(),
1247 };
1248 let terminal_hashes = best_info.bounded_terminal_block_hashes.clone();
1249
1250 StatusV3 {
1251 chain_id,
1252 node_type: self.node_type(),
1253 genesis_hash: self.graph.data_man.true_genesis.hash(),
1254 best_epoch: best_info.best_epoch_number,
1255 terminal_block_hashes: terminal_hashes,
1256 }
1257 }
1258
1259 fn produce_heartbeat_message(&self) -> Heartbeat {
1260 let best_info = self.graph.consensus.best_info();
1261 let terminal_hashes = best_info.bounded_terminal_block_hashes.clone();
1262
1263 Heartbeat {
1264 best_epoch: best_info.best_epoch_number,
1265 terminal_block_hashes: terminal_hashes,
1266 }
1267 }
1268
1269 fn send_status(
1270 &self, io: &dyn NetworkContext, peer: &NodeId,
1271 peer_protocol_version: ProtocolVersion,
1272 ) -> Result<(), NetworkError> {
1273 if peer_protocol_version == SYNC_PROTO_V2 {
1274 let status_message = self.produce_status_message_v2();
1275 debug!("Sending status message to {}: {:?}", peer, status_message);
1276 status_message.send(io, peer)
1277 } else {
1278 let status_message = self.produce_status_message_v3();
1279 debug!("Sending status message to {}: {:?}", peer, status_message);
1280 status_message.send(io, peer)
1281 }
1282 }
1283
1284 fn broadcast_heartbeat(&self, io: &dyn NetworkContext) {
1285 let status_message = self.produce_status_message_v2();
1286 let heartbeat_message = self.produce_heartbeat_message();
1287 debug!("Broadcasting heartbeat message: {:?}", heartbeat_message);
1288
1289 if self
1290 .broadcast_message(io, &Default::default(), &heartbeat_message)
1291 .is_err()
1292 {
1293 warn!("Error broadcasting heartbeat message");
1294 }
1295 if self
1296 .broadcast_message(io, &Default::default(), &status_message)
1297 .is_err()
1298 {
1299 warn!("Error broadcasting status message");
1300 }
1301 }
1302
1303 pub fn relay_blocks(
1304 &self, io: &dyn NetworkContext, need_to_relay: Vec<H256>,
1305 ) -> Result<(), Error> {
1306 if !need_to_relay.is_empty() && !self.catch_up_mode() {
1307 let new_block_hash_msg: Box<dyn Message> =
1308 Box::new(NewBlockHashes {
1309 block_hashes: need_to_relay.clone(),
1310 });
1311 self.broadcast_message(
1312 io,
1313 &Default::default(),
1314 new_block_hash_msg.as_ref(),
1315 )
1316 .unwrap_or_else(|e| {
1317 warn!("Error broadcasting blocks, err={:?}", e);
1318 });
1319
1320 self.light_provider
1321 .relay_hashes(need_to_relay)
1322 .unwrap_or_else(|e| {
1323 warn!("Error relaying blocks to light provider: {:?}", e);
1324 });
1325 }
1326
1327 Ok(())
1328 }
1329
1330 fn select_peers_for_transactions(&self) -> Vec<NodeId> {
1331 let num_peers = self.syn.peers.read().len() as f64;
1332 let throttle_ratio = THROTTLING_SERVICE.read().get_throttling_ratio();
1333
1334 let chosen_size = (num_peers.powf(-0.5).min(throttle_ratio) * num_peers)
1336 .round() as usize;
1337
1338 let num_peers = chosen_size
1339 .max(self.protocol_config.min_peers_tx_propagation)
1340 .min(self.protocol_config.max_peers_tx_propagation);
1341
1342 PeerFilter::new(msgid::TRANSACTION_DIGESTS)
1343 .with_cap(DynamicCapability::NormalPhase(true))
1344 .select_n(num_peers, &self.syn)
1345 }
1346
    /// Propagate pending transactions to `lucky_peers` as
    /// `TransactionDigests` messages: most transactions are sent as
    /// per-peer keyed short ids, while transactions whose short-id group
    /// has overflowed are sent as full hashes. On any send failure the
    /// whole batch is re-queued for a later propagation round.
    fn propagate_transactions_to_peers(
        &self, io: &dyn NetworkContext, lucky_peers: Vec<NodeId>,
    ) {
        let _timer = MeterTimer::time_func(PROPAGATE_TX_TIMER.as_ref());
        if lucky_peers.is_empty() {
            return;
        }

        // One random (key1, key2) nonce pair per peer; short ids are keyed
        // per peer, so each peer's payload is encoded with its own pair.
        let mut nonces: Vec<(u64, u64)> = (0..lucky_peers.len())
            .map(|_| (rand::rng().random(), rand::rng().random()))
            .collect();

        // short_ids_part[i] accumulates the encoded short ids destined for
        // the i-th nonce pair; tx_hashes_part is shared by all peers.
        let mut short_ids_part: Vec<Vec<u8>> = vec![vec![]; lucky_peers.len()];
        let mut tx_hashes_part: Vec<H256> = vec![];
        let (short_ids_transactions, tx_hashes_transactions) = {
            let mut transactions = self.get_to_propagate_trans();
            if transactions.is_empty() {
                return;
            }

            // Partition pending transactions — bounded by total serialized
            // byte size — into those safe to send as short ids and those
            // whose short-id group overflowed (sent as full hashes to
            // avoid ambiguity on the receiver side).
            let mut total_tx_bytes = 0;
            let mut short_ids_transactions: Vec<Arc<SignedTransaction>> =
                Vec::new();
            let mut tx_hashes_transactions: Vec<Arc<SignedTransaction>> =
                Vec::new();

            let received_pool =
                self.request_manager.received_transactions.read();
            for (_, tx) in transactions.iter() {
                total_tx_bytes += tx.rlp_size();
                if total_tx_bytes >= MAX_TXS_BYTES_TO_PROPAGATE {
                    break;
                }
                if received_pool.group_overflow_from_tx_hash(&tx.hash()) {
                    tx_hashes_transactions.push(tx.clone());
                } else {
                    short_ids_transactions.push(tx.clone());
                }
            }

            // If the byte budget truncated the batch, put the selected
            // transactions back minus the ones we are about to send, so the
            // remainder is propagated in a later round.
            if short_ids_transactions.len() + tx_hashes_transactions.len()
                != transactions.len()
            {
                for tx in short_ids_transactions.iter() {
                    transactions.remove(&tx.hash);
                }
                for tx in tx_hashes_transactions.iter() {
                    transactions.remove(&tx.hash);
                }
                self.set_to_propagate_trans(transactions);
            }

            (short_ids_transactions, tx_hashes_transactions)
        };
        debug!(
            "Send short ids:{}, Send tx hashes:{}",
            short_ids_transactions.len(),
            tx_hashes_transactions.len()
        );
        // Encode every short-id transaction once per peer, using that
        // peer's nonce pair.
        for tx in &short_ids_transactions {
            for i in 0..lucky_peers.len() {
                TransactionDigests::append_short_id(
                    &mut short_ids_part[i],
                    nonces[i].0,
                    nonces[i].1,
                    &tx.hash(),
                );
            }
        }
        let mut sent_transactions = short_ids_transactions.clone();
        if !tx_hashes_transactions.is_empty() {
            TX_HASHES_PROPAGATE_METER.mark(tx_hashes_transactions.len());
            for tx in &tx_hashes_transactions {
                TransactionDigests::append_tx_hash(
                    &mut tx_hashes_part,
                    tx.hash(),
                );
            }
            sent_transactions.extend(tx_hashes_transactions.clone());
        }

        TX_PROPAGATE_METER.mark(sent_transactions.len());

        if sent_transactions.len() == 0 {
            return;
        }

        debug!(
            "Sent {} transaction ids to {} peers.",
            sent_transactions.len(),
            lucky_peers.len()
        );

        // Register the batch so peers can later request full bodies by
        // (window_index, short id).
        let window_index = self
            .request_manager
            .append_sent_transactions(sent_transactions);

        let mut resend_flag = false;
        for i in 0..lucky_peers.len() {
            let peer_id = lucky_peers[i];
            // NOTE: nonces and short_ids_part are popped from the back in
            // lockstep, so peers are served in reverse index order but each
            // message still pairs a payload with the keys it was encoded
            // under.
            let (key1, key2) = nonces.pop().unwrap();
            let tx_msg = TransactionDigests::new(
                window_index,
                key1,
                key2,
                short_ids_part.pop().unwrap(),
                tx_hashes_part.clone(),
            );
            match tx_msg.send(io, &peer_id) {
                Ok(_) => {
                    trace!(
                        "{:02} <- Transactions ({} entries)",
                        peer_id,
                        tx_msg.len()
                    );
                }
                Err(e) => {
                    warn!(
                        "failed to propagate transaction ids to peer, id: {}, err: {}",
                        peer_id, e
                    );
                    resend_flag = true;
                }
            }
        }

        // Any send failure re-queues the entire batch (keyed by hash) for
        // the next propagation round.
        if resend_flag {
            let mut resend_transactions: HashMap<H256, Arc<SignedTransaction>> =
                HashMap::new();
            for tx in short_ids_transactions {
                resend_transactions.insert(tx.hash, tx.clone());
            }
            for tx in tx_hashes_transactions {
                resend_transactions.insert(tx.hash, tx.clone());
            }
            self.set_to_propagate_trans(resend_transactions);
        }
    }
1488
1489 pub fn check_future_blocks(&self, io: &dyn NetworkContext) {
1490 let now_timestamp = SystemTime::now()
1491 .duration_since(UNIX_EPOCH)
1492 .unwrap()
1493 .as_secs();
1494
1495 let mut missed_body_block_hashes = HashMap::new();
1496 let mut need_to_relay = HashSet::new();
1497 let headers = self.graph.future_blocks.get_before(now_timestamp);
1498
1499 if headers.is_empty() {
1500 return;
1501 }
1502
1503 for (mut header, peer) in headers {
1504 let hash = header.hash();
1505 let (insert_result, to_relay) = self.graph.insert_block_header(
1506 &mut header,
1507 true,
1508 false,
1509 self.insert_header_to_consensus(),
1510 true,
1511 );
1512 if insert_result.is_new_valid() {
1513 need_to_relay.extend(to_relay);
1514
1515 if !self.graph.contains_block(&hash) {
1517 missed_body_block_hashes
1520 .entry(peer)
1521 .or_insert(Vec::new())
1522 .push(hash);
1523 }
1524 }
1525 }
1526
1527 for (peer, missing_hashes) in missed_body_block_hashes {
1528 self.request_missing_blocks(io, Some(peer), missing_hashes);
1531 }
1532
1533 self.relay_blocks(io, need_to_relay.into_iter().collect())
1535 .ok();
1536 }
1537
1538 pub fn insert_header_to_consensus(&self) -> bool {
1543 let current_phase = self.phase_manager.get_current_phase();
1544 matches!(
1545 current_phase.phase_type(),
1546 SyncPhaseType::CatchUpSyncBlockHeader
1547 | SyncPhaseType::CatchUpCheckpoint
1548 )
1549 }
1550
1551 pub fn propagate_new_transactions(&self, io: &dyn NetworkContext) {
1552 if self.syn.peers.read().is_empty() || self.catch_up_mode() {
1553 if self.protocol_config.dev_mode {
1554 self.get_to_propagate_trans();
1556 }
1557 return;
1558 }
1559
1560 let peers = self.select_peers_for_transactions();
1561 self.propagate_transactions_to_peers(io, peers);
1562 }
1563
1564 pub fn remove_expired_flying_request(&self, io: &dyn NetworkContext) {
1565 self.request_manager.resend_timeout_requests(io);
1566 let cancelled_requests = self.request_manager.resend_waiting_requests(
1567 io,
1568 !self.catch_up_mode(),
1569 self.need_block_from_archive_node(),
1570 );
1571 self.handle_cancelled_requests(cancelled_requests);
1572 }
1573
1574 fn handle_cancelled_requests(
1578 &self, cancelled_requests: Vec<Box<dyn Request>>,
1579 ) {
1580 let mut to_remove_blocks = HashSet::new();
1581 for request in cancelled_requests {
1582 if let Some(block_hashes) = try_get_block_hashes(&request) {
1586 for hash in block_hashes {
1587 to_remove_blocks.insert(*hash);
1588 }
1589 }
1590 }
1591 self.graph.remove_blocks_and_future(&to_remove_blocks);
1592 }
1593
    /// Timer-driven entry point: broadcast heartbeat and status to peers.
    pub fn send_heartbeat(&self, io: &dyn NetworkContext) {
        self.broadcast_heartbeat(io);
    }
1597
    /// Periodic garbage collection: trim the in-memory block-data caches,
    /// then GC the database relative to the current best epoch.
    fn gc(&self) {
        self.graph.data_man.cache_gc();
        self.graph
            .data_man
            .database_gc(self.graph.consensus.best_epoch_number())
    }
1604
1605 fn log_statistics(&self) { self.graph.log_statistics(); }
1606
    /// Timer-driven: forward the total-weight-delta heartbeat to the graph.
    fn update_total_weight_delta_heartbeat(&self) {
        self.graph.update_total_weight_delta_heartbeat();
    }
1610
1611 pub fn update_sync_phase(&self, io: &dyn NetworkContext) {
1612 match self.phase_manager_lock.try_lock() {
1613 Some(_pm_lock) => {
1614 self.phase_manager.try_initialize(io, self);
1615 loop {
1616 let current_phase = self.phase_manager.get_current_phase();
1618 let next_phase_type = current_phase.next(io, self);
1619 if current_phase.phase_type() != next_phase_type {
1620 self.phase_manager.change_phase_to(
1622 next_phase_type,
1623 io,
1624 self,
1625 );
1626 } else {
1627 break;
1628 }
1629 }
1630 }
1631 None => {
1632 debug!("update_sync_phase: phase_manager locked by another IO Worker");
1633 return;
1634 }
1635 }
1636
1637 let catch_up_mode = self.catch_up_mode();
1638 let mut need_notify = Vec::new();
1639 for (peer, state) in self.syn.peers.read().iter() {
1640 let mut state = state.write();
1641 if !state
1642 .notified_capabilities
1643 .contains(DynamicCapability::NormalPhase(!catch_up_mode))
1644 {
1645 state.received_transaction_count = 0;
1646 state
1647 .notified_capabilities
1648 .insert(DynamicCapability::NormalPhase(!catch_up_mode));
1649 need_notify.push(*peer);
1650 }
1651 }
1652 info!(
1653 "Catch-up mode: {}, latest epoch: {} missing_bodies: {}",
1654 catch_up_mode,
1655 self.graph.consensus.best_epoch_number(),
1656 self.graph.inner.read().block_to_fill_set.len()
1657 );
1658
1659 DynamicCapability::NormalPhase(!catch_up_mode)
1660 .broadcast_with_peers(io, need_notify);
1661 }
1662
1663 pub fn request_missing_blocks(
1664 &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
1665 hashes: Vec<H256>,
1666 ) {
1667 let catch_up_mode = self.catch_up_mode();
1668 if catch_up_mode {
1669 self.request_blocks(io, peer_id, hashes);
1670 } else {
1671 self.request_manager
1672 .request_compact_blocks(io, peer_id, hashes, None);
1673 }
1674 }
1675
1676 pub fn request_blocks(
1677 &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
1678 mut hashes: Vec<H256>,
1679 ) {
1680 hashes.retain(|hash| !self.already_processed(hash));
1681 self.request_blocks_without_check(io, peer_id, hashes)
1682 }
1683
1684 pub fn request_blocks_without_check(
1685 &self, io: &dyn NetworkContext, peer_id: Option<NodeId>,
1686 hashes: Vec<H256>,
1687 ) {
1688 let preferred_node_type = self.preferred_peer_node_type_for_get_block();
1689 self.request_manager.request_blocks(
1690 io,
1691 peer_id,
1692 hashes,
1693 self.request_block_need_public(),
1694 None,
1695 preferred_node_type,
1696 );
1697 }
1698
1699 fn already_processed(&self, hash: &H256) -> bool {
1703 if self.graph.contains_block(hash) {
1704 return true;
1705 }
1706
1707 if let Some(info) = self.graph.data_man.local_block_info_by_hash(hash) {
1708 if info.get_status() == BlockStatus::Invalid {
1709 return true;
1711 }
1712 if info.get_seq_num()
1713 < self.graph.consensus.current_era_genesis_seq_num()
1714 {
1715 debug!(
1716 "Ignore block in old era hash={:?}, seq={}, cur_era_seq={}",
1717 hash,
1718 info.get_seq_num(),
1719 self.graph.consensus.current_era_genesis_seq_num()
1720 );
1721 return true;
1724 }
1725
1726 if info.get_instance_id() == self.graph.data_man.get_instance_id() {
1727 return true;
1730 }
1731 }
1732 return false;
1733 }
1734
    /// Forward block-response bookkeeping to the request manager, filling
    /// in this node's current "request with public key" policy.
    pub fn blocks_received(
        &self, io: &dyn NetworkContext, requested_hashes: HashSet<H256>,
        returned_blocks: HashSet<H256>, ask_full_block: bool,
        peer: Option<NodeId>, delay: Option<Duration>,
        preferred_node_type_for_block_request: Option<NodeType>,
    ) {
        self.request_manager.blocks_received(
            io,
            requested_hashes,
            returned_blocks,
            ask_full_block,
            peer,
            self.request_block_need_public(),
            delay,
            preferred_node_type_for_block_request,
        )
    }
1752
    /// Whether block requests should ask for sender public keys: enabled
    /// only when configured and while this node is catching up.
    fn request_block_need_public(&self) -> bool {
        self.catch_up_mode() && self.protocol_config.request_block_with_public
    }
1756
1757 pub fn expire_block_gc(
1758 &self, _io: &dyn NetworkContext, timeout: u64,
1759 ) -> Result<(), Error> {
1760 if self.in_recover_from_db_phase() {
1761 return Ok(());
1764 }
1765 self.graph.remove_expire_blocks(timeout);
1766 Ok(())
1767 }
1768
    /// Whether the queue of blocks awaiting public recovery is at capacity.
    pub fn is_block_queue_full(&self) -> bool {
        self.recover_public_queue.is_full()
    }
1772}
1773
impl NetworkProtocolHandler for SynchronizationProtocolHandler {
    /// Oldest protocol version this node still speaks: our own version
    /// minus the configured back-compat window, floored at V1.
    fn minimum_supported_version(&self) -> ProtocolVersion {
        let my_version = self.protocol_version.0;
        if my_version > SYNCHRONIZATION_PROTOCOL_OLD_VERSIONS_TO_SUPPORT {
            ProtocolVersion(
                my_version - SYNCHRONIZATION_PROTOCOL_OLD_VERSIONS_TO_SUPPORT,
            )
        } else {
            SYNC_PROTO_V1
        }
    }

    /// Register every periodic timer driving the sync protocol; see
    /// `on_timeout` for what each timer triggers. A registration failure
    /// is fatal (panics), since the protocol cannot run without them.
    fn initialize(&self, io: &dyn NetworkContext) {
        io.register_timer(TX_TIMER, self.protocol_config.send_tx_period)
            .expect("Error registering transactions timer");
        io.register_timer(
            CHECK_REQUEST_TIMER,
            self.protocol_config.check_request_period,
        )
        .expect("Error registering check request timer");
        io.register_timer(
            HEARTBEAT_TIMER,
            self.protocol_config.heartbeat_period_interval,
        )
        .expect("Error registering heartbeat timer");
        io.register_timer(
            BLOCK_CACHE_GC_TIMER,
            self.protocol_config.block_cache_gc_period,
        )
        .expect("Error registering block_cache_gc timer");
        io.register_timer(
            CHECK_CATCH_UP_MODE_TIMER,
            self.protocol_config.check_phase_change_period,
        )
        .expect("Error registering check_catch_up_mode timer");
        io.register_timer(LOG_STATISTIC_TIMER, Duration::from_millis(5000))
            .expect("Error registering log_statistics timer");
        io.register_timer(
            TOTAL_WEIGHT_IN_PAST_TIMER,
            Duration::from_secs(BLOCK_PROPAGATION_DELAY * 2),
        )
        .expect("Error registering total_weight_in_past timer");
        io.register_timer(CHECK_PEER_HEARTBEAT_TIMER, Duration::from_secs(60))
            .expect("Error registering CHECK_PEER_HEARTBEAT_TIMER");
        io.register_timer(
            CHECK_FUTURE_BLOCK_TIMER,
            Duration::from_millis(1000),
        )
        .expect("Error registering CHECK_FUTURE_BLOCK_TIMER");
        io.register_timer(
            EXPIRE_BLOCK_GC_TIMER,
            self.protocol_config.expire_block_gc_period,
        )
        .expect("Error registering EXPIRE_BLOCK_GC_TIMER");
    }

    /// Queue a message originating from this node itself; it is delivered
    /// back through `on_work_dispatch` / `on_local_message_task`.
    fn send_local_message(&self, io: &dyn NetworkContext, message: Vec<u8>) {
        self.local_message
            .dispatch(io, LocalMessageTask { message });
    }

    /// Entry point for every raw network message: decode the message id,
    /// dispatch to the matching handler (reporting decode/dispatch errors
    /// via `handle_error`), then flush any requests that were queued
    /// waiting for this peer.
    fn on_message(&self, io: &dyn NetworkContext, peer: &NodeId, raw: &[u8]) {
        let (msg_id, rlp) = match decode_msg(raw) {
            Some(msg) => msg,
            None => {
                return self.handle_error(
                    io,
                    peer,
                    msgid::INVALID,
                    Error::InvalidMessageFormat.into(),
                )
            }
        };

        debug!("on_message: peer={}, msgid={:?}", peer, msg_id);

        self.dispatch_message(io, peer, msg_id.into(), rlp)
            .unwrap_or_else(|e| self.handle_error(io, peer, msg_id.into(), e));

        self.request_manager.send_pending_requests(io, peer);
    }

    /// Execute deferred work items previously dispatched to this handler.
    fn on_work_dispatch(
        &self, io: &dyn NetworkContext, work_type: HandlerWorkType,
    ) {
        if work_type == SyncHandlerWorkType::RecoverPublic as HandlerWorkType {
            if let Err(e) = self.on_blocks_inner_task(io) {
                warn!("Error processing RecoverPublic task: {:?}", e);
            }
        } else if work_type
            == SyncHandlerWorkType::LocalMessage as HandlerWorkType
        {
            self.on_local_message_task(io);
        } else {
            warn!("Unknown SyncHandlerWorkType");
        }
    }

    /// Handshake: send our status message; on failure disconnect the
    /// peer, otherwise track it as handshaking (with its protocol version
    /// and connect time) until its status arrives.
    fn on_peer_connected(
        &self, io: &dyn NetworkContext, node_id: &NodeId,
        peer_protocol_version: ProtocolVersion,
        _pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
    ) {
        debug!(
            "Peer connected: peer={:?}, version={}",
            node_id, peer_protocol_version
        );
        if let Err(e) = self.send_status(io, node_id, peer_protocol_version) {
            debug!("Error sending status message: {:?}", e);
            io.disconnect_peer(
                node_id,
                Some(UpdateNodeOperation::Failure),
                "send status failed", );
        } else {
            self.syn
                .handshaking_peers
                .write()
                .insert(*node_id, (peer_protocol_version, Instant::now()));
        }
    }

    /// Drop all per-peer state (connected and handshaking tables, pending
    /// requests, state-sync bookkeeping) for a disconnected peer.
    fn on_peer_disconnected(&self, io: &dyn NetworkContext, peer: &NodeId) {
        debug!("Peer disconnected: peer={}", peer);
        self.syn.peers.write().remove(peer);
        self.syn.handshaking_peers.write().remove(peer);
        self.request_manager.on_peer_disconnected(io, peer);
        self.state_sync.on_peer_disconnected(&peer);
    }

    /// Dispatch the periodic timers registered in `initialize`.
    fn on_timeout(&self, io: &dyn NetworkContext, timer: TimerToken) {
        trace!("Timeout: timer={:?}", timer);
        match timer {
            TX_TIMER => {
                self.propagate_new_transactions(io);
            }
            CHECK_FUTURE_BLOCK_TIMER => {
                self.check_future_blocks(io);
                self.graph.check_not_ready_frontier(
                    self.insert_header_to_consensus(),
                );
            }
            CHECK_REQUEST_TIMER => {
                self.remove_expired_flying_request(io);
            }
            HEARTBEAT_TIMER => {
                self.send_heartbeat(io);
            }
            BLOCK_CACHE_GC_TIMER => {
                self.gc();
            }
            CHECK_CATCH_UP_MODE_TIMER => {
                self.update_sync_phase(io);
            }
            LOG_STATISTIC_TIMER => {
                self.log_statistics();
            }
            TOTAL_WEIGHT_IN_PAST_TIMER => {
                self.update_total_weight_delta_heartbeat();
            }
            CHECK_PEER_HEARTBEAT_TIMER => {
                // Disconnect peers whose heartbeat has not been seen within
                // the configured timeout.
                let timeout_peers = self.syn.get_heartbeat_timeout_peers(
                    self.protocol_config.heartbeat_timeout,
                );
                for peer in timeout_peers {
                    io.disconnect_peer(
                        &peer,
                        Some(UpdateNodeOperation::Failure),
                        "sync heartbeat timeout", );
                }
            }
            EXPIRE_BLOCK_GC_TIMER => {
                // Best-effort GC; errors are ignored.
                self.expire_block_gc(
                    io,
                    self.protocol_config.sync_expire_block_timeout.as_secs(),
                )
                .ok();
            }
            _ => warn!("Unknown timer {} triggered.", timer),
        }
    }
}