1use crate::{
6 block_data_manager::BlockExecutionResult,
7 message::{
8 GetMaybeRequestId, Message, MessageProtocolVersionBound, MsgId,
9 RequestId, SetRequestId,
10 },
11 sync::{
12 message::{
13 msgid, Context, DynamicCapability, Handleable, KeyContainer,
14 SnapshotManifestResponse,
15 },
16 request_manager::{AsAny, Request},
17 state::storage::{RangedManifest, SnapshotSyncCandidate},
18 Error, ProtocolConfiguration, SYNC_PROTO_V1, SYNC_PROTO_V3,
19 },
20};
21use cfx_parameters::{
22 consensus::DEFERRED_STATE_EPOCH_COUNT,
23 consensus_internal::REWARD_EPOCH_COUNT,
24};
25use cfx_types::H256;
26use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
27use network::service::ProtocolVersion;
28use primitives::{EpochNumber, StateRoot};
29use rlp::Encodable;
30use rlp_derive::{RlpDecodable, RlpEncodable};
31use std::{any::Any, time::Duration};
32
33#[derive(Debug, Clone, RlpDecodable, RlpEncodable, DeriveMallocSizeOf)]
34pub struct SnapshotManifestRequest {
35 pub request_id: u64,
36 pub snapshot_to_sync: SnapshotSyncCandidate,
37 pub start_chunk: Option<Vec<u8>>,
38 pub trusted_blame_block: Option<H256>,
39}
40
41build_msg_with_request_id_impl! {
42 SnapshotManifestRequest, msgid::GET_SNAPSHOT_MANIFEST,
43 "SnapshotManifestRequest", SYNC_PROTO_V1, SYNC_PROTO_V3
44}
45
46impl Handleable for SnapshotManifestRequest {
47 fn handle(self, ctx: &Context) -> Result<(), Error> {
48 let snapshot_merkle_root;
50 let manifest = match RangedManifest::load(
51 &self.snapshot_to_sync,
52 self.start_chunk.clone(),
53 &ctx.manager.graph.data_man.storage_manager,
54 ctx.manager.protocol_config.chunk_size_byte,
55 ctx.manager.protocol_config.max_chunk_number_in_manifest,
56 ) {
57 Ok(Some((m, merkle_root))) => {
58 snapshot_merkle_root = merkle_root;
59 m
60 }
61 _ => {
62 ctx.send_response(&SnapshotManifestResponse {
65 request_id: self.request_id,
66 ..Default::default()
67 })?;
68 return Ok(());
69 }
70 };
71 if self.is_initial_request() {
72 let (state_root_vec, receipt_blame_vec, bloom_blame_vec) =
73 self.get_blame_states(ctx).unwrap_or_default();
74 let block_receipts =
75 self.get_block_receipts(ctx).unwrap_or_default();
76
77 debug!("handle SnapshotManifestRequest {:?}", self,);
78 ctx.send_response(&SnapshotManifestResponse {
79 request_id: self.request_id,
80 manifest,
81 snapshot_merkle_root,
82 state_root_vec,
83 receipt_blame_vec,
84 bloom_blame_vec,
85 block_receipts,
86 })
87 } else {
88 ctx.send_response(&SnapshotManifestResponse {
89 request_id: self.request_id,
90 manifest,
91 snapshot_merkle_root: Default::default(),
92 state_root_vec: Default::default(),
93 receipt_blame_vec: Default::default(),
94 bloom_blame_vec: Default::default(),
95 block_receipts: Default::default(),
96 })
97 }
98 }
99}
100
101impl SnapshotManifestRequest {
102 pub fn new(
103 snapshot_sync_candidate: SnapshotSyncCandidate,
104 trusted_blame_block: Option<H256>, start_chunk: Option<Vec<u8>>,
105 ) -> Self {
106 SnapshotManifestRequest {
107 request_id: 0,
108 snapshot_to_sync: snapshot_sync_candidate,
109 start_chunk,
110 trusted_blame_block,
111 }
112 }
113
114 pub fn is_initial_request(&self) -> bool {
115 self.trusted_blame_block.is_some()
116 }
117
118 fn get_block_receipts(
126 &self, ctx: &Context,
127 ) -> Option<Vec<BlockExecutionResult>> {
128 let mut epoch_receipts = Vec::new();
129 let mut epoch_hash =
130 self.snapshot_to_sync.get_snapshot_epoch_id().clone();
131 for i in 0..REWARD_EPOCH_COUNT {
132 if let Some(block) =
133 ctx.manager.graph.data_man.block_header_by_hash(&epoch_hash)
134 {
135 match ctx.manager.graph.consensus.get_block_hashes_by_epoch(
136 EpochNumber::Number(block.height()),
137 ) {
138 Ok(ordered_executable_epoch_blocks) => {
139 if i == 0
140 && *ordered_executable_epoch_blocks.last().unwrap()
141 != epoch_hash
142 {
143 debug!(
144 "Snapshot epoch id mismatched for epoch {}",
145 block.height()
146 );
147 return None;
148 }
149 for hash in &ordered_executable_epoch_blocks {
150 match ctx
151 .manager
152 .graph
153 .data_man
154 .block_execution_result_by_hash_with_epoch(
155 hash,
156 &epoch_hash,
157 false, false, ) {
160 Some(block_execution_result) => {
161 epoch_receipts.push(block_execution_result);
162 }
163 None => {
164 debug!("Cannot get execution result for hash={:?} epoch_hash={:?}",
165 hash, epoch_hash
166 );
167 return None;
168 }
169 }
170 }
171 }
172 Err(_) => {
173 debug!(
174 "Cannot get block hashes for epoch {}",
175 block.height()
176 );
177 return None;
178 }
179 }
180 if block.height() == 0 {
182 break;
183 }
184 epoch_hash = block.parent_hash().clone();
185 } else {
186 warn!(
187 "failed to find block={} in db, peer={}",
188 epoch_hash, ctx.node_id
189 );
190 return None;
191 }
192 }
193 Some(epoch_receipts)
194 }
195
196 fn get_blame_states(
200 &self, ctx: &Context,
201 ) -> Option<(Vec<StateRoot>, Vec<H256>, Vec<H256>)> {
202 let trusted_block = ctx
203 .manager
204 .graph
205 .data_man
206 .block_header_by_hash(&self.trusted_blame_block?)?;
207 let snapshot_epoch_block =
208 ctx.manager.graph.data_man.block_header_by_hash(
209 self.snapshot_to_sync.get_snapshot_epoch_id(),
210 )?;
211 if trusted_block.height() < snapshot_epoch_block.height() {
212 warn!(
213 "receive invalid snapshot manifest request from peer={}",
214 ctx.node_id
215 );
216 return None;
217 }
218 let mut block_hash = trusted_block.hash();
219 let mut trusted_block_height = trusted_block.height();
220 let mut blame_count = trusted_block.blame();
221 let mut deferred_block_hash = block_hash;
222 for _ in 0..DEFERRED_STATE_EPOCH_COUNT {
223 deferred_block_hash = *ctx
224 .manager
225 .graph
226 .data_man
227 .block_header_by_hash(&deferred_block_hash)
228 .expect("All headers exist")
229 .parent_hash();
230 }
231
232 let min_vec_len = if snapshot_epoch_block.height() == 0 {
233 trusted_block.height()
234 - DEFERRED_STATE_EPOCH_COUNT
235 - snapshot_epoch_block.height()
236 + 1
237 } else {
238 trusted_block.height()
239 - DEFERRED_STATE_EPOCH_COUNT
240 - snapshot_epoch_block.height()
241 + REWARD_EPOCH_COUNT
242 };
243 let mut state_root_vec = Vec::with_capacity(min_vec_len as usize);
244 let mut receipt_blame_vec = Vec::with_capacity(min_vec_len as usize);
245 let mut bloom_blame_vec = Vec::with_capacity(min_vec_len as usize);
246
247 loop {
249 if let Some(block) =
250 ctx.manager.graph.data_man.block_header_by_hash(&block_hash)
251 {
252 if block.height() + blame_count as u64 + 1
254 == trusted_block_height
255 {
256 trusted_block_height = block.height();
257 blame_count = block.blame()
258 }
259 if let Some(commitment) = ctx
260 .manager
261 .graph
262 .data_man
263 .get_epoch_execution_commitment_with_db(
264 &deferred_block_hash,
265 )
266 {
267 state_root_vec.push(
268 commitment.state_root_with_aux_info.state_root.clone(),
269 );
270 receipt_blame_vec.push(commitment.receipts_root);
271 bloom_blame_vec.push(commitment.logs_bloom_hash);
272 } else {
273 warn!(
274 "failed to find block={} in db, peer={}",
275 block_hash, ctx.node_id
276 );
277 return None;
278 }
279 if block.height() + blame_count as u64 == trusted_block_height
281 && state_root_vec.len() >= min_vec_len as usize
282 {
283 break;
284 }
285 block_hash = *block.parent_hash();
286 deferred_block_hash = *ctx
287 .manager
288 .graph
289 .data_man
290 .block_header_by_hash(&deferred_block_hash)
291 .expect("All headers received")
292 .parent_hash();
293 } else {
294 warn!(
295 "failed to find block={} in db, peer={}",
296 block_hash, ctx.node_id
297 );
298 return None;
299 }
300 }
301
302 Some((state_root_vec, receipt_blame_vec, bloom_blame_vec))
303 }
304}
305
306impl AsAny for SnapshotManifestRequest {
307 fn as_any(&self) -> &dyn Any { self }
308
309 fn as_any_mut(&mut self) -> &mut dyn Any { self }
310}
311
312impl Request for SnapshotManifestRequest {
313 fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
314 conf.snapshot_manifest_request_timeout
315 }
316
317 fn on_removed(&self, _inflight_keys: &KeyContainer) {}
318
319 fn with_inflight(&mut self, _inflight_keys: &KeyContainer) {}
320
321 fn is_empty(&self) -> bool { false }
322
323 fn resend(&self) -> Option<Box<dyn Request>> { None }
324
325 fn required_capability(&self) -> Option<DynamicCapability> { None }
326}