// kvdb_rocksdb/lib.rs

1// Copyright 2015-2018 Parity Technologies (UK) Ltd.
2// This file is part of Parity.
3
4// Parity is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
16
17// Copyright 2019 Conflux Foundation. All rights reserved.
18// Conflux is free software and distributed under GNU General Public License.
19// See http://www.gnu.org/licenses/
20
21use std::{cmp, collections::HashMap, error, fs, io, mem, path::Path, result};
22
23use parking_lot::{Mutex, MutexGuard, RwLock};
24use rocksdb::{
25    BlockBasedOptions, CFHandle, ColumnFamilyOptions, DBOptions, ReadOptions,
26    Writable, WriteBatch, WriteOptions, DB,
27};
28
29use fs_swap::{swap, swap_nonatomic};
30use kvdb::{DBKey, DBOp, DBTransaction, DBValue, KeyValueDB};
31use log::warn;
32
33use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
34use malloc_size_of_derive::MallocSizeOf as MallocSizeOfDerive;
35#[cfg(target_os = "linux")]
36use regex::Regex;
37#[cfg(target_os = "linux")]
38use std::fs::File;
39#[cfg(target_os = "linux")]
40use std::path::PathBuf;
41#[cfg(target_os = "linux")]
42use std::process::Command;
43
/// Error type surfaced by the RocksDB bindings: a plain message string.
type DBError = String;
/// Wrap any boxable error value into an `io::Error` of kind `Other`.
fn other_io_err<E: Into<Box<dyn error::Error + Send + Sync>>>(e: E) -> io::Error {
    io::Error::new(io::ErrorKind::Other, e)
}
49
const KB: usize = 1024;
const MB: usize = 1024 * KB;
/// Fallback memory budget (in MiB) used when `DatabaseConfig::memory_budget` is `None`.
const DB_DEFAULT_MEMORY_BUDGET_MB: usize = 128;
53
/// Buffered state of a key in the write overlay: a pending insert carrying
/// its value, or a pending delete. Applied to RocksDB on `flush`.
#[derive(MallocSizeOfDerive)]
enum KeyState {
    Insert(DBValue),
    Delete,
}
59
/// Compaction profile for the database settings
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct CompactionProfile {
    /// L0-L1 target file size, in bytes
    pub initial_file_size: u64,
    /// block size, in bytes
    pub block_size: usize,
    /// rate limiter for background flushes and compactions, bytes/sec, if any
    pub write_rate_limit: Option<u64>,
}
70
71impl Default for CompactionProfile {
72    /// Default profile suitable for most storage
73    fn default() -> CompactionProfile { CompactionProfile::ssd() }
74}
75
76/// Given output of df command return Linux rotational flag file path.
77#[cfg(target_os = "linux")]
78pub fn rotational_from_df_output(df_out: Vec<u8>) -> Option<PathBuf> {
79    use std::str;
80    str::from_utf8(df_out.as_slice())
81        .ok()
82        // Get the drive name.
83        .and_then(|df_str| {
84            Regex::new(r"/dev/(sd[:alpha:]{1,2})")
85                .ok()
86                .and_then(|re| re.captures(df_str))
87                .and_then(|captures| captures.get(1))
88        })
89        // Generate path e.g. /sys/block/sda/queue/rotational
90        .map(|drive_path| {
91            let mut p = PathBuf::from("/sys/block");
92            p.push(drive_path.as_str());
93            p.push("queue/rotational");
94            p
95        })
96}
97
98impl CompactionProfile {
99    /// Attempt to determine the best profile automatically, only Linux for now.
100    #[cfg(target_os = "linux")]
101    pub fn auto(db_path: &Path) -> CompactionProfile {
102        use std::io::Read;
103        let hdd_check_file = db_path
104            .to_str()
105            .and_then(|path_str| Command::new("df").arg(path_str).output().ok())
106            .and_then(|df_res| match df_res.status.success() {
107                true => Some(df_res.stdout),
108                false => None,
109            })
110            .and_then(rotational_from_df_output);
111        // Read out the file and match compaction profile.
112        if let Some(hdd_check) = hdd_check_file {
113            if let Ok(mut file) = File::open(hdd_check.as_path()) {
114                let mut buffer = [0; 1];
115                if file.read_exact(&mut buffer).is_ok() {
116                    // 0 means not rotational.
117                    if buffer == [48] {
118                        return Self::ssd();
119                    }
120                    // 1 means rotational.
121                    if buffer == [49] {
122                        return Self::hdd();
123                    }
124                }
125            }
126        }
127        // Fallback if drive type was not determined.
128        Self::default()
129    }
130
131    /// Just default for other platforms.
132    #[cfg(not(target_os = "linux"))]
133    pub fn auto(_db_path: &Path) -> CompactionProfile { Self::default() }
134
135    /// Default profile suitable for SSD storage
136    pub fn ssd() -> CompactionProfile {
137        CompactionProfile {
138            initial_file_size: 64 * MB as u64,
139            block_size: 16 * KB,
140            write_rate_limit: None,
141        }
142    }
143
144    /// Slow HDD compaction profile
145    pub fn hdd() -> CompactionProfile {
146        CompactionProfile {
147            initial_file_size: 256 * MB as u64,
148            block_size: 64 * KB,
149            write_rate_limit: Some(16 * MB as u64),
150        }
151    }
152}
153
/// Database configuration
#[derive(Clone)]
pub struct DatabaseConfig {
    /// Max number of open files.
    pub max_open_files: i32,
    /// Memory budget (in MiB) used for setting block cache size, write buffer
    /// size. `None` means the built-in default budget.
    pub memory_budget: Option<usize>,
    /// Compaction profile
    pub compaction: CompactionProfile,
    /// Set number of columns; must be at least 1 (`Database::open` rejects 0).
    pub columns: u32,
    /// Disable WAL if set to `true`
    pub disable_wal: bool,
}
169
170impl DatabaseConfig {
171    /// Create new `DatabaseConfig` with default parameters and specified set of
172    /// columns. Note that cache sizes must be explicitly set.
173    pub fn with_columns(columns: u32) -> Self {
174        let mut config = Self::default();
175        config.columns = columns;
176        config
177    }
178
179    pub fn memory_budget(&self) -> usize {
180        self.memory_budget.unwrap_or(DB_DEFAULT_MEMORY_BUDGET_MB) * MB
181    }
182
183    pub fn memory_budget_per_col(&self) -> usize {
184        self.memory_budget() / self.columns as usize
185    }
186
187    pub fn memory_budget_mb(&self) -> usize {
188        self.memory_budget.unwrap_or(DB_DEFAULT_MEMORY_BUDGET_MB)
189    }
190}
191
192impl Default for DatabaseConfig {
193    fn default() -> DatabaseConfig {
194        DatabaseConfig {
195            max_open_files: 512,
196            memory_budget: None,
197            compaction: CompactionProfile::default(),
198            columns: 1,
199            disable_wal: false,
200        }
201    }
202}
203
/// An open RocksDB handle paired with the names of its column families
/// (`col0`, `col1`, …), indexed by column number.
struct DBAndColumns {
    db: DB,
    column_names: Vec<String>,
}
208
209impl DBAndColumns {
210    fn get_cf(&self, i: usize) -> &CFHandle {
211        self.db
212            .cf_handle(&self.column_names[i])
213            .expect("the specified column name is correct; qed")
214    }
215
216    fn static_property_or_warn(&self, col: usize, prop: &str) -> Option<usize> {
217        match self.db.get_property_int_cf(self.get_cf(col), prop) {
218            Some(v) => Some(v as usize),
219            None => {
220                warn!("Cannot read expected static property of RocksDb database: {}", prop);
221                None
222            }
223        }
224    }
225}
226
227impl MallocSizeOf for DBAndColumns {
228    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
229        let mut total = MallocSizeOf::size_of(&self.column_names, ops)
230            // we have at least one column always, so we can call property on it
231            + self.static_property_or_warn(0, "rocksdb.block-cache-usage").unwrap_or(0);
232
233        for v in 0..self.column_names.len() {
234            total += self
235                .static_property_or_warn(
236                    v,
237                    "rocksdb.estimate-table-readers-mem",
238                )
239                .unwrap_or(0);
240            total += self
241                .static_property_or_warn(v, "rocksdb.cur-size-all-mem-tables")
242                .unwrap_or(0);
243        }
244
245        total
246    }
247}
248
// get column family configuration from database config.
//
// Always returns `Ok`; the `io::Result` wrapper keeps the signature uniform
// with the other fallible configuration paths in this file.
fn col_config(
    config: &DatabaseConfig, block_opts: &BlockBasedOptions,
) -> io::Result<ColumnFamilyOptions> {
    let mut opts = ColumnFamilyOptions::default();

    opts.set_level_compaction_dynamic_level_bytes(true);
    opts.set_block_based_table_factory(block_opts);
    // NOTE(review): `optimize_level_style_compaction` presumably presets
    // several related options; the explicit setters below run afterwards and
    // take precedence — keep this ordering. Confirm against rocksdb docs
    // before reordering.
    opts.optimize_level_style_compaction(config.memory_budget_per_col() as i32);
    opts.set_target_file_size_base(config.compaction.initial_file_size);
    opts.set_write_buffer_size(config.memory_budget_per_col() as u64 / 2);
    opts.set_block_cache_size_mb(config.memory_budget_mb() as u64 / 3);

    Ok(opts)
}
264
// SAFETY: NOTE(review) — all of `Database`'s mutable interior state is
// guarded by `RwLock`/`Mutex` fields; these impls additionally assert that
// the raw RocksDB option/handle types are safe to move and share across
// threads. Confirm against the rocksdb bindings' thread-safety guarantees.
unsafe impl Send for Database {}
unsafe impl Sync for Database {}
/// Key-Value database.
#[derive(MallocSizeOfDerive)]
pub struct Database {
    // `None` once the database has been closed (see `close`/`restore`).
    db: RwLock<Option<DBAndColumns>>,
    #[ignore_malloc_size_of = "insignificant"]
    config: DatabaseConfig,
    // Filesystem path the database was opened at; used for repair markers
    // and restore.
    path: String,
    #[ignore_malloc_size_of = "insignificant"]
    write_opts: WriteOptions,
    #[ignore_malloc_size_of = "insignificant"]
    read_opts: ReadOptions,
    #[ignore_malloc_size_of = "insignificant"]
    block_opts: BlockBasedOptions,
    // Dirty values added with `write_buffered`. Cleaned on `flush`.
    // One map per column, indexed by column number.
    overlay: RwLock<Vec<HashMap<DBKey, KeyState>>>,
    // Values currently being flushed. Cleared when `flush` completes.
    flushing: RwLock<Vec<HashMap<DBKey, KeyState>>>,
    // Prevents concurrent flushes.
    // Value indicates if a flush is in progress.
    flushing_lock: Mutex<bool>,
}
288
289#[inline]
290fn check_for_corruption<T, P: AsRef<Path>>(
291    path: P, res: result::Result<T, DBError>,
292) -> io::Result<T> {
293    if let Err(ref s) = res {
294        if is_corrupted(s) {
295            warn!(
296                "DB corrupted: {}. Repair will be triggered on next restart",
297                s
298            );
299            let _ = fs::File::create(
300                path.as_ref().join(Database::CORRUPTION_FILE_NAME),
301            );
302        }
303    }
304
305    res.map_err(other_io_err)
306}
307
308fn is_corrupted(err: &DBError) -> bool {
309    err.starts_with("Corruption:")
310        || err.starts_with(
311            "Invalid argument: You have to open all column families",
312        )
313}
314
/// Generate the options for RocksDB, based on the given `DatabaseConfig`.
fn generate_options(config: &DatabaseConfig) -> DBOptions {
    let mut opts = DBOptions::default();

    //TODO: rate_limiter_bytes_per_sec={} was removed

    opts.set_use_fsync(false);
    opts.create_if_missing(true);
    opts.set_max_open_files(config.max_open_files);
    opts.set_bytes_per_sync(1 * MB as u64);
    // Keep only the most recent LOG file to bound disk usage.
    opts.set_keep_log_file_num(1);
    // Use roughly half the available cores for background work.
    opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2));
    opts.enable_statistics(true);
    opts.create_missing_column_families(true);

    opts
}
332
impl Database {
    // Marker file whose presence triggers a repair attempt on the next open.
    const CORRUPTION_FILE_NAME: &'static str = "CORRUPTED";

    /// Open database with default settings.
    pub fn open_default(path: &str) -> io::Result<Database> {
        Database::open(&DatabaseConfig::default(), path)
    }

    /// Open database file. Creates if it does not exist.
    ///
    /// Returns an error when `config.columns` is 0, when a repair attempt
    /// fails, or when RocksDB cannot be opened.
    pub fn open(config: &DatabaseConfig, path: &str) -> io::Result<Database> {
        let mut block_opts = BlockBasedOptions::default();
        block_opts.set_block_size(config.compaction.block_size);
        // Set cache size as recommended by
        // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size
        block_opts.set_cache_index_and_filter_blocks(true);
        block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
        block_opts.set_bloom_filter(10, true);

        let opts = generate_options(config);

        // attempt database repair if it has been previously marked as corrupted
        let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME);
        if db_corrupted.exists() {
            warn!(
                "DB has been previously marked as corrupted, attempting repair"
            );
            DB::repair(opts.clone(), path).map_err(other_io_err)?;
            fs::remove_file(db_corrupted)?;
        }

        let columns = config.columns as usize;
        if columns == 0 {
            return Err(other_io_err("columns number cannot be 0"));
        }

        // Column families are named "col0", "col1", …, "col{n-1}".
        let mut cf_options = Vec::with_capacity(columns);
        let column_names: Vec<_> =
            (0..columns).map(|c| format!("col{}", c)).collect();
        let cfnames: Vec<&str> =
            column_names.iter().map(|n| n as &str).collect();

        for i in 0..config.columns {
            cf_options
                .push((cfnames[i as usize], col_config(&config, &block_opts)?));
        }

        let mut write_opts = WriteOptions::new();
        write_opts.disable_wal(config.disable_wal);
        let mut read_opts = ReadOptions::default();
        read_opts.set_prefix_same_as_start(true);
        // Checksums are skipped on reads for speed; corruption is still
        // detected on writes via `check_for_corruption`.
        read_opts.set_verify_checksums(false);

        // First open attempt assumes all column families already exist;
        // on failure, reopen without CFs and create them one by one.
        let db = match DB::open_cf(opts.clone(), path, cf_options.clone()) {
            Ok(db) => {
                for name in &cfnames {
                    let _ = db.cf_handle(name).expect(
                        "rocksdb opens a cf_handle for each cfname; qed",
                    );
                }
                Ok(db)
            }
            Err(_) => {
                // retry and create CFs
                match DB::open_cf(
                    opts.clone(),
                    path,
                    Vec::<(&str, ColumnFamilyOptions)>::new(),
                ) {
                    Ok(mut db) => {
                        for cfd in &cf_options {
                            db.create_cf(cfd.clone()).map_err(other_io_err)?;
                        }
                        Ok(db)
                    }
                    err => err,
                }
            }
        };

        // Last resort: if the failure looks like corruption, repair and
        // retry the open once more before giving up.
        let db = match db {
            Ok(db) => db,
            Err(ref s) if is_corrupted(s) => {
                warn!("DB corrupted: {}, attempting repair", s);
                DB::repair(opts.clone(), path).map_err(other_io_err)?;
                let db = DB::open_cf(opts, path, cf_options)
                    .map_err(other_io_err)?;
                for name in cfnames {
                    let _ = db.cf_handle(name).expect(
                        "rocksdb opens a cf_handle for each cfname; qed",
                    );
                }
                db
            }
            Err(s) => return Err(other_io_err(s)),
        };
        let num_cols = column_names.len();
        Ok(Database {
            db: RwLock::new(Some(DBAndColumns { db, column_names })),
            config: config.clone(),
            // NOTE(review): `0..=num_cols` allocates num_cols + 1 maps, but
            // columns only index 0..num_cols; the extra slot appears unused —
            // likely vestigial from an older Option<col> indexing scheme.
            // Confirm before changing.
            overlay: RwLock::new(
                (0..=num_cols).map(|_| HashMap::new()).collect(),
            ),
            flushing: RwLock::new(
                (0..=num_cols).map(|_| HashMap::new()).collect(),
            ),
            flushing_lock: Mutex::new(false),
            path: path.to_owned(),
            read_opts,
            write_opts,
            block_opts,
        })
    }

    /// Helper to create new transaction for this database.
    pub fn transaction(&self) -> DBTransaction { DBTransaction::new() }

    /// Buffer a transaction in the in-memory overlay. Nothing reaches
    /// RocksDB until `flush` is called.
    pub fn write_buffered(&self, tr: DBTransaction) {
        let mut overlay = self.overlay.write();
        let ops = tr.ops;
        for op in ops {
            match op {
                DBOp::Insert { col, key, value } => {
                    overlay[col as usize].insert(key, KeyState::Insert(value));
                }
                DBOp::Delete { col, key } => {
                    overlay[col as usize].insert(key, KeyState::Delete);
                }
                DBOp::DeletePrefix { .. } => {
                    unimplemented!("DeletePrefix is not supported")
                }
            }
        }
    }

    /// Commit buffered changes to database. Must be called under `flush_lock`
    fn write_flushing_with_lock(
        &self, _lock: &mut MutexGuard<'_, bool>,
    ) -> io::Result<()> {
        match *self.db.read() {
            Some(ref cfs) => {
                let batch = WriteBatch::default();
                // Move the overlay into `flushing` so new `write_buffered`
                // calls accumulate in a fresh overlay while we flush; `get`
                // still sees the in-flight values via `flushing`.
                mem::swap(
                    &mut *self.overlay.write(),
                    &mut *self.flushing.write(),
                );
                {
                    for (c, column) in self.flushing.read().iter().enumerate() {
                        for (key, state) in column.iter() {
                            match *state {
                                KeyState::Delete => {
                                    let cf = cfs.get_cf(c);
                                    batch
                                        .delete_cf(cf, key)
                                        .map_err(other_io_err)?;
                                }
                                KeyState::Insert(ref value) => {
                                    let cf = cfs.get_cf(c);
                                    batch
                                        .put_cf(cf, key, value)
                                        .map_err(other_io_err)?;
                                }
                            }
                        }
                    }
                }

                check_for_corruption(
                    &self.path,
                    cfs.db.write_opt(&batch, &self.write_opts),
                )?;

                // The batch is durable; drop the flushed values and release
                // their memory.
                for column in self.flushing.write().iter_mut() {
                    column.clear();
                    column.shrink_to_fit();
                }
                Ok(())
            }
            None => Err(other_io_err("Database is closed")),
        }
    }

    /// Commit buffered changes to database.
    pub fn flush(&self) -> io::Result<()> {
        let mut lock = self.flushing_lock.lock();
        // If RocksDB batch allocation fails the thread gets terminated and the
        // lock is released. The value inside the lock is used to detect
        // that.
        if *lock {
            // This can only happen if another flushing thread is terminated
            // unexpectedly.
            return Err(other_io_err(
                "Database write failure. Running low on memory perhaps?",
            ));
        }
        *lock = true;
        let result = self.write_flushing_with_lock(&mut lock);
        *lock = false;
        result
    }

    /// Commit transaction to database, bypassing the overlay buffer.
    pub fn write(&self, tr: DBTransaction) -> io::Result<()> {
        match *self.db.read() {
            Some(ref cfs) => {
                let batch = WriteBatch::default();
                let ops = tr.ops;
                for op in ops {
                    // remove any buffered operation for this key
                    self.overlay.write()[op.col() as usize].remove(op.key());

                    match op {
                        DBOp::Insert { col, key, value } => batch
                            .put_cf(cfs.get_cf(col as usize), &key, &value)
                            .map_err(other_io_err)?,
                        DBOp::Delete { col, key } => batch
                            .delete_cf(cfs.get_cf(col as usize), &key)
                            .map_err(other_io_err)?,
                        DBOp::DeletePrefix { .. } => {
                            unimplemented!("DeletePrefix is not supported")
                        }
                    }
                }

                check_for_corruption(
                    &self.path,
                    cfs.db.write_opt(&batch, &self.write_opts),
                )
            }
            None => Err(other_io_err("Database is closed")),
        }
    }

    /// Get value by key.
    ///
    /// Lookup order: overlay (most recent buffered write), then the
    /// currently-flushing buffer, then RocksDB itself. A buffered delete
    /// shadows any older value. Returns `Ok(None)` if the DB is closed.
    pub fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
        match *self.db.read() {
            Some(ref cfs) => {
                let overlay = &self.overlay.read()[col as usize];
                match overlay.get(key) {
                    Some(&KeyState::Insert(ref value)) => {
                        Ok(Some(value.clone()))
                    }
                    Some(&KeyState::Delete) => Ok(None),
                    None => {
                        let flushing = &self.flushing.read()[col as usize];
                        match flushing.get(key) {
                            Some(&KeyState::Insert(ref value)) => {
                                Ok(Some(value.clone()))
                            }
                            Some(&KeyState::Delete) => Ok(None),
                            None => cfs
                                .db
                                .get_cf_opt(
                                    cfs.get_cf(col as usize),
                                    key,
                                    &self.read_opts,
                                )
                                .map(|r| r.map(|v| v.to_vec()))
                                .map_err(other_io_err),
                        }
                    }
                }
            }
            None => Ok(None),
        }
    }

    /// Close the database. Buffered (unflushed) changes are discarded.
    fn close(&self) {
        *self.db.write() = None;
        self.overlay.write().clear();
        self.flushing.write().clear();
    }

    /// Restore the database from a copy at given path.
    pub fn restore(&self, new_db: &str) -> io::Result<()> {
        self.close();

        // swap is guaranteed to be atomic
        match swap(new_db, &self.path) {
            Ok(_) => {
                // ignore errors
                let _ = fs::remove_dir_all(new_db);
            }
            Err(err) => {
                warn!("DB atomic swap failed: {}", err);
                // Fall back to a non-atomic directory swap.
                match swap_nonatomic(new_db, &self.path) {
                    Ok(_) => {
                        // ignore errors
                        let _ = fs::remove_dir_all(new_db);
                    }
                    Err(err) => {
                        warn!("Failed to swap DB directories: {:?}", err);
                        return Err(io::Error::new(
                            io::ErrorKind::Other,
                            "DB restoration failed: could not swap DB directories",
                        ));
                    }
                }
            }
        }

        // reopen the database and steal handles into self
        let db = Self::open(&self.config, &self.path)?;
        *self.db.write() = mem::replace(&mut *db.db.write(), None);
        *self.overlay.write() =
            mem::replace(&mut *db.overlay.write(), Vec::new());
        *self.flushing.write() =
            mem::replace(&mut *db.flushing.write(), Vec::new());
        Ok(())
    }

    /// The number of non-default column families.
    pub fn num_columns(&self) -> u32 {
        self.db
            .read()
            .as_ref()
            .and_then(|db| {
                if db.column_names.is_empty() {
                    None
                } else {
                    Some(db.column_names.len())
                }
            })
            .map(|n| n as u32)
            .unwrap_or(0)
    }

    /// Drop a column family. Removes the highest-numbered column, if any.
    pub fn drop_column(&self) -> io::Result<()> {
        match *self.db.write() {
            Some(DBAndColumns {
                ref mut db,
                ref mut column_names,
            }) => {
                if let Some(name) = column_names.pop() {
                    db.drop_cf(&name).map_err(other_io_err)?;
                }
                Ok(())
            }
            None => Ok(()),
        }
    }

    /// Add a column family, named after the next free column index.
    pub fn add_column(&self) -> io::Result<()> {
        match *self.db.write() {
            Some(DBAndColumns {
                ref mut db,
                ref mut column_names,
            }) => {
                let col = column_names.len() as u32;
                let name = format!("col{}", col);
                db.create_cf((
                    name.as_str(),
                    col_config(&self.config, &self.block_opts)?,
                ))
                .map_err(other_io_err)?;
                column_names.push(name);
                Ok(())
            }
            None => Ok(()),
        }
    }
}
698
// duplicate declaration of methods here to avoid trait import in certain
// existing cases at time of addition.
impl KeyValueDB for Database {
    /// Forwards to the inherent `Database::get`.
    fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
        Database::get(self, col, key)
    }

    /// Not supported by this backend; panics via `unimplemented!`.
    fn get_by_prefix(
        &self, _col: u32, _prefix: &[u8],
    ) -> io::Result<Option<DBValue>> {
        unimplemented!()
    }

    /// Forwards to the inherent `Database::write`.
    fn write(&self, transaction: DBTransaction) -> io::Result<()> {
        Database::write(self, transaction)
    }

    /// Not supported by this backend; panics via `unimplemented!`.
    fn iter<'a>(
        &'a self, _col: u32,
    ) -> Box<dyn Iterator<Item = io::Result<kvdb::DBKeyValue>> + 'a> {
        unimplemented!()
    }

    /// Not supported by this backend; panics via `unimplemented!`.
    fn iter_with_prefix<'a>(
        &'a self, _col: u32, _prefix: &'a [u8],
    ) -> Box<dyn Iterator<Item = io::Result<kvdb::DBKeyValue>> + 'a> {
        unimplemented!()
    }
}
728
impl Drop for Database {
    fn drop(&mut self) {
        // write all buffered changes if we can.
        // Errors are deliberately ignored: propagating or panicking inside
        // `drop` is not an option.
        let _ = self.flush();
    }
}
735
#[cfg(test)]
mod tests {
    use super::*;
    use cfx_types::H256;
    use std::str::FromStr;
    use tempfile::tempdir;

    // Exercises the full write/read/delete/buffered-write/flush cycle
    // against a fresh temporary database.
    fn test_db(config: &DatabaseConfig) {
        let tempdir = tempdir().unwrap();
        let db =
            Database::open(config, tempdir.path().to_str().unwrap()).unwrap();
        let key1 = H256::from_str(
            "02c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc",
        )
        .unwrap();
        let key2 = H256::from_str(
            "03c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc",
        )
        .unwrap();
        let key3 = H256::from_str(
            "01c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc",
        )
        .unwrap();

        let mut batch = db.transaction();
        batch.put(0, key1.as_bytes(), b"cat");
        batch.put(0, key2.as_bytes(), b"dog");
        db.write(batch).unwrap();

        assert_eq!(&*db.get(0, key1.as_bytes()).unwrap().unwrap(), b"cat");

        // TODO implement iter
        //        let contents: Vec<_> = db.iter(None).collect();
        //        assert_eq!(contents.len(), 2);
        //        assert_eq!(&*contents[0].0, key1.as_bytes());
        //        assert_eq!(&*contents[0].1, b"cat");
        //        assert_eq!(&*contents[1].0, key2.as_bytes());
        //        assert_eq!(&*contents[1].1, b"dog");

        let mut batch = db.transaction();
        batch.delete(0, key1.as_bytes());
        db.write(batch).unwrap();

        assert!(db.get(0, key1.as_bytes()).unwrap().is_none());

        let mut batch = db.transaction();
        batch.put(0, key1.as_bytes(), b"cat");
        db.write(batch).unwrap();

        let mut transaction = db.transaction();
        transaction.put(0, key3.as_bytes(), b"elephant");
        transaction.delete(0, key1.as_bytes());
        db.write(transaction).unwrap();
        assert!(db.get(0, key1.as_bytes()).unwrap().is_none());
        assert_eq!(&*db.get(0, key3.as_bytes()).unwrap().unwrap(), b"elephant");

        // TODO Implement get_by_prefix
        //        assert_eq!(
        //            &*db.get_by_prefix(None, key3.as_bytes()).unwrap(),
        //            b"elephant"
        //        );
        //        assert_eq!(&*db.get_by_prefix(None, key2.as_bytes()).unwrap(),
        // b"dog");

        // Buffered writes must be visible through `get` before the flush...
        let mut transaction = db.transaction();
        transaction.put(0, key1.as_bytes(), b"horse");
        transaction.delete(0, key3.as_bytes());
        db.write_buffered(transaction);
        assert!(db.get(0, key3.as_bytes()).unwrap().is_none());
        assert_eq!(&*db.get(0, key1.as_bytes()).unwrap().unwrap(), b"horse");

        // ...and still visible after it.
        db.flush().unwrap();
        assert!(db.get(0, key3.as_bytes()).unwrap().is_none());
        assert_eq!(&*db.get(0, key1.as_bytes()).unwrap().unwrap(), b"horse");
    }

    #[test]
    fn kvdb() {
        let tempdir = tempdir().unwrap();
        let _ =
            Database::open_default(tempdir.path().to_str().unwrap()).unwrap();
        test_db(&DatabaseConfig::default());
    }

    #[test]
    #[cfg(target_os = "linux")]
    fn df_to_rotational() {
        use std::path::PathBuf;
        // Example df output.
        let example_df = vec![
            70, 105, 108, 101, 115, 121, 115, 116, 101, 109, 32, 32, 32, 32,
            32, 49, 75, 45, 98, 108, 111, 99, 107, 115, 32, 32, 32, 32, 32, 85,
            115, 101, 100, 32, 65, 118, 97, 105, 108, 97, 98, 108, 101, 32, 85,
            115, 101, 37, 32, 77, 111, 117, 110, 116, 101, 100, 32, 111, 110,
            10, 47, 100, 101, 118, 47, 115, 100, 97, 49, 32, 32, 32, 32, 32,
            32, 32, 54, 49, 52, 48, 57, 51, 48, 48, 32, 51, 56, 56, 50, 50, 50,
            51, 54, 32, 32, 49, 57, 52, 52, 52, 54, 49, 54, 32, 32, 54, 55, 37,
            32, 47, 10,
        ];
        let expected_output =
            Some(PathBuf::from("/sys/block/sda/queue/rotational"));
        assert_eq!(rotational_from_df_output(example_df), expected_output);
    }

    #[test]
    fn add_columns() {
        let config = DatabaseConfig::default();
        let config_5 = DatabaseConfig::with_columns(5);

        let tempdir = tempdir().unwrap();

        // open empty, add 5.
        {
            let db = Database::open(&config, tempdir.path().to_str().unwrap())
                .unwrap();
            assert_eq!(db.num_columns(), 1);

            for i in 0..4 {
                db.add_column().unwrap();
                assert_eq!(db.num_columns(), i + 2);
            }
        }

        // reopen as 5.
        {
            let db =
                Database::open(&config_5, tempdir.path().to_str().unwrap())
                    .unwrap();
            assert_eq!(db.num_columns(), 5);
        }
    }

    #[test]
    fn drop_columns() {
        let config = DatabaseConfig::default();
        let config_5 = DatabaseConfig::with_columns(5);

        let tempdir = tempdir().unwrap();

        // open 5, remove all.
        {
            let db =
                Database::open(&config_5, tempdir.path().to_str().unwrap())
                    .unwrap();
            assert_eq!(db.num_columns(), 5);

            for i in (0..5).rev() {
                db.drop_column().unwrap();
                assert_eq!(db.num_columns(), i);
            }
        }

        // reopen as 0.
        {
            let db = Database::open(&config, tempdir.path().to_str().unwrap())
                .unwrap();
            assert_eq!(db.num_columns(), 1);
        }
    }

    // A direct `write` must override an earlier buffered (unflushed) value
    // for the same key.
    #[test]
    fn write_clears_buffered_ops() {
        let tempdir = tempdir().unwrap();
        let config = DatabaseConfig::default();
        let db =
            Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();

        let mut batch = db.transaction();
        batch.put(0, b"foo", b"bar");
        db.write_buffered(batch);

        let mut batch = db.transaction();
        batch.put(0, b"foo", b"baz");
        db.write(batch).unwrap();

        assert_eq!(db.get(0, b"foo").unwrap().unwrap(), b"baz");
    }

    #[test]
    fn test_memory_property() {
        let tempdir = tempdir().unwrap();
        let db = Database::open(
            &DatabaseConfig::default(),
            tempdir.path().to_str().unwrap(),
        )
        .unwrap();
        let key1 = H256::from_str(
            "02c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc",
        )
        .unwrap();
        let mut tx = db.transaction();
        tx.put(0, key1.as_bytes(), b"123");
        db.write(tx).unwrap();
        db.flush().unwrap();
        let db_locked = db.db.read();
        let db_and_col = db_locked.as_ref().unwrap();
        assert!(db_and_col
            .static_property_or_warn(0, "rocksdb.block-cache-usage")
            .is_some());
        assert!(db_and_col
            .static_property_or_warn(0, "rocksdb.estimate-table-readers-mem",)
            .is_some());
        assert!(db_and_col
            .static_property_or_warn(0, "rocksdb.cur-size-all-mem-tables")
            .is_some());
        assert!(db_and_col
            .static_property_or_warn(0, "rocksdb.fake-property")
            .is_none());
    }
}
945}