// cfxcore/light_protocol/handler/mod.rs

// Copyright 2019 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/

5pub mod sync;
6
7use crate::{
8    block_data_manager::BlockDataManager,
9    consensus::SharedConsensusGraph,
10    light_protocol::{
11        common::{validate_chain_id, FullPeerState, Peers},
12        error::*,
13        handle_error,
14        message::{
15            msgid, BlockHashes as GetBlockHashesResponse,
16            BlockHeaders as GetBlockHeadersResponse,
17            BlockTxs as GetBlockTxsResponse, Blooms as GetBloomsResponse,
18            NewBlockHashes, NodeType, Receipts as GetReceiptsResponse,
19            SendRawTx, StateEntries as GetStateEntriesResponse,
20            StateRoots as GetStateRootsResponse, StatusPingDeprecatedV1,
21            StatusPingV2, StatusPongDeprecatedV1, StatusPongV2,
22            StorageRoots as GetStorageRootsResponse,
23            TxInfos as GetTxInfosResponse, Txs as GetTxsResponse,
24            WitnessInfo as GetWitnessInfoResponse,
25        },
26        LightNodeConfiguration, LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT,
27        LIGHT_PROTOCOL_VERSION, LIGHT_PROTO_V1,
28    },
29    message::{decode_msg, decode_rlp_and_check_deprecation, Message, MsgId},
30    sync::{message::Throttled, SynchronizationGraph},
31    Notifications, UniqueId,
32};
33use cfx_internal_common::ChainIdParamsDeprecated;
34use cfx_parameters::light::{
35    CATCH_UP_EPOCH_LAG_THRESHOLD, CLEANUP_PERIOD, HEARTBEAT_PERIOD, SYNC_PERIOD,
36};
37use cfx_types::H256;
38use diem_types::validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey};
39use network::{
40    node_table::NodeId, service::ProtocolVersion, NetworkContext,
41    NetworkProtocolHandler,
42};
43use parking_lot::RwLock;
44use rlp::Rlp;
45use std::{
46    sync::{
47        atomic::{AtomicBool, Ordering},
48        Arc,
49    },
50    thread,
51    time::{Duration, Instant},
52};
53use sync::{
54    BlockTxs, Blooms, Epochs, HashSource, Headers, Receipts, StateEntries,
55    StateRoots, StorageRoots, TxInfos, Txs, Witnesses,
56};
57use throttling::token_bucket::TokenBucketManager;
58
/// Identifier passed to the network layer's `register_timer` and echoed
/// back in timer callbacks.
type TimerToken = usize;

const SYNC_TIMER: TimerToken = 0;
const REQUEST_CLEANUP_TIMER: TimerToken = 1;
const LOG_STATISTICS_TIMER: TimerToken = 2;
const HEARTBEAT_TIMER: TimerToken = 3;
const TOTAL_WEIGHT_IN_PAST_TIMER: TimerToken = 4;
// NOTE(review): tokens 5 and 6 are skipped — presumably reserved or retired
// timers; confirm before reusing these values.
const CHECK_SYNC_NOT_READY_BLOCKS_TIMER: TimerToken = 7;
67
/// Handler is responsible for maintaining peer meta-information and
/// dispatching messages to the query and sync sub-handlers.
pub struct Handler {
    // protocol version advertised to peers during the handshake
    pub protocol_version: ProtocolVersion,

    // block tx sync manager
    pub block_txs: Arc<BlockTxs>,

    // bloom sync manager
    pub blooms: Blooms,

    // shared consensus graph
    consensus: SharedConsensusGraph,

    // epoch sync manager
    epochs: Epochs,

    // header sync manager
    headers: Arc<Headers>,

    // join handle for witness worker thread; joined in `Drop`
    join_handle: Option<thread::JoinHandle<()>>,

    // collection of all peers available
    pub peers: Arc<Peers<FullPeerState>>,

    // receipt sync manager
    pub receipts: Arc<Receipts>,

    // state entry sync manager
    pub state_entries: StateEntries,

    // state root sync manager
    pub state_roots: Arc<StateRoots>,

    // whether the witness worker thread should be stopped;
    // set in `Drop`, polled by the worker loop
    stopped: Arc<AtomicBool>,

    // storage root sync manager
    pub storage_roots: StorageRoots,

    // tx sync manager
    pub txs: Arc<Txs>,

    // tx info sync manager
    pub tx_infos: TxInfos,

    // path to unexpected messages config file
    // NOTE(review): presumably this configures message throttling
    // (cf. `TokenBucketManager` import) — confirm at the use site
    throttling_config_file: Option<String>,

    // witness sync manager
    pub witnesses: Arc<Witnesses>,
}
121
122impl Handler {
    /// Creates a new light-protocol `Handler`.
    ///
    /// Wires up all sync sub-managers. They share the same peer set and
    /// request-id allocator; several depend on earlier ones (e.g. blooms,
    /// receipts, state roots and tx infos verify against roots obtained via
    /// `witnesses`), hence the construction order below. Also spawns the
    /// witness worker thread, which is joined again in `Drop`.
    pub fn new(
        consensus: SharedConsensusGraph, graph: Arc<SynchronizationGraph>,
        throttling_config_file: Option<String>,
        notifications: Arc<Notifications>, config: LightNodeConfiguration,
    ) -> Self {
        // shared by all sub-managers
        let peers = Arc::new(Peers::new());
        let request_id_allocator = Arc::new(UniqueId::new());

        let headers = Arc::new(Headers::new(
            graph.clone(),
            peers.clone(),
            request_id_allocator.clone(),
            config.clone(),
        ));

        let epochs = Epochs::new(
            consensus.clone(),
            headers.clone(),
            peers.clone(),
            request_id_allocator.clone(),
            config,
        );

        let witnesses = Arc::new(Witnesses::new(
            consensus.clone(),
            peers.clone(),
            request_id_allocator.clone(),
        ));

        // blooms are verified against witness-provided roots
        let blooms = Blooms::new(
            peers.clone(),
            request_id_allocator.clone(),
            witnesses.clone(),
        );

        let receipts = Arc::new(Receipts::new(
            peers.clone(),
            request_id_allocator.clone(),
            witnesses.clone(),
        ));

        let snapshot_epoch_count =
            consensus.data_manager().get_snapshot_epoch_count() as u64;

        let state_roots = Arc::new(StateRoots::new(
            peers.clone(),
            request_id_allocator.clone(),
            snapshot_epoch_count,
            witnesses.clone(),
        ));

        // state entries and storage roots are proven against state roots
        let state_entries = StateEntries::new(
            peers.clone(),
            state_roots.clone(),
            request_id_allocator.clone(),
        );

        let storage_roots = StorageRoots::new(
            peers.clone(),
            state_roots.clone(),
            request_id_allocator.clone(),
        );

        let txs =
            Arc::new(Txs::new(peers.clone(), request_id_allocator.clone()));

        let block_txs = Arc::new(BlockTxs::new(
            consensus.clone(),
            peers.clone(),
            request_id_allocator.clone(),
            txs.clone(),
        ));

        let tx_infos = TxInfos::new(
            consensus.clone(),
            peers.clone(),
            request_id_allocator.clone(),
            witnesses.clone(),
        );

        // stop flag shared with the witness worker; set in `Drop`
        let stopped = Arc::new(AtomicBool::new(false));

        let join_handle = Some(Self::start_witness_worker(
            notifications,
            witnesses.clone(),
            stopped.clone(),
            consensus.data_manager().clone(),
        ));

        Handler {
            block_txs,
            blooms,
            consensus,
            epochs,
            headers,
            join_handle,
            peers,
            protocol_version: LIGHT_PROTOCOL_VERSION,
            receipts,
            state_entries,
            state_roots,
            stopped,
            storage_roots,
            throttling_config_file,
            tx_infos,
            txs,
            witnesses,
        }
    }
232
    // start a standalone thread for requesting witnesses.
    // this thread will be joined when `Handler` is dropped.
    //
    // The worker subscribes to blame-verification results and, for each
    // verified header height, either requests the witness (blamed headers)
    // or clears any stale in-flight entry (non-blamed headers). It exits
    // when `stopped` is set or the notification sender is dropped.
    fn start_witness_worker(
        notifications: Arc<Notifications>, witnesses: Arc<Witnesses>,
        stopped: Arc<AtomicBool>, data_man: Arc<BlockDataManager>,
    ) -> thread::JoinHandle<()> {
        thread::Builder::new()
            .name("Witness Worker".into())
            .spawn(move || {
                let mut receiver =
                    notifications.blame_verification_results.subscribe();

                loop {
                    // `stopped` is set during Drop
                    if stopped.load(Ordering::SeqCst) {
                        break;
                    }

                    // receive next item from channel
                    // (bounded wait so the `stopped` flag is re-checked
                    // at least once per second)
                    let wait_for = Duration::from_secs(1);

                    let (height, maybe_witness) =
                        match receiver.recv_with_timeout(wait_for) {
                            Err(_) => continue, // channel empty, try again
                            Ok(None) => return, // sender dropped, terminate
                            Ok(Some(val)) => val,
                        };

                    trace!(
                        "Witness worker received: height = {:?}, maybe_witness = {:?}",
                        height, maybe_witness
                    );

                    // avoid serving stale roots from db
                    //
                    //                 blame
                    //              ............
                    //              v          |
                    //             ---        ---
                    //         .- | B | <--- | C | <--- ...
                    //  ---    |   ---        ---
                    // | A | <-*
                    //  ---    |   ---
                    //         .- | D | <--- ...
                    //             ---
                    //              ^
                    //          height = X
                    //
                    // we receive A, B, C, ..., A, D (chain reorg),
                    // we stored the verified roots of B on disk,
                    // after chain reorg, height X is not blamed anymore
                    // --> need to make sure to serve correct roots directly from
                    //     header D instead of the stale roots retrieved for B
                    data_man.remove_blamed_header_verified_roots(height);

                    // handle witness
                    match maybe_witness {
                        // request witness for blamed headers
                        Some(w) => {
                            // this request covers all blamed headers:
                            // [w - w.blame, w - w.blame + 1, ..., w]
                            debug!("Requesting witness at height {}", w);
                            witnesses.request(w);
                        }

                        // for non-blamed headers, we will serve roots from disk
                        None => {
                            // `height` might have been blamed before a chain reorg
                            witnesses.in_flight.write().remove(&height);
                        }
                    }

                    // record progress so other components know how far
                    // verification has advanced
                    *witnesses.height_of_latest_verified_header.write() = height;
                }
            })
            .expect("Starting the Witness Worker should succeed")
    }
310
311    #[inline]
312    fn get_existing_peer_state(
313        &self, peer: &NodeId,
314    ) -> Result<Arc<RwLock<FullPeerState>>> {
315        match self.peers.get(&peer) {
316            Some(state) => Ok(state),
317            None => {
318                // NOTE: this should not happen as we register
319                // all peers in `on_peer_connected`
320                bail!(Error::InternalError(format!(
321                    "Received message from unknown peer={:?}",
322                    peer
323                )));
324            }
325        }
326    }
327
328    #[allow(unused)]
329    #[inline]
330    fn peer_version(&self, peer: &NodeId) -> Result<ProtocolVersion> {
331        Ok(self.get_existing_peer_state(peer)?.read().protocol_version)
332    }
333
334    #[inline]
335    fn validate_peer_state(&self, peer: &NodeId, msg_id: MsgId) -> Result<()> {
336        let state = self.get_existing_peer_state(&peer)?;
337
338        if msg_id != msgid::STATUS_PONG_DEPRECATED
339            && msg_id != msgid::STATUS_PONG_V2
340            && !state.read().handshake_completed
341        {
342            warn!("Received msg={:?} from handshaking peer={:?}", msg_id, peer);
343            bail!(Error::UnexpectedMessage {
344                expected: vec![
345                    msgid::STATUS_PONG_DEPRECATED,
346                    msgid::STATUS_PONG_V2
347                ],
348                received: msg_id,
349            });
350        }
351
352        Ok(())
353    }
354
355    #[inline]
356    fn validate_peer_type(&self, node_type: NodeType) -> Result<()> {
357        match node_type {
358            NodeType::Archive => Ok(()),
359            NodeType::Full => Ok(()),
360            _ => bail!(Error::UnexpectedPeerType { node_type }),
361        }
362    }
363
364    #[inline]
365    fn validate_genesis_hash(&self, genesis: H256) -> Result<()> {
366        let ours = self.consensus.data_manager().true_genesis.hash();
367        let theirs = genesis;
368
369        if ours != theirs {
370            bail!(Error::GenesisMismatch { ours, theirs });
371        }
372
373        Ok(())
374    }
375
    /// Decodes the RLP payload for `msg_id` (rejecting messages deprecated
    /// for the peer's version range) and routes it to the matching handler.
    /// All messages except status pongs require a completed handshake.
    #[rustfmt::skip]
    fn dispatch_message(
        &self, io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, rlp: Rlp,
    ) -> Result<()> {
        trace!("Dispatching message: peer={:?}, msg_id={:?}", peer, msg_id);
        self.validate_peer_state(peer, msg_id)?;
        let min_supported_ver = self.minimum_supported_version();
        let protocol = io.get_protocol();

        match msg_id {
            // general messages
            msgid::STATUS_PONG_DEPRECATED => self.on_status_deprecated(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::STATUS_PONG_V2 => self.on_status_v2(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),

            // sync messages
            msgid::BLOCK_HASHES => self.on_block_hashes(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::BLOCK_HEADERS => self.on_block_headers(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::BLOCK_TXS => self.on_block_txs(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::BLOOMS => self.on_blooms(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::NEW_BLOCK_HASHES => self.on_new_block_hashes(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::RECEIPTS => self.on_receipts(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::STATE_ENTRIES => self.on_state_entries(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::STATE_ROOTS => self.on_state_roots(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::STORAGE_ROOTS => self.on_storage_roots(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::TXS => self.on_txs(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::TX_INFOS => self.on_tx_infos(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::WITNESS_INFO => self.on_witness_info(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),

            // request was throttled by service provider
            msgid::THROTTLED => self.on_throttled(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),

            _ => bail!(Error::UnknownMessage{id: msg_id}),
        }
    }
410
411    #[inline]
412    pub fn median_peer_epoch(&self) -> Option<u64> {
413        let mut best_epochs = self.peers.fold(vec![], |mut res, state| {
414            res.push(state.read().best_epoch);
415            res
416        });
417
418        best_epochs.sort();
419
420        match best_epochs.len() {
421            0 => None,
422            n => Some(best_epochs[n / 2]),
423        }
424    }
425
426    #[inline]
427    fn catch_up_mode(&self) -> bool {
428        match self.median_peer_epoch() {
429            None => true,
430            Some(epoch) => {
431                let my_epoch = self.consensus.best_epoch_number();
432                my_epoch + CATCH_UP_EPOCH_LAG_THRESHOLD < epoch
433            }
434        }
435    }
436
437    #[inline]
438    fn print_stats(&self) {
439        match self.catch_up_mode() {
440            true => {
441                let latest_epoch = self.consensus.best_epoch_number();
442                let best_peer_epoch = self.epochs.best_peer_epoch();
443
444                let progress = if best_peer_epoch == 0 {
445                    0.0
446                } else {
447                    100.0 * (latest_epoch as f64) / (best_peer_epoch as f64)
448                };
449
450                info!(
451                    "Catch-up mode: true, latest epoch: {} / {} ({:.2}%), latest verified: {}, inserted header count: {}",
452                    latest_epoch,
453                    best_peer_epoch,
454                    progress,
455                    self.witnesses.latest_verified(),
456                    self.headers.inserted_count.load(Ordering::Relaxed),
457                )
458            }
459            false => info!(
460                "Catch-up mode: false, latest epoch: {}, latest verified: {}",
461                self.consensus.best_epoch_number(),
462                self.witnesses.latest_verified(),
463            ),
464        }
465    }
466
467    #[inline]
468    fn collect_terminals(&self) {
469        let terminals = self.peers.fold(vec![], |mut res, state| {
470            let mut state = state.write();
471            res.extend(state.terminals.iter());
472            state.terminals.clear();
473            res
474        });
475
476        let terminals = terminals.into_iter();
477        self.headers.request(terminals, HashSource::NewHash);
478    }
479
480    #[inline]
481    fn send_status(
482        &self, io: &dyn NetworkContext, peer: &NodeId,
483        peer_protocol_version: ProtocolVersion,
484    ) -> Result<()> {
485        let msg: Box<dyn Message>;
486
487        if peer_protocol_version == LIGHT_PROTO_V1 {
488            msg = Box::new(StatusPingDeprecatedV1 {
489                protocol_version: self.protocol_version.0,
490                genesis_hash: self.consensus.data_manager().true_genesis.hash(),
491                node_type: NodeType::Light,
492            });
493        } else {
494            msg = Box::new(StatusPingV2 {
495                chain_id: ChainIdParamsDeprecated {
496                    chain_id: self.consensus.best_chain_id().in_native_space(),
497                },
498                genesis_hash: self.consensus.data_manager().true_genesis.hash(),
499                node_type: NodeType::Light,
500            });
501        }
502
503        msg.send(io, peer)?;
504        Ok(())
505    }
506
507    #[inline]
508    pub fn send_heartbeat(&self, io: &dyn NetworkContext) {
509        let peer_ids = self.peers.all_peers_satisfying(|_| true);
510
511        for peer in peer_ids {
512            let protocol_version = match self.get_existing_peer_state(&peer) {
513                Ok(state) => state.read().protocol_version,
514                Err(_) => {
515                    warn!("Peer not found for heartbeat: {:?}", peer);
516                    continue;
517                }
518            };
519
520            debug!("send_heartbeat peer={:?}", peer);
521
522            if let Err(e) = self.send_status(io, &peer, protocol_version) {
523                warn!(
524                    "Error while sending heartbeat to peer {:?}: {:?}",
525                    peer, e
526                );
527            }
528        }
529    }
530
531    #[inline]
532    pub fn send_raw_tx(
533        &self, io: &dyn NetworkContext, peer: &NodeId, raw: Vec<u8>,
534    ) -> Result<()> {
535        let msg: Box<dyn Message> = Box::new(SendRawTx { raw });
536        msg.send(io, peer)?;
537        Ok(())
538    }
539
    /// Handles a v2 status pong: validates the peer's node type, genesis
    /// hash and chain id, records its best epoch and terminal hashes, and
    /// kicks off syncing.
    fn on_status_v2(
        &self, io: &dyn NetworkContext, peer: &NodeId, status: StatusPongV2,
    ) -> Result<()> {
        debug!("on_status (v2) peer={:?} status={:?}", peer, status);

        self.validate_peer_type(status.node_type)?;
        self.validate_genesis_hash(status.genesis_hash)?;
        validate_chain_id(
            &self
                .consensus
                .config()
                .chain_id
                .read()
                .to_native_space_params(),
            status.chain_id.into(),
            status.best_epoch,
        )?;

        // scope limits how long the peer-state write lock is held
        {
            let state = self.get_existing_peer_state(peer)?;
            let mut state = state.write();
            state.best_epoch = status.best_epoch;
            state.handshake_completed = true;
            state.terminals = status.terminals.into_iter().collect();
        }

        // NOTE: `start_sync` acquires read locks on peer states so
        // we need to make sure to release locks before calling it
        self.start_sync(io);
        Ok(())
    }
571
572    fn on_status_deprecated(
573        &self, io: &dyn NetworkContext, peer: &NodeId,
574        status: StatusPongDeprecatedV1,
575    ) -> Result<()> {
576        debug!("on_status (v1) peer={:?} status={:?}", peer, status);
577
578        self.on_status_v2(
579            io,
580            peer,
581            StatusPongV2 {
582                chain_id: ChainIdParamsDeprecated {
583                    chain_id: self.consensus.best_chain_id().in_native_space(),
584                },
585                node_type: status.node_type,
586                genesis_hash: status.genesis_hash,
587                best_epoch: status.best_epoch,
588                terminals: status.terminals,
589            },
590        )
591    }
592
593    fn on_block_hashes(
594        &self, io: &dyn NetworkContext, _peer: &NodeId,
595        resp: GetBlockHashesResponse,
596    ) -> Result<()> {
597        debug!(
598            "received {} block hashes (request id = {})",
599            resp.hashes.len(),
600            resp.request_id
601        );
602        trace!("on_block_hashes resp={:?}", resp);
603
604        self.epochs.receive(&resp.request_id);
605
606        // TODO(thegaram): do not request hashes that we did not ask for
607        let hashes = resp.hashes.into_iter();
608        self.headers.request(hashes, HashSource::Epoch);
609
610        self.start_sync(io);
611        Ok(())
612    }
613
614    fn on_block_headers(
615        &self, io: &dyn NetworkContext, peer: &NodeId,
616        resp: GetBlockHeadersResponse,
617    ) -> Result<()> {
618        debug!(
619            "received {} block headers (request id = {})",
620            resp.headers.len(),
621            resp.request_id
622        );
623        trace!("on_block_headers resp={:?}", resp);
624
625        self.headers.receive(
626            peer,
627            resp.request_id,
628            resp.headers.into_iter(),
629        )?;
630
631        self.start_sync(io);
632        Ok(())
633    }
634
635    fn on_block_txs(
636        &self, io: &dyn NetworkContext, peer: &NodeId,
637        resp: GetBlockTxsResponse,
638    ) -> Result<()> {
639        debug!(
640            "received {} block txs (request id = {})",
641            resp.block_txs.len(),
642            resp.request_id
643        );
644        trace!("on_block_txs resp={:?}", resp);
645
646        self.block_txs.receive(
647            peer,
648            resp.request_id,
649            resp.block_txs.into_iter(),
650        )?;
651
652        self.block_txs.sync(io);
653        Ok(())
654    }
655
656    fn on_blooms(
657        &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetBloomsResponse,
658    ) -> Result<()> {
659        debug!(
660            "received {} blooms (request id = {})",
661            resp.blooms.len(),
662            resp.request_id
663        );
664        trace!("on_blooms resp={:?}", resp);
665
666        self.blooms
667            .receive(peer, resp.request_id, resp.blooms.into_iter())?;
668
669        self.blooms.sync(io);
670        Ok(())
671    }
672
673    fn on_new_block_hashes(
674        &self, io: &dyn NetworkContext, peer: &NodeId, msg: NewBlockHashes,
675    ) -> Result<()> {
676        debug!("received {} new block hashes", msg.hashes.len());
677        trace!("on_new_block_hashes msg={:?}", msg);
678
679        if self.catch_up_mode() {
680            if let Some(state) = self.peers.get(peer) {
681                let mut state = state.write();
682                state.terminals.extend(msg.hashes);
683            }
684            return Ok(());
685        }
686
687        self.headers.request_now_from_peer(
688            io,
689            peer,
690            msg.hashes.into_iter(),
691            HashSource::NewHash,
692        );
693
694        self.start_sync(io);
695        Ok(())
696    }
697
698    fn on_receipts(
699        &self, io: &dyn NetworkContext, peer: &NodeId,
700        resp: GetReceiptsResponse,
701    ) -> Result<()> {
702        debug!(
703            "received {} receipts (request id = {})",
704            resp.receipts.len(),
705            resp.request_id
706        );
707        trace!("on_receipts resp={:?}", resp);
708
709        self.receipts.receive(
710            peer,
711            resp.request_id,
712            resp.receipts.into_iter(),
713        )?;
714
715        self.receipts.sync(io);
716        Ok(())
717    }
718
719    fn on_state_entries(
720        &self, io: &dyn NetworkContext, peer: &NodeId,
721        resp: GetStateEntriesResponse,
722    ) -> Result<()> {
723        debug!(
724            "received {} state entries (request id = {})",
725            resp.entries.len(),
726            resp.request_id
727        );
728        trace!("on_state_entries resp={:?}", resp);
729
730        self.state_entries.receive(
731            peer,
732            resp.request_id,
733            resp.entries.into_iter(),
734        )?;
735
736        self.state_entries.sync(io);
737        Ok(())
738    }
739
740    fn on_state_roots(
741        &self, io: &dyn NetworkContext, peer: &NodeId,
742        resp: GetStateRootsResponse,
743    ) -> Result<()> {
744        debug!(
745            "received {} state roots (request id = {})",
746            resp.state_roots.len(),
747            resp.request_id
748        );
749        trace!("on_state_roots resp={:?}", resp);
750
751        self.state_roots.receive(
752            peer,
753            resp.request_id,
754            resp.state_roots.into_iter(),
755        )?;
756
757        self.state_roots.sync(io);
758        Ok(())
759    }
760
761    fn on_storage_roots(
762        &self, io: &dyn NetworkContext, peer: &NodeId,
763        resp: GetStorageRootsResponse,
764    ) -> Result<()> {
765        debug!(
766            "received {} storage roots (request id = {})",
767            resp.roots.len(),
768            resp.request_id
769        );
770        trace!("on_storage_roots resp={:?}", resp);
771
772        self.storage_roots.receive(
773            peer,
774            resp.request_id,
775            resp.roots.into_iter(),
776        )?;
777
778        self.storage_roots.sync(io);
779        Ok(())
780    }
781
782    fn on_txs(
783        &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetTxsResponse,
784    ) -> Result<()> {
785        debug!(
786            "received {} txs (request id = {})",
787            resp.txs.len(),
788            resp.request_id
789        );
790        trace!("on_txs resp={:?}", resp);
791
792        self.txs
793            .receive(peer, resp.request_id, resp.txs.into_iter())?;
794
795        self.txs.sync(io);
796        Ok(())
797    }
798
799    fn on_tx_infos(
800        &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetTxInfosResponse,
801    ) -> Result<()> {
802        debug!(
803            "received {} tx infos (request id = {})",
804            resp.infos.len(),
805            resp.request_id
806        );
807        trace!("on_tx_infos resp={:?}", resp);
808
809        self.tx_infos
810            .receive(peer, resp.request_id, resp.infos.into_iter())?;
811
812        self.tx_infos.sync(io);
813        Ok(())
814    }
815
816    fn on_witness_info(
817        &self, io: &dyn NetworkContext, peer: &NodeId,
818        resp: GetWitnessInfoResponse,
819    ) -> Result<()> {
820        debug!(
821            "received {} witnesses (request id = {})",
822            resp.infos.len(),
823            resp.request_id
824        );
825        trace!("on_witness_info resp={:?}", resp);
826
827        self.witnesses.receive(
828            peer,
829            resp.request_id,
830            resp.infos.into_iter(),
831        )?;
832
833        self.witnesses.sync(io);
834        Ok(())
835    }
836
837    fn start_sync(&self, io: &dyn NetworkContext) {
838        match self.catch_up_mode() {
839            true => {
840                self.headers.sync(io);
841                self.epochs.sync(io);
842            }
843            false => {
844                self.collect_terminals();
845                self.headers.sync(io);
846            }
847        };
848
849        self.witnesses.sync(io);
850        self.blooms.sync(io);
851        self.receipts.sync(io);
852        self.block_txs.sync(io);
853        self.state_entries.sync(io);
854        self.state_roots.sync(io);
855        self.storage_roots.sync(io);
856        self.txs.sync(io);
857        self.tx_infos.sync(io);
858    }
859
    /// Runs periodic cleanup on every sync manager (driven by
    /// `REQUEST_CLEANUP_TIMER`), allowing them to drop or retry stale
    /// in-flight requests.
    fn clean_up_requests(&self) {
        self.block_txs.clean_up();
        self.blooms.clean_up();
        self.epochs.clean_up();
        self.headers.clean_up();
        self.receipts.clean_up();
        self.state_entries.clean_up();
        self.state_roots.clean_up();
        self.storage_roots.clean_up();
        self.tx_infos.clean_up();
        self.txs.clean_up();
        self.witnesses.clean_up();
    }
873
874    fn on_throttled(
875        &self, _io: &dyn NetworkContext, peer: &NodeId, resp: Throttled,
876    ) -> Result<()> {
877        debug!("on_throttled resp={:?}", resp);
878
879        let peer = self.get_existing_peer_state(peer)?;
880        peer.write().throttled_msgs.set_throttled(
881            resp.msg_id,
882            Instant::now() + Duration::from_nanos(resp.wait_time_nanos),
883        );
884
885        // TODO(boqiu): update when throttled
886        // In case of throttled for a RPC call:
887        // 1. Just return error to client;
888        // 2. Select another peer to try again (e.g. 3 times at most).
889        //
890        // In addition, if no peer available, return error to client
891        // immediately. So, when any error occur (e.g. proof validation failed,
892        // throttled), light node should return error instead of waiting for
893        // timeout.
894
895        Ok(())
896    }
897}
898
impl Drop for Handler {
    /// Signals the witness worker thread to stop and joins it, so the
    /// worker never outlives the handler.
    fn drop(&mut self) {
        // signal stop to worker thread
        self.stopped.store(true, Ordering::SeqCst);

        if let Some(thread) = self.join_handle.take() {
            // joining a thread from itself is not a good idea; this should not
            // happen in this case as the thread has no references to `Handler`
            assert!(
                thread.thread().id() != thread::current().id(),
                "Attempting to join Witness Worker thread from itself (id = {:?})", thread::current().id(),
            );

            // `stopped` is set and `recv` in the worker will timeout,
            // so the thread should stop eventually.
            thread.join().expect("Witness Worker should not panic");

            // for more info about these issues,
            // see https://stackoverflow.com/a/42791007

            // for a discussion about why we want to join the thread,
            // see https://github.com/rust-lang/rust/issues/48820#issue-303146976
        }
    }
}
924
925impl NetworkProtocolHandler for Handler {
926    fn minimum_supported_version(&self) -> ProtocolVersion {
927        let my_version = self.protocol_version.0;
928        if my_version > LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT {
929            ProtocolVersion(my_version - LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT)
930        } else {
931            LIGHT_PROTO_V1
932        }
933    }
934
935    fn initialize(&self, io: &dyn NetworkContext) {
936        io.register_timer(SYNC_TIMER, *SYNC_PERIOD)
937            .expect("Error registering sync timer");
938
939        io.register_timer(REQUEST_CLEANUP_TIMER, *CLEANUP_PERIOD)
940            .expect("Error registering request cleanup timer");
941
942        io.register_timer(LOG_STATISTICS_TIMER, Duration::from_secs(1))
943            .expect("Error registering log statistics timer");
944
945        io.register_timer(HEARTBEAT_TIMER, *HEARTBEAT_PERIOD)
946            .expect("Error registering heartbeat timer");
947
948        io.register_timer(TOTAL_WEIGHT_IN_PAST_TIMER, Duration::from_secs(20))
949            .expect("Error registering total weight in past timer");
950        io.register_timer(
951            CHECK_SYNC_NOT_READY_BLOCKS_TIMER,
952            Duration::from_millis(1000),
953        )
954        .expect("Error registering CHECK_FUTURE_BLOCK_TIMER");
955    }
956
957    fn on_message(&self, io: &dyn NetworkContext, peer: &NodeId, raw: &[u8]) {
958        trace!("on_message: peer={:?}, raw={:?}", peer, raw);
959
960        let (msg_id, rlp) = match decode_msg(raw) {
961            Some(msg) => msg,
962            None => {
963                return handle_error(
964                    io,
965                    peer,
966                    msgid::INVALID,
967                    &Error::InvalidMessageFormat.into(),
968                )
969            }
970        };
971
972        trace!("on_message: peer={:?}, msgid={:?}", peer, msg_id);
973
974        if let Err(e) = self.dispatch_message(io, peer, msg_id.into(), rlp) {
975            handle_error(io, peer, msg_id.into(), &e);
976        }
977    }
978
979    fn on_peer_connected(
980        &self, io: &dyn NetworkContext, node_id: &NodeId,
981        peer_protocol_version: ProtocolVersion,
982        _pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
983    ) {
984        debug!("on_peer_connected: peer={:?}", node_id);
985
986        match self.send_status(io, node_id, peer_protocol_version) {
987            Ok(_) => {
988                // insert handshaking peer
989                self.peers.insert(*node_id);
990                self.peers.get(node_id).unwrap().write().protocol_version =
991                    peer_protocol_version;
992
993                if let Some(ref file) = self.throttling_config_file {
994                    let peer = self.peers.get(node_id).expect("peer not found");
995                    peer.write().unexpected_msgs = TokenBucketManager::load(
996                        file,
997                        Some("light_protocol::unexpected_msgs"),
998                    )
999                    .expect("invalid throttling configuration file");
1000                }
1001            }
1002            Err(e) => {
1003                warn!("Error while sending status: {}", e);
1004                handle_error(
1005                    io,
1006                    node_id,
1007                    msgid::INVALID,
1008                    &Error::SendStatusFailed {
1009                        peer: *node_id,
1010                        source: None,
1011                    }
1012                    .into(),
1013                );
1014            }
1015        }
1016    }
1017
1018    fn on_peer_disconnected(&self, _io: &dyn NetworkContext, peer: &NodeId) {
1019        debug!("on_peer_disconnected: peer={}", peer);
1020        self.peers.remove(peer);
1021    }
1022
1023    fn on_timeout(&self, io: &dyn NetworkContext, timer: TimerToken) {
1024        trace!("Timeout: timer={:?}", timer);
1025        match timer {
1026            SYNC_TIMER => self.start_sync(io),
1027            REQUEST_CLEANUP_TIMER => self.clean_up_requests(),
1028            LOG_STATISTICS_TIMER => {
1029                self.print_stats();
1030                self.block_txs.print_stats();
1031                self.blooms.print_stats();
1032                self.epochs.print_stats();
1033                self.headers.print_stats();
1034                self.receipts.print_stats();
1035                self.state_entries.print_stats();
1036                self.state_roots.print_stats();
1037                self.storage_roots.print_stats();
1038                self.tx_infos.print_stats();
1039                self.txs.print_stats();
1040                self.witnesses.print_stats();
1041            }
1042            HEARTBEAT_TIMER => {
1043                self.send_heartbeat(io);
1044            }
1045            TOTAL_WEIGHT_IN_PAST_TIMER => {
1046                self.consensus.update_total_weight_delta_heartbeat();
1047            }
1048            CHECK_SYNC_NOT_READY_BLOCKS_TIMER => {
1049                self.headers
1050                    .graph
1051                    .check_not_ready_frontier(true /* header_only */);
1052            }
1053            // TODO(thegaram): add other timers (e.g. data_man gc)
1054            _ => warn!("Unknown timer {} triggered.", timer),
1055        }
1056    }
1057
1058    fn send_local_message(&self, _io: &dyn NetworkContext, _message: Vec<u8>) {
1059        unreachable!("Light node handler does not have send_local_message.")
1060    }
1061
1062    fn on_work_dispatch(&self, _io: &dyn NetworkContext, _work_type: u8) {
1063        unreachable!("Light node handler does not have on_work_dispatch.")
1064    }
1065}