cfx_storage/impls/storage_db/
snapshot_kv_db_sqlite.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5pub mod test_lib;
6
/// Snapshot key-value database backed by a set of SQLite shard connections.
///
/// When an instance with a non-empty `path` is dropped, its connections are
/// released and `SnapshotDbManagerSqlite::on_close` is called.
pub struct SnapshotKvDbSqlite {
    // Option because we need an empty snapshot db for empty snapshot.
    pub maybe_db_connections: Option<Box<[SqliteConnection]>>,
    // Passed back to `SnapshotDbManagerSqlite::on_close` on drop.
    already_open_snapshots: AlreadyOpenSnapshots<Self>,
    // Passed back to `SnapshotDbManagerSqlite::on_close` on drop.
    open_semaphore: Arc<Semaphore>,
    // Location of the snapshot on disk. Left empty by `get_null_snapshot`,
    // in which case drop does not call `on_close`.
    path: PathBuf,
    // Set by `set_remove_on_last_close`; read on drop and forwarded to
    // `SnapshotDbManagerSqlite::on_close`.
    remove_on_close: AtomicBool,
    // Whether the snapshot MPT table lives in this database. Detected in
    // `open`, chosen by the caller in `create`, cleared by `drop_mpt_table`.
    mpt_table_in_current_db: bool,
}
16
/// The prepared statement sets used by the snapshot database, one per table.
pub struct SnapshotDbStatements {
    // Statements for the `SNAPSHOT_KV_TABLE_NAME` table.
    pub kvdb_statements: Arc<KvdbSqliteStatements>,
    // Statements for the `DELTA_KV_SET_TABLE_NAME` table.
    delta_mpt_set_keys_statements: Arc<KvdbSqliteStatements>,
    // Statements for the `DELTA_KV_DELETE_TABLE_NAME` table.
    delta_mpt_delete_keys_statements: Arc<KvdbSqliteStatements>,
}
22
23lazy_static! {
24    pub static ref SNAPSHOT_DB_STATEMENTS: SnapshotDbStatements = {
25        let kvdb_statements = Arc::new(
26            KvdbSqliteStatements::make_statements(
27                &["value"],
28                &["BLOB"],
29                SnapshotKvDbSqlite::SNAPSHOT_KV_TABLE_NAME,
30                false,
31            )
32            .unwrap(),
33        );
34
35        let delta_mpt_set_keys_statements = Arc::new(
36            KvdbSqliteStatements::make_statements(
37                &["value"],
38                &["BLOB"],
39                SnapshotKvDbSqlite::DELTA_KV_SET_TABLE_NAME,
40                false,
41            )
42            .unwrap(),
43        );
44        let delta_mpt_delete_keys_statements = Arc::new(
45            KvdbSqliteStatements::make_statements(
46                &[],
47                &[],
48                SnapshotKvDbSqlite::DELTA_KV_DELETE_TABLE_NAME,
49                false,
50            )
51            .unwrap(),
52        );
53
54        SnapshotDbStatements {
55            kvdb_statements,
56            delta_mpt_set_keys_statements,
57            delta_mpt_delete_keys_statements,
58        }
59    };
60}
61
62impl Drop for SnapshotKvDbSqlite {
63    fn drop(&mut self) {
64        if !self.path.as_os_str().is_empty() {
65            self.maybe_db_connections.take();
66            SnapshotDbManagerSqlite::on_close(
67                &self.already_open_snapshots,
68                &self.open_semaphore,
69                &self.path,
70                self.remove_on_close.load(Ordering::Relaxed),
71            )
72        }
73    }
74}
75
76impl SnapshotKvDbSqlite {
77    pub const DB_SHARDS: u16 = 32;
78    /// These two tables are temporary table for the merging process, but they
79    /// remain to help other nodes to do 1-step syncing.
80    pub const DELTA_KV_DELETE_TABLE_NAME: &'static str =
81        "delta_mpt_key_value_delete";
82    pub const DELTA_KV_SET_TABLE_NAME: &'static str = "delta_mpt_key_value_set";
83    // FIXME: Archive node will have different db schema to support versioned
84    // FIXME: read and to provide incremental syncing.
85    // FIXME:
86    // FIXME: for archive mode, the delete table may live in its own db file
87    // FIXME: which contains delete table for a version range.
88    // FIXME: model this fact and refactor.
89    /*
90    pub const KVV_PUT_STATEMENT: &'static str =
91        "INSERT OR REPLACE INTO :table_name VALUES (:key, :value, :version)";
92    /// Key is not unique, because the same key can appear with different
93    /// version number.
94    pub const SNAPSHOT_KV_DELETE_TABLE_NAME: &'static str =
95        "snapshot_key_value_delete";
96    */
97    /// Key-Value table. Key is unique key in this table.
98    pub const SNAPSHOT_KV_TABLE_NAME: &'static str = "snapshot_key_value";
99}
100
/// Values stored in the snapshot key-value table are owned byte slices.
impl KeyValueDbTypes for SnapshotKvDbSqlite {
    type ValueType = Box<[u8]>;
}
104
105// For Snapshot KV DB.
106impl KvdbSqliteShardedRefDestructureTrait for SnapshotKvDbSqlite {
107    fn destructure(
108        &self,
109    ) -> (Option<&[SqliteConnection]>, &KvdbSqliteStatements) {
110        (
111            self.maybe_db_connections.as_ref().map(|b| &**b),
112            &*SNAPSHOT_DB_STATEMENTS.kvdb_statements,
113        )
114    }
115}
116
117impl KvdbSqliteShardedDestructureTrait for SnapshotKvDbSqlite {
118    fn destructure_mut(
119        &mut self,
120    ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements) {
121        (
122            self.maybe_db_connections.as_mut().map(|b| &mut **b),
123            &*SNAPSHOT_DB_STATEMENTS.kvdb_statements,
124        )
125    }
126}
127
/// Automatically implement KeyValueDbTraitRead with the same code of
/// KvdbSqlite.
///
/// The family representative is the sharded variant over `Box<[u8]>`
/// values, matching this type's `KeyValueDbTypes::ValueType`.
impl ReadImplFamily for SnapshotKvDbSqlite {
    type FamilyRepresentative = KvdbSqliteSharded<Box<[u8]>>;
}
133
/// Selects `KvdbSqliteSharded<Box<[u8]>>` as the family representative for
/// owned reads (presumably so the shared by-family implementation is reused;
/// verify against `OwnedReadImplByFamily`).
impl OwnedReadImplFamily for SnapshotKvDbSqlite {
    type FamilyRepresentative = KvdbSqliteSharded<Box<[u8]>>;
}
137
/// Selects `KvdbSqliteSharded<Box<[u8]>>` as the family representative for
/// single-writer operations (presumably so the shared by-family
/// implementation is reused; verify against `SingleWriterImplByFamily`).
impl SingleWriterImplFamily for SnapshotKvDbSqlite {
    type FamilyRepresentative = KvdbSqliteSharded<Box<[u8]>>;
}
141
/// Node loading for a mutably borrowed sharded database.
impl SnapshotMptLoadNode
    for KvdbSqliteShardedBorrowMut<'static, SnapshotMptDbValue>
{
    /// Looks up the node stored under `key` through the mutable read path.
    fn load_node_rlp(
        &mut self, key: &[u8],
    ) -> Result<Option<SnapshotMptDbValue>> {
        self.get_mut_impl(key)
    }
}
151
/// Node loading for an owned sharded database.
impl SnapshotMptLoadNode for KvdbSqliteSharded<SnapshotMptDbValue> {
    /// Looks up the node stored under `key` through the mutable read path.
    fn load_node_rlp(
        &mut self, key: &[u8],
    ) -> Result<Option<SnapshotMptDbValue>> {
        self.get_mut_impl(key)
    }
}
159
/// Node loading for a shared borrow of a sharded database.
impl SnapshotMptLoadNode
    for KvdbSqliteShardedBorrowShared<'static, SnapshotMptDbValue>
{
    /// Looks up the node stored under `key` through the shared read path
    /// (`get_impl`), even though the trait method takes `&mut self`.
    fn load_node_rlp(
        &mut self, key: &[u8],
    ) -> Result<Option<SnapshotMptDbValue>> {
        self.get_impl(key)
    }
}
169
/// Opens the snapshot MPT stored inside this database.
///
/// All three methods only work when `mpt_table_in_current_db` is true and hit
/// `unreachable!()` otherwise, so callers must check
/// `is_mpt_table_in_current_db()` first.
impl<'db> OpenSnapshotMptTrait<'db> for SnapshotKvDbSqlite {
    // MPT view that owns its own cloned connections.
    type SnapshotDbAsOwnedType = SnapshotMpt<
        KvdbSqliteSharded<SnapshotMptDbValue>,
        KvdbSqliteSharded<SnapshotMptDbValue>,
    >;
    /// The 'static lifetime is for for<'db> KeyValueDbIterableTrait<'db, ...>.
    type SnapshotDbBorrowMutType = SnapshotMpt<
        KvdbSqliteShardedBorrowMut<'static, SnapshotMptDbValue>,
        KvdbSqliteShardedBorrowMut<'static, SnapshotMptDbValue>,
    >;
    // MPT view over a shared borrow of this database's connections; uses
    // 'static in the same way as `SnapshotDbBorrowMutType`.
    type SnapshotDbBorrowSharedType = SnapshotMpt<
        KvdbSqliteShardedBorrowShared<'static, SnapshotMptDbValue>,
        KvdbSqliteShardedBorrowShared<'static, SnapshotMptDbValue>,
    >;

    /// Opens the MPT over a mutable borrow of this database's connections.
    ///
    /// Despite the name, the returned value is the borrow-mut type and does
    /// not own its connections.
    ///
    /// # Panics
    ///
    /// Panics when the MPT table is not in this database.
    fn open_snapshot_mpt_owned(
        &'db mut self,
    ) -> Result<Self::SnapshotDbBorrowMutType> {
        debug!(
            "open_snapshot_mpt_owned mpt_table_in_current_db {}",
            self.mpt_table_in_current_db
        );

        // Only the MPT table stored in this database is served here. When it
        // lives elsewhere the caller has to open the separate MPT snapshot
        // itself; reaching the else branch is treated as a bug.
        if self.mpt_table_in_current_db {
            // NOTE(review): the transmute appears to only widen the borrow
            // lifetime to 'static so the value fits the associated type.
            // Nothing in the type then prevents the result from outliving
            // `self`; confirm that callers never keep it past this borrow.
            Ok(SnapshotMpt::new(unsafe {
                std::mem::transmute(KvdbSqliteShardedBorrowMut::<
                    SnapshotMptDbValue,
                >::new(
                    self.maybe_db_connections.as_mut().map(|b| &mut **b),
                    &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
                ))
            })?)
        } else {
            unreachable!()
        }
    }

    /// Opens the MPT over freshly cloned connections, so the result is
    /// independent of any borrow of `self`.
    ///
    /// # Panics
    ///
    /// Panics when the MPT table is not in this database.
    fn open_snapshot_mpt_as_owned(
        &'db self,
    ) -> Result<Self::SnapshotDbAsOwnedType> {
        debug!(
            "open_snapshot_mpt_as_owned mpt_table_in_current_db {}",
            self.mpt_table_in_current_db
        );
        if self.mpt_table_in_current_db {
            Ok(SnapshotMpt::new(
                KvdbSqliteSharded::<SnapshotMptDbValue>::new(
                    self.try_clone_connections()?,
                    SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements.clone(),
                ),
            )?)
        } else {
            unreachable!()
        }
    }

    /// Opens the MPT over a shared borrow of this database's connections.
    ///
    /// # Panics
    ///
    /// Panics when the MPT table is not in this database.
    fn open_snapshot_mpt_shared(
        &'db self,
    ) -> Result<Self::SnapshotDbBorrowSharedType> {
        debug!(
            "open_snapshot_mpt_shared mpt_table_in_current_db {}",
            self.mpt_table_in_current_db
        );
        if self.mpt_table_in_current_db {
            // NOTE(review): as in `open_snapshot_mpt_owned`, the transmute
            // appears to only widen the borrow lifetime to 'static; confirm
            // the result is never kept beyond the borrow of `self`.
            Ok(SnapshotMpt::new(unsafe {
                std::mem::transmute(KvdbSqliteShardedBorrowShared::<
                    SnapshotMptDbValue,
                >::new(
                    self.maybe_db_connections.as_ref().map(|b| &**b),
                    &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
                ))
            })?)
        } else {
            unreachable!()
        }
    }
}
249
250impl SnapshotDbTrait for SnapshotKvDbSqlite {
251    type SnapshotKvdbIterTraitTag = KvdbSqliteShardedIteratorTag;
252    type SnapshotKvdbIterType =
253        KvdbSqliteSharded<<Self as KeyValueDbTypes>::ValueType>;
254
255    fn get_null_snapshot() -> Self {
256        Self {
257            maybe_db_connections: None,
258            already_open_snapshots: Default::default(),
259            open_semaphore: Arc::new(Semaphore::new(0)),
260            path: Default::default(),
261            remove_on_close: Default::default(),
262            mpt_table_in_current_db: true,
263        }
264    }
265
266    fn open(
267        snapshot_path: &Path, readonly: bool,
268        already_open_snapshots: &AlreadyOpenSnapshots<Self>,
269        open_semaphore: &Arc<Semaphore>,
270    ) -> Result<SnapshotKvDbSqlite> {
271        let kvdb_sqlite_sharded = KvdbSqliteSharded::<Box<[u8]>>::open(
272            Self::DB_SHARDS,
273            snapshot_path,
274            readonly,
275            SNAPSHOT_DB_STATEMENTS.kvdb_statements.clone(),
276        )?;
277
278        let mut conn = kvdb_sqlite_sharded.into_connections().unwrap();
279        let mpt_table_exist =
280            KvdbSqliteSharded::<Self::ValueType>::check_if_table_exist(
281                &mut conn,
282                &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
283            )?;
284
285        Ok(Self {
286            maybe_db_connections: Some(conn),
287            already_open_snapshots: already_open_snapshots.clone(),
288            open_semaphore: open_semaphore.clone(),
289            path: snapshot_path.to_path_buf(),
290            remove_on_close: Default::default(),
291            mpt_table_in_current_db: mpt_table_exist,
292        })
293    }
294
295    fn create(
296        snapshot_path: &Path,
297        already_open_snapshots: &AlreadyOpenSnapshots<Self>,
298        open_snapshots_semaphore: &Arc<Semaphore>,
299        mpt_table_in_current_db: bool,
300    ) -> Result<SnapshotKvDbSqlite> {
301        fs::create_dir_all(snapshot_path)?;
302        let create_result = (|| -> Result<Box<[SqliteConnection]>> {
303            let kvdb_sqlite_sharded =
304                KvdbSqliteSharded::<Box<[u8]>>::create_and_open(
305                    Self::DB_SHARDS,
306                    snapshot_path,
307                    SNAPSHOT_DB_STATEMENTS.kvdb_statements.clone(),
308                    /* create_table = */ true,
309                    /* unsafe_mode = */ true,
310                )?;
311            let mut connections =
312                // Safe to unwrap since the connections are newly created.
313                kvdb_sqlite_sharded.into_connections().unwrap();
314            // Create Snapshot MPT table.
315            if mpt_table_in_current_db {
316                KvdbSqliteSharded::<Self::ValueType>::create_table(
317                    &mut connections,
318                    &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
319                )?;
320            }
321            Ok(connections)
322        })();
323        match create_result {
324            Err(e) => {
325                fs::remove_dir_all(&snapshot_path)?;
326                bail!(e);
327            }
328            Ok(connections) => Ok(SnapshotKvDbSqlite {
329                maybe_db_connections: Some(connections),
330                already_open_snapshots: already_open_snapshots.clone(),
331                open_semaphore: open_snapshots_semaphore.clone(),
332                path: snapshot_path.to_path_buf(),
333                remove_on_close: Default::default(),
334                mpt_table_in_current_db,
335            }),
336        }
337    }
338
339    // FIXME: use a mechanism with rate limit.
340    fn direct_merge(
341        &mut self, old_snapshot_db: Option<&Arc<SnapshotKvDbSqlite>>,
342        mpt_snapshot: &mut Option<SnapshotMptDbSqlite>,
343        recover_mpt_with_kv_snapshot_exist: bool,
344        in_reconstruct_snapshot_state: bool,
345    ) -> Result<MerkleHash> {
346        debug!("direct_merge begins.");
347
348        if !recover_mpt_with_kv_snapshot_exist {
349            self.apply_update_to_kvdb()?;
350        }
351
352        let mut set_keys_iter = self.dumped_delta_kv_set_keys_iterator()?;
353        let mut delete_keys_iter =
354            self.dumped_delta_kv_delete_keys_iterator()?;
355
356        self.start_transaction()?;
357        // TODO: what about multi-threading node load?
358        if let Some(old_db) = old_snapshot_db {
359            let mut key_value_iter =
360                old_db.snapshot_mpt_iterator().unwrap().take();
361            let mut kv_iter =
362                key_value_iter.iter_range(&[], None).unwrap().take();
363
364            let new_mpt_snapshot = mpt_snapshot.as_mut().unwrap();
365            new_mpt_snapshot.start_transaction()?;
366            while let Some((access_key, expected_value)) = kv_iter.next()? {
367                new_mpt_snapshot.put(&access_key, &expected_value)?;
368            }
369            new_mpt_snapshot.commit_transaction()?;
370        }
371
372        let mut mpt_to_modify = if self.is_mpt_table_in_current_db() {
373            self.open_snapshot_mpt_owned()?
374        } else {
375            let mpt = mpt_snapshot.as_mut().unwrap();
376            mpt.start_transaction()?;
377            mpt.open_snapshot_mpt_owned()?
378        };
379
380        let mut mpt_merger = MptMerger::new(
381            None,
382            &mut mpt_to_modify as &mut dyn SnapshotMptTraitRw,
383        );
384
385        let snapshot_root = mpt_merger.merge_insertion_deletion_separated(
386            delete_keys_iter.iter_range(&[], None)?.take(),
387            set_keys_iter.iter_range(&[], None)?.take(),
388            in_reconstruct_snapshot_state,
389        )?;
390        self.commit_transaction()?;
391
392        if !self.is_mpt_table_in_current_db() {
393            mpt_snapshot.as_mut().unwrap().commit_transaction()?;
394        }
395
396        Ok(snapshot_root)
397    }
398
399    fn copy_and_merge(
400        &mut self, old_snapshot_db: &Arc<SnapshotKvDbSqlite>,
401        mpt_snapshot_db: &mut Option<SnapshotMptDbSqlite>,
402        in_reconstruct_snapshot_state: bool,
403    ) -> Result<MerkleHash> {
404        debug!("copy_and_merge begins.");
405        let mut kv_iter = old_snapshot_db.snapshot_kv_iterator()?.take();
406        let mut iter = kv_iter.iter_range(&[], None)?.take();
407        self.start_transaction()?;
408        while let Ok(kv_item) = iter.next() {
409            match kv_item {
410                Some((k, v)) => {
411                    self.put(&k, &v)?;
412                }
413                None => break,
414            }
415        }
416        self.commit_transaction()?;
417        self.apply_update_to_kvdb()?;
418
419        let mut set_keys_iter = self.dumped_delta_kv_set_keys_iterator()?;
420        let mut delete_keys_iter =
421            self.dumped_delta_kv_delete_keys_iterator()?;
422        self.start_transaction()?;
423        // TODO: what about multi-threading node load?
424        let mut base_mpt;
425        let mut save_as_mpt = if self.is_mpt_table_in_current_db() {
426            self.open_snapshot_mpt_owned()?
427        } else {
428            let mpt = mpt_snapshot_db.as_mut().unwrap();
429            mpt.start_transaction()?;
430            mpt.open_snapshot_mpt_owned()?
431        };
432
433        let mut mpt_merger = if old_snapshot_db.is_mpt_table_in_current_db() {
434            base_mpt = old_snapshot_db.open_snapshot_mpt_as_owned()?;
435            MptMerger::new(
436                Some(&mut base_mpt as &mut dyn SnapshotMptTraitReadAndIterate),
437                &mut save_as_mpt as &mut dyn SnapshotMptTraitRw,
438            )
439        } else {
440            // When the MPT table is in another database, readonly_mpt can be
441            // safely left as None since there's no need to copy the MPT table
442            MptMerger::new(
443                None,
444                &mut save_as_mpt as &mut dyn SnapshotMptTraitRw,
445            )
446        };
447        let snapshot_root = mpt_merger.merge_insertion_deletion_separated(
448            delete_keys_iter.iter_range(&[], None)?.take(),
449            set_keys_iter.iter_range(&[], None)?.take(),
450            in_reconstruct_snapshot_state,
451        )?;
452        self.commit_transaction()?;
453
454        if !self.is_mpt_table_in_current_db() {
455            mpt_snapshot_db.as_mut().unwrap().commit_transaction()?;
456        }
457
458        Ok(snapshot_root)
459    }
460
461    fn start_transaction(&mut self) -> Result<()> {
462        if let Some(connections) = self.maybe_db_connections.as_mut() {
463            for connection in connections.iter_mut() {
464                connection.execute("BEGIN IMMEDIATE", SQLITE_NO_PARAM)?;
465            }
466        }
467
468        Ok(())
469    }
470
471    fn commit_transaction(&mut self) -> Result<()> {
472        if let Some(connections) = self.maybe_db_connections.as_mut() {
473            for connection in connections.iter_mut() {
474                connection.execute("COMMIT", SQLITE_NO_PARAM)?;
475            }
476        }
477
478        Ok(())
479    }
480
481    fn is_mpt_table_in_current_db(&self) -> bool {
482        debug!(
483            "is_mpt_table_in_current_db {}",
484            self.mpt_table_in_current_db
485        );
486        self.mpt_table_in_current_db
487    }
488
489    fn snapshot_kv_iterator(
490        &self,
491    ) -> Result<
492        Wrap<
493            '_,
494            Self::SnapshotKvdbIterType,
495            dyn KeyValueDbIterableTrait<
496                MptKeyValue,
497                [u8],
498                KvdbSqliteShardedIteratorTag,
499            >,
500        >,
501    > {
502        Ok(Wrap(KvdbSqliteSharded::new(
503            self.try_clone_connections()?,
504            SNAPSHOT_DB_STATEMENTS.kvdb_statements.clone(),
505        )))
506    }
507}
508
509impl SnapshotKvDbSqlite {
510    // FIXME: Do not clone connections.
511    // FIXME: 1. we shouldn't not clone connections without acquire the
512    // FIXME: semaphore; 2. we should implement the range iter for
513    // FIXME: shared reading connections.
514    pub fn try_clone_connections(
515        &self,
516    ) -> Result<Option<Box<[SqliteConnection]>>> {
517        match &self.maybe_db_connections {
518            None => Ok(None),
519            Some(old_connections) => {
520                let mut connections = Vec::with_capacity(old_connections.len());
521                for old_connection in old_connections.iter() {
522                    let new_connection = old_connection.try_clone()?;
523                    connections.push(new_connection);
524                }
525                Ok(Some(connections.into_boxed_slice()))
526            }
527        }
528    }
529
530    pub fn set_remove_on_last_close(&self) {
531        self.remove_on_close.store(true, Ordering::Relaxed);
532    }
533
534    pub fn dumped_delta_kv_set_keys_iterator(
535        &self,
536    ) -> Result<KvdbSqliteSharded<<Self as KeyValueDbTypes>::ValueType>> {
537        Ok(KvdbSqliteSharded::new(
538            self.try_clone_connections()?,
539            SNAPSHOT_DB_STATEMENTS.delta_mpt_set_keys_statements.clone(),
540        ))
541    }
542
543    pub fn dumped_delta_kv_delete_keys_iterator(
544        &self,
545    ) -> Result<KvdbSqliteSharded<()>> {
546        Ok(KvdbSqliteSharded::new(
547            self.try_clone_connections()?,
548            SNAPSHOT_DB_STATEMENTS
549                .delta_mpt_delete_keys_statements
550                .clone(),
551        ))
552    }
553
554    // FIXME: add rate limit.
555    // FIXME: how to handle row_id, this should go to the merkle tree?
556    pub fn dump_delta_mpt(
557        &mut self, delta_mpt: &DeltaMptIterator,
558    ) -> Result<()> {
559        debug!("dump_delta_mpt starts");
560        // Create tables.
561        {
562            // Safe to unwrap since we are not on a NULL snapshot.
563            let connections = self.maybe_db_connections.as_mut().unwrap();
564            <DeltaMptDumperSetDb as SingleWriterImplFamily>::FamilyRepresentative::create_table(
565                connections,
566                &SNAPSHOT_DB_STATEMENTS.delta_mpt_set_keys_statements,
567            )?;
568            <DeltaMptDumperDeleteDb as SingleWriterImplFamily>::FamilyRepresentative::create_table(
569                connections,
570                &SNAPSHOT_DB_STATEMENTS.delta_mpt_delete_keys_statements,
571            )?;
572        }
573
574        // Dump code.
575        self.start_transaction()?;
576        delta_mpt.iterate(&mut DeltaMptMergeDumperSqlite {
577            connections: self.maybe_db_connections.as_mut().unwrap(),
578        })?;
579        self.commit_transaction()?;
580
581        Ok(())
582    }
583
584    /// Dropping is optional, because these tables are necessary to provide
585    /// 1-step syncing.
586    pub fn drop_delta_mpt_dump(&mut self) -> Result<()> {
587        // Safe to unwrap since we are not on a NULL snapshot.
588        let connections = self.maybe_db_connections.as_mut().unwrap();
589        <DeltaMptDumperSetDb as SingleWriterImplFamily>::FamilyRepresentative::drop_table(
590            connections,
591            &SNAPSHOT_DB_STATEMENTS.delta_mpt_set_keys_statements,
592        )?;
593        <DeltaMptDumperDeleteDb as SingleWriterImplFamily>::FamilyRepresentative::drop_table(
594            connections,
595            &SNAPSHOT_DB_STATEMENTS.delta_mpt_delete_keys_statements,
596        )
597    }
598
599    pub fn drop_mpt_table(&mut self) -> Result<()> {
600        if self.mpt_table_in_current_db {
601            let connections = self.maybe_db_connections.as_mut().unwrap();
602            KvdbSqliteSharded::<()>::drop_table(
603                connections,
604                &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
605            )?;
606
607            self.mpt_table_in_current_db = false;
608
609            KvdbSqliteSharded::<()>::vacumm_db(
610                connections,
611                &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
612            )
613        } else {
614            Ok(())
615        }
616    }
617
618    fn apply_update_to_kvdb(&mut self) -> Result<()> {
619        // Safe to unwrap since we are not on a NULL snapshot.
620        for sqlite in self.maybe_db_connections.as_mut().unwrap().iter_mut() {
621            sqlite
622                .execute(
623                    format!(
624                        "DELETE FROM {} WHERE KEY IN (SELECT key FROM {})",
625                        Self::SNAPSHOT_KV_TABLE_NAME,
626                        Self::DELTA_KV_DELETE_TABLE_NAME
627                    )
628                    .as_str(),
629                    SQLITE_NO_PARAM,
630                )?
631                .finish_ignore_rows()?;
632            sqlite
633                .execute(
634                    format!(
635                        "INSERT OR REPLACE INTO {} (key, value) \
636                         SELECT key, value FROM {}",
637                        Self::SNAPSHOT_KV_TABLE_NAME,
638                        Self::DELTA_KV_SET_TABLE_NAME
639                    )
640                    .as_str(),
641                    SQLITE_NO_PARAM,
642                )?
643                .finish_ignore_rows()?;
644        }
645        Ok(())
646    }
647
648    fn snapshot_mpt_iterator(
649        &self,
650    ) -> Result<
651        Wrap<
652            '_,
653            KvdbSqliteSharded<<Self as KeyValueDbTypes>::ValueType>,
654            dyn KeyValueDbIterableTrait<
655                MptKeyValue,
656                [u8],
657                KvdbSqliteShardedIteratorTag,
658            >,
659        >,
660    > {
661        if self.mpt_table_in_current_db {
662            Ok(Wrap(KvdbSqliteSharded::new(
663                self.try_clone_connections()?,
664                SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements.clone(),
665            )))
666        } else {
667            bail!("mpt_snapshot is not in current db");
668        }
669    }
670}
671
/// Sink for delta MPT entries: each pushed entry is written to either the
/// delta "set" table or the delta "delete" table.
pub struct DeltaMptMergeDumperSqlite<'a> {
    // Shard connections borrowed from the snapshot database being dumped to.
    connections: &'a mut [SqliteConnection],
}
675
/// Writer view over the delta "set" table (keys with `Box<[u8]>` values).
pub struct DeltaMptDumperSetDb<'a> {
    // Shard connections the writes go through.
    connections: &'a mut [SqliteConnection],
}
679
/// Writer view over the delta "delete" table (keys only, unit values).
pub struct DeltaMptDumperDeleteDb<'a> {
    // Shard connections the writes go through.
    connections: &'a mut [SqliteConnection],
}
683
/// Entries of the "set" table carry an owned byte-slice value.
impl KeyValueDbTypes for DeltaMptDumperSetDb<'_> {
    type ValueType = Box<[u8]>;
}
687
/// Entries of the "delete" table carry no value; only the key is stored.
impl KeyValueDbTypes for DeltaMptDumperDeleteDb<'_> {
    type ValueType = ();
}
691
/// Selects the sharded database over `Box<[u8]>` values as the family
/// representative; it is also the type whose `create_table` / `drop_table`
/// manage the "set" table.
impl SingleWriterImplFamily for DeltaMptDumperSetDb<'_> {
    type FamilyRepresentative = KvdbSqliteSharded<Box<[u8]>>;
}
695
/// Selects the sharded database over unit values as the family
/// representative; it is also the type whose `create_table` / `drop_table`
/// manage the "delete" table.
impl SingleWriterImplFamily for DeltaMptDumperDeleteDb<'_> {
    type FamilyRepresentative = KvdbSqliteSharded<()>;
}
699
700impl KvdbSqliteShardedDestructureTrait for DeltaMptDumperSetDb<'_> {
701    fn destructure_mut(
702        &mut self,
703    ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements) {
704        (
705            Some(*&mut self.connections),
706            &SNAPSHOT_DB_STATEMENTS.delta_mpt_set_keys_statements,
707        )
708    }
709}
710
711impl KvdbSqliteShardedDestructureTrait for DeltaMptDumperDeleteDb<'_> {
712    fn destructure_mut(
713        &mut self,
714    ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements) {
715        (
716            Some(*&mut self.connections),
717            &SNAPSHOT_DB_STATEMENTS.delta_mpt_delete_keys_statements,
718        )
719    }
720}
721
722impl<'a> KVInserter<MptKeyValue> for DeltaMptMergeDumperSqlite<'a> {
723    fn push(&mut self, x: MptKeyValue) -> Result<()> {
724        // TODO: what about multi-threading put?
725        let (mpt_key, value) = x;
726        let snapshot_key =
727            StorageKeyWithSpace::from_delta_mpt_key(&mpt_key).to_key_bytes();
728        if value.len() > 0 {
729            DeltaMptDumperSetDb {
730                connections: *&mut self.connections,
731            }
732            .put_impl(&snapshot_key, &value)?;
733        } else {
734            DeltaMptDumperDeleteDb {
735                connections: *&mut self.connections,
736            }
737            .put_impl(&snapshot_key, &())?;
738        }
739
740        Ok(())
741    }
742}
743
744use crate::{
745    impls::{
746        delta_mpt::DeltaMptIterator,
747        errors::*,
748        merkle_patricia_trie::{MptKeyValue, MptMerger},
749        storage_db::{
750            kvdb_sqlite::KvdbSqliteStatements,
751            kvdb_sqlite_sharded::{
752                KvdbSqliteSharded, KvdbSqliteShardedBorrowMut,
753                KvdbSqliteShardedBorrowShared,
754                KvdbSqliteShardedDestructureTrait,
755                KvdbSqliteShardedIteratorTag,
756                KvdbSqliteShardedRefDestructureTrait,
757            },
758            snapshot_db_manager_sqlite::AlreadyOpenSnapshots,
759            snapshot_mpt::{SnapshotMpt, SnapshotMptLoadNode},
760            sqlite::SQLITE_NO_PARAM,
761        },
762    },
763    storage_db::{
764        KeyValueDbIterableTrait, KeyValueDbTraitSingleWriter, KeyValueDbTypes,
765        OpenSnapshotMptTrait, OwnedReadImplByFamily, OwnedReadImplFamily,
766        ReadImplByFamily, ReadImplFamily, SingleWriterImplByFamily,
767        SingleWriterImplFamily, SnapshotDbTrait, SnapshotMptDbValue,
768        SnapshotMptTraitReadAndIterate, SnapshotMptTraitRw,
769    },
770    utils::wrap::Wrap,
771    KVInserter, SnapshotDbManagerSqlite, SqliteConnection,
772};
773use fallible_iterator::FallibleIterator;
774use primitives::{MerkleHash, StorageKeyWithSpace};
775use std::{
776    fs,
777    path::{Path, PathBuf},
778    sync::{
779        atomic::{AtomicBool, Ordering},
780        Arc,
781    },
782};
783use tokio::sync::Semaphore;
784
785use super::snapshot_mpt_db_sqlite::{
786    SnapshotMptDbSqlite, SNAPSHOT_MPT_DB_STATEMENTS,
787};