cfxcore/sync/request_manager/
tx_handler.rs1use crate::sync::message::TransactionDigests;
6use cfx_types::H256;
7use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
8use metrics::{register_meter_with_group, Meter, MeterTimer};
9use network::node_table::NodeId;
10use primitives::{block::CompactBlock, SignedTransaction, TxPropagateId};
11use std::{
12 collections::{HashMap, HashSet},
13 sync::Arc,
14 time::{SystemTime, UNIX_EPOCH},
15};
16lazy_static! {
17 static ref TX_FIRST_MISS_METER: Arc<dyn Meter> =
18 register_meter_with_group("tx_propagation", "tx_first_miss_size");
19 static ref TX_FOR_COMPARE_METER: Arc<dyn Meter> =
20 register_meter_with_group("tx_propagation", "tx_for_compare_size");
21 static ref TX_RANDOM_BYTE_METER: Arc<dyn Meter> =
22 register_meter_with_group("tx_propagation", "tx_random_byte_size");
23 static ref FULL_TX_COMPARE_METER: Arc<dyn Meter> =
24 register_meter_with_group("tx_propagation", "full_tx_cmpare_size");
25 static ref TX_INFLIGHT_COMPARISON_METER: Arc<dyn Meter> =
26 register_meter_with_group(
27 "tx_propagation",
28 "tx_inflight_comparison_size"
29 );
30 static ref REQUEST_MANAGER_PENDING_INFLIGHT_TX_TIMER: Arc<dyn Meter> =
31 register_meter_with_group(
32 "timer",
33 "request_manager::request_pending_inflight_tx"
34 );
35}
36
37#[derive(DeriveMallocSizeOf)]
38struct TimeWindowEntry<T> {
39 pub secs: u64,
40 pub values: Vec<T>,
41}
42
43#[derive(DeriveMallocSizeOf)]
44struct TimeWindow<T> {
45 window_size: usize,
46 slot_duration_as_secs: u64,
47 time_windowed_indices: Vec<Option<TimeWindowEntry<T>>>,
48}
49
50impl<T> TimeWindow<T> {
51 pub fn new(timeout: u64, window_size: usize) -> Self {
52 let mut time_windowed_indices = Vec::new();
53 for _ in 0..window_size {
54 time_windowed_indices.push(None);
55 }
56 TimeWindow {
57 window_size,
58 slot_duration_as_secs: timeout / window_size as u64,
59 time_windowed_indices,
60 }
61 }
62
63 pub fn append_entry(&mut self, mut values: Vec<T>) -> Option<Vec<T>> {
65 let now = SystemTime::now();
66 let duration = now.duration_since(UNIX_EPOCH);
67 let secs = duration.unwrap().as_secs();
68 let window_index =
69 (secs / self.slot_duration_as_secs) as usize % self.window_size;
70 let mut res = None;
71
72 if self.time_windowed_indices[window_index].is_none() {
73 self.time_windowed_indices[window_index] =
74 Some(TimeWindowEntry { secs, values });
75 } else {
76 let indices_with_time =
77 self.time_windowed_indices[window_index].as_mut().unwrap();
78 if indices_with_time.secs + self.slot_duration_as_secs <= secs {
79 indices_with_time.secs = secs;
80 std::mem::swap(&mut values, &mut indices_with_time.values);
81 res = Some(values);
82 } else {
83 indices_with_time.values.append(&mut values);
84 }
85 };
86
87 res
88 }
89}
90
91#[derive(DeriveMallocSizeOf)]
92struct ReceivedTransactionContainerInner {
93 tx_hashes_map: HashMap<TxPropagateId, HashSet<H256>>,
94 tx_hashes_set: HashSet<H256>,
95 time_window: TimeWindow<H256>,
96}
97
98impl ReceivedTransactionContainerInner {
99 pub fn new(timeout: u64, window_size: usize) -> Self {
100 ReceivedTransactionContainerInner {
101 tx_hashes_map: HashMap::new(),
102 tx_hashes_set: HashSet::new(),
103 time_window: TimeWindow::new(timeout, window_size),
104 }
105 }
106}
107
108#[derive(DeriveMallocSizeOf)]
109pub struct ReceivedTransactionContainer {
110 inner: ReceivedTransactionContainerInner,
111}
112
113impl ReceivedTransactionContainer {
114 const BUCKET_LIMIT: usize = 10;
115 const RECEIVED_TRANSACTION_CONTAINER_WINDOW_SIZE: usize = 64;
116
117 pub fn new(timeout: u64) -> Self {
118 ReceivedTransactionContainer {
119 inner: ReceivedTransactionContainerInner::new(
120 timeout,
121 ReceivedTransactionContainer::RECEIVED_TRANSACTION_CONTAINER_WINDOW_SIZE,
122 ),
123 }
124 }
125
126 pub fn group_overflow(&self, fixed_bytes: &TxPropagateId) -> bool {
127 if let Some(set) = self.inner.tx_hashes_map.get(&fixed_bytes) {
128 return set.len() >= ReceivedTransactionContainer::BUCKET_LIMIT;
129 }
130 false
131 }
132
133 pub fn group_overflow_from_tx_hash(&self, full_trans_id: &H256) -> bool {
134 let key: TxPropagateId = TransactionDigests::to_u24(
135 full_trans_id[29],
136 full_trans_id[30],
137 full_trans_id[31],
138 );
139 self.group_overflow(&key)
140 }
141
142 pub fn contains_short_id(
143 &self, fixed_bytes: TxPropagateId, random_byte: u8, key1: u64,
144 key2: u64,
145 ) -> bool {
146 let inner = &self.inner;
147 TX_FOR_COMPARE_METER.mark(1);
148
149 match inner.tx_hashes_map.get(&fixed_bytes) {
150 Some(set) => {
151 TX_FIRST_MISS_METER.mark(1);
152 for value in set {
153 if TransactionDigests::get_random_byte(value, key1, key2)
154 == random_byte
155 {
156 TX_RANDOM_BYTE_METER.mark(1);
157 return true;
158 }
159 }
160 }
161 None => {}
162 }
163 false
164 }
165
166 pub fn contains_tx_hash(&self, tx_hash: &H256) -> bool {
167 FULL_TX_COMPARE_METER.mark(1);
168 self.inner.tx_hashes_set.contains(tx_hash)
169 }
170
171 pub fn get_length(&self) -> usize { self.inner.tx_hashes_map.len() }
172
173 pub fn append_transactions(
174 &mut self, transactions: Vec<Arc<SignedTransaction>>,
175 ) {
176 let mut values = Vec::new();
177
178 for transaction in transactions {
179 let tx_hash = transaction.hash();
180 let short_id = TransactionDigests::to_u24(
181 tx_hash[29],
182 tx_hash[30],
183 tx_hash[31],
184 ); self.inner
186 .tx_hashes_map
187 .entry(short_id)
188 .and_modify(|s| {
189 s.insert(tx_hash.clone());
190 })
191 .or_insert_with(|| {
192 let mut set = HashSet::new();
193 set.insert(tx_hash.clone());
194 set
195 }); self.inner.tx_hashes_set.insert(tx_hash.clone());
198
199 values.push(tx_hash);
200 }
201
202 if let Some(remove_values) = self.inner.time_window.append_entry(values)
203 {
204 for tx_hash in &remove_values {
205 let key = TransactionDigests::to_u24(
206 tx_hash[29],
207 tx_hash[30],
208 tx_hash[31],
209 );
210 if let Some(set) = self.inner.tx_hashes_map.get_mut(&key) {
211 if set.len() == 1 {
213 self.inner.tx_hashes_map.remove(&key);
214 } else {
215 set.remove(tx_hash);
216 }
217 self.inner.tx_hashes_set.remove(tx_hash);
218 }
219 }
220 }
221 }
222}
223
224#[derive(DeriveMallocSizeOf)]
225struct SentTransactionContainerInner {
226 window_size: usize,
227 base_time_tick: usize,
228 next_time_tick: usize,
229 time_windowed_indices: Vec<Option<Vec<Arc<SignedTransaction>>>>,
230}
231
232impl SentTransactionContainerInner {
233 pub fn new(window_size: usize) -> Self {
234 let mut time_windowed_indices = Vec::new();
235 for _ in 0..window_size {
236 time_windowed_indices.push(None);
237 }
238
239 SentTransactionContainerInner {
240 window_size,
241 base_time_tick: 0,
242 next_time_tick: 0,
243 time_windowed_indices,
244 }
245 }
246}
247
248#[derive(DeriveMallocSizeOf)]
253pub struct SentTransactionContainer {
254 inner: SentTransactionContainerInner,
255}
256
257impl SentTransactionContainer {
258 pub fn new(window_size: usize) -> Self {
259 SentTransactionContainer {
260 inner: SentTransactionContainerInner::new(window_size),
261 }
262 }
263
264 pub fn get_transaction(
265 &self, window_index: usize, index: usize,
266 ) -> Option<Arc<SignedTransaction>> {
267 let inner = &self.inner;
268 if window_index >= inner.base_time_tick {
269 if window_index - inner.base_time_tick >= inner.window_size {
270 return None;
271 }
272 } else {
273 if std::usize::MAX - inner.base_time_tick + window_index + 1
274 >= inner.window_size
275 {
276 return None;
277 }
278 }
279
280 let transactions = inner.time_windowed_indices
281 [window_index % inner.window_size]
282 .as_ref();
283 if transactions.is_none() {
284 return None;
285 }
286
287 let transactions = transactions.unwrap();
288 if index >= transactions.len() {
289 return None;
290 }
291
292 Some(transactions[index].clone())
293 }
294
295 pub fn append_transactions(
296 &mut self, transactions: Vec<Arc<SignedTransaction>>,
297 ) -> usize {
298 let inner = &mut self.inner;
299
300 let base_window_index = inner.base_time_tick % inner.window_size;
301 let next_time_tick = inner.next_time_tick;
302 let next_window_index = next_time_tick % inner.window_size;
303 inner.time_windowed_indices[next_window_index] = Some(transactions);
304 if (next_window_index + 1) % inner.window_size == base_window_index {
305 inner.base_time_tick += 1;
306 }
307 inner.next_time_tick += 1;
308 next_time_tick
309 }
310}
311
312#[derive(Eq, PartialEq, Hash, DeriveMallocSizeOf)]
313pub struct InflightPendingTransactionItem {
314 pub fixed_byte_part: TxPropagateId,
315 pub random_byte_part: u8,
316 pub window_index: usize,
317 pub key1: u64,
318 pub key2: u64,
319 pub index: usize,
320 pub peer_id: NodeId,
321}
322impl InflightPendingTransactionItem {
323 pub fn new(
324 fixed_byte_part: TxPropagateId, random_byte_part: u8,
325 window_index: usize, key1: u64, key2: u64, index: usize,
326 peer_id: NodeId,
327 ) -> Self {
328 InflightPendingTransactionItem {
329 fixed_byte_part,
330 random_byte_part,
331 window_index,
332 key1,
333 key2,
334 index,
335 peer_id,
336 }
337 }
338}
339
340#[derive(DeriveMallocSizeOf)]
341struct InflightPendingTransactionContainerInner {
342 txid_hashmap:
343 HashMap<TxPropagateId, HashSet<Arc<InflightPendingTransactionItem>>>,
344 time_window: TimeWindow<Arc<InflightPendingTransactionItem>>,
345}
346
347impl InflightPendingTransactionContainerInner {
348 pub fn new(timeout: u64, window_size: usize) -> Self {
349 InflightPendingTransactionContainerInner {
350 txid_hashmap: HashMap::new(),
351 time_window: TimeWindow::new(timeout, window_size),
352 }
353 }
354}
355
356#[derive(DeriveMallocSizeOf)]
357pub struct InflightPendingTransactionContainer {
358 inner: InflightPendingTransactionContainerInner,
359}
360
361impl InflightPendingTransactionContainer {
362 const INFLIGHT_PENDING_TRANSACTION_CONTAINER_WINDOW_SIZE: usize = 5;
363
364 pub fn new(timeout: u64) -> Self {
365 InflightPendingTransactionContainer {
366 inner: InflightPendingTransactionContainerInner::new(
367 timeout,
368 InflightPendingTransactionContainer::INFLIGHT_PENDING_TRANSACTION_CONTAINER_WINDOW_SIZE,
369 ),
370 }
371 }
372
373 pub fn generate_tx_requests_from_inflight_pending_pool(
374 &mut self, signed_transactions: &Vec<Arc<SignedTransaction>>,
375 ) -> (
376 Vec<Arc<InflightPendingTransactionItem>>,
377 HashSet<TxPropagateId>,
378 ) {
379 let _timer = MeterTimer::time_func(
380 REQUEST_MANAGER_PENDING_INFLIGHT_TX_TIMER.as_ref(),
381 );
382 let mut requests = vec![];
383 let mut keeped_short_inflight_keys = HashSet::new();
384 for tx in signed_transactions {
385 let hash = tx.hash;
386 let fixed_bytes_part =
387 TransactionDigests::to_u24(hash[29], hash[30], hash[31]);
388 match self.inner.txid_hashmap.get_mut(&fixed_bytes_part) {
389 Some(set) => {
390 set.retain(|item| {
391 TransactionDigests::get_random_byte(
392 &hash, item.key1, item.key2,
393 ) != item.random_byte_part
394 });
395 if set.len() == 0 {
396 self.inner.txid_hashmap.remove(&fixed_bytes_part);
397 } else {
398 if let Some(item) = set.iter().next() {
399 requests.push(item.clone());
400 keeped_short_inflight_keys
401 .insert(item.fixed_byte_part);
402 set.remove(requests.last().expect("Just pushed"));
404 }
405 if set.len() == 0 {
406 self.inner.txid_hashmap.remove(&fixed_bytes_part);
407 }
408 }
409 }
410 None => {}
411 }
412 }
413 (requests, keeped_short_inflight_keys)
414 }
415
416 pub fn append_inflight_pending_items(
417 &mut self, items: Vec<InflightPendingTransactionItem>,
418 ) {
419 let mut values = Vec::new();
420 for item in items {
421 let key = item.fixed_byte_part;
422 let inflight_pending_item = Arc::new(item);
423 self.inner
424 .txid_hashmap
425 .entry(key)
426 .and_modify(|s| {
427 s.insert(inflight_pending_item.clone());
428 })
429 .or_insert_with(|| {
430 let mut set = HashSet::new();
431 set.insert(inflight_pending_item.clone());
432 set
433 }); values.push(inflight_pending_item);
436 }
437
438 if let Some(remove_values) = self.inner.time_window.append_entry(values)
439 {
440 for item in &remove_values {
441 if let Some(set) =
442 self.inner.txid_hashmap.get_mut(&item.fixed_byte_part)
443 {
444 if set.len() == 1 {
450 self.inner.txid_hashmap.remove(&item.fixed_byte_part);
451 } else {
452 set.remove(item);
453 }
454 }
455 }
456 }
457 }
458}
459
460#[derive(DeriveMallocSizeOf)]
461struct TransactionCacheContainerInner {
462 tx_hashes_map: HashMap<u32, HashSet<H256>>,
463 tx_map: HashMap<H256, Arc<SignedTransaction>>,
464 time_window: TimeWindow<H256>,
465}
466
467impl TransactionCacheContainerInner {
468 pub fn new(timeout: u64, window_size: usize) -> Self {
469 TransactionCacheContainerInner {
470 tx_hashes_map: HashMap::new(),
471 tx_map: HashMap::new(),
472 time_window: TimeWindow::new(timeout, window_size),
473 }
474 }
475}
476
477#[derive(DeriveMallocSizeOf)]
478pub struct TransactionCacheContainer {
479 inner: TransactionCacheContainerInner,
480}
481
482impl TransactionCacheContainer {
483 const TRANSACTION_CACHE_CONTAINER_WINDOW_SIZE: usize = 64;
484
485 pub fn new(timeout: u64) -> Self {
486 TransactionCacheContainer {
487 inner: TransactionCacheContainerInner::new(
488 timeout,
489 TransactionCacheContainer::TRANSACTION_CACHE_CONTAINER_WINDOW_SIZE,
490 ),
491 }
492 }
493
494 pub fn contains_key(&self, tx_hash: &H256) -> bool {
495 self.inner.tx_map.contains_key(tx_hash)
496 }
497
498 pub fn get(&self, tx_hash: &H256) -> Option<&Arc<SignedTransaction>> {
499 self.inner.tx_map.get(tx_hash)
500 }
501
502 pub fn get_transaction(
503 &self, fixed_bytes: u32, random_bytes: u16, key1: u64, key2: u64,
504 ) -> Option<Arc<SignedTransaction>> {
505 let inner = &self.inner;
506 let mut tx = None;
507 match inner.tx_hashes_map.get(&fixed_bytes) {
508 Some(set) => {
509 for value in set {
510 if CompactBlock::get_random_bytes(value, key1, key2)
511 == random_bytes
512 {
513 if tx.is_none() {
514 tx = Some(self.get(value).unwrap().clone());
515 } else {
516 return None;
517 }
518 }
519 }
520 }
521 None => {}
522 }
523 tx
524 }
525
526 pub fn append_transactions(
527 &mut self, transactions: &Vec<(usize, Arc<SignedTransaction>)>,
528 ) {
529 let mut values = Vec::new();
530 for (_, transaction) in transactions {
531 let tx_hash = transaction.hash();
532 let short_id = CompactBlock::to_u32(
533 tx_hash[28],
534 tx_hash[29],
535 tx_hash[30],
536 tx_hash[31],
537 );
538 self.inner
539 .tx_hashes_map
540 .entry(short_id)
541 .and_modify(|s| {
542 s.insert(tx_hash.clone());
543 })
544 .or_insert_with(|| {
545 let mut set = HashSet::new();
546 set.insert(tx_hash.clone());
547 set
548 }); self.inner
550 .tx_map
551 .insert(tx_hash.clone(), transaction.clone());
552 values.push(tx_hash);
553 }
554
555 if let Some(remove_values) = self.inner.time_window.append_entry(values)
556 {
557 for tx_hash in &remove_values {
558 let key = CompactBlock::to_u32(
559 tx_hash[28],
560 tx_hash[29],
561 tx_hash[30],
562 tx_hash[31],
563 );
564
565 if let Some(set) = self.inner.tx_hashes_map.get_mut(&key) {
566 if set.len() == 1 {
568 self.inner.tx_hashes_map.remove(&key);
569 } else {
570 set.remove(tx_hash);
571 }
572 self.inner.tx_map.remove(tx_hash);
573 }
574 }
575 }
576 }
577}