1mod constants;
30mod error;
31mod id_provider;
32mod module;
33
34use cfx_rpc_middlewares::{Logger, Metrics, Throttle};
35pub use error::*;
36pub use id_provider::EthSubscriptionIdProvider;
37use log::debug;
38pub use module::{EthRpcModule, RpcModuleSelection};
39
40use cfx_rpc::{helpers::ChainInfo, *};
41use cfx_rpc_cfx_types::RpcImplConfiguration;
42use cfx_rpc_eth_api::*;
43use cfx_tasks::TaskExecutor;
44use cfxcore::{
45 Notifications, SharedConsensusGraph, SharedSynchronizationService,
46 SharedTransactionPool,
47};
48pub use jsonrpsee::server::ServerBuilder;
49use jsonrpsee::{
50 core::RegisterMethodError,
51 server::{
52 middleware::rpc::RpcServiceBuilder, AlreadyStoppedError, IdProvider,
53 ServerConfigBuilder, ServerHandle,
54 },
55 Methods, RpcModule,
56};
57use std::{
58 collections::HashMap,
59 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
60 sync::Arc,
61};
62
63#[derive(Clone)]
67pub struct RpcModuleBuilder {
68 config: RpcImplConfiguration,
69 consensus: SharedConsensusGraph,
70 sync: SharedSynchronizationService,
71 tx_pool: SharedTransactionPool,
72 executor: TaskExecutor,
73 notifications: Arc<Notifications>,
74}
75
76impl RpcModuleBuilder {
77 pub fn new(
78 config: RpcImplConfiguration, consensus: SharedConsensusGraph,
79 sync: SharedSynchronizationService, tx_pool: SharedTransactionPool,
80 executor: TaskExecutor, notifications: Arc<Notifications>,
81 ) -> Self {
82 Self {
83 config,
84 consensus,
85 sync,
86 tx_pool,
87 executor,
88 notifications,
89 }
90 }
91
92 pub fn build(
96 self, module_config: TransportRpcModuleConfig,
97 ) -> TransportRpcModules<()> {
98 let mut modules = TransportRpcModules::default();
99
100 if !module_config.is_empty() {
101 let TransportRpcModuleConfig { http, ws } = module_config.clone();
102
103 let Self {
104 config,
105 consensus,
106 sync,
107 tx_pool,
108 executor,
109 notifications,
110 } = self;
111
112 let mut registry = RpcRegistryInner::new(
113 config,
114 consensus,
115 sync,
116 tx_pool,
117 executor,
118 notifications,
119 );
120
121 modules.config = module_config;
122 modules.http = registry.maybe_module(http.as_ref());
123 modules.ws = registry.maybe_module(ws.as_ref());
124 }
125
126 modules
127 }
128}
129
130#[derive(Clone)]
132pub struct RpcRegistryInner {
133 consensus: SharedConsensusGraph,
134 config: RpcImplConfiguration,
135 sync: SharedSynchronizationService,
136 tx_pool: SharedTransactionPool,
137 modules: HashMap<EthRpcModule, Methods>,
138 executor: TaskExecutor,
139 notifications: Arc<Notifications>,
140}
141
142impl RpcRegistryInner {
143 pub fn new(
144 config: RpcImplConfiguration, consensus: SharedConsensusGraph,
145 sync: SharedSynchronizationService, tx_pool: SharedTransactionPool,
146 executor: TaskExecutor, notifications: Arc<Notifications>,
147 ) -> Self {
148 Self {
149 consensus,
150 config,
151 sync,
152 tx_pool,
153 modules: Default::default(),
154 executor,
155 notifications,
156 }
157 }
158
159 pub fn methods(&self) -> Vec<Methods> {
161 self.modules.values().cloned().collect()
162 }
163
164 pub fn module(&self) -> RpcModule<()> {
166 let mut module = RpcModule::new(());
167 for methods in self.modules.values().cloned() {
168 module.merge(methods).expect("No conflicts");
169 }
170 module
171 }
172}
173
174impl RpcRegistryInner {
175 pub fn web3_api(&self) -> Web3Api { Web3Api }
176
177 pub fn register_web3(&mut self) -> &mut Self {
178 let web3api = self.web3_api();
179 self.modules
180 .insert(EthRpcModule::Web3, web3api.into_rpc().into());
181 self
182 }
183
184 pub fn trace_api(&self) -> TraceApi {
185 TraceApi::new(
186 self.consensus.clone(),
187 self.sync.network.get_network_type().clone(),
188 )
189 }
190
191 pub fn debug_api(&self) -> DebugApi {
192 DebugApi::new(
193 self.consensus.clone(),
194 self.config.max_estimation_gas_limit,
195 )
196 }
197
198 pub fn net_api(&self) -> NetApi {
199 NetApi::new(Box::new(ChainInfo::new(self.consensus.clone())))
200 }
201
202 fn maybe_module(
204 &mut self, config: Option<&RpcModuleSelection>,
205 ) -> Option<RpcModule<()>> {
206 config.map(|config| self.module_for(config))
207 }
208
209 pub fn module_for(&mut self, config: &RpcModuleSelection) -> RpcModule<()> {
212 let mut module = RpcModule::new(());
213 let all_methods = self.eth_methods(config.iter_selection());
214 for methods in all_methods {
215 module.merge(methods).expect("No conflicts");
216 }
217 module
218 }
219
220 pub fn eth_methods(
221 &mut self, namespaces: impl Iterator<Item = EthRpcModule>,
222 ) -> Vec<Methods> {
223 let namespaces: Vec<_> = namespaces.collect();
224 let module_version = namespaces
225 .iter()
226 .map(|module| (module.to_string(), "1.0".to_string()))
227 .collect::<HashMap<String, String>>();
228
229 let namespace_methods = |namespace| {
230 self.modules
231 .entry(namespace)
232 .or_insert_with(|| match namespace {
233 EthRpcModule::Debug => DebugApi::new(
234 self.consensus.clone(),
235 self.config.max_estimation_gas_limit,
236 )
237 .into_rpc()
238 .into(),
239 EthRpcModule::Eth => {
240 let mut module = EthApi::new(
241 self.config.clone(),
242 self.consensus.clone(),
243 self.sync.clone(),
244 self.tx_pool.clone(),
245 self.executor.clone(),
246 )
247 .into_rpc();
248 if self.config.poll_lifetime_in_seconds.is_some() {
249 let filter_module = EthFilterApi::new(
250 self.consensus.clone(),
251 self.tx_pool.clone(),
252 self.notifications.epochs_ordered.clone(),
253 self.executor.clone(),
254 self.config.poll_lifetime_in_seconds.unwrap(),
255 self.config.get_logs_filter_max_limit,
256 )
257 .into_rpc();
258 module.merge(filter_module).expect("No conflicts");
259 }
260 module.into()
261 }
262 EthRpcModule::Net => NetApi::new(Box::new(ChainInfo::new(
263 self.consensus.clone(),
264 )))
265 .into_rpc()
266 .into(),
267 EthRpcModule::Trace => TraceApi::new(
268 self.consensus.clone(),
269 self.sync.network.get_network_type().clone(),
270 )
271 .into_rpc()
272 .into(),
273 EthRpcModule::Web3 => Web3Api.into_rpc().into(),
274 EthRpcModule::Rpc => {
275 RPCApi::new(module_version.clone()).into_rpc().into()
276 }
277 EthRpcModule::Parity => {
278 let eth_api = EthApi::new(
279 self.config.clone(),
280 self.consensus.clone(),
281 self.sync.clone(),
282 self.tx_pool.clone(),
283 self.executor.clone(),
284 );
285 ParityApi::new(eth_api).into_rpc().into()
286 }
287 EthRpcModule::Txpool => {
288 TxPoolApi::new(self.tx_pool.clone()).into_rpc().into()
289 }
290 EthRpcModule::PubSub => PubSubApi::new(
291 self.consensus.clone(),
292 self.notifications.clone(),
293 self.executor.clone(),
294 )
295 .into_rpc()
296 .into(),
297 })
298 .clone()
299 };
300
301 namespaces
302 .iter()
303 .copied()
304 .map(namespace_methods)
305 .collect::<Vec<_>>()
306 }
307}
308
309#[derive(Debug)]
322pub struct RpcServerConfig {
323 http_server_config: Option<ServerConfigBuilder>,
325 http_cors_domains: Option<String>,
327 http_addr: Option<SocketAddr>,
329 ws_server_config: Option<ServerConfigBuilder>,
331 ws_cors_domains: Option<String>,
333 ws_addr: Option<SocketAddr>,
335}
336
337impl Default for RpcServerConfig {
338 fn default() -> Self {
339 Self {
340 http_server_config: None,
341 http_cors_domains: None,
342 http_addr: None,
343 ws_server_config: None,
344 ws_cors_domains: None,
345 ws_addr: None,
346 }
348 }
349}
350
351impl RpcServerConfig {
352 pub fn http(config: ServerConfigBuilder) -> Self {
354 Self::default().with_http(config)
355 }
356
357 pub fn ws(config: ServerConfigBuilder) -> Self {
359 Self::default().with_ws(config)
360 }
361
362 pub fn with_http(mut self, config: ServerConfigBuilder) -> Self {
368 self.http_server_config =
369 Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
370 self
371 }
372
373 pub fn with_ws(mut self, config: ServerConfigBuilder) -> Self {
379 self.ws_server_config =
380 Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
381 self
382 }
383}
384
385impl RpcServerConfig {
386 pub fn with_cors(self, cors_domain: Option<String>) -> Self {
388 self.with_http_cors(cors_domain.clone())
389 .with_ws_cors(cors_domain)
390 }
391
392 pub fn with_ws_cors(mut self, cors_domain: Option<String>) -> Self {
394 self.ws_cors_domains = cors_domain;
395 self
396 }
397
398 pub fn with_http_cors(mut self, cors_domain: Option<String>) -> Self {
400 self.http_cors_domains = cors_domain;
401 self
402 }
403
404 pub const fn with_http_address(mut self, addr: SocketAddr) -> Self {
408 self.http_addr = Some(addr);
409 self
410 }
411
412 pub const fn with_ws_address(mut self, addr: SocketAddr) -> Self {
416 self.ws_addr = Some(addr);
417 self
418 }
419
420 pub fn with_id_provider<I>(mut self, id_provider: I) -> Self
424 where I: IdProvider + Clone + 'static {
425 if let Some(http) = self.http_server_config {
426 self.http_server_config =
427 Some(http.set_id_provider(id_provider.clone()));
428 }
429 if let Some(ws) = self.ws_server_config {
430 self.ws_server_config =
431 Some(ws.set_id_provider(id_provider.clone()));
432 }
433
434 self
435 }
436
437 pub const fn has_server(&self) -> bool {
442 self.http_server_config.is_some() || self.ws_server_config.is_some()
443 }
444
445 pub const fn http_address(&self) -> Option<SocketAddr> { self.http_addr }
447
448 pub const fn ws_address(&self) -> Option<SocketAddr> { self.ws_addr }
450
451 pub async fn start(
458 self, modules: &TransportRpcModules,
459 throttling_conf_file: Option<String>, enable_metrics: bool,
460 ) -> Result<RpcServerHandle, RpcError> {
461 debug!("enable metrics: {}", enable_metrics);
463
464 let rpc_middleware = RpcServiceBuilder::new()
465 .layer_fn(move |s| {
466 Throttle::new(
467 throttling_conf_file.as_ref().map(|s| s.as_str()),
468 "rpc",
469 s,
470 )
471 })
472 .layer_fn(|s| Metrics::new(s))
473 .layer_fn(|s| Logger::new(s));
474
475 let http_socket_addr =
476 self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
477 Ipv4Addr::LOCALHOST,
478 constants::DEFAULT_HTTP_PORT,
479 )));
480
481 let ws_socket_addr = self.ws_addr.unwrap_or(SocketAddr::V4(
482 SocketAddrV4::new(Ipv4Addr::LOCALHOST, constants::DEFAULT_WS_PORT),
483 ));
484
485 if self.http_addr == self.ws_addr
488 && self.http_server_config.is_some()
489 && self.ws_server_config.is_some()
490 {
491 modules.config.ensure_ws_http_identical()?;
493
494 if let Some(config) = self.http_server_config {
495 let server = ServerBuilder::new()
496 .set_rpc_middleware(rpc_middleware)
497 .set_config(config.build())
498 .build(http_socket_addr)
499 .await
500 .map_err(|err| {
501 RpcError::server_error(
502 err,
503 ServerKind::WsHttp(http_socket_addr),
504 )
505 })?;
506 let addr = server.local_addr().map_err(|err| {
507 RpcError::server_error(
508 err,
509 ServerKind::WsHttp(http_socket_addr),
510 )
511 })?;
512 if let Some(module) =
513 modules.http.as_ref().or(modules.ws.as_ref())
514 {
515 let handle = server.start(module.clone());
516 let http_handle = Some(handle.clone());
517 let ws_handle = Some(handle);
518
519 return Ok(RpcServerHandle {
520 http_local_addr: Some(addr),
521 ws_local_addr: Some(addr),
522 http: http_handle,
523 ws: ws_handle,
524 });
525 }
526
527 return Err(RpcError::Custom(
528 "No valid RpcModule found from modules".to_string(),
529 ));
530 }
531 }
532
533 let mut result = RpcServerHandle {
534 http_local_addr: None,
535 ws_local_addr: None,
536 http: None,
537 ws: None,
538 };
539 if let Some(config) = self.ws_server_config {
540 let server = ServerBuilder::new()
541 .set_config(config.ws_only().build())
542 .set_rpc_middleware(rpc_middleware.clone())
543 .build(ws_socket_addr)
544 .await
545 .map_err(|err| {
546 RpcError::server_error(err, ServerKind::WS(ws_socket_addr))
547 })?;
548
549 let addr = server.local_addr().map_err(|err| {
550 RpcError::server_error(err, ServerKind::WS(ws_socket_addr))
551 })?;
552
553 let ws_local_addr = Some(addr);
554 let ws_server = Some(server);
555 let ws_handle = ws_server.map(|ws_server| {
556 ws_server.start(modules.ws.clone().expect("ws server error"))
557 });
558
559 result.ws = ws_handle;
560 result.ws_local_addr = ws_local_addr;
561 }
562
563 if let Some(config) = self.http_server_config {
564 let server = ServerBuilder::new()
565 .set_config(config.http_only().build())
566 .set_rpc_middleware(rpc_middleware)
567 .build(http_socket_addr)
568 .await
569 .map_err(|err| {
570 RpcError::server_error(
571 err,
572 ServerKind::Http(http_socket_addr),
573 )
574 })?;
575 let local_addr = server.local_addr().map_err(|err| {
576 RpcError::server_error(err, ServerKind::Http(http_socket_addr))
577 })?;
578 let http_local_addr = Some(local_addr);
579 let http_server = Some(server);
580 let http_handle = http_server.map(|http_server| {
581 http_server
582 .start(modules.http.clone().expect("http server error"))
583 });
584
585 result.http = http_handle;
586 result.http_local_addr = http_local_addr;
587 }
588
589 Ok(result)
590 }
591}
592
593#[derive(Debug, Clone, Default, Eq, PartialEq)]
595pub struct TransportRpcModuleConfig {
596 http: Option<RpcModuleSelection>,
598 ws: Option<RpcModuleSelection>,
600}
601
602impl TransportRpcModuleConfig {
603 pub fn set_http(http: impl Into<RpcModuleSelection>) -> Self {
605 Self::default().with_http(http)
606 }
607
608 pub fn set_ws(ws: impl Into<RpcModuleSelection>) -> Self {
610 Self::default().with_ws(ws)
611 }
612
613 pub fn with_http(mut self, http: impl Into<RpcModuleSelection>) -> Self {
615 self.http = Some(http.into());
616 self
617 }
618
619 pub fn with_ws(mut self, ws: impl Into<RpcModuleSelection>) -> Self {
621 self.ws = Some(ws.into());
622 self
623 }
624
625 pub fn http_mut(&mut self) -> &mut Option<RpcModuleSelection> {
627 &mut self.http
628 }
629
630 pub fn ws_mut(&mut self) -> &mut Option<RpcModuleSelection> { &mut self.ws }
632
633 pub const fn is_empty(&self) -> bool {
635 self.http.is_none() && self.ws.is_none()
636 }
637
638 pub const fn http(&self) -> Option<&RpcModuleSelection> {
640 self.http.as_ref()
641 }
642
643 pub const fn ws(&self) -> Option<&RpcModuleSelection> { self.ws.as_ref() }
645
646 fn ensure_ws_http_identical(&self) -> Result<(), WsHttpSamePortError> {
649 if RpcModuleSelection::are_identical(
650 self.http.as_ref(),
651 self.ws.as_ref(),
652 ) {
653 Ok(())
654 } else {
655 let http_modules = self
656 .http
657 .as_ref()
658 .map(RpcModuleSelection::to_selection)
659 .unwrap_or_default();
660 let ws_modules = self
661 .ws
662 .as_ref()
663 .map(RpcModuleSelection::to_selection)
664 .unwrap_or_default();
665
666 let http_not_ws =
667 http_modules.difference(&ws_modules).copied().collect();
668 let ws_not_http =
669 ws_modules.difference(&http_modules).copied().collect();
670 let overlap =
671 http_modules.intersection(&ws_modules).copied().collect();
672
673 Err(WsHttpSamePortError::ConflictingModules(Box::new(
674 ConflictingModules {
675 overlap,
676 http_not_ws,
677 ws_not_http,
678 },
679 )))
680 }
681 }
682}
683
684#[derive(Debug, Clone, Default)]
686pub struct TransportRpcModules<Context = ()> {
687 config: TransportRpcModuleConfig,
689 http: Option<RpcModule<Context>>,
691 ws: Option<RpcModule<Context>>,
693}
694
695impl TransportRpcModules {
698 pub const fn module_config(&self) -> &TransportRpcModuleConfig {
701 &self.config
702 }
703
704 pub fn merge_http(
710 &mut self, other: impl Into<Methods>,
711 ) -> Result<bool, RegisterMethodError> {
712 if let Some(ref mut http) = self.http {
713 return http.merge(other.into()).map(|_| true);
714 }
715 Ok(false)
716 }
717
718 pub fn merge_ws(
724 &mut self, other: impl Into<Methods>,
725 ) -> Result<bool, RegisterMethodError> {
726 if let Some(ref mut ws) = self.ws {
727 return ws.merge(other.into()).map(|_| true);
728 }
729 Ok(false)
730 }
731
732 pub fn merge_configured(
736 &mut self, other: impl Into<Methods>,
737 ) -> Result<(), RegisterMethodError> {
738 let other = other.into();
739 self.merge_http(other.clone())?;
740 self.merge_ws(other.clone())?;
741 Ok(())
742 }
743
744 pub fn remove_http_method(&mut self, method_name: &'static str) -> bool {
752 if let Some(http_module) = &mut self.http {
753 http_module.remove_method(method_name).is_some()
754 } else {
755 false
756 }
757 }
758
759 pub fn remove_ws_method(&mut self, method_name: &'static str) -> bool {
767 if let Some(ws_module) = &mut self.ws {
768 ws_module.remove_method(method_name).is_some()
769 } else {
770 false
771 }
772 }
773
774 pub fn remove_method_from_configured(
778 &mut self, method_name: &'static str,
779 ) -> bool {
780 let http_removed = self.remove_http_method(method_name);
781 let ws_removed = self.remove_ws_method(method_name);
782
783 http_removed || ws_removed
784 }
785}
786
787#[derive(Clone, Debug)]
792#[must_use = "Server stops if dropped"]
793pub struct RpcServerHandle {
794 http_local_addr: Option<SocketAddr>,
796 ws_local_addr: Option<SocketAddr>,
797 http: Option<ServerHandle>,
798 ws: Option<ServerHandle>,
799}
800
801impl RpcServerHandle {
802 pub const fn http_local_addr(&self) -> Option<SocketAddr> {
804 self.http_local_addr
805 }
806
807 pub const fn ws_local_addr(&self) -> Option<SocketAddr> {
809 self.ws_local_addr
810 }
811
812 pub fn stop(self) -> Result<(), AlreadyStoppedError> {
814 if let Some(handle) = self.http {
815 handle.stop()?
816 }
817
818 if let Some(handle) = self.ws {
819 handle.stop()?
820 }
821
822 Ok(())
823 }
824
825 pub fn http_url(&self) -> Option<String> {
827 self.http_local_addr.map(|addr| format!("http://{addr}"))
828 }
829
830 pub fn ws_url(&self) -> Option<String> {
832 self.ws_local_addr.map(|addr| format!("ws://{addr}"))
833 }
834}