pos_ledger_db/
lib.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8#![forbid(unsafe_code)]
9
10//! This crate provides [`PosLedgerDB`] which represents physical storage of the
11//! core Diem data structures.
12//!
13//! It relays read/write operations on the physical storage via [`schemadb`] to
14//! the underlying Key-Value storage system, and implements diem data structures
15//! on top of it.
16
17use std::{
18    collections::HashMap,
19    iter::Iterator,
20    path::Path,
21    sync::{mpsc, Arc, Mutex},
22    thread::{self, JoinHandle},
23    time::{Duration, Instant},
24};
25
26use anyhow::{ensure, Result};
27use itertools::{izip, zip_eq};
28use once_cell::sync::Lazy;
29
30use diem_config::config::RocksdbConfig;
31use diem_crypto::hash::{
32    CryptoHash, HashValue, SPARSE_MERKLE_PLACEHOLDER_HASH,
33};
34use diem_logger::prelude::*;
35use diem_types::{
36    account_address::AccountAddress,
37    account_state_blob::{AccountStateBlob, AccountStateWithProof},
38    committed_block::CommittedBlock,
39    contract_event::ContractEvent,
40    epoch_change::EpochChangeProof,
41    ledger_info::LedgerInfoWithSignatures,
42    proof::{
43        AccountStateProof, AccumulatorConsistencyProof, SparseMerkleProof,
44        TransactionListProof,
45    },
46    reward_distribution_event::RewardDistributionEventV2,
47    term_state::PosState,
48    transaction::{
49        Transaction, TransactionInfo, TransactionListWithProof,
50        TransactionToCommit, TransactionWithProof, Version,
51        PRE_GENESIS_VERSION,
52    },
53};
54#[cfg(feature = "fuzzing")]
55pub use diemdb_test::test_save_blocks_impl;
56use schemadb::{ColumnFamilyName, Options, DB, DEFAULT_CF_NAME};
57#[cfg(any(test, feature = "fuzzing"))]
58use storage_interface::Order;
59use storage_interface::{
60    DBReaderForPoW, DbReader, DbWriter, StartupInfo, TreeState,
61};
62
63use crate::{
64    backup::{backup_handler::BackupHandler, restore_handler::RestoreHandler},
65    change_set::{ChangeSet, SealedChangeSet},
66    errors::DiemDbError,
67    event_store::EventStore,
68    ledger_counters::LedgerCounters,
69    ledger_store::LedgerStore,
70    metrics::{
71        DIEM_STORAGE_API_LATENCY_SECONDS, DIEM_STORAGE_COMMITTED_TXNS,
72        DIEM_STORAGE_LATEST_TXN_VERSION, DIEM_STORAGE_LEDGER_VERSION,
73        DIEM_STORAGE_NEXT_BLOCK_EPOCH, DIEM_STORAGE_OTHER_TIMERS_SECONDS,
74        DIEM_STORAGE_ROCKSDB_PROPERTIES,
75    },
76    pruner::Pruner,
77    schema::*,
78    state_store::StateStore,
79    system_store::SystemStore,
80    transaction_store::TransactionStore,
81};
82use diem_types::block_metadata::BlockMetadata;
83
84#[cfg(any(feature = "diemsum"))]
85pub mod diemsum;
86// Used in this and other crates for testing.
87#[cfg(any(test, feature = "fuzzing"))]
88pub mod test_helper;
89
90pub mod backup;
91pub mod errors;
92pub mod metrics;
93pub mod schema;
94
95mod change_set;
96mod event_store;
97mod ledger_counters;
98mod ledger_store;
99mod pruner;
100mod state_store;
101mod system_store;
102mod transaction_store;
103
104#[cfg(any(test, feature = "fuzzing"))]
105#[allow(dead_code)]
106mod diemdb_test;
107
/// Upper bound on the number of items a single request may ask for; passed
/// to `error_if_too_many_requested` by `get_transactions`.
const MAX_LIMIT: u64 = 1000;

// TODO: Either implement an iteration API to allow a very old client to loop
// through a long history or guarantee that there is always a recent enough
// waypoint and client knows to boot from there.
/// Page size used by `DbReader::get_epoch_ending_ledger_infos` when fetching
/// epoch ending ledger infos.
const MAX_NUM_EPOCH_ENDING_LEDGER_INFO: usize = 100;
114
115static ROCKSDB_PROPERTY_MAP: Lazy<HashMap<&str, &str>> = Lazy::new(|| {
116    [
117        (
118            "diem_rocksdb_live_sst_files_size_bytes",
119            "rocksdb.live-sst-files-size",
120        ),
121        (
122            "diem_rocksdb_all_memtables_size_bytes",
123            "rocksdb.size-all-mem-tables",
124        ),
125        (
126            "diem_rocksdb_num_running_compactions",
127            "rocksdb.num-running-compactions",
128        ),
129        (
130            "diem_rocksdb_num_running_flushes",
131            "rocksdb.num-running-flushes",
132        ),
133        (
134            "diem_rocksdb_block_cache_usage_bytes",
135            "rocksdb.block-cache-usage",
136        ),
137        (
138            "diem_rocksdb_cf_size_bytes",
139            "rocksdb.estimate-live-data-size",
140        ),
141    ]
142    .iter()
143    .cloned()
144    .collect()
145});
146
147fn error_if_too_many_requested(
148    num_requested: u64, max_allowed: u64,
149) -> Result<()> {
150    if num_requested > max_allowed {
151        Err(DiemDbError::TooManyRequested(num_requested, max_allowed).into())
152    } else {
153        Ok(())
154    }
155}
156
157fn gen_rocksdb_options(config: &RocksdbConfig) -> Options {
158    let mut db_opts = Options::default();
159    db_opts.set_max_open_files(config.max_open_files);
160    db_opts.set_max_total_wal_size(config.max_total_wal_size);
161    db_opts
162}
163
164fn update_rocksdb_properties(db: &DB) -> Result<()> {
165    let _timer = DIEM_STORAGE_OTHER_TIMERS_SECONDS
166        .with_label_values(&["update_rocksdb_properties"])
167        .start_timer();
168    for cf_name in PosLedgerDB::column_families() {
169        for (property_name, rocksdb_property_argument) in &*ROCKSDB_PROPERTY_MAP
170        {
171            DIEM_STORAGE_ROCKSDB_PROPERTIES
172                .with_label_values(&[cf_name, property_name])
173                .set(
174                    db.get_property(cf_name, rocksdb_property_argument)? as i64
175                );
176        }
177    }
178    Ok(())
179}
180
/// Owns the background thread that periodically publishes RocksDB
/// properties, together with the channel used to ask that thread to stop.
#[derive(Debug)]
struct RocksdbPropertyReporter {
    /// Sending half of the shutdown channel; a `()` message tells the
    /// reporting thread to exit.
    sender: Mutex<mpsc::Sender<()>>,
    /// Handle of the reporting thread; taken and joined when the reporter is
    /// dropped.
    join_handle: Option<JoinHandle<()>>,
}
186
187impl RocksdbPropertyReporter {
188    fn new(db: Arc<DB>) -> Self {
189        let (send, recv) = mpsc::channel();
190        let join_handle = Some(thread::spawn(move || loop {
191            if let Err(e) = update_rocksdb_properties(&db) {
192                diem_warn!(
193                    error = ?e,
194                    "Updating rocksdb property failed."
195                );
196            }
197            // report rocksdb properties each 10 seconds
198            match recv.recv_timeout(Duration::from_secs(10)) {
199                Ok(_) => break,
200                Err(mpsc::RecvTimeoutError::Timeout) => (),
201                Err(mpsc::RecvTimeoutError::Disconnected) => break,
202            }
203        }));
204        Self {
205            sender: Mutex::new(send),
206            join_handle,
207        }
208    }
209}
210
211impl Drop for RocksdbPropertyReporter {
212    fn drop(&mut self) {
213        // Notify the property reporting thread to exit
214        self.sender.lock().unwrap().send(()).unwrap();
215        self.join_handle
216            .take()
217            .expect("Rocksdb property reporting thread must exist.")
218            .join()
219            .expect(
220                "Rocksdb property reporting thread should join peacefully.",
221            );
222    }
223}
224
/// This holds a handle to the underlying DB responsible for physical storage
/// and provides APIs for access to the core Diem data structures.
///
/// Every store below is constructed in `new_with_db` from a clone of the same
/// `Arc<DB>`.
#[derive(Debug)]
pub struct PosLedgerDB {
    /// Shared handle to the underlying key-value database.
    db: Arc<DB>,
    /// Store used for ledger infos, transaction infos and accumulator proofs.
    ledger_store: Arc<LedgerStore>,
    /// Store used to read and write transactions.
    transaction_store: Arc<TransactionStore>,
    /// Store used for account states and their proofs.
    state_store: Arc<StateStore>,
    /// Store used to read and write events.
    event_store: Arc<EventStore>,
    /// Store used to bump the ledger counters when sealing a change set.
    system_store: SystemStore,
    // Not read in the code visible here; held so that the reporter (and its
    // background thread) lives exactly as long as this struct.
    #[allow(dead_code)]
    rocksdb_property_reporter: RocksdbPropertyReporter,
    /// Present only when a prune window was supplied at construction.
    pruner: Option<Pruner>,
}
239
240impl PosLedgerDB {
    /// Returns the names of all column families used by this database.
    ///
    /// The list is handed to `DB::open` / `DB::open_readonly` in `open` and
    /// is iterated when RocksDB properties are reported.
    fn column_families() -> Vec<ColumnFamilyName> {
        vec![
            /* LedgerInfo CF = */ DEFAULT_CF_NAME,
            EPOCH_BY_VERSION_CF_NAME,
            EVENT_ACCUMULATOR_CF_NAME,
            EVENT_BY_KEY_CF_NAME,
            EVENT_BY_VERSION_CF_NAME,
            EVENT_CF_NAME,
            JELLYFISH_MERKLE_NODE_CF_NAME,
            LEDGER_COUNTERS_CF_NAME,
            STALE_NODE_INDEX_CF_NAME,
            TRANSACTION_CF_NAME,
            TRANSACTION_ACCUMULATOR_CF_NAME,
            TRANSACTION_BY_ACCOUNT_CF_NAME,
            TRANSACTION_INFO_CF_NAME,
            LEDGER_INFO_BY_BLOCK_CF_NAME,
            POS_STATE_CF_NAME,
            REWARD_EVENT_CF_NAME,
            COMMITTED_BLOCK_CF_NAME,
            COMMITTED_BLOCK_BY_VIEW_CF_NAME,
            LEDGER_INFO_BY_VOTED_BLOCK_CF_NAME,
            BLOCK_BY_EPOCH_AND_ROUND_CF_NAME,
        ]
    }
265
266    fn new_with_db(db: DB, prune_window: Option<u64>) -> Self {
267        let db = Arc::new(db);
268
269        PosLedgerDB {
270            db: Arc::clone(&db),
271            event_store: Arc::new(EventStore::new(Arc::clone(&db))),
272            ledger_store: Arc::new(LedgerStore::new(Arc::clone(&db))),
273            state_store: Arc::new(StateStore::new(Arc::clone(&db))),
274            transaction_store: Arc::new(TransactionStore::new(Arc::clone(&db))),
275            system_store: SystemStore::new(Arc::clone(&db)),
276            rocksdb_property_reporter: RocksdbPropertyReporter::new(
277                Arc::clone(&db),
278            ),
279            pruner: prune_window.map(|n| Pruner::new(Arc::clone(&db), n)),
280        }
281    }
282
283    pub fn open<P: AsRef<Path> + Clone>(
284        db_root_path: P, readonly: bool, prune_window: Option<u64>,
285        rocksdb_config: RocksdbConfig,
286    ) -> Result<Self> {
287        ensure!(
288            prune_window.is_none() || !readonly,
289            "Do not set prune_window when opening readonly.",
290        );
291
292        let path = db_root_path.as_ref().join("pos-ledger-db");
293        let instant = Instant::now();
294
295        let mut rocksdb_opts = gen_rocksdb_options(&rocksdb_config);
296
297        let db = if readonly {
298            DB::open_readonly(
299                path.clone(),
300                "diemdb_ro",
301                Self::column_families(),
302                rocksdb_opts,
303            )?
304        } else {
305            rocksdb_opts.create_if_missing(true);
306            rocksdb_opts.create_missing_column_families(true);
307            DB::open(
308                path.clone(),
309                "pos-ledger-db",
310                Self::column_families(),
311                rocksdb_opts,
312            )?
313        };
314
315        let ret = Self::new_with_db(db, prune_window);
316        diem_info!(
317            path = path,
318            time_ms = %instant.elapsed().as_millis(),
319            "Opened PosLedgerDB.",
320        );
321        Ok(ret)
322    }
323
324    /// This opens db in non-readonly mode, without the pruner.
325    #[cfg(any(test, feature = "fuzzing"))]
326    pub fn new_for_test<P: AsRef<Path> + Clone>(db_root_path: P) -> Self {
327        Self::open(
328            db_root_path,
329            false, /* readonly */
330            None,  /* pruner */
331            RocksdbConfig::default(),
332        )
333        .expect("Unable to open DiemDB")
334    }
335
    /// Forces an immediate update of the reported RocksDB properties instead
    /// of waiting for the next periodic report.
    ///
    /// Delegates to the free function `update_rocksdb_properties` on this
    /// instance's `db` handle and returns its result.
    pub fn update_rocksdb_properties(&self) -> Result<()> {
        update_rocksdb_properties(&self.db)
    }
340
    /// Returns ledger infos reflecting epoch bumps for the epochs in
    /// `[start_epoch, end_epoch)`. If there are no more than `limit`
    /// results, this function returns all of them, otherwise the first
    /// `limit` results are returned and the returned flag is `true` to
    /// indicate that there are more.
    ///
    /// Thin wrapper around `get_epoch_ending_ledger_infos_impl`.
    fn get_epoch_ending_ledger_infos(
        &self, start_epoch: u64, end_epoch: u64, limit: usize,
    ) -> Result<(Vec<LedgerInfoWithSignatures>, bool)> {
        self.get_epoch_ending_ledger_infos_impl(start_epoch, end_epoch, limit)
    }
351
352    fn get_epoch_ending_ledger_infos_impl(
353        &self, start_epoch: u64, end_epoch: u64, limit: usize,
354    ) -> Result<(Vec<LedgerInfoWithSignatures>, bool)> {
355        ensure!(
356            start_epoch <= end_epoch,
357            "Bad epoch range [{}, {})",
358            start_epoch,
359            end_epoch,
360        );
361        // Note that the latest epoch can be the same with the current epoch (in
362        // most cases), or current_epoch + 1 (when the latest
363        // ledger_info carries next validator set)
364        let latest_epoch = self
365            .ledger_store
366            .get_latest_ledger_info()?
367            .ledger_info()
368            .next_block_epoch();
369        ensure!(
370            end_epoch <= latest_epoch,
371            "Unable to provide epoch change ledger info for still open epoch. asked upper bound: {}, last sealed epoch: {}",
372            end_epoch,
373            latest_epoch - 1,  // okay to -1 because genesis LedgerInfo has .next_block_epoch() == 1
374        );
375
376        let (paging_epoch, more) = if end_epoch - start_epoch > limit as u64 {
377            (start_epoch + limit as u64, true)
378        } else {
379            (end_epoch, false)
380        };
381
382        let lis = self
383            .ledger_store
384            .get_epoch_ending_ledger_info_iter(start_epoch, paging_epoch)?
385            .collect::<Result<Vec<_>>>()?;
386        ensure!(
387            lis.len() == (paging_epoch - start_epoch) as usize,
388            "DB corruption: missing epoch ending ledger info for epoch {}",
389            lis.last()
390                .map(|li| li.ledger_info().next_block_epoch())
391                .unwrap_or(start_epoch),
392        );
393        Ok((lis, more))
394    }
395
396    fn get_transaction_with_proof(
397        &self, version: Version, ledger_version: Version, fetch_events: bool,
398    ) -> Result<TransactionWithProof> {
399        let proof = self
400            .ledger_store
401            .get_transaction_info_with_proof(version, ledger_version)?;
402        let transaction = self.transaction_store.get_transaction(version)?;
403
404        // If events were requested, also fetch those.
405        let events = if fetch_events {
406            Some(self.event_store.get_events_by_version(version)?)
407        } else {
408            None
409        };
410
411        Ok(TransactionWithProof {
412            version,
413            transaction,
414            events,
415            proof,
416        })
417    }
418
    /// Returns the transaction stored at `version`, as read from the
    /// transaction store.
    // TODO: check whether an existing method already provides this.
    pub fn get_transaction(&self, version: Version) -> Result<Transaction> {
        self.transaction_store.get_transaction(version)
    }
423
    /// Looks up the block metadata associated with `version` in the
    /// transaction store.
    ///
    /// Returns `None` when the store has no block metadata for that version;
    /// otherwise a `(Version, BlockMetadata)` pair (presumably the version of
    /// the metadata transaction itself; confirm against
    /// `TransactionStore::get_block_metadata`).
    pub fn get_transaction_block_meta(
        &self, version: Version,
    ) -> Result<Option<(Version, BlockMetadata)>> {
        self.transaction_store.get_block_metadata(version)
    }
429
    /// Returns the `TransactionInfo` stored at `version`, as read from the
    /// ledger store.
    pub fn get_transaction_info(
        &self, version: u64,
    ) -> Result<TransactionInfo> {
        self.ledger_store.get_transaction_info(version)
    }
435
436    // ================================== Backup APIs
437    // ===================================
438
439    /// Gets an instance of `BackupHandler` for data backup purpose.
440    pub fn get_backup_handler(&self) -> BackupHandler {
441        BackupHandler::new(
442            Arc::clone(&self.ledger_store),
443            Arc::clone(&self.transaction_store),
444            Arc::clone(&self.state_store),
445            Arc::clone(&self.event_store),
446        )
447    }
448
449    /// Convert a `ChangeSet` to `SealedChangeSet`.
450    ///
451    /// Specifically, counter increases are added to current counter values and
452    /// converted to DB alternations.
453    fn seal_change_set(
454        &self, first_version: Version, num_txns: Version, mut cs: ChangeSet,
455    ) -> Result<(SealedChangeSet, Option<LedgerCounters>)> {
456        // Avoid reading base counter values when not necessary.
457        let counters = if num_txns > 0 {
458            Some(self.system_store.bump_ledger_counters(
459                first_version,
460                first_version + num_txns - 1,
461                &mut cs,
462            )?)
463        } else {
464            None
465        };
466
467        Ok((SealedChangeSet { batch: cs.batch }, counters))
468    }
469
470    fn save_transactions_impl(
471        &self, txns_to_commit: &[TransactionToCommit], first_version: u64,
472        mut cs: &mut ChangeSet,
473    ) -> Result<HashValue> {
474        let last_version = first_version + txns_to_commit.len() as u64 - 1;
475
476        // Account state updates. Gather account state root hashes
477        let account_state_sets = txns_to_commit
478            .iter()
479            .map(|txn_to_commit| txn_to_commit.account_states().clone())
480            .collect::<Vec<_>>();
481        let state_root_hashes = if first_version == 0 {
482            // Genesis transactions.
483            self.state_store.put_account_state_sets(
484                account_state_sets,
485                first_version,
486                &mut cs,
487            )?
488        } else {
489            // TODO(lpl): Remove state tree.
490            vec![Default::default(); txns_to_commit.len()]
491        };
492        diem_debug!(
493            "save_transactions_impl: {} {:?}",
494            first_version,
495            state_root_hashes
496        );
497
498        // Event updates. Gather event accumulator root hashes.
499        let event_root_hashes =
500            zip_eq(first_version..=last_version, txns_to_commit)
501                .map(|(ver, txn_to_commit)| {
502                    self.event_store.put_events(
503                        ver,
504                        txn_to_commit.events(),
505                        &mut cs,
506                    )
507                })
508                .collect::<Result<Vec<_>>>()?;
509
510        // Transaction updates. Gather transaction hashes.
511        zip_eq(first_version..=last_version, txns_to_commit).try_for_each(
512            |(ver, txn_to_commit)| {
513                self.transaction_store.put_transaction(
514                    ver,
515                    txn_to_commit.transaction(),
516                    &mut cs,
517                )
518            },
519        )?;
520
521        // Transaction accumulator updates. Get result root hash.
522        let txn_infos =
523            izip!(txns_to_commit, state_root_hashes, event_root_hashes)
524                .map(|(t, s, e)| {
525                    Ok(TransactionInfo::new(
526                        t.transaction().hash(),
527                        s,
528                        e,
529                        t.gas_used(),
530                        t.status().clone(),
531                    ))
532                })
533                .collect::<Result<Vec<_>>>()?;
534        assert_eq!(txn_infos.len(), txns_to_commit.len());
535
536        let mut new_root_hash = self.ledger_store.put_transaction_infos(
537            first_version,
538            &txn_infos,
539            &mut cs,
540        )?;
541        if first_version != 0 {
542            // TODO(lpl): Remove StateTree.
543            new_root_hash = Default::default();
544        };
545
546        Ok(new_root_hash)
547    }
548
    /// Write the whole schema batch including all data necessary to mutate the
    /// ledger state of some transaction by leveraging rocksdb atomicity
    /// support. Also committed are the LedgerCounters.
    ///
    /// Consumes `sealed_cs` and propagates any error reported by
    /// `DB::write_schemas`.
    fn commit(&self, sealed_cs: SealedChangeSet) -> Result<()> {
        // NOTE(review): the meaning of the `false` flag is defined by
        // `schemadb::DB::write_schemas`, which is not visible here.
        self.db.write_schemas(sealed_cs.batch, false)?;

        Ok(())
    }
557
558    fn wake_pruner(&self, latest_version: Version) {
559        if let Some(pruner) = self.pruner.as_ref() {
560            pruner.wake(latest_version)
561        }
562    }
563}
564
565impl DbReader for PosLedgerDB {
566    fn get_epoch_ending_ledger_infos(
567        &self, start_epoch: u64, end_epoch: u64,
568    ) -> Result<EpochChangeProof> {
569        gauged_api("get_epoch_ending_ledger_infos", || {
570            let (ledger_info_with_sigs, more) =
571                Self::get_epoch_ending_ledger_infos(
572                    &self,
573                    start_epoch,
574                    end_epoch,
575                    MAX_NUM_EPOCH_ENDING_LEDGER_INFO,
576                )?;
577            Ok(EpochChangeProof::new(ledger_info_with_sigs, more))
578        })
579    }
580
581    // ======================= State Synchronizer Internal APIs
582    // ===================================
583    /// Gets a batch of transactions for the purpose of synchronizing state to
584    /// another node.
585    ///
586    /// This is used by the State Synchronizer module internally.
587    fn get_transactions(
588        &self, start_version: Version, limit: u64, ledger_version: Version,
589        fetch_events: bool,
590    ) -> Result<TransactionListWithProof> {
591        gauged_api("get_transactions", || {
592            error_if_too_many_requested(limit, MAX_LIMIT)?;
593
594            if start_version > ledger_version || limit == 0 {
595                return Ok(TransactionListWithProof::new_empty());
596            }
597
598            let limit =
599                std::cmp::min(limit, ledger_version - start_version + 1);
600
601            let txns = (start_version..start_version + limit)
602                .map(|version| self.transaction_store.get_transaction(version))
603                .collect::<Result<Vec<_>>>()?;
604            let txn_infos = (start_version..start_version + limit)
605                .map(|version| self.ledger_store.get_transaction_info(version))
606                .collect::<Result<Vec<_>>>()?;
607            let events = if fetch_events {
608                Some(
609                    (start_version..start_version + limit)
610                        .map(|version| {
611                            self.event_store.get_events_by_version(version)
612                        })
613                        .collect::<Result<Vec<_>>>()?,
614                )
615            } else {
616                None
617            };
618            let proof = TransactionListProof::new(
619                self.ledger_store.get_transaction_range_proof(
620                    Some(start_version),
621                    limit,
622                    ledger_version,
623                )?,
624                txn_infos,
625            );
626
627            Ok(TransactionListWithProof::new(
628                txns,
629                events,
630                Some(start_version),
631                proof,
632            ))
633        })
634    }
635
636    fn get_block_timestamp(&self, version: u64) -> Result<u64> {
637        gauged_api("get_block_timestamp", || {
638            let ts = match self.transaction_store.get_block_metadata(version)? {
639                Some((_v, block_meta)) => block_meta.into_inner().1,
640                // genesis timestamp is 0
641                None => 0,
642            };
643            Ok(ts)
644        })
645    }
646
647    fn get_latest_account_state(
648        &self, address: AccountAddress,
649    ) -> Result<Option<AccountStateBlob>> {
650        gauged_api("get_latest_account_state", || {
651            let ledger_info_with_sigs =
652                self.ledger_store.get_latest_ledger_info()?;
653            let version = ledger_info_with_sigs.ledger_info().version();
654            let (blob, _proof) = self
655                .state_store
656                .get_account_state_with_proof_by_version(address, version)?;
657            Ok(blob)
658        })
659    }
660
    /// Returns the latest ledger info with signatures, as reported by the
    /// ledger store. The call is wrapped in `gauged_api` under the label
    /// `"get_latest_ledger_info"`.
    fn get_latest_ledger_info(&self) -> Result<LedgerInfoWithSignatures> {
        gauged_api("get_latest_ledger_info", || {
            self.ledger_store.get_latest_ledger_info()
        })
    }
666
    /// Returns the startup info held by the ledger store, forwarding
    /// `need_pos_state` unchanged. `None` means the ledger store has no
    /// startup info to offer.
    fn get_startup_info(
        &self, need_pos_state: bool,
    ) -> Result<Option<StartupInfo>> {
        gauged_api("get_startup_info", || {
            self.ledger_store.get_startup_info(need_pos_state)
        })
    }
674
675    /// Returns a transaction that is the `seq_num`-th one associated with the
676    /// given account. If the transaction with given `seq_num` doesn't
677    /// exist, returns `None`.
678    fn get_txn_by_account(
679        &self, address: AccountAddress, seq_num: u64, ledger_version: Version,
680        fetch_events: bool,
681    ) -> Result<Option<TransactionWithProof>> {
682        gauged_api("get_txn_by_account", || {
683            self.transaction_store
684                .lookup_transaction_by_account(
685                    address,
686                    seq_num,
687                    ledger_version,
688                )?
689                .map(|version| {
690                    self.get_transaction_with_proof(
691                        version,
692                        ledger_version,
693                        fetch_events,
694                    )
695                })
696                .transpose()
697        })
698    }
699
700    fn get_state_proof_with_ledger_info(
701        &self, known_version: u64,
702        ledger_info_with_sigs: LedgerInfoWithSignatures,
703    ) -> Result<(EpochChangeProof, AccumulatorConsistencyProof)> {
704        gauged_api("get_state_proof_with_ledger_info", || {
705            let ledger_info = ledger_info_with_sigs.ledger_info();
706            ensure!(
707                known_version <= ledger_info.version(),
708                "Client known_version {} larger than ledger version {}.",
709                known_version,
710                ledger_info.version(),
711            );
712            let known_epoch = self.ledger_store.get_epoch(known_version)?;
713            let epoch_change_proof =
714                if known_epoch < ledger_info.next_block_epoch() {
715                    let (ledger_infos_with_sigs, more) = self
716                        .get_epoch_ending_ledger_infos(
717                            known_epoch,
718                            ledger_info.next_block_epoch(),
719                            // This is only used for local initialization, so
720                            // it's ok to use MAX.
721                            usize::MAX,
722                        )?;
723                    EpochChangeProof::new(ledger_infos_with_sigs, more)
724                } else {
725                    EpochChangeProof::new(vec![], /* more = */ false)
726                };
727
728            let ledger_consistency_proof = self
729                .ledger_store
730                .get_consistency_proof(known_version, ledger_info.version())?;
731            Ok((epoch_change_proof, ledger_consistency_proof))
732        })
733    }
734
735    fn get_state_proof(
736        &self, known_version: u64,
737    ) -> Result<(
738        LedgerInfoWithSignatures,
739        EpochChangeProof,
740        AccumulatorConsistencyProof,
741    )> {
742        gauged_api("get_state_proof", || {
743            let ledger_info_with_sigs =
744                self.ledger_store.get_latest_ledger_info()?;
745            let (epoch_change_proof, ledger_consistency_proof) = self
746                .get_state_proof_with_ledger_info(
747                    known_version,
748                    ledger_info_with_sigs.clone(),
749                )?;
750            Ok((
751                ledger_info_with_sigs,
752                epoch_change_proof,
753                ledger_consistency_proof,
754            ))
755        })
756    }
757
758    fn get_account_state_with_proof(
759        &self, address: AccountAddress, version: Version,
760        ledger_version: Version,
761    ) -> Result<AccountStateWithProof> {
762        gauged_api("get_account_state_with_proof", || {
763            ensure!(
764                version <= ledger_version,
765                "The queried version {} should be equal to or older than ledger version {}.",
766                version,
767                ledger_version
768            );
769            {
770                let latest_version = self.get_latest_version()?;
771                ensure!(
772                    ledger_version <= latest_version,
773                    "ledger_version specified {} is greater than committed version {}.",
774                    ledger_version,
775                    latest_version
776                );
777            }
778
779            let txn_info_with_proof = self
780                .ledger_store
781                .get_transaction_info_with_proof(version, ledger_version)?;
782            let (account_state_blob, sparse_merkle_proof) = self
783                .state_store
784                .get_account_state_with_proof_by_version(address, version)?;
785            Ok(AccountStateWithProof::new(
786                version,
787                account_state_blob,
788                AccountStateProof::new(
789                    txn_info_with_proof,
790                    sparse_merkle_proof,
791                ),
792            ))
793        })
794    }
795
    /// Returns the state blob of `address` at `version` (if any) together
    /// with its sparse Merkle proof, exactly as produced by the state store.
    fn get_account_state_with_proof_by_version(
        &self, address: AccountAddress, version: Version,
    ) -> Result<(
        Option<AccountStateBlob>,
        SparseMerkleProof<AccountStateBlob>,
    )> {
        gauged_api("get_account_state_with_proof_by_version", || {
            self.state_store
                .get_account_state_with_proof_by_version(address, version)
        })
    }
807
808    fn get_latest_state_root(&self) -> Result<(Version, HashValue)> {
809        gauged_api("get_latest_state_root", || {
810            let (version, txn_info) =
811                self.ledger_store.get_latest_transaction_info()?;
812            Ok((version, txn_info.state_root_hash()))
813        })
814    }
815
816    fn get_latest_tree_state(&self) -> Result<TreeState> {
817        gauged_api("get_latest_tree_state", || {
818            let tree_state =
819                match self.ledger_store.get_latest_transaction_info_option()? {
820                    Some((version, txn_info)) => self
821                        .ledger_store
822                        .get_tree_state(version + 1, txn_info)?,
823                    None => TreeState::new(
824                        0,
825                        vec![],
826                        self.state_store
827                            .get_root_hash_option(PRE_GENESIS_VERSION)?
828                            .unwrap_or(*SPARSE_MERKLE_PLACEHOLDER_HASH),
829                    ),
830                };
831
832            diem_info!(
833                num_transactions = tree_state.num_transactions,
834                state_root_hash = %tree_state.account_state_root_hash,
835                description = tree_state.describe(),
836                "Got latest TreeState."
837            );
838
839            Ok(tree_state)
840        })
841    }
842
    /// Gets ledger info at specified version and ensures it's an epoch ending.
    ///
    /// Both the lookup and the epoch-ending check are delegated to
    /// `LedgerStore::get_epoch_ending_ledger_info`.
    fn get_epoch_ending_ledger_info(
        &self, version: u64,
    ) -> Result<LedgerInfoWithSignatures> {
        gauged_api("get_epoch_ending_ledger_info", || {
            self.ledger_store.get_epoch_ending_ledger_info(version)
        })
    }
851
    /// Returns the latest transaction info and its version as reported by the
    /// ledger store, or `None` when the ledger store has none.
    fn get_latest_transaction_info_option(
        &self,
    ) -> Result<Option<(Version, TransactionInfo)>> {
        gauged_api("get_latest_transaction_info_option", || {
            self.ledger_store.get_latest_transaction_info_option()
        })
    }
859
    /// Returns the root hash the ledger store reports for `version`.
    fn get_accumulator_root_hash(&self, version: Version) -> Result<HashValue> {
        gauged_api("get_accumulator_root_hash", || {
            self.ledger_store.get_root_hash(version)
        })
    }
865
866    fn get_pos_state(&self, block_id: &HashValue) -> Result<PosState> {
867        diem_debug!("get_pos_state:{}", block_id);
868        self.ledger_store.get_pos_state(block_id)
869    }
870
871    fn get_latest_pos_state(&self) -> Arc<PosState> {
872        self.ledger_store.get_latest_pos_state()
873    }
874}
875
876impl DbWriter for PosLedgerDB {
877    /// `first_version` is the version of the first transaction in
878    /// `txns_to_commit`. When `ledger_info_with_sigs` is provided, verify
879    /// that the transaction accumulator root hash it carries is generated
880    /// after the `txns_to_commit` are applied. Note that even if
881    /// `txns_to_commit` is empty, `frist_version` is checked to be
882    /// `ledger_info_with_sigs.ledger_info.version + 1` if
883    /// `ledger_info_with_sigs` is not `None`.
884    fn save_transactions(
885        &self, txns_to_commit: &[TransactionToCommit], first_version: Version,
886        ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
887        pos_state: Option<PosState>, committed_blocks: Vec<CommittedBlock>,
888        ledger_infos_with_voted_block: Vec<(
889            HashValue,
890            LedgerInfoWithSignatures,
891        )>,
892    ) -> Result<()> {
893        gauged_api("save_transactions", || {
894            let num_txns = txns_to_commit.len() as u64;
895            // ledger_info_with_sigs could be None if we are doing state
896            // synchronization. In this case txns_to_commit should
897            // not be empty. Otherwise it is okay to commit empty blocks.
898            ensure!(
899                ledger_info_with_sigs.is_some() || num_txns > 0,
900                "txns_to_commit is empty while ledger_info_with_sigs is None.",
901            );
902
903            // Gather db mutations to `batch`.
904            let mut cs = ChangeSet::new();
905
906            if let Some(x) = ledger_info_with_sigs {
907                let claimed_last_version = x.ledger_info().version();
908                ensure!(
909                    claimed_last_version + 1 == first_version + num_txns,
910                    "Transaction batch not applicable: first_version {}, num_txns {}, last_version {}",
911                    first_version,
912                    num_txns,
913                    claimed_last_version,
914                );
915            }
916
917            let _new_root_hash = self.save_transactions_impl(
918                txns_to_commit,
919                first_version,
920                &mut cs,
921            )?;
922
923            for b in committed_blocks {
924                self.ledger_store.put_committed_block(&b, &mut cs)?;
925            }
926
927            for (voted_block, ledger_info) in ledger_infos_with_voted_block {
928                self.ledger_store.put_ledger_info_by_voted_block(
929                    &voted_block,
930                    &ledger_info,
931                    &mut cs,
932                )?;
933            }
934
935            // If expected ledger info is provided, verify result root hash and
936            // save the ledger info.
937            if let Some(x) = ledger_info_with_sigs {
938                // let expected_root_hash =
939                //     x.ledger_info().transaction_accumulator_hash();
940                // ensure!(
941                //     new_root_hash == expected_root_hash,
942                //     "Root hash calculated doesn't match expected. {:?} vs
943                // {:?}",     new_root_hash,
944                //     expected_root_hash,
945                // );
946
947                self.ledger_store.put_ledger_info(x, &mut cs)?;
948                if let Some(pos_state) = pos_state {
949                    // ledger_info and pos_state are always `Some` for now.
950                    self.ledger_store.put_pos_state(
951                        &x.ledger_info().consensus_block_id(),
952                        pos_state,
953                        &mut cs,
954                    )?;
955                }
956            }
957
958            // Persist.
959            let (sealed_cs, counters) =
960                self.seal_change_set(first_version, num_txns, cs)?;
961            {
962                let _timer = DIEM_STORAGE_OTHER_TIMERS_SECONDS
963                    .with_label_values(&["save_transactions_commit"])
964                    .start_timer();
965                self.commit(sealed_cs)?;
966            }
967
968            // Once everything is successfully persisted, update the latest
969            // in-memory ledger info.
970            if let Some(x) = ledger_info_with_sigs {
971                self.ledger_store.set_latest_ledger_info(x.clone());
972
973                DIEM_STORAGE_LEDGER_VERSION
974                    .set(x.ledger_info().version() as i64);
975                DIEM_STORAGE_NEXT_BLOCK_EPOCH
976                    .set(x.ledger_info().next_block_epoch() as i64);
977            }
978
979            // Only increment counter if commit succeeds and there are at least
980            // one transaction written to the storage. That's also
981            // when we'd inform the pruner thread to work.
982            if num_txns > 0 {
983                let last_version = first_version + num_txns - 1;
984                DIEM_STORAGE_COMMITTED_TXNS.inc_by(num_txns);
985                DIEM_STORAGE_LATEST_TXN_VERSION.set(last_version as i64);
986                counters
987                    .expect("Counters should be bumped with transactions being saved.")
988                    .bump_op_counters();
989
990                self.wake_pruner(last_version);
991            }
992
993            Ok(())
994        })
995    }
996
997    fn save_reward_event(
998        &self, epoch: u64, event: &RewardDistributionEventV2,
999    ) -> Result<()> {
1000        self.ledger_store.put_reward_event(epoch, event)
1001    }
1002
1003    fn delete_pos_state_by_block(&self, block_id: &HashValue) -> Result<()> {
1004        self.ledger_store.delete_pos_state(block_id)
1005    }
1006}
1007
1008impl DBReaderForPoW for PosLedgerDB {
1009    fn get_latest_ledger_info_option(
1010        &self,
1011    ) -> Option<LedgerInfoWithSignatures> {
1012        self.ledger_store.get_latest_ledger_info_option()
1013    }
1014
1015    fn get_block_ledger_info(
1016        &self, consensus_block_id: &HashValue,
1017    ) -> Result<LedgerInfoWithSignatures> {
1018        self.ledger_store.get_block_ledger_info(consensus_block_id)
1019    }
1020
1021    fn get_events_by_version(
1022        &self, start_version: u64, end_version: u64,
1023    ) -> Result<Vec<ContractEvent>> {
1024        let iter = self.event_store.get_events_by_version_iter(
1025            start_version,
1026            (end_version - start_version) as usize,
1027        )?;
1028        let events_vec = iter.collect::<Result<Vec<Vec<ContractEvent>>>>()?;
1029        Ok(events_vec.into_iter().flatten().collect())
1030    }
1031
1032    fn get_epoch_ending_blocks(
1033        &self, start_epoch: u64, end_epoch: u64,
1034    ) -> Result<Vec<HashValue>> {
1035        let mut ending_blocks = Vec::new();
1036        for ledger_info in self
1037            .ledger_store
1038            .get_epoch_ending_ledger_info_iter(start_epoch, end_epoch)?
1039        {
1040            ending_blocks.push(ledger_info?.ledger_info().consensus_block_id());
1041        }
1042        Ok(ending_blocks)
1043    }
1044
1045    fn get_reward_event(
1046        &self, epoch: u64,
1047    ) -> Result<RewardDistributionEventV2> {
1048        self.ledger_store.get_reward_event(epoch)
1049    }
1050
1051    fn get_committed_block_by_hash(
1052        &self, block_hash: &HashValue,
1053    ) -> Result<CommittedBlock> {
1054        self.ledger_store.get_committed_block_by_hash(block_hash)
1055    }
1056
1057    fn get_committed_block_hash_by_view(&self, view: u64) -> Result<HashValue> {
1058        self.ledger_store.get_committed_block_hash_by_view(view)
1059    }
1060
1061    fn get_ledger_info_by_voted_block(
1062        &self, block_id: &HashValue,
1063    ) -> Result<LedgerInfoWithSignatures> {
1064        self.ledger_store.get_ledger_info_by_voted_block(block_id)
1065    }
1066
1067    fn get_block_hash_by_epoch_and_round(
1068        &self, epoch: u64, round: u64,
1069    ) -> Result<HashValue> {
1070        self.ledger_store
1071            .get_block_hash_by_epoch_and_round(epoch, round)
1072    }
1073}
1074
#[cfg(any(test, feature = "fuzzing"))]
/// Translates a `(cursor, limit)` request in the given `order` into an
/// ascending range expressed as `(first_seq_num, limit)`.
///
/// * Ascending: the range starts at `cursor` and keeps `limit`.
/// * Otherwise: the range ends at `cursor`; if fewer than `limit` sequence
///   numbers exist at or below `cursor`, it is clamped to start at `0` and
///   holds `cursor + 1` items.
///
/// Fails when `limit` is zero.
fn get_first_seq_num_and_limit(
    order: Order, cursor: u64, limit: u64,
) -> Result<(u64, u64)> {
    ensure!(limit > 0, "limit should > 0, got {}", limit);

    let ascending_range = match order {
        Order::Ascending => (cursor, limit),
        // Enough items at or below the cursor to serve the full limit.
        _ if limit <= cursor => (cursor - limit + 1, limit),
        // Not enough items: return everything from 0 up to the cursor.
        _ => (0, cursor + 1),
    };
    Ok(ascending_range)
}
1090
1091pub trait GetRestoreHandler {
1092    /// Gets an instance of `RestoreHandler` for data restore purpose.
1093    fn get_restore_handler(&self) -> RestoreHandler;
1094}
1095
1096impl GetRestoreHandler for Arc<PosLedgerDB> {
1097    fn get_restore_handler(&self) -> RestoreHandler {
1098        RestoreHandler::new(
1099            Arc::clone(&self.db),
1100            Arc::clone(self),
1101            Arc::clone(&self.ledger_store),
1102            Arc::clone(&self.transaction_store),
1103            Arc::clone(&self.state_store),
1104            Arc::clone(&self.event_store),
1105        )
1106    }
1107}
1108
1109fn gauged_api<T, F>(api_name: &'static str, api_impl: F) -> Result<T>
1110where F: FnOnce() -> Result<T> {
1111    let timer = Instant::now();
1112
1113    let res = api_impl();
1114
1115    let res_type = match &res {
1116        Ok(_) => "Ok",
1117        Err(e) => {
1118            diem_warn!(
1119                api_name = api_name,
1120                error = ?e,
1121                "DiemDB API returned error."
1122            );
1123            "Err"
1124        }
1125    };
1126    DIEM_STORAGE_API_LATENCY_SECONDS
1127        .with_label_values(&[api_name, res_type])
1128        .observe(timer.elapsed().as_secs_f64());
1129
1130    res
1131}