cfxcore/block_data_manager/
tx_data_manager.rs1use crate::{
2 sync::request_manager::tx_handler::TransactionCacheContainer,
3 WORKER_COMPUTATION_PARALLELISM,
4};
5use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
6use metrics::{register_queue, Queue};
7use parking_lot::{Mutex, RwLock};
8use primitives::{
9 block::CompactBlock, Block, SignedTransaction, TransactionWithSignature,
10};
11use rlp::DecoderError;
12use std::{
13 sync::{mpsc::channel, Arc},
14 time::Duration,
15};
16use threadpool::ThreadPool;
17
18lazy_static! {
19 static ref RECOVER_PUB_KEY_QUEUE: Arc<dyn Queue> =
20 register_queue("recover_public_key_queue");
21}
22
23pub struct TransactionDataManager {
24 tx_time_window: RwLock<TransactionCacheContainer>,
25 worker_pool: Arc<Mutex<ThreadPool>>,
26}
27
28impl MallocSizeOf for TransactionDataManager {
29 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
30 self.tx_time_window.read().size_of(ops)
31 }
32}
33
34impl TransactionDataManager {
35 pub fn new(
36 tx_cache_index_maintain_timeout: Duration,
37 worker_pool: Arc<Mutex<ThreadPool>>,
38 ) -> Self {
39 Self {
40 tx_time_window: RwLock::new(TransactionCacheContainer::new(
41 tx_cache_index_maintain_timeout.as_secs(),
42 )),
43 worker_pool,
44 }
45 }
46
47 pub fn recover_unsigned_tx(
51 &self, transactions: &Vec<TransactionWithSignature>,
52 ) -> Result<Vec<Arc<SignedTransaction>>, DecoderError> {
53 let uncached_trans = {
54 let tx_time_window = self.tx_time_window.read();
55 transactions
56 .iter()
57 .filter(|tx| {
58 let tx_hash = tx.hash();
59 let inserted = tx_time_window.contains_key(&tx_hash);
60 if tx_hash[0] & 254 == 0 {
62 debug!("Sampled transaction {:?} in tx pool", tx_hash);
63 }
64 !inserted
65 })
66 .map(|tx| (0, tx.clone())) .collect()
68 };
69 self.recover_uncached_tx(uncached_trans)
71 .map(|tx_vec| tx_vec.into_iter().map(|(_, tx)| tx).collect())
72 }
73
74 pub fn recover_block(&self, block: &mut Block) -> Result<(), DecoderError> {
80 let (uncached_trans, mut recovered_trans) = {
81 let tx_time_window = self.tx_time_window.read();
82 let mut uncached_trans = Vec::new();
83 let mut recovered_trans = Vec::new();
84 for (idx, transaction) in block.transactions.iter().enumerate() {
85 if transaction.public.is_some() {
86 recovered_trans.push(Some(transaction.clone()));
89 continue;
90 }
91 let tx_hash = transaction.hash();
92 if tx_hash[0] & 254 == 0 {
94 debug!("Sampled transaction {:?} in block", tx_hash);
95 }
96 match tx_time_window.get(&tx_hash) {
97 Some(tx) => recovered_trans.push(Some(tx.clone())),
98 None => {
99 uncached_trans
100 .push((idx, transaction.transaction.clone()));
101 recovered_trans.push(None);
102 }
103 }
104 }
105 (uncached_trans, recovered_trans)
106 };
107 for (idx, tx) in self.recover_uncached_tx(uncached_trans)? {
108 recovered_trans[idx] = Some(tx);
109 }
110 block.transactions = recovered_trans
111 .into_iter()
112 .map(|e| e.expect("All tx recovered"))
113 .collect();
114 Ok(())
115 }
116
117 pub fn recover_unsigned_tx_with_order(
118 &self, transactions: &Vec<TransactionWithSignature>,
119 ) -> Result<Vec<Arc<SignedTransaction>>, DecoderError> {
120 let (uncached_trans, mut recovered_trans) = {
121 let tx_time_window = self.tx_time_window.read();
122 let mut uncached_trans = Vec::new();
123 let mut recovered_trans = Vec::new();
124 for (idx, transaction) in transactions.iter().enumerate() {
125 let tx_hash = transaction.hash();
126 if tx_hash[0] & 254 == 0 {
128 debug!("Sampled transaction {:?} in block", tx_hash);
129 }
130 match tx_time_window.get(&tx_hash) {
131 Some(tx) => recovered_trans.push(Some(tx.clone())),
132 None => {
133 uncached_trans.push((idx, transaction.clone()));
134 recovered_trans.push(None);
135 }
136 }
137 }
138 (uncached_trans, recovered_trans)
139 };
140 for (idx, tx) in self.recover_uncached_tx(uncached_trans)? {
141 recovered_trans[idx] = Some(tx);
142 }
143 Ok(recovered_trans
144 .into_iter()
145 .map(|e| e.expect("All tx recovered"))
146 .collect())
147 }
148
149 fn recover_uncached_tx(
156 &self, uncached_trans: Vec<(usize, TransactionWithSignature)>,
157 ) -> Result<Vec<(usize, Arc<SignedTransaction>)>, DecoderError> {
158 let mut recovered_trans = Vec::new();
159 if uncached_trans.len() < WORKER_COMPUTATION_PARALLELISM * 8 {
160 for (idx, tx) in uncached_trans {
161 if let Ok(public) = tx.recover_public() {
162 recovered_trans.push((
163 idx,
164 Arc::new(SignedTransaction::new(public, tx.clone())),
165 ));
166 } else {
167 info!(
168 "Unable to recover the public key of transaction {:?}",
169 tx.hash()
170 );
171 return Err(DecoderError::Custom(
172 "Cannot recover public key",
173 ));
174 }
175 }
176 } else {
177 let tx_num = uncached_trans.len();
178 let tx_num_per_worker = tx_num / WORKER_COMPUTATION_PARALLELISM;
179 let mut remainder =
180 tx_num - (tx_num_per_worker * WORKER_COMPUTATION_PARALLELISM);
181 let mut start_idx = 0;
182 let mut end_idx = 0;
183 let mut unsigned_trans = Vec::new();
184
185 for tx in uncached_trans {
186 if start_idx == end_idx {
187 end_idx = start_idx + tx_num_per_worker;
189 if remainder > 0 {
190 end_idx += 1;
191 remainder -= 1;
192 }
193 let unsigned_txns = Vec::new();
194 unsigned_trans.push(unsigned_txns);
195 }
196
197 unsigned_trans.last_mut().unwrap().push(tx);
198
199 start_idx += 1;
200 }
201
202 let (sender, receiver) = channel();
203 let n_thread = unsigned_trans.len();
204 for unsigned_txns in unsigned_trans {
205 RECOVER_PUB_KEY_QUEUE.enqueue(unsigned_txns.len());
206 let sender = sender.clone();
207 self.worker_pool.lock().execute(move || {
208 let mut signed_txns = Vec::new();
209 for (idx, tx) in unsigned_txns {
210 if let Ok(public) = tx.recover_public() {
211 signed_txns.push((idx, Arc::new(SignedTransaction::new(
212 public,
213 tx.clone(),
214 ))));
215 } else {
216 info!(
217 "Unable to recover the public key of transaction {:?}",
218 tx.hash()
219 );
220 break;
221 }
222 }
223 sender.send(signed_txns).unwrap();
224 });
225 }
226
227 let mut total_recovered_num = 0 as usize;
228 for tx_publics in receiver.iter().take(n_thread) {
229 RECOVER_PUB_KEY_QUEUE.dequeue(tx_publics.len());
230 total_recovered_num += tx_publics.len();
231 for (idx, tx) in tx_publics {
232 recovered_trans.push((idx, tx));
233 }
234 }
235 if total_recovered_num != tx_num {
236 return Err(DecoderError::Custom("Cannot recover public key"));
237 }
238 }
239 let mut tx_time_window = self.tx_time_window.write();
240 tx_time_window.append_transactions(&recovered_trans.clone());
241 Ok(recovered_trans)
242 }
243
244 pub fn find_missing_tx_indices_encoded(
249 &self, compact_block: &mut CompactBlock,
250 ) -> Vec<usize> {
251 compact_block
252 .reconstructed_txns
253 .resize(compact_block.len(), None);
254
255 let (random_bytes_vector, fixed_bytes_vector) =
256 compact_block.get_decomposed_short_ids();
257 let (k0, k1) = CompactBlock::get_shortid_key(
258 &compact_block.block_header,
259 &compact_block.nonce,
260 );
261 let mut missing_index = Vec::new();
262 {
263 let tx_time_window = self.tx_time_window.read();
264 for i in 0..fixed_bytes_vector.len() {
265 match tx_time_window.get_transaction(
266 fixed_bytes_vector[i],
267 random_bytes_vector[i],
268 k0,
269 k1,
270 ) {
271 Some(tx) => {
272 compact_block.reconstructed_txns[i] = Some(tx.clone());
273 }
274 None => {
275 missing_index.push(i);
276 }
277 }
278 }
279 }
280
281 let mut last = 0;
282 let mut missing_encoded = Vec::new();
283 for index in missing_index {
284 missing_encoded.push(index - last);
285 last = index + 1;
286 }
287 missing_encoded
288 }
289}