log_device/
lib.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use byteorder::{ByteOrder, LittleEndian};
6use db::SystemDB;
7use kvdb::DBTransaction;
8use parking_lot::Mutex;
9use rlp::Rlp;
10use rlp_derive::{RlpDecodable, RlpEncodable};
11use std::{
12    collections::HashMap,
13    fs::{File, OpenOptions},
14    io::{Error, Read, Seek, SeekFrom, Write},
15    path::PathBuf,
16    sync::Arc,
17};
18
// Layout of the metadata database (RocksDB).
/// The single column used by this module; every key is stored in it.
const COL_DB: u32 = 0;
/// Total number of columns the metadata database is opened with.
const NUM_COLUMNS: u32 = 1;

/// Key of the record holding the number of created log devices
/// (stored as a little-endian `u64`).
const DB_KEY_LOG_DEVICE_NUM: &[u8] = b"log_device_num";

/// Number of stripes a segment holds before a new segment is started.
const NUM_OF_STRIPES_PER_SEGMENT: u64 = 2000;
/// Name of the sub-directory that contains the metadata database.
const META_DATA_DB_DIR: &str = "meta_db";
/// Prefix of a device directory name and of the head/tail database keys;
/// the device id is appended to it.
const LOG_DEVICE_DIR_PREFIX: &str = "log_device_";
/// Prefix of a segment file name; the segment id is appended to it.
const SEGMENT_FILE_NAME_PREFIX: &str = "segment_";
31
// Folder structure of the log devices:
// path_dir/meta_db/
//         /log_device_0/segment_0
//                      /segment_1
//                      /...
//         /log_device_1/segment_0
//                      /segment_1
//                      /...
//         /...
//
// Each segment of a log is stored in its own independent file.
// This facilitates log trimming during garbage collection.
44
45#[derive(Clone, Copy, Debug, Default, RlpDecodable, RlpEncodable)]
46pub struct StripeReference {
47    /// Segment id of the stripe.
48    segment_id: u64,
49    /// Offset in bytes of the stripe in the segment.
50    offset: u64,
51}
52
53#[derive(Clone, Copy, Debug, Default, RlpDecodable, RlpEncodable)]
54pub struct StripeInfo {
55    /// The reference to the stripe.
56    stripe_ref: StripeReference,
57    /// The id of the stripe in the segment.
58    stripe_id: u64,
59}
60
61pub struct LogDeviceManager {
62    path_dir: PathBuf,
63    db: Arc<SystemDB>,
64    devices: Mutex<Vec<Arc<LogDevice>>>,
65}
66
67impl LogDeviceManager {
68    pub fn new(path_dir: PathBuf) -> Self {
69        let mut db_dir_path = path_dir.clone();
70        db_dir_path.push(META_DATA_DB_DIR);
71        let db_config = db::db_config(
72            &db_dir_path,
73            None,
74            db::DatabaseCompactionProfile::default(),
75            NUM_COLUMNS.clone(),
76            false, /* disable_wal */
77        );
78
79        let db = db::open_database(db_dir_path.to_str().unwrap(), &db_config)
80            .unwrap();
81
82        let mut log_device_manager = LogDeviceManager {
83            path_dir,
84            db,
85            devices: Mutex::new(Vec::new()),
86        };
87        log_device_manager.initialize();
88        log_device_manager
89    }
90
91    fn initialize(&mut self) {
92        let device_num = self.get_device_num_from_db();
93        let mut devices = self.devices.lock();
94        for i in 0..device_num {
95            let mut log_device_filename = String::from(LOG_DEVICE_DIR_PREFIX);
96            log_device_filename.push_str(i.to_string().as_str());
97            let mut device_path_dir = self.path_dir.clone();
98            device_path_dir.push(log_device_filename.as_str());
99            let log_device = LogDevice::new(
100                device_path_dir,
101                i,
102                self.db.clone(),
103                true, /* open */
104            );
105            devices.push(Arc::new(log_device));
106        }
107    }
108
109    fn get_device_num_from_db(&self) -> usize {
110        let res = self
111            .db
112            .key_value()
113            .get(COL_DB, DB_KEY_LOG_DEVICE_NUM)
114            .expect("Low level database error.");
115        let device_num = match res {
116            Some(value) => LittleEndian::read_u64(&value) as usize,
117            None => 0,
118        };
119
120        device_num
121    }
122
123    fn set_device_num_to_db(&self, device_num: usize) {
124        let mut tx = DBTransaction::new();
125        let mut value = [0; 8];
126        LittleEndian::write_u64(&mut value[0..8], device_num as u64);
127        tx.put(COL_DB, DB_KEY_LOG_DEVICE_NUM, &value);
128        self.db.key_value().write(tx).expect("DB write failed.");
129    }
130
131    pub fn get_device_num(&self) -> usize { self.devices.lock().len() }
132
133    pub fn get_device(&self, device_id: usize) -> Option<Arc<LogDevice>> {
134        Some(self.devices.lock().get(device_id)?.clone())
135    }
136
137    pub fn create_new_device(&self) -> usize {
138        let new_device_id = self.get_device_num();
139        let mut log_device_filename = String::from(LOG_DEVICE_DIR_PREFIX);
140        log_device_filename.push_str(new_device_id.to_string().as_str());
141        let mut device_path_dir = self.path_dir.clone();
142        device_path_dir.push(log_device_filename.as_str());
143        let log_device = LogDevice::new(
144            device_path_dir,
145            new_device_id,
146            self.db.clone(),
147            false, /* open */
148        );
149        self.devices.lock().push(Arc::new(log_device));
150        let new_device_num = new_device_id + 1;
151        self.set_device_num_to_db(new_device_num);
152        self.db.key_value().flush().expect("DB flush failed.");
153        new_device_id
154    }
155}
156
157pub struct LogDevice {
158    device_id: usize,
159    tail_db_key: String,
160    head_db_key: String,
161    db: Arc<SystemDB>,
162    inner: Mutex<LogDeviceInner>,
163}
164
165impl LogDevice {
166    pub fn new(
167        path_dir: PathBuf, device_id: usize, db: Arc<SystemDB>, open: bool,
168    ) -> Self {
169        let mut log_device = LogDevice {
170            device_id,
171            tail_db_key: String::default(),
172            head_db_key: String::default(),
173            db: db.clone(),
174            inner: Mutex::new(LogDeviceInner::new(path_dir)),
175        };
176
177        log_device.tail_db_key = log_device.get_tail_key();
178        log_device.head_db_key = log_device.get_head_key();
179        let (head, tail) = if open {
180            let tail = log_device
181                .get_stripe_info_from_db(log_device.tail_db_key.as_bytes())
182                .unwrap();
183            let head = log_device
184                .get_stripe_info_from_db(log_device.head_db_key.as_bytes())
185                .unwrap();
186            (head, tail)
187        } else {
188            let tail = StripeInfo {
189                stripe_ref: StripeReference {
190                    segment_id: 0,
191                    offset: 0,
192                },
193                stripe_id: 0,
194            };
195
196            let head = tail;
197            log_device.set_stripe_info_to_db(
198                log_device.tail_db_key.as_bytes(),
199                &tail,
200            );
201            log_device.set_stripe_info_to_db(
202                log_device.head_db_key.as_bytes(),
203                &head,
204            );
205            db.key_value().flush().expect("DB flush failed.");
206            (head, tail)
207        };
208
209        log_device.inner.lock().initialize(head, tail);
210        log_device
211    }
212
213    fn get_tail_key(&self) -> String {
214        let mut tail_key = String::from(LOG_DEVICE_DIR_PREFIX);
215        tail_key.push_str(self.device_id.to_string().as_str());
216        tail_key.push_str("_tail");
217        tail_key
218    }
219
220    fn get_head_key(&self) -> String {
221        let mut head_key = String::from(LOG_DEVICE_DIR_PREFIX);
222        head_key.push_str(self.device_id.to_string().as_str());
223        head_key.push_str("_head");
224        head_key
225    }
226
227    fn get_stripe_info_from_db(&self, key: &[u8]) -> Option<StripeInfo> {
228        let res = self
229            .db
230            .key_value()
231            .get(COL_DB, key)
232            .expect("Low level database error.");
233        match res {
234            Some(value) => {
235                let rlp = Rlp::new(&value);
236                let stripe_info: StripeInfo = rlp.as_val().expect("rlp error");
237                Some(stripe_info)
238            }
239            None => None,
240        }
241    }
242
243    fn set_stripe_info_to_db(&self, key: &[u8], stripe_info: &StripeInfo) {
244        let value = rlp::encode(stripe_info);
245        let mut tx = DBTransaction::new();
246        tx.put(COL_DB, key, value.as_slice());
247        self.db.key_value().write(tx).expect("DB write failed.");
248    }
249
250    pub fn append_stripe(&self, stripe: &[u8]) -> Result<StripeInfo, Error> {
251        let (appended_stripe, tail) =
252            self.inner.lock().append_stripe(stripe)?;
253        self.set_stripe_info_to_db(self.tail_db_key.as_bytes(), &tail);
254        self.db.key_value().flush()?;
255        Ok(appended_stripe)
256    }
257
258    pub fn get_stripe(
259        &self, stripe_ref: &StripeReference,
260    ) -> Result<Vec<u8>, Error> {
261        self.inner.lock().get_stripe(stripe_ref)
262    }
263
264    pub fn trim(&self, stripe: &StripeInfo) {
265        let mut inner = self.inner.lock();
266        let new_head = inner.check_trim(stripe.stripe_ref.segment_id);
267        if new_head.is_some() {
268            let new_head = new_head.unwrap();
269            self.set_stripe_info_to_db(self.head_db_key.as_bytes(), &new_head);
270            self.db.key_value().flush().expect("DB flush failed.");
271            inner.trim(&new_head);
272        }
273    }
274
275    pub fn segment_to_file_name(segment_id: u64) -> String {
276        let mut filename = String::from(SEGMENT_FILE_NAME_PREFIX);
277        filename.push_str(segment_id.to_string().as_str());
278        filename
279    }
280}
281
282struct LogDeviceInner {
283    /// The info of the next to-be-appended stripe in this log.
284    tail: StripeInfo,
285    /// The info of the head stripe of the log.
286    /// Only segment information matters in head stripe info.
287    head: StripeInfo,
288    /// The directory where the log device files locate.
289    path_dir: PathBuf,
290    /// The cache maintaining the opened segment files.
291    /// FIXME: make it lru.
292    file_cache: HashMap<u64, File>,
293}
294
295impl LogDeviceInner {
296    pub fn new(path_dir: PathBuf) -> Self {
297        LogDeviceInner {
298            tail: StripeInfo::default(),
299            head: StripeInfo::default(),
300            path_dir,
301            file_cache: HashMap::new(),
302        }
303    }
304
305    fn initialize(&mut self, head: StripeInfo, tail: StripeInfo) {
306        self.head = head;
307        self.tail = tail;
308
309        // Open and check the file holding the tail.
310        let segment_path =
311            self.segment_to_path(self.tail.stripe_ref.segment_id);
312        let create = if segment_path.exists() {
313            false
314        } else {
315            assert_eq!(self.tail.stripe_ref.segment_id, 0);
316            assert_eq!(self.tail.stripe_ref.offset, 0);
317            assert_eq!(self.tail.stripe_id, 0);
318            std::fs::create_dir_all(&self.path_dir)
319                .expect("Failed to create log_device dir.");
320            true
321        };
322        let mut segment_file = OpenOptions::new()
323            .read(true)
324            .write(true)
325            .create_new(create)
326            .open(&segment_path)
327            .expect("Failed to open segment file.");
328        let offset = segment_file
329            .seek(SeekFrom::Start(self.tail.stripe_ref.offset))
330            .expect("Failed to seek segment file.");
331        assert_eq!(offset, self.tail.stripe_ref.offset);
332        self.file_cache
333            .insert(self.tail.stripe_ref.segment_id, segment_file);
334    }
335
336    fn segment_to_path(&self, segment: u64) -> PathBuf {
337        let segment_filename = LogDevice::segment_to_file_name(segment);
338        let mut segment_path = self.path_dir.clone();
339        segment_path.push(segment_filename.as_str());
340        segment_path
341    }
342
343    pub fn append_stripe(
344        &mut self, stripe: &[u8],
345    ) -> Result<(StripeInfo, StripeInfo), Error> {
346        // Check size.
347        let payload_size = LittleEndian::read_u32(&stripe[0..4]) as usize;
348        assert_eq!(payload_size + 4, stripe.len(), "Incorrect payload size.");
349
350        if self.tail.stripe_id == NUM_OF_STRIPES_PER_SEGMENT {
351            // The current segment is full. Need to create new segment.
352            self.tail.stripe_ref.segment_id += 1;
353            self.tail.stripe_ref.offset = 0;
354            self.tail.stripe_id = 0;
355
356            // Create new segment file.
357            let segment_path =
358                self.segment_to_path(self.tail.stripe_ref.segment_id);
359            let segment_file = OpenOptions::new()
360                .read(true)
361                .write(true)
362                .create_new(true)
363                .open(&segment_path)?;
364            self.file_cache
365                .insert(self.tail.stripe_ref.segment_id, segment_file);
366        }
367
368        // Append stripe to segment file.
369        let segment_file = self
370            .file_cache
371            .get_mut(&self.tail.stripe_ref.segment_id)
372            .unwrap();
373        let write_size = segment_file.write(stripe)?;
374        // FIXME: for better error handling.
375        assert_eq!(write_size, stripe.len());
376        let offset = self.tail.stripe_ref.offset + write_size as u64;
377        assert_eq!(segment_file.seek(SeekFrom::End(0)).unwrap(), offset);
378        segment_file.flush()?;
379
380        let appended_stripe = self.tail;
381        // Update tail information.
382        self.tail.stripe_id += 1;
383        self.tail.stripe_ref.offset = offset;
384        Ok((appended_stripe, self.tail))
385    }
386
387    pub fn get_stripe(
388        &mut self, stripe_ref: &StripeReference,
389    ) -> Result<Vec<u8>, Error> {
390        if !self.file_cache.contains_key(&stripe_ref.segment_id) {
391            // The segment file hasn't been accessed. Open and cache it.
392            let segment_path = self.segment_to_path(stripe_ref.segment_id);
393            let segment_file = OpenOptions::new()
394                .read(true)
395                .write(true)
396                .open(&segment_path)?;
397            self.file_cache.insert(stripe_ref.segment_id, segment_file);
398        }
399
400        let segment_file =
401            self.file_cache.get_mut(&stripe_ref.segment_id).unwrap();
402        let offset = segment_file.seek(SeekFrom::Start(stripe_ref.offset))?;
403        assert_eq!(offset, stripe_ref.offset);
404        let mut stripe: Vec<u8> = Vec::new();
405        stripe.resize(4, 0);
406        let read_size = segment_file.read(&mut stripe[0..4])?;
407        assert_eq!(read_size, 4);
408        let payload_size = LittleEndian::read_u32(&stripe[0..4]) as usize;
409        if payload_size != 0 {
410            stripe.resize(payload_size + 4, 0);
411            let read_size =
412                segment_file.read(&mut stripe[4..4 + payload_size])?;
413            assert_eq!(read_size, payload_size);
414        }
415        Ok(stripe)
416    }
417
418    pub fn check_trim(&self, segment_id: u64) -> Option<StripeInfo> {
419        if segment_id >= self.head.stripe_ref.segment_id
420            && segment_id <= self.tail.stripe_ref.segment_id
421        {
422            Some(StripeInfo {
423                stripe_ref: StripeReference {
424                    segment_id,
425                    offset: 0,
426                },
427                stripe_id: 0,
428            })
429        } else {
430            None
431        }
432    }
433
434    pub fn trim(&mut self, new_head: &StripeInfo) {
435        let old_head = self.head;
436        self.head = *new_head;
437
438        for segment in
439            old_head.stripe_ref.segment_id..self.head.stripe_ref.segment_id
440        {
441            self.file_cache.remove(&segment);
442            let segment_path = self.segment_to_path(segment);
443            if segment_path.exists() {
444                std::fs::remove_file(&segment_path).ok();
445            }
446        }
447    }
448}
449
#[cfg(test)]
mod tests {
    use super::{
        LogDevice, LogDeviceManager, StripeInfo, StripeReference,
        LOG_DEVICE_DIR_PREFIX, NUM_OF_STRIPES_PER_SEGMENT,
    };
    use byteorder::{ByteOrder, LittleEndian};
    use rand::Rng;
    use std::{
        path::{Path, PathBuf},
        sync::Arc,
    };

    /// Removes whatever is left at `dir` from an earlier run and creates the
    /// directory anew.
    fn fresh_dir(dir: &str) -> PathBuf {
        let path_dir = PathBuf::from(String::from(dir));
        std::fs::remove_dir_all(&path_dir).ok();
        std::fs::create_dir_all(&path_dir).ok();
        path_dir
    }

    /// Expected on-disk path of segment `segment` of device `device` below
    /// `root`.
    fn segment_path(root: &Path, device: usize, segment: u64) -> PathBuf {
        root.join(format!("{}{}", LOG_DEVICE_DIR_PREFIX, device))
            .join(format!("segment_{}", segment))
    }

    /// Sets up an empty manager in a fresh `dir` and creates its first
    /// device.
    fn manager_with_one_device(
        dir: &str,
    ) -> (PathBuf, LogDeviceManager, Arc<LogDevice>) {
        let path_dir = fresh_dir(dir);
        let log_device_manager = LogDeviceManager::new(path_dir.clone());
        assert_eq!(log_device_manager.get_device_num(), 0);
        let device_id = log_device_manager.create_new_device();
        assert_eq!(log_device_manager.get_device_num(), 1);
        let log_device = log_device_manager.get_device(device_id).unwrap();
        (path_dir, log_device_manager, log_device)
    }

    /// Appends stripes `start..end` of random size to `log_device`,
    /// recording each stripe and its reference.
    fn gen_random_and_append(
        log_device: Arc<LogDevice>, stripes: &mut Vec<Vec<u8>>,
        stripe_refs: &mut Vec<StripeReference>, start: usize, end: usize,
    ) {
        for i in start..end {
            let stripe_size = rand::rng().random_range(4..1024 * 64);
            // Fill with a per-stripe byte, then overwrite the size header.
            let mut stripe: Vec<u8> = vec![i as u8; stripe_size];
            let payload_size = stripe_size - 4;
            LittleEndian::write_u32(&mut stripe[0..4], payload_size as u32);
            let stripe_info = log_device.append_stripe(&stripe).unwrap();
            stripes.push(stripe);
            stripe_refs.push(stripe_info.stripe_ref);
        }
    }

    /// Reads stripes `start..end` back and checks them byte for byte
    /// against what was appended.
    fn read_and_check(
        log_device: Arc<LogDevice>, stripes: &[Vec<u8>],
        stripe_refs: &[StripeReference], start: usize, end: usize,
    ) {
        for i in start..end {
            let expected = &stripes[i];
            let actual = log_device
                .get_stripe(&stripe_refs[i])
                .expect("Failed to read stripe");
            let matching = expected
                .iter()
                .zip(actual.iter())
                .filter(|&(a, b)| a == b)
                .count();
            assert_eq!(matching, expected.len());
            assert_eq!(matching, actual.len());
        }
    }

    /// First half of the reopen test: create a device and fill it.
    fn create_and_append(
        stripes: &mut Vec<Vec<u8>>, stripe_refs: &mut Vec<StripeReference>,
    ) {
        let (_path_dir, log_device_manager, log_device) =
            manager_with_one_device("./ldm_open");
        assert_eq!(log_device_manager.get_device_num_from_db(), 1);

        gen_random_and_append(log_device.clone(), stripes, stripe_refs, 0, 10);
        read_and_check(log_device.clone(), stripes, stripe_refs, 0, 10);
    }

    /// Second half of the reopen test: open the existing device, append
    /// more and verify everything written so far.
    fn open_and_append_and_read(
        stripes: &mut Vec<Vec<u8>>, stripe_refs: &mut Vec<StripeReference>,
    ) {
        let path_dir = PathBuf::from(String::from("./ldm_open"));
        let log_device_manager = LogDeviceManager::new(path_dir.clone());
        assert_eq!(log_device_manager.get_device_num(), 1);
        let log_device = log_device_manager.get_device(0).unwrap();

        gen_random_and_append(log_device.clone(), stripes, stripe_refs, 10, 20);
        read_and_check(log_device.clone(), stripes, stripe_refs, 0, 20);
        std::fs::remove_dir_all(&path_dir).ok();
    }

    #[test]
    fn test_open_log_device() {
        let mut stripes = Vec::new();
        let mut stripe_refs = Vec::new();

        create_and_append(&mut stripes, &mut stripe_refs);
        open_and_append_and_read(&mut stripes, &mut stripe_refs);
    }

    #[test]
    fn test_append_log_device() {
        let (path_dir, _log_device_manager, log_device) =
            manager_with_one_device("./ldm_append");
        let mut stripes = Vec::new();
        let mut stripe_refs = Vec::new();

        gen_random_and_append(
            log_device.clone(),
            &mut stripes,
            &mut stripe_refs,
            0,
            10,
        );
        read_and_check(log_device.clone(), &stripes, &stripe_refs, 0, 10);
        std::fs::remove_dir_all(&path_dir).ok();
    }

    #[test]
    fn test_trim_log_device() {
        let (path_dir, _log_device_manager, log_device) =
            manager_with_one_device("./ldm_trim");
        let mut stripes = Vec::new();
        let mut stripe_refs = Vec::new();
        let stripes_per_segment = NUM_OF_STRIPES_PER_SEGMENT as usize;

        // Fill exactly four segments.
        gen_random_and_append(
            log_device.clone(),
            &mut stripes,
            &mut stripe_refs,
            0,
            4 * stripes_per_segment,
        );

        let segment_paths: Vec<PathBuf> = (0..4)
            .map(|segment| segment_path(&path_dir, 0, segment))
            .collect();
        for path in &segment_paths {
            assert!(path.exists());
        }

        // Trimming to segment 2 must drop segments 0 and 1 only.
        let trim_point = StripeInfo {
            stripe_ref: StripeReference {
                segment_id: 2,
                offset: 0,
            },
            stripe_id: 0,
        };
        log_device.trim(&trim_point);

        assert!(!segment_paths[0].exists());
        assert!(!segment_paths[1].exists());
        assert!(segment_paths[2].exists());
        assert!(segment_paths[3].exists());

        read_and_check(
            log_device.clone(),
            &stripes,
            &stripe_refs,
            2 * stripes_per_segment,
            4 * stripes_per_segment,
        );

        std::fs::remove_dir_all(&path_dir).ok();
    }

    #[test]
    fn test_create_log_device() {
        let path_dir = fresh_dir("./ldm_create");
        let log_device_manager = LogDeviceManager::new(path_dir.clone());
        assert_eq!(log_device_manager.get_device_num(), 0);
        assert_eq!(log_device_manager.get_device_num_from_db(), 0);

        // Both the in-memory and the persisted count follow each creation.
        for expected_num in 1..=4 {
            log_device_manager.create_new_device();
            assert_eq!(log_device_manager.get_device_num(), expected_num);
            assert_eq!(
                log_device_manager.get_device_num_from_db(),
                expected_num
            );
        }

        // Every new device starts with its first segment file on disk.
        for device in 0..4 {
            assert!(segment_path(&path_dir, device, 0).exists());
        }

        std::fs::remove_dir_all(&path_dir).ok();
    }
}