1use crate::{
9 genesis_block::GenesisPosState,
10 pos::{
11 consensus::{
12 consensus_provider::start_consensus,
13 gen_consensus_reconfig_subscription,
14 network::NetworkReceivers as ConsensusNetworkReceivers,
15 ConsensusDB, TestCommand,
16 },
17 mempool as diem_mempool,
18 mempool::{
19 gen_mempool_reconfig_subscription,
20 network::NetworkReceivers as MemPoolNetworkReceivers,
21 SubmissionStatus,
22 },
23 pow_handler::PowHandler,
24 protocol::{
25 network_sender::NetworkSender,
26 sync_protocol::HotStuffSynchronizationProtocol,
27 },
28 state_sync::bootstrapper::StateSyncBootstrapper,
29 },
30 sync::ProtocolConfiguration,
31};
32
33use cached_pos_ledger_db::CachedPosLedgerDB;
34use diem_config::{config::NodeConfig, utils::get_genesis_txn};
35use diem_logger::{prelude::*, Writer};
36use diem_types::{
37 account_address::{from_consensus_public_key, AccountAddress},
38 block_info::PivotBlockDecision,
39 term_state::NodeID,
40 transaction::SignedTransaction,
41 validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey},
42 PeerId,
43};
44use executor::{db_bootstrapper::maybe_bootstrap, vm::PosVM};
45use futures::{
46 channel::{
47 mpsc::{self, channel},
48 oneshot,
49 },
50 executor::block_on,
51};
52use network::NetworkService;
53use pos_ledger_db::PosLedgerDB;
54use std::{
55 boxed::Box,
56 fs,
57 sync::{
58 atomic::{AtomicBool, Ordering},
59 Arc,
60 },
61 time::Instant,
62};
63use storage_interface::DbReaderWriter;
64use tokio::runtime::Runtime;
65
66const AC_SMP_CHANNEL_BUFFER_SIZE: usize = 1_024;
67const INTRA_NODE_CHANNEL_BUFFER_SIZE: usize = 1;
68
69pub struct PosDropHandle {
70 pub pow_handler: Arc<PowHandler>,
72 pub pos_ledger_db: Arc<PosLedgerDB>,
73 pub cached_db: Arc<CachedPosLedgerDB>,
74 pub consensus_db: Arc<ConsensusDB>,
75 pub tx_sender: mpsc::Sender<(
76 SignedTransaction,
77 oneshot::Sender<anyhow::Result<SubmissionStatus>>,
78 )>,
79 pub stopped: Arc<AtomicBool>,
80 _mempool: Runtime,
81 _state_sync_bootstrapper: StateSyncBootstrapper,
82 _consensus_runtime: Runtime,
83}
84
85pub fn start_pos_consensus(
86 config: &NodeConfig, network: Arc<NetworkService>,
87 protocol_config: ProtocolConfiguration,
88 own_pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
89 pos_genesis_state: GenesisPosState,
90 consensus_network_receiver: ConsensusNetworkReceivers,
91 mempool_network_receiver: MemPoolNetworkReceivers,
92 test_command_receiver: channel::Receiver<TestCommand>,
93 hsb_protocol: Arc<HotStuffSynchronizationProtocol>,
94) -> PosDropHandle {
95 crash_handler::setup_panic_handler();
96
97 let mut logger = diem_logger::Logger::new();
98 logger
99 .channel_size(config.logger.chan_size)
100 .is_async(config.logger.is_async)
101 .level(config.logger.level)
102 .read_env();
103 if let Some(log_file) = config.logger.file.clone() {
104 if let Some(parent) = log_file.parent() {
105 fs::create_dir_all(parent).expect(&format!(
106 "error creating PoS log file directory: parent={:?}",
107 parent
108 ));
109 }
110 let writer = match config.logger.rotation_count {
111 Some(count) => Box::new(RollingFileWriter::new(
112 log_file,
113 count,
114 config.logger.rotation_file_size_mb.unwrap_or(500),
115 ))
116 as Box<dyn Writer + Send + Sync + 'static>,
117 None => Box::new(FileWriter::new(log_file))
118 as Box<dyn Writer + Send + Sync + 'static>,
119 };
120 logger.printer(writer);
121 }
122 let _logger = Some(logger.build());
123
124 diem_info!(config = config, "Loaded Pos config");
126
127 if fail::has_failpoints() {
139 diem_warn!("Failpoints is enabled");
140 if let Some(failpoints) = &config.failpoints {
141 for (point, actions) in failpoints {
142 fail::cfg(point, actions)
143 .expect("fail to set actions for failpoint");
144 }
145 }
146 } else if config.failpoints.is_some() {
147 diem_warn!("failpoints is set in config, but the binary doesn't compile with this feature");
148 }
149
150 setup_pos_environment(
151 &config,
152 network,
153 protocol_config,
154 own_pos_public_key,
155 pos_genesis_state,
156 consensus_network_receiver,
157 mempool_network_receiver,
158 test_command_receiver,
159 hsb_protocol,
160 )
161}
162
163#[allow(unused)]
164fn setup_metrics(peer_id: PeerId, config: &NodeConfig) {
165 diem_metrics::dump_all_metrics_to_file_periodically(
166 &config.metrics.dir(),
167 &format!("{}.metrics", peer_id),
168 config.metrics.collection_interval_ms,
169 );
170}
171
172pub fn setup_pos_environment(
173 node_config: &NodeConfig, network: Arc<NetworkService>,
174 protocol_config: ProtocolConfiguration,
175 own_pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
176 pos_genesis_state: GenesisPosState,
177 consensus_network_receiver: ConsensusNetworkReceivers,
178 mempool_network_receiver: MemPoolNetworkReceivers,
179 test_command_receiver: channel::Receiver<TestCommand>,
180 hsb_protocol: Arc<HotStuffSynchronizationProtocol>,
181) -> PosDropHandle {
182 let mut instant = Instant::now();
200 let (pos_ledger_db, db_rw) = DbReaderWriter::wrap(
201 PosLedgerDB::open(
202 &node_config.storage.dir(),
203 false, node_config.storage.prune_window,
205 node_config.storage.rocksdb_config,
206 )
207 .expect("DB should open."),
208 );
209
210 if let Some(genesis) = get_genesis_txn(&node_config) {
212 maybe_bootstrap::<PosVM>(
213 &db_rw,
214 genesis,
215 Some(PivotBlockDecision {
216 block_hash: protocol_config.pos_genesis_pivot_decision,
217 height: 0,
218 }),
219 pos_genesis_state.initial_seed.as_bytes().to_vec(),
220 pos_genesis_state
221 .initial_nodes
222 .into_iter()
223 .map(|node| {
224 (NodeID::new(node.bls_key, node.vrf_key), node.voting_power)
225 })
226 .collect(),
227 pos_genesis_state.initial_committee,
228 )
229 .expect("Db-bootstrapper should not fail.");
230 } else {
231 panic!("Genesis txn not provided.");
232 }
233
234 debug!(
235 "Storage service started in {} ms",
236 instant.elapsed().as_millis()
237 );
238
239 let mut reconfig_subscriptions = vec![];
240
241 let (mempool_reconfig_subscription, mempool_reconfig_events) =
242 gen_mempool_reconfig_subscription();
243 reconfig_subscriptions.push(mempool_reconfig_subscription);
244 let (consensus_reconfig_subscription, consensus_reconfig_events) =
246 gen_consensus_reconfig_subscription();
247 if node_config.base.role.is_validator() {
248 reconfig_subscriptions.push(consensus_reconfig_subscription);
249 }
250
251 let (state_sync_to_mempool_sender, state_sync_requests) =
253 channel(INTRA_NODE_CHANNEL_BUFFER_SIZE);
254 let state_sync_bootstrapper = StateSyncBootstrapper::bootstrap(
255 state_sync_to_mempool_sender,
256 Arc::clone(&db_rw.reader),
257 node_config,
258 reconfig_subscriptions,
259 );
260
261 let state_sync_client = state_sync_bootstrapper
262 .create_client(node_config.state_sync.client_commit_timeout_ms);
263
264 let (consensus_to_mempool_sender, consensus_requests) =
265 channel(INTRA_NODE_CHANNEL_BUFFER_SIZE);
266
267 let network_sender = NetworkSender {
268 network,
269 protocol_handler: hsb_protocol,
270 };
271
272 let (mp_client_sender, mp_client_events) =
273 channel(AC_SMP_CHANNEL_BUFFER_SIZE);
274
275 let db_with_cache = Arc::new(CachedPosLedgerDB::new(db_rw));
276
277 instant = Instant::now();
278 let mempool = diem_mempool::bootstrap(
279 node_config,
280 db_with_cache.clone(),
281 network_sender.clone(),
282 mempool_network_receiver,
283 mp_client_events,
284 consensus_requests,
285 state_sync_requests,
286 mempool_reconfig_events,
287 );
288 debug!("Mempool started in {} ms", instant.elapsed().as_millis());
289
290 debug!("Wait until state sync is initialized");
296 block_on(state_sync_client.wait_until_initialized())
297 .expect("State sync initialization failure");
298 debug!("State sync initialization complete.");
299
300 instant = Instant::now();
302 debug!("own_pos_public_key: {:?}", own_pos_public_key);
303 let (consensus_runtime, pow_handler, stopped, consensus_db) =
304 start_consensus(
305 node_config,
306 network_sender,
307 consensus_network_receiver,
308 consensus_to_mempool_sender,
309 state_sync_client,
310 pos_ledger_db.clone(),
311 db_with_cache.clone(),
312 consensus_reconfig_events,
313 own_pos_public_key.map_or_else(
314 || AccountAddress::random(),
315 |public_key| {
316 from_consensus_public_key(&public_key.0, &public_key.1)
317 },
318 ),
319 mp_client_sender.clone(),
320 test_command_receiver,
321 protocol_config.pos_started_as_voter,
322 );
323 debug!("Consensus started in {} ms", instant.elapsed().as_millis());
324
325 PosDropHandle {
326 pow_handler,
327 _consensus_runtime: consensus_runtime,
328 stopped,
329 _state_sync_bootstrapper: state_sync_bootstrapper,
330 _mempool: mempool,
331 pos_ledger_db,
332 cached_db: db_with_cache,
333 consensus_db,
334 tx_sender: mp_client_sender,
335 }
336}
337
338impl Drop for PosDropHandle {
339 fn drop(&mut self) {
340 debug!("Drop PosDropHandle");
341 self.stopped.store(true, Ordering::SeqCst);
342 self.pow_handler.stop();
343 }
344}