cfxcore/sync/message/
throttling.rs1use crate::{
6 message::{Message, MsgId, RequestId},
7 sync::{
8 message::{Context, Handleable},
9 Error,
10 },
11};
12use rlp_derive::{RlpDecodable, RlpEncodable};
13use std::time::{Duration, Instant};
14use throttling::token_bucket::ThrottleResult;
15
/// Notification that a message of type `msg_id` has been throttled.
///
/// `Throttle::throttle` in this file builds one (wrapped in
/// `Error::Throttled`) when a peer's token bucket reports
/// `ThrottleResult::Throttled`; the `Handleable` implementation in this file
/// processes one when it is received.
// NOTE(review): the derived RLP encoding presumably serializes the fields in
// declaration order -- confirm before reordering or inserting fields.
#[derive(Debug, RlpDecodable, RlpEncodable)]
pub struct Throttled {
    /// Identifier of the message type that was throttled.
    pub msg_id: MsgId,
    /// How long the message type stays throttled, in nanoseconds. The handler
    /// counts this from the moment the notification is handled.
    pub wait_time_nanos: u64,
    /// Identifier of the request that was throttled, if there is one. When
    /// present, the handler passes the matching request to
    /// `resend_request_to_another_peer`.
    pub request_id: Option<RequestId>,
}
27
28impl Handleable for Throttled {
29 fn handle(self, ctx: &Context) -> Result<(), Error> {
30 let peer = match ctx.manager.syn.peers.read().get(&ctx.node_id) {
31 Some(peer) => peer.clone(),
32 None => return Ok(()),
33 };
34
35 peer.write().throttled_msgs.set_throttled(
36 self.msg_id,
37 Instant::now() + Duration::from_nanos(self.wait_time_nanos),
38 );
39
40 if let Some(request_id) = self.request_id {
41 let request = ctx.match_request(request_id)?;
42 ctx.manager
43 .request_manager
44 .resend_request_to_another_peer(ctx.io, &request);
45 }
46
47 Ok(())
48 }
49}
50
/// Rate-limit check for a message, evaluated in the context of the peer
/// `ctx.node_id`.
///
/// A blanket implementation in this file provides it for every `Message`.
pub trait Throttle {
    /// Checks whether this message is allowed through for the peer in `ctx`.
    ///
    /// The blanket implementation returns `Ok(())` when the message is
    /// allowed (or when there is nothing to check it against), and an
    /// `Error::Throttled` / `Error::AlreadyThrottled` based error otherwise.
    fn throttle(&self, ctx: &Context) -> Result<(), Error>;
}
54
55impl<T: Message> Throttle for T {
56 fn throttle(&self, ctx: &Context) -> Result<(), Error> {
57 let peer = match ctx.manager.syn.peers.read().get(&ctx.node_id) {
58 Some(peer) => peer.clone(),
59 None => return Ok(()),
60 };
61
62 let bucket_name = self.msg_name().to_string();
63 let bucket = match peer.read().throttling.get(&bucket_name) {
64 Some(bucket) => bucket,
65 None => return Ok(()),
66 };
67
68 let (cpu_cost, message_size_cost) = self.throttle_token_cost();
69 let result = bucket.lock().throttle(cpu_cost, message_size_cost);
70
71 match result {
72 ThrottleResult::Success => Ok(()),
73 ThrottleResult::Throttled(wait_time) => {
74 let throttled = Throttled {
75 msg_id: self.msg_id(),
76 wait_time_nanos: wait_time.as_nanos() as u64,
77 request_id: self.get_request_id(),
78 };
79
80 Err(Error::Throttled(self.msg_name(), throttled).into())
81 }
82 ThrottleResult::AlreadyThrottled => {
83 Err(Error::AlreadyThrottled(self.msg_name()).into())
84 }
85 }
86 }
87}