cfxcore/pos/consensus/
state_computer.rs1use crate::pos::state_sync::client::StateSyncClient;
9
10use super::{error::StateSyncError, state_replication::StateComputer};
11use anyhow::Result;
12use consensus_types::block::Block;
13use diem_crypto::HashValue;
14use diem_infallible::Mutex;
15use diem_logger::prelude::*;
16use diem_metrics::monitor;
17use diem_types::{
18 ledger_info::LedgerInfoWithSignatures, transaction::Transaction,
19};
20use executor_types::{
21 BlockExecutor, Error as ExecutionError, StateComputeResult,
22};
23use fail::fail_point;
24use std::boxed::Box;
25
26pub struct ExecutionProxy {
29 synchronizer: StateSyncClient,
32 executor: Mutex<Box<dyn BlockExecutor>>,
34}
35
36impl ExecutionProxy {
37 pub fn new(
38 executor: Box<dyn BlockExecutor>, synchronizer: StateSyncClient,
39 ) -> Self {
40 Self {
41 synchronizer,
45 executor: Mutex::new(executor),
46 }
47 }
48}
49
50#[async_trait::async_trait]
51impl StateComputer for ExecutionProxy {
52 fn compute(
53 &self,
54 block: &Block,
56 parent_block_id: HashValue,
58 catch_up_mode: bool,
59 ) -> Result<StateComputeResult, ExecutionError> {
60 fail_point!("consensus::compute", |_| {
61 Err(ExecutionError::InternalError {
62 error: "Injected error in compute".into(),
63 })
64 });
65 diem_debug!(
66 block_id = block.id(),
67 parent_id = block.parent_id(),
68 "Executing block",
69 );
70
71 monitor!(
73 "execute_block",
74 self.executor.lock().execute_block(
75 id_and_transactions_from_block(block),
76 parent_block_id,
77 catch_up_mode
78 )
79 )
80 }
81
82 async fn commit(
85 &self, block_ids: Vec<HashValue>,
86 finality_proof: LedgerInfoWithSignatures,
87 ) -> Result<(), ExecutionError> {
88 let (committed_txns, reconfig_events) = monitor!(
89 "commit_block",
90 self.executor
91 .lock()
92 .commit_blocks(block_ids, finality_proof)?
93 );
94 if let Err(e) = monitor!(
95 "notify_state_sync",
96 self.synchronizer
97 .commit(committed_txns, reconfig_events)
98 .await
99 ) {
100 diem_error!(error = ?e, "Failed to notify state synchronizer");
101 }
102 Ok(())
103 }
104
105 async fn sync_to(
107 &self, _target: LedgerInfoWithSignatures,
108 ) -> Result<(), StateSyncError> {
109 fail_point!("consensus::sync_to", |_| {
110 Err(anyhow::anyhow!("Injected error in sync_to").into())
111 });
112 Ok(())
130 }
131}
132
133fn id_and_transactions_from_block(
134 block: &Block,
135) -> (HashValue, Vec<Transaction>) {
136 let id = block.id();
137 let mut transactions = vec![Transaction::BlockMetadata(block.into())];
139 transactions.extend(
140 block
141 .payload()
142 .unwrap_or(&vec![])
143 .iter()
144 .map(|txn| Transaction::UserTransaction(txn.clone())),
145 );
146 (id, transactions)
147}