1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
// Copyright 2019 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/

/// Merge an MPT with sorted deletion and insertion streams.
/// The merge can be in-place or write to a new MPT (save-as mode).
///
/// During the merge, the merger keeps a path from the root to a node. When
/// the next key to process from the insertion or deletion stream lies outside
/// the subtree of the last node in the path, the last node is closed. New
/// nodes are opened if the key goes further down, extending the path. In
/// in-place mode, node deletions are executed and node writes happen when a
/// modified node is closed. In save-as mode, node deletion is a no-op, and
/// node writes happen when a node is closed and when a new node is opened.
/// All modified nodes and skipped subtrees are saved into the new MPT.
///
/// In a save-as copy, the base MPT could be implemented as an iterator over
/// the original MPT, because the original MPT is visited exactly in the
/// string order of `path_db_key`. (See the definition of `path_db_key`.)
// TODO(yz): In the future the merge could be split across multiple threads by
// merging different children in parallel and then combining the root node.
pub struct MptMerger<'a> {
    /// Read-write cursor that applies each insert/delete along the current
    /// trie path. It takes ownership of the `MergeMptsInRequest` that holds
    /// the MPT(s) being read from and written to.
    rw_cursor: MptCursorRw<
        MergeMptsInRequest<'a>,
        ReadWritePathNode<MergeMptsInRequest<'a>>,
    >,
}

impl<'a> MptMerger<'a> {
    /// Creates a merger whose writes all go to `out_mpt`.
    ///
    /// When `maybe_readonly_mpt` is `Some`, the merge runs in save-as mode:
    /// the base trie is read from `maybe_readonly_mpt` and the merged trie is
    /// written into `out_mpt`. When it is `None`, `out_mpt` is both read from
    /// and updated in place.
    pub fn new(
        maybe_readonly_mpt: Option<&'a mut dyn SnapshotMptTraitReadAndIterate>,
        out_mpt: &'a mut dyn SnapshotMptTraitRw,
    ) -> Self {
        Self {
            rw_cursor: MptCursorRw::new(MergeMptsInRequest {
                maybe_readonly_mpt,
                out_mpt,
            }),
        }
    }

    /// Applies every key-value pair produced by `inserter` to the trie and
    /// returns the resulting merkle root.
    ///
    /// A pair with a non-empty value is inserted. A pair with an empty value
    /// is treated as a deletion of that key.
    ///
    /// # Errors
    /// Propagates any error from loading the root, from `inserter`'s
    /// iteration, or from the cursor's insert/delete/finish operations.
    pub fn merge(
        &mut self, inserter: &DumpedMptKvIterator,
    ) -> Result<MerkleHash> {
        self.rw_cursor.load_root(false)?;

        // Adapter that lets `inserter.iterate` push key-value pairs straight
        // into this merger's cursor.
        struct Merger<'x, 'a: 'x> {
            merger: &'x mut MptMerger<'a>,
        }

        impl<'x, 'a: 'x> KVInserter<MptKeyValue> for Merger<'x, 'a> {
            fn push(&mut self, v: MptKeyValue) -> Result<()> {
                let (key, value) = v;
                let rw_cursor = &mut self.merger.rw_cursor;
                // An empty value encodes a deletion of `key`.
                if value.is_empty() {
                    rw_cursor.delete(&key)?;
                } else {
                    rw_cursor.insert(&key, value)?;
                }
                Ok(())
            }
        }

        inserter.iterate(&mut Merger { merger: self })?;

        self.rw_cursor.finish()
    }

    /// Merges a stream of keys to delete and a stream of key-value pairs to
    /// set into the trie, then returns the merkle root of the result.
    ///
    /// The two streams are interleaved by key so that the cursor sees keys in
    /// order. Both iterators are assumed to yield keys in ascending order.
    ///
    /// * `delete_keys_iter` - keys to delete (the `()` half is ignored).
    /// * `set_keys_iter` - key-value pairs to insert.
    /// * `in_reconstruct_snapshot_state` - forwarded to the cursor's
    ///   `load_root`.
    ///
    /// # Errors
    /// Propagates errors from either iterator and from the cursor.
    pub fn merge_insertion_deletion_separated(
        &mut self,
        mut delete_keys_iter: impl FallibleIterator<
            Item = (Vec<u8>, ()),
            Error = Error,
        >,
        mut set_keys_iter: impl FallibleIterator<Item = MptKeyValue, Error = Error>,
        in_reconstruct_snapshot_state: bool,
    ) -> Result<MerkleHash> {
        self.rw_cursor.load_root(in_reconstruct_snapshot_state)?;

        // Current head of each stream. A stream is only advanced after its
        // head has been consumed, so an exhausted stream is never polled again.
        let mut next_delete = delete_keys_iter.next()?;
        let mut next_set = set_keys_iter.next()?;

        loop {
            match (next_delete.take(), next_set.take()) {
                (None, None) => break,
                // Insertions exhausted: keep draining deletions.
                (Some((delete_key, _)), None) => {
                    self.rw_cursor.delete(&delete_key)?;
                    next_delete = delete_keys_iter.next()?;
                }
                // Deletions exhausted: keep draining insertions.
                (None, Some((set_key, value))) => {
                    self.rw_cursor.insert(&set_key, value)?;
                    next_set = set_keys_iter.next()?;
                }
                // In a diff, if a key is both deleted and inserted, the
                // deletion must happen before the insertion: the inserted
                // key-value must be present in the final merged result for it
                // to appear in the diff. Hence `<=` favours the deletion.
                (Some((delete_key, unit)), Some(set)) => {
                    if delete_key <= set.0 {
                        self.rw_cursor.delete(&delete_key)?;
                        next_delete = delete_keys_iter.next()?;
                        next_set = Some(set);
                    } else {
                        let (set_key, value) = set;
                        self.rw_cursor.insert(&set_key, value)?;
                        next_set = set_keys_iter.next()?;
                        next_delete = Some((delete_key, unit));
                    }
                }
            }
        }

        self.rw_cursor.finish()
    }
}

/// The MPT(s) a merge reads from and writes to, exposed to the cursor through
/// the `GetReadMpt` and `GetRwMpt` implementations below.
struct MergeMptsInRequest<'a> {
    /// Separate base MPT to read from (save-as mode). `None` means `out_mpt`
    /// is both read from and updated in place.
    maybe_readonly_mpt: Option<&'a mut dyn SnapshotMptTraitReadAndIterate>,
    /// MPT that receives all writes.
    out_mpt: &'a mut dyn SnapshotMptTraitRw,
}

/// Reads are served from the read-only base MPT when one is present
/// (save-as mode), otherwise from `out_mpt` itself (in-place mode).
impl GetReadMpt for MergeMptsInRequest<'_> {
    /// Returns the merkle root of the MPT that reads are served from.
    fn get_merkle_root(&self) -> MerkleHash {
        match &self.maybe_readonly_mpt {
            Some(readonly_mpt) => readonly_mpt.get_merkle_root(),
            None => self.out_mpt.get_merkle_root(),
        }
    }

    /// Returns a read-only view of the MPT that reads are served from.
    fn get_read_mpt(&mut self) -> &mut dyn SnapshotMptTraitRead {
        match &mut self.maybe_readonly_mpt {
            Some(readonly_mpt) => readonly_mpt.as_readonly(),
            // Disjoint field from the scrutinee, so this borrow is allowed.
            None => self.out_mpt.as_readonly(),
        }
    }
}

/// Writes always go to `out_mpt`. The merge is a save-as write when a separate
/// read-only base MPT is present, and an in-place update otherwise.
impl GetRwMpt for MergeMptsInRequest<'_> {
    /// Returns the MPT that receives all writes.
    fn get_write_mpt(&mut self) -> &mut dyn SnapshotMptTraitRw { self.out_mpt }

    /// Returns the write MPT together with the optional read-only base MPT,
    /// both reborrowed for the duration of `&mut self`.
    fn get_write_and_read_mpt(
        &mut self,
    ) -> (
        &mut dyn SnapshotMptTraitRw,
        Option<&mut dyn SnapshotMptTraitReadAndIterate>,
    ) {
        (
            self.out_mpt,
            // Can't use map() here because it would be a compilation error.
            // NOTE(review): presumably because `Some(*x)` reborrows the inner
            // `&'a mut` at a coercion site (shortening it to the `&mut self`
            // lifetime), which a closure passed to `map()` does not get —
            // confirm before refactoring.
            match &mut self.maybe_readonly_mpt {
                None => None,
                Some(x) => Some(*x),
            },
        )
    }

    /// True when a separate read-only base MPT is present (save-as mode).
    fn is_save_as_write(&self) -> bool { self.maybe_readonly_mpt.is_some() }

    /// True when there is no base MPT and `out_mpt` is updated in place.
    fn is_in_place_update(&self) -> bool { self.maybe_readonly_mpt.is_none() }
}

use crate::{
    impls::{
        errors::*,
        merkle_patricia_trie::{mpt_cursor::*, KVInserter, MptKeyValue},
    },
    storage_db::snapshot_mpt::*,
    tests::DumpedMptKvIterator,
};
use fallible_iterator::FallibleIterator;
use primitives::MerkleHash;