1#[derive(Debug)]
6pub struct SnapshotPersistState {
7 pub missing_snapshots: Vec<EpochId>,
8 pub max_epoch_id: EpochId,
9 pub max_epoch_height: u64,
10 pub temp_snapshot_db_existing: Option<EpochId>,
11 pub removed_snapshots: HashSet<EpochId>,
12 pub max_snapshot_epoch_height_has_mpt: Option<u64>,
13}
14
15pub trait SnapshotDbWriteableTrait: KeyValueDbTypes {
16 type SnapshotDbBorrowMutType: SnapshotMptTraitRw;
17
18 fn start_transaction(&mut self) -> Result<()>;
19
20 fn commit_transaction(&mut self) -> Result<()>;
21
22 fn put_kv(
23 &mut self, key: &[u8], value: &<Self::ValueType as DbValueType>::Type,
24 ) -> Result<Option<Option<Self::ValueType>>>;
25
26 fn open_snapshot_mpt_owned(
27 &mut self,
28 ) -> Result<Self::SnapshotDbBorrowMutType>;
29}
30
31pub trait SnapshotDbManagerTrait {
33 type SnapshotDb: SnapshotDbTrait<ValueType = Box<[u8]>>;
34 type SnapshotDbWrite: SnapshotDbWriteableTrait<ValueType = Box<[u8]>>;
35
36 fn get_snapshot_dir(&self) -> &Path;
37 fn get_snapshot_db_name(&self, snapshot_epoch_id: &EpochId) -> String;
38 fn get_snapshot_db_path(&self, snapshot_epoch_id: &EpochId) -> PathBuf;
39 fn get_mpt_snapshot_dir(&self) -> &Path;
40 fn get_latest_mpt_snapshot_db_name(&self) -> String;
41 fn recovery_latest_mpt_snapshot_from_checkpoint(
42 &self, snapshot_epoch_id: &EpochId,
43 before_era_pivot_hash: Option<EpochId>,
44 ) -> Result<()>;
45 fn create_mpt_snapshot_from_latest(
46 &self, new_snapshot_epoch_id: &EpochId,
47 ) -> Result<()>;
48 fn get_epoch_id_from_snapshot_db_name(
49 &self, snapshot_db_name: &str,
50 ) -> Result<EpochId>;
51 fn try_get_new_snapshot_epoch_from_temp_path(
52 &self, dir_name: &str,
53 ) -> Option<EpochId>;
54 fn try_get_new_snapshot_epoch_from_mpt_temp_path(
55 &self, dir_name: &str,
56 ) -> Option<EpochId>;
57
58 fn scan_persist_state(
61 &self, snapshot_info_map: &HashMap<EpochId, SnapshotInfo>,
62 ) -> Result<SnapshotPersistState> {
63 let mut missing_snapshots = HashMap::new();
64 let mut all_snapshots = HashMap::new();
65 for (snapshot_epoch_id, snapshot_info) in snapshot_info_map {
66 all_snapshots.insert(
67 self.get_snapshot_db_name(snapshot_epoch_id).into_bytes(),
68 snapshot_epoch_id.clone(),
69 );
70 if snapshot_info.snapshot_info_kept_to_provide_sync
75 != SnapshotKeptToProvideSyncStatus::InfoOnly
76 {
77 missing_snapshots.insert(
78 self.get_snapshot_db_name(snapshot_epoch_id).into_bytes(),
79 (snapshot_epoch_id.clone(), snapshot_info.height),
80 );
81 }
82 }
83
84 let mut max_epoch_height = 0;
87 let mut max_epoch_id = NULL_EPOCH;
88 let mut temp_snapshot_db_existing = None;
89 let mut removed_snapshots = HashSet::new();
90 let mut max_snapshot_epoch_height_has_mpt = None;
91
92 for entry in fs::read_dir(self.get_snapshot_dir())? {
93 let entry = entry?;
94 let path = entry.path();
95 let dir_name = path.as_path().file_name().unwrap().to_str();
96 if dir_name.is_none() {
97 error!(
98 "Unexpected snapshot path {}, deleted.",
99 entry.path().display()
100 );
101 fs::remove_dir_all(entry.path())?;
102 continue;
103 }
104 let dir_name = dir_name.unwrap();
105 if !all_snapshots.contains_key(dir_name.as_bytes()) {
106 error!(
107 "Unexpected snapshot path {}, deleted.",
108 entry.path().display()
109 );
110
111 match self.try_get_new_snapshot_epoch_from_temp_path(dir_name) {
112 Some(e) => {
113 info!("remove temp kv snapshot {}", e);
114 if temp_snapshot_db_existing.is_none() {
115 temp_snapshot_db_existing = Some(e);
116 } else {
117 panic!("there are more than one temp kv snapshot");
118 }
119 }
120 None => {
121 if let Ok(epoch_id) =
122 self.get_epoch_id_from_snapshot_db_name(dir_name)
123 {
124 removed_snapshots.insert(epoch_id);
125 }
126 }
127 }
128
129 fs::remove_dir_all(entry.path())?;
130 } else {
131 if let Some((epoch, height)) =
132 missing_snapshots.remove(dir_name.as_bytes())
133 {
134 if height > max_epoch_height {
135 max_epoch_height = height;
136 max_epoch_id = epoch;
137 }
138
139 if self
140 .get_snapshot_by_epoch_id(&epoch, false, false)?
141 .expect("should be open snapshot")
142 .is_mpt_table_in_current_db()
143 {
144 if max_snapshot_epoch_height_has_mpt.is_none()
145 || *max_snapshot_epoch_height_has_mpt
146 .as_ref()
147 .unwrap()
148 < height
149 {
150 max_snapshot_epoch_height_has_mpt = Some(height);
151 }
152 }
153 }
154 }
155 }
156
157 for entry in fs::read_dir(self.get_mpt_snapshot_dir())? {
159 let entry = entry?;
160 let path = entry.path();
161 let dir_name = path.as_path().file_name().unwrap().to_str();
162
163 if dir_name.is_none() {
164 error!(
165 "Unexpected MPT snapshot path {}, deleted.",
166 entry.path().display()
167 );
168 fs::remove_dir_all(entry.path())?;
169 continue;
170 }
171
172 let dir_name = dir_name.unwrap();
173 if !all_snapshots.contains_key(dir_name.as_bytes())
174 && !self.get_latest_mpt_snapshot_db_name().eq(dir_name)
175 {
176 error!(
177 "Unexpected MPT snapshot path {}, deleted.",
178 entry.path().display()
179 );
180 fs::remove_dir_all(entry.path())?;
181
182 if let Some(snapshot_id) =
183 self.try_get_new_snapshot_epoch_from_mpt_temp_path(dir_name)
184 {
185 if snapshot_id == max_epoch_id {
186 self.create_mpt_snapshot_from_latest(&snapshot_id)?;
187 }
188 }
189 }
190 }
191
192 info!("max epoch height: {}, temp snapshot db existing: {:?}, removed snapshots: {:?}, max snapshot epoch height has mpt: {:?}", max_epoch_height, temp_snapshot_db_existing, removed_snapshots, max_snapshot_epoch_height_has_mpt);
193 Ok(SnapshotPersistState {
194 missing_snapshots: missing_snapshots
195 .into_iter()
196 .map(|(_path_bytes, (snapshot_epoch_id, _))| snapshot_epoch_id)
197 .collect(),
198 max_epoch_id,
199 max_epoch_height,
200 temp_snapshot_db_existing,
201 removed_snapshots,
202 max_snapshot_epoch_height_has_mpt,
203 })
204 }
205
206 fn new_snapshot_by_merging<'m>(
207 &self, old_snapshot_epoch_id: &EpochId, snapshot_epoch_id: EpochId,
208 delta_mpt: DeltaMptIterator, in_progress_snapshot_info: SnapshotInfo,
209 snapshot_info_map: &'m RwLock<PersistedSnapshotInfoMap>,
210 new_epoch_height: u64, recover_mpt_with_kv_snapshot_exist: bool,
211 ) -> Result<(RwLockWriteGuard<'m, PersistedSnapshotInfoMap>, SnapshotInfo)>;
212 fn get_snapshot_by_epoch_id(
213 &self, epoch_id: &EpochId, try_open: bool, open_mpt_snapshot: bool,
214 ) -> Result<Option<Self::SnapshotDb>>;
215 fn destroy_snapshot(&self, snapshot_epoch_id: &EpochId) -> Result<()>;
216
217 fn new_temp_snapshot_for_full_sync(
218 &self, snapshot_epoch_id: &EpochId, merkle_root: &MerkleHash,
219 new_epoch_height: u64,
220 ) -> Result<Self::SnapshotDbWrite>;
221 fn finalize_full_sync_snapshot<'m>(
222 &self, snapshot_epoch_id: &EpochId, merkle_root: &MerkleHash,
223 snapshot_info_map_rwlock: &'m RwLock<PersistedSnapshotInfoMap>,
224 ) -> Result<RwLockWriteGuard<'m, PersistedSnapshotInfoMap>>;
225}
226
227use super::{
228 super::impls::{delta_mpt::DeltaMptIterator, errors::*},
229 snapshot_db::*,
230 DbValueType, KeyValueDbTypes, SnapshotMptTraitRw,
231};
232use crate::impls::storage_manager::PersistedSnapshotInfoMap;
233use parking_lot::{RwLock, RwLockWriteGuard};
234use primitives::{EpochId, MerkleHash, NULL_EPOCH};
235use std::{
236 collections::{HashMap, HashSet},
237 fs,
238 path::{Path, PathBuf},
239};