pos_ledger_db/
lib.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8#![forbid(unsafe_code)]
9
10//! This crate provides [`PosLedgerDB`] which represents physical storage of the
11//! core Diem data structures.
12//!
13//! It relays read/write operations on the physical storage via [`schemadb`] to
14//! the underlying Key-Value storage system, and implements diem data structures
15//! on top of it.
16
17use std::{iter::Iterator, path::Path, sync::Arc, time::Instant};
18
19use anyhow::{ensure, Result};
20use itertools::{izip, zip_eq};
21
22use diem_config::config::RocksdbConfig;
23use diem_crypto::hash::{
24    CryptoHash, HashValue, SPARSE_MERKLE_PLACEHOLDER_HASH,
25};
26use diem_logger::prelude::*;
27use diem_types::{
28    committed_block::CommittedBlock,
29    contract_event::ContractEvent,
30    epoch_change::EpochChangeProof,
31    ledger_info::LedgerInfoWithSignatures,
32    reward_distribution_event::RewardDistributionEventV2,
33    term_state::PosState,
34    transaction::{Transaction, TransactionInfo, TransactionToCommit, Version},
35};
36#[cfg(feature = "fuzzing")]
37pub use diemdb_test::test_save_blocks_impl;
38use schemadb::{ColumnFamilyName, Options, DB, DEFAULT_CF_NAME};
39#[cfg(any(test, feature = "fuzzing"))]
40use storage_interface::Order;
41use storage_interface::{
42    DBReaderForPoW, DbReader, DbWriter, StartupInfo, TreeState,
43};
44
45use crate::{
46    change_set::{ChangeSet, SealedChangeSet},
47    event_store::EventStore,
48    ledger_store::LedgerStore,
49    schema::*,
50    transaction_store::TransactionStore,
51};
52use diem_types::block_metadata::BlockMetadata;
53
54// Used in this and other crates for testing.
55#[cfg(any(test, feature = "fuzzing"))]
56pub mod test_helper;
57
58pub mod errors;
59pub mod schema;
60
61mod change_set;
62mod event_store;
63mod ledger_store;
64mod transaction_store;
65
66#[cfg(any(test, feature = "fuzzing"))]
67#[allow(dead_code)]
68mod diemdb_test;
69
// TODO: Either implement an iteration API to allow a very old client to loop
// through a long history or guarantee that there is always a recent enough
// waypoint and client knows to boot from there.
/// Upper bound on the number of epoch-ending ledger infos gathered for a
/// single `DbReader::get_epoch_ending_ledger_infos` request; it is passed as
/// the `limit` of the underlying lookup.
const MAX_NUM_EPOCH_ENDING_LEDGER_INFO: usize = 100;
74
75fn gen_rocksdb_options(config: &RocksdbConfig) -> Options {
76    let mut db_opts = Options::default();
77    db_opts.set_max_open_files(config.max_open_files);
78    db_opts.set_max_total_wal_size(config.max_total_wal_size);
79    db_opts
80}
81
/// This holds a handle to the underlying DB responsible for physical storage
/// and provides APIs for access to the core Diem data structures.
///
/// The three stores are all constructed over the same shared [`DB`] handle
/// (see `new_with_db`).
#[derive(Debug)]
pub struct PosLedgerDB {
    /// Shared handle to the underlying database; `commit` writes batched
    /// changes through it.
    db: Arc<DB>,
    /// Accessor for ledger infos, transaction infos, PoS state, committed
    /// blocks and reward events.
    ledger_store: Arc<LedgerStore>,
    /// Accessor for transactions and their block metadata.
    transaction_store: Arc<TransactionStore>,
    /// Accessor for contract events.
    event_store: Arc<EventStore>,
}
91
92impl PosLedgerDB {
93    fn column_families() -> Vec<ColumnFamilyName> {
94        vec![
95            /* LedgerInfo CF = */ DEFAULT_CF_NAME,
96            EPOCH_BY_VERSION_CF_NAME,
97            EVENT_ACCUMULATOR_CF_NAME,
98            EVENT_BY_KEY_CF_NAME,
99            EVENT_BY_VERSION_CF_NAME,
100            EVENT_CF_NAME,
101            JELLYFISH_MERKLE_NODE_CF_NAME,
102            LEDGER_COUNTERS_CF_NAME,
103            STALE_NODE_INDEX_CF_NAME,
104            TRANSACTION_CF_NAME,
105            TRANSACTION_ACCUMULATOR_CF_NAME,
106            TRANSACTION_BY_ACCOUNT_CF_NAME,
107            TRANSACTION_INFO_CF_NAME,
108            LEDGER_INFO_BY_BLOCK_CF_NAME,
109            POS_STATE_CF_NAME,
110            REWARD_EVENT_CF_NAME,
111            COMMITTED_BLOCK_CF_NAME,
112            COMMITTED_BLOCK_BY_VIEW_CF_NAME,
113            LEDGER_INFO_BY_VOTED_BLOCK_CF_NAME,
114            BLOCK_BY_EPOCH_AND_ROUND_CF_NAME,
115        ]
116    }
117
118    fn new_with_db(db: DB) -> Self {
119        let db = Arc::new(db);
120
121        PosLedgerDB {
122            db: Arc::clone(&db),
123            event_store: Arc::new(EventStore::new(Arc::clone(&db))),
124            ledger_store: Arc::new(LedgerStore::new(Arc::clone(&db))),
125            transaction_store: Arc::new(TransactionStore::new(Arc::clone(&db))),
126        }
127    }
128
129    pub fn open<P: AsRef<Path> + Clone>(
130        db_root_path: P, readonly: bool, rocksdb_config: RocksdbConfig,
131    ) -> Result<Self> {
132        let path = db_root_path.as_ref().join("pos-ledger-db");
133        let instant = Instant::now();
134
135        let mut rocksdb_opts = gen_rocksdb_options(&rocksdb_config);
136
137        let db = if readonly {
138            DB::open_readonly(
139                path.clone(),
140                "diemdb_ro",
141                Self::column_families(),
142                rocksdb_opts,
143            )?
144        } else {
145            rocksdb_opts.create_if_missing(true);
146            rocksdb_opts.create_missing_column_families(true);
147            DB::open(
148                path.clone(),
149                "pos-ledger-db",
150                Self::column_families(),
151                rocksdb_opts,
152            )?
153        };
154
155        let ret = Self::new_with_db(db);
156        diem_info!(
157            path = path,
158            time_ms = %instant.elapsed().as_millis(),
159            "Opened PosLedgerDB.",
160        );
161        Ok(ret)
162    }
163
164    /// This opens db in non-readonly mode.
165    #[cfg(any(test, feature = "fuzzing"))]
166    pub fn new_for_test<P: AsRef<Path> + Clone>(db_root_path: P) -> Self {
167        Self::open(
168            db_root_path,
169            false, /* readonly */
170            RocksdbConfig::default(),
171        )
172        .expect("Unable to open DiemDB")
173    }
174
175    /// Returns ledger infos reflecting epoch bumps starting with the given
176    /// epoch. If there are no more than `MAX_NUM_EPOCH_ENDING_LEDGER_INFO`
177    /// results, this function returns all of them, otherwise the first
178    /// `MAX_NUM_EPOCH_ENDING_LEDGER_INFO` results are returned and a flag
179    /// (when true) will be used to indicate the fact that there is more.
180    fn get_epoch_ending_ledger_infos(
181        &self, start_epoch: u64, end_epoch: u64, limit: usize,
182    ) -> Result<(Vec<LedgerInfoWithSignatures>, bool)> {
183        self.get_epoch_ending_ledger_infos_impl(start_epoch, end_epoch, limit)
184    }
185
186    fn get_epoch_ending_ledger_infos_impl(
187        &self, start_epoch: u64, end_epoch: u64, limit: usize,
188    ) -> Result<(Vec<LedgerInfoWithSignatures>, bool)> {
189        ensure!(
190            start_epoch <= end_epoch,
191            "Bad epoch range [{}, {})",
192            start_epoch,
193            end_epoch,
194        );
195        // Note that the latest epoch can be the same with the current epoch (in
196        // most cases), or current_epoch + 1 (when the latest
197        // ledger_info carries next validator set)
198        let latest_epoch = self
199            .ledger_store
200            .get_latest_ledger_info()?
201            .ledger_info()
202            .next_block_epoch();
203        ensure!(
204            end_epoch <= latest_epoch,
205            "Unable to provide epoch change ledger info for still open epoch. asked upper bound: {}, last sealed epoch: {}",
206            end_epoch,
207            latest_epoch - 1,  // okay to -1 because genesis LedgerInfo has .next_block_epoch() == 1
208        );
209
210        let (paging_epoch, more) = if end_epoch - start_epoch > limit as u64 {
211            (start_epoch + limit as u64, true)
212        } else {
213            (end_epoch, false)
214        };
215
216        let lis = self
217            .ledger_store
218            .get_epoch_ending_ledger_info_iter(start_epoch, paging_epoch)?
219            .collect::<Result<Vec<_>>>()?;
220        ensure!(
221            lis.len() == (paging_epoch - start_epoch) as usize,
222            "DB corruption: missing epoch ending ledger info for epoch {}",
223            lis.last()
224                .map(|li| li.ledger_info().next_block_epoch())
225                .unwrap_or(start_epoch),
226        );
227        Ok((lis, more))
228    }
229
230    // TODO check if already have methods can do this
231    pub fn get_transaction(&self, version: Version) -> Result<Transaction> {
232        self.transaction_store.get_transaction(version)
233    }
234
235    pub fn get_transaction_block_meta(
236        &self, version: Version,
237    ) -> Result<Option<(Version, BlockMetadata)>> {
238        self.transaction_store.get_block_metadata(version)
239    }
240
241    pub fn get_transaction_info(
242        &self, version: u64,
243    ) -> Result<TransactionInfo> {
244        self.ledger_store.get_transaction_info(version)
245    }
246
247    /// Convert a `ChangeSet` to `SealedChangeSet`.
248    fn seal_change_set(&self, cs: ChangeSet) -> Result<SealedChangeSet> {
249        Ok(SealedChangeSet { batch: cs.batch })
250    }
251
252    fn save_transactions_impl(
253        &self, txns_to_commit: &[TransactionToCommit], first_version: u64,
254        mut cs: &mut ChangeSet,
255    ) -> Result<HashValue> {
256        let last_version = first_version + txns_to_commit.len() as u64 - 1;
257
258        // State root hashes are always default (state tree removed).
259        let state_root_hashes = vec![Default::default(); txns_to_commit.len()];
260
261        // Event updates. Gather event accumulator root hashes.
262        let event_root_hashes =
263            zip_eq(first_version..=last_version, txns_to_commit)
264                .map(|(ver, txn_to_commit)| {
265                    self.event_store.put_events(
266                        ver,
267                        txn_to_commit.events(),
268                        &mut cs,
269                    )
270                })
271                .collect::<Result<Vec<_>>>()?;
272
273        // Transaction updates. Gather transaction hashes.
274        zip_eq(first_version..=last_version, txns_to_commit).try_for_each(
275            |(ver, txn_to_commit)| {
276                self.transaction_store.put_transaction(
277                    ver,
278                    txn_to_commit.transaction(),
279                    &mut cs,
280                )
281            },
282        )?;
283
284        // Transaction accumulator updates. Get result root hash.
285        let txn_infos =
286            izip!(txns_to_commit, state_root_hashes, event_root_hashes)
287                .map(|(t, s, e)| {
288                    Ok(TransactionInfo::new(
289                        t.transaction().hash(),
290                        s,
291                        e,
292                        t.gas_used(),
293                        t.status().clone(),
294                    ))
295                })
296                .collect::<Result<Vec<_>>>()?;
297        assert_eq!(txn_infos.len(), txns_to_commit.len());
298
299        let new_root_hash = self.ledger_store.put_transaction_infos(
300            first_version,
301            &txn_infos,
302            &mut cs,
303        )?;
304
305        Ok(new_root_hash)
306    }
307
308    /// Write the whole schema batch including all data necessary to mutate the
309    /// ledger state of some transaction by leveraging rocksdb atomicity
310    /// support. Also committed are the LedgerCounters.
311    fn commit(&self, sealed_cs: SealedChangeSet) -> Result<()> {
312        self.db.write_schemas(sealed_cs.batch, false)?;
313
314        Ok(())
315    }
316}
317
318impl DbReader for PosLedgerDB {
319    fn get_epoch_ending_ledger_infos(
320        &self, start_epoch: u64, end_epoch: u64,
321    ) -> Result<EpochChangeProof> {
322        gauged_api("get_epoch_ending_ledger_infos", || {
323            let (ledger_info_with_sigs, more) =
324                Self::get_epoch_ending_ledger_infos(
325                    &self,
326                    start_epoch,
327                    end_epoch,
328                    MAX_NUM_EPOCH_ENDING_LEDGER_INFO,
329                )?;
330            Ok(EpochChangeProof::new(ledger_info_with_sigs, more))
331        })
332    }
333
334    fn get_block_timestamp(&self, version: u64) -> Result<u64> {
335        gauged_api("get_block_timestamp", || {
336            let ts = match self.transaction_store.get_block_metadata(version)? {
337                Some((_v, block_meta)) => block_meta.into_inner().1,
338                // genesis timestamp is 0
339                None => 0,
340            };
341            Ok(ts)
342        })
343    }
344
345    fn get_latest_ledger_info(&self) -> Result<LedgerInfoWithSignatures> {
346        gauged_api("get_latest_ledger_info", || {
347            self.ledger_store.get_latest_ledger_info()
348        })
349    }
350
351    fn get_startup_info(
352        &self, need_pos_state: bool,
353    ) -> Result<Option<StartupInfo>> {
354        gauged_api("get_startup_info", || {
355            self.ledger_store.get_startup_info(need_pos_state)
356        })
357    }
358
359    fn get_latest_tree_state(&self) -> Result<TreeState> {
360        gauged_api("get_latest_tree_state", || {
361            let tree_state = match self
362                .ledger_store
363                .get_latest_transaction_info_option()?
364            {
365                Some((version, _txn_info)) => {
366                    let frozen_subtrees = self
367                        .ledger_store
368                        .get_frozen_subtree_hashes(version + 1)?;
369                    TreeState::new(
370                        version + 1,
371                        frozen_subtrees,
372                        *SPARSE_MERKLE_PLACEHOLDER_HASH,
373                    )
374                }
375                None => {
376                    TreeState::new(0, vec![], *SPARSE_MERKLE_PLACEHOLDER_HASH)
377                }
378            };
379
380            diem_info!(
381                num_transactions = tree_state.num_transactions,
382                state_root_hash = %tree_state.account_state_root_hash,
383                description = tree_state.describe(),
384                "Got latest TreeState."
385            );
386
387            Ok(tree_state)
388        })
389    }
390
391    /// Gets ledger info at specified version and ensures it's an epoch ending.
392    fn get_epoch_ending_ledger_info(
393        &self, version: u64,
394    ) -> Result<LedgerInfoWithSignatures> {
395        gauged_api("get_epoch_ending_ledger_info", || {
396            self.ledger_store.get_epoch_ending_ledger_info(version)
397        })
398    }
399
400    fn get_latest_transaction_info_option(
401        &self,
402    ) -> Result<Option<(Version, TransactionInfo)>> {
403        gauged_api("get_latest_transaction_info_option", || {
404            self.ledger_store.get_latest_transaction_info_option()
405        })
406    }
407
408    fn get_pos_state(&self, block_id: &HashValue) -> Result<PosState> {
409        diem_debug!("get_pos_state:{}", block_id);
410        self.ledger_store.get_pos_state(block_id)
411    }
412
413    fn get_latest_pos_state(&self) -> Arc<PosState> {
414        self.ledger_store.get_latest_pos_state()
415    }
416}
417
418impl DbWriter for PosLedgerDB {
419    /// `first_version` is the version of the first transaction in
420    /// `txns_to_commit`. When `ledger_info_with_sigs` is provided, verify
421    /// that the transaction accumulator root hash it carries is generated
422    /// after the `txns_to_commit` are applied. Note that even if
423    /// `txns_to_commit` is empty, `frist_version` is checked to be
424    /// `ledger_info_with_sigs.ledger_info.version + 1` if
425    /// `ledger_info_with_sigs` is not `None`.
426    fn save_transactions(
427        &self, txns_to_commit: &[TransactionToCommit], first_version: Version,
428        ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
429        pos_state: Option<PosState>, committed_blocks: Vec<CommittedBlock>,
430        ledger_infos_with_voted_block: Vec<(
431            HashValue,
432            LedgerInfoWithSignatures,
433        )>,
434    ) -> Result<()> {
435        gauged_api("save_transactions", || {
436            let num_txns = txns_to_commit.len() as u64;
437            // ledger_info_with_sigs could be None if we are doing state
438            // synchronization. In this case txns_to_commit should
439            // not be empty. Otherwise it is okay to commit empty blocks.
440            ensure!(
441                ledger_info_with_sigs.is_some() || num_txns > 0,
442                "txns_to_commit is empty while ledger_info_with_sigs is None.",
443            );
444
445            // Gather db mutations to `batch`.
446            let mut cs = ChangeSet::new();
447
448            if let Some(x) = ledger_info_with_sigs {
449                let claimed_last_version = x.ledger_info().version();
450                ensure!(
451                    claimed_last_version + 1 == first_version + num_txns,
452                    "Transaction batch not applicable: first_version {}, num_txns {}, last_version {}",
453                    first_version,
454                    num_txns,
455                    claimed_last_version,
456                );
457            }
458
459            let _new_root_hash = self.save_transactions_impl(
460                txns_to_commit,
461                first_version,
462                &mut cs,
463            )?;
464
465            for b in committed_blocks {
466                self.ledger_store.put_committed_block(&b, &mut cs)?;
467            }
468
469            for (voted_block, ledger_info) in ledger_infos_with_voted_block {
470                self.ledger_store.put_ledger_info_by_voted_block(
471                    &voted_block,
472                    &ledger_info,
473                    &mut cs,
474                )?;
475            }
476
477            // If expected ledger info is provided, verify result root hash and
478            // save the ledger info.
479            if let Some(x) = ledger_info_with_sigs {
480                // let expected_root_hash =
481                //     x.ledger_info().transaction_accumulator_hash();
482                // ensure!(
483                //     new_root_hash == expected_root_hash,
484                //     "Root hash calculated doesn't match expected. {:?} vs
485                // {:?}",     new_root_hash,
486                //     expected_root_hash,
487                // );
488
489                self.ledger_store.put_ledger_info(x, &mut cs)?;
490                if let Some(pos_state) = pos_state {
491                    // ledger_info and pos_state are always `Some` for now.
492                    self.ledger_store.put_pos_state(
493                        &x.ledger_info().consensus_block_id(),
494                        pos_state,
495                        &mut cs,
496                    )?;
497                }
498            }
499
500            // Persist.
501            let sealed_cs = self.seal_change_set(cs)?;
502            self.commit(sealed_cs)?;
503
504            // Once everything is successfully persisted, update the latest
505            // in-memory ledger info.
506            if let Some(x) = ledger_info_with_sigs {
507                self.ledger_store.set_latest_ledger_info(x.clone());
508            }
509
510            Ok(())
511        })
512    }
513
514    fn save_reward_event(
515        &self, epoch: u64, event: &RewardDistributionEventV2,
516    ) -> Result<()> {
517        self.ledger_store.put_reward_event(epoch, event)
518    }
519
520    fn delete_pos_state_by_block(&self, block_id: &HashValue) -> Result<()> {
521        self.ledger_store.delete_pos_state(block_id)
522    }
523}
524
525impl DBReaderForPoW for PosLedgerDB {
526    fn get_latest_ledger_info_option(
527        &self,
528    ) -> Option<LedgerInfoWithSignatures> {
529        self.ledger_store.get_latest_ledger_info_option()
530    }
531
532    fn get_block_ledger_info(
533        &self, consensus_block_id: &HashValue,
534    ) -> Result<LedgerInfoWithSignatures> {
535        self.ledger_store.get_block_ledger_info(consensus_block_id)
536    }
537
538    fn get_events_by_version(
539        &self, start_version: u64, end_version: u64,
540    ) -> Result<Vec<ContractEvent>> {
541        let iter = self.event_store.get_events_by_version_iter(
542            start_version,
543            (end_version - start_version) as usize,
544        )?;
545        let events_vec = iter.collect::<Result<Vec<Vec<ContractEvent>>>>()?;
546        Ok(events_vec.into_iter().flatten().collect())
547    }
548
549    fn get_epoch_ending_blocks(
550        &self, start_epoch: u64, end_epoch: u64,
551    ) -> Result<Vec<HashValue>> {
552        let mut ending_blocks = Vec::new();
553        for ledger_info in self
554            .ledger_store
555            .get_epoch_ending_ledger_info_iter(start_epoch, end_epoch)?
556        {
557            ending_blocks.push(ledger_info?.ledger_info().consensus_block_id());
558        }
559        Ok(ending_blocks)
560    }
561
562    fn get_reward_event(
563        &self, epoch: u64,
564    ) -> Result<RewardDistributionEventV2> {
565        self.ledger_store.get_reward_event(epoch)
566    }
567
568    fn get_committed_block_by_hash(
569        &self, block_hash: &HashValue,
570    ) -> Result<CommittedBlock> {
571        self.ledger_store.get_committed_block_by_hash(block_hash)
572    }
573
574    fn get_committed_block_hash_by_view(&self, view: u64) -> Result<HashValue> {
575        self.ledger_store.get_committed_block_hash_by_view(view)
576    }
577
578    fn get_ledger_info_by_voted_block(
579        &self, block_id: &HashValue,
580    ) -> Result<LedgerInfoWithSignatures> {
581        self.ledger_store.get_ledger_info_by_voted_block(block_id)
582    }
583
584    fn get_block_hash_by_epoch_and_round(
585        &self, epoch: u64, round: u64,
586    ) -> Result<HashValue> {
587        self.ledger_store
588            .get_block_hash_by_epoch_and_round(epoch, round)
589    }
590}
591
/// Converts a requested `(order, cursor, limit)` window into an equivalent
/// ascending window, returned as `(first_seq_num, limit)`.
///
/// For ascending order the input is returned unchanged. Otherwise the
/// window ends at `cursor` and reaches back `limit` entries, clamped so
/// that it never starts below sequence number `0`.
///
/// # Errors
/// Fails when `limit` is `0`.
#[cfg(any(test, feature = "fuzzing"))]
fn get_first_seq_num_and_limit(
    order: Order, cursor: u64, limit: u64,
) -> Result<(u64, u64)> {
    ensure!(limit > 0, "limit should > 0, got {}", limit);

    if order == Order::Ascending {
        return Ok((cursor, limit));
    }

    let window = if cursor >= limit {
        // Enough entries below the cursor: `limit` entries ending at it.
        (cursor - limit + 1, limit)
    } else {
        // Fewer than `limit` entries exist: take everything from 0 up to
        // and including the cursor.
        (0, cursor + 1)
    };
    Ok(window)
}
607
608fn gauged_api<T, F>(api_name: &'static str, api_impl: F) -> Result<T>
609where F: FnOnce() -> Result<T> {
610    let res = api_impl();
611
612    if let Err(e) = &res {
613        diem_warn!(
614            api_name = api_name,
615            error = ?e,
616            "DiemDB API returned error."
617        );
618    }
619
620    res
621}