pub type DeltaDbOwnedReadTraitObj<'db> =
dyn 'db + KeyValueDbTraitOwnedRead<ValueType = Box<[u8]>>;
pub type DeltaDbTransactionTraitObj =
dyn KeyValueDbTransactionTrait<ValueType = Box<[u8]>>;
pub trait DeltaDbTrait:
KeyValueDbTypes<ValueType = Box<[u8]>>
+ KeyValueDbToOwnedReadTrait
+ KeyValueDbTraitRead
+ KeyValueDbTraitTransactionalDyn
+ MallocSizeOf
+ Send
+ Sync
{
}
pub trait DeltaDbManagerTrait {
type DeltaDb: DeltaDbTrait;
fn get_delta_db_dir(&self) -> &Path;
fn get_delta_db_name(&self, snapshot_epoch_id: &EpochId) -> String;
fn get_delta_db_path(&self, delta_db_name: &str) -> PathBuf;
fn scan_persist_state(
&self, snapshot_info_map: &HashMap<EpochId, SnapshotInfo>,
) -> Result<(Vec<EpochId>, HashMap<EpochId, Self::DeltaDb>)> {
let mut possible_delta_db_paths = HashMap::new();
for (snapshot_epoch_id, snapshot_info) in snapshot_info_map {
possible_delta_db_paths.insert(
self.get_delta_db_name(snapshot_epoch_id).into_bytes(),
snapshot_epoch_id.clone(),
);
possible_delta_db_paths.insert(
self.get_delta_db_name(&snapshot_info.parent_snapshot_epoch_id)
.into_bytes(),
snapshot_info.parent_snapshot_epoch_id.clone(),
);
}
let mut delta_mpts = HashMap::new();
for entry in fs::read_dir(self.get_delta_db_dir())? {
let entry = entry?;
let path = entry.path();
let dir_name = path.as_path().file_name().unwrap().to_str();
if dir_name.is_none() {
error!(
"Unexpected delta db path {}, deleted.",
entry.path().display()
);
fs::remove_dir_all(entry.path())?;
continue;
}
let dir_name = dir_name.unwrap();
if !possible_delta_db_paths.contains_key(dir_name.as_bytes()) {
error!(
"Unexpected delta db path {}, deleted.",
entry.path().display()
);
fs::remove_dir_all(entry.path())?;
} else {
let snapshot_epoch_id = possible_delta_db_paths
.remove(dir_name.as_bytes())
.unwrap();
delta_mpts.insert(
snapshot_epoch_id,
self.get_delta_db(
&self.get_delta_db_name(&snapshot_epoch_id),
)?
.unwrap(),
);
}
}
let mut missing_delta_dbs = vec![];
for (snapshot_epoch_id, snapshot_info) in snapshot_info_map {
if snapshot_info.snapshot_info_kept_to_provide_sync
== SnapshotKeptToProvideSyncStatus::No
{
if !delta_mpts.contains_key(snapshot_epoch_id) {
missing_delta_dbs.push(snapshot_epoch_id.clone())
}
}
}
Ok((missing_delta_dbs, delta_mpts))
}
fn new_empty_delta_db(&self, delta_db_name: &str) -> Result<Self::DeltaDb>;
fn get_delta_db(
&self, delta_db_name: &str,
) -> Result<Option<Self::DeltaDb>>;
fn destroy_delta_db(&self, delta_db_name: &str) -> Result<()>;
}
use crate::{
impls::errors::*,
storage_db::{
key_value_db::*, SnapshotInfo, SnapshotKeptToProvideSyncStatus,
},
};
use malloc_size_of::MallocSizeOf;
use primitives::EpochId;
use std::{
collections::HashMap,
fs,
path::{Path, PathBuf},
};