cfxcore/transaction_pool/
mod.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5mod deferred_pool;
6mod error;
7mod garbage_collector;
8mod nonce_pool;
9mod pool_metrics;
10mod state_provider;
11mod transaction_pool_inner;
12
13pub use error::TransactionPoolError;
14
15use crate::{
16    block_data_manager::BlockDataManager,
17    consensus::BestInformation,
18    transaction_pool::{nonce_pool::TxWithReadyInfo, pool_metrics::*},
19    verification::{VerificationConfig, VerifyTxLocalMode, VerifyTxMode},
20};
21use cfx_executor::{
22    machine::Machine, spec::TransitionsEpochHeight, state::State,
23};
24use cfx_parameters::{
25    block::{
26        cspace_block_gas_limit_after_cip1559, espace_block_gas_limit,
27        espace_block_gas_limit_of_enabled_block,
28        DEFAULT_TARGET_BLOCK_GAS_LIMIT,
29    },
30    consensus_internal::ELASTICITY_MULTIPLIER,
31};
32use cfx_rpc_cfx_types::{PendingReason, TransactionStatus};
33use cfx_statedb::{Result as StateDbResult, StateDb};
34use cfx_storage::{StateIndex, StorageManagerTrait};
35use cfx_types::{
36    AddressWithSpace as Address, AllChainID, Space, SpaceMap, H256, U256,
37};
38use cfx_vm_types::Spec;
39
40use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
41use metrics::{MeterTimer, RwLockExtensions};
42use parking_lot::{Mutex, RwLock};
43use primitives::{
44    block::BlockHeight,
45    block_header::{compute_next_price, compute_next_price_tuple},
46    Account, SignedTransaction, Transaction, TransactionWithSignature,
47};
48use state_provider::StateProvider;
49use std::{
50    cmp::{max, min},
51    collections::{hash_map::HashMap, BTreeMap, BTreeSet},
52    mem,
53    ops::DerefMut,
54    sync::{
55        atomic::{AtomicBool, Ordering},
56        Arc,
57    },
58};
59use transaction_pool_inner::TransactionPoolInner;
60
61pub struct TxPoolConfig {
62    pub capacity: usize,
63    pub min_native_tx_price: u64,
64    pub min_eth_tx_price: u64,
65    pub half_block_gas_limit: RwLock<U256>,
66    pub allow_gas_over_half_block: bool,
67    pub target_block_gas_limit: u64,
68    pub max_packing_batch_gas_limit: u64,
69    pub max_packing_batch_size: usize,
70    pub packing_pool_degree: u8,
71}
72
73impl MallocSizeOf for TxPoolConfig {
74    fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { 0 }
75}
76
77impl Default for TxPoolConfig {
78    fn default() -> Self {
79        TxPoolConfig {
80            capacity: 500_000,
81            min_native_tx_price: 1,
82            min_eth_tx_price: 1,
83            half_block_gas_limit: RwLock::new(U256::from(
84                DEFAULT_TARGET_BLOCK_GAS_LIMIT,
85            )),
86            allow_gas_over_half_block: true,
87            max_packing_batch_size: 20,
88            max_packing_batch_gas_limit: DEFAULT_TARGET_BLOCK_GAS_LIMIT / 10,
89            packing_pool_degree: 4,
90            target_block_gas_limit: DEFAULT_TARGET_BLOCK_GAS_LIMIT,
91        }
92    }
93}
94
95impl TxPoolConfig {
96    pub fn check_gas_price_and_limit(
97        &self, tx: &TransactionWithSignature,
98    ) -> Result<(), TransactionPoolError> {
99        // If the actual block gas limit is less than the miners' preference,
100        // the miner chooses the actual limit to ensure compatibility with other
101        // nodes. If the actual block gas limit exceeds the miners'
102        // preference, the miner adheres to their own settings since this does
103        // not result in incompatibility with others.
104        let half_block_gas_limit = std::cmp::min(
105            *self.half_block_gas_limit.read(),
106            U256::from(self.target_block_gas_limit),
107        );
108
109        // The current implementation is designed for after the activation of
110        // CIP-1559. However, it is also compatible with the system before
111        // CIP-1559 was activated, although there are some minor behavioral
112        // differences.
113        let block_gas_target = half_block_gas_limit;
114
115        let min_tx_price = match tx.space() {
116            Space::Native => self.min_native_tx_price,
117            Space::Ethereum => self.min_eth_tx_price,
118        };
119
120        let space_gas_target: U256 = match tx.space() {
121            Space::Native => {
122                cspace_block_gas_limit_after_cip1559(block_gas_target)
123            }
124            Space::Ethereum => {
125                espace_block_gas_limit_of_enabled_block(block_gas_target)
126            }
127        };
128
129        let space_gas_limit = space_gas_target * 2;
130        let max_tx_gas = if self.allow_gas_over_half_block {
131            space_gas_limit
132        } else {
133            space_gas_limit / 2
134        };
135
136        let tx_gas = *tx.gas();
137        let tx_gas_price = *tx.gas_price();
138
139        if tx_gas > max_tx_gas {
140            warn!(
141                "Transaction discarded due to above gas limit: {} > {:?}",
142                tx.gas(),
143                max_tx_gas
144            );
145            return Err(TransactionPoolError::GasLimitExceeded {
146                max: max_tx_gas,
147                have: tx_gas,
148            });
149        }
150
151        let minimum_price = compute_next_price(
152            space_gas_target,
153            tx_gas,
154            min_tx_price.into(),
155            min_tx_price.into(),
156        );
157
158        // check transaction gas price
159        if tx_gas_price < minimum_price {
160            trace!("Transaction {} discarded due to below minimal gas price: price {}", tx.hash(), tx_gas_price);
161            return Err(TransactionPoolError::GasPriceLessThanMinimum {
162                min: minimum_price,
163                have: tx_gas_price,
164            });
165        }
166
167        Ok(())
168    }
169}
170
171pub struct TransactionPool {
172    pub config: TxPoolConfig,
173    verification_config: VerificationConfig,
174    inner: RwLock<TransactionPoolInner>,
175    to_propagate_trans: Arc<RwLock<HashMap<H256, Arc<SignedTransaction>>>>,
176    pub data_man: Arc<BlockDataManager>,
177    best_executed_state: Mutex<Arc<State>>,
178    consensus_best_info: Mutex<Arc<BestInformation>>,
179    set_tx_requests: Mutex<Vec<Arc<SignedTransaction>>>,
180    recycle_tx_requests: Mutex<Vec<Arc<SignedTransaction>>>,
181    machine: Arc<Machine>,
182
183    /// If it's `false`, operations on the tx pool will be ignored to save
184    /// memory/CPU cost.
185    ready_for_mining: AtomicBool,
186}
187
188impl MallocSizeOf for TransactionPool {
189    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
190        let inner_size = self.inner.read().size_of(ops);
191        let to_propagate_trans_size =
192            self.to_propagate_trans.read().size_of(ops);
193        let consensus_best_info_size =
194            self.consensus_best_info.lock().size_of(ops);
195        let set_tx_requests_size = self.set_tx_requests.lock().size_of(ops);
196        let recycle_tx_requests_size =
197            self.recycle_tx_requests.lock().size_of(ops);
198        self.config.size_of(ops)
199            + inner_size
200            + to_propagate_trans_size
201            + self.data_man.size_of(ops)
202            + consensus_best_info_size
203            + set_tx_requests_size
204            + recycle_tx_requests_size
205        // Does not count size_of machine
206    }
207}
208
209pub type SharedTransactionPool = Arc<TransactionPool>;
210
211impl TransactionPool {
212    pub fn new(
213        config: TxPoolConfig, verification_config: VerificationConfig,
214        data_man: Arc<BlockDataManager>, machine: Arc<Machine>,
215    ) -> Self {
216        let genesis_hash = data_man.true_genesis.hash();
217        let inner = TransactionPoolInner::new(
218            config.capacity,
219            config.max_packing_batch_gas_limit as usize,
220            config.max_packing_batch_size,
221            config.packing_pool_degree,
222        );
223        let best_executed_state = Mutex::new(
224            Self::get_best_executed_state_by_epoch(
225                &data_man,
226                StateIndex::new_for_readonly(
227                    &genesis_hash,
228                    &data_man.true_genesis_state_root(),
229                ),
230            )
231            .expect("The genesis state is guaranteed to exist."),
232        );
233        TransactionPool {
234            config,
235            verification_config,
236            inner: RwLock::new(inner),
237            to_propagate_trans: Arc::new(RwLock::new(HashMap::new())),
238            data_man: data_man.clone(),
239            best_executed_state,
240            consensus_best_info: Mutex::new(Arc::new(Default::default())),
241            set_tx_requests: Mutex::new(Default::default()),
242            recycle_tx_requests: Mutex::new(Default::default()),
243            machine,
244            ready_for_mining: AtomicBool::new(false),
245        }
246    }
247
248    pub fn machine(&self) -> Arc<Machine> { self.machine.clone() }
249
250    pub fn get_transaction(
251        &self, tx_hash: &H256,
252    ) -> Option<Arc<SignedTransaction>> {
253        self.inner.read().get(tx_hash)
254    }
255
256    pub fn get_transaction_by_address2nonce(
257        &self, address: Address, nonce: U256,
258    ) -> Option<Arc<SignedTransaction>> {
259        self.inner.read().get_by_address2nonce(address, nonce)
260    }
261
262    pub fn check_tx_packed_in_deferred_pool(&self, tx_hash: &H256) -> bool {
263        self.inner.read().check_tx_packed_in_deferred_pool(tx_hash)
264    }
265
266    pub fn get_local_account_info(&self, address: &Address) -> (U256, U256) {
267        self.inner
268            .read()
269            .get_local_nonce_and_balance(address)
270            .unwrap_or((0.into(), 0.into()))
271    }
272
273    pub fn get_next_nonce(&self, address: &Address) -> U256 {
274        let (state_nonce, _) = self
275            .get_state_account_info(address)
276            .unwrap_or((0.into(), 0.into()));
277        self.inner.read().get_next_nonce(address, state_nonce)
278    }
279
280    pub fn get_account_pending_info(
281        &self, address: &Address,
282    ) -> Option<(U256, U256, U256, H256)> {
283        self.inner.read().get_account_pending_info(address)
284    }
285
286    /// Return `(pending_txs, first_tx_status, pending_count)`.
287    pub fn get_account_pending_transactions(
288        &self, address: &Address, maybe_start_nonce: Option<U256>,
289        maybe_limit: Option<usize>, best_height: BlockHeight,
290    ) -> StateDbResult<(
291        Vec<Arc<SignedTransaction>>,
292        Option<TransactionStatus>,
293        usize,
294    )> {
295        use TransactionStatus::{Pending, Ready};
296
297        let inner = self.inner.read();
298        let (txs, mut first_tx_status, pending_count) = inner
299            .get_account_pending_transactions(
300                address,
301                maybe_start_nonce,
302                maybe_limit,
303            );
304
305        let first_tx = if let Some(first) = txs.first() {
306            first
307        } else {
308            return Ok((txs, first_tx_status, pending_count));
309        };
310
311        if let Transaction::Native(tx) = &first_tx.unsigned {
312            if VerificationConfig::check_transaction_epoch_bound(
313                tx,
314                best_height,
315                self.verification_config.transaction_epoch_bound,
316            ) == -1
317            {
318                // If the epoch height is out of bound, overwrite the
319                // pending reason.
320                first_tx_status = Some(Pending(PendingReason::OldEpochHeight));
321            }
322        }
323
324        if !matches!(
325            first_tx_status,
326            Some(Ready | Pending(PendingReason::NotEnoughCash))
327        ) {
328            return Ok((txs, first_tx_status, pending_count));
329        }
330
331        // The sponsor status may have changed, check again.
332        // This is not applied to the tx pool state because this check is
333        // only triggered on the RPC server.
334        let state = self.get_best_state_provider();
335
336        let (sponsored_gas, sponsored_storage) =
337            inner.get_sponsored_gas_and_storage(&state, &first_tx)?;
338        let (_, balance) = state.get_nonce_and_balance(&first_tx.sender())?;
339        let tx_cost = TxWithReadyInfo::new(
340            first_tx.clone(),
341            false,
342            sponsored_gas,
343            sponsored_storage,
344        )
345        .get_tx_cost();
346
347        let outdated = match (tx_cost <= balance, &first_tx_status) {
348            (true, Some(Pending(PendingReason::NotEnoughCash)))
349            | (false, Some(Ready)) => true,
350            _ => false,
351        };
352        if outdated {
353            first_tx_status = Some(Pending(PendingReason::OutdatedStatus));
354        }
355
356        return Ok((txs, first_tx_status, pending_count));
357    }
358
359    pub fn get_pending_transaction_hashes_in_evm_pool(&self) -> BTreeSet<H256> {
360        self.inner.read().ready_transacton_hashes_in_evm_pool()
361    }
362
363    pub fn get_pending_transaction_hashes_in_native_pool(
364        &self,
365    ) -> BTreeSet<H256> {
366        self.inner.read().ready_transacton_hashes_in_native_pool()
367    }
368
369    pub fn get_state_account_info(
370        &self, address: &Address,
371    ) -> StateDbResult<(U256, U256)> {
372        let state = self.get_best_state_provider();
373        state.get_nonce_and_balance(address)
374    }
375
376    pub fn calc_half_block_gas_limit(&self) -> Option<U256> {
377        let current_best_info = self.consensus_best_info.lock().clone();
378        self.data_man
379            .block_from_db(&current_best_info.best_block_hash)
380            .map(|pivot_block| pivot_block.block_header.gas_limit() / 2)
381    }
382
383    /// Try to insert `transactions` into transaction pool.
384    ///
385    /// If some tx is already in our tx_cache, it will be ignored and will not
386    /// be added to returned `passed_transactions`. If some tx invalid or
387    /// cannot be inserted to the tx pool, it will be included in the returned
388    /// `failure` and will not be propagated.
389    pub fn insert_new_transactions(
390        &self, mut transactions: Vec<TransactionWithSignature>,
391    ) -> (
392        Vec<Arc<SignedTransaction>>,
393        HashMap<H256, TransactionPoolError>,
394    ) {
395        INSERT_TPS.mark(1);
396        INSERT_TXS_TPS.mark(transactions.len());
397        let _timer = MeterTimer::time_func(TX_POOL_INSERT_TIMER.as_ref());
398
399        let mut passed_transactions = Vec::new();
400        let mut failure = HashMap::new();
401        let current_best_info = self.consensus_best_info.lock().clone();
402
403        let (chain_id, best_height, best_block_number) = {
404            (
405                current_best_info.best_chain_id(),
406                current_best_info.best_epoch_number,
407                current_best_info.best_block_number,
408            )
409        };
410        // FIXME: Needs further discussion here, some transactions may be valid
411        // and invalid back and forth does this matters? But for the epoch
412        // height check, it may also become valid and invalid back and forth.
413        let vm_spec = self.machine.spec(best_block_number, best_height);
414        let transitions = &self.machine.params().transition_heights;
415
416        // filter out invalid transactions.
417        let mut index = 0;
418        while let Some(tx) = transactions.get(index) {
419            match self.verify_transaction_tx_pool(
420                tx,
421                /* basic_check = */ true,
422                chain_id,
423                best_height,
424                transitions,
425                &vm_spec,
426            ) {
427                Ok(_) => index += 1,
428                Err(e) => {
429                    let removed = transactions.swap_remove(index);
430                    debug!("failed to insert tx into pool (validation failed), hash = {:?}, error = {:?}", removed.hash, e);
431                    failure.insert(removed.hash, e);
432                }
433            }
434        }
435
436        if transactions.is_empty() {
437            INSERT_TXS_SUCCESS_TPS.mark(passed_transactions.len());
438            INSERT_TXS_FAILURE_TPS.mark(failure.len());
439            return (passed_transactions, failure);
440        }
441
442        // Recover public key and insert into pool with readiness check.
443        // Note, the workload of recovering public key is very heavy, especially
444        // in case of high TPS (e.g. > 8000). So, it's better to recover public
445        // key after basic verification.
446        match self.data_man.recover_unsigned_tx(&transactions) {
447            Ok(signed_trans) => {
448                let state = self.get_best_state_provider();
449                let mut inner =
450                    self.inner.write_with_metric(&INSERT_TXS_ENQUEUE_LOCK);
451                let mut to_prop = self.to_propagate_trans.write();
452
453                for tx in signed_trans {
454                    if inner.get(&tx.hash).is_some() {
455                        continue;
456                    }
457
458                    if let Err(e) = self.add_transaction_with_readiness_check(
459                        &mut *inner,
460                        &state,
461                        tx.clone(),
462                        false,
463                        false,
464                    ) {
465                        debug!(
466                            "tx {:?} fails to be inserted to pool, err={:?}",
467                            &tx.hash, e
468                        );
469                        failure.insert(tx.hash(), e);
470                        continue;
471                    }
472
473                    passed_transactions.push(tx.clone());
474                    if to_prop.len() < inner.capacity() {
475                        to_prop.entry(tx.hash).or_insert(tx);
476                    }
477                }
478            }
479            Err(e) => {
480                for tx in transactions {
481                    failure.insert(
482                        tx.hash(),
483                        TransactionPoolError::RlpDecodeError(format!(
484                            "{:?}",
485                            e
486                        )),
487                    );
488                }
489            }
490        }
491
492        TX_POOL_DEFERRED_GAUGE.update(self.total_deferred(None));
493        TX_POOL_UNPACKED_GAUGE.update(self.total_unpacked());
494        TX_POOL_READY_GAUGE.update(self.total_ready_accounts());
495
496        INSERT_TXS_SUCCESS_TPS.mark(passed_transactions.len());
497        INSERT_TXS_FAILURE_TPS.mark(failure.len());
498
499        (passed_transactions, failure)
500    }
501
502    /// Try to insert `signed_transaction` into transaction pool.
503    ///
504    /// If some tx is already in our tx_cache, it will be ignored and will not
505    /// be added to returned `passed_transactions`. If some tx invalid or
506    /// cannot be inserted to the tx pool, it will be included in the returned
507    /// `failure` and will not be propagated.
508    pub fn insert_new_signed_transactions(
509        &self, mut signed_transactions: Vec<Arc<SignedTransaction>>,
510    ) -> (
511        Vec<Arc<SignedTransaction>>,
512        HashMap<H256, TransactionPoolError>,
513    ) {
514        INSERT_TPS.mark(1);
515        INSERT_TXS_TPS.mark(signed_transactions.len());
516        let _timer = MeterTimer::time_func(TX_POOL_INSERT_TIMER.as_ref());
517
518        let mut passed_transactions = Vec::new();
519        let mut failure = HashMap::new();
520        let current_best_info = { self.consensus_best_info.lock().clone() };
521
522        // filter out invalid transactions.
523        let mut index = 0;
524
525        let (chain_id, best_height, best_block_number) = {
526            (
527                current_best_info.best_chain_id(),
528                current_best_info.best_epoch_number,
529                current_best_info.best_block_number,
530            )
531        };
532        // FIXME: Needs further discussion here, some transactions may be valid
533        // and invalid back and forth does this matters?
534        let vm_spec = self.machine.spec(best_block_number, best_height);
535        let transitions = &self.machine.params().transition_heights;
536
537        while let Some(tx) = signed_transactions.get(index) {
538            match self.verify_transaction_tx_pool(
539                &tx.transaction,
540                true, /* basic_check = */
541                chain_id,
542                best_height,
543                transitions,
544                &vm_spec,
545            ) {
546                Ok(_) => index += 1,
547                Err(e) => {
548                    let removed = signed_transactions.swap_remove(index);
549                    debug!("failed to insert tx into pool (validation failed), hash = {:?}, error = {:?}", removed.hash, e);
550                    failure.insert(removed.hash, e);
551                }
552            }
553        }
554
555        // ensure the pool has enough quota to insert new signed transactions.
556        let quota = self
557            .inner
558            .write_with_metric(&INSERT_TXS_QUOTA_LOCK)
559            .remaining_quota();
560        if quota < signed_transactions.len() {
561            for tx in signed_transactions.split_off(quota) {
562                trace!("failed to insert tx into pool (quota not enough), hash = {:?}", tx.hash);
563                failure.insert(tx.hash, TransactionPoolError::TxPoolFull);
564            }
565        }
566
567        if signed_transactions.is_empty() {
568            INSERT_TXS_SUCCESS_TPS.mark(passed_transactions.len());
569            INSERT_TXS_FAILURE_TPS.mark(failure.len());
570            return (passed_transactions, failure);
571        }
572
573        // Insert into pool with readiness check.
574        // Notice it does not recover the public as the input transactions are
575        // already signed.
576
577        {
578            let state = self.get_best_state_provider();
579            let mut inner =
580                self.inner.write_with_metric(&INSERT_TXS_ENQUEUE_LOCK);
581            let mut to_prop = self.to_propagate_trans.write();
582
583            for tx in signed_transactions {
584                if let Err(e) = self.add_transaction_with_readiness_check(
585                    &mut *inner,
586                    &state,
587                    tx.clone(),
588                    false,
589                    false,
590                ) {
591                    debug!(
592                        "tx {:?} fails to be inserted to pool, err={:?}",
593                        &tx.hash, e
594                    );
595                    failure.insert(tx.hash(), e);
596                    continue;
597                }
598                passed_transactions.push(tx.clone());
599                if !to_prop.contains_key(&tx.hash) {
600                    to_prop.insert(tx.hash, tx);
601                }
602            }
603            //RwLock is dropped here
604        }
605
606        TX_POOL_DEFERRED_GAUGE.update(self.total_deferred(None));
607        TX_POOL_UNPACKED_GAUGE.update(self.total_unpacked());
608        TX_POOL_READY_GAUGE.update(self.total_ready_accounts());
609
610        INSERT_TXS_SUCCESS_TPS.mark(passed_transactions.len());
611        INSERT_TXS_FAILURE_TPS.mark(failure.len());
612
613        (passed_transactions, failure)
614    }
615
616    /// verify transactions based on the rules that have nothing to do with
617    /// readiness
618    fn verify_transaction_tx_pool(
619        &self, transaction: &TransactionWithSignature, basic_check: bool,
620        chain_id: AllChainID, best_height: u64,
621        transitions: &TransitionsEpochHeight, spec: &Spec,
622    ) -> Result<(), TransactionPoolError> {
623        let _timer = MeterTimer::time_func(TX_POOL_VERIFY_TIMER.as_ref());
624        let mode = VerifyTxMode::Local(VerifyTxLocalMode::MaybeLater, spec);
625
626        if basic_check {
627            self.verification_config
628                .check_tx_size(transaction)
629                .map_err(|e| TransactionPoolError::TransactionError(e))?;
630            if let Err(e) = self.verification_config.verify_transaction_common(
631                transaction,
632                chain_id,
633                best_height,
634                transitions,
635                mode,
636            ) {
637                warn!("Transaction {:?} discarded due to not passing basic verification.", transaction.hash());
638                return Err(TransactionPoolError::TransactionError(e));
639            }
640        }
641
642        // Check the epoch height is moved to verify_transaction_common. In
643        // VerifyTxLocalMode::MaybeLater mode, a transaction with larger target
644        // epoch can be accepted. Since PR #1610, it is guaranteed that
645        // best info is initialized here.
646
647        // check transaction gas limit
648        self.config.check_gas_price_and_limit(transaction)?;
649
650        Ok(())
651    }
652
653    // Add transaction into deferred pool and maintain its readiness
654    // the packed tag provided
655    // if force tag is true, the replacement in nonce pool must be happened
656    pub fn add_transaction_with_readiness_check(
657        &self, inner: &mut TransactionPoolInner, state: &StateProvider,
658        transaction: Arc<SignedTransaction>, packed: bool, force: bool,
659    ) -> Result<(), TransactionPoolError> {
660        inner.insert_transaction_with_readiness_check(
661            state,
662            transaction,
663            packed,
664            force,
665        )
666    }
667
668    pub fn get_to_be_propagated_transactions(
669        &self,
670    ) -> HashMap<H256, Arc<SignedTransaction>> {
671        let mut to_prop = self.to_propagate_trans.write();
672        let mut res = HashMap::new();
673        mem::swap(&mut *to_prop, &mut res);
674        res
675    }
676
677    pub fn set_to_be_propagated_transactions(
678        &self, transactions: HashMap<H256, Arc<SignedTransaction>>,
679    ) {
680        let mut to_prop = self.to_propagate_trans.write();
681        to_prop.extend(transactions);
682    }
683
684    pub fn remove_to_be_propagated_transactions(&self, tx_hash: &H256) {
685        self.to_propagate_trans.write().remove(tx_hash);
686    }
687
688    // If a tx is failed executed due to invalid nonce or if its enclosing block
689    // becomes orphan due to era transition. This function should be invoked
690    // to recycle it
691    pub fn recycle_transactions(
692        &self, transactions: Vec<Arc<SignedTransaction>>,
693    ) {
694        trace!(
695            "To re-add transactions to transaction pool. \
696             transactions={:?}",
697            &transactions
698        );
699        if transactions.is_empty() || !self.ready_for_mining() {
700            // Fast return.
701            return;
702        }
703
704        let mut recycle_req_buffer = self.recycle_tx_requests.lock();
705        for tx in transactions {
706            recycle_req_buffer.push(tx);
707        }
708    }
709
710    pub fn set_tx_packed(&self, transactions: &Vec<Arc<SignedTransaction>>) {
711        if transactions.is_empty() || !self.ready_for_mining() {
712            // Fast return.
713            return;
714        }
715        let mut tx_req_buffer = self.set_tx_requests.lock();
716        for tx in transactions {
717            tx_req_buffer.push(tx.clone());
718        }
719    }
720
721    pub fn pack_transactions<'a>(
722        &self, num_txs: usize, block_gas_limit: U256, evm_gas_limit: U256,
723        block_size_limit: usize, mut best_epoch_height: u64,
724        mut best_block_number: u64,
725    ) -> Vec<Arc<SignedTransaction>> {
726        let mut inner = self.inner.write_with_metric(&PACK_TRANSACTION_LOCK);
727        best_epoch_height += 1;
728        // The best block number is not necessary an exact number.
729        best_block_number += 1;
730        inner.pack_transactions(
731            num_txs,
732            block_gas_limit,
733            evm_gas_limit,
734            block_size_limit,
735            best_epoch_height,
736            best_block_number,
737            &self.verification_config,
738            &self.machine,
739        )
740    }
741
742    pub fn pack_transactions_1559<'a>(
743        &self, num_txs: usize, block_gas_limit: U256,
744        parent_base_price: SpaceMap<U256>, block_size_limit: usize,
745        mut best_epoch_height: u64, mut best_block_number: u64,
746    ) -> (Vec<Arc<SignedTransaction>>, SpaceMap<U256>) {
747        let mut inner = self.inner.write_with_metric(&PACK_TRANSACTION_LOCK);
748        best_epoch_height += 1;
749        // The best block number is not necessary an exact number.
750        best_block_number += 1;
751
752        let spec = self.machine.spec(best_block_number, best_epoch_height);
753        let transitions = &self.machine.params().transition_heights;
754
755        let validity = |tx: &SignedTransaction| {
756            self.verification_config.fast_recheck(
757                tx,
758                best_epoch_height,
759                transitions,
760                &spec,
761            )
762        };
763
764        inner.pack_transactions_1559(
765            num_txs,
766            block_gas_limit,
767            parent_base_price,
768            block_size_limit,
769            best_epoch_height,
770            &self.machine,
771            validity,
772        )
773    }
774
775    // A helper function for python test. Not intented to be used in the
776    // production mode because of its inefficiency
777    // LINT: this function should not belongs to txpool, since it does not
778    // access the pools. However, since transaction pool has context for
779    // computing the base price, it is the most proper position at this
780    // time. May be fixed in future refactoring.
781    pub fn compute_1559_base_price<'a, I>(
782        &self, parent_hash: &H256, block_gas_limit: U256, txs: I,
783    ) -> Result<Option<SpaceMap<U256>>, String>
784    where I: Iterator<Item = &'a SignedTransaction> + 'a {
785        let parent = self
786            .data_man
787            .block_header_by_hash(parent_hash)
788            .ok_or("Cannot find parent block")?;
789        let current_height = parent.height() + 1;
790
791        let params = self.machine.params();
792        let cip_1559_height = params.transition_heights.cip1559;
793        if current_height < cip_1559_height {
794            return Ok(None);
795        }
796
797        let mut gas_used = SpaceMap::default();
798        let mut min_gas_price =
799            SpaceMap::new(U256::max_value(), U256::max_value());
800        for tx in txs {
801            gas_used[tx.space()] += *tx.gas_limit();
802            min_gas_price[tx.space()] =
803                min_gas_price[tx.space()].min(*tx.gas_price());
804        }
805
806        let core_gas_limit =
807            cspace_block_gas_limit_after_cip1559(block_gas_limit);
808        let eth_gas_limit = espace_block_gas_limit(
809            params.can_pack_evm_transaction(current_height),
810            block_gas_limit,
811        );
812
813        let gas_target =
814            SpaceMap::new(core_gas_limit, eth_gas_limit).map_all(|x| x / 2);
815
816        let parent_base_price = if current_height == cip_1559_height {
817            params.init_base_price()
818        } else {
819            parent.base_price().unwrap()
820        };
821
822        let min_base_price = params.min_base_price();
823
824        let base_price = SpaceMap::zip4(
825            gas_target,
826            gas_used,
827            parent_base_price,
828            min_base_price,
829        )
830        .map_all(compute_next_price_tuple);
831
832        for space in [Space::Native, Space::Ethereum] {
833            if base_price[space] > min_gas_price[space] {
834                return Err(format!("Not sufficient min price in space {:?}, expected {:?}, actual {:?}", space, base_price[space], min_gas_price[space]));
835            }
836        }
837
838        Ok(Some(base_price))
839    }
840
841    pub fn notify_modified_accounts(
842        &self, accounts_from_execution: Vec<Account>,
843    ) {
844        let mut inner = self.inner.write_with_metric(&NOTIFY_MODIFIED_LOCK);
845        inner.notify_modified_accounts(accounts_from_execution)
846    }
847
848    pub fn clear_tx_pool(&self) {
849        let mut inner = self.inner.write();
850        inner.clear()
851    }
852
853    pub fn total_deferred(&self, space: Option<Space>) -> usize {
854        let inner = self.inner.read();
855        inner.total_deferred(space)
856    }
857
858    pub fn total_ready_accounts(&self) -> usize {
859        let inner = self.inner.read();
860        inner.total_ready_accounts()
861    }
862
863    pub fn total_received(&self) -> usize {
864        let inner = self.inner.read();
865        inner.total_received()
866    }
867
868    pub fn total_unpacked(&self) -> usize {
869        let inner = self.inner.read();
870        inner.total_unpacked(None)
871    }
872
873    // The total pending transactions in the pool
874    // Pending transactions are transactions that are ready to be packed
875    pub fn total_pending(&self, space: Option<Space>) -> u64 {
876        let inner = self.inner.read();
877        inner.total_pending(space)
878    }
879
880    // The total queued transactions in the pool
881    // Queued transactions are transactions that are not ready to be packed
882    // e.g. due to nonce gap or not enough balance
883    pub fn total_queued(&self, space: Option<Space>) -> u64 {
884        let inner = self.inner.read();
885        inner.total_queued(space)
886    }
887
888    /// stats retrieves the length of ready and deferred pool.
889    pub fn stats(&self) -> (usize, usize, usize, usize) {
890        let inner = self.inner.read();
891        (
892            inner.total_ready_accounts(),
893            inner.total_deferred(None),
894            inner.total_received(),
895            inner.total_unpacked(None),
896        )
897    }
898
899    pub fn eth_content(
900        &self, space: Option<Space>,
901    ) -> (
902        BTreeMap<Address, BTreeMap<U256, Arc<SignedTransaction>>>,
903        BTreeMap<Address, BTreeMap<U256, Arc<SignedTransaction>>>,
904    ) {
905        let inner = self.inner.read();
906        inner.eth_content(space)
907    }
908
909    pub fn eth_content_from(
910        &self, from: Address,
911    ) -> (
912        BTreeMap<U256, Arc<SignedTransaction>>,
913        BTreeMap<U256, Arc<SignedTransaction>>,
914    ) {
915        let inner = self.inner.read();
916        inner.eth_content_from(from)
917    }
918
919    /// content retrieves the ready and deferred transactions(all pool
920    /// transactions). deprecated: use eth_content instead
921    pub fn content(
922        &self, address: Option<Address>,
923    ) -> (Vec<Arc<SignedTransaction>>, Vec<Arc<SignedTransaction>>) {
924        let inner = self.inner.read();
925        inner.content(address)
926    }
927
928    pub fn notify_new_best_info(
929        &self, best_info: Arc<BestInformation>,
930    ) -> StateDbResult<()> {
931        let mut set_tx_buffer = self.set_tx_requests.lock();
932        let mut recycle_tx_buffer = self.recycle_tx_requests.lock();
933        {
934            let mut consensus_best_info = self.consensus_best_info.lock();
935            *consensus_best_info = best_info.clone();
936        }
937        if let Some(half_block_gas_limit) = self.calc_half_block_gas_limit() {
938            *self.config.half_block_gas_limit.write() = half_block_gas_limit;
939        }
940
941        let state = self.get_best_state_provider();
942        let mut inner = self.inner.write_with_metric(&NOTIFY_BEST_INFO_LOCK);
943        let inner = inner.deref_mut();
944
945        while let Some(tx) = set_tx_buffer.pop() {
946            let tx_hash = tx.hash();
947            if let Err(e) = self.add_transaction_with_readiness_check(
948                inner, &state, tx, true, false,
949            ) {
950                // TODO: A transaction that is packed multiple times would also
951                // throw an error here, but it should be normal.
952                trace!("set tx err: tx={}, e={:?}", tx_hash, e);
953            }
954        }
955
956        let (chain_id, best_height, best_block_number) = {
957            (
958                best_info.best_chain_id(),
959                best_info.best_epoch_number,
960                best_info.best_block_number,
961            )
962        };
963        // FIXME: Needs further discussion here, some transactions may be valid
964        // and invalid back and forth, does this matters?
965        let vm_spec = self.machine.spec(best_block_number, best_height);
966        let transitions = &self.machine.params().transition_heights;
967
968        while let Some(tx) = recycle_tx_buffer.pop() {
969            info!(
970                "should not trigger recycle transaction, nonce = {}, sender = {:?}, \
971                account nonce = {}, hash = {:?} .",
972                &tx.nonce(), &tx.sender(),
973                state.get_nonce(&tx.sender())?, tx.hash);
974
975            if let Err(e) = self.verify_transaction_tx_pool(
976                &tx,
977                /* basic_check = */ false,
978                chain_id,
979                best_height,
980                transitions,
981                &vm_spec,
982            ) {
983                warn!(
984                    "Recycled transaction {:?} discarded due to not passing verification {}.",
985                    tx.hash(), e
986                );
987            }
988            if let Err(e) = self.add_transaction_with_readiness_check(
989                inner, &state, tx, false, true,
990            ) {
991                warn!("recycle tx err: e={:?}", e);
992            }
993        }
994        debug!(
995            "notify_new_best_info: {:?}",
996            self.consensus_best_info.lock()
997        );
998
999        Ok(())
1000    }
1001
1002    // For RPC use only
1003    pub fn get_best_info_with_parent_base_price(
1004        &self,
1005    ) -> (Arc<BestInformation>, Option<SpaceMap<U256>>) {
1006        let consensus_best_info_clone = self.consensus_best_info.lock().clone();
1007        debug!(
1008            "get_best_info_with_base_price: {:?}",
1009            consensus_best_info_clone
1010        );
1011
1012        let params = self.machine.params();
1013        let parent_block = self
1014            .data_man
1015            .block_header_by_hash(&consensus_best_info_clone.best_block_hash)
1016            // The parent block must exists.
1017            .expect(&concat!(file!(), ":", line!(), ":", column!()));
1018
1019        let cip1559_height = params.transition_heights.cip1559;
1020        let pack_height = consensus_best_info_clone.best_epoch_number + 1;
1021
1022        (
1023            consensus_best_info_clone,
1024            if pack_height <= cip1559_height {
1025                None
1026            } else {
1027                // TODO: should we compute for the current base_price?
1028                Some(parent_block.base_price().unwrap())
1029            },
1030        )
1031    }
1032
1033    pub fn get_best_info_with_packed_transactions(
1034        &self, num_txs: usize, block_size_limit: usize,
1035        additional_transactions: Vec<Arc<SignedTransaction>>,
1036    ) -> (
1037        Arc<BestInformation>,
1038        U256,
1039        Vec<Arc<SignedTransaction>>,
1040        Option<SpaceMap<U256>>,
1041    ) {
1042        // We do not need to hold the lock because it is fine for us to generate
1043        // blocks that are slightly behind the best state.
1044        // We do not want to stall the consensus thread.
1045        let consensus_best_info_clone = self.consensus_best_info.lock().clone();
1046        debug!(
1047            "get_best_info_with_packed_transactions: {:?}",
1048            consensus_best_info_clone
1049        );
1050
1051        let params = self.machine.params();
1052
1053        let cip1559_height = params.transition_heights.cip1559;
1054        let pack_height = consensus_best_info_clone.best_epoch_number + 1;
1055
1056        let parent_block = self
1057            .data_man
1058            .block_header_by_hash(&consensus_best_info_clone.best_block_hash)
1059            // The parent block must exists.
1060            .expect(&concat!(file!(), ":", line!(), ":", column!()));
1061        let parent_block_gas_limit = *parent_block.gas_limit()
1062            * if cip1559_height == pack_height {
1063                ELASTICITY_MULTIPLIER
1064            } else {
1065                1
1066            };
1067
1068        let gas_limit_divisor = params.gas_limit_bound_divisor;
1069        let min_gas_limit = params.min_gas_limit;
1070        assert!(parent_block_gas_limit >= min_gas_limit);
1071        let gas_lower = max(
1072            parent_block_gas_limit - parent_block_gas_limit / gas_limit_divisor
1073                + 1,
1074            min_gas_limit,
1075        );
1076        let gas_upper = parent_block_gas_limit
1077            + parent_block_gas_limit / gas_limit_divisor
1078            - 1;
1079
1080        let target_gas_limit = self.config.target_block_gas_limit
1081            * if pack_height >= cip1559_height {
1082                ELASTICITY_MULTIPLIER as u64
1083            } else {
1084                1
1085            };
1086
1087        let self_gas_limit =
1088            min(max(target_gas_limit.into(), gas_lower), gas_upper);
1089
1090        let (transactions_from_pool, maybe_base_price) = if pack_height
1091            < cip1559_height
1092        {
1093            let evm_gas_limit = if self
1094                .machine
1095                .params()
1096                .can_pack_evm_transaction(pack_height)
1097            {
1098                self_gas_limit / params.evm_transaction_gas_ratio
1099            } else {
1100                U256::zero()
1101            };
1102
1103            let txs = self.pack_transactions(
1104                num_txs,
1105                self_gas_limit.clone(),
1106                evm_gas_limit,
1107                block_size_limit,
1108                consensus_best_info_clone.best_epoch_number,
1109                consensus_best_info_clone.best_block_number,
1110            );
1111            (txs, None)
1112        } else {
1113            let parent_base_price = if pack_height == cip1559_height {
1114                params.init_base_price()
1115            } else {
1116                parent_block.base_price().unwrap()
1117            };
1118
1119            let (txs, packing_base_price) = self.pack_transactions_1559(
1120                num_txs,
1121                self_gas_limit.clone(),
1122                parent_base_price,
1123                block_size_limit,
1124                consensus_best_info_clone.best_epoch_number,
1125                consensus_best_info_clone.best_block_number,
1126            );
1127
1128            let mut base_price = packing_base_price;
1129
1130            // May only happens in test mode
1131            if !additional_transactions.is_empty() {
1132                let iter = additional_transactions
1133                    .iter()
1134                    .chain(txs.iter())
1135                    .map(|x| &**x);
1136                match self.compute_1559_base_price(
1137                    &parent_block.hash(),
1138                    self_gas_limit,
1139                    iter,
1140                ) {
1141                    Ok(Some(p)) => {
1142                        base_price = p;
1143                    }
1144                    Ok(None) => {
1145                        warn!("Should not happen");
1146                    }
1147                    Err(e) => {
1148                        error!("Cannot compute base price with additional transactions: {}", e);
1149                    }
1150                }
1151            }
1152            (txs, Some(base_price))
1153        };
1154
1155        let transactions = [
1156            additional_transactions.as_slice(),
1157            transactions_from_pool.as_slice(),
1158        ]
1159        .concat();
1160
1161        (
1162            consensus_best_info_clone,
1163            self_gas_limit,
1164            transactions,
1165            maybe_base_price,
1166        )
1167    }
1168
1169    fn get_best_executed_state_by_epoch(
1170        data_man: &BlockDataManager, best_executed_epoch: StateIndex,
1171    ) -> StateDbResult<Arc<State>> {
1172        let storage = data_man
1173            .storage_manager
1174            .get_state_no_commit(
1175                best_executed_epoch,
1176                /* try_open = */ false,
1177                None,
1178            )?
1179            // Safe because the state is guaranteed to be available
1180            .unwrap();
1181        let state_db = StateDb::new(storage);
1182        let state = State::new(state_db)?;
1183        Ok(Arc::new(state))
1184    }
1185
1186    pub fn set_best_executed_state_by_epoch(
1187        &self, best_executed_epoch: StateIndex,
1188    ) -> StateDbResult<()> {
1189        *self.best_executed_state.lock() =
1190            Self::get_best_executed_state_by_epoch(
1191                &self.data_man,
1192                best_executed_epoch,
1193            )?;
1194
1195        Ok(())
1196    }
1197
1198    fn get_best_state_provider(&self) -> StateProvider {
1199        let _timer = MeterTimer::time_func(TX_POOL_GET_STATE_TIMER.as_ref());
1200        StateProvider::new((self.best_executed_state.lock()).clone())
1201    }
1202
1203    pub fn ready_for_mining(&self) -> bool {
1204        self.ready_for_mining.load(Ordering::SeqCst)
1205    }
1206
1207    pub fn set_ready_for_mining(&self) {
1208        self.ready_for_mining.store(true, Ordering::SeqCst);
1209    }
1210}