channel/
lib.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8#![forbid(unsafe_code)]
9
10//! Provides an mpsc (multi-producer single-consumer) channel wrapped in an
11//! [`IntGauge`](diem_metrics::IntGauge) that counts the number of currently
12//! queued items. While there is only one `channel::Receiver`, there can be
13//! many `channel::Sender`s, which are also cheap to clone.
14//!
15//! This channel differs from our other channel implementation,
16//! `channel::diem_channel`, in that it is just a single queue (vs. different
17//! queues for different keys) with backpressure (senders will block if the
18//! queue is full instead of evicting another item in the queue) that only
19//! implements FIFO (vs. LIFO or KLAST).
20
21use diem_metrics::IntGauge;
22use futures::{
23    channel::mpsc,
24    sink::Sink,
25    stream::{FusedStream, Stream},
26    task::{Context, Poll},
27};
28use std::pin::Pin;
29
30#[cfg(test)]
31mod test;
32
33pub mod diem_channel;
34#[cfg(test)]
35mod diem_channel_test;
36
37pub mod message_queues;
38#[cfg(test)]
39mod message_queues_test;
40
41/// An [`mpsc::Sender`](futures::channel::mpsc::Sender) with an [`IntGauge`]
42/// counting the number of currently queued items.
43pub struct Sender<T> {
44    inner: mpsc::Sender<T>,
45    gauge: IntGauge,
46}
47
48/// An [`mpsc::Receiver`](futures::channel::mpsc::Receiver) with an [`IntGauge`]
49/// counting the number of currently queued items.
50pub struct Receiver<T> {
51    inner: mpsc::Receiver<T>,
52    gauge: IntGauge,
53}
54
55impl<T> Clone for Sender<T> {
56    fn clone(&self) -> Self {
57        Self {
58            inner: self.inner.clone(),
59            gauge: self.gauge.clone(),
60        }
61    }
62}
63
64/// `Sender` implements `Sink` in the same way as `mpsc::Sender`, but it
65/// increments the associated `IntGauge` when it sends a message successfully.
66impl<T> Sink<T> for Sender<T> {
67    type Error = mpsc::SendError;
68
69    fn poll_ready(
70        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
71    ) -> Poll<Result<(), Self::Error>> {
72        (*self).inner.poll_ready(cx)
73    }
74
75    fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
76        (*self).inner.start_send(msg).map(|_| self.gauge.inc())
77    }
78
79    fn poll_flush(
80        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
81    ) -> Poll<Result<(), Self::Error>> {
82        Pin::new(&mut self.inner).poll_flush(cx)
83    }
84
85    fn poll_close(
86        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
87    ) -> Poll<Result<(), Self::Error>> {
88        Pin::new(&mut self.inner).poll_close(cx)
89    }
90}
91
92impl<T> Sender<T> {
93    pub fn try_send(&mut self, msg: T) -> Result<(), mpsc::SendError> {
94        (*self)
95            .inner
96            .try_send(msg)
97            .map(|_| self.gauge.inc())
98            .map_err(mpsc::TrySendError::into_send_error)
99    }
100}
101
102impl<T> FusedStream for Receiver<T>
103where T: std::fmt::Debug
104{
105    fn is_terminated(&self) -> bool { self.inner.is_terminated() }
106}
107
108/// `Receiver` implements `Stream` in the same way as `mpsc::Stream`, but it
109/// decrements the associated `IntGauge` when it gets polled successfully.
110impl<T> Stream for Receiver<T> {
111    type Item = T;
112
113    fn poll_next(
114        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
115    ) -> Poll<Option<Self::Item>> {
116        let next = Pin::new(&mut self.inner).poll_next(cx);
117        if let Poll::Ready(Some(_)) = next {
118            self.gauge.dec();
119        }
120        next
121    }
122}
123
124/// Similar to `mpsc::channel`, `new` creates a pair of `Sender` and `Receiver`
125pub fn new<T>(size: usize, gauge: &IntGauge) -> (Sender<T>, Receiver<T>) {
126    gauge.set(0);
127    let (sender, receiver) = mpsc::channel(size);
128    (
129        Sender {
130            inner: sender,
131            gauge: gauge.clone(),
132        },
133        Receiver {
134            inner: receiver,
135            gauge: gauge.clone(),
136        },
137    )
138}
139
140pub fn new_test<T>(size: usize) -> (Sender<T>, Receiver<T>) {
141    let gauge = IntGauge::new("TEST_COUNTER", "test").unwrap();
142    new(size, &gauge)
143}