cfxcore/light_protocol/handler/sync/common/
future_item.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use lru_time_cache::LruCache;
6use parking_lot::RwLock;
7use std::{
8    future::Future,
9    hash::Hash,
10    pin::Pin,
11    sync::Arc,
12    task::{Context, Poll, Waker},
13};
14
15pub enum PendingItem<Item, Err> {
16    Ready(Item),
17    Pending(Vec<Waker>),
18    Error(Err),
19}
20
21impl<Item, Err> PendingItem<Item, Err> {
22    pub fn pending() -> Self { Self::Pending(vec![]) }
23
24    pub fn ready(item: Item) -> Self { Self::Ready(item) }
25
26    pub fn clear_error(&mut self) {
27        if let Self::Error(_) = self {
28            *self = Self::pending();
29        }
30    }
31
32    // NOTE: `set` has to be called in a thread-safe environment
33    pub fn set(&mut self, item: Item) {
34        match self {
35            Self::Ready(_old) => {
36                // FIXME: we might want to check if old == item and raise an
37                // error if not. This, however, would require that T : Eq.
38                // This should not happen unless there are deep chain reorgs.
39            }
40            Self::Pending(ws) => {
41                // move `ws` out
42                let ws = std::mem::replace(ws, Vec::<Waker>::new());
43
44                // transform `self`
45                *self = Self::Ready(item);
46
47                // notify waiting futures
48                for w in ws {
49                    w.wake();
50                }
51            }
52            Self::Error(_) => {
53                // if we managed to verify the item, we do not care about the
54                // error anymore. wakers must have been notified when `self` was
55                // set to `Error`, so they either received an error or haven't
56                // polled yet.
57                *self = Self::Ready(item);
58            }
59        }
60    }
61
62    // NOTE: `set_error` has to be called in a thread-safe environment
63    pub fn set_error(&mut self, err: Err) {
64        match self {
65            Self::Ready(_) => {
66                // if we already have a verified value, we do not care about
67                // errors anymore
68            }
69            Self::Pending(ws) => {
70                // move `ws` out
71                let ws = std::mem::replace(ws, Vec::<Waker>::new());
72
73                // transform `self`
74                *self = Self::Error(err);
75
76                // notify waiting futures
77                for w in ws {
78                    w.wake();
79                }
80            }
81            Self::Error(_) => {
82                *self = Self::Error(err);
83            }
84        }
85    }
86}
87
88impl<Item: Clone, Err: Clone> PendingItem<Item, Err> {
89    // NOTE: `poll` has to be called in a thread-safe environment
90    fn poll(&mut self, ctx: &mut Context) -> Poll<Result<Item, Err>> {
91        match self {
92            Self::Ready(item) => Poll::Ready(Ok(item.clone())),
93            Self::Pending(ws) => {
94                // FIXME: is it safe to keep old wakers?
95                ws.push(ctx.waker().clone());
96                Poll::Pending
97            }
98            Self::Error(e) => Poll::Ready(Err(e.clone())),
99        }
100    }
101}
102
103pub struct FutureItem<K, V, E> {
104    key: K,
105    verified: Arc<RwLock<LruCache<K, PendingItem<V, E>>>>,
106}
107
108impl<K, V, E> FutureItem<K, V, E> {
109    pub fn new(
110        key: K, verified: Arc<RwLock<LruCache<K, PendingItem<V, E>>>>,
111    ) -> FutureItem<K, V, E> {
112        FutureItem { key, verified }
113    }
114}
115
116impl<K, V, E> Future for FutureItem<K, V, E>
117where
118    K: Clone + Eq + Hash + Ord,
119    V: Clone,
120    E: Clone,
121{
122    type Output = Result<V, E>;
123
124    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
125        self.verified
126            .write()
127            .entry(self.key.clone())
128            .or_insert(PendingItem::pending())
129            .poll(ctx)
130    }
131}
132
133#[cfg(test)]
134mod tests {
135    use super::{FutureItem, PendingItem};
136    use futures::future::join3;
137    use lru_time_cache::LruCache;
138    use parking_lot::RwLock;
139    use std::{sync::Arc, time::Duration};
140    use tokio::{runtime::Runtime, time::sleep};
141
142    #[test]
143    fn test_set() {
144        const KEY: u64 = 1;
145        const VALUE: u64 = 2;
146        const ERROR: u64 = 3;
147
148        let cache = LruCache::<u64, PendingItem<u64, u64>>::with_capacity(1);
149        let verified = Arc::new(RwLock::new(cache));
150
151        let runtime = Runtime::new().expect("Unable to create a runtime");
152
153        // set error
154        verified
155            .write()
156            .entry(KEY)
157            .or_insert(PendingItem::pending())
158            .set_error(ERROR);
159
160        // caller should get the error
161        let res = runtime.block_on(FutureItem::new(KEY, verified.clone()));
162        assert_eq!(res, Err(ERROR));
163
164        // set value
165        verified
166            .write()
167            .entry(KEY)
168            .or_insert(PendingItem::pending())
169            .set(VALUE);
170
171        // caller should get the value
172        let res = runtime.block_on(FutureItem::new(KEY, verified.clone()));
173        assert_eq!(res, Ok(VALUE));
174
175        // set error again
176        verified
177            .write()
178            .entry(KEY)
179            .or_insert(PendingItem::pending())
180            .set_error(ERROR);
181
182        // result is not overwritten by error
183        let res = runtime.block_on(FutureItem::new(KEY, verified.clone()));
184        assert_eq!(res, Ok(VALUE));
185    }
186
187    #[test]
188    fn test_concurrent_access() {
189        const KEY: u64 = 1;
190        const VALUE: u64 = 2;
191        const DELAY: u64 = 10;
192
193        let cache = LruCache::<u64, PendingItem<u64, ()>>::with_capacity(1);
194        let verified = Arc::new(RwLock::new(cache));
195
196        // we will simulate 3 concurrent accesses to the same item
197        let item1 = FutureItem::new(KEY, verified.clone());
198        let item2 = FutureItem::new(KEY, verified.clone());
199        let item3 = FutureItem::new(KEY, verified.clone());
200
201        // request item once
202        let fut1 = async move { item1.await };
203
204        // request item, wait, then request again
205        let fut2 = async move {
206            let res2 = item2.await;
207            sleep(Duration::from_millis(2 * DELAY)).await;
208            let res3 = item3.await;
209            (res2, res3)
210        };
211
212        // wait, then provide item
213        let fut3 = async move {
214            sleep(Duration::from_millis(DELAY)).await;
215
216            verified
217                .write()
218                .entry(KEY)
219                .or_insert(PendingItem::pending())
220                .set(VALUE);
221        };
222
223        let runtime = Runtime::new().expect("Unable to create a runtime");
224        let (res1, (res2, res3), _) = runtime.block_on(join3(fut1, fut2, fut3));
225
226        assert_eq!(res1, Ok(VALUE));
227        assert_eq!(res2, Ok(VALUE));
228        assert_eq!(res3, Ok(VALUE));
229    }
230}