cfxcore/pos/mempool/core_mempool/
transaction_store.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use crate::pos::mempool::{
9    core_mempool::{
10        index::{
11            AccountTransactionIter, AccountTransactions, TTLIndex,
12            TimelineIndex,
13        },
14        transaction::{MempoolTransaction, TimelineState},
15        ttl_cache::TtlCache,
16    },
17    counters,
18    logging::{LogEntry, LogEvent, LogSchema, TxnsLog},
19};
20use diem_config::config::MempoolConfig;
21use diem_crypto::{hash::CryptoHash, HashValue};
22use diem_logger::prelude::*;
23use diem_types::{
24    account_address::AccountAddress,
25    mempool_status::{MempoolStatus, MempoolStatusCode},
26    transaction::{SignedTransaction, TransactionPayload},
27};
28use std::{
29    collections::{hash_map::Values, HashMap, HashSet},
30    time::{Duration, SystemTime},
31};
32
33/// TransactionStore is in-memory storage for all transactions in mempool.
34pub struct TransactionStore {
35    // normal transactions
36    transactions: AccountTransactions,
37    // pivot decision helper structure
38    pivot_decisions: HashMap<HashValue, HashSet<(AccountAddress, HashValue)>>,
39
40    // TTLIndex based on client-specified expiration time
41    expiration_time_index: TTLIndex,
42    // TTLIndex based on system expiration time
43    // we keep it separate from `expiration_time_index` so Mempool can't be
44    // clogged  by old transactions even if it hasn't received commit
45    // callbacks for a while
46    system_ttl_index: TTLIndex,
47    timeline_index: TimelineIndex,
48
49    // configuration
50    _capacity: usize,
51}
52
53pub type PivotDecisionIter<'a> =
54    Values<'a, HashValue, HashSet<(AccountAddress, HashValue)>>;
55
56impl TransactionStore {
57    pub(crate) fn new(config: &MempoolConfig) -> Self {
58        Self {
59            // main DS
60            transactions: AccountTransactions::new(),
61            pivot_decisions: HashMap::new(),
62
63            // various indexes
64            system_ttl_index: TTLIndex::new(Box::new(
65                |t: &MempoolTransaction| t.expiration_time,
66            )),
67            expiration_time_index: TTLIndex::new(Box::new(
68                |t: &MempoolTransaction| {
69                    Duration::from_secs(t.txn.expiration_timestamp_secs())
70                },
71            )),
72            timeline_index: TimelineIndex::new(),
73
74            // configuration
75            _capacity: config.capacity,
76        }
77    }
78
79    /// Fetch transaction by account address + hash.
80    pub(crate) fn get(&self, hash: &HashValue) -> Option<SignedTransaction> {
81        if let Some(txn) = self.transactions.get(hash) {
82            return Some(txn.txn.clone());
83        }
84        None
85    }
86
87    /// Fetch pivot decisions by pivot hash.
88    pub(crate) fn get_pivot_decisions(
89        &self, hash: &HashValue,
90    ) -> Vec<HashValue> {
91        if let Some(decisions) = self.pivot_decisions.get(hash) {
92            decisions
93                .iter()
94                .map(|(_, tx_hash)| tx_hash.clone())
95                .collect::<_>()
96        } else {
97            vec![]
98        }
99    }
100
101    /// Insert transaction into TransactionStore. Performs validation checks and
102    /// updates indexes.
103    pub(crate) fn insert(
104        &mut self, mut txn: MempoolTransaction,
105    ) -> MempoolStatus {
106        let address = txn.get_sender();
107        let hash = txn.get_hash();
108        let has_tx = self.get(&hash).is_some();
109
110        if has_tx {
111            return MempoolStatus::new(MempoolStatusCode::Accepted);
112        }
113
114        self.timeline_index.insert(&mut txn);
115
116        // TODO(linxi): evict transaction when mempool is full
117
118        // insert into storage and other indexes
119        self.system_ttl_index.insert(&txn);
120        self.expiration_time_index.insert(&txn);
121
122        let payload = txn.txn.clone().into_raw_transaction().into_payload();
123        if let TransactionPayload::PivotDecision(pivot_decision) = payload {
124            let pivot_decision_hash = pivot_decision.hash();
125            self.pivot_decisions
126                .entry(pivot_decision_hash)
127                .or_insert_with(HashSet::new);
128            if let Some(account_decision) =
129                self.pivot_decisions.get_mut(&pivot_decision_hash)
130            {
131                diem_debug!("txpool::insert pivot {:?}", hash);
132                account_decision.insert((address, hash));
133            }
134            self.transactions.insert(hash, txn, true);
135        } else {
136            self.transactions.insert(hash, txn, false);
137        }
138        self.track_indices();
139        diem_debug!(
140            LogSchema::new(LogEntry::AddTxn)
141                .txns(TxnsLog::new_txn(address, hash)),
142            hash = hash,
143            has_tx = has_tx
144        );
145
146        MempoolStatus::new(MempoolStatusCode::Accepted)
147    }
148
149    fn track_indices(&self) {
150        counters::core_mempool_index_size(
151            counters::SYSTEM_TTL_INDEX_LABEL,
152            self.system_ttl_index.size(),
153        );
154        counters::core_mempool_index_size(
155            counters::EXPIRATION_TIME_INDEX_LABEL,
156            self.expiration_time_index.size(),
157        );
158        counters::core_mempool_index_size(
159            counters::TIMELINE_INDEX_LABEL,
160            self.timeline_index.size(),
161        );
162    }
163
164    /// Handles transaction commit.
165    /// It includes deletion of all transactions with sequence number <=
166    /// `account_sequence_number` and potential promotion of sequential txns
167    /// to PriorityIndex/TimelineIndex.
168    pub(crate) fn commit_transaction(
169        &mut self, _account: &AccountAddress, hash: HashValue,
170    ) {
171        let mut txns_log = TxnsLog::new();
172        if let Some(transaction) = self.transactions.remove(&hash) {
173            txns_log.add(transaction.get_sender(), transaction.get_hash());
174            self.index_remove(&transaction);
175            // handle pivot decision
176            let payload = transaction.txn.into_raw_transaction().into_payload();
177            if let TransactionPayload::PivotDecision(pivot_decision) = payload {
178                let pivot_decision_hash = pivot_decision.hash();
179                if let Some(indices) =
180                    self.pivot_decisions.remove(&pivot_decision_hash)
181                {
182                    for (_, hash) in indices {
183                        if let Some(txn) = self.transactions.remove(&hash) {
184                            txns_log.add(txn.get_sender(), txn.get_hash());
185                            self.index_remove(&txn);
186                        }
187                    }
188                }
189            }
190        }
191        diem_debug!(LogSchema::new(LogEntry::CleanCommittedTxn).txns(txns_log));
192    }
193
194    pub(crate) fn reject_transaction(
195        &mut self, account: &AccountAddress, _hash: HashValue,
196    ) {
197        let mut txns_log = TxnsLog::new();
198        let mut hashes = Vec::new();
199        for txn in self.transactions.iter() {
200            if txn.get_sender() == *account {
201                txns_log.add(txn.get_sender(), txn.get_hash());
202                hashes.push(txn.get_hash());
203            }
204        }
205        for txn in self.transactions.iter_pivot_decision() {
206            if txn.get_sender() == *account {
207                txns_log.add(txn.get_sender(), txn.get_hash());
208                hashes.push(txn.get_hash());
209            }
210        }
211        for hash in hashes {
212            if let Some(txn) = self.transactions.remove(&hash) {
213                self.index_remove(&txn);
214            }
215        }
216        diem_debug!(LogSchema::new(LogEntry::CleanRejectedTxn).txns(txns_log));
217    }
218
219    /// Removes transaction from all indexes.
220    fn index_remove(&mut self, txn: &MempoolTransaction) {
221        counters::CORE_MEMPOOL_REMOVED_TXNS.inc();
222        self.system_ttl_index.remove(&txn);
223        self.expiration_time_index.remove(&txn);
224        self.timeline_index.remove(&txn);
225        self.track_indices();
226    }
227
228    /// Read `count` transactions from timeline since `timeline_id`.
229    /// Returns block of transactions and new last_timeline_id.
230    pub(crate) fn read_timeline(
231        &mut self, timeline_id: u64, count: usize,
232    ) -> (Vec<SignedTransaction>, u64) {
233        let mut batch = vec![];
234        let mut last_timeline_id = timeline_id;
235        for (_, hash) in self.timeline_index.read_timeline(timeline_id, count) {
236            if let Some(txn) = self.transactions.get(&hash) {
237                batch.push(txn.txn.clone());
238                if let TimelineState::Ready(timeline_id) = txn.timeline_state {
239                    last_timeline_id = timeline_id;
240                }
241            }
242        }
243        (batch, last_timeline_id)
244    }
245
246    pub(crate) fn timeline_range(
247        &mut self, start_id: u64, end_id: u64,
248    ) -> Vec<SignedTransaction> {
249        self.timeline_index
250            .timeline_range(start_id, end_id)
251            .iter()
252            .filter_map(|(_, hash)| {
253                self.transactions.get(hash).map(|txn| txn.txn.clone())
254            })
255            .collect()
256    }
257
258    /// Garbage collect old transactions.
259    pub(crate) fn gc_by_system_ttl(
260        &mut self,
261        metrics_cache: &TtlCache<(AccountAddress, HashValue), SystemTime>,
262    ) {
263        let now = diem_infallible::duration_since_epoch();
264
265        self.gc(now, true, metrics_cache);
266    }
267
268    /// Garbage collect old transactions based on client-specified expiration
269    /// time.
270    pub(crate) fn gc_by_expiration_time(
271        &mut self, block_time: Duration,
272        metrics_cache: &TtlCache<(AccountAddress, HashValue), SystemTime>,
273    ) {
274        self.gc(block_time, false, metrics_cache);
275    }
276
277    fn gc(
278        &mut self, now: Duration, by_system_ttl: bool,
279        _metrics_cache: &TtlCache<(AccountAddress, HashValue), SystemTime>,
280    ) {
281        let (metric_label, index, log_event) = if by_system_ttl {
282            (
283                counters::GC_SYSTEM_TTL_LABEL,
284                &mut self.system_ttl_index,
285                LogEvent::SystemTTLExpiration,
286            )
287        } else {
288            (
289                counters::GC_CLIENT_EXP_LABEL,
290                &mut self.expiration_time_index,
291                LogEvent::ClientExpiration,
292            )
293        };
294        counters::CORE_MEMPOOL_GC_EVENT_COUNT
295            .with_label_values(&[metric_label])
296            .inc();
297
298        let mut gc_txns = index.gc(now);
299        // sort the expired txns by order of sequence number per account
300        gc_txns.sort_by_key(|key| (key.address, key.hash));
301        let mut gc_iter = gc_txns.iter().peekable();
302
303        let mut gc_txns_log = TxnsLog::new();
304        while let Some(key) = gc_iter.next() {
305            if let Some(txn) = self.transactions.remove(&key.hash) {
306                gc_txns_log.add(txn.get_sender(), txn.get_hash());
307                self.index_remove(&txn);
308                if let TransactionPayload::PivotDecision(pivot_decision) =
309                    txn.txn.into_raw_transaction().into_payload()
310                {
311                    self.pivot_decisions.remove(&pivot_decision.hash());
312                }
313            }
314        }
315
316        diem_debug!(LogSchema::event_log(LogEntry::GCRemoveTxns, log_event)
317            .txns(gc_txns_log));
318        self.track_indices();
319    }
320
321    pub(crate) fn iter(&self) -> AccountTransactionIter<'_> {
322        self.transactions.iter()
323    }
324
325    pub(crate) fn iter_pivot_decision(&self) -> PivotDecisionIter<'_> {
326        self.pivot_decisions.values()
327    }
328}