cached_pos_ledger_db/speculation_cache/
mod.rs1#[cfg(test)]
23mod test;
24
25use crate::logging::{LogEntry, LogSchema};
26use anyhow::{format_err, Result};
27use consensus_types::block::Block;
28use diem_crypto::{hash::PRE_GENESIS_BLOCK_ID, HashValue};
29use diem_logger::prelude::*;
30use diem_types::{
31 ledger_info::LedgerInfo, term_state::PosState, transaction::Transaction,
32};
33use executor_types::{Error, ExecutedTrees, ProcessedVMOutput};
34use parking_lot::Mutex;
35use std::{
36 collections::HashMap,
37 sync::{Arc, Weak},
38};
39use storage_interface::{StartupInfo, TreeState};
40
41pub struct SpeculationBlock {
44 id: HashValue,
46 transactions: Vec<Transaction>,
48 children: Vec<Arc<Mutex<SpeculationBlock>>>,
50 output: ProcessedVMOutput,
52 block_map: Arc<Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>>,
55}
56
57impl SpeculationBlock {
58 pub fn new(
59 id: HashValue, transactions: Vec<Transaction>,
60 output: ProcessedVMOutput,
61 block_map: Arc<
62 Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>,
63 >,
64 ) -> Self {
65 Self {
66 id,
67 transactions,
68 children: vec![],
69 output,
70 block_map,
71 }
72 }
73
74 pub fn id(&self) -> HashValue { self.id }
75
76 pub fn transactions(&self) -> &Vec<Transaction> { &self.transactions }
77
78 pub fn add_child(&mut self, child: Arc<Mutex<SpeculationBlock>>) {
79 self.children.push(child)
80 }
81
82 pub fn output(&self) -> &ProcessedVMOutput { &self.output }
83
84 pub fn replace(
85 &mut self, transactions: Vec<Transaction>, output: ProcessedVMOutput,
86 ) {
87 self.transactions = transactions;
88 self.output = output;
89 self.children = vec![];
90 }
91
92 pub fn replace_pos_state(&mut self, new_pos_state: PosState) {
93 self.output.replace_pos_state(new_pos_state)
94 }
95}
96
97impl Drop for SpeculationBlock {
99 fn drop(&mut self) {
100 self.block_map.lock().remove(&self.id()).expect(
101 "Speculation block must exist in block_map before being dropped.",
102 );
103 diem_debug!(
104 LogSchema::new(LogEntry::SpeculationCache).block_id(self.id()),
105 "Block dropped"
106 );
107 }
108}
109
110pub struct SpeculationCache {
118 synced_trees: ExecutedTrees,
119 committed_trees: ExecutedTrees,
120 committed_txns: Vec<Transaction>,
121 committed_block_id: HashValue,
123 heads: Vec<Arc<Mutex<SpeculationBlock>>>,
125 block_map: Arc<Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>>,
128}
129
130impl SpeculationCache {
131 pub fn new() -> Self {
132 Self {
133 synced_trees: ExecutedTrees::new_empty(),
134 committed_trees: ExecutedTrees::new_empty(),
135 committed_txns: vec![],
136 heads: vec![],
137 block_map: Arc::new(Mutex::new(HashMap::new())),
138 committed_block_id: *PRE_GENESIS_BLOCK_ID,
139 }
140 }
141
142 pub fn new_with_startup_info(startup_info: StartupInfo) -> Self {
143 let mut cache = Self::new();
144 let ledger_info = startup_info.latest_ledger_info.ledger_info();
145 let committed_trees = ExecutedTrees::new_with_pos_state(
146 startup_info.committed_tree_state,
147 startup_info.committed_pos_state,
148 );
149 cache.update_block_tree_root(
150 committed_trees,
151 ledger_info,
152 vec![], );
154 if let Some(synced_tree_state) = startup_info.synced_tree_state {
155 cache.update_synced_trees(ExecutedTrees::from(synced_tree_state));
158 }
159 cache
160 }
161
162 pub fn new_for_db_bootstrapping(
163 tree_state: TreeState, pos_state: PosState,
164 ) -> Self {
165 let executor_trees =
168 ExecutedTrees::new_with_pos_state(tree_state, pos_state);
169 Self {
170 synced_trees: executor_trees.clone(),
171 committed_trees: executor_trees,
172 committed_txns: vec![],
173 heads: vec![],
174 block_map: Arc::new(Mutex::new(HashMap::new())),
175 committed_block_id: *PRE_GENESIS_BLOCK_ID,
176 }
177 }
178
179 pub fn committed_txns(&self) -> Vec<Transaction> {
180 self.committed_txns.clone()
181 }
182
183 pub fn committed_block_id(&self) -> HashValue { self.committed_block_id }
184
185 pub fn committed_trees(&self) -> &ExecutedTrees { &self.committed_trees }
186
187 pub fn synced_trees(&self) -> &ExecutedTrees { &self.synced_trees }
188
189 pub fn update_block_tree_root(
190 &mut self, mut committed_trees: ExecutedTrees,
191 committed_ledger_info: &LedgerInfo, committed_txns: Vec<Transaction>,
192 ) {
193 let new_root_block_id = if committed_ledger_info.ends_epoch() {
194 let id = Block::make_genesis_block_from_ledger_info(
197 committed_ledger_info,
198 )
199 .id();
200 diem_info!(
201 LogSchema::new(LogEntry::SpeculationCache)
202 .root_block_id(id)
203 .original_reconfiguration_block_id(committed_ledger_info.consensus_block_id()),
204 "Updated with a new root block as a virtual block of reconfiguration block"
205 );
206 committed_trees.set_pos_state_skipped(false);
207 id
208 } else {
209 let id = committed_ledger_info.consensus_block_id();
210 diem_info!(
211 LogSchema::new(LogEntry::SpeculationCache).root_block_id(id),
212 "Updated with a new root block",
213 );
214 id
215 };
216 self.committed_block_id = new_root_block_id;
217 self.committed_trees = committed_trees.clone();
218 self.committed_txns = committed_txns;
219 self.synced_trees = committed_trees;
220 }
221
222 pub fn update_synced_trees(&mut self, new_trees: ExecutedTrees) {
223 self.synced_trees = new_trees;
224 }
225
226 pub fn reset(&mut self) {
227 self.heads = vec![];
228 *self.block_map.lock() = HashMap::new();
229 }
230
231 pub fn add_block(
232 &mut self, parent_block_id: HashValue,
233 block: (
234 HashValue, Vec<Transaction>, ProcessedVMOutput, ),
238 ) -> Result<(), Error> {
239 let (block_id, txns, output) = block;
241
242 let old_block = self
244 .block_map
245 .lock()
246 .get(&block_id)
247 .map(|b| {
248 b.upgrade().ok_or_else(|| {
249 format_err!(
250 "block {:x} has been deallocated. Something went wrong.",
251 block_id
252 )
253 })
254 })
255 .transpose()?;
256
257 if let Some(old_block) = old_block {
258 old_block.lock().replace(txns, output);
259 return Ok(());
260 }
261
262 let new_block = Arc::new(Mutex::new(SpeculationBlock::new(
263 block_id,
264 txns,
265 output,
266 Arc::clone(&self.block_map),
267 )));
268 self.block_map
270 .lock()
271 .insert(block_id, Arc::downgrade(&new_block));
272 if parent_block_id == self.committed_block_id() {
274 self.heads.push(new_block);
275 } else {
276 self.get_block(&parent_block_id)?
277 .lock()
278 .add_child(new_block);
279 }
280 Ok(())
281 }
282
283 pub fn prune(
285 &mut self, committed_ledger_info: &LedgerInfo,
286 committed_txns: Vec<Transaction>,
287 ) -> Result<HashValue, Error> {
288 let old_committed_root = self.committed_block_id;
289 let arc_latest_committed_block =
290 self.get_block(&committed_ledger_info.consensus_block_id())?;
291 let latest_committed_block = arc_latest_committed_block.lock();
292 self.heads = latest_committed_block.children.clone();
293 self.update_block_tree_root(
294 latest_committed_block.output().executed_trees().clone(),
295 committed_ledger_info,
296 committed_txns,
297 );
298 Ok(old_committed_root)
299 }
300
301 pub fn get_block(
303 &self, block_id: &HashValue,
304 ) -> Result<Arc<Mutex<SpeculationBlock>>, Error> {
305 Ok(self
306 .block_map
307 .lock()
308 .get(&block_id)
309 .ok_or_else(|| Error::BlockNotFound(*block_id))?
310 .upgrade()
311 .ok_or_else(|| {
312 format_err!(
313 "block {:x} has been deallocated. Something went wrong.",
314 block_id
315 )
316 })?)
317 }
318}