cfx_storage/impls/delta_mpt/
cow_node_ref.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5/// Set this flag to true to enable storing children merkles for
6/// possibly faster merkle root computation.
7const ENABLE_CHILDREN_MERKLES: bool = true;
8
9/// Load children merkles only when the number of uncached children nodes is
10/// above this threshold. Note that a small value will result in worse
11/// performance.
12const CHILDREN_MERKLE_UNCACHED_THRESHOLD: u32 = 4;
13
14/// Load/store children merkles only when the depth of current node is above
15/// this threshold. This is motivated by the fact that lower (deeper) nodes will
16/// be read less frequently than high nodes. The root node has depth 0. Note
17/// that a small value will result in worse performance.
18/// Depth 5 = 69905 (70k) nodes.
19/// Depth 6 = 1118481 (1.1 million) nodes.
20/// Depth 7 = 17895697 (18 million) nodes.
21const CHILDREN_MERKLE_DEPTH_THRESHOLD: u16 = 4;
22
23/// CowNodeRef facilities access and modification to trie nodes in multi-version
24/// MPT. It offers read-only access to the original trie node, and creates an
25/// unique owned trie node once there is any modification. The ownership is
26/// maintained centralized in owned_node_set which is passed into many methods
27/// as argument. When CowNodeRef is created from an owned node, the ownership is
28/// transferred into the CowNodeRef object. The visitor of MPT makes sure that
29/// ownership of any trie node is not transferred more than once at the same
30/// time.
31pub struct CowNodeRef {
32    owned: bool,
33    mpt_id: DeltaMptId,
34    pub node_ref: NodeRefDeltaMpt,
35}
36
37pub struct MaybeOwnedTrieNode<'a> {
38    trie_node: &'a TrieNodeDeltaMptCell,
39}
40
41type GuardedMaybeOwnedTrieNodeAsCowCallParam<'c> = GuardedValue<
42    Option<MutexGuard<'c, DeltaMptsCacheManager>>,
43    MaybeOwnedTrieNodeAsCowCallParam,
44>;
45
46/// This class can only be meaningfully used internally by CowNodeRef.
47pub struct MaybeOwnedTrieNodeAsCowCallParam {
48    trie_node: *mut TrieNodeDeltaMpt,
49}
50
51impl MaybeOwnedTrieNodeAsCowCallParam {
52    // Returns a mutable reference to trie node when the trie_node is owned,
53    // however the precondition is unchecked.
54    unsafe fn owned_as_mut_unchecked<'a>(
55        &mut self,
56    ) -> &'a mut TrieNodeDeltaMpt {
57        &mut *self.trie_node
58    }
59
60    /// Do not implement in a trait to keep the call private.
61    fn as_ref<'a>(&self) -> &'a TrieNodeDeltaMpt { unsafe { &*self.trie_node } }
62}
63
64impl<'a, GuardType> GuardedValue<GuardType, MaybeOwnedTrieNode<'a>> {
65    pub fn take(
66        x: Self,
67    ) -> GuardedValue<GuardType, MaybeOwnedTrieNodeAsCowCallParam> {
68        let (guard, value) = x.into();
69        GuardedValue::new(
70            guard,
71            MaybeOwnedTrieNodeAsCowCallParam {
72                trie_node: value.trie_node.get(),
73            },
74        )
75    }
76}
77
78impl<'a, GuardType> GuardedValue<GuardType, &'a TrieNodeDeltaMptCell> {
79    pub fn into_wrapped(
80        x: Self,
81    ) -> GuardedValue<GuardType, MaybeOwnedTrieNode<'a>> {
82        let (guard, value) = x.into();
83        GuardedValue::new(guard, MaybeOwnedTrieNode { trie_node: value })
84    }
85}
86
87impl<'a> MaybeOwnedTrieNode<'a> {
88    pub fn take(x: Self) -> MaybeOwnedTrieNodeAsCowCallParam {
89        MaybeOwnedTrieNodeAsCowCallParam {
90            trie_node: x.trie_node.get(),
91        }
92    }
93}
94
95impl<'a> Deref for MaybeOwnedTrieNode<'a> {
96    type Target = TrieNodeDeltaMpt;
97
98    fn deref(&self) -> &Self::Target { self.trie_node.get_ref() }
99}
100
101impl<'a> MaybeOwnedTrieNode<'a> {
102    pub unsafe fn owned_as_mut_unchecked(
103        &mut self,
104    ) -> &'a mut TrieNodeDeltaMpt {
105        self.trie_node.get_as_mut()
106    }
107}
108
109impl CowNodeRef {
110    pub fn new_uninitialized_node<'a>(
111        allocator: AllocatorRefRefDeltaMpt<'a>,
112        owned_node_set: &mut OwnedNodeSet, mpt_id: DeltaMptId,
113    ) -> Result<(Self, SlabVacantEntryDeltaMpt<'a>)> {
114        let (node_ref, new_entry) =
115            DeltaMptsNodeMemoryManager::new_node(allocator)?;
116        owned_node_set.insert(node_ref.clone(), None);
117
118        Ok((
119            Self {
120                owned: true,
121                mpt_id,
122                node_ref,
123            },
124            new_entry,
125        ))
126    }
127
128    pub fn new(
129        node_ref: NodeRefDeltaMpt, owned_node_set: &OwnedNodeSet,
130        mpt_id: DeltaMptId,
131    ) -> Self {
132        Self {
133            owned: owned_node_set.contains(&node_ref),
134            mpt_id,
135            node_ref,
136        }
137    }
138
139    /// Take the value out of Self. Self is safe to drop.
140    pub fn take(&mut self) -> Self {
141        let ret = Self {
142            owned: self.owned,
143            mpt_id: self.mpt_id,
144            node_ref: self.node_ref.clone(),
145        };
146
147        self.owned = false;
148        ret
149    }
150}
151
152impl Drop for CowNodeRef {
153    /// Assert that the CowNodeRef doesn't own something.
154    fn drop(&mut self) {
155        debug_assert_eq!(false, self.owned);
156    }
157}
158
159impl CowNodeRef {
160    pub fn is_owned(&self) -> bool { self.owned }
161
162    fn convert_to_owned<'a>(
163        &mut self, allocator: AllocatorRefRefDeltaMpt<'a>,
164        owned_node_set: &mut OwnedNodeSet,
165    ) -> Result<Option<SlabVacantEntryDeltaMpt<'a>>> {
166        if self.owned {
167            Ok(None)
168        } else {
169            // Similar to Self::new_uninitialized_node(), but considers the
170            // original db key.
171            let (node_ref, new_entry) =
172                DeltaMptsNodeMemoryManager::new_node(&allocator)?;
173            let original_db_key = match self.node_ref {
174                NodeRefDeltaMpt::Committed { db_key } => db_key,
175                NodeRefDeltaMpt::Dirty { .. } => unreachable!(),
176            };
177            owned_node_set.insert(node_ref.clone(), Some(original_db_key));
178            self.node_ref = node_ref;
179            self.owned = true;
180
181            Ok(Some(new_entry))
182        }
183    }
184
185    /// The returned MaybeOwnedTrieNode is considered a borrow of CowNodeRef
186    /// because when it's owned user may use it as mutable borrow of
187    /// TrieNode. The lifetime is bounded by allocator for slab and by
188    /// node_memory_manager for cache.
189    ///
190    /// Lifetime of cache is separated because holding the lock itself shouldn't
191    /// prevent any further calls on self.
192    pub fn get_trie_node<'a, 'c: 'a>(
193        &'a mut self, node_memory_manager: &'c DeltaMptsNodeMemoryManager,
194        allocator: AllocatorRefRefDeltaMpt<'a>,
195        db: &mut DeltaDbOwnedReadTraitObj,
196    ) -> Result<
197        GuardedValue<
198            Option<MutexGuard<'c, DeltaMptsCacheManager>>,
199            MaybeOwnedTrieNode<'a>,
200        >,
201    > {
202        Ok(GuardedValue::into_wrapped(
203            node_memory_manager.node_cell_with_cache_manager(
204                &allocator,
205                self.node_ref.clone(),
206                node_memory_manager.get_cache_manager(),
207                db,
208                self.mpt_id,
209                &mut false,
210            )?,
211        ))
212    }
213
214    /// The trie node obtained from CowNodeRef is invalidated at the same time
215    /// of delete_node and into_child. A trie node obtained from
216    /// CowNodeRef will be inaccessible because it's obtained through
217    /// Self::get_trie_node, which has shorter lifetime because it's a
218    /// borrow of the CowNodeRef.
219    pub fn delete_node(
220        mut self, node_memory_manager: &DeltaMptsNodeMemoryManager,
221        owned_node_set: &mut OwnedNodeSet,
222    ) {
223        if self.owned {
224            node_memory_manager
225                .free_owned_node(&mut self.node_ref, self.mpt_id);
226            owned_node_set.remove(&self.node_ref);
227            self.owned = false;
228        }
229    }
230
231    // FIXME: maybe forbid calling for un-owned node? Check
232    // SubTrieVisitor#delete, #delete_all, etc.
233    pub fn into_child(mut self) -> Option<NodeRefDeltaMptCompact> {
234        if self.owned {
235            self.owned = false;
236        }
237        Some(self.node_ref.clone().into())
238    }
239
240    /// The deletion is always successful. When return value is Error, the
241    /// failing part is iteration.
242    pub fn delete_subtree(
243        mut self, trie: &DeltaMpt, owned_node_set: &OwnedNodeSet,
244        guarded_trie_node: GuardedMaybeOwnedTrieNodeAsCowCallParam,
245        key_prefix: CompressedPathRaw, values: &mut Vec<MptKeyValue>,
246        db: &mut DeltaDbOwnedReadTraitObj,
247    ) -> Result<()> {
248        if self.owned {
249            if guarded_trie_node.as_ref().as_ref().has_value() {
250                assert!(CompressedPathRaw::has_second_nibble(
251                    key_prefix.path_mask()
252                ));
253                values.push((
254                    key_prefix.path_slice().to_vec(),
255                    guarded_trie_node.as_ref().as_ref().value_clone().unwrap(),
256                ));
257            }
258
259            let children_table =
260                guarded_trie_node.as_ref().as_ref().children_table.clone();
261            // Free the lock for trie_node.
262            // FIXME: try to share the lock.
263            drop(guarded_trie_node);
264
265            let node_memory_manager = trie.get_node_memory_manager();
266            let allocator = node_memory_manager.get_allocator();
267            for (i, node_ref) in children_table.iter() {
268                let mut cow_child_node =
269                    Self::new((*node_ref).into(), owned_node_set, self.mpt_id);
270                let child_node = cow_child_node.get_trie_node(
271                    node_memory_manager,
272                    &allocator,
273                    db,
274                )?;
275                let key_prefix = CompressedPathRaw::join_connected_paths(
276                    &key_prefix,
277                    i,
278                    &child_node.compressed_path_ref(),
279                );
280                let child_node = GuardedValue::take(child_node);
281                cow_child_node.delete_subtree(
282                    trie,
283                    owned_node_set,
284                    child_node,
285                    key_prefix,
286                    values,
287                    db,
288                )?;
289            }
290
291            node_memory_manager
292                .free_owned_node(&mut self.node_ref, self.mpt_id);
293            self.owned = false;
294            Ok(())
295        } else {
296            self.iterate_internal(
297                owned_node_set,
298                trie,
299                guarded_trie_node,
300                key_prefix,
301                values,
302                db,
303            )
304        }
305    }
306
307    fn commit_dirty_recurse_into_children<
308        Transaction: BorrowMut<DeltaDbTransactionTraitObj>,
309    >(
310        &mut self, trie: &DeltaMpt, owned_node_set: &mut OwnedNodeSet,
311        trie_node: &mut TrieNodeDeltaMpt,
312        commit_transaction: &mut AtomicCommitTransaction<Transaction>,
313        cache_manager: &mut DeltaMptsCacheManager,
314        allocator_ref: AllocatorRefRefDeltaMpt,
315        children_merkle_map: &mut ChildrenMerkleMap,
316    ) -> Result<()> {
317        for (_i, node_ref_mut) in trie_node.children_table.iter_mut() {
318            let node_ref = node_ref_mut.clone();
319            let mut cow_child_node =
320                Self::new(node_ref.into(), owned_node_set, self.mpt_id);
321            if cow_child_node.is_owned() {
322                let trie_node = unsafe {
323                    trie.get_node_memory_manager().dirty_node_as_mut_unchecked(
324                        allocator_ref,
325                        &mut cow_child_node.node_ref,
326                    )
327                };
328                let commit_result = cow_child_node.commit_dirty_recursively(
329                    trie,
330                    owned_node_set,
331                    trie_node,
332                    commit_transaction,
333                    cache_manager,
334                    allocator_ref,
335                    children_merkle_map,
336                );
337
338                if commit_result.is_ok() {
339                    // An owned child TrieNode now have a new NodeRef.
340                    *node_ref_mut = cow_child_node.into_child().unwrap();
341                } else {
342                    cow_child_node.into_child();
343
344                    commit_result?;
345                }
346            }
347        }
348        Ok(())
349    }
350
351    fn set_merkle(
352        &mut self, children_merkles: MaybeMerkleTableRef,
353        path_without_first_nibble: bool, trie_node: &mut TrieNodeDeltaMpt,
354    ) -> MerkleHash {
355        let path_merkle = trie_node
356            .compute_merkle(children_merkles, path_without_first_nibble);
357        trie_node.set_merkle(&path_merkle);
358
359        path_merkle
360    }
361
362    fn uncached_children_count(
363        &mut self, trie: &DeltaMpt, trie_node: &mut TrieNodeDeltaMpt,
364    ) -> u32 {
365        let node_memory_manager = trie.get_node_memory_manager();
366        let cache_manager = node_memory_manager.get_cache_manager();
367        trie_node
368            .children_table
369            .iter()
370            .map(|(_i, node_ref)| match NodeRefDeltaMpt::from(*node_ref) {
371                NodeRefDeltaMpt::Committed { db_key }
372                    if !cache_manager
373                        .lock()
374                        .is_cached((self.mpt_id, db_key)) =>
375                {
376                    1
377                }
378                _ => 0,
379            })
380            .sum()
381    }
382
383    /// Get if unowned, compute if owned.
384    ///
385    /// parent_node_path_steps_plus_one is the steps of path from root to the
386    /// compressed_path. e.g. parent_node_path_steps_plus_one is 0 for root
387    /// node, 1 for root node's direct children.
388    pub fn get_or_compute_merkle(
389        &mut self, trie: &DeltaMpt, owned_node_set: &mut OwnedNodeSet,
390        allocator_ref: AllocatorRefRefDeltaMpt,
391        db: &mut DeltaDbOwnedReadTraitObj,
392        children_merkle_map: &mut ChildrenMerkleMap,
393        parent_node_path_steps_plus_one: u16,
394    ) -> Result<MerkleHash> {
395        if self.owned {
396            let trie_node = unsafe {
397                trie.get_node_memory_manager().dirty_node_as_mut_unchecked(
398                    allocator_ref,
399                    &mut self.node_ref,
400                )
401            };
402            let node_path_steps = parent_node_path_steps_plus_one
403                + trie_node.compressed_path_ref().path_steps();
404            let children_merkles = self.get_or_compute_children_merkles(
405                trie,
406                owned_node_set,
407                trie_node,
408                allocator_ref,
409                db,
410                children_merkle_map,
411                node_path_steps,
412            )?;
413
414            let merkle = self.set_merkle(
415                children_merkles.as_ref(),
416                (parent_node_path_steps_plus_one % 2) == 1,
417                trie_node,
418            );
419
420            Ok(merkle)
421        } else {
422            let mut load_from_db = false;
423            let trie_node = trie
424                .get_node_memory_manager()
425                .node_as_ref_with_cache_manager(
426                    allocator_ref,
427                    self.node_ref.clone(),
428                    trie.get_node_memory_manager().get_cache_manager(),
429                    db,
430                    self.mpt_id,
431                    &mut load_from_db,
432                )?;
433            if load_from_db {
434                trie.get_node_memory_manager()
435                    .compute_merkle_db_loads
436                    .fetch_add(1, Ordering::Relaxed);
437            }
438            Ok(trie_node.get_merkle().clone())
439        }
440    }
441
442    fn get_or_compute_children_merkles(
443        &mut self, trie: &DeltaMpt, owned_node_set: &mut OwnedNodeSet,
444        trie_node: &mut TrieNodeDeltaMpt,
445        allocator_ref: AllocatorRefRefDeltaMpt,
446        db: &mut DeltaDbOwnedReadTraitObj,
447        children_merkle_map: &mut ChildrenMerkleMap, node_path_steps: u16,
448    ) -> Result<MaybeMerkleTable> {
449        match trie_node.children_table.get_children_count() {
450            0 => Ok(None),
451            _ if ENABLE_CHILDREN_MERKLES => {
452                let original_db_key = match self.node_ref {
453                    NodeRefDeltaMpt::Dirty { index } => {
454                        owned_node_set.get_original_db_key(index)
455                    }
456                    NodeRefDeltaMpt::Committed { .. } => unreachable!(),
457                };
458                let known_merkles = match original_db_key {
459                    Some(original_db_key)
460                        if node_path_steps
461                            > CHILDREN_MERKLE_DEPTH_THRESHOLD =>
462                    {
463                        let node_memory_manager =
464                            trie.get_node_memory_manager();
465                        let num_uncached =
466                            self.uncached_children_count(trie, trie_node);
467                        if num_uncached > CHILDREN_MERKLE_UNCACHED_THRESHOLD {
468                            node_memory_manager.load_children_merkles_from_db(
469                                db,
470                                original_db_key,
471                            )?
472                        } else {
473                            None
474                        }
475                    }
476                    _ => None,
477                };
478                self.compute_children_merkles(
479                    trie,
480                    owned_node_set,
481                    trie_node,
482                    allocator_ref,
483                    db,
484                    children_merkle_map,
485                    known_merkles,
486                    node_path_steps,
487                )
488            }
489            _ => self.compute_children_merkles(
490                trie,
491                owned_node_set,
492                trie_node,
493                allocator_ref,
494                db,
495                children_merkle_map,
496                None,
497                node_path_steps,
498            ),
499        }
500    }
501
502    #[inline]
503    fn compute_children_merkles(
504        &mut self, trie: &DeltaMpt, owned_node_set: &mut OwnedNodeSet,
505        trie_node: &mut TrieNodeDeltaMpt,
506        allocator_ref: AllocatorRefRefDeltaMpt,
507        db: &mut DeltaDbOwnedReadTraitObj,
508        children_merkle_map: &mut ChildrenMerkleMap,
509        known_merkles: Option<CompactedChildrenTable<MerkleHash>>,
510        node_path_steps: u16,
511    ) -> Result<MaybeMerkleTable> {
512        let known = known_merkles.is_some();
513        let known_merkles = known_merkles.unwrap_or_default();
514        let mut merkles = [MERKLE_NULL_NODE; CHILDREN_COUNT];
515        let record_children_merkles = node_path_steps
516            > CHILDREN_MERKLE_DEPTH_THRESHOLD
517            && self.uncached_children_count(trie, trie_node)
518                > CHILDREN_MERKLE_UNCACHED_THRESHOLD;
519
520        for (i, maybe_node_ref_mut) in trie_node.children_table.iter_non_skip()
521        {
522            match maybe_node_ref_mut {
523                None => merkles[i as usize] = MERKLE_NULL_NODE,
524                Some(node_ref_mut) => {
525                    let node_ref_mut = NodeRefDeltaMpt::from(*node_ref_mut);
526                    match (known, node_ref_mut) {
527                        (true, NodeRefDeltaMpt::Committed { .. }) => {
528                            merkles[i as usize] =
529                                known_merkles.get_child(i).unwrap_or_default();
530                        }
531                        (_, node_ref_mut @ _) => {
532                            let mut cow_child_node = Self::new(
533                                node_ref_mut,
534                                owned_node_set,
535                                self.mpt_id,
536                            );
537                            let result = cow_child_node.get_or_compute_merkle(
538                                trie,
539                                owned_node_set,
540                                allocator_ref,
541                                db,
542                                children_merkle_map,
543                                // +1 for the child_index, see also comment for
544                                // get_or_compute_merkle.
545                                node_path_steps + 1,
546                            );
547                            // There is no change to the child reference so the
548                            // return value is dropped.
549                            cow_child_node.into_child();
550
551                            merkles[i as usize] = result?;
552                        }
553                    }
554                }
555            }
556        }
557
558        if record_children_merkles {
559            if let NodeRefDeltaMpt::Dirty { index } = self.node_ref {
560                children_merkle_map.insert(
561                    index,
562                    VanillaChildrenTable::<MerkleHash>::from(merkles),
563                );
564            }
565        }
566
567        Ok(Some(merkles))
568    }
569
570    // FIXME: unit test.
571    // FIXME: It's unnecessary to use owned_node_set for read-only access.
572    // FIXME: Where to put which method? CowNodeRef, MVMPT / MPT,
573    // FIXME: SubTrieVisitor?
574    pub fn iterate_internal<KVInserterType: KVInserter<MptKeyValue>>(
575        &self, owned_node_set: &OwnedNodeSet, trie: &DeltaMpt,
576        guarded_trie_node: GuardedMaybeOwnedTrieNodeAsCowCallParam,
577        key_prefix: CompressedPathRaw, values: &mut KVInserterType,
578        db: &mut DeltaDbOwnedReadTraitObj,
579    ) -> Result<()> {
580        if guarded_trie_node.as_ref().as_ref().has_value() {
581            assert!(CompressedPathRaw::has_second_nibble(
582                key_prefix.path_mask()
583            ));
584            values.push((
585                key_prefix.path_slice().to_vec(),
586                guarded_trie_node.as_ref().as_ref().value_clone().unwrap(),
587            ))?;
588        }
589
590        let children_table =
591            guarded_trie_node.as_ref().as_ref().children_table.clone();
592        // Free the lock for trie_node.
593        // FIXME: try to share the lock.
594        drop(guarded_trie_node);
595
596        let node_memory_manager = trie.get_node_memory_manager();
597        let allocator = node_memory_manager.get_allocator();
598        for (i, node_ref) in children_table.iter() {
599            let mut cow_child_node =
600                Self::new((*node_ref).into(), owned_node_set, self.mpt_id);
601            let child_node = cow_child_node.get_trie_node(
602                node_memory_manager,
603                &allocator,
604                db,
605            )?;
606            let key_prefix = CompressedPathRaw::join_connected_paths(
607                &key_prefix,
608                i,
609                &child_node.compressed_path_ref(),
610            );
611            let child_node = GuardedValue::take(child_node);
612            cow_child_node.iterate_internal(
613                owned_node_set,
614                trie,
615                child_node,
616                key_prefix,
617                values,
618                db,
619            )?;
620        }
621
622        Ok(())
623    }
624
625    // only_account_key can be used to filter the only account key
626    pub fn iterate_internal_with_callback(
627        &self, owned_node_set: &OwnedNodeSet, trie: &DeltaMpt,
628        guarded_trie_node: GuardedMaybeOwnedTrieNodeAsCowCallParam,
629        key_prefix: CompressedPathRaw, db: &mut DeltaDbOwnedReadTraitObj,
630        callback: &mut dyn FnMut(MptKeyValue), is_delta_mpt: bool,
631        only_account_key: bool,
632    ) -> Result<()> {
633        // filter out all the key that is longer than the account key
634        if only_account_key {
635            let key_len =
636                SpaceStorageFilter::space_flag_index(is_delta_mpt) + 1;
637            if key_prefix.path_slice().len() > key_len {
638                return Ok(());
639            }
640        }
641        if guarded_trie_node.as_ref().as_ref().has_value() {
642            assert!(CompressedPathRaw::has_second_nibble(
643                key_prefix.path_mask()
644            ));
645            callback((
646                key_prefix.path_slice().to_vec(),
647                guarded_trie_node.as_ref().as_ref().value_clone().unwrap(),
648            ));
649        }
650
651        let children_table =
652            guarded_trie_node.as_ref().as_ref().children_table.clone();
653        // Free the lock for trie_node.
654        // FIXME: try to share the lock.
655        drop(guarded_trie_node);
656
657        let node_memory_manager = trie.get_node_memory_manager();
658        let allocator = node_memory_manager.get_allocator();
659        for (i, node_ref) in children_table.iter() {
660            let mut cow_child_node =
661                Self::new((*node_ref).into(), owned_node_set, self.mpt_id);
662            let child_node = cow_child_node.get_trie_node(
663                node_memory_manager,
664                &allocator,
665                db,
666            )?;
667            let key_prefix = CompressedPathRaw::join_connected_paths(
668                &key_prefix,
669                i,
670                &child_node.compressed_path_ref(),
671            );
672            let child_node = GuardedValue::take(child_node);
673            cow_child_node.iterate_internal_with_callback(
674                owned_node_set,
675                trie,
676                child_node,
677                key_prefix,
678                db,
679                callback,
680                is_delta_mpt,
681                only_account_key,
682            )?;
683        }
684
685        Ok(())
686    }
687
688    /// Recursively commit dirty nodes.
689    pub fn commit_dirty_recursively<
690        Transaction: BorrowMut<DeltaDbTransactionTraitObj>,
691    >(
692        &mut self, trie: &DeltaMpt, owned_node_set: &mut OwnedNodeSet,
693        trie_node: &mut TrieNodeDeltaMpt,
694        commit_transaction: &mut AtomicCommitTransaction<Transaction>,
695        cache_manager: &mut DeltaMptsCacheManager,
696        allocator_ref: AllocatorRefRefDeltaMpt,
697        children_merkle_map: &mut ChildrenMerkleMap,
698    ) -> Result<bool> {
699        if self.owned {
700            self.commit_dirty_recurse_into_children(
701                trie,
702                owned_node_set,
703                trie_node,
704                commit_transaction,
705                cache_manager,
706                allocator_ref,
707                children_merkle_map,
708            )?;
709
710            let db_key = commit_transaction.info.row_number.value;
711            commit_transaction
712                .transaction
713                .borrow_mut()
714                .put_with_number_key(
715                    commit_transaction
716                        .info
717                        .row_number
718                        .value
719                        .try_into()
720                        .expect("not exceed i64::MAX"),
721                    trie_node.rlp_bytes().as_slice(),
722                )?;
723            commit_transaction.info.row_number =
724                commit_transaction.info.row_number.get_next()?;
725
726            let slot = match &self.node_ref {
727                NodeRefDeltaMpt::Dirty { index } => *index,
728                _ => unreachable!(),
729            };
730            if let Some(children_merkles) = children_merkle_map.remove(&slot) {
731                commit_transaction.transaction.borrow_mut().put(
732                    format!("cm{}", db_key).as_bytes(),
733                    &children_merkles.rlp_bytes(),
734                )?;
735            }
736
737            let committed_node_ref = NodeRefDeltaMpt::Committed { db_key };
738            owned_node_set.insert(committed_node_ref.clone(), None);
739            owned_node_set.remove(&self.node_ref);
740            // We insert the new node_ref into owned_node_set first because in
741            // general inserting to a set may fail, even though it
742            // doesn't fail for the current implementation.
743            //
744            // It would be more difficult to deal with if the insertion to
745            // node_ref_map below fails while we haven't updated information
746            // about the current node: we may forget to rollback the insertion
747            // into node_ref_map and cache algorithm.
748            cache_manager.insert_to_node_ref_map_and_call_cache_access(
749                (self.mpt_id, db_key),
750                slot,
751                trie.get_node_memory_manager(),
752            )?;
753            self.node_ref = committed_node_ref;
754
755            Ok(true)
756        } else {
757            Ok(false)
758        }
759    }
760
761    pub fn cow_merge_path(
762        self, trie: &DeltaMpt, owned_node_set: &mut OwnedNodeSet,
763        trie_node: GuardedMaybeOwnedTrieNodeAsCowCallParam,
764        child_node_ref: NodeRefDeltaMpt, child_index: u8,
765        db: &mut DeltaDbOwnedReadTraitObj,
766    ) -> Result<CowNodeRef> {
767        let node_memory_manager = trie.get_node_memory_manager();
768        let allocator = node_memory_manager.get_allocator();
769
770        let mut child_node_cow =
771            CowNodeRef::new(child_node_ref, owned_node_set, self.mpt_id);
772        let compressed_path_ref =
773            trie_node.as_ref().as_ref().compressed_path_ref();
774        let path_prefix = CompressedPathRaw::from(compressed_path_ref);
775        // FIXME: Here we may hold the lock and get the trie node for the child
776        // FIXME: node. think about it.
777        drop(trie_node);
778        // COW modify child,
779        // FIXME: error processing. Error happens when child node isn't dirty.
780        // FIXME: State can be easily reverted if the trie node containing the
781        // FIXME: value or itself isn't dirty as well. However if a
782        // FIXME: dirty child node was removed, recovering the state
783        // FIXME: becomes difficult.
784        let child_trie_node = child_node_cow.get_trie_node(
785            node_memory_manager,
786            &allocator,
787            db,
788        )?;
789        let new_path = CompressedPathRaw::join_connected_paths(
790            &path_prefix,
791            child_index,
792            &child_trie_node.compressed_path_ref(),
793        );
794
795        // FIXME: if child_trie_node isn't owned, but node_cow is owned, modify
796        // FIXME: node_cow.
797        let child_trie_node = GuardedValue::take(child_trie_node);
798        child_node_cow.cow_set_compressed_path(
799            &node_memory_manager,
800            owned_node_set,
801            new_path,
802            child_trie_node,
803        )?;
804        self.delete_node(node_memory_manager, owned_node_set);
805
806        Ok(child_node_cow)
807    }
808
809    /// When the node is unowned, it doesn't make sense to do copy-on-write
810    /// creation because the new node will be deleted immediately.
811    pub unsafe fn delete_value_unchecked_followed_by_node_deletion(
812        &mut self, mut trie_node: GuardedMaybeOwnedTrieNodeAsCowCallParam,
813    ) -> Box<[u8]> {
814        if self.owned {
815            trie_node
816                .as_mut()
817                .owned_as_mut_unchecked()
818                .delete_value_unchecked()
819        } else {
820            trie_node.as_ref().as_ref().value_clone().unwrap()
821        }
822    }
823
824    pub fn cow_set_compressed_path(
825        &mut self, node_memory_manager: &DeltaMptsNodeMemoryManager,
826        owned_node_set: &mut OwnedNodeSet, path: CompressedPathRaw,
827        trie_node: GuardedMaybeOwnedTrieNodeAsCowCallParam,
828    ) -> Result<()> {
829        let path_to_take = Cell::new(Some(path));
830
831        self.cow_modify_with_operation(
832            &node_memory_manager.get_allocator(),
833            owned_node_set,
834            trie_node,
835            |owned_trie_node| {
836                owned_trie_node
837                    .set_compressed_path(path_to_take.replace(None).unwrap())
838            },
839            |read_only_trie_node| {
840                (
841                    unsafe {
842                        read_only_trie_node.copy_and_replace_fields(
843                            None,
844                            path_to_take.replace(None),
845                            None,
846                        )
847                    },
848                    (),
849                )
850            },
851        )
852    }
853
854    pub unsafe fn cow_delete_value_unchecked(
855        &mut self, node_memory_manager: &DeltaMptsNodeMemoryManager,
856        owned_node_set: &mut OwnedNodeSet,
857        trie_node: GuardedMaybeOwnedTrieNodeAsCowCallParam,
858    ) -> Result<Box<[u8]>> {
859        self.cow_modify_with_operation(
860            &node_memory_manager.get_allocator(),
861            owned_node_set,
862            trie_node,
863            |owned_trie_node| owned_trie_node.delete_value_unchecked(),
864            |read_only_trie_node| {
865                (
866                    read_only_trie_node.copy_and_replace_fields(
867                        Some(None),
868                        None,
869                        None,
870                    ),
871                    read_only_trie_node.value_clone().unwrap(),
872                )
873            },
874        )
875    }
876
877    pub fn cow_replace_value_valid(
878        &mut self, node_memory_manager: &DeltaMptsNodeMemoryManager,
879        owned_node_set: &mut OwnedNodeSet,
880        trie_node: GuardedMaybeOwnedTrieNodeAsCowCallParam, value: Box<[u8]>,
881    ) -> Result<MptValue<Box<[u8]>>> {
882        let value_to_take = Cell::new(Some(value));
883
884        self.cow_modify_with_operation(
885            &node_memory_manager.get_allocator(),
886            owned_node_set,
887            trie_node,
888            |owned_trie_node| {
889                owned_trie_node
890                    .replace_value_valid(value_to_take.replace(None).unwrap())
891            },
892            |read_only_trie_node| {
893                (
894                    unsafe {
895                        read_only_trie_node.copy_and_replace_fields(
896                            Some(value_to_take.replace(None)),
897                            None,
898                            None,
899                        )
900                    },
901                    read_only_trie_node.value_clone(),
902                )
903            },
904        )
905    }
906
907    /// If owned, run f_owned on trie node; otherwise run f_ref on the read-only
908    /// trie node to create the equivalent trie node and return value as the
909    /// final state of f_owned.
910    pub fn cow_modify_with_operation<
911        'a,
912        OutputType,
913        FOwned: FnOnce(&'a mut TrieNodeDeltaMpt) -> OutputType,
914        FRef: FnOnce(&'a TrieNodeDeltaMpt) -> (TrieNodeDeltaMpt, OutputType),
915    >(
916        &mut self, allocator: AllocatorRefRefDeltaMpt<'a>,
917        owned_node_set: &mut OwnedNodeSet,
918        mut trie_node: GuardedMaybeOwnedTrieNodeAsCowCallParam,
919        f_owned: FOwned, f_ref: FRef,
920    ) -> Result<OutputType> {
921        let copied = self.convert_to_owned(allocator, owned_node_set)?;
922        match copied {
923            None => unsafe {
924                let trie_node_mut = trie_node.as_mut().owned_as_mut_unchecked();
925                Ok(f_owned(trie_node_mut))
926            },
927            Some(new_entry) => {
928                let (new_trie_node, output) =
929                    f_ref(trie_node.as_ref().as_ref());
930                new_entry.insert(&new_trie_node);
931                Ok(output)
932            }
933        }
934    }
935
936    pub fn cow_modify<'a>(
937        &mut self, allocator: AllocatorRefRefDeltaMpt<'a>,
938        owned_node_set: &mut OwnedNodeSet,
939        mut trie_node: GuardedMaybeOwnedTrieNodeAsCowCallParam,
940    ) -> Result<&'a mut TrieNodeDeltaMpt> {
941        let copied = self.convert_to_owned(allocator, owned_node_set)?;
942        match copied {
943            None => unsafe { Ok(trie_node.as_mut().owned_as_mut_unchecked()) },
944            Some(new_entry) => unsafe {
945                let new_trie_node = trie_node
946                    .as_ref()
947                    .as_ref()
948                    .copy_and_replace_fields(None, None, None);
949                let key = new_entry.key();
950                new_entry.insert(&new_trie_node);
951                Ok(DeltaMptsNodeMemoryManager::get_in_memory_node_mut(
952                    allocator, key,
953                ))
954            },
955        }
956    }
957}
958
959use super::{
960    super::{
961        super::{
962            storage_db::delta_db_manager::{
963                DeltaDbOwnedReadTraitObj, DeltaDbTransactionTraitObj,
964            },
965            utils::{guarded_value::GuardedValue, UnsafeCellExtension},
966        },
967        errors::*,
968        merkle_patricia_trie::{merkle::*, *},
969        state::ChildrenMerkleMap,
970    },
971    node_memory_manager::*,
972    owned_node_set::OwnedNodeSet,
973    AtomicCommitTransaction, DeltaMpt, *,
974};
975use parking_lot::MutexGuard;
976use primitives::{MerkleHash, MptValue, SpaceStorageFilter, MERKLE_NULL_NODE};
977use rlp::*;
978use std::{
979    borrow::BorrowMut, cell::Cell, convert::TryInto, ops::Deref,
980    sync::atomic::Ordering,
981};