use crate::{
block_data_manager::BlockExecutionResult,
message::{
GetMaybeRequestId, Message, MessageProtocolVersionBound, MsgId,
RequestId, SetRequestId,
},
sync::{
message::{
msgid, Context, DynamicCapability, Handleable, KeyContainer,
SnapshotManifestResponse,
},
request_manager::{AsAny, Request},
state::storage::{RangedManifest, SnapshotSyncCandidate},
Error, ProtocolConfiguration, SYNC_PROTO_V1, SYNC_PROTO_V3,
},
};
use cfx_parameters::{
consensus::DEFERRED_STATE_EPOCH_COUNT,
consensus_internal::REWARD_EPOCH_COUNT,
};
use cfx_types::H256;
use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
use network::service::ProtocolVersion;
use primitives::{EpochNumber, StateRoot};
use rlp::Encodable;
use rlp_derive::{RlpDecodable, RlpEncodable};
use std::{any::Any, time::Duration};
/// Request asking a peer for the manifest of a snapshot; it is answered with
/// a `SnapshotManifestResponse` carrying the same `request_id`.
///
/// NOTE: the struct derives `RlpEncodable`/`RlpDecodable`, so the field order
/// below presumably defines the wire layout — do not reorder fields without
/// checking protocol compatibility.
#[derive(Debug, Clone, RlpDecodable, RlpEncodable, DeriveMallocSizeOf)]
pub struct SnapshotManifestRequest {
/// Identifier echoed back in the response. `new` initializes it to 0.
pub request_id: u64,
/// The snapshot candidate whose manifest is being requested.
pub snapshot_to_sync: SnapshotSyncCandidate,
/// Optional chunk key forwarded to `RangedManifest::load` (presumably the
/// position from which the manifest range starts — confirm against
/// `RangedManifest`).
pub start_chunk: Option<Vec<u8>>,
/// When `Some`, the request counts as the initial request (see
/// `is_initial_request`) and this hash is the block from which blame
/// states are collected.
pub trusted_blame_block: Option<H256>,
}
// Registers `SnapshotManifestRequest` with the message layer through the
// project macro: message id `msgid::GET_SNAPSHOT_MANIFEST`, name
// "SnapshotManifestRequest", and the `SYNC_PROTO_V1` / `SYNC_PROTO_V3`
// arguments (presumably the minimum and maximum supported protocol versions
// — confirm against the macro definition).
build_msg_with_request_id_impl! {
SnapshotManifestRequest, msgid::GET_SNAPSHOT_MANIFEST,
"SnapshotManifestRequest", SYNC_PROTO_V1, SYNC_PROTO_V3
}
impl Handleable for SnapshotManifestRequest {
    /// Answers a manifest request from a peer.
    ///
    /// The manifest is loaded through `RangedManifest::load`. If loading
    /// fails or yields nothing, a default response that only carries the
    /// request id is sent. Otherwise the manifest is returned; for an
    /// initial request the response additionally carries the snapshot
    /// merkle root, the blame vectors and the block receipts (each falling
    /// back to its default value when it cannot be collected).
    ///
    /// Returns the error produced by `ctx.send_response`, if any.
    fn handle(self, ctx: &Context) -> Result<(), Error> {
        let protocol_config = &ctx.manager.protocol_config;
        let loaded = RangedManifest::load(
            &self.snapshot_to_sync,
            self.start_chunk.clone(),
            &ctx.manager.graph.data_man.storage_manager,
            protocol_config.chunk_size_byte,
            protocol_config.max_chunk_number_in_manifest,
        );

        let (manifest, merkle_root) = match loaded {
            Ok(Some(found)) => found,
            _ => {
                // Nothing to serve: reply with an otherwise empty response.
                ctx.send_response(&SnapshotManifestResponse {
                    request_id: self.request_id,
                    ..Default::default()
                })?;
                return Ok(());
            }
        };

        let response = if self.is_initial_request() {
            let (state_root_vec, receipt_blame_vec, bloom_blame_vec) =
                self.get_blame_states(ctx).unwrap_or_default();
            let block_receipts =
                self.get_block_receipts(ctx).unwrap_or_default();
            debug!("handle SnapshotManifestRequest {:?}", self,);
            SnapshotManifestResponse {
                request_id: self.request_id,
                manifest,
                snapshot_merkle_root: merkle_root,
                state_root_vec,
                receipt_blame_vec,
                bloom_blame_vec,
                block_receipts,
            }
        } else {
            // Follow-up requests only receive the manifest itself.
            SnapshotManifestResponse {
                request_id: self.request_id,
                manifest,
                snapshot_merkle_root: Default::default(),
                state_root_vec: Default::default(),
                receipt_blame_vec: Default::default(),
                bloom_blame_vec: Default::default(),
                block_receipts: Default::default(),
            }
        };

        ctx.send_response(&response)
    }
}
impl SnapshotManifestRequest {
pub fn new(
snapshot_sync_candidate: SnapshotSyncCandidate,
trusted_blame_block: Option<H256>, start_chunk: Option<Vec<u8>>,
) -> Self {
SnapshotManifestRequest {
request_id: 0,
snapshot_to_sync: snapshot_sync_candidate,
start_chunk,
trusted_blame_block,
}
}
pub fn is_initial_request(&self) -> bool {
self.trusted_blame_block.is_some()
}
/// Collects the execution results of every block in the snapshot epoch and
/// in the epochs preceding it, walking back through parent hashes for at
/// most `REWARD_EPOCH_COUNT` epochs (stopping early at height 0).
///
/// Returns `None` when any required data is unavailable:
/// * a header on the walked chain is not found,
/// * the block hashes of an epoch cannot be obtained,
/// * the last block of the first visited epoch is not the snapshot epoch id
///   (this also covers an empty epoch block list, which previously
///   panicked),
/// * an execution result for one of the blocks is missing.
fn get_block_receipts(
    &self, ctx: &Context,
) -> Option<Vec<BlockExecutionResult>> {
    let data_man = &ctx.manager.graph.data_man;
    let mut epoch_receipts = Vec::new();
    let mut epoch_hash = *self.snapshot_to_sync.get_snapshot_epoch_id();

    for i in 0..REWARD_EPOCH_COUNT {
        let block = match data_man.block_header_by_hash(&epoch_hash) {
            Some(block) => block,
            None => {
                warn!(
                    "failed to find block={} in db, peer={}",
                    epoch_hash, ctx.node_id
                );
                return None;
            }
        };

        let ordered_executable_epoch_blocks = match ctx
            .manager
            .graph
            .consensus
            .get_block_hashes_by_epoch(EpochNumber::Number(block.height()))
        {
            Ok(hashes) => hashes,
            Err(_) => {
                debug!(
                    "Cannot get block hashes for epoch {}",
                    block.height()
                );
                return None;
            }
        };

        // Only the first visited epoch is checked against the requested
        // snapshot epoch id. Comparing against `Option` instead of
        // unwrapping keeps an empty list from panicking the node.
        if i == 0
            && ordered_executable_epoch_blocks.last() != Some(&epoch_hash)
        {
            debug!(
                "Snapshot epoch id mismatched for epoch {}",
                block.height()
            );
            return None;
        }

        for hash in &ordered_executable_epoch_blocks {
            match data_man.block_execution_result_by_hash_with_epoch(
                hash,
                &epoch_hash,
                false,
                false,
            ) {
                Some(block_execution_result) => {
                    epoch_receipts.push(block_execution_result);
                }
                None => {
                    debug!("Cannot get execution result for hash={:?} epoch_hash={:?}",
                        hash, epoch_hash
                    );
                    return None;
                }
            }
        }

        // Height 0 has no parent epoch to continue with.
        if block.height() == 0 {
            break;
        }
        epoch_hash = *block.parent_hash();
    }

    Some(epoch_receipts)
}
/// Collects the state roots, receipts roots and logs-bloom hashes that
/// accompany the initial manifest response.
///
/// Starting at the trusted blame block, the chain is walked backwards
/// through parent hashes. For every visited block, the execution commitment
/// of the block `DEFERRED_STATE_EPOCH_COUNT` parents behind it is appended
/// to the three result vectors. The walk ends once the current block
/// satisfies `height + blame == trusted_block_height` (where the tracked
/// trusted height/blame pair is advanced as the walk proceeds) and at least
/// `min_vec_len` entries have been gathered.
///
/// Returns `None` instead of panicking whenever the request cannot be
/// served. The request comes from a remote peer, so none of these cases may
/// bring the node down:
/// * no trusted blame block was supplied, or its header / the snapshot
///   epoch header is unknown,
/// * the trusted block is below the snapshot epoch,
/// * a header or execution commitment on the walked chain is missing,
/// * the required vector length cannot be represented (the trusted block is
///   too close to the snapshot epoch).
fn get_blame_states(
    &self, ctx: &Context,
) -> Option<(Vec<StateRoot>, Vec<H256>, Vec<H256>)> {
    let data_man = &ctx.manager.graph.data_man;

    let trusted_block =
        data_man.block_header_by_hash(&self.trusted_blame_block?)?;
    let snapshot_epoch_block = data_man
        .block_header_by_hash(self.snapshot_to_sync.get_snapshot_epoch_id())?;
    if trusted_block.height() < snapshot_epoch_block.height() {
        warn!(
            "receive invalid snapshot manifest request from peer={}",
            ctx.node_id
        );
        return None;
    }

    // Parent hash of a known header, or `None` if the header is missing.
    let parent_of = |hash: &H256| -> Option<H256> {
        data_man
            .block_header_by_hash(hash)
            .map(|header| *header.parent_hash())
    };

    let mut block_hash = trusted_block.hash();
    let mut trusted_block_height = trusted_block.height();
    let mut blame_count = trusted_block.blame();

    // Move `DEFERRED_STATE_EPOCH_COUNT` parents back from the trusted block.
    let mut deferred_block_hash = block_hash;
    for _ in 0..DEFERRED_STATE_EPOCH_COUNT {
        deferred_block_hash = match parent_of(&deferred_block_hash) {
            Some(parent) => parent,
            None => {
                warn!(
                    "failed to find block={} in db, peer={}",
                    deferred_block_hash, ctx.node_id
                );
                return None;
            }
        };
    }

    // min_vec_len = trusted - DEFERRED_STATE_EPOCH_COUNT - snapshot + extra.
    // The addition is done first and every step is checked, so a trusted
    // block close to the snapshot epoch cannot underflow an intermediate
    // value.
    let extra_epochs = if snapshot_epoch_block.height() == 0 {
        1
    } else {
        REWARD_EPOCH_COUNT
    };
    let min_vec_len = match trusted_block
        .height()
        .checked_add(extra_epochs)
        .and_then(|len| len.checked_sub(DEFERRED_STATE_EPOCH_COUNT))
        .and_then(|len| len.checked_sub(snapshot_epoch_block.height()))
    {
        Some(len) => len as usize,
        None => {
            warn!(
                "receive invalid snapshot manifest request from peer={}",
                ctx.node_id
            );
            return None;
        }
    };

    let mut state_root_vec = Vec::with_capacity(min_vec_len);
    let mut receipt_blame_vec = Vec::with_capacity(min_vec_len);
    let mut bloom_blame_vec = Vec::with_capacity(min_vec_len);

    loop {
        let block = match data_man.block_header_by_hash(&block_hash) {
            Some(block) => block,
            None => {
                warn!(
                    "failed to find block={} in db, peer={}",
                    block_hash, ctx.node_id
                );
                return None;
            }
        };

        // Advance the tracked (height, blame) pair once the walk has moved
        // one block past the range covered by the current pair.
        if block.height() + blame_count as u64 + 1 == trusted_block_height {
            trusted_block_height = block.height();
            blame_count = block.blame();
        }

        match data_man
            .get_epoch_execution_commitment_with_db(&deferred_block_hash)
        {
            Some(commitment) => {
                state_root_vec.push(
                    commitment.state_root_with_aux_info.state_root.clone(),
                );
                receipt_blame_vec.push(commitment.receipts_root);
                bloom_blame_vec.push(commitment.logs_bloom_hash);
            }
            None => {
                // Report the hash whose commitment is actually missing.
                warn!(
                    "failed to find execution commitment for block={} in db, peer={}",
                    deferred_block_hash, ctx.node_id
                );
                return None;
            }
        }

        if block.height() + blame_count as u64 == trusted_block_height
            && state_root_vec.len() >= min_vec_len
        {
            break;
        }

        block_hash = *block.parent_hash();
        deferred_block_hash = match parent_of(&deferred_block_hash) {
            Some(parent) => parent,
            None => {
                warn!(
                    "failed to find block={} in db, peer={}",
                    deferred_block_hash, ctx.node_id
                );
                return None;
            }
        };
    }

    Some((state_root_vec, receipt_blame_vec, bloom_blame_vec))
}
}
impl AsAny for SnapshotManifestRequest {
    /// Exposes the request as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Exposes the request as `&mut dyn Any`.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
impl Request for SnapshotManifestRequest {
    /// Uses the configured snapshot-manifest request timeout.
    fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
        conf.snapshot_manifest_request_timeout
    }

    /// No in-flight keys are tracked for this request; nothing to do.
    fn on_removed(&self, _inflight_keys: &KeyContainer) {}

    /// No in-flight keys are tracked for this request; nothing to do.
    fn with_inflight(&mut self, _inflight_keys: &KeyContainer) {}

    /// A manifest request is never considered empty.
    fn is_empty(&self) -> bool {
        false
    }

    /// This request provides no replacement request to resend.
    fn resend(&self) -> Option<Box<dyn Request>> {
        None
    }

    /// No dynamic capability is required from the peer.
    fn required_capability(&self) -> Option<DynamicCapability> {
        None
    }
}