1use std::{cmp, collections::HashMap, error, fs, io, mem, path::Path, result};
22
23use parking_lot::{Mutex, MutexGuard, RwLock};
24use rocksdb::{
25 BlockBasedOptions, CFHandle, ColumnFamilyOptions, DBOptions, ReadOptions,
26 Writable, WriteBatch, WriteOptions, DB,
27};
28
29use fs_swap::{swap, swap_nonatomic};
30use kvdb::{DBKey, DBOp, DBTransaction, DBValue, KeyValueDB};
31use log::warn;
32
33use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
34use malloc_size_of_derive::MallocSizeOf as MallocSizeOfDerive;
35#[cfg(target_os = "linux")]
36use regex::Regex;
37#[cfg(target_os = "linux")]
38use std::fs::File;
39#[cfg(target_os = "linux")]
40use std::path::PathBuf;
41#[cfg(target_os = "linux")]
42use std::process::Command;
43
44type DBError = String;
45fn other_io_err<E>(e: E) -> io::Error
46where E: Into<Box<dyn error::Error + Send + Sync>> {
47 io::Error::new(io::ErrorKind::Other, e)
48}
49
50const KB: usize = 1024;
51const MB: usize = 1024 * KB;
52const DB_DEFAULT_MEMORY_BUDGET_MB: usize = 128;
53
54#[derive(MallocSizeOfDerive)]
55enum KeyState {
56 Insert(DBValue),
57 Delete,
58}
59
60#[derive(Clone, Copy, PartialEq, Debug)]
62pub struct CompactionProfile {
63 pub initial_file_size: u64,
65 pub block_size: usize,
67 pub write_rate_limit: Option<u64>,
69}
70
71impl Default for CompactionProfile {
72 fn default() -> CompactionProfile { CompactionProfile::ssd() }
74}
75
76#[cfg(target_os = "linux")]
78pub fn rotational_from_df_output(df_out: Vec<u8>) -> Option<PathBuf> {
79 use std::str;
80 str::from_utf8(df_out.as_slice())
81 .ok()
82 .and_then(|df_str| {
84 Regex::new(r"/dev/(sd[:alpha:]{1,2})")
85 .ok()
86 .and_then(|re| re.captures(df_str))
87 .and_then(|captures| captures.get(1))
88 })
89 .map(|drive_path| {
91 let mut p = PathBuf::from("/sys/block");
92 p.push(drive_path.as_str());
93 p.push("queue/rotational");
94 p
95 })
96}
97
98impl CompactionProfile {
99 #[cfg(target_os = "linux")]
101 pub fn auto(db_path: &Path) -> CompactionProfile {
102 use std::io::Read;
103 let hdd_check_file = db_path
104 .to_str()
105 .and_then(|path_str| Command::new("df").arg(path_str).output().ok())
106 .and_then(|df_res| match df_res.status.success() {
107 true => Some(df_res.stdout),
108 false => None,
109 })
110 .and_then(rotational_from_df_output);
111 if let Some(hdd_check) = hdd_check_file {
113 if let Ok(mut file) = File::open(hdd_check.as_path()) {
114 let mut buffer = [0; 1];
115 if file.read_exact(&mut buffer).is_ok() {
116 if buffer == [48] {
118 return Self::ssd();
119 }
120 if buffer == [49] {
122 return Self::hdd();
123 }
124 }
125 }
126 }
127 Self::default()
129 }
130
131 #[cfg(not(target_os = "linux"))]
133 pub fn auto(_db_path: &Path) -> CompactionProfile { Self::default() }
134
135 pub fn ssd() -> CompactionProfile {
137 CompactionProfile {
138 initial_file_size: 64 * MB as u64,
139 block_size: 16 * KB,
140 write_rate_limit: None,
141 }
142 }
143
144 pub fn hdd() -> CompactionProfile {
146 CompactionProfile {
147 initial_file_size: 256 * MB as u64,
148 block_size: 64 * KB,
149 write_rate_limit: Some(16 * MB as u64),
150 }
151 }
152}
153
154#[derive(Clone)]
156pub struct DatabaseConfig {
157 pub max_open_files: i32,
159 pub memory_budget: Option<usize>,
162 pub compaction: CompactionProfile,
164 pub columns: u32,
166 pub disable_wal: bool,
168}
169
170impl DatabaseConfig {
171 pub fn with_columns(columns: u32) -> Self {
174 let mut config = Self::default();
175 config.columns = columns;
176 config
177 }
178
179 pub fn memory_budget(&self) -> usize {
180 self.memory_budget.unwrap_or(DB_DEFAULT_MEMORY_BUDGET_MB) * MB
181 }
182
183 pub fn memory_budget_per_col(&self) -> usize {
184 self.memory_budget() / self.columns as usize
185 }
186
187 pub fn memory_budget_mb(&self) -> usize {
188 self.memory_budget.unwrap_or(DB_DEFAULT_MEMORY_BUDGET_MB)
189 }
190}
191
192impl Default for DatabaseConfig {
193 fn default() -> DatabaseConfig {
194 DatabaseConfig {
195 max_open_files: 512,
196 memory_budget: None,
197 compaction: CompactionProfile::default(),
198 columns: 1,
199 disable_wal: false,
200 }
201 }
202}
203
204struct DBAndColumns {
205 db: DB,
206 column_names: Vec<String>,
207}
208
209impl DBAndColumns {
210 fn get_cf(&self, i: usize) -> &CFHandle {
211 self.db
212 .cf_handle(&self.column_names[i])
213 .expect("the specified column name is correct; qed")
214 }
215
216 fn static_property_or_warn(&self, col: usize, prop: &str) -> Option<usize> {
217 match self.db.get_property_int_cf(self.get_cf(col), prop) {
218 Some(v) => Some(v as usize),
219 None => {
220 warn!("Cannot read expected static property of RocksDb database: {}", prop);
221 None
222 }
223 }
224 }
225}
226
227impl MallocSizeOf for DBAndColumns {
228 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
229 let mut total = MallocSizeOf::size_of(&self.column_names, ops)
230 + self.static_property_or_warn(0, "rocksdb.block-cache-usage").unwrap_or(0);
232
233 for v in 0..self.column_names.len() {
234 total += self
235 .static_property_or_warn(
236 v,
237 "rocksdb.estimate-table-readers-mem",
238 )
239 .unwrap_or(0);
240 total += self
241 .static_property_or_warn(v, "rocksdb.cur-size-all-mem-tables")
242 .unwrap_or(0);
243 }
244
245 total
246 }
247}
248
249fn col_config(
251 config: &DatabaseConfig, block_opts: &BlockBasedOptions,
252) -> io::Result<ColumnFamilyOptions> {
253 let mut opts = ColumnFamilyOptions::default();
254
255 opts.set_level_compaction_dynamic_level_bytes(true);
256 opts.set_block_based_table_factory(block_opts);
257 opts.optimize_level_style_compaction(config.memory_budget_per_col() as i32);
258 opts.set_target_file_size_base(config.compaction.initial_file_size);
259 opts.set_write_buffer_size(config.memory_budget_per_col() as u64 / 2);
260 opts.set_block_cache_size_mb(config.memory_budget_mb() as u64 / 3);
261
262 Ok(opts)
263}
264
265unsafe impl Send for Database {}
266unsafe impl Sync for Database {}
267#[derive(MallocSizeOfDerive)]
269pub struct Database {
270 db: RwLock<Option<DBAndColumns>>,
271 #[ignore_malloc_size_of = "insignificant"]
272 config: DatabaseConfig,
273 path: String,
274 #[ignore_malloc_size_of = "insignificant"]
275 write_opts: WriteOptions,
276 #[ignore_malloc_size_of = "insignificant"]
277 read_opts: ReadOptions,
278 #[ignore_malloc_size_of = "insignificant"]
279 block_opts: BlockBasedOptions,
280 overlay: RwLock<Vec<HashMap<DBKey, KeyState>>>,
282 flushing: RwLock<Vec<HashMap<DBKey, KeyState>>>,
284 flushing_lock: Mutex<bool>,
287}
288
289#[inline]
290fn check_for_corruption<T, P: AsRef<Path>>(
291 path: P, res: result::Result<T, DBError>,
292) -> io::Result<T> {
293 if let Err(ref s) = res {
294 if is_corrupted(s) {
295 warn!(
296 "DB corrupted: {}. Repair will be triggered on next restart",
297 s
298 );
299 let _ = fs::File::create(
300 path.as_ref().join(Database::CORRUPTION_FILE_NAME),
301 );
302 }
303 }
304
305 res.map_err(other_io_err)
306}
307
308fn is_corrupted(err: &DBError) -> bool {
309 err.starts_with("Corruption:")
310 || err.starts_with(
311 "Invalid argument: You have to open all column families",
312 )
313}
314
315fn generate_options(config: &DatabaseConfig) -> DBOptions {
317 let mut opts = DBOptions::default();
318
319 opts.set_use_fsync(false);
322 opts.create_if_missing(true);
323 opts.set_max_open_files(config.max_open_files);
324 opts.set_bytes_per_sync(1 * MB as u64);
325 opts.set_keep_log_file_num(1);
326 opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2));
327 opts.enable_statistics(true);
328 opts.create_missing_column_families(true);
329
330 opts
331}
332
333impl Database {
334 const CORRUPTION_FILE_NAME: &'static str = "CORRUPTED";
335
336 pub fn open_default(path: &str) -> io::Result<Database> {
338 Database::open(&DatabaseConfig::default(), path)
339 }
340
341 pub fn open(config: &DatabaseConfig, path: &str) -> io::Result<Database> {
343 let mut block_opts = BlockBasedOptions::default();
344 block_opts.set_block_size(config.compaction.block_size);
345 block_opts.set_cache_index_and_filter_blocks(true);
348 block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
349 block_opts.set_bloom_filter(10, true);
350
351 let opts = generate_options(config);
352
353 let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME);
355 if db_corrupted.exists() {
356 warn!(
357 "DB has been previously marked as corrupted, attempting repair"
358 );
359 DB::repair(opts.clone(), path).map_err(other_io_err)?;
360 fs::remove_file(db_corrupted)?;
361 }
362
363 let columns = config.columns as usize;
364 if columns == 0 {
365 return Err(other_io_err("columns number cannot be 0"));
366 }
367
368 let mut cf_options = Vec::with_capacity(columns);
369 let column_names: Vec<_> =
370 (0..columns).map(|c| format!("col{}", c)).collect();
371 let cfnames: Vec<&str> =
372 column_names.iter().map(|n| n as &str).collect();
373
374 for i in 0..config.columns {
375 cf_options
376 .push((cfnames[i as usize], col_config(&config, &block_opts)?));
377 }
378
379 let mut write_opts = WriteOptions::new();
380 write_opts.disable_wal(config.disable_wal);
381 let mut read_opts = ReadOptions::default();
382 read_opts.set_prefix_same_as_start(true);
383 read_opts.set_verify_checksums(false);
384
385 let db = match DB::open_cf(opts.clone(), path, cf_options.clone()) {
386 Ok(db) => {
387 for name in &cfnames {
388 let _ = db.cf_handle(name).expect(
389 "rocksdb opens a cf_handle for each cfname; qed",
390 );
391 }
392 Ok(db)
393 }
394 Err(_) => {
395 match DB::open_cf(
397 opts.clone(),
398 path,
399 Vec::<(&str, ColumnFamilyOptions)>::new(),
400 ) {
401 Ok(mut db) => {
402 for cfd in &cf_options {
403 db.create_cf(cfd.clone()).map_err(other_io_err)?;
404 }
405 Ok(db)
406 }
407 err => err,
408 }
409 }
410 };
411
412 let db = match db {
413 Ok(db) => db,
414 Err(ref s) if is_corrupted(s) => {
415 warn!("DB corrupted: {}, attempting repair", s);
416 DB::repair(opts.clone(), path).map_err(other_io_err)?;
417 let db = DB::open_cf(opts, path, cf_options)
418 .map_err(other_io_err)?;
419 for name in cfnames {
420 let _ = db.cf_handle(name).expect(
421 "rocksdb opens a cf_handle for each cfname; qed",
422 );
423 }
424 db
425 }
426 Err(s) => return Err(other_io_err(s)),
427 };
428 let num_cols = column_names.len();
429 Ok(Database {
430 db: RwLock::new(Some(DBAndColumns { db, column_names })),
431 config: config.clone(),
432 overlay: RwLock::new(
433 (0..=num_cols).map(|_| HashMap::new()).collect(),
434 ),
435 flushing: RwLock::new(
436 (0..=num_cols).map(|_| HashMap::new()).collect(),
437 ),
438 flushing_lock: Mutex::new(false),
439 path: path.to_owned(),
440 read_opts,
441 write_opts,
442 block_opts,
443 })
444 }
445
446 pub fn transaction(&self) -> DBTransaction { DBTransaction::new() }
448
449 pub fn write_buffered(&self, tr: DBTransaction) {
451 let mut overlay = self.overlay.write();
452 let ops = tr.ops;
453 for op in ops {
454 match op {
455 DBOp::Insert { col, key, value } => {
456 overlay[col as usize].insert(key, KeyState::Insert(value));
457 }
458 DBOp::Delete { col, key } => {
459 overlay[col as usize].insert(key, KeyState::Delete);
460 }
461 DBOp::DeletePrefix { .. } => {
462 unimplemented!("DeletePrefix is not supported")
463 }
464 }
465 }
466 }
467
468 fn write_flushing_with_lock(
470 &self, _lock: &mut MutexGuard<'_, bool>,
471 ) -> io::Result<()> {
472 match *self.db.read() {
473 Some(ref cfs) => {
474 let batch = WriteBatch::default();
475 mem::swap(
476 &mut *self.overlay.write(),
477 &mut *self.flushing.write(),
478 );
479 {
480 for (c, column) in self.flushing.read().iter().enumerate() {
481 for (key, state) in column.iter() {
482 match *state {
483 KeyState::Delete => {
484 let cf = cfs.get_cf(c);
485 batch
486 .delete_cf(cf, key)
487 .map_err(other_io_err)?;
488 }
489 KeyState::Insert(ref value) => {
490 let cf = cfs.get_cf(c);
491 batch
492 .put_cf(cf, key, value)
493 .map_err(other_io_err)?;
494 }
495 }
496 }
497 }
498 }
499
500 check_for_corruption(
501 &self.path,
502 cfs.db.write_opt(&batch, &self.write_opts),
503 )?;
504
505 for column in self.flushing.write().iter_mut() {
506 column.clear();
507 column.shrink_to_fit();
508 }
509 Ok(())
510 }
511 None => Err(other_io_err("Database is closed")),
512 }
513 }
514
515 pub fn flush(&self) -> io::Result<()> {
517 let mut lock = self.flushing_lock.lock();
518 if *lock {
522 return Err(other_io_err(
525 "Database write failure. Running low on memory perhaps?",
526 ));
527 }
528 *lock = true;
529 let result = self.write_flushing_with_lock(&mut lock);
530 *lock = false;
531 result
532 }
533
534 pub fn write(&self, tr: DBTransaction) -> io::Result<()> {
536 match *self.db.read() {
537 Some(ref cfs) => {
538 let batch = WriteBatch::default();
539 let ops = tr.ops;
540 for op in ops {
541 self.overlay.write()[op.col() as usize].remove(op.key());
543
544 match op {
545 DBOp::Insert { col, key, value } => batch
546 .put_cf(cfs.get_cf(col as usize), &key, &value)
547 .map_err(other_io_err)?,
548 DBOp::Delete { col, key } => batch
549 .delete_cf(cfs.get_cf(col as usize), &key)
550 .map_err(other_io_err)?,
551 DBOp::DeletePrefix { .. } => {
552 unimplemented!("DeletePrefix is not supported")
553 }
554 }
555 }
556
557 check_for_corruption(
558 &self.path,
559 cfs.db.write_opt(&batch, &self.write_opts),
560 )
561 }
562 None => Err(other_io_err("Database is closed")),
563 }
564 }
565
566 pub fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
568 match *self.db.read() {
569 Some(ref cfs) => {
570 let overlay = &self.overlay.read()[col as usize];
571 match overlay.get(key) {
572 Some(&KeyState::Insert(ref value)) => {
573 Ok(Some(value.clone()))
574 }
575 Some(&KeyState::Delete) => Ok(None),
576 None => {
577 let flushing = &self.flushing.read()[col as usize];
578 match flushing.get(key) {
579 Some(&KeyState::Insert(ref value)) => {
580 Ok(Some(value.clone()))
581 }
582 Some(&KeyState::Delete) => Ok(None),
583 None => cfs
584 .db
585 .get_cf_opt(
586 cfs.get_cf(col as usize),
587 key,
588 &self.read_opts,
589 )
590 .map(|r| r.map(|v| v.to_vec()))
591 .map_err(other_io_err),
592 }
593 }
594 }
595 }
596 None => Ok(None),
597 }
598 }
599
600 fn close(&self) {
602 *self.db.write() = None;
603 self.overlay.write().clear();
604 self.flushing.write().clear();
605 }
606
607 pub fn restore(&self, new_db: &str) -> io::Result<()> {
609 self.close();
610
611 match swap(new_db, &self.path) {
613 Ok(_) => {
614 let _ = fs::remove_dir_all(new_db);
616 }
617 Err(err) => {
618 warn!("DB atomic swap failed: {}", err);
619 match swap_nonatomic(new_db, &self.path) {
620 Ok(_) => {
621 let _ = fs::remove_dir_all(new_db);
623 }
624 Err(err) => {
625 warn!("Failed to swap DB directories: {:?}", err);
626 return Err(io::Error::new(
627 io::ErrorKind::Other,
628 "DB restoration failed: could not swap DB directories",
629 ));
630 }
631 }
632 }
633 }
634
635 let db = Self::open(&self.config, &self.path)?;
637 *self.db.write() = mem::replace(&mut *db.db.write(), None);
638 *self.overlay.write() =
639 mem::replace(&mut *db.overlay.write(), Vec::new());
640 *self.flushing.write() =
641 mem::replace(&mut *db.flushing.write(), Vec::new());
642 Ok(())
643 }
644
645 pub fn num_columns(&self) -> u32 {
647 self.db
648 .read()
649 .as_ref()
650 .and_then(|db| {
651 if db.column_names.is_empty() {
652 None
653 } else {
654 Some(db.column_names.len())
655 }
656 })
657 .map(|n| n as u32)
658 .unwrap_or(0)
659 }
660
661 pub fn drop_column(&self) -> io::Result<()> {
663 match *self.db.write() {
664 Some(DBAndColumns {
665 ref mut db,
666 ref mut column_names,
667 }) => {
668 if let Some(name) = column_names.pop() {
669 db.drop_cf(&name).map_err(other_io_err)?;
670 }
671 Ok(())
672 }
673 None => Ok(()),
674 }
675 }
676
677 pub fn add_column(&self) -> io::Result<()> {
679 match *self.db.write() {
680 Some(DBAndColumns {
681 ref mut db,
682 ref mut column_names,
683 }) => {
684 let col = column_names.len() as u32;
685 let name = format!("col{}", col);
686 db.create_cf((
687 name.as_str(),
688 col_config(&self.config, &self.block_opts)?,
689 ))
690 .map_err(other_io_err)?;
691 column_names.push(name);
692 Ok(())
693 }
694 None => Ok(()),
695 }
696 }
697}
698
699impl KeyValueDB for Database {
702 fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
703 Database::get(self, col, key)
704 }
705
706 fn get_by_prefix(
707 &self, _col: u32, _prefix: &[u8],
708 ) -> io::Result<Option<DBValue>> {
709 unimplemented!()
710 }
711
712 fn write(&self, transaction: DBTransaction) -> io::Result<()> {
713 Database::write(self, transaction)
714 }
715
716 fn iter<'a>(
717 &'a self, _col: u32,
718 ) -> Box<dyn Iterator<Item = io::Result<kvdb::DBKeyValue>> + 'a> {
719 unimplemented!()
720 }
721
722 fn iter_with_prefix<'a>(
723 &'a self, _col: u32, _prefix: &'a [u8],
724 ) -> Box<dyn Iterator<Item = io::Result<kvdb::DBKeyValue>> + 'a> {
725 unimplemented!()
726 }
727}
728
729impl Drop for Database {
730 fn drop(&mut self) {
731 let _ = self.flush();
733 }
734}
735
736#[cfg(test)]
737mod tests {
738 use super::*;
739 use cfx_types::H256;
740 use std::str::FromStr;
741 use tempfile::tempdir;
742
743 fn test_db(config: &DatabaseConfig) {
744 let tempdir = tempdir().unwrap();
745 let db =
746 Database::open(config, tempdir.path().to_str().unwrap()).unwrap();
747 let key1 = H256::from_str(
748 "02c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc",
749 )
750 .unwrap();
751 let key2 = H256::from_str(
752 "03c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc",
753 )
754 .unwrap();
755 let key3 = H256::from_str(
756 "01c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc",
757 )
758 .unwrap();
759
760 let mut batch = db.transaction();
761 batch.put(0, key1.as_bytes(), b"cat");
762 batch.put(0, key2.as_bytes(), b"dog");
763 db.write(batch).unwrap();
764
765 assert_eq!(&*db.get(0, key1.as_bytes()).unwrap().unwrap(), b"cat");
766
767 let mut batch = db.transaction();
776 batch.delete(0, key1.as_bytes());
777 db.write(batch).unwrap();
778
779 assert!(db.get(0, key1.as_bytes()).unwrap().is_none());
780
781 let mut batch = db.transaction();
782 batch.put(0, key1.as_bytes(), b"cat");
783 db.write(batch).unwrap();
784
785 let mut transaction = db.transaction();
786 transaction.put(0, key3.as_bytes(), b"elephant");
787 transaction.delete(0, key1.as_bytes());
788 db.write(transaction).unwrap();
789 assert!(db.get(0, key1.as_bytes()).unwrap().is_none());
790 assert_eq!(&*db.get(0, key3.as_bytes()).unwrap().unwrap(), b"elephant");
791
792 let mut transaction = db.transaction();
801 transaction.put(0, key1.as_bytes(), b"horse");
802 transaction.delete(0, key3.as_bytes());
803 db.write_buffered(transaction);
804 assert!(db.get(0, key3.as_bytes()).unwrap().is_none());
805 assert_eq!(&*db.get(0, key1.as_bytes()).unwrap().unwrap(), b"horse");
806
807 db.flush().unwrap();
808 assert!(db.get(0, key3.as_bytes()).unwrap().is_none());
809 assert_eq!(&*db.get(0, key1.as_bytes()).unwrap().unwrap(), b"horse");
810 }
811
812 #[test]
813 fn kvdb() {
814 let tempdir = tempdir().unwrap();
815 let _ =
816 Database::open_default(tempdir.path().to_str().unwrap()).unwrap();
817 test_db(&DatabaseConfig::default());
818 }
819
820 #[test]
821 #[cfg(target_os = "linux")]
822 fn df_to_rotational() {
823 use std::path::PathBuf;
824 let example_df = vec![
826 70, 105, 108, 101, 115, 121, 115, 116, 101, 109, 32, 32, 32, 32,
827 32, 49, 75, 45, 98, 108, 111, 99, 107, 115, 32, 32, 32, 32, 32, 85,
828 115, 101, 100, 32, 65, 118, 97, 105, 108, 97, 98, 108, 101, 32, 85,
829 115, 101, 37, 32, 77, 111, 117, 110, 116, 101, 100, 32, 111, 110,
830 10, 47, 100, 101, 118, 47, 115, 100, 97, 49, 32, 32, 32, 32, 32,
831 32, 32, 54, 49, 52, 48, 57, 51, 48, 48, 32, 51, 56, 56, 50, 50, 50,
832 51, 54, 32, 32, 49, 57, 52, 52, 52, 54, 49, 54, 32, 32, 54, 55, 37,
833 32, 47, 10,
834 ];
835 let expected_output =
836 Some(PathBuf::from("/sys/block/sda/queue/rotational"));
837 assert_eq!(rotational_from_df_output(example_df), expected_output);
838 }
839
840 #[test]
841 fn add_columns() {
842 let config = DatabaseConfig::default();
843 let config_5 = DatabaseConfig::with_columns(5);
844
845 let tempdir = tempdir().unwrap();
846
847 {
849 let db = Database::open(&config, tempdir.path().to_str().unwrap())
850 .unwrap();
851 assert_eq!(db.num_columns(), 1);
852
853 for i in 0..4 {
854 db.add_column().unwrap();
855 assert_eq!(db.num_columns(), i + 2);
856 }
857 }
858
859 {
861 let db =
862 Database::open(&config_5, tempdir.path().to_str().unwrap())
863 .unwrap();
864 assert_eq!(db.num_columns(), 5);
865 }
866 }
867
868 #[test]
869 fn drop_columns() {
870 let config = DatabaseConfig::default();
871 let config_5 = DatabaseConfig::with_columns(5);
872
873 let tempdir = tempdir().unwrap();
874
875 {
877 let db =
878 Database::open(&config_5, tempdir.path().to_str().unwrap())
879 .unwrap();
880 assert_eq!(db.num_columns(), 5);
881
882 for i in (0..5).rev() {
883 db.drop_column().unwrap();
884 assert_eq!(db.num_columns(), i);
885 }
886 }
887
888 {
890 let db = Database::open(&config, tempdir.path().to_str().unwrap())
891 .unwrap();
892 assert_eq!(db.num_columns(), 1);
893 }
894 }
895
896 #[test]
897 fn write_clears_buffered_ops() {
898 let tempdir = tempdir().unwrap();
899 let config = DatabaseConfig::default();
900 let db =
901 Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();
902
903 let mut batch = db.transaction();
904 batch.put(0, b"foo", b"bar");
905 db.write_buffered(batch);
906
907 let mut batch = db.transaction();
908 batch.put(0, b"foo", b"baz");
909 db.write(batch).unwrap();
910
911 assert_eq!(db.get(0, b"foo").unwrap().unwrap(), b"baz");
912 }
913
914 #[test]
915 fn test_memory_property() {
916 let tempdir = tempdir().unwrap();
917 let db = Database::open(
918 &DatabaseConfig::default(),
919 tempdir.path().to_str().unwrap(),
920 )
921 .unwrap();
922 let key1 = H256::from_str(
923 "02c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc",
924 )
925 .unwrap();
926 let mut tx = db.transaction();
927 tx.put(0, key1.as_bytes(), b"123");
928 db.write(tx).unwrap();
929 db.flush().unwrap();
930 let db_locked = db.db.read();
931 let db_and_col = db_locked.as_ref().unwrap();
932 assert!(db_and_col
933 .static_property_or_warn(0, "rocksdb.block-cache-usage")
934 .is_some());
935 assert!(db_and_col
936 .static_property_or_warn(0, "rocksdb.estimate-table-readers-mem",)
937 .is_some());
938 assert!(db_and_col
939 .static_property_or_warn(0, "rocksdb.cur-size-all-mem-tables")
940 .is_some());
941 assert!(db_and_col
942 .static_property_or_warn(0, "rocksdb.fake-property")
943 .is_none());
944 }
945}