cfxcore/sync/
synchronization_phases.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    sync::{
7        message::DynamicCapability,
8        state::{SnapshotChunkSync, Status},
9        synchronization_protocol_handler::SynchronizationProtocolHandler,
10        synchronization_state::SynchronizationState,
11        SharedSynchronizationGraph,
12    },
13    ConsensusGraph,
14};
15use cfx_internal_common::StateAvailabilityBoundary;
16use cfx_parameters::sync::CATCH_UP_EPOCH_LAG_THRESHOLD;
17use network::NetworkContext;
18use parking_lot::RwLock;
19use std::{
20    collections::HashMap,
21    sync::{
22        atomic::{AtomicBool, Ordering as AtomicOrdering},
23        Arc,
24    },
25    thread,
26    time::{self, Instant},
27};
28
29/// Both Archive and Full node go through the following phases:
30///     CatchUpRecoverBlockHeaderFromDB --> CatchUpSyncBlockHeader -->
31///     CatchUpCheckpoint --> CatchUpFillBlockBody -->
32///     CatchUpSyncBlock --> Normal
33
/// Ordered phase identifiers for the catch-up state machine. The explicit
/// discriminants encode the normal forward progression listed in the
/// module-level comment; `Ord`/`PartialOrd` follow that order.
#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub enum SyncPhaseType {
    CatchUpRecoverBlockHeaderFromDB = 0,
    CatchUpSyncBlockHeader = 1,
    CatchUpCheckpoint = 2,
    CatchUpFillBlockBodyPhase = 3,
    CatchUpSyncBlock = 4,
    Normal = 5,
}
43
/// Behavior shared by all synchronization phases. Implementations are
/// registered with the phase manager and driven by the sync layer.
pub trait SynchronizationPhaseTrait: Send + Sync {
    /// Human-readable phase name, used for logging.
    fn name(&self) -> &'static str;
    /// The `SyncPhaseType` variant this implementation corresponds to.
    fn phase_type(&self) -> SyncPhaseType;
    /// Decide which phase should run next; returns `self.phase_type()` when
    /// the current phase is not yet finished.
    fn next(
        &self, _io: &dyn NetworkContext,
        _sync_handler: &SynchronizationProtocolHandler,
    ) -> SyncPhaseType;
    /// Entry hook invoked when this phase becomes the current phase.
    fn start(
        &self, _io: &dyn NetworkContext,
        _sync_handler: &SynchronizationProtocolHandler,
    );
}
56
/// Mutable state guarded by `SynchronizationPhaseManager`'s `RwLock`.
pub struct SynchronizationPhaseManagerInner {
    // Whether the initial phase's `start` hook has been run; see
    // `try_initialize`.
    initialized: bool,
    // The phase currently driving synchronization.
    current_phase: SyncPhaseType,
    // Registry of all phase implementations, keyed by their phase type.
    phases: HashMap<SyncPhaseType, Arc<dyn SynchronizationPhaseTrait>>,
}
62
63impl SynchronizationPhaseManagerInner {
64    pub fn new(initial_phase_type: SyncPhaseType) -> Self {
65        SynchronizationPhaseManagerInner {
66            initialized: false,
67            current_phase: initial_phase_type,
68            phases: HashMap::new(),
69        }
70    }
71
72    pub fn register_phase(
73        &mut self, phase: Arc<dyn SynchronizationPhaseTrait>,
74    ) {
75        self.phases.insert(phase.phase_type(), phase);
76    }
77
78    pub fn get_phase(
79        &self, phase_type: SyncPhaseType,
80    ) -> Arc<dyn SynchronizationPhaseTrait> {
81        self.phases.get(&phase_type).unwrap().clone()
82    }
83
84    pub fn get_current_phase(&self) -> Arc<dyn SynchronizationPhaseTrait> {
85        self.get_phase(self.current_phase)
86    }
87
88    pub fn change_phase_to(&mut self, phase_type: SyncPhaseType) {
89        self.current_phase = phase_type;
90    }
91
92    pub fn try_initialize(&mut self) -> bool {
93        let initialized = self.initialized;
94        if !self.initialized {
95            self.initialized = true;
96        }
97
98        initialized
99    }
100}
101
/// Thread-safe owner of the phase registry and the current-phase pointer;
/// all methods take `&self` and lock the inner state internally.
pub struct SynchronizationPhaseManager {
    inner: RwLock<SynchronizationPhaseManagerInner>,
}
105
106impl SynchronizationPhaseManager {
107    pub fn new(
108        initial_phase_type: SyncPhaseType,
109        sync_state: Arc<SynchronizationState>,
110        sync_graph: SharedSynchronizationGraph,
111        state_sync: Arc<SnapshotChunkSync>, consensus: Arc<ConsensusGraph>,
112    ) -> Self {
113        let sync_manager = SynchronizationPhaseManager {
114            inner: RwLock::new(SynchronizationPhaseManagerInner::new(
115                initial_phase_type,
116            )),
117        };
118
119        sync_manager.register_phase(Arc::new(
120            CatchUpRecoverBlockHeaderFromDbPhase::new(sync_graph.clone()),
121        ));
122        sync_manager.register_phase(Arc::new(
123            CatchUpSyncBlockHeaderPhase::new(
124                sync_state.clone(),
125                sync_graph.clone(),
126            ),
127        ));
128        sync_manager
129            .register_phase(Arc::new(CatchUpCheckpointPhase::new(state_sync)));
130        sync_manager.register_phase(Arc::new(CatchUpFillBlockBodyPhase::new(
131            sync_graph.clone(),
132        )));
133        sync_manager.register_phase(Arc::new(CatchUpSyncBlockPhase::new(
134            sync_state.clone(),
135            sync_graph.clone(),
136        )));
137        sync_manager.register_phase(Arc::new(NormalSyncPhase::new(consensus)));
138
139        sync_manager
140    }
141
142    pub fn register_phase(&self, phase: Arc<dyn SynchronizationPhaseTrait>) {
143        self.inner.write().register_phase(phase);
144    }
145
146    pub fn get_phase(
147        &self, phase_type: SyncPhaseType,
148    ) -> Arc<dyn SynchronizationPhaseTrait> {
149        self.inner.read().get_phase(phase_type)
150    }
151
152    pub fn get_current_phase(&self) -> Arc<dyn SynchronizationPhaseTrait> {
153        self.inner.read().get_current_phase()
154    }
155
156    pub fn change_phase_to(
157        &self, phase_type: SyncPhaseType, io: &dyn NetworkContext,
158        sync_handler: &SynchronizationProtocolHandler,
159    ) {
160        self.inner.write().change_phase_to(phase_type);
161        let current_phase = self.get_current_phase();
162        current_phase.start(io, sync_handler);
163    }
164
165    pub fn try_initialize(
166        &self, io: &dyn NetworkContext,
167        sync_handler: &SynchronizationProtocolHandler,
168    ) {
169        if !self.inner.write().try_initialize() {
170            // if not initialized
171            let current_phase = self.get_current_phase();
172            current_phase.start(io, sync_handler);
173        }
174    }
175}
176
/// Phase that rebuilds the header graph from the local database on a
/// background thread before any network sync starts.
pub struct CatchUpRecoverBlockHeaderFromDbPhase {
    pub graph: SharedSynchronizationGraph,
    // Set to `true` by the background thread spawned in `start` once
    // `recover_graph_from_db` has finished.
    pub recovered: Arc<AtomicBool>,
}
181
182impl CatchUpRecoverBlockHeaderFromDbPhase {
183    pub fn new(graph: SharedSynchronizationGraph) -> Self {
184        CatchUpRecoverBlockHeaderFromDbPhase {
185            graph,
186            recovered: Arc::new(AtomicBool::new(false)),
187        }
188    }
189}
190
191impl SynchronizationPhaseTrait for CatchUpRecoverBlockHeaderFromDbPhase {
192    fn name(&self) -> &'static str { "CatchUpRecoverBlockHeaderFromDbPhase" }
193
194    fn phase_type(&self) -> SyncPhaseType {
195        SyncPhaseType::CatchUpRecoverBlockHeaderFromDB
196    }
197
198    fn next(
199        &self, io: &dyn NetworkContext,
200        sync_handler: &SynchronizationProtocolHandler,
201    ) -> SyncPhaseType {
202        if self.recovered.load(AtomicOrdering::SeqCst) == false {
203            return self.phase_type();
204        }
205
206        DynamicCapability::ServeHeaders(true).broadcast(io, &sync_handler.syn);
207        SyncPhaseType::CatchUpSyncBlockHeader
208    }
209
210    fn start(
211        &self, _io: &dyn NetworkContext,
212        _sync_handler: &SynchronizationProtocolHandler,
213    ) {
214        info!("start phase {:?}", self.name());
215        self.recovered.store(false, AtomicOrdering::SeqCst);
216        let recovered = self.recovered.clone();
217        let graph = self.graph.clone();
218        std::thread::spawn(move || {
219            graph.recover_graph_from_db();
220            recovered.store(true, AtomicOrdering::SeqCst);
221            info!("finish recover header graph from db");
222        });
223    }
224}
225
/// Phase that downloads block headers from peers until our best epoch has
/// caught up with the peers' median epoch.
pub struct CatchUpSyncBlockHeaderPhase {
    pub syn: Arc<SynchronizationState>,
    pub graph: SharedSynchronizationGraph,
}
230
231impl CatchUpSyncBlockHeaderPhase {
232    pub fn new(
233        syn: Arc<SynchronizationState>, graph: SharedSynchronizationGraph,
234    ) -> Self {
235        CatchUpSyncBlockHeaderPhase { syn, graph }
236    }
237}
238
239impl SynchronizationPhaseTrait for CatchUpSyncBlockHeaderPhase {
240    fn name(&self) -> &'static str { "CatchUpSyncBlockHeaderPhase" }
241
242    fn phase_type(&self) -> SyncPhaseType {
243        SyncPhaseType::CatchUpSyncBlockHeader
244    }
245
246    fn next(
247        &self, _io: &dyn NetworkContext,
248        _sync_handler: &SynchronizationProtocolHandler,
249    ) -> SyncPhaseType {
250        let median_epoch = match self.syn.median_epoch_from_normal_peers() {
251            None => {
252                return if self.syn.allow_phase_change_without_peer() {
253                    SyncPhaseType::CatchUpCheckpoint
254                } else {
255                    self.phase_type()
256                }
257            }
258            Some(epoch) => epoch,
259        };
260        debug!(
261            "best_epoch: {}, peer median: {}",
262            self.graph.consensus.best_epoch_number(),
263            median_epoch
264        );
265        // FIXME: OK, what if the chain height is close, or even local height is
266        // FIXME: larger, but the chain forked earlier very far away?
267        if self.graph.consensus.catch_up_completed(median_epoch) {
268            return SyncPhaseType::CatchUpCheckpoint;
269        }
270
271        self.phase_type()
272    }
273
274    fn start(
275        &self, io: &dyn NetworkContext,
276        sync_handler: &SynchronizationProtocolHandler,
277    ) {
278        info!("start phase {:?}", self.name());
279        let (_, cur_era_genesis_height) =
280            self.graph.get_genesis_hash_and_height_in_current_era();
281        *sync_handler.latest_epoch_requested.lock() =
282            (cur_era_genesis_height, Instant::now(), 0, 0);
283
284        // sync block headers from peers
285        sync_handler.request_epochs(io);
286    }
287}
288
/// Phase that syncs the checkpoint state (snapshot chunks) from peers unless
/// an execution commitment for the target epoch already exists locally.
pub struct CatchUpCheckpointPhase {
    state_sync: Arc<SnapshotChunkSync>,

    /// Is `true` if we have the state locally and do not need to sync
    /// checkpoints. Only set when the phase starts.
    has_state: AtomicBool,
}
296
297impl CatchUpCheckpointPhase {
298    pub fn new(state_sync: Arc<SnapshotChunkSync>) -> Self {
299        CatchUpCheckpointPhase {
300            state_sync,
301            has_state: AtomicBool::new(false),
302        }
303    }
304}
305
impl SynchronizationPhaseTrait for CatchUpCheckpointPhase {
    fn name(&self) -> &'static str { "CatchUpCheckpointPhase" }

    fn phase_type(&self) -> SyncPhaseType { SyncPhaseType::CatchUpCheckpoint }

    // Advance to body-filling once the checkpoint state is available: either
    // it was already on disk (`has_state`, set in `start`) or the snapshot
    // chunk sync has just completed.
    fn next(
        &self, io: &dyn NetworkContext,
        sync_handler: &SynchronizationProtocolHandler,
    ) -> SyncPhaseType {
        if self.has_state.load(AtomicOrdering::SeqCst) {
            return SyncPhaseType::CatchUpFillBlockBodyPhase;
        }
        let epoch_to_sync = sync_handler.graph.consensus.get_to_sync_epoch_id();
        let current_era_genesis = sync_handler
            .graph
            .data_man
            .get_cur_consensus_era_genesis_hash();
        // Drive the snapshot sync forward; the sync target may move if the
        // to-sync epoch advances while we are downloading.
        self.state_sync.update_status(
            current_era_genesis,
            epoch_to_sync,
            io,
            sync_handler,
        );
        if self.state_sync.status() == Status::Completed {
            self.state_sync.restore_execution_state(sync_handler);
            // Remember which epoch's state we synced; consumed by
            // CatchUpFillBlockBodyPhase::start.
            *sync_handler.synced_epoch_id.lock() = Some(epoch_to_sync);
            SyncPhaseType::CatchUpFillBlockBodyPhase
        } else {
            self.phase_type()
        }
    }

    // Lock the graph for catch-up, wait for in-flight consensus work to
    // drain, then either detect a locally persisted commitment for the
    // target epoch (skip state sync) or kick off the snapshot chunk sync.
    fn start(
        &self, io: &dyn NetworkContext,
        sync_handler: &SynchronizationProtocolHandler,
    ) {
        info!("start phase {:?}", self.name());
        // Freeze graph processing before inspecting/syncing checkpoint state.
        sync_handler.graph.inner.write().locked_for_catchup = true;
        // Busy-wait (100ms polls) until the consensus worker goes idle.
        while sync_handler.graph.is_consensus_worker_busy() {
            thread::sleep(time::Duration::from_millis(100));
        }
        let current_era_genesis = sync_handler
            .graph
            .data_man
            .get_cur_consensus_era_genesis_hash();
        let epoch_to_sync = sync_handler.graph.consensus.get_to_sync_epoch_id();

        // FIXME: what happens if the snapshot before epoch_to_sync is
        // corrupted?
        if let Some(commitment) = sync_handler
            .graph
            .data_man
            .load_epoch_execution_commitment_from_db(&epoch_to_sync)
        {
            info!("CatchUpCheckpointPhase: commitment for epoch {:?} exists, skip state sync. \
                commitment={:?}", epoch_to_sync, commitment);
            self.has_state.store(true, AtomicOrdering::SeqCst);

            // TODO Here has_state could mean we have the snapshot of the state
            // or the last snapshot and the delta mpt. We only need to specially
            // handle the case of snapshot-only state where we
            // cannot compute state_valid because we do not have a
            // valid state root.
            if epoch_to_sync != sync_handler.graph.data_man.true_genesis.hash()
            {
                *sync_handler.synced_epoch_id.lock() = Some(epoch_to_sync);
            }
            return;
        }

        self.state_sync.update_status(
            current_era_genesis,
            epoch_to_sync,
            io,
            sync_handler,
        );
    }
}
384
/// Phase that fetches the bodies of blocks whose headers are already known.
pub struct CatchUpFillBlockBodyPhase {
    pub graph: SharedSynchronizationGraph,
}
388
389impl CatchUpFillBlockBodyPhase {
390    pub fn new(graph: SharedSynchronizationGraph) -> Self {
391        CatchUpFillBlockBodyPhase { graph }
392    }
393}
394
395impl SynchronizationPhaseTrait for CatchUpFillBlockBodyPhase {
396    fn name(&self) -> &'static str { "CatchUpFillBlockBodyPhase" }
397
398    fn phase_type(&self) -> SyncPhaseType {
399        SyncPhaseType::CatchUpFillBlockBodyPhase
400    }
401
402    fn next(
403        &self, io: &dyn NetworkContext,
404        sync_handler: &SynchronizationProtocolHandler,
405    ) -> SyncPhaseType {
406        if self.graph.is_fill_block_completed() {
407            if self.graph.complete_filling_block_bodies() {
408                return SyncPhaseType::CatchUpSyncBlock;
409            } else {
410                // consensus graph is reconstructed and we need to request more
411                // bodies
412                sync_handler.request_block_bodies(io)
413            }
414        }
415        self.phase_type()
416    }
417
418    fn start(
419        &self, io: &dyn NetworkContext,
420        sync_handler: &SynchronizationProtocolHandler,
421    ) {
422        info!("start phase {:?}", self.name());
423        {
424            let full_state_start_height = self
425                .graph
426                .data_man
427                .storage_manager
428                .config()
429                .full_state_start_height();
430            let full_state_space = self
431                .graph
432                .data_man
433                .storage_manager
434                .config()
435                .single_mpt_space;
436            // For both archive and full node, synced_epoch_id possible be
437            // `None`. It wil be none when stable epoch is equal to
438            // true genesis In both cases, we should set
439            // `state_availability_boundary` to
440            // `[cur_era_stable_height, cur_era_stable_height]`.
441            if let Some(epoch_synced) = &*sync_handler.synced_epoch_id.lock() {
442                let epoch_synced_height = self
443                    .graph
444                    .data_man
445                    .block_header_by_hash(epoch_synced)
446                    .expect("Header for checkpoint exists")
447                    .height();
448                *self.graph.data_man.state_availability_boundary.write() =
449                    StateAvailabilityBoundary::new(
450                        *epoch_synced,
451                        epoch_synced_height,
452                        full_state_start_height,
453                        full_state_space,
454                    );
455                self.graph
456                    .data_man
457                    .state_availability_boundary
458                    .write()
459                    .set_synced_state_height(epoch_synced_height);
460            } else {
461                let cur_era_stable_hash =
462                    self.graph.data_man.get_cur_consensus_era_stable_hash();
463                let cur_era_stable_height = self
464                    .graph
465                    .data_man
466                    .block_header_by_hash(&cur_era_stable_hash)
467                    .expect("stable era block header must exist")
468                    .height();
469                *self.graph.data_man.state_availability_boundary.write() =
470                    StateAvailabilityBoundary::new(
471                        cur_era_stable_hash,
472                        cur_era_stable_height,
473                        full_state_start_height,
474                        full_state_space,
475                    );
476            }
477            self.graph.inner.write().block_to_fill_set =
478                self.graph.consensus.get_blocks_needing_bodies();
479            sync_handler.request_block_bodies(io);
480        }
481    }
482}
483
/// Phase that syncs full blocks until our best epoch is within the catch-up
/// lag threshold of the peers' median epoch.
pub struct CatchUpSyncBlockPhase {
    pub syn: Arc<SynchronizationState>,
    pub graph: SharedSynchronizationGraph,
}
488
489impl CatchUpSyncBlockPhase {
490    pub fn new(
491        syn: Arc<SynchronizationState>, graph: SharedSynchronizationGraph,
492    ) -> Self {
493        CatchUpSyncBlockPhase { syn, graph }
494    }
495}
496
497impl SynchronizationPhaseTrait for CatchUpSyncBlockPhase {
498    fn name(&self) -> &'static str { "CatchUpSyncBlockPhase" }
499
500    fn phase_type(&self) -> SyncPhaseType { SyncPhaseType::CatchUpSyncBlock }
501
502    fn next(
503        &self, _io: &dyn NetworkContext,
504        sync_handler: &SynchronizationProtocolHandler,
505    ) -> SyncPhaseType {
506        // FIXME: use target_height instead.
507        let median_epoch = match self.syn.median_epoch_from_normal_peers() {
508            None => {
509                return if self.syn.allow_phase_change_without_peer() {
510                    sync_handler.graph.consensus.enter_normal_phase();
511                    SyncPhaseType::Normal
512                } else {
513                    self.phase_type()
514                }
515            }
516            Some(epoch) => epoch,
517        };
518        // FIXME: OK, what if the chain height is close, or even local height is
519        // FIXME: larger, but the chain forked earlier very far away?
520        if self.graph.consensus.best_epoch_number()
521            + CATCH_UP_EPOCH_LAG_THRESHOLD
522            >= median_epoch
523        {
524            sync_handler.graph.consensus.enter_normal_phase();
525            return SyncPhaseType::Normal;
526        }
527
528        self.phase_type()
529    }
530
531    fn start(
532        &self, io: &dyn NetworkContext,
533        sync_handler: &SynchronizationProtocolHandler,
534    ) {
535        info!("start phase {:?}", self.name());
536        let (_, cur_era_genesis_height) =
537            self.graph.get_genesis_hash_and_height_in_current_era();
538        *sync_handler.latest_epoch_requested.lock() =
539            (cur_era_genesis_height, Instant::now(), 0, 0);
540
541        sync_handler.request_epochs(io);
542    }
543}
544
/// Steady-state phase entered after catch-up completes.
pub struct NormalSyncPhase {
    // Consensus handle is retained but not read anywhere in this impl;
    // presumably kept for future phase-switching logic — TODO confirm.
    _consensus: Arc<ConsensusGraph>,
}
548
549impl NormalSyncPhase {
550    pub fn new(consensus: Arc<ConsensusGraph>) -> Self {
551        NormalSyncPhase {
552            _consensus: consensus,
553        }
554    }
555}
556
557impl SynchronizationPhaseTrait for NormalSyncPhase {
558    fn name(&self) -> &'static str { "NormalSyncPhase" }
559
560    fn phase_type(&self) -> SyncPhaseType { SyncPhaseType::Normal }
561
562    fn next(
563        &self, _io: &dyn NetworkContext,
564        _sync_handler: &SynchronizationProtocolHandler,
565    ) -> SyncPhaseType {
566        // FIXME: handle the case where we need to switch back phase
567        self.phase_type()
568    }
569
570    fn start(
571        &self, io: &dyn NetworkContext,
572        sync_handler: &SynchronizationProtocolHandler,
573    ) {
574        info!("start phase {:?}", self.name());
575        sync_handler.request_missing_terminals(io);
576    }
577}