client/common/
mod.rs

1// Copyright 2020 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use log::{debug, info};
6use std::{
7    collections::HashMap,
8    fs::create_dir_all,
9    path::Path,
10    sync::{Arc, Weak},
11    thread,
12    time::{Duration, Instant},
13};
14
15use cfx_rpc_builder::RpcServerHandle;
16use cfx_util_macros::bail;
17use jsonrpc_http_server::Server as HttpServer;
18use jsonrpc_tcp_server::Server as TcpServer;
19use jsonrpc_ws_server::Server as WSServer;
20use parking_lot::{Condvar, Mutex};
21use rand_08::{prelude::StdRng, rngs::OsRng, SeedableRng};
22use threadpool::ThreadPool;
23
24use blockgen::BlockGenerator;
25use cfx_executor::machine::{Machine, VmFactory};
26use cfx_parameters::genesis::{
27    DEV_GENESIS_KEY_PAIR_2, GENESIS_ACCOUNT_ADDRESS,
28};
29use cfx_rpc_cfx_types::apis::ApiSet;
30use cfx_storage::StorageManager;
31use cfx_tasks::TaskManager;
32use cfx_types::{address_util::AddressUtil, Address, Space, U256};
33pub use cfxcore::pos::pos::PosDropHandle;
34use cfxcore::{
35    block_data_manager::BlockDataManager,
36    consensus::{
37        pivot_hint::PivotHint,
38        pos_handler::{PosConfiguration, PosVerifier},
39    },
40    genesis_block::{self as genesis, genesis_block},
41    pow::PowComputer,
42    statistics::Statistics,
43    sync::SyncPhaseType,
44    ConsensusGraph, LightProvider, NodeType, Notifications, Stopable,
45    SynchronizationGraph, SynchronizationService, TransactionPool,
46    WORKER_COMPUTATION_PARALLELISM,
47};
48use cfxcore_accounts::AccountProvider;
49use cfxkey::public_to_address;
50use diem_config::keys::ConfigKey;
51use diem_crypto::{
52    key_file::{load_pri_key, save_pri_key},
53    PrivateKey, Uniform,
54};
55use diem_types::validator_config::{
56    ConsensusPrivateKey, ConsensusVRFPrivateKey,
57};
58use malloc_size_of::{new_malloc_size_ops, MallocSizeOf, MallocSizeOfOps};
59use network::NetworkService;
60use secret_store::{SecretStore, SharedSecretStore};
61use tokio::runtime::Runtime as TokioRuntime;
62use txgen::{DirectTransactionGenerator, TransactionGenerator};
63
64use cfx_config::{parse_config_address_string, Configuration};
65
66use crate::{
67    accounts::{account_provider, keys_path},
68    keylib::KeyPair,
69    rpc::{
70        extractor::RpcExtractor,
71        impls::{
72            cfx::RpcImpl, common::RpcImpl as CommonRpcImpl,
73            pubsub::PubSubClient,
74        },
75        launch_async_rpc_servers, launch_cfx_async_rpc_servers,
76        setup_debug_rpc_apis, setup_public_rpc_apis,
77    },
78};
79#[cfg(all(unix, feature = "jemalloc-prof"))]
80use cfx_mallocator_utils::start_pprf_server;
81use cfxcore::consensus::pos_handler::read_initial_nodes_from_file;
82
83pub mod delegate_convert;
84pub mod panic_handler;
85pub mod shutdown_handler;
86
87/// Hold all top-level components for a type of client.
88/// This struct implement ClientShutdownTrait.
89pub struct ClientComponents<BlockGenT, Rest> {
90    pub data_manager_weak_ptr: Weak<BlockDataManager>,
91    pub blockgen: Option<Arc<BlockGenT>>,
92    pub pos_handler: Option<Arc<PosVerifier>>,
93    pub other_components: Rest,
94}
95
96impl<BlockGenT, Rest: MallocSizeOf> MallocSizeOf
97    for ClientComponents<BlockGenT, Rest>
98{
99    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
100        if let Some(data_man) = self.data_manager_weak_ptr.upgrade() {
101            let data_manager_size = data_man.size_of(ops);
102            data_manager_size + self.other_components.size_of(ops)
103        } else {
104            // If data_man is `None`, we will be just shutting down (dropping
105            // components) so we don't care about the size
106            0
107        }
108    }
109}
110
111impl<BlockGenT: 'static + Stopable, Rest> ClientTrait
112    for ClientComponents<BlockGenT, Rest>
113{
114    fn take_out_components_for_shutdown(
115        &self,
116    ) -> (
117        Weak<BlockDataManager>,
118        Option<Arc<PosVerifier>>,
119        Option<Arc<dyn Stopable>>,
120    ) {
121        debug!("take_out_components_for_shutdown");
122        let data_manager_weak_ptr = self.data_manager_weak_ptr.clone();
123        let blockgen: Option<Arc<dyn Stopable>> = match self.blockgen.clone() {
124            Some(blockgen) => Some(blockgen),
125            None => None,
126        };
127
128        (data_manager_weak_ptr, self.pos_handler.clone(), blockgen)
129    }
130}
131
132pub trait ClientTrait {
133    fn take_out_components_for_shutdown(
134        &self,
135    ) -> (
136        Weak<BlockDataManager>,
137        Option<Arc<PosVerifier>>,
138        Option<Arc<dyn Stopable>>,
139    );
140}
141
142pub fn initialize_common_modules(
143    conf: &mut Configuration, exit: Arc<(Mutex<bool>, Condvar)>,
144    node_type: NodeType,
145) -> Result<
146    (
147        Arc<Machine>,
148        Arc<SecretStore>,
149        HashMap<Address, U256>,
150        Arc<BlockDataManager>,
151        Arc<PowComputer>,
152        Arc<PosVerifier>,
153        Arc<TransactionPool>,
154        Arc<ConsensusGraph>,
155        Arc<SynchronizationGraph>,
156        Arc<NetworkService>,
157        Arc<CommonRpcImpl>,
158        Arc<AccountProvider>,
159        Arc<Notifications>,
160        PubSubClient,
161        Arc<TokioRuntime>,
162    ),
163    String,
164> {
165    info!("Working directory: {:?}", std::env::current_dir());
166
167    // TODO(lpl): Keep it properly and allow not running pos.
168    let (self_pos_private_key, self_vrf_private_key) = {
169        let key_path = Path::new(&conf.raw_conf.pos_private_key_path);
170
171        let read_pos_password = |prompt: &str| -> Result<Vec<u8>, String> {
172            match rpassword::read_password_from_tty(Some(prompt)) {
173                Ok(password) => Ok(password.into_bytes()),
174                Err(e) => {
175                    let mut msg = format!("{:?}", e);
176                    // On macOS, attempting to open `/dev/tty` without a
177                    // controlling TTY can return ENXIO
178                    // ("Device not configured"). This commonly happens when
179                    // running under a debugger/IDE that doesn't allocate a real
180                    // terminal.
181                    if e.raw_os_error() == Some(6) {
182                        msg.push_str(" Hint: maybe no controlling TTY detected (macOS ENXIO: \"Device not configured\"). If you are running under VS Code debugger or with redirected stdio, set `CFX_POS_KEY_ENCRYPTION_PASSWORD` env var to avoid interactive prompting.");
183                    }
184                    Err(msg)
185                }
186            }
187        };
188
189        let default_passwd = if conf.is_test_or_dev_mode() {
190            Some(vec![])
191        } else {
192            conf.raw_conf
193                .dev_pos_private_key_encryption_password
194                .clone()
195                // If the password is not set in the config file, read it from
196                // the environment variable.
197                .or(std::env::var("CFX_POS_KEY_ENCRYPTION_PASSWORD").ok())
198                .map(|s| s.into_bytes())
199        };
200        if key_path.exists() {
201            let passwd = match default_passwd {
202                Some(p) => p,
203                None => read_pos_password(
204                    "PoS key detected, please input your encryption password.\nPassword:",
205                )?,
206            };
207            match load_pri_key(key_path, &passwd) {
208                Ok((sk, vrf_sk)) => {
209                    (ConfigKey::new(sk), ConfigKey::new(vrf_sk))
210                }
211                Err(e) => {
212                    bail!("Load pos_key failed: {}", e);
213                }
214            }
215        } else {
216            create_dir_all(key_path.parent().unwrap()).unwrap();
217            let passwd = match default_passwd {
218                Some(p) => p,
219                None => {
220                    let p = read_pos_password("PoS key is not detected and will be generated instead, please input your encryption password. This password is needed when you restart the node\nPassword:")?;
221                    let p2 = read_pos_password("Repeat Password:")?;
222                    if p != p2 {
223                        bail!("Passwords do not match!");
224                    }
225                    p
226                }
227            };
228            let mut rng = StdRng::from_rng(OsRng).unwrap();
229            let private_key = ConsensusPrivateKey::generate(&mut rng);
230            let vrf_private_key = ConsensusVRFPrivateKey::generate(&mut rng);
231            save_pri_key(key_path, &passwd, &(&private_key, &vrf_private_key))
232                .expect("error saving private key");
233            (ConfigKey::new(private_key), ConfigKey::new(vrf_private_key))
234        }
235    };
236
237    let worker_thread_pool = Arc::new(Mutex::new(ThreadPool::with_name(
238        "Tx Recover".into(),
239        WORKER_COMPUTATION_PARALLELISM,
240    )));
241
242    let network_config = conf.net_config()?;
243    let cache_config = conf.cache_config();
244
245    let (db_path, db_config) = conf.db_config();
246    let ledger_db = db::open_database(db_path.to_str().unwrap(), &db_config)
247        .map_err(|e| format!("Failed to open database {:?}", e))?;
248
249    let secret_store = Arc::new(SecretStore::new());
250    let storage_manager = Arc::new(
251        StorageManager::new(conf.storage_config(&node_type))
252            .expect("Failed to initialize storage."),
253    );
254    {
255        let storage_manager_log_weak_ptr = Arc::downgrade(&storage_manager);
256        let exit_clone = exit.clone();
257        thread::spawn(move || loop {
258            let mut exit_lock = exit_clone.0.lock();
259            if exit_clone
260                .1
261                .wait_for(&mut exit_lock, Duration::from_millis(5000))
262                .timed_out()
263            {
264                let manager = storage_manager_log_weak_ptr.upgrade();
265                match manager {
266                    None => return,
267                    Some(manager) => manager.log_usage(),
268                };
269            } else {
270                return;
271            }
272        });
273    }
274
275    let genesis_accounts = if conf.is_test_or_dev_mode() {
276        match (
277            &conf.raw_conf.genesis_secrets,
278            &conf.raw_conf.genesis_evm_secrets,
279        ) {
280            (Some(file), evm_file) => {
281                // Load core space accounts
282                let mut accounts = genesis::load_secrets_file(
283                    file,
284                    secret_store.as_ref(),
285                    Space::Native,
286                )?;
287
288                // Load EVM space accounts if specified
289                if let Some(evm_file) = evm_file {
290                    let evm_accounts = genesis::load_secrets_file(
291                        evm_file,
292                        secret_store.as_ref(),
293                        Space::Ethereum,
294                    )?;
295                    accounts.extend(evm_accounts);
296                }
297                accounts
298            }
299            (None, Some(evm_file)) => {
300                // Only load EVM space accounts
301                genesis::load_secrets_file(
302                    evm_file,
303                    secret_store.as_ref(),
304                    Space::Ethereum,
305                )?
306            }
307            (None, None) => genesis::default(conf.is_test_or_dev_mode()),
308        }
309    } else {
310        match conf.raw_conf.genesis_accounts {
311            Some(ref file) => genesis::load_file(file, |addr_str| {
312                parse_config_address_string(
313                    addr_str,
314                    network_config.get_network_type(),
315                )
316            })?,
317            None => genesis::default(conf.is_test_or_dev_mode()),
318        }
319    };
320
321    // Only try to setup PoW genesis block if pos is enabled from genesis.
322    let initial_nodes = if conf.raw_conf.pos_reference_enable_height == 0 {
323        Some(
324            read_initial_nodes_from_file(
325                conf.raw_conf.pos_initial_nodes_path.as_str(),
326            )
327            .expect("Genesis must have been initialized with pos"),
328        )
329    } else {
330        None
331    };
332
333    let consensus_conf = conf.consensus_config();
334    let vm = VmFactory::new(1024 * 32);
335    let machine = Arc::new(Machine::new_with_builtin(conf.common_params(), vm));
336
337    let genesis_block = genesis_block(
338        &storage_manager,
339        genesis_accounts.clone(),
340        GENESIS_ACCOUNT_ADDRESS,
341        U256::zero(),
342        machine.clone(),
343        conf.raw_conf.execute_genesis, /* need_to_execute */
344        conf.raw_conf.chain_id,
345        &initial_nodes,
346    );
347    storage_manager.notify_genesis_hash(genesis_block.hash());
348    let mut genesis_accounts = genesis_accounts;
349    let genesis_accounts = genesis_accounts
350        .drain()
351        .filter(|(addr, _)| addr.space == Space::Native)
352        .map(|(addr, x)| (addr.address, x))
353        .collect();
354    debug!("Initialize genesis_block={:?}", genesis_block);
355    if conf.raw_conf.pos_genesis_pivot_decision.is_none() {
356        conf.raw_conf.pos_genesis_pivot_decision = Some(genesis_block.hash());
357    }
358
359    let pow_config = conf.pow_config();
360    let pow = Arc::new(PowComputer::new(pow_config.use_octopus()));
361
362    let data_man = Arc::new(BlockDataManager::new(
363        cache_config,
364        Arc::new(genesis_block),
365        ledger_db.clone(),
366        storage_manager,
367        worker_thread_pool,
368        conf.data_mananger_config(),
369        pow.clone(),
370    ));
371
372    let network = {
373        let mut rng = StdRng::from_rng(OsRng).unwrap();
374        let private_key = ConsensusPrivateKey::generate(&mut rng);
375        let vrf_private_key = ConsensusVRFPrivateKey::generate(&mut rng);
376        let mut network = NetworkService::new(network_config.clone());
377        network
378            .initialize((
379                private_key.public_key(),
380                vrf_private_key.public_key(),
381            ))
382            .unwrap();
383        Arc::new(network)
384    };
385
386    let pos_verifier = Arc::new(PosVerifier::new(
387        Some(network.clone()),
388        PosConfiguration {
389            bls_key: self_pos_private_key,
390            vrf_key: self_vrf_private_key,
391            diem_conf_path: conf.raw_conf.pos_config_path.clone(),
392            protocol_conf: conf.protocol_config(),
393            pos_initial_nodes_path: conf
394                .raw_conf
395                .pos_initial_nodes_path
396                .clone(),
397            vrf_proposal_threshold: conf.raw_conf.vrf_proposal_threshold,
398            pos_state_config: conf.pos_state_config(),
399        },
400        conf.raw_conf.pos_reference_enable_height,
401    ));
402    let verification_config = conf.verification_config(machine.clone());
403    let txpool = Arc::new(TransactionPool::new(
404        conf.txpool_config(),
405        verification_config.clone(),
406        data_man.clone(),
407        machine.clone(),
408    ));
409
410    let statistics = Arc::new(Statistics::new());
411    let notifications = Notifications::init();
412    let pivot_hint = if let Some(conf) = &consensus_conf.pivot_hint_conf {
413        Some(Arc::new(PivotHint::new(conf)?))
414    } else {
415        None
416    };
417
418    let consensus = Arc::new(ConsensusGraph::new(
419        consensus_conf,
420        txpool.clone(),
421        statistics.clone(),
422        data_man.clone(),
423        pow_config.clone(),
424        pow.clone(),
425        notifications.clone(),
426        conf.execution_config(),
427        verification_config.clone(),
428        node_type,
429        pos_verifier.clone(),
430        pivot_hint,
431        conf.common_params(),
432    ));
433
434    for terminal in data_man
435        .terminals_from_db()
436        .unwrap_or(vec![data_man.get_cur_consensus_era_genesis_hash()])
437    {
438        if data_man.block_height_by_hash(&terminal).unwrap()
439            >= conf.raw_conf.pos_reference_enable_height
440        {
441            pos_verifier.initialize(consensus.clone())?;
442            break;
443        }
444    }
445
446    let sync_config = conf.sync_graph_config();
447
448    let sync_graph = Arc::new(SynchronizationGraph::new(
449        consensus.clone(),
450        data_man.clone(),
451        statistics.clone(),
452        verification_config,
453        pow_config,
454        pow.clone(),
455        sync_config,
456        notifications.clone(),
457        machine.clone(),
458        pos_verifier.clone(),
459    ));
460    let refresh_time =
461        Duration::from_millis(conf.raw_conf.account_provider_refresh_time_ms);
462
463    let accounts = Arc::new(
464        account_provider(
465            Some(keys_path()),
466            None, /* sstore_iterations */
467            Some(refresh_time),
468        )
469        .expect("failed to initialize account provider"),
470    );
471
472    let common_impl = Arc::new(CommonRpcImpl::new(
473        exit,
474        consensus.clone(),
475        network.clone(),
476        txpool.clone(),
477        accounts.clone(),
478        pos_verifier.clone(),
479    ));
480    let tokio_runtime =
481        Arc::new(TokioRuntime::new().map_err(|e| e.to_string())?);
482
483    let pubsub = PubSubClient::new(
484        tokio_runtime.clone(),
485        consensus.clone(),
486        notifications.clone(),
487        *network.get_network_type(),
488    );
489
490    Ok((
491        machine,
492        secret_store,
493        genesis_accounts,
494        data_man,
495        pow,
496        pos_verifier,
497        txpool,
498        consensus,
499        sync_graph,
500        network,
501        common_impl,
502        accounts,
503        notifications,
504        pubsub,
505        tokio_runtime,
506    ))
507}
508
509pub fn initialize_not_light_node_modules(
510    conf: &mut Configuration, exit: Arc<(Mutex<bool>, Condvar)>,
511    node_type: NodeType,
512) -> Result<
513    (
514        Arc<BlockDataManager>,
515        Arc<PowComputer>,
516        Arc<TransactionPool>,
517        Arc<ConsensusGraph>,
518        Arc<SynchronizationService>,
519        Arc<BlockGenerator>,
520        Option<HttpServer>,
521        Option<HttpServer>,
522        Option<TcpServer>,
523        Option<TcpServer>,
524        Option<WSServer>,
525        Option<WSServer>,
526        Arc<PosVerifier>,
527        Arc<TokioRuntime>,
528        Option<RpcServerHandle>,
529        Option<RpcServerHandle>,
530        Option<RpcServerHandle>,
531        TaskManager,
532    ),
533    String,
534> {
535    let (
536        _machine,
537        secret_store,
538        genesis_accounts,
539        data_man,
540        pow,
541        pos_verifier,
542        txpool,
543        consensus,
544        sync_graph,
545        network,
546        common_impl,
547        accounts,
548        notifications,
549        pubsub,
550        tokio_runtime,
551    ) = initialize_common_modules(conf, exit.clone(), node_type)?;
552
553    let light_provider = Arc::new(LightProvider::new(
554        consensus.clone(),
555        sync_graph.clone(),
556        Arc::downgrade(&network),
557        txpool.clone(),
558        conf.raw_conf.throttling_conf.clone(),
559        node_type,
560    ));
561    light_provider.register(network.clone()).unwrap();
562
563    let sync = Arc::new(SynchronizationService::new(
564        node_type,
565        network.clone(),
566        sync_graph.clone(),
567        conf.protocol_config(),
568        conf.state_sync_config(),
569        SyncPhaseType::CatchUpRecoverBlockHeaderFromDB,
570        light_provider,
571        consensus.clone(),
572    ));
573    sync.register().unwrap();
574
575    if let Some(print_memory_usage_period_s) =
576        conf.raw_conf.print_memory_usage_period_s
577    {
578        let secret_store = secret_store.clone();
579        let data_man = data_man.clone();
580        let txpool = txpool.clone();
581        let consensus = consensus.clone();
582        let sync = sync.clone();
583        thread::Builder::new().name("MallocSizeOf".into()).spawn(
584            move || loop {
585                let start = Instant::now();
586                let mb = 1_000_000;
587                let mut ops = new_malloc_size_ops();
588                let secret_store_size = secret_store.size_of(&mut ops) / mb;
589                // Note `db_manager` is not wrapped in Arc, so it will still be included
590                // in `data_man_size`.
591                let data_manager_db_cache_size = data_man.db_manager.size_of(&mut ops) / mb;
592                let storage_manager_size = data_man.storage_manager.size_of(&mut ops) / mb;
593                let data_man_size = data_man.size_of(&mut ops) / mb;
594                let tx_pool_size = txpool.size_of(&mut ops) / mb;
595                let consensus_graph_size = consensus.size_of(&mut ops) / mb;
596                let sync_graph_size =
597                    sync.get_synchronization_graph().size_of(&mut ops) / mb;
598                let sync_service_size = sync.size_of(&mut ops) / mb;
599                info!(
600                    "Malloc Size(MB): secret_store={} data_manager_db_cache_size={} \
601                    storage_manager_size={} data_man={} txpool={} consensus={} sync_graph={}\
602                    sync_service={}, \
603                    time elapsed={:?}",
604                    secret_store_size,data_manager_db_cache_size,storage_manager_size,
605                    data_man_size, tx_pool_size, consensus_graph_size, sync_graph_size,
606                    sync_service_size, start.elapsed(),
607                );
608                thread::sleep(Duration::from_secs(
609                    print_memory_usage_period_s,
610                ));
611            },
612        ).expect("Memory usage thread start fails");
613    }
614
615    let (maybe_txgen, maybe_direct_txgen) = initialize_txgens(
616        consensus.clone(),
617        txpool.clone(),
618        sync.clone(),
619        secret_store.clone(),
620        genesis_accounts,
621        &conf,
622        network.net_key_pair().unwrap(),
623    );
624
625    let maybe_author: Option<Address> =
626        conf.raw_conf.mining_author.as_ref().map(|addr_str| {
627            parse_config_address_string(addr_str, network.get_network_type())
628                .unwrap_or_else(|err| {
629                    panic!("Error parsing mining-author {}", err)
630                })
631        });
632    let pow_config = conf.pow_config();
633    let blockgen = Arc::new(BlockGenerator::new(
634        sync_graph,
635        txpool.clone(),
636        sync.clone(),
637        maybe_txgen.clone(),
638        pow_config.clone(),
639        pow.clone(),
640        maybe_author.clone().unwrap_or_default(),
641        pos_verifier.clone(),
642    ));
643    if conf.is_dev_mode() {
644        // If `dev_block_interval_ms` is None, blocks are generated after
645        // receiving RPC `cfx_sendRawTransaction`.
646        if let Some(interval_ms) = conf.raw_conf.dev_block_interval_ms {
647            // Automatic block generation with fixed interval.
648            let bg = blockgen.test_api();
649            info!("Start auto block generation");
650            thread::Builder::new()
651                .name("auto_mining".into())
652                .spawn(move || {
653                    bg.auto_block_generation(interval_ms);
654                })
655                .expect("Mining thread spawn error");
656        }
657    } else if let Some(author) = maybe_author {
658        if !author.is_genesis_valid_address() || author.is_builtin_address() {
659            panic!("mining-author must be user address or contract address, otherwise you will not get mining rewards!!!");
660        }
661        if pow_config.enable_mining() {
662            let bg = blockgen.clone();
663            tokio_runtime.spawn(async move {
664                bg.mine().await;
665            });
666        }
667    }
668
669    let use_old_core_rpc = conf.raw_conf.core_space_rpc_use_old_impl;
670
671    let rpc_impl = Arc::new(RpcImpl::new(
672        consensus.clone(),
673        sync.clone(),
674        blockgen.test_api(),
675        txpool.clone(),
676        maybe_txgen.clone(),
677        maybe_direct_txgen.clone(),
678        conf.rpc_impl_config(),
679        accounts.clone(),
680    ));
681
682    // When using old impl, start V1 core space RPC servers (jsonrpc-core
683    // based). When using new impl, these are skipped and V2 async servers
684    // are started instead.
685    let (
686        debug_rpc_http_server,
687        rpc_http_server,
688        debug_rpc_tcp_server,
689        rpc_tcp_server,
690        debug_rpc_ws_server,
691        rpc_ws_server,
692    ) = if use_old_core_rpc {
693        let debug_rpc_http_server = super::rpc::start_http(
694            conf.local_http_config(),
695            setup_debug_rpc_apis(
696                common_impl.clone(),
697                rpc_impl.clone(),
698                pubsub.clone(),
699                &conf,
700            ),
701        )?;
702
703        let debug_rpc_tcp_server = super::rpc::start_tcp(
704            conf.local_tcp_config(),
705            setup_debug_rpc_apis(
706                common_impl.clone(),
707                rpc_impl.clone(),
708                pubsub.clone(),
709                &conf,
710            ),
711            RpcExtractor,
712        )?;
713
714        let debug_rpc_ws_server = super::rpc::start_ws(
715            conf.local_ws_config(),
716            setup_public_rpc_apis(
717                common_impl.clone(),
718                rpc_impl.clone(),
719                pubsub.clone(),
720                &conf,
721            ),
722            RpcExtractor,
723        )?;
724
725        let rpc_tcp_server = super::rpc::start_tcp(
726            conf.tcp_config(),
727            setup_public_rpc_apis(
728                common_impl.clone(),
729                rpc_impl.clone(),
730                pubsub.clone(),
731                &conf,
732            ),
733            RpcExtractor,
734        )?;
735
736        let rpc_ws_server = super::rpc::start_ws(
737            conf.ws_config(),
738            setup_public_rpc_apis(
739                common_impl.clone(),
740                rpc_impl.clone(),
741                pubsub.clone(),
742                &conf,
743            ),
744            RpcExtractor,
745        )?;
746
747        let rpc_http_server = super::rpc::start_http(
748            conf.http_config(),
749            setup_public_rpc_apis(common_impl, rpc_impl, pubsub, &conf),
750        )?;
751
752        (
753            debug_rpc_http_server,
754            rpc_http_server,
755            debug_rpc_tcp_server,
756            rpc_tcp_server,
757            debug_rpc_ws_server,
758            rpc_ws_server,
759        )
760    } else {
761        info!("Using new async core space RPC implementation");
762        (None, None, None, None, None, None)
763    };
764
765    let task_manager = TaskManager::new(tokio_runtime.handle().clone());
766    let task_executor = task_manager.executor();
767
768    let eth_rpc_server_handle =
769        tokio_runtime.block_on(launch_async_rpc_servers(
770            consensus.clone(),
771            sync.clone(),
772            txpool.clone(),
773            notifications.clone(),
774            task_executor.clone(),
775            conf,
776        ))?;
777
778    // Start V2 async core space RPC servers when using the new
779    // implementation.
780    let (cfx_rpc_server_handle, debug_cfx_rpc_server_handle) =
781        if !use_old_core_rpc {
782            let cfx_rpc_server_handle =
783                tokio_runtime.block_on(launch_cfx_async_rpc_servers(
784                    consensus.clone(),
785                    sync.clone(),
786                    txpool.clone(),
787                    data_man.clone(),
788                    network.clone(),
789                    pos_verifier.clone(),
790                    notifications.clone(),
791                    task_executor.clone(),
792                    accounts.clone(),
793                    exit.clone(),
794                    blockgen.test_api(),
795                    maybe_txgen.clone(),
796                    maybe_direct_txgen.clone(),
797                    conf,
798                    conf.raw_conf.public_rpc_apis.clone(),
799                    false,
800                ))?;
801            let debug_cfx_rpc_server_handle =
802                tokio_runtime.block_on(launch_cfx_async_rpc_servers(
803                    consensus.clone(),
804                    sync.clone(),
805                    txpool.clone(),
806                    data_man.clone(),
807                    network.clone(),
808                    pos_verifier.clone(),
809                    notifications.clone(),
810                    task_executor.clone(),
811                    accounts.clone(),
812                    exit.clone(),
813                    blockgen.test_api(),
814                    maybe_txgen.clone(),
815                    maybe_direct_txgen.clone(),
816                    conf,
817                    ApiSet::All,
818                    true,
819                ))?;
820            (cfx_rpc_server_handle, debug_cfx_rpc_server_handle)
821        } else {
822            (None, None)
823        };
824
825    // start pprf server, which is used to serve the pprof data for heap
826    // profiling
827    #[cfg(all(unix, feature = "jemalloc-prof"))]
828    if let Some(pprf_addr) = conf.raw_conf.profiling_listen_addr.as_ref() {
829        let pprf_addr = pprf_addr.clone();
830        let _pprf_server_handle = tokio_runtime.spawn(async move {
831            if let Err(e) = start_pprf_server(&pprf_addr).await {
832                eprintln!("Error starting pprof server: {}", e);
833            }
834        });
835    }
836
837    metrics::initialize(conf.metrics_config(), task_executor.clone());
838
839    network.start();
840
841    Ok((
842        data_man,
843        pow,
844        txpool,
845        consensus,
846        sync,
847        blockgen,
848        debug_rpc_http_server,
849        rpc_http_server,
850        debug_rpc_tcp_server,
851        rpc_tcp_server,
852        debug_rpc_ws_server,
853        rpc_ws_server,
854        pos_verifier,
855        tokio_runtime,
856        eth_rpc_server_handle,
857        cfx_rpc_server_handle,
858        debug_cfx_rpc_server_handle,
859        task_manager,
860    ))
861}
862
863pub fn initialize_txgens(
864    consensus: Arc<ConsensusGraph>, txpool: Arc<TransactionPool>,
865    sync: Arc<SynchronizationService>, secret_store: SharedSecretStore,
866    genesis_accounts: HashMap<Address, U256>, conf: &Configuration,
867    network_key_pair: KeyPair,
868) -> (
869    Option<Arc<TransactionGenerator>>,
870    Option<Arc<Mutex<DirectTransactionGenerator>>>,
871) {
872    // This tx generator directly push simple transactions and erc20
873    // transactions into blocks.
874    let maybe_direct_txgen_with_contract = if conf.is_test_or_dev_mode() {
875        Some(Arc::new(Mutex::new(DirectTransactionGenerator::new(
876            network_key_pair,
877            &public_to_address(DEV_GENESIS_KEY_PAIR_2.public(), true),
878            U256::from_dec_str("10000000000000000").unwrap(),
879            U256::from_dec_str("10000000000000000").unwrap(),
880        ))))
881    } else {
882        None
883    };
884
885    // This tx generator generates transactions from preconfigured multiple
886    // genesis accounts and it pushes transactions into transaction pool.
887    let maybe_multi_genesis_txgen = if let Some(txgen_conf) =
888        conf.tx_gen_config()
889    {
890        let multi_genesis_txgen = Arc::new(TransactionGenerator::new(
891            consensus.clone(),
892            txpool.clone(),
893            sync.clone(),
894            secret_store.clone(),
895        ));
896        if txgen_conf.generate_tx {
897            let txgen_clone = multi_genesis_txgen.clone();
898            let join_handle =
899                thread::Builder::new()
900                    .name("txgen".into())
901                    .spawn(move || {
902                        TransactionGenerator::generate_transactions_with_multiple_genesis_accounts(
903                            txgen_clone,
904                            txgen_conf,
905                            genesis_accounts,
906                        );
907                    })
908                    .expect("should succeed");
909            multi_genesis_txgen.set_join_handle(join_handle);
910        }
911        Some(multi_genesis_txgen)
912    } else {
913        None
914    };
915
916    (maybe_multi_genesis_txgen, maybe_direct_txgen_with_contract)
917}