1use std::sync::{mpsc, Arc, Weak};
2
3use futures::channel::mpsc as futures_mpsc;
4use once_cell::sync::OnceCell;
5
6use cfx_types::{H256, U256, U64};
7use diem_config::{config::NodeConfig, keys::ConfigKey};
8use diem_crypto::HashValue;
9use diem_types::{
10 contract_event::ContractEvent,
11 epoch_state::EpochState,
12 reward_distribution_event::RewardDistributionEventV2,
13 term_state::{DisputeEvent, UnlockEvent},
14 validator_config::{ConsensusPrivateKey, ConsensusVRFPrivateKey},
15};
16use keccak_hash::keccak;
17use primitives::pos::{NodeId, PosBlockId};
18use storage_interface::{DBReaderForPoW, DbReader};
19
20use crate::{
21 genesis_block::GenesisPosState,
22 pos::{
23 consensus::{
24 network::{
25 NetworkReceivers as ConsensusNetworkReceivers,
26 NetworkTask as ConsensusNetworkTask,
27 },
28 ConsensusDB, TestCommand,
29 },
30 mempool::network::{
31 NetworkReceivers as MemPoolNetworkReceivers,
32 NetworkTask as MempoolNetworkTask,
33 },
34 pos::{
35 start_pos_consensus, PosChainParams, PosDropHandle, PosNodeKeys,
36 },
37 protocol::sync_protocol::HotStuffSynchronizationProtocol,
38 },
39 sync::ProtocolConfiguration,
40 ConsensusGraph,
41};
42
43use cached_pos_ledger_db::CachedPosLedgerDB;
44use consensus_types::block::Block;
45use diem_types::{
46 account_address::from_consensus_public_key,
47 block_info::{PivotBlockDecision, Round},
48 chain_id::ChainId,
49 epoch_state::HARDCODED_COMMITTEE_FOR_EPOCH,
50 term_state::pos_state_config::{PosStateConfig, POS_STATE_CONFIG},
51 transaction::TransactionPayload,
52};
53use network::NetworkService;
54use parking_lot::Mutex;
55use pos_ledger_db::PosLedgerDB;
56use std::{fs, io::Read, path::PathBuf};
57
/// Alias of [`PosHandler`]; both names refer to the same type.
pub type PosVerifier = PosHandler;
59
/// Read-side abstraction over a PoS chain backend.
///
/// `PosHandler` stores one boxed implementor and answers its queries through
/// it; `PosConnection` is the implementation defined in this file.
/// Implementors must be `Send + Sync`.
pub trait PosInterface: Send + Sync {
    /// Performs implementation-specific setup, reporting failure as a
    /// message string.
    fn initialize(&self) -> Result<(), String>;

    /// Looks up the committed PoS block identified by `h`.
    ///
    /// Returns `None` when the implementation cannot provide the block.
    fn get_committed_block(&self, h: &PosBlockId) -> Option<PosBlock>;

    /// Returns the identifier of the latest PoS block known to the backend.
    fn latest_block(&self) -> PosBlockId;

    /// Returns the contract events for the span delimited by blocks `from`
    /// and `to`.
    ///
    /// NOTE(review): whether either boundary is inclusive is decided by the
    /// implementation and is not visible here — confirm before relying on it.
    fn get_events(
        &self, from: &PosBlockId, to: &PosBlockId,
    ) -> Vec<ContractEvent>;

    /// Returns the identifiers of the epoch-ending blocks for the epoch range
    /// `start_epoch`..`end_epoch`.
    ///
    /// NOTE(review): inclusiveness of `end_epoch` is implementation-defined —
    /// confirm.
    fn get_epoch_ending_blocks(
        &self, start_epoch: u64, end_epoch: u64,
    ) -> Vec<PosBlockId>;

    /// Returns the reward distribution event recorded for `epoch`, or `None`
    /// if the implementation cannot provide one.
    fn get_reward_event(&self, epoch: u64)
        -> Option<RewardDistributionEventV2>;

    /// Returns the epoch state associated with block `block_id`.
    fn get_epoch_state(&self, block_id: &PosBlockId) -> EpochState;

    /// Borrows the shared handle to the PoS ledger database.
    fn pos_ledger_db(&self) -> &Arc<PosLedgerDB>;

    /// Borrows the shared handle to the consensus database.
    fn consensus_db(&self) -> &Arc<ConsensusDB>;

    /// Borrows the shared handle to the cached PoS ledger database.
    fn cached_db(&self) -> &Arc<CachedPosLedgerDB>;
}
98
/// The subset of a committed PoS block's data that this module hands out.
///
/// Instances are built by `PosConnection::get_committed_block` from the
/// committed-block record read out of the PoS ledger database.
// NOTE(review): `hash` and `version` are not read anywhere in the visible
// code, which is presumably why the unused lint is silenced — confirm.
#[allow(unused)]
pub struct PosBlock {
    /// Identifier the block was looked up by.
    hash: PosBlockId,
    /// Epoch of the committed block.
    epoch: u64,
    /// Round of the committed block.
    round: u64,
    /// Block hash of the pivot decision stored with the committed block.
    pivot_decision: H256,
    /// Version recorded for the committed block.
    version: u64,
    /// View recorded for the committed block.
    view: u64,
}
111
/// Owner of the PoS consensus lifecycle and entry point for PoS queries.
///
/// The handler is created by `PosHandler::new`, started by
/// `PosHandler::initialize`, and torn down by `PosHandler::stop`.
pub struct PosHandler {
    /// Query backend; set exactly once by `initialize`.
    pos: OnceCell<Box<dyn PosInterface>>,
    /// Network service supplied at construction; taken by `initialize`
    /// (handed to consensus) or cleared by `stop`.
    network: Mutex<Option<Arc<NetworkService>>>,
    /// Handle returned by `start_pos_consensus`; stored by `initialize` and
    /// taken by `stop`.
    drop_handle: Mutex<Option<PosDropHandle>>,
    /// Consensus network receiver created in `new` (only when a network is
    /// given); consumed by `initialize` or cleared by `stop`.
    consensus_network_receiver: Mutex<Option<ConsensusNetworkReceivers>>,
    /// Mempool network receiver created in `new` (only when a network is
    /// given); consumed by `initialize` or cleared by `stop`.
    mempool_network_receiver: Mutex<Option<MemPoolNetworkReceivers>>,
    /// Sending half of the `TestCommand` channel; populated at the end of
    /// `initialize` and used by the test-command methods.
    test_command_sender: Mutex<Option<futures_mpsc::Sender<TestCommand>>>,
    /// Threshold compared against in `is_enabled_at_height`.
    enable_height: u64,
    /// Protocol handler registered on the network in `new`; `None` when no
    /// network was supplied.
    hsb_protocol_handler: Option<Arc<HotStuffSynchronizationProtocol>>,
    /// Static PoS configuration supplied at construction.
    pub conf: PosConfiguration,
}
124
125impl PosHandler {
126 pub fn new(
127 network: Option<Arc<NetworkService>>, conf: PosConfiguration,
128 enable_height: u64,
129 ) -> Self {
130 let mut pos = Self {
131 pos: OnceCell::new(),
132 network: Mutex::new(network.clone()),
133 drop_handle: Mutex::new(None),
134 consensus_network_receiver: Mutex::new(None),
135 mempool_network_receiver: Mutex::new(None),
136 test_command_sender: Mutex::new(None),
137 enable_height,
138 hsb_protocol_handler: None,
139 conf,
140 };
141 if let Some(network) = &network {
142 let (consensus_network_task, consensus_network_receiver) =
144 ConsensusNetworkTask::new();
145 let (mempool_network_task, mempool_network_receiver) =
146 MempoolNetworkTask::new();
147 let own_node_hash = keccak(
148 network.net_key_pair().expect("Error node key").public(),
149 );
150 let protocol_handler =
151 Arc::new(HotStuffSynchronizationProtocol::new(
152 own_node_hash,
153 consensus_network_task,
154 mempool_network_task,
155 pos.conf.protocol_conf.clone(),
156 ));
157 protocol_handler.clone().register(network.clone()).unwrap();
158 *pos.consensus_network_receiver.lock() =
159 Some(consensus_network_receiver);
160 *pos.mempool_network_receiver.lock() =
161 Some(mempool_network_receiver);
162 pos.hsb_protocol_handler = Some(protocol_handler);
163 }
164 pos
165 }
166
167 pub fn initialize(
168 &self, consensus: Arc<ConsensusGraph>,
169 ) -> Result<(), String> {
170 if self.pos.get().is_some() {
171 warn!("Initializing already-initialized PosHandler!");
172 return Ok(());
173 }
174 let pos_config_path = match self.conf.diem_conf_path.as_ref() {
175 Some(path) => PathBuf::from(path),
176 None => bail!("No pos config!"),
177 };
178
179 POS_STATE_CONFIG
180 .set(self.conf.pos_state_config.clone())
181 .map_err(|e| {
182 format!("Failed to set pos state config: e={:?}", e)
183 })?;
184 let mut pos_config = NodeConfig::load(pos_config_path)
185 .map_err(|e| format!("Failed to load node config: e={:?}", e))?;
186 HARDCODED_COMMITTEE_FOR_EPOCH
187 .set(pos_config.consensus.hardcoded_epoch_committee.clone())
188 .map_err(|e| {
189 format!("Failed to set hardcoded_epoch_committee: e={:?}", e)
190 })?;
191
192 pos_config.set_data_dir(pos_config.data_dir().to_path_buf());
193 let pos_genesis = read_initial_nodes_from_file(
194 self.conf.pos_initial_nodes_path.as_str(),
195 )?;
196 let network = self.network.lock().take().expect("pos not initialized");
197 let (test_command_sender, test_command_receiver) =
198 futures_mpsc::channel(1024);
199
200 let node_keys = PosNodeKeys {
201 author: from_consensus_public_key(
202 &self.conf.bls_key.public_key(),
203 &self.conf.vrf_key.public_key(),
204 ),
205 consensus_private_key: self.conf.bls_key.private_key(),
206 vrf_private_key: self.conf.vrf_key.private_key(),
207 };
208 let chain_params = PosChainParams {
209 chain_id: ChainId::new(network.network_id()),
210 vrf_proposal_threshold: self.conf.vrf_proposal_threshold,
211 };
212
213 let pos_drop_handle = start_pos_consensus(
214 &pos_config,
215 network,
216 self.conf.protocol_conf.clone(),
217 node_keys,
218 chain_params,
219 pos_genesis,
220 self.consensus_network_receiver
221 .lock()
222 .take()
223 .expect("not initialized"),
224 self.mempool_network_receiver
225 .lock()
226 .take()
227 .expect("not initialized"),
228 test_command_receiver,
229 self.hsb_protocol_handler.clone().expect("set in new"),
230 );
231 debug!("PoS initialized");
232 let pos_connection = PosConnection::new(
233 pos_drop_handle.pos_ledger_db.clone(),
234 pos_drop_handle.consensus_db.clone(),
235 pos_drop_handle.cached_db.clone(),
236 );
237 pos_drop_handle.pow_handler.initialize(consensus);
238 if self.pos.set(Box::new(pos_connection)).is_err() {
239 bail!("PoS initialized twice!");
240 }
241 *self.test_command_sender.lock() = Some(test_command_sender);
242 *self.drop_handle.lock() = Some(pos_drop_handle);
243 Ok(())
244 }
245
246 pub fn config(&self) -> &PosConfiguration { &self.conf }
247
248 fn pos(&self) -> &Box<dyn PosInterface> { self.pos.get().unwrap() }
249
250 pub fn pos_option(&self) -> Option<&Box<dyn PosInterface>> {
251 self.pos.get()
252 }
253
254 pub fn is_enabled_at_height(&self, height: u64) -> bool {
255 height >= self.enable_height
256 }
257
258 pub fn enable_height(&self) -> u64 { self.enable_height }
259
260 pub fn is_committed(&self, h: &PosBlockId) -> bool {
261 self.pos().get_committed_block(h).is_some()
262 }
263
264 pub fn verify_against_predecessors(
272 &self, me: &PosBlockId, preds: &Vec<PosBlockId>,
273 ) -> bool {
274 let me_round = match self.pos().get_committed_block(me) {
275 None => {
276 warn!("No pos block for me={:?}", me);
277 return false;
278 }
279 Some(b) => (b.epoch, b.round),
280 };
281 for p in preds {
282 let p_round = match self.pos().get_committed_block(p) {
283 None => {
284 warn!("No pos block for pred={:?}", p);
285 return false;
286 }
287 Some(b) => (b.epoch, b.round),
288 };
289 if me_round < p_round {
290 warn!("Incorrect round: me={:?}, pred={:?}", me_round, p_round);
291 return false;
292 }
293 }
294 true
295 }
296
297 pub fn get_pivot_decision(&self, h: &PosBlockId) -> Option<H256> {
298 self.pos
300 .get()?
301 .get_committed_block(h)
302 .map(|b| b.pivot_decision)
303 }
304
305 pub fn get_latest_pos_reference(&self) -> PosBlockId {
306 self.pos().latest_block()
307 }
308
309 pub fn get_pos_view(&self, h: &PosBlockId) -> Option<u64> {
310 self.pos().get_committed_block(h).map(|b| b.view)
311 }
312
313 pub fn get_unlock_nodes(
314 &self, h: &PosBlockId, parent_pos_ref: &PosBlockId,
315 ) -> Vec<(NodeId, u64)> {
316 let unlock_event_key = UnlockEvent::event_key();
317 let mut unlock_nodes = Vec::new();
318 for event in self.pos().get_events(parent_pos_ref, h) {
319 if *event.key() == unlock_event_key {
320 let unlock_event = UnlockEvent::from_bytes(event.event_data())
321 .expect("key checked");
322 let node_id = H256::from_slice(unlock_event.node_id.as_ref());
323 let votes = unlock_event.unlocked;
324 unlock_nodes.push((node_id, votes));
325 }
326 }
327 unlock_nodes
328 }
329
330 pub fn get_disputed_nodes(
331 &self, h: &PosBlockId, parent_pos_ref: &PosBlockId,
332 ) -> Vec<NodeId> {
333 let dispute_event_key = DisputeEvent::event_key();
334 let mut disputed_nodes = Vec::new();
335 for event in self.pos().get_events(parent_pos_ref, h) {
336 if *event.key() == dispute_event_key {
337 let dispute_event =
338 DisputeEvent::from_bytes(event.event_data())
339 .expect("key checked");
340 disputed_nodes
341 .push(H256::from_slice(dispute_event.node_id.as_ref()));
342 }
343 }
344 disputed_nodes
345 }
346
347 pub fn get_reward_distribution_event(
348 &self, h: &PosBlockId, parent_pos_ref: &PosBlockId,
349 ) -> Option<Vec<(u64, RewardDistributionEventV2)>> {
350 if h == parent_pos_ref {
351 return None;
352 }
353 let me_block = self.pos().get_committed_block(h)?;
354 let parent_block = self.pos().get_committed_block(parent_pos_ref)?;
355 if me_block.epoch == parent_block.epoch {
356 return None;
357 }
358 let mut events = Vec::new();
359 for epoch in parent_block.epoch..me_block.epoch {
360 events.push((epoch, self.pos().get_reward_event(epoch)?));
361 }
362 Some(events)
363 }
364
365 pub fn pos_ledger_db(&self) -> &Arc<PosLedgerDB> {
366 self.pos().pos_ledger_db()
367 }
368
369 pub fn consensus_db(&self) -> &Arc<ConsensusDB> {
370 self.pos().consensus_db()
371 }
372
373 pub fn cached_db(&self) -> &Arc<CachedPosLedgerDB> {
374 self.pos().cached_db()
375 }
376
377 pub fn stop(&self) -> Option<(Weak<PosLedgerDB>, Weak<ConsensusDB>)> {
378 self.network.lock().take();
379 self.consensus_network_receiver.lock().take();
380 self.mempool_network_receiver.lock().take();
381 self.drop_handle.lock().take().map(|pos_drop_handle| {
382 let pos_ledger_db = pos_drop_handle.pos_ledger_db.clone();
383 let consensus_db = pos_drop_handle.consensus_db.clone();
384 (
385 Arc::downgrade(&pos_ledger_db),
386 Arc::downgrade(&consensus_db),
387 )
388 })
389 }
390}
391
392impl PosHandler {
394 pub fn force_vote_proposal(&self, block_id: H256) -> anyhow::Result<()> {
395 self.test_command_sender
396 .lock()
397 .as_mut()
398 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
399 .try_send(TestCommand::ForceVoteProposal(h256_to_diem_hash(
400 &block_id,
401 )))
402 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
403 }
404
405 pub fn force_propose(
406 &self, round: U64, parent_block_id: H256,
407 payload: Vec<TransactionPayload>,
408 ) -> anyhow::Result<()> {
409 self.test_command_sender
410 .lock()
411 .as_mut()
412 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
413 .try_send(TestCommand::ForcePropose {
414 round: round.as_u64(),
415 parent_id: h256_to_diem_hash(&parent_block_id),
416 payload,
417 })
418 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
419 }
420
421 pub fn trigger_timeout(&self, timeout_type: String) -> anyhow::Result<()> {
422 let command = match timeout_type.as_str() {
423 "local" => TestCommand::LocalTimeout,
424 "proposal" => TestCommand::ProposalTimeOut,
425 "new_round" => TestCommand::NewRoundTimeout,
426 _ => anyhow::bail!("Unknown timeout type"),
427 };
428 self.test_command_sender
429 .lock()
430 .as_mut()
431 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
432 .try_send(command)
433 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
434 }
435
436 pub fn force_sign_pivot_decision(
437 &self, pivot_decision: PivotBlockDecision,
438 ) -> anyhow::Result<()> {
439 self.test_command_sender
440 .lock()
441 .as_mut()
442 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
443 .try_send(TestCommand::BroadcastPivotDecision(pivot_decision))
444 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
445 }
446
447 pub fn get_chosen_proposal(&self) -> anyhow::Result<Option<Block>> {
448 let (tx, rx) = mpsc::sync_channel(1);
449 self.test_command_sender
450 .lock()
451 .as_mut()
452 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
453 .try_send(TestCommand::GetChosenProposal(tx))
454 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
455 rx.recv().map_err(|e| anyhow::anyhow!("recv: err={:?}", e))
456 }
457
458 pub fn stop_election(&self) -> anyhow::Result<Option<Round>> {
459 let (tx, rx) = mpsc::sync_channel(1);
460 self.test_command_sender
461 .lock()
462 .as_mut()
463 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
464 .try_send(TestCommand::StopElection(tx))
465 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
466 rx.recv().map_err(|e| anyhow::anyhow!("recv: err={:?}", e))
467 }
468
469 pub fn start_voting(&self, initialize: bool) -> anyhow::Result<()> {
470 let (tx, rx) = mpsc::sync_channel(1);
471 self.test_command_sender
472 .lock()
473 .as_mut()
474 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
475 .try_send(TestCommand::StartVoting((initialize, tx)))
476 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
477 rx.recv()?
478 }
479
480 pub fn stop_voting(&self) -> anyhow::Result<()> {
481 let (tx, rx) = mpsc::sync_channel(1);
482 self.test_command_sender
483 .lock()
484 .as_mut()
485 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
486 .try_send(TestCommand::StopVoting(tx))
487 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
488 rx.recv()?
489 }
490
491 pub fn voting_status(&self) -> anyhow::Result<bool> {
492 let (tx, rx) = mpsc::sync_channel(1);
493 self.test_command_sender
494 .lock()
495 .as_mut()
496 .ok_or(anyhow::anyhow!("Pos not initialized!"))?
497 .try_send(TestCommand::GetVotingStatus(tx))
498 .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
499 Ok(rx.recv()?)
500 }
501}
502
/// `PosInterface` implementation that answers queries from shared database
/// handles.
pub struct PosConnection {
    /// PoS ledger database; every query method of this type reads from it.
    pos_storage: Arc<PosLedgerDB>,
    /// Consensus database; only handed out through `consensus_db()`.
    consensus_db: Arc<ConsensusDB>,
    /// Cached PoS ledger database; only handed out through `cached_db()`.
    pos_cache_db: Arc<CachedPosLedgerDB>,
}
508
509impl PosConnection {
510 pub fn new(
511 pos_storage: Arc<PosLedgerDB>, consensus_db: Arc<ConsensusDB>,
512 pos_cache_db: Arc<CachedPosLedgerDB>,
513 ) -> Self {
514 Self {
515 pos_storage,
516 consensus_db,
517 pos_cache_db,
518 }
519 }
520}
521
522impl PosInterface for PosConnection {
523 fn initialize(&self) -> Result<(), String> { Ok(()) }
524
525 fn get_committed_block(&self, h: &PosBlockId) -> Option<PosBlock> {
526 debug!("get_committed_block: {:?}", h);
527 let block_hash = h256_to_diem_hash(h);
528 let committed_block = self
529 .pos_storage
530 .get_committed_block_by_hash(&block_hash)
531 .ok()?;
532
533 debug!("pos_handler gets committed_block={:?}", committed_block);
556 Some(PosBlock {
557 hash: *h,
558 epoch: committed_block.epoch,
559 round: committed_block.round,
560 pivot_decision: committed_block.pivot_decision.block_hash,
561 view: committed_block.view,
562 version: committed_block.version,
570 })
571 }
572
573 fn latest_block(&self) -> PosBlockId {
574 diem_hash_to_h256(
575 &self
576 .pos_storage
577 .get_latest_ledger_info_option()
578 .expect("Initialized")
579 .ledger_info()
580 .consensus_block_id(),
581 )
582 }
583
584 fn get_events(
585 &self, from: &PosBlockId, to: &PosBlockId,
586 ) -> Vec<ContractEvent> {
587 let start_version = self
588 .pos_storage
589 .get_committed_block_by_hash(&h256_to_diem_hash(from))
590 .expect("err reading ledger info for from")
591 .version;
592 let end_version = self
593 .pos_storage
594 .get_committed_block_by_hash(&h256_to_diem_hash(to))
595 .expect("err reading ledger info for to")
596 .version;
597 self.pos_storage
598 .get_events_by_version(start_version, end_version)
599 .expect("err reading events")
600 }
601
602 fn get_epoch_ending_blocks(
603 &self, start_epoch: u64, end_epoch: u64,
604 ) -> Vec<PosBlockId> {
605 self.pos_storage
606 .get_epoch_ending_blocks(start_epoch, end_epoch)
607 .expect("err reading epoch ending blocks")
608 .into_iter()
609 .map(|h| diem_hash_to_h256(&h))
610 .collect()
611 }
612
613 fn get_reward_event(
614 &self, epoch: u64,
615 ) -> Option<RewardDistributionEventV2> {
616 self.pos_storage.get_reward_event(epoch).ok()
617 }
618
619 fn get_epoch_state(&self, block_id: &PosBlockId) -> EpochState {
620 self.pos_storage
621 .get_pos_state(&h256_to_diem_hash(block_id))
622 .expect("parent of an ending_epoch block")
623 .epoch_state()
624 .clone()
625 }
626
627 fn pos_ledger_db(&self) -> &Arc<PosLedgerDB> { &self.pos_storage }
628
629 fn consensus_db(&self) -> &Arc<ConsensusDB> { &self.consensus_db }
630
631 fn cached_db(&self) -> &Arc<CachedPosLedgerDB> { &self.pos_cache_db }
632}
633
/// Static settings consumed by `PosHandler` when it builds and starts PoS
/// consensus.
pub struct PosConfiguration {
    /// Consensus key; `initialize` derives the node's author address from
    /// its public half and passes the private half to consensus.
    pub bls_key: ConfigKey<ConsensusPrivateKey>,
    /// VRF key; used alongside `bls_key` for the author address, with the
    /// private half passed to consensus.
    pub vrf_key: ConfigKey<ConsensusVRFPrivateKey>,
    /// Path of the node config loaded by `initialize`; `None` makes
    /// `initialize` fail with "No pos config!".
    pub diem_conf_path: Option<String>,
    /// Protocol settings handed to the sync protocol handler and to
    /// `start_pos_consensus`.
    pub protocol_conf: ProtocolConfiguration,
    /// Path of the JSON file read by `read_initial_nodes_from_file` during
    /// `initialize`.
    pub pos_initial_nodes_path: String,
    /// Threshold copied into `PosChainParams::vrf_proposal_threshold`.
    pub vrf_proposal_threshold: U256,
    /// Value published into the global `POS_STATE_CONFIG` by `initialize`.
    pub pos_state_config: PosStateConfig,
}
643
644fn diem_hash_to_h256(h: &HashValue) -> PosBlockId { H256::from(h.as_ref()) }
645fn h256_to_diem_hash(h: &PosBlockId) -> HashValue {
646 HashValue::new(h.to_fixed_bytes())
647}
648
649pub fn save_initial_nodes_to_file(path: &str, genesis_nodes: GenesisPosState) {
650 fs::write(path, serde_json::to_string(&genesis_nodes).unwrap()).unwrap();
651}
652
653pub fn read_initial_nodes_from_file(
654 path: &str,
655) -> Result<GenesisPosState, String> {
656 let mut file = fs::File::open(path)
657 .map_err(|e| format!("failed to open initial nodes file: {:?}", e))?;
658
659 let mut nodes_str = String::new();
660 file.read_to_string(&mut nodes_str)
661 .map_err(|e| format!("failed to read initial nodes file: {:?}", e))?;
662
663 serde_json::from_str(nodes_str.as_str())
664 .map_err(|e| format!("failed to parse initial nodes file: {:?}", e))
665}