diem_jellyfish_merkle/restore/mod.rs
1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8//! This module implements the functionality to restore a `JellyfishMerkleTree`
9//! from small chunks of accounts.
10
11#[cfg(test)]
12mod restore_test;
13
14use crate::{
15 nibble_path::{NibbleIterator, NibblePath},
16 node_type::{
17 get_child_and_sibling_half_start, Child, Children, InternalNode,
18 LeafNode, Node, NodeKey,
19 },
20 NibbleExt, NodeBatch, TreeReader, TreeWriter, ROOT_NIBBLE_HEIGHT,
21};
22use anyhow::{bail, ensure, format_err, Result};
23use diem_crypto::{
24 hash::{CryptoHash, SPARSE_MERKLE_PLACEHOLDER_HASH},
25 HashValue,
26};
27use diem_nibble::Nibble;
28use diem_types::{
29 proof::{SparseMerkleInternalNode, SparseMerkleRangeProof},
30 transaction::Version,
31};
32use mirai_annotations::*;
33use std::sync::Arc;
34
35#[derive(Clone, Debug, Eq, PartialEq)]
36enum ChildInfo<V> {
37 /// This child is an internal node. The hash of the internal node is stored
38 /// here if it is known, otherwise it is `None`. In the process of
39 /// restoring a tree, we will only know the hash of an internal node
40 /// after we see all the keys that share the same prefix.
41 Internal { hash: Option<HashValue> },
42
43 /// This child is a leaf node.
44 Leaf { node: LeafNode<V> },
45}
46
47impl<V> ChildInfo<V>
48where V: crate::Value
49{
50 /// Converts `self` to a child, assuming the hash is known if it's an
51 /// internal node.
52 fn into_child(self, version: Version) -> Child {
53 match self {
54 Self::Internal { hash } => {
55 Child::new(
56 hash.expect("Must have been initialized."),
57 version,
58 false, /* is_leaf */
59 )
60 }
61 Self::Leaf { node } => {
62 Child::new(node.hash(), version, true /* is_leaf */)
63 }
64 }
65 }
66}
67
68#[derive(Clone, Debug)]
69struct InternalInfo<V> {
70 /// The node key of this internal node.
71 node_key: NodeKey,
72
73 /// The existing children. Every time a child appears, the corresponding
74 /// position will be set to `Some`.
75 children: [Option<ChildInfo<V>>; 16],
76}
77
78impl<V> InternalInfo<V>
79where V: crate::Value
80{
81 /// Creates an empty internal node with no children.
82 fn new_empty(node_key: NodeKey) -> Self {
83 Self {
84 node_key,
85 children: Default::default(),
86 }
87 }
88
89 fn set_child(&mut self, index: usize, child_info: ChildInfo<V>) {
90 precondition!(index < 16);
91 self.children[index] = Some(child_info);
92 }
93
94 /// Converts `self` to an internal node, assuming all of its children are
95 /// already known and fully initialized.
96 fn into_internal_node(
97 mut self, version: Version,
98 ) -> (NodeKey, InternalNode) {
99 let mut children = Children::new();
100
101 // Calling `into_iter` on an array is equivalent to calling `iter`:
102 // https://github.com/rust-lang/rust/issues/25725. So we use `iter_mut` and `take`.
103 for (index, child_info_option) in self.children.iter_mut().enumerate() {
104 if let Some(child_info) = child_info_option.take() {
105 children.insert(
106 (index as u8).into(),
107 child_info.into_child(version),
108 );
109 }
110 }
111
112 (self.node_key, InternalNode::new(children))
113 }
114}
115
116pub struct JellyfishMerkleRestore<V> {
117 /// The underlying storage.
118 store: Arc<dyn TreeWriter<V>>,
119
120 /// The version of the tree we are restoring.
121 version: Version,
122
123 /// The nodes we have partially restored. Each `partial_nodes[i-1]` is the
124 /// parent of `partial_nodes[i]`. If a node `partial_nodes[i-1]` has
125 /// multiple children, only the rightmost known child will appear here
126 /// as `partial_nodes[i]`, because any other children on the left would
127 /// have been frozen.
128 ///
129 /// At any point in time, the structure looks like the following:
130 ///
131 /// ```text
132 /// +----+----+----+----+----+----+----+----+
133 /// | | | | | | | | C | partial_nodes[0]
134 /// +----+----+----+----+----+----+----+----+
135 /// | | |
136 /// | | |
137 /// | | |
138 /// v v v
139 /// Frozen Frozen +----+----+----+----+----+----+----+----+
140 /// | | | | B | | | A | | partial_nodes[1]
141 /// +----+----+----+----+----+----+----+----+
142 /// | |
143 /// | |
144 /// | |
145 /// v v
146 /// Frozen Previously inserted account
147 /// ```
148 ///
149 /// We insert the accounts from left to right. So if the next account
150 /// appears at position `A`, it will cause the leaf at position `B` to
151 /// be frozen. If it appears at position `B`, it might cause a few
152 /// internal nodes to be created additionally. If it appears at position
153 /// `C`, it will also cause `partial_nodes[1]` to be added to
154 /// `frozen_nodes` as an internal node and be removed from
155 /// `partial_nodes`.
156 partial_nodes: Vec<InternalInfo<V>>,
157
158 /// The nodes that have been fully restored and are ready to be written to
159 /// storage.
160 frozen_nodes: NodeBatch<V>,
161
162 /// The most recently added leaf. This is used to ensure the keys come in
163 /// increasing order and do proof verification.
164 previous_leaf: Option<LeafNode<V>>,
165
166 /// The number of keys we have received since the most recent restart.
167 num_keys_received: u64,
168
169 /// When the restoration process finishes, we expect the tree to have this
170 /// root hash.
171 expected_root_hash: HashValue,
172}
173
174impl<V> JellyfishMerkleRestore<V>
175where V: crate::Value
176{
177 pub fn new<D: 'static + TreeReader<V> + TreeWriter<V>>(
178 store: Arc<D>, version: Version, expected_root_hash: HashValue,
179 ) -> Result<Self> {
180 let tree_reader = Arc::clone(&store);
181 let (partial_nodes, previous_leaf) =
182 if let Some((node_key, leaf_node)) =
183 tree_reader.get_rightmost_leaf()?
184 {
185 // TODO: confirm rightmost leaf is at the desired version
186 // If the system crashed in the middle of the previous
187 // restoration attempt, we need to recover the
188 // partial nodes to the state right before the crash.
189 (
190 Self::recover_partial_nodes(
191 tree_reader.as_ref(),
192 version,
193 node_key,
194 )?,
195 Some(leaf_node),
196 )
197 } else {
198 (
199 vec![InternalInfo::new_empty(NodeKey::new_empty_path(
200 version,
201 ))],
202 None,
203 )
204 };
205
206 Ok(Self {
207 store,
208 version,
209 partial_nodes,
210 frozen_nodes: NodeBatch::new(),
211 previous_leaf,
212 num_keys_received: 0,
213 expected_root_hash,
214 })
215 }
216
217 pub fn new_overwrite<D: 'static + TreeWriter<V>>(
218 store: Arc<D>, version: Version, expected_root_hash: HashValue,
219 ) -> Result<Self> {
220 Ok(Self {
221 store,
222 version,
223 partial_nodes: vec![InternalInfo::new_empty(
224 NodeKey::new_empty_path(version),
225 )],
226 frozen_nodes: NodeBatch::new(),
227 previous_leaf: None,
228 num_keys_received: 0,
229 expected_root_hash,
230 })
231 }
232
233 /// Recovers partial nodes from storage. We do this by looking at all the
234 /// ancestors of the rightmost leaf. The ones do not exist in storage
235 /// are the partial nodes.
236 fn recover_partial_nodes(
237 store: &dyn TreeReader<V>, version: Version,
238 rightmost_leaf_node_key: NodeKey,
239 ) -> Result<Vec<InternalInfo<V>>> {
240 ensure!(
241 rightmost_leaf_node_key.nibble_path().num_nibbles() > 0,
242 "Root node would not be written until entire restoration process has completed \
243 successfully.",
244 );
245
246 // Start from the parent of the rightmost leaf. If this internal node
247 // exists in storage, it is not a partial node. Go to the parent
248 // node and repeat until we see a node that does not exist. This
249 // node and all its ancestors will be the partial nodes.
250 let mut node_key = rightmost_leaf_node_key.gen_parent_node_key();
251 while store.get_node_option(&node_key)?.is_some() {
252 node_key = node_key.gen_parent_node_key();
253 }
254
255 // Next we reconstruct all the partial nodes up to the root node,
256 // starting from the bottom. For all of them, we scan all its
257 // possible child positions and see if there is one at
258 // each position. If the node is not the bottom one, there is
259 // additionally a partial node child at the position
260 // `previous_child_index`.
261 let mut partial_nodes = vec![];
262 // Initialize `previous_child_index` to `None` for the first iteration
263 // of the loop so the code below treats it differently.
264 let mut previous_child_index = None;
265
266 loop {
267 let mut internal_info = InternalInfo::new_empty(node_key.clone());
268
269 for i in 0..previous_child_index.unwrap_or(16) {
270 let child_node_key =
271 node_key.gen_child_node_key(version, (i as u8).into());
272 if let Some(node) = store.get_node_option(&child_node_key)? {
273 let child_info = match node {
274 Node::Internal(internal_node) => ChildInfo::Internal {
275 hash: Some(internal_node.hash()),
276 },
277 Node::Leaf(leaf_node) => {
278 ChildInfo::Leaf { node: leaf_node }
279 }
280 Node::Null => {
281 bail!("Null node should not appear in storage.")
282 }
283 };
284 internal_info.set_child(i, child_info);
285 }
286 }
287
288 // If this is not the lowest partial node, it will have a partial
289 // node child at `previous_child_index`. Set the hash of
290 // this child to `None` because it is a partial node and
291 // we do not know its hash yet. For the lowest partial node, we just
292 // find all its known children from storage in the loop above.
293 if let Some(index) = previous_child_index {
294 internal_info
295 .set_child(index, ChildInfo::Internal { hash: None });
296 }
297
298 partial_nodes.push(internal_info);
299 if node_key.nibble_path().num_nibbles() == 0 {
300 break;
301 }
302 previous_child_index =
303 node_key.nibble_path().last().map(|x| u8::from(x) as usize);
304 node_key = node_key.gen_parent_node_key();
305 }
306
307 partial_nodes.reverse();
308 Ok(partial_nodes)
309 }
310
311 /// Restores a chunk of accounts. This function will verify that the given
312 /// chunk is correct using the proof and root hash, then write things to
313 /// storage. If the chunk is invalid, an error will be returned and
314 /// nothing will be written to storage.
315 pub fn add_chunk(
316 &mut self, chunk: Vec<(HashValue, V)>, proof: SparseMerkleRangeProof,
317 ) -> Result<()> {
318 ensure!(!chunk.is_empty(), "Should not add empty chunks.");
319
320 for (key, value) in chunk {
321 if let Some(ref prev_leaf) = self.previous_leaf {
322 ensure!(
323 key > prev_leaf.account_key(),
324 "Account keys must come in increasing order.",
325 )
326 }
327 self.add_one(key, value.clone());
328 self.previous_leaf.replace(LeafNode::new(key, value));
329 self.num_keys_received += 1;
330 }
331
332 // Verify what we have added so far is all correct.
333 self.verify(proof)?;
334
335 // Write the frozen nodes to storage.
336 self.store.write_node_batch(&self.frozen_nodes)?;
337 self.frozen_nodes.clear();
338
339 Ok(())
340 }
341
342 /// Restores one account.
343 fn add_one(&mut self, new_key: HashValue, new_value: V) {
344 let nibble_path = NibblePath::new(new_key.to_vec());
345 let mut nibbles = nibble_path.nibbles();
346
347 for i in 0..ROOT_NIBBLE_HEIGHT {
348 let child_index =
349 u8::from(nibbles.next().expect("This nibble must exist."))
350 as usize;
351
352 match self.partial_nodes[i].children[child_index] {
353 Some(ref child_info) => {
354 // If there exists an internal node at this position, we
355 // just continue the loop with the next
356 // nibble. Here we deal with the leaf case.
357 if let ChildInfo::Leaf { node } = child_info {
358 assert_eq!(
359 i,
360 self.partial_nodes.len() - 1,
361 "If we see a leaf, there will be no more partial internal nodes on \
362 lower level, since they would have been frozen.",
363 );
364
365 let existing_leaf = node.clone();
366 self.insert_at_leaf(
367 child_index,
368 existing_leaf,
369 new_key,
370 new_value,
371 nibbles,
372 );
373 break;
374 }
375 }
376 None => {
377 // This means that we are going to put a leaf in this
378 // position. For all the descendants on
379 // the left, they are now frozen.
380 self.freeze(i + 1);
381
382 // Mark this position as a leaf child.
383 self.partial_nodes[i].set_child(
384 child_index,
385 ChildInfo::Leaf {
386 node: LeafNode::new(new_key, new_value),
387 },
388 );
389
390 // We do not add this leaf node to self.frozen_nodes because
391 // we don't know its node key yet. We
392 // will know its node key when the next account comes.
393 break;
394 }
395 }
396 }
397 }
398
399 /// Inserts a new account at the position of the existing leaf node. We may
400 /// need to create multiple internal nodes depending on the length of
401 /// the common prefix of the existing key and the new key.
402 fn insert_at_leaf(
403 &mut self, child_index: usize, existing_leaf: LeafNode<V>,
404 new_key: HashValue, new_value: V,
405 mut remaining_nibbles: NibbleIterator<'_>,
406 ) {
407 let num_existing_partial_nodes = self.partial_nodes.len();
408
409 // The node at this position becomes an internal node. Since we may
410 // insert more nodes at this position in the future, we do not
411 // know its hash yet.
412 self.partial_nodes[num_existing_partial_nodes - 1]
413 .set_child(child_index, ChildInfo::Internal { hash: None });
414
415 // Next we build the new internal nodes from top to bottom. All these
416 // internal node except the bottom one will now have a single
417 // internal node child.
418 let common_prefix_len = existing_leaf
419 .account_key()
420 .common_prefix_nibbles_len(new_key);
421 for _ in num_existing_partial_nodes..common_prefix_len {
422 let visited_nibbles = remaining_nibbles.visited_nibbles().collect();
423 let next_nibble =
424 remaining_nibbles.next().expect("This nibble must exist.");
425 let new_node_key = NodeKey::new(self.version, visited_nibbles);
426
427 let mut internal_info = InternalInfo::new_empty(new_node_key);
428 internal_info.set_child(
429 u8::from(next_nibble) as usize,
430 ChildInfo::Internal { hash: None },
431 );
432 self.partial_nodes.push(internal_info);
433 }
434
435 // The last internal node will have two leaf node children.
436 let visited_nibbles = remaining_nibbles.visited_nibbles().collect();
437 let new_node_key = NodeKey::new(self.version, visited_nibbles);
438 let mut internal_info = InternalInfo::new_empty(new_node_key);
439
440 // Next we put the existing leaf as a child of this internal node.
441 let existing_child_index =
442 existing_leaf.account_key().get_nibble(common_prefix_len);
443 internal_info.set_child(
444 u8::from(existing_child_index) as usize,
445 ChildInfo::Leaf {
446 node: existing_leaf,
447 },
448 );
449
450 // Do not set the new child for now. We always call `freeze` first, then
451 // set the new child later, because this way it's easier in
452 // `freeze` to find the correct leaf to freeze -- it's always
453 // the rightmost leaf on the lowest level.
454 self.partial_nodes.push(internal_info);
455 self.freeze(self.partial_nodes.len());
456
457 // Now we set the new child.
458 let new_child_index = new_key.get_nibble(common_prefix_len);
459 assert!(
460 new_child_index > existing_child_index,
461 "New leaf must be on the right.",
462 );
463 self.partial_nodes
464 .last_mut()
465 .expect("This node must exist.")
466 .set_child(
467 u8::from(new_child_index) as usize,
468 ChildInfo::Leaf {
469 node: LeafNode::new(new_key, new_value),
470 },
471 );
472 }
473
474 /// Puts the nodes that will not be changed later in `self.frozen_nodes`.
475 fn freeze(&mut self, num_remaining_partial_nodes: usize) {
476 self.freeze_previous_leaf();
477 self.freeze_internal_nodes(num_remaining_partial_nodes);
478 }
479
480 /// Freezes the previously added leaf node. It should always be the
481 /// rightmost leaf node on the lowest level, inserted in the previous
482 /// `add_one` call.
483 fn freeze_previous_leaf(&mut self) {
484 // If this is the very first key, there is no previous leaf to freeze.
485 if self.num_keys_received == 0 {
486 return;
487 }
488
489 let last_node = self
490 .partial_nodes
491 .last()
492 .expect("Must have at least one partial node.");
493 let rightmost_child_index = last_node
494 .children
495 .iter()
496 .rposition(|x| x.is_some())
497 .expect("Must have at least one child.");
498
499 match last_node.children[rightmost_child_index] {
500 Some(ChildInfo::Leaf { ref node }) => {
501 let child_node_key = last_node
502 .node_key
503 .gen_child_node_key(self.version, (rightmost_child_index as u8).into());
504 self.frozen_nodes
505 .insert(child_node_key, node.clone().into());
506 }
507 _ => panic!("Must have at least one child and must not have further internal nodes."),
508 }
509 }
510
511 /// Freeze extra internal nodes. Only `num_remaining_nodes` partial internal
512 /// nodes will be kept and the ones on the lower level will be frozen.
513 fn freeze_internal_nodes(&mut self, num_remaining_nodes: usize) {
514 while self.partial_nodes.len() > num_remaining_nodes {
515 let last_node =
516 self.partial_nodes.pop().expect("This node must exist.");
517 let (node_key, internal_node) =
518 last_node.into_internal_node(self.version);
519 // Keep the hash of this node before moving it into `frozen_nodes`,
520 // so we can update its parent later.
521 let node_hash = internal_node.hash();
522 self.frozen_nodes.insert(node_key, internal_node.into());
523
524 // Now that we have computed the hash of the internal node above, we
525 // will also update its parent unless it is root node.
526 if let Some(parent_node) = self.partial_nodes.last_mut() {
527 // This internal node must be the rightmost child of its parent
528 // at the moment.
529 let rightmost_child_index = parent_node
530 .children
531 .iter()
532 .rposition(|x| x.is_some())
533 .expect("Must have at least one child.");
534
535 match parent_node.children[rightmost_child_index] {
536 Some(ChildInfo::Internal { ref mut hash }) => {
537 assert_eq!(hash.replace(node_hash), None);
538 }
539 _ => panic!(
540 "Must have at least one child and the rightmost child must not be a leaf."
541 ),
542 }
543 }
544 }
545 }
546
547 /// Verifies that all accounts that have been added so far (from the
548 /// leftmost one to `self.previous_leaf`) are correct, i.e., we are able
549 /// to construct `self.expected_root_hash` by combining all existing
550 /// accounts and `proof`.
551 #[allow(clippy::collapsible_if)]
552 fn verify(&self, proof: SparseMerkleRangeProof) -> Result<()> {
553 let previous_leaf = self
554 .previous_leaf
555 .as_ref()
556 .expect("The previous leaf must exist.");
557 let previous_key = previous_leaf.account_key();
558
559 // If we have all siblings on the path from root to `previous_key`, we
560 // should be able to compute the root hash. The siblings on the
561 // right are already in the proof. Now we compute the siblings
562 // on the left side, which represent all the accounts that have ever
563 // been added.
564 let mut left_siblings = vec![];
565
566 // The following process might add some extra placeholder siblings on
567 // the left, but it is nontrivial to determine when the loop
568 // should stop. So instead we just add these siblings for now
569 // and get rid of them in the next step.
570 let mut num_visited_right_siblings = 0;
571 for (i, bit) in previous_key.iter_bits().enumerate() {
572 if bit {
573 // This node is a right child and there should be a sibling on
574 // the left.
575 let sibling = if i >= self.partial_nodes.len() * 4 {
576 *SPARSE_MERKLE_PLACEHOLDER_HASH
577 } else {
578 Self::compute_left_sibling(
579 &self.partial_nodes[i / 4],
580 previous_key.get_nibble(i / 4),
581 (3 - i % 4) as u8,
582 )
583 };
584 left_siblings.push(sibling);
585 } else {
586 // This node is a left child and there should be a sibling on
587 // the right.
588 num_visited_right_siblings += 1;
589 }
590 }
591 ensure!(
592 num_visited_right_siblings >= proof.right_siblings().len(),
593 "Too many right siblings in the proof.",
594 );
595
596 // Now we remove any extra placeholder siblings at the bottom. We keep
597 // removing the last sibling if 1) it's a placeholder 2) it's a
598 // sibling on the left.
599 for bit in previous_key.iter_bits().rev() {
600 if bit {
601 if *left_siblings.last().expect("This sibling must exist.")
602 == *SPARSE_MERKLE_PLACEHOLDER_HASH
603 {
604 left_siblings.pop();
605 } else {
606 break;
607 }
608 } else if num_visited_right_siblings > proof.right_siblings().len()
609 {
610 num_visited_right_siblings -= 1;
611 } else {
612 break;
613 }
614 }
615
616 // Compute the root hash now that we have all the siblings.
617 let num_siblings = left_siblings.len() + proof.right_siblings().len();
618 let mut left_sibling_iter = left_siblings.iter().rev();
619 let mut right_sibling_iter = proof.right_siblings().iter();
620 let mut current_hash = previous_leaf.hash();
621 for bit in previous_key
622 .iter_bits()
623 .rev()
624 .skip(HashValue::LENGTH_IN_BITS - num_siblings)
625 {
626 let (left_hash, right_hash) = if bit {
627 (
628 *left_sibling_iter
629 .next()
630 .ok_or_else(|| format_err!("Missing left sibling."))?,
631 current_hash,
632 )
633 } else {
634 (
635 current_hash,
636 *right_sibling_iter
637 .next()
638 .ok_or_else(|| format_err!("Missing right sibling."))?,
639 )
640 };
641 current_hash =
642 SparseMerkleInternalNode::new(left_hash, right_hash).hash();
643 }
644
645 ensure!(
646 current_hash == self.expected_root_hash,
647 "Root hashes do not match. Actual root hash: {:x}. Expected root hash: {:x}.",
648 current_hash,
649 self.expected_root_hash,
650 );
651
652 Ok(())
653 }
654
655 /// Computes the sibling on the left for the `n`-th child.
656 fn compute_left_sibling(
657 partial_node: &InternalInfo<V>, n: Nibble, height: u8,
658 ) -> HashValue {
659 assert!(height < 4);
660 let width = 1usize << height;
661 let start = get_child_and_sibling_half_start(n, height).1 as usize;
662 Self::compute_left_sibling_impl(
663 &partial_node.children[start..start + width],
664 )
665 .0
666 }
667
668 /// Returns the hash for given portion of the subtree and whether this part
669 /// is a leaf node.
670 fn compute_left_sibling_impl(
671 children: &[Option<ChildInfo<V>>],
672 ) -> (HashValue, bool) {
673 assert!(!children.is_empty());
674
675 let num_children = children.len();
676 assert!(num_children.is_power_of_two());
677
678 if num_children == 1 {
679 match &children[0] {
680 Some(ChildInfo::Internal { hash }) => {
681 (*hash.as_ref().expect("The hash must be known."), false)
682 }
683 Some(ChildInfo::Leaf { node }) => (node.hash(), true),
684 None => (*SPARSE_MERKLE_PLACEHOLDER_HASH, true),
685 }
686 } else {
687 let (left_hash, left_is_leaf) =
688 Self::compute_left_sibling_impl(&children[..num_children / 2]);
689 let (right_hash, right_is_leaf) =
690 Self::compute_left_sibling_impl(&children[num_children / 2..]);
691
692 if left_hash == *SPARSE_MERKLE_PLACEHOLDER_HASH && right_is_leaf {
693 (right_hash, true)
694 } else if left_is_leaf
695 && right_hash == *SPARSE_MERKLE_PLACEHOLDER_HASH
696 {
697 (left_hash, true)
698 } else {
699 (
700 SparseMerkleInternalNode::new(left_hash, right_hash).hash(),
701 false,
702 )
703 }
704 }
705 }
706
707 /// Finishes the restoration process. This tells the code that there is no
708 /// more account, otherwise we can not freeze the rightmost leaf and its
709 /// ancestors.
710 pub fn finish(mut self) -> Result<()> {
711 // Deal with the special case when the entire tree has a single leaf.
712 if self.partial_nodes.len() == 1 {
713 let mut num_children = 0;
714 let mut leaf = None;
715 for i in 0..16 {
716 if let Some(ref child_info) = self.partial_nodes[0].children[i]
717 {
718 num_children += 1;
719 if let ChildInfo::Leaf { node } = child_info {
720 leaf = Some(node.clone());
721 }
722 }
723 }
724
725 if num_children == 1 {
726 if let Some(node) = leaf {
727 let node_key = NodeKey::new_empty_path(self.version);
728 assert!(self.frozen_nodes.is_empty());
729 self.frozen_nodes.insert(node_key, node.into());
730 self.store.write_node_batch(&self.frozen_nodes)?;
731 return Ok(());
732 }
733 }
734 }
735
736 self.freeze(0);
737 self.store.write_node_batch(&self.frozen_nodes)
738 }
739}