1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// Copyright 2019 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/

use crate::sync::state::storage::{Chunk, ChunkKey};
use cfx_storage::{
    state_manager::StateManager,
    storage_db::{SnapshotDbManagerTrait, SnapshotInfo},
    FullSyncVerifier, Result as StorageResult, SnapshotDbManagerSqlite,
};
use primitives::{EpochId, MerkleHash, NULL_EPOCH};
use std::sync::Arc;

pub struct Restorer {
    pub snapshot_epoch_id: EpochId,
    pub snapshot_merkle_root: MerkleHash,

    /// The verifier for chunks.
    /// Initialized after receiving a valid manifest.
    verifier: Option<FullSyncVerifier<SnapshotDbManagerSqlite>>,
}

impl Restorer {
    pub fn new(
        snapshot_epoch_id: EpochId, snapshot_merkle_root: MerkleHash,
    ) -> Self {
        Restorer {
            snapshot_epoch_id,
            snapshot_merkle_root,
            verifier: None,
        }
    }

    pub fn initialize_verifier(
        &mut self, verifier: FullSyncVerifier<SnapshotDbManagerSqlite>,
    ) {
        self.verifier = Some(verifier);
    }

    /// Append a chunk for restoration.
    pub fn append(&mut self, key: ChunkKey, chunk: Chunk) -> bool {
        match &mut self.verifier {
            // Not waiting for chunks
            None => false,
            Some(verifier) => {
                match verifier.restore_chunk(
                    &key.upper_bound_excl,
                    &chunk.keys,
                    chunk.values,
                ) {
                    Ok(true) => true,
                    Ok(false) => false,
                    Err(e) => {
                        warn!("error for restore_chunk: err={:?}", e);
                        false
                    }
                }
            }
        }
    }

    /// Start to restore chunks asynchronously.
    pub fn finalize_restoration(
        &mut self, state_manager: Arc<StateManager>,
        snapshot_info: SnapshotInfo,
        parent_snapshot_info: Option<SnapshotInfo>,
        intermediate_trie_root_merkle: MerkleHash,
    ) -> StorageResult<()> {
        // Release temp snapshot db so it can be renamed on Windows.
        // `self.verifier()` is never unwrapped, so it's safe to set it to None,
        self.verifier = None;

        let storage_manager = state_manager.get_storage_manager_arc();

        let parent_snapshot = if let Some(snapshot_info) =
            parent_snapshot_info.clone()
        {
            let snapshot_epoch_id = snapshot_info.get_snapshot_epoch_id();
            if let Some(snapshot_info_in_db) =
                storage_manager.get_snapshot_info_at_epoch(snapshot_epoch_id)
            {
                if snapshot_info.merkle_root != snapshot_info_in_db.merkle_root
                    || snapshot_info.get_snapshot_epoch_id()
                        != snapshot_info_in_db.get_snapshot_epoch_id()
                {
                    debug!("destory snapshot {:?}", snapshot_epoch_id);
                    storage_manager
                        .get_snapshot_manager()
                        .get_snapshot_db_manager()
                        .destroy_snapshot(snapshot_epoch_id)?;
                    Some(snapshot_info)
                } else {
                    None
                }
            } else {
                Some(snapshot_info)
            }
        } else {
            None
        };

        let mut snapshot_info_map_locked = storage_manager
            .get_snapshot_manager()
            .get_snapshot_db_manager()
            .finalize_full_sync_snapshot(
                &self.snapshot_epoch_id,
                &self.snapshot_merkle_root,
                &storage_manager.snapshot_info_map_by_epoch,
            )?;

        if let Some(parent_snapshot) = parent_snapshot {
            storage_manager.register_new_snapshot(
                parent_snapshot,
                &mut snapshot_info_map_locked,
            )?;
        }

        let snapshot_height = snapshot_info.height;
        storage_manager.register_new_snapshot(
            snapshot_info,
            &mut snapshot_info_map_locked,
        )?;

        debug!(
            "intermediate_trie_root_merkle for next epoch in finalize_restoration {:?}",
            intermediate_trie_root_merkle
        );
        *storage_manager.intermediate_trie_root_merkle.write() =
            Some(intermediate_trie_root_merkle);

        // rewrite for special case
        *storage_manager.persist_state_from_initialization.write() = Some((
            None,
            std::collections::HashSet::from([NULL_EPOCH]),
            snapshot_height,
            None,
        ));

        debug!("Completed snapshot restoration.");
        Ok(())
    }
}