cfxcore/pos/mempool/core_mempool/
index.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8/// This module provides various indexes used by Mempool.
9use crate::pos::mempool::core_mempool::transaction::{
10    MempoolTransaction, TimelineState,
11};
12use diem_crypto::HashValue;
13use diem_types::account_address::AccountAddress;
14use std::{
15    cmp::Ordering,
16    collections::{hash_map::Values, BTreeMap, BTreeSet, HashMap},
17    ops::Bound,
18    time::Duration,
19};
20
21pub struct AccountTransactions {
22    normal_transaction: HashMap<HashValue, MempoolTransaction>,
23    pivot_decision_transaction: HashMap<HashValue, MempoolTransaction>,
24}
25
26pub type AccountTransactionIter<'a> = Values<'a, HashValue, MempoolTransaction>;
27
28impl AccountTransactions {
29    pub(crate) fn new() -> Self {
30        Self {
31            normal_transaction: HashMap::new(),
32            pivot_decision_transaction: HashMap::new(),
33        }
34    }
35
36    pub(crate) fn get(&self, hash: &HashValue) -> Option<&MempoolTransaction> {
37        if let Some(txn) = self.normal_transaction.get(hash) {
38            Some(txn)
39        } else {
40            self.pivot_decision_transaction.get(hash)
41        }
42    }
43
44    pub(crate) fn remove(
45        &mut self, hash: &HashValue,
46    ) -> Option<MempoolTransaction> {
47        if let Some(txn) = self.normal_transaction.remove(hash) {
48            Some(txn)
49        } else {
50            self.pivot_decision_transaction.remove(hash)
51        }
52    }
53
54    pub(crate) fn insert(
55        &mut self, hash: HashValue, txn: MempoolTransaction,
56        is_pivot_decision: bool,
57    ) {
58        if is_pivot_decision {
59            self.pivot_decision_transaction.insert(hash, txn);
60        } else {
61            self.normal_transaction.insert(hash, txn);
62        }
63    }
64
65    pub(crate) fn iter(&self) -> AccountTransactionIter<'_> {
66        self.normal_transaction.values()
67    }
68}
69
70pub type TxnPointer = (AccountAddress, HashValue);
71
72impl From<&MempoolTransaction> for TxnPointer {
73    fn from(transaction: &MempoolTransaction) -> Self {
74        (transaction.get_sender(), transaction.get_hash())
75    }
76}
77
78/// TTLIndex is used to perform garbage collection of old transactions in
79/// Mempool. Periodically separate GC-like job queries this index to find out
80/// transactions that have to be removed. Index is represented as
81/// `BTreeSet<TTLOrderingKey>`, where `TTLOrderingKey` is a logical reference to
82/// TxnInfo. Index is ordered by `TTLOrderingKey::expiration_time`.
83pub struct TTLIndex {
84    data: BTreeSet<TTLOrderingKey>,
85    get_expiration_time:
86        Box<dyn Fn(&MempoolTransaction) -> Duration + Send + Sync>,
87}
88
89impl TTLIndex {
90    pub(crate) fn new<F>(get_expiration_time: Box<F>) -> Self
91    where F: Fn(&MempoolTransaction) -> Duration + 'static + Send + Sync {
92        Self {
93            data: BTreeSet::new(),
94            get_expiration_time,
95        }
96    }
97
98    pub(crate) fn insert(&mut self, txn: &MempoolTransaction) {
99        self.data.insert(self.make_key(&txn));
100    }
101
102    pub(crate) fn remove(&mut self, txn: &MempoolTransaction) {
103        self.data.remove(&self.make_key(&txn));
104    }
105
106    /// Garbage collect all old transactions.
107    pub(crate) fn gc(&mut self, now: Duration) -> Vec<TTLOrderingKey> {
108        let ttl_key = TTLOrderingKey {
109            expiration_time: now,
110            address: AccountAddress::ZERO,
111            hash: HashValue::zero(),
112        };
113
114        let mut active = self.data.split_off(&ttl_key);
115        let ttl_transactions = self.data.iter().cloned().collect();
116        self.data.clear();
117        self.data.append(&mut active);
118        ttl_transactions
119    }
120
121    fn make_key(&self, txn: &MempoolTransaction) -> TTLOrderingKey {
122        TTLOrderingKey {
123            expiration_time: (self.get_expiration_time)(txn),
124            address: txn.get_sender(),
125            hash: txn.get_hash(),
126        }
127    }
128}
129
130#[allow(clippy::derive_ord_xor_partial_ord)]
131#[derive(Eq, PartialEq, PartialOrd, Clone, Debug)]
132pub struct TTLOrderingKey {
133    pub expiration_time: Duration,
134    pub address: AccountAddress,
135    pub hash: HashValue,
136}
137
138/// Be very careful with this, to not break the partial ordering.
139/// See:  https://rust-lang.github.io/rust-clippy/master/index.html#derive_ord_xor_partial_ord
140#[allow(clippy::derive_ord_xor_partial_ord)]
141impl Ord for TTLOrderingKey {
142    fn cmp(&self, other: &TTLOrderingKey) -> Ordering {
143        match self.expiration_time.cmp(&other.expiration_time) {
144            Ordering::Equal => {
145                (&self.address, self.hash).cmp(&(&other.address, other.hash))
146            }
147            ordering => ordering,
148        }
149    }
150}
151
152/// TimelineIndex is an ordered log of all transactions that are "ready" for
153/// broadcast. We only add a transaction to the index if it has a chance to be
154/// included in the next consensus block (which means its status is != NotReady
155/// or its sequential to another "ready" transaction).
156///
157/// It's represented as Map <timeline_id, (Address, hash)>, where
158/// timeline_id is auto increment unique id of "ready" transaction in local
159/// Mempool. (Address, hash) is a logical reference to transaction
160/// content in main storage.
161pub struct TimelineIndex {
162    timeline_id: u64,
163    timeline: BTreeMap<u64, (AccountAddress, HashValue)>,
164}
165
166impl TimelineIndex {
167    pub(crate) fn new() -> Self {
168        Self {
169            timeline_id: 1,
170            timeline: BTreeMap::new(),
171        }
172    }
173
174    /// Read all transactions from the timeline since <timeline_id>.
175    pub(crate) fn read_timeline(
176        &mut self, timeline_id: u64, count: usize,
177    ) -> Vec<(AccountAddress, HashValue)> {
178        let mut batch = vec![];
179        for (_id, &(address, hash)) in self
180            .timeline
181            .range((Bound::Excluded(timeline_id), Bound::Unbounded))
182        {
183            batch.push((address, hash));
184            if batch.len() == count {
185                break;
186            }
187        }
188        batch
189    }
190
191    /// Read transactions from the timeline from `start_id` (exclusive) to
192    /// `end_id` (inclusive).
193    pub(crate) fn timeline_range(
194        &mut self, start_id: u64, end_id: u64,
195    ) -> Vec<(AccountAddress, HashValue)> {
196        self.timeline
197            .range((Bound::Excluded(start_id), Bound::Included(end_id)))
198            .map(|(_idx, txn)| txn)
199            .cloned()
200            .collect()
201    }
202
203    pub(crate) fn insert(&mut self, txn: &mut MempoolTransaction) {
204        self.timeline
205            .insert(self.timeline_id, (txn.get_sender(), txn.get_hash()));
206        txn.timeline_state = TimelineState::Ready(self.timeline_id);
207        self.timeline_id += 1;
208    }
209
210    pub(crate) fn remove(&mut self, txn: &MempoolTransaction) {
211        if let TimelineState::Ready(timeline_id) = txn.timeline_state {
212            self.timeline.remove(&timeline_id);
213        }
214    }
215}