cached_pos_ledger_db/
lib.rs1use std::sync::Arc;
6
7use anyhow::{format_err, Result};
8
9use diem_crypto::HashValue;
10use diem_logger::prelude::*;
11use diem_types::{
12 account_address::AccountAddress,
13 block_info::PivotBlockDecision,
14 ledger_info::LedgerInfo,
15 term_state::{NodeID, PosState},
16 transaction::Transaction,
17};
18use executor_types::{Error, ExecutedTrees, ProcessedVMOutput};
19use parking_lot::Mutex;
20pub use speculation_cache::{SpeculationBlock, SpeculationCache};
21use storage_interface::{DbReaderWriter, TreeState};
22
23mod logging;
24mod speculation_cache;
25
26pub struct CachedPosLedgerDB {
27 pub db: DbReaderWriter,
28 pub cache: Mutex<SpeculationCache>,
29}
30
31impl CachedPosLedgerDB {
32 pub fn new(db: DbReaderWriter) -> Self {
33 let startup_info = db
34 .reader
35 .get_startup_info(true)
36 .expect("Shouldn't fail")
37 .expect("DB not bootstrapped.");
38
39 Self {
40 db,
41 cache: Mutex::new(SpeculationCache::new_with_startup_info(
42 startup_info,
43 )),
44 }
45 }
46
47 fn get_executed_trees(
48 &self, block_id: HashValue,
49 ) -> Result<ExecutedTrees, Error> {
50 diem_debug!(
51 "get_executed_trees:{} {}",
52 block_id,
53 self.cache.lock().committed_block_id()
54 );
55 let executed_trees =
56 if block_id == self.cache.lock().committed_block_id() {
57 self.cache.lock().committed_trees().clone()
58 } else {
59 self.get_block(&block_id)?
60 .lock()
61 .output()
62 .executed_trees()
63 .clone()
64 };
65
66 Ok(executed_trees)
67 }
68
69 pub fn get_pos_state(
70 &self, block_id: &HashValue,
71 ) -> Result<PosState, Error> {
72 if let Ok(executed_tree) = self.get_executed_trees(*block_id) {
73 Ok(executed_tree.pos_state().clone())
74 } else {
75 self.db.reader.get_pos_state(block_id).map_err(|_| {
76 Error::InternalError {
77 error: "pos state not found".to_string(),
78 }
79 })
80 }
81 }
82
83 pub fn reset_cache(&self) -> Result<(), Error> {
84 let startup_info = self
85 .db
86 .reader
87 .get_startup_info(true)?
88 .ok_or_else(|| format_err!("DB not bootstrapped."))?;
89 *(self.cache.lock()) =
90 SpeculationCache::new_with_startup_info(startup_info);
91 Ok(())
92 }
93
94 pub fn new_on_unbootstrapped_db(
95 db: DbReaderWriter, tree_state: TreeState, initial_seed: Vec<u8>,
96 initial_nodes: Vec<(NodeID, u64)>,
97 initial_committee: Vec<(AccountAddress, u64)>,
98 genesis_pivot_decision: Option<PivotBlockDecision>,
99 ) -> Self {
100 let genesis_pivot_decision =
132 genesis_pivot_decision.unwrap_or(PivotBlockDecision {
133 block_hash: Default::default(),
134 height: 0,
135 });
136 let pos_state = PosState::new(
137 initial_seed,
138 initial_nodes,
139 initial_committee,
140 genesis_pivot_decision,
141 );
142 Self {
143 db,
144 cache: Mutex::new(SpeculationCache::new_for_db_bootstrapping(
145 tree_state, pos_state,
146 )),
147 }
148 }
149
150 pub fn committed_block_id(&self) -> HashValue {
151 return self.cache.lock().committed_block_id();
152 }
153
154 pub fn update_block_tree_root(
155 &self, committed_trees: ExecutedTrees,
156 committed_ledger_info: &LedgerInfo, committed_txns: Vec<Transaction>,
157 ) {
158 self.cache.lock().update_block_tree_root(
159 committed_trees,
160 committed_ledger_info,
161 committed_txns,
162 )
163 }
164
165 pub fn update_synced_trees(&self, new_trees: ExecutedTrees) {
166 self.cache.lock().update_synced_trees(new_trees)
167 }
168
169 pub fn add_block(
170 &self, parent_block_id: HashValue,
171 block: (
172 HashValue, Vec<Transaction>, ProcessedVMOutput, ),
176 ) -> Result<(), Error> {
177 self.cache.lock().add_block(parent_block_id, block)
178 }
179
180 pub fn reset(&self) { self.cache.lock().reset() }
181
182 pub fn prune(
183 &self, committed_ledger_info: &LedgerInfo,
184 committed_txns: Vec<Transaction>,
185 ) -> Result<HashValue, Error> {
186 self.cache
187 .lock()
188 .prune(committed_ledger_info, committed_txns)
189 }
190
191 pub fn get_block(
192 &self, block_id: &HashValue,
193 ) -> Result<Arc<Mutex<SpeculationBlock>>, Error> {
194 self.cache.lock().get_block(block_id)
195 }
196}