cfx_rpc_cfx_impl/
pos_handler.rs

1// Copyright 2020 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use cfx_addr::Network;
6use cfx_executor::internal_contract;
7use cfx_rpc_cfx_api::PosRpcServer;
8use cfx_rpc_cfx_types::{
9    pos::{
10        tx_type, Account, Block, BlockNumber, CommitteeState, Decision,
11        EpochState as RpcEpochState,
12        LedgerInfoWithSignatures as RpcLedgerInfoWithSignatures,
13        NodeLockStatus, PoSEpochReward, Reward, RpcCommittee, RpcTermData,
14        RpcTransactionStatus, RpcTransactionType, Signature, Status,
15        Transaction, VotePowerState,
16    },
17    EpochNumber, RpcAddress,
18};
19use cfx_statedb::StateDbExt;
20use cfx_types::{hexstr_to_h256, BigEndianHash, H256, U256, U64};
21use cfx_util_macros::bail;
22use cfxcore::{
23    block_data_manager::block_data_types::PosRewardInfo,
24    consensus::pos_handler::PosVerifier, errors::Result as CoreResult,
25    BlockDataManager, SharedConsensusGraph,
26};
27use consensus_types::block::Block as ConsensusBlock;
28use diem_crypto::hash::HashValue;
29use diem_types::{
30    account_address::AccountAddress,
31    epoch_state::EpochState,
32    ledger_info::LedgerInfoWithSignatures,
33    term_state::{lock_status::StatusList, PosState, TERM_LIST_LEN},
34    transaction::Transaction as CoreTransaction,
35};
36use itertools::Itertools;
37use jsonrpsee::core::RpcResult;
38use log::{debug, info};
39use primitives::{StorageKey, StorageValue};
40use std::{collections::HashMap, sync::Arc};
41use storage_interface::{DBReaderForPoW, DbReader};
42
43pub struct PosHandler {
44    pos_handler: Arc<PosVerifier>,
45    pow_data_manager: Arc<BlockDataManager>,
46    network_type: Network,
47    consensus: SharedConsensusGraph,
48}
49
50impl PosHandler {
51    pub fn new(
52        pos_handler: Arc<PosVerifier>, pow_data_manager: Arc<BlockDataManager>,
53        network_type: Network, consensus: SharedConsensusGraph,
54    ) -> Self {
55        PosHandler {
56            pos_handler,
57            pow_data_manager,
58            network_type,
59            consensus,
60        }
61    }
62
63    fn current_height(&self) -> u64 {
64        self.pos_handler
65            .pos_ledger_db()
66            .get_latest_pos_state()
67            .current_view()
68    }
69
70    fn current_epoch(&self) -> u64 {
71        self.pos_handler
72            .pos_ledger_db()
73            .get_latest_pos_state()
74            .epoch_state()
75            .epoch
76    }
77
78    fn status_impl(&self) -> Status {
79        let state = self.pos_handler.pos_ledger_db().get_latest_pos_state();
80        let decision = state.pivot_decision();
81        let epoch_state = state.epoch_state();
82        let block_number = state.current_view();
83        let latest_voted = self.latest_voted().map(|b| U64::from(b.height));
84        let latest_tx_number = self
85            .block_by_number(BlockNumber::Num(U64::from(block_number)))
86            .map(|b| b.last_tx_number.into())
87            .unwrap_or_default();
88        Status {
89            epoch: U64::from(epoch_state.epoch),
90            latest_committed: U64::from(block_number),
91            pivot_decision: Decision::from(decision),
92            latest_voted,
93            latest_tx_number,
94        }
95    }
96
97    fn account_impl(
98        &self, address: H256, view: Option<U64>,
99    ) -> cfxcore::errors::Result<Account> {
100        let state = self.pos_state_by_view(view)?;
101
102        let account_address = AccountAddress::from_bytes(address);
103
104        if let Ok(addr) = account_address {
105            let maybe_node_data = state.account_node_data(addr);
106            info!("maybe_node_data {:?}", maybe_node_data);
107
108            if let Some(node_data) = maybe_node_data {
109                let lock_status = node_data.lock_status();
110                return Ok(Account {
111                    address,
112                    block_number: U64::from(state.current_view()),
113                    status: NodeLockStatus {
114                        in_queue: map_votes(&lock_status.in_queue),
115                        locked: U64::from(lock_status.locked),
116                        out_queue: map_votes(&lock_status.out_queue),
117                        unlocked: U64::from(lock_status.unlocked_votes()),
118                        available_votes: U64::from(
119                            lock_status.available_votes(),
120                        ),
121                        force_retired: lock_status
122                            .force_retired()
123                            .map(|x| U64::from(x)),
124                        forfeited: U64::from(lock_status.forfeited()),
125                    },
126                });
127            };
128        }
129
130        let mut default_acct: Account = Account::default();
131        default_acct.address = address;
132        default_acct.block_number = U64::from(state.current_view());
133        return Ok(default_acct);
134    }
135
136    fn account_by_pow_address_impl(
137        &self, address: RpcAddress, view: Option<U64>,
138    ) -> CoreResult<Account> {
139        debug!(
140            "Get pos account by pow address {:?}, view {:?}",
141            address, view
142        );
143
144        let state_db = self.consensus.get_state_db_by_epoch_number(
145            EpochNumber::LatestState.into(),
146            "view",
147        )?;
148
149        let identifier_entry =
150            internal_contract::pos_internal_entries::identifier_entry(
151                &address.hex_address,
152            );
153        let storage_key = StorageKey::new_storage_key(
154            &cfx_parameters::internal_contract_addresses::POS_REGISTER_CONTRACT_ADDRESS,
155            identifier_entry.as_ref(),
156        )
157        .with_native_space();
158        let StorageValue { value, .. } = state_db
159            .get::<StorageValue>(storage_key)?
160            .unwrap_or_default();
161        let addr = BigEndianHash::from_uint(&value);
162
163        debug!("Pos Address: {:?}", addr);
164
165        self.account_impl(addr, view)
166    }
167
168    fn pos_state_by_view(
169        &self, view: Option<U64>,
170    ) -> Result<Arc<PosState>, String> {
171        let latest_state =
172            self.pos_handler.pos_ledger_db().get_latest_pos_state();
173        let state = match view {
174            None => latest_state,
175            Some(v) => {
176                let latest_view = latest_state.current_view();
177                let v = v.as_u64();
178                if v > latest_view {
179                    bail!("Specified block {} is not executed, the latest block number is {}", v, latest_view)
180                }
181
182                let state = self
183                    .pos_handler
184                    .pos_ledger_db()
185                    .get_committed_block_hash_by_view(v)
186                    .and_then(|block_hash| {
187                        self.pos_handler
188                            .pos_ledger_db()
189                            .get_pos_state(&block_hash)
190                    })
191                    .map_err(|_| format!("PoS state of {} not found", v))?;
192                Arc::new(state)
193            }
194        };
195        Ok(state)
196    }
197
198    fn committee_by_block_number(
199        &self, view: Option<U64>,
200    ) -> CoreResult<CommitteeState> {
201        let pos_state = self.pos_state_by_view(view)?;
202
203        let current_committee =
204            RpcCommittee::from_epoch_state(pos_state.epoch_state());
205
206        // get future term data
207        let elections = pos_state.term_list().term_list()
208            [TERM_LIST_LEN..=TERM_LIST_LEN + 1]
209            .iter()
210            .map(|term_data| RpcTermData::from(term_data))
211            .collect();
212
213        Ok(CommitteeState {
214            current_committee,
215            elections,
216        })
217    }
218
219    // get epoch ending ledger info
220    fn ledger_info_by_epoch(
221        &self, epoch: u64,
222    ) -> Option<LedgerInfoWithSignatures> {
223        self.pos_handler
224            .pos_ledger_db()
225            .get_epoch_ending_ledger_infos(epoch, epoch + 1)
226            .ok()?
227            .get_all_ledger_infos()
228            .first()
229            .map(|l| l.clone())
230    }
231
232    fn ledger_infos_by_epoch(
233        &self, start_epoch: u64, end_epoch: u64,
234    ) -> Vec<LedgerInfoWithSignatures> {
235        self.pos_handler
236            .pos_ledger_db()
237            .get_epoch_ending_ledger_infos(start_epoch, end_epoch)
238            .ok()
239            .map(|proof| proof.get_all_ledger_infos())
240            .unwrap_or(vec![])
241    }
242
243    // get epoch state
244    fn epoch_state_by_epoch_number(&self, epoch: u64) -> Option<EpochState> {
245        if epoch == 0 {
246            return None;
247        }
248        if epoch == self.current_epoch() {
249            return Some(
250                self.pos_handler
251                    .pos_ledger_db()
252                    .get_latest_pos_state()
253                    .epoch_state()
254                    .clone(),
255            );
256        }
257        if let Some(ledger_info) = self.ledger_info_by_epoch(epoch - 1) {
258            let option = ledger_info.ledger_info().next_epoch_state();
259            return option.map(|f| (*f).clone());
260        }
261        None
262    }
263
264    fn block_by_hash(&self, hash: H256) -> Option<Block> {
265        let hash_value = HashValue::from_slice(hash.as_bytes()).ok()?;
266        let block = self
267            .pos_handler
268            .pos_ledger_db()
269            .get_committed_block_by_hash(&hash_value);
270        match block {
271            Ok(b) => {
272                let mut block = Block {
273                    hash,
274                    height: U64::from(b.view),
275                    epoch: U64::from(b.epoch),
276                    round: U64::from(b.round),
277                    last_tx_number: U64::from(b.version),
278                    miner: b.miner.map(|m| H256::from(m.to_u8())),
279                    parent_hash: hash_value_to_h256(b.parent_hash),
280                    timestamp: U64::from(b.timestamp),
281                    pivot_decision: Some(Decision::from(&b.pivot_decision)),
282                    signatures: vec![],
283                };
284                // get signatures info
285                if let Some(epoch_state) =
286                    self.epoch_state_by_epoch_number(b.epoch)
287                {
288                    if let Ok(ledger_info) = self
289                        .pos_handler
290                        .pos_ledger_db()
291                        .get_ledger_info_by_voted_block(&b.hash)
292                    {
293                        block.signatures = ledger_info
294                            .signatures()
295                            .iter()
296                            .map(|(a, _s)| {
297                                let voting_power = epoch_state
298                                    .verifier()
299                                    .get_voting_power(a)
300                                    .unwrap_or(0);
301                                Signature {
302                                    account: H256::from(a.to_u8()),
303                                    votes: U64::from(voting_power),
304                                }
305                            })
306                            .collect();
307                    }
308                };
309                Some(block)
310            }
311            Err(_) => self.consensus_block_by_hash(hash),
312        }
313    }
314
315    fn block_by_number(&self, number: BlockNumber) -> Option<Block> {
316        match number {
317            BlockNumber::Num(num) => {
318                if num.as_u64() <= self.current_height() {
319                    let hash = self
320                        .pos_handler
321                        .pos_ledger_db()
322                        .get_committed_block_hash_by_view(num.as_u64())
323                        .ok()?;
324                    self.block_by_hash(hash_value_to_h256(hash))
325                } else {
326                    self.consensus_block_by_number(num)
327                }
328            }
329            BlockNumber::LatestCommitted => {
330                let hash = self.pos_handler.get_latest_pos_reference();
331                self.block_by_hash(hash)
332            }
333            BlockNumber::Earliest => {
334                let hash = self
335                    .pos_handler
336                    .pos_ledger_db()
337                    .get_committed_block_hash_by_view(1)
338                    .ok()?;
339                self.block_by_hash(hash_value_to_h256(hash))
340            }
341            BlockNumber::LatestVoted => self.latest_voted(),
342        }
343    }
344
345    fn consensus_blocks(&self) -> Option<Vec<Block>> {
346        let blocks = self.pos_handler.consensus_db().get_blocks().ok()?;
347        let block_ids = blocks.values().map(|b| b.id()).collect::<Vec<_>>();
348        debug!("consensus_blocks: block_ids={:?}", block_ids);
349        if blocks.len() == 0 {
350            return Some(vec![]);
351        }
352        let qcs = self
353            .pos_handler
354            .consensus_db()
355            .get_quorum_certificates()
356            .ok()?;
357        // sort by epoch and round
358        let blocks: Vec<ConsensusBlock> = blocks
359            .into_iter()
360            .sorted_by(|(_, b1), (_, b2)| {
361                Ord::cmp(&(b1.epoch(), b1.round()), &(b2.epoch(), b2.round()))
362            })
363            .map(|(_, b)| b)
364            .collect();
365        let latest_epoch_state = self
366            .pos_handler
367            .pos_ledger_db()
368            .get_latest_pos_state()
369            .epoch_state()
370            .clone();
371        // map to Committed block
372        let rpc_blocks = blocks
373            .into_iter()
374            .filter(|b| b.epoch() == latest_epoch_state.epoch)
375            .map(|b| {
376                let mut rpc_block = Block {
377                    hash: hash_value_to_h256(b.id()),
378                    epoch: U64::from(b.epoch()),
379                    round: U64::from(b.round()),
380                    last_tx_number: Default::default(),
381                    miner: b.author().map(|a| H256::from(a.to_u8())),
382                    parent_hash: hash_value_to_h256(b.parent_id()),
383                    timestamp: U64::from(b.timestamp_usecs()),
384                    pivot_decision: Default::default(),
385                    height: Default::default(),
386                    signatures: vec![],
387                };
388                // Executed blocks are committed and pruned before ConsensusDB.
389                // If we get a block from ConsensusDB and it's pruned before we
390                // get the executed block here, its version and
391                // pivot decision would be missing.
392                // If this consensus block is not on a fork, its CommittedBlock
393                // should be accessible in this case.
394                if let Ok(executed_block) =
395                    self.pos_handler.cached_db().get_block(&b.id())
396                {
397                    let executed = executed_block.lock();
398                    if let Some(version) = executed.output().version() {
399                        rpc_block.last_tx_number = U64::from(version);
400                    }
401                    rpc_block.pivot_decision = executed
402                        .output()
403                        .pivot_block()
404                        .as_ref()
405                        .map(|p| Decision::from(p));
406                    rpc_block.height = U64::from(
407                        executed
408                            .output()
409                            .executed_trees()
410                            .pos_state()
411                            .current_view(),
412                    );
413                } else if let Ok(committed_block) = self
414                    .pos_handler
415                    .pos_ledger_db()
416                    .get_committed_block_by_hash(&b.id())
417                {
418                    rpc_block.last_tx_number = committed_block.version.into();
419                    rpc_block.pivot_decision =
420                        Some(Decision::from(&committed_block.pivot_decision));
421                    rpc_block.height = U64::from(committed_block.view);
422                }
423                if let Some(qc) = qcs.get(&b.id()) {
424                    let signatures = qc
425                        .ledger_info()
426                        .signatures()
427                        .iter()
428                        .map(|(a, _s)| {
429                            let voting_power = latest_epoch_state
430                                .verifier()
431                                .get_voting_power(a)
432                                .unwrap_or(0);
433                            Signature {
434                                account: H256::from(a.to_u8()),
435                                votes: U64::from(voting_power),
436                            }
437                        })
438                        .collect();
439                    rpc_block.signatures = signatures;
440                }
441                rpc_block
442            })
443            .collect::<Vec<_>>();
444        Some(rpc_blocks)
445    }
446
447    fn latest_voted(&self) -> Option<Block> {
448        self.consensus_blocks()
449            .map(|blocks| {
450                blocks
451                    .iter()
452                    .filter(|b| b.pivot_decision.is_some())
453                    .last()
454                    .cloned()
455            })
456            .flatten()
457    }
458
459    fn consensus_block_by_number(&self, number: U64) -> Option<Block> {
460        self.consensus_blocks()?
461            .into_iter()
462            .find(|b| b.height == number)
463    }
464
465    fn consensus_block_by_hash(&self, hash: H256) -> Option<Block> {
466        self.consensus_blocks()?
467            .into_iter()
468            .find(|b| b.hash == hash)
469    }
470
471    fn tx_by_version(&self, version: u64) -> Option<Transaction> {
472        let pos_ledger_db = self.pos_handler.pos_ledger_db();
473        match pos_ledger_db.get_transaction(version).ok()? {
474            CoreTransaction::UserTransaction(signed_tx) => {
475                let mut block_hash: Option<H256> = None;
476                let mut block_number: Option<U64> = None;
477                let mut timestamp: Option<U64> = None;
478                let block_meta = pos_ledger_db
479                    .get_transaction_block_meta(version)
480                    .unwrap_or(None);
481                if let Some((_, bm)) = block_meta {
482                    block_hash = Some(hash_value_to_h256(bm.id()));
483                    timestamp = Some(U64::from(bm.timestamp_usec()));
484                    if let Some(block) = self.block_by_hash(block_hash?) {
485                        block_number = Some(block.height);
486                    }
487                }
488                let status = pos_ledger_db
489                    .get_transaction_info(version)
490                    .ok()
491                    .map(|tx| RpcTransactionStatus::from(tx.status().clone()));
492                Some(Transaction {
493                    hash: hash_value_to_h256(signed_tx.hash()),
494                    from: H256::from(signed_tx.sender().to_u8()),
495                    block_hash,
496                    block_number,
497                    timestamp,
498                    number: U64::from(version),
499                    payload: Some(signed_tx.payload().clone().into()),
500                    status,
501                    tx_type: tx_type(signed_tx.payload().clone()),
502                })
503            }
504            CoreTransaction::GenesisTransaction(_) => None,
505            CoreTransaction::BlockMetadata(block_meta) => {
506                let block_number = self
507                    .block_by_hash(hash_value_to_h256(block_meta.id()))
508                    .map(|b| U64::from(b.height));
509                let mut tx = Transaction {
510                    hash: Default::default(),
511                    from: Default::default(),
512                    block_hash: Some(hash_value_to_h256(block_meta.id())),
513                    block_number,
514                    timestamp: Some(U64::from(block_meta.timestamp_usec())),
515                    number: U64::from(version),
516                    payload: None,
517                    status: None,
518                    tx_type: RpcTransactionType::BlockMetadata,
519                };
520                if let Some(tx_info) =
521                    pos_ledger_db.get_transaction_info(version).ok()
522                {
523                    let status =
524                        RpcTransactionStatus::from(tx_info.status().clone());
525                    tx.status = Some(status);
526                    tx.hash = hash_value_to_h256(tx_info.transaction_hash());
527                }
528                Some(tx)
529            }
530        }
531    }
532
533    fn ledger_info_by_block_number(
534        &self, block_number: BlockNumber,
535    ) -> Option<LedgerInfoWithSignatures> {
536        // TODO: Get hash without getting the block.
537        let block_hash = self.block_by_number(block_number.clone())?.hash;
538        debug!(
539            "ledger_info_by_block_number {:?} {:?}",
540            block_number, block_hash
541        );
542        self.pos_handler
543            .pos_ledger_db()
544            .get_block_ledger_info(
545                &HashValue::from_slice(block_hash.as_bytes()).unwrap(),
546            )
547            .ok()
548    }
549
550    fn ledger_info_by_epoch_and_round(
551        &self, epoch: u64, round: u64,
552    ) -> Option<LedgerInfoWithSignatures> {
553        let block_hash = self
554            .pos_handler
555            .pos_ledger_db()
556            .get_block_hash_by_epoch_and_round(epoch, round)
557            .ok()?;
558        self.pos_handler
559            .pos_ledger_db()
560            .get_block_ledger_info(&block_hash)
561            .ok()
562    }
563}
564
565fn map_votes(list: &StatusList) -> Vec<VotePowerState> {
566    let mut ans = Vec::with_capacity(list.len());
567    for item in list.iter() {
568        ans.push(VotePowerState {
569            end_block_number: U64::from(item.view),
570            power: U64::from(item.votes),
571        })
572    }
573    ans
574}
575
576pub fn hash_value_to_h256(h: HashValue) -> H256 {
577    hexstr_to_h256(h.to_hex().as_str())
578}
579
580pub fn convert_to_pos_epoch_reward(
581    reward: PosRewardInfo, network_type: Network,
582) -> Result<PoSEpochReward, String> {
583    let default_value = U256::from(0);
584    let mut account_reward_map = HashMap::new();
585    let mut account_address_map = HashMap::new();
586    for r in reward.account_rewards.iter() {
587        let key = r.pos_identifier;
588        let r1 = account_reward_map.get(&key).unwrap_or(&default_value);
589        let merged_reward = r.reward + r1;
590        account_reward_map.insert(key, merged_reward);
591
592        let rpc_address = RpcAddress::try_from_h160(r.address, network_type)?;
593        account_address_map.insert(key, rpc_address);
594    }
595    let account_rewards = account_reward_map
596        .iter()
597        .map(|(k, v)| Reward {
598            pos_address: *k,
599            pow_address: account_address_map.get(k).unwrap().clone(),
600            reward: *v,
601        })
602        .filter(|r| r.reward > U256::from(0))
603        .collect();
604    Ok(PoSEpochReward {
605        pow_epoch_hash: reward.execution_epoch_hash,
606        account_rewards,
607    })
608}
609
610impl PosRpcServer for PosHandler {
611    fn pos_status(&self) -> RpcResult<Status> { Ok(self.status_impl()) }
612
613    fn pos_account(
614        &self, address: H256, view: Option<U64>,
615    ) -> RpcResult<Account> {
616        self.account_impl(address, view).map_err(Into::into)
617    }
618
619    fn pos_account_by_pow_address(
620        &self, address: RpcAddress, view: Option<U64>,
621    ) -> RpcResult<Account> {
622        self.account_by_pow_address_impl(address, view)
623            .map_err(Into::into)
624    }
625
626    fn pos_committee(&self, view: Option<U64>) -> RpcResult<CommitteeState> {
627        self.committee_by_block_number(view).map_err(Into::into)
628    }
629
630    fn pos_block_by_hash(&self, hash: H256) -> RpcResult<Option<Block>> {
631        Ok(self.block_by_hash(hash))
632    }
633
634    fn pos_block_by_number(
635        &self, number: BlockNumber,
636    ) -> RpcResult<Option<Block>> {
637        Ok(self.block_by_number(number))
638    }
639
640    fn pos_transaction_by_number(
641        &self, number: U64,
642    ) -> RpcResult<Option<Transaction>> {
643        Ok(self.tx_by_version(number.as_u64()))
644    }
645
646    fn pos_consensus_blocks(&self) -> RpcResult<Vec<Block>> {
647        Ok(self.consensus_blocks().unwrap_or(vec![]))
648    }
649
650    fn pos_get_epoch_state(
651        &self, epoch: U64,
652    ) -> RpcResult<Option<RpcEpochState>> {
653        Ok(self
654            .epoch_state_by_epoch_number(epoch.as_u64())
655            .map(|e| (&e).into()))
656    }
657
658    fn pos_get_ledger_info_by_epoch(
659        &self, epoch: U64,
660    ) -> RpcResult<Option<RpcLedgerInfoWithSignatures>> {
661        Ok(self
662            .ledger_info_by_epoch(epoch.as_u64())
663            .map(|l| (&l).into()))
664    }
665
666    fn pos_get_ledger_info_by_block_number(
667        &self, number: BlockNumber,
668    ) -> RpcResult<Option<RpcLedgerInfoWithSignatures>> {
669        Ok(self
670            .ledger_info_by_block_number(number)
671            .map(|l| (&l).into()))
672    }
673
674    fn pos_get_ledger_info_by_epoch_and_round(
675        &self, epoch: U64, round: U64,
676    ) -> RpcResult<Option<RpcLedgerInfoWithSignatures>> {
677        Ok(self
678            .ledger_info_by_epoch_and_round(epoch.as_u64(), round.as_u64())
679            .map(|l| (&l).into()))
680    }
681
682    fn pos_get_ledger_infos_by_epoch(
683        &self, start_epoch: U64, end_epoch: U64,
684    ) -> RpcResult<Vec<RpcLedgerInfoWithSignatures>> {
685        Ok(self
686            .ledger_infos_by_epoch(start_epoch.as_u64(), end_epoch.as_u64())
687            .iter()
688            .map(|l| l.into())
689            .collect())
690    }
691
692    fn pos_get_rewards_by_epoch(
693        &self, epoch: U64,
694    ) -> RpcResult<Option<PoSEpochReward>> {
695        let reward = self
696            .pow_data_manager
697            .pos_reward_by_pos_epoch(epoch.as_u64())
698            .map(|reward_info| {
699                convert_to_pos_epoch_reward(reward_info, self.network_type).ok()
700            })
701            .unwrap_or(None);
702        Ok(reward)
703    }
704}