1use std::sync::{mpsc, Arc, Weak};
2
3use once_cell::sync::OnceCell;
4
5use cfx_types::{H256, U256, U64};
6use diem_config::{config::NodeConfig, keys::ConfigKey};
7use diem_crypto::HashValue;
8use diem_types::{
9 contract_event::ContractEvent,
10 epoch_state::EpochState,
11 reward_distribution_event::RewardDistributionEventV2,
12 term_state::{DisputeEvent, UnlockEvent},
13 validator_config::{ConsensusPrivateKey, ConsensusVRFPrivateKey},
14};
15use keccak_hash::keccak;
16use primitives::pos::{NodeId, PosBlockId};
17use storage_interface::{DBReaderForPoW, DbReader};
18
19use crate::{
20 genesis_block::GenesisPosState,
21 pos::{
22 consensus::{
23 network::{
24 NetworkReceivers as ConsensusNetworkReceivers,
25 NetworkTask as ConsensusNetworkTask,
26 },
27 ConsensusDB, TestCommand,
28 },
29 mempool::network::{
30 NetworkReceivers as MemPoolNetworkReceivers,
31 NetworkTask as MempoolNetworkTask,
32 },
33 pos::{start_pos_consensus, PosDropHandle},
34 protocol::sync_protocol::HotStuffSynchronizationProtocol,
35 },
36 sync::ProtocolConfiguration,
37 ConsensusGraph,
38};
39
40use cached_pos_ledger_db::CachedPosLedgerDB;
41use consensus_types::block::Block;
42use diem_config::config::SafetyRulesTestConfig;
43use diem_types::{
44 account_address::from_consensus_public_key,
45 block_info::{PivotBlockDecision, Round},
46 chain_id::ChainId,
47 epoch_state::HARDCODED_COMMITTEE_FOR_EPOCH,
48 term_state::pos_state_config::{PosStateConfig, POS_STATE_CONFIG},
49 transaction::TransactionPayload,
50};
51use network::NetworkService;
52use parking_lot::Mutex;
53use pos_ledger_db::PosLedgerDB;
54use std::{fs, io::Read, path::PathBuf};
55
/// Alternative name under which [`PosHandler`] is exported.
pub type PosVerifier = PosHandler;
57
/// Read access to PoS chain data, as consumed by [`PosHandler`].
///
/// The handler stores an implementation as `Box<dyn PosInterface>`, hence
/// the `Send + Sync` bound.
pub trait PosInterface: Send + Sync {
    /// Performs any setup the implementation needs.
    fn initialize(&self) -> Result<(), String>;

    /// Looks up the committed PoS block identified by `h`.
    ///
    /// Returns `None` when the implementation cannot provide such a block.
    fn get_committed_block(&self, h: &PosBlockId) -> Option<PosBlock>;

    /// Returns the id of the latest PoS block known to the implementation.
    fn latest_block(&self) -> PosBlockId;

    /// Returns the contract events for the span delimited by the blocks
    /// `from` and `to`.
    ///
    /// NOTE(review): whether the endpoints are inclusive is decided by the
    /// implementation — confirm before relying on it.
    fn get_events(
        &self, from: &PosBlockId, to: &PosBlockId,
    ) -> Vec<ContractEvent>;

    /// Returns the ids of the epoch-ending blocks for the epoch range
    /// described by `start_epoch` and `end_epoch`.
    fn get_epoch_ending_blocks(
        &self, start_epoch: u64, end_epoch: u64,
    ) -> Vec<PosBlockId>;

    /// Returns the reward distribution event of `epoch`, if one is available.
    fn get_reward_event(&self, epoch: u64)
        -> Option<RewardDistributionEventV2>;

    /// Returns the epoch state associated with `block_id`.
    fn get_epoch_state(&self, block_id: &PosBlockId) -> EpochState;

    /// Returns the underlying PoS ledger database.
    fn pos_ledger_db(&self) -> &Arc<PosLedgerDB>;

    /// Returns the underlying consensus database.
    fn consensus_db(&self) -> &Arc<ConsensusDB>;

    /// Returns the underlying cached PoS ledger database.
    fn cached_db(&self) -> &Arc<CachedPosLedgerDB>;
}
96
/// Summary of a committed PoS block, as returned by
/// `PosInterface::get_committed_block`.
// NOTE(review): `hash` and `version` are never read in the code visible in
// this file, which is presumably why the `unused` lint is silenced.
#[allow(unused)]
pub struct PosBlock {
    /// Id the block was looked up by.
    hash: PosBlockId,
    /// Epoch of the committed block.
    epoch: u64,
    /// Round of the committed block.
    round: u64,
    /// Block hash taken from the committed block's pivot decision.
    pivot_decision: H256,
    /// Version recorded for the committed block.
    version: u64,
    /// View recorded for the committed block.
    view: u64,
}
109
/// Owns the PoS consensus instance and answers PoS-related queries.
///
/// The handler is built by `new` and only becomes usable for queries once
/// `initialize` has stored a backend in `pos`.
pub struct PosHandler {
    /// Backend answering PoS queries; written once by `initialize`.
    pos: OnceCell<Box<dyn PosInterface>>,
    /// Network service passed to `new`; taken out by `initialize` or `stop`.
    network: Mutex<Option<Arc<NetworkService>>>,
    /// Handle returned when PoS consensus is started; taken out by `stop`.
    drop_handle: Mutex<Option<PosDropHandle>>,
    /// Receiving side of the consensus network task created in `new`;
    /// consumed by `initialize`.
    consensus_network_receiver: Mutex<Option<ConsensusNetworkReceivers>>,
    /// Receiving side of the mempool network task created in `new`;
    /// consumed by `initialize`.
    mempool_network_receiver: Mutex<Option<MemPoolNetworkReceivers>>,
    /// Sender for `TestCommand`s; populated by `initialize`.
    test_command_sender: Mutex<Option<channel::Sender<TestCommand>>>,
    /// Lowest height for which `is_enabled_at_height` reports `true`.
    enable_height: u64,
    /// Synchronization protocol registered on the network in `new`; `None`
    /// when the handler was built without a network.
    hsb_protocol_handler: Option<Arc<HotStuffSynchronizationProtocol>>,
    /// Configuration the handler was created with.
    pub conf: PosConfiguration,
}
122
123impl PosHandler {
124 pub fn new(
125 network: Option<Arc<NetworkService>>, conf: PosConfiguration,
126 enable_height: u64,
127 ) -> Self {
128 let mut pos = Self {
129 pos: OnceCell::new(),
130 network: Mutex::new(network.clone()),
131 drop_handle: Mutex::new(None),
132 consensus_network_receiver: Mutex::new(None),
133 mempool_network_receiver: Mutex::new(None),
134 test_command_sender: Mutex::new(None),
135 enable_height,
136 hsb_protocol_handler: None,
137 conf,
138 };
139 if let Some(network) = &network {
140 let (consensus_network_task, consensus_network_receiver) =
142 ConsensusNetworkTask::new();
143 let (mempool_network_task, mempool_network_receiver) =
144 MempoolNetworkTask::new();
145 let own_node_hash = keccak(
146 network.net_key_pair().expect("Error node key").public(),
147 );
148 let protocol_handler =
149 Arc::new(HotStuffSynchronizationProtocol::new(
150 own_node_hash,
151 consensus_network_task,
152 mempool_network_task,
153 pos.conf.protocol_conf.clone(),
154 ));
155 protocol_handler.clone().register(network.clone()).unwrap();
156 *pos.consensus_network_receiver.lock() =
157 Some(consensus_network_receiver);
158 *pos.mempool_network_receiver.lock() =
159 Some(mempool_network_receiver);
160 pos.hsb_protocol_handler = Some(protocol_handler);
161 }
162 pos
163 }
164
165 pub fn initialize(
166 &self, consensus: Arc<ConsensusGraph>,
167 ) -> Result<(), String> {
168 if self.pos.get().is_some() {
169 warn!("Initializing already-initialized PosHandler!");
170 return Ok(());
171 }
172 let pos_config_path = match self.conf.diem_conf_path.as_ref() {
173 Some(path) => PathBuf::from(path),
174 None => bail!("No pos config!"),
175 };
176
177 POS_STATE_CONFIG
178 .set(self.conf.pos_state_config.clone())
179 .map_err(|e| {
180 format!("Failed to set pos state config: e={:?}", e)
181 })?;
182 let mut pos_config = NodeConfig::load(pos_config_path)
183 .map_err(|e| format!("Failed to load node config: e={:?}", e))?;
184 HARDCODED_COMMITTEE_FOR_EPOCH
185 .set(pos_config.consensus.hardcoded_epoch_committee.clone())
186 .map_err(|e| {
187 format!("Failed to set hardcoded_epoch_committee: e={:?}", e)
188 })?;
189
190 pos_config.set_data_dir(pos_config.data_dir().to_path_buf());
191 let pos_genesis = read_initial_nodes_from_file(
192 self.conf.pos_initial_nodes_path.as_str(),
193 )?;
194 let network = self.network.lock().take().expect("pos not initialized");
195 let (test_command_sender, test_command_receiver) =
196 channel::new_test(1024);
197
198 pos_config.consensus.safety_rules.test = Some(SafetyRulesTestConfig {
199 author: from_consensus_public_key(
200 &self.conf.bls_key.public_key(),
201 &self.conf.vrf_key.public_key(),
202 ),
203 consensus_key: Some(self.conf.bls_key.clone()),
204 execution_key: Some(self.conf.bls_key.clone()),
205 waypoint: Some(pos_config.base.waypoint.waypoint()),
206 });
207 pos_config.consensus.safety_rules.vrf_private_key =
208 Some(self.conf.vrf_key.clone());
209 pos_config.consensus.safety_rules.export_consensus_key = true;
210 pos_config.consensus.safety_rules.vrf_proposal_threshold =
211 self.conf.vrf_proposal_threshold;
212 pos_config.consensus.chain_id = ChainId::new(network.network_id());
213
214 let pos_drop_handle = start_pos_consensus(
215 &pos_config,
216 network,
217 self.conf.protocol_conf.clone(),
218 Some((
219 self.conf.bls_key.public_key(),
220 self.conf.vrf_key.public_key(),
221 )),
222 pos_genesis,
223 self.consensus_network_receiver
224 .lock()
225 .take()
226 .expect("not initialized"),
227 self.mempool_network_receiver
228 .lock()
229 .take()
230 .expect("not initialized"),
231 test_command_receiver,
232 self.hsb_protocol_handler.clone().expect("set in new"),
233 );
234 debug!("PoS initialized");
235 let pos_connection = PosConnection::new(
236 pos_drop_handle.pos_ledger_db.clone(),
237 pos_drop_handle.consensus_db.clone(),
238 pos_drop_handle.cached_db.clone(),
239 );
240 pos_drop_handle.pow_handler.initialize(consensus);
241 if self.pos.set(Box::new(pos_connection)).is_err() {
242 bail!("PoS initialized twice!");
243 }
244 *self.test_command_sender.lock() = Some(test_command_sender);
245 *self.drop_handle.lock() = Some(pos_drop_handle);
246 Ok(())
247 }
248
249 pub fn config(&self) -> &PosConfiguration { &self.conf }
250
251 fn pos(&self) -> &Box<dyn PosInterface> { self.pos.get().unwrap() }
252
253 pub fn pos_option(&self) -> Option<&Box<dyn PosInterface>> {
254 self.pos.get()
255 }
256
257 pub fn is_enabled_at_height(&self, height: u64) -> bool {
258 height >= self.enable_height
259 }
260
261 pub fn enable_height(&self) -> u64 { self.enable_height }
262
263 pub fn is_committed(&self, h: &PosBlockId) -> bool {
264 self.pos().get_committed_block(h).is_some()
265 }
266
267 pub fn verify_against_predecessors(
275 &self, me: &PosBlockId, preds: &Vec<PosBlockId>,
276 ) -> bool {
277 let me_round = match self.pos().get_committed_block(me) {
278 None => {
279 warn!("No pos block for me={:?}", me);
280 return false;
281 }
282 Some(b) => (b.epoch, b.round),
283 };
284 for p in preds {
285 let p_round = match self.pos().get_committed_block(p) {
286 None => {
287 warn!("No pos block for pred={:?}", p);
288 return false;
289 }
290 Some(b) => (b.epoch, b.round),
291 };
292 if me_round < p_round {
293 warn!("Incorrect round: me={:?}, pred={:?}", me_round, p_round);
294 return false;
295 }
296 }
297 true
298 }
299
300 pub fn get_pivot_decision(&self, h: &PosBlockId) -> Option<H256> {
301 self.pos
303 .get()?
304 .get_committed_block(h)
305 .map(|b| b.pivot_decision)
306 }
307
308 pub fn get_latest_pos_reference(&self) -> PosBlockId {
309 self.pos().latest_block()
310 }
311
312 pub fn get_pos_view(&self, h: &PosBlockId) -> Option<u64> {
313 self.pos().get_committed_block(h).map(|b| b.view)
314 }
315
316 pub fn get_unlock_nodes(
317 &self, h: &PosBlockId, parent_pos_ref: &PosBlockId,
318 ) -> Vec<(NodeId, u64)> {
319 let unlock_event_key = UnlockEvent::event_key();
320 let mut unlock_nodes = Vec::new();
321 for event in self.pos().get_events(parent_pos_ref, h) {
322 if *event.key() == unlock_event_key {
323 let unlock_event = UnlockEvent::from_bytes(event.event_data())
324 .expect("key checked");
325 let node_id = H256::from_slice(unlock_event.node_id.as_ref());
326 let votes = unlock_event.unlocked;
327 unlock_nodes.push((node_id, votes));
328 }
329 }
330 unlock_nodes
331 }
332
333 pub fn get_disputed_nodes(
334 &self, h: &PosBlockId, parent_pos_ref: &PosBlockId,
335 ) -> Vec<NodeId> {
336 let dispute_event_key = DisputeEvent::event_key();
337 let mut disputed_nodes = Vec::new();
338 for event in self.pos().get_events(parent_pos_ref, h) {
339 if *event.key() == dispute_event_key {
340 let dispute_event =
341 DisputeEvent::from_bytes(event.event_data())
342 .expect("key checked");
343 disputed_nodes
344 .push(H256::from_slice(dispute_event.node_id.as_ref()));
345 }
346 }
347 disputed_nodes
348 }
349
350 pub fn get_reward_distribution_event(
351 &self, h: &PosBlockId, parent_pos_ref: &PosBlockId,
352 ) -> Option<Vec<(u64, RewardDistributionEventV2)>> {
353 if h == parent_pos_ref {
354 return None;
355 }
356 let me_block = self.pos().get_committed_block(h)?;
357 let parent_block = self.pos().get_committed_block(parent_pos_ref)?;
358 if me_block.epoch == parent_block.epoch {
359 return None;
360 }
361 let mut events = Vec::new();
362 for epoch in parent_block.epoch..me_block.epoch {
363 events.push((epoch, self.pos().get_reward_event(epoch)?));
364 }
365 Some(events)
366 }
367
368 pub fn pos_ledger_db(&self) -> &Arc<PosLedgerDB> {
369 self.pos().pos_ledger_db()
370 }
371
372 pub fn consensus_db(&self) -> &Arc<ConsensusDB> {
373 self.pos().consensus_db()
374 }
375
376 pub fn cached_db(&self) -> &Arc<CachedPosLedgerDB> {
377 self.pos().cached_db()
378 }
379
380 pub fn stop(&self) -> Option<(Weak<PosLedgerDB>, Weak<ConsensusDB>)> {
381 self.network.lock().take();
382 self.consensus_network_receiver.lock().take();
383 self.mempool_network_receiver.lock().take();
384 self.drop_handle.lock().take().map(|pos_drop_handle| {
385 let pos_ledger_db = pos_drop_handle.pos_ledger_db.clone();
386 let consensus_db = pos_drop_handle.consensus_db.clone();
387 (
388 Arc::downgrade(&pos_ledger_db),
389 Arc::downgrade(&consensus_db),
390 )
391 })
392 }
393}
394
395impl PosHandler {
397 pub fn force_vote_proposal(&self, block_id: H256) -> anyhow::Result<()> {
398 self.test_command_sender
399 .lock()
400 .as_mut()
401 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
402 .try_send(TestCommand::ForceVoteProposal(h256_to_diem_hash(
403 &block_id,
404 )))
405 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
406 }
407
408 pub fn force_propose(
409 &self, round: U64, parent_block_id: H256,
410 payload: Vec<TransactionPayload>,
411 ) -> anyhow::Result<()> {
412 self.test_command_sender
413 .lock()
414 .as_mut()
415 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
416 .try_send(TestCommand::ForcePropose {
417 round: round.as_u64(),
418 parent_id: h256_to_diem_hash(&parent_block_id),
419 payload,
420 })
421 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
422 }
423
424 pub fn trigger_timeout(&self, timeout_type: String) -> anyhow::Result<()> {
425 let command = match timeout_type.as_str() {
426 "local" => TestCommand::LocalTimeout,
427 "proposal" => TestCommand::ProposalTimeOut,
428 "new_round" => TestCommand::NewRoundTimeout,
429 _ => anyhow::bail!("Unknown timeout type"),
430 };
431 self.test_command_sender
432 .lock()
433 .as_mut()
434 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
435 .try_send(command)
436 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
437 }
438
439 pub fn force_sign_pivot_decision(
440 &self, pivot_decision: PivotBlockDecision,
441 ) -> anyhow::Result<()> {
442 self.test_command_sender
443 .lock()
444 .as_mut()
445 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
446 .try_send(TestCommand::BroadcastPivotDecision(pivot_decision))
447 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
448 }
449
450 pub fn get_chosen_proposal(&self) -> anyhow::Result<Option<Block>> {
451 let (tx, rx) = mpsc::sync_channel(1);
452 self.test_command_sender
453 .lock()
454 .as_mut()
455 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
456 .try_send(TestCommand::GetChosenProposal(tx))
457 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
458 rx.recv().map_err(|e| anyhow::anyhow!("recv: err={:?}", e))
459 }
460
461 pub fn stop_election(&self) -> anyhow::Result<Option<Round>> {
462 let (tx, rx) = mpsc::sync_channel(1);
463 self.test_command_sender
464 .lock()
465 .as_mut()
466 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
467 .try_send(TestCommand::StopElection(tx))
468 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
469 rx.recv().map_err(|e| anyhow::anyhow!("recv: err={:?}", e))
470 }
471
472 pub fn start_voting(&self, initialize: bool) -> anyhow::Result<()> {
473 let (tx, rx) = mpsc::sync_channel(1);
474 self.test_command_sender
475 .lock()
476 .as_mut()
477 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
478 .try_send(TestCommand::StartVoting((initialize, tx)))
479 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
480 rx.recv()?
481 }
482
483 pub fn stop_voting(&self) -> anyhow::Result<()> {
484 let (tx, rx) = mpsc::sync_channel(1);
485 self.test_command_sender
486 .lock()
487 .as_mut()
488 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
489 .try_send(TestCommand::StopVoting(tx))
490 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
491 rx.recv()?
492 }
493
494 pub fn voting_status(&self) -> anyhow::Result<bool> {
495 let (tx, rx) = mpsc::sync_channel(1);
496 self.test_command_sender
497 .lock()
498 .as_mut()
499 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
500 .try_send(TestCommand::GetVotingStatus(tx))
501 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
502 Ok(rx.recv()?)
503 }
504}
505
/// [`PosInterface`] implementation that answers queries from the PoS
/// databases.
pub struct PosConnection {
    /// Ledger database used for block, event, reward and state lookups.
    pos_storage: Arc<PosLedgerDB>,
    /// Consensus database, handed out by `consensus_db`.
    consensus_db: Arc<ConsensusDB>,
    /// Cached ledger database, handed out by `cached_db`.
    pos_cache_db: Arc<CachedPosLedgerDB>,
}
511
impl PosConnection {
    /// Bundles the three database handles into a connection.
    pub fn new(
        pos_storage: Arc<PosLedgerDB>, consensus_db: Arc<ConsensusDB>,
        pos_cache_db: Arc<CachedPosLedgerDB>,
    ) -> Self {
        Self {
            pos_storage,
            consensus_db,
            pos_cache_db,
        }
    }
}
524
525impl PosInterface for PosConnection {
526 fn initialize(&self) -> Result<(), String> { Ok(()) }
527
528 fn get_committed_block(&self, h: &PosBlockId) -> Option<PosBlock> {
529 debug!("get_committed_block: {:?}", h);
530 let block_hash = h256_to_diem_hash(h);
531 let committed_block = self
532 .pos_storage
533 .get_committed_block_by_hash(&block_hash)
534 .ok()?;
535
536 debug!("pos_handler gets committed_block={:?}", committed_block);
559 Some(PosBlock {
560 hash: *h,
561 epoch: committed_block.epoch,
562 round: committed_block.round,
563 pivot_decision: committed_block.pivot_decision.block_hash,
564 view: committed_block.view,
565 version: committed_block.version,
573 })
574 }
575
576 fn latest_block(&self) -> PosBlockId {
577 diem_hash_to_h256(
578 &self
579 .pos_storage
580 .get_latest_ledger_info_option()
581 .expect("Initialized")
582 .ledger_info()
583 .consensus_block_id(),
584 )
585 }
586
587 fn get_events(
588 &self, from: &PosBlockId, to: &PosBlockId,
589 ) -> Vec<ContractEvent> {
590 let start_version = self
591 .pos_storage
592 .get_committed_block_by_hash(&h256_to_diem_hash(from))
593 .expect("err reading ledger info for from")
594 .version;
595 let end_version = self
596 .pos_storage
597 .get_committed_block_by_hash(&h256_to_diem_hash(to))
598 .expect("err reading ledger info for to")
599 .version;
600 self.pos_storage
601 .get_events_by_version(start_version, end_version)
602 .expect("err reading events")
603 }
604
605 fn get_epoch_ending_blocks(
606 &self, start_epoch: u64, end_epoch: u64,
607 ) -> Vec<PosBlockId> {
608 self.pos_storage
609 .get_epoch_ending_blocks(start_epoch, end_epoch)
610 .expect("err reading epoch ending blocks")
611 .into_iter()
612 .map(|h| diem_hash_to_h256(&h))
613 .collect()
614 }
615
616 fn get_reward_event(
617 &self, epoch: u64,
618 ) -> Option<RewardDistributionEventV2> {
619 self.pos_storage.get_reward_event(epoch).ok()
620 }
621
622 fn get_epoch_state(&self, block_id: &PosBlockId) -> EpochState {
623 self.pos_storage
624 .get_pos_state(&h256_to_diem_hash(block_id))
625 .expect("parent of an ending_epoch block")
626 .epoch_state()
627 .clone()
628 }
629
630 fn pos_ledger_db(&self) -> &Arc<PosLedgerDB> { &self.pos_storage }
631
632 fn consensus_db(&self) -> &Arc<ConsensusDB> { &self.consensus_db }
633
634 fn cached_db(&self) -> &Arc<CachedPosLedgerDB> { &self.pos_cache_db }
635}
636
/// Settings consumed by [`PosHandler`] when it sets up PoS consensus.
pub struct PosConfiguration {
    /// BLS private key; `PosHandler::initialize` installs it as both the
    /// consensus key and the execution key of the safety rules.
    pub bls_key: ConfigKey<ConsensusPrivateKey>,
    /// VRF private key installed into the safety rules.
    pub vrf_key: ConfigKey<ConsensusVRFPrivateKey>,
    /// Path of the node config file; initialization fails when it is `None`.
    pub diem_conf_path: Option<String>,
    /// Protocol settings passed to the synchronization protocol and to the
    /// PoS consensus start-up.
    pub protocol_conf: ProtocolConfiguration,
    /// Path of the JSON file holding the initial PoS nodes.
    pub pos_initial_nodes_path: String,
    /// Value copied into the safety rules' `vrf_proposal_threshold`.
    pub vrf_proposal_threshold: U256,
    /// Value installed into the global `POS_STATE_CONFIG`.
    pub pos_state_config: PosStateConfig,
}
646
647fn diem_hash_to_h256(h: &HashValue) -> PosBlockId { H256::from(h.as_ref()) }
648fn h256_to_diem_hash(h: &PosBlockId) -> HashValue {
649 HashValue::new(h.to_fixed_bytes())
650}
651
652pub fn save_initial_nodes_to_file(path: &str, genesis_nodes: GenesisPosState) {
653 fs::write(path, serde_json::to_string(&genesis_nodes).unwrap()).unwrap();
654}
655
656pub fn read_initial_nodes_from_file(
657 path: &str,
658) -> Result<GenesisPosState, String> {
659 let mut file = fs::File::open(path)
660 .map_err(|e| format!("failed to open initial nodes file: {:?}", e))?;
661
662 let mut nodes_str = String::new();
663 file.read_to_string(&mut nodes_str)
664 .map_err(|e| format!("failed to read initial nodes file: {:?}", e))?;
665
666 serde_json::from_str(nodes_str.as_str())
667 .map_err(|e| format!("failed to parse initial nodes file: {:?}", e))
668}