1use blockgen::BlockGeneratorTestApi;
6use cfx_rpc_builder::{
7 CfxRpcModuleBuilder, CfxRpcModuleSelection, CfxRpcServerConfig,
8 CfxTransportRpcModuleConfig, RpcModuleBuilder, RpcServerConfig,
9 RpcServerHandle, TransportRpcModuleConfig,
10};
11use cfx_tasks::TaskExecutor;
12use cfxcore::{
13 block_data_manager::BlockDataManager, consensus::pos_handler::PosVerifier,
14 Notifications, SharedConsensusGraph, SharedSynchronizationService,
15 SharedTransactionPool,
16};
17use jsonrpc_core::{MetaIoHandler, RemoteProcedure, Value};
18use jsonrpc_http_server::{
19 Server as HttpServer, ServerBuilder as HttpServerBuilder,
20};
21use jsonrpc_tcp_server::{
22 MetaExtractor as TpcMetaExtractor, Server as TcpServer,
23 ServerBuilder as TcpServerBuilder,
24};
25use jsonrpc_ws_server::{
26 MetaExtractor as WsMetaExtractor, Server as WsServer,
27 ServerBuilder as WsServerBuilder,
28};
29pub use jsonrpsee::server::ServerBuilder;
30use log::{info, warn};
31use network::NetworkService;
32use parking_lot::Mutex;
33use std::sync::Arc;
34use txgen::{DirectTransactionGenerator, TransactionGenerator};
35
36mod authcodes;
37pub mod errors;
38pub mod extractor;
39mod helpers;
40mod http_common;
41pub mod impls;
42pub mod informant;
43mod interceptor;
44pub mod metadata;
45mod traits;
46pub mod types;
47
48pub use cfxcore::errors::{
49 BoxFuture as CoreBoxFuture, Error as CoreError, Result as CoreResult,
50};
51pub use errors::{error_codes, invalid_params};
52
53use self::{
54 impls::{
55 cfx::{CfxHandler, LocalRpcImpl, RpcImpl, TestRpcImpl, TraceHandler},
56 cfx_filter::CfxFilterClient,
57 common::RpcImpl as CommonImpl,
58 light::{
59 CfxHandler as LightCfxHandler, DebugRpcImpl as LightDebugRpcImpl,
60 RpcImpl as LightImpl, TestRpcImpl as LightTestRpcImpl,
61 },
62 pool::TransactionPoolHandler,
63 pos::{PoSInterceptor, PosHandler},
64 pubsub::PubSubClient,
65 },
66 traits::{
67 cfx::Cfx, cfx_filter::CfxFilter, debug::LocalRpc,
68 pool::TransactionPool, pos::Pos, pubsub::PubSub, test::TestRpc,
69 trace::Trace,
70 },
71};
72
73pub use self::types::{Block as RpcBlock, Origin};
74use crate::{
75 configuration::Configuration,
76 rpc::{
77 impls::RpcImplConfiguration,
78 interceptor::{RpcInterceptor, RpcProxy},
79 },
80};
81pub use cfx_config::rpc_server_config::{
82 HttpConfiguration, TcpConfiguration, WsConfiguration,
83};
84use cfx_rpc_cfx_types::apis::{Api, ApiSet};
85use interceptor::{MetricsInterceptor, ThrottleInterceptor};
86pub use metadata::Metadata;
87use std::collections::HashSet;
88
89pub fn setup_public_rpc_apis(
90 common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
91 conf: &Configuration,
92) -> MetaIoHandler<Metadata> {
93 setup_rpc_apis(
94 common,
95 rpc,
96 pubsub,
97 &conf.raw_conf.throttling_conf,
98 "rpc",
99 conf.raw_conf.public_rpc_apis.list_apis(),
100 )
101}
102
103pub fn setup_debug_rpc_apis(
104 common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
105 conf: &Configuration,
106) -> MetaIoHandler<Metadata> {
107 setup_rpc_apis(
108 common,
109 rpc,
110 pubsub,
111 &conf.raw_conf.throttling_conf,
112 "rpc_local",
113 ApiSet::All.list_apis(),
114 )
115}
116
117fn setup_rpc_apis(
118 common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
119 throttling_conf: &Option<String>, throttling_section: &str,
120 apis: HashSet<Api>,
121) -> MetaIoHandler<Metadata> {
122 let mut handler = MetaIoHandler::default();
123 for api in &apis {
124 match api {
125 Api::Cfx => {
126 let cfx =
127 CfxHandler::new(common.clone(), rpc.clone()).to_delegate();
128 extend_with_interceptor(
129 &mut handler,
130 &rpc.config,
131 cfx,
132 throttling_conf,
133 throttling_section,
134 );
135
136 if let Some(poll_lifetime) = rpc.config.poll_lifetime_in_seconds
137 {
138 if let Some(h) = pubsub.handler().upgrade() {
139 let filter_client = CfxFilterClient::new(
140 rpc.consensus.clone(),
141 rpc.tx_pool.clone(),
142 pubsub.epochs_ordered(),
143 pubsub.executor.clone(),
144 poll_lifetime,
145 rpc.config.get_logs_filter_max_limit,
146 h.network.clone(),
147 )
148 .to_delegate();
149
150 extend_with_interceptor(
151 &mut handler,
152 &rpc.config,
153 filter_client,
154 throttling_conf,
155 throttling_section,
156 );
157 }
158 }
159 }
160 Api::Debug => {
161 handler.extend_with(
162 LocalRpcImpl::new(common.clone(), rpc.clone())
163 .to_delegate(),
164 );
165 }
166 Api::Pubsub => {
167 extend_with_interceptor(
168 &mut handler,
169 &rpc.config,
170 pubsub.clone().to_delegate(),
171 throttling_conf,
172 throttling_section,
173 );
174 }
175 Api::Test => {
176 handler.extend_with(
177 TestRpcImpl::new(common.clone(), rpc.clone()).to_delegate(),
178 );
179 }
180 Api::Trace => {
181 let trace = TraceHandler::new(
182 *rpc.sync.network.get_network_type(),
183 rpc.consensus.clone(),
184 )
185 .to_delegate();
186 extend_with_interceptor(
187 &mut handler,
188 &rpc.config,
189 trace,
190 throttling_conf,
191 throttling_section,
192 );
193 }
194 Api::TxPool => {
195 let txpool =
196 TransactionPoolHandler::new(common.clone()).to_delegate();
197 extend_with_interceptor(
198 &mut handler,
199 &rpc.config,
200 txpool,
201 throttling_conf,
202 throttling_section,
203 );
204 }
205 Api::Pos => {
206 let pos = PosHandler::new(
207 common.pos_handler.clone(),
208 rpc.consensus.data_manager().clone(),
209 *rpc.sync.network.get_network_type(),
210 rpc.consensus.clone(),
211 )
212 .to_delegate();
213 let pos_interceptor =
214 PoSInterceptor::new(common.pos_handler.clone());
215 handler.extend_with(RpcProxy::new(pos, pos_interceptor));
216 }
217 }
218 }
219
220 add_meta_rpc_methods(handler, apis)
221}
222
223pub fn extend_with_interceptor<
224 T: IntoIterator<Item = (String, RemoteProcedure<Metadata>)>,
225>(
226 handler: &mut MetaIoHandler<Metadata>, rpc_conf: &RpcImplConfiguration,
227 rpc_impl: T, throttling_conf: &Option<String>, throttling_section: &str,
228) {
229 let interceptor =
230 ThrottleInterceptor::new(throttling_conf, throttling_section);
231 if rpc_conf.enable_metrics {
232 handler.extend_with(RpcProxy::new(
233 rpc_impl,
234 MetricsInterceptor::new(interceptor),
235 ));
236 } else {
237 handler.extend_with(RpcProxy::new(rpc_impl, interceptor));
238 }
239}
240
241fn add_meta_rpc_methods(
242 mut handler: MetaIoHandler<Metadata>, apis: HashSet<Api>,
243) -> MetaIoHandler<Metadata> {
244 let methods: Vec<String> =
246 handler.iter().map(|(method, _)| method).cloned().collect();
247 handler.add_sync_method("rpc_methods", move |_| {
248 let method_list = methods
249 .clone()
250 .iter()
251 .map(|m| Value::String(m.to_string()))
252 .collect();
253 Ok(Value::Array(method_list))
254 });
255
256 let namespaces: Vec<String> =
258 apis.into_iter().map(|item| format!("{}", item)).collect();
259 handler.add_sync_method("rpc_modules", move |_| {
260 let ns = namespaces
261 .clone()
262 .iter()
263 .map(|m| Value::String(m.to_string()))
264 .collect();
265 Ok(Value::Array(ns))
266 });
267
268 handler
269}
270
271pub fn setup_public_rpc_apis_light(
272 common: Arc<CommonImpl>, rpc: Arc<LightImpl>, pubsub: PubSubClient,
273 conf: &Configuration,
274) -> MetaIoHandler<Metadata> {
275 setup_rpc_apis_light(
276 common,
277 rpc,
278 pubsub,
279 &conf.raw_conf.throttling_conf,
280 "rpc",
281 conf.raw_conf.public_rpc_apis.list_apis(),
282 )
283}
284
285pub fn setup_debug_rpc_apis_light(
286 common: Arc<CommonImpl>, rpc: Arc<LightImpl>, pubsub: PubSubClient,
287 conf: &Configuration,
288) -> MetaIoHandler<Metadata> {
289 let mut light_debug_apis = ApiSet::All.list_apis();
290 light_debug_apis.remove(&Api::Trace);
291 setup_rpc_apis_light(
292 common,
293 rpc,
294 pubsub,
295 &conf.raw_conf.throttling_conf,
296 "rpc_local",
297 light_debug_apis,
298 )
299}
300
301fn setup_rpc_apis_light(
302 common: Arc<CommonImpl>, rpc: Arc<LightImpl>, pubsub: PubSubClient,
303 throttling_conf: &Option<String>, throttling_section: &str,
304 apis: HashSet<Api>,
305) -> MetaIoHandler<Metadata> {
306 let mut handler = MetaIoHandler::default();
307 for api in apis {
308 match api {
309 Api::Cfx => {
310 let cfx = LightCfxHandler::new(common.clone(), rpc.clone())
311 .to_delegate();
312 let interceptor = ThrottleInterceptor::new(
313 throttling_conf,
314 throttling_section,
315 );
316 handler.extend_with(RpcProxy::new(cfx, interceptor));
317 }
318 Api::Debug => {
319 handler.extend_with(
320 LightDebugRpcImpl::new(common.clone(), rpc.clone())
321 .to_delegate(),
322 );
323 }
324 Api::Pubsub => handler.extend_with(pubsub.clone().to_delegate()),
325 Api::Test => {
326 handler.extend_with(
327 LightTestRpcImpl::new(common.clone(), rpc.clone())
328 .to_delegate(),
329 );
330 }
331 Api::Trace => {
332 warn!("Light nodes do not support trace RPC");
333 }
334 Api::TxPool => {
335 warn!("Light nodes do not support txpool RPC");
336 }
337 Api::Pos => {
338 warn!("Light nodes do not support PoS RPC");
339 }
340 }
341 }
342 handler
343}
344
345pub fn start_tcp<H, T>(
346 conf: TcpConfiguration, handler: H, extractor: T,
347) -> Result<Option<TcpServer>, String>
348where
349 H: Into<MetaIoHandler<Metadata>>,
350 T: TpcMetaExtractor<Metadata> + 'static,
351{
352 if !conf.enabled {
353 return Ok(None);
354 }
355
356 match TcpServerBuilder::with_meta_extractor(handler, extractor)
357 .start(&conf.address)
358 {
359 Ok(server) => Ok(Some(server)),
360 Err(io_error) => {
361 Err(format!("TCP error: {} (addr = {})", io_error, conf.address))
362 }
363 }
364}
365
366pub fn start_http(
367 conf: HttpConfiguration, handler: MetaIoHandler<Metadata>,
368) -> Result<Option<HttpServer>, String> {
369 if !conf.enabled {
370 return Ok(None);
371 }
372 let mut builder = HttpServerBuilder::new(handler);
373 if let Some(threads) = conf.threads {
374 builder = builder.threads(threads);
375 }
376
377 match builder
378 .keep_alive(conf.keep_alive)
379 .cors(conf.cors_domains.clone())
380 .start_http(&conf.address)
381 {
382 Ok(server) => Ok(Some(server)),
383 Err(io_error) => Err(format!(
384 "HTTP error: {} (addr = {})",
385 io_error, conf.address
386 )),
387 }
388}
389
390pub fn start_ws<H, T>(
391 conf: WsConfiguration, handler: H, extractor: T,
392) -> Result<Option<WsServer>, String>
393where
394 H: Into<MetaIoHandler<Metadata>>,
395 T: WsMetaExtractor<Metadata> + 'static,
396{
397 if !conf.enabled {
398 return Ok(None);
399 }
400
401 match WsServerBuilder::with_meta_extractor(handler, extractor)
402 .max_payload(conf.max_payload_bytes)
403 .start(&conf.address)
404 {
405 Ok(server) => Ok(Some(server)),
406 Err(io_error) => {
407 Err(format!("WS error: {} (addr = {})", io_error, conf.address))
408 }
409 }
410}
411
412pub async fn launch_async_rpc_servers(
414 consensus: SharedConsensusGraph, sync: SharedSynchronizationService,
415 tx_pool: SharedTransactionPool, notifications: Arc<Notifications>,
416 executor: TaskExecutor, conf: &Configuration,
417) -> Result<Option<RpcServerHandle>, String> {
418 let http_config = conf.eth_http_config();
419 let ws_config = conf.eth_ws_config();
420 let apis = conf.raw_conf.public_evm_rpc_apis.clone();
421
422 let (transport_rpc_module_config, server_config) =
423 match (http_config.enabled, ws_config.enabled) {
424 (true, true) => {
425 let transport_rpc_module_config =
426 TransportRpcModuleConfig::set_http(apis.clone())
427 .with_ws(apis.clone());
428
429 let server_config =
430 RpcServerConfig::http(conf.jsonrpsee_server_builder())
431 .with_ws(conf.jsonrpsee_server_builder())
432 .with_http_address(http_config.address)
433 .with_ws_address(ws_config.address);
434 (transport_rpc_module_config, server_config)
435 }
436 (true, false) => {
437 let transport_rpc_module_config =
438 TransportRpcModuleConfig::set_http(apis.clone());
439 let server_config =
440 RpcServerConfig::http(conf.jsonrpsee_server_builder())
441 .with_http_address(http_config.address);
442 (transport_rpc_module_config, server_config)
443 }
444 (false, true) => {
445 let transport_rpc_module_config =
446 TransportRpcModuleConfig::set_ws(apis.clone());
447 let server_config =
448 RpcServerConfig::ws(conf.jsonrpsee_server_builder())
449 .with_ws_address(ws_config.address);
450 (transport_rpc_module_config, server_config)
451 }
452 _ => return Ok(None),
453 };
454
455 info!("Enabled evm async rpc modules: {:?}", apis.into_selection());
456 let rpc_conf = conf.rpc_impl_config();
457 let enable_metrics = rpc_conf.enable_metrics;
458
459 let rpc_module_builder = RpcModuleBuilder::new(
460 rpc_conf,
461 consensus,
462 sync,
463 tx_pool,
464 executor,
465 notifications,
466 );
467
468 let transport_rpc_modules =
469 rpc_module_builder.build(transport_rpc_module_config);
470
471 let throttling_conf_file = conf.raw_conf.throttling_conf.clone();
472 let server_handle = server_config
473 .start(&transport_rpc_modules, throttling_conf_file, enable_metrics)
474 .await
475 .map_err(|e| e.to_string())?;
476
477 Ok(Some(server_handle))
478}
479
480pub async fn launch_cfx_async_rpc_servers(
482 consensus: SharedConsensusGraph, sync: SharedSynchronizationService,
483 tx_pool: SharedTransactionPool, data_man: Arc<BlockDataManager>,
484 network: Arc<NetworkService>, pos_handler: Arc<PosVerifier>,
485 notifications: Arc<Notifications>, executor: TaskExecutor,
486 accounts: Arc<cfxcore_accounts::AccountProvider>,
487 exit: Arc<(parking_lot::Mutex<bool>, parking_lot::Condvar)>,
488 block_gen: BlockGeneratorTestApi,
489 maybe_txgen: Option<Arc<TransactionGenerator>>,
490 maybe_direct_txgen: Option<Arc<Mutex<DirectTransactionGenerator>>>,
491 conf: &Configuration, apis: ApiSet, is_debug: bool,
492) -> Result<Option<RpcServerHandle>, String> {
493 let (http_config, ws_config) = if !is_debug {
494 (conf.http_config(), conf.ws_config())
495 } else {
496 (conf.local_http_config(), conf.local_ws_config())
497 };
498
499 let (transport_rpc_module_config, server_config) =
500 match (http_config.enabled, ws_config.enabled) {
501 (true, true) => {
502 let transport_rpc_module_config =
503 CfxTransportRpcModuleConfig::set_http(apis.clone())
504 .with_ws(apis.clone());
505
506 let server_config =
507 CfxRpcServerConfig::http(conf.jsonrpsee_server_builder())
508 .with_ws(conf.jsonrpsee_server_builder())
509 .with_http_address(http_config.address)
510 .with_ws_address(ws_config.address);
511 (transport_rpc_module_config, server_config)
512 }
513 (true, false) => {
514 let transport_rpc_module_config =
515 CfxTransportRpcModuleConfig::set_http(apis.clone());
516 let server_config =
517 CfxRpcServerConfig::http(conf.jsonrpsee_server_builder())
518 .with_http_address(http_config.address);
519 (transport_rpc_module_config, server_config)
520 }
521 (false, true) => {
522 let transport_rpc_module_config =
523 CfxTransportRpcModuleConfig::set_ws(apis.clone());
524 let server_config =
525 CfxRpcServerConfig::ws(conf.jsonrpsee_server_builder())
526 .with_ws_address(ws_config.address);
527 (transport_rpc_module_config, server_config)
528 }
529 _ => return Ok(None),
530 };
531
532 info!(
533 "Enabled cfx async rpc modules: {:?}",
534 CfxRpcModuleSelection::from(apis).into_selection()
535 );
536
537 let rpc_conf = conf.rpc_impl_config();
538 let rpc_module_builder = CfxRpcModuleBuilder::new(
539 rpc_conf,
540 consensus,
541 sync,
542 tx_pool,
543 executor,
544 data_man,
545 network,
546 pos_handler,
547 notifications,
548 accounts,
549 exit,
550 block_gen,
551 maybe_txgen,
552 maybe_direct_txgen,
553 );
554
555 let transport_rpc_modules =
556 rpc_module_builder.build(transport_rpc_module_config);
557
558 let server_handle = server_config
559 .start(&transport_rpc_modules)
560 .await
561 .map_err(|e| e.to_string())?;
562
563 Ok(Some(server_handle))
564}