cfxcore/pos/
pos.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use crate::{
9    genesis_block::GenesisPosState,
10    pos::{
11        consensus::{
12            consensus_provider::start_consensus,
13            gen_consensus_reconfig_subscription,
14            network::NetworkReceivers as ConsensusNetworkReceivers,
15            ConsensusDB, TestCommand,
16        },
17        mempool as diem_mempool,
18        mempool::{
19            gen_mempool_reconfig_subscription,
20            network::NetworkReceivers as MemPoolNetworkReceivers,
21            SubmissionStatus,
22        },
23        pow_handler::PowHandler,
24        protocol::{
25            network_sender::NetworkSender,
26            sync_protocol::HotStuffSynchronizationProtocol,
27        },
28        state_sync::bootstrapper::StateSyncBootstrapper,
29    },
30    sync::ProtocolConfiguration,
31};
32
33use cached_pos_ledger_db::CachedPosLedgerDB;
34use diem_config::{config::NodeConfig, utils::get_genesis_txn};
35use diem_logger::{prelude::*, Writer};
36use diem_types::{
37    account_address::{from_consensus_public_key, AccountAddress},
38    block_info::PivotBlockDecision,
39    term_state::NodeID,
40    transaction::SignedTransaction,
41    validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey},
42    PeerId,
43};
44use executor::{db_bootstrapper::maybe_bootstrap, vm::PosVM};
45use futures::{
46    channel::{
47        mpsc::{self, channel},
48        oneshot,
49    },
50    executor::block_on,
51};
52use network::NetworkService;
53use pos_ledger_db::PosLedgerDB;
54use std::{
55    boxed::Box,
56    fs,
57    sync::{
58        atomic::{AtomicBool, Ordering},
59        Arc,
60    },
61    time::Instant,
62};
63use storage_interface::DbReaderWriter;
64use tokio::runtime::Runtime;
65
66const AC_SMP_CHANNEL_BUFFER_SIZE: usize = 1_024;
67const INTRA_NODE_CHANNEL_BUFFER_SIZE: usize = 1;
68
/// Owner handle for the whole PoS subsystem. Keeping it alive keeps the
/// mempool/state-sync/consensus runtimes running; dropping it flips the
/// shared `stopped` flag and stops the PoW handler (see the `Drop` impl
/// at the bottom of this file).
pub struct PosDropHandle {
    // pow handler
    pub pow_handler: Arc<PowHandler>,
    // Underlying PoS ledger database.
    pub pos_ledger_db: Arc<PosLedgerDB>,
    // Cache layer wrapped around the ledger DB, shared with mempool/consensus.
    pub cached_db: Arc<CachedPosLedgerDB>,
    // Consensus-local storage (blocks/QCs), returned by `start_consensus`.
    pub consensus_db: Arc<ConsensusDB>,
    // Client endpoint for submitting signed transactions to the mempool;
    // the oneshot sender carries the submission result back to the caller.
    pub tx_sender: mpsc::Sender<(
        SignedTransaction,
        oneshot::Sender<anyhow::Result<SubmissionStatus>>,
    )>,
    // Shared stop flag handed out by `start_consensus`; set on drop.
    pub stopped: Arc<AtomicBool>,
    // Held only so the runtimes/bootstrapper are dropped together with
    // this handle (fields are never read after construction).
    _mempool: Runtime,
    _state_sync_bootstrapper: StateSyncBootstrapper,
    _consensus_runtime: Runtime,
}
84
85pub fn start_pos_consensus(
86    config: &NodeConfig, network: Arc<NetworkService>,
87    protocol_config: ProtocolConfiguration,
88    own_pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
89    pos_genesis_state: GenesisPosState,
90    consensus_network_receiver: ConsensusNetworkReceivers,
91    mempool_network_receiver: MemPoolNetworkReceivers,
92    test_command_receiver: channel::Receiver<TestCommand>,
93    hsb_protocol: Arc<HotStuffSynchronizationProtocol>,
94) -> PosDropHandle {
95    crash_handler::setup_panic_handler();
96
97    let mut logger = diem_logger::Logger::new();
98    logger
99        .channel_size(config.logger.chan_size)
100        .is_async(config.logger.is_async)
101        .level(config.logger.level)
102        .read_env();
103    if let Some(log_file) = config.logger.file.clone() {
104        if let Some(parent) = log_file.parent() {
105            fs::create_dir_all(parent).expect(&format!(
106                "error creating PoS log file directory: parent={:?}",
107                parent
108            ));
109        }
110        let writer = match config.logger.rotation_count {
111            Some(count) => Box::new(RollingFileWriter::new(
112                log_file,
113                count,
114                config.logger.rotation_file_size_mb.unwrap_or(500),
115            ))
116                as Box<dyn Writer + Send + Sync + 'static>,
117            None => Box::new(FileWriter::new(log_file))
118                as Box<dyn Writer + Send + Sync + 'static>,
119        };
120        logger.printer(writer);
121    }
122    let _logger = Some(logger.build());
123
124    // Let's now log some important information, since the logger is set up
125    diem_info!(config = config, "Loaded Pos config");
126
127    /*if config.metrics.enabled {
128        for network in &config.full_node_networks {
129            let peer_id = network.peer_id();
130            setup_metrics(peer_id, &config);
131        }
132
133        if let Some(network) = config.validator_network.as_ref() {
134            let peer_id = network.peer_id();
135            setup_metrics(peer_id, &config);
136        }
137    }*/
138    if fail::has_failpoints() {
139        diem_warn!("Failpoints is enabled");
140        if let Some(failpoints) = &config.failpoints {
141            for (point, actions) in failpoints {
142                fail::cfg(point, actions)
143                    .expect("fail to set actions for failpoint");
144            }
145        }
146    } else if config.failpoints.is_some() {
147        diem_warn!("failpoints is set in config, but the binary doesn't compile with this feature");
148    }
149
150    setup_pos_environment(
151        &config,
152        network,
153        protocol_config,
154        own_pos_public_key,
155        pos_genesis_state,
156        consensus_network_receiver,
157        mempool_network_receiver,
158        test_command_receiver,
159        hsb_protocol,
160    )
161}
162
163#[allow(unused)]
164fn setup_metrics(peer_id: PeerId, config: &NodeConfig) {
165    diem_metrics::dump_all_metrics_to_file_periodically(
166        &config.metrics.dir(),
167        &format!("{}.metrics", peer_id),
168        config.metrics.collection_interval_ms,
169    );
170}
171
/// Bring up every PoS component in dependency order and return the
/// [`PosDropHandle`] that owns them:
///
/// 1. Open the PoS ledger DB and, on first start, bootstrap it with the
///    genesis transaction, pivot decision, seed, nodes and committee.
/// 2. Register mempool/consensus reconfiguration subscriptions.
/// 3. Start state sync, then mempool.
/// 4. Block until state sync is initialized, then start consensus.
///
/// # Panics
/// Panics if the DB cannot be opened, if no genesis transaction is
/// provided, or if genesis bootstrapping / state-sync initialization fails.
pub fn setup_pos_environment(
    node_config: &NodeConfig, network: Arc<NetworkService>,
    protocol_config: ProtocolConfiguration,
    own_pos_public_key: Option<(ConsensusPublicKey, ConsensusVRFPublicKey)>,
    pos_genesis_state: GenesisPosState,
    consensus_network_receiver: ConsensusNetworkReceivers,
    mempool_network_receiver: MemPoolNetworkReceivers,
    test_command_receiver: channel::Receiver<TestCommand>,
    hsb_protocol: Arc<HotStuffSynchronizationProtocol>,
) -> PosDropHandle {
    // TODO(lpl): Handle port conflict.
    // let metrics_port = node_config.debug_interface.metrics_server_port;
    // let metric_host = node_config.debug_interface.address.clone();
    // thread::spawn(move || {
    //     metric_server::start_server(metric_host, metrics_port, false)
    // });
    // let public_metrics_port =
    //     node_config.debug_interface.public_metrics_server_port;
    // let public_metric_host = node_config.debug_interface.address.clone();
    // thread::spawn(move || {
    //     metric_server::start_server(
    //         public_metric_host,
    //         public_metrics_port,
    //         true,
    //     )
    // });

    // `instant` is reused below to time each startup phase.
    let mut instant = Instant::now();
    let (pos_ledger_db, db_rw) = DbReaderWriter::wrap(
        PosLedgerDB::open(
            &node_config.storage.dir(),
            false, /* readonly */
            node_config.storage.prune_window,
            node_config.storage.rocksdb_config,
        )
        .expect("DB should open."),
    );

    // If the DB hasn't been bootstrapped yet, commit genesis.
    if let Some(genesis) = get_genesis_txn(&node_config) {
        maybe_bootstrap::<PosVM>(
            &db_rw,
            genesis,
            // The PoW pivot block decision anchoring PoS genesis at height 0.
            Some(PivotBlockDecision {
                block_hash: protocol_config.pos_genesis_pivot_decision,
                height: 0,
            }),
            pos_genesis_state.initial_seed.as_bytes().to_vec(),
            // Convert configured genesis nodes into (NodeID, voting power)
            // pairs expected by the bootstrapper.
            pos_genesis_state
                .initial_nodes
                .into_iter()
                .map(|node| {
                    (NodeID::new(node.bls_key, node.vrf_key), node.voting_power)
                })
                .collect(),
            pos_genesis_state.initial_committee,
        )
        .expect("Db-bootstrapper should not fail.");
    } else {
        panic!("Genesis txn not provided.");
    }

    debug!(
        "Storage service started in {} ms",
        instant.elapsed().as_millis()
    );

    let mut reconfig_subscriptions = vec![];

    let (mempool_reconfig_subscription, mempool_reconfig_events) =
        gen_mempool_reconfig_subscription();
    reconfig_subscriptions.push(mempool_reconfig_subscription);
    // consensus has to subscribe to ALL on-chain configs
    let (consensus_reconfig_subscription, consensus_reconfig_events) =
        gen_consensus_reconfig_subscription();
    if node_config.base.role.is_validator() {
        reconfig_subscriptions.push(consensus_reconfig_subscription);
    }

    // for state sync to send requests to mempool
    let (state_sync_to_mempool_sender, state_sync_requests) =
        channel(INTRA_NODE_CHANNEL_BUFFER_SIZE);
    let state_sync_bootstrapper = StateSyncBootstrapper::bootstrap(
        state_sync_to_mempool_sender,
        Arc::clone(&db_rw.reader),
        node_config,
        reconfig_subscriptions,
    );

    let state_sync_client = state_sync_bootstrapper
        .create_client(node_config.state_sync.client_commit_timeout_ms);

    // Channel for consensus to pull/commit transactions from mempool.
    let (consensus_to_mempool_sender, consensus_requests) =
        channel(INTRA_NODE_CHANNEL_BUFFER_SIZE);

    // Network sender shared (via clone) by mempool and consensus; routes
    // messages through the HotStuff synchronization protocol handler.
    let network_sender = NetworkSender {
        network,
        protocol_handler: hsb_protocol,
    };

    // Client-facing channel for transaction submission; the sender half is
    // returned to the caller as `tx_sender`.
    let (mp_client_sender, mp_client_events) =
        channel(AC_SMP_CHANNEL_BUFFER_SIZE);

    let db_with_cache = Arc::new(CachedPosLedgerDB::new(db_rw));

    instant = Instant::now();
    let mempool = diem_mempool::bootstrap(
        node_config,
        db_with_cache.clone(),
        network_sender.clone(),
        mempool_network_receiver,
        mp_client_events,
        consensus_requests,
        state_sync_requests,
        mempool_reconfig_events,
    );
    debug!("Mempool started in {} ms", instant.elapsed().as_millis());

    // Make sure that state synchronizer is caught up at least to its waypoint
    // (in case it's present). There is no sense to start consensus prior to
    // that. TODO: Note that we need the networking layer to be able to
    // discover & connect to the peers with potentially outdated network
    // identity public keys.
    debug!("Wait until state sync is initialized");
    block_on(state_sync_client.wait_until_initialized())
        .expect("State sync initialization failure");
    debug!("State sync initialization complete.");

    // Initialize and start consensus.
    instant = Instant::now();
    debug!("own_pos_public_key: {:?}", own_pos_public_key);
    let (consensus_runtime, pow_handler, stopped, consensus_db) =
        start_consensus(
            node_config,
            network_sender,
            consensus_network_receiver,
            consensus_to_mempool_sender,
            state_sync_client,
            pos_ledger_db.clone(),
            db_with_cache.clone(),
            consensus_reconfig_events,
            // Derive our account address from the configured keys; fall back
            // to a random address when no key pair is provided (non-voter).
            own_pos_public_key.map_or_else(
                || AccountAddress::random(),
                |public_key| {
                    from_consensus_public_key(&public_key.0, &public_key.1)
                },
            ),
            mp_client_sender.clone(),
            test_command_receiver,
            protocol_config.pos_started_as_voter,
        );
    debug!("Consensus started in {} ms", instant.elapsed().as_millis());

    PosDropHandle {
        pow_handler,
        _consensus_runtime: consensus_runtime,
        stopped,
        _state_sync_bootstrapper: state_sync_bootstrapper,
        _mempool: mempool,
        pos_ledger_db,
        cached_db: db_with_cache,
        consensus_db,
        tx_sender: mp_client_sender,
    }
}
337
impl Drop for PosDropHandle {
    /// Shut down the PoS subsystem: set the shared stop flag (handed out by
    /// `start_consensus`) so components observing it can exit, then stop the
    /// PoW handler. The owned runtimes are dropped afterwards as part of
    /// normal field destruction.
    fn drop(&mut self) {
        debug!("Drop PosDropHandle");
        // Flag first, so loops polling `stopped` wind down before/while the
        // pow handler is stopped.
        self.stopped.store(true, Ordering::SeqCst);
        self.pow_handler.stop();
    }
}