cfxcore/sync/state/
snapshot_chunk_sync.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::sync::{
6    error::Error,
7    message::{
8        msgid, Context, SnapshotManifestRequest, SnapshotManifestResponse,
9        StateSyncCandidateRequest,
10    },
11    state::{
12        state_sync_candidate::state_sync_candidate_manager::StateSyncCandidateManager,
13        state_sync_chunk::snapshot_chunk_manager::{
14            SnapshotChunkConfig, SnapshotChunkManager,
15        },
16        state_sync_manifest::snapshot_manifest_manager::{
17            RelatedData, SnapshotManifestConfig, SnapshotManifestManager,
18        },
19        storage::{Chunk, ChunkKey, SnapshotSyncCandidate},
20    },
21    synchronization_state::PeerFilter,
22    SynchronizationProtocolHandler,
23};
24use cfx_parameters::consensus_internal::REWARD_EPOCH_COUNT;
25use cfx_storage::Result as StorageResult;
26use cfx_types::H256;
27use network::{node_table::NodeId, NetworkContext};
28use parking_lot::RwLock;
29use primitives::EpochId;
30use std::{
31    collections::HashSet,
32    fmt::{Debug, Formatter},
33    sync::Arc,
34    time::{Duration, Instant},
35};
36
/// Phase of the snapshot state sync state machine.
///
/// The `Instant` payloads record when the corresponding download phase
/// started; the `Debug` impl reports the elapsed time since then.
// `Eq` is derived alongside `PartialEq` (clippy: derive_partial_eq_without_eq);
// all payload types, including `Instant`, implement `Eq`.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum Status {
    /// No sync in progress; a new round of candidate requests may be started.
    Inactive,
    /// Waiting for peers to answer the candidate requests.
    RequestingCandidates,
    /// A candidate has been selected and its sync is about to start.
    StartCandidateSync,
    /// Downloading the snapshot manifest since the recorded instant.
    DownloadingManifest(Instant),
    /// Downloading snapshot chunks since the recorded instant.
    DownloadingChunks(Instant),
    /// All chunks received; the synced state is ready for restoration.
    Completed,
    /// The sync is in an invalid state.
    Invalid,
}
47
48impl Default for Status {
49    fn default() -> Self { Status::Inactive }
50}
51
52impl Debug for Status {
53    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
54        let status = match self {
55            Status::Inactive => "inactive".into(),
56            Status::RequestingCandidates => "requesting candidates".into(),
57            Status::StartCandidateSync => {
58                "about to request a candidate state".into()
59            }
60            Status::DownloadingManifest(t) => {
61                format!("downloading manifest ({:?})", t.elapsed())
62            }
63            Status::DownloadingChunks(t) => {
64                format!("downloading chunks ({:?})", t.elapsed())
65            }
66            Status::Completed => "completed".into(),
67            Status::Invalid => "invalid".into(),
68        };
69
70        write!(f, "{}", status)
71    }
72}
73
// TODO: Implement OneStepSync / IncSync as this is currently only implemented
// for FullSync.
/// Mutable state behind [`SnapshotChunkSync`], kept under a `RwLock`.
struct Inner {
    // Current phase of the sync; see `Status`.
    status: Status,

    // Tracks snapshot candidates and the peers able to serve them.
    sync_candidate_manager: StateSyncCandidateManager,
    // Initialized after we receive a valid manifest.
    chunk_manager: Option<SnapshotChunkManager>,
    // Set while a manifest download is in flight; cleared once chunk
    // downloading starts.
    manifest_manager: Option<SnapshotManifestManager>,

    // Extracted from a verified manifest; consumed later by
    // `restore_execution_state`.
    related_data: Option<RelatedData>,
    // Number of manifest re-downloads triggered by stalled chunk syncs;
    // `update_status` panics once it reaches the configured maximum.
    manifest_attempts: usize,
}
87
88impl Default for Inner {
89    fn default() -> Self { Self::new() }
90}
91
impl Inner {
    /// Fresh, inactive state: no candidate chosen, nothing downloading.
    fn new() -> Self {
        Self {
            sync_candidate_manager: Default::default(),
            status: Status::Inactive,
            related_data: None,
            chunk_manager: None,
            manifest_manager: None,
            manifest_attempts: 0,
        }
    }

    /// Start syncing the state for `sync_candidate`, or resume an in-flight
    /// chunk download for the same candidate.
    ///
    /// If the chunk manager is already working on this exact candidate, only
    /// the active peer set is refreshed and the status returns to
    /// `DownloadingChunks`. Otherwise a new manifest download is started and
    /// the status moves to `DownloadingManifest`.
    pub fn start_sync_for_candidate(
        &mut self, sync_candidate: SnapshotSyncCandidate,
        active_peers: HashSet<NodeId>, trusted_blame_block: H256,
        io: &dyn NetworkContext, sync_handler: &SynchronizationProtocolHandler,
        manifest_config: SnapshotManifestConfig,
    ) {
        if let Some(chunk_manager) = &mut self.chunk_manager {
            if chunk_manager.snapshot_candidate == sync_candidate {
                // TODO If the chunk manager does not make progress for a long
                // time, we should also resync the manifest,
                // because the manifest might be valid but also
                // malicious. For example, the chunk size might be larger than
                // MaxPacketSize so no one can return us that chunk.

                // The new candidate is not changed, so we can resume our
                // previous sync status with new `active_peers`.
                self.status = Status::DownloadingChunks(Instant::now());
                chunk_manager.set_active_peers(active_peers);
                return;
            }
        }
        info!(
            "start to sync state, snapshot_to_sync = {:?}, trusted blame block = {:?}",
            sync_candidate, trusted_blame_block);
        let manifest_manager = SnapshotManifestManager::new_and_start(
            sync_candidate,
            trusted_blame_block,
            active_peers,
            manifest_config,
            io,
            sync_handler,
        );
        self.manifest_manager = Some(manifest_manager);
        self.status = Status::DownloadingManifest(Instant::now());
    }

    /// Begin a new round of candidate discovery for `candidates` within the
    /// era rooted at `current_era_genesis`.
    ///
    /// No-op when no peer accepts candidate requests; otherwise resets the
    /// candidate manager and broadcasts the request to all selected peers.
    pub fn start_sync(
        &mut self, current_era_genesis: EpochId,
        candidates: Vec<SnapshotSyncCandidate>, io: &dyn NetworkContext,
        sync_handler: &SynchronizationProtocolHandler,
    ) {
        let peers = PeerFilter::new(msgid::STATE_SYNC_CANDIDATE_REQUEST)
            .select_all(&sync_handler.syn);
        if peers.is_empty() {
            return;
        }
        self.status = Status::RequestingCandidates;
        self.sync_candidate_manager.reset(
            current_era_genesis,
            candidates.clone(),
            peers.clone(),
        );
        self.request_candidates(io, sync_handler, candidates, peers);
    }

    /// request state candidates from all peers
    fn request_candidates(
        &self, io: &dyn NetworkContext,
        sync_handler: &SynchronizationProtocolHandler,
        candidates: Vec<SnapshotSyncCandidate>, peers: Vec<NodeId>,
    ) {
        // The same request (request_id 0) is cloned and sent to every peer.
        let request = StateSyncCandidateRequest {
            request_id: 0,
            candidates,
        };
        for peer in peers {
            sync_handler.request_manager.request_with_delay(
                io,
                Box::new(request.clone()),
                Some(peer),
                None,
            );
        }
    }
}
179
180impl Debug for Inner {
181    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
182        write!(
183            f,
184            "(status = {:?}, pending_peers: {}, manifest: {:?}, chunks: {:?})",
185            self.status,
186            self.sync_candidate_manager.pending_peers().len(),
187            self.manifest_manager,
188            self.chunk_manager,
189        )
190    }
191}
192
/// Drives snapshot state sync: candidate discovery, manifest download, chunk
/// download, and final restoration of execution state.
pub struct SnapshotChunkSync {
    // Shared mutable state machine; locked per operation.
    inner: Arc<RwLock<Inner>>,
    // Timeouts and limits for the sync; fixed at construction.
    config: StateSyncConfiguration,
}
197
198impl SnapshotChunkSync {
199    pub fn new(config: StateSyncConfiguration) -> Self {
200        SnapshotChunkSync {
201            inner: Default::default(),
202            config,
203        }
204    }
205
206    pub fn status(&self) -> Status { self.inner.read().status }
207
    /// Process a manifest response from a peer.
    ///
    /// When the manifest manager reports the manifest complete and verified
    /// (returns `RelatedData`), the status transitions from
    /// `DownloadingManifest` to `DownloadingChunks` and the chunk download is
    /// started. Responses arriving in any other status are logged and ignored.
    pub fn handle_snapshot_manifest_response(
        &self, ctx: &Context, response: SnapshotManifestResponse,
        request: &SnapshotManifestRequest,
    ) -> Result<(), Error> {
        let inner = &mut *self.inner.write();

        // status mismatch
        if !matches!(inner.status, Status::DownloadingManifest(_)) {
            info!("Snapshot manifest received, but mismatch with current status {:?}", inner.status);
            return Ok(());
        };
        if let Some(manifest_manager) = &mut inner.manifest_manager {
            let r = manifest_manager
                .handle_snapshot_manifest_response(ctx, response, request)?;
            if let Some(related_data) = r {
                // update status
                inner.status = Status::DownloadingChunks(Instant::now());
                inner.chunk_manager =
                    Some(SnapshotChunkManager::new_and_start(
                        ctx,
                        manifest_manager.snapshot_candidate.clone(),
                        related_data.snapshot_info.clone(),
                        related_data.parent_snapshot_info.clone(),
                        manifest_manager.chunk_boundaries.clone(),
                        manifest_manager.chunk_boundary_proofs.clone(),
                        manifest_manager.active_peers.clone(),
                        self.config.chunk_config(),
                        // This delta_root is the intermediate_delta_root of
                        // the new snapshot, and this field will be used to
                        // fill new state_root in
                        // get_state_trees_for_next_epoch
                        related_data
                            .true_state_root_by_blame_info
                            .state_root
                            .delta_root,
                    )?);
                // Keep the manifest's related data around for
                // `restore_execution_state`.
                inner.related_data = Some(related_data);
            }
            debug!("sync state progress: {:?}", *inner);
        } else {
            error!("manifest manager is None in status {:?}", inner.status);
        }
        // The manifest manager is no longer needed once chunk downloading
        // has started.
        if matches!(inner.status, Status::DownloadingChunks(_)) {
            inner.manifest_manager = None;
        }
        Ok(())
    }
255
    /// Feed a received chunk into the chunk manager.
    ///
    /// Chunks arriving while the status is not `DownloadingChunks` are
    /// logged and ignored. When `add_chunk` reports that all chunks have been
    /// received, the status is set to `Completed`.
    pub fn handle_snapshot_chunk_response(
        &self, ctx: &Context, chunk_key: ChunkKey, chunk: Chunk,
    ) -> StorageResult<()> {
        let mut inner = self.inner.write();

        if !matches!(inner.status, Status::DownloadingChunks(_)) {
            info!("Snapshot chunk {:?} received, but mismatch with current status {:?}",
                chunk_key, inner.status);
            return Ok(());
        }

        if let Some(chunk_manager) = &mut inner.chunk_manager {
            if chunk_manager.add_chunk(ctx, chunk_key, chunk)? {
                // Once the status becomes Completed, it will never be changed
                // to another status, and all the related fields
                // (snapshot_id, trust_blame_block, receipts, e.t.c.)
                // of Inner will not be modified, because we return early in
                // `update_status`
                // and `handle_snapshot_manifest_response`. Thus, we can rely on
                // the phase changing thread
                // to call `restore_execution_state` later safely.
                inner.status = Status::Completed;
            }
        } else {
            debug!(
                "Chunk {:?} received in status {:?}",
                chunk_key, inner.status
            );
        }
        info!("sync state progress: {:?}", *inner);
        Ok(())
    }
288
    /// After the snapshot state is fully synced, back-fill epoch execution
    /// commitments (walking `REWARD_EPOCH_COUNT` blocks up the parent chain
    /// from the snapshot epoch) and block execution results from the
    /// manifest's related data, so normal processing can continue from the
    /// sync point.
    ///
    /// # Panics
    ///
    /// Panics if called before a manifest has been received (`related_data`
    /// unset), or if a block header on the walked chain is missing from the
    /// data manager.
    pub fn restore_execution_state(
        &self, sync_handler: &SynchronizationProtocolHandler,
    ) {
        let inner = self.inner.read();
        let related_data = inner
            .related_data
            .as_ref()
            .expect("Set after receving manifest");
        let mut deferred_block_hash =
            related_data.snapshot_info.get_snapshot_epoch_id().clone();
        // FIXME: Because state_root_aux_info can't be computed for state block
        // FIXME: before snapshot, for the reward epoch count, maybe
        // FIXME: save it to a dedicated place for reward computation.
        for i in related_data.blame_vec_offset
            ..(related_data.blame_vec_offset + REWARD_EPOCH_COUNT as usize)
        {
            info!(
                "insert_epoch_execution_commitment for block hash {:?}",
                &deferred_block_hash
            );
            sync_handler
                .graph
                .data_man
                .insert_epoch_execution_commitment(
                    deferred_block_hash,
                    // FIXME: the state root is wrong for epochs before sync
                    // FIXME: point. but these information won't be used.
                    related_data.true_state_root_by_blame_info.clone(),
                    related_data.receipt_blame_vec[i],
                    related_data.bloom_blame_vec[i],
                );
            // Step to the parent block for the next (earlier) epoch.
            let block = sync_handler
                .graph
                .data_man
                .block_header_by_hash(&deferred_block_hash)
                .unwrap();
            deferred_block_hash = *block.parent_hash();
        }
        for (block_hash, epoch_hash, receipts) in &related_data.epoch_receipts {
            sync_handler.graph.data_man.insert_block_execution_result(
                *block_hash,
                *epoch_hash,
                receipts.clone(),
                true, /* persistent */
            );
        }
    }
336
337    /// TODO Handling manifest requesting separately
338    /// Return Some if a candidate is ready and we can start requesting
339    /// manifests
340    pub fn handle_snapshot_candidate_response(
341        &self, peer: &NodeId,
342        supported_candidates: &Vec<SnapshotSyncCandidate>,
343        requested_candidates: &Vec<SnapshotSyncCandidate>,
344    ) {
345        self.inner.write().sync_candidate_manager.on_peer_response(
346            peer,
347            supported_candidates,
348            requested_candidates,
349        )
350    }
351
352    pub fn on_peer_disconnected(&self, peer: &NodeId) {
353        let mut inner = self.inner.write();
354        inner.sync_candidate_manager.on_peer_disconnected(peer);
355        if let Some(manifest_manager) = &mut inner.manifest_manager {
356            manifest_manager.on_peer_disconnected(peer);
357        }
358        if let Some(chunk_manager) = &mut inner.chunk_manager {
359            chunk_manager.on_peer_disconnected(peer);
360        }
361    }
362
    /// Reset status if we cannot make progress based on current peers and
    /// candidates
    ///
    /// Called periodically to drive the state machine: expire timed-out
    /// requests, advance or abandon the current candidate, and restart
    /// candidate discovery when everything has become inactive.
    ///
    /// # Panics
    ///
    /// Panics when `manifest_attempts` reaches the configured
    /// `max_downloading_manifest_attempts`, or when the header of
    /// `epoch_to_sync` is missing from the data manager.
    pub fn update_status(
        &self, current_era_genesis: EpochId, epoch_to_sync: EpochId,
        io: &dyn NetworkContext, sync_handler: &SynchronizationProtocolHandler,
    ) {
        let mut inner = self.inner.write();
        if inner.manifest_attempts
            >= self.config.max_downloading_manifest_attempts
        {
            panic!(
                "Exceed max manifest attempts {}",
                self.config.max_downloading_manifest_attempts
            );
        }

        debug!("sync state status before updating: {:?}", *inner);
        self.check_timeout(
            &mut *inner,
            &Context {
                // node_id is not used here
                node_id: Default::default(),
                io,
                manager: sync_handler,
            },
        );

        // If we moves into the next era, we should force state_sync to change
        // the candidates to states with in the new stable era. If the
        // era stays the same and a new snapshot becomes available, we
        // only change candidates if old candidates cannot to be synced,
        // so a state can be synced with one era time instead of only
        // one snapshot time
        if inner.sync_candidate_manager.current_era_genesis
            == current_era_genesis
        {
            match inner.status {
                // Completed is terminal: return early so the related fields
                // stay untouched (see `handle_snapshot_chunk_response`).
                Status::Completed => return,
                Status::RequestingCandidates => {
                    if inner.sync_candidate_manager.pending_peers().is_empty() {
                        inner.status = Status::StartCandidateSync;
                        inner.sync_candidate_manager.set_active_candidate();
                    }
                }
                Status::DownloadingManifest(_) => {
                    if inner
                        .manifest_manager
                        .as_ref()
                        .expect("always set in DownloadingManifest")
                        .is_inactive()
                    {
                        // The current candidate fails, so try to choose the
                        // next one.
                        inner.status = Status::StartCandidateSync;
                        inner.sync_candidate_manager.set_active_candidate();
                    }
                }
                Status::DownloadingChunks(_) => {
                    if inner
                        .chunk_manager
                        .as_ref()
                        .expect("always set in DownloadingChunks")
                        .is_inactive()
                    {
                        // The current candidate fails, so try to choose the
                        // next one.
                        inner.status = Status::StartCandidateSync;
                        inner.sync_candidate_manager.set_active_candidate();
                    }
                }
                _ => {}
            }
            if inner.sync_candidate_manager.is_inactive()
                && inner
                    .chunk_manager
                    .as_ref()
                    .map_or(true, |m| m.is_inactive())
                && inner
                    .manifest_manager
                    .as_ref()
                    .map_or(true, |m| m.is_inactive())
            {
                // We are requesting candidates and all `pending_peers` timeout,
                // or we are syncing states and all
                // `active_peers` for all candidates timeout.
                warn!("current sync candidate becomes inactive: {:?}", inner);
                inner.status = Status::Inactive;
                inner.manifest_manager = None;
            }
            // We need to start/restart syncing states for a candidate.
            if inner.status == Status::StartCandidateSync {
                if let Some((sync_candidate, active_peers)) = inner
                    .sync_candidate_manager
                    .get_active_candidate_and_peers()
                {
                    match sync_handler
                        .graph
                        .consensus
                        .get_trusted_blame_block_for_snapshot(
                            sync_candidate.get_snapshot_epoch_id(),
                        ) {
                        Some(trusted_blame_block) => {
                            inner.start_sync_for_candidate(
                                sync_candidate,
                                active_peers,
                                trusted_blame_block,
                                io,
                                sync_handler,
                                self.config.manifest_config(),
                            );
                        }
                        None => {
                            error!("failed to start checkpoint sync, the trusted blame block is unavailable, epoch_to_sync={:?}", epoch_to_sync);
                        }
                    }
                } else {
                    // No candidate left to try.
                    inner.status = Status::Inactive;
                }
            }
        } else {
            // A new era has started; the old candidates are stale.
            inner.status = Status::Inactive;
        }

        if inner.status == Status::Inactive {
            // New era started or all candidates fail, we should restart
            // candidates sync
            let height = sync_handler
                .graph
                .data_man
                .block_header_by_hash(&epoch_to_sync)
                .expect("Syncing checkpoint should have available header")
                .height();
            let candidates = vec![SnapshotSyncCandidate::FullSync {
                height,
                snapshot_epoch_id: epoch_to_sync,
            }];
            inner.start_sync(current_era_genesis, candidates, io, sync_handler)
        }
        debug!("sync state status after updating: {:?}", *inner);
    }
503
504    fn check_timeout(&self, inner: &mut Inner, ctx: &Context) {
505        inner
506            .sync_candidate_manager
507            .check_timeout(&self.config.candidate_request_timeout);
508        if let Some(manifest_manager) = &mut inner.manifest_manager {
509            manifest_manager.check_timeout(ctx);
510        }
511        if let Some(chunk_manager) = &mut inner.chunk_manager {
512            if !chunk_manager.check_timeout(ctx) {
513                debug!("reset status to Inactive and redownload manifest");
514                inner.status = Status::Inactive;
515                inner.chunk_manager = None;
516                inner.manifest_attempts += 1;
517            }
518        }
519    }
520}
521
/// Tunables for snapshot state sync.
pub struct StateSyncConfiguration {
    // Chunk-download limit forwarded to `SnapshotChunkConfig`.
    pub max_downloading_chunks: usize,
    // Timeout applied to candidate requests in `check_timeout`.
    pub candidate_request_timeout: Duration,
    // Timeout forwarded to `SnapshotChunkConfig`.
    pub chunk_request_timeout: Duration,
    // Timeout forwarded to `SnapshotManifestConfig`.
    pub manifest_request_timeout: Duration,
    // `update_status` panics once this many manifest attempts are used up.
    pub max_downloading_manifest_attempts: usize,
}
529
530impl StateSyncConfiguration {
531    fn chunk_config(&self) -> SnapshotChunkConfig {
532        SnapshotChunkConfig {
533            max_downloading_chunks: self.max_downloading_chunks,
534            chunk_request_timeout: self.chunk_request_timeout,
535        }
536    }
537
538    fn manifest_config(&self) -> SnapshotManifestConfig {
539        SnapshotManifestConfig {
540            manifest_request_timeout: self.manifest_request_timeout,
541        }
542    }
543}