pos_ledger_db/
lib.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8#![forbid(unsafe_code)]
9
10//! This crate provides [`PosLedgerDB`] which represents physical storage of the
11//! core Diem data structures.
12//!
13//! It relays read/write operations on the physical storage via [`schemadb`] to
14//! the underlying Key-Value storage system, and implements diem data structures
15//! on top of it.
16
17use std::{
18    collections::HashMap,
19    iter::Iterator,
20    path::Path,
21    sync::{mpsc, Arc, Mutex},
22    thread::{self, JoinHandle},
23    time::{Duration, Instant},
24};
25
26use anyhow::{ensure, Result};
27use itertools::{izip, zip_eq};
28use once_cell::sync::Lazy;
29
30use diem_config::config::RocksdbConfig;
31use diem_crypto::hash::{
32    CryptoHash, HashValue, SPARSE_MERKLE_PLACEHOLDER_HASH,
33};
34use diem_logger::prelude::*;
35use diem_types::{
36    account_address::AccountAddress,
37    committed_block::CommittedBlock,
38    contract_event::ContractEvent,
39    epoch_change::EpochChangeProof,
40    ledger_info::LedgerInfoWithSignatures,
41    proof::{AccumulatorConsistencyProof, TransactionListProof},
42    reward_distribution_event::RewardDistributionEventV2,
43    term_state::PosState,
44    transaction::{
45        Transaction, TransactionInfo, TransactionListWithProof,
46        TransactionToCommit, TransactionWithProof, Version,
47    },
48};
49#[cfg(feature = "fuzzing")]
50pub use diemdb_test::test_save_blocks_impl;
51use schemadb::{ColumnFamilyName, Options, DB, DEFAULT_CF_NAME};
52#[cfg(any(test, feature = "fuzzing"))]
53use storage_interface::Order;
54use storage_interface::{
55    DBReaderForPoW, DbReader, DbWriter, StartupInfo, TreeState,
56};
57
58use crate::{
59    change_set::{ChangeSet, SealedChangeSet},
60    errors::DiemDbError,
61    event_store::EventStore,
62    ledger_store::LedgerStore,
63    metrics::{
64        DIEM_STORAGE_API_LATENCY_SECONDS, DIEM_STORAGE_COMMITTED_TXNS,
65        DIEM_STORAGE_LATEST_TXN_VERSION, DIEM_STORAGE_LEDGER_VERSION,
66        DIEM_STORAGE_NEXT_BLOCK_EPOCH, DIEM_STORAGE_OTHER_TIMERS_SECONDS,
67        DIEM_STORAGE_ROCKSDB_PROPERTIES,
68    },
69    schema::*,
70    transaction_store::TransactionStore,
71};
72use diem_types::block_metadata::BlockMetadata;
73
74// Used in this and other crates for testing.
75#[cfg(any(test, feature = "fuzzing"))]
76pub mod test_helper;
77
78pub mod errors;
79pub mod metrics;
80pub mod schema;
81
82mod change_set;
83mod event_store;
84mod ledger_store;
85mod transaction_store;
86
87#[cfg(any(test, feature = "fuzzing"))]
88#[allow(dead_code)]
89mod diemdb_test;
90
/// Upper bound on `limit` accepted by paginated reads such as
/// `DbReader::get_transactions`; larger requests are rejected.
const MAX_LIMIT: u64 = 1_000;

/// Maximum number of epoch-ending ledger infos returned by a single
/// `DbReader::get_epoch_ending_ledger_infos` call. When more exist, the
/// returned `EpochChangeProof` is flagged as incomplete (`more == true`).
// TODO: Either implement an iteration API to allow a very old client to loop
// through a long history or guarantee that there is always a recent enough
// waypoint and client knows to boot from there.
const MAX_NUM_EPOCH_ENDING_LEDGER_INFO: usize = 100;
97
98static ROCKSDB_PROPERTY_MAP: Lazy<HashMap<&str, &str>> = Lazy::new(|| {
99    [
100        (
101            "diem_rocksdb_live_sst_files_size_bytes",
102            "rocksdb.live-sst-files-size",
103        ),
104        (
105            "diem_rocksdb_all_memtables_size_bytes",
106            "rocksdb.size-all-mem-tables",
107        ),
108        (
109            "diem_rocksdb_num_running_compactions",
110            "rocksdb.num-running-compactions",
111        ),
112        (
113            "diem_rocksdb_num_running_flushes",
114            "rocksdb.num-running-flushes",
115        ),
116        (
117            "diem_rocksdb_block_cache_usage_bytes",
118            "rocksdb.block-cache-usage",
119        ),
120        (
121            "diem_rocksdb_cf_size_bytes",
122            "rocksdb.estimate-live-data-size",
123        ),
124    ]
125    .iter()
126    .cloned()
127    .collect()
128});
129
130fn error_if_too_many_requested(
131    num_requested: u64, max_allowed: u64,
132) -> Result<()> {
133    if num_requested > max_allowed {
134        Err(DiemDbError::TooManyRequested(num_requested, max_allowed).into())
135    } else {
136        Ok(())
137    }
138}
139
140fn gen_rocksdb_options(config: &RocksdbConfig) -> Options {
141    let mut db_opts = Options::default();
142    db_opts.set_max_open_files(config.max_open_files);
143    db_opts.set_max_total_wal_size(config.max_total_wal_size);
144    db_opts
145}
146
147fn update_rocksdb_properties(db: &DB) -> Result<()> {
148    let _timer = DIEM_STORAGE_OTHER_TIMERS_SECONDS
149        .with_label_values(&["update_rocksdb_properties"])
150        .start_timer();
151    for cf_name in PosLedgerDB::column_families() {
152        for (property_name, rocksdb_property_argument) in &*ROCKSDB_PROPERTY_MAP
153        {
154            DIEM_STORAGE_ROCKSDB_PROPERTIES
155                .with_label_values(&[cf_name, property_name])
156                .set(
157                    db.get_property(cf_name, rocksdb_property_argument)? as i64
158                );
159        }
160    }
161    Ok(())
162}
163
164#[derive(Debug)]
165struct RocksdbPropertyReporter {
166    sender: Mutex<mpsc::Sender<()>>,
167    join_handle: Option<JoinHandle<()>>,
168}
169
170impl RocksdbPropertyReporter {
171    fn new(db: Arc<DB>) -> Self {
172        let (send, recv) = mpsc::channel();
173        let join_handle = Some(thread::spawn(move || loop {
174            if let Err(e) = update_rocksdb_properties(&db) {
175                diem_warn!(
176                    error = ?e,
177                    "Updating rocksdb property failed."
178                );
179            }
180            // report rocksdb properties each 10 seconds
181            match recv.recv_timeout(Duration::from_secs(10)) {
182                Ok(_) => break,
183                Err(mpsc::RecvTimeoutError::Timeout) => (),
184                Err(mpsc::RecvTimeoutError::Disconnected) => break,
185            }
186        }));
187        Self {
188            sender: Mutex::new(send),
189            join_handle,
190        }
191    }
192}
193
194impl Drop for RocksdbPropertyReporter {
195    fn drop(&mut self) {
196        // Notify the property reporting thread to exit
197        self.sender.lock().unwrap().send(()).unwrap();
198        self.join_handle
199            .take()
200            .expect("Rocksdb property reporting thread must exist.")
201            .join()
202            .expect(
203                "Rocksdb property reporting thread should join peacefully.",
204            );
205    }
206}
207
208/// This holds a handle to the underlying DB responsible for physical storage
209/// and provides APIs for access to the core Diem data structures.
210#[derive(Debug)]
211pub struct PosLedgerDB {
212    db: Arc<DB>,
213    ledger_store: Arc<LedgerStore>,
214    transaction_store: Arc<TransactionStore>,
215    event_store: Arc<EventStore>,
216    #[allow(dead_code)]
217    rocksdb_property_reporter: RocksdbPropertyReporter,
218}
219
220impl PosLedgerDB {
221    fn column_families() -> Vec<ColumnFamilyName> {
222        vec![
223            /* LedgerInfo CF = */ DEFAULT_CF_NAME,
224            EPOCH_BY_VERSION_CF_NAME,
225            EVENT_ACCUMULATOR_CF_NAME,
226            EVENT_BY_KEY_CF_NAME,
227            EVENT_BY_VERSION_CF_NAME,
228            EVENT_CF_NAME,
229            JELLYFISH_MERKLE_NODE_CF_NAME,
230            LEDGER_COUNTERS_CF_NAME,
231            STALE_NODE_INDEX_CF_NAME,
232            TRANSACTION_CF_NAME,
233            TRANSACTION_ACCUMULATOR_CF_NAME,
234            TRANSACTION_BY_ACCOUNT_CF_NAME,
235            TRANSACTION_INFO_CF_NAME,
236            LEDGER_INFO_BY_BLOCK_CF_NAME,
237            POS_STATE_CF_NAME,
238            REWARD_EVENT_CF_NAME,
239            COMMITTED_BLOCK_CF_NAME,
240            COMMITTED_BLOCK_BY_VIEW_CF_NAME,
241            LEDGER_INFO_BY_VOTED_BLOCK_CF_NAME,
242            BLOCK_BY_EPOCH_AND_ROUND_CF_NAME,
243        ]
244    }
245
246    fn new_with_db(db: DB, _prune_window: Option<u64>) -> Self {
247        let db = Arc::new(db);
248
249        PosLedgerDB {
250            db: Arc::clone(&db),
251            event_store: Arc::new(EventStore::new(Arc::clone(&db))),
252            ledger_store: Arc::new(LedgerStore::new(Arc::clone(&db))),
253            transaction_store: Arc::new(TransactionStore::new(Arc::clone(&db))),
254            rocksdb_property_reporter: RocksdbPropertyReporter::new(
255                Arc::clone(&db),
256            ),
257        }
258    }
259
260    pub fn open<P: AsRef<Path> + Clone>(
261        db_root_path: P, readonly: bool, prune_window: Option<u64>,
262        rocksdb_config: RocksdbConfig,
263    ) -> Result<Self> {
264        ensure!(
265            prune_window.is_none() || !readonly,
266            "Do not set prune_window when opening readonly.",
267        );
268
269        let path = db_root_path.as_ref().join("pos-ledger-db");
270        let instant = Instant::now();
271
272        let mut rocksdb_opts = gen_rocksdb_options(&rocksdb_config);
273
274        let db = if readonly {
275            DB::open_readonly(
276                path.clone(),
277                "diemdb_ro",
278                Self::column_families(),
279                rocksdb_opts,
280            )?
281        } else {
282            rocksdb_opts.create_if_missing(true);
283            rocksdb_opts.create_missing_column_families(true);
284            DB::open(
285                path.clone(),
286                "pos-ledger-db",
287                Self::column_families(),
288                rocksdb_opts,
289            )?
290        };
291
292        let ret = Self::new_with_db(db, prune_window);
293        diem_info!(
294            path = path,
295            time_ms = %instant.elapsed().as_millis(),
296            "Opened PosLedgerDB.",
297        );
298        Ok(ret)
299    }
300
301    /// This opens db in non-readonly mode, without the pruner.
302    #[cfg(any(test, feature = "fuzzing"))]
303    pub fn new_for_test<P: AsRef<Path> + Clone>(db_root_path: P) -> Self {
304        Self::open(
305            db_root_path,
306            false, /* readonly */
307            None,  /* pruner */
308            RocksdbConfig::default(),
309        )
310        .expect("Unable to open DiemDB")
311    }
312
313    /// This force the db to update rocksdb properties immediately.
314    pub fn update_rocksdb_properties(&self) -> Result<()> {
315        update_rocksdb_properties(&self.db)
316    }
317
318    /// Returns ledger infos reflecting epoch bumps starting with the given
319    /// epoch. If there are no more than `MAX_NUM_EPOCH_ENDING_LEDGER_INFO`
320    /// results, this function returns all of them, otherwise the first
321    /// `MAX_NUM_EPOCH_ENDING_LEDGER_INFO` results are returned and a flag
322    /// (when true) will be used to indicate the fact that there is more.
323    fn get_epoch_ending_ledger_infos(
324        &self, start_epoch: u64, end_epoch: u64, limit: usize,
325    ) -> Result<(Vec<LedgerInfoWithSignatures>, bool)> {
326        self.get_epoch_ending_ledger_infos_impl(start_epoch, end_epoch, limit)
327    }
328
329    fn get_epoch_ending_ledger_infos_impl(
330        &self, start_epoch: u64, end_epoch: u64, limit: usize,
331    ) -> Result<(Vec<LedgerInfoWithSignatures>, bool)> {
332        ensure!(
333            start_epoch <= end_epoch,
334            "Bad epoch range [{}, {})",
335            start_epoch,
336            end_epoch,
337        );
338        // Note that the latest epoch can be the same with the current epoch (in
339        // most cases), or current_epoch + 1 (when the latest
340        // ledger_info carries next validator set)
341        let latest_epoch = self
342            .ledger_store
343            .get_latest_ledger_info()?
344            .ledger_info()
345            .next_block_epoch();
346        ensure!(
347            end_epoch <= latest_epoch,
348            "Unable to provide epoch change ledger info for still open epoch. asked upper bound: {}, last sealed epoch: {}",
349            end_epoch,
350            latest_epoch - 1,  // okay to -1 because genesis LedgerInfo has .next_block_epoch() == 1
351        );
352
353        let (paging_epoch, more) = if end_epoch - start_epoch > limit as u64 {
354            (start_epoch + limit as u64, true)
355        } else {
356            (end_epoch, false)
357        };
358
359        let lis = self
360            .ledger_store
361            .get_epoch_ending_ledger_info_iter(start_epoch, paging_epoch)?
362            .collect::<Result<Vec<_>>>()?;
363        ensure!(
364            lis.len() == (paging_epoch - start_epoch) as usize,
365            "DB corruption: missing epoch ending ledger info for epoch {}",
366            lis.last()
367                .map(|li| li.ledger_info().next_block_epoch())
368                .unwrap_or(start_epoch),
369        );
370        Ok((lis, more))
371    }
372
373    fn get_transaction_with_proof(
374        &self, version: Version, ledger_version: Version, fetch_events: bool,
375    ) -> Result<TransactionWithProof> {
376        let proof = self
377            .ledger_store
378            .get_transaction_info_with_proof(version, ledger_version)?;
379        let transaction = self.transaction_store.get_transaction(version)?;
380
381        // If events were requested, also fetch those.
382        let events = if fetch_events {
383            Some(self.event_store.get_events_by_version(version)?)
384        } else {
385            None
386        };
387
388        Ok(TransactionWithProof {
389            version,
390            transaction,
391            events,
392            proof,
393        })
394    }
395
396    // TODO check if already have methods can do this
397    pub fn get_transaction(&self, version: Version) -> Result<Transaction> {
398        self.transaction_store.get_transaction(version)
399    }
400
401    pub fn get_transaction_block_meta(
402        &self, version: Version,
403    ) -> Result<Option<(Version, BlockMetadata)>> {
404        self.transaction_store.get_block_metadata(version)
405    }
406
407    pub fn get_transaction_info(
408        &self, version: u64,
409    ) -> Result<TransactionInfo> {
410        self.ledger_store.get_transaction_info(version)
411    }
412
413    /// Convert a `ChangeSet` to `SealedChangeSet`.
414    fn seal_change_set(&self, cs: ChangeSet) -> Result<SealedChangeSet> {
415        Ok(SealedChangeSet { batch: cs.batch })
416    }
417
418    fn save_transactions_impl(
419        &self, txns_to_commit: &[TransactionToCommit], first_version: u64,
420        mut cs: &mut ChangeSet,
421    ) -> Result<HashValue> {
422        let last_version = first_version + txns_to_commit.len() as u64 - 1;
423
424        // State root hashes are always default (state tree removed).
425        let state_root_hashes = vec![Default::default(); txns_to_commit.len()];
426
427        // Event updates. Gather event accumulator root hashes.
428        let event_root_hashes =
429            zip_eq(first_version..=last_version, txns_to_commit)
430                .map(|(ver, txn_to_commit)| {
431                    self.event_store.put_events(
432                        ver,
433                        txn_to_commit.events(),
434                        &mut cs,
435                    )
436                })
437                .collect::<Result<Vec<_>>>()?;
438
439        // Transaction updates. Gather transaction hashes.
440        zip_eq(first_version..=last_version, txns_to_commit).try_for_each(
441            |(ver, txn_to_commit)| {
442                self.transaction_store.put_transaction(
443                    ver,
444                    txn_to_commit.transaction(),
445                    &mut cs,
446                )
447            },
448        )?;
449
450        // Transaction accumulator updates. Get result root hash.
451        let txn_infos =
452            izip!(txns_to_commit, state_root_hashes, event_root_hashes)
453                .map(|(t, s, e)| {
454                    Ok(TransactionInfo::new(
455                        t.transaction().hash(),
456                        s,
457                        e,
458                        t.gas_used(),
459                        t.status().clone(),
460                    ))
461                })
462                .collect::<Result<Vec<_>>>()?;
463        assert_eq!(txn_infos.len(), txns_to_commit.len());
464
465        let new_root_hash = self.ledger_store.put_transaction_infos(
466            first_version,
467            &txn_infos,
468            &mut cs,
469        )?;
470
471        Ok(new_root_hash)
472    }
473
474    /// Write the whole schema batch including all data necessary to mutate the
475    /// ledger state of some transaction by leveraging rocksdb atomicity
476    /// support. Also committed are the LedgerCounters.
477    fn commit(&self, sealed_cs: SealedChangeSet) -> Result<()> {
478        self.db.write_schemas(sealed_cs.batch, false)?;
479
480        Ok(())
481    }
482}
483
484impl DbReader for PosLedgerDB {
485    fn get_epoch_ending_ledger_infos(
486        &self, start_epoch: u64, end_epoch: u64,
487    ) -> Result<EpochChangeProof> {
488        gauged_api("get_epoch_ending_ledger_infos", || {
489            let (ledger_info_with_sigs, more) =
490                Self::get_epoch_ending_ledger_infos(
491                    &self,
492                    start_epoch,
493                    end_epoch,
494                    MAX_NUM_EPOCH_ENDING_LEDGER_INFO,
495                )?;
496            Ok(EpochChangeProof::new(ledger_info_with_sigs, more))
497        })
498    }
499
500    // ======================= State Synchronizer Internal APIs
501    // ===================================
502    /// Gets a batch of transactions for the purpose of synchronizing state to
503    /// another node.
504    ///
505    /// This is used by the State Synchronizer module internally.
506    fn get_transactions(
507        &self, start_version: Version, limit: u64, ledger_version: Version,
508        fetch_events: bool,
509    ) -> Result<TransactionListWithProof> {
510        gauged_api("get_transactions", || {
511            error_if_too_many_requested(limit, MAX_LIMIT)?;
512
513            if start_version > ledger_version || limit == 0 {
514                return Ok(TransactionListWithProof::new_empty());
515            }
516
517            let limit =
518                std::cmp::min(limit, ledger_version - start_version + 1);
519
520            let txns = (start_version..start_version + limit)
521                .map(|version| self.transaction_store.get_transaction(version))
522                .collect::<Result<Vec<_>>>()?;
523            let txn_infos = (start_version..start_version + limit)
524                .map(|version| self.ledger_store.get_transaction_info(version))
525                .collect::<Result<Vec<_>>>()?;
526            let events = if fetch_events {
527                Some(
528                    (start_version..start_version + limit)
529                        .map(|version| {
530                            self.event_store.get_events_by_version(version)
531                        })
532                        .collect::<Result<Vec<_>>>()?,
533                )
534            } else {
535                None
536            };
537            let proof = TransactionListProof::new(
538                self.ledger_store.get_transaction_range_proof(
539                    Some(start_version),
540                    limit,
541                    ledger_version,
542                )?,
543                txn_infos,
544            );
545
546            Ok(TransactionListWithProof::new(
547                txns,
548                events,
549                Some(start_version),
550                proof,
551            ))
552        })
553    }
554
555    fn get_block_timestamp(&self, version: u64) -> Result<u64> {
556        gauged_api("get_block_timestamp", || {
557            let ts = match self.transaction_store.get_block_metadata(version)? {
558                Some((_v, block_meta)) => block_meta.into_inner().1,
559                // genesis timestamp is 0
560                None => 0,
561            };
562            Ok(ts)
563        })
564    }
565
566    fn get_latest_ledger_info(&self) -> Result<LedgerInfoWithSignatures> {
567        gauged_api("get_latest_ledger_info", || {
568            self.ledger_store.get_latest_ledger_info()
569        })
570    }
571
572    fn get_startup_info(
573        &self, need_pos_state: bool,
574    ) -> Result<Option<StartupInfo>> {
575        gauged_api("get_startup_info", || {
576            self.ledger_store.get_startup_info(need_pos_state)
577        })
578    }
579
580    /// Returns a transaction that is the `seq_num`-th one associated with the
581    /// given account. If the transaction with given `seq_num` doesn't
582    /// exist, returns `None`.
583    fn get_txn_by_account(
584        &self, address: AccountAddress, seq_num: u64, ledger_version: Version,
585        fetch_events: bool,
586    ) -> Result<Option<TransactionWithProof>> {
587        gauged_api("get_txn_by_account", || {
588            self.transaction_store
589                .lookup_transaction_by_account(
590                    address,
591                    seq_num,
592                    ledger_version,
593                )?
594                .map(|version| {
595                    self.get_transaction_with_proof(
596                        version,
597                        ledger_version,
598                        fetch_events,
599                    )
600                })
601                .transpose()
602        })
603    }
604
605    fn get_state_proof_with_ledger_info(
606        &self, known_version: u64,
607        ledger_info_with_sigs: LedgerInfoWithSignatures,
608    ) -> Result<(EpochChangeProof, AccumulatorConsistencyProof)> {
609        gauged_api("get_state_proof_with_ledger_info", || {
610            let ledger_info = ledger_info_with_sigs.ledger_info();
611            ensure!(
612                known_version <= ledger_info.version(),
613                "Client known_version {} larger than ledger version {}.",
614                known_version,
615                ledger_info.version(),
616            );
617            let known_epoch = self.ledger_store.get_epoch(known_version)?;
618            let epoch_change_proof =
619                if known_epoch < ledger_info.next_block_epoch() {
620                    let (ledger_infos_with_sigs, more) = self
621                        .get_epoch_ending_ledger_infos(
622                            known_epoch,
623                            ledger_info.next_block_epoch(),
624                            // This is only used for local initialization, so
625                            // it's ok to use MAX.
626                            usize::MAX,
627                        )?;
628                    EpochChangeProof::new(ledger_infos_with_sigs, more)
629                } else {
630                    EpochChangeProof::new(vec![], /* more = */ false)
631                };
632
633            let ledger_consistency_proof = self
634                .ledger_store
635                .get_consistency_proof(known_version, ledger_info.version())?;
636            Ok((epoch_change_proof, ledger_consistency_proof))
637        })
638    }
639
640    fn get_state_proof(
641        &self, known_version: u64,
642    ) -> Result<(
643        LedgerInfoWithSignatures,
644        EpochChangeProof,
645        AccumulatorConsistencyProof,
646    )> {
647        gauged_api("get_state_proof", || {
648            let ledger_info_with_sigs =
649                self.ledger_store.get_latest_ledger_info()?;
650            let (epoch_change_proof, ledger_consistency_proof) = self
651                .get_state_proof_with_ledger_info(
652                    known_version,
653                    ledger_info_with_sigs.clone(),
654                )?;
655            Ok((
656                ledger_info_with_sigs,
657                epoch_change_proof,
658                ledger_consistency_proof,
659            ))
660        })
661    }
662
663    fn get_latest_tree_state(&self) -> Result<TreeState> {
664        gauged_api("get_latest_tree_state", || {
665            let tree_state = match self
666                .ledger_store
667                .get_latest_transaction_info_option()?
668            {
669                Some((version, _txn_info)) => {
670                    let frozen_subtrees = self
671                        .ledger_store
672                        .get_frozen_subtree_hashes(version + 1)?;
673                    TreeState::new(
674                        version + 1,
675                        frozen_subtrees,
676                        *SPARSE_MERKLE_PLACEHOLDER_HASH,
677                    )
678                }
679                None => {
680                    TreeState::new(0, vec![], *SPARSE_MERKLE_PLACEHOLDER_HASH)
681                }
682            };
683
684            diem_info!(
685                num_transactions = tree_state.num_transactions,
686                state_root_hash = %tree_state.account_state_root_hash,
687                description = tree_state.describe(),
688                "Got latest TreeState."
689            );
690
691            Ok(tree_state)
692        })
693    }
694
695    /// Gets ledger info at specified version and ensures it's an epoch ending.
696    fn get_epoch_ending_ledger_info(
697        &self, version: u64,
698    ) -> Result<LedgerInfoWithSignatures> {
699        gauged_api("get_epoch_ending_ledger_info", || {
700            self.ledger_store.get_epoch_ending_ledger_info(version)
701        })
702    }
703
704    fn get_latest_transaction_info_option(
705        &self,
706    ) -> Result<Option<(Version, TransactionInfo)>> {
707        gauged_api("get_latest_transaction_info_option", || {
708            self.ledger_store.get_latest_transaction_info_option()
709        })
710    }
711
712    fn get_accumulator_root_hash(&self, version: Version) -> Result<HashValue> {
713        gauged_api("get_accumulator_root_hash", || {
714            self.ledger_store.get_root_hash(version)
715        })
716    }
717
718    fn get_pos_state(&self, block_id: &HashValue) -> Result<PosState> {
719        diem_debug!("get_pos_state:{}", block_id);
720        self.ledger_store.get_pos_state(block_id)
721    }
722
723    fn get_latest_pos_state(&self) -> Arc<PosState> {
724        self.ledger_store.get_latest_pos_state()
725    }
726}
727
728impl DbWriter for PosLedgerDB {
729    /// `first_version` is the version of the first transaction in
730    /// `txns_to_commit`. When `ledger_info_with_sigs` is provided, verify
731    /// that the transaction accumulator root hash it carries is generated
732    /// after the `txns_to_commit` are applied. Note that even if
733    /// `txns_to_commit` is empty, `frist_version` is checked to be
734    /// `ledger_info_with_sigs.ledger_info.version + 1` if
735    /// `ledger_info_with_sigs` is not `None`.
736    fn save_transactions(
737        &self, txns_to_commit: &[TransactionToCommit], first_version: Version,
738        ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
739        pos_state: Option<PosState>, committed_blocks: Vec<CommittedBlock>,
740        ledger_infos_with_voted_block: Vec<(
741            HashValue,
742            LedgerInfoWithSignatures,
743        )>,
744    ) -> Result<()> {
745        gauged_api("save_transactions", || {
746            let num_txns = txns_to_commit.len() as u64;
747            // ledger_info_with_sigs could be None if we are doing state
748            // synchronization. In this case txns_to_commit should
749            // not be empty. Otherwise it is okay to commit empty blocks.
750            ensure!(
751                ledger_info_with_sigs.is_some() || num_txns > 0,
752                "txns_to_commit is empty while ledger_info_with_sigs is None.",
753            );
754
755            // Gather db mutations to `batch`.
756            let mut cs = ChangeSet::new();
757
758            if let Some(x) = ledger_info_with_sigs {
759                let claimed_last_version = x.ledger_info().version();
760                ensure!(
761                    claimed_last_version + 1 == first_version + num_txns,
762                    "Transaction batch not applicable: first_version {}, num_txns {}, last_version {}",
763                    first_version,
764                    num_txns,
765                    claimed_last_version,
766                );
767            }
768
769            let _new_root_hash = self.save_transactions_impl(
770                txns_to_commit,
771                first_version,
772                &mut cs,
773            )?;
774
775            for b in committed_blocks {
776                self.ledger_store.put_committed_block(&b, &mut cs)?;
777            }
778
779            for (voted_block, ledger_info) in ledger_infos_with_voted_block {
780                self.ledger_store.put_ledger_info_by_voted_block(
781                    &voted_block,
782                    &ledger_info,
783                    &mut cs,
784                )?;
785            }
786
787            // If expected ledger info is provided, verify result root hash and
788            // save the ledger info.
789            if let Some(x) = ledger_info_with_sigs {
790                // let expected_root_hash =
791                //     x.ledger_info().transaction_accumulator_hash();
792                // ensure!(
793                //     new_root_hash == expected_root_hash,
794                //     "Root hash calculated doesn't match expected. {:?} vs
795                // {:?}",     new_root_hash,
796                //     expected_root_hash,
797                // );
798
799                self.ledger_store.put_ledger_info(x, &mut cs)?;
800                if let Some(pos_state) = pos_state {
801                    // ledger_info and pos_state are always `Some` for now.
802                    self.ledger_store.put_pos_state(
803                        &x.ledger_info().consensus_block_id(),
804                        pos_state,
805                        &mut cs,
806                    )?;
807                }
808            }
809
810            // Persist.
811            let sealed_cs = self.seal_change_set(cs)?;
812            {
813                let _timer = DIEM_STORAGE_OTHER_TIMERS_SECONDS
814                    .with_label_values(&["save_transactions_commit"])
815                    .start_timer();
816                self.commit(sealed_cs)?;
817            }
818
819            // Once everything is successfully persisted, update the latest
820            // in-memory ledger info.
821            if let Some(x) = ledger_info_with_sigs {
822                self.ledger_store.set_latest_ledger_info(x.clone());
823
824                DIEM_STORAGE_LEDGER_VERSION
825                    .set(x.ledger_info().version() as i64);
826                DIEM_STORAGE_NEXT_BLOCK_EPOCH
827                    .set(x.ledger_info().next_block_epoch() as i64);
828            }
829
830            // Only increment counter if commit succeeds and there are at least
831            // one transaction written to the storage. That's also
832            // when we'd inform the pruner thread to work.
833            if num_txns > 0 {
834                let last_version = first_version + num_txns - 1;
835                DIEM_STORAGE_COMMITTED_TXNS.inc_by(num_txns);
836                DIEM_STORAGE_LATEST_TXN_VERSION.set(last_version as i64);
837            }
838
839            Ok(())
840        })
841    }
842
843    fn save_reward_event(
844        &self, epoch: u64, event: &RewardDistributionEventV2,
845    ) -> Result<()> {
846        self.ledger_store.put_reward_event(epoch, event)
847    }
848
849    fn delete_pos_state_by_block(&self, block_id: &HashValue) -> Result<()> {
850        self.ledger_store.delete_pos_state(block_id)
851    }
852}
853
854impl DBReaderForPoW for PosLedgerDB {
855    fn get_latest_ledger_info_option(
856        &self,
857    ) -> Option<LedgerInfoWithSignatures> {
858        self.ledger_store.get_latest_ledger_info_option()
859    }
860
861    fn get_block_ledger_info(
862        &self, consensus_block_id: &HashValue,
863    ) -> Result<LedgerInfoWithSignatures> {
864        self.ledger_store.get_block_ledger_info(consensus_block_id)
865    }
866
867    fn get_events_by_version(
868        &self, start_version: u64, end_version: u64,
869    ) -> Result<Vec<ContractEvent>> {
870        let iter = self.event_store.get_events_by_version_iter(
871            start_version,
872            (end_version - start_version) as usize,
873        )?;
874        let events_vec = iter.collect::<Result<Vec<Vec<ContractEvent>>>>()?;
875        Ok(events_vec.into_iter().flatten().collect())
876    }
877
878    fn get_epoch_ending_blocks(
879        &self, start_epoch: u64, end_epoch: u64,
880    ) -> Result<Vec<HashValue>> {
881        let mut ending_blocks = Vec::new();
882        for ledger_info in self
883            .ledger_store
884            .get_epoch_ending_ledger_info_iter(start_epoch, end_epoch)?
885        {
886            ending_blocks.push(ledger_info?.ledger_info().consensus_block_id());
887        }
888        Ok(ending_blocks)
889    }
890
891    fn get_reward_event(
892        &self, epoch: u64,
893    ) -> Result<RewardDistributionEventV2> {
894        self.ledger_store.get_reward_event(epoch)
895    }
896
897    fn get_committed_block_by_hash(
898        &self, block_hash: &HashValue,
899    ) -> Result<CommittedBlock> {
900        self.ledger_store.get_committed_block_by_hash(block_hash)
901    }
902
903    fn get_committed_block_hash_by_view(&self, view: u64) -> Result<HashValue> {
904        self.ledger_store.get_committed_block_hash_by_view(view)
905    }
906
907    fn get_ledger_info_by_voted_block(
908        &self, block_id: &HashValue,
909    ) -> Result<LedgerInfoWithSignatures> {
910        self.ledger_store.get_ledger_info_by_voted_block(block_id)
911    }
912
913    fn get_block_hash_by_epoch_and_round(
914        &self, epoch: u64, round: u64,
915    ) -> Result<HashValue> {
916        self.ledger_store
917            .get_block_hash_by_epoch_and_round(epoch, round)
918    }
919}
920
/// Converts a `(cursor, limit)` request in the given `order` into an
/// ascending `(first_seq_num, count)` range.
///
/// For ascending order the range starts at `cursor`. Otherwise the range
/// ends at `cursor` and is clamped so it never starts below 0.
///
/// # Errors
/// Fails if `limit` is 0.
#[cfg(any(test, feature = "fuzzing"))]
fn get_first_seq_num_and_limit(
    order: Order, cursor: u64, limit: u64,
) -> Result<(u64, u64)> {
    ensure!(limit > 0, "limit should > 0, got {}", limit);

    if order != Order::Ascending {
        let range = if limit > cursor {
            // Not enough history before `cursor`: take everything from 0.
            (0, cursor + 1)
        } else {
            (cursor - limit + 1, limit)
        };
        return Ok(range);
    }
    Ok((cursor, limit))
}
936
937fn gauged_api<T, F>(api_name: &'static str, api_impl: F) -> Result<T>
938where F: FnOnce() -> Result<T> {
939    let timer = Instant::now();
940
941    let res = api_impl();
942
943    let res_type = match &res {
944        Ok(_) => "Ok",
945        Err(e) => {
946            diem_warn!(
947                api_name = api_name,
948                error = ?e,
949                "DiemDB API returned error."
950            );
951            "Err"
952        }
953    };
954    DIEM_STORAGE_API_LATENCY_SECONDS
955        .with_label_values(&[api_name, res_type])
956        .observe(timer.elapsed().as_secs_f64());
957
958    res
959}