1use crate::{
6 cache_config::CacheConfig,
7 cache_manager::{CacheId, CacheManager, CacheSize},
8 consensus::consensus_inner::consensus_executor::RewardExecutionInfo,
9 pow::{PowComputer, TargetDifficultyManager},
10};
11use cfx_executor::internal_contract::make_staking_events;
12use cfx_storage::{
13 state_manager::StateIndex, utils::guarded_value::*, StorageManager,
14 StorageManagerTrait,
15};
16use cfx_types::{Bloom, Space, H256};
17pub use cfxcore_types::block_data_manager::block_data_types;
18use db::SystemDB;
19use malloc_size_of::{new_malloc_size_ops, MallocSizeOf, MallocSizeOfOps};
20use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
21use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard};
22use primitives::{
23 block::CompactBlock,
24 receipt::{BlockReceipts, TransactionStatus},
25 Block, BlockHeader, EpochId, Receipt, SignedTransaction, TransactionIndex,
26 TransactionWithSignature, NULL_EPOCH,
27};
28use rlp::DecoderError;
29use std::{
30 collections::{HashMap, HashSet},
31 sync::Arc,
32};
33use threadpool::ThreadPool;
34pub mod db_gc_manager;
35pub mod db_manager;
36pub mod tx_data_manager;
37use crate::{
38 block_data_manager::{
39 db_manager::DBManager, tx_data_manager::TransactionDataManager,
40 },
41 consensus::pos_handler::PosVerifier,
42};
43pub use block_data_types::*;
44use cfx_execute_helper::{
45 exec_tracer::{BlockExecTraces, TransactionExecTraces},
46 phantom_tx::build_bloom_and_recover_phantom,
47};
48use cfx_internal_common::{
49 EpochExecutionCommitment, StateAvailabilityBoundary, StateRootWithAuxInfo,
50};
51use db_gc_manager::GCProgress;
52use metrics::{register_meter_with_group, Meter, MeterTimer};
53use primitives::pos::PosBlockId;
54use std::{hash::Hash, path::Path, time::Duration};
55
// Lazily-initialized meter registered in the "timer" group under the name
// "tx_pool::recover_public".
// NOTE(review): no use of this meter is visible in this part of the file;
// presumably it times public-key recovery for the tx pool -- confirm.
lazy_static! {
    static ref TX_POOL_RECOVER_TIMER: Arc<dyn Meter> =
        register_meter_with_group("timer", "tx_pool::recover_public");
}
60
/// A `u64` with every bit set (`!0`, i.e. `u64::MAX`).
/// NOTE(review): presumably used as a "null"/unset sentinel; no use is
/// visible in this part of the file -- confirm against callers.
pub const NULLU64: u64 = !0;
62
63#[derive(DeriveMallocSizeOf)]
64pub struct InvalidBlockSet {
65 capacity: usize,
66 invalid_block_hashes: HashSet<H256>,
67}
68
69impl InvalidBlockSet {
70 pub fn new(capacity: usize) -> Self {
71 InvalidBlockSet {
72 capacity,
73 invalid_block_hashes: HashSet::new(),
74 }
75 }
76
77 pub fn insert(&mut self, value: H256) {
78 if !self.invalid_block_hashes.contains(&value) {
79 if self.invalid_block_hashes.len() < self.capacity {
80 self.invalid_block_hashes.insert(value);
81 return;
82 }
83
84 let mut iter = self.invalid_block_hashes.iter();
85 let the_evicted = iter.next().map(|e| e.clone());
86 if let Some(evicted) = the_evicted {
87 self.invalid_block_hashes.remove(&evicted);
88 }
89 self.invalid_block_hashes.insert(value);
90 }
91 }
92
93 pub fn contains(&self, value: &H256) -> bool {
94 self.invalid_block_hashes.contains(value)
95 }
96}
97
/// Central store for block-related data. It keeps in-memory maps (each behind
/// its own `RwLock`) in front of the persistent `db_manager`, and reports
/// cache usage to `cache_man`.
#[derive(DeriveMallocSizeOf)]
pub struct BlockDataManager {
    /// In-memory block headers keyed by block hash.
    block_headers: RwLock<HashMap<H256, Arc<BlockHeader>>>,
    /// In-memory full blocks keyed by block hash.
    blocks: RwLock<HashMap<H256, Arc<Block>>>,
    /// In-memory compact blocks keyed by block hash.
    compact_blocks: RwLock<HashMap<H256, CompactBlock>>,
    /// Block execution results keyed by block hash; each entry is versioned
    /// by the epoch hash it was computed under.
    block_receipts: RwLock<HashMap<H256, BlockReceiptsInfo>>,
    /// Block reward results keyed by block hash, versioned by epoch hash.
    block_rewards: RwLock<HashMap<H256, BlockRewardsInfo>>,
    /// Block execution traces keyed by block hash, versioned by epoch hash.
    block_traces: RwLock<HashMap<H256, BlockTracesInfo>>,
    /// Transaction hash -> location of the transaction inside a block.
    transaction_indices: RwLock<HashMap<H256, TransactionIndex>>,
    /// Block number -> block hash.
    hash_by_block_number: RwLock<HashMap<u64, H256>>,
    /// Block hash -> local (node-specific) block info.
    local_block_info: RwLock<HashMap<H256, LocalBlockInfo>>,
    /// Height -> verified roots for blamed headers.
    blamed_header_verified_roots:
        RwLock<HashMap<u64, BlamedHeaderVerifiedRoots>>,
    /// Block hash -> execution commitment (state root, receipts root, logs
    /// bloom hash) of the epoch.
    epoch_execution_commitments:
        RwLock<HashMap<H256, EpochExecutionCommitment>>,
    /// Block hash -> epoch execution context.
    epoch_execution_contexts: RwLock<HashMap<H256, EpochExecutionContext>>,

    /// Bounded set of known-invalid block hashes.
    invalid_block_set: RwLock<InvalidBlockSet>,
    /// Initialized to the true genesis hash in `new` and replaced by the
    /// checkpoint hash loaded from the database, if one exists.
    cur_consensus_era_genesis_hash: RwLock<H256>,
    /// Initialized to the true genesis hash in `new` and replaced by the
    /// stable hash loaded from the database, if one exists.
    cur_consensus_era_stable_hash: RwLock<H256>,
    /// Instance id of this node run; see `initialize_instance_id`.
    instance_id: Mutex<u64>,

    config: DataManagerConfiguration,

    tx_data_manager: TransactionDataManager,
    /// Persistent backend used by the `*_from_db` / `*_to_db` helpers.
    pub db_manager: DBManager,

    #[ignore_malloc_size_of = "Add later"]
    pub pow: Arc<PowComputer>,

    /// The genesis block passed to `new`.
    pub true_genesis: Arc<Block>,
    pub storage_manager: Arc<StorageManager>,
    /// Tracks usage of cached items via `note_used`.
    cache_man: Arc<Mutex<CacheManager<CacheId>>>,
    pub target_difficulty_manager: TargetDifficultyManager,
    /// Seeded in `new` from the GC progress stored in the database (0 if
    /// none).
    gc_progress: Arc<Mutex<GCProgress>>,

    pub state_availability_boundary: RwLock<StateAvailabilityBoundary>,
}
170
171impl BlockDataManager {
/// Builds a `BlockDataManager` and brings the database and in-memory state
/// into a consistent starting point.
///
/// Steps performed:
/// 1. Size the cache manager from `cache_conf.ledger_mb()`.
/// 2. Create the DB backend selected by `config.db_type`.
/// 3. Initialize the instance id (see `initialize_instance_id`).
/// 4. Load the current era genesis / stable hashes from the database,
///    falling back to the true genesis hash.
/// 5. If the era genesis is the true genesis, persist the genesis block, its
///    transaction indices, execution context, local block info and execution
///    commitment. Otherwise reload the era genesis execution context into
///    memory and stamp its local block info with the new instance id.
///
/// # Panics
/// Panics if the era genesis is not the true genesis and no execution
/// context can be found for it, or if the genesis state root cannot be read
/// (see `true_genesis_state_root`).
pub fn new(
    cache_conf: CacheConfig, true_genesis: Arc<Block>, db: Arc<SystemDB>,
    storage_manager: Arc<StorageManager>,
    worker_pool: Arc<Mutex<ThreadPool>>, config: DataManagerConfiguration,
    pow: Arc<PowComputer>,
) -> Self {
    let mb = 1024 * 1024;
    let max_cache_size = cache_conf.ledger_mb() * mb;
    // Preferred cache size is three quarters of the maximum.
    let pref_cache_size = max_cache_size * 3 / 4;
    let cache_man = Arc::new(Mutex::new(CacheManager::new(
        pref_cache_size,
        max_cache_size,
        3 * mb,
    )));
    let tx_data_manager = TransactionDataManager::new(
        config.tx_cache_index_maintain_timeout,
        worker_pool,
    );
    let db_manager = match config.db_type {
        DbType::Rocksdb => DBManager::new_from_rocksdb(db, pow.clone()),
        // The sqlite backend ignores `db` and uses a fixed relative path.
        DbType::Sqlite => DBManager::new_from_sqlite(
            Path::new("./sqlite_db"),
            pow.clone(),
        ),
    };
    // GC progress recorded by a previous run, or 0 if there is none.
    let previous_db_progress =
        db_manager.gc_progress_from_db().unwrap_or(0);

    let data_man = Self {
        block_headers: RwLock::new(HashMap::new()),
        blocks: RwLock::new(HashMap::new()),
        compact_blocks: Default::default(),
        block_receipts: Default::default(),
        block_rewards: Default::default(),
        block_traces: Default::default(),
        transaction_indices: Default::default(),
        hash_by_block_number: Default::default(),
        local_block_info: Default::default(),
        blamed_header_verified_roots: Default::default(),
        epoch_execution_commitments: Default::default(),
        epoch_execution_contexts: Default::default(),
        invalid_block_set: RwLock::new(InvalidBlockSet::new(
            cache_conf.invalid_block_hashes_cache_size_in_count,
        )),
        true_genesis: true_genesis.clone(),
        storage_manager,
        cache_man,
        instance_id: Mutex::new(0),
        config,
        target_difficulty_manager: TargetDifficultyManager::new(
            cache_conf.target_difficulties_cache_size_in_count,
        ),
        // Both era hashes start at the true genesis and may be replaced
        // below by values loaded from the database.
        cur_consensus_era_genesis_hash: RwLock::new(true_genesis.hash()),
        cur_consensus_era_stable_hash: RwLock::new(true_genesis.hash()),
        tx_data_manager,
        db_manager,
        pow,
        state_availability_boundary: RwLock::new(
            StateAvailabilityBoundary::new(
                true_genesis.hash(),
                0,
                None,
                None,
            ),
        ),
        gc_progress: Arc::new(Mutex::new(GCProgress::new(
            previous_db_progress,
        ))),
    };

    data_man.initialize_instance_id();

    // Prefer the checkpoint recorded in the database; without one, the true
    // genesis is the era genesis.
    let cur_era_genesis_hash =
        match data_man.db_manager.checkpoint_hashes_from_db() {
            None => true_genesis.hash(),
            Some((checkpoint_hash, stable_hash)) => {
                *data_man.cur_consensus_era_genesis_hash.write() =
                    checkpoint_hash;
                *data_man.cur_consensus_era_stable_hash.write() =
                    stable_hash;
                checkpoint_hash
            }
        };

    debug!(
        "BlockDataManager::new() cur_era_genesis_hash: {:?}",
        &cur_era_genesis_hash
    );

    if cur_era_genesis_hash == data_man.true_genesis.hash() {
        // Era genesis is the true genesis: persist the genesis block and
        // all data derived from it.
        data_man.insert_block(
            data_man.true_genesis.clone(),
            true,
        );
        for (index, tx) in
            data_man.true_genesis.transactions.iter().enumerate()
        {
            data_man.insert_transaction_index(
                &tx.hash,
                &TransactionIndex {
                    block_hash: cur_era_genesis_hash,
                    real_index: index,
                    is_phantom: false,
                    rpc_index: Some(index),
                },
            );
        }
        data_man.insert_epoch_execution_context(
            cur_era_genesis_hash,
            EpochExecutionContext {
                start_block_number: 0,
            },
            true,
        );
        data_man.db_manager.insert_local_block_info_to_db(
            &data_man.true_genesis.hash(),
            &LocalBlockInfo::new(
                BlockStatus::Valid,
                0,
                data_man.get_instance_id(),
            ),
        );
        data_man.insert_epoch_execution_commitment(
            data_man.true_genesis.hash(),
            data_man.true_genesis_state_root(),
            *data_man.true_genesis.block_header.deferred_receipts_root(),
            *data_man
                .true_genesis
                .block_header
                .deferred_logs_bloom_hash(),
        );
    } else {
        // Era genesis is a later checkpoint: load its execution context
        // (from memory or the database) and keep it in memory only
        // (`persistent == false`).
        data_man.insert_epoch_execution_context(
            cur_era_genesis_hash,
            data_man
                .get_epoch_execution_context(&cur_era_genesis_hash)
                .expect("ExecutionContext exists for cur_era_genesis"),
            false,
        );
        // Stamp the era genesis with the instance id of this run.
        if let Some(mut local_block_info) = data_man
            .db_manager
            .local_block_info_from_db(&cur_era_genesis_hash)
        {
            local_block_info.instance_id = data_man.get_instance_id();
            data_man.db_manager.insert_local_block_info_to_db(
                &cur_era_genesis_hash,
                &local_block_info,
            );
        }
    }

    data_man
}
333
334 pub fn get_instance_id(&self) -> u64 { *self.instance_id.lock() }
335
336 pub fn initialize_instance_id(&self) {
337 let mut my_instance_id = self.instance_id.lock();
338 if *my_instance_id == 0 {
339 let instance_id = self.db_manager.instance_id_from_db();
341
342 if let Some(instance_id) = instance_id {
344 *my_instance_id = instance_id + 1;
345 }
346 } else {
347 *my_instance_id += 1;
351 if let Some(mut local_block_info) =
352 self.db_manager.local_block_info_from_db(
353 &self.get_cur_consensus_era_genesis_hash(),
354 )
355 {
356 local_block_info.instance_id = *my_instance_id;
357 self.db_manager.insert_local_block_info_to_db(
358 &self.get_cur_consensus_era_genesis_hash(),
359 &local_block_info,
360 );
361 }
362 }
363
364 self.db_manager.insert_instance_id_to_db(*my_instance_id);
366 }
367
368 pub fn true_genesis_state_root(&self) -> StateRootWithAuxInfo {
370 let true_genesis_hash = self.true_genesis.hash();
371 self.storage_manager
372 .get_state_no_commit(
373 StateIndex::new_for_readonly(
374 &true_genesis_hash,
375 &StateRootWithAuxInfo::genesis(&true_genesis_hash),
376 ),
377 false,
378 None,
379 )
380 .unwrap()
381 .unwrap()
382 .get_state_root()
383 .unwrap()
384 }
385
386 pub fn transaction_by_hash(
387 &self, hash: &H256,
388 ) -> Option<Arc<SignedTransaction>> {
389 let tx_index = self
390 .transaction_index_by_hash(hash, false )?;
391 let block = self.block_by_hash(
392 &tx_index.block_hash,
393 false, )?;
395 assert!(tx_index.real_index < block.transactions.len());
396 Some(block.transactions[tx_index.real_index].clone())
397 }
398
399 pub fn insert_block_body(
401 &self, hash: H256, block: Arc<Block>, persistent: bool,
402 ) {
403 if persistent {
404 self.db_manager.insert_block_body_to_db(block.as_ref());
405 }
406 self.cache_man.lock().note_used(CacheId::Block(hash));
407 self.blocks.write().insert(hash, block);
408 }
409
410 pub fn remove_block_body(&self, hash: &H256, remove_db: bool) {
412 if remove_db {
413 self.db_manager.remove_block_body_from_db(hash);
414 }
415 self.blocks.write().remove(hash);
416 }
417
418 pub fn block_by_hash(
420 &self, hash: &H256, update_cache: bool,
421 ) -> Option<Arc<Block>> {
422 self.get(
423 hash,
424 &self.blocks,
425 |key| self.db_manager.block_from_db(key).map(Arc::new),
426 if update_cache {
427 Some(CacheId::Block(*hash))
428 } else {
429 None
430 },
431 )
432 }
433
434 pub fn block_from_db(&self, hash: &H256) -> Option<Block> {
436 self.db_manager.block_from_db(hash)
437 }
438
439 pub fn blocks_by_hash_list(
440 &self, hashes: &Vec<H256>, update_cache: bool,
441 ) -> Option<Vec<Arc<Block>>> {
442 let mut blocks = Vec::new();
443 for h in hashes {
444 blocks.push(self.block_by_hash(h, update_cache)?);
445 }
446 Some(blocks)
447 }
448
449 pub fn insert_block(&self, block: Arc<Block>, persistent: bool) {
451 let hash = block.hash();
452 self.insert_block_header(
453 hash,
454 Arc::new(block.block_header.clone()),
455 persistent,
456 );
457 self.insert_block_body(hash, block, persistent);
458 }
459
460 pub fn remove_useless_block(&self, hash: &H256, remove_db: bool) {
464 if self
467 .local_block_info_by_hash(hash)
468 .map(|info| info.get_status() == BlockStatus::Invalid)
469 .unwrap_or(true)
470 {
471 self.remove_block_header(hash, remove_db);
472 self.remove_block_body(hash, remove_db);
473 }
474 }
475
476 pub fn block_traces_by_hash(
480 &self, hash: &H256,
481 ) -> Option<BlockTracesWithEpoch> {
482 let maybe_traces_in_mem = self
483 .block_traces
484 .read()
485 .get(hash)
486 .and_then(|traces_info| traces_info.get_current_data());
487 let maybe_traces = maybe_traces_in_mem.or_else(|| {
489 self.db_manager.block_traces_from_db(hash).map(
490 |traces_with_epoch| {
491 self.block_traces
492 .write()
493 .entry(*hash)
494 .or_insert(BlockTracesInfo::default())
495 .insert_data(
496 &traces_with_epoch.0,
497 traces_with_epoch.1.clone(),
498 );
499 traces_with_epoch
500 },
501 )
502 });
503 if maybe_traces.is_some() {
504 self.cache_man.lock().note_used(CacheId::BlockTraces(*hash));
505 }
506 maybe_traces
507 }
508
509 pub fn block_tx_traces_by_hash(
511 &self, hash: &H256,
512 ) -> Option<(H256, Vec<TransactionExecTraces>)> {
513 self.block_traces_by_hash(hash).map(
514 |DataVersionTuple(pivot_hash, block_trace)| {
515 (pivot_hash, block_trace.into())
516 },
517 )
518 }
519
520 pub fn block_traces_by_hash_with_epoch(
521 &self, hash: &H256, assumed_epoch: &H256,
522 update_pivot_assumption: bool, update_cache: bool,
523 ) -> Option<BlockExecTraces> {
524 self.get_version(
525 hash,
526 assumed_epoch,
527 &self.block_traces,
528 update_pivot_assumption,
529 match update_cache {
530 true => Some(CacheId::BlockTraces(*hash)),
531 false => None,
532 },
533 |key| self.db_manager.block_traces_from_db(key),
534 |key, result| {
535 self.db_manager.insert_block_traces_to_db(key, result);
536 },
537 )
538 }
539
540 pub fn insert_block_traces(
541 &self, hash: H256, trace: BlockExecTraces, pivot_hash: H256,
542 persistent: bool,
543 ) {
544 trace! {"insert_block_traces start pivot={:?}", pivot_hash};
545 self.insert_version(
546 hash,
547 &pivot_hash,
548 trace,
549 |key, result| {
550 self.db_manager.insert_block_traces_to_db(key, result);
551 },
552 &self.block_traces,
553 CacheId::BlockTraces(hash),
554 persistent,
555 );
556 trace! {"insert_block_traces ends pivot={:?}", pivot_hash};
557 }
558
559 pub fn remove_block_traces(&self, hash: &H256, remove_db: bool) {
561 if remove_db {
562 self.db_manager.remove_block_trace_from_db(hash);
563 }
564 self.block_traces.write().remove(hash);
565 }
566
567 pub fn block_header_by_hash(
568 &self, hash: &H256,
569 ) -> Option<Arc<BlockHeader>> {
570 self.get(
571 hash,
572 &self.block_headers,
573 |key| self.db_manager.block_header_from_db(key).map(Arc::new),
574 Some(CacheId::BlockHeader(*hash)),
575 )
576 }
577
578 pub fn insert_block_header(
579 &self, hash: H256, header: Arc<BlockHeader>, persistent: bool,
580 ) {
581 self.insert(
582 hash,
583 header,
584 &self.block_headers,
585 |_, value| {
586 self.db_manager.insert_block_header_to_db(value.as_ref())
587 },
588 Some(CacheId::BlockHeader(hash)),
589 persistent,
590 )
591 }
592
593 pub fn remove_block_header(&self, hash: &H256, remove_db: bool) {
595 if remove_db {
596 self.db_manager.remove_block_header_from_db(hash);
597 }
598 self.block_headers.write().remove(hash);
599 }
600
601 pub fn block_height_by_hash(&self, hash: &H256) -> Option<u64> {
602 let result = self.block_header_by_hash(hash)?;
603 Some(result.height())
604 }
605
606 pub fn pos_reference_by_hash(
610 &self, hash: &H256,
611 ) -> Option<Option<PosBlockId>> {
612 self.block_header_by_hash(hash)
613 .map(|header| header.pos_reference().clone())
614 }
615
616 pub fn compact_block_by_hash(&self, hash: &H256) -> Option<CompactBlock> {
617 self.compact_blocks.read().get(hash).map(|b| {
618 self.cache_man
619 .lock()
620 .note_used(CacheId::CompactBlock(b.hash()));
621 b.clone()
622 })
623 }
624
625 pub fn insert_compact_block(&self, cb: CompactBlock) {
626 let hash = cb.hash();
627 self.compact_blocks.write().insert(hash, cb);
628 self.cache_man.lock().note_used(CacheId::CompactBlock(hash));
629 }
630
631 pub fn contains_compact_block(&self, hash: &H256) -> bool {
632 self.compact_blocks.read().contains_key(hash)
633 }
634
635 pub fn block_execution_result_by_hash_with_epoch(
643 &self, hash: &H256, assumed_epoch: &H256,
644 update_pivot_assumption: bool, update_cache: bool,
645 ) -> Option<BlockExecutionResult> {
646 self.get_version(
647 hash,
648 assumed_epoch,
649 &self.block_receipts,
650 update_pivot_assumption,
651 match update_cache {
652 true => Some(CacheId::BlockReceipts(*hash)),
653 false => None,
654 },
655 |key| self.db_manager.block_execution_result_from_db(key),
656 |key, result| {
657 self.db_manager
658 .insert_block_execution_result_to_db(key, result);
659 },
660 )
661 }
662
663 pub fn block_execution_result_by_hash_from_db(
664 &self, hash: &H256,
665 ) -> Option<BlockExecutionResultWithEpoch> {
666 self.db_manager.block_execution_result_from_db(hash)
667 }
668
669 pub fn block_epoch_number(&self, hash: &H256) -> Option<u64> {
670 if hash == &self.true_genesis.hash() {
671 return Some(0);
674 }
675 self.block_execution_result_by_hash_from_db(&hash)
676 .map(|execution_result| execution_result.0)
677 .and_then(|pivot| self.block_header_by_hash(&pivot))
678 .map(|header| header.height())
679 }
680
681 pub fn insert_block_execution_result(
682 &self, hash: H256, epoch: H256, block_receipts: Arc<BlockReceipts>,
683 persistent: bool,
684 ) {
685 trace! {"insert_block_traces start pivot={:?}", epoch};
686 let bloom =
687 block_receipts
688 .receipts
689 .iter()
690 .fold(Bloom::zero(), |mut b, r| {
691 b.accrue_bloom(&r.log_bloom);
692 b
693 });
694 self.insert_version(
695 hash,
696 &epoch,
697 BlockExecutionResult {
698 block_receipts,
699 bloom,
700 },
701 |key, result| {
702 self.db_manager
703 .insert_block_execution_result_to_db(key, result);
704 },
705 &self.block_receipts,
706 CacheId::BlockReceipts(hash),
707 persistent,
708 );
709 trace! {"insert_block_traces end pivot={:?}", epoch};
710 }
711
712 pub fn insert_block_reward_result(
713 &self, hash: H256, epoch: &H256, block_reward: BlockRewardResult,
714 persistent: bool,
715 ) {
716 self.insert_version(
717 hash,
718 epoch,
719 block_reward,
720 |key, result| {
721 self.db_manager
722 .insert_block_reward_result_to_db(key, &result);
723 },
724 &self.block_rewards,
725 CacheId::BlockRewards(hash),
726 persistent,
727 );
728 }
729
730 pub fn block_reward_result_by_hash_with_epoch(
731 &self, hash: &H256, assumed_epoch_later: &H256,
732 update_pivot_assumption: bool, update_cache: bool,
733 ) -> Option<BlockRewardResult> {
734 self.get_version(
735 hash,
736 assumed_epoch_later,
737 &self.block_rewards,
738 update_pivot_assumption,
739 match update_cache {
740 true => Some(CacheId::BlockRewards(*hash)),
741 false => None,
742 },
743 |key| self.db_manager.block_reward_result_from_db(key),
744 |key, result| {
745 self.db_manager
746 .insert_block_reward_result_to_db(key, result)
747 },
748 )
749 }
750
751 pub fn remove_block_result(&self, hash: &H256, remove_db: bool) {
752 self.block_receipts.write().remove(hash);
753 self.block_rewards.write().remove(hash);
754 if remove_db {
755 self.db_manager.remove_block_execution_result_from_db(hash);
756 self.db_manager.remove_block_reward_result_from_db(hash);
757 }
758 }
759
760 pub fn transaction_index_by_hash(
761 &self, hash: &H256, update_cache: bool,
762 ) -> Option<TransactionIndex> {
763 if self.config.persist_tx_index {
764 self.get(
765 hash,
766 &self.transaction_indices,
767 |key| self.db_manager.transaction_index_from_db(key),
768 if update_cache {
769 Some(CacheId::TransactionAddress(*hash))
770 } else {
771 None
772 },
773 )
774 } else {
775 self.transaction_indices.read().get(hash).map(|v| v.clone())
776 }
777 }
778
779 pub fn insert_transaction_index(
780 &self, hash: &H256, tx_index: &TransactionIndex,
781 ) {
782 if self.config.persist_tx_index {
783 self.transaction_indices
786 .write()
787 .entry(*hash)
788 .and_modify(|v| {
789 *v = tx_index.clone();
790 self.cache_man
791 .lock()
792 .note_used(CacheId::TransactionAddress(*hash));
793 });
794 self.db_manager
795 .insert_transaction_index_to_db(hash, tx_index);
796 } else {
797 self.transaction_indices
799 .write()
800 .insert(hash.clone(), tx_index.clone());
801 self.cache_man
802 .lock()
803 .note_used(CacheId::TransactionAddress(*hash));
804 }
805 }
806
807 pub fn hash_by_block_number(
808 &self, block_number: u64, update_cache: bool,
809 ) -> Option<H256> {
810 if self.config.persist_block_number_index {
811 self.get(
812 &block_number,
813 &self.hash_by_block_number,
814 |key| self.db_manager.hash_by_block_number_from_db(key),
815 if update_cache {
816 Some(CacheId::HashByBlockNumber(block_number))
817 } else {
818 None
819 },
820 )
821 } else {
822 self.hash_by_block_number
823 .read()
824 .get(&block_number)
825 .map(|v| v.clone())
826 }
827 }
828
829 pub fn insert_hash_by_block_number(
830 &self, block_number: u64, block_hash: &H256,
831 ) {
832 if self.config.persist_block_number_index {
833 self.hash_by_block_number
834 .write()
835 .entry(block_number)
836 .and_modify(|v| {
837 *v = block_hash.clone();
838 self.cache_man
839 .lock()
840 .note_used(CacheId::HashByBlockNumber(block_number));
841 });
842 self.db_manager
843 .insert_hash_by_block_number_to_db(block_number, block_hash);
844 } else {
845 self.hash_by_block_number
847 .write()
848 .insert(block_number, *block_hash);
849 self.cache_man
850 .lock()
851 .note_used(CacheId::HashByBlockNumber(block_number));
852 }
853 }
854
855 pub fn insert_local_block_info(&self, hash: &H256, info: LocalBlockInfo) {
856 self.insert(
857 *hash,
858 info,
859 &self.local_block_info,
860 |key, value| {
861 self.db_manager.insert_local_block_info_to_db(key, value)
862 },
863 Some(CacheId::LocalBlockInfo(*hash)),
864 true,
865 )
866 }
867
868 pub fn local_block_info_by_hash(
869 &self, hash: &H256,
870 ) -> Option<LocalBlockInfo> {
871 self.get(
872 hash,
873 &self.local_block_info,
874 |key| self.db_manager.local_block_info_from_db(key),
875 Some(CacheId::LocalBlockInfo(*hash)),
876 )
877 }
878
879 pub fn insert_blamed_header_verified_roots(
880 &self, height: u64, roots: BlamedHeaderVerifiedRoots,
881 ) {
882 self.insert(
883 height,
884 roots,
885 &self.blamed_header_verified_roots,
886 |key, value| {
887 self.db_manager
888 .insert_blamed_header_verified_roots_to_db(*key, value)
889 },
890 Some(CacheId::BlamedHeaderVerifiedRoots(height)),
891 true,
892 )
893 }
894
895 pub fn verified_blamed_roots_by_height(
898 &self, height: u64,
899 ) -> Option<BlamedHeaderVerifiedRoots> {
900 self.get(
901 &height,
902 &self.blamed_header_verified_roots,
903 |key| self.db_manager.blamed_header_verified_roots_from_db(*key),
904 Some(CacheId::BlamedHeaderVerifiedRoots(height)),
905 )
906 }
907
908 pub fn remove_blamed_header_verified_roots(&self, height: u64) {
909 self.blamed_header_verified_roots.write().remove(&height);
910 self.db_manager
911 .remove_blamed_header_verified_roots_from_db(height);
912 }
913
914 fn insert<K, V, InsertF>(
915 &self, key: K, value: V, in_mem: &RwLock<HashMap<K, V>>,
916 insert_f: InsertF, maybe_cache_id: Option<CacheId>, persistent: bool,
917 ) where
918 K: Clone + Eq + Hash,
919 InsertF: Fn(&K, &V),
920 {
921 if persistent {
922 insert_f(&key, &value);
923 }
924 in_mem.write().insert(key.clone(), value);
925 if let Some(cache_id) = maybe_cache_id {
926 self.cache_man.lock().note_used(cache_id);
927 }
928 }
929
930 fn get<K, V, LoadF>(
931 &self, key: &K, in_mem: &RwLock<HashMap<K, V>>, load_f: LoadF,
932 maybe_cache_id: Option<CacheId>,
933 ) -> Option<V>
934 where
935 K: Clone + Eq + Hash,
936 V: Clone,
937 LoadF: Fn(&K) -> Option<V>,
938 {
939 if let Some(value) = in_mem.read().get(key) {
940 return Some(value.clone());
941 }
942 load_f(key).map(|value| {
943 if let Some(cache_id) = maybe_cache_id {
944 let mut write = in_mem.write();
945 write.insert(key.clone(), value.clone());
946 self.cache_man.lock().note_used(cache_id);
947 }
948 value
949 })
950 }
951
952 fn insert_version<K, Ver, V, InsertF>(
953 &self, key: K, version: &Ver, value: V, insert_f: InsertF,
954 in_mem: &RwLock<HashMap<K, BlockDataWithMultiVersion<Ver, V>>>,
955 cache_id: CacheId, persistent: bool,
956 ) where
957 K: Eq + Hash,
958 Ver: Clone + Eq + PartialEq + Copy,
959 V: Clone,
960 InsertF: Fn(&K, &DataVersionTuple<Ver, V>),
961 {
962 let result = DataVersionTuple(version.clone(), value);
963 if persistent {
964 insert_f(&key, &result);
965 }
966 in_mem
967 .write()
968 .entry(key)
969 .or_insert(BlockDataWithMultiVersion::default())
970 .insert_current_data(version, result.1);
971 self.cache_man.lock().note_used(cache_id);
972 }
973
/// Generic helper that returns the data of `key` at exactly `version`.
///
/// Lookup order:
/// 1. The in-memory multi-version entry. On a hit, if
///    `update_current_version` is set and the hit is not the current
///    version, it is made current and persisted through `insert_f`. The
///    cache manager is notified when `maybe_cache_id` is given.
/// 2. `load_f` (typically the database). The loaded tuple is accepted only
///    if its version equals `version`; otherwise `None` is returned. An
///    accepted value is kept in memory only when `maybe_cache_id` is given.
///
/// Note that `insert_f` and the cache manager lock are both used while the
/// write lock on `in_mem` is held (step 1).
fn get_version<K, Ver, V, LoadF, InsertF>(
    &self, key: &K, version: &Ver,
    in_mem: &RwLock<HashMap<K, BlockDataWithMultiVersion<Ver, V>>>,
    update_current_version: bool, maybe_cache_id: Option<CacheId>,
    load_f: LoadF, insert_f: InsertF,
) -> Option<V>
where
    K: Eq + Hash + Clone,
    Ver: Eq + Copy + PartialEq + std::fmt::Display,
    V: Clone,
    LoadF: Fn(&K) -> Option<DataVersionTuple<Ver, V>>,
    InsertF: Fn(&K, &DataVersionTuple<Ver, V>),
{
    // A write lock is taken even for the lookup because a hit may need to
    // switch the current version.
    if let Some(versions) = in_mem.write().get_mut(key) {
        if let Some((value, is_current_version)) =
            versions.get_data_at_version(version)
        {
            if update_current_version && !is_current_version {
                versions.set_current_version(*version);
                // Persist the newly selected version.
                insert_f(key, &DataVersionTuple(*version, value.clone()));
            }
            if let Some(cache_id) = maybe_cache_id {
                self.cache_man.lock().note_used(cache_id);
            }
            return Some(value);
        }
    }
    // Not in memory at this version: fall back to the loader.
    let DataVersionTuple(version_in_db, res) = load_f(key)?;
    if version_in_db != *version {
        debug!(
            "Version from db {} does not match required {}",
            version_in_db, version
        );
        return None;
    }
    // Only cache the loaded value when the caller asked for cache tracking.
    if let Some(cache_id) = maybe_cache_id {
        in_mem
            .write()
            .entry(key.clone())
            .or_insert(BlockDataWithMultiVersion::default())
            .insert_data(version, res.clone());
        self.cache_man.lock().note_used(cache_id);
    }
    Some(res)
}
1019
1020 pub fn insert_terminals_to_db(&self, terminals: Vec<H256>) {
1021 self.db_manager.insert_terminals_to_db(&terminals)
1022 }
1023
1024 pub fn terminals_from_db(&self) -> Option<Vec<H256>> {
1025 self.db_manager.terminals_from_db()
1026 }
1027
1028 pub fn insert_executed_epoch_set_hashes_to_db(
1029 &self, epoch_number: u64, epoch_set: &Vec<H256>,
1030 ) {
1031 self.db_manager
1032 .insert_executed_epoch_set_hashes_to_db(epoch_number, epoch_set);
1033 }
1034
1035 pub fn insert_skipped_epoch_set_hashes_to_db(
1036 &self, epoch_number: u64, skipped_set: &Vec<H256>,
1037 ) {
1038 self.db_manager
1039 .insert_skipped_epoch_set_hashes_to_db(epoch_number, skipped_set);
1040 }
1041
1042 pub fn executed_epoch_set_hashes_from_db(
1043 &self, epoch_number: u64,
1044 ) -> Option<Vec<H256>> {
1045 if epoch_number != 0 {
1046 self.db_manager
1047 .executed_epoch_set_hashes_from_db(epoch_number)
1048 } else {
1049 Some(vec![self.true_genesis.hash()])
1050 }
1051 }
1052
1053 pub fn skipped_epoch_set_hashes_from_db(
1054 &self, epoch_number: u64,
1055 ) -> Option<Vec<H256>> {
1056 if epoch_number != 0 {
1057 self.db_manager
1058 .skipped_epoch_set_hashes_from_db(epoch_number)
1059 } else {
1060 Some(vec![])
1061 }
1062 }
1063
1064 pub fn all_epoch_set_hashes_from_db(
1065 &self, epoch_number: u64,
1066 ) -> Option<Vec<H256>> {
1067 if epoch_number != 0 {
1068 let mut res = self
1069 .db_manager
1070 .skipped_epoch_set_hashes_from_db(epoch_number)?;
1071 res.append(
1072 &mut self
1073 .db_manager
1074 .executed_epoch_set_hashes_from_db(epoch_number)?,
1075 );
1076 Some(res)
1077 } else {
1078 Some(vec![self.true_genesis.hash()])
1079 }
1080 }
1081
1082 pub fn receipts_retain_epoch(
1084 &self, block_hash: &H256, epoch: &H256,
1085 ) -> bool {
1086 match self.block_receipts.write().get_mut(block_hash) {
1087 Some(r) => {
1088 r.retain_version(epoch);
1089 true
1090 }
1091 None => false,
1092 }
1093 }
1094
1095 pub fn insert_epoch_execution_context(
1096 &self, hash: H256, ctx: EpochExecutionContext, persistent: bool,
1097 ) {
1098 self.insert(
1099 hash,
1100 ctx,
1101 &self.epoch_execution_contexts,
1102 |key, value| {
1103 self.db_manager.insert_execution_context_to_db(key, value)
1104 },
1105 None,
1106 persistent,
1107 );
1108 }
1109
1110 pub fn get_epoch_execution_context(
1113 &self, hash: &H256,
1114 ) -> Option<EpochExecutionContext> {
1115 self.get(
1116 hash,
1117 &self.epoch_execution_contexts,
1118 |key| self.db_manager.execution_context_from_db(key),
1119 None,
1120 )
1121 }
1122
1123 pub fn insert_epoch_execution_commitment(
1126 &self, block_hash: H256,
1127 state_root_with_aux_info: StateRootWithAuxInfo, receipts_root: H256,
1128 logs_bloom_hash: H256,
1129 ) {
1130 let commitment = EpochExecutionCommitment {
1131 state_root_with_aux_info,
1132 receipts_root,
1133 logs_bloom_hash,
1134 };
1135 self.insert(
1136 block_hash,
1137 commitment,
1138 &self.epoch_execution_commitments,
1139 |key, value| {
1140 self.db_manager
1141 .insert_epoch_execution_commitment_to_db(key, value)
1142 },
1143 None,
1144 true,
1145 );
1146 }
1147
/// Looks up the in-memory execution commitment of `block_hash` and returns
/// a reference to it bundled with the read guard that keeps it alive.
///
/// Only the in-memory map is consulted. The read lock on the map stays held
/// for as long as the returned `GuardedValue` lives.
pub fn get_epoch_execution_commitment(
    &self, block_hash: &H256,
) -> GuardedValue<
    RwLockReadGuard<'_, HashMap<H256, EpochExecutionCommitment>>,
    NonCopy<Option<&'_ EpochExecutionCommitment>>,
> {
    let read_lock = self.epoch_execution_commitments.read();
    // Split into the guard and a dereferenced view of the map so the lookup
    // result can be re-bundled with the guard.
    let (read_lock, derefed) = GuardedValue::new_derefed(read_lock).into();
    GuardedValue::new(read_lock, NonCopy(derefed.0.get(block_hash)))
}
1159
1160 pub fn load_epoch_execution_commitment_from_db(
1164 &self, block_hash: &H256,
1165 ) -> Option<EpochExecutionCommitment> {
1166 let commitment = self
1167 .db_manager
1168 .epoch_execution_commitment_from_db(block_hash)?;
1169 self.epoch_execution_commitments
1170 .write()
1171 .insert(*block_hash, commitment.clone());
1172 Some(commitment)
1173 }
1174
/// Returns the execution commitment of `block_hash`, preferring the
/// in-memory copy and falling back to the database.
///
/// Unlike `load_epoch_execution_commitment_from_db`, a commitment found in
/// the database is not inserted into the in-memory map.
pub fn get_epoch_execution_commitment_with_db(
    &self, block_hash: &H256,
) -> Option<EpochExecutionCommitment> {
    self.get_epoch_execution_commitment(block_hash).map_or_else(
        // In-memory miss: ask the database.
        || {
            self.db_manager
                .epoch_execution_commitment_from_db(block_hash)
        },
        // In-memory hit: clone the commitment out of the guarded reference.
        |maybe_ref| Some(maybe_ref.clone()),
    )
}
1188
1189 pub fn insert_pos_reward(
1190 &self, pos_epoch: u64, pos_reward: &PosRewardInfo,
1191 ) {
1192 self.db_manager.insert_pos_reward(pos_epoch, pos_reward)
1193 }
1194
1195 pub fn pos_reward_by_pos_epoch(
1196 &self, pos_epoch: u64,
1197 ) -> Option<PosRewardInfo> {
1198 self.db_manager.pos_reward_by_pos_epoch(pos_epoch)
1199 }
1200
1201 pub fn remove_epoch_execution_commitment(&self, block_hash: &H256) {
1202 self.epoch_execution_commitments.write().remove(block_hash);
1203 }
1204
1205 pub fn remove_epoch_execution_commitment_from_db(&self, block_hash: &H256) {
1206 self.db_manager
1207 .remove_epoch_execution_commitment_from_db(block_hash);
1208 }
1209
1210 pub fn remove_epoch_execution_context(&self, block_hash: &H256) {
1211 self.epoch_execution_contexts.write().remove(block_hash);
1212 }
1213
1214 pub fn remove_epoch_execution_context_from_db(&self, block_hash: &H256) {
1215 self.db_manager
1216 .remove_epoch_execution_context_from_db(block_hash);
1217 }
1218
1219 pub fn epoch_executed(&self, epoch_hash: &H256) -> bool {
1220 self.get_epoch_execution_commitment(epoch_hash).is_some()
1222 }
1223
/// Checks that the epoch `epoch_hash` has been executed and, when it is on
/// the local pivot chain, that its per-block execution data can be
/// recovered; transaction indices and staking events are rebuilt on the way.
///
/// Steps, in order:
/// 1. Return `false` if `epoch_executed(epoch_hash)` is false.
/// 2. Return `true` immediately if `on_local_pivot` is false.
/// 3. For every hash in `epoch_block_hashes`, fetch the block execution
///    result bound to this epoch (and the block traces when `update_trace`
///    is set); any miss returns `false`.
/// 4. Walk all transactions of all blocks and re-insert the transaction
///    index of every `Success`/`Failure` transaction, plus one index per
///    phantom transaction recovered from its logs; collect staking events
///    from the same logs.
/// 5. When `reward_execution_info` is `Some`, require a reward result bound
///    to this epoch for each of its `epoch_blocks`; any miss returns `false`.
/// 6. When `pos_verifier.pos_option()` is `Some` and the epoch height is not
///    0: reject the epoch if a stored PoS reward for the relevant PoS epoch
///    was produced by a different execution epoch, then store the collected
///    staking events in the PoS consensus db (an error returns `false`).
///
/// # Panics
///
/// Panics if a block in `epoch_block_hashes` cannot be loaded, if a
/// transaction has no receipt at its index, if the height or header of
/// `epoch_hash` is unavailable, or if the pivot header has no PoS reference
/// while its parent has one.
pub fn epoch_executed_and_recovered(
    &self, epoch_hash: &H256, epoch_block_hashes: &Vec<H256>,
    on_local_pivot: bool, update_trace: bool,
    reward_execution_info: &Option<RewardExecutionInfo>,
    pos_verifier: &PosVerifier, evm_chain_id: u32,
) -> bool {
    if !self.epoch_executed(epoch_hash) {
        return false;
    }

    if !on_local_pivot {
        return true;
    }
    // Make sure the receipts (and traces) of every block are available for
    // this epoch, keeping the receipts for the index rebuild below.
    let mut epoch_receipts = Vec::new();
    let mut epoch_staking_events = Vec::new();
    for h in epoch_block_hashes {
        // NOTE(review): the two `true` flags are presumably
        // `update_pivot_assumption` and `update_cache` — confirm against the
        // callee's signature.
        if let Some(r) = self.block_execution_result_by_hash_with_epoch(
            h, epoch_hash, true, true, ) {
            epoch_receipts.push(r.block_receipts);
        } else {
            return false;
        }
        if update_trace {
            // NOTE(review): same flag pair as above — confirm.
            if self
                .block_traces_by_hash_with_epoch(
                    h, epoch_hash, true, true, )
                .is_none()
            {
                return false;
            }
        }
    }

    // Counter for Ethereum-space transactions; it is declared outside the
    // block loop, so it keeps counting across all blocks of the epoch.
    let mut evm_tx_index = 0;

    for (block_idx, block_hash) in epoch_block_hashes.iter().enumerate() {
        // Counter for native-space transactions; restarts for every block.
        let mut cfx_tx_index = 0;

        let block = self
            .block_by_hash(block_hash, true )
            .expect("block exists");

        for (tx_idx, tx) in block.transactions.iter().enumerate() {
            // `epoch_receipts` was filled in the same order as
            // `epoch_block_hashes`, so `block_idx` selects this block's
            // receipts.
            let Receipt {
                outcome_status,
                logs,
                ..
            } = epoch_receipts[block_idx].receipts.get(tx_idx).unwrap();

            // Index reported for the transaction: native transactions use
            // the per-block counter, non-skipped Ethereum-space transactions
            // use the epoch-wide counter, and everything else gets
            // `usize::MAX` as a placeholder.
            let rpc_index = match tx.space() {
                Space::Native => {
                    let rpc_index = cfx_tx_index;
                    cfx_tx_index += 1;
                    rpc_index
                }
                Space::Ethereum
                    if *outcome_status != TransactionStatus::Skipped =>
                {
                    let rpc_index = evm_tx_index;
                    evm_tx_index += 1;
                    rpc_index
                }
                _ => usize::MAX, };

            let (phantom_txs, _) =
                build_bloom_and_recover_phantom(logs, tx.hash());

            // Only transactions with a `Success` or `Failure` outcome are
            // indexed; all other outcomes are ignored.
            match outcome_status {
                TransactionStatus::Success | TransactionStatus::Failure => {
                    self.insert_transaction_index(
                        &tx.hash,
                        &TransactionIndex {
                            block_hash: *block_hash,
                            real_index: tx_idx,
                            is_phantom: false,
                            rpc_index: Some(rpc_index),
                        },
                    );

                    // Each phantom transaction shares the real transaction's
                    // position (`real_index`) but consumes its own slot of
                    // the Ethereum-space counter.
                    for ptx in phantom_txs {
                        self.insert_transaction_index(
                            &ptx.into_eip155(evm_chain_id).hash(),
                            &TransactionIndex {
                                block_hash: *block_hash,
                                real_index: tx_idx,
                                is_phantom: true,
                                rpc_index: Some(evm_tx_index),
                            },
                        );

                        evm_tx_index += 1;
                    }

                    epoch_staking_events.extend(make_staking_events(logs));
                }
                _ => {}
            }
        }
    }
    // Every block listed in the reward execution info must have a reward
    // result bound to this epoch.
    if let Some(reward_execution_info) = reward_execution_info {
        for block in &reward_execution_info.epoch_blocks {
            let h = block.as_ref().hash();
            if self
                .block_reward_result_by_hash_with_epoch(
                    &h, epoch_hash, true, true,
                )
                .is_none()
            {
                return false;
            }
        }
    }
    let me_height = self.block_height_by_hash(epoch_hash).unwrap();
    if pos_verifier.pos_option().is_some() && me_height != 0 {
        let pivot_block_header = self
            .block_header_by_hash(epoch_hash)
            .expect("header exists");
        let maybe_parent_pos_ref = self
            .block_header_by_hash(&pivot_block_header.parent_hash()) .and_then(|parent| parent.pos_reference().clone());
        // Only when the PoS reference changed relative to the parent is a
        // reward distribution event looked up.
        if pos_verifier.is_enabled_at_height(me_height)
            && maybe_parent_pos_ref.is_some()
            && maybe_parent_pos_ref != *pivot_block_header.pos_reference()
        {
            if let Some((pos_epoch, _)) = pos_verifier
                .get_reward_distribution_event(
                    pivot_block_header.pos_reference().as_ref().unwrap(),
                    maybe_parent_pos_ref.as_ref().unwrap(),
                )
                .as_ref()
                .and_then(|x| x.first())
            {
                if let Some(pos_reward) =
                    self.pos_reward_by_pos_epoch(*pos_epoch)
                {
                    // A stored reward computed by a different execution
                    // epoch means this epoch is not treated as recovered.
                    if pos_reward.execution_epoch_hash != *epoch_hash {
                        return false;
                    }
                }
            }
        }

        trace!(
            "staking events update: height={}, new={}",
            me_height,
            epoch_hash,
        );
        if let Err(e) = pos_verifier.consensus_db().put_staking_events(
            me_height,
            *epoch_hash,
            epoch_staking_events,
        ) {
            error!("epoch_executed err={:?}", e);
            return false;
        }
    }

    true
}
1397
1398 pub fn invalidate_block(&self, block_hash: H256) {
1399 let block_info =
1402 LocalBlockInfo::new(BlockStatus::Invalid, NULLU64, NULLU64);
1403 self.db_manager
1404 .insert_local_block_info_to_db(&block_hash, &block_info);
1405 self.invalid_block_set.write().insert(block_hash);
1406 }
1407
1408 pub fn verified_invalid(
1410 &self, block_hash: &H256,
1411 ) -> (bool, Option<LocalBlockInfo>) {
1412 let invalid_block_set = self.invalid_block_set.upgradable_read();
1413 if invalid_block_set.contains(block_hash) {
1414 return (true, None);
1415 } else {
1416 if let Some(block_info) =
1417 self.db_manager.local_block_info_from_db(block_hash)
1418 {
1419 match block_info.get_status() {
1420 BlockStatus::Invalid => {
1421 RwLockUpgradableReadGuard::upgrade(invalid_block_set)
1422 .insert(*block_hash);
1423 return (true, Some(block_info));
1424 }
1425 _ => return (false, Some(block_info)),
1426 }
1427 } else {
1428 return (false, None);
1430 }
1431 }
1432 }
1433
1434 pub fn cached_block_count(&self) -> usize { self.blocks.read().len() }
1435
1436 pub fn cache_size(&self) -> CacheSize {
1438 let malloc_ops = &mut new_malloc_size_ops();
1439 let block_headers = self.block_headers.read().size_of(malloc_ops);
1440 let blocks = self.blocks.read().size_of(malloc_ops);
1441 let compact_blocks = self.compact_blocks.read().size_of(malloc_ops);
1442 let block_receipts = self.block_receipts.read().size_of(malloc_ops);
1443 let block_rewards = self.block_rewards.read().size_of(malloc_ops);
1444 let block_traces = self.block_traces.read().size_of(malloc_ops);
1445 let transaction_indices =
1446 self.transaction_indices.read().size_of(malloc_ops);
1447 let hash_by_block_number =
1448 self.hash_by_block_number.read().size_of(malloc_ops);
1449 let local_block_infos =
1450 self.local_block_info.read().size_of(malloc_ops);
1451
1452 CacheSize {
1453 block_headers,
1454 blocks,
1455 block_receipts,
1456 block_rewards,
1457 block_traces,
1458 transaction_indices,
1459 compact_blocks,
1460 local_block_infos,
1461 hash_by_block_number,
1462 }
1463 }
1464
/// Runs one garbage-collection pass over the in-memory caches.
///
/// The current total size is measured first, then write locks on all
/// caches and the cache manager lock are taken. `collect_garbage` is given
/// a callback that evicts the cache ids handed to it and reports the
/// re-measured size. Finally most of the maps are shrunk to fit.
fn block_cache_gc(&self) {
    // Measured before the write locks below are taken, because
    // `cache_size()` takes read locks on the same caches.
    let current_size = self.cache_size().total();
    // All guards below are held until the end of this function.
    let mut block_headers = self.block_headers.write();
    let mut blocks = self.blocks.write();
    let mut compact_blocks = self.compact_blocks.write();
    let mut executed_results = self.block_receipts.write();
    let mut reward_results = self.block_rewards.write();
    let mut block_traces = self.block_traces.write();
    let mut tx_indices = self.transaction_indices.write();
    let mut local_block_info = self.local_block_info.write();
    let mut blamed_header_verified_roots =
        self.blamed_header_verified_roots.write();
    let mut hash_by_block_number = self.hash_by_block_number.write();
    let mut cache_man = self.cache_man.lock();

    // Logged values: the total size followed by the entry count of each
    // cache, in the order the locks were taken above.
    debug!(
        "Before gc cache_size={} {} {} {} {} {} {} {} {} {} {}",
        current_size,
        block_headers.len(),
        blocks.len(),
        compact_blocks.len(),
        executed_results.len(),
        reward_results.len(),
        block_traces.len(),
        tx_indices.len(),
        local_block_info.len(),
        blamed_header_verified_roots.len(),
        hash_by_block_number.len(),
    );

    cache_man.collect_garbage(current_size, |ids| {
        // Evict every id handed over by the cache manager from the cache
        // it belongs to.
        for id in &ids {
            match id {
                CacheId::Block(h) => {
                    blocks.remove(h);
                }
                CacheId::BlockHeader(h) => {
                    block_headers.remove(h);
                }
                CacheId::CompactBlock(h) => {
                    compact_blocks.remove(h);
                }
                CacheId::BlockReceipts(h) => {
                    executed_results.remove(h);
                }
                CacheId::BlockRewards(h) => {
                    reward_results.remove(h);
                }
                CacheId::BlockTraces(h) => {
                    block_traces.remove(h);
                }
                CacheId::TransactionAddress(h) => {
                    tx_indices.remove(h);
                }
                CacheId::LocalBlockInfo(h) => {
                    local_block_info.remove(h);
                }
                CacheId::BlamedHeaderVerifiedRoots(h) => {
                    blamed_header_verified_roots.remove(h);
                }
                &CacheId::HashByBlockNumber(block_number) => {
                    hash_by_block_number.remove(&block_number);
                }
            }
        }

        // Report the size after eviction back to the cache manager.
        // NOTE(review): this sum leaves out `hash_by_block_number` (which
        // `cache_size()` does measure) and `blamed_header_verified_roots` —
        // confirm the omission is intentional.
        let malloc_ops = &mut new_malloc_size_ops();
        block_headers.size_of(malloc_ops)
            + blocks.size_of(malloc_ops)
            + executed_results.size_of(malloc_ops)
            + reward_results.size_of(malloc_ops)
            + block_traces.size_of(malloc_ops)
            + tx_indices.size_of(malloc_ops)
            + compact_blocks.size_of(malloc_ops)
            + local_block_info.size_of(malloc_ops)
    });

    // NOTE(review): `blamed_header_verified_roots` and
    // `hash_by_block_number` are not shrunk here — confirm intentional.
    block_headers.shrink_to_fit();
    blocks.shrink_to_fit();
    executed_results.shrink_to_fit();
    reward_results.shrink_to_fit();
    block_traces.shrink_to_fit();
    tx_indices.shrink_to_fit();
    compact_blocks.shrink_to_fit();
    local_block_info.shrink_to_fit();
}
1551
1552 pub fn cache_gc(&self) { self.block_cache_gc(); }
1553
1554 pub fn set_cur_consensus_era_genesis_hash(
1555 &self, cur_era_hash: &H256, next_era_hash: &H256,
1556 ) {
1557 self.db_manager
1558 .insert_checkpoint_hashes_to_db(cur_era_hash, next_era_hash);
1559
1560 let mut era_hash = self.cur_consensus_era_genesis_hash.write();
1561 let mut stable_hash = self.cur_consensus_era_stable_hash.write();
1562 *era_hash = cur_era_hash.clone();
1563 *stable_hash = next_era_hash.clone();
1564 }
1565
1566 pub fn get_cur_consensus_era_genesis_hash(&self) -> H256 {
1567 self.cur_consensus_era_genesis_hash.read().clone()
1568 }
1569
1570 pub fn get_cur_consensus_era_stable_hash(&self) -> H256 {
1571 self.cur_consensus_era_stable_hash.read().clone()
1572 }
1573
1574 pub fn recover_unsigned_tx(
1575 &self, transactions: &Vec<TransactionWithSignature>,
1576 ) -> Result<Vec<Arc<SignedTransaction>>, DecoderError> {
1577 let _timer = MeterTimer::time_func(TX_POOL_RECOVER_TIMER.as_ref());
1578 self.tx_data_manager
1580 .recover_unsigned_tx_with_order(transactions)
1581 }
1582
1583 pub fn recover_block(&self, block: &mut Block) -> Result<(), DecoderError> {
1584 self.tx_data_manager.recover_block(block)
1585 }
1586
1587 pub fn recover_unsigned_tx_with_order(
1588 &self, transactions: &Vec<TransactionWithSignature>,
1589 ) -> Result<Vec<Arc<SignedTransaction>>, DecoderError> {
1590 self.tx_data_manager
1591 .recover_unsigned_tx_with_order(transactions)
1592 }
1593
1594 pub fn find_missing_tx_indices_encoded(
1595 &self, compact_block: &mut CompactBlock,
1596 ) -> Vec<usize> {
1597 self.tx_data_manager
1598 .find_missing_tx_indices_encoded(compact_block)
1599 }
1600
1601 pub fn get_state_readonly_index(
1603 &self, block_hash: &EpochId,
1604 ) -> Option<StateIndex> {
1605 let maybe_commitment =
1606 self.get_epoch_execution_commitment_with_db(block_hash);
1607 let maybe_state_index = match maybe_commitment {
1608 None => None,
1609 Some(execution_commitment) => Some(StateIndex::new_for_readonly(
1610 block_hash,
1611 &execution_commitment.state_root_with_aux_info,
1612 )),
1613 };
1614 maybe_state_index
1615 }
1616
1617 pub fn get_parent_epochs_for(
1619 &self, mut block: EpochId, mut count: u64,
1620 ) -> (EpochId, Vec<EpochId>) {
1621 let mut epochs_reverse_order = vec![];
1622 while count > 0 {
1623 debug!("getting parent for block {:?}", block);
1624 epochs_reverse_order.push(block);
1625 block = *self.block_header_by_hash(&block).unwrap().parent_hash();
1626 if block == NULL_EPOCH
1627 || block == *self.cur_consensus_era_genesis_hash.read()
1628 {
1629 break;
1630 }
1631 count -= 1;
1632 }
1633
1634 debug!("get_parent_epochs stopped at block {:?}", block);
1635 epochs_reverse_order.reverse();
1636 (block, epochs_reverse_order)
1637 }
1638
1639 pub fn get_snapshot_epoch_count(&self) -> u32 {
1640 self.storage_manager
1641 .get_storage_manager()
1642 .get_snapshot_epoch_count()
1643 }
1644
1645 pub fn get_snapshot_blame_plus_depth(&self) -> usize {
1646 self.storage_manager
1649 .get_storage_manager()
1650 .get_snapshot_epoch_count() as usize
1651 + 1
1652 }
1653
1654 pub fn get_executed_state_root(&self, block_hash: &H256) -> Option<H256> {
1655 let maybe_commitment =
1656 self.get_epoch_execution_commitment(block_hash).take();
1657 if let Some(commitment) = maybe_commitment {
1658 Some(commitment.state_root_with_aux_info.aux_info.state_root_hash)
1659 } else {
1660 None
1661 }
1662 }
1663
1664 pub fn earliest_epoch_with_block_body(&self) -> u64 {
1665 match self.config.additional_maintained_block_body_epoch_count {
1666 Some(defer) => self.gc_progress.lock().gc_end - defer as u64,
1667 None => 0,
1668 }
1669 }
1670
1671 pub fn earliest_epoch_with_execution_result(&self) -> u64 {
1672 match self
1673 .config
1674 .additional_maintained_execution_result_epoch_count
1675 {
1676 Some(defer) => self.gc_progress.lock().gc_end - defer as u64,
1677 None => 0,
1678 }
1679 }
1680
1681 pub fn earliest_epoch_with_trace(&self) -> u64 {
1682 match self.config.additional_maintained_trace_epoch_count {
1683 Some(defer) => self.gc_progress.lock().gc_end - defer as u64,
1684 None => 0,
1685 }
1686 }
1687
1688 pub fn new_checkpoint(
1689 &self, new_checkpoint_height: u64, best_epoch_number: u64,
1690 ) {
1691 let mut gc_progress = self.gc_progress.lock();
1692 gc_progress.gc_end = new_checkpoint_height;
1693 gc_progress.last_consensus_best_epoch = best_epoch_number;
1694 gc_progress.expected_end_consensus_best_epoch = best_epoch_number
1695 + self.config.checkpoint_gc_time_in_epoch_count as u64;
1696 }
1697
1698 pub fn database_gc(&self, best_epoch: u64) {
1699 let maybe_range = self.gc_progress.lock().get_gc_base_range(best_epoch);
1700 debug!("Start database GC, range={:?}", maybe_range);
1701 if let Some((start, end)) = maybe_range {
1702 for base_epoch in start..end {
1703 self.gc_base_epoch(base_epoch);
1704 }
1705 let mut gc_progress = self.gc_progress.lock();
1706 gc_progress.last_consensus_best_epoch = best_epoch;
1707 gc_progress.next_to_process = end;
1708 self.db_manager.insert_gc_progress_to_db(end);
1709 debug!("Database GC progress: {:?}", gc_progress);
1710 }
1711 }
1712
1713 fn gc_base_epoch(&self, base_epoch: u64) {
1717 let gc_tx_index = || {
1721 let defer_epochs = match self
1722 .config
1723 .additional_maintained_transaction_index_epoch_count
1724 {
1725 Some(x) => x,
1726 None => return,
1727 };
1728 if base_epoch <= defer_epochs as u64 {
1729 return;
1730 }
1731 let epoch_to_remove = base_epoch - defer_epochs as u64;
1732 let epoch_blocks =
1733 match self.all_epoch_set_hashes_from_db(epoch_to_remove) {
1734 None => {
1735 warn!(
1736 "GC epoch set is missing! epoch_to_remove: {}",
1737 epoch_to_remove
1738 );
1739 return;
1740 }
1741 Some(epoch_blocks) => epoch_blocks,
1742 };
1743 let mut transaction_set = HashSet::new();
1746 for transactions in epoch_blocks
1747 .iter()
1748 .filter_map(|b| self.db_manager.block_body_from_db(&b))
1749 {
1750 transaction_set.extend(transactions.iter().map(|tx| tx.hash()));
1751 }
1752 let epoch_block_set: HashSet<H256> =
1753 epoch_blocks.into_iter().collect();
1754
1755 let should_remove = |tx_hash| {
1756 if self.config.strict_tx_index_gc {
1757 if let Some(tx_index) =
1760 self.db_manager.transaction_index_from_db(&tx_hash)
1761 {
1762 epoch_block_set.contains(&tx_index.block_hash)
1763 } else {
1764 false
1765 }
1766 } else {
1767 true
1768 }
1769 };
1770 for tx in transaction_set {
1771 if should_remove(tx) {
1772 self.db_manager.remove_transaction_index_from_db(&tx);
1773 }
1774 }
1775 };
1776
1777 gc_tx_index();
1778
1779 self.gc_epoch_with_defer(
1780 base_epoch,
1781 self.config.additional_maintained_block_body_epoch_count,
1782 |h| self.remove_block_body(h, true ),
1783 );
1784 self.gc_epoch_with_defer(
1785 base_epoch,
1786 self.config
1787 .additional_maintained_execution_result_epoch_count,
1788 |h| self.remove_block_result(h, true ),
1789 );
1790 self.gc_epoch_with_defer(
1791 base_epoch,
1792 self.config.additional_maintained_reward_epoch_count,
1793 |h| self.db_manager.remove_block_reward_result_from_db(h),
1794 );
1795 self.gc_epoch_with_defer(
1796 base_epoch,
1797 self.config.additional_maintained_trace_epoch_count,
1798 |h| self.db_manager.remove_block_trace_from_db(h),
1799 );
1800 }
1801
1802 fn gc_epoch_with_defer<F>(
1803 &self, epoch_number: u64, maybe_defer_epochs: Option<usize>, gc_func: F,
1804 ) where F: Fn(&H256) -> () {
1805 if let Some(defer_epochs) = maybe_defer_epochs {
1806 if epoch_number > defer_epochs as u64 {
1807 let epoch_to_remove = epoch_number - defer_epochs as u64;
1808 match self.all_epoch_set_hashes_from_db(epoch_to_remove) {
1809 None => warn!(
1810 "GC epoch set is missing! epoch_to_remove: {}",
1811 epoch_to_remove
1812 ),
1813 Some(epoch_set) => {
1814 for b in epoch_set {
1815 gc_func(&b);
1816 }
1817 }
1818 }
1819 }
1820 }
1821 }
1822}
1823
/// Kind of database selected through `DataManagerConfiguration::db_type`.
#[derive(Copy, Clone)]
pub enum DbType {
    /// Presumably a RocksDB-backed store — confirm where `db_type` is
    /// consumed.
    Rocksdb,
    /// Presumably a SQLite-backed store — confirm where `db_type` is
    /// consumed.
    Sqlite,
}
1829
/// Configuration of the block data manager.
///
/// The `additional_maintained_*_epoch_count` options are retention windows
/// for database GC: `Some(n)` lets GC remove the corresponding data of
/// epoch `base_epoch - n` once `base_epoch` exceeds `n`; `None` disables GC
/// for that kind of data.
pub struct DataManagerConfiguration {
    // NOTE(review): presumably whether transaction indices are written to
    // the database — not used in this part of the file; confirm.
    pub persist_tx_index: bool,
    // NOTE(review): presumably whether the block-number index is written to
    // the database — not used in this part of the file; confirm.
    pub persist_block_number_index: bool,
    // NOTE(review): presumably how long cached transaction indices are kept
    // — not used in this part of the file; confirm.
    pub tx_cache_index_maintain_timeout: Duration,
    /// Which kind of database to use.
    pub db_type: DbType,
    /// Retention window for block bodies.
    pub additional_maintained_block_body_epoch_count: Option<usize>,
    /// Retention window for block execution results.
    pub additional_maintained_execution_result_epoch_count: Option<usize>,
    /// Retention window for block reward results.
    pub additional_maintained_reward_epoch_count: Option<usize>,
    /// Retention window for block traces.
    pub additional_maintained_trace_epoch_count: Option<usize>,
    /// Retention window for transaction indices.
    pub additional_maintained_transaction_index_epoch_count: Option<usize>,
    /// Number of epochs added to the best epoch number at a new checkpoint
    /// to obtain the expected end of the GC round.
    pub checkpoint_gc_time_in_epoch_count: usize,
    /// When `true`, GC only removes a transaction index if the stored index
    /// still points at a block of the epoch being collected; when `false`,
    /// the index of every transaction in that epoch is removed.
    pub strict_tx_index_gc: bool,
}
1843
impl MallocSizeOf for DataManagerConfiguration {
    /// Always reports 0 bytes; the `_ops` argument is not used.
    fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { 0 }
}
1847
1848impl DataManagerConfiguration {
1849 pub fn new(
1850 persist_tx_index: bool, persist_block_number_index: bool,
1851 tx_cache_index_maintain_timeout: Duration, db_type: DbType,
1852 ) -> Self {
1853 Self {
1854 persist_tx_index,
1855 persist_block_number_index,
1856 tx_cache_index_maintain_timeout,
1857 db_type,
1858 additional_maintained_block_body_epoch_count: None,
1859 additional_maintained_execution_result_epoch_count: None,
1860 additional_maintained_reward_epoch_count: None,
1861 additional_maintained_trace_epoch_count: None,
1862 additional_maintained_transaction_index_epoch_count: None,
1863 checkpoint_gc_time_in_epoch_count: 1,
1864 strict_tx_index_gc: true,
1865 }
1866 }
1867}