1use crate::{
9 genesis_block::GenesisPosState,
10 pos::{
11 consensus::{
12 consensus_provider::start_consensus,
13 gen_consensus_reconfig_subscription,
14 network::NetworkReceivers as ConsensusNetworkReceivers,
15 ConsensusDB, TestCommand,
16 },
17 mempool as diem_mempool,
18 mempool::{
19 gen_mempool_reconfig_subscription,
20 network::NetworkReceivers as MemPoolNetworkReceivers,
21 SubmissionStatus,
22 },
23 pow_handler::PowHandler,
24 protocol::{
25 network_sender::NetworkSender,
26 sync_protocol::HotStuffSynchronizationProtocol,
27 },
28 state_sync::bootstrapper::StateSyncBootstrapper,
29 },
30 sync::ProtocolConfiguration,
31};
32
33use cached_pos_ledger_db::CachedPosLedgerDB;
34use consensus_types::db::FakeLedgerBlockDB;
35use diem_config::{config::NodeConfig, utils::get_genesis_txn};
36use diem_logger::{prelude::*, Writer};
37use diem_types::{
38 account_address::{from_consensus_public_key, AccountAddress},
39 block_info::PivotBlockDecision,
40 term_state::NodeID,
41 transaction::SignedTransaction,
42 validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey},
43 PeerId,
44};
45use executor::{db_bootstrapper::maybe_bootstrap, vm::PosVM, Executor};
46use executor_types::ChunkExecutor;
47use futures::{
48 channel::{
49 mpsc::{self, channel},
50 oneshot,
51 },
52 executor::block_on,
53};
54use network::NetworkService;
55use pos_ledger_db::PosLedgerDB;
56use pow_types::FakePowHandler;
57use std::{
58 boxed::Box,
59 fs,
60 sync::{
61 atomic::{AtomicBool, Ordering},
62 Arc,
63 },
64 time::Instant,
65};
66use storage_interface::DbReaderWriter;
67use tokio::runtime::Runtime;
68
/// Capacity passed to `channel` for the mempool client channel: its receiver
/// is given to `diem_mempool::bootstrap` and its sender is exposed as
/// `PosDropHandle::tx_sender`.
const AC_SMP_CHANNEL_BUFFER_SIZE: usize = 1_024;
/// Capacity passed to `channel` for the two intra-node request channels that
/// feed mempool (state sync -> mempool and consensus -> mempool).
const INTRA_NODE_CHANNEL_BUFFER_SIZE: usize = 1;
71
/// Handle returned by `setup_pos_environment` that owns the started PoS
/// components.
///
/// Dropping the handle sets `stopped` to `true` and calls
/// `pow_handler.stop()` (see the `Drop` impl). After that, the fields are
/// dropped in declaration order, so reordering fields changes the teardown
/// order.
pub struct PosDropHandle {
    /// PoW handler returned by `start_consensus`; stopped on drop.
    pub pow_handler: Arc<PowHandler>,
    /// Ledger database opened from `node_config.storage.dir()`.
    pub pos_ledger_db: Arc<PosLedgerDB>,
    /// `CachedPosLedgerDB` built on top of the same `DbReaderWriter`; shared
    /// with mempool and consensus.
    pub cached_db: Arc<CachedPosLedgerDB>,
    /// Consensus database returned by `start_consensus`.
    pub consensus_db: Arc<ConsensusDB>,
    /// Sender half of the mempool client channel. Each message pairs a
    /// transaction with a oneshot sender on which the submission result is
    /// reported back.
    pub tx_sender: mpsc::Sender<(
        SignedTransaction,
        oneshot::Sender<anyhow::Result<SubmissionStatus>>,
    )>,
    /// Shared stop flag returned by `start_consensus`; set to `true` on drop.
    pub stopped: Arc<AtomicBool>,
    // The remaining fields are never read in this file.
    // NOTE(review): presumably they are held only so the runtimes and the
    // state-sync bootstrapper live as long as the handle — confirm.
    /// Runtime returned by `diem_mempool::bootstrap`.
    _mempool: Runtime,
    /// Bootstrapper returned by `StateSyncBootstrapper::bootstrap`.
    _state_sync_bootstrapper: StateSyncBootstrapper,
    /// Runtime returned by `start_consensus`.
    _consensus_runtime: Runtime,
}
87
88pub fn start_pos_consensus(
89 config: &NodeConfig, network: Arc<NetworkService>,
90 protocol_config: ProtocolConfiguration,
91 own_pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
92 pos_genesis_state: GenesisPosState,
93 consensus_network_receiver: ConsensusNetworkReceivers,
94 mempool_network_receiver: MemPoolNetworkReceivers,
95 test_command_receiver: channel::Receiver<TestCommand>,
96 hsb_protocol: Arc<HotStuffSynchronizationProtocol>,
97) -> PosDropHandle {
98 crash_handler::setup_panic_handler();
99
100 let mut logger = diem_logger::Logger::new();
101 logger
102 .channel_size(config.logger.chan_size)
103 .is_async(config.logger.is_async)
104 .level(config.logger.level)
105 .read_env();
106 if let Some(log_file) = config.logger.file.clone() {
107 if let Some(parent) = log_file.parent() {
108 fs::create_dir_all(parent).expect(&format!(
109 "error creating PoS log file directory: parent={:?}",
110 parent
111 ));
112 }
113 let writer = match config.logger.rotation_count {
114 Some(count) => Box::new(RollingFileWriter::new(
115 log_file,
116 count,
117 config.logger.rotation_file_size_mb.unwrap_or(500),
118 ))
119 as Box<dyn Writer + Send + Sync + 'static>,
120 None => Box::new(FileWriter::new(log_file))
121 as Box<dyn Writer + Send + Sync + 'static>,
122 };
123 logger.printer(writer);
124 }
125 let _logger = Some(logger.build());
126
127 diem_info!(config = config, "Loaded Pos config");
129
130 if fail::has_failpoints() {
142 diem_warn!("Failpoints is enabled");
143 if let Some(failpoints) = &config.failpoints {
144 for (point, actions) in failpoints {
145 fail::cfg(point, actions)
146 .expect("fail to set actions for failpoint");
147 }
148 }
149 } else if config.failpoints.is_some() {
150 diem_warn!("failpoints is set in config, but the binary doesn't compile with this feature");
151 }
152
153 setup_pos_environment(
154 &config,
155 network,
156 protocol_config,
157 own_pos_public_key,
158 pos_genesis_state,
159 consensus_network_receiver,
160 mempool_network_receiver,
161 test_command_receiver,
162 hsb_protocol,
163 )
164}
165
166#[allow(unused)]
167fn setup_metrics(peer_id: PeerId, config: &NodeConfig) {
168 diem_metrics::dump_all_metrics_to_file_periodically(
169 &config.metrics.dir(),
170 &format!("{}.metrics", peer_id),
171 config.metrics.collection_interval_ms,
172 );
173}
174
175fn setup_chunk_executor(db: DbReaderWriter) -> Box<dyn ChunkExecutor> {
176 Box::new(Executor::<PosVM>::new(
177 Arc::new(CachedPosLedgerDB::new(db)),
178 Arc::new(FakePowHandler {}),
179 Arc::new(FakeLedgerBlockDB {}),
180 ))
181}
182
/// Opens PoS storage, bootstraps genesis, and starts state sync, mempool and
/// consensus, returning the handle that owns them.
///
/// Steps, in the order they run:
/// 1. Open `PosLedgerDB` under `node_config.storage.dir()` and wrap it in a
///    `DbReaderWriter`.
/// 2. Call `maybe_bootstrap` with the genesis transaction, the genesis
///    waypoint, the genesis pivot decision from `protocol_config`, and the
///    seed / nodes / committee from `pos_genesis_state`.
/// 3. Build the chunk executor and the reconfiguration subscriptions.
/// 4. Bootstrap state sync and create a state-sync client.
/// 5. Bootstrap mempool.
/// 6. Block the calling thread until state sync reports it is initialized.
/// 7. Start consensus and assemble the `PosDropHandle`.
///
/// # Panics
///
/// Panics if the database cannot be opened, if no genesis transaction is
/// available from `node_config`, if `maybe_bootstrap` returns an error, or
/// if state-sync initialization fails.
pub fn setup_pos_environment(
    node_config: &NodeConfig, network: Arc<NetworkService>,
    protocol_config: ProtocolConfiguration,
    own_pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
    pos_genesis_state: GenesisPosState,
    consensus_network_receiver: ConsensusNetworkReceivers,
    mempool_network_receiver: MemPoolNetworkReceivers,
    test_command_receiver: channel::Receiver<TestCommand>,
    hsb_protocol: Arc<HotStuffSynchronizationProtocol>,
) -> PosDropHandle {
    // `instant` is reset before each phase so every debug line reports the
    // duration of that phase only.
    let mut instant = Instant::now();
    let (pos_ledger_db, db_rw) = DbReaderWriter::wrap(
        PosLedgerDB::open(
            &node_config.storage.dir(),
            // NOTE(review): this positional `false` is presumably a
            // read-only flag — confirm against `PosLedgerDB::open`.
            false,
            node_config.storage.prune_window,
            node_config.storage.rocksdb_config,
        )
        .expect("DB should open."),
    );

    let genesis_waypoint = node_config.base.waypoint.genesis_waypoint();
    // A genesis transaction is mandatory: its absence aborts start-up.
    if let Some(genesis) = get_genesis_txn(&node_config) {
        maybe_bootstrap::<PosVM>(
            &db_rw,
            genesis,
            genesis_waypoint,
            // The genesis pivot decision is pinned to height 0.
            Some(PivotBlockDecision {
                block_hash: protocol_config.pos_genesis_pivot_decision,
                height: 0,
            }),
            pos_genesis_state.initial_seed.as_bytes().to_vec(),
            // Turn each genesis node into a `(NodeID, voting_power)` pair.
            pos_genesis_state
                .initial_nodes
                .into_iter()
                .map(|node| {
                    (NodeID::new(node.bls_key, node.vrf_key), node.voting_power)
                })
                .collect(),
            pos_genesis_state.initial_committee,
        )
        .expect("Db-bootstrapper should not fail.");
    } else {
        panic!("Genesis txn not provided.");
    }

    debug!(
        "Storage service started in {} ms",
        instant.elapsed().as_millis()
    );

    instant = Instant::now();
    let chunk_executor = setup_chunk_executor(db_rw.clone());
    debug!(
        "ChunkExecutor setup in {} ms",
        instant.elapsed().as_millis()
    );
    // Subscriptions handed to state sync below.
    let mut reconfig_subscriptions = vec![];

    let (mempool_reconfig_subscription, mempool_reconfig_events) =
        gen_mempool_reconfig_subscription();
    reconfig_subscriptions.push(mempool_reconfig_subscription);
    let (consensus_reconfig_subscription, consensus_reconfig_events) =
        gen_consensus_reconfig_subscription();
    // Only validators register the consensus subscription; the matching
    // event receiver is passed to `start_consensus` in either case.
    if node_config.base.role.is_validator() {
        reconfig_subscriptions.push(consensus_reconfig_subscription);
    }

    // State sync -> mempool request channel.
    let (state_sync_to_mempool_sender, state_sync_requests) =
        channel(INTRA_NODE_CHANNEL_BUFFER_SIZE);
    let state_sync_bootstrapper = StateSyncBootstrapper::bootstrap(
        state_sync_to_mempool_sender,
        Arc::clone(&db_rw.reader),
        chunk_executor,
        node_config,
        genesis_waypoint,
        reconfig_subscriptions,
    );

    let state_sync_client = state_sync_bootstrapper
        .create_client(node_config.state_sync.client_commit_timeout_ms);

    // Consensus -> mempool request channel.
    let (consensus_to_mempool_sender, consensus_requests) =
        channel(INTRA_NODE_CHANNEL_BUFFER_SIZE);

    let network_sender = NetworkSender {
        network,
        protocol_handler: hsb_protocol,
    };

    // Mempool client channel: the sender is cloned into consensus and also
    // exposed to callers as `PosDropHandle::tx_sender`.
    let (mp_client_sender, mp_client_events) =
        channel(AC_SMP_CHANNEL_BUFFER_SIZE);

    // `db_rw` is moved into the cached DB here; it is not used afterwards.
    let db_with_cache = Arc::new(CachedPosLedgerDB::new(db_rw));

    instant = Instant::now();
    let mempool = diem_mempool::bootstrap(
        node_config,
        db_with_cache.clone(),
        network_sender.clone(),
        mempool_network_receiver,
        mp_client_events,
        consensus_requests,
        state_sync_requests,
        mempool_reconfig_events,
    );
    debug!("Mempool started in {} ms", instant.elapsed().as_millis());

    debug!("Wait until state sync is initialized");
    // Blocks the calling thread; consensus is not started until this
    // completes successfully.
    block_on(state_sync_client.wait_until_initialized())
        .expect("State sync initialization failure");
    debug!("State sync initialization complete.");

    instant = Instant::now();
    debug!("own_pos_public_key: {:?}", own_pos_public_key);
    let (consensus_runtime, pow_handler, stopped, consensus_db) =
        start_consensus(
            node_config,
            network_sender,
            consensus_network_receiver,
            consensus_to_mempool_sender,
            state_sync_client,
            pos_ledger_db.clone(),
            db_with_cache.clone(),
            consensus_reconfig_events,
            // Derive the account address from the BLS/VRF public keys when
            // provided; otherwise fall back to a random address.
            own_pos_public_key.map_or_else(
                || AccountAddress::random(),
                |public_key| {
                    from_consensus_public_key(&public_key.0, &public_key.1)
                },
            ),
            mp_client_sender.clone(),
            test_command_receiver,
            protocol_config.pos_started_as_voter,
        );
    debug!("Consensus started in {} ms", instant.elapsed().as_millis());

    PosDropHandle {
        pow_handler,
        _consensus_runtime: consensus_runtime,
        stopped,
        _state_sync_bootstrapper: state_sync_bootstrapper,
        _mempool: mempool,
        pos_ledger_db,
        cached_db: db_with_cache,
        consensus_db,
        tx_sender: mp_client_sender,
    }
}
358
359impl Drop for PosDropHandle {
360 fn drop(&mut self) {
361 debug!("Drop PosDropHandle");
362 self.stopped.store(true, Ordering::SeqCst);
363 self.pow_handler.stop();
364 }
365}