cfxcore/pos/consensus/
consensus_provider.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use std::sync::{atomic::AtomicBool, Arc};
9
10use futures::channel::{mpsc, oneshot};
11use tokio::runtime::{self, Runtime};
12
13use cached_pos_ledger_db::CachedPosLedgerDB;
14use consensus_types::db::LedgerBlockRW;
15use diem_config::config::NodeConfig;
16use diem_logger::prelude::*;
17use diem_types::transaction::SignedTransaction;
18use executor::Executor;
19use storage_interface::DbReader;
20
21use crate::pos::{
22    mempool::{ConsensusRequest, SubmissionStatus},
23    pos::{PosChainParams, PosNodeKeys},
24    pow_handler::PowHandler,
25    protocol::network_sender::NetworkSender,
26};
27
28use super::{
29    epoch_manager::EpochManager, network::NetworkReceivers,
30    persistent_liveness_storage::StorageWriteProxy,
31    state_computer::ExecutionProxy, txn_manager::MempoolProxy,
32    util::time_service::ClockTimeService,
33};
34use crate::pos::consensus::{ConsensusDB, TestCommand};
35
36/// Helper function to start consensus based on configuration and return the
37/// runtime
38pub fn start_consensus(
39    node_config: &NodeConfig, network_sender: NetworkSender,
40    network_receiver: NetworkReceivers,
41    consensus_to_mempool_sender: mpsc::Sender<ConsensusRequest>,
42    mempool_commit_sender: mpsc::Sender<
43        crate::pos::mempool::CommitNotification,
44    >,
45    mempool_commit_timeout_ms: u64, pos_ledger_db: Arc<dyn DbReader>,
46    db_with_cache: Arc<CachedPosLedgerDB>, node_keys: PosNodeKeys,
47    chain_params: PosChainParams,
48    tx_sender: mpsc::Sender<(
49        SignedTransaction,
50        oneshot::Sender<anyhow::Result<SubmissionStatus>>,
51    )>,
52    test_command_receiver: mpsc::Receiver<TestCommand>, started_as_voter: bool,
53) -> (Runtime, Arc<PowHandler>, Arc<AtomicBool>, Arc<ConsensusDB>) {
54    let stopped = Arc::new(AtomicBool::new(false));
55    let runtime = runtime::Builder::new_multi_thread()
56        .thread_name("consensus")
57        .enable_all()
58        // TODO(lpl): This is for debugging.
59        .worker_threads(4)
60        .build()
61        .expect("Failed to create Tokio runtime!");
62    let storage = Arc::new(StorageWriteProxy::new(node_config, pos_ledger_db));
63    let consensus_db = storage.consensus_db();
64    let txn_manager = Arc::new(MempoolProxy::new(
65        consensus_to_mempool_sender,
66        node_config.consensus.mempool_poll_count,
67        node_config.consensus.mempool_txn_pull_timeout_ms,
68    ));
69    let pow_handler = Arc::new(PowHandler::new(
70        runtime.handle().clone(),
71        consensus_db.clone(),
72    ));
73    let executor = Box::new(Executor::new(
74        db_with_cache,
75        pow_handler.clone(),
76        consensus_db.clone() as Arc<dyn LedgerBlockRW>,
77    ));
78    let state_computer = Arc::new(ExecutionProxy::new(
79        executor,
80        mempool_commit_sender,
81        mempool_commit_timeout_ms,
82    ));
83    let time_service =
84        Arc::new(ClockTimeService::new(runtime.handle().clone()));
85
86    let (timeout_sender, timeout_receiver) = mpsc::channel(1_024);
87    let (proposal_timeout_sender, proposal_timeout_receiver) =
88        mpsc::channel(1_024);
89    let (new_round_timeout_sender, new_round_timeout_receiver) =
90        mpsc::channel(1_024);
91
92    let epoch_mgr = EpochManager::new(
93        node_config,
94        time_service,
95        network_sender,
96        timeout_sender,
97        proposal_timeout_sender,
98        new_round_timeout_sender,
99        txn_manager,
100        state_computer,
101        storage,
102        pow_handler.clone(),
103        node_keys,
104        chain_params,
105        tx_sender,
106        started_as_voter,
107    );
108
109    runtime.spawn(epoch_mgr.start(
110        timeout_receiver,
111        proposal_timeout_receiver,
112        new_round_timeout_receiver,
113        network_receiver,
114        test_command_receiver,
115        stopped.clone(),
116    ));
117
118    diem_debug!("Consensus started.");
119    (runtime, pow_handler, stopped, consensus_db)
120}