1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use cfx_rpc_utils::error::{
    jsonrpc_error_helpers::request_rejected_too_many_request_error,
    jsonrpsee_error_helpers::jsonrpc_error_to_error_object_owned,
};
use cfx_util_macros::bail;
use futures::FutureExt;
use futures_util::future::BoxFuture;
use jsonrpsee::{
    core::RpcResult,
    server::{middleware::rpc::RpcServiceT, MethodResponse},
};
use jsonrpsee_types::Request;
use log::debug;
use throttling::token_bucket::{ThrottleResult, TokenBucketManager};

#[derive(Clone)]
pub struct Throttle<S> {
    service: S,
    manager: TokenBucketManager,
}

impl<S> Throttle<S> {
    pub fn new(file: Option<&str>, section: &str, s: S) -> Self {
        let manager = match file {
            Some(file) => TokenBucketManager::load(file, Some(section))
                .expect("invalid throttling configuration file"),
            None => TokenBucketManager::default(),
        };

        Throttle {
            service: s,
            manager,
        }
    }

    pub fn before(&self, name: &String) -> RpcResult<()> {
        let bucket = match self.manager.get(name) {
            Some(bucket) => bucket,
            None => return Ok(()),
        };

        let result = bucket.lock().throttle_default();

        match result {
            ThrottleResult::Success => Ok(()),
            ThrottleResult::Throttled(wait_time) => {
                debug!("RPC {} throttled in {:?}", name, wait_time);
                let err = request_rejected_too_many_request_error(Some(
                    format!("throttled in {:?}", wait_time),
                ));
                bail!(jsonrpc_error_to_error_object_owned(err))
            }
            ThrottleResult::AlreadyThrottled => {
                debug!("RPC {} already throttled", name);
                let err = request_rejected_too_many_request_error(Some(
                    "already throttled, please try again later".into(),
                ));
                bail!(jsonrpc_error_to_error_object_owned(err))
            }
        }
    }
}

impl<'a, S> RpcServiceT<'a> for Throttle<S>
where S: RpcServiceT<'a> + Send + Sync + Clone + 'static
{
    type Future = BoxFuture<'a, MethodResponse>;

    fn call(&self, req: Request<'a>) -> Self::Future {
        let service = self.service.clone();
        let throlltle_result = self.before(&req.method_name().to_string());
        // Box::pin(async move { service.call(req).await })
        match throlltle_result {
            Ok(_) => {
                debug!("throttle interceptor: method `{}` success", req.method);
                Box::pin(async move { service.call(req).await }).boxed()
            }
            Err(e) => {
                debug!("throttle interceptor: method `{}` failed", req.method);
                Box::pin(async move { MethodResponse::error(req.id, e) })
                    .boxed()
            }
        }
    }
}