pub mod cache;
pub mod cache_manager_delta_mpts;
pub mod cow_node_ref;
pub mod delta_mpt_iterator;
pub mod delta_mpt_open_db_manager;
mod mem_optimized_trie_node;
pub(in super::super) mod node_memory_manager;
mod node_ref;
pub(super) mod node_ref_map;
pub(super) mod owned_node_set;
pub(super) mod return_after_use;
pub(super) mod row_number;
mod slab;
pub mod subtrie_visitor;
#[cfg(test)]
mod tests;
pub use self::{
cow_node_ref::CowNodeRef,
delta_mpt_iterator::DeltaMptIterator,
delta_mpt_open_db_manager::{
ArcDeltaDbWrapper, OpenDeltaDbLru, OpenableOnDemandOpenDeltaDbTrait,
},
mem_optimized_trie_node::MemOptimizedTrieNode,
node_memory_manager::{TrieNodeDeltaMpt, TrieNodeDeltaMptCell},
node_ref::*,
node_ref_map::DEFAULT_NODE_MAP_SIZE,
owned_node_set::OwnedNodeSet,
slab::Slab,
subtrie_visitor::SubTrieVisitor,
};
pub type DeltaMpt = MultiVersionMerklePatriciaTrie;
pub type ChildrenTableDeltaMpt = CompactedChildrenTable<NodeRefDeltaMptCompact>;
pub type ChildrenTableManagedDeltaMpt = ChildrenTable<NodeRefDeltaMptCompact>;
#[derive(Default)]
pub struct AtomicCommit {
pub row_number: RowNumber,
}
pub struct AtomicCommitTransaction<
'a,
Transaction: BorrowMut<DeltaDbTransactionTraitObj>,
> {
pub info: MutexGuard<'a, AtomicCommit>,
pub transaction: Transaction,
}
pub struct MultiVersionMerklePatriciaTrie {
mpt_id: DeltaMptId,
root_node_by_epoch: RwLock<HashMap<EpochId, Option<NodeRefDeltaMpt>>>,
root_node_by_merkle_root: RwLock<HashMap<MerkleHash, NodeRefDeltaMpt>>,
node_memory_manager: Arc<DeltaMptsNodeMemoryManager>,
db_manager: Arc<dyn OpenableOnDemandOpenDeltaDbTrait>,
#[allow(unused)]
delta_mpts_releaser: Option<DeltaDbReleaser>,
commit_lock: Mutex<AtomicCommit>,
parent_epoch_by_epoch: RwLock<HashMap<EpochId, EpochId>>,
}
impl MallocSizeOf for MultiVersionMerklePatriciaTrie {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
let mut size = 0;
size += self.root_node_by_epoch.size_of(ops);
size += self.root_node_by_merkle_root.size_of(ops);
size += self.node_memory_manager.size_of(ops);
size += self.parent_epoch_by_epoch.size_of(ops);
size
}
}
impl MultiVersionMerklePatriciaTrie {
pub fn new(
db_manager: Arc<dyn OpenableOnDemandOpenDeltaDbTrait>,
snapshot_epoch_id: EpochId, storage_manager: Arc<StorageManager>,
mpt_id: DeltaMptId,
node_memory_manager: Arc<DeltaMptsNodeMemoryManager>,
) -> Result<Self> {
let row_number = Self::parse_row_number(
db_manager.open(mpt_id)?.get("last_row_number".as_bytes()),
)
.unwrap()
.unwrap_or_default();
debug!("Created DeltaMpt with id {}", mpt_id);
Ok(Self {
mpt_id,
root_node_by_epoch: Default::default(),
root_node_by_merkle_root: Default::default(),
node_memory_manager,
delta_mpts_releaser: Some(DeltaDbReleaser {
snapshot_epoch_id,
storage_manager: Arc::downgrade(&storage_manager),
mpt_id,
}),
db_manager,
commit_lock: Mutex::new(AtomicCommit {
row_number: RowNumber { value: row_number },
}),
parent_epoch_by_epoch: Default::default(),
})
}
pub fn new_single_mpt(
db_manager: Arc<dyn OpenableOnDemandOpenDeltaDbTrait>,
node_memory_manager: Arc<DeltaMptsNodeMemoryManager>,
) -> Result<Self> {
let mpt_id = 0;
let row_number = Self::parse_row_number(
db_manager.open(mpt_id)?.get("last_row_number".as_bytes()),
)
.unwrap()
.unwrap_or_default();
Ok(Self {
mpt_id,
root_node_by_epoch: Default::default(),
root_node_by_merkle_root: Default::default(),
node_memory_manager,
delta_mpts_releaser: None,
db_manager,
commit_lock: Mutex::new(AtomicCommit {
row_number: RowNumber { value: row_number },
}),
parent_epoch_by_epoch: Default::default(),
})
}
pub fn get_mpt_id(&self) -> DeltaMptId { self.mpt_id }
pub fn start_commit(
&self,
) -> Result<AtomicCommitTransaction<Box<DeltaDbTransactionTraitObj>>> {
Ok(AtomicCommitTransaction {
info: self.commit_lock.lock(),
transaction: self.get_arc_db()?.start_transaction_dyn(true)?,
})
}
pub(super) fn state_root_committed(
&self, epoch_id: EpochId, merkle_root: &MerkleHash,
parent_epoch_id: EpochId, root_node: Option<NodeRefDeltaMpt>,
) {
self.set_parent_epoch(epoch_id, parent_epoch_id.clone());
if root_node.is_some() {
self.set_root_node_ref(
merkle_root.clone(),
root_node.clone().unwrap(),
);
}
self.set_epoch_root(epoch_id, root_node);
}
fn load_root_node_ref_from_db(
&self, merkle_root: &MerkleHash,
) -> Result<Option<NodeRefDeltaMpt>> {
let db_key_result = Self::parse_row_number(
self.get_arc_db()?.get(
["db_key_for_root_".as_bytes(), merkle_root.as_ref()]
.concat()
.as_slice(),
),
)?;
match db_key_result {
Some(db_key) => Ok(Some(self.loaded_root(merkle_root, db_key))),
None => Ok(None),
}
}
fn load_root_node_ref_from_db_by_epoch(
&self, epoch_id: &EpochId,
) -> Result<Option<Option<NodeRefDeltaMpt>>> {
let db_key_result = Self::parse_row_number(
self.get_arc_db()?.get(
["db_key_for_epoch_id_".as_bytes(), epoch_id.as_ref()]
.concat()
.as_slice(),
),
)?;
match db_key_result {
Some(db_key) => {
Ok(Some(self.loaded_root_at_epoch(epoch_id, db_key)))
}
None => {
if self.get_parent_epoch(epoch_id)?.is_some() {
Ok(Some(None))
} else {
Ok(None)
}
}
}
}
fn load_parent_epoch_id_from_db(
&self, epoch_id: &EpochId,
) -> Result<Option<EpochId>> {
let parent_epoch_id_result =
self.get_arc_db()?.get(
["parent_epoch_id_".as_bytes(), epoch_id.as_ref()]
.concat()
.as_slice(),
)?;
match parent_epoch_id_result {
Some(parent_epoch_id_hexstr) => {
let parent_epoch_id = hexstr_to_h256(unsafe {
std::str::from_utf8_unchecked(&*parent_epoch_id_hexstr)
});
self.set_parent_epoch(
epoch_id.clone(),
parent_epoch_id.clone(),
);
Ok(Some(parent_epoch_id))
}
None => Ok(None),
}
}
pub fn get_root_node_ref_by_epoch(
&self, epoch_id: &EpochId,
) -> Result<Option<Option<NodeRefDeltaMpt>>> {
let node_ref = self.root_node_by_epoch.read().get(epoch_id).cloned();
if node_ref.is_none() {
self.load_root_node_ref_from_db_by_epoch(epoch_id)
} else {
Ok(node_ref)
}
}
pub fn get_root_node_ref(
&self, merkle_root: &MerkleHash,
) -> Result<Option<NodeRefDeltaMpt>> {
let node_ref = self
.root_node_by_merkle_root
.read()
.get(merkle_root)
.cloned();
if node_ref.is_none() {
self.load_root_node_ref_from_db(merkle_root)
} else {
Ok(node_ref)
}
}
pub fn get_parent_epoch(
&self, epoch_id: &EpochId,
) -> Result<Option<EpochId>> {
let parent_epoch =
self.parent_epoch_by_epoch.read().get(epoch_id).cloned();
if parent_epoch.is_none() {
self.load_parent_epoch_id_from_db(epoch_id)
} else {
Ok(parent_epoch)
}
}
fn set_epoch_root(&self, epoch_id: EpochId, root: Option<NodeRefDeltaMpt>) {
self.root_node_by_epoch.write().insert(epoch_id, root);
}
fn set_root_node_ref(
&self, merkle_root: MerkleHash, node_ref: NodeRefDeltaMpt,
) {
self.root_node_by_merkle_root
.write()
.insert(merkle_root, node_ref);
}
fn set_parent_epoch(&self, epoch_id: EpochId, parent_epoch_id: EpochId) {
self.parent_epoch_by_epoch
.write()
.insert(epoch_id, parent_epoch_id);
}
fn loaded_root(
&self, merkle_root: &MerkleHash, db_key: DeltaMptDbKey,
) -> NodeRefDeltaMpt {
let root = NodeRefDeltaMpt::Committed { db_key };
self.set_root_node_ref(*merkle_root, root.clone());
root
}
fn loaded_root_at_epoch(
&self, epoch_id: &EpochId, db_key: DeltaMptDbKey,
) -> Option<NodeRefDeltaMpt> {
let root = Some(NodeRefDeltaMpt::Committed { db_key });
self.set_epoch_root(*epoch_id, root.clone());
root
}
pub fn get_node_memory_manager(&self) -> &DeltaMptsNodeMemoryManager {
&self.node_memory_manager
}
pub fn get_merkle(
&self, maybe_node: Option<NodeRefDeltaMpt>,
) -> Result<Option<MerkleHash>> {
match maybe_node {
Some(node) => Ok(Some({
let arc_db = self.get_arc_db()?;
let merkle = self
.node_memory_manager
.node_as_ref_with_cache_manager(
&self.node_memory_manager.get_allocator(),
node,
self.node_memory_manager.get_cache_manager(),
&mut *arc_db.to_owned_read()?,
self.mpt_id,
&mut false,
)?
.get_merkle()
.clone();
merkle
})),
None => Ok(None),
}
}
pub fn get_merkle_root_by_epoch_id(
&self, epoch_id: &EpochId,
) -> Result<Option<MerkleHash>> {
match self.get_root_node_ref_by_epoch(epoch_id)? {
None => Ok(None),
Some(root_node) => {
Ok(self.get_merkle(root_node)?.or(Some(MERKLE_NULL_NODE)))
}
}
}
pub fn log_usage(&self) { self.node_memory_manager.log_usage(); }
}
impl MultiVersionMerklePatriciaTrie {
fn parse_row_number(
x: Result<Option<Box<[u8]>>>,
) -> Result<Option<RowNumberUnderlyingType>> {
Ok(match x?.as_ref() {
None => None,
Some(row_number_bytes) => {
trace!("parse_row_number:{:?}", row_number_bytes);
Some(
unsafe {
std::str::from_utf8_unchecked(row_number_bytes.as_ref())
}
.parse::<RowNumberUnderlyingType>()?,
)
}
})
}
pub fn get_arc_db(&self) -> Result<ArcDeltaDbWrapper> {
self.db_manager.open(self.mpt_id)
}
}
#[derive(Default)]
pub struct DeltaMptIdGen {
id_limit: DeltaMptId,
available_ids: Vec<DeltaMptId>,
}
impl DeltaMptIdGen {
pub fn allocate(&mut self) -> Result<DeltaMptId> {
let id;
match self.available_ids.pop() {
None => {
if self.id_limit != DeltaMptId::max_value() {
id = Ok(self.id_limit);
self.id_limit += 1;
} else {
id = Err(Error::TooManyDeltaMPT.into())
}
}
Some(x) => id = Ok(x),
};
id
}
pub fn free(&mut self, id: DeltaMptId) {
let max_id = self.id_limit - 1;
if id == max_id {
self.id_limit = max_id
} else {
self.available_ids.push(id);
}
}
}
use self::{
node_memory_manager::*, node_ref_map::DeltaMptDbKey, row_number::*,
};
use crate::{
impls::{
delta_mpt::node_ref_map::DeltaMptId, errors::*,
merkle_patricia_trie::*, storage_manager::storage_manager::*,
},
storage_db::delta_db_manager::DeltaDbTransactionTraitObj,
};
use cfx_types::hexstr_to_h256;
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use parking_lot::{Mutex, MutexGuard, RwLock};
use primitives::{EpochId, MerkleHash, MERKLE_NULL_NODE};
use std::{borrow::BorrowMut, collections::HashMap, sync::Arc};