client/rpc/impls/
pos.rs

1// Copyright 2020 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    common::delegate_convert::into_jsonrpc_result,
7    rpc::{
8        errors::{build_rpc_server_error, codes::POS_NOT_ENABLED},
9        traits::pos::Pos,
10        types::{
11            pos::{
12                tx_type, Account, Block, BlockNumber, CommitteeState, Decision,
13                EpochState as RpcEpochState,
14                LedgerInfoWithSignatures as RpcLedgerInfoWithSignatures,
15                NodeLockStatus, PoSEpochReward, Reward, RpcCommittee,
16                RpcTermData, RpcTransactionStatus, RpcTransactionType,
17                Signature, Status, Transaction, VotePowerState,
18            },
19            EpochNumber, RpcAddress,
20        },
21        CoreResult, RpcInterceptor,
22    },
23};
24use cfx_addr::Network;
25use cfx_executor::internal_contract;
26use cfx_parameters::internal_contract_addresses::POS_REGISTER_CONTRACT_ADDRESS;
27use cfx_statedb::StateDbExt;
28use cfx_types::{hexstr_to_h256, BigEndianHash, H256, U256, U64};
29use cfx_util_macros::bail;
30use cfxcore::{
31    block_data_manager::block_data_types::PosRewardInfo,
32    consensus::pos_handler::PosVerifier, BlockDataManager,
33    SharedConsensusGraph,
34};
35use consensus_types::block::Block as ConsensusBlock;
36use diem_crypto::hash::HashValue;
37use diem_types::{
38    account_address::AccountAddress,
39    epoch_state::EpochState,
40    ledger_info::LedgerInfoWithSignatures,
41    term_state::{lock_status::StatusList, PosState, TERM_LIST_LEN},
42    transaction::Transaction as CoreTransaction,
43};
44use itertools::Itertools;
45use jsonrpc_core::Result as JsonRpcResult;
46use log::{debug, info};
47use primitives::{StorageKey, StorageValue};
48use std::{collections::HashMap, sync::Arc};
49use storage_interface::{DBReaderForPoW, DbReader};
50
51pub struct PoSInterceptor {
52    pos_handler: Arc<PosVerifier>,
53}
54
55impl PoSInterceptor {
56    pub fn new(pos_handler: Arc<PosVerifier>) -> Self {
57        PoSInterceptor { pos_handler }
58    }
59}
60
61impl RpcInterceptor for PoSInterceptor {
62    fn before(&self, _name: &String) -> JsonRpcResult<()> {
63        match self.pos_handler.pos_option() {
64            Some(_) => Ok(()),
65            None => bail!(build_rpc_server_error(
66                POS_NOT_ENABLED,
67                "PoS chain is not enabled".into()
68            )),
69        }
70    }
71}
72
73pub struct PosHandler {
74    pos_handler: Arc<PosVerifier>,
75    pow_data_manager: Arc<BlockDataManager>,
76    network_type: Network,
77    consensus: SharedConsensusGraph,
78}
79
80impl PosHandler {
81    pub fn new(
82        pos_handler: Arc<PosVerifier>, pow_data_manager: Arc<BlockDataManager>,
83        network_type: Network, consensus: SharedConsensusGraph,
84    ) -> Self {
85        PosHandler {
86            pos_handler,
87            pow_data_manager,
88            network_type,
89            consensus,
90        }
91    }
92
93    fn current_height(&self) -> u64 {
94        self.pos_handler
95            .pos_ledger_db()
96            .get_latest_pos_state()
97            .current_view()
98    }
99
100    fn current_epoch(&self) -> u64 {
101        self.pos_handler
102            .pos_ledger_db()
103            .get_latest_pos_state()
104            .epoch_state()
105            .epoch
106    }
107
108    fn status_impl(&self) -> Status {
109        let state = self.pos_handler.pos_ledger_db().get_latest_pos_state();
110        let decision = state.pivot_decision();
111        let epoch_state = state.epoch_state();
112        let block_number = state.current_view();
113        let latest_voted = self.latest_voted().map(|b| U64::from(b.height));
114        let latest_tx_number = self
115            .block_by_number(BlockNumber::Num(U64::from(block_number)))
116            .map(|b| b.last_tx_number.into())
117            .unwrap_or_default();
118        Status {
119            epoch: U64::from(epoch_state.epoch),
120            latest_committed: U64::from(block_number),
121            pivot_decision: Decision::from(decision),
122            latest_voted,
123            latest_tx_number,
124        }
125    }
126
127    fn account_impl(
128        &self, address: H256, view: Option<U64>,
129    ) -> CoreResult<Account> {
130        let state = self.pos_state_by_view(view)?;
131
132        let account_address = AccountAddress::from_bytes(address);
133
134        if let Ok(addr) = account_address {
135            let maybe_node_data = state.account_node_data(addr);
136            info!("maybe_node_data {:?}", maybe_node_data);
137
138            if let Some(node_data) = maybe_node_data {
139                let lock_status = node_data.lock_status();
140                return Ok(Account {
141                    address,
142                    block_number: U64::from(state.current_view()),
143                    status: NodeLockStatus {
144                        in_queue: map_votes(&lock_status.in_queue),
145                        locked: U64::from(lock_status.locked),
146                        out_queue: map_votes(&lock_status.out_queue),
147                        unlocked: U64::from(lock_status.unlocked_votes()),
148                        available_votes: U64::from(
149                            lock_status.available_votes(),
150                        ),
151                        force_retired: lock_status
152                            .force_retired()
153                            .map(|x| U64::from(x)),
154                        forfeited: U64::from(lock_status.forfeited()),
155                    },
156                });
157            };
158        }
159
160        let mut default_acct: Account = Account::default();
161        default_acct.address = address;
162        default_acct.block_number = U64::from(state.current_view());
163        return Ok(default_acct);
164    }
165
166    fn account_by_pow_address_impl(
167        &self, address: RpcAddress, view: Option<U64>,
168    ) -> CoreResult<Account> {
169        debug!(
170            "Get pos account by pow address {:?}, view {:?}",
171            address, view
172        );
173
174        let state_db = self.consensus.get_state_db_by_epoch_number(
175            EpochNumber::LatestState.into(),
176            "view",
177        )?;
178
179        let identifier_entry =
180            internal_contract::pos_internal_entries::identifier_entry(
181                &address.hex_address,
182            );
183        let storage_key = StorageKey::new_storage_key(
184            &POS_REGISTER_CONTRACT_ADDRESS,
185            identifier_entry.as_ref(),
186        )
187        .with_native_space();
188        let StorageValue { value, .. } = state_db
189            .get::<StorageValue>(storage_key)?
190            .unwrap_or_default();
191        let addr = BigEndianHash::from_uint(&value);
192
193        debug!("Pos Address: {:?}", addr);
194
195        self.account_impl(addr, view)
196    }
197
198    fn pos_state_by_view(
199        &self, view: Option<U64>,
200    ) -> Result<Arc<PosState>, String> {
201        let latest_state =
202            self.pos_handler.pos_ledger_db().get_latest_pos_state();
203        let state = match view {
204            None => latest_state,
205            Some(v) => {
206                let latest_view = latest_state.current_view();
207                let v = v.as_u64();
208                if v > latest_view {
209                    bail!("Specified block {} is not executed, the latest block number is {}", v, latest_view)
210                }
211
212                let state = self
213                    .pos_handler
214                    .pos_ledger_db()
215                    .get_committed_block_hash_by_view(v)
216                    .and_then(|block_hash| {
217                        self.pos_handler
218                            .pos_ledger_db()
219                            .get_pos_state(&block_hash)
220                    })
221                    .map_err(|_| format!("PoS state of {} not found", v))?;
222                Arc::new(state)
223            }
224        };
225        Ok(state)
226    }
227
228    fn committee_by_block_number(
229        &self, view: Option<U64>,
230    ) -> CoreResult<CommitteeState> {
231        let pos_state = self.pos_state_by_view(view)?;
232
233        let current_committee =
234            RpcCommittee::from_epoch_state(pos_state.epoch_state());
235
236        // get future term data
237        let elections = pos_state.term_list().term_list()
238            [TERM_LIST_LEN..=TERM_LIST_LEN + 1]
239            .iter()
240            .map(|term_data| RpcTermData::from(term_data))
241            .collect();
242
243        Ok(CommitteeState {
244            current_committee,
245            elections,
246        })
247    }
248
249    // get epoch ending ledger info
250    fn ledger_info_by_epoch(
251        &self, epoch: u64,
252    ) -> Option<LedgerInfoWithSignatures> {
253        self.pos_handler
254            .pos_ledger_db()
255            .get_epoch_ending_ledger_infos(epoch, epoch + 1)
256            .ok()?
257            .get_all_ledger_infos()
258            .first()
259            .map(|l| l.clone())
260    }
261
262    fn ledger_infos_by_epoch(
263        &self, start_epoch: u64, end_epoch: u64,
264    ) -> Vec<LedgerInfoWithSignatures> {
265        self.pos_handler
266            .pos_ledger_db()
267            .get_epoch_ending_ledger_infos(start_epoch, end_epoch)
268            .ok()
269            .map(|proof| proof.get_all_ledger_infos())
270            .unwrap_or(vec![])
271    }
272
273    // get epoch state
274    fn epoch_state_by_epoch_number(&self, epoch: u64) -> Option<EpochState> {
275        if epoch == 0 {
276            return None;
277        }
278        if epoch == self.current_epoch() {
279            return Some(
280                self.pos_handler
281                    .pos_ledger_db()
282                    .get_latest_pos_state()
283                    .epoch_state()
284                    .clone(),
285            );
286        }
287        if let Some(ledger_info) = self.ledger_info_by_epoch(epoch - 1) {
288            let option = ledger_info.ledger_info().next_epoch_state();
289            return option.map(|f| (*f).clone());
290        }
291        None
292    }
293
294    fn block_by_hash(&self, hash: H256) -> Option<Block> {
295        let hash_value = HashValue::from_slice(hash.as_bytes()).ok()?;
296        let block = self
297            .pos_handler
298            .pos_ledger_db()
299            .get_committed_block_by_hash(&hash_value);
300        match block {
301            Ok(b) => {
302                let mut block = Block {
303                    hash,
304                    height: U64::from(b.view),
305                    epoch: U64::from(b.epoch),
306                    round: U64::from(b.round),
307                    last_tx_number: U64::from(b.version),
308                    miner: b.miner.map(|m| H256::from(m.to_u8())),
309                    parent_hash: hash_value_to_h256(b.parent_hash),
310                    timestamp: U64::from(b.timestamp),
311                    pivot_decision: Some(Decision::from(&b.pivot_decision)),
312                    signatures: vec![],
313                };
314                // get signatures info
315                if let Some(epoch_state) =
316                    self.epoch_state_by_epoch_number(b.epoch)
317                {
318                    if let Ok(ledger_info) = self
319                        .pos_handler
320                        .pos_ledger_db()
321                        .get_ledger_info_by_voted_block(&b.hash)
322                    {
323                        block.signatures = ledger_info
324                            .signatures()
325                            .iter()
326                            .map(|(a, _s)| {
327                                let voting_power = epoch_state
328                                    .verifier()
329                                    .get_voting_power(a)
330                                    .unwrap_or(0);
331                                Signature {
332                                    account: H256::from(a.to_u8()),
333                                    // signature: s.to_string(),
334                                    votes: U64::from(voting_power),
335                                }
336                            })
337                            .collect();
338                    }
339                };
340                Some(block)
341            }
342            Err(_) => self.consensus_block_by_hash(hash),
343        }
344    }
345
346    fn block_by_number(&self, number: BlockNumber) -> Option<Block> {
347        match number {
348            BlockNumber::Num(num) => {
349                if num.as_u64() <= self.current_height() {
350                    let hash = self
351                        .pos_handler
352                        .pos_ledger_db()
353                        .get_committed_block_hash_by_view(num.as_u64())
354                        .ok()?;
355                    self.block_by_hash(hash_value_to_h256(hash))
356                } else {
357                    self.consensus_block_by_number(num)
358                }
359            }
360            BlockNumber::LatestCommitted => {
361                let hash = self.pos_handler.get_latest_pos_reference();
362                self.block_by_hash(hash)
363            }
364            BlockNumber::Earliest => {
365                let hash = self
366                    .pos_handler
367                    .pos_ledger_db()
368                    .get_committed_block_hash_by_view(1)
369                    .ok()?;
370                self.block_by_hash(hash_value_to_h256(hash))
371            }
372            BlockNumber::LatestVoted => self.latest_voted(),
373        }
374    }
375
376    fn consensus_blocks(&self) -> Option<Vec<Block>> {
377        let blocks = self.pos_handler.consensus_db().get_blocks().ok()?;
378        let block_ids = blocks.values().map(|b| b.id()).collect::<Vec<_>>();
379        debug!("consensus_blocks: block_ids={:?}", block_ids);
380        if blocks.len() == 0 {
381            return Some(vec![]);
382        }
383        let qcs = self
384            .pos_handler
385            .consensus_db()
386            .get_quorum_certificates()
387            .ok()?;
388        // sort by epoch and round
389        let blocks: Vec<ConsensusBlock> = blocks
390            .into_iter()
391            .sorted_by(|(_, b1), (_, b2)| {
392                Ord::cmp(&(b1.epoch(), b1.round()), &(b2.epoch(), b2.round()))
393            })
394            .map(|(_, b)| b)
395            .collect();
396        let latest_epoch_state = self
397            .pos_handler
398            .pos_ledger_db()
399            .get_latest_pos_state()
400            .epoch_state()
401            .clone();
402        // map to Committed block
403        let rpc_blocks = blocks
404            .into_iter()
405            .filter(|b| b.epoch() == latest_epoch_state.epoch)
406            .map(|b| {
407                let mut rpc_block = Block {
408                    hash: hash_value_to_h256(b.id()),
409                    epoch: U64::from(b.epoch()),
410                    round: U64::from(b.round()),
411                    last_tx_number: Default::default(),
412                    miner: b.author().map(|a| H256::from(a.to_u8())),
413                    parent_hash: hash_value_to_h256(b.parent_id()),
414                    timestamp: U64::from(b.timestamp_usecs()),
415                    pivot_decision: Default::default(),
416                    height: Default::default(),
417                    signatures: vec![],
418                };
419                // Executed blocks are committed and pruned before ConsensusDB.
420                // If we get a block from ConsensusDB and it's pruned before we
421                // get the executed block here, its version and
422                // pivot decision would be missing.
423                // If this consensus block is not on a fork, its CommittedBlock
424                // should be accessible in this case.
425                if let Ok(executed_block) =
426                    self.pos_handler.cached_db().get_block(&b.id())
427                {
428                    let executed = executed_block.lock();
429                    if let Some(version) = executed.output().version() {
430                        rpc_block.last_tx_number = U64::from(version);
431                    }
432                    rpc_block.pivot_decision = executed
433                        .output()
434                        .pivot_block()
435                        .as_ref()
436                        .map(|p| Decision::from(p));
437                    rpc_block.height = U64::from(
438                        executed
439                            .output()
440                            .executed_trees()
441                            .pos_state()
442                            .current_view(),
443                    );
444                } else if let Ok(committed_block) = self
445                    .pos_handler
446                    .pos_ledger_db()
447                    .get_committed_block_by_hash(&b.id())
448                {
449                    rpc_block.last_tx_number = committed_block.version.into();
450                    rpc_block.pivot_decision =
451                        Some(Decision::from(&committed_block.pivot_decision));
452                    rpc_block.height = U64::from(committed_block.view);
453                }
454                if let Some(qc) = qcs.get(&b.id()) {
455                    let signatures = qc
456                        .ledger_info()
457                        .signatures()
458                        .iter()
459                        .map(|(a, _s)| {
460                            let voting_power = latest_epoch_state
461                                .verifier()
462                                .get_voting_power(a)
463                                .unwrap_or(0);
464                            Signature {
465                                account: H256::from(a.to_u8()),
466                                // signature: s.to_string(),
467                                votes: U64::from(voting_power),
468                            }
469                        })
470                        .collect();
471                    rpc_block.signatures = signatures;
472                }
473                rpc_block
474            })
475            .collect::<Vec<_>>();
476        Some(rpc_blocks)
477    }
478
479    fn latest_voted(&self) -> Option<Block> {
480        self.consensus_blocks()
481            .map(|blocks| {
482                blocks
483                    .iter()
484                    .filter(|b| b.pivot_decision.is_some())
485                    .last()
486                    .cloned()
487            })
488            .flatten()
489    }
490
491    fn consensus_block_by_number(&self, number: U64) -> Option<Block> {
492        self.consensus_blocks()?
493            .into_iter()
494            .find(|b| b.height == number)
495    }
496
497    fn consensus_block_by_hash(&self, hash: H256) -> Option<Block> {
498        self.consensus_blocks()?
499            .into_iter()
500            .find(|b| b.hash == hash)
501    }
502
503    fn tx_by_version(&self, version: u64) -> Option<Transaction> {
504        let pos_ledger_db = self.pos_handler.pos_ledger_db();
505        match pos_ledger_db.get_transaction(version).ok()? {
506            CoreTransaction::UserTransaction(signed_tx) => {
507                let mut block_hash: Option<H256> = None;
508                let mut block_number: Option<U64> = None;
509                let mut timestamp: Option<U64> = None;
510                let block_meta = pos_ledger_db
511                    .get_transaction_block_meta(version)
512                    .unwrap_or(None);
513                if let Some((_, bm)) = block_meta {
514                    block_hash = Some(hash_value_to_h256(bm.id()));
515                    timestamp = Some(U64::from(bm.timestamp_usec()));
516                    if let Some(block) = self.block_by_hash(block_hash?) {
517                        block_number = Some(block.height);
518                    }
519                }
520                let status = pos_ledger_db
521                    .get_transaction_info(version)
522                    .ok()
523                    .map(|tx| RpcTransactionStatus::from(tx.status().clone()));
524                Some(Transaction {
525                    hash: hash_value_to_h256(signed_tx.hash()),
526                    from: H256::from(signed_tx.sender().to_u8()),
527                    block_hash,
528                    block_number,
529                    timestamp,
530                    number: U64::from(version),
531                    payload: Some(signed_tx.payload().clone().into()),
532                    status,
533                    tx_type: tx_type(signed_tx.payload().clone()),
534                })
535            }
536            CoreTransaction::GenesisTransaction(_) => None,
537            CoreTransaction::BlockMetadata(block_meta) => {
538                let block_number = self
539                    .block_by_hash(hash_value_to_h256(block_meta.id()))
540                    .map(|b| U64::from(b.height));
541                let mut tx = Transaction {
542                    hash: Default::default(),
543                    from: Default::default(), // TODO
544                    block_hash: Some(hash_value_to_h256(block_meta.id())),
545                    block_number,
546                    timestamp: Some(U64::from(block_meta.timestamp_usec())),
547                    number: U64::from(version),
548                    payload: None,
549                    status: None,
550                    tx_type: RpcTransactionType::BlockMetadata,
551                };
552                if let Some(tx_info) =
553                    pos_ledger_db.get_transaction_info(version).ok()
554                {
555                    let status =
556                        RpcTransactionStatus::from(tx_info.status().clone());
557                    tx.status = Some(status);
558                    tx.hash = hash_value_to_h256(tx_info.transaction_hash());
559                }
560                Some(tx)
561            }
562        }
563    }
564
565    fn ledger_info_by_block_number(
566        &self, block_number: BlockNumber,
567    ) -> Option<LedgerInfoWithSignatures> {
568        // TODO: Get hash without getting the block.
569        let block_hash = self.block_by_number(block_number.clone())?.hash;
570        debug!(
571            "ledger_info_by_block_number {:?} {:?}",
572            block_number, block_hash
573        );
574        self.pos_handler
575            .pos_ledger_db()
576            .get_block_ledger_info(
577                &HashValue::from_slice(block_hash.as_bytes()).unwrap(),
578            )
579            .ok()
580    }
581
582    fn ledger_info_by_epoch_and_round(
583        &self, epoch: u64, round: u64,
584    ) -> Option<LedgerInfoWithSignatures> {
585        let block_hash = self
586            .pos_handler
587            .pos_ledger_db()
588            .get_block_hash_by_epoch_and_round(epoch, round)
589            .ok()?;
590        self.pos_handler
591            .pos_ledger_db()
592            .get_block_ledger_info(&block_hash)
593            .ok()
594    }
595}
596
597fn map_votes(list: &StatusList) -> Vec<VotePowerState> {
598    let mut ans = Vec::with_capacity(list.len());
599    for item in list.iter() {
600        ans.push(VotePowerState {
601            end_block_number: U64::from(item.view),
602            power: U64::from(item.votes),
603        })
604    }
605    ans
606}
607
608pub fn hash_value_to_h256(h: HashValue) -> H256 {
609    hexstr_to_h256(h.to_hex().as_str())
610}
611
612impl Pos for PosHandler {
613    fn pos_status(&self) -> JsonRpcResult<Status> { Ok(self.status_impl()) }
614
615    fn pos_account(
616        &self, address: H256, view: Option<U64>,
617    ) -> JsonRpcResult<Account> {
618        into_jsonrpc_result(self.account_impl(address, view))
619    }
620
621    fn pos_account_by_pow_address(
622        &self, address: RpcAddress, view: Option<U64>,
623    ) -> JsonRpcResult<Account> {
624        into_jsonrpc_result(self.account_by_pow_address_impl(address, view))
625    }
626
627    fn pos_committee(
628        &self, view: Option<U64>,
629    ) -> JsonRpcResult<CommitteeState> {
630        into_jsonrpc_result(self.committee_by_block_number(view))
631    }
632
633    fn pos_block_by_hash(&self, hash: H256) -> JsonRpcResult<Option<Block>> {
634        Ok(self.block_by_hash(hash))
635    }
636
637    fn pos_block_by_number(
638        &self, number: BlockNumber,
639    ) -> JsonRpcResult<Option<Block>> {
640        Ok(self.block_by_number(number))
641    }
642
643    fn pos_transaction_by_number(
644        &self, number: U64,
645    ) -> JsonRpcResult<Option<Transaction>> {
646        Ok(self.tx_by_version(number.as_u64()))
647    }
648
649    fn pos_consensus_blocks(&self) -> JsonRpcResult<Vec<Block>> {
650        Ok(self.consensus_blocks().unwrap_or(vec![]))
651    }
652
653    fn pos_get_epoch_state(
654        &self, epoch: U64,
655    ) -> JsonRpcResult<Option<RpcEpochState>> {
656        Ok(self
657            .epoch_state_by_epoch_number(epoch.as_u64())
658            .map(|e| (&e).into()))
659    }
660
661    fn pos_get_ledger_info_by_epoch(
662        &self, epoch: U64,
663    ) -> JsonRpcResult<Option<RpcLedgerInfoWithSignatures>> {
664        Ok(self
665            .ledger_info_by_epoch(epoch.as_u64())
666            .map(|l| (&l).into()))
667    }
668
669    fn pos_get_ledger_info_by_block_number(
670        &self, number: BlockNumber,
671    ) -> JsonRpcResult<Option<RpcLedgerInfoWithSignatures>> {
672        Ok(self
673            .ledger_info_by_block_number(number)
674            .map(|l| (&l).into()))
675    }
676
677    fn pos_get_ledger_info_by_epoch_and_round(
678        &self, epoch: U64, round: U64,
679    ) -> JsonRpcResult<Option<RpcLedgerInfoWithSignatures>> {
680        Ok(self
681            .ledger_info_by_epoch_and_round(epoch.as_u64(), round.as_u64())
682            .map(|l| (&l).into()))
683    }
684
685    fn pos_get_ledger_infos_by_epoch(
686        &self, start_epoch: U64, end_epoch: U64,
687    ) -> JsonRpcResult<Vec<RpcLedgerInfoWithSignatures>> {
688        Ok(self
689            .ledger_infos_by_epoch(start_epoch.as_u64(), end_epoch.as_u64())
690            .iter()
691            .map(|l| l.into())
692            .collect())
693    }
694
695    fn pos_get_rewards_by_epoch(
696        &self, epoch: U64,
697    ) -> JsonRpcResult<Option<PoSEpochReward>> {
698        let reward = self
699            .pow_data_manager
700            .pos_reward_by_pos_epoch(epoch.as_u64())
701            .map(|reward_info| {
702                convert_to_pos_epoch_reward(reward_info, self.network_type).ok()
703            })
704            .unwrap_or(None);
705        Ok(reward)
706    }
707}
708
709pub fn convert_to_pos_epoch_reward(
710    reward: PosRewardInfo, network_type: Network,
711) -> Result<PoSEpochReward, String> {
712    let default_value = U256::from(0);
713    let mut account_reward_map = HashMap::new();
714    let mut account_address_map = HashMap::new();
715    for r in reward.account_rewards.iter() {
716        let key = r.pos_identifier;
717        let r1 = account_reward_map.get(&key).unwrap_or(&default_value);
718        let merged_reward = r.reward + r1;
719        account_reward_map.insert(key, merged_reward);
720
721        let rpc_address = RpcAddress::try_from_h160(r.address, network_type)?;
722        account_address_map.insert(key, rpc_address);
723    }
724    let account_rewards = account_reward_map
725        .iter()
726        .map(|(k, v)| Reward {
727            pos_address: *k,
728            pow_address: account_address_map.get(k).unwrap().clone(),
729            reward: *v,
730        })
731        .filter(|r| r.reward > U256::from(0))
732        .collect();
733    Ok(PoSEpochReward {
734        pow_epoch_hash: reward.execution_epoch_hash,
735        account_rewards,
736    })
737}