schemadb/
lib.rs

// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/

#![forbid(unsafe_code)]

//! This library implements a schematized DB on top of [RocksDB](https://rocksdb.org/). It makes
//! sure all data passed in and out are structured according to predefined
//! schemas and prevents access to raw keys and values. This library also
//! enforces a set of Diem-specific DB options, like custom comparators and
//! schema-to-column-family mapping.
//!
//! It requires that different kinds of key-value pairs be stored in separate
//! column families. To use this library to store a kind of key-value pairs,
//! the user needs to use the [`define_schema!`] macro to define the schema
//! name, the types of key and value, and the name of the column family.
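//!
//! For illustration, a schema definition might look roughly like the sketch
//! below. `ExampleSchema`, `ExampleKey` and `ExampleValue` are placeholder
//! names, and the [`KeyCodec`]/[`ValueCodec`] implementations required for
//! the key and value types are elided:
//!
//! ```ignore
//! define_schema!(
//!     ExampleSchema,     // schema type this macro defines
//!     ExampleKey,        // key type; must implement KeyCodec<ExampleSchema>
//!     ExampleValue,      // value type; must implement ValueCodec<ExampleSchema>
//!     "example_cf_name"  // name of the column family backing this schema
//! );
//! ```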

#[macro_use]
pub mod schema;

use crate::schema::{KeyCodec, Schema, SeekKeyCodec, ValueCodec};
use anyhow::{ensure, format_err, Result};
use diem_logger::prelude::*;
use rocksdb::Writable;
use std::{
    collections::{BTreeMap, HashMap, HashSet},
    iter::Iterator,
    marker::PhantomData,
    path::Path,
};

/// Type alias to `rocksdb::ReadOptions`. See [`rocksdb doc`](https://github.com/pingcap/rust-rocksdb/blob/master/src/rocksdb_options.rs)
pub type ReadOptions = rocksdb::ReadOptions;

/// Type alias to `rocksdb::DBOptions`.
pub type Options = rocksdb::DBOptions;

/// Type alias to improve readability.
pub type ColumnFamilyName = &'static str;

/// Name for the `default` column family that's always opened by RocksDB. We
/// use it to store [`LedgerInfo`](../types/ledger_info/struct.LedgerInfo.html).
pub const DEFAULT_CF_NAME: ColumnFamilyName = "default";

#[derive(Debug)]
enum WriteOp {
    Value(Vec<u8>),
    Deletion,
}

/// `SchemaBatch` holds a collection of updates that can be applied to a DB
/// atomically. Updates are grouped per column family and keyed by the encoded
/// key, so a later update to a key replaces any earlier update to the same
/// key in the batch.
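///
/// A minimal usage sketch, assuming a placeholder schema `ExampleSchema`
/// defined via [`define_schema!`]:
///
/// ```ignore
/// let mut batch = SchemaBatch::new();
/// // Stage one insert and one deletion; the DB is not touched yet.
/// batch.put::<ExampleSchema>(&key_a, &value_a)?;
/// batch.delete::<ExampleSchema>(&key_b)?;
/// // Commit both operations atomically.
/// db.write_schemas(batch, /* fast_write = */ false)?;
/// ```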
#[derive(Debug, Default)]
pub struct SchemaBatch {
    rows: HashMap<ColumnFamilyName, BTreeMap<Vec<u8>, WriteOp>>,
}

impl SchemaBatch {
    /// Creates an empty batch.
    pub fn new() -> Self { Self::default() }

    /// Adds an insert/update operation to the batch.
    pub fn put<S: Schema>(
        &mut self, key: &S::Key, value: &S::Value,
    ) -> Result<()> {
        let key = <S::Key as KeyCodec<S>>::encode_key(key)?;
        let value = <S::Value as ValueCodec<S>>::encode_value(value)?;
        self.rows
            .entry(S::COLUMN_FAMILY_NAME)
            .or_insert_with(BTreeMap::new)
            .insert(key, WriteOp::Value(value));

        Ok(())
    }

    /// Adds a delete operation to the batch.
    pub fn delete<S: Schema>(&mut self, key: &S::Key) -> Result<()> {
        let key = <S::Key as KeyCodec<S>>::encode_key(key)?;
        self.rows
            .entry(S::COLUMN_FAMILY_NAME)
            .or_insert_with(BTreeMap::new)
            .insert(key, WriteOp::Deletion);

        Ok(())
    }
}

/// The direction in which a [`SchemaIterator`] scans.
pub enum ScanDirection {
    Forward,
    Backward,
}

/// DB Iterator parameterized on [`Schema`] that seeks with [`Schema::Key`] and
/// yields [`Schema::Key`] and [`Schema::Value`] pairs.
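///
/// A sketch of typical use, assuming a placeholder schema `ExampleSchema`:
///
/// ```ignore
/// let mut iter = db.iter::<ExampleSchema>(ReadOptions::default())?;
/// // A fresh iterator is not positioned on anything; seek before reading.
/// iter.seek_to_first();
/// // Each item is a `Result` over a decoded (key, value) pair.
/// for pair in iter {
///     let (key, value) = pair?;
/// }
/// ```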
pub struct SchemaIterator<'a, S> {
    db_iter: rocksdb::DBIterator<&'a rocksdb::DB>,
    direction: ScanDirection,
    phantom: PhantomData<S>,
}

impl<'a, S> SchemaIterator<'a, S>
where S: Schema
{
    fn new(
        db_iter: rocksdb::DBIterator<&'a rocksdb::DB>, direction: ScanDirection,
    ) -> Self {
        SchemaIterator {
            db_iter,
            direction,
            phantom: PhantomData,
        }
    }

    /// Seeks to the first key.
    pub fn seek_to_first(&mut self) {
        self.db_iter.seek(rocksdb::SeekKey::Start).unwrap();
    }

    /// Seeks to the last key.
    pub fn seek_to_last(&mut self) {
        self.db_iter.seek(rocksdb::SeekKey::End).unwrap();
    }

    /// Seeks to the first key whose binary representation is equal to or
    /// greater than that of the `seek_key`.
    pub fn seek<SK>(&mut self, seek_key: &SK) -> Result<()>
    where SK: SeekKeyCodec<S> {
        let key = <SK as SeekKeyCodec<S>>::encode_seek_key(seek_key)?;
        self.db_iter.seek(rocksdb::SeekKey::Key(&key)).unwrap();
        Ok(())
    }

    /// Seeks to the last key whose binary representation is less than or equal
    /// to that of the `seek_key`.
    ///
    /// See example in [`RocksDB doc`](https://github.com/facebook/rocksdb/wiki/SeekForPrev).
    pub fn seek_for_prev<SK>(&mut self, seek_key: &SK) -> Result<()>
    where SK: SeekKeyCodec<S> {
        let key = <SK as SeekKeyCodec<S>>::encode_seek_key(seek_key)?;
        self.db_iter
            .seek_for_prev(rocksdb::SeekKey::Key(&key))
            .unwrap();
        Ok(())
    }

    fn next_impl(&mut self) -> Result<Option<(S::Key, S::Value)>> {
        if !self.db_iter.valid().unwrap() {
            return Ok(None);
        }

        let raw_key = self.db_iter.key();
        let raw_value = self.db_iter.value();

        let key = <S::Key as KeyCodec<S>>::decode_key(raw_key)?;
        let value = <S::Value as ValueCodec<S>>::decode_value(raw_value)?;

        match self.direction {
            ScanDirection::Forward => self.db_iter.next().unwrap(),
            ScanDirection::Backward => self.db_iter.prev().unwrap(),
        };

        Ok(Some((key, value)))
    }
}

impl<'a, S> Iterator for SchemaIterator<'a, S>
where S: Schema
{
    type Item = Result<(S::Key, S::Value)>;

    fn next(&mut self) -> Option<Self::Item> { self.next_impl().transpose() }
}

/// All the RocksDB methods return `std::result::Result<T, String>`. Since our
/// methods return `anyhow::Result<T>`, manual conversion is needed.
fn convert_rocksdb_err(msg: String) -> anyhow::Error {
    format_err!("RocksDB internal error: {}.", msg)
}

/// This DB is a schematized RocksDB wrapper where all data passed in and out
/// are typed according to [`Schema`]s.
#[derive(Debug)]
pub struct DB {
    inner: rocksdb::DB,
}

impl DB {
    /// Creates the DB with all the provided column families if it doesn't
    /// exist at `path`; otherwise, tries to open it with all the column
    /// families.
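    ///
    /// A sketch of a call site; the path, name, and extra column family are
    /// placeholders. Note that the "default" column family must always be
    /// included in the list:
    ///
    /// ```ignore
    /// let db = DB::open(
    ///     "/tmp/example_db",
    ///     "example_db",
    ///     vec![DEFAULT_CF_NAME, "example_cf_name"],
    ///     Options::default(),
    /// )?;
    /// ```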
    pub fn open(
        path: impl AsRef<Path>, name: &'static str,
        column_families: Vec<ColumnFamilyName>, db_opts: Options,
    ) -> Result<Self> {
        {
            let cfs_set: HashSet<_> = column_families.iter().collect();
            ensure!(
                cfs_set.contains(&DEFAULT_CF_NAME),
                "No \"default\" column family name is provided.",
            );
            ensure!(
                cfs_set.len() == column_families.len(),
                "Duplicate column family name found.",
            );
        }

        let db = DB::open_cf(db_opts, path, name, column_families)?;
        Ok(db)
    }

    /// Opens the DB in read-only mode.
    /// Note that this still assumes there's only one process that opens the
    /// same DB. See `open_as_secondary`.
    pub fn open_readonly(
        path: impl AsRef<Path>, name: &'static str,
        column_families: Vec<ColumnFamilyName>, db_opts: Options,
    ) -> Result<Self> {
        DB::open_cf_readonly(db_opts, path, name, column_families)
    }

    fn open_cf(
        db_opts: Options, path: impl AsRef<Path>, name: &'static str,
        column_families: Vec<ColumnFamilyName>,
    ) -> Result<DB> {
        let inner = rocksdb::DB::open_cf(
            db_opts,
            path.as_ref().to_str().ok_or_else(|| {
                format_err!(
                    "Path {:?} cannot be converted to string.",
                    path.as_ref()
                )
            })?,
            column_families
                .iter()
                .map(|cf_name| {
                    let cf_opts = rocksdb::ColumnFamilyOptions::default();
                    rocksdb::rocksdb_options::ColumnFamilyDescriptor::new(
                        *cf_name, cf_opts,
                    )
                })
                .collect(),
        )
        .map_err(convert_rocksdb_err)?;
        Ok(Self::log_construct(name, inner))
    }

    fn open_cf_readonly(
        opts: Options, path: impl AsRef<Path>, name: &'static str,
        column_families: Vec<ColumnFamilyName>,
    ) -> Result<DB> {
        let error_if_log_file_exists = false;
        let inner = rocksdb::DB::open_cf_for_read_only(
            opts,
            path.as_ref().to_str().ok_or_else(|| {
                format_err!(
                    "Path {:?} cannot be converted to string.",
                    path.as_ref()
                )
            })?,
            column_families
                .iter()
                .map(|cf_name| {
                    let cf_opts = rocksdb::ColumnFamilyOptions::default();
                    rocksdb::rocksdb_options::ColumnFamilyDescriptor::new(
                        *cf_name, cf_opts,
                    )
                })
                .collect(),
            error_if_log_file_exists,
        )
        .map_err(convert_rocksdb_err)?;

        Ok(Self::log_construct(name, inner))
    }

    fn log_construct(name: &'static str, inner: rocksdb::DB) -> DB {
        diem_info!(rocksdb_name = name, "Opened RocksDB.");
        DB { inner }
    }

    /// Reads a single record by key.
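    ///
    /// Sketch, assuming a placeholder schema `ExampleSchema` whose value type
    /// is `ExampleValue`:
    ///
    /// ```ignore
    /// // `None` if the key is absent from the column family.
    /// let value: Option<ExampleValue> = db.get::<ExampleSchema>(&key)?;
    /// ```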
    pub fn get<S: Schema>(
        &self, schema_key: &S::Key,
    ) -> Result<Option<S::Value>> {
        let k = <S::Key as KeyCodec<S>>::encode_key(&schema_key)?;
        let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;

        let result = self
            .inner
            .get_cf(cf_handle, &k)
            .map_err(convert_rocksdb_err)?;

        result
            .map(|raw_value| {
                <S::Value as ValueCodec<S>>::decode_value(&raw_value)
            })
            .transpose()
    }

    /// Writes a single record.
    pub fn put<S: Schema>(&self, key: &S::Key, value: &S::Value) -> Result<()> {
        // Not necessary to use a batch, but we'd like a central place to bump
        // counters. Used in tests only anyway.
        let mut batch = SchemaBatch::new();
        batch.put::<S>(key, value)?;
        self.write_schemas(batch, false)
    }

    /// Deletes all keys in the range `[begin, end)`.
    ///
    /// `SK` has to be an explicit type parameter since
    /// <https://github.com/rust-lang/rust/issues/44721>.
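    ///
    /// A sketch of a call site with placeholder schema and key types; as
    /// noted above, both type parameters are spelled out explicitly:
    ///
    /// ```ignore
    /// // Removes every key in [begin_key, end_key) from ExampleSchema's
    /// // column family; end_key itself is not deleted.
    /// db.range_delete::<ExampleSchema, ExampleKey>(&begin_key, &end_key)?;
    /// ```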
    pub fn range_delete<S, SK>(&self, begin: &SK, end: &SK) -> Result<()>
    where
        S: Schema,
        SK: SeekKeyCodec<S>,
    {
        let raw_begin = begin.encode_seek_key()?;
        let raw_end = end.encode_seek_key()?;
        let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;

        self.inner
            .delete_range_cf(&cf_handle, &raw_begin, &raw_end)
            .map_err(convert_rocksdb_err)
    }

    fn iter_with_direction<S: Schema>(
        &self, opts: ReadOptions, direction: ScanDirection,
    ) -> Result<SchemaIterator<'_, S>> {
        let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
        Ok(SchemaIterator::new(
            self.inner.iter_cf_opt(cf_handle, opts),
            direction,
        ))
    }

    /// Returns a forward [`SchemaIterator`] on a certain schema.
    pub fn iter<S: Schema>(
        &self, opts: ReadOptions,
    ) -> Result<SchemaIterator<'_, S>> {
        self.iter_with_direction::<S>(opts, ScanDirection::Forward)
    }

    /// Returns a backward [`SchemaIterator`] on a certain schema.
    pub fn rev_iter<S: Schema>(
        &self, opts: ReadOptions,
    ) -> Result<SchemaIterator<'_, S>> {
        self.iter_with_direction::<S>(opts, ScanDirection::Backward)
    }

    /// Writes a group of records wrapped in a [`SchemaBatch`].
    pub fn write_schemas(
        &self, batch: SchemaBatch, fast_write: bool,
    ) -> Result<()> {
        let db_batch = rocksdb::WriteBatch::default();
        for (cf_name, rows) in &batch.rows {
            let cf_handle = self.get_cf_handle(cf_name)?;
            for (key, write_op) in rows {
                match write_op {
                    WriteOp::Value(value) => {
                        db_batch.put_cf(cf_handle, key, value).unwrap()
                    }
                    WriteOp::Deletion => {
                        db_batch.delete_cf(cf_handle, key).unwrap()
                    }
                }
            }
        }

        let write_options = if fast_write {
            fast_write_options()
        } else {
            default_write_options()
        };
        self.inner
            .write_opt(&db_batch, &write_options)
            .map_err(convert_rocksdb_err)?;

        Ok(())
    }

    fn get_cf_handle(&self, cf_name: &str) -> Result<&rocksdb::CFHandle> {
        self.inner.cf_handle(cf_name).ok_or_else(|| {
            format_err!(
                "DB::cf_handle not found for column family name: {}",
                cf_name
            )
        })
    }

    /// Flushes all memtable data. This is only used to test
    /// `get_approximate_sizes_cf` in unit tests.
    pub fn flush_all(&self, sync: bool) -> Result<()> {
        for cf_name in &self.inner.cf_names() {
            let cf_handle = self.get_cf_handle(cf_name)?;
            self.inner
                .flush_cf(cf_handle, sync)
                .map_err(convert_rocksdb_err)?;
        }
        Ok(())
    }

    /// Returns the value of an integer RocksDB property for the given column
    /// family.
    pub fn get_property(
        &self, cf_name: &str, property_name: &str,
    ) -> Result<u64> {
        self.inner
            .get_property_int_cf(self.get_cf_handle(&cf_name)?, property_name)
            .ok_or_else(|| {
                format_err!(
                    "Unable to get property \"{}\" of column family \"{}\".",
                    property_name,
                    cf_name,
                )
            })
    }
}

/// For now we always use synchronous writes. This makes sure that once the
/// operation returns `Ok(())` the data is persisted even if the machine
/// crashes. In the future we might consider selectively turning this off for
/// some non-critical writes to improve performance.
fn default_write_options() -> rocksdb::WriteOptions {
    let mut opts = rocksdb::WriteOptions::default();
    opts.set_sync(true);
    opts
}

/// Non-synchronous writes: faster, but the most recent writes may be lost if
/// the machine crashes.
fn fast_write_options() -> rocksdb::WriteOptions {
    let mut opts = rocksdb::WriteOptions::default();
    opts.set_sync(false);
    // opts.disable_wal(true);
    opts
}