channel/
message_queues.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use diem_metrics::IntCounterVec;
9use std::{
10    collections::{HashMap, VecDeque},
11    fmt::{Debug, Formatter, Result},
12    hash::Hash,
13    num::NonZeroUsize,
14};
15
/// How many successful dequeue operations `PerKeyQueue` performs between
/// sweeps that discard per-key queues which have become empty.
const POPS_PER_GC: u32 = 50;
18
/// QueueStyle is an enum which can be used as a configuration option for
/// PerKeyQueue. Since the queue per key is bounded, QueueStyle also
/// determines the policy for dropping and retrieving messages.
///
/// With LIFO, oldest messages are dropped and the newest message is
/// retrieved first.
/// With FIFO, newest messages are dropped and the oldest message is
/// retrieved first.
/// With KLAST, oldest messages are dropped, but the remaining ones are
/// retrieved in FIFO order (i.e. each key keeps only its most recent
/// messages, up to the per-key bound).
#[derive(Clone, Copy, Debug)]
pub enum QueueStyle {
    /// Last-in, first-out. When full, the oldest queued message is evicted.
    LIFO,
    /// First-in, first-out. When full, the incoming message is rejected.
    FIFO,
    /// Keep-last-K. When full, the oldest queued message is evicted; the
    /// remaining messages are retrieved oldest first.
    KLAST,
}
33
34/// PerKeyQueue maintains a queue of messages per key. It
35/// is a bounded queue of messages per Key and the style (FIFO, LIFO) is
36/// configurable. When a new message is added using `push`, it is added to
37/// the key's queue.
38///
39/// When `pop` is called, the next message is picked from one
40/// of the key's queue and returned. This happens in a round-robin
41/// fashion among keys.
42///
43/// If there are no messages, in any of the queues, `None` is returned.
44pub(crate) struct PerKeyQueue<K: Eq + Hash + Clone, T> {
45    /// QueueStyle for the messages stored per key
46    queue_style: QueueStyle,
47    /// per_key_queue maintains a map from a Key to a queue
48    /// of all the messages from that Key. A Key is usually
49    /// represented by AccountAddress
50    per_key_queue: HashMap<K, VecDeque<T>>,
51    /// This is a (round-robin)queue of Keys which have pending messages
52    /// This queue will be used for performing round robin among
53    /// Keys for choosing the next message
54    round_robin_queue: VecDeque<K>,
55    /// Maximum number of messages to store per key
56    max_queue_size: NonZeroUsize,
57    /// Number of messages dequeued since last GC
58    num_popped_since_gc: u32,
59    /// Optional counters for recording # enqueued, # dequeued, and # dropped
60    /// messages
61    counters: Option<&'static IntCounterVec>,
62}
63
64impl<K: Eq + Hash + Clone, T> Debug for PerKeyQueue<K, T> {
65    fn fmt(&self, f: &mut Formatter) -> Result {
66        f.debug_struct("PerKeyQueue")
67            .field("queue_style", &self.queue_style)
68            .field("max_queue_size", &self.max_queue_size)
69            .field("num_popped_since_gc", &self.num_popped_since_gc)
70            .finish()
71    }
72}
73
74impl<K: Eq + Hash + Clone, T> PerKeyQueue<K, T> {
75    /// Create a new PerKeyQueue with the provided QueueStyle and
76    /// max_queue_size_per_key
77    pub(crate) fn new(
78        queue_style: QueueStyle, max_queue_size_per_key: NonZeroUsize,
79        counters: Option<&'static IntCounterVec>,
80    ) -> Self {
81        Self {
82            queue_style,
83            max_queue_size: max_queue_size_per_key,
84            per_key_queue: HashMap::new(),
85            round_robin_queue: VecDeque::new(),
86            num_popped_since_gc: 0,
87            counters,
88        }
89    }
90
91    /// Given a key, pops the message from its queue and returns the message
92    /// It also returns a boolean indicating whether the keys queue is empty
93    /// after popping the message
94    fn pop_from_key_queue(&mut self, key: &K) -> (Option<T>, bool) {
95        if let Some(q) = self.per_key_queue.get_mut(key) {
96            // Extract message from the key's queue
97            let retval = match self.queue_style {
98                QueueStyle::FIFO | QueueStyle::KLAST => q.pop_front(),
99                QueueStyle::LIFO => q.pop_back(),
100            };
101            (retval, q.is_empty())
102        } else {
103            (None, true)
104        }
105    }
106
107    /// push a message to the appropriate queue in per_key_queue
108    /// add the key to round_robin_queue if it didnt already exist.
109    /// Returns Some(T) if the new or an existing element was dropped. Returns
110    /// None otherwise.
111    pub(crate) fn push(&mut self, key: K, message: T) -> Option<T> {
112        if let Some(c) = self.counters.as_ref() {
113            c.with_label_values(&["enqueued"]).inc();
114        }
115
116        let key_message_queue = self
117            .per_key_queue
118            .entry(key.clone())
119            // Only allocate a small initial queue for a new key. Previously, we
120            // allocated a queue with all `max_queue_size_per_key` entries;
121            // however, this breaks down when we have lots of transient peers.
122            // For example, many of our queues have a max capacity of 1024. To
123            // handle a single rpc from a transient peer, we would end up
124            // allocating ~ 96 b * 1024 ~ 64 Kib per queue.
125            .or_insert_with(|| VecDeque::with_capacity(1));
126
127        // Add the key to our round-robin queue if it's not already there
128        if key_message_queue.is_empty() {
129            self.round_robin_queue.push_back(key);
130        }
131
132        // Push the message to the actual key message queue
133        if key_message_queue.len() >= self.max_queue_size.get() {
134            if let Some(c) = self.counters.as_ref() {
135                c.with_label_values(&["dropped"]).inc();
136            }
137            match self.queue_style {
138                // Drop the newest message for FIFO
139                QueueStyle::FIFO => Some(message),
140                // Drop the oldest message for LIFO
141                QueueStyle::LIFO | QueueStyle::KLAST => {
142                    let oldest = key_message_queue.pop_front();
143                    key_message_queue.push_back(message);
144                    oldest
145                }
146            }
147        } else {
148            key_message_queue.push_back(message);
149            None
150        }
151    }
152
153    /// pop a message from the appropriate queue in per_key_queue
154    /// remove the key from the round_robin_queue if it has no more messages
155    pub(crate) fn pop(&mut self) -> Option<T> {
156        let key = match self.round_robin_queue.pop_front() {
157            Some(v) => v,
158            _ => {
159                return None;
160            }
161        };
162
163        let (message, is_q_empty) = self.pop_from_key_queue(&key);
164        if !is_q_empty {
165            self.round_robin_queue.push_back(key);
166        }
167
168        if message.is_some() {
169            if let Some(c) = self.counters.as_ref() {
170                c.with_label_values(&["dequeued"]).inc();
171            }
172
173            // Remove empty per-key-queues every `POPS_PER_GC` successful
174            // dequeue operations.
175            //
176            // diem-channel never removes keys from its PerKeyQueue (without
177            // this logic). This works fine for the validator network, where we
178            // have a bounded set of peers that almost never changes; however,
179            // this does not work for servicing public clients, where we can
180            // have large and frequent connection churn.
181            //
182            // Periodically removing these empty queues prevents us from causing
183            // an effective memory leak when we have lots of transient peers in
184            // e.g. the public-facing vfn use-case.
185            //
186            // This GC strategy could probably be more sophisticated, though it
187            // seems to work well in some basic stress tests / micro benches.
188            //
189            // See: common/channel/src/bin/many_keys_stress_test.rs
190            //
191            // For more context, see: https://github.com/diem/diem/issues/5543
192            self.num_popped_since_gc += 1;
193            if self.num_popped_since_gc >= POPS_PER_GC {
194                self.num_popped_since_gc = 0;
195                self.remove_empty_queues();
196            }
197        }
198
199        message
200    }
201
202    /// Garbage collect any empty per-key-queues.
203    fn remove_empty_queues(&mut self) {
204        self.per_key_queue.retain(|_key, queue| !queue.is_empty());
205    }
206
207    /// Clears all the pending messages and cleans up the queue from the
208    /// previous metadata.
209    pub(crate) fn clear(&mut self) {
210        self.per_key_queue.clear();
211        self.round_robin_queue.clear();
212    }
213}