use crate::{
    event_store::EventStore,
    ledger_store::LedgerStore,
    metrics::{
        BACKUP_EPOCH_ENDING_EPOCH, BACKUP_STATE_SNAPSHOT_LEAF_IDX,
        BACKUP_STATE_SNAPSHOT_VERSION, BACKUP_TXN_VERSION,
    },
    state_store::StateStore,
    transaction_store::TransactionStore,
};
use anyhow::{anyhow, ensure, Result};
use diem_crypto::hash::HashValue;
use diem_jellyfish_merkle::iterator::JellyfishMerkleIterator;
use diem_types::{
    account_state_blob::AccountStateBlob,
    contract_event::ContractEvent,
    ledger_info::LedgerInfoWithSignatures,
    proof::{
        SparseMerkleRangeProof, TransactionAccumulatorRangeProof,
        TransactionInfoWithProof,
    },
    transaction::{Transaction, TransactionInfo, Version},
};
use itertools::zip_eq;
use serde::{Deserialize, Serialize};
use std::{fmt, sync::Arc};
#[derive(Clone)]
pub struct BackupHandler {
    ledger_store: Arc<LedgerStore>,
    transaction_store: Arc<TransactionStore>,
    state_store: Arc<StateStore>,
    event_store: Arc<EventStore>,
}
impl BackupHandler {
    pub(crate) fn new(
        ledger_store: Arc<LedgerStore>,
        transaction_store: Arc<TransactionStore>, state_store: Arc<StateStore>,
        event_store: Arc<EventStore>,
    ) -> Self {
        Self {
            ledger_store,
            transaction_store,
            state_store,
            event_store,
        }
    }
    pub fn get_transaction_iter(
        &self, start_version: Version, num_transactions: usize,
    ) -> Result<
        impl Iterator<
                Item = Result<(
                    Transaction,
                    TransactionInfo,
                    Vec<ContractEvent>,
                )>,
            > + '_,
    > {
        let txn_iter = self
            .transaction_store
            .get_transaction_iter(start_version, num_transactions)?;
        let txn_info_iter = self
            .ledger_store
            .get_transaction_info_iter(start_version, num_transactions)?;
        let events_iter = self
            .event_store
            .get_events_by_version_iter(start_version, num_transactions)?;
        let zipped = zip_eq(zip_eq(txn_iter, txn_info_iter), events_iter)
            .enumerate()
            .map(move |(idx, ((txn_res, txn_info_res), events_res))| {
                BACKUP_TXN_VERSION
                    .set((start_version.wrapping_add(idx as u64)) as i64);
                Ok((txn_res?, txn_info_res?, events_res?))
            });
        Ok(zipped)
    }
    pub fn get_transaction_range_proof(
        &self, first_version: Version, last_version: Version,
    ) -> Result<(TransactionAccumulatorRangeProof, LedgerInfoWithSignatures)>
    {
        ensure!(
            last_version >= first_version,
            "Bad transaction range: [{}, {}]",
            first_version,
            last_version
        );
        let num_transactions = last_version - first_version + 1;
        let epoch = self.ledger_store.get_epoch(last_version)?;
        let ledger_info =
            self.ledger_store.get_latest_ledger_info_in_epoch(epoch)?;
        let accumulator_proof = self.ledger_store.get_transaction_range_proof(
            Some(first_version),
            num_transactions,
            ledger_info.ledger_info().version(),
        )?;
        Ok((accumulator_proof, ledger_info))
    }
    pub fn get_account_iter(
        &self, version: Version,
    ) -> Result<
        Box<
            dyn Iterator<Item = Result<(HashValue, AccountStateBlob)>>
                + Send
                + Sync,
        >,
    > {
        let iterator = JellyfishMerkleIterator::new(
            Arc::clone(&self.state_store),
            version,
            HashValue::zero(),
        )?
        .enumerate()
        .map(move |(idx, res)| {
            BACKUP_STATE_SNAPSHOT_VERSION.set(version as i64);
            BACKUP_STATE_SNAPSHOT_LEAF_IDX.set(idx as i64);
            res
        });
        Ok(Box::new(iterator))
    }
    pub fn get_account_state_range_proof(
        &self, rightmost_key: HashValue, version: Version,
    ) -> Result<SparseMerkleRangeProof> {
        self.state_store
            .get_account_state_range_proof(rightmost_key, version)
    }
    pub fn get_db_state(&self) -> Result<Option<DbState>> {
        self.ledger_store
            .get_startup_info(false)?
            .map(|s| {
                Ok(DbState {
                    epoch: s.get_epoch_state().epoch,
                    committed_version: s
                        .committed_tree_state
                        .num_transactions
                        .checked_sub(1)
                        .ok_or_else(|| {
                            anyhow!("Bootstrapped DB has no transactions.")
                        })?,
                    synced_version: s
                        .synced_tree_state
                        .as_ref()
                        .unwrap_or(&s.committed_tree_state)
                        .num_transactions
                        .checked_sub(1)
                        .ok_or_else(|| {
                            anyhow!("Bootstrapped DB has no transactions.")
                        })?,
                })
            })
            .transpose()
    }
    pub fn get_state_root_proof(
        &self, version: Version,
    ) -> Result<(TransactionInfoWithProof, LedgerInfoWithSignatures)> {
        let epoch = self.ledger_store.get_epoch(version)?;
        let ledger_info =
            self.ledger_store.get_latest_ledger_info_in_epoch(epoch)?;
        let txn_info = self.ledger_store.get_transaction_info_with_proof(
            version,
            ledger_info.ledger_info().version(),
        )?;
        Ok((txn_info, ledger_info))
    }
    pub fn get_epoch_ending_ledger_info_iter(
        &self, start_epoch: u64, end_epoch: u64,
    ) -> Result<impl Iterator<Item = Result<LedgerInfoWithSignatures>> + '_>
    {
        Ok(self
            .ledger_store
            .get_epoch_ending_ledger_info_iter(start_epoch, end_epoch)?
            .enumerate()
            .map(move |(idx, li)| {
                BACKUP_EPOCH_ENDING_EPOCH
                    .set((start_epoch + idx as u64) as i64);
                li
            }))
    }
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct DbState {
    pub epoch: u64,
    pub committed_version: Version,
    pub synced_version: Version,
}
impl fmt::Display for DbState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "epoch: {}, committed_version: {}, synced_version: {}",
            self.epoch, self.committed_version, self.synced_version,
        )
    }
}