pos_ledger_db/backup/
backup_handler.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use crate::{
9    event_store::EventStore,
10    ledger_store::LedgerStore,
11    metrics::{
12        BACKUP_EPOCH_ENDING_EPOCH, BACKUP_STATE_SNAPSHOT_LEAF_IDX,
13        BACKUP_STATE_SNAPSHOT_VERSION, BACKUP_TXN_VERSION,
14    },
15    state_store::StateStore,
16    transaction_store::TransactionStore,
17};
18use anyhow::{anyhow, ensure, Result};
19use diem_crypto::hash::HashValue;
20use diem_jellyfish_merkle::iterator::JellyfishMerkleIterator;
21use diem_types::{
22    account_state_blob::AccountStateBlob,
23    contract_event::ContractEvent,
24    ledger_info::LedgerInfoWithSignatures,
25    proof::{
26        SparseMerkleRangeProof, TransactionAccumulatorRangeProof,
27        TransactionInfoWithProof,
28    },
29    transaction::{Transaction, TransactionInfo, Version},
30};
31use itertools::zip_eq;
32use serde::{Deserialize, Serialize};
33use std::{fmt, sync::Arc};
34
35/// `BackupHandler` provides functionalities for DiemDB data backup.
36#[derive(Clone)]
37pub struct BackupHandler {
38    ledger_store: Arc<LedgerStore>,
39    transaction_store: Arc<TransactionStore>,
40    state_store: Arc<StateStore>,
41    event_store: Arc<EventStore>,
42}
43
44impl BackupHandler {
45    pub(crate) fn new(
46        ledger_store: Arc<LedgerStore>,
47        transaction_store: Arc<TransactionStore>, state_store: Arc<StateStore>,
48        event_store: Arc<EventStore>,
49    ) -> Self {
50        Self {
51            ledger_store,
52            transaction_store,
53            state_store,
54            event_store,
55        }
56    }
57
58    /// Gets an iterator that yields a range of transactions.
59    pub fn get_transaction_iter(
60        &self, start_version: Version, num_transactions: usize,
61    ) -> Result<
62        impl Iterator<
63                Item = Result<(
64                    Transaction,
65                    TransactionInfo,
66                    Vec<ContractEvent>,
67                )>,
68            > + '_,
69    > {
70        let txn_iter = self
71            .transaction_store
72            .get_transaction_iter(start_version, num_transactions)?;
73        let txn_info_iter = self
74            .ledger_store
75            .get_transaction_info_iter(start_version, num_transactions)?;
76        let events_iter = self
77            .event_store
78            .get_events_by_version_iter(start_version, num_transactions)?;
79
80        let zipped = zip_eq(zip_eq(txn_iter, txn_info_iter), events_iter)
81            .enumerate()
82            .map(move |(idx, ((txn_res, txn_info_res), events_res))| {
83                BACKUP_TXN_VERSION
84                    .set((start_version.wrapping_add(idx as u64)) as i64);
85                Ok((txn_res?, txn_info_res?, events_res?))
86            });
87        Ok(zipped)
88    }
89
90    /// Gets the proof for a transaction chunk.
91    /// N.B. the `LedgerInfo` returned will always be in the same epoch of the
92    /// `last_version`.
93    pub fn get_transaction_range_proof(
94        &self, first_version: Version, last_version: Version,
95    ) -> Result<(TransactionAccumulatorRangeProof, LedgerInfoWithSignatures)>
96    {
97        ensure!(
98            last_version >= first_version,
99            "Bad transaction range: [{}, {}]",
100            first_version,
101            last_version
102        );
103        let num_transactions = last_version - first_version + 1;
104        let epoch = self.ledger_store.get_epoch(last_version)?;
105        let ledger_info =
106            self.ledger_store.get_latest_ledger_info_in_epoch(epoch)?;
107        let accumulator_proof = self.ledger_store.get_transaction_range_proof(
108            Some(first_version),
109            num_transactions,
110            ledger_info.ledger_info().version(),
111        )?;
112        Ok((accumulator_proof, ledger_info))
113    }
114
115    /// Gets an iterator which can yield all accounts in the state tree.
116    pub fn get_account_iter(
117        &self, version: Version,
118    ) -> Result<
119        Box<
120            dyn Iterator<Item = Result<(HashValue, AccountStateBlob)>>
121                + Send
122                + Sync,
123        >,
124    > {
125        let iterator = JellyfishMerkleIterator::new(
126            Arc::clone(&self.state_store),
127            version,
128            HashValue::zero(),
129        )?
130        .enumerate()
131        .map(move |(idx, res)| {
132            BACKUP_STATE_SNAPSHOT_VERSION.set(version as i64);
133            BACKUP_STATE_SNAPSHOT_LEAF_IDX.set(idx as i64);
134            res
135        });
136        Ok(Box::new(iterator))
137    }
138
139    /// Gets the proof that proves a range of accounts.
140    pub fn get_account_state_range_proof(
141        &self, rightmost_key: HashValue, version: Version,
142    ) -> Result<SparseMerkleRangeProof> {
143        self.state_store
144            .get_account_state_range_proof(rightmost_key, version)
145    }
146
147    /// Gets the epoch, committed version, and synced version of the DB.
148    pub fn get_db_state(&self) -> Result<Option<DbState>> {
149        self.ledger_store
150            .get_startup_info(false)?
151            .map(|s| {
152                Ok(DbState {
153                    epoch: s.get_epoch_state().epoch,
154                    committed_version: s
155                        .committed_tree_state
156                        .num_transactions
157                        .checked_sub(1)
158                        .ok_or_else(|| {
159                            anyhow!("Bootstrapped DB has no transactions.")
160                        })?,
161                    synced_version: s
162                        .synced_tree_state
163                        .as_ref()
164                        .unwrap_or(&s.committed_tree_state)
165                        .num_transactions
166                        .checked_sub(1)
167                        .ok_or_else(|| {
168                            anyhow!("Bootstrapped DB has no transactions.")
169                        })?,
170                })
171            })
172            .transpose()
173    }
174
175    /// Gets the proof of the state root at specified version.
176    /// N.B. the `LedgerInfo` returned will always be in the same epoch of the
177    /// version.
178    pub fn get_state_root_proof(
179        &self, version: Version,
180    ) -> Result<(TransactionInfoWithProof, LedgerInfoWithSignatures)> {
181        let epoch = self.ledger_store.get_epoch(version)?;
182        let ledger_info =
183            self.ledger_store.get_latest_ledger_info_in_epoch(epoch)?;
184        let txn_info = self.ledger_store.get_transaction_info_with_proof(
185            version,
186            ledger_info.ledger_info().version(),
187        )?;
188
189        Ok((txn_info, ledger_info))
190    }
191
192    pub fn get_epoch_ending_ledger_info_iter(
193        &self, start_epoch: u64, end_epoch: u64,
194    ) -> Result<impl Iterator<Item = Result<LedgerInfoWithSignatures>> + '_>
195    {
196        Ok(self
197            .ledger_store
198            .get_epoch_ending_ledger_info_iter(start_epoch, end_epoch)?
199            .enumerate()
200            .map(move |(idx, li)| {
201                BACKUP_EPOCH_ENDING_EPOCH
202                    .set((start_epoch + idx as u64) as i64);
203                li
204            }))
205    }
206}
207
208#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
209pub struct DbState {
210    pub epoch: u64,
211    pub committed_version: Version,
212    pub synced_version: Version,
213}
214
215impl fmt::Display for DbState {
216    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217        write!(
218            f,
219            "epoch: {}, committed_version: {}, synced_version: {}",
220            self.epoch, self.committed_version, self.synced_version,
221        )
222    }
223}