cfxcore/sync/message/
snapshot_chunk_request.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    message::{
7        GetMaybeRequestId, Message, MessageProtocolVersionBound, MsgId,
8        RequestId, SetRequestId,
9    },
10    sync::{
11        message::{
12            msgid, Context, DynamicCapability, Handleable, KeyContainer,
13            SnapshotChunkResponse,
14        },
15        request_manager::{AsAny, Request},
16        state::storage::{Chunk, ChunkKey, SnapshotSyncCandidate},
17        Error, ProtocolConfiguration, SYNC_PROTO_V1, SYNC_PROTO_V3,
18    },
19};
20use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
21use network::service::ProtocolVersion;
22use rlp::Encodable;
23use rlp_derive::{RlpDecodable, RlpEncodable};
24use std::{any::Any, time::Duration};
25
26#[derive(Debug, Clone, RlpDecodable, RlpEncodable, DeriveMallocSizeOf)]
27pub struct SnapshotChunkRequest {
28    // request_id for SnapshotChunkRequest is independent from each other
29    // because request_id is set per message when the request is sent.
30    pub request_id: u64,
31    pub snapshot_to_sync: SnapshotSyncCandidate,
32    pub chunk_key: ChunkKey,
33}
34
35build_msg_with_request_id_impl! {
36    SnapshotChunkRequest, msgid::GET_SNAPSHOT_CHUNK,
37    "SnapshotChunkRequest", SYNC_PROTO_V1, SYNC_PROTO_V3
38}
39
40impl SnapshotChunkRequest {
41    pub fn new(
42        snapshot_sync_candidate: SnapshotSyncCandidate, chunk_key: ChunkKey,
43    ) -> Self {
44        SnapshotChunkRequest {
45            request_id: 0,
46            snapshot_to_sync: snapshot_sync_candidate,
47            chunk_key,
48        }
49    }
50}
51
52impl Handleable for SnapshotChunkRequest {
53    fn handle(self, ctx: &Context) -> Result<(), Error> {
54        let snapshot_epoch_id = match &self.snapshot_to_sync {
55            SnapshotSyncCandidate::FullSync {
56                snapshot_epoch_id, ..
57            } => snapshot_epoch_id,
58            _ => bail!(Error::NotSupported(
59                "OneStepSync/IncSync not yet implemented.".into()
60            )),
61        };
62        let chunk = match Chunk::load(
63            snapshot_epoch_id,
64            &self.chunk_key,
65            &ctx.manager.graph.data_man.storage_manager,
66            ctx.manager.protocol_config.chunk_size_byte * 2,
67        ) {
68            Ok(Some(chunk)) => chunk,
69            Err(r) => return Err(r),
70            _ => Chunk::default(),
71        };
72
73        ctx.send_response(&SnapshotChunkResponse {
74            request_id: self.request_id,
75            chunk,
76        })
77    }
78}
79
80impl AsAny for SnapshotChunkRequest {
81    fn as_any(&self) -> &dyn Any { self }
82
83    fn as_any_mut(&mut self) -> &mut dyn Any { self }
84}
85
86impl Request for SnapshotChunkRequest {
87    fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
88        conf.snapshot_chunk_request_timeout
89    }
90
91    fn on_removed(&self, _inflight_keys: &KeyContainer) {}
92
93    fn with_inflight(&mut self, _inflight_keys: &KeyContainer) {}
94
95    fn is_empty(&self) -> bool { false }
96
97    fn resend(&self) -> Option<Box<dyn Request>> { None }
98
99    fn required_capability(&self) -> Option<DynamicCapability> { None }
100}