1mod module;
2
3pub use crate::{
4 error::*, id_provider::SubscriptionIdProvider, RpcServerHandle,
5};
6pub use module::{CfxRpcModule, RpcModuleSelection};
7
8use blockgen::BlockGeneratorTestApi;
9use cfx_rpc_cfx_api::{
10 CfxDebugRpcServer, CfxFilterRpcServer, CfxRpcServer, DebugRpcServer,
11 PosRpcServer, PubSubApiServer, TestRpcServer, TraceServer, TxPoolServer,
12};
13use cfx_rpc_cfx_impl::{
14 CfxFilterHandler, CfxHandler, DebugHandler, PosHandler, PubSubHandler,
15 TestHandler, TraceHandler, TxPoolHandler,
16};
17use cfx_rpc_cfx_types::RpcImplConfiguration;
18use cfx_tasks::TaskExecutor;
19use cfxcore::{
20 block_data_manager::BlockDataManager, consensus::pos_handler::PosVerifier,
21 Notifications, SharedConsensusGraph, SharedSynchronizationService,
22 SharedTransactionPool,
23};
24use cfxcore_accounts::AccountProvider;
25use jsonrpsee::{
26 core::RegisterMethodError,
27 server::{IdProvider, ServerBuilder, ServerConfigBuilder},
28 Methods, RpcModule,
29};
30use network::NetworkService;
31use parking_lot::{Condvar, Mutex};
32use std::{
33 collections::{HashMap, HashSet},
34 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
35 sync::Arc,
36};
37use txgen::{DirectTransactionGenerator, TransactionGenerator};
38
39pub const DEFAULT_HTTP_PORT: u16 = 12537;
40pub const DEFAULT_WS_PORT: u16 = 12538;
41
42#[derive(Clone)]
43pub struct RpcModuleBuilder {
44 rpc_impl_config: RpcImplConfiguration,
45 consensus: SharedConsensusGraph,
46 sync: SharedSynchronizationService,
47 tx_pool: SharedTransactionPool,
48 executor: TaskExecutor,
49 data_man: Arc<BlockDataManager>,
50 network: Arc<NetworkService>,
51 pos_handler: Arc<PosVerifier>,
52 notifications: Arc<Notifications>,
53 accounts: Arc<AccountProvider>,
54 exit: Arc<(Mutex<bool>, Condvar)>,
55 block_gen: BlockGeneratorTestApi,
56 maybe_txgen: Option<Arc<TransactionGenerator>>,
57 maybe_direct_txgen: Option<Arc<Mutex<DirectTransactionGenerator>>>,
58}
59
60impl RpcModuleBuilder {
61 pub fn new(
62 rpc_impl_config: RpcImplConfiguration, consensus: SharedConsensusGraph,
63 sync: SharedSynchronizationService, tx_pool: SharedTransactionPool,
64 executor: TaskExecutor, data_man: Arc<BlockDataManager>,
65 network: Arc<NetworkService>, pos_handler: Arc<PosVerifier>,
66 notifications: Arc<Notifications>, accounts: Arc<AccountProvider>,
67 exit: Arc<(Mutex<bool>, Condvar)>, block_gen: BlockGeneratorTestApi,
68 maybe_txgen: Option<Arc<TransactionGenerator>>,
69 maybe_direct_txgen: Option<Arc<Mutex<DirectTransactionGenerator>>>,
70 ) -> Self {
71 Self {
72 rpc_impl_config,
73 consensus,
74 sync,
75 tx_pool,
76 executor,
77 data_man,
78 network,
79 pos_handler,
80 notifications,
81 accounts,
82 exit,
83 block_gen,
84 maybe_txgen,
85 maybe_direct_txgen,
86 }
87 }
88
89 pub fn build(
90 self, module_config: TransportRpcModuleConfig,
91 ) -> TransportRpcModules<()> {
92 let mut modules = TransportRpcModules::default();
93
94 if !module_config.is_empty() {
95 let TransportRpcModuleConfig { http, ws } = module_config.clone();
96
97 let Self {
98 rpc_impl_config,
99 consensus,
100 sync,
101 tx_pool,
102 executor,
103 data_man,
104 network,
105 pos_handler,
106 notifications,
107 accounts,
108 exit,
109 block_gen,
110 maybe_txgen,
111 maybe_direct_txgen,
112 } = self;
113
114 let mut registry = RpcRegistryInner::new(
115 rpc_impl_config,
116 consensus,
117 sync,
118 tx_pool,
119 executor,
120 data_man,
121 network,
122 pos_handler,
123 notifications,
124 accounts,
125 exit,
126 block_gen,
127 maybe_txgen,
128 maybe_direct_txgen,
129 );
130
131 modules.config = module_config;
132 modules.http = registry.maybe_module(http.as_ref());
133 modules.ws = registry.maybe_module(ws.as_ref());
134 }
135
136 modules
137 }
138}
139
140#[derive(Clone)]
141pub struct RpcRegistryInner {
142 rpc_impl_config: RpcImplConfiguration,
143 consensus: SharedConsensusGraph,
144 sync: SharedSynchronizationService,
145 tx_pool: SharedTransactionPool,
146 executor: TaskExecutor,
147 data_man: Arc<BlockDataManager>,
148 network: Arc<NetworkService>,
149 pos_handler: Arc<PosVerifier>,
150 notifications: Arc<Notifications>,
151 accounts: Arc<AccountProvider>,
152 exit: Arc<(Mutex<bool>, Condvar)>,
153 block_gen: BlockGeneratorTestApi,
154 maybe_txgen: Option<Arc<TransactionGenerator>>,
155 maybe_direct_txgen: Option<Arc<Mutex<DirectTransactionGenerator>>>,
156 modules: HashMap<CfxRpcModule, Methods>,
157}
158
159impl RpcRegistryInner {
160 pub fn new(
161 rpc_impl_config: RpcImplConfiguration, consensus: SharedConsensusGraph,
162 sync: SharedSynchronizationService, tx_pool: SharedTransactionPool,
163 executor: TaskExecutor, data_man: Arc<BlockDataManager>,
164 network: Arc<NetworkService>, pos_handler: Arc<PosVerifier>,
165 notifications: Arc<Notifications>, accounts: Arc<AccountProvider>,
166 exit: Arc<(Mutex<bool>, Condvar)>, block_gen: BlockGeneratorTestApi,
167 maybe_txgen: Option<Arc<TransactionGenerator>>,
168 maybe_direct_txgen: Option<Arc<Mutex<DirectTransactionGenerator>>>,
169 ) -> Self {
170 Self {
171 rpc_impl_config,
172 consensus,
173 sync,
174 tx_pool,
175 executor,
176 data_man,
177 network,
178 pos_handler,
179 notifications,
180 accounts,
181 exit,
182 block_gen,
183 maybe_txgen,
184 maybe_direct_txgen,
185 modules: Default::default(),
186 }
187 }
188
189 fn maybe_module(
190 &mut self, config: Option<&RpcModuleSelection>,
191 ) -> Option<RpcModule<()>> {
192 config.map(|config| self.module_for(config))
193 }
194
195 pub fn module_for(&mut self, config: &RpcModuleSelection) -> RpcModule<()> {
196 let mut module = RpcModule::new(());
197 let all_methods = self.cfx_methods(config.iter_selection());
198 for methods in all_methods {
199 module.merge(methods).expect("No conflicts");
200 }
201 module
202 }
203
204 pub fn cfx_methods(
205 &mut self, namespaces: impl Iterator<Item = CfxRpcModule>,
206 ) -> Vec<Methods> {
207 let namespaces: Vec<_> = namespaces.collect();
208
209 let namespace_methods = |namespace| {
210 self.modules
211 .entry(namespace)
212 .or_insert_with(|| match namespace {
213 CfxRpcModule::Debug => {
214 let mut methods = DebugHandler::new(
215 self.tx_pool.clone(),
216 self.consensus.clone(),
217 self.sync.clone(),
218 self.network.clone(),
219 self.accounts.clone(),
220 self.pos_handler.clone(),
221 self.exit.clone(),
222 )
223 .into_rpc();
224 methods
225 .merge(CfxDebugRpcServer::into_rpc(
226 CfxHandler::new(
227 self.rpc_impl_config.clone(),
228 self.consensus.clone(),
229 self.sync.clone(),
230 self.tx_pool.clone(),
231 self.accounts.clone(),
232 self.pos_handler.clone(),
233 self.block_gen.clone(),
234 ),
235 ))
236 .expect("No conflicts");
237 methods.into()
238 }
239 CfxRpcModule::Pos => {
240 let handler = PosHandler::new(
241 self.pos_handler.clone(),
242 self.data_man.clone(),
243 *self.network.get_network_type(),
244 self.consensus.clone(),
245 );
246 handler.into_rpc().into()
247 }
248 CfxRpcModule::Trace => {
249 let handler = TraceHandler::new(
250 *self.network.get_network_type(),
251 self.consensus.clone(),
252 );
253 handler.into_rpc().into()
254 }
255 CfxRpcModule::Txpool => TxPoolHandler::new(
256 self.tx_pool.clone(),
257 self.consensus.clone(),
258 *self.network.get_network_type(),
259 )
260 .into_rpc()
261 .into(),
262 CfxRpcModule::PubSub => PubSubHandler::new(
263 self.notifications.clone(),
264 self.executor.clone(),
265 self.consensus.clone(),
266 *self.network.get_network_type(),
267 )
268 .into_rpc()
269 .into(),
270 CfxRpcModule::Cfx => {
271 let mut module =
272 CfxRpcServer::into_rpc(CfxHandler::new(
273 self.rpc_impl_config.clone(),
274 self.consensus.clone(),
275 self.sync.clone(),
276 self.tx_pool.clone(),
277 self.accounts.clone(),
278 self.pos_handler.clone(),
279 self.block_gen.clone(),
280 ));
281 if self
282 .rpc_impl_config
283 .poll_lifetime_in_seconds
284 .is_some()
285 {
286 let filter_module =
287 CfxFilterHandler::new_with_task_executor(
288 self.consensus.clone(),
289 self.tx_pool.clone(),
290 self.notifications.epochs_ordered.clone(),
291 self.executor.clone(),
292 self.rpc_impl_config
293 .poll_lifetime_in_seconds
294 .unwrap(),
295 self.rpc_impl_config
296 .get_logs_filter_max_limit,
297 *self.network.get_network_type(),
298 )
299 .into_rpc();
300 module.merge(filter_module).expect("No conflicts");
301 }
302 module.into()
303 }
304 CfxRpcModule::Test => TestHandler::new(
305 self.exit.clone(),
306 self.consensus.clone(),
307 self.network.clone(),
308 self.pos_handler.clone(),
309 self.tx_pool.clone(),
310 self.accounts.clone(),
311 self.block_gen.clone(),
312 self.maybe_txgen.clone(),
313 self.maybe_direct_txgen.clone(),
314 self.sync.clone(),
315 )
316 .into_rpc()
317 .into(),
318 })
319 .clone()
320 };
321
322 namespaces.iter().copied().map(namespace_methods).collect()
323 }
324}
325
326#[derive(Debug)]
327pub struct RpcServerConfig {
328 http_server_config: Option<ServerConfigBuilder>,
329 http_cors_domains: Option<String>,
330 http_addr: Option<SocketAddr>,
331 ws_server_config: Option<ServerConfigBuilder>,
332 ws_cors_domains: Option<String>,
333 ws_addr: Option<SocketAddr>,
334}
335
336impl Default for RpcServerConfig {
337 fn default() -> Self {
338 Self {
339 http_server_config: None,
340 http_cors_domains: None,
341 http_addr: None,
342 ws_server_config: None,
343 ws_cors_domains: None,
344 ws_addr: None,
345 }
346 }
347}
348
349impl RpcServerConfig {
350 pub fn http(config: ServerConfigBuilder) -> Self {
351 Self::default().with_http(config)
352 }
353
354 pub fn ws(config: ServerConfigBuilder) -> Self {
355 Self::default().with_ws(config)
356 }
357
358 pub fn with_http(mut self, config: ServerConfigBuilder) -> Self {
359 self.http_server_config =
360 Some(config.set_id_provider(SubscriptionIdProvider::default()));
361 self
362 }
363
364 pub fn with_ws(mut self, config: ServerConfigBuilder) -> Self {
365 self.ws_server_config =
366 Some(config.set_id_provider(SubscriptionIdProvider::default()));
367 self
368 }
369
370 pub fn with_cors(self, cors_domain: Option<String>) -> Self {
371 self.with_http_cors(cors_domain.clone())
372 .with_ws_cors(cors_domain)
373 }
374
375 pub fn with_ws_cors(mut self, cors_domain: Option<String>) -> Self {
376 self.ws_cors_domains = cors_domain;
377 self
378 }
379
380 pub fn with_http_cors(mut self, cors_domain: Option<String>) -> Self {
381 self.http_cors_domains = cors_domain;
382 self
383 }
384
385 pub const fn with_http_address(mut self, addr: SocketAddr) -> Self {
386 self.http_addr = Some(addr);
387 self
388 }
389
390 pub const fn with_ws_address(mut self, addr: SocketAddr) -> Self {
391 self.ws_addr = Some(addr);
392 self
393 }
394
395 pub fn with_id_provider<I>(mut self, id_provider: I) -> Self
396 where I: IdProvider + Clone + 'static {
397 if let Some(http) = self.http_server_config.take() {
398 self.http_server_config =
399 Some(http.set_id_provider(id_provider.clone()));
400 }
401 if let Some(ws) = self.ws_server_config.take() {
402 self.ws_server_config =
403 Some(ws.set_id_provider(id_provider.clone()));
404 }
405
406 self
407 }
408
409 pub const fn has_server(&self) -> bool {
410 self.http_server_config.is_some() || self.ws_server_config.is_some()
411 }
412
413 pub const fn http_address(&self) -> Option<SocketAddr> { self.http_addr }
414
415 pub const fn ws_address(&self) -> Option<SocketAddr> { self.ws_addr }
416
417 pub async fn start(
418 self, modules: &TransportRpcModules,
419 ) -> Result<RpcServerHandle, RpcError<CfxRpcModule>> {
420 let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(
421 SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_HTTP_PORT),
422 ));
423
424 let ws_socket_addr = self.ws_addr.unwrap_or(SocketAddr::V4(
425 SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_WS_PORT),
426 ));
427
428 if self.http_addr == self.ws_addr
429 && self.http_server_config.is_some()
430 && self.ws_server_config.is_some()
431 {
432 modules.config.ensure_ws_http_identical()?;
433
434 if let Some(config) = self.http_server_config {
435 let server = ServerBuilder::new()
436 .set_config(config.build())
437 .build(http_socket_addr)
438 .await
439 .map_err(|err| {
440 RpcError::server_error(
441 err,
442 ServerKind::WsHttp(http_socket_addr),
443 )
444 })?;
445 let addr = server.local_addr().map_err(|err| {
446 RpcError::server_error(
447 err,
448 ServerKind::WsHttp(http_socket_addr),
449 )
450 })?;
451 if let Some(module) =
452 modules.http.as_ref().or(modules.ws.as_ref())
453 {
454 let handle = server.start(module.clone());
455 return Ok(RpcServerHandle {
456 http_local_addr: Some(addr),
457 ws_local_addr: Some(addr),
458 http: Some(handle.clone()),
459 ws: Some(handle),
460 });
461 }
462
463 return Err(RpcError::Custom(
464 "No valid RpcModule found from modules".to_string(),
465 ));
466 }
467 }
468
469 let mut result = RpcServerHandle {
470 http_local_addr: None,
471 ws_local_addr: None,
472 http: None,
473 ws: None,
474 };
475
476 if let Some(config) = self.ws_server_config {
477 let server = ServerBuilder::new()
478 .set_config(config.ws_only().build())
479 .build(ws_socket_addr)
480 .await
481 .map_err(|err| {
482 RpcError::server_error(err, ServerKind::WS(ws_socket_addr))
483 })?;
484
485 let addr = server.local_addr().map_err(|err| {
486 RpcError::server_error(err, ServerKind::WS(ws_socket_addr))
487 })?;
488
489 let ws_local_addr = Some(addr);
490 let ws_handle = Some(
491 server.start(modules.ws.clone().expect("ws server error")),
492 );
493
494 result.ws = ws_handle;
495 result.ws_local_addr = ws_local_addr;
496 }
497
498 if let Some(config) = self.http_server_config {
499 let server = ServerBuilder::new()
500 .set_config(config.http_only().build())
501 .build(http_socket_addr)
502 .await
503 .map_err(|err| {
504 RpcError::server_error(
505 err,
506 ServerKind::Http(http_socket_addr),
507 )
508 })?;
509 let local_addr = server.local_addr().map_err(|err| {
510 RpcError::server_error(err, ServerKind::Http(http_socket_addr))
511 })?;
512 let http_local_addr = Some(local_addr);
513 let http_handle = Some(
514 server.start(modules.http.clone().expect("http server error")),
515 );
516
517 result.http = http_handle;
518 result.http_local_addr = http_local_addr;
519 }
520
521 Ok(result)
522 }
523}
524
525#[derive(Debug, Clone, Default, Eq, PartialEq)]
526pub struct TransportRpcModuleConfig {
527 pub http: Option<RpcModuleSelection>,
528 pub ws: Option<RpcModuleSelection>,
529}
530
531impl TransportRpcModuleConfig {
532 pub fn set_http(http: impl Into<RpcModuleSelection>) -> Self {
533 Self::default().with_http(http)
534 }
535
536 pub fn set_ws(ws: impl Into<RpcModuleSelection>) -> Self {
537 Self::default().with_ws(ws)
538 }
539
540 pub fn with_http(mut self, http: impl Into<RpcModuleSelection>) -> Self {
541 self.http = Some(http.into());
542 self
543 }
544
545 pub fn with_ws(mut self, ws: impl Into<RpcModuleSelection>) -> Self {
546 self.ws = Some(ws.into());
547 self
548 }
549
550 pub fn http_mut(&mut self) -> &mut Option<RpcModuleSelection> {
551 &mut self.http
552 }
553
554 pub fn ws_mut(&mut self) -> &mut Option<RpcModuleSelection> { &mut self.ws }
555
556 pub const fn is_empty(&self) -> bool {
557 self.http.is_none() && self.ws.is_none()
558 }
559
560 pub const fn http(&self) -> Option<&RpcModuleSelection> {
561 self.http.as_ref()
562 }
563
564 pub const fn ws(&self) -> Option<&RpcModuleSelection> { self.ws.as_ref() }
565
566 fn ensure_ws_http_identical(
567 &self,
568 ) -> Result<(), WsHttpSamePortError<CfxRpcModule>> {
569 if RpcModuleSelection::are_identical(
570 self.http.as_ref(),
571 self.ws.as_ref(),
572 ) {
573 Ok(())
574 } else {
575 let http_modules = self
576 .http
577 .as_ref()
578 .map(RpcModuleSelection::to_selection)
579 .unwrap_or_default();
580 let ws_modules = self
581 .ws
582 .as_ref()
583 .map(RpcModuleSelection::to_selection)
584 .unwrap_or_default();
585
586 let http_not_ws: HashSet<CfxRpcModule> =
587 http_modules.difference(&ws_modules).copied().collect();
588 let ws_not_http: HashSet<CfxRpcModule> =
589 ws_modules.difference(&http_modules).copied().collect();
590 let overlap: HashSet<CfxRpcModule> =
591 http_modules.intersection(&ws_modules).copied().collect();
592 let conflicting_modules = ConflictingModules {
594 overlap,
595 http_not_ws,
596 ws_not_http,
597 };
598 Err(WsHttpSamePortError::ConflictingModules(Box::new(
599 conflicting_modules,
600 )))
601 }
602 }
603}
604
605#[derive(Debug, Clone, Default)]
606pub struct TransportRpcModules<Context = ()> {
607 pub config: TransportRpcModuleConfig,
608 pub http: Option<RpcModule<Context>>,
609 pub ws: Option<RpcModule<Context>>,
610}
611
612impl TransportRpcModules {
613 pub const fn module_config(&self) -> &TransportRpcModuleConfig {
614 &self.config
615 }
616
617 pub fn merge_http(
618 &mut self, other: impl Into<Methods>,
619 ) -> Result<bool, RegisterMethodError> {
620 if let Some(ref mut http) = self.http {
621 return http.merge(other.into()).map(|_| true);
622 }
623 Ok(false)
624 }
625
626 pub fn merge_ws(
627 &mut self, other: impl Into<Methods>,
628 ) -> Result<bool, RegisterMethodError> {
629 if let Some(ref mut ws) = self.ws {
630 return ws.merge(other.into()).map(|_| true);
631 }
632 Ok(false)
633 }
634
635 pub fn merge_configured(
636 &mut self, other: impl Into<Methods>,
637 ) -> Result<(), RegisterMethodError> {
638 let other = other.into();
639 self.merge_http(other.clone())?;
640 self.merge_ws(other.clone())?;
641 Ok(())
642 }
643
644 pub fn remove_http_method(&mut self, method_name: &'static str) -> bool {
645 if let Some(http_module) = &mut self.http {
646 http_module.remove_method(method_name).is_some()
647 } else {
648 false
649 }
650 }
651
652 pub fn remove_ws_method(&mut self, method_name: &'static str) -> bool {
653 if let Some(ws_module) = &mut self.ws {
654 ws_module.remove_method(method_name).is_some()
655 } else {
656 false
657 }
658 }
659
660 pub fn remove_method_from_configured(
661 &mut self, method_name: &'static str,
662 ) -> bool {
663 let http_removed = self.remove_http_method(method_name);
664 let ws_removed = self.remove_ws_method(method_name);
665
666 http_removed || ws_removed
667 }
668}