channel/message_queues.rs
1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use diem_metrics::IntCounterVec;
9use std::{
10 collections::{HashMap, VecDeque},
11 fmt::{Debug, Formatter, Result},
12 hash::Hash,
13 num::NonZeroUsize,
14};
15
/// Number of successful dequeue operations between two garbage-collection
/// passes. Once this many messages have been popped, every empty per-key
/// queue is removed from the key map and the count starts over.
const POPS_PER_GC: u32 = 50;
18
/// QueueStyle is an enum which can be used as a configuration option for
/// `PerKeyQueue`. Since the queue per key is bounded, QueueStyle determines
/// both which message is dropped when a key's queue is full and in which
/// order the remaining messages are retrieved.
///
/// With LIFO, the oldest message is dropped and the newest message is
/// retrieved first.
/// With FIFO, the newest (incoming) message is dropped and the oldest message
/// is retrieved first.
/// With KLAST, the oldest message is dropped, but the remaining messages are
/// retrieved in FIFO order.
#[derive(Clone, Copy, Debug)]
pub enum QueueStyle {
    LIFO,
    FIFO,
    KLAST,
}
33
/// PerKeyQueue maintains a queue of messages per key. Each per-key queue is
/// bounded and its style (FIFO, LIFO or KLAST) is configurable. When a new
/// message is added using `push`, it is added to that key's queue.
///
/// When `pop` is called, the next message is picked from one of the keys'
/// queues and returned. Keys are served in a round-robin fashion.
///
/// If there are no messages in any of the queues, `None` is returned.
pub(crate) struct PerKeyQueue<K: Eq + Hash + Clone, T> {
    /// QueueStyle for the messages stored per key
    queue_style: QueueStyle,
    /// per_key_queue maintains a map from a Key to a queue
    /// of all the messages from that Key. A Key is usually
    /// represented by AccountAddress
    per_key_queue: HashMap<K, VecDeque<T>>,
    /// This is a (round-robin) queue of Keys which have pending messages.
    /// This queue is used for performing round robin among
    /// Keys when choosing the next message
    round_robin_queue: VecDeque<K>,
    /// Maximum number of messages to store per key
    max_queue_size: NonZeroUsize,
    /// Number of messages dequeued since the last GC of empty per-key queues
    num_popped_since_gc: u32,
    /// Optional counters for recording # enqueued, # dequeued, and # dropped
    /// messages
    counters: Option<&'static IntCounterVec>,
}
63
64impl<K: Eq + Hash + Clone, T> Debug for PerKeyQueue<K, T> {
65 fn fmt(&self, f: &mut Formatter) -> Result {
66 f.debug_struct("PerKeyQueue")
67 .field("queue_style", &self.queue_style)
68 .field("max_queue_size", &self.max_queue_size)
69 .field("num_popped_since_gc", &self.num_popped_since_gc)
70 .finish()
71 }
72}
73
74impl<K: Eq + Hash + Clone, T> PerKeyQueue<K, T> {
75 /// Create a new PerKeyQueue with the provided QueueStyle and
76 /// max_queue_size_per_key
77 pub(crate) fn new(
78 queue_style: QueueStyle, max_queue_size_per_key: NonZeroUsize,
79 counters: Option<&'static IntCounterVec>,
80 ) -> Self {
81 Self {
82 queue_style,
83 max_queue_size: max_queue_size_per_key,
84 per_key_queue: HashMap::new(),
85 round_robin_queue: VecDeque::new(),
86 num_popped_since_gc: 0,
87 counters,
88 }
89 }
90
91 /// Given a key, pops the message from its queue and returns the message
92 /// It also returns a boolean indicating whether the keys queue is empty
93 /// after popping the message
94 fn pop_from_key_queue(&mut self, key: &K) -> (Option<T>, bool) {
95 if let Some(q) = self.per_key_queue.get_mut(key) {
96 // Extract message from the key's queue
97 let retval = match self.queue_style {
98 QueueStyle::FIFO | QueueStyle::KLAST => q.pop_front(),
99 QueueStyle::LIFO => q.pop_back(),
100 };
101 (retval, q.is_empty())
102 } else {
103 (None, true)
104 }
105 }
106
107 /// push a message to the appropriate queue in per_key_queue
108 /// add the key to round_robin_queue if it didnt already exist.
109 /// Returns Some(T) if the new or an existing element was dropped. Returns
110 /// None otherwise.
111 pub(crate) fn push(&mut self, key: K, message: T) -> Option<T> {
112 if let Some(c) = self.counters.as_ref() {
113 c.with_label_values(&["enqueued"]).inc();
114 }
115
116 let key_message_queue = self
117 .per_key_queue
118 .entry(key.clone())
119 // Only allocate a small initial queue for a new key. Previously, we
120 // allocated a queue with all `max_queue_size_per_key` entries;
121 // however, this breaks down when we have lots of transient peers.
122 // For example, many of our queues have a max capacity of 1024. To
123 // handle a single rpc from a transient peer, we would end up
124 // allocating ~ 96 b * 1024 ~ 64 Kib per queue.
125 .or_insert_with(|| VecDeque::with_capacity(1));
126
127 // Add the key to our round-robin queue if it's not already there
128 if key_message_queue.is_empty() {
129 self.round_robin_queue.push_back(key);
130 }
131
132 // Push the message to the actual key message queue
133 if key_message_queue.len() >= self.max_queue_size.get() {
134 if let Some(c) = self.counters.as_ref() {
135 c.with_label_values(&["dropped"]).inc();
136 }
137 match self.queue_style {
138 // Drop the newest message for FIFO
139 QueueStyle::FIFO => Some(message),
140 // Drop the oldest message for LIFO
141 QueueStyle::LIFO | QueueStyle::KLAST => {
142 let oldest = key_message_queue.pop_front();
143 key_message_queue.push_back(message);
144 oldest
145 }
146 }
147 } else {
148 key_message_queue.push_back(message);
149 None
150 }
151 }
152
153 /// pop a message from the appropriate queue in per_key_queue
154 /// remove the key from the round_robin_queue if it has no more messages
155 pub(crate) fn pop(&mut self) -> Option<T> {
156 let key = match self.round_robin_queue.pop_front() {
157 Some(v) => v,
158 _ => {
159 return None;
160 }
161 };
162
163 let (message, is_q_empty) = self.pop_from_key_queue(&key);
164 if !is_q_empty {
165 self.round_robin_queue.push_back(key);
166 }
167
168 if message.is_some() {
169 if let Some(c) = self.counters.as_ref() {
170 c.with_label_values(&["dequeued"]).inc();
171 }
172
173 // Remove empty per-key-queues every `POPS_PER_GC` successful
174 // dequeue operations.
175 //
176 // diem-channel never removes keys from its PerKeyQueue (without
177 // this logic). This works fine for the validator network, where we
178 // have a bounded set of peers that almost never changes; however,
179 // this does not work for servicing public clients, where we can
180 // have large and frequent connection churn.
181 //
182 // Periodically removing these empty queues prevents us from causing
183 // an effective memory leak when we have lots of transient peers in
184 // e.g. the public-facing vfn use-case.
185 //
186 // This GC strategy could probably be more sophisticated, though it
187 // seems to work well in some basic stress tests / micro benches.
188 //
189 // See: common/channel/src/bin/many_keys_stress_test.rs
190 //
191 // For more context, see: https://github.com/diem/diem/issues/5543
192 self.num_popped_since_gc += 1;
193 if self.num_popped_since_gc >= POPS_PER_GC {
194 self.num_popped_since_gc = 0;
195 self.remove_empty_queues();
196 }
197 }
198
199 message
200 }
201
202 /// Garbage collect any empty per-key-queues.
203 fn remove_empty_queues(&mut self) {
204 self.per_key_queue.retain(|_key, queue| !queue.is_empty());
205 }
206
207 /// Clears all the pending messages and cleans up the queue from the
208 /// previous metadata.
209 pub(crate) fn clear(&mut self) {
210 self.per_key_queue.clear();
211 self.round_robin_queue.clear();
212 }
213}