1pub struct PersistedSnapshotInfoMap {
8 snapshot_info_db: KvdbSqlite<Box<[u8]>>,
10 snapshot_info_map_by_epoch: HashMap<EpochId, SnapshotInfo>,
12}
13
14impl PersistedSnapshotInfoMap {
15 fn new(snapshot_info_db: KvdbSqlite<Box<[u8]>>) -> Result<Self> {
16 let mut result = Self {
17 snapshot_info_map_by_epoch: Default::default(),
19 snapshot_info_db,
20 };
21 result.load_persist_state()?;
22 Ok(result)
23 }
24
25 fn insert(
26 &mut self, epoch: &EpochId, snapshot_info: SnapshotInfo,
27 ) -> Result<()> {
28 let rlp_bytes = snapshot_info.rlp_bytes();
29 self.snapshot_info_map_by_epoch
30 .insert(epoch.clone(), snapshot_info);
31 self.snapshot_info_db.put(epoch.as_ref(), &rlp_bytes)?;
32 Ok(())
33 }
34
35 fn get_map(&self) -> &HashMap<EpochId, SnapshotInfo> {
36 &self.snapshot_info_map_by_epoch
37 }
38
39 fn get(&self, epoch: &EpochId) -> Option<&SnapshotInfo> {
40 self.snapshot_info_map_by_epoch.get(epoch)
41 }
42
43 fn remove(&mut self, epoch: &EpochId) -> Result<()> {
44 self.snapshot_info_map_by_epoch.remove(epoch);
45 self.snapshot_info_db.delete(epoch.as_ref())?;
46 Ok(())
47 }
48
49 unsafe fn remove_in_mem_only(
51 &mut self, epoch: &EpochId,
52 ) -> Option<SnapshotInfo> {
53 self.snapshot_info_map_by_epoch.remove(epoch)
54 }
55
56 fn load_persist_state(&mut self) -> Result<()> {
57 let (maybe_info_db_connection, statements) =
59 self.snapshot_info_db.destructure_mut();
60
61 let mut snapshot_info_iter = kvdb_sqlite_iter_range_impl(
62 maybe_info_db_connection,
63 statements,
64 &[],
65 None,
66 |row: &Statement<'_>| {
67 let key = row.read::<Vec<u8>>(0)?;
68 let value = row.read::<Vec<u8>>(1)?;
69
70 if key.len() != EpochId::len_bytes() {
71 Err(DecoderError::RlpInvalidLength.into())
72 } else {
73 Ok((
74 EpochId::from_slice(&key),
75 SnapshotInfo::decode(&Rlp::new(&value))?,
76 ))
77 }
78 },
79 )?;
80 while let Some((snapshot_epoch, snapshot_info)) =
81 snapshot_info_iter.next()?
82 {
83 self.snapshot_info_map_by_epoch
84 .insert(snapshot_epoch, snapshot_info);
85 }
86 Ok(())
87 }
88}
89
90pub struct StorageManager {
92 delta_db_manager: Arc<DeltaDbManager>,
93 delta_mpt_open_db_lru: Arc<OpenDeltaDbLru<DeltaDbManager>>,
94 snapshot_manager: Box<
95 dyn SnapshotManagerTrait<
96 SnapshotDb = SnapshotDb,
97 SnapshotDbManager = SnapshotDbManager,
98 > + Send
99 + Sync,
100 >,
101 delta_mpts_id_gen: Mutex<DeltaMptIdGen>,
102 delta_mpts_node_memory_manager: Arc<DeltaMptsNodeMemoryManager>,
103
104 maybe_db_errors: MaybeDeltaTrieDestroyErrors,
105 snapshot_associated_mpts_by_epoch: RwLock<
106 HashMap<EpochId, (Option<Arc<DeltaMpt>>, Option<Arc<DeltaMpt>>)>,
107 >,
108
109 pub in_progress_snapshotting_tasks:
113 RwLock<HashMap<EpochId, Arc<RwLock<InProgressSnapshotTask>>>>,
114 in_progress_snapshot_finish_signaler: Arc<Mutex<Sender<Option<EpochId>>>>,
115 in_progress_snapshotting_joiner: Mutex<Option<JoinHandle<()>>>,
116
117 current_snapshots: RwLock<Vec<SnapshotInfo>>,
126 pub snapshot_info_map_by_epoch: RwLock<PersistedSnapshotInfoMap>,
130
131 last_confirmed_snapshottable_epoch_id: Mutex<Option<EpochId>>,
132
133 pub storage_conf: StorageConfiguration,
134
135 pub intermediate_trie_root_merkle: RwLock<Option<MerkleHash>>,
137
138 pub persist_state_from_initialization:
139 RwLock<Option<(Option<EpochId>, HashSet<EpochId>, u64, Option<u64>)>>,
140}
141
142impl MallocSizeOf for StorageManager {
143 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
144 let mut size = 0;
148 size += self.delta_mpts_node_memory_manager.size_of(ops);
149 size += self.snapshot_associated_mpts_by_epoch.size_of(ops);
150 size
151 }
152}
153
154pub struct DeltaDbReleaser {
157 pub storage_manager: Weak<StorageManager>,
158 pub snapshot_epoch_id: EpochId,
159 pub mpt_id: DeltaMptId,
160}
161
162impl Drop for DeltaDbReleaser {
163 fn drop(&mut self) {
164 Weak::upgrade(&self.storage_manager).map(|storage_manager| {
170 storage_manager.release_delta_mpt_actions_in_drop(
171 &self.snapshot_epoch_id,
172 self.mpt_id,
173 )
174 });
175 }
176}
177
178pub struct InProgressSnapshotTask {
180 snapshot_info: SnapshotInfo,
181 thread_handle: Option<thread::JoinHandle<Result<()>>>,
182}
183
184impl InProgressSnapshotTask {
185 pub fn join(&mut self) -> Option<Result<()>> {
188 if let Some(join_handle) = self.thread_handle.take() {
189 match join_handle.join() {
190 Ok(task_result) => Some(task_result),
191 Err(_) => Some(Err(Error::ThreadPanicked(format!(
192 "Background Snapshotting for {:?} panicked.",
193 self.snapshot_info
194 ))
195 .into())),
196 }
197 } else {
198 None
199 }
200 }
201}
202
203impl StorageManager {
204 pub fn new_arc(
205 storage_conf: StorageConfiguration,
207 ) -> Result<Arc<Self>> {
208 let storage_dir = storage_conf.path_storage_dir.as_path();
209 debug!(
210 "new StorageManager within storage_dir {}",
211 storage_dir.display()
212 );
213 if !storage_dir.exists() {
214 fs::create_dir_all(storage_dir)?;
215 }
216
217 let (_, snapshot_info_db) = KvdbSqlite::open_or_create(
218 &storage_conf.path_snapshot_info_db,
219 SNAPSHOT_KVDB_STATEMENTS.clone(),
220 false, )?;
222 let snapshot_info_map =
223 PersistedSnapshotInfoMap::new(snapshot_info_db)?;
224
225 let (
226 in_progress_snapshot_finish_signaler,
227 in_progress_snapshot_finish_signal_receiver,
228 ) = channel();
229
230 let delta_db_manager = Arc::new(DeltaDbManager::new(
231 storage_conf.path_delta_mpts_dir.clone(),
232 )?);
233 let new_storage_manager_result = Ok(Arc::new(Self {
234 delta_db_manager: delta_db_manager.clone(),
235 delta_mpt_open_db_lru: Arc::new(OpenDeltaDbLru::new(
236 delta_db_manager.clone(),
237 storage_conf.max_open_mpt_count,
238 )?),
239 snapshot_manager: Box::new(SnapshotManager::<SnapshotDbManager> {
240 snapshot_db_manager: SnapshotDbManager::new(
241 storage_conf.path_snapshot_dir.clone(),
242 storage_conf.max_open_snapshots,
243 storage_conf.use_isolated_db_for_mpt_table,
244 storage_conf.use_isolated_db_for_mpt_table_height,
245 storage_conf.consensus_param.era_epoch_count,
246 storage_conf.backup_mpt_snapshot,
247 )?,
248 }),
249 delta_mpts_id_gen: Default::default(),
250 delta_mpts_node_memory_manager: Arc::new(
251 DeltaMptsNodeMemoryManager::new(
252 storage_conf.delta_mpts_cache_start_size,
253 storage_conf.delta_mpts_cache_size,
254 storage_conf.delta_mpts_slab_idle_size,
255 storage_conf.delta_mpts_node_map_vec_size,
256 DeltaMptsCacheAlgorithm::new(
257 storage_conf.delta_mpts_cache_size,
258 ),
259 ),
260 ),
261 maybe_db_errors: MaybeDeltaTrieDestroyErrors::new(),
262 snapshot_associated_mpts_by_epoch: Default::default(),
263 in_progress_snapshotting_tasks: Default::default(),
264 in_progress_snapshot_finish_signaler: Arc::new(Mutex::new(
265 in_progress_snapshot_finish_signaler,
266 )),
267 in_progress_snapshotting_joiner: Default::default(),
268 current_snapshots: Default::default(),
269 snapshot_info_map_by_epoch: RwLock::new(snapshot_info_map),
270 last_confirmed_snapshottable_epoch_id: Default::default(),
271 storage_conf,
272 intermediate_trie_root_merkle: RwLock::new(None),
273 persist_state_from_initialization: RwLock::new(None),
274 }));
275
276 let storage_manager_arc =
277 new_storage_manager_result.as_ref().unwrap().clone();
278 *new_storage_manager_result.as_ref().unwrap().in_progress_snapshotting_joiner.lock() =
279 Some(thread::Builder::new()
280 .name("Background Snapshot Joiner".to_string()).spawn(
281 move || {
282 for exit_program_or_finished_snapshot in
283 in_progress_snapshot_finish_signal_receiver.iter() {
284 if exit_program_or_finished_snapshot.is_none() {
285 break;
286 }
287 let finished_snapshot = exit_program_or_finished_snapshot.unwrap();
288 if let Some(task) = storage_manager_arc
289 .in_progress_snapshotting_tasks.read().get(&finished_snapshot) {
290 let snapshot_result = task.write().join();
291 if let Some(Err(e)) = snapshot_result {
292 warn!(
293 "Background snapshotting for {:?} failed with {}",
294 finished_snapshot, e);
295 }
296 }
297 storage_manager_arc.in_progress_snapshotting_tasks
298 .write().remove(&finished_snapshot);
299 }
300 }
302 )?);
303
304 new_storage_manager_result
305 .as_ref()
306 .unwrap()
307 .load_persist_state()?;
308
309 new_storage_manager_result
310 }
311
312 pub fn find_merkle_root(
313 current_snapshots: &Vec<SnapshotInfo>, epoch_id: &EpochId,
314 ) -> Option<MerkleHash> {
315 current_snapshots
316 .iter()
317 .find(|i| i.get_snapshot_epoch_id() == epoch_id)
318 .map(|i| i.merkle_root.clone())
319 }
320
321 pub fn wait_for_snapshot(
322 &self, snapshot_epoch_id: &EpochId, try_open: bool,
323 open_mpt_snapshot: bool,
324 ) -> Result<
325 Option<
326 GuardedValue<RwLockReadGuard<'_, Vec<SnapshotInfo>>, SnapshotDb>,
327 >,
328 > {
329 let _snapshot_info_lock = self.snapshot_info_map_by_epoch.read();
333 let guard = self.current_snapshots.read();
336 match self.snapshot_manager.get_snapshot_by_epoch_id(
337 snapshot_epoch_id,
338 try_open,
339 open_mpt_snapshot,
340 )? {
341 Some(snapshot_db) => {
342 Ok(Some(GuardedValue::new(guard, snapshot_db)))
343 }
344 None => {
345 drop(_snapshot_info_lock);
346 drop(guard);
347 if let Some(in_progress_snapshot_task) = self
349 .in_progress_snapshotting_tasks
350 .read()
351 .get(snapshot_epoch_id)
352 .cloned()
353 {
354 if let Some(result) =
357 in_progress_snapshot_task.write().join()
358 {
359 result?;
360 }
361 let guard = self.current_snapshots.read();
362 match self.snapshot_manager.get_snapshot_by_epoch_id(
363 snapshot_epoch_id,
364 try_open,
365 open_mpt_snapshot,
366 ) {
367 Err(e) => Err(e),
368 Ok(None) => Ok(None),
369 Ok(Some(snapshot_db)) => {
370 Ok(Some(GuardedValue::new(guard, snapshot_db)))
371 }
372 }
373 } else {
374 Ok(None)
375 }
376 }
377 }
378 }
379
380 pub fn graceful_shutdown(&self) {
381 self.in_progress_snapshot_finish_signaler
384 .lock()
385 .send(None)
386 .ok();
387 if let Some(joiner) = self.in_progress_snapshotting_joiner.lock().take()
388 {
389 joiner.join().ok();
390 }
391 }
392
393 pub fn get_snapshot_manager(
394 &self,
395 ) -> &(dyn SnapshotManagerTrait<
396 SnapshotDb = SnapshotDb,
397 SnapshotDbManager = SnapshotDbManager,
398 > + Send
399 + Sync) {
400 &*self.snapshot_manager
401 }
402
403 pub fn get_snapshot_epoch_count(&self) -> u32 {
404 self.storage_conf.consensus_param.snapshot_epoch_count
405 }
406
407 pub fn get_snapshot_info_at_epoch(
408 &self, snapshot_epoch_id: &EpochId,
409 ) -> Option<SnapshotInfo> {
410 self.snapshot_info_map_by_epoch
411 .read()
412 .get(snapshot_epoch_id)
413 .map(Clone::clone)
414 }
415
416 pub fn get_delta_mpt(
417 self: &Arc<Self>, snapshot_epoch_id: &EpochId,
418 ) -> Result<Arc<DeltaMpt>> {
419 {
420 let snapshot_associated_mpts_locked =
421 self.snapshot_associated_mpts_by_epoch.read();
422 match snapshot_associated_mpts_locked.get(snapshot_epoch_id) {
423 None => bail!(Error::DeltaMPTEntryNotFound),
424 Some(delta_mpts) => {
425 if delta_mpts.1.is_some() {
426 return Ok(delta_mpts.1.as_ref().unwrap().clone());
427 }
428 }
429 }
430 }
431
432 StorageManager::new_or_get_delta_mpt(
433 self.clone(),
434 snapshot_epoch_id,
435 &mut *self.snapshot_associated_mpts_by_epoch.write(),
436 )
437 }
438
439 pub fn get_intermediate_mpt(
440 &self, snapshot_epoch_id: &EpochId,
441 ) -> Result<Option<Arc<DeltaMpt>>> {
442 match self
443 .snapshot_associated_mpts_by_epoch
444 .read()
445 .get(snapshot_epoch_id)
446 {
447 None => bail!(Error::DeltaMPTEntryNotFound),
448 Some(mpts) => Ok(mpts.0.clone()),
449 }
450 }
451
452 pub fn new_or_get_delta_mpt(
454 storage_manager: Arc<StorageManager>, snapshot_epoch_id: &EpochId,
455 snapshot_associated_mpts_mut: &mut HashMap<
456 EpochId,
457 (Option<Arc<DeltaMpt>>, Option<Arc<DeltaMpt>>),
458 >,
459 ) -> Result<Arc<DeltaMpt>> {
460 let mut maybe_snapshot_entry =
465 snapshot_associated_mpts_mut.get_mut(snapshot_epoch_id);
466 if maybe_snapshot_entry.is_none() {
467 bail!(Error::SnapshotNotFound);
468 };
469 if maybe_snapshot_entry.as_ref().unwrap().1.is_some() {
471 return Ok(maybe_snapshot_entry
472 .unwrap()
473 .1
474 .as_ref()
475 .unwrap()
476 .clone());
477 } else {
478 let mpt_id = storage_manager.delta_mpts_id_gen.lock().allocate()?;
479 let db_result = storage_manager
480 .delta_mpt_open_db_lru
481 .create(&snapshot_epoch_id, mpt_id);
482 if db_result.is_err() {
483 storage_manager.delta_mpts_id_gen.lock().free(mpt_id);
484 db_result?;
485 }
486 let arc_delta_mpt = Arc::new(DeltaMpt::new(
487 storage_manager.delta_mpt_open_db_lru.clone(),
488 snapshot_epoch_id.clone(),
489 storage_manager.clone(),
490 mpt_id,
491 storage_manager.delta_mpts_node_memory_manager.clone(),
492 )?);
493
494 maybe_snapshot_entry.as_mut().unwrap().1 =
495 Some(arc_delta_mpt.clone());
496 if snapshot_epoch_id.eq(&NULL_EPOCH) {
499 maybe_snapshot_entry.unwrap().0 = Some(arc_delta_mpt.clone());
500 }
501
502 return Ok(arc_delta_mpt);
503 }
504 }
505
506 fn release_delta_mpt_actions_in_drop(
510 &self, snapshot_epoch_id: &EpochId, delta_mpt_id: DeltaMptId,
511 ) {
512 debug!(
513 "release_delta_mpt_actions_in_drop: snapshot_epoch_id: {:?}, delta_mpt_id: {}",
514 snapshot_epoch_id, delta_mpt_id
515 );
516 self.delta_mpts_node_memory_manager
517 .delete_mpt_from_cache(delta_mpt_id);
518 self.delta_mpt_open_db_lru.release(delta_mpt_id, true);
519 self.delta_mpts_id_gen.lock().free(delta_mpt_id);
520 self.maybe_db_errors.set_maybe_error(
521 self.delta_db_manager
522 .destroy_delta_db(
523 &self.delta_db_manager.get_delta_db_name(snapshot_epoch_id),
524 )
525 .err(),
526 );
527 }
528
529 fn release_delta_mpts_from_snapshot(
530 &self,
531 snapshot_associated_mpts_by_epoch: &mut HashMap<
532 EpochId,
533 (Option<Arc<DeltaMpt>>, Option<Arc<DeltaMpt>>),
534 >,
535 snapshot_epoch_id: &EpochId,
536 ) -> Result<()> {
537 snapshot_associated_mpts_by_epoch.remove(snapshot_epoch_id);
539 self.maybe_db_errors.take_result()
540 }
541
542 pub fn check_make_register_snapshot_background(
543 this: Arc<Self>, snapshot_epoch_id: EpochId, height: u64,
544 maybe_delta_db: Option<DeltaMptIterator>,
545 recover_mpt_during_construct_pivot_state: bool,
546 ) -> Result<()> {
547 let this_cloned = this.clone();
548 let mut in_progress_snapshotting_tasks =
549 this_cloned.in_progress_snapshotting_tasks.write();
550
551 let mut recover_mpt_with_kv_snapshot_exist = false;
552 if !in_progress_snapshotting_tasks.contains_key(&snapshot_epoch_id)
553 && this
554 .snapshot_info_map_by_epoch
555 .read()
556 .get(&snapshot_epoch_id)
557 .map_or(true, |info| {
558 if info.snapshot_info_kept_to_provide_sync
559 == SnapshotKeptToProvideSyncStatus::InfoOnly
560 {
561 true
562 } else {
563 recover_mpt_with_kv_snapshot_exist =
564 recover_mpt_during_construct_pivot_state;
565 recover_mpt_during_construct_pivot_state
566 }
567 })
568 {
569 debug!(
570 "start check_make_register_snapshot_background: epoch={:?} height={:?}",
571 snapshot_epoch_id, height
572 );
573
574 let mut pivot_chain_parts = vec![
575 Default::default();
576 this.storage_conf.consensus_param.snapshot_epoch_count
577 as usize
578 ];
579 let mut epoch_id = snapshot_epoch_id.clone();
581 let mut delta_height =
582 this.storage_conf.consensus_param.snapshot_epoch_count as usize
583 - 1;
584 pivot_chain_parts[delta_height] = epoch_id.clone();
585 let parent_snapshot_epoch_id = if maybe_delta_db.is_none() {
587 NULL_EPOCH
593 } else {
594 let delta_db = maybe_delta_db.as_ref().unwrap();
595 while delta_height > 0 {
596 epoch_id = match delta_db.mpt.get_parent_epoch(&epoch_id)? {
597 None => bail!(Error::DbValueError),
598 Some(epoch_id) => epoch_id,
599 };
600 delta_height -= 1;
601 pivot_chain_parts[delta_height] = epoch_id.clone();
602 trace!(
603 "check_make_register_snapshot_background: parent epoch_id={:?}",
604 epoch_id
605 );
606 }
607 if height
608 == this.storage_conf.consensus_param.snapshot_epoch_count
609 as u64
610 {
611 NULL_EPOCH
616 } else {
617 delta_db.mpt.get_parent_epoch(&epoch_id)?.unwrap()
618 }
619 };
620
621 let in_progress_snapshot_info = SnapshotInfo {
622 snapshot_info_kept_to_provide_sync: Default::default(),
623 serve_one_step_sync: true,
624 height: height as u64,
625 parent_snapshot_height: height
626 - this.storage_conf.consensus_param.snapshot_epoch_count
627 as u64,
628 merkle_root: Default::default(),
630 parent_snapshot_epoch_id,
631 pivot_chain_parts,
632 };
633
634 let parent_snapshot_epoch_id_cloned =
635 in_progress_snapshot_info.parent_snapshot_epoch_id.clone();
636 let mut in_progress_snapshot_info_cloned =
637 in_progress_snapshot_info.clone();
638 let task_finished_sender_cloned =
639 this.in_progress_snapshot_finish_signaler.clone();
640 let thread_handle = thread::Builder::new()
641 .name("Background Snapshotting".into()).spawn(move || {
642 let f = || -> Result<()> {
644 let (mut snapshot_info_map_locked, new_snapshot_info) = match maybe_delta_db {
645 None => {
646 in_progress_snapshot_info_cloned.merkle_root = MERKLE_NULL_NODE;
647 (this.snapshot_info_map_by_epoch.write(), in_progress_snapshot_info_cloned)
648 }
649 Some(delta_db) => {
650 this.snapshot_manager
651 .get_snapshot_db_manager()
652 .new_snapshot_by_merging(
653 &parent_snapshot_epoch_id_cloned,
654 snapshot_epoch_id.clone(), delta_db,
655 in_progress_snapshot_info_cloned,
656 &this.snapshot_info_map_by_epoch,
657 height,
658 recover_mpt_with_kv_snapshot_exist)?
659 }
660 };
661 if let Err(e) = this.register_new_snapshot(new_snapshot_info.clone(), &mut snapshot_info_map_locked) {
662 error!(
663 "Failed to register new snapshot {:?} {:?}.",
664 snapshot_epoch_id, new_snapshot_info
665 );
666 bail!(e);
667 }
668
669 task_finished_sender_cloned.lock().send(Some(snapshot_epoch_id))
670 .or(Err(Error::from(Error::MpscError)))?;
671 drop(snapshot_info_map_locked);
672
673 let debug_snapshot_checkers =
674 this.storage_conf.debug_snapshot_checker_threads;
675 for snapshot_checker in 0..debug_snapshot_checkers {
676 let begin_range =
677 (256 / debug_snapshot_checkers * snapshot_checker) as u8;
678 let end_range =
679 256 / debug_snapshot_checkers * (snapshot_checker + 1);
680 let end_range_excl = if end_range != 256 {
681 Some(vec![end_range as u8])
682 } else {
683 None
684 };
685 let this = this.clone();
686 thread::Builder::new().name(
687 format!("snapshot checker {} - {}", begin_range, end_range)).spawn(
688 move || -> Result<()> {
689 debug!(
690 "Start snapshot checker {} of {}",
691 snapshot_checker, debug_snapshot_checkers);
692 let snapshot_db = this.snapshot_manager
693 .get_snapshot_by_epoch_id(
694 &snapshot_epoch_id,
695 false,
696 true
697 )?.unwrap();
698 let mut set_keys_iter =
699 snapshot_db.dumped_delta_kv_set_keys_iterator()?;
700 let mut delete_keys_iter =
701 snapshot_db.dumped_delta_kv_delete_keys_iterator()?;
702 let previous_snapshot_db = this.snapshot_manager
703 .get_snapshot_by_epoch_id(
704 &parent_snapshot_epoch_id_cloned,
705 false,
706 false
707 )?.unwrap();
708 let mut previous_set_keys_iter = previous_snapshot_db
709 .dumped_delta_kv_set_keys_iterator()?;
710 let mut previous_delete_keys_iter =
711 previous_snapshot_db
712 .dumped_delta_kv_delete_keys_iterator()?;
713
714 let mut checker_count = 0;
715
716 let set_iter = set_keys_iter.iter_range(
717 &[begin_range],
718 end_range_excl.as_ref().map(|v| &**v))?
719 .take();
720 checker_count += check_key_value_load(&snapshot_db, set_iter, true)?;
721
722 let set_iter = previous_set_keys_iter.iter_range(
723 &[begin_range], end_range_excl.as_ref().map(|v| &**v))?
724 .take();
725 checker_count += check_key_value_load(&snapshot_db, set_iter, false)?;
726
727 let delete_iter = delete_keys_iter.iter_range(
728 &[begin_range], end_range_excl.as_ref().map(|v| &**v))?
729 .take();
730 checker_count += check_key_value_load(&snapshot_db, delete_iter, false)?;
731
732 let delete_iter = previous_delete_keys_iter.iter_range(
733 &[begin_range], end_range_excl.as_ref().map(|v| &**v))?
734 .take();
735 checker_count += check_key_value_load(&snapshot_db, delete_iter, false)?;
736
737 debug!(
738 "Finished: snapshot checker {} of {}, {} keys",
739 snapshot_checker, debug_snapshot_checkers, checker_count);
740 Ok(())
741 }
742 )?;
743 }
744
745 Ok(())
746 };
747
748 let task_result = f();
749 if task_result.is_err() {
750 warn!(
751 "Failed to create snapshot for epoch_id {:?} with error {:?}",
752 snapshot_epoch_id, task_result.as_ref().unwrap_err());
753 }
754
755 task_result
756 })?;
757
758 in_progress_snapshotting_tasks.insert(
759 snapshot_epoch_id,
760 Arc::new(RwLock::new(InProgressSnapshotTask {
761 snapshot_info: in_progress_snapshot_info,
762 thread_handle: Some(thread_handle),
763 })),
764 );
765 }
766
767 Ok(())
768 }
769
770 pub fn register_new_snapshot(
772 self: &Arc<Self>, new_snapshot_info: SnapshotInfo,
773 snapshot_info_map_locked: &mut PersistedSnapshotInfoMap,
774 ) -> Result<()> {
775 debug!("register_new_snapshot: info={:?}", new_snapshot_info);
776 let snapshot_epoch_id = new_snapshot_info.get_snapshot_epoch_id();
777 let mut snapshot_associated_mpts_locked =
779 self.snapshot_associated_mpts_by_epoch.write();
780 let in_recover_mode =
781 snapshot_associated_mpts_locked.contains_key(snapshot_epoch_id);
782
783 let maybe_intermediate_delta_mpt = match snapshot_associated_mpts_locked
793 .get(&new_snapshot_info.parent_snapshot_epoch_id)
794 {
795 None => {
796 snapshot_associated_mpts_locked.insert(
799 new_snapshot_info.parent_snapshot_epoch_id.clone(),
800 (None, None),
801 );
802 let parent_delta_mpt =
803 Some(StorageManager::new_or_get_delta_mpt(
804 self.clone(),
805 &new_snapshot_info.parent_snapshot_epoch_id,
806 &mut *snapshot_associated_mpts_locked,
807 )?);
808 snapshot_associated_mpts_locked
809 .remove(&new_snapshot_info.parent_snapshot_epoch_id);
810
811 parent_delta_mpt
812 }
813 Some(parent_snapshot_associated_mpts) => {
814 if parent_snapshot_associated_mpts.1.is_none() {
815 debug!("MPT for parent_snapshot_epoch_id is none");
816 Some(StorageManager::new_or_get_delta_mpt(
817 self.clone(),
818 &new_snapshot_info.parent_snapshot_epoch_id,
819 &mut *snapshot_associated_mpts_locked,
820 )?)
821 } else {
822 parent_snapshot_associated_mpts.1.clone()
823 }
824 }
825 };
826 let delta_mpt = if in_recover_mode {
827 snapshot_associated_mpts_locked
828 .get_mut(snapshot_epoch_id)
829 .unwrap()
831 .1
832 .take()
833 } else {
834 None
835 };
836 if !in_recover_mode || maybe_intermediate_delta_mpt.is_some() {
837 snapshot_associated_mpts_locked.insert(
838 snapshot_epoch_id.clone(),
839 (maybe_intermediate_delta_mpt, delta_mpt),
840 );
841 }
842
843 drop(snapshot_associated_mpts_locked);
844 snapshot_info_map_locked
845 .insert(snapshot_epoch_id, new_snapshot_info.clone())?;
846 if !in_recover_mode {
847 self.current_snapshots.write().push(new_snapshot_info);
848 }
849
850 Ok(())
851 }
852
853 pub fn maintain_state_confirmed<ConsensusInner: StateMaintenanceTrait>(
854 &self, consensus_inner: &ConsensusInner, stable_checkpoint_height: u64,
855 era_epoch_count: u64, confirmed_height: u64,
856 state_availability_boundary: &RwLock<StateAvailabilityBoundary>,
857 ) -> Result<()> {
858 let additional_state_height_gap =
859 (self.storage_conf.additional_maintained_snapshot_count
860 * self.get_snapshot_epoch_count()) as u64;
861 let maintained_state_height_lower_bound =
862 if confirmed_height > additional_state_height_gap {
863 confirmed_height - additional_state_height_gap
864 } else {
865 0
866 };
867 if maintained_state_height_lower_bound
868 <= state_availability_boundary.read().lower_bound
869 {
870 return Ok(());
871 }
872 let maintained_epoch_id = consensus_inner
873 .get_pivot_hash_from_epoch_number(
874 maintained_state_height_lower_bound,
875 )?;
876 let maintained_epoch_execution_commitment = consensus_inner
877 .get_epoch_execution_commitment_with_db(&maintained_epoch_id);
878 let maintained_state_root = match &maintained_epoch_execution_commitment
879 {
880 Some(commitment) => &commitment.state_root_with_aux_info,
881 None => return Ok(()),
882 };
883
884 self.maintain_snapshots_pivot_chain_confirmed(
885 maintained_state_height_lower_bound,
886 &maintained_epoch_id,
887 maintained_state_root,
888 state_availability_boundary,
889 &|height, find_nearest_snapshot_multiple_of| {
890 extra_snapshots_to_keep_predicate(
891 &self.storage_conf,
892 stable_checkpoint_height,
893 era_epoch_count,
894 height,
895 find_nearest_snapshot_multiple_of,
896 )
897 },
898 stable_checkpoint_height,
899 )
900 }
901
902 pub fn maintain_snapshots_pivot_chain_confirmed(
916 &self, maintained_state_height_lower_bound: u64,
917 maintained_epoch_id: &EpochId,
918 maintained_state_root: &StateRootWithAuxInfo,
919 state_availability_boundary: &RwLock<StateAvailabilityBoundary>,
920 extra_snapshots_to_keep: &dyn Fn(u64, &mut bool) -> bool,
921 stable_checkpoint_height: u64,
922 ) -> Result<()> {
923 {
926 let mut last_confirmed_snapshottable_id_locked =
927 self.last_confirmed_snapshottable_epoch_id.lock();
928 if last_confirmed_snapshottable_id_locked.is_some() {
929 if maintained_state_root.aux_info.intermediate_epoch_id.eq(
930 last_confirmed_snapshottable_id_locked.as_ref().unwrap(),
931 ) {
932 return Ok(());
933 }
934 }
935 *last_confirmed_snapshottable_id_locked = Some(
936 maintained_state_root.aux_info.intermediate_epoch_id.clone(),
937 );
938 }
939
940 let confirmed_intermediate_height = maintained_state_height_lower_bound
941 - StateIndex::height_to_delta_height(
942 maintained_state_height_lower_bound,
943 self.get_snapshot_epoch_count(),
944 ) as u64;
945
946 let confirmed_snapshot_height = if confirmed_intermediate_height
947 > self.get_snapshot_epoch_count() as u64
948 {
949 confirmed_intermediate_height
950 - self.get_snapshot_epoch_count() as u64
951 } else {
952 0
953 };
954 let first_available_state_height = if confirmed_snapshot_height > 0 {
955 confirmed_snapshot_height + 1
956 } else {
957 0
958 };
959
960 debug!(
961 "maintain_snapshots_pivot_chain_confirmed: confirmed_height {}, \
962 confirmed_epoch_id {:?}, confirmed_intermediate_id {:?}, \
963 confirmed_snapshot_id {:?}, confirmed_intermediate_height {}, \
964 confirmed_snapshot_height {}, first_available_state_height {}",
965 maintained_state_height_lower_bound,
966 maintained_epoch_id,
967 maintained_state_root.aux_info.intermediate_epoch_id,
968 maintained_state_root.aux_info.snapshot_epoch_id,
969 confirmed_intermediate_height,
970 confirmed_snapshot_height,
971 first_available_state_height,
972 );
973 let mut extra_snapshot_infos_kept_for_sync = vec![];
974 let mut non_pivot_snapshots_to_remove = HashSet::new();
975 let mut old_pivot_snapshots_to_remove = vec![];
976 let mut old_pivot_snapshot_infos_to_remove = vec![];
981 let mut find_nearest_multiple_of = false;
982 let mut in_progress_snapshot_to_cancel = vec![];
983
984 {
985 let current_snapshots = self.current_snapshots.read();
986
987 let mut prev_snapshot_epoch_id = &NULL_EPOCH;
988
989 for snapshot_info in current_snapshots.iter().rev() {
991 let snapshot_epoch_id = snapshot_info.get_snapshot_epoch_id();
992 if snapshot_info.height == confirmed_snapshot_height {
993 if snapshot_epoch_id
996 .eq(&maintained_state_root.aux_info.snapshot_epoch_id)
997 {
998 prev_snapshot_epoch_id =
999 &snapshot_info.parent_snapshot_epoch_id;
1000 } else {
1001 non_pivot_snapshots_to_remove
1002 .insert(snapshot_epoch_id.clone());
1003 }
1004 } else if snapshot_info.height < confirmed_snapshot_height {
1005 if snapshot_epoch_id.eq(prev_snapshot_epoch_id) {
1007 if extra_snapshots_to_keep(
1008 snapshot_info.height,
1009 &mut find_nearest_multiple_of,
1010 ) {
1011 for snapshot_epoch_id_to_keep_info in std::mem::take(
1014 &mut old_pivot_snapshot_infos_to_remove,
1015 ) {
1016 extra_snapshot_infos_kept_for_sync.push((
1017 snapshot_epoch_id_to_keep_info,
1018 SnapshotKeptToProvideSyncStatus::InfoOnly,
1019 ));
1020 }
1021 extra_snapshot_infos_kept_for_sync
1022 .push((snapshot_epoch_id.clone(), SnapshotKeptToProvideSyncStatus::InfoAndSnapshot));
1023 } else {
1024 if snapshot_info.height
1027 + self
1028 .storage_conf
1029 .consensus_param
1030 .snapshot_epoch_count
1031 as u64
1032 != stable_checkpoint_height
1033 {
1034 old_pivot_snapshot_infos_to_remove
1035 .push(snapshot_epoch_id.clone());
1036 }
1037 old_pivot_snapshots_to_remove
1038 .push(snapshot_epoch_id.clone());
1039 }
1040 prev_snapshot_epoch_id =
1041 &snapshot_info.parent_snapshot_epoch_id;
1042 } else {
1043 non_pivot_snapshots_to_remove
1045 .insert(snapshot_epoch_id.clone());
1046 }
1047 } else if snapshot_info.height
1048 < maintained_state_height_lower_bound
1049 {
1050 if snapshot_info
1057 .get_epoch_id_at_height(confirmed_intermediate_height)
1058 != Some(
1059 &maintained_state_root
1060 .aux_info
1061 .intermediate_epoch_id,
1062 )
1063 {
1064 debug!(
1065 "remove mismatch intermediate snapshot: {:?}",
1066 snapshot_info.get_epoch_id_at_height(
1067 confirmed_intermediate_height
1068 )
1069 );
1070 non_pivot_snapshots_to_remove
1071 .insert(snapshot_epoch_id.clone());
1072 }
1073 }
1074 }
1075
1076 debug!(
1077 "finished scanning for lower snapshots: \
1078 old_pivot_snapshots_to_remove {:?}, \
1079 old_pivot_snapshot_infos_to_remove {:?}, \
1080 non_pivot_snapshots_to_remove {:?}",
1081 old_pivot_snapshots_to_remove,
1082 old_pivot_snapshot_infos_to_remove,
1083 non_pivot_snapshots_to_remove
1084 );
1085
1086 for snapshot_info in &*current_snapshots {
1088 match snapshot_info
1090 .get_epoch_id_at_height(maintained_state_height_lower_bound)
1091 {
1092 Some(path_epoch_id) => {
1093 if path_epoch_id != maintained_epoch_id {
1097 debug!(
1098 "remove non-subtree snapshot {:?}, got {:?}, expected {:?}",
1099 snapshot_info.get_snapshot_epoch_id(),
1100 path_epoch_id, maintained_epoch_id,
1101 );
1102 non_pivot_snapshots_to_remove.insert(
1103 snapshot_info.get_snapshot_epoch_id().clone(),
1104 );
1105 }
1106 }
1107 None => {
1108 if non_pivot_snapshots_to_remove
1112 .contains(&snapshot_info.parent_snapshot_epoch_id)
1113 {
1114 debug!(
1115 "remove non-subtree deep snapshot {:?}, parent_snapshot_epoch_id {:?}",
1116 snapshot_info.get_snapshot_epoch_id(),
1117 snapshot_info.parent_snapshot_epoch_id
1118 );
1119 non_pivot_snapshots_to_remove.insert(
1123 snapshot_info.get_snapshot_epoch_id().clone(),
1124 );
1125 }
1126 }
1127 }
1128 }
1129 }
1130
1131 for (in_progress_epoch_id, in_progress_snapshot_task) in
1132 &*self.in_progress_snapshotting_tasks.read()
1133 {
1134 let mut to_cancel = false;
1135 let in_progress_snapshot_info =
1136 &in_progress_snapshot_task.read().snapshot_info;
1137
1138 if in_progress_snapshot_info.height < confirmed_intermediate_height
1140 {
1141 to_cancel = true;
1142 } else if in_progress_snapshot_info.height
1143 < maintained_state_height_lower_bound
1144 {
1145 if in_progress_snapshot_info
1146 .get_epoch_id_at_height(confirmed_intermediate_height)
1147 != Some(
1148 &maintained_state_root.aux_info.intermediate_epoch_id,
1149 )
1150 {
1151 to_cancel = true;
1152 }
1153 } else {
1154 match in_progress_snapshot_info
1155 .get_epoch_id_at_height(maintained_state_height_lower_bound)
1156 {
1157 Some(path_epoch_id) => {
1158 if path_epoch_id != maintained_epoch_id {
1159 to_cancel = true;
1160 }
1161 }
1162 None => {
1163 if non_pivot_snapshots_to_remove.contains(
1164 &in_progress_snapshot_info.parent_snapshot_epoch_id,
1165 ) {
1166 to_cancel = true;
1167 }
1168 }
1169 }
1170 }
1171
1172 if to_cancel {
1173 in_progress_snapshot_to_cancel
1174 .push(in_progress_epoch_id.clone())
1175 }
1176 }
1177
1178 let mut non_pivot_snapshots_to_remove =
1179 non_pivot_snapshots_to_remove.drain().collect();
1180 {
1183 let mut info_maps = self.snapshot_info_map_by_epoch.write();
1184 let removal_filter = |vec: &mut Vec<EpochId>| {
1185 vec.retain(|epoch| {
1186 info_maps.get(epoch).map_or(true, |info| {
1187 info.snapshot_info_kept_to_provide_sync
1189 != SnapshotKeptToProvideSyncStatus::InfoOnly
1190 })
1191 })
1192 };
1193 removal_filter(&mut non_pivot_snapshots_to_remove);
1194 removal_filter(&mut old_pivot_snapshots_to_remove);
1195
1196 let mut updated_snapshot_info_epochs =
1197 HashMap::<EpochId, SnapshotKeptToProvideSyncStatus>::default();
1198 for (epoch, new_status) in &extra_snapshot_infos_kept_for_sync {
1199 if let Some(info) = info_maps.get(epoch) {
1200 if info.snapshot_info_kept_to_provide_sync != *new_status {
1201 let mut new_snapshot_info = info.clone();
1202 new_snapshot_info.snapshot_info_kept_to_provide_sync =
1203 *new_status;
1204 info_maps.insert(epoch, new_snapshot_info)?;
1205 updated_snapshot_info_epochs
1206 .insert(*epoch, *new_status);
1207 }
1208 }
1209 }
1210 if updated_snapshot_info_epochs.len() > 0 {
1211 let mut current_snapshots = self.current_snapshots.write();
1212 for snapshot_info in current_snapshots.iter_mut() {
1213 if let Some(new_status) = updated_snapshot_info_epochs
1214 .get(&snapshot_info.get_snapshot_epoch_id())
1215 {
1216 snapshot_info.snapshot_info_kept_to_provide_sync =
1217 *new_status;
1218 }
1219 }
1220 }
1221 }
1222 if !non_pivot_snapshots_to_remove.is_empty()
1223 || !old_pivot_snapshots_to_remove.is_empty()
1224 {
1225 {
1226 let state_boundary = &mut *state_availability_boundary.write();
1228 if first_available_state_height > state_boundary.lower_bound {
1229 state_boundary
1230 .adjust_lower_bound(first_available_state_height);
1231 }
1232 }
1233
1234 self.remove_snapshots(
1235 &old_pivot_snapshots_to_remove,
1236 &non_pivot_snapshots_to_remove,
1237 &old_pivot_snapshot_infos_to_remove
1238 .iter()
1239 .chain(non_pivot_snapshots_to_remove.iter())
1240 .cloned()
1241 .collect(),
1242 )?;
1243 }
1244
1245 info!("maintain_snapshots_pivot_chain_confirmed: finished");
1257 Ok(())
1258 }
1259
1260 fn remove_snapshots(
1261 &self, old_pivot_snapshots_to_remove: &[EpochId],
1262 non_pivot_snapshots_to_remove: &[EpochId],
1263 snapshot_infos_to_remove: &HashSet<EpochId>,
1264 ) -> Result<()> {
1265 let mut current_snapshots_locked = self.current_snapshots.write();
1266 current_snapshots_locked.retain(|x| {
1267 !snapshot_infos_to_remove.contains(x.get_snapshot_epoch_id())
1268 });
1269 info!(
1270 "maintain_snapshots_pivot_chain_confirmed: remove the following snapshot infos {:?}",
1271 snapshot_infos_to_remove,
1272 );
1273 for snapshot_epoch_id in old_pivot_snapshots_to_remove {
1274 self.snapshot_manager
1275 .remove_old_pivot_snapshot(&snapshot_epoch_id)?;
1276 }
1277 for snapshot_epoch_id in non_pivot_snapshots_to_remove {
1278 self.snapshot_manager
1279 .remove_non_pivot_snapshot(&snapshot_epoch_id)?;
1280 }
1281
1282 drop(current_snapshots_locked);
1283 unsafe {
1284 let mut snapshot_info_map = self.snapshot_info_map_by_epoch.write();
1285 for snapshot_epoch_id in snapshot_infos_to_remove {
1286 snapshot_info_map.remove_in_mem_only(snapshot_epoch_id);
1287 }
1288 }
1289 {
1290 let snapshot_associated_mpts_by_epoch_locked =
1291 &mut *self.snapshot_associated_mpts_by_epoch.write();
1292
1293 for snapshot_epoch_id in old_pivot_snapshots_to_remove
1294 .iter()
1295 .chain(non_pivot_snapshots_to_remove.iter())
1296 {
1297 self.release_delta_mpts_from_snapshot(
1298 snapshot_associated_mpts_by_epoch_locked,
1299 snapshot_epoch_id,
1300 )?
1301 }
1302 }
1303 {
1304 let mut snapshot_info_map_by_epoch =
1307 self.snapshot_info_map_by_epoch.write();
1308 for snapshot_epoch_id in snapshot_infos_to_remove {
1309 snapshot_info_map_by_epoch.remove(&snapshot_epoch_id)?;
1310 }
1311 }
1312
1313 Ok(())
1314 }
1315
1316 pub fn log_usage(&self) {
1317 let mut delta_mpts = HashMap::new();
1318 for (_snapshot_epoch_id, associated_delta_mpts) in
1319 &*self.snapshot_associated_mpts_by_epoch.read()
1320 {
1321 if let Some(delta_mpt) = associated_delta_mpts.0.as_ref() {
1322 delta_mpts.insert(delta_mpt.get_mpt_id(), delta_mpt.clone());
1323 }
1324 if let Some(delta_mpt) = associated_delta_mpts.1.as_ref() {
1325 delta_mpts.insert(delta_mpt.get_mpt_id(), delta_mpt.clone());
1326 }
1327 }
1328 if let Some((_mpt_id, delta_mpt)) = delta_mpts.iter().next() {
1329 delta_mpt.log_usage();
1330
1331 }
1335 }
1336
1337 pub fn load_persist_state(self: &Arc<Self>) -> Result<()> {
1338 let snapshot_info_map = &mut *self.snapshot_info_map_by_epoch.write();
1339
1340 self.snapshot_associated_mpts_by_epoch
1342 .write()
1343 .insert(NULL_EPOCH, (None, None));
1344 snapshot_info_map
1345 .insert(&NULL_EPOCH, SnapshotInfo::genesis_snapshot_info())?;
1346 self.current_snapshots
1347 .write()
1348 .push(SnapshotInfo::genesis_snapshot_info());
1349
1350 let snapshot_persist_state = self
1352 .snapshot_manager
1353 .get_snapshot_db_manager()
1354 .scan_persist_state(snapshot_info_map.get_map())?;
1355
1356 debug!("snapshot persist state {:?}", snapshot_persist_state);
1357
1358 *self.persist_state_from_initialization.write() = Some((
1359 snapshot_persist_state.temp_snapshot_db_existing,
1360 snapshot_persist_state.removed_snapshots,
1361 snapshot_persist_state.max_epoch_height,
1362 snapshot_persist_state.max_snapshot_epoch_height_has_mpt,
1363 ));
1364 self.snapshot_manager
1365 .get_snapshot_db_manager()
1366 .update_latest_snapshot_id(
1367 snapshot_persist_state.max_epoch_id,
1368 snapshot_persist_state.max_epoch_height,
1369 );
1370
1371 for snapshot_epoch_id in snapshot_persist_state.missing_snapshots {
1373 if snapshot_epoch_id == NULL_EPOCH {
1374 continue;
1375 }
1376 self.delta_db_manager
1378 .destroy_delta_db(
1379 &self
1380 .delta_db_manager
1381 .get_delta_db_name(&snapshot_epoch_id),
1382 )
1383 .or_else(|e| match &e {
1384 Error::Io(io_err) => match io_err.kind() {
1385 std::io::ErrorKind::NotFound => Ok(()),
1386 _ => Err(e),
1387 },
1388 _ => Err(e),
1389 })?;
1390 snapshot_info_map.remove(&snapshot_epoch_id)?;
1391 }
1392
1393 let (missing_delta_db_snapshots, delta_dbs) = self
1394 .delta_db_manager
1395 .scan_persist_state(snapshot_info_map.get_map())?;
1396
1397 let mut delta_mpts = HashMap::new();
1398 for (snapshot_epoch_id, delta_db) in delta_dbs {
1399 let mpt_id = self.delta_mpts_id_gen.lock().allocate()?;
1400 self.delta_mpt_open_db_lru.import(
1401 &snapshot_epoch_id,
1402 mpt_id,
1403 delta_db,
1404 )?;
1405 delta_mpts.insert(
1406 snapshot_epoch_id.clone(),
1407 Arc::new(DeltaMpt::new(
1408 self.delta_mpt_open_db_lru.clone(),
1409 snapshot_epoch_id.clone(),
1410 self.clone(),
1411 mpt_id,
1412 self.delta_mpts_node_memory_manager.clone(),
1413 )?),
1414 );
1415 }
1416
1417 for snapshot_epoch_id in missing_delta_db_snapshots {
1418 if snapshot_epoch_id == NULL_EPOCH {
1419 continue;
1420 }
1421 if let Some(snapshot_info) =
1425 snapshot_info_map.get(&snapshot_epoch_id)
1426 {
1427 if delta_mpts
1428 .contains_key(&snapshot_info.parent_snapshot_epoch_id)
1429 {
1430 continue;
1431 }
1432 }
1433 error!(
1434 "Missing intermediate mpt and delta mpt for snapshot {:?}",
1435 snapshot_epoch_id
1436 );
1437 snapshot_info_map.remove(&snapshot_epoch_id)?;
1438 self.snapshot_manager
1439 .get_snapshot_db_manager()
1440 .destroy_snapshot(&snapshot_epoch_id)?;
1441 }
1442
1443 let mut snapshots = snapshot_info_map
1445 .get_map()
1446 .iter()
1447 .map(|(_, snapshot_info)| snapshot_info.clone())
1448 .collect::<Vec<_>>();
1449 snapshots.sort_by(|x, y| x.height.partial_cmp(&y.height).unwrap());
1450
1451 let current_snapshots = &mut *self.current_snapshots.write();
1452 *current_snapshots = snapshots;
1453
1454 let snapshot_associated_mpts =
1455 &mut *self.snapshot_associated_mpts_by_epoch.write();
1456 for snapshot_info in current_snapshots {
1457 snapshot_associated_mpts.insert(
1458 snapshot_info.get_snapshot_epoch_id().clone(),
1459 (
1460 delta_mpts
1461 .get(&snapshot_info.parent_snapshot_epoch_id)
1462 .map(|x| x.clone()),
1463 delta_mpts
1464 .get(snapshot_info.get_snapshot_epoch_id())
1465 .map(|x| x.clone()),
1466 ),
1467 );
1468 }
1469
1470 Ok(())
1471 }
1472}
1473
1474fn extra_snapshots_to_keep_predicate(
1475 storage_conf: &StorageConfiguration, stable_checkpoint_height: u64,
1476 era_epoch_count: u64, height: u64,
1477 find_epoch_nearest_multiple_of: &mut bool,
1478) -> bool {
1479 for conf in &storage_conf.provide_more_snapshot_for_sync {
1480 match conf {
1481 ProvideExtraSnapshotSyncConfig::StableCheckpoint => {
1482 if height >= stable_checkpoint_height
1483 && (height - stable_checkpoint_height) % era_epoch_count
1484 == 0
1485 {
1486 return true;
1487 }
1488 let check_next_snapshot_height = height
1497 + (storage_conf.consensus_param.snapshot_epoch_count
1498 as u64);
1499 if (check_next_snapshot_height >= stable_checkpoint_height)
1500 && (check_next_snapshot_height - stable_checkpoint_height)
1501 % era_epoch_count
1502 == 0
1503 {
1504 return storage_conf.keep_snapshot_before_stable_checkpoint;
1505 }
1506
1507 if storage_conf.keep_era_genesis_snapshot {
1508 let era_genesis_snapshot_height =
1509 if stable_checkpoint_height
1510 >= storage_conf.consensus_param.era_epoch_count
1511 {
1512 stable_checkpoint_height
1513 - storage_conf.consensus_param.era_epoch_count
1514 } else {
1515 0
1516 };
1517
1518 if era_genesis_snapshot_height == height {
1519 return true;
1520 }
1521 }
1522 }
1523 ProvideExtraSnapshotSyncConfig::EpochNearestMultipleOf(
1524 multiple,
1525 ) => {
1526 if *find_epoch_nearest_multiple_of
1527 && height % (*multiple as u64) == 0
1528 {
1529 *find_epoch_nearest_multiple_of = false;
1530 return true;
1531 }
1532 }
1533 }
1534 }
1535 false
1536}
1537
1538struct MaybeDeltaTrieDestroyErrors {
1539 delta_trie_destroy_error_1: Cell<Option<Error>>,
1540 delta_trie_destroy_error_2: Cell<Option<Error>>,
1541}
1542
1543unsafe impl Sync for MaybeDeltaTrieDestroyErrors {}
1545
1546impl MaybeDeltaTrieDestroyErrors {
1547 fn new() -> Self {
1548 Self {
1549 delta_trie_destroy_error_1: Cell::new(None),
1550 delta_trie_destroy_error_2: Cell::new(None),
1551 }
1552 }
1553
1554 fn set_maybe_error(&self, e: Option<Error>) {
1555 self.delta_trie_destroy_error_2
1556 .replace(self.delta_trie_destroy_error_1.replace(e));
1557 }
1558
1559 fn take_result(&self) -> Result<()> {
1560 let e1 = self.delta_trie_destroy_error_1.take().map(|e| Box::new(e));
1561 let e2 = self.delta_trie_destroy_error_2.take().map(|e| Box::new(e));
1562 if e1.is_some() || e2.is_some() {
1563 Err(Error::DeltaMPTDestroyErrors { e1, e2 }.into())
1564 } else {
1565 Ok(())
1566 }
1567 }
1568}
1569
1570lazy_static! {
1571 static ref SNAPSHOT_KVDB_STATEMENTS: Arc<KvdbSqliteStatements> = Arc::new(
1572 KvdbSqliteStatements::make_statements(
1573 &["value"],
1574 &["BLOB"],
1575 &storage_dir::SNAPSHOT_INFO_DB_NAME,
1576 false
1577 )
1578 .unwrap()
1579 );
1580}
1581
1582use crate::{
1583 impls::{
1584 delta_mpt::{
1585 node_memory_manager::{
1586 DeltaMptsCacheAlgorithm, DeltaMptsNodeMemoryManager,
1587 },
1588 node_ref_map::DeltaMptId,
1589 },
1590 errors::*,
1591 state_manager::{DeltaDbManager, SnapshotDb, SnapshotDbManager},
1592 storage_db::{
1593 kvdb_sqlite::{
1594 kvdb_sqlite_iter_range_impl, KvdbSqliteDestructureTrait,
1595 KvdbSqliteStatements,
1596 },
1597 snapshot_kv_db_sqlite::test_lib::check_key_value_load,
1598 },
1599 storage_manager::snapshot_manager::SnapshotManager,
1600 },
1601 snapshot_manager::SnapshotManagerTrait,
1602 storage_db::{
1603 DeltaDbManagerTrait, KeyValueDbIterableTrait, SnapshotDbManagerTrait,
1604 SnapshotInfo, SnapshotKeptToProvideSyncStatus,
1605 },
1606 storage_dir,
1607 utils::guarded_value::GuardedValue,
1608 DeltaMpt, DeltaMptIdGen, DeltaMptIterator, KeyValueDbTrait, KvdbSqlite,
1609 OpenDeltaDbLru, ProvideExtraSnapshotSyncConfig, StateIndex,
1610 StateRootWithAuxInfo, StorageConfiguration,
1611};
1612use cfx_internal_common::{
1613 consensus_api::StateMaintenanceTrait, StateAvailabilityBoundary,
1614};
1615use fallible_iterator::FallibleIterator;
1616use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
1617use parking_lot::{Mutex, RwLock, RwLockReadGuard};
1618use primitives::{EpochId, MerkleHash, MERKLE_NULL_NODE, NULL_EPOCH};
1619use rlp::{Decodable, DecoderError, Encodable, Rlp};
1620use sqlite::Statement;
1621use std::{
1622 cell::Cell,
1623 collections::{HashMap, HashSet},
1624 fs,
1625 sync::{
1626 mpsc::{channel, Sender},
1627 Arc, Weak,
1628 },
1629 thread::{self, JoinHandle},
1630};