1pub mod sync;
6
7use crate::{
8 block_data_manager::BlockDataManager,
9 consensus::SharedConsensusGraph,
10 light_protocol::{
11 common::{validate_chain_id, FullPeerState, Peers},
12 error::*,
13 handle_error,
14 message::{
15 msgid, BlockHashes as GetBlockHashesResponse,
16 BlockHeaders as GetBlockHeadersResponse,
17 BlockTxs as GetBlockTxsResponse, Blooms as GetBloomsResponse,
18 NewBlockHashes, NodeType, Receipts as GetReceiptsResponse,
19 SendRawTx, StateEntries as GetStateEntriesResponse,
20 StateRoots as GetStateRootsResponse, StatusPingDeprecatedV1,
21 StatusPingV2, StatusPongDeprecatedV1, StatusPongV2,
22 StorageRoots as GetStorageRootsResponse,
23 TxInfos as GetTxInfosResponse, Txs as GetTxsResponse,
24 WitnessInfo as GetWitnessInfoResponse,
25 },
26 LightNodeConfiguration, LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT,
27 LIGHT_PROTOCOL_VERSION, LIGHT_PROTO_V1,
28 },
29 message::{decode_msg, decode_rlp_and_check_deprecation, Message, MsgId},
30 sync::{message::Throttled, SynchronizationGraph},
31 Notifications, UniqueId,
32};
33use cfx_internal_common::ChainIdParamsDeprecated;
34use cfx_parameters::light::{
35 CATCH_UP_EPOCH_LAG_THRESHOLD, CLEANUP_PERIOD, HEARTBEAT_PERIOD, SYNC_PERIOD,
36};
37use cfx_types::H256;
38use diem_types::validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey};
39use network::{
40 node_table::NodeId, service::ProtocolVersion, NetworkContext,
41 NetworkProtocolHandler,
42};
43use parking_lot::RwLock;
44use rlp::Rlp;
45use std::{
46 sync::{
47 atomic::{AtomicBool, Ordering},
48 Arc,
49 },
50 thread,
51 time::{Duration, Instant},
52};
53use sync::{
54 BlockTxs, Blooms, Epochs, HashSource, Headers, Receipts, StateEntries,
55 StateRoots, StorageRoots, TxInfos, Txs, Witnesses,
56};
57use throttling::token_bucket::TokenBucketManager;
58
59type TimerToken = usize;
60
61const SYNC_TIMER: TimerToken = 0;
62const REQUEST_CLEANUP_TIMER: TimerToken = 1;
63const LOG_STATISTICS_TIMER: TimerToken = 2;
64const HEARTBEAT_TIMER: TimerToken = 3;
65const TOTAL_WEIGHT_IN_PAST_TIMER: TimerToken = 4;
66const CHECK_SYNC_NOT_READY_BLOCKS_TIMER: TimerToken = 7;
67
68pub struct Handler {
71 pub protocol_version: ProtocolVersion,
72
73 pub block_txs: Arc<BlockTxs>,
75
76 pub blooms: Blooms,
78
79 consensus: SharedConsensusGraph,
81
82 epochs: Epochs,
84
85 headers: Arc<Headers>,
87
88 join_handle: Option<thread::JoinHandle<()>>,
90
91 pub peers: Arc<Peers<FullPeerState>>,
93
94 pub receipts: Arc<Receipts>,
96
97 pub state_entries: StateEntries,
99
100 pub state_roots: Arc<StateRoots>,
102
103 stopped: Arc<AtomicBool>,
105
106 pub storage_roots: StorageRoots,
108
109 pub txs: Arc<Txs>,
111
112 pub tx_infos: TxInfos,
114
115 throttling_config_file: Option<String>,
117
118 pub witnesses: Arc<Witnesses>,
120}
121
122impl Handler {
123 pub fn new(
124 consensus: SharedConsensusGraph, graph: Arc<SynchronizationGraph>,
125 throttling_config_file: Option<String>,
126 notifications: Arc<Notifications>, config: LightNodeConfiguration,
127 ) -> Self {
128 let peers = Arc::new(Peers::new());
129 let request_id_allocator = Arc::new(UniqueId::new());
130
131 let headers = Arc::new(Headers::new(
132 graph.clone(),
133 peers.clone(),
134 request_id_allocator.clone(),
135 config.clone(),
136 ));
137
138 let epochs = Epochs::new(
139 consensus.clone(),
140 headers.clone(),
141 peers.clone(),
142 request_id_allocator.clone(),
143 config,
144 );
145
146 let witnesses = Arc::new(Witnesses::new(
147 consensus.clone(),
148 peers.clone(),
149 request_id_allocator.clone(),
150 ));
151
152 let blooms = Blooms::new(
153 peers.clone(),
154 request_id_allocator.clone(),
155 witnesses.clone(),
156 );
157
158 let receipts = Arc::new(Receipts::new(
159 peers.clone(),
160 request_id_allocator.clone(),
161 witnesses.clone(),
162 ));
163
164 let snapshot_epoch_count =
165 consensus.data_manager().get_snapshot_epoch_count() as u64;
166
167 let state_roots = Arc::new(StateRoots::new(
168 peers.clone(),
169 request_id_allocator.clone(),
170 snapshot_epoch_count,
171 witnesses.clone(),
172 ));
173
174 let state_entries = StateEntries::new(
175 peers.clone(),
176 state_roots.clone(),
177 request_id_allocator.clone(),
178 );
179
180 let storage_roots = StorageRoots::new(
181 peers.clone(),
182 state_roots.clone(),
183 request_id_allocator.clone(),
184 );
185
186 let txs =
187 Arc::new(Txs::new(peers.clone(), request_id_allocator.clone()));
188
189 let block_txs = Arc::new(BlockTxs::new(
190 consensus.clone(),
191 peers.clone(),
192 request_id_allocator.clone(),
193 txs.clone(),
194 ));
195
196 let tx_infos = TxInfos::new(
197 consensus.clone(),
198 peers.clone(),
199 request_id_allocator.clone(),
200 witnesses.clone(),
201 );
202
203 let stopped = Arc::new(AtomicBool::new(false));
204
205 let join_handle = Some(Self::start_witness_worker(
206 notifications,
207 witnesses.clone(),
208 stopped.clone(),
209 consensus.data_manager().clone(),
210 ));
211
212 Handler {
213 block_txs,
214 blooms,
215 consensus,
216 epochs,
217 headers,
218 join_handle,
219 peers,
220 protocol_version: LIGHT_PROTOCOL_VERSION,
221 receipts,
222 state_entries,
223 state_roots,
224 stopped,
225 storage_roots,
226 throttling_config_file,
227 tx_infos,
228 txs,
229 witnesses,
230 }
231 }
232
233 fn start_witness_worker(
236 notifications: Arc<Notifications>, witnesses: Arc<Witnesses>,
237 stopped: Arc<AtomicBool>, data_man: Arc<BlockDataManager>,
238 ) -> thread::JoinHandle<()> {
239 thread::Builder::new()
240 .name("Witness Worker".into())
241 .spawn(move || {
242 let mut receiver =
243 notifications.blame_verification_results.subscribe();
244
245 loop {
246 if stopped.load(Ordering::SeqCst) {
248 break;
249 }
250
251 let wait_for = Duration::from_secs(1);
253
254 let (height, maybe_witness) =
255 match receiver.recv_with_timeout(wait_for) {
256 Err(_) => continue, Ok(None) => return, Ok(Some(val)) => val,
259 };
260
261 trace!(
262 "Witness worker received: height = {:?}, maybe_witness = {:?}",
263 height, maybe_witness
264 );
265
266 data_man.remove_blamed_header_verified_roots(height);
287
288 match maybe_witness {
290 Some(w) => {
292 debug!("Requesting witness at height {}", w);
295 witnesses.request(w);
296 }
297
298 None => {
300 witnesses.in_flight.write().remove(&height);
302 }
303 }
304
305 *witnesses.height_of_latest_verified_header.write() = height;
306 }
307 })
308 .expect("Starting the Witness Worker should succeed")
309 }
310
311 #[inline]
312 fn get_existing_peer_state(
313 &self, peer: &NodeId,
314 ) -> Result<Arc<RwLock<FullPeerState>>> {
315 match self.peers.get(&peer) {
316 Some(state) => Ok(state),
317 None => {
318 bail!(Error::InternalError(format!(
321 "Received message from unknown peer={:?}",
322 peer
323 )));
324 }
325 }
326 }
327
328 #[allow(unused)]
329 #[inline]
330 fn peer_version(&self, peer: &NodeId) -> Result<ProtocolVersion> {
331 Ok(self.get_existing_peer_state(peer)?.read().protocol_version)
332 }
333
334 #[inline]
335 fn validate_peer_state(&self, peer: &NodeId, msg_id: MsgId) -> Result<()> {
336 let state = self.get_existing_peer_state(&peer)?;
337
338 if msg_id != msgid::STATUS_PONG_DEPRECATED
339 && msg_id != msgid::STATUS_PONG_V2
340 && !state.read().handshake_completed
341 {
342 warn!("Received msg={:?} from handshaking peer={:?}", msg_id, peer);
343 bail!(Error::UnexpectedMessage {
344 expected: vec![
345 msgid::STATUS_PONG_DEPRECATED,
346 msgid::STATUS_PONG_V2
347 ],
348 received: msg_id,
349 });
350 }
351
352 Ok(())
353 }
354
355 #[inline]
356 fn validate_peer_type(&self, node_type: NodeType) -> Result<()> {
357 match node_type {
358 NodeType::Archive => Ok(()),
359 NodeType::Full => Ok(()),
360 _ => bail!(Error::UnexpectedPeerType { node_type }),
361 }
362 }
363
364 #[inline]
365 fn validate_genesis_hash(&self, genesis: H256) -> Result<()> {
366 let ours = self.consensus.data_manager().true_genesis.hash();
367 let theirs = genesis;
368
369 if ours != theirs {
370 bail!(Error::GenesisMismatch { ours, theirs });
371 }
372
373 Ok(())
374 }
375
376 #[rustfmt::skip]
377 fn dispatch_message(
378 &self, io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, rlp: Rlp,
379 ) -> Result<()> {
380 trace!("Dispatching message: peer={:?}, msg_id={:?}", peer, msg_id);
381 self.validate_peer_state(peer, msg_id)?;
382 let min_supported_ver = self.minimum_supported_version();
383 let protocol = io.get_protocol();
384
385 match msg_id {
386 msgid::STATUS_PONG_DEPRECATED => self.on_status_deprecated(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
388 msgid::STATUS_PONG_V2 => self.on_status_v2(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
389
390 msgid::BLOCK_HASHES => self.on_block_hashes(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
392 msgid::BLOCK_HEADERS => self.on_block_headers(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
393 msgid::BLOCK_TXS => self.on_block_txs(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
394 msgid::BLOOMS => self.on_blooms(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
395 msgid::NEW_BLOCK_HASHES => self.on_new_block_hashes(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
396 msgid::RECEIPTS => self.on_receipts(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
397 msgid::STATE_ENTRIES => self.on_state_entries(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
398 msgid::STATE_ROOTS => self.on_state_roots(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
399 msgid::STORAGE_ROOTS => self.on_storage_roots(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
400 msgid::TXS => self.on_txs(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
401 msgid::TX_INFOS => self.on_tx_infos(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
402 msgid::WITNESS_INFO => self.on_witness_info(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
403
404 msgid::THROTTLED => self.on_throttled(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
406
407 _ => bail!(Error::UnknownMessage{id: msg_id}),
408 }
409 }
410
411 #[inline]
412 pub fn median_peer_epoch(&self) -> Option<u64> {
413 let mut best_epochs = self.peers.fold(vec![], |mut res, state| {
414 res.push(state.read().best_epoch);
415 res
416 });
417
418 best_epochs.sort();
419
420 match best_epochs.len() {
421 0 => None,
422 n => Some(best_epochs[n / 2]),
423 }
424 }
425
426 #[inline]
427 fn catch_up_mode(&self) -> bool {
428 match self.median_peer_epoch() {
429 None => true,
430 Some(epoch) => {
431 let my_epoch = self.consensus.best_epoch_number();
432 my_epoch + CATCH_UP_EPOCH_LAG_THRESHOLD < epoch
433 }
434 }
435 }
436
437 #[inline]
438 fn print_stats(&self) {
439 match self.catch_up_mode() {
440 true => {
441 let latest_epoch = self.consensus.best_epoch_number();
442 let best_peer_epoch = self.epochs.best_peer_epoch();
443
444 let progress = if best_peer_epoch == 0 {
445 0.0
446 } else {
447 100.0 * (latest_epoch as f64) / (best_peer_epoch as f64)
448 };
449
450 info!(
451 "Catch-up mode: true, latest epoch: {} / {} ({:.2}%), latest verified: {}, inserted header count: {}",
452 latest_epoch,
453 best_peer_epoch,
454 progress,
455 self.witnesses.latest_verified(),
456 self.headers.inserted_count.load(Ordering::Relaxed),
457 )
458 }
459 false => info!(
460 "Catch-up mode: false, latest epoch: {}, latest verified: {}",
461 self.consensus.best_epoch_number(),
462 self.witnesses.latest_verified(),
463 ),
464 }
465 }
466
467 #[inline]
468 fn collect_terminals(&self) {
469 let terminals = self.peers.fold(vec![], |mut res, state| {
470 let mut state = state.write();
471 res.extend(state.terminals.iter());
472 state.terminals.clear();
473 res
474 });
475
476 let terminals = terminals.into_iter();
477 self.headers.request(terminals, HashSource::NewHash);
478 }
479
480 #[inline]
481 fn send_status(
482 &self, io: &dyn NetworkContext, peer: &NodeId,
483 peer_protocol_version: ProtocolVersion,
484 ) -> Result<()> {
485 let msg: Box<dyn Message>;
486
487 if peer_protocol_version == LIGHT_PROTO_V1 {
488 msg = Box::new(StatusPingDeprecatedV1 {
489 protocol_version: self.protocol_version.0,
490 genesis_hash: self.consensus.data_manager().true_genesis.hash(),
491 node_type: NodeType::Light,
492 });
493 } else {
494 msg = Box::new(StatusPingV2 {
495 chain_id: ChainIdParamsDeprecated {
496 chain_id: self.consensus.best_chain_id().in_native_space(),
497 },
498 genesis_hash: self.consensus.data_manager().true_genesis.hash(),
499 node_type: NodeType::Light,
500 });
501 }
502
503 msg.send(io, peer)?;
504 Ok(())
505 }
506
507 #[inline]
508 pub fn send_heartbeat(&self, io: &dyn NetworkContext) {
509 let peer_ids = self.peers.all_peers_satisfying(|_| true);
510
511 for peer in peer_ids {
512 let protocol_version = match self.get_existing_peer_state(&peer) {
513 Ok(state) => state.read().protocol_version,
514 Err(_) => {
515 warn!("Peer not found for heartbeat: {:?}", peer);
516 continue;
517 }
518 };
519
520 debug!("send_heartbeat peer={:?}", peer);
521
522 if let Err(e) = self.send_status(io, &peer, protocol_version) {
523 warn!(
524 "Error while sending heartbeat to peer {:?}: {:?}",
525 peer, e
526 );
527 }
528 }
529 }
530
531 #[inline]
532 pub fn send_raw_tx(
533 &self, io: &dyn NetworkContext, peer: &NodeId, raw: Vec<u8>,
534 ) -> Result<()> {
535 let msg: Box<dyn Message> = Box::new(SendRawTx { raw });
536 msg.send(io, peer)?;
537 Ok(())
538 }
539
540 fn on_status_v2(
541 &self, io: &dyn NetworkContext, peer: &NodeId, status: StatusPongV2,
542 ) -> Result<()> {
543 debug!("on_status (v2) peer={:?} status={:?}", peer, status);
544
545 self.validate_peer_type(status.node_type)?;
546 self.validate_genesis_hash(status.genesis_hash)?;
547 validate_chain_id(
548 &self
549 .consensus
550 .config()
551 .chain_id
552 .read()
553 .to_native_space_params(),
554 status.chain_id.into(),
555 status.best_epoch,
556 )?;
557
558 {
559 let state = self.get_existing_peer_state(peer)?;
560 let mut state = state.write();
561 state.best_epoch = status.best_epoch;
562 state.handshake_completed = true;
563 state.terminals = status.terminals.into_iter().collect();
564 }
565
566 self.start_sync(io);
569 Ok(())
570 }
571
572 fn on_status_deprecated(
573 &self, io: &dyn NetworkContext, peer: &NodeId,
574 status: StatusPongDeprecatedV1,
575 ) -> Result<()> {
576 debug!("on_status (v1) peer={:?} status={:?}", peer, status);
577
578 self.on_status_v2(
579 io,
580 peer,
581 StatusPongV2 {
582 chain_id: ChainIdParamsDeprecated {
583 chain_id: self.consensus.best_chain_id().in_native_space(),
584 },
585 node_type: status.node_type,
586 genesis_hash: status.genesis_hash,
587 best_epoch: status.best_epoch,
588 terminals: status.terminals,
589 },
590 )
591 }
592
593 fn on_block_hashes(
594 &self, io: &dyn NetworkContext, _peer: &NodeId,
595 resp: GetBlockHashesResponse,
596 ) -> Result<()> {
597 debug!(
598 "received {} block hashes (request id = {})",
599 resp.hashes.len(),
600 resp.request_id
601 );
602 trace!("on_block_hashes resp={:?}", resp);
603
604 self.epochs.receive(&resp.request_id);
605
606 let hashes = resp.hashes.into_iter();
608 self.headers.request(hashes, HashSource::Epoch);
609
610 self.start_sync(io);
611 Ok(())
612 }
613
614 fn on_block_headers(
615 &self, io: &dyn NetworkContext, peer: &NodeId,
616 resp: GetBlockHeadersResponse,
617 ) -> Result<()> {
618 debug!(
619 "received {} block headers (request id = {})",
620 resp.headers.len(),
621 resp.request_id
622 );
623 trace!("on_block_headers resp={:?}", resp);
624
625 self.headers.receive(
626 peer,
627 resp.request_id,
628 resp.headers.into_iter(),
629 )?;
630
631 self.start_sync(io);
632 Ok(())
633 }
634
635 fn on_block_txs(
636 &self, io: &dyn NetworkContext, peer: &NodeId,
637 resp: GetBlockTxsResponse,
638 ) -> Result<()> {
639 debug!(
640 "received {} block txs (request id = {})",
641 resp.block_txs.len(),
642 resp.request_id
643 );
644 trace!("on_block_txs resp={:?}", resp);
645
646 self.block_txs.receive(
647 peer,
648 resp.request_id,
649 resp.block_txs.into_iter(),
650 )?;
651
652 self.block_txs.sync(io);
653 Ok(())
654 }
655
656 fn on_blooms(
657 &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetBloomsResponse,
658 ) -> Result<()> {
659 debug!(
660 "received {} blooms (request id = {})",
661 resp.blooms.len(),
662 resp.request_id
663 );
664 trace!("on_blooms resp={:?}", resp);
665
666 self.blooms
667 .receive(peer, resp.request_id, resp.blooms.into_iter())?;
668
669 self.blooms.sync(io);
670 Ok(())
671 }
672
673 fn on_new_block_hashes(
674 &self, io: &dyn NetworkContext, peer: &NodeId, msg: NewBlockHashes,
675 ) -> Result<()> {
676 debug!("received {} new block hashes", msg.hashes.len());
677 trace!("on_new_block_hashes msg={:?}", msg);
678
679 if self.catch_up_mode() {
680 if let Some(state) = self.peers.get(peer) {
681 let mut state = state.write();
682 state.terminals.extend(msg.hashes);
683 }
684 return Ok(());
685 }
686
687 self.headers.request_now_from_peer(
688 io,
689 peer,
690 msg.hashes.into_iter(),
691 HashSource::NewHash,
692 );
693
694 self.start_sync(io);
695 Ok(())
696 }
697
698 fn on_receipts(
699 &self, io: &dyn NetworkContext, peer: &NodeId,
700 resp: GetReceiptsResponse,
701 ) -> Result<()> {
702 debug!(
703 "received {} receipts (request id = {})",
704 resp.receipts.len(),
705 resp.request_id
706 );
707 trace!("on_receipts resp={:?}", resp);
708
709 self.receipts.receive(
710 peer,
711 resp.request_id,
712 resp.receipts.into_iter(),
713 )?;
714
715 self.receipts.sync(io);
716 Ok(())
717 }
718
719 fn on_state_entries(
720 &self, io: &dyn NetworkContext, peer: &NodeId,
721 resp: GetStateEntriesResponse,
722 ) -> Result<()> {
723 debug!(
724 "received {} state entries (request id = {})",
725 resp.entries.len(),
726 resp.request_id
727 );
728 trace!("on_state_entries resp={:?}", resp);
729
730 self.state_entries.receive(
731 peer,
732 resp.request_id,
733 resp.entries.into_iter(),
734 )?;
735
736 self.state_entries.sync(io);
737 Ok(())
738 }
739
740 fn on_state_roots(
741 &self, io: &dyn NetworkContext, peer: &NodeId,
742 resp: GetStateRootsResponse,
743 ) -> Result<()> {
744 debug!(
745 "received {} state roots (request id = {})",
746 resp.state_roots.len(),
747 resp.request_id
748 );
749 trace!("on_state_roots resp={:?}", resp);
750
751 self.state_roots.receive(
752 peer,
753 resp.request_id,
754 resp.state_roots.into_iter(),
755 )?;
756
757 self.state_roots.sync(io);
758 Ok(())
759 }
760
761 fn on_storage_roots(
762 &self, io: &dyn NetworkContext, peer: &NodeId,
763 resp: GetStorageRootsResponse,
764 ) -> Result<()> {
765 debug!(
766 "received {} storage roots (request id = {})",
767 resp.roots.len(),
768 resp.request_id
769 );
770 trace!("on_storage_roots resp={:?}", resp);
771
772 self.storage_roots.receive(
773 peer,
774 resp.request_id,
775 resp.roots.into_iter(),
776 )?;
777
778 self.storage_roots.sync(io);
779 Ok(())
780 }
781
782 fn on_txs(
783 &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetTxsResponse,
784 ) -> Result<()> {
785 debug!(
786 "received {} txs (request id = {})",
787 resp.txs.len(),
788 resp.request_id
789 );
790 trace!("on_txs resp={:?}", resp);
791
792 self.txs
793 .receive(peer, resp.request_id, resp.txs.into_iter())?;
794
795 self.txs.sync(io);
796 Ok(())
797 }
798
799 fn on_tx_infos(
800 &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetTxInfosResponse,
801 ) -> Result<()> {
802 debug!(
803 "received {} tx infos (request id = {})",
804 resp.infos.len(),
805 resp.request_id
806 );
807 trace!("on_tx_infos resp={:?}", resp);
808
809 self.tx_infos
810 .receive(peer, resp.request_id, resp.infos.into_iter())?;
811
812 self.tx_infos.sync(io);
813 Ok(())
814 }
815
816 fn on_witness_info(
817 &self, io: &dyn NetworkContext, peer: &NodeId,
818 resp: GetWitnessInfoResponse,
819 ) -> Result<()> {
820 debug!(
821 "received {} witnesses (request id = {})",
822 resp.infos.len(),
823 resp.request_id
824 );
825 trace!("on_witness_info resp={:?}", resp);
826
827 self.witnesses.receive(
828 peer,
829 resp.request_id,
830 resp.infos.into_iter(),
831 )?;
832
833 self.witnesses.sync(io);
834 Ok(())
835 }
836
837 fn start_sync(&self, io: &dyn NetworkContext) {
838 match self.catch_up_mode() {
839 true => {
840 self.headers.sync(io);
841 self.epochs.sync(io);
842 }
843 false => {
844 self.collect_terminals();
845 self.headers.sync(io);
846 }
847 };
848
849 self.witnesses.sync(io);
850 self.blooms.sync(io);
851 self.receipts.sync(io);
852 self.block_txs.sync(io);
853 self.state_entries.sync(io);
854 self.state_roots.sync(io);
855 self.storage_roots.sync(io);
856 self.txs.sync(io);
857 self.tx_infos.sync(io);
858 }
859
860 fn clean_up_requests(&self) {
861 self.block_txs.clean_up();
862 self.blooms.clean_up();
863 self.epochs.clean_up();
864 self.headers.clean_up();
865 self.receipts.clean_up();
866 self.state_entries.clean_up();
867 self.state_roots.clean_up();
868 self.storage_roots.clean_up();
869 self.tx_infos.clean_up();
870 self.txs.clean_up();
871 self.witnesses.clean_up();
872 }
873
874 fn on_throttled(
875 &self, _io: &dyn NetworkContext, peer: &NodeId, resp: Throttled,
876 ) -> Result<()> {
877 debug!("on_throttled resp={:?}", resp);
878
879 let peer = self.get_existing_peer_state(peer)?;
880 peer.write().throttled_msgs.set_throttled(
881 resp.msg_id,
882 Instant::now() + Duration::from_nanos(resp.wait_time_nanos),
883 );
884
885 Ok(())
896 }
897}
898
899impl Drop for Handler {
900 fn drop(&mut self) {
901 self.stopped.store(true, Ordering::SeqCst);
903
904 if let Some(thread) = self.join_handle.take() {
905 assert!(
908 thread.thread().id() != thread::current().id(),
909 "Attempting to join Witness Worker thread from itself (id = {:?})", thread::current().id(),
910 );
911
912 thread.join().expect("Witness Worker should not panic");
915
916 }
922 }
923}
924
925impl NetworkProtocolHandler for Handler {
926 fn minimum_supported_version(&self) -> ProtocolVersion {
927 let my_version = self.protocol_version.0;
928 if my_version > LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT {
929 ProtocolVersion(my_version - LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT)
930 } else {
931 LIGHT_PROTO_V1
932 }
933 }
934
935 fn initialize(&self, io: &dyn NetworkContext) {
936 io.register_timer(SYNC_TIMER, *SYNC_PERIOD)
937 .expect("Error registering sync timer");
938
939 io.register_timer(REQUEST_CLEANUP_TIMER, *CLEANUP_PERIOD)
940 .expect("Error registering request cleanup timer");
941
942 io.register_timer(LOG_STATISTICS_TIMER, Duration::from_secs(1))
943 .expect("Error registering log statistics timer");
944
945 io.register_timer(HEARTBEAT_TIMER, *HEARTBEAT_PERIOD)
946 .expect("Error registering heartbeat timer");
947
948 io.register_timer(TOTAL_WEIGHT_IN_PAST_TIMER, Duration::from_secs(20))
949 .expect("Error registering total weight in past timer");
950 io.register_timer(
951 CHECK_SYNC_NOT_READY_BLOCKS_TIMER,
952 Duration::from_millis(1000),
953 )
954 .expect("Error registering CHECK_FUTURE_BLOCK_TIMER");
955 }
956
957 fn on_message(&self, io: &dyn NetworkContext, peer: &NodeId, raw: &[u8]) {
958 trace!("on_message: peer={:?}, raw={:?}", peer, raw);
959
960 let (msg_id, rlp) = match decode_msg(raw) {
961 Some(msg) => msg,
962 None => {
963 return handle_error(
964 io,
965 peer,
966 msgid::INVALID,
967 &Error::InvalidMessageFormat.into(),
968 )
969 }
970 };
971
972 trace!("on_message: peer={:?}, msgid={:?}", peer, msg_id);
973
974 if let Err(e) = self.dispatch_message(io, peer, msg_id.into(), rlp) {
975 handle_error(io, peer, msg_id.into(), &e);
976 }
977 }
978
979 fn on_peer_connected(
980 &self, io: &dyn NetworkContext, node_id: &NodeId,
981 peer_protocol_version: ProtocolVersion,
982 _pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
983 ) {
984 debug!("on_peer_connected: peer={:?}", node_id);
985
986 match self.send_status(io, node_id, peer_protocol_version) {
987 Ok(_) => {
988 self.peers.insert(*node_id);
990 self.peers.get(node_id).unwrap().write().protocol_version =
991 peer_protocol_version;
992
993 if let Some(ref file) = self.throttling_config_file {
994 let peer = self.peers.get(node_id).expect("peer not found");
995 peer.write().unexpected_msgs = TokenBucketManager::load(
996 file,
997 Some("light_protocol::unexpected_msgs"),
998 )
999 .expect("invalid throttling configuration file");
1000 }
1001 }
1002 Err(e) => {
1003 warn!("Error while sending status: {}", e);
1004 handle_error(
1005 io,
1006 node_id,
1007 msgid::INVALID,
1008 &Error::SendStatusFailed {
1009 peer: *node_id,
1010 source: None,
1011 }
1012 .into(),
1013 );
1014 }
1015 }
1016 }
1017
1018 fn on_peer_disconnected(&self, _io: &dyn NetworkContext, peer: &NodeId) {
1019 debug!("on_peer_disconnected: peer={}", peer);
1020 self.peers.remove(peer);
1021 }
1022
1023 fn on_timeout(&self, io: &dyn NetworkContext, timer: TimerToken) {
1024 trace!("Timeout: timer={:?}", timer);
1025 match timer {
1026 SYNC_TIMER => self.start_sync(io),
1027 REQUEST_CLEANUP_TIMER => self.clean_up_requests(),
1028 LOG_STATISTICS_TIMER => {
1029 self.print_stats();
1030 self.block_txs.print_stats();
1031 self.blooms.print_stats();
1032 self.epochs.print_stats();
1033 self.headers.print_stats();
1034 self.receipts.print_stats();
1035 self.state_entries.print_stats();
1036 self.state_roots.print_stats();
1037 self.storage_roots.print_stats();
1038 self.tx_infos.print_stats();
1039 self.txs.print_stats();
1040 self.witnesses.print_stats();
1041 }
1042 HEARTBEAT_TIMER => {
1043 self.send_heartbeat(io);
1044 }
1045 TOTAL_WEIGHT_IN_PAST_TIMER => {
1046 self.consensus.update_total_weight_delta_heartbeat();
1047 }
1048 CHECK_SYNC_NOT_READY_BLOCKS_TIMER => {
1049 self.headers
1050 .graph
1051 .check_not_ready_frontier(true );
1052 }
1053 _ => warn!("Unknown timer {} triggered.", timer),
1055 }
1056 }
1057
1058 fn send_local_message(&self, _io: &dyn NetworkContext, _message: Vec<u8>) {
1059 unreachable!("Light node handler does not have send_local_message.")
1060 }
1061
1062 fn on_work_dispatch(&self, _io: &dyn NetworkContext, _work_type: u8) {
1063 unreachable!("Light node handler does not have on_work_dispatch.")
1064 }
1065}