storage_interface/
lib.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use anyhow::{format_err, Result};
9use diem_crypto::{hash::SPARSE_MERKLE_PLACEHOLDER_HASH, HashValue};
10use diem_types::{
11    access_path::AccessPath,
12    account_address::AccountAddress,
13    account_state::AccountState,
14    account_state_blob::{AccountStateBlob, AccountStateWithProof},
15    committed_block::CommittedBlock,
16    contract_event::ContractEvent,
17    epoch_change::EpochChangeProof,
18    epoch_state::EpochState,
19    ledger_info::{
20        deserialize_ledger_info_unchecked, LedgerInfoWithSignatures,
21    },
22    move_resource::MoveStorage,
23    proof::{
24        definition::LeafCount, AccumulatorConsistencyProof, SparseMerkleProof,
25    },
26    reward_distribution_event::RewardDistributionEventV2,
27    term_state::PosState,
28    transaction::{
29        TransactionInfo, TransactionListWithProof, TransactionToCommit,
30        TransactionWithProof, Version,
31    },
32};
33use itertools::Itertools;
34use serde::{Deserialize, Serialize};
35use std::{
36    collections::{HashMap, HashSet},
37    convert::TryFrom,
38    sync::Arc,
39};
40use thiserror::Error;
41
42// #[cfg(any(feature = "testing", feature = "fuzzing"))]
43pub mod mock;
44pub mod state_view;
45
46#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
47pub struct StartupInfo {
48    /// The latest ledger info.
49    /// This struct is only used locally, so loaded signatures must be valid.
50    #[serde(deserialize_with = "deserialize_ledger_info_unchecked")]
51    pub latest_ledger_info: LedgerInfoWithSignatures,
52    /// If the above ledger info doesn't carry a validator set, the latest
53    /// validator set. Otherwise `None`.
54    pub latest_epoch_state: Option<EpochState>,
55    pub committed_tree_state: TreeState,
56    pub synced_tree_state: Option<TreeState>,
57
58    pub committed_pos_state: PosState,
59}
60
61impl StartupInfo {
62    pub fn new(
63        latest_ledger_info: LedgerInfoWithSignatures,
64        latest_epoch_state: Option<EpochState>,
65        committed_tree_state: TreeState, synced_tree_state: Option<TreeState>,
66        committed_pos_state: PosState,
67    ) -> Self {
68        Self {
69            latest_ledger_info,
70            latest_epoch_state,
71            committed_tree_state,
72            synced_tree_state,
73            committed_pos_state,
74        }
75    }
76
77    #[cfg(any(feature = "fuzzing"))]
78    pub fn new_for_testing() -> Self {
79        use diem_types::on_chain_config::ValidatorSet;
80
81        let latest_ledger_info = LedgerInfoWithSignatures::genesis(
82            HashValue::zero(),
83            ValidatorSet::empty(),
84        );
85        let latest_epoch_state = None;
86        let committed_tree_state = TreeState {
87            num_transactions: 0,
88            ledger_frozen_subtree_hashes: Vec::new(),
89            account_state_root_hash: *SPARSE_MERKLE_PLACEHOLDER_HASH,
90        };
91        let synced_tree_state = None;
92
93        Self {
94            latest_ledger_info,
95            latest_epoch_state,
96            committed_tree_state,
97            synced_tree_state,
98        }
99    }
100
101    pub fn get_epoch_state(&self) -> &EpochState {
102        self.latest_ledger_info
103            .ledger_info()
104            .next_epoch_state()
105            .unwrap_or_else(|| {
106                self.latest_epoch_state
107                    .as_ref()
108                    .expect("EpochState must exist")
109            })
110    }
111}
112
113#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
114pub struct TreeState {
115    pub num_transactions: LeafCount,
116    pub ledger_frozen_subtree_hashes: Vec<HashValue>,
117    pub account_state_root_hash: HashValue,
118}
119
120impl TreeState {
121    pub fn new(
122        num_transactions: LeafCount,
123        ledger_frozen_subtree_hashes: Vec<HashValue>,
124        account_state_root_hash: HashValue,
125    ) -> Self {
126        Self {
127            num_transactions,
128            ledger_frozen_subtree_hashes,
129            account_state_root_hash,
130        }
131    }
132
133    pub fn describe(&self) -> &'static str {
134        if self.num_transactions != 0 {
135            "DB has been bootstrapped."
136        } else if self.account_state_root_hash
137            != *SPARSE_MERKLE_PLACEHOLDER_HASH
138        {
139            "DB has no transaction, but a non-empty pre-genesis state."
140        } else {
141            "DB is empty, has no transaction or state."
142        }
143    }
144}
145
146#[derive(Debug, Deserialize, Error, PartialEq, Serialize)]
147pub enum Error {
148    #[error("Service error: {:?}", error)]
149    ServiceError { error: String },
150
151    #[error("Serialization error: {0}")]
152    SerializationError(String),
153}
154
155impl From<anyhow::Error> for Error {
156    fn from(error: anyhow::Error) -> Self {
157        Self::ServiceError {
158            error: format!("{}", error),
159        }
160    }
161}
162
163impl From<bcs::Error> for Error {
164    fn from(error: bcs::Error) -> Self {
165        Self::SerializationError(format!("{}", error))
166    }
167}
168
169impl From<diem_secure_net::Error> for Error {
170    fn from(error: diem_secure_net::Error) -> Self {
171        Self::ServiceError {
172            error: format!("{}", error),
173        }
174    }
175}
176
/// Direction selector: ascending or descending.
// NOTE(review): not used within this file; presumably consumed by query
// APIs elsewhere in the project.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum Order {
    /// Ascending order.
    Ascending,
    /// Descending order.
    Descending,
}
182
183/// Trait that is implemented by a DB that supports certain public (to client)
184/// read APIs expected of a Diem DB
185pub trait DbReader: Send + Sync {
186    /// See [`DiemDB::get_epoch_ending_ledger_infos`].
187    ///
188    /// [`DiemDB::get_epoch_ending_ledger_infos`]:
189    /// ../pos-ledger-db/struct.DiemDB.html#method.get_epoch_ending_ledger_infos
190    fn get_epoch_ending_ledger_infos(
191        &self, start_epoch: u64, end_epoch: u64,
192    ) -> Result<EpochChangeProof>;
193
194    /// See [`DiemDB::get_transactions`].
195    ///
196    /// [`DiemDB::get_transactions`]:
197    /// ../pos-ledger-db/struct.DiemDB.html#method.get_transactions
198    fn get_transactions(
199        &self, start_version: Version, batch_size: u64,
200        ledger_version: Version, fetch_events: bool,
201    ) -> Result<TransactionListWithProof>;
202
203    /// See [`DiemDB::get_block_timestamp`].
204    ///
205    /// [`DiemDB::get_block_timestamp`]:
206    /// ../pos-ledger-db/struct.DiemDB.html#method.get_block_timestamp
207    fn get_block_timestamp(&self, version: u64) -> Result<u64>;
208
209    /// Gets the version of the last transaction committed before timestamp,
210    /// a committed block at or after the required timestamp must exist
211    /// (otherwise it's possible the next block committed as a timestamp
212    /// smaller than the one in the request).
213    fn get_last_version_before_timestamp(
214        &self, _timestamp: u64, _ledger_version: Version,
215    ) -> Result<Version> {
216        unimplemented!()
217    }
218
219    /// See [`DiemDB::get_latest_account_state`].
220    ///
221    /// [`DiemDB::get_latest_account_state`]:
222    /// ../pos-ledger-db/struct.DiemDB.html#method.get_latest_account_state
223    fn get_latest_account_state(
224        &self, address: AccountAddress,
225    ) -> Result<Option<AccountStateBlob>>;
226
227    /// Returns the latest ledger info.
228    fn get_latest_ledger_info(&self) -> Result<LedgerInfoWithSignatures>;
229
230    /// Returns the latest ledger info.
231    fn get_latest_version(&self) -> Result<Version> {
232        Ok(self.get_latest_ledger_info()?.ledger_info().version())
233    }
234
235    /// Returns the latest version and committed block timestamp
236    fn get_latest_commit_metadata(&self) -> Result<(Version, u64)> {
237        let ledger_info_with_sig = self.get_latest_ledger_info()?;
238        let ledger_info = ledger_info_with_sig.ledger_info();
239        Ok((ledger_info.version(), ledger_info.timestamp_usecs()))
240    }
241
242    /// Gets information needed from storage during the main node startup.
243    /// See [`DiemDB::get_startup_info`].
244    ///
245    /// [`DiemDB::get_startup_info`]:
246    /// ../pos-ledger-db/struct.DiemDB.html#method.get_startup_info
247    fn get_startup_info(
248        &self, need_pos_state: bool,
249    ) -> Result<Option<StartupInfo>>;
250
251    fn get_txn_by_account(
252        &self, address: AccountAddress, seq_num: u64, ledger_version: Version,
253        fetch_events: bool,
254    ) -> Result<Option<TransactionWithProof>>;
255
256    /// Returns proof of new state for a given ledger info with signatures
257    /// relative to version known to client
258    fn get_state_proof_with_ledger_info(
259        &self, known_version: u64, ledger_info: LedgerInfoWithSignatures,
260    ) -> Result<(EpochChangeProof, AccumulatorConsistencyProof)>;
261
262    /// Returns proof of new state relative to version known to client
263    fn get_state_proof(
264        &self, known_version: u64,
265    ) -> Result<(
266        LedgerInfoWithSignatures,
267        EpochChangeProof,
268        AccumulatorConsistencyProof,
269    )>;
270
271    /// Returns the account state corresponding to the given version and account
272    /// address with proof based on `ledger_version`
273    fn get_account_state_with_proof(
274        &self, address: AccountAddress, version: Version,
275        ledger_version: Version,
276    ) -> Result<AccountStateWithProof>;
277
278    // Gets an account state by account address, out of the ledger state
279    // indicated by the state Merkle tree root with a sparse merkle proof
280    // proving state tree root. See [`DiemDB::
281    // get_account_state_with_proof_by_version`].
282    //
283    // [`DiemDB::get_account_state_with_proof_by_version`]:
284    // ../pos-ledger-db/struct.DiemDB.html#method.
285    // get_account_state_with_proof_by_version
286    //
287    // This is used by diem core (executor) internally.
288    fn get_account_state_with_proof_by_version(
289        &self, address: AccountAddress, version: Version,
290    ) -> Result<(
291        Option<AccountStateBlob>,
292        SparseMerkleProof<AccountStateBlob>,
293    )>;
294
295    /// See [`DiemDB::get_latest_state_root`].
296    ///
297    /// [`DiemDB::get_latest_state_root`]:
298    /// ../pos-ledger-db/struct.DiemDB.html#method.get_latest_state_root
299    fn get_latest_state_root(&self) -> Result<(Version, HashValue)>;
300
301    /// Gets the latest TreeState no matter if db has been bootstrapped.
302    /// Used by the Db-bootstrapper.
303    fn get_latest_tree_state(&self) -> Result<TreeState>;
304
305    /// Get the ledger info of the epoch that `known_version` belongs to.
306    fn get_epoch_ending_ledger_info(
307        &self, known_version: u64,
308    ) -> Result<LedgerInfoWithSignatures>;
309
310    /// Gets the latest transaction info.
311    /// N.B. Unlike get_startup_info(), even if the db is not bootstrapped, this
312    /// can return `Some` -- those from a db-restore run.
313    fn get_latest_transaction_info_option(
314        &self,
315    ) -> Result<Option<(Version, TransactionInfo)>> {
316        unimplemented!()
317    }
318
319    /// Gets the transaction accumulator root hash at specified version.
320    /// Caller must guarantee the version is not greater than the latest
321    /// version.
322    fn get_accumulator_root_hash(
323        &self, _version: Version,
324    ) -> Result<HashValue> {
325        unimplemented!()
326    }
327
328    fn get_pos_state(&self, _block_id: &HashValue) -> Result<PosState> {
329        unimplemented!()
330    }
331
332    fn get_latest_pos_state(&self) -> Arc<PosState> { unimplemented!() }
333}
334
335impl MoveStorage for &dyn DbReader {
336    fn batch_fetch_resources(
337        &self, access_paths: Vec<AccessPath>,
338    ) -> Result<Vec<Vec<u8>>> {
339        self.batch_fetch_resources_by_version(
340            access_paths,
341            self.fetch_synced_version()?,
342        )
343    }
344
345    fn batch_fetch_resources_by_version(
346        &self, access_paths: Vec<AccessPath>, version: Version,
347    ) -> Result<Vec<Vec<u8>>> {
348        let addresses: Vec<AccountAddress> = access_paths
349            .iter()
350            .collect::<HashSet<_>>()
351            .iter()
352            .map(|path| path.address)
353            .collect();
354
355        let results = addresses
356            .iter()
357            .map(|addr| {
358                self.get_account_state_with_proof_by_version(*addr, version)
359            })
360            .collect::<Result<Vec<_>>>()?;
361
362        // Account address --> AccountState
363        let account_states = addresses
364            .iter()
365            .zip_eq(results)
366            .map(|(addr, (blob, _proof))| {
367                let account_state = AccountState::try_from(&blob.ok_or_else(|| {
368                    format_err!("missing blob in account state/account does not exist")
369                })?)?;
370                Ok((addr, account_state))
371            })
372            .collect::<Result<HashMap<_, AccountState>>>()?;
373
374        access_paths
375            .iter()
376            .map(|path| {
377                Ok(account_states
378                    .get(&path.address)
379                    .ok_or_else(|| {
380                        format_err!(
381                            "missing account state for queried access path"
382                        )
383                    })?
384                    .get(&path.path)
385                    .ok_or_else(|| {
386                        format_err!("no value found in account state")
387                    })?
388                    .clone())
389            })
390            .collect()
391    }
392
393    fn fetch_synced_version(&self) -> Result<u64> {
394        let (synced_version, _) = self
395            .get_latest_transaction_info_option()
396            .map_err(|e| {
397                format_err!(
398                    "[MoveStorage] Failed fetching latest transaction info: {}",
399                    e
400                )
401            })?
402            .ok_or_else(|| {
403                format_err!("[MoveStorage] Latest transaction info not found.")
404            })?;
405        Ok(synced_version)
406    }
407}
408
409/// Trait that is implemented by a DB that supports certain public (to client)
410/// write APIs expected of a Diem DB. This adds write APIs to DbReader.
411pub trait DbWriter: Send + Sync {
412    /// Persist transactions. Called by the executor module when either syncing
413    /// nodes or committing blocks during normal operation.
414    /// See [`DiemDB::save_transactions`].
415    ///
416    /// [`DiemDB::save_transactions`]:
417    /// ../pos-ledger-db/struct.DiemDB.html#method.save_transactions
418    fn save_transactions(
419        &self, txns_to_commit: &[TransactionToCommit], first_version: Version,
420        ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
421        pos_state: Option<PosState>, committed_blocks: Vec<CommittedBlock>,
422        ledger_infos_with_voted_block: Vec<(
423            HashValue,
424            LedgerInfoWithSignatures,
425        )>,
426    ) -> Result<()>;
427
428    fn save_reward_event(
429        &self, epoch: u64, event: &RewardDistributionEventV2,
430    ) -> Result<()>;
431
432    fn delete_pos_state_by_block(&self, block_id: &HashValue) -> Result<()>;
433}
434
435#[derive(Clone)]
436pub struct DbReaderWriter {
437    pub reader: Arc<dyn DbReader>,
438    pub writer: Arc<dyn DbWriter>,
439}
440
441impl DbReaderWriter {
442    pub fn new<D: 'static + DbReader + DbWriter>(db: D) -> Self {
443        let reader = Arc::new(db);
444        let writer = Arc::clone(&reader);
445
446        Self { reader, writer }
447    }
448
449    pub fn from_arc<D: 'static + DbReader + DbWriter>(arc_db: Arc<D>) -> Self {
450        let reader = Arc::clone(&arc_db);
451        let writer = Arc::clone(&arc_db);
452
453        Self { reader, writer }
454    }
455
456    pub fn wrap<D: 'static + DbReader + DbWriter>(db: D) -> (Arc<D>, Self) {
457        let arc_db = Arc::new(db);
458        (Arc::clone(&arc_db), Self::from_arc(arc_db))
459    }
460}
461
462impl<D> From<D> for DbReaderWriter
463where D: 'static + DbReader + DbWriter
464{
465    fn from(db: D) -> Self { Self::new(db) }
466}
467
468/// Network types for storage service
469#[derive(Clone, Debug, Deserialize, Serialize)]
470pub enum StorageRequest {
471    GetAccountStateWithProofByVersionRequest(
472        Box<GetAccountStateWithProofByVersionRequest>,
473    ),
474    GetStartupInfoRequest,
475    SaveTransactionsRequest(Box<SaveTransactionsRequest>),
476}
477
478#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
479pub struct GetAccountStateWithProofByVersionRequest {
480    /// The access path to query with.
481    pub address: AccountAddress,
482
483    /// The version the query is based on.
484    pub version: Version,
485}
486
487impl GetAccountStateWithProofByVersionRequest {
488    /// Constructor.
489    pub fn new(address: AccountAddress, version: Version) -> Self {
490        Self { address, version }
491    }
492}
493
494#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
495pub struct SaveTransactionsRequest {
496    pub txns_to_commit: Vec<TransactionToCommit>,
497    pub first_version: Version,
498    pub ledger_info_with_signatures: Option<LedgerInfoWithSignatures>,
499}
500
501impl SaveTransactionsRequest {
502    /// Constructor.
503    pub fn new(
504        txns_to_commit: Vec<TransactionToCommit>, first_version: Version,
505        ledger_info_with_signatures: Option<LedgerInfoWithSignatures>,
506    ) -> Self {
507        SaveTransactionsRequest {
508            txns_to_commit,
509            first_version,
510            ledger_info_with_signatures,
511        }
512    }
513}
514
515pub trait DBReaderForPoW: Send + Sync + DbReader {
516    fn get_latest_ledger_info_option(&self)
517        -> Option<LedgerInfoWithSignatures>;
518
519    /// TODO(lpl): It's possible to use round number?
520    fn get_block_ledger_info(
521        &self, consensus_block_id: &HashValue,
522    ) -> Result<LedgerInfoWithSignatures>;
523
524    fn get_events_by_version(
525        &self, start_version: u64, end_version: u64,
526    ) -> Result<Vec<ContractEvent>>;
527
528    fn get_epoch_ending_blocks(
529        &self, start_epoch: u64, end_epoch: u64,
530    ) -> Result<Vec<HashValue>>;
531
532    fn get_reward_event(&self, epoch: u64)
533        -> Result<RewardDistributionEventV2>;
534
535    fn get_committed_block_by_hash(
536        &self, block_hash: &HashValue,
537    ) -> Result<CommittedBlock>;
538
539    fn get_committed_block_hash_by_view(&self, view: u64) -> Result<HashValue>;
540
541    fn get_ledger_info_by_voted_block(
542        &self, block_id: &HashValue,
543    ) -> Result<LedgerInfoWithSignatures>;
544
545    fn get_block_hash_by_epoch_and_round(
546        &self, epoch: u64, round: u64,
547    ) -> Result<HashValue>;
548}