1#![forbid(unsafe_code)]
9
10use diem_metrics::IntGauge;
22use futures::{
23 channel::mpsc,
24 sink::Sink,
25 stream::{FusedStream, Stream},
26 task::{Context, Poll},
27};
28use std::pin::Pin;
29
30#[cfg(test)]
31mod test;
32
33pub mod diem_channel;
34#[cfg(test)]
35mod diem_channel_test;
36
37pub mod message_queues;
38#[cfg(test)]
39mod message_queues_test;
40
41pub struct Sender<T> {
44 inner: mpsc::Sender<T>,
45 gauge: IntGauge,
46}
47
48pub struct Receiver<T> {
51 inner: mpsc::Receiver<T>,
52 gauge: IntGauge,
53}
54
55impl<T> Clone for Sender<T> {
56 fn clone(&self) -> Self {
57 Self {
58 inner: self.inner.clone(),
59 gauge: self.gauge.clone(),
60 }
61 }
62}
63
64impl<T> Sink<T> for Sender<T> {
67 type Error = mpsc::SendError;
68
69 fn poll_ready(
70 mut self: Pin<&mut Self>, cx: &mut Context<'_>,
71 ) -> Poll<Result<(), Self::Error>> {
72 (*self).inner.poll_ready(cx)
73 }
74
75 fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
76 (*self).inner.start_send(msg).map(|_| self.gauge.inc())
77 }
78
79 fn poll_flush(
80 mut self: Pin<&mut Self>, cx: &mut Context<'_>,
81 ) -> Poll<Result<(), Self::Error>> {
82 Pin::new(&mut self.inner).poll_flush(cx)
83 }
84
85 fn poll_close(
86 mut self: Pin<&mut Self>, cx: &mut Context<'_>,
87 ) -> Poll<Result<(), Self::Error>> {
88 Pin::new(&mut self.inner).poll_close(cx)
89 }
90}
91
92impl<T> Sender<T> {
93 pub fn try_send(&mut self, msg: T) -> Result<(), mpsc::SendError> {
94 (*self)
95 .inner
96 .try_send(msg)
97 .map(|_| self.gauge.inc())
98 .map_err(mpsc::TrySendError::into_send_error)
99 }
100}
101
102impl<T> FusedStream for Receiver<T>
103where T: std::fmt::Debug
104{
105 fn is_terminated(&self) -> bool { self.inner.is_terminated() }
106}
107
108impl<T> Stream for Receiver<T> {
111 type Item = T;
112
113 fn poll_next(
114 mut self: Pin<&mut Self>, cx: &mut Context<'_>,
115 ) -> Poll<Option<Self::Item>> {
116 let next = Pin::new(&mut self.inner).poll_next(cx);
117 if let Poll::Ready(Some(_)) = next {
118 self.gauge.dec();
119 }
120 next
121 }
122}
123
124pub fn new<T>(size: usize, gauge: &IntGauge) -> (Sender<T>, Receiver<T>) {
126 gauge.set(0);
127 let (sender, receiver) = mpsc::channel(size);
128 (
129 Sender {
130 inner: sender,
131 gauge: gauge.clone(),
132 },
133 Receiver {
134 inner: receiver,
135 gauge: gauge.clone(),
136 },
137 )
138}
139
140pub fn new_test<T>(size: usize) -> (Sender<T>, Receiver<T>) {
141 let gauge = IntGauge::new("TEST_COUNTER", "test").unwrap();
142 new(size, &gauge)
143}