channel/
diem_channel.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8//! diem_channel provides an mpsc channel which has two ends
9//! `diem_channel::Receiver` and `diem_channel::Sender` similar to existing mpsc
10//! data structures. What makes it different from existing mpsc channels is that
11//! we have full control over how the internal queueing in the channel happens
12//! and how we schedule messages to be sent out from this channel.
13//! Internally, it uses the `PerKeyQueue` to store messages
14use crate::message_queues::{PerKeyQueue, QueueStyle};
15use anyhow::{ensure, Result};
16use diem_infallible::{Mutex, NonZeroUsize};
17use diem_metrics::IntCounterVec;
18use futures::{
19    channel::oneshot,
20    stream::{FusedStream, Stream},
21};
22use std::{
23    fmt::{Debug, Formatter},
24    hash::Hash,
25    pin::Pin,
26    sync::Arc,
27    task::{Context, Poll, Waker},
28};
29
30/// SharedState is a data structure private to this module which is
31/// shared by the `Receiver` and any `Sender`s.
32#[derive(Debug)]
33struct SharedState<K: Eq + Hash + Clone, M> {
34    /// The internal queue of messages in this channel.
35    internal_queue:
36        PerKeyQueue<K, (M, Option<oneshot::Sender<ElementStatus<M>>>)>,
37    /// The `Receiver` registers its `Waker` in this slot when the queue is
38    /// empty. `Sender`s will try to wake the `Receiver` (if any) when they
39    /// push a new item onto the queue. The last live `Sender` will also
40    /// wake the `Receiver` as it's tearing down so the `Receiver` can
41    /// gracefully drain and shutdown the channel.
42    waker: Option<Waker>,
43    /// The number of active senders. When this value reaches 0, all senders
44    /// have been dropped.
45    num_senders: usize,
46    /// A boolean which tracks whether the receiver has dropped.
47    receiver_dropped: bool,
48    /// A boolean which tracks whether the stream has terminated. A stream is
49    /// considered terminated when sender has dropped and we have drained
50    /// everything inside our internal queue.
51    stream_terminated: bool,
52}
53
54/// The sending end of the diem_channel.
55#[derive(Debug)]
56pub struct Sender<K: Eq + Hash + Clone, M> {
57    shared_state: Arc<Mutex<SharedState<K, M>>>,
58}
59
60/// The status of an element inserted into a diem_channel. If the element is
61/// successfully dequeued, ElementStatus::Dequeued is sent to the sender. If it
62/// is dropped ElementStatus::Dropped is sent to the sender along with the
63/// dropped element.
64pub enum ElementStatus<M> {
65    Dequeued,
66    Dropped(M),
67}
68
69impl<M: PartialEq> PartialEq for ElementStatus<M> {
70    fn eq(&self, other: &ElementStatus<M>) -> bool {
71        match (self, other) {
72            (ElementStatus::Dequeued, ElementStatus::Dequeued) => true,
73            (ElementStatus::Dropped(a), ElementStatus::Dropped(b)) => a.eq(b),
74            _ => false,
75        }
76    }
77}
78
79impl<M: Debug> Debug for ElementStatus<M> {
80    fn fmt(
81        &self, f: &mut Formatter,
82    ) -> std::result::Result<(), std::fmt::Error> {
83        match self {
84            ElementStatus::Dequeued => write!(f, "Dequeued"),
85            ElementStatus::Dropped(v) => write!(f, "Dropped({:?})", v),
86        }
87    }
88}
89
90impl<K: Eq + Hash + Clone, M> Sender<K, M> {
91    /// This adds the message into the internal queue data structure. This is a
92    /// synchronous call.
93    pub fn push(&self, key: K, message: M) -> Result<()> {
94        self.push_with_feedback(key, message, None)
95    }
96
97    /// Same as `push`, but this function also accepts a oneshot::Sender over
98    /// which the sender can be notified when the message eventually gets
99    /// delivered or dropped.
100    pub fn push_with_feedback(
101        &self, key: K, message: M,
102        status_ch: Option<oneshot::Sender<ElementStatus<M>>>,
103    ) -> Result<()> {
104        let mut shared_state = self.shared_state.lock();
105        ensure!(!shared_state.receiver_dropped, "Channel is closed");
106        debug_assert!(shared_state.num_senders > 0);
107
108        let dropped =
109            shared_state.internal_queue.push(key, (message, status_ch));
110        // If this or an existing message had to be dropped because of the queue
111        // being full, we notify the corresponding status channel if it
112        // was registered.
113        if let Some((dropped_val, Some(dropped_status_ch))) = dropped {
114            // Ignore errors.
115            let _err =
116                dropped_status_ch.send(ElementStatus::Dropped(dropped_val));
117        }
118        if let Some(w) = shared_state.waker.take() {
119            w.wake();
120        }
121        Ok(())
122    }
123}
124
125impl<K: Eq + Hash + Clone, M> Clone for Sender<K, M> {
126    fn clone(&self) -> Self {
127        let shared_state = self.shared_state.clone();
128        {
129            let mut shared_state_lock = shared_state.lock();
130            debug_assert!(shared_state_lock.num_senders > 0);
131            shared_state_lock.num_senders += 1;
132        }
133        Sender { shared_state }
134    }
135}
136
137impl<K: Eq + Hash + Clone, M> Drop for Sender<K, M> {
138    fn drop(&mut self) {
139        let mut shared_state = self.shared_state.lock();
140
141        debug_assert!(shared_state.num_senders > 0);
142        shared_state.num_senders -= 1;
143
144        if shared_state.num_senders == 0 {
145            if let Some(waker) = shared_state.waker.take() {
146                waker.wake();
147            }
148        }
149    }
150}
151
152/// The receiving end of the diem_channel.
153pub struct Receiver<K: Eq + Hash + Clone, M> {
154    shared_state: Arc<Mutex<SharedState<K, M>>>,
155}
156
157impl<K: Eq + Hash + Clone, M> Receiver<K, M> {
158    /// Removes all the previously sent transactions that have not been consumed
159    /// yet and cleans up the internal queue structure (GC of the previous
160    /// keys).
161    pub fn clear(&mut self) {
162        let mut shared_state = self.shared_state.lock();
163        shared_state.internal_queue.clear();
164    }
165}
166
167impl<K: Eq + Hash + Clone, M> Drop for Receiver<K, M> {
168    fn drop(&mut self) {
169        let mut shared_state = self.shared_state.lock();
170        debug_assert!(!shared_state.receiver_dropped);
171        shared_state.receiver_dropped = true;
172    }
173}
174
175impl<K: Eq + Hash + Clone, M> Stream for Receiver<K, M> {
176    type Item = M;
177
178    /// poll_next checks whether there is something ready for consumption from
179    /// the internal queue. If there is, then it returns immediately. If the
180    /// internal_queue is empty, it sets the waker passed to it by the
181    /// scheduler/executor and returns Pending
182    fn poll_next(
183        self: Pin<&mut Self>, cx: &mut Context<'_>,
184    ) -> Poll<Option<Self::Item>> {
185        let mut shared_state = self.shared_state.lock();
186        if let Some((val, status_ch)) = shared_state.internal_queue.pop() {
187            if let Some(status_ch) = status_ch {
188                let _err = status_ch.send(ElementStatus::Dequeued);
189            }
190            Poll::Ready(Some(val))
191        // all senders have been dropped (and so the stream is terminated)
192        } else if shared_state.num_senders == 0 {
193            shared_state.stream_terminated = true;
194            Poll::Ready(None)
195        } else {
196            shared_state.waker = Some(cx.waker().clone());
197            Poll::Pending
198        }
199    }
200}
201
202impl<K: Eq + Hash + Clone, M> FusedStream for Receiver<K, M> {
203    fn is_terminated(&self) -> bool {
204        self.shared_state.lock().stream_terminated
205    }
206}
207
208/// Create a new Diem Channel and returns the two ends of the channel.
209pub fn new<K: Eq + Hash + Clone, M>(
210    queue_style: QueueStyle, max_queue_size_per_key: usize,
211    counters: Option<&'static IntCounterVec>,
212) -> (Sender<K, M>, Receiver<K, M>) {
213    let max_queue_size_per_key = NonZeroUsize!(
214        max_queue_size_per_key,
215        "diem_channel cannot be of size 0"
216    );
217    let shared_state = Arc::new(Mutex::new(SharedState {
218        internal_queue: PerKeyQueue::new(
219            queue_style,
220            max_queue_size_per_key,
221            counters,
222        ),
223        waker: None,
224        num_senders: 1,
225        receiver_dropped: false,
226        stream_terminated: false,
227    }));
228    let shared_state_clone = Arc::clone(&shared_state);
229    (
230        Sender { shared_state },
231        Receiver {
232            shared_state: shared_state_clone,
233        },
234    )
235}