cfx_storage/impls/snapshot_sync/restoration/
full_sync_verifier.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5pub struct FullSyncVerifier<SnapshotDbManager: SnapshotDbManagerTrait> {
6    number_chunks: usize,
7    merkle_root: MerkleHash,
8    chunk_boundaries: Vec<Vec<u8>>,
9    chunk_boundary_proofs: Vec<TrieProof>,
10    chunk_verified: Vec<bool>,
11    number_incomplete_chunk: usize,
12
13    pending_boundary_nodes: HashMap<CompressedPathRaw, SnapshotMptNode>,
14    boundary_subtree_total_size: HashMap<BoundarySubtreeIndex, u64>,
15    chunk_index_by_upper_key: HashMap<Vec<u8>, usize>,
16
17    temp_snapshot_db: SnapshotDbManager::SnapshotDbWrite,
18}
19
20impl<SnapshotDbManager: SnapshotDbManagerTrait>
21    FullSyncVerifier<SnapshotDbManager>
22{
23    pub fn new(
24        number_chunks: usize, chunk_boundaries: Vec<Vec<u8>>,
25        chunk_boundary_proofs: Vec<TrieProof>, merkle_root: MerkleHash,
26        snapshot_db_manager: &SnapshotDbManager, epoch_id: &EpochId,
27        epoch_height: u64,
28    ) -> Result<Self> {
29        if number_chunks != chunk_boundaries.len() + 1 {
30            bail!(Error::InvalidSnapshotSyncProof)
31        }
32        if number_chunks != chunk_boundary_proofs.len() + 1 {
33            bail!(Error::InvalidSnapshotSyncProof)
34        }
35        let mut chunk_index_by_upper_key = HashMap::new();
36        for (chunk_index, (chunk_boundary, proof)) in chunk_boundaries
37            .iter()
38            .zip(chunk_boundary_proofs.iter())
39            .enumerate()
40        {
41            if merkle_root.ne(proof.get_merkle_root()) {
42                bail!(Error::InvalidSnapshotSyncProof)
43            }
44            // We don't want the proof to carry extra nodes.
45            if proof.number_leaf_nodes() != 1 {
46                bail!(Error::InvalidSnapshotSyncProof)
47            }
48            if proof.if_proves_key(&*chunk_boundary)
49                != (true, proof.get_proof_nodes().last())
50            {
51                bail!(Error::InvalidSnapshotSyncProof)
52            }
53            chunk_index_by_upper_key
54                .insert(chunk_boundary.clone(), chunk_index);
55        }
56
57        Ok(Self {
58            number_chunks,
59            merkle_root,
60            chunk_boundaries,
61            chunk_boundary_proofs,
62            chunk_verified: vec![false; number_chunks],
63            number_incomplete_chunk: number_chunks,
64            pending_boundary_nodes: Default::default(),
65            boundary_subtree_total_size: Default::default(),
66            chunk_index_by_upper_key,
67            temp_snapshot_db: snapshot_db_manager
68                .new_temp_snapshot_for_full_sync(
69                    epoch_id,
70                    &merkle_root,
71                    epoch_height,
72                )?,
73        })
74    }
75
76    pub fn is_completed(&self) -> bool { self.number_incomplete_chunk == 0 }
77
78    // FIXME: multi-threading, where &mut can be dropped.
79    pub fn restore_chunk<Key: Borrow<[u8]> + Debug>(
80        &mut self, chunk_upper_key: &Option<Vec<u8>>, keys: &Vec<Key>,
81        values: Vec<Vec<u8>>,
82    ) -> Result<bool> {
83        let chunk_index = match chunk_upper_key {
84            None => self.number_chunks - 1,
85            Some(upper_key) => {
86                match self.chunk_index_by_upper_key.get(upper_key) {
87                    Some(index) => *index,
88                    None => {
89                        warn!("chunk key {:?} does not match boundaries in manifest", upper_key);
90                        return Ok(false);
91                    }
92                }
93            }
94        };
95        // Check key monotone.
96        if !keys.is_empty() {
97            let mut previous = keys.first().unwrap();
98            for key in &keys[1..] {
99                if key.borrow().le(previous.borrow()) {
100                    warn!("chunk key not in order");
101                    return Ok(false);
102                }
103                previous = key;
104            }
105        }
106
107        let key_range_left;
108        let maybe_key_range_right_excl;
109        let maybe_left_proof;
110        let maybe_right_proof;
111        if chunk_index == 0 {
112            key_range_left = vec![];
113            maybe_left_proof = None;
114        } else {
115            key_range_left = self.chunk_boundaries[chunk_index - 1].clone();
116            maybe_left_proof = self.chunk_boundary_proofs.get(chunk_index - 1);
117
118            // Check key boundary.
119            if let Some(first_key) = keys.first() {
120                if first_key.borrow().lt(&*key_range_left) {
121                    warn!(
122                        "first chunk key {:?} less than left range {:?}",
123                        first_key, key_range_left
124                    );
125                    return Ok(false);
126                }
127            }
128        };
129        if chunk_index == self.number_chunks - 1 {
130            maybe_key_range_right_excl = None;
131            maybe_right_proof = None;
132        } else {
133            let key_range_right_excl =
134                self.chunk_boundaries[chunk_index].clone();
135            maybe_right_proof = self.chunk_boundary_proofs.get(chunk_index);
136
137            // Check key boundary.
138            if let Some(last_key) = keys.last() {
139                if last_key.borrow().ge(&*key_range_right_excl) {
140                    warn!(
141                        "last chunk key {:?} larger than left range {:?}",
142                        last_key, key_range_right_excl,
143                    );
144                    return Ok(false);
145                }
146            }
147
148            maybe_key_range_right_excl = Some(key_range_right_excl);
149        }
150
151        // FIXME: multi-threading.
152        // Restore.
153        let chunk_verifier = MptSliceVerifier::new(
154            maybe_left_proof,
155            &*key_range_left,
156            maybe_right_proof,
157            maybe_key_range_right_excl.as_ref().map(|v| &**v),
158            self.merkle_root.clone(),
159        );
160
161        let chunk_rebuilder = chunk_verifier.restore(keys, &values)?;
162        if chunk_rebuilder.is_valid {
163            self.chunk_verified[chunk_index] = true;
164            self.number_incomplete_chunk -= 1;
165
166            self.temp_snapshot_db.start_transaction()?;
167            // Commit key-values.
168            for (key, value) in keys.into_iter().zip(values.into_iter()) {
169                self.temp_snapshot_db.put_kv(key.borrow(), &*value)?;
170            }
171
172            // Commit inner nodes.
173            let mut snapshot_mpt =
174                self.temp_snapshot_db.open_snapshot_mpt_owned()?;
175            for (path, node) in chunk_rebuilder.inner_nodes_to_write {
176                snapshot_mpt.write_node(&path, &node)?;
177            }
178            drop(snapshot_mpt);
179            self.temp_snapshot_db.commit_transaction()?;
180
181            // Combine changes around boundary nodes.
182            for (path, node) in chunk_rebuilder.boundary_nodes {
183                let mut children_table = VanillaChildrenTable::default();
184                unsafe {
185                    for (child_index, merkle_ref) in
186                        node.get_children_table_ref().iter()
187                    {
188                        *children_table.get_child_mut_unchecked(child_index) =
189                            SubtreeMerkleWithSize {
190                                merkle: *merkle_ref,
191                                subtree_size: 0,
192                                delta_subtree_size: 0,
193                            }
194                    }
195                    *children_table.get_children_count_mut() =
196                        node.get_children_count();
197                }
198                self.pending_boundary_nodes.insert(
199                    path,
200                    SnapshotMptNode(VanillaTrieNode::new(
201                        node.get_merkle().clone(),
202                        children_table,
203                        node.value_as_slice()
204                            .into_option()
205                            .map(|ref_v| ref_v.into()),
206                        node.compressed_path_ref().into(),
207                    )),
208                );
209            }
210            for (subtree_index, subtree_size) in
211                chunk_rebuilder.boundary_subtree_total_size
212            {
213                *self
214                    .boundary_subtree_total_size
215                    .entry(subtree_index)
216                    .or_default() += subtree_size;
217            }
218        }
219
220        if self.is_completed() {
221            self.finalize()?
222        }
223
224        Ok(chunk_rebuilder.is_valid)
225    }
226
227    // FIXME: multi-threading
228    /// Combine and write boundary subtree nodes after all chunks have been
229    /// completed.
230    pub fn finalize(&mut self) -> Result<()> {
231        self.temp_snapshot_db.start_transaction()?;
232        let mut snapshot_mpt =
233            self.temp_snapshot_db.open_snapshot_mpt_owned()?;
234
235        for (path, mut node) in self.pending_boundary_nodes.drain() {
236            let mut subtree_index = BoundarySubtreeIndex {
237                parent_node: node.get_merkle().clone(),
238                child_index: 0,
239            };
240            for child_index in 0..CHILDREN_COUNT as u8 {
241                subtree_index.child_index = child_index;
242                if let Some(subtree_size) =
243                    self.boundary_subtree_total_size.get(&subtree_index)
244                {
245                    // Actually safe.
246                    unsafe {
247                        node.get_child_mut_unchecked(child_index)
248                            .subtree_size = *subtree_size;
249                    }
250                }
251            }
252
253            snapshot_mpt.write_node(&path, &node)?;
254        }
255
256        drop(snapshot_mpt);
257        self.temp_snapshot_db.commit_transaction()?;
258        Ok(())
259    }
260}
261
262use crate::{
263    impls::{
264        errors::*,
265        merkle_patricia_trie::{
266            trie_node::TrieNodeTrait, CompressedPathRaw, VanillaChildrenTable,
267            VanillaTrieNode, CHILDREN_COUNT,
268        },
269        snapshot_sync::restoration::mpt_slice_verifier::{
270            BoundarySubtreeIndex, MptSliceVerifier,
271        },
272    },
273    storage_db::{
274        SnapshotDbManagerTrait, SnapshotDbWriteableTrait, SnapshotMptNode,
275        SnapshotMptTraitRw, SubtreeMerkleWithSize,
276    },
277    TrieProof,
278};
279use primitives::{EpochId, MerkleHash};
280use std::{borrow::Borrow, collections::HashMap, fmt::Debug};