cfxcore/consensus/consensus_graph/
sync_graph_api.rs1pub use crate::consensus::consensus_inner::ConsensusGraphInner;
2
3use cfx_parameters::consensus_internal::REWARD_EPOCH_COUNT;
4use cfx_types::{H256, U256};
5use metrics::{register_meter_with_group, Meter, MeterTimer};
6use primitives::EpochId;
7
8use std::{
9 collections::HashSet,
10 sync::{atomic::Ordering, Arc},
11};
12
13use super::ConsensusGraph;
14
15lazy_static! {
16 static ref CONSENSIS_ON_NEW_BLOCK_TIMER: Arc<dyn Meter> =
17 register_meter_with_group("timer", "consensus_on_new_block_timer");
18}
19
20impl ConsensusGraph {
21 pub fn reset(&self) {
24 let old_consensus_inner = &mut *self.inner.write();
25
26 let cur_era_genesis_hash =
27 self.data_man.get_cur_consensus_era_genesis_hash();
28 let cur_era_stable_hash =
29 self.data_man.get_cur_consensus_era_stable_hash();
30 let new_consensus_inner = ConsensusGraphInner::with_era_genesis(
31 old_consensus_inner.pow_config.clone(),
32 old_consensus_inner.pow.clone(),
33 old_consensus_inner.pos_verifier.clone(),
34 self.data_man.clone(),
35 old_consensus_inner.inner_conf.clone(),
36 &cur_era_genesis_hash,
37 &cur_era_stable_hash,
38 );
39 *old_consensus_inner = new_consensus_inner;
40 debug!("Build new consensus graph for sync-recovery with identified genesis {} stable block {}", cur_era_genesis_hash, cur_era_stable_hash);
41
42 self.confirmation_meter.clear();
43 }
44
45 pub fn get_blocks_needing_bodies(&self) -> HashSet<H256> {
51 let inner = self.inner.read();
52 let stable_genesis = self.data_man.get_cur_consensus_era_stable_hash();
54 let mut missing_body_blocks = HashSet::new();
55 for block_hash in inner
56 .get_subtree(&stable_genesis)
57 .expect("stable is in consensus")
58 {
59 if self.data_man.block_by_hash(&block_hash, false).is_none() {
60 missing_body_blocks.insert(block_hash);
61 }
62 }
63 let stable_height = self
66 .data_man
67 .block_height_by_hash(&stable_genesis)
68 .expect("stable exist");
69 let reward_start_epoch = if stable_height >= REWARD_EPOCH_COUNT {
70 stable_height - REWARD_EPOCH_COUNT + 1
71 } else {
72 1
73 };
74 for height in reward_start_epoch..=stable_height {
75 for block_hash in self
76 .data_man
77 .executed_epoch_set_hashes_from_db(height)
78 .expect("epoch sets before stable should exist")
79 {
80 if self.data_man.block_by_hash(&block_hash, false).is_none() {
81 missing_body_blocks.insert(block_hash);
82 }
83 }
84 }
85 missing_body_blocks.remove(&self.data_man.true_genesis.hash());
86 missing_body_blocks
87 }
88
89 pub fn enter_normal_phase(&self) {
90 self.ready_for_mining.store(true, Ordering::SeqCst);
91 self.update_best_info(true);
92 self.txpool.set_ready_for_mining();
93 self.txpool
94 .notify_new_best_info(self.best_info.read_recursive().clone())
95 .expect("No DB error")
96 }
97
98 pub fn set_initial_sequence_number(&self, initial_sn: u64) {
99 self.inner.write().set_initial_sequence_number(initial_sn);
100 }
101
102 pub fn get_trusted_blame_block_for_snapshot(
104 &self, snapshot_epoch_id: &EpochId,
105 ) -> Option<H256> {
106 self.inner
107 .read()
108 .get_trusted_blame_block_for_snapshot(snapshot_epoch_id)
109 }
110
111 pub fn get_to_sync_epoch_id(&self) -> EpochId {
113 self.inner.read().get_to_sync_epoch_id()
114 }
115
116 pub fn catch_up_completed(&self, peer_median_epoch: u64) -> bool {
119 let stable_genesis_height = self
120 .data_man
121 .block_height_by_hash(
122 &self.data_man.get_cur_consensus_era_stable_hash(),
123 )
124 .expect("stable exists");
125
126 if self.best_epoch_number() < stable_genesis_height {
127 return false;
133 }
134 if let Some(target_epoch) = self.config.sync_state_starting_epoch {
135 if stable_genesis_height < target_epoch {
136 return false;
137 }
138 }
139 if let Some(gap) = self.config.sync_state_epoch_gap {
140 if self.best_epoch_number() + gap < peer_median_epoch {
141 return false;
142 }
143 }
144 true
145 }
146
147 pub fn current_era_genesis_seq_num(&self) -> u64 {
150 self.inner.read_recursive().current_era_genesis_seq_num()
151 }
152
153 pub fn on_new_block(&self, hash: &H256) {
156 let _timer =
157 MeterTimer::time_func(CONSENSIS_ON_NEW_BLOCK_TIMER.as_ref());
158 self.statistics.inc_consensus_graph_processed_block_count();
159
160 self.new_block_handler.on_new_block(
161 &mut *self.inner.write(),
162 &self.confirmation_meter,
163 hash,
164 );
165
166 let ready_for_mining = self.ready_for_mining.load(Ordering::SeqCst);
167 self.update_best_info(ready_for_mining);
168 if ready_for_mining {
169 self.txpool
170 .notify_new_best_info(self.best_info.read().clone())
171 .expect(&concat!(file!(), ":", line!(), ":", column!()));
173 }
174 debug!("Finish Consensus::on_new_block for {:?}", hash);
175 }
176
177 pub fn update_total_weight_delta_heartbeat(&self) {
181 self.confirmation_meter
182 .update_total_weight_delta_heartbeat();
183 }
184
185 pub fn construct_pivot_state(&self) {
189 let inner = &mut *self.inner.write();
190 inner.recover_state_valid();
193 self.new_block_handler
194 .construct_pivot_state(inner, &self.confirmation_meter);
195 inner.finish_block_recovery();
196 }
197
198 pub fn expected_difficulty(&self, parent_hash: &H256) -> U256 {
200 let inner = self.inner.read();
201 inner.expected_difficulty(parent_hash)
202 }
203}