cfxcore/block_data_manager/
tx_data_manager.rs1use crate::{
2 sync::request_manager::tx_handler::TransactionCacheContainer,
3 WORKER_COMPUTATION_PARALLELISM,
4};
5use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
6use metrics::{register_queue, Queue};
7use parking_lot::{Mutex, RwLock};
8use primitives::{
9 block::CompactBlock, Block, SignedTransaction, TransactionWithSignature,
10};
11use rlp::DecoderError;
12use std::{
13 sync::{mpsc::channel, Arc},
14 time::Duration,
15};
16use threadpool::ThreadPool;
17
// Queue metric registered under the name "recover_public_key_queue".
// `TransactionDataManager::recover_uncached_tx` enqueues the size of every
// chunk it hands to the worker pool and dequeues the number of results it
// receives back, so the metric tracks transactions awaiting public key
// recovery on the pool.
lazy_static! {
    static ref RECOVER_PUB_KEY_QUEUE: Arc<dyn Queue> =
        register_queue("recover_public_key_queue");
}
22
/// Recovers signed transactions (transaction plus the public key derived
/// from its signature) and caches the results so that a transaction seen
/// again does not need its public key recovered a second time.
pub struct TransactionDataManager {
    /// Cache of already recovered transactions, looked up by transaction
    /// hash (and by short id for compact blocks). It is constructed from a
    /// timeout in seconds; NOTE(review): presumably entries older than that
    /// timeout are evicted by `TransactionCacheContainer` -- confirm there.
    tx_time_window: RwLock<TransactionCacheContainer>,
    /// Shared thread pool on which large batches of public key recoveries
    /// are executed in parallel.
    worker_pool: Arc<Mutex<ThreadPool>>,
}
27
28impl MallocSizeOf for TransactionDataManager {
29 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
30 self.tx_time_window.read().size_of(ops)
31 }
32}
33
34impl TransactionDataManager {
35 pub fn new(
36 tx_cache_index_maintain_timeout: Duration,
37 worker_pool: Arc<Mutex<ThreadPool>>,
38 ) -> Self {
39 Self {
40 tx_time_window: RwLock::new(TransactionCacheContainer::new(
41 tx_cache_index_maintain_timeout.as_secs(),
42 )),
43 worker_pool,
44 }
45 }
46
47 pub fn recover_unsigned_tx(
51 &self, transactions: &Vec<TransactionWithSignature>,
52 ) -> Result<Vec<Arc<SignedTransaction>>, DecoderError> {
53 let uncached_trans = {
54 let tx_time_window = self.tx_time_window.read();
55 transactions
56 .iter()
57 .filter(|tx| {
58 let tx_hash = tx.hash();
59 let inserted = tx_time_window.contains_key(&tx_hash);
60 if tx_hash[0] & 254 == 0 {
62 debug!("Sampled transaction {:?} in tx pool", tx_hash);
63 }
64 !inserted
65 })
66 .map(|tx| (0, tx.clone())) .collect()
68 };
69 self.recover_uncached_tx(uncached_trans)
71 .map(|tx_vec| tx_vec.into_iter().map(|(_, tx)| tx).collect())
72 }
73
74 pub fn recover_block(&self, block: &mut Block) -> Result<(), DecoderError> {
83 let (uncached_trans, mut recovered_trans) = {
84 let tx_time_window = self.tx_time_window.read();
85 let mut uncached_trans = Vec::new();
86 let mut recovered_trans = Vec::new();
87 for (idx, transaction) in block.transactions.iter().enumerate() {
88 let tx_hash = transaction.hash();
89 if tx_hash[0] & 254 == 0 {
91 debug!("Sampled transaction {:?} in block", tx_hash);
92 }
93 match tx_time_window.get(&tx_hash) {
94 Some(tx) => recovered_trans.push(Some(tx.clone())),
95 None => {
96 uncached_trans
97 .push((idx, transaction.transaction.clone()));
98 recovered_trans.push(None);
99 }
100 }
101 }
102 (uncached_trans, recovered_trans)
103 };
104 for (idx, tx) in self.recover_uncached_tx(uncached_trans)? {
105 recovered_trans[idx] = Some(tx);
106 }
107 block.transactions = recovered_trans
108 .into_iter()
109 .map(|e| e.expect("All tx recovered"))
110 .collect();
111 Ok(())
112 }
113
114 pub fn recover_unsigned_tx_with_order(
115 &self, transactions: &Vec<TransactionWithSignature>,
116 ) -> Result<Vec<Arc<SignedTransaction>>, DecoderError> {
117 let (uncached_trans, mut recovered_trans) = {
118 let tx_time_window = self.tx_time_window.read();
119 let mut uncached_trans = Vec::new();
120 let mut recovered_trans = Vec::new();
121 for (idx, transaction) in transactions.iter().enumerate() {
122 let tx_hash = transaction.hash();
123 if tx_hash[0] & 254 == 0 {
125 debug!("Sampled transaction {:?} in block", tx_hash);
126 }
127 match tx_time_window.get(&tx_hash) {
128 Some(tx) => recovered_trans.push(Some(tx.clone())),
129 None => {
130 uncached_trans.push((idx, transaction.clone()));
131 recovered_trans.push(None);
132 }
133 }
134 }
135 (uncached_trans, recovered_trans)
136 };
137 for (idx, tx) in self.recover_uncached_tx(uncached_trans)? {
138 recovered_trans[idx] = Some(tx);
139 }
140 Ok(recovered_trans
141 .into_iter()
142 .map(|e| e.expect("All tx recovered"))
143 .collect())
144 }
145
146 fn recover_uncached_tx(
153 &self, uncached_trans: Vec<(usize, TransactionWithSignature)>,
154 ) -> Result<Vec<(usize, Arc<SignedTransaction>)>, DecoderError> {
155 let mut recovered_trans = Vec::new();
156 if uncached_trans.len() < WORKER_COMPUTATION_PARALLELISM * 8 {
157 for (idx, tx) in uncached_trans {
158 if let Ok(public) = tx.recover_public() {
159 recovered_trans.push((
160 idx,
161 Arc::new(SignedTransaction::new(public, tx.clone())),
162 ));
163 } else {
164 info!(
165 "Unable to recover the public key of transaction {:?}",
166 tx.hash()
167 );
168 return Err(DecoderError::Custom(
169 "Cannot recover public key",
170 ));
171 }
172 }
173 } else {
174 let tx_num = uncached_trans.len();
175 let tx_num_per_worker = tx_num / WORKER_COMPUTATION_PARALLELISM;
176 let mut remainder =
177 tx_num - (tx_num_per_worker * WORKER_COMPUTATION_PARALLELISM);
178 let mut start_idx = 0;
179 let mut end_idx = 0;
180 let mut unsigned_trans = Vec::new();
181
182 for tx in uncached_trans {
183 if start_idx == end_idx {
184 end_idx = start_idx + tx_num_per_worker;
186 if remainder > 0 {
187 end_idx += 1;
188 remainder -= 1;
189 }
190 let unsigned_txns = Vec::new();
191 unsigned_trans.push(unsigned_txns);
192 }
193
194 unsigned_trans.last_mut().unwrap().push(tx);
195
196 start_idx += 1;
197 }
198
199 let (sender, receiver) = channel();
200 let n_thread = unsigned_trans.len();
201 for unsigned_txns in unsigned_trans {
202 RECOVER_PUB_KEY_QUEUE.enqueue(unsigned_txns.len());
203 let sender = sender.clone();
204 self.worker_pool.lock().execute(move || {
205 let mut signed_txns = Vec::new();
206 for (idx, tx) in unsigned_txns {
207 if let Ok(public) = tx.recover_public() {
208 signed_txns.push((idx, Arc::new(SignedTransaction::new(
209 public,
210 tx.clone(),
211 ))));
212 } else {
213 info!(
214 "Unable to recover the public key of transaction {:?}",
215 tx.hash()
216 );
217 break;
218 }
219 }
220 sender.send(signed_txns).unwrap();
221 });
222 }
223
224 let mut total_recovered_num = 0 as usize;
225 for tx_publics in receiver.iter().take(n_thread) {
226 RECOVER_PUB_KEY_QUEUE.dequeue(tx_publics.len());
227 total_recovered_num += tx_publics.len();
228 for (idx, tx) in tx_publics {
229 recovered_trans.push((idx, tx));
230 }
231 }
232 if total_recovered_num != tx_num {
233 return Err(DecoderError::Custom("Cannot recover public key"));
234 }
235 }
236 let mut tx_time_window = self.tx_time_window.write();
237 tx_time_window.append_transactions(&recovered_trans.clone());
238 Ok(recovered_trans)
239 }
240
241 pub fn find_missing_tx_indices_encoded(
246 &self, compact_block: &mut CompactBlock,
247 ) -> Vec<usize> {
248 compact_block
249 .reconstructed_txns
250 .resize(compact_block.len(), None);
251
252 let (random_bytes_vector, fixed_bytes_vector) =
253 compact_block.get_decomposed_short_ids();
254 let (k0, k1) = CompactBlock::get_shortid_key(
255 &compact_block.block_header,
256 &compact_block.nonce,
257 );
258 let mut missing_index = Vec::new();
259 {
260 let tx_time_window = self.tx_time_window.read();
261 for i in 0..fixed_bytes_vector.len() {
262 match tx_time_window.get_transaction(
263 fixed_bytes_vector[i],
264 random_bytes_vector[i],
265 k0,
266 k1,
267 ) {
268 Some(tx) => {
269 compact_block.reconstructed_txns[i] = Some(tx.clone());
270 }
271 None => {
272 missing_index.push(i);
273 }
274 }
275 }
276 }
277
278 let mut last = 0;
279 let mut missing_encoded = Vec::new();
280 for index in missing_index {
281 missing_encoded.push(index - last);
282 last = index + 1;
283 }
284 missing_encoded
285 }
286}
287
#[cfg(test)]
mod tests {
    use super::TransactionDataManager;
    use crate::{
        keylib::{Generator, Random},
        sync::message::GetBlocksWithPublicResponse,
    };
    use cfx_types::{AddressSpaceUtil, U256};
    use parking_lot::Mutex;
    use primitives::{
        transaction::native_transaction::NativeTransaction, Action, Block,
        BlockHeaderBuilder, Transaction,
    };
    use std::{sync::Arc, time::Duration};
    use threadpool::ThreadPool;

    /// A transaction signed by one key but carrying another account's
    /// address in its `sender` field must come out of `recover_block` with
    /// the sender derived from the signature.
    #[test]
    fn recover_block_ignores_forged_with_public_sender() {
        let attacker_key = Random.generate().unwrap();
        let victim_key = Random.generate().unwrap();
        let payee = Random.generate().unwrap().address();

        // Sign with the attacker's secret ...
        let unsigned = Transaction::from(NativeTransaction {
            nonce: U256::zero(),
            gas_price: U256::one(),
            gas: U256::from(21_000),
            action: Action::Call(payee),
            value: U256::from(7),
            storage_limit: 0,
            epoch_height: 0,
            chain_id: 1,
            data: Vec::new(),
        });
        let mut forged_tx = unsigned.sign(attacker_key.secret());

        // ... then overwrite the sender with the victim's address.
        forged_tx.sender = victim_key.address();
        assert!(forged_tx.verify_public(false).unwrap());

        // Round-trip the forged transaction through the RLP encoding of a
        // with-public blocks response.
        let outgoing = GetBlocksWithPublicResponse {
            request_id: 1,
            blocks: vec![Block::new(
                BlockHeaderBuilder::new().build(),
                vec![Arc::new(forged_tx)],
            )],
        };
        let wire_bytes = rlp::encode(&outgoing);
        let incoming: GetBlocksWithPublicResponse =
            rlp::decode(&wire_bytes).expect("with-public response decodes");
        let mut received_block = incoming.blocks.into_iter().next().unwrap();

        // After decoding, the transaction still carries a public key and
        // the forged sender.
        assert!(received_block.transactions[0].public.is_some());
        assert_eq!(
            received_block.transactions[0].sender(),
            victim_key.address().with_native_space()
        );

        let pool = Arc::new(Mutex::new(ThreadPool::new(1)));
        let manager =
            TransactionDataManager::new(Duration::from_secs(600), pool);
        manager
            .recover_block(&mut received_block)
            .expect("block transactions recover from their signatures");

        assert_eq!(
            received_block.transactions[0].sender(),
            attacker_key.address().with_native_space(),
            "sender must be re-derived from the signature, not the forged \
             peer-supplied metadata"
        );
    }
}