pub struct FullSyncVerifier<SnapshotDbManager: SnapshotDbManagerTrait> {
number_chunks: usize,
merkle_root: MerkleHash,
chunk_boundaries: Vec<Vec<u8>>,
chunk_boundary_proofs: Vec<TrieProof>,
chunk_verified: Vec<bool>,
number_incomplete_chunk: usize,
pending_boundary_nodes: HashMap<CompressedPathRaw, SnapshotMptNode>,
boundary_subtree_total_size: HashMap<BoundarySubtreeIndex, u64>,
chunk_index_by_upper_key: HashMap<Vec<u8>, usize>,
temp_snapshot_db: SnapshotDbManager::SnapshotDbWrite,
}
impl<SnapshotDbManager: SnapshotDbManagerTrait>
FullSyncVerifier<SnapshotDbManager>
{
pub fn new(
number_chunks: usize, chunk_boundaries: Vec<Vec<u8>>,
chunk_boundary_proofs: Vec<TrieProof>, merkle_root: MerkleHash,
snapshot_db_manager: &SnapshotDbManager, epoch_id: &EpochId,
epoch_height: u64,
) -> Result<Self> {
if number_chunks != chunk_boundaries.len() + 1 {
bail!(Error::InvalidSnapshotSyncProof)
}
if number_chunks != chunk_boundary_proofs.len() + 1 {
bail!(Error::InvalidSnapshotSyncProof)
}
let mut chunk_index_by_upper_key = HashMap::new();
for (chunk_index, (chunk_boundary, proof)) in chunk_boundaries
.iter()
.zip(chunk_boundary_proofs.iter())
.enumerate()
{
if merkle_root.ne(proof.get_merkle_root()) {
bail!(Error::InvalidSnapshotSyncProof)
}
if proof.number_leaf_nodes() != 1 {
bail!(Error::InvalidSnapshotSyncProof)
}
if proof.if_proves_key(&*chunk_boundary)
!= (true, proof.get_proof_nodes().last())
{
bail!(Error::InvalidSnapshotSyncProof)
}
chunk_index_by_upper_key
.insert(chunk_boundary.clone(), chunk_index);
}
Ok(Self {
number_chunks,
merkle_root,
chunk_boundaries,
chunk_boundary_proofs,
chunk_verified: vec![false; number_chunks],
number_incomplete_chunk: number_chunks,
pending_boundary_nodes: Default::default(),
boundary_subtree_total_size: Default::default(),
chunk_index_by_upper_key,
temp_snapshot_db: snapshot_db_manager
.new_temp_snapshot_for_full_sync(
epoch_id,
&merkle_root,
epoch_height,
)?,
})
}
pub fn is_completed(&self) -> bool { self.number_incomplete_chunk == 0 }
pub fn restore_chunk<Key: Borrow<[u8]> + Debug>(
&mut self, chunk_upper_key: &Option<Vec<u8>>, keys: &Vec<Key>,
values: Vec<Vec<u8>>,
) -> Result<bool> {
let chunk_index = match chunk_upper_key {
None => self.number_chunks - 1,
Some(upper_key) => {
match self.chunk_index_by_upper_key.get(upper_key) {
Some(index) => *index,
None => {
warn!("chunk key {:?} does not match boundaries in manifest", upper_key);
return Ok(false);
}
}
}
};
if !keys.is_empty() {
let mut previous = keys.first().unwrap();
for key in &keys[1..] {
if key.borrow().le(previous.borrow()) {
warn!("chunk key not in order");
return Ok(false);
}
previous = key;
}
}
let key_range_left;
let maybe_key_range_right_excl;
let maybe_left_proof;
let maybe_right_proof;
if chunk_index == 0 {
key_range_left = vec![];
maybe_left_proof = None;
} else {
key_range_left = self.chunk_boundaries[chunk_index - 1].clone();
maybe_left_proof = self.chunk_boundary_proofs.get(chunk_index - 1);
if let Some(first_key) = keys.first() {
if first_key.borrow().lt(&*key_range_left) {
warn!(
"first chunk key {:?} less than left range {:?}",
first_key, key_range_left
);
return Ok(false);
}
}
};
if chunk_index == self.number_chunks - 1 {
maybe_key_range_right_excl = None;
maybe_right_proof = None;
} else {
let key_range_right_excl =
self.chunk_boundaries[chunk_index].clone();
maybe_right_proof = self.chunk_boundary_proofs.get(chunk_index);
if let Some(last_key) = keys.last() {
if last_key.borrow().ge(&*key_range_right_excl) {
warn!(
"last chunk key {:?} larger than left range {:?}",
last_key, key_range_right_excl,
);
return Ok(false);
}
}
maybe_key_range_right_excl = Some(key_range_right_excl);
}
let chunk_verifier = MptSliceVerifier::new(
maybe_left_proof,
&*key_range_left,
maybe_right_proof,
maybe_key_range_right_excl.as_ref().map(|v| &**v),
self.merkle_root.clone(),
);
let chunk_rebuilder = chunk_verifier.restore(keys, &values)?;
if chunk_rebuilder.is_valid {
self.chunk_verified[chunk_index] = true;
self.number_incomplete_chunk -= 1;
self.temp_snapshot_db.start_transaction()?;
for (key, value) in keys.into_iter().zip(values.into_iter()) {
self.temp_snapshot_db.put_kv(key.borrow(), &*value)?;
}
let mut snapshot_mpt =
self.temp_snapshot_db.open_snapshot_mpt_owned()?;
for (path, node) in chunk_rebuilder.inner_nodes_to_write {
snapshot_mpt.write_node(&path, &node)?;
}
drop(snapshot_mpt);
self.temp_snapshot_db.commit_transaction()?;
for (path, node) in chunk_rebuilder.boundary_nodes {
let mut children_table = VanillaChildrenTable::default();
unsafe {
for (child_index, merkle_ref) in
node.get_children_table_ref().iter()
{
*children_table.get_child_mut_unchecked(child_index) =
SubtreeMerkleWithSize {
merkle: *merkle_ref,
subtree_size: 0,
delta_subtree_size: 0,
}
}
*children_table.get_children_count_mut() =
node.get_children_count();
}
self.pending_boundary_nodes.insert(
path,
SnapshotMptNode(VanillaTrieNode::new(
node.get_merkle().clone(),
children_table,
node.value_as_slice()
.into_option()
.map(|ref_v| ref_v.into()),
node.compressed_path_ref().into(),
)),
);
}
for (subtree_index, subtree_size) in
chunk_rebuilder.boundary_subtree_total_size
{
*self
.boundary_subtree_total_size
.entry(subtree_index)
.or_default() += subtree_size;
}
}
if self.is_completed() {
self.finalize()?
}
Ok(chunk_rebuilder.is_valid)
}
pub fn finalize(&mut self) -> Result<()> {
self.temp_snapshot_db.start_transaction()?;
let mut snapshot_mpt =
self.temp_snapshot_db.open_snapshot_mpt_owned()?;
for (path, mut node) in self.pending_boundary_nodes.drain() {
let mut subtree_index = BoundarySubtreeIndex {
parent_node: node.get_merkle().clone(),
child_index: 0,
};
for child_index in 0..CHILDREN_COUNT as u8 {
subtree_index.child_index = child_index;
if let Some(subtree_size) =
self.boundary_subtree_total_size.get(&subtree_index)
{
unsafe {
node.get_child_mut_unchecked(child_index)
.subtree_size = *subtree_size;
}
}
}
snapshot_mpt.write_node(&path, &node)?;
}
drop(snapshot_mpt);
self.temp_snapshot_db.commit_transaction()?;
Ok(())
}
}
use crate::{
impls::{
errors::*,
merkle_patricia_trie::{
trie_node::TrieNodeTrait, CompressedPathRaw, VanillaChildrenTable,
VanillaTrieNode, CHILDREN_COUNT,
},
snapshot_sync::restoration::mpt_slice_verifier::{
BoundarySubtreeIndex, MptSliceVerifier,
},
},
storage_db::{
SnapshotDbManagerTrait, SnapshotDbWriteableTrait, SnapshotMptNode,
SnapshotMptTraitRw, SubtreeMerkleWithSize,
},
TrieProof,
};
use primitives::{EpochId, MerkleHash};
use std::{borrow::Borrow, collections::HashMap, fmt::Debug};