cfxcore/pos/consensus/consensusdb/
mod.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8#[cfg(test)]
9mod consensusdb_test;
10mod schema;
11
12use crate::pos::consensus::{
13    consensusdb::schema::{
14        block::BlockSchema,
15        ledger_block::LedgerBlockSchema,
16        quorum_certificate::QCSchema,
17        single_entry::{SingleEntryKey, SingleEntrySchema},
18        staking_event::StakingEventsSchema,
19        STAKING_EVENTS_CF_NAME,
20    },
21    error::DbError,
22};
23use anyhow::{anyhow, Result};
24use cfx_types::H256;
25use consensus_types::{
26    block::Block, db::LedgerBlockRW, quorum_cert::QuorumCert,
27};
28use diem_crypto::HashValue;
29use diem_logger::prelude::*;
30use diem_types::block_info::PivotBlockDecision;
31use pow_types::StakingEvent;
32use schema::{
33    BLOCK_CF_NAME, LEDGER_BLOCK_CF_NAME, QC_CF_NAME, SINGLE_ENTRY_CF_NAME,
34};
35use schemadb::{Options, ReadOptions, SchemaBatch, DB, DEFAULT_CF_NAME};
36use std::{collections::HashMap, iter::Iterator, path::Path, time::Instant};
37
/// Storage handle for PoS consensus data.
///
/// Wraps a single schema [`DB`] that `ConsensusDB::new` opens with the
/// block, quorum-certificate, single-entry, ledger-block and staking-event
/// column families.
pub struct ConsensusDB {
    /// Underlying schema database; every read and write in this module goes
    /// through this handle.
    db: DB,
}
42
43impl ConsensusDB {
44    /// new
45    pub fn new<P: AsRef<Path> + Clone>(db_root_path: P) -> Self {
46        let column_families = vec![
47            /* UNUSED CF = */ DEFAULT_CF_NAME,
48            BLOCK_CF_NAME,
49            QC_CF_NAME,
50            SINGLE_ENTRY_CF_NAME,
51            LEDGER_BLOCK_CF_NAME,
52            STAKING_EVENTS_CF_NAME,
53        ];
54
55        let path = db_root_path.as_ref().join("consensusdb");
56        let instant = Instant::now();
57        let mut opts = Options::default();
58        opts.create_if_missing(true);
59        opts.create_missing_column_families(true);
60        let db = DB::open(path.clone(), "consensus", column_families, opts)
61            .expect("ConsensusDB open failed; unable to continue");
62
63        diem_info!(
64            "Opened ConsensusDB at {:?} in {} ms",
65            path,
66            instant.elapsed().as_millis()
67        );
68
69        Self { db }
70    }
71
72    /// get_data
73    pub fn get_data(
74        &self,
75    ) -> Result<(
76        Option<Vec<u8>>,
77        Option<Vec<u8>>,
78        Vec<Block>,
79        Vec<QuorumCert>,
80    )> {
81        let last_vote = self.get_last_vote()?;
82        let highest_timeout_certificate =
83            self.get_highest_timeout_certificate()?;
84        let consensus_blocks = self
85            .get_blocks()?
86            .into_iter()
87            .map(|(_block_hash, block_content)| block_content)
88            .collect::<Vec<_>>();
89        let consensus_qcs = self
90            .get_quorum_certificates()?
91            .into_iter()
92            .map(|(_block_hash, qc)| qc)
93            .collect::<Vec<_>>();
94        Ok((
95            last_vote,
96            highest_timeout_certificate,
97            consensus_blocks,
98            consensus_qcs,
99        ))
100    }
101
102    /// save_highest_timeout_certificate
103    pub fn save_highest_timeout_certificate(
104        &self, highest_timeout_certificate: Vec<u8>,
105    ) -> Result<(), DbError> {
106        let mut batch = SchemaBatch::new();
107        batch.put::<SingleEntrySchema>(
108            &SingleEntryKey::HighestTimeoutCertificate,
109            &highest_timeout_certificate,
110        )?;
111        self.commit(batch, false)?;
112        Ok(())
113    }
114
115    /// save_vote
116    pub fn save_vote(&self, last_vote: Vec<u8>) -> Result<(), DbError> {
117        let mut batch = SchemaBatch::new();
118        batch.put::<SingleEntrySchema>(
119            &SingleEntryKey::LastVoteMsg,
120            &last_vote,
121        )?;
122        self.commit(batch, false)
123    }
124
125    /// save_blocks_and_quorum_certificates
126    pub fn save_blocks_and_quorum_certificates(
127        &self, block_data: Vec<Block>, qc_data: Vec<QuorumCert>,
128    ) -> Result<(), DbError> {
129        if block_data.is_empty() && qc_data.is_empty() {
130            return Err(anyhow::anyhow!(
131                "Consensus block and qc data is empty!"
132            )
133            .into());
134        }
135        let mut batch = SchemaBatch::new();
136        block_data.iter().try_for_each(|block| {
137            batch.put::<BlockSchema>(&block.id(), block)
138        })?;
139        qc_data.iter().try_for_each(|qc| {
140            batch.put::<QCSchema>(&qc.certified_block().id(), qc)
141        })?;
142        self.commit(batch, false)
143    }
144
145    /// delete_blocks_and_quorum_certificates
146    pub fn delete_blocks_and_quorum_certificates(
147        &self, block_ids: Vec<HashValue>,
148    ) -> Result<(), DbError> {
149        if block_ids.is_empty() {
150            return Err(anyhow::anyhow!("Consensus block ids is empty!").into());
151        }
152        let mut batch = SchemaBatch::new();
153        block_ids.iter().try_for_each(|hash| {
154            batch.delete::<BlockSchema>(hash)?;
155            batch.delete::<QCSchema>(hash)
156        })?;
157        self.commit(batch, false)
158    }
159
160    /// Write the whole schema batch including all data necessary to mutate the
161    /// ledger state of some transaction by leveraging rocksdb atomicity
162    /// support.
163    fn commit(
164        &self, batch: SchemaBatch, fast_write: bool,
165    ) -> Result<(), DbError> {
166        self.db.write_schemas(batch, fast_write)?;
167        Ok(())
168    }
169
170    /// Get latest timeout certificates (we only store the latest highest
171    /// timeout certificates).
172    fn get_highest_timeout_certificate(
173        &self,
174    ) -> Result<Option<Vec<u8>>, DbError> {
175        Ok(self.db.get::<SingleEntrySchema>(
176            &SingleEntryKey::HighestTimeoutCertificate,
177        )?)
178    }
179
180    /// Delete the timeout certificates
181    pub fn delete_highest_timeout_certificate(&self) -> Result<(), DbError> {
182        let mut batch = SchemaBatch::new();
183        batch.delete::<SingleEntrySchema>(
184            &SingleEntryKey::HighestTimeoutCertificate,
185        )?;
186        self.commit(batch, false)
187    }
188
189    /// Get serialized latest vote (if available)
190    fn get_last_vote(&self) -> Result<Option<Vec<u8>>, DbError> {
191        Ok(self
192            .db
193            .get::<SingleEntrySchema>(&SingleEntryKey::LastVoteMsg)?)
194    }
195
196    /// delete_last_vote_msg
197    pub fn delete_last_vote_msg(&self) -> Result<(), DbError> {
198        let mut batch = SchemaBatch::new();
199        batch.delete::<SingleEntrySchema>(&SingleEntryKey::LastVoteMsg)?;
200        self.commit(batch, false)?;
201        Ok(())
202    }
203
204    /// Get all consensus blocks.
205    pub fn get_blocks(&self) -> Result<HashMap<HashValue, Block>, DbError> {
206        let mut iter = self.db.iter::<BlockSchema>(ReadOptions::default())?;
207        iter.seek_to_first();
208        Ok(iter.collect::<Result<HashMap<HashValue, Block>>>()?)
209    }
210
211    /// Get all consensus QCs.
212    pub fn get_quorum_certificates(
213        &self,
214    ) -> Result<HashMap<HashValue, QuorumCert>, DbError> {
215        let mut iter = self.db.iter::<QCSchema>(ReadOptions::default())?;
216        iter.seek_to_first();
217        Ok(iter.collect::<Result<HashMap<HashValue, QuorumCert>>>()?)
218    }
219
220    /// Save pow staking events.
221    pub fn put_staking_events(
222        &self, pow_epoch_number: u64, pow_epoch_hash: H256,
223        events: Vec<StakingEvent>,
224    ) -> Result<(), DbError> {
225        let mut batch = SchemaBatch::new();
226        batch.put::<StakingEventsSchema>(
227            &pow_epoch_number,
228            &(events, pow_epoch_hash),
229        )?;
230        self.commit(batch, true)
231    }
232
233    /// Save staking events between two pivot decisions.
234    pub fn get_staking_events(
235        &self, parent_decision: PivotBlockDecision,
236        me_decision: PivotBlockDecision,
237    ) -> Result<Vec<StakingEvent>, DbError> {
238        diem_debug!(
239            "consensusdb::get_staking_events: parent={:?} me={:?}",
240            parent_decision,
241            me_decision
242        );
243        if parent_decision == me_decision {
244            return Ok(vec![]);
245        }
246        if me_decision.height <= parent_decision.height {
247            return Err(anyhow!("only forward querying allowed").into());
248        }
249        let mut read_opt = ReadOptions::default();
250        // lower bound is inclusive
251        read_opt.set_iterate_lower_bound(
252            parent_decision.height.to_be_bytes().to_vec(),
253        );
254        // upper bound is exclusive
255        read_opt.set_iterate_upper_bound(
256            (me_decision.height + 1).to_be_bytes().to_vec(),
257        );
258        let mut staking_events = Vec::with_capacity(
259            (me_decision.height - parent_decision.height + 1) as usize,
260        );
261        let mut iter = self.db.iter::<StakingEventsSchema>(read_opt)?;
262        iter.seek_to_first();
263        let mut expected_epoch_number = parent_decision.height;
264        for element in iter {
265            let (pow_epoch_number, (mut events, pow_epoch_hash)) = element?;
266            if pow_epoch_number != expected_epoch_number {
267                return Err(anyhow!(
268                    "skipped staking events, expected={}, get={}",
269                    expected_epoch_number,
270                    pow_epoch_number
271                )
272                .into());
273            }
274            if pow_epoch_number == parent_decision.height
275                && pow_epoch_hash != parent_decision.block_hash
276            {
277                return Err(anyhow!("inconsistent parent epoch hash, height={} expected={:?}, get={:?}", pow_epoch_number, parent_decision.block_hash, pow_epoch_hash).into());
278            }
279            if pow_epoch_number == me_decision.height
280                && pow_epoch_hash != me_decision.block_hash
281            {
282                return Err(anyhow!("inconsistent me epoch hash, height={} expected={:?}, get={:?}", pow_epoch_number, me_decision.block_hash, pow_epoch_hash).into());
283            }
284            if pow_epoch_number != parent_decision.height {
285                // Skip the events in parent_decision since they are in the
286                // previous pos block.
287                staking_events.append(&mut events);
288            }
289            expected_epoch_number += 1;
290        }
291        if expected_epoch_number != me_decision.height + 1 {
292            return Err(anyhow!(
293                "incomplete staking events, reach height={} me_decision={:?}",
294                expected_epoch_number,
295                me_decision
296            )
297            .into());
298        }
299        diem_debug!(
300            "consensusdb::get_staking_events returns len={} ",
301            staking_events.len()
302        );
303        Ok(staking_events)
304    }
305
306    /// Delete all staking events before an PoW epoch number after we have
307    /// committed a PoS block that processes this PoW pivot decision.
308    pub fn delete_staking_events_before(
309        &self, committed_pow_epoch_number: u64,
310    ) -> Result<(), DbError> {
311        self.db
312            .range_delete::<StakingEventsSchema, u64>(
313                &0,
314                &committed_pow_epoch_number,
315            )
316            .map_err(|e| e.into())
317    }
318}
319
320impl LedgerBlockRW for ConsensusDB {
321    /// get_ledger_block
322    fn get_ledger_block(&self, block_id: &HashValue) -> Result<Option<Block>> {
323        Ok(self.db.get::<LedgerBlockSchema>(block_id)?)
324    }
325
326    /// save_ledger_blocks
327    fn save_ledger_blocks(&self, blocks: Vec<Block>) -> Result<()> {
328        let mut batch = SchemaBatch::new();
329        for block in blocks {
330            batch.put::<LedgerBlockSchema>(&block.id(), &block)?;
331        }
332        Ok(self.commit(batch, true)?)
333    }
334
335    /// Get qc for not committed blocks.
336    fn get_qc_for_block(
337        &self, block_id: &HashValue,
338    ) -> Result<Option<QuorumCert>> {
339        Ok(self.db.get::<QCSchema>(block_id)?)
340    }
341}