use crate::{
message::{Message, MsgId, RequestId},
sync::{
message::{Context, Handleable},
Error,
},
};
use rlp_derive::{RlpDecodable, RlpEncodable};
use std::time::{Duration, Instant};
use throttling::token_bucket::ThrottleResult;
/// Payload describing a message that was rejected by a throttling token bucket.
///
/// Built by `Throttle::throttle` when the bucket reports
/// `ThrottleResult::Throttled`, and processed via `Handleable::handle`, which
/// records the throttle on the peer and re-dispatches the affected request.
///
/// NOTE(review): the derived RLP codec presumably encodes the fields in
/// declaration order -- confirm before reordering them.
#[derive(Debug, RlpDecodable, RlpEncodable)]
pub struct Throttled {
/// Id of the message that was throttled.
pub msg_id: MsgId,
/// Wait time in nanoseconds; the handler adds it to `Instant::now()` to get
/// the instant until which `msg_id` is treated as throttled.
pub wait_time_nanos: u64,
/// Request id of the throttled message, if it had one. When present, the
/// handler resends the matching request to another peer.
pub request_id: Option<RequestId>,
}
impl Handleable for Throttled {
    /// Processes a `Throttled` notification for the peer `ctx.node_id`.
    ///
    /// Marks `msg_id` as throttled on that peer until `wait_time_nanos` from
    /// now and, when a `request_id` is attached, hands the matching request to
    /// the request manager so it can be resent to another peer.
    ///
    /// Returns `Ok(())` without doing anything when the peer is not present in
    /// the peer table.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `ctx.match_request` when the attached
    /// request id cannot be matched.
    fn handle(self, ctx: &Context) -> Result<(), Error> {
        // Keep the peer-table read guard in its own scope so it is released
        // before the per-peer write lock is taken below.
        let peer = {
            let peers = ctx.manager.syn.peers.read();
            match peers.get(&ctx.node_id) {
                Some(state) => state.clone(),
                None => return Ok(()),
            }
        };

        // NOTE(review): `Instant + Duration` may panic if the result is not
        // representable; `wait_time_nanos` is decoded from the message, so
        // `checked_add` might be worth considering here.
        let throttled_until =
            Instant::now() + Duration::from_nanos(self.wait_time_nanos);
        peer.write()
            .throttled_msgs
            .set_throttled(self.msg_id, throttled_until);

        // Nothing to re-dispatch unless the throttled message was a request.
        let request_id = match self.request_id {
            Some(id) => id,
            None => return Ok(()),
        };

        let request = ctx.match_request(request_id)?;
        ctx.manager
            .request_manager
            .resend_request_to_another_peer(ctx.io, &request);

        Ok(())
    }
}
/// Throttling check for a message received from a peer.
///
/// This file provides a blanket implementation for every `T: Message`.
pub trait Throttle {
/// Checks this message against the throttling state reachable through `ctx`.
///
/// In the blanket implementation, `Ok(())` means the message may proceed,
/// and an `Err` carries `Error::Throttled` or `Error::AlreadyThrottled`.
fn throttle(&self, ctx: &Context) -> Result<(), Error>;
}
/// Blanket implementation: every `Message` can be checked against the token
/// bucket registered for its message name on the sending peer.
impl<T: Message> Throttle for T {
    /// Charges this message's token cost to the bucket registered under its
    /// message name for the peer `ctx.node_id`.
    ///
    /// Returns `Ok(())` when the peer is not in the peer table, when no bucket
    /// is registered for this message name, or when the bucket accepts the
    /// cost.
    ///
    /// # Errors
    ///
    /// * `Error::Throttled` - the bucket rejected the message; carries a
    ///   `Throttled` payload with the wait time reported by the bucket.
    /// * `Error::AlreadyThrottled` - the bucket reported `AlreadyThrottled`.
    fn throttle(&self, ctx: &Context) -> Result<(), Error> {
        let peer = match ctx.manager.syn.peers.read().get(&ctx.node_id) {
            Some(peer) => peer.clone(),
            None => return Ok(()),
        };

        let bucket_name = self.msg_name().to_string();
        let bucket = match peer.read().throttling.get(&bucket_name) {
            Some(bucket) => bucket,
            None => return Ok(()),
        };

        let (cpu_cost, message_size_cost) = self.throttle_token_cost();
        // Bind the outcome first so the temporary bucket lock guard is dropped
        // at the end of this statement, before the error value is built.
        let result = bucket.lock().throttle(cpu_cost, message_size_cost);

        match result {
            ThrottleResult::Success => Ok(()),
            ThrottleResult::Throttled(wait_time) => {
                // `as_nanos()` yields a `u128`; a bare `as u64` would silently
                // wrap for very long waits, so saturate at `u64::MAX` instead.
                let wait_time_nanos =
                    wait_time.as_nanos().min(u128::from(u64::MAX)) as u64;
                let throttled = Throttled {
                    msg_id: self.msg_id(),
                    wait_time_nanos,
                    request_id: self.get_request_id(),
                };
                Err(Error::Throttled(self.msg_name(), throttled).into())
            }
            ThrottleResult::AlreadyThrottled => {
                Err(Error::AlreadyThrottled(self.msg_name()).into())
            }
        }
    }
}