cfxcore/pos/
pos.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use crate::{
9    genesis_block::GenesisPosState,
10    pos::{
11        consensus::{
12            consensus_provider::start_consensus,
13            network::NetworkReceivers as ConsensusNetworkReceivers,
14            ConsensusDB, TestCommand,
15        },
16        mempool as diem_mempool,
17        mempool::{
18            network::NetworkReceivers as MemPoolNetworkReceivers,
19            SubmissionStatus,
20        },
21        pow_handler::PowHandler,
22        protocol::{
23            network_sender::NetworkSender,
24            sync_protocol::HotStuffSynchronizationProtocol,
25        },
26    },
27    sync::ProtocolConfiguration,
28};
29
30use cached_pos_ledger_db::CachedPosLedgerDB;
31use cfx_types::U256;
32use diem_config::config::NodeConfig;
33use diem_logger::{prelude::*, Writer};
34use diem_types::{
35    account_address::AccountAddress,
36    block_info::PivotBlockDecision,
37    chain_id::ChainId,
38    term_state::NodeID,
39    transaction::SignedTransaction,
40    validator_config::{ConsensusPrivateKey, ConsensusVRFPrivateKey},
41};
42use executor::db_bootstrapper::maybe_bootstrap;
43use futures::channel::{
44    mpsc::{self, channel},
45    oneshot,
46};
47use network::NetworkService;
48use pos_ledger_db::PosLedgerDB;
49use std::{
50    boxed::Box,
51    fs,
52    sync::{
53        atomic::{AtomicBool, Ordering},
54        Arc,
55    },
56    time::Instant,
57};
58use storage_interface::DbReaderWriter;
59use tokio::runtime::Runtime;
60
/// Buffer size of the channel whose sender is exposed as
/// `PosDropHandle::tx_sender` and whose receiver is handed to the mempool
/// (`mp_client_events` in `setup_pos_environment`).
const AC_SMP_CHANNEL_BUFFER_SIZE: usize = 1_024;
/// Buffer size of the two channels between consensus and mempool created in
/// `setup_pos_environment` (commit notifications and consensus requests).
const INTRA_NODE_CHANNEL_BUFFER_SIZE: usize = 1;
63
/// This validator's identity and signing secrets.
///
/// These used to live inside `SafetyRulesConfig.test` / `.vrf_private_key`,
/// but they are supplied by the Conflux-side TOML and have nothing to do
/// with per-chain parameters.
///
/// The whole value is passed by move into `start_consensus`.
pub struct PosNodeKeys {
    /// Account address of this node; logged as "pos author" when consensus
    /// is started.
    pub author: AccountAddress,
    /// Consensus private key of this node.
    // NOTE(review): presumably used to sign consensus messages — confirm in
    // `start_consensus`.
    pub consensus_private_key: ConsensusPrivateKey,
    /// VRF private key of this node.
    // NOTE(review): presumably used for VRF-based proposer election —
    // confirm in `start_consensus`.
    pub vrf_private_key: ConsensusVRFPrivateKey,
}
74
/// Chain-wide PoS consensus parameters that every validator must agree on.
///
/// These used to live inside `ConsensusConfig.chain_id` /
/// `SafetyRulesConfig.vrf_proposal_threshold`, but they are not per-operator
/// choices — they're constants of the chain, derived from the network (for
/// `chain_id`) or fixed by protocol (for `vrf_proposal_threshold`).
///
/// The whole value is passed by move into `start_consensus`.
pub struct PosChainParams {
    /// Identifier of the chain, derived from the network.
    pub chain_id: ChainId,
    /// VRF proposal threshold, fixed by protocol.
    // NOTE(review): the comparison direction against the VRF output is not
    // visible in this file — confirm where it is consumed.
    pub vrf_proposal_threshold: U256,
}
85
/// Handle over the running PoS services, returned by
/// `setup_pos_environment`.
///
/// Dropping the handle sets `stopped` to `true` and calls
/// `pow_handler.stop()` (see the `Drop` impl); the fields are then dropped
/// in declaration order, so do not reorder them without checking shutdown
/// behavior.
pub struct PosDropHandle {
    /// PoW handler returned by `start_consensus`; stopped when the handle
    /// is dropped.
    pub pow_handler: Arc<PowHandler>,
    /// The PoS ledger database opened from `node_config.storage`.
    pub pos_ledger_db: Arc<PosLedgerDB>,
    /// Cached wrapper around the ledger DB reader/writer; the same instance
    /// is shared with the mempool and consensus.
    pub cached_db: Arc<CachedPosLedgerDB>,
    /// Consensus database returned by `start_consensus`.
    pub consensus_db: Arc<ConsensusDB>,
    /// Sender for submitting a signed transaction together with a oneshot
    /// channel on which the submission result is reported. The receiving
    /// half is given to the mempool at bootstrap.
    pub tx_sender: mpsc::Sender<(
        SignedTransaction,
        oneshot::Sender<anyhow::Result<SubmissionStatus>>,
    )>,
    /// Stop flag returned by `start_consensus`; set to `true` on drop.
    pub stopped: Arc<AtomicBool>,
    /// Runtime returned by the mempool bootstrap. Never read.
    // NOTE(review): presumably held only to keep the mempool tasks alive
    // for the lifetime of the handle — confirm.
    _mempool: Runtime,
    /// Runtime returned by `start_consensus`. Never read.
    // NOTE(review): presumably held only to keep the consensus tasks alive
    // for the lifetime of the handle — confirm.
    _consensus_runtime: Runtime,
}
100
101pub fn start_pos_consensus(
102    config: &NodeConfig, network: Arc<NetworkService>,
103    protocol_config: ProtocolConfiguration, node_keys: PosNodeKeys,
104    chain_params: PosChainParams, pos_genesis_state: GenesisPosState,
105    consensus_network_receiver: ConsensusNetworkReceivers,
106    mempool_network_receiver: MemPoolNetworkReceivers,
107    test_command_receiver: mpsc::Receiver<TestCommand>,
108    hsb_protocol: Arc<HotStuffSynchronizationProtocol>,
109) -> PosDropHandle {
110    let mut logger = diem_logger::Logger::new();
111    logger
112        .channel_size(config.logger.chan_size)
113        .is_async(config.logger.is_async)
114        .level(config.logger.level)
115        .read_env();
116    if let Some(log_file) = config.logger.file.clone() {
117        if let Some(parent) = log_file.parent() {
118            fs::create_dir_all(parent).expect(&format!(
119                "error creating PoS log file directory: parent={:?}",
120                parent
121            ));
122        }
123        let writer = match config.logger.rotation_count {
124            Some(count) => Box::new(RollingFileWriter::new(
125                log_file,
126                count,
127                config.logger.rotation_file_size_mb.unwrap_or(500),
128            ))
129                as Box<dyn Writer + Send + Sync + 'static>,
130            None => Box::new(FileWriter::new(log_file))
131                as Box<dyn Writer + Send + Sync + 'static>,
132        };
133        logger.printer(writer);
134    }
135    let _logger = Some(logger.build());
136
137    // Let's now log some important information, since the logger is set up
138    diem_info!(config = config, "Loaded Pos config");
139
140    if fail::has_failpoints() {
141        diem_warn!("Failpoints is enabled");
142        if let Some(failpoints) = &config.failpoints {
143            for (point, actions) in failpoints {
144                fail::cfg(point, actions)
145                    .expect("fail to set actions for failpoint");
146            }
147        }
148    } else if config.failpoints.is_some() {
149        diem_warn!("failpoints is set in config, but the binary doesn't compile with this feature");
150    }
151
152    setup_pos_environment(
153        &config,
154        network,
155        protocol_config,
156        node_keys,
157        chain_params,
158        pos_genesis_state,
159        consensus_network_receiver,
160        mempool_network_receiver,
161        test_command_receiver,
162        hsb_protocol,
163    )
164}
165
166pub fn setup_pos_environment(
167    node_config: &NodeConfig, network: Arc<NetworkService>,
168    protocol_config: ProtocolConfiguration, node_keys: PosNodeKeys,
169    chain_params: PosChainParams, pos_genesis_state: GenesisPosState,
170    consensus_network_receiver: ConsensusNetworkReceivers,
171    mempool_network_receiver: MemPoolNetworkReceivers,
172    test_command_receiver: mpsc::Receiver<TestCommand>,
173    hsb_protocol: Arc<HotStuffSynchronizationProtocol>,
174) -> PosDropHandle {
175    let mut instant = Instant::now();
176    let (pos_ledger_db, db_rw) = DbReaderWriter::wrap(
177        PosLedgerDB::open(
178            &node_config.storage.dir(),
179            false, /* readonly */
180            node_config.storage.rocksdb_config,
181        )
182        .expect("DB should open."),
183    );
184
185    // If the DB hasn't been bootstrapped yet, commit genesis.
186    maybe_bootstrap(
187        &db_rw,
188        Some(PivotBlockDecision {
189            block_hash: protocol_config.pos_genesis_pivot_decision,
190            height: 0,
191        }),
192        pos_genesis_state.initial_seed.as_bytes().to_vec(),
193        pos_genesis_state
194            .initial_nodes
195            .into_iter()
196            .map(|node| {
197                (NodeID::new(node.bls_key, node.vrf_key), node.voting_power)
198            })
199            .collect(),
200        pos_genesis_state.initial_committee,
201    )
202    .expect("Db-bootstrapper should not fail.");
203
204    debug!(
205        "Storage service started in {} ms",
206        instant.elapsed().as_millis()
207    );
208
209    // Channel for consensus to notify mempool of committed transactions.
210    let (mempool_commit_sender, mempool_commit_receiver) =
211        channel(INTRA_NODE_CHANNEL_BUFFER_SIZE);
212
213    let (consensus_to_mempool_sender, consensus_requests) =
214        channel(INTRA_NODE_CHANNEL_BUFFER_SIZE);
215
216    let network_sender = NetworkSender {
217        network,
218        protocol_handler: hsb_protocol,
219    };
220
221    let (mp_client_sender, mp_client_events) =
222        channel(AC_SMP_CHANNEL_BUFFER_SIZE);
223
224    let db_with_cache = Arc::new(CachedPosLedgerDB::new(db_rw));
225
226    instant = Instant::now();
227    let mempool = diem_mempool::bootstrap(
228        node_config,
229        db_with_cache.clone(),
230        network_sender.clone(),
231        mempool_network_receiver,
232        mp_client_events,
233        consensus_requests,
234        mempool_commit_receiver,
235    );
236    debug!("Mempool started in {} ms", instant.elapsed().as_millis());
237
238    // Initialize and start consensus.
239    instant = Instant::now();
240    debug!("pos author: {:?}", node_keys.author);
241    let (consensus_runtime, pow_handler, stopped, consensus_db) =
242        start_consensus(
243            node_config,
244            network_sender,
245            consensus_network_receiver,
246            consensus_to_mempool_sender,
247            mempool_commit_sender.clone(),
248            node_config.consensus.mempool_commit_timeout_ms,
249            pos_ledger_db.clone(),
250            db_with_cache.clone(),
251            node_keys,
252            chain_params,
253            mp_client_sender.clone(),
254            test_command_receiver,
255            protocol_config.pos_started_as_voter,
256        );
257    debug!("Consensus started in {} ms", instant.elapsed().as_millis());
258
259    PosDropHandle {
260        pow_handler,
261        _consensus_runtime: consensus_runtime,
262        stopped,
263        _mempool: mempool,
264        pos_ledger_db,
265        cached_db: db_with_cache,
266        consensus_db,
267        tx_sender: mp_client_sender,
268    }
269}
270
271impl Drop for PosDropHandle {
272    fn drop(&mut self) {
273        debug!("Drop PosDropHandle");
274        self.stopped.store(true, Ordering::SeqCst);
275        self.pow_handler.stop();
276    }
277}