cfxcore/pos/
pos.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use crate::{
9    genesis_block::GenesisPosState,
10    pos::{
11        consensus::{
12            consensus_provider::start_consensus,
13            gen_consensus_reconfig_subscription,
14            network::NetworkReceivers as ConsensusNetworkReceivers,
15            ConsensusDB, TestCommand,
16        },
17        mempool as diem_mempool,
18        mempool::{
19            gen_mempool_reconfig_subscription,
20            network::NetworkReceivers as MemPoolNetworkReceivers,
21            SubmissionStatus,
22        },
23        pow_handler::PowHandler,
24        protocol::{
25            network_sender::NetworkSender,
26            sync_protocol::HotStuffSynchronizationProtocol,
27        },
28        state_sync::bootstrapper::StateSyncBootstrapper,
29    },
30    sync::ProtocolConfiguration,
31};
32
33use cached_pos_ledger_db::CachedPosLedgerDB;
34use consensus_types::db::FakeLedgerBlockDB;
35use diem_config::{config::NodeConfig, utils::get_genesis_txn};
36use diem_logger::{prelude::*, Writer};
37use diem_types::{
38    account_address::{from_consensus_public_key, AccountAddress},
39    block_info::PivotBlockDecision,
40    term_state::NodeID,
41    transaction::SignedTransaction,
42    validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey},
43    PeerId,
44};
45use executor::{db_bootstrapper::maybe_bootstrap, vm::PosVM, Executor};
46use executor_types::ChunkExecutor;
47use futures::{
48    channel::{
49        mpsc::{self, channel},
50        oneshot,
51    },
52    executor::block_on,
53};
54use network::NetworkService;
55use pos_ledger_db::PosLedgerDB;
56use pow_types::FakePowHandler;
57use std::{
58    boxed::Box,
59    fs,
60    sync::{
61        atomic::{AtomicBool, Ordering},
62        Arc,
63    },
64    time::Instant,
65};
66use storage_interface::DbReaderWriter;
67use tokio::runtime::Runtime;
68
/// Capacity passed to `channel(..)` for the client-to-mempool transaction
/// submission channel (`mp_client_sender` / `mp_client_events`).
const AC_SMP_CHANNEL_BUFFER_SIZE: usize = 1_024;
/// Capacity passed to `channel(..)` for the channels that state sync and
/// consensus use to send requests to mempool.
const INTRA_NODE_CHANNEL_BUFFER_SIZE: usize = 1;
71
72pub struct PosDropHandle {
73    // pow handler
74    pub pow_handler: Arc<PowHandler>,
75    pub pos_ledger_db: Arc<PosLedgerDB>,
76    pub cached_db: Arc<CachedPosLedgerDB>,
77    pub consensus_db: Arc<ConsensusDB>,
78    pub tx_sender: mpsc::Sender<(
79        SignedTransaction,
80        oneshot::Sender<anyhow::Result<SubmissionStatus>>,
81    )>,
82    pub stopped: Arc<AtomicBool>,
83    _mempool: Runtime,
84    _state_sync_bootstrapper: StateSyncBootstrapper,
85    _consensus_runtime: Runtime,
86}
87
88pub fn start_pos_consensus(
89    config: &NodeConfig, network: Arc<NetworkService>,
90    protocol_config: ProtocolConfiguration,
91    own_pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
92    pos_genesis_state: GenesisPosState,
93    consensus_network_receiver: ConsensusNetworkReceivers,
94    mempool_network_receiver: MemPoolNetworkReceivers,
95    test_command_receiver: channel::Receiver<TestCommand>,
96    hsb_protocol: Arc<HotStuffSynchronizationProtocol>,
97) -> PosDropHandle {
98    crash_handler::setup_panic_handler();
99
100    let mut logger = diem_logger::Logger::new();
101    logger
102        .channel_size(config.logger.chan_size)
103        .is_async(config.logger.is_async)
104        .level(config.logger.level)
105        .read_env();
106    if let Some(log_file) = config.logger.file.clone() {
107        if let Some(parent) = log_file.parent() {
108            fs::create_dir_all(parent).expect(&format!(
109                "error creating PoS log file directory: parent={:?}",
110                parent
111            ));
112        }
113        let writer = match config.logger.rotation_count {
114            Some(count) => Box::new(RollingFileWriter::new(
115                log_file,
116                count,
117                config.logger.rotation_file_size_mb.unwrap_or(500),
118            ))
119                as Box<dyn Writer + Send + Sync + 'static>,
120            None => Box::new(FileWriter::new(log_file))
121                as Box<dyn Writer + Send + Sync + 'static>,
122        };
123        logger.printer(writer);
124    }
125    let _logger = Some(logger.build());
126
127    // Let's now log some important information, since the logger is set up
128    diem_info!(config = config, "Loaded Pos config");
129
130    /*if config.metrics.enabled {
131        for network in &config.full_node_networks {
132            let peer_id = network.peer_id();
133            setup_metrics(peer_id, &config);
134        }
135
136        if let Some(network) = config.validator_network.as_ref() {
137            let peer_id = network.peer_id();
138            setup_metrics(peer_id, &config);
139        }
140    }*/
141    if fail::has_failpoints() {
142        diem_warn!("Failpoints is enabled");
143        if let Some(failpoints) = &config.failpoints {
144            for (point, actions) in failpoints {
145                fail::cfg(point, actions)
146                    .expect("fail to set actions for failpoint");
147            }
148        }
149    } else if config.failpoints.is_some() {
150        diem_warn!("failpoints is set in config, but the binary doesn't compile with this feature");
151    }
152
153    setup_pos_environment(
154        &config,
155        network,
156        protocol_config,
157        own_pos_public_key,
158        pos_genesis_state,
159        consensus_network_receiver,
160        mempool_network_receiver,
161        test_command_receiver,
162        hsb_protocol,
163    )
164}
165
166#[allow(unused)]
167fn setup_metrics(peer_id: PeerId, config: &NodeConfig) {
168    diem_metrics::dump_all_metrics_to_file_periodically(
169        &config.metrics.dir(),
170        &format!("{}.metrics", peer_id),
171        config.metrics.collection_interval_ms,
172    );
173}
174
175fn setup_chunk_executor(db: DbReaderWriter) -> Box<dyn ChunkExecutor> {
176    Box::new(Executor::<PosVM>::new(
177        Arc::new(CachedPosLedgerDB::new(db)),
178        Arc::new(FakePowHandler {}),
179        Arc::new(FakeLedgerBlockDB {}),
180    ))
181}
182
183pub fn setup_pos_environment(
184    node_config: &NodeConfig, network: Arc<NetworkService>,
185    protocol_config: ProtocolConfiguration,
186    own_pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
187    pos_genesis_state: GenesisPosState,
188    consensus_network_receiver: ConsensusNetworkReceivers,
189    mempool_network_receiver: MemPoolNetworkReceivers,
190    test_command_receiver: channel::Receiver<TestCommand>,
191    hsb_protocol: Arc<HotStuffSynchronizationProtocol>,
192) -> PosDropHandle {
193    // TODO(lpl): Handle port conflict.
194    // let metrics_port = node_config.debug_interface.metrics_server_port;
195    // let metric_host = node_config.debug_interface.address.clone();
196    // thread::spawn(move || {
197    //     metric_server::start_server(metric_host, metrics_port, false)
198    // });
199    // let public_metrics_port =
200    //     node_config.debug_interface.public_metrics_server_port;
201    // let public_metric_host = node_config.debug_interface.address.clone();
202    // thread::spawn(move || {
203    //     metric_server::start_server(
204    //         public_metric_host,
205    //         public_metrics_port,
206    //         true,
207    //     )
208    // });
209
210    let mut instant = Instant::now();
211    let (pos_ledger_db, db_rw) = DbReaderWriter::wrap(
212        PosLedgerDB::open(
213            &node_config.storage.dir(),
214            false, /* readonly */
215            node_config.storage.prune_window,
216            node_config.storage.rocksdb_config,
217        )
218        .expect("DB should open."),
219    );
220
221    let genesis_waypoint = node_config.base.waypoint.genesis_waypoint();
222    // if there's genesis txn and waypoint, commit it if the result matches.
223    if let Some(genesis) = get_genesis_txn(&node_config) {
224        maybe_bootstrap::<PosVM>(
225            &db_rw,
226            genesis,
227            genesis_waypoint,
228            Some(PivotBlockDecision {
229                block_hash: protocol_config.pos_genesis_pivot_decision,
230                height: 0,
231            }),
232            pos_genesis_state.initial_seed.as_bytes().to_vec(),
233            pos_genesis_state
234                .initial_nodes
235                .into_iter()
236                .map(|node| {
237                    (NodeID::new(node.bls_key, node.vrf_key), node.voting_power)
238                })
239                .collect(),
240            pos_genesis_state.initial_committee,
241        )
242        .expect("Db-bootstrapper should not fail.");
243    } else {
244        panic!("Genesis txn not provided.");
245    }
246
247    debug!(
248        "Storage service started in {} ms",
249        instant.elapsed().as_millis()
250    );
251
252    instant = Instant::now();
253    let chunk_executor = setup_chunk_executor(db_rw.clone());
254    debug!(
255        "ChunkExecutor setup in {} ms",
256        instant.elapsed().as_millis()
257    );
258    let mut reconfig_subscriptions = vec![];
259
260    let (mempool_reconfig_subscription, mempool_reconfig_events) =
261        gen_mempool_reconfig_subscription();
262    reconfig_subscriptions.push(mempool_reconfig_subscription);
263    // consensus has to subscribe to ALL on-chain configs
264    let (consensus_reconfig_subscription, consensus_reconfig_events) =
265        gen_consensus_reconfig_subscription();
266    if node_config.base.role.is_validator() {
267        reconfig_subscriptions.push(consensus_reconfig_subscription);
268    }
269
270    // for state sync to send requests to mempool
271    let (state_sync_to_mempool_sender, state_sync_requests) =
272        channel(INTRA_NODE_CHANNEL_BUFFER_SIZE);
273    let state_sync_bootstrapper = StateSyncBootstrapper::bootstrap(
274        state_sync_to_mempool_sender,
275        Arc::clone(&db_rw.reader),
276        chunk_executor,
277        node_config,
278        genesis_waypoint,
279        reconfig_subscriptions,
280    );
281
282    let state_sync_client = state_sync_bootstrapper
283        .create_client(node_config.state_sync.client_commit_timeout_ms);
284
285    let (consensus_to_mempool_sender, consensus_requests) =
286        channel(INTRA_NODE_CHANNEL_BUFFER_SIZE);
287
288    let network_sender = NetworkSender {
289        network,
290        protocol_handler: hsb_protocol,
291    };
292
293    let (mp_client_sender, mp_client_events) =
294        channel(AC_SMP_CHANNEL_BUFFER_SIZE);
295
296    let db_with_cache = Arc::new(CachedPosLedgerDB::new(db_rw));
297
298    instant = Instant::now();
299    let mempool = diem_mempool::bootstrap(
300        node_config,
301        db_with_cache.clone(),
302        network_sender.clone(),
303        mempool_network_receiver,
304        mp_client_events,
305        consensus_requests,
306        state_sync_requests,
307        mempool_reconfig_events,
308    );
309    debug!("Mempool started in {} ms", instant.elapsed().as_millis());
310
311    // Make sure that state synchronizer is caught up at least to its waypoint
312    // (in case it's present). There is no sense to start consensus prior to
313    // that. TODO: Note that we need the networking layer to be able to
314    // discover & connect to the peers with potentially outdated network
315    // identity public keys.
316    debug!("Wait until state sync is initialized");
317    block_on(state_sync_client.wait_until_initialized())
318        .expect("State sync initialization failure");
319    debug!("State sync initialization complete.");
320
321    // Initialize and start consensus.
322    instant = Instant::now();
323    debug!("own_pos_public_key: {:?}", own_pos_public_key);
324    let (consensus_runtime, pow_handler, stopped, consensus_db) =
325        start_consensus(
326            node_config,
327            network_sender,
328            consensus_network_receiver,
329            consensus_to_mempool_sender,
330            state_sync_client,
331            pos_ledger_db.clone(),
332            db_with_cache.clone(),
333            consensus_reconfig_events,
334            own_pos_public_key.map_or_else(
335                || AccountAddress::random(),
336                |public_key| {
337                    from_consensus_public_key(&public_key.0, &public_key.1)
338                },
339            ),
340            mp_client_sender.clone(),
341            test_command_receiver,
342            protocol_config.pos_started_as_voter,
343        );
344    debug!("Consensus started in {} ms", instant.elapsed().as_millis());
345
346    PosDropHandle {
347        pow_handler,
348        _consensus_runtime: consensus_runtime,
349        stopped,
350        _state_sync_bootstrapper: state_sync_bootstrapper,
351        _mempool: mempool,
352        pos_ledger_db,
353        cached_db: db_with_cache,
354        consensus_db,
355        tx_sender: mp_client_sender,
356    }
357}
358
359impl Drop for PosDropHandle {
360    fn drop(&mut self) {
361        debug!("Drop PosDropHandle");
362        self.stopped.store(true, Ordering::SeqCst);
363        self.pow_handler.stop();
364    }
365}