cfxcore/pos/consensus/consensusdb/
mod.rs1#[cfg(test)]
9mod consensusdb_test;
10mod schema;
11
12use crate::pos::consensus::{
13 consensusdb::schema::{
14 block::BlockSchema,
15 ledger_block::LedgerBlockSchema,
16 quorum_certificate::QCSchema,
17 single_entry::{SingleEntryKey, SingleEntrySchema},
18 staking_event::StakingEventsSchema,
19 STAKING_EVENTS_CF_NAME,
20 },
21 error::DbError,
22};
23use anyhow::{anyhow, Result};
24use cfx_types::H256;
25use consensus_types::{
26 block::Block, db::LedgerBlockRW, quorum_cert::QuorumCert,
27};
28use diem_crypto::HashValue;
29use diem_logger::prelude::*;
30use diem_types::block_info::PivotBlockDecision;
31use pow_types::StakingEvent;
32use schema::{
33 BLOCK_CF_NAME, LEDGER_BLOCK_CF_NAME, QC_CF_NAME, SINGLE_ENTRY_CF_NAME,
34};
35use schemadb::{Options, ReadOptions, SchemaBatch, DB, DEFAULT_CF_NAME};
36use std::{collections::HashMap, iter::Iterator, path::Path, time::Instant};
37
38pub struct ConsensusDB {
40 db: DB,
41}
42
43impl ConsensusDB {
44 pub fn new<P: AsRef<Path> + Clone>(db_root_path: P) -> Self {
46 let column_families = vec![
47 DEFAULT_CF_NAME,
48 BLOCK_CF_NAME,
49 QC_CF_NAME,
50 SINGLE_ENTRY_CF_NAME,
51 LEDGER_BLOCK_CF_NAME,
52 STAKING_EVENTS_CF_NAME,
53 ];
54
55 let path = db_root_path.as_ref().join("consensusdb");
56 let instant = Instant::now();
57 let mut opts = Options::default();
58 opts.create_if_missing(true);
59 opts.create_missing_column_families(true);
60 let db = DB::open(path.clone(), "consensus", column_families, opts)
61 .expect("ConsensusDB open failed; unable to continue");
62
63 diem_info!(
64 "Opened ConsensusDB at {:?} in {} ms",
65 path,
66 instant.elapsed().as_millis()
67 );
68
69 Self { db }
70 }
71
72 pub fn get_data(
74 &self,
75 ) -> Result<(
76 Option<Vec<u8>>,
77 Option<Vec<u8>>,
78 Vec<Block>,
79 Vec<QuorumCert>,
80 )> {
81 let last_vote = self.get_last_vote()?;
82 let highest_timeout_certificate =
83 self.get_highest_timeout_certificate()?;
84 let consensus_blocks = self
85 .get_blocks()?
86 .into_iter()
87 .map(|(_block_hash, block_content)| block_content)
88 .collect::<Vec<_>>();
89 let consensus_qcs = self
90 .get_quorum_certificates()?
91 .into_iter()
92 .map(|(_block_hash, qc)| qc)
93 .collect::<Vec<_>>();
94 Ok((
95 last_vote,
96 highest_timeout_certificate,
97 consensus_blocks,
98 consensus_qcs,
99 ))
100 }
101
102 pub fn save_highest_timeout_certificate(
104 &self, highest_timeout_certificate: Vec<u8>,
105 ) -> Result<(), DbError> {
106 let mut batch = SchemaBatch::new();
107 batch.put::<SingleEntrySchema>(
108 &SingleEntryKey::HighestTimeoutCertificate,
109 &highest_timeout_certificate,
110 )?;
111 self.commit(batch, false)?;
112 Ok(())
113 }
114
115 pub fn save_vote(&self, last_vote: Vec<u8>) -> Result<(), DbError> {
117 let mut batch = SchemaBatch::new();
118 batch.put::<SingleEntrySchema>(
119 &SingleEntryKey::LastVoteMsg,
120 &last_vote,
121 )?;
122 self.commit(batch, false)
123 }
124
125 pub fn save_blocks_and_quorum_certificates(
127 &self, block_data: Vec<Block>, qc_data: Vec<QuorumCert>,
128 ) -> Result<(), DbError> {
129 if block_data.is_empty() && qc_data.is_empty() {
130 return Err(anyhow::anyhow!(
131 "Consensus block and qc data is empty!"
132 )
133 .into());
134 }
135 let mut batch = SchemaBatch::new();
136 block_data.iter().try_for_each(|block| {
137 batch.put::<BlockSchema>(&block.id(), block)
138 })?;
139 qc_data.iter().try_for_each(|qc| {
140 batch.put::<QCSchema>(&qc.certified_block().id(), qc)
141 })?;
142 self.commit(batch, false)
143 }
144
145 pub fn delete_blocks_and_quorum_certificates(
147 &self, block_ids: Vec<HashValue>,
148 ) -> Result<(), DbError> {
149 if block_ids.is_empty() {
150 return Err(anyhow::anyhow!("Consensus block ids is empty!").into());
151 }
152 let mut batch = SchemaBatch::new();
153 block_ids.iter().try_for_each(|hash| {
154 batch.delete::<BlockSchema>(hash)?;
155 batch.delete::<QCSchema>(hash)
156 })?;
157 self.commit(batch, false)
158 }
159
160 fn commit(
164 &self, batch: SchemaBatch, fast_write: bool,
165 ) -> Result<(), DbError> {
166 self.db.write_schemas(batch, fast_write)?;
167 Ok(())
168 }
169
170 fn get_highest_timeout_certificate(
173 &self,
174 ) -> Result<Option<Vec<u8>>, DbError> {
175 Ok(self.db.get::<SingleEntrySchema>(
176 &SingleEntryKey::HighestTimeoutCertificate,
177 )?)
178 }
179
180 pub fn delete_highest_timeout_certificate(&self) -> Result<(), DbError> {
182 let mut batch = SchemaBatch::new();
183 batch.delete::<SingleEntrySchema>(
184 &SingleEntryKey::HighestTimeoutCertificate,
185 )?;
186 self.commit(batch, false)
187 }
188
189 fn get_last_vote(&self) -> Result<Option<Vec<u8>>, DbError> {
191 Ok(self
192 .db
193 .get::<SingleEntrySchema>(&SingleEntryKey::LastVoteMsg)?)
194 }
195
196 pub fn delete_last_vote_msg(&self) -> Result<(), DbError> {
198 let mut batch = SchemaBatch::new();
199 batch.delete::<SingleEntrySchema>(&SingleEntryKey::LastVoteMsg)?;
200 self.commit(batch, false)?;
201 Ok(())
202 }
203
204 pub fn get_blocks(&self) -> Result<HashMap<HashValue, Block>, DbError> {
206 let mut iter = self.db.iter::<BlockSchema>(ReadOptions::default())?;
207 iter.seek_to_first();
208 Ok(iter.collect::<Result<HashMap<HashValue, Block>>>()?)
209 }
210
211 pub fn get_quorum_certificates(
213 &self,
214 ) -> Result<HashMap<HashValue, QuorumCert>, DbError> {
215 let mut iter = self.db.iter::<QCSchema>(ReadOptions::default())?;
216 iter.seek_to_first();
217 Ok(iter.collect::<Result<HashMap<HashValue, QuorumCert>>>()?)
218 }
219
220 pub fn put_staking_events(
222 &self, pow_epoch_number: u64, pow_epoch_hash: H256,
223 events: Vec<StakingEvent>,
224 ) -> Result<(), DbError> {
225 let mut batch = SchemaBatch::new();
226 batch.put::<StakingEventsSchema>(
227 &pow_epoch_number,
228 &(events, pow_epoch_hash),
229 )?;
230 self.commit(batch, true)
231 }
232
233 pub fn get_staking_events(
235 &self, parent_decision: PivotBlockDecision,
236 me_decision: PivotBlockDecision,
237 ) -> Result<Vec<StakingEvent>, DbError> {
238 diem_debug!(
239 "consensusdb::get_staking_events: parent={:?} me={:?}",
240 parent_decision,
241 me_decision
242 );
243 if parent_decision == me_decision {
244 return Ok(vec![]);
245 }
246 if me_decision.height <= parent_decision.height {
247 return Err(anyhow!("only forward querying allowed").into());
248 }
249 let mut read_opt = ReadOptions::default();
250 read_opt.set_iterate_lower_bound(
252 parent_decision.height.to_be_bytes().to_vec(),
253 );
254 read_opt.set_iterate_upper_bound(
256 (me_decision.height + 1).to_be_bytes().to_vec(),
257 );
258 let mut staking_events = Vec::with_capacity(
259 (me_decision.height - parent_decision.height + 1) as usize,
260 );
261 let mut iter = self.db.iter::<StakingEventsSchema>(read_opt)?;
262 iter.seek_to_first();
263 let mut expected_epoch_number = parent_decision.height;
264 for element in iter {
265 let (pow_epoch_number, (mut events, pow_epoch_hash)) = element?;
266 if pow_epoch_number != expected_epoch_number {
267 return Err(anyhow!(
268 "skipped staking events, expected={}, get={}",
269 expected_epoch_number,
270 pow_epoch_number
271 )
272 .into());
273 }
274 if pow_epoch_number == parent_decision.height
275 && pow_epoch_hash != parent_decision.block_hash
276 {
277 return Err(anyhow!("inconsistent parent epoch hash, height={} expected={:?}, get={:?}", pow_epoch_number, parent_decision.block_hash, pow_epoch_hash).into());
278 }
279 if pow_epoch_number == me_decision.height
280 && pow_epoch_hash != me_decision.block_hash
281 {
282 return Err(anyhow!("inconsistent me epoch hash, height={} expected={:?}, get={:?}", pow_epoch_number, me_decision.block_hash, pow_epoch_hash).into());
283 }
284 if pow_epoch_number != parent_decision.height {
285 staking_events.append(&mut events);
288 }
289 expected_epoch_number += 1;
290 }
291 if expected_epoch_number != me_decision.height + 1 {
292 return Err(anyhow!(
293 "incomplete staking events, reach height={} me_decision={:?}",
294 expected_epoch_number,
295 me_decision
296 )
297 .into());
298 }
299 diem_debug!(
300 "consensusdb::get_staking_events returns len={} ",
301 staking_events.len()
302 );
303 Ok(staking_events)
304 }
305
306 pub fn delete_staking_events_before(
309 &self, committed_pow_epoch_number: u64,
310 ) -> Result<(), DbError> {
311 self.db
312 .range_delete::<StakingEventsSchema, u64>(
313 &0,
314 &committed_pow_epoch_number,
315 )
316 .map_err(|e| e.into())
317 }
318}
319
320impl LedgerBlockRW for ConsensusDB {
321 fn get_ledger_block(&self, block_id: &HashValue) -> Result<Option<Block>> {
323 Ok(self.db.get::<LedgerBlockSchema>(block_id)?)
324 }
325
326 fn save_ledger_blocks(&self, blocks: Vec<Block>) -> Result<()> {
328 let mut batch = SchemaBatch::new();
329 for block in blocks {
330 batch.put::<LedgerBlockSchema>(&block.id(), &block)?;
331 }
332 Ok(self.commit(batch, true)?)
333 }
334
335 fn get_qc_for_block(
337 &self, block_id: &HashValue,
338 ) -> Result<Option<QuorumCert>> {
339 Ok(self.db.get::<QCSchema>(block_id)?)
340 }
341}