cfx_storage/impls/storage_manager/
storage_manager.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
/// The in mem snapshot_info map and the on disk snapshot_info_db is always in
/// sync.
pub struct PersistedSnapshotInfoMap {
    // Db to persist snapshot_info. Each row maps the epoch id bytes to the
    // rlp-encoded SnapshotInfo (see load_persist_state / insert).
    snapshot_info_db: KvdbSqlite<Box<[u8]>>,
    // In memory snapshot_info_map_by_epoch. Populated from the db at
    // construction time and mirrored on every insert/remove.
    snapshot_info_map_by_epoch: HashMap<EpochId, SnapshotInfo>,
}
13
14impl PersistedSnapshotInfoMap {
15    fn new(snapshot_info_db: KvdbSqlite<Box<[u8]>>) -> Result<Self> {
16        let mut result = Self {
17            // The map is loaded later
18            snapshot_info_map_by_epoch: Default::default(),
19            snapshot_info_db,
20        };
21        result.load_persist_state()?;
22        Ok(result)
23    }
24
25    fn insert(
26        &mut self, epoch: &EpochId, snapshot_info: SnapshotInfo,
27    ) -> Result<()> {
28        let rlp_bytes = snapshot_info.rlp_bytes();
29        self.snapshot_info_map_by_epoch
30            .insert(epoch.clone(), snapshot_info);
31        self.snapshot_info_db.put(epoch.as_ref(), &rlp_bytes)?;
32        Ok(())
33    }
34
35    fn get_map(&self) -> &HashMap<EpochId, SnapshotInfo> {
36        &self.snapshot_info_map_by_epoch
37    }
38
39    fn get(&self, epoch: &EpochId) -> Option<&SnapshotInfo> {
40        self.snapshot_info_map_by_epoch.get(epoch)
41    }
42
43    fn remove(&mut self, epoch: &EpochId) -> Result<()> {
44        self.snapshot_info_map_by_epoch.remove(epoch);
45        self.snapshot_info_db.delete(epoch.as_ref())?;
46        Ok(())
47    }
48
49    // Unsafe because the in mem map isn't in sync with the db.
50    unsafe fn remove_in_mem_only(
51        &mut self, epoch: &EpochId,
52    ) -> Option<SnapshotInfo> {
53        self.snapshot_info_map_by_epoch.remove(epoch)
54    }
55
56    fn load_persist_state(&mut self) -> Result<()> {
57        // Load snapshot info from db.
58        let (maybe_info_db_connection, statements) =
59            self.snapshot_info_db.destructure_mut();
60
61        let mut snapshot_info_iter = kvdb_sqlite_iter_range_impl(
62            maybe_info_db_connection,
63            statements,
64            &[],
65            None,
66            |row: &Statement<'_>| {
67                let key = row.read::<Vec<u8>>(0)?;
68                let value = row.read::<Vec<u8>>(1)?;
69
70                if key.len() != EpochId::len_bytes() {
71                    Err(DecoderError::RlpInvalidLength.into())
72                } else {
73                    Ok((
74                        EpochId::from_slice(&key),
75                        SnapshotInfo::decode(&Rlp::new(&value))?,
76                    ))
77                }
78            },
79        )?;
80        while let Some((snapshot_epoch, snapshot_info)) =
81            snapshot_info_iter.next()?
82        {
83            self.snapshot_info_map_by_epoch
84                .insert(snapshot_epoch, snapshot_info);
85        }
86        Ok(())
87    }
88}
89
90// FIXME: correctly order code blocks.
pub struct StorageManager {
    // Creates/destroys the per-snapshot delta dbs.
    delta_db_manager: Arc<DeltaDbManager>,
    // LRU cache of open delta db handles (capacity: max_open_mpt_count).
    delta_mpt_open_db_lru: Arc<OpenDeltaDbLru<DeltaDbManager>>,
    snapshot_manager: Box<
        dyn SnapshotManagerTrait<
                SnapshotDb = SnapshotDb,
                SnapshotDbManager = SnapshotDbManager,
            > + Send
            + Sync,
    >,
    // Allocates/frees DeltaMptId values for live delta mpts.
    delta_mpts_id_gen: Mutex<DeltaMptIdGen>,
    // Shared node cache for all delta mpts.
    delta_mpts_node_memory_manager: Arc<DeltaMptsNodeMemoryManager>,

    // Holds a db error raised in DeltaDbReleaser's destructor until it can
    // be reported from a fallible method (see take_result usage).
    maybe_db_errors: MaybeDeltaTrieDestroyErrors,
    // Per snapshot epoch: (intermediate mpt, delta mpt); either may be None.
    snapshot_associated_mpts_by_epoch: RwLock<
        HashMap<EpochId, (Option<Arc<DeltaMpt>>, Option<Arc<DeltaMpt>>)>,
    >,

    // Lock order: while this is locked, in
    // check_make_register_snapshot_background, snapshot_info_map_by_epoch
    // is locked later.
    pub in_progress_snapshotting_tasks:
        RwLock<HashMap<EpochId, Arc<RwLock<InProgressSnapshotTask>>>>,
    // Sender half used by snapshotting threads to notify completion; a None
    // message tells the joiner thread to exit (see graceful_shutdown).
    in_progress_snapshot_finish_signaler: Arc<Mutex<Sender<Option<EpochId>>>>,
    // Handle of the "Background Snapshot Joiner" thread spawned in new_arc.
    in_progress_snapshotting_joiner: Mutex<Option<JoinHandle<()>>>,

    // The order doesn't matter as long as parent snapshot comes before
    // children snapshots.
    // Note that for archive node the list here is just a subset of what's
    // available.
    //
    // Lock order: while this is locked, in load_persist_state and
    // state_manager.rs:get_state_trees_for_next_epoch
    // snapshot_associated_mpts_by_epoch is locked later.
    current_snapshots: RwLock<Vec<SnapshotInfo>>,
    // Lock order: while this is locked, in register_new_snapshot and
    // load_persist_state, current_snapshots and
    // snapshot_associated_mpts_by_epoch are locked later.
    pub snapshot_info_map_by_epoch: RwLock<PersistedSnapshotInfoMap>,

    last_confirmed_snapshottable_epoch_id: Mutex<Option<EpochId>>,

    pub storage_conf: StorageConfiguration,

    // used during startup for the next compute epoch
    pub intermediate_trie_root_merkle: RwLock<Option<MerkleHash>>,

    pub persist_state_from_initialization:
        RwLock<Option<(Option<EpochId>, HashSet<EpochId>, u64, Option<u64>)>>,
}
141
142impl MallocSizeOf for StorageManager {
143    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
144        // TODO: Sqlite for snapshot may also use a significant amount of
145        // memory. We need to fork the crate `sqlite` ourselves to
146        // expose `sqlite3_status` to get the memory usage statistics.
147        let mut size = 0;
148        size += self.delta_mpts_node_memory_manager.size_of(ops);
149        size += self.snapshot_associated_mpts_by_epoch.size_of(ops);
150        size
151    }
152}
153
/// Struct which makes sure that the delta mpt is properly ref-counted and
/// released.
///
/// Dropping this struct triggers the storage manager's delta mpt cleanup
/// (cache eviction, db handle release, id free, on-disk db destruction).
pub struct DeltaDbReleaser {
    // Weak so this releaser doesn't keep the StorageManager alive; cleanup
    // is skipped when the manager is already gone.
    pub storage_manager: Weak<StorageManager>,
    // Epoch id of the snapshot the delta mpt belongs to.
    pub snapshot_epoch_id: EpochId,
    // Id of the delta mpt to release.
    pub mpt_id: DeltaMptId,
}
161
162impl Drop for DeltaDbReleaser {
163    fn drop(&mut self) {
164        // Don't drop any delta mpt at graceful shutdown because those remaining
165        // DeltaMPTs are useful.
166
167        // Note that when an error happens in db, the program should fail
168        // gracefully, but not in destructor.
169        Weak::upgrade(&self.storage_manager).map(|storage_manager| {
170            storage_manager.release_delta_mpt_actions_in_drop(
171                &self.snapshot_epoch_id,
172                self.mpt_id,
173            )
174        });
175    }
176}
177
// TODO: Add support for cancellation and io throttling.
pub struct InProgressSnapshotTask {
    // Info of the snapshot being produced in the background; also used in
    // the panic message when the worker thread dies.
    snapshot_info: SnapshotInfo,
    // Handle of the background snapshotting thread; taken (left None) the
    // first time the task is joined.
    thread_handle: Option<thread::JoinHandle<Result<()>>>,
}
183
184impl InProgressSnapshotTask {
185    // Returns None if the thread has been joined already. Returns the
186    // background snapshotting result when the thread is first joined.
187    pub fn join(&mut self) -> Option<Result<()>> {
188        if let Some(join_handle) = self.thread_handle.take() {
189            match join_handle.join() {
190                Ok(task_result) => Some(task_result),
191                Err(_) => Some(Err(Error::ThreadPanicked(format!(
192                    "Background Snapshotting for {:?} panicked.",
193                    self.snapshot_info
194                ))
195                .into())),
196            }
197        } else {
198            None
199        }
200    }
201}
202
203impl StorageManager {
    /// Build the StorageManager: create the storage dir if needed, open the
    /// snapshot_info db, construct the delta/snapshot db managers, spawn the
    /// background "Snapshot Joiner" thread, and load persisted state.
    pub fn new_arc(
        /* TODO: Add node type, full node or archive node */
        storage_conf: StorageConfiguration,
    ) -> Result<Arc<Self>> {
        let storage_dir = storage_conf.path_storage_dir.as_path();
        debug!(
            "new StorageManager within storage_dir {}",
            storage_dir.display()
        );
        if !storage_dir.exists() {
            fs::create_dir_all(storage_dir)?;
        }

        // Open (or create) the snapshot_info db and mirror it into memory.
        let (_, snapshot_info_db) = KvdbSqlite::open_or_create(
            &storage_conf.path_snapshot_info_db,
            SNAPSHOT_KVDB_STATEMENTS.clone(),
            false, /* unsafe_mode */
        )?;
        let snapshot_info_map =
            PersistedSnapshotInfoMap::new(snapshot_info_db)?;

        // Channel through which finished (or exiting) snapshotting tasks
        // signal the joiner thread below; None is the exit sentinel.
        let (
            in_progress_snapshot_finish_signaler,
            in_progress_snapshot_finish_signal_receiver,
        ) = channel();

        let delta_db_manager = Arc::new(DeltaDbManager::new(
            storage_conf.path_delta_mpts_dir.clone(),
        )?);
        let new_storage_manager_result = Ok(Arc::new(Self {
            delta_db_manager: delta_db_manager.clone(),
            delta_mpt_open_db_lru: Arc::new(OpenDeltaDbLru::new(
                delta_db_manager.clone(),
                storage_conf.max_open_mpt_count,
            )?),
            snapshot_manager: Box::new(SnapshotManager::<SnapshotDbManager> {
                snapshot_db_manager: SnapshotDbManager::new(
                    storage_conf.path_snapshot_dir.clone(),
                    storage_conf.max_open_snapshots,
                    storage_conf.use_isolated_db_for_mpt_table,
                    storage_conf.use_isolated_db_for_mpt_table_height,
                    storage_conf.consensus_param.era_epoch_count,
                    storage_conf.backup_mpt_snapshot,
                )?,
            }),
            delta_mpts_id_gen: Default::default(),
            delta_mpts_node_memory_manager: Arc::new(
                DeltaMptsNodeMemoryManager::new(
                    storage_conf.delta_mpts_cache_start_size,
                    storage_conf.delta_mpts_cache_size,
                    storage_conf.delta_mpts_slab_idle_size,
                    storage_conf.delta_mpts_node_map_vec_size,
                    DeltaMptsCacheAlgorithm::new(
                        storage_conf.delta_mpts_cache_size,
                    ),
                ),
            ),
            maybe_db_errors: MaybeDeltaTrieDestroyErrors::new(),
            snapshot_associated_mpts_by_epoch: Default::default(),
            in_progress_snapshotting_tasks: Default::default(),
            in_progress_snapshot_finish_signaler: Arc::new(Mutex::new(
                in_progress_snapshot_finish_signaler,
            )),
            in_progress_snapshotting_joiner: Default::default(),
            current_snapshots: Default::default(),
            snapshot_info_map_by_epoch: RwLock::new(snapshot_info_map),
            last_confirmed_snapshottable_epoch_id: Default::default(),
            storage_conf,
            intermediate_trie_root_merkle: RwLock::new(None),
            persist_state_from_initialization: RwLock::new(None),
        }));

        // Spawn the joiner thread: it joins each finished snapshotting task
        // (logging failures) and removes it from in_progress_snapshotting_tasks.
        let storage_manager_arc =
            new_storage_manager_result.as_ref().unwrap().clone();
        *new_storage_manager_result.as_ref().unwrap().in_progress_snapshotting_joiner.lock() =
            Some(thread::Builder::new()
                .name("Background Snapshot Joiner".to_string()).spawn(
            move || {
                for exit_program_or_finished_snapshot in
                    in_progress_snapshot_finish_signal_receiver.iter() {
                    // None is the exit sentinel sent by graceful_shutdown.
                    if exit_program_or_finished_snapshot.is_none() {
                        break;
                    }
                    let finished_snapshot = exit_program_or_finished_snapshot.unwrap();
                    if let Some(task) = storage_manager_arc
                        .in_progress_snapshotting_tasks.read().get(&finished_snapshot) {
                        let snapshot_result = task.write().join();
                        if let Some(Err(e)) = snapshot_result {
                            warn!(
                                "Background snapshotting for {:?} failed with {}",
                                finished_snapshot, e);
                        }
                    }
                    storage_manager_arc.in_progress_snapshotting_tasks
                        .write().remove(&finished_snapshot);
                }
                // TODO: handle program exit signal.
            }
        )?);

        new_storage_manager_result
            .as_ref()
            .unwrap()
            .load_persist_state()?;

        new_storage_manager_result
    }
311
312    pub fn find_merkle_root(
313        current_snapshots: &Vec<SnapshotInfo>, epoch_id: &EpochId,
314    ) -> Option<MerkleHash> {
315        current_snapshots
316            .iter()
317            .find(|i| i.get_snapshot_epoch_id() == epoch_id)
318            .map(|i| i.merkle_root.clone())
319    }
320
    /// Open the snapshot db for `snapshot_epoch_id`, returning it together
    /// with a read guard over current_snapshots that keeps the snapshot from
    /// being deleted while in use. If the snapshot is still being produced,
    /// join the in-progress task (surfacing its error) and retry once.
    /// Returns Ok(None) when the snapshot is unknown.
    pub fn wait_for_snapshot(
        &self, snapshot_epoch_id: &EpochId, try_open: bool,
        open_mpt_snapshot: bool,
    ) -> Result<
        Option<
            GuardedValue<RwLockReadGuard<'_, Vec<SnapshotInfo>>, SnapshotDb>,
        >,
    > {
        // Make sure that the snapshot info is ready at the same time of the
        // snapshot db. This variable is used for the whole scope
        // however prefixed with _ to please cargo fmt.
        let _snapshot_info_lock = self.snapshot_info_map_by_epoch.read();
        // maintain_snapshots_pivot_chain_confirmed() can not delete snapshot
        // while the current_snapshots are read locked.
        let guard = self.current_snapshots.read();
        match self.snapshot_manager.get_snapshot_by_epoch_id(
            snapshot_epoch_id,
            try_open,
            open_mpt_snapshot,
        )? {
            Some(snapshot_db) => {
                Ok(Some(GuardedValue::new(guard, snapshot_db)))
            }
            None => {
                // Release both locks before blocking on the background task,
                // since joining may take a long time and the task itself
                // locks snapshot_info_map_by_epoch.
                drop(_snapshot_info_lock);
                drop(guard);
                // Wait for in progress snapshot.
                if let Some(in_progress_snapshot_task) = self
                    .in_progress_snapshotting_tasks
                    .read()
                    .get(snapshot_epoch_id)
                    .cloned()
                {
                    // Snapshotting error is thrown-out when the snapshot is
                    // first requested here.
                    if let Some(result) =
                        in_progress_snapshot_task.write().join()
                    {
                        result?;
                    }
                    // Retry under a fresh read guard now that the task is done.
                    let guard = self.current_snapshots.read();
                    match self.snapshot_manager.get_snapshot_by_epoch_id(
                        snapshot_epoch_id,
                        try_open,
                        open_mpt_snapshot,
                    ) {
                        Err(e) => Err(e),
                        Ok(None) => Ok(None),
                        Ok(Some(snapshot_db)) => {
                            Ok(Some(GuardedValue::new(guard, snapshot_db)))
                        }
                    }
                } else {
                    Ok(None)
                }
            }
        }
    }
379
380    pub fn graceful_shutdown(&self) {
381        // TODO: First cancel any ongoing thread join from
382        // in_progress_snapshotting_joiner thread.
383        self.in_progress_snapshot_finish_signaler
384            .lock()
385            .send(None)
386            .ok();
387        if let Some(joiner) = self.in_progress_snapshotting_joiner.lock().take()
388        {
389            joiner.join().ok();
390        }
391    }
392
    /// Borrow the snapshot manager as its trait object.
    pub fn get_snapshot_manager(
        &self,
    ) -> &(dyn SnapshotManagerTrait<
        SnapshotDb = SnapshotDb,
        SnapshotDbManager = SnapshotDbManager,
    > + Send
             + Sync) {
        &*self.snapshot_manager
    }
402
    /// Number of epochs between two consecutive snapshots, taken from the
    /// consensus parameters.
    pub fn get_snapshot_epoch_count(&self) -> u32 {
        self.storage_conf.consensus_param.snapshot_epoch_count
    }
406
407    pub fn get_snapshot_info_at_epoch(
408        &self, snapshot_epoch_id: &EpochId,
409    ) -> Option<SnapshotInfo> {
410        self.snapshot_info_map_by_epoch
411            .read()
412            .get(snapshot_epoch_id)
413            .map(Clone::clone)
414    }
415
    /// Get (or lazily create) the delta mpt for the given snapshot epoch.
    /// Errors with DeltaMPTEntryNotFound when the epoch isn't registered.
    pub fn get_delta_mpt(
        self: &Arc<Self>, snapshot_epoch_id: &EpochId,
    ) -> Result<Arc<DeltaMpt>> {
        {
            // Fast path: only a read lock is needed to return an existing
            // delta mpt (slot 1 of the pair).
            let snapshot_associated_mpts_locked =
                self.snapshot_associated_mpts_by_epoch.read();
            match snapshot_associated_mpts_locked.get(snapshot_epoch_id) {
                None => bail!(Error::DeltaMPTEntryNotFound),
                Some(delta_mpts) => {
                    if delta_mpts.1.is_some() {
                        return Ok(delta_mpts.1.as_ref().unwrap().clone());
                    }
                }
            }
            // Read lock dropped here before taking the write lock below.
        }

        // Slow path: create under the write lock. Another thread may have
        // created the mpt in the gap; new_or_get_delta_mpt returns the
        // existing one in that case.
        StorageManager::new_or_get_delta_mpt(
            self.clone(),
            snapshot_epoch_id,
            &mut *self.snapshot_associated_mpts_by_epoch.write(),
        )
    }
438
439    pub fn get_intermediate_mpt(
440        &self, snapshot_epoch_id: &EpochId,
441    ) -> Result<Option<Arc<DeltaMpt>>> {
442        match self
443            .snapshot_associated_mpts_by_epoch
444            .read()
445            .get(snapshot_epoch_id)
446        {
447            None => bail!(Error::DeltaMPTEntryNotFound),
448            Some(mpts) => Ok(mpts.0.clone()),
449        }
450    }
451
452    /// Return the existing delta mpt if the delta mpt already exists.
453    pub fn new_or_get_delta_mpt(
454        storage_manager: Arc<StorageManager>, snapshot_epoch_id: &EpochId,
455        snapshot_associated_mpts_mut: &mut HashMap<
456            EpochId,
457            (Option<Arc<DeltaMpt>>, Option<Arc<DeltaMpt>>),
458        >,
459    ) -> Result<Arc<DeltaMpt>> {
460        // Don't hold the lock while doing db io.
461        // If the DeltaMpt already exists, the empty delta db creation should
462        // fail already.
463
464        let mut maybe_snapshot_entry =
465            snapshot_associated_mpts_mut.get_mut(snapshot_epoch_id);
466        if maybe_snapshot_entry.is_none() {
467            bail!(Error::SnapshotNotFound);
468        };
469        // DeltaMpt already exists
470        if maybe_snapshot_entry.as_ref().unwrap().1.is_some() {
471            return Ok(maybe_snapshot_entry
472                .unwrap()
473                .1
474                .as_ref()
475                .unwrap()
476                .clone());
477        } else {
478            let mpt_id = storage_manager.delta_mpts_id_gen.lock().allocate()?;
479            let db_result = storage_manager
480                .delta_mpt_open_db_lru
481                .create(&snapshot_epoch_id, mpt_id);
482            if db_result.is_err() {
483                storage_manager.delta_mpts_id_gen.lock().free(mpt_id);
484                db_result?;
485            }
486            let arc_delta_mpt = Arc::new(DeltaMpt::new(
487                storage_manager.delta_mpt_open_db_lru.clone(),
488                snapshot_epoch_id.clone(),
489                storage_manager.clone(),
490                mpt_id,
491                storage_manager.delta_mpts_node_memory_manager.clone(),
492            )?);
493
494            maybe_snapshot_entry.as_mut().unwrap().1 =
495                Some(arc_delta_mpt.clone());
496            // For Genesis snapshot, the intermediate MPT is the same as the
497            // delta MPT.
498            if snapshot_epoch_id.eq(&NULL_EPOCH) {
499                maybe_snapshot_entry.unwrap().0 = Some(arc_delta_mpt.clone());
500            }
501
502            return Ok(arc_delta_mpt);
503        }
504    }
505
    /// The methods clean up Delta DB when dropping an Delta MPT.
    /// It silently finishes and in case of error, it keeps the error
    /// and raise it later on.
    fn release_delta_mpt_actions_in_drop(
        &self, snapshot_epoch_id: &EpochId, delta_mpt_id: DeltaMptId,
    ) {
        debug!(
            "release_delta_mpt_actions_in_drop: snapshot_epoch_id: {:?}, delta_mpt_id: {}",
            snapshot_epoch_id, delta_mpt_id
        );
        // Evict the mpt's nodes from the shared cache, close its open db
        // handle, then return the id to the id generator.
        self.delta_mpts_node_memory_manager
            .delete_mpt_from_cache(delta_mpt_id);
        self.delta_mpt_open_db_lru.release(delta_mpt_id, true);
        self.delta_mpts_id_gen.lock().free(delta_mpt_id);
        // Destroy the on-disk delta db. Since this runs from a destructor,
        // an error is stashed in maybe_db_errors and surfaced later via
        // take_result (see release_delta_mpts_from_snapshot).
        self.maybe_db_errors.set_maybe_error(
            self.delta_db_manager
                .destroy_delta_db(
                    &self.delta_db_manager.get_delta_db_name(snapshot_epoch_id),
                )
                .err(),
        );
    }
528
    /// Drop the (intermediate, delta) mpt pair registered for the snapshot
    /// epoch, then surface any db error stashed by
    /// release_delta_mpt_actions_in_drop (which runs when the dropped Arcs
    /// were the last references).
    fn release_delta_mpts_from_snapshot(
        &self,
        snapshot_associated_mpts_by_epoch: &mut HashMap<
            EpochId,
            (Option<Arc<DeltaMpt>>, Option<Arc<DeltaMpt>>),
        >,
        snapshot_epoch_id: &EpochId,
    ) -> Result<()> {
        // Release
        snapshot_associated_mpts_by_epoch.remove(snapshot_epoch_id);
        self.maybe_db_errors.take_result()
    }
541
    /// If no snapshot (or only sync-provided info) exists for
    /// `snapshot_epoch_id` and no task is already running for it, spawn a
    /// background thread that builds the snapshot by merging the delta mpt
    /// into the parent snapshot, registers the result, and signals the
    /// joiner thread. Lock order: in_progress_snapshotting_tasks is held
    /// here while snapshot_info_map_by_epoch is read (see field comments).
    pub fn check_make_register_snapshot_background(
        this: Arc<Self>, snapshot_epoch_id: EpochId, height: u64,
        maybe_delta_db: Option<DeltaMptIterator>,
        recover_mpt_during_construct_pivot_state: bool,
    ) -> Result<()> {
        let this_cloned = this.clone();
        let mut in_progress_snapshotting_tasks =
            this_cloned.in_progress_snapshotting_tasks.write();

        // Decide whether to (re)build. A missing info, an InfoOnly entry, or
        // an explicit mpt-recovery request all trigger the rebuild; in the
        // last case the kv snapshot already exists on disk.
        let mut recover_mpt_with_kv_snapshot_exist = false;
        if !in_progress_snapshotting_tasks.contains_key(&snapshot_epoch_id)
            && this
                .snapshot_info_map_by_epoch
                .read()
                .get(&snapshot_epoch_id)
                .map_or(true, |info| {
                    if info.snapshot_info_kept_to_provide_sync
                        == SnapshotKeptToProvideSyncStatus::InfoOnly
                    {
                        true
                    } else {
                        recover_mpt_with_kv_snapshot_exist =
                            recover_mpt_during_construct_pivot_state;
                        recover_mpt_during_construct_pivot_state
                    }
                })
        {
            debug!(
                "start check_make_register_snapshot_background: epoch={:?} height={:?}",
                snapshot_epoch_id, height
            );

            let mut pivot_chain_parts = vec![
                Default::default();
                this.storage_conf.consensus_param.snapshot_epoch_count
                    as usize
            ];
            // Calculate pivot chain parts, walking parent links backwards
            // from snapshot_epoch_id and filling the vec from the tail.
            let mut epoch_id = snapshot_epoch_id.clone();
            let mut delta_height =
                this.storage_conf.consensus_param.snapshot_epoch_count as usize
                    - 1;
            pivot_chain_parts[delta_height] = epoch_id.clone();
            // TODO Handle the special cases better
            let parent_snapshot_epoch_id = if maybe_delta_db.is_none() {
                // The case maybe_delta_db.is_none() means we are at height 0.
                // We set parent_snapshot of NULL to NULL, so that in
                // register_new_snapshot we will move the initial
                // delta_mpt to intermediate_mpt for NULL_EPOCH
                //
                NULL_EPOCH
            } else {
                let delta_db = maybe_delta_db.as_ref().unwrap();
                while delta_height > 0 {
                    epoch_id = match delta_db.mpt.get_parent_epoch(&epoch_id)? {
                        None => bail!(Error::DbValueError),
                        Some(epoch_id) => epoch_id,
                    };
                    delta_height -= 1;
                    pivot_chain_parts[delta_height] = epoch_id.clone();
                    trace!(
                        "check_make_register_snapshot_background: parent epoch_id={:?}",
                        epoch_id
                    );
                }
                if height
                    == this.storage_conf.consensus_param.snapshot_epoch_count
                        as u64
                {
                    // We need the case height == SNAPSHOT_EPOCHS_CAPACITY
                    // because the snapshot_info for genesis is
                    // stored in NULL_EPOCH. If we do not use the special case,
                    // it will be the epoch_id of genesis.
                    NULL_EPOCH
                } else {
                    delta_db.mpt.get_parent_epoch(&epoch_id)?.unwrap()
                }
            };

            let in_progress_snapshot_info = SnapshotInfo {
                snapshot_info_kept_to_provide_sync: Default::default(),
                serve_one_step_sync: true,
                height: height as u64,
                parent_snapshot_height: height
                    - this.storage_conf.consensus_param.snapshot_epoch_count
                        as u64,
                // This is unknown for now, and we don't care.
                merkle_root: Default::default(),
                parent_snapshot_epoch_id,
                pivot_chain_parts,
            };

            let parent_snapshot_epoch_id_cloned =
                in_progress_snapshot_info.parent_snapshot_epoch_id.clone();
            let mut in_progress_snapshot_info_cloned =
                in_progress_snapshot_info.clone();
            let task_finished_sender_cloned =
                this.in_progress_snapshot_finish_signaler.clone();
            // Background worker: merge the delta into the parent snapshot,
            // register the new snapshot, notify the joiner thread, then
            // optionally run the debug checker threads.
            let thread_handle = thread::Builder::new()
                .name("Background Snapshotting".into()).spawn(move || {
                // TODO: add support for cancellation and io throttling.
                let f = || -> Result<()> {
                    let (mut snapshot_info_map_locked, new_snapshot_info) = match maybe_delta_db {
                        None => {
                            // Height 0: no merge; the merkle root is the null node.
                            in_progress_snapshot_info_cloned.merkle_root = MERKLE_NULL_NODE;
                            (this.snapshot_info_map_by_epoch.write(), in_progress_snapshot_info_cloned)
                        }
                        Some(delta_db) => {
                            this.snapshot_manager
                                .get_snapshot_db_manager()
                                .new_snapshot_by_merging(
                                    &parent_snapshot_epoch_id_cloned,
                                    snapshot_epoch_id.clone(), delta_db,
                                    in_progress_snapshot_info_cloned,
                                    &this.snapshot_info_map_by_epoch,
                                    height,
                                    recover_mpt_with_kv_snapshot_exist)?
                        }
                    };
                    if let Err(e) = this.register_new_snapshot(new_snapshot_info.clone(), &mut snapshot_info_map_locked) {
                        error!(
                            "Failed to register new snapshot {:?} {:?}.",
                            snapshot_epoch_id, new_snapshot_info
                        );
                        bail!(e);
                    }

                    // Tell the joiner thread this task is done while still
                    // holding the snapshot_info_map lock.
                    task_finished_sender_cloned.lock().send(Some(snapshot_epoch_id))
                        .or(Err(Error::from(Error::MpscError)))?;
                    drop(snapshot_info_map_locked);

                    // Debug-only validation: split the key space into ranges
                    // and re-read the snapshot contents on checker threads.
                    let debug_snapshot_checkers =
                        this.storage_conf.debug_snapshot_checker_threads;
                    for snapshot_checker in 0..debug_snapshot_checkers {
                        let begin_range =
                            (256 / debug_snapshot_checkers * snapshot_checker) as u8;
                        let end_range =
                            256 / debug_snapshot_checkers * (snapshot_checker + 1);
                        let end_range_excl = if end_range != 256 {
                            Some(vec![end_range as u8])
                        } else {
                            None
                        };
                        let this = this.clone();
                        thread::Builder::new().name(
                            format!("snapshot checker {} - {}", begin_range, end_range)).spawn(
                            move || -> Result<()> {
                                debug!(
                                    "Start snapshot checker {} of {}",
                                    snapshot_checker, debug_snapshot_checkers);
                                let snapshot_db = this.snapshot_manager
                                    .get_snapshot_by_epoch_id(
                                        &snapshot_epoch_id,
                                        /* try_open = */ false,
                                        true
                                    )?.unwrap();
                                let mut set_keys_iter =
                                    snapshot_db.dumped_delta_kv_set_keys_iterator()?;
                                let mut delete_keys_iter =
                                    snapshot_db.dumped_delta_kv_delete_keys_iterator()?;
                                let previous_snapshot_db = this.snapshot_manager
                                    .get_snapshot_by_epoch_id(
                                        &parent_snapshot_epoch_id_cloned,
                                        /* try_open = */ false,
                                        false
                                    )?.unwrap();
                                let mut previous_set_keys_iter = previous_snapshot_db
                                    .dumped_delta_kv_set_keys_iterator()?;
                                let mut previous_delete_keys_iter =
                                    previous_snapshot_db
                                        .dumped_delta_kv_delete_keys_iterator()?;

                                let mut checker_count = 0;

                                let set_iter = set_keys_iter.iter_range(
                                    &[begin_range],
                                    end_range_excl.as_ref().map(|v| &**v))?
                                    .take();
                                checker_count += check_key_value_load(&snapshot_db, set_iter, /* check_value = */ true)?;

                                let set_iter = previous_set_keys_iter.iter_range(
                                    &[begin_range], end_range_excl.as_ref().map(|v| &**v))?
                                    .take();
                                checker_count += check_key_value_load(&snapshot_db, set_iter, /* check_value = */ false)?;

                                let delete_iter = delete_keys_iter.iter_range(
                                    &[begin_range], end_range_excl.as_ref().map(|v| &**v))?
                                    .take();
                                checker_count += check_key_value_load(&snapshot_db, delete_iter, /* check_value = */ false)?;

                                let delete_iter = previous_delete_keys_iter.iter_range(
                                    &[begin_range], end_range_excl.as_ref().map(|v| &**v))?
                                    .take();
                                checker_count += check_key_value_load(&snapshot_db, delete_iter, /* check_value = */ false)?;

                                debug!(
                                    "Finished: snapshot checker {} of {}, {} keys",
                                    snapshot_checker, debug_snapshot_checkers, checker_count);
                                Ok(())
                            }
                        )?;
                    }

                    Ok(())
                };

                let task_result = f();
                if task_result.is_err() {
                    warn!(
                        "Failed to create snapshot for epoch_id {:?} with error {:?}",
                        snapshot_epoch_id, task_result.as_ref().unwrap_err());
                }

                task_result
            })?;

            in_progress_snapshotting_tasks.insert(
                snapshot_epoch_id,
                Arc::new(RwLock::new(InProgressSnapshotTask {
                    snapshot_info: in_progress_snapshot_info,
                    thread_handle: Some(thread_handle),
                })),
            );
        }

        Ok(())
    }
769
    /// Register a snapshot (freshly made, synced, or re-registered during
    /// recovery): wire up its intermediate and latest delta MPTs in
    /// `snapshot_associated_mpts_by_epoch`, persist its `SnapshotInfo`, and
    /// add it to `current_snapshots` (unless recovering).
    ///
    /// This function is made public only for testing.
    pub fn register_new_snapshot(
        self: &Arc<Self>, new_snapshot_info: SnapshotInfo,
        snapshot_info_map_locked: &mut PersistedSnapshotInfoMap,
    ) -> Result<()> {
        debug!("register_new_snapshot: info={:?}", new_snapshot_info);
        let snapshot_epoch_id = new_snapshot_info.get_snapshot_epoch_id();
        // Register intermediate MPT for the new snapshot.
        let mut snapshot_associated_mpts_locked =
            self.snapshot_associated_mpts_by_epoch.write();
        // An MPT entry already existing for this epoch means we are
        // re-registering during recovery rather than creating anew.
        let in_recover_mode =
            snapshot_associated_mpts_locked.contains_key(snapshot_epoch_id);

        // Parent's delta mpt becomes intermediate_delta_mpt for the new
        // snapshot.
        //
        // It can't happen when the parent's delta mpt is still empty we
        // are already making the snapshot.
        //
        // But when we synced a new snapshot, the parent snapshot may not be
        // available at all, so when maybe_intermediate_delta_mpt is empty,
        // create it.
        let maybe_intermediate_delta_mpt = match snapshot_associated_mpts_locked
            .get(&new_snapshot_info.parent_snapshot_epoch_id)
        {
            None => {
                // The case when we synced a new snapshot and the parent
                // snapshot isn't available.
                // Insert a placeholder entry so that new_or_get_delta_mpt can
                // operate on the map for the parent epoch.
                snapshot_associated_mpts_locked.insert(
                    new_snapshot_info.parent_snapshot_epoch_id.clone(),
                    (None, None),
                );
                let parent_delta_mpt =
                    Some(StorageManager::new_or_get_delta_mpt(
                        self.clone(),
                        &new_snapshot_info.parent_snapshot_epoch_id,
                        &mut *snapshot_associated_mpts_locked,
                    )?);
                // The placeholder was only needed while creating the delta
                // mpt; remove it so the missing parent stays unregistered.
                snapshot_associated_mpts_locked
                    .remove(&new_snapshot_info.parent_snapshot_epoch_id);

                parent_delta_mpt
            }
            Some(parent_snapshot_associated_mpts) => {
                // Parent is registered but its latest delta mpt (.1) may not
                // be instantiated yet; create it on demand.
                if parent_snapshot_associated_mpts.1.is_none() {
                    debug!("MPT for parent_snapshot_epoch_id is none");
                    Some(StorageManager::new_or_get_delta_mpt(
                        self.clone(),
                        &new_snapshot_info.parent_snapshot_epoch_id,
                        &mut *snapshot_associated_mpts_locked,
                    )?)
                } else {
                    parent_snapshot_associated_mpts.1.clone()
                }
            }
        };
        // In recover mode, keep the snapshot's pre-existing latest delta mpt
        // by moving it out of the old entry; otherwise start with None.
        let delta_mpt = if in_recover_mode {
            snapshot_associated_mpts_locked
                .get_mut(snapshot_epoch_id)
                // This is guaranteed in the in_recover_mode condition above.
                .unwrap()
                .1
                .take()
        } else {
            None
        };
        if !in_recover_mode || maybe_intermediate_delta_mpt.is_some() {
            snapshot_associated_mpts_locked.insert(
                snapshot_epoch_id.clone(),
                (maybe_intermediate_delta_mpt, delta_mpt),
            );
        }

        // Release the MPT map lock before touching the info map / snapshot
        // list to keep the lock scope minimal.
        drop(snapshot_associated_mpts_locked);
        snapshot_info_map_locked
            .insert(snapshot_epoch_id, new_snapshot_info.clone())?;
        if !in_recover_mode {
            self.current_snapshots.write().push(new_snapshot_info);
        }

        Ok(())
    }
852
853    pub fn maintain_state_confirmed<ConsensusInner: StateMaintenanceTrait>(
854        &self, consensus_inner: &ConsensusInner, stable_checkpoint_height: u64,
855        era_epoch_count: u64, confirmed_height: u64,
856        state_availability_boundary: &RwLock<StateAvailabilityBoundary>,
857    ) -> Result<()> {
858        let additional_state_height_gap =
859            (self.storage_conf.additional_maintained_snapshot_count
860                * self.get_snapshot_epoch_count()) as u64;
861        let maintained_state_height_lower_bound =
862            if confirmed_height > additional_state_height_gap {
863                confirmed_height - additional_state_height_gap
864            } else {
865                0
866            };
867        if maintained_state_height_lower_bound
868            <= state_availability_boundary.read().lower_bound
869        {
870            return Ok(());
871        }
872        let maintained_epoch_id = consensus_inner
873            .get_pivot_hash_from_epoch_number(
874                maintained_state_height_lower_bound,
875            )?;
876        let maintained_epoch_execution_commitment = consensus_inner
877            .get_epoch_execution_commitment_with_db(&maintained_epoch_id);
878        let maintained_state_root = match &maintained_epoch_execution_commitment
879        {
880            Some(commitment) => &commitment.state_root_with_aux_info,
881            None => return Ok(()),
882        };
883
884        self.maintain_snapshots_pivot_chain_confirmed(
885            maintained_state_height_lower_bound,
886            &maintained_epoch_id,
887            maintained_state_root,
888            state_availability_boundary,
889            &|height, find_nearest_snapshot_multiple_of| {
890                extra_snapshots_to_keep_predicate(
891                    &self.storage_conf,
892                    stable_checkpoint_height,
893                    era_epoch_count,
894                    height,
895                    find_nearest_snapshot_multiple_of,
896                )
897            },
898            stable_checkpoint_height,
899        )
900    }
901
    /// The algorithm figures out which snapshots to remove by going
    /// through all SnapshotInfo in one pass in the reverse order such that
    /// the parent snapshot is processed after the children snapshot.
    ///
    /// In the scan, the pivot chain is traced from the confirmed snapshot.
    /// Whatever can't be traced shall be removed as a non-pivot snapshot.
    /// Traced old pivot snapshots shall be deleted as well.
    ///
    /// Another maintenance of snapshots shall happen at Conflux start-up and
    /// after the pivot chain is recognized.
    ///
    /// The behavior of old pivot snapshot deletion can be different between
    /// Archive Node and Full Node.
    ///
    /// Phases of this function:
    /// 1. Short-circuit if the confirmed snapshottable epoch id is unchanged.
    /// 2. Scan current_snapshots (reverse order) to classify snapshots below
    ///    the confirmed height into keep / old-pivot-remove / non-pivot-remove.
    /// 3. Scan again (forward) for non-pivot snapshots at or above the
    ///    confirmed height.
    /// 4. Mark in-progress snapshot tasks for cancellation by the same rules.
    /// 5. Persist sync-keep status updates and filter out already-removed
    ///    entries, then perform the actual removal.
    pub fn maintain_snapshots_pivot_chain_confirmed(
        &self, maintained_state_height_lower_bound: u64,
        maintained_epoch_id: &EpochId,
        maintained_state_root: &StateRootWithAuxInfo,
        state_availability_boundary: &RwLock<StateAvailabilityBoundary>,
        extra_snapshots_to_keep: &dyn Fn(u64, &mut bool) -> bool,
        stable_checkpoint_height: u64,
    ) -> Result<()> {
        // Update the confirmed epoch id. Skip remaining actions when the
        // confirmed snapshot-able epoch id doesn't change
        {
            let mut last_confirmed_snapshottable_id_locked =
                self.last_confirmed_snapshottable_epoch_id.lock();
            if last_confirmed_snapshottable_id_locked.is_some() {
                if maintained_state_root.aux_info.intermediate_epoch_id.eq(
                    last_confirmed_snapshottable_id_locked.as_ref().unwrap(),
                ) {
                    return Ok(());
                }
            }
            *last_confirmed_snapshottable_id_locked = Some(
                maintained_state_root.aux_info.intermediate_epoch_id.clone(),
            );
        }

        // Height of the start of the intermediate delta era for the lower
        // bound, i.e. the lower bound rounded down to a snapshot boundary.
        let confirmed_intermediate_height = maintained_state_height_lower_bound
            - StateIndex::height_to_delta_height(
                maintained_state_height_lower_bound,
                self.get_snapshot_epoch_count(),
            ) as u64;

        // The snapshot one era before the intermediate height; 0 when there
        // is no such earlier snapshot.
        let confirmed_snapshot_height = if confirmed_intermediate_height
            > self.get_snapshot_epoch_count() as u64
        {
            confirmed_intermediate_height
                - self.get_snapshot_epoch_count() as u64
        } else {
            0
        };
        let first_available_state_height = if confirmed_snapshot_height > 0 {
            confirmed_snapshot_height + 1
        } else {
            0
        };

        debug!(
            "maintain_snapshots_pivot_chain_confirmed: confirmed_height {}, \
             confirmed_epoch_id {:?}, confirmed_intermediate_id {:?}, \
             confirmed_snapshot_id {:?}, confirmed_intermediate_height {}, \
             confirmed_snapshot_height {}, first_available_state_height {}",
            maintained_state_height_lower_bound,
            maintained_epoch_id,
            maintained_state_root.aux_info.intermediate_epoch_id,
            maintained_state_root.aux_info.snapshot_epoch_id,
            confirmed_intermediate_height,
            confirmed_snapshot_height,
            first_available_state_height,
        );
        let mut extra_snapshot_infos_kept_for_sync = vec![];
        let mut non_pivot_snapshots_to_remove = HashSet::new();
        let mut old_pivot_snapshots_to_remove = vec![];
        // We will keep some extra snapshots to provide sync. For any snapshot
        // to keep, we must keep all snapshot_info from the pivot tip to
        // the snapshot, so that in the next run the snapshot is still
        // recognized as "old pivot".
        let mut old_pivot_snapshot_infos_to_remove = vec![];
        let mut find_nearest_multiple_of = false;
        let mut in_progress_snapshot_to_cancel = vec![];

        {
            let current_snapshots = self.current_snapshots.read();

            // Tracks the parent epoch id of the last snapshot recognized as
            // on-pivot, used to trace the pivot chain backwards.
            let mut prev_snapshot_epoch_id = &NULL_EPOCH;

            // Check snapshots which has height lower than confirmed_height
            for snapshot_info in current_snapshots.iter().rev() {
                let snapshot_epoch_id = snapshot_info.get_snapshot_epoch_id();
                if snapshot_info.height == confirmed_snapshot_height {
                    // Remove all non-pivot Snapshot at
                    // confirmed_snapshot_height
                    if snapshot_epoch_id
                        .eq(&maintained_state_root.aux_info.snapshot_epoch_id)
                    {
                        prev_snapshot_epoch_id =
                            &snapshot_info.parent_snapshot_epoch_id;
                    } else {
                        non_pivot_snapshots_to_remove
                            .insert(snapshot_epoch_id.clone());
                    }
                } else if snapshot_info.height < confirmed_snapshot_height {
                    // We remove for older pivot snapshot one after another.
                    if snapshot_epoch_id.eq(prev_snapshot_epoch_id) {
                        if extra_snapshots_to_keep(
                            snapshot_info.height,
                            &mut find_nearest_multiple_of,
                        ) {
                            // For any snapshot to keep, we keep all snapshot
                            // infos from pivot tip to it.
                            for snapshot_epoch_id_to_keep_info in std::mem::take(
                                &mut old_pivot_snapshot_infos_to_remove,
                            ) {
                                extra_snapshot_infos_kept_for_sync.push((
                                    snapshot_epoch_id_to_keep_info,
                                    SnapshotKeptToProvideSyncStatus::InfoOnly,
                                ));
                            }
                            extra_snapshot_infos_kept_for_sync
                                .push((snapshot_epoch_id.clone(), SnapshotKeptToProvideSyncStatus::InfoAndSnapshot));
                        } else {
                            // Retain the snapshot information for the one
                            // preceding the stable checkpoint
                            if snapshot_info.height
                                + self
                                    .storage_conf
                                    .consensus_param
                                    .snapshot_epoch_count
                                    as u64
                                != stable_checkpoint_height
                            {
                                old_pivot_snapshot_infos_to_remove
                                    .push(snapshot_epoch_id.clone());
                            }
                            old_pivot_snapshots_to_remove
                                .push(snapshot_epoch_id.clone());
                        }
                        prev_snapshot_epoch_id =
                            &snapshot_info.parent_snapshot_epoch_id;
                    } else {
                        // Any other snapshot with higher height is non-pivot.
                        non_pivot_snapshots_to_remove
                            .insert(snapshot_epoch_id.clone());
                    }
                } else if snapshot_info.height
                    < maintained_state_height_lower_bound
                {
                    // There can be at most 1 snapshot between the snapshot at
                    // confirmed_snapshot_height and confirmed_height.
                    //
                    // When a snapshot has height > confirmed_snapshot_height,
                    // but doesn't contain confirmed_state_root.aux_info.
                    // intermediate_epoch_id, it must be a non-pivot fork.
                    if snapshot_info
                        .get_epoch_id_at_height(confirmed_intermediate_height)
                        != Some(
                            &maintained_state_root
                                .aux_info
                                .intermediate_epoch_id,
                        )
                    {
                        debug!(
                            "remove mismatch intermediate snapshot: {:?}",
                            snapshot_info.get_epoch_id_at_height(
                                confirmed_intermediate_height
                            )
                        );
                        non_pivot_snapshots_to_remove
                            .insert(snapshot_epoch_id.clone());
                    }
                }
            }

            debug!(
                "finished scanning for lower snapshots: \
                 old_pivot_snapshots_to_remove {:?}, \
                 old_pivot_snapshot_infos_to_remove {:?}, \
                 non_pivot_snapshots_to_remove {:?}",
                old_pivot_snapshots_to_remove,
                old_pivot_snapshot_infos_to_remove,
                non_pivot_snapshots_to_remove
            );

            // Check snapshots which has height >= confirmed_height
            for snapshot_info in &*current_snapshots {
                // Check for non-pivot snapshot to remove.
                match snapshot_info
                    .get_epoch_id_at_height(maintained_state_height_lower_bound)
                {
                    Some(path_epoch_id) => {
                        // Check if the snapshot is within
                        // confirmed_epoch's
                        // subtree.
                        if path_epoch_id != maintained_epoch_id {
                            debug!(
                                "remove non-subtree snapshot {:?}, got {:?}, expected {:?}",
                                snapshot_info.get_snapshot_epoch_id(),
                                path_epoch_id, maintained_epoch_id,
                            );
                            non_pivot_snapshots_to_remove.insert(
                                snapshot_info.get_snapshot_epoch_id().clone(),
                            );
                        }
                    }
                    None => {
                        // The snapshot is so deep that we have to check its
                        // parent to see if it's within confirmed_epoch's
                        // subtree.
                        if non_pivot_snapshots_to_remove
                            .contains(&snapshot_info.parent_snapshot_epoch_id)
                        {
                            debug!(
                                "remove non-subtree deep snapshot {:?}, parent_snapshot_epoch_id {:?}",
                                snapshot_info.get_snapshot_epoch_id(),
                                snapshot_info.parent_snapshot_epoch_id
                            );
                            // The snapshot may already exist. This is why we
                            // must use HashSet for
                            // non_pivot_snapshots_to_remove.
                            non_pivot_snapshots_to_remove.insert(
                                snapshot_info.get_snapshot_epoch_id().clone(),
                            );
                        }
                    }
                }
            }
        }

        // Apply the same classification to snapshots still being produced;
        // obsolete or off-subtree tasks are collected for cancellation.
        for (in_progress_epoch_id, in_progress_snapshot_task) in
            &*self.in_progress_snapshotting_tasks.read()
        {
            let mut to_cancel = false;
            let in_progress_snapshot_info =
                &in_progress_snapshot_task.read().snapshot_info;

            // The logic is similar as above for snapshot deletion.
            if in_progress_snapshot_info.height < confirmed_intermediate_height
            {
                to_cancel = true;
            } else if in_progress_snapshot_info.height
                < maintained_state_height_lower_bound
            {
                if in_progress_snapshot_info
                    .get_epoch_id_at_height(confirmed_intermediate_height)
                    != Some(
                        &maintained_state_root.aux_info.intermediate_epoch_id,
                    )
                {
                    to_cancel = true;
                }
            } else {
                match in_progress_snapshot_info
                    .get_epoch_id_at_height(maintained_state_height_lower_bound)
                {
                    Some(path_epoch_id) => {
                        if path_epoch_id != maintained_epoch_id {
                            to_cancel = true;
                        }
                    }
                    None => {
                        if non_pivot_snapshots_to_remove.contains(
                            &in_progress_snapshot_info.parent_snapshot_epoch_id,
                        ) {
                            to_cancel = true;
                        }
                    }
                }
            }

            if to_cancel {
                in_progress_snapshot_to_cancel
                    .push(in_progress_epoch_id.clone())
            }
        }

        // Shadow the HashSet into a Vec for the removal call below.
        let mut non_pivot_snapshots_to_remove =
            non_pivot_snapshots_to_remove.drain().collect();
        // Update snapshot_infos and filter out already removed snapshots from
        // the removal lists.
        {
            let mut info_maps = self.snapshot_info_map_by_epoch.write();
            // An InfoOnly status means the snapshot data itself was removed
            // in a previous run; only its info is kept for sync, so it must
            // not be scheduled for (re-)removal.
            let removal_filter = |vec: &mut Vec<EpochId>| {
                vec.retain(|epoch| {
                    info_maps.get(epoch).map_or(true, |info| {
                        // The snapshot itself is already removed.
                        info.snapshot_info_kept_to_provide_sync
                            != SnapshotKeptToProvideSyncStatus::InfoOnly
                    })
                })
            };
            removal_filter(&mut non_pivot_snapshots_to_remove);
            removal_filter(&mut old_pivot_snapshots_to_remove);

            // Persist status changes for snapshots kept for sync, then mirror
            // those changes into the in-memory current_snapshots list.
            let mut updated_snapshot_info_epochs =
                HashMap::<EpochId, SnapshotKeptToProvideSyncStatus>::default();
            for (epoch, new_status) in &extra_snapshot_infos_kept_for_sync {
                if let Some(info) = info_maps.get(epoch) {
                    if info.snapshot_info_kept_to_provide_sync != *new_status {
                        let mut new_snapshot_info = info.clone();
                        new_snapshot_info.snapshot_info_kept_to_provide_sync =
                            *new_status;
                        info_maps.insert(epoch, new_snapshot_info)?;
                        updated_snapshot_info_epochs
                            .insert(*epoch, *new_status);
                    }
                }
            }
            if updated_snapshot_info_epochs.len() > 0 {
                let mut current_snapshots = self.current_snapshots.write();
                for snapshot_info in current_snapshots.iter_mut() {
                    if let Some(new_status) = updated_snapshot_info_epochs
                        .get(&snapshot_info.get_snapshot_epoch_id())
                    {
                        snapshot_info.snapshot_info_kept_to_provide_sync =
                            *new_status;
                    }
                }
            }
        }
        if !non_pivot_snapshots_to_remove.is_empty()
            || !old_pivot_snapshots_to_remove.is_empty()
        {
            {
                // TODO: Archive node may do something different.
                let state_boundary = &mut *state_availability_boundary.write();
                if first_available_state_height > state_boundary.lower_bound {
                    state_boundary
                        .adjust_lower_bound(first_available_state_height);
                }
            }

            self.remove_snapshots(
                &old_pivot_snapshots_to_remove,
                &non_pivot_snapshots_to_remove,
                &old_pivot_snapshot_infos_to_remove
                    .iter()
                    .chain(non_pivot_snapshots_to_remove.iter())
                    .cloned()
                    .collect(),
            )?;
        }

        // TODO: implement in_progress_snapshot cancellation.
        /*
        if !in_progress_snapshot_to_cancel.is_empty() {
            let mut in_progress_snapshotting_locked =
                self.in_progress_snapshotting_tasks.write();
            for epoch_id in in_progress_snapshot_to_cancel {
                unimplemented!();
            }
        }
        */

        info!("maintain_snapshots_pivot_chain_confirmed: finished");
        Ok(())
    }
1259
    /// Remove the given snapshots and snapshot infos from the in-memory
    /// structures, the on-disk snapshot databases, and finally the persisted
    /// snapshot_info db.
    ///
    /// The ordering is deliberate: the in-memory state and the snapshot data
    /// are cleaned first, while the on-disk snapshot_info entries are only
    /// deleted at the very end, after all fallible steps succeeded.
    fn remove_snapshots(
        &self, old_pivot_snapshots_to_remove: &[EpochId],
        non_pivot_snapshots_to_remove: &[EpochId],
        snapshot_infos_to_remove: &HashSet<EpochId>,
    ) -> Result<()> {
        let mut current_snapshots_locked = self.current_snapshots.write();
        current_snapshots_locked.retain(|x| {
            !snapshot_infos_to_remove.contains(x.get_snapshot_epoch_id())
        });
        info!(
            "maintain_snapshots_pivot_chain_confirmed: remove the following snapshot infos {:?}",
            snapshot_infos_to_remove,
        );
        // Delete the snapshot databases themselves; old-pivot and non-pivot
        // snapshots go through different removal paths in snapshot_manager.
        for snapshot_epoch_id in old_pivot_snapshots_to_remove {
            self.snapshot_manager
                .remove_old_pivot_snapshot(&snapshot_epoch_id)?;
        }
        for snapshot_epoch_id in non_pivot_snapshots_to_remove {
            self.snapshot_manager
                .remove_non_pivot_snapshot(&snapshot_epoch_id)?;
        }

        drop(current_snapshots_locked);
        // SAFETY: remove_in_mem_only leaves the in-memory map out of sync
        // with the snapshot_info db on purpose. The divergence is temporary:
        // the matching db entries are deleted by the `remove` calls at the
        // end of this function, once no fallible step in between has failed.
        unsafe {
            let mut snapshot_info_map = self.snapshot_info_map_by_epoch.write();
            for snapshot_epoch_id in snapshot_infos_to_remove {
                snapshot_info_map.remove_in_mem_only(snapshot_epoch_id);
            }
        }
        {
            let snapshot_associated_mpts_by_epoch_locked =
                &mut *self.snapshot_associated_mpts_by_epoch.write();

            // Release the delta MPTs associated with every removed snapshot.
            for snapshot_epoch_id in old_pivot_snapshots_to_remove
                .iter()
                .chain(non_pivot_snapshots_to_remove.iter())
            {
                self.release_delta_mpts_from_snapshot(
                    snapshot_associated_mpts_by_epoch_locked,
                    snapshot_epoch_id,
                )?
            }
        }
        {
            // Only remove snapshot_info from db when no exception have
            // happened.
            let mut snapshot_info_map_by_epoch =
                self.snapshot_info_map_by_epoch.write();
            for snapshot_epoch_id in snapshot_infos_to_remove {
                snapshot_info_map_by_epoch.remove(&snapshot_epoch_id)?;
            }
        }

        Ok(())
    }
1315
1316    pub fn log_usage(&self) {
1317        let mut delta_mpts = HashMap::new();
1318        for (_snapshot_epoch_id, associated_delta_mpts) in
1319            &*self.snapshot_associated_mpts_by_epoch.read()
1320        {
1321            if let Some(delta_mpt) = associated_delta_mpts.0.as_ref() {
1322                delta_mpts.insert(delta_mpt.get_mpt_id(), delta_mpt.clone());
1323            }
1324            if let Some(delta_mpt) = associated_delta_mpts.1.as_ref() {
1325                delta_mpts.insert(delta_mpt.get_mpt_id(), delta_mpt.clone());
1326            }
1327        }
1328        if let Some((_mpt_id, delta_mpt)) = delta_mpts.iter().next() {
1329            delta_mpt.log_usage();
1330
1331            // Now delta_mpt calls log_usage of the singleton
1332            // node_memory_manager, so there is no need to log_usage
1333            // on second delta_mpt.
1334        }
1335    }
1336
1337    pub fn load_persist_state(self: &Arc<Self>) -> Result<()> {
1338        let snapshot_info_map = &mut *self.snapshot_info_map_by_epoch.write();
1339
1340        // Always keep the information for genesis snapshot.
1341        self.snapshot_associated_mpts_by_epoch
1342            .write()
1343            .insert(NULL_EPOCH, (None, None));
1344        snapshot_info_map
1345            .insert(&NULL_EPOCH, SnapshotInfo::genesis_snapshot_info())?;
1346        self.current_snapshots
1347            .write()
1348            .push(SnapshotInfo::genesis_snapshot_info());
1349
1350        // Persist state loaded.
1351        let snapshot_persist_state = self
1352            .snapshot_manager
1353            .get_snapshot_db_manager()
1354            .scan_persist_state(snapshot_info_map.get_map())?;
1355
1356        debug!("snapshot persist state {:?}", snapshot_persist_state);
1357
1358        *self.persist_state_from_initialization.write() = Some((
1359            snapshot_persist_state.temp_snapshot_db_existing,
1360            snapshot_persist_state.removed_snapshots,
1361            snapshot_persist_state.max_epoch_height,
1362            snapshot_persist_state.max_snapshot_epoch_height_has_mpt,
1363        ));
1364        self.snapshot_manager
1365            .get_snapshot_db_manager()
1366            .update_latest_snapshot_id(
1367                snapshot_persist_state.max_epoch_id,
1368                snapshot_persist_state.max_epoch_height,
1369            );
1370
1371        // Remove missing snapshots.
1372        for snapshot_epoch_id in snapshot_persist_state.missing_snapshots {
1373            if snapshot_epoch_id == NULL_EPOCH {
1374                continue;
1375            }
1376            // Remove the delta mpt if the snapshot is missing.
1377            self.delta_db_manager
1378                .destroy_delta_db(
1379                    &self
1380                        .delta_db_manager
1381                        .get_delta_db_name(&snapshot_epoch_id),
1382                )
1383                .or_else(|e| match &e {
1384                    Error::Io(io_err) => match io_err.kind() {
1385                        std::io::ErrorKind::NotFound => Ok(()),
1386                        _ => Err(e),
1387                    },
1388                    _ => Err(e),
1389                })?;
1390            snapshot_info_map.remove(&snapshot_epoch_id)?;
1391        }
1392
1393        let (missing_delta_db_snapshots, delta_dbs) = self
1394            .delta_db_manager
1395            .scan_persist_state(snapshot_info_map.get_map())?;
1396
1397        let mut delta_mpts = HashMap::new();
1398        for (snapshot_epoch_id, delta_db) in delta_dbs {
1399            let mpt_id = self.delta_mpts_id_gen.lock().allocate()?;
1400            self.delta_mpt_open_db_lru.import(
1401                &snapshot_epoch_id,
1402                mpt_id,
1403                delta_db,
1404            )?;
1405            delta_mpts.insert(
1406                snapshot_epoch_id.clone(),
1407                Arc::new(DeltaMpt::new(
1408                    self.delta_mpt_open_db_lru.clone(),
1409                    snapshot_epoch_id.clone(),
1410                    self.clone(),
1411                    mpt_id,
1412                    self.delta_mpts_node_memory_manager.clone(),
1413                )?),
1414            );
1415        }
1416
1417        for snapshot_epoch_id in missing_delta_db_snapshots {
1418            if snapshot_epoch_id == NULL_EPOCH {
1419                continue;
1420            }
1421            // Do not remove a snapshot which has intermediate delta mpt,
1422            // because it could be a freshly made snapshot before the previous
1423            // shutdown. A freshly made snapshot does not have delta db yet.
1424            if let Some(snapshot_info) =
1425                snapshot_info_map.get(&snapshot_epoch_id)
1426            {
1427                if delta_mpts
1428                    .contains_key(&snapshot_info.parent_snapshot_epoch_id)
1429                {
1430                    continue;
1431                }
1432            }
1433            error!(
1434                "Missing intermediate mpt and delta mpt for snapshot {:?}",
1435                snapshot_epoch_id
1436            );
1437            snapshot_info_map.remove(&snapshot_epoch_id)?;
1438            self.snapshot_manager
1439                .get_snapshot_db_manager()
1440                .destroy_snapshot(&snapshot_epoch_id)?;
1441        }
1442
1443        // Restore current_snapshots.
1444        let mut snapshots = snapshot_info_map
1445            .get_map()
1446            .iter()
1447            .map(|(_, snapshot_info)| snapshot_info.clone())
1448            .collect::<Vec<_>>();
1449        snapshots.sort_by(|x, y| x.height.partial_cmp(&y.height).unwrap());
1450
1451        let current_snapshots = &mut *self.current_snapshots.write();
1452        *current_snapshots = snapshots;
1453
1454        let snapshot_associated_mpts =
1455            &mut *self.snapshot_associated_mpts_by_epoch.write();
1456        for snapshot_info in current_snapshots {
1457            snapshot_associated_mpts.insert(
1458                snapshot_info.get_snapshot_epoch_id().clone(),
1459                (
1460                    delta_mpts
1461                        .get(&snapshot_info.parent_snapshot_epoch_id)
1462                        .map(|x| x.clone()),
1463                    delta_mpts
1464                        .get(snapshot_info.get_snapshot_epoch_id())
1465                        .map(|x| x.clone()),
1466                ),
1467            );
1468        }
1469
1470        Ok(())
1471    }
1472}
1473
1474fn extra_snapshots_to_keep_predicate(
1475    storage_conf: &StorageConfiguration, stable_checkpoint_height: u64,
1476    era_epoch_count: u64, height: u64,
1477    find_epoch_nearest_multiple_of: &mut bool,
1478) -> bool {
1479    for conf in &storage_conf.provide_more_snapshot_for_sync {
1480        match conf {
1481            ProvideExtraSnapshotSyncConfig::StableCheckpoint => {
1482                if height >= stable_checkpoint_height
1483                    && (height - stable_checkpoint_height) % era_epoch_count
1484                        == 0
1485                {
1486                    return true;
1487                }
1488                // The bound_height ensures that the snapshot before
1489                // stable_genesis will not be removed, so that
1490                // the execution of the epochs following
1491                // stable_genesis can go through a normal path where both
1492                // snapshot and intermediate delta mpt exist.
1493                // TODO:
1494                //  this is a corner case which should be addressed, so that we
1495                //  can don't really need the snapshot prior to the checkpoint.
1496                let check_next_snapshot_height = height
1497                    + (storage_conf.consensus_param.snapshot_epoch_count
1498                        as u64);
1499                if (check_next_snapshot_height >= stable_checkpoint_height)
1500                    && (check_next_snapshot_height - stable_checkpoint_height)
1501                        % era_epoch_count
1502                        == 0
1503                {
1504                    return storage_conf.keep_snapshot_before_stable_checkpoint;
1505                }
1506
1507                if storage_conf.keep_era_genesis_snapshot {
1508                    let era_genesis_snapshot_height =
1509                        if stable_checkpoint_height
1510                            >= storage_conf.consensus_param.era_epoch_count
1511                        {
1512                            stable_checkpoint_height
1513                                - storage_conf.consensus_param.era_epoch_count
1514                        } else {
1515                            0
1516                        };
1517
1518                    if era_genesis_snapshot_height == height {
1519                        return true;
1520                    }
1521                }
1522            }
1523            ProvideExtraSnapshotSyncConfig::EpochNearestMultipleOf(
1524                multiple,
1525            ) => {
1526                if *find_epoch_nearest_multiple_of
1527                    && height % (*multiple as u64) == 0
1528                {
1529                    *find_epoch_nearest_multiple_of = false;
1530                    return true;
1531                }
1532            }
1533        }
1534    }
1535    false
1536}
1537
/// Collects up to two errors produced while destroying delta tries, so that
/// both failures can be reported together (see `take_result`) instead of
/// aborting at the first one. `Cell` gives interior mutability so errors can
/// be recorded through a shared reference.
struct MaybeDeltaTrieDestroyErrors {
    // Slot for the most recently recorded error (written by
    // `set_maybe_error`).
    delta_trie_destroy_error_1: Cell<Option<Error>>,
    // Slot for the previously recorded error, shifted out of slot 1 by
    // `set_maybe_error`.
    delta_trie_destroy_error_2: Cell<Option<Error>>,
}
1542
// SAFETY: `Cell` is not `Sync`, so this impl is only sound because the
// struct is never accessed concurrently: it's only used when the relevant
// lock has been acquired.
unsafe impl Sync for MaybeDeltaTrieDestroyErrors {}
1545
1546impl MaybeDeltaTrieDestroyErrors {
1547    fn new() -> Self {
1548        Self {
1549            delta_trie_destroy_error_1: Cell::new(None),
1550            delta_trie_destroy_error_2: Cell::new(None),
1551        }
1552    }
1553
1554    fn set_maybe_error(&self, e: Option<Error>) {
1555        self.delta_trie_destroy_error_2
1556            .replace(self.delta_trie_destroy_error_1.replace(e));
1557    }
1558
1559    fn take_result(&self) -> Result<()> {
1560        let e1 = self.delta_trie_destroy_error_1.take().map(|e| Box::new(e));
1561        let e2 = self.delta_trie_destroy_error_2.take().map(|e| Box::new(e));
1562        if e1.is_some() || e2.is_some() {
1563            Err(Error::DeltaMPTDestroyErrors { e1, e2 }.into())
1564        } else {
1565            Ok(())
1566        }
1567    }
1568}
1569
lazy_static! {
    /// Prepared SQL statement set for the snapshot-info table: a single
    /// BLOB "value" column, with the table name taken from
    /// `storage_dir::SNAPSHOT_INFO_DB_NAME` (no range queries — last
    /// argument is `false`). Presumably shared by the `KvdbSqlite`
    /// instances backing `PersistedSnapshotInfoMap` — the opening site is
    /// outside this chunk.
    static ref SNAPSHOT_KVDB_STATEMENTS: Arc<KvdbSqliteStatements> = Arc::new(
        KvdbSqliteStatements::make_statements(
            &["value"],
            &["BLOB"],
            &storage_dir::SNAPSHOT_INFO_DB_NAME,
            false
        )
        // Statement templates are compile-time constants; failure here is a
        // programming error, so unwrap is acceptable.
        .unwrap()
    );
}
1581
1582use crate::{
1583    impls::{
1584        delta_mpt::{
1585            node_memory_manager::{
1586                DeltaMptsCacheAlgorithm, DeltaMptsNodeMemoryManager,
1587            },
1588            node_ref_map::DeltaMptId,
1589        },
1590        errors::*,
1591        state_manager::{DeltaDbManager, SnapshotDb, SnapshotDbManager},
1592        storage_db::{
1593            kvdb_sqlite::{
1594                kvdb_sqlite_iter_range_impl, KvdbSqliteDestructureTrait,
1595                KvdbSqliteStatements,
1596            },
1597            snapshot_kv_db_sqlite::test_lib::check_key_value_load,
1598        },
1599        storage_manager::snapshot_manager::SnapshotManager,
1600    },
1601    snapshot_manager::SnapshotManagerTrait,
1602    storage_db::{
1603        DeltaDbManagerTrait, KeyValueDbIterableTrait, SnapshotDbManagerTrait,
1604        SnapshotInfo, SnapshotKeptToProvideSyncStatus,
1605    },
1606    storage_dir,
1607    utils::guarded_value::GuardedValue,
1608    DeltaMpt, DeltaMptIdGen, DeltaMptIterator, KeyValueDbTrait, KvdbSqlite,
1609    OpenDeltaDbLru, ProvideExtraSnapshotSyncConfig, StateIndex,
1610    StateRootWithAuxInfo, StorageConfiguration,
1611};
1612use cfx_internal_common::{
1613    consensus_api::StateMaintenanceTrait, StateAvailabilityBoundary,
1614};
1615use fallible_iterator::FallibleIterator;
1616use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
1617use parking_lot::{Mutex, RwLock, RwLockReadGuard};
1618use primitives::{EpochId, MerkleHash, MERKLE_NULL_NODE, NULL_EPOCH};
1619use rlp::{Decodable, DecoderError, Encodable, Rlp};
1620use sqlite::Statement;
1621use std::{
1622    cell::Cell,
1623    collections::{HashMap, HashSet},
1624    fs,
1625    sync::{
1626        mpsc::{channel, Sender},
1627        Arc, Weak,
1628    },
1629    thread::{self, JoinHandle},
1630};