cfx_storage/impls/storage_db/
snapshot_db_manager_sqlite.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
/// Manages the sqlite-backed snapshot databases: key-value snapshots stored
/// under `snapshot_path` and MPT snapshots stored under `mpt_snapshot_path`.
pub struct SnapshotDbManagerSqlite {
    /// Directory that contains the key-value snapshot databases.
    snapshot_path: PathBuf,
    // FIXME: add a command line option to assert that this method made
    // successfully cow_copy and print error messages if it fails.
    force_cow: bool,
    /// Key-value snapshots that are currently open, keyed by path.
    already_open_snapshots: AlreadyOpenSnapshots<SnapshotKvDbSqlite>,
    /// Set a limit on the number of open snapshots. When the limit is reached,
    /// consensus initiated open should wait, other non-critical opens such as
    /// rpc initiated opens should simply abort when the limit is reached.
    open_snapshot_semaphore: Arc<Semaphore>,
    /// Held while opening snapshots so that simultaneous opens are
    /// serialized.
    open_create_delete_lock: Mutex<()>,
    /// Whether the MPT table may live in a database separate from the
    /// key-value snapshot database.
    use_isolated_db_for_mpt_table: bool,
    use_isolated_db_for_mpt_table_height: Option<u64>,
    /// Directory that contains the MPT snapshot databases; a sibling of
    /// `snapshot_path`.
    mpt_snapshot_path: PathBuf,
    /// MPT snapshots that are currently open, keyed by path.
    mpt_already_open_snapshots: AlreadyOpenSnapshots<SnapshotMptDbSqlite>,
    /// Limits the number of simultaneously open MPT snapshots.
    mpt_open_snapshot_semaphore: Arc<Semaphore>,
    era_epoch_count: u64,
    /// Number of permits each of the open-snapshot semaphores starts with.
    max_open_snapshots: u16,
    /// Single-permit semaphore taken by whoever has the latest MPT snapshot
    /// open (the writer, or a reader of the latest snapshot).
    latest_mpt_snapshot_semaphore: Arc<Semaphore>,
    /// Epoch id and height of the latest snapshot.
    latest_snapshot_id: RwLock<(EpochId, u64)>,
    // NOTE(review): not used in the visible part of the file; presumably
    // guards copying of the MPT snapshot -- confirm against its users.
    copying_mpt_snapshot: Arc<Mutex<()>>,
    /// When set, read-only opens of this epoch's snapshot return `None`
    /// because it is under recovery.
    snapshot_epoch_id_before_recovered: RwLock<Option<EpochId>>,
    reconstruct_snapshot_id_for_reboot: RwLock<Option<EpochId>>,
    backup_mpt_snapshot: bool,
}
30
/// How a snapshot directory ended up being copied.
#[derive(Debug)]
enum CopyType {
    /// Copied with the file system's copy-on-write support (`cp` reflink /
    /// clone).
    Cow,
    /// Copied with a regular recursive directory copy.
    Std,
}
36
/// A snapshot opened for writing: the key-value database plus, when the MPT
/// table is not kept inside it, the separate MPT database.
pub struct SnapshotDbWriteable {
    /// The key-value snapshot database.
    pub kv_snapshot_db: SnapshotKvDbSqlite,
    /// The separate MPT snapshot database, if there is one.
    pub mpt_snapshot_db: Option<SnapshotMptDbSqlite>,
}
41
/// Values stored through a `SnapshotDbWriteable` are owned byte slices.
impl KeyValueDbTypes for SnapshotDbWriteable {
    type ValueType = Box<[u8]>;
}
45
46impl SnapshotDbWriteableTrait for SnapshotDbWriteable {
47    type SnapshotDbBorrowMutType = SnapshotMpt<
48        KvdbSqliteShardedBorrowMut<'static, SnapshotMptDbValue>,
49        KvdbSqliteShardedBorrowMut<'static, SnapshotMptDbValue>,
50    >;
51
52    fn start_transaction(&mut self) -> Result<()> {
53        self.kv_snapshot_db.start_transaction()?;
54
55        if self.mpt_snapshot_db.is_some() {
56            self.mpt_snapshot_db.as_mut().unwrap().start_transaction()?;
57        }
58
59        Ok(())
60    }
61
62    fn commit_transaction(&mut self) -> Result<()> {
63        self.kv_snapshot_db.commit_transaction()?;
64
65        if self.mpt_snapshot_db.is_some() {
66            self.mpt_snapshot_db
67                .as_mut()
68                .unwrap()
69                .commit_transaction()?;
70        }
71
72        Ok(())
73    }
74
75    fn put_kv(
76        &mut self, key: &[u8], value: &<Self::ValueType as DbValueType>::Type,
77    ) -> Result<Option<Option<Self::ValueType>>> {
78        self.kv_snapshot_db.put(key, value)
79    }
80
81    fn open_snapshot_mpt_owned(
82        &mut self,
83    ) -> Result<Self::SnapshotDbBorrowMutType> {
84        if self.kv_snapshot_db.is_mpt_table_in_current_db() {
85            self.kv_snapshot_db.open_snapshot_mpt_owned()
86        } else {
87            self.mpt_snapshot_db
88                .as_mut()
89                .unwrap()
90                .open_snapshot_mpt_owned()
91        }
92    }
93}
94
/// The map from path to the already open snapshots.
///
/// When the mapped value is `None`, the snapshot is open exclusively for
/// write. When the mapped value is `Some(weak)`, the snapshot is open
/// read-only and can be shared with other readers by upgrading `weak`.
pub type AlreadyOpenSnapshots<T> =
    Arc<RwLock<HashMap<PathBuf, Option<Weak<T>>>>>;
101
102impl SnapshotDbManagerSqlite {
    /// Suffix appended to `SNAPSHOT_DB_SQLITE_DIR_PREFIX` to name the
    /// directory of the latest MPT snapshot database.
    pub const LATEST_MPT_SNAPSHOT_DIR: &'static str = "latest";
    /// Name of the directory, next to the snapshot directory, that holds the
    /// MPT snapshot databases.
    const MPT_SNAPSHOT_DIR: &'static str = "mpt_snapshot";
    /// Prefix of the snapshot database directory names built in this file.
    const SNAPSHOT_DB_SQLITE_DIR_PREFIX: &'static str = "sqlite_";
106
107    pub fn new(
108        snapshot_path: PathBuf, max_open_snapshots: u16,
109        use_isolated_db_for_mpt_table: bool,
110        use_isolated_db_for_mpt_table_height: Option<u64>,
111        era_epoch_count: u64, backup_mpt_snapshot: bool,
112    ) -> Result<Self> {
113        if !snapshot_path.exists() {
114            fs::create_dir_all(snapshot_path.clone())?;
115        }
116
117        let mpt_snapshot_path = snapshot_path
118            .parent()
119            .unwrap()
120            .join(SnapshotDbManagerSqlite::MPT_SNAPSHOT_DIR);
121        let latest_mpt_snapshot_path = mpt_snapshot_path.join(
122            Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string()
123                + SnapshotDbManagerSqlite::LATEST_MPT_SNAPSHOT_DIR,
124        );
125
126        // Create the latest MPT database if database not exist
127        SnapshotMptDbSqlite::create(
128            latest_mpt_snapshot_path.as_path(),
129            &Default::default(),
130            &Arc::new(Semaphore::new(max_open_snapshots as usize)),
131            None,
132        )?;
133
134        Ok(Self {
135            snapshot_path,
136            force_cow: false,
137            already_open_snapshots: Default::default(),
138            open_snapshot_semaphore: Arc::new(Semaphore::new(
139                max_open_snapshots as usize,
140            )),
141            open_create_delete_lock: Default::default(),
142            mpt_snapshot_path,
143            use_isolated_db_for_mpt_table,
144            use_isolated_db_for_mpt_table_height,
145            mpt_already_open_snapshots: Default::default(),
146            mpt_open_snapshot_semaphore: Arc::new(Semaphore::new(
147                max_open_snapshots as usize,
148            )),
149            era_epoch_count,
150            max_open_snapshots,
151            latest_mpt_snapshot_semaphore: Arc::new(Semaphore::new(1)),
152            latest_snapshot_id: RwLock::new((NULL_EPOCH, 0)),
153            copying_mpt_snapshot: Arc::new(Default::default()),
154            snapshot_epoch_id_before_recovered: RwLock::new(None),
155            reconstruct_snapshot_id_for_reboot: RwLock::new(None),
156            backup_mpt_snapshot,
157        })
158    }
159
160    pub fn update_latest_snapshot_id(&self, snapshot_id: EpochId, height: u64) {
161        *self.latest_snapshot_id.write() = (snapshot_id, height);
162    }
163
164    pub fn clean_snapshot_epoch_id_before_recovered(&self) {
165        *self.snapshot_epoch_id_before_recovered.write() = None;
166    }
167
168    pub fn set_reconstruct_snapshot_id(
169        &self, reconstruct_pivot: Option<EpochId>,
170    ) {
171        debug!("set_reconstruct_snapshot_id to {:?}", reconstruct_pivot);
172        *self.reconstruct_snapshot_id_for_reboot.write() = reconstruct_pivot;
173    }
174
175    pub fn recreate_latest_mpt_snapshot(&self) -> Result<()> {
176        info!("recreate latest mpt snapshot");
177        let latest_mpt_snapshot_path = self.get_latest_mpt_snapshot_db_path();
178        if latest_mpt_snapshot_path.exists() {
179            debug!("remove mpt snapshot {:?}", latest_mpt_snapshot_path);
180            if let Err(e) =
181                fs::remove_dir_all(&latest_mpt_snapshot_path.as_path())
182            {
183                error!(
184                    "remove mpt snapshot err: path={:?} err={:?}",
185                    latest_mpt_snapshot_path.as_path(),
186                    e
187                );
188            }
189        }
190
191        // recreate latest MPT database
192        SnapshotMptDbSqlite::create(
193            latest_mpt_snapshot_path.as_path(),
194            &Default::default(),
195            &Arc::new(Semaphore::new(self.max_open_snapshots as usize)),
196            None,
197        )?;
198
199        Ok(())
200    }
201
202    fn open_snapshot_readonly(
203        &self, snapshot_path: PathBuf, try_open: bool,
204        snapshot_epoch_id: &EpochId, read_mpt_snapshot: bool,
205    ) -> Result<Option<SnapshotDbSqlite>> {
206        // indicate under recovery
207        if self
208            .snapshot_epoch_id_before_recovered
209            .read()
210            .is_some_and(|e| &e == snapshot_epoch_id)
211        {
212            return Ok(None);
213        }
214
215        // To serialize simultaneous opens.
216        let _open_lock = self.open_create_delete_lock.lock();
217
218        if let Some(snapshot_db) =
219            self.open_kv_snapshot_readonly(snapshot_path, try_open)?
220        {
221            let mpt_snapshot_db = if !snapshot_db.is_mpt_table_in_current_db()
222                && read_mpt_snapshot
223            {
224                // Use the existing directory for the specific database (for ear
225                // checkpoint)
226                let mpt_snapshot_path = if self.use_isolated_db_for_mpt_table {
227                    let mpt_snapshot_path =
228                        self.get_mpt_snapshot_db_path(snapshot_epoch_id);
229                    if mpt_snapshot_path.exists() {
230                        mpt_snapshot_path
231                    } else {
232                        if self.latest_snapshot_id.read().0
233                            == *snapshot_epoch_id
234                        {
235                            self.get_latest_mpt_snapshot_db_path()
236                        } else {
237                            error!("MPT DB not exist, latest snapshot id {:?}, try to open {:?}.", self.latest_snapshot_id.read().0, snapshot_epoch_id);
238                            return Ok(None);
239                        }
240                    }
241                } else {
242                    error!("MPT table should be in snapshot database.");
243                    return Ok(None);
244                };
245
246                let mpt_snapshot_db = self.open_mpt_snapshot_readonly(
247                    mpt_snapshot_path,
248                    try_open,
249                    snapshot_epoch_id,
250                )?;
251
252                mpt_snapshot_db
253            } else {
254                None
255            };
256
257            return Ok(Some(SnapshotDbSqlite {
258                snapshot_db,
259                mpt_snapshot_db,
260            }));
261        } else {
262            return Ok(None);
263        }
264    }
265
266    fn open_kv_snapshot_readonly(
267        &self, snapshot_path: PathBuf, try_open: bool,
268    ) -> Result<Option<Arc<SnapshotKvDbSqlite>>> {
269        if let Some(already_open) =
270            self.already_open_snapshots.read().get(&snapshot_path)
271        {
272            match already_open {
273                None => {
274                    // Already open for exclusive write
275                    return Ok(None);
276                }
277                Some(open_shared_weak) => {
278                    match Weak::upgrade(open_shared_weak) {
279                        None => {}
280                        Some(already_open) => {
281                            return Ok(Some(already_open));
282                        }
283                    }
284                }
285            }
286        }
287        let file_exists = snapshot_path.exists();
288        if file_exists {
289            let semaphore_permit = if try_open {
290                self.open_snapshot_semaphore
291                    .try_acquire()
292                    // Unfortunately we have to use map_error because the
293                    // TryAcquireError isn't public.
294                    .map_err(|_err| Error::SemaphoreTryAcquireError)?
295            } else {
296                executor::block_on(self.open_snapshot_semaphore.acquire())
297                    .map_err(|_err| Error::SemaphoreAcquireError)?
298            };
299
300            // If it's not in already_open_snapshots, the sqlite db must have
301            // been closed.
302            while let Some(already_open) =
303                self.already_open_snapshots.read().get(&snapshot_path)
304            {
305                match already_open {
306                    None => {
307                        // Already open for exclusive write
308                        return Ok(None);
309                    }
310                    Some(open_shared_weak) => {
311                        match Weak::upgrade(open_shared_weak) {
312                            None => {
313                                // All `Arc` of the sqlite db have been dropped,
314                                // but the inner
315                                // struct (sqlite db itself) drop is called
316                                // after decreasing
317                                // the strong_ref count, so it may still be open
318                                // at this time,
319                                // and after it's closed it will be removed from
320                                // `already_open_snapshots`.
321                                // Thus, here we wait for it to be removed to
322                                // ensure that when we try
323                                // to open it, it's guaranteed to be closed.
324                                thread::sleep(Duration::from_millis(5));
325                                continue;
326                            }
327                            Some(already_open) => {
328                                return Ok(Some(already_open));
329                            }
330                        }
331                    }
332                }
333            }
334
335            let snapshot_db = Arc::new(SnapshotKvDbSqlite::open(
336                snapshot_path.as_path(),
337                /* readonly = */ true,
338                &self.already_open_snapshots,
339                &self.open_snapshot_semaphore,
340            )?);
341
342            semaphore_permit.forget();
343            self.already_open_snapshots.write().insert(
344                snapshot_path.into(),
345                Some(Arc::downgrade(&snapshot_db)),
346            );
347
348            return Ok(Some(snapshot_db));
349        } else {
350            return Ok(None);
351        }
352    }
353
354    fn open_snapshot_write(
355        &self, snapshot_path: PathBuf, create: bool, new_epoch_height: u64,
356        mpt_snapshot_path: Option<PathBuf>, new_snapshot_id: &EpochId,
357    ) -> Result<(SnapshotKvDbSqlite, Option<SnapshotMptDbSqlite>)> {
358        let mut _open_lock = self.open_create_delete_lock.lock();
359
360        let kv_db = self.open_kv_snapshot_write(
361            snapshot_path,
362            create,
363            new_epoch_height,
364        )?;
365
366        let mpt_table_in_current_db =
367            self.is_mpt_table_in_current_db_for_epoch(new_epoch_height);
368        let mpt_db: Option<SnapshotMptDbSqlite> = if !mpt_table_in_current_db {
369            let (mpt_snapshot_path, create_mpt) = match mpt_snapshot_path {
370                Some(v) => (v, true),
371                _ => {
372                    let latest_snapshot_id = self.latest_snapshot_id.read();
373                    debug!(
374                        "new_epoch_height {}, latest_snapshot_id {} {}",
375                        new_epoch_height,
376                        latest_snapshot_id.0,
377                        latest_snapshot_id.1
378                    );
379                    if new_epoch_height <= latest_snapshot_id.1 {
380                        bail!(format!(
381                            "Try to write an old snapshot {}, {}",
382                            new_epoch_height, latest_snapshot_id.1
383                        ))
384                    }
385
386                    (self.get_latest_mpt_snapshot_db_path(), false)
387                }
388            };
389
390            Some(self.open_mpt_snapshot_write(
391                mpt_snapshot_path,
392                create_mpt,
393                new_epoch_height,
394                new_snapshot_id,
395            )?)
396        } else {
397            None
398        };
399
400        return Ok((kv_db, mpt_db));
401    }
402
403    fn open_kv_snapshot_write(
404        &self, snapshot_path: PathBuf, create: bool, new_epoch_height: u64,
405    ) -> Result<SnapshotKvDbSqlite> {
406        if self
407            .already_open_snapshots
408            .read()
409            .get(&snapshot_path)
410            .is_some()
411        {
412            bail!(Error::SnapshotAlreadyExists)
413        }
414
415        let semaphore_permit =
416            executor::block_on(self.open_snapshot_semaphore.acquire())
417                .map_err(|_err| Error::SemaphoreAcquireError)?;
418        // When an open happens around the same time, we should make sure that
419        // the open returns None.
420
421        // Simultaneous creation fails here.
422        if self
423            .already_open_snapshots
424            .read()
425            .get(&snapshot_path)
426            .is_some()
427        {
428            bail!(Error::SnapshotAlreadyExists)
429        }
430
431        let mpt_table_in_current_db =
432            self.is_mpt_table_in_current_db_for_epoch(new_epoch_height);
433
434        let snapshot_db = if create {
435            SnapshotKvDbSqlite::create(
436                snapshot_path.as_path(),
437                &self.already_open_snapshots,
438                &self.open_snapshot_semaphore,
439                mpt_table_in_current_db,
440            )
441        } else {
442            let file_exists = snapshot_path.exists();
443            if file_exists {
444                let mut db = SnapshotKvDbSqlite::open(
445                    snapshot_path.as_path(),
446                    /* readonly = */ false,
447                    &self.already_open_snapshots,
448                    &self.open_snapshot_semaphore,
449                )?;
450
451                if !mpt_table_in_current_db {
452                    db.drop_mpt_table()?;
453                }
454
455                Ok(db)
456            } else {
457                bail!(Error::SnapshotNotFound);
458            }
459        }?;
460
461        semaphore_permit.forget();
462        self.already_open_snapshots
463            .write()
464            .insert(snapshot_path.clone(), None);
465        Ok(snapshot_db)
466    }
467
468    fn open_mpt_snapshot_readonly(
469        &self, snapshot_path: PathBuf, try_open: bool,
470        snapshot_epoch_id: &EpochId,
471    ) -> Result<Option<Arc<SnapshotMptDbSqlite>>> {
472        debug!(
473            "Open mpt snapshot with readonly {:?}, snapshot_epoch_id {:?}",
474            snapshot_path, snapshot_epoch_id
475        );
476        if let Some(already_open) =
477            self.mpt_already_open_snapshots.read().get(&snapshot_path)
478        {
479            match already_open {
480                None => {
481                    // Already open for exclusive write
482                    return Ok(None);
483                }
484                Some(open_shared_weak) => {
485                    match Weak::upgrade(open_shared_weak) {
486                        None => {}
487                        Some(already_open) => {
488                            return Ok(Some(already_open));
489                        }
490                    }
491                }
492            }
493        }
494        let file_exists = snapshot_path.exists();
495        if file_exists {
496            let semaphore_permit = if try_open {
497                self.mpt_open_snapshot_semaphore
498                    .try_acquire()
499                    // Unfortunately we have to use map_error because the
500                    // TryAcquireError isn't public.
501                    .map_err(|_err| Error::SemaphoreTryAcquireError)?
502            } else {
503                executor::block_on(self.mpt_open_snapshot_semaphore.acquire())
504                    .map_err(|_err| Error::SemaphoreAcquireError)?
505            };
506
507            // If it's not in already_open_snapshots, the sqlite db must have
508            // been closed.
509            while let Some(already_open) =
510                self.mpt_already_open_snapshots.read().get(&snapshot_path)
511            {
512                match already_open {
513                    None => {
514                        // Already open for exclusive write
515                        return Ok(None);
516                    }
517                    Some(open_shared_weak) => {
518                        match Weak::upgrade(open_shared_weak) {
519                            None => {
520                                thread::sleep(Duration::from_millis(5));
521                                continue;
522                            }
523                            Some(already_open) => {
524                                return Ok(Some(already_open));
525                            }
526                        }
527                    }
528                }
529            }
530
531            let (latest_mpt_semaphore_permit, v) = if self
532                .latest_snapshot_id
533                .read()
534                .0
535                == *snapshot_epoch_id
536                && self.latest_snapshot_id.read().1 % self.era_epoch_count != 0
537            {
538                let s =
539                    self.latest_mpt_snapshot_semaphore.try_acquire().map_err(
540                        |_err| "The MPT snapshot is already open for writing.",
541                    )?;
542
543                (Some(s), Some(self.latest_mpt_snapshot_semaphore.clone()))
544            } else {
545                (None, None)
546            };
547
548            let snapshot_db = Arc::new(SnapshotMptDbSqlite::open(
549                snapshot_path.as_path(),
550                /* readonly = */ true,
551                &self.mpt_already_open_snapshots,
552                &self.mpt_open_snapshot_semaphore,
553                v,
554            )?);
555
556            if let Some(s) = latest_mpt_semaphore_permit {
557                s.forget();
558            }
559            semaphore_permit.forget();
560            self.mpt_already_open_snapshots.write().insert(
561                snapshot_path.into(),
562                Some(Arc::downgrade(&snapshot_db)),
563            );
564
565            return Ok(Some(snapshot_db));
566        } else {
567            return Ok(None);
568        }
569    }
570
571    fn open_mpt_snapshot_write(
572        &self, snapshot_path: PathBuf, create: bool, new_epoch_height: u64,
573        new_snapshot_id: &EpochId,
574    ) -> Result<SnapshotMptDbSqlite> {
575        debug!(
576            "open mpt snapshot with write {:?}, new epoch height {}",
577            snapshot_path, new_epoch_height
578        );
579        let latest_mpt_semaphore_permit: tokio::sync::SemaphorePermit =
580            executor::block_on(self.latest_mpt_snapshot_semaphore.acquire())
581                .map_err(|_err| Error::SemaphoreAcquireError)?;
582
583        if self
584            .mpt_already_open_snapshots
585            .read()
586            .get(&snapshot_path)
587            .is_some()
588        {
589            bail!(Error::SnapshotAlreadyExists)
590        }
591
592        let semaphore_permit =
593            executor::block_on(self.mpt_open_snapshot_semaphore.acquire())
594                .map_err(|_err| Error::SemaphoreAcquireError)?;
595
596        let snapshot_db = if create {
597            SnapshotMptDbSqlite::create(
598                snapshot_path.as_path(),
599                &self.mpt_already_open_snapshots,
600                &self.mpt_open_snapshot_semaphore,
601                Some(self.latest_mpt_snapshot_semaphore.clone()),
602            )
603        } else {
604            let file_exists = snapshot_path.exists();
605            if file_exists {
606                SnapshotMptDbSqlite::open(
607                    snapshot_path.as_path(),
608                    /* readonly = */ false,
609                    &self.mpt_already_open_snapshots,
610                    &self.mpt_open_snapshot_semaphore,
611                    Some(self.latest_mpt_snapshot_semaphore.clone()),
612                )
613            } else {
614                bail!(Error::SnapshotNotFound);
615            }
616        }?;
617
618        latest_mpt_semaphore_permit.forget();
619        semaphore_permit.forget();
620
621        *self.latest_snapshot_id.write() =
622            (new_snapshot_id.clone(), new_epoch_height);
623
624        self.mpt_already_open_snapshots
625            .write()
626            .insert(snapshot_path.clone(), None);
627
628        Ok(snapshot_db)
629    }
630
631    pub fn on_close(
632        already_open_snapshots: &AlreadyOpenSnapshots<SnapshotKvDbSqlite>,
633        open_semaphore: &Arc<Semaphore>, path: &Path, remove_on_close: bool,
634    ) {
635        // Destroy at close.
636        if remove_on_close {
637            // When removal fails, we can not raise the error because this
638            // function is called within a destructor.
639            //
640            // It's fine to just ignore the error because Conflux doesn't remove
641            // then immediate create a snapshot, or open the snapshot for
642            // modification.
643            //
644            // Conflux will remove orphan storage upon restart.
645            Self::fs_remove_snapshot(path);
646        }
647        already_open_snapshots.write().remove(path);
648        open_semaphore.add_permits(1);
649    }
650
651    pub fn on_close_mpt_snapshot(
652        already_open_snapshots: &AlreadyOpenSnapshots<SnapshotMptDbSqlite>,
653        open_semaphore: &Arc<Semaphore>, path: &Path, remove_on_close: bool,
654        latest_mpt_snapshot_semaphore: &Option<Arc<Semaphore>>,
655    ) {
656        debug!("on_close_mpt_snapshot path {:?}", path);
657        // Destroy at close.
658        if remove_on_close {
659            Self::fs_remove_snapshot(path);
660        }
661        already_open_snapshots.write().remove(path);
662        open_semaphore.add_permits(1);
663
664        if let Some(s) = latest_mpt_snapshot_semaphore {
665            s.add_permits(1);
666        }
667    }
668
669    fn fs_remove_snapshot(path: &Path) {
670        debug!("Remove snapshot at {}", path.display());
671        let path = path.to_owned();
672        thread::spawn(move || {
673            if let Err(e) = fs::remove_dir_all(&path) {
674                error!("remove snapshot err: path={:?} err={:?}", path, e);
675            }
676            debug!("Finish removing snapshot at {}", path.display());
677        });
678    }
679
680    fn get_merge_temp_snapshot_db_path(
681        &self, old_snapshot_epoch_id: &EpochId, new_snapshot_epoch_id: &EpochId,
682    ) -> PathBuf {
683        self.snapshot_path.join(
684            Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string()
685                + "merge_temp_"
686                + &old_snapshot_epoch_id.as_ref().to_hex::<String>()
687                + &new_snapshot_epoch_id.as_ref().to_hex::<String>(),
688        )
689    }
690
691    fn try_get_new_snapshot_epoch_from_temp_path(
692        &self, dir_name: &str,
693    ) -> Option<EpochId> {
694        let prefix =
695            Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string() + "merge_temp_";
696
697        if dir_name.starts_with(&prefix) {
698            match EpochId::from_str(
699                &dir_name[(prefix.len() + EpochId::len_bytes() * 2)..],
700            ) {
701                Ok(e) => Some(e),
702                Err(e) => {
703                    error!(
704                        "get new snapshot epoch id from temp path failed: {}",
705                        e
706                    );
707                    None
708                }
709            }
710        } else {
711            None
712        }
713    }
714
715    fn get_full_sync_temp_snapshot_db_path(
716        &self, snapshot_epoch_id: &EpochId, merkle_root: &MerkleHash,
717    ) -> PathBuf {
718        self.snapshot_path.join(
719            Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string()
720                + "full_sync_temp_"
721                + &snapshot_epoch_id.as_ref().to_hex::<String>()
722                + &merkle_root.as_ref().to_hex::<String>(),
723        )
724    }
725
726    fn get_merge_temp_mpt_snapshot_db_path(
727        &self, new_snapshot_epoch_id: &EpochId,
728    ) -> PathBuf {
729        self.mpt_snapshot_path.join(
730            Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string()
731                + "merge_temp_"
732                + &new_snapshot_epoch_id.as_ref().to_hex::<String>(),
733        )
734    }
735
736    fn try_get_new_snapshot_epoch_from_mpt_temp_path(
737        &self, dir_name: &str,
738    ) -> Option<EpochId> {
739        let prefix =
740            Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string() + "merge_temp_";
741        if dir_name.starts_with(&prefix) {
742            match EpochId::from_str(&dir_name[prefix.len()..]) {
743                Ok(e) => Some(e),
744                Err(e) => {
745                    error!("get new snapshot epoch id from mpt temp path failed: {}", e);
746                    None
747                }
748            }
749        } else {
750            None
751        }
752    }
753
754    fn get_latest_mpt_snapshot_db_path(&self) -> PathBuf {
755        self.mpt_snapshot_path
756            .join(self.get_latest_mpt_snapshot_db_name())
757    }
758
759    fn get_mpt_snapshot_db_path(&self, snapshot_epoch_id: &EpochId) -> PathBuf {
760        self.mpt_snapshot_path
761            .join(&self.get_snapshot_db_name(snapshot_epoch_id))
762    }
763
764    fn get_full_sync_temp_mpt_snapshot_db_path(
765        &self, snapshot_epoch_id: &EpochId, merkle_root: &MerkleHash,
766    ) -> PathBuf {
767        self.mpt_snapshot_path.join(
768            Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string()
769                + "full_sync_temp_"
770                + &snapshot_epoch_id.as_ref().to_hex::<String>()
771                + &merkle_root.as_ref().to_hex::<String>(),
772        )
773    }
774
775    /// Returns error when cow copy fails; Ok(true) when cow copy succeeded;
776    /// Ok(false) when we are running on a system where cow copy isn't
777    /// available.
778    fn try_make_snapshot_cow_copy_impl(
779        old_snapshot_path: &Path, new_snapshot_path: &Path, force_cow: bool,
780    ) -> Result<bool> {
781        let mut command;
782        if cfg!(target_os = "linux") {
783            // XFS
784            command = Command::new("cp");
785            command
786                .arg("-R")
787                .arg("--reflink=always")
788                .arg(old_snapshot_path)
789                .arg(new_snapshot_path);
790        } else if cfg!(target_os = "macos") {
791            // APFS
792            command = Command::new("cp");
793            command
794                .arg("-R")
795                .arg("-c")
796                .arg(old_snapshot_path)
797                .arg(new_snapshot_path);
798        } else {
799            return Ok(false);
800        };
801
802        let command_result = command.output();
803        if command_result.is_err() {
804            fs::remove_dir_all(new_snapshot_path)?;
805        }
806        if !command_result?.status.success() {
807            fs::remove_dir_all(new_snapshot_path)?;
808            if force_cow {
809                error!(
810                    "COW copy failed, check file system support. Command {:?}",
811                    command,
812                );
813                Err(Error::SnapshotCowCreation.into())
814            } else {
815                info!(
816                    "COW copy failed, check file system support. Command {:?}",
817                    command,
818                );
819                Ok(false)
820            }
821        } else {
822            Ok(true)
823        }
824    }
825
826    fn try_copy_snapshot(
827        old_snapshot_path: &Path, new_snapshot_path: &Path, force_cow: bool,
828    ) -> Result<CopyType> {
829        debug!("try copy into {:?}", new_snapshot_path);
830        if Self::try_make_snapshot_cow_copy(
831            old_snapshot_path,
832            new_snapshot_path,
833            force_cow,
834        )? {
835            Ok(CopyType::Cow)
836        } else {
837            let mut options = CopyOptions::new();
838            options.copy_inside = true; // copy recursively like `cp -r`
839            fs_extra::dir::copy(old_snapshot_path, new_snapshot_path, &options)
840                .map(|_| CopyType::Std)
841                .map_err(|e| {
842                    warn!(
843                        "Fail to copy snapshot {:?}, err={:?}",
844                        old_snapshot_path, e,
845                    );
846                    Error::SnapshotCopyFailure.into()
847                })
848        }
849    }
850
851    /// Returns error when cow copy fails, or when cow copy isn't supported with
852    /// force_cow setting enabled; Ok(true) when cow copy succeeded;
853    /// Ok(false) when cow copy isn't supported with force_cow setting disabled.
854    fn try_make_snapshot_cow_copy(
855        old_snapshot_path: &Path, new_snapshot_path: &Path, force_cow: bool,
856    ) -> Result<bool> {
857        let result = Self::try_make_snapshot_cow_copy_impl(
858            old_snapshot_path,
859            new_snapshot_path,
860            force_cow,
861        );
862
863        if result.is_err() {
864            Ok(false)
865        } else if result.unwrap() == false {
866            if force_cow {
867                // FIXME: Check error string.
868                error!(
869                    "Failed to create a new snapshot by COW. \
870                     Use XFS on linux or APFS on Mac"
871                );
872                Err(Error::SnapshotCowCreation.into())
873            } else {
874                Ok(false)
875            }
876        } else {
877            Ok(true)
878        }
879    }
880
881    fn copy_and_merge(
882        &self, temp_snapshot_db: &mut SnapshotKvDbSqlite,
883        mpt_snapshot_db: &mut Option<SnapshotMptDbSqlite>,
884        old_snapshot_epoch_id: &EpochId, snapshot_epoch_id: EpochId,
885    ) -> Result<MerkleHash> {
886        let snapshot_path = self.get_snapshot_db_path(old_snapshot_epoch_id);
887        let maybe_old_snapshot_db = Self::open_snapshot_readonly(
888            self,
889            snapshot_path,
890            /* try_open = */ false,
891            old_snapshot_epoch_id,
892            false,
893        )?;
894        let old_snapshot_db = maybe_old_snapshot_db
895            .ok_or(Error::from(Error::SnapshotNotFound))?;
896        temp_snapshot_db.copy_and_merge(
897            &old_snapshot_db.snapshot_db,
898            mpt_snapshot_db,
899            self.reconstruct_snapshot_id_for_reboot
900                .write()
901                .take()
902                .is_some_and(|v| v == snapshot_epoch_id),
903        )
904    }
905
906    fn rename_snapshot_db<P: AsRef<Path>>(
907        old_path: P, new_path: P,
908    ) -> Result<()> {
909        Ok(fs::rename(old_path, new_path)?)
910    }
911
912    fn defragmenting_xfs_files(&self, new_snapshot_db_path: PathBuf) {
913        thread::Builder::new()
914            .name("Defragmenting XFS Files".into())
915            .spawn(move || {
916                let paths = fs::read_dir(new_snapshot_db_path).unwrap();
917                let mut files = vec![];
918                for path in paths {
919                    if let Ok(p) = path {
920                        let f = p.path();
921                        if f.is_file() {
922                            files.push(f.as_path().display().to_string());
923                        }
924                    }
925                }
926
927                let mut command = Command::new("xfs_fsr");
928                command.arg("-v");
929                command.args(files);
930                let command_result = command.output();
931                match command_result {
932                    Ok(o) => {
933                        if o.status.success() {
934                            info!(
935                                "Defragmenting XFS files success. Command {:?}",
936                                command
937                            );
938                        }
939                    }
940                    Err(e) => {
941                        error!(
942                            "Defragmenting XFS files failed. Command {:?} \n Error {:?}",
943                            command, e
944                        );
945                    }
946                }
947            })
948            .unwrap();
949    }
950
951    fn copy_mpt_snapshot(
952        &self, temp_mpt_path: PathBuf, new_mpt_snapshot_db_path: PathBuf,
953    ) -> Result<()> {
954        debug!(
955            "Copy mpt db to {:?}, era_epoch_count {}",
956            new_mpt_snapshot_db_path, self.era_epoch_count
957        );
958        let latest_mpt_path = self.get_latest_mpt_snapshot_db_path();
959
960        let force_cow = self.force_cow;
961        let copying_mpt_snapshot = Arc::clone(&self.copying_mpt_snapshot);
962        thread::Builder::new()
963            .name("Copy mpt snapshot".into())
964            .spawn(move || {
965                let _open_lock = copying_mpt_snapshot.lock();
966
967                if let Err(e) = Self::try_copy_snapshot(
968                    latest_mpt_path.as_path(),
969                    temp_mpt_path.as_path(),
970                    force_cow,
971                ) {
972                    error!(
973                        "Copy mpt snapshot failed. Try copy snapshot error. \n Error {:?}",
974                        e
975                    );
976                    return;
977                }
978
979                if new_mpt_snapshot_db_path.exists() {
980                    if let Err(e) =
981                        fs::remove_dir_all(&new_mpt_snapshot_db_path.as_path())
982                    {
983                        error!(
984                            "remove mpt snapshot err: path={:?} err={:?}",
985                            new_mpt_snapshot_db_path.as_path(),
986                            e
987                        );
988                    }
989                }
990
991                if let Err(e) = Self::rename_snapshot_db(
992                    &temp_mpt_path,
993                    &new_mpt_snapshot_db_path,
994                ) {
995                    error!(
996                        "Copy mpt snapshot failed. Rename snapshot db error. \n Error {:?}",
997                        e
998                    );
999                }
1000            })
1001            .unwrap();
1002        Ok(())
1003    }
1004
1005    fn is_mpt_table_in_current_db_for_epoch(&self, epoch_height: u64) -> bool {
1006        if self.use_isolated_db_for_mpt_table {
1007            match self.use_isolated_db_for_mpt_table_height {
1008                Some(v) => epoch_height < v,
1009                _ => false,
1010            }
1011        } else {
1012            true
1013        }
1014    }
1015}
1016
1017impl SnapshotDbManagerTrait for SnapshotDbManagerSqlite {
1018    type SnapshotDb = SnapshotDbSqlite;
1019    type SnapshotDbWrite = SnapshotDbWriteable;
1020
1021    fn get_snapshot_dir(&self) -> &Path { self.snapshot_path.as_path() }
1022
1023    fn get_mpt_snapshot_dir(&self) -> &Path { self.mpt_snapshot_path.as_path() }
1024
1025    fn get_latest_mpt_snapshot_db_name(&self) -> String {
1026        Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string()
1027            + Self::LATEST_MPT_SNAPSHOT_DIR
1028    }
1029
1030    fn get_snapshot_db_name(&self, snapshot_epoch_id: &EpochId) -> String {
1031        Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.to_string()
1032            + &snapshot_epoch_id.as_ref().to_hex::<String>()
1033    }
1034
1035    fn get_snapshot_db_path(&self, snapshot_epoch_id: &EpochId) -> PathBuf {
1036        self.snapshot_path
1037            .join(&self.get_snapshot_db_name(snapshot_epoch_id))
1038    }
1039
1040    fn get_epoch_id_from_snapshot_db_name(
1041        &self, snapshot_db_name: &str,
1042    ) -> Result<EpochId> {
1043        let prefix_len = Self::SNAPSHOT_DB_SQLITE_DIR_PREFIX.len();
1044        Ok(EpochId::from_str(&snapshot_db_name[prefix_len..])
1045            .map_err(|_op| "not correct snapshot db name")?)
1046    }
1047
1048    fn new_snapshot_by_merging<'m>(
1049        &self, old_snapshot_epoch_id: &EpochId, snapshot_epoch_id: EpochId,
1050        delta_mpt: DeltaMptIterator,
1051        mut in_progress_snapshot_info: SnapshotInfo,
1052        snapshot_info_map_rwlock: &'m RwLock<PersistedSnapshotInfoMap>,
1053        new_epoch_height: u64, recover_mpt_with_kv_snapshot_exist: bool,
1054    ) -> Result<(RwLockWriteGuard<'m, PersistedSnapshotInfoMap>, SnapshotInfo)>
1055    {
1056        info!(
1057            "new_snapshot_by_merging: old={:?} new={:?} new epoch height={}, recovering mpt={}",
1058            old_snapshot_epoch_id, snapshot_epoch_id, new_epoch_height, recover_mpt_with_kv_snapshot_exist
1059        );
1060        // FIXME: clean-up when error happens.
1061        let temp_db_path = self.get_merge_temp_snapshot_db_path(
1062            old_snapshot_epoch_id,
1063            &snapshot_epoch_id,
1064        );
1065        let new_snapshot_db_path =
1066            self.get_snapshot_db_path(&snapshot_epoch_id);
1067
1068        let mut snapshot_kv_db;
1069        let mut snapshot_mpt_db;
1070        let mut cow = false;
1071
1072        let mpt_table_in_current_db =
1073            self.is_mpt_table_in_current_db_for_epoch(new_epoch_height);
1074        let new_snapshot_root = if *old_snapshot_epoch_id == NULL_EPOCH {
1075            snapshot_mpt_db = if mpt_table_in_current_db {
1076                None
1077            } else {
1078                Some(self.open_mpt_snapshot_write(
1079                    self.get_latest_mpt_snapshot_db_path(),
1080                    true,
1081                    new_epoch_height,
1082                    &snapshot_epoch_id,
1083                )?)
1084            };
1085            snapshot_kv_db = if recover_mpt_with_kv_snapshot_exist {
1086                let mut kv_db = self.open_kv_snapshot_write(
1087                    new_snapshot_db_path.clone(),
1088                    false,
1089                    new_epoch_height,
1090                )?;
1091
1092                kv_db.drop_delta_mpt_dump()?;
1093                kv_db
1094            } else {
1095                SnapshotKvDbSqlite::create(
1096                    temp_db_path.as_path(),
1097                    &self.already_open_snapshots,
1098                    &self.open_snapshot_semaphore,
1099                    mpt_table_in_current_db,
1100                )?
1101            };
1102
1103            snapshot_kv_db.dump_delta_mpt(&delta_mpt)?;
1104            let _open_lock = self.copying_mpt_snapshot.lock();
1105            snapshot_kv_db.direct_merge(
1106                None,
1107                &mut snapshot_mpt_db,
1108                recover_mpt_with_kv_snapshot_exist,
1109                self.reconstruct_snapshot_id_for_reboot
1110                    .write()
1111                    .take()
1112                    .is_some_and(|v| v == snapshot_epoch_id),
1113            )?
1114        } else {
1115            let (db_path, copied) = if recover_mpt_with_kv_snapshot_exist {
1116                (new_snapshot_db_path.clone(), false)
1117            } else {
1118                let copied = if let Ok(copy_type) = Self::try_copy_snapshot(
1119                    self.get_snapshot_db_path(old_snapshot_epoch_id).as_path(),
1120                    temp_db_path.as_path(),
1121                    self.force_cow,
1122                ) {
1123                    cow = match copy_type {
1124                        CopyType::Cow => true,
1125                        _ => false,
1126                    };
1127                    debug!("copy on write state: {}", cow);
1128                    true
1129                } else {
1130                    false
1131                };
1132
1133                (temp_db_path.clone(), copied)
1134            };
1135
1136            if recover_mpt_with_kv_snapshot_exist || copied {
1137                // Open the copied database.
1138                (snapshot_kv_db, snapshot_mpt_db) = self.open_snapshot_write(
1139                    db_path,
1140                    /* create = */ false,
1141                    new_epoch_height,
1142                    None,
1143                    &snapshot_epoch_id,
1144                )?;
1145
1146                // Drop copied old snapshot delta mpt dump
1147                snapshot_kv_db.drop_delta_mpt_dump()?;
1148
1149                // iterate and insert into temp table.
1150                snapshot_kv_db.dump_delta_mpt(&delta_mpt)?;
1151
1152                let old_snapshot;
1153                let old_snapshot_db = if snapshot_kv_db
1154                    .is_mpt_table_in_current_db()
1155                {
1156                    None
1157                } else {
1158                    let snapshot_path =
1159                        self.get_snapshot_db_path(old_snapshot_epoch_id);
1160                    let maybe_old_snapshot_db = Self::open_snapshot_readonly(
1161                        self,
1162                        snapshot_path,
1163                        /* try_open = */ false,
1164                        old_snapshot_epoch_id,
1165                        false,
1166                    )?;
1167                    old_snapshot = maybe_old_snapshot_db
1168                        .ok_or(Error::from(Error::SnapshotNotFound))?;
1169                    if old_snapshot.is_mpt_table_in_current_db() {
1170                        Some(&old_snapshot.snapshot_db)
1171                    } else {
1172                        None
1173                    }
1174                };
1175
1176                let _open_lock = self.copying_mpt_snapshot.lock();
1177                snapshot_kv_db.direct_merge(
1178                    old_snapshot_db,
1179                    &mut snapshot_mpt_db,
1180                    recover_mpt_with_kv_snapshot_exist,
1181                    self.reconstruct_snapshot_id_for_reboot
1182                        .write()
1183                        .take()
1184                        .is_some_and(|v| v == snapshot_epoch_id),
1185                )?
1186            } else {
1187                (snapshot_kv_db, snapshot_mpt_db) = self.open_snapshot_write(
1188                    temp_db_path.clone(),
1189                    /* create = */ true,
1190                    new_epoch_height,
1191                    None,
1192                    &snapshot_epoch_id,
1193                )?;
1194                snapshot_kv_db.dump_delta_mpt(&delta_mpt)?;
1195                let _open_lock = self.copying_mpt_snapshot.lock();
1196                self.copy_and_merge(
1197                    &mut snapshot_kv_db,
1198                    &mut snapshot_mpt_db,
1199                    old_snapshot_epoch_id,
1200                    snapshot_epoch_id,
1201                )?
1202            }
1203        };
1204
1205        // Create a specific MPT database for EAR checkpoint
1206        let temp_mpt_path = if self.backup_mpt_snapshot
1207            && !mpt_table_in_current_db
1208            && new_epoch_height % self.era_epoch_count == 0
1209        {
1210            let temp_mpt_path =
1211                self.get_merge_temp_mpt_snapshot_db_path(&snapshot_epoch_id);
1212            let new_mpt_snapshot_db_path =
1213                self.get_mpt_snapshot_db_path(&snapshot_epoch_id);
1214
1215            self.copy_mpt_snapshot(
1216                temp_mpt_path.clone(),
1217                new_mpt_snapshot_db_path.clone(),
1218            )?;
1219            Some((temp_mpt_path, new_mpt_snapshot_db_path))
1220        } else {
1221            None
1222        };
1223
1224        in_progress_snapshot_info.merkle_root = new_snapshot_root.clone();
1225        drop(snapshot_kv_db);
1226        drop(snapshot_mpt_db);
1227        let locked = snapshot_info_map_rwlock.write();
1228
1229        if !recover_mpt_with_kv_snapshot_exist {
1230            if let Some((temp_mpt_path, new_mpt_snapshot_db_path)) =
1231                temp_mpt_path
1232            {
1233                while !temp_mpt_path.exists()
1234                    && !new_mpt_snapshot_db_path.exists()
1235                {
1236                    // wait till mpt path created
1237                    debug!("Wait mpt path existing");
1238                    thread::sleep(Duration::from_millis(5));
1239                }
1240            }
1241
1242            Self::rename_snapshot_db(&temp_db_path, &new_snapshot_db_path)?;
1243        }
1244
1245        if cfg!(target_os = "linux")
1246            && cow
1247            && snapshot_epoch_id.as_fixed_bytes()[31] & 15 == 0
1248        {
1249            self.defragmenting_xfs_files(new_snapshot_db_path);
1250        }
1251
1252        Ok((locked, in_progress_snapshot_info))
1253    }
1254
1255    fn get_snapshot_by_epoch_id(
1256        &self, snapshot_epoch_id: &EpochId, try_open: bool,
1257        open_mpt_snapshot: bool,
1258    ) -> Result<Option<Self::SnapshotDb>> {
1259        if snapshot_epoch_id.eq(&NULL_EPOCH) {
1260            return Ok(Some(Self::SnapshotDb::get_null_snapshot()));
1261        } else {
1262            let path = self.get_snapshot_db_path(snapshot_epoch_id);
1263            self.open_snapshot_readonly(
1264                path,
1265                try_open,
1266                snapshot_epoch_id,
1267                open_mpt_snapshot,
1268            )
1269        }
1270    }
1271
1272    fn destroy_snapshot(&self, snapshot_epoch_id: &EpochId) -> Result<()> {
1273        debug!("destroy snapshot {:?}", snapshot_epoch_id);
1274        let path = self.get_snapshot_db_path(snapshot_epoch_id);
1275        let maybe_snapshot = loop {
1276            match self.already_open_snapshots.read().get(&path) {
1277                Some(Some(snapshot)) => {
1278                    match Weak::upgrade(snapshot) {
1279                        None => {
1280                            // This is transient and we wait for the db to be
1281                            // fully closed.
1282                            // The assumption is the same as in
1283                            // `open_snapshot_readonly`.
1284                            thread::sleep(Duration::from_millis(5));
1285                            continue;
1286                        }
1287                        Some(snapshot) => break Some(snapshot),
1288                    }
1289                }
1290                Some(None) => {
1291                    // This should not happen because Conflux always write on a
1292                    // snapshot db under a temporary name. All completed
1293                    // snapshots are readonly.
1294
1295                    unreachable!("Try to destroy a snapshot being open exclusively for write.")
1296                }
1297                None => break None,
1298            };
1299        };
1300
1301        match maybe_snapshot {
1302            None => {
1303                if snapshot_epoch_id.ne(&NULL_EPOCH) {
1304                    Self::fs_remove_snapshot(&path);
1305                }
1306            }
1307            Some(snapshot) => {
1308                snapshot.set_remove_on_last_close();
1309            }
1310        };
1311
1312        {
1313            // destroy MPT snapshot
1314            let mpt_snapshot_path =
1315                self.get_mpt_snapshot_db_path(&snapshot_epoch_id);
1316
1317            let maybe_snapshot = loop {
1318                match self
1319                    .mpt_already_open_snapshots
1320                    .read()
1321                    .get(&mpt_snapshot_path)
1322                {
1323                    Some(Some(snapshot)) => match Weak::upgrade(snapshot) {
1324                        None => {
1325                            thread::sleep(Duration::from_millis(5));
1326                            continue;
1327                        }
1328                        Some(snapshot) => break Some(snapshot),
1329                    },
1330                    Some(None) => {
1331                        unreachable!("Try to destroy a snapshot being open exclusively for write.")
1332                    }
1333                    None => break None,
1334                };
1335            };
1336
1337            match maybe_snapshot {
1338                None => {
1339                    if snapshot_epoch_id.ne(&NULL_EPOCH)
1340                        && mpt_snapshot_path.exists()
1341                    {
1342                        debug!(
1343                            "destroy_mpt_snapshot remove mpt db {:?}",
1344                            mpt_snapshot_path
1345                        );
1346                        Self::fs_remove_snapshot(&mpt_snapshot_path);
1347                    }
1348                }
1349                Some(snapshot) => {
1350                    snapshot.set_remove_on_last_close();
1351                }
1352            };
1353        }
1354
1355        Ok(())
1356    }
1357
1358    fn new_temp_snapshot_for_full_sync(
1359        &self, snapshot_epoch_id: &EpochId, merkle_root: &MerkleHash,
1360        epoch_height: u64,
1361    ) -> Result<Self::SnapshotDbWrite> {
1362        let mpt_table_in_current_db =
1363            self.is_mpt_table_in_current_db_for_epoch(epoch_height);
1364        let temp_mpt_snapshot_path = if mpt_table_in_current_db {
1365            None
1366        } else {
1367            Some(self.get_full_sync_temp_mpt_snapshot_db_path(
1368                snapshot_epoch_id,
1369                merkle_root,
1370            ))
1371        };
1372
1373        let temp_db_path = self.get_full_sync_temp_snapshot_db_path(
1374            snapshot_epoch_id,
1375            merkle_root,
1376        );
1377        let (kv_snapshot_db, mpt_snapshot_db) = self.open_snapshot_write(
1378            temp_db_path.to_path_buf(),
1379            /* create = */ true,
1380            epoch_height,
1381            temp_mpt_snapshot_path,
1382            snapshot_epoch_id,
1383        )?;
1384
1385        Ok(SnapshotDbWriteable {
1386            kv_snapshot_db,
1387            mpt_snapshot_db,
1388        })
1389    }
1390
1391    fn finalize_full_sync_snapshot<'m>(
1392        &self, snapshot_epoch_id: &EpochId, merkle_root: &MerkleHash,
1393        snapshot_info_map_rwlock: &'m RwLock<PersistedSnapshotInfoMap>,
1394    ) -> Result<RwLockWriteGuard<'m, PersistedSnapshotInfoMap>> {
1395        let temp_mpt_snapshot_path = self
1396            .get_full_sync_temp_mpt_snapshot_db_path(
1397                snapshot_epoch_id,
1398                merkle_root,
1399            );
1400        let latest_mpt_snapshot_path = self.get_latest_mpt_snapshot_db_path();
1401
1402        let temp_db_path = self.get_full_sync_temp_snapshot_db_path(
1403            snapshot_epoch_id,
1404            merkle_root,
1405        );
1406        let final_db_path = self.get_snapshot_db_path(snapshot_epoch_id);
1407        let locked = snapshot_info_map_rwlock.write();
1408
1409        if temp_mpt_snapshot_path.exists() {
1410            if latest_mpt_snapshot_path.exists() {
1411                debug!(
1412                    "Remove latest mpt snapshot {:?}",
1413                    latest_mpt_snapshot_path
1414                );
1415                if let Err(e) =
1416                    fs::remove_dir_all(&latest_mpt_snapshot_path.as_path())
1417                {
1418                    error!(
1419                        "remove snapshot err: path={:?} err={:?}",
1420                        latest_mpt_snapshot_path.as_path(),
1421                        e
1422                    );
1423                }
1424            }
1425
1426            Self::rename_snapshot_db(
1427                &temp_mpt_snapshot_path,
1428                &latest_mpt_snapshot_path,
1429            )?;
1430
1431            let temp_mpt_path =
1432                self.get_merge_temp_mpt_snapshot_db_path(&snapshot_epoch_id);
1433            let new_mpt_snapshot_db_path =
1434                self.get_mpt_snapshot_db_path(&snapshot_epoch_id);
1435
1436            self.copy_mpt_snapshot(
1437                temp_mpt_path.clone(),
1438                new_mpt_snapshot_db_path.clone(),
1439            )?;
1440            while !temp_mpt_path.exists() && !new_mpt_snapshot_db_path.exists()
1441            {
1442                // wait till mpt path created
1443                debug!("Wait mpt path existing");
1444                thread::sleep(Duration::from_millis(5));
1445            }
1446        }
1447
1448        Self::rename_snapshot_db(&temp_db_path, &final_db_path)?;
1449        Ok(locked)
1450    }
1451
1452    fn recovery_latest_mpt_snapshot_from_checkpoint(
1453        &self, snapshot_epoch_id: &EpochId,
1454        snapshot_epoch_id_before_recovered: Option<EpochId>,
1455    ) -> Result<()> {
1456        info!(
1457            "recovery latest mpt snapshot from checkpoint {}",
1458            snapshot_epoch_id
1459        );
1460        if snapshot_epoch_id == &NULL_EPOCH {
1461            self.recreate_latest_mpt_snapshot()?;
1462            return Ok(());
1463        }
1464
1465        *self.snapshot_epoch_id_before_recovered.write() =
1466            snapshot_epoch_id_before_recovered;
1467
1468        // Replace the latest MPT snapshot with the MPT snapshot of the
1469        // specified snapshot_epoch_id
1470        let source = self.get_mpt_snapshot_db_path(snapshot_epoch_id);
1471        if source.exists() {
1472            let latest_mpt_snapshot_path =
1473                self.get_latest_mpt_snapshot_db_path();
1474            if latest_mpt_snapshot_path.exists() {
1475                debug!("remove mpt snapshot {:?}", latest_mpt_snapshot_path);
1476                if let Err(e) =
1477                    fs::remove_dir_all(&latest_mpt_snapshot_path.as_path())
1478                {
1479                    error!(
1480                        "remove mpt snapshot err: path={:?} err={:?}",
1481                        latest_mpt_snapshot_path.as_path(),
1482                        e
1483                    );
1484                }
1485            }
1486
1487            debug!(
1488                "Copy mpt db for latest from snapshot {:?}  ",
1489                snapshot_epoch_id
1490            );
1491            let temp_mpt_path =
1492                self.get_merge_temp_mpt_snapshot_db_path(&snapshot_epoch_id);
1493
1494            Self::try_copy_snapshot(
1495                source.as_path(),
1496                temp_mpt_path.as_path(),
1497                self.force_cow,
1498            )?;
1499            Self::rename_snapshot_db(&temp_mpt_path, &latest_mpt_snapshot_path)
1500        } else {
1501            panic!(
1502                "mpt snapshot for epoch {} does not exist",
1503                snapshot_epoch_id
1504            );
1505        }
1506    }
1507
1508    fn create_mpt_snapshot_from_latest(
1509        &self, new_snapshot_epoch_id: &EpochId,
1510    ) -> Result<()> {
1511        info!(
1512            "copy latest mpt snapshot for epoch {}",
1513            new_snapshot_epoch_id
1514        );
1515        let latest_mpt_snapshot_path = self.get_latest_mpt_snapshot_db_path();
1516        if !latest_mpt_snapshot_path.exists() {
1517            return Err(Error::from("latest mpt snapshot not exists"));
1518        }
1519
1520        let new_mpt_snapshot_db_path =
1521            self.get_mpt_snapshot_db_path(new_snapshot_epoch_id);
1522        let temp_mpt_path =
1523            self.get_merge_temp_mpt_snapshot_db_path(new_snapshot_epoch_id);
1524
1525        Self::try_copy_snapshot(
1526            latest_mpt_snapshot_path.as_path(),
1527            temp_mpt_path.as_path(),
1528            self.force_cow,
1529        )?;
1530
1531        if new_mpt_snapshot_db_path.exists() {
1532            fs::remove_dir_all(&new_mpt_snapshot_db_path.as_path())?;
1533        }
1534
1535        Self::rename_snapshot_db(&temp_mpt_path, &new_mpt_snapshot_db_path)
1536    }
1537
1538    fn try_get_new_snapshot_epoch_from_temp_path(
1539        &self, dir_name: &str,
1540    ) -> Option<EpochId> {
1541        self.try_get_new_snapshot_epoch_from_temp_path(dir_name)
1542    }
1543
1544    fn try_get_new_snapshot_epoch_from_mpt_temp_path(
1545        &self, dir_name: &str,
1546    ) -> Option<EpochId> {
1547        self.try_get_new_snapshot_epoch_from_mpt_temp_path(dir_name)
1548    }
1549}
1550
1551use crate::{
1552    impls::{
1553        delta_mpt::DeltaMptIterator, errors::*,
1554        storage_db::snapshot_kv_db_sqlite::*,
1555        storage_manager::PersistedSnapshotInfoMap,
1556    },
1557    storage_db::{
1558        key_value_db::KeyValueDbTraitSingleWriter, DbValueType,
1559        KeyValueDbTypes, OpenSnapshotMptTrait, SnapshotDbManagerTrait,
1560        SnapshotDbTrait, SnapshotDbWriteableTrait, SnapshotInfo,
1561        SnapshotMptDbValue,
1562    },
1563};
1564use fs_extra::dir::CopyOptions;
1565use futures::executor;
1566use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
1567use primitives::{EpochId, MerkleHash, NULL_EPOCH};
1568use rustc_hex::ToHex;
1569use std::{
1570    collections::HashMap,
1571    fs,
1572    path::{Path, PathBuf},
1573    process::Command,
1574    str::FromStr,
1575    sync::{Arc, Weak},
1576    thread,
1577    time::Duration,
1578};
1579use tokio::sync::Semaphore;
1580
1581use super::{
1582    kvdb_sqlite_sharded::KvdbSqliteShardedBorrowMut,
1583    snapshot_db_sqlite::SnapshotDbSqlite, snapshot_mpt::SnapshotMpt,
1584    snapshot_mpt_db_sqlite::SnapshotMptDbSqlite,
1585};