1#![forbid(unsafe_code)]
9
10#[macro_use]
22pub mod schema;
23
24use crate::schema::{KeyCodec, Schema, SeekKeyCodec, ValueCodec};
25use anyhow::{ensure, format_err, Result};
26use diem_logger::prelude::*;
27use rocksdb::Writable;
28use std::{
29 collections::{BTreeMap, HashMap, HashSet},
30 iter::Iterator,
31 marker::PhantomData,
32 path::Path,
33};
34
35pub type ReadOptions = rocksdb::ReadOptions;
37
38pub type Options = rocksdb::DBOptions;
40
/// Name of a column family. Always a `'static` string, so schema definitions
/// can expose it as an associated constant.
pub type ColumnFamilyName = &'static str;

/// Column family name that `DB::open` requires to be present in the list of
/// column families it is given.
pub const DEFAULT_CF_NAME: ColumnFamilyName = "default";
47
/// One pending mutation for a single encoded key inside a `SchemaBatch`.
#[derive(Debug)]
enum WriteOp {
    /// Store the given encoded value under the key.
    Value(Vec<u8>),
    /// Remove the key.
    Deletion,
}
53
54#[derive(Debug, Default)]
58pub struct SchemaBatch {
59 rows: HashMap<ColumnFamilyName, BTreeMap<Vec<u8>, WriteOp>>,
60}
61
62impl SchemaBatch {
63 pub fn new() -> Self { Self::default() }
65
66 pub fn put<S: Schema>(
68 &mut self, key: &S::Key, value: &S::Value,
69 ) -> Result<()> {
70 let key = <S::Key as KeyCodec<S>>::encode_key(key)?;
71 let value = <S::Value as ValueCodec<S>>::encode_value(value)?;
72 self.rows
73 .entry(S::COLUMN_FAMILY_NAME)
74 .or_insert_with(BTreeMap::new)
75 .insert(key, WriteOp::Value(value));
76
77 Ok(())
78 }
79
80 pub fn delete<S: Schema>(&mut self, key: &S::Key) -> Result<()> {
82 let key = <S::Key as KeyCodec<S>>::encode_key(key)?;
83 self.rows
84 .entry(S::COLUMN_FAMILY_NAME)
85 .or_insert_with(BTreeMap::new)
86 .insert(key, WriteOp::Deletion);
87
88 Ok(())
89 }
90}
91
/// Direction in which a `SchemaIterator` advances after yielding an item.
///
/// The common traits are derived so the value can be copied, compared and
/// printed by callers and in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanDirection {
    /// Advance with the underlying iterator's `next`.
    Forward,
    /// Advance with the underlying iterator's `prev`.
    Backward,
}
96
97pub struct SchemaIterator<'a, S> {
100 db_iter: rocksdb::DBIterator<&'a rocksdb::DB>,
101 direction: ScanDirection,
102 phantom: PhantomData<S>,
103}
104
105impl<'a, S> SchemaIterator<'a, S>
106where S: Schema
107{
108 fn new(
109 db_iter: rocksdb::DBIterator<&'a rocksdb::DB>, direction: ScanDirection,
110 ) -> Self {
111 SchemaIterator {
112 db_iter,
113 direction,
114 phantom: PhantomData,
115 }
116 }
117
118 pub fn seek_to_first(&mut self) {
120 self.db_iter.seek(rocksdb::SeekKey::Start).unwrap();
121 }
122
123 pub fn seek_to_last(&mut self) {
125 self.db_iter.seek(rocksdb::SeekKey::End).unwrap();
126 }
127
128 pub fn seek<SK>(&mut self, seek_key: &SK) -> Result<()>
131 where SK: SeekKeyCodec<S> {
132 let key = <SK as SeekKeyCodec<S>>::encode_seek_key(seek_key)?;
133 self.db_iter.seek(rocksdb::SeekKey::Key(&key)).unwrap();
134 Ok(())
135 }
136
137 pub fn seek_for_prev<SK>(&mut self, seek_key: &SK) -> Result<()>
142 where SK: SeekKeyCodec<S> {
143 let key = <SK as SeekKeyCodec<S>>::encode_seek_key(seek_key)?;
144 self.db_iter
145 .seek_for_prev(rocksdb::SeekKey::Key(&key))
146 .unwrap();
147 Ok(())
148 }
149
150 fn next_impl(&mut self) -> Result<Option<(S::Key, S::Value)>> {
151 if !self.db_iter.valid().unwrap() {
152 return Ok(None);
153 }
154
155 let raw_key = self.db_iter.key();
156 let raw_value = self.db_iter.value();
157
158 let key = <S::Key as KeyCodec<S>>::decode_key(raw_key)?;
159 let value = <S::Value as ValueCodec<S>>::decode_value(raw_value)?;
160
161 match self.direction {
162 ScanDirection::Forward => self.db_iter.next().unwrap(),
163 ScanDirection::Backward => self.db_iter.prev().unwrap(),
164 };
165
166 Ok(Some((key, value)))
167 }
168}
169
170impl<'a, S> Iterator for SchemaIterator<'a, S>
171where S: Schema
172{
173 type Item = Result<(S::Key, S::Value)>;
174
175 fn next(&mut self) -> Option<Self::Item> { self.next_impl().transpose() }
176}
177
178fn convert_rocksdb_err(msg: String) -> anyhow::Error {
181 format_err!("RocksDB internal error: {}.", msg)
182}
183
184#[derive(Debug)]
187pub struct DB {
188 inner: rocksdb::DB,
189}
190
191impl DB {
192 pub fn open(
195 path: impl AsRef<Path>, name: &'static str,
196 column_families: Vec<ColumnFamilyName>, db_opts: Options,
197 ) -> Result<Self> {
198 {
199 let cfs_set: HashSet<_> = column_families.iter().collect();
200 ensure!(
201 cfs_set.contains(&DEFAULT_CF_NAME),
202 "No \"default\" column family name is provided.",
203 );
204 ensure!(
205 cfs_set.len() == column_families.len(),
206 "Duplicate column family name found.",
207 );
208 }
209
210 let db = DB::open_cf(db_opts, path, name, column_families)?;
211 Ok(db)
212 }
213
214 pub fn open_readonly(
218 path: impl AsRef<Path>, name: &'static str,
219 column_families: Vec<ColumnFamilyName>, db_opts: Options,
220 ) -> Result<Self> {
221 DB::open_cf_readonly(db_opts, path, name, column_families)
222 }
223
224 fn open_cf(
225 db_opts: Options, path: impl AsRef<Path>, name: &'static str,
226 column_families: Vec<ColumnFamilyName>,
227 ) -> Result<DB> {
228 let inner = rocksdb::DB::open_cf(
229 db_opts,
230 path.as_ref().to_str().ok_or_else(|| {
231 format_err!(
232 "Path {:?} can not be converted to string.",
233 path.as_ref()
234 )
235 })?,
236 column_families
237 .iter()
238 .map(|cf_name| {
239 let cf_opts = rocksdb::ColumnFamilyOptions::default();
240 rocksdb::rocksdb_options::ColumnFamilyDescriptor::new(
241 *cf_name, cf_opts,
242 )
243 })
244 .collect(),
245 )
246 .map_err(convert_rocksdb_err)?;
247 Ok(Self::log_construct(name, inner))
248 }
249
250 fn open_cf_readonly(
251 opts: Options, path: impl AsRef<Path>, name: &'static str,
252 column_families: Vec<ColumnFamilyName>,
253 ) -> Result<DB> {
254 let error_if_log_file_exists = false;
255 let inner = rocksdb::DB::open_cf_for_read_only(
256 opts,
257 path.as_ref().to_str().ok_or_else(|| {
258 format_err!(
259 "Path {:?} can not be converted to string.",
260 path.as_ref()
261 )
262 })?,
263 column_families
264 .iter()
265 .map(|cf_name| {
266 let cf_opts = rocksdb::ColumnFamilyOptions::default();
267 rocksdb::rocksdb_options::ColumnFamilyDescriptor::new(
268 *cf_name, cf_opts,
269 )
270 })
271 .collect(),
272 error_if_log_file_exists,
273 )
274 .map_err(convert_rocksdb_err)?;
275
276 Ok(Self::log_construct(name, inner))
277 }
278
279 fn log_construct(name: &'static str, inner: rocksdb::DB) -> DB {
280 diem_info!(rocksdb_name = name, "Opened RocksDB.");
281 DB { inner }
282 }
283
284 pub fn get<S: Schema>(
286 &self, schema_key: &S::Key,
287 ) -> Result<Option<S::Value>> {
288 let k = <S::Key as KeyCodec<S>>::encode_key(&schema_key)?;
289 let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
290
291 let result = self
292 .inner
293 .get_cf(cf_handle, &k)
294 .map_err(convert_rocksdb_err)?;
295
296 result
297 .map(|raw_value| {
298 <S::Value as ValueCodec<S>>::decode_value(&raw_value)
299 })
300 .transpose()
301 }
302
303 pub fn put<S: Schema>(&self, key: &S::Key, value: &S::Value) -> Result<()> {
305 let mut batch = SchemaBatch::new();
308 batch.put::<S>(key, value)?;
309 self.write_schemas(batch, false)
310 }
311
312 pub fn range_delete<S, SK>(&self, begin: &SK, end: &SK) -> Result<()>
317 where
318 S: Schema,
319 SK: SeekKeyCodec<S>,
320 {
321 let raw_begin = begin.encode_seek_key()?;
322 let raw_end = end.encode_seek_key()?;
323 let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
324
325 self.inner
326 .delete_range_cf(&cf_handle, &raw_begin, &raw_end)
327 .map_err(convert_rocksdb_err)
328 }
329
330 fn iter_with_direction<S: Schema>(
331 &self, opts: ReadOptions, direction: ScanDirection,
332 ) -> Result<SchemaIterator<'_, S>> {
333 let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
334 Ok(SchemaIterator::new(
335 self.inner.iter_cf_opt(cf_handle, opts),
336 direction,
337 ))
338 }
339
340 pub fn iter<S: Schema>(
342 &self, opts: ReadOptions,
343 ) -> Result<SchemaIterator<'_, S>> {
344 self.iter_with_direction::<S>(opts, ScanDirection::Forward)
345 }
346
347 pub fn rev_iter<S: Schema>(
349 &self, opts: ReadOptions,
350 ) -> Result<SchemaIterator<'_, S>> {
351 self.iter_with_direction::<S>(opts, ScanDirection::Backward)
352 }
353
354 pub fn write_schemas(
356 &self, batch: SchemaBatch, fast_write: bool,
357 ) -> Result<()> {
358 let db_batch = rocksdb::WriteBatch::default();
359 for (cf_name, rows) in &batch.rows {
360 let cf_handle = self.get_cf_handle(cf_name)?;
361 for (key, write_op) in rows {
362 match write_op {
363 WriteOp::Value(value) => {
364 db_batch.put_cf(cf_handle, key, value).unwrap()
365 }
366 WriteOp::Deletion => {
367 db_batch.delete_cf(cf_handle, key).unwrap()
368 }
369 }
370 }
371 }
372
373 let write_options = if fast_write {
374 fast_write_options()
375 } else {
376 default_write_options()
377 };
378 self.inner
379 .write_opt(&db_batch, &write_options)
380 .map_err(convert_rocksdb_err)?;
381
382 Ok(())
383 }
384
385 fn get_cf_handle(&self, cf_name: &str) -> Result<&rocksdb::CFHandle> {
386 self.inner.cf_handle(cf_name).ok_or_else(|| {
387 format_err!(
388 "DB::cf_handle not found for column family name: {}",
389 cf_name
390 )
391 })
392 }
393
394 pub fn flush_all(&self, sync: bool) -> Result<()> {
397 for cf_name in &self.inner.cf_names() {
398 let cf_handle = self.get_cf_handle(cf_name)?;
399 self.inner
400 .flush_cf(cf_handle, sync)
401 .map_err(convert_rocksdb_err)?;
402 }
403 Ok(())
404 }
405
406 pub fn get_property(
407 &self, cf_name: &str, property_name: &str,
408 ) -> Result<u64> {
409 self.inner
410 .get_property_int_cf(self.get_cf_handle(&cf_name)?, property_name)
411 .ok_or_else(|| {
412 format_err!(
413 "Unable to get property \"{}\" of column family \"{}\".",
414 property_name,
415 cf_name,
416 )
417 })
418 }
419}
420
421fn default_write_options() -> rocksdb::WriteOptions {
426 let mut opts = rocksdb::WriteOptions::default();
427 opts.set_sync(true);
428 opts
429}
430
431fn fast_write_options() -> rocksdb::WriteOptions {
432 let mut opts = rocksdb::WriteOptions::default();
433 opts.set_sync(false);
434 opts
436}