1pub mod test_lib;
6
7pub struct SnapshotKvDbSqlite {
8 pub maybe_db_connections: Option<Box<[SqliteConnection]>>,
10 already_open_snapshots: AlreadyOpenSnapshots<Self>,
11 open_semaphore: Arc<Semaphore>,
12 path: PathBuf,
13 remove_on_close: AtomicBool,
14 mpt_table_in_current_db: bool,
15}
16
17pub struct SnapshotDbStatements {
18 pub kvdb_statements: Arc<KvdbSqliteStatements>,
19 delta_mpt_set_keys_statements: Arc<KvdbSqliteStatements>,
20 delta_mpt_delete_keys_statements: Arc<KvdbSqliteStatements>,
21}
22
23lazy_static! {
24 pub static ref SNAPSHOT_DB_STATEMENTS: SnapshotDbStatements = {
25 let kvdb_statements = Arc::new(
26 KvdbSqliteStatements::make_statements(
27 &["value"],
28 &["BLOB"],
29 SnapshotKvDbSqlite::SNAPSHOT_KV_TABLE_NAME,
30 false,
31 )
32 .unwrap(),
33 );
34
35 let delta_mpt_set_keys_statements = Arc::new(
36 KvdbSqliteStatements::make_statements(
37 &["value"],
38 &["BLOB"],
39 SnapshotKvDbSqlite::DELTA_KV_SET_TABLE_NAME,
40 false,
41 )
42 .unwrap(),
43 );
44 let delta_mpt_delete_keys_statements = Arc::new(
45 KvdbSqliteStatements::make_statements(
46 &[],
47 &[],
48 SnapshotKvDbSqlite::DELTA_KV_DELETE_TABLE_NAME,
49 false,
50 )
51 .unwrap(),
52 );
53
54 SnapshotDbStatements {
55 kvdb_statements,
56 delta_mpt_set_keys_statements,
57 delta_mpt_delete_keys_statements,
58 }
59 };
60}
61
62impl Drop for SnapshotKvDbSqlite {
63 fn drop(&mut self) {
64 if !self.path.as_os_str().is_empty() {
65 self.maybe_db_connections.take();
66 SnapshotDbManagerSqlite::on_close(
67 &self.already_open_snapshots,
68 &self.open_semaphore,
69 &self.path,
70 self.remove_on_close.load(Ordering::Relaxed),
71 )
72 }
73 }
74}
75
76impl SnapshotKvDbSqlite {
77 pub const DB_SHARDS: u16 = 32;
78 pub const DELTA_KV_DELETE_TABLE_NAME: &'static str =
81 "delta_mpt_key_value_delete";
82 pub const DELTA_KV_SET_TABLE_NAME: &'static str = "delta_mpt_key_value_set";
83 pub const SNAPSHOT_KV_TABLE_NAME: &'static str = "snapshot_key_value";
99}
100
101impl KeyValueDbTypes for SnapshotKvDbSqlite {
102 type ValueType = Box<[u8]>;
103}
104
105impl KvdbSqliteShardedRefDestructureTrait for SnapshotKvDbSqlite {
107 fn destructure(
108 &self,
109 ) -> (Option<&[SqliteConnection]>, &KvdbSqliteStatements) {
110 (
111 self.maybe_db_connections.as_ref().map(|b| &**b),
112 &*SNAPSHOT_DB_STATEMENTS.kvdb_statements,
113 )
114 }
115}
116
117impl KvdbSqliteShardedDestructureTrait for SnapshotKvDbSqlite {
118 fn destructure_mut(
119 &mut self,
120 ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements) {
121 (
122 self.maybe_db_connections.as_mut().map(|b| &mut **b),
123 &*SNAPSHOT_DB_STATEMENTS.kvdb_statements,
124 )
125 }
126}
127
128impl ReadImplFamily for SnapshotKvDbSqlite {
131 type FamilyRepresentative = KvdbSqliteSharded<Box<[u8]>>;
132}
133
134impl OwnedReadImplFamily for SnapshotKvDbSqlite {
135 type FamilyRepresentative = KvdbSqliteSharded<Box<[u8]>>;
136}
137
138impl SingleWriterImplFamily for SnapshotKvDbSqlite {
139 type FamilyRepresentative = KvdbSqliteSharded<Box<[u8]>>;
140}
141
142impl SnapshotMptLoadNode
143 for KvdbSqliteShardedBorrowMut<'static, SnapshotMptDbValue>
144{
145 fn load_node_rlp(
146 &mut self, key: &[u8],
147 ) -> Result<Option<SnapshotMptDbValue>> {
148 self.get_mut_impl(key)
149 }
150}
151
152impl SnapshotMptLoadNode for KvdbSqliteSharded<SnapshotMptDbValue> {
153 fn load_node_rlp(
154 &mut self, key: &[u8],
155 ) -> Result<Option<SnapshotMptDbValue>> {
156 self.get_mut_impl(key)
157 }
158}
159
160impl SnapshotMptLoadNode
161 for KvdbSqliteShardedBorrowShared<'static, SnapshotMptDbValue>
162{
163 fn load_node_rlp(
164 &mut self, key: &[u8],
165 ) -> Result<Option<SnapshotMptDbValue>> {
166 self.get_impl(key)
167 }
168}
169
170impl<'db> OpenSnapshotMptTrait<'db> for SnapshotKvDbSqlite {
171 type SnapshotDbAsOwnedType = SnapshotMpt<
172 KvdbSqliteSharded<SnapshotMptDbValue>,
173 KvdbSqliteSharded<SnapshotMptDbValue>,
174 >;
175 type SnapshotDbBorrowMutType = SnapshotMpt<
177 KvdbSqliteShardedBorrowMut<'static, SnapshotMptDbValue>,
178 KvdbSqliteShardedBorrowMut<'static, SnapshotMptDbValue>,
179 >;
180 type SnapshotDbBorrowSharedType = SnapshotMpt<
181 KvdbSqliteShardedBorrowShared<'static, SnapshotMptDbValue>,
182 KvdbSqliteShardedBorrowShared<'static, SnapshotMptDbValue>,
183 >;
184
185 fn open_snapshot_mpt_owned(
186 &'db mut self,
187 ) -> Result<Self::SnapshotDbBorrowMutType> {
188 debug!(
189 "open_snapshot_mpt_owned mpt_table_in_current_db {}",
190 self.mpt_table_in_current_db
191 );
192
193 if self.mpt_table_in_current_db {
196 Ok(SnapshotMpt::new(unsafe {
197 std::mem::transmute(KvdbSqliteShardedBorrowMut::<
198 SnapshotMptDbValue,
199 >::new(
200 self.maybe_db_connections.as_mut().map(|b| &mut **b),
201 &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
202 ))
203 })?)
204 } else {
205 unreachable!()
206 }
207 }
208
209 fn open_snapshot_mpt_as_owned(
210 &'db self,
211 ) -> Result<Self::SnapshotDbAsOwnedType> {
212 debug!(
213 "open_snapshot_mpt_as_owned mpt_table_in_current_db {}",
214 self.mpt_table_in_current_db
215 );
216 if self.mpt_table_in_current_db {
217 Ok(SnapshotMpt::new(
218 KvdbSqliteSharded::<SnapshotMptDbValue>::new(
219 self.try_clone_connections()?,
220 SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements.clone(),
221 ),
222 )?)
223 } else {
224 unreachable!()
225 }
226 }
227
228 fn open_snapshot_mpt_shared(
229 &'db self,
230 ) -> Result<Self::SnapshotDbBorrowSharedType> {
231 debug!(
232 "open_snapshot_mpt_shared mpt_table_in_current_db {}",
233 self.mpt_table_in_current_db
234 );
235 if self.mpt_table_in_current_db {
236 Ok(SnapshotMpt::new(unsafe {
237 std::mem::transmute(KvdbSqliteShardedBorrowShared::<
238 SnapshotMptDbValue,
239 >::new(
240 self.maybe_db_connections.as_ref().map(|b| &**b),
241 &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
242 ))
243 })?)
244 } else {
245 unreachable!()
246 }
247 }
248}
249
250impl SnapshotDbTrait for SnapshotKvDbSqlite {
251 type SnapshotKvdbIterTraitTag = KvdbSqliteShardedIteratorTag;
252 type SnapshotKvdbIterType =
253 KvdbSqliteSharded<<Self as KeyValueDbTypes>::ValueType>;
254
255 fn get_null_snapshot() -> Self {
256 Self {
257 maybe_db_connections: None,
258 already_open_snapshots: Default::default(),
259 open_semaphore: Arc::new(Semaphore::new(0)),
260 path: Default::default(),
261 remove_on_close: Default::default(),
262 mpt_table_in_current_db: true,
263 }
264 }
265
266 fn open(
267 snapshot_path: &Path, readonly: bool,
268 already_open_snapshots: &AlreadyOpenSnapshots<Self>,
269 open_semaphore: &Arc<Semaphore>,
270 ) -> Result<SnapshotKvDbSqlite> {
271 let kvdb_sqlite_sharded = KvdbSqliteSharded::<Box<[u8]>>::open(
272 Self::DB_SHARDS,
273 snapshot_path,
274 readonly,
275 SNAPSHOT_DB_STATEMENTS.kvdb_statements.clone(),
276 )?;
277
278 let mut conn = kvdb_sqlite_sharded.into_connections().unwrap();
279 let mpt_table_exist =
280 KvdbSqliteSharded::<Self::ValueType>::check_if_table_exist(
281 &mut conn,
282 &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
283 )?;
284
285 Ok(Self {
286 maybe_db_connections: Some(conn),
287 already_open_snapshots: already_open_snapshots.clone(),
288 open_semaphore: open_semaphore.clone(),
289 path: snapshot_path.to_path_buf(),
290 remove_on_close: Default::default(),
291 mpt_table_in_current_db: mpt_table_exist,
292 })
293 }
294
295 fn create(
296 snapshot_path: &Path,
297 already_open_snapshots: &AlreadyOpenSnapshots<Self>,
298 open_snapshots_semaphore: &Arc<Semaphore>,
299 mpt_table_in_current_db: bool,
300 ) -> Result<SnapshotKvDbSqlite> {
301 fs::create_dir_all(snapshot_path)?;
302 let create_result = (|| -> Result<Box<[SqliteConnection]>> {
303 let kvdb_sqlite_sharded =
304 KvdbSqliteSharded::<Box<[u8]>>::create_and_open(
305 Self::DB_SHARDS,
306 snapshot_path,
307 SNAPSHOT_DB_STATEMENTS.kvdb_statements.clone(),
308 true,
309 true,
310 )?;
311 let mut connections =
312 kvdb_sqlite_sharded.into_connections().unwrap();
314 if mpt_table_in_current_db {
316 KvdbSqliteSharded::<Self::ValueType>::create_table(
317 &mut connections,
318 &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
319 )?;
320 }
321 Ok(connections)
322 })();
323 match create_result {
324 Err(e) => {
325 fs::remove_dir_all(&snapshot_path)?;
326 bail!(e);
327 }
328 Ok(connections) => Ok(SnapshotKvDbSqlite {
329 maybe_db_connections: Some(connections),
330 already_open_snapshots: already_open_snapshots.clone(),
331 open_semaphore: open_snapshots_semaphore.clone(),
332 path: snapshot_path.to_path_buf(),
333 remove_on_close: Default::default(),
334 mpt_table_in_current_db,
335 }),
336 }
337 }
338
339 fn direct_merge(
341 &mut self, old_snapshot_db: Option<&Arc<SnapshotKvDbSqlite>>,
342 mpt_snapshot: &mut Option<SnapshotMptDbSqlite>,
343 recover_mpt_with_kv_snapshot_exist: bool,
344 in_reconstruct_snapshot_state: bool,
345 ) -> Result<MerkleHash> {
346 debug!("direct_merge begins.");
347
348 if !recover_mpt_with_kv_snapshot_exist {
349 self.apply_update_to_kvdb()?;
350 }
351
352 let mut set_keys_iter = self.dumped_delta_kv_set_keys_iterator()?;
353 let mut delete_keys_iter =
354 self.dumped_delta_kv_delete_keys_iterator()?;
355
356 self.start_transaction()?;
357 if let Some(old_db) = old_snapshot_db {
359 let mut key_value_iter =
360 old_db.snapshot_mpt_iterator().unwrap().take();
361 let mut kv_iter =
362 key_value_iter.iter_range(&[], None).unwrap().take();
363
364 let new_mpt_snapshot = mpt_snapshot.as_mut().unwrap();
365 new_mpt_snapshot.start_transaction()?;
366 while let Some((access_key, expected_value)) = kv_iter.next()? {
367 new_mpt_snapshot.put(&access_key, &expected_value)?;
368 }
369 new_mpt_snapshot.commit_transaction()?;
370 }
371
372 let mut mpt_to_modify = if self.is_mpt_table_in_current_db() {
373 self.open_snapshot_mpt_owned()?
374 } else {
375 let mpt = mpt_snapshot.as_mut().unwrap();
376 mpt.start_transaction()?;
377 mpt.open_snapshot_mpt_owned()?
378 };
379
380 let mut mpt_merger = MptMerger::new(
381 None,
382 &mut mpt_to_modify as &mut dyn SnapshotMptTraitRw,
383 );
384
385 let snapshot_root = mpt_merger.merge_insertion_deletion_separated(
386 delete_keys_iter.iter_range(&[], None)?.take(),
387 set_keys_iter.iter_range(&[], None)?.take(),
388 in_reconstruct_snapshot_state,
389 )?;
390 self.commit_transaction()?;
391
392 if !self.is_mpt_table_in_current_db() {
393 mpt_snapshot.as_mut().unwrap().commit_transaction()?;
394 }
395
396 Ok(snapshot_root)
397 }
398
399 fn copy_and_merge(
400 &mut self, old_snapshot_db: &Arc<SnapshotKvDbSqlite>,
401 mpt_snapshot_db: &mut Option<SnapshotMptDbSqlite>,
402 in_reconstruct_snapshot_state: bool,
403 ) -> Result<MerkleHash> {
404 debug!("copy_and_merge begins.");
405 let mut kv_iter = old_snapshot_db.snapshot_kv_iterator()?.take();
406 let mut iter = kv_iter.iter_range(&[], None)?.take();
407 self.start_transaction()?;
408 while let Ok(kv_item) = iter.next() {
409 match kv_item {
410 Some((k, v)) => {
411 self.put(&k, &v)?;
412 }
413 None => break,
414 }
415 }
416 self.commit_transaction()?;
417 self.apply_update_to_kvdb()?;
418
419 let mut set_keys_iter = self.dumped_delta_kv_set_keys_iterator()?;
420 let mut delete_keys_iter =
421 self.dumped_delta_kv_delete_keys_iterator()?;
422 self.start_transaction()?;
423 let mut base_mpt;
425 let mut save_as_mpt = if self.is_mpt_table_in_current_db() {
426 self.open_snapshot_mpt_owned()?
427 } else {
428 let mpt = mpt_snapshot_db.as_mut().unwrap();
429 mpt.start_transaction()?;
430 mpt.open_snapshot_mpt_owned()?
431 };
432
433 let mut mpt_merger = if old_snapshot_db.is_mpt_table_in_current_db() {
434 base_mpt = old_snapshot_db.open_snapshot_mpt_as_owned()?;
435 MptMerger::new(
436 Some(&mut base_mpt as &mut dyn SnapshotMptTraitReadAndIterate),
437 &mut save_as_mpt as &mut dyn SnapshotMptTraitRw,
438 )
439 } else {
440 MptMerger::new(
443 None,
444 &mut save_as_mpt as &mut dyn SnapshotMptTraitRw,
445 )
446 };
447 let snapshot_root = mpt_merger.merge_insertion_deletion_separated(
448 delete_keys_iter.iter_range(&[], None)?.take(),
449 set_keys_iter.iter_range(&[], None)?.take(),
450 in_reconstruct_snapshot_state,
451 )?;
452 self.commit_transaction()?;
453
454 if !self.is_mpt_table_in_current_db() {
455 mpt_snapshot_db.as_mut().unwrap().commit_transaction()?;
456 }
457
458 Ok(snapshot_root)
459 }
460
461 fn start_transaction(&mut self) -> Result<()> {
462 if let Some(connections) = self.maybe_db_connections.as_mut() {
463 for connection in connections.iter_mut() {
464 connection.execute("BEGIN IMMEDIATE", SQLITE_NO_PARAM)?;
465 }
466 }
467
468 Ok(())
469 }
470
471 fn commit_transaction(&mut self) -> Result<()> {
472 if let Some(connections) = self.maybe_db_connections.as_mut() {
473 for connection in connections.iter_mut() {
474 connection.execute("COMMIT", SQLITE_NO_PARAM)?;
475 }
476 }
477
478 Ok(())
479 }
480
481 fn is_mpt_table_in_current_db(&self) -> bool {
482 debug!(
483 "is_mpt_table_in_current_db {}",
484 self.mpt_table_in_current_db
485 );
486 self.mpt_table_in_current_db
487 }
488
489 fn snapshot_kv_iterator(
490 &self,
491 ) -> Result<
492 Wrap<
493 '_,
494 Self::SnapshotKvdbIterType,
495 dyn KeyValueDbIterableTrait<
496 MptKeyValue,
497 [u8],
498 KvdbSqliteShardedIteratorTag,
499 >,
500 >,
501 > {
502 Ok(Wrap(KvdbSqliteSharded::new(
503 self.try_clone_connections()?,
504 SNAPSHOT_DB_STATEMENTS.kvdb_statements.clone(),
505 )))
506 }
507}
508
509impl SnapshotKvDbSqlite {
510 pub fn try_clone_connections(
515 &self,
516 ) -> Result<Option<Box<[SqliteConnection]>>> {
517 match &self.maybe_db_connections {
518 None => Ok(None),
519 Some(old_connections) => {
520 let mut connections = Vec::with_capacity(old_connections.len());
521 for old_connection in old_connections.iter() {
522 let new_connection = old_connection.try_clone()?;
523 connections.push(new_connection);
524 }
525 Ok(Some(connections.into_boxed_slice()))
526 }
527 }
528 }
529
530 pub fn set_remove_on_last_close(&self) {
531 self.remove_on_close.store(true, Ordering::Relaxed);
532 }
533
534 pub fn dumped_delta_kv_set_keys_iterator(
535 &self,
536 ) -> Result<KvdbSqliteSharded<<Self as KeyValueDbTypes>::ValueType>> {
537 Ok(KvdbSqliteSharded::new(
538 self.try_clone_connections()?,
539 SNAPSHOT_DB_STATEMENTS.delta_mpt_set_keys_statements.clone(),
540 ))
541 }
542
543 pub fn dumped_delta_kv_delete_keys_iterator(
544 &self,
545 ) -> Result<KvdbSqliteSharded<()>> {
546 Ok(KvdbSqliteSharded::new(
547 self.try_clone_connections()?,
548 SNAPSHOT_DB_STATEMENTS
549 .delta_mpt_delete_keys_statements
550 .clone(),
551 ))
552 }
553
554 pub fn dump_delta_mpt(
557 &mut self, delta_mpt: &DeltaMptIterator,
558 ) -> Result<()> {
559 debug!("dump_delta_mpt starts");
560 {
562 let connections = self.maybe_db_connections.as_mut().unwrap();
564 <DeltaMptDumperSetDb as SingleWriterImplFamily>::FamilyRepresentative::create_table(
565 connections,
566 &SNAPSHOT_DB_STATEMENTS.delta_mpt_set_keys_statements,
567 )?;
568 <DeltaMptDumperDeleteDb as SingleWriterImplFamily>::FamilyRepresentative::create_table(
569 connections,
570 &SNAPSHOT_DB_STATEMENTS.delta_mpt_delete_keys_statements,
571 )?;
572 }
573
574 self.start_transaction()?;
576 delta_mpt.iterate(&mut DeltaMptMergeDumperSqlite {
577 connections: self.maybe_db_connections.as_mut().unwrap(),
578 })?;
579 self.commit_transaction()?;
580
581 Ok(())
582 }
583
584 pub fn drop_delta_mpt_dump(&mut self) -> Result<()> {
587 let connections = self.maybe_db_connections.as_mut().unwrap();
589 <DeltaMptDumperSetDb as SingleWriterImplFamily>::FamilyRepresentative::drop_table(
590 connections,
591 &SNAPSHOT_DB_STATEMENTS.delta_mpt_set_keys_statements,
592 )?;
593 <DeltaMptDumperDeleteDb as SingleWriterImplFamily>::FamilyRepresentative::drop_table(
594 connections,
595 &SNAPSHOT_DB_STATEMENTS.delta_mpt_delete_keys_statements,
596 )
597 }
598
599 pub fn drop_mpt_table(&mut self) -> Result<()> {
600 if self.mpt_table_in_current_db {
601 let connections = self.maybe_db_connections.as_mut().unwrap();
602 KvdbSqliteSharded::<()>::drop_table(
603 connections,
604 &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
605 )?;
606
607 self.mpt_table_in_current_db = false;
608
609 KvdbSqliteSharded::<()>::vacumm_db(
610 connections,
611 &SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements,
612 )
613 } else {
614 Ok(())
615 }
616 }
617
618 fn apply_update_to_kvdb(&mut self) -> Result<()> {
619 for sqlite in self.maybe_db_connections.as_mut().unwrap().iter_mut() {
621 sqlite
622 .execute(
623 format!(
624 "DELETE FROM {} WHERE KEY IN (SELECT key FROM {})",
625 Self::SNAPSHOT_KV_TABLE_NAME,
626 Self::DELTA_KV_DELETE_TABLE_NAME
627 )
628 .as_str(),
629 SQLITE_NO_PARAM,
630 )?
631 .finish_ignore_rows()?;
632 sqlite
633 .execute(
634 format!(
635 "INSERT OR REPLACE INTO {} (key, value) \
636 SELECT key, value FROM {}",
637 Self::SNAPSHOT_KV_TABLE_NAME,
638 Self::DELTA_KV_SET_TABLE_NAME
639 )
640 .as_str(),
641 SQLITE_NO_PARAM,
642 )?
643 .finish_ignore_rows()?;
644 }
645 Ok(())
646 }
647
648 fn snapshot_mpt_iterator(
649 &self,
650 ) -> Result<
651 Wrap<
652 '_,
653 KvdbSqliteSharded<<Self as KeyValueDbTypes>::ValueType>,
654 dyn KeyValueDbIterableTrait<
655 MptKeyValue,
656 [u8],
657 KvdbSqliteShardedIteratorTag,
658 >,
659 >,
660 > {
661 if self.mpt_table_in_current_db {
662 Ok(Wrap(KvdbSqliteSharded::new(
663 self.try_clone_connections()?,
664 SNAPSHOT_MPT_DB_STATEMENTS.mpt_statements.clone(),
665 )))
666 } else {
667 bail!("mpt_snapshot is not in current db");
668 }
669 }
670}
671
672pub struct DeltaMptMergeDumperSqlite<'a> {
673 connections: &'a mut [SqliteConnection],
674}
675
676pub struct DeltaMptDumperSetDb<'a> {
677 connections: &'a mut [SqliteConnection],
678}
679
680pub struct DeltaMptDumperDeleteDb<'a> {
681 connections: &'a mut [SqliteConnection],
682}
683
684impl KeyValueDbTypes for DeltaMptDumperSetDb<'_> {
685 type ValueType = Box<[u8]>;
686}
687
688impl KeyValueDbTypes for DeltaMptDumperDeleteDb<'_> {
689 type ValueType = ();
690}
691
692impl SingleWriterImplFamily for DeltaMptDumperSetDb<'_> {
693 type FamilyRepresentative = KvdbSqliteSharded<Box<[u8]>>;
694}
695
696impl SingleWriterImplFamily for DeltaMptDumperDeleteDb<'_> {
697 type FamilyRepresentative = KvdbSqliteSharded<()>;
698}
699
700impl KvdbSqliteShardedDestructureTrait for DeltaMptDumperSetDb<'_> {
701 fn destructure_mut(
702 &mut self,
703 ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements) {
704 (
705 Some(*&mut self.connections),
706 &SNAPSHOT_DB_STATEMENTS.delta_mpt_set_keys_statements,
707 )
708 }
709}
710
711impl KvdbSqliteShardedDestructureTrait for DeltaMptDumperDeleteDb<'_> {
712 fn destructure_mut(
713 &mut self,
714 ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements) {
715 (
716 Some(*&mut self.connections),
717 &SNAPSHOT_DB_STATEMENTS.delta_mpt_delete_keys_statements,
718 )
719 }
720}
721
722impl<'a> KVInserter<MptKeyValue> for DeltaMptMergeDumperSqlite<'a> {
723 fn push(&mut self, x: MptKeyValue) -> Result<()> {
724 let (mpt_key, value) = x;
726 let snapshot_key =
727 StorageKeyWithSpace::from_delta_mpt_key(&mpt_key).to_key_bytes();
728 if value.len() > 0 {
729 DeltaMptDumperSetDb {
730 connections: *&mut self.connections,
731 }
732 .put_impl(&snapshot_key, &value)?;
733 } else {
734 DeltaMptDumperDeleteDb {
735 connections: *&mut self.connections,
736 }
737 .put_impl(&snapshot_key, &())?;
738 }
739
740 Ok(())
741 }
742}
743
744use crate::{
745 impls::{
746 delta_mpt::DeltaMptIterator,
747 errors::*,
748 merkle_patricia_trie::{MptKeyValue, MptMerger},
749 storage_db::{
750 kvdb_sqlite::KvdbSqliteStatements,
751 kvdb_sqlite_sharded::{
752 KvdbSqliteSharded, KvdbSqliteShardedBorrowMut,
753 KvdbSqliteShardedBorrowShared,
754 KvdbSqliteShardedDestructureTrait,
755 KvdbSqliteShardedIteratorTag,
756 KvdbSqliteShardedRefDestructureTrait,
757 },
758 snapshot_db_manager_sqlite::AlreadyOpenSnapshots,
759 snapshot_mpt::{SnapshotMpt, SnapshotMptLoadNode},
760 sqlite::SQLITE_NO_PARAM,
761 },
762 },
763 storage_db::{
764 KeyValueDbIterableTrait, KeyValueDbTraitSingleWriter, KeyValueDbTypes,
765 OpenSnapshotMptTrait, OwnedReadImplByFamily, OwnedReadImplFamily,
766 ReadImplByFamily, ReadImplFamily, SingleWriterImplByFamily,
767 SingleWriterImplFamily, SnapshotDbTrait, SnapshotMptDbValue,
768 SnapshotMptTraitReadAndIterate, SnapshotMptTraitRw,
769 },
770 utils::wrap::Wrap,
771 KVInserter, SnapshotDbManagerSqlite, SqliteConnection,
772};
773use fallible_iterator::FallibleIterator;
774use primitives::{MerkleHash, StorageKeyWithSpace};
775use std::{
776 fs,
777 path::{Path, PathBuf},
778 sync::{
779 atomic::{AtomicBool, Ordering},
780 Arc,
781 },
782};
783use tokio::sync::Semaphore;
784
785use super::snapshot_mpt_db_sqlite::{
786 SnapshotMptDbSqlite, SNAPSHOT_MPT_DB_STATEMENTS,
787};