cfxcore/sync/message/
state_sync_candidate_request.rs1use crate::{
2 message::{
3 GetMaybeRequestId, Message, MessageProtocolVersionBound, MsgId,
4 RequestId, SetRequestId,
5 },
6 sync::{
7 message::{
8 msgid, Context, DynamicCapability, Handleable, KeyContainer,
9 StateSyncCandidateResponse,
10 },
11 request_manager::{AsAny, Request},
12 state::storage::SnapshotSyncCandidate,
13 Error, ProtocolConfiguration, SYNC_PROTO_V1, SYNC_PROTO_V3,
14 },
15};
16use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
17use network::service::ProtocolVersion;
18use rlp::Encodable;
19use rlp_derive::{RlpDecodable, RlpEncodable};
20use std::{any::Any, time::Duration};
21
22#[derive(Clone, RlpEncodable, RlpDecodable, Debug, DeriveMallocSizeOf)]
23pub struct StateSyncCandidateRequest {
24 pub request_id: RequestId,
25 pub candidates: Vec<SnapshotSyncCandidate>,
26}
27
28build_msg_with_request_id_impl! {
29 StateSyncCandidateRequest, msgid::STATE_SYNC_CANDIDATE_REQUEST,
30 "StateSyncCandidateRequest", SYNC_PROTO_V1, SYNC_PROTO_V3
31}
32
33impl Handleable for StateSyncCandidateRequest {
34 fn handle(self, ctx: &Context) -> Result<(), Error> {
35 let mut supported_candidates =
36 Vec::with_capacity(self.candidates.len());
37 let storage_manager = ctx
38 .manager
39 .graph
40 .data_man
41 .storage_manager
42 .get_storage_manager();
43 for candidate in self.candidates {
44 match candidate {
45 SnapshotSyncCandidate::FullSync {
46 height,
47 snapshot_epoch_id,
48 } => {
49 match storage_manager
50 .get_snapshot_info_at_epoch(&snapshot_epoch_id)
51 {
52 Some(snapshot_info) => {
53 if snapshot_info.height == height {
54 supported_candidates.push(
55 SnapshotSyncCandidate::FullSync {
56 height,
57 snapshot_epoch_id,
58 },
59 );
60 } else {
61 warn!(
62 "Invalid SnapshotSyncCandidate, height unmatch: get {:?}, \
63 local_height of the snapshot is {}",
64 candidate, snapshot_info.height);
65 }
66 }
67 None => {
68 debug!(
69 "Requested snapshot not exist: {:?}",
70 candidate
71 );
72 }
73 }
74 }
75 _ => {
76 warn!("Unsupported candidate: {:?}", candidate);
77 }
78 }
79 }
80 ctx.send_response(&StateSyncCandidateResponse {
81 request_id: self.request_id,
82 supported_candidates,
83 })?;
84
85 Ok(())
86 }
87}
88
89impl AsAny for StateSyncCandidateRequest {
90 fn as_any(&self) -> &dyn Any { self }
91
92 fn as_any_mut(&mut self) -> &mut dyn Any { self }
93}
94
95impl Request for StateSyncCandidateRequest {
96 fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
97 conf.snapshot_candidate_request_timeout
98 }
99
100 fn on_removed(&self, _inflight_keys: &KeyContainer) {}
101
102 fn with_inflight(&mut self, _inflight_keys: &KeyContainer) {}
103
104 fn is_empty(&self) -> bool { false }
105
106 fn resend(&self) -> Option<Box<dyn Request>> { None }
107
108 fn required_capability(&self) -> Option<DynamicCapability> { None }
109}