cfxcore/light_protocol/handler/sync/common/
future_item.rs1use lru_time_cache::LruCache;
6use parking_lot::RwLock;
7use std::{
8 future::Future,
9 hash::Hash,
10 pin::Pin,
11 sync::Arc,
12 task::{Context, Poll, Waker},
13};
14
/// Resolution state of a value that one or more tasks may be waiting on.
pub enum PendingItem<Item, Err> {
    /// The value is available.
    Ready(Item),
    /// The value has not been resolved yet; holds the wakers of the tasks
    /// that polled it so far.
    Pending(Vec<Waker>),
    /// Resolution failed with the contained error.
    Error(Err),
}
20
21impl<Item, Err> PendingItem<Item, Err> {
22 pub fn pending() -> Self { Self::Pending(vec![]) }
23
24 pub fn ready(item: Item) -> Self { Self::Ready(item) }
25
26 pub fn clear_error(&mut self) {
27 if let Self::Error(_) = self {
28 *self = Self::pending();
29 }
30 }
31
32 pub fn set(&mut self, item: Item) {
34 match self {
35 Self::Ready(_old) => {
36 }
40 Self::Pending(ws) => {
41 let ws = std::mem::replace(ws, Vec::<Waker>::new());
43
44 *self = Self::Ready(item);
46
47 for w in ws {
49 w.wake();
50 }
51 }
52 Self::Error(_) => {
53 *self = Self::Ready(item);
58 }
59 }
60 }
61
62 pub fn set_error(&mut self, err: Err) {
64 match self {
65 Self::Ready(_) => {
66 }
69 Self::Pending(ws) => {
70 let ws = std::mem::replace(ws, Vec::<Waker>::new());
72
73 *self = Self::Error(err);
75
76 for w in ws {
78 w.wake();
79 }
80 }
81 Self::Error(_) => {
82 *self = Self::Error(err);
83 }
84 }
85 }
86}
87
88impl<Item: Clone, Err: Clone> PendingItem<Item, Err> {
89 fn poll(&mut self, ctx: &mut Context) -> Poll<Result<Item, Err>> {
91 match self {
92 Self::Ready(item) => Poll::Ready(Ok(item.clone())),
93 Self::Pending(ws) => {
94 ws.push(ctx.waker().clone());
96 Poll::Pending
97 }
98 Self::Error(e) => Poll::Ready(Err(e.clone())),
99 }
100 }
101}
102
103pub struct FutureItem<K, V, E> {
104 key: K,
105 verified: Arc<RwLock<LruCache<K, PendingItem<V, E>>>>,
106}
107
108impl<K, V, E> FutureItem<K, V, E> {
109 pub fn new(
110 key: K, verified: Arc<RwLock<LruCache<K, PendingItem<V, E>>>>,
111 ) -> FutureItem<K, V, E> {
112 FutureItem { key, verified }
113 }
114}
115
116impl<K, V, E> Future for FutureItem<K, V, E>
117where
118 K: Clone + Eq + Hash + Ord,
119 V: Clone,
120 E: Clone,
121{
122 type Output = Result<V, E>;
123
124 fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
125 self.verified
126 .write()
127 .entry(self.key.clone())
128 .or_insert(PendingItem::pending())
129 .poll(ctx)
130 }
131}
132
#[cfg(test)]
mod tests {
    use super::{FutureItem, PendingItem};
    use futures::future::join3;
    use lru_time_cache::LruCache;
    use parking_lot::RwLock;
    use std::{sync::Arc, time::Duration};
    use tokio::{runtime::Runtime, time::sleep};

    /// Shared cache shape used by all tests: `u64` keys and values, error
    /// type chosen per test.
    type SharedCache<E> = Arc<RwLock<LruCache<u64, PendingItem<u64, E>>>>;

    /// Creates an empty shared cache with room for a single entry.
    fn shared_cache<E>() -> SharedCache<E> {
        Arc::new(RwLock::new(LruCache::with_capacity(1)))
    }

    /// Resolves `key` to `value`, creating a pending entry first if needed.
    fn resolve<E>(cache: &SharedCache<E>, key: u64, value: u64) {
        cache
            .write()
            .entry(key)
            .or_insert(PendingItem::pending())
            .set(value);
    }

    /// Fails `key` with `err`, creating a pending entry first if needed.
    fn fail<E>(cache: &SharedCache<E>, key: u64, err: E) {
        cache
            .write()
            .entry(key)
            .or_insert(PendingItem::pending())
            .set_error(err);
    }

    /// An error is reported until a value is set; afterwards the value
    /// sticks, even if another error is reported.
    #[test]
    fn test_set() {
        const KEY: u64 = 1;
        const VALUE: u64 = 2;
        const ERROR: u64 = 3;

        let verified = shared_cache::<u64>();
        let runtime = Runtime::new().expect("Unable to create a runtime");

        // error on a fresh entry is observed by the future
        fail(&verified, KEY, ERROR);
        let outcome =
            runtime.block_on(FutureItem::new(KEY, verified.clone()));
        assert_eq!(outcome, Err(ERROR));

        // a value replaces the error
        resolve(&verified, KEY, VALUE);
        let outcome =
            runtime.block_on(FutureItem::new(KEY, verified.clone()));
        assert_eq!(outcome, Ok(VALUE));

        // a later error does not override the value
        fail(&verified, KEY, ERROR);
        let outcome =
            runtime.block_on(FutureItem::new(KEY, verified.clone()));
        assert_eq!(outcome, Ok(VALUE));
    }

    /// Futures created before, and awaited before as well as after, the
    /// value is set all resolve to that value.
    #[test]
    fn test_concurrent_access() {
        const KEY: u64 = 1;
        const VALUE: u64 = 2;
        const DELAY: u64 = 10;

        let verified = shared_cache::<()>();

        let early = FutureItem::new(KEY, verified.clone());
        let before_set = FutureItem::new(KEY, verified.clone());
        let after_set = FutureItem::new(KEY, verified.clone());

        // waits for the value right away
        let waiter = async move { early.await };

        // waits once before the value is set and once well after it
        let double_waiter = async move {
            let first = before_set.await;
            sleep(Duration::from_millis(2 * DELAY)).await;
            let second = after_set.await;
            (first, second)
        };

        // sets the value after a short delay
        let setter = async move {
            sleep(Duration::from_millis(DELAY)).await;
            resolve(&verified, KEY, VALUE);
        };

        let runtime = Runtime::new().expect("Unable to create a runtime");
        let (res1, (res2, res3), _) =
            runtime.block_on(join3(waiter, double_waiter, setter));

        assert_eq!(res1, Ok(VALUE));
        assert_eq!(res2, Ok(VALUE));
        assert_eq!(res3, Ok(VALUE));
    }
}