1use crate::{
6 sync::{
7 message::DynamicCapability,
8 state::{SnapshotChunkSync, Status},
9 synchronization_protocol_handler::SynchronizationProtocolHandler,
10 synchronization_state::SynchronizationState,
11 SharedSynchronizationGraph,
12 },
13 ConsensusGraph,
14};
15use cfx_internal_common::StateAvailabilityBoundary;
16use cfx_parameters::sync::CATCH_UP_EPOCH_LAG_THRESHOLD;
17use network::NetworkContext;
18use parking_lot::RwLock;
19use std::{
20 collections::HashMap,
21 sync::{
22 atomic::{AtomicBool, Ordering as AtomicOrdering},
23 Arc,
24 },
25 thread,
26 time::{self, Instant},
27};
28
/// Identifies one stage of the synchronization state machine.
///
/// The explicit discriminants ascend in declaration order, so the derived
/// ordering (`PartialOrd`/`Ord`) follows the order listed here. `Eq` and
/// `Hash` allow the type to be used as a `HashMap` key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SyncPhaseType {
    /// Reported by `CatchUpRecoverBlockHeaderFromDbPhase`.
    CatchUpRecoverBlockHeaderFromDB = 0,
    /// Reported by `CatchUpSyncBlockHeaderPhase`.
    CatchUpSyncBlockHeader = 1,
    /// Reported by `CatchUpCheckpointPhase`.
    CatchUpCheckpoint = 2,
    /// Reported by `CatchUpFillBlockBodyPhase`.
    CatchUpFillBlockBodyPhase = 3,
    /// Reported by `CatchUpSyncBlockPhase`.
    CatchUpSyncBlock = 4,
    /// Reported by `NormalSyncPhase`.
    Normal = 5,
}
43
44pub trait SynchronizationPhaseTrait: Send + Sync {
45 fn name(&self) -> &'static str;
46 fn phase_type(&self) -> SyncPhaseType;
47 fn next(
48 &self, _io: &dyn NetworkContext,
49 _sync_handler: &SynchronizationProtocolHandler,
50 ) -> SyncPhaseType;
51 fn start(
52 &self, _io: &dyn NetworkContext,
53 _sync_handler: &SynchronizationProtocolHandler,
54 );
55}
56
57pub struct SynchronizationPhaseManagerInner {
58 initialized: bool,
59 current_phase: SyncPhaseType,
60 phases: HashMap<SyncPhaseType, Arc<dyn SynchronizationPhaseTrait>>,
61}
62
63impl SynchronizationPhaseManagerInner {
64 pub fn new(initial_phase_type: SyncPhaseType) -> Self {
65 SynchronizationPhaseManagerInner {
66 initialized: false,
67 current_phase: initial_phase_type,
68 phases: HashMap::new(),
69 }
70 }
71
72 pub fn register_phase(
73 &mut self, phase: Arc<dyn SynchronizationPhaseTrait>,
74 ) {
75 self.phases.insert(phase.phase_type(), phase);
76 }
77
78 pub fn get_phase(
79 &self, phase_type: SyncPhaseType,
80 ) -> Arc<dyn SynchronizationPhaseTrait> {
81 self.phases.get(&phase_type).unwrap().clone()
82 }
83
84 pub fn get_current_phase(&self) -> Arc<dyn SynchronizationPhaseTrait> {
85 self.get_phase(self.current_phase)
86 }
87
88 pub fn change_phase_to(&mut self, phase_type: SyncPhaseType) {
89 self.current_phase = phase_type;
90 }
91
92 pub fn try_initialize(&mut self) -> bool {
93 let initialized = self.initialized;
94 if !self.initialized {
95 self.initialized = true;
96 }
97
98 initialized
99 }
100}
101
102pub struct SynchronizationPhaseManager {
103 inner: RwLock<SynchronizationPhaseManagerInner>,
104}
105
106impl SynchronizationPhaseManager {
107 pub fn new(
108 initial_phase_type: SyncPhaseType,
109 sync_state: Arc<SynchronizationState>,
110 sync_graph: SharedSynchronizationGraph,
111 state_sync: Arc<SnapshotChunkSync>, consensus: Arc<ConsensusGraph>,
112 ) -> Self {
113 let sync_manager = SynchronizationPhaseManager {
114 inner: RwLock::new(SynchronizationPhaseManagerInner::new(
115 initial_phase_type,
116 )),
117 };
118
119 sync_manager.register_phase(Arc::new(
120 CatchUpRecoverBlockHeaderFromDbPhase::new(sync_graph.clone()),
121 ));
122 sync_manager.register_phase(Arc::new(
123 CatchUpSyncBlockHeaderPhase::new(
124 sync_state.clone(),
125 sync_graph.clone(),
126 ),
127 ));
128 sync_manager
129 .register_phase(Arc::new(CatchUpCheckpointPhase::new(state_sync)));
130 sync_manager.register_phase(Arc::new(CatchUpFillBlockBodyPhase::new(
131 sync_graph.clone(),
132 )));
133 sync_manager.register_phase(Arc::new(CatchUpSyncBlockPhase::new(
134 sync_state.clone(),
135 sync_graph.clone(),
136 )));
137 sync_manager.register_phase(Arc::new(NormalSyncPhase::new(consensus)));
138
139 sync_manager
140 }
141
142 pub fn register_phase(&self, phase: Arc<dyn SynchronizationPhaseTrait>) {
143 self.inner.write().register_phase(phase);
144 }
145
146 pub fn get_phase(
147 &self, phase_type: SyncPhaseType,
148 ) -> Arc<dyn SynchronizationPhaseTrait> {
149 self.inner.read().get_phase(phase_type)
150 }
151
152 pub fn get_current_phase(&self) -> Arc<dyn SynchronizationPhaseTrait> {
153 self.inner.read().get_current_phase()
154 }
155
156 pub fn change_phase_to(
157 &self, phase_type: SyncPhaseType, io: &dyn NetworkContext,
158 sync_handler: &SynchronizationProtocolHandler,
159 ) {
160 self.inner.write().change_phase_to(phase_type);
161 let current_phase = self.get_current_phase();
162 current_phase.start(io, sync_handler);
163 }
164
165 pub fn try_initialize(
166 &self, io: &dyn NetworkContext,
167 sync_handler: &SynchronizationProtocolHandler,
168 ) {
169 if !self.inner.write().try_initialize() {
170 let current_phase = self.get_current_phase();
172 current_phase.start(io, sync_handler);
173 }
174 }
175}
176
177pub struct CatchUpRecoverBlockHeaderFromDbPhase {
178 pub graph: SharedSynchronizationGraph,
179 pub recovered: Arc<AtomicBool>,
180}
181
182impl CatchUpRecoverBlockHeaderFromDbPhase {
183 pub fn new(graph: SharedSynchronizationGraph) -> Self {
184 CatchUpRecoverBlockHeaderFromDbPhase {
185 graph,
186 recovered: Arc::new(AtomicBool::new(false)),
187 }
188 }
189}
190
191impl SynchronizationPhaseTrait for CatchUpRecoverBlockHeaderFromDbPhase {
192 fn name(&self) -> &'static str { "CatchUpRecoverBlockHeaderFromDbPhase" }
193
194 fn phase_type(&self) -> SyncPhaseType {
195 SyncPhaseType::CatchUpRecoverBlockHeaderFromDB
196 }
197
198 fn next(
199 &self, io: &dyn NetworkContext,
200 sync_handler: &SynchronizationProtocolHandler,
201 ) -> SyncPhaseType {
202 if self.recovered.load(AtomicOrdering::SeqCst) == false {
203 return self.phase_type();
204 }
205
206 DynamicCapability::ServeHeaders(true).broadcast(io, &sync_handler.syn);
207 SyncPhaseType::CatchUpSyncBlockHeader
208 }
209
210 fn start(
211 &self, _io: &dyn NetworkContext,
212 _sync_handler: &SynchronizationProtocolHandler,
213 ) {
214 info!("start phase {:?}", self.name());
215 self.recovered.store(false, AtomicOrdering::SeqCst);
216 let recovered = self.recovered.clone();
217 let graph = self.graph.clone();
218 std::thread::spawn(move || {
219 graph.recover_graph_from_db();
220 recovered.store(true, AtomicOrdering::SeqCst);
221 info!("finish recover header graph from db");
222 });
223 }
224}
225
226pub struct CatchUpSyncBlockHeaderPhase {
227 pub syn: Arc<SynchronizationState>,
228 pub graph: SharedSynchronizationGraph,
229}
230
231impl CatchUpSyncBlockHeaderPhase {
232 pub fn new(
233 syn: Arc<SynchronizationState>, graph: SharedSynchronizationGraph,
234 ) -> Self {
235 CatchUpSyncBlockHeaderPhase { syn, graph }
236 }
237}
238
239impl SynchronizationPhaseTrait for CatchUpSyncBlockHeaderPhase {
240 fn name(&self) -> &'static str { "CatchUpSyncBlockHeaderPhase" }
241
242 fn phase_type(&self) -> SyncPhaseType {
243 SyncPhaseType::CatchUpSyncBlockHeader
244 }
245
246 fn next(
247 &self, _io: &dyn NetworkContext,
248 _sync_handler: &SynchronizationProtocolHandler,
249 ) -> SyncPhaseType {
250 let median_epoch = match self.syn.median_epoch_from_normal_peers() {
251 None => {
252 return if self.syn.allow_phase_change_without_peer() {
253 SyncPhaseType::CatchUpCheckpoint
254 } else {
255 self.phase_type()
256 }
257 }
258 Some(epoch) => epoch,
259 };
260 debug!(
261 "best_epoch: {}, peer median: {}",
262 self.graph.consensus.best_epoch_number(),
263 median_epoch
264 );
265 if self.graph.consensus.catch_up_completed(median_epoch) {
268 return SyncPhaseType::CatchUpCheckpoint;
269 }
270
271 self.phase_type()
272 }
273
274 fn start(
275 &self, io: &dyn NetworkContext,
276 sync_handler: &SynchronizationProtocolHandler,
277 ) {
278 info!("start phase {:?}", self.name());
279 let (_, cur_era_genesis_height) =
280 self.graph.get_genesis_hash_and_height_in_current_era();
281 *sync_handler.latest_epoch_requested.lock() =
282 (cur_era_genesis_height, Instant::now(), 0, 0);
283
284 sync_handler.request_epochs(io);
286 }
287}
288
289pub struct CatchUpCheckpointPhase {
290 state_sync: Arc<SnapshotChunkSync>,
291
292 has_state: AtomicBool,
295}
296
297impl CatchUpCheckpointPhase {
298 pub fn new(state_sync: Arc<SnapshotChunkSync>) -> Self {
299 CatchUpCheckpointPhase {
300 state_sync,
301 has_state: AtomicBool::new(false),
302 }
303 }
304}
305
306impl SynchronizationPhaseTrait for CatchUpCheckpointPhase {
307 fn name(&self) -> &'static str { "CatchUpCheckpointPhase" }
308
309 fn phase_type(&self) -> SyncPhaseType { SyncPhaseType::CatchUpCheckpoint }
310
311 fn next(
312 &self, io: &dyn NetworkContext,
313 sync_handler: &SynchronizationProtocolHandler,
314 ) -> SyncPhaseType {
315 if self.has_state.load(AtomicOrdering::SeqCst) {
316 return SyncPhaseType::CatchUpFillBlockBodyPhase;
317 }
318 let epoch_to_sync = sync_handler.graph.consensus.get_to_sync_epoch_id();
319 let current_era_genesis = sync_handler
320 .graph
321 .data_man
322 .get_cur_consensus_era_genesis_hash();
323 self.state_sync.update_status(
324 current_era_genesis,
325 epoch_to_sync,
326 io,
327 sync_handler,
328 );
329 if self.state_sync.status() == Status::Completed {
330 self.state_sync.restore_execution_state(sync_handler);
331 *sync_handler.synced_epoch_id.lock() = Some(epoch_to_sync);
332 SyncPhaseType::CatchUpFillBlockBodyPhase
333 } else {
334 self.phase_type()
335 }
336 }
337
338 fn start(
339 &self, io: &dyn NetworkContext,
340 sync_handler: &SynchronizationProtocolHandler,
341 ) {
342 info!("start phase {:?}", self.name());
343 sync_handler.graph.inner.write().locked_for_catchup = true;
344 while sync_handler.graph.is_consensus_worker_busy() {
345 thread::sleep(time::Duration::from_millis(100));
346 }
347 let current_era_genesis = sync_handler
348 .graph
349 .data_man
350 .get_cur_consensus_era_genesis_hash();
351 let epoch_to_sync = sync_handler.graph.consensus.get_to_sync_epoch_id();
352
353 if let Some(commitment) = sync_handler
356 .graph
357 .data_man
358 .load_epoch_execution_commitment_from_db(&epoch_to_sync)
359 {
360 info!("CatchUpCheckpointPhase: commitment for epoch {:?} exists, skip state sync. \
361 commitment={:?}", epoch_to_sync, commitment);
362 self.has_state.store(true, AtomicOrdering::SeqCst);
363
364 if epoch_to_sync != sync_handler.graph.data_man.true_genesis.hash()
370 {
371 *sync_handler.synced_epoch_id.lock() = Some(epoch_to_sync);
372 }
373 return;
374 }
375
376 self.state_sync.update_status(
377 current_era_genesis,
378 epoch_to_sync,
379 io,
380 sync_handler,
381 );
382 }
383}
384
385pub struct CatchUpFillBlockBodyPhase {
386 pub graph: SharedSynchronizationGraph,
387}
388
389impl CatchUpFillBlockBodyPhase {
390 pub fn new(graph: SharedSynchronizationGraph) -> Self {
391 CatchUpFillBlockBodyPhase { graph }
392 }
393}
394
395impl SynchronizationPhaseTrait for CatchUpFillBlockBodyPhase {
396 fn name(&self) -> &'static str { "CatchUpFillBlockBodyPhase" }
397
398 fn phase_type(&self) -> SyncPhaseType {
399 SyncPhaseType::CatchUpFillBlockBodyPhase
400 }
401
402 fn next(
403 &self, io: &dyn NetworkContext,
404 sync_handler: &SynchronizationProtocolHandler,
405 ) -> SyncPhaseType {
406 if self.graph.is_fill_block_completed() {
407 if self.graph.complete_filling_block_bodies() {
408 return SyncPhaseType::CatchUpSyncBlock;
409 } else {
410 sync_handler.request_block_bodies(io)
413 }
414 }
415 self.phase_type()
416 }
417
418 fn start(
419 &self, io: &dyn NetworkContext,
420 sync_handler: &SynchronizationProtocolHandler,
421 ) {
422 info!("start phase {:?}", self.name());
423 {
424 let full_state_start_height = self
425 .graph
426 .data_man
427 .storage_manager
428 .config()
429 .full_state_start_height();
430 let full_state_space = self
431 .graph
432 .data_man
433 .storage_manager
434 .config()
435 .single_mpt_space;
436 if let Some(epoch_synced) = &*sync_handler.synced_epoch_id.lock() {
442 let epoch_synced_height = self
443 .graph
444 .data_man
445 .block_header_by_hash(epoch_synced)
446 .expect("Header for checkpoint exists")
447 .height();
448 *self.graph.data_man.state_availability_boundary.write() =
449 StateAvailabilityBoundary::new(
450 *epoch_synced,
451 epoch_synced_height,
452 full_state_start_height,
453 full_state_space,
454 );
455 self.graph
456 .data_man
457 .state_availability_boundary
458 .write()
459 .set_synced_state_height(epoch_synced_height);
460 } else {
461 let cur_era_stable_hash =
462 self.graph.data_man.get_cur_consensus_era_stable_hash();
463 let cur_era_stable_height = self
464 .graph
465 .data_man
466 .block_header_by_hash(&cur_era_stable_hash)
467 .expect("stable era block header must exist")
468 .height();
469 *self.graph.data_man.state_availability_boundary.write() =
470 StateAvailabilityBoundary::new(
471 cur_era_stable_hash,
472 cur_era_stable_height,
473 full_state_start_height,
474 full_state_space,
475 );
476 }
477 self.graph.inner.write().block_to_fill_set =
478 self.graph.consensus.get_blocks_needing_bodies();
479 sync_handler.request_block_bodies(io);
480 }
481 }
482}
483
484pub struct CatchUpSyncBlockPhase {
485 pub syn: Arc<SynchronizationState>,
486 pub graph: SharedSynchronizationGraph,
487}
488
489impl CatchUpSyncBlockPhase {
490 pub fn new(
491 syn: Arc<SynchronizationState>, graph: SharedSynchronizationGraph,
492 ) -> Self {
493 CatchUpSyncBlockPhase { syn, graph }
494 }
495}
496
497impl SynchronizationPhaseTrait for CatchUpSyncBlockPhase {
498 fn name(&self) -> &'static str { "CatchUpSyncBlockPhase" }
499
500 fn phase_type(&self) -> SyncPhaseType { SyncPhaseType::CatchUpSyncBlock }
501
502 fn next(
503 &self, _io: &dyn NetworkContext,
504 sync_handler: &SynchronizationProtocolHandler,
505 ) -> SyncPhaseType {
506 let median_epoch = match self.syn.median_epoch_from_normal_peers() {
508 None => {
509 return if self.syn.allow_phase_change_without_peer() {
510 sync_handler.graph.consensus.enter_normal_phase();
511 SyncPhaseType::Normal
512 } else {
513 self.phase_type()
514 }
515 }
516 Some(epoch) => epoch,
517 };
518 if self.graph.consensus.best_epoch_number()
521 + CATCH_UP_EPOCH_LAG_THRESHOLD
522 >= median_epoch
523 {
524 sync_handler.graph.consensus.enter_normal_phase();
525 return SyncPhaseType::Normal;
526 }
527
528 self.phase_type()
529 }
530
531 fn start(
532 &self, io: &dyn NetworkContext,
533 sync_handler: &SynchronizationProtocolHandler,
534 ) {
535 info!("start phase {:?}", self.name());
536 let (_, cur_era_genesis_height) =
537 self.graph.get_genesis_hash_and_height_in_current_era();
538 *sync_handler.latest_epoch_requested.lock() =
539 (cur_era_genesis_height, Instant::now(), 0, 0);
540
541 sync_handler.request_epochs(io);
542 }
543}
544
545pub struct NormalSyncPhase {
546 _consensus: Arc<ConsensusGraph>,
547}
548
549impl NormalSyncPhase {
550 pub fn new(consensus: Arc<ConsensusGraph>) -> Self {
551 NormalSyncPhase {
552 _consensus: consensus,
553 }
554 }
555}
556
557impl SynchronizationPhaseTrait for NormalSyncPhase {
558 fn name(&self) -> &'static str { "NormalSyncPhase" }
559
560 fn phase_type(&self) -> SyncPhaseType { SyncPhaseType::Normal }
561
562 fn next(
563 &self, _io: &dyn NetworkContext,
564 _sync_handler: &SynchronizationProtocolHandler,
565 ) -> SyncPhaseType {
566 self.phase_type()
568 }
569
570 fn start(
571 &self, io: &dyn NetworkContext,
572 sync_handler: &SynchronizationProtocolHandler,
573 ) {
574 info!("start phase {:?}", self.name());
575 sync_handler.request_missing_terminals(io);
576 }
577}