// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0
// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
use crate::pos::state_sync::client::StateSyncClient;
use super::{error::StateSyncError, state_replication::StateComputer};
use anyhow::Result;
use consensus_types::block::Block;
use diem_crypto::HashValue;
use diem_infallible::Mutex;
use diem_logger::prelude::*;
use diem_metrics::monitor;
use diem_types::{
ledger_info::LedgerInfoWithSignatures, transaction::Transaction,
};
use executor_types::{
BlockExecutor, Error as ExecutionError, StateComputeResult,
};
use fail::fail_point;
use std::boxed::Box;
/// Basic communication with the Execution module;
/// implements StateComputer traits.
///
/// The proxy bundles the two collaborators it forwards to: a boxed
/// `BlockExecutor` (behind a mutex) that executes and commits blocks, and a
/// `StateSyncClient` that is told about every successful commit.
pub struct ExecutionProxy {
// Disabled field, kept for reference only:
//execution_correctness_client:
// Mutex<Box<dyn ExecutionCorrectness + Send + Sync>>,
/// State-sync client; `commit` passes it the committed transactions and
/// reconfiguration events returned by the executor.
synchronizer: StateSyncClient,
// TODO(lpl): Use Mutex or Arc?
/// Block executor. The mutex is locked for each `execute_block` and
/// `commit_blocks` call made through this proxy.
executor: Mutex<Box<dyn BlockExecutor>>,
}
impl ExecutionProxy {
pub fn new(
executor: Box<dyn BlockExecutor>, synchronizer: StateSyncClient,
) -> Self {
Self {
/*execution_correctness_client: Mutex::new(
execution_correctness_client,
),*/
synchronizer,
executor: Mutex::new(executor),
}
}
}
#[async_trait::async_trait]
impl StateComputer for ExecutionProxy {
fn compute(
&self,
// The block to be executed.
block: &Block,
// The parent block id.
parent_block_id: HashValue,
catch_up_mode: bool,
) -> Result<StateComputeResult, ExecutionError> {
fail_point!("consensus::compute", |_| {
Err(ExecutionError::InternalError {
error: "Injected error in compute".into(),
})
});
diem_debug!(
block_id = block.id(),
parent_id = block.parent_id(),
"Executing block",
);
// TODO: figure out error handling for the prologue txn
monitor!(
"execute_block",
self.executor.lock().execute_block(
id_and_transactions_from_block(block),
parent_block_id,
catch_up_mode
)
)
}
/// Send a successful commit. A future is fulfilled when the state is
/// finalized.
async fn commit(
&self, block_ids: Vec<HashValue>,
finality_proof: LedgerInfoWithSignatures,
) -> Result<(), ExecutionError> {
let (committed_txns, reconfig_events) = monitor!(
"commit_block",
self.executor
.lock()
.commit_blocks(block_ids, finality_proof)?
);
if let Err(e) = monitor!(
"notify_state_sync",
self.synchronizer
.commit(committed_txns, reconfig_events)
.await
) {
diem_error!(error = ?e, "Failed to notify state synchronizer");
}
Ok(())
}
/// Synchronize to a commit that not present locally.
async fn sync_to(
&self, _target: LedgerInfoWithSignatures,
) -> Result<(), StateSyncError> {
fail_point!("consensus::sync_to", |_| {
Err(anyhow::anyhow!("Injected error in sync_to").into())
});
// Here to start to do state synchronization where ChunkExecutor inside
// will process chunks and commit to Storage. However, after
// block execution and commitments, the sync state of
// ChunkExecutor may be not up to date so it is required to
// reset the cache of ChunkExecutor in State Sync when requested
// to sync.
//let res = monitor!("sync_to",
// self.synchronizer.sync_to(target).await); Similarily, after
// the state synchronization, we have to reset the
// cache of BlockExecutor to guarantee the latest committed
// state is up to date.
// self.executor.reset()?;
/*res.map_err(|error| {
let anyhow_error: anyhow::Error = error.into();
anyhow_error.into()
})*/
Ok(())
}
}
/// Converts `block` into the `(block id, transactions)` pair passed to the
/// executor.
///
/// The transaction list starts with a `Transaction::BlockMetadata` entry
/// built from the block, followed by one `Transaction::UserTransaction` per
/// payload entry in payload order. A block without a payload yields only
/// the metadata transaction.
fn id_and_transactions_from_block(
    block: &Block,
) -> (HashValue, Vec<Transaction>) {
    let block_id = block.id();

    // TODO(lpl): Do we need BlockMetadata?
    let metadata_txn = Transaction::BlockMetadata(block.into());

    // `Option::into_iter().flatten()` walks the payload entries when there
    // is a payload and yields nothing otherwise.
    let user_txns = block
        .payload()
        .into_iter()
        .flatten()
        .cloned()
        .map(Transaction::UserTransaction);

    let transactions: Vec<Transaction> =
        std::iter::once(metadata_txn).chain(user_txns).collect();

    (block_id, transactions)
}