// schemadb/lib.rs

// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/

#![forbid(unsafe_code)]

//! This library implements a schematized DB on top of [RocksDB](https://rocksdb.org/). It makes
//! sure all data passed in and out are structured according to predefined
//! schemas and prevents access to raw keys and values. This library also
//! enforces a set of Diem specific DB options, like custom comparators and
//! schema-to-column-family mapping.
//!
//! It requires that different kinds of key-value pairs be stored in separate
//! column families.  To use this library to store a kind of key-value pairs,
//! the user needs to use the [`define_schema!`] macro to define the schema
//! name, the types of key and value, and name of the column family.
mod metrics;
#[macro_use]
pub mod schema;

use crate::{
    metrics::{
        DIEM_SCHEMADB_BATCH_COMMIT_BYTES,
        DIEM_SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS, DIEM_SCHEMADB_DELETES,
        DIEM_SCHEMADB_GET_BYTES, DIEM_SCHEMADB_GET_LATENCY_SECONDS,
        DIEM_SCHEMADB_ITER_BYTES, DIEM_SCHEMADB_ITER_LATENCY_SECONDS,
        DIEM_SCHEMADB_PUT_BYTES,
    },
    schema::{KeyCodec, Schema, SeekKeyCodec, ValueCodec},
};
use anyhow::{ensure, format_err, Result};
use diem_logger::prelude::*;
use rocksdb::Writable;
use std::{
    collections::{BTreeMap, HashMap, HashSet},
    iter::Iterator,
    marker::PhantomData,
    path::Path,
};

/// Type alias to `rocksdb::ReadOptions`. See [`rocksdb doc`](https://github.com/pingcap/rust-rocksdb/blob/master/src/rocksdb_options.rs)
pub type ReadOptions = rocksdb::ReadOptions;

/// Type alias to `rocksdb::Options`.
pub type Options = rocksdb::DBOptions;

/// Type alias to improve readability.
pub type ColumnFamilyName = &'static str;

/// Name for the `default` column family that's always open by RocksDB. We use
/// it to store [`LedgerInfo`](../types/ledger_info/struct.LedgerInfo.html).
pub const DEFAULT_CF_NAME: ColumnFamilyName = "default";

/// A single pending mutation recorded in a [`SchemaBatch`]: either an
/// insert/update carrying already-encoded value bytes, or a deletion.
#[derive(Debug)]
enum WriteOp {
    /// Put the encoded value bytes under the key.
    Value(Vec<u8>),
    /// Remove the key.
    Deletion,
}

/// `SchemaBatch` holds a collection of updates that can be applied to a DB
/// atomically. The updates will be applied in the order in which they are added
/// to the `SchemaBatch`.
#[derive(Debug, Default)]
pub struct SchemaBatch {
    // Encoded key -> pending op, grouped per column family. The BTreeMap keeps
    // at most one op per key, so a later put/delete on the same key replaces
    // an earlier one.
    rows: HashMap<ColumnFamilyName, BTreeMap<Vec<u8>, WriteOp>>,
}

72impl SchemaBatch {
73    /// Creates an empty batch.
74    pub fn new() -> Self { Self::default() }
75
76    /// Adds an insert/update operation to the batch.
77    pub fn put<S: Schema>(
78        &mut self, key: &S::Key, value: &S::Value,
79    ) -> Result<()> {
80        let key = <S::Key as KeyCodec<S>>::encode_key(key)?;
81        let value = <S::Value as ValueCodec<S>>::encode_value(value)?;
82        self.rows
83            .entry(S::COLUMN_FAMILY_NAME)
84            .or_insert_with(BTreeMap::new)
85            .insert(key, WriteOp::Value(value));
86
87        Ok(())
88    }
89
90    /// Adds a delete operation to the batch.
91    pub fn delete<S: Schema>(&mut self, key: &S::Key) -> Result<()> {
92        let key = <S::Key as KeyCodec<S>>::encode_key(key)?;
93        self.rows
94            .entry(S::COLUMN_FAMILY_NAME)
95            .or_insert_with(BTreeMap::new)
96            .insert(key, WriteOp::Deletion);
97
98        Ok(())
99    }
100}
101
/// Direction a [`SchemaIterator`] advances after each yielded item.
//
// Derives added for parity with the other public types in this file (which
// all derive `Debug`); `Clone`/`Copy`/`PartialEq`/`Eq` are free for a
// fieldless enum and backward-compatible.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ScanDirection {
    /// Advance towards larger keys.
    Forward,
    /// Advance towards smaller keys.
    Backward,
}

/// DB Iterator parameterized on [`Schema`] that seeks with [`Schema::Key`] and
/// yields [`Schema::Key`] and [`Schema::Value`]
pub struct SchemaIterator<'a, S> {
    // Raw RocksDB iterator borrowing the DB handle for lifetime 'a.
    db_iter: rocksdb::DBIterator<&'a rocksdb::DB>,
    // Whether each `next()` call moves the raw iterator forward or backward.
    direction: ScanDirection,
    // Ties the iterator to a schema type without storing a value of it.
    phantom: PhantomData<S>,
}

115impl<'a, S> SchemaIterator<'a, S>
116where S: Schema
117{
118    fn new(
119        db_iter: rocksdb::DBIterator<&'a rocksdb::DB>, direction: ScanDirection,
120    ) -> Self {
121        SchemaIterator {
122            db_iter,
123            direction,
124            phantom: PhantomData,
125        }
126    }
127
128    /// Seeks to the first key.
129    pub fn seek_to_first(&mut self) {
130        self.db_iter.seek(rocksdb::SeekKey::Start).unwrap();
131    }
132
133    /// Seeks to the last key.
134    pub fn seek_to_last(&mut self) {
135        self.db_iter.seek(rocksdb::SeekKey::End).unwrap();
136    }
137
138    /// Seeks to the first key whose binary representation is equal to or
139    /// greater than that of the `seek_key`.
140    pub fn seek<SK>(&mut self, seek_key: &SK) -> Result<()>
141    where SK: SeekKeyCodec<S> {
142        let key = <SK as SeekKeyCodec<S>>::encode_seek_key(seek_key)?;
143        self.db_iter.seek(rocksdb::SeekKey::Key(&key)).unwrap();
144        Ok(())
145    }
146
147    /// Seeks to the last key whose binary representation is less than or equal
148    /// to that of the `seek_key`.
149    ///
150    /// See example in [`RocksDB doc`](https://github.com/facebook/rocksdb/wiki/SeekForPrev).
151    pub fn seek_for_prev<SK>(&mut self, seek_key: &SK) -> Result<()>
152    where SK: SeekKeyCodec<S> {
153        let key = <SK as SeekKeyCodec<S>>::encode_seek_key(seek_key)?;
154        self.db_iter
155            .seek_for_prev(rocksdb::SeekKey::Key(&key))
156            .unwrap();
157        Ok(())
158    }
159
160    fn next_impl(&mut self) -> Result<Option<(S::Key, S::Value)>> {
161        let _timer = DIEM_SCHEMADB_ITER_LATENCY_SECONDS
162            .with_label_values(&[S::COLUMN_FAMILY_NAME])
163            .start_timer();
164
165        if !self.db_iter.valid().unwrap() {
166            return Ok(None);
167        }
168
169        let raw_key = self.db_iter.key();
170        let raw_value = self.db_iter.value();
171        DIEM_SCHEMADB_ITER_BYTES
172            .with_label_values(&[S::COLUMN_FAMILY_NAME])
173            .observe((raw_key.len() + raw_value.len()) as f64);
174
175        let key = <S::Key as KeyCodec<S>>::decode_key(raw_key)?;
176        let value = <S::Value as ValueCodec<S>>::decode_value(raw_value)?;
177
178        match self.direction {
179            ScanDirection::Forward => self.db_iter.next().unwrap(),
180            ScanDirection::Backward => self.db_iter.prev().unwrap(),
181        };
182
183        Ok(Some((key, value)))
184    }
185}
186
impl<'a, S> Iterator for SchemaIterator<'a, S>
where S: Schema
{
    type Item = Result<(S::Key, S::Value)>;

    /// Yields `Some(Ok(key, value))` per decoded entry, `Some(Err(_))` when a
    /// codec fails, and `None` once the underlying iterator is exhausted
    /// (`next_impl` returning `Ok(None)` is flipped to `None` by `transpose`).
    fn next(&mut self) -> Option<Self::Item> { self.next_impl().transpose() }
}

/// All the RocksDB methods return `std::result::Result<T, String>`. Since our
/// methods return `anyhow::Result<T>`, manual conversion is needed.
fn convert_rocksdb_err(msg: String) -> anyhow::Error {
    format_err!("RocksDB internal error: {}.", msg)
}

/// This DB is a schematized RocksDB wrapper where all data passed in and out
/// are typed according to [`Schema`]s.
#[derive(Debug)]
pub struct DB {
    name: &'static str, // for logging
    // The raw RocksDB handle; all access goes through the typed methods.
    inner: rocksdb::DB,
}

209impl DB {
210    /// Create db with all the column families provided if it doesn't exist at
211    /// `path`; Otherwise, try to open it with all the column families.
212    pub fn open(
213        path: impl AsRef<Path>, name: &'static str,
214        column_families: Vec<ColumnFamilyName>, db_opts: Options,
215    ) -> Result<Self> {
216        {
217            let cfs_set: HashSet<_> = column_families.iter().collect();
218            ensure!(
219                cfs_set.contains(&DEFAULT_CF_NAME),
220                "No \"default\" column family name is provided.",
221            );
222            ensure!(
223                cfs_set.len() == column_families.len(),
224                "Duplicate column family name found.",
225            );
226        }
227
228        let db = DB::open_cf(db_opts, path, name, column_families)?;
229        Ok(db)
230    }
231
232    /// Open db in readonly mode
233    /// Note that this still assumes there's only one process that opens the
234    /// same DB. See `open_as_secondary`
235    pub fn open_readonly(
236        path: impl AsRef<Path>, name: &'static str,
237        column_families: Vec<ColumnFamilyName>, db_opts: Options,
238    ) -> Result<Self> {
239        DB::open_cf_readonly(db_opts, path, name, column_families)
240    }
241
242    fn open_cf(
243        db_opts: Options, path: impl AsRef<Path>, name: &'static str,
244        column_families: Vec<ColumnFamilyName>,
245    ) -> Result<DB> {
246        let inner = rocksdb::DB::open_cf(
247            db_opts,
248            path.as_ref().to_str().ok_or_else(|| {
249                format_err!(
250                    "Path {:?} can not be converted to string.",
251                    path.as_ref()
252                )
253            })?,
254            column_families
255                .iter()
256                .map(|cf_name| {
257                    let cf_opts = rocksdb::ColumnFamilyOptions::default();
258                    rocksdb::rocksdb_options::ColumnFamilyDescriptor::new(
259                        *cf_name, cf_opts,
260                    )
261                })
262                .collect(),
263        )
264        .map_err(convert_rocksdb_err)?;
265        Ok(Self::log_construct(name, inner))
266    }
267
268    fn open_cf_readonly(
269        opts: Options, path: impl AsRef<Path>, name: &'static str,
270        column_families: Vec<ColumnFamilyName>,
271    ) -> Result<DB> {
272        let error_if_log_file_exists = false;
273        let inner = rocksdb::DB::open_cf_for_read_only(
274            opts,
275            path.as_ref().to_str().ok_or_else(|| {
276                format_err!(
277                    "Path {:?} can not be converted to string.",
278                    path.as_ref()
279                )
280            })?,
281            column_families
282                .iter()
283                .map(|cf_name| {
284                    let cf_opts = rocksdb::ColumnFamilyOptions::default();
285                    rocksdb::rocksdb_options::ColumnFamilyDescriptor::new(
286                        *cf_name, cf_opts,
287                    )
288                })
289                .collect(),
290            error_if_log_file_exists,
291        )
292        .map_err(convert_rocksdb_err)?;
293
294        Ok(Self::log_construct(name, inner))
295    }
296
297    fn log_construct(name: &'static str, inner: rocksdb::DB) -> DB {
298        diem_info!(rocksdb_name = name, "Opened RocksDB.");
299        DB { name, inner }
300    }
301
302    /// Reads single record by key.
303    pub fn get<S: Schema>(
304        &self, schema_key: &S::Key,
305    ) -> Result<Option<S::Value>> {
306        let _timer = DIEM_SCHEMADB_GET_LATENCY_SECONDS
307            .with_label_values(&[S::COLUMN_FAMILY_NAME])
308            .start_timer();
309
310        let k = <S::Key as KeyCodec<S>>::encode_key(&schema_key)?;
311        let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
312
313        let result = self
314            .inner
315            .get_cf(cf_handle, &k)
316            .map_err(convert_rocksdb_err)?;
317        DIEM_SCHEMADB_GET_BYTES
318            .with_label_values(&[S::COLUMN_FAMILY_NAME])
319            .observe(result.as_ref().map_or(0.0, |v| v.len() as f64));
320
321        result
322            .map(|raw_value| {
323                <S::Value as ValueCodec<S>>::decode_value(&raw_value)
324            })
325            .transpose()
326    }
327
328    /// Writes single record.
329    pub fn put<S: Schema>(&self, key: &S::Key, value: &S::Value) -> Result<()> {
330        // Not necessary to use a batch, but we'd like a central place to bump
331        // counters. Used in tests only anyway.
332        let mut batch = SchemaBatch::new();
333        batch.put::<S>(key, value)?;
334        self.write_schemas(batch, false)
335    }
336
337    /// Delete all keys in range [begin, end).
338    ///
339    /// `SK` has to be an explicit type parameter since
340    /// <https://github.com/rust-lang/rust/issues/44721>
341    pub fn range_delete<S, SK>(&self, begin: &SK, end: &SK) -> Result<()>
342    where
343        S: Schema,
344        SK: SeekKeyCodec<S>,
345    {
346        let raw_begin = begin.encode_seek_key()?;
347        let raw_end = end.encode_seek_key()?;
348        let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
349
350        self.inner
351            .delete_range_cf(&cf_handle, &raw_begin, &raw_end)
352            .map_err(convert_rocksdb_err)
353    }
354
355    fn iter_with_direction<S: Schema>(
356        &self, opts: ReadOptions, direction: ScanDirection,
357    ) -> Result<SchemaIterator<'_, S>> {
358        let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
359        Ok(SchemaIterator::new(
360            self.inner.iter_cf_opt(cf_handle, opts),
361            direction,
362        ))
363    }
364
365    /// Returns a forward [`SchemaIterator`] on a certain schema.
366    pub fn iter<S: Schema>(
367        &self, opts: ReadOptions,
368    ) -> Result<SchemaIterator<'_, S>> {
369        self.iter_with_direction::<S>(opts, ScanDirection::Forward)
370    }
371
372    /// Returns a backward [`SchemaIterator`] on a certain schema.
373    pub fn rev_iter<S: Schema>(
374        &self, opts: ReadOptions,
375    ) -> Result<SchemaIterator<'_, S>> {
376        self.iter_with_direction::<S>(opts, ScanDirection::Backward)
377    }
378
379    /// Writes a group of records wrapped in a [`SchemaBatch`].
380    pub fn write_schemas(
381        &self, batch: SchemaBatch, fast_write: bool,
382    ) -> Result<()> {
383        let _timer = DIEM_SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS
384            .with_label_values(&[self.name])
385            .start_timer();
386
387        let db_batch = rocksdb::WriteBatch::default();
388        for (cf_name, rows) in &batch.rows {
389            let cf_handle = self.get_cf_handle(cf_name)?;
390            for (key, write_op) in rows {
391                match write_op {
392                    WriteOp::Value(value) => {
393                        db_batch.put_cf(cf_handle, key, value).unwrap()
394                    }
395                    WriteOp::Deletion => {
396                        db_batch.delete_cf(cf_handle, key).unwrap()
397                    }
398                }
399            }
400        }
401        let serialized_size = db_batch.data_size();
402
403        let write_options = if fast_write {
404            fast_write_options()
405        } else {
406            default_write_options()
407        };
408        self.inner
409            .write_opt(&db_batch, &write_options)
410            .map_err(convert_rocksdb_err)?;
411
412        // Bump counters only after DB write succeeds.
413        for (cf_name, rows) in &batch.rows {
414            for (key, write_op) in rows {
415                match write_op {
416                    WriteOp::Value(value) => {
417                        DIEM_SCHEMADB_PUT_BYTES
418                            .with_label_values(&[cf_name])
419                            .observe((key.len() + value.len()) as f64);
420                    }
421                    WriteOp::Deletion => {
422                        DIEM_SCHEMADB_DELETES
423                            .with_label_values(&[cf_name])
424                            .inc();
425                    }
426                }
427            }
428        }
429        DIEM_SCHEMADB_BATCH_COMMIT_BYTES
430            .with_label_values(&[self.name])
431            .observe(serialized_size as f64);
432
433        Ok(())
434    }
435
436    fn get_cf_handle(&self, cf_name: &str) -> Result<&rocksdb::CFHandle> {
437        self.inner.cf_handle(cf_name).ok_or_else(|| {
438            format_err!(
439                "DB::cf_handle not found for column family name: {}",
440                cf_name
441            )
442        })
443    }
444
445    /// Flushes all memtable data. This is only used for testing
446    /// `get_approximate_sizes_cf` in unit tests.
447    pub fn flush_all(&self, sync: bool) -> Result<()> {
448        for cf_name in &self.inner.cf_names() {
449            let cf_handle = self.get_cf_handle(cf_name)?;
450            self.inner
451                .flush_cf(cf_handle, sync)
452                .map_err(convert_rocksdb_err)?;
453        }
454        Ok(())
455    }
456
457    pub fn get_property(
458        &self, cf_name: &str, property_name: &str,
459    ) -> Result<u64> {
460        self.inner
461            .get_property_int_cf(self.get_cf_handle(&cf_name)?, property_name)
462            .ok_or_else(|| {
463                format_err!(
464                    "Unable to get property \"{}\" of  column family \"{}\".",
465                    property_name,
466                    cf_name,
467                )
468            })
469    }
470}
471
472/// For now we always use synchronous writes. This makes sure that once the
473/// operation returns `Ok(())` the data is persisted even if the machine
474/// crashes. In the future we might consider selectively turning this off for
475/// some non-critical writes to improve performance.
476fn default_write_options() -> rocksdb::WriteOptions {
477    let mut opts = rocksdb::WriteOptions::default();
478    opts.set_sync(true);
479    opts
480}
481
482fn fast_write_options() -> rocksdb::WriteOptions {
483    let mut opts = rocksdb::WriteOptions::default();
484    opts.set_sync(false);
485    // opts.disable_wal(true);
486    opts
487}