use crate::pos::consensus::{
block_storage::{
block_tree::BlockTree,
tracing::{observe_block, BlockStage},
BlockReader,
},
counters,
logging::{LogEvent, LogSchema},
persistent_liveness_storage::{
PersistentLivenessStorage, RecoveryData, RootInfo, RootMetadata,
},
state_replication::StateComputer,
util::time_service::TimeService,
};
use anyhow::{bail, ensure, format_err, Context};
use consensus_types::{
block::Block, executed_block::ExecutedBlock, quorum_cert::QuorumCert,
sync_info::SyncInfo, timeout_certificate::TimeoutCertificate,
};
use diem_crypto::HashValue;
use diem_infallible::RwLock;
use diem_logger::prelude::*;
use diem_types::{
ledger_info::LedgerInfoWithSignatures, transaction::TransactionStatus,
};
use executor_types::{Error, StateComputeResult};
use pow_types::PowInterface;
use short_hex_str::AsShortHexStr;
use std::{collections::vec_deque::VecDeque, sync::Arc, time::Duration};
#[cfg(test)]
#[path = "block_store_test.rs"]
mod block_store_test;
#[cfg(test)]
#[path = "block_store_and_lec_recovery_test.rs"]
mod block_store_and_lec_recovery_test;
#[path = "sync_manager.rs"]
pub mod sync_manager;
fn update_counters_for_committed_blocks(
blocks_to_commit: &[Arc<ExecutedBlock>],
) {
for block in blocks_to_commit {
observe_block(block.block().timestamp_usecs(), BlockStage::COMMITTED);
let txn_status = block.compute_result().compute_status();
counters::NUM_TXNS_PER_BLOCK.observe(txn_status.len() as f64);
counters::COMMITTED_BLOCKS_COUNT.inc();
counters::LAST_COMMITTED_ROUND.set(block.round() as i64);
counters::LAST_COMMITTED_VERSION
.set(block.compute_result().num_leaves() as i64);
for status in txn_status.iter() {
match status {
TransactionStatus::Keep(_) => {
counters::COMMITTED_TXNS_COUNT
.with_label_values(&["success"])
.inc();
}
TransactionStatus::Discard(_) => {
counters::COMMITTED_TXNS_COUNT
.with_label_values(&["failed"])
.inc();
}
TransactionStatus::Retry => {
counters::COMMITTED_TXNS_COUNT
.with_label_values(&["retry"])
.inc();
}
}
}
}
}
pub struct BlockStore {
inner: Arc<RwLock<BlockTree>>,
state_computer: Arc<dyn StateComputer>,
storage: Arc<dyn PersistentLivenessStorage>,
time_service: Arc<dyn TimeService>,
pub pow_handler: Arc<dyn PowInterface>,
}
impl BlockStore {
pub fn new(
storage: Arc<dyn PersistentLivenessStorage>,
initial_data: RecoveryData, state_computer: Arc<dyn StateComputer>,
max_pruned_blocks_in_mem: usize, time_service: Arc<dyn TimeService>,
pow_handler: Arc<dyn PowInterface>,
) -> Self {
let highest_tc = initial_data.highest_timeout_certificate();
let (root, root_metadata, blocks, quorum_certs) = initial_data.take();
Self::build(
root,
root_metadata,
blocks,
quorum_certs,
highest_tc,
state_computer,
storage,
max_pruned_blocks_in_mem,
time_service,
pow_handler,
)
}
fn build(
root: RootInfo, root_metadata: RootMetadata, blocks: Vec<Block>,
quorum_certs: Vec<QuorumCert>,
highest_timeout_cert: Option<TimeoutCertificate>,
state_computer: Arc<dyn StateComputer>,
storage: Arc<dyn PersistentLivenessStorage>,
max_pruned_blocks_in_mem: usize, time_service: Arc<dyn TimeService>,
pow_handler: Arc<dyn PowInterface>,
) -> Self {
let RootInfo(root_block, root_qc, root_li) = root;
assert_eq!(
root_qc.certified_block().version(),
root_metadata.version(),
"root qc version {} doesn't match committed trees {}",
root_qc.certified_block().version(),
root_metadata.version(),
);
let result = StateComputeResult::new(
root_metadata.accu_hash,
root_metadata.frozen_root_hashes,
root_metadata.num_leaves, vec![], 0, None, vec![], vec![], root_metadata.pivot_decision,
);
diem_debug!("BlockStore root block result = {:?}", result);
let executed_root_block = ExecutedBlock::new(
root_block,
result,
);
let tree = BlockTree::new(
executed_root_block,
root_qc,
root_li,
max_pruned_blocks_in_mem,
highest_timeout_cert.map(Arc::new),
);
let block_store = Self {
inner: Arc::new(RwLock::new(tree)),
state_computer,
storage,
time_service,
pow_handler,
};
for block in blocks {
block_store
.execute_and_insert_block(block, true, true)
.unwrap_or_else(|e| {
panic!(
"[BlockStore] failed to insert block during build {:?}",
e
)
});
}
for qc in quorum_certs {
block_store
.insert_single_quorum_cert(qc)
.unwrap_or_else(|e| {
panic!(
"[BlockStore] failed to insert quorum during build{:?}",
e
)
});
}
counters::LAST_COMMITTED_ROUND.set(block_store.root().round() as i64);
counters::LAST_COMMITTED_VERSION
.set(block_store.root().compute_result().num_leaves() as i64);
block_store
}
pub async fn commit(
&self, finality_proof: LedgerInfoWithSignatures,
) -> anyhow::Result<()> {
let block_id_to_commit =
finality_proof.ledger_info().consensus_block_id();
diem_debug!("BlockStore::commit: id={}", block_id_to_commit);
let block_to_commit = self
.get_block(block_id_to_commit)
.ok_or_else(|| format_err!("Committed block id not found"))?;
if block_to_commit == self.root() {
diem_debug!("commit an committed block in sync");
return Ok(());
}
ensure!(
block_to_commit.round() > self.root().round(),
"Committed block round lower than root"
);
let blocks_to_commit = self
.path_from_root(block_id_to_commit)
.unwrap_or_else(Vec::new);
let ledger_blocks: Vec<Block> =
blocks_to_commit.iter().map(|b| b.block().clone()).collect();
self.storage
.save_ledger_blocks(ledger_blocks)
.expect("Failed to persist committed blocks");
self.state_computer
.commit(
blocks_to_commit.iter().map(|b| b.id()).collect(),
finality_proof,
)
.await
.expect("Failed to persist commit");
update_counters_for_committed_blocks(&blocks_to_commit);
let current_round = self.root().round();
let committed_round = block_to_commit.round();
diem_debug!(
LogSchema::new(LogEvent::CommitViaBlock).round(current_round),
committed_round = committed_round,
block_id = block_to_commit.id(),
);
event!("committed",
"block_id": block_to_commit.id().short_str(),
"epoch": block_to_commit.epoch(),
"round": committed_round,
"parent_id": block_to_commit.parent_id().short_str(),
);
self.prune_tree(block_to_commit.id());
Ok(())
}
pub fn execute_and_insert_block(
&self, block: Block, catch_up_mode: bool, force_compute: bool,
) -> anyhow::Result<Arc<ExecutedBlock>> {
diem_debug!("execute_and_insert_block: block={:?}", block.id());
if !force_compute {
if let Some(existing_block) = self.get_block(block.id()) {
return Ok(existing_block);
}
}
ensure!(
self.inner.read().root().round() < block.round(),
"Block with old round"
);
let executed_block = match self
.execute_block(block.clone(), catch_up_mode)
{
Ok(res) => Ok(res),
Err(Error::BlockNotFound(parent_block_id)) => {
let blocks_to_reexecute = self
.path_from_root(parent_block_id)
.unwrap_or_else(Vec::new);
for block in blocks_to_reexecute {
self.execute_block(block.block().clone(), catch_up_mode)?;
}
self.execute_block(block, catch_up_mode)
}
err => err,
}?;
let block_time =
Duration::from_micros(executed_block.timestamp_usecs());
self.time_service.wait_until(block_time);
self.storage
.save_tree(vec![executed_block.block().clone()], vec![])
.context("Insert block failed when saving block")?;
self.inner.write().insert_block(executed_block)
}
fn execute_block(
&self, block: Block, catch_up_mode: bool,
) -> anyhow::Result<ExecutedBlock, Error> {
let state_compute_result = self.state_computer.compute(
&block,
block.parent_id(),
catch_up_mode,
)?;
observe_block(block.timestamp_usecs(), BlockStage::EXECUTED);
Ok(ExecutedBlock::new(block, state_compute_result))
}
pub fn insert_single_quorum_cert(
&self, qc: QuorumCert,
) -> anyhow::Result<()> {
diem_debug!("insert_single_quorum_cert: qc={:?}", qc);
match self.get_block(qc.certified_block().id()) {
Some(executed_block) => {
ensure!(
executed_block.block_info() == *qc.certified_block(),
"QC for block {} has different {:?} than local {:?}",
qc.certified_block().id(),
qc.certified_block(),
executed_block.block_info()
);
observe_block(
executed_block.block().timestamp_usecs(),
BlockStage::QC_ADDED,
);
}
None => {
bail!("Insert {} without having the block in store first", qc)
}
};
self.storage
.save_tree(vec![], vec![qc.clone()])
.context("Insert block failed when saving quorum")?;
self.inner.write().insert_quorum_cert(qc)
}
pub fn insert_timeout_certificate(
&self, tc: Arc<TimeoutCertificate>,
) -> anyhow::Result<()> {
let cur_tc_round =
self.highest_timeout_cert().map_or(0, |tc| tc.round());
if tc.round() <= cur_tc_round {
return Ok(());
}
self.storage
.save_highest_timeout_cert(tc.as_ref().clone())
.context(
"Timeout certificate insert failed when persisting to DB",
)?;
self.inner.write().replace_timeout_cert(tc);
Ok(())
}
fn prune_tree(&self, next_root_id: HashValue) -> VecDeque<HashValue> {
let id_to_remove = self.inner.read().find_blocks_to_prune(next_root_id);
if let Err(e) = self
.storage
.prune_tree(id_to_remove.clone().into_iter().collect())
{
diem_error!(error = ?e, "fail to delete block");
}
self.inner
.write()
.process_pruned_blocks(next_root_id, id_to_remove.clone());
id_to_remove
}
}
impl BlockReader for BlockStore {
fn block_exists(&self, block_id: HashValue) -> bool {
self.inner.read().block_exists(&block_id)
}
fn get_block(&self, block_id: HashValue) -> Option<Arc<ExecutedBlock>> {
self.inner.read().get_block(&block_id)
}
fn get_ledger_block(
&self, block_id: &HashValue,
) -> anyhow::Result<Option<Block>> {
self.storage.get_ledger_block(block_id)
}
fn root(&self) -> Arc<ExecutedBlock> { self.inner.read().root() }
fn get_quorum_cert_for_block(
&self, block_id: HashValue,
) -> Option<Arc<QuorumCert>> {
self.inner.read().get_quorum_cert_for_block(&block_id)
}
fn path_from_root(
&self, block_id: HashValue,
) -> Option<Vec<Arc<ExecutedBlock>>> {
self.inner.read().path_from_root(block_id)
}
fn highest_certified_block(&self) -> Arc<ExecutedBlock> {
self.inner.read().highest_certified_block()
}
fn highest_quorum_cert(&self) -> Arc<QuorumCert> {
self.inner.read().highest_quorum_cert()
}
fn highest_commit_cert(&self) -> Arc<QuorumCert> {
self.inner.read().highest_commit_cert()
}
fn highest_timeout_cert(&self) -> Option<Arc<TimeoutCertificate>> {
self.inner.read().highest_timeout_cert()
}
fn sync_info(&self) -> SyncInfo {
SyncInfo::new(
self.highest_quorum_cert().as_ref().clone(),
self.highest_commit_cert().as_ref().clone(),
self.highest_timeout_cert().map(|tc| tc.as_ref().clone()),
)
}
}
#[cfg(any(test, feature = "fuzzing"))]
#[allow(unused)]
impl BlockStore {
pub(crate) fn len(&self) -> usize { self.inner.read().len() }
pub(crate) fn child_links(&self) -> usize {
self.inner.read().child_links()
}
pub(super) fn pruned_blocks_in_mem(&self) -> usize {
self.inner.read().pruned_blocks_in_mem()
}
pub fn insert_block_with_qc(
&self, block: Block,
) -> anyhow::Result<Arc<ExecutedBlock>> {
self.insert_single_quorum_cert(block.quorum_cert().clone())?;
self.execute_and_insert_block(block, false, false)
}
}