cached_pos_ledger_db/speculation_cache/
mod.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8//! In a leader based consensus algorithm, each participant maintains a block
9//! tree that looks like the following in the executor:
10//! ```text
11//!  Height      5      6      7      ...
12//!
13//! Committed -> B5  -> B6  -> B7
14//!         |
15//!         └--> B5' -> B6' -> B7'
16//!                     |
17//!                     └----> B7"
18//! ```
19//! This module implements `SpeculationCache` that is an in-memory
20//! representation of this tree.
21
22#[cfg(test)]
23mod test;
24
25use crate::logging::{LogEntry, LogSchema};
26use anyhow::{format_err, Result};
27use consensus_types::block::Block;
28use diem_crypto::{hash::PRE_GENESIS_BLOCK_ID, HashValue};
29use diem_logger::prelude::*;
30use diem_types::{
31    ledger_info::LedgerInfo, term_state::PosState, transaction::Transaction,
32};
33use executor_types::{Error, ExecutedTrees, ProcessedVMOutput};
34use parking_lot::Mutex;
35use std::{
36    collections::HashMap,
37    sync::{Arc, Weak},
38};
39use storage_interface::{StartupInfo, TreeState};
40
41/// The struct that stores all speculation result of its counterpart in
42/// consensus.
43pub struct SpeculationBlock {
44    // The block id of which the output is computed from.
45    id: HashValue,
46    // The transactions in the block.
47    transactions: Vec<Transaction>,
48    // The pointers to all the children blocks.
49    children: Vec<Arc<Mutex<SpeculationBlock>>>,
50    // The speculative execution result.
51    output: ProcessedVMOutput,
52    // A pointer to the global block map keyed by id to achieve O(1) lookup
53    // time complexity.
54    block_map: Arc<Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>>,
55}
56
57impl SpeculationBlock {
58    pub fn new(
59        id: HashValue, transactions: Vec<Transaction>,
60        output: ProcessedVMOutput,
61        block_map: Arc<
62            Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>,
63        >,
64    ) -> Self {
65        Self {
66            id,
67            transactions,
68            children: vec![],
69            output,
70            block_map,
71        }
72    }
73
74    pub fn id(&self) -> HashValue { self.id }
75
76    pub fn transactions(&self) -> &Vec<Transaction> { &self.transactions }
77
78    pub fn add_child(&mut self, child: Arc<Mutex<SpeculationBlock>>) {
79        self.children.push(child)
80    }
81
82    pub fn output(&self) -> &ProcessedVMOutput { &self.output }
83
84    pub fn replace(
85        &mut self, transactions: Vec<Transaction>, output: ProcessedVMOutput,
86    ) {
87        self.transactions = transactions;
88        self.output = output;
89        self.children = vec![];
90    }
91
92    pub fn replace_pos_state(&mut self, new_pos_state: PosState) {
93        self.output.replace_pos_state(new_pos_state)
94    }
95}
96
97/// drop() will clean the current block entry from the global map.
98impl Drop for SpeculationBlock {
99    fn drop(&mut self) {
100        self.block_map.lock().remove(&self.id()).expect(
101            "Speculation block must exist in block_map before being dropped.",
102        );
103        diem_debug!(
104            LogSchema::new(LogEntry::SpeculationCache).block_id(self.id()),
105            "Block dropped"
106        );
107    }
108}
109
110/// SpeculationCache implements the block tree structure. The tree is
111/// represented by a root block id, all the children of root and a global block
112/// map. Each block is an `Arc<Mutex<SpeculationBlock>>` with ref_count = 1. For
113/// the chidren of the root, the sole owner is `heads`. For the rest, the sole
114/// owner is their parent block. So when a block is dropped, all its descendants
115/// will be dropped recursively. In the meanwhile, wheir entries in the block
116/// map will be removed by each block's drop().
117pub struct SpeculationCache {
118    synced_trees: ExecutedTrees,
119    committed_trees: ExecutedTrees,
120    committed_txns: Vec<Transaction>,
121    // The id of root block.
122    committed_block_id: HashValue,
123    // The chidren of root block.
124    heads: Vec<Arc<Mutex<SpeculationBlock>>>,
125    // A pointer to the global block map keyed by id to achieve O(1) lookup
126    // time complexity. It is optional but an optimization.
127    block_map: Arc<Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>>,
128}
129
130impl SpeculationCache {
131    pub fn new() -> Self {
132        Self {
133            synced_trees: ExecutedTrees::new_empty(),
134            committed_trees: ExecutedTrees::new_empty(),
135            committed_txns: vec![],
136            heads: vec![],
137            block_map: Arc::new(Mutex::new(HashMap::new())),
138            committed_block_id: *PRE_GENESIS_BLOCK_ID,
139        }
140    }
141
142    pub fn new_with_startup_info(startup_info: StartupInfo) -> Self {
143        let mut cache = Self::new();
144        let ledger_info = startup_info.latest_ledger_info.ledger_info();
145        let committed_trees = ExecutedTrees::new_with_pos_state(
146            startup_info.committed_tree_state,
147            startup_info.committed_pos_state,
148        );
149        cache.update_block_tree_root(
150            committed_trees,
151            ledger_info,
152            vec![], /* latest_committed_txns */
153        );
154        if let Some(synced_tree_state) = startup_info.synced_tree_state {
155            // TODO(lpl): synced_tree_state.pos_state is left unhandled since
156            // this is not used.
157            cache.update_synced_trees(ExecutedTrees::from(synced_tree_state));
158        }
159        cache
160    }
161
162    pub fn new_for_db_bootstrapping(
163        tree_state: TreeState, pos_state: PosState,
164    ) -> Self {
165        // The DB-bootstrapper applies genesis txn on a local DB and create a
166        // waypoint, assuming everything is synced and committed.
167        let executor_trees =
168            ExecutedTrees::new_with_pos_state(tree_state, pos_state);
169        Self {
170            synced_trees: executor_trees.clone(),
171            committed_trees: executor_trees,
172            committed_txns: vec![],
173            heads: vec![],
174            block_map: Arc::new(Mutex::new(HashMap::new())),
175            committed_block_id: *PRE_GENESIS_BLOCK_ID,
176        }
177    }
178
179    pub fn committed_txns(&self) -> Vec<Transaction> {
180        self.committed_txns.clone()
181    }
182
183    pub fn committed_block_id(&self) -> HashValue { self.committed_block_id }
184
185    pub fn committed_trees(&self) -> &ExecutedTrees { &self.committed_trees }
186
187    pub fn synced_trees(&self) -> &ExecutedTrees { &self.synced_trees }
188
189    pub fn update_block_tree_root(
190        &mut self, mut committed_trees: ExecutedTrees,
191        committed_ledger_info: &LedgerInfo, committed_txns: Vec<Transaction>,
192    ) {
193        let new_root_block_id = if committed_ledger_info.ends_epoch() {
194            // Update the root block id with reconfig virtual block id, to be
195            // consistent with the logic of Consensus.
196            let id = Block::make_genesis_block_from_ledger_info(
197                committed_ledger_info,
198            )
199            .id();
200            diem_info!(
201                LogSchema::new(LogEntry::SpeculationCache)
202                    .root_block_id(id)
203                    .original_reconfiguration_block_id(committed_ledger_info.consensus_block_id()),
204                "Updated with a new root block as a virtual block of reconfiguration block"
205            );
206            committed_trees.set_pos_state_skipped(false);
207            id
208        } else {
209            let id = committed_ledger_info.consensus_block_id();
210            diem_info!(
211                LogSchema::new(LogEntry::SpeculationCache).root_block_id(id),
212                "Updated with a new root block",
213            );
214            id
215        };
216        self.committed_block_id = new_root_block_id;
217        self.committed_trees = committed_trees.clone();
218        self.committed_txns = committed_txns;
219        self.synced_trees = committed_trees;
220    }
221
222    pub fn update_synced_trees(&mut self, new_trees: ExecutedTrees) {
223        self.synced_trees = new_trees;
224    }
225
226    pub fn reset(&mut self) {
227        self.heads = vec![];
228        *self.block_map.lock() = HashMap::new();
229    }
230
231    pub fn add_block(
232        &mut self, parent_block_id: HashValue,
233        block: (
234            HashValue,         /* block id */
235            Vec<Transaction>,  /* block transactions */
236            ProcessedVMOutput, /* block execution output */
237        ),
238    ) -> Result<(), Error> {
239        // Check existence first
240        let (block_id, txns, output) = block;
241
242        // If block is re-executed, update it.
243        let old_block = self
244            .block_map
245            .lock()
246            .get(&block_id)
247            .map(|b| {
248                b.upgrade().ok_or_else(|| {
249                    format_err!(
250                        "block {:x} has been deallocated. Something went wrong.",
251                        block_id
252                    )
253                })
254            })
255            .transpose()?;
256
257        if let Some(old_block) = old_block {
258            old_block.lock().replace(txns, output);
259            return Ok(());
260        }
261
262        let new_block = Arc::new(Mutex::new(SpeculationBlock::new(
263            block_id,
264            txns,
265            output,
266            Arc::clone(&self.block_map),
267        )));
268        // Add to the map
269        self.block_map
270            .lock()
271            .insert(block_id, Arc::downgrade(&new_block));
272        // Add to the tree
273        if parent_block_id == self.committed_block_id() {
274            self.heads.push(new_block);
275        } else {
276            self.get_block(&parent_block_id)?
277                .lock()
278                .add_child(new_block);
279        }
280        Ok(())
281    }
282
283    /// Return the previous committed block id.
284    pub fn prune(
285        &mut self, committed_ledger_info: &LedgerInfo,
286        committed_txns: Vec<Transaction>,
287    ) -> Result<HashValue, Error> {
288        let old_committed_root = self.committed_block_id;
289        let arc_latest_committed_block =
290            self.get_block(&committed_ledger_info.consensus_block_id())?;
291        let latest_committed_block = arc_latest_committed_block.lock();
292        self.heads = latest_committed_block.children.clone();
293        self.update_block_tree_root(
294            latest_committed_block.output().executed_trees().clone(),
295            committed_ledger_info,
296            committed_txns,
297        );
298        Ok(old_committed_root)
299    }
300
301    // This function is intended to be called internally.
302    pub fn get_block(
303        &self, block_id: &HashValue,
304    ) -> Result<Arc<Mutex<SpeculationBlock>>, Error> {
305        Ok(self
306            .block_map
307            .lock()
308            .get(&block_id)
309            .ok_or_else(|| Error::BlockNotFound(*block_id))?
310            .upgrade()
311            .ok_or_else(|| {
312                format_err!(
313                    "block {:x} has been deallocated. Something went wrong.",
314                    block_id
315                )
316            })?)
317    }
318}