1use cfx_rpc_builder::{
6 RpcModuleBuilder, RpcServerConfig, RpcServerHandle,
7 TransportRpcModuleConfig,
8};
9use cfx_tasks::TaskExecutor;
10use cfxcore::{
11 Notifications, SharedConsensusGraph, SharedSynchronizationService,
12 SharedTransactionPool,
13};
14use jsonrpc_core::{MetaIoHandler, RemoteProcedure, Value};
15use jsonrpc_http_server::{
16 Server as HttpServer, ServerBuilder as HttpServerBuilder,
17};
18use jsonrpc_tcp_server::{
19 MetaExtractor as TpcMetaExtractor, Server as TcpServer,
20 ServerBuilder as TcpServerBuilder,
21};
22use jsonrpc_ws_server::{
23 MetaExtractor as WsMetaExtractor, Server as WsServer,
24 ServerBuilder as WsServerBuilder,
25};
26pub use jsonrpsee::server::ServerBuilder;
27use log::{info, warn};
28use std::sync::Arc;
29
30mod authcodes;
31pub mod errors;
32pub mod extractor;
33mod helpers;
34mod http_common;
35pub mod impls;
36pub mod informant;
37mod interceptor;
38pub mod metadata;
39mod traits;
40pub mod types;
41
42pub use cfxcore::errors::{
43 BoxFuture as CoreBoxFuture, Error as CoreError, Result as CoreResult,
44};
45pub use errors::{error_codes, invalid_params};
46
47use self::{
48 impls::{
49 cfx::{CfxHandler, LocalRpcImpl, RpcImpl, TestRpcImpl, TraceHandler},
50 cfx_filter::CfxFilterClient,
51 common::RpcImpl as CommonImpl,
52 light::{
53 CfxHandler as LightCfxHandler, DebugRpcImpl as LightDebugRpcImpl,
54 RpcImpl as LightImpl, TestRpcImpl as LightTestRpcImpl,
55 },
56 pool::TransactionPoolHandler,
57 pos::{PoSInterceptor, PosHandler},
58 pubsub::PubSubClient,
59 },
60 traits::{
61 cfx::Cfx, cfx_filter::CfxFilter, debug::LocalRpc,
62 pool::TransactionPool, pos::Pos, pubsub::PubSub, test::TestRpc,
63 trace::Trace,
64 },
65};
66
67pub use self::types::{Block as RpcBlock, Origin};
68use crate::{
69 configuration::Configuration,
70 rpc::{
71 impls::RpcImplConfiguration,
72 interceptor::{RpcInterceptor, RpcProxy},
73 },
74};
75pub use cfx_config::rpc_server_config::{
76 HttpConfiguration, TcpConfiguration, WsConfiguration,
77};
78use cfx_rpc_cfx_types::apis::{Api, ApiSet};
79use interceptor::{MetricsInterceptor, ThrottleInterceptor};
80pub use metadata::Metadata;
81use std::collections::HashSet;
82
83pub fn setup_public_rpc_apis(
84 common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
85 conf: &Configuration,
86) -> MetaIoHandler<Metadata> {
87 setup_rpc_apis(
88 common,
89 rpc,
90 pubsub,
91 &conf.raw_conf.throttling_conf,
92 "rpc",
93 conf.raw_conf.public_rpc_apis.list_apis(),
94 )
95}
96
97pub fn setup_debug_rpc_apis(
98 common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
99 conf: &Configuration,
100) -> MetaIoHandler<Metadata> {
101 setup_rpc_apis(
102 common,
103 rpc,
104 pubsub,
105 &conf.raw_conf.throttling_conf,
106 "rpc_local",
107 ApiSet::All.list_apis(),
108 )
109}
110
111fn setup_rpc_apis(
112 common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
113 throttling_conf: &Option<String>, throttling_section: &str,
114 apis: HashSet<Api>,
115) -> MetaIoHandler<Metadata> {
116 let mut handler = MetaIoHandler::default();
117 for api in &apis {
118 match api {
119 Api::Cfx => {
120 let cfx =
121 CfxHandler::new(common.clone(), rpc.clone()).to_delegate();
122 extend_with_interceptor(
123 &mut handler,
124 &rpc.config,
125 cfx,
126 throttling_conf,
127 throttling_section,
128 );
129
130 if let Some(poll_lifetime) = rpc.config.poll_lifetime_in_seconds
131 {
132 if let Some(h) = pubsub.handler().upgrade() {
133 let filter_client = CfxFilterClient::new(
134 rpc.consensus.clone(),
135 rpc.tx_pool.clone(),
136 pubsub.epochs_ordered(),
137 pubsub.executor.clone(),
138 poll_lifetime,
139 rpc.config.get_logs_filter_max_limit,
140 h.network.clone(),
141 )
142 .to_delegate();
143
144 extend_with_interceptor(
145 &mut handler,
146 &rpc.config,
147 filter_client,
148 throttling_conf,
149 throttling_section,
150 );
151 }
152 }
153 }
154 Api::Debug => {
155 handler.extend_with(
156 LocalRpcImpl::new(common.clone(), rpc.clone())
157 .to_delegate(),
158 );
159 }
160 Api::Pubsub => {
161 extend_with_interceptor(
162 &mut handler,
163 &rpc.config,
164 pubsub.clone().to_delegate(),
165 throttling_conf,
166 throttling_section,
167 );
168 }
169 Api::Test => {
170 handler.extend_with(
171 TestRpcImpl::new(common.clone(), rpc.clone()).to_delegate(),
172 );
173 }
174 Api::Trace => {
175 let trace = TraceHandler::new(
176 *rpc.sync.network.get_network_type(),
177 rpc.consensus.clone(),
178 )
179 .to_delegate();
180 extend_with_interceptor(
181 &mut handler,
182 &rpc.config,
183 trace,
184 throttling_conf,
185 throttling_section,
186 );
187 }
188 Api::TxPool => {
189 let txpool =
190 TransactionPoolHandler::new(common.clone()).to_delegate();
191 extend_with_interceptor(
192 &mut handler,
193 &rpc.config,
194 txpool,
195 throttling_conf,
196 throttling_section,
197 );
198 }
199 Api::Pos => {
200 let pos = PosHandler::new(
201 common.pos_handler.clone(),
202 rpc.consensus.data_manager().clone(),
203 *rpc.sync.network.get_network_type(),
204 rpc.consensus.clone(),
205 )
206 .to_delegate();
207 let pos_interceptor =
208 PoSInterceptor::new(common.pos_handler.clone());
209 handler.extend_with(RpcProxy::new(pos, pos_interceptor));
210 }
211 }
212 }
213
214 add_meta_rpc_methods(handler, apis)
215}
216
217pub fn extend_with_interceptor<
218 T: IntoIterator<Item = (String, RemoteProcedure<Metadata>)>,
219>(
220 handler: &mut MetaIoHandler<Metadata>, rpc_conf: &RpcImplConfiguration,
221 rpc_impl: T, throttling_conf: &Option<String>, throttling_section: &str,
222) {
223 let interceptor =
224 ThrottleInterceptor::new(throttling_conf, throttling_section);
225 if rpc_conf.enable_metrics {
226 handler.extend_with(RpcProxy::new(
227 rpc_impl,
228 MetricsInterceptor::new(interceptor),
229 ));
230 } else {
231 handler.extend_with(RpcProxy::new(rpc_impl, interceptor));
232 }
233}
234
235fn add_meta_rpc_methods(
236 mut handler: MetaIoHandler<Metadata>, apis: HashSet<Api>,
237) -> MetaIoHandler<Metadata> {
238 let methods: Vec<String> =
240 handler.iter().map(|(method, _)| method).cloned().collect();
241 handler.add_sync_method("rpc_methods", move |_| {
242 let method_list = methods
243 .clone()
244 .iter()
245 .map(|m| Value::String(m.to_string()))
246 .collect();
247 Ok(Value::Array(method_list))
248 });
249
250 let namespaces: Vec<String> =
252 apis.into_iter().map(|item| format!("{}", item)).collect();
253 handler.add_sync_method("rpc_modules", move |_| {
254 let ns = namespaces
255 .clone()
256 .iter()
257 .map(|m| Value::String(m.to_string()))
258 .collect();
259 Ok(Value::Array(ns))
260 });
261
262 handler
263}
264
265pub fn setup_public_rpc_apis_light(
266 common: Arc<CommonImpl>, rpc: Arc<LightImpl>, pubsub: PubSubClient,
267 conf: &Configuration,
268) -> MetaIoHandler<Metadata> {
269 setup_rpc_apis_light(
270 common,
271 rpc,
272 pubsub,
273 &conf.raw_conf.throttling_conf,
274 "rpc",
275 conf.raw_conf.public_rpc_apis.list_apis(),
276 )
277}
278
279pub fn setup_debug_rpc_apis_light(
280 common: Arc<CommonImpl>, rpc: Arc<LightImpl>, pubsub: PubSubClient,
281 conf: &Configuration,
282) -> MetaIoHandler<Metadata> {
283 let mut light_debug_apis = ApiSet::All.list_apis();
284 light_debug_apis.remove(&Api::Trace);
285 setup_rpc_apis_light(
286 common,
287 rpc,
288 pubsub,
289 &conf.raw_conf.throttling_conf,
290 "rpc_local",
291 light_debug_apis,
292 )
293}
294
295fn setup_rpc_apis_light(
296 common: Arc<CommonImpl>, rpc: Arc<LightImpl>, pubsub: PubSubClient,
297 throttling_conf: &Option<String>, throttling_section: &str,
298 apis: HashSet<Api>,
299) -> MetaIoHandler<Metadata> {
300 let mut handler = MetaIoHandler::default();
301 for api in apis {
302 match api {
303 Api::Cfx => {
304 let cfx = LightCfxHandler::new(common.clone(), rpc.clone())
305 .to_delegate();
306 let interceptor = ThrottleInterceptor::new(
307 throttling_conf,
308 throttling_section,
309 );
310 handler.extend_with(RpcProxy::new(cfx, interceptor));
311 }
312 Api::Debug => {
313 handler.extend_with(
314 LightDebugRpcImpl::new(common.clone(), rpc.clone())
315 .to_delegate(),
316 );
317 }
318 Api::Pubsub => handler.extend_with(pubsub.clone().to_delegate()),
319 Api::Test => {
320 handler.extend_with(
321 LightTestRpcImpl::new(common.clone(), rpc.clone())
322 .to_delegate(),
323 );
324 }
325 Api::Trace => {
326 warn!("Light nodes do not support trace RPC");
327 }
328 Api::TxPool => {
329 warn!("Light nodes do not support txpool RPC");
330 }
331 Api::Pos => {
332 warn!("Light nodes do not support PoS RPC");
333 }
334 }
335 }
336 handler
337}
338
339pub fn start_tcp<H, T>(
340 conf: TcpConfiguration, handler: H, extractor: T,
341) -> Result<Option<TcpServer>, String>
342where
343 H: Into<MetaIoHandler<Metadata>>,
344 T: TpcMetaExtractor<Metadata> + 'static,
345{
346 if !conf.enabled {
347 return Ok(None);
348 }
349
350 match TcpServerBuilder::with_meta_extractor(handler, extractor)
351 .start(&conf.address)
352 {
353 Ok(server) => Ok(Some(server)),
354 Err(io_error) => {
355 Err(format!("TCP error: {} (addr = {})", io_error, conf.address))
356 }
357 }
358}
359
360pub fn start_http(
361 conf: HttpConfiguration, handler: MetaIoHandler<Metadata>,
362) -> Result<Option<HttpServer>, String> {
363 if !conf.enabled {
364 return Ok(None);
365 }
366 let mut builder = HttpServerBuilder::new(handler);
367 if let Some(threads) = conf.threads {
368 builder = builder.threads(threads);
369 }
370
371 match builder
372 .keep_alive(conf.keep_alive)
373 .cors(conf.cors_domains.clone())
374 .start_http(&conf.address)
375 {
376 Ok(server) => Ok(Some(server)),
377 Err(io_error) => Err(format!(
378 "HTTP error: {} (addr = {})",
379 io_error, conf.address
380 )),
381 }
382}
383
384pub fn start_ws<H, T>(
385 conf: WsConfiguration, handler: H, extractor: T,
386) -> Result<Option<WsServer>, String>
387where
388 H: Into<MetaIoHandler<Metadata>>,
389 T: WsMetaExtractor<Metadata> + 'static,
390{
391 if !conf.enabled {
392 return Ok(None);
393 }
394
395 match WsServerBuilder::with_meta_extractor(handler, extractor)
396 .max_payload(conf.max_payload_bytes)
397 .start(&conf.address)
398 {
399 Ok(server) => Ok(Some(server)),
400 Err(io_error) => {
401 Err(format!("WS error: {} (addr = {})", io_error, conf.address))
402 }
403 }
404}
405
406pub async fn launch_async_rpc_servers(
408 consensus: SharedConsensusGraph, sync: SharedSynchronizationService,
409 tx_pool: SharedTransactionPool, notifications: Arc<Notifications>,
410 executor: TaskExecutor, conf: &Configuration,
411) -> Result<Option<RpcServerHandle>, String> {
412 let http_config = conf.eth_http_config();
413 let ws_config = conf.eth_ws_config();
414 let apis = conf.raw_conf.public_evm_rpc_apis.clone();
415
416 let (transport_rpc_module_config, server_config) =
417 match (http_config.enabled, ws_config.enabled) {
418 (true, true) => {
419 let transport_rpc_module_config =
420 TransportRpcModuleConfig::set_http(apis.clone())
421 .with_ws(apis.clone());
422
423 let server_config =
424 RpcServerConfig::http(conf.jsonrpsee_server_builder())
425 .with_ws(conf.jsonrpsee_server_builder())
426 .with_http_address(http_config.address)
427 .with_ws_address(ws_config.address);
428 (transport_rpc_module_config, server_config)
429 }
430 (true, false) => {
431 let transport_rpc_module_config =
432 TransportRpcModuleConfig::set_http(apis.clone());
433 let server_config =
434 RpcServerConfig::http(conf.jsonrpsee_server_builder())
435 .with_http_address(http_config.address);
436 (transport_rpc_module_config, server_config)
437 }
438 (false, true) => {
439 let transport_rpc_module_config =
440 TransportRpcModuleConfig::set_ws(apis.clone());
441 let server_config =
442 RpcServerConfig::ws(conf.jsonrpsee_server_builder())
443 .with_ws_address(ws_config.address);
444 (transport_rpc_module_config, server_config)
445 }
446 _ => return Ok(None),
447 };
448
449 info!("Enabled evm async rpc modules: {:?}", apis.into_selection());
450 let rpc_conf = conf.rpc_impl_config();
451 let enable_metrics = rpc_conf.enable_metrics;
452
453 let rpc_module_builder = RpcModuleBuilder::new(
454 rpc_conf,
455 consensus,
456 sync,
457 tx_pool,
458 executor,
459 notifications,
460 );
461
462 let transport_rpc_modules =
463 rpc_module_builder.build(transport_rpc_module_config);
464
465 let throttling_conf_file = conf.raw_conf.throttling_conf.clone();
466 let server_handle = server_config
467 .start(&transport_rpc_modules, throttling_conf_file, enable_metrics)
468 .await
469 .map_err(|e| e.to_string())?;
470
471 Ok(Some(server_handle))
472}