cfxcore/pos/mempool/core_mempool/
mempool.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8//! Mempool is used to track transactions which have been submitted but not yet
9//! agreed upon.
10use crate::pos::mempool::{
11    core_mempool::{
12        index::TxnPointer,
13        transaction::{MempoolTransaction, TimelineState},
14        transaction_store::TransactionStore,
15        ttl_cache::TtlCache,
16    },
17    counters,
18    logging::{LogEntry, LogSchema, TxnsLog},
19};
20use diem_config::config::NodeConfig;
21use diem_crypto::{hash::CryptoHash, HashValue};
22use diem_logger::prelude::*;
23use diem_types::{
24    account_address::AccountAddress,
25    mempool_status::MempoolStatus,
26    term_state::PosState,
27    transaction::{
28        authenticator::TransactionAuthenticator, GovernanceRole,
29        SignedTransaction, TransactionPayload,
30    },
31    validator_verifier::ValidatorVerifier,
32};
33use executor::vm::verify_dispute;
34use std::{
35    collections::HashSet,
36    time::{Duration, SystemTime},
37};
38
39pub struct Mempool {
40    // Stores the metadata of all transactions in mempool (of all states).
41    pub transactions: TransactionStore,
42
43    // For each transaction, an entry with a timestamp is added when the
44    // transaction enters mempool. This is used to measure e2e latency of
45    // transactions in the system, as well as the time it takes to pick it
46    // up by consensus.
47    pub(crate) metrics_cache: TtlCache<(AccountAddress, HashValue), SystemTime>,
48    pub system_transaction_timeout: Duration,
49}
50
51impl Mempool {
52    pub fn new(config: &NodeConfig) -> Self {
53        Mempool {
54            transactions: TransactionStore::new(&config.mempool),
55            metrics_cache: TtlCache::new(
56                config.mempool.capacity,
57                Duration::from_secs(100),
58            ),
59            system_transaction_timeout: Duration::from_secs(
60                config.mempool.system_transaction_timeout_secs,
61            ),
62        }
63    }
64
65    /// This function will be called once the transaction has been stored.
66    pub(crate) fn remove_transaction(
67        &mut self, sender: &AccountAddress, hash: HashValue, is_rejected: bool,
68    ) {
69        diem_trace!(
70            LogSchema::new(LogEntry::RemoveTxn)
71                .txns(TxnsLog::new_txn(*sender, hash)),
72            is_rejected = is_rejected
73        );
74        let metric_label = if is_rejected {
75            counters::COMMIT_REJECTED_LABEL
76        } else {
77            counters::COMMIT_ACCEPTED_LABEL
78        };
79        self.log_latency(*sender, hash, metric_label);
80        self.metrics_cache.remove(&(*sender, hash));
81
82        if is_rejected {
83            self.transactions.reject_transaction(&sender, hash);
84        } else {
85            self.transactions.commit_transaction(&sender, hash);
86        }
87    }
88
89    fn log_latency(
90        &mut self, account: AccountAddress, hash: HashValue, metric: &str,
91    ) {
92        if let Some(&creation_time) = self.metrics_cache.get(&(account, hash)) {
93            if let Ok(time_delta) =
94                SystemTime::now().duration_since(creation_time)
95            {
96                counters::CORE_MEMPOOL_TXN_COMMIT_LATENCY
97                    .with_label_values(&[metric])
98                    .observe(time_delta.as_secs_f64());
99            }
100        }
101    }
102
103    /// Used to add a transaction to the Mempool.
104    /// Performs basic validation: checks account's sequence number.
105    pub(crate) fn add_txn(
106        &mut self, txn: SignedTransaction, ranking_score: u64,
107        timeline_state: TimelineState, governance_role: GovernanceRole,
108    ) -> MempoolStatus {
109        diem_trace!(LogSchema::new(LogEntry::AddTxn)
110            .txns(TxnsLog::new_txn(txn.sender(), txn.hash())),);
111
112        let expiration_time = diem_infallible::duration_since_epoch()
113            + self.system_transaction_timeout;
114        if timeline_state != TimelineState::NonQualified {
115            self.metrics_cache
116                .insert((txn.sender(), txn.hash()), SystemTime::now());
117        }
118
119        let txn_info = MempoolTransaction::new(
120            txn,
121            expiration_time,
122            ranking_score,
123            timeline_state,
124            governance_role,
125        );
126
127        self.transactions.insert(txn_info)
128    }
129
130    /// Fetches next block of transactions for consensus.
131    /// `batch_size` - size of requested block.
132    /// `seen_txns` - transactions that were sent to Consensus but were not
133    /// committed yet,  mempool should filter out such transactions.
134    #[allow(clippy::explicit_counter_loop)]
135    pub(crate) fn get_block(
136        &mut self, _batch_size: u64, mut seen: HashSet<TxnPointer>,
137        pos_state: &PosState, validators: ValidatorVerifier,
138    ) -> Vec<SignedTransaction> {
139        let mut block = vec![];
140        let mut block_log = TxnsLog::new();
141        // Helper DS. Helps to mitigate scenarios where account submits several
142        // transactions with increasing gas price (e.g. user submits
143        // transactions with sequence number 1, 2 and gas_price 1, 10
144        // respectively) Later txn has higher gas price and will be
145        // observed first in priority index iterator, but can't be
146        // executed before first txn. Once observed, such txn will be saved in
147        // `skipped` DS and rechecked once it's ancestor becomes available
148        let seen_size = seen.len();
149        let mut txn_walked = 0usize;
150        // iterate all normal transaction
151        for txn in self.transactions.iter() {
152            txn_walked += 1;
153            if seen.contains(&TxnPointer::from(txn)) {
154                continue;
155            }
156            let validate_result = match txn.txn.payload() {
157                TransactionPayload::Election(election_payload) => {
158                    pos_state.validate_election(election_payload)
159                }
160                TransactionPayload::PivotDecision(_) => {
161                    seen.insert((txn.get_sender(), txn.get_hash()));
162                    continue;
163                }
164                TransactionPayload::Dispute(dispute_payload) => {
165                    // TODO(lpl): Only dispute a node once.
166                    pos_state.validate_dispute(dispute_payload).and(
167                        verify_dispute(dispute_payload)
168                            .then_some(())
169                            .ok_or(anyhow::anyhow!("invalid dispute")),
170                    )
171                }
172                _ => {
173                    continue;
174                }
175            };
176            if validate_result.is_ok() {
177                block.push(txn.txn.clone());
178                block_log.add(txn.get_sender(), txn.get_hash());
179                seen.insert((txn.get_sender(), txn.get_hash()));
180            }
181        }
182        let mut max_pivot_height = 0;
183        let mut chosen_pivot_tx = None;
184        // iterate all pivot decision transaction
185        for pivot_decision_set in self.transactions.iter_pivot_decision() {
186            let mut pivot_decision_opt = None;
187            diem_debug!("get_block: 0 {:?}", pivot_decision_set.len());
188            for (account, hash) in pivot_decision_set.iter() {
189                if validators.get_public_key(account).is_some() {
190                    if pivot_decision_opt.is_none() {
191                        if let Some(txn) = self.transactions.get(hash) {
192                            pivot_decision_opt = Some(txn);
193                        }
194                    }
195                }
196            }
197            diem_debug!("get_block: 1 {:?}", pivot_decision_opt);
198            if validators
199                .check_voting_power(
200                    pivot_decision_set.iter().map(|(addr, _)| addr),
201                )
202                .is_ok()
203            {
204                let pivot_decision = pivot_decision_opt.unwrap();
205                let pivot_height = match pivot_decision.payload() {
206                    TransactionPayload::PivotDecision(decision) => {
207                        decision.height
208                    }
209                    _ => unreachable!(),
210                };
211                if pivot_height > max_pivot_height
212                    && pivot_height > pos_state.pivot_decision().height
213                {
214                    max_pivot_height = pivot_height;
215                    chosen_pivot_tx = Some(pivot_decision);
216                }
217            }
218            diem_debug!("get_block: 2 {:?}", chosen_pivot_tx);
219        }
220        if let Some(tx) = chosen_pivot_tx {
221            let pivot_decision_hash = match tx.payload() {
222                TransactionPayload::PivotDecision(decision) => decision.hash(),
223                _ => unreachable!(),
224            };
225            // aggregate signatures
226            let txn_hashes =
227                self.transactions.get_pivot_decisions(&pivot_decision_hash);
228            let senders: Vec<AccountAddress> =
229                validators.get_ordered_account_addresses_iter().collect();
230            let mut signatures = vec![];
231            for hash in &txn_hashes {
232                if let Some(txn) = self.transactions.get(hash) {
233                    match txn.authenticator() {
234                        TransactionAuthenticator::BLS { signature, .. } => {
235                            if let Ok(index) =
236                                senders.binary_search(&txn.sender())
237                            {
238                                signatures.push((signature, index));
239                            }
240                        }
241                        _ => unreachable!(),
242                    }
243                }
244            }
245            let new_tx =
246                SignedTransaction::new_multisig(tx.raw_txn(), signatures);
247            block_log.add(new_tx.sender(), new_tx.hash());
248            block.push(new_tx);
249        }
250
251        diem_debug!(
252            LogSchema::new(LogEntry::GetBlock).txns(block_log),
253            seen_consensus = seen_size,
254            walked = txn_walked,
255            seen_after = seen.len(),
256            result_size = block.len(),
257            block_size = block.len()
258        );
259        for transaction in &block {
260            self.log_latency(
261                transaction.sender(),
262                transaction.hash(),
263                counters::GET_BLOCK_STAGE_LABEL,
264            );
265        }
266        block
267    }
268
269    /// Periodic core mempool garbage collection.
270    /// Removes all expired transactions and clears expired entries in metrics
271    /// cache and sequence number cache.
272    pub(crate) fn gc(&mut self) {
273        let now = SystemTime::now();
274        self.transactions.gc_by_system_ttl(&self.metrics_cache);
275        self.metrics_cache.gc(now);
276    }
277
278    /// Garbage collection based on client-specified expiration time.
279    pub(crate) fn gc_by_expiration_time(&mut self, block_time: Duration) {
280        self.transactions
281            .gc_by_expiration_time(block_time, &self.metrics_cache);
282    }
283
284    /// Read `count` transactions from timeline since `timeline_id`.
285    /// Returns block of transactions and new last_timeline_id.
286    pub(crate) fn read_timeline(
287        &mut self, timeline_id: u64, count: usize,
288    ) -> (Vec<SignedTransaction>, u64) {
289        self.transactions.read_timeline(timeline_id, count)
290    }
291
292    /// Read transactions from timeline from `start_id` (exclusive) to `end_id`
293    /// (inclusive).
294    pub(crate) fn timeline_range(
295        &mut self, start_id: u64, end_id: u64,
296    ) -> Vec<SignedTransaction> {
297        self.transactions.timeline_range(start_id, end_id)
298    }
299}