1use cfx_addr::Network;
6use cfx_executor::internal_contract;
7use cfx_rpc_cfx_api::PosRpcServer;
8use cfx_rpc_cfx_types::{
9 pos::{
10 tx_type, Account, Block, BlockNumber, CommitteeState, Decision,
11 EpochState as RpcEpochState,
12 LedgerInfoWithSignatures as RpcLedgerInfoWithSignatures,
13 NodeLockStatus, PoSEpochReward, Reward, RpcCommittee, RpcTermData,
14 RpcTransactionStatus, RpcTransactionType, Signature, Status,
15 Transaction, VotePowerState,
16 },
17 EpochNumber, RpcAddress,
18};
19use cfx_statedb::StateDbExt;
20use cfx_types::{hexstr_to_h256, BigEndianHash, H256, U256, U64};
21use cfx_util_macros::bail;
22use cfxcore::{
23 block_data_manager::block_data_types::PosRewardInfo,
24 consensus::pos_handler::PosVerifier, errors::Result as CoreResult,
25 BlockDataManager, SharedConsensusGraph,
26};
27use consensus_types::block::Block as ConsensusBlock;
28use diem_crypto::hash::HashValue;
29use diem_types::{
30 account_address::AccountAddress,
31 epoch_state::EpochState,
32 ledger_info::LedgerInfoWithSignatures,
33 term_state::{lock_status::StatusList, PosState, TERM_LIST_LEN},
34 transaction::Transaction as CoreTransaction,
35};
36use itertools::Itertools;
37use jsonrpsee::core::RpcResult;
38use log::{debug, info};
39use primitives::{StorageKey, StorageValue};
40use std::{collections::HashMap, sync::Arc};
41use storage_interface::{DBReaderForPoW, DbReader};
42
43pub struct PosHandler {
44 pos_handler: Arc<PosVerifier>,
45 pow_data_manager: Arc<BlockDataManager>,
46 network_type: Network,
47 consensus: SharedConsensusGraph,
48}
49
50impl PosHandler {
51 pub fn new(
52 pos_handler: Arc<PosVerifier>, pow_data_manager: Arc<BlockDataManager>,
53 network_type: Network, consensus: SharedConsensusGraph,
54 ) -> Self {
55 PosHandler {
56 pos_handler,
57 pow_data_manager,
58 network_type,
59 consensus,
60 }
61 }
62
63 fn current_height(&self) -> u64 {
64 self.pos_handler
65 .pos_ledger_db()
66 .get_latest_pos_state()
67 .current_view()
68 }
69
70 fn current_epoch(&self) -> u64 {
71 self.pos_handler
72 .pos_ledger_db()
73 .get_latest_pos_state()
74 .epoch_state()
75 .epoch
76 }
77
78 fn status_impl(&self) -> Status {
79 let state = self.pos_handler.pos_ledger_db().get_latest_pos_state();
80 let decision = state.pivot_decision();
81 let epoch_state = state.epoch_state();
82 let block_number = state.current_view();
83 let latest_voted = self.latest_voted().map(|b| U64::from(b.height));
84 let latest_tx_number = self
85 .block_by_number(BlockNumber::Num(U64::from(block_number)))
86 .map(|b| b.last_tx_number.into())
87 .unwrap_or_default();
88 Status {
89 epoch: U64::from(epoch_state.epoch),
90 latest_committed: U64::from(block_number),
91 pivot_decision: Decision::from(decision),
92 latest_voted,
93 latest_tx_number,
94 }
95 }
96
97 fn account_impl(
98 &self, address: H256, view: Option<U64>,
99 ) -> cfxcore::errors::Result<Account> {
100 let state = self.pos_state_by_view(view)?;
101
102 let account_address = AccountAddress::from_bytes(address);
103
104 if let Ok(addr) = account_address {
105 let maybe_node_data = state.account_node_data(addr);
106 info!("maybe_node_data {:?}", maybe_node_data);
107
108 if let Some(node_data) = maybe_node_data {
109 let lock_status = node_data.lock_status();
110 return Ok(Account {
111 address,
112 block_number: U64::from(state.current_view()),
113 status: NodeLockStatus {
114 in_queue: map_votes(&lock_status.in_queue),
115 locked: U64::from(lock_status.locked),
116 out_queue: map_votes(&lock_status.out_queue),
117 unlocked: U64::from(lock_status.unlocked_votes()),
118 available_votes: U64::from(
119 lock_status.available_votes(),
120 ),
121 force_retired: lock_status
122 .force_retired()
123 .map(|x| U64::from(x)),
124 forfeited: U64::from(lock_status.forfeited()),
125 },
126 });
127 };
128 }
129
130 let mut default_acct: Account = Account::default();
131 default_acct.address = address;
132 default_acct.block_number = U64::from(state.current_view());
133 return Ok(default_acct);
134 }
135
136 fn account_by_pow_address_impl(
137 &self, address: RpcAddress, view: Option<U64>,
138 ) -> CoreResult<Account> {
139 debug!(
140 "Get pos account by pow address {:?}, view {:?}",
141 address, view
142 );
143
144 let state_db = self.consensus.get_state_db_by_epoch_number(
145 EpochNumber::LatestState.into(),
146 "view",
147 )?;
148
149 let identifier_entry =
150 internal_contract::pos_internal_entries::identifier_entry(
151 &address.hex_address,
152 );
153 let storage_key = StorageKey::new_storage_key(
154 &cfx_parameters::internal_contract_addresses::POS_REGISTER_CONTRACT_ADDRESS,
155 identifier_entry.as_ref(),
156 )
157 .with_native_space();
158 let StorageValue { value, .. } = state_db
159 .get::<StorageValue>(storage_key)?
160 .unwrap_or_default();
161 let addr = BigEndianHash::from_uint(&value);
162
163 debug!("Pos Address: {:?}", addr);
164
165 self.account_impl(addr, view)
166 }
167
168 fn pos_state_by_view(
169 &self, view: Option<U64>,
170 ) -> Result<Arc<PosState>, String> {
171 let latest_state =
172 self.pos_handler.pos_ledger_db().get_latest_pos_state();
173 let state = match view {
174 None => latest_state,
175 Some(v) => {
176 let latest_view = latest_state.current_view();
177 let v = v.as_u64();
178 if v > latest_view {
179 bail!("Specified block {} is not executed, the latest block number is {}", v, latest_view)
180 }
181
182 let state = self
183 .pos_handler
184 .pos_ledger_db()
185 .get_committed_block_hash_by_view(v)
186 .and_then(|block_hash| {
187 self.pos_handler
188 .pos_ledger_db()
189 .get_pos_state(&block_hash)
190 })
191 .map_err(|_| format!("PoS state of {} not found", v))?;
192 Arc::new(state)
193 }
194 };
195 Ok(state)
196 }
197
198 fn committee_by_block_number(
199 &self, view: Option<U64>,
200 ) -> CoreResult<CommitteeState> {
201 let pos_state = self.pos_state_by_view(view)?;
202
203 let current_committee =
204 RpcCommittee::from_epoch_state(pos_state.epoch_state());
205
206 let elections = pos_state.term_list().term_list()
208 [TERM_LIST_LEN..=TERM_LIST_LEN + 1]
209 .iter()
210 .map(|term_data| RpcTermData::from(term_data))
211 .collect();
212
213 Ok(CommitteeState {
214 current_committee,
215 elections,
216 })
217 }
218
219 fn ledger_info_by_epoch(
221 &self, epoch: u64,
222 ) -> Option<LedgerInfoWithSignatures> {
223 self.pos_handler
224 .pos_ledger_db()
225 .get_epoch_ending_ledger_infos(epoch, epoch + 1)
226 .ok()?
227 .get_all_ledger_infos()
228 .first()
229 .map(|l| l.clone())
230 }
231
232 fn ledger_infos_by_epoch(
233 &self, start_epoch: u64, end_epoch: u64,
234 ) -> Vec<LedgerInfoWithSignatures> {
235 self.pos_handler
236 .pos_ledger_db()
237 .get_epoch_ending_ledger_infos(start_epoch, end_epoch)
238 .ok()
239 .map(|proof| proof.get_all_ledger_infos())
240 .unwrap_or(vec![])
241 }
242
243 fn epoch_state_by_epoch_number(&self, epoch: u64) -> Option<EpochState> {
245 if epoch == 0 {
246 return None;
247 }
248 if epoch == self.current_epoch() {
249 return Some(
250 self.pos_handler
251 .pos_ledger_db()
252 .get_latest_pos_state()
253 .epoch_state()
254 .clone(),
255 );
256 }
257 if let Some(ledger_info) = self.ledger_info_by_epoch(epoch - 1) {
258 let option = ledger_info.ledger_info().next_epoch_state();
259 return option.map(|f| (*f).clone());
260 }
261 None
262 }
263
264 fn block_by_hash(&self, hash: H256) -> Option<Block> {
265 let hash_value = HashValue::from_slice(hash.as_bytes()).ok()?;
266 let block = self
267 .pos_handler
268 .pos_ledger_db()
269 .get_committed_block_by_hash(&hash_value);
270 match block {
271 Ok(b) => {
272 let mut block = Block {
273 hash,
274 height: U64::from(b.view),
275 epoch: U64::from(b.epoch),
276 round: U64::from(b.round),
277 last_tx_number: U64::from(b.version),
278 miner: b.miner.map(|m| H256::from(m.to_u8())),
279 parent_hash: hash_value_to_h256(b.parent_hash),
280 timestamp: U64::from(b.timestamp),
281 pivot_decision: Some(Decision::from(&b.pivot_decision)),
282 signatures: vec![],
283 };
284 if let Some(epoch_state) =
286 self.epoch_state_by_epoch_number(b.epoch)
287 {
288 if let Ok(ledger_info) = self
289 .pos_handler
290 .pos_ledger_db()
291 .get_ledger_info_by_voted_block(&b.hash)
292 {
293 block.signatures = ledger_info
294 .signatures()
295 .iter()
296 .map(|(a, _s)| {
297 let voting_power = epoch_state
298 .verifier()
299 .get_voting_power(a)
300 .unwrap_or(0);
301 Signature {
302 account: H256::from(a.to_u8()),
303 votes: U64::from(voting_power),
304 }
305 })
306 .collect();
307 }
308 };
309 Some(block)
310 }
311 Err(_) => self.consensus_block_by_hash(hash),
312 }
313 }
314
315 fn block_by_number(&self, number: BlockNumber) -> Option<Block> {
316 match number {
317 BlockNumber::Num(num) => {
318 if num.as_u64() <= self.current_height() {
319 let hash = self
320 .pos_handler
321 .pos_ledger_db()
322 .get_committed_block_hash_by_view(num.as_u64())
323 .ok()?;
324 self.block_by_hash(hash_value_to_h256(hash))
325 } else {
326 self.consensus_block_by_number(num)
327 }
328 }
329 BlockNumber::LatestCommitted => {
330 let hash = self.pos_handler.get_latest_pos_reference();
331 self.block_by_hash(hash)
332 }
333 BlockNumber::Earliest => {
334 let hash = self
335 .pos_handler
336 .pos_ledger_db()
337 .get_committed_block_hash_by_view(1)
338 .ok()?;
339 self.block_by_hash(hash_value_to_h256(hash))
340 }
341 BlockNumber::LatestVoted => self.latest_voted(),
342 }
343 }
344
345 fn consensus_blocks(&self) -> Option<Vec<Block>> {
346 let blocks = self.pos_handler.consensus_db().get_blocks().ok()?;
347 let block_ids = blocks.values().map(|b| b.id()).collect::<Vec<_>>();
348 debug!("consensus_blocks: block_ids={:?}", block_ids);
349 if blocks.len() == 0 {
350 return Some(vec![]);
351 }
352 let qcs = self
353 .pos_handler
354 .consensus_db()
355 .get_quorum_certificates()
356 .ok()?;
357 let blocks: Vec<ConsensusBlock> = blocks
359 .into_iter()
360 .sorted_by(|(_, b1), (_, b2)| {
361 Ord::cmp(&(b1.epoch(), b1.round()), &(b2.epoch(), b2.round()))
362 })
363 .map(|(_, b)| b)
364 .collect();
365 let latest_epoch_state = self
366 .pos_handler
367 .pos_ledger_db()
368 .get_latest_pos_state()
369 .epoch_state()
370 .clone();
371 let rpc_blocks = blocks
373 .into_iter()
374 .filter(|b| b.epoch() == latest_epoch_state.epoch)
375 .map(|b| {
376 let mut rpc_block = Block {
377 hash: hash_value_to_h256(b.id()),
378 epoch: U64::from(b.epoch()),
379 round: U64::from(b.round()),
380 last_tx_number: Default::default(),
381 miner: b.author().map(|a| H256::from(a.to_u8())),
382 parent_hash: hash_value_to_h256(b.parent_id()),
383 timestamp: U64::from(b.timestamp_usecs()),
384 pivot_decision: Default::default(),
385 height: Default::default(),
386 signatures: vec![],
387 };
388 if let Ok(executed_block) =
395 self.pos_handler.cached_db().get_block(&b.id())
396 {
397 let executed = executed_block.lock();
398 if let Some(version) = executed.output().version() {
399 rpc_block.last_tx_number = U64::from(version);
400 }
401 rpc_block.pivot_decision = executed
402 .output()
403 .pivot_block()
404 .as_ref()
405 .map(|p| Decision::from(p));
406 rpc_block.height = U64::from(
407 executed
408 .output()
409 .executed_trees()
410 .pos_state()
411 .current_view(),
412 );
413 } else if let Ok(committed_block) = self
414 .pos_handler
415 .pos_ledger_db()
416 .get_committed_block_by_hash(&b.id())
417 {
418 rpc_block.last_tx_number = committed_block.version.into();
419 rpc_block.pivot_decision =
420 Some(Decision::from(&committed_block.pivot_decision));
421 rpc_block.height = U64::from(committed_block.view);
422 }
423 if let Some(qc) = qcs.get(&b.id()) {
424 let signatures = qc
425 .ledger_info()
426 .signatures()
427 .iter()
428 .map(|(a, _s)| {
429 let voting_power = latest_epoch_state
430 .verifier()
431 .get_voting_power(a)
432 .unwrap_or(0);
433 Signature {
434 account: H256::from(a.to_u8()),
435 votes: U64::from(voting_power),
436 }
437 })
438 .collect();
439 rpc_block.signatures = signatures;
440 }
441 rpc_block
442 })
443 .collect::<Vec<_>>();
444 Some(rpc_blocks)
445 }
446
447 fn latest_voted(&self) -> Option<Block> {
448 self.consensus_blocks()
449 .map(|blocks| {
450 blocks
451 .iter()
452 .filter(|b| b.pivot_decision.is_some())
453 .last()
454 .cloned()
455 })
456 .flatten()
457 }
458
459 fn consensus_block_by_number(&self, number: U64) -> Option<Block> {
460 self.consensus_blocks()?
461 .into_iter()
462 .find(|b| b.height == number)
463 }
464
465 fn consensus_block_by_hash(&self, hash: H256) -> Option<Block> {
466 self.consensus_blocks()?
467 .into_iter()
468 .find(|b| b.hash == hash)
469 }
470
471 fn tx_by_version(&self, version: u64) -> Option<Transaction> {
472 let pos_ledger_db = self.pos_handler.pos_ledger_db();
473 match pos_ledger_db.get_transaction(version).ok()? {
474 CoreTransaction::UserTransaction(signed_tx) => {
475 let mut block_hash: Option<H256> = None;
476 let mut block_number: Option<U64> = None;
477 let mut timestamp: Option<U64> = None;
478 let block_meta = pos_ledger_db
479 .get_transaction_block_meta(version)
480 .unwrap_or(None);
481 if let Some((_, bm)) = block_meta {
482 block_hash = Some(hash_value_to_h256(bm.id()));
483 timestamp = Some(U64::from(bm.timestamp_usec()));
484 if let Some(block) = self.block_by_hash(block_hash?) {
485 block_number = Some(block.height);
486 }
487 }
488 let status = pos_ledger_db
489 .get_transaction_info(version)
490 .ok()
491 .map(|tx| RpcTransactionStatus::from(tx.status().clone()));
492 Some(Transaction {
493 hash: hash_value_to_h256(signed_tx.hash()),
494 from: H256::from(signed_tx.sender().to_u8()),
495 block_hash,
496 block_number,
497 timestamp,
498 number: U64::from(version),
499 payload: Some(signed_tx.payload().clone().into()),
500 status,
501 tx_type: tx_type(signed_tx.payload().clone()),
502 })
503 }
504 CoreTransaction::GenesisTransaction(_) => None,
505 CoreTransaction::BlockMetadata(block_meta) => {
506 let block_number = self
507 .block_by_hash(hash_value_to_h256(block_meta.id()))
508 .map(|b| U64::from(b.height));
509 let mut tx = Transaction {
510 hash: Default::default(),
511 from: Default::default(),
512 block_hash: Some(hash_value_to_h256(block_meta.id())),
513 block_number,
514 timestamp: Some(U64::from(block_meta.timestamp_usec())),
515 number: U64::from(version),
516 payload: None,
517 status: None,
518 tx_type: RpcTransactionType::BlockMetadata,
519 };
520 if let Some(tx_info) =
521 pos_ledger_db.get_transaction_info(version).ok()
522 {
523 let status =
524 RpcTransactionStatus::from(tx_info.status().clone());
525 tx.status = Some(status);
526 tx.hash = hash_value_to_h256(tx_info.transaction_hash());
527 }
528 Some(tx)
529 }
530 }
531 }
532
533 fn ledger_info_by_block_number(
534 &self, block_number: BlockNumber,
535 ) -> Option<LedgerInfoWithSignatures> {
536 let block_hash = self.block_by_number(block_number.clone())?.hash;
538 debug!(
539 "ledger_info_by_block_number {:?} {:?}",
540 block_number, block_hash
541 );
542 self.pos_handler
543 .pos_ledger_db()
544 .get_block_ledger_info(
545 &HashValue::from_slice(block_hash.as_bytes()).unwrap(),
546 )
547 .ok()
548 }
549
550 fn ledger_info_by_epoch_and_round(
551 &self, epoch: u64, round: u64,
552 ) -> Option<LedgerInfoWithSignatures> {
553 let block_hash = self
554 .pos_handler
555 .pos_ledger_db()
556 .get_block_hash_by_epoch_and_round(epoch, round)
557 .ok()?;
558 self.pos_handler
559 .pos_ledger_db()
560 .get_block_ledger_info(&block_hash)
561 .ok()
562 }
563}
564
565fn map_votes(list: &StatusList) -> Vec<VotePowerState> {
566 let mut ans = Vec::with_capacity(list.len());
567 for item in list.iter() {
568 ans.push(VotePowerState {
569 end_block_number: U64::from(item.view),
570 power: U64::from(item.votes),
571 })
572 }
573 ans
574}
575
576pub fn hash_value_to_h256(h: HashValue) -> H256 {
577 hexstr_to_h256(h.to_hex().as_str())
578}
579
580pub fn convert_to_pos_epoch_reward(
581 reward: PosRewardInfo, network_type: Network,
582) -> Result<PoSEpochReward, String> {
583 let default_value = U256::from(0);
584 let mut account_reward_map = HashMap::new();
585 let mut account_address_map = HashMap::new();
586 for r in reward.account_rewards.iter() {
587 let key = r.pos_identifier;
588 let r1 = account_reward_map.get(&key).unwrap_or(&default_value);
589 let merged_reward = r.reward + r1;
590 account_reward_map.insert(key, merged_reward);
591
592 let rpc_address = RpcAddress::try_from_h160(r.address, network_type)?;
593 account_address_map.insert(key, rpc_address);
594 }
595 let account_rewards = account_reward_map
596 .iter()
597 .map(|(k, v)| Reward {
598 pos_address: *k,
599 pow_address: account_address_map.get(k).unwrap().clone(),
600 reward: *v,
601 })
602 .filter(|r| r.reward > U256::from(0))
603 .collect();
604 Ok(PoSEpochReward {
605 pow_epoch_hash: reward.execution_epoch_hash,
606 account_rewards,
607 })
608}
609
610impl PosRpcServer for PosHandler {
611 fn pos_status(&self) -> RpcResult<Status> { Ok(self.status_impl()) }
612
613 fn pos_account(
614 &self, address: H256, view: Option<U64>,
615 ) -> RpcResult<Account> {
616 self.account_impl(address, view).map_err(Into::into)
617 }
618
619 fn pos_account_by_pow_address(
620 &self, address: RpcAddress, view: Option<U64>,
621 ) -> RpcResult<Account> {
622 self.account_by_pow_address_impl(address, view)
623 .map_err(Into::into)
624 }
625
626 fn pos_committee(&self, view: Option<U64>) -> RpcResult<CommitteeState> {
627 self.committee_by_block_number(view).map_err(Into::into)
628 }
629
630 fn pos_block_by_hash(&self, hash: H256) -> RpcResult<Option<Block>> {
631 Ok(self.block_by_hash(hash))
632 }
633
634 fn pos_block_by_number(
635 &self, number: BlockNumber,
636 ) -> RpcResult<Option<Block>> {
637 Ok(self.block_by_number(number))
638 }
639
640 fn pos_transaction_by_number(
641 &self, number: U64,
642 ) -> RpcResult<Option<Transaction>> {
643 Ok(self.tx_by_version(number.as_u64()))
644 }
645
646 fn pos_consensus_blocks(&self) -> RpcResult<Vec<Block>> {
647 Ok(self.consensus_blocks().unwrap_or(vec![]))
648 }
649
650 fn pos_get_epoch_state(
651 &self, epoch: U64,
652 ) -> RpcResult<Option<RpcEpochState>> {
653 Ok(self
654 .epoch_state_by_epoch_number(epoch.as_u64())
655 .map(|e| (&e).into()))
656 }
657
658 fn pos_get_ledger_info_by_epoch(
659 &self, epoch: U64,
660 ) -> RpcResult<Option<RpcLedgerInfoWithSignatures>> {
661 Ok(self
662 .ledger_info_by_epoch(epoch.as_u64())
663 .map(|l| (&l).into()))
664 }
665
666 fn pos_get_ledger_info_by_block_number(
667 &self, number: BlockNumber,
668 ) -> RpcResult<Option<RpcLedgerInfoWithSignatures>> {
669 Ok(self
670 .ledger_info_by_block_number(number)
671 .map(|l| (&l).into()))
672 }
673
674 fn pos_get_ledger_info_by_epoch_and_round(
675 &self, epoch: U64, round: U64,
676 ) -> RpcResult<Option<RpcLedgerInfoWithSignatures>> {
677 Ok(self
678 .ledger_info_by_epoch_and_round(epoch.as_u64(), round.as_u64())
679 .map(|l| (&l).into()))
680 }
681
682 fn pos_get_ledger_infos_by_epoch(
683 &self, start_epoch: U64, end_epoch: U64,
684 ) -> RpcResult<Vec<RpcLedgerInfoWithSignatures>> {
685 Ok(self
686 .ledger_infos_by_epoch(start_epoch.as_u64(), end_epoch.as_u64())
687 .iter()
688 .map(|l| l.into())
689 .collect())
690 }
691
692 fn pos_get_rewards_by_epoch(
693 &self, epoch: U64,
694 ) -> RpcResult<Option<PoSEpochReward>> {
695 let reward = self
696 .pow_data_manager
697 .pos_reward_by_pos_epoch(epoch.as_u64())
698 .map(|reward_info| {
699 convert_to_pos_epoch_reward(reward_info, self.network_type).ok()
700 })
701 .unwrap_or(None);
702 Ok(reward)
703 }
704}