cfxcore/sync/message/
throttling.rsuse crate::{
    message::{Message, MsgId, RequestId},
    sync::{
        message::{Context, Handleable},
        Error,
    },
};
use rlp_derive::{RlpDecodable, RlpEncodable};
use std::time::{Duration, Instant};
use throttling::token_bucket::ThrottleResult;
#[derive(Debug, RlpDecodable, RlpEncodable)]
pub struct Throttled {
    pub msg_id: MsgId,
    pub wait_time_nanos: u64,
    pub request_id: Option<RequestId>,
}
impl Handleable for Throttled {
    fn handle(self, ctx: &Context) -> Result<(), Error> {
        let peer = match ctx.manager.syn.peers.read().get(&ctx.node_id) {
            Some(peer) => peer.clone(),
            None => return Ok(()),
        };
        peer.write().throttled_msgs.set_throttled(
            self.msg_id,
            Instant::now() + Duration::from_nanos(self.wait_time_nanos),
        );
        if let Some(request_id) = self.request_id {
            let request = ctx.match_request(request_id)?;
            ctx.manager
                .request_manager
                .resend_request_to_another_peer(ctx.io, &request);
        }
        Ok(())
    }
}
pub trait Throttle {
    fn throttle(&self, ctx: &Context) -> Result<(), Error>;
}
impl<T: Message> Throttle for T {
    fn throttle(&self, ctx: &Context) -> Result<(), Error> {
        let peer = match ctx.manager.syn.peers.read().get(&ctx.node_id) {
            Some(peer) => peer.clone(),
            None => return Ok(()),
        };
        let bucket_name = self.msg_name().to_string();
        let bucket = match peer.read().throttling.get(&bucket_name) {
            Some(bucket) => bucket,
            None => return Ok(()),
        };
        let (cpu_cost, message_size_cost) = self.throttle_token_cost();
        let result = bucket.lock().throttle(cpu_cost, message_size_cost);
        match result {
            ThrottleResult::Success => Ok(()),
            ThrottleResult::Throttled(wait_time) => {
                let throttled = Throttled {
                    msg_id: self.msg_id(),
                    wait_time_nanos: wait_time.as_nanos() as u64,
                    request_id: self.get_request_id(),
                };
                Err(Error::Throttled(self.msg_name(), throttled).into())
            }
            ThrottleResult::AlreadyThrottled => {
                Err(Error::AlreadyThrottled(self.msg_name()).into())
            }
        }
    }
}