#![forbid(unsafe_code)]
use std::{
collections::HashMap,
iter::Iterator,
path::Path,
sync::{mpsc, Arc, Mutex},
thread::{self, JoinHandle},
time::{Duration, Instant},
};
use anyhow::{ensure, Result};
use itertools::{izip, zip_eq};
use once_cell::sync::Lazy;
use diem_config::config::RocksdbConfig;
use diem_crypto::hash::{
CryptoHash, HashValue, SPARSE_MERKLE_PLACEHOLDER_HASH,
};
use diem_logger::prelude::*;
use diem_types::{
account_address::AccountAddress,
account_state_blob::{AccountStateBlob, AccountStateWithProof},
committed_block::CommittedBlock,
contract_event::ContractEvent,
epoch_change::EpochChangeProof,
ledger_info::LedgerInfoWithSignatures,
proof::{
AccountStateProof, AccumulatorConsistencyProof, SparseMerkleProof,
TransactionListProof,
},
reward_distribution_event::RewardDistributionEventV2,
term_state::PosState,
transaction::{
Transaction, TransactionInfo, TransactionListWithProof,
TransactionToCommit, TransactionWithProof, Version,
PRE_GENESIS_VERSION,
},
};
#[cfg(feature = "fuzzing")]
pub use diemdb_test::test_save_blocks_impl;
use schemadb::{ColumnFamilyName, Options, DB, DEFAULT_CF_NAME};
#[cfg(any(test, feature = "fuzzing"))]
use storage_interface::Order;
use storage_interface::{
DBReaderForPoW, DbReader, DbWriter, StartupInfo, TreeState,
};
use crate::{
backup::{backup_handler::BackupHandler, restore_handler::RestoreHandler},
change_set::{ChangeSet, SealedChangeSet},
errors::DiemDbError,
event_store::EventStore,
ledger_counters::LedgerCounters,
ledger_store::LedgerStore,
metrics::{
DIEM_STORAGE_API_LATENCY_SECONDS, DIEM_STORAGE_COMMITTED_TXNS,
DIEM_STORAGE_LATEST_TXN_VERSION, DIEM_STORAGE_LEDGER_VERSION,
DIEM_STORAGE_NEXT_BLOCK_EPOCH, DIEM_STORAGE_OTHER_TIMERS_SECONDS,
DIEM_STORAGE_ROCKSDB_PROPERTIES,
},
pruner::Pruner,
schema::*,
state_store::StateStore,
system_store::SystemStore,
transaction_store::TransactionStore,
};
use diem_types::block_metadata::BlockMetadata;
#[cfg(any(feature = "diemsum"))]
pub mod diemsum;
#[cfg(any(test, feature = "fuzzing"))]
pub mod test_helper;
pub mod backup;
pub mod errors;
pub mod metrics;
pub mod schema;
mod change_set;
mod event_store;
mod ledger_counters;
mod ledger_store;
mod pruner;
mod state_store;
mod system_store;
mod transaction_store;
#[cfg(any(test, feature = "fuzzing"))]
#[allow(dead_code)]
mod diemdb_test;
const MAX_LIMIT: u64 = 1000;
const MAX_NUM_EPOCH_ENDING_LEDGER_INFO: usize = 100;
static ROCKSDB_PROPERTY_MAP: Lazy<HashMap<&str, &str>> = Lazy::new(|| {
[
(
"diem_rocksdb_live_sst_files_size_bytes",
"rocksdb.live-sst-files-size",
),
(
"diem_rocksdb_all_memtables_size_bytes",
"rocksdb.size-all-mem-tables",
),
(
"diem_rocksdb_num_running_compactions",
"rocksdb.num-running-compactions",
),
(
"diem_rocksdb_num_running_flushes",
"rocksdb.num-running-flushes",
),
(
"diem_rocksdb_block_cache_usage_bytes",
"rocksdb.block-cache-usage",
),
(
"diem_rocksdb_cf_size_bytes",
"rocksdb.estimate-live-data-size",
),
]
.iter()
.cloned()
.collect()
});
fn error_if_too_many_requested(
num_requested: u64, max_allowed: u64,
) -> Result<()> {
if num_requested > max_allowed {
Err(DiemDbError::TooManyRequested(num_requested, max_allowed).into())
} else {
Ok(())
}
}
fn gen_rocksdb_options(config: &RocksdbConfig) -> Options {
let mut db_opts = Options::default();
db_opts.set_max_open_files(config.max_open_files);
db_opts.set_max_total_wal_size(config.max_total_wal_size);
db_opts
}
fn update_rocksdb_properties(db: &DB) -> Result<()> {
let _timer = DIEM_STORAGE_OTHER_TIMERS_SECONDS
.with_label_values(&["update_rocksdb_properties"])
.start_timer();
for cf_name in PosLedgerDB::column_families() {
for (property_name, rocksdb_property_argument) in &*ROCKSDB_PROPERTY_MAP
{
DIEM_STORAGE_ROCKSDB_PROPERTIES
.with_label_values(&[cf_name, property_name])
.set(
db.get_property(cf_name, rocksdb_property_argument)? as i64
);
}
}
Ok(())
}
#[derive(Debug)]
struct RocksdbPropertyReporter {
sender: Mutex<mpsc::Sender<()>>,
join_handle: Option<JoinHandle<()>>,
}
impl RocksdbPropertyReporter {
fn new(db: Arc<DB>) -> Self {
let (send, recv) = mpsc::channel();
let join_handle = Some(thread::spawn(move || loop {
if let Err(e) = update_rocksdb_properties(&db) {
diem_warn!(
error = ?e,
"Updating rocksdb property failed."
);
}
match recv.recv_timeout(Duration::from_secs(10)) {
Ok(_) => break,
Err(mpsc::RecvTimeoutError::Timeout) => (),
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
}));
Self {
sender: Mutex::new(send),
join_handle,
}
}
}
impl Drop for RocksdbPropertyReporter {
fn drop(&mut self) {
self.sender.lock().unwrap().send(()).unwrap();
self.join_handle
.take()
.expect("Rocksdb property reporting thread must exist.")
.join()
.expect(
"Rocksdb property reporting thread should join peacefully.",
);
}
}
#[derive(Debug)]
pub struct PosLedgerDB {
db: Arc<DB>,
ledger_store: Arc<LedgerStore>,
transaction_store: Arc<TransactionStore>,
state_store: Arc<StateStore>,
event_store: Arc<EventStore>,
system_store: SystemStore,
#[allow(dead_code)]
rocksdb_property_reporter: RocksdbPropertyReporter,
pruner: Option<Pruner>,
}
impl PosLedgerDB {
fn column_families() -> Vec<ColumnFamilyName> {
vec![
DEFAULT_CF_NAME,
EPOCH_BY_VERSION_CF_NAME,
EVENT_ACCUMULATOR_CF_NAME,
EVENT_BY_KEY_CF_NAME,
EVENT_BY_VERSION_CF_NAME,
EVENT_CF_NAME,
JELLYFISH_MERKLE_NODE_CF_NAME,
LEDGER_COUNTERS_CF_NAME,
STALE_NODE_INDEX_CF_NAME,
TRANSACTION_CF_NAME,
TRANSACTION_ACCUMULATOR_CF_NAME,
TRANSACTION_BY_ACCOUNT_CF_NAME,
TRANSACTION_INFO_CF_NAME,
LEDGER_INFO_BY_BLOCK_CF_NAME,
POS_STATE_CF_NAME,
REWARD_EVENT_CF_NAME,
COMMITTED_BLOCK_CF_NAME,
COMMITTED_BLOCK_BY_VIEW_CF_NAME,
LEDGER_INFO_BY_VOTED_BLOCK_CF_NAME,
BLOCK_BY_EPOCH_AND_ROUND_CF_NAME,
]
}
fn new_with_db(db: DB, prune_window: Option<u64>) -> Self {
let db = Arc::new(db);
PosLedgerDB {
db: Arc::clone(&db),
event_store: Arc::new(EventStore::new(Arc::clone(&db))),
ledger_store: Arc::new(LedgerStore::new(Arc::clone(&db))),
state_store: Arc::new(StateStore::new(Arc::clone(&db))),
transaction_store: Arc::new(TransactionStore::new(Arc::clone(&db))),
system_store: SystemStore::new(Arc::clone(&db)),
rocksdb_property_reporter: RocksdbPropertyReporter::new(
Arc::clone(&db),
),
pruner: prune_window.map(|n| Pruner::new(Arc::clone(&db), n)),
}
}
pub fn open<P: AsRef<Path> + Clone>(
db_root_path: P, readonly: bool, prune_window: Option<u64>,
rocksdb_config: RocksdbConfig,
) -> Result<Self> {
ensure!(
prune_window.is_none() || !readonly,
"Do not set prune_window when opening readonly.",
);
let path = db_root_path.as_ref().join("pos-ledger-db");
let instant = Instant::now();
let mut rocksdb_opts = gen_rocksdb_options(&rocksdb_config);
let db = if readonly {
DB::open_readonly(
path.clone(),
"diemdb_ro",
Self::column_families(),
rocksdb_opts,
)?
} else {
rocksdb_opts.create_if_missing(true);
rocksdb_opts.create_missing_column_families(true);
DB::open(
path.clone(),
"pos-ledger-db",
Self::column_families(),
rocksdb_opts,
)?
};
let ret = Self::new_with_db(db, prune_window);
diem_info!(
path = path,
time_ms = %instant.elapsed().as_millis(),
"Opened PosLedgerDB.",
);
Ok(ret)
}
#[cfg(any(test, feature = "fuzzing"))]
pub fn new_for_test<P: AsRef<Path> + Clone>(db_root_path: P) -> Self {
Self::open(
db_root_path,
false, None, RocksdbConfig::default(),
)
.expect("Unable to open DiemDB")
}
pub fn update_rocksdb_properties(&self) -> Result<()> {
update_rocksdb_properties(&self.db)
}
fn get_epoch_ending_ledger_infos(
&self, start_epoch: u64, end_epoch: u64, limit: usize,
) -> Result<(Vec<LedgerInfoWithSignatures>, bool)> {
self.get_epoch_ending_ledger_infos_impl(start_epoch, end_epoch, limit)
}
fn get_epoch_ending_ledger_infos_impl(
&self, start_epoch: u64, end_epoch: u64, limit: usize,
) -> Result<(Vec<LedgerInfoWithSignatures>, bool)> {
ensure!(
start_epoch <= end_epoch,
"Bad epoch range [{}, {})",
start_epoch,
end_epoch,
);
let latest_epoch = self
.ledger_store
.get_latest_ledger_info()?
.ledger_info()
.next_block_epoch();
ensure!(
end_epoch <= latest_epoch,
"Unable to provide epoch change ledger info for still open epoch. asked upper bound: {}, last sealed epoch: {}",
end_epoch,
latest_epoch - 1, );
let (paging_epoch, more) = if end_epoch - start_epoch > limit as u64 {
(start_epoch + limit as u64, true)
} else {
(end_epoch, false)
};
let lis = self
.ledger_store
.get_epoch_ending_ledger_info_iter(start_epoch, paging_epoch)?
.collect::<Result<Vec<_>>>()?;
ensure!(
lis.len() == (paging_epoch - start_epoch) as usize,
"DB corruption: missing epoch ending ledger info for epoch {}",
lis.last()
.map(|li| li.ledger_info().next_block_epoch())
.unwrap_or(start_epoch),
);
Ok((lis, more))
}
fn get_transaction_with_proof(
&self, version: Version, ledger_version: Version, fetch_events: bool,
) -> Result<TransactionWithProof> {
let proof = self
.ledger_store
.get_transaction_info_with_proof(version, ledger_version)?;
let transaction = self.transaction_store.get_transaction(version)?;
let events = if fetch_events {
Some(self.event_store.get_events_by_version(version)?)
} else {
None
};
Ok(TransactionWithProof {
version,
transaction,
events,
proof,
})
}
pub fn get_transaction(&self, version: Version) -> Result<Transaction> {
self.transaction_store.get_transaction(version)
}
pub fn get_transaction_block_meta(
&self, version: Version,
) -> Result<Option<(Version, BlockMetadata)>> {
self.transaction_store.get_block_metadata(version)
}
pub fn get_transaction_info(
&self, version: u64,
) -> Result<TransactionInfo> {
self.ledger_store.get_transaction_info(version)
}
pub fn get_backup_handler(&self) -> BackupHandler {
BackupHandler::new(
Arc::clone(&self.ledger_store),
Arc::clone(&self.transaction_store),
Arc::clone(&self.state_store),
Arc::clone(&self.event_store),
)
}
fn seal_change_set(
&self, first_version: Version, num_txns: Version, mut cs: ChangeSet,
) -> Result<(SealedChangeSet, Option<LedgerCounters>)> {
let counters = if num_txns > 0 {
Some(self.system_store.bump_ledger_counters(
first_version,
first_version + num_txns - 1,
&mut cs,
)?)
} else {
None
};
Ok((SealedChangeSet { batch: cs.batch }, counters))
}
fn save_transactions_impl(
&self, txns_to_commit: &[TransactionToCommit], first_version: u64,
mut cs: &mut ChangeSet,
) -> Result<HashValue> {
let last_version = first_version + txns_to_commit.len() as u64 - 1;
let account_state_sets = txns_to_commit
.iter()
.map(|txn_to_commit| txn_to_commit.account_states().clone())
.collect::<Vec<_>>();
let state_root_hashes = if first_version == 0 {
self.state_store.put_account_state_sets(
account_state_sets,
first_version,
&mut cs,
)?
} else {
vec![Default::default(); txns_to_commit.len()]
};
diem_debug!(
"save_transactions_impl: {} {:?}",
first_version,
state_root_hashes
);
let event_root_hashes =
zip_eq(first_version..=last_version, txns_to_commit)
.map(|(ver, txn_to_commit)| {
self.event_store.put_events(
ver,
txn_to_commit.events(),
&mut cs,
)
})
.collect::<Result<Vec<_>>>()?;
zip_eq(first_version..=last_version, txns_to_commit).try_for_each(
|(ver, txn_to_commit)| {
self.transaction_store.put_transaction(
ver,
txn_to_commit.transaction(),
&mut cs,
)
},
)?;
let txn_infos =
izip!(txns_to_commit, state_root_hashes, event_root_hashes)
.map(|(t, s, e)| {
Ok(TransactionInfo::new(
t.transaction().hash(),
s,
e,
t.gas_used(),
t.status().clone(),
))
})
.collect::<Result<Vec<_>>>()?;
assert_eq!(txn_infos.len(), txns_to_commit.len());
let mut new_root_hash = self.ledger_store.put_transaction_infos(
first_version,
&txn_infos,
&mut cs,
)?;
if first_version != 0 {
new_root_hash = Default::default();
};
Ok(new_root_hash)
}
fn commit(&self, sealed_cs: SealedChangeSet) -> Result<()> {
self.db.write_schemas(sealed_cs.batch, false)?;
Ok(())
}
fn wake_pruner(&self, latest_version: Version) {
if let Some(pruner) = self.pruner.as_ref() {
pruner.wake(latest_version)
}
}
}
impl DbReader for PosLedgerDB {
fn get_epoch_ending_ledger_infos(
&self, start_epoch: u64, end_epoch: u64,
) -> Result<EpochChangeProof> {
gauged_api("get_epoch_ending_ledger_infos", || {
let (ledger_info_with_sigs, more) =
Self::get_epoch_ending_ledger_infos(
&self,
start_epoch,
end_epoch,
MAX_NUM_EPOCH_ENDING_LEDGER_INFO,
)?;
Ok(EpochChangeProof::new(ledger_info_with_sigs, more))
})
}
fn get_transactions(
&self, start_version: Version, limit: u64, ledger_version: Version,
fetch_events: bool,
) -> Result<TransactionListWithProof> {
gauged_api("get_transactions", || {
error_if_too_many_requested(limit, MAX_LIMIT)?;
if start_version > ledger_version || limit == 0 {
return Ok(TransactionListWithProof::new_empty());
}
let limit =
std::cmp::min(limit, ledger_version - start_version + 1);
let txns = (start_version..start_version + limit)
.map(|version| self.transaction_store.get_transaction(version))
.collect::<Result<Vec<_>>>()?;
let txn_infos = (start_version..start_version + limit)
.map(|version| self.ledger_store.get_transaction_info(version))
.collect::<Result<Vec<_>>>()?;
let events = if fetch_events {
Some(
(start_version..start_version + limit)
.map(|version| {
self.event_store.get_events_by_version(version)
})
.collect::<Result<Vec<_>>>()?,
)
} else {
None
};
let proof = TransactionListProof::new(
self.ledger_store.get_transaction_range_proof(
Some(start_version),
limit,
ledger_version,
)?,
txn_infos,
);
Ok(TransactionListWithProof::new(
txns,
events,
Some(start_version),
proof,
))
})
}
fn get_block_timestamp(&self, version: u64) -> Result<u64> {
gauged_api("get_block_timestamp", || {
let ts = match self.transaction_store.get_block_metadata(version)? {
Some((_v, block_meta)) => block_meta.into_inner().1,
None => 0,
};
Ok(ts)
})
}
fn get_latest_account_state(
&self, address: AccountAddress,
) -> Result<Option<AccountStateBlob>> {
gauged_api("get_latest_account_state", || {
let ledger_info_with_sigs =
self.ledger_store.get_latest_ledger_info()?;
let version = ledger_info_with_sigs.ledger_info().version();
let (blob, _proof) = self
.state_store
.get_account_state_with_proof_by_version(address, version)?;
Ok(blob)
})
}
fn get_latest_ledger_info(&self) -> Result<LedgerInfoWithSignatures> {
gauged_api("get_latest_ledger_info", || {
self.ledger_store.get_latest_ledger_info()
})
}
fn get_startup_info(
&self, need_pos_state: bool,
) -> Result<Option<StartupInfo>> {
gauged_api("get_startup_info", || {
self.ledger_store.get_startup_info(need_pos_state)
})
}
fn get_txn_by_account(
&self, address: AccountAddress, seq_num: u64, ledger_version: Version,
fetch_events: bool,
) -> Result<Option<TransactionWithProof>> {
gauged_api("get_txn_by_account", || {
self.transaction_store
.lookup_transaction_by_account(
address,
seq_num,
ledger_version,
)?
.map(|version| {
self.get_transaction_with_proof(
version,
ledger_version,
fetch_events,
)
})
.transpose()
})
}
fn get_state_proof_with_ledger_info(
&self, known_version: u64,
ledger_info_with_sigs: LedgerInfoWithSignatures,
) -> Result<(EpochChangeProof, AccumulatorConsistencyProof)> {
gauged_api("get_state_proof_with_ledger_info", || {
let ledger_info = ledger_info_with_sigs.ledger_info();
ensure!(
known_version <= ledger_info.version(),
"Client known_version {} larger than ledger version {}.",
known_version,
ledger_info.version(),
);
let known_epoch = self.ledger_store.get_epoch(known_version)?;
let epoch_change_proof =
if known_epoch < ledger_info.next_block_epoch() {
let (ledger_infos_with_sigs, more) = self
.get_epoch_ending_ledger_infos(
known_epoch,
ledger_info.next_block_epoch(),
usize::MAX,
)?;
EpochChangeProof::new(ledger_infos_with_sigs, more)
} else {
EpochChangeProof::new(vec![], false)
};
let ledger_consistency_proof = self
.ledger_store
.get_consistency_proof(known_version, ledger_info.version())?;
Ok((epoch_change_proof, ledger_consistency_proof))
})
}
fn get_state_proof(
&self, known_version: u64,
) -> Result<(
LedgerInfoWithSignatures,
EpochChangeProof,
AccumulatorConsistencyProof,
)> {
gauged_api("get_state_proof", || {
let ledger_info_with_sigs =
self.ledger_store.get_latest_ledger_info()?;
let (epoch_change_proof, ledger_consistency_proof) = self
.get_state_proof_with_ledger_info(
known_version,
ledger_info_with_sigs.clone(),
)?;
Ok((
ledger_info_with_sigs,
epoch_change_proof,
ledger_consistency_proof,
))
})
}
fn get_account_state_with_proof(
&self, address: AccountAddress, version: Version,
ledger_version: Version,
) -> Result<AccountStateWithProof> {
gauged_api("get_account_state_with_proof", || {
ensure!(
version <= ledger_version,
"The queried version {} should be equal to or older than ledger version {}.",
version,
ledger_version
);
{
let latest_version = self.get_latest_version()?;
ensure!(
ledger_version <= latest_version,
"ledger_version specified {} is greater than committed version {}.",
ledger_version,
latest_version
);
}
let txn_info_with_proof = self
.ledger_store
.get_transaction_info_with_proof(version, ledger_version)?;
let (account_state_blob, sparse_merkle_proof) = self
.state_store
.get_account_state_with_proof_by_version(address, version)?;
Ok(AccountStateWithProof::new(
version,
account_state_blob,
AccountStateProof::new(
txn_info_with_proof,
sparse_merkle_proof,
),
))
})
}
fn get_account_state_with_proof_by_version(
&self, address: AccountAddress, version: Version,
) -> Result<(
Option<AccountStateBlob>,
SparseMerkleProof<AccountStateBlob>,
)> {
gauged_api("get_account_state_with_proof_by_version", || {
self.state_store
.get_account_state_with_proof_by_version(address, version)
})
}
fn get_latest_state_root(&self) -> Result<(Version, HashValue)> {
gauged_api("get_latest_state_root", || {
let (version, txn_info) =
self.ledger_store.get_latest_transaction_info()?;
Ok((version, txn_info.state_root_hash()))
})
}
fn get_latest_tree_state(&self) -> Result<TreeState> {
gauged_api("get_latest_tree_state", || {
let tree_state =
match self.ledger_store.get_latest_transaction_info_option()? {
Some((version, txn_info)) => self
.ledger_store
.get_tree_state(version + 1, txn_info)?,
None => TreeState::new(
0,
vec![],
self.state_store
.get_root_hash_option(PRE_GENESIS_VERSION)?
.unwrap_or(*SPARSE_MERKLE_PLACEHOLDER_HASH),
),
};
diem_info!(
num_transactions = tree_state.num_transactions,
state_root_hash = %tree_state.account_state_root_hash,
description = tree_state.describe(),
"Got latest TreeState."
);
Ok(tree_state)
})
}
fn get_epoch_ending_ledger_info(
&self, version: u64,
) -> Result<LedgerInfoWithSignatures> {
gauged_api("get_epoch_ending_ledger_info", || {
self.ledger_store.get_epoch_ending_ledger_info(version)
})
}
fn get_latest_transaction_info_option(
&self,
) -> Result<Option<(Version, TransactionInfo)>> {
gauged_api("get_latest_transaction_info_option", || {
self.ledger_store.get_latest_transaction_info_option()
})
}
fn get_accumulator_root_hash(&self, version: Version) -> Result<HashValue> {
gauged_api("get_accumulator_root_hash", || {
self.ledger_store.get_root_hash(version)
})
}
fn get_pos_state(&self, block_id: &HashValue) -> Result<PosState> {
diem_debug!("get_pos_state:{}", block_id);
self.ledger_store.get_pos_state(block_id)
}
fn get_latest_pos_state(&self) -> Arc<PosState> {
self.ledger_store.get_latest_pos_state()
}
}
impl DbWriter for PosLedgerDB {
fn save_transactions(
&self, txns_to_commit: &[TransactionToCommit], first_version: Version,
ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
pos_state: Option<PosState>, committed_blocks: Vec<CommittedBlock>,
ledger_infos_with_voted_block: Vec<(
HashValue,
LedgerInfoWithSignatures,
)>,
) -> Result<()> {
gauged_api("save_transactions", || {
let num_txns = txns_to_commit.len() as u64;
ensure!(
ledger_info_with_sigs.is_some() || num_txns > 0,
"txns_to_commit is empty while ledger_info_with_sigs is None.",
);
let mut cs = ChangeSet::new();
if let Some(x) = ledger_info_with_sigs {
let claimed_last_version = x.ledger_info().version();
ensure!(
claimed_last_version + 1 == first_version + num_txns,
"Transaction batch not applicable: first_version {}, num_txns {}, last_version {}",
first_version,
num_txns,
claimed_last_version,
);
}
let _new_root_hash = self.save_transactions_impl(
txns_to_commit,
first_version,
&mut cs,
)?;
for b in committed_blocks {
self.ledger_store.put_committed_block(&b, &mut cs)?;
}
for (voted_block, ledger_info) in ledger_infos_with_voted_block {
self.ledger_store.put_ledger_info_by_voted_block(
&voted_block,
&ledger_info,
&mut cs,
)?;
}
if let Some(x) = ledger_info_with_sigs {
self.ledger_store.put_ledger_info(x, &mut cs)?;
if let Some(pos_state) = pos_state {
self.ledger_store.put_pos_state(
&x.ledger_info().consensus_block_id(),
pos_state,
&mut cs,
)?;
}
}
let (sealed_cs, counters) =
self.seal_change_set(first_version, num_txns, cs)?;
{
let _timer = DIEM_STORAGE_OTHER_TIMERS_SECONDS
.with_label_values(&["save_transactions_commit"])
.start_timer();
self.commit(sealed_cs)?;
}
if let Some(x) = ledger_info_with_sigs {
self.ledger_store.set_latest_ledger_info(x.clone());
DIEM_STORAGE_LEDGER_VERSION
.set(x.ledger_info().version() as i64);
DIEM_STORAGE_NEXT_BLOCK_EPOCH
.set(x.ledger_info().next_block_epoch() as i64);
}
if num_txns > 0 {
let last_version = first_version + num_txns - 1;
DIEM_STORAGE_COMMITTED_TXNS.inc_by(num_txns);
DIEM_STORAGE_LATEST_TXN_VERSION.set(last_version as i64);
counters
.expect("Counters should be bumped with transactions being saved.")
.bump_op_counters();
self.wake_pruner(last_version);
}
Ok(())
})
}
fn save_reward_event(
&self, epoch: u64, event: &RewardDistributionEventV2,
) -> Result<()> {
self.ledger_store.put_reward_event(epoch, event)
}
fn delete_pos_state_by_block(&self, block_id: &HashValue) -> Result<()> {
self.ledger_store.delete_pos_state(block_id)
}
}
impl DBReaderForPoW for PosLedgerDB {
fn get_latest_ledger_info_option(
&self,
) -> Option<LedgerInfoWithSignatures> {
self.ledger_store.get_latest_ledger_info_option()
}
fn get_block_ledger_info(
&self, consensus_block_id: &HashValue,
) -> Result<LedgerInfoWithSignatures> {
self.ledger_store.get_block_ledger_info(consensus_block_id)
}
fn get_events_by_version(
&self, start_version: u64, end_version: u64,
) -> Result<Vec<ContractEvent>> {
let iter = self.event_store.get_events_by_version_iter(
start_version,
(end_version - start_version) as usize,
)?;
let events_vec = iter.collect::<Result<Vec<Vec<ContractEvent>>>>()?;
Ok(events_vec.into_iter().flatten().collect())
}
fn get_epoch_ending_blocks(
&self, start_epoch: u64, end_epoch: u64,
) -> Result<Vec<HashValue>> {
let mut ending_blocks = Vec::new();
for ledger_info in self
.ledger_store
.get_epoch_ending_ledger_info_iter(start_epoch, end_epoch)?
{
ending_blocks.push(ledger_info?.ledger_info().consensus_block_id());
}
Ok(ending_blocks)
}
fn get_reward_event(
&self, epoch: u64,
) -> Result<RewardDistributionEventV2> {
self.ledger_store.get_reward_event(epoch)
}
fn get_committed_block_by_hash(
&self, block_hash: &HashValue,
) -> Result<CommittedBlock> {
self.ledger_store.get_committed_block_by_hash(block_hash)
}
fn get_committed_block_hash_by_view(&self, view: u64) -> Result<HashValue> {
self.ledger_store.get_committed_block_hash_by_view(view)
}
fn get_ledger_info_by_voted_block(
&self, block_id: &HashValue,
) -> Result<LedgerInfoWithSignatures> {
self.ledger_store.get_ledger_info_by_voted_block(block_id)
}
fn get_block_hash_by_epoch_and_round(
&self, epoch: u64, round: u64,
) -> Result<HashValue> {
self.ledger_store
.get_block_hash_by_epoch_and_round(epoch, round)
}
}
#[cfg(any(test, feature = "fuzzing"))]
fn get_first_seq_num_and_limit(
order: Order, cursor: u64, limit: u64,
) -> Result<(u64, u64)> {
ensure!(limit > 0, "limit should > 0, got {}", limit);
Ok(if order == Order::Ascending {
(cursor, limit)
} else if limit <= cursor {
(cursor - limit + 1, limit)
} else {
(0, cursor + 1)
})
}
pub trait GetRestoreHandler {
fn get_restore_handler(&self) -> RestoreHandler;
}
impl GetRestoreHandler for Arc<PosLedgerDB> {
fn get_restore_handler(&self) -> RestoreHandler {
RestoreHandler::new(
Arc::clone(&self.db),
Arc::clone(self),
Arc::clone(&self.ledger_store),
Arc::clone(&self.transaction_store),
Arc::clone(&self.state_store),
Arc::clone(&self.event_store),
)
}
}
fn gauged_api<T, F>(api_name: &'static str, api_impl: F) -> Result<T>
where F: FnOnce() -> Result<T> {
let timer = Instant::now();
let res = api_impl();
let res_type = match &res {
Ok(_) => "Ok",
Err(e) => {
diem_warn!(
api_name = api_name,
error = ?e,
"DiemDB API returned error."
);
"Err"
}
};
DIEM_STORAGE_API_LATENCY_SECONDS
.with_label_values(&[api_name, res_type])
.observe(timer.elapsed().as_secs_f64());
res
}