1use super::{
2 deferred_pool::DeferredPool,
3 garbage_collector::GarbageCollector,
4 nonce_pool::{InsertResult, TxWithReadyInfo},
5 pool_metrics::pool_inner_metrics::*,
6 state_provider::StateProvider,
7 TransactionPoolError,
8};
9
10use crate::verification::{PackingCheckResult, VerificationConfig};
11use cfx_executor::machine::Machine;
12use cfx_packing_pool::PackingPoolConfig;
13use cfx_parameters::{
14 block::cspace_block_gas_limit_after_cip1559,
15 consensus_internal::ELASTICITY_MULTIPLIER,
16 staking::DRIPS_PER_STORAGE_COLLATERAL_UNIT,
17};
18
19use cfx_rpc_cfx_types::TransactionStatus;
20use cfx_statedb::Result as StateDbResult;
21use cfx_types::{
22 address_util::AddressUtil, AddressWithSpace, Space, SpaceMap, H256, U128,
23 U256, U512,
24};
25use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
26use metrics::MeterTimer;
27use primitives::{
28 block_header::compute_next_price, Account, Action, SignedTransaction,
29 Transaction, TransactionWithSignature,
30};
31use rlp::*;
32use std::{
33 collections::{BTreeMap, BTreeSet, HashMap},
34 sync::Arc,
35 time::{SystemTime, UNIX_EPOCH},
36};
37
38const FURTHEST_FUTURE_TRANSACTION_NONCE_OFFSET: u32 = 2000;
43
44#[derive(Default, DeriveMallocSizeOf)]
45pub struct TransactionSet {
46 inner: HashMap<H256, Arc<SignedTransaction>>,
47 count: SpaceMap<usize>,
48}
49
50impl TransactionSet {
51 fn get(&self, tx_hash: &H256) -> Option<&Arc<SignedTransaction>> {
52 self.inner.get(tx_hash)
53 }
54
55 fn values(
56 &self,
57 ) -> std::collections::hash_map::Values<'_, H256, Arc<SignedTransaction>>
58 {
59 self.inner.values()
60 }
61
62 fn insert(
63 &mut self, tx_hash: H256, tx: Arc<SignedTransaction>,
64 ) -> Option<Arc<SignedTransaction>> {
65 *self.count.in_space_mut(tx.space()) += 1;
66 let res = self.inner.insert(tx_hash, tx);
67 if let Some(ref tx) = res {
68 *self.count.in_space_mut(tx.space()) -= 1;
69 }
70 res
71 }
72
73 fn remove(&mut self, tx_hash: &H256) -> Option<Arc<SignedTransaction>> {
74 let res = self.inner.remove(tx_hash);
75 if let Some(ref tx) = res {
76 *self.count.in_space_mut(tx.space()) -= 1;
77 }
78 res
79 }
80
81 fn clear(&mut self) {
82 self.inner.clear();
83 self.count.apply_all(|x| *x = 0);
84 }
85}
86
87#[derive(DeriveMallocSizeOf)]
88pub struct TransactionPoolInner {
89 capacity: usize,
90 total_received_count: usize,
92 unpacked_transaction_count: SpaceMap<usize>,
93 deferred_pool: DeferredPool,
96 ready_nonces_and_balances: HashMap<AddressWithSpace, (U256, U256)>,
101 garbage_collector: SpaceMap<GarbageCollector>,
102 txs: TransactionSet,
105}
106
107impl TransactionPoolInner {
108 pub fn new(
109 capacity: usize, max_packing_batch_gas_limit: usize,
110 max_packing_batch_size: usize, packing_pool_degree: u8,
111 ) -> Self {
112 let config = PackingPoolConfig::new(
113 max_packing_batch_gas_limit.into(),
114 max_packing_batch_size,
115 packing_pool_degree,
116 );
117 TransactionPoolInner {
118 capacity,
119 total_received_count: 0,
120 unpacked_transaction_count: SpaceMap::default(),
121 deferred_pool: DeferredPool::new(config),
122 ready_nonces_and_balances: HashMap::new(),
123 garbage_collector: SpaceMap::default(),
124 txs: TransactionSet::default(),
125 }
126 }
127
128 #[cfg(test)]
129 pub fn new_for_test() -> Self { Self::new(50_000, 3_000_000, 50, 4) }
130
131 pub fn clear(&mut self) {
132 self.deferred_pool.clear();
133 self.ready_nonces_and_balances.clear();
134 self.garbage_collector.apply_all(|x| x.clear());
135 self.txs.clear();
136 self.total_received_count = 0;
137 self.unpacked_transaction_count.apply_all(|x| *x = 0);
138 }
139
140 pub fn total_deferred(&self, space: Option<Space>) -> usize {
141 match space {
142 Some(space) => *self.txs.count.in_space(space),
143 None => self.txs.count.map_sum(|x| *x),
144 }
145 }
146
147 pub fn ready_transacton_hashes_in_evm_pool(&self) -> BTreeSet<H256> {
148 self.deferred_pool
149 .ready_transaction_hashes(Space::Ethereum)
150 .collect()
151 }
152
153 pub fn ready_transacton_hashes_in_native_pool(&self) -> BTreeSet<H256> {
154 self.deferred_pool
155 .ready_transaction_hashes(Space::Native)
156 .collect()
157 }
158
159 pub fn total_ready_accounts(&self) -> usize {
160 self.deferred_pool.ready_account_number(Space::Ethereum)
161 + self.deferred_pool.ready_account_number(Space::Native)
162 }
163
164 pub fn total_received(&self) -> usize { self.total_received_count }
165
166 pub fn total_unpacked(&self, space: Option<Space>) -> usize {
167 match space {
168 Some(space) => *self.unpacked_transaction_count.in_space(space),
169 None => self.unpacked_transaction_count.map_sum(|x| *x),
170 }
171 }
172
173 pub fn total_pending(&self, space: Option<Space>) -> u64 {
174 let get_nonce_and_balance = |addr: &AddressWithSpace| {
175 self.ready_nonces_and_balances
176 .get(addr)
177 .map(|x| *x)
178 .unwrap_or_default()
179 };
180 self.deferred_pool
181 .pending_tx_number(space, get_nonce_and_balance)
182 }
183
184 pub fn total_queued(&self, space: Option<Space>) -> u64 {
185 self.total_unpacked(space) as u64 - self.total_pending(space)
186 }
187
188 pub fn get(&self, tx_hash: &H256) -> Option<Arc<SignedTransaction>> {
189 self.txs.get(tx_hash).map(|x| x.clone())
190 }
191
192 pub fn get_by_address2nonce(
193 &self, address: AddressWithSpace, nonce: U256,
194 ) -> Option<Arc<SignedTransaction>> {
195 let bucket = self.deferred_pool.get_bucket(&address)?;
196 bucket.get_tx_by_nonce(nonce).map(|tx| tx.transaction)
197 }
198
199 pub fn is_full(&self, space: Space) -> bool {
200 return self.total_deferred(Some(space)) >= self.capacity;
201 }
202
203 pub fn get_current_timestamp(&self) -> u64 {
204 let start = SystemTime::now();
205 let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap();
206 since_the_epoch.as_secs()
207 }
208
209 pub fn collect_garbage(&mut self, new_tx: &SignedTransaction) {
220 let space = new_tx.space();
221 let count_before_gc = self.total_deferred(Some(space));
222 let mut skipped_self_node = None;
223 while self.is_full(space)
224 && !self.garbage_collector.in_space(space).is_empty()
225 {
226 let current_timestamp = self.get_current_timestamp();
227 let (victim_address, victim) =
228 self.garbage_collector.in_space(space).top().unwrap();
229 if !self.deferred_pool.contain_address(victim_address) {
232 self.garbage_collector.in_space_mut(space).pop();
233 continue;
234 }
235
236 if victim.count == 0 {
240 if *victim_address == new_tx.sender() {
241 let (victim_address, victim) = self
244 .garbage_collector
245 .in_space_mut(space)
246 .pop()
247 .unwrap();
248 skipped_self_node = Some((victim_address, victim));
249 continue;
250 } else if victim.has_ready_tx
251 && victim.first_tx_gas_price >= *new_tx.gas_price()
252 {
253 trace!("txpool::collect_garbage fails, victim={:?} new_tx={:?} \
260 new_tx_gas_price={:?}", victim, new_tx.hash(), new_tx.gas_price());
261 return;
262 }
263 }
264
265 let (victim_address, victim) =
267 self.garbage_collector.in_space_mut(space).pop().unwrap();
268
269 let (ready_nonce, _) = self
270 .get_local_nonce_and_balance(&victim_address)
271 .unwrap_or((0.into(), 0.into()));
272
273 let tx_with_ready_info = self
274 .deferred_pool
275 .remove_lowest_nonce(&victim_address)
276 .unwrap();
277 let to_remove_tx = tx_with_ready_info.get_arc_tx().clone();
278 trace!(
279 "txpool::collect_garbage removed tx {:?} sender={:?} nonce={:?} new_tx={:?}",
280 to_remove_tx.hash(),
281 victim_address,
282 to_remove_tx.nonce(),
283 new_tx.hash()
284 );
285
286 if *to_remove_tx.nonce() >= ready_nonce {
289 assert_eq!(victim.count, 0);
290 GC_UNEXECUTED_COUNTER.inc(1);
291 warn!("an unexecuted tx is garbage-collected.");
292 }
293
294 if !tx_with_ready_info.is_already_packed() {
295 let tx_space = tx_with_ready_info.space();
296 *self.unpacked_transaction_count.in_space_mut(tx_space) = self
297 .unpacked_transaction_count
298 .in_space(tx_space)
299 .checked_sub(1)
300 .unwrap_or_else(|| {
301 error!("unpacked_transaction_count under-flows.");
302 0
303 });
304 }
305
306 if !self.deferred_pool.contain_address(&victim_address) {
308 self.ready_nonces_and_balances.remove(&victim_address);
309 } else {
312 let has_ready_tx =
313 self.deferred_pool.has_ready_tx(&victim_address);
314 let first_tx_gas_price = *self
315 .deferred_pool
316 .get_lowest_nonce_tx(&victim_address)
317 .expect("addr exist")
318 .gas_price();
319 let count = if victim.count > 0 {
320 victim.count - 1
321 } else {
322 0
323 };
324 self.garbage_collector.in_space_mut(space).insert(
325 &victim_address,
326 count,
327 current_timestamp,
328 has_ready_tx,
329 first_tx_gas_price,
330 );
331 }
332
333 self.txs.remove(&to_remove_tx.hash());
335 }
336
337 if let Some((addr, node)) = skipped_self_node {
340 self.garbage_collector.in_space_mut(space).insert(
341 &addr,
342 node.count,
343 node.timestamp,
344 node.has_ready_tx,
345 node.first_tx_gas_price,
346 );
347 }
348 GC_METER.mark(count_before_gc - self.total_deferred(Some(space)));
349 }
350
351 pub fn remaining_quota(&self) -> usize {
354 let len = self.total_deferred(None);
355 self.garbage_collector.size() * self.capacity - len
356 + self.garbage_collector.map_sum(|x| x.gc_size())
357 }
358
359 pub fn capacity(&self) -> usize { self.capacity }
360
361 #[cfg(test)]
362 fn insert_transaction_for_test(
363 &mut self, transaction: Arc<SignedTransaction>, sender_nonce: U256,
364 ) -> InsertResult {
365 let sender = transaction.sender();
366 let res = self.insert_transaction_without_readiness_check(
367 transaction,
368 false,
369 true,
370 (sender_nonce, U256::from(u64::MAX)),
371 (0.into(), 0),
372 );
373 self.recalculate_readiness(&sender, sender_nonce, U256::from(u64::MAX));
374 res
375 }
376
377 fn insert_transaction_without_readiness_check(
379 &mut self, transaction: Arc<SignedTransaction>, packed: bool,
380 force: bool, state_nonce_and_balance: (U256, U256),
381 (sponsored_gas, sponsored_storage): (U256, u64),
382 ) -> InsertResult {
383 let _timer = MeterTimer::time_func(
384 TX_POOL_INNER_WITHOUTCHECK_INSERT_TIMER.as_ref(),
385 );
386 if !self.deferred_pool.check_sender_and_nonce_exists(
387 &transaction.sender(),
388 &transaction.nonce(),
389 ) {
390 self.collect_garbage(transaction.as_ref());
391 if self.is_full(transaction.space()) {
392 return InsertResult::Failed(TransactionPoolError::TxPoolFull);
393 }
394 }
395 let result = {
396 let _timer =
397 MeterTimer::time_func(DEFERRED_POOL_INNER_INSERT.as_ref());
398 self.deferred_pool.insert(
399 TxWithReadyInfo::new(
400 transaction.clone(),
401 packed,
402 sponsored_gas,
403 sponsored_storage,
404 ),
405 force,
406 )
407 };
408
409 let tx_space = transaction.space();
410 match &result {
411 InsertResult::NewAdded => {
412 let (state_nonce, state_balance) = state_nonce_and_balance;
413 self.update_nonce_and_balance(
414 &transaction.sender(),
415 state_nonce,
416 state_balance,
417 );
418 self.txs.insert(transaction.hash(), transaction.clone());
420 if !packed {
421 *self.unpacked_transaction_count.in_space_mut(tx_space) +=
422 1;
423 }
424 }
425 InsertResult::Failed(_) => {}
426 InsertResult::Updated(replaced_tx) => {
427 if !replaced_tx.is_already_packed() {
428 *self.unpacked_transaction_count.in_space_mut(tx_space) =
429 self.unpacked_transaction_count
430 .in_space(tx_space)
431 .checked_sub(1)
432 .unwrap_or_else(|| {
433 error!(
434 "unpacked_transaction_count under-flows."
435 );
436 0
437 });
438 }
439 self.txs.remove(&replaced_tx.hash());
440 self.txs.insert(transaction.hash(), transaction.clone());
441 if !packed {
442 *self.unpacked_transaction_count.in_space_mut(tx_space) +=
443 1;
444 }
445 }
446 }
447
448 result
449 }
450
451 #[allow(dead_code)]
452 fn mark_packed(&mut self, tx: &SignedTransaction, packed: bool) {
453 let changed =
454 self.deferred_pool
455 .mark_packed(tx.sender(), tx.nonce(), packed);
456 if changed {
457 let tx_space = tx.space();
458 if packed {
459 if *self.unpacked_transaction_count.in_space(tx_space) == 0 {
460 error!("unpacked_transaction_count under-flows.");
461 } else {
462 *self.unpacked_transaction_count.in_space_mut(tx_space) -=
463 1;
464 }
465 } else {
466 *self.unpacked_transaction_count.in_space_mut(tx_space) += 1;
467 }
468 }
469 }
470
471 pub fn get_account_pending_info(
472 &self, address: &AddressWithSpace,
473 ) -> Option<(U256, U256, U256, H256)> {
474 let (local_nonce, _local_balance) = self
475 .get_local_nonce_and_balance(address)
476 .unwrap_or((U256::from(0), U256::from(0)));
477 match self.deferred_pool.get_pending_info(address, &local_nonce) {
478 Some((pending_count, pending_tx)) => Some((
479 local_nonce,
480 U256::from(pending_count),
481 *pending_tx.nonce(),
482 pending_tx.hash(),
483 )),
484 None => {
485 Some((local_nonce, U256::from(0), U256::from(0), H256::zero()))
486 }
487 }
488 }
489
490 pub fn get_account_pending_transactions(
491 &self, address: &AddressWithSpace, maybe_start_nonce: Option<U256>,
492 maybe_limit: Option<usize>,
493 ) -> (
494 Vec<Arc<SignedTransaction>>,
495 Option<TransactionStatus>,
496 usize,
497 ) {
498 let (local_nonce, local_balance) = self
499 .get_local_nonce_and_balance(address)
500 .unwrap_or((U256::from(0), U256::from(0)));
501 let start_nonce = maybe_start_nonce.unwrap_or(local_nonce);
502 let (pending_txs, pending_reason) =
503 self.deferred_pool.get_pending_transactions(
504 address,
505 &start_nonce,
506 &local_nonce,
507 &local_balance,
508 );
509 if pending_txs.is_empty() {
510 return (Vec::new(), None, 0);
511 }
512 let first_tx_status = match pending_reason {
513 None => TransactionStatus::Ready,
514 Some(reason) => TransactionStatus::Pending(reason),
515 };
516 let pending_count = pending_txs.len();
517 let limit = maybe_limit.unwrap_or(usize::MAX);
518 (
519 pending_txs
520 .into_iter()
521 .map(|x| x.transaction.clone())
522 .take(limit)
523 .collect(),
524 Some(first_tx_status),
525 pending_count,
526 )
527 }
528
529 pub fn get_local_nonce_and_balance(
530 &self, address: &AddressWithSpace,
531 ) -> Option<(U256, U256)> {
532 self.ready_nonces_and_balances.get(address).map(|x| *x)
533 }
534
535 fn update_nonce_and_balance(
536 &mut self, address: &AddressWithSpace, nonce: U256, balance: U256,
537 ) {
538 if !self.deferred_pool.contain_address(address) {
539 return;
540 }
541 self.ready_nonces_and_balances
542 .insert((*address).clone(), (nonce, balance));
543 }
544
545 fn get_and_update_nonce_and_balance_from_storage(
546 &mut self, address: &AddressWithSpace, state: &StateProvider,
547 ) -> StateDbResult<(U256, U256)> {
548 let nonce_and_balance = state.get_nonce_and_balance(address)?;
549 if !self.deferred_pool.contain_address(address) {
550 return Ok(nonce_and_balance);
551 }
552 self.ready_nonces_and_balances
553 .insert((*address).clone(), nonce_and_balance);
554
555 Ok(nonce_and_balance)
556 }
557
558 pub fn get_lowest_nonce(&self, addr: &AddressWithSpace) -> U256 {
559 let mut ret = 0.into();
560 if let Some((nonce, _)) = self.get_local_nonce_and_balance(addr) {
561 ret = nonce;
562 }
563 if let Some(nonce) = self.deferred_pool.get_lowest_nonce(addr) {
564 if *nonce < ret {
565 ret = *nonce;
566 }
567 }
568 ret
569 }
570
571 pub fn get_next_nonce(
572 &self, address: &AddressWithSpace, state_nonce: U256,
573 ) -> U256 {
574 self.deferred_pool
575 .last_succ_nonce(*address, state_nonce)
576 .unwrap_or(state_nonce)
577 }
578
579 #[allow(dead_code)]
580 fn recalculate_readiness_with_local_info(
581 &mut self, addr: &AddressWithSpace,
582 ) {
583 let (nonce, balance) = self
584 .get_local_nonce_and_balance(addr)
585 .unwrap_or((0.into(), 0.into()));
586 self.recalculate_readiness(addr, nonce, balance);
587 }
588
589 fn recalculate_readiness_with_fixed_info(
590 &mut self, addr: &AddressWithSpace, nonce: U256, balance: U256,
591 ) {
592 self.update_nonce_and_balance(addr, nonce, balance);
593 self.recalculate_readiness(addr, nonce, balance);
594 }
595
596 fn recalculate_readiness_with_state(
597 &mut self, addr: &AddressWithSpace, state: &StateProvider,
598 ) -> StateDbResult<()> {
599 let _timer = MeterTimer::time_func(TX_POOL_RECALCULATE.as_ref());
600 let (nonce, balance) =
601 self.get_and_update_nonce_and_balance_from_storage(addr, state)?;
602 self.recalculate_readiness(addr, nonce, balance);
603 trace!(
604 "txpool::recalculate_readiness_with_state addr={:?} nonce={:?} balance={:?}",
605 addr,
606 nonce,
607 balance
608 );
609 Ok(())
610 }
611
612 fn recalculate_readiness(
613 &mut self, addr: &AddressWithSpace, nonce: U256, balance: U256,
614 ) {
615 let space = addr.space;
616 let ret = self
617 .deferred_pool
618 .recalculate_readiness_with_local_info(addr, nonce, balance);
619 match &ret {
620 Some(tx) => trace!(
621 "txpool::recalculate_readiness addr={:?} state_nonce={:?} ready_nonce={:?} ready_hash={:?} balance={:?}",
622 addr,
623 nonce,
624 tx.nonce(),
625 tx.hash(),
626 balance
627 ),
628 None => trace!(
629 "txpool::recalculate_readiness addr={:?} state_nonce={:?} no_ready_tx balance={:?}",
630 addr,
631 nonce,
632 balance
633 ),
634 }
635 if let Some(tx) = self.deferred_pool.get_lowest_nonce_tx(addr) {
638 let count = self.deferred_pool.count_less(addr, &nonce);
639 let timestamp = self
640 .garbage_collector
641 .in_space(space)
642 .get_timestamp(addr)
643 .unwrap_or(self.get_current_timestamp());
644 self.garbage_collector.in_space_mut(space).insert(
645 addr,
646 count,
647 timestamp,
648 ret.is_some(),
649 *tx.gas_price(),
650 );
651 } else {
652 debug!(
657 "recalculate_readiness called for missing account: addr={:?}",
658 addr
659 );
660 }
661 }
662
663 pub fn check_tx_packed_in_deferred_pool(&self, tx_hash: &H256) -> bool {
664 match self.txs.get(tx_hash) {
665 Some(tx) => {
666 self.deferred_pool.check_tx_packed(tx.sender(), *tx.nonce())
667 }
668 None => false,
669 }
670 }
671
672 pub fn pack_transactions<'a>(
674 &mut self, num_txs: usize, block_gas_limit: U256, evm_gas_limit: U256,
675 block_size_limit: usize, best_epoch_height: u64,
676 best_block_number: u64, verification_config: &VerificationConfig,
677 machine: &Machine,
678 ) -> Vec<Arc<SignedTransaction>> {
679 let mut packed_transactions: Vec<Arc<SignedTransaction>> = Vec::new();
680 if num_txs == 0 {
681 return packed_transactions;
682 }
683 debug!(
684 "txpool::pack_transactions start best_epoch={} best_block={} block_gas_limit={} evm_gas_limit={} block_size_limit={} limit={}",
685 best_epoch_height,
686 best_block_number,
687 block_gas_limit,
688 evm_gas_limit,
689 block_size_limit,
690 num_txs
691 );
692
693 let spec = machine.spec(best_block_number, best_epoch_height);
694 let transitions = &machine.params().transition_heights;
695
696 let validity = |tx: &SignedTransaction| {
697 verification_config.fast_recheck(
698 tx,
699 best_epoch_height,
700 transitions,
701 &spec,
702 )
703 };
704
705 let (sampled_tx, used_gas, used_size) =
706 self.deferred_pool.packing_sampler(
707 Space::Ethereum,
708 std::cmp::min(block_gas_limit, evm_gas_limit),
709 block_size_limit,
710 num_txs,
711 U256::zero(),
712 validity,
713 );
714 debug!(
715 "txpool::pack_transactions espace selected={} gas_used={} size_used={}",
716 sampled_tx.len(),
717 used_gas,
718 used_size
719 );
720 packed_transactions.extend_from_slice(&sampled_tx);
721
722 let (sampled_tx, _, _) = self.deferred_pool.packing_sampler(
723 Space::Native,
724 block_gas_limit - used_gas,
725 block_size_limit - used_size,
726 num_txs - sampled_tx.len(),
727 U256::zero(),
728 validity,
729 );
730 debug!(
731 "txpool::pack_transactions native selected={} total={}",
732 sampled_tx.len(),
733 packed_transactions.len() + sampled_tx.len()
734 );
735 packed_transactions.extend_from_slice(&sampled_tx);
736
737 if log::max_level() >= log::Level::Debug {
738 let mut rlp_s = RlpStream::new();
739 for tx in &packed_transactions {
740 rlp_s.append::<TransactionWithSignature>(&**tx);
741 }
742 debug!(
743 "After packing packed_transactions: {}, rlp size: {}",
744 packed_transactions.len(),
745 rlp_s.out().len(),
746 );
747 }
748
749 packed_transactions
750 }
751
752 pub fn pack_transactions_1559<'a>(
753 &mut self, num_txs: usize, block_gas_limit: U256,
754 parent_base_price: SpaceMap<U256>, block_size_limit: usize,
755 best_epoch_height: u64, machine: &Machine,
756 validity: impl Fn(&SignedTransaction) -> PackingCheckResult,
757 ) -> (Vec<Arc<SignedTransaction>>, SpaceMap<U256>) {
758 let mut packed_transactions: Vec<Arc<SignedTransaction>> = Vec::new();
759 if num_txs == 0 {
760 return (packed_transactions, parent_base_price);
761 }
762
763 debug!(
764 "Packing transaction for 1559, parent base price {:?}",
765 parent_base_price
766 );
767
768 let mut block_base_price = parent_base_price.clone();
769
770 let can_pack_evm =
771 machine.params().can_pack_evm_transaction(best_epoch_height);
772
773 let (evm_packed_tx_num, evm_used_size) = if can_pack_evm {
774 let gas_target = block_gas_limit * 5 / 10 / ELASTICITY_MULTIPLIER;
775 let parent_base_price = parent_base_price[Space::Ethereum];
776 let min_base_price =
777 machine.params().min_base_price()[Space::Ethereum];
778
779 let (packing_gas_limit, tx_min_price) =
780 self.deferred_pool.estimate_packing_gas_limit(
781 Space::Ethereum,
782 gas_target,
783 parent_base_price,
784 min_base_price,
785 );
786 debug!(
787 "Packing plan (espace): gas limit: {:?}, tx min price: {:?}",
788 packing_gas_limit, tx_min_price
789 );
790 let (sampled_tx, used_gas, used_size) =
791 self.deferred_pool.packing_sampler(
792 Space::Ethereum,
793 packing_gas_limit,
794 block_size_limit,
795 num_txs,
796 tx_min_price,
797 &validity,
798 );
799
800 let base_price = compute_next_price(
803 gas_target,
804 used_gas,
805 parent_base_price,
806 min_base_price,
807 );
808
809 if base_price <= tx_min_price {
810 debug!(
811 "Packing result (espace): gas used: {:?}, base price: {:?}",
812 used_gas, base_price
813 );
814 block_base_price[Space::Ethereum] = base_price;
815 packed_transactions.extend_from_slice(&sampled_tx);
816
817 (sampled_tx.len(), used_size)
818 } else {
819 warn!(
821 "Inconsistent packing result (espace): gas used: {:?}, base price: {:?}",
822 used_gas, base_price
823 );
824 block_base_price[Space::Ethereum] = compute_next_price(
825 gas_target,
826 U256::zero(),
827 parent_base_price,
828 min_base_price,
829 );
830 (0, 0)
831 }
832 } else {
833 (0, 0)
834 };
835
836 {
837 let gas_target =
838 cspace_block_gas_limit_after_cip1559(block_gas_limit)
839 / ELASTICITY_MULTIPLIER;
840 let parent_base_price = parent_base_price[Space::Native];
841 let min_base_price =
842 machine.params().min_base_price()[Space::Native];
843
844 let (packing_gas_limit, tx_min_price) =
845 self.deferred_pool.estimate_packing_gas_limit(
846 Space::Native,
847 gas_target,
848 parent_base_price,
849 min_base_price,
850 );
851
852 debug!(
853 "Packing plan (core space): gas limit: {:?}, tx min price: {:?}",
854 packing_gas_limit, tx_min_price
855 );
856
857 let (sampled_tx, used_gas, _) = self.deferred_pool.packing_sampler(
858 Space::Native,
859 packing_gas_limit,
860 block_size_limit - evm_used_size,
861 num_txs - evm_packed_tx_num,
862 tx_min_price,
863 &validity,
864 );
865
866 let base_price = compute_next_price(
869 gas_target,
870 used_gas,
871 parent_base_price,
872 min_base_price,
873 );
874
875 if base_price <= tx_min_price {
876 debug!(
877 "Packing result (core space): gas used: {:?}, base price: {:?}",
878 used_gas, base_price
879 );
880 block_base_price[Space::Native] = base_price;
881 packed_transactions.extend_from_slice(&sampled_tx);
882 } else {
883 warn!(
885 "Inconsistent packing result (core space): gas used: {:?}, base price: {:?}",
886 used_gas, base_price
887 );
888 block_base_price[Space::Native] = compute_next_price(
889 gas_target,
890 U256::zero(),
891 parent_base_price,
892 min_base_price,
893 );
894 }
895 }
896
897 if log::max_level() >= log::Level::Debug {
898 let mut rlp_s = RlpStream::new();
899 for tx in &packed_transactions {
900 rlp_s.append::<TransactionWithSignature>(&**tx);
901 }
902 debug!(
903 "After packing packed_transactions: {}, rlp size: {}",
904 packed_transactions.len(),
905 rlp_s.out().len(),
906 );
907 }
908
909 (packed_transactions, block_base_price)
910 }
911
912 pub fn notify_modified_accounts(
913 &mut self, accounts_from_execution: Vec<Account>,
914 ) {
915 for account in &accounts_from_execution {
916 self.recalculate_readiness_with_fixed_info(
917 account.address(),
918 account.nonce,
919 account.balance,
920 );
921 }
922 }
923
924 pub fn content(
926 &self, address: Option<AddressWithSpace>,
927 ) -> (Vec<Arc<SignedTransaction>>, Vec<Arc<SignedTransaction>>) {
928 let ready_txs = match address {
929 Some(addr) => self
930 .deferred_pool
931 .ready_transactions_by_address(addr)
932 .map_or(vec![], |x| x.to_vec()),
933 None => self
934 .deferred_pool
935 .all_ready_transactions()
936 .cloned()
937 .collect(),
938 };
939
940 let deferred_txs = self
941 .txs
942 .values()
943 .filter(|tx| address == None || tx.sender() == address.unwrap())
944 .map(|v| v.clone())
945 .collect();
946
947 (ready_txs, deferred_txs)
948 }
949
950 pub fn eth_content(
951 &self, space: Option<Space>,
952 ) -> (
953 BTreeMap<AddressWithSpace, BTreeMap<U256, Arc<SignedTransaction>>>,
954 BTreeMap<AddressWithSpace, BTreeMap<U256, Arc<SignedTransaction>>>,
955 ) {
956 let get_local_nonce_and_balance = |addr: &AddressWithSpace| {
957 self.ready_nonces_and_balances
958 .get(addr)
959 .map(|x| *x)
960 .unwrap_or_default()
961 };
962 self.deferred_pool
963 .eth_content(space, get_local_nonce_and_balance)
964 }
965
966 pub fn eth_content_from(
967 &self, from: AddressWithSpace,
968 ) -> (
969 BTreeMap<U256, Arc<SignedTransaction>>,
970 BTreeMap<U256, Arc<SignedTransaction>>,
971 ) {
972 let (local_nonce, local_balance) =
973 self.get_local_nonce_and_balance(&from).unwrap_or_default();
974 self.deferred_pool
975 .eth_content_from(from, local_nonce, local_balance)
976 }
977
978 pub fn insert_transaction_with_readiness_check(
982 &mut self, state: &StateProvider, transaction: Arc<SignedTransaction>,
983 packed: bool, force: bool,
984 ) -> Result<(), TransactionPoolError> {
985 let _timer = MeterTimer::time_func(TX_POOL_INNER_INSERT_TIMER.as_ref());
986 let (sponsored_gas, sponsored_storage) =
987 self.get_sponsored_gas_and_storage(state, &transaction)?;
988
989 let (state_nonce, state_balance) =
990 state.get_nonce_and_balance(&transaction.sender())?;
991
992 if transaction.hash[0] & 254 == 0 {
993 trace!(
994 "Transaction {:?} sender: {:?} current nonce: {:?}, state nonce:{:?}",
995 transaction.hash, transaction.sender, transaction.nonce(), state_nonce
996 );
997 }
998 if *transaction.nonce()
999 >= state_nonce
1000 + U256::from(FURTHEST_FUTURE_TRANSACTION_NONCE_OFFSET)
1001 {
1002 trace!(
1003 "Transaction {:?} is discarded due to in too distant future",
1004 transaction.hash()
1005 );
1006 return Err(TransactionPoolError::NonceTooDistant {
1007 hash: transaction.hash(),
1008 nonce: *transaction.nonce(),
1009 });
1010 } else if !packed && *transaction.nonce() < state_nonce
1012 {
1013 trace!(
1014 "Transaction {:?} is discarded due to a too stale nonce, self.nonce()={}, state_nonce={}",
1015 transaction.hash(), transaction.nonce(), state_nonce,
1016 );
1017 return Err(TransactionPoolError::NonceTooStale {
1018 hash: transaction.hash(),
1019 nonce: *transaction.nonce(),
1020 });
1021 }
1022
1023 if !packed && !force {
1025 let mut need_balance = U256::from(0);
1026 let estimate_gas_fee = Self::cal_gas_fee(
1027 transaction.gas().clone(),
1028 transaction.gas_price().clone(),
1029 );
1030 match transaction.unsigned {
1031 Transaction::Native(ref utx) => {
1032 need_balance += utx.value().clone();
1033 if sponsored_gas == U256::from(0) {
1034 need_balance += estimate_gas_fee;
1035 }
1036 if sponsored_storage == 0 {
1037 need_balance += U256::from(*utx.storage_limit())
1038 * *DRIPS_PER_STORAGE_COLLATERAL_UNIT;
1039 }
1040 }
1041 Transaction::Ethereum(ref utx) => {
1042 need_balance += utx.value().clone();
1043 need_balance += estimate_gas_fee;
1044 }
1045 }
1046
1047 if need_balance > state_balance {
1048 let msg = format!(
1049 "Transaction {:?} is discarded due to out of balance, needs {:?} but account balance is {:?}",
1050 transaction.hash(),
1051 need_balance,
1052 state_balance
1053 );
1054 trace!("{}", msg);
1055 return Err(TransactionPoolError::OutOfBalance {
1056 need: need_balance,
1057 have: state_balance,
1058 hash: transaction.hash(),
1059 });
1060 }
1061 }
1062
1063 let result = self.insert_transaction_without_readiness_check(
1064 transaction.clone(),
1065 packed,
1066 force,
1067 (state_nonce, state_balance),
1068 (sponsored_gas, sponsored_storage),
1069 );
1070 if let InsertResult::Failed(err) = result {
1071 return Err(err);
1072 }
1073
1074 self.recalculate_readiness_with_state(&transaction.sender(), state)?;
1075
1076 Ok(())
1077 }
1078
1079 fn cal_gas_fee(gas: U256, gas_price: U256) -> U256 {
1080 let estimated_gas_u512 = gas.full_mul(gas_price);
1081 let estimated_gas =
1083 if estimated_gas_u512 > U512::from(U128::max_value()) {
1084 U256::from(U128::max_value())
1085 } else {
1086 gas * gas_price
1087 };
1088 estimated_gas
1089 }
1090
1091 pub fn get_sponsored_gas_and_storage(
1092 &self, state: &StateProvider, transaction: &SignedTransaction,
1093 ) -> StateDbResult<(U256, u64)> {
1094 let sender = transaction.sender();
1095
1096 let utx = if let Transaction::Native(ref utx) = transaction.unsigned {
1098 utx
1099 } else {
1100 return Ok(Default::default());
1101 };
1102
1103 let contract_address = match utx.action() {
1105 Action::Call(callee) if callee.is_contract_address() => callee,
1106 _ => {
1107 return Ok(Default::default());
1108 }
1109 };
1110
1111 let sponsor_info = if let Some(sponsor_info) =
1113 state.get_sponsor_info(&contract_address)?
1114 {
1115 sponsor_info
1116 } else {
1117 return Ok(Default::default());
1118 };
1119
1120 if !state
1122 .check_commission_privilege(&contract_address, &sender.address)?
1123 {
1124 return Ok(Default::default());
1125 }
1126
1127 let estimated_gas = Self::cal_gas_fee(
1129 transaction.gas().clone(),
1130 transaction.gas_price().clone(),
1131 );
1132 let sponsored_gas = if estimated_gas <= sponsor_info.sponsor_gas_bound
1133 && estimated_gas <= sponsor_info.sponsor_balance_for_gas
1134 {
1135 utx.gas().clone()
1136 } else {
1137 0.into()
1138 };
1139
1140 let estimated_collateral = U256::from(*utx.storage_limit())
1141 * *DRIPS_PER_STORAGE_COLLATERAL_UNIT;
1142 let sponsored_collateral = if estimated_collateral
1143 <= sponsor_info.sponsor_balance_for_collateral
1144 + sponsor_info.unused_storage_points()
1145 {
1146 *utx.storage_limit()
1147 } else {
1148 0
1149 };
1150
1151 Ok((sponsored_gas, sponsored_collateral))
1152 }
1153}
1154
1155#[cfg(test)]
1156mod tests {
1157 use crate::verification::PackingCheckResult;
1158
1159 use super::TransactionPoolInner;
1160 use crate::keylib::{Generator, KeyPair, Random};
1161 use cfx_executor::{
1162 machine::{Machine, VmFactory},
1163 spec::CommonParams,
1164 };
1165 use cfx_parameters::block::{
1166 cspace_block_gas_limit_after_cip1559, espace_block_gas_limit,
1167 };
1168 use cfx_types::{Address, Space, SpaceMap, U256};
1169 use itertools::Itertools;
1170 use primitives::{
1171 block_header::compute_next_price_tuple,
1172 transaction::{
1173 native_transaction::NativeTransaction, Eip155Transaction,
1174 },
1175 Action, SignedTransaction, Transaction,
1176 };
1177 use std::sync::Arc;
1178
1179 fn new_test_tx(
1180 sender: &KeyPair, nonce: usize, gas_price: usize, gas: usize,
1181 value: usize, space: Space,
1182 ) -> Arc<SignedTransaction> {
1183 let tx: Transaction = match space {
1184 Space::Native => NativeTransaction {
1185 nonce: U256::from(nonce),
1186 gas_price: U256::from(gas_price),
1187 gas: U256::from(gas),
1188 action: Action::Call(Address::random()),
1189 value: U256::from(value),
1190 storage_limit: 0,
1191 epoch_height: 0,
1192 chain_id: 1,
1193 data: Vec::new(),
1194 }
1195 .into(),
1196 Space::Ethereum => Eip155Transaction {
1197 nonce: U256::from(nonce),
1198 gas_price: U256::from(gas_price),
1199 gas: U256::from(gas),
1200 action: Action::Call(Address::random()),
1201 value: U256::from(value),
1202 chain_id: Some(1),
1203 data: Vec::new(),
1204 }
1205 .into(),
1206 };
1207 Arc::new(tx.sign(sender.secret()))
1208 }
1209
1210 fn pack_transactions_1559_checked(
1211 pool: &mut TransactionPoolInner, machine: &Machine,
1212 ) {
1213 let parent_base_price = SpaceMap::new(100, 200).map_all(U256::from);
1214 let block_gas_limit = U256::from(6000);
1215 let best_epoch_height = 20;
1216
1217 let (txs, base_price) = pool.pack_transactions_1559(
1218 usize::MAX,
1219 block_gas_limit,
1220 parent_base_price,
1221 usize::MAX,
1222 best_epoch_height,
1223 machine,
1224 |_| PackingCheckResult::Pack,
1225 );
1226
1227 let params = machine.params();
1228
1229 let core_gas_limit =
1230 cspace_block_gas_limit_after_cip1559(block_gas_limit);
1231 let eth_gas_limit = espace_block_gas_limit(
1232 params.can_pack_evm_transaction(best_epoch_height),
1233 block_gas_limit,
1234 );
1235
1236 let gas_target =
1237 SpaceMap::new(core_gas_limit, eth_gas_limit).map_all(|x| x / 2);
1238
1239 let mut gas_used = SpaceMap::default();
1240 let mut min_gas_price =
1241 SpaceMap::new(U256::max_value(), U256::max_value());
1242
1243 for tx in txs {
1244 gas_used[tx.space()] += *tx.gas_limit();
1245 min_gas_price[tx.space()] =
1246 min_gas_price[tx.space()].min(*tx.gas_price());
1247 }
1248
1249 let min_base_price = params.min_base_price();
1250
1251 let expected_base_price = SpaceMap::zip4(
1252 gas_target,
1253 gas_used,
1254 parent_base_price,
1255 min_base_price,
1256 )
1257 .map_all(compute_next_price_tuple);
1258
1259 assert_eq!(expected_base_price, base_price);
1260 assert!(gas_used[Space::Native] <= core_gas_limit);
1261 assert!(gas_used[Space::Ethereum] <= eth_gas_limit);
1262
1263 for space in [Space::Native, Space::Ethereum] {
1264 assert!(base_price[space] <= min_gas_price[space]);
1265 }
1266 }
1267
1268 #[test]
1269 fn test_pack_eip1559_transactions() {
1270 let mut pool = TransactionPoolInner::new_for_test();
1271
1272 let mut params = CommonParams::default();
1273 params.min_base_price = SpaceMap::new(100, 200).map_all(U256::from);
1274
1275 let machine = Arc::new(Machine::new(params, VmFactory::default()));
1276
1277 let test_block_limit = SpaceMap::new(5400, 3000);
1278
1279 let senders: Vec<_> = (0..20)
1280 .into_iter()
1281 .map(|_| Random.generate().unwrap())
1282 .collect();
1283
1284 let tasks = [1, 2, 3]
1285 .into_iter()
1286 .cartesian_product(
1287 [50usize, 95, 100, 105, 150, 1000],
1288 )
1289 .cartesian_product(
1290 [5usize, 10, 40, 60, 100],
1291 )
1292 .cartesian_product([0usize, 1]);
1293
1294 for (((space_bits, gas_price), gas_limit_percent), price_inc) in tasks {
1295 let tx_gas_limit =
1296 test_block_limit.map_all(|x| x * gas_limit_percent / 100);
1297
1298 for (idx, sender) in senders.iter().enumerate() {
1299 let gas_price = gas_price + idx * price_inc;
1300
1301 if space_bits & 0x1 != 0 {
1302 let tx = new_test_tx(
1303 sender,
1304 0,
1305 gas_price,
1306 tx_gas_limit[Space::Native],
1307 0,
1308 Space::Native,
1309 );
1310 pool.insert_transaction_for_test(tx, U256::zero());
1311 }
1312
1313 if space_bits & 0x2 != 0 {
1314 let tx = new_test_tx(
1315 sender,
1316 0,
1317 gas_price * 2,
1318 tx_gas_limit[Space::Ethereum],
1319 0,
1320 Space::Ethereum,
1321 );
1322 pool.insert_transaction_for_test(tx, U256::zero());
1323 }
1324 }
1325 pack_transactions_1559_checked(&mut pool, &machine);
1326 pool.clear();
1327 }
1328 }
1329}