cfx_packing_pool/
pool.rs

1use std::convert::Infallible;
2
3use crate::{
4    packing_batch::{InsertError, PackInfo, PackingBatch, RemoveError},
5    sample::{SampleTag, TxSampler},
6    weight::PackingPoolWeight,
7    PackingPoolConfig,
8};
9
10use super::{
11    transaction::PackingPoolTransaction, treapmap_config::PackingPoolMap,
12};
13use cfx_types::U256;
14use log::trace;
15use malloc_size_of::MallocSizeOf;
16use primitives::block_header::{compute_next_price, estimate_max_possible_gas};
17use rand::RngCore;
18use treap_map::{
19    ApplyOpOutcome, ConsoliableWeight, Node, SearchDirection, SearchResult,
20    TreapMap,
21};
22
23/// A `PackingPool` implementing random packing algorithm and supporting packing
24/// a series of transactions with the same nonce.
25pub struct PackingPool<TX: PackingPoolTransaction> {
26    treap_map: TreapMap<PackingPoolMap<TX>>,
27    config: PackingPoolConfig,
28}
29
30impl<TX: PackingPoolTransaction> PackingPool<TX> {
31    pub fn new(config: PackingPoolConfig) -> Self {
32        Self {
33            treap_map: TreapMap::new(),
34            config,
35        }
36    }
37
38    pub fn config(&self) -> &PackingPoolConfig { &self.config }
39
40    pub fn len(&self) -> usize { self.treap_map.len() }
41
42    pub fn iter(&self) -> impl Iterator<Item = &[TX]> + '_ {
43        self.treap_map.values().map(|x| &x.txs[..])
44    }
45
46    pub fn contains(&self, addr: &TX::Sender) -> bool {
47        self.treap_map.contains_key(addr)
48    }
49
50    pub fn get_transactions(&self, addr: &TX::Sender) -> Option<&[TX]> {
51        Some(&self.treap_map.get(addr)?.txs)
52    }
53
54    pub fn clear(&mut self) { self.treap_map = TreapMap::new(); }
55
56    #[inline]
57    pub fn insert(&mut self, tx: TX) -> (Vec<TX>, Result<(), InsertError>) {
58        let config = &self.config;
59        let tx_hash = tx.hash();
60        let tx_clone = tx.clone();
61        let sender = tx.sender();
62
63        let update = move |node: &mut Node<PackingPoolMap<TX>>| -> Result<_, Infallible> {
64            let old_info = node.value.pack_info();
65            let out = node.value.insert(tx, config);
66            let new_info = node.value.pack_info();
67
68            Ok(make_apply_outcome(old_info, new_info, node, config, out))
69        };
70
71        let insert = move |rng: &mut dyn RngCore| {
72            let node = PackingBatch::new(tx_clone).make_node(config, rng);
73            Ok((node, (vec![], Ok(()))))
74        };
75
76        let (replaced, outcome) =
77            self.treap_map.update(&sender, update, insert).unwrap();
78        match &outcome {
79            Ok(()) => {
80                trace!("packing_pool::insert success hash={:?}", tx_hash);
81            }
82            Err(e) => {
83                trace!(
84                    "packing_pool::insert failed hash={:?} err={:?}",
85                    tx_hash,
86                    e
87                );
88            }
89        }
90        for tx in &replaced {
91            trace!("packing_pool::insert evicted hash={:?}", tx.hash());
92        }
93        (replaced, outcome)
94    }
95
96    pub fn replace(&mut self, mut packing_batch: PackingBatch<TX>) -> Vec<TX> {
97        let config = &self.config;
98        let sender = packing_batch.sender();
99        let packing_batch_clone = packing_batch.clone();
100        for tx in &packing_batch_clone.txs {
101            trace!("packing_pool::replace incoming hash={:?}", tx.hash());
102        }
103
104        let update = move |node: &mut Node<PackingPoolMap<TX>>| -> Result<_, Infallible> {
105            let old_info = node.value.pack_info();
106            std::mem::swap(&mut packing_batch, &mut node.value);
107            let new_info = node.value.pack_info();
108            let out = std::mem::take(&mut packing_batch.txs);
109
110            Ok(make_apply_outcome(old_info, new_info, node, config, out))
111        };
112
113        let insert = move |rng: &mut dyn RngCore| {
114            let node = packing_batch_clone.make_node(config, rng);
115            Ok((node, vec![]))
116        };
117
118        let evicted = self.treap_map.update(&sender, update, insert).unwrap();
119        for tx in &evicted {
120            trace!("packing_pool::replace evicted hash={:?}", tx.hash());
121        }
122        evicted
123    }
124
125    pub fn remove(&mut self, sender: TX::Sender) -> Vec<TX> {
126        trace!("packing_pool::remove sender={:?}", sender);
127        self.split_off_suffix(sender, &U256::zero())
128    }
129
130    pub fn split_off_suffix(
131        &mut self, sender: TX::Sender, start_nonce: &U256,
132    ) -> Vec<TX> {
133        self.split_off(sender, start_nonce, true)
134    }
135
136    pub fn split_off_prefix(
137        &mut self, sender: TX::Sender, start_nonce: &U256,
138    ) -> Vec<TX> {
139        self.split_off(sender, start_nonce, false)
140    }
141
142    fn split_off(
143        &mut self, sender: TX::Sender, start_nonce: &U256, keep_prefix: bool,
144    ) -> Vec<TX> {
145        let config = &self.config;
146        trace!(
147            "packing_pool::split_off sender={:?} start_nonce={} keep_prefix={}",
148            sender,
149            start_nonce,
150            keep_prefix
151        );
152        let update = move |node: &mut Node<PackingPoolMap<TX>>| {
153            let old_info = node.value.pack_info();
154
155            let out =
156                match node.value.split_off_by_nonce(start_nonce, keep_prefix) {
157                    Ok(out) => out,
158                    Err(RemoveError::ShouldDelete) => {
159                        return Ok(node.value.make_outcome_on_delete());
160                    }
161                };
162
163            let new_info = node.value.pack_info();
164
165            Ok(make_apply_outcome(old_info, new_info, node, config, out))
166        };
167        let removed = self
168            .treap_map
169            .update(&sender, update, |_| Err(()))
170            .unwrap_or(vec![]);
171        if removed.is_empty() {
172            trace!(
173                "packing_pool::split_off sender={:?} start_nonce={} keep_prefix={} nothing removed",
174                sender,
175                start_nonce,
176                keep_prefix
177            );
178        } else {
179            for tx in &removed {
180                trace!("packing_pool::split_off removed hash={:?}", tx.hash());
181            }
182        }
183        removed
184    }
185
186    pub fn tx_sampler<'a, 'b, R: RngCore>(
187        &'a self, rng: &'b mut R, block_gas_limit: U256,
188    ) -> impl Iterator<Item = (TX::Sender, &'a [TX], SampleTag)> + 'b
189    where 'a: 'b {
190        let global_loss_base =
191            if let Some(r) = self.truncate_loss_ratio(block_gas_limit) {
192                // It can never be zero.
193                U256::MAX / r
194            } else {
195                U256::zero()
196            };
197        TxSampler::<'a, 'b, TX, R>::new(
198            self.treap_map.iter(),
199            global_loss_base,
200            rng,
201        )
202    }
203
204    /// The maximum loss ratio that a gas_price is considered in random packing
205    /// algorithm. If the return value is `None`, all the transactions can
206    /// not fulfill the given `block_gas_limit`.
207    pub fn truncate_loss_ratio(&self, block_gas_limit: U256) -> Option<U256> {
208        let ret = self.treap_map.search(|left_weight, node| {
209            if !can_sample(left_weight, block_gas_limit) {
210                return SearchDirection::Left;
211            }
212            let right_weight =
213                PackingPoolWeight::consolidate(left_weight, &node.weight);
214            if !can_sample(&right_weight, block_gas_limit) {
215                return SearchDirection::Stop;
216            } else {
217                return SearchDirection::Right(right_weight);
218            }
219        });
220        match ret {
221            Some(
222                SearchResult::Found { base_weight, .. }
223                | SearchResult::RightMost(base_weight),
224            ) if base_weight.gas_limit > block_gas_limit => Some(
225                base_weight.weighted_loss_ratio
226                    / (base_weight.gas_limit - block_gas_limit),
227            ),
228            _ => None,
229        }
230    }
231
232    pub fn estimate_packing_gas_limit(
233        &self, gas_target: U256, parent_base_price: U256, min_base_price: U256,
234    ) -> U256 {
235        let ret = self.treap_map.search(|left_weight, node| {
236            let can_sample = |weight| {
237                can_sample_within_1559(
238                    weight,
239                    gas_target,
240                    parent_base_price,
241                    min_base_price,
242                )
243            };
244
245            if !can_sample(&left_weight) {
246                return SearchDirection::Left;
247            }
248            let right_weight =
249                PackingPoolWeight::consolidate(left_weight, &node.weight);
250            if !can_sample(&right_weight) {
251                return SearchDirection::Stop;
252            } else {
253                return SearchDirection::Right(right_weight);
254            }
255        });
256        match ret {
257            Some(
258                SearchResult::Found { base_weight, .. }
259                | SearchResult::RightMost(base_weight),
260            ) => {
261                let gas_limit = estimate_max_possible_gas(
262                    gas_target,
263                    base_weight.min_gas_price,
264                    parent_base_price,
265                );
266                if cfg!(test) {
267                    // Guarantee the searched result can be packed
268                    let next_price = compute_next_price(
269                        gas_target,
270                        gas_limit,
271                        parent_base_price,
272                        min_base_price,
273                    );
274                    assert!(base_weight.min_gas_price >= next_price);
275                }
276                gas_limit
277            }
278            _ => U256::zero(),
279        }
280    }
281
282    #[cfg(test)]
283    fn assert_consistency(&self) {
284        self.treap_map.assert_consistency();
285        for node in self.treap_map.iter() {
286            let weight = &node.weight;
287            let packing_batch = &node.value;
288            packing_batch.assert_constraints();
289            let loss_ratio =
290                self.config.loss_ratio(packing_batch.first_gas_price());
291            let gas_limit = packing_batch.total_gas_limit();
292            assert_eq!(gas_limit, weight.gas_limit);
293            assert_eq!(loss_ratio, weight.max_loss_ratio);
294            assert_eq!(loss_ratio * gas_limit, weight.weighted_loss_ratio);
295        }
296    }
297}
298
299fn make_apply_outcome<TX: PackingPoolTransaction, T>(
300    old_info: PackInfo, new_info: PackInfo,
301    node: &mut Node<PackingPoolMap<TX>>, config: &PackingPoolConfig, out: T,
302) -> ApplyOpOutcome<T> {
303    let change_gas_price = old_info.first_gas_price != new_info.first_gas_price;
304    let change_gas_limit = old_info.total_gas_limit != new_info.total_gas_limit;
305
306    let mut update_weight = false;
307    let mut update_key = false;
308
309    if change_gas_price {
310        let gas_price = new_info.first_gas_price;
311        node.sort_key = gas_price;
312        node.weight.max_loss_ratio = config.loss_ratio(gas_price);
313        node.weight.gas_limit = new_info.total_gas_limit;
314        node.weight.weighted_loss_ratio =
315            new_info.total_gas_limit * node.weight.max_loss_ratio;
316
317        update_key = true;
318        update_weight = true;
319    } else if change_gas_limit {
320        node.weight.gas_limit = new_info.total_gas_limit;
321        node.weight.weighted_loss_ratio =
322            new_info.total_gas_limit * node.weight.max_loss_ratio;
323
324        update_weight = true;
325    }
326
327    ApplyOpOutcome {
328        out,
329        update_key,
330        update_weight,
331        delete_item: false,
332    }
333}
334fn can_sample(weight: &PackingPoolWeight, gas_limit: U256) -> bool {
335    if weight.gas_limit <= gas_limit {
336        return true;
337    }
338    weight
339        .max_loss_ratio
340        .saturating_mul(weight.gas_limit - gas_limit)
341        < weight.weighted_loss_ratio
342}
343
344fn can_sample_within_1559(
345    weight: &PackingPoolWeight, gas_target: U256, parent_base_price: U256,
346    min_base_price: U256,
347) -> bool {
348    if weight.min_gas_price < min_base_price {
349        return false;
350    }
351
352    let max_target_gas_used = estimate_max_possible_gas(
353        gas_target,
354        weight.min_gas_price,
355        parent_base_price,
356    );
357
358    if max_target_gas_used.is_zero() {
359        return false;
360    }
361
362    if weight.gas_limit <= max_target_gas_used {
363        return true;
364    }
365
366    weight
367        .max_loss_ratio
368        .saturating_mul(weight.gas_limit - max_target_gas_used)
369        < weight.weighted_loss_ratio
370}
371
372impl<TX> MallocSizeOf for PackingPool<TX>
373where
374    TX: PackingPoolTransaction + MallocSizeOf,
375    TX::Sender: MallocSizeOf,
376{
377    fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
378        self.treap_map.size_of(ops) + self.config.size_of(ops)
379    }
380}
381
382#[cfg(test)]
383mod pool_tests {
384    use std::{collections::HashSet, sync::atomic::AtomicUsize};
385
386    use rand::SeedableRng;
387    use rand_xorshift::XorShiftRng;
388
389    use crate::{
390        mock_tx::MockTransaction, transaction::PackingPoolTransaction,
391        PackingBatch, PackingPool, PackingPoolConfig, SampleTag,
392    };
393
394    fn default_pool(
395        len: usize, accounts: usize,
396    ) -> PackingPool<MockTransaction> {
397        let config = PackingPoolConfig::new_for_test();
398        let mut pool = PackingPool::new(config);
399        for accound_id in 0u64..accounts as u64 {
400            let mut batch = PackingBatch::new(default_tx(accound_id, 2));
401            for i in 3..(len as u64 + 2) {
402                batch.insert(default_tx(accound_id, i), &config).1.unwrap();
403            }
404            pool.replace(batch);
405        }
406        pool.assert_consistency();
407        assert_eq!(pool.treap_map.len(), accounts);
408        pool
409    }
410
411    fn default_tx(sender: u64, i: u64) -> MockTransaction {
412        static ID: AtomicUsize = AtomicUsize::new(0);
413        MockTransaction {
414            sender,
415            nonce: i,
416            gas_price: i,
417            gas_limit: i,
418            id: ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
419        }
420    }
421
422    #[allow(dead_code)]
423    fn same_price_txs() -> PackingPool<MockTransaction> {
424        let config = PackingPoolConfig::new_for_test();
425        let mut pool = PackingPool::new(config);
426
427        static ID: AtomicUsize = AtomicUsize::new(0);
428        for i in 1000..2000 {
429            let (_, res) = pool.insert(MockTransaction {
430                sender: i,
431                nonce: 0,
432                gas_price: 20,
433                gas_limit: 1,
434                id: ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
435            });
436            res.unwrap();
437        }
438        pool
439    }
440
441    #[test]
442    fn test_split_in_middle() {
443        let mut pool = default_pool(5, 10);
444        let txs = pool.split_off_prefix(2, &4.into());
445        pool.assert_consistency();
446        assert_eq!(pool.treap_map.len(), 10);
447        assert_eq!(txs.len(), 2);
448        for (idx, tx) in txs.into_iter().enumerate() {
449            assert_eq!(tx.sender(), 2);
450            assert_eq!(tx.nonce(), (2 + idx).into());
451        }
452        assert_eq!(pool.iter().into_iter().flatten().count(), 48);
453    }
454
455    #[test]
456    fn test_split_all() {
457        let mut pool = default_pool(5, 10);
458        let txs = pool.split_off_suffix(2, &2.into());
459        pool.assert_consistency();
460        assert_eq!(pool.treap_map.len(), 9);
461        assert!(!pool.treap_map.contains_key(&2));
462        assert_eq!(txs.len(), 5);
463        for (idx, tx) in txs.into_iter().enumerate() {
464            assert_eq!(tx.sender(), 2);
465            assert_eq!(tx.nonce(), (2 + idx).into());
466        }
467
468        let txs = pool.remove(3);
469        pool.assert_consistency();
470        assert_eq!(pool.treap_map.len(), 8);
471        assert!(!pool.treap_map.contains_key(&3));
472        assert_eq!(txs.len(), 5);
473        for (idx, tx) in txs.into_iter().enumerate() {
474            assert_eq!(tx.sender(), 3);
475            assert_eq!(tx.nonce(), (2 + idx).into());
476        }
477        assert_eq!(pool.iter().into_iter().flatten().count(), 40);
478    }
479
480    #[test]
481    fn test_change_price() {
482        let mut pool = default_pool(5, 10);
483        let mut new_tx = default_tx(2, 2);
484        new_tx.gas_price = 10;
485        let (tx, res) = pool.insert(new_tx);
486        assert_eq!(tx.first().unwrap().nonce(), 2.into());
487        res.unwrap();
488        pool.assert_consistency();
489        assert_eq!(pool.treap_map.len(), 10);
490        let first = pool.treap_map.iter().next().unwrap();
491        assert_eq!(first.value.sender(), 2);
492        assert_eq!(pool.iter().into_iter().flatten().count(), 46);
493    }
494
495    #[test]
496    fn test_change_limit() {
497        let mut pool = default_pool(5, 10);
498        let mut new_tx = default_tx(2, 2);
499        new_tx.gas_limit = 10;
500        let (tx, res) = pool.insert(new_tx);
501        assert_eq!(tx.first().unwrap().nonce(), 2.into());
502        res.unwrap();
503        pool.assert_consistency();
504        assert_eq!(pool.treap_map.len(), 10);
505        assert_eq!(pool.iter().into_iter().flatten().count(), 50);
506    }
507
508    #[test]
509    fn test_insert_empty_sender() {
510        let mut pool = default_pool(5, 10);
511        let new_tx = default_tx(11, 2);
512        let (_tx, res) = pool.insert(new_tx);
513        res.unwrap();
514        pool.assert_consistency();
515        assert_eq!(pool.treap_map.len(), 11);
516        assert_eq!(pool.iter().into_iter().flatten().count(), 51);
517    }
518
519    #[test]
520    fn test_replace() {
521        let mut pool = default_pool(5, 10);
522        let mut batch = PackingBatch::new(default_tx(2, 12));
523        for i in 13..18 {
524            batch.insert(default_tx(2, i), &pool.config).1.unwrap();
525        }
526        let txs = pool.replace(batch);
527        for (idx, tx) in txs.into_iter().enumerate() {
528            assert_eq!(tx.sender(), 2);
529            assert_eq!(tx.nonce(), (2 + idx).into());
530        }
531        pool.assert_consistency();
532        assert_eq!(pool.treap_map.len(), 10);
533        assert_eq!(pool.iter().into_iter().flatten().count(), 51);
534    }
535
536    #[test]
537    fn test_same_price() {
538        let pool = default_pool(5, 100000);
539
540        let pack_txs = || {
541            let mut rng = XorShiftRng::from_os_rng();
542
543            let mut packed = HashSet::new();
544            for (_, txs, tag) in pool.tx_sampler(&mut rng, 40000.into()) {
545                if packed.len() < 8000 {
546                    // This assertation may fails with a small probability
547                    // (~2^-64) even if every thing is correct.
548                    assert_eq!(tag, SampleTag::RandomPick);
549                }
550
551                for tx in txs {
552                    packed.insert(tx.clone());
553                }
554                if packed.len() >= 10000 {
555                    break;
556                }
557            }
558            packed
559        };
560
561        let base_pack = pack_txs();
562        let mut total_same_set = 0usize;
563
564        for _ in 0..5 {
565            total_same_set += pack_txs().intersection(&base_pack).count();
566        }
567        // This assertation may fails with a small probability (~2^-110) even if
568        // every thing is correct.
569        assert!(total_same_set < 2000);
570    }
571}
572
573#[cfg(test)]
574mod sample_tests {
575    use std::{cmp::Reverse, collections::HashSet, sync::atomic::AtomicUsize};
576
577    use crate::{
578        mock_tx::MockTransaction, sample::SampleTag,
579        transaction::PackingPoolTransaction, PackingPool, PackingPoolConfig,
580    };
581    use cfx_types::U256;
582    use rand::{distr::Uniform, Rng, SeedableRng};
583    use rand_xorshift::XorShiftRng;
584
585    #[derive(Default)]
586    struct MockPriceBook(Vec<MockTransaction>);
587    impl MockPriceBook {
588        fn truncate_loss_ratio(&self, block_limit: usize) -> Option<U256> {
589            let config = PackingPoolConfig::new_for_test();
590            let mut total_gas_limit = U256::zero();
591            let mut total_weighted_loss = U256::zero();
592            let mut last_ans = None;
593            for tx in self.0.iter() {
594                total_gas_limit += tx.gas_limit();
595                let loss_ratio = config.loss_ratio(tx.gas_price());
596                total_weighted_loss += loss_ratio * tx.gas_limit();
597                if let Some(quot) =
598                    total_gas_limit.checked_sub(block_limit.into())
599                {
600                    if quot * loss_ratio >= total_weighted_loss {
601                        return Some(last_ans.unwrap());
602                    }
603                    if quot > U256::zero() {
604                        last_ans = Some(total_weighted_loss / quot);
605                    }
606                }
607            }
608            last_ans
609        }
610    }
611
612    fn default_tx(
613        sender: u64, gas_limit: u64, gas_price: u64,
614    ) -> MockTransaction {
615        static ID: AtomicUsize = AtomicUsize::new(0);
616        MockTransaction {
617            sender,
618            nonce: 0,
619            gas_price,
620            gas_limit,
621            id: ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
622        }
623    }
624
625    #[test]
626    fn test_truncate_price_and_sample() {
627        let mut rand = XorShiftRng::from_os_rng();
628        let mut pool = PackingPool::new(PackingPoolConfig::new_for_test());
629        let mut mock_pool = MockPriceBook::default();
630        for i in 0..1000 {
631            let mut gas_limit = 1.01f64.powf(2000.0 + i as f64) as u64;
632            gas_limit -=
633                gas_limit / rand.sample(Uniform::new(50, 200).unwrap());
634            let mut gas_price = 1.01f64.powf(3000.0 - i as f64) as u64;
635            gas_price -=
636                gas_price / rand.sample(Uniform::new(50, 200).unwrap());
637
638            let tx = default_tx(i, gas_limit, gas_price);
639
640            mock_pool.0.push(tx);
641            let (_, res) = pool.insert(tx);
642            res.unwrap();
643        }
644        mock_pool.0.sort_by_key(|x| Reverse(x.gas_price()));
645        pool.assert_consistency();
646        for i in 1900..3500 {
647            let mut total_gas_limit = 1.01f64.powf(i as f64) as u64;
648            total_gas_limit -=
649                total_gas_limit / rand.sample(Uniform::new(50, 200).unwrap());
650            assert_eq!(
651                pool.truncate_loss_ratio(total_gas_limit.into()),
652                mock_pool.truncate_loss_ratio(total_gas_limit as usize)
653            );
654
655            let truncate_loss_ratio =
656                pool.truncate_loss_ratio(total_gas_limit.into());
657            let mut last_loss_ratio = truncate_loss_ratio;
658            // // Debug info
659            // eprintln!("===== Limit Level {} =======", i);
660            // if let Some(x) = truncate_loss_ratio {
661            //     let r = (u64::MAX as f64).log(1.01) - ((x>>64).as_u64() as
662            // f64).log(1.01);     let r = r *
663            // pool.config().loss_ratio_degree as f64;     eprintln!
664            // (">> Truncate price {:.2}", r); }
665            let mut packing_set = HashSet::new();
666            for (_, txs, tag) in
667                pool.tx_sampler(&mut rand, total_gas_limit.into())
668            {
669                let tx = txs.first().unwrap();
670                let loss_ratio = pool.config().loss_ratio(tx.gas_price());
671                if tag != SampleTag::PriceDesc {
672                    assert!(loss_ratio < truncate_loss_ratio.unwrap());
673                } else if let Some(r) = last_loss_ratio {
674                    assert!(loss_ratio >= r);
675                    last_loss_ratio = Some(loss_ratio);
676                }
677                packing_set.insert(tx.clone());
678
679                // let price_level = (tx.gas_price as f64).log(1.01);
680                // let limit_level = (tx.gas_limit as f64).log(1.01);
681                // eprintln!("{:?}: Price: {:.2}, Limit: {:.1}, Sender {}", tag,
682                // price_level, limit_level, tx.sender);
683            }
684            assert_eq!(packing_set.len(), 1000);
685            // eprintln!("");
686        }
687    }
688}