cfxcore/pos/mempool/core_mempool/
index.rs1use crate::pos::mempool::core_mempool::transaction::{
10 MempoolTransaction, TimelineState,
11};
12use diem_crypto::HashValue;
13use diem_types::account_address::AccountAddress;
14use std::{
15 cmp::Ordering,
16 collections::{hash_map::Values, BTreeMap, BTreeSet, HashMap},
17 ops::Bound,
18 time::Duration,
19};
20
21pub struct AccountTransactions {
22 normal_transaction: HashMap<HashValue, MempoolTransaction>,
23 pivot_decision_transaction: HashMap<HashValue, MempoolTransaction>,
24}
25
26pub type AccountTransactionIter<'a> = Values<'a, HashValue, MempoolTransaction>;
27
28impl AccountTransactions {
29 pub(crate) fn new() -> Self {
30 Self {
31 normal_transaction: HashMap::new(),
32 pivot_decision_transaction: HashMap::new(),
33 }
34 }
35
36 pub(crate) fn get(&self, hash: &HashValue) -> Option<&MempoolTransaction> {
37 if let Some(txn) = self.normal_transaction.get(hash) {
38 Some(txn)
39 } else {
40 self.pivot_decision_transaction.get(hash)
41 }
42 }
43
44 pub(crate) fn remove(
45 &mut self, hash: &HashValue,
46 ) -> Option<MempoolTransaction> {
47 if let Some(txn) = self.normal_transaction.remove(hash) {
48 Some(txn)
49 } else {
50 self.pivot_decision_transaction.remove(hash)
51 }
52 }
53
54 pub(crate) fn insert(
55 &mut self, hash: HashValue, txn: MempoolTransaction,
56 is_pivot_decision: bool,
57 ) {
58 if is_pivot_decision {
59 self.pivot_decision_transaction.insert(hash, txn);
60 } else {
61 self.normal_transaction.insert(hash, txn);
62 }
63 }
64
65 pub(crate) fn iter(&self) -> AccountTransactionIter<'_> {
66 self.normal_transaction.values()
67 }
68}
69
70pub type TxnPointer = (AccountAddress, HashValue);
71
72impl From<&MempoolTransaction> for TxnPointer {
73 fn from(transaction: &MempoolTransaction) -> Self {
74 (transaction.get_sender(), transaction.get_hash())
75 }
76}
77
78pub struct TTLIndex {
84 data: BTreeSet<TTLOrderingKey>,
85 get_expiration_time:
86 Box<dyn Fn(&MempoolTransaction) -> Duration + Send + Sync>,
87}
88
89impl TTLIndex {
90 pub(crate) fn new<F>(get_expiration_time: Box<F>) -> Self
91 where F: Fn(&MempoolTransaction) -> Duration + 'static + Send + Sync {
92 Self {
93 data: BTreeSet::new(),
94 get_expiration_time,
95 }
96 }
97
98 pub(crate) fn insert(&mut self, txn: &MempoolTransaction) {
99 self.data.insert(self.make_key(&txn));
100 }
101
102 pub(crate) fn remove(&mut self, txn: &MempoolTransaction) {
103 self.data.remove(&self.make_key(&txn));
104 }
105
106 pub(crate) fn gc(&mut self, now: Duration) -> Vec<TTLOrderingKey> {
108 let ttl_key = TTLOrderingKey {
109 expiration_time: now,
110 address: AccountAddress::ZERO,
111 hash: HashValue::zero(),
112 };
113
114 let mut active = self.data.split_off(&ttl_key);
115 let ttl_transactions = self.data.iter().cloned().collect();
116 self.data.clear();
117 self.data.append(&mut active);
118 ttl_transactions
119 }
120
121 fn make_key(&self, txn: &MempoolTransaction) -> TTLOrderingKey {
122 TTLOrderingKey {
123 expiration_time: (self.get_expiration_time)(txn),
124 address: txn.get_sender(),
125 hash: txn.get_hash(),
126 }
127 }
128}
129
130#[allow(clippy::derive_ord_xor_partial_ord)]
131#[derive(Eq, PartialEq, PartialOrd, Clone, Debug)]
132pub struct TTLOrderingKey {
133 pub expiration_time: Duration,
134 pub address: AccountAddress,
135 pub hash: HashValue,
136}
137
138#[allow(clippy::derive_ord_xor_partial_ord)]
141impl Ord for TTLOrderingKey {
142 fn cmp(&self, other: &TTLOrderingKey) -> Ordering {
143 match self.expiration_time.cmp(&other.expiration_time) {
144 Ordering::Equal => {
145 (&self.address, self.hash).cmp(&(&other.address, other.hash))
146 }
147 ordering => ordering,
148 }
149 }
150}
151
152pub struct TimelineIndex {
162 timeline_id: u64,
163 timeline: BTreeMap<u64, (AccountAddress, HashValue)>,
164}
165
166impl TimelineIndex {
167 pub(crate) fn new() -> Self {
168 Self {
169 timeline_id: 1,
170 timeline: BTreeMap::new(),
171 }
172 }
173
174 pub(crate) fn read_timeline(
176 &mut self, timeline_id: u64, count: usize,
177 ) -> Vec<(AccountAddress, HashValue)> {
178 let mut batch = vec![];
179 for (_id, &(address, hash)) in self
180 .timeline
181 .range((Bound::Excluded(timeline_id), Bound::Unbounded))
182 {
183 batch.push((address, hash));
184 if batch.len() == count {
185 break;
186 }
187 }
188 batch
189 }
190
191 pub(crate) fn timeline_range(
194 &mut self, start_id: u64, end_id: u64,
195 ) -> Vec<(AccountAddress, HashValue)> {
196 self.timeline
197 .range((Bound::Excluded(start_id), Bound::Included(end_id)))
198 .map(|(_idx, txn)| txn)
199 .cloned()
200 .collect()
201 }
202
203 pub(crate) fn insert(&mut self, txn: &mut MempoolTransaction) {
204 self.timeline
205 .insert(self.timeline_id, (txn.get_sender(), txn.get_hash()));
206 txn.timeline_state = TimelineState::Ready(self.timeline_id);
207 self.timeline_id += 1;
208 }
209
210 pub(crate) fn remove(&mut self, txn: &MempoolTransaction) {
211 if let TimelineState::Ready(timeline_id) = txn.timeline_state {
212 self.timeline.remove(&timeline_id);
213 }
214 }
215}