cfxcore/sync/message/
state_sync_candidate_request.rs

1use crate::{
2    message::{
3        GetMaybeRequestId, Message, MessageProtocolVersionBound, MsgId,
4        RequestId, SetRequestId,
5    },
6    sync::{
7        message::{
8            msgid, Context, DynamicCapability, Handleable, KeyContainer,
9            StateSyncCandidateResponse,
10        },
11        request_manager::{AsAny, Request},
12        state::storage::SnapshotSyncCandidate,
13        Error, ProtocolConfiguration, SYNC_PROTO_V1, SYNC_PROTO_V3,
14    },
15};
16use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
17use network::service::ProtocolVersion;
18use rlp::Encodable;
19use rlp_derive::{RlpDecodable, RlpEncodable};
20use std::{any::Any, time::Duration};
21
22#[derive(Clone, RlpEncodable, RlpDecodable, Debug, DeriveMallocSizeOf)]
23pub struct StateSyncCandidateRequest {
24    pub request_id: RequestId,
25    pub candidates: Vec<SnapshotSyncCandidate>,
26}
27
28build_msg_with_request_id_impl! {
29    StateSyncCandidateRequest, msgid::STATE_SYNC_CANDIDATE_REQUEST,
30    "StateSyncCandidateRequest", SYNC_PROTO_V1, SYNC_PROTO_V3
31}
32
33impl Handleable for StateSyncCandidateRequest {
34    fn handle(self, ctx: &Context) -> Result<(), Error> {
35        let mut supported_candidates =
36            Vec::with_capacity(self.candidates.len());
37        let storage_manager = ctx
38            .manager
39            .graph
40            .data_man
41            .storage_manager
42            .get_storage_manager();
43        for candidate in self.candidates {
44            match candidate {
45                SnapshotSyncCandidate::FullSync {
46                    height,
47                    snapshot_epoch_id,
48                } => {
49                    match storage_manager
50                        .get_snapshot_info_at_epoch(&snapshot_epoch_id)
51                    {
52                        Some(snapshot_info) => {
53                            if snapshot_info.height == height {
54                                supported_candidates.push(
55                                    SnapshotSyncCandidate::FullSync {
56                                        height,
57                                        snapshot_epoch_id,
58                                    },
59                                );
60                            } else {
61                                warn!(
62                                    "Invalid SnapshotSyncCandidate, height unmatch: get {:?}, \
63                                    local_height of the snapshot is {}",
64                                    candidate, snapshot_info.height);
65                            }
66                        }
67                        None => {
68                            debug!(
69                                "Requested snapshot not exist: {:?}",
70                                candidate
71                            );
72                        }
73                    }
74                }
75                _ => {
76                    warn!("Unsupported candidate: {:?}", candidate);
77                }
78            }
79        }
80        ctx.send_response(&StateSyncCandidateResponse {
81            request_id: self.request_id,
82            supported_candidates,
83        })?;
84
85        Ok(())
86    }
87}
88
89impl AsAny for StateSyncCandidateRequest {
90    fn as_any(&self) -> &dyn Any { self }
91
92    fn as_any_mut(&mut self) -> &mut dyn Any { self }
93}
94
95impl Request for StateSyncCandidateRequest {
96    fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
97        conf.snapshot_candidate_request_timeout
98    }
99
100    fn on_removed(&self, _inflight_keys: &KeyContainer) {}
101
102    fn with_inflight(&mut self, _inflight_keys: &KeyContainer) {}
103
104    fn is_empty(&self) -> bool { false }
105
106    fn resend(&self) -> Option<Box<dyn Request>> { None }
107
108    fn required_capability(&self) -> Option<DynamicCapability> { None }
109}