1#![allow(unused)]
2
3use crate::{
4 block_data_manager::BlockExecutionResult,
5 message::NetworkContext,
6 sync::{
7 error::Error,
8 message::{
9 msgid, Context, SnapshotManifestRequest, SnapshotManifestResponse,
10 },
11 state::storage::SnapshotSyncCandidate,
12 synchronization_state::PeerFilter,
13 SynchronizationProtocolHandler,
14 },
15 verification::compute_receipts_root,
16};
17use cfx_internal_common::{StateRootAuxInfo, StateRootWithAuxInfo};
18use cfx_parameters::{
19 consensus::DEFERRED_STATE_EPOCH_COUNT,
20 consensus_internal::REWARD_EPOCH_COUNT,
21};
22use cfx_storage::{
23 storage_db::{SnapshotInfo, SnapshotKeptToProvideSyncStatus},
24 TrieProof,
25};
26use cfx_types::{option_vec_to_hex, H256};
27use network::node_table::NodeId;
28use primitives::{
29 BlockHeaderBuilder, BlockReceipts, EpochId, EpochNumber, StateRoot,
30 StorageKey, StorageKeyWithSpace, NULL_EPOCH,
31};
32use rand::{
33 rng,
34 seq::{IndexedRandom, SliceRandom},
35};
36
37use std::{
38 collections::HashSet,
39 fmt::{Debug, Formatter},
40 sync::Arc,
41 time::{Duration, Instant},
42};
43
/// Drives downloading and validating a snapshot manifest from peers during
/// snapshot state sync: issues (possibly chained) manifest requests, verifies
/// blame vectors / receipts for the initial response, and accumulates chunk
/// boundaries until the manifest is complete.
pub struct SnapshotManifestManager {
    // In-flight manifest request: when it was sent and to which peer.
    // `None` when no request is outstanding.
    manifest_request_status: Option<(Instant, NodeId)>,
    // The snapshot we are trying to sync.
    pub snapshot_candidate: SnapshotSyncCandidate,
    // Block whose blame information is trusted when validating the
    // state/receipt/bloom blame vectors of the initial response.
    trusted_blame_block: H256,
    // Peers still considered usable for manifest requests; peers are removed
    // on failure, timeout, or disconnect.
    pub active_peers: HashSet<NodeId>,

    // Chunk boundaries (and their trie proofs) accumulated across one or
    // more manifest responses.
    pub chunk_boundaries: Vec<Vec<u8>>,
    pub chunk_boundary_proofs: Vec<TrieProof>,

    // Validated data extracted from the initial manifest response; `None`
    // until the initial response has been verified.
    related_data: Option<RelatedData>,
    config: SnapshotManifestConfig,
}
56
/// Validated data extracted from the initial snapshot manifest response,
/// handed back to the caller once the complete manifest has been received.
#[derive(Clone)]
pub struct RelatedData {
    /// State root (with aux info) reconstructed from the trusted blame info.
    pub true_state_root_by_blame_info: StateRootWithAuxInfo,
    /// Offset of the snapshot epoch's entry inside the blame vectors.
    pub blame_vec_offset: usize,
    /// Receipts-root blame vector from the manifest response.
    pub receipt_blame_vec: Vec<H256>,
    /// Logs-bloom-hash blame vector from the manifest response.
    pub bloom_blame_vec: Vec<H256>,
    /// Validated receipts as (block hash, epoch hash, receipts) tuples.
    pub epoch_receipts: Vec<(H256, H256, Arc<BlockReceipts>)>,
    /// Snapshot info for the snapshot being synced.
    pub snapshot_info: SnapshotInfo,
    /// Info of the snapshot one snapshot period earlier, when it exists
    /// (see `validate_blame_states`); `None` otherwise.
    pub parent_snapshot_info: Option<SnapshotInfo>,
}
69
70impl SnapshotManifestManager {
71 pub fn new_and_start(
72 snapshot_candidate: SnapshotSyncCandidate, trusted_blame_block: H256,
73 active_peers: HashSet<NodeId>, config: SnapshotManifestConfig,
74 io: &dyn NetworkContext, sync_handler: &SynchronizationProtocolHandler,
75 ) -> Self {
76 let mut manager = Self {
77 manifest_request_status: None,
78 snapshot_candidate,
79 trusted_blame_block,
80 active_peers,
81 chunk_boundaries: vec![],
82 chunk_boundary_proofs: vec![],
83 related_data: None,
84 config,
85 };
86 manager.request_manifest(io, sync_handler, None);
87 manager
88 }
89
90 pub fn handle_snapshot_manifest_response(
91 &mut self, ctx: &Context, response: SnapshotManifestResponse,
92 request: &SnapshotManifestRequest,
93 ) -> Result<Option<RelatedData>, Error> {
94 match self
95 .handle_snapshot_manifest_response_impl(ctx, response, request)
96 {
97 Ok(r) => Ok(r),
98 Err(e) => {
99 self.note_failure(&ctx.node_id);
100 Err(e)
101 }
102 }
103 }
104
    /// Validates and applies one manifest response.
    ///
    /// The initial response carries blame vectors and block receipts that are
    /// verified against the locally trusted blame block; non-initial
    /// responses only extend the chunk boundaries and are checked against the
    /// already-validated snapshot merkle root. Returns `Ok(Some(..))` when
    /// the manifest is complete (`manifest.next` is `None`), `Ok(None)` when
    /// more chunks are pending or the response is for an outdated candidate.
    fn handle_snapshot_manifest_response_impl(
        &mut self, ctx: &Context, response: SnapshotManifestResponse,
        request: &SnapshotManifestRequest,
    ) -> Result<Option<RelatedData>, Error> {
        // A response for a different (outdated) candidate is ignored rather
        // than treated as a peer failure.
        if request.snapshot_to_sync != self.snapshot_candidate {
            info!(
                "The received snapshot manifest doesn't match the current snapshot_candidate,\
                current snapshot_candidate = {:?}, requested sync candidate = {:?}",
                self.snapshot_candidate,
                request.snapshot_to_sync);
            return Ok(None);
        }

        info!(
            "Snapshot manifest received, checkpoint = {:?}, chunk_boundaries.len()={}, \
            start={}, next={}",
            self.snapshot_candidate, response.manifest.chunk_boundaries.len(),
            option_vec_to_hex(request.start_chunk.as_ref()), option_vec_to_hex(response.manifest.next.as_ref())
        );

        if request.is_initial_request() {
            // The initial response must be the first one we accept; any
            // previously stored boundaries indicate a protocol violation.
            if !self.chunk_boundaries.is_empty() {
                bail!(Error::InvalidSnapshotManifest(
                    "Initial manifest is not expected".into(),
                ));
            }
            // Verify the state/receipt/bloom blame vectors against the
            // trusted blame block and derive the snapshot metadata.
            let (
                blame_vec_offset,
                state_root_with_aux_info,
                snapshot_info,
                parent_snapshot_info,
            ) = match Self::validate_blame_states(
                ctx,
                self.snapshot_candidate.get_snapshot_epoch_id(),
                &self.trusted_blame_block,
                &response.state_root_vec,
                &response.receipt_blame_vec,
                &response.bloom_blame_vec,
            ) {
                Some(info_tuple) => info_tuple,
                None => {
                    // Invalid blame data: retry from another peer, then
                    // report the error (which also drops this peer).
                    warn!("failed to validate the blame state, re-sync manifest from other peer");
                    self.resync_manifest(ctx);
                    bail!(Error::InvalidSnapshotManifest(
                        "invalid blame state in manifest".into(),
                    ));
                }
            };

            // Verify the returned block receipts against the blame vectors.
            let epoch_receipts =
                match SnapshotManifestManager::validate_epoch_receipts(
                    ctx,
                    blame_vec_offset,
                    self.snapshot_candidate.get_snapshot_epoch_id(),
                    &response.receipt_blame_vec,
                    &response.bloom_blame_vec,
                    &response.block_receipts,
                ) {
                    Some(epoch_receipts) => epoch_receipts,
                    None => {
                        warn!("failed to validate the epoch receipts, re-sync manifest from other peer");
                        self.resync_manifest(ctx);
                        bail!(Error::InvalidSnapshotManifest(
                            "invalid epoch receipts in manifest".into(),
                        ));
                    }
                };

            // Check the chunk-boundary proofs against the snapshot's
            // merkle root. NOTE(review): unlike the two failures above,
            // this path does not trigger resync_manifest — presumably
            // intentional; confirm.
            if let Err(e) =
                response.manifest.validate(&snapshot_info.merkle_root)
            {
                warn!("failed to validate snapshot manifest, error = {:?}", e);
                bail!(Error::InvalidSnapshotManifest(
                    "invalid chunk proofs in manifest".into(),
                ));
            }
            self.related_data = Some(RelatedData {
                true_state_root_by_blame_info: state_root_with_aux_info,
                blame_vec_offset,
                receipt_blame_vec: response.receipt_blame_vec,
                bloom_blame_vec: response.bloom_blame_vec,
                epoch_receipts,
                snapshot_info,
                parent_snapshot_info,
            });
        } else {
            // A continuation response is only valid after the initial one.
            if self.chunk_boundaries.is_empty() {
                bail!(Error::InvalidSnapshotManifest(
                    "Non-initial manifest is not expected".into()
                ));
            }
            // Continuations must resume exactly at the last boundary we have.
            debug_assert_eq!(
                request.start_chunk.as_ref(),
                self.chunk_boundaries.last()
            );
            if let Some(related_data) = &self.related_data {
                // Validate the new chunk proofs against the already-verified
                // snapshot merkle root.
                if let Err(e) = response
                    .manifest
                    .validate(&related_data.snapshot_info.merkle_root)
                {
                    warn!(
                        "failed to validate snapshot manifest, error = {:?}",
                        e
                    );
                    bail!(Error::InvalidSnapshotManifest(
                        "invalid chunk proofs in manifest".into(),
                    ));
                }
            }
        }
        // Append the newly received boundaries and proofs.
        self.chunk_boundaries
            .extend_from_slice(&response.manifest.chunk_boundaries);
        self.chunk_boundary_proofs
            .extend_from_slice(&response.manifest.chunk_boundary_proofs);
        if response.manifest.next.is_none() {
            // Manifest complete: hand the validated data to the caller.
            return Ok(self.related_data.clone());
        } else {
            // More chunks remain: request the next manifest page.
            self.request_manifest(ctx.io, ctx.manager, response.manifest.next);
        }
        Ok(None)
    }
232
233 pub fn request_manifest(
235 &mut self, io: &dyn NetworkContext,
236 sync_handler: &SynchronizationProtocolHandler,
237 start_chunk: Option<Vec<u8>>,
238 ) {
239 let maybe_trusted_blame_block = if start_chunk.is_none() {
240 Some(self.trusted_blame_block.clone())
241 } else {
242 None
243 };
244 let request = SnapshotManifestRequest::new(
245 self.snapshot_candidate.clone(),
247 maybe_trusted_blame_block,
248 start_chunk,
249 );
250
251 let available_peers = PeerFilter::new(msgid::GET_SNAPSHOT_MANIFEST)
252 .choose_from(&self.active_peers)
253 .select_all(&sync_handler.syn);
254 let maybe_peer = available_peers.choose(&mut rng()).map(|p| *p);
255 if let Some(peer) = maybe_peer {
256 self.manifest_request_status = Some((Instant::now(), peer));
257 sync_handler.request_manager.request_with_delay(
258 io,
259 Box::new(request),
260 Some(peer),
261 None,
262 );
263 }
264 }
265
266 fn resync_manifest(&mut self, ctx: &Context) {
267 self.request_manifest(
268 ctx.io,
269 ctx.manager,
270 self.chunk_boundaries.last().cloned(),
271 );
272 }
273
274 pub fn check_timeout(&mut self, ctx: &Context) {
275 if let Some((manifest_start_time, peer)) = &self.manifest_request_status
276 {
277 if manifest_start_time.elapsed()
278 > self.config.manifest_request_timeout
279 {
280 self.active_peers.remove(peer);
281 self.manifest_request_status = None;
282 self.resync_manifest(ctx);
283 }
284 }
285 }
286
287 pub fn is_inactive(&self) -> bool { self.active_peers.is_empty() }
288
    /// Validates the blame vectors of an initial manifest response against
    /// the locally known `trusted_blame_block`, and derives the snapshot's
    /// state root and `SnapshotInfo` from them.
    ///
    /// Walks the header chain backwards from `trusted_blame_block`, collecting
    /// the headers whose blame fields cover the blame vectors, and checks that
    /// the deferred state/receipts/bloom roots recomputed from the vectors
    /// match each trusted header. Returns `None` on any mismatch; otherwise
    /// returns `(blame_vec_offset, state_root_with_aux_info, snapshot_info,
    /// maybe_previous_snapshot_info)`.
    pub fn validate_blame_states(
        ctx: &Context, snapshot_epoch_id: &H256, trusted_blame_block: &H256,
        state_root_vec: &Vec<StateRoot>, receipt_blame_vec: &Vec<H256>,
        bloom_blame_vec: &Vec<H256>,
    ) -> Option<(
        usize,
        StateRootWithAuxInfo,
        SnapshotInfo,
        Option<SnapshotInfo>,
    )> {
        let mut state_blame_vec = vec![];

        // Both headers must be locally available; missing headers are a bug
        // in the caller's setup, hence expect().
        let snapshot_block_header = ctx
            .manager
            .graph
            .data_man
            .block_header_by_hash(snapshot_epoch_id)
            .expect("block header must exist for snapshot to sync");
        let trusted_blame_block = ctx
            .manager
            .graph
            .data_man
            .block_header_by_hash(trusted_blame_block)
            .expect("trusted_blame_block header must exist");

        // Index of the snapshot epoch's state root within state_root_vec.
        // NOTE(review): this subtraction underflows (panic in debug) if the
        // trusted blame block is below snapshot height + deferred count —
        // presumably guaranteed by the caller; confirm.
        let offset = (trusted_blame_block.height()
            - (snapshot_block_header.height() + DEFERRED_STATE_EPOCH_COUNT))
            as usize;
        if offset >= state_root_vec.len() {
            warn!("validate_blame_states: not enough state_root");
            return None;
        }

        // Minimum number of state roots the response must cover; the genesis
        // snapshot (height 0) needs one entry past the offset, otherwise a
        // full reward-epoch window is required.
        let min_vec_len = if snapshot_block_header.height() == 0 {
            trusted_blame_block.height()
                - DEFERRED_STATE_EPOCH_COUNT
                - snapshot_block_header.height()
                + 1
        } else {
            trusted_blame_block.height()
                - DEFERRED_STATE_EPOCH_COUNT
                - snapshot_block_header.height()
                + REWARD_EPOCH_COUNT
        };
        let mut trusted_blocks = Vec::new();
        let mut trusted_block_height = trusted_blame_block.height();
        let mut blame_count = trusted_blame_block.blame();
        let mut block_hash = trusted_blame_block.hash();
        let mut vec_len: usize = 0;
        trusted_blocks.push(trusted_blame_block);

        // Walk the parent chain, counting how many headers the blame vectors
        // must cover (vec_len) and collecting each header that starts a new
        // blame segment (i.e. lies just beyond the current segment).
        loop {
            vec_len += 1;
            let block = ctx
                .manager
                .graph
                .data_man
                .block_header_by_hash(&block_hash)
                .expect("block header must exist");
            // This header is one past the current blame segment: it becomes
            // the next trusted header, opening a new segment.
            if block.height() + blame_count as u64 + 1 == trusted_block_height {
                trusted_block_height = block.height();
                blame_count = block.blame();
                trusted_blocks.push(block.clone());
            }
            // Stop at the last header of the current segment once the
            // minimum coverage has been reached.
            if block.height() + blame_count as u64 == trusted_block_height
                && vec_len >= min_vec_len as usize
            {
                break;
            }
            block_hash = *block.parent_hash();
        }
        // The response must contain exactly one state root per covered header.
        if vec_len != state_root_vec.len() {
            warn!(
                "wrong length of state_root_vec, expected {}, but {} found",
                vec_len,
                state_root_vec.len()
            );
            return None;
        }
        // Hash each state root so it can be compared/combined like the
        // receipt and bloom blame entries.
        state_blame_vec.clear();
        for state_root in state_root_vec {
            state_blame_vec.push(state_root.compute_state_root_hash());
        }
        // For each trusted header, recompute its deferred roots from the
        // corresponding blame-vector slice and compare against the header.
        let mut slice_begin = 0;
        for trusted_block in trusted_blocks {
            // A header with blame b vouches for b+1 consecutive entries.
            let slice_end = slice_begin + trusted_block.blame() as usize + 1;
            let deferred_state_root = if trusted_block.blame() == 0 {
                state_blame_vec[slice_begin].clone()
            } else {
                BlockHeaderBuilder::compute_blame_state_root_vec_root(
                    state_blame_vec[slice_begin..slice_end].to_vec(),
                )
            };
            let deferred_receipts_root = if trusted_block.blame() == 0 {
                receipt_blame_vec[slice_begin].clone()
            } else {
                BlockHeaderBuilder::compute_blame_state_root_vec_root(
                    receipt_blame_vec[slice_begin..slice_end].to_vec(),
                )
            };
            let deferred_logs_bloom_hash = if trusted_block.blame() == 0 {
                bloom_blame_vec[slice_begin].clone()
            } else {
                BlockHeaderBuilder::compute_blame_state_root_vec_root(
                    bloom_blame_vec[slice_begin..slice_end].to_vec(),
                )
            };
            if deferred_state_root != *trusted_block.deferred_state_root()
                || deferred_receipts_root
                    != *trusted_block.deferred_receipts_root()
                || deferred_logs_bloom_hash
                    != *trusted_block.deferred_logs_bloom_hash()
            {
                warn!("root mismatch: (state_root, receipts_root, logs_bloom_hash) \
                should be ({:?} {:?} {:?}), get ({:?} {:?} {:?})",
                    trusted_block.deferred_state_root(),
                    trusted_block.deferred_receipts_root(),
                    trusted_block.deferred_logs_bloom_hash(),
                    deferred_state_root,
                    deferred_receipts_root,
                    deferred_logs_bloom_hash,
                );
                return None;
            }
            slice_begin = slice_end;
        }

        // Resolve the parent snapshot epoch and the pivot chain connecting
        // it to the snapshot being synced.
        let snapshot_epoch_count =
            ctx.manager.graph.data_man.get_snapshot_epoch_count();
        let (parent_snapshot_epoch, pivot_chain_parts) =
            ctx.manager.graph.data_man.get_parent_epochs_for(
                snapshot_epoch_id.clone(),
                snapshot_epoch_count as u64,
            );

        let parent_snapshot_height = if parent_snapshot_epoch == NULL_EPOCH {
            0
        } else {
            ctx.manager
                .graph
                .data_man
                .block_header_by_hash(&parent_snapshot_epoch)
                .unwrap()
                .height()
        };
        let mut snapshot_state_root = state_root_vec[offset].clone();
        let state_root_hash = state_root_vec[offset].compute_state_root_hash();

        // When the snapshot is not in the first snapshot period, also build
        // the info (InfoOnly) of the snapshot one period earlier, keyed by
        // its grandparent snapshot epoch.
        let snapshot_before_stable_checkpoint = if snapshot_block_header
            .height()
            > snapshot_epoch_count as u64
        {
            let (grandparent_snapshot_epoch, grandparent_pivot_chain_parts) =
                ctx.manager.graph.data_man.get_parent_epochs_for(
                    parent_snapshot_epoch.clone(),
                    snapshot_epoch_count as u64,
                );

            let grandparent_snapshot_height =
                if grandparent_snapshot_epoch == NULL_EPOCH {
                    0
                } else {
                    ctx.manager
                        .graph
                        .data_man
                        .block_header_by_hash(&grandparent_snapshot_epoch)
                        .unwrap()
                        .height()
                };
            debug!(
                "grandparent snapshot epoch {:?}, height {}",
                grandparent_snapshot_epoch, grandparent_snapshot_height
            );

            Some(SnapshotInfo {
                snapshot_info_kept_to_provide_sync:
                    SnapshotKeptToProvideSyncStatus::InfoOnly,
                serve_one_step_sync: false,
                // NOTE(review): uses offset - 1; assumes offset >= 1 on this
                // path — confirm against callers.
                merkle_root: state_root_vec[offset - 1].snapshot_root,
                height: snapshot_block_header.height()
                    - snapshot_epoch_count as u64,
                parent_snapshot_epoch_id: grandparent_snapshot_epoch,
                parent_snapshot_height: grandparent_snapshot_height,
                pivot_chain_parts: grandparent_pivot_chain_parts,
            })
        } else {
            None
        };

        Some((
            offset,
            StateRootWithAuxInfo {
                state_root: snapshot_state_root,
                aux_info: StateRootAuxInfo {
                    snapshot_epoch_id: snapshot_epoch_id.clone(),
                    delta_mpt_key_padding:
                        StorageKeyWithSpace::delta_mpt_padding(
                            &state_root_vec[offset].snapshot_root,
                            &state_root_vec[offset].intermediate_delta_root,
                        ),
                    intermediate_epoch_id: parent_snapshot_epoch,
                    // Not needed for manifest validation.
                    maybe_intermediate_mpt_key_padding: None,
                    state_root_hash,
                },
            },
            SnapshotInfo {
                snapshot_info_kept_to_provide_sync: Default::default(),
                serve_one_step_sync: false,
                // NOTE(review): this index underflows if offset is smaller
                // than get_snapshot_blame_plus_depth() — presumably
                // guaranteed by construction; confirm.
                merkle_root: state_root_vec[offset
                    - ctx
                        .manager
                        .graph
                        .data_man
                        .get_snapshot_blame_plus_depth()]
                .snapshot_root,
                height: snapshot_block_header.height(),
                parent_snapshot_epoch_id: parent_snapshot_epoch,
                parent_snapshot_height,
                pivot_chain_parts,
            },
            snapshot_before_stable_checkpoint,
        ))
    }
526
    /// Validates the block receipts sent with the initial manifest response.
    ///
    /// Walks backwards from `snapshot_epoch_id` through `REWARD_EPOCH_COUNT`
    /// epochs (a single epoch for genesis), recomputes each epoch's receipts
    /// root and logs-bloom hash from `block_receipts`, and compares them to
    /// the blame vectors at `blame_vec_offset`. Returns the flattened
    /// `(block hash, epoch hash, receipts)` tuples, or `None` on any
    /// mismatch, on missing receipts, or if `block_receipts` contains extra
    /// trailing entries.
    pub fn validate_epoch_receipts(
        ctx: &Context, blame_vec_offset: usize, snapshot_epoch_id: &EpochId,
        receipt_blame_vec: &Vec<H256>, bloom_blame_vec: &Vec<H256>,
        block_receipts: &Vec<BlockExecutionResult>,
    ) -> Option<Vec<(H256, H256, Arc<BlockReceipts>)>> {
        let mut epoch_hash = snapshot_epoch_id.clone();
        let checkpoint = ctx
            .manager
            .graph
            .data_man
            .block_header_by_hash(snapshot_epoch_id)
            .expect("checkpoint header must exist");
        // Genesis has only its own epoch to validate; otherwise the receipts
        // of a full reward-epoch window are required.
        let epoch_receipts_count = if checkpoint.height() == 0 {
            1
        } else {
            REWARD_EPOCH_COUNT
        } as usize;
        // Cursor into the flat block_receipts vector.
        let mut receipts_vec_offset = 0;
        let mut result = Vec::new();
        for idx in 0..epoch_receipts_count {
            let block_header = ctx
                .manager
                .graph
                .data_man
                .block_header_by_hash(&epoch_hash)
                .expect("block header must exist");
            // Blocks executed in this epoch, in canonical order.
            let ordered_executable_epoch_blocks = ctx
                .manager
                .graph
                .consensus
                .get_block_hashes_by_epoch(EpochNumber::Number(
                    block_header.height(),
                ))
                .expect("ordered executable epoch blocks must exist");
            // Collect one receipts entry per block; a short block_receipts
            // vector invalidates the whole response.
            let mut epoch_receipts = Vec::new();
            for i in 0..ordered_executable_epoch_blocks.len() {
                if let Some(block_receipt) =
                    block_receipts.get(receipts_vec_offset + i)
                {
                    epoch_receipts.push(block_receipt.block_receipts.clone());
                } else {
                    return None;
                }
            }
            // Recompute the epoch's roots and compare with the blame vectors
            // (already validated against the trusted blame block).
            let receipt_root = compute_receipts_root(&epoch_receipts);
            let logs_bloom_hash =
                BlockHeaderBuilder::compute_block_logs_bloom_hash(
                    &epoch_receipts,
                );
            if receipt_blame_vec[blame_vec_offset + idx] != receipt_root {
                debug!(
                    "wrong receipt root, expected={:?}, now={:?}",
                    receipt_blame_vec[blame_vec_offset + idx],
                    receipt_root
                );
                return None;
            }
            if bloom_blame_vec[blame_vec_offset + idx] != logs_bloom_hash {
                debug!(
                    "wrong logs bloom hash, expected={:?}, now={:?}",
                    bloom_blame_vec[blame_vec_offset + idx],
                    logs_bloom_hash
                );
                return None;
            }
            // Epoch validated: record its (block, epoch, receipts) tuples.
            for i in 0..ordered_executable_epoch_blocks.len() {
                result.push((
                    ordered_executable_epoch_blocks[i],
                    epoch_hash,
                    epoch_receipts[i].clone(),
                ));
            }
            receipts_vec_offset += ordered_executable_epoch_blocks.len();
            // Move to the previous epoch via the pivot block's parent.
            epoch_hash = *block_header.parent_hash();
        }
        // Reject responses carrying unexpected extra receipts.
        if receipts_vec_offset == block_receipts.len() {
            Some(result)
        } else {
            None
        }
    }
609
610 pub fn on_peer_disconnected(&mut self, peer: &NodeId) {
611 self.active_peers.remove(peer);
612 }
613
614 fn note_failure(&mut self, node_id: &NodeId) {
615 self.active_peers.remove(node_id);
616 }
617}
618
619impl Debug for SnapshotManifestManager {
620 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
621 write!(
622 f,
623 "(request_status = {:?}, candidate={:?} active_peers: {})",
624 self.manifest_request_status,
625 self.snapshot_candidate,
626 self.active_peers.len(),
627 )
628 }
629}
630
/// Configuration for `SnapshotManifestManager`.
pub struct SnapshotManifestConfig {
    /// How long to wait for a manifest response before the requested peer is
    /// dropped and the request is retried elsewhere (see `check_timeout`).
    pub manifest_request_timeout: Duration,
}