1pub struct SnapshotDbManagerSqlite {
6 snapshot_path: PathBuf,
7 force_cow: bool,
10 already_open_snapshots: AlreadyOpenSnapshots<SnapshotKvDbSqlite>,
11 open_snapshot_semaphore: Arc<Semaphore>,
15 open_create_delete_lock: Mutex<()>,
16 use_isolated_db_for_mpt_table: bool,
17 use_isolated_db_for_mpt_table_height: Option<u64>,
18 mpt_snapshot_path: PathBuf,
19 mpt_already_open_snapshots: AlreadyOpenSnapshots<SnapshotMptDbSqlite>,
20 mpt_open_snapshot_semaphore: Arc<Semaphore>,
21 era_epoch_count: u64,
22 max_open_snapshots: u16,
23 latest_mpt_snapshot_semaphore: Arc<Semaphore>,
24 latest_snapshot_id: RwLock<(EpochId, u64)>,
25 copying_mpt_snapshot: Arc<Mutex<()>>,
26 snapshot_epoch_id_before_recovered: RwLock<Option<EpochId>>,
27 reconstruct_snapshot_id_for_reboot: RwLock<Option<EpochId>>,
28 backup_mpt_snapshot: bool,
29}
30
31#[derive(Debug)]
32enum CopyType {
33 Cow,
34 Std,
35}
36
37pub struct SnapshotDbWriteable {
38 pub kv_snapshot_db: SnapshotKvDbSqlite,
39 pub mpt_snapshot_db: Option<SnapshotMptDbSqlite>,
40}
41
42impl KeyValueDbTypes for SnapshotDbWriteable {
43 type ValueType = Box<[u8]>;
44}
45
46impl SnapshotDbWriteableTrait for SnapshotDbWriteable {
47 type SnapshotDbBorrowMutType = SnapshotMpt<
48 KvdbSqliteShardedBorrowMut<'static, SnapshotMptDbValue>,
49 KvdbSqliteShardedBorrowMut<'static, SnapshotMptDbValue>,
50 >;
51
52 fn start_transaction(&mut self) -> Result<()> {
53 self.kv_snapshot_db.start_transaction()?;
54
55 if self.mpt_snapshot_db.is_some() {
56 self.mpt_snapshot_db.as_mut().unwrap().start_transaction()?;
57 }
58
59 Ok(())
60 }
61
62 fn commit_transaction(&mut self) -> Result<()> {
63 self.kv_snapshot_db.commit_transaction()?;
64
65 if self.mpt_snapshot_db.is_some() {
66 self.mpt_snapshot_db
67 .as_mut()
68 .unwrap()
69 .commit_transaction()?;
70 }
71
72 Ok(())
73 }
74
75 fn put_kv(
76 &mut self, key: &[u8], value: &<Self::ValueType as DbValueType>::Type,
77 ) -> Result<Option<Option<Self::ValueType>>> {
78 self.kv_snapshot_db.put(key, value)
79 }
80
81 fn open_snapshot_mpt_owned(
82 &mut self,
83 ) -> Result<Self::SnapshotDbBorrowMutType> {
84 if self.kv_snapshot_db.is_mpt_table_in_current_db() {
85 self.kv_snapshot_db.open_snapshot_mpt_owned()
86 } else {
87 self.mpt_snapshot_db
88 .as_mut()
89 .unwrap()
90 .open_snapshot_mpt_owned()
91 }
92 }
93}
94
95pub type AlreadyOpenSnapshots<T> =
100 Arc<RwLock<HashMap<PathBuf, Option<Weak<T>>>>>;
101
102impl SnapshotDbManagerSqlite {
103 pub const LATEST_MPT_SNAPSHOT_DIR: &'static str = "latest";
104 const MPT_SNAPSHOT_DIR: &'static str = "mpt_snapshot";
105 const SNAPSHOT_DB_SQLITE_DIR_PREFIX: &'static str = "sqlite_";
106
107 pub fn new(
108 snapshot_path: PathBuf, max_open_snapshots: u16,
109 use_isolated_db_for_mpt_table: bool,
110 use_isolated_db_for_mpt_table_height: Option<u64>,
111 era_epoch_count: u64, backup_mpt_snapshot: bool,
112 ) -> Result<Self> {
113 if !snapshot_path.exists() {
114 fs::create_dir_all(snapshot_path.clone())?;
115 }
116
117 let mpt_snapshot_path = snapshot_path
118 .parent()
119 .unwrap()
120 .join(SnapshotDbManagerSqlite::MPT_SNAPSHOT_DIR);
121 let latest_mpt_snapshot_path = mpt_snapshot_path.join(
122 Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string()
123 + SnapshotDbManagerSqlite::LATEST_MPT_SNAPSHOT_DIR,
124 );
125
126 SnapshotMptDbSqlite::create(
128 latest_mpt_snapshot_path.as_path(),
129 &Default::default(),
130 &Arc::new(Semaphore::new(max_open_snapshots as usize)),
131 None,
132 )?;
133
134 Ok(Self {
135 snapshot_path,
136 force_cow: false,
137 already_open_snapshots: Default::default(),
138 open_snapshot_semaphore: Arc::new(Semaphore::new(
139 max_open_snapshots as usize,
140 )),
141 open_create_delete_lock: Default::default(),
142 mpt_snapshot_path,
143 use_isolated_db_for_mpt_table,
144 use_isolated_db_for_mpt_table_height,
145 mpt_already_open_snapshots: Default::default(),
146 mpt_open_snapshot_semaphore: Arc::new(Semaphore::new(
147 max_open_snapshots as usize,
148 )),
149 era_epoch_count,
150 max_open_snapshots,
151 latest_mpt_snapshot_semaphore: Arc::new(Semaphore::new(1)),
152 latest_snapshot_id: RwLock::new((NULL_EPOCH, 0)),
153 copying_mpt_snapshot: Arc::new(Default::default()),
154 snapshot_epoch_id_before_recovered: RwLock::new(None),
155 reconstruct_snapshot_id_for_reboot: RwLock::new(None),
156 backup_mpt_snapshot,
157 })
158 }
159
160 pub fn update_latest_snapshot_id(&self, snapshot_id: EpochId, height: u64) {
161 *self.latest_snapshot_id.write() = (snapshot_id, height);
162 }
163
164 pub fn clean_snapshot_epoch_id_before_recovered(&self) {
165 *self.snapshot_epoch_id_before_recovered.write() = None;
166 }
167
168 pub fn set_reconstruct_snapshot_id(
169 &self, reconstruct_pivot: Option<EpochId>,
170 ) {
171 debug!("set_reconstruct_snapshot_id to {:?}", reconstruct_pivot);
172 *self.reconstruct_snapshot_id_for_reboot.write() = reconstruct_pivot;
173 }
174
175 pub fn recreate_latest_mpt_snapshot(&self) -> Result<()> {
176 info!("recreate latest mpt snapshot");
177 let latest_mpt_snapshot_path = self.get_latest_mpt_snapshot_db_path();
178 if latest_mpt_snapshot_path.exists() {
179 debug!("remove mpt snapshot {:?}", latest_mpt_snapshot_path);
180 if let Err(e) =
181 fs::remove_dir_all(&latest_mpt_snapshot_path.as_path())
182 {
183 error!(
184 "remove mpt snapshot err: path={:?} err={:?}",
185 latest_mpt_snapshot_path.as_path(),
186 e
187 );
188 }
189 }
190
191 SnapshotMptDbSqlite::create(
193 latest_mpt_snapshot_path.as_path(),
194 &Default::default(),
195 &Arc::new(Semaphore::new(self.max_open_snapshots as usize)),
196 None,
197 )?;
198
199 Ok(())
200 }
201
202 fn open_snapshot_readonly(
203 &self, snapshot_path: PathBuf, try_open: bool,
204 snapshot_epoch_id: &EpochId, read_mpt_snapshot: bool,
205 ) -> Result<Option<SnapshotDbSqlite>> {
206 if self
208 .snapshot_epoch_id_before_recovered
209 .read()
210 .is_some_and(|e| &e == snapshot_epoch_id)
211 {
212 return Ok(None);
213 }
214
215 let _open_lock = self.open_create_delete_lock.lock();
217
218 if let Some(snapshot_db) =
219 self.open_kv_snapshot_readonly(snapshot_path, try_open)?
220 {
221 let mpt_snapshot_db = if !snapshot_db.is_mpt_table_in_current_db()
222 && read_mpt_snapshot
223 {
224 let mpt_snapshot_path = if self.use_isolated_db_for_mpt_table {
227 let mpt_snapshot_path =
228 self.get_mpt_snapshot_db_path(snapshot_epoch_id);
229 if mpt_snapshot_path.exists() {
230 mpt_snapshot_path
231 } else {
232 if self.latest_snapshot_id.read().0
233 == *snapshot_epoch_id
234 {
235 self.get_latest_mpt_snapshot_db_path()
236 } else {
237 error!("MPT DB not exist, latest snapshot id {:?}, try to open {:?}.", self.latest_snapshot_id.read().0, snapshot_epoch_id);
238 return Ok(None);
239 }
240 }
241 } else {
242 error!("MPT table should be in snapshot database.");
243 return Ok(None);
244 };
245
246 let mpt_snapshot_db = self.open_mpt_snapshot_readonly(
247 mpt_snapshot_path,
248 try_open,
249 snapshot_epoch_id,
250 )?;
251
252 mpt_snapshot_db
253 } else {
254 None
255 };
256
257 return Ok(Some(SnapshotDbSqlite {
258 snapshot_db,
259 mpt_snapshot_db,
260 }));
261 } else {
262 return Ok(None);
263 }
264 }
265
266 fn open_kv_snapshot_readonly(
267 &self, snapshot_path: PathBuf, try_open: bool,
268 ) -> Result<Option<Arc<SnapshotKvDbSqlite>>> {
269 if let Some(already_open) =
270 self.already_open_snapshots.read().get(&snapshot_path)
271 {
272 match already_open {
273 None => {
274 return Ok(None);
276 }
277 Some(open_shared_weak) => {
278 match Weak::upgrade(open_shared_weak) {
279 None => {}
280 Some(already_open) => {
281 return Ok(Some(already_open));
282 }
283 }
284 }
285 }
286 }
287 let file_exists = snapshot_path.exists();
288 if file_exists {
289 let semaphore_permit = if try_open {
290 self.open_snapshot_semaphore
291 .try_acquire()
292 .map_err(|_err| Error::SemaphoreTryAcquireError)?
295 } else {
296 executor::block_on(self.open_snapshot_semaphore.acquire())
297 .map_err(|_err| Error::SemaphoreAcquireError)?
298 };
299
300 while let Some(already_open) =
303 self.already_open_snapshots.read().get(&snapshot_path)
304 {
305 match already_open {
306 None => {
307 return Ok(None);
309 }
310 Some(open_shared_weak) => {
311 match Weak::upgrade(open_shared_weak) {
312 None => {
313 thread::sleep(Duration::from_millis(5));
325 continue;
326 }
327 Some(already_open) => {
328 return Ok(Some(already_open));
329 }
330 }
331 }
332 }
333 }
334
335 let snapshot_db = Arc::new(SnapshotKvDbSqlite::open(
336 snapshot_path.as_path(),
337 true,
338 &self.already_open_snapshots,
339 &self.open_snapshot_semaphore,
340 )?);
341
342 semaphore_permit.forget();
343 self.already_open_snapshots.write().insert(
344 snapshot_path.into(),
345 Some(Arc::downgrade(&snapshot_db)),
346 );
347
348 return Ok(Some(snapshot_db));
349 } else {
350 return Ok(None);
351 }
352 }
353
354 fn open_snapshot_write(
355 &self, snapshot_path: PathBuf, create: bool, new_epoch_height: u64,
356 mpt_snapshot_path: Option<PathBuf>, new_snapshot_id: &EpochId,
357 ) -> Result<(SnapshotKvDbSqlite, Option<SnapshotMptDbSqlite>)> {
358 let mut _open_lock = self.open_create_delete_lock.lock();
359
360 let kv_db = self.open_kv_snapshot_write(
361 snapshot_path,
362 create,
363 new_epoch_height,
364 )?;
365
366 let mpt_table_in_current_db =
367 self.is_mpt_table_in_current_db_for_epoch(new_epoch_height);
368 let mpt_db: Option<SnapshotMptDbSqlite> = if !mpt_table_in_current_db {
369 let (mpt_snapshot_path, create_mpt) = match mpt_snapshot_path {
370 Some(v) => (v, true),
371 _ => {
372 let latest_snapshot_id = self.latest_snapshot_id.read();
373 debug!(
374 "new_epoch_height {}, latest_snapshot_id {} {}",
375 new_epoch_height,
376 latest_snapshot_id.0,
377 latest_snapshot_id.1
378 );
379 if new_epoch_height <= latest_snapshot_id.1 {
380 bail!(format!(
381 "Try to write an old snapshot {}, {}",
382 new_epoch_height, latest_snapshot_id.1
383 ))
384 }
385
386 (self.get_latest_mpt_snapshot_db_path(), false)
387 }
388 };
389
390 Some(self.open_mpt_snapshot_write(
391 mpt_snapshot_path,
392 create_mpt,
393 new_epoch_height,
394 new_snapshot_id,
395 )?)
396 } else {
397 None
398 };
399
400 return Ok((kv_db, mpt_db));
401 }
402
403 fn open_kv_snapshot_write(
404 &self, snapshot_path: PathBuf, create: bool, new_epoch_height: u64,
405 ) -> Result<SnapshotKvDbSqlite> {
406 if self
407 .already_open_snapshots
408 .read()
409 .get(&snapshot_path)
410 .is_some()
411 {
412 bail!(Error::SnapshotAlreadyExists)
413 }
414
415 let semaphore_permit =
416 executor::block_on(self.open_snapshot_semaphore.acquire())
417 .map_err(|_err| Error::SemaphoreAcquireError)?;
418 if self
423 .already_open_snapshots
424 .read()
425 .get(&snapshot_path)
426 .is_some()
427 {
428 bail!(Error::SnapshotAlreadyExists)
429 }
430
431 let mpt_table_in_current_db =
432 self.is_mpt_table_in_current_db_for_epoch(new_epoch_height);
433
434 let snapshot_db = if create {
435 SnapshotKvDbSqlite::create(
436 snapshot_path.as_path(),
437 &self.already_open_snapshots,
438 &self.open_snapshot_semaphore,
439 mpt_table_in_current_db,
440 )
441 } else {
442 let file_exists = snapshot_path.exists();
443 if file_exists {
444 let mut db = SnapshotKvDbSqlite::open(
445 snapshot_path.as_path(),
446 false,
447 &self.already_open_snapshots,
448 &self.open_snapshot_semaphore,
449 )?;
450
451 if !mpt_table_in_current_db {
452 db.drop_mpt_table()?;
453 }
454
455 Ok(db)
456 } else {
457 bail!(Error::SnapshotNotFound);
458 }
459 }?;
460
461 semaphore_permit.forget();
462 self.already_open_snapshots
463 .write()
464 .insert(snapshot_path.clone(), None);
465 Ok(snapshot_db)
466 }
467
468 fn open_mpt_snapshot_readonly(
469 &self, snapshot_path: PathBuf, try_open: bool,
470 snapshot_epoch_id: &EpochId,
471 ) -> Result<Option<Arc<SnapshotMptDbSqlite>>> {
472 debug!(
473 "Open mpt snapshot with readonly {:?}, snapshot_epoch_id {:?}",
474 snapshot_path, snapshot_epoch_id
475 );
476 if let Some(already_open) =
477 self.mpt_already_open_snapshots.read().get(&snapshot_path)
478 {
479 match already_open {
480 None => {
481 return Ok(None);
483 }
484 Some(open_shared_weak) => {
485 match Weak::upgrade(open_shared_weak) {
486 None => {}
487 Some(already_open) => {
488 return Ok(Some(already_open));
489 }
490 }
491 }
492 }
493 }
494 let file_exists = snapshot_path.exists();
495 if file_exists {
496 let semaphore_permit = if try_open {
497 self.mpt_open_snapshot_semaphore
498 .try_acquire()
499 .map_err(|_err| Error::SemaphoreTryAcquireError)?
502 } else {
503 executor::block_on(self.mpt_open_snapshot_semaphore.acquire())
504 .map_err(|_err| Error::SemaphoreAcquireError)?
505 };
506
507 while let Some(already_open) =
510 self.mpt_already_open_snapshots.read().get(&snapshot_path)
511 {
512 match already_open {
513 None => {
514 return Ok(None);
516 }
517 Some(open_shared_weak) => {
518 match Weak::upgrade(open_shared_weak) {
519 None => {
520 thread::sleep(Duration::from_millis(5));
521 continue;
522 }
523 Some(already_open) => {
524 return Ok(Some(already_open));
525 }
526 }
527 }
528 }
529 }
530
531 let (latest_mpt_semaphore_permit, v) = if self
532 .latest_snapshot_id
533 .read()
534 .0
535 == *snapshot_epoch_id
536 && self.latest_snapshot_id.read().1 % self.era_epoch_count != 0
537 {
538 let s =
539 self.latest_mpt_snapshot_semaphore.try_acquire().map_err(
540 |_err| "The MPT snapshot is already open for writing.",
541 )?;
542
543 (Some(s), Some(self.latest_mpt_snapshot_semaphore.clone()))
544 } else {
545 (None, None)
546 };
547
548 let snapshot_db = Arc::new(SnapshotMptDbSqlite::open(
549 snapshot_path.as_path(),
550 true,
551 &self.mpt_already_open_snapshots,
552 &self.mpt_open_snapshot_semaphore,
553 v,
554 )?);
555
556 if let Some(s) = latest_mpt_semaphore_permit {
557 s.forget();
558 }
559 semaphore_permit.forget();
560 self.mpt_already_open_snapshots.write().insert(
561 snapshot_path.into(),
562 Some(Arc::downgrade(&snapshot_db)),
563 );
564
565 return Ok(Some(snapshot_db));
566 } else {
567 return Ok(None);
568 }
569 }
570
571 fn open_mpt_snapshot_write(
572 &self, snapshot_path: PathBuf, create: bool, new_epoch_height: u64,
573 new_snapshot_id: &EpochId,
574 ) -> Result<SnapshotMptDbSqlite> {
575 debug!(
576 "open mpt snapshot with write {:?}, new epoch height {}",
577 snapshot_path, new_epoch_height
578 );
579 let latest_mpt_semaphore_permit: tokio::sync::SemaphorePermit =
580 executor::block_on(self.latest_mpt_snapshot_semaphore.acquire())
581 .map_err(|_err| Error::SemaphoreAcquireError)?;
582
583 if self
584 .mpt_already_open_snapshots
585 .read()
586 .get(&snapshot_path)
587 .is_some()
588 {
589 bail!(Error::SnapshotAlreadyExists)
590 }
591
592 let semaphore_permit =
593 executor::block_on(self.mpt_open_snapshot_semaphore.acquire())
594 .map_err(|_err| Error::SemaphoreAcquireError)?;
595
596 let snapshot_db = if create {
597 SnapshotMptDbSqlite::create(
598 snapshot_path.as_path(),
599 &self.mpt_already_open_snapshots,
600 &self.mpt_open_snapshot_semaphore,
601 Some(self.latest_mpt_snapshot_semaphore.clone()),
602 )
603 } else {
604 let file_exists = snapshot_path.exists();
605 if file_exists {
606 SnapshotMptDbSqlite::open(
607 snapshot_path.as_path(),
608 false,
609 &self.mpt_already_open_snapshots,
610 &self.mpt_open_snapshot_semaphore,
611 Some(self.latest_mpt_snapshot_semaphore.clone()),
612 )
613 } else {
614 bail!(Error::SnapshotNotFound);
615 }
616 }?;
617
618 latest_mpt_semaphore_permit.forget();
619 semaphore_permit.forget();
620
621 *self.latest_snapshot_id.write() =
622 (new_snapshot_id.clone(), new_epoch_height);
623
624 self.mpt_already_open_snapshots
625 .write()
626 .insert(snapshot_path.clone(), None);
627
628 Ok(snapshot_db)
629 }
630
631 pub fn on_close(
632 already_open_snapshots: &AlreadyOpenSnapshots<SnapshotKvDbSqlite>,
633 open_semaphore: &Arc<Semaphore>, path: &Path, remove_on_close: bool,
634 ) {
635 if remove_on_close {
637 Self::fs_remove_snapshot(path);
646 }
647 already_open_snapshots.write().remove(path);
648 open_semaphore.add_permits(1);
649 }
650
651 pub fn on_close_mpt_snapshot(
652 already_open_snapshots: &AlreadyOpenSnapshots<SnapshotMptDbSqlite>,
653 open_semaphore: &Arc<Semaphore>, path: &Path, remove_on_close: bool,
654 latest_mpt_snapshot_semaphore: &Option<Arc<Semaphore>>,
655 ) {
656 debug!("on_close_mpt_snapshot path {:?}", path);
657 if remove_on_close {
659 Self::fs_remove_snapshot(path);
660 }
661 already_open_snapshots.write().remove(path);
662 open_semaphore.add_permits(1);
663
664 if let Some(s) = latest_mpt_snapshot_semaphore {
665 s.add_permits(1);
666 }
667 }
668
669 fn fs_remove_snapshot(path: &Path) {
670 debug!("Remove snapshot at {}", path.display());
671 let path = path.to_owned();
672 thread::spawn(move || {
673 if let Err(e) = fs::remove_dir_all(&path) {
674 error!("remove snapshot err: path={:?} err={:?}", path, e);
675 }
676 debug!("Finish removing snapshot at {}", path.display());
677 });
678 }
679
680 fn get_merge_temp_snapshot_db_path(
681 &self, old_snapshot_epoch_id: &EpochId, new_snapshot_epoch_id: &EpochId,
682 ) -> PathBuf {
683 self.snapshot_path.join(
684 Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string()
685 + "merge_temp_"
686 + &old_snapshot_epoch_id.as_ref().to_hex::<String>()
687 + &new_snapshot_epoch_id.as_ref().to_hex::<String>(),
688 )
689 }
690
691 fn try_get_new_snapshot_epoch_from_temp_path(
692 &self, dir_name: &str,
693 ) -> Option<EpochId> {
694 let prefix =
695 Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string() + "merge_temp_";
696
697 if dir_name.starts_with(&prefix) {
698 match EpochId::from_str(
699 &dir_name[(prefix.len() + EpochId::len_bytes() * 2)..],
700 ) {
701 Ok(e) => Some(e),
702 Err(e) => {
703 error!(
704 "get new snapshot epoch id from temp path failed: {}",
705 e
706 );
707 None
708 }
709 }
710 } else {
711 None
712 }
713 }
714
715 fn get_full_sync_temp_snapshot_db_path(
716 &self, snapshot_epoch_id: &EpochId, merkle_root: &MerkleHash,
717 ) -> PathBuf {
718 self.snapshot_path.join(
719 Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string()
720 + "full_sync_temp_"
721 + &snapshot_epoch_id.as_ref().to_hex::<String>()
722 + &merkle_root.as_ref().to_hex::<String>(),
723 )
724 }
725
726 fn get_merge_temp_mpt_snapshot_db_path(
727 &self, new_snapshot_epoch_id: &EpochId,
728 ) -> PathBuf {
729 self.mpt_snapshot_path.join(
730 Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string()
731 + "merge_temp_"
732 + &new_snapshot_epoch_id.as_ref().to_hex::<String>(),
733 )
734 }
735
736 fn try_get_new_snapshot_epoch_from_mpt_temp_path(
737 &self, dir_name: &str,
738 ) -> Option<EpochId> {
739 let prefix =
740 Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string() + "merge_temp_";
741 if dir_name.starts_with(&prefix) {
742 match EpochId::from_str(&dir_name[prefix.len()..]) {
743 Ok(e) => Some(e),
744 Err(e) => {
745 error!("get new snapshot epoch id from mpt temp path failed: {}", e);
746 None
747 }
748 }
749 } else {
750 None
751 }
752 }
753
754 fn get_latest_mpt_snapshot_db_path(&self) -> PathBuf {
755 self.mpt_snapshot_path
756 .join(self.get_latest_mpt_snapshot_db_name())
757 }
758
759 fn get_mpt_snapshot_db_path(&self, snapshot_epoch_id: &EpochId) -> PathBuf {
760 self.mpt_snapshot_path
761 .join(&self.get_snapshot_db_name(snapshot_epoch_id))
762 }
763
764 fn get_full_sync_temp_mpt_snapshot_db_path(
765 &self, snapshot_epoch_id: &EpochId, merkle_root: &MerkleHash,
766 ) -> PathBuf {
767 self.mpt_snapshot_path.join(
768 Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string()
769 + "full_sync_temp_"
770 + &snapshot_epoch_id.as_ref().to_hex::<String>()
771 + &merkle_root.as_ref().to_hex::<String>(),
772 )
773 }
774
775 fn try_make_snapshot_cow_copy_impl(
779 old_snapshot_path: &Path, new_snapshot_path: &Path, force_cow: bool,
780 ) -> Result<bool> {
781 let mut command;
782 if cfg!(target_os = "linux") {
783 command = Command::new("cp");
785 command
786 .arg("-R")
787 .arg("--reflink=always")
788 .arg(old_snapshot_path)
789 .arg(new_snapshot_path);
790 } else if cfg!(target_os = "macos") {
791 command = Command::new("cp");
793 command
794 .arg("-R")
795 .arg("-c")
796 .arg(old_snapshot_path)
797 .arg(new_snapshot_path);
798 } else {
799 return Ok(false);
800 };
801
802 let command_result = command.output();
803 if command_result.is_err() {
804 fs::remove_dir_all(new_snapshot_path)?;
805 }
806 if !command_result?.status.success() {
807 fs::remove_dir_all(new_snapshot_path)?;
808 if force_cow {
809 error!(
810 "COW copy failed, check file system support. Command {:?}",
811 command,
812 );
813 Err(Error::SnapshotCowCreation.into())
814 } else {
815 info!(
816 "COW copy failed, check file system support. Command {:?}",
817 command,
818 );
819 Ok(false)
820 }
821 } else {
822 Ok(true)
823 }
824 }
825
826 fn try_copy_snapshot(
827 old_snapshot_path: &Path, new_snapshot_path: &Path, force_cow: bool,
828 ) -> Result<CopyType> {
829 debug!("try copy into {:?}", new_snapshot_path);
830 if Self::try_make_snapshot_cow_copy(
831 old_snapshot_path,
832 new_snapshot_path,
833 force_cow,
834 )? {
835 Ok(CopyType::Cow)
836 } else {
837 let mut options = CopyOptions::new();
838 options.copy_inside = true; fs_extra::dir::copy(old_snapshot_path, new_snapshot_path, &options)
840 .map(|_| CopyType::Std)
841 .map_err(|e| {
842 warn!(
843 "Fail to copy snapshot {:?}, err={:?}",
844 old_snapshot_path, e,
845 );
846 Error::SnapshotCopyFailure.into()
847 })
848 }
849 }
850
851 fn try_make_snapshot_cow_copy(
855 old_snapshot_path: &Path, new_snapshot_path: &Path, force_cow: bool,
856 ) -> Result<bool> {
857 let result = Self::try_make_snapshot_cow_copy_impl(
858 old_snapshot_path,
859 new_snapshot_path,
860 force_cow,
861 );
862
863 if result.is_err() {
864 Ok(false)
865 } else if result.unwrap() == false {
866 if force_cow {
867 error!(
869 "Failed to create a new snapshot by COW. \
870 Use XFS on linux or APFS on Mac"
871 );
872 Err(Error::SnapshotCowCreation.into())
873 } else {
874 Ok(false)
875 }
876 } else {
877 Ok(true)
878 }
879 }
880
881 fn copy_and_merge(
882 &self, temp_snapshot_db: &mut SnapshotKvDbSqlite,
883 mpt_snapshot_db: &mut Option<SnapshotMptDbSqlite>,
884 old_snapshot_epoch_id: &EpochId, snapshot_epoch_id: EpochId,
885 ) -> Result<MerkleHash> {
886 let snapshot_path = self.get_snapshot_db_path(old_snapshot_epoch_id);
887 let maybe_old_snapshot_db = Self::open_snapshot_readonly(
888 self,
889 snapshot_path,
890 false,
891 old_snapshot_epoch_id,
892 false,
893 )?;
894 let old_snapshot_db = maybe_old_snapshot_db
895 .ok_or(Error::from(Error::SnapshotNotFound))?;
896 temp_snapshot_db.copy_and_merge(
897 &old_snapshot_db.snapshot_db,
898 mpt_snapshot_db,
899 self.reconstruct_snapshot_id_for_reboot
900 .write()
901 .take()
902 .is_some_and(|v| v == snapshot_epoch_id),
903 )
904 }
905
906 fn rename_snapshot_db<P: AsRef<Path>>(
907 old_path: P, new_path: P,
908 ) -> Result<()> {
909 Ok(fs::rename(old_path, new_path)?)
910 }
911
912 fn defragmenting_xfs_files(&self, new_snapshot_db_path: PathBuf) {
913 thread::Builder::new()
914 .name("Defragmenting XFS Files".into())
915 .spawn(move || {
916 let paths = fs::read_dir(new_snapshot_db_path).unwrap();
917 let mut files = vec![];
918 for path in paths {
919 if let Ok(p) = path {
920 let f = p.path();
921 if f.is_file() {
922 files.push(f.as_path().display().to_string());
923 }
924 }
925 }
926
927 let mut command = Command::new("xfs_fsr");
928 command.arg("-v");
929 command.args(files);
930 let command_result = command.output();
931 match command_result {
932 Ok(o) => {
933 if o.status.success() {
934 info!(
935 "Defragmenting XFS files success. Command {:?}",
936 command
937 );
938 }
939 }
940 Err(e) => {
941 error!(
942 "Defragmenting XFS files failed. Command {:?} \n Error {:?}",
943 command, e
944 );
945 }
946 }
947 })
948 .unwrap();
949 }
950
951 fn copy_mpt_snapshot(
952 &self, temp_mpt_path: PathBuf, new_mpt_snapshot_db_path: PathBuf,
953 ) -> Result<()> {
954 debug!(
955 "Copy mpt db to {:?}, era_epoch_count {}",
956 new_mpt_snapshot_db_path, self.era_epoch_count
957 );
958 let latest_mpt_path = self.get_latest_mpt_snapshot_db_path();
959
960 let force_cow = self.force_cow;
961 let copying_mpt_snapshot = Arc::clone(&self.copying_mpt_snapshot);
962 thread::Builder::new()
963 .name("Copy mpt snapshot".into())
964 .spawn(move || {
965 let _open_lock = copying_mpt_snapshot.lock();
966
967 if let Err(e) = Self::try_copy_snapshot(
968 latest_mpt_path.as_path(),
969 temp_mpt_path.as_path(),
970 force_cow,
971 ) {
972 error!(
973 "Copy mpt snapshot failed. Try copy snapshot error. \n Error {:?}",
974 e
975 );
976 return;
977 }
978
979 if new_mpt_snapshot_db_path.exists() {
980 if let Err(e) =
981 fs::remove_dir_all(&new_mpt_snapshot_db_path.as_path())
982 {
983 error!(
984 "remove mpt snapshot err: path={:?} err={:?}",
985 new_mpt_snapshot_db_path.as_path(),
986 e
987 );
988 }
989 }
990
991 if let Err(e) = Self::rename_snapshot_db(
992 &temp_mpt_path,
993 &new_mpt_snapshot_db_path,
994 ) {
995 error!(
996 "Copy mpt snapshot failed. Rename snapshot db error. \n Error {:?}",
997 e
998 );
999 }
1000 })
1001 .unwrap();
1002 Ok(())
1003 }
1004
1005 fn is_mpt_table_in_current_db_for_epoch(&self, epoch_height: u64) -> bool {
1006 if self.use_isolated_db_for_mpt_table {
1007 match self.use_isolated_db_for_mpt_table_height {
1008 Some(v) => epoch_height < v,
1009 _ => false,
1010 }
1011 } else {
1012 true
1013 }
1014 }
1015}
1016
1017impl SnapshotDbManagerTrait for SnapshotDbManagerSqlite {
1018 type SnapshotDb = SnapshotDbSqlite;
1019 type SnapshotDbWrite = SnapshotDbWriteable;
1020
1021 fn get_snapshot_dir(&self) -> &Path { self.snapshot_path.as_path() }
1022
1023 fn get_mpt_snapshot_dir(&self) -> &Path { self.mpt_snapshot_path.as_path() }
1024
1025 fn get_latest_mpt_snapshot_db_name(&self) -> String {
1026 Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string()
1027 + Self::LATEST_MPT_SNAPSHOT_DIR
1028 }
1029
1030 fn get_snapshot_db_name(&self, snapshot_epoch_id: &EpochId) -> String {
1031 Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string()
1032 + &snapshot_epoch_id.as_ref().to_hex::<String>()
1033 }
1034
1035 fn get_snapshot_db_path(&self, snapshot_epoch_id: &EpochId) -> PathBuf {
1036 self.snapshot_path
1037 .join(&self.get_snapshot_db_name(snapshot_epoch_id))
1038 }
1039
1040 fn get_epoch_id_from_snapshot_db_name(
1041 &self, snapshot_db_name: &str,
1042 ) -> Result<EpochId> {
1043 let prefix_len = Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.len();
1044 Ok(EpochId::from_str(&snapshot_db_name[prefix_len..])
1045 .map_err(|_op| "not correct snapshot db name")?)
1046 }
1047
1048 fn new_snapshot_by_merging<'m>(
1049 &self, old_snapshot_epoch_id: &EpochId, snapshot_epoch_id: EpochId,
1050 delta_mpt: DeltaMptIterator,
1051 mut in_progress_snapshot_info: SnapshotInfo,
1052 snapshot_info_map_rwlock: &'m RwLock<PersistedSnapshotInfoMap>,
1053 new_epoch_height: u64, recover_mpt_with_kv_snapshot_exist: bool,
1054 ) -> Result<(RwLockWriteGuard<'m, PersistedSnapshotInfoMap>, SnapshotInfo)>
1055 {
1056 info!(
1057 "new_snapshot_by_merging: old={:?} new={:?} new epoch height={}, recovering mpt={}",
1058 old_snapshot_epoch_id, snapshot_epoch_id, new_epoch_height, recover_mpt_with_kv_snapshot_exist
1059 );
1060 let temp_db_path = self.get_merge_temp_snapshot_db_path(
1062 old_snapshot_epoch_id,
1063 &snapshot_epoch_id,
1064 );
1065 let new_snapshot_db_path =
1066 self.get_snapshot_db_path(&snapshot_epoch_id);
1067
1068 let mut snapshot_kv_db;
1069 let mut snapshot_mpt_db;
1070 let mut cow = false;
1071
1072 let mpt_table_in_current_db =
1073 self.is_mpt_table_in_current_db_for_epoch(new_epoch_height);
1074 let new_snapshot_root = if *old_snapshot_epoch_id == NULL_EPOCH {
1075 snapshot_mpt_db = if mpt_table_in_current_db {
1076 None
1077 } else {
1078 Some(self.open_mpt_snapshot_write(
1079 self.get_latest_mpt_snapshot_db_path(),
1080 true,
1081 new_epoch_height,
1082 &snapshot_epoch_id,
1083 )?)
1084 };
1085 snapshot_kv_db = if recover_mpt_with_kv_snapshot_exist {
1086 let mut kv_db = self.open_kv_snapshot_write(
1087 new_snapshot_db_path.clone(),
1088 false,
1089 new_epoch_height,
1090 )?;
1091
1092 kv_db.drop_delta_mpt_dump()?;
1093 kv_db
1094 } else {
1095 SnapshotKvDbSqlite::create(
1096 temp_db_path.as_path(),
1097 &self.already_open_snapshots,
1098 &self.open_snapshot_semaphore,
1099 mpt_table_in_current_db,
1100 )?
1101 };
1102
1103 snapshot_kv_db.dump_delta_mpt(&delta_mpt)?;
1104 let _open_lock = self.copying_mpt_snapshot.lock();
1105 snapshot_kv_db.direct_merge(
1106 None,
1107 &mut snapshot_mpt_db,
1108 recover_mpt_with_kv_snapshot_exist,
1109 self.reconstruct_snapshot_id_for_reboot
1110 .write()
1111 .take()
1112 .is_some_and(|v| v == snapshot_epoch_id),
1113 )?
1114 } else {
1115 let (db_path, copied) = if recover_mpt_with_kv_snapshot_exist {
1116 (new_snapshot_db_path.clone(), false)
1117 } else {
1118 let copied = if let Ok(copy_type) = Self::try_copy_snapshot(
1119 self.get_snapshot_db_path(old_snapshot_epoch_id).as_path(),
1120 temp_db_path.as_path(),
1121 self.force_cow,
1122 ) {
1123 cow = match copy_type {
1124 CopyType::Cow => true,
1125 _ => false,
1126 };
1127 debug!("copy on write state: {}", cow);
1128 true
1129 } else {
1130 false
1131 };
1132
1133 (temp_db_path.clone(), copied)
1134 };
1135
1136 if recover_mpt_with_kv_snapshot_exist || copied {
1137 (snapshot_kv_db, snapshot_mpt_db) = self.open_snapshot_write(
1139 db_path,
1140 false,
1141 new_epoch_height,
1142 None,
1143 &snapshot_epoch_id,
1144 )?;
1145
1146 snapshot_kv_db.drop_delta_mpt_dump()?;
1148
1149 snapshot_kv_db.dump_delta_mpt(&delta_mpt)?;
1151
1152 let old_snapshot;
1153 let old_snapshot_db = if snapshot_kv_db
1154 .is_mpt_table_in_current_db()
1155 {
1156 None
1157 } else {
1158 let snapshot_path =
1159 self.get_snapshot_db_path(old_snapshot_epoch_id);
1160 let maybe_old_snapshot_db = Self::open_snapshot_readonly(
1161 self,
1162 snapshot_path,
1163 false,
1164 old_snapshot_epoch_id,
1165 false,
1166 )?;
1167 old_snapshot = maybe_old_snapshot_db
1168 .ok_or(Error::from(Error::SnapshotNotFound))?;
1169 if old_snapshot.is_mpt_table_in_current_db() {
1170 Some(&old_snapshot.snapshot_db)
1171 } else {
1172 None
1173 }
1174 };
1175
1176 let _open_lock = self.copying_mpt_snapshot.lock();
1177 snapshot_kv_db.direct_merge(
1178 old_snapshot_db,
1179 &mut snapshot_mpt_db,
1180 recover_mpt_with_kv_snapshot_exist,
1181 self.reconstruct_snapshot_id_for_reboot
1182 .write()
1183 .take()
1184 .is_some_and(|v| v == snapshot_epoch_id),
1185 )?
1186 } else {
1187 (snapshot_kv_db, snapshot_mpt_db) = self.open_snapshot_write(
1188 temp_db_path.clone(),
1189 true,
1190 new_epoch_height,
1191 None,
1192 &snapshot_epoch_id,
1193 )?;
1194 snapshot_kv_db.dump_delta_mpt(&delta_mpt)?;
1195 let _open_lock = self.copying_mpt_snapshot.lock();
1196 self.copy_and_merge(
1197 &mut snapshot_kv_db,
1198 &mut snapshot_mpt_db,
1199 old_snapshot_epoch_id,
1200 snapshot_epoch_id,
1201 )?
1202 }
1203 };
1204
1205 let temp_mpt_path = if self.backup_mpt_snapshot
1207 && !mpt_table_in_current_db
1208 && new_epoch_height % self.era_epoch_count == 0
1209 {
1210 let temp_mpt_path =
1211 self.get_merge_temp_mpt_snapshot_db_path(&snapshot_epoch_id);
1212 let new_mpt_snapshot_db_path =
1213 self.get_mpt_snapshot_db_path(&snapshot_epoch_id);
1214
1215 self.copy_mpt_snapshot(
1216 temp_mpt_path.clone(),
1217 new_mpt_snapshot_db_path.clone(),
1218 )?;
1219 Some((temp_mpt_path, new_mpt_snapshot_db_path))
1220 } else {
1221 None
1222 };
1223
1224 in_progress_snapshot_info.merkle_root = new_snapshot_root.clone();
1225 drop(snapshot_kv_db);
1226 drop(snapshot_mpt_db);
1227 let locked = snapshot_info_map_rwlock.write();
1228
1229 if !recover_mpt_with_kv_snapshot_exist {
1230 if let Some((temp_mpt_path, new_mpt_snapshot_db_path)) =
1231 temp_mpt_path
1232 {
1233 while !temp_mpt_path.exists()
1234 && !new_mpt_snapshot_db_path.exists()
1235 {
1236 debug!("Wait mpt path existing");
1238 thread::sleep(Duration::from_millis(5));
1239 }
1240 }
1241
1242 Self::rename_snapshot_db(&temp_db_path, &new_snapshot_db_path)?;
1243 }
1244
1245 if cfg!(target_os = "linux")
1246 && cow
1247 && snapshot_epoch_id.as_fixed_bytes()[31] & 15 == 0
1248 {
1249 self.defragmenting_xfs_files(new_snapshot_db_path);
1250 }
1251
1252 Ok((locked, in_progress_snapshot_info))
1253 }
1254
1255 fn get_snapshot_by_epoch_id(
1256 &self, snapshot_epoch_id: &EpochId, try_open: bool,
1257 open_mpt_snapshot: bool,
1258 ) -> Result<Option<Self::SnapshotDb>> {
1259 if snapshot_epoch_id.eq(&NULL_EPOCH) {
1260 return Ok(Some(Self::SnapshotDb::get_null_snapshot()));
1261 } else {
1262 let path = self.get_snapshot_db_path(snapshot_epoch_id);
1263 self.open_snapshot_readonly(
1264 path,
1265 try_open,
1266 snapshot_epoch_id,
1267 open_mpt_snapshot,
1268 )
1269 }
1270 }
1271
1272 fn destroy_snapshot(&self, snapshot_epoch_id: &EpochId) -> Result<()> {
1273 debug!("destroy snapshot {:?}", snapshot_epoch_id);
1274 let path = self.get_snapshot_db_path(snapshot_epoch_id);
1275 let maybe_snapshot = loop {
1276 match self.already_open_snapshots.read().get(&path) {
1277 Some(Some(snapshot)) => {
1278 match Weak::upgrade(snapshot) {
1279 None => {
1280 thread::sleep(Duration::from_millis(5));
1285 continue;
1286 }
1287 Some(snapshot) => break Some(snapshot),
1288 }
1289 }
1290 Some(None) => {
1291 unreachable!("Try to destroy a snapshot being open exclusively for write.")
1296 }
1297 None => break None,
1298 };
1299 };
1300
1301 match maybe_snapshot {
1302 None => {
1303 if snapshot_epoch_id.ne(&NULL_EPOCH) {
1304 Self::fs_remove_snapshot(&path);
1305 }
1306 }
1307 Some(snapshot) => {
1308 snapshot.set_remove_on_last_close();
1309 }
1310 };
1311
1312 {
1313 let mpt_snapshot_path =
1315 self.get_mpt_snapshot_db_path(&snapshot_epoch_id);
1316
1317 let maybe_snapshot = loop {
1318 match self
1319 .mpt_already_open_snapshots
1320 .read()
1321 .get(&mpt_snapshot_path)
1322 {
1323 Some(Some(snapshot)) => match Weak::upgrade(snapshot) {
1324 None => {
1325 thread::sleep(Duration::from_millis(5));
1326 continue;
1327 }
1328 Some(snapshot) => break Some(snapshot),
1329 },
1330 Some(None) => {
1331 unreachable!("Try to destroy a snapshot being open exclusively for write.")
1332 }
1333 None => break None,
1334 };
1335 };
1336
1337 match maybe_snapshot {
1338 None => {
1339 if snapshot_epoch_id.ne(&NULL_EPOCH)
1340 && mpt_snapshot_path.exists()
1341 {
1342 debug!(
1343 "destroy_mpt_snapshot remove mpt db {:?}",
1344 mpt_snapshot_path
1345 );
1346 Self::fs_remove_snapshot(&mpt_snapshot_path);
1347 }
1348 }
1349 Some(snapshot) => {
1350 snapshot.set_remove_on_last_close();
1351 }
1352 };
1353 }
1354
1355 Ok(())
1356 }
1357
1358 fn new_temp_snapshot_for_full_sync(
1359 &self, snapshot_epoch_id: &EpochId, merkle_root: &MerkleHash,
1360 epoch_height: u64,
1361 ) -> Result<Self::SnapshotDbWrite> {
1362 let mpt_table_in_current_db =
1363 self.is_mpt_table_in_current_db_for_epoch(epoch_height);
1364 let temp_mpt_snapshot_path = if mpt_table_in_current_db {
1365 None
1366 } else {
1367 Some(self.get_full_sync_temp_mpt_snapshot_db_path(
1368 snapshot_epoch_id,
1369 merkle_root,
1370 ))
1371 };
1372
1373 let temp_db_path = self.get_full_sync_temp_snapshot_db_path(
1374 snapshot_epoch_id,
1375 merkle_root,
1376 );
1377 let (kv_snapshot_db, mpt_snapshot_db) = self.open_snapshot_write(
1378 temp_db_path.to_path_buf(),
1379 true,
1380 epoch_height,
1381 temp_mpt_snapshot_path,
1382 snapshot_epoch_id,
1383 )?;
1384
1385 Ok(SnapshotDbWriteable {
1386 kv_snapshot_db,
1387 mpt_snapshot_db,
1388 })
1389 }
1390
1391 fn finalize_full_sync_snapshot<'m>(
1392 &self, snapshot_epoch_id: &EpochId, merkle_root: &MerkleHash,
1393 snapshot_info_map_rwlock: &'m RwLock<PersistedSnapshotInfoMap>,
1394 ) -> Result<RwLockWriteGuard<'m, PersistedSnapshotInfoMap>> {
1395 let temp_mpt_snapshot_path = self
1396 .get_full_sync_temp_mpt_snapshot_db_path(
1397 snapshot_epoch_id,
1398 merkle_root,
1399 );
1400 let latest_mpt_snapshot_path = self.get_latest_mpt_snapshot_db_path();
1401
1402 let temp_db_path = self.get_full_sync_temp_snapshot_db_path(
1403 snapshot_epoch_id,
1404 merkle_root,
1405 );
1406 let final_db_path = self.get_snapshot_db_path(snapshot_epoch_id);
1407 let locked = snapshot_info_map_rwlock.write();
1408
1409 if temp_mpt_snapshot_path.exists() {
1410 if latest_mpt_snapshot_path.exists() {
1411 debug!(
1412 "Remove latest mpt snapshot {:?}",
1413 latest_mpt_snapshot_path
1414 );
1415 if let Err(e) =
1416 fs::remove_dir_all(&latest_mpt_snapshot_path.as_path())
1417 {
1418 error!(
1419 "remove snapshot err: path={:?} err={:?}",
1420 latest_mpt_snapshot_path.as_path(),
1421 e
1422 );
1423 }
1424 }
1425
1426 Self::rename_snapshot_db(
1427 &temp_mpt_snapshot_path,
1428 &latest_mpt_snapshot_path,
1429 )?;
1430
1431 let temp_mpt_path =
1432 self.get_merge_temp_mpt_snapshot_db_path(&snapshot_epoch_id);
1433 let new_mpt_snapshot_db_path =
1434 self.get_mpt_snapshot_db_path(&snapshot_epoch_id);
1435
1436 self.copy_mpt_snapshot(
1437 temp_mpt_path.clone(),
1438 new_mpt_snapshot_db_path.clone(),
1439 )?;
1440 while !temp_mpt_path.exists() && !new_mpt_snapshot_db_path.exists()
1441 {
1442 debug!("Wait mpt path existing");
1444 thread::sleep(Duration::from_millis(5));
1445 }
1446 }
1447
1448 Self::rename_snapshot_db(&temp_db_path, &final_db_path)?;
1449 Ok(locked)
1450 }
1451
1452 fn recovery_latest_mpt_snapshot_from_checkpoint(
1453 &self, snapshot_epoch_id: &EpochId,
1454 snapshot_epoch_id_before_recovered: Option<EpochId>,
1455 ) -> Result<()> {
1456 info!(
1457 "recovery latest mpt snapshot from checkpoint {}",
1458 snapshot_epoch_id
1459 );
1460 if snapshot_epoch_id == &NULL_EPOCH {
1461 self.recreate_latest_mpt_snapshot()?;
1462 return Ok(());
1463 }
1464
1465 *self.snapshot_epoch_id_before_recovered.write() =
1466 snapshot_epoch_id_before_recovered;
1467
1468 let source = self.get_mpt_snapshot_db_path(snapshot_epoch_id);
1471 if source.exists() {
1472 let latest_mpt_snapshot_path =
1473 self.get_latest_mpt_snapshot_db_path();
1474 if latest_mpt_snapshot_path.exists() {
1475 debug!("remove mpt snapshot {:?}", latest_mpt_snapshot_path);
1476 if let Err(e) =
1477 fs::remove_dir_all(&latest_mpt_snapshot_path.as_path())
1478 {
1479 error!(
1480 "remove mpt snapshot err: path={:?} err={:?}",
1481 latest_mpt_snapshot_path.as_path(),
1482 e
1483 );
1484 }
1485 }
1486
1487 debug!(
1488 "Copy mpt db for latest from snapshot {:?} ",
1489 snapshot_epoch_id
1490 );
1491 let temp_mpt_path =
1492 self.get_merge_temp_mpt_snapshot_db_path(&snapshot_epoch_id);
1493
1494 Self::try_copy_snapshot(
1495 source.as_path(),
1496 temp_mpt_path.as_path(),
1497 self.force_cow,
1498 )?;
1499 Self::rename_snapshot_db(&temp_mpt_path, &latest_mpt_snapshot_path)
1500 } else {
1501 panic!(
1502 "mpt snapshot for epoch {} does not exist",
1503 snapshot_epoch_id
1504 );
1505 }
1506 }
1507
1508 fn create_mpt_snapshot_from_latest(
1509 &self, new_snapshot_epoch_id: &EpochId,
1510 ) -> Result<()> {
1511 info!(
1512 "copy latest mpt snapshot for epoch {}",
1513 new_snapshot_epoch_id
1514 );
1515 let latest_mpt_snapshot_path = self.get_latest_mpt_snapshot_db_path();
1516 if !latest_mpt_snapshot_path.exists() {
1517 return Err(Error::from("latest mpt snapshot not exists"));
1518 }
1519
1520 let new_mpt_snapshot_db_path =
1521 self.get_mpt_snapshot_db_path(new_snapshot_epoch_id);
1522 let temp_mpt_path =
1523 self.get_merge_temp_mpt_snapshot_db_path(new_snapshot_epoch_id);
1524
1525 Self::try_copy_snapshot(
1526 latest_mpt_snapshot_path.as_path(),
1527 temp_mpt_path.as_path(),
1528 self.force_cow,
1529 )?;
1530
1531 if new_mpt_snapshot_db_path.exists() {
1532 fs::remove_dir_all(&new_mpt_snapshot_db_path.as_path())?;
1533 }
1534
1535 Self::rename_snapshot_db(&temp_mpt_path, &new_mpt_snapshot_db_path)
1536 }
1537
1538 fn try_get_new_snapshot_epoch_from_temp_path(
1539 &self, dir_name: &str,
1540 ) -> Option<EpochId> {
1541 self.try_get_new_snapshot_epoch_from_temp_path(dir_name)
1542 }
1543
1544 fn try_get_new_snapshot_epoch_from_mpt_temp_path(
1545 &self, dir_name: &str,
1546 ) -> Option<EpochId> {
1547 self.try_get_new_snapshot_epoch_from_mpt_temp_path(dir_name)
1548 }
1549}
1550
1551use crate::{
1552 impls::{
1553 delta_mpt::DeltaMptIterator, errors::*,
1554 storage_db::snapshot_kv_db_sqlite::*,
1555 storage_manager::PersistedSnapshotInfoMap,
1556 },
1557 storage_db::{
1558 key_value_db::KeyValueDbTraitSingleWriter, DbValueType,
1559 KeyValueDbTypes, OpenSnapshotMptTrait, SnapshotDbManagerTrait,
1560 SnapshotDbTrait, SnapshotDbWriteableTrait, SnapshotInfo,
1561 SnapshotMptDbValue,
1562 },
1563};
1564use fs_extra::dir::CopyOptions;
1565use futures::executor;
1566use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
1567use primitives::{EpochId, MerkleHash, NULL_EPOCH};
1568use rustc_hex::ToHex;
1569use std::{
1570 collections::HashMap,
1571 fs,
1572 path::{Path, PathBuf},
1573 process::Command,
1574 str::FromStr,
1575 sync::{Arc, Weak},
1576 thread,
1577 time::Duration,
1578};
1579use tokio::sync::Semaphore;
1580
1581use super::{
1582 kvdb_sqlite_sharded::KvdbSqliteShardedBorrowMut,
1583 snapshot_db_sqlite::SnapshotDbSqlite, snapshot_mpt::SnapshotMpt,
1584 snapshot_mpt_db_sqlite::SnapshotMptDbSqlite,
1585};