use crate::{
block_data_manager::{
db_decode_list, db_encode_list, BlamedHeaderVerifiedRoots,
BlockExecutionResultWithEpoch, BlockRewardResult, BlockTracesWithEpoch,
CheckpointHashes, DataVersionTuple, EpochExecutionContext,
LocalBlockInfo, PosRewardInfo,
},
db::{
COL_BLAMED_HEADER_VERIFIED_ROOTS, COL_BLOCKS, COL_BLOCK_TRACES,
COL_EPOCH_NUMBER, COL_HASH_BY_BLOCK_NUMBER, COL_MISC,
COL_REWARD_BY_POS_EPOCH, COL_TX_INDEX,
},
pow::PowComputer,
verification::VerificationConfig,
};
use byteorder::{ByteOrder, LittleEndian};
use cfx_internal_common::{
DatabaseDecodable, DatabaseEncodable, EpochExecutionCommitment,
};
use cfx_storage::{
storage_db::KeyValueDbTrait, KvdbRocksdb, KvdbSqlite, KvdbSqliteStatements,
};
use cfx_types::H256;
use db::SystemDB;
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use primitives::{Block, BlockHeader, SignedTransaction, TransactionIndex};
use rlp::Rlp;
use std::{collections::HashMap, fs, path::Path, sync::Arc};
use strum::IntoEnumIterator;
use strum_macros::EnumIter;
const LOCAL_BLOCK_INFO_SUFFIX_BYTE: u8 = 1;
const BLOCK_BODY_SUFFIX_BYTE: u8 = 2;
const BLOCK_EXECUTION_RESULT_SUFFIX_BYTE: u8 = 3;
const EPOCH_EXECUTION_CONTEXT_SUFFIX_BYTE: u8 = 4;
const EPOCH_CONSENSUS_EXECUTION_INFO_SUFFIX_BYTE: u8 = 5;
const EPOCH_EXECUTED_BLOCK_SET_SUFFIX_BYTE: u8 = 6;
const EPOCH_SKIPPED_BLOCK_SET_SUFFIX_BYTE: u8 = 7;
const BLOCK_REWARD_RESULT_SUFFIX_BYTE: u8 = 8;
const BLOCK_TERMINAL_KEY: &[u8] = b"block_terminals";
const GC_PROGRESS_KEY: &[u8] = b"gc_progress";
#[derive(Clone, Copy, Hash, Ord, PartialOrd, Eq, PartialEq, EnumIter)]
enum DBTable {
Misc,
Blocks,
Transactions,
EpochNumbers,
BlamedHeaderVerifiedRoots,
BlockTraces,
HashByBlockNumber,
RewardByPosEpoch,
}
fn rocks_db_col(table: DBTable) -> u32 {
match table {
DBTable::Misc => COL_MISC,
DBTable::Blocks => COL_BLOCKS,
DBTable::Transactions => COL_TX_INDEX,
DBTable::EpochNumbers => COL_EPOCH_NUMBER,
DBTable::BlamedHeaderVerifiedRoots => COL_BLAMED_HEADER_VERIFIED_ROOTS,
DBTable::BlockTraces => COL_BLOCK_TRACES,
DBTable::HashByBlockNumber => COL_HASH_BY_BLOCK_NUMBER,
DBTable::RewardByPosEpoch => COL_REWARD_BY_POS_EPOCH,
}
}
fn sqlite_db_table(table: DBTable) -> String {
match table {
DBTable::Misc => "misc",
DBTable::Blocks => "blocks",
DBTable::Transactions => "transactions",
DBTable::EpochNumbers => "epoch_numbers",
DBTable::BlamedHeaderVerifiedRoots => "blamed_header_verified_roots",
DBTable::BlockTraces => "block_traces",
DBTable::HashByBlockNumber => "hash_by_block_number",
DBTable::RewardByPosEpoch => "reward_by_pos_epoch",
}
.into()
}
pub struct DBManager {
table_db: HashMap<DBTable, Box<dyn KeyValueDbTrait<ValueType = Box<[u8]>>>>,
pow: Arc<PowComputer>,
}
impl DBManager {
pub fn new_from_rocksdb(db: Arc<SystemDB>, pow: Arc<PowComputer>) -> Self {
let mut table_db = HashMap::new();
for table in DBTable::iter() {
table_db.insert(
table,
Box::new(KvdbRocksdb {
kvdb: db.key_value().clone(),
col: rocks_db_col(table),
})
as Box<dyn KeyValueDbTrait<ValueType = Box<[u8]>>>,
);
}
Self { table_db, pow }
}
}
impl DBManager {
pub fn new_from_sqlite(db_path: &Path, pow: Arc<PowComputer>) -> Self {
if let Err(e) = fs::create_dir_all(db_path) {
panic!("Error creating database directory: {:?}", e);
}
let mut table_db = HashMap::new();
for table in DBTable::iter() {
let table_str = sqlite_db_table(table);
let (_, sqlite_db) = KvdbSqlite::open_or_create(
&db_path.join(table_str.as_str()), Arc::new(
KvdbSqliteStatements::make_statements(
&[&"value"],
&[&"BLOB"],
table_str.as_str(),
false,
)
.unwrap(),
),
false, )
.expect("Open sqlite failure");
table_db.insert(
table,
Box::new(sqlite_db)
as Box<dyn KeyValueDbTrait<ValueType = Box<[u8]>>>,
);
}
Self { table_db, pow }
}
}
impl DBManager {
pub fn insert_block_traces_to_db(
&self, block_hash: &H256, block_traces: &BlockTracesWithEpoch,
) {
self.insert_encodable_val(
DBTable::BlockTraces,
block_hash.as_bytes(),
block_traces,
);
}
pub fn block_traces_from_db(
&self, block_hash: &H256,
) -> Option<BlockTracesWithEpoch> {
let block_traces = self
.load_decodable_val(DBTable::BlockTraces, block_hash.as_bytes())?;
Some(block_traces)
}
pub fn block_from_db(&self, block_hash: &H256) -> Option<Block> {
Some(Block::new(
self.block_header_from_db(block_hash)?,
self.block_body_from_db(block_hash)?,
))
}
pub fn insert_block_header_to_db(&self, header: &BlockHeader) {
self.insert_encodable_val(
DBTable::Blocks,
header.hash().as_bytes(),
header,
);
}
pub fn block_header_from_db(&self, hash: &H256) -> Option<BlockHeader> {
let mut block_header =
self.load_decodable_val(DBTable::Blocks, hash.as_bytes())?;
VerificationConfig::get_or_fill_header_pow_quality(
&self.pow,
&mut block_header,
);
Some(block_header)
}
pub fn remove_block_header_from_db(&self, hash: &H256) {
self.remove_from_db(DBTable::Blocks, hash.as_bytes());
}
pub fn insert_transaction_index_to_db(
&self, hash: &H256, value: &TransactionIndex,
) {
self.insert_encodable_val(DBTable::Transactions, hash.as_bytes(), value)
}
pub fn transaction_index_from_db(
&self, hash: &H256,
) -> Option<TransactionIndex> {
self.load_decodable_val(DBTable::Transactions, hash.as_bytes())
}
pub fn insert_hash_by_block_number_to_db(
&self, block_number: u64, hash: &H256,
) {
self.insert_encodable_val(
DBTable::HashByBlockNumber,
&block_number.to_be_bytes(),
hash,
)
}
pub fn hash_by_block_number_from_db(
&self, block_number: &u64,
) -> Option<H256> {
self.load_decodable_val(
DBTable::HashByBlockNumber,
&block_number.to_be_bytes(),
)
}
pub fn insert_local_block_info_to_db(
&self, block_hash: &H256, value: &LocalBlockInfo,
) {
self.insert_encodable_val(
DBTable::Blocks,
&local_block_info_key(block_hash),
value,
);
}
pub fn local_block_info_from_db(
&self, block_hash: &H256,
) -> Option<LocalBlockInfo> {
self.load_decodable_val(
DBTable::Blocks,
&local_block_info_key(block_hash),
)
}
pub fn insert_blamed_header_verified_roots_to_db(
&self, block_height: u64, value: &BlamedHeaderVerifiedRoots,
) {
self.insert_encodable_val(
DBTable::BlamedHeaderVerifiedRoots,
&blamed_header_verified_roots_key(block_height),
value,
);
}
pub fn blamed_header_verified_roots_from_db(
&self, block_height: u64,
) -> Option<BlamedHeaderVerifiedRoots> {
self.load_decodable_val(
DBTable::BlamedHeaderVerifiedRoots,
&blamed_header_verified_roots_key(block_height),
)
}
pub fn remove_blamed_header_verified_roots_from_db(
&self, block_height: u64,
) {
self.remove_from_db(
DBTable::BlamedHeaderVerifiedRoots,
&blamed_header_verified_roots_key(block_height),
)
}
pub fn insert_block_body_to_db(&self, block: &Block) {
self.insert_to_db(
DBTable::Blocks,
&block_body_key(&block.hash()),
block.encode_body_with_tx_public(),
)
}
pub fn block_body_from_db(
&self, hash: &H256,
) -> Option<Vec<Arc<SignedTransaction>>> {
let encoded =
self.load_from_db(DBTable::Blocks, &block_body_key(hash))?;
let rlp = Rlp::new(&encoded);
Some(
Block::decode_body_with_tx_public(&rlp)
.expect("Wrong block rlp format!"),
)
}
pub fn remove_block_body_from_db(&self, hash: &H256) {
self.remove_from_db(DBTable::Blocks, &block_body_key(hash))
}
pub fn insert_block_execution_result_to_db(
&self, hash: &H256, value: &BlockExecutionResultWithEpoch,
) {
self.insert_encodable_val(
DBTable::Blocks,
&block_execution_result_key(hash),
value,
)
}
pub fn insert_block_reward_result_to_db(
&self, hash: &H256, value: &DataVersionTuple<H256, BlockRewardResult>,
) {
self.insert_encodable_val(
DBTable::Blocks,
&block_reward_result_key(hash),
value,
)
}
pub fn block_execution_result_from_db(
&self, hash: &H256,
) -> Option<BlockExecutionResultWithEpoch> {
self.load_decodable_val(
DBTable::Blocks,
&block_execution_result_key(hash),
)
}
pub fn block_reward_result_from_db(
&self, hash: &H256,
) -> Option<DataVersionTuple<H256, BlockRewardResult>> {
self.load_might_decodable_val(
DBTable::Blocks,
&block_reward_result_key(hash),
)
}
pub fn remove_block_execution_result_from_db(&self, hash: &H256) {
self.remove_from_db(DBTable::Blocks, &block_execution_result_key(hash))
}
pub fn remove_block_reward_result_from_db(&self, hash: &H256) {
self.remove_from_db(DBTable::Blocks, &block_reward_result_key(hash))
}
pub fn remove_block_trace_from_db(&self, hash: &H256) {
self.remove_from_db(DBTable::BlockTraces, hash.as_bytes())
}
pub fn remove_transaction_index_from_db(&self, hash: &H256) {
self.remove_from_db(DBTable::Transactions, hash.as_bytes())
}
pub fn insert_checkpoint_hashes_to_db(
&self, checkpoint_prev: &H256, checkpoint_cur: &H256,
) {
self.insert_encodable_val(
DBTable::Misc,
b"checkpoint",
&CheckpointHashes::new(*checkpoint_prev, *checkpoint_cur),
);
}
pub fn checkpoint_hashes_from_db(&self) -> Option<(H256, H256)> {
let checkpoints: CheckpointHashes =
self.load_decodable_val(DBTable::Misc, b"checkpoint")?;
Some((checkpoints.prev_hash, checkpoints.cur_hash))
}
pub fn insert_executed_epoch_set_hashes_to_db(
&self, epoch: u64, executed_hashes: &Vec<H256>,
) {
self.insert_encodable_list(
DBTable::EpochNumbers,
&executed_epoch_set_key(epoch)[0..9],
executed_hashes,
);
}
pub fn insert_skipped_epoch_set_hashes_to_db(
&self, epoch: u64, skipped_hashes: &Vec<H256>,
) {
self.insert_encodable_list(
DBTable::EpochNumbers,
&skipped_epoch_set_key(epoch)[0..9],
skipped_hashes,
);
}
pub fn executed_epoch_set_hashes_from_db(
&self, epoch: u64,
) -> Option<Vec<H256>> {
self.load_decodable_list(
DBTable::EpochNumbers,
&executed_epoch_set_key(epoch)[0..9],
)
}
pub fn skipped_epoch_set_hashes_from_db(
&self, epoch: u64,
) -> Option<Vec<H256>> {
self.load_decodable_list(
DBTable::EpochNumbers,
&skipped_epoch_set_key(epoch)[0..9],
)
}
pub fn insert_terminals_to_db(&self, terminals: &Vec<H256>) {
self.insert_encodable_list(
DBTable::Misc,
BLOCK_TERMINAL_KEY,
terminals,
);
}
pub fn terminals_from_db(&self) -> Option<Vec<H256>> {
self.load_decodable_list(DBTable::Misc, BLOCK_TERMINAL_KEY)
}
pub fn insert_epoch_execution_commitment_to_db(
&self, hash: &H256, ctx: &EpochExecutionCommitment,
) {
self.insert_encodable_val(
DBTable::Blocks,
&epoch_consensus_epoch_execution_commitment_key(hash),
ctx,
);
}
pub fn epoch_execution_commitment_from_db(
&self, hash: &H256,
) -> Option<EpochExecutionCommitment> {
self.load_decodable_val(
DBTable::Blocks,
&epoch_consensus_epoch_execution_commitment_key(hash),
)
}
pub fn remove_epoch_execution_commitment_from_db(&self, hash: &H256) {
self.remove_from_db(
DBTable::Blocks,
&epoch_consensus_epoch_execution_commitment_key(hash),
);
}
pub fn insert_instance_id_to_db(&self, instance_id: u64) {
self.insert_encodable_val(DBTable::Misc, b"instance", &instance_id);
}
pub fn instance_id_from_db(&self) -> Option<u64> {
self.load_decodable_val(DBTable::Misc, b"instance")
}
pub fn insert_execution_context_to_db(
&self, hash: &H256, ctx: &EpochExecutionContext,
) {
self.insert_encodable_val(
DBTable::Blocks,
&epoch_execution_context_key(hash),
ctx,
)
}
pub fn execution_context_from_db(
&self, hash: &H256,
) -> Option<EpochExecutionContext> {
self.load_decodable_val(
DBTable::Blocks,
&epoch_execution_context_key(hash),
)
}
pub fn remove_epoch_execution_context_from_db(&self, hash: &H256) {
self.remove_from_db(DBTable::Blocks, &epoch_execution_context_key(hash))
}
pub fn insert_gc_progress_to_db(&self, next_to_process: u64) {
self.insert_encodable_val(
DBTable::Misc,
GC_PROGRESS_KEY,
&next_to_process,
);
}
pub fn gc_progress_from_db(&self) -> Option<u64> {
self.load_decodable_val(DBTable::Misc, GC_PROGRESS_KEY)
}
pub fn insert_pos_reward(
&self, pos_epoch: u64, pos_reward: &PosRewardInfo,
) {
self.insert_encodable_val(
DBTable::RewardByPosEpoch,
&pos_epoch.to_be_bytes(),
pos_reward,
);
}
pub fn pos_reward_by_pos_epoch(
&self, pos_epoch: u64,
) -> Option<PosRewardInfo> {
self.load_decodable_val(
DBTable::RewardByPosEpoch,
&pos_epoch.to_be_bytes(),
)
}
fn insert_to_db(&self, table: DBTable, db_key: &[u8], value: Vec<u8>) {
self.table_db
.get(&table)
.unwrap()
.put(db_key, &value)
.expect("db insertion failure");
}
fn remove_from_db(&self, table: DBTable, db_key: &[u8]) {
self.table_db
.get(&table)
.unwrap()
.delete(db_key)
.expect("db removal failure");
}
fn load_from_db(&self, table: DBTable, db_key: &[u8]) -> Option<Box<[u8]>> {
self.table_db
.get(&table)
.unwrap()
.get(db_key)
.expect("db read failure")
}
fn insert_encodable_val<V>(
&self, table: DBTable, db_key: &[u8], value: &V,
) where V: DatabaseEncodable {
self.insert_to_db(table, db_key, value.db_encode())
}
fn insert_encodable_list<V>(
&self, table: DBTable, db_key: &[u8], value: &Vec<V>,
) where V: DatabaseEncodable {
self.insert_to_db(table, db_key, db_encode_list(value))
}
fn load_decodable_val<V>(
&self, table: DBTable, db_key: &[u8],
) -> Option<V>
where V: DatabaseDecodable {
let encoded = self.load_from_db(table, db_key)?;
Some(V::db_decode(&encoded).expect("decode succeeds"))
}
fn load_might_decodable_val<V>(
&self, table: DBTable, db_key: &[u8],
) -> Option<V>
where V: DatabaseDecodable {
let encoded = self.load_from_db(table, db_key)?;
V::db_decode(&encoded).ok()
}
fn load_decodable_list<V>(
&self, table: DBTable, db_key: &[u8],
) -> Option<Vec<V>>
where V: DatabaseDecodable {
let encoded = self.load_from_db(table, db_key)?;
Some(db_decode_list(&encoded).expect("decode succeeds"))
}
}
fn append_suffix(h: &H256, suffix: u8) -> Vec<u8> {
let mut key = Vec::with_capacity(H256::len_bytes() + 1);
key.extend_from_slice(h.as_bytes());
key.push(suffix);
key
}
fn local_block_info_key(block_hash: &H256) -> Vec<u8> {
append_suffix(block_hash, LOCAL_BLOCK_INFO_SUFFIX_BYTE)
}
fn blamed_header_verified_roots_key(block_height: u64) -> [u8; 8] {
let mut height_key = [0; 8];
LittleEndian::write_u64(&mut height_key[0..8], block_height);
height_key
}
fn block_body_key(block_hash: &H256) -> Vec<u8> {
append_suffix(block_hash, BLOCK_BODY_SUFFIX_BYTE)
}
fn executed_epoch_set_key(epoch_number: u64) -> [u8; 9] {
let mut epoch_key = [0; 9];
LittleEndian::write_u64(&mut epoch_key[0..8], epoch_number);
epoch_key[8] = EPOCH_EXECUTED_BLOCK_SET_SUFFIX_BYTE;
epoch_key
}
fn skipped_epoch_set_key(epoch_number: u64) -> [u8; 9] {
let mut epoch_key = [0; 9];
LittleEndian::write_u64(&mut epoch_key[0..8], epoch_number);
epoch_key[8] = EPOCH_SKIPPED_BLOCK_SET_SUFFIX_BYTE;
epoch_key
}
fn block_execution_result_key(hash: &H256) -> Vec<u8> {
append_suffix(hash, BLOCK_EXECUTION_RESULT_SUFFIX_BYTE)
}
fn block_reward_result_key(hash: &H256) -> Vec<u8> {
append_suffix(hash, BLOCK_REWARD_RESULT_SUFFIX_BYTE)
}
fn epoch_execution_context_key(hash: &H256) -> Vec<u8> {
append_suffix(hash, EPOCH_EXECUTION_CONTEXT_SUFFIX_BYTE)
}
fn epoch_consensus_epoch_execution_commitment_key(hash: &H256) -> Vec<u8> {
append_suffix(hash, EPOCH_CONSENSUS_EXECUTION_INFO_SUFFIX_BYTE)
}
impl MallocSizeOf for DBManager {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
self.table_db
.get(&DBTable::Blocks)
.expect("DBManager initialized")
.size_of(ops)
}
}