1use crate::sync::{
6 error::Error,
7 message::{
8 msgid, Context, SnapshotManifestRequest, SnapshotManifestResponse,
9 StateSyncCandidateRequest,
10 },
11 state::{
12 state_sync_candidate::state_sync_candidate_manager::StateSyncCandidateManager,
13 state_sync_chunk::snapshot_chunk_manager::{
14 SnapshotChunkConfig, SnapshotChunkManager,
15 },
16 state_sync_manifest::snapshot_manifest_manager::{
17 RelatedData, SnapshotManifestConfig, SnapshotManifestManager,
18 },
19 storage::{Chunk, ChunkKey, SnapshotSyncCandidate},
20 },
21 synchronization_state::PeerFilter,
22 SynchronizationProtocolHandler,
23};
24use cfx_parameters::consensus_internal::REWARD_EPOCH_COUNT;
25use cfx_storage::Result as StorageResult;
26use cfx_types::H256;
27use network::{node_table::NodeId, NetworkContext};
28use parking_lot::RwLock;
29use primitives::EpochId;
30use std::{
31 collections::HashSet,
32 fmt::{Debug, Formatter},
33 sync::Arc,
34 time::{Duration, Instant},
35};
36
/// Progress of the snapshot state-sync state machine.
#[derive(Copy, Clone, PartialEq)]
pub enum Status {
    Inactive,
    RequestingCandidates,
    StartCandidateSync,
    DownloadingManifest(Instant),
    DownloadingChunks(Instant),
    Completed,
    Invalid,
}

impl Default for Status {
    /// A freshly created state machine starts out idle.
    fn default() -> Self { Self::Inactive }
}

impl Debug for Status {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        // Render a short human-readable tag; the two downloading states also
        // report how long they have been running.
        match self {
            Self::Inactive => write!(f, "inactive"),
            Self::RequestingCandidates => write!(f, "requesting candidates"),
            Self::StartCandidateSync => {
                write!(f, "about to request a candidate state")
            }
            Self::DownloadingManifest(start) => {
                write!(f, "downloading manifest ({:?})", start.elapsed())
            }
            Self::DownloadingChunks(start) => {
                write!(f, "downloading chunks ({:?})", start.elapsed())
            }
            Self::Completed => write!(f, "completed"),
            Self::Invalid => write!(f, "invalid"),
        }
    }
}
73
/// Mutable state of the snapshot sync state machine, shared behind the
/// `RwLock` held by `SnapshotChunkSync`.
struct Inner {
    // Current phase of the sync; see `Status`.
    status: Status,

    // Tracks the outstanding candidate requests and peer responses.
    sync_candidate_manager: StateSyncCandidateManager,
    // Set while chunks are being downloaded (`Status::DownloadingChunks`);
    // dropped when a chunk timeout forces a reset.
    chunk_manager: Option<SnapshotChunkManager>,
    // Set while the manifest is being downloaded
    // (`Status::DownloadingManifest`); cleared once chunk download starts.
    manifest_manager: Option<SnapshotManifestManager>,

    // Data extracted from the manifest response; set when the manifest
    // completes and later consumed by `restore_execution_state`.
    related_data: Option<RelatedData>,
    // Number of times chunk downloading timed out and the manifest had to be
    // re-downloaded; exceeding the configured maximum panics in
    // `update_status`.
    manifest_attempts: usize,
}
87
88impl Default for Inner {
89 fn default() -> Self { Self::new() }
90}
91
impl Inner {
    /// Create an idle `Inner`: no active sync, no managers, zero manifest
    /// attempts.
    fn new() -> Self {
        Self {
            sync_candidate_manager: Default::default(),
            status: Status::Inactive,
            related_data: None,
            chunk_manager: None,
            manifest_manager: None,
            manifest_attempts: 0,
        }
    }

    /// Start (or resume) syncing the state of `sync_candidate` from
    /// `active_peers`, trusting `trusted_blame_block`.
    ///
    /// If a chunk download for the same candidate is already in progress,
    /// only the active peer set is refreshed; otherwise a new manifest
    /// download is started and the status moves to `DownloadingManifest`.
    pub fn start_sync_for_candidate(
        &mut self, sync_candidate: SnapshotSyncCandidate,
        active_peers: HashSet<NodeId>, trusted_blame_block: H256,
        io: &dyn NetworkContext, sync_handler: &SynchronizationProtocolHandler,
        manifest_config: SnapshotManifestConfig,
    ) {
        // Resume path: keep the existing chunk manager, just refresh peers
        // and re-enter the chunk-downloading state.
        if let Some(chunk_manager) = &mut self.chunk_manager {
            if chunk_manager.snapshot_candidate == sync_candidate {
                self.status = Status::DownloadingChunks(Instant::now());
                chunk_manager.set_active_peers(active_peers);
                return;
            }
        }
        info!(
            "start to sync state, snapshot_to_sync = {:?}, trusted blame block = {:?}",
            sync_candidate, trusted_blame_block);
        // Fresh path: the manifest must be downloaded before any chunks.
        let manifest_manager = SnapshotManifestManager::new_and_start(
            sync_candidate,
            trusted_blame_block,
            active_peers,
            manifest_config,
            io,
            sync_handler,
        );
        self.manifest_manager = Some(manifest_manager);
        self.status = Status::DownloadingManifest(Instant::now());
    }

    /// Begin a new sync round for `current_era_genesis` by asking all
    /// eligible peers which of `candidates` they can serve.
    ///
    /// No-op if no peer passes the filter (the status is left unchanged so
    /// a later `update_status` can retry).
    pub fn start_sync(
        &mut self, current_era_genesis: EpochId,
        candidates: Vec<SnapshotSyncCandidate>, io: &dyn NetworkContext,
        sync_handler: &SynchronizationProtocolHandler,
    ) {
        let peers = PeerFilter::new(msgid::STATE_SYNC_CANDIDATE_REQUEST)
            .select_all(&sync_handler.syn);
        if peers.is_empty() {
            return;
        }
        self.status = Status::RequestingCandidates;
        self.sync_candidate_manager.reset(
            current_era_genesis,
            candidates.clone(),
            peers.clone(),
        );
        self.request_candidates(io, sync_handler, candidates, peers);
    }

    /// Send the same `StateSyncCandidateRequest` (listing `candidates`) to
    /// every peer in `peers` via the request manager.
    fn request_candidates(
        &self, io: &dyn NetworkContext,
        sync_handler: &SynchronizationProtocolHandler,
        candidates: Vec<SnapshotSyncCandidate>, peers: Vec<NodeId>,
    ) {
        // NOTE(review): request_id is set to 0 here — presumably assigned by
        // the request manager before sending; confirm.
        let request = StateSyncCandidateRequest {
            request_id: 0,
            candidates,
        };
        for peer in peers {
            sync_handler.request_manager.request_with_delay(
                io,
                Box::new(request.clone()),
                Some(peer),
                None,
            );
        }
    }
}
179
180impl Debug for Inner {
181 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
182 write!(
183 f,
184 "(status = {:?}, pending_peers: {}, manifest: {:?}, chunks: {:?})",
185 self.status,
186 self.sync_candidate_manager.pending_peers().len(),
187 self.manifest_manager,
188 self.chunk_manager,
189 )
190 }
191}
192
/// Public entry point for snapshot (checkpoint) state synchronization:
/// a thin, lock-protected wrapper around the `Inner` state machine.
pub struct SnapshotChunkSync {
    // Shared state machine; taken for write by every message handler.
    inner: Arc<RwLock<Inner>>,
    // Timeouts and limits governing the sync process.
    config: StateSyncConfiguration,
}
197
198impl SnapshotChunkSync {
199 pub fn new(config: StateSyncConfiguration) -> Self {
200 SnapshotChunkSync {
201 inner: Default::default(),
202 config,
203 }
204 }
205
206 pub fn status(&self) -> Status { self.inner.read().status }
207
    /// Process a manifest response from a peer.
    ///
    /// Ignored (with an info log) unless the current status is
    /// `DownloadingManifest`. When the manifest manager reports completion
    /// by returning `RelatedData`, this transitions to `DownloadingChunks`,
    /// spawns the chunk manager seeded with the manifest's chunk boundaries
    /// and proofs, stores `related_data`, and drops the manifest manager.
    pub fn handle_snapshot_manifest_response(
        &self, ctx: &Context, response: SnapshotManifestResponse,
        request: &SnapshotManifestRequest,
    ) -> Result<(), Error> {
        let inner = &mut *self.inner.write();

        // A late or duplicate response may arrive after the status has
        // moved on; drop it silently.
        if !matches!(inner.status, Status::DownloadingManifest(_)) {
            info!("Snapshot manifest received, but mismatch with current status {:?}", inner.status);
            return Ok(());
        };
        if let Some(manifest_manager) = &mut inner.manifest_manager {
            let r = manifest_manager
                .handle_snapshot_manifest_response(ctx, response, request)?;
            if let Some(related_data) = r {
                // Manifest complete: switch to downloading chunks.
                inner.status = Status::DownloadingChunks(Instant::now());
                inner.chunk_manager =
                    Some(SnapshotChunkManager::new_and_start(
                        ctx,
                        manifest_manager.snapshot_candidate.clone(),
                        related_data.snapshot_info.clone(),
                        related_data.parent_snapshot_info.clone(),
                        manifest_manager.chunk_boundaries.clone(),
                        manifest_manager.chunk_boundary_proofs.clone(),
                        manifest_manager.active_peers.clone(),
                        self.config.chunk_config(),
                        related_data
                            .true_state_root_by_blame_info
                            .state_root
                            .delta_root,
                    )?);
                inner.related_data = Some(related_data);
            }
            debug!("sync state progress: {:?}", *inner);
        } else {
            error!("manifest manager is None in status {:?}", inner.status);
        }
        // Once chunk download has started the manifest manager is no longer
        // needed; free it.
        if matches!(inner.status, Status::DownloadingChunks(_)) {
            inner.manifest_manager = None;
        }
        Ok(())
    }
255
    /// Process one received snapshot chunk.
    ///
    /// Ignored (with an info log) unless the current status is
    /// `DownloadingChunks`. When the chunk manager reports that the final
    /// chunk has arrived, the status becomes `Completed`.
    pub fn handle_snapshot_chunk_response(
        &self, ctx: &Context, chunk_key: ChunkKey, chunk: Chunk,
    ) -> StorageResult<()> {
        let mut inner = self.inner.write();

        // Drop chunks that arrive late, e.g. after a timeout reset.
        if !matches!(inner.status, Status::DownloadingChunks(_)) {
            info!("Snapshot chunk {:?} received, but mismatch with current status {:?}",
                chunk_key, inner.status);
            return Ok(());
        }

        if let Some(chunk_manager) = &mut inner.chunk_manager {
            // `add_chunk` returns true once all chunks have been received.
            if chunk_manager.add_chunk(ctx, chunk_key, chunk)? {
                inner.status = Status::Completed;
            }
        } else {
            debug!(
                "Chunk {:?} received in status {:?}",
                chunk_key, inner.status
            );
        }
        info!("sync state progress: {:?}", *inner);
        Ok(())
    }
288
289 pub fn restore_execution_state(
290 &self, sync_handler: &SynchronizationProtocolHandler,
291 ) {
292 let inner = self.inner.read();
293 let related_data = inner
294 .related_data
295 .as_ref()
296 .expect("Set after receving manifest");
297 let mut deferred_block_hash =
298 related_data.snapshot_info.get_snapshot_epoch_id().clone();
299 for i in related_data.blame_vec_offset
303 ..(related_data.blame_vec_offset + REWARD_EPOCH_COUNT as usize)
304 {
305 info!(
306 "insert_epoch_execution_commitment for block hash {:?}",
307 &deferred_block_hash
308 );
309 sync_handler
310 .graph
311 .data_man
312 .insert_epoch_execution_commitment(
313 deferred_block_hash,
314 related_data.true_state_root_by_blame_info.clone(),
317 related_data.receipt_blame_vec[i],
318 related_data.bloom_blame_vec[i],
319 );
320 let block = sync_handler
321 .graph
322 .data_man
323 .block_header_by_hash(&deferred_block_hash)
324 .unwrap();
325 deferred_block_hash = *block.parent_hash();
326 }
327 for (block_hash, epoch_hash, receipts) in &related_data.epoch_receipts {
328 sync_handler.graph.data_man.insert_block_execution_result(
329 *block_hash,
330 *epoch_hash,
331 receipts.clone(),
332 true, );
334 }
335 }
336
337 pub fn handle_snapshot_candidate_response(
341 &self, peer: &NodeId,
342 supported_candidates: &Vec<SnapshotSyncCandidate>,
343 requested_candidates: &Vec<SnapshotSyncCandidate>,
344 ) {
345 self.inner.write().sync_candidate_manager.on_peer_response(
346 peer,
347 supported_candidates,
348 requested_candidates,
349 )
350 }
351
352 pub fn on_peer_disconnected(&self, peer: &NodeId) {
353 let mut inner = self.inner.write();
354 inner.sync_candidate_manager.on_peer_disconnected(peer);
355 if let Some(manifest_manager) = &mut inner.manifest_manager {
356 manifest_manager.on_peer_disconnected(peer);
357 }
358 if let Some(chunk_manager) = &mut inner.chunk_manager {
359 chunk_manager.on_peer_disconnected(peer);
360 }
361 }
362
    /// Periodic driver of the sync state machine.
    ///
    /// Checks timeouts, advances the status according to the progress of the
    /// candidate/manifest/chunk managers, starts syncing a chosen candidate
    /// when one becomes available, and kicks off a fresh candidate round for
    /// `epoch_to_sync` whenever the machine is (or falls back to) `Inactive`.
    ///
    /// # Panics
    /// Panics when the number of failed manifest attempts reaches
    /// `max_downloading_manifest_attempts`, and if the header of
    /// `epoch_to_sync` is unavailable when a new round must start.
    pub fn update_status(
        &self, current_era_genesis: EpochId, epoch_to_sync: EpochId,
        io: &dyn NetworkContext, sync_handler: &SynchronizationProtocolHandler,
    ) {
        let mut inner = self.inner.write();
        // Repeated chunk-download timeouts increment `manifest_attempts`
        // (see `check_timeout`); give up loudly after too many.
        if inner.manifest_attempts
            >= self.config.max_downloading_manifest_attempts
        {
            panic!(
                "Exceed max manifest attempts {}",
                self.config.max_downloading_manifest_attempts
            );
        }

        debug!("sync state status before updating: {:?}", *inner);
        // Timeout pass may reset the status to Inactive and drop the chunk
        // manager; run it before inspecting the status below.
        self.check_timeout(
            &mut *inner,
            &Context {
                node_id: Default::default(),
                io,
                manager: sync_handler,
            },
        );

        // Only advance the current round if the era genesis is unchanged;
        // otherwise the whole round is stale and we restart from Inactive.
        if inner.sync_candidate_manager.current_era_genesis
            == current_era_genesis
        {
            match inner.status {
                Status::Completed => return,
                Status::RequestingCandidates => {
                    // All candidate responses are in: pick a candidate.
                    if inner.sync_candidate_manager.pending_peers().is_empty() {
                        inner.status = Status::StartCandidateSync;
                        inner.sync_candidate_manager.set_active_candidate();
                    }
                }
                Status::DownloadingManifest(_) => {
                    // Manifest download stalled (no active peers/requests):
                    // fall back to choosing a candidate again.
                    if inner
                        .manifest_manager
                        .as_ref()
                        .expect("always set in DownloadingManifest")
                        .is_inactive()
                    {
                        inner.status = Status::StartCandidateSync;
                        inner.sync_candidate_manager.set_active_candidate();
                    }
                }
                Status::DownloadingChunks(_) => {
                    // Same fallback for a stalled chunk download.
                    if inner
                        .chunk_manager
                        .as_ref()
                        .expect("always set in DownloadingChunks")
                        .is_inactive()
                    {
                        inner.status = Status::StartCandidateSync;
                        inner.sync_candidate_manager.set_active_candidate();
                    }
                }
                _ => {}
            }
            // Everything idle: the current candidate is a dead end, so
            // reset to Inactive to trigger a brand-new round below.
            if inner.sync_candidate_manager.is_inactive()
                && inner
                    .chunk_manager
                    .as_ref()
                    .map_or(true, |m| m.is_inactive())
                && inner
                    .manifest_manager
                    .as_ref()
                    .map_or(true, |m| m.is_inactive())
            {
                warn!("current sync candidate becomes inactive: {:?}", inner);
                inner.status = Status::Inactive;
                inner.manifest_manager = None;
            }
            if inner.status == Status::StartCandidateSync {
                if let Some((sync_candidate, active_peers)) = inner
                    .sync_candidate_manager
                    .get_active_candidate_and_peers()
                {
                    // A trusted blame block is required to verify the
                    // manifest; without it we cannot start this candidate.
                    match sync_handler
                        .graph
                        .consensus
                        .get_trusted_blame_block_for_snapshot(
                            sync_candidate.get_snapshot_epoch_id(),
                        ) {
                        Some(trusted_blame_block) => {
                            inner.start_sync_for_candidate(
                                sync_candidate,
                                active_peers,
                                trusted_blame_block,
                                io,
                                sync_handler,
                                self.config.manifest_config(),
                            );
                        }
                        None => {
                            error!("failed to start checkpoint sync, the trusted blame block is unavailable, epoch_to_sync={:?}", epoch_to_sync);
                        }
                    }
                } else {
                    // No candidate left to try in this round.
                    inner.status = Status::Inactive;
                }
            }
        } else {
            inner.status = Status::Inactive;
        }

        // Inactive (either freshly reset or never started): launch a new
        // candidate round targeting a full sync of `epoch_to_sync`.
        if inner.status == Status::Inactive {
            let height = sync_handler
                .graph
                .data_man
                .block_header_by_hash(&epoch_to_sync)
                .expect("Syncing checkpoint should have available header")
                .height();
            let candidates = vec![SnapshotSyncCandidate::FullSync {
                height,
                snapshot_epoch_id: epoch_to_sync,
            }];
            inner.start_sync(current_era_genesis, candidates, io, sync_handler)
        }
        debug!("sync state status after updating: {:?}", *inner);
    }
503
504 fn check_timeout(&self, inner: &mut Inner, ctx: &Context) {
505 inner
506 .sync_candidate_manager
507 .check_timeout(&self.config.candidate_request_timeout);
508 if let Some(manifest_manager) = &mut inner.manifest_manager {
509 manifest_manager.check_timeout(ctx);
510 }
511 if let Some(chunk_manager) = &mut inner.chunk_manager {
512 if !chunk_manager.check_timeout(ctx) {
513 debug!("reset status to Inactive and redownload manifest");
514 inner.status = Status::Inactive;
515 inner.chunk_manager = None;
516 inner.manifest_attempts += 1;
517 }
518 }
519 }
520}
521
/// Tunables for snapshot state sync.
pub struct StateSyncConfiguration {
    // Cap on concurrently downloading chunks (forwarded to the chunk
    // manager via `chunk_config`).
    pub max_downloading_chunks: usize,
    // Timeout applied to candidate requests.
    pub candidate_request_timeout: Duration,
    // Timeout applied to chunk requests.
    pub chunk_request_timeout: Duration,
    // Timeout applied to manifest requests.
    pub manifest_request_timeout: Duration,
    // Maximum manifest (re-)download attempts before `update_status` panics.
    pub max_downloading_manifest_attempts: usize,
}
529
530impl StateSyncConfiguration {
531 fn chunk_config(&self) -> SnapshotChunkConfig {
532 SnapshotChunkConfig {
533 max_downloading_chunks: self.max_downloading_chunks,
534 chunk_request_timeout: self.chunk_request_timeout,
535 }
536 }
537
538 fn manifest_config(&self) -> SnapshotManifestConfig {
539 SnapshotManifestConfig {
540 manifest_request_timeout: self.manifest_request_timeout,
541 }
542 }
543}