client/rpc/
mod.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use cfx_rpc_builder::{
6    RpcModuleBuilder, RpcServerConfig, RpcServerHandle,
7    TransportRpcModuleConfig,
8};
9use cfx_tasks::TaskExecutor;
10use cfxcore::{
11    Notifications, SharedConsensusGraph, SharedSynchronizationService,
12    SharedTransactionPool,
13};
14use jsonrpc_core::{MetaIoHandler, RemoteProcedure, Value};
15use jsonrpc_http_server::{
16    Server as HttpServer, ServerBuilder as HttpServerBuilder,
17};
18use jsonrpc_tcp_server::{
19    MetaExtractor as TpcMetaExtractor, Server as TcpServer,
20    ServerBuilder as TcpServerBuilder,
21};
22use jsonrpc_ws_server::{
23    MetaExtractor as WsMetaExtractor, Server as WsServer,
24    ServerBuilder as WsServerBuilder,
25};
26pub use jsonrpsee::server::ServerBuilder;
27use log::{info, warn};
28use std::sync::Arc;
29
30mod authcodes;
31pub mod errors;
32pub mod extractor;
33mod helpers;
34mod http_common;
35pub mod impls;
36pub mod informant;
37mod interceptor;
38pub mod metadata;
39mod traits;
40pub mod types;
41
42pub use cfxcore::errors::{
43    BoxFuture as CoreBoxFuture, Error as CoreError, Result as CoreResult,
44};
45pub use errors::{error_codes, invalid_params};
46
47use self::{
48    impls::{
49        cfx::{CfxHandler, LocalRpcImpl, RpcImpl, TestRpcImpl, TraceHandler},
50        cfx_filter::CfxFilterClient,
51        common::RpcImpl as CommonImpl,
52        light::{
53            CfxHandler as LightCfxHandler, DebugRpcImpl as LightDebugRpcImpl,
54            RpcImpl as LightImpl, TestRpcImpl as LightTestRpcImpl,
55        },
56        pool::TransactionPoolHandler,
57        pos::{PoSInterceptor, PosHandler},
58        pubsub::PubSubClient,
59    },
60    traits::{
61        cfx::Cfx, cfx_filter::CfxFilter, debug::LocalRpc,
62        pool::TransactionPool, pos::Pos, pubsub::PubSub, test::TestRpc,
63        trace::Trace,
64    },
65};
66
67pub use self::types::{Block as RpcBlock, Origin};
68use crate::{
69    configuration::Configuration,
70    rpc::{
71        impls::RpcImplConfiguration,
72        interceptor::{RpcInterceptor, RpcProxy},
73    },
74};
75pub use cfx_config::rpc_server_config::{
76    HttpConfiguration, TcpConfiguration, WsConfiguration,
77};
78use cfx_rpc_cfx_types::apis::{Api, ApiSet};
79use interceptor::{MetricsInterceptor, ThrottleInterceptor};
80pub use metadata::Metadata;
81use std::collections::HashSet;
82
83pub fn setup_public_rpc_apis(
84    common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
85    conf: &Configuration,
86) -> MetaIoHandler<Metadata> {
87    setup_rpc_apis(
88        common,
89        rpc,
90        pubsub,
91        &conf.raw_conf.throttling_conf,
92        "rpc",
93        conf.raw_conf.public_rpc_apis.list_apis(),
94    )
95}
96
97pub fn setup_debug_rpc_apis(
98    common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
99    conf: &Configuration,
100) -> MetaIoHandler<Metadata> {
101    setup_rpc_apis(
102        common,
103        rpc,
104        pubsub,
105        &conf.raw_conf.throttling_conf,
106        "rpc_local",
107        ApiSet::All.list_apis(),
108    )
109}
110
111fn setup_rpc_apis(
112    common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
113    throttling_conf: &Option<String>, throttling_section: &str,
114    apis: HashSet<Api>,
115) -> MetaIoHandler<Metadata> {
116    let mut handler = MetaIoHandler::default();
117    for api in &apis {
118        match api {
119            Api::Cfx => {
120                let cfx =
121                    CfxHandler::new(common.clone(), rpc.clone()).to_delegate();
122                extend_with_interceptor(
123                    &mut handler,
124                    &rpc.config,
125                    cfx,
126                    throttling_conf,
127                    throttling_section,
128                );
129
130                if let Some(poll_lifetime) = rpc.config.poll_lifetime_in_seconds
131                {
132                    if let Some(h) = pubsub.handler().upgrade() {
133                        let filter_client = CfxFilterClient::new(
134                            rpc.consensus.clone(),
135                            rpc.tx_pool.clone(),
136                            pubsub.epochs_ordered(),
137                            pubsub.executor.clone(),
138                            poll_lifetime,
139                            rpc.config.get_logs_filter_max_limit,
140                            h.network.clone(),
141                        )
142                        .to_delegate();
143
144                        extend_with_interceptor(
145                            &mut handler,
146                            &rpc.config,
147                            filter_client,
148                            throttling_conf,
149                            throttling_section,
150                        );
151                    }
152                }
153            }
154            Api::Debug => {
155                handler.extend_with(
156                    LocalRpcImpl::new(common.clone(), rpc.clone())
157                        .to_delegate(),
158                );
159            }
160            Api::Pubsub => {
161                extend_with_interceptor(
162                    &mut handler,
163                    &rpc.config,
164                    pubsub.clone().to_delegate(),
165                    throttling_conf,
166                    throttling_section,
167                );
168            }
169            Api::Test => {
170                handler.extend_with(
171                    TestRpcImpl::new(common.clone(), rpc.clone()).to_delegate(),
172                );
173            }
174            Api::Trace => {
175                let trace = TraceHandler::new(
176                    *rpc.sync.network.get_network_type(),
177                    rpc.consensus.clone(),
178                )
179                .to_delegate();
180                extend_with_interceptor(
181                    &mut handler,
182                    &rpc.config,
183                    trace,
184                    throttling_conf,
185                    throttling_section,
186                );
187            }
188            Api::TxPool => {
189                let txpool =
190                    TransactionPoolHandler::new(common.clone()).to_delegate();
191                extend_with_interceptor(
192                    &mut handler,
193                    &rpc.config,
194                    txpool,
195                    throttling_conf,
196                    throttling_section,
197                );
198            }
199            Api::Pos => {
200                let pos = PosHandler::new(
201                    common.pos_handler.clone(),
202                    rpc.consensus.data_manager().clone(),
203                    *rpc.sync.network.get_network_type(),
204                    rpc.consensus.clone(),
205                )
206                .to_delegate();
207                let pos_interceptor =
208                    PoSInterceptor::new(common.pos_handler.clone());
209                handler.extend_with(RpcProxy::new(pos, pos_interceptor));
210            }
211        }
212    }
213
214    add_meta_rpc_methods(handler, apis)
215}
216
217pub fn extend_with_interceptor<
218    T: IntoIterator<Item = (String, RemoteProcedure<Metadata>)>,
219>(
220    handler: &mut MetaIoHandler<Metadata>, rpc_conf: &RpcImplConfiguration,
221    rpc_impl: T, throttling_conf: &Option<String>, throttling_section: &str,
222) {
223    let interceptor =
224        ThrottleInterceptor::new(throttling_conf, throttling_section);
225    if rpc_conf.enable_metrics {
226        handler.extend_with(RpcProxy::new(
227            rpc_impl,
228            MetricsInterceptor::new(interceptor),
229        ));
230    } else {
231        handler.extend_with(RpcProxy::new(rpc_impl, interceptor));
232    }
233}
234
235fn add_meta_rpc_methods(
236    mut handler: MetaIoHandler<Metadata>, apis: HashSet<Api>,
237) -> MetaIoHandler<Metadata> {
238    // rpc_methods to return all available methods
239    let methods: Vec<String> =
240        handler.iter().map(|(method, _)| method).cloned().collect();
241    handler.add_sync_method("rpc_methods", move |_| {
242        let method_list = methods
243            .clone()
244            .iter()
245            .map(|m| Value::String(m.to_string()))
246            .collect();
247        Ok(Value::Array(method_list))
248    });
249
250    // rpc_modules
251    let namespaces: Vec<String> =
252        apis.into_iter().map(|item| format!("{}", item)).collect();
253    handler.add_sync_method("rpc_modules", move |_| {
254        let ns = namespaces
255            .clone()
256            .iter()
257            .map(|m| Value::String(m.to_string()))
258            .collect();
259        Ok(Value::Array(ns))
260    });
261
262    handler
263}
264
265pub fn setup_public_rpc_apis_light(
266    common: Arc<CommonImpl>, rpc: Arc<LightImpl>, pubsub: PubSubClient,
267    conf: &Configuration,
268) -> MetaIoHandler<Metadata> {
269    setup_rpc_apis_light(
270        common,
271        rpc,
272        pubsub,
273        &conf.raw_conf.throttling_conf,
274        "rpc",
275        conf.raw_conf.public_rpc_apis.list_apis(),
276    )
277}
278
279pub fn setup_debug_rpc_apis_light(
280    common: Arc<CommonImpl>, rpc: Arc<LightImpl>, pubsub: PubSubClient,
281    conf: &Configuration,
282) -> MetaIoHandler<Metadata> {
283    let mut light_debug_apis = ApiSet::All.list_apis();
284    light_debug_apis.remove(&Api::Trace);
285    setup_rpc_apis_light(
286        common,
287        rpc,
288        pubsub,
289        &conf.raw_conf.throttling_conf,
290        "rpc_local",
291        light_debug_apis,
292    )
293}
294
295fn setup_rpc_apis_light(
296    common: Arc<CommonImpl>, rpc: Arc<LightImpl>, pubsub: PubSubClient,
297    throttling_conf: &Option<String>, throttling_section: &str,
298    apis: HashSet<Api>,
299) -> MetaIoHandler<Metadata> {
300    let mut handler = MetaIoHandler::default();
301    for api in apis {
302        match api {
303            Api::Cfx => {
304                let cfx = LightCfxHandler::new(common.clone(), rpc.clone())
305                    .to_delegate();
306                let interceptor = ThrottleInterceptor::new(
307                    throttling_conf,
308                    throttling_section,
309                );
310                handler.extend_with(RpcProxy::new(cfx, interceptor));
311            }
312            Api::Debug => {
313                handler.extend_with(
314                    LightDebugRpcImpl::new(common.clone(), rpc.clone())
315                        .to_delegate(),
316                );
317            }
318            Api::Pubsub => handler.extend_with(pubsub.clone().to_delegate()),
319            Api::Test => {
320                handler.extend_with(
321                    LightTestRpcImpl::new(common.clone(), rpc.clone())
322                        .to_delegate(),
323                );
324            }
325            Api::Trace => {
326                warn!("Light nodes do not support trace RPC");
327            }
328            Api::TxPool => {
329                warn!("Light nodes do not support txpool RPC");
330            }
331            Api::Pos => {
332                warn!("Light nodes do not support PoS RPC");
333            }
334        }
335    }
336    handler
337}
338
339pub fn start_tcp<H, T>(
340    conf: TcpConfiguration, handler: H, extractor: T,
341) -> Result<Option<TcpServer>, String>
342where
343    H: Into<MetaIoHandler<Metadata>>,
344    T: TpcMetaExtractor<Metadata> + 'static,
345{
346    if !conf.enabled {
347        return Ok(None);
348    }
349
350    match TcpServerBuilder::with_meta_extractor(handler, extractor)
351        .start(&conf.address)
352    {
353        Ok(server) => Ok(Some(server)),
354        Err(io_error) => {
355            Err(format!("TCP error: {} (addr = {})", io_error, conf.address))
356        }
357    }
358}
359
360pub fn start_http(
361    conf: HttpConfiguration, handler: MetaIoHandler<Metadata>,
362) -> Result<Option<HttpServer>, String> {
363    if !conf.enabled {
364        return Ok(None);
365    }
366    let mut builder = HttpServerBuilder::new(handler);
367    if let Some(threads) = conf.threads {
368        builder = builder.threads(threads);
369    }
370
371    match builder
372        .keep_alive(conf.keep_alive)
373        .cors(conf.cors_domains.clone())
374        .start_http(&conf.address)
375    {
376        Ok(server) => Ok(Some(server)),
377        Err(io_error) => Err(format!(
378            "HTTP error: {} (addr = {})",
379            io_error, conf.address
380        )),
381    }
382}
383
384pub fn start_ws<H, T>(
385    conf: WsConfiguration, handler: H, extractor: T,
386) -> Result<Option<WsServer>, String>
387where
388    H: Into<MetaIoHandler<Metadata>>,
389    T: WsMetaExtractor<Metadata> + 'static,
390{
391    if !conf.enabled {
392        return Ok(None);
393    }
394
395    match WsServerBuilder::with_meta_extractor(handler, extractor)
396        .max_payload(conf.max_payload_bytes)
397        .start(&conf.address)
398    {
399        Ok(server) => Ok(Some(server)),
400        Err(io_error) => {
401            Err(format!("WS error: {} (addr = {})", io_error, conf.address))
402        }
403    }
404}
405
406// start espace rpc server v2(async)
407pub async fn launch_async_rpc_servers(
408    consensus: SharedConsensusGraph, sync: SharedSynchronizationService,
409    tx_pool: SharedTransactionPool, notifications: Arc<Notifications>,
410    executor: TaskExecutor, conf: &Configuration,
411) -> Result<Option<RpcServerHandle>, String> {
412    let http_config = conf.eth_http_config();
413    let ws_config = conf.eth_ws_config();
414    let apis = conf.raw_conf.public_evm_rpc_apis.clone();
415
416    let (transport_rpc_module_config, server_config) =
417        match (http_config.enabled, ws_config.enabled) {
418            (true, true) => {
419                let transport_rpc_module_config =
420                    TransportRpcModuleConfig::set_http(apis.clone())
421                        .with_ws(apis.clone());
422
423                let server_config =
424                    RpcServerConfig::http(conf.jsonrpsee_server_builder())
425                        .with_ws(conf.jsonrpsee_server_builder())
426                        .with_http_address(http_config.address)
427                        .with_ws_address(ws_config.address);
428                (transport_rpc_module_config, server_config)
429            }
430            (true, false) => {
431                let transport_rpc_module_config =
432                    TransportRpcModuleConfig::set_http(apis.clone());
433                let server_config =
434                    RpcServerConfig::http(conf.jsonrpsee_server_builder())
435                        .with_http_address(http_config.address);
436                (transport_rpc_module_config, server_config)
437            }
438            (false, true) => {
439                let transport_rpc_module_config =
440                    TransportRpcModuleConfig::set_ws(apis.clone());
441                let server_config =
442                    RpcServerConfig::ws(conf.jsonrpsee_server_builder())
443                        .with_ws_address(ws_config.address);
444                (transport_rpc_module_config, server_config)
445            }
446            _ => return Ok(None),
447        };
448
449    info!("Enabled evm async rpc modules: {:?}", apis.into_selection());
450    let rpc_conf = conf.rpc_impl_config();
451    let enable_metrics = rpc_conf.enable_metrics;
452
453    let rpc_module_builder = RpcModuleBuilder::new(
454        rpc_conf,
455        consensus,
456        sync,
457        tx_pool,
458        executor,
459        notifications,
460    );
461
462    let transport_rpc_modules =
463        rpc_module_builder.build(transport_rpc_module_config);
464
465    let throttling_conf_file = conf.raw_conf.throttling_conf.clone();
466    let server_handle = server_config
467        .start(&transport_rpc_modules, throttling_conf_file, enable_metrics)
468        .await
469        .map_err(|e| e.to_string())?;
470
471    Ok(Some(server_handle))
472}