use lru_time_cache::LruCache;
use parking_lot::RwLock;
use std::{
future::Future,
hash::Hash,
pin::Pin,
sync::Arc,
task::{Context, Poll, Waker},
};
/// State of a value that may not have been produced yet.
///
/// An entry starts as [`PendingItem::Pending`], collecting the wakers of the
/// tasks that polled it, and is later resolved to [`PendingItem::Ready`] or
/// [`PendingItem::Error`].
pub enum PendingItem<Item, Err> {
    /// The value is available.
    Ready(Item),
    /// No value yet; holds the wakers registered by polls so far.
    Pending(Vec<Waker>),
    /// Producing the value failed with this error.
    Error(Err),
}

impl<Item, Err> PendingItem<Item, Err> {
    /// Creates an unresolved entry with no registered wakers.
    pub fn pending() -> Self {
        Self::Pending(Vec::new())
    }

    /// Creates an entry that is already resolved to `item`.
    pub fn ready(item: Item) -> Self {
        Self::Ready(item)
    }

    /// Resets an `Error` entry back to an empty `Pending` one.
    ///
    /// `Ready` and `Pending` entries are left untouched.
    pub fn clear_error(&mut self) {
        if matches!(self, Self::Error(_)) {
            *self = Self::pending();
        }
    }

    /// Resolves the entry to `item` and wakes every registered waker.
    ///
    /// A previous `Error` is overwritten. If the entry is already `Ready`,
    /// the existing value is kept and `item` is dropped.
    pub fn set(&mut self, item: Item) {
        self.resolve(Self::Ready(item));
    }

    /// Resolves the entry to `err` and wakes every registered waker.
    ///
    /// A previous `Error` is overwritten. If the entry is already `Ready`,
    /// the existing value is kept and `err` is dropped.
    pub fn set_error(&mut self, err: Err) {
        self.resolve(Self::Error(err));
    }

    /// Shared transition behind `set` and `set_error`.
    ///
    /// Replaces any non-`Ready` state with `resolved`, then wakes the wakers
    /// the old state was holding (if it was `Pending`).
    fn resolve(&mut self, resolved: Self) {
        if matches!(self, Self::Ready(_)) {
            // A ready value is never overwritten.
            return;
        }
        // Install the new state *before* waking, so a task that is polled as
        // soon as it is woken already observes the resolved entry.
        if let Self::Pending(wakers) = std::mem::replace(self, resolved) {
            for waker in wakers {
                waker.wake();
            }
        }
    }
}

impl<Item: Clone, Err: Clone> PendingItem<Item, Err> {
    /// Returns a clone of the resolved value or error, or registers the
    /// caller's waker and returns `Poll::Pending`.
    fn poll(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Item, Err>> {
        match self {
            Self::Ready(item) => Poll::Ready(Ok(item.clone())),
            Self::Pending(wakers) => {
                let waker = ctx.waker();
                // A task may poll many times before the entry resolves;
                // storing one waker per task keeps the list from growing on
                // every poll and avoids redundant wake-ups. `will_wake` is
                // best-effort, so at worst a duplicate is still stored.
                if !wakers.iter().any(|known| known.will_wake(waker)) {
                    wakers.push(waker.clone());
                }
                Poll::Pending
            }
            Self::Error(e) => Poll::Ready(Err(e.clone())),
        }
    }
}
/// A future that completes once the shared cache holds a result for `key`.
///
/// The future stores no result of its own: every poll looks the key up in the
/// shared cache and reports whatever state the entry is in.
pub struct FutureItem<K, V, E> {
    /// Cache key whose entry this future waits on.
    key: K,
    /// Shared, lock-protected cache holding the per-key state.
    verified: Arc<RwLock<LruCache<K, PendingItem<V, E>>>>,
}

impl<K, V, E> FutureItem<K, V, E> {
    /// Creates a future bound to `key` in the shared `verified` cache.
    ///
    /// The cache is not touched here; the entry is only created (if missing)
    /// when the future is polled.
    pub fn new(
        key: K, verified: Arc<RwLock<LruCache<K, PendingItem<V, E>>>>,
    ) -> Self {
        Self { key, verified }
    }
}

impl<K, V, E> Future for FutureItem<K, V, E>
where
    K: Clone + Eq + Hash + Ord,
    V: Clone,
    E: Clone,
{
    type Output = Result<V, E>;

    /// Polls the cache entry for this future's key.
    ///
    /// Takes the cache's write lock for the duration of the call, inserts an
    /// empty pending entry if the key is absent, and delegates to that entry:
    /// a resolved entry yields a clone of its value or error, a pending one
    /// records the caller's waker.
    ///
    /// NOTE(review): presumably a pending entry evicted by the LRU cache
    /// takes its registered wakers with it — confirm the cache is sized so
    /// that cannot strand a waiting task.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        let mut cache = self.verified.write();
        let slot = cache
            .entry(self.key.clone())
            .or_insert(PendingItem::pending());
        slot.poll(ctx)
    }
}
#[cfg(test)]
mod tests {
    use super::{FutureItem, PendingItem};
    use futures::future::join3;
    use lru_time_cache::LruCache;
    use parking_lot::RwLock;
    use std::{sync::Arc, time::Duration};
    use tokio::{runtime::Runtime, time::sleep};

    /// Shorthand for the shared cache type exercised by these tests.
    type Shared<E> = Arc<RwLock<LruCache<u64, PendingItem<u64, E>>>>;

    /// Builds a shared cache with room for a single entry.
    fn single_entry_cache<E>() -> Shared<E> {
        Arc::new(RwLock::new(LruCache::with_capacity(1)))
    }

    /// Checks how `set` and `set_error` interact on one entry: an error can
    /// be replaced by a value, but a value is never replaced by an error.
    #[test]
    fn test_set() {
        const KEY: u64 = 1;
        const VALUE: u64 = 2;
        const ERROR: u64 = 3;

        let verified: Shared<u64> = single_entry_cache();
        let runtime = Runtime::new().expect("Unable to create a runtime");
        // Drives a fresh future for KEY to completion.
        let resolve =
            || runtime.block_on(FutureItem::new(KEY, verified.clone()));

        // An error recorded on a pending entry is what the future yields.
        // The lock guard is scoped so it is released before the future polls.
        {
            let mut cache = verified.write();
            cache
                .entry(KEY)
                .or_insert(PendingItem::pending())
                .set_error(ERROR);
        }
        assert_eq!(resolve(), Err(ERROR));

        // A value replaces a previously recorded error.
        {
            let mut cache = verified.write();
            cache
                .entry(KEY)
                .or_insert(PendingItem::pending())
                .set(VALUE);
        }
        assert_eq!(resolve(), Ok(VALUE));

        // Once a value is stored, a later error is ignored.
        {
            let mut cache = verified.write();
            cache
                .entry(KEY)
                .or_insert(PendingItem::pending())
                .set_error(ERROR);
        }
        assert_eq!(resolve(), Ok(VALUE));
    }

    /// Checks that futures waiting on the same key all observe the value,
    /// whether they are awaited before or after it is stored.
    #[test]
    fn test_concurrent_access() {
        const KEY: u64 = 1;
        const VALUE: u64 = 2;
        const DELAY: u64 = 10;

        let verified: Shared<()> = single_entry_cache();

        let early_a = FutureItem::new(KEY, verified.clone());
        let early_b = FutureItem::new(KEY, verified.clone());
        let late = FutureItem::new(KEY, verified.clone());

        // Waits on the key from the start.
        let waiter = async move { early_a.await };

        // Waits on the key from the start, then awaits a second future only
        // after a delay twice as long as the producer's.
        let waiter_then_late = async move {
            let before = early_b.await;
            sleep(Duration::from_millis(2 * DELAY)).await;
            let after = late.await;
            (before, after)
        };

        // Stores the value after a short delay.
        let producer = async move {
            sleep(Duration::from_millis(DELAY)).await;
            verified
                .write()
                .entry(KEY)
                .or_insert(PendingItem::pending())
                .set(VALUE);
        };

        let runtime = Runtime::new().expect("Unable to create a runtime");
        let (res1, (res2, res3), _) =
            runtime.block_on(join3(waiter, waiter_then_late, producer));

        assert_eq!(res1, Ok(VALUE));
        assert_eq!(res2, Ok(VALUE));
        assert_eq!(res3, Ok(VALUE));
    }
}