1#![forbid(unsafe_code)]
9
10use std::{
18 collections::HashMap,
19 iter::Iterator,
20 path::Path,
21 sync::{mpsc, Arc, Mutex},
22 thread::{self, JoinHandle},
23 time::{Duration, Instant},
24};
25
26use anyhow::{ensure, Result};
27use itertools::{izip, zip_eq};
28use once_cell::sync::Lazy;
29
30use diem_config::config::RocksdbConfig;
31use diem_crypto::hash::{
32 CryptoHash, HashValue, SPARSE_MERKLE_PLACEHOLDER_HASH,
33};
34use diem_logger::prelude::*;
35use diem_types::{
36 account_address::AccountAddress,
37 committed_block::CommittedBlock,
38 contract_event::ContractEvent,
39 epoch_change::EpochChangeProof,
40 ledger_info::LedgerInfoWithSignatures,
41 proof::{AccumulatorConsistencyProof, TransactionListProof},
42 reward_distribution_event::RewardDistributionEventV2,
43 term_state::PosState,
44 transaction::{
45 Transaction, TransactionInfo, TransactionListWithProof,
46 TransactionToCommit, TransactionWithProof, Version,
47 },
48};
49#[cfg(feature = "fuzzing")]
50pub use diemdb_test::test_save_blocks_impl;
51use schemadb::{ColumnFamilyName, Options, DB, DEFAULT_CF_NAME};
52#[cfg(any(test, feature = "fuzzing"))]
53use storage_interface::Order;
54use storage_interface::{
55 DBReaderForPoW, DbReader, DbWriter, StartupInfo, TreeState,
56};
57
58use crate::{
59 change_set::{ChangeSet, SealedChangeSet},
60 errors::DiemDbError,
61 event_store::EventStore,
62 ledger_store::LedgerStore,
63 metrics::{
64 DIEM_STORAGE_API_LATENCY_SECONDS, DIEM_STORAGE_COMMITTED_TXNS,
65 DIEM_STORAGE_LATEST_TXN_VERSION, DIEM_STORAGE_LEDGER_VERSION,
66 DIEM_STORAGE_NEXT_BLOCK_EPOCH, DIEM_STORAGE_OTHER_TIMERS_SECONDS,
67 DIEM_STORAGE_ROCKSDB_PROPERTIES,
68 },
69 schema::*,
70 transaction_store::TransactionStore,
71};
72use diem_types::block_metadata::BlockMetadata;
73
74#[cfg(any(test, feature = "fuzzing"))]
76pub mod test_helper;
77
78pub mod errors;
79pub mod metrics;
80pub mod schema;
81
82mod change_set;
83mod event_store;
84mod ledger_store;
85mod transaction_store;
86
87#[cfg(any(test, feature = "fuzzing"))]
88#[allow(dead_code)]
89mod diemdb_test;
90
/// Largest `limit` accepted by `get_transactions`; anything above it is
/// rejected through `error_if_too_many_requested`.
const MAX_LIMIT: u64 = 1_000;

/// Page size handed to the epoch-ending ledger info lookup by
/// `DbReader::get_epoch_ending_ledger_infos`.
const MAX_NUM_EPOCH_ENDING_LEDGER_INFO: usize = 100;
97
98static ROCKSDB_PROPERTY_MAP: Lazy<HashMap<&str, &str>> = Lazy::new(|| {
99 [
100 (
101 "diem_rocksdb_live_sst_files_size_bytes",
102 "rocksdb.live-sst-files-size",
103 ),
104 (
105 "diem_rocksdb_all_memtables_size_bytes",
106 "rocksdb.size-all-mem-tables",
107 ),
108 (
109 "diem_rocksdb_num_running_compactions",
110 "rocksdb.num-running-compactions",
111 ),
112 (
113 "diem_rocksdb_num_running_flushes",
114 "rocksdb.num-running-flushes",
115 ),
116 (
117 "diem_rocksdb_block_cache_usage_bytes",
118 "rocksdb.block-cache-usage",
119 ),
120 (
121 "diem_rocksdb_cf_size_bytes",
122 "rocksdb.estimate-live-data-size",
123 ),
124 ]
125 .iter()
126 .cloned()
127 .collect()
128});
129
130fn error_if_too_many_requested(
131 num_requested: u64, max_allowed: u64,
132) -> Result<()> {
133 if num_requested > max_allowed {
134 Err(DiemDbError::TooManyRequested(num_requested, max_allowed).into())
135 } else {
136 Ok(())
137 }
138}
139
140fn gen_rocksdb_options(config: &RocksdbConfig) -> Options {
141 let mut db_opts = Options::default();
142 db_opts.set_max_open_files(config.max_open_files);
143 db_opts.set_max_total_wal_size(config.max_total_wal_size);
144 db_opts
145}
146
147fn update_rocksdb_properties(db: &DB) -> Result<()> {
148 let _timer = DIEM_STORAGE_OTHER_TIMERS_SECONDS
149 .with_label_values(&["update_rocksdb_properties"])
150 .start_timer();
151 for cf_name in PosLedgerDB::column_families() {
152 for (property_name, rocksdb_property_argument) in &*ROCKSDB_PROPERTY_MAP
153 {
154 DIEM_STORAGE_ROCKSDB_PROPERTIES
155 .with_label_values(&[cf_name, property_name])
156 .set(
157 db.get_property(cf_name, rocksdb_property_argument)? as i64
158 );
159 }
160 }
161 Ok(())
162}
163
/// Owner of the background thread that periodically calls
/// `update_rocksdb_properties`; dropping it stops and joins that thread.
#[derive(Debug)]
struct RocksdbPropertyReporter {
    /// Sending half of the stop channel; a message on it ends the thread.
    sender: Mutex<mpsc::Sender<()>>,
    /// Handle of the reporting thread; taken out (left `None`) during drop.
    join_handle: Option<JoinHandle<()>>,
}
169
170impl RocksdbPropertyReporter {
171 fn new(db: Arc<DB>) -> Self {
172 let (send, recv) = mpsc::channel();
173 let join_handle = Some(thread::spawn(move || loop {
174 if let Err(e) = update_rocksdb_properties(&db) {
175 diem_warn!(
176 error = ?e,
177 "Updating rocksdb property failed."
178 );
179 }
180 match recv.recv_timeout(Duration::from_secs(10)) {
182 Ok(_) => break,
183 Err(mpsc::RecvTimeoutError::Timeout) => (),
184 Err(mpsc::RecvTimeoutError::Disconnected) => break,
185 }
186 }));
187 Self {
188 sender: Mutex::new(send),
189 join_handle,
190 }
191 }
192}
193
194impl Drop for RocksdbPropertyReporter {
195 fn drop(&mut self) {
196 self.sender.lock().unwrap().send(()).unwrap();
198 self.join_handle
199 .take()
200 .expect("Rocksdb property reporting thread must exist.")
201 .join()
202 .expect(
203 "Rocksdb property reporting thread should join peacefully.",
204 );
205 }
206}
207
208#[derive(Debug)]
211pub struct PosLedgerDB {
212 db: Arc<DB>,
213 ledger_store: Arc<LedgerStore>,
214 transaction_store: Arc<TransactionStore>,
215 event_store: Arc<EventStore>,
216 #[allow(dead_code)]
217 rocksdb_property_reporter: RocksdbPropertyReporter,
218}
219
220impl PosLedgerDB {
221 fn column_families() -> Vec<ColumnFamilyName> {
222 vec![
223 DEFAULT_CF_NAME,
224 EPOCH_BY_VERSION_CF_NAME,
225 EVENT_ACCUMULATOR_CF_NAME,
226 EVENT_BY_KEY_CF_NAME,
227 EVENT_BY_VERSION_CF_NAME,
228 EVENT_CF_NAME,
229 JELLYFISH_MERKLE_NODE_CF_NAME,
230 LEDGER_COUNTERS_CF_NAME,
231 STALE_NODE_INDEX_CF_NAME,
232 TRANSACTION_CF_NAME,
233 TRANSACTION_ACCUMULATOR_CF_NAME,
234 TRANSACTION_BY_ACCOUNT_CF_NAME,
235 TRANSACTION_INFO_CF_NAME,
236 LEDGER_INFO_BY_BLOCK_CF_NAME,
237 POS_STATE_CF_NAME,
238 REWARD_EVENT_CF_NAME,
239 COMMITTED_BLOCK_CF_NAME,
240 COMMITTED_BLOCK_BY_VIEW_CF_NAME,
241 LEDGER_INFO_BY_VOTED_BLOCK_CF_NAME,
242 BLOCK_BY_EPOCH_AND_ROUND_CF_NAME,
243 ]
244 }
245
246 fn new_with_db(db: DB, _prune_window: Option<u64>) -> Self {
247 let db = Arc::new(db);
248
249 PosLedgerDB {
250 db: Arc::clone(&db),
251 event_store: Arc::new(EventStore::new(Arc::clone(&db))),
252 ledger_store: Arc::new(LedgerStore::new(Arc::clone(&db))),
253 transaction_store: Arc::new(TransactionStore::new(Arc::clone(&db))),
254 rocksdb_property_reporter: RocksdbPropertyReporter::new(
255 Arc::clone(&db),
256 ),
257 }
258 }
259
260 pub fn open<P: AsRef<Path> + Clone>(
261 db_root_path: P, readonly: bool, prune_window: Option<u64>,
262 rocksdb_config: RocksdbConfig,
263 ) -> Result<Self> {
264 ensure!(
265 prune_window.is_none() || !readonly,
266 "Do not set prune_window when opening readonly.",
267 );
268
269 let path = db_root_path.as_ref().join("pos-ledger-db");
270 let instant = Instant::now();
271
272 let mut rocksdb_opts = gen_rocksdb_options(&rocksdb_config);
273
274 let db = if readonly {
275 DB::open_readonly(
276 path.clone(),
277 "diemdb_ro",
278 Self::column_families(),
279 rocksdb_opts,
280 )?
281 } else {
282 rocksdb_opts.create_if_missing(true);
283 rocksdb_opts.create_missing_column_families(true);
284 DB::open(
285 path.clone(),
286 "pos-ledger-db",
287 Self::column_families(),
288 rocksdb_opts,
289 )?
290 };
291
292 let ret = Self::new_with_db(db, prune_window);
293 diem_info!(
294 path = path,
295 time_ms = %instant.elapsed().as_millis(),
296 "Opened PosLedgerDB.",
297 );
298 Ok(ret)
299 }
300
301 #[cfg(any(test, feature = "fuzzing"))]
303 pub fn new_for_test<P: AsRef<Path> + Clone>(db_root_path: P) -> Self {
304 Self::open(
305 db_root_path,
306 false, None, RocksdbConfig::default(),
309 )
310 .expect("Unable to open DiemDB")
311 }
312
313 pub fn update_rocksdb_properties(&self) -> Result<()> {
315 update_rocksdb_properties(&self.db)
316 }
317
318 fn get_epoch_ending_ledger_infos(
324 &self, start_epoch: u64, end_epoch: u64, limit: usize,
325 ) -> Result<(Vec<LedgerInfoWithSignatures>, bool)> {
326 self.get_epoch_ending_ledger_infos_impl(start_epoch, end_epoch, limit)
327 }
328
329 fn get_epoch_ending_ledger_infos_impl(
330 &self, start_epoch: u64, end_epoch: u64, limit: usize,
331 ) -> Result<(Vec<LedgerInfoWithSignatures>, bool)> {
332 ensure!(
333 start_epoch <= end_epoch,
334 "Bad epoch range [{}, {})",
335 start_epoch,
336 end_epoch,
337 );
338 let latest_epoch = self
342 .ledger_store
343 .get_latest_ledger_info()?
344 .ledger_info()
345 .next_block_epoch();
346 ensure!(
347 end_epoch <= latest_epoch,
348 "Unable to provide epoch change ledger info for still open epoch. asked upper bound: {}, last sealed epoch: {}",
349 end_epoch,
350 latest_epoch - 1, );
352
353 let (paging_epoch, more) = if end_epoch - start_epoch > limit as u64 {
354 (start_epoch + limit as u64, true)
355 } else {
356 (end_epoch, false)
357 };
358
359 let lis = self
360 .ledger_store
361 .get_epoch_ending_ledger_info_iter(start_epoch, paging_epoch)?
362 .collect::<Result<Vec<_>>>()?;
363 ensure!(
364 lis.len() == (paging_epoch - start_epoch) as usize,
365 "DB corruption: missing epoch ending ledger info for epoch {}",
366 lis.last()
367 .map(|li| li.ledger_info().next_block_epoch())
368 .unwrap_or(start_epoch),
369 );
370 Ok((lis, more))
371 }
372
373 fn get_transaction_with_proof(
374 &self, version: Version, ledger_version: Version, fetch_events: bool,
375 ) -> Result<TransactionWithProof> {
376 let proof = self
377 .ledger_store
378 .get_transaction_info_with_proof(version, ledger_version)?;
379 let transaction = self.transaction_store.get_transaction(version)?;
380
381 let events = if fetch_events {
383 Some(self.event_store.get_events_by_version(version)?)
384 } else {
385 None
386 };
387
388 Ok(TransactionWithProof {
389 version,
390 transaction,
391 events,
392 proof,
393 })
394 }
395
396 pub fn get_transaction(&self, version: Version) -> Result<Transaction> {
398 self.transaction_store.get_transaction(version)
399 }
400
401 pub fn get_transaction_block_meta(
402 &self, version: Version,
403 ) -> Result<Option<(Version, BlockMetadata)>> {
404 self.transaction_store.get_block_metadata(version)
405 }
406
407 pub fn get_transaction_info(
408 &self, version: u64,
409 ) -> Result<TransactionInfo> {
410 self.ledger_store.get_transaction_info(version)
411 }
412
413 fn seal_change_set(&self, cs: ChangeSet) -> Result<SealedChangeSet> {
415 Ok(SealedChangeSet { batch: cs.batch })
416 }
417
418 fn save_transactions_impl(
419 &self, txns_to_commit: &[TransactionToCommit], first_version: u64,
420 mut cs: &mut ChangeSet,
421 ) -> Result<HashValue> {
422 let last_version = first_version + txns_to_commit.len() as u64 - 1;
423
424 let state_root_hashes = vec![Default::default(); txns_to_commit.len()];
426
427 let event_root_hashes =
429 zip_eq(first_version..=last_version, txns_to_commit)
430 .map(|(ver, txn_to_commit)| {
431 self.event_store.put_events(
432 ver,
433 txn_to_commit.events(),
434 &mut cs,
435 )
436 })
437 .collect::<Result<Vec<_>>>()?;
438
439 zip_eq(first_version..=last_version, txns_to_commit).try_for_each(
441 |(ver, txn_to_commit)| {
442 self.transaction_store.put_transaction(
443 ver,
444 txn_to_commit.transaction(),
445 &mut cs,
446 )
447 },
448 )?;
449
450 let txn_infos =
452 izip!(txns_to_commit, state_root_hashes, event_root_hashes)
453 .map(|(t, s, e)| {
454 Ok(TransactionInfo::new(
455 t.transaction().hash(),
456 s,
457 e,
458 t.gas_used(),
459 t.status().clone(),
460 ))
461 })
462 .collect::<Result<Vec<_>>>()?;
463 assert_eq!(txn_infos.len(), txns_to_commit.len());
464
465 let new_root_hash = self.ledger_store.put_transaction_infos(
466 first_version,
467 &txn_infos,
468 &mut cs,
469 )?;
470
471 Ok(new_root_hash)
472 }
473
474 fn commit(&self, sealed_cs: SealedChangeSet) -> Result<()> {
478 self.db.write_schemas(sealed_cs.batch, false)?;
479
480 Ok(())
481 }
482}
483
484impl DbReader for PosLedgerDB {
485 fn get_epoch_ending_ledger_infos(
486 &self, start_epoch: u64, end_epoch: u64,
487 ) -> Result<EpochChangeProof> {
488 gauged_api("get_epoch_ending_ledger_infos", || {
489 let (ledger_info_with_sigs, more) =
490 Self::get_epoch_ending_ledger_infos(
491 &self,
492 start_epoch,
493 end_epoch,
494 MAX_NUM_EPOCH_ENDING_LEDGER_INFO,
495 )?;
496 Ok(EpochChangeProof::new(ledger_info_with_sigs, more))
497 })
498 }
499
500 fn get_transactions(
507 &self, start_version: Version, limit: u64, ledger_version: Version,
508 fetch_events: bool,
509 ) -> Result<TransactionListWithProof> {
510 gauged_api("get_transactions", || {
511 error_if_too_many_requested(limit, MAX_LIMIT)?;
512
513 if start_version > ledger_version || limit == 0 {
514 return Ok(TransactionListWithProof::new_empty());
515 }
516
517 let limit =
518 std::cmp::min(limit, ledger_version - start_version + 1);
519
520 let txns = (start_version..start_version + limit)
521 .map(|version| self.transaction_store.get_transaction(version))
522 .collect::<Result<Vec<_>>>()?;
523 let txn_infos = (start_version..start_version + limit)
524 .map(|version| self.ledger_store.get_transaction_info(version))
525 .collect::<Result<Vec<_>>>()?;
526 let events = if fetch_events {
527 Some(
528 (start_version..start_version + limit)
529 .map(|version| {
530 self.event_store.get_events_by_version(version)
531 })
532 .collect::<Result<Vec<_>>>()?,
533 )
534 } else {
535 None
536 };
537 let proof = TransactionListProof::new(
538 self.ledger_store.get_transaction_range_proof(
539 Some(start_version),
540 limit,
541 ledger_version,
542 )?,
543 txn_infos,
544 );
545
546 Ok(TransactionListWithProof::new(
547 txns,
548 events,
549 Some(start_version),
550 proof,
551 ))
552 })
553 }
554
555 fn get_block_timestamp(&self, version: u64) -> Result<u64> {
556 gauged_api("get_block_timestamp", || {
557 let ts = match self.transaction_store.get_block_metadata(version)? {
558 Some((_v, block_meta)) => block_meta.into_inner().1,
559 None => 0,
561 };
562 Ok(ts)
563 })
564 }
565
566 fn get_latest_ledger_info(&self) -> Result<LedgerInfoWithSignatures> {
567 gauged_api("get_latest_ledger_info", || {
568 self.ledger_store.get_latest_ledger_info()
569 })
570 }
571
572 fn get_startup_info(
573 &self, need_pos_state: bool,
574 ) -> Result<Option<StartupInfo>> {
575 gauged_api("get_startup_info", || {
576 self.ledger_store.get_startup_info(need_pos_state)
577 })
578 }
579
580 fn get_txn_by_account(
584 &self, address: AccountAddress, seq_num: u64, ledger_version: Version,
585 fetch_events: bool,
586 ) -> Result<Option<TransactionWithProof>> {
587 gauged_api("get_txn_by_account", || {
588 self.transaction_store
589 .lookup_transaction_by_account(
590 address,
591 seq_num,
592 ledger_version,
593 )?
594 .map(|version| {
595 self.get_transaction_with_proof(
596 version,
597 ledger_version,
598 fetch_events,
599 )
600 })
601 .transpose()
602 })
603 }
604
605 fn get_state_proof_with_ledger_info(
606 &self, known_version: u64,
607 ledger_info_with_sigs: LedgerInfoWithSignatures,
608 ) -> Result<(EpochChangeProof, AccumulatorConsistencyProof)> {
609 gauged_api("get_state_proof_with_ledger_info", || {
610 let ledger_info = ledger_info_with_sigs.ledger_info();
611 ensure!(
612 known_version <= ledger_info.version(),
613 "Client known_version {} larger than ledger version {}.",
614 known_version,
615 ledger_info.version(),
616 );
617 let known_epoch = self.ledger_store.get_epoch(known_version)?;
618 let epoch_change_proof =
619 if known_epoch < ledger_info.next_block_epoch() {
620 let (ledger_infos_with_sigs, more) = self
621 .get_epoch_ending_ledger_infos(
622 known_epoch,
623 ledger_info.next_block_epoch(),
624 usize::MAX,
627 )?;
628 EpochChangeProof::new(ledger_infos_with_sigs, more)
629 } else {
630 EpochChangeProof::new(vec![], false)
631 };
632
633 let ledger_consistency_proof = self
634 .ledger_store
635 .get_consistency_proof(known_version, ledger_info.version())?;
636 Ok((epoch_change_proof, ledger_consistency_proof))
637 })
638 }
639
640 fn get_state_proof(
641 &self, known_version: u64,
642 ) -> Result<(
643 LedgerInfoWithSignatures,
644 EpochChangeProof,
645 AccumulatorConsistencyProof,
646 )> {
647 gauged_api("get_state_proof", || {
648 let ledger_info_with_sigs =
649 self.ledger_store.get_latest_ledger_info()?;
650 let (epoch_change_proof, ledger_consistency_proof) = self
651 .get_state_proof_with_ledger_info(
652 known_version,
653 ledger_info_with_sigs.clone(),
654 )?;
655 Ok((
656 ledger_info_with_sigs,
657 epoch_change_proof,
658 ledger_consistency_proof,
659 ))
660 })
661 }
662
663 fn get_latest_tree_state(&self) -> Result<TreeState> {
664 gauged_api("get_latest_tree_state", || {
665 let tree_state = match self
666 .ledger_store
667 .get_latest_transaction_info_option()?
668 {
669 Some((version, _txn_info)) => {
670 let frozen_subtrees = self
671 .ledger_store
672 .get_frozen_subtree_hashes(version + 1)?;
673 TreeState::new(
674 version + 1,
675 frozen_subtrees,
676 *SPARSE_MERKLE_PLACEHOLDER_HASH,
677 )
678 }
679 None => {
680 TreeState::new(0, vec![], *SPARSE_MERKLE_PLACEHOLDER_HASH)
681 }
682 };
683
684 diem_info!(
685 num_transactions = tree_state.num_transactions,
686 state_root_hash = %tree_state.account_state_root_hash,
687 description = tree_state.describe(),
688 "Got latest TreeState."
689 );
690
691 Ok(tree_state)
692 })
693 }
694
695 fn get_epoch_ending_ledger_info(
697 &self, version: u64,
698 ) -> Result<LedgerInfoWithSignatures> {
699 gauged_api("get_epoch_ending_ledger_info", || {
700 self.ledger_store.get_epoch_ending_ledger_info(version)
701 })
702 }
703
704 fn get_latest_transaction_info_option(
705 &self,
706 ) -> Result<Option<(Version, TransactionInfo)>> {
707 gauged_api("get_latest_transaction_info_option", || {
708 self.ledger_store.get_latest_transaction_info_option()
709 })
710 }
711
712 fn get_accumulator_root_hash(&self, version: Version) -> Result<HashValue> {
713 gauged_api("get_accumulator_root_hash", || {
714 self.ledger_store.get_root_hash(version)
715 })
716 }
717
718 fn get_pos_state(&self, block_id: &HashValue) -> Result<PosState> {
719 diem_debug!("get_pos_state:{}", block_id);
720 self.ledger_store.get_pos_state(block_id)
721 }
722
723 fn get_latest_pos_state(&self) -> Arc<PosState> {
724 self.ledger_store.get_latest_pos_state()
725 }
726}
727
728impl DbWriter for PosLedgerDB {
729 fn save_transactions(
737 &self, txns_to_commit: &[TransactionToCommit], first_version: Version,
738 ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
739 pos_state: Option<PosState>, committed_blocks: Vec<CommittedBlock>,
740 ledger_infos_with_voted_block: Vec<(
741 HashValue,
742 LedgerInfoWithSignatures,
743 )>,
744 ) -> Result<()> {
745 gauged_api("save_transactions", || {
746 let num_txns = txns_to_commit.len() as u64;
747 ensure!(
751 ledger_info_with_sigs.is_some() || num_txns > 0,
752 "txns_to_commit is empty while ledger_info_with_sigs is None.",
753 );
754
755 let mut cs = ChangeSet::new();
757
758 if let Some(x) = ledger_info_with_sigs {
759 let claimed_last_version = x.ledger_info().version();
760 ensure!(
761 claimed_last_version + 1 == first_version + num_txns,
762 "Transaction batch not applicable: first_version {}, num_txns {}, last_version {}",
763 first_version,
764 num_txns,
765 claimed_last_version,
766 );
767 }
768
769 let _new_root_hash = self.save_transactions_impl(
770 txns_to_commit,
771 first_version,
772 &mut cs,
773 )?;
774
775 for b in committed_blocks {
776 self.ledger_store.put_committed_block(&b, &mut cs)?;
777 }
778
779 for (voted_block, ledger_info) in ledger_infos_with_voted_block {
780 self.ledger_store.put_ledger_info_by_voted_block(
781 &voted_block,
782 &ledger_info,
783 &mut cs,
784 )?;
785 }
786
787 if let Some(x) = ledger_info_with_sigs {
790 self.ledger_store.put_ledger_info(x, &mut cs)?;
800 if let Some(pos_state) = pos_state {
801 self.ledger_store.put_pos_state(
803 &x.ledger_info().consensus_block_id(),
804 pos_state,
805 &mut cs,
806 )?;
807 }
808 }
809
810 let sealed_cs = self.seal_change_set(cs)?;
812 {
813 let _timer = DIEM_STORAGE_OTHER_TIMERS_SECONDS
814 .with_label_values(&["save_transactions_commit"])
815 .start_timer();
816 self.commit(sealed_cs)?;
817 }
818
819 if let Some(x) = ledger_info_with_sigs {
822 self.ledger_store.set_latest_ledger_info(x.clone());
823
824 DIEM_STORAGE_LEDGER_VERSION
825 .set(x.ledger_info().version() as i64);
826 DIEM_STORAGE_NEXT_BLOCK_EPOCH
827 .set(x.ledger_info().next_block_epoch() as i64);
828 }
829
830 if num_txns > 0 {
834 let last_version = first_version + num_txns - 1;
835 DIEM_STORAGE_COMMITTED_TXNS.inc_by(num_txns);
836 DIEM_STORAGE_LATEST_TXN_VERSION.set(last_version as i64);
837 }
838
839 Ok(())
840 })
841 }
842
843 fn save_reward_event(
844 &self, epoch: u64, event: &RewardDistributionEventV2,
845 ) -> Result<()> {
846 self.ledger_store.put_reward_event(epoch, event)
847 }
848
849 fn delete_pos_state_by_block(&self, block_id: &HashValue) -> Result<()> {
850 self.ledger_store.delete_pos_state(block_id)
851 }
852}
853
854impl DBReaderForPoW for PosLedgerDB {
855 fn get_latest_ledger_info_option(
856 &self,
857 ) -> Option<LedgerInfoWithSignatures> {
858 self.ledger_store.get_latest_ledger_info_option()
859 }
860
861 fn get_block_ledger_info(
862 &self, consensus_block_id: &HashValue,
863 ) -> Result<LedgerInfoWithSignatures> {
864 self.ledger_store.get_block_ledger_info(consensus_block_id)
865 }
866
867 fn get_events_by_version(
868 &self, start_version: u64, end_version: u64,
869 ) -> Result<Vec<ContractEvent>> {
870 let iter = self.event_store.get_events_by_version_iter(
871 start_version,
872 (end_version - start_version) as usize,
873 )?;
874 let events_vec = iter.collect::<Result<Vec<Vec<ContractEvent>>>>()?;
875 Ok(events_vec.into_iter().flatten().collect())
876 }
877
878 fn get_epoch_ending_blocks(
879 &self, start_epoch: u64, end_epoch: u64,
880 ) -> Result<Vec<HashValue>> {
881 let mut ending_blocks = Vec::new();
882 for ledger_info in self
883 .ledger_store
884 .get_epoch_ending_ledger_info_iter(start_epoch, end_epoch)?
885 {
886 ending_blocks.push(ledger_info?.ledger_info().consensus_block_id());
887 }
888 Ok(ending_blocks)
889 }
890
891 fn get_reward_event(
892 &self, epoch: u64,
893 ) -> Result<RewardDistributionEventV2> {
894 self.ledger_store.get_reward_event(epoch)
895 }
896
897 fn get_committed_block_by_hash(
898 &self, block_hash: &HashValue,
899 ) -> Result<CommittedBlock> {
900 self.ledger_store.get_committed_block_by_hash(block_hash)
901 }
902
903 fn get_committed_block_hash_by_view(&self, view: u64) -> Result<HashValue> {
904 self.ledger_store.get_committed_block_hash_by_view(view)
905 }
906
907 fn get_ledger_info_by_voted_block(
908 &self, block_id: &HashValue,
909 ) -> Result<LedgerInfoWithSignatures> {
910 self.ledger_store.get_ledger_info_by_voted_block(block_id)
911 }
912
913 fn get_block_hash_by_epoch_and_round(
914 &self, epoch: u64, round: u64,
915 ) -> Result<HashValue> {
916 self.ledger_store
917 .get_block_hash_by_epoch_and_round(epoch, round)
918 }
919}
920
/// Converts a cursor-based query into `(first sequence number, count)`.
///
/// * `Order::Ascending`: start at `cursor` and take `limit` items.
/// * Any other order: cover the `limit` items ending at `cursor`, clamped
///   at sequence number 0 (yielding `cursor + 1` items when fewer than
///   `limit` exist).
///
/// # Errors
/// Fails when `limit` is 0.
#[cfg(any(test, feature = "fuzzing"))]
fn get_first_seq_num_and_limit(
    order: Order, cursor: u64, limit: u64,
) -> Result<(u64, u64)> {
    ensure!(limit > 0, "limit should > 0, got {}", limit);

    if order == Order::Ascending {
        return Ok((cursor, limit));
    }

    let first_and_limit = if cursor >= limit {
        (cursor - limit + 1, limit)
    } else {
        (0, cursor + 1)
    };
    Ok(first_and_limit)
}
936
937fn gauged_api<T, F>(api_name: &'static str, api_impl: F) -> Result<T>
938where F: FnOnce() -> Result<T> {
939 let timer = Instant::now();
940
941 let res = api_impl();
942
943 let res_type = match &res {
944 Ok(_) => "Ok",
945 Err(e) => {
946 diem_warn!(
947 api_name = api_name,
948 error = ?e,
949 "DiemDB API returned error."
950 );
951 "Err"
952 }
953 };
954 DIEM_STORAGE_API_LATENCY_SECONDS
955 .with_label_values(&[api_name, res_type])
956 .observe(timer.elapsed().as_secs_f64());
957
958 res
959}