cfxcore/pos/
pow_handler.rs

1// Copyright 2021 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{pos::consensus::ConsensusDB, ConsensusGraph};
6use anyhow::{anyhow, bail, Result};
7use async_trait::async_trait;
8use cfx_executor::internal_contract::decode_register_info;
9use cfx_parameters::internal_contract_addresses::POS_REGISTER_CONTRACT_ADDRESS;
10use cfx_types::H256;
11use diem_types::block_info::PivotBlockDecision;
12use futures::channel::oneshot;
13use parking_lot::RwLock;
14use pow_types::{PowInterface, StakingEvent};
15use primitives::filter::{LogFilter, LogFilterParams};
16use std::{
17    sync::{atomic::Ordering, Arc, Weak},
18    time::Duration,
19};
20use tokio::runtime::Handle;
21
// TODO(lpl): Decide the value.
/// Length of a PoS term, expressed in epochs.
///
/// NOTE(review): the unit and the consumers of this constant are not visible
/// in this file; the description is inferred from the name — confirm against
/// the callers before relying on it.
pub const POS_TERM_EPOCHS: u64 = 60;
24
/// Adapter through which the PoS side queries the PoW consensus graph for
/// pivot decisions and staking events.
pub struct PowHandler {
    /// Runtime handle on which pivot-decision queries are spawned.
    executor: Handle,
    /// Weak reference to the PoW consensus graph. It is `None` until
    /// `initialize` is called and is reset to `None` by `stop`.
    pow_consensus: RwLock<Option<Weak<ConsensusGraph>>>,
    /// PoS consensus database, consulted first when staking events are
    /// requested.
    pos_consensus_db: Arc<ConsensusDB>,
}
30
31impl PowHandler {
32    pub fn new(executor: Handle, pos_consensus_db: Arc<ConsensusDB>) -> Self {
33        Self {
34            executor,
35            pow_consensus: RwLock::new(None),
36            pos_consensus_db,
37        }
38    }
39
40    pub fn initialize(&self, pow_consensus: Arc<ConsensusGraph>) {
41        *self.pow_consensus.write() = Some(Arc::downgrade(&pow_consensus));
42    }
43
44    pub fn stop(&self) {
45        let pow_consensus = &mut *self.pow_consensus.write();
46        if pow_consensus.is_some() {
47            debug!(
48                "Consensus ref count:{}",
49                Weak::strong_count(pow_consensus.as_ref().unwrap())
50            );
51            *pow_consensus = None;
52        }
53    }
54
55    fn next_pivot_decision_impl(
56        pow_consensus: Arc<ConsensusGraph>, parent_decision: &H256,
57    ) -> Option<(u64, H256)> {
58        let inner = pow_consensus.inner.read();
59        let confirmed_height =
60            pow_consensus.confirmation_meter.get_confirmed_epoch_num();
61        inner.get_next_pivot_decision(parent_decision, confirmed_height)
62    }
63
64    fn validate_proposal_pivot_decision_impl(
65        pow_consensus: Arc<ConsensusGraph>, parent_decision: &H256,
66        me_decision: &H256,
67    ) -> bool {
68        pow_consensus
69            .inner
70            .read()
71            .validate_pivot_decision(parent_decision, me_decision)
72    }
73
74    fn get_staking_events_impl(
75        pow_consensus: Arc<ConsensusGraph>, parent_decision: H256,
76        me_decision: H256,
77    ) -> Result<Vec<StakingEvent>> {
78        // We only call this for committed blocks, so it is guaranteed that
79        // `parent_decision` is an ancestor of `me_decision`.
80        if parent_decision == me_decision {
81            return Ok(vec![]);
82        }
83        let start_epoch = pow_consensus
84            .data_man
85            .block_height_by_hash(&parent_decision)
86            .ok_or(anyhow!(
87                "parent decision block missing, hash={:?}",
88                parent_decision
89            ))?;
90        let end_epoch = pow_consensus
91            .data_man
92            .block_height_by_hash(&me_decision)
93            .ok_or(anyhow!(
94                "new decision block missing, hash={:?}",
95                me_decision
96            ))?;
97        // start_epoch has been processed by parent.
98        let from_epoch = (start_epoch + 1).into();
99        let to_epoch = end_epoch.into();
100        let mut params = LogFilterParams::default();
101        params.address = Some(vec![POS_REGISTER_CONTRACT_ADDRESS]);
102        params.trusted = true;
103        let log_filter = LogFilter::EpochLogFilter {
104            from_epoch,
105            to_epoch,
106            params,
107        };
108        Ok(pow_consensus
109            .logs(log_filter)
110            .map_err(|e| anyhow!("Logs not available: e={}", e))?
111            .into_iter()
112            .map(|localized_entry| {
113                decode_register_info(&localized_entry.entry)
114                    .expect("address checked")
115            })
116            .collect())
117    }
118}
119
// TODO(lpl): Should we let the caller decide whether `pow_consensus` may be
// `None`?
122#[async_trait]
123impl PowInterface for PowHandler {
124    async fn next_pivot_decision(
125        &self, parent_decision: H256,
126    ) -> Option<(u64, H256)> {
127        let pow_consensus =
128            self.pow_consensus.read().clone().and_then(|c| c.upgrade());
129        if pow_consensus.is_none() {
130            return None;
131        }
132        let (callback, cb_receiver) = oneshot::channel();
133        let pow_consensus = pow_consensus.unwrap();
134        self.executor.spawn(async move {
135            let r =
136                Self::next_pivot_decision_impl(pow_consensus, &parent_decision);
137            if let Err(e) = callback.send(r) {
138                debug!("send next_pivot_decision err={:?}", e);
139            }
140        });
141        cb_receiver.await.ok().flatten()
142    }
143
144    fn validate_proposal_pivot_decision(
145        &self, parent_decision: H256, me_decision: H256,
146    ) -> bool {
147        let pow_consensus =
148            self.pow_consensus.read().clone().and_then(|c| c.upgrade());
149        if pow_consensus.is_none() {
150            return true;
151        }
152        let pow_consensus = pow_consensus.unwrap();
153        debug!("before spawn pivot_decision");
154        let r = Self::validate_proposal_pivot_decision_impl(
155            pow_consensus,
156            &parent_decision,
157            &me_decision,
158        );
159        debug!("after spawn pivot_decision");
160        r
161    }
162
163    /// Return error if pow_consensus has not been initialized or the pivot
164    /// decision blocks have not been processed in PoW. Thus, a PoS node
165    /// will not vote for new pivot decisions if the PoW block has not been
166    /// processed.
167    fn get_staking_events(
168        &self, parent_height: u64, me_height: u64, parent_decision: H256,
169        me_decision: H256,
170    ) -> Result<Vec<StakingEvent>> {
171        let pow_consensus =
172            self.pow_consensus.read().clone().and_then(|c| c.upgrade());
173        if pow_consensus.is_none() {
174            // This case will be reached during pos recovery.
175            bail!("PoW consensus not initialized");
176        }
177        debug!(
178            "get_staking_events: parent={:?}, me={:?}",
179            parent_decision, me_decision
180        );
181        let pow_consensus = pow_consensus.unwrap();
182        if parent_decision == pow_consensus.data_man.true_genesis.hash() {
183            // `me_decision` is the first actual pow_decision. It may be far
184            // from genesis, so getting all event can be slow or
185            // even unavailable. We just drop all events before this
186            // first pow_decision. And in normal cases, these events
187            // have been processed to produce the PoS genesis, so they should
188            // not be packed again.
189            return Ok(vec![]);
190        }
191        self.pos_consensus_db
192            .get_staking_events(
193                PivotBlockDecision {
194                    height: parent_height,
195                    block_hash: parent_decision,
196                },
197                PivotBlockDecision {
198                    height: me_height,
199                    block_hash: me_decision,
200                },
201            )
202            .or_else(|e| {
203                debug!("get_staking_events from pow: err={:?}", e);
204                Self::get_staking_events_impl(
205                    pow_consensus,
206                    parent_decision,
207                    me_decision,
208                )
209            })
210    }
211
212    async fn wait_for_initialization(&self, last_decision: H256) {
213        debug!("wait_for_initialization: {:?}", last_decision);
214        while self.pow_consensus.read().is_none() {
215            tokio::time::sleep(Duration::from_millis(200)).await
216        }
217        // TODO(lpl): Wait for last_decision is stable?
218        loop {
219            // Check epoch hash set to see if last_decision is processed and is
220            // on the pivot chain. Note that for full nodes, there
221            // is no other persisted data to check for old blocks.
222            {
223                if self
224                    .pow_consensus
225                    .read()
226                    .as_ref()
227                    .unwrap()
228                    .upgrade()
229                    .unwrap()
230                    .inner
231                    .read()
232                    .pivot_block_processed(&last_decision)
233                {
234                    return;
235                }
236            }
237            tokio::time::sleep(Duration::from_millis(200)).await
238        }
239    }
240
241    fn is_normal_phase(&self) -> bool {
242        self.pow_consensus
243            .read()
244            .as_ref()
245            .and_then(|p| {
246                p.upgrade().map(|consensus| {
247                    consensus.ready_for_mining.load(Ordering::SeqCst)
248                })
249            })
250            .unwrap_or(false)
251    }
252}