cfxcore/block_data_manager/
tx_data_manager.rs

1use crate::{
2    sync::request_manager::tx_handler::TransactionCacheContainer,
3    WORKER_COMPUTATION_PARALLELISM,
4};
5use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
6use metrics::{register_queue, Queue};
7use parking_lot::{Mutex, RwLock};
8use primitives::{
9    block::CompactBlock, Block, SignedTransaction, TransactionWithSignature,
10};
11use rlp::DecoderError;
12use std::{
13    sync::{mpsc::channel, Arc},
14    time::Duration,
15};
16use threadpool::ThreadPool;
17
// Queue metric registered under the name "recover_public_key_queue".
// `TransactionDataManager::recover_uncached_tx` enqueues the size of every
// batch handed to the worker pool and dequeues as results come back.
lazy_static! {
    static ref RECOVER_PUB_KEY_QUEUE: Arc<dyn Queue> =
        register_queue("recover_public_key_queue");
}
22
/// Recovers the public keys of incoming transactions and keeps the recovered
/// transactions in a cache so that they do not have to be recovered again.
pub struct TransactionDataManager {
    /// Cache of already recovered transactions. It is read before a recovery
    /// to skip known transactions and written after a successful recovery.
    tx_time_window: RwLock<TransactionCacheContainer>,
    /// Thread pool used to recover the public keys of large batches.
    worker_pool: Arc<Mutex<ThreadPool>>,
}
27
28impl MallocSizeOf for TransactionDataManager {
29    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
30        self.tx_time_window.read().size_of(ops)
31    }
32}
33
34impl TransactionDataManager {
35    pub fn new(
36        tx_cache_index_maintain_timeout: Duration,
37        worker_pool: Arc<Mutex<ThreadPool>>,
38    ) -> Self {
39        Self {
40            tx_time_window: RwLock::new(TransactionCacheContainer::new(
41                tx_cache_index_maintain_timeout.as_secs(),
42            )),
43            worker_pool,
44        }
45    }
46
47    /// Recover the public keys for uncached transactions in `transactions`.
48    /// If a tx is already in the cache, it will be ignored and not included in
49    /// the output vec.
50    pub fn recover_unsigned_tx(
51        &self, transactions: &Vec<TransactionWithSignature>,
52    ) -> Result<Vec<Arc<SignedTransaction>>, DecoderError> {
53        let uncached_trans = {
54            let tx_time_window = self.tx_time_window.read();
55            transactions
56                .iter()
57                .filter(|tx| {
58                    let tx_hash = tx.hash();
59                    let inserted = tx_time_window.contains_key(&tx_hash);
60                    // Sample 1/128 transactions
61                    if tx_hash[0] & 254 == 0 {
62                        debug!("Sampled transaction {:?} in tx pool", tx_hash);
63                    }
64                    !inserted
65                })
66                .map(|tx| (0, tx.clone())) // idx not used
67                .collect()
68        };
69        // Ignore the index and return the recovered tx list
70        self.recover_uncached_tx(uncached_trans)
71            .map(|tx_vec| tx_vec.into_iter().map(|(_, tx)| tx).collect())
72    }
73
74    /// Recover public keys for the transactions in `block`.
75    ///
76    /// The public keys already in input transactions will be used directly
77    /// without verification. `block` will not be updated if any error is
78    /// thrown.
79    pub fn recover_block(&self, block: &mut Block) -> Result<(), DecoderError> {
80        let (uncached_trans, mut recovered_trans) = {
81            let tx_time_window = self.tx_time_window.read();
82            let mut uncached_trans = Vec::new();
83            let mut recovered_trans = Vec::new();
84            for (idx, transaction) in block.transactions.iter().enumerate() {
85                if transaction.public.is_some() {
86                    // This may only happen for `GetBlocksWithPublicResponse`
87                    // for now.
88                    recovered_trans.push(Some(transaction.clone()));
89                    continue;
90                }
91                let tx_hash = transaction.hash();
92                // Sample 1/128 transactions
93                if tx_hash[0] & 254 == 0 {
94                    debug!("Sampled transaction {:?} in block", tx_hash);
95                }
96                match tx_time_window.get(&tx_hash) {
97                    Some(tx) => recovered_trans.push(Some(tx.clone())),
98                    None => {
99                        uncached_trans
100                            .push((idx, transaction.transaction.clone()));
101                        recovered_trans.push(None);
102                    }
103                }
104            }
105            (uncached_trans, recovered_trans)
106        };
107        for (idx, tx) in self.recover_uncached_tx(uncached_trans)? {
108            recovered_trans[idx] = Some(tx);
109        }
110        block.transactions = recovered_trans
111            .into_iter()
112            .map(|e| e.expect("All tx recovered"))
113            .collect();
114        Ok(())
115    }
116
117    pub fn recover_unsigned_tx_with_order(
118        &self, transactions: &Vec<TransactionWithSignature>,
119    ) -> Result<Vec<Arc<SignedTransaction>>, DecoderError> {
120        let (uncached_trans, mut recovered_trans) = {
121            let tx_time_window = self.tx_time_window.read();
122            let mut uncached_trans = Vec::new();
123            let mut recovered_trans = Vec::new();
124            for (idx, transaction) in transactions.iter().enumerate() {
125                let tx_hash = transaction.hash();
126                // Sample 1/128 transactions
127                if tx_hash[0] & 254 == 0 {
128                    debug!("Sampled transaction {:?} in block", tx_hash);
129                }
130                match tx_time_window.get(&tx_hash) {
131                    Some(tx) => recovered_trans.push(Some(tx.clone())),
132                    None => {
133                        uncached_trans.push((idx, transaction.clone()));
134                        recovered_trans.push(None);
135                    }
136                }
137            }
138            (uncached_trans, recovered_trans)
139        };
140        for (idx, tx) in self.recover_uncached_tx(uncached_trans)? {
141            recovered_trans[idx] = Some(tx);
142        }
143        Ok(recovered_trans
144            .into_iter()
145            .map(|e| e.expect("All tx recovered"))
146            .collect())
147    }
148
149    /// Recover public key for `uncached_trans` and keep the corresponding index
150    /// unchanged.
151    ///
152    /// Note that we release `tx_time_window` lock during pubkey recovery to
153    /// allow more parallelism, but we may recover a tx twice if it is
154    /// received again before the recovery finishes.
155    fn recover_uncached_tx(
156        &self, uncached_trans: Vec<(usize, TransactionWithSignature)>,
157    ) -> Result<Vec<(usize, Arc<SignedTransaction>)>, DecoderError> {
158        let mut recovered_trans = Vec::new();
159        if uncached_trans.len() < WORKER_COMPUTATION_PARALLELISM * 8 {
160            for (idx, tx) in uncached_trans {
161                if let Ok(public) = tx.recover_public() {
162                    recovered_trans.push((
163                        idx,
164                        Arc::new(SignedTransaction::new(public, tx.clone())),
165                    ));
166                } else {
167                    info!(
168                        "Unable to recover the public key of transaction {:?}",
169                        tx.hash()
170                    );
171                    return Err(DecoderError::Custom(
172                        "Cannot recover public key",
173                    ));
174                }
175            }
176        } else {
177            let tx_num = uncached_trans.len();
178            let tx_num_per_worker = tx_num / WORKER_COMPUTATION_PARALLELISM;
179            let mut remainder =
180                tx_num - (tx_num_per_worker * WORKER_COMPUTATION_PARALLELISM);
181            let mut start_idx = 0;
182            let mut end_idx = 0;
183            let mut unsigned_trans = Vec::new();
184
185            for tx in uncached_trans {
186                if start_idx == end_idx {
187                    // a new segment of transactions
188                    end_idx = start_idx + tx_num_per_worker;
189                    if remainder > 0 {
190                        end_idx += 1;
191                        remainder -= 1;
192                    }
193                    let unsigned_txns = Vec::new();
194                    unsigned_trans.push(unsigned_txns);
195                }
196
197                unsigned_trans.last_mut().unwrap().push(tx);
198
199                start_idx += 1;
200            }
201
202            let (sender, receiver) = channel();
203            let n_thread = unsigned_trans.len();
204            for unsigned_txns in unsigned_trans {
205                RECOVER_PUB_KEY_QUEUE.enqueue(unsigned_txns.len());
206                let sender = sender.clone();
207                self.worker_pool.lock().execute(move || {
208                    let mut signed_txns = Vec::new();
209                    for (idx, tx) in unsigned_txns {
210                        if let Ok(public) = tx.recover_public() {
211                            signed_txns.push((idx, Arc::new(SignedTransaction::new(
212                                public,
213                                tx.clone(),
214                            ))));
215                        } else {
216                            info!(
217                                "Unable to recover the public key of transaction {:?}",
218                                tx.hash()
219                            );
220                            break;
221                        }
222                    }
223                    sender.send(signed_txns).unwrap();
224                });
225            }
226
227            let mut total_recovered_num = 0 as usize;
228            for tx_publics in receiver.iter().take(n_thread) {
229                RECOVER_PUB_KEY_QUEUE.dequeue(tx_publics.len());
230                total_recovered_num += tx_publics.len();
231                for (idx, tx) in tx_publics {
232                    recovered_trans.push((idx, tx));
233                }
234            }
235            if total_recovered_num != tx_num {
236                return Err(DecoderError::Custom("Cannot recover public key"));
237            }
238        }
239        let mut tx_time_window = self.tx_time_window.write();
240        tx_time_window.append_transactions(&recovered_trans.clone());
241        Ok(recovered_trans)
242    }
243
244    /// Find tx in tx_time_window that matches tx_short_ids to fill in
245    /// reconstruced_txns Return the differentially encoded index of missing
246    /// transactions Now should only called once after CompactBlock is
247    /// decoded
248    pub fn find_missing_tx_indices_encoded(
249        &self, compact_block: &mut CompactBlock,
250    ) -> Vec<usize> {
251        compact_block
252            .reconstructed_txns
253            .resize(compact_block.len(), None);
254
255        let (random_bytes_vector, fixed_bytes_vector) =
256            compact_block.get_decomposed_short_ids();
257        let (k0, k1) = CompactBlock::get_shortid_key(
258            &compact_block.block_header,
259            &compact_block.nonce,
260        );
261        let mut missing_index = Vec::new();
262        {
263            let tx_time_window = self.tx_time_window.read();
264            for i in 0..fixed_bytes_vector.len() {
265                match tx_time_window.get_transaction(
266                    fixed_bytes_vector[i],
267                    random_bytes_vector[i],
268                    k0,
269                    k1,
270                ) {
271                    Some(tx) => {
272                        compact_block.reconstructed_txns[i] = Some(tx.clone());
273                    }
274                    None => {
275                        missing_index.push(i);
276                    }
277                }
278            }
279        }
280
281        let mut last = 0;
282        let mut missing_encoded = Vec::new();
283        for index in missing_index {
284            missing_encoded.push(index - last);
285            last = index + 1;
286        }
287        missing_encoded
288    }
289}