use crate::{
    impls::errors::*, state::StateTrait, MptKeyValue, NodeMerkleProof,
    StateProof, StorageStateTraitExt,
};
use cfx_internal_common::StateRootWithAuxInfo;
use cfx_types::Space;
use parking_lot::Mutex;
use primitives::{
    EpochId, NodeMerkleTriplet, StaticBool, StorageKey, StorageKeyWithSpace,
};
use std::{
    sync::mpsc::{channel, Sender},
    thread::{self, JoinHandle},
};
/// A state wrapper that pairs a primary state with a replication handler.
///
/// Reads are answered by `state` alone. Mutating operations (`set`,
/// `delete`, `delete_all`, `compute_state_root`, `commit`) are applied to
/// `state` and are also handed to `replication_handler`, which forwards them
/// to a second state living on a worker thread.
pub struct ReplicatedState<Main> {
    /// The primary state; its results are what callers receive.
    state: Main,
    /// Forwards mutating operations to the replicated state.
    replication_handler: ReplicationHandler,
}
/// Predicate used to decide which keyed operations are replicated.
///
/// Implementors must be `Sync + Send` because the filter is stored inside
/// the replication handler.
pub trait StateFilter: Sync + Send {
    /// Returns `true` when operations on `_key` should be forwarded to the
    /// replicated state, `false` when they should be dropped.
    fn keep_key(&self, _key: &StorageKeyWithSpace) -> bool;
}
/// A `Space` used as a filter keeps exactly the keys that live in it.
impl StateFilter for Space {
    /// Returns `true` when `key` belongs to this space.
    fn keep_key(&self, key: &StorageKeyWithSpace) -> bool {
        *self == key.space
    }
}
impl<Main: StateTrait> ReplicatedState<Main> {
    pub fn new<Replicate: StateTrait + Send + 'static>(
        main_state: Main, replicated_state: Replicate,
        filter: Option<Box<dyn StateFilter>>,
    ) -> ReplicatedState<Main> {
        let replication_handler =
            ReplicationHandler::new(replicated_state, filter);
        Self {
            state: main_state,
            replication_handler,
        }
    }
}
/// Owns the channel to, and the join handle of, the worker thread that
/// applies operations to the replicated state.
struct ReplicationHandler {
    /// Optional filter; when present, keyed operations whose key it rejects
    /// are not sent to the worker.
    filter: Option<Box<dyn StateFilter>>,
    /// Sending half of the operation channel, guarded by a mutex.
    op_sender: Mutex<Sender<StateOperation>>,
    /// Join handle of the worker thread. It is `Some` after construction and
    /// is taken (leaving `None`) when `ReplicatedState::commit` joins the
    /// worker.
    thread_handle: Option<JoinHandle<Result<()>>>,
}
impl ReplicationHandler {
    fn new<Replicate: StateTrait + Send + 'static>(
        mut replicated_state: Replicate, filter: Option<Box<dyn StateFilter>>,
    ) -> ReplicationHandler {
        let (op_tx, op_rx) = channel();
        let thread_handle = thread::Builder::new()
            .name("state_replication".into())
            .spawn(move || {
                for op in op_rx {
                    trace!("replicated_state: op={:?}", op);
                    let err = match op {
                        StateOperation::Set { access_key, value } => {
                            replicated_state
                                .set(access_key.as_storage_key(), value)
                                .err()
                        }
                        StateOperation::Delete { access_key } => {
                            replicated_state
                                .delete(access_key.as_storage_key())
                                .err()
                        }
                        StateOperation::DeleteAll { access_key_prefix } => {
                            replicated_state
                                .delete_all(access_key_prefix.as_storage_key())
                                .err()
                        }
                        StateOperation::ComputeStateRoot => {
                            replicated_state.compute_state_root().err()
                        }
                        StateOperation::Commit { epoch_id } => {
                            return replicated_state
                                .commit(epoch_id)
                                .map(|_| ());
                        }
                    };
                    if let Some(e) = err {
                        error!("StateReplication Error: err={:?}", e);
                        return Err(e);
                    }
                }
                Ok(())
            })
            .expect("spawn error");
        Self {
            filter,
            op_sender: Mutex::new(op_tx),
            thread_handle: Some(thread_handle),
        }
    }
    fn send_op(&self, op: StateOperation) {
        if let Some(filter) = &self.filter {
            if let Some(key) = op.get_key() {
                if !filter.keep_key(&key) {
                    return;
                }
            }
        }
        if let Err(e) = self.op_sender.lock().send(op) {
            error!("send_op: err={:?}", e);
        }
    }
}
/// A mutating state operation queued for the replication worker thread.
///
/// Keys are stored in owned form so the operation can be moved across the
/// channel independently of the caller's borrowed key.
#[derive(Debug)]
enum StateOperation {
    /// Replayed on the replica as `set(access_key, value)`.
    Set {
        access_key: OwnedStorageKeyWithSpace,
        value: Box<[u8]>,
    },
    /// Replayed on the replica as `delete(access_key)`.
    Delete {
        access_key: OwnedStorageKeyWithSpace,
    },
    /// Replayed on the replica as `delete_all(access_key_prefix)`.
    DeleteAll {
        access_key_prefix: OwnedStorageKeyWithSpace,
    },
    /// Replayed on the replica as `compute_state_root()`.
    ComputeStateRoot,
    /// Replayed on the replica as `commit(epoch_id)`; the worker thread
    /// returns after handling it.
    Commit {
        epoch_id: EpochId,
    },
}
impl StateOperation {
    /// Returns a borrowed view of the key (or key prefix) this operation
    /// targets, or `None` for the keyless `ComputeStateRoot` and `Commit`.
    fn get_key(&self) -> Option<StorageKeyWithSpace> {
        let owned_key = match self {
            StateOperation::Set { access_key, .. } => access_key,
            StateOperation::Delete { access_key } => access_key,
            StateOperation::DeleteAll { access_key_prefix } => {
                access_key_prefix
            }
            StateOperation::ComputeStateRoot
            | StateOperation::Commit { .. } => return None,
        };
        Some(owned_key.as_storage_key())
    }
}
/// Owned counterpart of `StorageKey`: the same variants, with every byte
/// slice held in a `Vec<u8>` so the key does not borrow from its source.
#[derive(Debug)]
enum OwnedStorageKey {
    AccountKey(Vec<u8>),
    StorageRootKey(Vec<u8>),
    StorageKey {
        address_bytes: Vec<u8>,
        storage_key: Vec<u8>,
    },
    CodeRootKey(Vec<u8>),
    CodeKey {
        address_bytes: Vec<u8>,
        code_hash_bytes: Vec<u8>,
    },
    DepositListKey(Vec<u8>),
    VoteListKey(Vec<u8>),
}
impl OwnedStorageKey {
    /// Borrows this owned key as the matching `StorageKey` variant.
    ///
    /// No bytes are copied; every slice in the result points into `self`.
    fn as_storage_key(&self) -> StorageKey {
        match self {
            Self::AccountKey(bytes) => StorageKey::AccountKey(&bytes[..]),
            Self::StorageRootKey(bytes) => {
                StorageKey::StorageRootKey(&bytes[..])
            }
            Self::StorageKey {
                address_bytes,
                storage_key,
            } => StorageKey::StorageKey {
                address_bytes: &address_bytes[..],
                storage_key: &storage_key[..],
            },
            Self::CodeRootKey(bytes) => StorageKey::CodeRootKey(&bytes[..]),
            Self::CodeKey {
                address_bytes,
                code_hash_bytes,
            } => StorageKey::CodeKey {
                address_bytes: &address_bytes[..],
                code_hash_bytes: &code_hash_bytes[..],
            },
            Self::DepositListKey(bytes) => {
                StorageKey::DepositListKey(&bytes[..])
            }
            Self::VoteListKey(bytes) => StorageKey::VoteListKey(&bytes[..]),
        }
    }
}
/// Owned counterpart of `StorageKeyWithSpace`: an owned key plus the space
/// it belongs to.
#[derive(Debug)]
struct OwnedStorageKeyWithSpace {
    /// The key, with its bytes owned.
    pub key: OwnedStorageKey,
    /// The space the key belongs to.
    pub space: Space,
}
impl OwnedStorageKeyWithSpace {
    /// Borrows this value as a `StorageKeyWithSpace`: the key is borrowed
    /// from `self`, the space is copied.
    fn as_storage_key(&self) -> StorageKeyWithSpace {
        let Self { key, space } = self;
        StorageKeyWithSpace {
            key: key.as_storage_key(),
            space: *space,
        }
    }
}
impl<'a> From<StorageKey<'a>> for OwnedStorageKey {
    fn from(ref_key: StorageKey<'a>) -> Self {
        match ref_key {
            StorageKey::AccountKey(k) => {
                OwnedStorageKey::AccountKey(k.to_vec())
            }
            StorageKey::StorageRootKey(k) => {
                OwnedStorageKey::StorageRootKey(k.to_vec())
            }
            StorageKey::StorageKey {
                address_bytes,
                storage_key,
            } => OwnedStorageKey::StorageKey {
                address_bytes: address_bytes.to_vec(),
                storage_key: storage_key.to_vec(),
            },
            StorageKey::CodeRootKey(k) => {
                OwnedStorageKey::CodeRootKey(k.to_vec())
            }
            StorageKey::CodeKey {
                address_bytes,
                code_hash_bytes,
            } => OwnedStorageKey::CodeKey {
                address_bytes: address_bytes.to_vec(),
                code_hash_bytes: code_hash_bytes.to_vec(),
            },
            StorageKey::DepositListKey(k) => {
                OwnedStorageKey::DepositListKey(k.to_vec())
            }
            StorageKey::VoteListKey(k) => {
                OwnedStorageKey::VoteListKey(k.to_vec())
            }
        }
    }
}
impl<'a> From<StorageKeyWithSpace<'a>> for OwnedStorageKeyWithSpace {
    fn from(ref_key: StorageKeyWithSpace<'a>) -> Self {
        Self {
            key: ref_key.key.into(),
            space: ref_key.space,
        }
    }
}
/// `StateTrait` for the replicating wrapper.
///
/// Read-only methods delegate to the primary state. Mutating methods first
/// queue the equivalent operation for the replica and then apply it to the
/// primary state, whose result is returned to the caller.
impl<Main: StateTrait> StateTrait for ReplicatedState<Main> {
    /// Reads `access_key` from the primary state only.
    fn get(
        &self, access_key: StorageKeyWithSpace,
    ) -> Result<Option<Box<[u8]>>> {
        self.state.get(access_key)
    }

    /// Queues the write for the replica, then writes to the primary state.
    fn set(
        &mut self, access_key: StorageKeyWithSpace, value: Box<[u8]>,
    ) -> Result<()> {
        self.replication_handler.send_op(StateOperation::Set {
            access_key: access_key.into(),
            // The replica needs its own copy; the original goes to the
            // primary state below.
            value: value.clone(),
        });
        self.state.set(access_key, value)
    }

    /// Queues the delete for the replica, then deletes from the primary
    /// state.
    fn delete(&mut self, access_key: StorageKeyWithSpace) -> Result<()> {
        self.replication_handler.send_op(StateOperation::Delete {
            access_key: access_key.into(),
        });
        self.state.delete(access_key)
    }

    /// Not implemented for the replicating wrapper.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn delete_test_only(
        &mut self, _access_key: StorageKeyWithSpace,
    ) -> Result<Option<Box<[u8]>>> {
        todo!()
    }

    /// Queues the prefix delete for the replica, then performs it on the
    /// primary state and returns the primary state's result.
    fn delete_all(
        &mut self, access_key_prefix: StorageKeyWithSpace,
    ) -> Result<Option<Vec<MptKeyValue>>> {
        self.replication_handler.send_op(StateOperation::DeleteAll {
            access_key_prefix: access_key_prefix.into(),
        });
        self.state.delete_all(access_key_prefix)
    }

    /// Reads everything under `access_key_prefix` from the primary state
    /// only; nothing is sent to the replica.
    fn read_all(
        &mut self, access_key_prefix: StorageKeyWithSpace,
    ) -> Result<Option<Vec<MptKeyValue>>> {
        self.state.read_all(access_key_prefix)
    }

    /// Queues a state-root computation for the replica, then computes and
    /// returns the primary state's root.
    fn compute_state_root(&mut self) -> Result<StateRootWithAuxInfo> {
        self.replication_handler
            .send_op(StateOperation::ComputeStateRoot);
        self.state.compute_state_root()
    }

    /// Returns the primary state's root.
    fn get_state_root(&self) -> Result<StateRootWithAuxInfo> {
        self.state.get_state_root()
    }

    /// Commits both states and waits for the replica to finish.
    ///
    /// The `Commit` operation is queued for the replica *before* the primary
    /// commit starts, so the worker thread can commit the replica while the
    /// primary commits on the calling thread. This matches the send-first
    /// order used by every other mutating method.
    ///
    /// # Errors
    ///
    /// If the worker thread ended with an error (from the replica's commit
    /// or any earlier replicated operation), that error is returned.
    /// Otherwise the primary commit's result is returned.
    ///
    /// # Panics
    ///
    /// Panics if called more than once ("only commit once") or if the
    /// worker thread itself panicked.
    fn commit(&mut self, epoch_id: EpochId) -> Result<StateRootWithAuxInfo> {
        self.replication_handler
            .send_op(StateOperation::Commit { epoch_id });
        let r = self.state.commit(epoch_id);
        // Wait for the worker and surface any replication error before
        // reporting the primary result.
        self.replication_handler
            .thread_handle
            .take()
            .expect("only commit once")
            .join()
            .expect("ReplicationHandler thread join error")?;
        r
    }
}
/// Proof-returning queries are answered by the primary state alone; the
/// replica is not involved.
impl<Main: StorageStateTraitExt> StorageStateTraitExt
    for ReplicatedState<Main>
{
    /// Delegates to the primary state's `get_with_proof`.
    fn get_with_proof(
        &self, access_key: StorageKeyWithSpace,
    ) -> Result<(Option<Box<[u8]>>, StateProof)> {
        <Main as StorageStateTraitExt>::get_with_proof(
            &self.state,
            access_key,
        )
    }

    /// Delegates to the primary state's `get_node_merkle_all_versions`,
    /// forwarding the `WithProof` type parameter unchanged.
    fn get_node_merkle_all_versions<WithProof: StaticBool>(
        &self, access_key: StorageKeyWithSpace,
    ) -> Result<(NodeMerkleTriplet, NodeMerkleProof)> {
        <Main as StorageStateTraitExt>::get_node_merkle_all_versions::<
            WithProof,
        >(&self.state, access_key)
    }
}