cfx_rpc_builder/cfx/
mod.rs

1mod module;
2
3pub use crate::{
4    error::*, id_provider::SubscriptionIdProvider, RpcServerHandle,
5};
6pub use module::{CfxRpcModule, RpcModuleSelection};
7
8use blockgen::BlockGeneratorTestApi;
9use cfx_rpc_cfx_api::{
10    CfxDebugRpcServer, CfxFilterRpcServer, CfxRpcServer, DebugRpcServer,
11    PosRpcServer, PubSubApiServer, TestRpcServer, TraceServer, TxPoolServer,
12};
13use cfx_rpc_cfx_impl::{
14    CfxFilterHandler, CfxHandler, DebugHandler, PosHandler, PubSubHandler,
15    TestHandler, TraceHandler, TxPoolHandler,
16};
17use cfx_rpc_cfx_types::RpcImplConfiguration;
18use cfx_tasks::TaskExecutor;
19use cfxcore::{
20    block_data_manager::BlockDataManager, consensus::pos_handler::PosVerifier,
21    Notifications, SharedConsensusGraph, SharedSynchronizationService,
22    SharedTransactionPool,
23};
24use cfxcore_accounts::AccountProvider;
25use jsonrpsee::{
26    core::RegisterMethodError,
27    server::{IdProvider, ServerBuilder, ServerConfigBuilder},
28    Methods, RpcModule,
29};
30use network::NetworkService;
31use parking_lot::{Condvar, Mutex};
32use std::{
33    collections::{HashMap, HashSet},
34    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
35    sync::Arc,
36};
37use txgen::{DirectTransactionGenerator, TransactionGenerator};
38
39pub const DEFAULT_HTTP_PORT: u16 = 12537;
40pub const DEFAULT_WS_PORT: u16 = 12538;
41
42#[derive(Clone)]
43pub struct RpcModuleBuilder {
44    rpc_impl_config: RpcImplConfiguration,
45    consensus: SharedConsensusGraph,
46    sync: SharedSynchronizationService,
47    tx_pool: SharedTransactionPool,
48    executor: TaskExecutor,
49    data_man: Arc<BlockDataManager>,
50    network: Arc<NetworkService>,
51    pos_handler: Arc<PosVerifier>,
52    notifications: Arc<Notifications>,
53    accounts: Arc<AccountProvider>,
54    exit: Arc<(Mutex<bool>, Condvar)>,
55    block_gen: BlockGeneratorTestApi,
56    maybe_txgen: Option<Arc<TransactionGenerator>>,
57    maybe_direct_txgen: Option<Arc<Mutex<DirectTransactionGenerator>>>,
58}
59
60impl RpcModuleBuilder {
61    pub fn new(
62        rpc_impl_config: RpcImplConfiguration, consensus: SharedConsensusGraph,
63        sync: SharedSynchronizationService, tx_pool: SharedTransactionPool,
64        executor: TaskExecutor, data_man: Arc<BlockDataManager>,
65        network: Arc<NetworkService>, pos_handler: Arc<PosVerifier>,
66        notifications: Arc<Notifications>, accounts: Arc<AccountProvider>,
67        exit: Arc<(Mutex<bool>, Condvar)>, block_gen: BlockGeneratorTestApi,
68        maybe_txgen: Option<Arc<TransactionGenerator>>,
69        maybe_direct_txgen: Option<Arc<Mutex<DirectTransactionGenerator>>>,
70    ) -> Self {
71        Self {
72            rpc_impl_config,
73            consensus,
74            sync,
75            tx_pool,
76            executor,
77            data_man,
78            network,
79            pos_handler,
80            notifications,
81            accounts,
82            exit,
83            block_gen,
84            maybe_txgen,
85            maybe_direct_txgen,
86        }
87    }
88
89    pub fn build(
90        self, module_config: TransportRpcModuleConfig,
91    ) -> TransportRpcModules<()> {
92        let mut modules = TransportRpcModules::default();
93
94        if !module_config.is_empty() {
95            let TransportRpcModuleConfig { http, ws } = module_config.clone();
96
97            let Self {
98                rpc_impl_config,
99                consensus,
100                sync,
101                tx_pool,
102                executor,
103                data_man,
104                network,
105                pos_handler,
106                notifications,
107                accounts,
108                exit,
109                block_gen,
110                maybe_txgen,
111                maybe_direct_txgen,
112            } = self;
113
114            let mut registry = RpcRegistryInner::new(
115                rpc_impl_config,
116                consensus,
117                sync,
118                tx_pool,
119                executor,
120                data_man,
121                network,
122                pos_handler,
123                notifications,
124                accounts,
125                exit,
126                block_gen,
127                maybe_txgen,
128                maybe_direct_txgen,
129            );
130
131            modules.config = module_config;
132            modules.http = registry.maybe_module(http.as_ref());
133            modules.ws = registry.maybe_module(ws.as_ref());
134        }
135
136        modules
137    }
138}
139
140#[derive(Clone)]
141pub struct RpcRegistryInner {
142    rpc_impl_config: RpcImplConfiguration,
143    consensus: SharedConsensusGraph,
144    sync: SharedSynchronizationService,
145    tx_pool: SharedTransactionPool,
146    executor: TaskExecutor,
147    data_man: Arc<BlockDataManager>,
148    network: Arc<NetworkService>,
149    pos_handler: Arc<PosVerifier>,
150    notifications: Arc<Notifications>,
151    accounts: Arc<AccountProvider>,
152    exit: Arc<(Mutex<bool>, Condvar)>,
153    block_gen: BlockGeneratorTestApi,
154    maybe_txgen: Option<Arc<TransactionGenerator>>,
155    maybe_direct_txgen: Option<Arc<Mutex<DirectTransactionGenerator>>>,
156    modules: HashMap<CfxRpcModule, Methods>,
157}
158
159impl RpcRegistryInner {
160    pub fn new(
161        rpc_impl_config: RpcImplConfiguration, consensus: SharedConsensusGraph,
162        sync: SharedSynchronizationService, tx_pool: SharedTransactionPool,
163        executor: TaskExecutor, data_man: Arc<BlockDataManager>,
164        network: Arc<NetworkService>, pos_handler: Arc<PosVerifier>,
165        notifications: Arc<Notifications>, accounts: Arc<AccountProvider>,
166        exit: Arc<(Mutex<bool>, Condvar)>, block_gen: BlockGeneratorTestApi,
167        maybe_txgen: Option<Arc<TransactionGenerator>>,
168        maybe_direct_txgen: Option<Arc<Mutex<DirectTransactionGenerator>>>,
169    ) -> Self {
170        Self {
171            rpc_impl_config,
172            consensus,
173            sync,
174            tx_pool,
175            executor,
176            data_man,
177            network,
178            pos_handler,
179            notifications,
180            accounts,
181            exit,
182            block_gen,
183            maybe_txgen,
184            maybe_direct_txgen,
185            modules: Default::default(),
186        }
187    }
188
189    fn maybe_module(
190        &mut self, config: Option<&RpcModuleSelection>,
191    ) -> Option<RpcModule<()>> {
192        config.map(|config| self.module_for(config))
193    }
194
195    pub fn module_for(&mut self, config: &RpcModuleSelection) -> RpcModule<()> {
196        let mut module = RpcModule::new(());
197        let all_methods = self.cfx_methods(config.iter_selection());
198        for methods in all_methods {
199            module.merge(methods).expect("No conflicts");
200        }
201        module
202    }
203
204    pub fn cfx_methods(
205        &mut self, namespaces: impl Iterator<Item = CfxRpcModule>,
206    ) -> Vec<Methods> {
207        let namespaces: Vec<_> = namespaces.collect();
208
209        let namespace_methods = |namespace| {
210            self.modules
211                .entry(namespace)
212                .or_insert_with(|| match namespace {
213                    CfxRpcModule::Debug => {
214                        let mut methods = DebugHandler::new(
215                            self.tx_pool.clone(),
216                            self.consensus.clone(),
217                            self.sync.clone(),
218                            self.network.clone(),
219                            self.accounts.clone(),
220                            self.pos_handler.clone(),
221                            self.exit.clone(),
222                        )
223                        .into_rpc();
224                        methods
225                            .merge(CfxDebugRpcServer::into_rpc(
226                                CfxHandler::new(
227                                    self.rpc_impl_config.clone(),
228                                    self.consensus.clone(),
229                                    self.sync.clone(),
230                                    self.tx_pool.clone(),
231                                    self.accounts.clone(),
232                                    self.pos_handler.clone(),
233                                    self.block_gen.clone(),
234                                ),
235                            ))
236                            .expect("No conflicts");
237                        methods.into()
238                    }
239                    CfxRpcModule::Pos => {
240                        let handler = PosHandler::new(
241                            self.pos_handler.clone(),
242                            self.data_man.clone(),
243                            *self.network.get_network_type(),
244                            self.consensus.clone(),
245                        );
246                        handler.into_rpc().into()
247                    }
248                    CfxRpcModule::Trace => {
249                        let handler = TraceHandler::new(
250                            *self.network.get_network_type(),
251                            self.consensus.clone(),
252                        );
253                        handler.into_rpc().into()
254                    }
255                    CfxRpcModule::Txpool => TxPoolHandler::new(
256                        self.tx_pool.clone(),
257                        self.consensus.clone(),
258                        *self.network.get_network_type(),
259                    )
260                    .into_rpc()
261                    .into(),
262                    CfxRpcModule::PubSub => PubSubHandler::new(
263                        self.notifications.clone(),
264                        self.executor.clone(),
265                        self.consensus.clone(),
266                        *self.network.get_network_type(),
267                    )
268                    .into_rpc()
269                    .into(),
270                    CfxRpcModule::Cfx => {
271                        let mut module =
272                            CfxRpcServer::into_rpc(CfxHandler::new(
273                                self.rpc_impl_config.clone(),
274                                self.consensus.clone(),
275                                self.sync.clone(),
276                                self.tx_pool.clone(),
277                                self.accounts.clone(),
278                                self.pos_handler.clone(),
279                                self.block_gen.clone(),
280                            ));
281                        if self
282                            .rpc_impl_config
283                            .poll_lifetime_in_seconds
284                            .is_some()
285                        {
286                            let filter_module =
287                                CfxFilterHandler::new_with_task_executor(
288                                    self.consensus.clone(),
289                                    self.tx_pool.clone(),
290                                    self.notifications.epochs_ordered.clone(),
291                                    self.executor.clone(),
292                                    self.rpc_impl_config
293                                        .poll_lifetime_in_seconds
294                                        .unwrap(),
295                                    self.rpc_impl_config
296                                        .get_logs_filter_max_limit,
297                                    *self.network.get_network_type(),
298                                )
299                                .into_rpc();
300                            module.merge(filter_module).expect("No conflicts");
301                        }
302                        module.into()
303                    }
304                    CfxRpcModule::Test => TestHandler::new(
305                        self.exit.clone(),
306                        self.consensus.clone(),
307                        self.network.clone(),
308                        self.pos_handler.clone(),
309                        self.tx_pool.clone(),
310                        self.accounts.clone(),
311                        self.block_gen.clone(),
312                        self.maybe_txgen.clone(),
313                        self.maybe_direct_txgen.clone(),
314                        self.sync.clone(),
315                    )
316                    .into_rpc()
317                    .into(),
318                })
319                .clone()
320        };
321
322        namespaces.iter().copied().map(namespace_methods).collect()
323    }
324}
325
326#[derive(Debug)]
327pub struct RpcServerConfig {
328    http_server_config: Option<ServerConfigBuilder>,
329    http_cors_domains: Option<String>,
330    http_addr: Option<SocketAddr>,
331    ws_server_config: Option<ServerConfigBuilder>,
332    ws_cors_domains: Option<String>,
333    ws_addr: Option<SocketAddr>,
334}
335
336impl Default for RpcServerConfig {
337    fn default() -> Self {
338        Self {
339            http_server_config: None,
340            http_cors_domains: None,
341            http_addr: None,
342            ws_server_config: None,
343            ws_cors_domains: None,
344            ws_addr: None,
345        }
346    }
347}
348
349impl RpcServerConfig {
350    pub fn http(config: ServerConfigBuilder) -> Self {
351        Self::default().with_http(config)
352    }
353
354    pub fn ws(config: ServerConfigBuilder) -> Self {
355        Self::default().with_ws(config)
356    }
357
358    pub fn with_http(mut self, config: ServerConfigBuilder) -> Self {
359        self.http_server_config =
360            Some(config.set_id_provider(SubscriptionIdProvider::default()));
361        self
362    }
363
364    pub fn with_ws(mut self, config: ServerConfigBuilder) -> Self {
365        self.ws_server_config =
366            Some(config.set_id_provider(SubscriptionIdProvider::default()));
367        self
368    }
369
370    pub fn with_cors(self, cors_domain: Option<String>) -> Self {
371        self.with_http_cors(cors_domain.clone())
372            .with_ws_cors(cors_domain)
373    }
374
375    pub fn with_ws_cors(mut self, cors_domain: Option<String>) -> Self {
376        self.ws_cors_domains = cors_domain;
377        self
378    }
379
380    pub fn with_http_cors(mut self, cors_domain: Option<String>) -> Self {
381        self.http_cors_domains = cors_domain;
382        self
383    }
384
385    pub const fn with_http_address(mut self, addr: SocketAddr) -> Self {
386        self.http_addr = Some(addr);
387        self
388    }
389
390    pub const fn with_ws_address(mut self, addr: SocketAddr) -> Self {
391        self.ws_addr = Some(addr);
392        self
393    }
394
395    pub fn with_id_provider<I>(mut self, id_provider: I) -> Self
396    where I: IdProvider + Clone + 'static {
397        if let Some(http) = self.http_server_config.take() {
398            self.http_server_config =
399                Some(http.set_id_provider(id_provider.clone()));
400        }
401        if let Some(ws) = self.ws_server_config.take() {
402            self.ws_server_config =
403                Some(ws.set_id_provider(id_provider.clone()));
404        }
405
406        self
407    }
408
409    pub const fn has_server(&self) -> bool {
410        self.http_server_config.is_some() || self.ws_server_config.is_some()
411    }
412
413    pub const fn http_address(&self) -> Option<SocketAddr> { self.http_addr }
414
415    pub const fn ws_address(&self) -> Option<SocketAddr> { self.ws_addr }
416
417    pub async fn start(
418        self, modules: &TransportRpcModules,
419    ) -> Result<RpcServerHandle, RpcError<CfxRpcModule>> {
420        let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(
421            SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_HTTP_PORT),
422        ));
423
424        let ws_socket_addr = self.ws_addr.unwrap_or(SocketAddr::V4(
425            SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_WS_PORT),
426        ));
427
428        if self.http_addr == self.ws_addr
429            && self.http_server_config.is_some()
430            && self.ws_server_config.is_some()
431        {
432            modules.config.ensure_ws_http_identical()?;
433
434            if let Some(config) = self.http_server_config {
435                let server = ServerBuilder::new()
436                    .set_config(config.build())
437                    .build(http_socket_addr)
438                    .await
439                    .map_err(|err| {
440                        RpcError::server_error(
441                            err,
442                            ServerKind::WsHttp(http_socket_addr),
443                        )
444                    })?;
445                let addr = server.local_addr().map_err(|err| {
446                    RpcError::server_error(
447                        err,
448                        ServerKind::WsHttp(http_socket_addr),
449                    )
450                })?;
451                if let Some(module) =
452                    modules.http.as_ref().or(modules.ws.as_ref())
453                {
454                    let handle = server.start(module.clone());
455                    return Ok(RpcServerHandle {
456                        http_local_addr: Some(addr),
457                        ws_local_addr: Some(addr),
458                        http: Some(handle.clone()),
459                        ws: Some(handle),
460                    });
461                }
462
463                return Err(RpcError::Custom(
464                    "No valid RpcModule found from modules".to_string(),
465                ));
466            }
467        }
468
469        let mut result = RpcServerHandle {
470            http_local_addr: None,
471            ws_local_addr: None,
472            http: None,
473            ws: None,
474        };
475
476        if let Some(config) = self.ws_server_config {
477            let server = ServerBuilder::new()
478                .set_config(config.ws_only().build())
479                .build(ws_socket_addr)
480                .await
481                .map_err(|err| {
482                    RpcError::server_error(err, ServerKind::WS(ws_socket_addr))
483                })?;
484
485            let addr = server.local_addr().map_err(|err| {
486                RpcError::server_error(err, ServerKind::WS(ws_socket_addr))
487            })?;
488
489            let ws_local_addr = Some(addr);
490            let ws_handle = Some(
491                server.start(modules.ws.clone().expect("ws server error")),
492            );
493
494            result.ws = ws_handle;
495            result.ws_local_addr = ws_local_addr;
496        }
497
498        if let Some(config) = self.http_server_config {
499            let server = ServerBuilder::new()
500                .set_config(config.http_only().build())
501                .build(http_socket_addr)
502                .await
503                .map_err(|err| {
504                    RpcError::server_error(
505                        err,
506                        ServerKind::Http(http_socket_addr),
507                    )
508                })?;
509            let local_addr = server.local_addr().map_err(|err| {
510                RpcError::server_error(err, ServerKind::Http(http_socket_addr))
511            })?;
512            let http_local_addr = Some(local_addr);
513            let http_handle = Some(
514                server.start(modules.http.clone().expect("http server error")),
515            );
516
517            result.http = http_handle;
518            result.http_local_addr = http_local_addr;
519        }
520
521        Ok(result)
522    }
523}
524
525#[derive(Debug, Clone, Default, Eq, PartialEq)]
526pub struct TransportRpcModuleConfig {
527    pub http: Option<RpcModuleSelection>,
528    pub ws: Option<RpcModuleSelection>,
529}
530
531impl TransportRpcModuleConfig {
532    pub fn set_http(http: impl Into<RpcModuleSelection>) -> Self {
533        Self::default().with_http(http)
534    }
535
536    pub fn set_ws(ws: impl Into<RpcModuleSelection>) -> Self {
537        Self::default().with_ws(ws)
538    }
539
540    pub fn with_http(mut self, http: impl Into<RpcModuleSelection>) -> Self {
541        self.http = Some(http.into());
542        self
543    }
544
545    pub fn with_ws(mut self, ws: impl Into<RpcModuleSelection>) -> Self {
546        self.ws = Some(ws.into());
547        self
548    }
549
550    pub fn http_mut(&mut self) -> &mut Option<RpcModuleSelection> {
551        &mut self.http
552    }
553
554    pub fn ws_mut(&mut self) -> &mut Option<RpcModuleSelection> { &mut self.ws }
555
556    pub const fn is_empty(&self) -> bool {
557        self.http.is_none() && self.ws.is_none()
558    }
559
560    pub const fn http(&self) -> Option<&RpcModuleSelection> {
561        self.http.as_ref()
562    }
563
564    pub const fn ws(&self) -> Option<&RpcModuleSelection> { self.ws.as_ref() }
565
566    fn ensure_ws_http_identical(
567        &self,
568    ) -> Result<(), WsHttpSamePortError<CfxRpcModule>> {
569        if RpcModuleSelection::are_identical(
570            self.http.as_ref(),
571            self.ws.as_ref(),
572        ) {
573            Ok(())
574        } else {
575            let http_modules = self
576                .http
577                .as_ref()
578                .map(RpcModuleSelection::to_selection)
579                .unwrap_or_default();
580            let ws_modules = self
581                .ws
582                .as_ref()
583                .map(RpcModuleSelection::to_selection)
584                .unwrap_or_default();
585
586            let http_not_ws: HashSet<CfxRpcModule> =
587                http_modules.difference(&ws_modules).copied().collect();
588            let ws_not_http: HashSet<CfxRpcModule> =
589                ws_modules.difference(&http_modules).copied().collect();
590            let overlap: HashSet<CfxRpcModule> =
591                http_modules.intersection(&ws_modules).copied().collect();
592            // 指定泛型为 CfxRpcModule 以避免冲突
593            let conflicting_modules = ConflictingModules {
594                overlap,
595                http_not_ws,
596                ws_not_http,
597            };
598            Err(WsHttpSamePortError::ConflictingModules(Box::new(
599                conflicting_modules,
600            )))
601        }
602    }
603}
604
605#[derive(Debug, Clone, Default)]
606pub struct TransportRpcModules<Context = ()> {
607    pub config: TransportRpcModuleConfig,
608    pub http: Option<RpcModule<Context>>,
609    pub ws: Option<RpcModule<Context>>,
610}
611
612impl TransportRpcModules {
613    pub const fn module_config(&self) -> &TransportRpcModuleConfig {
614        &self.config
615    }
616
617    pub fn merge_http(
618        &mut self, other: impl Into<Methods>,
619    ) -> Result<bool, RegisterMethodError> {
620        if let Some(ref mut http) = self.http {
621            return http.merge(other.into()).map(|_| true);
622        }
623        Ok(false)
624    }
625
626    pub fn merge_ws(
627        &mut self, other: impl Into<Methods>,
628    ) -> Result<bool, RegisterMethodError> {
629        if let Some(ref mut ws) = self.ws {
630            return ws.merge(other.into()).map(|_| true);
631        }
632        Ok(false)
633    }
634
635    pub fn merge_configured(
636        &mut self, other: impl Into<Methods>,
637    ) -> Result<(), RegisterMethodError> {
638        let other = other.into();
639        self.merge_http(other.clone())?;
640        self.merge_ws(other.clone())?;
641        Ok(())
642    }
643
644    pub fn remove_http_method(&mut self, method_name: &'static str) -> bool {
645        if let Some(http_module) = &mut self.http {
646            http_module.remove_method(method_name).is_some()
647        } else {
648            false
649        }
650    }
651
652    pub fn remove_ws_method(&mut self, method_name: &'static str) -> bool {
653        if let Some(ws_module) = &mut self.ws {
654            ws_module.remove_method(method_name).is_some()
655        } else {
656            false
657        }
658    }
659
660    pub fn remove_method_from_configured(
661        &mut self, method_name: &'static str,
662    ) -> bool {
663        let http_removed = self.remove_http_method(method_name);
664        let ws_removed = self.remove_ws_method(method_name);
665
666        http_removed || ws_removed
667    }
668}