cfxcore/sync/message/
snapshot_manifest_request.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    block_data_manager::BlockExecutionResult,
7    message::{
8        GetMaybeRequestId, Message, MessageProtocolVersionBound, MsgId,
9        RequestId, SetRequestId,
10    },
11    sync::{
12        message::{
13            msgid, Context, DynamicCapability, Handleable, KeyContainer,
14            SnapshotManifestResponse,
15        },
16        request_manager::{AsAny, Request},
17        state::storage::{RangedManifest, SnapshotSyncCandidate},
18        Error, ProtocolConfiguration, SYNC_PROTO_V1, SYNC_PROTO_V3,
19    },
20};
21use cfx_parameters::{
22    consensus::DEFERRED_STATE_EPOCH_COUNT,
23    consensus_internal::REWARD_EPOCH_COUNT,
24};
25use cfx_types::H256;
26use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
27use network::service::ProtocolVersion;
28use primitives::{EpochNumber, StateRoot};
29use rlp::Encodable;
30use rlp_derive::{RlpDecodable, RlpEncodable};
31use std::{any::Any, time::Duration};
32
33#[derive(Debug, Clone, RlpDecodable, RlpEncodable, DeriveMallocSizeOf)]
34pub struct SnapshotManifestRequest {
35    pub request_id: u64,
36    pub snapshot_to_sync: SnapshotSyncCandidate,
37    pub start_chunk: Option<Vec<u8>>,
38    pub trusted_blame_block: Option<H256>,
39}
40
41build_msg_with_request_id_impl! {
42    SnapshotManifestRequest, msgid::GET_SNAPSHOT_MANIFEST,
43    "SnapshotManifestRequest", SYNC_PROTO_V1, SYNC_PROTO_V3
44}
45
46impl Handleable for SnapshotManifestRequest {
47    fn handle(self, ctx: &Context) -> Result<(), Error> {
48        // TODO Handle the case where we cannot serve the snapshot
49        let snapshot_merkle_root;
50        let manifest = match RangedManifest::load(
51            &self.snapshot_to_sync,
52            self.start_chunk.clone(),
53            &ctx.manager.graph.data_man.storage_manager,
54            ctx.manager.protocol_config.chunk_size_byte,
55            ctx.manager.protocol_config.max_chunk_number_in_manifest,
56        ) {
57            Ok(Some((m, merkle_root))) => {
58                snapshot_merkle_root = merkle_root;
59                m
60            }
61            _ => {
62                // Return an empty response to indicate that we cannot serve the
63                // state
64                ctx.send_response(&SnapshotManifestResponse {
65                    request_id: self.request_id,
66                    ..Default::default()
67                })?;
68                return Ok(());
69            }
70        };
71        if self.is_initial_request() {
72            let (state_root_vec, receipt_blame_vec, bloom_blame_vec) =
73                self.get_blame_states(ctx).unwrap_or_default();
74            let block_receipts =
75                self.get_block_receipts(ctx).unwrap_or_default();
76
77            debug!("handle SnapshotManifestRequest {:?}", self,);
78            ctx.send_response(&SnapshotManifestResponse {
79                request_id: self.request_id,
80                manifest,
81                snapshot_merkle_root,
82                state_root_vec,
83                receipt_blame_vec,
84                bloom_blame_vec,
85                block_receipts,
86            })
87        } else {
88            ctx.send_response(&SnapshotManifestResponse {
89                request_id: self.request_id,
90                manifest,
91                snapshot_merkle_root: Default::default(),
92                state_root_vec: Default::default(),
93                receipt_blame_vec: Default::default(),
94                bloom_blame_vec: Default::default(),
95                block_receipts: Default::default(),
96            })
97        }
98    }
99}
100
101impl SnapshotManifestRequest {
102    pub fn new(
103        snapshot_sync_candidate: SnapshotSyncCandidate,
104        trusted_blame_block: Option<H256>, start_chunk: Option<Vec<u8>>,
105    ) -> Self {
106        SnapshotManifestRequest {
107            request_id: 0,
108            snapshot_to_sync: snapshot_sync_candidate,
109            start_chunk,
110            trusted_blame_block,
111        }
112    }
113
114    pub fn is_initial_request(&self) -> bool {
115        self.trusted_blame_block.is_some()
116    }
117
118    /// This function returns the receipts of REWARD_EPOCH_COUNT epochs
119    /// backward from the epoch of *snapshot_to_sync*. It needs to
120    /// return receipts of so many epochs to the request sender due to
121    /// the following reason. Let the epoch of *snapshot_to_sync* be E(i).
122    /// In the node of the request sender, to compute the state of E(i+1),
123    /// it would require to compute and include the reward of
124    /// E(i+1-REWARD_EPOCH_COUNT).
125    fn get_block_receipts(
126        &self, ctx: &Context,
127    ) -> Option<Vec<BlockExecutionResult>> {
128        let mut epoch_receipts = Vec::new();
129        let mut epoch_hash =
130            self.snapshot_to_sync.get_snapshot_epoch_id().clone();
131        for i in 0..REWARD_EPOCH_COUNT {
132            if let Some(block) =
133                ctx.manager.graph.data_man.block_header_by_hash(&epoch_hash)
134            {
135                match ctx.manager.graph.consensus.get_block_hashes_by_epoch(
136                    EpochNumber::Number(block.height()),
137                ) {
138                    Ok(ordered_executable_epoch_blocks) => {
139                        if i == 0
140                            && *ordered_executable_epoch_blocks.last().unwrap()
141                                != epoch_hash
142                        {
143                            debug!(
144                                "Snapshot epoch id mismatched for epoch {}",
145                                block.height()
146                            );
147                            return None;
148                        }
149                        for hash in &ordered_executable_epoch_blocks {
150                            match ctx
151                                .manager
152                                .graph
153                                .data_man
154                                .block_execution_result_by_hash_with_epoch(
155                                    hash,
156                                    &epoch_hash,
157                                    false, /* update_pivot_assumption */
158                                    false, /* update_cache */
159                                ) {
160                                Some(block_execution_result) => {
161                                    epoch_receipts.push(block_execution_result);
162                                }
163                                None => {
164                                    debug!("Cannot get execution result for hash={:?} epoch_hash={:?}",
165                                        hash, epoch_hash
166                                    );
167                                    return None;
168                                }
169                            }
170                        }
171                    }
172                    Err(_) => {
173                        debug!(
174                            "Cannot get block hashes for epoch {}",
175                            block.height()
176                        );
177                        return None;
178                    }
179                }
180                // We have reached original genesis
181                if block.height() == 0 {
182                    break;
183                }
184                epoch_hash = block.parent_hash().clone();
185            } else {
186                warn!(
187                    "failed to find block={} in db, peer={}",
188                    epoch_hash, ctx.node_id
189                );
190                return None;
191            }
192        }
193        Some(epoch_receipts)
194    }
195
196    /// return an empty vec if some information not exist in db, caller may find
197    /// another peer to send the request; otherwise return a state_blame_vec
198    /// of the requested block
199    fn get_blame_states(
200        &self, ctx: &Context,
201    ) -> Option<(Vec<StateRoot>, Vec<H256>, Vec<H256>)> {
202        let trusted_block = ctx
203            .manager
204            .graph
205            .data_man
206            .block_header_by_hash(&self.trusted_blame_block?)?;
207        let snapshot_epoch_block =
208            ctx.manager.graph.data_man.block_header_by_hash(
209                self.snapshot_to_sync.get_snapshot_epoch_id(),
210            )?;
211        if trusted_block.height() < snapshot_epoch_block.height() {
212            warn!(
213                "receive invalid snapshot manifest request from peer={}",
214                ctx.node_id
215            );
216            return None;
217        }
218        let mut block_hash = trusted_block.hash();
219        let mut trusted_block_height = trusted_block.height();
220        let mut blame_count = trusted_block.blame();
221        let mut deferred_block_hash = block_hash;
222        for _ in 0..DEFERRED_STATE_EPOCH_COUNT {
223            deferred_block_hash = *ctx
224                .manager
225                .graph
226                .data_man
227                .block_header_by_hash(&deferred_block_hash)
228                .expect("All headers exist")
229                .parent_hash();
230        }
231
232        let min_vec_len = if snapshot_epoch_block.height() == 0 {
233            trusted_block.height()
234                - DEFERRED_STATE_EPOCH_COUNT
235                - snapshot_epoch_block.height()
236                + 1
237        } else {
238            trusted_block.height()
239                - DEFERRED_STATE_EPOCH_COUNT
240                - snapshot_epoch_block.height()
241                + REWARD_EPOCH_COUNT
242        };
243        let mut state_root_vec = Vec::with_capacity(min_vec_len as usize);
244        let mut receipt_blame_vec = Vec::with_capacity(min_vec_len as usize);
245        let mut bloom_blame_vec = Vec::with_capacity(min_vec_len as usize);
246
247        // loop until we have enough length of `state_root_vec`
248        loop {
249            if let Some(block) =
250                ctx.manager.graph.data_man.block_header_by_hash(&block_hash)
251            {
252                // We've jumped to another trusted block.
253                if block.height() + blame_count as u64 + 1
254                    == trusted_block_height
255                {
256                    trusted_block_height = block.height();
257                    blame_count = block.blame()
258                }
259                if let Some(commitment) = ctx
260                    .manager
261                    .graph
262                    .data_man
263                    .get_epoch_execution_commitment_with_db(
264                        &deferred_block_hash,
265                    )
266                {
267                    state_root_vec.push(
268                        commitment.state_root_with_aux_info.state_root.clone(),
269                    );
270                    receipt_blame_vec.push(commitment.receipts_root);
271                    bloom_blame_vec.push(commitment.logs_bloom_hash);
272                } else {
273                    warn!(
274                        "failed to find block={} in db, peer={}",
275                        block_hash, ctx.node_id
276                    );
277                    return None;
278                }
279                // We've collected enough states.
280                if block.height() + blame_count as u64 == trusted_block_height
281                    && state_root_vec.len() >= min_vec_len as usize
282                {
283                    break;
284                }
285                block_hash = *block.parent_hash();
286                deferred_block_hash = *ctx
287                    .manager
288                    .graph
289                    .data_man
290                    .block_header_by_hash(&deferred_block_hash)
291                    .expect("All headers received")
292                    .parent_hash();
293            } else {
294                warn!(
295                    "failed to find block={} in db, peer={}",
296                    block_hash, ctx.node_id
297                );
298                return None;
299            }
300        }
301
302        Some((state_root_vec, receipt_blame_vec, bloom_blame_vec))
303    }
304}
305
306impl AsAny for SnapshotManifestRequest {
307    fn as_any(&self) -> &dyn Any { self }
308
309    fn as_any_mut(&mut self) -> &mut dyn Any { self }
310}
311
312impl Request for SnapshotManifestRequest {
313    fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
314        conf.snapshot_manifest_request_timeout
315    }
316
317    fn on_removed(&self, _inflight_keys: &KeyContainer) {}
318
319    fn with_inflight(&mut self, _inflight_keys: &KeyContainer) {}
320
321    fn is_empty(&self) -> bool { false }
322
323    fn resend(&self) -> Option<Box<dyn Request>> { None }
324
325    fn required_capability(&self) -> Option<DynamicCapability> { None }
326}