1#![allow(unused)]
30mod module;
31
32pub use crate::{
33 error::*, id_provider::SubscriptionIdProvider, RpcServerHandle,
34};
35use cfx_rpc_middlewares::{Logger, Metrics, Throttle};
36use log::debug;
37pub use module::{EthRpcModule, RpcModuleSelection};
38
39use cfx_rpc_cfx_types::RpcImplConfiguration;
40use cfx_rpc_eth_api::*;
41use cfx_rpc_eth_impl::{helpers::ChainInfo, *};
42use cfx_tasks::TaskExecutor;
43use cfxcore::{
44 Notifications, SharedConsensusGraph, SharedSynchronizationService,
45 SharedTransactionPool,
46};
47pub use jsonrpsee::server::ServerBuilder;
48use jsonrpsee::{
49 core::RegisterMethodError,
50 server::{
51 middleware::rpc::RpcServiceBuilder, AlreadyStoppedError, IdProvider,
52 ServerConfigBuilder, ServerHandle,
53 },
54 Methods, RpcModule,
55};
56use std::{
57 collections::HashMap,
58 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
59 sync::Arc,
60};
61
62pub const DEFAULT_HTTP_PORT: u16 = 8545;
63pub const DEFAULT_WS_PORT: u16 = 8546;
64
65#[derive(Clone)]
69pub struct RpcModuleBuilder {
70 config: RpcImplConfiguration,
71 consensus: SharedConsensusGraph,
72 sync: SharedSynchronizationService,
73 tx_pool: SharedTransactionPool,
74 executor: TaskExecutor,
75 notifications: Arc<Notifications>,
76}
77
78impl RpcModuleBuilder {
79 pub fn new(
80 config: RpcImplConfiguration, consensus: SharedConsensusGraph,
81 sync: SharedSynchronizationService, tx_pool: SharedTransactionPool,
82 executor: TaskExecutor, notifications: Arc<Notifications>,
83 ) -> Self {
84 Self {
85 config,
86 consensus,
87 sync,
88 tx_pool,
89 executor,
90 notifications,
91 }
92 }
93
94 pub fn build(
98 self, module_config: TransportRpcModuleConfig,
99 ) -> TransportRpcModules<()> {
100 let mut modules = TransportRpcModules::default();
101
102 if !module_config.is_empty() {
103 let TransportRpcModuleConfig { http, ws } = module_config.clone();
104
105 let Self {
106 config,
107 consensus,
108 sync,
109 tx_pool,
110 executor,
111 notifications,
112 } = self;
113
114 let mut registry = RpcRegistryInner::new(
115 config,
116 consensus,
117 sync,
118 tx_pool,
119 executor,
120 notifications,
121 );
122
123 modules.config = module_config;
124 modules.http = registry.maybe_module(http.as_ref());
125 modules.ws = registry.maybe_module(ws.as_ref());
126 }
127
128 modules
129 }
130}
131
132#[derive(Clone)]
134pub struct RpcRegistryInner {
135 consensus: SharedConsensusGraph,
136 config: RpcImplConfiguration,
137 sync: SharedSynchronizationService,
138 tx_pool: SharedTransactionPool,
139 modules: HashMap<EthRpcModule, Methods>,
140 executor: TaskExecutor,
141 notifications: Arc<Notifications>,
142}
143
144impl RpcRegistryInner {
145 pub fn new(
146 config: RpcImplConfiguration, consensus: SharedConsensusGraph,
147 sync: SharedSynchronizationService, tx_pool: SharedTransactionPool,
148 executor: TaskExecutor, notifications: Arc<Notifications>,
149 ) -> Self {
150 Self {
151 consensus,
152 config,
153 sync,
154 tx_pool,
155 modules: Default::default(),
156 executor,
157 notifications,
158 }
159 }
160
161 pub fn methods(&self) -> Vec<Methods> {
163 self.modules.values().cloned().collect()
164 }
165
166 pub fn module(&self) -> RpcModule<()> {
168 let mut module = RpcModule::new(());
169 for methods in self.modules.values().cloned() {
170 module.merge(methods).expect("No conflicts");
171 }
172 module
173 }
174}
175
176impl RpcRegistryInner {
177 pub fn web3_api(&self) -> Web3Api { Web3Api }
178
179 pub fn register_web3(&mut self) -> &mut Self {
180 let web3api = self.web3_api();
181 self.modules
182 .insert(EthRpcModule::Web3, web3api.into_rpc().into());
183 self
184 }
185
186 pub fn trace_api(&self) -> TraceApi {
187 TraceApi::new(
188 self.consensus.clone(),
189 self.sync.network.get_network_type().clone(),
190 )
191 }
192
193 pub fn debug_api(&self) -> DebugApi {
194 DebugApi::new(
195 self.consensus.clone(),
196 self.config.max_estimation_gas_limit,
197 )
198 }
199
200 pub fn net_api(&self) -> NetApi {
201 NetApi::new(Box::new(ChainInfo::new(self.consensus.clone())))
202 }
203
204 fn maybe_module(
206 &mut self, config: Option<&RpcModuleSelection>,
207 ) -> Option<RpcModule<()>> {
208 config.map(|config| self.module_for(config))
209 }
210
211 pub fn module_for(&mut self, config: &RpcModuleSelection) -> RpcModule<()> {
214 let mut module = RpcModule::new(());
215 let all_methods = self.eth_methods(config.iter_selection());
216 for methods in all_methods {
217 module.merge(methods).expect("No conflicts");
218 }
219 module
220 }
221
222 pub fn eth_methods(
223 &mut self, namespaces: impl Iterator<Item = EthRpcModule>,
224 ) -> Vec<Methods> {
225 let namespaces: Vec<_> = namespaces.collect();
226 let module_version = namespaces
227 .iter()
228 .map(|module| (module.to_string(), "1.0".to_string()))
229 .collect::<HashMap<String, String>>();
230
231 let namespace_methods = |namespace| {
232 self.modules
233 .entry(namespace)
234 .or_insert_with(|| match namespace {
235 EthRpcModule::Debug => DebugApi::new(
236 self.consensus.clone(),
237 self.config.max_estimation_gas_limit,
238 )
239 .into_rpc()
240 .into(),
241 EthRpcModule::Eth => {
242 let mut module = EthApi::new(
243 self.config.clone(),
244 self.consensus.clone(),
245 self.sync.clone(),
246 self.tx_pool.clone(),
247 self.executor.clone(),
248 )
249 .into_rpc();
250 if self.config.poll_lifetime_in_seconds.is_some() {
251 let filter_module = EthFilterApi::new(
252 self.consensus.clone(),
253 self.tx_pool.clone(),
254 self.notifications.epochs_ordered.clone(),
255 self.executor.clone(),
256 self.config.poll_lifetime_in_seconds.unwrap(),
257 self.config.get_logs_filter_max_limit,
258 )
259 .into_rpc();
260 module.merge(filter_module).expect("No conflicts");
261 }
262 module.into()
263 }
264 EthRpcModule::Net => NetApi::new(Box::new(ChainInfo::new(
265 self.consensus.clone(),
266 )))
267 .into_rpc()
268 .into(),
269 EthRpcModule::Trace => TraceApi::new(
270 self.consensus.clone(),
271 self.sync.network.get_network_type().clone(),
272 )
273 .into_rpc()
274 .into(),
275 EthRpcModule::Web3 => Web3Api.into_rpc().into(),
276 EthRpcModule::Rpc => {
277 RPCApi::new(module_version.clone()).into_rpc().into()
278 }
279 EthRpcModule::Parity => {
280 let eth_api = EthApi::new(
281 self.config.clone(),
282 self.consensus.clone(),
283 self.sync.clone(),
284 self.tx_pool.clone(),
285 self.executor.clone(),
286 );
287 ParityApi::new(eth_api).into_rpc().into()
288 }
289 EthRpcModule::Txpool => {
290 TxPoolApi::new(self.tx_pool.clone()).into_rpc().into()
291 }
292 EthRpcModule::PubSub => PubSubApi::new(
293 self.consensus.clone(),
294 self.notifications.clone(),
295 self.executor.clone(),
296 )
297 .into_rpc()
298 .into(),
299 })
300 .clone()
301 };
302
303 namespaces
304 .iter()
305 .copied()
306 .map(namespace_methods)
307 .collect::<Vec<_>>()
308 }
309}
310
311#[derive(Debug)]
324pub struct RpcServerConfig {
325 http_server_config: Option<ServerConfigBuilder>,
327 http_cors_domains: Option<String>,
329 http_addr: Option<SocketAddr>,
331 ws_server_config: Option<ServerConfigBuilder>,
333 ws_cors_domains: Option<String>,
335 ws_addr: Option<SocketAddr>,
337}
338
339impl Default for RpcServerConfig {
340 fn default() -> Self {
341 Self {
342 http_server_config: None,
343 http_cors_domains: None,
344 http_addr: None,
345 ws_server_config: None,
346 ws_cors_domains: None,
347 ws_addr: None,
348 }
350 }
351}
352
353impl RpcServerConfig {
354 pub fn http(config: ServerConfigBuilder) -> Self {
356 Self::default().with_http(config)
357 }
358
359 pub fn ws(config: ServerConfigBuilder) -> Self {
361 Self::default().with_ws(config)
362 }
363
364 pub fn with_http(mut self, config: ServerConfigBuilder) -> Self {
370 self.http_server_config =
371 Some(config.set_id_provider(SubscriptionIdProvider::default()));
372 self
373 }
374
375 pub fn with_ws(mut self, config: ServerConfigBuilder) -> Self {
381 self.ws_server_config =
382 Some(config.set_id_provider(SubscriptionIdProvider::default()));
383 self
384 }
385}
386
387impl RpcServerConfig {
388 pub fn with_cors(self, cors_domain: Option<String>) -> Self {
390 self.with_http_cors(cors_domain.clone())
391 .with_ws_cors(cors_domain)
392 }
393
394 pub fn with_ws_cors(mut self, cors_domain: Option<String>) -> Self {
396 self.ws_cors_domains = cors_domain;
397 self
398 }
399
400 pub fn with_http_cors(mut self, cors_domain: Option<String>) -> Self {
402 self.http_cors_domains = cors_domain;
403 self
404 }
405
406 pub const fn with_http_address(mut self, addr: SocketAddr) -> Self {
410 self.http_addr = Some(addr);
411 self
412 }
413
414 pub const fn with_ws_address(mut self, addr: SocketAddr) -> Self {
418 self.ws_addr = Some(addr);
419 self
420 }
421
422 pub fn with_id_provider<I>(mut self, id_provider: I) -> Self
426 where I: IdProvider + Clone + 'static {
427 if let Some(http) = self.http_server_config {
428 self.http_server_config =
429 Some(http.set_id_provider(id_provider.clone()));
430 }
431 if let Some(ws) = self.ws_server_config {
432 self.ws_server_config =
433 Some(ws.set_id_provider(id_provider.clone()));
434 }
435
436 self
437 }
438
439 pub const fn has_server(&self) -> bool {
444 self.http_server_config.is_some() || self.ws_server_config.is_some()
445 }
446
447 pub const fn http_address(&self) -> Option<SocketAddr> { self.http_addr }
449
450 pub const fn ws_address(&self) -> Option<SocketAddr> { self.ws_addr }
452
453 pub async fn start(
460 self, modules: &TransportRpcModules,
461 throttling_conf_file: Option<String>, enable_metrics: bool,
462 ) -> Result<RpcServerHandle, RpcError> {
463 debug!("enable metrics: {}", enable_metrics);
465
466 let rpc_middleware = RpcServiceBuilder::new()
467 .layer_fn(move |s| {
468 Throttle::new(
469 throttling_conf_file.as_ref().map(|s| s.as_str()),
470 "rpc",
471 s,
472 )
473 })
474 .layer_fn(|s| Metrics::new(s))
475 .layer_fn(|s| Logger::new(s));
476
477 let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(
478 SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_HTTP_PORT),
479 ));
480
481 let ws_socket_addr = self.ws_addr.unwrap_or(SocketAddr::V4(
482 SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_WS_PORT),
483 ));
484
485 if self.http_addr == self.ws_addr
488 && self.http_server_config.is_some()
489 && self.ws_server_config.is_some()
490 {
491 modules.config.ensure_ws_http_identical()?;
493
494 if let Some(config) = self.http_server_config {
495 let server = ServerBuilder::new()
496 .set_rpc_middleware(rpc_middleware)
497 .set_config(config.build())
498 .build(http_socket_addr)
499 .await
500 .map_err(|err| {
501 RpcError::server_error(
502 err,
503 ServerKind::WsHttp(http_socket_addr),
504 )
505 })?;
506 let addr = server.local_addr().map_err(|err| {
507 RpcError::server_error(
508 err,
509 ServerKind::WsHttp(http_socket_addr),
510 )
511 })?;
512 if let Some(module) =
513 modules.http.as_ref().or(modules.ws.as_ref())
514 {
515 let handle = server.start(module.clone());
516 let http_handle = Some(handle.clone());
517 let ws_handle = Some(handle);
518
519 return Ok(RpcServerHandle {
520 http_local_addr: Some(addr),
521 ws_local_addr: Some(addr),
522 http: http_handle,
523 ws: ws_handle,
524 });
525 }
526
527 return Err(RpcError::Custom(
528 "No valid RpcModule found from modules".to_string(),
529 ));
530 }
531 }
532
533 let mut result = RpcServerHandle {
534 http_local_addr: None,
535 ws_local_addr: None,
536 http: None,
537 ws: None,
538 };
539 if let Some(config) = self.ws_server_config {
540 let server = ServerBuilder::new()
541 .set_config(config.ws_only().build())
542 .set_rpc_middleware(rpc_middleware.clone())
543 .build(ws_socket_addr)
544 .await
545 .map_err(|err| {
546 RpcError::server_error(err, ServerKind::WS(ws_socket_addr))
547 })?;
548
549 let addr = server.local_addr().map_err(|err| {
550 RpcError::server_error(err, ServerKind::WS(ws_socket_addr))
551 })?;
552
553 let ws_local_addr = Some(addr);
554 let ws_server = Some(server);
555 let ws_handle = ws_server.map(|ws_server| {
556 ws_server.start(modules.ws.clone().expect("ws server error"))
557 });
558
559 result.ws = ws_handle;
560 result.ws_local_addr = ws_local_addr;
561 }
562
563 if let Some(config) = self.http_server_config {
564 let server = ServerBuilder::new()
565 .set_config(config.http_only().build())
566 .set_rpc_middleware(rpc_middleware)
567 .build(http_socket_addr)
568 .await
569 .map_err(|err| {
570 RpcError::server_error(
571 err,
572 ServerKind::Http(http_socket_addr),
573 )
574 })?;
575 let local_addr = server.local_addr().map_err(|err| {
576 RpcError::server_error(err, ServerKind::Http(http_socket_addr))
577 })?;
578 let http_local_addr = Some(local_addr);
579 let http_server = Some(server);
580 let http_handle = http_server.map(|http_server| {
581 http_server
582 .start(modules.http.clone().expect("http server error"))
583 });
584
585 result.http = http_handle;
586 result.http_local_addr = http_local_addr;
587 }
588
589 Ok(result)
590 }
591}
592
593#[derive(Debug, Clone, Default, Eq, PartialEq)]
595pub struct TransportRpcModuleConfig {
596 http: Option<RpcModuleSelection>,
598 ws: Option<RpcModuleSelection>,
600}
601
602impl TransportRpcModuleConfig {
603 pub fn set_http(http: impl Into<RpcModuleSelection>) -> Self {
605 Self::default().with_http(http)
606 }
607
608 pub fn set_ws(ws: impl Into<RpcModuleSelection>) -> Self {
610 Self::default().with_ws(ws)
611 }
612
613 pub fn with_http(mut self, http: impl Into<RpcModuleSelection>) -> Self {
615 self.http = Some(http.into());
616 self
617 }
618
619 pub fn with_ws(mut self, ws: impl Into<RpcModuleSelection>) -> Self {
621 self.ws = Some(ws.into());
622 self
623 }
624
625 pub fn http_mut(&mut self) -> &mut Option<RpcModuleSelection> {
627 &mut self.http
628 }
629
630 pub fn ws_mut(&mut self) -> &mut Option<RpcModuleSelection> { &mut self.ws }
632
633 pub const fn is_empty(&self) -> bool {
635 self.http.is_none() && self.ws.is_none()
636 }
637
638 pub const fn http(&self) -> Option<&RpcModuleSelection> {
640 self.http.as_ref()
641 }
642
643 pub const fn ws(&self) -> Option<&RpcModuleSelection> { self.ws.as_ref() }
645
646 fn ensure_ws_http_identical(&self) -> Result<(), WsHttpSamePortError> {
649 if RpcModuleSelection::are_identical(
650 self.http.as_ref(),
651 self.ws.as_ref(),
652 ) {
653 Ok(())
654 } else {
655 let http_modules = self
656 .http
657 .as_ref()
658 .map(RpcModuleSelection::to_selection)
659 .unwrap_or_default();
660 let ws_modules = self
661 .ws
662 .as_ref()
663 .map(RpcModuleSelection::to_selection)
664 .unwrap_or_default();
665
666 let http_not_ws =
667 http_modules.difference(&ws_modules).copied().collect();
668 let ws_not_http =
669 ws_modules.difference(&http_modules).copied().collect();
670 let overlap =
671 http_modules.intersection(&ws_modules).copied().collect();
672
673 Err(WsHttpSamePortError::ConflictingModules(Box::new(
674 ConflictingModules {
675 overlap,
676 http_not_ws,
677 ws_not_http,
678 },
679 )))
680 }
681 }
682}
683
684#[derive(Debug, Clone, Default)]
686pub struct TransportRpcModules<Context = ()> {
687 config: TransportRpcModuleConfig,
689 http: Option<RpcModule<Context>>,
691 ws: Option<RpcModule<Context>>,
693}
694
695impl TransportRpcModules {
698 pub const fn module_config(&self) -> &TransportRpcModuleConfig {
701 &self.config
702 }
703
704 pub fn merge_http(
710 &mut self, other: impl Into<Methods>,
711 ) -> Result<bool, RegisterMethodError> {
712 if let Some(ref mut http) = self.http {
713 return http.merge(other.into()).map(|_| true);
714 }
715 Ok(false)
716 }
717
718 pub fn merge_ws(
724 &mut self, other: impl Into<Methods>,
725 ) -> Result<bool, RegisterMethodError> {
726 if let Some(ref mut ws) = self.ws {
727 return ws.merge(other.into()).map(|_| true);
728 }
729 Ok(false)
730 }
731
732 pub fn merge_configured(
736 &mut self, other: impl Into<Methods>,
737 ) -> Result<(), RegisterMethodError> {
738 let other = other.into();
739 self.merge_http(other.clone())?;
740 self.merge_ws(other.clone())?;
741 Ok(())
742 }
743
744 pub fn remove_http_method(&mut self, method_name: &'static str) -> bool {
752 if let Some(http_module) = &mut self.http {
753 http_module.remove_method(method_name).is_some()
754 } else {
755 false
756 }
757 }
758
759 pub fn remove_ws_method(&mut self, method_name: &'static str) -> bool {
767 if let Some(ws_module) = &mut self.ws {
768 ws_module.remove_method(method_name).is_some()
769 } else {
770 false
771 }
772 }
773
774 pub fn remove_method_from_configured(
778 &mut self, method_name: &'static str,
779 ) -> bool {
780 let http_removed = self.remove_http_method(method_name);
781 let ws_removed = self.remove_ws_method(method_name);
782
783 http_removed || ws_removed
784 }
785}