cfx_storage/impls/delta_mpt/
mod.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5pub mod cache;
6pub mod cache_manager_delta_mpts;
7pub mod cow_node_ref;
8pub mod delta_mpt_iterator;
9pub mod delta_mpt_open_db_manager;
10mod mem_optimized_trie_node;
11pub(in super::super) mod node_memory_manager;
12mod node_ref;
13pub(super) mod node_ref_map;
14pub(super) mod owned_node_set;
15pub(super) mod return_after_use;
16pub(super) mod row_number;
17/// Fork of upstream slab in order to compact data and be thread-safe without
18/// giant lock.
19mod slab;
20pub mod subtrie_visitor;
21
22#[cfg(test)]
23mod tests;
24
25pub use self::{
26    cow_node_ref::CowNodeRef,
27    delta_mpt_iterator::DeltaMptIterator,
28    delta_mpt_open_db_manager::{
29        ArcDeltaDbWrapper, OpenDeltaDbLru, OpenableOnDemandOpenDeltaDbTrait,
30    },
31    mem_optimized_trie_node::MemOptimizedTrieNode,
32    node_memory_manager::{TrieNodeDeltaMpt, TrieNodeDeltaMptCell},
33    node_ref::*,
34    node_ref_map::DEFAULT_NODE_MAP_SIZE,
35    owned_node_set::OwnedNodeSet,
36    slab::Slab,
37    subtrie_visitor::SubTrieVisitor,
38};
39
/// Short name for the multi-version delta Merkle Patricia Trie.
pub type DeltaMpt = MultiVersionMerklePatriciaTrie;

/// Children table of a delta-MPT node in `CompactedChildrenTable` form, whose
/// entries are `NodeRefDeltaMptCompact` references.
pub type ChildrenTableDeltaMpt = CompactedChildrenTable<NodeRefDeltaMptCompact>;
/// Children table of a delta-MPT node in `ChildrenTable` form (as opposed to
/// the `CompactedChildrenTable` form above), with the same entry type.
pub type ChildrenTableManagedDeltaMpt = ChildrenTable<NodeRefDeltaMptCompact>;
44
/// State protected by the commit lock (`commit_lock`) of
/// `MultiVersionMerklePatriciaTrie`.
#[derive(Default)]
pub struct AtomicCommit {
    /// Row number counter used during state commitment. It is restored from
    /// the `last_row_number` entry of the delta db when the trie is created.
    pub row_number: RowNumber,
}
49
/// An in-progress state commitment, as returned by
/// `MultiVersionMerklePatriciaTrie::start_commit`.
///
/// While this value is alive, `info` keeps the trie's commit lock held, so a
/// concurrent `start_commit` on the same trie blocks until it is dropped.
pub struct AtomicCommitTransaction<
    'a,
    Transaction: BorrowMut<DeltaDbTransactionTraitObj>,
> {
    /// Guard of the trie's commit lock; gives access to the row number.
    pub info: MutexGuard<'a, AtomicCommit>,
    /// Delta db transaction for this commitment (created by `start_commit`
    /// through `start_transaction_dyn(true)`).
    pub transaction: Transaction,
}
57
/// A delta Merkle Patricia Trie that keeps one root per committed epoch
/// (hence "multi-version"). Nodes live in a node memory manager and are
/// swapped to an on-demand-opened delta db.
pub struct MultiVersionMerklePatriciaTrie {
    /// Id for cache manager; also the id used to open this trie's delta db.
    mpt_id: DeltaMptId,
    /// Epoch id -> trie root. A `None` root is stored for an epoch whose
    /// state has no root node.
    ///
    /// These maps are incomplete, as some of the other roots live in the disk
    /// db; misses are loaded from the db lazily.
    root_node_by_epoch: RwLock<HashMap<EpochId, Option<NodeRefDeltaMpt>>>,
    /// Find trie root by merkle root; this is mainly for debugging.
    root_node_by_merkle_root: RwLock<HashMap<MerkleHash, NodeRefDeltaMpt>>,
    /// Note that we don't manage ChildrenTable in allocator because it's
    /// variable-length.
    ///
    /// The node memory manager holds a reference to the db on disk which
    /// stores MPT nodes.
    ///
    /// The nodes in memory should be considered a cache for MPT.
    /// However, for delta_trie the disk_db contains MPT nodes which are
    /// swapped out from memory, because persistence isn't necessary.
    /// (So far we don't have a write-back implementation. For write-back we
    /// should think more about roots in disk db.)
    node_memory_manager: Arc<DeltaMptsNodeMemoryManager>,
    /// Underlying database for DeltaMpt.
    // Opened databases are managed by a LRU cache for reducing memory use.
    db_manager: Arc<dyn OpenableOnDemandOpenDeltaDbTrait>,
    /// Take care of database clean-ups for DeltaMpt. `None` for a trie built
    /// with `new_single_mpt`.
    // The variable is used in drop. Variable with non-trivial dtor shouldn't
    // trigger the compiler warning.
    #[allow(unused)]
    delta_mpts_releaser: Option<DeltaDbReleaser>,
    /// Mutex for row number computation in state commitment.
    commit_lock: Mutex<AtomicCommit>,

    /// Epoch id -> parent epoch id. Filled on commit, and lazily from the
    /// `parent_epoch_id_<epoch>` db entries.
    // This is a hack to avoid passing pivot chain information from consensus
    // to snapshot computation.
    // FIXME: do it from Consensus if possible.
    parent_epoch_by_epoch: RwLock<HashMap<EpochId, EpochId>>,
}
93
94impl MallocSizeOf for MultiVersionMerklePatriciaTrie {
95    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
96        let mut size = 0;
97        size += self.root_node_by_epoch.size_of(ops);
98        size += self.root_node_by_merkle_root.size_of(ops);
99        size += self.node_memory_manager.size_of(ops);
100        size += self.parent_epoch_by_epoch.size_of(ops);
101        size
102    }
103}
104
105impl MultiVersionMerklePatriciaTrie {
106    pub fn new(
107        db_manager: Arc<dyn OpenableOnDemandOpenDeltaDbTrait>,
108        snapshot_epoch_id: EpochId, storage_manager: Arc<StorageManager>,
109        mpt_id: DeltaMptId,
110        node_memory_manager: Arc<DeltaMptsNodeMemoryManager>,
111    ) -> Result<Self> {
112        let row_number = Self::parse_row_number(
113            db_manager.open(mpt_id)?.get("last_row_number".as_bytes()),
114        )
115        // unwrap() on new is fine.
116        .unwrap()
117        .unwrap_or_default();
118
119        debug!("Created DeltaMpt with id {}", mpt_id);
120
121        Ok(Self {
122            mpt_id,
123            root_node_by_epoch: Default::default(),
124            root_node_by_merkle_root: Default::default(),
125            node_memory_manager,
126            delta_mpts_releaser: Some(DeltaDbReleaser {
127                snapshot_epoch_id,
128                storage_manager: Arc::downgrade(&storage_manager),
129                mpt_id,
130            }),
131            db_manager,
132            commit_lock: Mutex::new(AtomicCommit {
133                row_number: RowNumber { value: row_number },
134            }),
135            parent_epoch_by_epoch: Default::default(),
136        })
137    }
138
139    pub fn new_single_mpt(
140        db_manager: Arc<dyn OpenableOnDemandOpenDeltaDbTrait>,
141        node_memory_manager: Arc<DeltaMptsNodeMemoryManager>,
142    ) -> Result<Self> {
143        let mpt_id = 0;
144        let row_number = Self::parse_row_number(
145            db_manager.open(mpt_id)?.get("last_row_number".as_bytes()),
146        )
147        // unwrap() on new is fine.
148        .unwrap()
149        .unwrap_or_default();
150
151        Ok(Self {
152            mpt_id,
153            root_node_by_epoch: Default::default(),
154            root_node_by_merkle_root: Default::default(),
155            node_memory_manager,
156            delta_mpts_releaser: None,
157            db_manager,
158            commit_lock: Mutex::new(AtomicCommit {
159                row_number: RowNumber { value: row_number },
160            }),
161            parent_epoch_by_epoch: Default::default(),
162        })
163    }
164
165    pub fn get_mpt_id(&self) -> DeltaMptId { self.mpt_id }
166
167    pub fn start_commit(
168        &self,
169    ) -> Result<AtomicCommitTransaction<'_, Box<DeltaDbTransactionTraitObj>>>
170    {
171        Ok(AtomicCommitTransaction {
172            info: self.commit_lock.lock(),
173            transaction: self.get_arc_db()?.start_transaction_dyn(true)?,
174        })
175    }
176
177    pub(super) fn state_root_committed(
178        &self, epoch_id: EpochId, merkle_root: &MerkleHash,
179        parent_epoch_id: EpochId, root_node: Option<NodeRefDeltaMpt>,
180    ) {
181        self.set_parent_epoch(epoch_id, parent_epoch_id.clone());
182        if root_node.is_some() {
183            self.set_root_node_ref(
184                merkle_root.clone(),
185                root_node.clone().unwrap(),
186            );
187        }
188        self.set_epoch_root(epoch_id, root_node);
189    }
190
191    fn load_root_node_ref_from_db(
192        &self, merkle_root: &MerkleHash,
193    ) -> Result<Option<NodeRefDeltaMpt>> {
194        let db_key_result = Self::parse_row_number(
195            // FIXME: the usage here for sqlite is serialized.
196            // FIXME: Think of a way of doing it correctly.
197            //
198            // FIXME: think about operations in state_manager and state, which
199            // FIXME: deserve a dedicated db connection. (Of course read-only)
200            self.get_arc_db()?.get(
201                ["db_key_for_root_".as_bytes(), merkle_root.as_ref()]
202                    .concat()
203                    .as_slice(),
204            ),
205        )?;
206        match db_key_result {
207            Some(db_key) => Ok(Some(self.loaded_root(merkle_root, db_key))),
208            None => Ok(None),
209        }
210    }
211
212    fn load_root_node_ref_from_db_by_epoch(
213        &self, epoch_id: &EpochId,
214    ) -> Result<Option<Option<NodeRefDeltaMpt>>> {
215        let db_key_result = Self::parse_row_number(
216            // FIXME: the usage here for sqlite is serialized.
217            // FIXME: Think of a way of doing it correctly.
218            //
219            // FIXME: think about operations in state_manager and state, which
220            // FIXME: deserve a dedicated db connection. (Of course read-only)
221            self.get_arc_db()?.get(
222                ["db_key_for_epoch_id_".as_bytes(), epoch_id.as_ref()]
223                    .concat()
224                    .as_slice(),
225            ),
226        )?;
227        match db_key_result {
228            Some(db_key) => {
229                Ok(Some(self.loaded_root_at_epoch(epoch_id, db_key)))
230            }
231            None => {
232                if self.get_parent_epoch(epoch_id)?.is_some() {
233                    Ok(Some(None))
234                } else {
235                    Ok(None)
236                }
237            }
238        }
239    }
240
241    fn load_parent_epoch_id_from_db(
242        &self, epoch_id: &EpochId,
243    ) -> Result<Option<EpochId>> {
244        let parent_epoch_id_result =
245            // FIXME: the usage here for sqlite is serialized.
246            // FIXME: Think of a way of doing it correctly.
247            //
248            // FIXME: think about operations in state_manager and state, which
249            // FIXME: deserve a dedicated db connection. (Of course read-only)
250            self.get_arc_db()?.get(
251                ["parent_epoch_id_".as_bytes(), epoch_id.as_ref()]
252                    .concat()
253                    .as_slice(),
254            )?;
255        match parent_epoch_id_result {
256            Some(parent_epoch_id_hexstr) => {
257                let parent_epoch_id = hexstr_to_h256(unsafe {
258                    std::str::from_utf8_unchecked(&*parent_epoch_id_hexstr)
259                });
260                self.set_parent_epoch(
261                    epoch_id.clone(),
262                    parent_epoch_id.clone(),
263                );
264                Ok(Some(parent_epoch_id))
265            }
266            None => Ok(None),
267        }
268    }
269
270    pub fn get_root_node_ref_by_epoch(
271        &self, epoch_id: &EpochId,
272    ) -> Result<Option<Option<NodeRefDeltaMpt>>> {
273        let node_ref = self.root_node_by_epoch.read().get(epoch_id).cloned();
274        if node_ref.is_none() {
275            self.load_root_node_ref_from_db_by_epoch(epoch_id)
276        } else {
277            Ok(node_ref)
278        }
279    }
280
281    /// Find trie root by merkle root is mainly for debugging.
282    pub fn get_root_node_ref(
283        &self, merkle_root: &MerkleHash,
284    ) -> Result<Option<NodeRefDeltaMpt>> {
285        let node_ref = self
286            .root_node_by_merkle_root
287            .read()
288            .get(merkle_root)
289            .cloned();
290        if node_ref.is_none() {
291            self.load_root_node_ref_from_db(merkle_root)
292        } else {
293            Ok(node_ref)
294        }
295    }
296
297    pub fn get_parent_epoch(
298        &self, epoch_id: &EpochId,
299    ) -> Result<Option<EpochId>> {
300        let parent_epoch =
301            self.parent_epoch_by_epoch.read().get(epoch_id).cloned();
302        if parent_epoch.is_none() {
303            self.load_parent_epoch_id_from_db(epoch_id)
304        } else {
305            Ok(parent_epoch)
306        }
307    }
308
309    // These set methods are private to storage mod. Writing to db happens at
310    // state commitment.
311    fn set_epoch_root(&self, epoch_id: EpochId, root: Option<NodeRefDeltaMpt>) {
312        self.root_node_by_epoch.write().insert(epoch_id, root);
313    }
314
315    fn set_root_node_ref(
316        &self, merkle_root: MerkleHash, node_ref: NodeRefDeltaMpt,
317    ) {
318        self.root_node_by_merkle_root
319            .write()
320            .insert(merkle_root, node_ref);
321    }
322
323    fn set_parent_epoch(&self, epoch_id: EpochId, parent_epoch_id: EpochId) {
324        self.parent_epoch_by_epoch
325            .write()
326            .insert(epoch_id, parent_epoch_id);
327    }
328
329    fn loaded_root(
330        &self, merkle_root: &MerkleHash, db_key: DeltaMptDbKey,
331    ) -> NodeRefDeltaMpt {
332        let root = NodeRefDeltaMpt::Committed { db_key };
333        self.set_root_node_ref(*merkle_root, root.clone());
334
335        root
336    }
337
338    fn loaded_root_at_epoch(
339        &self, epoch_id: &EpochId, db_key: DeltaMptDbKey,
340    ) -> Option<NodeRefDeltaMpt> {
341        let root = Some(NodeRefDeltaMpt::Committed { db_key });
342        self.set_epoch_root(*epoch_id, root.clone());
343
344        root
345    }
346
347    pub fn get_node_memory_manager(&self) -> &DeltaMptsNodeMemoryManager {
348        &self.node_memory_manager
349    }
350
351    pub fn get_merkle(
352        &self, maybe_node: Option<NodeRefDeltaMpt>,
353    ) -> Result<Option<MerkleHash>> {
354        match maybe_node {
355            Some(node) => Ok(Some({
356                let arc_db = self.get_arc_db()?;
357                // To avoid compile error
358                let merkle = self
359                    .node_memory_manager
360                    .node_as_ref_with_cache_manager(
361                        &self.node_memory_manager.get_allocator(),
362                        node,
363                        self.node_memory_manager.get_cache_manager(),
364                        &mut *arc_db.to_owned_read()?,
365                        self.mpt_id,
366                        &mut false,
367                    )?
368                    .get_merkle()
369                    .clone();
370                merkle
371            })),
372            None => Ok(None),
373        }
374    }
375
376    pub fn get_merkle_root_by_epoch_id(
377        &self, epoch_id: &EpochId,
378    ) -> Result<Option<MerkleHash>> {
379        match self.get_root_node_ref_by_epoch(epoch_id)? {
380            None => Ok(None),
381            Some(root_node) => {
382                Ok(self.get_merkle(root_node)?.or(Some(MERKLE_NULL_NODE)))
383            }
384        }
385    }
386
387    pub fn log_usage(&self) { self.node_memory_manager.log_usage(); }
388}
389
390// Utility function.
391impl MultiVersionMerklePatriciaTrie {
392    fn parse_row_number(
393        x: Result<Option<Box<[u8]>>>,
394    ) -> Result<Option<RowNumberUnderlyingType>> {
395        Ok(match x?.as_ref() {
396            None => None,
397            Some(row_number_bytes) => {
398                trace!("parse_row_number:{:?}", row_number_bytes);
399                Some(
400                    unsafe {
401                        std::str::from_utf8_unchecked(row_number_bytes.as_ref())
402                    }
403                    .parse::<RowNumberUnderlyingType>()?,
404                )
405            }
406        })
407    }
408
409    pub fn get_arc_db(&self) -> Result<ArcDeltaDbWrapper> {
410        self.db_manager.open(self.mpt_id)
411    }
412}
413
414#[derive(Default)]
415pub struct DeltaMptIdGen {
416    id_limit: DeltaMptId,
417    available_ids: Vec<DeltaMptId>,
418}
419
420impl DeltaMptIdGen {
421    pub fn allocate(&mut self) -> Result<DeltaMptId> {
422        let id;
423        match self.available_ids.pop() {
424            None => {
425                if self.id_limit != DeltaMptId::max_value() {
426                    id = Ok(self.id_limit);
427                    self.id_limit += 1;
428                } else {
429                    id = Err(Error::TooManyDeltaMPT.into())
430                }
431            }
432            Some(x) => id = Ok(x),
433        };
434
435        id
436    }
437
438    pub fn free(&mut self, id: DeltaMptId) {
439        let max_id = self.id_limit - 1;
440        if id == max_id {
441            self.id_limit = max_id
442        } else {
443            self.available_ids.push(id);
444        }
445    }
446}
447
448use self::{
449    node_memory_manager::*, node_ref_map::DeltaMptDbKey, row_number::*,
450};
451use crate::{
452    impls::{
453        delta_mpt::node_ref_map::DeltaMptId, errors::*,
454        merkle_patricia_trie::*, storage_manager::storage_manager::*,
455    },
456    storage_db::delta_db_manager::DeltaDbTransactionTraitObj,
457};
458use cfx_types::hexstr_to_h256;
459use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
460use parking_lot::{Mutex, MutexGuard, RwLock};
461use primitives::{EpochId, MerkleHash, MERKLE_NULL_NODE};
462use std::{borrow::BorrowMut, collections::HashMap, sync::Arc};