cfx_storage/impls/storage_db/
snapshot_mpt_db_sqlite.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5pub struct SnapshotMptDbSqlite {
6    maybe_db_connections: Option<Box<[SqliteConnection]>>,
7    already_open_snapshots: AlreadyOpenSnapshots<Self>,
8    open_semaphore: Arc<Semaphore>,
9    path: PathBuf,
10    remove_on_close: AtomicBool,
11    latest_mpt_snapshot_semaphore: Option<Arc<Semaphore>>,
12}
13
14pub struct SnapshotMptDbStatements {
15    pub mpt_statements: Arc<KvdbSqliteStatements>,
16}
17
18lazy_static! {
19    pub static ref SNAPSHOT_MPT_DB_STATEMENTS: SnapshotMptDbStatements = {
20        let mpt_statements = Arc::new(
21            KvdbSqliteStatements::make_statements(
22                &["node_rlp"],
23                &["BLOB"],
24                SnapshotMptDbSqlite::SNAPSHOT_MPT_TABLE_NAME,
25                false,
26            )
27            .unwrap(),
28        );
29
30        SnapshotMptDbStatements { mpt_statements }
31    };
32}
33
34impl Drop for SnapshotMptDbSqlite {
35    fn drop(&mut self) {
36        if !self.path.as_os_str().is_empty() {
37            debug!("drop SnapshotMptDbSqlite {:?}", self.path);
38
39            self.maybe_db_connections.take();
40            SnapshotDbManagerSqlite::on_close_mpt_snapshot(
41                &self.already_open_snapshots,
42                &self.open_semaphore,
43                &self.path,
44                self.remove_on_close.load(Ordering::Relaxed),
45                &self.latest_mpt_snapshot_semaphore,
46            )
47        }
48    }
49}
50
51impl SnapshotMptDbSqlite {
52    pub const DB_SHARDS: u16 = 32;
53    /// MPT Table.
54    pub const SNAPSHOT_MPT_TABLE_NAME: &'static str = "snapshot_mpt";
55}
56
57impl KeyValueDbTypes for SnapshotMptDbSqlite {
58    type ValueType = Box<[u8]>;
59}
60
61// For Snapshot MPT DB.
62impl KvdbSqliteShardedRefDestructureTrait for SnapshotMptDbSqlite {
63    fn destructure(
64        &self,
65    ) -> (Option<&[SqliteConnection]>, &KvdbSqliteStatements) {
66        (
67            self.maybe_db_connections.as_ref().map(|b| &**b),
68            &*SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
69        )
70    }
71}
72
73impl KvdbSqliteShardedDestructureTrait for SnapshotMptDbSqlite {
74    fn destructure_mut(
75        &mut self,
76    ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements) {
77        (
78            self.maybe_db_connections.as_mut().map(|b| &mut **b),
79            &*SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
80        )
81    }
82}
83
84/// Automatically implement KeyValueDbTraitRead with the same code of
85/// KvdbSqlite.
86impl ReadImplFamily for SnapshotMptDbSqlite {
87    type FamilyRepresentative = KvdbSqliteSharded<Box<[u8]>>;
88}
89
90impl OwnedReadImplFamily for SnapshotMptDbSqlite {
91    type FamilyRepresentative = KvdbSqliteSharded<Box<[u8]>>;
92}
93
94impl SingleWriterImplFamily for SnapshotMptDbSqlite {
95    type FamilyRepresentative = KvdbSqliteSharded<Box<[u8]>>;
96}
97
98impl<'db> OpenSnapshotMptTrait<'db> for SnapshotMptDbSqlite {
99    type SnapshotDbAsOwnedType = SnapshotMpt<
100        KvdbSqliteSharded<SnapshotMptDbValue>,
101        KvdbSqliteSharded<SnapshotMptDbValue>,
102    >;
103    /// The 'static lifetime is for for<'db> KeyValueDbIterableTrait<'db, ...>.
104    type SnapshotDbBorrowMutType = SnapshotMpt<
105        KvdbSqliteShardedBorrowMut<'static, SnapshotMptDbValue>,
106        KvdbSqliteShardedBorrowMut<'static, SnapshotMptDbValue>,
107    >;
108    type SnapshotDbBorrowSharedType = SnapshotMpt<
109        KvdbSqliteShardedBorrowShared<'static, SnapshotMptDbValue>,
110        KvdbSqliteShardedBorrowShared<'static, SnapshotMptDbValue>,
111    >;
112
113    fn open_snapshot_mpt_owned(
114        &'db mut self,
115    ) -> Result<Self::SnapshotDbBorrowMutType> {
116        Ok(SnapshotMpt::new(unsafe {
117            std::mem::transmute(
118                KvdbSqliteShardedBorrowMut::<SnapshotMptDbValue>::new(
119                    self.maybe_db_connections.as_mut().map(|b| &mut **b),
120                    &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
121                ),
122            )
123        })?)
124    }
125
126    fn open_snapshot_mpt_as_owned(
127        &'db self,
128    ) -> Result<Self::SnapshotDbAsOwnedType> {
129        Ok(SnapshotMpt::new(
130            KvdbSqliteSharded::<SnapshotMptDbValue>::new(
131                self.try_clone_connections()?,
132                SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements.clone(),
133            ),
134        )?)
135    }
136
137    fn open_snapshot_mpt_shared(
138        &'db self,
139    ) -> Result<Self::SnapshotDbBorrowSharedType> {
140        Ok(SnapshotMpt::new(unsafe {
141            std::mem::transmute(KvdbSqliteShardedBorrowShared::<
142                SnapshotMptDbValue,
143            >::new(
144                self.maybe_db_connections.as_ref().map(|b| &**b),
145                &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
146            ))
147        })?)
148    }
149}
150
151impl SnapshotDbTrait for SnapshotMptDbSqlite {
152    type SnapshotKvdbIterTraitTag = KvdbSqliteShardedIteratorTag;
153    type SnapshotKvdbIterType =
154        KvdbSqliteSharded<<Self as KeyValueDbTypes>::ValueType>;
155
156    fn get_null_snapshot() -> Self { unreachable!() }
157
158    fn open(
159        _snapshot_path: &Path, _readonly: bool,
160        _already_open_snapshots: &AlreadyOpenSnapshots<Self>,
161        _open_semaphore: &Arc<Semaphore>,
162    ) -> Result<SnapshotMptDbSqlite> {
163        unreachable!()
164    }
165
166    fn create(
167        _snapshot_path: &Path,
168        _already_open_snapshots: &AlreadyOpenSnapshots<Self>,
169        _open_snapshots_semaphore: &Arc<Semaphore>,
170        _mpt_table_in_current_db: bool,
171    ) -> Result<SnapshotMptDbSqlite> {
172        unreachable!()
173    }
174
175    fn direct_merge(
176        &mut self, _old_snapshot_db: Option<&Arc<SnapshotMptDbSqlite>>,
177        _mpt_snapshot: &mut Option<SnapshotMptDbSqlite>,
178        _recover_mpt_with_kv_snapshot_exist: bool,
179        _in_reconstruct_snapshot_state: bool,
180    ) -> Result<MerkleHash> {
181        unreachable!()
182    }
183
184    fn copy_and_merge(
185        &mut self, _old_snapshot_db: &Arc<SnapshotMptDbSqlite>,
186        _mpt_snapshot_db: &mut Option<SnapshotMptDbSqlite>,
187        _in_reconstruct_snapshot_state: bool,
188    ) -> Result<MerkleHash> {
189        unreachable!()
190    }
191
192    fn start_transaction(&mut self) -> Result<()> {
193        if let Some(connections) = self.maybe_db_connections.as_mut() {
194            for connection in connections.iter_mut() {
195                connection.execute("BEGIN IMMEDIATE", SQLITE_NO_PARAM)?;
196            }
197        }
198        Ok(())
199    }
200
201    fn commit_transaction(&mut self) -> Result<()> {
202        if let Some(connections) = self.maybe_db_connections.as_mut() {
203            for connection in connections.iter_mut() {
204                connection.execute("COMMIT", SQLITE_NO_PARAM)?;
205            }
206        }
207        Ok(())
208    }
209
210    fn is_mpt_table_in_current_db(&self) -> bool { unreachable!() }
211
212    fn snapshot_kv_iterator(
213        &self,
214    ) -> Result<
215        Wrap<
216            '_,
217            Self::SnapshotKvdbIterType,
218            dyn KeyValueDbIterableTrait<
219                MptKeyValue,
220                [u8],
221                KvdbSqliteShardedIteratorTag,
222            >,
223        >,
224    > {
225        unreachable!()
226    }
227}
228
229impl SnapshotMptDbSqlite {
230    fn try_clone_connections(&self) -> Result<Option<Box<[SqliteConnection]>>> {
231        match &self.maybe_db_connections {
232            None => Ok(None),
233            Some(old_connections) => {
234                let mut connections = Vec::with_capacity(old_connections.len());
235                for old_connection in old_connections.iter() {
236                    let new_connection = old_connection.try_clone()?;
237                    connections.push(new_connection);
238                }
239                Ok(Some(connections.into_boxed_slice()))
240            }
241        }
242    }
243
244    pub fn open(
245        snapshot_path: &Path, readonly: bool,
246        already_open_snapshots: &AlreadyOpenSnapshots<Self>,
247        open_semaphore: &Arc<Semaphore>,
248        latest_mpt_snapshot_semaphore: Option<Arc<Semaphore>>,
249    ) -> Result<SnapshotMptDbSqlite> {
250        let kvdb_sqlite_sharded = KvdbSqliteSharded::<Box<[u8]>>::open(
251            Self::DB_SHARDS,
252            snapshot_path,
253            readonly,
254            SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements.clone(),
255        )?;
256
257        Ok(Self {
258            maybe_db_connections: kvdb_sqlite_sharded.into_connections(),
259            already_open_snapshots: already_open_snapshots.clone(),
260            open_semaphore: open_semaphore.clone(),
261            path: snapshot_path.to_path_buf(),
262            remove_on_close: Default::default(),
263            latest_mpt_snapshot_semaphore,
264        })
265    }
266
267    pub fn create(
268        snapshot_path: &Path,
269        already_open_snapshots: &AlreadyOpenSnapshots<Self>,
270        open_snapshots_semaphore: &Arc<Semaphore>,
271        latest_mpt_snapshot_semaphore: Option<Arc<Semaphore>>,
272    ) -> Result<SnapshotMptDbSqlite> {
273        fs::create_dir_all(snapshot_path)?;
274        let create_result = (|| -> Result<Box<[SqliteConnection]>> {
275            let kvdb_sqlite_sharded =
276                KvdbSqliteSharded::<Box<[u8]>>::create_and_open(
277                    Self::DB_SHARDS,
278                    snapshot_path,
279                    SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements.clone(),
280                    /* create_table = */ true,
281                    /* unsafe_mode = */ true,
282                )?;
283            let connections = kvdb_sqlite_sharded.into_connections().unwrap();
284
285            Ok(connections)
286        })();
287        match create_result {
288            Err(e) => {
289                fs::remove_dir_all(&snapshot_path)?;
290                bail!(e);
291            }
292            Ok(connections) => Ok(SnapshotMptDbSqlite {
293                maybe_db_connections: Some(connections),
294                already_open_snapshots: already_open_snapshots.clone(),
295                open_semaphore: open_snapshots_semaphore.clone(),
296                path: snapshot_path.to_path_buf(),
297                remove_on_close: Default::default(),
298                latest_mpt_snapshot_semaphore,
299            }),
300        }
301    }
302
303    pub fn set_remove_on_last_close(&self) {
304        self.remove_on_close.store(true, Ordering::Relaxed);
305    }
306
307    pub fn snapshot_mpt_itertor(
308        &self,
309    ) -> Result<
310        Wrap<
311            '_,
312            KvdbSqliteSharded<<Self as KeyValueDbTypes>::ValueType>,
313            dyn KeyValueDbIterableTrait<
314                MptKeyValue,
315                [u8],
316                KvdbSqliteShardedIteratorTag,
317            >,
318        >,
319    > {
320        Ok(Wrap(KvdbSqliteSharded::new(
321            self.try_clone_connections()?,
322            SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements.clone(),
323        )))
324    }
325}
326
327use primitives::MerkleHash;
328use tokio::sync::Semaphore;
329
330use crate::{
331    impls::{
332        errors::*,
333        storage_db::{
334            kvdb_sqlite::KvdbSqliteStatements,
335            kvdb_sqlite_sharded::{
336                KvdbSqliteSharded, KvdbSqliteShardedBorrowMut,
337                KvdbSqliteShardedBorrowShared,
338            },
339            snapshot_mpt::SnapshotMpt,
340        },
341    },
342    storage_db::{
343        KeyValueDbIterableTrait, KeyValueDbTypes, OpenSnapshotMptTrait,
344        OwnedReadImplFamily, ReadImplFamily, SingleWriterImplFamily,
345        SnapshotDbTrait, SnapshotMptDbValue,
346    },
347    utils::wrap::Wrap,
348    MptKeyValue, SnapshotDbManagerSqlite, SqliteConnection,
349};
350
351use std::{
352    fs,
353    path::{Path, PathBuf},
354    sync::{
355        atomic::{AtomicBool, Ordering},
356        Arc,
357    },
358};
359
360use super::{
361    kvdb_sqlite_sharded::{
362        KvdbSqliteShardedDestructureTrait, KvdbSqliteShardedIteratorTag,
363        KvdbSqliteShardedRefDestructureTrait,
364    },
365    snapshot_db_manager_sqlite::AlreadyOpenSnapshots,
366    sqlite::SQLITE_NO_PARAM,
367};