cfx_storage/storage_db/
snapshot_db_manager.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
/// Result of `SnapshotDbManagerTrait::scan_persist_state`: what was found (and
/// what was cleaned up) while scanning the on-disk snapshot directories.
#[derive(Debug)]
pub struct SnapshotPersistState {
    /// Snapshots that are expected on disk (their info is not `InfoOnly`) but
    /// for which no directory was found in the snapshot dir.
    pub missing_snapshots: Vec<EpochId>,
    /// Epoch id of the expected snapshot found on disk with the greatest
    /// height; stays `NULL_EPOCH` when none was found.
    pub max_epoch_id: EpochId,
    /// Height belonging to `max_epoch_id`; stays `0` when none was found.
    pub max_epoch_height: u64,
    /// Epoch id of the temporary KV snapshot directory that was encountered
    /// (and removed) during the scan, if any.
    pub temp_snapshot_db_existing: Option<EpochId>,
    /// Epoch ids parsed from the names of unexpected, non-temporary snapshot
    /// directories that were removed during the scan.
    pub removed_snapshots: HashSet<EpochId>,
    /// Greatest height among the expected snapshots found on disk that report
    /// `is_mpt_table_in_current_db()`; `None` when no such snapshot exists.
    pub max_snapshot_epoch_height_has_mpt: Option<u64>,
}
14
/// Write-side interface of a snapshot database.
///
/// The key/value types are supplied by the `KeyValueDbTypes` supertrait.
pub trait SnapshotDbWriteableTrait: KeyValueDbTypes {
    /// Handle type returned by `open_snapshot_mpt_owned`; it must implement
    /// `SnapshotMptTraitRw`.
    type SnapshotDbBorrowMutType: SnapshotMptTraitRw;

    /// Starts a transaction.
    // NOTE(review): nesting and isolation semantics are defined by the
    // implementor and are not visible from this trait.
    fn start_transaction(&mut self) -> Result<()>;

    /// Commits the transaction.
    // NOTE(review): presumably pairs with `start_transaction` -- confirm
    // against the implementors.
    fn commit_transaction(&mut self) -> Result<()>;

    /// Stores `value` under `key`.
    // NOTE(review): the meaning of the nested `Option<Option<_>>` in the
    // return type is not visible here; presumably it carries information
    // about a previously stored value -- confirm against the implementors.
    fn put_kv(
        &mut self, key: &[u8], value: &<Self::ValueType as DbValueType>::Type,
    ) -> Result<Option<Option<Self::ValueType>>>;

    /// Opens the snapshot MPT and returns it as an owned
    /// `SnapshotDbBorrowMutType` handle.
    fn open_snapshot_mpt_owned(
        &mut self,
    ) -> Result<Self::SnapshotDbBorrowMutType>;
}
30
31/// The trait for database manager of Snapshot.
32pub trait SnapshotDbManagerTrait {
33    type SnapshotDb: SnapshotDbTrait<ValueType = Box<[u8]>>;
34    type SnapshotDbWrite: SnapshotDbWriteableTrait<ValueType = Box<[u8]>>;
35
36    fn get_snapshot_dir(&self) -> &Path;
37    fn get_snapshot_db_name(&self, snapshot_epoch_id: &EpochId) -> String;
38    fn get_snapshot_db_path(&self, snapshot_epoch_id: &EpochId) -> PathBuf;
39    fn get_mpt_snapshot_dir(&self) -> &Path;
40    fn get_latest_mpt_snapshot_db_name(&self) -> String;
41    fn recovery_latest_mpt_snapshot_from_checkpoint(
42        &self, snapshot_epoch_id: &EpochId,
43        before_era_pivot_hash: Option<EpochId>,
44    ) -> Result<()>;
45    fn create_mpt_snapshot_from_latest(
46        &self, new_snapshot_epoch_id: &EpochId,
47    ) -> Result<()>;
48    fn get_epoch_id_from_snapshot_db_name(
49        &self, snapshot_db_name: &str,
50    ) -> Result<EpochId>;
51    fn try_get_new_snapshot_epoch_from_temp_path(
52        &self, dir_name: &str,
53    ) -> Option<EpochId>;
54    fn try_get_new_snapshot_epoch_from_mpt_temp_path(
55        &self, dir_name: &str,
56    ) -> Option<EpochId>;
57
58    // Scan snapshot dir, remove extra files and return the list of missing
59    // snapshots.
60    fn scan_persist_state(
61        &self, snapshot_info_map: &HashMap<EpochId, SnapshotInfo>,
62    ) -> Result<SnapshotPersistState> {
63        let mut missing_snapshots = HashMap::new();
64        let mut all_snapshots = HashMap::new();
65        for (snapshot_epoch_id, snapshot_info) in snapshot_info_map {
66            all_snapshots.insert(
67                self.get_snapshot_db_name(snapshot_epoch_id).into_bytes(),
68                snapshot_epoch_id.clone(),
69            );
70            // If the snapshot info is kept to provide sync, we allow the
71            // snapshot itself to be missing, because a snapshot of
72            // snapshot_epoch_id's ancestor is kept to provide sync. We need to
73            // keep this snapshot info to know the parental relationship.
74            if snapshot_info.snapshot_info_kept_to_provide_sync
75                != SnapshotKeptToProvideSyncStatus::InfoOnly
76            {
77                missing_snapshots.insert(
78                    self.get_snapshot_db_name(snapshot_epoch_id).into_bytes(),
79                    (snapshot_epoch_id.clone(), snapshot_info.height),
80                );
81            }
82        }
83
84        // Scan the snapshot dir. Remove extra files, and return the list of
85        // missing snapshots.
86        let mut max_epoch_height = 0;
87        let mut max_epoch_id = NULL_EPOCH;
88        let mut temp_snapshot_db_existing = None;
89        let mut removed_snapshots = HashSet::new();
90        let mut max_snapshot_epoch_height_has_mpt = None;
91
92        for entry in fs::read_dir(self.get_snapshot_dir())? {
93            let entry = entry?;
94            let path = entry.path();
95            let dir_name = path.as_path().file_name().unwrap().to_str();
96            if dir_name.is_none() {
97                error!(
98                    "Unexpected snapshot path {}, deleted.",
99                    entry.path().display()
100                );
101                fs::remove_dir_all(entry.path())?;
102                continue;
103            }
104            let dir_name = dir_name.unwrap();
105            if !all_snapshots.contains_key(dir_name.as_bytes()) {
106                error!(
107                    "Unexpected snapshot path {}, deleted.",
108                    entry.path().display()
109                );
110
111                match self.try_get_new_snapshot_epoch_from_temp_path(dir_name) {
112                    Some(e) => {
113                        info!("remove temp kv snapshot {}", e);
114                        if temp_snapshot_db_existing.is_none() {
115                            temp_snapshot_db_existing = Some(e);
116                        } else {
117                            panic!("there are more than one temp kv snapshot");
118                        }
119                    }
120                    None => {
121                        if let Ok(epoch_id) =
122                            self.get_epoch_id_from_snapshot_db_name(dir_name)
123                        {
124                            removed_snapshots.insert(epoch_id);
125                        }
126                    }
127                }
128
129                fs::remove_dir_all(entry.path())?;
130            } else {
131                if let Some((epoch, height)) =
132                    missing_snapshots.remove(dir_name.as_bytes())
133                {
134                    if height > max_epoch_height {
135                        max_epoch_height = height;
136                        max_epoch_id = epoch;
137                    }
138
139                    if self
140                        .get_snapshot_by_epoch_id(&epoch, false, false)?
141                        .expect("should be open snapshot")
142                        .is_mpt_table_in_current_db()
143                    {
144                        if max_snapshot_epoch_height_has_mpt.is_none()
145                            || *max_snapshot_epoch_height_has_mpt
146                                .as_ref()
147                                .unwrap()
148                                < height
149                        {
150                            max_snapshot_epoch_height_has_mpt = Some(height);
151                        }
152                    }
153                }
154            }
155        }
156
157        // scan mpt directory, and delete unnecessary snapshots
158        for entry in fs::read_dir(self.get_mpt_snapshot_dir())? {
159            let entry = entry?;
160            let path = entry.path();
161            let dir_name = path.as_path().file_name().unwrap().to_str();
162
163            if dir_name.is_none() {
164                error!(
165                    "Unexpected MPT snapshot path {}, deleted.",
166                    entry.path().display()
167                );
168                fs::remove_dir_all(entry.path())?;
169                continue;
170            }
171
172            let dir_name = dir_name.unwrap();
173            if !all_snapshots.contains_key(dir_name.as_bytes())
174                && !self.get_latest_mpt_snapshot_db_name().eq(dir_name)
175            {
176                error!(
177                    "Unexpected MPT snapshot path {}, deleted.",
178                    entry.path().display()
179                );
180                fs::remove_dir_all(entry.path())?;
181
182                if let Some(snapshot_id) =
183                    self.try_get_new_snapshot_epoch_from_mpt_temp_path(dir_name)
184                {
185                    if snapshot_id == max_epoch_id {
186                        self.create_mpt_snapshot_from_latest(&snapshot_id)?;
187                    }
188                }
189            }
190        }
191
192        info!("max epoch height: {}, temp snapshot db existing: {:?}, removed snapshots: {:?}, max snapshot epoch height has mpt: {:?}", max_epoch_height, temp_snapshot_db_existing, removed_snapshots, max_snapshot_epoch_height_has_mpt);
193        Ok(SnapshotPersistState {
194            missing_snapshots: missing_snapshots
195                .into_iter()
196                .map(|(_path_bytes, (snapshot_epoch_id, _))| snapshot_epoch_id)
197                .collect(),
198            max_epoch_id,
199            max_epoch_height,
200            temp_snapshot_db_existing,
201            removed_snapshots,
202            max_snapshot_epoch_height_has_mpt,
203        })
204    }
205
206    fn new_snapshot_by_merging<'m>(
207        &self, old_snapshot_epoch_id: &EpochId, snapshot_epoch_id: EpochId,
208        delta_mpt: DeltaMptIterator, in_progress_snapshot_info: SnapshotInfo,
209        snapshot_info_map: &'m RwLock<PersistedSnapshotInfoMap>,
210        new_epoch_height: u64, recover_mpt_with_kv_snapshot_exist: bool,
211    ) -> Result<(RwLockWriteGuard<'m, PersistedSnapshotInfoMap>, SnapshotInfo)>;
212    fn get_snapshot_by_epoch_id(
213        &self, epoch_id: &EpochId, try_open: bool, open_mpt_snapshot: bool,
214    ) -> Result<Option<Self::SnapshotDb>>;
215    fn destroy_snapshot(&self, snapshot_epoch_id: &EpochId) -> Result<()>;
216
217    fn new_temp_snapshot_for_full_sync(
218        &self, snapshot_epoch_id: &EpochId, merkle_root: &MerkleHash,
219        new_epoch_height: u64,
220    ) -> Result<Self::SnapshotDbWrite>;
221    fn finalize_full_sync_snapshot<'m>(
222        &self, snapshot_epoch_id: &EpochId, merkle_root: &MerkleHash,
223        snapshot_info_map_rwlock: &'m RwLock<PersistedSnapshotInfoMap>,
224    ) -> Result<RwLockWriteGuard<'m, PersistedSnapshotInfoMap>>;
225}
226
227use super::{
228    super::impls::{delta_mpt::DeltaMptIterator, errors::*},
229    snapshot_db::*,
230    DbValueType, KeyValueDbTypes, SnapshotMptTraitRw,
231};
232use crate::impls::storage_manager::PersistedSnapshotInfoMap;
233use parking_lot::{RwLock, RwLockWriteGuard};
234use primitives::{EpochId, MerkleHash, NULL_EPOCH};
235use std::{
236    collections::{HashMap, HashSet},
237    fs,
238    path::{Path, PathBuf},
239};