cfxcore/consensus/consensus_graph/
sync_graph_api.rs

1pub use crate::consensus::consensus_inner::ConsensusGraphInner;
2
3use cfx_parameters::consensus_internal::REWARD_EPOCH_COUNT;
4use cfx_types::{H256, U256};
5use metrics::{register_meter_with_group, Meter, MeterTimer};
6use primitives::EpochId;
7
8use std::{
9    collections::HashSet,
10    sync::{atomic::Ordering, Arc},
11};
12
13use super::ConsensusGraph;
14
15lazy_static! {
16    static ref CONSENSIS_ON_NEW_BLOCK_TIMER: Arc<dyn Meter> =
17        register_meter_with_group("timer", "consensus_on_new_block_timer");
18}
19
20impl ConsensusGraph {
21    /// Reset the information in consensus graph with only checkpoint
22    /// information kept.
23    pub fn reset(&self) {
24        let old_consensus_inner = &mut *self.inner.write();
25
26        let cur_era_genesis_hash =
27            self.data_man.get_cur_consensus_era_genesis_hash();
28        let cur_era_stable_hash =
29            self.data_man.get_cur_consensus_era_stable_hash();
30        let new_consensus_inner = ConsensusGraphInner::with_era_genesis(
31            old_consensus_inner.pow_config.clone(),
32            old_consensus_inner.pow.clone(),
33            old_consensus_inner.pos_verifier.clone(),
34            self.data_man.clone(),
35            old_consensus_inner.inner_conf.clone(),
36            &cur_era_genesis_hash,
37            &cur_era_stable_hash,
38        );
39        *old_consensus_inner = new_consensus_inner;
40        debug!("Build new consensus graph for sync-recovery with identified genesis {} stable block {}", cur_era_genesis_hash, cur_era_stable_hash);
41
42        self.confirmation_meter.clear();
43    }
44
45    /// Return the blocks without bodies in the subtree of stable genesis and
46    /// the blocks in the `REWARD_EPOCH_COUNT` epochs before it. Block
47    /// bodies of other blocks in the consensus graph will never be needed
48    /// for executions after this stable genesis, as long as the checkpoint
49    /// is not reverted.
50    pub fn get_blocks_needing_bodies(&self) -> HashSet<H256> {
51        let inner = self.inner.read();
52        // TODO: This may not be stable genesis with other configurations.
53        let stable_genesis = self.data_man.get_cur_consensus_era_stable_hash();
54        let mut missing_body_blocks = HashSet::new();
55        for block_hash in inner
56            .get_subtree(&stable_genesis)
57            .expect("stable is in consensus")
58        {
59            if self.data_man.block_by_hash(&block_hash, false).is_none() {
60                missing_body_blocks.insert(block_hash);
61            }
62        }
63        // We also need the block bodies before the checkpoint to compute
64        // rewards.
65        let stable_height = self
66            .data_man
67            .block_height_by_hash(&stable_genesis)
68            .expect("stable exist");
69        let reward_start_epoch = if stable_height >= REWARD_EPOCH_COUNT {
70            stable_height - REWARD_EPOCH_COUNT + 1
71        } else {
72            1
73        };
74        for height in reward_start_epoch..=stable_height {
75            for block_hash in self
76                .data_man
77                .executed_epoch_set_hashes_from_db(height)
78                .expect("epoch sets before stable should exist")
79            {
80                if self.data_man.block_by_hash(&block_hash, false).is_none() {
81                    missing_body_blocks.insert(block_hash);
82                }
83            }
84        }
85        missing_body_blocks.remove(&self.data_man.true_genesis.hash());
86        missing_body_blocks
87    }
88
89    pub fn enter_normal_phase(&self) {
90        self.ready_for_mining.store(true, Ordering::SeqCst);
91        self.update_best_info(true);
92        self.txpool.set_ready_for_mining();
93        self.txpool
94            .notify_new_best_info(self.best_info.read_recursive().clone())
95            .expect("No DB error")
96    }
97
98    pub fn set_initial_sequence_number(&self, initial_sn: u64) {
99        self.inner.write().set_initial_sequence_number(initial_sn);
100    }
101
102    /// Find a trusted blame block for snapshot full sync
103    pub fn get_trusted_blame_block_for_snapshot(
104        &self, snapshot_epoch_id: &EpochId,
105    ) -> Option<H256> {
106        self.inner
107            .read()
108            .get_trusted_blame_block_for_snapshot(snapshot_epoch_id)
109    }
110
111    /// Return the epoch that we are going to sync the state
112    pub fn get_to_sync_epoch_id(&self) -> EpochId {
113        self.inner.read().get_to_sync_epoch_id()
114    }
115
116    /// Check if we have downloaded all the headers to find the lowest needed
117    /// checkpoint. We can enter `CatchUpCheckpoint` if it's true.
118    pub fn catch_up_completed(&self, peer_median_epoch: u64) -> bool {
119        let stable_genesis_height = self
120            .data_man
121            .block_height_by_hash(
122                &self.data_man.get_cur_consensus_era_stable_hash(),
123            )
124            .expect("stable exists");
125
126        if self.best_epoch_number() < stable_genesis_height {
127            // For an archive node, if its terminals are overwritten with
128            // earlier blocks during recovery, it's possible to
129            // reach here with a pivot chain before stable era
130            // checkpoint. Here we wait for it to recover the missing headers
131            // after the overwritten terminals.
132            return false;
133        }
134        if let Some(target_epoch) = self.config.sync_state_starting_epoch {
135            if stable_genesis_height < target_epoch {
136                return false;
137            }
138        }
139        if let Some(gap) = self.config.sync_state_epoch_gap {
140            if self.best_epoch_number() + gap < peer_median_epoch {
141                return false;
142            }
143        }
144        true
145    }
146
147    // FIXME store this in BlockDataManager
148    /// Return the sequence number of the current era genesis hash.
149    pub fn current_era_genesis_seq_num(&self) -> u64 {
150        self.inner.read_recursive().current_era_genesis_seq_num()
151    }
152
153    /// This is the main function that SynchronizationGraph calls to deliver a
154    /// new block to the consensus graph.
155    pub fn on_new_block(&self, hash: &H256) {
156        let _timer =
157            MeterTimer::time_func(CONSENSIS_ON_NEW_BLOCK_TIMER.as_ref());
158        self.statistics.inc_consensus_graph_processed_block_count();
159
160        self.new_block_handler.on_new_block(
161            &mut *self.inner.write(),
162            &self.confirmation_meter,
163            hash,
164        );
165
166        let ready_for_mining = self.ready_for_mining.load(Ordering::SeqCst);
167        self.update_best_info(ready_for_mining);
168        if ready_for_mining {
169            self.txpool
170                .notify_new_best_info(self.best_info.read().clone())
171                // FIXME: propogate error.
172                .expect(&concat!(file!(), ":", line!(), ":", column!()));
173        }
174        debug!("Finish Consensus::on_new_block for {:?}", hash);
175    }
176
177    /// This function is a wrapper function for the function in the confirmation
178    /// meter. The synchronization layer is supposed to call this function
179    /// every 2 * BLOCK_PROPAGATION_DELAY seconds
180    pub fn update_total_weight_delta_heartbeat(&self) {
181        self.confirmation_meter
182            .update_total_weight_delta_heartbeat();
183    }
184
185    /// construct_pivot_state() rebuild pivot chain state info from db
186    /// avoiding intermediate redundant computation triggered by
187    /// on_new_block().
188    pub fn construct_pivot_state(&self) {
189        let inner = &mut *self.inner.write();
190        // Ensure that `state_valid` of the first valid block after
191        // cur_era_stable_genesis is set
192        inner.recover_state_valid();
193        self.new_block_handler
194            .construct_pivot_state(inner, &self.confirmation_meter);
195        inner.finish_block_recovery();
196    }
197
198    /// Compute the expected difficulty of a new block given its parent
199    pub fn expected_difficulty(&self, parent_hash: &H256) -> U256 {
200        let inner = self.inner.read();
201        inner.expected_difficulty(parent_hash)
202    }
203}