cfx_storage/storage_db/
delta_db_manager.rspub type DeltaDbOwnedReadTraitObj<'db> =
    dyn 'db + KeyValueDbTraitOwnedRead<ValueType = Box<[u8]>>;
pub type DeltaDbTransactionTraitObj =
    dyn KeyValueDbTransactionTrait<ValueType = Box<[u8]>>;
pub trait DeltaDbTrait:
    KeyValueDbTypes<ValueType = Box<[u8]>>
    + KeyValueDbToOwnedReadTrait
    + KeyValueDbTraitRead
    + KeyValueDbTraitTransactionalDyn
    + MallocSizeOf
    + Send
    + Sync
{
}
pub trait DeltaDbManagerTrait {
    type DeltaDb: DeltaDbTrait;
    fn get_delta_db_dir(&self) -> &Path;
    fn get_delta_db_name(&self, snapshot_epoch_id: &EpochId) -> String;
    fn get_delta_db_path(&self, delta_db_name: &str) -> PathBuf;
    fn scan_persist_state(
        &self, snapshot_info_map: &HashMap<EpochId, SnapshotInfo>,
    ) -> Result<(Vec<EpochId>, HashMap<EpochId, Self::DeltaDb>)> {
        let mut possible_delta_db_paths = HashMap::new();
        for (snapshot_epoch_id, snapshot_info) in snapshot_info_map {
            possible_delta_db_paths.insert(
                self.get_delta_db_name(snapshot_epoch_id).into_bytes(),
                snapshot_epoch_id.clone(),
            );
            possible_delta_db_paths.insert(
                self.get_delta_db_name(&snapshot_info.parent_snapshot_epoch_id)
                    .into_bytes(),
                snapshot_info.parent_snapshot_epoch_id.clone(),
            );
        }
        let mut delta_mpts = HashMap::new();
        for entry in fs::read_dir(self.get_delta_db_dir())? {
            let entry = entry?;
            let path = entry.path();
            let dir_name = path.as_path().file_name().unwrap().to_str();
            if dir_name.is_none() {
                error!(
                    "Unexpected delta db path {}, deleted.",
                    entry.path().display()
                );
                fs::remove_dir_all(entry.path())?;
                continue;
            }
            let dir_name = dir_name.unwrap();
            if !possible_delta_db_paths.contains_key(dir_name.as_bytes()) {
                error!(
                    "Unexpected delta db path {}, deleted.",
                    entry.path().display()
                );
                fs::remove_dir_all(entry.path())?;
            } else {
                let snapshot_epoch_id = possible_delta_db_paths
                    .remove(dir_name.as_bytes())
                    .unwrap();
                delta_mpts.insert(
                    snapshot_epoch_id,
                    self.get_delta_db(
                        &self.get_delta_db_name(&snapshot_epoch_id),
                    )?
                    .unwrap(),
                );
            }
        }
        let mut missing_delta_dbs = vec![];
        for (snapshot_epoch_id, snapshot_info) in snapshot_info_map {
            if snapshot_info.snapshot_info_kept_to_provide_sync
                == SnapshotKeptToProvideSyncStatus::No
            {
                if !delta_mpts.contains_key(snapshot_epoch_id) {
                    missing_delta_dbs.push(snapshot_epoch_id.clone())
                }
            }
        }
        Ok((missing_delta_dbs, delta_mpts))
    }
    fn new_empty_delta_db(&self, delta_db_name: &str) -> Result<Self::DeltaDb>;
    fn get_delta_db(
        &self, delta_db_name: &str,
    ) -> Result<Option<Self::DeltaDb>>;
    fn destroy_delta_db(&self, delta_db_name: &str) -> Result<()>;
}
use crate::{
    impls::errors::*,
    storage_db::{
        key_value_db::*, SnapshotInfo, SnapshotKeptToProvideSyncStatus,
    },
};
use malloc_size_of::MallocSizeOf;
use primitives::EpochId;
use std::{
    collections::HashMap,
    fs,
    path::{Path, PathBuf},
};