channel/message_queues.rs
1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use std::{
9 collections::{HashMap, VecDeque},
10 fmt::{Debug, Formatter, Result},
11 hash::Hash,
12 num::NonZeroUsize,
13};
14
/// How many successful dequeue operations happen between two sweeps that
/// discard empty per-key queues.
const POPS_PER_GC: u32 = 50;
17
/// QueueStyle is an enum which can be used as a configuration option for
/// `PerKeyQueue`. Since the queue per key is bounded, QueueStyle also
/// determines the policy for dropping and retrieving messages.
///
/// * With LIFO, the oldest messages are dropped when a key's queue is full,
///   and the newest message is retrieved first.
/// * With FIFO, the newest (incoming) messages are dropped when a key's queue
///   is full, and the oldest message is retrieved first.
/// * With KLAST, the oldest messages are dropped when a key's queue is full
///   (so only the most recent `max_queue_size_per_key` messages are kept), and
///   the remaining messages are retrieved in FIFO order.
#[derive(Clone, Copy, Debug)]
pub enum QueueStyle {
    /// Last in, first out; evicts the oldest message on overflow.
    LIFO,
    /// First in, first out; rejects the incoming message on overflow.
    FIFO,
    /// Keeps the most recent messages; evicts the oldest on overflow and
    /// retrieves in FIFO order.
    KLAST,
}
32
33/// PerKeyQueue maintains a queue of messages per key. It
34/// is a bounded queue of messages per Key and the style (FIFO, LIFO) is
35/// configurable. When a new message is added using `push`, it is added to
36/// the key's queue.
37///
38/// When `pop` is called, the next message is picked from one
39/// of the key's queue and returned. This happens in a round-robin
40/// fashion among keys.
41///
42/// If there are no messages, in any of the queues, `None` is returned.
43pub(crate) struct PerKeyQueue<K: Eq + Hash + Clone, T> {
44 /// QueueStyle for the messages stored per key
45 queue_style: QueueStyle,
46 /// per_key_queue maintains a map from a Key to a queue
47 /// of all the messages from that Key. A Key is usually
48 /// represented by AccountAddress
49 per_key_queue: HashMap<K, VecDeque<T>>,
50 /// This is a (round-robin)queue of Keys which have pending messages
51 /// This queue will be used for performing round robin among
52 /// Keys for choosing the next message
53 round_robin_queue: VecDeque<K>,
54 /// Maximum number of messages to store per key
55 max_queue_size: NonZeroUsize,
56 /// Number of messages dequeued since last GC
57 num_popped_since_gc: u32,
58}
59
60impl<K: Eq + Hash + Clone, T> Debug for PerKeyQueue<K, T> {
61 fn fmt(&self, f: &mut Formatter) -> Result {
62 f.debug_struct("PerKeyQueue")
63 .field("queue_style", &self.queue_style)
64 .field("max_queue_size", &self.max_queue_size)
65 .field("num_popped_since_gc", &self.num_popped_since_gc)
66 .finish()
67 }
68}
69
70impl<K: Eq + Hash + Clone, T> PerKeyQueue<K, T> {
71 /// Create a new PerKeyQueue with the provided QueueStyle and
72 /// max_queue_size_per_key
73 pub(crate) fn new(
74 queue_style: QueueStyle, max_queue_size_per_key: NonZeroUsize,
75 ) -> Self {
76 Self {
77 queue_style,
78 max_queue_size: max_queue_size_per_key,
79 per_key_queue: HashMap::new(),
80 round_robin_queue: VecDeque::new(),
81 num_popped_since_gc: 0,
82 }
83 }
84
85 /// Given a key, pops the message from its queue and returns the message
86 /// It also returns a boolean indicating whether the keys queue is empty
87 /// after popping the message
88 fn pop_from_key_queue(&mut self, key: &K) -> (Option<T>, bool) {
89 if let Some(q) = self.per_key_queue.get_mut(key) {
90 // Extract message from the key's queue
91 let retval = match self.queue_style {
92 QueueStyle::FIFO | QueueStyle::KLAST => q.pop_front(),
93 QueueStyle::LIFO => q.pop_back(),
94 };
95 (retval, q.is_empty())
96 } else {
97 (None, true)
98 }
99 }
100
101 /// push a message to the appropriate queue in per_key_queue
102 /// add the key to round_robin_queue if it didnt already exist.
103 /// Returns Some(T) if the new or an existing element was dropped. Returns
104 /// None otherwise.
105 pub(crate) fn push(&mut self, key: K, message: T) -> Option<T> {
106 let key_message_queue = self
107 .per_key_queue
108 .entry(key.clone())
109 // Only allocate a small initial queue for a new key. Previously, we
110 // allocated a queue with all `max_queue_size_per_key` entries;
111 // however, this breaks down when we have lots of transient peers.
112 // For example, many of our queues have a max capacity of 1024. To
113 // handle a single rpc from a transient peer, we would end up
114 // allocating ~ 96 b * 1024 ~ 64 Kib per queue.
115 .or_insert_with(|| VecDeque::with_capacity(1));
116
117 // Add the key to our round-robin queue if it's not already there
118 if key_message_queue.is_empty() {
119 self.round_robin_queue.push_back(key);
120 }
121
122 // Push the message to the actual key message queue
123 if key_message_queue.len() >= self.max_queue_size.get() {
124 match self.queue_style {
125 // Drop the newest message for FIFO
126 QueueStyle::FIFO => Some(message),
127 // Drop the oldest message for LIFO
128 QueueStyle::LIFO | QueueStyle::KLAST => {
129 let oldest = key_message_queue.pop_front();
130 key_message_queue.push_back(message);
131 oldest
132 }
133 }
134 } else {
135 key_message_queue.push_back(message);
136 None
137 }
138 }
139
140 /// pop a message from the appropriate queue in per_key_queue
141 /// remove the key from the round_robin_queue if it has no more messages
142 pub(crate) fn pop(&mut self) -> Option<T> {
143 let key = match self.round_robin_queue.pop_front() {
144 Some(v) => v,
145 _ => {
146 return None;
147 }
148 };
149
150 let (message, is_q_empty) = self.pop_from_key_queue(&key);
151 if !is_q_empty {
152 self.round_robin_queue.push_back(key);
153 }
154
155 if message.is_some() {
156 // Remove empty per-key-queues every `POPS_PER_GC` successful
157 // dequeue operations.
158 //
159 // diem-channel never removes keys from its PerKeyQueue (without
160 // this logic). This works fine for the validator network, where we
161 // have a bounded set of peers that almost never changes; however,
162 // this does not work for servicing public clients, where we can
163 // have large and frequent connection churn.
164 //
165 // Periodically removing these empty queues prevents us from causing
166 // an effective memory leak when we have lots of transient peers in
167 // e.g. the public-facing vfn use-case.
168 //
169 // This GC strategy could probably be more sophisticated, though it
170 // seems to work well in some basic stress tests / micro benches.
171 //
172 // See: common/channel/src/bin/many_keys_stress_test.rs
173 //
174 // For more context, see: https://github.com/diem/diem/issues/5543
175 self.num_popped_since_gc += 1;
176 if self.num_popped_since_gc >= POPS_PER_GC {
177 self.num_popped_since_gc = 0;
178 self.remove_empty_queues();
179 }
180 }
181
182 message
183 }
184
185 /// Garbage collect any empty per-key-queues.
186 fn remove_empty_queues(&mut self) {
187 self.per_key_queue.retain(|_key, queue| !queue.is_empty());
188 }
189
190 /// Clears all the pending messages and cleans up the queue from the
191 /// previous metadata.
192 pub(crate) fn clear(&mut self) {
193 self.per_key_queue.clear();
194 self.round_robin_queue.clear();
195 }
196}