cfx_storage/impls/
state.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5pub type ChildrenMerkleMap =
6    BTreeMap<ActualSlabIndex, VanillaChildrenTable<MerkleHash>>;
7
8pub struct State {
9    manager: Arc<StateManager>,
10    snapshot_db: SnapshotDb,
11    snapshot_epoch_id: EpochId,
12    snapshot_merkle_root: MerkleHash,
13    maybe_intermediate_trie: Option<Arc<DeltaMpt>>,
14    intermediate_trie_root: Option<NodeRefDeltaMpt>,
15    intermediate_trie_root_merkle: MerkleHash,
16    /// A None value indicate the special case when snapshot_db is actually the
17    /// snapshot_db from the intermediate_epoch_id.
18    maybe_intermediate_trie_key_padding: Option<DeltaMptKeyPadding>,
19    delta_trie: Arc<DeltaMpt>,
20    delta_trie_root: Option<NodeRefDeltaMpt>,
21    delta_trie_key_padding: DeltaMptKeyPadding,
22    intermediate_epoch_id: EpochId,
23    delta_trie_height: Option<u32>,
24    height: Option<u64>,
25    owned_node_set: Option<OwnedNodeSet>,
26    dirty: bool,
27
28    /// Children merkle hashes. Only used for committing and computing
29    /// merkle root. It will be cleared after being committed.
30    children_merkle_map: ChildrenMerkleMap,
31
32    // FIXME: this is a hack to get pivot chain from parent snapshot to a
33    // FIXME: snapshot. it should be done in consensus.
34    parent_epoch_id: EpochId,
35    recover_mpt_during_construct_pivot_state: bool,
36}
37
38impl State {
39    pub fn new(
40        manager: Arc<StateManager>, state_trees: StateTrees,
41        construct_pivot_state: bool,
42    ) -> Self {
43        Self {
44            manager,
45            snapshot_db: state_trees.snapshot_db,
46            snapshot_epoch_id: state_trees.snapshot_epoch_id,
47            snapshot_merkle_root: state_trees.snapshot_merkle_root,
48            maybe_intermediate_trie: state_trees.maybe_intermediate_trie,
49            intermediate_trie_root: state_trees.intermediate_trie_root,
50            intermediate_trie_root_merkle: state_trees
51                .intermediate_trie_root_merkle,
52            maybe_intermediate_trie_key_padding: state_trees
53                .maybe_intermediate_trie_key_padding,
54            delta_trie: state_trees.delta_trie,
55            delta_trie_root: state_trees.delta_trie_root,
56            delta_trie_key_padding: state_trees.delta_trie_key_padding,
57            intermediate_epoch_id: state_trees.intermediate_epoch_id,
58            delta_trie_height: state_trees.maybe_delta_trie_height,
59            height: state_trees.maybe_height,
60            owned_node_set: Some(Default::default()),
61            dirty: false,
62            children_merkle_map: ChildrenMerkleMap::new(),
63            parent_epoch_id: state_trees.parent_epoch_id,
64            recover_mpt_during_construct_pivot_state: construct_pivot_state,
65        }
66    }
67
68    fn state_root(&self, merkle_root: MerkleHash) -> StateRootWithAuxInfo {
69        let state_root = StateRoot {
70            snapshot_root: self.snapshot_merkle_root,
71            intermediate_delta_root: self.intermediate_trie_root_merkle,
72            delta_root: merkle_root,
73        };
74        let state_root_hash = state_root.compute_state_root_hash();
75        StateRootWithAuxInfo {
76            state_root,
77            aux_info: StateRootAuxInfo {
78                snapshot_epoch_id: self.snapshot_epoch_id.clone(),
79                intermediate_epoch_id: self.intermediate_epoch_id.clone(),
80                maybe_intermediate_mpt_key_padding: self
81                    .maybe_intermediate_trie_key_padding
82                    .clone(),
83                delta_mpt_key_padding: self.delta_trie_key_padding.clone(),
84                state_root_hash,
85            },
86        }
87    }
88
89    fn check_freshly_synced_snapshot(&self, op: &'static str) -> Result<()> {
90        // Can't offer proof if we are operating on a synced snapshot, which is
91        // missing intermediate mpt.
92        if self.maybe_intermediate_trie_key_padding.is_some()
93            || self.intermediate_epoch_id == NULL_EPOCH
94        {
95            Ok(())
96        } else {
97            Err(Error::UnsupportedByFreshlySyncedSnapshot(op).into())
98        }
99    }
100
101    fn get_from_delta<WithProof: StaticBool>(
102        &self, mpt: &DeltaMpt, maybe_root_node: Option<NodeRefDeltaMpt>,
103        access_key: &[u8],
104    ) -> Result<(MptValue<Box<[u8]>>, Option<TrieProof>)> {
105        // Get won't create any new nodes so it's fine to pass an empty
106        // owned_node_set.
107        let mut empty_owned_node_set: Option<OwnedNodeSet> =
108            Some(Default::default());
109
110        match maybe_root_node {
111            None => Ok((MptValue::None, None)),
112            Some(root_node) => {
113                let maybe_value = SubTrieVisitor::new(
114                    mpt,
115                    root_node.clone(),
116                    &mut empty_owned_node_set,
117                )?
118                .get(access_key)?;
119
120                let maybe_proof = match WithProof::value() {
121                    false => None,
122                    true => Some(
123                        SubTrieVisitor::new(
124                            mpt,
125                            root_node,
126                            &mut empty_owned_node_set,
127                        )?
128                        .get_proof(access_key)?,
129                    ),
130                };
131
132                Ok((maybe_value, maybe_proof))
133            }
134        }
135    }
136
137    pub fn get_from_snapshot<WithProof: StaticBool>(
138        &self, access_key: &[u8],
139    ) -> Result<(Option<Box<[u8]>>, Option<TrieProof>)> {
140        let value = self.snapshot_db.get(access_key)?;
141        Ok((
142            value,
143            if WithProof::value() {
144                let mut mpt = self.snapshot_db.open_snapshot_mpt_shared()?;
145                let mut cursor = MptCursor::<
146                    &mut dyn SnapshotMptTraitRead,
147                    BasicPathNode<&mut dyn SnapshotMptTraitRead>,
148                >::new(&mut mpt);
149                cursor.load_root()?;
150                cursor.open_path_for_key::<access_mode::Read>(access_key)?;
151                let proof = cursor.to_proof();
152                cursor.finish()?;
153
154                Some(proof)
155            } else {
156                None
157            },
158        ))
159    }
160
161    fn get_from_all_tries<WithProof: StaticBool>(
162        &self, access_key: StorageKeyWithSpace,
163    ) -> Result<(Option<Box<[u8]>>, StateProof)> {
164        let mut proof = StateProof::default();
165
166        let (maybe_value, maybe_delta_proof) = self
167            .get_from_delta::<WithProof>(
168                &self.delta_trie,
169                self.delta_trie_root.clone(),
170                &access_key
171                    .to_delta_mpt_key_bytes(&self.delta_trie_key_padding),
172            )?;
173        proof.with_delta(maybe_delta_proof);
174
175        match maybe_value {
176            MptValue::Some(value) => {
177                return Ok((Some(value), proof));
178            }
179            MptValue::TombStone => {
180                return Ok((None, proof));
181            }
182            _ => {}
183        }
184
185        // FIXME This is for the case of read-only access of the first snapshot
186        // state where intermediate_mpt is some.
187        if self.maybe_intermediate_trie_key_padding.is_some() {
188            if let Some(intermediate_trie) =
189                self.maybe_intermediate_trie.as_ref()
190            {
191                let (maybe_value, maybe_proof) = self
192                    .get_from_delta::<WithProof>(
193                        intermediate_trie,
194                        self.intermediate_trie_root.clone(),
195                        &access_key.to_delta_mpt_key_bytes(
196                            &self
197                                .maybe_intermediate_trie_key_padding
198                                .as_ref()
199                                .unwrap(),
200                        ),
201                    )?;
202
203                proof.with_intermediate(maybe_proof);
204
205                match maybe_value {
206                    MptValue::Some(value) => {
207                        return Ok((Some(value), proof));
208                    }
209                    MptValue::TombStone => {
210                        return Ok((None, proof));
211                    }
212                    _ => {}
213                }
214            }
215        }
216
217        let (maybe_value, maybe_proof) =
218            self.get_from_snapshot::<WithProof>(&access_key.to_key_bytes())?;
219        proof.with_snapshot(maybe_proof);
220
221        Ok((maybe_value, proof))
222    }
223}
224
225impl Drop for State {
226    fn drop(&mut self) {
227        if self.dirty {
228            panic!("State is dirty however is not committed before free.");
229        }
230    }
231}
232
233impl StateTrait for State {
234    fn get(
235        &self, access_key: StorageKeyWithSpace,
236    ) -> Result<Option<Box<[u8]>>> {
237        self.ensure_temp_slab_for_db_load();
238
239        self.get_from_all_tries::<NoProof>(access_key)
240            .map(|(value, _)| value)
241    }
242
243    fn set(
244        &mut self, access_key: StorageKeyWithSpace, value: Box<[u8]>,
245    ) -> Result<()> {
246        self.pre_modification();
247
248        let root_node = self.get_or_create_delta_root_node()?;
249        self.delta_trie_root = SubTrieVisitor::new(
250            &self.delta_trie,
251            root_node,
252            &mut self.owned_node_set,
253        )?
254        .set(
255            &access_key.to_delta_mpt_key_bytes(&self.delta_trie_key_padding),
256            value,
257        )?
258        .into();
259
260        Ok(())
261    }
262
263    fn delete(&mut self, access_key: StorageKeyWithSpace) -> Result<()> {
264        self.set(access_key, MptValue::<Box<[u8]>>::TombStone.unwrap())?;
265        Ok(())
266    }
267
268    fn delete_test_only(
269        &mut self, access_key: StorageKeyWithSpace,
270    ) -> Result<Option<Box<[u8]>>> {
271        self.pre_modification();
272
273        match self.get_delta_root_node() {
274            None => Ok(None),
275            Some(old_root_node) => {
276                let (old_value, _, root_node) = SubTrieVisitor::new(
277                    &self.delta_trie,
278                    old_root_node,
279                    &mut self.owned_node_set,
280                )?
281                .delete(
282                    &access_key
283                        .to_delta_mpt_key_bytes(&self.delta_trie_key_padding),
284                )?;
285                self.delta_trie_root =
286                    root_node.map(|maybe_node| maybe_node.into());
287                Ok(old_value)
288            }
289        }
290    }
291
292    /// Delete all key/value pairs with access_key_prefix as prefix. These
293    /// key/value pairs exist in three places: Delta Trie, Intermediate Trie
294    /// and Snapshot DB.
295    ///
296    /// For key/value pairs in Delta Trie, we can simply delete them. For
297    /// key/value pairs in Intermediate Trie and Snapshot DB, we try to
298    /// enumerate all key/value pairs and set tombstone in Delta Trie only when
299    /// necessary.
300    fn delete_all(
301        &mut self, access_key_prefix: StorageKeyWithSpace,
302    ) -> Result<Option<Vec<MptKeyValue>>> {
303        self.delete_all_impl::<access_mode::Write>(access_key_prefix)
304    }
305
306    fn read_all(
307        &mut self, access_key_prefix: StorageKeyWithSpace,
308    ) -> Result<Option<Vec<MptKeyValue>>> {
309        self.delete_all_impl::<access_mode::Read>(access_key_prefix)
310    }
311
312    fn read_all_with_callback(
313        &mut self, access_key_prefix: StorageKeyWithSpace,
314        callback: &mut dyn FnMut(MptKeyValue), only_account_key: bool,
315    ) -> Result<()> {
316        self.read_all_with_callback_impl(
317            access_key_prefix,
318            callback,
319            only_account_key,
320        )
321    }
322
323    fn compute_state_root(&mut self) -> Result<StateRootWithAuxInfo> {
324        self.ensure_temp_slab_for_db_load();
325
326        let merkle_root = self.compute_merkle_root()?;
327        Ok(self.state_root(merkle_root))
328    }
329
330    fn get_state_root(&self) -> Result<StateRootWithAuxInfo> {
331        self.ensure_temp_slab_for_db_load();
332
333        Ok(self.state_root(self.state_root_check()?))
334    }
335
336    // TODO(yz): replace coarse lock with a queue.
337    fn commit(&mut self, epoch_id: EpochId) -> Result<StateRootWithAuxInfo> {
338        self.ensure_temp_slab_for_db_load();
339
340        let merkle_root = self.state_root_check()?;
341
342        // TODO(yz): Think about leaving these node dirty and only commit when
343        // the dirty node is removed from cache.
344        let commit_result = self.do_db_commit(epoch_id, &merkle_root);
345        debug!(
346            "commit state for epoch {:?}: merkle_root = {:?}, delta_trie_height={:?} \
347            has_intermediate={}, height={:?}, snapshot_epoch_id={:?}, \
348            intermediate_epoch_id={:?}, intermediate_mpt_id={:?}, delta_mpt_id={}.",
349            epoch_id,
350            merkle_root,
351            self.delta_trie_height,
352            self.maybe_intermediate_trie.is_some(),
353            self.height,
354            self.snapshot_epoch_id,
355            self.intermediate_epoch_id,
356            self.maybe_intermediate_trie.as_ref().map(|mpt| mpt.get_mpt_id()),
357            self.delta_trie.get_mpt_id(),
358        );
359        if commit_result.is_err() {
360            self.revert();
361            debug!("State commitment failed.");
362
363            commit_result?;
364        }
365        if self.delta_trie_height.unwrap()
366            >= self
367                .manager
368                .get_storage_manager()
369                .get_snapshot_epoch_count()
370                / 3
371            && self.maybe_intermediate_trie.is_some()
372        {
373            // TODO: use a better criteria and put it in consensus maybe.
374            let snapshot_height = self.height.clone().unwrap()
375                - self.delta_trie_height.unwrap() as u64;
376            self.manager.check_make_snapshot(
377                self.maybe_intermediate_trie.clone(),
378                self.intermediate_trie_root.clone(),
379                &self.intermediate_epoch_id,
380                snapshot_height,
381                self.recover_mpt_during_construct_pivot_state,
382            )?;
383        }
384
385        Ok(self.state_root(merkle_root))
386    }
387}
388
389impl StateTraitExt for State {
390    fn get_with_proof(
391        &self, access_key: StorageKeyWithSpace,
392    ) -> Result<(Option<Box<[u8]>>, StateProof)> {
393        self.ensure_temp_slab_for_db_load();
394
395        self.check_freshly_synced_snapshot("proof")?;
396        self.get_from_all_tries::<WithProof>(access_key)
397    }
398
399    fn get_node_merkle_all_versions<WithProof: StaticBool>(
400        &self, access_key: StorageKeyWithSpace,
401    ) -> Result<(NodeMerkleTriplet, NodeMerkleProof)> {
402        self.check_freshly_synced_snapshot("proof")?;
403        let mut proof = NodeMerkleProof::default();
404
405        // ----------- get from delta -----------
406        let delta = match self.delta_trie_root {
407            Some(ref root_node) => {
408                let key = access_key
409                    .to_delta_mpt_key_bytes(&self.delta_trie_key_padding);
410
411                let mut owned_node_set = Some(Default::default());
412
413                let mut visitor = SubTrieVisitor::new(
414                    &self.delta_trie,
415                    root_node.clone(),
416                    // won't create any new nodes
417                    &mut owned_node_set,
418                )?;
419
420                let delta = visitor.get_merkle_hash_wo_compressed_path(&key)?;
421
422                let maybe_proof = match WithProof::value() {
423                    false => None,
424                    true => Some(
425                        SubTrieVisitor::new(
426                            &self.delta_trie,
427                            root_node.clone(),
428                            // won't create any new nodes
429                            &mut Some(Default::default()),
430                        )?
431                        .get_proof(&key)?,
432                    ),
433                };
434
435                proof.with_delta(maybe_proof);
436
437                // for tombstones, we ignore the node merkle
438                if visitor.get(&key)?.is_tombstone() {
439                    MptValue::TombStone
440                } else {
441                    MptValue::from(delta)
442                }
443            }
444            None => MptValue::None,
445        };
446
447        // ----------- get from intermediate -----------
448        let intermediate = match (
449            &self.intermediate_trie_root,
450            &self.maybe_intermediate_trie,
451            &self.maybe_intermediate_trie_key_padding,
452        ) {
453            (
454                Some(ref root_node),
455                Some(ref intermediate_trie),
456                Some(ref intermediate_trie_key_padding),
457            ) => {
458                let key = access_key
459                    .to_delta_mpt_key_bytes(&intermediate_trie_key_padding);
460
461                let mut owned_node_set = Some(Default::default());
462
463                let mut visitor = SubTrieVisitor::new(
464                    &intermediate_trie,
465                    root_node.clone(),
466                    // won't create any new nodes
467                    &mut owned_node_set,
468                )?;
469
470                let intermediate =
471                    visitor.get_merkle_hash_wo_compressed_path(&key)?;
472
473                let maybe_proof = match WithProof::value() {
474                    false => None,
475                    true => Some(
476                        SubTrieVisitor::new(
477                            &intermediate_trie,
478                            root_node.clone(),
479                            // won't create any new nodes
480                            &mut Some(Default::default()),
481                        )?
482                        .get_proof(&key)?,
483                    ),
484                };
485
486                proof.with_intermediate(maybe_proof);
487
488                // for tombstones, we ignore the node merkle
489                if visitor.get(&key)?.is_tombstone() {
490                    MptValue::TombStone
491                } else {
492                    MptValue::from(intermediate)
493                }
494            }
495            _ => MptValue::None,
496        };
497
498        // ----------- get from snapshot -----------
499        let key = access_key.to_key_bytes();
500
501        let mut mpt = self.snapshot_db.open_snapshot_mpt_shared()?;
502        let mut cursor = MptCursor::<
503            &mut dyn SnapshotMptTraitRead,
504            BasicPathNode<&mut dyn SnapshotMptTraitRead>,
505        >::new(&mut mpt);
506        cursor.load_root()?;
507        let snapshot =
508            match cursor.open_path_for_key::<access_mode::Read>(&key)? {
509                CursorOpenPathTerminal::Arrived => Some(
510                    cursor
511                        .current_node_mut()
512                        .trie_node
513                        .get_merkle_hash_wo_compressed_path(),
514                ),
515                _ => None,
516            };
517        let maybe_proof = match WithProof::value() {
518            false => None,
519            true => Some(cursor.to_proof()),
520        };
521        cursor.finish()?;
522        proof.with_snapshot(maybe_proof);
523
524        let triplet = NodeMerkleTriplet {
525            delta,
526            intermediate,
527            snapshot,
528        };
529
530        Ok((triplet, proof))
531    }
532}
533
534impl StateDbGetOriginalMethods for State {
535    fn get_original_raw_with_proof(
536        &self, key: StorageKeyWithSpace,
537    ) -> Result<(Option<Box<[u8]>>, StateProof)> {
538        let r = Ok(self.get_with_proof(key)?);
539        trace!("get_original_raw_with_proof key={:?}, value={:?}", key, r);
540        r
541    }
542
543    fn get_original_storage_root(
544        &self, address: &AddressWithSpace,
545    ) -> Result<StorageRoot> {
546        let key = StorageKey::new_storage_root_key(&address.address)
547            .with_space(address.space);
548
549        let (root, _) = self.get_node_merkle_all_versions::<NoProof>(key)?;
550
551        Ok(root)
552    }
553
554    fn get_original_storage_root_with_proof(
555        &self, address: &AddressWithSpace,
556    ) -> Result<(StorageRoot, StorageRootProof)> {
557        let key = StorageKey::new_storage_root_key(&address.address)
558            .with_space(address.space);
559
560        self.get_node_merkle_all_versions::<WithProof>(key)
561            .map_err(Into::into)
562    }
563}
564
565impl State {
566    fn ensure_temp_slab_for_db_load(&self) {
567        self.delta_trie.get_node_memory_manager().enlarge().ok();
568    }
569
570    fn pre_modification(&mut self) {
571        if !self.dirty {
572            self.dirty = true
573        }
574        self.delta_trie.get_node_memory_manager().enlarge().ok();
575    }
576
577    fn get_delta_root_node(&self) -> Option<NodeRefDeltaMpt> {
578        self.delta_trie_root.clone()
579    }
580
581    fn get_or_create_delta_root_node(&mut self) -> Result<NodeRefDeltaMpt> {
582        if self.delta_trie_root.is_none() {
583            let allocator =
584                self.delta_trie.get_node_memory_manager().get_allocator();
585            let (root_cow, entry) = CowNodeRef::new_uninitialized_node(
586                &allocator,
587                self.owned_node_set.as_mut().unwrap(),
588                self.delta_trie.get_mpt_id(),
589            )?;
590            // Insert empty node.
591            entry.insert(UnsafeCell::new(Default::default()));
592
593            self.delta_trie_root =
594                root_cow.into_child().map(|maybe_node| maybe_node.into());
595        }
596
597        // Safe because in either branch the result is Some.
598        Ok(self.get_delta_root_node().unwrap())
599    }
600
601    fn compute_merkle_root(&mut self) -> Result<MerkleHash> {
602        assert!(self.children_merkle_map.len() == 0);
603
604        match &self.delta_trie_root {
605            None => {
606                // Don't commit empty state. Empty state shouldn't exists since
607                // genesis block.
608                Ok(MERKLE_NULL_NODE)
609            }
610            Some(root_node) => {
611                let mut cow_root = CowNodeRef::new(
612                    root_node.clone(),
613                    self.owned_node_set.as_ref().unwrap(),
614                    self.delta_trie.get_mpt_id(),
615                );
616                let allocator =
617                    self.delta_trie.get_node_memory_manager().get_allocator();
618                let arc_db = self.delta_trie.get_arc_db()?;
619                let merkle = cow_root.get_or_compute_merkle(
620                    &self.delta_trie,
621                    self.owned_node_set.as_mut().unwrap(),
622                    &allocator,
623                    &mut *arc_db.to_owned_read()?,
624                    &mut self.children_merkle_map,
625                    0,
626                )?;
627                cow_root.into_child();
628
629                Ok(merkle)
630            }
631        }
632    }
633
634    fn do_db_commit(
635        &mut self, epoch_id: EpochId, merkle_root: &MerkleHash,
636    ) -> Result<()> {
637        let maybe_existing_merkle_root =
638            self.delta_trie.get_merkle_root_by_epoch_id(&epoch_id)?;
639        if maybe_existing_merkle_root.is_some() {
640            // TODO This may happen for genesis when we restart
641            info!(
642                "Overwriting computed state for epoch {:?}, \
643                 committed merkle root {:?}, new merkle root {:?}",
644                epoch_id,
645                maybe_existing_merkle_root.unwrap(),
646                merkle_root
647            );
648            assert_eq!(
649                maybe_existing_merkle_root,
650                Some(*merkle_root),
651                "Overwriting computed state with a different merkle root."
652            );
653            self.revert();
654            return Ok(());
655        }
656
657        // Use coarse lock to prevent row number from interleaving,
658        // which makes it cleaner to restart from db failure. It also
659        // benefits performance because without a coarse lock all
660        // threads may not be able to do anything else when they compete
661        // with each other on slow db writing.
662        let mut commit_transaction = self.delta_trie.start_commit()?;
663
664        let maybe_root_node = self.delta_trie_root.clone();
665        match maybe_root_node {
666            None => {}
667            Some(root_node) => {
668                let start_row_number = commit_transaction.info.row_number.value;
669
670                let mut cow_root = CowNodeRef::new(
671                    root_node,
672                    self.owned_node_set.as_ref().unwrap(),
673                    self.delta_trie.get_mpt_id(),
674                );
675
676                if cow_root.is_owned() {
677                    let allocator = self
678                        .delta_trie
679                        .get_node_memory_manager()
680                        .get_allocator();
681                    let trie_node_mut = unsafe {
682                        self.delta_trie
683                            .get_node_memory_manager()
684                            .dirty_node_as_mut_unchecked(
685                                &allocator,
686                                &mut cow_root.node_ref,
687                            )
688                    };
689                    let result = cow_root.commit_dirty_recursively(
690                        &self.delta_trie,
691                        self.owned_node_set.as_mut().unwrap(),
692                        trie_node_mut,
693                        &mut commit_transaction,
694                        &mut *self
695                            .delta_trie
696                            .get_node_memory_manager()
697                            .get_cache_manager()
698                            .lock(),
699                        &allocator,
700                        &mut self.children_merkle_map,
701                    );
702                    self.children_merkle_map.clear();
703                    self.delta_trie_root =
704                        cow_root.into_child().map(|r| r.into());
705                    result?;
706
707                    // TODO: check the guarantee of underlying db on transaction
708                    // TODO: failure. may have to commit last_row_number
709                    // TODO: separately in worst case.
710                    commit_transaction.transaction.put(
711                        "last_row_number".as_bytes(),
712                        commit_transaction
713                            .info
714                            .row_number
715                            .to_string()
716                            .as_bytes(),
717                    )?;
718                }
719
720                let db_key = *{
721                    match self.delta_trie_root.as_ref().unwrap() {
722                        // Dirty state are committed.
723                        NodeRefDeltaMpt::Dirty { index: _ } => {
724                            unreachable!();
725                        }
726                        // Empty block's state root points to its base state.
727                        NodeRefDeltaMpt::Committed { db_key } => db_key,
728                    }
729                };
730
731                commit_transaction.transaction.put(
732                    ["db_key_for_epoch_id_".as_bytes(), epoch_id.as_ref()]
733                        .concat()
734                        .as_slice(),
735                    db_key.to_string().as_bytes(),
736                )?;
737
738                commit_transaction.transaction.put(
739                    ["db_key_for_root_".as_bytes(), merkle_root.as_ref()]
740                        .concat()
741                        .as_slice(),
742                    db_key.to_string().as_bytes(),
743                )?;
744
745                self.manager.number_committed_nodes.fetch_add(
746                    (commit_transaction.info.row_number.value
747                        - start_row_number) as usize,
748                    Ordering::Relaxed,
749                );
750            }
751        }
752        trace!("root after commit: {:?}", self.delta_trie_root);
753
754        commit_transaction.transaction.put(
755            ["parent_epoch_id_".as_bytes(), epoch_id.as_ref()]
756                .concat()
757                .as_slice(),
758            self.parent_epoch_id.as_ref().to_hex::<String>().as_bytes(),
759        )?;
760
761        {
762            let arc_db = self.delta_trie.get_arc_db()?;
763            commit_transaction
764                .transaction
765                .commit(arc_db.db_ref().as_any())?;
766        }
767
768        self.delta_trie.state_root_committed(
769            epoch_id,
770            merkle_root,
771            self.parent_epoch_id,
772            self.delta_trie_root.clone(),
773        );
774
775        self.dirty = false;
776
777        Ok(())
778    }
779
780    fn state_root_check(&self) -> Result<MerkleHash> {
781        let maybe_merkle_root =
782            self.delta_trie.get_merkle(self.delta_trie_root.clone())?;
783        match maybe_merkle_root {
784            // Empty state.
785            None => Ok(MERKLE_NULL_NODE),
786            Some(merkle_hash) => {
787                // Non-empty state
788                if merkle_hash.is_zero() {
789                    Err(Error::StateCommitWithoutMerkleHash.into())
790                } else {
791                    Ok(merkle_hash)
792                }
793            }
794        }
795    }
796
797    pub fn dump<DUMPER: KVInserter<MptKeyValue>>(
798        &self, dumper: &mut DUMPER,
799    ) -> Result<()> {
800        let inserter = DeltaMptIterator {
801            mpt: self.delta_trie.clone(),
802            maybe_root_node: self.delta_trie_root.clone(),
803        };
804
805        inserter.iterate(dumper)
806    }
807
808    fn revert(&mut self) {
809        self.dirty = false;
810
811        // Free all modified nodes.
812        let owned_node_set = self.owned_node_set.as_ref().unwrap();
813        for owned_node in owned_node_set {
814            self.delta_trie.get_node_memory_manager().free_owned_node(
815                &mut owned_node.clone(),
816                self.delta_trie.get_mpt_id(),
817            );
818        }
819    }
820
821    /// Delete all key/value pairs with access_key_prefix as prefix. These
822    /// key/value pairs exist in three places: Delta Trie, Intermediate Trie
823    /// and Snapshot DB.
824    ///
825    /// For key/value pairs in Delta Trie, we can simply delete them. For
826    /// key/value pairs in Intermediate Trie and Snapshot DB, we try to
827    /// enumerate all key/value pairs and set tombstone in Delta Trie only when
828    /// necessary.
829    ///
830    /// When AM is Read, only calculate the key values to be deleted.
831    fn delete_all_impl<AM: access_mode::AccessMode>(
832        &mut self, access_key_prefix: StorageKeyWithSpace,
833    ) -> Result<Option<Vec<MptKeyValue>>> {
834        if AM::READ_ONLY {
835            self.ensure_temp_slab_for_db_load();
836        } else {
837            self.pre_modification();
838        }
839
840        // Retrieve and delete key/value pairs from delta trie
841        let delta_trie_kvs = match &self.delta_trie_root {
842            None => None,
843            Some(old_root_node) => {
844                let delta_mpt_key_prefix = access_key_prefix
845                    .to_delta_mpt_key_bytes(&self.delta_trie_key_padding);
846                let deleted = if AM::READ_ONLY {
847                    SubTrieVisitor::new(
848                        &self.delta_trie,
849                        old_root_node.clone(),
850                        &mut self.owned_node_set,
851                    )?
852                    .traversal(&delta_mpt_key_prefix, &delta_mpt_key_prefix)?
853                } else {
854                    let (deleted, _, root_node) = SubTrieVisitor::new(
855                        &self.delta_trie,
856                        old_root_node.clone(),
857                        &mut self.owned_node_set,
858                    )?
859                    .delete_all(&delta_mpt_key_prefix, &delta_mpt_key_prefix)?;
860                    self.delta_trie_root =
861                        root_node.map(|maybe_node| maybe_node.into());
862
863                    deleted
864                };
865                deleted
866            }
867        };
868
869        // Retrieve key/value pairs from intermediate trie
870        let intermediate_trie_kvs = match &self.intermediate_trie_root {
871            None => None,
872            Some(root_node) => {
873                if self.maybe_intermediate_trie_key_padding.is_some()
874                    && self.maybe_intermediate_trie.is_some()
875                {
876                    let intermediate_trie_key_padding = self
877                        .maybe_intermediate_trie_key_padding
878                        .as_ref()
879                        .unwrap();
880                    let intermediate_mpt_key_prefix = access_key_prefix
881                        .to_delta_mpt_key_bytes(intermediate_trie_key_padding);
882                    let values = SubTrieVisitor::new(
883                        self.maybe_intermediate_trie.as_ref().unwrap(),
884                        root_node.clone(),
885                        &mut self.owned_node_set,
886                    )?
887                    .traversal(
888                        &intermediate_mpt_key_prefix,
889                        &intermediate_mpt_key_prefix,
890                    )?;
891
892                    values
893                } else {
894                    None
895                }
896            }
897        };
898
899        // Retrieve key/value pairs from snapshot
900        let mut kv_iterator = self.snapshot_db.snapshot_kv_iterator()?.take();
901        let lower_bound_incl = access_key_prefix.to_key_bytes();
902        let upper_bound_excl =
903            to_key_prefix_iter_upper_bound(&lower_bound_incl);
904        let mut kvs = kv_iterator
905            .iter_range(
906                lower_bound_incl.as_slice(),
907                upper_bound_excl.as_ref().map(|v| &**v),
908            )?
909            .take();
910
911        let mut snapshot_kvs = Vec::new();
912        while let Some((key, value)) = kvs.next()? {
913            snapshot_kvs.push((key, value));
914        }
915
916        let is_address_search_prefix =
917            if let StorageKey::AddressPrefixKey(prefix) = access_key_prefix.key
918            {
919                Some(prefix)
920            } else {
921                None
922            };
923
924        let mut result = Vec::new();
925        // This is used to keep track of the deleted keys.
926        let mut deleted_keys = HashSet::new();
927        if let Some(kvs) = delta_trie_kvs {
928            for (k, v) in kvs {
929                let storage_key = StorageKeyWithSpace::from_delta_mpt_key(&k);
930                let k = storage_key.to_key_bytes();
931
932                // If it's an address search prefix, and k is not start with
933                // prefix, skip the key.
934                if let Some(prefix) = is_address_search_prefix {
935                    if !k.starts_with(prefix) {
936                        continue;
937                    }
938                }
939
940                deleted_keys.insert(k.clone());
941                if v.len() > 0 {
942                    result.push((k, v));
943                }
944            }
945        }
946
947        if let Some(kvs) = intermediate_trie_kvs {
948            for (k, v) in kvs {
949                let storage_key = StorageKeyWithSpace::from_delta_mpt_key(&k);
950                let k = storage_key.to_key_bytes();
951
952                // If it's an address search prefix, and k is not start with
953                // prefix, skip the key.
954                if let Some(prefix) = is_address_search_prefix {
955                    if !k.starts_with(prefix) {
956                        continue;
957                    }
958                }
959
960                // Only delete non-empty keys.
961                if v.len() > 0 && !AM::READ_ONLY {
962                    self.delete(storage_key)?;
963                }
964
965                if !deleted_keys.contains(&k) {
966                    deleted_keys.insert(k.clone());
967                    if v.len() > 0 {
968                        result.push((k, v));
969                    }
970                }
971            }
972        }
973
974        // No need to check v.len() because there are no tombStone values in
975        // snapshot.
976        for (k, v) in snapshot_kvs {
977            let storage_key =
978                StorageKeyWithSpace::from_key_bytes::<SkipInputCheck>(&k);
979            if !AM::READ_ONLY {
980                self.delete(storage_key)?;
981            }
982            if !deleted_keys.contains(&k) {
983                result.push((k, v));
984            }
985        }
986
987        if result.is_empty() {
988            Ok(None)
989        } else {
990            Ok(Some(result))
991        }
992    }
993
994    pub fn read_all_with_callback_impl(
995        &mut self, access_key_prefix: StorageKeyWithSpace,
996        callback: &mut dyn FnMut(MptKeyValue), only_account_key: bool,
997    ) -> Result<()> {
998        self.ensure_temp_slab_for_db_load();
999
1000        let is_address_search_prefix =
1001            if let StorageKey::AddressPrefixKey(prefix) = access_key_prefix.key
1002            {
1003                Some(prefix)
1004            } else {
1005                None
1006            };
1007
1008        // This is used to keep track of the deleted keys.
1009        let mut deleted_keys = HashSet::new();
1010
1011        // Retrieve and delete key/value pairs from delta trie
1012        if let Some(old_root_node) = &self.delta_trie_root {
1013            let mut inner_callback = |(k, v): MptKeyValue| {
1014                let storage_key = StorageKeyWithSpace::from_delta_mpt_key(&k);
1015                let k = storage_key.to_key_bytes();
1016
1017                // If it's an address search prefix, and k is not start with
1018                // prefix, skip the key.
1019                if let Some(prefix) = is_address_search_prefix {
1020                    if !k.starts_with(prefix) {
1021                        return;
1022                    }
1023                }
1024                deleted_keys.insert(k.clone());
1025                if v.len() > 0 {
1026                    callback((k, v));
1027                }
1028            };
1029            let delta_mpt_key_prefix = access_key_prefix
1030                .to_delta_mpt_key_bytes(&self.delta_trie_key_padding);
1031            SubTrieVisitor::new(
1032                &self.delta_trie,
1033                old_root_node.clone(),
1034                &mut self.owned_node_set,
1035            )?
1036            .traversal_with_callback(
1037                &delta_mpt_key_prefix,
1038                &delta_mpt_key_prefix,
1039                &mut inner_callback,
1040                true,
1041                only_account_key,
1042            )?;
1043        };
1044
1045        // Retrieve key/value pairs from intermediate trie
1046        if let Some(root_node) = &self.intermediate_trie_root {
1047            let mut inner_callback = |(k, v): MptKeyValue| {
1048                let storage_key = StorageKeyWithSpace::from_delta_mpt_key(&k);
1049                let k = storage_key.to_key_bytes();
1050
1051                // If it's an address search prefix, and k is not start with
1052                // prefix, skip the key.
1053                if let Some(prefix) = is_address_search_prefix {
1054                    if !k.starts_with(prefix) {
1055                        return;
1056                    }
1057                }
1058
1059                if !deleted_keys.contains(&k) {
1060                    deleted_keys.insert(k.clone());
1061                    if v.len() > 0 {
1062                        callback((k, v));
1063                    }
1064                }
1065            };
1066            if self.maybe_intermediate_trie_key_padding.is_some()
1067                && self.maybe_intermediate_trie.is_some()
1068            {
1069                let intermediate_trie_key_padding =
1070                    self.maybe_intermediate_trie_key_padding.as_ref().unwrap();
1071                let intermediate_mpt_key_prefix = access_key_prefix
1072                    .to_delta_mpt_key_bytes(intermediate_trie_key_padding);
1073                SubTrieVisitor::new(
1074                    self.maybe_intermediate_trie.as_ref().unwrap(),
1075                    root_node.clone(),
1076                    &mut self.owned_node_set,
1077                )?
1078                .traversal_with_callback(
1079                    &intermediate_mpt_key_prefix,
1080                    &intermediate_mpt_key_prefix,
1081                    &mut inner_callback,
1082                    true,
1083                    only_account_key,
1084                )?;
1085            }
1086        }
1087
1088        // Retrieve key/value pairs from snapshot
1089        let mut kv_iterator = self.snapshot_db.snapshot_kv_iterator()?.take();
1090        let lower_bound_incl = access_key_prefix.to_key_bytes();
1091        let upper_bound_excl =
1092            to_key_prefix_iter_upper_bound(&lower_bound_incl);
1093        let mut kvs = kv_iterator
1094            .iter_range(
1095                lower_bound_incl.as_slice(),
1096                upper_bound_excl.as_ref().map(|v| &**v),
1097            )?
1098            .take();
1099
1100        while let Some((k, v)) = kvs.next()? {
1101            if !deleted_keys.contains(&k) {
1102                callback((k, v));
1103            }
1104        }
1105
1106        Ok(())
1107    }
1108}
1109
1110use crate::{
1111    impls::{
1112        delta_mpt::{node_memory_manager::ActualSlabIndex, *},
1113        errors::*,
1114        merkle_patricia_trie::{
1115            mpt_cursor::{BasicPathNode, CursorOpenPathTerminal, MptCursor},
1116            KVInserter, MptKeyValue, TrieProof, VanillaChildrenTable,
1117        },
1118        node_merkle_proof::NodeMerkleProof,
1119        state_manager::*,
1120        state_proof::StateProof,
1121    },
1122    state::*,
1123    storage_db::*,
1124    utils::{access_mode, to_key_prefix_iter_upper_bound},
1125    StorageRootProof,
1126};
1127use cfx_internal_common::{StateRootAuxInfo, StateRootWithAuxInfo};
1128use cfx_types::AddressWithSpace;
1129use fallible_iterator::FallibleIterator;
1130use primitives::{
1131    DeltaMptKeyPadding, EpochId, MerkleHash, MptValue, NodeMerkleTriplet,
1132    SkipInputCheck, StateRoot, StaticBool, StorageKey, StorageKeyWithSpace,
1133    StorageRoot, MERKLE_NULL_NODE, NULL_EPOCH,
1134};
1135use rustc_hex::ToHex;
1136use std::{
1137    cell::UnsafeCell,
1138    collections::{BTreeMap, HashSet},
1139    sync::{atomic::Ordering, Arc},
1140};