cached_pos_ledger_db/speculation_cache/
mod.rs1#[cfg(test)]
23mod test;
24
25use crate::logging::{LogEntry, LogSchema};
26use anyhow::{format_err, Result};
27use consensus_types::block::Block;
28use diem_crypto::{hash::PRE_GENESIS_BLOCK_ID, HashValue};
29use diem_infallible::Mutex;
30use diem_logger::prelude::*;
31use diem_types::{
32 contract_event::ContractEvent, ledger_info::LedgerInfo,
33 term_state::PosState, transaction::Transaction,
34};
35use executor_types::{Error, ExecutedTrees, ProcessedVMOutput};
36use std::{
37 collections::HashMap,
38 sync::{Arc, Weak},
39};
40use storage_interface::{StartupInfo, TreeState};
41
42pub struct SpeculationBlock {
45 id: HashValue,
47 transactions: Vec<Transaction>,
49 children: Vec<Arc<Mutex<SpeculationBlock>>>,
51 output: ProcessedVMOutput,
53 block_map: Arc<Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>>,
56}
57
58impl SpeculationBlock {
59 pub fn new(
60 id: HashValue, transactions: Vec<Transaction>,
61 output: ProcessedVMOutput,
62 block_map: Arc<
63 Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>,
64 >,
65 ) -> Self {
66 Self {
67 id,
68 transactions,
69 children: vec![],
70 output,
71 block_map,
72 }
73 }
74
75 pub fn id(&self) -> HashValue { self.id }
76
77 pub fn transactions(&self) -> &Vec<Transaction> { &self.transactions }
78
79 pub fn add_child(&mut self, child: Arc<Mutex<SpeculationBlock>>) {
80 self.children.push(child)
81 }
82
83 pub fn output(&self) -> &ProcessedVMOutput { &self.output }
84
85 pub fn replace(
86 &mut self, transactions: Vec<Transaction>, output: ProcessedVMOutput,
87 ) {
88 self.transactions = transactions;
89 self.output = output;
90 self.children = vec![];
91 }
92
93 pub fn replace_pos_state(&mut self, new_pos_state: PosState) {
94 self.output.replace_pos_state(new_pos_state)
95 }
96}
97
98impl Drop for SpeculationBlock {
100 fn drop(&mut self) {
101 self.block_map.lock().remove(&self.id()).expect(
102 "Speculation block must exist in block_map before being dropped.",
103 );
104 diem_debug!(
105 LogSchema::new(LogEntry::SpeculationCache).block_id(self.id()),
106 "Block dropped"
107 );
108 }
109}
110
111pub struct SpeculationCache {
119 synced_trees: ExecutedTrees,
120 committed_trees: ExecutedTrees,
121 committed_txns_and_events: (Vec<Transaction>, Vec<ContractEvent>),
122 committed_block_id: HashValue,
124 heads: Vec<Arc<Mutex<SpeculationBlock>>>,
126 block_map: Arc<Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>>,
129}
130
131impl SpeculationCache {
132 pub fn new() -> Self {
133 Self {
134 synced_trees: ExecutedTrees::new_empty(),
135 committed_trees: ExecutedTrees::new_empty(),
136 committed_txns_and_events: (vec![], vec![]),
137 heads: vec![],
138 block_map: Arc::new(Mutex::new(HashMap::new())),
139 committed_block_id: *PRE_GENESIS_BLOCK_ID,
140 }
141 }
142
143 pub fn new_with_startup_info(startup_info: StartupInfo) -> Self {
144 let mut cache = Self::new();
145 let ledger_info = startup_info.latest_ledger_info.ledger_info();
146 let committed_trees = ExecutedTrees::new_with_pos_state(
147 startup_info.committed_tree_state,
148 startup_info.committed_pos_state,
149 );
150 cache.update_block_tree_root(
151 committed_trees,
152 ledger_info,
153 vec![], vec![], );
156 if let Some(synced_tree_state) = startup_info.synced_tree_state {
157 cache.update_synced_trees(ExecutedTrees::from(synced_tree_state));
160 }
161 cache
162 }
163
164 pub fn new_for_db_bootstrapping(
165 tree_state: TreeState, pos_state: PosState,
166 ) -> Self {
167 let executor_trees =
170 ExecutedTrees::new_with_pos_state(tree_state, pos_state);
171 Self {
172 synced_trees: executor_trees.clone(),
173 committed_trees: executor_trees,
174 committed_txns_and_events: (vec![], vec![]),
175 heads: vec![],
176 block_map: Arc::new(Mutex::new(HashMap::new())),
177 committed_block_id: *PRE_GENESIS_BLOCK_ID,
178 }
179 }
180
181 pub fn committed_txns_and_events(
182 &self,
183 ) -> (Vec<Transaction>, Vec<ContractEvent>) {
184 self.committed_txns_and_events.clone()
185 }
186
187 pub fn committed_block_id(&self) -> HashValue { self.committed_block_id }
188
189 pub fn committed_trees(&self) -> &ExecutedTrees { &self.committed_trees }
190
191 pub fn synced_trees(&self) -> &ExecutedTrees { &self.synced_trees }
192
193 pub fn update_block_tree_root(
194 &mut self, mut committed_trees: ExecutedTrees,
195 committed_ledger_info: &LedgerInfo, committed_txns: Vec<Transaction>,
196 reconfig_events: Vec<ContractEvent>,
197 ) {
198 let new_root_block_id = if committed_ledger_info.ends_epoch() {
199 let id = Block::make_genesis_block_from_ledger_info(
202 committed_ledger_info,
203 )
204 .id();
205 diem_info!(
206 LogSchema::new(LogEntry::SpeculationCache)
207 .root_block_id(id)
208 .original_reconfiguration_block_id(committed_ledger_info.consensus_block_id()),
209 "Updated with a new root block as a virtual block of reconfiguration block"
210 );
211 committed_trees.set_pos_state_skipped(false);
212 id
213 } else {
214 let id = committed_ledger_info.consensus_block_id();
215 diem_info!(
216 LogSchema::new(LogEntry::SpeculationCache).root_block_id(id),
217 "Updated with a new root block",
218 );
219 id
220 };
221 self.committed_block_id = new_root_block_id;
222 self.committed_trees = committed_trees.clone();
223 self.committed_txns_and_events = (committed_txns, reconfig_events);
224 self.synced_trees = committed_trees;
225 }
226
227 pub fn update_synced_trees(&mut self, new_trees: ExecutedTrees) {
228 self.synced_trees = new_trees;
229 }
230
231 pub fn reset(&mut self) {
232 self.heads = vec![];
233 *self.block_map.lock() = HashMap::new();
234 }
235
236 pub fn add_block(
237 &mut self, parent_block_id: HashValue,
238 block: (
239 HashValue, Vec<Transaction>, ProcessedVMOutput, ),
243 ) -> Result<(), Error> {
244 let (block_id, txns, output) = block;
246
247 let old_block = self
249 .block_map
250 .lock()
251 .get(&block_id)
252 .map(|b| {
253 b.upgrade().ok_or_else(|| {
254 format_err!(
255 "block {:x} has been deallocated. Something went wrong.",
256 block_id
257 )
258 })
259 })
260 .transpose()?;
261
262 if let Some(old_block) = old_block {
263 old_block.lock().replace(txns, output);
264 return Ok(());
265 }
266
267 let new_block = Arc::new(Mutex::new(SpeculationBlock::new(
268 block_id,
269 txns,
270 output,
271 Arc::clone(&self.block_map),
272 )));
273 self.block_map
275 .lock()
276 .insert(block_id, Arc::downgrade(&new_block));
277 if parent_block_id == self.committed_block_id() {
279 self.heads.push(new_block);
280 } else {
281 self.get_block(&parent_block_id)?
282 .lock()
283 .add_child(new_block);
284 }
285 Ok(())
286 }
287
288 pub fn prune(
290 &mut self, committed_ledger_info: &LedgerInfo,
291 committed_txns: Vec<Transaction>, reconfig_events: Vec<ContractEvent>,
292 ) -> Result<HashValue, Error> {
293 let old_committed_root = self.committed_block_id;
294 let arc_latest_committed_block =
295 self.get_block(&committed_ledger_info.consensus_block_id())?;
296 let latest_committed_block = arc_latest_committed_block.lock();
297 self.heads = latest_committed_block.children.clone();
298 self.update_block_tree_root(
299 latest_committed_block.output().executed_trees().clone(),
300 committed_ledger_info,
301 committed_txns,
302 reconfig_events,
303 );
304 Ok(old_committed_root)
305 }
306
307 pub fn get_block(
309 &self, block_id: &HashValue,
310 ) -> Result<Arc<Mutex<SpeculationBlock>>, Error> {
311 Ok(self
312 .block_map
313 .lock()
314 .get(&block_id)
315 .ok_or_else(|| Error::BlockNotFound(*block_id))?
316 .upgrade()
317 .ok_or_else(|| {
318 format_err!(
319 "block {:x} has been deallocated. Something went wrong.",
320 block_id
321 )
322 })?)
323 }
324}