1use std::{
6 cmp::max,
7 collections::{BinaryHeap, HashMap, HashSet, VecDeque},
8 mem, panic,
9 sync::{
10 atomic::{AtomicUsize, Ordering},
11 Arc,
12 },
13 thread,
14 time::{Duration, SystemTime, UNIX_EPOCH},
15};
16
17use cfx_parameters::consensus_internal::ELASTICITY_MULTIPLIER;
18use futures::executor::block_on;
19use parking_lot::RwLock;
20use slab::Slab;
21use tokio::sync::mpsc::error::TryRecvError;
22use unexpected::{Mismatch, OutOfBounds};
23
24use cfx_executor::machine::Machine;
25use cfx_types::{H256, U256};
26use cfxcore_errors::ProviderBlockError;
27use dag::{Graph, RichDAG, RichTreeGraph, TreeGraph, DAG};
28use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
29use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
30use metrics::{
31 register_meter_with_group, register_queue, Meter, MeterTimer, Queue,
32};
33use primitives::{
34 pos::PosBlockId, transaction::SignedTransaction, Block, BlockHeader,
35 EpochNumber,
36};
37
38use crate::{
39 block_data_manager::{BlockDataManager, BlockStatus},
40 channel::Channel,
41 consensus::{pos_handler::PosVerifier, SharedConsensusGraph},
42 core_error::{BlockError, CoreError as Error},
43 pow::{PowComputer, ProofOfWorkConfig},
44 state_exposer::{SyncGraphBlockState, STATE_EXPOSER},
45 statistics::SharedStatistics,
46 sync::synchronization_protocol_handler::FutureBlockContainer,
47 verification::*,
48 Notifications,
49};
50
lazy_static! {
    // Timing meters for header/block insertion into the sync graph, plus a
    // queue gauge tracking the consensus worker backlog (see `metrics` crate).
    static ref SYNC_INSERT_HEADER: Arc<dyn Meter> =
        register_meter_with_group("timer", "sync::insert_block_header");
    static ref SYNC_INSERT_BLOCK: Arc<dyn Meter> =
        register_meter_with_group("timer", "sync::insert_block");
    static ref CONSENSUS_WORKER_QUEUE: Arc<dyn Queue> =
        register_queue("consensus_worker_queue");
}

/// Sentinel arena index meaning "no node" (all bits set).
const NULL: usize = !0;
// Graph-readiness states of a sync-graph node, ordered so that comparing
// `graph_status >= some_status` is meaningful (a node only advances).
const BLOCK_INVALID: u8 = 0;
const BLOCK_HEADER_ONLY: u8 = 1;
const BLOCK_HEADER_GRAPH_READY: u8 = 2;
const BLOCK_GRAPH_READY: u8 = 3;
65
/// Static configuration of the synchronization graph.
#[derive(Copy, Clone)]
pub struct SyncGraphConfig {
    /// Capacity passed to `FutureBlockContainer::new`.
    pub future_block_buffer_capacity: usize,
    /// When true, block state is pushed to `STATE_EXPOSER` for debugging.
    pub enable_state_expose: bool,
    /// Consortium (non-PoW) mode; difficulty checks are skipped when set.
    pub is_consortium: bool,
}
72
/// Counters for headers/blocks inserted into the synchronization graph.
#[derive(Debug)]
pub struct SyncGraphStatistics {
    pub inserted_block_count: usize,
    pub inserted_header_count: usize,
}

impl SyncGraphStatistics {
    /// Create statistics with both counters starting at 1 (the genesis
    /// header/block is accounted for up front).
    pub fn new() -> SyncGraphStatistics {
        SyncGraphStatistics {
            inserted_block_count: 1,
            inserted_header_count: 1,
        }
    }

    /// Reset both counters back to their initial value of 1.
    pub fn clear(&mut self) { *self = Self::new(); }
}
93
/// A node of the synchronization graph arena. Graph edges (parent/children,
/// referees/referrers) are stored as arena indices; `NULL` marks an absent
/// or reclaimed parent.
#[derive(DeriveMallocSizeOf)]
pub struct SynchronizationGraphNode {
    pub block_header: Arc<BlockHeader>,
    /// One of BLOCK_INVALID / BLOCK_HEADER_ONLY / BLOCK_HEADER_GRAPH_READY /
    /// BLOCK_GRAPH_READY.
    pub graph_status: u8,
    /// Whether the block body is available (genesis starts true; presumably
    /// set when the body arrives — confirm in the insert-block path).
    pub block_ready: bool,
    /// True when the parent node was removed by old-era cleanup, so the
    /// parent edge counts as satisfied even though `parent == NULL`.
    pub parent_reclaimed: bool,
    /// Arena index of the parent, or `NULL` when unknown/reclaimed.
    pub parent: usize,
    /// Arena indices of blocks whose parent is this node.
    pub children: Vec<usize>,
    /// Arena indices of referee blocks already present in the arena.
    pub referees: Vec<usize>,
    /// Number of referee hashes not yet resolved to arena indices.
    pub pending_referee_count: usize,
    /// Arena indices of blocks that reference this node as a referee.
    pub referrers: Vec<usize>,
    /// Unix seconds of the last update to this node.
    pub last_update_timestamp: u64,
}
117
/// Set of arena indices for not-yet-graph-ready blocks, with a dirty flag
/// (`updated`) that records whether the set was touched since the last
/// `reset_update_state` call.
#[derive(DeriveMallocSizeOf)]
pub struct UnreadyBlockFrontier {
    frontier: HashSet<usize>,
    updated: bool,
}
123
124impl UnreadyBlockFrontier {
125 fn new() -> Self {
126 UnreadyBlockFrontier {
127 frontier: HashSet::new(),
128 updated: false,
129 }
130 }
131
132 pub fn reset_update_state(&mut self) { self.updated = false; }
133
134 pub fn updated(&self) -> bool { self.updated }
135
136 pub fn get_frontier(&self) -> &HashSet<usize> { &self.frontier }
137
138 pub fn remove(&mut self, index: &usize) -> bool {
139 self.updated = true;
140 self.frontier.remove(index)
141 }
142
143 pub fn contains(&self, index: &usize) -> bool {
144 self.frontier.contains(index)
145 }
146
147 pub fn insert(&mut self, index: usize) -> bool {
148 self.updated = true;
149 self.frontier.insert(index)
150 }
151
152 pub fn len(&self) -> usize { self.frontier.len() }
153}
154
/// In-memory DAG of headers being synchronized, plus the bookkeeping needed
/// to resolve parent/referee links lazily and to reclaim blocks that fall
/// behind the current era genesis.
pub struct SynchronizationGraphInner {
    /// Arena of nodes; `usize` indices here are used by all edge lists.
    pub arena: Slab<SynchronizationGraphNode>,
    pub hash_to_arena_indices: HashMap<H256, usize>,
    pub data_man: Arc<BlockDataManager>,
    /// Children waiting for a parent not yet in the arena, keyed by the
    /// missing parent hash; resolved when that header is inserted.
    children_by_hash: HashMap<H256, Vec<usize>>,
    /// Referrers waiting for a referee not yet in the arena, keyed by the
    /// missing referee hash; resolved when that header is inserted.
    referrers_by_hash: HashMap<H256, Vec<usize>>,
    pub pow_config: ProofOfWorkConfig,
    pub pow: Arc<PowComputer>,
    pub config: SyncGraphConfig,
    /// Frontier of blocks that are not yet graph-ready, with a dirty flag.
    pub not_ready_blocks_frontier: UnreadyBlockFrontier,

    /// Blocks waiting for the pivot decision of their pos_reference
    /// (populated by `new_to_be_graph_ready`).
    pub pos_not_ready_blocks_frontier: HashSet<usize>,
    /// Work queue for `try_clear_old_era_blocks`; starts at genesis and
    /// advances to children as old-era nodes are reclaimed.
    pub old_era_blocks_frontier: VecDeque<usize>,
    pub old_era_blocks_frontier_set: HashSet<usize>,

    /// NOTE(review): presumably set while the node is catching up; where it
    /// is toggled is not visible in this chunk — confirm.
    pub locked_for_catchup: bool,
    /// Hashes of blocks whose bodies still need to be fetched — TODO
    /// confirm against the fill-block path (not visible here).
    pub block_to_fill_set: HashSet<H256>,
    machine: Arc<Machine>,
    pub pos_verifier: Arc<PosVerifier>,
}
188
impl MallocSizeOf for SynchronizationGraphInner {
    /// Approximate heap usage of the graph. Note that `pow`, `config`,
    /// `machine`, `pos_verifier`, `pos_not_ready_blocks_frontier` and
    /// `block_to_fill_set` are not included in the sum.
    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
        self.arena.size_of(ops)
            + self.hash_to_arena_indices.size_of(ops)
            + self.data_man.size_of(ops)
            + self.children_by_hash.size_of(ops)
            + self.referrers_by_hash.size_of(ops)
            + self.pow_config.size_of(ops)
            + self.not_ready_blocks_frontier.size_of(ops)
            + self.old_era_blocks_frontier.size_of(ops)
            + self.old_era_blocks_frontier_set.size_of(ops)
    }
}
203
204impl SynchronizationGraphInner {
205 pub fn with_genesis_block(
206 genesis_header: Arc<BlockHeader>, pow_config: ProofOfWorkConfig,
207 pow: Arc<PowComputer>, config: SyncGraphConfig,
208 data_man: Arc<BlockDataManager>, machine: Arc<Machine>,
209 pos_verifier: Arc<PosVerifier>,
210 ) -> Self {
211 let mut inner = SynchronizationGraphInner {
212 arena: Slab::new(),
213 hash_to_arena_indices: HashMap::new(),
214 data_man,
215 children_by_hash: HashMap::new(),
216 referrers_by_hash: HashMap::new(),
217 pow_config,
218 pow,
219 config,
220 not_ready_blocks_frontier: UnreadyBlockFrontier::new(),
221 pos_not_ready_blocks_frontier: Default::default(),
222 old_era_blocks_frontier: Default::default(),
223 old_era_blocks_frontier_set: Default::default(),
224 block_to_fill_set: Default::default(),
225 locked_for_catchup: false,
226 machine,
227 pos_verifier,
228 };
229 let genesis_hash = genesis_header.hash();
230 let genesis_block_index = inner.insert(genesis_header);
231 debug!(
232 "genesis block {:?} has index {} in sync graph",
233 genesis_hash, genesis_block_index
234 );
235
236 inner.old_era_blocks_frontier.push_back(genesis_block_index);
237 inner
238 .old_era_blocks_frontier_set
239 .insert(genesis_block_index);
240
241 inner
242 }
243
244 fn get_genesis_in_current_era(&self) -> usize {
245 let genesis_hash = self.data_man.get_cur_consensus_era_genesis_hash();
246 *self.hash_to_arena_indices.get(&genesis_hash).unwrap()
247 }
248
249 pub fn get_genesis_hash_and_height_in_current_era(&self) -> (H256, u64) {
250 let era_genesis = self.get_genesis_in_current_era();
251 (
252 self.arena[era_genesis].block_header.hash(),
253 self.arena[era_genesis].block_header.height(),
254 )
255 }
256
257 pub fn get_stable_hash_and_height_in_current_era(&self) -> (H256, u64) {
258 let stable_hash = self.data_man.get_cur_consensus_era_stable_hash();
259 let height = self
263 .data_man
264 .block_header_by_hash(&stable_hash)
265 .expect("stable block must exist in data manager")
266 .height();
267 (stable_hash, height)
268 }
269
    /// Reclaim up to a small fixed number of blocks that fall before the
    /// current era genesis. Walks `old_era_blocks_frontier` (a BFS queue
    /// seeded at genesis), detaches each reclaimable node from all in-memory
    /// edges, pushes its children onto the frontier, and frees the arena
    /// slot. The era genesis itself is never reclaimed.
    fn try_clear_old_era_blocks(&mut self) {
        // Bound work per call so this can run frequently without stalls.
        let max_num_of_cleared_blocks = 2;
        let mut num_cleared = 0;
        let era_genesis = self.get_genesis_in_current_era();
        let genesis_seq_num = self
            .data_man
            .local_block_info_by_hash(
                &self.data_man.get_cur_consensus_era_genesis_hash(),
            )
            .expect("local_block_info for genesis must exist")
            .get_seq_num();
        let mut era_genesis_in_frontier = false;

        while let Some(index) = self.old_era_blocks_frontier.pop_front() {
            // Never reclaim the era genesis; re-queue it at the end.
            if index == era_genesis {
                era_genesis_in_frontier = true;
                continue;
            }

            // Stale queue entry (already removed elsewhere).
            if !self.old_era_blocks_frontier_set.contains(&index) {
                continue;
            }

            let hash = self.arena[index].block_header.hash();
            // Frontier nodes have had their parent reclaimed already.
            assert!(self.arena[index].parent == NULL);

            // Only reclaim once the DB confirms the block was processed.
            // NOTE(review): dropping the entry here (not re-queueing) relies
            // on it being re-discovered later — confirm intended.
            if !self.is_graph_ready_in_db(&hash, genesis_seq_num) {
                continue;
            }

            // Unlink from in-memory referees.
            let referees: Vec<usize> =
                self.arena[index].referees.iter().map(|x| *x).collect();
            for referee in referees {
                self.arena[referee].referrers.retain(|&x| x != index);
            }
            // Unlink from the pending-referee waitlists.
            let referee_hashes: Vec<H256> = self.arena[index]
                .block_header
                .referee_hashes()
                .iter()
                .map(|x| *x)
                .collect();
            for referee_hash in referee_hashes {
                let mut remove_referee_hash: bool = false;
                if let Some(referrers) =
                    self.referrers_by_hash.get_mut(&referee_hash)
                {
                    referrers.retain(|&x| x != index);
                    remove_referee_hash = referrers.len() == 0;
                }
                // Drop empty waitlist entries to keep the map small.
                if remove_referee_hash {
                    self.referrers_by_hash.remove(&referee_hash);
                }
            }

            // Children lose their parent link but remember it was reclaimed,
            // and become the next frontier.
            let children: Vec<usize> =
                self.arena[index].children.iter().map(|x| *x).collect();
            for child in children {
                self.arena[child].parent = NULL;
                self.arena[child].parent_reclaimed = true;
                self.old_era_blocks_frontier.push_back(child);
                assert!(!self.old_era_blocks_frontier_set.contains(&child));
                self.old_era_blocks_frontier_set.insert(child);
            }

            // Unlink from in-memory referrers.
            let referrers: Vec<usize> =
                self.arena[index].referrers.iter().map(|x| *x).collect();
            for referrer in referrers {
                self.arena[referrer].referees.retain(|&x| x != index);
            }

            self.old_era_blocks_frontier_set.remove(&index);
            self.arena.remove(index);
            self.hash_to_arena_indices.remove(&hash);
            // NOTE(review): `false` presumably means "do not remove the
            // header from the on-disk DB" — confirm the flag's meaning.
            self.data_man
                .remove_block_header(&hash, false );

            num_cleared += 1;
            if num_cleared == max_num_of_cleared_blocks {
                break;
            }
        }

        // Put the era genesis back so future calls still start from it.
        if era_genesis_in_frontier {
            self.old_era_blocks_frontier.push_front(era_genesis);
        }
    }
370
371 pub fn insert_invalid(&mut self, header: Arc<BlockHeader>) -> usize {
372 let hash = header.hash();
373 let me = self.arena.insert(SynchronizationGraphNode {
374 graph_status: BLOCK_INVALID,
375 block_ready: false,
376 parent_reclaimed: false,
377 parent: NULL,
378 children: Vec::new(),
379 referees: Vec::new(),
380 pending_referee_count: 0,
381 referrers: Vec::new(),
382 block_header: header,
383 last_update_timestamp: SystemTime::now()
384 .duration_since(UNIX_EPOCH)
385 .unwrap()
386 .as_secs(),
387 });
388 self.hash_to_arena_indices.insert(hash, me);
389
390 if let Some(children) = self.children_by_hash.remove(&hash) {
391 for child in &children {
392 self.arena[*child].parent = me;
393 }
394 self.arena[me].children = children;
395 }
396 if let Some(referrers) = self.referrers_by_hash.remove(&hash) {
397 for referrer in &referrers {
398 let ref mut node_referrer = self.arena[*referrer];
399 node_referrer.referees.push(me);
400 debug_assert!(node_referrer.pending_referee_count > 0);
401 if node_referrer.pending_referee_count > 0 {
402 node_referrer.pending_referee_count =
403 node_referrer.pending_referee_count - 1;
404 }
405 }
406 self.arena[me].referrers = referrers;
407 }
408
409 me
410 }
411
    /// Insert `header` into the graph and wire up all four edge kinds:
    /// parent/children and referees/referrers, resolving links lazily via
    /// `children_by_hash`/`referrers_by_hash` when the counterpart is not
    /// in the arena yet. Returns the new arena index. The era genesis is
    /// inserted directly as graph-ready.
    pub fn insert(&mut self, header: Arc<BlockHeader>) -> usize {
        let hash = header.hash();
        let is_genesis =
            hash == self.data_man.get_cur_consensus_era_genesis_hash();

        let me = self.arena.insert(SynchronizationGraphNode {
            graph_status: if is_genesis {
                BLOCK_GRAPH_READY
            } else {
                BLOCK_HEADER_ONLY
            },
            block_ready: is_genesis,
            parent_reclaimed: false,
            parent: NULL,
            children: Vec::new(),
            referees: Vec::new(),
            pending_referee_count: 0,
            referrers: Vec::new(),
            block_header: header.clone(),
            last_update_timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
        });
        self.hash_to_arena_indices.insert(hash, me);

        // Link to the parent if present, otherwise register in the
        // waiting-children map keyed by the missing parent hash.
        if !is_genesis {
            let parent_hash = header.parent_hash().clone();
            if let Some(parent) =
                self.hash_to_arena_indices.get(&parent_hash).cloned()
            {
                self.arena[me].parent = parent;
                self.arena[parent].children.push(me);
            } else {
                self.children_by_hash
                    .entry(parent_hash)
                    .or_insert(Vec::new())
                    .push(me);
            }
        }
        // Link each referee if present; otherwise count it as pending and
        // register in the waiting-referrers map.
        for referee_hash in header.referee_hashes() {
            if let Some(referee) =
                self.hash_to_arena_indices.get(referee_hash).cloned()
            {
                self.arena[me].referees.push(referee);
                self.arena[referee].referrers.push(me);
            } else {
                self.arena[me].pending_referee_count =
                    self.arena[me].pending_referee_count + 1;
                self.referrers_by_hash
                    .entry(*referee_hash)
                    .or_insert(Vec::new())
                    .push(me);
            }
        }

        // Adopt children that arrived before this header.
        if let Some(children) = self.children_by_hash.remove(&hash) {
            for child in &children {
                self.arena[*child].parent = me;
            }
            self.arena[me].children = children;
        }
        // Resolve referrers whose referee count was pending on this hash.
        if let Some(referrers) = self.referrers_by_hash.remove(&hash) {
            for referrer in &referrers {
                let ref mut node_referrer = self.arena[*referrer];
                node_referrer.referees.push(me);
                debug_assert!(node_referrer.pending_referee_count > 0);
                if node_referrer.pending_referee_count > 0 {
                    node_referrer.pending_referee_count =
                        node_referrer.pending_referee_count - 1;
                }
            }
            self.arena[me].referrers = referrers;
        }

        me
    }
490
491 fn is_graph_ready_in_db(
494 &self, parent_or_referee_hash: &H256, genesis_seq_num: u64,
495 ) -> bool {
496 if let Some(info) = self
497 .data_man
498 .local_block_info_by_hash(parent_or_referee_hash)
499 {
500 if info.get_status() == BlockStatus::Invalid {
501 false
502 } else {
503 info.get_seq_num() < genesis_seq_num
504 || info.get_instance_id() == self.data_man.get_instance_id()
505 }
506 } else {
507 false
508 }
509 }
510
    /// Decide whether `index` newly reaches `minimal_status` (header- or
    /// block-graph-ready): its parent and all referees must already be at
    /// that status in memory, or be graph-ready in the database if no
    /// longer in the arena, and its pos_reference (if any) must be ready.
    ///
    /// Side effects: may set `parent_reclaimed`, and adds `index` to
    /// `pos_not_ready_blocks_frontier` when only the pos check fails.
    fn new_to_be_graph_ready(
        &mut self, index: usize, minimal_status: u8,
    ) -> bool {
        let ref node_me = self.arena[index];
        // Already at (or beyond) the target status: not "newly" ready.
        if node_me.graph_status >= minimal_status {
            return false;
        }

        let genesis_hash = self.data_man.get_cur_consensus_era_genesis_hash();
        let genesis_seq_num = self
            .data_man
            .local_block_info_by_hash(&genesis_hash)
            .expect("local_block_info for genesis must exist")
            .get_seq_num();
        let parent = self.arena[index].parent;
        // Parent readiness: in-memory status, or DB check when the parent
        // was reclaimed / never linked.
        let parent_graph_ready = if parent == NULL {
            self.arena[index].parent_reclaimed
                || self.is_graph_ready_in_db(
                    self.arena[index].block_header.parent_hash(),
                    genesis_seq_num,
                )
        } else {
            self.arena[parent].graph_status >= minimal_status
        };

        if !parent_graph_ready {
            return false;
        } else if parent == NULL {
            // Cache the DB result so we don't re-query next time.
            self.arena[index].parent_reclaimed = true;
        }

        // Referees linked in memory must all reach the target status;
        // remember their hashes so the loop below only DB-checks the rest.
        let mut referee_hash_in_mem = HashSet::new();
        for referee in self.arena[index].referees.iter() {
            if self.arena[*referee].graph_status < minimal_status {
                return false;
            } else {
                referee_hash_in_mem
                    .insert(self.arena[*referee].block_header.hash());
            }
        }

        for referee_hash in self.arena[index].block_header.referee_hashes() {
            if !referee_hash_in_mem.contains(referee_hash) {
                if !self.is_graph_ready_in_db(referee_hash, genesis_seq_num) {
                    return false;
                }
            }
        }

        // Everything else is ready; a failing pos check parks the block in
        // the pos frontier until the pivot decision becomes available.
        if !self.is_pos_reference_graph_ready(
            index,
            genesis_seq_num,
            minimal_status,
        ) {
            debug!(
                "Block {:?} not ready for its pos_reference: {:?}",
                self.arena[index].block_header.hash(),
                self.pos_verifier.get_pivot_decision(
                    self.arena[index]
                        .block_header
                        .pos_reference()
                        .as_ref()
                        .unwrap()
                )
            );
            self.pos_not_ready_blocks_frontier.insert(index);
            return false;
        }

        true
    }
595
    /// Whether `index` newly reaches header-graph-ready status.
    fn new_to_be_header_graph_ready(&mut self, index: usize) -> bool {
        self.new_to_be_graph_ready(index, BLOCK_HEADER_GRAPH_READY)
    }

    /// Whether `index` newly reaches block-graph-ready status; besides the
    /// graph condition, the block body must also be available.
    fn new_to_be_block_graph_ready(&mut self, index: usize) -> bool {
        self.new_to_be_graph_ready(index, BLOCK_GRAPH_READY)
            && self.arena[index].block_ready
    }
604
605 fn is_pos_reference_graph_ready(
606 &self, index: usize, genesis_seq_num: u64, minimal_status: u8,
607 ) -> bool {
608 match self.arena[index].block_header.pos_reference() {
610 Some(pos_reference) => {
613 match self.pos_verifier.get_pivot_decision(pos_reference) {
614 None => false,
616 Some(pivot_decision) => {
617 match self.hash_to_arena_indices.get(&pivot_decision) {
619 None => self.is_graph_ready_in_db(
620 &pivot_decision,
621 genesis_seq_num,
622 ),
623 Some(index) => {
624 self.arena[*index].graph_status
625 >= minimal_status
626 }
627 }
628 }
629 }
630 }
631 None => true,
632 }
633 }
634
    /// Gather the parent's height/timestamp/gas-limit/difficulty plus the
    /// pos_references of the parent and every referee, for use by header
    /// verification. Any predecessor no longer in the arena is loaded from
    /// the data manager (its header must still exist there).
    fn get_parent_and_referee_info(
        &self, index: usize,
    ) -> (u64, u64, U256, U256, Vec<Option<PosBlockId>>) {
        let parent_height;
        let parent_timestamp;
        let parent_gas_limit;
        let parent_difficulty;
        // Push order: parent's pos_reference first, then referees.
        let mut pos_references = Vec::new();
        let parent = self.arena[index].parent;

        if parent != NULL {
            parent_height = self.arena[parent].block_header.height();
            parent_timestamp = self.arena[parent].block_header.timestamp();
            parent_gas_limit = *self.arena[parent].block_header.gas_limit();
            parent_difficulty = *self.arena[parent].block_header.difficulty();
            pos_references
                .push(self.arena[parent].block_header.pos_reference().clone())
        } else {
            // Parent reclaimed from the arena: fall back to the DB header.
            let parent_hash = self.arena[index].block_header.parent_hash();
            let parent_header = self
                .data_man
                .block_header_by_hash(parent_hash)
                .unwrap()
                .clone();
            parent_height = parent_header.height();
            parent_timestamp = parent_header.timestamp();
            parent_gas_limit = *parent_header.gas_limit();
            parent_difficulty = *parent_header.difficulty();
            pos_references.push(parent_header.pos_reference().clone());
        }

        // In-memory referees; remember their hashes so only the remaining
        // referee hashes are fetched from the data manager below.
        let mut referee_hash_in_mem = HashSet::new();
        for referee in self.arena[index].referees.iter() {
            pos_references.push(
                self.arena[*referee].block_header.pos_reference().clone(),
            );
            referee_hash_in_mem
                .insert(self.arena[*referee].block_header.hash());
        }

        for referee_hash in self.arena[index].block_header.referee_hashes() {
            if !referee_hash_in_mem.contains(referee_hash) {
                let referee_header = self
                    .data_man
                    .block_header_by_hash(referee_hash)
                    .unwrap()
                    .clone();
                pos_references.push(referee_header.pos_reference().clone());
            }
        }

        (
            parent_height,
            parent_timestamp,
            parent_gas_limit,
            parent_difficulty,
            pos_references,
        )
    }
701
    /// Validate a header once it is header-graph-ready: height continuity,
    /// timestamp monotonicity, gas-limit drift bounds, difficulty rules
    /// (PoW networks only) and pos_reference consistency with predecessors.
    fn verify_header_graph_ready_block(
        &self, index: usize,
    ) -> Result<(), Error> {
        let epoch = self.arena[index].block_header.height();
        let (
            parent_height,
            parent_timestamp,
            parent_gas_limit,
            parent_difficulty,
            predecessor_pos_references,
        ) = self.get_parent_and_referee_info(index);

        // Height must be exactly parent height + 1.
        if parent_height + 1 != epoch {
            warn!("Invalid height. mine {}, parent {}", epoch, parent_height);
            return Err(From::from(BlockError::InvalidHeight(Mismatch {
                expected: parent_height + 1,
                found: epoch,
            })));
        }

        // Timestamps must not decrease along the parent edge.
        let my_timestamp = self.arena[index].block_header.timestamp();
        if parent_timestamp > my_timestamp {
            let my_timestamp = UNIX_EPOCH + Duration::from_secs(my_timestamp);
            let pa_timestamp =
                UNIX_EPOCH + Duration::from_secs(parent_timestamp);

            warn!("Invalid timestamp: parent {:?} timestamp {}, me {:?} timestamp {}",
                self.arena[index].block_header.parent_hash().clone(),
                parent_timestamp,
                self.arena[index].block_header.hash(),
                self.arena[index].block_header.timestamp());
            return Err(From::from(BlockError::InvalidTimestamp(
                OutOfBounds {
                    max: Some(my_timestamp),
                    min: Some(pa_timestamp),
                    found: my_timestamp,
                },
            )));
        }

        // At the CIP-1559 transition epoch the parent's gas limit is scaled
        // by ELASTICITY_MULTIPLIER before bounding this block's limit.
        let parent_gas_limit = parent_gas_limit
            * if epoch == self.machine.params().transition_heights.cip1559 {
                ELASTICITY_MULTIPLIER
            } else {
                1
            };

        // Gas limit may drift at most ~1/gas_limit_bound_divisor per block
        // and never drop below min_gas_limit.
        let self_gas_limit = *self.arena[index].block_header.gas_limit();
        let gas_limit_divisor = self.machine.params().gas_limit_bound_divisor;
        let min_gas_limit = self.machine.params().min_gas_limit;
        let gas_upper =
            parent_gas_limit + parent_gas_limit / gas_limit_divisor - 1;
        let gas_lower = max(
            parent_gas_limit - parent_gas_limit / gas_limit_divisor + 1,
            min_gas_limit,
        );

        if self_gas_limit < gas_lower || self_gas_limit > gas_upper {
            return Err(From::from(BlockError::InvalidGasLimit(OutOfBounds {
                min: Some(gas_lower),
                max: Some(gas_upper),
                found: self_gas_limit,
            })));
        }

        // Difficulty rules only apply on PoW (non-consortium) networks.
        if !self.config.is_consortium {
            let mut difficulty_invalid = false;
            let my_diff = *self.arena[index].block_header.difficulty();
            let mut min_diff = my_diff;
            let mut max_diff = my_diff;
            let initial_difficulty: U256 =
                self.pow_config.initial_difficulty.into();

            if parent_height
                < self
                    .pow_config
                    .difficulty_adjustment_epoch_period(parent_height)
            {
                // Inside the first adjustment period the difficulty is
                // fixed at the configured initial difficulty.
                if my_diff != initial_difficulty {
                    difficulty_invalid = true;
                    min_diff = initial_difficulty;
                    max_diff = initial_difficulty;
                }
            } else {
                // Round parent_height down to the last period boundary.
                let last_period_upper = (parent_height
                    / self
                        .pow_config
                        .difficulty_adjustment_epoch_period(parent_height))
                    * self
                        .pow_config
                        .difficulty_adjustment_epoch_period(parent_height);
                if last_period_upper != parent_height {
                    // Not at a boundary: must match the parent exactly.
                    if my_diff != parent_difficulty {
                        difficulty_invalid = true;
                        min_diff = parent_difficulty;
                        max_diff = parent_difficulty;
                    }
                } else {
                    // At a boundary: must fall inside the configured
                    // adjustment bound around the parent's difficulty.
                    let (lower, upper) =
                        self.pow_config.get_adjustment_bound(parent_difficulty);
                    min_diff = lower;
                    max_diff = upper;
                    if my_diff < min_diff || my_diff > max_diff {
                        difficulty_invalid = true;
                    }
                }
            }

            if difficulty_invalid {
                return Err(From::from(BlockError::InvalidDifficulty(
                    OutOfBounds {
                        min: Some(min_diff),
                        max: Some(max_diff),
                        found: my_diff,
                    },
                )));
            }
        }

        // A pos_reference must be consistent with (verifiable against) the
        // pos_references of the parent and referees.
        if let Some(pos_reference) =
            self.arena[index].block_header.pos_reference()
        {
            let mut pred_pos_ref_list = Vec::new();
            for maybe_pos_ref in predecessor_pos_references {
                if let Some(pos_ref) = maybe_pos_ref {
                    pred_pos_ref_list.push(pos_ref);
                }
            }
            if !self
                .pos_verifier
                .verify_against_predecessors(pos_reference, &pred_pos_ref_list)
            {
                bail!(BlockError::InvalidPosReference);
            }
        }

        Ok(())
    }
862
863 fn verify_graph_ready_block(
864 &self, index: usize, verification_config: &VerificationConfig,
865 ) -> Result<(), Error> {
866 let block_header = &self.arena[index].block_header;
867 let parent = self
868 .data_man
869 .block_header_by_hash(block_header.parent_hash())
870 .expect("headers will not be deleted");
871 let block = self
872 .data_man
873 .block_by_hash(&block_header.hash(), true)
874 .expect("received");
875 verification_config.verify_sync_graph_ready_block(&block, &parent)
876 }
877
878 fn process_invalid_blocks(&mut self, invalid_set: &HashSet<usize>) {
879 for index in invalid_set {
880 let hash = self.arena[*index].block_header.hash();
881 self.data_man.invalidate_block(hash);
884 }
885 self.remove_blocks(&invalid_set);
886 }
887
    /// Detach every block in `to_remove_set` from all in-memory structures
    /// (frontiers, parent/children and referee/referrer edges, lazy-link
    /// maps), free its arena slot, and ask the data manager to drop the
    /// block. Assumes the set is closed over children/referrers (asserted
    /// in debug builds).
    fn remove_blocks(&mut self, to_remove_set: &HashSet<usize>) {
        for index in to_remove_set {
            let hash = self.arena[*index].block_header.hash();
            self.not_ready_blocks_frontier.remove(index);
            self.pos_not_ready_blocks_frontier.remove(index);
            self.old_era_blocks_frontier_set.remove(index);
            self.block_to_fill_set.remove(&hash);

            // Unlink from the parent's child list, if linked in memory.
            let parent = self.arena[*index].parent;
            if parent != NULL {
                self.arena[parent].children.retain(|&x| x != *index);
            }
            // Also drop any pending waiting-children registration.
            let parent_hash = *self.arena[*index].block_header.parent_hash();
            let mut remove_parent_hash: bool = false;
            if let Some(children) = self.children_by_hash.get_mut(&parent_hash)
            {
                children.retain(|&x| x != *index);
                remove_parent_hash = children.len() == 0;
            }
            // Keep the lazy-link map free of empty entries.
            if remove_parent_hash {
                self.children_by_hash.remove(&parent_hash);
            }

            // Unlink from in-memory referees.
            let referees: Vec<usize> =
                self.arena[*index].referees.iter().map(|x| *x).collect();
            for referee in referees {
                self.arena[referee].referrers.retain(|&x| x != *index);
            }
            // And from pending referee waitlists.
            let referee_hashes: Vec<H256> = self.arena[*index]
                .block_header
                .referee_hashes()
                .iter()
                .map(|x| *x)
                .collect();
            for referee_hash in referee_hashes {
                let mut remove_referee_hash: bool = false;
                if let Some(referrers) =
                    self.referrers_by_hash.get_mut(&referee_hash)
                {
                    referrers.retain(|&x| x != *index);
                    remove_referee_hash = referrers.len() == 0;
                }
                if remove_referee_hash {
                    self.referrers_by_hash.remove(&referee_hash);
                }
            }

            // Children/referrers must themselves be in the removal set.
            let children: Vec<usize> =
                self.arena[*index].children.iter().map(|x| *x).collect();
            for child in children {
                debug_assert!(to_remove_set.contains(&child));
                self.arena[child].parent = NULL;
            }

            let referrers: Vec<usize> =
                self.arena[*index].referrers.iter().map(|x| *x).collect();
            for referrer in referrers {
                debug_assert!(to_remove_set.contains(&referrer));
                self.arena[referrer].referees.retain(|&x| x != *index);
            }

            self.arena.remove(*index);
            self.hash_to_arena_indices.remove(&hash);
            // NOTE(review): `true` presumably means "also remove from the
            // on-disk DB" — confirm the flag's meaning in BlockDataManager.
            self.data_man
                .remove_useless_block(&hash, true );
        }
    }
960
961 fn set_and_propagate_invalid(
962 &mut self, queue: &mut VecDeque<usize>,
963 invalid_set: &mut HashSet<usize>, index: usize,
964 ) {
965 let children =
966 mem::replace(&mut self.arena[index].children, Vec::new());
967 for child in &children {
968 if !invalid_set.contains(&child) {
969 self.arena[*child].graph_status = BLOCK_INVALID;
970 queue.push_back(*child);
971 invalid_set.insert(*child);
972 }
973 }
974 self.arena[index].children = children;
975 let referrers =
976 mem::replace(&mut self.arena[index].referrers, Vec::new());
977 for referrer in &referrers {
978 if !invalid_set.contains(&referrer) {
979 self.arena[*referrer].graph_status = BLOCK_INVALID;
980 queue.push_back(*referrer);
981 invalid_set.insert(*referrer);
982 }
983 }
984 self.arena[index].referrers = referrers;
985 }
986}
987
/// The synchronization layer's front-end: wraps the inner header DAG and
/// feeds graph-ready blocks to the consensus worker thread spawned in
/// `SynchronizationGraph::new`.
pub struct SynchronizationGraph {
    pub inner: Arc<RwLock<SynchronizationGraphInner>>,
    pub consensus: SharedConsensusGraph,
    pub data_man: Arc<BlockDataManager>,
    pub pow: Arc<PowComputer>,
    pub verification_config: VerificationConfig,
    pub sync_config: SyncGraphConfig,
    pub statistics: SharedStatistics,
    /// Count of blocks handed to the consensus worker but not yet processed
    /// (decremented by the worker thread after `on_new_block`).
    consensus_unprocessed_count: Arc<AtomicUsize>,

    /// Channel on which new block hashes are published; the consensus
    /// worker subscribes to this same channel.
    new_block_hashes: Arc<Channel<H256>>,

    /// Buffer for blocks that cannot be processed yet — presumably blocks
    /// with future timestamps; confirm in FutureBlockContainer.
    pub future_blocks: FutureBlockContainer,

    machine: Arc<Machine>,
}
1011
impl MallocSizeOf for SynchronizationGraph {
    /// Approximate heap usage: inner graph + data manager, plus the
    /// consensus graph only on non-consortium setups.
    ///
    /// NOTE(review): the consortium exclusion is presumably to avoid double
    /// counting or because consortium mode measures consensus elsewhere —
    /// confirm before relying on these numbers.
    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
        let inner_size = self.inner.read().size_of(ops);
        let mut malloc_size = inner_size + self.data_man.size_of(ops);

        if !self.is_consortium() {
            let consensus_graph = &*self.consensus;
            malloc_size += consensus_graph.size_of(ops);
        }
        malloc_size
    }
}
1027
/// Shared, reference-counted handle to the synchronization graph.
pub type SharedSynchronizationGraph = Arc<SynchronizationGraph>;
1029
1030impl SynchronizationGraph {
    /// Construct the synchronization graph, seed it with the era-genesis
    /// header, and spawn the "Consensus Worker" thread.
    ///
    /// The worker receives new block hashes from `notifications` and feeds
    /// them to `consensus.on_new_block` in a dependency-respecting order:
    /// `reverse_map` maps a block to successors waiting on it, and
    /// `counter_map` counts how many in-flight predecessors a block still
    /// has. Ready blocks sit in a `BinaryHeap` keyed by the parent's epoch
    /// number (a max-heap, so highest-epoch blocks pop first).
    pub fn new(
        consensus: SharedConsensusGraph, data_man: Arc<BlockDataManager>,
        statistics: SharedStatistics, verification_config: VerificationConfig,
        pow_config: ProofOfWorkConfig, pow: Arc<PowComputer>,
        sync_config: SyncGraphConfig, notifications: Arc<Notifications>,
        machine: Arc<Machine>, pos_verifier: Arc<PosVerifier>,
    ) -> Self {
        let genesis_hash = data_man.get_cur_consensus_era_genesis_hash();
        let genesis_block_header = data_man
            .block_header_by_hash(&genesis_hash)
            .expect("genesis block header should exist here");

        let consensus_unprocessed_count = Arc::new(AtomicUsize::new(0));
        // Subscribe before spawning so no hash published later is missed.
        let mut consensus_receiver = notifications.new_block_hashes.subscribe();
        let inner = Arc::new(RwLock::new(
            SynchronizationGraphInner::with_genesis_block(
                genesis_block_header.clone(),
                pow_config,
                pow.clone(),
                sync_config,
                data_man.clone(),
                machine.clone(),
                pos_verifier.clone(),
            ),
        ));
        let sync_graph = SynchronizationGraph {
            inner: inner.clone(),
            future_blocks: FutureBlockContainer::new(
                sync_config.future_block_buffer_capacity,
            ),
            data_man: data_man.clone(),
            pow: pow.clone(),
            verification_config,
            sync_config,
            consensus: consensus.clone(),
            statistics: statistics.clone(),
            consensus_unprocessed_count: consensus_unprocessed_count.clone(),
            new_block_hashes: notifications.new_block_hashes.clone(),
            machine,
        };

        thread::Builder::new()
            .name("Consensus Worker".into())
            .spawn(move || {
                // Blocks whose predecessors have all been processed,
                // ordered by parent epoch number (max-heap).
                let mut priority_queue: BinaryHeap<(u64, H256)> = BinaryHeap::new();
                // block -> successors waiting for it to be processed.
                let mut reverse_map : HashMap<H256, Vec<H256>> = HashMap::new();
                // block -> number of unprocessed predecessors.
                let mut counter_map = HashMap::new();
                let mut pos_started = false;

                'outer: loop {
                    // Only block on the channel when there is nothing to
                    // process; otherwise drain it without waiting.
                    let mut blocking = priority_queue.is_empty();
                    'inner: loop {
                        let maybe_item = if blocking {
                            blocking = false;
                            match block_on(consensus_receiver.recv()) {
                                Some(item) => Ok(item),
                                None => break 'outer,
                            }
                        } else {
                            consensus_receiver.try_recv()
                        };

                        match maybe_item {
                            Ok(hash) => if !reverse_map.contains_key(&hash) {
                                debug!("Worker thread receive: block = {}", hash);
                                let header = data_man.block_header_by_hash(&hash).expect("Header must exist before sending to the consensus worker!");

                                // Lazily start PoS once block heights get
                                // within an era of the PoS-enable height.
                                if !pos_started && pos_verifier.is_enabled_at_height(header.height() + consensus.config().inner_conf.era_epoch_count) {
                                    if let Err(e) = pos_verifier.initialize(consensus.clone()) {
                                        info!("PoS cannot be started at the expected height: e={}", e);
                                    } else {
                                        pos_started = true;
                                    }
                                }

                                // Count predecessors (parent, referees and
                                // the pos pivot decision) still in flight.
                                let mut cnt: usize = 0;
                                let parent_hash = header.parent_hash();
                                if let Some(v) = reverse_map.get_mut(parent_hash) {
                                    v.push(hash.clone());
                                    cnt += 1;
                                }
                                for referee in header.referee_hashes() {
                                    if let Some(v) = reverse_map.get_mut(referee) {
                                        v.push(hash.clone());
                                        cnt += 1;
                                    }
                                }
                                if let Some(pivot_decision) = header.pos_reference().as_ref().and_then(|pos_reference| pos_verifier.get_pivot_decision(pos_reference)) {
                                    if let Some(v) = reverse_map.get_mut(&pivot_decision) {
                                        v.push(hash.clone());
                                        cnt += 1;
                                    }
                                }
                                reverse_map.insert(hash.clone(), Vec::new());
                                if cnt == 0 {
                                    // All predecessors done: ready to run.
                                    let epoch_number = consensus.get_block_epoch_number(parent_hash).unwrap_or(0);
                                    priority_queue.push((epoch_number, hash));
                                } else {
                                    counter_map.insert(hash, cnt);
                                }
                            } else {
                                warn!("Duplicate block = {} sent to the consensus worker", hash);
                            },
                            Err(TryRecvError::Empty) => break 'inner,
                            Err(TryRecvError::Disconnected) => break 'outer,
                        }
                    }
                    // Process one ready block, then release any successors
                    // whose last pending predecessor this was.
                    if let Some((_, hash)) = priority_queue.pop() {
                        CONSENSUS_WORKER_QUEUE.dequeue(1);
                        let successors = reverse_map.remove(&hash).unwrap();
                        for succ in successors {
                            let cnt_tuple = counter_map.get_mut(&succ).unwrap();
                            *cnt_tuple -= 1;
                            if *cnt_tuple == 0 {
                                counter_map.remove(&succ);
                                let header_succ = data_man.block_header_by_hash(&succ).expect("Header must exist before sending to the consensus worker!");
                                let parent_succ = header_succ.parent_hash();
                                let epoch_number = consensus.get_block_epoch_number(parent_succ).unwrap_or(0);
                                priority_queue.push((epoch_number, succ));
                            }
                        }
                        consensus.on_new_block(
                            &hash,
                        );
                        consensus_unprocessed_count.fetch_sub(1, Ordering::SeqCst);
                    }
                }
            })
            .expect("Cannot fail");
        sync_graph
    }
1176
1177 pub fn is_consortium(&self) -> bool { self.sync_config.is_consortium }
1178
1179 pub fn machine(&self) -> Arc<Machine> { self.machine.clone() }
1180
1181 pub fn get_genesis_hash_and_height_in_current_era(&self) -> (H256, u64) {
1182 self.inner
1183 .read()
1184 .get_genesis_hash_and_height_in_current_era()
1185 }
1186
1187 pub fn expected_difficulty(&self, parent_hash: &H256) -> U256 {
1190 self.consensus.expected_difficulty(parent_hash)
1191 }
1192
    /// Transactions the transaction pool has queued for propagation to
    /// peers, keyed by transaction hash.
    pub fn get_to_propagate_trans(
        &self,
    ) -> HashMap<H256, Arc<SignedTransaction>> {
        self.consensus.tx_pool().get_to_be_propagated_transactions()
    }
1198
    /// Hand a batch of transactions back to the pool to be propagated
    /// later.
    pub fn set_to_propagate_trans(
        &self, transactions: HashMap<H256, Arc<SignedTransaction>>,
    ) {
        self.consensus
            .tx_pool()
            .set_to_be_propagated_transactions(transactions);
    }
1206
1207 pub fn recover_graph_from_db(&self) {
1212 info!("Start fast recovery of the block DAG from database");
1213
1214 let genesis_hash = self.data_man.get_cur_consensus_era_genesis_hash();
1217 let genesis_local_info =
1218 self.data_man.local_block_info_by_hash(&genesis_hash);
1219 if genesis_local_info.is_none() {
1220 panic!(
1222 "failed to get local block info from db for genesis[{}]",
1223 genesis_hash
1224 );
1225 }
1226 let genesis_seq_num = genesis_local_info.unwrap().get_seq_num();
1227 self.consensus.set_initial_sequence_number(genesis_seq_num);
1228 let genesis_header =
1229 self.data_man.block_header_by_hash(&genesis_hash).unwrap();
1230 debug!(
1231 "Get current genesis_block hash={:?}, height={}, seq_num={}",
1232 genesis_hash,
1233 genesis_header.height(),
1234 genesis_seq_num
1235 );
1236
1237 let terminals_opt = self.data_man.terminals_from_db();
1239 if terminals_opt.is_none() {
1240 return;
1241 }
1242 let terminals = terminals_opt.unwrap();
1243 debug!("Get terminals {:?}", terminals);
1244
1245 let mut queue = VecDeque::new();
1253 let mut visited_blocks: HashSet<H256> = HashSet::new();
1254 for terminal in terminals {
1255 if !visited_blocks.contains(&terminal) {
1257 queue.push_back(terminal);
1258 visited_blocks.insert(terminal);
1259 }
1260 }
1261
1262 let mut missed_hashes = HashSet::new();
1266 while let Some(hash) = queue.pop_front() {
1267 if hash == genesis_hash {
1268 continue;
1270 }
1271
1272 if let Some(block_local_info) =
1276 self.data_man.local_block_info_by_hash(&hash)
1277 {
1278 if block_local_info.get_seq_num() < genesis_seq_num {
1279 debug!(
1280 "Skip block {:?} before checkpoint: seq_num={}",
1281 hash,
1282 block_local_info.get_seq_num()
1283 );
1284 continue;
1285 }
1286 }
1287
1288 if let Some(block_header) =
1289 self.data_man.block_header_by_hash(&hash)
1290 {
1291 self.insert_block_header(
1292 &mut block_header.as_ref().clone(),
1293 true, false, true, false, );
1298 let parent = block_header.parent_hash().clone();
1299 let referees = block_header.referee_hashes().clone();
1300 if !visited_blocks.contains(&parent) {
1301 queue.push_back(parent);
1302 visited_blocks.insert(parent);
1303 }
1304 for referee in referees {
1305 if !visited_blocks.contains(&referee) {
1306 queue.push_back(referee);
1307 visited_blocks.insert(referee);
1308 }
1309 }
1310 } else {
1311 missed_hashes.insert(hash);
1312 }
1313 }
1314
1315 debug!(
1316 "Current frontier after recover from db: {:?}",
1317 self.inner.read().not_ready_blocks_frontier.get_frontier()
1318 );
1319
1320 info!("Finish reconstructing the pivot chain of length {}, start to sync from peers", self.consensus.best_epoch_number());
1321 }
1322
1323 pub fn block_header_by_hash(&self, hash: &H256) -> Option<BlockHeader> {
1325 if !self.contains_block_header(hash) {
1326 return None;
1328 }
1329 self.data_man
1330 .block_header_by_hash(hash)
1331 .map(|header_ref| header_ref.as_ref().clone())
1332 }
1333
1334 pub fn block_height_by_hash(&self, hash: &H256) -> Option<u64> {
1336 self.block_header_by_hash(hash)
1337 .map(|header| header.height())
1338 }
1339
1340 pub fn block_timestamp_by_hash(&self, hash: &H256) -> Option<u64> {
1342 self.block_header_by_hash(hash)
1343 .map(|header| header.timestamp())
1344 }
1345
    /// Fetch the full block from the data manager.
    // The `true` flag is forwarded to the data manager — presumably an
    // update-cache hint; confirm against `BlockDataManager::block_by_hash`.
    pub fn block_by_hash(&self, hash: &H256) -> Option<Arc<Block>> {
        self.data_man.block_by_hash(hash, true )
    }
1351
1352 pub fn contains_block_header(&self, hash: &H256) -> bool {
1353 self.inner.read().hash_to_arena_indices.contains_key(hash)
1354 || self.future_blocks.contains(hash)
1355 }
1356
1357 fn parent_or_referees_invalid(&self, header: &BlockHeader) -> bool {
1358 self.data_man.verified_invalid(header.parent_hash()).0
1359 || header
1360 .referee_hashes()
1361 .iter()
1362 .any(|referee| self.data_man.verified_invalid(referee).0)
1363 }
1364
    /// Walk the sync graph from `frontier_index_list` and promote every
    /// block whose header dependencies are satisfied to
    /// `BLOCK_HEADER_GRAPH_READY`, propagating invalidity to descendants as
    /// it is discovered.
    ///
    /// * `need_to_verify` - run full header verification on newly ready
    ///   headers; on failure the block and its future are marked invalid.
    /// * `header_index_to_insert` - arena index of the header the caller is
    ///   inserting; it is the only one persisted when `persistent` is set.
    /// * `insert_to_consensus` - forward newly ready headers directly to
    ///   the consensus worker queue.
    ///
    /// Returns the set of arena indices found invalid and the hashes of
    /// body-ready blocks that should be relayed to peers.
    fn propagate_header_graph_status(
        &self, inner: &mut SynchronizationGraphInner,
        frontier_index_list: Vec<usize>, need_to_verify: bool,
        header_index_to_insert: usize, insert_to_consensus: bool,
        persistent: bool,
    ) -> (HashSet<usize>, Vec<H256>) {
        // Timestamp stamped on each block as it becomes ready; read later
        // by the expiration logic in `remove_expire_blocks`.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let mut need_to_relay: Vec<H256> = Vec::new();
        let mut invalid_set: HashSet<usize> = HashSet::new();
        let mut queue = VecDeque::new();

        // Seed the BFS; already-invalid blocks go straight to the invalid
        // set so the loop below spreads their invalidity.
        for index in frontier_index_list {
            if inner.arena[index].graph_status == BLOCK_INVALID {
                invalid_set.insert(index);
            }
            queue.push_back(index);
        }

        while let Some(index) = queue.pop_front() {
            if inner.arena[index].graph_status == BLOCK_INVALID {
                // Invalidity is contagious: all descendants become invalid.
                inner.set_and_propagate_invalid(
                    &mut queue,
                    &mut invalid_set,
                    index,
                );
            } else if inner.new_to_be_header_graph_ready(index) {
                inner.arena[index].graph_status = BLOCK_HEADER_GRAPH_READY;
                inner.arena[index].last_update_timestamp = now;
                debug!("BlockIndex {} parent_index {} hash {:?} is header graph ready", index,
                       inner.arena[index].parent, inner.arena[index].block_header.hash());

                let r = inner.verify_header_graph_ready_block(index);

                if need_to_verify && r.is_err() {
                    warn!(
                        "Invalid header_arc! inserted_header={:?} err={:?}",
                        inner.arena[index].block_header.clone(),
                        r
                    );
                    invalid_set.insert(index);
                    inner.arena[index].graph_status = BLOCK_INVALID;
                    inner.set_and_propagate_invalid(
                        &mut queue,
                        &mut invalid_set,
                        index,
                    );
                    continue;
                }

                // If the parent was already reclaimed (old era), this block
                // joins the old-era frontier for later cleanup.
                if inner.arena[index].parent_reclaimed {
                    inner.old_era_blocks_frontier.push_back(index);
                    inner.old_era_blocks_frontier_set.insert(index);
                }

                // Persist only the header the caller is inserting.
                if index == header_index_to_insert && persistent {
                    self.data_man.insert_block_header(
                        inner.arena[index].block_header.hash(),
                        inner.arena[index].block_header.clone(),
                        true,
                    );
                }
                if insert_to_consensus {
                    CONSENSUS_WORKER_QUEUE.enqueue(1);

                    // Track the backlog; the consensus worker decrements
                    // after processing each block.
                    self.consensus_unprocessed_count
                        .fetch_add(1, Ordering::SeqCst);
                    assert!(
                        self.new_block_hashes
                            .send(inner.arena[index].block_header.hash(),),
                        "consensus receiver dropped"
                    );

                    // This block leaves the not-ready frontiers; its
                    // children become the new frontier candidates.
                    inner.not_ready_blocks_frontier.remove(&index);
                    inner.pos_not_ready_blocks_frontier.remove(&index);
                    for child in &inner.arena[index].children {
                        inner.not_ready_blocks_frontier.insert(*child);
                    }
                }

                // A header-ready block whose body is present can be relayed.
                if inner.arena[index].block_ready {
                    need_to_relay.push(inner.arena[index].block_header.hash());
                }

                // A newly ready header may unblock children and referrers.
                for child in &inner.arena[index].children {
                    if inner.arena[*child].graph_status
                        < BLOCK_HEADER_GRAPH_READY
                    {
                        queue.push_back(*child);
                    }
                }
                for referrer in &inner.arena[index].referrers {
                    if inner.arena[*referrer].graph_status
                        < BLOCK_HEADER_GRAPH_READY
                    {
                        queue.push_back(*referrer);
                    }
                }
            } else {
                debug!(
                    "BlockIndex {} parent_index {} hash {:?} is not ready",
                    index,
                    inner.arena[index].parent,
                    inner.arena[index].block_header.hash()
                );
                // Even if not yet graph-ready, persist the header being
                // inserted so it survives a restart.
                if index == header_index_to_insert && persistent {
                    self.data_man.insert_block_header(
                        inner.arena[index].block_header.hash(),
                        inner.arena[index].block_header.clone(),
                        true,
                    );
                }
            }
        }
        (invalid_set, need_to_relay)
    }
1496
    /// Insert `header` into the sync graph and propagate header readiness.
    ///
    /// * `need_to_verify` - verify the header; pass `false` for locally
    ///   mined or database-recovered headers.
    /// * `bench_mode` - in the no-verify path, also skip the local PoW
    ///   sanity check (benchmarking only).
    /// * `insert_to_consensus` - forward header-graph-ready headers
    ///   straight to the consensus worker.
    /// * `persistent` - persist the header via the data manager.
    ///
    /// Returns the insertion result plus hashes of blocks that became
    /// ready to relay to peers.
    pub fn insert_block_header(
        &self, header: &mut BlockHeader, need_to_verify: bool,
        bench_mode: bool, insert_to_consensus: bool, persistent: bool,
    ) -> (BlockHeaderInsertionResult, Vec<H256>) {
        let _timer = MeterTimer::time_func(SYNC_INSERT_HEADER.as_ref());
        self.statistics.inc_sync_graph_inserted_header_count();
        let inner = &mut *self.inner.write();
        // While catching up, header insertion is deferred.
        if inner.locked_for_catchup {
            return (BlockHeaderInsertionResult::TemporarySkipped, Vec::new());
        }
        let hash = header.hash();

        // Reject anything already known to be invalid.
        let (invalid, local_info_opt) = self.data_man.verified_invalid(&hash);
        if invalid {
            return (BlockHeaderInsertionResult::Invalid, Vec::new());
        }

        if let Some(info) = local_info_opt {
            // Consensus already handled this block if it predates the
            // current era checkpoint or was processed by this very
            // instance of the node.
            let already_processed = info.get_seq_num()
                < self.consensus.current_era_genesis_seq_num()
                || info.get_instance_id() == self.data_man.get_instance_id();
            if already_processed {
                if need_to_verify && !self.is_consortium() {
                    // Fill pow quality on the caller's header even though
                    // insertion is skipped — presumably the caller reuses
                    // the header afterwards; confirm at call sites.
                    VerificationConfig::get_or_fill_header_pow_quality(
                        &self.pow, header,
                    );
                }
                return (
                    BlockHeaderInsertionResult::AlreadyProcessedInConsensus,
                    Vec::new(),
                );
            }
        }

        // Header already present in the sync graph arena.
        if inner.hash_to_arena_indices.contains_key(&hash) {
            if need_to_verify {
                VerificationConfig::get_or_fill_header_pow_quality(
                    &self.pow, header,
                );
            }
            return (
                BlockHeaderInsertionResult::AlreadyProcessedInSync,
                Vec::new(),
            );
        }

        debug!("is_consortium={:?}", self.is_consortium());
        // Consortium mode bypasses parent/referee and header-param checks.
        let verification_passed = if need_to_verify {
            self.is_consortium()
                || !(self.parent_or_referees_invalid(header)
                    || self
                        .verification_config
                        .verify_header_params(&self.pow, header)
                        .or_else(|e| {
                            warn!(
                                "Invalid header: err={} header={:?}",
                                e, header
                            );
                            Err(e)
                        })
                        .is_err())
        } else {
            // Locally produced blocks must still satisfy PoW unless we are
            // benchmarking.
            if !bench_mode && !self.is_consortium() {
                self.verification_config
                    .verify_pow(&self.pow, header)
                    .expect("local mined block should pass this check!");
            }
            true
        };

        let header_arc = Arc::new(header.clone());
        // Failed headers still enter the arena (marked invalid) so that
        // invalidity can propagate to dependents.
        let me = if verification_passed {
            inner.insert(header_arc.clone())
        } else {
            inner.insert_invalid(header_arc.clone())
        };

        // Frontier maintenance: `me` joins the not-ready frontier when its
        // parent is missing or already (header-)graph-ready; any children
        // of `me` on the frontier are superseded by `me` and removed.
        if inner.arena[me].graph_status != BLOCK_GRAPH_READY {
            if inner.arena[me].parent == NULL
                || inner.arena[inner.arena[me].parent].graph_status
                    == BLOCK_GRAPH_READY
                || (insert_to_consensus
                    && inner.arena[inner.arena[me].parent].graph_status
                        == BLOCK_HEADER_GRAPH_READY)
            {
                inner.not_ready_blocks_frontier.insert(me);
            }
            let mut to_be_removed = Vec::new();
            for child in &inner.arena[me].children {
                if inner.not_ready_blocks_frontier.contains(child) {
                    to_be_removed.push(*child);
                }
            }
            for x in to_be_removed {
                inner.not_ready_blocks_frontier.remove(&x);
            }
        }

        debug!("insert_block_header() Block = {:?}, index = {}, need_to_verify = {}, bench_mode = {} insert_to_consensus = {}",
               header.hash(), me, need_to_verify, bench_mode, insert_to_consensus);

        let (invalid_set, need_to_relay) = self.propagate_header_graph_status(
            inner,
            vec![me],
            need_to_verify,
            me,
            insert_to_consensus,
            persistent,
        );

        let me_invalid = invalid_set.contains(&me);

        // Purge everything found invalid (possibly including `me`).
        inner.process_invalid_blocks(&invalid_set);

        if me_invalid {
            return (BlockHeaderInsertionResult::Invalid, need_to_relay);
        }

        // Opportunistically reclaim blocks behind the era checkpoint.
        inner.try_clear_old_era_blocks();

        (BlockHeaderInsertionResult::NewValid, need_to_relay)
    }
1643
1644 pub fn contains_block(&self, hash: &H256) -> bool {
1645 let inner = self.inner.read();
1646 if let Some(index) = inner.hash_to_arena_indices.get(hash) {
1647 inner.arena[*index].block_ready
1648 } else {
1649 false
1650 }
1651 }
1652
    /// Mark `index` as fully graph-ready (`BLOCK_GRAPH_READY`) and hand
    /// the block over to the consensus worker.
    fn set_graph_ready(
        &self, inner: &mut SynchronizationGraphInner, index: usize,
    ) {
        inner.arena[index].graph_status = BLOCK_GRAPH_READY;

        // The block leaves both not-ready frontiers; its children become
        // the new frontier candidates.
        inner.not_ready_blocks_frontier.remove(&index);
        inner.pos_not_ready_blocks_frontier.remove(&index);
        for child in &inner.arena[index].children {
            inner.not_ready_blocks_frontier.insert(*child);
        }

        let h = inner.arena[index].block_header.hash();
        debug!("Block {:?} is graph ready", h);
        CONSENSUS_WORKER_QUEUE.enqueue(1);

        // Track the backlog so `is_consensus_worker_busy` stays accurate;
        // the worker decrements after processing.
        self.consensus_unprocessed_count
            .fetch_add(1, Ordering::SeqCst);
        assert!(self.new_block_hashes.send(h), "consensus receiver dropped");

        // Optionally mirror the event into the state exposer.
        if inner.config.enable_state_expose {
            STATE_EXPOSER.sync_graph.lock().ready_block_vec.push(
                SyncGraphBlockState {
                    block_hash: h,
                    parent: inner.arena[index]
                        .block_header
                        .parent_hash()
                        .clone(),
                    referees: inner.arena[index]
                        .block_header
                        .referee_hashes()
                        .clone(),
                    nonce: inner.arena[index].block_header.nonce(),
                    timestamp: inner.arena[index].block_header.timestamp(),
                    adaptive: inner.arena[index].block_header.adaptive(),
                },
            );
        }
    }
1694
    /// Propagate `BLOCK_GRAPH_READY` (or invalidity) from
    /// `frontier_index_list` through children and referrers.
    ///
    /// Returns the arena indices discovered to be invalid.
    fn propagate_graph_status(
        &self, inner: &mut SynchronizationGraphInner,
        frontier_index_list: Vec<usize>,
    ) -> HashSet<usize> {
        let mut queue = VecDeque::new();
        let mut invalid_set = HashSet::new();
        // Seed the BFS; already-invalid blocks enter the invalid set.
        for index in frontier_index_list {
            if inner.arena[index].graph_status == BLOCK_INVALID {
                invalid_set.insert(index);
            }
            queue.push_back(index);
        }

        while let Some(index) = queue.pop_front() {
            if inner.arena[index].graph_status == BLOCK_INVALID {
                // Invalidity spreads to every descendant.
                inner.set_and_propagate_invalid(
                    &mut queue,
                    &mut invalid_set,
                    index,
                );
            } else if inner.new_to_be_block_graph_ready(index) {
                // Full (body-level) verification before promotion.
                let verify_result = inner
                    .verify_graph_ready_block(index, &self.verification_config);
                if verify_result.is_err() {
                    warn!(
                        "Invalid block! inserted_header={:?} err={:?}",
                        inner.arena[index].block_header.clone(),
                        verify_result
                    );
                    invalid_set.insert(index);
                    inner.arena[index].graph_status = BLOCK_INVALID;
                    inner.set_and_propagate_invalid(
                        &mut queue,
                        &mut invalid_set,
                        index,
                    );
                    continue;
                }
                self.set_graph_ready(inner, index);
                // A newly ready block may unblock children and referrers.
                for child in &inner.arena[index].children {
                    debug_assert!(
                        inner.arena[*child].graph_status < BLOCK_GRAPH_READY
                    );
                    queue.push_back(*child);
                }
                for referrer in &inner.arena[index].referrers {
                    debug_assert!(
                        inner.arena[*referrer].graph_status < BLOCK_GRAPH_READY
                    );
                    queue.push_back(*referrer);
                }
            } else {
                trace!("Block index {:?} not block_graph_ready, current frontier: {:?}", index, inner.not_ready_blocks_frontier.get_frontier());
            }
        }

        invalid_set
    }
1754
    /// Insert a full block whose header must already be in the graph.
    ///
    /// * `need_to_verify` - run basic body verification.
    /// * `persistent` - persist the body via the data manager.
    /// * `recover_from_db` - the block came from the local database, so do
    ///   not write it back.
    pub fn insert_block(
        &self, block: Block, need_to_verify: bool, persistent: bool,
        recover_from_db: bool,
    ) -> BlockInsertionResult {
        let _timer = MeterTimer::time_func(SYNC_INSERT_BLOCK.as_ref());
        let hash = block.hash();

        debug!("insert_block {:?}", hash);

        let inner = &mut *self.inner.write();

        // The header must be present in the arena; otherwise drop the body.
        let contains_block =
            if let Some(index) = inner.hash_to_arena_indices.get(&hash) {
                inner.arena[*index].block_ready
            } else {
                return BlockInsertionResult::Ignored;
            };

        if contains_block {
            return BlockInsertionResult::AlreadyProcessed;
        }

        debug_assert!(!self.data_man.verified_invalid(&hash).0);

        self.statistics.inc_sync_graph_inserted_block_count();

        let me = *inner.hash_to_arena_indices.get(&hash).unwrap();

        debug_assert!(hash == inner.arena[me].block_header.hash());
        debug_assert!(!inner.arena[me].block_ready);
        inner.arena[me].block_ready = true;

        if need_to_verify {
            let r = self.verification_config.verify_sync_graph_block_basic(
                &block,
                self.consensus.best_chain_id(),
            );
            match r {
                Err(Error::Block(BlockError::InvalidTransactionsRoot(e))) => {
                    // Body does not match the header's transactions root —
                    // roll back readiness and ask the caller to fetch the
                    // body again instead of marking the block invalid.
                    warn!("BlockTransactionRoot not match! inserted_block={:?} err={:?}", block, e);
                    inner.arena[me].block_ready = false;
                    return BlockInsertionResult::RequestAgain;
                }
                Err(e) => {
                    warn!(
                        "Invalid block! inserted_block={:?} err={:?}",
                        block.block_header, e
                    );
                    inner.arena[me].graph_status = BLOCK_INVALID;
                }
                _ => {}
            };
        }

        let block = Arc::new(block);
        if inner.arena[me].graph_status != BLOCK_INVALID {
            // Persist the compact form and the body, unless this block was
            // just read back from the database.
            if !recover_from_db {
                self.data_man.insert_compact_block(block.to_compact());
                self.data_man.insert_block_body(
                    block.hash(),
                    block.clone(),
                    persistent,
                );
            }
        }

        // During catch-up we only fill bodies: propagate invalidity if
        // needed, otherwise just record that this body arrived.
        if inner.locked_for_catchup {
            if inner.arena[me].graph_status == BLOCK_INVALID {
                let invalid_set = self.propagate_graph_status(inner, vec![me]);
                inner.process_invalid_blocks(&invalid_set);
                return BlockInsertionResult::Invalid;
            } else {
                debug!("Downloaded block body for {:?}", hash);
                inner.block_to_fill_set.remove(&hash);
                return BlockInsertionResult::AlreadyProcessed;
            }
        }

        let invalid_set = self.propagate_graph_status(inner, vec![me]);

        inner.process_invalid_blocks(&invalid_set);

        debug!(
            "new block inserted into graph: block_header={:?}, tx_count={}, block_size={}",
            block.block_header,
            block.transactions.len(),
            block.size(),
        );

        if invalid_set.contains(&me) {
            BlockInsertionResult::Invalid
        } else if inner.arena[me].graph_status >= BLOCK_HEADER_GRAPH_READY {
            BlockInsertionResult::ShouldRelay
        } else {
            BlockInsertionResult::SuccessWithoutRelay
        }
    }
1877
1878 pub fn get_all_block_hashes_by_epoch(
1879 &self, epoch_number: u64,
1880 ) -> Result<Vec<H256>, ProviderBlockError> {
1881 let mut res = self.consensus.get_skipped_block_hashes_by_epoch(
1882 EpochNumber::Number(epoch_number.into()),
1883 )?;
1884 res.append(&mut self.consensus.get_block_hashes_by_epoch(
1885 EpochNumber::Number(epoch_number.into()),
1886 )?);
1887 Ok(res)
1888 }
1889
    /// Emit the collected sync/consensus statistics to the log.
    pub fn log_statistics(&self) { self.statistics.log_statistics(); }
1891
    /// Periodic heartbeat forwarded to consensus to refresh its total
    /// weight delta bookkeeping.
    pub fn update_total_weight_delta_heartbeat(&self) {
        self.consensus.update_total_weight_delta_heartbeat();
    }
1895
    /// Number of blocks currently cached by the data manager.
    pub fn block_count(&self) -> usize { self.data_man.cached_block_count() }
1900
1901 pub fn remove_expire_blocks(&self, expire_time: u64) {
1906 let inner = &mut *self.inner.write();
1907 let now = SystemTime::now()
1908 .duration_since(UNIX_EPOCH)
1909 .unwrap()
1910 .as_secs();
1911 let frontier = inner.not_ready_blocks_frontier.get_frontier().clone();
1912 let all_not_ready: HashSet<_> = inner.get_future(frontier);
1913 let mut expire_set = HashSet::new();
1914 for index in all_not_ready {
1915 if inner.arena[index].last_update_timestamp + expire_time < now {
1916 expire_set.insert(index);
1917 }
1918 }
1919
1920 let all_expire: HashSet<_> = inner.get_future(expire_set);
1922 debug!("all_expire: {:?}", all_expire);
1923 inner.remove_blocks(&all_expire);
1924 }
1925
1926 pub fn remove_blocks_and_future(&self, to_remove_set: &HashSet<H256>) {
1929 let mut inner = self.inner.write();
1930 let mut index_set = Vec::new();
1931 for block_hash in to_remove_set {
1932 if let Some(index) = inner.hash_to_arena_indices.get(block_hash) {
1933 index_set.push(*index);
1934 }
1935 }
1936 let index_set_and_future: HashSet<_> = inner.get_future(index_set);
1937 inner.remove_blocks(&index_set_and_future);
1938 }
1939
    /// Whether the consensus worker still has queued blocks to process
    /// (the counter is incremented on enqueue, decremented by the worker).
    pub fn is_consensus_worker_busy(&self) -> bool {
        self.consensus_unprocessed_count.load(Ordering::SeqCst) != 0
    }
1943
    /// Whether all block bodies requested during catch-up have arrived.
    pub fn is_fill_block_completed(&self) -> bool {
        self.inner.read().block_to_fill_set.is_empty()
    }
1947
    /// Finish the body-download phase of catch-up.
    ///
    /// Promotes header-graph-ready blocks to graph-ready, removes blocks
    /// that still cannot become ready, and — if consensus reports blocks
    /// missing bodies — resets consensus and replays the entire graph in
    /// topological order. Returns `true` when catch-up completed and the
    /// graph was unlocked; `false` when more bodies must be downloaded.
    pub fn complete_filling_block_bodies(&self) -> bool {
        {
            let inner = &mut *self.inner.write();

            // Promote header-ready blocks; collect those that still cannot
            // reach BLOCK_GRAPH_READY so they can be dropped.
            let to_remove = {
                let arena = &mut inner.arena;
                inner
                    .hash_to_arena_indices
                    .iter()
                    .filter_map(|(_, index)| {
                        let graph_node = &mut arena[*index];
                        if graph_node.graph_status == BLOCK_HEADER_GRAPH_READY {
                            graph_node.block_ready = true;
                            graph_node.graph_status = BLOCK_GRAPH_READY;
                        }
                        if graph_node.graph_status != BLOCK_GRAPH_READY {
                            Some(*index)
                        } else {
                            None
                        }
                    })
                    .collect()
            };
            inner.remove_blocks(&to_remove);

            let skipped_body_blocks =
                self.consensus.get_blocks_needing_bodies();
            if !skipped_body_blocks.is_empty() {
                warn!("Has invalid blocks after downloading block bodies!");
                // Rebuild consensus from scratch by replaying all remaining
                // blocks in topological order.
                self.consensus.reset();

                let all_block_indices: HashSet<_> = inner
                    .hash_to_arena_indices
                    .iter()
                    .map(|(_, i)| *i)
                    .collect();
                let sorted_blocks = inner.topological_sort(all_block_indices);
                for i in sorted_blocks {
                    self.consensus
                        .on_new_block(&inner.arena[i].block_header.hash());
                }
                // The replay may surface yet more blocks needing bodies;
                // if so, go back to the download phase.
                let new_to_fill_blocks: HashSet<_> =
                    self.consensus.get_blocks_needing_bodies();
                if !new_to_fill_blocks.is_empty() {
                    warn!(
                        "{} new block bodies to get",
                        new_to_fill_blocks.len()
                    );
                    inner.block_to_fill_set = new_to_fill_blocks;
                    return false;
                }
            }
        }
        // Catch-up done: build the pivot-chain state and unlock the graph.
        self.consensus.construct_pivot_state();
        self.inner.write().locked_for_catchup = false;
        true
    }
2027
    /// Re-examine blocks on the PoS-blocked not-ready frontier and promote
    /// any that have become (header-)graph-ready in the meantime.
    ///
    /// With `header_only` set, promoted headers are sent straight to the
    /// consensus worker; otherwise full block-graph readiness is also
    /// checked and propagated.
    // NOTE(review): both branches iterate `pos_not_ready_blocks_frontier`;
    // confirm whether the non-header-only branch was also meant to scan
    // `not_ready_blocks_frontier`.
    pub fn check_not_ready_frontier(&self, header_only: bool) {
        debug!("check_not_ready_frontier starts");
        let mut inner = self.inner.write();
        // During catch-up the graph is managed by the body-filling logic.
        if inner.locked_for_catchup {
            return;
        }
        if header_only {
            for b in inner.pos_not_ready_blocks_frontier.clone() {
                debug!(
                    "check_not_ready_frontier: check {:?}",
                    inner.arena[b].block_header.hash()
                );
                if inner.new_to_be_header_graph_ready(b) {
                    self.propagate_header_graph_status(
                        &mut *inner,
                        vec![b],
                        true, // need_to_verify
                        b,    // header_index_to_insert
                        true, // insert_to_consensus
                        true, // persistent
                    );
                }
            }
        } else {
            for b in inner.pos_not_ready_blocks_frontier.clone() {
                debug!(
                    "check_not_ready_frontier: check {:?}",
                    inner.arena[b].block_header.hash()
                );
                if inner.new_to_be_header_graph_ready(b) {
                    self.propagate_header_graph_status(
                        &mut *inner,
                        vec![b],
                        true,  // need_to_verify
                        b,     // header_index_to_insert
                        false, // insert_to_consensus
                        true,  // persistent
                    );
                }
                if inner.new_to_be_block_graph_ready(b) {
                    debug!("new graph ready found");
                    self.propagate_graph_status(&mut *inner, vec![b]);
                }
            }
        }
    }
2082}
2083
/// Sync-graph nodes are addressed by their arena (slab) index.
impl Graph for SynchronizationGraphInner {
    type NodeIndex = usize;
}
2087
2088impl TreeGraph for SynchronizationGraphInner {
2089 fn parent(&self, node_index: Self::NodeIndex) -> Option<Self::NodeIndex> {
2090 if self.arena[node_index].parent != NULL {
2091 Some(self.arena[node_index].parent)
2092 } else {
2093 None
2094 }
2095 }
2096
2097 fn referees(&self, node_index: Self::NodeIndex) -> Vec<Self::NodeIndex> {
2098 self.arena[node_index].referees.clone()
2099 }
2100}
2101
2102impl RichTreeGraph for SynchronizationGraphInner {
2103 fn children(&self, node_index: Self::NodeIndex) -> Vec<Self::NodeIndex> {
2104 self.arena[node_index].children.clone()
2105 }
2106
2107 fn referrers(&self, node_index: Self::NodeIndex) -> Vec<Self::NodeIndex> {
2108 self.arena[node_index].referrers.clone()
2109 }
2110}
2111
/// Outcome of inserting a block body into the synchronization graph.
pub enum BlockInsertionResult {
    /// The full block was already known to the graph.
    AlreadyProcessed,
    /// Newly inserted and worth relaying to peers.
    ShouldRelay,
    /// Newly inserted but not yet relayable.
    SuccessWithoutRelay,
    /// The block (or a block it depends on) failed verification.
    Invalid,
    /// The body did not match its header; it should be requested again.
    RequestAgain,
    /// No matching header in the graph, so the body was dropped.
    Ignored,
}

impl BlockInsertionResult {
    /// Any non-error outcome: the body was accepted or already present.
    pub fn is_valid(&self) -> bool {
        use BlockInsertionResult::*;
        match self {
            AlreadyProcessed | ShouldRelay | SuccessWithoutRelay => true,
            Invalid | RequestAgain | Ignored => false,
        }
    }

    /// The block was rejected as invalid.
    pub fn is_invalid(&self) -> bool {
        use BlockInsertionResult::*;
        match self {
            Invalid => true,
            _ => false,
        }
    }

    /// The block should be relayed to peers.
    pub fn should_relay(&self) -> bool {
        use BlockInsertionResult::*;
        match self {
            ShouldRelay => true,
            _ => false,
        }
    }

    /// The body looked corrupted in transit and should be fetched again.
    pub fn request_again(&self) -> bool {
        use BlockInsertionResult::*;
        match self {
            RequestAgain => true,
            _ => false,
        }
    }
}
2154
/// Outcome of inserting a block header into the synchronization graph.
pub enum BlockHeaderInsertionResult {
    /// Consensus already processed this header (old era, or persisted by
    /// this very instance).
    AlreadyProcessedInConsensus,
    /// The sync graph arena already holds this header.
    AlreadyProcessedInSync,
    /// A new, valid header was inserted.
    NewValid,
    /// The header failed verification or depends on an invalid block.
    Invalid,
    /// The graph is locked for catch-up; retry later.
    TemporarySkipped,
}

impl BlockHeaderInsertionResult {
    /// The header is new and passed all checks.
    pub fn is_new_valid(&self) -> bool {
        use BlockHeaderInsertionResult::*;
        match self {
            NewValid => true,
            _ => false,
        }
    }

    /// The header was rejected as invalid.
    pub fn is_invalid(&self) -> bool {
        use BlockHeaderInsertionResult::*;
        match self {
            Invalid => true,
            _ => false,
        }
    }

    /// Whether the caller should go on to request/process the block body.
    pub fn should_process_body(&self) -> bool {
        use BlockHeaderInsertionResult::*;
        match self {
            NewValid | AlreadyProcessedInSync => true,
            AlreadyProcessedInConsensus | Invalid | TemporarySkipped => false,
        }
    }
}