1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
// Copyright 2019 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
/// Merges an MPT with a sorted deletion stream and a sorted insertion stream.
/// The merge can be done in place or by writing to a new MPT (save-as mode).
///
/// During the merge, the merger keeps a path from the root to a node. When
/// the next key to process from the insertion or deletion stream lies outside
/// the subtree of the last node in the path, that last node is closed. New
/// nodes are opened when the key goes further down, extending the path. In
/// in-place mode, node deletions are executed and node writes happen when a
/// modified node is closed. In save-as mode, node deletion is a no-op, and
/// node writes happen both when a node is closed and when a new node is
/// opened. All modified nodes and skipped subtrees are saved into the new
/// MPT.
///
/// In a save-as copy, the base_mpt could be implemented as an iterator over
/// the original MPT, because the original MPT is visited exactly in string
/// order of path_db_key. (See the definition of path_db_key.)
// TODO(yz): In future merge can be made into multiple threads easily by merging
// different children in parallel then combine the root node.
pub struct MptMerger<'a> {
    // Cursor that walks and rewrites the trie. The MPTs it reads from and
    // writes to are supplied by `MergeMptsInRequest`.
    rw_cursor: MptCursorRw<
        MergeMptsInRequest<'a>,
        ReadWritePathNode<MergeMptsInRequest<'a>>,
    >,
}
impl<'a> MptMerger<'a> {
    /// Creates a merger over the given MPTs.
    ///
    /// When `maybe_readonly_mpt` is `Some`, the merge runs in save-as mode.
    /// The base trie is read from `maybe_readonly_mpt`, and all writes go to
    /// `out_mpt`. When it is `None`, `out_mpt` is both read and updated in
    /// place.
    pub fn new(
        maybe_readonly_mpt: Option<&'a mut dyn SnapshotMptTraitReadAndIterate>,
        out_mpt: &'a mut dyn SnapshotMptTraitRw,
    ) -> Self {
        Self {
            rw_cursor: MptCursorRw::new(MergeMptsInRequest {
                maybe_readonly_mpt,
                out_mpt,
            }),
        }
    }

    /// Applies one combined key-value stream to the trie.
    ///
    /// Pairs are applied in the order `inserter` yields them. A non-empty
    /// value inserts the key; an empty value deletes it. Keys are expected in
    /// ascending order, matching the sorted streams this merger walks.
    ///
    /// Returns the `MerkleHash` reported by the cursor when the merge
    /// finishes.
    ///
    /// # Errors
    /// Propagates any error from loading the root, iterating `inserter`, or
    /// updating or finishing the cursor.
    pub fn merge(
        &mut self, inserter: &DumpedMptKvIterator,
    ) -> Result<MerkleHash> {
        self.rw_cursor.load_root(false)?;

        // Adapter that forwards every pushed pair to the merger's cursor.
        struct Merger<'x, 'a: 'x> {
            merger: &'x mut MptMerger<'a>,
        }

        impl<'x, 'a: 'x> KVInserter<MptKeyValue> for Merger<'x, 'a> {
            fn push(&mut self, v: MptKeyValue) -> Result<()> {
                let (key, value) = v;
                let cursor = &mut self.merger.rw_cursor;
                // An empty value marks the key for deletion.
                if value.is_empty() {
                    cursor.delete(&key)?;
                } else {
                    cursor.insert(&key, value)?;
                }
                Ok(())
            }
        }

        inserter.iterate(&mut Merger { merger: self })?;
        self.rw_cursor.finish()
    }

    /// Merges a deletion stream and an insertion stream into the trie.
    ///
    /// Both iterators must yield keys in ascending order. They are
    /// interleaved so the cursor sees one globally sorted sequence of
    /// operations. Once either stream is exhausted, the rest of the other
    /// stream is applied as-is.
    ///
    /// `in_reconstruct_snapshot_state` is forwarded to the cursor's
    /// `load_root`.
    ///
    /// Returns the `MerkleHash` reported by the cursor when the merge
    /// finishes.
    ///
    /// # Errors
    /// Propagates any error from either iterator or from the cursor.
    pub fn merge_insertion_deletion_separated<'k>(
        &mut self,
        mut delete_keys_iter: impl FallibleIterator<
            Item = (Vec<u8>, ()),
            Error = Error,
        >,
        mut set_keys_iter: impl FallibleIterator<Item = MptKeyValue, Error = Error>,
        in_reconstruct_snapshot_state: bool,
    ) -> Result<MerkleHash> {
        self.rw_cursor.load_root(in_reconstruct_snapshot_state)?;

        // Current head of each stream; `None` means that stream is exhausted.
        let mut next_delete = delete_keys_iter.next()?;
        let mut next_set = set_keys_iter.next()?;
        loop {
            match (next_delete.take(), next_set.take()) {
                (None, None) => break,
                // Deletions are exhausted: apply the remaining insertions.
                (None, Some((key, value))) => {
                    self.rw_cursor.insert(&key, value)?;
                    while let Some((key, value)) = set_keys_iter.next()? {
                        self.rw_cursor.insert(&key, value)?;
                    }
                    break;
                }
                // Insertions are exhausted: apply the remaining deletions.
                (Some((key, ())), None) => {
                    self.rw_cursor.delete(&key)?;
                    while let Some((key, ())) = delete_keys_iter.next()? {
                        self.rw_cursor.delete(&key)?;
                    }
                    break;
                }
                (Some((delete_key, ())), Some((set_key, value))) => {
                    // If a diff both deletes and inserts the same key, the
                    // deletion must come first. The inserted key-value only
                    // appears in the diff because it must be present in the
                    // final merged result.
                    if delete_key <= set_key {
                        self.rw_cursor.delete(&delete_key)?;
                        next_delete = delete_keys_iter.next()?;
                        next_set = Some((set_key, value));
                    } else {
                        self.rw_cursor.insert(&set_key, value)?;
                        next_set = set_keys_iter.next()?;
                        next_delete = Some((delete_key, ()));
                    }
                }
            }
        }
        self.rw_cursor.finish()
    }
}
/// The MPTs involved in one merge request: an optional read-only base MPT
/// and the MPT that receives every write.
struct MergeMptsInRequest<'a> {
    // Base MPT to read from in save-as mode. `None` means the merge reads
    // and updates `out_mpt` in place.
    maybe_readonly_mpt: Option<&'a mut dyn SnapshotMptTraitReadAndIterate>,
    // Destination of all writes. In in-place mode it is also the read source.
    out_mpt: &'a mut dyn SnapshotMptTraitRw,
}
/// Read access for the merge cursor. The base trie is the read-only MPT when
/// one was supplied (save-as mode); otherwise it is the output MPT itself
/// (in-place mode).
impl GetReadMpt for MergeMptsInRequest<'_> {
    /// Merkle root of the MPT the merge reads from.
    fn get_merkle_root(&self) -> MerkleHash {
        match &self.maybe_readonly_mpt {
            Some(readonly_mpt) => readonly_mpt.get_merkle_root(),
            None => self.out_mpt.get_merkle_root(),
        }
    }

    /// Read-only view of the MPT the merge reads from.
    fn get_read_mpt(&mut self) -> &mut dyn SnapshotMptTraitRead {
        // Each arm borrows a different field, so the returned borrow in the
        // `Some` arm never conflicts with the `None` arm.
        match &mut self.maybe_readonly_mpt {
            Some(readonly_mpt) => readonly_mpt.as_readonly(),
            None => self.out_mpt.as_readonly(),
        }
    }
}
// Write access for the merge cursor. Writes always go to `out_mpt`. The
// read-only base MPT, when present, selects save-as mode.
impl GetRwMpt for MergeMptsInRequest<'_> {
    // The MPT that receives all writes.
    fn get_write_mpt(&mut self) -> &mut dyn SnapshotMptTraitRw { self.out_mpt }
    // Returns the output MPT together with the read-only base MPT, if any.
    fn get_write_and_read_mpt(
        &mut self,
    ) -> (
        &mut dyn SnapshotMptTraitRw,
        Option<&mut dyn SnapshotMptTraitReadAndIterate>,
    ) {
        (
            self.out_mpt,
            // Can't use map() here because it would be a compilation error.
            // NOTE(review): presumably because `Some(*x)` here is checked
            // against the declared return type. That lets `*x` be implicitly
            // reborrowed, with its trait-object lifetime shortened, rather
            // than moved out of the `&mut`. A closure passed to `map()` gets
            // no such expected type.
            match &mut self.maybe_readonly_mpt {
                None => None,
                Some(x) => Some(*x),
            },
        )
    }
    // Save-as mode: a separate read-only base MPT was supplied.
    fn is_save_as_write(&self) -> bool { self.maybe_readonly_mpt.is_some() }
    // In-place mode: `out_mpt` is both the read source and the write target.
    fn is_in_place_update(&self) -> bool { self.maybe_readonly_mpt.is_none() }
}
use crate::{
impls::{
errors::*,
merkle_patricia_trie::{mpt_cursor::*, KVInserter, MptKeyValue},
},
storage_db::snapshot_mpt::*,
tests::DumpedMptKvIterator,
};
use fallible_iterator::FallibleIterator;
use primitives::MerkleHash;