#[cfg(test)]
mod test;
use crate::logging::{LogEntry, LogSchema};
use anyhow::{format_err, Result};
use consensus_types::block::Block;
use diem_crypto::{hash::PRE_GENESIS_BLOCK_ID, HashValue};
use diem_infallible::Mutex;
use diem_logger::prelude::*;
use diem_types::{
contract_event::ContractEvent, ledger_info::LedgerInfo,
term_state::PosState, transaction::Transaction,
};
use executor_types::{Error, ExecutedTrees, ProcessedVMOutput};
use std::{
collections::HashMap,
sync::{Arc, Weak},
};
use storage_interface::{StartupInfo, TreeState};
/// A single block held in the speculation cache's block tree.
///
/// A block owns strong references to its children and shares the id -> block
/// map with the cache; its `Drop` implementation removes the block's own id
/// from that map.
pub struct SpeculationBlock {
/// Identifier of this block; the key this block removes from `block_map`
/// when it is dropped.
id: HashValue,
/// Transactions stored for this block.
transactions: Vec<Transaction>,
/// Strong references to the blocks that were added as children of this one.
children: Vec<Arc<Mutex<SpeculationBlock>>>,
/// Processed VM output stored for this block.
output: ProcessedVMOutput,
/// Shared map from block id to a weak reference of the block. Only weak
/// references are stored here, so the map itself does not keep blocks alive.
block_map: Arc<Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>>,
}
impl SpeculationBlock {
    /// Builds a block without children from its id, transactions and output.
    ///
    /// `block_map` is the shared id -> block map that this block removes its
    /// own id from when dropped. This constructor does not insert the block
    /// into the map; the caller is responsible for registering it.
    pub fn new(
        id: HashValue, transactions: Vec<Transaction>,
        output: ProcessedVMOutput,
        block_map: Arc<
            Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>,
        >,
    ) -> Self {
        let children = Vec::new();
        Self {
            id,
            transactions,
            children,
            output,
            block_map,
        }
    }

    /// Returns the id of this block.
    pub fn id(&self) -> HashValue {
        self.id
    }

    /// Returns the transactions stored for this block.
    pub fn transactions(&self) -> &Vec<Transaction> {
        &self.transactions
    }

    /// Appends `child` to this block's children, keeping it alive for as long
    /// as this block holds the reference.
    pub fn add_child(&mut self, child: Arc<Mutex<SpeculationBlock>>) {
        self.children.push(child);
    }

    /// Returns the processed VM output stored for this block.
    pub fn output(&self) -> &ProcessedVMOutput {
        &self.output
    }

    /// Overwrites the transactions and output of this block and discards all
    /// of its children.
    pub fn replace(
        &mut self, transactions: Vec<Transaction>, output: ProcessedVMOutput,
    ) {
        self.transactions = transactions;
        self.output = output;
        // Releasing the strong references drops every child that is not
        // referenced from anywhere else.
        self.children = Vec::new();
    }

    /// Forwards `new_pos_state` to the stored output's `replace_pos_state`.
    pub fn replace_pos_state(&mut self, new_pos_state: PosState) {
        self.output.replace_pos_state(new_pos_state);
    }
}
impl Drop for SpeculationBlock {
    /// Removes this block's id from the shared block map, then logs the drop.
    ///
    /// # Panics
    ///
    /// Panics if the block map holds no entry for this block's id.
    fn drop(&mut self) {
        let block_id = self.id();
        {
            // The guard lives for this inner scope only, so the map is
            // unlocked again before the log statement below runs.
            let mut registry = self.block_map.lock();
            registry.remove(&block_id).expect(
                "Speculation block must exist in block_map before being dropped.",
            );
        }
        diem_debug!(
            LogSchema::new(LogEntry::SpeculationCache).block_id(block_id),
            "Block dropped"
        );
    }
}
/// Cache of the committed/synced trees together with the tree of
/// `SpeculationBlock`s that hang off the committed root block.
pub struct SpeculationCache {
/// Set to the committed trees on every root update and overwritten by
/// `update_synced_trees`.
synced_trees: ExecutedTrees,
/// Trees recorded by the most recent root update.
committed_trees: ExecutedTrees,
/// Transactions and reconfiguration events passed to the most recent root
/// update.
committed_txns_and_events: (Vec<Transaction>, Vec<ContractEvent>),
/// Id of the current root block; blocks added with this id as their parent
/// are stored in `heads`.
committed_block_id: HashValue,
/// Strong references to the blocks whose parent is the root block.
heads: Vec<Arc<Mutex<SpeculationBlock>>>,
/// Map from block id to a weak reference of the block, shared with every
/// block created by `add_block`.
block_map: Arc<Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>>,
}
impl SpeculationCache {
pub fn new() -> Self {
Self {
synced_trees: ExecutedTrees::new_empty(),
committed_trees: ExecutedTrees::new_empty(),
committed_txns_and_events: (vec![], vec![]),
heads: vec![],
block_map: Arc::new(Mutex::new(HashMap::new())),
committed_block_id: *PRE_GENESIS_BLOCK_ID,
}
}
pub fn new_with_startup_info(startup_info: StartupInfo) -> Self {
let mut cache = Self::new();
let ledger_info = startup_info.latest_ledger_info.ledger_info();
let committed_trees = ExecutedTrees::new_with_pos_state(
startup_info.committed_tree_state,
startup_info.committed_pos_state,
);
cache.update_block_tree_root(
committed_trees,
ledger_info,
vec![], vec![], );
if let Some(synced_tree_state) = startup_info.synced_tree_state {
cache.update_synced_trees(ExecutedTrees::from(synced_tree_state));
}
cache
}
pub fn new_for_db_bootstrapping(
tree_state: TreeState, pos_state: PosState,
) -> Self {
let executor_trees =
ExecutedTrees::new_with_pos_state(tree_state, pos_state);
Self {
synced_trees: executor_trees.clone(),
committed_trees: executor_trees,
committed_txns_and_events: (vec![], vec![]),
heads: vec![],
block_map: Arc::new(Mutex::new(HashMap::new())),
committed_block_id: *PRE_GENESIS_BLOCK_ID,
}
}
pub fn committed_txns_and_events(
&self,
) -> (Vec<Transaction>, Vec<ContractEvent>) {
self.committed_txns_and_events.clone()
}
pub fn committed_block_id(&self) -> HashValue { self.committed_block_id }
pub fn committed_trees(&self) -> &ExecutedTrees { &self.committed_trees }
pub fn synced_trees(&self) -> &ExecutedTrees { &self.synced_trees }
pub fn update_block_tree_root(
&mut self, mut committed_trees: ExecutedTrees,
committed_ledger_info: &LedgerInfo, committed_txns: Vec<Transaction>,
reconfig_events: Vec<ContractEvent>,
) {
let new_root_block_id = if committed_ledger_info.ends_epoch() {
let id = Block::make_genesis_block_from_ledger_info(
committed_ledger_info,
)
.id();
diem_info!(
LogSchema::new(LogEntry::SpeculationCache)
.root_block_id(id)
.original_reconfiguration_block_id(committed_ledger_info.consensus_block_id()),
"Updated with a new root block as a virtual block of reconfiguration block"
);
committed_trees.set_pos_state_skipped(false);
id
} else {
let id = committed_ledger_info.consensus_block_id();
diem_info!(
LogSchema::new(LogEntry::SpeculationCache).root_block_id(id),
"Updated with a new root block",
);
id
};
self.committed_block_id = new_root_block_id;
self.committed_trees = committed_trees.clone();
self.committed_txns_and_events = (committed_txns, reconfig_events);
self.synced_trees = committed_trees;
}
pub fn update_synced_trees(&mut self, new_trees: ExecutedTrees) {
self.synced_trees = new_trees;
}
pub fn reset(&mut self) {
self.heads = vec![];
*self.block_map.lock() = HashMap::new();
}
pub fn add_block(
&mut self, parent_block_id: HashValue,
block: (
HashValue, Vec<Transaction>, ProcessedVMOutput, ),
) -> Result<(), Error> {
let (block_id, txns, output) = block;
let old_block = self
.block_map
.lock()
.get(&block_id)
.map(|b| {
b.upgrade().ok_or_else(|| {
format_err!(
"block {:x} has been deallocated. Something went wrong.",
block_id
)
})
})
.transpose()?;
if let Some(old_block) = old_block {
old_block.lock().replace(txns, output);
return Ok(());
}
let new_block = Arc::new(Mutex::new(SpeculationBlock::new(
block_id,
txns,
output,
Arc::clone(&self.block_map),
)));
self.block_map
.lock()
.insert(block_id, Arc::downgrade(&new_block));
if parent_block_id == self.committed_block_id() {
self.heads.push(new_block);
} else {
self.get_block(&parent_block_id)?
.lock()
.add_child(new_block);
}
Ok(())
}
pub fn prune(
&mut self, committed_ledger_info: &LedgerInfo,
committed_txns: Vec<Transaction>, reconfig_events: Vec<ContractEvent>,
) -> Result<HashValue, Error> {
let old_committed_root = self.committed_block_id;
let arc_latest_committed_block =
self.get_block(&committed_ledger_info.consensus_block_id())?;
let latest_committed_block = arc_latest_committed_block.lock();
self.heads = latest_committed_block.children.clone();
self.update_block_tree_root(
latest_committed_block.output().executed_trees().clone(),
committed_ledger_info,
committed_txns,
reconfig_events,
);
Ok(old_committed_root)
}
pub fn get_block(
&self, block_id: &HashValue,
) -> Result<Arc<Mutex<SpeculationBlock>>, Error> {
Ok(self
.block_map
.lock()
.get(&block_id)
.ok_or_else(|| Error::BlockNotFound(*block_id))?
.upgrade()
.ok_or_else(|| {
format_err!(
"block {:x} has been deallocated. Something went wrong.",
block_id
)
})?)
}
}