1use anyhow::{format_err, Result};
9use diem_crypto::{hash::SPARSE_MERKLE_PLACEHOLDER_HASH, HashValue};
10use diem_types::{
11 access_path::AccessPath,
12 account_address::AccountAddress,
13 account_state::AccountState,
14 account_state_blob::{AccountStateBlob, AccountStateWithProof},
15 committed_block::CommittedBlock,
16 contract_event::ContractEvent,
17 epoch_change::EpochChangeProof,
18 epoch_state::EpochState,
19 ledger_info::{
20 deserialize_ledger_info_unchecked, LedgerInfoWithSignatures,
21 },
22 move_resource::MoveStorage,
23 proof::{
24 definition::LeafCount, AccumulatorConsistencyProof, SparseMerkleProof,
25 },
26 reward_distribution_event::RewardDistributionEventV2,
27 term_state::PosState,
28 transaction::{
29 TransactionInfo, TransactionListWithProof, TransactionToCommit,
30 TransactionWithProof, Version,
31 },
32};
33use itertools::Itertools;
34use serde::{Deserialize, Serialize};
35use std::{
36 collections::{HashMap, HashSet},
37 convert::TryFrom,
38 sync::Arc,
39};
40use thiserror::Error;
41
42pub mod mock;
44pub mod state_view;
45
46#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
47pub struct StartupInfo {
48 #[serde(deserialize_with = "deserialize_ledger_info_unchecked")]
51 pub latest_ledger_info: LedgerInfoWithSignatures,
52 pub latest_epoch_state: Option<EpochState>,
55 pub committed_tree_state: TreeState,
56 pub synced_tree_state: Option<TreeState>,
57
58 pub committed_pos_state: PosState,
59}
60
61impl StartupInfo {
62 pub fn new(
63 latest_ledger_info: LedgerInfoWithSignatures,
64 latest_epoch_state: Option<EpochState>,
65 committed_tree_state: TreeState, synced_tree_state: Option<TreeState>,
66 committed_pos_state: PosState,
67 ) -> Self {
68 Self {
69 latest_ledger_info,
70 latest_epoch_state,
71 committed_tree_state,
72 synced_tree_state,
73 committed_pos_state,
74 }
75 }
76
77 #[cfg(any(feature = "fuzzing"))]
78 pub fn new_for_testing() -> Self {
79 use diem_types::on_chain_config::ValidatorSet;
80
81 let latest_ledger_info = LedgerInfoWithSignatures::genesis(
82 HashValue::zero(),
83 ValidatorSet::empty(),
84 );
85 let latest_epoch_state = None;
86 let committed_tree_state = TreeState {
87 num_transactions: 0,
88 ledger_frozen_subtree_hashes: Vec::new(),
89 account_state_root_hash: *SPARSE_MERKLE_PLACEHOLDER_HASH,
90 };
91 let synced_tree_state = None;
92
93 Self {
94 latest_ledger_info,
95 latest_epoch_state,
96 committed_tree_state,
97 synced_tree_state,
98 }
99 }
100
101 pub fn get_epoch_state(&self) -> &EpochState {
102 self.latest_ledger_info
103 .ledger_info()
104 .next_epoch_state()
105 .unwrap_or_else(|| {
106 self.latest_epoch_state
107 .as_ref()
108 .expect("EpochState must exist")
109 })
110 }
111}
112
113#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
114pub struct TreeState {
115 pub num_transactions: LeafCount,
116 pub ledger_frozen_subtree_hashes: Vec<HashValue>,
117 pub account_state_root_hash: HashValue,
118}
119
120impl TreeState {
121 pub fn new(
122 num_transactions: LeafCount,
123 ledger_frozen_subtree_hashes: Vec<HashValue>,
124 account_state_root_hash: HashValue,
125 ) -> Self {
126 Self {
127 num_transactions,
128 ledger_frozen_subtree_hashes,
129 account_state_root_hash,
130 }
131 }
132
133 pub fn describe(&self) -> &'static str {
134 if self.num_transactions != 0 {
135 "DB has been bootstrapped."
136 } else if self.account_state_root_hash
137 != *SPARSE_MERKLE_PLACEHOLDER_HASH
138 {
139 "DB has no transaction, but a non-empty pre-genesis state."
140 } else {
141 "DB is empty, has no transaction or state."
142 }
143 }
144}
145
146#[derive(Debug, Deserialize, Error, PartialEq, Serialize)]
147pub enum Error {
148 #[error("Service error: {:?}", error)]
149 ServiceError { error: String },
150
151 #[error("Serialization error: {0}")]
152 SerializationError(String),
153}
154
155impl From<anyhow::Error> for Error {
156 fn from(error: anyhow::Error) -> Self {
157 Self::ServiceError {
158 error: format!("{}", error),
159 }
160 }
161}
162
163impl From<bcs::Error> for Error {
164 fn from(error: bcs::Error) -> Self {
165 Self::SerializationError(format!("{}", error))
166 }
167}
168
169impl From<diem_secure_net::Error> for Error {
170 fn from(error: diem_secure_net::Error) -> Self {
171 Self::ServiceError {
172 error: format!("{}", error),
173 }
174 }
175}
176
/// Direction selector with two variants, ascending and descending.
///
/// `Debug` is derived so values can be logged and used in `assert_eq!`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Order {
    /// Ascending direction.
    Ascending,
    /// Descending direction.
    Descending,
}
182
183pub trait DbReader: Send + Sync {
186 fn get_epoch_ending_ledger_infos(
191 &self, start_epoch: u64, end_epoch: u64,
192 ) -> Result<EpochChangeProof>;
193
194 fn get_transactions(
199 &self, start_version: Version, batch_size: u64,
200 ledger_version: Version, fetch_events: bool,
201 ) -> Result<TransactionListWithProof>;
202
203 fn get_block_timestamp(&self, version: u64) -> Result<u64>;
208
209 fn get_last_version_before_timestamp(
214 &self, _timestamp: u64, _ledger_version: Version,
215 ) -> Result<Version> {
216 unimplemented!()
217 }
218
219 fn get_latest_account_state(
224 &self, address: AccountAddress,
225 ) -> Result<Option<AccountStateBlob>>;
226
227 fn get_latest_ledger_info(&self) -> Result<LedgerInfoWithSignatures>;
229
230 fn get_latest_version(&self) -> Result<Version> {
232 Ok(self.get_latest_ledger_info()?.ledger_info().version())
233 }
234
235 fn get_latest_commit_metadata(&self) -> Result<(Version, u64)> {
237 let ledger_info_with_sig = self.get_latest_ledger_info()?;
238 let ledger_info = ledger_info_with_sig.ledger_info();
239 Ok((ledger_info.version(), ledger_info.timestamp_usecs()))
240 }
241
242 fn get_startup_info(
248 &self, need_pos_state: bool,
249 ) -> Result<Option<StartupInfo>>;
250
251 fn get_txn_by_account(
252 &self, address: AccountAddress, seq_num: u64, ledger_version: Version,
253 fetch_events: bool,
254 ) -> Result<Option<TransactionWithProof>>;
255
256 fn get_state_proof_with_ledger_info(
259 &self, known_version: u64, ledger_info: LedgerInfoWithSignatures,
260 ) -> Result<(EpochChangeProof, AccumulatorConsistencyProof)>;
261
262 fn get_state_proof(
264 &self, known_version: u64,
265 ) -> Result<(
266 LedgerInfoWithSignatures,
267 EpochChangeProof,
268 AccumulatorConsistencyProof,
269 )>;
270
271 fn get_account_state_with_proof(
274 &self, address: AccountAddress, version: Version,
275 ledger_version: Version,
276 ) -> Result<AccountStateWithProof>;
277
278 fn get_account_state_with_proof_by_version(
289 &self, address: AccountAddress, version: Version,
290 ) -> Result<(
291 Option<AccountStateBlob>,
292 SparseMerkleProof<AccountStateBlob>,
293 )>;
294
295 fn get_latest_state_root(&self) -> Result<(Version, HashValue)>;
300
301 fn get_latest_tree_state(&self) -> Result<TreeState>;
304
305 fn get_epoch_ending_ledger_info(
307 &self, known_version: u64,
308 ) -> Result<LedgerInfoWithSignatures>;
309
310 fn get_latest_transaction_info_option(
314 &self,
315 ) -> Result<Option<(Version, TransactionInfo)>> {
316 unimplemented!()
317 }
318
319 fn get_accumulator_root_hash(
323 &self, _version: Version,
324 ) -> Result<HashValue> {
325 unimplemented!()
326 }
327
328 fn get_pos_state(&self, _block_id: &HashValue) -> Result<PosState> {
329 unimplemented!()
330 }
331
332 fn get_latest_pos_state(&self) -> Arc<PosState> { unimplemented!() }
333}
334
335impl MoveStorage for &dyn DbReader {
336 fn batch_fetch_resources(
337 &self, access_paths: Vec<AccessPath>,
338 ) -> Result<Vec<Vec<u8>>> {
339 self.batch_fetch_resources_by_version(
340 access_paths,
341 self.fetch_synced_version()?,
342 )
343 }
344
345 fn batch_fetch_resources_by_version(
346 &self, access_paths: Vec<AccessPath>, version: Version,
347 ) -> Result<Vec<Vec<u8>>> {
348 let addresses: Vec<AccountAddress> = access_paths
349 .iter()
350 .collect::<HashSet<_>>()
351 .iter()
352 .map(|path| path.address)
353 .collect();
354
355 let results = addresses
356 .iter()
357 .map(|addr| {
358 self.get_account_state_with_proof_by_version(*addr, version)
359 })
360 .collect::<Result<Vec<_>>>()?;
361
362 let account_states = addresses
364 .iter()
365 .zip_eq(results)
366 .map(|(addr, (blob, _proof))| {
367 let account_state = AccountState::try_from(&blob.ok_or_else(|| {
368 format_err!("missing blob in account state/account does not exist")
369 })?)?;
370 Ok((addr, account_state))
371 })
372 .collect::<Result<HashMap<_, AccountState>>>()?;
373
374 access_paths
375 .iter()
376 .map(|path| {
377 Ok(account_states
378 .get(&path.address)
379 .ok_or_else(|| {
380 format_err!(
381 "missing account state for queried access path"
382 )
383 })?
384 .get(&path.path)
385 .ok_or_else(|| {
386 format_err!("no value found in account state")
387 })?
388 .clone())
389 })
390 .collect()
391 }
392
393 fn fetch_synced_version(&self) -> Result<u64> {
394 let (synced_version, _) = self
395 .get_latest_transaction_info_option()
396 .map_err(|e| {
397 format_err!(
398 "[MoveStorage] Failed fetching latest transaction info: {}",
399 e
400 )
401 })?
402 .ok_or_else(|| {
403 format_err!("[MoveStorage] Latest transaction info not found.")
404 })?;
405 Ok(synced_version)
406 }
407}
408
409pub trait DbWriter: Send + Sync {
412 fn save_transactions(
419 &self, txns_to_commit: &[TransactionToCommit], first_version: Version,
420 ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
421 pos_state: Option<PosState>, committed_blocks: Vec<CommittedBlock>,
422 ledger_infos_with_voted_block: Vec<(
423 HashValue,
424 LedgerInfoWithSignatures,
425 )>,
426 ) -> Result<()>;
427
428 fn save_reward_event(
429 &self, epoch: u64, event: &RewardDistributionEventV2,
430 ) -> Result<()>;
431
432 fn delete_pos_state_by_block(&self, block_id: &HashValue) -> Result<()>;
433}
434
435#[derive(Clone)]
436pub struct DbReaderWriter {
437 pub reader: Arc<dyn DbReader>,
438 pub writer: Arc<dyn DbWriter>,
439}
440
441impl DbReaderWriter {
442 pub fn new<D: 'static + DbReader + DbWriter>(db: D) -> Self {
443 let reader = Arc::new(db);
444 let writer = Arc::clone(&reader);
445
446 Self { reader, writer }
447 }
448
449 pub fn from_arc<D: 'static + DbReader + DbWriter>(arc_db: Arc<D>) -> Self {
450 let reader = Arc::clone(&arc_db);
451 let writer = Arc::clone(&arc_db);
452
453 Self { reader, writer }
454 }
455
456 pub fn wrap<D: 'static + DbReader + DbWriter>(db: D) -> (Arc<D>, Self) {
457 let arc_db = Arc::new(db);
458 (Arc::clone(&arc_db), Self::from_arc(arc_db))
459 }
460}
461
462impl<D> From<D> for DbReaderWriter
463where D: 'static + DbReader + DbWriter
464{
465 fn from(db: D) -> Self { Self::new(db) }
466}
467
468#[derive(Clone, Debug, Deserialize, Serialize)]
470pub enum StorageRequest {
471 GetAccountStateWithProofByVersionRequest(
472 Box<GetAccountStateWithProofByVersionRequest>,
473 ),
474 GetStartupInfoRequest,
475 SaveTransactionsRequest(Box<SaveTransactionsRequest>),
476}
477
478#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
479pub struct GetAccountStateWithProofByVersionRequest {
480 pub address: AccountAddress,
482
483 pub version: Version,
485}
486
487impl GetAccountStateWithProofByVersionRequest {
488 pub fn new(address: AccountAddress, version: Version) -> Self {
490 Self { address, version }
491 }
492}
493
494#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
495pub struct SaveTransactionsRequest {
496 pub txns_to_commit: Vec<TransactionToCommit>,
497 pub first_version: Version,
498 pub ledger_info_with_signatures: Option<LedgerInfoWithSignatures>,
499}
500
501impl SaveTransactionsRequest {
502 pub fn new(
504 txns_to_commit: Vec<TransactionToCommit>, first_version: Version,
505 ledger_info_with_signatures: Option<LedgerInfoWithSignatures>,
506 ) -> Self {
507 SaveTransactionsRequest {
508 txns_to_commit,
509 first_version,
510 ledger_info_with_signatures,
511 }
512 }
513}
514
515pub trait DBReaderForPoW: Send + Sync + DbReader {
516 fn get_latest_ledger_info_option(&self)
517 -> Option<LedgerInfoWithSignatures>;
518
519 fn get_block_ledger_info(
521 &self, consensus_block_id: &HashValue,
522 ) -> Result<LedgerInfoWithSignatures>;
523
524 fn get_events_by_version(
525 &self, start_version: u64, end_version: u64,
526 ) -> Result<Vec<ContractEvent>>;
527
528 fn get_epoch_ending_blocks(
529 &self, start_epoch: u64, end_epoch: u64,
530 ) -> Result<Vec<HashValue>>;
531
532 fn get_reward_event(&self, epoch: u64)
533 -> Result<RewardDistributionEventV2>;
534
535 fn get_committed_block_by_hash(
536 &self, block_hash: &HashValue,
537 ) -> Result<CommittedBlock>;
538
539 fn get_committed_block_hash_by_view(&self, view: u64) -> Result<HashValue>;
540
541 fn get_ledger_info_by_voted_block(
542 &self, block_id: &HashValue,
543 ) -> Result<LedgerInfoWithSignatures>;
544
545 fn get_block_hash_by_epoch_and_round(
546 &self, epoch: u64, round: u64,
547 ) -> Result<HashValue>;
548}