diem_jellyfish_merkle/restore/
mod.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8//! This module implements the functionality to restore a `JellyfishMerkleTree`
9//! from small chunks of accounts.
10
11#[cfg(test)]
12mod restore_test;
13
14use crate::{
15    nibble_path::{NibbleIterator, NibblePath},
16    node_type::{
17        get_child_and_sibling_half_start, Child, Children, InternalNode,
18        LeafNode, Node, NodeKey,
19    },
20    NibbleExt, NodeBatch, TreeReader, TreeWriter, ROOT_NIBBLE_HEIGHT,
21};
22use anyhow::{bail, ensure, format_err, Result};
23use diem_crypto::{
24    hash::{CryptoHash, SPARSE_MERKLE_PLACEHOLDER_HASH},
25    HashValue,
26};
27use diem_nibble::Nibble;
28use diem_types::{
29    proof::{SparseMerkleInternalNode, SparseMerkleRangeProof},
30    transaction::Version,
31};
32use mirai_annotations::*;
33use std::sync::Arc;
34
35#[derive(Clone, Debug, Eq, PartialEq)]
36enum ChildInfo<V> {
37    /// This child is an internal node. The hash of the internal node is stored
38    /// here if it is known, otherwise it is `None`. In the process of
39    /// restoring a tree, we will only know the hash of an internal node
40    /// after we see all the keys that share the same prefix.
41    Internal { hash: Option<HashValue> },
42
43    /// This child is a leaf node.
44    Leaf { node: LeafNode<V> },
45}
46
47impl<V> ChildInfo<V>
48where V: crate::Value
49{
50    /// Converts `self` to a child, assuming the hash is known if it's an
51    /// internal node.
52    fn into_child(self, version: Version) -> Child {
53        match self {
54            Self::Internal { hash } => {
55                Child::new(
56                    hash.expect("Must have been initialized."),
57                    version,
58                    false, /* is_leaf */
59                )
60            }
61            Self::Leaf { node } => {
62                Child::new(node.hash(), version, true /* is_leaf */)
63            }
64        }
65    }
66}
67
68#[derive(Clone, Debug)]
69struct InternalInfo<V> {
70    /// The node key of this internal node.
71    node_key: NodeKey,
72
73    /// The existing children. Every time a child appears, the corresponding
74    /// position will be set to `Some`.
75    children: [Option<ChildInfo<V>>; 16],
76}
77
78impl<V> InternalInfo<V>
79where V: crate::Value
80{
81    /// Creates an empty internal node with no children.
82    fn new_empty(node_key: NodeKey) -> Self {
83        Self {
84            node_key,
85            children: Default::default(),
86        }
87    }
88
89    fn set_child(&mut self, index: usize, child_info: ChildInfo<V>) {
90        precondition!(index < 16);
91        self.children[index] = Some(child_info);
92    }
93
94    /// Converts `self` to an internal node, assuming all of its children are
95    /// already known and fully initialized.
96    fn into_internal_node(
97        mut self, version: Version,
98    ) -> (NodeKey, InternalNode) {
99        let mut children = Children::new();
100
101        // Calling `into_iter` on an array is equivalent to calling `iter`:
102        // https://github.com/rust-lang/rust/issues/25725. So we use `iter_mut` and `take`.
103        for (index, child_info_option) in self.children.iter_mut().enumerate() {
104            if let Some(child_info) = child_info_option.take() {
105                children.insert(
106                    (index as u8).into(),
107                    child_info.into_child(version),
108                );
109            }
110        }
111
112        (self.node_key, InternalNode::new(children))
113    }
114}
115
116pub struct JellyfishMerkleRestore<V> {
117    /// The underlying storage.
118    store: Arc<dyn TreeWriter<V>>,
119
120    /// The version of the tree we are restoring.
121    version: Version,
122
123    /// The nodes we have partially restored. Each `partial_nodes[i-1]` is the
124    /// parent of `partial_nodes[i]`. If a node `partial_nodes[i-1]` has
125    /// multiple children, only the rightmost known child will appear here
126    /// as `partial_nodes[i]`, because any other children on the left would
127    /// have been frozen.
128    ///
129    /// At any point in time, the structure looks like the following:
130    ///
131    /// ```text
132    /// +----+----+----+----+----+----+----+----+
133    /// |    |    |    |    |    |    |    | C  |  partial_nodes[0]
134    /// +----+----+----+----+----+----+----+----+
135    ///   |         |              |
136    ///   |         |              |
137    ///   |         |              |
138    ///   v         v              v
139    /// Frozen    Frozen     +----+----+----+----+----+----+----+----+
140    ///                      |    |    |    | B  |    |    | A  |    |  partial_nodes[1]
141    ///                      +----+----+----+----+----+----+----+----+
142    ///                             |         |
143    ///                             |         |
144    ///                             |         |
145    ///                             v         v
146    ///                            Frozen    Previously inserted account
147    /// ```
148    ///
149    /// We insert the accounts from left to right. So if the next account
150    /// appears at position `A`, it will cause the leaf at position `B` to
151    /// be frozen. If it appears at position `B`, it might cause a few
152    /// internal nodes to be created additionally. If it appears at position
153    /// `C`, it will also cause `partial_nodes[1]` to be added to
154    /// `frozen_nodes` as an internal node and be removed from
155    /// `partial_nodes`.
156    partial_nodes: Vec<InternalInfo<V>>,
157
158    /// The nodes that have been fully restored and are ready to be written to
159    /// storage.
160    frozen_nodes: NodeBatch<V>,
161
162    /// The most recently added leaf. This is used to ensure the keys come in
163    /// increasing order and do proof verification.
164    previous_leaf: Option<LeafNode<V>>,
165
166    /// The number of keys we have received since the most recent restart.
167    num_keys_received: u64,
168
169    /// When the restoration process finishes, we expect the tree to have this
170    /// root hash.
171    expected_root_hash: HashValue,
172}
173
174impl<V> JellyfishMerkleRestore<V>
175where V: crate::Value
176{
177    pub fn new<D: 'static + TreeReader<V> + TreeWriter<V>>(
178        store: Arc<D>, version: Version, expected_root_hash: HashValue,
179    ) -> Result<Self> {
180        let tree_reader = Arc::clone(&store);
181        let (partial_nodes, previous_leaf) =
182            if let Some((node_key, leaf_node)) =
183                tree_reader.get_rightmost_leaf()?
184            {
185                // TODO: confirm rightmost leaf is at the desired version
186                // If the system crashed in the middle of the previous
187                // restoration attempt, we need to recover the
188                // partial nodes to the state right before the crash.
189                (
190                    Self::recover_partial_nodes(
191                        tree_reader.as_ref(),
192                        version,
193                        node_key,
194                    )?,
195                    Some(leaf_node),
196                )
197            } else {
198                (
199                    vec![InternalInfo::new_empty(NodeKey::new_empty_path(
200                        version,
201                    ))],
202                    None,
203                )
204            };
205
206        Ok(Self {
207            store,
208            version,
209            partial_nodes,
210            frozen_nodes: NodeBatch::new(),
211            previous_leaf,
212            num_keys_received: 0,
213            expected_root_hash,
214        })
215    }
216
217    pub fn new_overwrite<D: 'static + TreeWriter<V>>(
218        store: Arc<D>, version: Version, expected_root_hash: HashValue,
219    ) -> Result<Self> {
220        Ok(Self {
221            store,
222            version,
223            partial_nodes: vec![InternalInfo::new_empty(
224                NodeKey::new_empty_path(version),
225            )],
226            frozen_nodes: NodeBatch::new(),
227            previous_leaf: None,
228            num_keys_received: 0,
229            expected_root_hash,
230        })
231    }
232
233    /// Recovers partial nodes from storage. We do this by looking at all the
234    /// ancestors of the rightmost leaf. The ones do not exist in storage
235    /// are the partial nodes.
236    fn recover_partial_nodes(
237        store: &dyn TreeReader<V>, version: Version,
238        rightmost_leaf_node_key: NodeKey,
239    ) -> Result<Vec<InternalInfo<V>>> {
240        ensure!(
241            rightmost_leaf_node_key.nibble_path().num_nibbles() > 0,
242            "Root node would not be written until entire restoration process has completed \
243             successfully.",
244        );
245
246        // Start from the parent of the rightmost leaf. If this internal node
247        // exists in storage, it is not a partial node. Go to the parent
248        // node and repeat until we see a node that does not exist. This
249        // node and all its ancestors will be the partial nodes.
250        let mut node_key = rightmost_leaf_node_key.gen_parent_node_key();
251        while store.get_node_option(&node_key)?.is_some() {
252            node_key = node_key.gen_parent_node_key();
253        }
254
255        // Next we reconstruct all the partial nodes up to the root node,
256        // starting from the bottom. For all of them, we scan all its
257        // possible child positions and see if there is one at
258        // each position. If the node is not the bottom one, there is
259        // additionally a partial node child at the position
260        // `previous_child_index`.
261        let mut partial_nodes = vec![];
262        // Initialize `previous_child_index` to `None` for the first iteration
263        // of the loop so the code below treats it differently.
264        let mut previous_child_index = None;
265
266        loop {
267            let mut internal_info = InternalInfo::new_empty(node_key.clone());
268
269            for i in 0..previous_child_index.unwrap_or(16) {
270                let child_node_key =
271                    node_key.gen_child_node_key(version, (i as u8).into());
272                if let Some(node) = store.get_node_option(&child_node_key)? {
273                    let child_info = match node {
274                        Node::Internal(internal_node) => ChildInfo::Internal {
275                            hash: Some(internal_node.hash()),
276                        },
277                        Node::Leaf(leaf_node) => {
278                            ChildInfo::Leaf { node: leaf_node }
279                        }
280                        Node::Null => {
281                            bail!("Null node should not appear in storage.")
282                        }
283                    };
284                    internal_info.set_child(i, child_info);
285                }
286            }
287
288            // If this is not the lowest partial node, it will have a partial
289            // node child at `previous_child_index`. Set the hash of
290            // this child to `None` because it is a partial node and
291            // we do not know its hash yet. For the lowest partial node, we just
292            // find all its known children from storage in the loop above.
293            if let Some(index) = previous_child_index {
294                internal_info
295                    .set_child(index, ChildInfo::Internal { hash: None });
296            }
297
298            partial_nodes.push(internal_info);
299            if node_key.nibble_path().num_nibbles() == 0 {
300                break;
301            }
302            previous_child_index =
303                node_key.nibble_path().last().map(|x| u8::from(x) as usize);
304            node_key = node_key.gen_parent_node_key();
305        }
306
307        partial_nodes.reverse();
308        Ok(partial_nodes)
309    }
310
311    /// Restores a chunk of accounts. This function will verify that the given
312    /// chunk is correct using the proof and root hash, then write things to
313    /// storage. If the chunk is invalid, an error will be returned and
314    /// nothing will be written to storage.
315    pub fn add_chunk(
316        &mut self, chunk: Vec<(HashValue, V)>, proof: SparseMerkleRangeProof,
317    ) -> Result<()> {
318        ensure!(!chunk.is_empty(), "Should not add empty chunks.");
319
320        for (key, value) in chunk {
321            if let Some(ref prev_leaf) = self.previous_leaf {
322                ensure!(
323                    key > prev_leaf.account_key(),
324                    "Account keys must come in increasing order.",
325                )
326            }
327            self.add_one(key, value.clone());
328            self.previous_leaf.replace(LeafNode::new(key, value));
329            self.num_keys_received += 1;
330        }
331
332        // Verify what we have added so far is all correct.
333        self.verify(proof)?;
334
335        // Write the frozen nodes to storage.
336        self.store.write_node_batch(&self.frozen_nodes)?;
337        self.frozen_nodes.clear();
338
339        Ok(())
340    }
341
342    /// Restores one account.
343    fn add_one(&mut self, new_key: HashValue, new_value: V) {
344        let nibble_path = NibblePath::new(new_key.to_vec());
345        let mut nibbles = nibble_path.nibbles();
346
347        for i in 0..ROOT_NIBBLE_HEIGHT {
348            let child_index =
349                u8::from(nibbles.next().expect("This nibble must exist."))
350                    as usize;
351
352            match self.partial_nodes[i].children[child_index] {
353                Some(ref child_info) => {
354                    // If there exists an internal node at this position, we
355                    // just continue the loop with the next
356                    // nibble. Here we deal with the leaf case.
357                    if let ChildInfo::Leaf { node } = child_info {
358                        assert_eq!(
359                            i,
360                            self.partial_nodes.len() - 1,
361                            "If we see a leaf, there will be no more partial internal nodes on \
362                             lower level, since they would have been frozen.",
363                        );
364
365                        let existing_leaf = node.clone();
366                        self.insert_at_leaf(
367                            child_index,
368                            existing_leaf,
369                            new_key,
370                            new_value,
371                            nibbles,
372                        );
373                        break;
374                    }
375                }
376                None => {
377                    // This means that we are going to put a leaf in this
378                    // position. For all the descendants on
379                    // the left, they are now frozen.
380                    self.freeze(i + 1);
381
382                    // Mark this position as a leaf child.
383                    self.partial_nodes[i].set_child(
384                        child_index,
385                        ChildInfo::Leaf {
386                            node: LeafNode::new(new_key, new_value),
387                        },
388                    );
389
390                    // We do not add this leaf node to self.frozen_nodes because
391                    // we don't know its node key yet. We
392                    // will know its node key when the next account comes.
393                    break;
394                }
395            }
396        }
397    }
398
399    /// Inserts a new account at the position of the existing leaf node. We may
400    /// need to create multiple internal nodes depending on the length of
401    /// the common prefix of the existing key and the new key.
402    fn insert_at_leaf(
403        &mut self, child_index: usize, existing_leaf: LeafNode<V>,
404        new_key: HashValue, new_value: V,
405        mut remaining_nibbles: NibbleIterator<'_>,
406    ) {
407        let num_existing_partial_nodes = self.partial_nodes.len();
408
409        // The node at this position becomes an internal node. Since we may
410        // insert more nodes at this position in the future, we do not
411        // know its hash yet.
412        self.partial_nodes[num_existing_partial_nodes - 1]
413            .set_child(child_index, ChildInfo::Internal { hash: None });
414
415        // Next we build the new internal nodes from top to bottom. All these
416        // internal node except the bottom one will now have a single
417        // internal node child.
418        let common_prefix_len = existing_leaf
419            .account_key()
420            .common_prefix_nibbles_len(new_key);
421        for _ in num_existing_partial_nodes..common_prefix_len {
422            let visited_nibbles = remaining_nibbles.visited_nibbles().collect();
423            let next_nibble =
424                remaining_nibbles.next().expect("This nibble must exist.");
425            let new_node_key = NodeKey::new(self.version, visited_nibbles);
426
427            let mut internal_info = InternalInfo::new_empty(new_node_key);
428            internal_info.set_child(
429                u8::from(next_nibble) as usize,
430                ChildInfo::Internal { hash: None },
431            );
432            self.partial_nodes.push(internal_info);
433        }
434
435        // The last internal node will have two leaf node children.
436        let visited_nibbles = remaining_nibbles.visited_nibbles().collect();
437        let new_node_key = NodeKey::new(self.version, visited_nibbles);
438        let mut internal_info = InternalInfo::new_empty(new_node_key);
439
440        // Next we put the existing leaf as a child of this internal node.
441        let existing_child_index =
442            existing_leaf.account_key().get_nibble(common_prefix_len);
443        internal_info.set_child(
444            u8::from(existing_child_index) as usize,
445            ChildInfo::Leaf {
446                node: existing_leaf,
447            },
448        );
449
450        // Do not set the new child for now. We always call `freeze` first, then
451        // set the new child later, because this way it's easier in
452        // `freeze` to find the correct leaf to freeze -- it's always
453        // the rightmost leaf on the lowest level.
454        self.partial_nodes.push(internal_info);
455        self.freeze(self.partial_nodes.len());
456
457        // Now we set the new child.
458        let new_child_index = new_key.get_nibble(common_prefix_len);
459        assert!(
460            new_child_index > existing_child_index,
461            "New leaf must be on the right.",
462        );
463        self.partial_nodes
464            .last_mut()
465            .expect("This node must exist.")
466            .set_child(
467                u8::from(new_child_index) as usize,
468                ChildInfo::Leaf {
469                    node: LeafNode::new(new_key, new_value),
470                },
471            );
472    }
473
474    /// Puts the nodes that will not be changed later in `self.frozen_nodes`.
475    fn freeze(&mut self, num_remaining_partial_nodes: usize) {
476        self.freeze_previous_leaf();
477        self.freeze_internal_nodes(num_remaining_partial_nodes);
478    }
479
480    /// Freezes the previously added leaf node. It should always be the
481    /// rightmost leaf node on the lowest level, inserted in the previous
482    /// `add_one` call.
483    fn freeze_previous_leaf(&mut self) {
484        // If this is the very first key, there is no previous leaf to freeze.
485        if self.num_keys_received == 0 {
486            return;
487        }
488
489        let last_node = self
490            .partial_nodes
491            .last()
492            .expect("Must have at least one partial node.");
493        let rightmost_child_index = last_node
494            .children
495            .iter()
496            .rposition(|x| x.is_some())
497            .expect("Must have at least one child.");
498
499        match last_node.children[rightmost_child_index] {
500            Some(ChildInfo::Leaf { ref node }) => {
501                let child_node_key = last_node
502                    .node_key
503                    .gen_child_node_key(self.version, (rightmost_child_index as u8).into());
504                self.frozen_nodes
505                    .insert(child_node_key, node.clone().into());
506            }
507            _ => panic!("Must have at least one child and must not have further internal nodes."),
508        }
509    }
510
511    /// Freeze extra internal nodes. Only `num_remaining_nodes` partial internal
512    /// nodes will be kept and the ones on the lower level will be frozen.
513    fn freeze_internal_nodes(&mut self, num_remaining_nodes: usize) {
514        while self.partial_nodes.len() > num_remaining_nodes {
515            let last_node =
516                self.partial_nodes.pop().expect("This node must exist.");
517            let (node_key, internal_node) =
518                last_node.into_internal_node(self.version);
519            // Keep the hash of this node before moving it into `frozen_nodes`,
520            // so we can update its parent later.
521            let node_hash = internal_node.hash();
522            self.frozen_nodes.insert(node_key, internal_node.into());
523
524            // Now that we have computed the hash of the internal node above, we
525            // will also update its parent unless it is root node.
526            if let Some(parent_node) = self.partial_nodes.last_mut() {
527                // This internal node must be the rightmost child of its parent
528                // at the moment.
529                let rightmost_child_index = parent_node
530                    .children
531                    .iter()
532                    .rposition(|x| x.is_some())
533                    .expect("Must have at least one child.");
534
535                match parent_node.children[rightmost_child_index] {
536                    Some(ChildInfo::Internal { ref mut hash }) => {
537                        assert_eq!(hash.replace(node_hash), None);
538                    }
539                    _ => panic!(
540                        "Must have at least one child and the rightmost child must not be a leaf."
541                    ),
542                }
543            }
544        }
545    }
546
547    /// Verifies that all accounts that have been added so far (from the
548    /// leftmost one to `self.previous_leaf`) are correct, i.e., we are able
549    /// to construct `self.expected_root_hash` by combining all existing
550    /// accounts and `proof`.
551    #[allow(clippy::collapsible_if)]
552    fn verify(&self, proof: SparseMerkleRangeProof) -> Result<()> {
553        let previous_leaf = self
554            .previous_leaf
555            .as_ref()
556            .expect("The previous leaf must exist.");
557        let previous_key = previous_leaf.account_key();
558
559        // If we have all siblings on the path from root to `previous_key`, we
560        // should be able to compute the root hash. The siblings on the
561        // right are already in the proof. Now we compute the siblings
562        // on the left side, which represent all the accounts that have ever
563        // been added.
564        let mut left_siblings = vec![];
565
566        // The following process might add some extra placeholder siblings on
567        // the left, but it is nontrivial to determine when the loop
568        // should stop. So instead we just add these siblings for now
569        // and get rid of them in the next step.
570        let mut num_visited_right_siblings = 0;
571        for (i, bit) in previous_key.iter_bits().enumerate() {
572            if bit {
573                // This node is a right child and there should be a sibling on
574                // the left.
575                let sibling = if i >= self.partial_nodes.len() * 4 {
576                    *SPARSE_MERKLE_PLACEHOLDER_HASH
577                } else {
578                    Self::compute_left_sibling(
579                        &self.partial_nodes[i / 4],
580                        previous_key.get_nibble(i / 4),
581                        (3 - i % 4) as u8,
582                    )
583                };
584                left_siblings.push(sibling);
585            } else {
586                // This node is a left child and there should be a sibling on
587                // the right.
588                num_visited_right_siblings += 1;
589            }
590        }
591        ensure!(
592            num_visited_right_siblings >= proof.right_siblings().len(),
593            "Too many right siblings in the proof.",
594        );
595
596        // Now we remove any extra placeholder siblings at the bottom. We keep
597        // removing the last sibling if 1) it's a placeholder 2) it's a
598        // sibling on the left.
599        for bit in previous_key.iter_bits().rev() {
600            if bit {
601                if *left_siblings.last().expect("This sibling must exist.")
602                    == *SPARSE_MERKLE_PLACEHOLDER_HASH
603                {
604                    left_siblings.pop();
605                } else {
606                    break;
607                }
608            } else if num_visited_right_siblings > proof.right_siblings().len()
609            {
610                num_visited_right_siblings -= 1;
611            } else {
612                break;
613            }
614        }
615
616        // Compute the root hash now that we have all the siblings.
617        let num_siblings = left_siblings.len() + proof.right_siblings().len();
618        let mut left_sibling_iter = left_siblings.iter().rev();
619        let mut right_sibling_iter = proof.right_siblings().iter();
620        let mut current_hash = previous_leaf.hash();
621        for bit in previous_key
622            .iter_bits()
623            .rev()
624            .skip(HashValue::LENGTH_IN_BITS - num_siblings)
625        {
626            let (left_hash, right_hash) = if bit {
627                (
628                    *left_sibling_iter
629                        .next()
630                        .ok_or_else(|| format_err!("Missing left sibling."))?,
631                    current_hash,
632                )
633            } else {
634                (
635                    current_hash,
636                    *right_sibling_iter
637                        .next()
638                        .ok_or_else(|| format_err!("Missing right sibling."))?,
639                )
640            };
641            current_hash =
642                SparseMerkleInternalNode::new(left_hash, right_hash).hash();
643        }
644
645        ensure!(
646            current_hash == self.expected_root_hash,
647            "Root hashes do not match. Actual root hash: {:x}. Expected root hash: {:x}.",
648            current_hash,
649            self.expected_root_hash,
650        );
651
652        Ok(())
653    }
654
655    /// Computes the sibling on the left for the `n`-th child.
656    fn compute_left_sibling(
657        partial_node: &InternalInfo<V>, n: Nibble, height: u8,
658    ) -> HashValue {
659        assert!(height < 4);
660        let width = 1usize << height;
661        let start = get_child_and_sibling_half_start(n, height).1 as usize;
662        Self::compute_left_sibling_impl(
663            &partial_node.children[start..start + width],
664        )
665        .0
666    }
667
668    /// Returns the hash for given portion of the subtree and whether this part
669    /// is a leaf node.
670    fn compute_left_sibling_impl(
671        children: &[Option<ChildInfo<V>>],
672    ) -> (HashValue, bool) {
673        assert!(!children.is_empty());
674
675        let num_children = children.len();
676        assert!(num_children.is_power_of_two());
677
678        if num_children == 1 {
679            match &children[0] {
680                Some(ChildInfo::Internal { hash }) => {
681                    (*hash.as_ref().expect("The hash must be known."), false)
682                }
683                Some(ChildInfo::Leaf { node }) => (node.hash(), true),
684                None => (*SPARSE_MERKLE_PLACEHOLDER_HASH, true),
685            }
686        } else {
687            let (left_hash, left_is_leaf) =
688                Self::compute_left_sibling_impl(&children[..num_children / 2]);
689            let (right_hash, right_is_leaf) =
690                Self::compute_left_sibling_impl(&children[num_children / 2..]);
691
692            if left_hash == *SPARSE_MERKLE_PLACEHOLDER_HASH && right_is_leaf {
693                (right_hash, true)
694            } else if left_is_leaf
695                && right_hash == *SPARSE_MERKLE_PLACEHOLDER_HASH
696            {
697                (left_hash, true)
698            } else {
699                (
700                    SparseMerkleInternalNode::new(left_hash, right_hash).hash(),
701                    false,
702                )
703            }
704        }
705    }
706
707    /// Finishes the restoration process. This tells the code that there is no
708    /// more account, otherwise we can not freeze the rightmost leaf and its
709    /// ancestors.
710    pub fn finish(mut self) -> Result<()> {
711        // Deal with the special case when the entire tree has a single leaf.
712        if self.partial_nodes.len() == 1 {
713            let mut num_children = 0;
714            let mut leaf = None;
715            for i in 0..16 {
716                if let Some(ref child_info) = self.partial_nodes[0].children[i]
717                {
718                    num_children += 1;
719                    if let ChildInfo::Leaf { node } = child_info {
720                        leaf = Some(node.clone());
721                    }
722                }
723            }
724
725            if num_children == 1 {
726                if let Some(node) = leaf {
727                    let node_key = NodeKey::new_empty_path(self.version);
728                    assert!(self.frozen_nodes.is_empty());
729                    self.frozen_nodes.insert(node_key, node.into());
730                    self.store.write_node_batch(&self.frozen_nodes)?;
731                    return Ok(());
732                }
733            }
734        }
735
736        self.freeze(0);
737        self.store.write_node_batch(&self.frozen_nodes)
738    }
739}