cfxcore/transaction_pool/
transaction_pool_inner.rs

1use super::{
2    deferred_pool::DeferredPool,
3    garbage_collector::GarbageCollector,
4    nonce_pool::{InsertResult, TxWithReadyInfo},
5    pool_metrics::pool_inner_metrics::*,
6    state_provider::StateProvider,
7    TransactionPoolError,
8};
9
10use crate::verification::{PackingCheckResult, VerificationConfig};
11use cfx_executor::machine::Machine;
12use cfx_packing_pool::PackingPoolConfig;
13use cfx_parameters::{
14    block::cspace_block_gas_limit_after_cip1559,
15    consensus_internal::ELASTICITY_MULTIPLIER,
16    staking::DRIPS_PER_STORAGE_COLLATERAL_UNIT,
17};
18
19use cfx_rpc_cfx_types::TransactionStatus;
20use cfx_statedb::Result as StateDbResult;
21use cfx_types::{
22    address_util::AddressUtil, AddressWithSpace, Space, SpaceMap, H256, U128,
23    U256, U512,
24};
25use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
26use metrics::MeterTimer;
27use primitives::{
28    block_header::compute_next_price, Account, Action, SignedTransaction,
29    Transaction, TransactionWithSignature,
30};
31use rlp::*;
32use std::{
33    collections::{BTreeMap, BTreeSet, HashMap},
34    sync::Arc,
35    time::{SystemTime, UNIX_EPOCH},
36};
37
38// lazy_static! {
39//     pub static ref MAX_WEIGHT: U256 = u128::max_value().into();
40// }
41
// NOTE(review): not referenced in the visible part of this file. Presumably
// the largest distance a transaction nonce may be ahead of the sender's state
// nonce before the pool refuses it -- confirm against the insertion path.
const FURTHEST_FUTURE_TRANSACTION_NONCE_OFFSET: u32 = 2000;
43
/// Hash-indexed set of transactions that also tracks how many of its entries
/// belong to each space.
#[derive(Default, DeriveMallocSizeOf)]
pub struct TransactionSet {
    /// Transactions keyed by the hash they were inserted under.
    inner: HashMap<H256, Arc<SignedTransaction>>,
    /// Per-space number of entries in `inner`; kept in sync by `insert`,
    /// `remove` and `clear`.
    count: SpaceMap<usize>,
}
49
50impl TransactionSet {
51    fn get(&self, tx_hash: &H256) -> Option<&Arc<SignedTransaction>> {
52        self.inner.get(tx_hash)
53    }
54
55    fn values(
56        &self,
57    ) -> std::collections::hash_map::Values<'_, H256, Arc<SignedTransaction>>
58    {
59        self.inner.values()
60    }
61
62    fn insert(
63        &mut self, tx_hash: H256, tx: Arc<SignedTransaction>,
64    ) -> Option<Arc<SignedTransaction>> {
65        *self.count.in_space_mut(tx.space()) += 1;
66        let res = self.inner.insert(tx_hash, tx);
67        if let Some(ref tx) = res {
68            *self.count.in_space_mut(tx.space()) -= 1;
69        }
70        res
71    }
72
73    fn remove(&mut self, tx_hash: &H256) -> Option<Arc<SignedTransaction>> {
74        let res = self.inner.remove(tx_hash);
75        if let Some(ref tx) = res {
76            *self.count.in_space_mut(tx.space()) -= 1;
77        }
78        res
79    }
80
81    fn clear(&mut self) {
82        self.inner.clear();
83        self.count.apply_all(|x| *x = 0);
84    }
85}
86
/// State of the transaction pool: the per-account transaction storage, the
/// cached account nonces/balances, the garbage-collection queues and the
/// hash-indexed transaction set.
#[derive(DeriveMallocSizeOf)]
pub struct TransactionPoolInner {
    /// Capacity limit; `is_full` compares it against the number of pooled
    /// transactions of a single space.
    capacity: usize,
    // deprecated, this value is never updated
    total_received_count: usize,
    /// Per-space number of pooled transactions that are not marked as packed.
    unpacked_transaction_count: SpaceMap<usize>,
    /// Tracks all transactions in the transaction pool by account and nonce.
    /// Packed and executed transactions will eventually be garbage collected.
    deferred_pool: DeferredPool,
    /// The cache of the latest nonce and balance in the state.
    /// Updated with the storage data after a block is processed in consensus
    /// (set_tx_packed), after epoch execution, or during transaction
    /// insertion.
    ready_nonces_and_balances: HashMap<AddressWithSpace, (U256, U256)>,
    /// One garbage-collection queue per space, consulted by
    /// `collect_garbage` to choose which sender loses a transaction.
    garbage_collector: SpaceMap<GarbageCollector>,
    /// Keeps all transactions in the transaction pool.
    /// It should contain the same transaction set as `deferred_pool`.
    txs: TransactionSet,
}
106
107impl TransactionPoolInner {
108    pub fn new(
109        capacity: usize, max_packing_batch_gas_limit: usize,
110        max_packing_batch_size: usize, packing_pool_degree: u8,
111    ) -> Self {
112        let config = PackingPoolConfig::new(
113            max_packing_batch_gas_limit.into(),
114            max_packing_batch_size,
115            packing_pool_degree,
116        );
117        TransactionPoolInner {
118            capacity,
119            total_received_count: 0,
120            unpacked_transaction_count: SpaceMap::default(),
121            deferred_pool: DeferredPool::new(config),
122            ready_nonces_and_balances: HashMap::new(),
123            garbage_collector: SpaceMap::default(),
124            txs: TransactionSet::default(),
125        }
126    }
127
    /// Test-only constructor: capacity 50_000, packing batch gas limit
    /// 3_000_000, packing batch size 50 and packing pool degree 4.
    #[cfg(test)]
    pub fn new_for_test() -> Self { Self::new(50_000, 3_000_000, 50, 4) }
130
131    pub fn clear(&mut self) {
132        self.deferred_pool.clear();
133        self.ready_nonces_and_balances.clear();
134        self.garbage_collector.apply_all(|x| x.clear());
135        self.txs.clear();
136        self.total_received_count = 0;
137        self.unpacked_transaction_count.apply_all(|x| *x = 0);
138    }
139
140    pub fn total_deferred(&self, space: Option<Space>) -> usize {
141        match space {
142            Some(space) => *self.txs.count.in_space(space),
143            None => self.txs.count.map_sum(|x| *x),
144        }
145    }
146
147    pub fn ready_transacton_hashes_in_evm_pool(&self) -> BTreeSet<H256> {
148        self.deferred_pool
149            .ready_transaction_hashes(Space::Ethereum)
150            .collect()
151    }
152
153    pub fn ready_transacton_hashes_in_native_pool(&self) -> BTreeSet<H256> {
154        self.deferred_pool
155            .ready_transaction_hashes(Space::Native)
156            .collect()
157    }
158
159    pub fn total_ready_accounts(&self) -> usize {
160        self.deferred_pool.ready_account_number(Space::Ethereum)
161            + self.deferred_pool.ready_account_number(Space::Native)
162    }
163
    /// Returns `total_received_count`. The field is documented as deprecated
    /// and never updated; within the visible code it is only ever set to 0.
    pub fn total_received(&self) -> usize { self.total_received_count }
165
166    pub fn total_unpacked(&self, space: Option<Space>) -> usize {
167        match space {
168            Some(space) => *self.unpacked_transaction_count.in_space(space),
169            None => self.unpacked_transaction_count.map_sum(|x| *x),
170        }
171    }
172
173    pub fn total_pending(&self, space: Option<Space>) -> u64 {
174        let get_nonce_and_balance = |addr: &AddressWithSpace| {
175            self.ready_nonces_and_balances
176                .get(addr)
177                .map(|x| *x)
178                .unwrap_or_default()
179        };
180        self.deferred_pool
181            .pending_tx_number(space, get_nonce_and_balance)
182    }
183
184    pub fn total_queued(&self, space: Option<Space>) -> u64 {
185        self.total_unpacked(space) as u64 - self.total_pending(space)
186    }
187
188    pub fn get(&self, tx_hash: &H256) -> Option<Arc<SignedTransaction>> {
189        self.txs.get(tx_hash).map(|x| x.clone())
190    }
191
192    pub fn get_by_address2nonce(
193        &self, address: AddressWithSpace, nonce: U256,
194    ) -> Option<Arc<SignedTransaction>> {
195        let bucket = self.deferred_pool.get_bucket(&address)?;
196        bucket.get_tx_by_nonce(nonce).map(|tx| tx.transaction)
197    }
198
199    pub fn is_full(&self, space: Space) -> bool {
200        return self.total_deferred(Some(space)) >= self.capacity;
201    }
202
203    pub fn get_current_timestamp(&self) -> u64 {
204        let start = SystemTime::now();
205        let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap();
206        since_the_epoch.as_secs()
207    }
208
209    /// A sender has a transaction which is garbage collectable if
210    ///    1. there is at least a transaction whose nonce is less than
211    /// `ready_nonce`
212    ///    2. the nonce of all transactions are greater than or equal to
213    /// `ready_nonce` and it is not garbage collected during the last
214    /// `TIME_WINDOW` seconds
215    ///
216    /// We will pick a sender who has maximum number of transactions which are
217    /// garbage collectable. And if there is a tie, the one who has minimum
218    /// timestamp will be picked.
219    pub fn collect_garbage(&mut self, new_tx: &SignedTransaction) {
220        let space = new_tx.space();
221        let count_before_gc = self.total_deferred(Some(space));
222        let mut skipped_self_node = None;
223        while self.is_full(space)
224            && !self.garbage_collector.in_space(space).is_empty()
225        {
226            let current_timestamp = self.get_current_timestamp();
227            let (victim_address, victim) =
228                self.garbage_collector.in_space(space).top().unwrap();
229            // Accounts which are not in `deferred_pool` may be inserted
230            // into `garbage_collector`, we can just ignore them.
231            if !self.deferred_pool.contain_address(victim_address) {
232                self.garbage_collector.in_space_mut(space).pop();
233                continue;
234            }
235
236            // `count == 0` means all transactions are not executed, so there is
237            // no unconditional garbage collection to conduct and we need to
238            // check if we should replace one unexecuted tx.
239            if victim.count == 0 {
240                if *victim_address == new_tx.sender() {
241                    // We do not GC a not-executed transaction from the same
242                    // sender, so save it and try another account.
243                    let (victim_address, victim) = self
244                        .garbage_collector
245                        .in_space_mut(space)
246                        .pop()
247                        .unwrap();
248                    skipped_self_node = Some((victim_address, victim));
249                    continue;
250                } else if victim.has_ready_tx
251                    && victim.first_tx_gas_price >= *new_tx.gas_price()
252                {
253                    // If all transactions are not executed but some accounts
254                    // are not ready to be packed, we directly replace a
255                    // not-ready transaction (with the least gas_price in
256                    // garbage_collector). If all accounts
257                    // are ready, we check if the new tx has larger gas price
258                    // than some.
259                    trace!("txpool::collect_garbage fails, victim={:?} new_tx={:?} \
260                    new_tx_gas_price={:?}", victim, new_tx.hash(), new_tx.gas_price());
261                    return;
262                }
263            }
264
265            // victim is now chosen to be evicted.
266            let (victim_address, victim) =
267                self.garbage_collector.in_space_mut(space).pop().unwrap();
268
269            let (ready_nonce, _) = self
270                .get_local_nonce_and_balance(&victim_address)
271                .unwrap_or((0.into(), 0.into()));
272
273            let tx_with_ready_info = self
274                .deferred_pool
275                .remove_lowest_nonce(&victim_address)
276                .unwrap();
277            let to_remove_tx = tx_with_ready_info.get_arc_tx().clone();
278            trace!(
279                "txpool::collect_garbage removed tx {:?} sender={:?} nonce={:?} new_tx={:?}",
280                to_remove_tx.hash(),
281                victim_address,
282                to_remove_tx.nonce(),
283                new_tx.hash()
284            );
285
286            // We have to garbage collect an unexecuted transaction.
287            // TODO: Implement more heuristic strategies
288            if *to_remove_tx.nonce() >= ready_nonce {
289                assert_eq!(victim.count, 0);
290                GC_UNEXECUTED_COUNTER.inc(1);
291                warn!("an unexecuted tx is garbage-collected.");
292            }
293
294            if !tx_with_ready_info.is_already_packed() {
295                let tx_space = tx_with_ready_info.space();
296                *self.unpacked_transaction_count.in_space_mut(tx_space) = self
297                    .unpacked_transaction_count
298                    .in_space(tx_space)
299                    .checked_sub(1)
300                    .unwrap_or_else(|| {
301                        error!("unpacked_transaction_count under-flows.");
302                        0
303                    });
304            }
305
306            // maintain ready info
307            if !self.deferred_pool.contain_address(&victim_address) {
308                self.ready_nonces_and_balances.remove(&victim_address);
309            // The picked sender has no transactions now, and has been popped
310            // from `garbage_collector`.
311            } else {
312                let has_ready_tx =
313                    self.deferred_pool.has_ready_tx(&victim_address);
314                let first_tx_gas_price = *self
315                    .deferred_pool
316                    .get_lowest_nonce_tx(&victim_address)
317                    .expect("addr exist")
318                    .gas_price();
319                let count = if victim.count > 0 {
320                    victim.count - 1
321                } else {
322                    0
323                };
324                self.garbage_collector.in_space_mut(space).insert(
325                    &victim_address,
326                    count,
327                    current_timestamp,
328                    has_ready_tx,
329                    first_tx_gas_price,
330                );
331            }
332
333            // maintain txs
334            self.txs.remove(&to_remove_tx.hash());
335        }
336
337        // Insert back skipped nodes to keep `garbage_collector`
338        // unchanged.
339        if let Some((addr, node)) = skipped_self_node {
340            self.garbage_collector.in_space_mut(space).insert(
341                &addr,
342                node.count,
343                node.timestamp,
344                node.has_ready_tx,
345                node.first_tx_gas_price,
346            );
347        }
348        GC_METER.mark(count_before_gc - self.total_deferred(Some(space)));
349    }
350
351    /// Collect garbage and return the remaining quota of the pool to insert new
352    /// transactions.
353    pub fn remaining_quota(&self) -> usize {
354        let len = self.total_deferred(None);
355        self.garbage_collector.size() * self.capacity - len
356            + self.garbage_collector.map_sum(|x| x.gc_size())
357    }
358
    /// Returns the capacity the pool was constructed with.
    pub fn capacity(&self) -> usize { self.capacity }
360
361    #[cfg(test)]
362    fn insert_transaction_for_test(
363        &mut self, transaction: Arc<SignedTransaction>, sender_nonce: U256,
364    ) -> InsertResult {
365        let sender = transaction.sender();
366        let res = self.insert_transaction_without_readiness_check(
367            transaction,
368            false,
369            true,
370            (sender_nonce, U256::from(u64::MAX)),
371            (0.into(), 0),
372        );
373        self.recalculate_readiness(&sender, sender_nonce, U256::from(u64::MAX));
374        res
375    }
376
377    // the new inserting will fail if tx_pool is full (even if `force` is true)
378    fn insert_transaction_without_readiness_check(
379        &mut self, transaction: Arc<SignedTransaction>, packed: bool,
380        force: bool, state_nonce_and_balance: (U256, U256),
381        (sponsored_gas, sponsored_storage): (U256, u64),
382    ) -> InsertResult {
383        let _timer = MeterTimer::time_func(
384            TX_POOL_INNER_WITHOUTCHECK_INSERT_TIMER.as_ref(),
385        );
386        if !self.deferred_pool.check_sender_and_nonce_exists(
387            &transaction.sender(),
388            &transaction.nonce(),
389        ) {
390            self.collect_garbage(transaction.as_ref());
391            if self.is_full(transaction.space()) {
392                return InsertResult::Failed(TransactionPoolError::TxPoolFull);
393            }
394        }
395        let result = {
396            let _timer =
397                MeterTimer::time_func(DEFERRED_POOL_INNER_INSERT.as_ref());
398            self.deferred_pool.insert(
399                TxWithReadyInfo::new(
400                    transaction.clone(),
401                    packed,
402                    sponsored_gas,
403                    sponsored_storage,
404                ),
405                force,
406            )
407        };
408
409        let tx_space = transaction.space();
410        match &result {
411            InsertResult::NewAdded => {
412                let (state_nonce, state_balance) = state_nonce_and_balance;
413                self.update_nonce_and_balance(
414                    &transaction.sender(),
415                    state_nonce,
416                    state_balance,
417                );
418                // GarbageCollector will be updated by the caller.
419                self.txs.insert(transaction.hash(), transaction.clone());
420                if !packed {
421                    *self.unpacked_transaction_count.in_space_mut(tx_space) +=
422                        1;
423                }
424            }
425            InsertResult::Failed(_) => {}
426            InsertResult::Updated(replaced_tx) => {
427                if !replaced_tx.is_already_packed() {
428                    *self.unpacked_transaction_count.in_space_mut(tx_space) =
429                        self.unpacked_transaction_count
430                            .in_space(tx_space)
431                            .checked_sub(1)
432                            .unwrap_or_else(|| {
433                                error!(
434                                    "unpacked_transaction_count under-flows."
435                                );
436                                0
437                            });
438                }
439                self.txs.remove(&replaced_tx.hash());
440                self.txs.insert(transaction.hash(), transaction.clone());
441                if !packed {
442                    *self.unpacked_transaction_count.in_space_mut(tx_space) +=
443                        1;
444                }
445            }
446        }
447
448        result
449    }
450
451    #[allow(dead_code)]
452    fn mark_packed(&mut self, tx: &SignedTransaction, packed: bool) {
453        let changed =
454            self.deferred_pool
455                .mark_packed(tx.sender(), tx.nonce(), packed);
456        if changed {
457            let tx_space = tx.space();
458            if packed {
459                if *self.unpacked_transaction_count.in_space(tx_space) == 0 {
460                    error!("unpacked_transaction_count under-flows.");
461                } else {
462                    *self.unpacked_transaction_count.in_space_mut(tx_space) -=
463                        1;
464                }
465            } else {
466                *self.unpacked_transaction_count.in_space_mut(tx_space) += 1;
467            }
468        }
469    }
470
471    pub fn get_account_pending_info(
472        &self, address: &AddressWithSpace,
473    ) -> Option<(U256, U256, U256, H256)> {
474        let (local_nonce, _local_balance) = self
475            .get_local_nonce_and_balance(address)
476            .unwrap_or((U256::from(0), U256::from(0)));
477        match self.deferred_pool.get_pending_info(address, &local_nonce) {
478            Some((pending_count, pending_tx)) => Some((
479                local_nonce,
480                U256::from(pending_count),
481                *pending_tx.nonce(),
482                pending_tx.hash(),
483            )),
484            None => {
485                Some((local_nonce, U256::from(0), U256::from(0), H256::zero()))
486            }
487        }
488    }
489
490    pub fn get_account_pending_transactions(
491        &self, address: &AddressWithSpace, maybe_start_nonce: Option<U256>,
492        maybe_limit: Option<usize>,
493    ) -> (
494        Vec<Arc<SignedTransaction>>,
495        Option<TransactionStatus>,
496        usize,
497    ) {
498        let (local_nonce, local_balance) = self
499            .get_local_nonce_and_balance(address)
500            .unwrap_or((U256::from(0), U256::from(0)));
501        let start_nonce = maybe_start_nonce.unwrap_or(local_nonce);
502        let (pending_txs, pending_reason) =
503            self.deferred_pool.get_pending_transactions(
504                address,
505                &start_nonce,
506                &local_nonce,
507                &local_balance,
508            );
509        if pending_txs.is_empty() {
510            return (Vec::new(), None, 0);
511        }
512        let first_tx_status = match pending_reason {
513            None => TransactionStatus::Ready,
514            Some(reason) => TransactionStatus::Pending(reason),
515        };
516        let pending_count = pending_txs.len();
517        let limit = maybe_limit.unwrap_or(usize::MAX);
518        (
519            pending_txs
520                .into_iter()
521                .map(|x| x.transaction.clone())
522                .take(limit)
523                .collect(),
524            Some(first_tx_status),
525            pending_count,
526        )
527    }
528
529    pub fn get_local_nonce_and_balance(
530        &self, address: &AddressWithSpace,
531    ) -> Option<(U256, U256)> {
532        self.ready_nonces_and_balances.get(address).map(|x| *x)
533    }
534
535    fn update_nonce_and_balance(
536        &mut self, address: &AddressWithSpace, nonce: U256, balance: U256,
537    ) {
538        if !self.deferred_pool.contain_address(address) {
539            return;
540        }
541        self.ready_nonces_and_balances
542            .insert((*address).clone(), (nonce, balance));
543    }
544
545    fn get_and_update_nonce_and_balance_from_storage(
546        &mut self, address: &AddressWithSpace, state: &StateProvider,
547    ) -> StateDbResult<(U256, U256)> {
548        let nonce_and_balance = state.get_nonce_and_balance(address)?;
549        if !self.deferred_pool.contain_address(address) {
550            return Ok(nonce_and_balance);
551        }
552        self.ready_nonces_and_balances
553            .insert((*address).clone(), nonce_and_balance);
554
555        Ok(nonce_and_balance)
556    }
557
558    pub fn get_lowest_nonce(&self, addr: &AddressWithSpace) -> U256 {
559        let mut ret = 0.into();
560        if let Some((nonce, _)) = self.get_local_nonce_and_balance(addr) {
561            ret = nonce;
562        }
563        if let Some(nonce) = self.deferred_pool.get_lowest_nonce(addr) {
564            if *nonce < ret {
565                ret = *nonce;
566            }
567        }
568        ret
569    }
570
571    pub fn get_next_nonce(
572        &self, address: &AddressWithSpace, state_nonce: U256,
573    ) -> U256 {
574        self.deferred_pool
575            .last_succ_nonce(*address, state_nonce)
576            .unwrap_or(state_nonce)
577    }
578
579    #[allow(dead_code)]
580    fn recalculate_readiness_with_local_info(
581        &mut self, addr: &AddressWithSpace,
582    ) {
583        let (nonce, balance) = self
584            .get_local_nonce_and_balance(addr)
585            .unwrap_or((0.into(), 0.into()));
586        self.recalculate_readiness(addr, nonce, balance);
587    }
588
    /// Stores the caller-supplied `nonce` and `balance` in the cache (only
    /// effective when `addr` is in the deferred pool) and then recalculates
    /// the readiness of `addr` with those same values.
    fn recalculate_readiness_with_fixed_info(
        &mut self, addr: &AddressWithSpace, nonce: U256, balance: U256,
    ) {
        self.update_nonce_and_balance(addr, nonce, balance);
        self.recalculate_readiness(addr, nonce, balance);
    }
595
596    fn recalculate_readiness_with_state(
597        &mut self, addr: &AddressWithSpace, state: &StateProvider,
598    ) -> StateDbResult<()> {
599        let _timer = MeterTimer::time_func(TX_POOL_RECALCULATE.as_ref());
600        let (nonce, balance) =
601            self.get_and_update_nonce_and_balance_from_storage(addr, state)?;
602        self.recalculate_readiness(addr, nonce, balance);
603        trace!(
604            "txpool::recalculate_readiness_with_state addr={:?} nonce={:?} balance={:?}",
605            addr,
606            nonce,
607            balance
608        );
609        Ok(())
610    }
611
612    fn recalculate_readiness(
613        &mut self, addr: &AddressWithSpace, nonce: U256, balance: U256,
614    ) {
615        let space = addr.space;
616        let ret = self
617            .deferred_pool
618            .recalculate_readiness_with_local_info(addr, nonce, balance);
619        match &ret {
620            Some(tx) => trace!(
621                "txpool::recalculate_readiness addr={:?} state_nonce={:?} ready_nonce={:?} ready_hash={:?} balance={:?}",
622                addr,
623                nonce,
624                tx.nonce(),
625                tx.hash(),
626                balance
627            ),
628            None => trace!(
629                "txpool::recalculate_readiness addr={:?} state_nonce={:?} no_ready_tx balance={:?}",
630                addr,
631                nonce,
632                balance
633            ),
634        }
635        // If addr is not in `deferred_pool`, it should have also been removed
636        // from garbage_collector
637        if let Some(tx) = self.deferred_pool.get_lowest_nonce_tx(addr) {
638            let count = self.deferred_pool.count_less(addr, &nonce);
639            let timestamp = self
640                .garbage_collector
641                .in_space(space)
642                .get_timestamp(addr)
643                .unwrap_or(self.get_current_timestamp());
644            self.garbage_collector.in_space_mut(space).insert(
645                addr,
646                count,
647                timestamp,
648                ret.is_some(),
649                *tx.gas_price(),
650            );
651        } else {
652            // An account is only removed from `deferred_pool` in GC,
653            // so this is not likely to happen.
654            // One possible reason is that an transactions not in txpool is
655            // executed and passed to notify_modified_accounts.
656            debug!(
657                "recalculate_readiness called for missing account: addr={:?}",
658                addr
659            );
660        }
661    }
662
663    pub fn check_tx_packed_in_deferred_pool(&self, tx_hash: &H256) -> bool {
664        match self.txs.get(tx_hash) {
665            Some(tx) => {
666                self.deferred_pool.check_tx_packed(tx.sender(), *tx.nonce())
667            }
668            None => false,
669        }
670    }
671
672    /// pack at most num_txs transactions randomly
673    pub fn pack_transactions<'a>(
674        &mut self, num_txs: usize, block_gas_limit: U256, evm_gas_limit: U256,
675        block_size_limit: usize, best_epoch_height: u64,
676        best_block_number: u64, verification_config: &VerificationConfig,
677        machine: &Machine,
678    ) -> Vec<Arc<SignedTransaction>> {
679        let mut packed_transactions: Vec<Arc<SignedTransaction>> = Vec::new();
680        if num_txs == 0 {
681            return packed_transactions;
682        }
683        debug!(
684            "txpool::pack_transactions start best_epoch={} best_block={} block_gas_limit={} evm_gas_limit={} block_size_limit={} limit={}",
685            best_epoch_height,
686            best_block_number,
687            block_gas_limit,
688            evm_gas_limit,
689            block_size_limit,
690            num_txs
691        );
692
693        let spec = machine.spec(best_block_number, best_epoch_height);
694        let transitions = &machine.params().transition_heights;
695
696        let validity = |tx: &SignedTransaction| {
697            verification_config.fast_recheck(
698                tx,
699                best_epoch_height,
700                transitions,
701                &spec,
702            )
703        };
704
705        let (sampled_tx, used_gas, used_size) =
706            self.deferred_pool.packing_sampler(
707                Space::Ethereum,
708                std::cmp::min(block_gas_limit, evm_gas_limit),
709                block_size_limit,
710                num_txs,
711                U256::zero(),
712                validity,
713            );
714        debug!(
715            "txpool::pack_transactions espace selected={} gas_used={} size_used={}",
716            sampled_tx.len(),
717            used_gas,
718            used_size
719        );
720        packed_transactions.extend_from_slice(&sampled_tx);
721
722        let (sampled_tx, _, _) = self.deferred_pool.packing_sampler(
723            Space::Native,
724            block_gas_limit - used_gas,
725            block_size_limit - used_size,
726            num_txs - sampled_tx.len(),
727            U256::zero(),
728            validity,
729        );
730        debug!(
731            "txpool::pack_transactions native selected={} total={}",
732            sampled_tx.len(),
733            packed_transactions.len() + sampled_tx.len()
734        );
735        packed_transactions.extend_from_slice(&sampled_tx);
736
737        if log::max_level() >= log::Level::Debug {
738            let mut rlp_s = RlpStream::new();
739            for tx in &packed_transactions {
740                rlp_s.append::<TransactionWithSignature>(&**tx);
741            }
742            debug!(
743                "After packing packed_transactions: {}, rlp size: {}",
744                packed_transactions.len(),
745                rlp_s.out().len(),
746            );
747        }
748
749        packed_transactions
750    }
751
752    pub fn pack_transactions_1559<'a>(
753        &mut self, num_txs: usize, block_gas_limit: U256,
754        parent_base_price: SpaceMap<U256>, block_size_limit: usize,
755        best_epoch_height: u64, machine: &Machine,
756        validity: impl Fn(&SignedTransaction) -> PackingCheckResult,
757    ) -> (Vec<Arc<SignedTransaction>>, SpaceMap<U256>) {
758        let mut packed_transactions: Vec<Arc<SignedTransaction>> = Vec::new();
759        if num_txs == 0 {
760            return (packed_transactions, parent_base_price);
761        }
762
763        debug!(
764            "Packing transaction for 1559, parent base price {:?}",
765            parent_base_price
766        );
767
768        let mut block_base_price = parent_base_price.clone();
769
770        let can_pack_evm =
771            machine.params().can_pack_evm_transaction(best_epoch_height);
772
773        let (evm_packed_tx_num, evm_used_size) = if can_pack_evm {
774            let gas_target = block_gas_limit * 5 / 10 / ELASTICITY_MULTIPLIER;
775            let parent_base_price = parent_base_price[Space::Ethereum];
776            let min_base_price =
777                machine.params().min_base_price()[Space::Ethereum];
778
779            let (packing_gas_limit, tx_min_price) =
780                self.deferred_pool.estimate_packing_gas_limit(
781                    Space::Ethereum,
782                    gas_target,
783                    parent_base_price,
784                    min_base_price,
785                );
786            debug!(
787                "Packing plan (espace): gas limit: {:?}, tx min price: {:?}",
788                packing_gas_limit, tx_min_price
789            );
790            let (sampled_tx, used_gas, used_size) =
791                self.deferred_pool.packing_sampler(
792                    Space::Ethereum,
793                    packing_gas_limit,
794                    block_size_limit,
795                    num_txs,
796                    tx_min_price,
797                    &validity,
798                );
799
800            // Recompute the base price, it should be <= estimated base price,
801            // since the actual used gas is <= estimated limit
802            let base_price = compute_next_price(
803                gas_target,
804                used_gas,
805                parent_base_price,
806                min_base_price,
807            );
808
809            if base_price <= tx_min_price {
810                debug!(
811                    "Packing result (espace): gas used: {:?}, base price: {:?}",
812                    used_gas, base_price
813                );
814                block_base_price[Space::Ethereum] = base_price;
815                packed_transactions.extend_from_slice(&sampled_tx);
816
817                (sampled_tx.len(), used_size)
818            } else {
819                // Should be unreachable
820                warn!(
821                    "Inconsistent packing result (espace): gas used: {:?}, base price: {:?}", 
822                    used_gas, base_price
823                );
824                block_base_price[Space::Ethereum] = compute_next_price(
825                    gas_target,
826                    U256::zero(),
827                    parent_base_price,
828                    min_base_price,
829                );
830                (0, 0)
831            }
832        } else {
833            (0, 0)
834        };
835
836        {
837            let gas_target =
838                cspace_block_gas_limit_after_cip1559(block_gas_limit)
839                    / ELASTICITY_MULTIPLIER;
840            let parent_base_price = parent_base_price[Space::Native];
841            let min_base_price =
842                machine.params().min_base_price()[Space::Native];
843
844            let (packing_gas_limit, tx_min_price) =
845                self.deferred_pool.estimate_packing_gas_limit(
846                    Space::Native,
847                    gas_target,
848                    parent_base_price,
849                    min_base_price,
850                );
851
852            debug!(
853                "Packing plan (core space): gas limit: {:?}, tx min price: {:?}",
854                packing_gas_limit, tx_min_price
855            );
856
857            let (sampled_tx, used_gas, _) = self.deferred_pool.packing_sampler(
858                Space::Native,
859                packing_gas_limit,
860                block_size_limit - evm_used_size,
861                num_txs - evm_packed_tx_num,
862                tx_min_price,
863                &validity,
864            );
865
866            // Recompute the base price, it should be <= estimated base price,
867            // since the actual used gas is <= estimated limit
868            let base_price = compute_next_price(
869                gas_target,
870                used_gas,
871                parent_base_price,
872                min_base_price,
873            );
874
875            if base_price <= tx_min_price {
876                debug!(
877                    "Packing result (core space): gas used: {:?}, base price: {:?}",
878                    used_gas, base_price
879                );
880                block_base_price[Space::Native] = base_price;
881                packed_transactions.extend_from_slice(&sampled_tx);
882            } else {
883                // Should be unreachable
884                warn!(
885                    "Inconsistent packing result (core space): gas used: {:?}, base price: {:?}", 
886                    used_gas, base_price
887                );
888                block_base_price[Space::Native] = compute_next_price(
889                    gas_target,
890                    U256::zero(),
891                    parent_base_price,
892                    min_base_price,
893                );
894            }
895        }
896
897        if log::max_level() >= log::Level::Debug {
898            let mut rlp_s = RlpStream::new();
899            for tx in &packed_transactions {
900                rlp_s.append::<TransactionWithSignature>(&**tx);
901            }
902            debug!(
903                "After packing packed_transactions: {}, rlp size: {}",
904                packed_transactions.len(),
905                rlp_s.out().len(),
906            );
907        }
908
909        (packed_transactions, block_base_price)
910    }
911
912    pub fn notify_modified_accounts(
913        &mut self, accounts_from_execution: Vec<Account>,
914    ) {
915        for account in &accounts_from_execution {
916            self.recalculate_readiness_with_fixed_info(
917                account.address(),
918                account.nonce,
919                account.balance,
920            );
921        }
922    }
923
924    /// content retrieves the ready and deferred transactions.
925    pub fn content(
926        &self, address: Option<AddressWithSpace>,
927    ) -> (Vec<Arc<SignedTransaction>>, Vec<Arc<SignedTransaction>>) {
928        let ready_txs = match address {
929            Some(addr) => self
930                .deferred_pool
931                .ready_transactions_by_address(addr)
932                .map_or(vec![], |x| x.to_vec()),
933            None => self
934                .deferred_pool
935                .all_ready_transactions()
936                .cloned()
937                .collect(),
938        };
939
940        let deferred_txs = self
941            .txs
942            .values()
943            .filter(|tx| address == None || tx.sender() == address.unwrap())
944            .map(|v| v.clone())
945            .collect();
946
947        (ready_txs, deferred_txs)
948    }
949
950    pub fn eth_content(
951        &self, space: Option<Space>,
952    ) -> (
953        BTreeMap<AddressWithSpace, BTreeMap<U256, Arc<SignedTransaction>>>,
954        BTreeMap<AddressWithSpace, BTreeMap<U256, Arc<SignedTransaction>>>,
955    ) {
956        let get_local_nonce_and_balance = |addr: &AddressWithSpace| {
957            self.ready_nonces_and_balances
958                .get(addr)
959                .map(|x| *x)
960                .unwrap_or_default()
961        };
962        self.deferred_pool
963            .eth_content(space, get_local_nonce_and_balance)
964    }
965
966    pub fn eth_content_from(
967        &self, from: AddressWithSpace,
968    ) -> (
969        BTreeMap<U256, Arc<SignedTransaction>>,
970        BTreeMap<U256, Arc<SignedTransaction>>,
971    ) {
972        let (local_nonce, local_balance) =
973            self.get_local_nonce_and_balance(&from).unwrap_or_default();
974        self.deferred_pool
975            .eth_content_from(from, local_nonce, local_balance)
976    }
977
978    // Add transaction into deferred pool and maintain its readiness
979    // the packed tag provided
980    // if force tag is true, the replacement in nonce pool must be happened
981    pub fn insert_transaction_with_readiness_check(
982        &mut self, state: &StateProvider, transaction: Arc<SignedTransaction>,
983        packed: bool, force: bool,
984    ) -> Result<(), TransactionPoolError> {
985        let _timer = MeterTimer::time_func(TX_POOL_INNER_INSERT_TIMER.as_ref());
986        let (sponsored_gas, sponsored_storage) =
987            self.get_sponsored_gas_and_storage(state, &transaction)?;
988
989        let (state_nonce, state_balance) =
990            state.get_nonce_and_balance(&transaction.sender())?;
991
992        if transaction.hash[0] & 254 == 0 {
993            trace!(
994                "Transaction {:?} sender: {:?} current nonce: {:?}, state nonce:{:?}",
995                transaction.hash, transaction.sender, transaction.nonce(), state_nonce
996            );
997        }
998        if *transaction.nonce()
999            >= state_nonce
1000                + U256::from(FURTHEST_FUTURE_TRANSACTION_NONCE_OFFSET)
1001        {
1002            trace!(
1003                "Transaction {:?} is discarded due to in too distant future",
1004                transaction.hash()
1005            );
1006            return Err(TransactionPoolError::NonceTooDistant {
1007                hash: transaction.hash(),
1008                nonce: *transaction.nonce(),
1009            });
1010        } else if !packed /* Because we may get slightly out-dated state for transaction pool, we should allow transaction pool to set already past-nonce transactions to packed. */
1011            && *transaction.nonce() < state_nonce
1012        {
1013            trace!(
1014                "Transaction {:?} is discarded due to a too stale nonce, self.nonce()={}, state_nonce={}",
1015                transaction.hash(), transaction.nonce(), state_nonce,
1016            );
1017            return Err(TransactionPoolError::NonceTooStale {
1018                hash: transaction.hash(),
1019                nonce: *transaction.nonce(),
1020            });
1021        }
1022
1023        // check balance
1024        if !packed && !force {
1025            let mut need_balance = U256::from(0);
1026            let estimate_gas_fee = Self::cal_gas_fee(
1027                transaction.gas().clone(),
1028                transaction.gas_price().clone(),
1029            );
1030            match transaction.unsigned {
1031                Transaction::Native(ref utx) => {
1032                    need_balance += utx.value().clone();
1033                    if sponsored_gas == U256::from(0) {
1034                        need_balance += estimate_gas_fee;
1035                    }
1036                    if sponsored_storage == 0 {
1037                        need_balance += U256::from(*utx.storage_limit())
1038                            * *DRIPS_PER_STORAGE_COLLATERAL_UNIT;
1039                    }
1040                }
1041                Transaction::Ethereum(ref utx) => {
1042                    need_balance += utx.value().clone();
1043                    need_balance += estimate_gas_fee;
1044                }
1045            }
1046
1047            if need_balance > state_balance {
1048                let msg = format!(
1049                    "Transaction {:?} is discarded due to out of balance, needs {:?} but account balance is {:?}",
1050                    transaction.hash(),
1051                    need_balance,
1052                    state_balance
1053                );
1054                trace!("{}", msg);
1055                return Err(TransactionPoolError::OutOfBalance {
1056                    need: need_balance,
1057                    have: state_balance,
1058                    hash: transaction.hash(),
1059                });
1060            }
1061        }
1062
1063        let result = self.insert_transaction_without_readiness_check(
1064            transaction.clone(),
1065            packed,
1066            force,
1067            (state_nonce, state_balance),
1068            (sponsored_gas, sponsored_storage),
1069        );
1070        if let InsertResult::Failed(err) = result {
1071            return Err(err);
1072        }
1073
1074        self.recalculate_readiness_with_state(&transaction.sender(), state)?;
1075
1076        Ok(())
1077    }
1078
1079    fn cal_gas_fee(gas: U256, gas_price: U256) -> U256 {
1080        let estimated_gas_u512 = gas.full_mul(gas_price);
1081        // Normally, it is less than 2^128
1082        let estimated_gas =
1083            if estimated_gas_u512 > U512::from(U128::max_value()) {
1084                U256::from(U128::max_value())
1085            } else {
1086                gas * gas_price
1087            };
1088        estimated_gas
1089    }
1090
1091    pub fn get_sponsored_gas_and_storage(
1092        &self, state: &StateProvider, transaction: &SignedTransaction,
1093    ) -> StateDbResult<(U256, u64)> {
1094        let sender = transaction.sender();
1095
1096        // Filter out espace transactions
1097        let utx = if let Transaction::Native(ref utx) = transaction.unsigned {
1098            utx
1099        } else {
1100            return Ok(Default::default());
1101        };
1102
1103        // Keep contract call only
1104        let contract_address = match utx.action() {
1105            Action::Call(callee) if callee.is_contract_address() => callee,
1106            _ => {
1107                return Ok(Default::default());
1108            }
1109        };
1110
1111        // Get sponsor info
1112        let sponsor_info = if let Some(sponsor_info) =
1113            state.get_sponsor_info(&contract_address)?
1114        {
1115            sponsor_info
1116        } else {
1117            return Ok(Default::default());
1118        };
1119
1120        // Check if sender is eligible for sponsor
1121        if !state
1122            .check_commission_privilege(&contract_address, &sender.address)?
1123        {
1124            return Ok(Default::default());
1125        }
1126
1127        // Detailed logics
1128        let estimated_gas = Self::cal_gas_fee(
1129            transaction.gas().clone(),
1130            transaction.gas_price().clone(),
1131        );
1132        let sponsored_gas = if estimated_gas <= sponsor_info.sponsor_gas_bound
1133            && estimated_gas <= sponsor_info.sponsor_balance_for_gas
1134        {
1135            utx.gas().clone()
1136        } else {
1137            0.into()
1138        };
1139
1140        let estimated_collateral = U256::from(*utx.storage_limit())
1141            * *DRIPS_PER_STORAGE_COLLATERAL_UNIT;
1142        let sponsored_collateral = if estimated_collateral
1143            <= sponsor_info.sponsor_balance_for_collateral
1144                + sponsor_info.unused_storage_points()
1145        {
1146            *utx.storage_limit()
1147        } else {
1148            0
1149        };
1150
1151        Ok((sponsored_gas, sponsored_collateral))
1152    }
1153}
1154
#[cfg(test)]
mod tests {
    use crate::verification::PackingCheckResult;

    use super::TransactionPoolInner;
    use crate::keylib::{Generator, KeyPair, Random};
    use cfx_executor::{
        machine::{Machine, VmFactory},
        spec::CommonParams,
    };
    use cfx_parameters::block::{
        cspace_block_gas_limit_after_cip1559, espace_block_gas_limit,
    };
    use cfx_types::{Address, Space, SpaceMap, U256};
    use itertools::Itertools;
    use primitives::{
        block_header::compute_next_price_tuple,
        transaction::{
            native_transaction::NativeTransaction, Eip155Transaction,
        },
        Action, SignedTransaction, Transaction,
    };
    use std::sync::Arc;

    /// Builds a transaction in `space` that calls a random address with the
    /// given nonce, gas price, gas and value, and signs it with `sender`.
    fn new_test_tx(
        sender: &KeyPair, nonce: usize, gas_price: usize, gas: usize,
        value: usize, space: Space,
    ) -> Arc<SignedTransaction> {
        let nonce = U256::from(nonce);
        let gas_price = U256::from(gas_price);
        let gas = U256::from(gas);
        let value = U256::from(value);
        let action = Action::Call(Address::random());

        let unsigned: Transaction = match space {
            Space::Native => NativeTransaction {
                nonce,
                gas_price,
                gas,
                action,
                value,
                storage_limit: 0,
                epoch_height: 0,
                chain_id: 1,
                data: Vec::new(),
            }
            .into(),
            Space::Ethereum => Eip155Transaction {
                nonce,
                gas_price,
                gas,
                action,
                value,
                chain_id: Some(1),
                data: Vec::new(),
            }
            .into(),
        };
        Arc::new(unsigned.sign(sender.secret()))
    }

    /// Packs from `pool` with `pack_transactions_1559` and checks the
    /// result: the returned base price must equal the price recomputed from
    /// the packed gas, the packed gas must stay within each space's limit,
    /// and no packed transaction may be priced below its space's base price.
    fn pack_transactions_1559_checked(
        pool: &mut TransactionPoolInner, machine: &Machine,
    ) {
        let best_epoch_height = 20;
        let block_gas_limit = U256::from(6000);
        let parent_base_price = SpaceMap::new(100, 200).map_all(U256::from);

        let (packed, actual_base_price) = pool.pack_transactions_1559(
            usize::MAX,
            block_gas_limit,
            parent_base_price,
            usize::MAX,
            best_epoch_height,
            machine,
            |_| PackingCheckResult::Pack,
        );

        let params = machine.params();

        let core_gas_limit =
            cspace_block_gas_limit_after_cip1559(block_gas_limit);
        let eth_gas_limit = espace_block_gas_limit(
            params.can_pack_evm_transaction(best_epoch_height),
            block_gas_limit,
        );

        let gas_target =
            SpaceMap::new(core_gas_limit, eth_gas_limit).map_all(|x| x / 2);

        // Per-space totals and cheapest packed price.
        let mut gas_used = SpaceMap::default();
        let mut min_gas_price =
            SpaceMap::new(U256::max_value(), U256::max_value());

        for tx in packed {
            let space = tx.space();
            gas_used[space] += *tx.gas_limit();
            min_gas_price[space] = min_gas_price[space].min(*tx.gas_price());
        }

        let expected_base_price = SpaceMap::zip4(
            gas_target,
            gas_used,
            parent_base_price,
            params.min_base_price(),
        )
        .map_all(compute_next_price_tuple);

        assert_eq!(expected_base_price, actual_base_price);
        assert!(gas_used[Space::Native] <= core_gas_limit);
        assert!(gas_used[Space::Ethereum] <= eth_gas_limit);

        for space in [Space::Native, Space::Ethereum] {
            assert!(actual_base_price[space] <= min_gas_price[space]);
        }
    }

    /// Fills the pool with one transaction per sender and selected space for
    /// every combination of space selection, gas price, gas limit share and
    /// price step, then checks the packing result of each combination.
    #[test]
    fn test_pack_eip1559_transactions() {
        let mut pool = TransactionPoolInner::new_for_test();

        let mut params = CommonParams::default();
        params.min_base_price = SpaceMap::new(100, 200).map_all(U256::from);

        let machine = Arc::new(Machine::new(params, VmFactory::default()));

        let test_block_limit = SpaceMap::new(5400, 3000);

        let senders: Vec<_> =
            (0..20).map(|_| Random.generate().unwrap()).collect();

        // Bit 0x1 of the first component selects the native space, bit 0x2
        // the ethereum space.
        let tasks = [1, 2, 3]
            .into_iter()
            .cartesian_product(
                /* gas_price */ [50usize, 95, 100, 105, 150, 1000],
            )
            .cartesian_product(
                /* gas_limit_percent */ [5usize, 10, 40, 60, 100],
            )
            .cartesian_product(/* price_increasing */ [0usize, 1]);

        for (((space_bits, gas_price), gas_limit_percent), price_inc) in tasks {
            let tx_gas_limit =
                test_block_limit.map_all(|x| x * gas_limit_percent / 100);

            for (idx, sender) in senders.iter().enumerate() {
                let sender_price = gas_price + idx * price_inc;

                // (selection bit, price multiplier, space); ethereum
                // transactions are priced at twice the native price.
                for (bit, price_factor, space) in
                    [(0x1, 1, Space::Native), (0x2, 2, Space::Ethereum)]
                {
                    if space_bits & bit != 0 {
                        let tx = new_test_tx(
                            sender,
                            0,
                            sender_price * price_factor,
                            tx_gas_limit[space],
                            0,
                            space,
                        );
                        pool.insert_transaction_for_test(tx, U256::zero());
                    }
                }
            }
            pack_transactions_1559_checked(&mut pool, &machine);
            pool.clear();
        }
    }
}