1use log::{debug, info};
6use std::{
7 collections::HashMap,
8 fs::create_dir_all,
9 path::Path,
10 sync::{Arc, Weak},
11 thread,
12 time::{Duration, Instant},
13};
14
15use cfx_rpc_builder::RpcServerHandle;
16use cfx_util_macros::bail;
17use jsonrpc_http_server::Server as HttpServer;
18use jsonrpc_tcp_server::Server as TcpServer;
19use jsonrpc_ws_server::Server as WSServer;
20use parking_lot::{Condvar, Mutex};
21use rand_08::{prelude::StdRng, rngs::OsRng, SeedableRng};
22use threadpool::ThreadPool;
23
24use blockgen::BlockGenerator;
25use cfx_executor::machine::{Machine, VmFactory};
26use cfx_parameters::genesis::{
27 DEV_GENESIS_KEY_PAIR_2, GENESIS_ACCOUNT_ADDRESS,
28};
29use cfx_storage::StorageManager;
30use cfx_tasks::TaskManager;
31use cfx_types::{address_util::AddressUtil, Address, Space, U256};
32pub use cfxcore::pos::pos::PosDropHandle;
33use cfxcore::{
34 block_data_manager::BlockDataManager,
35 consensus::{
36 pivot_hint::PivotHint,
37 pos_handler::{PosConfiguration, PosVerifier},
38 },
39 genesis_block::{self as genesis, genesis_block},
40 pow::PowComputer,
41 statistics::Statistics,
42 sync::SyncPhaseType,
43 ConsensusGraph, LightProvider, NodeType, Notifications, Stopable,
44 SynchronizationGraph, SynchronizationService, TransactionPool,
45 WORKER_COMPUTATION_PARALLELISM,
46};
47use cfxcore_accounts::AccountProvider;
48use cfxkey::public_to_address;
49use diem_config::keys::ConfigKey;
50use diem_crypto::{
51 key_file::{load_pri_key, save_pri_key},
52 PrivateKey, Uniform,
53};
54use diem_types::validator_config::{
55 ConsensusPrivateKey, ConsensusVRFPrivateKey,
56};
57use malloc_size_of::{new_malloc_size_ops, MallocSizeOf, MallocSizeOfOps};
58use network::NetworkService;
59use secret_store::{SecretStore, SharedSecretStore};
60use tokio::runtime::Runtime as TokioRuntime;
61use txgen::{DirectTransactionGenerator, TransactionGenerator};
62
63use cfx_config::{parse_config_address_string, Configuration};
64
65use crate::{
66 accounts::{account_provider, keys_path},
67 keylib::KeyPair,
68 rpc::{
69 extractor::RpcExtractor,
70 impls::{
71 cfx::RpcImpl, common::RpcImpl as CommonRpcImpl,
72 pubsub::PubSubClient,
73 },
74 launch_async_rpc_servers, setup_debug_rpc_apis, setup_public_rpc_apis,
75 },
76};
77#[cfg(all(unix, feature = "jemalloc-prof"))]
78use cfx_mallocator_utils::start_pprf_server;
79use cfxcore::consensus::pos_handler::read_initial_nodes_from_file;
80
81pub mod delegate_convert;
82pub mod shutdown_handler;
83
84pub struct ClientComponents<BlockGenT, Rest> {
87 pub data_manager_weak_ptr: Weak<BlockDataManager>,
88 pub blockgen: Option<Arc<BlockGenT>>,
89 pub pos_handler: Option<Arc<PosVerifier>>,
90 pub other_components: Rest,
91}
92
93impl<BlockGenT, Rest: MallocSizeOf> MallocSizeOf
94 for ClientComponents<BlockGenT, Rest>
95{
96 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
97 if let Some(data_man) = self.data_manager_weak_ptr.upgrade() {
98 let data_manager_size = data_man.size_of(ops);
99 data_manager_size + self.other_components.size_of(ops)
100 } else {
101 0
104 }
105 }
106}
107
108impl<BlockGenT: 'static + Stopable, Rest> ClientTrait
109 for ClientComponents<BlockGenT, Rest>
110{
111 fn take_out_components_for_shutdown(
112 &self,
113 ) -> (
114 Weak<BlockDataManager>,
115 Option<Arc<PosVerifier>>,
116 Option<Arc<dyn Stopable>>,
117 ) {
118 debug!("take_out_components_for_shutdown");
119 let data_manager_weak_ptr = self.data_manager_weak_ptr.clone();
120 let blockgen: Option<Arc<dyn Stopable>> = match self.blockgen.clone() {
121 Some(blockgen) => Some(blockgen),
122 None => None,
123 };
124
125 (data_manager_weak_ptr, self.pos_handler.clone(), blockgen)
126 }
127}
128
129pub trait ClientTrait {
130 fn take_out_components_for_shutdown(
131 &self,
132 ) -> (
133 Weak<BlockDataManager>,
134 Option<Arc<PosVerifier>>,
135 Option<Arc<dyn Stopable>>,
136 );
137}
138
139pub fn initialize_common_modules(
140 conf: &mut Configuration, exit: Arc<(Mutex<bool>, Condvar)>,
141 node_type: NodeType,
142) -> Result<
143 (
144 Arc<Machine>,
145 Arc<SecretStore>,
146 HashMap<Address, U256>,
147 Arc<BlockDataManager>,
148 Arc<PowComputer>,
149 Arc<PosVerifier>,
150 Arc<TransactionPool>,
151 Arc<ConsensusGraph>,
152 Arc<SynchronizationGraph>,
153 Arc<NetworkService>,
154 Arc<CommonRpcImpl>,
155 Arc<AccountProvider>,
156 Arc<Notifications>,
157 PubSubClient,
158 Arc<TokioRuntime>,
159 ),
160 String,
161> {
162 info!("Working directory: {:?}", std::env::current_dir());
163
164 let (self_pos_private_key, self_vrf_private_key) = {
166 let key_path = Path::new(&conf.raw_conf.pos_private_key_path);
167
168 let read_pos_password = |prompt: &str| -> Result<Vec<u8>, String> {
169 match rpassword::read_password_from_tty(Some(prompt)) {
170 Ok(password) => Ok(password.into_bytes()),
171 Err(e) => {
172 let mut msg = format!("{:?}", e);
173 if e.raw_os_error() == Some(6) {
179 msg.push_str(" Hint: maybe no controlling TTY detected (macOS ENXIO: \"Device not configured\"). If you are running under VS Code debugger or with redirected stdio, set `CFX_POS_KEY_ENCRYPTION_PASSWORD` env var to avoid interactive prompting.");
180 }
181 Err(msg)
182 }
183 }
184 };
185
186 let default_passwd = if conf.is_test_or_dev_mode() {
187 Some(vec![])
188 } else {
189 conf.raw_conf
190 .dev_pos_private_key_encryption_password
191 .clone()
192 .or(std::env::var("CFX_POS_KEY_ENCRYPTION_PASSWORD").ok())
195 .map(|s| s.into_bytes())
196 };
197 if key_path.exists() {
198 let passwd = match default_passwd {
199 Some(p) => p,
200 None => read_pos_password(
201 "PoS key detected, please input your encryption password.\nPassword:",
202 )?,
203 };
204 match load_pri_key(key_path, &passwd) {
205 Ok((sk, vrf_sk)) => {
206 (ConfigKey::new(sk), ConfigKey::new(vrf_sk))
207 }
208 Err(e) => {
209 bail!("Load pos_key failed: {}", e);
210 }
211 }
212 } else {
213 create_dir_all(key_path.parent().unwrap()).unwrap();
214 let passwd = match default_passwd {
215 Some(p) => p,
216 None => {
217 let p = read_pos_password("PoS key is not detected and will be generated instead, please input your encryption password. This password is needed when you restart the node\nPassword:")?;
218 let p2 = read_pos_password("Repeat Password:")?;
219 if p != p2 {
220 bail!("Passwords do not match!");
221 }
222 p
223 }
224 };
225 let mut rng = StdRng::from_rng(OsRng).unwrap();
226 let private_key = ConsensusPrivateKey::generate(&mut rng);
227 let vrf_private_key = ConsensusVRFPrivateKey::generate(&mut rng);
228 save_pri_key(key_path, &passwd, &(&private_key, &vrf_private_key))
229 .expect("error saving private key");
230 (ConfigKey::new(private_key), ConfigKey::new(vrf_private_key))
231 }
232 };
233
234 let worker_thread_pool = Arc::new(Mutex::new(ThreadPool::with_name(
235 "Tx Recover".into(),
236 WORKER_COMPUTATION_PARALLELISM,
237 )));
238
239 let network_config = conf.net_config()?;
240 let cache_config = conf.cache_config();
241
242 let (db_path, db_config) = conf.db_config();
243 let ledger_db = db::open_database(db_path.to_str().unwrap(), &db_config)
244 .map_err(|e| format!("Failed to open database {:?}", e))?;
245
246 let secret_store = Arc::new(SecretStore::new());
247 let storage_manager = Arc::new(
248 StorageManager::new(conf.storage_config(&node_type))
249 .expect("Failed to initialize storage."),
250 );
251 {
252 let storage_manager_log_weak_ptr = Arc::downgrade(&storage_manager);
253 let exit_clone = exit.clone();
254 thread::spawn(move || loop {
255 let mut exit_lock = exit_clone.0.lock();
256 if exit_clone
257 .1
258 .wait_for(&mut exit_lock, Duration::from_millis(5000))
259 .timed_out()
260 {
261 let manager = storage_manager_log_weak_ptr.upgrade();
262 match manager {
263 None => return,
264 Some(manager) => manager.log_usage(),
265 };
266 } else {
267 return;
268 }
269 });
270 }
271
272 let genesis_accounts = if conf.is_test_or_dev_mode() {
273 match (
274 &conf.raw_conf.genesis_secrets,
275 &conf.raw_conf.genesis_evm_secrets,
276 ) {
277 (Some(file), evm_file) => {
278 let mut accounts = genesis::load_secrets_file(
280 file,
281 secret_store.as_ref(),
282 Space::Native,
283 )?;
284
285 if let Some(evm_file) = evm_file {
287 let evm_accounts = genesis::load_secrets_file(
288 evm_file,
289 secret_store.as_ref(),
290 Space::Ethereum,
291 )?;
292 accounts.extend(evm_accounts);
293 }
294 accounts
295 }
296 (None, Some(evm_file)) => {
297 genesis::load_secrets_file(
299 evm_file,
300 secret_store.as_ref(),
301 Space::Ethereum,
302 )?
303 }
304 (None, None) => genesis::default(conf.is_test_or_dev_mode()),
305 }
306 } else {
307 match conf.raw_conf.genesis_accounts {
308 Some(ref file) => genesis::load_file(file, |addr_str| {
309 parse_config_address_string(
310 addr_str,
311 network_config.get_network_type(),
312 )
313 })?,
314 None => genesis::default(conf.is_test_or_dev_mode()),
315 }
316 };
317
318 let initial_nodes = if conf.raw_conf.pos_reference_enable_height == 0 {
320 Some(
321 read_initial_nodes_from_file(
322 conf.raw_conf.pos_initial_nodes_path.as_str(),
323 )
324 .expect("Genesis must have been initialized with pos"),
325 )
326 } else {
327 None
328 };
329
330 let consensus_conf = conf.consensus_config();
331 let vm = VmFactory::new(1024 * 32);
332 let machine = Arc::new(Machine::new_with_builtin(conf.common_params(), vm));
333
334 let genesis_block = genesis_block(
335 &storage_manager,
336 genesis_accounts.clone(),
337 GENESIS_ACCOUNT_ADDRESS,
338 U256::zero(),
339 machine.clone(),
340 conf.raw_conf.execute_genesis, conf.raw_conf.chain_id,
342 &initial_nodes,
343 );
344 storage_manager.notify_genesis_hash(genesis_block.hash());
345 let mut genesis_accounts = genesis_accounts;
346 let genesis_accounts = genesis_accounts
347 .drain()
348 .filter(|(addr, _)| addr.space == Space::Native)
349 .map(|(addr, x)| (addr.address, x))
350 .collect();
351 debug!("Initialize genesis_block={:?}", genesis_block);
352 if conf.raw_conf.pos_genesis_pivot_decision.is_none() {
353 conf.raw_conf.pos_genesis_pivot_decision = Some(genesis_block.hash());
354 }
355
356 let pow_config = conf.pow_config();
357 let pow = Arc::new(PowComputer::new(pow_config.use_octopus()));
358
359 let data_man = Arc::new(BlockDataManager::new(
360 cache_config,
361 Arc::new(genesis_block),
362 ledger_db.clone(),
363 storage_manager,
364 worker_thread_pool,
365 conf.data_mananger_config(),
366 pow.clone(),
367 ));
368
369 let network = {
370 let mut rng = StdRng::from_rng(OsRng).unwrap();
371 let private_key = ConsensusPrivateKey::generate(&mut rng);
372 let vrf_private_key = ConsensusVRFPrivateKey::generate(&mut rng);
373 let mut network = NetworkService::new(network_config.clone());
374 network
375 .initialize((
376 private_key.public_key(),
377 vrf_private_key.public_key(),
378 ))
379 .unwrap();
380 Arc::new(network)
381 };
382
383 let pos_verifier = Arc::new(PosVerifier::new(
384 Some(network.clone()),
385 PosConfiguration {
386 bls_key: self_pos_private_key,
387 vrf_key: self_vrf_private_key,
388 diem_conf_path: conf.raw_conf.pos_config_path.clone(),
389 protocol_conf: conf.protocol_config(),
390 pos_initial_nodes_path: conf
391 .raw_conf
392 .pos_initial_nodes_path
393 .clone(),
394 vrf_proposal_threshold: conf.raw_conf.vrf_proposal_threshold,
395 pos_state_config: conf.pos_state_config(),
396 },
397 conf.raw_conf.pos_reference_enable_height,
398 ));
399 let verification_config = conf.verification_config(machine.clone());
400 let txpool = Arc::new(TransactionPool::new(
401 conf.txpool_config(),
402 verification_config.clone(),
403 data_man.clone(),
404 machine.clone(),
405 ));
406
407 let statistics = Arc::new(Statistics::new());
408 let notifications = Notifications::init();
409 let pivot_hint = if let Some(conf) = &consensus_conf.pivot_hint_conf {
410 Some(Arc::new(PivotHint::new(conf)?))
411 } else {
412 None
413 };
414
415 let consensus = Arc::new(ConsensusGraph::new(
416 consensus_conf,
417 txpool.clone(),
418 statistics.clone(),
419 data_man.clone(),
420 pow_config.clone(),
421 pow.clone(),
422 notifications.clone(),
423 conf.execution_config(),
424 verification_config.clone(),
425 node_type,
426 pos_verifier.clone(),
427 pivot_hint,
428 conf.common_params(),
429 ));
430
431 for terminal in data_man
432 .terminals_from_db()
433 .unwrap_or(vec![data_man.get_cur_consensus_era_genesis_hash()])
434 {
435 if data_man.block_height_by_hash(&terminal).unwrap()
436 >= conf.raw_conf.pos_reference_enable_height
437 {
438 pos_verifier.initialize(consensus.clone())?;
439 break;
440 }
441 }
442
443 let sync_config = conf.sync_graph_config();
444
445 let sync_graph = Arc::new(SynchronizationGraph::new(
446 consensus.clone(),
447 data_man.clone(),
448 statistics.clone(),
449 verification_config,
450 pow_config,
451 pow.clone(),
452 sync_config,
453 notifications.clone(),
454 machine.clone(),
455 pos_verifier.clone(),
456 ));
457 let refresh_time =
458 Duration::from_millis(conf.raw_conf.account_provider_refresh_time_ms);
459
460 let accounts = Arc::new(
461 account_provider(
462 Some(keys_path()),
463 None, Some(refresh_time),
465 )
466 .expect("failed to initialize account provider"),
467 );
468
469 let common_impl = Arc::new(CommonRpcImpl::new(
470 exit,
471 consensus.clone(),
472 network.clone(),
473 txpool.clone(),
474 accounts.clone(),
475 pos_verifier.clone(),
476 ));
477 let tokio_runtime =
478 Arc::new(TokioRuntime::new().map_err(|e| e.to_string())?);
479
480 let pubsub = PubSubClient::new(
481 tokio_runtime.clone(),
482 consensus.clone(),
483 notifications.clone(),
484 *network.get_network_type(),
485 );
486
487 Ok((
488 machine,
489 secret_store,
490 genesis_accounts,
491 data_man,
492 pow,
493 pos_verifier,
494 txpool,
495 consensus,
496 sync_graph,
497 network,
498 common_impl,
499 accounts,
500 notifications,
501 pubsub,
502 tokio_runtime,
503 ))
504}
505
506pub fn initialize_not_light_node_modules(
507 conf: &mut Configuration, exit: Arc<(Mutex<bool>, Condvar)>,
508 node_type: NodeType,
509) -> Result<
510 (
511 Arc<BlockDataManager>,
512 Arc<PowComputer>,
513 Arc<TransactionPool>,
514 Arc<ConsensusGraph>,
515 Arc<SynchronizationService>,
516 Arc<BlockGenerator>,
517 Option<HttpServer>,
518 Option<HttpServer>,
519 Option<TcpServer>,
520 Option<TcpServer>,
521 Option<WSServer>,
522 Option<WSServer>,
523 Arc<PosVerifier>,
524 Arc<TokioRuntime>,
525 Option<RpcServerHandle>,
526 TaskManager,
527 ),
528 String,
529> {
530 let (
531 _machine,
532 secret_store,
533 genesis_accounts,
534 data_man,
535 pow,
536 pos_verifier,
537 txpool,
538 consensus,
539 sync_graph,
540 network,
541 common_impl,
542 accounts,
543 notifications,
544 pubsub,
545 tokio_runtime,
546 ) = initialize_common_modules(conf, exit.clone(), node_type)?;
547
548 let light_provider = Arc::new(LightProvider::new(
549 consensus.clone(),
550 sync_graph.clone(),
551 Arc::downgrade(&network),
552 txpool.clone(),
553 conf.raw_conf.throttling_conf.clone(),
554 node_type,
555 ));
556 light_provider.register(network.clone()).unwrap();
557
558 let sync = Arc::new(SynchronizationService::new(
559 node_type,
560 network.clone(),
561 sync_graph.clone(),
562 conf.protocol_config(),
563 conf.state_sync_config(),
564 SyncPhaseType::CatchUpRecoverBlockHeaderFromDB,
565 light_provider,
566 consensus.clone(),
567 ));
568 sync.register().unwrap();
569
570 if let Some(print_memory_usage_period_s) =
571 conf.raw_conf.print_memory_usage_period_s
572 {
573 let secret_store = secret_store.clone();
574 let data_man = data_man.clone();
575 let txpool = txpool.clone();
576 let consensus = consensus.clone();
577 let sync = sync.clone();
578 thread::Builder::new().name("MallocSizeOf".into()).spawn(
579 move || loop {
580 let start = Instant::now();
581 let mb = 1_000_000;
582 let mut ops = new_malloc_size_ops();
583 let secret_store_size = secret_store.size_of(&mut ops) / mb;
584 let data_manager_db_cache_size = data_man.db_manager.size_of(&mut ops) / mb;
587 let storage_manager_size = data_man.storage_manager.size_of(&mut ops) / mb;
588 let data_man_size = data_man.size_of(&mut ops) / mb;
589 let tx_pool_size = txpool.size_of(&mut ops) / mb;
590 let consensus_graph_size = consensus.size_of(&mut ops) / mb;
591 let sync_graph_size =
592 sync.get_synchronization_graph().size_of(&mut ops) / mb;
593 let sync_service_size = sync.size_of(&mut ops) / mb;
594 info!(
595 "Malloc Size(MB): secret_store={} data_manager_db_cache_size={} \
596 storage_manager_size={} data_man={} txpool={} consensus={} sync_graph={}\
597 sync_service={}, \
598 time elapsed={:?}",
599 secret_store_size,data_manager_db_cache_size,storage_manager_size,
600 data_man_size, tx_pool_size, consensus_graph_size, sync_graph_size,
601 sync_service_size, start.elapsed(),
602 );
603 thread::sleep(Duration::from_secs(
604 print_memory_usage_period_s,
605 ));
606 },
607 ).expect("Memory usage thread start fails");
608 }
609
610 let (maybe_txgen, maybe_direct_txgen) = initialize_txgens(
611 consensus.clone(),
612 txpool.clone(),
613 sync.clone(),
614 secret_store.clone(),
615 genesis_accounts,
616 &conf,
617 network.net_key_pair().unwrap(),
618 );
619
620 let maybe_author: Option<Address> =
621 conf.raw_conf.mining_author.as_ref().map(|addr_str| {
622 parse_config_address_string(addr_str, network.get_network_type())
623 .unwrap_or_else(|err| {
624 panic!("Error parsing mining-author {}", err)
625 })
626 });
627 let pow_config = conf.pow_config();
628 let blockgen = Arc::new(BlockGenerator::new(
629 sync_graph,
630 txpool.clone(),
631 sync.clone(),
632 maybe_txgen.clone(),
633 pow_config.clone(),
634 pow.clone(),
635 maybe_author.clone().unwrap_or_default(),
636 pos_verifier.clone(),
637 ));
638 if conf.is_dev_mode() {
639 if let Some(interval_ms) = conf.raw_conf.dev_block_interval_ms {
642 let bg = blockgen.test_api();
644 info!("Start auto block generation");
645 thread::Builder::new()
646 .name("auto_mining".into())
647 .spawn(move || {
648 bg.auto_block_generation(interval_ms);
649 })
650 .expect("Mining thread spawn error");
651 }
652 } else if let Some(author) = maybe_author {
653 if !author.is_genesis_valid_address() || author.is_builtin_address() {
654 panic!("mining-author must be user address or contract address, otherwise you will not get mining rewards!!!");
655 }
656 if pow_config.enable_mining() {
657 let bg = blockgen.clone();
658 thread::Builder::new()
659 .name("mining".into())
660 .spawn(move || {
661 bg.mine();
662 })
663 .expect("Mining thread spawn error");
664 }
665 }
666
667 let rpc_impl = Arc::new(RpcImpl::new(
668 consensus.clone(),
669 sync.clone(),
670 blockgen.test_api(),
671 txpool.clone(),
672 maybe_txgen.clone(),
673 maybe_direct_txgen,
674 conf.rpc_impl_config(),
675 accounts,
676 ));
677
678 let debug_rpc_http_server = super::rpc::start_http(
679 conf.local_http_config(),
680 setup_debug_rpc_apis(
681 common_impl.clone(),
682 rpc_impl.clone(),
683 pubsub.clone(),
684 &conf,
685 ),
686 )?;
687
688 let debug_rpc_tcp_server = super::rpc::start_tcp(
689 conf.local_tcp_config(),
690 setup_debug_rpc_apis(
691 common_impl.clone(),
692 rpc_impl.clone(),
693 pubsub.clone(),
694 &conf,
695 ),
696 RpcExtractor,
697 )?;
698
699 let rpc_tcp_server = super::rpc::start_tcp(
700 conf.tcp_config(),
701 setup_public_rpc_apis(
702 common_impl.clone(),
703 rpc_impl.clone(),
704 pubsub.clone(),
705 &conf,
706 ),
707 RpcExtractor,
708 )?;
709
710 let debug_rpc_ws_server = super::rpc::start_ws(
711 conf.local_ws_config(),
712 setup_public_rpc_apis(
713 common_impl.clone(),
714 rpc_impl.clone(),
715 pubsub.clone(),
716 &conf,
717 ),
718 RpcExtractor,
719 )?;
720
721 let rpc_ws_server = super::rpc::start_ws(
722 conf.ws_config(),
723 setup_public_rpc_apis(
724 common_impl.clone(),
725 rpc_impl.clone(),
726 pubsub.clone(),
727 &conf,
728 ),
729 RpcExtractor,
730 )?;
731
732 let rpc_http_server = super::rpc::start_http(
733 conf.http_config(),
734 setup_public_rpc_apis(common_impl, rpc_impl, pubsub, &conf),
735 )?;
736
737 let task_manager = TaskManager::new(tokio_runtime.handle().clone());
738 let task_executor = task_manager.executor();
739
740 let eth_rpc_server_handle =
741 tokio_runtime.block_on(launch_async_rpc_servers(
742 consensus.clone(),
743 sync.clone(),
744 txpool.clone(),
745 notifications.clone(),
746 task_executor.clone(),
747 conf,
748 ))?;
749
750 #[cfg(all(unix, feature = "jemalloc-prof"))]
753 if let Some(pprf_addr) = conf.raw_conf.profiling_listen_addr.as_ref() {
754 let pprf_addr = pprf_addr.clone();
755 let _pprf_server_handle = tokio_runtime.spawn(async move {
756 if let Err(e) = start_pprf_server(&pprf_addr).await {
757 eprintln!("Error starting pprof server: {}", e);
758 }
759 });
760 }
761
762 metrics::initialize(conf.metrics_config(), task_executor.clone());
763
764 network.start();
765
766 Ok((
767 data_man,
768 pow,
769 txpool,
770 consensus,
771 sync,
772 blockgen,
773 debug_rpc_http_server,
774 rpc_http_server,
775 debug_rpc_tcp_server,
776 rpc_tcp_server,
777 debug_rpc_ws_server,
778 rpc_ws_server,
779 pos_verifier,
780 tokio_runtime,
781 eth_rpc_server_handle,
782 task_manager,
783 ))
784}
785
786pub fn initialize_txgens(
787 consensus: Arc<ConsensusGraph>, txpool: Arc<TransactionPool>,
788 sync: Arc<SynchronizationService>, secret_store: SharedSecretStore,
789 genesis_accounts: HashMap<Address, U256>, conf: &Configuration,
790 network_key_pair: KeyPair,
791) -> (
792 Option<Arc<TransactionGenerator>>,
793 Option<Arc<Mutex<DirectTransactionGenerator>>>,
794) {
795 let maybe_direct_txgen_with_contract = if conf.is_test_or_dev_mode() {
798 Some(Arc::new(Mutex::new(DirectTransactionGenerator::new(
799 network_key_pair,
800 &public_to_address(DEV_GENESIS_KEY_PAIR_2.public(), true),
801 U256::from_dec_str("10000000000000000").unwrap(),
802 U256::from_dec_str("10000000000000000").unwrap(),
803 ))))
804 } else {
805 None
806 };
807
808 let maybe_multi_genesis_txgen = if let Some(txgen_conf) =
811 conf.tx_gen_config()
812 {
813 let multi_genesis_txgen = Arc::new(TransactionGenerator::new(
814 consensus.clone(),
815 txpool.clone(),
816 sync.clone(),
817 secret_store.clone(),
818 ));
819 if txgen_conf.generate_tx {
820 let txgen_clone = multi_genesis_txgen.clone();
821 let join_handle =
822 thread::Builder::new()
823 .name("txgen".into())
824 .spawn(move || {
825 TransactionGenerator::generate_transactions_with_multiple_genesis_accounts(
826 txgen_clone,
827 txgen_conf,
828 genesis_accounts,
829 );
830 })
831 .expect("should succeed");
832 multi_genesis_txgen.set_join_handle(join_handle);
833 }
834 Some(multi_genesis_txgen)
835 } else {
836 None
837 };
838
839 (maybe_multi_genesis_txgen, maybe_direct_txgen_with_contract)
840}