1use crate::{
9 change_set::ChangeSet, event_store::EventStore, ledger_store::LedgerStore,
10 schema::transaction_accumulator::TransactionAccumulatorSchema,
11 state_store::StateStore, transaction_store::TransactionStore, PosLedgerDB,
12};
13use anyhow::{ensure, Result};
14use diem_crypto::{hash::SPARSE_MERKLE_PLACEHOLDER_HASH, HashValue};
15use diem_jellyfish_merkle::restore::JellyfishMerkleRestore;
16use diem_types::{
17 account_state_blob::AccountStateBlob,
18 contract_event::ContractEvent,
19 ledger_info::LedgerInfoWithSignatures,
20 proof::{definition::LeafCount, position::FrozenSubTreeIterator},
21 transaction::{Transaction, TransactionInfo, Version, PRE_GENESIS_VERSION},
22};
23use schemadb::DB;
24use std::sync::Arc;
25use storage_interface::{DbReader, TreeState};
26
27#[derive(Clone)]
29pub struct RestoreHandler {
30 db: Arc<DB>,
31 pub diemdb: Arc<PosLedgerDB>,
32 ledger_store: Arc<LedgerStore>,
33 transaction_store: Arc<TransactionStore>,
34 state_store: Arc<StateStore>,
35 event_store: Arc<EventStore>,
36}
37
38impl RestoreHandler {
39 pub(crate) fn new(
40 db: Arc<DB>, diemdb: Arc<PosLedgerDB>, ledger_store: Arc<LedgerStore>,
41 transaction_store: Arc<TransactionStore>, state_store: Arc<StateStore>,
42 event_store: Arc<EventStore>,
43 ) -> Self {
44 Self {
45 db,
46 diemdb,
47 ledger_store,
48 transaction_store,
49 state_store,
50 event_store,
51 }
52 }
53
54 pub fn get_state_restore_receiver(
55 &self, version: Version, expected_root_hash: HashValue,
56 ) -> Result<JellyfishMerkleRestore<AccountStateBlob>> {
57 JellyfishMerkleRestore::new_overwrite(
58 Arc::clone(&self.state_store),
59 version,
60 expected_root_hash,
61 )
62 }
63
64 pub fn save_ledger_infos(
65 &self, ledger_infos: &[LedgerInfoWithSignatures],
66 ) -> Result<()> {
67 ensure!(!ledger_infos.is_empty(), "No LedgerInfos to save.");
68
69 let mut cs = ChangeSet::new();
70 ledger_infos
71 .iter()
72 .map(|li| self.ledger_store.put_ledger_info(li, &mut cs))
73 .collect::<Result<Vec<_>>>()?;
74 self.db.write_schemas(cs.batch, false)?;
75
76 if let Some(li) = self.ledger_store.get_latest_ledger_info_option() {
77 if li.ledger_info().epoch()
78 > ledger_infos.last().unwrap().ledger_info().epoch()
79 {
80 return Ok(());
82 }
83 }
84
85 self.ledger_store
86 .set_latest_ledger_info(ledger_infos.last().unwrap().clone());
87 Ok(())
88 }
89
90 pub fn confirm_or_save_frozen_subtrees(
91 &self, num_leaves: LeafCount, frozen_subtrees: &[HashValue],
92 ) -> Result<()> {
93 let mut cs = ChangeSet::new();
94 let positions: Vec<_> =
95 FrozenSubTreeIterator::new(num_leaves).collect();
96
97 ensure!(
98 positions.len() == frozen_subtrees.len(),
99 "Number of frozen subtree roots not expected. Expected: {}, actual: {}",
100 positions.len(),
101 frozen_subtrees.len(),
102 );
103
104 positions
105 .iter()
106 .zip(frozen_subtrees.iter().rev())
107 .map(|(p, h)| {
108 if let Some(_h) = self.db.get::<TransactionAccumulatorSchema>(&p)? {
109 ensure!(
110 h == &_h,
111 "Frozen subtree root does not match that already in DB. Provided: {}, in db: {}.",
112 h,
113 _h,
114 );
115 } else {
116 cs.batch.put::<TransactionAccumulatorSchema>(p, h)?;
117 }
118 Ok(())
119 })
120 .collect::<Result<Vec<_>>>()?;
121 self.db.write_schemas(cs.batch, false)
122 }
123
124 pub fn save_transactions(
125 &self, first_version: Version, txns: &[Transaction],
126 txn_infos: &[TransactionInfo], events: &[Vec<ContractEvent>],
127 ) -> Result<()> {
128 let mut cs = ChangeSet::new();
129 for (idx, txn) in txns.iter().enumerate() {
130 self.transaction_store.put_transaction(
131 first_version + idx as Version,
132 txn,
133 &mut cs,
134 )?;
135 }
136 self.ledger_store.put_transaction_infos(
137 first_version,
138 txn_infos,
139 &mut cs,
140 )?;
141 self.event_store.put_events_multiple_versions(
142 first_version,
143 events,
144 &mut cs,
145 )?;
146
147 self.db.write_schemas(cs.batch, false)
148 }
149
150 pub fn get_tree_state(
151 &self, num_transactions: LeafCount,
152 ) -> Result<TreeState> {
153 let frozen_subtrees = self
154 .ledger_store
155 .get_frozen_subtree_hashes(num_transactions)?;
156 let state_root_hash = if num_transactions == 0 {
157 self.state_store
158 .get_root_hash_option(PRE_GENESIS_VERSION)?
159 .unwrap_or(*SPARSE_MERKLE_PLACEHOLDER_HASH)
160 } else {
161 self.state_store.get_root_hash(num_transactions - 1)?
162 };
163
164 Ok(TreeState::new(
165 num_transactions,
166 frozen_subtrees,
167 state_root_hash,
168 ))
169 }
170
171 pub fn get_next_expected_transaction_version(&self) -> Result<Version> {
172 Ok(self
173 .diemdb
174 .get_latest_transaction_info_option()?
175 .map_or(0, |(ver, _txn_info)| ver + 1))
176 }
177}