cfxcore/pos/consensus/
consensus_provider.rs1use std::sync::{atomic::AtomicBool, Arc};
9
10use futures::channel::{mpsc, oneshot};
11use tokio::runtime::{self, Runtime};
12
13use cached_pos_ledger_db::CachedPosLedgerDB;
14use channel::diem_channel;
15use consensus_types::db::LedgerBlockRW;
16use diem_config::config::NodeConfig;
17use diem_logger::prelude::*;
18use diem_types::{
19 account_address::AccountAddress, on_chain_config::OnChainConfigPayload,
20 transaction::SignedTransaction,
21};
22use executor::{vm::PosVM, Executor};
23use storage_interface::DbReader;
24
25use crate::pos::{
26 mempool::{ConsensusRequest, SubmissionStatus},
27 pow_handler::PowHandler,
28 protocol::network_sender::NetworkSender,
29 state_sync::client::StateSyncClient,
30};
31
32use super::{
33 counters, epoch_manager::EpochManager, network::NetworkReceivers,
34 persistent_liveness_storage::StorageWriteProxy,
35 state_computer::ExecutionProxy, txn_manager::MempoolProxy,
36 util::time_service::ClockTimeService,
37};
38use crate::pos::consensus::{ConsensusDB, TestCommand};
39
40pub fn start_consensus(
43 node_config: &NodeConfig, network_sender: NetworkSender,
44 network_receiver: NetworkReceivers,
45 consensus_to_mempool_sender: mpsc::Sender<ConsensusRequest>,
46 state_sync_client: StateSyncClient, pos_ledger_db: Arc<dyn DbReader>,
47 db_with_cache: Arc<CachedPosLedgerDB>,
48 reconfig_events: diem_channel::Receiver<(), OnChainConfigPayload>,
49 author: AccountAddress,
50 tx_sender: mpsc::Sender<(
51 SignedTransaction,
52 oneshot::Sender<anyhow::Result<SubmissionStatus>>,
53 )>,
54 test_command_receiver: channel::Receiver<TestCommand>,
55 started_as_voter: bool,
56) -> (Runtime, Arc<PowHandler>, Arc<AtomicBool>, Arc<ConsensusDB>) {
57 let stopped = Arc::new(AtomicBool::new(false));
58 let runtime = runtime::Builder::new_multi_thread()
59 .thread_name("consensus")
60 .enable_all()
61 .worker_threads(4)
63 .build()
64 .expect("Failed to create Tokio runtime!");
65 let storage = Arc::new(StorageWriteProxy::new(node_config, pos_ledger_db));
66 let consensus_db = storage.consensus_db();
67 let txn_manager = Arc::new(MempoolProxy::new(
68 consensus_to_mempool_sender,
69 node_config.consensus.mempool_poll_count,
70 node_config.consensus.mempool_txn_pull_timeout_ms,
71 node_config.consensus.mempool_executed_txn_timeout_ms,
72 ));
73 let pow_handler = Arc::new(PowHandler::new(
74 runtime.handle().clone(),
75 consensus_db.clone(),
76 ));
77 let executor = Box::new(Executor::<PosVM>::new(
78 db_with_cache,
79 pow_handler.clone(),
80 consensus_db.clone() as Arc<dyn LedgerBlockRW>,
81 ));
82 let state_computer =
83 Arc::new(ExecutionProxy::new(executor, state_sync_client));
84 let time_service =
85 Arc::new(ClockTimeService::new(runtime.handle().clone()));
86
87 let (timeout_sender, timeout_receiver) =
88 channel::new(1_024, &counters::PENDING_ROUND_TIMEOUTS);
89 let (proposal_timeout_sender, proposal_timeout_receiver) =
90 channel::new(1_024, &counters::PENDING_PROPOSAL_TIMEOUTS);
91 let (new_round_timeout_sender, new_round_timeout_receiver) =
92 channel::new(1_024, &counters::PENDING_NEW_ROUND_TIMEOUTS);
93
94 let epoch_mgr = EpochManager::new(
95 node_config,
96 time_service,
97 network_sender,
98 timeout_sender,
99 proposal_timeout_sender,
100 new_round_timeout_sender,
101 txn_manager,
102 state_computer,
103 storage,
104 reconfig_events,
105 pow_handler.clone(),
106 author,
107 tx_sender,
108 started_as_voter,
109 );
110
111 runtime.spawn(epoch_mgr.start(
112 timeout_receiver,
113 proposal_timeout_receiver,
114 new_round_timeout_receiver,
115 network_receiver,
116 test_command_receiver,
117 stopped.clone(),
118 ));
119
120 diem_debug!("Consensus started.");
121 (runtime, pow_handler, stopped, consensus_db)
122}