cfxcore/sync/message/
snapshot_manifest_request.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    block_data_manager::BlockExecutionResult,
7    message::{
8        GetMaybeRequestId, Message, MessageProtocolVersionBound, MsgId,
9        RequestId, SetRequestId,
10    },
11    sync::{
12        message::{
13            msgid, Context, DynamicCapability, Handleable, KeyContainer,
14            SnapshotManifestResponse,
15        },
16        request_manager::{AsAny, Request},
17        state::storage::{RangedManifest, SnapshotSyncCandidate},
18        Error, ProtocolConfiguration, SYNC_PROTO_V1, SYNC_PROTO_V3,
19    },
20};
21use cfx_parameters::{
22    consensus::DEFERRED_STATE_EPOCH_COUNT,
23    consensus_internal::REWARD_EPOCH_COUNT,
24};
25use cfx_types::H256;
26use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
27use network::service::ProtocolVersion;
28use primitives::{EpochNumber, StateRoot};
29use rlp::Encodable;
30use rlp_derive::{RlpDecodable, RlpEncodable};
31use std::{any::Any, time::Duration};
32
33#[derive(Debug, Clone, RlpDecodable, RlpEncodable, DeriveMallocSizeOf)]
34pub struct SnapshotManifestRequest {
35    pub request_id: u64,
36    pub snapshot_to_sync: SnapshotSyncCandidate,
37    pub start_chunk: Option<Vec<u8>>,
38    pub trusted_blame_block: Option<H256>,
39}
40
41build_msg_with_request_id_impl! {
42    SnapshotManifestRequest, msgid::GET_SNAPSHOT_MANIFEST,
43    "SnapshotManifestRequest", SYNC_PROTO_V1, SYNC_PROTO_V3
44}
45
46impl Handleable for SnapshotManifestRequest {
47    fn handle(self, ctx: &Context) -> Result<(), Error> {
48        // TODO Handle the case where we cannot serve the snapshot
49        let snapshot_merkle_root;
50        let manifest = match RangedManifest::load(
51            &self.snapshot_to_sync,
52            self.start_chunk.clone(),
53            &ctx.manager.graph.data_man.storage_manager,
54            ctx.manager.protocol_config.chunk_size_byte,
55            ctx.manager.protocol_config.max_chunk_number_in_manifest,
56        ) {
57            Ok(Some((m, merkle_root))) => {
58                snapshot_merkle_root = merkle_root;
59                m
60            }
61            _ => {
62                // Return an empty response to indicate that we cannot serve the
63                // state
64                ctx.send_response(&SnapshotManifestResponse {
65                    request_id: self.request_id,
66                    ..Default::default()
67                })?;
68                return Ok(());
69            }
70        };
71        if self.is_initial_request() {
72            let (state_root_vec, receipt_blame_vec, bloom_blame_vec) =
73                self.get_blame_states(ctx).unwrap_or_default();
74            let block_receipts =
75                self.get_block_receipts(ctx).unwrap_or_default();
76
77            debug!("handle SnapshotManifestRequest {:?}", self,);
78            ctx.send_response(&SnapshotManifestResponse {
79                request_id: self.request_id,
80                manifest,
81                snapshot_merkle_root,
82                state_root_vec,
83                receipt_blame_vec,
84                bloom_blame_vec,
85                block_receipts,
86            })
87        } else {
88            ctx.send_response(&SnapshotManifestResponse {
89                request_id: self.request_id,
90                manifest,
91                snapshot_merkle_root: Default::default(),
92                state_root_vec: Default::default(),
93                receipt_blame_vec: Default::default(),
94                bloom_blame_vec: Default::default(),
95                block_receipts: Default::default(),
96            })
97        }
98    }
99}
100
101impl SnapshotManifestRequest {
102    pub fn new(
103        snapshot_sync_candidate: SnapshotSyncCandidate,
104        trusted_blame_block: Option<H256>, start_chunk: Option<Vec<u8>>,
105    ) -> Self {
106        SnapshotManifestRequest {
107            request_id: 0,
108            snapshot_to_sync: snapshot_sync_candidate,
109            start_chunk,
110            trusted_blame_block,
111        }
112    }
113
114    pub fn is_initial_request(&self) -> bool {
115        self.trusted_blame_block.is_some()
116    }
117
118    /// This function returns the receipts of REWARD_EPOCH_COUNT epochs
119    /// backward from the epoch of *snapshot_to_sync*. It needs to
120    /// return receipts of so many epochs to the request sender due to
121    /// the following reason. Let the epoch of *snapshot_to_sync* be E(i).
122    /// In the node of the request sender, to compute the state of E(i+1),
123    /// it would require to compute and include the reward of
124    /// E(i+1-REWARD_EPOCH_COUNT).
125    fn get_block_receipts(
126        &self, ctx: &Context,
127    ) -> Option<Vec<BlockExecutionResult>> {
128        let mut epoch_receipts = Vec::new();
129        let mut epoch_hash =
130            self.snapshot_to_sync.get_snapshot_epoch_id().clone();
131        for i in 0..REWARD_EPOCH_COUNT {
132            if let Some(block) =
133                ctx.manager.graph.data_man.block_header_by_hash(&epoch_hash)
134            {
135                match ctx.manager.graph.consensus.get_block_hashes_by_epoch(
136                    EpochNumber::Number(block.height()),
137                ) {
138                    Ok(ordered_executable_epoch_blocks) => {
139                        if i == 0
140                            && *ordered_executable_epoch_blocks.last().unwrap()
141                                != epoch_hash
142                        {
143                            debug!(
144                                "Snapshot epoch id mismatched for epoch {}",
145                                block.height()
146                            );
147                            return None;
148                        }
149                        for hash in &ordered_executable_epoch_blocks {
150                            match ctx
151                                .manager
152                                .graph
153                                .data_man
154                                .block_execution_result_by_hash_with_epoch(
155                                    hash,
156                                    &epoch_hash,
157                                    false, /* update_pivot_assumption */
158                                    false, /* update_cache */
159                                ) {
160                                Some(block_execution_result) => {
161                                    epoch_receipts.push(block_execution_result);
162                                }
163                                None => {
164                                    debug!("Cannot get execution result for hash={:?} epoch_hash={:?}",
165                                        hash, epoch_hash
166                                    );
167                                    return None;
168                                }
169                            }
170                        }
171                    }
172                    Err(_) => {
173                        debug!(
174                            "Cannot get block hashes for epoch {}",
175                            block.height()
176                        );
177                        return None;
178                    }
179                }
180                // We have reached original genesis
181                if block.height() == 0 {
182                    break;
183                }
184                epoch_hash = block.parent_hash().clone();
185            } else {
186                warn!(
187                    "failed to find block={} in db, peer={}",
188                    epoch_hash, ctx.node_id
189                );
190                return None;
191            }
192        }
193        Some(epoch_receipts)
194    }
195
196    /// return an empty vec if some information not exist in db, caller may find
197    /// another peer to send the request; otherwise return a state_blame_vec
198    /// of the requested block
199    fn get_blame_states(
200        &self, ctx: &Context,
201    ) -> Option<(Vec<StateRoot>, Vec<H256>, Vec<H256>)> {
202        let trusted_block = ctx
203            .manager
204            .graph
205            .data_man
206            .block_header_by_hash(&self.trusted_blame_block?)?;
207        let snapshot_epoch_block =
208            ctx.manager.graph.data_man.block_header_by_hash(
209                self.snapshot_to_sync.get_snapshot_epoch_id(),
210            )?;
211        let trusted_height = trusted_block.height();
212        let snapshot_height = snapshot_epoch_block.height();
213        // The deferred-state walk below requires `trusted_height` to be at
214        // least `DEFERRED_STATE_EPOCH_COUNT` ahead of `snapshot_height`;
215        // otherwise the `min_vec_len` arithmetic would underflow on `u64`
216        // and panic under `overflow-checks=true`. Reject such requests
217        // (which are only producible by a malicious or misconfigured peer)
218        // before performing any further work.
219        let base_len = match trusted_height
220            .checked_sub(DEFERRED_STATE_EPOCH_COUNT)
221            .and_then(|v| v.checked_sub(snapshot_height))
222        {
223            Some(v) => v,
224            None => {
225                warn!(
226                    "receive invalid snapshot manifest request from peer={}, \
227                     trusted_height={}, snapshot_height={}",
228                    ctx.node_id, trusted_height, snapshot_height,
229                );
230                return None;
231            }
232        };
233        let mut block_hash = trusted_block.hash();
234        let mut trusted_block_height = trusted_height;
235        let mut blame_count = trusted_block.blame();
236        let mut deferred_block_hash = block_hash;
237        for _ in 0..DEFERRED_STATE_EPOCH_COUNT {
238            deferred_block_hash = *ctx
239                .manager
240                .graph
241                .data_man
242                .block_header_by_hash(&deferred_block_hash)?
243                .parent_hash();
244        }
245
246        let min_vec_len = if snapshot_height == 0 {
247            base_len + 1
248        } else {
249            base_len + REWARD_EPOCH_COUNT
250        };
251        let mut state_root_vec = Vec::with_capacity(min_vec_len as usize);
252        let mut receipt_blame_vec = Vec::with_capacity(min_vec_len as usize);
253        let mut bloom_blame_vec = Vec::with_capacity(min_vec_len as usize);
254
255        // loop until we have enough length of `state_root_vec`
256        loop {
257            if let Some(block) =
258                ctx.manager.graph.data_man.block_header_by_hash(&block_hash)
259            {
260                // We've jumped to another trusted block.
261                if block.height() + blame_count as u64 + 1
262                    == trusted_block_height
263                {
264                    trusted_block_height = block.height();
265                    blame_count = block.blame()
266                }
267                if let Some(commitment) = ctx
268                    .manager
269                    .graph
270                    .data_man
271                    .get_epoch_execution_commitment_with_db(
272                        &deferred_block_hash,
273                    )
274                {
275                    state_root_vec.push(
276                        commitment.state_root_with_aux_info.state_root.clone(),
277                    );
278                    receipt_blame_vec.push(commitment.receipts_root);
279                    bloom_blame_vec.push(commitment.logs_bloom_hash);
280                } else {
281                    warn!(
282                        "failed to find block={} in db, peer={}",
283                        block_hash, ctx.node_id
284                    );
285                    return None;
286                }
287                // We've collected enough states.
288                if block.height() + blame_count as u64 == trusted_block_height
289                    && state_root_vec.len() >= min_vec_len as usize
290                {
291                    break;
292                }
293                block_hash = *block.parent_hash();
294                deferred_block_hash = *ctx
295                    .manager
296                    .graph
297                    .data_man
298                    .block_header_by_hash(&deferred_block_hash)?
299                    .parent_hash();
300            } else {
301                warn!(
302                    "failed to find block={} in db, peer={}",
303                    block_hash, ctx.node_id
304                );
305                return None;
306            }
307        }
308
309        Some((state_root_vec, receipt_blame_vec, bloom_blame_vec))
310    }
311}
312
313impl AsAny for SnapshotManifestRequest {
314    fn as_any(&self) -> &dyn Any { self }
315
316    fn as_any_mut(&mut self) -> &mut dyn Any { self }
317}
318
319impl Request for SnapshotManifestRequest {
320    fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
321        conf.snapshot_manifest_request_timeout
322    }
323
324    fn on_removed(&self, _inflight_keys: &KeyContainer) {}
325
326    fn with_inflight(&mut self, _inflight_keys: &KeyContainer) {}
327
328    fn is_empty(&self) -> bool { false }
329
330    fn resend(&self) -> Option<Box<dyn Request>> { None }
331
332    fn required_capability(&self) -> Option<DynamicCapability> { None }
333}