cfx_storage/impls/snapshot_sync/restoration/
full_sync_verifier.rspub struct FullSyncVerifier<SnapshotDbManager: SnapshotDbManagerTrait> {
    number_chunks: usize,
    merkle_root: MerkleHash,
    chunk_boundaries: Vec<Vec<u8>>,
    chunk_boundary_proofs: Vec<TrieProof>,
    chunk_verified: Vec<bool>,
    number_incomplete_chunk: usize,
    pending_boundary_nodes: HashMap<CompressedPathRaw, SnapshotMptNode>,
    boundary_subtree_total_size: HashMap<BoundarySubtreeIndex, u64>,
    chunk_index_by_upper_key: HashMap<Vec<u8>, usize>,
    temp_snapshot_db: SnapshotDbManager::SnapshotDbWrite,
}
impl<SnapshotDbManager: SnapshotDbManagerTrait>
    FullSyncVerifier<SnapshotDbManager>
{
    pub fn new(
        number_chunks: usize, chunk_boundaries: Vec<Vec<u8>>,
        chunk_boundary_proofs: Vec<TrieProof>, merkle_root: MerkleHash,
        snapshot_db_manager: &SnapshotDbManager, epoch_id: &EpochId,
        epoch_height: u64,
    ) -> Result<Self> {
        if number_chunks != chunk_boundaries.len() + 1 {
            bail!(Error::InvalidSnapshotSyncProof)
        }
        if number_chunks != chunk_boundary_proofs.len() + 1 {
            bail!(Error::InvalidSnapshotSyncProof)
        }
        let mut chunk_index_by_upper_key = HashMap::new();
        for (chunk_index, (chunk_boundary, proof)) in chunk_boundaries
            .iter()
            .zip(chunk_boundary_proofs.iter())
            .enumerate()
        {
            if merkle_root.ne(proof.get_merkle_root()) {
                bail!(Error::InvalidSnapshotSyncProof)
            }
            if proof.number_leaf_nodes() != 1 {
                bail!(Error::InvalidSnapshotSyncProof)
            }
            if proof.if_proves_key(&*chunk_boundary)
                != (true, proof.get_proof_nodes().last())
            {
                bail!(Error::InvalidSnapshotSyncProof)
            }
            chunk_index_by_upper_key
                .insert(chunk_boundary.clone(), chunk_index);
        }
        Ok(Self {
            number_chunks,
            merkle_root,
            chunk_boundaries,
            chunk_boundary_proofs,
            chunk_verified: vec![false; number_chunks],
            number_incomplete_chunk: number_chunks,
            pending_boundary_nodes: Default::default(),
            boundary_subtree_total_size: Default::default(),
            chunk_index_by_upper_key,
            temp_snapshot_db: snapshot_db_manager
                .new_temp_snapshot_for_full_sync(
                    epoch_id,
                    &merkle_root,
                    epoch_height,
                )?,
        })
    }
    pub fn is_completed(&self) -> bool { self.number_incomplete_chunk == 0 }
    pub fn restore_chunk<Key: Borrow<[u8]> + Debug>(
        &mut self, chunk_upper_key: &Option<Vec<u8>>, keys: &Vec<Key>,
        values: Vec<Vec<u8>>,
    ) -> Result<bool> {
        let chunk_index = match chunk_upper_key {
            None => self.number_chunks - 1,
            Some(upper_key) => {
                match self.chunk_index_by_upper_key.get(upper_key) {
                    Some(index) => *index,
                    None => {
                        warn!("chunk key {:?} does not match boundaries in manifest", upper_key);
                        return Ok(false);
                    }
                }
            }
        };
        if !keys.is_empty() {
            let mut previous = keys.first().unwrap();
            for key in &keys[1..] {
                if key.borrow().le(previous.borrow()) {
                    warn!("chunk key not in order");
                    return Ok(false);
                }
                previous = key;
            }
        }
        let key_range_left;
        let maybe_key_range_right_excl;
        let maybe_left_proof;
        let maybe_right_proof;
        if chunk_index == 0 {
            key_range_left = vec![];
            maybe_left_proof = None;
        } else {
            key_range_left = self.chunk_boundaries[chunk_index - 1].clone();
            maybe_left_proof = self.chunk_boundary_proofs.get(chunk_index - 1);
            if let Some(first_key) = keys.first() {
                if first_key.borrow().lt(&*key_range_left) {
                    warn!(
                        "first chunk key {:?} less than left range {:?}",
                        first_key, key_range_left
                    );
                    return Ok(false);
                }
            }
        };
        if chunk_index == self.number_chunks - 1 {
            maybe_key_range_right_excl = None;
            maybe_right_proof = None;
        } else {
            let key_range_right_excl =
                self.chunk_boundaries[chunk_index].clone();
            maybe_right_proof = self.chunk_boundary_proofs.get(chunk_index);
            if let Some(last_key) = keys.last() {
                if last_key.borrow().ge(&*key_range_right_excl) {
                    warn!(
                        "last chunk key {:?} larger than left range {:?}",
                        last_key, key_range_right_excl,
                    );
                    return Ok(false);
                }
            }
            maybe_key_range_right_excl = Some(key_range_right_excl);
        }
        let chunk_verifier = MptSliceVerifier::new(
            maybe_left_proof,
            &*key_range_left,
            maybe_right_proof,
            maybe_key_range_right_excl.as_ref().map(|v| &**v),
            self.merkle_root.clone(),
        );
        let chunk_rebuilder = chunk_verifier.restore(keys, &values)?;
        if chunk_rebuilder.is_valid {
            self.chunk_verified[chunk_index] = true;
            self.number_incomplete_chunk -= 1;
            self.temp_snapshot_db.start_transaction()?;
            for (key, value) in keys.into_iter().zip(values.into_iter()) {
                self.temp_snapshot_db.put_kv(key.borrow(), &*value)?;
            }
            let mut snapshot_mpt =
                self.temp_snapshot_db.open_snapshot_mpt_owned()?;
            for (path, node) in chunk_rebuilder.inner_nodes_to_write {
                snapshot_mpt.write_node(&path, &node)?;
            }
            drop(snapshot_mpt);
            self.temp_snapshot_db.commit_transaction()?;
            for (path, node) in chunk_rebuilder.boundary_nodes {
                let mut children_table = VanillaChildrenTable::default();
                unsafe {
                    for (child_index, merkle_ref) in
                        node.get_children_table_ref().iter()
                    {
                        *children_table.get_child_mut_unchecked(child_index) =
                            SubtreeMerkleWithSize {
                                merkle: *merkle_ref,
                                subtree_size: 0,
                                delta_subtree_size: 0,
                            }
                    }
                    *children_table.get_children_count_mut() =
                        node.get_children_count();
                }
                self.pending_boundary_nodes.insert(
                    path,
                    SnapshotMptNode(VanillaTrieNode::new(
                        node.get_merkle().clone(),
                        children_table,
                        node.value_as_slice()
                            .into_option()
                            .map(|ref_v| ref_v.into()),
                        node.compressed_path_ref().into(),
                    )),
                );
            }
            for (subtree_index, subtree_size) in
                chunk_rebuilder.boundary_subtree_total_size
            {
                *self
                    .boundary_subtree_total_size
                    .entry(subtree_index)
                    .or_default() += subtree_size;
            }
        }
        if self.is_completed() {
            self.finalize()?
        }
        Ok(chunk_rebuilder.is_valid)
    }
    pub fn finalize(&mut self) -> Result<()> {
        self.temp_snapshot_db.start_transaction()?;
        let mut snapshot_mpt =
            self.temp_snapshot_db.open_snapshot_mpt_owned()?;
        for (path, mut node) in self.pending_boundary_nodes.drain() {
            let mut subtree_index = BoundarySubtreeIndex {
                parent_node: node.get_merkle().clone(),
                child_index: 0,
            };
            for child_index in 0..CHILDREN_COUNT as u8 {
                subtree_index.child_index = child_index;
                if let Some(subtree_size) =
                    self.boundary_subtree_total_size.get(&subtree_index)
                {
                    unsafe {
                        node.get_child_mut_unchecked(child_index)
                            .subtree_size = *subtree_size;
                    }
                }
            }
            snapshot_mpt.write_node(&path, &node)?;
        }
        drop(snapshot_mpt);
        self.temp_snapshot_db.commit_transaction()?;
        Ok(())
    }
}
use crate::{
    impls::{
        errors::*,
        merkle_patricia_trie::{
            trie_node::TrieNodeTrait, CompressedPathRaw, VanillaChildrenTable,
            VanillaTrieNode, CHILDREN_COUNT,
        },
        snapshot_sync::restoration::mpt_slice_verifier::{
            BoundarySubtreeIndex, MptSliceVerifier,
        },
    },
    storage_db::{
        SnapshotDbManagerTrait, SnapshotDbWriteableTrait, SnapshotMptNode,
        SnapshotMptTraitRw, SubtreeMerkleWithSize,
    },
    TrieProof,
};
use primitives::{EpochId, MerkleHash};
use std::{borrow::Borrow, collections::HashMap, fmt::Debug};