// cfxcore/block_data_manager/mod.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    cache_config::CacheConfig,
7    cache_manager::{CacheId, CacheManager, CacheSize},
8    consensus::consensus_inner::consensus_executor::RewardExecutionInfo,
9    pow::{PowComputer, TargetDifficultyManager},
10};
11use cfx_executor::internal_contract::make_staking_events;
12use cfx_storage::{
13    state_manager::StateIndex, utils::guarded_value::*, StorageManager,
14    StorageManagerTrait,
15};
16use cfx_types::{Bloom, Space, H256};
17pub use cfxcore_types::block_data_manager::block_data_types;
18use db::SystemDB;
19use malloc_size_of::{new_malloc_size_ops, MallocSizeOf, MallocSizeOfOps};
20use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
21use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard};
22use primitives::{
23    block::CompactBlock,
24    receipt::{BlockReceipts, TransactionStatus},
25    Block, BlockHeader, EpochId, Receipt, SignedTransaction, TransactionIndex,
26    TransactionWithSignature, NULL_EPOCH,
27};
28use rlp::DecoderError;
29use std::{
30    collections::{HashMap, HashSet},
31    sync::Arc,
32};
33use threadpool::ThreadPool;
34pub mod db_gc_manager;
35pub mod db_manager;
36pub mod tx_data_manager;
37use crate::{
38    block_data_manager::{
39        db_manager::DBManager, tx_data_manager::TransactionDataManager,
40    },
41    consensus::pos_handler::PosVerifier,
42};
43pub use block_data_types::*;
44use cfx_execute_helper::{
45    exec_tracer::{BlockExecTraces, TransactionExecTraces},
46    phantom_tx::build_bloom_and_recover_phantom,
47};
48use cfx_internal_common::{
49    EpochExecutionCommitment, StateAvailabilityBoundary, StateRootWithAuxInfo,
50};
51use db_gc_manager::GCProgress;
52use metrics::{register_meter_with_group, Meter, MeterTimer};
53use primitives::pos::PosBlockId;
54use std::{hash::Hash, path::Path, time::Duration};
55
lazy_static! {
    // Meter timing how long public-key recovery takes when transactions are
    // reloaded into the tx pool (registered under the "timer" metrics group).
    static ref TX_POOL_RECOVER_TIMER: Arc<dyn Meter> =
        register_meter_with_group("timer", "tx_pool::recover_public");
}
60
61pub const NULLU64: u64 = !0;
62
/// A bounded set of block hashes known to be invalid.
/// Once `capacity` is reached, inserting a new hash evicts an arbitrary
/// existing entry (see `InvalidBlockSet::insert`).
#[derive(DeriveMallocSizeOf)]
pub struct InvalidBlockSet {
    // Maximum number of hashes retained.
    capacity: usize,
    // Hashes of blocks marked invalid.
    invalid_block_hashes: HashSet<H256>,
}
68
69impl InvalidBlockSet {
70    pub fn new(capacity: usize) -> Self {
71        InvalidBlockSet {
72            capacity,
73            invalid_block_hashes: HashSet::new(),
74        }
75    }
76
77    pub fn insert(&mut self, value: H256) {
78        if !self.invalid_block_hashes.contains(&value) {
79            if self.invalid_block_hashes.len() < self.capacity {
80                self.invalid_block_hashes.insert(value);
81                return;
82            }
83
84            let mut iter = self.invalid_block_hashes.iter();
85            let the_evicted = iter.next().map(|e| e.clone());
86            if let Some(evicted) = the_evicted {
87                self.invalid_block_hashes.remove(&evicted);
88            }
89            self.invalid_block_hashes.insert(value);
90        }
91    }
92
93    pub fn contains(&self, value: &H256) -> bool {
94        self.invalid_block_hashes.contains(value)
95    }
96}
97
/// Central manager for block-related data: headers, bodies, receipts,
/// traces, indices and execution commitments. The in-memory maps act as
/// caches in front of `db_manager`; `cache_man` tracks usage for eviction.
#[derive(DeriveMallocSizeOf)]
pub struct BlockDataManager {
    // Cached block headers, keyed by block hash.
    block_headers: RwLock<HashMap<H256, Arc<BlockHeader>>>,
    // Cached block bodies, keyed by block hash.
    blocks: RwLock<HashMap<H256, Arc<Block>>>,
    // Cached compact blocks, keyed by block hash.
    compact_blocks: RwLock<HashMap<H256, CompactBlock>>,
    // Execution receipts per block, versioned by the assumed pivot hash.
    block_receipts: RwLock<HashMap<H256, BlockReceiptsInfo>>,
    // Reward results per block, versioned by the assumed pivot hash.
    block_rewards: RwLock<HashMap<H256, BlockRewardsInfo>>,
    // Execution traces per block, versioned by the assumed pivot hash.
    block_traces: RwLock<HashMap<H256, BlockTracesInfo>>,
    // Transaction hash -> position of the transaction within its block.
    transaction_indices: RwLock<HashMap<H256, TransactionIndex>>,
    // Block number -> block hash index.
    hash_by_block_number: RwLock<HashMap<u64, H256>>,
    // Node-local bookkeeping (status, instance id) per block.
    local_block_info: RwLock<HashMap<H256, LocalBlockInfo>>,
    // Verified roots recovered from blame information, keyed by height.
    blamed_header_verified_roots:
        RwLock<HashMap<u64, BlamedHeaderVerifiedRoots>>,
    /// Caching for receipts_root and logs_bloom for epochs after
    /// cur_era_genesis. It is not deferred, i.e., indexed by the hash of
    /// the pivot block that produces the result when executed.
    /// It is also used for checking whether an epoch has been executed.
    /// It can be updated, i.e., adding new items, in the following cases:
    /// 1) When a new epoch gets executed in normal execution;
    /// 2) After syncing snapshot, we need to update execution commitment for
    ///    pivot blocks around snapshot block based on blaming information;
    /// 3) After recovering block graph from db, update execution commitment
    ///    from db;
    /// 4) In BlockDataManager::new(), update execution commitment of
    ///    true_genesis_block.
    epoch_execution_commitments:
        RwLock<HashMap<H256, EpochExecutionCommitment>>,
    // Execution context (start block number) per epoch, keyed by pivot hash.
    epoch_execution_contexts: RwLock<HashMap<H256, EpochExecutionContext>>,

    // Bounded set of hashes of blocks known to be invalid.
    invalid_block_set: RwLock<InvalidBlockSet>,
    // Era-genesis (checkpoint) hash of the current consensus era.
    cur_consensus_era_genesis_hash: RwLock<H256>,
    // Stable block hash of the current consensus era.
    cur_consensus_era_stable_hash: RwLock<H256>,
    // Identifies this node run; bumped across restarts/sync phases
    // (see `initialize_instance_id`).
    instance_id: Mutex<u64>,

    config: DataManagerConfiguration,

    tx_data_manager: TransactionDataManager,
    pub db_manager: DBManager,

    // TODO Add MallocSizeOf.
    #[ignore_malloc_size_of = "Add later"]
    pub pow: Arc<PowComputer>,

    /// This is the original genesis block.
    pub true_genesis: Arc<Block>,
    pub storage_manager: Arc<StorageManager>,
    // Tracks cache usage of the maps above and drives eviction.
    cache_man: Arc<Mutex<CacheManager<CacheId>>>,
    pub target_difficulty_manager: TargetDifficultyManager,
    // Progress marker for database garbage collection, restored at startup.
    gc_progress: Arc<Mutex<GCProgress>>,

    /// This maintains the boundary height of available state and commitments
    /// (executed but not deleted or in `ExecutionTaskQueue`).
    /// The upper bound always equal to latest executed epoch height.
    /// As for the lower bound:
    ///   1. For archive node, it always equals `cur_era_stable_height`.
    ///   2. For full node, it equals the height of remotely synchronized state
    ///      at start, and equals `cur_era_stable_height` after making a new
    ///      checkpoint.
    ///
    /// The lower boundary height will be updated when:
    ///   1. New checkpoint
    ///   2. Full Node snapshot syncing
    ///   3. New Snapshot
    ///
    /// The upper boundary height will be updated when:
    ///   1. Pivot chain switch
    ///   2. Execution of new epoch
    ///
    /// The state of an epoch is valid if and only if the height of the epoch
    /// is inside the boundary.
    pub state_availability_boundary: RwLock<StateAvailabilityBoundary>,
}
170
171impl BlockDataManager {
    /// Construct a `BlockDataManager` and restore persisted state from db.
    ///
    /// Steps:
    /// 1. Build the ledger cache manager sized from `cache_conf`.
    /// 2. Open the configured database backend (rocksdb or sqlite).
    /// 3. Restore GC progress, instance id and checkpoint hashes.
    /// 4. If starting from the true genesis, insert its block body,
    ///    transaction indices, execution context and execution commitment;
    ///    otherwise recover the era-genesis execution context from db and
    ///    refresh its local block info with the new instance id.
    pub fn new(
        cache_conf: CacheConfig, true_genesis: Arc<Block>, db: Arc<SystemDB>,
        storage_manager: Arc<StorageManager>,
        worker_pool: Arc<Mutex<ThreadPool>>, config: DataManagerConfiguration,
        pow: Arc<PowComputer>,
    ) -> Self {
        let mb = 1024 * 1024;
        let max_cache_size = cache_conf.ledger_mb() * mb;
        // Start evicting at 3/4 of the hard cache limit.
        let pref_cache_size = max_cache_size * 3 / 4;
        let cache_man = Arc::new(Mutex::new(CacheManager::new(
            pref_cache_size,
            max_cache_size,
            3 * mb,
        )));
        let tx_data_manager = TransactionDataManager::new(
            config.tx_cache_index_maintain_timeout,
            worker_pool,
        );
        let db_manager = match config.db_type {
            DbType::Rocksdb => DBManager::new_from_rocksdb(db, pow.clone()),
            DbType::Sqlite => DBManager::new_from_sqlite(
                Path::new("./sqlite_db"),
                pow.clone(),
            ),
        };
        // Resume GC where the previous run left off (0 if never persisted).
        let previous_db_progress =
            db_manager.gc_progress_from_db().unwrap_or(0);

        let data_man = Self {
            block_headers: RwLock::new(HashMap::new()),
            blocks: RwLock::new(HashMap::new()),
            compact_blocks: Default::default(),
            block_receipts: Default::default(),
            block_rewards: Default::default(),
            block_traces: Default::default(),
            transaction_indices: Default::default(),
            hash_by_block_number: Default::default(),
            local_block_info: Default::default(),
            blamed_header_verified_roots: Default::default(),
            epoch_execution_commitments: Default::default(),
            epoch_execution_contexts: Default::default(),
            invalid_block_set: RwLock::new(InvalidBlockSet::new(
                cache_conf.invalid_block_hashes_cache_size_in_count,
            )),
            true_genesis: true_genesis.clone(),
            storage_manager,
            cache_man,
            instance_id: Mutex::new(0),
            config,
            target_difficulty_manager: TargetDifficultyManager::new(
                cache_conf.target_difficulties_cache_size_in_count,
            ),
            // Era hashes default to the true genesis; overwritten below if a
            // checkpoint was persisted.
            cur_consensus_era_genesis_hash: RwLock::new(true_genesis.hash()),
            cur_consensus_era_stable_hash: RwLock::new(true_genesis.hash()),
            tx_data_manager,
            db_manager,
            pow,
            state_availability_boundary: RwLock::new(
                StateAvailabilityBoundary::new(
                    true_genesis.hash(),
                    0,
                    None,
                    None,
                ),
            ),
            gc_progress: Arc::new(Mutex::new(GCProgress::new(
                previous_db_progress,
            ))),
        };

        data_man.initialize_instance_id();

        // Restore the current era's checkpoint/stable hashes, if persisted.
        let cur_era_genesis_hash =
            match data_man.db_manager.checkpoint_hashes_from_db() {
                None => true_genesis.hash(),
                Some((checkpoint_hash, stable_hash)) => {
                    *data_man.cur_consensus_era_genesis_hash.write() =
                        checkpoint_hash;
                    *data_man.cur_consensus_era_stable_hash.write() =
                        stable_hash;
                    checkpoint_hash
                }
            };

        debug!(
            "BlockDataManager::new() cur_era_genesis_hash: {:?}",
            &cur_era_genesis_hash
        );

        if cur_era_genesis_hash == data_man.true_genesis.hash() {
            // Only insert block body for true genesis
            data_man.insert_block(
                data_man.true_genesis.clone(),
                true, /* persistent */
            );
            for (index, tx) in
                data_man.true_genesis.transactions.iter().enumerate()
            {
                data_man.insert_transaction_index(
                    &tx.hash,
                    &TransactionIndex {
                        block_hash: cur_era_genesis_hash,
                        real_index: index,
                        is_phantom: false,
                        // FIXME(thegaram): do we allow EVM txs in genesis?
                        rpc_index: Some(index),
                    },
                );
            }
            // Initialize ExecutionContext for true genesis
            data_man.insert_epoch_execution_context(
                cur_era_genesis_hash,
                EpochExecutionContext {
                    start_block_number: 0,
                },
                true,
            );
            // persist local_block_info for true genesis
            data_man.db_manager.insert_local_block_info_to_db(
                &data_man.true_genesis.hash(),
                &LocalBlockInfo::new(
                    BlockStatus::Valid,
                    0,
                    data_man.get_instance_id(),
                ),
            );
            data_man.insert_epoch_execution_commitment(
                data_man.true_genesis.hash(),
                data_man.true_genesis_state_root(),
                *data_man.true_genesis.block_header.deferred_receipts_root(),
                *data_man
                    .true_genesis
                    .block_header
                    .deferred_logs_bloom_hash(),
            );
        } else {
            // Recover ExecutionContext for cur_era_genesis from db
            data_man.insert_epoch_execution_context(
                cur_era_genesis_hash,
                data_man
                    .get_epoch_execution_context(&cur_era_genesis_hash)
                    .expect("ExecutionContext exists for cur_era_genesis"),
                false, /* Not persistent because it's already in db */
            );
            // for other era genesis, we need to change the instance_id
            if let Some(mut local_block_info) = data_man
                .db_manager
                .local_block_info_from_db(&cur_era_genesis_hash)
            {
                local_block_info.instance_id = data_man.get_instance_id();
                data_man.db_manager.insert_local_block_info_to_db(
                    &cur_era_genesis_hash,
                    &local_block_info,
                );
            }
            // The commitments of cur_era_genesis will be recovered in
            // `construct_pivot_state` with other epochs
        }

        data_man
    }
333
334    pub fn get_instance_id(&self) -> u64 { *self.instance_id.lock() }
335
    /// Load (or bump) the instance id and persist the new value to db.
    ///
    /// The instance id distinguishes data written by different runs of the
    /// node, so `LocalBlockInfo` entries from a previous run can be told
    /// apart from those of the current run.
    pub fn initialize_instance_id(&self) {
        let mut my_instance_id = self.instance_id.lock();
        if *my_instance_id == 0 {
            // load last instance id
            let instance_id = self.db_manager.instance_id_from_db();

            // set new instance id
            if let Some(instance_id) = instance_id {
                *my_instance_id = instance_id + 1;
            }
        } else {
            // This case will only happen when full node begins to sync block
            // bodies. And we should change the instance_id of genesis block to
            // current one.
            *my_instance_id += 1;
            if let Some(mut local_block_info) =
                self.db_manager.local_block_info_from_db(
                    &self.get_cur_consensus_era_genesis_hash(),
                )
            {
                local_block_info.instance_id = *my_instance_id;
                self.db_manager.insert_local_block_info_to_db(
                    &self.get_cur_consensus_era_genesis_hash(),
                    &local_block_info,
                );
            }
        }

        // persist new instance id
        self.db_manager.insert_instance_id_to_db(*my_instance_id);
    }
367
368    /// This will return the state root of true genesis block.
369    pub fn true_genesis_state_root(&self) -> StateRootWithAuxInfo {
370        let true_genesis_hash = self.true_genesis.hash();
371        self.storage_manager
372            .get_state_no_commit(
373                StateIndex::new_for_readonly(
374                    &true_genesis_hash,
375                    &StateRootWithAuxInfo::genesis(&true_genesis_hash),
376                ),
377                /* try_open = */ false,
378                None,
379            )
380            .unwrap()
381            .unwrap()
382            .get_state_root()
383            .unwrap()
384    }
385
386    pub fn transaction_by_hash(
387        &self, hash: &H256,
388    ) -> Option<Arc<SignedTransaction>> {
389        let tx_index = self
390            .transaction_index_by_hash(hash, false /* update_cache */)?;
391        let block = self.block_by_hash(
392            &tx_index.block_hash,
393            false, /* update_cache */
394        )?;
395        assert!(tx_index.real_index < block.transactions.len());
396        Some(block.transactions[tx_index.real_index].clone())
397    }
398
399    /// insert block body in memory cache and db
400    pub fn insert_block_body(
401        &self, hash: H256, block: Arc<Block>, persistent: bool,
402    ) {
403        if persistent {
404            self.db_manager.insert_block_body_to_db(block.as_ref());
405        }
406        self.cache_man.lock().note_used(CacheId::Block(hash));
407        self.blocks.write().insert(hash, block);
408    }
409
410    /// remove block body in memory cache and db
411    pub fn remove_block_body(&self, hash: &H256, remove_db: bool) {
412        if remove_db {
413            self.db_manager.remove_block_body_from_db(hash);
414        }
415        self.blocks.write().remove(hash);
416    }
417
418    /// TODO Also set block header
419    pub fn block_by_hash(
420        &self, hash: &H256, update_cache: bool,
421    ) -> Option<Arc<Block>> {
422        self.get(
423            hash,
424            &self.blocks,
425            |key| self.db_manager.block_from_db(key).map(Arc::new),
426            if update_cache {
427                Some(CacheId::Block(*hash))
428            } else {
429                None
430            },
431        )
432    }
433
434    /// This function returns the block from db without wrapping it in `Arc`.
435    pub fn block_from_db(&self, hash: &H256) -> Option<Block> {
436        self.db_manager.block_from_db(hash)
437    }
438
439    pub fn blocks_by_hash_list(
440        &self, hashes: &Vec<H256>, update_cache: bool,
441    ) -> Option<Vec<Arc<Block>>> {
442        let mut blocks = Vec::new();
443        for h in hashes {
444            blocks.push(self.block_by_hash(h, update_cache)?);
445        }
446        Some(blocks)
447    }
448
449    /// insert block/header into memory cache, block/header into db
450    pub fn insert_block(&self, block: Arc<Block>, persistent: bool) {
451        let hash = block.hash();
452        self.insert_block_header(
453            hash,
454            Arc::new(block.block_header.clone()),
455            persistent,
456        );
457        self.insert_block_body(hash, block, persistent);
458    }
459
460    /// Remove block body and block header in memory cache and db.
461    /// This is used to delete invalid blocks or dangling blocks never connected
462    /// to the pivot chain.
463    pub fn remove_useless_block(&self, hash: &H256, remove_db: bool) {
464        // If a block has entered consensus before, it is a part of the
465        // blockchain and we should not remove it here.
466        if self
467            .local_block_info_by_hash(hash)
468            .map(|info| info.get_status() == BlockStatus::Invalid)
469            .unwrap_or(true)
470        {
471            self.remove_block_header(hash, remove_db);
472            self.remove_block_body(hash, remove_db);
473        }
474    }
475
476    /// Get the traces for a single block without checking the assumed pivot
477    /// block
478    /// Return `BlockTracesWithEpoch`.
479    pub fn block_traces_by_hash(
480        &self, hash: &H256,
481    ) -> Option<BlockTracesWithEpoch> {
482        let maybe_traces_in_mem = self
483            .block_traces
484            .read()
485            .get(hash)
486            .and_then(|traces_info| traces_info.get_current_data());
487        // Make sure the ReadLock of `block_traces` is dropped here.
488        let maybe_traces = maybe_traces_in_mem.or_else(|| {
489            self.db_manager.block_traces_from_db(hash).map(
490                |traces_with_epoch| {
491                    self.block_traces
492                        .write()
493                        .entry(*hash)
494                        .or_insert(BlockTracesInfo::default())
495                        .insert_data(
496                            &traces_with_epoch.0,
497                            traces_with_epoch.1.clone(),
498                        );
499                    traces_with_epoch
500                },
501            )
502        });
503        if maybe_traces.is_some() {
504            self.cache_man.lock().note_used(CacheId::BlockTraces(*hash));
505        }
506        maybe_traces
507    }
508
509    /// Return `(pivot_hash, tx_traces)`.
510    pub fn block_tx_traces_by_hash(
511        &self, hash: &H256,
512    ) -> Option<(H256, Vec<TransactionExecTraces>)> {
513        self.block_traces_by_hash(hash).map(
514            |DataVersionTuple(pivot_hash, block_trace)| {
515                (pivot_hash, block_trace.into())
516            },
517        )
518    }
519
520    pub fn block_traces_by_hash_with_epoch(
521        &self, hash: &H256, assumed_epoch: &H256,
522        update_pivot_assumption: bool, update_cache: bool,
523    ) -> Option<BlockExecTraces> {
524        self.get_version(
525            hash,
526            assumed_epoch,
527            &self.block_traces,
528            update_pivot_assumption,
529            match update_cache {
530                true => Some(CacheId::BlockTraces(*hash)),
531                false => None,
532            },
533            |key| self.db_manager.block_traces_from_db(key),
534            |key, result| {
535                self.db_manager.insert_block_traces_to_db(key, result);
536            },
537        )
538    }
539
540    pub fn insert_block_traces(
541        &self, hash: H256, trace: BlockExecTraces, pivot_hash: H256,
542        persistent: bool,
543    ) {
544        trace! {"insert_block_traces start pivot={:?}", pivot_hash};
545        self.insert_version(
546            hash,
547            &pivot_hash,
548            trace,
549            |key, result| {
550                self.db_manager.insert_block_traces_to_db(key, result);
551            },
552            &self.block_traces,
553            CacheId::BlockTraces(hash),
554            persistent,
555        );
556        trace! {"insert_block_traces ends pivot={:?}", pivot_hash};
557    }
558
559    /// remove block traces in memory cache and db
560    pub fn remove_block_traces(&self, hash: &H256, remove_db: bool) {
561        if remove_db {
562            self.db_manager.remove_block_trace_from_db(hash);
563        }
564        self.block_traces.write().remove(hash);
565    }
566
567    pub fn block_header_by_hash(
568        &self, hash: &H256,
569    ) -> Option<Arc<BlockHeader>> {
570        self.get(
571            hash,
572            &self.block_headers,
573            |key| self.db_manager.block_header_from_db(key).map(Arc::new),
574            Some(CacheId::BlockHeader(*hash)),
575        )
576    }
577
578    pub fn insert_block_header(
579        &self, hash: H256, header: Arc<BlockHeader>, persistent: bool,
580    ) {
581        self.insert(
582            hash,
583            header,
584            &self.block_headers,
585            |_, value| {
586                self.db_manager.insert_block_header_to_db(value.as_ref())
587            },
588            Some(CacheId::BlockHeader(hash)),
589            persistent,
590        )
591    }
592
593    /// remove block header in memory cache and db
594    pub fn remove_block_header(&self, hash: &H256, remove_db: bool) {
595        if remove_db {
596            self.db_manager.remove_block_header_from_db(hash);
597        }
598        self.block_headers.write().remove(hash);
599    }
600
601    pub fn block_height_by_hash(&self, hash: &H256) -> Option<u64> {
602        let result = self.block_header_by_hash(hash)?;
603        Some(result.height())
604    }
605
606    /// Return `None` if the header does not exist.
607    /// Return `Some(None)` if the header exist but it does not have a PoS
608    /// reference field.
609    pub fn pos_reference_by_hash(
610        &self, hash: &H256,
611    ) -> Option<Option<PosBlockId>> {
612        self.block_header_by_hash(hash)
613            .map(|header| header.pos_reference().clone())
614    }
615
616    pub fn compact_block_by_hash(&self, hash: &H256) -> Option<CompactBlock> {
617        self.compact_blocks.read().get(hash).map(|b| {
618            self.cache_man
619                .lock()
620                .note_used(CacheId::CompactBlock(b.hash()));
621            b.clone()
622        })
623    }
624
625    pub fn insert_compact_block(&self, cb: CompactBlock) {
626        let hash = cb.hash();
627        self.compact_blocks.write().insert(hash, cb);
628        self.cache_man.lock().note_used(CacheId::CompactBlock(hash));
629    }
630
631    pub fn contains_compact_block(&self, hash: &H256) -> bool {
632        self.compact_blocks.read().contains_key(hash)
633    }
634
635    /// Return None if receipts for corresponding epoch is not computed before
636    /// or has been overwritten by another new pivot chain in db.
637    /// If `update_pivot_assumption` is true and we have execution results of
638    /// `assumed_epoch` in memory, we will also ensure `assumed_epoch`
639    /// is persisted as the pivot hash in db.
640    ///
641    /// This function will require lock of block_receipts.
642    pub fn block_execution_result_by_hash_with_epoch(
643        &self, hash: &H256, assumed_epoch: &H256,
644        update_pivot_assumption: bool, update_cache: bool,
645    ) -> Option<BlockExecutionResult> {
646        self.get_version(
647            hash,
648            assumed_epoch,
649            &self.block_receipts,
650            update_pivot_assumption,
651            match update_cache {
652                true => Some(CacheId::BlockReceipts(*hash)),
653                false => None,
654            },
655            |key| self.db_manager.block_execution_result_from_db(key),
656            |key, result| {
657                self.db_manager
658                    .insert_block_execution_result_to_db(key, result);
659            },
660        )
661    }
662
663    pub fn block_execution_result_by_hash_from_db(
664        &self, hash: &H256,
665    ) -> Option<BlockExecutionResultWithEpoch> {
666        self.db_manager.block_execution_result_from_db(hash)
667    }
668
669    pub fn block_epoch_number(&self, hash: &H256) -> Option<u64> {
670        if hash == &self.true_genesis.hash() {
671            // True genesis is not executed and does not have an execution
672            // result, so we need to process it specially.
673            return Some(0);
674        }
675        self.block_execution_result_by_hash_from_db(&hash)
676            .map(|execution_result| execution_result.0)
677            .and_then(|pivot| self.block_header_by_hash(&pivot))
678            .map(|header| header.height())
679    }
680
681    pub fn insert_block_execution_result(
682        &self, hash: H256, epoch: H256, block_receipts: Arc<BlockReceipts>,
683        persistent: bool,
684    ) {
685        trace! {"insert_block_traces start pivot={:?}", epoch};
686        let bloom =
687            block_receipts
688                .receipts
689                .iter()
690                .fold(Bloom::zero(), |mut b, r| {
691                    b.accrue_bloom(&r.log_bloom);
692                    b
693                });
694        self.insert_version(
695            hash,
696            &epoch,
697            BlockExecutionResult {
698                block_receipts,
699                bloom,
700            },
701            |key, result| {
702                self.db_manager
703                    .insert_block_execution_result_to_db(key, result);
704            },
705            &self.block_receipts,
706            CacheId::BlockReceipts(hash),
707            persistent,
708        );
709        trace! {"insert_block_traces end pivot={:?}", epoch};
710    }
711
712    pub fn insert_block_reward_result(
713        &self, hash: H256, epoch: &H256, block_reward: BlockRewardResult,
714        persistent: bool,
715    ) {
716        self.insert_version(
717            hash,
718            epoch,
719            block_reward,
720            |key, result| {
721                self.db_manager
722                    .insert_block_reward_result_to_db(key, &result);
723            },
724            &self.block_rewards,
725            CacheId::BlockRewards(hash),
726            persistent,
727        );
728    }
729
730    pub fn block_reward_result_by_hash_with_epoch(
731        &self, hash: &H256, assumed_epoch_later: &H256,
732        update_pivot_assumption: bool, update_cache: bool,
733    ) -> Option<BlockRewardResult> {
734        self.get_version(
735            hash,
736            assumed_epoch_later,
737            &self.block_rewards,
738            update_pivot_assumption,
739            match update_cache {
740                true => Some(CacheId::BlockRewards(*hash)),
741                false => None,
742            },
743            |key| self.db_manager.block_reward_result_from_db(key),
744            |key, result| {
745                self.db_manager
746                    .insert_block_reward_result_to_db(key, result)
747            },
748        )
749    }
750
751    pub fn remove_block_result(&self, hash: &H256, remove_db: bool) {
752        self.block_receipts.write().remove(hash);
753        self.block_rewards.write().remove(hash);
754        if remove_db {
755            self.db_manager.remove_block_execution_result_from_db(hash);
756            self.db_manager.remove_block_reward_result_from_db(hash);
757        }
758    }
759
760    pub fn transaction_index_by_hash(
761        &self, hash: &H256, update_cache: bool,
762    ) -> Option<TransactionIndex> {
763        if self.config.persist_tx_index {
764            self.get(
765                hash,
766                &self.transaction_indices,
767                |key| self.db_manager.transaction_index_from_db(key),
768                if update_cache {
769                    Some(CacheId::TransactionAddress(*hash))
770                } else {
771                    None
772                },
773            )
774        } else {
775            self.transaction_indices.read().get(hash).map(|v| v.clone())
776        }
777    }
778
    /// Record the position of transaction `hash` within its block.
    ///
    /// With `persist_tx_index` on, the index is always written to db, but
    /// the in-memory map is only refreshed for hashes already present
    /// (`and_modify` without an insert arm) — the db is the source of
    /// truth. With it off, the index is held in memory only.
    pub fn insert_transaction_index(
        &self, hash: &H256, tx_index: &TransactionIndex,
    ) {
        if self.config.persist_tx_index {
            // transaction_indices will not be updated if it's not inserted
            // before
            self.transaction_indices
                .write()
                .entry(*hash)
                .and_modify(|v| {
                    *v = tx_index.clone();
                    // Only note usage when the entry is actually cached.
                    self.cache_man
                        .lock()
                        .note_used(CacheId::TransactionAddress(*hash));
                });
            self.db_manager
                .insert_transaction_index_to_db(hash, tx_index);
        } else {
            // If not persisted, we will just hold it temporarily in memory
            self.transaction_indices
                .write()
                .insert(hash.clone(), tx_index.clone());
            self.cache_man
                .lock()
                .note_used(CacheId::TransactionAddress(*hash));
        }
    }
806
807    pub fn hash_by_block_number(
808        &self, block_number: u64, update_cache: bool,
809    ) -> Option<H256> {
810        if self.config.persist_block_number_index {
811            self.get(
812                &block_number,
813                &self.hash_by_block_number,
814                |key| self.db_manager.hash_by_block_number_from_db(key),
815                if update_cache {
816                    Some(CacheId::HashByBlockNumber(block_number))
817                } else {
818                    None
819                },
820            )
821        } else {
822            self.hash_by_block_number
823                .read()
824                .get(&block_number)
825                .map(|v| v.clone())
826        }
827    }
828
    /// Record the block-number -> block-hash mapping.
    ///
    /// With `persist_block_number_index` on, the mapping is always written
    /// to db, but the in-memory map is only refreshed for numbers already
    /// present (`and_modify` without an insert arm) — the db is the source
    /// of truth. With it off, the mapping is held in memory only.
    pub fn insert_hash_by_block_number(
        &self, block_number: u64, block_hash: &H256,
    ) {
        if self.config.persist_block_number_index {
            self.hash_by_block_number
                .write()
                .entry(block_number)
                .and_modify(|v| {
                    *v = block_hash.clone();
                    // Only note usage when the entry is actually cached.
                    self.cache_man
                        .lock()
                        .note_used(CacheId::HashByBlockNumber(block_number));
                });
            self.db_manager
                .insert_hash_by_block_number_to_db(block_number, block_hash);
        } else {
            // If not persisted, we will just hold it temporarily in memory
            self.hash_by_block_number
                .write()
                .insert(block_number, *block_hash);
            self.cache_man
                .lock()
                .note_used(CacheId::HashByBlockNumber(block_number));
        }
    }
854
    /// Store `LocalBlockInfo` for `hash` in memory, persist it to the db,
    /// and register the entry with the cache manager for LRU eviction.
    pub fn insert_local_block_info(&self, hash: &H256, info: LocalBlockInfo) {
        self.insert(
            *hash,
            info,
            &self.local_block_info,
            |key, value| {
                self.db_manager.insert_local_block_info_to_db(key, value)
            },
            Some(CacheId::LocalBlockInfo(*hash)),
            true, /* persistent */
        )
    }
867
    /// Fetch `LocalBlockInfo` for `hash`, checking memory first and falling
    /// back to the db; a db hit is cached back into memory.
    pub fn local_block_info_by_hash(
        &self, hash: &H256,
    ) -> Option<LocalBlockInfo> {
        self.get(
            hash,
            &self.local_block_info,
            |key| self.db_manager.local_block_info_from_db(key),
            Some(CacheId::LocalBlockInfo(*hash)),
        )
    }
878
    /// Store the verified roots for a blamed header at `height`, in memory
    /// and in the db. The counterpart of `verified_blamed_roots_by_height`,
    /// which notes these are maintained on light nodes only.
    pub fn insert_blamed_header_verified_roots(
        &self, height: u64, roots: BlamedHeaderVerifiedRoots,
    ) {
        self.insert(
            height,
            roots,
            &self.blamed_header_verified_roots,
            |key, value| {
                self.db_manager
                    .insert_blamed_header_verified_roots_to_db(*key, value)
            },
            Some(CacheId::BlamedHeaderVerifiedRoots(height)),
            true, /* persistent */
        )
    }
894
    /// Get correct roots of blamed headers from db.
    /// These are maintained on light nodes only.
    ///
    /// Memory is consulted first; a db hit is cached back into memory and
    /// noted with the cache manager.
    pub fn verified_blamed_roots_by_height(
        &self, height: u64,
    ) -> Option<BlamedHeaderVerifiedRoots> {
        self.get(
            &height,
            &self.blamed_header_verified_roots,
            |key| self.db_manager.blamed_header_verified_roots_from_db(*key),
            Some(CacheId::BlamedHeaderVerifiedRoots(height)),
        )
    }
907
    /// Remove the verified roots for `height` from both the in-memory map
    /// and the db.
    pub fn remove_blamed_header_verified_roots(&self, height: u64) {
        self.blamed_header_verified_roots.write().remove(&height);
        self.db_manager
            .remove_blamed_header_verified_roots_from_db(height);
    }
913
914    fn insert<K, V, InsertF>(
915        &self, key: K, value: V, in_mem: &RwLock<HashMap<K, V>>,
916        insert_f: InsertF, maybe_cache_id: Option<CacheId>, persistent: bool,
917    ) where
918        K: Clone + Eq + Hash,
919        InsertF: Fn(&K, &V),
920    {
921        if persistent {
922            insert_f(&key, &value);
923        }
924        in_mem.write().insert(key.clone(), value);
925        if let Some(cache_id) = maybe_cache_id {
926            self.cache_man.lock().note_used(cache_id);
927        }
928    }
929
930    fn get<K, V, LoadF>(
931        &self, key: &K, in_mem: &RwLock<HashMap<K, V>>, load_f: LoadF,
932        maybe_cache_id: Option<CacheId>,
933    ) -> Option<V>
934    where
935        K: Clone + Eq + Hash,
936        V: Clone,
937        LoadF: Fn(&K) -> Option<V>,
938    {
939        if let Some(value) = in_mem.read().get(key) {
940            return Some(value.clone());
941        }
942        load_f(key).map(|value| {
943            if let Some(cache_id) = maybe_cache_id {
944                let mut write = in_mem.write();
945                write.insert(key.clone(), value.clone());
946                self.cache_man.lock().note_used(cache_id);
947            }
948            value
949        })
950    }
951
952    fn insert_version<K, Ver, V, InsertF>(
953        &self, key: K, version: &Ver, value: V, insert_f: InsertF,
954        in_mem: &RwLock<HashMap<K, BlockDataWithMultiVersion<Ver, V>>>,
955        cache_id: CacheId, persistent: bool,
956    ) where
957        K: Eq + Hash,
958        Ver: Clone + Eq + PartialEq + Copy,
959        V: Clone,
960        InsertF: Fn(&K, &DataVersionTuple<Ver, V>),
961    {
962        let result = DataVersionTuple(version.clone(), value);
963        if persistent {
964            insert_f(&key, &result);
965        }
966        in_mem
967            .write()
968            .entry(key)
969            .or_insert(BlockDataWithMultiVersion::default())
970            .insert_current_data(version, result.1);
971        self.cache_man.lock().note_used(cache_id);
972    }
973
    /// Generic helper: fetch the data stored for `key` at exactly `version`.
    ///
    /// The in-memory multi-version entry is consulted first — under the
    /// write lock, because a hit may switch the entry's current version
    /// when `update_current_version` is set (that switch is also persisted
    /// via `insert_f`). On a miss the value is loaded with `load_f`; the db
    /// holds a single version, so a mismatch yields `None`. A db hit is
    /// cached back into memory only when `maybe_cache_id` is provided.
    fn get_version<K, Ver, V, LoadF, InsertF>(
        &self, key: &K, version: &Ver,
        in_mem: &RwLock<HashMap<K, BlockDataWithMultiVersion<Ver, V>>>,
        update_current_version: bool, maybe_cache_id: Option<CacheId>,
        load_f: LoadF, insert_f: InsertF,
    ) -> Option<V>
    where
        K: Eq + Hash + Clone,
        Ver: Eq + Copy + PartialEq + std::fmt::Display,
        V: Clone,
        LoadF: Fn(&K) -> Option<DataVersionTuple<Ver, V>>,
        InsertF: Fn(&K, &DataVersionTuple<Ver, V>),
    {
        if let Some(versions) = in_mem.write().get_mut(key) {
            if let Some((value, is_current_version)) =
                versions.get_data_at_version(version)
            {
                if update_current_version && !is_current_version {
                    versions.set_current_version(*version);
                    // Persist the new current version so db and memory agree.
                    insert_f(key, &DataVersionTuple(*version, value.clone()));
                }
                if let Some(cache_id) = maybe_cache_id {
                    self.cache_man.lock().note_used(cache_id);
                }
                return Some(value);
            }
        }
        // In-memory miss (the write lock above is already released here).
        let DataVersionTuple(version_in_db, res) = load_f(key)?;
        if version_in_db != *version {
            debug!(
                "Version from db {} does not match required {}",
                version_in_db, version
            );
            return None;
        }
        if let Some(cache_id) = maybe_cache_id {
            in_mem
                .write()
                .entry(key.clone())
                .or_insert(BlockDataWithMultiVersion::default())
                .insert_data(version, res.clone());
            self.cache_man.lock().note_used(cache_id);
        }
        Some(res)
    }
1019
    /// Persist the current terminal block hashes to the db.
    pub fn insert_terminals_to_db(&self, terminals: Vec<H256>) {
        self.db_manager.insert_terminals_to_db(&terminals)
    }
1023
    /// Load the persisted terminal block hashes from the db, if any.
    pub fn terminals_from_db(&self) -> Option<Vec<H256>> {
        self.db_manager.terminals_from_db()
    }
1027
    /// Persist the executed block set of `epoch_number` to the db.
    pub fn insert_executed_epoch_set_hashes_to_db(
        &self, epoch_number: u64, epoch_set: &Vec<H256>,
    ) {
        self.db_manager
            .insert_executed_epoch_set_hashes_to_db(epoch_number, epoch_set);
    }
1034
    /// Persist the skipped block set of `epoch_number` to the db.
    pub fn insert_skipped_epoch_set_hashes_to_db(
        &self, epoch_number: u64, skipped_set: &Vec<H256>,
    ) {
        self.db_manager
            .insert_skipped_epoch_set_hashes_to_db(epoch_number, skipped_set);
    }
1041
1042    pub fn executed_epoch_set_hashes_from_db(
1043        &self, epoch_number: u64,
1044    ) -> Option<Vec<H256>> {
1045        if epoch_number != 0 {
1046            self.db_manager
1047                .executed_epoch_set_hashes_from_db(epoch_number)
1048        } else {
1049            Some(vec![self.true_genesis.hash()])
1050        }
1051    }
1052
1053    pub fn skipped_epoch_set_hashes_from_db(
1054        &self, epoch_number: u64,
1055    ) -> Option<Vec<H256>> {
1056        if epoch_number != 0 {
1057            self.db_manager
1058                .skipped_epoch_set_hashes_from_db(epoch_number)
1059        } else {
1060            Some(vec![])
1061        }
1062    }
1063
1064    pub fn all_epoch_set_hashes_from_db(
1065        &self, epoch_number: u64,
1066    ) -> Option<Vec<H256>> {
1067        if epoch_number != 0 {
1068            let mut res = self
1069                .db_manager
1070                .skipped_epoch_set_hashes_from_db(epoch_number)?;
1071            res.append(
1072                &mut self
1073                    .db_manager
1074                    .executed_epoch_set_hashes_from_db(epoch_number)?,
1075            );
1076            Some(res)
1077        } else {
1078            Some(vec![self.true_genesis.hash()])
1079        }
1080    }
1081
1082    /// Return `false` if there is no executed results for given `block_hash`
1083    pub fn receipts_retain_epoch(
1084        &self, block_hash: &H256, epoch: &H256,
1085    ) -> bool {
1086        match self.block_receipts.write().get_mut(block_hash) {
1087            Some(r) => {
1088                r.retain_version(epoch);
1089                true
1090            }
1091            None => false,
1092        }
1093    }
1094
    /// Store the `EpochExecutionContext` for epoch `hash` in memory, writing
    /// it to the db only when `persistent` is set. No cache id is passed:
    /// these entries are garbage collected with checkpoints, not by the LRU
    /// cache manager.
    pub fn insert_epoch_execution_context(
        &self, hash: H256, ctx: EpochExecutionContext, persistent: bool,
    ) {
        self.insert(
            hash,
            ctx,
            &self.epoch_execution_contexts,
            |key, value| {
                self.db_manager.insert_execution_context_to_db(key, value)
            },
            None, /* maybe_cache_id */
            persistent,
        );
    }
1109
    /// The in-memory state will not be updated because it's only garbage
    /// collected explicitly when we make checkpoints.
    ///
    /// Checks memory first, then the db (no cache id ⇒ a db hit is not
    /// written back into memory).
    pub fn get_epoch_execution_context(
        &self, hash: &H256,
    ) -> Option<EpochExecutionContext> {
        self.get(
            hash,
            &self.epoch_execution_contexts,
            |key| self.db_manager.execution_context_from_db(key),
            None, /* maybe_cache_id */
        )
    }
1122
    /// TODO We can avoid persisting execution_commitments for blocks
    /// not on the pivot chain after a checkpoint
    ///
    /// Build an `EpochExecutionCommitment` from the execution outputs and
    /// store it for `block_hash`, in memory and in the db. No cache id is
    /// passed: commitments are garbage collected at checkpoints, not by the
    /// LRU cache manager.
    pub fn insert_epoch_execution_commitment(
        &self, block_hash: H256,
        state_root_with_aux_info: StateRootWithAuxInfo, receipts_root: H256,
        logs_bloom_hash: H256,
    ) {
        let commitment = EpochExecutionCommitment {
            state_root_with_aux_info,
            receipts_root,
            logs_bloom_hash,
        };
        self.insert(
            block_hash,
            commitment,
            &self.epoch_execution_commitments,
            |key, value| {
                self.db_manager
                    .insert_epoch_execution_commitment_to_db(key, value)
            },
            None, /* maybe_cache_id */
            true, /* persistent */
        );
    }
1147
    /// Get in-mem execution commitment.
    ///
    /// Returns a reference into the map guarded by the read lock itself
    /// (via `GuardedValue`), so the caller can inspect the commitment
    /// without cloning; the lock is released when the returned value is
    /// dropped. Does not fall back to the db.
    pub fn get_epoch_execution_commitment(
        &self, block_hash: &H256,
    ) -> GuardedValue<
        RwLockReadGuard<'_, HashMap<H256, EpochExecutionCommitment>>,
        NonCopy<Option<&'_ EpochExecutionCommitment>>,
    > {
        let read_lock = self.epoch_execution_commitments.read();
        let (read_lock, derefed) = GuardedValue::new_derefed(read_lock).into();
        GuardedValue::new(read_lock, NonCopy(derefed.0.get(block_hash)))
    }
1159
1160    /// Load commitment from db.
1161    /// The caller should ensure that the loaded commitment is after
1162    /// cur_era_genesis and can be garbage-collected by checkpoint.
1163    pub fn load_epoch_execution_commitment_from_db(
1164        &self, block_hash: &H256,
1165    ) -> Option<EpochExecutionCommitment> {
1166        let commitment = self
1167            .db_manager
1168            .epoch_execution_commitment_from_db(block_hash)?;
1169        self.epoch_execution_commitments
1170            .write()
1171            .insert(*block_hash, commitment.clone());
1172        Some(commitment)
1173    }
1174
    /// Get persisted execution commitment.
    /// It will check the db if it's missing in memory.
    ///
    /// A memory hit is cloned out of the guarded reference; a miss falls
    /// through to the db without caching the result back into memory.
    pub fn get_epoch_execution_commitment_with_db(
        &self, block_hash: &H256,
    ) -> Option<EpochExecutionCommitment> {
        self.get_epoch_execution_commitment(block_hash).map_or_else(
            || {
                self.db_manager
                    .epoch_execution_commitment_from_db(block_hash)
            },
            |maybe_ref| Some(maybe_ref.clone()),
        )
    }
1188
    /// Persist the PoS reward info for `pos_epoch` to the db.
    pub fn insert_pos_reward(
        &self, pos_epoch: u64, pos_reward: &PosRewardInfo,
    ) {
        self.db_manager.insert_pos_reward(pos_epoch, pos_reward)
    }
1194
    /// Load the persisted PoS reward info for `pos_epoch` from the db.
    pub fn pos_reward_by_pos_epoch(
        &self, pos_epoch: u64,
    ) -> Option<PosRewardInfo> {
        self.db_manager.pos_reward_by_pos_epoch(pos_epoch)
    }
1200
    /// Drop the in-memory execution commitment for `block_hash`
    /// (the db copy, if any, is untouched).
    pub fn remove_epoch_execution_commitment(&self, block_hash: &H256) {
        self.epoch_execution_commitments.write().remove(block_hash);
    }
1204
    /// Delete the persisted execution commitment for `block_hash` from the
    /// db (the in-memory copy, if any, is untouched).
    pub fn remove_epoch_execution_commitment_from_db(&self, block_hash: &H256) {
        self.db_manager
            .remove_epoch_execution_commitment_from_db(block_hash);
    }
1209
    /// Drop the in-memory execution context for `block_hash`
    /// (the db copy, if any, is untouched).
    pub fn remove_epoch_execution_context(&self, block_hash: &H256) {
        self.epoch_execution_contexts.write().remove(block_hash);
    }
1213
    /// Delete the persisted execution context for `block_hash` from the db
    /// (the in-memory copy, if any, is untouched).
    pub fn remove_epoch_execution_context_from_db(&self, block_hash: &H256) {
        self.db_manager
            .remove_epoch_execution_context_from_db(block_hash);
    }
1218
    /// Whether the epoch anchored at `epoch_hash` has an in-memory
    /// execution commitment, i.e. has been executed.
    pub fn epoch_executed(&self, epoch_hash: &H256) -> bool {
        // `block_receipts_root` is not computed when recovering from db
        self.get_epoch_execution_commitment(epoch_hash).is_some()
    }
1223
    /// Check if all executed results of an epoch exist
    ///
    /// Beyond the commitment check, when executing on the local pivot chain
    /// this also:
    /// - verifies every block's receipts (and traces, when `update_trace`)
    ///   were produced for exactly this epoch;
    /// - rebuilds transaction indices (including phantom transactions
    ///   recovered from the receipts' logs) as if the epoch had just been
    ///   executed;
    /// - verifies reward results when `reward_execution_info` is given;
    /// - re-checks the stored PoS reward and republishes the epoch's
    ///   staking events through the PoS consensus db.
    ///
    /// Returns `false` whenever any piece is missing or inconsistent, which
    /// signals the caller to re-execute the epoch.
    pub fn epoch_executed_and_recovered(
        &self, epoch_hash: &H256, epoch_block_hashes: &Vec<H256>,
        on_local_pivot: bool, update_trace: bool,
        reward_execution_info: &Option<RewardExecutionInfo>,
        pos_verifier: &PosVerifier, evm_chain_id: u32,
    ) -> bool {
        if !self.epoch_executed(epoch_hash) {
            return false;
        }

        // Off-pivot callers only need the commitment check above.
        if !on_local_pivot {
            return true;
        }
        // Check if all blocks receipts and traces are from this epoch
        let mut epoch_receipts = Vec::new();
        let mut epoch_staking_events = Vec::new();
        for h in epoch_block_hashes {
            if let Some(r) = self.block_execution_result_by_hash_with_epoch(
                h, epoch_hash, true, /* update_pivot_assumption */
                true, /* update_cache */
            ) {
                epoch_receipts.push(r.block_receipts);
            } else {
                return false;
            }
            if update_trace {
                // Update block traces in db if needed.
                if self
                    .block_traces_by_hash_with_epoch(
                        h, epoch_hash, true, /* update_pivot_assumption */
                        true, /* update_cache */
                    )
                    .is_none()
                {
                    return false;
                }
            }
        }

        // RPC index in the EVM (Ethereum) space is epoch-wide, so the
        // counter lives outside the per-block loop.
        let mut evm_tx_index = 0;

        // Recover tx address if we will skip pivot chain execution
        for (block_idx, block_hash) in epoch_block_hashes.iter().enumerate() {
            // Native-space RPC index restarts per block.
            let mut cfx_tx_index = 0;

            let block = self
                .block_by_hash(block_hash, true /* update_cache */)
                .expect("block exists");

            for (tx_idx, tx) in block.transactions.iter().enumerate() {
                let Receipt {
                    outcome_status,
                    logs,
                    ..
                } = epoch_receipts[block_idx].receipts.get(tx_idx).unwrap();

                let rpc_index = match tx.space() {
                    Space::Native => {
                        let rpc_index = cfx_tx_index;
                        cfx_tx_index += 1;
                        rpc_index
                    }
                    Space::Ethereum
                        if *outcome_status != TransactionStatus::Skipped =>
                    {
                        let rpc_index = evm_tx_index;
                        evm_tx_index += 1;
                        rpc_index
                    }
                    _ => usize::MAX, // this will not be used
                };

                // Phantom transactions are derived from the logs; they also
                // need indices so they are queryable over RPC.
                let (phantom_txs, _) =
                    build_bloom_and_recover_phantom(logs, tx.hash());

                match outcome_status {
                    TransactionStatus::Success | TransactionStatus::Failure => {
                        self.insert_transaction_index(
                            &tx.hash,
                            &TransactionIndex {
                                block_hash: *block_hash,
                                real_index: tx_idx,
                                is_phantom: false,
                                rpc_index: Some(rpc_index),
                            },
                        );

                        for ptx in phantom_txs {
                            self.insert_transaction_index(
                                &ptx.into_eip155(evm_chain_id).hash(),
                                &TransactionIndex {
                                    block_hash: *block_hash,
                                    real_index: tx_idx,
                                    is_phantom: true,
                                    rpc_index: Some(evm_tx_index),
                                },
                            );

                            evm_tx_index += 1;
                        }

                        epoch_staking_events.extend(make_staking_events(logs));
                    }
                    _ => {}
                }
            }
        }
        if let Some(reward_execution_info) = reward_execution_info {
            for block in &reward_execution_info.epoch_blocks {
                let h = block.as_ref().hash();
                if self
                    .block_reward_result_by_hash_with_epoch(
                        &h, epoch_hash, true, true,
                    )
                    .is_none()
                {
                    return false;
                }
            }
        }
        let me_height = self.block_height_by_hash(epoch_hash).unwrap();
        if pos_verifier.pos_option().is_some() && me_height != 0 {
            // Check if stored pos reward is on pivot.
            let pivot_block_header = self
                .block_header_by_hash(epoch_hash)
                .expect("header exists");
            let maybe_parent_pos_ref = self
                .block_header_by_hash(&pivot_block_header.parent_hash()) // `None` only for genesis.
                .and_then(|parent| parent.pos_reference().clone());
            // A pos_reference change between parent and pivot means a PoS
            // reward distribution may have happened at this pivot.
            if pos_verifier.is_enabled_at_height(me_height)
                && maybe_parent_pos_ref.is_some()
                && maybe_parent_pos_ref != *pivot_block_header.pos_reference()
            {
                if let Some((pos_epoch, _)) = pos_verifier
                    .get_reward_distribution_event(
                        pivot_block_header.pos_reference().as_ref().unwrap(),
                        maybe_parent_pos_ref.as_ref().unwrap(),
                    )
                    .as_ref()
                    .and_then(|x| x.first())
                {
                    if let Some(pos_reward) =
                        self.pos_reward_by_pos_epoch(*pos_epoch)
                    {
                        if pos_reward.execution_epoch_hash != *epoch_hash {
                            // The stored pos reward is executed in another
                            // epoch, so we need to
                            // reexecute this epoch to restore the pos reward of
                            // the current pivot.
                            return false;
                        }
                    }
                }
            }

            trace!(
                "staking events update: height={}, new={}",
                me_height,
                epoch_hash,
            );
            if let Err(e) = pos_verifier.consensus_db().put_staking_events(
                me_height,
                *epoch_hash,
                epoch_staking_events,
            ) {
                error!("epoch_executed err={:?}", e);
                return false;
            }
        }

        true
    }
1397
1398    pub fn invalidate_block(&self, block_hash: H256) {
1399        // This block will never enter consensus graph, so
1400        // assign it a NULL sequence number.
1401        let block_info =
1402            LocalBlockInfo::new(BlockStatus::Invalid, NULLU64, NULLU64);
1403        self.db_manager
1404            .insert_local_block_info_to_db(&block_hash, &block_info);
1405        self.invalid_block_set.write().insert(block_hash);
1406    }
1407
1408    /// Check if a block is already marked as invalid.
1409    pub fn verified_invalid(
1410        &self, block_hash: &H256,
1411    ) -> (bool, Option<LocalBlockInfo>) {
1412        let invalid_block_set = self.invalid_block_set.upgradable_read();
1413        if invalid_block_set.contains(block_hash) {
1414            return (true, None);
1415        } else {
1416            if let Some(block_info) =
1417                self.db_manager.local_block_info_from_db(block_hash)
1418            {
1419                match block_info.get_status() {
1420                    BlockStatus::Invalid => {
1421                        RwLockUpgradableReadGuard::upgrade(invalid_block_set)
1422                            .insert(*block_hash);
1423                        return (true, Some(block_info));
1424                    }
1425                    _ => return (false, Some(block_info)),
1426                }
1427            } else {
1428                // No status on disk, so the block is not marked invalid before
1429                return (false, None);
1430            }
1431        }
1432    }
1433
    /// Number of full blocks currently held in the in-memory cache.
    pub fn cached_block_count(&self) -> usize { self.blocks.read().len() }
1435
1436    /// Get current cache size.
1437    pub fn cache_size(&self) -> CacheSize {
1438        let malloc_ops = &mut new_malloc_size_ops();
1439        let block_headers = self.block_headers.read().size_of(malloc_ops);
1440        let blocks = self.blocks.read().size_of(malloc_ops);
1441        let compact_blocks = self.compact_blocks.read().size_of(malloc_ops);
1442        let block_receipts = self.block_receipts.read().size_of(malloc_ops);
1443        let block_rewards = self.block_rewards.read().size_of(malloc_ops);
1444        let block_traces = self.block_traces.read().size_of(malloc_ops);
1445        let transaction_indices =
1446            self.transaction_indices.read().size_of(malloc_ops);
1447        let hash_by_block_number =
1448            self.hash_by_block_number.read().size_of(malloc_ops);
1449        let local_block_infos =
1450            self.local_block_info.read().size_of(malloc_ops);
1451
1452        CacheSize {
1453            block_headers,
1454            blocks,
1455            block_receipts,
1456            block_rewards,
1457            block_traces,
1458            transaction_indices,
1459            compact_blocks,
1460            local_block_infos,
1461            hash_by_block_number,
1462        }
1463    }
1464
    /// Run one round of cache garbage collection.
    ///
    /// All cache maps and the cache manager are write-locked up front so the
    /// collection works on a consistent view. `collect_garbage` calls back
    /// with the cache ids to evict and expects the recomputed total size in
    /// return so it can decide whether to keep evicting.
    fn block_cache_gc(&self) {
        // Sized before taking the write locks (cache_size takes read locks).
        let current_size = self.cache_size().total();
        let mut block_headers = self.block_headers.write();
        let mut blocks = self.blocks.write();
        let mut compact_blocks = self.compact_blocks.write();
        let mut executed_results = self.block_receipts.write();
        let mut reward_results = self.block_rewards.write();
        let mut block_traces = self.block_traces.write();
        let mut tx_indices = self.transaction_indices.write();
        let mut local_block_info = self.local_block_info.write();
        let mut blamed_header_verified_roots =
            self.blamed_header_verified_roots.write();
        let mut hash_by_block_number = self.hash_by_block_number.write();
        let mut cache_man = self.cache_man.lock();

        debug!(
            "Before gc cache_size={} {} {} {} {} {} {} {} {} {} {}",
            current_size,
            block_headers.len(),
            blocks.len(),
            compact_blocks.len(),
            executed_results.len(),
            reward_results.len(),
            block_traces.len(),
            tx_indices.len(),
            local_block_info.len(),
            blamed_header_verified_roots.len(),
            hash_by_block_number.len(),
        );

        cache_man.collect_garbage(current_size, |ids| {
            // Evict every entry the cache manager selected from its map.
            for id in &ids {
                match id {
                    CacheId::Block(h) => {
                        blocks.remove(h);
                    }
                    CacheId::BlockHeader(h) => {
                        block_headers.remove(h);
                    }
                    CacheId::CompactBlock(h) => {
                        compact_blocks.remove(h);
                    }
                    CacheId::BlockReceipts(h) => {
                        executed_results.remove(h);
                    }
                    CacheId::BlockRewards(h) => {
                        reward_results.remove(h);
                    }
                    CacheId::BlockTraces(h) => {
                        block_traces.remove(h);
                    }
                    CacheId::TransactionAddress(h) => {
                        tx_indices.remove(h);
                    }
                    CacheId::LocalBlockInfo(h) => {
                        local_block_info.remove(h);
                    }
                    CacheId::BlamedHeaderVerifiedRoots(h) => {
                        blamed_header_verified_roots.remove(h);
                    }
                    &CacheId::HashByBlockNumber(block_number) => {
                        hash_by_block_number.remove(&block_number);
                    }
                }
            }

            // Report the post-eviction size back to the cache manager.
            // NOTE(review): blamed_header_verified_roots and
            // hash_by_block_number are evicted above but excluded from this
            // sum (and from the shrink_to_fit calls below) — confirm this
            // omission is intentional.
            let malloc_ops = &mut new_malloc_size_ops();
            block_headers.size_of(malloc_ops)
                + blocks.size_of(malloc_ops)
                + executed_results.size_of(malloc_ops)
                + reward_results.size_of(malloc_ops)
                + block_traces.size_of(malloc_ops)
                + tx_indices.size_of(malloc_ops)
                + compact_blocks.size_of(malloc_ops)
                + local_block_info.size_of(malloc_ops)
        });

        // Return excess capacity from the shrunken maps to the allocator.
        block_headers.shrink_to_fit();
        blocks.shrink_to_fit();
        executed_results.shrink_to_fit();
        reward_results.shrink_to_fit();
        block_traces.shrink_to_fit();
        tx_indices.shrink_to_fit();
        compact_blocks.shrink_to_fit();
        local_block_info.shrink_to_fit();
    }
1551
    /// Public entry point for cache garbage collection.
    pub fn cache_gc(&self) { self.block_cache_gc(); }
1553
1554    pub fn set_cur_consensus_era_genesis_hash(
1555        &self, cur_era_hash: &H256, next_era_hash: &H256,
1556    ) {
1557        self.db_manager
1558            .insert_checkpoint_hashes_to_db(cur_era_hash, next_era_hash);
1559
1560        let mut era_hash = self.cur_consensus_era_genesis_hash.write();
1561        let mut stable_hash = self.cur_consensus_era_stable_hash.write();
1562        *era_hash = cur_era_hash.clone();
1563        *stable_hash = next_era_hash.clone();
1564    }
1565
1566    pub fn get_cur_consensus_era_genesis_hash(&self) -> H256 {
1567        self.cur_consensus_era_genesis_hash.read().clone()
1568    }
1569
1570    pub fn get_cur_consensus_era_stable_hash(&self) -> H256 {
1571        self.cur_consensus_era_stable_hash.read().clone()
1572    }
1573
    /// Recover the signed form of a batch of transactions, timed under
    /// `TX_POOL_RECOVER_TIMER`. Delegates to the transaction data manager's
    /// order-preserving recovery.
    pub fn recover_unsigned_tx(
        &self, transactions: &Vec<TransactionWithSignature>,
    ) -> Result<Vec<Arc<SignedTransaction>>, DecoderError> {
        let _timer = MeterTimer::time_func(TX_POOL_RECOVER_TIMER.as_ref());
        // Return all transactions without checking if it's cached.
        self.tx_data_manager
            .recover_unsigned_tx_with_order(transactions)
    }
1582
    /// Recover the signed transactions of `block` in place via the
    /// transaction data manager.
    pub fn recover_block(&self, block: &mut Block) -> Result<(), DecoderError> {
        self.tx_data_manager.recover_block(block)
    }
1586
    /// Same as `recover_unsigned_tx` but without the tx-pool metrics timer.
    pub fn recover_unsigned_tx_with_order(
        &self, transactions: &Vec<TransactionWithSignature>,
    ) -> Result<Vec<Arc<SignedTransaction>>, DecoderError> {
        self.tx_data_manager
            .recover_unsigned_tx_with_order(transactions)
    }
1593
    /// Delegate to the transaction data manager to find which transaction
    /// slots in `compact_block` still need to be fetched.
    pub fn find_missing_tx_indices_encoded(
        &self, compact_block: &mut CompactBlock,
    ) -> Vec<usize> {
        self.tx_data_manager
            .find_missing_tx_indices_encoded(compact_block)
    }
1600
1601    /// Caller should make sure the state exists.
1602    pub fn get_state_readonly_index(
1603        &self, block_hash: &EpochId,
1604    ) -> Option<StateIndex> {
1605        let maybe_commitment =
1606            self.get_epoch_execution_commitment_with_db(block_hash);
1607        let maybe_state_index = match maybe_commitment {
1608            None => None,
1609            Some(execution_commitment) => Some(StateIndex::new_for_readonly(
1610                block_hash,
1611                &execution_commitment.state_root_with_aux_info,
1612            )),
1613        };
1614        maybe_state_index
1615    }
1616
    // TODO: There could be io error when getting block by hash.
    /// Walk up the parent chain starting from `block`, collecting up to
    /// `count` epoch ids.
    ///
    /// Returns `(stop_block, epochs)` where `epochs` is ordered oldest to
    /// newest and `stop_block` is the parent where the walk stopped: after
    /// `count` steps, at `NULL_EPOCH`, or at the current consensus era
    /// genesis, whichever comes first.
    pub fn get_parent_epochs_for(
        &self, mut block: EpochId, mut count: u64,
    ) -> (EpochId, Vec<EpochId>) {
        let mut epochs_reverse_order = vec![];
        while count > 0 {
            debug!("getting parent for block {:?}", block);
            epochs_reverse_order.push(block);
            // Panics if the header is missing (see the TODO above).
            block = *self.block_header_by_hash(&block).unwrap().parent_hash();
            if block == NULL_EPOCH
                || block == *self.cur_consensus_era_genesis_hash.read()
            {
                break;
            }
            count -= 1;
        }

        debug!("get_parent_epochs stopped at block {:?}", block);
        // Collected newest-first above; callers expect oldest-first.
        epochs_reverse_order.reverse();
        (block, epochs_reverse_order)
    }
1638
1639    pub fn get_snapshot_epoch_count(&self) -> u32 {
1640        self.storage_manager
1641            .get_storage_manager()
1642            .get_snapshot_epoch_count()
1643    }
1644
1645    pub fn get_snapshot_blame_plus_depth(&self) -> usize {
1646        // We need the extra + 1 to get a state root that points to the
1647        // snapshot we want.
1648        self.storage_manager
1649            .get_storage_manager()
1650            .get_snapshot_epoch_count() as usize
1651            + 1
1652    }
1653
1654    pub fn get_executed_state_root(&self, block_hash: &H256) -> Option<H256> {
1655        let maybe_commitment =
1656            self.get_epoch_execution_commitment(block_hash).take();
1657        if let Some(commitment) = maybe_commitment {
1658            Some(commitment.state_root_with_aux_info.aux_info.state_root_hash)
1659        } else {
1660            None
1661        }
1662    }
1663
1664    pub fn earliest_epoch_with_block_body(&self) -> u64 {
1665        match self.config.additional_maintained_block_body_epoch_count {
1666            Some(defer) => self.gc_progress.lock().gc_end - defer as u64,
1667            None => 0,
1668        }
1669    }
1670
1671    pub fn earliest_epoch_with_execution_result(&self) -> u64 {
1672        match self
1673            .config
1674            .additional_maintained_execution_result_epoch_count
1675        {
1676            Some(defer) => self.gc_progress.lock().gc_end - defer as u64,
1677            None => 0,
1678        }
1679    }
1680
1681    pub fn earliest_epoch_with_trace(&self) -> u64 {
1682        match self.config.additional_maintained_trace_epoch_count {
1683            Some(defer) => self.gc_progress.lock().gc_end - defer as u64,
1684            None => 0,
1685        }
1686    }
1687
1688    pub fn new_checkpoint(
1689        &self, new_checkpoint_height: u64, best_epoch_number: u64,
1690    ) {
1691        let mut gc_progress = self.gc_progress.lock();
1692        gc_progress.gc_end = new_checkpoint_height;
1693        gc_progress.last_consensus_best_epoch = best_epoch_number;
1694        gc_progress.expected_end_consensus_best_epoch = best_epoch_number
1695            + self.config.checkpoint_gc_time_in_epoch_count as u64;
1696    }
1697
1698    pub fn database_gc(&self, best_epoch: u64) {
1699        let maybe_range = self.gc_progress.lock().get_gc_base_range(best_epoch);
1700        debug!("Start database GC, range={:?}", maybe_range);
1701        if let Some((start, end)) = maybe_range {
1702            for base_epoch in start..end {
1703                self.gc_base_epoch(base_epoch);
1704            }
1705            let mut gc_progress = self.gc_progress.lock();
1706            gc_progress.last_consensus_best_epoch = best_epoch;
1707            gc_progress.next_to_process = end;
1708            self.db_manager.insert_gc_progress_to_db(end);
1709            debug!("Database GC progress: {:?}", gc_progress);
1710        }
1711    }
1712
1713    /// Garbage collect different types of data in the corresponding epoch based
1714    /// on `base_epoch` and the `additional_maintained*` parameters of these
1715    /// data types.
1716    fn gc_base_epoch(&self, base_epoch: u64) {
1717        // We must GC tx index before block body, otherwise we may be unable to
1718        // get the transactions in this epoch.
1719
1720        let gc_tx_index = || {
1721            let defer_epochs = match self
1722                .config
1723                .additional_maintained_transaction_index_epoch_count
1724            {
1725                Some(x) => x,
1726                None => return,
1727            };
1728            if base_epoch <= defer_epochs as u64 {
1729                return;
1730            }
1731            let epoch_to_remove = base_epoch - defer_epochs as u64;
1732            let epoch_blocks =
1733                match self.all_epoch_set_hashes_from_db(epoch_to_remove) {
1734                    None => {
1735                        warn!(
1736                            "GC epoch set is missing! epoch_to_remove: {}",
1737                            epoch_to_remove
1738                        );
1739                        return;
1740                    }
1741                    Some(epoch_blocks) => epoch_blocks,
1742                };
1743            // Store all packed transactions in a set first to
1744            // deduplicate transactions for database operations.
1745            let mut transaction_set = HashSet::new();
1746            for transactions in epoch_blocks
1747                .iter()
1748                .filter_map(|b| self.db_manager.block_body_from_db(&b))
1749            {
1750                transaction_set.extend(transactions.iter().map(|tx| tx.hash()));
1751            }
1752            let epoch_block_set: HashSet<H256> =
1753                epoch_blocks.into_iter().collect();
1754
1755            let should_remove = |tx_hash| {
1756                if self.config.strict_tx_index_gc {
1757                    // Check if this tx is actually executed in the
1758                    // processed epoch.
1759                    if let Some(tx_index) =
1760                        self.db_manager.transaction_index_from_db(&tx_hash)
1761                    {
1762                        epoch_block_set.contains(&tx_index.block_hash)
1763                    } else {
1764                        false
1765                    }
1766                } else {
1767                    true
1768                }
1769            };
1770            for tx in transaction_set {
1771                if should_remove(tx) {
1772                    self.db_manager.remove_transaction_index_from_db(&tx);
1773                }
1774            }
1775        };
1776
1777        gc_tx_index();
1778
1779        self.gc_epoch_with_defer(
1780            base_epoch,
1781            self.config.additional_maintained_block_body_epoch_count,
1782            |h| self.remove_block_body(h, true /* remove_db */),
1783        );
1784        self.gc_epoch_with_defer(
1785            base_epoch,
1786            self.config
1787                .additional_maintained_execution_result_epoch_count,
1788            |h| self.remove_block_result(h, true /* remove_db */),
1789        );
1790        self.gc_epoch_with_defer(
1791            base_epoch,
1792            self.config.additional_maintained_reward_epoch_count,
1793            |h| self.db_manager.remove_block_reward_result_from_db(h),
1794        );
1795        self.gc_epoch_with_defer(
1796            base_epoch,
1797            self.config.additional_maintained_trace_epoch_count,
1798            |h| self.db_manager.remove_block_trace_from_db(h),
1799        );
1800    }
1801
1802    fn gc_epoch_with_defer<F>(
1803        &self, epoch_number: u64, maybe_defer_epochs: Option<usize>, gc_func: F,
1804    ) where F: Fn(&H256) -> () {
1805        if let Some(defer_epochs) = maybe_defer_epochs {
1806            if epoch_number > defer_epochs as u64 {
1807                let epoch_to_remove = epoch_number - defer_epochs as u64;
1808                match self.all_epoch_set_hashes_from_db(epoch_to_remove) {
1809                    None => warn!(
1810                        "GC epoch set is missing! epoch_to_remove: {}",
1811                        epoch_to_remove
1812                    ),
1813                    Some(epoch_set) => {
1814                        for b in epoch_set {
1815                            gc_func(&b);
1816                        }
1817                    }
1818                }
1819            }
1820        }
1821    }
1822}
1823
/// Database backend selection for block data storage.
///
/// `Debug`, `PartialEq` and `Eq` are derived so the type can be logged and
/// compared directly; the additions are backward-compatible.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DbType {
    Rocksdb,
    Sqlite,
}
1829
/// Tunable parameters for the block data manager: what gets persisted,
/// which database backend is used, and how long each data category is kept
/// before garbage collection.
pub struct DataManagerConfiguration {
    /// Persist the transaction index (presumably tx-hash lookups; see the
    /// tx-index GC path in `gc_base_epoch`).
    pub persist_tx_index: bool,
    /// Persist the block-number index.
    pub persist_block_number_index: bool,
    /// Timeout used by transaction cache index maintenance.
    pub tx_cache_index_maintain_timeout: Duration,
    /// Database backend (RocksDB or SQLite).
    pub db_type: DbType,
    // For each `additional_maintained_*` field below: the number of epochs
    // beyond the GC end for which that data category is kept. `None` means
    // the category is never garbage collected (the GC helpers return early
    // and `earliest_epoch_with_*` reports 0).
    pub additional_maintained_block_body_epoch_count: Option<usize>,
    pub additional_maintained_execution_result_epoch_count: Option<usize>,
    pub additional_maintained_reward_epoch_count: Option<usize>,
    pub additional_maintained_trace_epoch_count: Option<usize>,
    pub additional_maintained_transaction_index_epoch_count: Option<usize>,
    /// Number of consensus epochs within which checkpoint GC is expected to
    /// finish (used to set `expected_end_consensus_best_epoch`).
    pub checkpoint_gc_time_in_epoch_count: usize,
    /// When true, a tx index is only removed if it points into a block of
    /// the epoch being GCed; when false, indices are removed unconditionally.
    pub strict_tx_index_gc: bool,
}
1843
1844impl MallocSizeOf for DataManagerConfiguration {
1845    fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { 0 }
1846}
1847
1848impl DataManagerConfiguration {
1849    pub fn new(
1850        persist_tx_index: bool, persist_block_number_index: bool,
1851        tx_cache_index_maintain_timeout: Duration, db_type: DbType,
1852    ) -> Self {
1853        Self {
1854            persist_tx_index,
1855            persist_block_number_index,
1856            tx_cache_index_maintain_timeout,
1857            db_type,
1858            additional_maintained_block_body_epoch_count: None,
1859            additional_maintained_execution_result_epoch_count: None,
1860            additional_maintained_reward_epoch_count: None,
1861            additional_maintained_trace_epoch_count: None,
1862            additional_maintained_transaction_index_epoch_count: None,
1863            checkpoint_gc_time_in_epoch_count: 1,
1864            strict_tx_index_gc: true,
1865        }
1866    }
1867}