cfxcore/pos/consensus/
consensus_provider.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use std::sync::{atomic::AtomicBool, Arc};
9
10use futures::channel::{mpsc, oneshot};
11use tokio::runtime::{self, Runtime};
12
13use cached_pos_ledger_db::CachedPosLedgerDB;
14use channel::diem_channel;
15use consensus_types::db::LedgerBlockRW;
16use diem_config::config::NodeConfig;
17use diem_logger::prelude::*;
18use diem_types::{
19    account_address::AccountAddress, on_chain_config::OnChainConfigPayload,
20    transaction::SignedTransaction,
21};
22use executor::{vm::PosVM, Executor};
23use storage_interface::DbReader;
24
25use crate::pos::{
26    mempool::{ConsensusRequest, SubmissionStatus},
27    pow_handler::PowHandler,
28    protocol::network_sender::NetworkSender,
29    state_sync::client::StateSyncClient,
30};
31
32use super::{
33    counters, epoch_manager::EpochManager, network::NetworkReceivers,
34    persistent_liveness_storage::StorageWriteProxy,
35    state_computer::ExecutionProxy, txn_manager::MempoolProxy,
36    util::time_service::ClockTimeService,
37};
38use crate::pos::consensus::{ConsensusDB, TestCommand};
39
40/// Helper function to start consensus based on configuration and return the
41/// runtime
42pub fn start_consensus(
43    node_config: &NodeConfig, network_sender: NetworkSender,
44    network_receiver: NetworkReceivers,
45    consensus_to_mempool_sender: mpsc::Sender<ConsensusRequest>,
46    state_sync_client: StateSyncClient, pos_ledger_db: Arc<dyn DbReader>,
47    db_with_cache: Arc<CachedPosLedgerDB>,
48    reconfig_events: diem_channel::Receiver<(), OnChainConfigPayload>,
49    author: AccountAddress,
50    tx_sender: mpsc::Sender<(
51        SignedTransaction,
52        oneshot::Sender<anyhow::Result<SubmissionStatus>>,
53    )>,
54    test_command_receiver: channel::Receiver<TestCommand>,
55    started_as_voter: bool,
56) -> (Runtime, Arc<PowHandler>, Arc<AtomicBool>, Arc<ConsensusDB>) {
57    let stopped = Arc::new(AtomicBool::new(false));
58    let runtime = runtime::Builder::new_multi_thread()
59        .thread_name("consensus")
60        .enable_all()
61        // TODO(lpl): This is for debugging.
62        .worker_threads(4)
63        .build()
64        .expect("Failed to create Tokio runtime!");
65    let storage = Arc::new(StorageWriteProxy::new(node_config, pos_ledger_db));
66    let consensus_db = storage.consensus_db();
67    let txn_manager = Arc::new(MempoolProxy::new(
68        consensus_to_mempool_sender,
69        node_config.consensus.mempool_poll_count,
70        node_config.consensus.mempool_txn_pull_timeout_ms,
71        node_config.consensus.mempool_executed_txn_timeout_ms,
72    ));
73    let pow_handler = Arc::new(PowHandler::new(
74        runtime.handle().clone(),
75        consensus_db.clone(),
76    ));
77    let executor = Box::new(Executor::<PosVM>::new(
78        db_with_cache,
79        pow_handler.clone(),
80        consensus_db.clone() as Arc<dyn LedgerBlockRW>,
81    ));
82    let state_computer =
83        Arc::new(ExecutionProxy::new(executor, state_sync_client));
84    let time_service =
85        Arc::new(ClockTimeService::new(runtime.handle().clone()));
86
87    let (timeout_sender, timeout_receiver) =
88        channel::new(1_024, &counters::PENDING_ROUND_TIMEOUTS);
89    let (proposal_timeout_sender, proposal_timeout_receiver) =
90        channel::new(1_024, &counters::PENDING_PROPOSAL_TIMEOUTS);
91    let (new_round_timeout_sender, new_round_timeout_receiver) =
92        channel::new(1_024, &counters::PENDING_NEW_ROUND_TIMEOUTS);
93
94    let epoch_mgr = EpochManager::new(
95        node_config,
96        time_service,
97        network_sender,
98        timeout_sender,
99        proposal_timeout_sender,
100        new_round_timeout_sender,
101        txn_manager,
102        state_computer,
103        storage,
104        reconfig_events,
105        pow_handler.clone(),
106        author,
107        tx_sender,
108        started_as_voter,
109    );
110
111    runtime.spawn(epoch_mgr.start(
112        timeout_receiver,
113        proposal_timeout_receiver,
114        new_round_timeout_receiver,
115        network_receiver,
116        test_command_receiver,
117        stopped.clone(),
118    ));
119
120    diem_debug!("Consensus started.");
121    (runtime, pow_handler, stopped, consensus_db)
122}