1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use crate::{
    message::{
        GetMaybeRequestId, Message, MessageProtocolVersionBound, MsgId,
        RequestId, SetRequestId,
    },
    sync::{
        message::{
            msgid, Context, DynamicCapability, Handleable, KeyContainer,
            StateSyncCandidateResponse,
        },
        request_manager::{AsAny, Request},
        state::storage::SnapshotSyncCandidate,
        Error, ProtocolConfiguration, SYNC_PROTO_V1, SYNC_PROTO_V3,
    },
};
use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
use network::service::ProtocolVersion;
use rlp::Encodable;
use rlp_derive::{RlpDecodable, RlpEncodable};
use std::{any::Any, time::Duration};

/// Request message listing snapshot sync candidates; the receiving peer
/// answers with the subset of candidates it supports (see the `Handleable`
/// impl for this type).
///
/// Encoding and decoding go through the derived `RlpEncodable` /
/// `RlpDecodable` impls.
// NOTE(review): the derived RLP impls presumably serialize fields in
// declaration order, so reordering the fields would change the wire format —
// confirm before touching the layout.
#[derive(Clone, RlpEncodable, RlpDecodable, Debug, DeriveMallocSizeOf)]
pub struct StateSyncCandidateRequest {
    /// Identifier of this request; the handler copies it into the
    /// `StateSyncCandidateResponse` it sends back.
    pub request_id: RequestId,
    /// Snapshot candidates the sender wants the receiver to check.
    pub candidates: Vec<SnapshotSyncCandidate>,
}

// Invokes the project macro `build_msg_with_request_id_impl!` for
// `StateSyncCandidateRequest`, passing its message id constant, a name
// string, and two sync protocol version constants.
// NOTE(review): the macro is defined outside this file; presumably it
// generates the `Message` and request-id accessor impls and treats
// `SYNC_PROTO_V1`..`SYNC_PROTO_V3` as the supported protocol version range —
// confirm against the macro definition.
build_msg_with_request_id_impl! {
    StateSyncCandidateRequest, msgid::STATE_SYNC_CANDIDATE_REQUEST,
    "StateSyncCandidateRequest", SYNC_PROTO_V1, SYNC_PROTO_V3
}

impl Handleable for StateSyncCandidateRequest {
    fn handle(self, ctx: &Context) -> Result<(), Error> {
        let mut supported_candidates =
            Vec::with_capacity(self.candidates.len());
        let storage_manager = ctx
            .manager
            .graph
            .data_man
            .storage_manager
            .get_storage_manager();
        for candidate in self.candidates {
            match candidate {
                SnapshotSyncCandidate::FullSync {
                    height,
                    snapshot_epoch_id,
                } => {
                    match storage_manager
                        .get_snapshot_info_at_epoch(&snapshot_epoch_id)
                    {
                        Some(snapshot_info) => {
                            if snapshot_info.height == height {
                                supported_candidates.push(
                                    SnapshotSyncCandidate::FullSync {
                                        height,
                                        snapshot_epoch_id,
                                    },
                                );
                            } else {
                                warn!(
                                    "Invalid SnapshotSyncCandidate, height unmatch: get {:?}, \
                                    local_height of the snapshot is {}",
                                    candidate, snapshot_info.height);
                            }
                        }
                        None => {
                            debug!(
                                "Requested snapshot not exist: {:?}",
                                candidate
                            );
                        }
                    }
                }
                _ => {
                    warn!("Unsupported candidate: {:?}", candidate);
                }
            }
        }
        ctx.send_response(&StateSyncCandidateResponse {
            request_id: self.request_id,
            supported_candidates,
        })?;

        Ok(())
    }
}

impl AsAny for StateSyncCandidateRequest {
    /// Exposes this request as a shared `dyn Any` reference.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Exposes this request as a mutable `dyn Any` reference.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

impl Request for StateSyncCandidateRequest {
    /// Returns the `snapshot_candidate_request_timeout` value from `conf`.
    fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
        conf.snapshot_candidate_request_timeout
    }

    /// No-op: this request does nothing with the in-flight key container
    /// when it is removed.
    fn on_removed(&self, _inflight_keys: &KeyContainer) {
        // Intentionally empty.
    }

    /// No-op: the request is left unchanged regardless of the in-flight
    /// keys.
    fn with_inflight(&mut self, _inflight_keys: &KeyContainer) {
        // Intentionally empty.
    }

    /// Always reports the request as non-empty.
    fn is_empty(&self) -> bool {
        false
    }

    /// Always returns `None`; no replacement request is produced.
    fn resend(&self) -> Option<Box<dyn Request>> {
        None
    }

    /// Always returns `None`; no dynamic capability is required.
    fn required_capability(&self) -> Option<DynamicCapability> {
        None
    }
}