cfx_storage/impls/delta_mpt/
subtrie_visitor.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5pub struct SubTrieVisitor<'trie, 'db: 'trie> {
6    root: CowNodeRef,
7
8    trie_ref: &'trie MerklePatriciaTrie,
9    db: ReturnAfterUse<'trie, ArcDeltaDbWrapper>,
10    phantom: PhantomData<&'db ArcDeltaDbWrapper>,
11
12    /// We use ReturnAfterUse because only one SubTrieVisitor(the deepest) can
13    /// hold the mutable reference of owned_node_set.
14    owned_node_set: ReturnAfterUse<'trie, OwnedNodeSet>,
15}
16
17type MerklePatriciaTrie = DeltaMpt;
18
19impl<'trie> SubTrieVisitor<'trie, 'trie> {
20    pub fn new(
21        trie_ref: &'trie MerklePatriciaTrie, root: NodeRefDeltaMpt,
22        owned_node_set: &'trie mut Option<OwnedNodeSet>,
23    ) -> Result<Self> {
24        Ok(Self {
25            trie_ref,
26            db: ReturnAfterUse::new_from_value(trie_ref.get_arc_db()?),
27            phantom: PhantomData,
28            root: CowNodeRef::new(
29                root,
30                owned_node_set.as_ref().unwrap(),
31                trie_ref.get_mpt_id(),
32            ),
33            owned_node_set: ReturnAfterUse::new(owned_node_set),
34        })
35    }
36}
37
38impl<'trie, 'db: 'trie> SubTrieVisitor<'trie, 'db> {
39    fn new_visitor_for_subtree<'a>(
40        &'a mut self, child_node: NodeRefDeltaMpt,
41    ) -> SubTrieVisitor<'a, 'db>
42    where 'trie: 'a {
43        let trie_ref = self.trie_ref;
44        let cow_child_node = CowNodeRef::new(
45            child_node,
46            self.owned_node_set.get_ref(),
47            self.trie_ref.get_mpt_id(),
48        );
49        SubTrieVisitor {
50            trie_ref,
51            db: ReturnAfterUse::<'a, ArcDeltaDbWrapper>::new_from_origin::<'trie>(
52                &mut self.db,
53            ),
54            phantom: PhantomData,
55            root: cow_child_node,
56            owned_node_set: ReturnAfterUse::new_from_origin(
57                &mut self.owned_node_set,
58            ),
59        }
60    }
61
62    pub fn get_trie_ref(&self) -> &'trie MerklePatriciaTrie { self.trie_ref }
63
64    fn node_memory_manager(&self) -> &'trie DeltaMptsNodeMemoryManager {
65        &self.get_trie_ref().get_node_memory_manager()
66    }
67
68    fn get_trie_node<'a>(
69        &mut self, key: KeyPart, allocator_ref: AllocatorRefRefDeltaMpt<'a>,
70    ) -> Result<
71        Option<
72            GuardedValue<
73                Option<MutexGuard<'a, DeltaMptsCacheManager>>,
74                &'a TrieNodeDeltaMpt,
75            >,
76        >,
77    >
78    where 'trie: 'a {
79        let node_memory_manager = self.node_memory_manager();
80        let cache_manager = node_memory_manager.get_cache_manager();
81        let mut node_ref = self.root.node_ref.clone();
82        let mut key = key;
83
84        let mut db_load_count = 0;
85        loop {
86            let mut is_loaded_from_db = false;
87            let trie_node = node_memory_manager
88                .node_as_ref_with_cache_manager(
89                    allocator_ref,
90                    node_ref,
91                    cache_manager,
92                    &mut *self.db.get_mut().to_owned_read()?,
93                    self.trie_ref.get_mpt_id(),
94                    &mut is_loaded_from_db,
95                )?;
96            if is_loaded_from_db {
97                db_load_count += 1;
98            }
99            match trie_node.walk::<access_mode::Read>(key) {
100                WalkStop::Arrived => {
101                    node_memory_manager.log_uncached_key_access(db_load_count);
102                    let (guard, trie_node) = trie_node.into();
103                    return Ok(Some(GuardedValue::new(guard, trie_node)));
104                }
105                WalkStop::Descent {
106                    key_remaining,
107                    child_index: _,
108                    child_node,
109                } => {
110                    node_ref = child_node.clone().into();
111                    key = key_remaining;
112                }
113                _ => {
114                    return Ok(None);
115                }
116            }
117        }
118    }
119
120    fn retrieve_children_hashes<'a>(
121        &self, children_table: ChildrenTableDeltaMpt,
122    ) -> Result<MaybeMerkleTable> {
123        if children_table.get_children_count() == 0 {
124            return Ok(None);
125        }
126
127        let mut merkles = ChildrenMerkleTable::default();
128
129        for (i, maybe_node_ref) in children_table.iter_non_skip() {
130            merkles[i as usize] = match maybe_node_ref {
131                None => MERKLE_NULL_NODE,
132                Some(node_ref) => {
133                    self.trie_ref.get_merkle(Some((*node_ref).into()))?.unwrap()
134                }
135            };
136        }
137
138        return Ok(Some(merkles));
139    }
140
141    pub fn get_proof<'a>(&mut self, key: KeyPart) -> Result<TrieProof> {
142        let allocator_ref = &self.node_memory_manager().get_allocator();
143        let node_memory_manager = self.node_memory_manager();
144        let cache_manager = node_memory_manager.get_cache_manager();
145
146        let mut proof_nodes = vec![];
147        let mut finished = false;
148        let mut node_ref = self.root.node_ref.clone();
149        let mut key = key;
150        let mut path_without_first_nibble = false;
151
152        while !finished {
153            // retrieve current trie node
154            let trie_node = node_memory_manager
155                .node_as_ref_with_cache_manager(
156                    allocator_ref,
157                    node_ref.clone(),
158                    cache_manager,
159                    &mut *self.db.get_mut().to_owned_read()?,
160                    self.trie_ref.get_mpt_id(),
161                    &mut false,
162                )?;
163
164            // update variables for subsequent traversal
165            match trie_node.walk::<access_mode::Read>(key) {
166                WalkStop::Arrived => {
167                    finished = true;
168                }
169                WalkStop::Descent {
170                    key_remaining,
171                    child_node,
172                    ..
173                } => {
174                    node_ref = child_node.clone().into();
175                    key = key_remaining;
176                }
177                _ => {
178                    finished = true;
179                }
180            };
181
182            // create proof node
183            proof_nodes.push({
184                let maybe_value = trie_node.value_clone().into_option();
185                let compressed_path = trie_node.compressed_path_ref().into();
186
187                let children = trie_node.children_table.clone();
188                let this_node_path_without_first_nibble =
189                    path_without_first_nibble;
190                path_without_first_nibble =
191                    CompressedPathRaw::has_second_nibble(
192                        trie_node.compressed_path_ref().path_mask(),
193                    );
194                drop(trie_node);
195                let children_merkles =
196                    self.retrieve_children_hashes(children)?;
197                TrieProofNode::new(
198                    children_merkles.into(),
199                    maybe_value,
200                    compressed_path,
201                    this_node_path_without_first_nibble,
202                )
203            });
204        }
205
206        // The proof can be wrong when the trie is being modified and we haven't
207        // update the merkle hash bottom-up.
208        Ok(TrieProof::new(proof_nodes)?)
209    }
210
211    pub fn get(&mut self, key: KeyPart) -> Result<MptValue<Box<[u8]>>> {
212        let allocator = self.node_memory_manager().get_allocator();
213        let maybe_trie_node = self.get_trie_node(key, &allocator)?;
214
215        Ok(match maybe_trie_node {
216            None => MptValue::None,
217            Some(trie_node) => trie_node.value_clone(),
218        })
219    }
220
221    pub fn get_merkle_hash_wo_compressed_path(
222        &mut self, key: KeyPart,
223    ) -> Result<Option<MerkleHash>> {
224        let allocator = self.node_memory_manager().get_allocator();
225        let maybe_trie_node = self.get_trie_node(key, &allocator)?;
226
227        match maybe_trie_node {
228            None => Ok(None),
229            Some(trie_node) => {
230                if trie_node.get_compressed_path_size() == 0 {
231                    return Ok(Some(trie_node.get_merkle().clone()));
232                }
233
234                let maybe_value = trie_node.value_clone().into_option();
235
236                let merkles = {
237                    let children_table = trie_node.children_table.clone();
238                    drop(trie_node);
239                    self.retrieve_children_hashes(children_table)?
240                };
241
242                Ok(Some(compute_node_merkle(
243                    merkles.as_ref(),
244                    maybe_value.as_ref().map(|value| value.as_ref()),
245                )))
246            }
247        }
248    }
249
250    /// The visitor can only be used once to modify.
251    /// Returns (deleted value, is root node replaced, the current root node for
252    /// the subtree).
253    pub fn delete(
254        mut self, key: KeyPart,
255    ) -> Result<(Option<Box<[u8]>>, bool, Option<NodeRefDeltaMptCompact>)> {
256        let node_memory_manager = self.node_memory_manager();
257        let allocator = node_memory_manager.get_allocator();
258        let mut node_cow = self.root.take();
259        // TODO(yz): be compliant to borrow rule and avoid duplicated
260
261        // FIXME: map_split?
262        let is_owned = node_cow.is_owned();
263        let trie_node_ref = node_cow.get_trie_node(
264            node_memory_manager,
265            &allocator,
266            &mut *self.db.get_mut().to_owned_read()?,
267        )?;
268        match trie_node_ref.walk::<access_mode::Read>(key) {
269            WalkStop::Arrived => {
270                // If value doesn't exists, returns invalid key error.
271                let result = trie_node_ref.check_delete_value();
272                if result.is_err() {
273                    return Ok((None, false, node_cow.into_child()));
274                }
275                let action = result.unwrap();
276                match action {
277                    TrieNodeAction::Delete => {
278                        // The current node is going to be dropped if owned.
279                        let trie_node = GuardedValue::take(trie_node_ref);
280                        let value = unsafe {
281                            node_cow.delete_value_unchecked_followed_by_node_deletion(
282                                trie_node,
283                            )
284                        };
285                        node_cow.delete_node(
286                            node_memory_manager,
287                            self.owned_node_set.get_mut(),
288                        );
289                        Ok((Some(value), true, None))
290                    }
291                    TrieNodeAction::MergePath {
292                        child_index,
293                        child_node_ref,
294                    } => {
295                        // The current node is going to be merged with its only
296                        // child after the value deletion.
297                        let value = trie_node_ref.value_clone().unwrap();
298
299                        let trie_node = GuardedValue::take(trie_node_ref);
300                        let merged_node_cow = node_cow.cow_merge_path(
301                            self.get_trie_ref(),
302                            self.owned_node_set.get_mut(),
303                            trie_node,
304                            child_node_ref,
305                            child_index,
306                            &mut *self.db.get_mut().to_owned_read()?,
307                        )?;
308
309                        // FIXME: true?
310                        Ok((Some(value), true, merged_node_cow.into_child()))
311                    }
312                    TrieNodeAction::Modify => {
313                        let node_changed = !is_owned;
314                        let trie_node = GuardedValue::take(trie_node_ref);
315                        let value = unsafe {
316                            node_cow.cow_delete_value_unchecked(
317                                &node_memory_manager,
318                                self.owned_node_set.get_mut(),
319                                trie_node,
320                            )?
321                        };
322
323                        Ok((Some(value), node_changed, node_cow.into_child()))
324                    }
325                }
326            }
327            WalkStop::Descent {
328                key_remaining,
329                child_node,
330                child_index,
331            } => {
332                drop(trie_node_ref);
333                let result = self
334                    .new_visitor_for_subtree(child_node.clone().into())
335                    .delete(key_remaining);
336                if result.is_err() {
337                    node_cow.into_child();
338                    return result;
339                }
340                let trie_node_ref = node_cow.get_trie_node(
341                    node_memory_manager,
342                    &allocator,
343                    &mut *self.db.get_mut().to_owned_read()?,
344                )?;
345                let (value, child_replaced, new_child_node) = result.unwrap();
346                if child_replaced {
347                    let action = trie_node_ref
348                        .check_replace_or_delete_child_action(
349                            child_index,
350                            new_child_node,
351                        );
352                    match action {
353                        TrieNodeAction::MergePath {
354                            child_index,
355                            child_node_ref,
356                        } => {
357                            // The current node is going to be merged with its
358                            // only child after the
359                            // value deletion.
360                            let trie_node = GuardedValue::take(trie_node_ref);
361                            let merged_node_cow = node_cow.cow_merge_path(
362                                self.get_trie_ref(),
363                                self.owned_node_set.get_mut(),
364                                trie_node,
365                                child_node_ref,
366                                child_index,
367                                &mut *self.db.get_mut().to_owned_read()?,
368                            )?;
369
370                            // FIXME: true?
371                            Ok((value, true, merged_node_cow.into_child()))
372                        }
373                        TrieNodeAction::Modify => unsafe {
374                            let node_ref_changed = !is_owned;
375                            let trie_node = GuardedValue::take(trie_node_ref);
376                            match new_child_node {
377                                None => {
378                                    node_cow
379                                        .cow_modify(
380                                            &allocator,
381                                            self.owned_node_set.get_mut(),
382                                            trie_node,
383                                        )?
384                                        .delete_child_unchecked(child_index);
385                                }
386                                Some(replacement) => {
387                                    node_cow
388                                        .cow_modify(
389                                            &allocator,
390                                            self.owned_node_set.get_mut(),
391                                            trie_node,
392                                        )?
393                                        .replace_child_unchecked(
394                                            child_index,
395                                            replacement,
396                                        );
397                                }
398                            }
399
400                            Ok((value, node_ref_changed, node_cow.into_child()))
401                        },
402                        _ => {
403                            unreachable!()
404                        }
405                    }
406                } else {
407                    Ok((value, false, node_cow.into_child()))
408                }
409            }
410
411            _ => Ok((None, false, node_cow.into_child())),
412        }
413    }
414
415    /// The visitor can only be used once to modify.
416    /// Returns (deleted value, is root node replaced, the current root node for
417    /// the subtree).
418    pub fn delete_all(
419        mut self, key: KeyPart, key_remaining: KeyPart,
420    ) -> Result<(
421        Option<Vec<MptKeyValue>>,
422        bool,
423        Option<NodeRefDeltaMptCompact>,
424    )> {
425        let node_memory_manager = self.node_memory_manager();
426        let allocator = node_memory_manager.get_allocator();
427        let mut node_cow = self.root.take();
428        // TODO(yz): be compliant to borrow rule and avoid duplicated
429
430        // FIXME: map_split?
431        let trie_node_ref = node_cow.get_trie_node(
432            node_memory_manager,
433            &allocator,
434            &mut *self.db.get_mut().to_owned_read()?,
435        )?;
436
437        let key_prefix: CompressedPathRaw;
438        match trie_node_ref.walk::<access_mode::Write>(key_remaining) {
439            WalkStop::ChildNotFound {
440                key_remaining: _,
441                child_index: _,
442            } => return Ok((None, false, node_cow.into_child())),
443            WalkStop::Arrived => {
444                // To enumerate the subtree.
445                key_prefix = key.into();
446            }
447            WalkStop::PathDiverted {
448                key_child_index,
449                key_remaining: _,
450                matched_path: _,
451                unmatched_child_index,
452                unmatched_path_remaining,
453            } => {
454                if key_child_index.is_some() {
455                    return Ok((None, false, node_cow.into_child()));
456                }
457                // To enumerate the subtree.
458                key_prefix = CompressedPathRaw::join_connected_paths(
459                    &key,
460                    unmatched_child_index,
461                    &unmatched_path_remaining,
462                );
463            }
464            WalkStop::Descent {
465                key_remaining,
466                child_node,
467                child_index,
468            } => {
469                drop(trie_node_ref);
470                let result = self
471                    .new_visitor_for_subtree(child_node.clone().into())
472                    .delete_all(key, key_remaining);
473                if result.is_err() {
474                    node_cow.into_child();
475                    return result;
476                }
477                let is_owned = node_cow.is_owned();
478                let trie_node_ref = node_cow.get_trie_node(
479                    node_memory_manager,
480                    &allocator,
481                    &mut *self.db.get_mut().to_owned_read()?,
482                )?;
483                let (value, child_replaced, new_child_node) = result.unwrap();
484                // FIXME: copied from delete(). Try to reuse code?
485                if child_replaced {
486                    let action = trie_node_ref
487                        .check_replace_or_delete_child_action(
488                            child_index,
489                            new_child_node,
490                        );
491                    match action {
492                        TrieNodeAction::MergePath {
493                            child_index,
494                            child_node_ref,
495                        } => {
496                            // The current node is going to be merged with its
497                            // only child after the
498                            // value deletion.
499                            let trie_node = GuardedValue::take(trie_node_ref);
500                            let merged_node_cow = node_cow.cow_merge_path(
501                                self.get_trie_ref(),
502                                self.owned_node_set.get_mut(),
503                                trie_node,
504                                child_node_ref,
505                                child_index,
506                                &mut *self.db.get_mut().to_owned_read()?,
507                            )?;
508
509                            // FIXME: true?
510                            return Ok((
511                                value,
512                                true,
513                                merged_node_cow.into_child(),
514                            ));
515                        }
516                        TrieNodeAction::Modify => unsafe {
517                            let node_ref_changed = !is_owned;
518                            let trie_node = GuardedValue::take(trie_node_ref);
519                            match new_child_node {
520                                None => {
521                                    node_cow
522                                        .cow_modify(
523                                            &allocator,
524                                            self.owned_node_set.get_mut(),
525                                            trie_node,
526                                        )?
527                                        .delete_child_unchecked(child_index);
528                                }
529                                Some(replacement) => {
530                                    node_cow
531                                        .cow_modify(
532                                            &allocator,
533                                            self.owned_node_set.get_mut(),
534                                            trie_node,
535                                        )?
536                                        .replace_child_unchecked(
537                                            child_index,
538                                            replacement,
539                                        );
540                                }
541                            }
542
543                            return Ok((
544                                value,
545                                node_ref_changed,
546                                node_cow.into_child(),
547                            ));
548                        },
549                        _ => {
550                            unreachable!()
551                        }
552                    }
553                } else {
554                    return Ok((value, false, node_cow.into_child()));
555                }
556            }
557        }
558
559        let trie_node = GuardedValue::take(trie_node_ref);
560        let mut old_values = vec![];
561        node_cow.delete_subtree(
562            self.get_trie_ref(),
563            self.owned_node_set.get_ref(),
564            trie_node,
565            key_prefix,
566            &mut old_values,
567            &mut *self.db.get_mut().to_owned_read()?,
568        )?;
569
570        Ok((Some(old_values), true, None))
571    }
572
573    /// return all key/value pairs given the prefix
574    pub fn traversal(
575        mut self, key: KeyPart, key_remaining: KeyPart,
576    ) -> Result<Option<Vec<MptKeyValue>>> {
577        let node_memory_manager = self.node_memory_manager();
578        let allocator = node_memory_manager.get_allocator();
579        let mut node_cow = self.root.take();
580
581        let trie_node_ref = node_cow.get_trie_node(
582            node_memory_manager,
583            &allocator,
584            &mut *self.db.get_mut().to_owned_read()?,
585        )?;
586
587        let key_prefix: CompressedPathRaw;
588        match trie_node_ref.walk::<access_mode::Write>(key_remaining) {
589            WalkStop::ChildNotFound { .. } => return Ok(None),
590            WalkStop::Arrived => {
591                // To enumerate the subtree.
592                key_prefix = key.into();
593            }
594            WalkStop::PathDiverted {
595                key_child_index,
596                unmatched_child_index,
597                unmatched_path_remaining,
598                ..
599            } => {
600                if key_child_index.is_some() {
601                    return Ok(None);
602                }
603                // To enumerate the subtree.
604                key_prefix = CompressedPathRaw::join_connected_paths(
605                    &key,
606                    unmatched_child_index,
607                    &unmatched_path_remaining,
608                );
609            }
610            WalkStop::Descent {
611                key_remaining,
612                child_node,
613                ..
614            } => {
615                drop(trie_node_ref);
616                let values = self
617                    .new_visitor_for_subtree(child_node.clone().into())
618                    .traversal(key, key_remaining)?;
619                return Ok(values);
620            }
621        }
622
623        let trie_node = GuardedValue::take(trie_node_ref);
624        let mut values = vec![];
625        node_cow.iterate_internal(
626            self.owned_node_set.get_ref(),
627            self.get_trie_ref(),
628            trie_node,
629            key_prefix,
630            &mut values,
631            &mut *self.db.get_mut().to_owned_read()?,
632        )?;
633        Ok(Some(values))
634    }
635
636    /// return all key/value pairs given the prefix
637    pub fn traversal_with_callback(
638        mut self, key: KeyPart, key_remaining: KeyPart,
639        callback: &mut dyn FnMut(MptKeyValue), is_delta_mpt: bool,
640        only_account_key: bool,
641    ) -> Result<()> {
642        let node_memory_manager = self.node_memory_manager();
643        let allocator = node_memory_manager.get_allocator();
644        let mut node_cow = self.root.take();
645
646        let trie_node_ref = node_cow.get_trie_node(
647            node_memory_manager,
648            &allocator,
649            &mut *self.db.get_mut().to_owned_read()?,
650        )?;
651
652        let key_prefix: CompressedPathRaw;
653        match trie_node_ref.walk::<access_mode::Write>(key_remaining) {
654            WalkStop::ChildNotFound { .. } => return Ok(()),
655            WalkStop::Arrived => {
656                // To enumerate the subtree.
657                key_prefix = key.into();
658            }
659            WalkStop::PathDiverted {
660                key_child_index,
661                unmatched_child_index,
662                unmatched_path_remaining,
663                ..
664            } => {
665                if key_child_index.is_some() {
666                    return Ok(());
667                }
668                // To enumerate the subtree.
669                key_prefix = CompressedPathRaw::join_connected_paths(
670                    &key,
671                    unmatched_child_index,
672                    &unmatched_path_remaining,
673                );
674            }
675            WalkStop::Descent {
676                key_remaining,
677                child_node,
678                ..
679            } => {
680                drop(trie_node_ref);
681                self.new_visitor_for_subtree(child_node.clone().into())
682                    .traversal_with_callback(
683                        key,
684                        key_remaining,
685                        callback,
686                        is_delta_mpt,
687                        only_account_key,
688                    )?;
689                return Ok(());
690            }
691        }
692
693        let trie_node = GuardedValue::take(trie_node_ref);
694        node_cow.iterate_internal_with_callback(
695            self.owned_node_set.get_ref(),
696            self.get_trie_ref(),
697            trie_node,
698            key_prefix,
699            &mut *self.db.get_mut().to_owned_read()?,
700            callback,
701            is_delta_mpt,
702            only_account_key,
703        )?;
704        Ok(())
705    }
706
707    // In a method we visit node one or 2 times but borrow-checker prevent
708    // holding and access other fields so it's visited multiple times.
709    // FIXME: Check if we did something like this.
710    // It's correct behavior if we first
711    // access this node, recurse into children, then access it again, because
712    // the accesses in subtree and other threads may in extreme cases evict
713    // this node from cache.
714
715    // Assume that the obtained TrieNode will be set valid value (non-empty)
716    // later on.
717    /// Insert a valid value into MPT.
718    /// The visitor can only be used once to modify.
719    unsafe fn insert_checked_value(
720        mut self, key: KeyPart, value: Box<[u8]>,
721    ) -> Result<(bool, NodeRefDeltaMptCompact)> {
722        let node_memory_manager = self.node_memory_manager();
723        let allocator = node_memory_manager.get_allocator();
724        let mut node_cow = self.root.take();
725        // TODO(yz): be compliant to borrow rule and avoid duplicated
726
727        let is_owned = node_cow.is_owned();
728        // FIXME: apply db_load_counter to all places where it matters, and also
729        // FIXME: pass down to recursion. (Also check other methods.)
730        let trie_node_ref = node_cow.get_trie_node(
731            node_memory_manager,
732            &allocator,
733            &mut *self.db.get_mut().to_owned_read()?,
734        )?;
735        trace!(
736            "insert_checked_value: trie_node.path={:?} {:?}",
737            trie_node_ref.compressed_path_ref(),
738            trie_node_ref.get_compressed_path_size()
739        );
740        match trie_node_ref.walk::<access_mode::Write>(key) {
741            WalkStop::Arrived => {
742                let node_ref_changed = !is_owned;
743                let trie_node = GuardedValue::take(trie_node_ref);
744                node_cow.cow_replace_value_valid(
745                    &node_memory_manager,
746                    self.owned_node_set.get_mut(),
747                    trie_node,
748                    value,
749                )?;
750
751                Ok((node_ref_changed, node_cow.into_child().unwrap()))
752            }
753            WalkStop::Descent {
754                key_remaining,
755                child_node,
756                child_index,
757            } => {
758                drop(trie_node_ref);
759                let result = self
760                    .new_visitor_for_subtree(child_node.clone().into())
761                    .insert_checked_value(key_remaining, value);
762                if result.is_err() {
763                    node_cow.into_child();
764                    return result;
765                }
766                let (child_changed, new_child_node) = result.unwrap();
767
768                if child_changed {
769                    let node_ref_changed = !node_cow.is_owned();
770                    let trie_node =
771                        GuardedValue::take(node_cow.get_trie_node(
772                            node_memory_manager,
773                            &allocator,
774                            &mut *self.db.get_mut().to_owned_read()?,
775                        )?);
776                    node_cow
777                        .cow_modify(
778                            &allocator,
779                            self.owned_node_set.get_mut(),
780                            trie_node,
781                        )?
782                        .replace_child_unchecked(child_index, new_child_node);
783
784                    Ok((node_ref_changed, node_cow.into_child().unwrap()))
785                } else {
786                    Ok((false, node_cow.into_child().unwrap()))
787                }
788            }
789            WalkStop::PathDiverted {
790                key_child_index,
791                key_remaining,
792                matched_path,
793                unmatched_child_index,
794                unmatched_path_remaining,
795            } => {
796                // create a new node to replace self with compressed
797                // path = matched_path, modify current
798                // node with the remaining path, and attach it as child to the
799                // replacement node create a new node for
800                // insertion (if key_remaining is non-empty), set it to child,
801                // with key_remaining.
802                let (new_node_cow, new_node_entry) =
803                    CowNodeRef::new_uninitialized_node(
804                        &allocator,
805                        self.owned_node_set.get_mut(),
806                        self.trie_ref.get_mpt_id(),
807                    )?;
808                let mut new_node = MemOptimizedTrieNode::default();
809                // set compressed path.
810                new_node.set_compressed_path(matched_path);
811
812                let trie_node = GuardedValue::take(trie_node_ref);
813                node_cow.cow_set_compressed_path(
814                    &node_memory_manager,
815                    self.owned_node_set.get_mut(),
816                    unmatched_path_remaining,
817                    trie_node,
818                )?;
819
820                // It's safe because we know that this is the first child.
821                new_node.set_first_child_unchecked(
822                    unmatched_child_index,
823                    // It's safe to unwrap because we know that it's not none.
824                    node_cow.into_child().unwrap(),
825                );
826
827                // TODO(yz): remove duplicated code.
828                match key_child_index {
829                    None => {
830                        // Insert value at the current node
831                        new_node.replace_value_valid(value);
832                    }
833                    Some(child_index) => {
834                        // TODO(yz): Maybe create CowNodeRef on NULL then
835                        // cow_set_value then set path.
836                        let (child_node_cow, child_node_entry) =
837                            CowNodeRef::new_uninitialized_node(
838                                &allocator,
839                                self.owned_node_set.get_mut(),
840                                self.trie_ref.get_mpt_id(),
841                            )?;
842                        let mut new_child_node =
843                            MemOptimizedTrieNode::default();
844                        // set compressed path.
845                        new_child_node.copy_compressed_path(key_remaining);
846                        new_child_node.replace_value_valid(value);
847                        child_node_entry.insert(&new_child_node);
848
849                        // It's safe because it's guaranteed that
850                        // key_child_index != unmatched_child_index
851                        new_node.add_new_child_unchecked(
852                            child_index,
853                            // It's safe to unwrap here because it's not null
854                            // node.
855                            child_node_cow.into_child().unwrap(),
856                        );
857                    }
858                }
859                new_node_entry.insert(&new_node);
860                Ok((true, new_node_cow.into_child().unwrap()))
861            }
862            WalkStop::ChildNotFound {
863                key_remaining,
864                child_index,
865            } => {
866                // TODO(yz): Maybe create CowNodeRef on NULL then cow_set_value
867                // then set path.
868                let (child_node_cow, child_node_entry) =
869                    CowNodeRef::new_uninitialized_node(
870                        &allocator,
871                        self.owned_node_set.get_mut(),
872                        self.trie_ref.get_mpt_id(),
873                    )?;
874                let mut new_child_node = MemOptimizedTrieNode::default();
875                // set compressed path.
876                new_child_node.copy_compressed_path(key_remaining);
877                new_child_node.replace_value_valid(value);
878                child_node_entry.insert(&new_child_node);
879
880                let node_ref_changed = !is_owned;
881                let trie_node = GuardedValue::take(trie_node_ref);
882                node_cow
883                    .cow_modify(
884                        &allocator,
885                        self.owned_node_set.get_mut(),
886                        trie_node,
887                    )?
888                    .add_new_child_unchecked(
889                        child_index,
890                        child_node_cow.into_child().unwrap(),
891                    );
892
893                Ok((node_ref_changed, node_cow.into_child().unwrap()))
894            }
895        }
896    }
897
898    pub fn set(
899        self, key: KeyPart, value: Box<[u8]>,
900    ) -> Result<NodeRefDeltaMpt> {
901        TrieNodeDeltaMpt::check_key_size(key)?;
902        TrieNodeDeltaMpt::check_value_size(&value)?;
903        let new_root;
904        unsafe {
905            new_root = self.insert_checked_value(key, value)?.1;
906        }
907        Ok(new_root.into())
908    }
909}
910
911use super::{
912    super::{
913        super::utils::{access_mode, guarded_value::GuardedValue},
914        errors::*,
915        merkle_patricia_trie::{
916            merkle::*,
917            trie_proof::TrieProofNode,
918            walk::{KeyPart, TrieNodeWalkTrait, WalkStop},
919            *,
920        },
921    },
922    cow_node_ref::CowNodeRef,
923    delta_mpt_open_db_manager::ArcDeltaDbWrapper,
924    mem_optimized_trie_node::*,
925    node_memory_manager::*,
926    owned_node_set::OwnedNodeSet,
927    return_after_use::ReturnAfterUse,
928    ChildrenTableDeltaMpt, DeltaMpt, *,
929};
930use parking_lot::MutexGuard;
931use primitives::{MerkleHash, MptValue, MERKLE_NULL_NODE};
932use std::marker::PhantomData;