1use log::{debug, info};
6use std::{
7 collections::HashMap,
8 fs::create_dir_all,
9 path::Path,
10 sync::{Arc, Weak},
11 thread,
12 time::{Duration, Instant},
13};
14
15use cfx_rpc_builder::RpcServerHandle;
16use cfx_util_macros::bail;
17use jsonrpc_http_server::Server as HttpServer;
18use jsonrpc_tcp_server::Server as TcpServer;
19use jsonrpc_ws_server::Server as WSServer;
20use parking_lot::{Condvar, Mutex};
21use rand_08::{prelude::StdRng, rngs::OsRng, SeedableRng};
22use threadpool::ThreadPool;
23
24use blockgen::BlockGenerator;
25use cfx_executor::machine::{Machine, VmFactory};
26use cfx_parameters::genesis::{
27 DEV_GENESIS_KEY_PAIR_2, GENESIS_ACCOUNT_ADDRESS,
28};
29use cfx_rpc_cfx_types::apis::ApiSet;
30use cfx_storage::StorageManager;
31use cfx_tasks::TaskManager;
32use cfx_types::{address_util::AddressUtil, Address, Space, U256};
33pub use cfxcore::pos::pos::PosDropHandle;
34use cfxcore::{
35 block_data_manager::BlockDataManager,
36 consensus::{
37 pivot_hint::PivotHint,
38 pos_handler::{PosConfiguration, PosVerifier},
39 },
40 genesis_block::{self as genesis, genesis_block},
41 pow::PowComputer,
42 statistics::Statistics,
43 sync::SyncPhaseType,
44 ConsensusGraph, LightProvider, NodeType, Notifications, Stopable,
45 SynchronizationGraph, SynchronizationService, TransactionPool,
46 WORKER_COMPUTATION_PARALLELISM,
47};
48use cfxcore_accounts::AccountProvider;
49use cfxkey::public_to_address;
50use diem_config::keys::ConfigKey;
51use diem_crypto::{
52 key_file::{load_pri_key, save_pri_key},
53 PrivateKey, Uniform,
54};
55use diem_types::validator_config::{
56 ConsensusPrivateKey, ConsensusVRFPrivateKey,
57};
58use malloc_size_of::{new_malloc_size_ops, MallocSizeOf, MallocSizeOfOps};
59use network::NetworkService;
60use secret_store::{SecretStore, SharedSecretStore};
61use tokio::runtime::Runtime as TokioRuntime;
62use txgen::{DirectTransactionGenerator, TransactionGenerator};
63
64use cfx_config::{parse_config_address_string, Configuration};
65
66use crate::{
67 accounts::{account_provider, keys_path},
68 keylib::KeyPair,
69 rpc::{
70 extractor::RpcExtractor,
71 impls::{
72 cfx::RpcImpl, common::RpcImpl as CommonRpcImpl,
73 pubsub::PubSubClient,
74 },
75 launch_async_rpc_servers, launch_cfx_async_rpc_servers,
76 setup_debug_rpc_apis, setup_public_rpc_apis,
77 },
78};
79#[cfg(all(unix, feature = "jemalloc-prof"))]
80use cfx_mallocator_utils::start_pprf_server;
81use cfxcore::consensus::pos_handler::read_initial_nodes_from_file;
82
83pub mod delegate_convert;
84pub mod panic_handler;
85pub mod shutdown_handler;
86
87pub struct ClientComponents<BlockGenT, Rest> {
90 pub data_manager_weak_ptr: Weak<BlockDataManager>,
91 pub blockgen: Option<Arc<BlockGenT>>,
92 pub pos_handler: Option<Arc<PosVerifier>>,
93 pub other_components: Rest,
94}
95
96impl<BlockGenT, Rest: MallocSizeOf> MallocSizeOf
97 for ClientComponents<BlockGenT, Rest>
98{
99 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
100 if let Some(data_man) = self.data_manager_weak_ptr.upgrade() {
101 let data_manager_size = data_man.size_of(ops);
102 data_manager_size + self.other_components.size_of(ops)
103 } else {
104 0
107 }
108 }
109}
110
111impl<BlockGenT: 'static + Stopable, Rest> ClientTrait
112 for ClientComponents<BlockGenT, Rest>
113{
114 fn take_out_components_for_shutdown(
115 &self,
116 ) -> (
117 Weak<BlockDataManager>,
118 Option<Arc<PosVerifier>>,
119 Option<Arc<dyn Stopable>>,
120 ) {
121 debug!("take_out_components_for_shutdown");
122 let data_manager_weak_ptr = self.data_manager_weak_ptr.clone();
123 let blockgen: Option<Arc<dyn Stopable>> = match self.blockgen.clone() {
124 Some(blockgen) => Some(blockgen),
125 None => None,
126 };
127
128 (data_manager_weak_ptr, self.pos_handler.clone(), blockgen)
129 }
130}
131
132pub trait ClientTrait {
133 fn take_out_components_for_shutdown(
134 &self,
135 ) -> (
136 Weak<BlockDataManager>,
137 Option<Arc<PosVerifier>>,
138 Option<Arc<dyn Stopable>>,
139 );
140}
141
142pub fn initialize_common_modules(
143 conf: &mut Configuration, exit: Arc<(Mutex<bool>, Condvar)>,
144 node_type: NodeType,
145) -> Result<
146 (
147 Arc<Machine>,
148 Arc<SecretStore>,
149 HashMap<Address, U256>,
150 Arc<BlockDataManager>,
151 Arc<PowComputer>,
152 Arc<PosVerifier>,
153 Arc<TransactionPool>,
154 Arc<ConsensusGraph>,
155 Arc<SynchronizationGraph>,
156 Arc<NetworkService>,
157 Arc<CommonRpcImpl>,
158 Arc<AccountProvider>,
159 Arc<Notifications>,
160 PubSubClient,
161 Arc<TokioRuntime>,
162 ),
163 String,
164> {
165 info!("Working directory: {:?}", std::env::current_dir());
166
167 let (self_pos_private_key, self_vrf_private_key) = {
169 let key_path = Path::new(&conf.raw_conf.pos_private_key_path);
170
171 let read_pos_password = |prompt: &str| -> Result<Vec<u8>, String> {
172 match rpassword::read_password_from_tty(Some(prompt)) {
173 Ok(password) => Ok(password.into_bytes()),
174 Err(e) => {
175 let mut msg = format!("{:?}", e);
176 if e.raw_os_error() == Some(6) {
182 msg.push_str(" Hint: maybe no controlling TTY detected (macOS ENXIO: \"Device not configured\"). If you are running under VS Code debugger or with redirected stdio, set `CFX_POS_KEY_ENCRYPTION_PASSWORD` env var to avoid interactive prompting.");
183 }
184 Err(msg)
185 }
186 }
187 };
188
189 let default_passwd = if conf.is_test_or_dev_mode() {
190 Some(vec![])
191 } else {
192 conf.raw_conf
193 .dev_pos_private_key_encryption_password
194 .clone()
195 .or(std::env::var("CFX_POS_KEY_ENCRYPTION_PASSWORD").ok())
198 .map(|s| s.into_bytes())
199 };
200 if key_path.exists() {
201 let passwd = match default_passwd {
202 Some(p) => p,
203 None => read_pos_password(
204 "PoS key detected, please input your encryption password.\nPassword:",
205 )?,
206 };
207 match load_pri_key(key_path, &passwd) {
208 Ok((sk, vrf_sk)) => {
209 (ConfigKey::new(sk), ConfigKey::new(vrf_sk))
210 }
211 Err(e) => {
212 bail!("Load pos_key failed: {}", e);
213 }
214 }
215 } else {
216 create_dir_all(key_path.parent().unwrap()).unwrap();
217 let passwd = match default_passwd {
218 Some(p) => p,
219 None => {
220 let p = read_pos_password("PoS key is not detected and will be generated instead, please input your encryption password. This password is needed when you restart the node\nPassword:")?;
221 let p2 = read_pos_password("Repeat Password:")?;
222 if p != p2 {
223 bail!("Passwords do not match!");
224 }
225 p
226 }
227 };
228 let mut rng = StdRng::from_rng(OsRng).unwrap();
229 let private_key = ConsensusPrivateKey::generate(&mut rng);
230 let vrf_private_key = ConsensusVRFPrivateKey::generate(&mut rng);
231 save_pri_key(key_path, &passwd, &(&private_key, &vrf_private_key))
232 .expect("error saving private key");
233 (ConfigKey::new(private_key), ConfigKey::new(vrf_private_key))
234 }
235 };
236
237 let worker_thread_pool = Arc::new(Mutex::new(ThreadPool::with_name(
238 "Tx Recover".into(),
239 WORKER_COMPUTATION_PARALLELISM,
240 )));
241
242 let network_config = conf.net_config()?;
243 let cache_config = conf.cache_config();
244
245 let (db_path, db_config) = conf.db_config();
246 let ledger_db = db::open_database(db_path.to_str().unwrap(), &db_config)
247 .map_err(|e| format!("Failed to open database {:?}", e))?;
248
249 let secret_store = Arc::new(SecretStore::new());
250 let storage_manager = Arc::new(
251 StorageManager::new(conf.storage_config(&node_type))
252 .expect("Failed to initialize storage."),
253 );
254 {
255 let storage_manager_log_weak_ptr = Arc::downgrade(&storage_manager);
256 let exit_clone = exit.clone();
257 thread::spawn(move || loop {
258 let mut exit_lock = exit_clone.0.lock();
259 if exit_clone
260 .1
261 .wait_for(&mut exit_lock, Duration::from_millis(5000))
262 .timed_out()
263 {
264 let manager = storage_manager_log_weak_ptr.upgrade();
265 match manager {
266 None => return,
267 Some(manager) => manager.log_usage(),
268 };
269 } else {
270 return;
271 }
272 });
273 }
274
275 let genesis_accounts = if conf.is_test_or_dev_mode() {
276 match (
277 &conf.raw_conf.genesis_secrets,
278 &conf.raw_conf.genesis_evm_secrets,
279 ) {
280 (Some(file), evm_file) => {
281 let mut accounts = genesis::load_secrets_file(
283 file,
284 secret_store.as_ref(),
285 Space::Native,
286 )?;
287
288 if let Some(evm_file) = evm_file {
290 let evm_accounts = genesis::load_secrets_file(
291 evm_file,
292 secret_store.as_ref(),
293 Space::Ethereum,
294 )?;
295 accounts.extend(evm_accounts);
296 }
297 accounts
298 }
299 (None, Some(evm_file)) => {
300 genesis::load_secrets_file(
302 evm_file,
303 secret_store.as_ref(),
304 Space::Ethereum,
305 )?
306 }
307 (None, None) => genesis::default(conf.is_test_or_dev_mode()),
308 }
309 } else {
310 match conf.raw_conf.genesis_accounts {
311 Some(ref file) => genesis::load_file(file, |addr_str| {
312 parse_config_address_string(
313 addr_str,
314 network_config.get_network_type(),
315 )
316 })?,
317 None => genesis::default(conf.is_test_or_dev_mode()),
318 }
319 };
320
321 let initial_nodes = if conf.raw_conf.pos_reference_enable_height == 0 {
323 Some(
324 read_initial_nodes_from_file(
325 conf.raw_conf.pos_initial_nodes_path.as_str(),
326 )
327 .expect("Genesis must have been initialized with pos"),
328 )
329 } else {
330 None
331 };
332
333 let consensus_conf = conf.consensus_config();
334 let vm = VmFactory::new(1024 * 32);
335 let machine = Arc::new(Machine::new_with_builtin(conf.common_params(), vm));
336
337 let genesis_block = genesis_block(
338 &storage_manager,
339 genesis_accounts.clone(),
340 GENESIS_ACCOUNT_ADDRESS,
341 U256::zero(),
342 machine.clone(),
343 conf.raw_conf.execute_genesis, conf.raw_conf.chain_id,
345 &initial_nodes,
346 );
347 storage_manager.notify_genesis_hash(genesis_block.hash());
348 let mut genesis_accounts = genesis_accounts;
349 let genesis_accounts = genesis_accounts
350 .drain()
351 .filter(|(addr, _)| addr.space == Space::Native)
352 .map(|(addr, x)| (addr.address, x))
353 .collect();
354 debug!("Initialize genesis_block={:?}", genesis_block);
355 if conf.raw_conf.pos_genesis_pivot_decision.is_none() {
356 conf.raw_conf.pos_genesis_pivot_decision = Some(genesis_block.hash());
357 }
358
359 let pow_config = conf.pow_config();
360 let pow = Arc::new(PowComputer::new(pow_config.use_octopus()));
361
362 let data_man = Arc::new(BlockDataManager::new(
363 cache_config,
364 Arc::new(genesis_block),
365 ledger_db.clone(),
366 storage_manager,
367 worker_thread_pool,
368 conf.data_mananger_config(),
369 pow.clone(),
370 ));
371
372 let network = {
373 let mut rng = StdRng::from_rng(OsRng).unwrap();
374 let private_key = ConsensusPrivateKey::generate(&mut rng);
375 let vrf_private_key = ConsensusVRFPrivateKey::generate(&mut rng);
376 let mut network = NetworkService::new(network_config.clone());
377 network
378 .initialize((
379 private_key.public_key(),
380 vrf_private_key.public_key(),
381 ))
382 .unwrap();
383 Arc::new(network)
384 };
385
386 let pos_verifier = Arc::new(PosVerifier::new(
387 Some(network.clone()),
388 PosConfiguration {
389 bls_key: self_pos_private_key,
390 vrf_key: self_vrf_private_key,
391 diem_conf_path: conf.raw_conf.pos_config_path.clone(),
392 protocol_conf: conf.protocol_config(),
393 pos_initial_nodes_path: conf
394 .raw_conf
395 .pos_initial_nodes_path
396 .clone(),
397 vrf_proposal_threshold: conf.raw_conf.vrf_proposal_threshold,
398 pos_state_config: conf.pos_state_config(),
399 },
400 conf.raw_conf.pos_reference_enable_height,
401 ));
402 let verification_config = conf.verification_config(machine.clone());
403 let txpool = Arc::new(TransactionPool::new(
404 conf.txpool_config(),
405 verification_config.clone(),
406 data_man.clone(),
407 machine.clone(),
408 ));
409
410 let statistics = Arc::new(Statistics::new());
411 let notifications = Notifications::init();
412 let pivot_hint = if let Some(conf) = &consensus_conf.pivot_hint_conf {
413 Some(Arc::new(PivotHint::new(conf)?))
414 } else {
415 None
416 };
417
418 let consensus = Arc::new(ConsensusGraph::new(
419 consensus_conf,
420 txpool.clone(),
421 statistics.clone(),
422 data_man.clone(),
423 pow_config.clone(),
424 pow.clone(),
425 notifications.clone(),
426 conf.execution_config(),
427 verification_config.clone(),
428 node_type,
429 pos_verifier.clone(),
430 pivot_hint,
431 conf.common_params(),
432 ));
433
434 for terminal in data_man
435 .terminals_from_db()
436 .unwrap_or(vec![data_man.get_cur_consensus_era_genesis_hash()])
437 {
438 if data_man.block_height_by_hash(&terminal).unwrap()
439 >= conf.raw_conf.pos_reference_enable_height
440 {
441 pos_verifier.initialize(consensus.clone())?;
442 break;
443 }
444 }
445
446 let sync_config = conf.sync_graph_config();
447
448 let sync_graph = Arc::new(SynchronizationGraph::new(
449 consensus.clone(),
450 data_man.clone(),
451 statistics.clone(),
452 verification_config,
453 pow_config,
454 pow.clone(),
455 sync_config,
456 notifications.clone(),
457 machine.clone(),
458 pos_verifier.clone(),
459 ));
460 let refresh_time =
461 Duration::from_millis(conf.raw_conf.account_provider_refresh_time_ms);
462
463 let accounts = Arc::new(
464 account_provider(
465 Some(keys_path()),
466 None, Some(refresh_time),
468 )
469 .expect("failed to initialize account provider"),
470 );
471
472 let common_impl = Arc::new(CommonRpcImpl::new(
473 exit,
474 consensus.clone(),
475 network.clone(),
476 txpool.clone(),
477 accounts.clone(),
478 pos_verifier.clone(),
479 ));
480 let tokio_runtime =
481 Arc::new(TokioRuntime::new().map_err(|e| e.to_string())?);
482
483 let pubsub = PubSubClient::new(
484 tokio_runtime.clone(),
485 consensus.clone(),
486 notifications.clone(),
487 *network.get_network_type(),
488 );
489
490 Ok((
491 machine,
492 secret_store,
493 genesis_accounts,
494 data_man,
495 pow,
496 pos_verifier,
497 txpool,
498 consensus,
499 sync_graph,
500 network,
501 common_impl,
502 accounts,
503 notifications,
504 pubsub,
505 tokio_runtime,
506 ))
507}
508
509pub fn initialize_not_light_node_modules(
510 conf: &mut Configuration, exit: Arc<(Mutex<bool>, Condvar)>,
511 node_type: NodeType,
512) -> Result<
513 (
514 Arc<BlockDataManager>,
515 Arc<PowComputer>,
516 Arc<TransactionPool>,
517 Arc<ConsensusGraph>,
518 Arc<SynchronizationService>,
519 Arc<BlockGenerator>,
520 Option<HttpServer>,
521 Option<HttpServer>,
522 Option<TcpServer>,
523 Option<TcpServer>,
524 Option<WSServer>,
525 Option<WSServer>,
526 Arc<PosVerifier>,
527 Arc<TokioRuntime>,
528 Option<RpcServerHandle>,
529 Option<RpcServerHandle>,
530 Option<RpcServerHandle>,
531 TaskManager,
532 ),
533 String,
534> {
535 let (
536 _machine,
537 secret_store,
538 genesis_accounts,
539 data_man,
540 pow,
541 pos_verifier,
542 txpool,
543 consensus,
544 sync_graph,
545 network,
546 common_impl,
547 accounts,
548 notifications,
549 pubsub,
550 tokio_runtime,
551 ) = initialize_common_modules(conf, exit.clone(), node_type)?;
552
553 let light_provider = Arc::new(LightProvider::new(
554 consensus.clone(),
555 sync_graph.clone(),
556 Arc::downgrade(&network),
557 txpool.clone(),
558 conf.raw_conf.throttling_conf.clone(),
559 node_type,
560 ));
561 light_provider.register(network.clone()).unwrap();
562
563 let sync = Arc::new(SynchronizationService::new(
564 node_type,
565 network.clone(),
566 sync_graph.clone(),
567 conf.protocol_config(),
568 conf.state_sync_config(),
569 SyncPhaseType::CatchUpRecoverBlockHeaderFromDB,
570 light_provider,
571 consensus.clone(),
572 ));
573 sync.register().unwrap();
574
575 if let Some(print_memory_usage_period_s) =
576 conf.raw_conf.print_memory_usage_period_s
577 {
578 let secret_store = secret_store.clone();
579 let data_man = data_man.clone();
580 let txpool = txpool.clone();
581 let consensus = consensus.clone();
582 let sync = sync.clone();
583 thread::Builder::new().name("MallocSizeOf".into()).spawn(
584 move || loop {
585 let start = Instant::now();
586 let mb = 1_000_000;
587 let mut ops = new_malloc_size_ops();
588 let secret_store_size = secret_store.size_of(&mut ops) / mb;
589 let data_manager_db_cache_size = data_man.db_manager.size_of(&mut ops) / mb;
592 let storage_manager_size = data_man.storage_manager.size_of(&mut ops) / mb;
593 let data_man_size = data_man.size_of(&mut ops) / mb;
594 let tx_pool_size = txpool.size_of(&mut ops) / mb;
595 let consensus_graph_size = consensus.size_of(&mut ops) / mb;
596 let sync_graph_size =
597 sync.get_synchronization_graph().size_of(&mut ops) / mb;
598 let sync_service_size = sync.size_of(&mut ops) / mb;
599 info!(
600 "Malloc Size(MB): secret_store={} data_manager_db_cache_size={} \
601 storage_manager_size={} data_man={} txpool={} consensus={} sync_graph={}\
602 sync_service={}, \
603 time elapsed={:?}",
604 secret_store_size,data_manager_db_cache_size,storage_manager_size,
605 data_man_size, tx_pool_size, consensus_graph_size, sync_graph_size,
606 sync_service_size, start.elapsed(),
607 );
608 thread::sleep(Duration::from_secs(
609 print_memory_usage_period_s,
610 ));
611 },
612 ).expect("Memory usage thread start fails");
613 }
614
615 let (maybe_txgen, maybe_direct_txgen) = initialize_txgens(
616 consensus.clone(),
617 txpool.clone(),
618 sync.clone(),
619 secret_store.clone(),
620 genesis_accounts,
621 &conf,
622 network.net_key_pair().unwrap(),
623 );
624
625 let maybe_author: Option<Address> =
626 conf.raw_conf.mining_author.as_ref().map(|addr_str| {
627 parse_config_address_string(addr_str, network.get_network_type())
628 .unwrap_or_else(|err| {
629 panic!("Error parsing mining-author {}", err)
630 })
631 });
632 let pow_config = conf.pow_config();
633 let blockgen = Arc::new(BlockGenerator::new(
634 sync_graph,
635 txpool.clone(),
636 sync.clone(),
637 maybe_txgen.clone(),
638 pow_config.clone(),
639 pow.clone(),
640 maybe_author.clone().unwrap_or_default(),
641 pos_verifier.clone(),
642 ));
643 if conf.is_dev_mode() {
644 if let Some(interval_ms) = conf.raw_conf.dev_block_interval_ms {
647 let bg = blockgen.test_api();
649 info!("Start auto block generation");
650 thread::Builder::new()
651 .name("auto_mining".into())
652 .spawn(move || {
653 bg.auto_block_generation(interval_ms);
654 })
655 .expect("Mining thread spawn error");
656 }
657 } else if let Some(author) = maybe_author {
658 if !author.is_genesis_valid_address() || author.is_builtin_address() {
659 panic!("mining-author must be user address or contract address, otherwise you will not get mining rewards!!!");
660 }
661 if pow_config.enable_mining() {
662 let bg = blockgen.clone();
663 tokio_runtime.spawn(async move {
664 bg.mine().await;
665 });
666 }
667 }
668
669 let use_old_core_rpc = conf.raw_conf.core_space_rpc_use_old_impl;
670
671 let rpc_impl = Arc::new(RpcImpl::new(
672 consensus.clone(),
673 sync.clone(),
674 blockgen.test_api(),
675 txpool.clone(),
676 maybe_txgen.clone(),
677 maybe_direct_txgen.clone(),
678 conf.rpc_impl_config(),
679 accounts.clone(),
680 ));
681
682 let (
686 debug_rpc_http_server,
687 rpc_http_server,
688 debug_rpc_tcp_server,
689 rpc_tcp_server,
690 debug_rpc_ws_server,
691 rpc_ws_server,
692 ) = if use_old_core_rpc {
693 let debug_rpc_http_server = super::rpc::start_http(
694 conf.local_http_config(),
695 setup_debug_rpc_apis(
696 common_impl.clone(),
697 rpc_impl.clone(),
698 pubsub.clone(),
699 &conf,
700 ),
701 )?;
702
703 let debug_rpc_tcp_server = super::rpc::start_tcp(
704 conf.local_tcp_config(),
705 setup_debug_rpc_apis(
706 common_impl.clone(),
707 rpc_impl.clone(),
708 pubsub.clone(),
709 &conf,
710 ),
711 RpcExtractor,
712 )?;
713
714 let debug_rpc_ws_server = super::rpc::start_ws(
715 conf.local_ws_config(),
716 setup_public_rpc_apis(
717 common_impl.clone(),
718 rpc_impl.clone(),
719 pubsub.clone(),
720 &conf,
721 ),
722 RpcExtractor,
723 )?;
724
725 let rpc_tcp_server = super::rpc::start_tcp(
726 conf.tcp_config(),
727 setup_public_rpc_apis(
728 common_impl.clone(),
729 rpc_impl.clone(),
730 pubsub.clone(),
731 &conf,
732 ),
733 RpcExtractor,
734 )?;
735
736 let rpc_ws_server = super::rpc::start_ws(
737 conf.ws_config(),
738 setup_public_rpc_apis(
739 common_impl.clone(),
740 rpc_impl.clone(),
741 pubsub.clone(),
742 &conf,
743 ),
744 RpcExtractor,
745 )?;
746
747 let rpc_http_server = super::rpc::start_http(
748 conf.http_config(),
749 setup_public_rpc_apis(common_impl, rpc_impl, pubsub, &conf),
750 )?;
751
752 (
753 debug_rpc_http_server,
754 rpc_http_server,
755 debug_rpc_tcp_server,
756 rpc_tcp_server,
757 debug_rpc_ws_server,
758 rpc_ws_server,
759 )
760 } else {
761 info!("Using new async core space RPC implementation");
762 (None, None, None, None, None, None)
763 };
764
765 let task_manager = TaskManager::new(tokio_runtime.handle().clone());
766 let task_executor = task_manager.executor();
767
768 let eth_rpc_server_handle =
769 tokio_runtime.block_on(launch_async_rpc_servers(
770 consensus.clone(),
771 sync.clone(),
772 txpool.clone(),
773 notifications.clone(),
774 task_executor.clone(),
775 conf,
776 ))?;
777
778 let (cfx_rpc_server_handle, debug_cfx_rpc_server_handle) =
781 if !use_old_core_rpc {
782 let cfx_rpc_server_handle =
783 tokio_runtime.block_on(launch_cfx_async_rpc_servers(
784 consensus.clone(),
785 sync.clone(),
786 txpool.clone(),
787 data_man.clone(),
788 network.clone(),
789 pos_verifier.clone(),
790 notifications.clone(),
791 task_executor.clone(),
792 accounts.clone(),
793 exit.clone(),
794 blockgen.test_api(),
795 maybe_txgen.clone(),
796 maybe_direct_txgen.clone(),
797 conf,
798 conf.raw_conf.public_rpc_apis.clone(),
799 false,
800 ))?;
801 let debug_cfx_rpc_server_handle =
802 tokio_runtime.block_on(launch_cfx_async_rpc_servers(
803 consensus.clone(),
804 sync.clone(),
805 txpool.clone(),
806 data_man.clone(),
807 network.clone(),
808 pos_verifier.clone(),
809 notifications.clone(),
810 task_executor.clone(),
811 accounts.clone(),
812 exit.clone(),
813 blockgen.test_api(),
814 maybe_txgen.clone(),
815 maybe_direct_txgen.clone(),
816 conf,
817 ApiSet::All,
818 true,
819 ))?;
820 (cfx_rpc_server_handle, debug_cfx_rpc_server_handle)
821 } else {
822 (None, None)
823 };
824
825 #[cfg(all(unix, feature = "jemalloc-prof"))]
828 if let Some(pprf_addr) = conf.raw_conf.profiling_listen_addr.as_ref() {
829 let pprf_addr = pprf_addr.clone();
830 let _pprf_server_handle = tokio_runtime.spawn(async move {
831 if let Err(e) = start_pprf_server(&pprf_addr).await {
832 eprintln!("Error starting pprof server: {}", e);
833 }
834 });
835 }
836
837 metrics::initialize(conf.metrics_config(), task_executor.clone());
838
839 network.start();
840
841 Ok((
842 data_man,
843 pow,
844 txpool,
845 consensus,
846 sync,
847 blockgen,
848 debug_rpc_http_server,
849 rpc_http_server,
850 debug_rpc_tcp_server,
851 rpc_tcp_server,
852 debug_rpc_ws_server,
853 rpc_ws_server,
854 pos_verifier,
855 tokio_runtime,
856 eth_rpc_server_handle,
857 cfx_rpc_server_handle,
858 debug_cfx_rpc_server_handle,
859 task_manager,
860 ))
861}
862
863pub fn initialize_txgens(
864 consensus: Arc<ConsensusGraph>, txpool: Arc<TransactionPool>,
865 sync: Arc<SynchronizationService>, secret_store: SharedSecretStore,
866 genesis_accounts: HashMap<Address, U256>, conf: &Configuration,
867 network_key_pair: KeyPair,
868) -> (
869 Option<Arc<TransactionGenerator>>,
870 Option<Arc<Mutex<DirectTransactionGenerator>>>,
871) {
872 let maybe_direct_txgen_with_contract = if conf.is_test_or_dev_mode() {
875 Some(Arc::new(Mutex::new(DirectTransactionGenerator::new(
876 network_key_pair,
877 &public_to_address(DEV_GENESIS_KEY_PAIR_2.public(), true),
878 U256::from_dec_str("10000000000000000").unwrap(),
879 U256::from_dec_str("10000000000000000").unwrap(),
880 ))))
881 } else {
882 None
883 };
884
885 let maybe_multi_genesis_txgen = if let Some(txgen_conf) =
888 conf.tx_gen_config()
889 {
890 let multi_genesis_txgen = Arc::new(TransactionGenerator::new(
891 consensus.clone(),
892 txpool.clone(),
893 sync.clone(),
894 secret_store.clone(),
895 ));
896 if txgen_conf.generate_tx {
897 let txgen_clone = multi_genesis_txgen.clone();
898 let join_handle =
899 thread::Builder::new()
900 .name("txgen".into())
901 .spawn(move || {
902 TransactionGenerator::generate_transactions_with_multiple_genesis_accounts(
903 txgen_clone,
904 txgen_conf,
905 genesis_accounts,
906 );
907 })
908 .expect("should succeed");
909 multi_genesis_txgen.set_join_handle(join_handle);
910 }
911 Some(multi_genesis_txgen)
912 } else {
913 None
914 };
915
916 (maybe_multi_genesis_txgen, maybe_direct_txgen_with_contract)
917}