1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Copyright 2019 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/

// The trait for database manager of Delta MPT.

pub type DeltaDbOwnedReadTraitObj<'db> =
    dyn 'db + KeyValueDbTraitOwnedRead<ValueType = Box<[u8]>>;

pub type DeltaDbTransactionTraitObj =
    dyn KeyValueDbTransactionTrait<ValueType = Box<[u8]>>;

pub trait DeltaDbTrait:
    KeyValueDbTypes<ValueType = Box<[u8]>>
    + KeyValueDbToOwnedReadTrait
    + KeyValueDbTraitRead
    + KeyValueDbTraitTransactionalDyn
    + MallocSizeOf
    + Send
    + Sync
{
}

pub trait DeltaDbManagerTrait {
    type DeltaDb: DeltaDbTrait;

    fn get_delta_db_dir(&self) -> &Path;
    fn get_delta_db_name(&self, snapshot_epoch_id: &EpochId) -> String;
    fn get_delta_db_path(&self, delta_db_name: &str) -> PathBuf;

    // Scan delta db dir, remove extra files and return the list of missing
    // snapshots for which the delta db is missing.
    fn scan_persist_state(
        &self, snapshot_info_map: &HashMap<EpochId, SnapshotInfo>,
    ) -> Result<(Vec<EpochId>, HashMap<EpochId, Self::DeltaDb>)> {
        let mut possible_delta_db_paths = HashMap::new();
        for (snapshot_epoch_id, snapshot_info) in snapshot_info_map {
            // Delta MPT
            possible_delta_db_paths.insert(
                self.get_delta_db_name(snapshot_epoch_id).into_bytes(),
                snapshot_epoch_id.clone(),
            );
            // Intermediate Delta MPT
            possible_delta_db_paths.insert(
                self.get_delta_db_name(&snapshot_info.parent_snapshot_epoch_id)
                    .into_bytes(),
                snapshot_info.parent_snapshot_epoch_id.clone(),
            );
        }
        let mut delta_mpts = HashMap::new();

        // Scan the delta db dir. Remove extra files, and return the list of
        // snapshots for which the delta db is missing.
        for entry in fs::read_dir(self.get_delta_db_dir())? {
            let entry = entry?;
            let path = entry.path();
            let dir_name = path.as_path().file_name().unwrap().to_str();
            if dir_name.is_none() {
                error!(
                    "Unexpected delta db path {}, deleted.",
                    entry.path().display()
                );
                fs::remove_dir_all(entry.path())?;
                continue;
            }
            let dir_name = dir_name.unwrap();
            if !possible_delta_db_paths.contains_key(dir_name.as_bytes()) {
                error!(
                    "Unexpected delta db path {}, deleted.",
                    entry.path().display()
                );
                fs::remove_dir_all(entry.path())?;
            } else {
                let snapshot_epoch_id = possible_delta_db_paths
                    .remove(dir_name.as_bytes())
                    .unwrap();
                delta_mpts.insert(
                    snapshot_epoch_id,
                    self.get_delta_db(
                        &self.get_delta_db_name(&snapshot_epoch_id),
                    )?
                    .unwrap(),
                );
            }
        }

        let mut missing_delta_dbs = vec![];
        for (snapshot_epoch_id, snapshot_info) in snapshot_info_map {
            // Skip if the snapshot doesn't exist.
            if snapshot_info.snapshot_info_kept_to_provide_sync
                == SnapshotKeptToProvideSyncStatus::No
            {
                if !delta_mpts.contains_key(snapshot_epoch_id) {
                    missing_delta_dbs.push(snapshot_epoch_id.clone())
                }
            }
        }

        Ok((missing_delta_dbs, delta_mpts))
    }

    fn new_empty_delta_db(&self, delta_db_name: &str) -> Result<Self::DeltaDb>;

    fn get_delta_db(
        &self, delta_db_name: &str,
    ) -> Result<Option<Self::DeltaDb>>;

    /// Destroy a Delta DB. Keep in mind that this method is irrecoverable.
    /// Ref-counting is necessary for Delta1 MPT in Snapshot.
    fn destroy_delta_db(&self, delta_db_name: &str) -> Result<()>;
}

use crate::{
    impls::errors::*,
    storage_db::{
        key_value_db::*, SnapshotInfo, SnapshotKeptToProvideSyncStatus,
    },
};
use malloc_size_of::MallocSizeOf;
use primitives::EpochId;
use std::{
    collections::HashMap,
    fs,
    path::{Path, PathBuf},
};