cfxcore/sync/request_manager/
tx_handler.rs

// Copyright 2019 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
4
5use crate::sync::message::TransactionDigests;
6use cfx_types::H256;
7use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
8use metrics::{register_meter_with_group, Meter, MeterTimer};
9use network::node_table::NodeId;
10use primitives::{block::CompactBlock, SignedTransaction, TxPropagateId};
11use std::{
12    collections::{HashMap, HashSet},
13    sync::Arc,
14    time::{SystemTime, UNIX_EPOCH},
15};
16lazy_static! {
17    static ref TX_FIRST_MISS_METER: Arc<dyn Meter> =
18        register_meter_with_group("tx_propagation", "tx_first_miss_size");
19    static ref TX_FOR_COMPARE_METER: Arc<dyn Meter> =
20        register_meter_with_group("tx_propagation", "tx_for_compare_size");
21    static ref TX_RANDOM_BYTE_METER: Arc<dyn Meter> =
22        register_meter_with_group("tx_propagation", "tx_random_byte_size");
23    static ref FULL_TX_COMPARE_METER: Arc<dyn Meter> =
24        register_meter_with_group("tx_propagation", "full_tx_cmpare_size");
25    static ref TX_INFLIGHT_COMPARISON_METER: Arc<dyn Meter> =
26        register_meter_with_group(
27            "tx_propagation",
28            "tx_inflight_comparison_size"
29        );
30    static ref REQUEST_MANAGER_PENDING_INFLIGHT_TX_TIMER: Arc<dyn Meter> =
31        register_meter_with_group(
32            "timer",
33            "request_manager::request_pending_inflight_tx"
34        );
35}
36
37#[derive(DeriveMallocSizeOf)]
38struct TimeWindowEntry<T> {
39    pub secs: u64,
40    pub values: Vec<T>,
41}
42
43#[derive(DeriveMallocSizeOf)]
44struct TimeWindow<T> {
45    window_size: usize,
46    slot_duration_as_secs: u64,
47    time_windowed_indices: Vec<Option<TimeWindowEntry<T>>>,
48}
49
50impl<T> TimeWindow<T> {
51    pub fn new(timeout: u64, window_size: usize) -> Self {
52        let mut time_windowed_indices = Vec::new();
53        for _ in 0..window_size {
54            time_windowed_indices.push(None);
55        }
56        TimeWindow {
57            window_size,
58            slot_duration_as_secs: timeout / window_size as u64,
59            time_windowed_indices,
60        }
61    }
62
63    //returns values that need to be removed
64    pub fn append_entry(&mut self, mut values: Vec<T>) -> Option<Vec<T>> {
65        let now = SystemTime::now();
66        let duration = now.duration_since(UNIX_EPOCH);
67        let secs = duration.unwrap().as_secs();
68        let window_index =
69            (secs / self.slot_duration_as_secs) as usize % self.window_size;
70        let mut res = None;
71
72        if self.time_windowed_indices[window_index].is_none() {
73            self.time_windowed_indices[window_index] =
74                Some(TimeWindowEntry { secs, values });
75        } else {
76            let indices_with_time =
77                self.time_windowed_indices[window_index].as_mut().unwrap();
78            if indices_with_time.secs + self.slot_duration_as_secs <= secs {
79                indices_with_time.secs = secs;
80                std::mem::swap(&mut values, &mut indices_with_time.values);
81                res = Some(values);
82            } else {
83                indices_with_time.values.append(&mut values);
84            }
85        };
86
87        res
88    }
89}
90
91#[derive(DeriveMallocSizeOf)]
92struct ReceivedTransactionContainerInner {
93    tx_hashes_map: HashMap<TxPropagateId, HashSet<H256>>,
94    tx_hashes_set: HashSet<H256>,
95    time_window: TimeWindow<H256>,
96}
97
98impl ReceivedTransactionContainerInner {
99    pub fn new(timeout: u64, window_size: usize) -> Self {
100        ReceivedTransactionContainerInner {
101            tx_hashes_map: HashMap::new(),
102            tx_hashes_set: HashSet::new(),
103            time_window: TimeWindow::new(timeout, window_size),
104        }
105    }
106}
107
108#[derive(DeriveMallocSizeOf)]
109pub struct ReceivedTransactionContainer {
110    inner: ReceivedTransactionContainerInner,
111}
112
113impl ReceivedTransactionContainer {
114    const BUCKET_LIMIT: usize = 10;
115    const RECEIVED_TRANSACTION_CONTAINER_WINDOW_SIZE: usize = 64;
116
117    pub fn new(timeout: u64) -> Self {
118        ReceivedTransactionContainer {
119            inner: ReceivedTransactionContainerInner::new(
120                timeout,
121                ReceivedTransactionContainer::RECEIVED_TRANSACTION_CONTAINER_WINDOW_SIZE,
122            ),
123        }
124    }
125
126    pub fn group_overflow(&self, fixed_bytes: &TxPropagateId) -> bool {
127        if let Some(set) = self.inner.tx_hashes_map.get(&fixed_bytes) {
128            return set.len() >= ReceivedTransactionContainer::BUCKET_LIMIT;
129        }
130        false
131    }
132
133    pub fn group_overflow_from_tx_hash(&self, full_trans_id: &H256) -> bool {
134        let key: TxPropagateId = TransactionDigests::to_u24(
135            full_trans_id[29],
136            full_trans_id[30],
137            full_trans_id[31],
138        );
139        self.group_overflow(&key)
140    }
141
142    pub fn contains_short_id(
143        &self, fixed_bytes: TxPropagateId, random_byte: u8, key1: u64,
144        key2: u64,
145    ) -> bool {
146        let inner = &self.inner;
147        TX_FOR_COMPARE_METER.mark(1);
148
149        match inner.tx_hashes_map.get(&fixed_bytes) {
150            Some(set) => {
151                TX_FIRST_MISS_METER.mark(1);
152                for value in set {
153                    if TransactionDigests::get_random_byte(value, key1, key2)
154                        == random_byte
155                    {
156                        TX_RANDOM_BYTE_METER.mark(1);
157                        return true;
158                    }
159                }
160            }
161            None => {}
162        }
163        false
164    }
165
166    pub fn contains_tx_hash(&self, tx_hash: &H256) -> bool {
167        FULL_TX_COMPARE_METER.mark(1);
168        self.inner.tx_hashes_set.contains(tx_hash)
169    }
170
171    pub fn get_length(&self) -> usize { self.inner.tx_hashes_map.len() }
172
173    pub fn append_transactions(
174        &mut self, transactions: Vec<Arc<SignedTransaction>>,
175    ) {
176        let mut values = Vec::new();
177
178        for transaction in transactions {
179            let tx_hash = transaction.hash();
180            let short_id = TransactionDigests::to_u24(
181                tx_hash[29],
182                tx_hash[30],
183                tx_hash[31],
184            ); //read the last three bytes
185            self.inner
186                .tx_hashes_map
187                .entry(short_id)
188                .and_modify(|s| {
189                    s.insert(tx_hash.clone());
190                })
191                .or_insert_with(|| {
192                    let mut set = HashSet::new();
193                    set.insert(tx_hash.clone());
194                    set
195                }); //if occupied, append, else, insert.
196
197            self.inner.tx_hashes_set.insert(tx_hash.clone());
198
199            values.push(tx_hash);
200        }
201
202        if let Some(remove_values) = self.inner.time_window.append_entry(values)
203        {
204            for tx_hash in &remove_values {
205                let key = TransactionDigests::to_u24(
206                    tx_hash[29],
207                    tx_hash[30],
208                    tx_hash[31],
209                );
210                if let Some(set) = self.inner.tx_hashes_map.get_mut(&key) {
211                    // if there is a value asscicated with the key
212                    if set.len() == 1 {
213                        self.inner.tx_hashes_map.remove(&key);
214                    } else {
215                        set.remove(tx_hash);
216                    }
217                    self.inner.tx_hashes_set.remove(tx_hash);
218                }
219            }
220        }
221    }
222}
223
224#[derive(DeriveMallocSizeOf)]
225struct SentTransactionContainerInner {
226    window_size: usize,
227    base_time_tick: usize,
228    next_time_tick: usize,
229    time_windowed_indices: Vec<Option<Vec<Arc<SignedTransaction>>>>,
230}
231
232impl SentTransactionContainerInner {
233    pub fn new(window_size: usize) -> Self {
234        let mut time_windowed_indices = Vec::new();
235        for _ in 0..window_size {
236            time_windowed_indices.push(None);
237        }
238
239        SentTransactionContainerInner {
240            window_size,
241            base_time_tick: 0,
242            next_time_tick: 0,
243            time_windowed_indices,
244        }
245    }
246}
247
248/// This struct is not implemented as thread-safe since
249/// currently it is only used under protection of lock
250/// on SynchronizationState. Later we may refine the
251/// locking design to make it thread-safe.
252#[derive(DeriveMallocSizeOf)]
253pub struct SentTransactionContainer {
254    inner: SentTransactionContainerInner,
255}
256
257impl SentTransactionContainer {
258    pub fn new(window_size: usize) -> Self {
259        SentTransactionContainer {
260            inner: SentTransactionContainerInner::new(window_size),
261        }
262    }
263
264    pub fn get_transaction(
265        &self, window_index: usize, index: usize,
266    ) -> Option<Arc<SignedTransaction>> {
267        let inner = &self.inner;
268        if window_index >= inner.base_time_tick {
269            if window_index - inner.base_time_tick >= inner.window_size {
270                return None;
271            }
272        } else {
273            if std::usize::MAX - inner.base_time_tick + window_index + 1
274                >= inner.window_size
275            {
276                return None;
277            }
278        }
279
280        let transactions = inner.time_windowed_indices
281            [window_index % inner.window_size]
282            .as_ref();
283        if transactions.is_none() {
284            return None;
285        }
286
287        let transactions = transactions.unwrap();
288        if index >= transactions.len() {
289            return None;
290        }
291
292        Some(transactions[index].clone())
293    }
294
295    pub fn append_transactions(
296        &mut self, transactions: Vec<Arc<SignedTransaction>>,
297    ) -> usize {
298        let inner = &mut self.inner;
299
300        let base_window_index = inner.base_time_tick % inner.window_size;
301        let next_time_tick = inner.next_time_tick;
302        let next_window_index = next_time_tick % inner.window_size;
303        inner.time_windowed_indices[next_window_index] = Some(transactions);
304        if (next_window_index + 1) % inner.window_size == base_window_index {
305            inner.base_time_tick += 1;
306        }
307        inner.next_time_tick += 1;
308        next_time_tick
309    }
310}
311
312#[derive(Eq, PartialEq, Hash, DeriveMallocSizeOf)]
313pub struct InflightPendingTransactionItem {
314    pub fixed_byte_part: TxPropagateId,
315    pub random_byte_part: u8,
316    pub window_index: usize,
317    pub key1: u64,
318    pub key2: u64,
319    pub index: usize,
320    pub peer_id: NodeId,
321}
322impl InflightPendingTransactionItem {
323    pub fn new(
324        fixed_byte_part: TxPropagateId, random_byte_part: u8,
325        window_index: usize, key1: u64, key2: u64, index: usize,
326        peer_id: NodeId,
327    ) -> Self {
328        InflightPendingTransactionItem {
329            fixed_byte_part,
330            random_byte_part,
331            window_index,
332            key1,
333            key2,
334            index,
335            peer_id,
336        }
337    }
338}
339
340#[derive(DeriveMallocSizeOf)]
341struct InflightPendingTransactionContainerInner {
342    txid_hashmap:
343        HashMap<TxPropagateId, HashSet<Arc<InflightPendingTransactionItem>>>,
344    time_window: TimeWindow<Arc<InflightPendingTransactionItem>>,
345}
346
347impl InflightPendingTransactionContainerInner {
348    pub fn new(timeout: u64, window_size: usize) -> Self {
349        InflightPendingTransactionContainerInner {
350            txid_hashmap: HashMap::new(),
351            time_window: TimeWindow::new(timeout, window_size),
352        }
353    }
354}
355
356#[derive(DeriveMallocSizeOf)]
357pub struct InflightPendingTransactionContainer {
358    inner: InflightPendingTransactionContainerInner,
359}
360
361impl InflightPendingTransactionContainer {
362    const INFLIGHT_PENDING_TRANSACTION_CONTAINER_WINDOW_SIZE: usize = 5;
363
364    pub fn new(timeout: u64) -> Self {
365        InflightPendingTransactionContainer {
366            inner: InflightPendingTransactionContainerInner::new(
367                timeout,
368                InflightPendingTransactionContainer::INFLIGHT_PENDING_TRANSACTION_CONTAINER_WINDOW_SIZE,
369            ),
370        }
371    }
372
373    pub fn generate_tx_requests_from_inflight_pending_pool(
374        &mut self, signed_transactions: &Vec<Arc<SignedTransaction>>,
375    ) -> (
376        Vec<Arc<InflightPendingTransactionItem>>,
377        HashSet<TxPropagateId>,
378    ) {
379        let _timer = MeterTimer::time_func(
380            REQUEST_MANAGER_PENDING_INFLIGHT_TX_TIMER.as_ref(),
381        );
382        let mut requests = vec![];
383        let mut keeped_short_inflight_keys = HashSet::new();
384        for tx in signed_transactions {
385            let hash = tx.hash;
386            let fixed_bytes_part =
387                TransactionDigests::to_u24(hash[29], hash[30], hash[31]);
388            match self.inner.txid_hashmap.get_mut(&fixed_bytes_part) {
389                Some(set) => {
390                    set.retain(|item| {
391                        TransactionDigests::get_random_byte(
392                            &hash, item.key1, item.key2,
393                        ) != item.random_byte_part
394                    });
395                    if set.len() == 0 {
396                        self.inner.txid_hashmap.remove(&fixed_bytes_part);
397                    } else {
398                        if let Some(item) = set.iter().next() {
399                            requests.push(item.clone());
400                            keeped_short_inflight_keys
401                                .insert(item.fixed_byte_part);
402                            // Remove `item` from `set`
403                            set.remove(requests.last().expect("Just pushed"));
404                        }
405                        if set.len() == 0 {
406                            self.inner.txid_hashmap.remove(&fixed_bytes_part);
407                        }
408                    }
409                }
410                None => {}
411            }
412        }
413        (requests, keeped_short_inflight_keys)
414    }
415
416    pub fn append_inflight_pending_items(
417        &mut self, items: Vec<InflightPendingTransactionItem>,
418    ) {
419        let mut values = Vec::new();
420        for item in items {
421            let key = item.fixed_byte_part;
422            let inflight_pending_item = Arc::new(item);
423            self.inner
424                .txid_hashmap
425                .entry(key)
426                .and_modify(|s| {
427                    s.insert(inflight_pending_item.clone());
428                })
429                .or_insert_with(|| {
430                    let mut set = HashSet::new();
431                    set.insert(inflight_pending_item.clone());
432                    set
433                }); //if occupied, append, else, insert.
434
435            values.push(inflight_pending_item);
436        }
437
438        if let Some(remove_values) = self.inner.time_window.append_entry(values)
439        {
440            for item in &remove_values {
441                if let Some(set) =
442                    self.inner.txid_hashmap.get_mut(&item.fixed_byte_part)
443                {
444                    //TODO: if this section executed, it means the node has
445                    // not received the corresponding tx responses. this
446                    // should be handled by either disconnected the node
447                    // or making another request from a random inflight
448                    // pending item.
449                    if set.len() == 1 {
450                        self.inner.txid_hashmap.remove(&item.fixed_byte_part);
451                    } else {
452                        set.remove(item);
453                    }
454                }
455            }
456        }
457    }
458}
459
460#[derive(DeriveMallocSizeOf)]
461struct TransactionCacheContainerInner {
462    tx_hashes_map: HashMap<u32, HashSet<H256>>,
463    tx_map: HashMap<H256, Arc<SignedTransaction>>,
464    time_window: TimeWindow<H256>,
465}
466
467impl TransactionCacheContainerInner {
468    pub fn new(timeout: u64, window_size: usize) -> Self {
469        TransactionCacheContainerInner {
470            tx_hashes_map: HashMap::new(),
471            tx_map: HashMap::new(),
472            time_window: TimeWindow::new(timeout, window_size),
473        }
474    }
475}
476
477#[derive(DeriveMallocSizeOf)]
478pub struct TransactionCacheContainer {
479    inner: TransactionCacheContainerInner,
480}
481
482impl TransactionCacheContainer {
483    const TRANSACTION_CACHE_CONTAINER_WINDOW_SIZE: usize = 64;
484
485    pub fn new(timeout: u64) -> Self {
486        TransactionCacheContainer {
487            inner: TransactionCacheContainerInner::new(
488                timeout,
489                TransactionCacheContainer::TRANSACTION_CACHE_CONTAINER_WINDOW_SIZE,
490            ),
491        }
492    }
493
494    pub fn contains_key(&self, tx_hash: &H256) -> bool {
495        self.inner.tx_map.contains_key(tx_hash)
496    }
497
498    pub fn get(&self, tx_hash: &H256) -> Option<&Arc<SignedTransaction>> {
499        self.inner.tx_map.get(tx_hash)
500    }
501
502    pub fn get_transaction(
503        &self, fixed_bytes: u32, random_bytes: u16, key1: u64, key2: u64,
504    ) -> Option<Arc<SignedTransaction>> {
505        let inner = &self.inner;
506        let mut tx = None;
507        match inner.tx_hashes_map.get(&fixed_bytes) {
508            Some(set) => {
509                for value in set {
510                    if CompactBlock::get_random_bytes(value, key1, key2)
511                        == random_bytes
512                    {
513                        if tx.is_none() {
514                            tx = Some(self.get(value).unwrap().clone());
515                        } else {
516                            return None;
517                        }
518                    }
519                }
520            }
521            None => {}
522        }
523        tx
524    }
525
526    pub fn append_transactions(
527        &mut self, transactions: &Vec<(usize, Arc<SignedTransaction>)>,
528    ) {
529        let mut values = Vec::new();
530        for (_, transaction) in transactions {
531            let tx_hash = transaction.hash();
532            let short_id = CompactBlock::to_u32(
533                tx_hash[28],
534                tx_hash[29],
535                tx_hash[30],
536                tx_hash[31],
537            );
538            self.inner
539                .tx_hashes_map
540                .entry(short_id)
541                .and_modify(|s| {
542                    s.insert(tx_hash.clone());
543                })
544                .or_insert_with(|| {
545                    let mut set = HashSet::new();
546                    set.insert(tx_hash.clone());
547                    set
548                }); //if occupied, append, else, insert.
549            self.inner
550                .tx_map
551                .insert(tx_hash.clone(), transaction.clone());
552            values.push(tx_hash);
553        }
554
555        if let Some(remove_values) = self.inner.time_window.append_entry(values)
556        {
557            for tx_hash in &remove_values {
558                let key = CompactBlock::to_u32(
559                    tx_hash[28],
560                    tx_hash[29],
561                    tx_hash[30],
562                    tx_hash[31],
563                );
564
565                if let Some(set) = self.inner.tx_hashes_map.get_mut(&key) {
566                    // if there is a value asscicated with the key
567                    if set.len() == 1 {
568                        self.inner.tx_hashes_map.remove(&key);
569                    } else {
570                        set.remove(tx_hash);
571                    }
572                    self.inner.tx_map.remove(tx_hash);
573                }
574            }
575        }
576    }
577}