// cfxcore/sync/state/state_sync_manifest/snapshot_manifest_manager.rs

1#![allow(unused)]
2
3use crate::{
4    block_data_manager::BlockExecutionResult,
5    message::NetworkContext,
6    sync::{
7        error::Error,
8        message::{
9            msgid, Context, SnapshotManifestRequest, SnapshotManifestResponse,
10        },
11        state::storage::SnapshotSyncCandidate,
12        synchronization_state::PeerFilter,
13        SynchronizationProtocolHandler,
14    },
15    verification::compute_receipts_root,
16};
17use cfx_internal_common::{StateRootAuxInfo, StateRootWithAuxInfo};
18use cfx_parameters::{
19    consensus::DEFERRED_STATE_EPOCH_COUNT,
20    consensus_internal::REWARD_EPOCH_COUNT,
21};
22use cfx_storage::{
23    storage_db::{SnapshotInfo, SnapshotKeptToProvideSyncStatus},
24    TrieProof,
25};
26use cfx_types::{option_vec_to_hex, H256};
27use network::node_table::NodeId;
28use primitives::{
29    BlockHeaderBuilder, BlockReceipts, EpochId, EpochNumber, StateRoot,
30    StorageKey, StorageKeyWithSpace, NULL_EPOCH,
31};
32use rand::{
33    rng,
34    seq::{IndexedRandom, SliceRandom},
35};
36
37use std::{
38    collections::HashSet,
39    fmt::{Debug, Formatter},
40    sync::Arc,
41    time::{Duration, Instant},
42};
43
/// Drives the snapshot-manifest phase of state sync: requests the manifest
/// for a chosen snapshot candidate from peers, validates each response, and
/// accumulates chunk boundaries/proofs until the whole manifest is received.
pub struct SnapshotManifestManager {
    // When and to which peer the in-flight manifest request was sent;
    // `None` when no request is outstanding.
    manifest_request_status: Option<(Instant, NodeId)>,
    pub snapshot_candidate: SnapshotSyncCandidate,
    // Block whose blame information is trusted when validating the
    // peer-provided state roots.
    trusted_blame_block: H256,
    // Peers still considered able to serve this manifest; shrinks on
    // failure, timeout, or disconnect.
    pub active_peers: HashSet<NodeId>,

    // Chunk boundary keys accumulated across all manifest responses.
    pub chunk_boundaries: Vec<Vec<u8>>,
    // Trie proofs matching the entries in `chunk_boundaries`.
    pub chunk_boundary_proofs: Vec<TrieProof>,

    // Blame/receipt data validated from the initial response; `None` until
    // the initial manifest response has been processed.
    related_data: Option<RelatedData>,
    config: SnapshotManifestConfig,
}
56
/// Data extracted and verified from the initial manifest response; returned
/// to the caller once the whole manifest has been received.
#[derive(Clone)]
pub struct RelatedData {
    /// State root verified by blame.
    pub true_state_root_by_blame_info: StateRootWithAuxInfo,
    /// Offset of the snapshot's corresponding entry in the blame vectors.
    pub blame_vec_offset: usize,
    /// Receipts-root blame vector from the initial response.
    pub receipt_blame_vec: Vec<H256>,
    /// Logs-bloom blame vector from the initial response.
    pub bloom_blame_vec: Vec<H256>,
    /// Verified `(block_hash, epoch_hash, receipts)` triples for the epochs
    /// checked by `validate_epoch_receipts`.
    pub epoch_receipts: Vec<(H256, H256, Arc<BlockReceipts>)>,
    /// Info of the snapshot being synced.
    pub snapshot_info: SnapshotInfo,
    /// Info of the preceding snapshot, when one exists (see
    /// `validate_blame_states`).
    pub parent_snapshot_info: Option<SnapshotInfo>,
}
69
70impl SnapshotManifestManager {
71    pub fn new_and_start(
72        snapshot_candidate: SnapshotSyncCandidate, trusted_blame_block: H256,
73        active_peers: HashSet<NodeId>, config: SnapshotManifestConfig,
74        io: &dyn NetworkContext, sync_handler: &SynchronizationProtocolHandler,
75    ) -> Self {
76        let mut manager = Self {
77            manifest_request_status: None,
78            snapshot_candidate,
79            trusted_blame_block,
80            active_peers,
81            chunk_boundaries: vec![],
82            chunk_boundary_proofs: vec![],
83            related_data: None,
84            config,
85        };
86        manager.request_manifest(io, sync_handler, None);
87        manager
88    }
89
90    pub fn handle_snapshot_manifest_response(
91        &mut self, ctx: &Context, response: SnapshotManifestResponse,
92        request: &SnapshotManifestRequest,
93    ) -> Result<Option<RelatedData>, Error> {
94        match self
95            .handle_snapshot_manifest_response_impl(ctx, response, request)
96        {
97            Ok(r) => Ok(r),
98            Err(e) => {
99                self.note_failure(&ctx.node_id);
100                Err(e)
101            }
102        }
103    }
104
    /// Validates and incorporates one manifest response.
    ///
    /// The initial response (for a request without a start chunk) must carry
    /// blame vectors and epoch receipts, which are validated against local
    /// headers before any chunk boundaries are accepted. Follow-up responses
    /// only extend the chunk boundary list and are checked against the
    /// already-validated snapshot merkle root.
    ///
    /// Returns `Ok(Some(related_data))` when the manifest is complete,
    /// `Ok(None)` when more parts were requested (or the response is stale),
    /// and `Err` when validation fails.
    fn handle_snapshot_manifest_response_impl(
        &mut self, ctx: &Context, response: SnapshotManifestResponse,
        request: &SnapshotManifestRequest,
    ) -> Result<Option<RelatedData>, Error> {
        // new era started: the sync candidate changed after this request was
        // sent, so the response is stale and can be silently ignored.
        if request.snapshot_to_sync != self.snapshot_candidate {
            info!(
                "The received snapshot manifest doesn't match the current snapshot_candidate,\
                 current snapshot_candidate = {:?}, requested sync candidate = {:?}",
                self.snapshot_candidate,
                request.snapshot_to_sync);
            return Ok(None);
        }

        info!(
            "Snapshot manifest received, checkpoint = {:?}, chunk_boundaries.len()={}, \
            start={}, next={}",
            self.snapshot_candidate, response.manifest.chunk_boundaries.len(),
            option_vec_to_hex(request.start_chunk.as_ref()), option_vec_to_hex(response.manifest.next.as_ref())
        );

        // validate blame state if requested
        if request.is_initial_request() {
            // An initial response must be the first one processed; boundaries
            // present means we already handled one.
            if !self.chunk_boundaries.is_empty() {
                bail!(Error::InvalidSnapshotManifest(
                    "Initial manifest is not expected".into(),
                ));
            }
            let (
                blame_vec_offset,
                state_root_with_aux_info,
                snapshot_info,
                parent_snapshot_info,
            ) = match Self::validate_blame_states(
                ctx,
                self.snapshot_candidate.get_snapshot_epoch_id(),
                &self.trusted_blame_block,
                &response.state_root_vec,
                &response.receipt_blame_vec,
                &response.bloom_blame_vec,
            ) {
                Some(info_tuple) => info_tuple,
                None => {
                    warn!("failed to validate the blame state, re-sync manifest from other peer");
                    self.resync_manifest(ctx);
                    bail!(Error::InvalidSnapshotManifest(
                        "invalid blame state in manifest".into(),
                    ));
                }
            };

            let epoch_receipts =
                match SnapshotManifestManager::validate_epoch_receipts(
                    ctx,
                    blame_vec_offset,
                    self.snapshot_candidate.get_snapshot_epoch_id(),
                    &response.receipt_blame_vec,
                    &response.bloom_blame_vec,
                    &response.block_receipts,
                ) {
                    Some(epoch_receipts) => epoch_receipts,
                    None => {
                        warn!("failed to validate the epoch receipts, re-sync manifest from other peer");
                        self.resync_manifest(ctx);
                        bail!(Error::InvalidSnapshotManifest(
                            "invalid epoch receipts in manifest".into(),
                        ));
                    }
                };

            // Check proofs for keys.
            if let Err(e) =
                response.manifest.validate(&snapshot_info.merkle_root)
            {
                warn!("failed to validate snapshot manifest, error = {:?}", e);
                bail!(Error::InvalidSnapshotManifest(
                    "invalid chunk proofs in manifest".into(),
                ));
            }
            // Keep the validated data until the full manifest has arrived.
            self.related_data = Some(RelatedData {
                true_state_root_by_blame_info: state_root_with_aux_info,
                blame_vec_offset,
                receipt_blame_vec: response.receipt_blame_vec,
                bloom_blame_vec: response.bloom_blame_vec,
                epoch_receipts,
                snapshot_info,
                parent_snapshot_info,
            });
        } else {
            // A non-initial response can only follow a validated initial one.
            if self.chunk_boundaries.is_empty() {
                bail!(Error::InvalidSnapshotManifest(
                    "Non-initial manifest is not expected".into()
                ));
            }
            // The request was built from the last boundary we had received.
            debug_assert_eq!(
                request.start_chunk.as_ref(),
                self.chunk_boundaries.last()
            );
            if let Some(related_data) = &self.related_data {
                // Check proofs for keys.
                if let Err(e) = response
                    .manifest
                    .validate(&related_data.snapshot_info.merkle_root)
                {
                    warn!(
                        "failed to validate snapshot manifest, error = {:?}",
                        e
                    );
                    bail!(Error::InvalidSnapshotManifest(
                        "invalid chunk proofs in manifest".into(),
                    ));
                }
            }
        }
        // The first element is `start_key` and overlaps with the previous
        // manifest.
        self.chunk_boundaries
            .extend_from_slice(&response.manifest.chunk_boundaries);
        self.chunk_boundary_proofs
            .extend_from_slice(&response.manifest.chunk_boundary_proofs);
        if response.manifest.next.is_none() {
            // No more parts: the manifest is complete.
            return Ok(self.related_data.clone());
        } else {
            self.request_manifest(ctx.io, ctx.manager, response.manifest.next);
        }
        Ok(None)
    }
232
233    /// request manifest from random peer
234    pub fn request_manifest(
235        &mut self, io: &dyn NetworkContext,
236        sync_handler: &SynchronizationProtocolHandler,
237        start_chunk: Option<Vec<u8>>,
238    ) {
239        let maybe_trusted_blame_block = if start_chunk.is_none() {
240            Some(self.trusted_blame_block.clone())
241        } else {
242            None
243        };
244        let request = SnapshotManifestRequest::new(
245            // Safe to unwrap since it's guaranteed to be Some(..)
246            self.snapshot_candidate.clone(),
247            maybe_trusted_blame_block,
248            start_chunk,
249        );
250
251        let available_peers = PeerFilter::new(msgid::GET_SNAPSHOT_MANIFEST)
252            .choose_from(&self.active_peers)
253            .select_all(&sync_handler.syn);
254        let maybe_peer = available_peers.choose(&mut rng()).map(|p| *p);
255        if let Some(peer) = maybe_peer {
256            self.manifest_request_status = Some((Instant::now(), peer));
257            sync_handler.request_manager.request_with_delay(
258                io,
259                Box::new(request),
260                Some(peer),
261                None,
262            );
263        }
264    }
265
266    fn resync_manifest(&mut self, ctx: &Context) {
267        self.request_manifest(
268            ctx.io,
269            ctx.manager,
270            self.chunk_boundaries.last().cloned(),
271        );
272    }
273
274    pub fn check_timeout(&mut self, ctx: &Context) {
275        if let Some((manifest_start_time, peer)) = &self.manifest_request_status
276        {
277            if manifest_start_time.elapsed()
278                > self.config.manifest_request_timeout
279            {
280                self.active_peers.remove(peer);
281                self.manifest_request_status = None;
282                self.resync_manifest(ctx);
283            }
284        }
285    }
286
287    pub fn is_inactive(&self) -> bool { self.active_peers.is_empty() }
288
    /// Validates the peer-provided blame vectors against locally known
    /// headers between the snapshot block and `trusted_blame_block`, and
    /// derives the snapshot's verified state root and `SnapshotInfo`.
    ///
    /// Returns `None` if any length or root check fails; otherwise returns
    /// `(blame_vec_offset, state_root_with_aux_info, snapshot_info,
    /// maybe_prior_snapshot_info)`, where `blame_vec_offset` is the index of
    /// the snapshot's entry within the blame vectors.
    pub fn validate_blame_states(
        ctx: &Context, snapshot_epoch_id: &H256, trusted_blame_block: &H256,
        state_root_vec: &Vec<StateRoot>, receipt_blame_vec: &Vec<H256>,
        bloom_blame_vec: &Vec<H256>,
    ) -> Option<(
        usize,
        StateRootWithAuxInfo,
        SnapshotInfo,
        Option<SnapshotInfo>,
    )> {
        let mut state_blame_vec = vec![];

        // these two header must exist in disk, it's safe to unwrap
        let snapshot_block_header = ctx
            .manager
            .graph
            .data_man
            .block_header_by_hash(snapshot_epoch_id)
            .expect("block header must exist for snapshot to sync");
        let trusted_blame_block = ctx
            .manager
            .graph
            .data_man
            .block_header_by_hash(trusted_blame_block)
            .expect("trusted_blame_block header must exist");

        // check snapshot position in `out_state_blame_vec`
        // The snapshot epoch's state root is committed
        // DEFERRED_STATE_EPOCH_COUNT epochs later; `offset` locates that
        // entry relative to the trusted blame block.
        let offset = (trusted_blame_block.height()
            - (snapshot_block_header.height() + DEFERRED_STATE_EPOCH_COUNT))
            as usize;
        if offset >= state_root_vec.len() {
            warn!("validate_blame_states: not enough state_root");
            return None;
        }

        // Minimum number of blame entries the response must cover.
        // Non-genesis snapshots need REWARD_EPOCH_COUNT extra entries so
        // `validate_epoch_receipts` can check the reward epochs' receipts.
        let min_vec_len = if snapshot_block_header.height() == 0 {
            trusted_blame_block.height()
                - DEFERRED_STATE_EPOCH_COUNT
                - snapshot_block_header.height()
                + 1
        } else {
            trusted_blame_block.height()
                - DEFERRED_STATE_EPOCH_COUNT
                - snapshot_block_header.height()
                + REWARD_EPOCH_COUNT
        };
        let mut trusted_blocks = Vec::new();
        let mut trusted_block_height = trusted_blame_block.height();
        let mut blame_count = trusted_blame_block.blame();
        let mut block_hash = trusted_blame_block.hash();
        let mut vec_len: usize = 0;
        trusted_blocks.push(trusted_blame_block);

        // verify the length of vector.
        // Walk the chain backwards, collecting each "trusted" block (one not
        // blamed by a successor) and counting how many blame entries the
        // response must contain.
        loop {
            vec_len += 1;
            let block = ctx
                .manager
                .graph
                .data_man
                .block_header_by_hash(&block_hash)
                .expect("block header must exist");
            // We've jump to another trusted block.
            if block.height() + blame_count as u64 + 1 == trusted_block_height {
                trusted_block_height = block.height();
                blame_count = block.blame();
                trusted_blocks.push(block.clone());
            }
            if block.height() + blame_count as u64 == trusted_block_height
                && vec_len >= min_vec_len as usize
            {
                break;
            }
            block_hash = *block.parent_hash();
        }
        // verify the length of vector
        if vec_len != state_root_vec.len() {
            warn!(
                "wrong length of state_root_vec, expected {}, but {} found",
                vec_len,
                state_root_vec.len()
            );
            return None;
        }
        // Construct out_state_blame_vec.
        state_blame_vec.clear();
        for state_root in state_root_vec {
            state_blame_vec.push(state_root.compute_state_root_hash());
        }
        // For each trusted block, recompute its deferred roots from the
        // corresponding slice of the blame vectors and compare against the
        // roots committed in its header.
        let mut slice_begin = 0;
        for trusted_block in trusted_blocks {
            let slice_end = slice_begin + trusted_block.blame() as usize + 1;
            let deferred_state_root = if trusted_block.blame() == 0 {
                state_blame_vec[slice_begin].clone()
            } else {
                BlockHeaderBuilder::compute_blame_state_root_vec_root(
                    state_blame_vec[slice_begin..slice_end].to_vec(),
                )
            };
            let deferred_receipts_root = if trusted_block.blame() == 0 {
                receipt_blame_vec[slice_begin].clone()
            } else {
                BlockHeaderBuilder::compute_blame_state_root_vec_root(
                    receipt_blame_vec[slice_begin..slice_end].to_vec(),
                )
            };
            let deferred_logs_bloom_hash = if trusted_block.blame() == 0 {
                bloom_blame_vec[slice_begin].clone()
            } else {
                BlockHeaderBuilder::compute_blame_state_root_vec_root(
                    bloom_blame_vec[slice_begin..slice_end].to_vec(),
                )
            };
            // verify `deferred_state_root`, `deferred_receipts_root` and
            // `deferred_logs_bloom_hash`
            if deferred_state_root != *trusted_block.deferred_state_root()
                || deferred_receipts_root
                    != *trusted_block.deferred_receipts_root()
                || deferred_logs_bloom_hash
                    != *trusted_block.deferred_logs_bloom_hash()
            {
                warn!("root mismatch: (state_root, receipts_root, logs_bloom_hash) \
                should be ({:?} {:?} {:?}), get ({:?} {:?} {:?})",
                      trusted_block.deferred_state_root(),
                      trusted_block.deferred_receipts_root(),
                      trusted_block.deferred_logs_bloom_hash(),
                      deferred_state_root,
                      deferred_receipts_root,
                      deferred_logs_bloom_hash,
                );
                return None;
            }
            slice_begin = slice_end;
        }

        let snapshot_epoch_count =
            ctx.manager.graph.data_man.get_snapshot_epoch_count();
        let (parent_snapshot_epoch, pivot_chain_parts) =
            ctx.manager.graph.data_man.get_parent_epochs_for(
                snapshot_epoch_id.clone(),
                snapshot_epoch_count as u64,
            );

        let parent_snapshot_height = if parent_snapshot_epoch == NULL_EPOCH {
            0
        } else {
            ctx.manager
                .graph
                .data_man
                .block_header_by_hash(&parent_snapshot_epoch)
                .unwrap()
                .height()
        };
        let mut snapshot_state_root = state_root_vec[offset].clone();
        let state_root_hash = state_root_vec[offset].compute_state_root_hash();

        // Also build the parent snapshot's info when the snapshot lies
        // beyond the first snapshot period.
        let snapshot_before_stable_checkpoint = if snapshot_block_header
            .height()
            > snapshot_epoch_count as u64
        {
            let (grandparent_snapshot_epoch, grandparent_pivot_chain_parts) =
                ctx.manager.graph.data_man.get_parent_epochs_for(
                    parent_snapshot_epoch.clone(),
                    snapshot_epoch_count as u64,
                );

            let grandparent_snapshot_height =
                if grandparent_snapshot_epoch == NULL_EPOCH {
                    0
                } else {
                    ctx.manager
                        .graph
                        .data_man
                        .block_header_by_hash(&grandparent_snapshot_epoch)
                        .unwrap()
                        .height()
                };
            debug!(
                "grandparent snapshot epoch {:?}, height {}",
                grandparent_snapshot_epoch, grandparent_snapshot_height
            );

            Some(SnapshotInfo {
                // Mark 'InfoOnly' so this entry is not removed immediately.
                snapshot_info_kept_to_provide_sync:
                    SnapshotKeptToProvideSyncStatus::InfoOnly,
                serve_one_step_sync: false,
                merkle_root: state_root_vec[offset - 1].snapshot_root,
                height: snapshot_block_header.height()
                    - snapshot_epoch_count as u64,
                parent_snapshot_epoch_id: grandparent_snapshot_epoch,
                parent_snapshot_height: grandparent_snapshot_height,
                pivot_chain_parts: grandparent_pivot_chain_parts,
            })
        } else {
            None
        };

        Some((
            offset,
            StateRootWithAuxInfo {
                state_root: snapshot_state_root,
                aux_info: StateRootAuxInfo {
                    snapshot_epoch_id: snapshot_epoch_id.clone(),
                    // This field will not be used
                    delta_mpt_key_padding:
                        StorageKeyWithSpace::delta_mpt_padding(
                            &state_root_vec[offset].snapshot_root,
                            &state_root_vec[offset].intermediate_delta_root,
                        ),
                    intermediate_epoch_id: parent_snapshot_epoch,
                    // We don't necessarily need to know because
                    // the execution of the next epoch shifts delta MPT.
                    maybe_intermediate_mpt_key_padding: None,
                    state_root_hash,
                },
            },
            SnapshotInfo {
                snapshot_info_kept_to_provide_sync: Default::default(),
                serve_one_step_sync: false,
                // We need the extra -1 to get a state root that points to the
                // snapshot we want.
                merkle_root: state_root_vec[offset
                    - ctx
                        .manager
                        .graph
                        .data_man
                        .get_snapshot_blame_plus_depth()]
                .snapshot_root,
                height: snapshot_block_header.height(),
                parent_snapshot_epoch_id: parent_snapshot_epoch,
                parent_snapshot_height,
                pivot_chain_parts,
            },
            snapshot_before_stable_checkpoint,
        ))
    }
526
    /// Verifies the peer-provided block receipts against the (already
    /// blame-validated) receipt and bloom vectors.
    ///
    /// Walks the pivot chain backwards from `snapshot_epoch_id` for
    /// `REWARD_EPOCH_COUNT` epochs (just one for genesis), recomputing each
    /// epoch's receipts root and logs bloom hash from `block_receipts`.
    /// Returns `(block_hash, epoch_hash, receipts)` triples on success, or
    /// `None` if any length or hash check fails.
    pub fn validate_epoch_receipts(
        ctx: &Context, blame_vec_offset: usize, snapshot_epoch_id: &EpochId,
        receipt_blame_vec: &Vec<H256>, bloom_blame_vec: &Vec<H256>,
        block_receipts: &Vec<BlockExecutionResult>,
    ) -> Option<Vec<(H256, H256, Arc<BlockReceipts>)>> {
        let mut epoch_hash = snapshot_epoch_id.clone();
        let checkpoint = ctx
            .manager
            .graph
            .data_man
            .block_header_by_hash(snapshot_epoch_id)
            .expect("checkpoint header must exist");
        // Genesis has no reward epochs before it, so only one epoch's
        // receipts are expected.
        let epoch_receipts_count = if checkpoint.height() == 0 {
            1
        } else {
            REWARD_EPOCH_COUNT
        } as usize;
        let mut receipts_vec_offset = 0;
        let mut result = Vec::new();
        for idx in 0..epoch_receipts_count {
            let block_header = ctx
                .manager
                .graph
                .data_man
                .block_header_by_hash(&epoch_hash)
                .expect("block header must exist");
            let ordered_executable_epoch_blocks = ctx
                .manager
                .graph
                .consensus
                .get_block_hashes_by_epoch(EpochNumber::Number(
                    block_header.height(),
                ))
                .expect("ordered executable epoch blocks must exist");
            // Collect this epoch's receipts, one entry per executable block.
            let mut epoch_receipts = Vec::new();
            for i in 0..ordered_executable_epoch_blocks.len() {
                if let Some(block_receipt) =
                    block_receipts.get(receipts_vec_offset + i)
                {
                    epoch_receipts.push(block_receipt.block_receipts.clone());
                } else {
                    // Invalid block_receipts vector length.
                    return None;
                }
            }
            // Recompute the roots and compare with the blame-vector entries.
            let receipt_root = compute_receipts_root(&epoch_receipts);
            let logs_bloom_hash =
                BlockHeaderBuilder::compute_block_logs_bloom_hash(
                    &epoch_receipts,
                );
            if receipt_blame_vec[blame_vec_offset + idx] != receipt_root {
                debug!(
                    "wrong receipt root, expected={:?}, now={:?}",
                    receipt_blame_vec[blame_vec_offset + idx],
                    receipt_root
                );
                return None;
            }
            if bloom_blame_vec[blame_vec_offset + idx] != logs_bloom_hash {
                debug!(
                    "wrong logs bloom hash, expected={:?}, now={:?}",
                    bloom_blame_vec[blame_vec_offset + idx],
                    logs_bloom_hash
                );
                return None;
            }
            for i in 0..ordered_executable_epoch_blocks.len() {
                result.push((
                    ordered_executable_epoch_blocks[i],
                    epoch_hash,
                    epoch_receipts[i].clone(),
                ));
            }
            receipts_vec_offset += ordered_executable_epoch_blocks.len();
            // Move to the previous pivot epoch.
            epoch_hash = *block_header.parent_hash();
        }
        // Reject responses carrying extra, unverified receipts.
        if receipts_vec_offset == block_receipts.len() {
            Some(result)
        } else {
            None
        }
    }
609
610    pub fn on_peer_disconnected(&mut self, peer: &NodeId) {
611        self.active_peers.remove(peer);
612    }
613
614    fn note_failure(&mut self, node_id: &NodeId) {
615        self.active_peers.remove(node_id);
616    }
617}
618
619impl Debug for SnapshotManifestManager {
620    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
621        write!(
622            f,
623            "(request_status = {:?}, candidate={:?} active_peers: {})",
624            self.manifest_request_status,
625            self.snapshot_candidate,
626            self.active_peers.len(),
627        )
628    }
629}
630
/// Tunables for snapshot-manifest syncing.
pub struct SnapshotManifestConfig {
    /// How long to wait for a manifest response before dropping the peer
    /// and retrying with another one (see `check_timeout`).
    pub manifest_request_timeout: Duration,
}