1use crate::{
9 event_store::EventStore,
10 ledger_store::LedgerStore,
11 metrics::{
12 BACKUP_EPOCH_ENDING_EPOCH, BACKUP_STATE_SNAPSHOT_LEAF_IDX,
13 BACKUP_STATE_SNAPSHOT_VERSION, BACKUP_TXN_VERSION,
14 },
15 state_store::StateStore,
16 transaction_store::TransactionStore,
17};
18use anyhow::{anyhow, ensure, Result};
19use diem_crypto::hash::HashValue;
20use diem_jellyfish_merkle::iterator::JellyfishMerkleIterator;
21use diem_types::{
22 account_state_blob::AccountStateBlob,
23 contract_event::ContractEvent,
24 ledger_info::LedgerInfoWithSignatures,
25 proof::{
26 SparseMerkleRangeProof, TransactionAccumulatorRangeProof,
27 TransactionInfoWithProof,
28 },
29 transaction::{Transaction, TransactionInfo, Version},
30};
31use itertools::zip_eq;
32use serde::{Deserialize, Serialize};
33use std::{fmt, sync::Arc};
34
35#[derive(Clone)]
37pub struct BackupHandler {
38 ledger_store: Arc<LedgerStore>,
39 transaction_store: Arc<TransactionStore>,
40 state_store: Arc<StateStore>,
41 event_store: Arc<EventStore>,
42}
43
44impl BackupHandler {
45 pub(crate) fn new(
46 ledger_store: Arc<LedgerStore>,
47 transaction_store: Arc<TransactionStore>, state_store: Arc<StateStore>,
48 event_store: Arc<EventStore>,
49 ) -> Self {
50 Self {
51 ledger_store,
52 transaction_store,
53 state_store,
54 event_store,
55 }
56 }
57
58 pub fn get_transaction_iter(
60 &self, start_version: Version, num_transactions: usize,
61 ) -> Result<
62 impl Iterator<
63 Item = Result<(
64 Transaction,
65 TransactionInfo,
66 Vec<ContractEvent>,
67 )>,
68 > + '_,
69 > {
70 let txn_iter = self
71 .transaction_store
72 .get_transaction_iter(start_version, num_transactions)?;
73 let txn_info_iter = self
74 .ledger_store
75 .get_transaction_info_iter(start_version, num_transactions)?;
76 let events_iter = self
77 .event_store
78 .get_events_by_version_iter(start_version, num_transactions)?;
79
80 let zipped = zip_eq(zip_eq(txn_iter, txn_info_iter), events_iter)
81 .enumerate()
82 .map(move |(idx, ((txn_res, txn_info_res), events_res))| {
83 BACKUP_TXN_VERSION
84 .set((start_version.wrapping_add(idx as u64)) as i64);
85 Ok((txn_res?, txn_info_res?, events_res?))
86 });
87 Ok(zipped)
88 }
89
90 pub fn get_transaction_range_proof(
94 &self, first_version: Version, last_version: Version,
95 ) -> Result<(TransactionAccumulatorRangeProof, LedgerInfoWithSignatures)>
96 {
97 ensure!(
98 last_version >= first_version,
99 "Bad transaction range: [{}, {}]",
100 first_version,
101 last_version
102 );
103 let num_transactions = last_version - first_version + 1;
104 let epoch = self.ledger_store.get_epoch(last_version)?;
105 let ledger_info =
106 self.ledger_store.get_latest_ledger_info_in_epoch(epoch)?;
107 let accumulator_proof = self.ledger_store.get_transaction_range_proof(
108 Some(first_version),
109 num_transactions,
110 ledger_info.ledger_info().version(),
111 )?;
112 Ok((accumulator_proof, ledger_info))
113 }
114
115 pub fn get_account_iter(
117 &self, version: Version,
118 ) -> Result<
119 Box<
120 dyn Iterator<Item = Result<(HashValue, AccountStateBlob)>>
121 + Send
122 + Sync,
123 >,
124 > {
125 let iterator = JellyfishMerkleIterator::new(
126 Arc::clone(&self.state_store),
127 version,
128 HashValue::zero(),
129 )?
130 .enumerate()
131 .map(move |(idx, res)| {
132 BACKUP_STATE_SNAPSHOT_VERSION.set(version as i64);
133 BACKUP_STATE_SNAPSHOT_LEAF_IDX.set(idx as i64);
134 res
135 });
136 Ok(Box::new(iterator))
137 }
138
139 pub fn get_account_state_range_proof(
141 &self, rightmost_key: HashValue, version: Version,
142 ) -> Result<SparseMerkleRangeProof> {
143 self.state_store
144 .get_account_state_range_proof(rightmost_key, version)
145 }
146
147 pub fn get_db_state(&self) -> Result<Option<DbState>> {
149 self.ledger_store
150 .get_startup_info(false)?
151 .map(|s| {
152 Ok(DbState {
153 epoch: s.get_epoch_state().epoch,
154 committed_version: s
155 .committed_tree_state
156 .num_transactions
157 .checked_sub(1)
158 .ok_or_else(|| {
159 anyhow!("Bootstrapped DB has no transactions.")
160 })?,
161 synced_version: s
162 .synced_tree_state
163 .as_ref()
164 .unwrap_or(&s.committed_tree_state)
165 .num_transactions
166 .checked_sub(1)
167 .ok_or_else(|| {
168 anyhow!("Bootstrapped DB has no transactions.")
169 })?,
170 })
171 })
172 .transpose()
173 }
174
175 pub fn get_state_root_proof(
179 &self, version: Version,
180 ) -> Result<(TransactionInfoWithProof, LedgerInfoWithSignatures)> {
181 let epoch = self.ledger_store.get_epoch(version)?;
182 let ledger_info =
183 self.ledger_store.get_latest_ledger_info_in_epoch(epoch)?;
184 let txn_info = self.ledger_store.get_transaction_info_with_proof(
185 version,
186 ledger_info.ledger_info().version(),
187 )?;
188
189 Ok((txn_info, ledger_info))
190 }
191
192 pub fn get_epoch_ending_ledger_info_iter(
193 &self, start_epoch: u64, end_epoch: u64,
194 ) -> Result<impl Iterator<Item = Result<LedgerInfoWithSignatures>> + '_>
195 {
196 Ok(self
197 .ledger_store
198 .get_epoch_ending_ledger_info_iter(start_epoch, end_epoch)?
199 .enumerate()
200 .map(move |(idx, li)| {
201 BACKUP_EPOCH_ENDING_EPOCH
202 .set((start_epoch + idx as u64) as i64);
203 li
204 }))
205 }
206}
207
208#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
209pub struct DbState {
210 pub epoch: u64,
211 pub committed_version: Version,
212 pub synced_version: Version,
213}
214
215impl fmt::Display for DbState {
216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217 write!(
218 f,
219 "epoch: {}, committed_version: {}, synced_version: {}",
220 self.epoch, self.committed_version, self.synced_version,
221 )
222 }
223}