cfxcore/sync/message/
throttling.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    message::{Message, MsgId, RequestId},
7    sync::{
8        message::{Context, Handleable},
9        Error,
10    },
11};
12use rlp_derive::{RlpDecodable, RlpEncodable};
13use std::time::{Duration, Instant};
14use throttling::token_bucket::ThrottleResult;
15
// TODO: It seems better to distinguish requests from responses, and the
// TODO: different kinds of requests, because this type tries to resend the
// TODO: request to another peer. Throttling is implemented for every Message
// TODO: type, but the resend functionality applies only to AnyCast requests.
20#[derive(Debug, RlpDecodable, RlpEncodable)]
21pub struct Throttled {
22    pub msg_id: MsgId,
23    pub wait_time_nanos: u64,
24    // resend request to another peer if throttled
25    pub request_id: Option<RequestId>,
26}
27
28impl Handleable for Throttled {
29    fn handle(self, ctx: &Context) -> Result<(), Error> {
30        let peer = match ctx.manager.syn.peers.read().get(&ctx.node_id) {
31            Some(peer) => peer.clone(),
32            None => return Ok(()),
33        };
34
35        peer.write().throttled_msgs.set_throttled(
36            self.msg_id,
37            Instant::now() + Duration::from_nanos(self.wait_time_nanos),
38        );
39
40        if let Some(request_id) = self.request_id {
41            let request = ctx.match_request(request_id)?;
42            ctx.manager
43                .request_manager
44                .resend_request_to_another_peer(ctx.io, &request);
45        }
46
47        Ok(())
48    }
49}
50
51pub trait Throttle {
52    fn throttle(&self, ctx: &Context) -> Result<(), Error>;
53}
54
55impl<T: Message> Throttle for T {
56    fn throttle(&self, ctx: &Context) -> Result<(), Error> {
57        let peer = match ctx.manager.syn.peers.read().get(&ctx.node_id) {
58            Some(peer) => peer.clone(),
59            None => return Ok(()),
60        };
61
62        let bucket_name = self.msg_name().to_string();
63        let bucket = match peer.read().throttling.get(&bucket_name) {
64            Some(bucket) => bucket,
65            None => return Ok(()),
66        };
67
68        let (cpu_cost, message_size_cost) = self.throttle_token_cost();
69        let result = bucket.lock().throttle(cpu_cost, message_size_cost);
70
71        match result {
72            ThrottleResult::Success => Ok(()),
73            ThrottleResult::Throttled(wait_time) => {
74                let throttled = Throttled {
75                    msg_id: self.msg_id(),
76                    wait_time_nanos: wait_time.as_nanos() as u64,
77                    request_id: self.get_request_id(),
78                };
79
80                Err(Error::Throttled(self.msg_name(), throttled).into())
81            }
82            ThrottleResult::AlreadyThrottled => {
83                Err(Error::AlreadyThrottled(self.msg_name()).into())
84            }
85        }
86    }
87}