cfx_storage/impls/delta_mpt/
mod.rspub mod cache;
pub mod cache_manager_delta_mpts;
pub mod cow_node_ref;
pub mod delta_mpt_iterator;
pub mod delta_mpt_open_db_manager;
mod mem_optimized_trie_node;
pub(in super::super) mod node_memory_manager;
mod node_ref;
pub(super) mod node_ref_map;
pub(super) mod owned_node_set;
pub(super) mod return_after_use;
pub(super) mod row_number;
mod slab;
pub mod subtrie_visitor;
#[cfg(test)]
mod tests;
pub use self::{
    cow_node_ref::CowNodeRef,
    delta_mpt_iterator::DeltaMptIterator,
    delta_mpt_open_db_manager::{
        ArcDeltaDbWrapper, OpenDeltaDbLru, OpenableOnDemandOpenDeltaDbTrait,
    },
    mem_optimized_trie_node::MemOptimizedTrieNode,
    node_memory_manager::{TrieNodeDeltaMpt, TrieNodeDeltaMptCell},
    node_ref::*,
    node_ref_map::DEFAULT_NODE_MAP_SIZE,
    owned_node_set::OwnedNodeSet,
    slab::Slab,
    subtrie_visitor::SubTrieVisitor,
};
pub type DeltaMpt = MultiVersionMerklePatriciaTrie;
pub type ChildrenTableDeltaMpt = CompactedChildrenTable<NodeRefDeltaMptCompact>;
pub type ChildrenTableManagedDeltaMpt = ChildrenTable<NodeRefDeltaMptCompact>;
#[derive(Default)]
pub struct AtomicCommit {
    pub row_number: RowNumber,
}
pub struct AtomicCommitTransaction<
    'a,
    Transaction: BorrowMut<DeltaDbTransactionTraitObj>,
> {
    pub info: MutexGuard<'a, AtomicCommit>,
    pub transaction: Transaction,
}
pub struct MultiVersionMerklePatriciaTrie {
    mpt_id: DeltaMptId,
    root_node_by_epoch: RwLock<HashMap<EpochId, Option<NodeRefDeltaMpt>>>,
    root_node_by_merkle_root: RwLock<HashMap<MerkleHash, NodeRefDeltaMpt>>,
    node_memory_manager: Arc<DeltaMptsNodeMemoryManager>,
    db_manager: Arc<dyn OpenableOnDemandOpenDeltaDbTrait>,
    #[allow(unused)]
    delta_mpts_releaser: Option<DeltaDbReleaser>,
    commit_lock: Mutex<AtomicCommit>,
    parent_epoch_by_epoch: RwLock<HashMap<EpochId, EpochId>>,
}
impl MallocSizeOf for MultiVersionMerklePatriciaTrie {
    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
        let mut size = 0;
        size += self.root_node_by_epoch.size_of(ops);
        size += self.root_node_by_merkle_root.size_of(ops);
        size += self.node_memory_manager.size_of(ops);
        size += self.parent_epoch_by_epoch.size_of(ops);
        size
    }
}
impl MultiVersionMerklePatriciaTrie {
    pub fn new(
        db_manager: Arc<dyn OpenableOnDemandOpenDeltaDbTrait>,
        snapshot_epoch_id: EpochId, storage_manager: Arc<StorageManager>,
        mpt_id: DeltaMptId,
        node_memory_manager: Arc<DeltaMptsNodeMemoryManager>,
    ) -> Result<Self> {
        let row_number = Self::parse_row_number(
            db_manager.open(mpt_id)?.get("last_row_number".as_bytes()),
        )
        .unwrap()
        .unwrap_or_default();
        debug!("Created DeltaMpt with id {}", mpt_id);
        Ok(Self {
            mpt_id,
            root_node_by_epoch: Default::default(),
            root_node_by_merkle_root: Default::default(),
            node_memory_manager,
            delta_mpts_releaser: Some(DeltaDbReleaser {
                snapshot_epoch_id,
                storage_manager: Arc::downgrade(&storage_manager),
                mpt_id,
            }),
            db_manager,
            commit_lock: Mutex::new(AtomicCommit {
                row_number: RowNumber { value: row_number },
            }),
            parent_epoch_by_epoch: Default::default(),
        })
    }
    pub fn new_single_mpt(
        db_manager: Arc<dyn OpenableOnDemandOpenDeltaDbTrait>,
        node_memory_manager: Arc<DeltaMptsNodeMemoryManager>,
    ) -> Result<Self> {
        let mpt_id = 0;
        let row_number = Self::parse_row_number(
            db_manager.open(mpt_id)?.get("last_row_number".as_bytes()),
        )
        .unwrap()
        .unwrap_or_default();
        Ok(Self {
            mpt_id,
            root_node_by_epoch: Default::default(),
            root_node_by_merkle_root: Default::default(),
            node_memory_manager,
            delta_mpts_releaser: None,
            db_manager,
            commit_lock: Mutex::new(AtomicCommit {
                row_number: RowNumber { value: row_number },
            }),
            parent_epoch_by_epoch: Default::default(),
        })
    }
    pub fn get_mpt_id(&self) -> DeltaMptId { self.mpt_id }
    pub fn start_commit(
        &self,
    ) -> Result<AtomicCommitTransaction<Box<DeltaDbTransactionTraitObj>>> {
        Ok(AtomicCommitTransaction {
            info: self.commit_lock.lock(),
            transaction: self.get_arc_db()?.start_transaction_dyn(true)?,
        })
    }
    pub(super) fn state_root_committed(
        &self, epoch_id: EpochId, merkle_root: &MerkleHash,
        parent_epoch_id: EpochId, root_node: Option<NodeRefDeltaMpt>,
    ) {
        self.set_parent_epoch(epoch_id, parent_epoch_id.clone());
        if root_node.is_some() {
            self.set_root_node_ref(
                merkle_root.clone(),
                root_node.clone().unwrap(),
            );
        }
        self.set_epoch_root(epoch_id, root_node);
    }
    fn load_root_node_ref_from_db(
        &self, merkle_root: &MerkleHash,
    ) -> Result<Option<NodeRefDeltaMpt>> {
        let db_key_result = Self::parse_row_number(
            self.get_arc_db()?.get(
                ["db_key_for_root_".as_bytes(), merkle_root.as_ref()]
                    .concat()
                    .as_slice(),
            ),
        )?;
        match db_key_result {
            Some(db_key) => Ok(Some(self.loaded_root(merkle_root, db_key))),
            None => Ok(None),
        }
    }
    fn load_root_node_ref_from_db_by_epoch(
        &self, epoch_id: &EpochId,
    ) -> Result<Option<Option<NodeRefDeltaMpt>>> {
        let db_key_result = Self::parse_row_number(
            self.get_arc_db()?.get(
                ["db_key_for_epoch_id_".as_bytes(), epoch_id.as_ref()]
                    .concat()
                    .as_slice(),
            ),
        )?;
        match db_key_result {
            Some(db_key) => {
                Ok(Some(self.loaded_root_at_epoch(epoch_id, db_key)))
            }
            None => {
                if self.get_parent_epoch(epoch_id)?.is_some() {
                    Ok(Some(None))
                } else {
                    Ok(None)
                }
            }
        }
    }
    fn load_parent_epoch_id_from_db(
        &self, epoch_id: &EpochId,
    ) -> Result<Option<EpochId>> {
        let parent_epoch_id_result =
            self.get_arc_db()?.get(
                ["parent_epoch_id_".as_bytes(), epoch_id.as_ref()]
                    .concat()
                    .as_slice(),
            )?;
        match parent_epoch_id_result {
            Some(parent_epoch_id_hexstr) => {
                let parent_epoch_id = hexstr_to_h256(unsafe {
                    std::str::from_utf8_unchecked(&*parent_epoch_id_hexstr)
                });
                self.set_parent_epoch(
                    epoch_id.clone(),
                    parent_epoch_id.clone(),
                );
                Ok(Some(parent_epoch_id))
            }
            None => Ok(None),
        }
    }
    pub fn get_root_node_ref_by_epoch(
        &self, epoch_id: &EpochId,
    ) -> Result<Option<Option<NodeRefDeltaMpt>>> {
        let node_ref = self.root_node_by_epoch.read().get(epoch_id).cloned();
        if node_ref.is_none() {
            self.load_root_node_ref_from_db_by_epoch(epoch_id)
        } else {
            Ok(node_ref)
        }
    }
    pub fn get_root_node_ref(
        &self, merkle_root: &MerkleHash,
    ) -> Result<Option<NodeRefDeltaMpt>> {
        let node_ref = self
            .root_node_by_merkle_root
            .read()
            .get(merkle_root)
            .cloned();
        if node_ref.is_none() {
            self.load_root_node_ref_from_db(merkle_root)
        } else {
            Ok(node_ref)
        }
    }
    pub fn get_parent_epoch(
        &self, epoch_id: &EpochId,
    ) -> Result<Option<EpochId>> {
        let parent_epoch =
            self.parent_epoch_by_epoch.read().get(epoch_id).cloned();
        if parent_epoch.is_none() {
            self.load_parent_epoch_id_from_db(epoch_id)
        } else {
            Ok(parent_epoch)
        }
    }
    fn set_epoch_root(&self, epoch_id: EpochId, root: Option<NodeRefDeltaMpt>) {
        self.root_node_by_epoch.write().insert(epoch_id, root);
    }
    fn set_root_node_ref(
        &self, merkle_root: MerkleHash, node_ref: NodeRefDeltaMpt,
    ) {
        self.root_node_by_merkle_root
            .write()
            .insert(merkle_root, node_ref);
    }
    fn set_parent_epoch(&self, epoch_id: EpochId, parent_epoch_id: EpochId) {
        self.parent_epoch_by_epoch
            .write()
            .insert(epoch_id, parent_epoch_id);
    }
    fn loaded_root(
        &self, merkle_root: &MerkleHash, db_key: DeltaMptDbKey,
    ) -> NodeRefDeltaMpt {
        let root = NodeRefDeltaMpt::Committed { db_key };
        self.set_root_node_ref(*merkle_root, root.clone());
        root
    }
    fn loaded_root_at_epoch(
        &self, epoch_id: &EpochId, db_key: DeltaMptDbKey,
    ) -> Option<NodeRefDeltaMpt> {
        let root = Some(NodeRefDeltaMpt::Committed { db_key });
        self.set_epoch_root(*epoch_id, root.clone());
        root
    }
    pub fn get_node_memory_manager(&self) -> &DeltaMptsNodeMemoryManager {
        &self.node_memory_manager
    }
    pub fn get_merkle(
        &self, maybe_node: Option<NodeRefDeltaMpt>,
    ) -> Result<Option<MerkleHash>> {
        match maybe_node {
            Some(node) => Ok(Some({
                let arc_db = self.get_arc_db()?;
                let merkle = self
                    .node_memory_manager
                    .node_as_ref_with_cache_manager(
                        &self.node_memory_manager.get_allocator(),
                        node,
                        self.node_memory_manager.get_cache_manager(),
                        &mut *arc_db.to_owned_read()?,
                        self.mpt_id,
                        &mut false,
                    )?
                    .get_merkle()
                    .clone();
                merkle
            })),
            None => Ok(None),
        }
    }
    pub fn get_merkle_root_by_epoch_id(
        &self, epoch_id: &EpochId,
    ) -> Result<Option<MerkleHash>> {
        match self.get_root_node_ref_by_epoch(epoch_id)? {
            None => Ok(None),
            Some(root_node) => {
                Ok(self.get_merkle(root_node)?.or(Some(MERKLE_NULL_NODE)))
            }
        }
    }
    pub fn log_usage(&self) { self.node_memory_manager.log_usage(); }
}
impl MultiVersionMerklePatriciaTrie {
    fn parse_row_number(
        x: Result<Option<Box<[u8]>>>,
    ) -> Result<Option<RowNumberUnderlyingType>> {
        Ok(match x?.as_ref() {
            None => None,
            Some(row_number_bytes) => {
                trace!("parse_row_number:{:?}", row_number_bytes);
                Some(
                    unsafe {
                        std::str::from_utf8_unchecked(row_number_bytes.as_ref())
                    }
                    .parse::<RowNumberUnderlyingType>()?,
                )
            }
        })
    }
    pub fn get_arc_db(&self) -> Result<ArcDeltaDbWrapper> {
        self.db_manager.open(self.mpt_id)
    }
}
#[derive(Default)]
pub struct DeltaMptIdGen {
    id_limit: DeltaMptId,
    available_ids: Vec<DeltaMptId>,
}
impl DeltaMptIdGen {
    pub fn allocate(&mut self) -> Result<DeltaMptId> {
        let id;
        match self.available_ids.pop() {
            None => {
                if self.id_limit != DeltaMptId::max_value() {
                    id = Ok(self.id_limit);
                    self.id_limit += 1;
                } else {
                    id = Err(Error::TooManyDeltaMPT.into())
                }
            }
            Some(x) => id = Ok(x),
        };
        id
    }
    pub fn free(&mut self, id: DeltaMptId) {
        let max_id = self.id_limit - 1;
        if id == max_id {
            self.id_limit = max_id
        } else {
            self.available_ids.push(id);
        }
    }
}
use self::{
    node_memory_manager::*, node_ref_map::DeltaMptDbKey, row_number::*,
};
use crate::{
    impls::{
        delta_mpt::node_ref_map::DeltaMptId, errors::*,
        merkle_patricia_trie::*, storage_manager::storage_manager::*,
    },
    storage_db::delta_db_manager::DeltaDbTransactionTraitObj,
};
use cfx_types::hexstr_to_h256;
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use parking_lot::{Mutex, MutexGuard, RwLock};
use primitives::{EpochId, MerkleHash, MERKLE_NULL_NODE};
use std::{borrow::BorrowMut, collections::HashMap, sync::Arc};