cfxcore/pos/mempool/core_mempool/
transaction_store.rs1use crate::pos::mempool::{
9 core_mempool::{
10 index::{
11 AccountTransactionIter, AccountTransactions, TTLIndex,
12 TimelineIndex,
13 },
14 transaction::{MempoolTransaction, TimelineState},
15 ttl_cache::TtlCache,
16 },
17 counters,
18 logging::{LogEntry, LogEvent, LogSchema, TxnsLog},
19};
20use diem_config::config::MempoolConfig;
21use diem_crypto::{hash::CryptoHash, HashValue};
22use diem_logger::prelude::*;
23use diem_types::{
24 account_address::AccountAddress,
25 mempool_status::{MempoolStatus, MempoolStatusCode},
26 transaction::{SignedTransaction, TransactionPayload},
27};
28use std::{
29 collections::{hash_map::Values, HashMap, HashSet},
30 time::{Duration, SystemTime},
31};
32
33pub struct TransactionStore {
35 transactions: AccountTransactions,
37 pivot_decisions: HashMap<HashValue, HashSet<(AccountAddress, HashValue)>>,
39
40 expiration_time_index: TTLIndex,
42 system_ttl_index: TTLIndex,
47 timeline_index: TimelineIndex,
48
49 _capacity: usize,
51}
52
53pub type PivotDecisionIter<'a> =
54 Values<'a, HashValue, HashSet<(AccountAddress, HashValue)>>;
55
56impl TransactionStore {
57 pub(crate) fn new(config: &MempoolConfig) -> Self {
58 Self {
59 transactions: AccountTransactions::new(),
61 pivot_decisions: HashMap::new(),
62
63 system_ttl_index: TTLIndex::new(Box::new(
65 |t: &MempoolTransaction| t.expiration_time,
66 )),
67 expiration_time_index: TTLIndex::new(Box::new(
68 |t: &MempoolTransaction| {
69 Duration::from_secs(t.txn.expiration_timestamp_secs())
70 },
71 )),
72 timeline_index: TimelineIndex::new(),
73
74 _capacity: config.capacity,
76 }
77 }
78
79 pub(crate) fn get(&self, hash: &HashValue) -> Option<SignedTransaction> {
81 if let Some(txn) = self.transactions.get(hash) {
82 return Some(txn.txn.clone());
83 }
84 None
85 }
86
87 pub(crate) fn get_pivot_decisions(
89 &self, hash: &HashValue,
90 ) -> Vec<HashValue> {
91 if let Some(decisions) = self.pivot_decisions.get(hash) {
92 decisions
93 .iter()
94 .map(|(_, tx_hash)| tx_hash.clone())
95 .collect::<_>()
96 } else {
97 vec![]
98 }
99 }
100
101 pub(crate) fn insert(
104 &mut self, mut txn: MempoolTransaction,
105 ) -> MempoolStatus {
106 let address = txn.get_sender();
107 let hash = txn.get_hash();
108 let has_tx = self.get(&hash).is_some();
109
110 if has_tx {
111 return MempoolStatus::new(MempoolStatusCode::Accepted);
112 }
113
114 self.timeline_index.insert(&mut txn);
115
116 self.system_ttl_index.insert(&txn);
120 self.expiration_time_index.insert(&txn);
121
122 let payload = txn.txn.clone().into_raw_transaction().into_payload();
123 if let TransactionPayload::PivotDecision(pivot_decision) = payload {
124 let pivot_decision_hash = pivot_decision.hash();
125 self.pivot_decisions
126 .entry(pivot_decision_hash)
127 .or_insert_with(HashSet::new);
128 if let Some(account_decision) =
129 self.pivot_decisions.get_mut(&pivot_decision_hash)
130 {
131 diem_debug!("txpool::insert pivot {:?}", hash);
132 account_decision.insert((address, hash));
133 }
134 self.transactions.insert(hash, txn, true);
135 } else {
136 self.transactions.insert(hash, txn, false);
137 }
138 self.track_indices();
139 diem_debug!(
140 LogSchema::new(LogEntry::AddTxn)
141 .txns(TxnsLog::new_txn(address, hash)),
142 hash = hash,
143 has_tx = has_tx
144 );
145
146 MempoolStatus::new(MempoolStatusCode::Accepted)
147 }
148
149 fn track_indices(&self) {
150 counters::core_mempool_index_size(
151 counters::SYSTEM_TTL_INDEX_LABEL,
152 self.system_ttl_index.size(),
153 );
154 counters::core_mempool_index_size(
155 counters::EXPIRATION_TIME_INDEX_LABEL,
156 self.expiration_time_index.size(),
157 );
158 counters::core_mempool_index_size(
159 counters::TIMELINE_INDEX_LABEL,
160 self.timeline_index.size(),
161 );
162 }
163
164 pub(crate) fn commit_transaction(
169 &mut self, _account: &AccountAddress, hash: HashValue,
170 ) {
171 let mut txns_log = TxnsLog::new();
172 if let Some(transaction) = self.transactions.remove(&hash) {
173 txns_log.add(transaction.get_sender(), transaction.get_hash());
174 self.index_remove(&transaction);
175 let payload = transaction.txn.into_raw_transaction().into_payload();
177 if let TransactionPayload::PivotDecision(pivot_decision) = payload {
178 let pivot_decision_hash = pivot_decision.hash();
179 if let Some(indices) =
180 self.pivot_decisions.remove(&pivot_decision_hash)
181 {
182 for (_, hash) in indices {
183 if let Some(txn) = self.transactions.remove(&hash) {
184 txns_log.add(txn.get_sender(), txn.get_hash());
185 self.index_remove(&txn);
186 }
187 }
188 }
189 }
190 }
191 diem_debug!(LogSchema::new(LogEntry::CleanCommittedTxn).txns(txns_log));
192 }
193
194 pub(crate) fn reject_transaction(
195 &mut self, account: &AccountAddress, _hash: HashValue,
196 ) {
197 let mut txns_log = TxnsLog::new();
198 let mut hashes = Vec::new();
199 for txn in self.transactions.iter() {
200 if txn.get_sender() == *account {
201 txns_log.add(txn.get_sender(), txn.get_hash());
202 hashes.push(txn.get_hash());
203 }
204 }
205 for txn in self.transactions.iter_pivot_decision() {
206 if txn.get_sender() == *account {
207 txns_log.add(txn.get_sender(), txn.get_hash());
208 hashes.push(txn.get_hash());
209 }
210 }
211 for hash in hashes {
212 if let Some(txn) = self.transactions.remove(&hash) {
213 self.index_remove(&txn);
214 }
215 }
216 diem_debug!(LogSchema::new(LogEntry::CleanRejectedTxn).txns(txns_log));
217 }
218
219 fn index_remove(&mut self, txn: &MempoolTransaction) {
221 counters::CORE_MEMPOOL_REMOVED_TXNS.inc();
222 self.system_ttl_index.remove(&txn);
223 self.expiration_time_index.remove(&txn);
224 self.timeline_index.remove(&txn);
225 self.track_indices();
226 }
227
228 pub(crate) fn read_timeline(
231 &mut self, timeline_id: u64, count: usize,
232 ) -> (Vec<SignedTransaction>, u64) {
233 let mut batch = vec![];
234 let mut last_timeline_id = timeline_id;
235 for (_, hash) in self.timeline_index.read_timeline(timeline_id, count) {
236 if let Some(txn) = self.transactions.get(&hash) {
237 batch.push(txn.txn.clone());
238 if let TimelineState::Ready(timeline_id) = txn.timeline_state {
239 last_timeline_id = timeline_id;
240 }
241 }
242 }
243 (batch, last_timeline_id)
244 }
245
246 pub(crate) fn timeline_range(
247 &mut self, start_id: u64, end_id: u64,
248 ) -> Vec<SignedTransaction> {
249 self.timeline_index
250 .timeline_range(start_id, end_id)
251 .iter()
252 .filter_map(|(_, hash)| {
253 self.transactions.get(hash).map(|txn| txn.txn.clone())
254 })
255 .collect()
256 }
257
258 pub(crate) fn gc_by_system_ttl(
260 &mut self,
261 metrics_cache: &TtlCache<(AccountAddress, HashValue), SystemTime>,
262 ) {
263 let now = diem_infallible::duration_since_epoch();
264
265 self.gc(now, true, metrics_cache);
266 }
267
268 pub(crate) fn gc_by_expiration_time(
271 &mut self, block_time: Duration,
272 metrics_cache: &TtlCache<(AccountAddress, HashValue), SystemTime>,
273 ) {
274 self.gc(block_time, false, metrics_cache);
275 }
276
277 fn gc(
278 &mut self, now: Duration, by_system_ttl: bool,
279 _metrics_cache: &TtlCache<(AccountAddress, HashValue), SystemTime>,
280 ) {
281 let (metric_label, index, log_event) = if by_system_ttl {
282 (
283 counters::GC_SYSTEM_TTL_LABEL,
284 &mut self.system_ttl_index,
285 LogEvent::SystemTTLExpiration,
286 )
287 } else {
288 (
289 counters::GC_CLIENT_EXP_LABEL,
290 &mut self.expiration_time_index,
291 LogEvent::ClientExpiration,
292 )
293 };
294 counters::CORE_MEMPOOL_GC_EVENT_COUNT
295 .with_label_values(&[metric_label])
296 .inc();
297
298 let mut gc_txns = index.gc(now);
299 gc_txns.sort_by_key(|key| (key.address, key.hash));
301 let mut gc_iter = gc_txns.iter().peekable();
302
303 let mut gc_txns_log = TxnsLog::new();
304 while let Some(key) = gc_iter.next() {
305 if let Some(txn) = self.transactions.remove(&key.hash) {
306 gc_txns_log.add(txn.get_sender(), txn.get_hash());
307 self.index_remove(&txn);
308 if let TransactionPayload::PivotDecision(pivot_decision) =
309 txn.txn.into_raw_transaction().into_payload()
310 {
311 self.pivot_decisions.remove(&pivot_decision.hash());
312 }
313 }
314 }
315
316 diem_debug!(LogSchema::event_log(LogEntry::GCRemoveTxns, log_event)
317 .txns(gc_txns_log));
318 self.track_indices();
319 }
320
321 pub(crate) fn iter(&self) -> AccountTransactionIter<'_> {
322 self.transactions.iter()
323 }
324
325 pub(crate) fn iter_pivot_decision(&self) -> PivotDecisionIter<'_> {
326 self.pivot_decisions.values()
327 }
328}