1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/

use crate::pos::state_sync::client::StateSyncClient;

use super::{error::StateSyncError, state_replication::StateComputer};
use anyhow::Result;
use consensus_types::block::Block;
use diem_crypto::HashValue;
use diem_infallible::Mutex;
use diem_logger::prelude::*;
use diem_metrics::monitor;
use diem_types::{
    ledger_info::LedgerInfoWithSignatures, transaction::Transaction,
};
use executor_types::{
    BlockExecutor, Error as ExecutionError, StateComputeResult,
};
use fail::fail_point;
use std::boxed::Box;

/// Basic communication with the Execution module;
/// implements StateComputer traits.
pub struct ExecutionProxy {
    //execution_correctness_client:
    //    Mutex<Box<dyn ExecutionCorrectness + Send + Sync>>,
    synchronizer: StateSyncClient,
    // TODO(lpl): Use Mutex or Arc?
    executor: Mutex<Box<dyn BlockExecutor>>,
}

impl ExecutionProxy {
    pub fn new(
        executor: Box<dyn BlockExecutor>, synchronizer: StateSyncClient,
    ) -> Self {
        Self {
            /*execution_correctness_client: Mutex::new(
                execution_correctness_client,
            ),*/
            synchronizer,
            executor: Mutex::new(executor),
        }
    }
}

#[async_trait::async_trait]
impl StateComputer for ExecutionProxy {
    fn compute(
        &self,
        // The block to be executed.
        block: &Block,
        // The parent block id.
        parent_block_id: HashValue,
        catch_up_mode: bool,
    ) -> Result<StateComputeResult, ExecutionError> {
        fail_point!("consensus::compute", |_| {
            Err(ExecutionError::InternalError {
                error: "Injected error in compute".into(),
            })
        });
        diem_debug!(
            block_id = block.id(),
            parent_id = block.parent_id(),
            "Executing block",
        );

        // TODO: figure out error handling for the prologue txn
        monitor!(
            "execute_block",
            self.executor.lock().execute_block(
                id_and_transactions_from_block(block),
                parent_block_id,
                catch_up_mode
            )
        )
    }

    /// Send a successful commit. A future is fulfilled when the state is
    /// finalized.
    async fn commit(
        &self, block_ids: Vec<HashValue>,
        finality_proof: LedgerInfoWithSignatures,
    ) -> Result<(), ExecutionError> {
        let (committed_txns, reconfig_events) = monitor!(
            "commit_block",
            self.executor
                .lock()
                .commit_blocks(block_ids, finality_proof)?
        );
        if let Err(e) = monitor!(
            "notify_state_sync",
            self.synchronizer
                .commit(committed_txns, reconfig_events)
                .await
        ) {
            diem_error!(error = ?e, "Failed to notify state synchronizer");
        }
        Ok(())
    }

    /// Synchronize to a commit that not present locally.
    async fn sync_to(
        &self, _target: LedgerInfoWithSignatures,
    ) -> Result<(), StateSyncError> {
        fail_point!("consensus::sync_to", |_| {
            Err(anyhow::anyhow!("Injected error in sync_to").into())
        });
        // Here to start to do state synchronization where ChunkExecutor inside
        // will process chunks and commit to Storage. However, after
        // block execution and commitments, the sync state of
        // ChunkExecutor may be not up to date so it is required to
        // reset the cache of ChunkExecutor in State Sync when requested
        // to sync.
        //let res = monitor!("sync_to",
        // self.synchronizer.sync_to(target).await); Similarily, after
        // the state synchronization, we have to reset the
        // cache of BlockExecutor to guarantee the latest committed
        // state is up to date.
        // self.executor.reset()?;

        /*res.map_err(|error| {
            let anyhow_error: anyhow::Error = error.into();
            anyhow_error.into()
        })*/
        Ok(())
    }
}

fn id_and_transactions_from_block(
    block: &Block,
) -> (HashValue, Vec<Transaction>) {
    let id = block.id();
    // TODO(lpl): Do we need BlockMetadata?
    let mut transactions = vec![Transaction::BlockMetadata(block.into())];
    transactions.extend(
        block
            .payload()
            .unwrap_or(&vec![])
            .iter()
            .map(|txn| Transaction::UserTransaction(txn.clone())),
    );
    (id, transactions)
}