1#![forbid(unsafe_code)]
9
10use std::{iter::Iterator, path::Path, sync::Arc, time::Instant};
18
19use anyhow::{ensure, Result};
20use itertools::{izip, zip_eq};
21
22use diem_config::config::RocksdbConfig;
23use diem_crypto::hash::{
24 CryptoHash, HashValue, SPARSE_MERKLE_PLACEHOLDER_HASH,
25};
26use diem_logger::prelude::*;
27use diem_types::{
28 committed_block::CommittedBlock,
29 contract_event::ContractEvent,
30 epoch_change::EpochChangeProof,
31 ledger_info::LedgerInfoWithSignatures,
32 reward_distribution_event::RewardDistributionEventV2,
33 term_state::PosState,
34 transaction::{Transaction, TransactionInfo, TransactionToCommit, Version},
35};
36#[cfg(feature = "fuzzing")]
37pub use diemdb_test::test_save_blocks_impl;
38use schemadb::{ColumnFamilyName, Options, DB, DEFAULT_CF_NAME};
39#[cfg(any(test, feature = "fuzzing"))]
40use storage_interface::Order;
41use storage_interface::{
42 DBReaderForPoW, DbReader, DbWriter, StartupInfo, TreeState,
43};
44
45use crate::{
46 change_set::{ChangeSet, SealedChangeSet},
47 event_store::EventStore,
48 ledger_store::LedgerStore,
49 schema::*,
50 transaction_store::TransactionStore,
51};
52use diem_types::block_metadata::BlockMetadata;
53
54#[cfg(any(test, feature = "fuzzing"))]
56pub mod test_helper;
57
58pub mod errors;
59pub mod schema;
60
61mod change_set;
62mod event_store;
63mod ledger_store;
64mod transaction_store;
65
66#[cfg(any(test, feature = "fuzzing"))]
67#[allow(dead_code)]
68mod diemdb_test;
69
70const MAX_NUM_EPOCH_ENDING_LEDGER_INFO: usize = 100;
74
75fn gen_rocksdb_options(config: &RocksdbConfig) -> Options {
76 let mut db_opts = Options::default();
77 db_opts.set_max_open_files(config.max_open_files);
78 db_opts.set_max_total_wal_size(config.max_total_wal_size);
79 db_opts
80}
81
82#[derive(Debug)]
85pub struct PosLedgerDB {
86 db: Arc<DB>,
87 ledger_store: Arc<LedgerStore>,
88 transaction_store: Arc<TransactionStore>,
89 event_store: Arc<EventStore>,
90}
91
92impl PosLedgerDB {
93 fn column_families() -> Vec<ColumnFamilyName> {
94 vec![
95 DEFAULT_CF_NAME,
96 EPOCH_BY_VERSION_CF_NAME,
97 EVENT_ACCUMULATOR_CF_NAME,
98 EVENT_BY_KEY_CF_NAME,
99 EVENT_BY_VERSION_CF_NAME,
100 EVENT_CF_NAME,
101 JELLYFISH_MERKLE_NODE_CF_NAME,
102 LEDGER_COUNTERS_CF_NAME,
103 STALE_NODE_INDEX_CF_NAME,
104 TRANSACTION_CF_NAME,
105 TRANSACTION_ACCUMULATOR_CF_NAME,
106 TRANSACTION_BY_ACCOUNT_CF_NAME,
107 TRANSACTION_INFO_CF_NAME,
108 LEDGER_INFO_BY_BLOCK_CF_NAME,
109 POS_STATE_CF_NAME,
110 REWARD_EVENT_CF_NAME,
111 COMMITTED_BLOCK_CF_NAME,
112 COMMITTED_BLOCK_BY_VIEW_CF_NAME,
113 LEDGER_INFO_BY_VOTED_BLOCK_CF_NAME,
114 BLOCK_BY_EPOCH_AND_ROUND_CF_NAME,
115 ]
116 }
117
118 fn new_with_db(db: DB) -> Self {
119 let db = Arc::new(db);
120
121 PosLedgerDB {
122 db: Arc::clone(&db),
123 event_store: Arc::new(EventStore::new(Arc::clone(&db))),
124 ledger_store: Arc::new(LedgerStore::new(Arc::clone(&db))),
125 transaction_store: Arc::new(TransactionStore::new(Arc::clone(&db))),
126 }
127 }
128
129 pub fn open<P: AsRef<Path> + Clone>(
130 db_root_path: P, readonly: bool, rocksdb_config: RocksdbConfig,
131 ) -> Result<Self> {
132 let path = db_root_path.as_ref().join("pos-ledger-db");
133 let instant = Instant::now();
134
135 let mut rocksdb_opts = gen_rocksdb_options(&rocksdb_config);
136
137 let db = if readonly {
138 DB::open_readonly(
139 path.clone(),
140 "diemdb_ro",
141 Self::column_families(),
142 rocksdb_opts,
143 )?
144 } else {
145 rocksdb_opts.create_if_missing(true);
146 rocksdb_opts.create_missing_column_families(true);
147 DB::open(
148 path.clone(),
149 "pos-ledger-db",
150 Self::column_families(),
151 rocksdb_opts,
152 )?
153 };
154
155 let ret = Self::new_with_db(db);
156 diem_info!(
157 path = path,
158 time_ms = %instant.elapsed().as_millis(),
159 "Opened PosLedgerDB.",
160 );
161 Ok(ret)
162 }
163
164 #[cfg(any(test, feature = "fuzzing"))]
166 pub fn new_for_test<P: AsRef<Path> + Clone>(db_root_path: P) -> Self {
167 Self::open(
168 db_root_path,
169 false, RocksdbConfig::default(),
171 )
172 .expect("Unable to open DiemDB")
173 }
174
175 fn get_epoch_ending_ledger_infos(
181 &self, start_epoch: u64, end_epoch: u64, limit: usize,
182 ) -> Result<(Vec<LedgerInfoWithSignatures>, bool)> {
183 self.get_epoch_ending_ledger_infos_impl(start_epoch, end_epoch, limit)
184 }
185
186 fn get_epoch_ending_ledger_infos_impl(
187 &self, start_epoch: u64, end_epoch: u64, limit: usize,
188 ) -> Result<(Vec<LedgerInfoWithSignatures>, bool)> {
189 ensure!(
190 start_epoch <= end_epoch,
191 "Bad epoch range [{}, {})",
192 start_epoch,
193 end_epoch,
194 );
195 let latest_epoch = self
199 .ledger_store
200 .get_latest_ledger_info()?
201 .ledger_info()
202 .next_block_epoch();
203 ensure!(
204 end_epoch <= latest_epoch,
205 "Unable to provide epoch change ledger info for still open epoch. asked upper bound: {}, last sealed epoch: {}",
206 end_epoch,
207 latest_epoch - 1, );
209
210 let (paging_epoch, more) = if end_epoch - start_epoch > limit as u64 {
211 (start_epoch + limit as u64, true)
212 } else {
213 (end_epoch, false)
214 };
215
216 let lis = self
217 .ledger_store
218 .get_epoch_ending_ledger_info_iter(start_epoch, paging_epoch)?
219 .collect::<Result<Vec<_>>>()?;
220 ensure!(
221 lis.len() == (paging_epoch - start_epoch) as usize,
222 "DB corruption: missing epoch ending ledger info for epoch {}",
223 lis.last()
224 .map(|li| li.ledger_info().next_block_epoch())
225 .unwrap_or(start_epoch),
226 );
227 Ok((lis, more))
228 }
229
230 pub fn get_transaction(&self, version: Version) -> Result<Transaction> {
232 self.transaction_store.get_transaction(version)
233 }
234
235 pub fn get_transaction_block_meta(
236 &self, version: Version,
237 ) -> Result<Option<(Version, BlockMetadata)>> {
238 self.transaction_store.get_block_metadata(version)
239 }
240
241 pub fn get_transaction_info(
242 &self, version: u64,
243 ) -> Result<TransactionInfo> {
244 self.ledger_store.get_transaction_info(version)
245 }
246
247 fn seal_change_set(&self, cs: ChangeSet) -> Result<SealedChangeSet> {
249 Ok(SealedChangeSet { batch: cs.batch })
250 }
251
252 fn save_transactions_impl(
253 &self, txns_to_commit: &[TransactionToCommit], first_version: u64,
254 mut cs: &mut ChangeSet,
255 ) -> Result<HashValue> {
256 let last_version = first_version + txns_to_commit.len() as u64 - 1;
257
258 let state_root_hashes = vec![Default::default(); txns_to_commit.len()];
260
261 let event_root_hashes =
263 zip_eq(first_version..=last_version, txns_to_commit)
264 .map(|(ver, txn_to_commit)| {
265 self.event_store.put_events(
266 ver,
267 txn_to_commit.events(),
268 &mut cs,
269 )
270 })
271 .collect::<Result<Vec<_>>>()?;
272
273 zip_eq(first_version..=last_version, txns_to_commit).try_for_each(
275 |(ver, txn_to_commit)| {
276 self.transaction_store.put_transaction(
277 ver,
278 txn_to_commit.transaction(),
279 &mut cs,
280 )
281 },
282 )?;
283
284 let txn_infos =
286 izip!(txns_to_commit, state_root_hashes, event_root_hashes)
287 .map(|(t, s, e)| {
288 Ok(TransactionInfo::new(
289 t.transaction().hash(),
290 s,
291 e,
292 t.gas_used(),
293 t.status().clone(),
294 ))
295 })
296 .collect::<Result<Vec<_>>>()?;
297 assert_eq!(txn_infos.len(), txns_to_commit.len());
298
299 let new_root_hash = self.ledger_store.put_transaction_infos(
300 first_version,
301 &txn_infos,
302 &mut cs,
303 )?;
304
305 Ok(new_root_hash)
306 }
307
308 fn commit(&self, sealed_cs: SealedChangeSet) -> Result<()> {
312 self.db.write_schemas(sealed_cs.batch, false)?;
313
314 Ok(())
315 }
316}
317
318impl DbReader for PosLedgerDB {
319 fn get_epoch_ending_ledger_infos(
320 &self, start_epoch: u64, end_epoch: u64,
321 ) -> Result<EpochChangeProof> {
322 gauged_api("get_epoch_ending_ledger_infos", || {
323 let (ledger_info_with_sigs, more) =
324 Self::get_epoch_ending_ledger_infos(
325 &self,
326 start_epoch,
327 end_epoch,
328 MAX_NUM_EPOCH_ENDING_LEDGER_INFO,
329 )?;
330 Ok(EpochChangeProof::new(ledger_info_with_sigs, more))
331 })
332 }
333
334 fn get_block_timestamp(&self, version: u64) -> Result<u64> {
335 gauged_api("get_block_timestamp", || {
336 let ts = match self.transaction_store.get_block_metadata(version)? {
337 Some((_v, block_meta)) => block_meta.into_inner().1,
338 None => 0,
340 };
341 Ok(ts)
342 })
343 }
344
345 fn get_latest_ledger_info(&self) -> Result<LedgerInfoWithSignatures> {
346 gauged_api("get_latest_ledger_info", || {
347 self.ledger_store.get_latest_ledger_info()
348 })
349 }
350
351 fn get_startup_info(
352 &self, need_pos_state: bool,
353 ) -> Result<Option<StartupInfo>> {
354 gauged_api("get_startup_info", || {
355 self.ledger_store.get_startup_info(need_pos_state)
356 })
357 }
358
359 fn get_latest_tree_state(&self) -> Result<TreeState> {
360 gauged_api("get_latest_tree_state", || {
361 let tree_state = match self
362 .ledger_store
363 .get_latest_transaction_info_option()?
364 {
365 Some((version, _txn_info)) => {
366 let frozen_subtrees = self
367 .ledger_store
368 .get_frozen_subtree_hashes(version + 1)?;
369 TreeState::new(
370 version + 1,
371 frozen_subtrees,
372 *SPARSE_MERKLE_PLACEHOLDER_HASH,
373 )
374 }
375 None => {
376 TreeState::new(0, vec![], *SPARSE_MERKLE_PLACEHOLDER_HASH)
377 }
378 };
379
380 diem_info!(
381 num_transactions = tree_state.num_transactions,
382 state_root_hash = %tree_state.account_state_root_hash,
383 description = tree_state.describe(),
384 "Got latest TreeState."
385 );
386
387 Ok(tree_state)
388 })
389 }
390
391 fn get_epoch_ending_ledger_info(
393 &self, version: u64,
394 ) -> Result<LedgerInfoWithSignatures> {
395 gauged_api("get_epoch_ending_ledger_info", || {
396 self.ledger_store.get_epoch_ending_ledger_info(version)
397 })
398 }
399
400 fn get_latest_transaction_info_option(
401 &self,
402 ) -> Result<Option<(Version, TransactionInfo)>> {
403 gauged_api("get_latest_transaction_info_option", || {
404 self.ledger_store.get_latest_transaction_info_option()
405 })
406 }
407
408 fn get_pos_state(&self, block_id: &HashValue) -> Result<PosState> {
409 diem_debug!("get_pos_state:{}", block_id);
410 self.ledger_store.get_pos_state(block_id)
411 }
412
413 fn get_latest_pos_state(&self) -> Arc<PosState> {
414 self.ledger_store.get_latest_pos_state()
415 }
416}
417
418impl DbWriter for PosLedgerDB {
419 fn save_transactions(
427 &self, txns_to_commit: &[TransactionToCommit], first_version: Version,
428 ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
429 pos_state: Option<PosState>, committed_blocks: Vec<CommittedBlock>,
430 ledger_infos_with_voted_block: Vec<(
431 HashValue,
432 LedgerInfoWithSignatures,
433 )>,
434 ) -> Result<()> {
435 gauged_api("save_transactions", || {
436 let num_txns = txns_to_commit.len() as u64;
437 ensure!(
441 ledger_info_with_sigs.is_some() || num_txns > 0,
442 "txns_to_commit is empty while ledger_info_with_sigs is None.",
443 );
444
445 let mut cs = ChangeSet::new();
447
448 if let Some(x) = ledger_info_with_sigs {
449 let claimed_last_version = x.ledger_info().version();
450 ensure!(
451 claimed_last_version + 1 == first_version + num_txns,
452 "Transaction batch not applicable: first_version {}, num_txns {}, last_version {}",
453 first_version,
454 num_txns,
455 claimed_last_version,
456 );
457 }
458
459 let _new_root_hash = self.save_transactions_impl(
460 txns_to_commit,
461 first_version,
462 &mut cs,
463 )?;
464
465 for b in committed_blocks {
466 self.ledger_store.put_committed_block(&b, &mut cs)?;
467 }
468
469 for (voted_block, ledger_info) in ledger_infos_with_voted_block {
470 self.ledger_store.put_ledger_info_by_voted_block(
471 &voted_block,
472 &ledger_info,
473 &mut cs,
474 )?;
475 }
476
477 if let Some(x) = ledger_info_with_sigs {
480 self.ledger_store.put_ledger_info(x, &mut cs)?;
490 if let Some(pos_state) = pos_state {
491 self.ledger_store.put_pos_state(
493 &x.ledger_info().consensus_block_id(),
494 pos_state,
495 &mut cs,
496 )?;
497 }
498 }
499
500 let sealed_cs = self.seal_change_set(cs)?;
502 self.commit(sealed_cs)?;
503
504 if let Some(x) = ledger_info_with_sigs {
507 self.ledger_store.set_latest_ledger_info(x.clone());
508 }
509
510 Ok(())
511 })
512 }
513
514 fn save_reward_event(
515 &self, epoch: u64, event: &RewardDistributionEventV2,
516 ) -> Result<()> {
517 self.ledger_store.put_reward_event(epoch, event)
518 }
519
520 fn delete_pos_state_by_block(&self, block_id: &HashValue) -> Result<()> {
521 self.ledger_store.delete_pos_state(block_id)
522 }
523}
524
525impl DBReaderForPoW for PosLedgerDB {
526 fn get_latest_ledger_info_option(
527 &self,
528 ) -> Option<LedgerInfoWithSignatures> {
529 self.ledger_store.get_latest_ledger_info_option()
530 }
531
532 fn get_block_ledger_info(
533 &self, consensus_block_id: &HashValue,
534 ) -> Result<LedgerInfoWithSignatures> {
535 self.ledger_store.get_block_ledger_info(consensus_block_id)
536 }
537
538 fn get_events_by_version(
539 &self, start_version: u64, end_version: u64,
540 ) -> Result<Vec<ContractEvent>> {
541 let iter = self.event_store.get_events_by_version_iter(
542 start_version,
543 (end_version - start_version) as usize,
544 )?;
545 let events_vec = iter.collect::<Result<Vec<Vec<ContractEvent>>>>()?;
546 Ok(events_vec.into_iter().flatten().collect())
547 }
548
549 fn get_epoch_ending_blocks(
550 &self, start_epoch: u64, end_epoch: u64,
551 ) -> Result<Vec<HashValue>> {
552 let mut ending_blocks = Vec::new();
553 for ledger_info in self
554 .ledger_store
555 .get_epoch_ending_ledger_info_iter(start_epoch, end_epoch)?
556 {
557 ending_blocks.push(ledger_info?.ledger_info().consensus_block_id());
558 }
559 Ok(ending_blocks)
560 }
561
562 fn get_reward_event(
563 &self, epoch: u64,
564 ) -> Result<RewardDistributionEventV2> {
565 self.ledger_store.get_reward_event(epoch)
566 }
567
568 fn get_committed_block_by_hash(
569 &self, block_hash: &HashValue,
570 ) -> Result<CommittedBlock> {
571 self.ledger_store.get_committed_block_by_hash(block_hash)
572 }
573
574 fn get_committed_block_hash_by_view(&self, view: u64) -> Result<HashValue> {
575 self.ledger_store.get_committed_block_hash_by_view(view)
576 }
577
578 fn get_ledger_info_by_voted_block(
579 &self, block_id: &HashValue,
580 ) -> Result<LedgerInfoWithSignatures> {
581 self.ledger_store.get_ledger_info_by_voted_block(block_id)
582 }
583
584 fn get_block_hash_by_epoch_and_round(
585 &self, epoch: u64, round: u64,
586 ) -> Result<HashValue> {
587 self.ledger_store
588 .get_block_hash_by_epoch_and_round(epoch, round)
589 }
590}
591
592#[cfg(any(test, feature = "fuzzing"))]
593fn get_first_seq_num_and_limit(
595 order: Order, cursor: u64, limit: u64,
596) -> Result<(u64, u64)> {
597 ensure!(limit > 0, "limit should > 0, got {}", limit);
598
599 Ok(if order == Order::Ascending {
600 (cursor, limit)
601 } else if limit <= cursor {
602 (cursor - limit + 1, limit)
603 } else {
604 (0, cursor + 1)
605 })
606}
607
608fn gauged_api<T, F>(api_name: &'static str, api_impl: F) -> Result<T>
609where F: FnOnce() -> Result<T> {
610 let res = api_impl();
611
612 if let Err(e) = &res {
613 diem_warn!(
614 api_name = api_name,
615 error = ?e,
616 "DiemDB API returned error."
617 );
618 }
619
620 res
621}