cfxcore/pos/consensus/
consensus_provider.rs1use std::sync::{atomic::AtomicBool, Arc};
9
10use futures::channel::{mpsc, oneshot};
11use tokio::runtime::{self, Runtime};
12
13use cached_pos_ledger_db::CachedPosLedgerDB;
14use consensus_types::db::LedgerBlockRW;
15use diem_config::config::NodeConfig;
16use diem_logger::prelude::*;
17use diem_types::transaction::SignedTransaction;
18use executor::Executor;
19use storage_interface::DbReader;
20
21use crate::pos::{
22 mempool::{ConsensusRequest, SubmissionStatus},
23 pos::{PosChainParams, PosNodeKeys},
24 pow_handler::PowHandler,
25 protocol::network_sender::NetworkSender,
26};
27
28use super::{
29 epoch_manager::EpochManager, network::NetworkReceivers,
30 persistent_liveness_storage::StorageWriteProxy,
31 state_computer::ExecutionProxy, txn_manager::MempoolProxy,
32 util::time_service::ClockTimeService,
33};
34use crate::pos::consensus::{ConsensusDB, TestCommand};
35
36pub fn start_consensus(
39 node_config: &NodeConfig, network_sender: NetworkSender,
40 network_receiver: NetworkReceivers,
41 consensus_to_mempool_sender: mpsc::Sender<ConsensusRequest>,
42 mempool_commit_sender: mpsc::Sender<
43 crate::pos::mempool::CommitNotification,
44 >,
45 mempool_commit_timeout_ms: u64, pos_ledger_db: Arc<dyn DbReader>,
46 db_with_cache: Arc<CachedPosLedgerDB>, node_keys: PosNodeKeys,
47 chain_params: PosChainParams,
48 tx_sender: mpsc::Sender<(
49 SignedTransaction,
50 oneshot::Sender<anyhow::Result<SubmissionStatus>>,
51 )>,
52 test_command_receiver: mpsc::Receiver<TestCommand>, started_as_voter: bool,
53) -> (Runtime, Arc<PowHandler>, Arc<AtomicBool>, Arc<ConsensusDB>) {
54 let stopped = Arc::new(AtomicBool::new(false));
55 let runtime = runtime::Builder::new_multi_thread()
56 .thread_name("consensus")
57 .enable_all()
58 .worker_threads(4)
60 .build()
61 .expect("Failed to create Tokio runtime!");
62 let storage = Arc::new(StorageWriteProxy::new(node_config, pos_ledger_db));
63 let consensus_db = storage.consensus_db();
64 let txn_manager = Arc::new(MempoolProxy::new(
65 consensus_to_mempool_sender,
66 node_config.consensus.mempool_poll_count,
67 node_config.consensus.mempool_txn_pull_timeout_ms,
68 ));
69 let pow_handler = Arc::new(PowHandler::new(
70 runtime.handle().clone(),
71 consensus_db.clone(),
72 ));
73 let executor = Box::new(Executor::new(
74 db_with_cache,
75 pow_handler.clone(),
76 consensus_db.clone() as Arc<dyn LedgerBlockRW>,
77 ));
78 let state_computer = Arc::new(ExecutionProxy::new(
79 executor,
80 mempool_commit_sender,
81 mempool_commit_timeout_ms,
82 ));
83 let time_service =
84 Arc::new(ClockTimeService::new(runtime.handle().clone()));
85
86 let (timeout_sender, timeout_receiver) = mpsc::channel(1_024);
87 let (proposal_timeout_sender, proposal_timeout_receiver) =
88 mpsc::channel(1_024);
89 let (new_round_timeout_sender, new_round_timeout_receiver) =
90 mpsc::channel(1_024);
91
92 let epoch_mgr = EpochManager::new(
93 node_config,
94 time_service,
95 network_sender,
96 timeout_sender,
97 proposal_timeout_sender,
98 new_round_timeout_sender,
99 txn_manager,
100 state_computer,
101 storage,
102 pow_handler.clone(),
103 node_keys,
104 chain_params,
105 tx_sender,
106 started_as_voter,
107 );
108
109 runtime.spawn(epoch_mgr.start(
110 timeout_receiver,
111 proposal_timeout_receiver,
112 new_round_timeout_receiver,
113 network_receiver,
114 test_command_receiver,
115 stopped.clone(),
116 ));
117
118 diem_debug!("Consensus started.");
119 (runtime, pow_handler, stopped, consensus_db)
120}