cfx_rpc/helpers/
epoch_queue.rs

1// Copyright 2020 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use log::error;
6use std::collections::VecDeque;
7
/// A bounded FIFO buffer of `(epoch number, payload)` pairs.
///
/// The goal of `EpochQueue` is to keep a distance from the tip of the ledger.
/// This way, we can ensure that the epoch being processed has already been
/// executed (deferred execution) and we can reduce the number of chain reorgs
/// that we need to notify subscribers about.
#[derive(Debug)]
pub struct EpochQueue<T> {
    /// Maximum number of entries held back before the oldest one is released.
    /// A value of 0 disables buffering entirely.
    capacity: usize,
    /// Buffered `(epoch number, payload)` pairs, oldest at the front.
    queue: VecDeque<(u64, T)>,
}
17
18impl<T> EpochQueue<T> {
19    pub fn with_capacity(capacity: usize) -> Self {
20        let mut queue = VecDeque::new();
21        queue.reserve(capacity);
22        Self { capacity, queue }
23    }
24
25    pub fn push(&mut self, new: (u64, T)) -> Option<(u64, T)> {
26        if self.capacity == 0 {
27            return Some(new);
28        }
29
30        // remove epochs from queue that are greater or equal to the new one
31        while matches!(self.queue.back(), Some((e, _)) if *e >= new.0) {
32            self.queue.pop_back();
33        }
34
35        // we should not skip any epochs
36        if let Some((e, _)) = self.queue.back() {
37            if *e != new.0 - 1 {
38                error!("Skipped epoch in epoch queue: {} --> {}", *e, new.0);
39            }
40        }
41
42        // only return epoch if queue is full
43        match self.queue.len() {
44            n if n < self.capacity => {
45                self.queue.push_back(new);
46                return None;
47            }
48            n if n == self.capacity => {
49                let e = self.queue.pop_front().unwrap();
50                self.queue.push_back(new);
51                return Some(e);
52            }
53            _ => unreachable!(),
54        }
55    }
56}
57
#[cfg(test)]
mod test {
    use super::*;

    /// A queue of capacity 0 hands every pushed item straight back.
    #[test]
    fn test_no_queue() {
        let mut queue = EpochQueue::with_capacity(0);

        assert_eq!(queue.push((0, 0)), Some((0, 0)));
        assert_eq!(queue.push((1, 1)), Some((1, 1)));
        assert_eq!(queue.push((2, 2)), Some((2, 2)));
        assert_eq!(queue.push((3, 3)), Some((3, 3)));
    }

    /// A queue of capacity 1 holds back exactly one entry, including across
    /// a reorg that empties it.
    #[test]
    fn test_capacity_one() {
        let mut queue = EpochQueue::with_capacity(1);

        assert_eq!(queue.push((0, 0)), None);
        assert_eq!(queue.push((1, 1)), Some((0, 0)));
        assert_eq!(queue.push((2, 2)), Some((1, 1)));
        assert_eq!(queue.push((1, 3)), None); // reorg: 2 --> 1
        assert_eq!(queue.push((2, 4)), Some((1, 3)));
    }

    /// Without reorgs, entries come out in insertion order once the queue
    /// is full.
    #[test]
    fn test_no_reorgs() {
        let mut queue = EpochQueue::with_capacity(5);

        assert_eq!(queue.push((0, 0)), None);
        assert_eq!(queue.push((1, 1)), None);
        assert_eq!(queue.push((2, 2)), None);
        assert_eq!(queue.push((3, 3)), None);
        assert_eq!(queue.push((4, 4)), None);
        assert_eq!(queue.push((5, 5)), Some((0, 0)));
        assert_eq!(queue.push((6, 6)), Some((1, 1)));
        assert_eq!(queue.push((7, 7)), Some((2, 2)));
        assert_eq!(queue.push((8, 8)), Some((3, 3)));
    }

    /// A gap in epoch numbers does not drop anything: the new entry is
    /// queued behind the existing ones and released in order.
    #[test]
    fn test_skipped_epoch() {
        let mut queue = EpochQueue::with_capacity(3);

        assert_eq!(queue.push((0, 0)), None);
        assert_eq!(queue.push((1, 1)), None);
        assert_eq!(queue.push((5, 2)), None); // gap: 1 --> 5
        assert_eq!(queue.push((6, 3)), Some((0, 0)));
        assert_eq!(queue.push((7, 4)), Some((1, 1)));
        assert_eq!(queue.push((8, 5)), Some((5, 2)));
    }

    /// Reorgs that stay within the queued range replace the queued tail and
    /// are never visible in the released entries.
    #[test]
    fn test_shallow_reorgs() {
        let mut queue = EpochQueue::with_capacity(5);

        assert_eq!(queue.push((0, 0)), None);
        assert_eq!(queue.push((1, 1)), None);
        assert_eq!(queue.push((2, 2)), None);
        assert_eq!(queue.push((1, 3)), None); // reorg: 2 --> 1
        assert_eq!(queue.push((2, 4)), None);
        assert_eq!(queue.push((3, 5)), None);
        assert_eq!(queue.push((4, 6)), None);
        assert_eq!(queue.push((1, 7)), None); // reorg: 4 --> 1
        assert_eq!(queue.push((2, 8)), None);
        assert_eq!(queue.push((3, 9)), None);
        assert_eq!(queue.push((4, 10)), None);
        assert_eq!(queue.push((5, 11)), Some((0, 0)));
        assert_eq!(queue.push((6, 12)), Some((1, 7)));
        assert_eq!(queue.push((4, 13)), None); // reorg: 6 --> 4
        assert_eq!(queue.push((5, 14)), None);
        assert_eq!(queue.push((6, 15)), None);
        assert_eq!(queue.push((7, 16)), Some((2, 8)));
        assert_eq!(queue.push((8, 17)), Some((3, 9)));
    }

    /// A reorg reaching below the queued range empties the queue; epochs
    /// that were already released are released again with their new payload.
    #[test]
    fn test_deep_reorgs() {
        let mut queue = EpochQueue::with_capacity(5);

        assert_eq!(queue.push((0, 0)), None);
        assert_eq!(queue.push((1, 1)), None);
        assert_eq!(queue.push((2, 2)), None);
        assert_eq!(queue.push((3, 3)), None);
        assert_eq!(queue.push((4, 4)), None);
        assert_eq!(queue.push((5, 5)), Some((0, 0)));
        assert_eq!(queue.push((6, 6)), Some((1, 1)));
        assert_eq!(queue.push((7, 7)), Some((2, 2)));
        assert_eq!(queue.push((8, 8)), Some((3, 3)));
        assert_eq!(queue.push((0, 9)), None); // reorg: 8 --> 0
        assert_eq!(queue.push((1, 10)), None);
        assert_eq!(queue.push((2, 11)), None);
        assert_eq!(queue.push((3, 12)), None);
        assert_eq!(queue.push((4, 13)), None);
        assert_eq!(queue.push((5, 14)), Some((0, 9)));
        assert_eq!(queue.push((6, 15)), Some((1, 10)));
        assert_eq!(queue.push((7, 16)), Some((2, 11)));
        assert_eq!(queue.push((8, 17)), Some((3, 12)));
    }
}