1#![forbid(unsafe_code)]
9
10use std::{
18 collections::HashMap,
19 iter::Iterator,
20 path::Path,
21 sync::{mpsc, Arc, Mutex},
22 thread::{self, JoinHandle},
23 time::{Duration, Instant},
24};
25
26use anyhow::{ensure, Result};
27use itertools::{izip, zip_eq};
28use once_cell::sync::Lazy;
29
30use diem_config::config::RocksdbConfig;
31use diem_crypto::hash::{
32 CryptoHash, HashValue, SPARSE_MERKLE_PLACEHOLDER_HASH,
33};
34use diem_logger::prelude::*;
35use diem_types::{
36 account_address::AccountAddress,
37 account_state_blob::{AccountStateBlob, AccountStateWithProof},
38 committed_block::CommittedBlock,
39 contract_event::ContractEvent,
40 epoch_change::EpochChangeProof,
41 ledger_info::LedgerInfoWithSignatures,
42 proof::{
43 AccountStateProof, AccumulatorConsistencyProof, SparseMerkleProof,
44 TransactionListProof,
45 },
46 reward_distribution_event::RewardDistributionEventV2,
47 term_state::PosState,
48 transaction::{
49 Transaction, TransactionInfo, TransactionListWithProof,
50 TransactionToCommit, TransactionWithProof, Version,
51 PRE_GENESIS_VERSION,
52 },
53};
54#[cfg(feature = "fuzzing")]
55pub use diemdb_test::test_save_blocks_impl;
56use schemadb::{ColumnFamilyName, Options, DB, DEFAULT_CF_NAME};
57#[cfg(any(test, feature = "fuzzing"))]
58use storage_interface::Order;
59use storage_interface::{
60 DBReaderForPoW, DbReader, DbWriter, StartupInfo, TreeState,
61};
62
63use crate::{
64 backup::{backup_handler::BackupHandler, restore_handler::RestoreHandler},
65 change_set::{ChangeSet, SealedChangeSet},
66 errors::DiemDbError,
67 event_store::EventStore,
68 ledger_counters::LedgerCounters,
69 ledger_store::LedgerStore,
70 metrics::{
71 DIEM_STORAGE_API_LATENCY_SECONDS, DIEM_STORAGE_COMMITTED_TXNS,
72 DIEM_STORAGE_LATEST_TXN_VERSION, DIEM_STORAGE_LEDGER_VERSION,
73 DIEM_STORAGE_NEXT_BLOCK_EPOCH, DIEM_STORAGE_OTHER_TIMERS_SECONDS,
74 DIEM_STORAGE_ROCKSDB_PROPERTIES,
75 },
76 pruner::Pruner,
77 schema::*,
78 state_store::StateStore,
79 system_store::SystemStore,
80 transaction_store::TransactionStore,
81};
82use diem_types::block_metadata::BlockMetadata;
83
84#[cfg(any(feature = "diemsum"))]
85pub mod diemsum;
86#[cfg(any(test, feature = "fuzzing"))]
88pub mod test_helper;
89
90pub mod backup;
91pub mod errors;
92pub mod metrics;
93pub mod schema;
94
95mod change_set;
96mod event_store;
97mod ledger_counters;
98mod ledger_store;
99mod pruner;
100mod state_store;
101mod system_store;
102mod transaction_store;
103
104#[cfg(any(test, feature = "fuzzing"))]
105#[allow(dead_code)]
106mod diemdb_test;
107
108const MAX_LIMIT: u64 = 1000;
109
110const MAX_NUM_EPOCH_ENDING_LEDGER_INFO: usize = 100;
114
115static ROCKSDB_PROPERTY_MAP: Lazy<HashMap<&str, &str>> = Lazy::new(|| {
116 [
117 (
118 "diem_rocksdb_live_sst_files_size_bytes",
119 "rocksdb.live-sst-files-size",
120 ),
121 (
122 "diem_rocksdb_all_memtables_size_bytes",
123 "rocksdb.size-all-mem-tables",
124 ),
125 (
126 "diem_rocksdb_num_running_compactions",
127 "rocksdb.num-running-compactions",
128 ),
129 (
130 "diem_rocksdb_num_running_flushes",
131 "rocksdb.num-running-flushes",
132 ),
133 (
134 "diem_rocksdb_block_cache_usage_bytes",
135 "rocksdb.block-cache-usage",
136 ),
137 (
138 "diem_rocksdb_cf_size_bytes",
139 "rocksdb.estimate-live-data-size",
140 ),
141 ]
142 .iter()
143 .cloned()
144 .collect()
145});
146
147fn error_if_too_many_requested(
148 num_requested: u64, max_allowed: u64,
149) -> Result<()> {
150 if num_requested > max_allowed {
151 Err(DiemDbError::TooManyRequested(num_requested, max_allowed).into())
152 } else {
153 Ok(())
154 }
155}
156
157fn gen_rocksdb_options(config: &RocksdbConfig) -> Options {
158 let mut db_opts = Options::default();
159 db_opts.set_max_open_files(config.max_open_files);
160 db_opts.set_max_total_wal_size(config.max_total_wal_size);
161 db_opts
162}
163
164fn update_rocksdb_properties(db: &DB) -> Result<()> {
165 let _timer = DIEM_STORAGE_OTHER_TIMERS_SECONDS
166 .with_label_values(&["update_rocksdb_properties"])
167 .start_timer();
168 for cf_name in PosLedgerDB::column_families() {
169 for (property_name, rocksdb_property_argument) in &*ROCKSDB_PROPERTY_MAP
170 {
171 DIEM_STORAGE_ROCKSDB_PROPERTIES
172 .with_label_values(&[cf_name, property_name])
173 .set(
174 db.get_property(cf_name, rocksdb_property_argument)? as i64
175 );
176 }
177 }
178 Ok(())
179}
180
181#[derive(Debug)]
182struct RocksdbPropertyReporter {
183 sender: Mutex<mpsc::Sender<()>>,
184 join_handle: Option<JoinHandle<()>>,
185}
186
187impl RocksdbPropertyReporter {
188 fn new(db: Arc<DB>) -> Self {
189 let (send, recv) = mpsc::channel();
190 let join_handle = Some(thread::spawn(move || loop {
191 if let Err(e) = update_rocksdb_properties(&db) {
192 diem_warn!(
193 error = ?e,
194 "Updating rocksdb property failed."
195 );
196 }
197 match recv.recv_timeout(Duration::from_secs(10)) {
199 Ok(_) => break,
200 Err(mpsc::RecvTimeoutError::Timeout) => (),
201 Err(mpsc::RecvTimeoutError::Disconnected) => break,
202 }
203 }));
204 Self {
205 sender: Mutex::new(send),
206 join_handle,
207 }
208 }
209}
210
211impl Drop for RocksdbPropertyReporter {
212 fn drop(&mut self) {
213 self.sender.lock().unwrap().send(()).unwrap();
215 self.join_handle
216 .take()
217 .expect("Rocksdb property reporting thread must exist.")
218 .join()
219 .expect(
220 "Rocksdb property reporting thread should join peacefully.",
221 );
222 }
223}
224
225#[derive(Debug)]
228pub struct PosLedgerDB {
229 db: Arc<DB>,
230 ledger_store: Arc<LedgerStore>,
231 transaction_store: Arc<TransactionStore>,
232 state_store: Arc<StateStore>,
233 event_store: Arc<EventStore>,
234 system_store: SystemStore,
235 #[allow(dead_code)]
236 rocksdb_property_reporter: RocksdbPropertyReporter,
237 pruner: Option<Pruner>,
238}
239
240impl PosLedgerDB {
241 fn column_families() -> Vec<ColumnFamilyName> {
242 vec![
243 DEFAULT_CF_NAME,
244 EPOCH_BY_VERSION_CF_NAME,
245 EVENT_ACCUMULATOR_CF_NAME,
246 EVENT_BY_KEY_CF_NAME,
247 EVENT_BY_VERSION_CF_NAME,
248 EVENT_CF_NAME,
249 JELLYFISH_MERKLE_NODE_CF_NAME,
250 LEDGER_COUNTERS_CF_NAME,
251 STALE_NODE_INDEX_CF_NAME,
252 TRANSACTION_CF_NAME,
253 TRANSACTION_ACCUMULATOR_CF_NAME,
254 TRANSACTION_BY_ACCOUNT_CF_NAME,
255 TRANSACTION_INFO_CF_NAME,
256 LEDGER_INFO_BY_BLOCK_CF_NAME,
257 POS_STATE_CF_NAME,
258 REWARD_EVENT_CF_NAME,
259 COMMITTED_BLOCK_CF_NAME,
260 COMMITTED_BLOCK_BY_VIEW_CF_NAME,
261 LEDGER_INFO_BY_VOTED_BLOCK_CF_NAME,
262 BLOCK_BY_EPOCH_AND_ROUND_CF_NAME,
263 ]
264 }
265
266 fn new_with_db(db: DB, prune_window: Option<u64>) -> Self {
267 let db = Arc::new(db);
268
269 PosLedgerDB {
270 db: Arc::clone(&db),
271 event_store: Arc::new(EventStore::new(Arc::clone(&db))),
272 ledger_store: Arc::new(LedgerStore::new(Arc::clone(&db))),
273 state_store: Arc::new(StateStore::new(Arc::clone(&db))),
274 transaction_store: Arc::new(TransactionStore::new(Arc::clone(&db))),
275 system_store: SystemStore::new(Arc::clone(&db)),
276 rocksdb_property_reporter: RocksdbPropertyReporter::new(
277 Arc::clone(&db),
278 ),
279 pruner: prune_window.map(|n| Pruner::new(Arc::clone(&db), n)),
280 }
281 }
282
283 pub fn open<P: AsRef<Path> + Clone>(
284 db_root_path: P, readonly: bool, prune_window: Option<u64>,
285 rocksdb_config: RocksdbConfig,
286 ) -> Result<Self> {
287 ensure!(
288 prune_window.is_none() || !readonly,
289 "Do not set prune_window when opening readonly.",
290 );
291
292 let path = db_root_path.as_ref().join("pos-ledger-db");
293 let instant = Instant::now();
294
295 let mut rocksdb_opts = gen_rocksdb_options(&rocksdb_config);
296
297 let db = if readonly {
298 DB::open_readonly(
299 path.clone(),
300 "diemdb_ro",
301 Self::column_families(),
302 rocksdb_opts,
303 )?
304 } else {
305 rocksdb_opts.create_if_missing(true);
306 rocksdb_opts.create_missing_column_families(true);
307 DB::open(
308 path.clone(),
309 "pos-ledger-db",
310 Self::column_families(),
311 rocksdb_opts,
312 )?
313 };
314
315 let ret = Self::new_with_db(db, prune_window);
316 diem_info!(
317 path = path,
318 time_ms = %instant.elapsed().as_millis(),
319 "Opened PosLedgerDB.",
320 );
321 Ok(ret)
322 }
323
324 #[cfg(any(test, feature = "fuzzing"))]
326 pub fn new_for_test<P: AsRef<Path> + Clone>(db_root_path: P) -> Self {
327 Self::open(
328 db_root_path,
329 false, None, RocksdbConfig::default(),
332 )
333 .expect("Unable to open DiemDB")
334 }
335
336 pub fn update_rocksdb_properties(&self) -> Result<()> {
338 update_rocksdb_properties(&self.db)
339 }
340
341 fn get_epoch_ending_ledger_infos(
347 &self, start_epoch: u64, end_epoch: u64, limit: usize,
348 ) -> Result<(Vec<LedgerInfoWithSignatures>, bool)> {
349 self.get_epoch_ending_ledger_infos_impl(start_epoch, end_epoch, limit)
350 }
351
352 fn get_epoch_ending_ledger_infos_impl(
353 &self, start_epoch: u64, end_epoch: u64, limit: usize,
354 ) -> Result<(Vec<LedgerInfoWithSignatures>, bool)> {
355 ensure!(
356 start_epoch <= end_epoch,
357 "Bad epoch range [{}, {})",
358 start_epoch,
359 end_epoch,
360 );
361 let latest_epoch = self
365 .ledger_store
366 .get_latest_ledger_info()?
367 .ledger_info()
368 .next_block_epoch();
369 ensure!(
370 end_epoch <= latest_epoch,
371 "Unable to provide epoch change ledger info for still open epoch. asked upper bound: {}, last sealed epoch: {}",
372 end_epoch,
373 latest_epoch - 1, );
375
376 let (paging_epoch, more) = if end_epoch - start_epoch > limit as u64 {
377 (start_epoch + limit as u64, true)
378 } else {
379 (end_epoch, false)
380 };
381
382 let lis = self
383 .ledger_store
384 .get_epoch_ending_ledger_info_iter(start_epoch, paging_epoch)?
385 .collect::<Result<Vec<_>>>()?;
386 ensure!(
387 lis.len() == (paging_epoch - start_epoch) as usize,
388 "DB corruption: missing epoch ending ledger info for epoch {}",
389 lis.last()
390 .map(|li| li.ledger_info().next_block_epoch())
391 .unwrap_or(start_epoch),
392 );
393 Ok((lis, more))
394 }
395
396 fn get_transaction_with_proof(
397 &self, version: Version, ledger_version: Version, fetch_events: bool,
398 ) -> Result<TransactionWithProof> {
399 let proof = self
400 .ledger_store
401 .get_transaction_info_with_proof(version, ledger_version)?;
402 let transaction = self.transaction_store.get_transaction(version)?;
403
404 let events = if fetch_events {
406 Some(self.event_store.get_events_by_version(version)?)
407 } else {
408 None
409 };
410
411 Ok(TransactionWithProof {
412 version,
413 transaction,
414 events,
415 proof,
416 })
417 }
418
419 pub fn get_transaction(&self, version: Version) -> Result<Transaction> {
421 self.transaction_store.get_transaction(version)
422 }
423
424 pub fn get_transaction_block_meta(
425 &self, version: Version,
426 ) -> Result<Option<(Version, BlockMetadata)>> {
427 self.transaction_store.get_block_metadata(version)
428 }
429
430 pub fn get_transaction_info(
431 &self, version: u64,
432 ) -> Result<TransactionInfo> {
433 self.ledger_store.get_transaction_info(version)
434 }
435
436 pub fn get_backup_handler(&self) -> BackupHandler {
441 BackupHandler::new(
442 Arc::clone(&self.ledger_store),
443 Arc::clone(&self.transaction_store),
444 Arc::clone(&self.state_store),
445 Arc::clone(&self.event_store),
446 )
447 }
448
449 fn seal_change_set(
454 &self, first_version: Version, num_txns: Version, mut cs: ChangeSet,
455 ) -> Result<(SealedChangeSet, Option<LedgerCounters>)> {
456 let counters = if num_txns > 0 {
458 Some(self.system_store.bump_ledger_counters(
459 first_version,
460 first_version + num_txns - 1,
461 &mut cs,
462 )?)
463 } else {
464 None
465 };
466
467 Ok((SealedChangeSet { batch: cs.batch }, counters))
468 }
469
470 fn save_transactions_impl(
471 &self, txns_to_commit: &[TransactionToCommit], first_version: u64,
472 mut cs: &mut ChangeSet,
473 ) -> Result<HashValue> {
474 let last_version = first_version + txns_to_commit.len() as u64 - 1;
475
476 let account_state_sets = txns_to_commit
478 .iter()
479 .map(|txn_to_commit| txn_to_commit.account_states().clone())
480 .collect::<Vec<_>>();
481 let state_root_hashes = if first_version == 0 {
482 self.state_store.put_account_state_sets(
484 account_state_sets,
485 first_version,
486 &mut cs,
487 )?
488 } else {
489 vec![Default::default(); txns_to_commit.len()]
491 };
492 diem_debug!(
493 "save_transactions_impl: {} {:?}",
494 first_version,
495 state_root_hashes
496 );
497
498 let event_root_hashes =
500 zip_eq(first_version..=last_version, txns_to_commit)
501 .map(|(ver, txn_to_commit)| {
502 self.event_store.put_events(
503 ver,
504 txn_to_commit.events(),
505 &mut cs,
506 )
507 })
508 .collect::<Result<Vec<_>>>()?;
509
510 zip_eq(first_version..=last_version, txns_to_commit).try_for_each(
512 |(ver, txn_to_commit)| {
513 self.transaction_store.put_transaction(
514 ver,
515 txn_to_commit.transaction(),
516 &mut cs,
517 )
518 },
519 )?;
520
521 let txn_infos =
523 izip!(txns_to_commit, state_root_hashes, event_root_hashes)
524 .map(|(t, s, e)| {
525 Ok(TransactionInfo::new(
526 t.transaction().hash(),
527 s,
528 e,
529 t.gas_used(),
530 t.status().clone(),
531 ))
532 })
533 .collect::<Result<Vec<_>>>()?;
534 assert_eq!(txn_infos.len(), txns_to_commit.len());
535
536 let mut new_root_hash = self.ledger_store.put_transaction_infos(
537 first_version,
538 &txn_infos,
539 &mut cs,
540 )?;
541 if first_version != 0 {
542 new_root_hash = Default::default();
544 };
545
546 Ok(new_root_hash)
547 }
548
549 fn commit(&self, sealed_cs: SealedChangeSet) -> Result<()> {
553 self.db.write_schemas(sealed_cs.batch, false)?;
554
555 Ok(())
556 }
557
558 fn wake_pruner(&self, latest_version: Version) {
559 if let Some(pruner) = self.pruner.as_ref() {
560 pruner.wake(latest_version)
561 }
562 }
563}
564
565impl DbReader for PosLedgerDB {
566 fn get_epoch_ending_ledger_infos(
567 &self, start_epoch: u64, end_epoch: u64,
568 ) -> Result<EpochChangeProof> {
569 gauged_api("get_epoch_ending_ledger_infos", || {
570 let (ledger_info_with_sigs, more) =
571 Self::get_epoch_ending_ledger_infos(
572 &self,
573 start_epoch,
574 end_epoch,
575 MAX_NUM_EPOCH_ENDING_LEDGER_INFO,
576 )?;
577 Ok(EpochChangeProof::new(ledger_info_with_sigs, more))
578 })
579 }
580
581 fn get_transactions(
588 &self, start_version: Version, limit: u64, ledger_version: Version,
589 fetch_events: bool,
590 ) -> Result<TransactionListWithProof> {
591 gauged_api("get_transactions", || {
592 error_if_too_many_requested(limit, MAX_LIMIT)?;
593
594 if start_version > ledger_version || limit == 0 {
595 return Ok(TransactionListWithProof::new_empty());
596 }
597
598 let limit =
599 std::cmp::min(limit, ledger_version - start_version + 1);
600
601 let txns = (start_version..start_version + limit)
602 .map(|version| self.transaction_store.get_transaction(version))
603 .collect::<Result<Vec<_>>>()?;
604 let txn_infos = (start_version..start_version + limit)
605 .map(|version| self.ledger_store.get_transaction_info(version))
606 .collect::<Result<Vec<_>>>()?;
607 let events = if fetch_events {
608 Some(
609 (start_version..start_version + limit)
610 .map(|version| {
611 self.event_store.get_events_by_version(version)
612 })
613 .collect::<Result<Vec<_>>>()?,
614 )
615 } else {
616 None
617 };
618 let proof = TransactionListProof::new(
619 self.ledger_store.get_transaction_range_proof(
620 Some(start_version),
621 limit,
622 ledger_version,
623 )?,
624 txn_infos,
625 );
626
627 Ok(TransactionListWithProof::new(
628 txns,
629 events,
630 Some(start_version),
631 proof,
632 ))
633 })
634 }
635
636 fn get_block_timestamp(&self, version: u64) -> Result<u64> {
637 gauged_api("get_block_timestamp", || {
638 let ts = match self.transaction_store.get_block_metadata(version)? {
639 Some((_v, block_meta)) => block_meta.into_inner().1,
640 None => 0,
642 };
643 Ok(ts)
644 })
645 }
646
647 fn get_latest_account_state(
648 &self, address: AccountAddress,
649 ) -> Result<Option<AccountStateBlob>> {
650 gauged_api("get_latest_account_state", || {
651 let ledger_info_with_sigs =
652 self.ledger_store.get_latest_ledger_info()?;
653 let version = ledger_info_with_sigs.ledger_info().version();
654 let (blob, _proof) = self
655 .state_store
656 .get_account_state_with_proof_by_version(address, version)?;
657 Ok(blob)
658 })
659 }
660
661 fn get_latest_ledger_info(&self) -> Result<LedgerInfoWithSignatures> {
662 gauged_api("get_latest_ledger_info", || {
663 self.ledger_store.get_latest_ledger_info()
664 })
665 }
666
667 fn get_startup_info(
668 &self, need_pos_state: bool,
669 ) -> Result<Option<StartupInfo>> {
670 gauged_api("get_startup_info", || {
671 self.ledger_store.get_startup_info(need_pos_state)
672 })
673 }
674
675 fn get_txn_by_account(
679 &self, address: AccountAddress, seq_num: u64, ledger_version: Version,
680 fetch_events: bool,
681 ) -> Result<Option<TransactionWithProof>> {
682 gauged_api("get_txn_by_account", || {
683 self.transaction_store
684 .lookup_transaction_by_account(
685 address,
686 seq_num,
687 ledger_version,
688 )?
689 .map(|version| {
690 self.get_transaction_with_proof(
691 version,
692 ledger_version,
693 fetch_events,
694 )
695 })
696 .transpose()
697 })
698 }
699
700 fn get_state_proof_with_ledger_info(
701 &self, known_version: u64,
702 ledger_info_with_sigs: LedgerInfoWithSignatures,
703 ) -> Result<(EpochChangeProof, AccumulatorConsistencyProof)> {
704 gauged_api("get_state_proof_with_ledger_info", || {
705 let ledger_info = ledger_info_with_sigs.ledger_info();
706 ensure!(
707 known_version <= ledger_info.version(),
708 "Client known_version {} larger than ledger version {}.",
709 known_version,
710 ledger_info.version(),
711 );
712 let known_epoch = self.ledger_store.get_epoch(known_version)?;
713 let epoch_change_proof =
714 if known_epoch < ledger_info.next_block_epoch() {
715 let (ledger_infos_with_sigs, more) = self
716 .get_epoch_ending_ledger_infos(
717 known_epoch,
718 ledger_info.next_block_epoch(),
719 usize::MAX,
722 )?;
723 EpochChangeProof::new(ledger_infos_with_sigs, more)
724 } else {
725 EpochChangeProof::new(vec![], false)
726 };
727
728 let ledger_consistency_proof = self
729 .ledger_store
730 .get_consistency_proof(known_version, ledger_info.version())?;
731 Ok((epoch_change_proof, ledger_consistency_proof))
732 })
733 }
734
735 fn get_state_proof(
736 &self, known_version: u64,
737 ) -> Result<(
738 LedgerInfoWithSignatures,
739 EpochChangeProof,
740 AccumulatorConsistencyProof,
741 )> {
742 gauged_api("get_state_proof", || {
743 let ledger_info_with_sigs =
744 self.ledger_store.get_latest_ledger_info()?;
745 let (epoch_change_proof, ledger_consistency_proof) = self
746 .get_state_proof_with_ledger_info(
747 known_version,
748 ledger_info_with_sigs.clone(),
749 )?;
750 Ok((
751 ledger_info_with_sigs,
752 epoch_change_proof,
753 ledger_consistency_proof,
754 ))
755 })
756 }
757
758 fn get_account_state_with_proof(
759 &self, address: AccountAddress, version: Version,
760 ledger_version: Version,
761 ) -> Result<AccountStateWithProof> {
762 gauged_api("get_account_state_with_proof", || {
763 ensure!(
764 version <= ledger_version,
765 "The queried version {} should be equal to or older than ledger version {}.",
766 version,
767 ledger_version
768 );
769 {
770 let latest_version = self.get_latest_version()?;
771 ensure!(
772 ledger_version <= latest_version,
773 "ledger_version specified {} is greater than committed version {}.",
774 ledger_version,
775 latest_version
776 );
777 }
778
779 let txn_info_with_proof = self
780 .ledger_store
781 .get_transaction_info_with_proof(version, ledger_version)?;
782 let (account_state_blob, sparse_merkle_proof) = self
783 .state_store
784 .get_account_state_with_proof_by_version(address, version)?;
785 Ok(AccountStateWithProof::new(
786 version,
787 account_state_blob,
788 AccountStateProof::new(
789 txn_info_with_proof,
790 sparse_merkle_proof,
791 ),
792 ))
793 })
794 }
795
796 fn get_account_state_with_proof_by_version(
797 &self, address: AccountAddress, version: Version,
798 ) -> Result<(
799 Option<AccountStateBlob>,
800 SparseMerkleProof<AccountStateBlob>,
801 )> {
802 gauged_api("get_account_state_with_proof_by_version", || {
803 self.state_store
804 .get_account_state_with_proof_by_version(address, version)
805 })
806 }
807
808 fn get_latest_state_root(&self) -> Result<(Version, HashValue)> {
809 gauged_api("get_latest_state_root", || {
810 let (version, txn_info) =
811 self.ledger_store.get_latest_transaction_info()?;
812 Ok((version, txn_info.state_root_hash()))
813 })
814 }
815
816 fn get_latest_tree_state(&self) -> Result<TreeState> {
817 gauged_api("get_latest_tree_state", || {
818 let tree_state =
819 match self.ledger_store.get_latest_transaction_info_option()? {
820 Some((version, txn_info)) => self
821 .ledger_store
822 .get_tree_state(version + 1, txn_info)?,
823 None => TreeState::new(
824 0,
825 vec![],
826 self.state_store
827 .get_root_hash_option(PRE_GENESIS_VERSION)?
828 .unwrap_or(*SPARSE_MERKLE_PLACEHOLDER_HASH),
829 ),
830 };
831
832 diem_info!(
833 num_transactions = tree_state.num_transactions,
834 state_root_hash = %tree_state.account_state_root_hash,
835 description = tree_state.describe(),
836 "Got latest TreeState."
837 );
838
839 Ok(tree_state)
840 })
841 }
842
843 fn get_epoch_ending_ledger_info(
845 &self, version: u64,
846 ) -> Result<LedgerInfoWithSignatures> {
847 gauged_api("get_epoch_ending_ledger_info", || {
848 self.ledger_store.get_epoch_ending_ledger_info(version)
849 })
850 }
851
852 fn get_latest_transaction_info_option(
853 &self,
854 ) -> Result<Option<(Version, TransactionInfo)>> {
855 gauged_api("get_latest_transaction_info_option", || {
856 self.ledger_store.get_latest_transaction_info_option()
857 })
858 }
859
860 fn get_accumulator_root_hash(&self, version: Version) -> Result<HashValue> {
861 gauged_api("get_accumulator_root_hash", || {
862 self.ledger_store.get_root_hash(version)
863 })
864 }
865
866 fn get_pos_state(&self, block_id: &HashValue) -> Result<PosState> {
867 diem_debug!("get_pos_state:{}", block_id);
868 self.ledger_store.get_pos_state(block_id)
869 }
870
871 fn get_latest_pos_state(&self) -> Arc<PosState> {
872 self.ledger_store.get_latest_pos_state()
873 }
874}
875
876impl DbWriter for PosLedgerDB {
877 fn save_transactions(
885 &self, txns_to_commit: &[TransactionToCommit], first_version: Version,
886 ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
887 pos_state: Option<PosState>, committed_blocks: Vec<CommittedBlock>,
888 ledger_infos_with_voted_block: Vec<(
889 HashValue,
890 LedgerInfoWithSignatures,
891 )>,
892 ) -> Result<()> {
893 gauged_api("save_transactions", || {
894 let num_txns = txns_to_commit.len() as u64;
895 ensure!(
899 ledger_info_with_sigs.is_some() || num_txns > 0,
900 "txns_to_commit is empty while ledger_info_with_sigs is None.",
901 );
902
903 let mut cs = ChangeSet::new();
905
906 if let Some(x) = ledger_info_with_sigs {
907 let claimed_last_version = x.ledger_info().version();
908 ensure!(
909 claimed_last_version + 1 == first_version + num_txns,
910 "Transaction batch not applicable: first_version {}, num_txns {}, last_version {}",
911 first_version,
912 num_txns,
913 claimed_last_version,
914 );
915 }
916
917 let _new_root_hash = self.save_transactions_impl(
918 txns_to_commit,
919 first_version,
920 &mut cs,
921 )?;
922
923 for b in committed_blocks {
924 self.ledger_store.put_committed_block(&b, &mut cs)?;
925 }
926
927 for (voted_block, ledger_info) in ledger_infos_with_voted_block {
928 self.ledger_store.put_ledger_info_by_voted_block(
929 &voted_block,
930 &ledger_info,
931 &mut cs,
932 )?;
933 }
934
935 if let Some(x) = ledger_info_with_sigs {
938 self.ledger_store.put_ledger_info(x, &mut cs)?;
948 if let Some(pos_state) = pos_state {
949 self.ledger_store.put_pos_state(
951 &x.ledger_info().consensus_block_id(),
952 pos_state,
953 &mut cs,
954 )?;
955 }
956 }
957
958 let (sealed_cs, counters) =
960 self.seal_change_set(first_version, num_txns, cs)?;
961 {
962 let _timer = DIEM_STORAGE_OTHER_TIMERS_SECONDS
963 .with_label_values(&["save_transactions_commit"])
964 .start_timer();
965 self.commit(sealed_cs)?;
966 }
967
968 if let Some(x) = ledger_info_with_sigs {
971 self.ledger_store.set_latest_ledger_info(x.clone());
972
973 DIEM_STORAGE_LEDGER_VERSION
974 .set(x.ledger_info().version() as i64);
975 DIEM_STORAGE_NEXT_BLOCK_EPOCH
976 .set(x.ledger_info().next_block_epoch() as i64);
977 }
978
979 if num_txns > 0 {
983 let last_version = first_version + num_txns - 1;
984 DIEM_STORAGE_COMMITTED_TXNS.inc_by(num_txns);
985 DIEM_STORAGE_LATEST_TXN_VERSION.set(last_version as i64);
986 counters
987 .expect("Counters should be bumped with transactions being saved.")
988 .bump_op_counters();
989
990 self.wake_pruner(last_version);
991 }
992
993 Ok(())
994 })
995 }
996
997 fn save_reward_event(
998 &self, epoch: u64, event: &RewardDistributionEventV2,
999 ) -> Result<()> {
1000 self.ledger_store.put_reward_event(epoch, event)
1001 }
1002
1003 fn delete_pos_state_by_block(&self, block_id: &HashValue) -> Result<()> {
1004 self.ledger_store.delete_pos_state(block_id)
1005 }
1006}
1007
1008impl DBReaderForPoW for PosLedgerDB {
1009 fn get_latest_ledger_info_option(
1010 &self,
1011 ) -> Option<LedgerInfoWithSignatures> {
1012 self.ledger_store.get_latest_ledger_info_option()
1013 }
1014
1015 fn get_block_ledger_info(
1016 &self, consensus_block_id: &HashValue,
1017 ) -> Result<LedgerInfoWithSignatures> {
1018 self.ledger_store.get_block_ledger_info(consensus_block_id)
1019 }
1020
1021 fn get_events_by_version(
1022 &self, start_version: u64, end_version: u64,
1023 ) -> Result<Vec<ContractEvent>> {
1024 let iter = self.event_store.get_events_by_version_iter(
1025 start_version,
1026 (end_version - start_version) as usize,
1027 )?;
1028 let events_vec = iter.collect::<Result<Vec<Vec<ContractEvent>>>>()?;
1029 Ok(events_vec.into_iter().flatten().collect())
1030 }
1031
1032 fn get_epoch_ending_blocks(
1033 &self, start_epoch: u64, end_epoch: u64,
1034 ) -> Result<Vec<HashValue>> {
1035 let mut ending_blocks = Vec::new();
1036 for ledger_info in self
1037 .ledger_store
1038 .get_epoch_ending_ledger_info_iter(start_epoch, end_epoch)?
1039 {
1040 ending_blocks.push(ledger_info?.ledger_info().consensus_block_id());
1041 }
1042 Ok(ending_blocks)
1043 }
1044
1045 fn get_reward_event(
1046 &self, epoch: u64,
1047 ) -> Result<RewardDistributionEventV2> {
1048 self.ledger_store.get_reward_event(epoch)
1049 }
1050
1051 fn get_committed_block_by_hash(
1052 &self, block_hash: &HashValue,
1053 ) -> Result<CommittedBlock> {
1054 self.ledger_store.get_committed_block_by_hash(block_hash)
1055 }
1056
1057 fn get_committed_block_hash_by_view(&self, view: u64) -> Result<HashValue> {
1058 self.ledger_store.get_committed_block_hash_by_view(view)
1059 }
1060
1061 fn get_ledger_info_by_voted_block(
1062 &self, block_id: &HashValue,
1063 ) -> Result<LedgerInfoWithSignatures> {
1064 self.ledger_store.get_ledger_info_by_voted_block(block_id)
1065 }
1066
1067 fn get_block_hash_by_epoch_and_round(
1068 &self, epoch: u64, round: u64,
1069 ) -> Result<HashValue> {
1070 self.ledger_store
1071 .get_block_hash_by_epoch_and_round(epoch, round)
1072 }
1073}
1074
1075#[cfg(any(test, feature = "fuzzing"))]
1076fn get_first_seq_num_and_limit(
1078 order: Order, cursor: u64, limit: u64,
1079) -> Result<(u64, u64)> {
1080 ensure!(limit > 0, "limit should > 0, got {}", limit);
1081
1082 Ok(if order == Order::Ascending {
1083 (cursor, limit)
1084 } else if limit <= cursor {
1085 (cursor - limit + 1, limit)
1086 } else {
1087 (0, cursor + 1)
1088 })
1089}
1090
1091pub trait GetRestoreHandler {
1092 fn get_restore_handler(&self) -> RestoreHandler;
1094}
1095
1096impl GetRestoreHandler for Arc<PosLedgerDB> {
1097 fn get_restore_handler(&self) -> RestoreHandler {
1098 RestoreHandler::new(
1099 Arc::clone(&self.db),
1100 Arc::clone(self),
1101 Arc::clone(&self.ledger_store),
1102 Arc::clone(&self.transaction_store),
1103 Arc::clone(&self.state_store),
1104 Arc::clone(&self.event_store),
1105 )
1106 }
1107}
1108
1109fn gauged_api<T, F>(api_name: &'static str, api_impl: F) -> Result<T>
1110where F: FnOnce() -> Result<T> {
1111 let timer = Instant::now();
1112
1113 let res = api_impl();
1114
1115 let res_type = match &res {
1116 Ok(_) => "Ok",
1117 Err(e) => {
1118 diem_warn!(
1119 api_name = api_name,
1120 error = ?e,
1121 "DiemDB API returned error."
1122 );
1123 "Err"
1124 }
1125 };
1126 DIEM_STORAGE_API_LATENCY_SECONDS
1127 .with_label_values(&[api_name, res_type])
1128 .observe(timer.elapsed().as_secs_f64());
1129
1130 res
1131}