channel/
message_queues.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use std::{
9    collections::{HashMap, VecDeque},
10    fmt::{Debug, Formatter, Result},
11    hash::Hash,
12    num::NonZeroUsize,
13};
14
/// Remove empty per-key-queues every `POPS_PER_GC` *successful* (i.e.
/// message-returning) dequeue operations; see the GC logic in
/// `PerKeyQueue::pop`.
const POPS_PER_GC: u32 = 50;
17
/// QueueStyle is an enum which can be used as a configuration option for
/// PerValidatorQueue. Since the queue per key is going to be bounded,
/// QueueStyle also determines the policy for dropping and retrieving messages.
//
// `PartialEq`/`Eq` are derived so configuration values can be compared
// directly instead of via `matches!`; this is backward compatible.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueueStyle {
    /// Last-in first-out retrieval; when a queue is full, the oldest
    /// message is dropped.
    LIFO,
    /// First-in first-out retrieval; when a queue is full, the newest
    /// message is dropped.
    FIFO,
    /// "Keep last": when a queue is full, the oldest message is dropped,
    /// but remaining messages are still retrieved in FIFO order.
    KLAST,
}
32
/// PerKeyQueue maintains a queue of messages per key. It
/// is a bounded queue of messages per Key and the style (FIFO, LIFO) is
/// configurable. When a new message is added using `push`, it is added to
/// the key's queue.
///
/// When `pop` is called, the next message is picked from one
/// of the key's queue and returned. This happens in a round-robin
/// fashion among keys.
///
/// If there are no messages, in any of the queues, `None` is returned.
pub(crate) struct PerKeyQueue<K: Eq + Hash + Clone, T> {
    /// QueueStyle for the messages stored per key; also fixes which message
    /// is dropped by `push` when a key's queue is full.
    queue_style: QueueStyle,
    /// per_key_queue maintains a map from a Key to a queue
    /// of all the messages from that Key. A Key is usually
    /// represented by AccountAddress.
    /// Entries may be empty between pops and the periodic GC
    /// (`remove_empty_queues`).
    per_key_queue: HashMap<K, VecDeque<T>>,
    /// This is a (round-robin)queue of Keys which have pending messages
    /// This queue will be used for performing round robin among
    /// Keys for choosing the next message.
    /// Invariant: a key is present here exactly when its queue in
    /// `per_key_queue` is non-empty (maintained by `push`/`pop`).
    round_robin_queue: VecDeque<K>,
    /// Maximum number of messages to store per key; `NonZeroUsize` rules out
    /// a useless zero-capacity configuration at the type level.
    max_queue_size: NonZeroUsize,
    /// Number of messages dequeued since last GC; compared against
    /// `POPS_PER_GC` in `pop`.
    num_popped_since_gc: u32,
}
59
60impl<K: Eq + Hash + Clone, T> Debug for PerKeyQueue<K, T> {
61    fn fmt(&self, f: &mut Formatter) -> Result {
62        f.debug_struct("PerKeyQueue")
63            .field("queue_style", &self.queue_style)
64            .field("max_queue_size", &self.max_queue_size)
65            .field("num_popped_since_gc", &self.num_popped_since_gc)
66            .finish()
67    }
68}
69
70impl<K: Eq + Hash + Clone, T> PerKeyQueue<K, T> {
71    /// Create a new PerKeyQueue with the provided QueueStyle and
72    /// max_queue_size_per_key
73    pub(crate) fn new(
74        queue_style: QueueStyle, max_queue_size_per_key: NonZeroUsize,
75    ) -> Self {
76        Self {
77            queue_style,
78            max_queue_size: max_queue_size_per_key,
79            per_key_queue: HashMap::new(),
80            round_robin_queue: VecDeque::new(),
81            num_popped_since_gc: 0,
82        }
83    }
84
85    /// Given a key, pops the message from its queue and returns the message
86    /// It also returns a boolean indicating whether the keys queue is empty
87    /// after popping the message
88    fn pop_from_key_queue(&mut self, key: &K) -> (Option<T>, bool) {
89        if let Some(q) = self.per_key_queue.get_mut(key) {
90            // Extract message from the key's queue
91            let retval = match self.queue_style {
92                QueueStyle::FIFO | QueueStyle::KLAST => q.pop_front(),
93                QueueStyle::LIFO => q.pop_back(),
94            };
95            (retval, q.is_empty())
96        } else {
97            (None, true)
98        }
99    }
100
101    /// push a message to the appropriate queue in per_key_queue
102    /// add the key to round_robin_queue if it didnt already exist.
103    /// Returns Some(T) if the new or an existing element was dropped. Returns
104    /// None otherwise.
105    pub(crate) fn push(&mut self, key: K, message: T) -> Option<T> {
106        let key_message_queue = self
107            .per_key_queue
108            .entry(key.clone())
109            // Only allocate a small initial queue for a new key. Previously, we
110            // allocated a queue with all `max_queue_size_per_key` entries;
111            // however, this breaks down when we have lots of transient peers.
112            // For example, many of our queues have a max capacity of 1024. To
113            // handle a single rpc from a transient peer, we would end up
114            // allocating ~ 96 b * 1024 ~ 64 Kib per queue.
115            .or_insert_with(|| VecDeque::with_capacity(1));
116
117        // Add the key to our round-robin queue if it's not already there
118        if key_message_queue.is_empty() {
119            self.round_robin_queue.push_back(key);
120        }
121
122        // Push the message to the actual key message queue
123        if key_message_queue.len() >= self.max_queue_size.get() {
124            match self.queue_style {
125                // Drop the newest message for FIFO
126                QueueStyle::FIFO => Some(message),
127                // Drop the oldest message for LIFO
128                QueueStyle::LIFO | QueueStyle::KLAST => {
129                    let oldest = key_message_queue.pop_front();
130                    key_message_queue.push_back(message);
131                    oldest
132                }
133            }
134        } else {
135            key_message_queue.push_back(message);
136            None
137        }
138    }
139
140    /// pop a message from the appropriate queue in per_key_queue
141    /// remove the key from the round_robin_queue if it has no more messages
142    pub(crate) fn pop(&mut self) -> Option<T> {
143        let key = match self.round_robin_queue.pop_front() {
144            Some(v) => v,
145            _ => {
146                return None;
147            }
148        };
149
150        let (message, is_q_empty) = self.pop_from_key_queue(&key);
151        if !is_q_empty {
152            self.round_robin_queue.push_back(key);
153        }
154
155        if message.is_some() {
156            // Remove empty per-key-queues every `POPS_PER_GC` successful
157            // dequeue operations.
158            //
159            // diem-channel never removes keys from its PerKeyQueue (without
160            // this logic). This works fine for the validator network, where we
161            // have a bounded set of peers that almost never changes; however,
162            // this does not work for servicing public clients, where we can
163            // have large and frequent connection churn.
164            //
165            // Periodically removing these empty queues prevents us from causing
166            // an effective memory leak when we have lots of transient peers in
167            // e.g. the public-facing vfn use-case.
168            //
169            // This GC strategy could probably be more sophisticated, though it
170            // seems to work well in some basic stress tests / micro benches.
171            //
172            // See: common/channel/src/bin/many_keys_stress_test.rs
173            //
174            // For more context, see: https://github.com/diem/diem/issues/5543
175            self.num_popped_since_gc += 1;
176            if self.num_popped_since_gc >= POPS_PER_GC {
177                self.num_popped_since_gc = 0;
178                self.remove_empty_queues();
179            }
180        }
181
182        message
183    }
184
185    /// Garbage collect any empty per-key-queues.
186    fn remove_empty_queues(&mut self) {
187        self.per_key_queue.retain(|_key, queue| !queue.is_empty());
188    }
189
190    /// Clears all the pending messages and cleans up the queue from the
191    /// previous metadata.
192    pub(crate) fn clear(&mut self) {
193        self.per_key_queue.clear();
194        self.round_robin_queue.clear();
195    }
196}