// cfx_storage/impls/snapshot_sync/restoration/full_sync_verifier.rs
/// Verifier for a snapshot downloaded in full, chunk by chunk.
///
/// Construction checks the manifest-level information (chunk boundary keys
/// and their trie proofs against `merkle_root`); each received chunk is then
/// verified independently and its key/values written into a temporary
/// snapshot database, which is finalized once every chunk has verified.
pub struct FullSyncVerifier<SnapshotDbManager: SnapshotDbManagerTrait> {
    // Total number of chunks; must equal chunk_boundaries.len() + 1.
    number_chunks: usize,
    // Snapshot merkle root that every boundary proof must commit to.
    merkle_root: MerkleHash,
    // Upper boundary keys separating consecutive chunks (from the manifest).
    chunk_boundaries: Vec<Vec<u8>>,
    // One trie proof per boundary key, proving the key under merkle_root.
    chunk_boundary_proofs: Vec<TrieProof>,
    // Per-chunk flag, set when the chunk's slice proof verifies.
    chunk_verified: Vec<bool>,
    // Number of chunks not yet verified; 0 means the sync is complete.
    number_incomplete_chunk: usize,

    // Trie nodes on chunk boundaries, kept by path until finalize() can fill
    // in their children's subtree sizes.
    pending_boundary_nodes: HashMap<CompressedPathRaw, SnapshotMptNode>,
    // Subtree sizes accumulated across chunks for boundary-node children.
    boundary_subtree_total_size: HashMap<BoundarySubtreeIndex, u64>,
    // Maps a chunk's upper boundary key back to its chunk index.
    chunk_index_by_upper_key: HashMap<Vec<u8>, usize>,

    // Temporary snapshot database receiving verified chunk data.
    temp_snapshot_db: SnapshotDbManager::SnapshotDbWrite,
}
19
impl<SnapshotDbManager: SnapshotDbManagerTrait>
    FullSyncVerifier<SnapshotDbManager>
{
    /// Validates the manifest's chunk boundaries and proofs, and opens a
    /// temporary snapshot db for the full sync.
    ///
    /// # Errors
    /// `Error::InvalidSnapshotSyncProof` when the boundary or proof count
    /// does not equal `number_chunks - 1`, when a proof's merkle root
    /// differs from `merkle_root`, when a proof does not contain exactly
    /// one leaf node, or when a boundary key is not proven by the last
    /// node of its proof. Also propagates errors from creating the
    /// temporary snapshot db.
    pub fn new(
        number_chunks: usize, chunk_boundaries: Vec<Vec<u8>>,
        chunk_boundary_proofs: Vec<TrieProof>, merkle_root: MerkleHash,
        snapshot_db_manager: &SnapshotDbManager, epoch_id: &EpochId,
        epoch_height: u64,
    ) -> Result<Self> {
        // n chunks are separated by n - 1 boundary keys, each carrying
        // exactly one proof.
        if number_chunks != chunk_boundaries.len() + 1 {
            bail!(Error::InvalidSnapshotSyncProof)
        }
        if number_chunks != chunk_boundary_proofs.len() + 1 {
            bail!(Error::InvalidSnapshotSyncProof)
        }
        let mut chunk_index_by_upper_key = HashMap::new();
        for (chunk_index, (chunk_boundary, proof)) in chunk_boundaries
            .iter()
            .zip(chunk_boundary_proofs.iter())
            .enumerate()
        {
            // Every boundary proof must commit to the snapshot's root.
            if merkle_root.ne(proof.get_merkle_root()) {
                bail!(Error::InvalidSnapshotSyncProof)
            }
            // A boundary proof is a single key-existence proof.
            if proof.number_leaf_nodes() != 1 {
                bail!(Error::InvalidSnapshotSyncProof)
            }
            // The boundary key must be proven, and by the proof's last
            // node specifically.
            if proof.if_proves_key(&*chunk_boundary)
                != (true, proof.get_proof_nodes().last())
            {
                bail!(Error::InvalidSnapshotSyncProof)
            }
            chunk_index_by_upper_key
                .insert(chunk_boundary.clone(), chunk_index);
        }

        Ok(Self {
            number_chunks,
            merkle_root,
            chunk_boundaries,
            chunk_boundary_proofs,
            chunk_verified: vec![false; number_chunks],
            number_incomplete_chunk: number_chunks,
            pending_boundary_nodes: Default::default(),
            boundary_subtree_total_size: Default::default(),
            chunk_index_by_upper_key,
            temp_snapshot_db: snapshot_db_manager
                .new_temp_snapshot_for_full_sync(
                    epoch_id,
                    &merkle_root,
                    epoch_height,
                )?,
        })
    }

    /// True when every chunk has been verified.
    pub fn is_completed(&self) -> bool { self.number_incomplete_chunk == 0 }

    /// Verifies one chunk and, when it proves out, writes its key/values
    /// and rebuilt trie nodes into the temporary snapshot db. When this
    /// was the last missing chunk, `finalize` is invoked.
    ///
    /// `chunk_upper_key` of `None` designates the last chunk; otherwise it
    /// must match one of the manifest's boundary keys.
    ///
    /// Returns `Ok(false)` for a chunk that fails validation (unknown
    /// upper key, keys out of order or outside the chunk's key range, or
    /// slice proof mismatch); propagates db errors.
    ///
    /// NOTE(review): a valid chunk delivered twice would decrement
    /// `number_incomplete_chunk` twice — presumably the caller requests
    /// each chunk exactly once; verify.
    pub fn restore_chunk<Key: Borrow<[u8]> + Debug>(
        &mut self, chunk_upper_key: &Option<Vec<u8>>, keys: &Vec<Key>,
        values: Vec<Vec<u8>>,
    ) -> Result<bool> {
        // Resolve which chunk this is from its upper boundary key.
        let chunk_index = match chunk_upper_key {
            None => self.number_chunks - 1,
            Some(upper_key) => {
                match self.chunk_index_by_upper_key.get(upper_key) {
                    Some(index) => *index,
                    None => {
                        warn!("chunk key {:?} does not match boundaries in manifest", upper_key);
                        return Ok(false);
                    }
                }
            }
        };
        // Keys within a chunk must be strictly increasing.
        if !keys.is_empty() {
            let mut previous = keys.first().unwrap();
            for key in &keys[1..] {
                if key.borrow().le(previous.borrow()) {
                    warn!("chunk key not in order");
                    return Ok(false);
                }
                previous = key;
            }
        }

        // Determine the chunk's key range [left, right) and the boundary
        // proofs that delimit it. The first chunk has no left bound; the
        // last chunk has no right bound.
        let key_range_left;
        let maybe_key_range_right_excl;
        let maybe_left_proof;
        let maybe_right_proof;
        if chunk_index == 0 {
            key_range_left = vec![];
            maybe_left_proof = None;
        } else {
            key_range_left = self.chunk_boundaries[chunk_index - 1].clone();
            maybe_left_proof = self.chunk_boundary_proofs.get(chunk_index - 1);

            // Left bound is inclusive: first key may equal it.
            if let Some(first_key) = keys.first() {
                if first_key.borrow().lt(&*key_range_left) {
                    warn!(
                        "first chunk key {:?} less than left range {:?}",
                        first_key, key_range_left
                    );
                    return Ok(false);
                }
            }
        };
        if chunk_index == self.number_chunks - 1 {
            maybe_key_range_right_excl = None;
            maybe_right_proof = None;
        } else {
            let key_range_right_excl =
                self.chunk_boundaries[chunk_index].clone();
            maybe_right_proof = self.chunk_boundary_proofs.get(chunk_index);

            // Right bound is exclusive: last key must be strictly below it.
            // NOTE(review): the message says "left range" but this checks
            // the right (exclusive) boundary.
            if let Some(last_key) = keys.last() {
                if last_key.borrow().ge(&*key_range_right_excl) {
                    warn!(
                        "last chunk key {:?} larger than left range {:?}",
                        last_key, key_range_right_excl,
                    );
                    return Ok(false);
                }
            }

            maybe_key_range_right_excl = Some(key_range_right_excl);
        }

        // Verify the chunk as a trie slice against the snapshot root.
        let chunk_verifier = MptSliceVerifier::new(
            maybe_left_proof,
            &*key_range_left,
            maybe_right_proof,
            maybe_key_range_right_excl.as_ref().map(|v| &**v),
            self.merkle_root.clone(),
        );

        let chunk_rebuilder = chunk_verifier.restore(keys, &values)?;
        if chunk_rebuilder.is_valid {
            self.chunk_verified[chunk_index] = true;
            self.number_incomplete_chunk -= 1;

            // Persist the chunk's key/values and fully-rebuilt inner trie
            // nodes inside one transaction.
            self.temp_snapshot_db.start_transaction()?;
            for (key, value) in keys.into_iter().zip(values.into_iter()) {
                self.temp_snapshot_db.put_kv(key.borrow(), &*value)?;
            }

            let mut snapshot_mpt =
                self.temp_snapshot_db.open_snapshot_mpt_owned()?;
            for (path, node) in chunk_rebuilder.inner_nodes_to_write {
                snapshot_mpt.write_node(&path, &node)?;
            }
            drop(snapshot_mpt);
            self.temp_snapshot_db.commit_transaction()?;

            // Boundary nodes are shared between chunks: convert each into
            // a SnapshotMptNode with zeroed subtree sizes and defer the
            // write until finalize(), when the sizes can be summed across
            // all chunks.
            for (path, node) in chunk_rebuilder.boundary_nodes {
                let mut children_table = VanillaChildrenTable::default();
                // SAFETY-relevant: child_index values come from the source
                // node's own children table, so the unchecked child slots
                // are in range; the children count is copied verbatim.
                unsafe {
                    for (child_index, merkle_ref) in
                        node.get_children_table_ref().iter()
                    {
                        *children_table.get_child_mut_unchecked(child_index) =
                            SubtreeMerkleWithSize {
                                merkle: *merkle_ref,
                                // Sizes unknown until all chunks are seen.
                                subtree_size: 0,
                                delta_subtree_size: 0,
                            }
                    }
                    *children_table.get_children_count_mut() =
                        node.get_children_count();
                }
                self.pending_boundary_nodes.insert(
                    path,
                    SnapshotMptNode(VanillaTrieNode::new(
                        node.get_merkle().clone(),
                        children_table,
                        node.value_as_slice()
                            .into_option()
                            .map(|ref_v| ref_v.into()),
                        node.compressed_path_ref().into(),
                    )),
                );
            }
            // Accumulate this chunk's contribution to each boundary
            // subtree's total size.
            for (subtree_index, subtree_size) in
                chunk_rebuilder.boundary_subtree_total_size
            {
                *self
                    .boundary_subtree_total_size
                    .entry(subtree_index)
                    .or_default() += subtree_size;
            }
        }

        if self.is_completed() {
            self.finalize()?
        }

        Ok(chunk_rebuilder.is_valid)
    }

    /// Writes out the deferred boundary nodes once all chunks verified,
    /// filling each child's subtree_size from the totals accumulated
    /// across chunks, then commits the transaction.
    pub fn finalize(&mut self) -> Result<()> {
        self.temp_snapshot_db.start_transaction()?;
        let mut snapshot_mpt =
            self.temp_snapshot_db.open_snapshot_mpt_owned()?;

        for (path, mut node) in self.pending_boundary_nodes.drain() {
            // Look up accumulated sizes keyed by (parent merkle, child
            // index); children with no recorded size keep 0.
            let mut subtree_index = BoundarySubtreeIndex {
                parent_node: node.get_merkle().clone(),
                child_index: 0,
            };
            for child_index in 0..CHILDREN_COUNT as u8 {
                subtree_index.child_index = child_index;
                if let Some(subtree_size) =
                    self.boundary_subtree_total_size.get(&subtree_index)
                {
                    // SAFETY-relevant: child_index < CHILDREN_COUNT by the
                    // loop bound.
                    unsafe {
                        node.get_child_mut_unchecked(child_index)
                            .subtree_size = *subtree_size;
                    }
                }
            }

            snapshot_mpt.write_node(&path, &node)?;
        }

        drop(snapshot_mpt);
        self.temp_snapshot_db.commit_transaction()?;
        Ok(())
    }
}
261
262use crate::{
263 impls::{
264 errors::*,
265 merkle_patricia_trie::{
266 trie_node::TrieNodeTrait, CompressedPathRaw, VanillaChildrenTable,
267 VanillaTrieNode, CHILDREN_COUNT,
268 },
269 snapshot_sync::restoration::mpt_slice_verifier::{
270 BoundarySubtreeIndex, MptSliceVerifier,
271 },
272 },
273 storage_db::{
274 SnapshotDbManagerTrait, SnapshotDbWriteableTrait, SnapshotMptNode,
275 SnapshotMptTraitRw, SubtreeMerkleWithSize,
276 },
277 TrieProof,
278};
279use primitives::{EpochId, MerkleHash};
280use std::{borrow::Borrow, collections::HashMap, fmt::Debug};