1use std::convert::Infallible;
2
3use crate::{
4 packing_batch::{InsertError, PackInfo, PackingBatch, RemoveError},
5 sample::{SampleTag, TxSampler},
6 weight::PackingPoolWeight,
7 PackingPoolConfig,
8};
9
10use super::{
11 transaction::PackingPoolTransaction, treapmap_config::PackingPoolMap,
12};
13use cfx_types::U256;
14use log::trace;
15use malloc_size_of::MallocSizeOf;
16use primitives::block_header::{compute_next_price, estimate_max_possible_gas};
17use rand::RngCore;
18use treap_map::{
19 ApplyOpOutcome, ConsoliableWeight, Node, SearchDirection, SearchResult,
20 TreapMap,
21};
22
23pub struct PackingPool<TX: PackingPoolTransaction> {
26 treap_map: TreapMap<PackingPoolMap<TX>>,
27 config: PackingPoolConfig,
28}
29
30impl<TX: PackingPoolTransaction> PackingPool<TX> {
31 pub fn new(config: PackingPoolConfig) -> Self {
32 Self {
33 treap_map: TreapMap::new(),
34 config,
35 }
36 }
37
38 pub fn config(&self) -> &PackingPoolConfig { &self.config }
39
40 pub fn len(&self) -> usize { self.treap_map.len() }
41
42 pub fn iter(&self) -> impl Iterator<Item = &[TX]> + '_ {
43 self.treap_map.values().map(|x| &x.txs[..])
44 }
45
46 pub fn contains(&self, addr: &TX::Sender) -> bool {
47 self.treap_map.contains_key(addr)
48 }
49
50 pub fn get_transactions(&self, addr: &TX::Sender) -> Option<&[TX]> {
51 Some(&self.treap_map.get(addr)?.txs)
52 }
53
54 pub fn clear(&mut self) { self.treap_map = TreapMap::new(); }
55
56 #[inline]
57 pub fn insert(&mut self, tx: TX) -> (Vec<TX>, Result<(), InsertError>) {
58 let config = &self.config;
59 let tx_hash = tx.hash();
60 let tx_clone = tx.clone();
61 let sender = tx.sender();
62
63 let update = move |node: &mut Node<PackingPoolMap<TX>>| -> Result<_, Infallible> {
64 let old_info = node.value.pack_info();
65 let out = node.value.insert(tx, config);
66 let new_info = node.value.pack_info();
67
68 Ok(make_apply_outcome(old_info, new_info, node, config, out))
69 };
70
71 let insert = move |rng: &mut dyn RngCore| {
72 let node = PackingBatch::new(tx_clone).make_node(config, rng);
73 Ok((node, (vec![], Ok(()))))
74 };
75
76 let (replaced, outcome) =
77 self.treap_map.update(&sender, update, insert).unwrap();
78 match &outcome {
79 Ok(()) => {
80 trace!("packing_pool::insert success hash={:?}", tx_hash);
81 }
82 Err(e) => {
83 trace!(
84 "packing_pool::insert failed hash={:?} err={:?}",
85 tx_hash,
86 e
87 );
88 }
89 }
90 for tx in &replaced {
91 trace!("packing_pool::insert evicted hash={:?}", tx.hash());
92 }
93 (replaced, outcome)
94 }
95
96 pub fn replace(&mut self, mut packing_batch: PackingBatch<TX>) -> Vec<TX> {
97 let config = &self.config;
98 let sender = packing_batch.sender();
99 let packing_batch_clone = packing_batch.clone();
100 for tx in &packing_batch_clone.txs {
101 trace!("packing_pool::replace incoming hash={:?}", tx.hash());
102 }
103
104 let update = move |node: &mut Node<PackingPoolMap<TX>>| -> Result<_, Infallible> {
105 let old_info = node.value.pack_info();
106 std::mem::swap(&mut packing_batch, &mut node.value);
107 let new_info = node.value.pack_info();
108 let out = std::mem::take(&mut packing_batch.txs);
109
110 Ok(make_apply_outcome(old_info, new_info, node, config, out))
111 };
112
113 let insert = move |rng: &mut dyn RngCore| {
114 let node = packing_batch_clone.make_node(config, rng);
115 Ok((node, vec![]))
116 };
117
118 let evicted = self.treap_map.update(&sender, update, insert).unwrap();
119 for tx in &evicted {
120 trace!("packing_pool::replace evicted hash={:?}", tx.hash());
121 }
122 evicted
123 }
124
125 pub fn remove(&mut self, sender: TX::Sender) -> Vec<TX> {
126 trace!("packing_pool::remove sender={:?}", sender);
127 self.split_off_suffix(sender, &U256::zero())
128 }
129
130 pub fn split_off_suffix(
131 &mut self, sender: TX::Sender, start_nonce: &U256,
132 ) -> Vec<TX> {
133 self.split_off(sender, start_nonce, true)
134 }
135
136 pub fn split_off_prefix(
137 &mut self, sender: TX::Sender, start_nonce: &U256,
138 ) -> Vec<TX> {
139 self.split_off(sender, start_nonce, false)
140 }
141
142 fn split_off(
143 &mut self, sender: TX::Sender, start_nonce: &U256, keep_prefix: bool,
144 ) -> Vec<TX> {
145 let config = &self.config;
146 trace!(
147 "packing_pool::split_off sender={:?} start_nonce={} keep_prefix={}",
148 sender,
149 start_nonce,
150 keep_prefix
151 );
152 let update = move |node: &mut Node<PackingPoolMap<TX>>| {
153 let old_info = node.value.pack_info();
154
155 let out =
156 match node.value.split_off_by_nonce(start_nonce, keep_prefix) {
157 Ok(out) => out,
158 Err(RemoveError::ShouldDelete) => {
159 return Ok(node.value.make_outcome_on_delete());
160 }
161 };
162
163 let new_info = node.value.pack_info();
164
165 Ok(make_apply_outcome(old_info, new_info, node, config, out))
166 };
167 let removed = self
168 .treap_map
169 .update(&sender, update, |_| Err(()))
170 .unwrap_or(vec![]);
171 if removed.is_empty() {
172 trace!(
173 "packing_pool::split_off sender={:?} start_nonce={} keep_prefix={} nothing removed",
174 sender,
175 start_nonce,
176 keep_prefix
177 );
178 } else {
179 for tx in &removed {
180 trace!("packing_pool::split_off removed hash={:?}", tx.hash());
181 }
182 }
183 removed
184 }
185
186 pub fn tx_sampler<'a, 'b, R: RngCore>(
187 &'a self, rng: &'b mut R, block_gas_limit: U256,
188 ) -> impl Iterator<Item = (TX::Sender, &'a [TX], SampleTag)> + 'b
189 where 'a: 'b {
190 let global_loss_base =
191 if let Some(r) = self.truncate_loss_ratio(block_gas_limit) {
192 U256::MAX / r
194 } else {
195 U256::zero()
196 };
197 TxSampler::<'a, 'b, TX, R>::new(
198 self.treap_map.iter(),
199 global_loss_base,
200 rng,
201 )
202 }
203
204 pub fn truncate_loss_ratio(&self, block_gas_limit: U256) -> Option<U256> {
208 let ret = self.treap_map.search(|left_weight, node| {
209 if !can_sample(left_weight, block_gas_limit) {
210 return SearchDirection::Left;
211 }
212 let right_weight =
213 PackingPoolWeight::consolidate(left_weight, &node.weight);
214 if !can_sample(&right_weight, block_gas_limit) {
215 return SearchDirection::Stop;
216 } else {
217 return SearchDirection::Right(right_weight);
218 }
219 });
220 match ret {
221 Some(
222 SearchResult::Found { base_weight, .. }
223 | SearchResult::RightMost(base_weight),
224 ) if base_weight.gas_limit > block_gas_limit => Some(
225 base_weight.weighted_loss_ratio
226 / (base_weight.gas_limit - block_gas_limit),
227 ),
228 _ => None,
229 }
230 }
231
232 pub fn estimate_packing_gas_limit(
233 &self, gas_target: U256, parent_base_price: U256, min_base_price: U256,
234 ) -> U256 {
235 let ret = self.treap_map.search(|left_weight, node| {
236 let can_sample = |weight| {
237 can_sample_within_1559(
238 weight,
239 gas_target,
240 parent_base_price,
241 min_base_price,
242 )
243 };
244
245 if !can_sample(&left_weight) {
246 return SearchDirection::Left;
247 }
248 let right_weight =
249 PackingPoolWeight::consolidate(left_weight, &node.weight);
250 if !can_sample(&right_weight) {
251 return SearchDirection::Stop;
252 } else {
253 return SearchDirection::Right(right_weight);
254 }
255 });
256 match ret {
257 Some(
258 SearchResult::Found { base_weight, .. }
259 | SearchResult::RightMost(base_weight),
260 ) => {
261 let gas_limit = estimate_max_possible_gas(
262 gas_target,
263 base_weight.min_gas_price,
264 parent_base_price,
265 );
266 if cfg!(test) {
267 let next_price = compute_next_price(
269 gas_target,
270 gas_limit,
271 parent_base_price,
272 min_base_price,
273 );
274 assert!(base_weight.min_gas_price >= next_price);
275 }
276 gas_limit
277 }
278 _ => U256::zero(),
279 }
280 }
281
282 #[cfg(test)]
283 fn assert_consistency(&self) {
284 self.treap_map.assert_consistency();
285 for node in self.treap_map.iter() {
286 let weight = &node.weight;
287 let packing_batch = &node.value;
288 packing_batch.assert_constraints();
289 let loss_ratio =
290 self.config.loss_ratio(packing_batch.first_gas_price());
291 let gas_limit = packing_batch.total_gas_limit();
292 assert_eq!(gas_limit, weight.gas_limit);
293 assert_eq!(loss_ratio, weight.max_loss_ratio);
294 assert_eq!(loss_ratio * gas_limit, weight.weighted_loss_ratio);
295 }
296 }
297}
298
299fn make_apply_outcome<TX: PackingPoolTransaction, T>(
300 old_info: PackInfo, new_info: PackInfo,
301 node: &mut Node<PackingPoolMap<TX>>, config: &PackingPoolConfig, out: T,
302) -> ApplyOpOutcome<T> {
303 let change_gas_price = old_info.first_gas_price != new_info.first_gas_price;
304 let change_gas_limit = old_info.total_gas_limit != new_info.total_gas_limit;
305
306 let mut update_weight = false;
307 let mut update_key = false;
308
309 if change_gas_price {
310 let gas_price = new_info.first_gas_price;
311 node.sort_key = gas_price;
312 node.weight.max_loss_ratio = config.loss_ratio(gas_price);
313 node.weight.gas_limit = new_info.total_gas_limit;
314 node.weight.weighted_loss_ratio =
315 new_info.total_gas_limit * node.weight.max_loss_ratio;
316
317 update_key = true;
318 update_weight = true;
319 } else if change_gas_limit {
320 node.weight.gas_limit = new_info.total_gas_limit;
321 node.weight.weighted_loss_ratio =
322 new_info.total_gas_limit * node.weight.max_loss_ratio;
323
324 update_weight = true;
325 }
326
327 ApplyOpOutcome {
328 out,
329 update_key,
330 update_weight,
331 delete_item: false,
332 }
333}
334fn can_sample(weight: &PackingPoolWeight, gas_limit: U256) -> bool {
335 if weight.gas_limit <= gas_limit {
336 return true;
337 }
338 weight
339 .max_loss_ratio
340 .saturating_mul(weight.gas_limit - gas_limit)
341 < weight.weighted_loss_ratio
342}
343
344fn can_sample_within_1559(
345 weight: &PackingPoolWeight, gas_target: U256, parent_base_price: U256,
346 min_base_price: U256,
347) -> bool {
348 if weight.min_gas_price < min_base_price {
349 return false;
350 }
351
352 let max_target_gas_used = estimate_max_possible_gas(
353 gas_target,
354 weight.min_gas_price,
355 parent_base_price,
356 );
357
358 if max_target_gas_used.is_zero() {
359 return false;
360 }
361
362 if weight.gas_limit <= max_target_gas_used {
363 return true;
364 }
365
366 weight
367 .max_loss_ratio
368 .saturating_mul(weight.gas_limit - max_target_gas_used)
369 < weight.weighted_loss_ratio
370}
371
372impl<TX> MallocSizeOf for PackingPool<TX>
373where
374 TX: PackingPoolTransaction + MallocSizeOf,
375 TX::Sender: MallocSizeOf,
376{
377 fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
378 self.treap_map.size_of(ops) + self.config.size_of(ops)
379 }
380}
381
382#[cfg(test)]
383mod pool_tests {
384 use std::{collections::HashSet, sync::atomic::AtomicUsize};
385
386 use rand::SeedableRng;
387 use rand_xorshift::XorShiftRng;
388
389 use crate::{
390 mock_tx::MockTransaction, transaction::PackingPoolTransaction,
391 PackingBatch, PackingPool, PackingPoolConfig, SampleTag,
392 };
393
394 fn default_pool(
395 len: usize, accounts: usize,
396 ) -> PackingPool<MockTransaction> {
397 let config = PackingPoolConfig::new_for_test();
398 let mut pool = PackingPool::new(config);
399 for accound_id in 0u64..accounts as u64 {
400 let mut batch = PackingBatch::new(default_tx(accound_id, 2));
401 for i in 3..(len as u64 + 2) {
402 batch.insert(default_tx(accound_id, i), &config).1.unwrap();
403 }
404 pool.replace(batch);
405 }
406 pool.assert_consistency();
407 assert_eq!(pool.treap_map.len(), accounts);
408 pool
409 }
410
411 fn default_tx(sender: u64, i: u64) -> MockTransaction {
412 static ID: AtomicUsize = AtomicUsize::new(0);
413 MockTransaction {
414 sender,
415 nonce: i,
416 gas_price: i,
417 gas_limit: i,
418 id: ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
419 }
420 }
421
422 #[allow(dead_code)]
423 fn same_price_txs() -> PackingPool<MockTransaction> {
424 let config = PackingPoolConfig::new_for_test();
425 let mut pool = PackingPool::new(config);
426
427 static ID: AtomicUsize = AtomicUsize::new(0);
428 for i in 1000..2000 {
429 let (_, res) = pool.insert(MockTransaction {
430 sender: i,
431 nonce: 0,
432 gas_price: 20,
433 gas_limit: 1,
434 id: ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
435 });
436 res.unwrap();
437 }
438 pool
439 }
440
441 #[test]
442 fn test_split_in_middle() {
443 let mut pool = default_pool(5, 10);
444 let txs = pool.split_off_prefix(2, &4.into());
445 pool.assert_consistency();
446 assert_eq!(pool.treap_map.len(), 10);
447 assert_eq!(txs.len(), 2);
448 for (idx, tx) in txs.into_iter().enumerate() {
449 assert_eq!(tx.sender(), 2);
450 assert_eq!(tx.nonce(), (2 + idx).into());
451 }
452 assert_eq!(pool.iter().into_iter().flatten().count(), 48);
453 }
454
455 #[test]
456 fn test_split_all() {
457 let mut pool = default_pool(5, 10);
458 let txs = pool.split_off_suffix(2, &2.into());
459 pool.assert_consistency();
460 assert_eq!(pool.treap_map.len(), 9);
461 assert!(!pool.treap_map.contains_key(&2));
462 assert_eq!(txs.len(), 5);
463 for (idx, tx) in txs.into_iter().enumerate() {
464 assert_eq!(tx.sender(), 2);
465 assert_eq!(tx.nonce(), (2 + idx).into());
466 }
467
468 let txs = pool.remove(3);
469 pool.assert_consistency();
470 assert_eq!(pool.treap_map.len(), 8);
471 assert!(!pool.treap_map.contains_key(&3));
472 assert_eq!(txs.len(), 5);
473 for (idx, tx) in txs.into_iter().enumerate() {
474 assert_eq!(tx.sender(), 3);
475 assert_eq!(tx.nonce(), (2 + idx).into());
476 }
477 assert_eq!(pool.iter().into_iter().flatten().count(), 40);
478 }
479
480 #[test]
481 fn test_change_price() {
482 let mut pool = default_pool(5, 10);
483 let mut new_tx = default_tx(2, 2);
484 new_tx.gas_price = 10;
485 let (tx, res) = pool.insert(new_tx);
486 assert_eq!(tx.first().unwrap().nonce(), 2.into());
487 res.unwrap();
488 pool.assert_consistency();
489 assert_eq!(pool.treap_map.len(), 10);
490 let first = pool.treap_map.iter().next().unwrap();
491 assert_eq!(first.value.sender(), 2);
492 assert_eq!(pool.iter().into_iter().flatten().count(), 46);
493 }
494
495 #[test]
496 fn test_change_limit() {
497 let mut pool = default_pool(5, 10);
498 let mut new_tx = default_tx(2, 2);
499 new_tx.gas_limit = 10;
500 let (tx, res) = pool.insert(new_tx);
501 assert_eq!(tx.first().unwrap().nonce(), 2.into());
502 res.unwrap();
503 pool.assert_consistency();
504 assert_eq!(pool.treap_map.len(), 10);
505 assert_eq!(pool.iter().into_iter().flatten().count(), 50);
506 }
507
508 #[test]
509 fn test_insert_empty_sender() {
510 let mut pool = default_pool(5, 10);
511 let new_tx = default_tx(11, 2);
512 let (_tx, res) = pool.insert(new_tx);
513 res.unwrap();
514 pool.assert_consistency();
515 assert_eq!(pool.treap_map.len(), 11);
516 assert_eq!(pool.iter().into_iter().flatten().count(), 51);
517 }
518
519 #[test]
520 fn test_replace() {
521 let mut pool = default_pool(5, 10);
522 let mut batch = PackingBatch::new(default_tx(2, 12));
523 for i in 13..18 {
524 batch.insert(default_tx(2, i), &pool.config).1.unwrap();
525 }
526 let txs = pool.replace(batch);
527 for (idx, tx) in txs.into_iter().enumerate() {
528 assert_eq!(tx.sender(), 2);
529 assert_eq!(tx.nonce(), (2 + idx).into());
530 }
531 pool.assert_consistency();
532 assert_eq!(pool.treap_map.len(), 10);
533 assert_eq!(pool.iter().into_iter().flatten().count(), 51);
534 }
535
536 #[test]
537 fn test_same_price() {
538 let pool = default_pool(5, 100000);
539
540 let pack_txs = || {
541 let mut rng = XorShiftRng::from_os_rng();
542
543 let mut packed = HashSet::new();
544 for (_, txs, tag) in pool.tx_sampler(&mut rng, 40000.into()) {
545 if packed.len() < 8000 {
546 assert_eq!(tag, SampleTag::RandomPick);
549 }
550
551 for tx in txs {
552 packed.insert(tx.clone());
553 }
554 if packed.len() >= 10000 {
555 break;
556 }
557 }
558 packed
559 };
560
561 let base_pack = pack_txs();
562 let mut total_same_set = 0usize;
563
564 for _ in 0..5 {
565 total_same_set += pack_txs().intersection(&base_pack).count();
566 }
567 assert!(total_same_set < 2000);
570 }
571}
572
573#[cfg(test)]
574mod sample_tests {
575 use std::{cmp::Reverse, collections::HashSet, sync::atomic::AtomicUsize};
576
577 use crate::{
578 mock_tx::MockTransaction, sample::SampleTag,
579 transaction::PackingPoolTransaction, PackingPool, PackingPoolConfig,
580 };
581 use cfx_types::U256;
582 use rand::{distr::Uniform, Rng, SeedableRng};
583 use rand_xorshift::XorShiftRng;
584
585 #[derive(Default)]
586 struct MockPriceBook(Vec<MockTransaction>);
587 impl MockPriceBook {
588 fn truncate_loss_ratio(&self, block_limit: usize) -> Option<U256> {
589 let config = PackingPoolConfig::new_for_test();
590 let mut total_gas_limit = U256::zero();
591 let mut total_weighted_loss = U256::zero();
592 let mut last_ans = None;
593 for tx in self.0.iter() {
594 total_gas_limit += tx.gas_limit();
595 let loss_ratio = config.loss_ratio(tx.gas_price());
596 total_weighted_loss += loss_ratio * tx.gas_limit();
597 if let Some(quot) =
598 total_gas_limit.checked_sub(block_limit.into())
599 {
600 if quot * loss_ratio >= total_weighted_loss {
601 return Some(last_ans.unwrap());
602 }
603 if quot > U256::zero() {
604 last_ans = Some(total_weighted_loss / quot);
605 }
606 }
607 }
608 last_ans
609 }
610 }
611
612 fn default_tx(
613 sender: u64, gas_limit: u64, gas_price: u64,
614 ) -> MockTransaction {
615 static ID: AtomicUsize = AtomicUsize::new(0);
616 MockTransaction {
617 sender,
618 nonce: 0,
619 gas_price,
620 gas_limit,
621 id: ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
622 }
623 }
624
625 #[test]
626 fn test_truncate_price_and_sample() {
627 let mut rand = XorShiftRng::from_os_rng();
628 let mut pool = PackingPool::new(PackingPoolConfig::new_for_test());
629 let mut mock_pool = MockPriceBook::default();
630 for i in 0..1000 {
631 let mut gas_limit = 1.01f64.powf(2000.0 + i as f64) as u64;
632 gas_limit -=
633 gas_limit / rand.sample(Uniform::new(50, 200).unwrap());
634 let mut gas_price = 1.01f64.powf(3000.0 - i as f64) as u64;
635 gas_price -=
636 gas_price / rand.sample(Uniform::new(50, 200).unwrap());
637
638 let tx = default_tx(i, gas_limit, gas_price);
639
640 mock_pool.0.push(tx);
641 let (_, res) = pool.insert(tx);
642 res.unwrap();
643 }
644 mock_pool.0.sort_by_key(|x| Reverse(x.gas_price()));
645 pool.assert_consistency();
646 for i in 1900..3500 {
647 let mut total_gas_limit = 1.01f64.powf(i as f64) as u64;
648 total_gas_limit -=
649 total_gas_limit / rand.sample(Uniform::new(50, 200).unwrap());
650 assert_eq!(
651 pool.truncate_loss_ratio(total_gas_limit.into()),
652 mock_pool.truncate_loss_ratio(total_gas_limit as usize)
653 );
654
655 let truncate_loss_ratio =
656 pool.truncate_loss_ratio(total_gas_limit.into());
657 let mut last_loss_ratio = truncate_loss_ratio;
658 let mut packing_set = HashSet::new();
666 for (_, txs, tag) in
667 pool.tx_sampler(&mut rand, total_gas_limit.into())
668 {
669 let tx = txs.first().unwrap();
670 let loss_ratio = pool.config().loss_ratio(tx.gas_price());
671 if tag != SampleTag::PriceDesc {
672 assert!(loss_ratio < truncate_loss_ratio.unwrap());
673 } else if let Some(r) = last_loss_ratio {
674 assert!(loss_ratio >= r);
675 last_loss_ratio = Some(loss_ratio);
676 }
677 packing_set.insert(tx.clone());
678
679 }
684 assert_eq!(packing_set.len(), 1000);
685 }
687 }
688}