cfxcore/light_protocol/handler/sync/common/
future_item.rsuse lru_time_cache::LruCache;
use parking_lot::RwLock;
use std::{
    future::Future,
    hash::Hash,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Waker},
};
pub enum PendingItem<Item, Err> {
    Ready(Item),
    Pending(Vec<Waker>),
    Error(Err),
}
impl<Item, Err> PendingItem<Item, Err> {
    pub fn pending() -> Self { Self::Pending(vec![]) }
    pub fn ready(item: Item) -> Self { Self::Ready(item) }
    pub fn clear_error(&mut self) {
        if let Self::Error(_) = self {
            *self = Self::pending();
        }
    }
    pub fn set(&mut self, item: Item) {
        match self {
            Self::Ready(_old) => {
                }
            Self::Pending(ws) => {
                let ws = std::mem::replace(ws, Vec::<Waker>::new());
                *self = Self::Ready(item);
                for w in ws {
                    w.wake();
                }
            }
            Self::Error(_) => {
                *self = Self::Ready(item);
            }
        }
    }
    pub fn set_error(&mut self, err: Err) {
        match self {
            Self::Ready(_) => {
                }
            Self::Pending(ws) => {
                let ws = std::mem::replace(ws, Vec::<Waker>::new());
                *self = Self::Error(err);
                for w in ws {
                    w.wake();
                }
            }
            Self::Error(_) => {
                *self = Self::Error(err);
            }
        }
    }
}
impl<Item: Clone, Err: Clone> PendingItem<Item, Err> {
    fn poll(&mut self, ctx: &mut Context) -> Poll<Result<Item, Err>> {
        match self {
            Self::Ready(item) => Poll::Ready(Ok(item.clone())),
            Self::Pending(ws) => {
                ws.push(ctx.waker().clone());
                Poll::Pending
            }
            Self::Error(e) => Poll::Ready(Err(e.clone())),
        }
    }
}
pub struct FutureItem<K, V, E> {
    key: K,
    verified: Arc<RwLock<LruCache<K, PendingItem<V, E>>>>,
}
impl<K, V, E> FutureItem<K, V, E> {
    pub fn new(
        key: K, verified: Arc<RwLock<LruCache<K, PendingItem<V, E>>>>,
    ) -> FutureItem<K, V, E> {
        FutureItem { key, verified }
    }
}
impl<K, V, E> Future for FutureItem<K, V, E>
where
    K: Clone + Eq + Hash + Ord,
    V: Clone,
    E: Clone,
{
    type Output = Result<V, E>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        self.verified
            .write()
            .entry(self.key.clone())
            .or_insert(PendingItem::pending())
            .poll(ctx)
    }
}
#[cfg(test)]
mod tests {
    use super::{FutureItem, PendingItem};
    use futures::future::join3;
    use lru_time_cache::LruCache;
    use parking_lot::RwLock;
    use std::{sync::Arc, time::Duration};
    use tokio::{runtime::Runtime, time::sleep};
    #[test]
    fn test_set() {
        const KEY: u64 = 1;
        const VALUE: u64 = 2;
        const ERROR: u64 = 3;
        let cache = LruCache::<u64, PendingItem<u64, u64>>::with_capacity(1);
        let verified = Arc::new(RwLock::new(cache));
        let runtime = Runtime::new().expect("Unable to create a runtime");
        verified
            .write()
            .entry(KEY)
            .or_insert(PendingItem::pending())
            .set_error(ERROR);
        let res = runtime.block_on(FutureItem::new(KEY, verified.clone()));
        assert_eq!(res, Err(ERROR));
        verified
            .write()
            .entry(KEY)
            .or_insert(PendingItem::pending())
            .set(VALUE);
        let res = runtime.block_on(FutureItem::new(KEY, verified.clone()));
        assert_eq!(res, Ok(VALUE));
        verified
            .write()
            .entry(KEY)
            .or_insert(PendingItem::pending())
            .set_error(ERROR);
        let res = runtime.block_on(FutureItem::new(KEY, verified.clone()));
        assert_eq!(res, Ok(VALUE));
    }
    #[test]
    fn test_concurrent_access() {
        const KEY: u64 = 1;
        const VALUE: u64 = 2;
        const DELAY: u64 = 10;
        let cache = LruCache::<u64, PendingItem<u64, ()>>::with_capacity(1);
        let verified = Arc::new(RwLock::new(cache));
        let item1 = FutureItem::new(KEY, verified.clone());
        let item2 = FutureItem::new(KEY, verified.clone());
        let item3 = FutureItem::new(KEY, verified.clone());
        let fut1 = async move { item1.await };
        let fut2 = async move {
            let res2 = item2.await;
            sleep(Duration::from_millis(2 * DELAY)).await;
            let res3 = item3.await;
            (res2, res3)
        };
        let fut3 = async move {
            sleep(Duration::from_millis(DELAY)).await;
            verified
                .write()
                .entry(KEY)
                .or_insert(PendingItem::pending())
                .set(VALUE);
        };
        let runtime = Runtime::new().expect("Unable to create a runtime");
        let (res1, (res2, res3), _) = runtime.block_on(join3(fut1, fut2, fut3));
        assert_eq!(res1, Ok(VALUE));
        assert_eq!(res2, Ok(VALUE));
        assert_eq!(res3, Ok(VALUE));
    }
}