cfxcore/block_data_manager/
db_manager.rs

1use crate::{
2    block_data_manager::{
3        db_decode_list, db_encode_list, BlamedHeaderVerifiedRoots,
4        BlockExecutionResultWithEpoch, BlockRewardResult, BlockTracesWithEpoch,
5        CheckpointHashes, DataVersionTuple, EpochExecutionContext,
6        LocalBlockInfo, PosRewardInfo,
7    },
8    db::{
9        COL_BLAMED_HEADER_VERIFIED_ROOTS, COL_BLOCKS, COL_BLOCK_TRACES,
10        COL_EPOCH_NUMBER, COL_HASH_BY_BLOCK_NUMBER, COL_MISC,
11        COL_REWARD_BY_POS_EPOCH, COL_TX_INDEX,
12    },
13    pow::PowComputer,
14    verification::VerificationConfig,
15};
16use byteorder::{ByteOrder, LittleEndian};
17use cfx_internal_common::{
18    DatabaseDecodable, DatabaseEncodable, EpochExecutionCommitment,
19};
20use cfx_storage::{
21    storage_db::KeyValueDbTrait, KvdbRocksdb, KvdbSqlite, KvdbSqliteStatements,
22};
23use cfx_types::H256;
24use db::SystemDB;
25use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
26use primitives::{Block, BlockHeader, SignedTransaction, TransactionIndex};
27use rlp::Rlp;
28use std::{collections::HashMap, fs, path::Path, sync::Arc};
29use strum::IntoEnumIterator;
30use strum_macros::EnumIter;
31
// One-byte suffixes. Most are appended to a block hash (see `append_suffix`)
// so that several kinds of per-block data can share the `Blocks` table; the
// two `EPOCH_*_BLOCK_SET` bytes terminate epoch-number keys instead.

/// Suffix of the key under which a block's `LocalBlockInfo` is stored.
const LOCAL_BLOCK_INFO_SUFFIX_BYTE: u8 = 1;
/// Suffix of the key under which a block's body is stored.
const BLOCK_BODY_SUFFIX_BYTE: u8 = 2;
/// Suffix of the key under which a block's execution result is stored.
const BLOCK_EXECUTION_RESULT_SUFFIX_BYTE: u8 = 3;
/// Suffix of the key under which an `EpochExecutionContext` is stored.
const EPOCH_EXECUTION_CONTEXT_SUFFIX_BYTE: u8 = 4;
/// Suffix of the key under which an `EpochExecutionCommitment` is stored.
const EPOCH_CONSENSUS_EXECUTION_INFO_SUFFIX_BYTE: u8 = 5;
/// Last byte of the key built by `executed_epoch_set_key`.
const EPOCH_EXECUTED_BLOCK_SET_SUFFIX_BYTE: u8 = 6;
/// Last byte of the key built by `skipped_epoch_set_key`.
const EPOCH_SKIPPED_BLOCK_SET_SUFFIX_BYTE: u8 = 7;
/// Suffix of the key under which a block's reward result is stored.
const BLOCK_REWARD_RESULT_SUFFIX_BYTE: u8 = 8;

/// Fixed key of the terminal block hash list in the `Misc` table.
const BLOCK_TERMINAL_KEY: &[u8] = b"block_terminals";
/// Fixed key of the garbage-collection progress counter in the `Misc` table.
const GC_PROGRESS_KEY: &[u8] = b"gc_progress";
42
43#[derive(Clone, Copy, Hash, Ord, PartialOrd, Eq, PartialEq, EnumIter)]
44enum DBTable {
45    Misc,
46    Blocks,
47    Transactions,
48    EpochNumbers,
49    BlamedHeaderVerifiedRoots,
50    BlockTraces,
51    HashByBlockNumber,
52    RewardByPosEpoch,
53}
54
55fn rocks_db_col(table: DBTable) -> u32 {
56    match table {
57        DBTable::Misc => COL_MISC,
58        DBTable::Blocks => COL_BLOCKS,
59        DBTable::Transactions => COL_TX_INDEX,
60        DBTable::EpochNumbers => COL_EPOCH_NUMBER,
61        DBTable::BlamedHeaderVerifiedRoots => COL_BLAMED_HEADER_VERIFIED_ROOTS,
62        DBTable::BlockTraces => COL_BLOCK_TRACES,
63        DBTable::HashByBlockNumber => COL_HASH_BY_BLOCK_NUMBER,
64        DBTable::RewardByPosEpoch => COL_REWARD_BY_POS_EPOCH,
65    }
66}
67
68fn sqlite_db_table(table: DBTable) -> String {
69    match table {
70        DBTable::Misc => "misc",
71        DBTable::Blocks => "blocks",
72        DBTable::Transactions => "transactions",
73        DBTable::EpochNumbers => "epoch_numbers",
74        DBTable::BlamedHeaderVerifiedRoots => "blamed_header_verified_roots",
75        DBTable::BlockTraces => "block_traces",
76        DBTable::HashByBlockNumber => "hash_by_block_number",
77        DBTable::RewardByPosEpoch => "reward_by_pos_epoch",
78    }
79    .into()
80}
81
82pub struct DBManager {
83    table_db: HashMap<DBTable, Box<dyn KeyValueDbTrait<ValueType = Box<[u8]>>>>,
84    pow: Arc<PowComputer>,
85}
86
87impl DBManager {
88    pub fn new_from_rocksdb(db: Arc<SystemDB>, pow: Arc<PowComputer>) -> Self {
89        let mut table_db = HashMap::new();
90
91        for table in DBTable::iter() {
92            table_db.insert(
93                table,
94                Box::new(KvdbRocksdb {
95                    kvdb: db.key_value().clone(),
96                    col: rocks_db_col(table),
97                })
98                    as Box<dyn KeyValueDbTrait<ValueType = Box<[u8]>>>,
99            );
100        }
101        Self { table_db, pow }
102    }
103}
104
105impl DBManager {
106    pub fn new_from_sqlite(db_path: &Path, pow: Arc<PowComputer>) -> Self {
107        if let Err(e) = fs::create_dir_all(db_path) {
108            panic!("Error creating database directory: {:?}", e);
109        }
110        let mut table_db = HashMap::new();
111        for table in DBTable::iter() {
112            let table_str = sqlite_db_table(table);
113            let (_, sqlite_db) = KvdbSqlite::open_or_create(
114                &db_path.join(table_str.as_str()), /* Use separate database
115                                                    * for different table */
116                Arc::new(
117                    KvdbSqliteStatements::make_statements(
118                        &[&"value"],
119                        &[&"BLOB"],
120                        table_str.as_str(),
121                        false,
122                    )
123                    .unwrap(),
124                ),
125                false, /* unsafe_mode */
126            )
127            .expect("Open sqlite failure");
128            table_db.insert(
129                table,
130                Box::new(sqlite_db)
131                    as Box<dyn KeyValueDbTrait<ValueType = Box<[u8]>>>,
132            );
133        }
134        Self { table_db, pow }
135    }
136}
137
138impl DBManager {
139    pub fn insert_block_traces_to_db(
140        &self, block_hash: &H256, block_traces: &BlockTracesWithEpoch,
141    ) {
142        self.insert_encodable_val(
143            DBTable::BlockTraces,
144            block_hash.as_bytes(),
145            block_traces,
146        );
147    }
148
149    pub fn block_traces_from_db(
150        &self, block_hash: &H256,
151    ) -> Option<BlockTracesWithEpoch> {
152        let block_traces = self
153            .load_decodable_val(DBTable::BlockTraces, block_hash.as_bytes())?;
154        Some(block_traces)
155    }
156
157    /// TODO Use new_with_rlp_size
158    pub fn block_from_db(&self, block_hash: &H256) -> Option<Block> {
159        Some(Block::new(
160            self.block_header_from_db(block_hash)?,
161            self.block_body_from_db(block_hash)?,
162        ))
163    }
164
165    pub fn insert_block_header_to_db(&self, header: &BlockHeader) {
166        self.insert_encodable_val(
167            DBTable::Blocks,
168            header.hash().as_bytes(),
169            header,
170        );
171    }
172
173    pub fn block_header_from_db(&self, hash: &H256) -> Option<BlockHeader> {
174        let mut block_header =
175            self.load_decodable_val(DBTable::Blocks, hash.as_bytes())?;
176        VerificationConfig::get_or_fill_header_pow_quality(
177            &self.pow,
178            &mut block_header,
179        );
180        Some(block_header)
181    }
182
183    pub fn remove_block_header_from_db(&self, hash: &H256) {
184        self.remove_from_db(DBTable::Blocks, hash.as_bytes());
185    }
186
187    pub fn insert_transaction_index_to_db(
188        &self, hash: &H256, value: &TransactionIndex,
189    ) {
190        self.insert_encodable_val(DBTable::Transactions, hash.as_bytes(), value)
191    }
192
193    pub fn transaction_index_from_db(
194        &self, hash: &H256,
195    ) -> Option<TransactionIndex> {
196        self.load_decodable_val(DBTable::Transactions, hash.as_bytes())
197    }
198
199    pub fn insert_hash_by_block_number_to_db(
200        &self, block_number: u64, hash: &H256,
201    ) {
202        self.insert_encodable_val(
203            DBTable::HashByBlockNumber,
204            &block_number.to_be_bytes(),
205            hash,
206        )
207    }
208
209    pub fn hash_by_block_number_from_db(
210        &self, block_number: &u64,
211    ) -> Option<H256> {
212        self.load_decodable_val(
213            DBTable::HashByBlockNumber,
214            &block_number.to_be_bytes(),
215        )
216    }
217
218    /// Store block info to db. Block info includes block status and
219    /// the sequence number when the block enters consensus graph.
220    /// The db key is the block hash plus one extra byte, so we can get better
221    /// data locality if we get both a block and its info from db.
222    /// The info is not a part of the block because the block is inserted
223    /// before we know its info, and we do not want to insert a large chunk
224    /// again. TODO Maybe we can use in-place modification (operator `merge`
225    /// in rocksdb) to keep the info together with the block.
226    pub fn insert_local_block_info_to_db(
227        &self, block_hash: &H256, value: &LocalBlockInfo,
228    ) {
229        self.insert_encodable_val(
230            DBTable::Blocks,
231            &local_block_info_key(block_hash),
232            value,
233        );
234    }
235
236    /// Get block info from db.
237    pub fn local_block_info_from_db(
238        &self, block_hash: &H256,
239    ) -> Option<LocalBlockInfo> {
240        self.load_decodable_val(
241            DBTable::Blocks,
242            &local_block_info_key(block_hash),
243        )
244    }
245
246    pub fn insert_blamed_header_verified_roots_to_db(
247        &self, block_height: u64, value: &BlamedHeaderVerifiedRoots,
248    ) {
249        self.insert_encodable_val(
250            DBTable::BlamedHeaderVerifiedRoots,
251            &blamed_header_verified_roots_key(block_height),
252            value,
253        );
254    }
255
256    /// Get correct roots of blamed headers from db.
257    /// These are maintained on light nodes only.
258    pub fn blamed_header_verified_roots_from_db(
259        &self, block_height: u64,
260    ) -> Option<BlamedHeaderVerifiedRoots> {
261        self.load_decodable_val(
262            DBTable::BlamedHeaderVerifiedRoots,
263            &blamed_header_verified_roots_key(block_height),
264        )
265    }
266
267    pub fn remove_blamed_header_verified_roots_from_db(
268        &self, block_height: u64,
269    ) {
270        self.remove_from_db(
271            DBTable::BlamedHeaderVerifiedRoots,
272            &blamed_header_verified_roots_key(block_height),
273        )
274    }
275
276    pub fn insert_block_body_to_db(&self, block: &Block) {
277        self.insert_to_db(
278            DBTable::Blocks,
279            &block_body_key(&block.hash()),
280            block.encode_body_with_tx_public(),
281        )
282    }
283
284    pub fn block_body_from_db(
285        &self, hash: &H256,
286    ) -> Option<Vec<Arc<SignedTransaction>>> {
287        let encoded =
288            self.load_from_db(DBTable::Blocks, &block_body_key(hash))?;
289        let rlp = Rlp::new(&encoded);
290        Some(
291            Block::decode_body_with_tx_public(&rlp)
292                .expect("Wrong block rlp format!"),
293        )
294    }
295
296    pub fn remove_block_body_from_db(&self, hash: &H256) {
297        self.remove_from_db(DBTable::Blocks, &block_body_key(hash))
298    }
299
300    pub fn insert_block_execution_result_to_db(
301        &self, hash: &H256, value: &BlockExecutionResultWithEpoch,
302    ) {
303        self.insert_encodable_val(
304            DBTable::Blocks,
305            &block_execution_result_key(hash),
306            value,
307        )
308    }
309
310    pub fn insert_block_reward_result_to_db(
311        &self, hash: &H256, value: &DataVersionTuple<H256, BlockRewardResult>,
312    ) {
313        self.insert_encodable_val(
314            DBTable::Blocks,
315            &block_reward_result_key(hash),
316            value,
317        )
318    }
319
320    pub fn block_execution_result_from_db(
321        &self, hash: &H256,
322    ) -> Option<BlockExecutionResultWithEpoch> {
323        self.load_decodable_val(
324            DBTable::Blocks,
325            &block_execution_result_key(hash),
326        )
327    }
328
329    pub fn block_reward_result_from_db(
330        &self, hash: &H256,
331    ) -> Option<DataVersionTuple<H256, BlockRewardResult>> {
332        self.load_might_decodable_val(
333            DBTable::Blocks,
334            &block_reward_result_key(hash),
335        )
336    }
337
338    pub fn remove_block_execution_result_from_db(&self, hash: &H256) {
339        self.remove_from_db(DBTable::Blocks, &block_execution_result_key(hash))
340    }
341
342    pub fn remove_block_reward_result_from_db(&self, hash: &H256) {
343        self.remove_from_db(DBTable::Blocks, &block_reward_result_key(hash))
344    }
345
346    pub fn remove_block_trace_from_db(&self, hash: &H256) {
347        self.remove_from_db(DBTable::BlockTraces, hash.as_bytes())
348    }
349
350    pub fn remove_transaction_index_from_db(&self, hash: &H256) {
351        self.remove_from_db(DBTable::Transactions, hash.as_bytes())
352    }
353
354    pub fn insert_checkpoint_hashes_to_db(
355        &self, checkpoint_prev: &H256, checkpoint_cur: &H256,
356    ) {
357        self.insert_encodable_val(
358            DBTable::Misc,
359            b"checkpoint",
360            &CheckpointHashes::new(*checkpoint_prev, *checkpoint_cur),
361        );
362    }
363
364    pub fn checkpoint_hashes_from_db(&self) -> Option<(H256, H256)> {
365        let checkpoints: CheckpointHashes =
366            self.load_decodable_val(DBTable::Misc, b"checkpoint")?;
367        Some((checkpoints.prev_hash, checkpoints.cur_hash))
368    }
369
370    pub fn insert_executed_epoch_set_hashes_to_db(
371        &self, epoch: u64, executed_hashes: &Vec<H256>,
372    ) {
373        self.insert_encodable_list(
374            DBTable::EpochNumbers,
375            &executed_epoch_set_key(epoch)[0..9],
376            executed_hashes,
377        );
378    }
379
380    pub fn insert_skipped_epoch_set_hashes_to_db(
381        &self, epoch: u64, skipped_hashes: &Vec<H256>,
382    ) {
383        self.insert_encodable_list(
384            DBTable::EpochNumbers,
385            &skipped_epoch_set_key(epoch)[0..9],
386            skipped_hashes,
387        );
388    }
389
390    pub fn executed_epoch_set_hashes_from_db(
391        &self, epoch: u64,
392    ) -> Option<Vec<H256>> {
393        self.load_decodable_list(
394            DBTable::EpochNumbers,
395            &executed_epoch_set_key(epoch)[0..9],
396        )
397    }
398
399    pub fn skipped_epoch_set_hashes_from_db(
400        &self, epoch: u64,
401    ) -> Option<Vec<H256>> {
402        self.load_decodable_list(
403            DBTable::EpochNumbers,
404            &skipped_epoch_set_key(epoch)[0..9],
405        )
406    }
407
408    pub fn insert_terminals_to_db(&self, terminals: &Vec<H256>) {
409        self.insert_encodable_list(
410            DBTable::Misc,
411            BLOCK_TERMINAL_KEY,
412            terminals,
413        );
414    }
415
416    pub fn terminals_from_db(&self) -> Option<Vec<H256>> {
417        self.load_decodable_list(DBTable::Misc, BLOCK_TERMINAL_KEY)
418    }
419
420    pub fn insert_epoch_execution_commitment_to_db(
421        &self, hash: &H256, ctx: &EpochExecutionCommitment,
422    ) {
423        self.insert_encodable_val(
424            DBTable::Blocks,
425            &epoch_consensus_epoch_execution_commitment_key(hash),
426            ctx,
427        );
428    }
429
430    pub fn epoch_execution_commitment_from_db(
431        &self, hash: &H256,
432    ) -> Option<EpochExecutionCommitment> {
433        self.load_decodable_val(
434            DBTable::Blocks,
435            &epoch_consensus_epoch_execution_commitment_key(hash),
436        )
437    }
438
439    pub fn remove_epoch_execution_commitment_from_db(&self, hash: &H256) {
440        self.remove_from_db(
441            DBTable::Blocks,
442            &epoch_consensus_epoch_execution_commitment_key(hash),
443        );
444    }
445
446    pub fn insert_instance_id_to_db(&self, instance_id: u64) {
447        self.insert_encodable_val(DBTable::Misc, b"instance", &instance_id);
448    }
449
450    pub fn instance_id_from_db(&self) -> Option<u64> {
451        self.load_decodable_val(DBTable::Misc, b"instance")
452    }
453
454    pub fn insert_execution_context_to_db(
455        &self, hash: &H256, ctx: &EpochExecutionContext,
456    ) {
457        self.insert_encodable_val(
458            DBTable::Blocks,
459            &epoch_execution_context_key(hash),
460            ctx,
461        )
462    }
463
464    pub fn execution_context_from_db(
465        &self, hash: &H256,
466    ) -> Option<EpochExecutionContext> {
467        self.load_decodable_val(
468            DBTable::Blocks,
469            &epoch_execution_context_key(hash),
470        )
471    }
472
473    pub fn remove_epoch_execution_context_from_db(&self, hash: &H256) {
474        self.remove_from_db(DBTable::Blocks, &epoch_execution_context_key(hash))
475    }
476
477    pub fn insert_gc_progress_to_db(&self, next_to_process: u64) {
478        self.insert_encodable_val(
479            DBTable::Misc,
480            GC_PROGRESS_KEY,
481            &next_to_process,
482        );
483    }
484
485    pub fn gc_progress_from_db(&self) -> Option<u64> {
486        self.load_decodable_val(DBTable::Misc, GC_PROGRESS_KEY)
487    }
488
489    pub fn insert_pos_reward(
490        &self, pos_epoch: u64, pos_reward: &PosRewardInfo,
491    ) {
492        self.insert_encodable_val(
493            DBTable::RewardByPosEpoch,
494            &pos_epoch.to_be_bytes(),
495            pos_reward,
496        );
497    }
498
499    pub fn pos_reward_by_pos_epoch(
500        &self, pos_epoch: u64,
501    ) -> Option<PosRewardInfo> {
502        self.load_decodable_val(
503            DBTable::RewardByPosEpoch,
504            &pos_epoch.to_be_bytes(),
505        )
506    }
507
508    /// The functions below are private utils used by the DBManager to access
509    /// database
510    fn insert_to_db(&self, table: DBTable, db_key: &[u8], value: Vec<u8>) {
511        self.table_db
512            .get(&table)
513            .unwrap()
514            .put(db_key, &value)
515            .expect("db insertion failure");
516    }
517
518    fn remove_from_db(&self, table: DBTable, db_key: &[u8]) {
519        self.table_db
520            .get(&table)
521            .unwrap()
522            .delete(db_key)
523            .expect("db removal failure");
524    }
525
526    fn load_from_db(&self, table: DBTable, db_key: &[u8]) -> Option<Box<[u8]>> {
527        self.table_db
528            .get(&table)
529            .unwrap()
530            .get(db_key)
531            .expect("db read failure")
532    }
533
534    fn insert_encodable_val<V>(
535        &self, table: DBTable, db_key: &[u8], value: &V,
536    ) where V: DatabaseEncodable {
537        self.insert_to_db(table, db_key, value.db_encode())
538    }
539
540    fn insert_encodable_list<V>(
541        &self, table: DBTable, db_key: &[u8], value: &Vec<V>,
542    ) where V: DatabaseEncodable {
543        self.insert_to_db(table, db_key, db_encode_list(value))
544    }
545
546    fn load_decodable_val<V>(
547        &self, table: DBTable, db_key: &[u8],
548    ) -> Option<V>
549    where V: DatabaseDecodable {
550        let encoded = self.load_from_db(table, db_key)?;
551        Some(V::db_decode(&encoded).expect("decode succeeds"))
552    }
553
554    fn load_might_decodable_val<V>(
555        &self, table: DBTable, db_key: &[u8],
556    ) -> Option<V>
557    where V: DatabaseDecodable {
558        let encoded = self.load_from_db(table, db_key)?;
559        V::db_decode(&encoded).ok()
560    }
561
562    fn load_decodable_list<V>(
563        &self, table: DBTable, db_key: &[u8],
564    ) -> Option<Vec<V>>
565    where V: DatabaseDecodable {
566        let encoded = self.load_from_db(table, db_key)?;
567        Some(db_decode_list(&encoded).expect("decode succeeds"))
568    }
569}
570
571fn append_suffix(h: &H256, suffix: u8) -> Vec<u8> {
572    let mut key = Vec::with_capacity(H256::len_bytes() + 1);
573    key.extend_from_slice(h.as_bytes());
574    key.push(suffix);
575    key
576}
577
578fn local_block_info_key(block_hash: &H256) -> Vec<u8> {
579    append_suffix(block_hash, LOCAL_BLOCK_INFO_SUFFIX_BYTE)
580}
581
/// Key of the blamed-header verified roots at `block_height`: the height
/// encoded as 8 little-endian bytes.
///
/// The byte order is part of the on-disk key format and must not change.
/// Note that it differs from the big-endian keys used for the
/// `HashByBlockNumber` and `RewardByPosEpoch` tables.
fn blamed_header_verified_roots_key(block_height: u64) -> [u8; 8] {
    block_height.to_le_bytes()
}
587
588fn block_body_key(block_hash: &H256) -> Vec<u8> {
589    append_suffix(block_hash, BLOCK_BODY_SUFFIX_BYTE)
590}
591
592fn executed_epoch_set_key(epoch_number: u64) -> [u8; 9] {
593    let mut epoch_key = [0; 9];
594    LittleEndian::write_u64(&mut epoch_key[0..8], epoch_number);
595    epoch_key[8] = EPOCH_EXECUTED_BLOCK_SET_SUFFIX_BYTE;
596    epoch_key
597}
598
599fn skipped_epoch_set_key(epoch_number: u64) -> [u8; 9] {
600    let mut epoch_key = [0; 9];
601    LittleEndian::write_u64(&mut epoch_key[0..8], epoch_number);
602    epoch_key[8] = EPOCH_SKIPPED_BLOCK_SET_SUFFIX_BYTE;
603    epoch_key
604}
605
606fn block_execution_result_key(hash: &H256) -> Vec<u8> {
607    append_suffix(hash, BLOCK_EXECUTION_RESULT_SUFFIX_BYTE)
608}
609
610fn block_reward_result_key(hash: &H256) -> Vec<u8> {
611    append_suffix(hash, BLOCK_REWARD_RESULT_SUFFIX_BYTE)
612}
613
614fn epoch_execution_context_key(hash: &H256) -> Vec<u8> {
615    append_suffix(hash, EPOCH_EXECUTION_CONTEXT_SUFFIX_BYTE)
616}
617
618fn epoch_consensus_epoch_execution_commitment_key(hash: &H256) -> Vec<u8> {
619    append_suffix(hash, EPOCH_CONSENSUS_EXECUTION_INFO_SUFFIX_BYTE)
620}
621
622impl MallocSizeOf for DBManager {
623    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
624        // Here we only handle the case that all columns are stored within the
625        // same rocksdb.
626        self.table_db
627            .get(&DBTable::Blocks)
628            .expect("DBManager initialized")
629            .size_of(ops)
630    }
631}