client/rpc/
mod.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use blockgen::BlockGeneratorTestApi;
6use cfx_rpc_builder::{
7    CfxRpcModuleBuilder, CfxRpcModuleSelection, CfxRpcServerConfig,
8    CfxTransportRpcModuleConfig, RpcModuleBuilder, RpcServerConfig,
9    RpcServerHandle, TransportRpcModuleConfig,
10};
11use cfx_tasks::TaskExecutor;
12use cfxcore::{
13    block_data_manager::BlockDataManager, consensus::pos_handler::PosVerifier,
14    Notifications, SharedConsensusGraph, SharedSynchronizationService,
15    SharedTransactionPool,
16};
17use jsonrpc_core::{MetaIoHandler, RemoteProcedure, Value};
18use jsonrpc_http_server::{
19    Server as HttpServer, ServerBuilder as HttpServerBuilder,
20};
21use jsonrpc_tcp_server::{
22    MetaExtractor as TpcMetaExtractor, Server as TcpServer,
23    ServerBuilder as TcpServerBuilder,
24};
25use jsonrpc_ws_server::{
26    MetaExtractor as WsMetaExtractor, Server as WsServer,
27    ServerBuilder as WsServerBuilder,
28};
29pub use jsonrpsee::server::ServerBuilder;
30use log::{info, warn};
31use network::NetworkService;
32use parking_lot::Mutex;
33use std::sync::Arc;
34use txgen::{DirectTransactionGenerator, TransactionGenerator};
35
36mod authcodes;
37pub mod errors;
38pub mod extractor;
39mod helpers;
40mod http_common;
41pub mod impls;
42pub mod informant;
43mod interceptor;
44pub mod metadata;
45mod traits;
46pub mod types;
47
48pub use cfxcore::errors::{
49    BoxFuture as CoreBoxFuture, Error as CoreError, Result as CoreResult,
50};
51pub use errors::{error_codes, invalid_params};
52
53use self::{
54    impls::{
55        cfx::{CfxHandler, LocalRpcImpl, RpcImpl, TestRpcImpl, TraceHandler},
56        cfx_filter::CfxFilterClient,
57        common::RpcImpl as CommonImpl,
58        light::{
59            CfxHandler as LightCfxHandler, DebugRpcImpl as LightDebugRpcImpl,
60            RpcImpl as LightImpl, TestRpcImpl as LightTestRpcImpl,
61        },
62        pool::TransactionPoolHandler,
63        pos::{PoSInterceptor, PosHandler},
64        pubsub::PubSubClient,
65    },
66    traits::{
67        cfx::Cfx, cfx_filter::CfxFilter, debug::LocalRpc,
68        pool::TransactionPool, pos::Pos, pubsub::PubSub, test::TestRpc,
69        trace::Trace,
70    },
71};
72
73pub use self::types::{Block as RpcBlock, Origin};
74use crate::{
75    configuration::Configuration,
76    rpc::{
77        impls::RpcImplConfiguration,
78        interceptor::{RpcInterceptor, RpcProxy},
79    },
80};
81pub use cfx_config::rpc_server_config::{
82    HttpConfiguration, TcpConfiguration, WsConfiguration,
83};
84use cfx_rpc_cfx_types::apis::{Api, ApiSet};
85use interceptor::{MetricsInterceptor, ThrottleInterceptor};
86pub use metadata::Metadata;
87use std::collections::HashSet;
88
89pub fn setup_public_rpc_apis(
90    common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
91    conf: &Configuration,
92) -> MetaIoHandler<Metadata> {
93    setup_rpc_apis(
94        common,
95        rpc,
96        pubsub,
97        &conf.raw_conf.throttling_conf,
98        "rpc",
99        conf.raw_conf.public_rpc_apis.list_apis(),
100    )
101}
102
103pub fn setup_debug_rpc_apis(
104    common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
105    conf: &Configuration,
106) -> MetaIoHandler<Metadata> {
107    setup_rpc_apis(
108        common,
109        rpc,
110        pubsub,
111        &conf.raw_conf.throttling_conf,
112        "rpc_local",
113        ApiSet::All.list_apis(),
114    )
115}
116
117fn setup_rpc_apis(
118    common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
119    throttling_conf: &Option<String>, throttling_section: &str,
120    apis: HashSet<Api>,
121) -> MetaIoHandler<Metadata> {
122    let mut handler = MetaIoHandler::default();
123    for api in &apis {
124        match api {
125            Api::Cfx => {
126                let cfx =
127                    CfxHandler::new(common.clone(), rpc.clone()).to_delegate();
128                extend_with_interceptor(
129                    &mut handler,
130                    &rpc.config,
131                    cfx,
132                    throttling_conf,
133                    throttling_section,
134                );
135
136                if let Some(poll_lifetime) = rpc.config.poll_lifetime_in_seconds
137                {
138                    if let Some(h) = pubsub.handler().upgrade() {
139                        let filter_client = CfxFilterClient::new(
140                            rpc.consensus.clone(),
141                            rpc.tx_pool.clone(),
142                            pubsub.epochs_ordered(),
143                            pubsub.executor.clone(),
144                            poll_lifetime,
145                            rpc.config.get_logs_filter_max_limit,
146                            h.network.clone(),
147                        )
148                        .to_delegate();
149
150                        extend_with_interceptor(
151                            &mut handler,
152                            &rpc.config,
153                            filter_client,
154                            throttling_conf,
155                            throttling_section,
156                        );
157                    }
158                }
159            }
160            Api::Debug => {
161                handler.extend_with(
162                    LocalRpcImpl::new(common.clone(), rpc.clone())
163                        .to_delegate(),
164                );
165            }
166            Api::Pubsub => {
167                extend_with_interceptor(
168                    &mut handler,
169                    &rpc.config,
170                    pubsub.clone().to_delegate(),
171                    throttling_conf,
172                    throttling_section,
173                );
174            }
175            Api::Test => {
176                handler.extend_with(
177                    TestRpcImpl::new(common.clone(), rpc.clone()).to_delegate(),
178                );
179            }
180            Api::Trace => {
181                let trace = TraceHandler::new(
182                    *rpc.sync.network.get_network_type(),
183                    rpc.consensus.clone(),
184                )
185                .to_delegate();
186                extend_with_interceptor(
187                    &mut handler,
188                    &rpc.config,
189                    trace,
190                    throttling_conf,
191                    throttling_section,
192                );
193            }
194            Api::TxPool => {
195                let txpool =
196                    TransactionPoolHandler::new(common.clone()).to_delegate();
197                extend_with_interceptor(
198                    &mut handler,
199                    &rpc.config,
200                    txpool,
201                    throttling_conf,
202                    throttling_section,
203                );
204            }
205            Api::Pos => {
206                let pos = PosHandler::new(
207                    common.pos_handler.clone(),
208                    rpc.consensus.data_manager().clone(),
209                    *rpc.sync.network.get_network_type(),
210                    rpc.consensus.clone(),
211                )
212                .to_delegate();
213                let pos_interceptor =
214                    PoSInterceptor::new(common.pos_handler.clone());
215                handler.extend_with(RpcProxy::new(pos, pos_interceptor));
216            }
217        }
218    }
219
220    add_meta_rpc_methods(handler, apis)
221}
222
223pub fn extend_with_interceptor<
224    T: IntoIterator<Item = (String, RemoteProcedure<Metadata>)>,
225>(
226    handler: &mut MetaIoHandler<Metadata>, rpc_conf: &RpcImplConfiguration,
227    rpc_impl: T, throttling_conf: &Option<String>, throttling_section: &str,
228) {
229    let interceptor =
230        ThrottleInterceptor::new(throttling_conf, throttling_section);
231    if rpc_conf.enable_metrics {
232        handler.extend_with(RpcProxy::new(
233            rpc_impl,
234            MetricsInterceptor::new(interceptor),
235        ));
236    } else {
237        handler.extend_with(RpcProxy::new(rpc_impl, interceptor));
238    }
239}
240
241fn add_meta_rpc_methods(
242    mut handler: MetaIoHandler<Metadata>, apis: HashSet<Api>,
243) -> MetaIoHandler<Metadata> {
244    // rpc_methods to return all available methods
245    let methods: Vec<String> =
246        handler.iter().map(|(method, _)| method).cloned().collect();
247    handler.add_sync_method("rpc_methods", move |_| {
248        let method_list = methods
249            .clone()
250            .iter()
251            .map(|m| Value::String(m.to_string()))
252            .collect();
253        Ok(Value::Array(method_list))
254    });
255
256    // rpc_modules
257    let namespaces: Vec<String> =
258        apis.into_iter().map(|item| format!("{}", item)).collect();
259    handler.add_sync_method("rpc_modules", move |_| {
260        let ns = namespaces
261            .clone()
262            .iter()
263            .map(|m| Value::String(m.to_string()))
264            .collect();
265        Ok(Value::Array(ns))
266    });
267
268    handler
269}
270
271pub fn setup_public_rpc_apis_light(
272    common: Arc<CommonImpl>, rpc: Arc<LightImpl>, pubsub: PubSubClient,
273    conf: &Configuration,
274) -> MetaIoHandler<Metadata> {
275    setup_rpc_apis_light(
276        common,
277        rpc,
278        pubsub,
279        &conf.raw_conf.throttling_conf,
280        "rpc",
281        conf.raw_conf.public_rpc_apis.list_apis(),
282    )
283}
284
285pub fn setup_debug_rpc_apis_light(
286    common: Arc<CommonImpl>, rpc: Arc<LightImpl>, pubsub: PubSubClient,
287    conf: &Configuration,
288) -> MetaIoHandler<Metadata> {
289    let mut light_debug_apis = ApiSet::All.list_apis();
290    light_debug_apis.remove(&Api::Trace);
291    setup_rpc_apis_light(
292        common,
293        rpc,
294        pubsub,
295        &conf.raw_conf.throttling_conf,
296        "rpc_local",
297        light_debug_apis,
298    )
299}
300
301fn setup_rpc_apis_light(
302    common: Arc<CommonImpl>, rpc: Arc<LightImpl>, pubsub: PubSubClient,
303    throttling_conf: &Option<String>, throttling_section: &str,
304    apis: HashSet<Api>,
305) -> MetaIoHandler<Metadata> {
306    let mut handler = MetaIoHandler::default();
307    for api in apis {
308        match api {
309            Api::Cfx => {
310                let cfx = LightCfxHandler::new(common.clone(), rpc.clone())
311                    .to_delegate();
312                let interceptor = ThrottleInterceptor::new(
313                    throttling_conf,
314                    throttling_section,
315                );
316                handler.extend_with(RpcProxy::new(cfx, interceptor));
317            }
318            Api::Debug => {
319                handler.extend_with(
320                    LightDebugRpcImpl::new(common.clone(), rpc.clone())
321                        .to_delegate(),
322                );
323            }
324            Api::Pubsub => handler.extend_with(pubsub.clone().to_delegate()),
325            Api::Test => {
326                handler.extend_with(
327                    LightTestRpcImpl::new(common.clone(), rpc.clone())
328                        .to_delegate(),
329                );
330            }
331            Api::Trace => {
332                warn!("Light nodes do not support trace RPC");
333            }
334            Api::TxPool => {
335                warn!("Light nodes do not support txpool RPC");
336            }
337            Api::Pos => {
338                warn!("Light nodes do not support PoS RPC");
339            }
340        }
341    }
342    handler
343}
344
345pub fn start_tcp<H, T>(
346    conf: TcpConfiguration, handler: H, extractor: T,
347) -> Result<Option<TcpServer>, String>
348where
349    H: Into<MetaIoHandler<Metadata>>,
350    T: TpcMetaExtractor<Metadata> + 'static,
351{
352    if !conf.enabled {
353        return Ok(None);
354    }
355
356    match TcpServerBuilder::with_meta_extractor(handler, extractor)
357        .start(&conf.address)
358    {
359        Ok(server) => Ok(Some(server)),
360        Err(io_error) => {
361            Err(format!("TCP error: {} (addr = {})", io_error, conf.address))
362        }
363    }
364}
365
366pub fn start_http(
367    conf: HttpConfiguration, handler: MetaIoHandler<Metadata>,
368) -> Result<Option<HttpServer>, String> {
369    if !conf.enabled {
370        return Ok(None);
371    }
372    let mut builder = HttpServerBuilder::new(handler);
373    if let Some(threads) = conf.threads {
374        builder = builder.threads(threads);
375    }
376
377    match builder
378        .keep_alive(conf.keep_alive)
379        .cors(conf.cors_domains.clone())
380        .start_http(&conf.address)
381    {
382        Ok(server) => Ok(Some(server)),
383        Err(io_error) => Err(format!(
384            "HTTP error: {} (addr = {})",
385            io_error, conf.address
386        )),
387    }
388}
389
390pub fn start_ws<H, T>(
391    conf: WsConfiguration, handler: H, extractor: T,
392) -> Result<Option<WsServer>, String>
393where
394    H: Into<MetaIoHandler<Metadata>>,
395    T: WsMetaExtractor<Metadata> + 'static,
396{
397    if !conf.enabled {
398        return Ok(None);
399    }
400
401    match WsServerBuilder::with_meta_extractor(handler, extractor)
402        .max_payload(conf.max_payload_bytes)
403        .start(&conf.address)
404    {
405        Ok(server) => Ok(Some(server)),
406        Err(io_error) => {
407            Err(format!("WS error: {} (addr = {})", io_error, conf.address))
408        }
409    }
410}
411
412// start espace rpc server v2(async)
413pub async fn launch_async_rpc_servers(
414    consensus: SharedConsensusGraph, sync: SharedSynchronizationService,
415    tx_pool: SharedTransactionPool, notifications: Arc<Notifications>,
416    executor: TaskExecutor, conf: &Configuration,
417) -> Result<Option<RpcServerHandle>, String> {
418    let http_config = conf.eth_http_config();
419    let ws_config = conf.eth_ws_config();
420    let apis = conf.raw_conf.public_evm_rpc_apis.clone();
421
422    let (transport_rpc_module_config, server_config) =
423        match (http_config.enabled, ws_config.enabled) {
424            (true, true) => {
425                let transport_rpc_module_config =
426                    TransportRpcModuleConfig::set_http(apis.clone())
427                        .with_ws(apis.clone());
428
429                let server_config =
430                    RpcServerConfig::http(conf.jsonrpsee_server_builder())
431                        .with_ws(conf.jsonrpsee_server_builder())
432                        .with_http_address(http_config.address)
433                        .with_ws_address(ws_config.address);
434                (transport_rpc_module_config, server_config)
435            }
436            (true, false) => {
437                let transport_rpc_module_config =
438                    TransportRpcModuleConfig::set_http(apis.clone());
439                let server_config =
440                    RpcServerConfig::http(conf.jsonrpsee_server_builder())
441                        .with_http_address(http_config.address);
442                (transport_rpc_module_config, server_config)
443            }
444            (false, true) => {
445                let transport_rpc_module_config =
446                    TransportRpcModuleConfig::set_ws(apis.clone());
447                let server_config =
448                    RpcServerConfig::ws(conf.jsonrpsee_server_builder())
449                        .with_ws_address(ws_config.address);
450                (transport_rpc_module_config, server_config)
451            }
452            _ => return Ok(None),
453        };
454
455    info!("Enabled evm async rpc modules: {:?}", apis.into_selection());
456    let rpc_conf = conf.rpc_impl_config();
457    let enable_metrics = rpc_conf.enable_metrics;
458
459    let rpc_module_builder = RpcModuleBuilder::new(
460        rpc_conf,
461        consensus,
462        sync,
463        tx_pool,
464        executor,
465        notifications,
466    );
467
468    let transport_rpc_modules =
469        rpc_module_builder.build(transport_rpc_module_config);
470
471    let throttling_conf_file = conf.raw_conf.throttling_conf.clone();
472    let server_handle = server_config
473        .start(&transport_rpc_modules, throttling_conf_file, enable_metrics)
474        .await
475        .map_err(|e| e.to_string())?;
476
477    Ok(Some(server_handle))
478}
479
480// start core space rpc server v2(async)
481pub async fn launch_cfx_async_rpc_servers(
482    consensus: SharedConsensusGraph, sync: SharedSynchronizationService,
483    tx_pool: SharedTransactionPool, data_man: Arc<BlockDataManager>,
484    network: Arc<NetworkService>, pos_handler: Arc<PosVerifier>,
485    notifications: Arc<Notifications>, executor: TaskExecutor,
486    accounts: Arc<cfxcore_accounts::AccountProvider>,
487    exit: Arc<(parking_lot::Mutex<bool>, parking_lot::Condvar)>,
488    block_gen: BlockGeneratorTestApi,
489    maybe_txgen: Option<Arc<TransactionGenerator>>,
490    maybe_direct_txgen: Option<Arc<Mutex<DirectTransactionGenerator>>>,
491    conf: &Configuration, apis: ApiSet, is_debug: bool,
492) -> Result<Option<RpcServerHandle>, String> {
493    let (http_config, ws_config) = if !is_debug {
494        (conf.http_config(), conf.ws_config())
495    } else {
496        (conf.local_http_config(), conf.local_ws_config())
497    };
498
499    let (transport_rpc_module_config, server_config) =
500        match (http_config.enabled, ws_config.enabled) {
501            (true, true) => {
502                let transport_rpc_module_config =
503                    CfxTransportRpcModuleConfig::set_http(apis.clone())
504                        .with_ws(apis.clone());
505
506                let server_config =
507                    CfxRpcServerConfig::http(conf.jsonrpsee_server_builder())
508                        .with_ws(conf.jsonrpsee_server_builder())
509                        .with_http_address(http_config.address)
510                        .with_ws_address(ws_config.address);
511                (transport_rpc_module_config, server_config)
512            }
513            (true, false) => {
514                let transport_rpc_module_config =
515                    CfxTransportRpcModuleConfig::set_http(apis.clone());
516                let server_config =
517                    CfxRpcServerConfig::http(conf.jsonrpsee_server_builder())
518                        .with_http_address(http_config.address);
519                (transport_rpc_module_config, server_config)
520            }
521            (false, true) => {
522                let transport_rpc_module_config =
523                    CfxTransportRpcModuleConfig::set_ws(apis.clone());
524                let server_config =
525                    CfxRpcServerConfig::ws(conf.jsonrpsee_server_builder())
526                        .with_ws_address(ws_config.address);
527                (transport_rpc_module_config, server_config)
528            }
529            _ => return Ok(None),
530        };
531
532    info!(
533        "Enabled cfx async rpc modules: {:?}",
534        CfxRpcModuleSelection::from(apis).into_selection()
535    );
536
537    let rpc_conf = conf.rpc_impl_config();
538    let rpc_module_builder = CfxRpcModuleBuilder::new(
539        rpc_conf,
540        consensus,
541        sync,
542        tx_pool,
543        executor,
544        data_man,
545        network,
546        pos_handler,
547        notifications,
548        accounts,
549        exit,
550        block_gen,
551        maybe_txgen,
552        maybe_direct_txgen,
553    );
554
555    let transport_rpc_modules =
556        rpc_module_builder.build(transport_rpc_module_config);
557
558    let server_handle = server_config
559        .start(&transport_rpc_modules)
560        .await
561        .map_err(|e| e.to_string())?;
562
563    Ok(Some(server_handle))
564}