1use byteorder::{ByteOrder, LittleEndian};
6use db::SystemDB;
7use kvdb::DBTransaction;
8use parking_lot::Mutex;
9use rlp::Rlp;
10use rlp_derive::{RlpDecodable, RlpEncodable};
11use std::{
12 collections::HashMap,
13 fs::{File, OpenOptions},
14 io::{Error, Read, Seek, SeekFrom, Write},
15 path::PathBuf,
16 sync::Arc,
17};
18
/// Column of the metadata database that holds every key written here.
const COL_DB: u32 = 0;
/// Total number of columns the metadata database is opened with.
const NUM_COLUMNS: u32 = 1;

/// Metadata key holding the number of created log devices, stored as a
/// little-endian `u64`.
const DB_KEY_LOG_DEVICE_NUM: &[u8] = b"log_device_num";

/// Number of stripes appended to one segment file before rolling over to the
/// next segment.
const NUM_OF_STRIPES_PER_SEGMENT: u64 = 2000;

/// Sub-directory of the manager's root that holds the metadata database.
const META_DATA_DB_DIR: &str = "meta_db";
/// Prefix of each device directory name (the device id is appended); also
/// the prefix of the device's head/tail metadata keys.
const LOG_DEVICE_DIR_PREFIX: &str = "log_device_";
/// Prefix of each segment file name (the segment id is appended).
const SEGMENT_FILE_NAME_PREFIX: &str = "segment_";
31
32#[derive(Clone, Copy, Debug, Default, RlpDecodable, RlpEncodable)]
46pub struct StripeReference {
47 segment_id: u64,
49 offset: u64,
51}
52
53#[derive(Clone, Copy, Debug, Default, RlpDecodable, RlpEncodable)]
54pub struct StripeInfo {
55 stripe_ref: StripeReference,
57 stripe_id: u64,
59}
60
61pub struct LogDeviceManager {
62 path_dir: PathBuf,
63 db: Arc<SystemDB>,
64 devices: Mutex<Vec<Arc<LogDevice>>>,
65}
66
67impl LogDeviceManager {
68 pub fn new(path_dir: PathBuf) -> Self {
69 let mut db_dir_path = path_dir.clone();
70 db_dir_path.push(META_DATA_DB_DIR);
71 let db_config = db::db_config(
72 &db_dir_path,
73 None,
74 db::DatabaseCompactionProfile::default(),
75 NUM_COLUMNS.clone(),
76 false, );
78
79 let db = db::open_database(db_dir_path.to_str().unwrap(), &db_config)
80 .unwrap();
81
82 let mut log_device_manager = LogDeviceManager {
83 path_dir,
84 db,
85 devices: Mutex::new(Vec::new()),
86 };
87 log_device_manager.initialize();
88 log_device_manager
89 }
90
91 fn initialize(&mut self) {
92 let device_num = self.get_device_num_from_db();
93 let mut devices = self.devices.lock();
94 for i in 0..device_num {
95 let mut log_device_filename = String::from(LOG_DEVICE_DIR_PREFIX);
96 log_device_filename.push_str(i.to_string().as_str());
97 let mut device_path_dir = self.path_dir.clone();
98 device_path_dir.push(log_device_filename.as_str());
99 let log_device = LogDevice::new(
100 device_path_dir,
101 i,
102 self.db.clone(),
103 true, );
105 devices.push(Arc::new(log_device));
106 }
107 }
108
109 fn get_device_num_from_db(&self) -> usize {
110 let res = self
111 .db
112 .key_value()
113 .get(COL_DB, DB_KEY_LOG_DEVICE_NUM)
114 .expect("Low level database error.");
115 let device_num = match res {
116 Some(value) => LittleEndian::read_u64(&value) as usize,
117 None => 0,
118 };
119
120 device_num
121 }
122
123 fn set_device_num_to_db(&self, device_num: usize) {
124 let mut tx = DBTransaction::new();
125 let mut value = [0; 8];
126 LittleEndian::write_u64(&mut value[0..8], device_num as u64);
127 tx.put(COL_DB, DB_KEY_LOG_DEVICE_NUM, &value);
128 self.db.key_value().write(tx).expect("DB write failed.");
129 }
130
131 pub fn get_device_num(&self) -> usize { self.devices.lock().len() }
132
133 pub fn get_device(&self, device_id: usize) -> Option<Arc<LogDevice>> {
134 Some(self.devices.lock().get(device_id)?.clone())
135 }
136
137 pub fn create_new_device(&self) -> usize {
138 let new_device_id = self.get_device_num();
139 let mut log_device_filename = String::from(LOG_DEVICE_DIR_PREFIX);
140 log_device_filename.push_str(new_device_id.to_string().as_str());
141 let mut device_path_dir = self.path_dir.clone();
142 device_path_dir.push(log_device_filename.as_str());
143 let log_device = LogDevice::new(
144 device_path_dir,
145 new_device_id,
146 self.db.clone(),
147 false, );
149 self.devices.lock().push(Arc::new(log_device));
150 let new_device_num = new_device_id + 1;
151 self.set_device_num_to_db(new_device_num);
152 self.db.key_value().flush().expect("DB flush failed.");
153 new_device_id
154 }
155}
156
157pub struct LogDevice {
158 device_id: usize,
159 tail_db_key: String,
160 head_db_key: String,
161 db: Arc<SystemDB>,
162 inner: Mutex<LogDeviceInner>,
163}
164
165impl LogDevice {
166 pub fn new(
167 path_dir: PathBuf, device_id: usize, db: Arc<SystemDB>, open: bool,
168 ) -> Self {
169 let mut log_device = LogDevice {
170 device_id,
171 tail_db_key: String::default(),
172 head_db_key: String::default(),
173 db: db.clone(),
174 inner: Mutex::new(LogDeviceInner::new(path_dir)),
175 };
176
177 log_device.tail_db_key = log_device.get_tail_key();
178 log_device.head_db_key = log_device.get_head_key();
179 let (head, tail) = if open {
180 let tail = log_device
181 .get_stripe_info_from_db(log_device.tail_db_key.as_bytes())
182 .unwrap();
183 let head = log_device
184 .get_stripe_info_from_db(log_device.head_db_key.as_bytes())
185 .unwrap();
186 (head, tail)
187 } else {
188 let tail = StripeInfo {
189 stripe_ref: StripeReference {
190 segment_id: 0,
191 offset: 0,
192 },
193 stripe_id: 0,
194 };
195
196 let head = tail;
197 log_device.set_stripe_info_to_db(
198 log_device.tail_db_key.as_bytes(),
199 &tail,
200 );
201 log_device.set_stripe_info_to_db(
202 log_device.head_db_key.as_bytes(),
203 &head,
204 );
205 db.key_value().flush().expect("DB flush failed.");
206 (head, tail)
207 };
208
209 log_device.inner.lock().initialize(head, tail);
210 log_device
211 }
212
213 fn get_tail_key(&self) -> String {
214 let mut tail_key = String::from(LOG_DEVICE_DIR_PREFIX);
215 tail_key.push_str(self.device_id.to_string().as_str());
216 tail_key.push_str("_tail");
217 tail_key
218 }
219
220 fn get_head_key(&self) -> String {
221 let mut head_key = String::from(LOG_DEVICE_DIR_PREFIX);
222 head_key.push_str(self.device_id.to_string().as_str());
223 head_key.push_str("_head");
224 head_key
225 }
226
227 fn get_stripe_info_from_db(&self, key: &[u8]) -> Option<StripeInfo> {
228 let res = self
229 .db
230 .key_value()
231 .get(COL_DB, key)
232 .expect("Low level database error.");
233 match res {
234 Some(value) => {
235 let rlp = Rlp::new(&value);
236 let stripe_info: StripeInfo = rlp.as_val().expect("rlp error");
237 Some(stripe_info)
238 }
239 None => None,
240 }
241 }
242
243 fn set_stripe_info_to_db(&self, key: &[u8], stripe_info: &StripeInfo) {
244 let value = rlp::encode(stripe_info);
245 let mut tx = DBTransaction::new();
246 tx.put(COL_DB, key, value.as_slice());
247 self.db.key_value().write(tx).expect("DB write failed.");
248 }
249
250 pub fn append_stripe(&self, stripe: &[u8]) -> Result<StripeInfo, Error> {
251 let (appended_stripe, tail) =
252 self.inner.lock().append_stripe(stripe)?;
253 self.set_stripe_info_to_db(self.tail_db_key.as_bytes(), &tail);
254 self.db.key_value().flush()?;
255 Ok(appended_stripe)
256 }
257
258 pub fn get_stripe(
259 &self, stripe_ref: &StripeReference,
260 ) -> Result<Vec<u8>, Error> {
261 self.inner.lock().get_stripe(stripe_ref)
262 }
263
264 pub fn trim(&self, stripe: &StripeInfo) {
265 let mut inner = self.inner.lock();
266 let new_head = inner.check_trim(stripe.stripe_ref.segment_id);
267 if new_head.is_some() {
268 let new_head = new_head.unwrap();
269 self.set_stripe_info_to_db(self.head_db_key.as_bytes(), &new_head);
270 self.db.key_value().flush().expect("DB flush failed.");
271 inner.trim(&new_head);
272 }
273 }
274
275 pub fn segment_to_file_name(segment_id: u64) -> String {
276 let mut filename = String::from(SEGMENT_FILE_NAME_PREFIX);
277 filename.push_str(segment_id.to_string().as_str());
278 filename
279 }
280}
281
282struct LogDeviceInner {
283 tail: StripeInfo,
285 head: StripeInfo,
288 path_dir: PathBuf,
290 file_cache: HashMap<u64, File>,
293}
294
295impl LogDeviceInner {
296 pub fn new(path_dir: PathBuf) -> Self {
297 LogDeviceInner {
298 tail: StripeInfo::default(),
299 head: StripeInfo::default(),
300 path_dir,
301 file_cache: HashMap::new(),
302 }
303 }
304
305 fn initialize(&mut self, head: StripeInfo, tail: StripeInfo) {
306 self.head = head;
307 self.tail = tail;
308
309 let segment_path =
311 self.segment_to_path(self.tail.stripe_ref.segment_id);
312 let create = if segment_path.exists() {
313 false
314 } else {
315 assert_eq!(self.tail.stripe_ref.segment_id, 0);
316 assert_eq!(self.tail.stripe_ref.offset, 0);
317 assert_eq!(self.tail.stripe_id, 0);
318 std::fs::create_dir_all(&self.path_dir)
319 .expect("Failed to create log_device dir.");
320 true
321 };
322 let mut segment_file = OpenOptions::new()
323 .read(true)
324 .write(true)
325 .create_new(create)
326 .open(&segment_path)
327 .expect("Failed to open segment file.");
328 let offset = segment_file
329 .seek(SeekFrom::Start(self.tail.stripe_ref.offset))
330 .expect("Failed to seek segment file.");
331 assert_eq!(offset, self.tail.stripe_ref.offset);
332 self.file_cache
333 .insert(self.tail.stripe_ref.segment_id, segment_file);
334 }
335
336 fn segment_to_path(&self, segment: u64) -> PathBuf {
337 let segment_filename = LogDevice::segment_to_file_name(segment);
338 let mut segment_path = self.path_dir.clone();
339 segment_path.push(segment_filename.as_str());
340 segment_path
341 }
342
343 pub fn append_stripe(
344 &mut self, stripe: &[u8],
345 ) -> Result<(StripeInfo, StripeInfo), Error> {
346 let payload_size = LittleEndian::read_u32(&stripe[0..4]) as usize;
348 assert_eq!(payload_size + 4, stripe.len(), "Incorrect payload size.");
349
350 if self.tail.stripe_id == NUM_OF_STRIPES_PER_SEGMENT {
351 self.tail.stripe_ref.segment_id += 1;
353 self.tail.stripe_ref.offset = 0;
354 self.tail.stripe_id = 0;
355
356 let segment_path =
358 self.segment_to_path(self.tail.stripe_ref.segment_id);
359 let segment_file = OpenOptions::new()
360 .read(true)
361 .write(true)
362 .create_new(true)
363 .open(&segment_path)?;
364 self.file_cache
365 .insert(self.tail.stripe_ref.segment_id, segment_file);
366 }
367
368 let segment_file = self
370 .file_cache
371 .get_mut(&self.tail.stripe_ref.segment_id)
372 .unwrap();
373 let write_size = segment_file.write(stripe)?;
374 assert_eq!(write_size, stripe.len());
376 let offset = self.tail.stripe_ref.offset + write_size as u64;
377 assert_eq!(segment_file.seek(SeekFrom::End(0)).unwrap(), offset);
378 segment_file.flush()?;
379
380 let appended_stripe = self.tail;
381 self.tail.stripe_id += 1;
383 self.tail.stripe_ref.offset = offset;
384 Ok((appended_stripe, self.tail))
385 }
386
387 pub fn get_stripe(
388 &mut self, stripe_ref: &StripeReference,
389 ) -> Result<Vec<u8>, Error> {
390 if !self.file_cache.contains_key(&stripe_ref.segment_id) {
391 let segment_path = self.segment_to_path(stripe_ref.segment_id);
393 let segment_file = OpenOptions::new()
394 .read(true)
395 .write(true)
396 .open(&segment_path)?;
397 self.file_cache.insert(stripe_ref.segment_id, segment_file);
398 }
399
400 let segment_file =
401 self.file_cache.get_mut(&stripe_ref.segment_id).unwrap();
402 let offset = segment_file.seek(SeekFrom::Start(stripe_ref.offset))?;
403 assert_eq!(offset, stripe_ref.offset);
404 let mut stripe: Vec<u8> = Vec::new();
405 stripe.resize(4, 0);
406 let read_size = segment_file.read(&mut stripe[0..4])?;
407 assert_eq!(read_size, 4);
408 let payload_size = LittleEndian::read_u32(&stripe[0..4]) as usize;
409 if payload_size != 0 {
410 stripe.resize(payload_size + 4, 0);
411 let read_size =
412 segment_file.read(&mut stripe[4..4 + payload_size])?;
413 assert_eq!(read_size, payload_size);
414 }
415 Ok(stripe)
416 }
417
418 pub fn check_trim(&self, segment_id: u64) -> Option<StripeInfo> {
419 if segment_id >= self.head.stripe_ref.segment_id
420 && segment_id <= self.tail.stripe_ref.segment_id
421 {
422 Some(StripeInfo {
423 stripe_ref: StripeReference {
424 segment_id,
425 offset: 0,
426 },
427 stripe_id: 0,
428 })
429 } else {
430 None
431 }
432 }
433
434 pub fn trim(&mut self, new_head: &StripeInfo) {
435 let old_head = self.head;
436 self.head = *new_head;
437
438 for segment in
439 old_head.stripe_ref.segment_id..self.head.stripe_ref.segment_id
440 {
441 self.file_cache.remove(&segment);
442 let segment_path = self.segment_to_path(segment);
443 if segment_path.exists() {
444 std::fs::remove_file(&segment_path).ok();
445 }
446 }
447 }
448}
449
#[cfg(test)]
mod tests {
    use super::{
        LogDevice, LogDeviceManager, StripeInfo, StripeReference,
        LOG_DEVICE_DIR_PREFIX, NUM_OF_STRIPES_PER_SEGMENT,
    };
    use byteorder::{ByteOrder, LittleEndian};
    use rand::Rng;
    use std::path::{Path, PathBuf};

    /// Removes any leftover state at `name` and recreates it as an empty dir.
    fn fresh_dir(name: &str) -> PathBuf {
        let path_dir = PathBuf::from(name);
        std::fs::remove_dir_all(&path_dir).ok();
        std::fs::create_dir_all(&path_dir).ok();
        path_dir
    }

    /// Expected path of segment `segment_id` of device `device_id`.
    fn segment_path(root: &Path, device_id: usize, segment_id: u64) -> PathBuf {
        root.join(format!("{}{}", LOG_DEVICE_DIR_PREFIX, device_id))
            .join(format!("segment_{}", segment_id))
    }

    /// Appends stripes `start..end` of random size, each filled with its
    /// index byte, recording the data and returned references.
    fn gen_random_and_append(
        log_device: &LogDevice, stripes: &mut Vec<Vec<u8>>,
        stripe_refs: &mut Vec<StripeReference>, start: usize, end: usize,
    ) {
        let mut rng = rand::rng();
        for i in start..end {
            let stripe_size = rng.random_range(4..1024 * 64);
            let mut stripe = vec![i as u8; stripe_size];
            let payload_size = (stripe_size - 4) as u32;
            LittleEndian::write_u32(&mut stripe[0..4], payload_size);
            let stripe_info = log_device.append_stripe(&stripe).unwrap();
            stripes.push(stripe);
            stripe_refs.push(stripe_info.stripe_ref);
        }
    }

    /// Reads back stripes `start..end` and checks each matches what was written.
    fn read_and_check(
        log_device: &LogDevice, stripes: &[Vec<u8>],
        stripe_refs: &[StripeReference], start: usize, end: usize,
    ) {
        let expected = stripes[start..end].iter().zip(&stripe_refs[start..end]);
        for (k, (stripe, stripe_ref)) in expected.enumerate() {
            let read_stripe = log_device
                .get_stripe(stripe_ref)
                .expect("Failed to read stripe");
            assert!(
                read_stripe == *stripe,
                "stripe {} does not match",
                start + k
            );
        }
    }

    fn create_and_append(
        stripes: &mut Vec<Vec<u8>>, stripe_refs: &mut Vec<StripeReference>,
    ) {
        let path_dir = fresh_dir("./ldm_open");
        let log_device_manager = LogDeviceManager::new(path_dir);
        assert_eq!(log_device_manager.get_device_num(), 0);
        let device_id = log_device_manager.create_new_device();
        assert_eq!(log_device_manager.get_device_num(), 1);
        assert_eq!(log_device_manager.get_device_num_from_db(), 1);
        let log_device = log_device_manager.get_device(device_id).unwrap();

        gen_random_and_append(&log_device, stripes, stripe_refs, 0, 10);
        read_and_check(&log_device, stripes, stripe_refs, 0, 10);
    }

    fn open_and_append_and_read(
        stripes: &mut Vec<Vec<u8>>, stripe_refs: &mut Vec<StripeReference>,
    ) {
        let path_dir = PathBuf::from("./ldm_open");
        {
            let log_device_manager = LogDeviceManager::new(path_dir.clone());
            assert_eq!(log_device_manager.get_device_num(), 1);
            let log_device = log_device_manager.get_device(0).unwrap();

            gen_random_and_append(&log_device, stripes, stripe_refs, 10, 20);
            read_and_check(&log_device, stripes, stripe_refs, 0, 20);
        }
        // The manager (and its DB) is dropped before its files are removed.
        std::fs::remove_dir_all(&path_dir).ok();
    }

    #[test]
    fn test_open_log_device() {
        let mut stripes = Vec::new();
        let mut stripe_refs = Vec::new();

        create_and_append(&mut stripes, &mut stripe_refs);
        open_and_append_and_read(&mut stripes, &mut stripe_refs);
    }

    #[test]
    fn test_append_log_device() {
        let path_dir = fresh_dir("./ldm_append");
        {
            let log_device_manager = LogDeviceManager::new(path_dir.clone());
            assert_eq!(log_device_manager.get_device_num(), 0);
            let device_id = log_device_manager.create_new_device();
            assert_eq!(log_device_manager.get_device_num(), 1);
            let log_device = log_device_manager.get_device(device_id).unwrap();
            let mut stripes = Vec::new();
            let mut stripe_refs = Vec::new();

            gen_random_and_append(
                &log_device,
                &mut stripes,
                &mut stripe_refs,
                0,
                10,
            );
            read_and_check(&log_device, &stripes, &stripe_refs, 0, 10);
        }
        std::fs::remove_dir_all(&path_dir).ok();
    }

    #[test]
    fn test_trim_log_device() {
        let path_dir = fresh_dir("./ldm_trim");
        let stripes_per_segment = NUM_OF_STRIPES_PER_SEGMENT as usize;
        {
            let log_device_manager = LogDeviceManager::new(path_dir.clone());
            assert_eq!(log_device_manager.get_device_num(), 0);
            let device_id = log_device_manager.create_new_device();
            assert_eq!(log_device_manager.get_device_num(), 1);
            let log_device = log_device_manager.get_device(device_id).unwrap();
            let mut stripes = Vec::new();
            let mut stripe_refs = Vec::new();

            gen_random_and_append(
                &log_device,
                &mut stripes,
                &mut stripe_refs,
                0,
                4 * stripes_per_segment,
            );

            let segments: Vec<PathBuf> = (0..4)
                .map(|segment_id| segment_path(&path_dir, 0, segment_id))
                .collect();
            assert!(segments.iter().all(|segment| segment.exists()));

            let strip_info = StripeInfo {
                stripe_ref: StripeReference {
                    segment_id: 2,
                    offset: 0,
                },
                stripe_id: 0,
            };
            log_device.trim(&strip_info);

            assert!(!segments[0].exists());
            assert!(!segments[1].exists());
            assert!(segments[2].exists());
            assert!(segments[3].exists());

            read_and_check(
                &log_device,
                &stripes,
                &stripe_refs,
                2 * stripes_per_segment,
                4 * stripes_per_segment,
            );
        }
        std::fs::remove_dir_all(&path_dir).ok();
    }

    #[test]
    fn test_create_log_device() {
        let path_dir = fresh_dir("./ldm_create");
        {
            let log_device_manager = LogDeviceManager::new(path_dir.clone());
            assert_eq!(log_device_manager.get_device_num(), 0);
            assert_eq!(log_device_manager.get_device_num_from_db(), 0);
            for expected in 1..=4 {
                log_device_manager.create_new_device();
                assert_eq!(log_device_manager.get_device_num(), expected);
                assert_eq!(
                    log_device_manager.get_device_num_from_db(),
                    expected
                );
            }

            for device_id in 0..4 {
                assert!(segment_path(&path_dir, device_id, 0).exists());
            }
        }
        std::fs::remove_dir_all(&path_dir).ok();
    }
}