1#![forbid(unsafe_code)]
9
10mod metrics;
22#[macro_use]
23pub mod schema;
24
25use crate::{
26 metrics::{
27 DIEM_SCHEMADB_BATCH_COMMIT_BYTES,
28 DIEM_SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS, DIEM_SCHEMADB_DELETES,
29 DIEM_SCHEMADB_GET_BYTES, DIEM_SCHEMADB_GET_LATENCY_SECONDS,
30 DIEM_SCHEMADB_ITER_BYTES, DIEM_SCHEMADB_ITER_LATENCY_SECONDS,
31 DIEM_SCHEMADB_PUT_BYTES,
32 },
33 schema::{KeyCodec, Schema, SeekKeyCodec, ValueCodec},
34};
35use anyhow::{ensure, format_err, Result};
36use diem_logger::prelude::*;
37use rocksdb::Writable;
38use std::{
39 collections::{BTreeMap, HashMap, HashSet},
40 iter::Iterator,
41 marker::PhantomData,
42 path::Path,
43};
44
45pub type ReadOptions = rocksdb::ReadOptions;
47
48pub type Options = rocksdb::DBOptions;
50
/// Name of a column family, always a string slice with `'static` lifetime.
pub type ColumnFamilyName = &'static str;

/// Name of the column family that `DB::open` requires to be present in the
/// list of column families it is given.
pub const DEFAULT_CF_NAME: ColumnFamilyName = "default";
57
/// A single pending mutation recorded against one encoded key.
#[derive(Debug)]
enum WriteOp {
    /// Store the contained encoded value bytes under the key.
    Value(Vec<u8>),
    /// Remove the key.
    Deletion,
}
63
64#[derive(Debug, Default)]
68pub struct SchemaBatch {
69 rows: HashMap<ColumnFamilyName, BTreeMap<Vec<u8>, WriteOp>>,
70}
71
72impl SchemaBatch {
73 pub fn new() -> Self { Self::default() }
75
76 pub fn put<S: Schema>(
78 &mut self, key: &S::Key, value: &S::Value,
79 ) -> Result<()> {
80 let key = <S::Key as KeyCodec<S>>::encode_key(key)?;
81 let value = <S::Value as ValueCodec<S>>::encode_value(value)?;
82 self.rows
83 .entry(S::COLUMN_FAMILY_NAME)
84 .or_insert_with(BTreeMap::new)
85 .insert(key, WriteOp::Value(value));
86
87 Ok(())
88 }
89
90 pub fn delete<S: Schema>(&mut self, key: &S::Key) -> Result<()> {
92 let key = <S::Key as KeyCodec<S>>::encode_key(key)?;
93 self.rows
94 .entry(S::COLUMN_FAMILY_NAME)
95 .or_insert_with(BTreeMap::new)
96 .insert(key, WriteOp::Deletion);
97
98 Ok(())
99 }
100}
101
/// Direction in which a [`SchemaIterator`] advances after yielding an entry.
///
/// The enum is a plain, field-less tag, so it derives the common traits
/// (`Debug`, `Clone`, `Copy`, `PartialEq`, `Eq`) to make it easy to log,
/// compare and pass around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanDirection {
    /// Advance with the underlying iterator's `next`.
    Forward,
    /// Advance with the underlying iterator's `prev`.
    Backward,
}
106
107pub struct SchemaIterator<'a, S> {
110 db_iter: rocksdb::DBIterator<&'a rocksdb::DB>,
111 direction: ScanDirection,
112 phantom: PhantomData<S>,
113}
114
115impl<'a, S> SchemaIterator<'a, S>
116where S: Schema
117{
118 fn new(
119 db_iter: rocksdb::DBIterator<&'a rocksdb::DB>, direction: ScanDirection,
120 ) -> Self {
121 SchemaIterator {
122 db_iter,
123 direction,
124 phantom: PhantomData,
125 }
126 }
127
128 pub fn seek_to_first(&mut self) {
130 self.db_iter.seek(rocksdb::SeekKey::Start).unwrap();
131 }
132
133 pub fn seek_to_last(&mut self) {
135 self.db_iter.seek(rocksdb::SeekKey::End).unwrap();
136 }
137
138 pub fn seek<SK>(&mut self, seek_key: &SK) -> Result<()>
141 where SK: SeekKeyCodec<S> {
142 let key = <SK as SeekKeyCodec<S>>::encode_seek_key(seek_key)?;
143 self.db_iter.seek(rocksdb::SeekKey::Key(&key)).unwrap();
144 Ok(())
145 }
146
147 pub fn seek_for_prev<SK>(&mut self, seek_key: &SK) -> Result<()>
152 where SK: SeekKeyCodec<S> {
153 let key = <SK as SeekKeyCodec<S>>::encode_seek_key(seek_key)?;
154 self.db_iter
155 .seek_for_prev(rocksdb::SeekKey::Key(&key))
156 .unwrap();
157 Ok(())
158 }
159
160 fn next_impl(&mut self) -> Result<Option<(S::Key, S::Value)>> {
161 let _timer = DIEM_SCHEMADB_ITER_LATENCY_SECONDS
162 .with_label_values(&[S::COLUMN_FAMILY_NAME])
163 .start_timer();
164
165 if !self.db_iter.valid().unwrap() {
166 return Ok(None);
167 }
168
169 let raw_key = self.db_iter.key();
170 let raw_value = self.db_iter.value();
171 DIEM_SCHEMADB_ITER_BYTES
172 .with_label_values(&[S::COLUMN_FAMILY_NAME])
173 .observe((raw_key.len() + raw_value.len()) as f64);
174
175 let key = <S::Key as KeyCodec<S>>::decode_key(raw_key)?;
176 let value = <S::Value as ValueCodec<S>>::decode_value(raw_value)?;
177
178 match self.direction {
179 ScanDirection::Forward => self.db_iter.next().unwrap(),
180 ScanDirection::Backward => self.db_iter.prev().unwrap(),
181 };
182
183 Ok(Some((key, value)))
184 }
185}
186
187impl<'a, S> Iterator for SchemaIterator<'a, S>
188where S: Schema
189{
190 type Item = Result<(S::Key, S::Value)>;
191
192 fn next(&mut self) -> Option<Self::Item> { self.next_impl().transpose() }
193}
194
195fn convert_rocksdb_err(msg: String) -> anyhow::Error {
198 format_err!("RocksDB internal error: {}.", msg)
199}
200
201#[derive(Debug)]
204pub struct DB {
205 name: &'static str, inner: rocksdb::DB,
207}
208
209impl DB {
210 pub fn open(
213 path: impl AsRef<Path>, name: &'static str,
214 column_families: Vec<ColumnFamilyName>, db_opts: Options,
215 ) -> Result<Self> {
216 {
217 let cfs_set: HashSet<_> = column_families.iter().collect();
218 ensure!(
219 cfs_set.contains(&DEFAULT_CF_NAME),
220 "No \"default\" column family name is provided.",
221 );
222 ensure!(
223 cfs_set.len() == column_families.len(),
224 "Duplicate column family name found.",
225 );
226 }
227
228 let db = DB::open_cf(db_opts, path, name, column_families)?;
229 Ok(db)
230 }
231
232 pub fn open_readonly(
236 path: impl AsRef<Path>, name: &'static str,
237 column_families: Vec<ColumnFamilyName>, db_opts: Options,
238 ) -> Result<Self> {
239 DB::open_cf_readonly(db_opts, path, name, column_families)
240 }
241
242 fn open_cf(
243 db_opts: Options, path: impl AsRef<Path>, name: &'static str,
244 column_families: Vec<ColumnFamilyName>,
245 ) -> Result<DB> {
246 let inner = rocksdb::DB::open_cf(
247 db_opts,
248 path.as_ref().to_str().ok_or_else(|| {
249 format_err!(
250 "Path {:?} can not be converted to string.",
251 path.as_ref()
252 )
253 })?,
254 column_families
255 .iter()
256 .map(|cf_name| {
257 let cf_opts = rocksdb::ColumnFamilyOptions::default();
258 rocksdb::rocksdb_options::ColumnFamilyDescriptor::new(
259 *cf_name, cf_opts,
260 )
261 })
262 .collect(),
263 )
264 .map_err(convert_rocksdb_err)?;
265 Ok(Self::log_construct(name, inner))
266 }
267
268 fn open_cf_readonly(
269 opts: Options, path: impl AsRef<Path>, name: &'static str,
270 column_families: Vec<ColumnFamilyName>,
271 ) -> Result<DB> {
272 let error_if_log_file_exists = false;
273 let inner = rocksdb::DB::open_cf_for_read_only(
274 opts,
275 path.as_ref().to_str().ok_or_else(|| {
276 format_err!(
277 "Path {:?} can not be converted to string.",
278 path.as_ref()
279 )
280 })?,
281 column_families
282 .iter()
283 .map(|cf_name| {
284 let cf_opts = rocksdb::ColumnFamilyOptions::default();
285 rocksdb::rocksdb_options::ColumnFamilyDescriptor::new(
286 *cf_name, cf_opts,
287 )
288 })
289 .collect(),
290 error_if_log_file_exists,
291 )
292 .map_err(convert_rocksdb_err)?;
293
294 Ok(Self::log_construct(name, inner))
295 }
296
297 fn log_construct(name: &'static str, inner: rocksdb::DB) -> DB {
298 diem_info!(rocksdb_name = name, "Opened RocksDB.");
299 DB { name, inner }
300 }
301
302 pub fn get<S: Schema>(
304 &self, schema_key: &S::Key,
305 ) -> Result<Option<S::Value>> {
306 let _timer = DIEM_SCHEMADB_GET_LATENCY_SECONDS
307 .with_label_values(&[S::COLUMN_FAMILY_NAME])
308 .start_timer();
309
310 let k = <S::Key as KeyCodec<S>>::encode_key(&schema_key)?;
311 let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
312
313 let result = self
314 .inner
315 .get_cf(cf_handle, &k)
316 .map_err(convert_rocksdb_err)?;
317 DIEM_SCHEMADB_GET_BYTES
318 .with_label_values(&[S::COLUMN_FAMILY_NAME])
319 .observe(result.as_ref().map_or(0.0, |v| v.len() as f64));
320
321 result
322 .map(|raw_value| {
323 <S::Value as ValueCodec<S>>::decode_value(&raw_value)
324 })
325 .transpose()
326 }
327
328 pub fn put<S: Schema>(&self, key: &S::Key, value: &S::Value) -> Result<()> {
330 let mut batch = SchemaBatch::new();
333 batch.put::<S>(key, value)?;
334 self.write_schemas(batch, false)
335 }
336
337 pub fn range_delete<S, SK>(&self, begin: &SK, end: &SK) -> Result<()>
342 where
343 S: Schema,
344 SK: SeekKeyCodec<S>,
345 {
346 let raw_begin = begin.encode_seek_key()?;
347 let raw_end = end.encode_seek_key()?;
348 let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
349
350 self.inner
351 .delete_range_cf(&cf_handle, &raw_begin, &raw_end)
352 .map_err(convert_rocksdb_err)
353 }
354
355 fn iter_with_direction<S: Schema>(
356 &self, opts: ReadOptions, direction: ScanDirection,
357 ) -> Result<SchemaIterator<'_, S>> {
358 let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
359 Ok(SchemaIterator::new(
360 self.inner.iter_cf_opt(cf_handle, opts),
361 direction,
362 ))
363 }
364
365 pub fn iter<S: Schema>(
367 &self, opts: ReadOptions,
368 ) -> Result<SchemaIterator<'_, S>> {
369 self.iter_with_direction::<S>(opts, ScanDirection::Forward)
370 }
371
372 pub fn rev_iter<S: Schema>(
374 &self, opts: ReadOptions,
375 ) -> Result<SchemaIterator<'_, S>> {
376 self.iter_with_direction::<S>(opts, ScanDirection::Backward)
377 }
378
379 pub fn write_schemas(
381 &self, batch: SchemaBatch, fast_write: bool,
382 ) -> Result<()> {
383 let _timer = DIEM_SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS
384 .with_label_values(&[self.name])
385 .start_timer();
386
387 let db_batch = rocksdb::WriteBatch::default();
388 for (cf_name, rows) in &batch.rows {
389 let cf_handle = self.get_cf_handle(cf_name)?;
390 for (key, write_op) in rows {
391 match write_op {
392 WriteOp::Value(value) => {
393 db_batch.put_cf(cf_handle, key, value).unwrap()
394 }
395 WriteOp::Deletion => {
396 db_batch.delete_cf(cf_handle, key).unwrap()
397 }
398 }
399 }
400 }
401 let serialized_size = db_batch.data_size();
402
403 let write_options = if fast_write {
404 fast_write_options()
405 } else {
406 default_write_options()
407 };
408 self.inner
409 .write_opt(&db_batch, &write_options)
410 .map_err(convert_rocksdb_err)?;
411
412 for (cf_name, rows) in &batch.rows {
414 for (key, write_op) in rows {
415 match write_op {
416 WriteOp::Value(value) => {
417 DIEM_SCHEMADB_PUT_BYTES
418 .with_label_values(&[cf_name])
419 .observe((key.len() + value.len()) as f64);
420 }
421 WriteOp::Deletion => {
422 DIEM_SCHEMADB_DELETES
423 .with_label_values(&[cf_name])
424 .inc();
425 }
426 }
427 }
428 }
429 DIEM_SCHEMADB_BATCH_COMMIT_BYTES
430 .with_label_values(&[self.name])
431 .observe(serialized_size as f64);
432
433 Ok(())
434 }
435
436 fn get_cf_handle(&self, cf_name: &str) -> Result<&rocksdb::CFHandle> {
437 self.inner.cf_handle(cf_name).ok_or_else(|| {
438 format_err!(
439 "DB::cf_handle not found for column family name: {}",
440 cf_name
441 )
442 })
443 }
444
445 pub fn flush_all(&self, sync: bool) -> Result<()> {
448 for cf_name in &self.inner.cf_names() {
449 let cf_handle = self.get_cf_handle(cf_name)?;
450 self.inner
451 .flush_cf(cf_handle, sync)
452 .map_err(convert_rocksdb_err)?;
453 }
454 Ok(())
455 }
456
457 pub fn get_property(
458 &self, cf_name: &str, property_name: &str,
459 ) -> Result<u64> {
460 self.inner
461 .get_property_int_cf(self.get_cf_handle(&cf_name)?, property_name)
462 .ok_or_else(|| {
463 format_err!(
464 "Unable to get property \"{}\" of column family \"{}\".",
465 property_name,
466 cf_name,
467 )
468 })
469 }
470}
471
472fn default_write_options() -> rocksdb::WriteOptions {
477 let mut opts = rocksdb::WriteOptions::default();
478 opts.set_sync(true);
479 opts
480}
481
482fn fast_write_options() -> rocksdb::WriteOptions {
483 let mut opts = rocksdb::WriteOptions::default();
484 opts.set_sync(false);
485 opts
487}