1use std::{
6 cmp::max,
7 collections::{BinaryHeap, HashMap, HashSet, VecDeque},
8 mem, panic,
9 sync::{
10 atomic::{AtomicUsize, Ordering},
11 Arc,
12 },
13 thread,
14 time::{Duration, SystemTime, UNIX_EPOCH},
15};
16
17use cfx_parameters::consensus_internal::ELASTICITY_MULTIPLIER;
18use parking_lot::{Mutex, RwLock};
19use slab::Slab;
20use tokio::sync::mpsc::error::TryRecvError;
21use unexpected::{Mismatch, OutOfBounds};
22
23use cfx_executor::machine::Machine;
24use cfx_types::{H256, U256};
25use cfxcore_errors::ProviderBlockError;
26use dag::{Graph, RichDAG, RichTreeGraph, TreeGraph, DAG};
27use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
28use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
29use metrics::{
30 register_meter_with_group, register_queue, Meter, MeterTimer, Queue,
31};
32use primitives::{
33 pos::PosBlockId, transaction::SignedTransaction, Block, BlockHeader,
34 EpochNumber,
35};
36
37use crate::{
38 block_data_manager::{BlockDataManager, BlockStatus},
39 channel::Channel,
40 consensus::{pos_handler::PosVerifier, SharedConsensusGraph},
41 core_error::{BlockError, CoreError as Error},
42 pow::{PowComputer, ProofOfWorkConfig},
43 state_exposer::{SyncGraphBlockState, STATE_EXPOSER},
44 statistics::SharedStatistics,
45 sync::synchronization_protocol_handler::FutureBlockContainer,
46 verification::*,
47 Notifications,
48};
49
lazy_static! {
    // Timers measuring header / block insertion latency into the sync graph.
    static ref SYNC_INSERT_HEADER: Arc<dyn Meter> =
        register_meter_with_group("timer", "sync::insert_block_header");
    static ref SYNC_INSERT_BLOCK: Arc<dyn Meter> =
        register_meter_with_group("timer", "sync::insert_block");
    // Queue gauge for blocks pending in the consensus worker; dequeued in the
    // worker loop below (enqueue presumably happens elsewhere in this file).
    static ref CONSENSUS_WORKER_QUEUE: Arc<dyn Queue> =
        register_queue("consensus_worker_queue");
}
58
/// Sentinel arena index meaning "no node" (e.g. unknown or reclaimed parent).
const NULL: usize = !0;
// Per-node graph status. The numeric order matters: readiness checks compare
// `graph_status >= minimal_status`, so larger values mean "more ready".
const BLOCK_INVALID: u8 = 0;
const BLOCK_HEADER_ONLY: u8 = 1;
const BLOCK_HEADER_GRAPH_READY: u8 = 2;
const BLOCK_GRAPH_READY: u8 = 3;
64
/// Static configuration for the synchronization graph.
#[derive(Copy, Clone)]
pub struct SyncGraphConfig {
    /// Capacity of the container buffering blocks with future timestamps.
    pub future_block_buffer_capacity: usize,
    /// Whether to expose sync-graph state (presumably via `STATE_EXPOSER`
    /// for debugging) — usage is outside this view, confirm against callers.
    pub enable_state_expose: bool,
    /// Consortium (PoA-like) mode; difficulty verification is skipped when
    /// set (see `verify_header_graph_ready_block`).
    pub is_consortium: bool,
}
71
/// Counters for headers/blocks admitted into the synchronization graph.
///
/// Both counters start at 1 rather than 0 — NOTE(review): the off-by-one
/// initial value is preserved from the original code (possibly counting the
/// genesis block); confirm before changing.
#[derive(Debug)]
pub struct SyncGraphStatistics {
    pub inserted_block_count: usize,
    pub inserted_header_count: usize,
}

impl SyncGraphStatistics {
    /// Create statistics with both counters at their initial value.
    pub fn new() -> SyncGraphStatistics {
        SyncGraphStatistics {
            inserted_header_count: 1,
            inserted_block_count: 1,
        }
    }

    /// Reset both counters back to their initial value.
    pub fn clear(&mut self) {
        self.inserted_header_count = 1;
        self.inserted_block_count = 1;
    }
}

// Idiomatic companion to `new()`; lets the struct participate in
// `Default`-based construction without changing existing callers.
impl Default for SyncGraphStatistics {
    fn default() -> Self { Self::new() }
}
92
/// A node in the synchronization graph arena, one per known block header.
#[derive(DeriveMallocSizeOf)]
pub struct SynchronizationGraphNode {
    pub block_header: Arc<BlockHeader>,
    /// One of `BLOCK_INVALID` / `BLOCK_HEADER_ONLY` /
    /// `BLOCK_HEADER_GRAPH_READY` / `BLOCK_GRAPH_READY`.
    pub graph_status: u8,
    /// Whether the full block body (not just the header) has been received.
    pub block_ready: bool,
    /// Set when the parent node was reclaimed from the arena (old era);
    /// `parent` is `NULL` then and readiness falls back to the DB check.
    pub parent_reclaimed: bool,
    /// Arena index of the parent, or `NULL` if absent/reclaimed.
    pub parent: usize,
    /// Arena indices of nodes whose parent is this node.
    pub children: Vec<usize>,
    /// Arena indices of referees (referenced blocks) present in the arena.
    pub referees: Vec<usize>,
    /// Number of referee hashes not yet resolved to arena indices.
    pub pending_referee_count: usize,
    /// Arena indices of nodes that reference this node as a referee.
    pub referrers: Vec<usize>,
    /// Unix timestamp (seconds) when this node was created/last updated.
    pub last_update_timestamp: u64,
}
116
117#[derive(DeriveMallocSizeOf)]
118pub struct UnreadyBlockFrontier {
119 frontier: HashSet<usize>,
120 updated: bool,
121}
122
123impl UnreadyBlockFrontier {
124 fn new() -> Self {
125 UnreadyBlockFrontier {
126 frontier: HashSet::new(),
127 updated: false,
128 }
129 }
130
131 pub fn reset_update_state(&mut self) { self.updated = false; }
132
133 pub fn updated(&self) -> bool { self.updated }
134
135 pub fn get_frontier(&self) -> &HashSet<usize> { &self.frontier }
136
137 pub fn remove(&mut self, index: &usize) -> bool {
138 self.updated = true;
139 self.frontier.remove(index)
140 }
141
142 pub fn contains(&self, index: &usize) -> bool {
143 self.frontier.contains(index)
144 }
145
146 pub fn insert(&mut self, index: usize) -> bool {
147 self.updated = true;
148 self.frontier.insert(index)
149 }
150
151 pub fn len(&self) -> usize { self.frontier.len() }
152}
153
/// The mutable core of the synchronization graph: an arena of block nodes
/// plus the bookkeeping needed to resolve edges lazily as headers arrive.
pub struct SynchronizationGraphInner {
    pub arena: Slab<SynchronizationGraphNode>,
    /// Block hash -> arena index for every node currently in the arena.
    pub hash_to_arena_indices: HashMap<H256, usize>,
    pub data_man: Arc<BlockDataManager>,
    /// Children waiting for a parent that is not in the arena yet, keyed by
    /// the missing parent's hash; resolved in `insert`/`insert_invalid`.
    children_by_hash: HashMap<H256, Vec<usize>>,
    /// Referrers waiting for a referee not in the arena yet, keyed by the
    /// missing referee's hash; resolved in `insert`/`insert_invalid`.
    referrers_by_hash: HashMap<H256, Vec<usize>>,
    pub pow_config: ProofOfWorkConfig,
    pub pow: Arc<PowComputer>,
    pub config: SyncGraphConfig,
    /// Frontier of blocks that are not yet header-graph-ready.
    pub not_ready_blocks_frontier: UnreadyBlockFrontier,

    /// Blocks whose only missing prerequisite is their PoS reference's pivot
    /// decision (populated in `new_to_be_graph_ready`).
    pub pos_not_ready_blocks_frontier: HashSet<usize>,
    /// Queue of candidates for old-era reclamation (see
    /// `try_clear_old_era_blocks`), with its membership set alongside.
    pub old_era_blocks_frontier: VecDeque<usize>,
    pub old_era_blocks_frontier_set: HashSet<usize>,

    /// NOTE(review): set/read outside this view — presumably true while the
    /// node is catching up; confirm against the rest of the file.
    pub locked_for_catchup: bool,
    /// Hashes of blocks whose bodies still need to be fetched; usage is
    /// outside this view apart from maintenance in `remove_blocks`.
    pub block_to_fill_set: HashSet<H256>,
    machine: Arc<Machine>,
    pub pos_verifier: Arc<PosVerifier>,
}
187
188impl MallocSizeOf for SynchronizationGraphInner {
189 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
190 self.arena.size_of(ops)
191 + self.hash_to_arena_indices.size_of(ops)
192 + self.data_man.size_of(ops)
193 + self.children_by_hash.size_of(ops)
194 + self.referrers_by_hash.size_of(ops)
195 + self.pow_config.size_of(ops)
196 + self.not_ready_blocks_frontier.size_of(ops)
197 + self.old_era_blocks_frontier.size_of(ops)
198 + self.old_era_blocks_frontier_set.size_of(ops)
199 }
201}
202
203impl SynchronizationGraphInner {
    /// Build the inner graph containing only `genesis_header`.
    ///
    /// The genesis node is inserted first (so `insert` sees it as the era
    /// genesis and marks it `BLOCK_GRAPH_READY`) and then seeded into the
    /// old-era reclamation frontier.
    pub fn with_genesis_block(
        genesis_header: Arc<BlockHeader>, pow_config: ProofOfWorkConfig,
        pow: Arc<PowComputer>, config: SyncGraphConfig,
        data_man: Arc<BlockDataManager>, machine: Arc<Machine>,
        pos_verifier: Arc<PosVerifier>,
    ) -> Self {
        let mut inner = SynchronizationGraphInner {
            arena: Slab::new(),
            hash_to_arena_indices: HashMap::new(),
            data_man,
            children_by_hash: HashMap::new(),
            referrers_by_hash: HashMap::new(),
            pow_config,
            pow,
            config,
            not_ready_blocks_frontier: UnreadyBlockFrontier::new(),
            pos_not_ready_blocks_frontier: Default::default(),
            old_era_blocks_frontier: Default::default(),
            old_era_blocks_frontier_set: Default::default(),
            block_to_fill_set: Default::default(),
            locked_for_catchup: false,
            machine,
            pos_verifier,
        };
        let genesis_hash = genesis_header.hash();
        let genesis_block_index = inner.insert(genesis_header);
        debug!(
            "genesis block {:?} has index {} in sync graph",
            genesis_hash, genesis_block_index
        );

        // Genesis is the starting point for old-era reclamation.
        inner.old_era_blocks_frontier.push_back(genesis_block_index);
        inner
            .old_era_blocks_frontier_set
            .insert(genesis_block_index);

        inner
    }
242
243 fn get_genesis_in_current_era(&self) -> usize {
244 let genesis_hash = self.data_man.get_cur_consensus_era_genesis_hash();
245 *self.hash_to_arena_indices.get(&genesis_hash).unwrap()
246 }
247
248 pub fn get_genesis_hash_and_height_in_current_era(&self) -> (H256, u64) {
249 let era_genesis = self.get_genesis_in_current_era();
250 (
251 self.arena[era_genesis].block_header.hash(),
252 self.arena[era_genesis].block_header.height(),
253 )
254 }
255
256 pub fn get_stable_hash_and_height_in_current_era(&self) -> (H256, u64) {
257 let stable_hash = self.data_man.get_cur_consensus_era_stable_hash();
258 let height = self
262 .data_man
263 .block_header_by_hash(&stable_hash)
264 .expect("stable block must exist in data manager")
265 .height();
266 (stable_hash, height)
267 }
268
    /// Reclaim a bounded number of old-era blocks from the arena.
    ///
    /// Pops candidates off `old_era_blocks_frontier`. A candidate is
    /// reclaimed only when its parent edge is already gone (`parent == NULL`,
    /// asserted) and the DB records it as graph-ready; reclamation unlinks
    /// all in-memory edges, promotes its children onto the frontier, frees
    /// the arena slot and removes the header from the data manager. At most
    /// two blocks are cleared per call to bound latency on the hot path.
    fn try_clear_old_era_blocks(&mut self) {
        let max_num_of_cleared_blocks = 2;
        let mut num_cleared = 0;
        let era_genesis = self.get_genesis_in_current_era();
        let genesis_seq_num = self
            .data_man
            .local_block_info_by_hash(
                &self.data_man.get_cur_consensus_era_genesis_hash(),
            )
            .expect("local_block_info for genesis must exist")
            .get_seq_num();
        let mut era_genesis_in_frontier = false;

        while let Some(index) = self.old_era_blocks_frontier.pop_front() {
            // Never reclaim the era genesis itself; remember to put it back.
            if index == era_genesis {
                era_genesis_in_frontier = true;
                continue;
            }

            // Stale queue entry (block already removed elsewhere): skip.
            if !self.old_era_blocks_frontier_set.contains(&index) {
                continue;
            }

            let hash = self.arena[index].block_header.hash();
            assert!(self.arena[index].parent == NULL);

            // Note: the candidate is dropped from the queue in this case and
            // is only re-queued when a child is reclaimed later via the set.
            if !self.is_graph_ready_in_db(&hash, genesis_seq_num) {
                continue;
            }

            // Unlink referee -> referrer edges pointing back at `index`.
            let referees: Vec<usize> =
                self.arena[index].referees.iter().map(|x| *x).collect();
            for referee in referees {
                self.arena[referee].referrers.retain(|&x| x != index);
            }
            // Drop `index` from the waiting lists of referees that were
            // never resolved into the arena.
            let referee_hashes: Vec<H256> = self.arena[index]
                .block_header
                .referee_hashes()
                .iter()
                .map(|x| *x)
                .collect();
            for referee_hash in referee_hashes {
                let mut remove_referee_hash: bool = false;
                if let Some(referrers) =
                    self.referrers_by_hash.get_mut(&referee_hash)
                {
                    referrers.retain(|&x| x != index);
                    remove_referee_hash = referrers.len() == 0;
                }
                // Drop the entry entirely once its waiting list is empty.
                if remove_referee_hash {
                    self.referrers_by_hash.remove(&referee_hash);
                }
            }

            // Children lose their in-memory parent and become the next
            // reclamation candidates.
            let children: Vec<usize> =
                self.arena[index].children.iter().map(|x| *x).collect();
            for child in children {
                self.arena[child].parent = NULL;
                self.arena[child].parent_reclaimed = true;
                self.old_era_blocks_frontier.push_back(child);
                assert!(!self.old_era_blocks_frontier_set.contains(&child));
                self.old_era_blocks_frontier_set.insert(child);
            }

            // Unlink referrer -> referee edges pointing at `index`.
            let referrers: Vec<usize> =
                self.arena[index].referrers.iter().map(|x| *x).collect();
            for referrer in referrers {
                self.arena[referrer].referees.retain(|&x| x != index);
            }

            self.old_era_blocks_frontier_set.remove(&index);
            self.arena.remove(index);
            self.hash_to_arena_indices.remove(&hash);
            self.data_man
                .remove_block_header(&hash, false);

            num_cleared += 1;
            if num_cleared == max_num_of_cleared_blocks {
                break;
            }
        }

        if era_genesis_in_frontier {
            self.old_era_blocks_frontier.push_front(era_genesis);
        }
    }
369
    /// Insert a header already judged invalid, returning its arena index.
    ///
    /// The node enters with status `BLOCK_INVALID`. Only waiters are wired
    /// up: children/referrers queued under this hash in `children_by_hash` /
    /// `referrers_by_hash` get their edges resolved. Unlike `insert`, no
    /// parent/referee edges are created *from* this node.
    pub fn insert_invalid(&mut self, header: Arc<BlockHeader>) -> usize {
        let hash = header.hash();
        let me = self.arena.insert(SynchronizationGraphNode {
            graph_status: BLOCK_INVALID,
            block_ready: false,
            parent_reclaimed: false,
            parent: NULL,
            children: Vec::new(),
            referees: Vec::new(),
            pending_referee_count: 0,
            referrers: Vec::new(),
            block_header: header,
            last_update_timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
        });
        self.hash_to_arena_indices.insert(hash, me);

        // Resolve children that were waiting for this hash as their parent.
        if let Some(children) = self.children_by_hash.remove(&hash) {
            for child in &children {
                self.arena[*child].parent = me;
            }
            self.arena[me].children = children;
        }
        // Resolve referrers that were waiting for this hash as a referee and
        // decrement their pending counts.
        if let Some(referrers) = self.referrers_by_hash.remove(&hash) {
            for referrer in &referrers {
                let ref mut node_referrer = self.arena[*referrer];
                node_referrer.referees.push(me);
                debug_assert!(node_referrer.pending_referee_count > 0);
                if node_referrer.pending_referee_count > 0 {
                    node_referrer.pending_referee_count =
                        node_referrer.pending_referee_count - 1;
                }
            }
            self.arena[me].referrers = referrers;
        }

        me
    }
410
    /// Insert a header into the graph, returning its arena index.
    ///
    /// The era genesis is inserted directly as `BLOCK_GRAPH_READY` with
    /// `block_ready == true`; every other block starts as
    /// `BLOCK_HEADER_ONLY`. Parent/referee edges are resolved to arena
    /// indices when the counterpart is present; otherwise this node is
    /// queued in `children_by_hash` / `referrers_by_hash` until it arrives.
    pub fn insert(&mut self, header: Arc<BlockHeader>) -> usize {
        let hash = header.hash();
        let is_genesis =
            hash == self.data_man.get_cur_consensus_era_genesis_hash();

        let me = self.arena.insert(SynchronizationGraphNode {
            graph_status: if is_genesis {
                BLOCK_GRAPH_READY
            } else {
                BLOCK_HEADER_ONLY
            },
            block_ready: is_genesis,
            parent_reclaimed: false,
            parent: NULL,
            children: Vec::new(),
            referees: Vec::new(),
            pending_referee_count: 0,
            referrers: Vec::new(),
            block_header: header.clone(),
            last_update_timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
        });
        self.hash_to_arena_indices.insert(hash, me);

        // Link (or queue) the parent edge; genesis has no parent here.
        if !is_genesis {
            let parent_hash = header.parent_hash().clone();
            if let Some(parent) =
                self.hash_to_arena_indices.get(&parent_hash).cloned()
            {
                self.arena[me].parent = parent;
                self.arena[parent].children.push(me);
            } else {
                self.children_by_hash
                    .entry(parent_hash)
                    .or_insert(Vec::new())
                    .push(me);
            }
        }
        // Link each referee in the arena, or queue under its hash and count
        // it as pending.
        for referee_hash in header.referee_hashes() {
            if let Some(referee) =
                self.hash_to_arena_indices.get(referee_hash).cloned()
            {
                self.arena[me].referees.push(referee);
                self.arena[referee].referrers.push(me);
            } else {
                self.arena[me].pending_referee_count =
                    self.arena[me].pending_referee_count + 1;
                self.referrers_by_hash
                    .entry(*referee_hash)
                    .or_insert(Vec::new())
                    .push(me);
            }
        }

        // Resolve children that were waiting for this hash as their parent.
        if let Some(children) = self.children_by_hash.remove(&hash) {
            for child in &children {
                self.arena[*child].parent = me;
            }
            self.arena[me].children = children;
        }
        // Resolve referrers waiting for this hash as a referee, decrementing
        // their pending counts.
        if let Some(referrers) = self.referrers_by_hash.remove(&hash) {
            for referrer in &referrers {
                let ref mut node_referrer = self.arena[*referrer];
                node_referrer.referees.push(me);
                debug_assert!(node_referrer.pending_referee_count > 0);
                if node_referrer.pending_referee_count > 0 {
                    node_referrer.pending_referee_count =
                        node_referrer.pending_referee_count - 1;
                }
            }
            self.arena[me].referrers = referrers;
        }

        me
    }
489
490 fn is_graph_ready_in_db(
493 &self, parent_or_referee_hash: &H256, genesis_seq_num: u64,
494 ) -> bool {
495 if let Some(info) = self
496 .data_man
497 .local_block_info_by_hash(parent_or_referee_hash)
498 {
499 if info.get_status() == BlockStatus::Invalid {
500 false
501 } else {
502 info.get_seq_num() < genesis_seq_num
503 || info.get_instance_id() == self.data_man.get_instance_id()
504 }
505 } else {
506 false
507 }
508 }
509
    /// Check whether `index` has just reached `minimal_status` readiness:
    /// its parent and all referees must be at least `minimal_status` in the
    /// arena, or graph-ready in the DB when already reclaimed/absent, and
    /// its PoS reference (if any) must be satisfied.
    ///
    /// Side effects: marks `parent_reclaimed` when the parent is only ready
    /// via the DB, and records the block in `pos_not_ready_blocks_frontier`
    /// when only the PoS prerequisite is missing.
    fn new_to_be_graph_ready(
        &mut self, index: usize, minimal_status: u8,
    ) -> bool {
        let ref node_me = self.arena[index];
        // Already at (or past) the target status: nothing "new" here.
        if node_me.graph_status >= minimal_status {
            return false;
        }

        let genesis_hash = self.data_man.get_cur_consensus_era_genesis_hash();
        let genesis_seq_num = self
            .data_man
            .local_block_info_by_hash(&genesis_hash)
            .expect("local_block_info for genesis must exist")
            .get_seq_num();
        let parent = self.arena[index].parent;
        // Parent readiness: via the arena when linked, otherwise via the
        // reclaimed flag or the persisted DB status.
        let parent_graph_ready = if parent == NULL {
            self.arena[index].parent_reclaimed
                || self.is_graph_ready_in_db(
                    self.arena[index].block_header.parent_hash(),
                    genesis_seq_num,
                )
        } else {
            self.arena[parent].graph_status >= minimal_status
        };

        if !parent_graph_ready {
            return false;
        } else if parent == NULL {
            // Remember that the (absent) parent is satisfied through the DB.
            self.arena[index].parent_reclaimed = true;
        }

        // Referees linked in the arena must reach the status in memory; the
        // rest are checked against the DB below.
        let mut referee_hash_in_mem = HashSet::new();
        for referee in self.arena[index].referees.iter() {
            if self.arena[*referee].graph_status < minimal_status {
                return false;
            } else {
                referee_hash_in_mem
                    .insert(self.arena[*referee].block_header.hash());
            }
        }

        for referee_hash in self.arena[index].block_header.referee_hashes() {
            if !referee_hash_in_mem.contains(referee_hash) {
                if !self.is_graph_ready_in_db(referee_hash, genesis_seq_num) {
                    return false;
                }
            }
        }

        if !self.is_pos_reference_graph_ready(
            index,
            genesis_seq_num,
            minimal_status,
        ) {
            // `unwrap` is safe here: a missing pos_reference makes
            // `is_pos_reference_graph_ready` return true.
            debug!(
                "Block {:?} not ready for its pos_reference: {:?}",
                self.arena[index].block_header.hash(),
                self.pos_verifier.get_pivot_decision(
                    self.arena[index]
                        .block_header
                        .pos_reference()
                        .as_ref()
                        .unwrap()
                )
            );
            // Track it so it can be re-checked when PoS data advances.
            self.pos_not_ready_blocks_frontier.insert(index);
            return false;
        }

        true
    }
594
    /// True if `index` just became header-graph-ready; may mutate frontier
    /// bookkeeping (see `new_to_be_graph_ready`).
    fn new_to_be_header_graph_ready(&mut self, index: usize) -> bool {
        self.new_to_be_graph_ready(index, BLOCK_HEADER_GRAPH_READY)
    }
598
    /// True if `index` just became block-graph-ready (graph-ready AND body
    /// received). Note: `new_to_be_graph_ready` has side effects, so the
    /// order of the `&&` operands matters.
    fn new_to_be_block_graph_ready(&mut self, index: usize) -> bool {
        self.new_to_be_graph_ready(index, BLOCK_GRAPH_READY)
            && self.arena[index].block_ready
    }
603
604 fn is_pos_reference_graph_ready(
605 &self, index: usize, genesis_seq_num: u64, minimal_status: u8,
606 ) -> bool {
607 match self.arena[index].block_header.pos_reference() {
609 Some(pos_reference) => {
612 match self.pos_verifier.get_pivot_decision(pos_reference) {
613 None => false,
615 Some(pivot_decision) => {
616 match self.hash_to_arena_indices.get(&pivot_decision) {
618 None => self.is_graph_ready_in_db(
619 &pivot_decision,
620 genesis_seq_num,
621 ),
622 Some(index) => {
623 self.arena[*index].graph_status
624 >= minimal_status
625 }
626 }
627 }
628 }
629 }
630 None => true,
631 }
632 }
633
    /// Collect parent metadata plus the PoS references of all predecessors
    /// (parent and referees) of `index`.
    ///
    /// Returns `(parent_height, parent_timestamp, parent_gas_limit,
    /// parent_difficulty, pos_references)`. Predecessors no longer in the
    /// arena are read from the data manager; the `unwrap`s rely on those
    /// headers still existing there (the block is header-graph-ready when
    /// this is called from `verify_header_graph_ready_block`).
    fn get_parent_and_referee_info(
        &self, index: usize,
    ) -> (u64, u64, U256, U256, Vec<Option<PosBlockId>>) {
        let parent_height;
        let parent_timestamp;
        let parent_gas_limit;
        let parent_difficulty;
        let mut pos_references = Vec::new();
        let parent = self.arena[index].parent;

        if parent != NULL {
            // Parent still in the arena: read everything in memory.
            parent_height = self.arena[parent].block_header.height();
            parent_timestamp = self.arena[parent].block_header.timestamp();
            parent_gas_limit = *self.arena[parent].block_header.gas_limit();
            parent_difficulty = *self.arena[parent].block_header.difficulty();
            pos_references
                .push(self.arena[parent].block_header.pos_reference().clone())
        } else {
            // Parent reclaimed: fall back to the data manager.
            let parent_hash = self.arena[index].block_header.parent_hash();
            let parent_header = self
                .data_man
                .block_header_by_hash(parent_hash)
                .unwrap()
                .clone();
            parent_height = parent_header.height();
            parent_timestamp = parent_header.timestamp();
            parent_gas_limit = *parent_header.gas_limit();
            parent_difficulty = *parent_header.difficulty();
            pos_references.push(parent_header.pos_reference().clone());
        }

        // Referees in the arena, remembering their hashes so the DB pass
        // below only touches the reclaimed ones.
        let mut referee_hash_in_mem = HashSet::new();
        for referee in self.arena[index].referees.iter() {
            pos_references.push(
                self.arena[*referee].block_header.pos_reference().clone(),
            );
            referee_hash_in_mem
                .insert(self.arena[*referee].block_header.hash());
        }

        for referee_hash in self.arena[index].block_header.referee_hashes() {
            if !referee_hash_in_mem.contains(referee_hash) {
                let referee_header = self
                    .data_man
                    .block_header_by_hash(referee_hash)
                    .unwrap()
                    .clone();
                pos_references.push(referee_header.pos_reference().clone());
            }
        }

        (
            parent_height,
            parent_timestamp,
            parent_gas_limit,
            parent_difficulty,
            pos_references,
        )
    }
700
    /// Validate a header once it is header-graph-ready, i.e. once parent and
    /// referee data are available: height, timestamp monotonicity, gas-limit
    /// bounds (with the CIP-1559 elasticity bump at the transition height),
    /// difficulty (unless in consortium mode), and the PoS reference.
    fn verify_header_graph_ready_block(
        &self, index: usize,
    ) -> Result<(), Error> {
        let epoch = self.arena[index].block_header.height();
        let (
            parent_height,
            parent_timestamp,
            parent_gas_limit,
            parent_difficulty,
            predecessor_pos_references,
        ) = self.get_parent_and_referee_info(index);

        // Height must be exactly parent height + 1.
        if parent_height + 1 != epoch {
            warn!("Invalid height. mine {}, parent {}", epoch, parent_height);
            return Err(From::from(BlockError::InvalidHeight(Mismatch {
                expected: parent_height + 1,
                found: epoch,
            })));
        }

        // Timestamps must be non-decreasing along the parent edge.
        let my_timestamp = self.arena[index].block_header.timestamp();
        if parent_timestamp > my_timestamp {
            let my_timestamp = UNIX_EPOCH + Duration::from_secs(my_timestamp);
            let pa_timestamp =
                UNIX_EPOCH + Duration::from_secs(parent_timestamp);

            warn!("Invalid timestamp: parent {:?} timestamp {}, me {:?} timestamp {}",
                self.arena[index].block_header.parent_hash().clone(),
                parent_timestamp,
                self.arena[index].block_header.hash(),
                self.arena[index].block_header.timestamp());
            // NOTE(review): `max` is the block's own timestamp, so `found ==
            // max`; looks odd but is kept as-is — confirm before changing.
            return Err(From::from(BlockError::InvalidTimestamp(
                OutOfBounds {
                    max: Some(my_timestamp),
                    min: Some(pa_timestamp),
                    found: my_timestamp,
                },
            )));
        }

        // At the CIP-1559 activation height the target gas limit doubles, so
        // bound-check against the scaled parent limit.
        let parent_gas_limit = parent_gas_limit
            * if epoch == self.machine.params().transition_heights.cip1559 {
                ELASTICITY_MULTIPLIER
            } else {
                1
            };

        // Gas limit may only drift by 1/divisor per block and never below
        // the configured minimum.
        let self_gas_limit = *self.arena[index].block_header.gas_limit();
        let gas_limit_divisor = self.machine.params().gas_limit_bound_divisor;
        let min_gas_limit = self.machine.params().min_gas_limit;
        let gas_upper =
            parent_gas_limit + parent_gas_limit / gas_limit_divisor - 1;
        let gas_lower = max(
            parent_gas_limit - parent_gas_limit / gas_limit_divisor + 1,
            min_gas_limit,
        );

        if self_gas_limit < gas_lower || self_gas_limit > gas_upper {
            return Err(From::from(BlockError::InvalidGasLimit(OutOfBounds {
                min: Some(gas_lower),
                max: Some(gas_upper),
                found: self_gas_limit,
            })));
        }

        // Difficulty rules do not apply in consortium (PoA-like) mode.
        if !self.config.is_consortium {
            let mut difficulty_invalid = false;
            let my_diff = *self.arena[index].block_header.difficulty();
            let mut min_diff = my_diff;
            let mut max_diff = my_diff;
            let initial_difficulty: U256 =
                self.pow_config.initial_difficulty.into();

            if parent_height
                < self
                    .pow_config
                    .difficulty_adjustment_epoch_period(parent_height)
            {
                // Before the first adjustment period: difficulty is fixed at
                // the initial value.
                if my_diff != initial_difficulty {
                    difficulty_invalid = true;
                    min_diff = initial_difficulty;
                    max_diff = initial_difficulty;
                }
            } else {
                // Last height at which an adjustment took place.
                let last_period_upper = (parent_height
                    / self
                        .pow_config
                        .difficulty_adjustment_epoch_period(parent_height))
                    * self
                        .pow_config
                        .difficulty_adjustment_epoch_period(parent_height);
                if last_period_upper != parent_height {
                    // Mid-period: difficulty must not change.
                    if my_diff != parent_difficulty {
                        difficulty_invalid = true;
                        min_diff = parent_difficulty;
                        max_diff = parent_difficulty;
                    }
                } else {
                    // At a period boundary: difficulty must fall within the
                    // configured adjustment bounds.
                    let (lower, upper) =
                        self.pow_config.get_adjustment_bound(parent_difficulty);
                    min_diff = lower;
                    max_diff = upper;
                    if my_diff < min_diff || my_diff > max_diff {
                        difficulty_invalid = true;
                    }
                }
            }

            if difficulty_invalid {
                return Err(From::from(BlockError::InvalidDifficulty(
                    OutOfBounds {
                        min: Some(min_diff),
                        max: Some(max_diff),
                        found: my_diff,
                    },
                )));
            }
        }

        // A PoS reference must be consistent with the PoS references of all
        // predecessors (parent + referees).
        if let Some(pos_reference) =
            self.arena[index].block_header.pos_reference()
        {
            let mut pred_pos_ref_list = Vec::new();
            for maybe_pos_ref in predecessor_pos_references {
                if let Some(pos_ref) = maybe_pos_ref {
                    pred_pos_ref_list.push(pos_ref);
                }
            }
            if !self
                .pos_verifier
                .verify_against_predecessors(pos_reference, &pred_pos_ref_list)
            {
                bail!(BlockError::InvalidPosReference);
            }
        }

        Ok(())
    }
861
862 fn verify_graph_ready_block(
863 &self, index: usize, verification_config: &VerificationConfig,
864 ) -> Result<(), Error> {
865 let block_header = &self.arena[index].block_header;
866 let parent = self
867 .data_man
868 .block_header_by_hash(block_header.parent_hash())
869 .expect("headers will not be deleted");
870 let block = self
871 .data_man
872 .block_by_hash(&block_header.hash(), true)
873 .expect("received");
874 verification_config.verify_sync_graph_ready_block(&block, &parent)
875 }
876
877 fn process_invalid_blocks(&mut self, invalid_set: &HashSet<usize>) {
878 for index in invalid_set {
879 let hash = self.arena[*index].block_header.hash();
880 self.data_man.invalidate_block(hash);
883 }
884 self.remove_blocks(&invalid_set);
885 }
886
    /// Remove the given nodes from the in-memory graph.
    ///
    /// For each index this clears membership in all frontiers/sets, unlinks
    /// parent/child and referee/referrer edges (both arena edges and the
    /// `*_by_hash` waiting lists), frees the arena slot, and tells the data
    /// manager to discard the now-useless block. Children and referrers of a
    /// removed node are expected to be in `to_remove_set` too (debug
    /// asserted), otherwise dangling edges would remain.
    fn remove_blocks(&mut self, to_remove_set: &HashSet<usize>) {
        for index in to_remove_set {
            let hash = self.arena[*index].block_header.hash();
            // Drop from every auxiliary set the node might be tracked in.
            self.not_ready_blocks_frontier.remove(index);
            self.pos_not_ready_blocks_frontier.remove(index);
            self.old_era_blocks_frontier_set.remove(index);
            self.block_to_fill_set.remove(&hash);

            // Unlink from the parent: arena edge if linked, waiting list
            // otherwise.
            let parent = self.arena[*index].parent;
            if parent != NULL {
                self.arena[parent].children.retain(|&x| x != *index);
            }
            let parent_hash = *self.arena[*index].block_header.parent_hash();
            let mut remove_parent_hash: bool = false;
            if let Some(children) = self.children_by_hash.get_mut(&parent_hash)
            {
                children.retain(|&x| x != *index);
                remove_parent_hash = children.len() == 0;
            }
            // Drop the waiting-list entry entirely once it is empty.
            if remove_parent_hash {
                self.children_by_hash.remove(&parent_hash);
            }

            // Unlink referee -> referrer back-edges pointing at this node.
            let referees: Vec<usize> =
                self.arena[*index].referees.iter().map(|x| *x).collect();
            for referee in referees {
                self.arena[referee].referrers.retain(|&x| x != *index);
            }
            // And the waiting lists of referees never resolved into the
            // arena.
            let referee_hashes: Vec<H256> = self.arena[*index]
                .block_header
                .referee_hashes()
                .iter()
                .map(|x| *x)
                .collect();
            for referee_hash in referee_hashes {
                let mut remove_referee_hash: bool = false;
                if let Some(referrers) =
                    self.referrers_by_hash.get_mut(&referee_hash)
                {
                    referrers.retain(|&x| x != *index);
                    remove_referee_hash = referrers.len() == 0;
                }
                if remove_referee_hash {
                    self.referrers_by_hash.remove(&referee_hash);
                }
            }

            // Children must be scheduled for removal as well.
            let children: Vec<usize> =
                self.arena[*index].children.iter().map(|x| *x).collect();
            for child in children {
                debug_assert!(to_remove_set.contains(&child));
                self.arena[child].parent = NULL;
            }

            // Same for referrers.
            let referrers: Vec<usize> =
                self.arena[*index].referrers.iter().map(|x| *x).collect();
            for referrer in referrers {
                debug_assert!(to_remove_set.contains(&referrer));
                self.arena[referrer].referees.retain(|&x| x != *index);
            }

            self.arena.remove(*index);
            self.hash_to_arena_indices.remove(&hash);
            self.data_man
                .remove_useless_block(&hash, true);
        }
    }
959
960 fn set_and_propagate_invalid(
961 &mut self, queue: &mut VecDeque<usize>,
962 invalid_set: &mut HashSet<usize>, index: usize,
963 ) {
964 let children =
965 mem::replace(&mut self.arena[index].children, Vec::new());
966 for child in &children {
967 if !invalid_set.contains(&child) {
968 self.arena[*child].graph_status = BLOCK_INVALID;
969 queue.push_back(*child);
970 invalid_set.insert(*child);
971 }
972 }
973 self.arena[index].children = children;
974 let referrers =
975 mem::replace(&mut self.arena[index].referrers, Vec::new());
976 for referrer in &referrers {
977 if !invalid_set.contains(&referrer) {
978 self.arena[*referrer].graph_status = BLOCK_INVALID;
979 queue.push_back(*referrer);
980 invalid_set.insert(*referrer);
981 }
982 }
983 self.arena[index].referrers = referrers;
984 }
985}
986
/// Owns the background "Consensus Worker" thread; `stop` (also run on drop)
/// unsubscribes it and joins it.
struct ConsensusWorkerHandle {
    /// Join handle; taken out exactly once when stopping.
    thread: Mutex<Option<thread::JoinHandle<()>>>,
    /// Channel the worker receives new block hashes from.
    new_block_hashes: Arc<Channel<H256>>,
    /// Subscription id used to unsubscribe the worker on stop.
    worker_subscription_id: u64,
}
996
impl ConsensusWorkerHandle {
    /// Unsubscribe the worker from the hash channel (ending its blocking
    /// receive) and join the thread. Safe to call repeatedly: the join
    /// handle is `take`n, so subsequent calls find `None` and do nothing.
    fn stop(&self) {
        self.new_block_hashes
            .unsubscribe(self.worker_subscription_id);
        // The mutex guard (an if-let temporary) is held across the join.
        if let Some(handle) = self.thread.lock().take() {
            handle.join().expect("Consensus Worker should not panic");
        }
    }
}
1006
impl Drop for ConsensusWorkerHandle {
    // Ensure the worker thread is stopped and joined when the handle drops.
    fn drop(&mut self) { self.stop(); }
}
1010
/// The public synchronization graph: wraps the inner graph state and feeds
/// graph-ready blocks to consensus via a background worker thread.
pub struct SynchronizationGraph {
    pub inner: Arc<RwLock<SynchronizationGraphInner>>,
    pub consensus: SharedConsensusGraph,
    pub data_man: Arc<BlockDataManager>,
    pub pow: Arc<PowComputer>,
    pub verification_config: VerificationConfig,
    pub sync_config: SyncGraphConfig,
    pub statistics: SharedStatistics,
    /// Blocks handed to the consensus worker but not yet processed;
    /// decremented by the worker after each `on_new_block`.
    consensus_unprocessed_count: Arc<AtomicUsize>,

    /// Channel delivering new block hashes to the consensus worker.
    new_block_hashes: Arc<Channel<H256>>,

    /// Buffer for blocks whose timestamps are ahead of local time.
    pub future_blocks: FutureBlockContainer,

    machine: Arc<Machine>,

    /// Kept alive so the worker thread is joined when the graph drops.
    #[allow(unused)]
    consensus_worker_handle: ConsensusWorkerHandle,
}
1038
1039impl MallocSizeOf for SynchronizationGraph {
1040 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
1041 let inner_size = self.inner.read().size_of(ops);
1042 let mut malloc_size = inner_size + self.data_man.size_of(ops);
1043
1044 if !self.is_consortium() {
1046 let consensus_graph = &*self.consensus;
1047 malloc_size += consensus_graph.size_of(ops);
1048 }
1049 malloc_size
1052 }
1053}
1054
/// Shared, reference-counted handle to the synchronization graph.
pub type SharedSynchronizationGraph = Arc<SynchronizationGraph>;
1056
1057impl SynchronizationGraph {
    /// Build the synchronization graph around the current era genesis and
    /// spawn the background "Consensus Worker" thread.
    ///
    /// The worker receives new block hashes over
    /// `notifications.new_block_hashes`, holds each block back until all of
    /// its predecessors (parent, referees, and — if present — the pivot
    /// decision of its PoS reference) have been delivered to consensus, and
    /// then calls `consensus.on_new_block` in priority order.
    pub fn new(
        consensus: SharedConsensusGraph, data_man: Arc<BlockDataManager>,
        statistics: SharedStatistics, verification_config: VerificationConfig,
        pow_config: ProofOfWorkConfig, pow: Arc<PowComputer>,
        sync_config: SyncGraphConfig, notifications: Arc<Notifications>,
        machine: Arc<Machine>, pos_verifier: Arc<PosVerifier>,
    ) -> Self {
        let genesis_hash = data_man.get_cur_consensus_era_genesis_hash();
        let genesis_block_header = data_man
            .block_header_by_hash(&genesis_hash)
            .expect("genesis block header should exist here");

        // Count of blocks handed to the worker but not yet processed by
        // consensus; shared with the worker thread spawned below.
        let consensus_unprocessed_count = Arc::new(AtomicUsize::new(0));
        let mut consensus_receiver = notifications.new_block_hashes.subscribe();
        let inner = Arc::new(RwLock::new(
            SynchronizationGraphInner::with_genesis_block(
                genesis_block_header.clone(),
                pow_config,
                pow.clone(),
                sync_config,
                data_man.clone(),
                machine.clone(),
                pos_verifier.clone(),
            ),
        ));
        let worker_subscription_id = consensus_receiver.id;

        let worker_data_man = data_man.clone();
        let worker_consensus = consensus.clone();
        let worker_unprocessed_count = consensus_unprocessed_count.clone();

        let handle = thread::Builder::new()
            .name("Consensus Worker".into())
            .spawn(move || {
                // Blocks whose predecessors have all been processed, keyed by
                // (parent epoch number, hash); `BinaryHeap` is a max-heap, so
                // the largest key is popped first.
                let mut priority_queue: BinaryHeap<(u64, H256)> = BinaryHeap::new();
                // hash -> successors waiting on it. Key presence doubles as
                // the "already seen" set for duplicate detection.
                let mut reverse_map : HashMap<H256, Vec<H256>> = HashMap::new();
                // hash -> number of still-unprocessed predecessors.
                let mut counter_map = HashMap::new();
                let mut pos_started = false;

                'outer: loop {
                    // Only block on the channel when nothing is ready to be
                    // fed into consensus.
                    let mut blocking = priority_queue.is_empty();
                    'inner: loop {
                        let maybe_item = if blocking {
                            // Block at most once per outer iteration.
                            blocking = false;
                            match consensus_receiver.recv_blocking() {
                                Some(item) => Ok(item),
                                // Channel closed: shut the worker down.
                                None => break 'outer, }
                        } else {
                            consensus_receiver.try_recv()
                        };

                        match maybe_item {
                            Ok(hash) => if !reverse_map.contains_key(&hash) {
                                debug!("Worker thread receive: block = {}", hash);
                                let header = worker_data_man.block_header_by_hash(&hash).expect("Header must exist before sending to the consensus worker!");

                                // Lazily start PoS one era before it becomes
                                // required at this block's height.
                                if !pos_started && pos_verifier.is_enabled_at_height(header.height() + worker_consensus.config().inner_conf.era_epoch_count) {
                                    if let Err(e) = pos_verifier.initialize(worker_consensus.clone()) {
                                        info!("PoS cannot be started at the expected height: e={}", e);
                                    } else {
                                        pos_started = true;
                                    }
                                }

                                // Register this block as a successor of each
                                // predecessor still pending in the worker.
                                let mut cnt: usize = 0;
                                let parent_hash = header.parent_hash();
                                if let Some(v) = reverse_map.get_mut(parent_hash) {
                                    v.push(hash.clone());
                                    cnt += 1;
                                }
                                for referee in header.referee_hashes() {
                                    if let Some(v) = reverse_map.get_mut(referee) {
                                        v.push(hash.clone());
                                        cnt += 1;
                                    }
                                }
                                // A PoS reference's pivot decision is also a
                                // prerequisite.
                                if let Some(pivot_decision) = header.pos_reference().as_ref().and_then(|pos_reference| pos_verifier.get_pivot_decision(pos_reference)) {
                                    if let Some(v) = reverse_map.get_mut(&pivot_decision) {
                                        v.push(hash.clone());
                                        cnt += 1;
                                    }
                                }
                                reverse_map.insert(hash.clone(), Vec::new());
                                if cnt == 0 {
                                    // No pending predecessors: ready now.
                                    let epoch_number = worker_consensus.get_block_epoch_number(parent_hash).unwrap_or(0);
                                    priority_queue.push((epoch_number, hash));
                                } else {
                                    counter_map.insert(hash, cnt);
                                }
                            } else {
                                warn!("Duplicate block = {} sent to the consensus worker", hash);
                            },
                            Err(TryRecvError::Empty) => break 'inner,
                            Err(TryRecvError::Disconnected) => break 'outer,
                        }
                    }
                    // Deliver one ready block per outer iteration, releasing
                    // any successors whose last prerequisite this was.
                    if let Some((_, hash)) = priority_queue.pop() {
                        CONSENSUS_WORKER_QUEUE.dequeue(1);
                        let successors = reverse_map.remove(&hash).unwrap();
                        for succ in successors {
                            let cnt_tuple = counter_map.get_mut(&succ).unwrap();
                            *cnt_tuple -= 1;
                            if *cnt_tuple == 0 {
                                counter_map.remove(&succ);
                                let header_succ = worker_data_man.block_header_by_hash(&succ).expect("Header must exist before sending to the consensus worker!");
                                let parent_succ = header_succ.parent_hash();
                                let epoch_number = worker_consensus.get_block_epoch_number(parent_succ).unwrap_or(0);
                                priority_queue.push((epoch_number, succ));
                            }
                        }
                        worker_consensus.on_new_block(
                            &hash,
                        );
                        worker_unprocessed_count.fetch_sub(1, Ordering::SeqCst);
                    }
                }
            })
            .expect("Cannot fail");

        let consensus_worker_handle = ConsensusWorkerHandle {
            thread: Mutex::new(Some(handle)),
            new_block_hashes: notifications.new_block_hashes.clone(),
            worker_subscription_id,
        };

        let sync_graph = SynchronizationGraph {
            inner: inner.clone(),
            future_blocks: FutureBlockContainer::new(
                sync_config.future_block_buffer_capacity,
            ),
            data_man: data_man.clone(),
            pow: pow.clone(),
            verification_config,
            sync_config,
            consensus: consensus.clone(),
            statistics: statistics.clone(),
            consensus_unprocessed_count: consensus_unprocessed_count.clone(),
            new_block_hashes: notifications.new_block_hashes.clone(),
            machine,
            consensus_worker_handle,
        };
        sync_graph
    }
1218
1219 pub fn is_consortium(&self) -> bool { self.sync_config.is_consortium }
1220
1221 pub fn machine(&self) -> Arc<Machine> { self.machine.clone() }
1222
1223 pub fn get_genesis_hash_and_height_in_current_era(&self) -> (H256, u64) {
1224 self.inner
1225 .read()
1226 .get_genesis_hash_and_height_in_current_era()
1227 }
1228
    /// Expected difficulty for a block whose parent is `parent_hash`;
    /// fully delegated to the consensus graph.
    pub fn expected_difficulty(&self, parent_hash: &H256) -> U256 {
        self.consensus.expected_difficulty(parent_hash)
    }
1234
1235 pub fn get_to_propagate_trans(
1236 &self,
1237 ) -> HashMap<H256, Arc<SignedTransaction>> {
1238 self.consensus.tx_pool().get_to_be_propagated_transactions()
1239 }
1240
1241 pub fn set_to_propagate_trans(
1242 &self, transactions: HashMap<H256, Arc<SignedTransaction>>,
1243 ) {
1244 self.consensus
1245 .tx_pool()
1246 .set_to_be_propagated_transactions(transactions);
1247 }
1248
    /// Rebuild the sync graph from blocks persisted in the database.
    ///
    /// Starts from the stored terminal hashes and walks backwards through
    /// parent/referee edges, re-inserting every reachable header until the
    /// current era genesis (or a pre-checkpoint block) is hit. Headers
    /// missing from the DB are collected into `missed_hashes` (currently
    /// only tracked locally). Panics if the era-genesis block info is
    /// absent, since recovery cannot be anchored without it.
    pub fn recover_graph_from_db(&self) {
        info!("Start fast recovery of the block DAG from database");

        // The current era genesis anchors recovery: blocks with a smaller
        // sequence number belong to earlier eras and are skipped below.
        let genesis_hash = self.data_man.get_cur_consensus_era_genesis_hash();
        let genesis_local_info =
            self.data_man.local_block_info_by_hash(&genesis_hash);
        if genesis_local_info.is_none() {
            panic!(
                "failed to get local block info from db for genesis[{}]",
                genesis_hash
            );
        }
        let genesis_seq_num = genesis_local_info.unwrap().get_seq_num();
        self.consensus.set_initial_sequence_number(genesis_seq_num);
        let genesis_header =
            self.data_man.block_header_by_hash(&genesis_hash).unwrap();
        debug!(
            "Get current genesis_block hash={:?}, height={}, seq_num={}",
            genesis_hash,
            genesis_header.height(),
            genesis_seq_num
        );

        // Without terminals there is nothing to recover from.
        let terminals_opt = self.data_man.terminals_from_db();
        if terminals_opt.is_none() {
            return;
        }
        let terminals = terminals_opt.unwrap();
        debug!("Get terminals {:?}", terminals);

        // BFS backwards from the terminals; `visited_blocks` prevents
        // re-enqueueing shared ancestors.
        let mut queue = VecDeque::new();
        let mut visited_blocks: HashSet<H256> = HashSet::new();
        for terminal in terminals {
            if !visited_blocks.contains(&terminal) {
                queue.push_back(terminal);
                visited_blocks.insert(terminal);
            }
        }

        let mut missed_hashes = HashSet::new();
        while let Some(hash) = queue.pop_front() {
            if hash == genesis_hash {
                // The era genesis itself is not re-inserted.
                continue;
            }

            if let Some(block_local_info) =
                self.data_man.local_block_info_by_hash(&hash)
            {
                // Blocks sequenced before the era genesis belong to an
                // earlier era and are outside the recovered graph.
                if block_local_info.get_seq_num() < genesis_seq_num {
                    debug!(
                        "Skip block {:?} before checkpoint: seq_num={}",
                        hash,
                        block_local_info.get_seq_num()
                    );
                    continue;
                }
            }

            if let Some(block_header) =
                self.data_man.block_header_by_hash(&hash)
            {
                self.insert_block_header(
                    &mut block_header.as_ref().clone(),
                    true,  // need_to_verify
                    false, // bench_mode
                    true,  // insert_to_consensus
                    false, // persistent (already in DB)
                );
                // Continue the backward walk through parent and referees.
                let parent = block_header.parent_hash().clone();
                let referees = block_header.referee_hashes().clone();
                if !visited_blocks.contains(&parent) {
                    queue.push_back(parent);
                    visited_blocks.insert(parent);
                }
                for referee in referees {
                    if !visited_blocks.contains(&referee) {
                        queue.push_back(referee);
                        visited_blocks.insert(referee);
                    }
                }
            } else {
                missed_hashes.insert(hash);
            }
        }

        debug!(
            "Current frontier after recover from db: {:?}",
            self.inner.read().not_ready_blocks_frontier.get_frontier()
        );

        info!("Finish reconstructing the pivot chain of length {}, start to sync from peers", self.consensus.best_epoch_number());
    }
1364
1365 pub fn block_header_by_hash(&self, hash: &H256) -> Option<BlockHeader> {
1367 if !self.contains_block_header(hash) {
1368 return None;
1370 }
1371 self.data_man
1372 .block_header_by_hash(hash)
1373 .map(|header_ref| header_ref.as_ref().clone())
1374 }
1375
1376 pub fn block_height_by_hash(&self, hash: &H256) -> Option<u64> {
1378 self.block_header_by_hash(hash)
1379 .map(|header| header.height())
1380 }
1381
1382 pub fn block_timestamp_by_hash(&self, hash: &H256) -> Option<u64> {
1384 self.block_header_by_hash(hash)
1385 .map(|header| header.timestamp())
1386 }
1387
    /// Fetch the full block from the data manager.
    /// NOTE(review): the literal `true` is presumably an `update_cache`
    /// flag — confirm against `BlockDataManager::block_by_hash`.
    pub fn block_by_hash(&self, hash: &H256) -> Option<Arc<Block>> {
        self.data_man.block_by_hash(hash, true)
    }
1393
1394 pub fn contains_block_header(&self, hash: &H256) -> bool {
1395 self.inner.read().hash_to_arena_indices.contains_key(hash)
1396 || self.future_blocks.contains(hash)
1397 }
1398
1399 fn parent_or_referees_invalid(&self, header: &BlockHeader) -> bool {
1400 self.data_man.verified_invalid(header.parent_hash()).0
1401 || header
1402 .referee_hashes()
1403 .iter()
1404 .any(|referee| self.data_man.verified_invalid(referee).0)
1405 }
1406
    /// BFS from `frontier_index_list`, promoting nodes to
    /// `BLOCK_HEADER_GRAPH_READY` and propagating invalidity.
    ///
    /// For each node that becomes header-graph-ready: it is verified (when
    /// `need_to_verify`), optionally persisted (only for
    /// `header_index_to_insert` when `persistent`), and optionally handed
    /// to the consensus worker (`insert_to_consensus`). Returns the set of
    /// arena indices found invalid and the hashes of body-complete blocks
    /// that should be relayed to peers.
    fn propagate_header_graph_status(
        &self, inner: &mut SynchronizationGraphInner,
        frontier_index_list: Vec<usize>, need_to_verify: bool,
        header_index_to_insert: usize, insert_to_consensus: bool,
        persistent: bool,
    ) -> (HashSet<usize>, Vec<H256>) {
        // Wall-clock seconds; stamped on nodes as they become ready so
        // expiry logic can later age out stale entries.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let mut need_to_relay: Vec<H256> = Vec::new();
        let mut invalid_set: HashSet<usize> = HashSet::new();
        let mut queue = VecDeque::new();

        for index in frontier_index_list {
            if inner.arena[index].graph_status == BLOCK_INVALID {
                invalid_set.insert(index);
            }
            queue.push_back(index);
        }

        while let Some(index) = queue.pop_front() {
            if inner.arena[index].graph_status == BLOCK_INVALID {
                // Invalidity is contagious: descendants are marked too.
                inner.set_and_propagate_invalid(
                    &mut queue,
                    &mut invalid_set,
                    index,
                );
            } else if inner.new_to_be_header_graph_ready(index) {
                inner.arena[index].graph_status = BLOCK_HEADER_GRAPH_READY;
                inner.arena[index].last_update_timestamp = now;
                debug!("BlockIndex {} parent_index {} hash {:?} is header graph ready", index,
                    inner.arena[index].parent, inner.arena[index].block_header.hash());

                let r = inner.verify_header_graph_ready_block(index);

                if need_to_verify && r.is_err() {
                    warn!(
                        "Invalid header_arc! inserted_header={:?} err={:?}",
                        inner.arena[index].block_header.clone(),
                        r
                    );
                    // Failed graph-context verification: flip to invalid
                    // and propagate before continuing the BFS.
                    invalid_set.insert(index);
                    inner.arena[index].graph_status = BLOCK_INVALID;
                    inner.set_and_propagate_invalid(
                        &mut queue,
                        &mut invalid_set,
                        index,
                    );
                    continue;
                }

                // Nodes whose parent was already reclaimed join the
                // old-era frontier for later cleanup.
                if inner.arena[index].parent_reclaimed {
                    inner.old_era_blocks_frontier.push_back(index);
                    inner.old_era_blocks_frontier_set.insert(index);
                }

                // Persist only the header this call was invoked for.
                if index == header_index_to_insert && persistent {
                    self.data_man.insert_block_header(
                        inner.arena[index].block_header.hash(),
                        inner.arena[index].block_header.clone(),
                        true,
                    );
                }
                if insert_to_consensus {
                    // Account for the block before notifying; the worker
                    // side decrements after processing.
                    CONSENSUS_WORKER_QUEUE.enqueue(1);

                    self.consensus_unprocessed_count
                        .fetch_add(1, Ordering::SeqCst);
                    assert!(
                        self.new_block_hashes
                            .send(inner.arena[index].block_header.hash(),),
                        "consensus receiver dropped"
                    );

                    // Advance the not-ready frontiers past this node.
                    inner.not_ready_blocks_frontier.remove(&index);
                    inner.pos_not_ready_blocks_frontier.remove(&index);
                    for child in &inner.arena[index].children {
                        inner.not_ready_blocks_frontier.insert(*child);
                    }
                }

                // Only blocks whose body is present are worth relaying.
                if inner.arena[index].block_ready {
                    need_to_relay.push(inner.arena[index].block_header.hash());
                }

                // Children/referrers may now be unblocked; re-examine any
                // that are not yet header-graph-ready.
                for child in &inner.arena[index].children {
                    if inner.arena[*child].graph_status
                        < BLOCK_HEADER_GRAPH_READY
                    {
                        queue.push_back(*child);
                    }
                }
                for referrer in &inner.arena[index].referrers {
                    if inner.arena[*referrer].graph_status
                        < BLOCK_HEADER_GRAPH_READY
                    {
                        queue.push_back(*referrer);
                    }
                }
            } else {
                debug!(
                    "BlockIndex {} parent_index {} hash {:?} is not ready",
                    index,
                    inner.arena[index].parent,
                    inner.arena[index].block_header.hash()
                );
                // Still persist the requested header even if not ready yet.
                if index == header_index_to_insert && persistent {
                    self.data_man.insert_block_header(
                        inner.arena[index].block_header.hash(),
                        inner.arena[index].block_header.clone(),
                        true,
                    );
                }
            }
        }
        (invalid_set, need_to_relay)
    }
1538
    /// Insert a block header into the sync graph.
    ///
    /// Returns the insertion outcome plus hashes of blocks that became
    /// ready to relay. `header` is `&mut` so PoW quality can be filled in
    /// place on duplicates. `bench_mode` skips the local PoW sanity check;
    /// `insert_to_consensus` pushes header-graph-ready blocks straight to
    /// the consensus worker; `persistent` controls writing the header to
    /// the data manager.
    pub fn insert_block_header(
        &self, header: &mut BlockHeader, need_to_verify: bool,
        bench_mode: bool, insert_to_consensus: bool, persistent: bool,
    ) -> (BlockHeaderInsertionResult, Vec<H256>) {
        let _timer = MeterTimer::time_func(SYNC_INSERT_HEADER.as_ref());
        self.statistics.inc_sync_graph_inserted_header_count();
        let inner = &mut *self.inner.write();
        if inner.locked_for_catchup {
            // During catch-up no new headers are accepted.
            return (BlockHeaderInsertionResult::TemporarySkipped, Vec::new());
        }
        let hash = header.hash();

        // Known-invalid headers are rejected immediately.
        let (invalid, local_info_opt) = self.data_man.verified_invalid(&hash);
        if invalid {
            return (BlockHeaderInsertionResult::Invalid, Vec::new());
        }

        if let Some(info) = local_info_opt {
            // Already handled by consensus when it predates the current
            // era genesis or was processed by this node instance.
            let already_processed = info.get_seq_num()
                < self.consensus.current_era_genesis_seq_num()
                || info.get_instance_id() == self.data_man.get_instance_id();
            if already_processed {
                if need_to_verify && !self.is_consortium() {
                    // Still fill PoW quality so the caller's header is
                    // usable.
                    VerificationConfig::get_or_fill_header_pow_quality(
                        &self.pow, header,
                    );
                }
                return (
                    BlockHeaderInsertionResult::AlreadyProcessedInConsensus,
                    Vec::new(),
                );
            }
        }

        // Duplicate within the sync graph itself.
        if inner.hash_to_arena_indices.contains_key(&hash) {
            if need_to_verify {
                VerificationConfig::get_or_fill_header_pow_quality(
                    &self.pow, header,
                );
            }
            return (
                BlockHeaderInsertionResult::AlreadyProcessedInSync,
                Vec::new(),
            );
        }

        debug!("is_consortium={:?}", self.is_consortium());
        // Consortium mode bypasses header verification entirely; otherwise
        // the header fails if its parent/referees are invalid or its own
        // parameters do not verify.
        let verification_passed = if need_to_verify {
            self.is_consortium()
                || !(self.parent_or_referees_invalid(header)
                    || self
                        .verification_config
                        .verify_header_params(&self.pow, header)
                        .or_else(|e| {
                            warn!(
                                "Invalid header: err={} header={:?}",
                                e, header
                            );
                            Err(e)
                        })
                        .is_err())
        } else {
            // Locally produced blocks: PoW must still hold unless
            // benchmarking.
            if !bench_mode && !self.is_consortium() {
                self.verification_config
                    .verify_pow(&self.pow, header)
                    .expect("local mined block should pass this check!");
            }
            true
        };

        let header_arc = Arc::new(header.clone());
        // Invalid headers are still inserted (as invalid) so invalidity
        // can propagate to descendants.
        let me = if verification_passed {
            inner.insert(header_arc.clone())
        } else {
            inner.insert_invalid(header_arc.clone())
        };

        if inner.arena[me].graph_status != BLOCK_GRAPH_READY {
            // The new node joins the not-ready frontier when its parent is
            // missing/graph-ready (or header-ready in consensus-insert
            // mode); any children already on the frontier are superseded.
            if inner.arena[me].parent == NULL
                || inner.arena[inner.arena[me].parent].graph_status
                    == BLOCK_GRAPH_READY
                || (insert_to_consensus
                    && inner.arena[inner.arena[me].parent].graph_status
                        == BLOCK_HEADER_GRAPH_READY)
            {
                inner.not_ready_blocks_frontier.insert(me);
            }
            let mut to_be_removed = Vec::new();
            for child in &inner.arena[me].children {
                if inner.not_ready_blocks_frontier.contains(child) {
                    to_be_removed.push(*child);
                }
            }
            for x in to_be_removed {
                inner.not_ready_blocks_frontier.remove(&x);
            }
        }

        debug!("insert_block_header() Block = {:?}, index = {}, need_to_verify = {}, bench_mode = {} insert_to_consensus = {}",
            header.hash(), me, need_to_verify, bench_mode, insert_to_consensus);

        // Propagate readiness/invalidity from the new node outward.
        let (invalid_set, need_to_relay) = self.propagate_header_graph_status(
            inner,
            vec![me],
            need_to_verify,
            me,
            insert_to_consensus,
            persistent,
        );

        let me_invalid = invalid_set.contains(&me);

        inner.process_invalid_blocks(&invalid_set);

        if me_invalid {
            return (BlockHeaderInsertionResult::Invalid, need_to_relay);
        }

        inner.try_clear_old_era_blocks();

        (BlockHeaderInsertionResult::NewValid, need_to_relay)
    }
1685
1686 pub fn contains_block(&self, hash: &H256) -> bool {
1687 let inner = self.inner.read();
1688 if let Some(index) = inner.hash_to_arena_indices.get(hash) {
1689 inner.arena[*index].block_ready
1690 } else {
1691 false
1692 }
1693 }
1694
    /// Mark `index` as `BLOCK_GRAPH_READY`, hand the block off to the
    /// consensus worker, and advance the not-ready frontier to its
    /// children.
    fn set_graph_ready(
        &self, inner: &mut SynchronizationGraphInner, index: usize,
    ) {
        inner.arena[index].graph_status = BLOCK_GRAPH_READY;

        // The node leaves both not-ready frontiers; its children become
        // the new frontier candidates.
        inner.not_ready_blocks_frontier.remove(&index);
        inner.pos_not_ready_blocks_frontier.remove(&index);
        for child in &inner.arena[index].children {
            inner.not_ready_blocks_frontier.insert(*child);
        }

        let h = inner.arena[index].block_header.hash();
        debug!("Block {:?} is graph ready", h);
        // Account for the block before notifying; the consensus worker
        // decrements the counter after it processes the hash.
        CONSENSUS_WORKER_QUEUE.enqueue(1);

        self.consensus_unprocessed_count
            .fetch_add(1, Ordering::SeqCst);
        assert!(self.new_block_hashes.send(h), "consensus receiver dropped");

        // Optionally mirror the newly ready block into the state exposer
        // for external inspection.
        if inner.config.enable_state_expose {
            STATE_EXPOSER.sync_graph.lock().ready_block_vec.push(
                SyncGraphBlockState {
                    block_hash: h,
                    parent: inner.arena[index]
                        .block_header
                        .parent_hash()
                        .clone(),
                    referees: inner.arena[index]
                        .block_header
                        .referee_hashes()
                        .clone(),
                    nonce: inner.arena[index].block_header.nonce(),
                    timestamp: inner.arena[index].block_header.timestamp(),
                    adaptive: inner.arena[index].block_header.adaptive(),
                },
            );
        }
    }
1736
    /// BFS from `frontier_index_list`, promoting nodes whose bodies have
    /// arrived to `BLOCK_GRAPH_READY` and propagating invalidity.
    ///
    /// Each candidate is verified with full block context before being
    /// marked ready; failures flip it to `BLOCK_INVALID` and poison its
    /// descendants. Returns the set of invalid arena indices.
    fn propagate_graph_status(
        &self, inner: &mut SynchronizationGraphInner,
        frontier_index_list: Vec<usize>,
    ) -> HashSet<usize> {
        let mut queue = VecDeque::new();
        let mut invalid_set = HashSet::new();
        for index in frontier_index_list {
            if inner.arena[index].graph_status == BLOCK_INVALID {
                invalid_set.insert(index);
            }
            queue.push_back(index);
        }

        while let Some(index) = queue.pop_front() {
            if inner.arena[index].graph_status == BLOCK_INVALID {
                // Invalidity is contagious: descendants are marked too.
                inner.set_and_propagate_invalid(
                    &mut queue,
                    &mut invalid_set,
                    index,
                );
            } else if inner.new_to_be_block_graph_ready(index) {
                let verify_result = inner
                    .verify_graph_ready_block(index, &self.verification_config);
                if verify_result.is_err() {
                    warn!(
                        "Invalid block! inserted_header={:?} err={:?}",
                        inner.arena[index].block_header.clone(),
                        verify_result
                    );
                    invalid_set.insert(index);
                    inner.arena[index].graph_status = BLOCK_INVALID;
                    inner.set_and_propagate_invalid(
                        &mut queue,
                        &mut invalid_set,
                        index,
                    );
                    continue;
                }
                // Hand off to consensus and advance the frontier.
                self.set_graph_ready(inner, index);
                // Children/referrers may now be unblocked; they cannot be
                // graph-ready yet because this node just became so.
                for child in &inner.arena[index].children {
                    debug_assert!(
                        inner.arena[*child].graph_status < BLOCK_GRAPH_READY
                    );
                    queue.push_back(*child);
                }
                for referrer in &inner.arena[index].referrers {
                    debug_assert!(
                        inner.arena[*referrer].graph_status < BLOCK_GRAPH_READY
                    );
                    queue.push_back(*referrer);
                }
            } else {
                trace!("Block index {:?} not block_graph_ready, current frontier: {:?}", index, inner.not_ready_blocks_frontier.get_frontier());
            }
        }

        invalid_set
    }
1796
    /// Insert a full block (body) whose header is already in the sync
    /// graph.
    ///
    /// Returns `Ignored` when the header is unknown, `AlreadyProcessed`
    /// when the body was already present, `RequestAgain` on a
    /// transactions-root mismatch (likely a bad/partial body), `Invalid`
    /// on verification failure, and otherwise whether the block should be
    /// relayed. `recover_from_db` suppresses re-persisting the body.
    pub fn insert_block(
        &self, block: Block, need_to_verify: bool, persistent: bool,
        recover_from_db: bool,
    ) -> BlockInsertionResult {
        let _timer = MeterTimer::time_func(SYNC_INSERT_BLOCK.as_ref());
        let hash = block.hash();

        debug!("insert_block {:?}", hash);

        let inner = &mut *self.inner.write();

        // A body without a known header is dropped: the header must be
        // inserted first.
        let contains_block =
            if let Some(index) = inner.hash_to_arena_indices.get(&hash) {
                inner.arena[*index].block_ready
            } else {
                return BlockInsertionResult::Ignored;
            };

        if contains_block {
            return BlockInsertionResult::AlreadyProcessed;
        }

        debug_assert!(!self.data_man.verified_invalid(&hash).0);

        self.statistics.inc_sync_graph_inserted_block_count();

        let me = *inner.hash_to_arena_indices.get(&hash).unwrap();

        debug_assert!(hash == inner.arena[me].block_header.hash());
        debug_assert!(!inner.arena[me].block_ready);
        inner.arena[me].block_ready = true;

        if need_to_verify {
            let r = self.verification_config.verify_sync_graph_block_basic(
                &block,
                self.consensus.best_chain_id(),
            );
            match r {
                Err(Error::Block(BlockError::InvalidTransactionsRoot(e))) => {
                    // Body does not match the header's transactions root:
                    // roll back `block_ready` and ask the caller to
                    // re-request the body instead of poisoning the block.
                    warn!("BlockTransactionRoot not match! inserted_block={:?} err={:?}", block, e);
                    inner.arena[me].block_ready = false;
                    return BlockInsertionResult::RequestAgain;
                }
                Err(e) => {
                    warn!(
                        "Invalid block! inserted_block={:?} err={:?}",
                        block.block_header, e
                    );
                    inner.arena[me].graph_status = BLOCK_INVALID;
                }
                _ => {}
            };
        }

        let block = Arc::new(block);
        if inner.arena[me].graph_status != BLOCK_INVALID {
            // Persist compact block + body unless we are replaying from DB
            // (in which case they are already stored).
            if !recover_from_db {
                self.data_man.insert_compact_block(block.to_compact());
                self.data_man.insert_block_body(
                    block.hash(),
                    block.clone(),
                    persistent,
                );
            }
        }

        if inner.locked_for_catchup {
            // Catch-up mode: bodies are only being back-filled; readiness
            // propagation happens when catch-up completes.
            if inner.arena[me].graph_status == BLOCK_INVALID {
                let invalid_set = self.propagate_graph_status(inner, vec![me]);
                inner.process_invalid_blocks(&invalid_set);
                return BlockInsertionResult::Invalid;
            } else {
                debug!("Downloaded block body for {:?}", hash);
                inner.block_to_fill_set.remove(&hash);
                return BlockInsertionResult::AlreadyProcessed;
            }
        }

        let invalid_set = self.propagate_graph_status(inner, vec![me]);

        inner.process_invalid_blocks(&invalid_set);

        debug!(
            "new block inserted into graph: block_header={:?}, tx_count={}, block_size={}",
            block.block_header,
            block.transactions.len(),
            block.size(),
        );

        // Relay only blocks that are at least header-graph-ready.
        if invalid_set.contains(&me) {
            BlockInsertionResult::Invalid
        } else if inner.arena[me].graph_status >= BLOCK_HEADER_GRAPH_READY {
            BlockInsertionResult::ShouldRelay
        } else {
            BlockInsertionResult::SuccessWithoutRelay
        }
    }
1919
1920 pub fn get_all_block_hashes_by_epoch(
1921 &self, epoch_number: u64,
1922 ) -> Result<Vec<H256>, ProviderBlockError> {
1923 let mut res = self.consensus.get_skipped_block_hashes_by_epoch(
1924 EpochNumber::Number(epoch_number.into()),
1925 )?;
1926 res.append(&mut self.consensus.get_block_hashes_by_epoch(
1927 EpochNumber::Number(epoch_number.into()),
1928 )?);
1929 Ok(res)
1930 }
1931
    /// Emit the collected sync statistics to the log.
    pub fn log_statistics(&self) { self.statistics.log_statistics(); }
1933
    /// Periodic heartbeat that lets consensus refresh its total-weight
    /// delta bookkeeping; fully delegated.
    pub fn update_total_weight_delta_heartbeat(&self) {
        self.consensus.update_total_weight_delta_heartbeat();
    }
1937
    /// Number of blocks currently cached by the data manager.
    pub fn block_count(&self) -> usize { self.data_man.cached_block_count() }
1942
1943 pub fn remove_expire_blocks(&self, expire_time: u64) {
1948 let inner = &mut *self.inner.write();
1949 let now = SystemTime::now()
1950 .duration_since(UNIX_EPOCH)
1951 .unwrap()
1952 .as_secs();
1953 let frontier = inner.not_ready_blocks_frontier.get_frontier().clone();
1954 let all_not_ready: HashSet<_> = inner.get_future(frontier);
1955 let mut expire_set = HashSet::new();
1956 for index in all_not_ready {
1957 if inner.arena[index].last_update_timestamp + expire_time < now {
1958 expire_set.insert(index);
1959 }
1960 }
1961
1962 let all_expire: HashSet<_> = inner.get_future(expire_set);
1964 debug!("all_expire: {:?}", all_expire);
1965 inner.remove_blocks(&all_expire);
1966 }
1967
1968 pub fn remove_blocks_and_future(&self, to_remove_set: &HashSet<H256>) {
1971 let mut inner = self.inner.write();
1972 let mut index_set = Vec::new();
1973 for block_hash in to_remove_set {
1974 if let Some(index) = inner.hash_to_arena_indices.get(block_hash) {
1975 index_set.push(*index);
1976 }
1977 }
1978 let index_set_and_future: HashSet<_> = inner.get_future(index_set);
1979 inner.remove_blocks(&index_set_and_future);
1980 }
1981
1982 pub fn is_consensus_worker_busy(&self) -> bool {
1983 self.consensus_unprocessed_count.load(Ordering::SeqCst) != 0
1984 }
1985
1986 pub fn is_fill_block_completed(&self) -> bool {
1987 self.inner.read().block_to_fill_set.is_empty()
1988 }
1989
    /// Finalize the catch-up phase after block bodies have been
    /// downloaded.
    ///
    /// Promotes header-graph-ready nodes to graph-ready, drops nodes that
    /// are still not ready, and — if consensus still reports blocks
    /// needing bodies — resets consensus and replays the whole sync graph
    /// in topological order. Returns `true` when catch-up is complete;
    /// `false` when more bodies must be fetched (the new download set is
    /// stored in `block_to_fill_set`).
    pub fn complete_filling_block_bodies(&self) -> bool {
        // Inner scope bounds the write-lock so it is released before
        // `construct_pivot_state` runs.
        {
            let inner = &mut *self.inner.write();

            // Promote nodes whose bodies were just filled; collect every
            // node that is still not graph-ready for removal.
            let to_remove = {
                let arena = &mut inner.arena;
                inner
                    .hash_to_arena_indices
                    .iter()
                    .filter_map(|(_, index)| {
                        let graph_node = &mut arena[*index];
                        if graph_node.graph_status == BLOCK_HEADER_GRAPH_READY {
                            graph_node.block_ready = true;
                            graph_node.graph_status = BLOCK_GRAPH_READY;
                        }
                        if graph_node.graph_status != BLOCK_GRAPH_READY {
                            Some(*index)
                        } else {
                            None
                        }
                    })
                    .collect()
            };
            inner.remove_blocks(&to_remove);

            let skipped_body_blocks =
                self.consensus.get_blocks_needing_bodies();
            if !skipped_body_blocks.is_empty() {
                warn!("Has invalid blocks after downloading block bodies!");
                // Consensus state is inconsistent with the filled bodies:
                // rebuild it from scratch and replay all remaining blocks
                // in topological order.
                self.consensus.reset();

                let all_block_indices: HashSet<_> = inner
                    .hash_to_arena_indices
                    .iter()
                    .map(|(_, i)| *i)
                    .collect();
                let sorted_blocks = inner.topological_sort(all_block_indices);
                for i in sorted_blocks {
                    self.consensus
                        .on_new_block(&inner.arena[i].block_header.hash());
                }
                // If the replay exposes yet more missing bodies, schedule
                // them and report catch-up as unfinished.
                let new_to_fill_blocks: HashSet<_> =
                    self.consensus.get_blocks_needing_bodies();
                if !new_to_fill_blocks.is_empty() {
                    warn!(
                        "{} new block bodies to get",
                        new_to_fill_blocks.len()
                    );
                    inner.block_to_fill_set = new_to_fill_blocks;
                    return false;
                }
            }
        }
        self.consensus.construct_pivot_state();
        self.inner.write().locked_for_catchup = false;
        true
    }
2069
    /// Re-examine the PoS-blocked frontier and propagate any node that has
    /// become (header-)graph-ready.
    ///
    /// With `header_only` set, newly header-ready nodes are also pushed to
    /// the consensus worker; otherwise full block readiness is checked and
    /// propagated as well.
    /// NOTE(review): both branches iterate `pos_not_ready_blocks_frontier`
    /// only — confirm `not_ready_blocks_frontier` is intentionally covered
    /// by other code paths.
    pub fn check_not_ready_frontier(&self, header_only: bool) {
        debug!("check_not_ready_frontier starts");
        let mut inner = self.inner.write();
        if inner.locked_for_catchup {
            // During catch-up, readiness is resolved by body filling.
            return;
        }
        if header_only {
            for b in inner.pos_not_ready_blocks_frontier.clone() {
                debug!(
                    "check_not_ready_frontier: check {:?}",
                    inner.arena[b].block_header.hash()
                );
                if inner.new_to_be_header_graph_ready(b) {
                    self.propagate_header_graph_status(
                        &mut *inner,
                        vec![b],
                        true, // need_to_verify
                        b,    // header_index_to_insert
                        true, // insert_to_consensus
                        true, // persistent
                    );
                }
            }
        } else {
            for b in inner.pos_not_ready_blocks_frontier.clone() {
                debug!(
                    "check_not_ready_frontier: check {:?}",
                    inner.arena[b].block_header.hash()
                );
                if inner.new_to_be_header_graph_ready(b) {
                    self.propagate_header_graph_status(
                        &mut *inner,
                        vec![b],
                        true,  // need_to_verify
                        b,     // header_index_to_insert
                        false, // insert_to_consensus
                        true,  // persistent
                    );
                }
                if inner.new_to_be_block_graph_ready(b) {
                    debug!("new graph ready found");
                    self.propagate_graph_status(&mut *inner, vec![b]);
                }
            }
        }
    }
2124}
2125
/// Sync-graph nodes are addressed by their arena index.
impl Graph for SynchronizationGraphInner {
    type NodeIndex = usize;
}
2129
2130impl TreeGraph for SynchronizationGraphInner {
2131 fn parent(&self, node_index: Self::NodeIndex) -> Option<Self::NodeIndex> {
2132 if self.arena[node_index].parent != NULL {
2133 Some(self.arena[node_index].parent)
2134 } else {
2135 None
2136 }
2137 }
2138
2139 fn referees(&self, node_index: Self::NodeIndex) -> Vec<Self::NodeIndex> {
2140 self.arena[node_index].referees.clone()
2141 }
2142}
2143
2144impl RichTreeGraph for SynchronizationGraphInner {
2145 fn children(&self, node_index: Self::NodeIndex) -> Vec<Self::NodeIndex> {
2146 self.arena[node_index].children.clone()
2147 }
2148
2149 fn referrers(&self, node_index: Self::NodeIndex) -> Vec<Self::NodeIndex> {
2150 self.arena[node_index].referrers.clone()
2151 }
2152}
2153
/// Outcome of inserting a full block into the sync graph.
///
/// Derives are added so callers can log (`Debug`), copy, and compare
/// outcomes directly; the enum is a plain fieldless discriminant so
/// `Copy` is free.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockInsertionResult {
    /// The block body was already present (or filled during catch-up).
    AlreadyProcessed,
    /// Inserted successfully and at least header-graph-ready; relay it.
    ShouldRelay,
    /// Inserted successfully but not yet ready; do not relay.
    SuccessWithoutRelay,
    /// The block failed verification or depends on an invalid block.
    Invalid,
    /// Transactions-root mismatch; the body should be requested again.
    RequestAgain,
    /// The header is unknown to the sync graph, so the body was dropped.
    Ignored,
}
2173
2174impl BlockInsertionResult {
2175 pub fn is_valid(&self) -> bool {
2176 matches!(
2177 self,
2178 BlockInsertionResult::AlreadyProcessed
2179 | BlockInsertionResult::ShouldRelay
2180 | BlockInsertionResult::SuccessWithoutRelay
2181 )
2182 }
2183
2184 pub fn is_invalid(&self) -> bool {
2185 matches!(self, BlockInsertionResult::Invalid)
2186 }
2187
2188 pub fn should_relay(&self) -> bool {
2189 matches!(self, BlockInsertionResult::ShouldRelay)
2190 }
2191
2192 pub fn request_again(&self) -> bool {
2193 matches!(self, BlockInsertionResult::RequestAgain)
2194 }
2195}
2196
/// Outcome of inserting a block header into the sync graph.
///
/// Derives are added so callers can log (`Debug`), copy, and compare
/// outcomes directly; the enum is a plain fieldless discriminant so
/// `Copy` is free.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockHeaderInsertionResult {
    /// The header was already handled by consensus (earlier era or same
    /// node instance).
    AlreadyProcessedInConsensus,
    /// The header is already tracked in the sync graph arena.
    AlreadyProcessedInSync,
    /// Newly inserted and valid.
    NewValid,
    /// Known invalid, or found invalid during propagation.
    Invalid,
    /// Rejected for now because the graph is locked for catch-up.
    TemporarySkipped,
}
2212
2213impl BlockHeaderInsertionResult {
2214 pub fn is_new_valid(&self) -> bool {
2215 matches!(self, BlockHeaderInsertionResult::NewValid)
2216 }
2217
2218 pub fn is_invalid(&self) -> bool {
2219 matches!(self, BlockHeaderInsertionResult::Invalid)
2220 }
2221
2222 pub fn should_process_body(&self) -> bool {
2223 matches!(
2224 self,
2225 BlockHeaderInsertionResult::NewValid
2226 | BlockHeaderInsertionResult::AlreadyProcessedInSync
2227 )
2228 }
2229}