cfx_storage/impls/delta_mpt/
mod.rs1pub mod cache;
6pub mod cache_manager_delta_mpts;
7pub mod cow_node_ref;
8pub mod delta_mpt_iterator;
9pub mod delta_mpt_open_db_manager;
10mod mem_optimized_trie_node;
11pub(in super::super) mod node_memory_manager;
12mod node_ref;
13pub(super) mod node_ref_map;
14pub(super) mod owned_node_set;
15pub(super) mod return_after_use;
16pub(super) mod row_number;
17mod slab;
20pub mod subtrie_visitor;
21
22#[cfg(test)]
23mod tests;
24
25pub use self::{
26 cow_node_ref::CowNodeRef,
27 delta_mpt_iterator::DeltaMptIterator,
28 delta_mpt_open_db_manager::{
29 ArcDeltaDbWrapper, OpenDeltaDbLru, OpenableOnDemandOpenDeltaDbTrait,
30 },
31 mem_optimized_trie_node::MemOptimizedTrieNode,
32 node_memory_manager::{TrieNodeDeltaMpt, TrieNodeDeltaMptCell},
33 node_ref::*,
34 node_ref_map::DEFAULT_NODE_MAP_SIZE,
35 owned_node_set::OwnedNodeSet,
36 slab::Slab,
37 subtrie_visitor::SubTrieVisitor,
38};
39
/// Short name for [`MultiVersionMerklePatriciaTrie`], the delta MPT type
/// defined in this module.
pub type DeltaMpt = MultiVersionMerklePatriciaTrie;
41
/// `CompactedChildrenTable` instantiated with the compact delta-MPT node
/// reference type `NodeRefDeltaMptCompact`.
pub type ChildrenTableDeltaMpt = CompactedChildrenTable<NodeRefDeltaMptCompact>;
/// `ChildrenTable` instantiated with the compact delta-MPT node reference
/// type `NodeRefDeltaMptCompact`.
pub type ChildrenTableManagedDeltaMpt = ChildrenTable<NodeRefDeltaMptCompact>;
44
/// State that lives behind the trie's `commit_lock` mutex and is handed out,
/// still locked, by `MultiVersionMerklePatriciaTrie::start_commit`.
#[derive(Default)]
pub struct AtomicCommit {
    /// Row counter; the constructors seed it from the `last_row_number` entry
    /// of the delta DB.
    pub row_number: RowNumber,
}
49
50pub struct AtomicCommitTransaction<
51 'a,
52 Transaction: BorrowMut<DeltaDbTransactionTraitObj>,
53> {
54 pub info: MutexGuard<'a, AtomicCommit>,
55 pub transaction: Transaction,
56}
57
/// A delta Merkle Patricia Trie that tracks one root per epoch, with in-memory
/// lookup tables in front of an on-demand opened delta database.
pub struct MultiVersionMerklePatriciaTrie {
    /// Identifier of this MPT; passed to `db_manager.open` to reach its
    /// database.
    mpt_id: DeltaMptId,
    /// In-memory table from epoch to that epoch's root node. The value is an
    /// `Option` because an epoch can be recorded without a root node.
    root_node_by_epoch: RwLock<HashMap<EpochId, Option<NodeRefDeltaMpt>>>,
    /// In-memory table from merkle root hash to the corresponding root node.
    root_node_by_merkle_root: RwLock<HashMap<MerkleHash, NodeRefDeltaMpt>>,
    /// Shared node memory manager used to resolve node references.
    node_memory_manager: Arc<DeltaMptsNodeMemoryManager>,
    /// Opens the delta database for `mpt_id` each time it is needed.
    db_manager: Arc<dyn OpenableOnDemandOpenDeltaDbTrait>,
    /// Set by `new`, `None` for `new_single_mpt`; never read in this file.
    /// NOTE(review): presumably kept only so that dropping the trie triggers
    /// the releaser's cleanup — confirm against `DeltaDbReleaser`.
    #[allow(unused)]
    delta_mpts_releaser: Option<DeltaDbReleaser>,
    /// Mutex around the commit state; locked by `start_commit`.
    commit_lock: Mutex<AtomicCommit>,

    /// In-memory table from epoch to its parent epoch.
    parent_epoch_by_epoch: RwLock<HashMap<EpochId, EpochId>>,
}
93
94impl MallocSizeOf for MultiVersionMerklePatriciaTrie {
95 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
96 let mut size = 0;
97 size += self.root_node_by_epoch.size_of(ops);
98 size += self.root_node_by_merkle_root.size_of(ops);
99 size += self.node_memory_manager.size_of(ops);
100 size += self.parent_epoch_by_epoch.size_of(ops);
101 size
102 }
103}
104
105impl MultiVersionMerklePatriciaTrie {
106 pub fn new(
107 db_manager: Arc<dyn OpenableOnDemandOpenDeltaDbTrait>,
108 snapshot_epoch_id: EpochId, storage_manager: Arc<StorageManager>,
109 mpt_id: DeltaMptId,
110 node_memory_manager: Arc<DeltaMptsNodeMemoryManager>,
111 ) -> Result<Self> {
112 let row_number = Self::parse_row_number(
113 db_manager.open(mpt_id)?.get("last_row_number".as_bytes()),
114 )
115 .unwrap()
117 .unwrap_or_default();
118
119 debug!("Created DeltaMpt with id {}", mpt_id);
120
121 Ok(Self {
122 mpt_id,
123 root_node_by_epoch: Default::default(),
124 root_node_by_merkle_root: Default::default(),
125 node_memory_manager,
126 delta_mpts_releaser: Some(DeltaDbReleaser {
127 snapshot_epoch_id,
128 storage_manager: Arc::downgrade(&storage_manager),
129 mpt_id,
130 }),
131 db_manager,
132 commit_lock: Mutex::new(AtomicCommit {
133 row_number: RowNumber { value: row_number },
134 }),
135 parent_epoch_by_epoch: Default::default(),
136 })
137 }
138
139 pub fn new_single_mpt(
140 db_manager: Arc<dyn OpenableOnDemandOpenDeltaDbTrait>,
141 node_memory_manager: Arc<DeltaMptsNodeMemoryManager>,
142 ) -> Result<Self> {
143 let mpt_id = 0;
144 let row_number = Self::parse_row_number(
145 db_manager.open(mpt_id)?.get("last_row_number".as_bytes()),
146 )
147 .unwrap()
149 .unwrap_or_default();
150
151 Ok(Self {
152 mpt_id,
153 root_node_by_epoch: Default::default(),
154 root_node_by_merkle_root: Default::default(),
155 node_memory_manager,
156 delta_mpts_releaser: None,
157 db_manager,
158 commit_lock: Mutex::new(AtomicCommit {
159 row_number: RowNumber { value: row_number },
160 }),
161 parent_epoch_by_epoch: Default::default(),
162 })
163 }
164
165 pub fn get_mpt_id(&self) -> DeltaMptId { self.mpt_id }
166
167 pub fn start_commit(
168 &self,
169 ) -> Result<AtomicCommitTransaction<'_, Box<DeltaDbTransactionTraitObj>>>
170 {
171 Ok(AtomicCommitTransaction {
172 info: self.commit_lock.lock(),
173 transaction: self.get_arc_db()?.start_transaction_dyn(true)?,
174 })
175 }
176
177 pub(super) fn state_root_committed(
178 &self, epoch_id: EpochId, merkle_root: &MerkleHash,
179 parent_epoch_id: EpochId, root_node: Option<NodeRefDeltaMpt>,
180 ) {
181 self.set_parent_epoch(epoch_id, parent_epoch_id.clone());
182 if root_node.is_some() {
183 self.set_root_node_ref(
184 merkle_root.clone(),
185 root_node.clone().unwrap(),
186 );
187 }
188 self.set_epoch_root(epoch_id, root_node);
189 }
190
191 fn load_root_node_ref_from_db(
192 &self, merkle_root: &MerkleHash,
193 ) -> Result<Option<NodeRefDeltaMpt>> {
194 let db_key_result = Self::parse_row_number(
195 self.get_arc_db()?.get(
201 ["db_key_for_root_".as_bytes(), merkle_root.as_ref()]
202 .concat()
203 .as_slice(),
204 ),
205 )?;
206 match db_key_result {
207 Some(db_key) => Ok(Some(self.loaded_root(merkle_root, db_key))),
208 None => Ok(None),
209 }
210 }
211
212 fn load_root_node_ref_from_db_by_epoch(
213 &self, epoch_id: &EpochId,
214 ) -> Result<Option<Option<NodeRefDeltaMpt>>> {
215 let db_key_result = Self::parse_row_number(
216 self.get_arc_db()?.get(
222 ["db_key_for_epoch_id_".as_bytes(), epoch_id.as_ref()]
223 .concat()
224 .as_slice(),
225 ),
226 )?;
227 match db_key_result {
228 Some(db_key) => {
229 Ok(Some(self.loaded_root_at_epoch(epoch_id, db_key)))
230 }
231 None => {
232 if self.get_parent_epoch(epoch_id)?.is_some() {
233 Ok(Some(None))
234 } else {
235 Ok(None)
236 }
237 }
238 }
239 }
240
241 fn load_parent_epoch_id_from_db(
242 &self, epoch_id: &EpochId,
243 ) -> Result<Option<EpochId>> {
244 let parent_epoch_id_result =
245 self.get_arc_db()?.get(
251 ["parent_epoch_id_".as_bytes(), epoch_id.as_ref()]
252 .concat()
253 .as_slice(),
254 )?;
255 match parent_epoch_id_result {
256 Some(parent_epoch_id_hexstr) => {
257 let parent_epoch_id = hexstr_to_h256(unsafe {
258 std::str::from_utf8_unchecked(&*parent_epoch_id_hexstr)
259 });
260 self.set_parent_epoch(
261 epoch_id.clone(),
262 parent_epoch_id.clone(),
263 );
264 Ok(Some(parent_epoch_id))
265 }
266 None => Ok(None),
267 }
268 }
269
270 pub fn get_root_node_ref_by_epoch(
271 &self, epoch_id: &EpochId,
272 ) -> Result<Option<Option<NodeRefDeltaMpt>>> {
273 let node_ref = self.root_node_by_epoch.read().get(epoch_id).cloned();
274 if node_ref.is_none() {
275 self.load_root_node_ref_from_db_by_epoch(epoch_id)
276 } else {
277 Ok(node_ref)
278 }
279 }
280
281 pub fn get_root_node_ref(
283 &self, merkle_root: &MerkleHash,
284 ) -> Result<Option<NodeRefDeltaMpt>> {
285 let node_ref = self
286 .root_node_by_merkle_root
287 .read()
288 .get(merkle_root)
289 .cloned();
290 if node_ref.is_none() {
291 self.load_root_node_ref_from_db(merkle_root)
292 } else {
293 Ok(node_ref)
294 }
295 }
296
297 pub fn get_parent_epoch(
298 &self, epoch_id: &EpochId,
299 ) -> Result<Option<EpochId>> {
300 let parent_epoch =
301 self.parent_epoch_by_epoch.read().get(epoch_id).cloned();
302 if parent_epoch.is_none() {
303 self.load_parent_epoch_id_from_db(epoch_id)
304 } else {
305 Ok(parent_epoch)
306 }
307 }
308
309 fn set_epoch_root(&self, epoch_id: EpochId, root: Option<NodeRefDeltaMpt>) {
312 self.root_node_by_epoch.write().insert(epoch_id, root);
313 }
314
315 fn set_root_node_ref(
316 &self, merkle_root: MerkleHash, node_ref: NodeRefDeltaMpt,
317 ) {
318 self.root_node_by_merkle_root
319 .write()
320 .insert(merkle_root, node_ref);
321 }
322
323 fn set_parent_epoch(&self, epoch_id: EpochId, parent_epoch_id: EpochId) {
324 self.parent_epoch_by_epoch
325 .write()
326 .insert(epoch_id, parent_epoch_id);
327 }
328
329 fn loaded_root(
330 &self, merkle_root: &MerkleHash, db_key: DeltaMptDbKey,
331 ) -> NodeRefDeltaMpt {
332 let root = NodeRefDeltaMpt::Committed { db_key };
333 self.set_root_node_ref(*merkle_root, root.clone());
334
335 root
336 }
337
338 fn loaded_root_at_epoch(
339 &self, epoch_id: &EpochId, db_key: DeltaMptDbKey,
340 ) -> Option<NodeRefDeltaMpt> {
341 let root = Some(NodeRefDeltaMpt::Committed { db_key });
342 self.set_epoch_root(*epoch_id, root.clone());
343
344 root
345 }
346
347 pub fn get_node_memory_manager(&self) -> &DeltaMptsNodeMemoryManager {
348 &self.node_memory_manager
349 }
350
351 pub fn get_merkle(
352 &self, maybe_node: Option<NodeRefDeltaMpt>,
353 ) -> Result<Option<MerkleHash>> {
354 match maybe_node {
355 Some(node) => Ok(Some({
356 let arc_db = self.get_arc_db()?;
357 let merkle = self
359 .node_memory_manager
360 .node_as_ref_with_cache_manager(
361 &self.node_memory_manager.get_allocator(),
362 node,
363 self.node_memory_manager.get_cache_manager(),
364 &mut *arc_db.to_owned_read()?,
365 self.mpt_id,
366 &mut false,
367 )?
368 .get_merkle()
369 .clone();
370 merkle
371 })),
372 None => Ok(None),
373 }
374 }
375
376 pub fn get_merkle_root_by_epoch_id(
377 &self, epoch_id: &EpochId,
378 ) -> Result<Option<MerkleHash>> {
379 match self.get_root_node_ref_by_epoch(epoch_id)? {
380 None => Ok(None),
381 Some(root_node) => {
382 Ok(self.get_merkle(root_node)?.or(Some(MERKLE_NULL_NODE)))
383 }
384 }
385 }
386
387 pub fn log_usage(&self) { self.node_memory_manager.log_usage(); }
388}
389
390impl MultiVersionMerklePatriciaTrie {
392 fn parse_row_number(
393 x: Result<Option<Box<[u8]>>>,
394 ) -> Result<Option<RowNumberUnderlyingType>> {
395 Ok(match x?.as_ref() {
396 None => None,
397 Some(row_number_bytes) => {
398 trace!("parse_row_number:{:?}", row_number_bytes);
399 Some(
400 unsafe {
401 std::str::from_utf8_unchecked(row_number_bytes.as_ref())
402 }
403 .parse::<RowNumberUnderlyingType>()?,
404 )
405 }
406 })
407 }
408
409 pub fn get_arc_db(&self) -> Result<ArcDeltaDbWrapper> {
410 self.db_manager.open(self.mpt_id)
411 }
412}
413
414#[derive(Default)]
415pub struct DeltaMptIdGen {
416 id_limit: DeltaMptId,
417 available_ids: Vec<DeltaMptId>,
418}
419
420impl DeltaMptIdGen {
421 pub fn allocate(&mut self) -> Result<DeltaMptId> {
422 let id;
423 match self.available_ids.pop() {
424 None => {
425 if self.id_limit != DeltaMptId::max_value() {
426 id = Ok(self.id_limit);
427 self.id_limit += 1;
428 } else {
429 id = Err(Error::TooManyDeltaMPT.into())
430 }
431 }
432 Some(x) => id = Ok(x),
433 };
434
435 id
436 }
437
438 pub fn free(&mut self, id: DeltaMptId) {
439 let max_id = self.id_limit - 1;
440 if id == max_id {
441 self.id_limit = max_id
442 } else {
443 self.available_ids.push(id);
444 }
445 }
446}
447
448use self::{
449 node_memory_manager::*, node_ref_map::DeltaMptDbKey, row_number::*,
450};
451use crate::{
452 impls::{
453 delta_mpt::node_ref_map::DeltaMptId, errors::*,
454 merkle_patricia_trie::*, storage_manager::storage_manager::*,
455 },
456 storage_db::delta_db_manager::DeltaDbTransactionTraitObj,
457};
458use cfx_types::hexstr_to_h256;
459use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
460use parking_lot::{Mutex, MutexGuard, RwLock};
461use primitives::{EpochId, MerkleHash, MERKLE_NULL_NODE};
462use std::{borrow::BorrowMut, collections::HashMap, sync::Arc};