cfx_storage/impls/storage_db/
snapshot_mpt_db_sqlite.rs1pub struct SnapshotMptDbSqlite {
6 maybe_db_connections: Option<Box<[SqliteConnection]>>,
7 already_open_snapshots: AlreadyOpenSnapshots<Self>,
8 open_semaphore: Arc<Semaphore>,
9 path: PathBuf,
10 remove_on_close: AtomicBool,
11 latest_mpt_snapshot_semaphore: Option<Arc<Semaphore>>,
12}
13
14pub struct SnapshotMptDbStatements {
15 pub mpt_statements: Arc<KvdbSqliteStatements>,
16}
17
18lazy_static! {
19 pub static ref SNAPSHOT_MPT_DB_STATEMENTS: SnapshotMptDbStatements = {
20 let mpt_statements = Arc::new(
21 KvdbSqliteStatements::make_statements(
22 &["node_rlp"],
23 &["BLOB"],
24 SnapshotMptDbSqlite::SNAPSHOT_MPT_TABLE_NAME,
25 false,
26 )
27 .unwrap(),
28 );
29
30 SnapshotMptDbStatements { mpt_statements }
31 };
32}
33
34impl Drop for SnapshotMptDbSqlite {
35 fn drop(&mut self) {
36 if !self.path.as_os_str().is_empty() {
37 debug!("drop SnapshotMptDbSqlite {:?}", self.path);
38
39 self.maybe_db_connections.take();
40 SnapshotDbManagerSqlite::on_close_mpt_snapshot(
41 &self.already_open_snapshots,
42 &self.open_semaphore,
43 &self.path,
44 self.remove_on_close.load(Ordering::Relaxed),
45 &self.latest_mpt_snapshot_semaphore,
46 )
47 }
48 }
49}
50
51impl SnapshotMptDbSqlite {
52 pub const DB_SHARDS: u16 = 32;
53 pub const SNAPSHOT_MPT_TABLE_NAME: &'static str = "snapshot_mpt";
55}
56
57impl KeyValueDbTypes for SnapshotMptDbSqlite {
58 type ValueType = Box<[u8]>;
59}
60
61impl KvdbSqliteShardedRefDestructureTrait for SnapshotMptDbSqlite {
63 fn destructure(
64 &self,
65 ) -> (Option<&[SqliteConnection]>, &KvdbSqliteStatements) {
66 (
67 self.maybe_db_connections.as_ref().map(|b| &**b),
68 &*SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
69 )
70 }
71}
72
73impl KvdbSqliteShardedDestructureTrait for SnapshotMptDbSqlite {
74 fn destructure_mut(
75 &mut self,
76 ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements) {
77 (
78 self.maybe_db_connections.as_mut().map(|b| &mut **b),
79 &*SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
80 )
81 }
82}
83
84impl ReadImplFamily for SnapshotMptDbSqlite {
87 type FamilyRepresentative = KvdbSqliteSharded<Box<[u8]>>;
88}
89
90impl OwnedReadImplFamily for SnapshotMptDbSqlite {
91 type FamilyRepresentative = KvdbSqliteSharded<Box<[u8]>>;
92}
93
94impl SingleWriterImplFamily for SnapshotMptDbSqlite {
95 type FamilyRepresentative = KvdbSqliteSharded<Box<[u8]>>;
96}
97
98impl<'db> OpenSnapshotMptTrait<'db> for SnapshotMptDbSqlite {
99 type SnapshotDbAsOwnedType = SnapshotMpt<
100 KvdbSqliteSharded<SnapshotMptDbValue>,
101 KvdbSqliteSharded<SnapshotMptDbValue>,
102 >;
103 type SnapshotDbBorrowMutType = SnapshotMpt<
105 KvdbSqliteShardedBorrowMut<'static, SnapshotMptDbValue>,
106 KvdbSqliteShardedBorrowMut<'static, SnapshotMptDbValue>,
107 >;
108 type SnapshotDbBorrowSharedType = SnapshotMpt<
109 KvdbSqliteShardedBorrowShared<'static, SnapshotMptDbValue>,
110 KvdbSqliteShardedBorrowShared<'static, SnapshotMptDbValue>,
111 >;
112
113 fn open_snapshot_mpt_owned(
114 &'db mut self,
115 ) -> Result<Self::SnapshotDbBorrowMutType> {
116 Ok(SnapshotMpt::new(unsafe {
117 std::mem::transmute(
118 KvdbSqliteShardedBorrowMut::<SnapshotMptDbValue>::new(
119 self.maybe_db_connections.as_mut().map(|b| &mut **b),
120 &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
121 ),
122 )
123 })?)
124 }
125
126 fn open_snapshot_mpt_as_owned(
127 &'db self,
128 ) -> Result<Self::SnapshotDbAsOwnedType> {
129 Ok(SnapshotMpt::new(
130 KvdbSqliteSharded::<SnapshotMptDbValue>::new(
131 self.try_clone_connections()?,
132 SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements.clone(),
133 ),
134 )?)
135 }
136
137 fn open_snapshot_mpt_shared(
138 &'db self,
139 ) -> Result<Self::SnapshotDbBorrowSharedType> {
140 Ok(SnapshotMpt::new(unsafe {
141 std::mem::transmute(KvdbSqliteShardedBorrowShared::<
142 SnapshotMptDbValue,
143 >::new(
144 self.maybe_db_connections.as_ref().map(|b| &**b),
145 &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
146 ))
147 })?)
148 }
149}
150
151impl SnapshotDbTrait for SnapshotMptDbSqlite {
152 type SnapshotKvdbIterTraitTag = KvdbSqliteShardedIteratorTag;
153 type SnapshotKvdbIterType =
154 KvdbSqliteSharded<<Self as KeyValueDbTypes>::ValueType>;
155
156 fn get_null_snapshot() -> Self { unreachable!() }
157
158 fn open(
159 _snapshot_path: &Path, _readonly: bool,
160 _already_open_snapshots: &AlreadyOpenSnapshots<Self>,
161 _open_semaphore: &Arc<Semaphore>,
162 ) -> Result<SnapshotMptDbSqlite> {
163 unreachable!()
164 }
165
166 fn create(
167 _snapshot_path: &Path,
168 _already_open_snapshots: &AlreadyOpenSnapshots<Self>,
169 _open_snapshots_semaphore: &Arc<Semaphore>,
170 _mpt_table_in_current_db: bool,
171 ) -> Result<SnapshotMptDbSqlite> {
172 unreachable!()
173 }
174
175 fn direct_merge(
176 &mut self, _old_snapshot_db: Option<&Arc<SnapshotMptDbSqlite>>,
177 _mpt_snapshot: &mut Option<SnapshotMptDbSqlite>,
178 _recover_mpt_with_kv_snapshot_exist: bool,
179 _in_reconstruct_snapshot_state: bool,
180 ) -> Result<MerkleHash> {
181 unreachable!()
182 }
183
184 fn copy_and_merge(
185 &mut self, _old_snapshot_db: &Arc<SnapshotMptDbSqlite>,
186 _mpt_snapshot_db: &mut Option<SnapshotMptDbSqlite>,
187 _in_reconstruct_snapshot_state: bool,
188 ) -> Result<MerkleHash> {
189 unreachable!()
190 }
191
192 fn start_transaction(&mut self) -> Result<()> {
193 if let Some(connections) = self.maybe_db_connections.as_mut() {
194 for connection in connections.iter_mut() {
195 connection.execute("BEGIN IMMEDIATE", SQLITE_NO_PARAM)?;
196 }
197 }
198 Ok(())
199 }
200
201 fn commit_transaction(&mut self) -> Result<()> {
202 if let Some(connections) = self.maybe_db_connections.as_mut() {
203 for connection in connections.iter_mut() {
204 connection.execute("COMMIT", SQLITE_NO_PARAM)?;
205 }
206 }
207 Ok(())
208 }
209
210 fn is_mpt_table_in_current_db(&self) -> bool { unreachable!() }
211
212 fn snapshot_kv_iterator(
213 &self,
214 ) -> Result<
215 Wrap<
216 '_,
217 Self::SnapshotKvdbIterType,
218 dyn KeyValueDbIterableTrait<
219 MptKeyValue,
220 [u8],
221 KvdbSqliteShardedIteratorTag,
222 >,
223 >,
224 > {
225 unreachable!()
226 }
227}
228
229impl SnapshotMptDbSqlite {
230 fn try_clone_connections(&self) -> Result<Option<Box<[SqliteConnection]>>> {
231 match &self.maybe_db_connections {
232 None => Ok(None),
233 Some(old_connections) => {
234 let mut connections = Vec::with_capacity(old_connections.len());
235 for old_connection in old_connections.iter() {
236 let new_connection = old_connection.try_clone()?;
237 connections.push(new_connection);
238 }
239 Ok(Some(connections.into_boxed_slice()))
240 }
241 }
242 }
243
244 pub fn open(
245 snapshot_path: &Path, readonly: bool,
246 already_open_snapshots: &AlreadyOpenSnapshots<Self>,
247 open_semaphore: &Arc<Semaphore>,
248 latest_mpt_snapshot_semaphore: Option<Arc<Semaphore>>,
249 ) -> Result<SnapshotMptDbSqlite> {
250 let kvdb_sqlite_sharded = KvdbSqliteSharded::<Box<[u8]>>::open(
251 Self::DB_SHARDS,
252 snapshot_path,
253 readonly,
254 SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements.clone(),
255 )?;
256
257 Ok(Self {
258 maybe_db_connections: kvdb_sqlite_sharded.into_connections(),
259 already_open_snapshots: already_open_snapshots.clone(),
260 open_semaphore: open_semaphore.clone(),
261 path: snapshot_path.to_path_buf(),
262 remove_on_close: Default::default(),
263 latest_mpt_snapshot_semaphore,
264 })
265 }
266
267 pub fn create(
268 snapshot_path: &Path,
269 already_open_snapshots: &AlreadyOpenSnapshots<Self>,
270 open_snapshots_semaphore: &Arc<Semaphore>,
271 latest_mpt_snapshot_semaphore: Option<Arc<Semaphore>>,
272 ) -> Result<SnapshotMptDbSqlite> {
273 fs::create_dir_all(snapshot_path)?;
274 let create_result = (|| -> Result<Box<[SqliteConnection]>> {
275 let kvdb_sqlite_sharded =
276 KvdbSqliteSharded::<Box<[u8]>>::create_and_open(
277 Self::DB_SHARDS,
278 snapshot_path,
279 SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements.clone(),
280 true,
281 true,
282 )?;
283 let connections = kvdb_sqlite_sharded.into_connections().unwrap();
284
285 Ok(connections)
286 })();
287 match create_result {
288 Err(e) => {
289 fs::remove_dir_all(&snapshot_path)?;
290 bail!(e);
291 }
292 Ok(connections) => Ok(SnapshotMptDbSqlite {
293 maybe_db_connections: Some(connections),
294 already_open_snapshots: already_open_snapshots.clone(),
295 open_semaphore: open_snapshots_semaphore.clone(),
296 path: snapshot_path.to_path_buf(),
297 remove_on_close: Default::default(),
298 latest_mpt_snapshot_semaphore,
299 }),
300 }
301 }
302
303 pub fn set_remove_on_last_close(&self) {
304 self.remove_on_close.store(true, Ordering::Relaxed);
305 }
306
307 pub fn snapshot_mpt_itertor(
308 &self,
309 ) -> Result<
310 Wrap<
311 '_,
312 KvdbSqliteSharded<<Self as KeyValueDbTypes>::ValueType>,
313 dyn KeyValueDbIterableTrait<
314 MptKeyValue,
315 [u8],
316 KvdbSqliteShardedIteratorTag,
317 >,
318 >,
319 > {
320 Ok(Wrap(KvdbSqliteSharded::new(
321 self.try_clone_connections()?,
322 SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements.clone(),
323 )))
324 }
325}
326
327use primitives::MerkleHash;
328use tokio::sync::Semaphore;
329
330use crate::{
331 impls::{
332 errors::*,
333 storage_db::{
334 kvdb_sqlite::KvdbSqliteStatements,
335 kvdb_sqlite_sharded::{
336 KvdbSqliteSharded, KvdbSqliteShardedBorrowMut,
337 KvdbSqliteShardedBorrowShared,
338 },
339 snapshot_mpt::SnapshotMpt,
340 },
341 },
342 storage_db::{
343 KeyValueDbIterableTrait, KeyValueDbTypes, OpenSnapshotMptTrait,
344 OwnedReadImplFamily, ReadImplFamily, SingleWriterImplFamily,
345 SnapshotDbTrait, SnapshotMptDbValue,
346 },
347 utils::wrap::Wrap,
348 MptKeyValue, SnapshotDbManagerSqlite, SqliteConnection,
349};
350
351use std::{
352 fs,
353 path::{Path, PathBuf},
354 sync::{
355 atomic::{AtomicBool, Ordering},
356 Arc,
357 },
358};
359
360use super::{
361 kvdb_sqlite_sharded::{
362 KvdbSqliteShardedDestructureTrait, KvdbSqliteShardedIteratorTag,
363 KvdbSqliteShardedRefDestructureTrait,
364 },
365 snapshot_db_manager_sqlite::AlreadyOpenSnapshots,
366 sqlite::SQLITE_NO_PARAM,
367};