cfxcore/pos/protocol/request_manager/
mod.rsuse crate::{
    pos::protocol::sync_protocol::RpcResponse,
    sync::{Error, ProtocolConfiguration},
};
use cfx_parameters::sync::REQUEST_START_WAITING_TIME;
use diem_logger::prelude::diem_debug;
use futures::{channel::oneshot, future::Future};
use network::{node_table::NodeId, NetworkContext};
use parking_lot::Mutex;
pub use request_handler::{
    AsAny, Request, RequestHandler, RequestMessage, SynchronizationPeerRequest,
};
use std::{
    cmp::Ordering,
    collections::binary_heap::BinaryHeap,
    sync::Arc,
    time::{Duration, Instant},
};
pub mod request_handler;
#[derive(Debug)]
struct WaitingRequest(Box<dyn Request>, Duration); pub struct RequestManager {
    waiting_requests: Mutex<BinaryHeap<TimedWaitingRequest>>,
    request_handler: Arc<RequestHandler>,
}
impl RequestManager {
    pub fn new(protocol_config: &ProtocolConfiguration) -> Self {
        Self {
            waiting_requests: Default::default(),
            request_handler: Arc::new(RequestHandler::new(protocol_config)),
        }
    }
    pub async fn unary_rpc<'a>(
        &'a self, io: &'a dyn NetworkContext, recipient: Option<NodeId>,
        mut request: Box<dyn Request>,
    ) -> impl Future<Output = Result<Box<dyn RpcResponse>, Error>> + 'a {
        async move {
            let (res_tx, res_rx) = oneshot::channel();
            request.set_response_notification(res_tx);
            self.request_with_delay(io, request, recipient, None);
            let response = res_rx.await??;
            Ok(response)
        }
    }
    pub fn request_with_delay(
        &self, io: &dyn NetworkContext, mut request: Box<dyn Request>,
        peer: Option<NodeId>, delay: Option<Duration>,
    ) {
        let (cur_delay, next_delay) = match delay {
            Some(d) => (d, d + *REQUEST_START_WAITING_TIME),
            None => (*REQUEST_START_WAITING_TIME, *REQUEST_START_WAITING_TIME),
        };
        if peer.is_none() {
            request.notify_error(Error::RpcCancelledByDisconnection.into());
            return;
        }
        if delay.is_some() {
            diem_debug!("request_with_delay: add request to waiting_requests, peer={:?}, request={:?}, delay={:?}", peer, request, cur_delay);
            self.waiting_requests.lock().push(TimedWaitingRequest::new(
                Instant::now() + cur_delay,
                WaitingRequest(request, next_delay),
                peer.unwrap(),
            ));
            return;
        }
        if let Err(mut req) = self.request_handler.send_request(
            io,
            peer,
            request,
            Some(next_delay),
        ) {
            debug!("request_with_delay: send_request fails, peer={:?}, request={:?}", peer, req);
            req.notify_error(Error::RpcCancelledByDisconnection.into());
        }
    }
    pub fn match_request(
        &self, io: &dyn NetworkContext, peer_id: &NodeId, request_id: u64,
    ) -> Result<RequestMessage, Error> {
        self.request_handler.match_request(io, peer_id, request_id)
    }
    pub fn process_timeout_requests(&self, io: &dyn NetworkContext) {
        trace!("process_timeout_requests: start");
        let timeout_requests = self.request_handler.get_timeout_requests(io);
        for mut req in timeout_requests {
            debug!("Timeout requests: {:?}", req);
            req.request.notify_error(Error::RpcTimeout.into());
        }
    }
    pub fn resend_waiting_requests(&self, io: &dyn NetworkContext) {
        debug!("resend_waiting_requests: start");
        let mut waiting_requests = self.waiting_requests.lock();
        let now = Instant::now();
        while let Some(req) = waiting_requests.pop() {
            if req.time_to_send >= now {
                waiting_requests.push(req);
                break;
            }
            let chosen_peer = req.peer;
            debug!("Send waiting req {:?} to peer={}", req, chosen_peer);
            let WaitingRequest(request, delay) = req.request;
            let next_delay = delay + *REQUEST_START_WAITING_TIME;
            if let Err(mut req) = self.request_handler.send_request(
                io,
                Some(chosen_peer),
                request,
                Some(next_delay),
            ) {
                req.notify_error(Error::RpcCancelledByDisconnection.into());
            }
        }
    }
    pub fn on_peer_connected(&self, peer: &NodeId) {
        self.request_handler.add_peer(*peer);
    }
    pub fn on_peer_disconnected(
        &self, _io: &dyn NetworkContext, peer: &NodeId,
    ) {
        if let Some(unfinished_requests) =
            self.request_handler.remove_peer(peer)
        {
            for mut msg in unfinished_requests {
                msg.request
                    .notify_error(Error::RpcCancelledByDisconnection.into());
            }
        } else {
            debug!("Peer already removed form request manager when disconnected peer={}", peer);
        }
    }
}
#[derive(Debug)]
struct TimedWaitingRequest {
    time_to_send: Instant,
    request: WaitingRequest,
    peer: NodeId,
}
impl TimedWaitingRequest {
    fn new(
        time_to_send: Instant, request: WaitingRequest, peer: NodeId,
    ) -> Self {
        Self {
            time_to_send,
            request,
            peer,
        }
    }
}
impl Ord for TimedWaitingRequest {
    fn cmp(&self, other: &Self) -> Ordering {
        other.time_to_send.cmp(&self.time_to_send)
    }
}
impl PartialOrd for TimedWaitingRequest {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Eq for TimedWaitingRequest {}
impl PartialEq for TimedWaitingRequest {
    fn eq(&self, other: &Self) -> bool {
        self.time_to_send == other.time_to_send
    }
}