use anyhow::{format_err, Result};
use diem_crypto::{hash::SPARSE_MERKLE_PLACEHOLDER_HASH, HashValue};
use diem_types::{
access_path::AccessPath,
account_address::AccountAddress,
account_state::AccountState,
account_state_blob::{AccountStateBlob, AccountStateWithProof},
committed_block::CommittedBlock,
contract_event::ContractEvent,
epoch_change::EpochChangeProof,
epoch_state::EpochState,
ledger_info::{
deserialize_ledger_info_unchecked, LedgerInfoWithSignatures,
},
move_resource::MoveStorage,
proof::{
definition::LeafCount, AccumulatorConsistencyProof, SparseMerkleProof,
},
reward_distribution_event::RewardDistributionEventV2,
term_state::PosState,
transaction::{
TransactionInfo, TransactionListWithProof, TransactionToCommit,
TransactionWithProof, Version,
},
};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
convert::TryFrom,
sync::Arc,
};
use thiserror::Error;
pub mod mock;
pub mod state_view;
/// Bundle of storage state handed out by `DbReader::get_startup_info`.
///
/// NOTE(review): the per-field descriptions below are inferred from field
/// names and types; confirm them against the database implementation.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct StartupInfo {
/// Most recent ledger info together with its signatures. When deserializing,
/// serde routes this field through `deserialize_ledger_info_unchecked`.
#[serde(deserialize_with = "deserialize_ledger_info_unchecked")]
pub latest_ledger_info: LedgerInfoWithSignatures,
/// Epoch state that `StartupInfo::get_epoch_state` falls back to when
/// `latest_ledger_info` carries no next epoch state.
pub latest_epoch_state: Option<EpochState>,
/// Tree state at the committed point (presumably matching
/// `latest_ledger_info` -- TODO confirm).
pub committed_tree_state: TreeState,
/// Tree state at the synced point, when one is available (presumably ahead
/// of the committed one -- TODO confirm).
pub synced_tree_state: Option<TreeState>,
/// PoS state at the committed point (presumably matching
/// `latest_ledger_info` -- TODO confirm).
pub committed_pos_state: PosState,
}
impl StartupInfo {
    /// Assembles a `StartupInfo` from its five parts.
    ///
    /// The arguments are stored as given; no validation is performed.
    pub fn new(
        latest_ledger_info: LedgerInfoWithSignatures,
        latest_epoch_state: Option<EpochState>,
        committed_tree_state: TreeState,
        synced_tree_state: Option<TreeState>,
        committed_pos_state: PosState,
    ) -> Self {
        Self {
            latest_ledger_info,
            latest_epoch_state,
            committed_tree_state,
            synced_tree_state,
            committed_pos_state,
        }
    }

    /// Builds a minimal `StartupInfo` for tests (only compiled with the
    /// `fuzzing` feature).
    ///
    /// The ledger info is a genesis one over an empty validator set, the
    /// committed tree has zero transactions and the placeholder state root,
    /// and there is neither a fallback epoch state nor a synced tree state.
    #[cfg(any(feature = "fuzzing"))]
    pub fn new_for_testing() -> Self {
        use diem_types::on_chain_config::ValidatorSet;
        let latest_ledger_info = LedgerInfoWithSignatures::genesis(
            HashValue::zero(),
            ValidatorSet::empty(),
        );
        let latest_epoch_state = None;
        let committed_tree_state = TreeState {
            num_transactions: 0,
            ledger_frozen_subtree_hashes: Vec::new(),
            account_state_root_hash: *SPARSE_MERKLE_PLACEHOLDER_HASH,
        };
        let synced_tree_state = None;
        Self {
            latest_ledger_info,
            latest_epoch_state,
            committed_tree_state,
            synced_tree_state,
            // Every field of the struct must be initialised; this one was
            // previously omitted, which broke builds with `fuzzing` enabled.
            // NOTE(review): assumes `PosState::new_empty()` is the
            // no-argument empty-state constructor -- confirm in
            // `diem_types::term_state`.
            committed_pos_state: PosState::new_empty(),
        }
    }

    /// Returns the epoch state to start from.
    ///
    /// Prefers the next epoch state carried by `latest_ledger_info`; when the
    /// ledger info has none, falls back to `latest_epoch_state`.
    ///
    /// # Panics
    ///
    /// Panics with "EpochState must exist" if both sources are absent.
    pub fn get_epoch_state(&self) -> &EpochState {
        self.latest_ledger_info
            .ledger_info()
            .next_epoch_state()
            .unwrap_or_else(|| {
                self.latest_epoch_state
                    .as_ref()
                    .expect("EpochState must exist")
            })
    }
}
/// Summary of the ledger and account-state trees at one point in history.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct TreeState {
/// Number of transactions, as a `LeafCount`; `describe` treats zero as
/// "no transaction".
pub num_transactions: LeafCount,
/// Frozen subtree hashes of the ledger (presumably of the transaction
/// accumulator -- TODO confirm).
pub ledger_frozen_subtree_hashes: Vec<HashValue>,
/// Root hash of the account state; `describe` compares it against
/// `SPARSE_MERKLE_PLACEHOLDER_HASH` to detect an empty state.
pub account_state_root_hash: HashValue,
}
impl TreeState {
pub fn new(
num_transactions: LeafCount,
ledger_frozen_subtree_hashes: Vec<HashValue>,
account_state_root_hash: HashValue,
) -> Self {
Self {
num_transactions,
ledger_frozen_subtree_hashes,
account_state_root_hash,
}
}
pub fn describe(&self) -> &'static str {
if self.num_transactions != 0 {
"DB has been bootstrapped."
} else if self.account_state_root_hash
!= *SPARSE_MERKLE_PLACEHOLDER_HASH
{
"DB has no transaction, but a non-empty pre-genesis state."
} else {
"DB is empty, has no transaction or state."
}
}
}
/// Error type of the storage interface.
///
/// It derives `Serialize`/`Deserialize` and keeps only the textual form of
/// the underlying failures (see the `From` conversions in this file).
#[derive(Debug, Deserialize, Error, PartialEq, Serialize)]
pub enum Error {
/// A failure reported as a message string. The conversions from
/// `anyhow::Error` and `diem_secure_net::Error` produce this variant.
#[error("Service error: {:?}", error)]
ServiceError { error: String },
/// A (de)serialization failure reported as a message string. The conversion
/// from `bcs::Error` produces this variant.
#[error("Serialization error: {0}")]
SerializationError(String),
}
impl From<anyhow::Error> for Error {
fn from(error: anyhow::Error) -> Self {
Self::ServiceError {
error: format!("{}", error),
}
}
}
impl From<bcs::Error> for Error {
fn from(error: bcs::Error) -> Self {
Self::SerializationError(format!("{}", error))
}
}
impl From<diem_secure_net::Error> for Error {
fn from(error: diem_secure_net::Error) -> Self {
Self::ServiceError {
error: format!("{}", error),
}
}
}
/// Direction selector with two variants.
///
/// `Debug` is derived so the type can appear in log output, in
/// `assert_eq!`/`assert_ne!`, and inside other `#[derive(Debug)]` types.
///
/// NOTE(review): nothing in this file consumes `Order`; presumably it picks
/// the iteration direction of range queries -- confirm with callers.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Order {
    /// Ascending direction.
    Ascending,
    /// Descending direction.
    Descending,
}
/// Read-side interface of the storage layer.
///
/// Implementors must be `Send + Sync`. Methods declared without a body are
/// pure signatures here; their one-line summaries are inferred from names and
/// types and should be confirmed against the implementing database. Several
/// methods ship a default body that panics via `unimplemented!()`, so
/// implementors must override any of those they expect to be called.
pub trait DbReader: Send + Sync {
    /// Fetches an `EpochChangeProof` for the given epoch range.
    ///
    /// NOTE(review): whether `end_epoch` is inclusive is not visible here.
    fn get_epoch_ending_ledger_infos(
        &self,
        start_epoch: u64,
        end_epoch: u64,
    ) -> Result<EpochChangeProof>;

    /// Fetches a batch of transactions starting at `start_version`, with a
    /// proof, optionally including events.
    fn get_transactions(
        &self,
        start_version: Version,
        batch_size: u64,
        ledger_version: Version,
        fetch_events: bool,
    ) -> Result<TransactionListWithProof>;

    /// Looks up the block timestamp associated with `version`.
    fn get_block_timestamp(&self, version: u64) -> Result<u64>;

    /// Default body panics with `unimplemented!()`.
    fn get_last_version_before_timestamp(
        &self,
        _timestamp: u64,
        _ledger_version: Version,
    ) -> Result<Version> {
        unimplemented!()
    }

    /// Looks up the latest account state blob of `address`, if any.
    fn get_latest_account_state(
        &self,
        address: AccountAddress,
    ) -> Result<Option<AccountStateBlob>>;

    /// Returns the latest ledger info with its signatures.
    fn get_latest_ledger_info(&self) -> Result<LedgerInfoWithSignatures>;

    /// Returns the version recorded in the latest ledger info.
    ///
    /// Default body: reads `get_latest_ledger_info` and propagates its error.
    fn get_latest_version(&self) -> Result<Version> {
        let latest = self.get_latest_ledger_info()?;
        Ok(latest.ledger_info().version())
    }

    /// Returns `(version, timestamp_usecs)` of the latest ledger info.
    ///
    /// Default body: reads `get_latest_ledger_info` and propagates its error.
    fn get_latest_commit_metadata(&self) -> Result<(Version, u64)> {
        let latest = self.get_latest_ledger_info()?;
        let info = latest.ledger_info();
        let version = info.version();
        let timestamp_usecs = info.timestamp_usecs();
        Ok((version, timestamp_usecs))
    }

    /// Returns the information needed at start-up, or `None`.
    ///
    /// NOTE(review): the exact effect of `need_pos_state` is not visible
    /// here -- confirm with the implementation.
    fn get_startup_info(
        &self,
        need_pos_state: bool,
    ) -> Result<Option<StartupInfo>>;

    /// Looks up the transaction of `address` with sequence number `seq_num`,
    /// with a proof, optionally including events.
    fn get_txn_by_account(
        &self,
        address: AccountAddress,
        seq_num: u64,
        ledger_version: Version,
        fetch_events: bool,
    ) -> Result<Option<TransactionWithProof>>;

    /// Builds an epoch change proof and an accumulator consistency proof
    /// relative to the supplied `ledger_info`.
    fn get_state_proof_with_ledger_info(
        &self,
        known_version: u64,
        ledger_info: LedgerInfoWithSignatures,
    ) -> Result<(EpochChangeProof, AccumulatorConsistencyProof)>;

    /// Like `get_state_proof_with_ledger_info`, but also returns the ledger
    /// info the proofs refer to.
    fn get_state_proof(
        &self,
        known_version: u64,
    ) -> Result<(
        LedgerInfoWithSignatures,
        EpochChangeProof,
        AccumulatorConsistencyProof,
    )>;

    /// Fetches the account state of `address` at `version` with a proof.
    fn get_account_state_with_proof(
        &self,
        address: AccountAddress,
        version: Version,
        ledger_version: Version,
    ) -> Result<AccountStateWithProof>;

    /// Fetches the optional account state blob of `address` at `version`
    /// together with a sparse Merkle proof.
    fn get_account_state_with_proof_by_version(
        &self,
        address: AccountAddress,
        version: Version,
    ) -> Result<(
        Option<AccountStateBlob>,
        SparseMerkleProof<AccountStateBlob>,
    )>;

    /// Returns the latest state root as a `(version, hash)` pair.
    fn get_latest_state_root(&self) -> Result<(Version, HashValue)>;

    /// Returns the latest `TreeState`.
    fn get_latest_tree_state(&self) -> Result<TreeState>;

    /// Looks up an epoch-ending ledger info for `known_version`.
    fn get_epoch_ending_ledger_info(
        &self,
        known_version: u64,
    ) -> Result<LedgerInfoWithSignatures>;

    /// Default body panics with `unimplemented!()`.
    ///
    /// `MoveStorage::fetch_synced_version` for `&dyn DbReader` calls this, so
    /// readers used through that impl must override it.
    fn get_latest_transaction_info_option(
        &self,
    ) -> Result<Option<(Version, TransactionInfo)>> {
        unimplemented!()
    }

    /// Default body panics with `unimplemented!()`.
    fn get_accumulator_root_hash(
        &self,
        _version: Version,
    ) -> Result<HashValue> {
        unimplemented!()
    }

    /// Default body panics with `unimplemented!()`.
    fn get_pos_state(&self, _block_id: &HashValue) -> Result<PosState> {
        unimplemented!()
    }

    /// Default body panics with `unimplemented!()`.
    fn get_latest_pos_state(&self) -> Arc<PosState> {
        unimplemented!()
    }
}
/// `MoveStorage` backed by any `DbReader`: resources are read out of account
/// states fetched through `get_account_state_with_proof_by_version`.
impl MoveStorage for &dyn DbReader {
    /// Fetches the resources at `access_paths` as of the synced version
    /// reported by `fetch_synced_version`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `fetch_synced_version` and from
    /// `batch_fetch_resources_by_version`.
    fn batch_fetch_resources(
        &self,
        access_paths: Vec<AccessPath>,
    ) -> Result<Vec<Vec<u8>>> {
        self.batch_fetch_resources_by_version(
            access_paths,
            self.fetch_synced_version()?,
        )
    }

    /// Fetches the resources at `access_paths` as of `version`.
    ///
    /// The result has one entry per access path, in the same order.
    ///
    /// # Errors
    ///
    /// Fails if an account state cannot be fetched, an account has no blob,
    /// a blob cannot be decoded into an `AccountState`, or an account state
    /// holds no value under the requested path.
    fn batch_fetch_resources_by_version(
        &self,
        access_paths: Vec<AccessPath>,
        version: Version,
    ) -> Result<Vec<Vec<u8>>> {
        // Deduplicate on the address itself. Deduplicating whole access
        // paths (address + path) would keep one entry per distinct path and
        // fetch the same account several times.
        let addresses: Vec<AccountAddress> = access_paths
            .iter()
            .map(|path| path.address)
            .collect::<HashSet<_>>()
            .into_iter()
            .collect();

        // First pass: fetch every account; stop at the first storage error.
        let results = addresses
            .iter()
            .map(|addr| {
                self.get_account_state_with_proof_by_version(*addr, version)
            })
            .collect::<Result<Vec<_>>>()?;

        // Second pass: decode each blob. The proofs are not needed here.
        let account_states = addresses
            .iter()
            .zip_eq(results)
            .map(|(addr, (blob, _proof))| {
                let account_state =
                    AccountState::try_from(&blob.ok_or_else(|| {
                        format_err!("missing blob in account state/account does not exist")
                    })?)?;
                Ok((addr, account_state))
            })
            .collect::<Result<HashMap<_, AccountState>>>()?;

        // Resolve each requested path against its account, in input order.
        access_paths
            .iter()
            .map(|path| {
                Ok(account_states
                    .get(&path.address)
                    .ok_or_else(|| {
                        format_err!(
                            "missing account state for queried access path"
                        )
                    })?
                    .get(&path.path)
                    .ok_or_else(|| {
                        format_err!("no value found in account state")
                    })?
                    .clone())
            })
            .collect()
    }

    /// Returns the version of the latest transaction info known to the
    /// reader.
    ///
    /// # Errors
    ///
    /// Fails if `get_latest_transaction_info_option` errors or returns
    /// `None`.
    fn fetch_synced_version(&self) -> Result<u64> {
        let (synced_version, _) = self
            .get_latest_transaction_info_option()
            .map_err(|e| {
                format_err!(
                    "[MoveStorage] Failed fetching latest transaction info: {}",
                    e
                )
            })?
            .ok_or_else(|| {
                format_err!("[MoveStorage] Latest transaction info not found.")
            })?;
        Ok(synced_version)
    }
}
/// Write-side interface of the storage layer.
///
/// Implementors must be `Send + Sync`. All methods are pure declarations
/// here; the summaries below are inferred from names and types and should be
/// confirmed against the implementing database.
pub trait DbWriter: Send + Sync {
/// Persists `txns_to_commit`, whose first transaction has version
/// `first_version`, along with the optional ledger info, optional PoS state,
/// committed blocks and `(HashValue, LedgerInfoWithSignatures)` pairs.
///
/// NOTE(review): atomicity and the meaning of the `HashValue` in
/// `ledger_infos_with_voted_block` (presumably the voted block id) are not
/// visible here -- confirm.
fn save_transactions(
&self, txns_to_commit: &[TransactionToCommit], first_version: Version,
ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
pos_state: Option<PosState>, committed_blocks: Vec<CommittedBlock>,
ledger_infos_with_voted_block: Vec<(
HashValue,
LedgerInfoWithSignatures,
)>,
) -> Result<()>;
/// Persists the reward distribution `event` for `epoch`.
fn save_reward_event(
&self, epoch: u64, event: &RewardDistributionEventV2,
) -> Result<()>;
/// Deletes the PoS state stored for the block `block_id`.
fn delete_pos_state_by_block(&self, block_id: &HashValue) -> Result<()>;
}
/// Pair of trait-object handles giving read and write access to storage.
///
/// Cloning clones the two `Arc`s. The constructors in this file make both
/// handles point at the same underlying database value.
#[derive(Clone)]
pub struct DbReaderWriter {
/// Read handle.
pub reader: Arc<dyn DbReader>,
/// Write handle.
pub writer: Arc<dyn DbWriter>,
}
impl DbReaderWriter {
    /// Takes ownership of `db` and exposes it through both handles.
    pub fn new<D: 'static + DbReader + DbWriter>(db: D) -> Self {
        Self::from_arc(Arc::new(db))
    }

    /// Builds the reader/writer pair from an already shared database; both
    /// handles point at the same allocation as `arc_db`.
    pub fn from_arc<D: 'static + DbReader + DbWriter>(arc_db: Arc<D>) -> Self {
        // The clone is bound without a type annotation on purpose: the
        // unsizing to the trait objects happens at the struct fields.
        let write_handle = Arc::clone(&arc_db);
        Self {
            reader: arc_db,
            writer: write_handle,
        }
    }

    /// Wraps `db` in an `Arc` and returns that concrete handle together with
    /// a `DbReaderWriter` over the same allocation.
    pub fn wrap<D: 'static + DbReader + DbWriter>(db: D) -> (Arc<D>, Self) {
        let shared = Arc::new(db);
        let reader_writer = Self::from_arc(Arc::clone(&shared));
        (shared, reader_writer)
    }
}
impl<D> From<D> for DbReaderWriter
where D: 'static + DbReader + DbWriter
{
fn from(db: D) -> Self { Self::new(db) }
}
/// Serializable request messages for storage operations.
///
/// Payload-carrying variants box their payload.
/// NOTE(review): presumably these are sent to a storage service over the
/// network -- the transport is not visible in this file.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum StorageRequest {
/// Request for an account state with proof at a given version.
GetAccountStateWithProofByVersionRequest(
Box<GetAccountStateWithProofByVersionRequest>,
),
/// Request for the start-up information; carries no payload.
GetStartupInfoRequest,
/// Request to save transactions.
SaveTransactionsRequest(Box<SaveTransactionsRequest>),
}
/// Payload of `StorageRequest::GetAccountStateWithProofByVersionRequest`.
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct GetAccountStateWithProofByVersionRequest {
/// Account being queried.
pub address: AccountAddress,
/// Version at which the account state is requested.
pub version: Version,
}
impl GetAccountStateWithProofByVersionRequest {
pub fn new(address: AccountAddress, version: Version) -> Self {
Self { address, version }
}
}
/// Payload of `StorageRequest::SaveTransactionsRequest`.
///
/// The fields mirror the first three data parameters of
/// `DbWriter::save_transactions`.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
pub struct SaveTransactionsRequest {
/// Transactions to persist.
pub txns_to_commit: Vec<TransactionToCommit>,
/// Version of the first transaction in `txns_to_commit`.
pub first_version: Version,
/// Optional ledger info with signatures accompanying the transactions.
pub ledger_info_with_signatures: Option<LedgerInfoWithSignatures>,
}
impl SaveTransactionsRequest {
pub fn new(
txns_to_commit: Vec<TransactionToCommit>, first_version: Version,
ledger_info_with_signatures: Option<LedgerInfoWithSignatures>,
) -> Self {
SaveTransactionsRequest {
txns_to_commit,
first_version,
ledger_info_with_signatures,
}
}
}
/// Additional read-only queries layered on top of `DbReader`.
///
/// All methods are pure declarations here; the summaries below are inferred
/// from names and types and should be confirmed against the implementing
/// database. NOTE(review): judging by the trait name these are the queries
/// the PoW side needs -- confirm with callers.
pub trait DBReaderForPoW: Send + Sync + DbReader {
/// Returns the latest ledger info, or `None` when there is none.
fn get_latest_ledger_info_option(&self)
-> Option<LedgerInfoWithSignatures>;
/// Looks up the ledger info for the consensus block `consensus_block_id`.
fn get_block_ledger_info(
&self, consensus_block_id: &HashValue,
) -> Result<LedgerInfoWithSignatures>;
/// Fetches the contract events between `start_version` and `end_version`.
///
/// NOTE(review): bound inclusivity is not visible here.
fn get_events_by_version(
&self, start_version: u64, end_version: u64,
) -> Result<Vec<ContractEvent>>;
/// Fetches the hashes of the epoch-ending blocks for the given epoch range.
///
/// NOTE(review): bound inclusivity is not visible here.
fn get_epoch_ending_blocks(
&self, start_epoch: u64, end_epoch: u64,
) -> Result<Vec<HashValue>>;
/// Fetches the reward distribution event stored for `epoch`.
fn get_reward_event(&self, epoch: u64)
-> Result<RewardDistributionEventV2>;
/// Looks up a committed block by its hash.
fn get_committed_block_by_hash(
&self, block_hash: &HashValue,
) -> Result<CommittedBlock>;
/// Looks up the hash of the committed block at `view`.
fn get_committed_block_hash_by_view(&self, view: u64) -> Result<HashValue>;
/// Looks up the ledger info recorded for the voted block `block_id`.
fn get_ledger_info_by_voted_block(
&self, block_id: &HashValue,
) -> Result<LedgerInfoWithSignatures>;
/// Looks up the hash of the block at the given `epoch` and `round`.
fn get_block_hash_by_epoch_and_round(
&self, epoch: u64, round: u64,
) -> Result<HashValue>;
}