cfxcore/pos/consensus/
state_computer.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use crate::pos::state_sync::client::StateSyncClient;
9
10use super::{error::StateSyncError, state_replication::StateComputer};
11use anyhow::Result;
12use consensus_types::block::Block;
13use diem_crypto::HashValue;
14use diem_infallible::Mutex;
15use diem_logger::prelude::*;
16use diem_metrics::monitor;
17use diem_types::{
18    ledger_info::LedgerInfoWithSignatures, transaction::Transaction,
19};
20use executor_types::{
21    BlockExecutor, Error as ExecutionError, StateComputeResult,
22};
23use fail::fail_point;
24use std::boxed::Box;
25
/// Basic communication with the Execution module;
/// implements the `StateComputer` trait.
///
/// Block execution and commits are forwarded to the wrapped
/// `BlockExecutor`; after a successful commit the state-sync client is
/// notified of the committed transactions and reconfiguration events.
pub struct ExecutionProxy {
    // Disabled field, kept for reference:
    //execution_correctness_client:
    //    Mutex<Box<dyn ExecutionCorrectness + Send + Sync>>,
    /// State-sync client notified after every successful commit.
    synchronizer: StateSyncClient,
    // TODO(lpl): Use Mutex or Arc?
    /// Block executor; the mutex is locked around each execute/commit call.
    executor: Mutex<Box<dyn BlockExecutor>>,
}
35
36impl ExecutionProxy {
37    pub fn new(
38        executor: Box<dyn BlockExecutor>, synchronizer: StateSyncClient,
39    ) -> Self {
40        Self {
41            /*execution_correctness_client: Mutex::new(
42                execution_correctness_client,
43            ),*/
44            synchronizer,
45            executor: Mutex::new(executor),
46        }
47    }
48}
49
50#[async_trait::async_trait]
51impl StateComputer for ExecutionProxy {
52    fn compute(
53        &self,
54        // The block to be executed.
55        block: &Block,
56        // The parent block id.
57        parent_block_id: HashValue,
58        catch_up_mode: bool,
59    ) -> Result<StateComputeResult, ExecutionError> {
60        fail_point!("consensus::compute", |_| {
61            Err(ExecutionError::InternalError {
62                error: "Injected error in compute".into(),
63            })
64        });
65        diem_debug!(
66            block_id = block.id(),
67            parent_id = block.parent_id(),
68            "Executing block",
69        );
70
71        // TODO: figure out error handling for the prologue txn
72        monitor!(
73            "execute_block",
74            self.executor.lock().execute_block(
75                id_and_transactions_from_block(block),
76                parent_block_id,
77                catch_up_mode
78            )
79        )
80    }
81
82    /// Send a successful commit. A future is fulfilled when the state is
83    /// finalized.
84    async fn commit(
85        &self, block_ids: Vec<HashValue>,
86        finality_proof: LedgerInfoWithSignatures,
87    ) -> Result<(), ExecutionError> {
88        let (committed_txns, reconfig_events) = monitor!(
89            "commit_block",
90            self.executor
91                .lock()
92                .commit_blocks(block_ids, finality_proof)?
93        );
94        if let Err(e) = monitor!(
95            "notify_state_sync",
96            self.synchronizer
97                .commit(committed_txns, reconfig_events)
98                .await
99        ) {
100            diem_error!(error = ?e, "Failed to notify state synchronizer");
101        }
102        Ok(())
103    }
104
105    /// Synchronize to a commit that not present locally.
106    async fn sync_to(
107        &self, _target: LedgerInfoWithSignatures,
108    ) -> Result<(), StateSyncError> {
109        fail_point!("consensus::sync_to", |_| {
110            Err(anyhow::anyhow!("Injected error in sync_to").into())
111        });
112        // Here to start to do state synchronization where ChunkExecutor inside
113        // will process chunks and commit to Storage. However, after
114        // block execution and commitments, the sync state of
115        // ChunkExecutor may be not up to date so it is required to
116        // reset the cache of ChunkExecutor in State Sync when requested
117        // to sync.
118        //let res = monitor!("sync_to",
119        // self.synchronizer.sync_to(target).await); Similarily, after
120        // the state synchronization, we have to reset the
121        // cache of BlockExecutor to guarantee the latest committed
122        // state is up to date.
123        // self.executor.reset()?;
124
125        /*res.map_err(|error| {
126            let anyhow_error: anyhow::Error = error.into();
127            anyhow_error.into()
128        })*/
129        Ok(())
130    }
131}
132
133fn id_and_transactions_from_block(
134    block: &Block,
135) -> (HashValue, Vec<Transaction>) {
136    let id = block.id();
137    // TODO(lpl): Do we need BlockMetadata?
138    let mut transactions = vec![Transaction::BlockMetadata(block.into())];
139    transactions.extend(
140        block
141            .payload()
142            .unwrap_or(&vec![])
143            .iter()
144            .map(|txn| Transaction::UserTransaction(txn.clone())),
145    );
146    (id, transactions)
147}