1use crate::{
9 genesis_block::GenesisPosState,
10 pos::{
11 consensus::{
12 consensus_provider::start_consensus,
13 network::NetworkReceivers as ConsensusNetworkReceivers,
14 ConsensusDB, TestCommand,
15 },
16 mempool as diem_mempool,
17 mempool::{
18 network::NetworkReceivers as MemPoolNetworkReceivers,
19 SubmissionStatus,
20 },
21 pow_handler::PowHandler,
22 protocol::{
23 network_sender::NetworkSender,
24 sync_protocol::HotStuffSynchronizationProtocol,
25 },
26 },
27 sync::ProtocolConfiguration,
28};
29
30use cached_pos_ledger_db::CachedPosLedgerDB;
31use cfx_types::U256;
32use diem_config::config::NodeConfig;
33use diem_logger::{prelude::*, Writer};
34use diem_types::{
35 account_address::AccountAddress,
36 block_info::PivotBlockDecision,
37 chain_id::ChainId,
38 term_state::NodeID,
39 transaction::SignedTransaction,
40 validator_config::{ConsensusPrivateKey, ConsensusVRFPrivateKey},
41};
42use executor::db_bootstrapper::maybe_bootstrap;
43use futures::channel::{
44 mpsc::{self, channel},
45 oneshot,
46};
47use network::NetworkService;
48use pos_ledger_db::PosLedgerDB;
49use std::{
50 boxed::Box,
51 fs,
52 sync::{
53 atomic::{AtomicBool, Ordering},
54 Arc,
55 },
56 time::Instant,
57};
58use storage_interface::DbReaderWriter;
59use tokio::runtime::Runtime;
60
/// Buffer size of the channel that carries client transaction submissions
/// to the mempool (the `mp_client_sender` / `mp_client_events` pair created
/// in `setup_pos_environment`).
const AC_SMP_CHANNEL_BUFFER_SIZE: usize = 1024;
/// Buffer size of the node-internal channels between consensus and mempool
/// (commit notifications and consensus-to-mempool requests).
const INTRA_NODE_CHANNEL_BUFFER_SIZE: usize = 1;
63
64pub struct PosNodeKeys {
70 pub author: AccountAddress,
71 pub consensus_private_key: ConsensusPrivateKey,
72 pub vrf_private_key: ConsensusVRFPrivateKey,
73}
74
75pub struct PosChainParams {
82 pub chain_id: ChainId,
83 pub vrf_proposal_threshold: U256,
84}
85
86pub struct PosDropHandle {
87 pub pow_handler: Arc<PowHandler>,
89 pub pos_ledger_db: Arc<PosLedgerDB>,
90 pub cached_db: Arc<CachedPosLedgerDB>,
91 pub consensus_db: Arc<ConsensusDB>,
92 pub tx_sender: mpsc::Sender<(
93 SignedTransaction,
94 oneshot::Sender<anyhow::Result<SubmissionStatus>>,
95 )>,
96 pub stopped: Arc<AtomicBool>,
97 _mempool: Runtime,
98 _consensus_runtime: Runtime,
99}
100
101pub fn start_pos_consensus(
102 config: &NodeConfig, network: Arc<NetworkService>,
103 protocol_config: ProtocolConfiguration, node_keys: PosNodeKeys,
104 chain_params: PosChainParams, pos_genesis_state: GenesisPosState,
105 consensus_network_receiver: ConsensusNetworkReceivers,
106 mempool_network_receiver: MemPoolNetworkReceivers,
107 test_command_receiver: mpsc::Receiver<TestCommand>,
108 hsb_protocol: Arc<HotStuffSynchronizationProtocol>,
109) -> PosDropHandle {
110 let mut logger = diem_logger::Logger::new();
111 logger
112 .channel_size(config.logger.chan_size)
113 .is_async(config.logger.is_async)
114 .level(config.logger.level)
115 .read_env();
116 if let Some(log_file) = config.logger.file.clone() {
117 if let Some(parent) = log_file.parent() {
118 fs::create_dir_all(parent).expect(&format!(
119 "error creating PoS log file directory: parent={:?}",
120 parent
121 ));
122 }
123 let writer = match config.logger.rotation_count {
124 Some(count) => Box::new(RollingFileWriter::new(
125 log_file,
126 count,
127 config.logger.rotation_file_size_mb.unwrap_or(500),
128 ))
129 as Box<dyn Writer + Send + Sync + 'static>,
130 None => Box::new(FileWriter::new(log_file))
131 as Box<dyn Writer + Send + Sync + 'static>,
132 };
133 logger.printer(writer);
134 }
135 let _logger = Some(logger.build());
136
137 diem_info!(config = config, "Loaded Pos config");
139
140 if fail::has_failpoints() {
141 diem_warn!("Failpoints is enabled");
142 if let Some(failpoints) = &config.failpoints {
143 for (point, actions) in failpoints {
144 fail::cfg(point, actions)
145 .expect("fail to set actions for failpoint");
146 }
147 }
148 } else if config.failpoints.is_some() {
149 diem_warn!("failpoints is set in config, but the binary doesn't compile with this feature");
150 }
151
152 setup_pos_environment(
153 &config,
154 network,
155 protocol_config,
156 node_keys,
157 chain_params,
158 pos_genesis_state,
159 consensus_network_receiver,
160 mempool_network_receiver,
161 test_command_receiver,
162 hsb_protocol,
163 )
164}
165
166pub fn setup_pos_environment(
167 node_config: &NodeConfig, network: Arc<NetworkService>,
168 protocol_config: ProtocolConfiguration, node_keys: PosNodeKeys,
169 chain_params: PosChainParams, pos_genesis_state: GenesisPosState,
170 consensus_network_receiver: ConsensusNetworkReceivers,
171 mempool_network_receiver: MemPoolNetworkReceivers,
172 test_command_receiver: mpsc::Receiver<TestCommand>,
173 hsb_protocol: Arc<HotStuffSynchronizationProtocol>,
174) -> PosDropHandle {
175 let mut instant = Instant::now();
176 let (pos_ledger_db, db_rw) = DbReaderWriter::wrap(
177 PosLedgerDB::open(
178 &node_config.storage.dir(),
179 false, node_config.storage.rocksdb_config,
181 )
182 .expect("DB should open."),
183 );
184
185 maybe_bootstrap(
187 &db_rw,
188 Some(PivotBlockDecision {
189 block_hash: protocol_config.pos_genesis_pivot_decision,
190 height: 0,
191 }),
192 pos_genesis_state.initial_seed.as_bytes().to_vec(),
193 pos_genesis_state
194 .initial_nodes
195 .into_iter()
196 .map(|node| {
197 (NodeID::new(node.bls_key, node.vrf_key), node.voting_power)
198 })
199 .collect(),
200 pos_genesis_state.initial_committee,
201 )
202 .expect("Db-bootstrapper should not fail.");
203
204 debug!(
205 "Storage service started in {} ms",
206 instant.elapsed().as_millis()
207 );
208
209 let (mempool_commit_sender, mempool_commit_receiver) =
211 channel(INTRA_NODE_CHANNEL_BUFFER_SIZE);
212
213 let (consensus_to_mempool_sender, consensus_requests) =
214 channel(INTRA_NODE_CHANNEL_BUFFER_SIZE);
215
216 let network_sender = NetworkSender {
217 network,
218 protocol_handler: hsb_protocol,
219 };
220
221 let (mp_client_sender, mp_client_events) =
222 channel(AC_SMP_CHANNEL_BUFFER_SIZE);
223
224 let db_with_cache = Arc::new(CachedPosLedgerDB::new(db_rw));
225
226 instant = Instant::now();
227 let mempool = diem_mempool::bootstrap(
228 node_config,
229 db_with_cache.clone(),
230 network_sender.clone(),
231 mempool_network_receiver,
232 mp_client_events,
233 consensus_requests,
234 mempool_commit_receiver,
235 );
236 debug!("Mempool started in {} ms", instant.elapsed().as_millis());
237
238 instant = Instant::now();
240 debug!("pos author: {:?}", node_keys.author);
241 let (consensus_runtime, pow_handler, stopped, consensus_db) =
242 start_consensus(
243 node_config,
244 network_sender,
245 consensus_network_receiver,
246 consensus_to_mempool_sender,
247 mempool_commit_sender.clone(),
248 node_config.consensus.mempool_commit_timeout_ms,
249 pos_ledger_db.clone(),
250 db_with_cache.clone(),
251 node_keys,
252 chain_params,
253 mp_client_sender.clone(),
254 test_command_receiver,
255 protocol_config.pos_started_as_voter,
256 );
257 debug!("Consensus started in {} ms", instant.elapsed().as_millis());
258
259 PosDropHandle {
260 pow_handler,
261 _consensus_runtime: consensus_runtime,
262 stopped,
263 _mempool: mempool,
264 pos_ledger_db,
265 cached_db: db_with_cache,
266 consensus_db,
267 tx_sender: mp_client_sender,
268 }
269}
270
271impl Drop for PosDropHandle {
272 fn drop(&mut self) {
273 debug!("Drop PosDropHandle");
274 self.stopped.store(true, Ordering::SeqCst);
275 self.pow_handler.stop();
276 }
277}