1use crate::{
6 consensus::{
7 MaybeExecutedTxExtraInfo, SharedConsensusGraph, TransactionInfo,
8 },
9 light_protocol::{
10 common::{
11 partition_results, validate_chain_id, LedgerInfo, LightPeerState,
12 Peers,
13 },
14 error::*,
15 handle_error,
16 message::{
17 msgid, BlockHashes as GetBlockHashesResponse,
18 BlockHeaders as GetBlockHeadersResponse,
19 BlockTxs as GetBlockTxsResponse, BlockTxsWithHash, BloomWithEpoch,
20 Blooms as GetBloomsResponse, GetBlockHashesByEpoch,
21 GetBlockHeaders, GetBlockTxs, GetBlooms, GetReceipts,
22 GetStateEntries, GetStateRoots, GetStorageRoots, GetTxInfos,
23 GetTxs, GetWitnessInfo, NewBlockHashes, NodeType,
24 Receipts as GetReceiptsResponse, ReceiptsWithEpoch, SendRawTx,
25 StateEntries as GetStateEntriesResponse, StateEntryProof,
26 StateEntryWithKey, StateKey, StateRootWithEpoch,
27 StateRoots as GetStateRootsResponse, StatusPingDeprecatedV1,
28 StatusPingV2, StatusPongDeprecatedV1, StatusPongV2, StorageRootKey,
29 StorageRootProof, StorageRootWithKey,
30 StorageRoots as GetStorageRootsResponse, TxInfo,
31 TxInfos as GetTxInfosResponse, Txs as GetTxsResponse,
32 WitnessInfo as GetWitnessInfoResponse,
33 },
34 LIGHT_PROTOCOL_ID, LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT,
35 LIGHT_PROTOCOL_VERSION, LIGHT_PROTO_V1,
36 },
37 message::{decode_msg, decode_rlp_and_check_deprecation, Message, MsgId},
38 sync::{message::Throttled, SynchronizationGraph},
39 verification::{compute_epoch_receipt_proof, compute_transaction_proof},
40 TransactionPool,
41};
42use cfx_internal_common::ChainIdParamsDeprecated;
43use cfx_parameters::light::{
44 MAX_EPOCHS_TO_SEND, MAX_HEADERS_TO_SEND, MAX_ITEMS_TO_SEND,
45 MAX_TXS_TO_SEND, MAX_WITNESSES_TO_SEND,
46};
47use cfx_types::H256;
48use diem_types::validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey};
49use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
50use network::{
51 node_table::NodeId, service::ProtocolVersion,
52 throttling::THROTTLING_SERVICE, NetworkContext, NetworkProtocolHandler,
53 NetworkService, UpdateNodeOperation,
54};
55use parking_lot::RwLock;
56use primitives::{
57 SignedTransaction, TransactionIndex, TransactionWithSignature,
58};
59use rand::prelude::SliceRandom;
60use rlp::Rlp;
61use std::{
62 sync::{Arc, Weak},
63 time::{Duration, Instant},
64};
65use throttling::token_bucket::{ThrottleResult, TokenBucketManager};
66
// Identifier used when registering a timer with the network layer; the same
// value is handed back to `on_timeout` when the timer fires.
type TimerToken = usize;

// Timer token for the periodic peer heartbeat check (registered in
// `initialize`, handled in `on_timeout`).
const CHECK_PEER_HEARTBEAT_TIMER: TimerToken = 0;
70
// Serving side of the light protocol: answers requests from connected light
// peers and relays new block hashes to them.
#[derive(DeriveMallocSizeOf)]
pub struct Provider {
    // protocol version advertised when registering with the network service
    pub protocol_version: ProtocolVersion,

    // node type reported to peers in status replies
    node_type: NodeType,

    // shared consensus graph (tx lookup, best info, chain id)
    #[ignore_malloc_size_of = "arc already counted"]
    consensus: SharedConsensusGraph,

    // shared synchronization graph (genesis hash, headers, epoch block hashes)
    graph: Arc<SynchronizationGraph>,

    // helper built from `consensus` in `new`; used to retrieve ledger data
    // such as state roots, receipts, blooms and blocks
    #[ignore_malloc_size_of = "arc already counted"]
    ledger: LedgerInfo,

    // weak handle to the network service; upgraded in `relay_hashes`
    #[ignore_malloc_size_of = "channels are not handled in MallocSizeOf"]
    network: Weak<NetworkService>,

    // per-peer state of the currently connected peers
    peers: Peers<LightPeerState>,

    // transaction pool (tx lookup and insertion of raw txs sent by peers)
    tx_pool: Arc<TransactionPool>,

    // optional path of the throttling configuration loaded for each new peer
    throttling_config_file: Option<String>,
}
100
101impl Provider {
102 pub fn new(
103 consensus: SharedConsensusGraph, graph: Arc<SynchronizationGraph>,
104 network: Weak<NetworkService>, tx_pool: Arc<TransactionPool>,
105 throttling_config_file: Option<String>, node_type: NodeType,
106 ) -> Self {
107 let ledger = LedgerInfo::new(consensus.clone());
108 let peers = Peers::new();
109
110 Provider {
111 protocol_version: LIGHT_PROTOCOL_VERSION,
112 node_type,
113 consensus,
114 graph,
115 ledger,
116 network,
117 peers,
118 tx_pool,
119 throttling_config_file,
120 }
121 }
122
123 pub fn register(
124 self: &Arc<Self>, network: Arc<NetworkService>,
125 ) -> std::result::Result<(), String> {
126 network
127 .register_protocol(
128 self.clone(),
129 LIGHT_PROTOCOL_ID,
130 self.protocol_version,
131 )
132 .map_err(|e| {
133 format!("failed to register protocol Provider: {:?}", e)
134 })
135 }
136
137 #[inline]
138 fn get_existing_peer_state(
139 &self, peer: &NodeId,
140 ) -> Result<Arc<RwLock<LightPeerState>>> {
141 match self.peers.get(peer) {
142 Some(state) => Ok(state),
143 None => {
144 bail!(Error::InternalError(format!(
147 "Received message from unknown peer={:?}",
148 peer
149 )))
150 }
151 }
152 }
153
154 #[inline]
155 fn peer_version(&self, peer: &NodeId) -> Result<ProtocolVersion> {
156 Ok(self.get_existing_peer_state(peer)?.read().protocol_version)
157 }
158
159 #[inline]
160 fn validate_peer_state(&self, peer: &NodeId, msg_id: MsgId) -> Result<()> {
161 let state = self.get_existing_peer_state(&peer)?;
162
163 if msg_id != msgid::STATUS_PING_DEPRECATED
164 && msg_id != msgid::STATUS_PING_V2
165 && !state.read().handshake_completed
166 {
167 warn!("Received msg={:?} from handshaking peer={:?}", msg_id, peer);
168 bail!(Error::UnexpectedMessage {
169 expected: vec![
170 msgid::STATUS_PING_DEPRECATED,
171 msgid::STATUS_PING_V2
172 ],
173 received: msg_id,
174 });
175 }
176
177 Ok(())
178 }
179
180 #[rustfmt::skip]
181 fn dispatch_message(
182 &self, io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, rlp: Rlp,
183 ) -> Result<()> {
184 trace!("Dispatching message: peer={:?}, msg_id={:?}", peer, msg_id);
185 self.validate_peer_state(peer, msg_id)?;
186 let min_supported_ver = self.minimum_supported_version();
187 let protocol = io.get_protocol();
188
189 match msg_id {
190 msgid::STATUS_PING_DEPRECATED => self.on_status_deprecated(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
191 msgid::STATUS_PING_V2 => self.on_status_v2(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
192 msgid::GET_STATE_ENTRIES => self.on_get_state_entries(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
193 msgid::GET_STATE_ROOTS => self.on_get_state_roots(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
194 msgid::GET_BLOCK_HASHES_BY_EPOCH => self.on_get_block_hashes_by_epoch(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
195 msgid::GET_BLOCK_HEADERS => self.on_get_block_headers(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
196 msgid::SEND_RAW_TX => self.on_send_raw_tx(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
197 msgid::GET_RECEIPTS => self.on_get_receipts(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
198 msgid::GET_TXS => self.on_get_txs(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
199 msgid::GET_WITNESS_INFO => self.on_get_witness_info(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
200 msgid::GET_BLOOMS => self.on_get_blooms(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
201 msgid::GET_BLOCK_TXS => self.on_get_block_txs(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
202 msgid::GET_TX_INFOS => self.on_get_tx_infos(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
203 msgid::GET_STORAGE_ROOTS => self.on_get_storage_roots(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
204 _ => bail!(Error::UnknownMessage{id: msg_id}),
205 }
206 }
207
208 #[inline]
209 fn all_light_peers(&self) -> Vec<NodeId> {
210 self.peers.all_peers_satisfying(|s| s.handshake_completed)
212 }
213
214 #[inline]
215 fn tx_by_hash(&self, hash: H256) -> Option<SignedTransaction> {
216 if let Some(info) = self.consensus.get_signed_tx_and_tx_info(&hash) {
217 return Some(info.0);
218 };
219
220 if let Some(tx) = self.tx_pool.get_transaction(&hash) {
221 return Some((*tx).clone());
222 };
223
224 None
225 }
226
227 #[inline]
228 fn tx_info_by_hash(&self, hash: H256) -> Result<TxInfo> {
229 let (tx, tx_index, receipt) =
230 match self.consensus.get_signed_tx_and_tx_info(&hash) {
231 None => {
232 bail!(Error::UnableToProduceTxInfo {
233 reason: format!("Unable to get tx info for {:?}", hash)
234 });
235 }
236 Some((
237 _,
238 TransactionInfo {
239 maybe_executed_extra_info: None,
240 ..
241 },
242 )) => {
243 bail!(Error::UnableToProduceTxInfo {
244 reason: format!("Unable to get receipt for {:?}", hash)
245 });
246 }
247 Some((
248 _,
249 TransactionInfo {
250 tx_index:
251 TransactionIndex {
252 is_phantom: true, ..
253 },
254 ..
255 },
256 )) => {
257 bail!(Error::UnableToProduceTxInfo {
258 reason: format!(
259 "Phantom tx not supported (hash: {:?})",
260 hash
261 )
262 });
263 }
264 Some((
265 tx,
266 TransactionInfo {
267 tx_index,
268 maybe_executed_extra_info:
269 Some(MaybeExecutedTxExtraInfo { receipt, .. }),
270 },
271 )) => {
272 assert_eq!(tx.hash(), hash); (tx, tx_index, receipt)
274 }
275 };
276
277 let block_hash = tx_index.block_hash;
278 let block = self.ledger.block(block_hash)?;
279 let tx_index_in_block = tx_index.real_index;
280 let num_txs_in_block = block.transactions.len();
281
282 let tx_proof =
283 compute_transaction_proof(&block.transactions, tx_index_in_block);
284
285 let epoch = match self.consensus.get_block_epoch_number(&block_hash) {
286 Some(epoch) => epoch,
287 None => {
288 bail!(Error::UnableToProduceTxInfo {
289 reason: format!(
290 "Unable to get epoch number for block {:?}",
291 block_hash
292 )
293 });
294 }
295 };
296
297 let epoch_hashes = match self.ledger.block_hashes_in(epoch) {
298 Ok(hs) => hs,
299 Err(e) => {
300 bail!(Error::UnableToProduceTxInfo {
301 reason: format!(
302 "Unable to find epoch hashes for {}: {}",
303 epoch, e
304 )
305 });
306 }
307 };
308
309 let num_blocks_in_epoch = epoch_hashes.len();
310
311 let block_index_in_epoch =
312 match epoch_hashes.iter().position(|h| *h == block_hash) {
313 Some(id) => id,
314 None => {
315 bail!(Error::UnableToProduceTxInfo {
316 reason: format!(
317 "Unable to find {:?} in epoch {}",
318 block_hash, epoch
319 )
320 });
321 }
322 };
323
324 let epoch_receipts = self
325 .ledger
326 .receipts_of(epoch)?
327 .iter()
328 .cloned()
329 .map(Arc::new)
330 .collect::<Vec<_>>();
331
332 let epoch_receipt_proof = compute_epoch_receipt_proof(
333 &epoch_receipts,
334 block_index_in_epoch,
335 tx_index_in_block,
336 );
337
338 let (maybe_prev_receipt, maybe_prev_receipt_proof) =
339 match tx_index_in_block {
340 0 => (None, None),
341 _ => {
342 let receipt = epoch_receipts[block_index_in_epoch].receipts
343 [tx_index_in_block - 1]
344 .clone();
345
346 let proof = compute_epoch_receipt_proof(
347 &epoch_receipts,
348 block_index_in_epoch,
349 tx_index_in_block - 1,
350 );
351
352 (Some(receipt), Some(proof.block_receipt_proof))
353 }
354 };
355
356 Ok(TxInfo {
357 epoch,
358
359 tx,
361 tx_index_in_block,
362 num_txs_in_block,
363 tx_proof,
364
365 receipt,
367 block_index_in_epoch,
368 num_blocks_in_epoch,
369 block_index_proof: epoch_receipt_proof.block_index_proof,
370 receipt_proof: epoch_receipt_proof.block_receipt_proof,
371
372 maybe_prev_receipt,
374 maybe_prev_receipt_proof,
375 })
376 }
377
378 fn send_status(
379 &self, io: &dyn NetworkContext, peer: &NodeId,
380 ) -> Result<()> {
381 let best_info = self.consensus.best_info();
382 let genesis_hash = self.graph.data_man.true_genesis.hash();
383
384 let terminals = best_info.bounded_terminal_block_hashes.clone();
385
386 let msg: Box<dyn Message>;
387 if self.peer_version(peer)? == LIGHT_PROTO_V1 {
388 msg = Box::new(StatusPongDeprecatedV1 {
389 protocol_version: self.protocol_version.0,
390 best_epoch: best_info.best_epoch_number,
391 genesis_hash,
392 node_type: self.node_type,
393 terminals,
394 });
395 } else {
396 msg = Box::new(StatusPongV2 {
397 chain_id: ChainIdParamsDeprecated {
398 chain_id: self.consensus.best_chain_id().in_native_space(),
399 },
400 best_epoch: best_info.best_epoch_number,
401 genesis_hash,
402 node_type: self.node_type,
403 terminals,
404 });
405 }
406
407 msg.send(io, peer)?;
408 Ok(())
409 }
410
411 #[inline]
412 fn validate_peer_type(&self, node_type: NodeType) -> Result<()> {
413 match node_type {
414 NodeType::Light => Ok(()),
415 _ => bail!(Error::UnexpectedPeerType { node_type }),
416 }
417 }
418
419 #[inline]
420 fn validate_genesis_hash(&self, genesis: H256) -> Result<()> {
421 let ours = self.graph.data_man.true_genesis.hash();
422 let theirs = genesis;
423
424 if ours != theirs {
425 bail!(Error::GenesisMismatch { ours, theirs });
426 }
427
428 Ok(())
429 }
430
431 fn on_status_v2(
432 &self, io: &dyn NetworkContext, peer: &NodeId, status: StatusPingV2,
433 ) -> Result<()> {
434 debug!("on_status (v2) peer={:?} status={:?}", peer, status);
435 self.throttle(peer, &status)?;
436
437 self.validate_peer_type(status.node_type)?;
438 self.validate_genesis_hash(status.genesis_hash)?;
439 validate_chain_id(
440 &self
441 .consensus
442 .config()
443 .chain_id
444 .read()
445 .to_native_space_params(),
446 status.chain_id.into(),
447 0,
448 )?;
449
450 self.send_status(io, peer)
451 .map_err(|e| Error::SendStatusFailed {
452 peer: *peer,
453 source: Some(Box::new(e)),
454 })?;
455
456 let state = self.get_existing_peer_state(peer)?;
457 let mut state = state.write();
458 state.handshake_completed = true;
459 state.last_heartbeat = Instant::now();
460 Ok(())
461 }
462
463 fn on_status_deprecated(
464 &self, io: &dyn NetworkContext, peer: &NodeId,
465 status: StatusPingDeprecatedV1,
466 ) -> Result<()> {
467 debug!("on_status (v1) peer={:?} status={:?}", peer, status);
468
469 self.on_status_v2(
470 io,
471 peer,
472 StatusPingV2 {
473 genesis_hash: status.genesis_hash,
474 node_type: status.node_type,
475 chain_id: ChainIdParamsDeprecated {
476 chain_id: self.consensus.best_chain_id().in_native_space(),
477 },
478 },
479 )
480 }
481
482 fn on_get_state_roots(
483 &self, io: &dyn NetworkContext, peer: &NodeId, req: GetStateRoots,
484 ) -> Result<()> {
485 debug!("on_get_state_roots req={:?}", req);
486 self.throttle(peer, &req)?;
487 let request_id = req.request_id;
488
489 let it = req
490 .epochs
491 .into_iter()
492 .take(MAX_ITEMS_TO_SEND)
493 .map::<Result<_>, _>(|epoch| {
494 let state_root = self.ledger.state_root_of(epoch)?.state_root;
495 Ok(StateRootWithEpoch { epoch, state_root })
496 });
497
498 let (state_roots, errors) = partition_results(it);
499
500 if !errors.is_empty() {
501 debug!("Errors while serving GetStateRoots request: {:?}", errors);
502 }
503
504 let msg: Box<dyn Message> = Box::new(GetStateRootsResponse {
505 request_id,
506 state_roots,
507 });
508
509 msg.send(io, peer)?;
510 Ok(())
511 }
512
513 fn state_entry(&self, key: StateKey) -> Result<StateEntryWithKey> {
514 let snapshot_epoch_count = self.ledger.snapshot_epoch_count() as u64;
515
516 let state_root = self.ledger.state_root_of(key.epoch)?.state_root;
518
519 let prev_snapshot_state_root = match key.epoch {
521 e if e <= snapshot_epoch_count => None,
522 _ => Some(
523 self.ledger
524 .state_root_of(key.epoch - snapshot_epoch_count)?
525 .state_root,
526 ),
527 };
528
529 let (entry, state_proof) =
531 self.ledger.state_entry_at(key.epoch, &key.key)?;
532
533 let proof = StateEntryProof {
534 state_root,
535 prev_snapshot_state_root,
536 state_proof,
537 };
538
539 Ok(StateEntryWithKey { key, entry, proof })
540 }
541
542 fn on_get_state_entries(
543 &self, io: &dyn NetworkContext, peer: &NodeId, req: GetStateEntries,
544 ) -> Result<()> {
545 debug!("on_get_state_entries req={:?}", req);
546 self.throttle(peer, &req)?;
547 let request_id = req.request_id;
548
549 let it = req
550 .keys
551 .into_iter()
552 .take(MAX_ITEMS_TO_SEND)
553 .map(|key| self.state_entry(key));
554
555 let (entries, errors) = partition_results(it);
556
557 if !errors.is_empty() {
558 debug!(
559 "Errors while serving GetStateEntries request: {:?}",
560 errors
561 );
562 }
563
564 let msg: Box<dyn Message> = Box::new(GetStateEntriesResponse {
565 request_id,
566 entries,
567 });
568
569 msg.send(io, peer)?;
570 Ok(())
571 }
572
573 fn on_get_block_hashes_by_epoch(
574 &self, io: &dyn NetworkContext, peer: &NodeId,
575 req: GetBlockHashesByEpoch,
576 ) -> Result<()> {
577 debug!("on_get_block_hashes_by_epoch req={:?}", req);
578 self.throttle(peer, &req)?;
579 let request_id = req.request_id;
580
581 let it = req
582 .epochs
583 .iter()
584 .take(MAX_EPOCHS_TO_SEND)
585 .map(|&e| self.graph.get_all_block_hashes_by_epoch(e));
586
587 let (hashes, errors) = partition_results(it);
588
589 if !errors.is_empty() {
590 debug!(
591 "Errors while serving GetBlockHashesByEpoch request: {:?}",
592 errors
593 );
594 }
595
596 let msg: Box<dyn Message> = Box::new(GetBlockHashesResponse {
597 request_id,
598 hashes: hashes.into_iter().flatten().collect(),
599 });
600
601 msg.send(io, peer)?;
602 Ok(())
603 }
604
605 fn on_get_block_headers(
606 &self, io: &dyn NetworkContext, peer: &NodeId, req: GetBlockHeaders,
607 ) -> Result<()> {
608 debug!("on_get_block_headers req={:?}", req);
609 self.throttle(peer, &req)?;
610 let request_id = req.request_id;
611
612 let it = req
613 .hashes
614 .iter()
615 .take(MAX_HEADERS_TO_SEND)
616 .map::<Result<_>, _>(|h| {
617 self.graph
618 .data_man
619 .block_header_by_hash(&h)
620 .map(|header_arc| header_arc.as_ref().clone())
621 .ok_or_else(|| {
622 Error::Msg(format!("Block {:?} not found", h)).into()
623 })
624 });
625
626 let (headers, errors) = partition_results(it);
627
628 if !errors.is_empty() {
629 debug!(
630 "Errors while serving GetBlockHeaders request: {:?}",
631 errors
632 );
633 }
634
635 let msg: Box<dyn Message> = Box::new(GetBlockHeadersResponse {
636 request_id,
637 headers,
638 });
639
640 msg.send(io, peer)?;
641 Ok(())
642 }
643
644 fn on_send_raw_tx(
645 &self, _io: &dyn NetworkContext, peer: &NodeId, req: SendRawTx,
646 ) -> Result<()> {
647 debug!("on_send_raw_tx req={:?}", req);
648 self.throttle(peer, &req)?;
649 let tx: TransactionWithSignature = rlp::decode(&req.raw)?;
650
651 let (passed, failed) = self.tx_pool.insert_new_transactions(vec![tx]);
652
653 match (passed.len(), failed.len()) {
654 (0, 0) => {
655 debug!("Tx already inserted, ignoring");
656 Ok(())
657 }
658 (0, 1) => {
659 let err = failed.values().next().expect("Not empty");
660 warn!("Failed to insert tx: {}", err);
661 Ok(())
662 }
663 (1, 0) => {
664 debug!("Tx inserted successfully");
665 Ok(())
667 }
668 _ => {
669 bail!(Error::InternalError(format!(
671 "insert_new_transactions failed: {:?}, {:?}",
672 passed, failed
673 )))
674 }
675 }
676 }
677
678 fn on_get_receipts(
679 &self, io: &dyn NetworkContext, peer: &NodeId, req: GetReceipts,
680 ) -> Result<()> {
681 debug!("on_get_receipts req={:?}", req);
682 self.throttle(peer, &req)?;
683 let request_id = req.request_id;
684
685 let it = req.epochs.into_iter().take(MAX_ITEMS_TO_SEND).map(|epoch| {
686 self.ledger.receipts_of(epoch).map(|epoch_receipts| {
687 ReceiptsWithEpoch {
688 epoch,
689 epoch_receipts,
690 }
691 })
692 });
693
694 let (receipts, errors) = partition_results(it);
695
696 if !errors.is_empty() {
697 debug!("Errors while serving GetReceipts request: {:?}", errors);
698 }
699
700 let msg: Box<dyn Message> = Box::new(GetReceiptsResponse {
701 request_id,
702 receipts,
703 });
704
705 msg.send(io, peer)?;
706 Ok(())
707 }
708
709 fn on_get_txs(
710 &self, io: &dyn NetworkContext, peer: &NodeId, req: GetTxs,
711 ) -> Result<()> {
712 debug!("on_get_txs req={:?}", req);
713 self.throttle(peer, &req)?;
714 let request_id = req.request_id;
715
716 let it = req
717 .hashes
718 .into_iter()
719 .take(MAX_TXS_TO_SEND)
720 .map::<Result<_>, _>(|h| {
721 self.tx_by_hash(h).ok_or_else(|| {
722 Error::Msg(format!("Tx {:?} not found", h)).into()
723 })
724 });
725
726 let (txs, errors) = partition_results(it);
727
728 if !errors.is_empty() {
729 debug!("Errors while serving GetTxs request: {:?}", errors);
730 }
731
732 let msg: Box<dyn Message> =
733 Box::new(GetTxsResponse { request_id, txs });
734
735 msg.send(io, peer)?;
736 Ok(())
737 }
738
739 fn on_get_witness_info(
740 &self, io: &dyn NetworkContext, peer: &NodeId, req: GetWitnessInfo,
741 ) -> Result<()> {
742 debug!("on_get_witness_info req={:?}", req);
743 self.throttle(peer, &req)?;
744 let request_id = req.request_id;
745
746 let it = req
747 .witnesses
748 .into_iter()
749 .take(MAX_WITNESSES_TO_SEND)
750 .map(|w| self.ledger.witness_info(w));
751
752 let (infos, errors) = partition_results(it);
753
754 if !errors.is_empty() {
755 debug!("Errors while serving GetWitnessInfo request: {:?}", errors);
756 }
757
758 let msg: Box<dyn Message> =
759 Box::new(GetWitnessInfoResponse { request_id, infos });
760
761 msg.send(io, peer)?;
762 Ok(())
763 }
764
765 fn on_get_blooms(
766 &self, io: &dyn NetworkContext, peer: &NodeId, req: GetBlooms,
767 ) -> Result<()> {
768 debug!("on_get_blooms req={:?}", req);
769 self.throttle(peer, &req)?;
770 let request_id = req.request_id;
771
772 let it = req.epochs.into_iter().take(MAX_ITEMS_TO_SEND).map(|epoch| {
773 self.ledger
774 .bloom_of(epoch)
775 .map(|bloom| BloomWithEpoch { epoch, bloom })
776 });
777
778 let (blooms, errors) = partition_results(it);
779
780 if !errors.is_empty() {
781 debug!("Errors while serving GetBlooms request: {:?}", errors);
782 }
783
784 let msg: Box<dyn Message> =
785 Box::new(GetBloomsResponse { request_id, blooms });
786
787 msg.send(io, peer)?;
788 Ok(())
789 }
790
791 fn on_get_block_txs(
792 &self, io: &dyn NetworkContext, peer: &NodeId, req: GetBlockTxs,
793 ) -> Result<()> {
794 debug!("on_get_block_txs req={:?}", req);
795 self.throttle(peer, &req)?;
796 let request_id = req.request_id;
797
798 let it = req
799 .hashes
800 .into_iter()
801 .take(MAX_ITEMS_TO_SEND)
802 .map::<Result<_>, _>(|h| {
803 let block = self.ledger.block(h)?;
804
805 let block_txs = block
806 .transactions
807 .clone()
808 .into_iter()
809 .map(|arc_tx| (*arc_tx).clone())
810 .collect();
811
812 Ok(BlockTxsWithHash {
813 hash: block.hash(),
814 block_txs,
815 })
816 });
817
818 let (block_txs, errors) = partition_results(it);
819
820 if !errors.is_empty() {
821 debug!("Errors while serving GetBlockTxs request: {:?}", errors);
822 }
823
824 let msg: Box<dyn Message> = Box::new(GetBlockTxsResponse {
825 request_id,
826 block_txs,
827 });
828
829 msg.send(io, peer)?;
830 Ok(())
831 }
832
833 fn on_get_tx_infos(
834 &self, io: &dyn NetworkContext, peer: &NodeId, req: GetTxInfos,
835 ) -> Result<()> {
836 debug!("on_get_tx_infos req={:?}", req);
837 self.throttle(peer, &req)?;
838 let request_id = req.request_id;
839
840 let it = req
841 .hashes
842 .into_iter()
843 .take(MAX_ITEMS_TO_SEND)
844 .map(|h| self.tx_info_by_hash(h));
845
846 let (infos, errors) = partition_results(it);
847
848 if !errors.is_empty() {
849 debug!("Errors while serving GetTxInfos request: {:?}", errors);
850 }
851
852 let msg: Box<dyn Message> =
853 Box::new(GetTxInfosResponse { request_id, infos });
854
855 msg.send(io, peer)?;
856 Ok(())
857 }
858
859 fn storage_root(&self, key: StorageRootKey) -> Result<StorageRootWithKey> {
860 let snapshot_epoch_count = self.ledger.snapshot_epoch_count() as u64;
861
862 let state_root = self.ledger.state_root_of(key.epoch)?.state_root;
864
865 let prev_snapshot_state_root = match key.epoch {
867 e if e <= snapshot_epoch_count => None,
868 _ => Some(
869 self.ledger
870 .state_root_of(key.epoch - snapshot_epoch_count)?
871 .state_root,
872 ),
873 };
874
875 let (root, merkle_proof) =
877 self.ledger.storage_root_of(key.epoch, &key.address)?;
878
879 let proof = StorageRootProof {
880 state_root,
881 prev_snapshot_state_root,
882 merkle_proof,
883 };
884
885 Ok(StorageRootWithKey { key, root, proof })
886 }
887
888 fn on_get_storage_roots(
889 &self, io: &dyn NetworkContext, peer: &NodeId, req: GetStorageRoots,
890 ) -> Result<()> {
891 debug!("on_get_storage_roots req={:?}", req);
892 self.throttle(peer, &req)?;
893 let request_id = req.request_id;
894
895 let it = req
896 .keys
897 .into_iter()
898 .take(MAX_ITEMS_TO_SEND)
899 .map(|key| self.storage_root(key));
900
901 let (roots, errors) = partition_results(it);
902
903 if !errors.is_empty() {
904 debug!(
905 "Errors while serving GetStorageRoots request: {:?}",
906 errors
907 );
908 }
909
910 let msg: Box<dyn Message> =
911 Box::new(GetStorageRootsResponse { request_id, roots });
912
913 msg.send(io, peer)?;
914 Ok(())
915 }
916
917 fn broadcast(
918 &self, io: &dyn NetworkContext, mut peers: Vec<NodeId>,
919 msg: &dyn Message,
920 ) -> Result<()> {
921 debug!("broadcast peers={:?}", peers);
922
923 let throttle_ratio = THROTTLING_SERVICE.read().get_throttling_ratio();
924 let total = peers.len();
925 let allowed = (total as f64 * throttle_ratio) as usize;
926
927 if total > allowed {
928 debug!(
929 "Apply throttling for broadcast, total: {}, allowed: {}",
930 total, allowed
931 );
932 peers.shuffle(&mut rand::rng());
933 peers.truncate(allowed);
934 }
935
936 for id in peers {
937 msg.send(io, &id)?;
938 }
939
940 Ok(())
941 }
942
943 pub fn relay_hashes(self: &Arc<Self>, hashes: Vec<H256>) -> Result<()> {
944 debug!("relay_hashes hashes={:?}", hashes);
945
946 if hashes.is_empty() {
947 return Ok(());
948 }
949
950 let network = match self.network.upgrade() {
952 Some(network) => network,
953 None => {
954 bail!(Error::InternalError(
955 "Network unavailable, not relaying hashes".to_owned()
956 ));
957 }
958 };
959
960 let res = network.with_context(self.clone(), LIGHT_PROTOCOL_ID, |io| {
962 let msg: Box<dyn Message> = Box::new(NewBlockHashes { hashes });
963 self.broadcast(io, self.all_light_peers(), msg.as_ref())
964 });
965
966 if let Err(e) = res {
967 warn!("Error broadcasting blocks: {:?}", e);
968 };
969
970 Ok(())
971 }
972
973 fn throttle<T: Message>(&self, peer: &NodeId, msg: &T) -> Result<()> {
974 let peer = self.get_existing_peer_state(peer)?;
975
976 let bucket_name = msg.msg_name().to_string();
977 let bucket = match peer.read().throttling.get(&bucket_name) {
978 Some(bucket) => bucket,
979 None => return Ok(()),
980 };
981
982 let result = bucket.lock().throttle_default();
983
984 match result {
985 ThrottleResult::Success => Ok(()),
986 ThrottleResult::Throttled(wait_time) => {
987 let throttled = Throttled {
988 msg_id: msg.msg_id(),
989 wait_time_nanos: wait_time.as_nanos() as u64,
990 request_id: msg.get_request_id(),
991 };
992
993 bail!(Error::Throttled(msg.msg_name(), throttled))
994 }
995 ThrottleResult::AlreadyThrottled => {
996 bail!(Error::AlreadyThrottled(msg.msg_name()))
997 }
998 }
999 }
1000
1001 fn check_timeout(&self, io: &dyn NetworkContext, timeout: Duration) {
1002 for peer in self.peers.all_peers_satisfying(|p| {
1003 p.handshake_completed && p.last_heartbeat.elapsed() >= timeout
1004 }) {
1005 io.disconnect_peer(
1006 &peer,
1007 Some(UpdateNodeOperation::Failure),
1008 "light node sync heartbeat timeout", );
1010 }
1011 }
1012}
1013
1014impl NetworkProtocolHandler for Provider {
1015 fn minimum_supported_version(&self) -> ProtocolVersion {
1016 let my_version = self.protocol_version.0;
1017 if my_version > LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT {
1018 ProtocolVersion(my_version - LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT)
1019 } else {
1020 LIGHT_PROTO_V1
1021 }
1022 }
1023
1024 fn initialize(&self, io: &dyn NetworkContext) {
1025 io.register_timer(CHECK_PEER_HEARTBEAT_TIMER, Duration::from_secs(60))
1026 .expect("Error registering CHECK_PEER_HEARTBEAT_TIMER");
1027 }
1028
1029 fn on_message(&self, io: &dyn NetworkContext, peer: &NodeId, raw: &[u8]) {
1030 trace!("on_message: peer={:?}, raw={:?}", peer, raw);
1031
1032 let (msg_id, rlp) = match decode_msg(raw) {
1033 Some(msg) => msg,
1034 None => {
1035 return handle_error(
1036 io,
1037 peer,
1038 msgid::INVALID,
1039 &Error::InvalidMessageFormat.into(),
1040 )
1041 }
1042 };
1043
1044 debug!("on_message: peer={:?}, msgid={:?}", peer, msg_id);
1045
1046 if let Err(e) = self.dispatch_message(io, peer, msg_id.into(), rlp) {
1047 handle_error(io, peer, msg_id.into(), &e);
1048 }
1049 }
1050
1051 fn on_peer_connected(
1052 &self, _io: &dyn NetworkContext, node_id: &NodeId,
1053 peer_protocol_version: ProtocolVersion,
1054 _pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
1055 ) {
1056 debug!(
1057 "on_peer_connected: peer={:?} version={}",
1058 node_id, peer_protocol_version
1059 );
1060
1061 self.peers.insert_with(*node_id, |peer| {
1063 if let Some(ref file) = self.throttling_config_file {
1064 peer.throttling =
1065 TokenBucketManager::load(file, Some("light_protocol"))
1066 .expect("invalid throttling configuration file");
1067 }
1068 peer.protocol_version = peer_protocol_version;
1069 peer.last_heartbeat = Instant::now();
1070 });
1071 }
1072
// Removes the disconnected peer from the peer set.
fn on_peer_disconnected(&self, _io: &dyn NetworkContext, peer: &NodeId) {
    debug!("on_peer_disconnected: peer={}", peer);
    self.peers.remove(peer);
}
1077
1078 fn on_timeout(&self, io: &dyn NetworkContext, timer: TimerToken) {
1079 match timer {
1080 CHECK_PEER_HEARTBEAT_TIMER => {
1081 self.check_timeout(io, Duration::from_secs(180))
1083 }
1084 _ => warn!("Unknown timer {} triggered.", timer),
1085 }
1086 }
1087
// Local messages are not used by the light node provider; calling this
// panics via `unreachable!`.
fn send_local_message(&self, _io: &dyn NetworkContext, _message: Vec<u8>) {
    unreachable!("Light node provider does not have send_local_message.")
}
1091
// Work dispatch is not used by the light node provider; calling this panics
// via `unreachable!`.
fn on_work_dispatch(&self, _io: &dyn NetworkContext, _work_type: u8) {
    unreachable!("Light node provider does not have on_work_dispatch.")
}
1095}