cfxcore/light_protocol/
provider.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    consensus::{
7        MaybeExecutedTxExtraInfo, SharedConsensusGraph, TransactionInfo,
8    },
9    light_protocol::{
10        common::{
11            partition_results, validate_chain_id, LedgerInfo, LightPeerState,
12            Peers,
13        },
14        error::*,
15        handle_error,
16        message::{
17            msgid, BlockHashes as GetBlockHashesResponse,
18            BlockHeaders as GetBlockHeadersResponse,
19            BlockTxs as GetBlockTxsResponse, BlockTxsWithHash, BloomWithEpoch,
20            Blooms as GetBloomsResponse, GetBlockHashesByEpoch,
21            GetBlockHeaders, GetBlockTxs, GetBlooms, GetReceipts,
22            GetStateEntries, GetStateRoots, GetStorageRoots, GetTxInfos,
23            GetTxs, GetWitnessInfo, NewBlockHashes, NodeType,
24            Receipts as GetReceiptsResponse, ReceiptsWithEpoch, SendRawTx,
25            StateEntries as GetStateEntriesResponse, StateEntryProof,
26            StateEntryWithKey, StateKey, StateRootWithEpoch,
27            StateRoots as GetStateRootsResponse, StatusPingDeprecatedV1,
28            StatusPingV2, StatusPongDeprecatedV1, StatusPongV2, StorageRootKey,
29            StorageRootProof, StorageRootWithKey,
30            StorageRoots as GetStorageRootsResponse, TxInfo,
31            TxInfos as GetTxInfosResponse, Txs as GetTxsResponse,
32            WitnessInfo as GetWitnessInfoResponse,
33        },
34        LIGHT_PROTOCOL_ID, LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT,
35        LIGHT_PROTOCOL_VERSION, LIGHT_PROTO_V1,
36    },
37    message::{decode_msg, decode_rlp_and_check_deprecation, Message, MsgId},
38    sync::{message::Throttled, SynchronizationGraph},
39    verification::{compute_epoch_receipt_proof, compute_transaction_proof},
40    TransactionPool,
41};
42use cfx_internal_common::ChainIdParamsDeprecated;
43use cfx_parameters::light::{
44    MAX_EPOCHS_TO_SEND, MAX_HEADERS_TO_SEND, MAX_ITEMS_TO_SEND,
45    MAX_TXS_TO_SEND, MAX_WITNESSES_TO_SEND,
46};
47use cfx_types::H256;
48use diem_types::validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey};
49use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
50use network::{
51    node_table::NodeId, service::ProtocolVersion,
52    throttling::THROTTLING_SERVICE, NetworkContext, NetworkProtocolHandler,
53    NetworkService, UpdateNodeOperation,
54};
55use parking_lot::RwLock;
56use primitives::{
57    SignedTransaction, TransactionIndex, TransactionWithSignature,
58};
59use rand::prelude::SliceRandom;
60use rlp::Rlp;
61use std::{
62    sync::{Arc, Weak},
63    time::{Duration, Instant},
64};
65use throttling::token_bucket::{ThrottleResult, TokenBucketManager};
66
// Token identifying a registered network timer.
type TimerToken = usize;

// Timer token for the periodic peer-heartbeat check.
const CHECK_PEER_HEARTBEAT_TIMER: TimerToken = 0;
70
/// Full-node side of the light protocol: serves state, header, receipt
/// and transaction queries issued by light peers.
#[derive(DeriveMallocSizeOf)]
pub struct Provider {
    // version of the light protocol we speak
    pub protocol_version: ProtocolVersion,
    // node type we advertise in status messages
    node_type: NodeType,

    // shared consensus graph
    #[ignore_malloc_size_of = "arc already counted"]
    consensus: SharedConsensusGraph,

    // shared synchronization graph
    graph: Arc<SynchronizationGraph>,

    // helper API for retrieving ledger information
    #[ignore_malloc_size_of = "arc already counted"]
    ledger: LedgerInfo,

    // shared network service
    // NOTE: use weak pointer in order to avoid circular references
    #[ignore_malloc_size_of = "channels are not handled in MallocSizeOf"]
    network: Weak<NetworkService>,

    // collection of all peers available
    peers: Peers<LightPeerState>,

    // shared transaction pool
    tx_pool: Arc<TransactionPool>,

    // optional path to the throttling configuration file
    throttling_config_file: Option<String>,
}
100
101impl Provider {
102    pub fn new(
103        consensus: SharedConsensusGraph, graph: Arc<SynchronizationGraph>,
104        network: Weak<NetworkService>, tx_pool: Arc<TransactionPool>,
105        throttling_config_file: Option<String>, node_type: NodeType,
106    ) -> Self {
107        let ledger = LedgerInfo::new(consensus.clone());
108        let peers = Peers::new();
109
110        Provider {
111            protocol_version: LIGHT_PROTOCOL_VERSION,
112            node_type,
113            consensus,
114            graph,
115            ledger,
116            network,
117            peers,
118            tx_pool,
119            throttling_config_file,
120        }
121    }
122
123    pub fn register(
124        self: &Arc<Self>, network: Arc<NetworkService>,
125    ) -> std::result::Result<(), String> {
126        network
127            .register_protocol(
128                self.clone(),
129                LIGHT_PROTOCOL_ID,
130                self.protocol_version,
131            )
132            .map_err(|e| {
133                format!("failed to register protocol Provider: {:?}", e)
134            })
135    }
136
137    #[inline]
138    fn get_existing_peer_state(
139        &self, peer: &NodeId,
140    ) -> Result<Arc<RwLock<LightPeerState>>> {
141        match self.peers.get(peer) {
142            Some(state) => Ok(state),
143            None => {
144                // NOTE: this should not happen as we register
145                // all peers in `on_peer_connected`
146                bail!(Error::InternalError(format!(
147                    "Received message from unknown peer={:?}",
148                    peer
149                )))
150            }
151        }
152    }
153
154    #[inline]
155    fn peer_version(&self, peer: &NodeId) -> Result<ProtocolVersion> {
156        Ok(self.get_existing_peer_state(peer)?.read().protocol_version)
157    }
158
159    #[inline]
160    fn validate_peer_state(&self, peer: &NodeId, msg_id: MsgId) -> Result<()> {
161        let state = self.get_existing_peer_state(&peer)?;
162
163        if msg_id != msgid::STATUS_PING_DEPRECATED
164            && msg_id != msgid::STATUS_PING_V2
165            && !state.read().handshake_completed
166        {
167            warn!("Received msg={:?} from handshaking peer={:?}", msg_id, peer);
168            bail!(Error::UnexpectedMessage {
169                expected: vec![
170                    msgid::STATUS_PING_DEPRECATED,
171                    msgid::STATUS_PING_V2
172                ],
173                received: msg_id,
174            });
175        }
176
177        Ok(())
178    }
179
    /// Decode an incoming message and route it to the matching `on_*`
    /// handler.
    ///
    /// Each arm decodes the RLP payload into the concrete request type
    /// and rejects payloads whose encoded version falls below
    /// `minimum_supported_version` (see
    /// `decode_rlp_and_check_deprecation`). Unknown ids fail with
    /// `Error::UnknownMessage`.
    #[rustfmt::skip]
    fn dispatch_message(
        &self, io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, rlp: Rlp,
    ) -> Result<()> {
        trace!("Dispatching message: peer={:?}, msg_id={:?}", peer, msg_id);
        // drop non-status messages from peers still handshaking
        self.validate_peer_state(peer, msg_id)?;
        let min_supported_ver = self.minimum_supported_version();
        let protocol = io.get_protocol();

        match msg_id {
            msgid::STATUS_PING_DEPRECATED => self.on_status_deprecated(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::STATUS_PING_V2 => self.on_status_v2(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::GET_STATE_ENTRIES => self.on_get_state_entries(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::GET_STATE_ROOTS => self.on_get_state_roots(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::GET_BLOCK_HASHES_BY_EPOCH => self.on_get_block_hashes_by_epoch(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::GET_BLOCK_HEADERS => self.on_get_block_headers(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::SEND_RAW_TX => self.on_send_raw_tx(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::GET_RECEIPTS => self.on_get_receipts(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::GET_TXS => self.on_get_txs(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::GET_WITNESS_INFO => self.on_get_witness_info(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::GET_BLOOMS => self.on_get_blooms(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::GET_BLOCK_TXS => self.on_get_block_txs(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::GET_TX_INFOS => self.on_get_tx_infos(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            msgid::GET_STORAGE_ROOTS => self.on_get_storage_roots(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
            _ => bail!(Error::UnknownMessage{id: msg_id}),
        }
    }
207
208    #[inline]
209    fn all_light_peers(&self) -> Vec<NodeId> {
210        // peers completing the handshake are guaranteed to be light peers
211        self.peers.all_peers_satisfying(|s| s.handshake_completed)
212    }
213
214    #[inline]
215    fn tx_by_hash(&self, hash: H256) -> Option<SignedTransaction> {
216        if let Some(info) = self.consensus.get_signed_tx_and_tx_info(&hash) {
217            return Some(info.0);
218        };
219
220        if let Some(tx) = self.tx_pool.get_transaction(&hash) {
221            return Some((*tx).clone());
222        };
223
224        None
225    }
226
    /// Collect everything a light client needs to verify transaction
    /// `hash`: the transaction itself, its receipt, and the Merkle
    /// proofs tying both to their block and epoch.
    ///
    /// Fails with `Error::UnableToProduceTxInfo` if the transaction is
    /// unknown, has not been executed yet, or is a phantom transaction.
    #[inline]
    fn tx_info_by_hash(&self, hash: H256) -> Result<TxInfo> {
        let (tx, tx_index, receipt) =
            match self.consensus.get_signed_tx_and_tx_info(&hash) {
                None => {
                    bail!(Error::UnableToProduceTxInfo {
                        reason: format!("Unable to get tx info for {:?}", hash)
                    });
                }
                // tx is known but has no execution info (receipt) yet
                Some((
                    _,
                    TransactionInfo {
                        maybe_executed_extra_info: None,
                        ..
                    },
                )) => {
                    bail!(Error::UnableToProduceTxInfo {
                        reason: format!("Unable to get receipt for {:?}", hash)
                    });
                }
                // phantom txs are not served
                Some((
                    _,
                    TransactionInfo {
                        tx_index:
                            TransactionIndex {
                                is_phantom: true, ..
                            },
                        ..
                    },
                )) => {
                    bail!(Error::UnableToProduceTxInfo {
                        reason: format!(
                            "Phantom tx not supported (hash: {:?})",
                            hash
                        )
                    });
                }
                Some((
                    tx,
                    TransactionInfo {
                        tx_index,
                        maybe_executed_extra_info:
                            Some(MaybeExecutedTxExtraInfo { receipt, .. }),
                    },
                )) => {
                    assert_eq!(tx.hash(), hash); // sanity check
                    (tx, tx_index, receipt)
                }
            };

        let block_hash = tx_index.block_hash;
        let block = self.ledger.block(block_hash)?;
        let tx_index_in_block = tx_index.real_index;
        let num_txs_in_block = block.transactions.len();

        // proof of inclusion of the tx in its block
        let tx_proof =
            compute_transaction_proof(&block.transactions, tx_index_in_block);

        let epoch = match self.consensus.get_block_epoch_number(&block_hash) {
            Some(epoch) => epoch,
            None => {
                bail!(Error::UnableToProduceTxInfo {
                    reason: format!(
                        "Unable to get epoch number for block {:?}",
                        block_hash
                    )
                });
            }
        };

        let epoch_hashes = match self.ledger.block_hashes_in(epoch) {
            Ok(hs) => hs,
            Err(e) => {
                bail!(Error::UnableToProduceTxInfo {
                    reason: format!(
                        "Unable to find epoch hashes for {}: {}",
                        epoch, e
                    )
                });
            }
        };

        let num_blocks_in_epoch = epoch_hashes.len();

        // position of the tx's block within its epoch
        let block_index_in_epoch =
            match epoch_hashes.iter().position(|h| *h == block_hash) {
                Some(id) => id,
                None => {
                    bail!(Error::UnableToProduceTxInfo {
                        reason: format!(
                            "Unable to find {:?} in epoch {}",
                            block_hash, epoch
                        )
                    });
                }
            };

        let epoch_receipts = self
            .ledger
            .receipts_of(epoch)?
            .iter()
            .cloned()
            .map(Arc::new)
            .collect::<Vec<_>>();

        // proof of inclusion of the receipt in its epoch
        let epoch_receipt_proof = compute_epoch_receipt_proof(
            &epoch_receipts,
            block_index_in_epoch,
            tx_index_in_block,
        );

        // the previous receipt in the same block is also served (with
        // proof) so clients can work with prior gas usage; the first
        // tx in a block has no predecessor
        let (maybe_prev_receipt, maybe_prev_receipt_proof) =
            match tx_index_in_block {
                0 => (None, None),
                _ => {
                    let receipt = epoch_receipts[block_index_in_epoch].receipts
                        [tx_index_in_block - 1]
                        .clone();

                    let proof = compute_epoch_receipt_proof(
                        &epoch_receipts,
                        block_index_in_epoch,
                        tx_index_in_block - 1,
                    );

                    (Some(receipt), Some(proof.block_receipt_proof))
                }
            };

        Ok(TxInfo {
            epoch,

            // tx-related fields
            tx,
            tx_index_in_block,
            num_txs_in_block,
            tx_proof,

            // receipt-related fields
            receipt,
            block_index_in_epoch,
            num_blocks_in_epoch,
            block_index_proof: epoch_receipt_proof.block_index_proof,
            receipt_proof: epoch_receipt_proof.block_receipt_proof,

            // prior_gas_used-related fields
            maybe_prev_receipt,
            maybe_prev_receipt_proof,
        })
    }
377
378    fn send_status(
379        &self, io: &dyn NetworkContext, peer: &NodeId,
380    ) -> Result<()> {
381        let best_info = self.consensus.best_info();
382        let genesis_hash = self.graph.data_man.true_genesis.hash();
383
384        let terminals = best_info.bounded_terminal_block_hashes.clone();
385
386        let msg: Box<dyn Message>;
387        if self.peer_version(peer)? == LIGHT_PROTO_V1 {
388            msg = Box::new(StatusPongDeprecatedV1 {
389                protocol_version: self.protocol_version.0,
390                best_epoch: best_info.best_epoch_number,
391                genesis_hash,
392                node_type: self.node_type,
393                terminals,
394            });
395        } else {
396            msg = Box::new(StatusPongV2 {
397                chain_id: ChainIdParamsDeprecated {
398                    chain_id: self.consensus.best_chain_id().in_native_space(),
399                },
400                best_epoch: best_info.best_epoch_number,
401                genesis_hash,
402                node_type: self.node_type,
403                terminals,
404            });
405        }
406
407        msg.send(io, peer)?;
408        Ok(())
409    }
410
411    #[inline]
412    fn validate_peer_type(&self, node_type: NodeType) -> Result<()> {
413        match node_type {
414            NodeType::Light => Ok(()),
415            _ => bail!(Error::UnexpectedPeerType { node_type }),
416        }
417    }
418
419    #[inline]
420    fn validate_genesis_hash(&self, genesis: H256) -> Result<()> {
421        let ours = self.graph.data_man.true_genesis.hash();
422        let theirs = genesis;
423
424        if ours != theirs {
425            bail!(Error::GenesisMismatch { ours, theirs });
426        }
427
428        Ok(())
429    }
430
    /// Handle a v2 status ping: validate the peer's node type, genesis
    /// hash and chain id, reply with a status pong, and mark the
    /// handshake as completed.
    fn on_status_v2(
        &self, io: &dyn NetworkContext, peer: &NodeId, status: StatusPingV2,
    ) -> Result<()> {
        debug!("on_status (v2) peer={:?} status={:?}", peer, status);
        self.throttle(peer, &status)?;

        self.validate_peer_type(status.node_type)?;
        self.validate_genesis_hash(status.genesis_hash)?;
        // NOTE(review): peer_height is passed as 0 here — presumably
        // height-dependent checks are skipped for light peers; confirm
        // against `validate_chain_id`
        validate_chain_id(
            &self
                .consensus
                .config()
                .chain_id
                .read()
                .to_native_space_params(),
            status.chain_id.into(),
            /* peer_height = */ 0,
        )?;

        self.send_status(io, peer)
            .map_err(|e| Error::SendStatusFailed {
                peer: *peer,
                source: Some(Box::new(e)),
            })?;

        // from this point on the peer may send regular requests
        let state = self.get_existing_peer_state(peer)?;
        let mut state = state.write();
        state.handshake_completed = true;
        state.last_heartbeat = Instant::now();
        Ok(())
    }
462
463    fn on_status_deprecated(
464        &self, io: &dyn NetworkContext, peer: &NodeId,
465        status: StatusPingDeprecatedV1,
466    ) -> Result<()> {
467        debug!("on_status (v1) peer={:?} status={:?}", peer, status);
468
469        self.on_status_v2(
470            io,
471            peer,
472            StatusPingV2 {
473                genesis_hash: status.genesis_hash,
474                node_type: status.node_type,
475                chain_id: ChainIdParamsDeprecated {
476                    chain_id: self.consensus.best_chain_id().in_native_space(),
477                },
478            },
479        )
480    }
481
482    fn on_get_state_roots(
483        &self, io: &dyn NetworkContext, peer: &NodeId, req: GetStateRoots,
484    ) -> Result<()> {
485        debug!("on_get_state_roots req={:?}", req);
486        self.throttle(peer, &req)?;
487        let request_id = req.request_id;
488
489        let it = req
490            .epochs
491            .into_iter()
492            .take(MAX_ITEMS_TO_SEND)
493            .map::<Result<_>, _>(|epoch| {
494                let state_root = self.ledger.state_root_of(epoch)?.state_root;
495                Ok(StateRootWithEpoch { epoch, state_root })
496            });
497
498        let (state_roots, errors) = partition_results(it);
499
500        if !errors.is_empty() {
501            debug!("Errors while serving GetStateRoots request: {:?}", errors);
502        }
503
504        let msg: Box<dyn Message> = Box::new(GetStateRootsResponse {
505            request_id,
506            state_roots,
507        });
508
509        msg.send(io, peer)?;
510        Ok(())
511    }
512
513    fn state_entry(&self, key: StateKey) -> Result<StateEntryWithKey> {
514        let snapshot_epoch_count = self.ledger.snapshot_epoch_count() as u64;
515
516        // state root in current snapshot period
517        let state_root = self.ledger.state_root_of(key.epoch)?.state_root;
518
519        // state root in previous snapshot period
520        let prev_snapshot_state_root = match key.epoch {
521            e if e <= snapshot_epoch_count => None,
522            _ => Some(
523                self.ledger
524                    .state_root_of(key.epoch - snapshot_epoch_count)?
525                    .state_root,
526            ),
527        };
528
529        // state entry and state proof
530        let (entry, state_proof) =
531            self.ledger.state_entry_at(key.epoch, &key.key)?;
532
533        let proof = StateEntryProof {
534            state_root,
535            prev_snapshot_state_root,
536            state_proof,
537        };
538
539        Ok(StateEntryWithKey { key, entry, proof })
540    }
541
542    fn on_get_state_entries(
543        &self, io: &dyn NetworkContext, peer: &NodeId, req: GetStateEntries,
544    ) -> Result<()> {
545        debug!("on_get_state_entries req={:?}", req);
546        self.throttle(peer, &req)?;
547        let request_id = req.request_id;
548
549        let it = req
550            .keys
551            .into_iter()
552            .take(MAX_ITEMS_TO_SEND)
553            .map(|key| self.state_entry(key));
554
555        let (entries, errors) = partition_results(it);
556
557        if !errors.is_empty() {
558            debug!(
559                "Errors while serving GetStateEntries request: {:?}",
560                errors
561            );
562        }
563
564        let msg: Box<dyn Message> = Box::new(GetStateEntriesResponse {
565            request_id,
566            entries,
567        });
568
569        msg.send(io, peer)?;
570        Ok(())
571    }
572
573    fn on_get_block_hashes_by_epoch(
574        &self, io: &dyn NetworkContext, peer: &NodeId,
575        req: GetBlockHashesByEpoch,
576    ) -> Result<()> {
577        debug!("on_get_block_hashes_by_epoch req={:?}", req);
578        self.throttle(peer, &req)?;
579        let request_id = req.request_id;
580
581        let it = req
582            .epochs
583            .iter()
584            .take(MAX_EPOCHS_TO_SEND)
585            .map(|&e| self.graph.get_all_block_hashes_by_epoch(e));
586
587        let (hashes, errors) = partition_results(it);
588
589        if !errors.is_empty() {
590            debug!(
591                "Errors while serving GetBlockHashesByEpoch request: {:?}",
592                errors
593            );
594        }
595
596        let msg: Box<dyn Message> = Box::new(GetBlockHashesResponse {
597            request_id,
598            hashes: hashes.into_iter().flatten().collect(),
599        });
600
601        msg.send(io, peer)?;
602        Ok(())
603    }
604
605    fn on_get_block_headers(
606        &self, io: &dyn NetworkContext, peer: &NodeId, req: GetBlockHeaders,
607    ) -> Result<()> {
608        debug!("on_get_block_headers req={:?}", req);
609        self.throttle(peer, &req)?;
610        let request_id = req.request_id;
611
612        let it = req
613            .hashes
614            .iter()
615            .take(MAX_HEADERS_TO_SEND)
616            .map::<Result<_>, _>(|h| {
617                self.graph
618                    .data_man
619                    .block_header_by_hash(&h)
620                    .map(|header_arc| header_arc.as_ref().clone())
621                    .ok_or_else(|| {
622                        Error::Msg(format!("Block {:?} not found", h)).into()
623                    })
624            });
625
626        let (headers, errors) = partition_results(it);
627
628        if !errors.is_empty() {
629            debug!(
630                "Errors while serving GetBlockHeaders request: {:?}",
631                errors
632            );
633        }
634
635        let msg: Box<dyn Message> = Box::new(GetBlockHeadersResponse {
636            request_id,
637            headers,
638        });
639
640        msg.send(io, peer)?;
641        Ok(())
642    }
643
644    fn on_send_raw_tx(
645        &self, _io: &dyn NetworkContext, peer: &NodeId, req: SendRawTx,
646    ) -> Result<()> {
647        debug!("on_send_raw_tx req={:?}", req);
648        self.throttle(peer, &req)?;
649        let tx: TransactionWithSignature = rlp::decode(&req.raw)?;
650
651        let (passed, failed) = self.tx_pool.insert_new_transactions(vec![tx]);
652
653        match (passed.len(), failed.len()) {
654            (0, 0) => {
655                debug!("Tx already inserted, ignoring");
656                Ok(())
657            }
658            (0, 1) => {
659                let err = failed.values().next().expect("Not empty");
660                warn!("Failed to insert tx: {}", err);
661                Ok(())
662            }
663            (1, 0) => {
664                debug!("Tx inserted successfully");
665                // TODO(thegaram): consider relaying to peers
666                Ok(())
667            }
668            _ => {
669                // NOTE: this should not happen
670                bail!(Error::InternalError(format!(
671                    "insert_new_transactions failed: {:?}, {:?}",
672                    passed, failed
673                )))
674            }
675        }
676    }
677
678    fn on_get_receipts(
679        &self, io: &dyn NetworkContext, peer: &NodeId, req: GetReceipts,
680    ) -> Result<()> {
681        debug!("on_get_receipts req={:?}", req);
682        self.throttle(peer, &req)?;
683        let request_id = req.request_id;
684
685        let it = req.epochs.into_iter().take(MAX_ITEMS_TO_SEND).map(|epoch| {
686            self.ledger.receipts_of(epoch).map(|epoch_receipts| {
687                ReceiptsWithEpoch {
688                    epoch,
689                    epoch_receipts,
690                }
691            })
692        });
693
694        let (receipts, errors) = partition_results(it);
695
696        if !errors.is_empty() {
697            debug!("Errors while serving GetReceipts request: {:?}", errors);
698        }
699
700        let msg: Box<dyn Message> = Box::new(GetReceiptsResponse {
701            request_id,
702            receipts,
703        });
704
705        msg.send(io, peer)?;
706        Ok(())
707    }
708
709    fn on_get_txs(
710        &self, io: &dyn NetworkContext, peer: &NodeId, req: GetTxs,
711    ) -> Result<()> {
712        debug!("on_get_txs req={:?}", req);
713        self.throttle(peer, &req)?;
714        let request_id = req.request_id;
715
716        let it = req
717            .hashes
718            .into_iter()
719            .take(MAX_TXS_TO_SEND)
720            .map::<Result<_>, _>(|h| {
721                self.tx_by_hash(h).ok_or_else(|| {
722                    Error::Msg(format!("Tx {:?} not found", h)).into()
723                })
724            });
725
726        let (txs, errors) = partition_results(it);
727
728        if !errors.is_empty() {
729            debug!("Errors while serving GetTxs request: {:?}", errors);
730        }
731
732        let msg: Box<dyn Message> =
733            Box::new(GetTxsResponse { request_id, txs });
734
735        msg.send(io, peer)?;
736        Ok(())
737    }
738
739    fn on_get_witness_info(
740        &self, io: &dyn NetworkContext, peer: &NodeId, req: GetWitnessInfo,
741    ) -> Result<()> {
742        debug!("on_get_witness_info req={:?}", req);
743        self.throttle(peer, &req)?;
744        let request_id = req.request_id;
745
746        let it = req
747            .witnesses
748            .into_iter()
749            .take(MAX_WITNESSES_TO_SEND)
750            .map(|w| self.ledger.witness_info(w));
751
752        let (infos, errors) = partition_results(it);
753
754        if !errors.is_empty() {
755            debug!("Errors while serving GetWitnessInfo request: {:?}", errors);
756        }
757
758        let msg: Box<dyn Message> =
759            Box::new(GetWitnessInfoResponse { request_id, infos });
760
761        msg.send(io, peer)?;
762        Ok(())
763    }
764
765    fn on_get_blooms(
766        &self, io: &dyn NetworkContext, peer: &NodeId, req: GetBlooms,
767    ) -> Result<()> {
768        debug!("on_get_blooms req={:?}", req);
769        self.throttle(peer, &req)?;
770        let request_id = req.request_id;
771
772        let it = req.epochs.into_iter().take(MAX_ITEMS_TO_SEND).map(|epoch| {
773            self.ledger
774                .bloom_of(epoch)
775                .map(|bloom| BloomWithEpoch { epoch, bloom })
776        });
777
778        let (blooms, errors) = partition_results(it);
779
780        if !errors.is_empty() {
781            debug!("Errors while serving GetBlooms request: {:?}", errors);
782        }
783
784        let msg: Box<dyn Message> =
785            Box::new(GetBloomsResponse { request_id, blooms });
786
787        msg.send(io, peer)?;
788        Ok(())
789    }
790
791    fn on_get_block_txs(
792        &self, io: &dyn NetworkContext, peer: &NodeId, req: GetBlockTxs,
793    ) -> Result<()> {
794        debug!("on_get_block_txs req={:?}", req);
795        self.throttle(peer, &req)?;
796        let request_id = req.request_id;
797
798        let it = req
799            .hashes
800            .into_iter()
801            .take(MAX_ITEMS_TO_SEND)
802            .map::<Result<_>, _>(|h| {
803                let block = self.ledger.block(h)?;
804
805                let block_txs = block
806                    .transactions
807                    .clone()
808                    .into_iter()
809                    .map(|arc_tx| (*arc_tx).clone())
810                    .collect();
811
812                Ok(BlockTxsWithHash {
813                    hash: block.hash(),
814                    block_txs,
815                })
816            });
817
818        let (block_txs, errors) = partition_results(it);
819
820        if !errors.is_empty() {
821            debug!("Errors while serving GetBlockTxs request: {:?}", errors);
822        }
823
824        let msg: Box<dyn Message> = Box::new(GetBlockTxsResponse {
825            request_id,
826            block_txs,
827        });
828
829        msg.send(io, peer)?;
830        Ok(())
831    }
832
833    fn on_get_tx_infos(
834        &self, io: &dyn NetworkContext, peer: &NodeId, req: GetTxInfos,
835    ) -> Result<()> {
836        debug!("on_get_tx_infos req={:?}", req);
837        self.throttle(peer, &req)?;
838        let request_id = req.request_id;
839
840        let it = req
841            .hashes
842            .into_iter()
843            .take(MAX_ITEMS_TO_SEND)
844            .map(|h| self.tx_info_by_hash(h));
845
846        let (infos, errors) = partition_results(it);
847
848        if !errors.is_empty() {
849            debug!("Errors while serving GetTxInfos request: {:?}", errors);
850        }
851
852        let msg: Box<dyn Message> =
853            Box::new(GetTxInfosResponse { request_id, infos });
854
855        msg.send(io, peer)?;
856        Ok(())
857    }
858
859    fn storage_root(&self, key: StorageRootKey) -> Result<StorageRootWithKey> {
860        let snapshot_epoch_count = self.ledger.snapshot_epoch_count() as u64;
861
862        // state root in current snapshot period
863        let state_root = self.ledger.state_root_of(key.epoch)?.state_root;
864
865        // state root in previous snapshot period
866        let prev_snapshot_state_root = match key.epoch {
867            e if e <= snapshot_epoch_count => None,
868            _ => Some(
869                self.ledger
870                    .state_root_of(key.epoch - snapshot_epoch_count)?
871                    .state_root,
872            ),
873        };
874
875        // storage root and merkle proof
876        let (root, merkle_proof) =
877            self.ledger.storage_root_of(key.epoch, &key.address)?;
878
879        let proof = StorageRootProof {
880            state_root,
881            prev_snapshot_state_root,
882            merkle_proof,
883        };
884
885        Ok(StorageRootWithKey { key, root, proof })
886    }
887
888    fn on_get_storage_roots(
889        &self, io: &dyn NetworkContext, peer: &NodeId, req: GetStorageRoots,
890    ) -> Result<()> {
891        debug!("on_get_storage_roots req={:?}", req);
892        self.throttle(peer, &req)?;
893        let request_id = req.request_id;
894
895        let it = req
896            .keys
897            .into_iter()
898            .take(MAX_ITEMS_TO_SEND)
899            .map(|key| self.storage_root(key));
900
901        let (roots, errors) = partition_results(it);
902
903        if !errors.is_empty() {
904            debug!(
905                "Errors while serving GetStorageRoots request: {:?}",
906                errors
907            );
908        }
909
910        let msg: Box<dyn Message> =
911            Box::new(GetStorageRootsResponse { request_id, roots });
912
913        msg.send(io, peer)?;
914        Ok(())
915    }
916
917    fn broadcast(
918        &self, io: &dyn NetworkContext, mut peers: Vec<NodeId>,
919        msg: &dyn Message,
920    ) -> Result<()> {
921        debug!("broadcast peers={:?}", peers);
922
923        let throttle_ratio = THROTTLING_SERVICE.read().get_throttling_ratio();
924        let total = peers.len();
925        let allowed = (total as f64 * throttle_ratio) as usize;
926
927        if total > allowed {
928            debug!(
929                "Apply throttling for broadcast, total: {}, allowed: {}",
930                total, allowed
931            );
932            peers.shuffle(&mut rand::rng());
933            peers.truncate(allowed);
934        }
935
936        for id in peers {
937            msg.send(io, &id)?;
938        }
939
940        Ok(())
941    }
942
943    pub fn relay_hashes(self: &Arc<Self>, hashes: Vec<H256>) -> Result<()> {
944        debug!("relay_hashes hashes={:?}", hashes);
945
946        if hashes.is_empty() {
947            return Ok(());
948        }
949
950        // check network availability
951        let network = match self.network.upgrade() {
952            Some(network) => network,
953            None => {
954                bail!(Error::InternalError(
955                    "Network unavailable, not relaying hashes".to_owned()
956                ));
957            }
958        };
959
960        // broadcast message
961        let res = network.with_context(self.clone(), LIGHT_PROTOCOL_ID, |io| {
962            let msg: Box<dyn Message> = Box::new(NewBlockHashes { hashes });
963            self.broadcast(io, self.all_light_peers(), msg.as_ref())
964        });
965
966        if let Err(e) = res {
967            warn!("Error broadcasting blocks: {:?}", e);
968        };
969
970        Ok(())
971    }
972
973    fn throttle<T: Message>(&self, peer: &NodeId, msg: &T) -> Result<()> {
974        let peer = self.get_existing_peer_state(peer)?;
975
976        let bucket_name = msg.msg_name().to_string();
977        let bucket = match peer.read().throttling.get(&bucket_name) {
978            Some(bucket) => bucket,
979            None => return Ok(()),
980        };
981
982        let result = bucket.lock().throttle_default();
983
984        match result {
985            ThrottleResult::Success => Ok(()),
986            ThrottleResult::Throttled(wait_time) => {
987                let throttled = Throttled {
988                    msg_id: msg.msg_id(),
989                    wait_time_nanos: wait_time.as_nanos() as u64,
990                    request_id: msg.get_request_id(),
991                };
992
993                bail!(Error::Throttled(msg.msg_name(), throttled))
994            }
995            ThrottleResult::AlreadyThrottled => {
996                bail!(Error::AlreadyThrottled(msg.msg_name()))
997            }
998        }
999    }
1000
1001    fn check_timeout(&self, io: &dyn NetworkContext, timeout: Duration) {
1002        for peer in self.peers.all_peers_satisfying(|p| {
1003            p.handshake_completed && p.last_heartbeat.elapsed() >= timeout
1004        }) {
1005            io.disconnect_peer(
1006                &peer,
1007                Some(UpdateNodeOperation::Failure),
1008                "light node sync heartbeat timeout", /* reason */
1009            );
1010        }
1011    }
1012}
1013
1014impl NetworkProtocolHandler for Provider {
1015    fn minimum_supported_version(&self) -> ProtocolVersion {
1016        let my_version = self.protocol_version.0;
1017        if my_version > LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT {
1018            ProtocolVersion(my_version - LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT)
1019        } else {
1020            LIGHT_PROTO_V1
1021        }
1022    }
1023
1024    fn initialize(&self, io: &dyn NetworkContext) {
1025        io.register_timer(CHECK_PEER_HEARTBEAT_TIMER, Duration::from_secs(60))
1026            .expect("Error registering CHECK_PEER_HEARTBEAT_TIMER");
1027    }
1028
1029    fn on_message(&self, io: &dyn NetworkContext, peer: &NodeId, raw: &[u8]) {
1030        trace!("on_message: peer={:?}, raw={:?}", peer, raw);
1031
1032        let (msg_id, rlp) = match decode_msg(raw) {
1033            Some(msg) => msg,
1034            None => {
1035                return handle_error(
1036                    io,
1037                    peer,
1038                    msgid::INVALID,
1039                    &Error::InvalidMessageFormat.into(),
1040                )
1041            }
1042        };
1043
1044        debug!("on_message: peer={:?}, msgid={:?}", peer, msg_id);
1045
1046        if let Err(e) = self.dispatch_message(io, peer, msg_id.into(), rlp) {
1047            handle_error(io, peer, msg_id.into(), &e);
1048        }
1049    }
1050
1051    fn on_peer_connected(
1052        &self, _io: &dyn NetworkContext, node_id: &NodeId,
1053        peer_protocol_version: ProtocolVersion,
1054        _pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
1055    ) {
1056        debug!(
1057            "on_peer_connected: peer={:?} version={}",
1058            node_id, peer_protocol_version
1059        );
1060
1061        // insert handshaking peer, wait for StatusPing
1062        self.peers.insert_with(*node_id, |peer| {
1063            if let Some(ref file) = self.throttling_config_file {
1064                peer.throttling =
1065                    TokenBucketManager::load(file, Some("light_protocol"))
1066                        .expect("invalid throttling configuration file");
1067            }
1068            peer.protocol_version = peer_protocol_version;
1069            peer.last_heartbeat = Instant::now();
1070        });
1071    }
1072
1073    fn on_peer_disconnected(&self, _io: &dyn NetworkContext, peer: &NodeId) {
1074        debug!("on_peer_disconnected: peer={}", peer);
1075        self.peers.remove(peer);
1076    }
1077
1078    fn on_timeout(&self, io: &dyn NetworkContext, timer: TimerToken) {
1079        match timer {
1080            CHECK_PEER_HEARTBEAT_TIMER => {
1081                // TODO: config for light clients.
1082                self.check_timeout(io, Duration::from_secs(180))
1083            }
1084            _ => warn!("Unknown timer {} triggered.", timer),
1085        }
1086    }
1087
1088    fn send_local_message(&self, _io: &dyn NetworkContext, _message: Vec<u8>) {
1089        unreachable!("Light node provider does not have send_local_message.")
1090    }
1091
1092    fn on_work_dispatch(&self, _io: &dyn NetworkContext, _work_type: u8) {
1093        unreachable!("Light node provider does not have on_work_dispatch.")
1094    }
1095}