client/common/
mod.rs

1// Copyright 2020 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use log::{debug, info};
6use std::{
7    collections::HashMap,
8    fs::create_dir_all,
9    path::Path,
10    sync::{Arc, Weak},
11    thread,
12    time::{Duration, Instant},
13};
14
15use cfx_rpc_builder::RpcServerHandle;
16use cfx_util_macros::bail;
17use jsonrpc_http_server::Server as HttpServer;
18use jsonrpc_tcp_server::Server as TcpServer;
19use jsonrpc_ws_server::Server as WSServer;
20use parking_lot::{Condvar, Mutex};
21use rand_08::{prelude::StdRng, rngs::OsRng, SeedableRng};
22use threadpool::ThreadPool;
23
24use blockgen::BlockGenerator;
25use cfx_executor::machine::{Machine, VmFactory};
26use cfx_parameters::genesis::{
27    DEV_GENESIS_KEY_PAIR_2, GENESIS_ACCOUNT_ADDRESS,
28};
29use cfx_storage::StorageManager;
30use cfx_tasks::TaskManager;
31use cfx_types::{address_util::AddressUtil, Address, Space, U256};
32pub use cfxcore::pos::pos::PosDropHandle;
33use cfxcore::{
34    block_data_manager::BlockDataManager,
35    consensus::{
36        pivot_hint::PivotHint,
37        pos_handler::{PosConfiguration, PosVerifier},
38    },
39    genesis_block::{self as genesis, genesis_block},
40    pow::PowComputer,
41    statistics::Statistics,
42    sync::SyncPhaseType,
43    ConsensusGraph, LightProvider, NodeType, Notifications, Stopable,
44    SynchronizationGraph, SynchronizationService, TransactionPool,
45    WORKER_COMPUTATION_PARALLELISM,
46};
47use cfxcore_accounts::AccountProvider;
48use cfxkey::public_to_address;
49use diem_config::keys::ConfigKey;
50use diem_crypto::{
51    key_file::{load_pri_key, save_pri_key},
52    PrivateKey, Uniform,
53};
54use diem_types::validator_config::{
55    ConsensusPrivateKey, ConsensusVRFPrivateKey,
56};
57use malloc_size_of::{new_malloc_size_ops, MallocSizeOf, MallocSizeOfOps};
58use network::NetworkService;
59use secret_store::{SecretStore, SharedSecretStore};
60use tokio::runtime::Runtime as TokioRuntime;
61use txgen::{DirectTransactionGenerator, TransactionGenerator};
62
63use cfx_config::{parse_config_address_string, Configuration};
64
65use crate::{
66    accounts::{account_provider, keys_path},
67    keylib::KeyPair,
68    rpc::{
69        extractor::RpcExtractor,
70        impls::{
71            cfx::RpcImpl, common::RpcImpl as CommonRpcImpl,
72            pubsub::PubSubClient,
73        },
74        launch_async_rpc_servers, setup_debug_rpc_apis, setup_public_rpc_apis,
75    },
76};
77#[cfg(all(unix, feature = "jemalloc-prof"))]
78use cfx_mallocator_utils::start_pprf_server;
79use cfxcore::consensus::pos_handler::read_initial_nodes_from_file;
80
81pub mod delegate_convert;
82pub mod shutdown_handler;
83
84/// Hold all top-level components for a type of client.
85/// This struct implement ClientShutdownTrait.
86pub struct ClientComponents<BlockGenT, Rest> {
87    pub data_manager_weak_ptr: Weak<BlockDataManager>,
88    pub blockgen: Option<Arc<BlockGenT>>,
89    pub pos_handler: Option<Arc<PosVerifier>>,
90    pub other_components: Rest,
91}
92
93impl<BlockGenT, Rest: MallocSizeOf> MallocSizeOf
94    for ClientComponents<BlockGenT, Rest>
95{
96    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
97        if let Some(data_man) = self.data_manager_weak_ptr.upgrade() {
98            let data_manager_size = data_man.size_of(ops);
99            data_manager_size + self.other_components.size_of(ops)
100        } else {
101            // If data_man is `None`, we will be just shutting down (dropping
102            // components) so we don't care about the size
103            0
104        }
105    }
106}
107
108impl<BlockGenT: 'static + Stopable, Rest> ClientTrait
109    for ClientComponents<BlockGenT, Rest>
110{
111    fn take_out_components_for_shutdown(
112        &self,
113    ) -> (
114        Weak<BlockDataManager>,
115        Option<Arc<PosVerifier>>,
116        Option<Arc<dyn Stopable>>,
117    ) {
118        debug!("take_out_components_for_shutdown");
119        let data_manager_weak_ptr = self.data_manager_weak_ptr.clone();
120        let blockgen: Option<Arc<dyn Stopable>> = match self.blockgen.clone() {
121            Some(blockgen) => Some(blockgen),
122            None => None,
123        };
124
125        (data_manager_weak_ptr, self.pos_handler.clone(), blockgen)
126    }
127}
128
129pub trait ClientTrait {
130    fn take_out_components_for_shutdown(
131        &self,
132    ) -> (
133        Weak<BlockDataManager>,
134        Option<Arc<PosVerifier>>,
135        Option<Arc<dyn Stopable>>,
136    );
137}
138
139pub fn initialize_common_modules(
140    conf: &mut Configuration, exit: Arc<(Mutex<bool>, Condvar)>,
141    node_type: NodeType,
142) -> Result<
143    (
144        Arc<Machine>,
145        Arc<SecretStore>,
146        HashMap<Address, U256>,
147        Arc<BlockDataManager>,
148        Arc<PowComputer>,
149        Arc<PosVerifier>,
150        Arc<TransactionPool>,
151        Arc<ConsensusGraph>,
152        Arc<SynchronizationGraph>,
153        Arc<NetworkService>,
154        Arc<CommonRpcImpl>,
155        Arc<AccountProvider>,
156        Arc<Notifications>,
157        PubSubClient,
158        Arc<TokioRuntime>,
159    ),
160    String,
161> {
162    info!("Working directory: {:?}", std::env::current_dir());
163
164    // TODO(lpl): Keep it properly and allow not running pos.
165    let (self_pos_private_key, self_vrf_private_key) = {
166        let key_path = Path::new(&conf.raw_conf.pos_private_key_path);
167
168        let read_pos_password = |prompt: &str| -> Result<Vec<u8>, String> {
169            match rpassword::read_password_from_tty(Some(prompt)) {
170                Ok(password) => Ok(password.into_bytes()),
171                Err(e) => {
172                    let mut msg = format!("{:?}", e);
173                    // On macOS, attempting to open `/dev/tty` without a
174                    // controlling TTY can return ENXIO
175                    // ("Device not configured"). This commonly happens when
176                    // running under a debugger/IDE that doesn't allocate a real
177                    // terminal.
178                    if e.raw_os_error() == Some(6) {
179                        msg.push_str(" Hint: maybe no controlling TTY detected (macOS ENXIO: \"Device not configured\"). If you are running under VS Code debugger or with redirected stdio, set `CFX_POS_KEY_ENCRYPTION_PASSWORD` env var to avoid interactive prompting.");
180                    }
181                    Err(msg)
182                }
183            }
184        };
185
186        let default_passwd = if conf.is_test_or_dev_mode() {
187            Some(vec![])
188        } else {
189            conf.raw_conf
190                .dev_pos_private_key_encryption_password
191                .clone()
192                // If the password is not set in the config file, read it from
193                // the environment variable.
194                .or(std::env::var("CFX_POS_KEY_ENCRYPTION_PASSWORD").ok())
195                .map(|s| s.into_bytes())
196        };
197        if key_path.exists() {
198            let passwd = match default_passwd {
199                Some(p) => p,
200                None => read_pos_password(
201                    "PoS key detected, please input your encryption password.\nPassword:",
202                )?,
203            };
204            match load_pri_key(key_path, &passwd) {
205                Ok((sk, vrf_sk)) => {
206                    (ConfigKey::new(sk), ConfigKey::new(vrf_sk))
207                }
208                Err(e) => {
209                    bail!("Load pos_key failed: {}", e);
210                }
211            }
212        } else {
213            create_dir_all(key_path.parent().unwrap()).unwrap();
214            let passwd = match default_passwd {
215                Some(p) => p,
216                None => {
217                    let p = read_pos_password("PoS key is not detected and will be generated instead, please input your encryption password. This password is needed when you restart the node\nPassword:")?;
218                    let p2 = read_pos_password("Repeat Password:")?;
219                    if p != p2 {
220                        bail!("Passwords do not match!");
221                    }
222                    p
223                }
224            };
225            let mut rng = StdRng::from_rng(OsRng).unwrap();
226            let private_key = ConsensusPrivateKey::generate(&mut rng);
227            let vrf_private_key = ConsensusVRFPrivateKey::generate(&mut rng);
228            save_pri_key(key_path, &passwd, &(&private_key, &vrf_private_key))
229                .expect("error saving private key");
230            (ConfigKey::new(private_key), ConfigKey::new(vrf_private_key))
231        }
232    };
233
234    let worker_thread_pool = Arc::new(Mutex::new(ThreadPool::with_name(
235        "Tx Recover".into(),
236        WORKER_COMPUTATION_PARALLELISM,
237    )));
238
239    let network_config = conf.net_config()?;
240    let cache_config = conf.cache_config();
241
242    let (db_path, db_config) = conf.db_config();
243    let ledger_db = db::open_database(db_path.to_str().unwrap(), &db_config)
244        .map_err(|e| format!("Failed to open database {:?}", e))?;
245
246    let secret_store = Arc::new(SecretStore::new());
247    let storage_manager = Arc::new(
248        StorageManager::new(conf.storage_config(&node_type))
249            .expect("Failed to initialize storage."),
250    );
251    {
252        let storage_manager_log_weak_ptr = Arc::downgrade(&storage_manager);
253        let exit_clone = exit.clone();
254        thread::spawn(move || loop {
255            let mut exit_lock = exit_clone.0.lock();
256            if exit_clone
257                .1
258                .wait_for(&mut exit_lock, Duration::from_millis(5000))
259                .timed_out()
260            {
261                let manager = storage_manager_log_weak_ptr.upgrade();
262                match manager {
263                    None => return,
264                    Some(manager) => manager.log_usage(),
265                };
266            } else {
267                return;
268            }
269        });
270    }
271
272    let genesis_accounts = if conf.is_test_or_dev_mode() {
273        match (
274            &conf.raw_conf.genesis_secrets,
275            &conf.raw_conf.genesis_evm_secrets,
276        ) {
277            (Some(file), evm_file) => {
278                // Load core space accounts
279                let mut accounts = genesis::load_secrets_file(
280                    file,
281                    secret_store.as_ref(),
282                    Space::Native,
283                )?;
284
285                // Load EVM space accounts if specified
286                if let Some(evm_file) = evm_file {
287                    let evm_accounts = genesis::load_secrets_file(
288                        evm_file,
289                        secret_store.as_ref(),
290                        Space::Ethereum,
291                    )?;
292                    accounts.extend(evm_accounts);
293                }
294                accounts
295            }
296            (None, Some(evm_file)) => {
297                // Only load EVM space accounts
298                genesis::load_secrets_file(
299                    evm_file,
300                    secret_store.as_ref(),
301                    Space::Ethereum,
302                )?
303            }
304            (None, None) => genesis::default(conf.is_test_or_dev_mode()),
305        }
306    } else {
307        match conf.raw_conf.genesis_accounts {
308            Some(ref file) => genesis::load_file(file, |addr_str| {
309                parse_config_address_string(
310                    addr_str,
311                    network_config.get_network_type(),
312                )
313            })?,
314            None => genesis::default(conf.is_test_or_dev_mode()),
315        }
316    };
317
318    // Only try to setup PoW genesis block if pos is enabled from genesis.
319    let initial_nodes = if conf.raw_conf.pos_reference_enable_height == 0 {
320        Some(
321            read_initial_nodes_from_file(
322                conf.raw_conf.pos_initial_nodes_path.as_str(),
323            )
324            .expect("Genesis must have been initialized with pos"),
325        )
326    } else {
327        None
328    };
329
330    let consensus_conf = conf.consensus_config();
331    let vm = VmFactory::new(1024 * 32);
332    let machine = Arc::new(Machine::new_with_builtin(conf.common_params(), vm));
333
334    let genesis_block = genesis_block(
335        &storage_manager,
336        genesis_accounts.clone(),
337        GENESIS_ACCOUNT_ADDRESS,
338        U256::zero(),
339        machine.clone(),
340        conf.raw_conf.execute_genesis, /* need_to_execute */
341        conf.raw_conf.chain_id,
342        &initial_nodes,
343    );
344    storage_manager.notify_genesis_hash(genesis_block.hash());
345    let mut genesis_accounts = genesis_accounts;
346    let genesis_accounts = genesis_accounts
347        .drain()
348        .filter(|(addr, _)| addr.space == Space::Native)
349        .map(|(addr, x)| (addr.address, x))
350        .collect();
351    debug!("Initialize genesis_block={:?}", genesis_block);
352    if conf.raw_conf.pos_genesis_pivot_decision.is_none() {
353        conf.raw_conf.pos_genesis_pivot_decision = Some(genesis_block.hash());
354    }
355
356    let pow_config = conf.pow_config();
357    let pow = Arc::new(PowComputer::new(pow_config.use_octopus()));
358
359    let data_man = Arc::new(BlockDataManager::new(
360        cache_config,
361        Arc::new(genesis_block),
362        ledger_db.clone(),
363        storage_manager,
364        worker_thread_pool,
365        conf.data_mananger_config(),
366        pow.clone(),
367    ));
368
369    let network = {
370        let mut rng = StdRng::from_rng(OsRng).unwrap();
371        let private_key = ConsensusPrivateKey::generate(&mut rng);
372        let vrf_private_key = ConsensusVRFPrivateKey::generate(&mut rng);
373        let mut network = NetworkService::new(network_config.clone());
374        network
375            .initialize((
376                private_key.public_key(),
377                vrf_private_key.public_key(),
378            ))
379            .unwrap();
380        Arc::new(network)
381    };
382
383    let pos_verifier = Arc::new(PosVerifier::new(
384        Some(network.clone()),
385        PosConfiguration {
386            bls_key: self_pos_private_key,
387            vrf_key: self_vrf_private_key,
388            diem_conf_path: conf.raw_conf.pos_config_path.clone(),
389            protocol_conf: conf.protocol_config(),
390            pos_initial_nodes_path: conf
391                .raw_conf
392                .pos_initial_nodes_path
393                .clone(),
394            vrf_proposal_threshold: conf.raw_conf.vrf_proposal_threshold,
395            pos_state_config: conf.pos_state_config(),
396        },
397        conf.raw_conf.pos_reference_enable_height,
398    ));
399    let verification_config = conf.verification_config(machine.clone());
400    let txpool = Arc::new(TransactionPool::new(
401        conf.txpool_config(),
402        verification_config.clone(),
403        data_man.clone(),
404        machine.clone(),
405    ));
406
407    let statistics = Arc::new(Statistics::new());
408    let notifications = Notifications::init();
409    let pivot_hint = if let Some(conf) = &consensus_conf.pivot_hint_conf {
410        Some(Arc::new(PivotHint::new(conf)?))
411    } else {
412        None
413    };
414
415    let consensus = Arc::new(ConsensusGraph::new(
416        consensus_conf,
417        txpool.clone(),
418        statistics.clone(),
419        data_man.clone(),
420        pow_config.clone(),
421        pow.clone(),
422        notifications.clone(),
423        conf.execution_config(),
424        verification_config.clone(),
425        node_type,
426        pos_verifier.clone(),
427        pivot_hint,
428        conf.common_params(),
429    ));
430
431    for terminal in data_man
432        .terminals_from_db()
433        .unwrap_or(vec![data_man.get_cur_consensus_era_genesis_hash()])
434    {
435        if data_man.block_height_by_hash(&terminal).unwrap()
436            >= conf.raw_conf.pos_reference_enable_height
437        {
438            pos_verifier.initialize(consensus.clone())?;
439            break;
440        }
441    }
442
443    let sync_config = conf.sync_graph_config();
444
445    let sync_graph = Arc::new(SynchronizationGraph::new(
446        consensus.clone(),
447        data_man.clone(),
448        statistics.clone(),
449        verification_config,
450        pow_config,
451        pow.clone(),
452        sync_config,
453        notifications.clone(),
454        machine.clone(),
455        pos_verifier.clone(),
456    ));
457    let refresh_time =
458        Duration::from_millis(conf.raw_conf.account_provider_refresh_time_ms);
459
460    let accounts = Arc::new(
461        account_provider(
462            Some(keys_path()),
463            None, /* sstore_iterations */
464            Some(refresh_time),
465        )
466        .expect("failed to initialize account provider"),
467    );
468
469    let common_impl = Arc::new(CommonRpcImpl::new(
470        exit,
471        consensus.clone(),
472        network.clone(),
473        txpool.clone(),
474        accounts.clone(),
475        pos_verifier.clone(),
476    ));
477    let tokio_runtime =
478        Arc::new(TokioRuntime::new().map_err(|e| e.to_string())?);
479
480    let pubsub = PubSubClient::new(
481        tokio_runtime.clone(),
482        consensus.clone(),
483        notifications.clone(),
484        *network.get_network_type(),
485    );
486
487    Ok((
488        machine,
489        secret_store,
490        genesis_accounts,
491        data_man,
492        pow,
493        pos_verifier,
494        txpool,
495        consensus,
496        sync_graph,
497        network,
498        common_impl,
499        accounts,
500        notifications,
501        pubsub,
502        tokio_runtime,
503    ))
504}
505
506pub fn initialize_not_light_node_modules(
507    conf: &mut Configuration, exit: Arc<(Mutex<bool>, Condvar)>,
508    node_type: NodeType,
509) -> Result<
510    (
511        Arc<BlockDataManager>,
512        Arc<PowComputer>,
513        Arc<TransactionPool>,
514        Arc<ConsensusGraph>,
515        Arc<SynchronizationService>,
516        Arc<BlockGenerator>,
517        Option<HttpServer>,
518        Option<HttpServer>,
519        Option<TcpServer>,
520        Option<TcpServer>,
521        Option<WSServer>,
522        Option<WSServer>,
523        Arc<PosVerifier>,
524        Arc<TokioRuntime>,
525        Option<RpcServerHandle>,
526        TaskManager,
527    ),
528    String,
529> {
530    let (
531        _machine,
532        secret_store,
533        genesis_accounts,
534        data_man,
535        pow,
536        pos_verifier,
537        txpool,
538        consensus,
539        sync_graph,
540        network,
541        common_impl,
542        accounts,
543        notifications,
544        pubsub,
545        tokio_runtime,
546    ) = initialize_common_modules(conf, exit.clone(), node_type)?;
547
548    let light_provider = Arc::new(LightProvider::new(
549        consensus.clone(),
550        sync_graph.clone(),
551        Arc::downgrade(&network),
552        txpool.clone(),
553        conf.raw_conf.throttling_conf.clone(),
554        node_type,
555    ));
556    light_provider.register(network.clone()).unwrap();
557
558    let sync = Arc::new(SynchronizationService::new(
559        node_type,
560        network.clone(),
561        sync_graph.clone(),
562        conf.protocol_config(),
563        conf.state_sync_config(),
564        SyncPhaseType::CatchUpRecoverBlockHeaderFromDB,
565        light_provider,
566        consensus.clone(),
567    ));
568    sync.register().unwrap();
569
570    if let Some(print_memory_usage_period_s) =
571        conf.raw_conf.print_memory_usage_period_s
572    {
573        let secret_store = secret_store.clone();
574        let data_man = data_man.clone();
575        let txpool = txpool.clone();
576        let consensus = consensus.clone();
577        let sync = sync.clone();
578        thread::Builder::new().name("MallocSizeOf".into()).spawn(
579            move || loop {
580                let start = Instant::now();
581                let mb = 1_000_000;
582                let mut ops = new_malloc_size_ops();
583                let secret_store_size = secret_store.size_of(&mut ops) / mb;
584                // Note `db_manager` is not wrapped in Arc, so it will still be included
585                // in `data_man_size`.
586                let data_manager_db_cache_size = data_man.db_manager.size_of(&mut ops) / mb;
587                let storage_manager_size = data_man.storage_manager.size_of(&mut ops) / mb;
588                let data_man_size = data_man.size_of(&mut ops) / mb;
589                let tx_pool_size = txpool.size_of(&mut ops) / mb;
590                let consensus_graph_size = consensus.size_of(&mut ops) / mb;
591                let sync_graph_size =
592                    sync.get_synchronization_graph().size_of(&mut ops) / mb;
593                let sync_service_size = sync.size_of(&mut ops) / mb;
594                info!(
595                    "Malloc Size(MB): secret_store={} data_manager_db_cache_size={} \
596                    storage_manager_size={} data_man={} txpool={} consensus={} sync_graph={}\
597                    sync_service={}, \
598                    time elapsed={:?}",
599                    secret_store_size,data_manager_db_cache_size,storage_manager_size,
600                    data_man_size, tx_pool_size, consensus_graph_size, sync_graph_size,
601                    sync_service_size, start.elapsed(),
602                );
603                thread::sleep(Duration::from_secs(
604                    print_memory_usage_period_s,
605                ));
606            },
607        ).expect("Memory usage thread start fails");
608    }
609
610    let (maybe_txgen, maybe_direct_txgen) = initialize_txgens(
611        consensus.clone(),
612        txpool.clone(),
613        sync.clone(),
614        secret_store.clone(),
615        genesis_accounts,
616        &conf,
617        network.net_key_pair().unwrap(),
618    );
619
620    let maybe_author: Option<Address> =
621        conf.raw_conf.mining_author.as_ref().map(|addr_str| {
622            parse_config_address_string(addr_str, network.get_network_type())
623                .unwrap_or_else(|err| {
624                    panic!("Error parsing mining-author {}", err)
625                })
626        });
627    let pow_config = conf.pow_config();
628    let blockgen = Arc::new(BlockGenerator::new(
629        sync_graph,
630        txpool.clone(),
631        sync.clone(),
632        maybe_txgen.clone(),
633        pow_config.clone(),
634        pow.clone(),
635        maybe_author.clone().unwrap_or_default(),
636        pos_verifier.clone(),
637    ));
638    if conf.is_dev_mode() {
639        // If `dev_block_interval_ms` is None, blocks are generated after
640        // receiving RPC `cfx_sendRawTransaction`.
641        if let Some(interval_ms) = conf.raw_conf.dev_block_interval_ms {
642            // Automatic block generation with fixed interval.
643            let bg = blockgen.test_api();
644            info!("Start auto block generation");
645            thread::Builder::new()
646                .name("auto_mining".into())
647                .spawn(move || {
648                    bg.auto_block_generation(interval_ms);
649                })
650                .expect("Mining thread spawn error");
651        }
652    } else if let Some(author) = maybe_author {
653        if !author.is_genesis_valid_address() || author.is_builtin_address() {
654            panic!("mining-author must be user address or contract address, otherwise you will not get mining rewards!!!");
655        }
656        if pow_config.enable_mining() {
657            let bg = blockgen.clone();
658            thread::Builder::new()
659                .name("mining".into())
660                .spawn(move || {
661                    bg.mine();
662                })
663                .expect("Mining thread spawn error");
664        }
665    }
666
667    let rpc_impl = Arc::new(RpcImpl::new(
668        consensus.clone(),
669        sync.clone(),
670        blockgen.test_api(),
671        txpool.clone(),
672        maybe_txgen.clone(),
673        maybe_direct_txgen,
674        conf.rpc_impl_config(),
675        accounts,
676    ));
677
678    let debug_rpc_http_server = super::rpc::start_http(
679        conf.local_http_config(),
680        setup_debug_rpc_apis(
681            common_impl.clone(),
682            rpc_impl.clone(),
683            pubsub.clone(),
684            &conf,
685        ),
686    )?;
687
688    let debug_rpc_tcp_server = super::rpc::start_tcp(
689        conf.local_tcp_config(),
690        setup_debug_rpc_apis(
691            common_impl.clone(),
692            rpc_impl.clone(),
693            pubsub.clone(),
694            &conf,
695        ),
696        RpcExtractor,
697    )?;
698
699    let rpc_tcp_server = super::rpc::start_tcp(
700        conf.tcp_config(),
701        setup_public_rpc_apis(
702            common_impl.clone(),
703            rpc_impl.clone(),
704            pubsub.clone(),
705            &conf,
706        ),
707        RpcExtractor,
708    )?;
709
710    let debug_rpc_ws_server = super::rpc::start_ws(
711        conf.local_ws_config(),
712        setup_public_rpc_apis(
713            common_impl.clone(),
714            rpc_impl.clone(),
715            pubsub.clone(),
716            &conf,
717        ),
718        RpcExtractor,
719    )?;
720
721    let rpc_ws_server = super::rpc::start_ws(
722        conf.ws_config(),
723        setup_public_rpc_apis(
724            common_impl.clone(),
725            rpc_impl.clone(),
726            pubsub.clone(),
727            &conf,
728        ),
729        RpcExtractor,
730    )?;
731
732    let rpc_http_server = super::rpc::start_http(
733        conf.http_config(),
734        setup_public_rpc_apis(common_impl, rpc_impl, pubsub, &conf),
735    )?;
736
737    let task_manager = TaskManager::new(tokio_runtime.handle().clone());
738    let task_executor = task_manager.executor();
739
740    let eth_rpc_server_handle =
741        tokio_runtime.block_on(launch_async_rpc_servers(
742            consensus.clone(),
743            sync.clone(),
744            txpool.clone(),
745            notifications.clone(),
746            task_executor.clone(),
747            conf,
748        ))?;
749
750    // start pprf server, which is used to serve the pprof data for heap
751    // profiling
752    #[cfg(all(unix, feature = "jemalloc-prof"))]
753    if let Some(pprf_addr) = conf.raw_conf.profiling_listen_addr.as_ref() {
754        let pprf_addr = pprf_addr.clone();
755        let _pprf_server_handle = tokio_runtime.spawn(async move {
756            if let Err(e) = start_pprf_server(&pprf_addr).await {
757                eprintln!("Error starting pprof server: {}", e);
758            }
759        });
760    }
761
762    metrics::initialize(conf.metrics_config(), task_executor.clone());
763
764    network.start();
765
766    Ok((
767        data_man,
768        pow,
769        txpool,
770        consensus,
771        sync,
772        blockgen,
773        debug_rpc_http_server,
774        rpc_http_server,
775        debug_rpc_tcp_server,
776        rpc_tcp_server,
777        debug_rpc_ws_server,
778        rpc_ws_server,
779        pos_verifier,
780        tokio_runtime,
781        eth_rpc_server_handle,
782        task_manager,
783    ))
784}
785
786pub fn initialize_txgens(
787    consensus: Arc<ConsensusGraph>, txpool: Arc<TransactionPool>,
788    sync: Arc<SynchronizationService>, secret_store: SharedSecretStore,
789    genesis_accounts: HashMap<Address, U256>, conf: &Configuration,
790    network_key_pair: KeyPair,
791) -> (
792    Option<Arc<TransactionGenerator>>,
793    Option<Arc<Mutex<DirectTransactionGenerator>>>,
794) {
795    // This tx generator directly push simple transactions and erc20
796    // transactions into blocks.
797    let maybe_direct_txgen_with_contract = if conf.is_test_or_dev_mode() {
798        Some(Arc::new(Mutex::new(DirectTransactionGenerator::new(
799            network_key_pair,
800            &public_to_address(DEV_GENESIS_KEY_PAIR_2.public(), true),
801            U256::from_dec_str("10000000000000000").unwrap(),
802            U256::from_dec_str("10000000000000000").unwrap(),
803        ))))
804    } else {
805        None
806    };
807
808    // This tx generator generates transactions from preconfigured multiple
809    // genesis accounts and it pushes transactions into transaction pool.
810    let maybe_multi_genesis_txgen = if let Some(txgen_conf) =
811        conf.tx_gen_config()
812    {
813        let multi_genesis_txgen = Arc::new(TransactionGenerator::new(
814            consensus.clone(),
815            txpool.clone(),
816            sync.clone(),
817            secret_store.clone(),
818        ));
819        if txgen_conf.generate_tx {
820            let txgen_clone = multi_genesis_txgen.clone();
821            let join_handle =
822                thread::Builder::new()
823                    .name("txgen".into())
824                    .spawn(move || {
825                        TransactionGenerator::generate_transactions_with_multiple_genesis_accounts(
826                            txgen_clone,
827                            txgen_conf,
828                            genesis_accounts,
829                        );
830                    })
831                    .expect("should succeed");
832            multi_genesis_txgen.set_join_handle(join_handle);
833        }
834        Some(multi_genesis_txgen)
835    } else {
836        None
837    };
838
839    (maybe_multi_genesis_txgen, maybe_direct_txgen_with_contract)
840}