cached_pos_ledger_db/speculation_cache/
mod.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8//! In a leader based consensus algorithm, each participant maintains a block
9//! tree that looks like the following in the executor:
10//! ```text
11//!  Height      5      6      7      ...
12//!
13//! Committed -> B5  -> B6  -> B7
14//!         |
15//!         └--> B5' -> B6' -> B7'
16//!                     |
17//!                     └----> B7"
18//! ```
19//! This module implements `SpeculationCache` that is an in-memory
20//! representation of this tree.
21
22#[cfg(test)]
23mod test;
24
25use crate::logging::{LogEntry, LogSchema};
26use anyhow::{format_err, Result};
27use consensus_types::block::Block;
28use diem_crypto::{hash::PRE_GENESIS_BLOCK_ID, HashValue};
29use diem_infallible::Mutex;
30use diem_logger::prelude::*;
31use diem_types::{
32    contract_event::ContractEvent, ledger_info::LedgerInfo,
33    term_state::PosState, transaction::Transaction,
34};
35use executor_types::{Error, ExecutedTrees, ProcessedVMOutput};
36use std::{
37    collections::HashMap,
38    sync::{Arc, Weak},
39};
40use storage_interface::{StartupInfo, TreeState};
41
/// The struct that stores all speculation result of its counterpart in
/// consensus.
///
/// A block holds strong references to its children and a handle to the
/// shared block map. The map itself only stores `Weak` references, so it
/// never keeps a block alive on its own.
pub struct SpeculationBlock {
    // The id of the block from which the output is computed.
    id: HashValue,
    // The transactions in the block.
    transactions: Vec<Transaction>,
    // Strong pointers to all the children blocks.
    children: Vec<Arc<Mutex<SpeculationBlock>>>,
    // The speculative execution result.
    output: ProcessedVMOutput,
    // A pointer to the global block map keyed by id to achieve O(1) lookup
    // time complexity. The `Drop` implementation of this type uses it to
    // remove the block's own entry.
    block_map: Arc<Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>>,
}
57
58impl SpeculationBlock {
59    pub fn new(
60        id: HashValue, transactions: Vec<Transaction>,
61        output: ProcessedVMOutput,
62        block_map: Arc<
63            Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>,
64        >,
65    ) -> Self {
66        Self {
67            id,
68            transactions,
69            children: vec![],
70            output,
71            block_map,
72        }
73    }
74
75    pub fn id(&self) -> HashValue { self.id }
76
77    pub fn transactions(&self) -> &Vec<Transaction> { &self.transactions }
78
79    pub fn add_child(&mut self, child: Arc<Mutex<SpeculationBlock>>) {
80        self.children.push(child)
81    }
82
83    pub fn output(&self) -> &ProcessedVMOutput { &self.output }
84
85    pub fn replace(
86        &mut self, transactions: Vec<Transaction>, output: ProcessedVMOutput,
87    ) {
88        self.transactions = transactions;
89        self.output = output;
90        self.children = vec![];
91    }
92
93    pub fn replace_pos_state(&mut self, new_pos_state: PosState) {
94        self.output.replace_pos_state(new_pos_state)
95    }
96}
97
98/// drop() will clean the current block entry from the global map.
99impl Drop for SpeculationBlock {
100    fn drop(&mut self) {
101        self.block_map.lock().remove(&self.id()).expect(
102            "Speculation block must exist in block_map before being dropped.",
103        );
104        diem_debug!(
105            LogSchema::new(LogEntry::SpeculationCache).block_id(self.id()),
106            "Block dropped"
107        );
108    }
109}
110
/// SpeculationCache implements the block tree structure. The tree is
/// represented by a root block id, all the children of root and a global block
/// map. Each block is an `Arc<Mutex<SpeculationBlock>>` with ref_count = 1. For
/// the children of the root, the sole owner is `heads`. For the rest, the sole
/// owner is their parent block. So when a block is dropped, all its descendants
/// will be dropped recursively. In the meanwhile, their entries in the block
/// map will be removed by each block's drop().
pub struct SpeculationCache {
    // Trees for the synced state. Set together with `committed_trees` on
    // every root update and overwritten by `update_synced_trees`.
    synced_trees: ExecutedTrees,
    // Trees for the committed state, i.e. the state at the root block.
    committed_trees: ExecutedTrees,
    // Transactions and events passed to the most recent root update.
    committed_txns_and_events: (Vec<Transaction>, Vec<ContractEvent>),
    // The id of root block.
    committed_block_id: HashValue,
    // The children of root block.
    heads: Vec<Arc<Mutex<SpeculationBlock>>>,
    // A pointer to the global block map keyed by id to achieve O(1) lookup
    // time complexity. It is optional but an optimization.
    block_map: Arc<Mutex<HashMap<HashValue, Weak<Mutex<SpeculationBlock>>>>>,
}
130
131impl SpeculationCache {
132    pub fn new() -> Self {
133        Self {
134            synced_trees: ExecutedTrees::new_empty(),
135            committed_trees: ExecutedTrees::new_empty(),
136            committed_txns_and_events: (vec![], vec![]),
137            heads: vec![],
138            block_map: Arc::new(Mutex::new(HashMap::new())),
139            committed_block_id: *PRE_GENESIS_BLOCK_ID,
140        }
141    }
142
143    pub fn new_with_startup_info(startup_info: StartupInfo) -> Self {
144        let mut cache = Self::new();
145        let ledger_info = startup_info.latest_ledger_info.ledger_info();
146        let committed_trees = ExecutedTrees::new_with_pos_state(
147            startup_info.committed_tree_state,
148            startup_info.committed_pos_state,
149        );
150        cache.update_block_tree_root(
151            committed_trees,
152            ledger_info,
153            vec![], /* lastest_committed_txns */
154            vec![], /* latest_reconfig_events */
155        );
156        if let Some(synced_tree_state) = startup_info.synced_tree_state {
157            // TODO(lpl): synced_tree_state.pos_state is left unhandled since
158            // this is not used.
159            cache.update_synced_trees(ExecutedTrees::from(synced_tree_state));
160        }
161        cache
162    }
163
164    pub fn new_for_db_bootstrapping(
165        tree_state: TreeState, pos_state: PosState,
166    ) -> Self {
167        // The DB-bootstrapper applies genesis txn on a local DB and create a
168        // waypoint, assuming everything is synced and committed.
169        let executor_trees =
170            ExecutedTrees::new_with_pos_state(tree_state, pos_state);
171        Self {
172            synced_trees: executor_trees.clone(),
173            committed_trees: executor_trees,
174            committed_txns_and_events: (vec![], vec![]),
175            heads: vec![],
176            block_map: Arc::new(Mutex::new(HashMap::new())),
177            committed_block_id: *PRE_GENESIS_BLOCK_ID,
178        }
179    }
180
181    pub fn committed_txns_and_events(
182        &self,
183    ) -> (Vec<Transaction>, Vec<ContractEvent>) {
184        self.committed_txns_and_events.clone()
185    }
186
187    pub fn committed_block_id(&self) -> HashValue { self.committed_block_id }
188
189    pub fn committed_trees(&self) -> &ExecutedTrees { &self.committed_trees }
190
191    pub fn synced_trees(&self) -> &ExecutedTrees { &self.synced_trees }
192
193    pub fn update_block_tree_root(
194        &mut self, mut committed_trees: ExecutedTrees,
195        committed_ledger_info: &LedgerInfo, committed_txns: Vec<Transaction>,
196        reconfig_events: Vec<ContractEvent>,
197    ) {
198        let new_root_block_id = if committed_ledger_info.ends_epoch() {
199            // Update the root block id with reconfig virtual block id, to be
200            // consistent with the logic of Consensus.
201            let id = Block::make_genesis_block_from_ledger_info(
202                committed_ledger_info,
203            )
204            .id();
205            diem_info!(
206                LogSchema::new(LogEntry::SpeculationCache)
207                    .root_block_id(id)
208                    .original_reconfiguration_block_id(committed_ledger_info.consensus_block_id()),
209                "Updated with a new root block as a virtual block of reconfiguration block"
210            );
211            committed_trees.set_pos_state_skipped(false);
212            id
213        } else {
214            let id = committed_ledger_info.consensus_block_id();
215            diem_info!(
216                LogSchema::new(LogEntry::SpeculationCache).root_block_id(id),
217                "Updated with a new root block",
218            );
219            id
220        };
221        self.committed_block_id = new_root_block_id;
222        self.committed_trees = committed_trees.clone();
223        self.committed_txns_and_events = (committed_txns, reconfig_events);
224        self.synced_trees = committed_trees;
225    }
226
227    pub fn update_synced_trees(&mut self, new_trees: ExecutedTrees) {
228        self.synced_trees = new_trees;
229    }
230
231    pub fn reset(&mut self) {
232        self.heads = vec![];
233        *self.block_map.lock() = HashMap::new();
234    }
235
236    pub fn add_block(
237        &mut self, parent_block_id: HashValue,
238        block: (
239            HashValue,         /* block id */
240            Vec<Transaction>,  /* block transactions */
241            ProcessedVMOutput, /* block execution output */
242        ),
243    ) -> Result<(), Error> {
244        // Check existence first
245        let (block_id, txns, output) = block;
246
247        // If block is re-executed, update it.
248        let old_block = self
249            .block_map
250            .lock()
251            .get(&block_id)
252            .map(|b| {
253                b.upgrade().ok_or_else(|| {
254                    format_err!(
255                        "block {:x} has been deallocated. Something went wrong.",
256                        block_id
257                    )
258                })
259            })
260            .transpose()?;
261
262        if let Some(old_block) = old_block {
263            old_block.lock().replace(txns, output);
264            return Ok(());
265        }
266
267        let new_block = Arc::new(Mutex::new(SpeculationBlock::new(
268            block_id,
269            txns,
270            output,
271            Arc::clone(&self.block_map),
272        )));
273        // Add to the map
274        self.block_map
275            .lock()
276            .insert(block_id, Arc::downgrade(&new_block));
277        // Add to the tree
278        if parent_block_id == self.committed_block_id() {
279            self.heads.push(new_block);
280        } else {
281            self.get_block(&parent_block_id)?
282                .lock()
283                .add_child(new_block);
284        }
285        Ok(())
286    }
287
288    /// Return the previous committed block id.
289    pub fn prune(
290        &mut self, committed_ledger_info: &LedgerInfo,
291        committed_txns: Vec<Transaction>, reconfig_events: Vec<ContractEvent>,
292    ) -> Result<HashValue, Error> {
293        let old_committed_root = self.committed_block_id;
294        let arc_latest_committed_block =
295            self.get_block(&committed_ledger_info.consensus_block_id())?;
296        let latest_committed_block = arc_latest_committed_block.lock();
297        self.heads = latest_committed_block.children.clone();
298        self.update_block_tree_root(
299            latest_committed_block.output().executed_trees().clone(),
300            committed_ledger_info,
301            committed_txns,
302            reconfig_events,
303        );
304        Ok(old_committed_root)
305    }
306
307    // This function is intended to be called internally.
308    pub fn get_block(
309        &self, block_id: &HashValue,
310    ) -> Result<Arc<Mutex<SpeculationBlock>>, Error> {
311        Ok(self
312            .block_map
313            .lock()
314            .get(&block_id)
315            .ok_or_else(|| Error::BlockNotFound(*block_id))?
316            .upgrade()
317            .ok_or_else(|| {
318                format_err!(
319                    "block {:x} has been deallocated. Something went wrong.",
320                    block_id
321                )
322            })?)
323    }
324}