// cfx_packing_pool/packing_batch.rs

1use cfx_types::U256;
2use rand::RngCore;
3use treap_map::{ApplyOpOutcome, Node};
4
5use crate::{
6    transaction::PackingPoolTransaction, treapmap_config::PackingPoolMap,
7    weight::PackingPoolWeight, PackingPoolConfig,
8};
9use malloc_size_of_derive::MallocSizeOf;
10
11/// A batch of transactions that have the same sender and continuous nonces.
12///
13/// `PackingBatch` is designed to group transactions from the same sender that
14/// can be packed into the same block. This struct ensures that all transactions
15/// in the batch have the same sender and their nonces form a continuous
16/// sequence.
17#[derive(Default, Clone, Eq, PartialEq, Debug, MallocSizeOf)]
18pub struct PackingBatch<TX: PackingPoolTransaction> {
19    /// A list of transactions with the same sender and continuous nonces.
20    /// This vector is guaranteed to contain at least one transaction. The
21    /// transactions are sorted in the order of their nonces.
22    pub(crate) txs: Vec<TX>,
23
24    /// The total gas limit of all transactions in the batch.
25    total_gas_limit: U256,
26}
27
/// Reasons for which [`PackingBatch::insert`] refuses a transaction.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InsertError {
    /// The nonce lies beyond the position right after the last transaction of
    /// the batch, so accepting it would leave a gap in the nonce sequence.
    LargeNonce,
    /// The nonce is at least `U256::MAX - 1`.
    TooLargeNonce,
    /// Appending would exceed the per-address transaction count configured in
    /// `PackingPoolConfig::address_tx_count`.
    ExceedAddrTxCount,
    /// The gas limit of the batch would exceed the per-address gas limit
    /// configured in `PackingPoolConfig`.
    ExceedAddrGasLimit,
    /// The gas price is lower than the gas price of the transaction that
    /// precedes it in the batch.
    DecreasingGasPrice,
}
36
/// Reasons for which the `split_off_*` methods of [`PackingBatch`] refuse to
/// remove transactions.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RemoveError {
    /// The requested split would take every transaction out of the batch.
    /// The batch is left untouched; as the name says, the whole batch should
    /// be deleted instead.
    ShouldDelete,
}
41
42#[derive(Debug)]
43pub(crate) struct PackInfo {
44    pub first_gas_price: U256,
45    pub total_gas_limit: U256,
46}
47
48impl<TX: PackingPoolTransaction> PackingBatch<TX> {
49    pub fn new(tx: TX) -> Self {
50        let total_gas_limit = tx.gas_limit();
51        Self {
52            txs: vec![tx],
53            total_gas_limit,
54        }
55    }
56
57    #[inline]
58    pub fn sender(&self) -> TX::Sender { self.txs.first().unwrap().sender() }
59
60    #[inline]
61    pub fn start_nonce(&self) -> U256 { self.txs.first().unwrap().nonce() }
62
63    #[inline]
64    pub fn first_gas_price(&self) -> U256 {
65        self.txs.first().unwrap().gas_price()
66    }
67
68    #[inline]
69    pub fn total_gas_limit(&self) -> U256 { self.total_gas_limit }
70
71    #[inline]
72    pub(crate) fn pack_info(&self) -> PackInfo {
73        PackInfo {
74            first_gas_price: self.first_gas_price(),
75            total_gas_limit: self.total_gas_limit(),
76        }
77    }
78
79    #[inline]
80    pub fn len(&self) -> usize { self.txs.len() }
81
82    #[inline]
83    /// Inserts a transaction into the pool according to [`PackingPoolConfig`],
84    /// without violating the assumptions of [`PackingBatch`].
85    ///
86    /// # Returns
87    /// Returns a tuple consisting of:
88    /// - A vector of transactions that were replaced by the insertion. This can
89    ///   be empty if no transactions were displaced.
90    /// - A result indicating the success or failure of the insertion operation.
91    pub fn insert(
92        &mut self, mut tx: TX, config: &PackingPoolConfig,
93    ) -> (Vec<TX>, Result<(), InsertError>) {
94        use self::InsertError::*;
95        assert_eq!(tx.sender(), self.sender());
96
97        if tx.nonce() >= U256::MAX - 1 {
98            return (vec![], Err(TooLargeNonce));
99        }
100
101        let start_nonce = self.start_nonce();
102        let n_txs = self.len();
103
104        let txs = &mut self.txs;
105        if tx.nonce() + 1 < start_nonce
106            || (tx.nonce() + 1 == start_nonce
107                && tx.gas_price() > txs[0].gas_price())
108        {
109            let old_txs = std::mem::take(txs);
110            *self = Self::new(tx);
111            return (old_txs, Ok(()));
112        }
113
114        if tx.nonce() + 1 == start_nonce && tx.gas_price() <= txs[0].gas_price()
115        {
116            txs.insert(0, tx);
117            let (truncate_idx, addr_gas_limit) =
118                config.check_acceptable_batch(&*txs, None);
119            let ret = self.txs.split_off(truncate_idx);
120            self.total_gas_limit = addr_gas_limit;
121            return (ret, Ok(()));
122        }
123
124        if tx.nonce() > start_nonce + n_txs {
125            return (vec![], Err(LargeNonce));
126        }
127
128        if tx.nonce() == start_nonce + n_txs {
129            // Append tx
130            if n_txs >= config.address_tx_count {
131                return (vec![], Err(ExceedAddrTxCount));
132            }
133
134            if txs.last().unwrap().gas_price() > tx.gas_price() {
135                return (vec![], Err(DecreasingGasPrice));
136            }
137            if config.address_gas_limit
138                < self.total_gas_limit.saturating_add(tx.gas_limit())
139            {
140                return (vec![], Err(ExceedAddrGasLimit));
141            }
142
143            self.total_gas_limit += tx.gas_limit();
144            txs.push(tx);
145
146            (vec![], Ok(()))
147        } else {
148            // Replace
149            let to_replaced_idx = (tx.nonce() - start_nonce).as_usize();
150            if to_replaced_idx > 0
151                && tx.gas_price() < txs[to_replaced_idx - 1].gas_price()
152            {
153                let old_txs = self.txs.split_off(to_replaced_idx);
154                self.update_total_limit();
155                return (old_txs, Err(DecreasingGasPrice));
156            }
157
158            let (truncate_idx, addr_gas_limit) = config
159                .check_acceptable_batch(&*txs, Some((&tx, to_replaced_idx)));
160            if truncate_idx <= to_replaced_idx {
161                let old_txs = self.txs.split_off(to_replaced_idx);
162                self.update_total_limit();
163                return (old_txs, Err(ExceedAddrGasLimit));
164            }
165            let my_gas_price = tx.gas_price();
166
167            std::mem::swap(&mut txs[to_replaced_idx], &mut tx);
168
169            let mut res = vec![tx];
170            let truncated_txs;
171            if txs
172                .get(to_replaced_idx + 1)
173                .map_or(false, |tx| tx.gas_price() < my_gas_price)
174            {
175                truncated_txs = txs.split_off(to_replaced_idx + 1);
176                self.update_total_limit();
177            } else {
178                truncated_txs = txs.split_off(truncate_idx);
179                self.total_gas_limit = addr_gas_limit;
180            };
181            res.extend(truncated_txs);
182
183            (res, Ok(()))
184        }
185    }
186
187    /// Removes transactions starting from the specified index (included) and
188    /// returns them.
189    pub fn split_off_suffix(
190        &mut self, index: usize,
191    ) -> Result<Vec<TX>, RemoveError> {
192        self.split_off_inner(index, true)
193    }
194
195    /// Removes transactions ending at the specified index (not included) and
196    /// returns them.
197    pub fn split_off_prefix(
198        &mut self, index: usize,
199    ) -> Result<Vec<TX>, RemoveError> {
200        self.split_off_inner(index, false)
201    }
202
203    fn split_off_inner(
204        &mut self, index: usize, keep_prefix: bool,
205    ) -> Result<Vec<TX>, RemoveError> {
206        if index == 0 || index >= self.len() {
207            return if (index == 0) ^ keep_prefix {
208                Ok(vec![])
209            } else {
210                Err(RemoveError::ShouldDelete)
211            };
212        }
213
214        let mut res = self.txs.split_off(index);
215        if !keep_prefix {
216            std::mem::swap(&mut res, &mut self.txs);
217        }
218
219        self.update_total_limit();
220        Ok(res)
221    }
222
223    fn update_total_limit(&mut self) {
224        self.total_gas_limit = self
225            .txs
226            .iter()
227            .map(|x| x.gas_limit())
228            .fold(U256::zero(), |acc, e| acc + e);
229    }
230
231    /// Split transactions at the specified nonce (the specified one is in the
232    /// past half). Retains a half according to `keep_prefix` and returns the
233    /// rest half.
234    pub fn split_off_by_nonce(
235        &mut self, nonce: &U256, keep_prefix: bool,
236    ) -> Result<Vec<TX>, RemoveError> {
237        if *nonce < self.start_nonce() {
238            self.split_off_inner(0, keep_prefix)
239        } else if *nonce >= self.start_nonce() + self.len() {
240            self.split_off_inner(self.len(), keep_prefix)
241        } else {
242            self.split_off_inner(
243                (nonce - self.start_nonce()).as_usize(),
244                keep_prefix,
245            )
246        }
247    }
248
249    #[inline]
250    pub(crate) fn make_outcome_on_delete(&mut self) -> ApplyOpOutcome<Vec<TX>> {
251        let txs = std::mem::take(&mut self.txs);
252        ApplyOpOutcome {
253            out: txs,
254            update_weight: false,
255            update_key: false,
256            delete_item: true,
257        }
258    }
259
260    pub(crate) fn make_node(
261        self, config: &PackingPoolConfig, rng: &mut dyn RngCore,
262    ) -> treap_map::Node<PackingPoolMap<TX>> {
263        let key = self.txs.first().unwrap().sender();
264        let gas_price = self.first_gas_price();
265        let sort_key = gas_price;
266        let loss_ratio = config.loss_ratio(sort_key);
267        let weight = PackingPoolWeight {
268            gas_limit: self.total_gas_limit,
269            min_gas_price: gas_price,
270            weighted_loss_ratio: loss_ratio * self.total_gas_limit,
271            max_loss_ratio: loss_ratio,
272        };
273        Node::new(key, self, sort_key, weight, rng.next_u64())
274    }
275
276    #[cfg(test)]
277    pub fn assert_constraints(&self) {
278        assert!(self.txs.len() > 0);
279        for i in 0..(self.txs.len() - 1) {
280            assert_eq!(self.txs[i].sender(), self.txs[i + 1].sender());
281            assert_eq!(self.txs[i].nonce() + 1, self.txs[i + 1].nonce());
282            assert!(self.txs[i].gas_price() <= self.txs[i + 1].gas_price());
283        }
284        assert_eq!(
285            self.total_gas_limit.as_u128(),
286            self.txs.iter().map(|x| x.gas_limit().as_u128()).sum()
287        );
288    }
289
290    #[cfg(test)]
291    fn insert_test(
292        &mut self, tx: TX, config: &PackingPoolConfig,
293        expected_output: Vec<TX>, expected_status: Result<(), InsertError>,
294    ) where
295        TX: Copy + Ord + std::fmt::Debug,
296    {
297        let mut before_txs = self.txs.clone();
298        before_txs.push(tx);
299
300        let (res_txs, res) = self.insert(tx, &config);
301
302        let mut after_txs = res_txs.clone();
303        if res.is_err() {
304            after_txs.push(tx);
305        }
306        after_txs.extend(&self.txs);
307
308        before_txs.sort();
309        after_txs.sort();
310        assert_eq!(before_txs, after_txs);
311        self.assert_constraints();
312        assert_eq!(expected_output, res_txs);
313        assert_eq!(expected_status, res);
314    }
315}
316
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::{InsertError::*, RemoveError};
    use crate::{mock_tx::MockTransaction, PackingBatch, PackingPoolConfig};

    /// Builds a batch of `len` default transactions with nonces starting
    /// at 2.
    fn default_batch(len: usize) -> PackingBatch<MockTransaction> {
        const FIRST_NONCE: u64 = 2;

        let config = PackingPoolConfig::new_for_test();
        let mut batch = PackingBatch::new(default_tx(FIRST_NONCE));
        for nonce in (FIRST_NONCE + 1)..(FIRST_NONCE + len as u64) {
            let (_, status) = batch.insert(default_tx(nonce), &config);
            status.unwrap();
        }
        batch.assert_constraints();
        batch
    }

    /// A transaction of sender 0 whose nonce, gas price and gas limit all
    /// equal `i`. Every call hands out a fresh `id`.
    fn default_tx(i: u64) -> MockTransaction {
        static ID: AtomicUsize = AtomicUsize::new(0);
        let id = ID.fetch_add(1, Ordering::SeqCst);
        MockTransaction {
            sender: 0,
            nonce: i,
            gas_price: i,
            gas_limit: i,
            id,
        }
    }

    /// Like [`default_tx`], with the gas price overridden.
    fn tx_with_price(nonce: u64, gas_price: u64) -> MockTransaction {
        MockTransaction {
            gas_price,
            ..default_tx(nonce)
        }
    }

    /// Like [`default_tx`], with the gas limit overridden.
    fn tx_with_gas_limit(nonce: u64, gas_limit: u64) -> MockTransaction {
        MockTransaction {
            gas_limit,
            ..default_tx(nonce)
        }
    }

    /// A batch of five transactions together with copies of all its
    /// transactions, of the first three and of the last two.
    fn default_split_task() -> (
        PackingBatch<MockTransaction>,
        Vec<MockTransaction>,
        Vec<MockTransaction>,
        Vec<MockTransaction>,
    ) {
        let batch = default_batch(5);
        let all = batch.txs.clone();
        let prefix = all[..3].to_vec();
        let suffix = all[3..].to_vec();
        (batch, all, prefix, suffix)
    }

    /// What a split on the batch of [`default_split_task`] is expected to
    /// hand back. The batch must keep everything else.
    enum SplitOutcome {
        /// The first three transactions are removed.
        Prefix,
        /// The last two transactions are removed.
        Suffix,
        /// The call succeeds without removing anything.
        Nothing,
        /// The call fails with `ShouldDelete` without removing anything.
        Refused,
    }

    /// Runs `split` on a fresh default split task and checks both the
    /// returned value and the transactions left in the batch.
    fn check_split(
        case: &str,
        split: impl FnOnce(
            &mut PackingBatch<MockTransaction>,
        ) -> Result<Vec<MockTransaction>, RemoveError>,
        outcome: SplitOutcome,
    ) {
        let (mut batch, all, prefix, suffix) = default_split_task();
        let (expected_result, expected_rest) = match outcome {
            SplitOutcome::Prefix => (Ok(prefix), suffix),
            SplitOutcome::Suffix => (Ok(suffix), prefix),
            SplitOutcome::Nothing => (Ok(vec![]), all),
            SplitOutcome::Refused => (Err(RemoveError::ShouldDelete), all),
        };

        assert_eq!(split(&mut batch), expected_result, "{}", case);
        assert_eq!(batch.txs, expected_rest, "{}", case);
        batch.assert_constraints();
    }

    #[test]
    fn test_insert_basic() {
        let config = &PackingPoolConfig::new_for_test();
        let gas_cap = config.address_gas_limit.as_u64();
        let mut batch = default_batch(5);

        // Append right after the last nonce.
        batch.insert_test(default_tx(7), config, vec![], Ok(()));

        // The nonce would leave a gap behind the batch.
        batch.insert_test(default_tx(9), config, vec![], Err(LargeNonce));

        // Put in front, right before the first nonce.
        batch.insert_test(default_tx(1), config, vec![], Ok(()));

        // Append with a price below the one of the last transaction.
        batch.insert_test(
            tx_with_price(8, 6),
            config,
            vec![],
            Err(DecreasingGasPrice),
        );

        // Replace with the same price.
        let replaced = batch.txs[3];
        batch.insert_test(default_tx(4), config, vec![replaced], Ok(()));

        // Replace with a smaller (acceptable) price.
        let replaced = batch.txs[3];
        batch.insert_test(tx_with_price(4, 3), config, vec![replaced], Ok(()));

        // Replace with a larger (acceptable) price.
        let replaced = batch.txs[3];
        batch.insert_test(tx_with_price(4, 5), config, vec![replaced], Ok(()));

        // Replace with a price above the follower's one: the followers are
        // removed as well.
        let removed = batch.txs[3..].to_vec();
        batch.insert_test(tx_with_price(4, 6), config, removed, Ok(()));

        // Replace with a price below the predecessor's one: rejected, and
        // everything from that nonce on is removed.
        let removed = batch.txs[2..].to_vec();
        batch.insert_test(
            tx_with_price(3, 1),
            config,
            removed,
            Err(DecreasingGasPrice),
        );

        // Insert a non-continuous nonce in front.
        let mut batch = default_batch(5);
        let removed = batch.txs.clone();
        batch.insert_test(default_tx(0), config, removed, Ok(()));

        // Insert a large price in front.
        let mut batch = default_batch(5);
        let removed = batch.txs.clone();
        batch.insert_test(tx_with_price(1, 10), config, removed, Ok(()));

        // Replace with a too large gas limit.
        let mut batch = default_batch(5);
        let removed = batch.txs[2..].to_vec();
        batch.insert_test(
            tx_with_gas_limit(4, gas_cap),
            config,
            removed,
            Err(ExceedAddrGasLimit),
        );

        // Replace with an acceptable gas limit that truncates the batch.
        let mut batch = default_batch(5);
        let removed = vec![batch.txs[2], batch.txs[4]];
        batch.insert_test(
            tx_with_gas_limit(4, gas_cap - 10),
            config,
            removed,
            Ok(()),
        );
    }

    #[test]
    fn test_large_gas_limit() {
        let config = &PackingPoolConfig::new_for_test();
        let gas_cap = config.address_gas_limit.as_u64();

        // Append with a large gas limit.
        let mut batch = default_batch(5);
        batch.insert_test(
            tx_with_gas_limit(7, gas_cap - 2),
            config,
            vec![],
            Err(ExceedAddrGasLimit),
        );

        // Replace with a too large gas limit.
        let mut batch = default_batch(5);
        let removed = batch.txs[2..].to_vec();
        batch.insert_test(
            tx_with_gas_limit(4, gas_cap),
            config,
            removed,
            Err(ExceedAddrGasLimit),
        );

        // Replace with an acceptable gas limit that truncates the batch.
        let mut batch = default_batch(5);
        let removed = vec![batch.txs[2], batch.txs[4]];
        batch.insert_test(
            tx_with_gas_limit(4, gas_cap - 10),
            config,
            removed,
            Ok(()),
        );

        // Insert right before the first nonce, with a gas limit just below
        // and just above the per-address limit.
        for &gas_limit in &[gas_cap - 1, gas_cap + 1] {
            let mut batch = default_batch(5);
            let removed = batch.txs.clone();
            batch.insert_test(
                tx_with_gas_limit(1, gas_limit),
                config,
                removed,
                Ok(()),
            );
        }

        // Insert a non-continuous nonce in front, with a gas limit just
        // below and just above the per-address limit.
        for &gas_limit in &[gas_cap - 1, gas_cap + 1] {
            let mut batch = default_batch(5);
            let removed = batch.txs.clone();
            batch.insert_test(
                tx_with_gas_limit(0, gas_limit),
                config,
                removed,
                Ok(()),
            );
        }
    }

    #[test]
    fn test_many_transactions() {
        let config = &PackingPoolConfig::new_for_test();
        let mut batch = default_batch(20);

        // Append
        batch.insert_test(
            default_tx(22),
            config,
            vec![],
            Err(ExceedAddrTxCount),
        );

        // Replace the last transaction.
        let replaced = batch.txs[19];
        batch.insert_test(default_tx(21), config, vec![replaced], Ok(()));

        // Insert in front: the last transaction is pushed out.
        let pushed_out = batch.txs[19];
        batch.insert_test(default_tx(1), config, vec![pushed_out], Ok(()));

        // Insert a non-continuous nonce in front.
        let mut batch = default_batch(20);
        let removed = batch.txs.clone();
        batch.insert_test(default_tx(0), config, removed, Ok(()));
    }

    #[test]
    fn test_split_off() {
        use self::SplitOutcome::*;

        // Splits by index.
        check_split("split_off_prefix(3)", |b| b.split_off_prefix(3), Prefix);
        check_split("split_off_suffix(3)", |b| b.split_off_suffix(3), Suffix);
        check_split("split_off_prefix(0)", |b| b.split_off_prefix(0), Nothing);
        check_split("split_off_suffix(0)", |b| b.split_off_suffix(0), Refused);
        check_split("split_off_prefix(5)", |b| b.split_off_prefix(5), Refused);
        check_split("split_off_suffix(5)", |b| b.split_off_suffix(5), Nothing);

        // Splits by nonce; the batch holds the nonces 2 to 6.
        let by_nonce = vec![
            (5u64, false, Prefix),
            (5, true, Suffix),
            (2, false, Nothing),
            (2, true, Refused),
            (1, false, Nothing),
            (1, true, Refused),
            (7, false, Refused),
            (7, true, Nothing),
            (8, false, Refused),
            (8, true, Nothing),
        ];
        for (nonce, keep_prefix, outcome) in by_nonce {
            let case = format!("split_off_by_nonce({}, {})", nonce, keep_prefix);
            check_split(
                &case,
                |b| b.split_off_by_nonce(&nonce.into(), keep_prefix),
                outcome,
            );
        }
    }
}