// cfx_storage/impls/merkle_patricia_trie/mpt_cursor.rs

// Copyright 2019 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
/// Cursor to access Snapshot Mpt.
pub struct MptCursor<Mpt, PathNode> {
    // The mpt handle is moved into the path nodes while they operate on it
    // and handed back when nodes are popped, hence the Option.
    mpt: Option<Mpt>,
    // Stack of opened nodes from the root down to the cursor's current
    // position.
    path_nodes: Vec<PathNode>,
}
10
11impl<Mpt, PathNode> MptCursor<Mpt, PathNode> {
12    pub fn new(mpt: Mpt) -> Self {
13        Self {
14            mpt: Some(mpt),
15            path_nodes: vec![],
16        }
17    }
18
19    pub fn get_path_nodes(&self) -> &Vec<PathNode> { &self.path_nodes }
20
21    /// Never call this method after pop_root.
22    pub fn current_node_mut(&mut self) -> &mut PathNode {
23        self.path_nodes.last_mut().expect("Root exists in cursor")
24    }
25}
26
impl<Mpt: GetReadMpt, PathNode: PathNodeTrait<Mpt>> MptCursor<Mpt, PathNode> {
    /// Load the root node of the mpt and make it the bottom of the path
    /// stack. Must be called before any traversal method.
    pub fn load_root(&mut self) -> Result<()>
    where Self: CursorToRootNode<Mpt, PathNode> {
        // The second argument is in_reconstruct_snapshot_state, always false
        // for the read-only cursor.
        let root_node = PathNode::load_root(self, false)?;
        self.path_nodes.push(root_node);
        Ok(())
    }

    /// Assemble a TrieProof out of the nodes currently opened on the path.
    pub fn to_proof(&self) -> TrieProof {
        let mut trie_nodes = Vec::with_capacity(self.path_nodes.len());
        for node in &self.path_nodes {
            let trie_node = &node.get_basic_path_node().trie_node;
            trie_nodes.push(TrieProofNode::new(
                // Use an empty children table when the node has none.
                trie_node.get_children_merkles().map_or_else(
                    || VanillaChildrenTable::default(),
                    |merkle_table| merkle_table.into(),
                ),
                trie_node
                    .value_as_slice()
                    .into_option()
                    .map(|slice| slice.into()),
                trie_node.compressed_path_ref().into(),
                // An odd number of nibble steps means the path to this node
                // starts in the middle of a byte.
                node.get_basic_path_node().path_start_steps % 2 != 0,
            ));
        }

        // Unwrap is fine because the TrieProof must be valid unless the Mpt is
        // being modified.
        TrieProof::new(trie_nodes).unwrap()
    }

    pub fn push_node(&mut self, node: PathNode) { self.path_nodes.push(node); }

    /// Commit the deepest node into its parent, then advance the parent's
    /// next_child_index past it and hand the mpt back to the parent.
    /// Don't call this method for root node.
    pub fn pop_one_node(&mut self) -> Result<()> {
        let node = self.path_nodes.pop().unwrap();
        let parent_node = self.path_nodes.last_mut().unwrap();
        let mpt_taken = node.commit(parent_node)?;
        let parent_basic_node = parent_node.get_basic_path_node_mut();
        parent_basic_node.next_child_index += 1;
        parent_basic_node.mpt = mpt_taken;

        Ok(())
    }

    /// Pop (and commit) every node whose path starts deeper than
    /// target_key_steps.
    fn pop_nodes(&mut self, target_key_steps: u16) -> Result<()> {
        // unwrap is fine because the first node is root and it will never be
        // popped.
        while self
            .path_nodes
            .last()
            .unwrap()
            .get_basic_path_node()
            .path_start_steps
            > target_key_steps
        {
            self.pop_one_node()?
        }
        Ok(())
    }

    /// Commit the root node and obtain the merkle root of the whole mpt.
    fn pop_root(&mut self) -> Result<MerkleHash> {
        self.path_nodes.pop().unwrap().commit_root(&mut self.mpt)
    }

    /// Commit everything left on the path stack (down to and including the
    /// root) and return the final merkle root.
    pub fn finish(&mut self) -> Result<MerkleHash> {
        self.pop_nodes(0)?;
        self.pop_root()
    }

    /// Pop irrelevant paths and calculate what's remaining.
    ///
    /// The walk below always runs in Write mode because the diverted-path
    /// details are needed here; the AM parameter only decides whether the
    /// diverted path is fully recomputed for the caller (see the READ_ONLY
    /// branch).
    pub fn pop_path_for_key<'k, AM: access_mode::AccessMode>(
        &mut self, key: &'k [u8],
    ) -> Result<CursorPopNodesTerminal<'k>> {
        match walk::<access_mode::Write, _>(
            key,
            &self
                .path_nodes
                .last()
                .unwrap()
                .get_basic_path_node()
                .full_path_to_node,
            &MPT_CURSOR_GET_CHILD,
        ) {
            WalkStop::Arrived => Ok(CursorPopNodesTerminal::Arrived),
            // The scenario of Descent is classified into ChildNotFound scenario
            // because the checking of child_index is skipped.
            WalkStop::Descent {
                child_index: _,
                key_remaining: _,
                child_node: _,
            } => {
                unreachable!()
            }
            // It actually means to descent.
            WalkStop::ChildNotFound {
                child_index,
                key_remaining,
            } => Ok(CursorPopNodesTerminal::Descent {
                child_index,
                key_remaining,
            }),
            WalkStop::PathDiverted {
                key_child_index,
                key_remaining,
                matched_path,
                unmatched_child_index,
                unmatched_path_remaining,
            } => {
                // Pop irrelevant nodes.
                let match_stopped_steps = matched_path.path_steps();
                self.pop_nodes(match_stopped_steps)?;

                let last_node = self.path_nodes.last().unwrap();
                let started_steps =
                    last_node.get_basic_path_node().path_start_steps;
                let last_trie_node = &last_node.get_basic_path_node().trie_node;

                // The beginning of compressed_path is always aligned at full
                // byte.
                let aligned_path_start_offset = started_steps / 2;
                if started_steps
                    + last_trie_node.compressed_path_ref().path_steps()
                    == match_stopped_steps
                {
                    // The divergence sits exactly at the end of last_node's
                    // compressed path, so no path splitting is needed.
                    if key_child_index.is_none() {
                        // Arrived
                        Ok(CursorPopNodesTerminal::Arrived)
                    } else {
                        Ok(CursorPopNodesTerminal::Descent {
                            child_index: key_child_index.unwrap(),
                            key_remaining,
                        })
                    }
                } else {
                    // PathDiverted
                    if AM::READ_ONLY {
                        Ok(CursorPopNodesTerminal::PathDiverted(
                            WalkStop::path_diverted_uninitialized(),
                        ))
                    } else {
                        // Recompute the matched / unmatched path parts
                        // relative to last_node's own compressed path, fixing
                        // up the nibble masks at both split boundaries.
                        let original_compressed_path_ref =
                            last_trie_node.compressed_path_ref();
                        let original_compressed_path_ref_mask =
                            original_compressed_path_ref.path_mask();
                        let actual_matched_path = CompressedPathRaw::new(
                            &matched_path.path_slice()
                                [aligned_path_start_offset as usize..],
                            CompressedPathRaw::set_second_nibble(
                                matched_path.path_mask(),
                                CompressedPathRaw::second_nibble(
                                    original_compressed_path_ref_mask,
                                ),
                            ),
                        );
                        let actual_unmatched_path_remaining =
                            CompressedPathRaw::new_and_apply_mask(
                                &unmatched_path_remaining.path_slice()[0
                                    ..(original_compressed_path_ref.path_size()
                                        - actual_matched_path.path_size())
                                        as usize],
                                CompressedPathRaw::set_second_nibble(
                                    original_compressed_path_ref_mask,
                                    CompressedPathRaw::second_nibble(
                                        unmatched_path_remaining.path_mask(),
                                    ),
                                ),
                            );

                        Ok(CursorPopNodesTerminal::PathDiverted(
                            WalkStop::PathDiverted {
                                key_child_index,
                                key_remaining,
                                matched_path: actual_matched_path,
                                unmatched_child_index,
                                unmatched_path_remaining:
                                    actual_unmatched_path_remaining,
                            },
                        ))
                    }
                }
            }
        }
    }

    /// Position the cursor at `key`: first pop nodes outside the shared
    /// prefix, then descend and open child nodes until the key is reached,
    /// a child is missing, or the key diverges inside a compressed path.
    pub fn open_path_for_key<'k, AM: access_mode::AccessMode>(
        &mut self, key: &'k [u8],
    ) -> Result<CursorOpenPathTerminal<'k>> {
        // The access_mode is Write here because we need the diverted path
        // information.
        match self.pop_path_for_key::<access_mode::Write>(key) {
            Err(e) => Err(e),
            Ok(CursorPopNodesTerminal::Arrived) => {
                Ok(CursorOpenPathTerminal::Arrived)
            }
            Ok(CursorPopNodesTerminal::PathDiverted(path_diverted)) => {
                Ok(CursorOpenPathTerminal::PathDiverted(path_diverted))
            }
            Ok(CursorPopNodesTerminal::Descent {
                mut key_remaining,
                mut child_index,
            }) => {
                loop {
                    let new_node = match self
                        .path_nodes
                        .last_mut()
                        .unwrap()
                        .open_child_index(child_index)?
                    {
                        Some(node) => node,
                        None => {
                            return Ok(CursorOpenPathTerminal::ChildNotFound {
                                key_remaining,
                                child_index,
                            })
                        }
                    };

                    // Walk the remaining key against the newly opened node's
                    // compressed path.
                    let next_step = walk::<AM, _>(
                        key_remaining.path_slice,
                        &new_node
                            .get_basic_path_node()
                            .trie_node
                            .compressed_path_ref(),
                        &MPT_CURSOR_GET_CHILD,
                    );
                    match &next_step {
                        WalkStop::Arrived => {
                            self.path_nodes.push(new_node);
                            return Ok(CursorOpenPathTerminal::Arrived);
                        }
                        // The scenario of Descent is classified into
                        // ChildNotFound scenario because the checking of
                        // child_index is skipped.
                        WalkStop::Descent { .. } => {
                            unreachable!()
                        }
                        // It actually means to descent.
                        WalkStop::ChildNotFound {
                            child_index: new_child_index,
                            key_remaining: new_key_remaining,
                        } => {
                            self.path_nodes.push(new_node);
                            child_index = *new_child_index;
                            key_remaining = new_key_remaining.clone();
                            continue;
                        }
                        WalkStop::PathDiverted { .. } => {
                            self.path_nodes.push(new_node);
                            // Leave the match to save the path_diverted
                            // information, then break the loop to finally
                            // work on the expanding of compressed path.
                        }
                    }
                    return Ok(CursorOpenPathTerminal::PathDiverted(next_step));
                }
            }
        }
    }
}
287
/// Read-write cursor over a Snapshot Mpt; derefs to [`MptCursor`] for the
/// shared read-only operations.
pub struct MptCursorRw<Mpt, RwPathNode> {
    cursor: MptCursor<Mpt, RwPathNode>,

    // Sticky flag set when a db operation failed; path nodes hold a raw
    // pointer to this cell (see ReadWritePathNode::has_io_error).
    io_error: Cell<bool>,
}
293
294impl<Mpt, RwPathNode> Deref for MptCursorRw<Mpt, RwPathNode> {
295    type Target = MptCursor<Mpt, RwPathNode>;
296
297    fn deref(&self) -> &Self::Target { &self.cursor }
298}
299
300impl<Mpt, RwPathNode> DerefMut for MptCursorRw<Mpt, RwPathNode> {
301    fn deref_mut(&mut self) -> &mut Self::Target { &mut self.cursor }
302}
303
impl<Mpt: GetRwMpt, PathNode: RwPathNodeTrait<Mpt>> MptCursorRw<Mpt, PathNode> {
    /// Create a read-write cursor over `mpt` with a clear io-error flag.
    pub fn new(mpt: Mpt) -> Self {
        Self {
            cursor: MptCursor::new(mpt),
            io_error: Cell::new(false),
        }
    }

    /// Load the root node of the mpt and make it the bottom of the path
    /// stack. Must be called before any traversal method.
    pub fn load_root(
        &mut self, in_reconstruct_snapshot_state: bool,
    ) -> Result<()>
    where Self: CursorToRootNode<Mpt, PathNode> {
        let root_node =
            PathNode::load_root(self, in_reconstruct_snapshot_state)?;
        self.path_nodes.push(root_node);
        Ok(())
    }

    /// Insert `value` at `key`, overwriting any existing value, creating a
    /// new leaf, or forking a compressed path as needed.
    pub fn insert(&mut self, key: &[u8], value: Box<[u8]>) -> Result<()> {
        match self.open_path_for_key::<access_mode::Write>(key)? {
            CursorOpenPathTerminal::Arrived => {
                // A node with this exact path exists: replace its value.
                self.path_nodes
                    .last_mut()
                    .unwrap()
                    .replace_value_valid(value);
            }
            CursorOpenPathTerminal::ChildNotFound {
                child_index,
                key_remaining,
            } => {
                // Create a new node for child_index, key_remaining
                // and value.
                let value_len = value.len();
                let parent_node = self.path_nodes.last_mut().unwrap();
                // NOTE(review): unchecked insertion — presumably sound
                // because the walk terminated with ChildNotFound, i.e. the
                // child slot is vacant; confirm against
                // add_new_child_unchecked's contract.
                unsafe {
                    parent_node
                        .get_read_write_path_node()
                        .trie_node
                        .add_new_child_unchecked(
                            child_index,
                            &SubtreeMerkleWithSize::default(),
                        );
                }
                // Conclude all children before the new one, then point the
                // parent at the child being created.
                parent_node
                    .get_read_write_path_node()
                    .skip_till_child_index(child_index)?;
                parent_node.get_read_write_path_node().next_child_index =
                    child_index;
                let in_reconstruct_snapshot_state =
                    parent_node.in_reconstruct_snapshot_state();
                // The new leaf carries the rest of the key as its compressed
                // path.
                let new_node = PathNode::new(
                    BasicPathNode::new(
                        SnapshotMptNode(VanillaTrieNode::new(
                            MERKLE_NULL_NODE,
                            Default::default(),
                            Some(value),
                            key_remaining.into(),
                        )),
                        parent_node.take_mpt(),
                        &parent_node
                            .get_read_write_path_node()
                            .full_path_to_node,
                        child_index,
                        in_reconstruct_snapshot_state,
                    ),
                    parent_node,
                    value_len,
                );
                self.path_nodes.push(new_node);
            }
            CursorOpenPathTerminal::PathDiverted(WalkStop::PathDiverted {
                key_child_index,
                key_remaining,
                matched_path,
                unmatched_child_index,
                unmatched_path_remaining,
            }) => {
                // The key diverges inside last_node's compressed path: split
                // that path by creating a fork node for the matched part.
                let last_node = self.path_nodes.pop().unwrap();
                let parent_node = self.path_nodes.last_mut().unwrap();

                let value_len = value.len();
                let insert_value_at_fork = key_child_index.is_none();
                let mut last_node_as_child = SubtreeMerkleWithSize::default();
                // Set the size of the original Snapshot mpt. When the update in
                // the subtree of last node is finished, The new mpt's subtree
                // size is computed from the subtree_size_delta, and the
                // subtree_size_delta is propagated upward.
                last_node_as_child.subtree_size = (last_node
                    .get_read_only_path_node()
                    .trie_node
                    .subtree_size(
                        last_node.get_read_only_path_node().get_path_to_node(),
                    ) as i64
                    - last_node.get_read_only_path_node().subtree_size_delta)
                    as u64;
                let mut fork_node = PathNode::new(
                    BasicPathNode::new(
                        SnapshotMptNode(VanillaTrieNode::new(
                            MERKLE_NULL_NODE,
                            VanillaChildrenTable::new_from_one_child(
                                unmatched_child_index,
                                &last_node_as_child,
                            ),
                            // The value isn't set when insert_value_at_fork
                            // because the compiler
                            // wants to make sure value isn't moved in the
                            // Some() match branch below.
                            None,
                            matched_path,
                        )),
                        None,
                        &parent_node.get_basic_path_node().full_path_to_node,
                        parent_node.get_basic_path_node().next_child_index,
                        parent_node.in_reconstruct_snapshot_state(),
                    ),
                    parent_node,
                    if insert_value_at_fork { value_len } else { 0 },
                );

                // "delete" last node when necessary, and create a new node for
                // the unmatched path.
                let mut unmatched_child_node = last_node
                    .unmatched_child_node_for_path_diversion(
                        CompressedPathRaw::extend_path(
                            &fork_node.get_basic_path_node().full_path_to_node,
                            unmatched_child_index,
                        ),
                        unmatched_path_remaining,
                    )?;

                // Unmatched path on the right hand side.
                if key_child_index.is_none()
                    || key_child_index.unwrap() < unmatched_child_index
                {
                    // The unmatched subtree is not yet opened for operations,
                    // so it's kept under maybe_compressed_path_split_child_*
                    // fields of the fork node and will be processed later.
                    fork_node.get_basic_path_node_mut().mpt =
                        unmatched_child_node.take_mpt();
                    fork_node
                        .get_read_write_path_node()
                        .maybe_compressed_path_split_child_index =
                        unmatched_child_index;
                    fork_node
                        .get_read_write_path_node()
                        .maybe_compressed_path_split_child_node =
                        Some(Box::new(unmatched_child_node));
                } else {
                    // Path forked on the right side, the update in the subtree
                    // of last_node has finished.
                    // Commit last_node with parent as fork_node.
                    fork_node.get_read_write_path_node().next_child_index =
                        unmatched_child_index;
                    fork_node.get_read_write_path_node().mpt =
                        unmatched_child_node
                            .commit(fork_node.get_read_write_path_node())?;
                }
                match key_child_index {
                    Some(child_index) => unsafe {
                        // Move on to the next child.
                        fork_node.get_read_write_path_node().next_child_index =
                            child_index;
                        // NOTE(review): unchecked — the key's branch at the
                        // fork is distinct from unmatched_child_index, so the
                        // slot should be vacant; confirm against
                        // add_new_child_unchecked's contract.
                        fork_node
                            .get_read_write_path_node()
                            .trie_node
                            .add_new_child_unchecked(
                                child_index,
                                &SubtreeMerkleWithSize::default(),
                            );

                        // New leaf for the key's remaining path and value.
                        let value_node = PathNode::new(
                            BasicPathNode::new(
                                SnapshotMptNode(VanillaTrieNode::new(
                                    MERKLE_NULL_NODE,
                                    Default::default(),
                                    Some(value),
                                    key_remaining.into(),
                                )),
                                fork_node.take_mpt(),
                                &fork_node
                                    .get_basic_path_node()
                                    .full_path_to_node,
                                child_index,
                                parent_node.in_reconstruct_snapshot_state(),
                            ),
                            &fork_node,
                            value_len,
                        );

                        self.path_nodes.push(fork_node);
                        self.path_nodes.push(value_node);
                    },
                    None => {
                        // The key ends exactly at the fork: the fork node
                        // itself carries the value.
                        fork_node
                            .get_read_write_path_node()
                            .trie_node
                            .replace_value_valid(value);

                        self.path_nodes.push(fork_node);
                    }
                }
            }
            _ => {
                unreachable!()
            }
        }

        Ok(())
    }

    /// Delete the value at `key`; a warning is logged when the key does not
    /// exist.
    pub fn delete(&mut self, key: &[u8]) -> Result<()> {
        match self.open_path_for_key::<access_mode::Read>(key)? {
            CursorOpenPathTerminal::Arrived => {
                let last_node = self.path_nodes.last_mut().unwrap();
                if last_node.get_read_only_path_node().trie_node.has_value() {
                    last_node.delete_value_assumed_existence();
                // The action here is as simple as delete the value. It
                // seems tricky if there is no children left, or if
                // there is only one children left. Actually, the
                // deletion of a node value happens before all subtree
                // actions. The skip_till_child_index method will take
                // care of all children, the committing will take care
                // of path merge and node deletion.
                } else {
                    warn!(
                        "In MptCursorRw, non-existing key {:?} is asked to be deleted.",
                        key.to_hex::<String>());
                }
            }
            CursorOpenPathTerminal::PathDiverted(_) => {
                warn!(
                    "In MptCursorRw, non-existing key {:?} is asked to be deleted.",
                    key.to_hex::<String>());
            }
            CursorOpenPathTerminal::ChildNotFound { .. } => {
                warn!(
                    "In MptCursorRw, non-existing key {:?} is asked to be deleted.",
                    key.to_hex::<String>());
            }
        }
        Ok(())
    }

    /// Copy every node below subtree_root from the read mpt into the write
    /// mpt, marking the io-error flag on any db failure.
    fn copy_subtree_without_root(
        subtree_root: &mut ReadWritePathNode<Mpt>,
    ) -> Result<()> {
        let (dest_mpt, source_mpt) = subtree_root
            .basic_node
            .mpt
            .as_mut_assumed_owner()
            .get_write_and_read_mpt();
        let mut iter =
            match source_mpt.unwrap().iterate_subtree_trie_nodes_without_root(
                &subtree_root.basic_node.full_path_to_node,
            ) {
                Err(e) => {
                    subtree_root.has_io_error.set_has_io_error();
                    bail!(e);
                }
                Ok(iter) => iter,
            };
        loop {
            if let Some((path, snapshot_mpt_node)) = match iter.next() {
                Err(e) => {
                    subtree_root.has_io_error.set_has_io_error();
                    bail!(e);
                }
                Ok(item) => item,
            } {
                let result = dest_mpt.write_node(&path, &snapshot_mpt_node);
                if result.is_err() {
                    subtree_root.has_io_error.set_has_io_error();
                    return result;
                }
                continue;
            }
            break;
        }
        Ok(())
    }
}
585
586pub trait CursorLoadNodeWrapper<Mpt>: TakeMpt<Mpt> {
587    fn load_node_wrapper<'a>(
588        &self, mpt: &mut Mpt, path: &CompressedPathRaw,
589    ) -> Result<SnapshotMptNode>;
590}
591
592impl<Mpt: GetReadMpt> CursorLoadNodeWrapper<Mpt> for BasicPathNode<Mpt> {
593    fn load_node_wrapper<'a>(
594        &self, mpt: &mut Mpt, path: &CompressedPathRaw,
595    ) -> Result<SnapshotMptNode> {
596        mpt.get_read_mpt()
597            .load_node(path)?
598            .ok_or(Error::from(Error::SnapshotMPTTrieNodeNotFound))
599    }
600}
601
602impl<Mpt: GetReadMpt, PathNode> CursorLoadNodeWrapper<Mpt>
603    for MptCursor<Mpt, PathNode>
604{
605    fn load_node_wrapper(
606        &self, mpt: &mut Mpt, path: &CompressedPathRaw,
607    ) -> Result<SnapshotMptNode> {
608        mpt.get_read_mpt()
609            .load_node(path)?
610            .ok_or(Error::from(Error::SnapshotMPTTrieNodeNotFound))
611    }
612}
613
614/// General implementation for ReadWrite path nodes and cursor.
615impl<Mpt: GetReadMpt, T: CursorSetIoError + TakeMpt<Mpt>>
616    CursorLoadNodeWrapper<Mpt> for T
617{
618    fn load_node_wrapper(
619        &self, mpt: &mut Mpt, path: &CompressedPathRaw,
620    ) -> Result<SnapshotMptNode> {
621        match mpt.get_read_mpt().load_node(path) {
622            Err(e) => {
623                self.set_has_io_error();
624
625                Err(e)
626            }
627            Ok(Some(node)) => Ok(node),
628            Ok(None) => {
629                self.set_has_io_error();
630
631                Err(Error::from(Error::SnapshotMPTTrieNodeNotFound))
632            }
633        }
634    }
635}
636
/// Build the concrete root path-node type from a freshly loaded basic root
/// node.
pub trait CursorToRootNode<Mpt: GetReadMpt, PathNode: PathNodeTrait<Mpt>> {
    // mpt_is_empty tells the read-write implementation whether there is an
    // on-disk root (see the ReadWritePathNode impl's is_loaded field).
    fn new_root(
        &self, basic_node: BasicPathNode<Mpt>, mpt_is_empty: bool,
    ) -> PathNode;
}
642
impl<Mpt: GetReadMpt, Cursor: CursorLoadNodeWrapper<Mpt>>
    CursorToRootNode<Mpt, BasicPathNode<Mpt>> for Cursor
{
    /// For read-only cursors the root node is the basic node itself;
    /// mpt_is_empty is irrelevant because nothing is written back.
    fn new_root(
        &self, basic_node: BasicPathNode<Mpt>, _mpt_is_empty: bool,
    ) -> BasicPathNode<Mpt> {
        basic_node
    }
}
652
impl<Mpt: GetRwMpt, Cursor: CursorLoadNodeWrapper<Mpt> + CursorSetIoError>
    CursorToRootNode<Mpt, ReadWritePathNode<Mpt>> for Cursor
{
    /// Wrap the basic root node into a read-write path node with fresh
    /// bookkeeping state.
    fn new_root(
        &self, basic_node: BasicPathNode<Mpt>, mpt_is_empty: bool,
    ) -> ReadWritePathNode<Mpt> {
        ReadWritePathNode {
            basic_node,
            // An empty mpt has no on-disk root that could have been loaded.
            is_loaded: !mpt_is_empty,
            maybe_first_realized_child_index:
                ReadWritePathNode::<Mpt>::NULL_CHILD_INDEX,
            the_first_child_if_pending: None,
            maybe_compressed_path_split_child_index:
                ReadWritePathNode::<Mpt>::NULL_CHILD_INDEX,
            maybe_compressed_path_split_child_node: None,
            subtree_size_delta: 0,
            delta_subtree_size: 0,
            // Share the cursor's io-error flag with the node.
            has_io_error: self.io_error(),
            db_committed: false,
        }
    }
}
675
/// A node on the cursor's path, with the positional information needed to
/// locate it inside the mpt.
pub struct BasicPathNode<Mpt> {
    // The mpt handle travels down the path: only the node currently being
    // operated on holds it (see MptCursor::pop_one_node).
    pub mpt: Option<Mpt>,

    pub trie_node: SnapshotMptNode,

    /// The fields below are necessary for loading the requested key / values.
    // path_start_steps is changed when compressed path changes happen, but it
    // doesn't matter because it only happens to node removed from current MPT
    // path.
    path_start_steps: u16,
    // full_path_to_node is changed when combining compressed path.
    // But changes doesn't matter because when it happens it's already
    // popped out from current MPT path.
    full_path_to_node: CompressedPathRaw,
    // The path_db_key changes when breaking a compressed path.
    // The path_db_key must be corrected before writing out the node.
    path_db_key: CompressedPathRaw,

    /// The next child index to look into.
    pub next_child_index: u8,

    pub in_reconstruct_snapshot_state: bool,
}
699
700impl<Mpt> BasicPathNode<Mpt> {
701    fn new(
702        trie_node: SnapshotMptNode, mpt: Option<Mpt>,
703        parent_path: &CompressedPathRaw, child_index: u8,
704        in_reconstruct_snapshot_state: bool,
705    ) -> Self {
706        let full_path_to_node = CompressedPathRaw::join_connected_paths(
707            parent_path,
708            child_index,
709            &trie_node.compressed_path_ref(),
710        );
711
712        let path_db_key =
713            CompressedPathRaw::extend_path(parent_path, child_index);
714        Self {
715            mpt,
716            trie_node,
717            path_start_steps: path_db_key.path_steps(),
718            full_path_to_node,
719            path_db_key,
720            next_child_index: 0,
721            in_reconstruct_snapshot_state,
722        }
723    }
724
725    pub fn get_path_to_node(&self) -> &CompressedPathRaw {
726        &self.full_path_to_node
727    }
728}
729
730impl<Mpt> Deref for BasicPathNode<Mpt> {
731    type Target = SnapshotMptNode;
732
733    fn deref(&self) -> &Self::Target { &self.trie_node }
734}
735
/// A path node with the extra bookkeeping required for modifying the mpt:
/// path-merge tracking, compressed-path splitting, subtree size deltas and
/// io-error / commit state.
pub struct ReadWritePathNode<Mpt> {
    basic_node: BasicPathNode<Mpt>,

    // Whether the node was read from the mpt (as opposed to freshly created
    // in memory, e.g. an empty root or a fork node).
    is_loaded: bool,

    /// When the node has only one child and no value after operations in its
    /// subtree, the node should be combined with its child. We keep the
    /// child index to join the path.
    ///
    /// The node has only one child <==>
    /// maybe_first_realized_child_index != NULL_CHILD_INDEX &&
    /// the_first_child_if_pending.is_some()
    ///
    /// These two fields are maintained based on the concluded subtree so far.
    maybe_first_realized_child_index: u8,
    the_first_child_if_pending: Option<Box<ReadWritePathNode<Mpt>>>,

    /// If the node is created by breaking compressed_path of a node in the
    /// original mpt, a new child node is created for the remaining part of the
    /// compressed path with the original children table. We must keep the
    /// new child, since we may recurse into the subtree of the new child
    /// later.
    maybe_compressed_path_split_child_index: u8,
    maybe_compressed_path_split_child_node: Option<Box<ReadWritePathNode<Mpt>>>,

    /// For SnapshotMpt
    subtree_size_delta: i64,
    /// For DeltaMpt which is the difference between current snapshot and the
    /// parent snapshots.
    delta_subtree_size: u64,

    // Raw pointer into the owning cursor's io_error Cell (set in
    // CursorToRootNode::new_root from CursorSetIoError::io_error).
    has_io_error: *const Cell<bool>,
    // Whether this node has already been written out to the db.
    db_committed: bool,
}
770
771impl<Mpt> Deref for ReadWritePathNode<Mpt> {
772    type Target = BasicPathNode<Mpt>;
773
774    fn deref(&self) -> &Self::Target { &self.basic_node }
775}
776
777impl<Mpt> DerefMut for ReadWritePathNode<Mpt> {
778    fn deref_mut(&mut self) -> &mut Self::Target { &mut self.basic_node }
779}
780
/// Common behavior of cursor path nodes: loading nodes from the mpt,
/// concluding ("committing") them, and descending into children.
///
/// The `Drop` bound forces every implementor to define drop behavior
/// (e.g. uncommitted-node detection for the read-write node).
#[allow(drop_bounds)]
pub trait PathNodeTrait<Mpt: GetReadMpt>:
    CursorLoadNodeWrapper<Mpt> + Drop + Sized
{
    /// Construct a path node from node data loaded from the mpt.
    fn new_loaded(basic_node: BasicPathNode<Mpt>, parent_node: &Self) -> Self;

    /// Conclude this node into its parent, releasing the held mpt.
    ///
    /// A default no-op implementation to suppress compiler warnings
    /// about "patterns aren't allowed in methods without bodies"
    /// https://github.com/rust-lang/rust/issues/35203
    fn commit(mut self, _parent_node: &mut Self) -> Result<Option<Mpt>> {
        Ok(self.take_mpt())
    }

    /// Conclude the root node, returning the merkle root of the whole mpt.
    ///
    /// A default no-op implementation to suppress compiler warnings
    /// about "patterns aren't allowed in methods without bodies"
    /// https://github.com/rust-lang/rust/issues/35203
    fn commit_root(
        mut self, mpt_taken: &mut Option<Mpt>,
    ) -> Result<MerkleHash> {
        *mpt_taken = self.take_mpt();
        Ok(self.get_basic_path_node().trie_node.get_merkle().clone())
    }

    fn get_basic_path_node(&self) -> &BasicPathNode<Mpt>;

    fn get_basic_path_node_mut(&mut self) -> &mut BasicPathNode<Mpt>;

    /// Load the root node from the cursor's mpt. When the mpt has no root
    /// node at all (e.g. the Genesis snapshot), an empty root with the null
    /// merkle is synthesized instead of returning an error.
    fn load_root<
        Cursor: CursorLoadNodeWrapper<Mpt> + CursorToRootNode<Mpt, Self>,
    >(
        cursor: &mut Cursor, in_reconstruct_snapshot_state: bool,
    ) -> Result<Self> {
        let mut mpt = cursor.take_mpt();
        // Special case for Genesis snapshot, where the mpt is a non-existent
        // db, to which the load_node_wrapper call fails.
        let mpt_is_empty;
        let root_trie_node = match cursor.load_node_wrapper(
            mpt.as_mut_assumed_owner(),
            &CompressedPathRaw::default(),
        ) {
            Ok(node) => {
                mpt_is_empty = false;

                node
            }
            Err(e) => match e {
                Error::SnapshotMPTTrieNodeNotFound => {
                    mpt_is_empty = true;

                    // Synthesize an empty root so the cursor can operate on
                    // (and populate) the empty mpt.
                    SnapshotMptNode(VanillaTrieNode::new(
                        MERKLE_NULL_NODE,
                        Default::default(),
                        None,
                        CompressedPathRaw::default(),
                    ))
                }
                _ => {
                    bail!(e);
                }
            },
        };

        Ok(cursor.new_root(
            BasicPathNode {
                mpt,
                trie_node: root_trie_node,
                path_start_steps: 0,
                full_path_to_node: Default::default(),
                path_db_key: Default::default(),
                next_child_index: 0,
                in_reconstruct_snapshot_state,
            },
            mpt_is_empty,
        ))
    }

    /// Load the child at `node_child_index` of `parent_node` and check its
    /// merkle against the one recorded in the parent. In snapshot
    /// reconstruction mode a mismatch is only logged; otherwise it asserts,
    /// because a mismatch indicates inconsistent db data.
    fn load_into(
        parent_node: &Self, mpt: &mut Option<Mpt>, node_child_index: u8,
        supposed_merkle_root: &MerkleHash, in_reconstruct_snapshot_state: bool,
    ) -> Result<Self> {
        let parent_path = &parent_node.get_basic_path_node().full_path_to_node;

        let path_db_key =
            CompressedPathRaw::extend_path(parent_path, node_child_index);

        let trie_node = parent_node
            .load_node_wrapper(mpt.as_mut().unwrap(), &path_db_key)?;

        if in_reconstruct_snapshot_state {
            if trie_node.get_merkle() != supposed_merkle_root {
                warn!("loaded trie node merkle hash {:?} != supposed merkle hash {:?}, path_db_key={:?}",
                trie_node.get_merkle(),
                supposed_merkle_root,
                path_db_key,);
            }
        } else {
            assert_eq!(
                trie_node.get_merkle(),
                supposed_merkle_root,
                "loaded trie node merkle hash {:?} != supposed merkle hash {:?}, path_db_key={:?}",
                trie_node.get_merkle(),
                supposed_merkle_root,
                path_db_key,
            );
        }

        let full_path_to_node = CompressedPathRaw::join_connected_paths(
            parent_path,
            node_child_index,
            &trie_node.compressed_path_ref(),
        );

        Ok(Self::new_loaded(
            BasicPathNode {
                mpt: mpt.take(),
                trie_node,
                path_start_steps: path_db_key.path_steps(),
                full_path_to_node,
                path_db_key,
                next_child_index: 0,
                in_reconstruct_snapshot_state,
            },
            parent_node,
        ))
    }

    /// Set the cursor position to `child_index` and load the child when the
    /// children table has an entry there.
    fn open_child_index(&mut self, child_index: u8) -> Result<Option<Self>>;

    fn in_reconstruct_snapshot_state(&self) -> bool;
}
911
// TODO: What's the value of RwPathNodeTrait? It seems sufficient now to have
// only ReadWritePathNode. We may have a chance to move the subtree_size
// related fields out of ReadWritePathNode and create a new
// SnapshotMptRWPathNode for maintenance of subtree size.
916pub trait RwPathNodeTrait<Mpt: GetRwMpt>: PathNodeTrait<Mpt> {
917    fn new(
918        basic_node: BasicPathNode<Mpt>, parent_node: &Self, value_size: usize,
919    ) -> Self {
920        let mut this_node = Self::new_loaded(basic_node, parent_node);
921        this_node.get_read_write_path_node().is_loaded = false;
922
923        if value_size > 0 {
924            let key_size = this_node
925                .get_basic_path_node()
926                .full_path_to_node
927                .path_size();
928            let new_key_value_rlp_size =
929                rlp_key_value_len(key_size, value_size);
930            this_node.get_read_write_path_node().subtree_size_delta =
931                new_key_value_rlp_size as i64;
932            this_node.get_read_write_path_node().delta_subtree_size =
933                new_key_value_rlp_size;
934        }
935
936        this_node
937    }
938
939    fn get_read_write_path_node(&mut self) -> &mut ReadWritePathNode<Mpt>;
940
941    fn get_read_only_path_node(&self) -> &ReadWritePathNode<Mpt>;
942
943    fn unmatched_child_node_for_path_diversion(
944        self, new_path_db_key: CompressedPathRaw,
945        new_compressed_path: CompressedPathRaw,
946    ) -> Result<ReadWritePathNode<Mpt>>;
947
948    fn replace_value_valid(&mut self, value: Box<[u8]>);
949
950    fn delete_value_assumed_existence(&mut self);
951}
952
953impl<Mpt: GetReadMpt> PathNodeTrait<Mpt> for BasicPathNode<Mpt> {
954    fn new_loaded(basic_node: BasicPathNode<Mpt>, _parent_node: &Self) -> Self {
955        basic_node
956    }
957
958    fn get_basic_path_node(&self) -> &BasicPathNode<Mpt> { self }
959
960    fn get_basic_path_node_mut(&mut self) -> &mut BasicPathNode<Mpt> { self }
961
962    fn open_child_index(&mut self, child_index: u8) -> Result<Option<Self>> {
963        self.next_child_index = child_index;
964
965        match self
966            .trie_node
967            .get_children_table_ref()
968            .get_child(child_index)
969        {
970            None => Ok(None),
971
972            Some(&SubtreeMerkleWithSize {
973                merkle: ref supposed_merkle_hash,
974                ..
975            }) => {
976                let mut mpt_taken = self.mpt.take();
977                Ok(Some(Self::load_into(
978                    self,
979                    &mut mpt_taken,
980                    child_index,
981                    supposed_merkle_hash,
982                    self.in_reconstruct_snapshot_state,
983                )?))
984            }
985        }
986    }
987
988    fn in_reconstruct_snapshot_state(&self) -> bool {
989        self.in_reconstruct_snapshot_state
990    }
991}
992
impl<Mpt: GetRwMpt> PathNodeTrait<Mpt> for ReadWritePathNode<Mpt> {
    /// Wrap a loaded `BasicPathNode`, inheriting the shared io-error flag
    /// from the parent node.
    fn new_loaded(basic_node: BasicPathNode<Mpt>, parent_node: &Self) -> Self {
        Self {
            basic_node,
            is_loaded: true,
            maybe_first_realized_child_index: Self::NULL_CHILD_INDEX,
            the_first_child_if_pending: None,
            maybe_compressed_path_split_child_index: Self::NULL_CHILD_INDEX,
            maybe_compressed_path_split_child_node: None,
            subtree_size_delta: 0,
            delta_subtree_size: 0,
            has_io_error: parent_node.has_io_error,
            db_committed: false,
        }
    }

    fn get_basic_path_node(&self) -> &BasicPathNode<Mpt> { &self.basic_node }

    fn get_basic_path_node_mut(&mut self) -> &mut BasicPathNode<Mpt> {
        &mut self.basic_node
    }

    /// Conclude this node: finish all remaining children, merge a pending
    /// single child into this node (path compression) when this node has no
    /// value and exactly one child, compute the merkle, then hand the node
    /// over to the parent via `set_concluded_child`.
    fn commit(mut self, parent: &mut Self) -> Result<Option<Mpt>> {
        self.skip_till_child_index(CHILDREN_COUNT as u8)?;
        if !self.is_node_empty() {
            let mut maybe_pending_child =
                self.the_first_child_if_pending.take();
            if let Some(pending_child) = maybe_pending_child.as_mut() {
                // Handle path compression. Move the VanillaTrieNode from child
                // into self with appropriate modifications.
                if !self.trie_node.has_value()
                    && self.trie_node.get_children_count() == 1
                {
                    // Since the current trie node is empty, we
                    // update the child_node and replace current trie_node
                    // with it.
                    //
                    // The subtree size isn't affected.
                    let child_trie_node = &mut pending_child.trie_node;
                    let new_path = CompressedPathRaw::join_connected_paths(
                        &self.trie_node.compressed_path_ref(),
                        self.maybe_first_realized_child_index,
                        &child_trie_node.compressed_path_ref(),
                    );

                    child_trie_node.set_compressed_path(new_path);

                    // The child's trie node is left empty so that the
                    // write_out below deletes/skips it.
                    self.trie_node =
                        mem::replace(child_trie_node, Default::default());
                }
                // Write out the child, reset to empty in case of path
                // compression, or write out without change.
                Self::write_out_pending_child(
                    &mut self.mpt,
                    &mut maybe_pending_child,
                )?;
            }
            self.compute_merkle(self.path_start_steps % 2 != 0);
        }

        parent.set_concluded_child(self)
    }

    /// Conclude the root node and return the merkle root of the mpt
    /// (MERKLE_NULL_NODE when the mpt ended up empty).
    fn commit_root(
        mut self, mpt_taken: &mut Option<Mpt>,
    ) -> Result<MerkleHash> {
        self.skip_till_child_index(CHILDREN_COUNT as u8)?;
        if self.is_node_empty() {
            *mpt_taken = self.write_out()?;
            Ok(MERKLE_NULL_NODE)
        } else {
            Self::write_out_pending_child(
                &mut self.basic_node.mpt,
                &mut self.the_first_child_if_pending,
            )?;
            let merkle = self
                .compute_merkle(/* path_without_first_nibble = */ false);
            *mpt_taken = self.write_out()?;
            Ok(merkle)
        }
    }

    /// Conclude all skipped children before `child_index`, then open that
    /// child — preferring a child previously split out of this node's
    /// compressed path over loading from the mpt.
    fn open_child_index(&mut self, child_index: u8) -> Result<Option<Self>> {
        self.skip_till_child_index(child_index)?;

        match Self::open_maybe_split_compressed_path_node(
            self.maybe_compressed_path_split_child_index,
            &mut self.maybe_compressed_path_split_child_node,
            child_index,
        )? {
            Some(mut node) => {
                self.next_child_index = child_index;
                node.mpt = self.take_mpt();

                Ok(Some(*node))
            }
            None => match self.basic_node.open_child_index(child_index) {
                Err(e) => Err(e),
                Ok(None) => Ok(None),
                Ok(Some(new_basic_node)) => {
                    Ok(Some(Self::new_loaded(new_basic_node, self)))
                }
            },
        }
    }

    fn in_reconstruct_snapshot_state(&self) -> bool {
        self.in_reconstruct_snapshot_state
    }
}
1103
impl<Mpt: GetRwMpt> RwPathNodeTrait<Mpt> for ReadWritePathNode<Mpt> {
    fn get_read_write_path_node(&mut self) -> &mut ReadWritePathNode<Mpt> {
        self
    }

    fn get_read_only_path_node(&self) -> &ReadWritePathNode<Mpt> { self }

    /// When a node's compressed path is split under PathDiverted insertion, we
    /// must "remove" the original node, create a new node for the
    /// unmatched_path and children_table.
    fn unmatched_child_node_for_path_diversion(
        mut self, new_path_db_key: CompressedPathRaw,
        new_compressed_path: CompressedPathRaw,
    ) -> Result<Self> {
        let mut child_node;
        if self.is_loaded {
            // The original node exists in db: move its trie node and all
            // bookkeeping state into a fresh child keyed by new_path_db_key,
            // then let write_out handle the original (now empty) db entry.
            child_node = Self {
                basic_node: BasicPathNode {
                    mpt: None,
                    trie_node: Default::default(),
                    path_start_steps: new_path_db_key.path_steps(),
                    full_path_to_node: self.full_path_to_node.clone(),
                    path_db_key: new_path_db_key,
                    next_child_index: self.next_child_index,
                    in_reconstruct_snapshot_state: self
                        .in_reconstruct_snapshot_state,
                },
                is_loaded: false,
                maybe_first_realized_child_index: self
                    .maybe_first_realized_child_index,
                the_first_child_if_pending: self
                    .the_first_child_if_pending
                    .take(),
                maybe_compressed_path_split_child_index: self
                    .maybe_compressed_path_split_child_index,
                maybe_compressed_path_split_child_node: self
                    .maybe_compressed_path_split_child_node
                    .take(),
                subtree_size_delta: self.subtree_size_delta,
                delta_subtree_size: self.delta_subtree_size,
                has_io_error: self.has_io_error,
                db_committed: self.db_committed,
            };

            mem::swap(&mut child_node.trie_node, &mut self.trie_node);
            child_node.mpt = self.write_out()?;
        } else {
            // The node was never in db; simply re-key it in place.
            self.get_basic_path_node_mut().path_start_steps =
                new_path_db_key.path_steps();
            self.get_basic_path_node_mut().path_db_key = new_path_db_key;
            child_node = self;
        };

        child_node
            .trie_node
            .set_compressed_path(new_compressed_path);

        Ok(child_node)
    }

    /// Set the node's value, maintaining the subtree rlp-size delta: the new
    /// key/value rlp size minus the replaced value's size, when one existed.
    fn replace_value_valid(&mut self, value: Box<[u8]>) {
        let key_len = self.full_path_to_node.path_size();
        let mut size_delta = rlp_key_value_len(key_len, value.len()) as i64;
        let maybe_old_value = self.trie_node.replace_value_valid(value);
        match maybe_old_value {
            MptValue::None => {
                // No-op
            }
            MptValue::TombStone => {
                // There is no TombStone in Snapshot MPT.
                unreachable!()
            }
            MptValue::Some(old_value) => {
                size_delta -= rlp_key_value_len(key_len, old_value.len()) as i64
            }
        }

        self.subtree_size_delta += size_delta;
    }

    /// Delete a value that is known to exist, maintaining both subtree-size
    /// counters.
    fn delete_value_assumed_existence(&mut self) {
        let old_value = unsafe { self.trie_node.delete_value_unchecked() };
        let key_size = self.full_path_to_node.path_size();
        self.subtree_size_delta -=
            rlp_key_value_len(key_size, old_value.len()) as i64;
        // The "delta" for marked deletion is considered (key, "").
        self.delta_subtree_size += rlp_key_value_len(key_size, 0);
    }
}
1193
impl<Mpt> ReadWritePathNode<Mpt> {
    /// Initial value for `self.first_realized_child_index`, meaning there is
    /// no child concluded in cursor iteration.
    const NULL_CHILD_INDEX: u8 = 16;

    /// Make the node behave as if a child was already concluded, so no child
    /// is ever kept pending for path compression on this node.
    pub fn disable_path_compression(&mut self) {
        self.maybe_first_realized_child_index = 0;
    }

    /// Whether an io error was flagged on the shared cursor error cell.
    fn get_has_io_error(&self) -> bool { self.io_error().get() }

    /// A node is empty when it holds neither a value nor any child.
    fn is_node_empty(&self) -> bool {
        !self.trie_node.has_value() && self.trie_node.get_children_count() == 0
    }

    /// Compute the node's merkle from its children merkles and compressed
    /// path, store it into the trie node, and return it.
    fn compute_merkle(
        &mut self, path_without_first_nibble: bool,
    ) -> MerkleHash {
        let path_merkle = self.trie_node.compute_merkle(
            self.trie_node.get_children_merkles().as_ref(),
            path_without_first_nibble,
        );
        self.trie_node.set_merkle(&path_merkle);

        path_merkle
    }
}
1221
impl<Mpt: GetRwMpt> ReadWritePathNode<Mpt> {
    /// Persist this node: delete it from the write mpt when it became empty
    /// (only in in-place mode, and only if it was actually loaded from db),
    /// otherwise write it out; then release the held mpt.
    fn write_out(mut self) -> Result<Option<Mpt>> {
        // There is nothing to worry about for path_db_key changes in case of
        // path compression changes, because db changes is as simple as
        // data overwriting / deletion / creation.
        if self.is_node_empty() {
            // In-place mode.
            let io_mpts = self.basic_node.mpt.as_mut_assumed_owner();
            if io_mpts.is_in_place_update() && self.is_loaded {
                let result = io_mpts
                    .get_write_mpt()
                    .delete_node(&self.basic_node.path_db_key);
                if result.is_err() {
                    self.set_has_io_error();
                    bail!(result.unwrap_err());
                }
            }
        } else {
            let result = self
                .basic_node
                .mpt
                .as_mut_assumed_owner()
                .get_write_mpt()
                .write_node(
                    &self.basic_node.path_db_key,
                    &self.basic_node.trie_node,
                );
            if result.is_err() {
                self.set_has_io_error();
                bail!(result.unwrap_err());
            }
        }

        // Mark committed so the Drop check does not fire for this node.
        self.db_committed = true;
        Ok(self.take_mpt())
    }

    /// We have to pass struct fields because of borrow checker.
    ///
    /// Returns the split-out child iff `child_index` is exactly the index at
    /// which this node's compressed path was split; None otherwise.
    fn open_maybe_split_compressed_path_node(
        maybe_compressed_path_split_child_index: u8,
        maybe_compressed_path_split_child_node: &mut Option<Box<Self>>,
        child_index: u8,
    ) -> Result<Option<Box<Self>>> {
        if child_index == maybe_compressed_path_split_child_index {
            Ok(maybe_compressed_path_split_child_node.take())
        } else {
            Ok(None)
        }
    }

    /// Conclude every child with index in [next_child_index, child_index)
    /// that the cursor skipped over, keeping subtree sizes, pending
    /// (path-compression) children, and save-as copies up to date.
    fn skip_till_child_index(&mut self, child_index: u8) -> Result<()> {
        let is_save_as_mode =
            self.mpt.as_ref_assumed_owner().is_save_as_write();
        for (
            this_child_index,
            &SubtreeMerkleWithSize {
                merkle: ref this_child_node_merkle_ref,
                ..
            },
        ) in self
            .basic_node
            .trie_node
            .get_children_table_ref()
            .iter()
            .set_start_index(self.next_child_index)
        {
            if this_child_index < child_index {
                let child_node;
                // Handle compressed path logics for concluded subtrees.
                // The value of the root node is guaranteed to be settled before
                // changes in the subtree.
                //
                // Even though this child isn't modified, path
                // compression may happen if all
                // later children are deleted.
                let must_load_child_node = !self.trie_node.has_value()
                    && self.maybe_first_realized_child_index
                        == Self::NULL_CHILD_INDEX;

                if !must_load_child_node {
                    //  Path compression is unnecessary.
                    Self::write_out_pending_child(
                        &mut self.basic_node.mpt,
                        &mut self.the_first_child_if_pending,
                    )?;
                }
                child_node = match Self::open_maybe_split_compressed_path_node(
                    self.maybe_compressed_path_split_child_index,
                    &mut self.maybe_compressed_path_split_child_node,
                    this_child_index,
                )? {
                    Some(mut child_node) => {
                        child_node.mpt = self.basic_node.mpt.take();
                        child_node.compute_merkle(
                            child_node.path_start_steps % 2 != 0,
                        );

                        if is_save_as_mode {
                            MptCursorRw::<Mpt, Self>::copy_subtree_without_root(
                                        &mut child_node,
                                    )?;
                        }

                        child_node
                    }
                    None => {
                        if is_save_as_mode || must_load_child_node {
                            let mut mpt_taken = self.basic_node.mpt.take();
                            let mut child_node =
                                Box::new(ReadWritePathNode::load_into(
                                    self,
                                    &mut mpt_taken,
                                    this_child_index,
                                    this_child_node_merkle_ref,
                                    self.in_reconstruct_snapshot_state,
                                )?);

                            if is_save_as_mode {
                                MptCursorRw::<Mpt, Self>::copy_subtree_without_root(
                                            &mut child_node,
                                        )?;
                            }

                            child_node
                        } else {
                            continue;
                        }
                    }
                };

                // NOTE(review): this conjures a second mutable alias of
                // `self` while the children-table iterator above still
                // borrows `self.basic_node.trie_node`, and
                // set_concluded_child mutates the visited entry via
                // replace_child_unchecked / delete_child_unchecked.
                // Soundness relies on the iterator tolerating in-place
                // mutation — worth auditing / restructuring safely.
                unsafe {
                    let mut_self = &mut *(self as *const Self as *mut Self);
                    mut_self.next_child_index = this_child_index;
                    let mpt_taken =
                        mut_self.set_concluded_child(*child_node)?;
                    self.basic_node.mpt = mpt_taken;
                }
            } else {
                break;
            }
        }
        // We don't set next_child_index here in this method because there is
        // always follow-ups actions which set next_child_index, or
        // next_child_index no longer matter.
        Ok(())
    }

    /// Fold a finished child into this node: accumulate subtree-size deltas,
    /// update (or delete) the child's entry in the children table, and either
    /// write the child out or keep it pending for path compression.
    fn set_concluded_child(
        &mut self, mut child_node: ReadWritePathNode<Mpt>,
    ) -> Result<Option<Mpt>> {
        self.subtree_size_delta += child_node.subtree_size_delta;
        self.delta_subtree_size += child_node.delta_subtree_size;

        if !child_node.is_node_empty() {
            let subtree_size = (self
                .basic_node
                .trie_node
                .get_child(self.basic_node.next_child_index)
                // Unwrap is safe. See comment below.
                .unwrap()
                .subtree_size as i64
                + child_node.subtree_size_delta)
                as u64;

            // The safety is guaranteed because when the child doesn't
            // originally exist, the child was added to the children table when
            // the child was created.
            unsafe {
                self.basic_node.trie_node.replace_child_unchecked(
                    self.basic_node.next_child_index,
                    SubtreeMerkleWithSize {
                        merkle: child_node.trie_node.get_merkle().clone(),
                        subtree_size,
                        delta_subtree_size: child_node.delta_subtree_size,
                    },
                )
            };

            // The node won't merge with its first children, because either the
            // node has value, or the child node is the second child. The
            // assumption here is that in db and rust string comparison a string
            // that is a prefix of another string is considered smaller.
            if self.trie_node.has_value() {
                Ok(child_node.write_out()?)
            } else if self.maybe_first_realized_child_index
                != Self::NULL_CHILD_INDEX
            {
                Self::write_out_pending_child(
                    &mut child_node.mpt,
                    &mut self.the_first_child_if_pending,
                )?;
                Ok(child_node.write_out()?)
            } else {
                // This child is the first realized child.
                self.maybe_first_realized_child_index = self.next_child_index;
                let mpt_taken = child_node.take_mpt();
                self.the_first_child_if_pending = Some(Box::new(child_node));
                Ok(mpt_taken)
            }
        } else {
            let mpt_taken = child_node.write_out()?;

            // The safety is guaranteed by condition.
            unsafe {
                self.basic_node
                    .trie_node
                    .delete_child_unchecked(self.basic_node.next_child_index)
            };

            Ok(mpt_taken)
        }
    }

    /// This function is written like this because we want rust borrow checker
    /// to work smart.
    ///
    /// Lend the mpt to the pending child, write the child out, then take the
    /// mpt back.
    fn write_out_pending_child(
        mpt: &mut Option<Mpt>,
        the_first_child: &mut Option<Box<ReadWritePathNode<Mpt>>>,
    ) -> Result<()> {
        if the_first_child.is_some() {
            let mut child = the_first_child.take().unwrap();
            child.mpt = mpt.take();
            *mpt = child.write_out()?;
        }
        Ok(())
    }
}
1449
/// Required because `PathNodeTrait` carries a `Drop` bound.
impl<Mpt> Drop for BasicPathNode<Mpt> {
    fn drop(&mut self) {
        // No-op for read only access.
    }
}
1455
1456impl<Mpt> Drop for ReadWritePathNode<Mpt> {
1457    fn drop(&mut self) {
1458        if !self.get_has_io_error() {
1459            if self.db_committed == false {
1460                self.set_has_io_error();
1461                debug_assert_eq!(
1462                    true,
1463                    self.db_committed,
1464                    "Node {:?}, {:?} uncommitted in MptCursorRw.",
1465                    self.path_db_key.as_ref(),
1466                    self.trie_node.get_merkle(),
1467                );
1468            }
1469        }
1470    }
1471}
1472
/// Access to a read-only snapshot mpt as held by a cursor.
pub trait GetReadMpt {
    #[allow(unused)]
    fn get_merkle_root(&self) -> MerkleHash;

    fn get_read_mpt(&mut self) -> &mut dyn SnapshotMptTraitRead;
}
1479
/// Access to a writable snapshot mpt, and optionally the read mpt it is
/// derived from.
pub trait GetRwMpt: GetReadMpt {
    fn get_write_mpt(&mut self) -> &mut dyn SnapshotMptTraitRw;

    fn get_write_and_read_mpt(
        &mut self,
    ) -> (
        &mut dyn SnapshotMptTraitRw,
        Option<&mut dyn SnapshotMptTraitReadAndIterate>,
    );

    // NOTE(review): from call sites, save-as mode copies untouched subtrees
    // into the write mpt, while in-place mode deletes emptied nodes from the
    // original — confirm exact semantics with the implementors.
    fn is_save_as_write(&self) -> bool;
    fn is_in_place_update(&self) -> bool;
}
1493
impl GetReadMpt for &mut dyn SnapshotMptTraitRead {
    fn get_merkle_root(&self) -> MerkleHash {
        // Fully-qualified call dispatches to the underlying trait method
        // rather than re-entering GetReadMpt::get_merkle_root.
        SnapshotMptTraitRead::get_merkle_root(*self)
    }

    fn get_read_mpt(&mut self) -> &mut dyn SnapshotMptTraitRead { *self }
}
1501
/// Move the optionally-held mpt out of a cursor or path node, so its
/// ownership can be passed around during cursor iteration.
pub trait TakeMpt<Mpt> {
    fn take_mpt(&mut self) -> Option<Mpt>;
}
1505
// Take the mpt directly from the node's own field.
impl<Mpt> TakeMpt<Mpt> for BasicPathNode<Mpt> {
    fn take_mpt(&mut self) -> Option<Mpt> { self.mpt.take() }
}
1509
// Delegate to the wrapped basic node, which owns the mpt slot.
impl<Mpt> TakeMpt<Mpt> for ReadWritePathNode<Mpt> {
    fn take_mpt(&mut self) -> Option<Mpt> { self.basic_node.take_mpt() }
}
1513
// The cursor holds the mpt only while no path node has borrowed it.
impl<Mpt, PathNode> TakeMpt<Mpt> for MptCursor<Mpt, PathNode> {
    fn take_mpt(&mut self) -> Option<Mpt> { self.mpt.take() }
}
1517
// Same ownership protocol as the read-only cursor.
impl<Mpt, PathNode> TakeMpt<Mpt> for MptCursorRw<Mpt, PathNode> {
    fn take_mpt(&mut self) -> Option<Mpt> { self.mpt.take() }
}
1521
/// Shared io-error flag plumbing between a cursor and its path nodes.
pub trait CursorSetIoError {
    fn io_error(&self) -> &Cell<bool>;
    fn set_has_io_error(&self);
}
1526
// The cursor owns the actual error cell.
impl<Mpt, PathNode> CursorSetIoError for MptCursorRw<Mpt, PathNode> {
    fn io_error(&self) -> &Cell<bool> { &self.io_error }

    fn set_has_io_error(&self) { self.io_error.replace(true); }
}
1532
impl CursorSetIoError for *const Cell<bool> {
    // SAFETY(review): assumes the pointer targets the cursor's io_error
    // cell and that the cursor outlives every holder of this pointer —
    // not enforced by the type system; TODO confirm the lifetime invariant.
    fn io_error(&self) -> &Cell<bool> { unsafe { &**self } }

    fn set_has_io_error(&self) { self.io_error().replace(true); }
}
1538
impl<Mpt> CursorSetIoError for ReadWritePathNode<Mpt> {
    // SAFETY(review): `has_io_error` is copied from the parent node (and
    // ultimately from the cursor); validity relies on the cursor outliving
    // all of its path nodes — TODO confirm.
    fn io_error(&self) -> &Cell<bool> { unsafe { &*self.has_io_error } }

    fn set_has_io_error(&self) { self.io_error().replace(true); }
}
1544
/// Child accessor that always reports "no child"; presumably used by the
/// cursor's key-walk logic where the cursor resolves children itself —
/// confirm at the call sites.
struct MptCursorGetChild {}

static MPT_CURSOR_GET_CHILD: MptCursorGetChild = MptCursorGetChild {};
1548
impl<'node> GetChildTrait<'node> for MptCursorGetChild {
    type ChildIdType = ();

    /// Always reports no child, regardless of the index.
    fn get_child(&'node self, _child_index: u8) -> Option<()> { None }
}
1554
/// Where popping path nodes for a key search ended up.
pub enum CursorPopNodesTerminal<'key> {
    /// The current node corresponds to the key.
    Arrived,
    /// Must descend into `child_index` with the unconsumed remainder of the
    /// key.
    Descent {
        key_remaining: CompressedPathRef<'key>,
        child_index: u8,
    },
    /// The key diverged from the node's compressed path.
    PathDiverted(WalkStop<'key, ()>),
}
1563
/// Where opening the path toward a key ended up.
pub enum CursorOpenPathTerminal<'key> {
    /// The current node corresponds to the key.
    Arrived,
    /// The children table has no entry at `child_index` for the remaining
    /// key.
    ChildNotFound {
        key_remaining: CompressedPathRef<'key>,
        child_index: u8,
    },
    /// The key diverged from the node's compressed path.
    PathDiverted(WalkStop<'key, ()>),
}
1572
/// Borrow helpers for `Option`s that are logically owned (known `Some`)
/// at the call site; panic on `None`.
pub trait OptionUnwrapBorrowAssumedSomeExtension<T> {
    fn as_ref_assumed_owner(&self) -> &T;
    fn as_mut_assumed_owner(&mut self) -> &mut T;
}
1577
impl<Mpt> OptionUnwrapBorrowAssumedSomeExtension<Mpt> for Option<Mpt> {
    // Panics when the mpt was already taken by someone else — callers
    // assume exclusive logical ownership.
    fn as_ref_assumed_owner(&self) -> &Mpt { self.as_ref().unwrap() }

    fn as_mut_assumed_owner(&mut self) -> &mut Mpt { self.as_mut().unwrap() }
}
1583
/// Size in bytes of the rlp encoding of a `len`-byte string: the payload
/// plus a 1-byte header, plus — for strings longer than 55 bytes — the
/// big-endian bytes of the length itself.
pub fn rlp_str_len(len: usize) -> u64 {
    // Short string: single header byte.
    if len <= 55 {
        return len as u64 + 1;
    }

    // Long string: count how many bytes the length value occupies.
    let mut remaining = len;
    let mut len_of_len = 0u64;
    while remaining > 0 {
        remaining >>= 8;
        len_of_len += 1;
    }

    len as u64 + 1 + len_of_len
}
1596
1597/// We assume that the keys and values are serialized in separate vector,
1598/// therefore we only add up those rlp string lengths.
1599/// The rlp bytes for the up-most structures are ignored for sync slicer.
1600pub fn rlp_key_value_len(key_len: u16, value_len: usize) -> u64 {
1601    rlp_str_len(key_len.into()) + rlp_str_len(value_len)
1602}
1603
1604use crate::{
1605    impls::{
1606        errors::*,
1607        merkle_patricia_trie::{
1608            children_table::*,
1609            compressed_path::CompressedPathTrait,
1610            trie_node::{TrieNodeTrait, VanillaTrieNode},
1611            trie_proof::*,
1612            walk::*,
1613            CompressedPathRaw, CompressedPathRef,
1614        },
1615    },
1616    storage_db::snapshot_mpt::*,
1617    utils::access_mode,
1618};
1619use primitives::{MerkleHash, MptValue, MERKLE_NULL_NODE};
1620use rustc_hex::ToHex;
1621use std::{
1622    cell::Cell,
1623    mem,
1624    ops::{Deref, DerefMut},
1625    vec::Vec,
1626};