// cfxcore/block_data_manager/tx_data_manager.rs
use crate::{
    sync::request_manager::tx_handler::TransactionCacheContainer,
    WORKER_COMPUTATION_PARALLELISM,
};
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use metrics::{register_queue, Queue};
use parking_lot::{Mutex, RwLock};
use primitives::{
    block::CompactBlock, Block, SignedTransaction, TransactionWithSignature,
};
use rlp::DecoderError;
use std::{
    sync::{mpsc::channel, Arc},
    time::Duration,
};
use threadpool::ThreadPool;
// Metrics queue registered under the name "recover_public_key_queue".
// In this file it is fed by `recover_uncached_tx`: a count is enqueued when a
// chunk of transactions is handed to the worker pool and a count is dequeued
// when that chunk's results are received.
lazy_static! {
    static ref RECOVER_PUB_KEY_QUEUE: Arc<dyn Queue> =
        register_queue("recover_public_key_queue");
}
/// Recovers the public keys of signed transactions and caches the resulting
/// `SignedTransaction`s so later lookups (by hash or by compact-block short
/// id) can skip the recovery step.
pub struct TransactionDataManager {
    /// Cache of already-recovered transactions. Read-locked for lookups and
    /// write-locked when newly recovered transactions are appended.
    tx_time_window: RwLock<TransactionCacheContainer>,
    /// Shared thread pool used to recover public keys in parallel when a
    /// batch is large enough.
    worker_pool: Arc<Mutex<ThreadPool>>,
}
impl MallocSizeOf for TransactionDataManager {
    /// Reports the heap usage of the transaction cache; the worker pool is
    /// not included in the measurement.
    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
        let cache = self.tx_time_window.read();
        cache.size_of(ops)
    }
}
impl TransactionDataManager {
    pub fn new(
        tx_cache_index_maintain_timeout: Duration,
        worker_pool: Arc<Mutex<ThreadPool>>,
    ) -> Self {
        Self {
            tx_time_window: RwLock::new(TransactionCacheContainer::new(
                tx_cache_index_maintain_timeout.as_secs(),
            )),
            worker_pool,
        }
    }
    /// Recovers the public keys of the given transactions that are not yet
    /// present in the cache.
    ///
    /// Transactions whose hash is already cached are skipped and do NOT
    /// appear in the returned vector; only freshly recovered transactions
    /// are returned (and cached by `recover_uncached_tx`).
    ///
    /// # Errors
    /// Propagates the `DecoderError` from `recover_uncached_tx` when a public
    /// key cannot be recovered.
    pub fn recover_unsigned_tx(
        &self, transactions: &Vec<TransactionWithSignature>,
    ) -> Result<Vec<Arc<SignedTransaction>>, DecoderError> {
        let mut pending = Vec::new();
        {
            // Hold the read lock only while filtering against the cache.
            let cache = self.tx_time_window.read();
            for tx in transactions {
                let tx_hash = tx.hash();
                let already_cached = cache.contains_key(&tx_hash);
                // Log only hashes whose first byte is 0 or 1.
                if tx_hash[0] & 254 == 0 {
                    debug!("Sampled transaction {:?} in tx pool", tx_hash);
                }
                if !already_cached {
                    // The position is irrelevant here, so every entry is
                    // tagged with index 0 and the index is dropped below.
                    pending.push((0, tx.clone()));
                }
            }
        }

        let recovered = self.recover_uncached_tx(pending)?;
        Ok(recovered.into_iter().map(|(_, tx)| tx).collect())
    }
    /// Ensures every transaction in `block` carries a recovered public key,
    /// replacing `block.transactions` in place while keeping their order.
    ///
    /// For each transaction: if its public key is already set it is kept
    /// as is; otherwise the cache is consulted by hash; otherwise the key is
    /// recovered through `recover_uncached_tx`.
    ///
    /// # Errors
    /// Propagates the `DecoderError` from `recover_uncached_tx`; in that
    /// case `block` is left unmodified.
    pub fn recover_block(&self, block: &mut Block) -> Result<(), DecoderError> {
        // `slots[i]` holds the recovered form of the i-th transaction, or
        // `None` while it is still waiting for recovery.
        let mut slots = Vec::new();
        let mut pending = Vec::new();
        {
            let cache = self.tx_time_window.read();
            for (position, signed) in block.transactions.iter().enumerate() {
                let slot = if signed.public.is_some() {
                    Some(signed.clone())
                } else {
                    let tx_hash = signed.hash();
                    // Log only hashes whose first byte is 0 or 1.
                    if tx_hash[0] & 254 == 0 {
                        debug!("Sampled transaction {:?} in block", tx_hash);
                    }
                    let cached = cache.get(&tx_hash).map(|tx| tx.clone());
                    if cached.is_none() {
                        pending.push((position, signed.transaction.clone()));
                    }
                    cached
                };
                slots.push(slot);
            }
        }

        // Fill the gaps with freshly recovered transactions.
        for (position, tx) in self.recover_uncached_tx(pending)? {
            slots[position] = Some(tx);
        }

        block.transactions = slots
            .into_iter()
            .map(|slot| slot.expect("All tx recovered"))
            .collect();
        Ok(())
    }
    /// Recovers the public keys of all given transactions and returns the
    /// signed transactions in the same order as the input.
    ///
    /// Transactions found in the cache (by hash) are reused; the rest are
    /// recovered through `recover_uncached_tx` and written back to their
    /// original positions.
    ///
    /// # Errors
    /// Propagates the `DecoderError` from `recover_uncached_tx` when a public
    /// key cannot be recovered.
    pub fn recover_unsigned_tx_with_order(
        &self, transactions: &Vec<TransactionWithSignature>,
    ) -> Result<Vec<Arc<SignedTransaction>>, DecoderError> {
        let mut pending = Vec::new();
        // One slot per input transaction: `Some` for cache hits, `None` for
        // entries still waiting for recovery.
        let mut slots: Vec<_> = {
            let cache = self.tx_time_window.read();
            transactions
                .iter()
                .enumerate()
                .map(|(position, unsigned)| {
                    let tx_hash = unsigned.hash();
                    // Log only hashes whose first byte is 0 or 1.
                    if tx_hash[0] & 254 == 0 {
                        debug!("Sampled transaction {:?} in block", tx_hash);
                    }
                    let cached = cache.get(&tx_hash).map(|tx| tx.clone());
                    if cached.is_none() {
                        pending.push((position, unsigned.clone()));
                    }
                    cached
                })
                .collect()
        };

        for (position, tx) in self.recover_uncached_tx(pending)? {
            slots[position] = Some(tx);
        }

        let ordered = slots
            .into_iter()
            .map(|slot| slot.expect("All tx recovered"))
            .collect();
        Ok(ordered)
    }
    fn recover_uncached_tx(
        &self, uncached_trans: Vec<(usize, TransactionWithSignature)>,
    ) -> Result<Vec<(usize, Arc<SignedTransaction>)>, DecoderError> {
        let mut recovered_trans = Vec::new();
        if uncached_trans.len() < WORKER_COMPUTATION_PARALLELISM * 8 {
            for (idx, tx) in uncached_trans {
                if let Ok(public) = tx.recover_public() {
                    recovered_trans.push((
                        idx,
                        Arc::new(SignedTransaction::new(public, tx.clone())),
                    ));
                } else {
                    info!(
                        "Unable to recover the public key of transaction {:?}",
                        tx.hash()
                    );
                    return Err(DecoderError::Custom(
                        "Cannot recover public key",
                    ));
                }
            }
        } else {
            let tx_num = uncached_trans.len();
            let tx_num_per_worker = tx_num / WORKER_COMPUTATION_PARALLELISM;
            let mut remainder =
                tx_num - (tx_num_per_worker * WORKER_COMPUTATION_PARALLELISM);
            let mut start_idx = 0;
            let mut end_idx = 0;
            let mut unsigned_trans = Vec::new();
            for tx in uncached_trans {
                if start_idx == end_idx {
                    end_idx = start_idx + tx_num_per_worker;
                    if remainder > 0 {
                        end_idx += 1;
                        remainder -= 1;
                    }
                    let unsigned_txns = Vec::new();
                    unsigned_trans.push(unsigned_txns);
                }
                unsigned_trans.last_mut().unwrap().push(tx);
                start_idx += 1;
            }
            let (sender, receiver) = channel();
            let n_thread = unsigned_trans.len();
            for unsigned_txns in unsigned_trans {
                RECOVER_PUB_KEY_QUEUE.enqueue(unsigned_txns.len());
                let sender = sender.clone();
                self.worker_pool.lock().execute(move || {
                    let mut signed_txns = Vec::new();
                    for (idx, tx) in unsigned_txns {
                        if let Ok(public) = tx.recover_public() {
                            signed_txns.push((idx, Arc::new(SignedTransaction::new(
                                public,
                                tx.clone(),
                            ))));
                        } else {
                            info!(
                                "Unable to recover the public key of transaction {:?}",
                                tx.hash()
                            );
                            break;
                        }
                    }
                    sender.send(signed_txns).unwrap();
                });
            }
            let mut total_recovered_num = 0 as usize;
            for tx_publics in receiver.iter().take(n_thread) {
                RECOVER_PUB_KEY_QUEUE.dequeue(tx_publics.len());
                total_recovered_num += tx_publics.len();
                for (idx, tx) in tx_publics {
                    recovered_trans.push((idx, tx));
                }
            }
            if total_recovered_num != tx_num {
                return Err(DecoderError::Custom("Cannot recover public key"));
            }
        }
        let mut tx_time_window = self.tx_time_window.write();
        tx_time_window.append_transactions(&recovered_trans.clone());
        Ok(recovered_trans)
    }
    /// Tries to reconstruct the transactions of `compact_block` from the
    /// cache using its short ids, and reports which ones are missing.
    ///
    /// `compact_block.reconstructed_txns` is resized to the block's length
    /// and every transaction found in the cache is stored at its position.
    ///
    /// Returns the missing positions in gap-encoded form: each entry is the
    /// distance from the position right after the previous missing index
    /// (or from 0 for the first one) to the current missing index.
    pub fn find_missing_tx_indices_encoded(
        &self, compact_block: &mut CompactBlock,
    ) -> Vec<usize> {
        let txn_count = compact_block.len();
        compact_block.reconstructed_txns.resize(txn_count, None);

        let (random_parts, fixed_parts) =
            compact_block.get_decomposed_short_ids();
        let (k0, k1) = CompactBlock::get_shortid_key(
            &compact_block.block_header,
            &compact_block.nonce,
        );

        let mut missing_encoded = Vec::new();
        // Position immediately after the last missing index seen so far.
        let mut next_base = 0;
        {
            let cache = self.tx_time_window.read();
            for i in 0..fixed_parts.len() {
                if let Some(tx) = cache.get_transaction(
                    fixed_parts[i],
                    random_parts[i],
                    k0,
                    k1,
                ) {
                    compact_block.reconstructed_txns[i] = Some(tx.clone());
                } else {
                    // Indices are visited in increasing order, so the gap
                    // can be emitted directly while scanning.
                    missing_encoded.push(i - next_base);
                    next_base = i + 1;
                }
            }
        }
        missing_encoded
    }
}