cfxcore/block_data_manager/
tx_data_manager.rs

1use crate::{
2    sync::request_manager::tx_handler::TransactionCacheContainer,
3    WORKER_COMPUTATION_PARALLELISM,
4};
5use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
6use metrics::{register_queue, Queue};
7use parking_lot::{Mutex, RwLock};
8use primitives::{
9    block::CompactBlock, Block, SignedTransaction, TransactionWithSignature,
10};
11use rlp::DecoderError;
12use std::{
13    sync::{mpsc::channel, Arc},
14    time::Duration,
15};
16use threadpool::ThreadPool;
17
18lazy_static! {
19    static ref RECOVER_PUB_KEY_QUEUE: Arc<dyn Queue> =
20        register_queue("recover_public_key_queue");
21}
22
23pub struct TransactionDataManager {
24    tx_time_window: RwLock<TransactionCacheContainer>,
25    worker_pool: Arc<Mutex<ThreadPool>>,
26}
27
28impl MallocSizeOf for TransactionDataManager {
29    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
30        self.tx_time_window.read().size_of(ops)
31    }
32}
33
34impl TransactionDataManager {
35    pub fn new(
36        tx_cache_index_maintain_timeout: Duration,
37        worker_pool: Arc<Mutex<ThreadPool>>,
38    ) -> Self {
39        Self {
40            tx_time_window: RwLock::new(TransactionCacheContainer::new(
41                tx_cache_index_maintain_timeout.as_secs(),
42            )),
43            worker_pool,
44        }
45    }
46
47    /// Recover the public keys for uncached transactions in `transactions`.
48    /// If a tx is already in the cache, it will be ignored and not included in
49    /// the output vec.
50    pub fn recover_unsigned_tx(
51        &self, transactions: &Vec<TransactionWithSignature>,
52    ) -> Result<Vec<Arc<SignedTransaction>>, DecoderError> {
53        let uncached_trans = {
54            let tx_time_window = self.tx_time_window.read();
55            transactions
56                .iter()
57                .filter(|tx| {
58                    let tx_hash = tx.hash();
59                    let inserted = tx_time_window.contains_key(&tx_hash);
60                    // Sample 1/128 transactions
61                    if tx_hash[0] & 254 == 0 {
62                        debug!("Sampled transaction {:?} in tx pool", tx_hash);
63                    }
64                    !inserted
65                })
66                .map(|tx| (0, tx.clone())) // idx not used
67                .collect()
68        };
69        // Ignore the index and return the recovered tx list
70        self.recover_uncached_tx(uncached_trans)
71            .map(|tx_vec| tx_vec.into_iter().map(|(_, tx)| tx).collect())
72    }
73
74    /// Recover public keys for the transactions in `block`.
75    ///
76    /// The sender and public key are always re-derived from each
77    /// transaction's signature. Any `sender`/`public` metadata in the input
78    /// (e.g. from a `GetBlocksWithPublicResponse`) is untrusted and ignored:
79    /// `sender` is an independent RLP field the signature does not bind, so
80    /// trusting it would let a peer attribute a validly-signed transaction to
81    /// an attacker-chosen sender. `block` is left unchanged on error.
82    pub fn recover_block(&self, block: &mut Block) -> Result<(), DecoderError> {
83        let (uncached_trans, mut recovered_trans) = {
84            let tx_time_window = self.tx_time_window.read();
85            let mut uncached_trans = Vec::new();
86            let mut recovered_trans = Vec::new();
87            for (idx, transaction) in block.transactions.iter().enumerate() {
88                let tx_hash = transaction.hash();
89                // Sample 1/128 transactions
90                if tx_hash[0] & 254 == 0 {
91                    debug!("Sampled transaction {:?} in block", tx_hash);
92                }
93                match tx_time_window.get(&tx_hash) {
94                    Some(tx) => recovered_trans.push(Some(tx.clone())),
95                    None => {
96                        uncached_trans
97                            .push((idx, transaction.transaction.clone()));
98                        recovered_trans.push(None);
99                    }
100                }
101            }
102            (uncached_trans, recovered_trans)
103        };
104        for (idx, tx) in self.recover_uncached_tx(uncached_trans)? {
105            recovered_trans[idx] = Some(tx);
106        }
107        block.transactions = recovered_trans
108            .into_iter()
109            .map(|e| e.expect("All tx recovered"))
110            .collect();
111        Ok(())
112    }
113
114    pub fn recover_unsigned_tx_with_order(
115        &self, transactions: &Vec<TransactionWithSignature>,
116    ) -> Result<Vec<Arc<SignedTransaction>>, DecoderError> {
117        let (uncached_trans, mut recovered_trans) = {
118            let tx_time_window = self.tx_time_window.read();
119            let mut uncached_trans = Vec::new();
120            let mut recovered_trans = Vec::new();
121            for (idx, transaction) in transactions.iter().enumerate() {
122                let tx_hash = transaction.hash();
123                // Sample 1/128 transactions
124                if tx_hash[0] & 254 == 0 {
125                    debug!("Sampled transaction {:?} in block", tx_hash);
126                }
127                match tx_time_window.get(&tx_hash) {
128                    Some(tx) => recovered_trans.push(Some(tx.clone())),
129                    None => {
130                        uncached_trans.push((idx, transaction.clone()));
131                        recovered_trans.push(None);
132                    }
133                }
134            }
135            (uncached_trans, recovered_trans)
136        };
137        for (idx, tx) in self.recover_uncached_tx(uncached_trans)? {
138            recovered_trans[idx] = Some(tx);
139        }
140        Ok(recovered_trans
141            .into_iter()
142            .map(|e| e.expect("All tx recovered"))
143            .collect())
144    }
145
146    /// Recover public key for `uncached_trans` and keep the corresponding index
147    /// unchanged.
148    ///
149    /// Note that we release `tx_time_window` lock during pubkey recovery to
150    /// allow more parallelism, but we may recover a tx twice if it is
151    /// received again before the recovery finishes.
152    fn recover_uncached_tx(
153        &self, uncached_trans: Vec<(usize, TransactionWithSignature)>,
154    ) -> Result<Vec<(usize, Arc<SignedTransaction>)>, DecoderError> {
155        let mut recovered_trans = Vec::new();
156        if uncached_trans.len() < WORKER_COMPUTATION_PARALLELISM * 8 {
157            for (idx, tx) in uncached_trans {
158                if let Ok(public) = tx.recover_public() {
159                    recovered_trans.push((
160                        idx,
161                        Arc::new(SignedTransaction::new(public, tx.clone())),
162                    ));
163                } else {
164                    info!(
165                        "Unable to recover the public key of transaction {:?}",
166                        tx.hash()
167                    );
168                    return Err(DecoderError::Custom(
169                        "Cannot recover public key",
170                    ));
171                }
172            }
173        } else {
174            let tx_num = uncached_trans.len();
175            let tx_num_per_worker = tx_num / WORKER_COMPUTATION_PARALLELISM;
176            let mut remainder =
177                tx_num - (tx_num_per_worker * WORKER_COMPUTATION_PARALLELISM);
178            let mut start_idx = 0;
179            let mut end_idx = 0;
180            let mut unsigned_trans = Vec::new();
181
182            for tx in uncached_trans {
183                if start_idx == end_idx {
184                    // a new segment of transactions
185                    end_idx = start_idx + tx_num_per_worker;
186                    if remainder > 0 {
187                        end_idx += 1;
188                        remainder -= 1;
189                    }
190                    let unsigned_txns = Vec::new();
191                    unsigned_trans.push(unsigned_txns);
192                }
193
194                unsigned_trans.last_mut().unwrap().push(tx);
195
196                start_idx += 1;
197            }
198
199            let (sender, receiver) = channel();
200            let n_thread = unsigned_trans.len();
201            for unsigned_txns in unsigned_trans {
202                RECOVER_PUB_KEY_QUEUE.enqueue(unsigned_txns.len());
203                let sender = sender.clone();
204                self.worker_pool.lock().execute(move || {
205                    let mut signed_txns = Vec::new();
206                    for (idx, tx) in unsigned_txns {
207                        if let Ok(public) = tx.recover_public() {
208                            signed_txns.push((idx, Arc::new(SignedTransaction::new(
209                                public,
210                                tx.clone(),
211                            ))));
212                        } else {
213                            info!(
214                                "Unable to recover the public key of transaction {:?}",
215                                tx.hash()
216                            );
217                            break;
218                        }
219                    }
220                    sender.send(signed_txns).unwrap();
221                });
222            }
223
224            let mut total_recovered_num = 0 as usize;
225            for tx_publics in receiver.iter().take(n_thread) {
226                RECOVER_PUB_KEY_QUEUE.dequeue(tx_publics.len());
227                total_recovered_num += tx_publics.len();
228                for (idx, tx) in tx_publics {
229                    recovered_trans.push((idx, tx));
230                }
231            }
232            if total_recovered_num != tx_num {
233                return Err(DecoderError::Custom("Cannot recover public key"));
234            }
235        }
236        let mut tx_time_window = self.tx_time_window.write();
237        tx_time_window.append_transactions(&recovered_trans.clone());
238        Ok(recovered_trans)
239    }
240
241    /// Find tx in tx_time_window that matches tx_short_ids to fill in
242    /// reconstruced_txns Return the differentially encoded index of missing
243    /// transactions Now should only called once after CompactBlock is
244    /// decoded
245    pub fn find_missing_tx_indices_encoded(
246        &self, compact_block: &mut CompactBlock,
247    ) -> Vec<usize> {
248        compact_block
249            .reconstructed_txns
250            .resize(compact_block.len(), None);
251
252        let (random_bytes_vector, fixed_bytes_vector) =
253            compact_block.get_decomposed_short_ids();
254        let (k0, k1) = CompactBlock::get_shortid_key(
255            &compact_block.block_header,
256            &compact_block.nonce,
257        );
258        let mut missing_index = Vec::new();
259        {
260            let tx_time_window = self.tx_time_window.read();
261            for i in 0..fixed_bytes_vector.len() {
262                match tx_time_window.get_transaction(
263                    fixed_bytes_vector[i],
264                    random_bytes_vector[i],
265                    k0,
266                    k1,
267                ) {
268                    Some(tx) => {
269                        compact_block.reconstructed_txns[i] = Some(tx.clone());
270                    }
271                    None => {
272                        missing_index.push(i);
273                    }
274                }
275            }
276        }
277
278        let mut last = 0;
279        let mut missing_encoded = Vec::new();
280        for index in missing_index {
281            missing_encoded.push(index - last);
282            last = index + 1;
283        }
284        missing_encoded
285    }
286}
287
#[cfg(test)]
mod tests {
    use super::TransactionDataManager;
    use crate::{
        keylib::{Generator, Random},
        sync::message::GetBlocksWithPublicResponse,
    };
    use cfx_types::{AddressSpaceUtil, U256};
    use parking_lot::Mutex;
    use primitives::{
        transaction::native_transaction::NativeTransaction, Action, Block,
        BlockHeaderBuilder, Transaction,
    };
    use std::{sync::Arc, time::Duration};
    use threadpool::ThreadPool;

    /// A peer answering with `GetBlocksWithPublicResponse` chooses the
    /// `sender`/`public` values it sends. `recover_block` must discard them
    /// and derive the sender from the signature instead.
    #[test]
    fn recover_block_ignores_forged_with_public_sender() {
        let signer_key = Random.generate().unwrap();
        let spoofed_key = Random.generate().unwrap();
        let payee = Random.generate().unwrap().address();

        let payload = NativeTransaction {
            nonce: U256::zero(),
            gas_price: U256::one(),
            gas: U256::from(21_000),
            action: Action::Call(payee),
            value: U256::from(7),
            storage_limit: 0,
            epoch_height: 0,
            chain_id: 1,
            data: Vec::new(),
        };
        let mut forged_tx =
            Transaction::from(payload).sign(signer_key.secret());

        // Only `sender` is overwritten. The stored public key still belongs
        // to the real signer, so `verify_public` accepts the transaction.
        forged_tx.sender = spoofed_key.address();
        assert!(forged_tx.verify_public(false).unwrap());

        // Send the block through the actual with-public wire format.
        let forged_block = Block::new(
            BlockHeaderBuilder::new().build(),
            vec![Arc::new(forged_tx)],
        );
        let response = GetBlocksWithPublicResponse {
            request_id: 1,
            blocks: vec![forged_block],
        };
        let wire_bytes = rlp::encode(&response);
        let decoded: GetBlocksWithPublicResponse =
            rlp::decode(&wire_bytes).expect("with-public response decodes");
        let mut block = decoded.blocks.into_iter().next().unwrap();

        // Precondition: the forged metadata is still present after decoding,
        // otherwise the final assertion would prove nothing.
        assert!(block.transactions[0].public.is_some());
        assert_eq!(
            block.transactions[0].sender(),
            spoofed_key.address().with_native_space()
        );

        let worker_pool = Arc::new(Mutex::new(ThreadPool::new(1)));
        let tx_manager =
            TransactionDataManager::new(Duration::from_secs(600), worker_pool);
        tx_manager
            .recover_block(&mut block)
            .expect("block transactions recover from their signatures");

        let recovered_tx = block.transactions[0].clone();
        assert_eq!(
            recovered_tx.sender(),
            signer_key.address().with_native_space(),
            "sender must be re-derived from the signature, not the forged \
             peer-supplied metadata"
        );
    }
}