#![forbid(unsafe_code)]
mod metrics;
#[macro_use]
pub mod schema;
use crate::{
metrics::{
DIEM_SCHEMADB_BATCH_COMMIT_BYTES,
DIEM_SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS, DIEM_SCHEMADB_DELETES,
DIEM_SCHEMADB_GET_BYTES, DIEM_SCHEMADB_GET_LATENCY_SECONDS,
DIEM_SCHEMADB_ITER_BYTES, DIEM_SCHEMADB_ITER_LATENCY_SECONDS,
DIEM_SCHEMADB_PUT_BYTES,
},
schema::{KeyCodec, Schema, SeekKeyCodec, ValueCodec},
};
use anyhow::{ensure, format_err, Result};
use diem_logger::prelude::*;
use rocksdb::Writable;
use std::{
collections::{BTreeMap, HashMap, HashSet},
iter::Iterator,
marker::PhantomData,
path::Path,
};
/// Read options accepted by [`DB::iter`] and [`DB::rev_iter`].
pub type ReadOptions = rocksdb::ReadOptions;
/// Database-level options accepted by [`DB::open`] and [`DB::open_readonly`].
pub type Options = rocksdb::DBOptions;
/// Name of a RocksDB column family; always a `'static` string.
pub type ColumnFamilyName = &'static str;
/// Name of RocksDB's default column family. [`DB::open`] rejects a
/// column-family list that does not contain it.
pub const DEFAULT_CF_NAME: ColumnFamilyName = "default";
/// A single pending mutation for one encoded key in a [`SchemaBatch`].
#[derive(Debug)]
enum WriteOp {
    /// Store this already-encoded value under the key.
    Value(Vec<u8>),
    /// Delete the key.
    Deletion,
}
/// An in-memory collection of puts and deletes, grouped by column family,
/// that `DB::write_schemas` commits with a single RocksDB write.
#[derive(Debug, Default)]
pub struct SchemaBatch {
    /// Column family name -> (encoded key -> pending operation). Because the
    /// inner map is a `BTreeMap`, keys are kept in sorted byte order and only
    /// the most recently recorded operation per key is retained.
    rows: HashMap<ColumnFamilyName, BTreeMap<Vec<u8>, WriteOp>>,
}
impl SchemaBatch {
pub fn new() -> Self { Self::default() }
pub fn put<S: Schema>(
&mut self, key: &S::Key, value: &S::Value,
) -> Result<()> {
let key = <S::Key as KeyCodec<S>>::encode_key(key)?;
let value = <S::Value as ValueCodec<S>>::encode_value(value)?;
self.rows
.entry(S::COLUMN_FAMILY_NAME)
.or_insert_with(BTreeMap::new)
.insert(key, WriteOp::Value(value));
Ok(())
}
pub fn delete<S: Schema>(&mut self, key: &S::Key) -> Result<()> {
let key = <S::Key as KeyCodec<S>>::encode_key(key)?;
self.rows
.entry(S::COLUMN_FAMILY_NAME)
.or_insert_with(BTreeMap::new)
.insert(key, WriteOp::Deletion);
Ok(())
}
}
/// Direction in which a [`SchemaIterator`] moves the underlying RocksDB
/// iterator after yielding each item.
///
/// Derives the standard value traits so callers can copy, compare and debug-
/// print it; it is a plain field-less enum, so all derives are trivially
/// correct.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ScanDirection {
    /// Step with the RocksDB iterator's `next()`.
    Forward,
    /// Step with the RocksDB iterator's `prev()`.
    Backward,
}
/// Typed iterator over one schema's column family: each raw RocksDB key/value
/// pair is decoded into `S::Key` / `S::Value` as it is yielded.
pub struct SchemaIterator<'a, S> {
    /// Underlying RocksDB iterator, borrowing the `rocksdb::DB` it came from.
    db_iter: rocksdb::DBIterator<&'a rocksdb::DB>,
    /// Selects whether each step calls `next()` or `prev()` on `db_iter`.
    direction: ScanDirection,
    /// Ties the iterator to schema `S` without storing a value of that type.
    phantom: PhantomData<S>,
}
impl<'a, S> SchemaIterator<'a, S>
where S: Schema
{
fn new(
db_iter: rocksdb::DBIterator<&'a rocksdb::DB>, direction: ScanDirection,
) -> Self {
SchemaIterator {
db_iter,
direction,
phantom: PhantomData,
}
}
pub fn seek_to_first(&mut self) {
self.db_iter.seek(rocksdb::SeekKey::Start).unwrap();
}
pub fn seek_to_last(&mut self) {
self.db_iter.seek(rocksdb::SeekKey::End).unwrap();
}
pub fn seek<SK>(&mut self, seek_key: &SK) -> Result<()>
where SK: SeekKeyCodec<S> {
let key = <SK as SeekKeyCodec<S>>::encode_seek_key(seek_key)?;
self.db_iter.seek(rocksdb::SeekKey::Key(&key)).unwrap();
Ok(())
}
pub fn seek_for_prev<SK>(&mut self, seek_key: &SK) -> Result<()>
where SK: SeekKeyCodec<S> {
let key = <SK as SeekKeyCodec<S>>::encode_seek_key(seek_key)?;
self.db_iter
.seek_for_prev(rocksdb::SeekKey::Key(&key))
.unwrap();
Ok(())
}
fn next_impl(&mut self) -> Result<Option<(S::Key, S::Value)>> {
let _timer = DIEM_SCHEMADB_ITER_LATENCY_SECONDS
.with_label_values(&[S::COLUMN_FAMILY_NAME])
.start_timer();
if !self.db_iter.valid().unwrap() {
return Ok(None);
}
let raw_key = self.db_iter.key();
let raw_value = self.db_iter.value();
DIEM_SCHEMADB_ITER_BYTES
.with_label_values(&[S::COLUMN_FAMILY_NAME])
.observe((raw_key.len() + raw_value.len()) as f64);
let key = <S::Key as KeyCodec<S>>::decode_key(raw_key)?;
let value = <S::Value as ValueCodec<S>>::decode_value(raw_value)?;
match self.direction {
ScanDirection::Forward => self.db_iter.next().unwrap(),
ScanDirection::Backward => self.db_iter.prev().unwrap(),
};
Ok(Some((key, value)))
}
}
impl<'a, S> Iterator for SchemaIterator<'a, S>
where S: Schema
{
    type Item = Result<(S::Key, S::Value)>;

    /// Yields the next decoded `(key, value)` pair. The iterator ends (`None`)
    /// when `next_impl` reports exhaustion; errors are yielded as
    /// `Some(Err(..))`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.next_impl() {
            Ok(Some(entry)) => Some(Ok(entry)),
            Ok(None) => None,
            Err(err) => Some(Err(err)),
        }
    }
}
fn convert_rocksdb_err(msg: String) -> anyhow::Error {
format_err!("RocksDB internal error: {}.", msg)
}
/// Typed wrapper around a RocksDB instance whose column families are read and
/// written through [`Schema`] implementations.
#[derive(Debug)]
pub struct DB {
    /// Name used as the metrics label for batch commits and in the open-time
    /// log line.
    name: &'static str,
    /// The underlying RocksDB handle.
    inner: rocksdb::DB,
}
impl DB {
    /// Opens the RocksDB database at `path` for reading and writing, with the
    /// given column families.
    ///
    /// `name` is used as the metrics label for batch commits and in the
    /// "Opened RocksDB." log line.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the following holds:
    /// - `column_families` does not contain [`DEFAULT_CF_NAME`];
    /// - `column_families` contains a duplicate name;
    /// - `path` is not valid Unicode;
    /// - RocksDB fails to open the database.
    pub fn open(
        path: impl AsRef<Path>, name: &'static str,
        column_families: Vec<ColumnFamilyName>, db_opts: Options,
    ) -> Result<Self> {
        {
            // Validation only: `cfs_set` borrows `column_families` and is
            // dropped at the end of this scope.
            let cfs_set: HashSet<_> = column_families.iter().collect();
            ensure!(
                cfs_set.contains(&DEFAULT_CF_NAME),
                "No \"default\" column family name is provided.",
            );
            ensure!(
                cfs_set.len() == column_families.len(),
                "Duplicate column family name found.",
            );
        }
        DB::open_cf(db_opts, path, name, column_families)
    }

    /// Opens the database at `path` in read-only mode.
    ///
    /// Unlike [`DB::open`], `column_families` is not validated here; it is
    /// handed directly to RocksDB.
    ///
    /// # Errors
    ///
    /// Returns an error if `path` is not valid Unicode or RocksDB fails to
    /// open the database.
    pub fn open_readonly(
        path: impl AsRef<Path>, name: &'static str,
        column_families: Vec<ColumnFamilyName>, db_opts: Options,
    ) -> Result<Self> {
        DB::open_cf_readonly(db_opts, path, name, column_families)
    }

    /// Opens the database read-write with default options for every column
    /// family.
    fn open_cf(
        db_opts: Options, path: impl AsRef<Path>, name: &'static str,
        column_families: Vec<ColumnFamilyName>,
    ) -> Result<DB> {
        let inner = rocksdb::DB::open_cf(
            db_opts,
            Self::path_to_str(path.as_ref())?,
            Self::cf_descriptors(&column_families),
        )
        .map_err(convert_rocksdb_err)?;
        Ok(Self::log_construct(name, inner))
    }

    /// Opens the database read-only with default options for every column
    /// family.
    fn open_cf_readonly(
        opts: Options, path: impl AsRef<Path>, name: &'static str,
        column_families: Vec<ColumnFamilyName>,
    ) -> Result<DB> {
        // Forwarded to RocksDB's read-only open: with `false`, an existing
        // log (WAL) file is not treated as an error.
        let error_if_log_file_exists = false;
        let inner = rocksdb::DB::open_cf_for_read_only(
            opts,
            Self::path_to_str(path.as_ref())?,
            Self::cf_descriptors(&column_families),
            error_if_log_file_exists,
        )
        .map_err(convert_rocksdb_err)?;
        Ok(Self::log_construct(name, inner))
    }

    /// Converts `path` to `&str`, the form the RocksDB open calls accept.
    ///
    /// Fails if the path is not valid Unicode.
    fn path_to_str(path: &Path) -> Result<&str> {
        path.to_str().ok_or_else(|| {
            format_err!("Path {:?} can not be converted to string.", path)
        })
    }

    /// Builds one descriptor per column family, each with default
    /// `ColumnFamilyOptions`.
    fn cf_descriptors(
        column_families: &[ColumnFamilyName],
    ) -> Vec<rocksdb::rocksdb_options::ColumnFamilyDescriptor> {
        column_families
            .iter()
            .map(|cf_name| {
                let cf_opts = rocksdb::ColumnFamilyOptions::default();
                rocksdb::rocksdb_options::ColumnFamilyDescriptor::new(
                    *cf_name, cf_opts,
                )
            })
            .collect()
    }

    /// Logs the successful open and wraps `inner` with its metrics `name`.
    fn log_construct(name: &'static str, inner: rocksdb::DB) -> DB {
        diem_info!(rocksdb_name = name, "Opened RocksDB.");
        DB { name, inner }
    }

    /// Reads and decodes the value stored under `schema_key` in `S`'s column
    /// family. Returns `Ok(None)` if the key is absent.
    ///
    /// Records lookup latency and the raw value size (0 when absent).
    ///
    /// # Errors
    ///
    /// Returns an error if any of the following holds:
    /// - the key fails to encode;
    /// - the column family is unknown;
    /// - RocksDB fails;
    /// - the stored value fails to decode.
    pub fn get<S: Schema>(
        &self, schema_key: &S::Key,
    ) -> Result<Option<S::Value>> {
        let _timer = DIEM_SCHEMADB_GET_LATENCY_SECONDS
            .with_label_values(&[S::COLUMN_FAMILY_NAME])
            .start_timer();

        let k = <S::Key as KeyCodec<S>>::encode_key(schema_key)?;
        let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;

        let result = self
            .inner
            .get_cf(cf_handle, &k)
            .map_err(convert_rocksdb_err)?;
        DIEM_SCHEMADB_GET_BYTES
            .with_label_values(&[S::COLUMN_FAMILY_NAME])
            .observe(result.as_ref().map_or(0.0, |v| v.len() as f64));

        result
            .map(|raw_value| {
                <S::Value as ValueCodec<S>>::decode_value(&raw_value)
            })
            .transpose()
    }

    /// Writes a single key/value pair through a one-entry batch with the
    /// default (synced) write options.
    pub fn put<S: Schema>(&self, key: &S::Key, value: &S::Value) -> Result<()> {
        let mut batch = SchemaBatch::new();
        batch.put::<S>(key, value)?;
        self.write_schemas(batch, false)
    }

    /// Deletes every key from the encoded `begin` up to the encoded `end` in
    /// `S`'s column family, following RocksDB `DeleteRange` semantics: `begin`
    /// is included and `end` is excluded.
    pub fn range_delete<S, SK>(&self, begin: &SK, end: &SK) -> Result<()>
    where
        S: Schema,
        SK: SeekKeyCodec<S>,
    {
        let raw_begin = begin.encode_seek_key()?;
        let raw_end = end.encode_seek_key()?;
        let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;

        self.inner
            .delete_range_cf(cf_handle, &raw_begin, &raw_end)
            .map_err(convert_rocksdb_err)
    }

    /// Creates a [`SchemaIterator`] over `S`'s column family that steps in
    /// `direction`.
    fn iter_with_direction<S: Schema>(
        &self, opts: ReadOptions, direction: ScanDirection,
    ) -> Result<SchemaIterator<S>> {
        let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
        Ok(SchemaIterator::new(
            self.inner.iter_cf_opt(cf_handle, opts),
            direction,
        ))
    }

    /// Returns a forward iterator over `S`'s column family.
    ///
    /// NOTE(review): presumably it must be positioned with a `seek*` call
    /// before yielding anything — confirm against rust-rocksdb.
    pub fn iter<S: Schema>(
        &self, opts: ReadOptions,
    ) -> Result<SchemaIterator<S>> {
        self.iter_with_direction::<S>(opts, ScanDirection::Forward)
    }

    /// Returns a backward iterator over `S`'s column family.
    ///
    /// NOTE(review): presumably it must be positioned with a `seek*` call
    /// before yielding anything — confirm against rust-rocksdb.
    pub fn rev_iter<S: Schema>(
        &self, opts: ReadOptions,
    ) -> Result<SchemaIterator<S>> {
        self.iter_with_direction::<S>(opts, ScanDirection::Backward)
    }

    /// Commits every operation in `batch` with a single RocksDB `WriteBatch`.
    ///
    /// With `fast_write` the write uses `fast_write_options` (sync off);
    /// otherwise it uses `default_write_options` (sync on). Per-row put/delete
    /// metrics are recorded only after the write succeeds.
    ///
    /// # Errors
    ///
    /// Returns an error if a column family in `batch` is unknown or if
    /// RocksDB fails to stage or commit the write; nothing is committed in
    /// that case.
    pub fn write_schemas(
        &self, batch: SchemaBatch, fast_write: bool,
    ) -> Result<()> {
        let _timer = DIEM_SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS
            .with_label_values(&[self.name])
            .start_timer();

        let db_batch = rocksdb::WriteBatch::default();
        for (cf_name, rows) in &batch.rows {
            let cf_handle = self.get_cf_handle(cf_name)?;
            for (key, write_op) in rows {
                let staged = match write_op {
                    WriteOp::Value(value) => {
                        db_batch.put_cf(cf_handle, key, value)
                    }
                    WriteOp::Deletion => db_batch.delete_cf(cf_handle, key),
                };
                // Staging into the batch is fallible in rust-rocksdb; report
                // the failure instead of panicking.
                staged.map_err(convert_rocksdb_err)?;
            }
        }
        let serialized_size = db_batch.data_size();

        let write_options = if fast_write {
            fast_write_options()
        } else {
            default_write_options()
        };
        self.inner
            .write_opt(&db_batch, &write_options)
            .map_err(convert_rocksdb_err)?;

        for (cf_name, rows) in &batch.rows {
            for (key, write_op) in rows {
                match write_op {
                    WriteOp::Value(value) => {
                        DIEM_SCHEMADB_PUT_BYTES
                            .with_label_values(&[cf_name])
                            .observe((key.len() + value.len()) as f64);
                    }
                    WriteOp::Deletion => {
                        DIEM_SCHEMADB_DELETES
                            .with_label_values(&[cf_name])
                            .inc();
                    }
                }
            }
        }
        DIEM_SCHEMADB_BATCH_COMMIT_BYTES
            .with_label_values(&[self.name])
            .observe(serialized_size as f64);

        Ok(())
    }

    /// Looks up the RocksDB handle for `cf_name`, failing if the database was
    /// not opened with that column family.
    fn get_cf_handle(&self, cf_name: &str) -> Result<&rocksdb::CFHandle> {
        self.inner.cf_handle(cf_name).ok_or_else(|| {
            format_err!(
                "DB::cf_handle not found for column family name: {}",
                cf_name
            )
        })
    }

    /// Calls RocksDB `flush_cf` on every column family reported by
    /// `cf_names()`, forwarding `sync`. Stops at the first error.
    pub fn flush_all(&self, sync: bool) -> Result<()> {
        for cf_name in &self.inner.cf_names() {
            let cf_handle = self.get_cf_handle(cf_name)?;
            self.inner
                .flush_cf(cf_handle, sync)
                .map_err(convert_rocksdb_err)?;
        }
        Ok(())
    }

    /// Reads the integer-valued RocksDB property `property_name` of column
    /// family `cf_name`.
    ///
    /// # Errors
    ///
    /// Returns an error if the column family is unknown or RocksDB does not
    /// return an integer value for the property.
    pub fn get_property(
        &self, cf_name: &str, property_name: &str,
    ) -> Result<u64> {
        self.inner
            .get_property_int_cf(self.get_cf_handle(cf_name)?, property_name)
            .ok_or_else(|| {
                format_err!(
                    "Unable to get property \"{}\" of column family \"{}\".",
                    property_name,
                    cf_name,
                )
            })
    }
}
/// Write options for durable commits: `sync` is enabled, so RocksDB syncs
/// the write before reporting success.
fn default_write_options() -> rocksdb::WriteOptions {
    write_options_with_sync(true)
}

/// Write options for fast commits (selected by `write_schemas` when
/// `fast_write` is set). `sync` is disabled, so a recent write may be lost on
/// a machine crash.
fn fast_write_options() -> rocksdb::WriteOptions {
    write_options_with_sync(false)
}

/// Builds default RocksDB `WriteOptions` with only the `sync` flag
/// overridden.
fn write_options_with_sync(sync: bool) -> rocksdb::WriteOptions {
    let mut write_opts = rocksdb::WriteOptions::default();
    write_opts.set_sync(sync);
    write_opts
}