cfxcore/pos/
pow_handler.rs1use crate::{pos::consensus::ConsensusDB, ConsensusGraph};
6use anyhow::{anyhow, bail, Result};
7use async_trait::async_trait;
8use cfx_executor::internal_contract::decode_register_info;
9use cfx_parameters::internal_contract_addresses::POS_REGISTER_CONTRACT_ADDRESS;
10use cfx_types::H256;
11use diem_types::block_info::PivotBlockDecision;
12use futures::channel::oneshot;
13use parking_lot::RwLock;
14use pow_types::{PowInterface, StakingEvent};
15use primitives::filter::{LogFilter, LogFilterParams};
16use std::{
17 sync::{atomic::Ordering, Arc, Weak},
18 time::Duration,
19};
20use tokio::runtime::Handle;
21
22pub const POS_TERM_EPOCHS: u64 = 60;
24
25pub struct PowHandler {
26 executor: Handle,
27 pow_consensus: RwLock<Option<Weak<ConsensusGraph>>>,
28 pos_consensus_db: Arc<ConsensusDB>,
29}
30
31impl PowHandler {
32 pub fn new(executor: Handle, pos_consensus_db: Arc<ConsensusDB>) -> Self {
33 Self {
34 executor,
35 pow_consensus: RwLock::new(None),
36 pos_consensus_db,
37 }
38 }
39
40 pub fn initialize(&self, pow_consensus: Arc<ConsensusGraph>) {
41 *self.pow_consensus.write() = Some(Arc::downgrade(&pow_consensus));
42 }
43
44 pub fn stop(&self) {
45 let pow_consensus = &mut *self.pow_consensus.write();
46 if pow_consensus.is_some() {
47 debug!(
48 "Consensus ref count:{}",
49 Weak::strong_count(pow_consensus.as_ref().unwrap())
50 );
51 *pow_consensus = None;
52 }
53 }
54
55 fn next_pivot_decision_impl(
56 pow_consensus: Arc<ConsensusGraph>, parent_decision: &H256,
57 ) -> Option<(u64, H256)> {
58 let inner = pow_consensus.inner.read();
59 let confirmed_height =
60 pow_consensus.confirmation_meter.get_confirmed_epoch_num();
61 inner.get_next_pivot_decision(parent_decision, confirmed_height)
62 }
63
64 fn validate_proposal_pivot_decision_impl(
65 pow_consensus: Arc<ConsensusGraph>, parent_decision: &H256,
66 me_decision: &H256,
67 ) -> bool {
68 pow_consensus
69 .inner
70 .read()
71 .validate_pivot_decision(parent_decision, me_decision)
72 }
73
74 fn get_staking_events_impl(
75 pow_consensus: Arc<ConsensusGraph>, parent_decision: H256,
76 me_decision: H256,
77 ) -> Result<Vec<StakingEvent>> {
78 if parent_decision == me_decision {
81 return Ok(vec![]);
82 }
83 let start_epoch = pow_consensus
84 .data_man
85 .block_height_by_hash(&parent_decision)
86 .ok_or(anyhow!(
87 "parent decision block missing, hash={:?}",
88 parent_decision
89 ))?;
90 let end_epoch = pow_consensus
91 .data_man
92 .block_height_by_hash(&me_decision)
93 .ok_or(anyhow!(
94 "new decision block missing, hash={:?}",
95 me_decision
96 ))?;
97 let from_epoch = (start_epoch + 1).into();
99 let to_epoch = end_epoch.into();
100 let mut params = LogFilterParams::default();
101 params.address = Some(vec![POS_REGISTER_CONTRACT_ADDRESS]);
102 params.trusted = true;
103 let log_filter = LogFilter::EpochLogFilter {
104 from_epoch,
105 to_epoch,
106 params,
107 };
108 Ok(pow_consensus
109 .logs(log_filter)
110 .map_err(|e| anyhow!("Logs not available: e={}", e))?
111 .into_iter()
112 .map(|localized_entry| {
113 decode_register_info(&localized_entry.entry)
114 .expect("address checked")
115 })
116 .collect())
117 }
118}
119
120#[async_trait]
123impl PowInterface for PowHandler {
124 async fn next_pivot_decision(
125 &self, parent_decision: H256,
126 ) -> Option<(u64, H256)> {
127 let pow_consensus =
128 self.pow_consensus.read().clone().and_then(|c| c.upgrade());
129 if pow_consensus.is_none() {
130 return None;
131 }
132 let (callback, cb_receiver) = oneshot::channel();
133 let pow_consensus = pow_consensus.unwrap();
134 self.executor.spawn(async move {
135 let r =
136 Self::next_pivot_decision_impl(pow_consensus, &parent_decision);
137 if let Err(e) = callback.send(r) {
138 debug!("send next_pivot_decision err={:?}", e);
139 }
140 });
141 cb_receiver.await.ok().flatten()
142 }
143
144 fn validate_proposal_pivot_decision(
145 &self, parent_decision: H256, me_decision: H256,
146 ) -> bool {
147 let pow_consensus =
148 self.pow_consensus.read().clone().and_then(|c| c.upgrade());
149 if pow_consensus.is_none() {
150 return true;
151 }
152 let pow_consensus = pow_consensus.unwrap();
153 debug!("before spawn pivot_decision");
154 let r = Self::validate_proposal_pivot_decision_impl(
155 pow_consensus,
156 &parent_decision,
157 &me_decision,
158 );
159 debug!("after spawn pivot_decision");
160 r
161 }
162
163 fn get_staking_events(
168 &self, parent_height: u64, me_height: u64, parent_decision: H256,
169 me_decision: H256,
170 ) -> Result<Vec<StakingEvent>> {
171 let pow_consensus =
172 self.pow_consensus.read().clone().and_then(|c| c.upgrade());
173 if pow_consensus.is_none() {
174 bail!("PoW consensus not initialized");
176 }
177 debug!(
178 "get_staking_events: parent={:?}, me={:?}",
179 parent_decision, me_decision
180 );
181 let pow_consensus = pow_consensus.unwrap();
182 if parent_decision == pow_consensus.data_man.true_genesis.hash() {
183 return Ok(vec![]);
190 }
191 self.pos_consensus_db
192 .get_staking_events(
193 PivotBlockDecision {
194 height: parent_height,
195 block_hash: parent_decision,
196 },
197 PivotBlockDecision {
198 height: me_height,
199 block_hash: me_decision,
200 },
201 )
202 .or_else(|e| {
203 debug!("get_staking_events from pow: err={:?}", e);
204 Self::get_staking_events_impl(
205 pow_consensus,
206 parent_decision,
207 me_decision,
208 )
209 })
210 }
211
212 async fn wait_for_initialization(&self, last_decision: H256) {
213 debug!("wait_for_initialization: {:?}", last_decision);
214 while self.pow_consensus.read().is_none() {
215 tokio::time::sleep(Duration::from_millis(200)).await
216 }
217 loop {
219 {
223 if self
224 .pow_consensus
225 .read()
226 .as_ref()
227 .unwrap()
228 .upgrade()
229 .unwrap()
230 .inner
231 .read()
232 .pivot_block_processed(&last_decision)
233 {
234 return;
235 }
236 }
237 tokio::time::sleep(Duration::from_millis(200)).await
238 }
239 }
240
241 fn is_normal_phase(&self) -> bool {
242 self.pow_consensus
243 .read()
244 .as_ref()
245 .and_then(|p| {
246 p.upgrade().map(|consensus| {
247 consensus.ready_for_mining.load(Ordering::SeqCst)
248 })
249 })
250 .unwrap_or(false)
251 }
252}