cached_pos_ledger_db/
lib.rs

1// Copyright 2020 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use std::sync::Arc;
6
7use anyhow::{format_err, Result};
8
9use diem_crypto::HashValue;
10use diem_logger::prelude::*;
11use diem_types::{
12    account_address::AccountAddress,
13    block_info::PivotBlockDecision,
14    ledger_info::LedgerInfo,
15    term_state::{NodeID, PosState},
16    transaction::Transaction,
17};
18use executor_types::{Error, ExecutedTrees, ProcessedVMOutput};
19use parking_lot::Mutex;
20pub use speculation_cache::{SpeculationBlock, SpeculationCache};
21use storage_interface::{DbReaderWriter, TreeState};
22
23mod logging;
24mod speculation_cache;
25
26pub struct CachedPosLedgerDB {
27    pub db: DbReaderWriter,
28    pub cache: Mutex<SpeculationCache>,
29}
30
31impl CachedPosLedgerDB {
32    pub fn new(db: DbReaderWriter) -> Self {
33        let startup_info = db
34            .reader
35            .get_startup_info(true)
36            .expect("Shouldn't fail")
37            .expect("DB not bootstrapped.");
38
39        Self {
40            db,
41            cache: Mutex::new(SpeculationCache::new_with_startup_info(
42                startup_info,
43            )),
44        }
45    }
46
47    fn get_executed_trees(
48        &self, block_id: HashValue,
49    ) -> Result<ExecutedTrees, Error> {
50        diem_debug!(
51            "get_executed_trees:{} {}",
52            block_id,
53            self.cache.lock().committed_block_id()
54        );
55        let executed_trees =
56            if block_id == self.cache.lock().committed_block_id() {
57                self.cache.lock().committed_trees().clone()
58            } else {
59                self.get_block(&block_id)?
60                    .lock()
61                    .output()
62                    .executed_trees()
63                    .clone()
64            };
65
66        Ok(executed_trees)
67    }
68
69    pub fn get_pos_state(
70        &self, block_id: &HashValue,
71    ) -> Result<PosState, Error> {
72        if let Ok(executed_tree) = self.get_executed_trees(*block_id) {
73            Ok(executed_tree.pos_state().clone())
74        } else {
75            self.db.reader.get_pos_state(block_id).map_err(|_| {
76                Error::InternalError {
77                    error: "pos state not found".to_string(),
78                }
79            })
80        }
81    }
82
83    pub fn reset_cache(&self) -> Result<(), Error> {
84        let startup_info = self
85            .db
86            .reader
87            .get_startup_info(true)?
88            .ok_or_else(|| format_err!("DB not bootstrapped."))?;
89        *(self.cache.lock()) =
90            SpeculationCache::new_with_startup_info(startup_info);
91        Ok(())
92    }
93
94    pub fn new_on_unbootstrapped_db(
95        db: DbReaderWriter, tree_state: TreeState, initial_seed: Vec<u8>,
96        initial_nodes: Vec<(NodeID, u64)>,
97        initial_committee: Vec<(AccountAddress, u64)>,
98        genesis_pivot_decision: Option<PivotBlockDecision>,
99    ) -> Self {
100        // if initial_nodes.is_empty() {
101        //     let access_paths = ON_CHAIN_CONFIG_REGISTRY
102        //         .iter()
103        //         .map(|config_id| config_id.access_path())
104        //         .collect();
105        //     let configs = db
106        //         .reader
107        //         .as_ref()
108        //         .batch_fetch_resources_by_version(access_paths, 0)
109        //         .unwrap();
110        //     let validators: ValidatorSet = OnChainConfigPayload::new(
111        //         0,
112        //         Arc::new(
113        //             ON_CHAIN_CONFIG_REGISTRY
114        //                 .iter()
115        //                 .cloned()
116        //                 .zip_eq(configs)
117        //                 .collect(),
118        //         ),
119        //     )
120        //     .get()
121        //     .unwrap();
122        //     for node in validators {
123        //         let node_id = NodeID::new(
124        //             node.consensus_public_key().clone(),
125        //             node.vrf_public_key().clone().unwrap(),
126        //         );
127        //         initial_nodes.push((node_id, node.consensus_voting_power()));
128        //     }
129        // }
130        // TODO(lpl): The default value is only for pos-tool.
131        let genesis_pivot_decision =
132            genesis_pivot_decision.unwrap_or(PivotBlockDecision {
133                block_hash: Default::default(),
134                height: 0,
135            });
136        let pos_state = PosState::new(
137            initial_seed,
138            initial_nodes,
139            initial_committee,
140            genesis_pivot_decision,
141        );
142        Self {
143            db,
144            cache: Mutex::new(SpeculationCache::new_for_db_bootstrapping(
145                tree_state, pos_state,
146            )),
147        }
148    }
149
150    pub fn committed_block_id(&self) -> HashValue {
151        return self.cache.lock().committed_block_id();
152    }
153
154    pub fn update_block_tree_root(
155        &self, committed_trees: ExecutedTrees,
156        committed_ledger_info: &LedgerInfo, committed_txns: Vec<Transaction>,
157    ) {
158        self.cache.lock().update_block_tree_root(
159            committed_trees,
160            committed_ledger_info,
161            committed_txns,
162        )
163    }
164
165    pub fn update_synced_trees(&self, new_trees: ExecutedTrees) {
166        self.cache.lock().update_synced_trees(new_trees)
167    }
168
169    pub fn add_block(
170        &self, parent_block_id: HashValue,
171        block: (
172            HashValue,         /* block id */
173            Vec<Transaction>,  /* block transactions */
174            ProcessedVMOutput, /* block execution output */
175        ),
176    ) -> Result<(), Error> {
177        self.cache.lock().add_block(parent_block_id, block)
178    }
179
180    pub fn reset(&self) { self.cache.lock().reset() }
181
182    pub fn prune(
183        &self, committed_ledger_info: &LedgerInfo,
184        committed_txns: Vec<Transaction>,
185    ) -> Result<HashValue, Error> {
186        self.cache
187            .lock()
188            .prune(committed_ledger_info, committed_txns)
189    }
190
191    pub fn get_block(
192        &self, block_id: &HashValue,
193    ) -> Result<Arc<Mutex<SpeculationBlock>>, Error> {
194        self.cache.lock().get_block(block_id)
195    }
196}