cfxcore/pos/consensus/liveness/proposal_generator.rs
1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use std::sync::Arc;
9
10use anyhow::{bail, ensure, format_err, Context};
11
12use consensus_types::{
13 block::Block,
14 block_data::BlockData,
15 common::{Author, Round},
16 quorum_cert::QuorumCert,
17};
18use diem_infallible::Mutex;
19use diem_logger::{debug as diem_debug, error as diem_error};
20use diem_types::{
21 transaction::{RawTransaction, SignedTransaction, TransactionPayload},
22 validator_config::ConsensusPrivateKey,
23 validator_verifier::ValidatorVerifier,
24};
25use pow_types::PowInterface;
26
27use crate::pos::consensus::{
28 block_storage::BlockReader, state_replication::TxnManager,
29 util::time_service::TimeService,
30};
31
32#[cfg(test)]
33#[path = "proposal_generator_test.rs"]
34mod proposal_generator_test;
35
36/// ProposalGenerator is responsible for generating the proposed block on
37/// demand: it's typically used by a validator that believes it's a valid
38/// candidate for serving as a proposer at a given round.
39/// ProposalGenerator is the one choosing the branch to extend:
40/// - round is given by the caller (typically determined by RoundState).
41/// The transactions for the proposed block are delivered by TxnManager.
42///
43/// TxnManager should be aware of the pending transactions in the branch that it
44/// is extending, such that it will filter them out to avoid transaction
45/// duplication.
46pub struct ProposalGenerator {
47 // The account address of this validator
48 author: Author,
49 // Block store is queried both for finding the branch to extend and for
50 // generating the proposed block.
51 block_store: Arc<dyn BlockReader + Send + Sync>,
52 // Transaction manager is delivering the transactions.
53 txn_manager: Arc<dyn TxnManager>,
54 // Time service to generate block timestamps
55 time_service: Arc<dyn TimeService>,
56 // Max number of transactions to be added to a proposed block.
57 max_block_size: u64,
58 // Last round that a proposal was generated
59 last_round_generated: Mutex<Round>,
60 // Handle the interaction with PoW consensus.
61 pow_handler: Arc<dyn PowInterface>,
62 // TODO(lpl): Where to put them?
63 pub private_key: ConsensusPrivateKey,
64}
65
66impl ProposalGenerator {
67 pub fn new(
68 author: Author, block_store: Arc<dyn BlockReader + Send + Sync>,
69 txn_manager: Arc<dyn TxnManager>, time_service: Arc<dyn TimeService>,
70 max_block_size: u64, pow_handler: Arc<dyn PowInterface>,
71 private_key: ConsensusPrivateKey,
72 ) -> Self {
73 Self {
74 author,
75 block_store,
76 txn_manager,
77 time_service,
78 max_block_size,
79 last_round_generated: Mutex::new(0),
80 pow_handler,
81 private_key,
82 }
83 }
84
85 pub fn author(&self) -> Author { self.author }
86
87 /// Creates a NIL block proposal extending the highest certified block from
88 /// the block store.
89 pub fn generate_nil_block(&self, round: Round) -> anyhow::Result<Block> {
90 let hqc = self.ensure_highest_quorum_cert(round)?;
91 Ok(Block::new_nil(round, hqc.as_ref().clone()))
92 }
93
94 /// The function generates a new proposal block: the returned future is
95 /// fulfilled when the payload is delivered by the TxnManager
96 /// implementation. At most one proposal can be generated per round (no
97 /// proposal equivocation allowed). Errors returned by the TxnManager
98 /// implementation are propagated to the caller. The logic for choosing
99 /// the branch to extend is as follows: 1. The function gets the highest
100 /// head of a one-chain from block tree. The new proposal must extend
101 /// hqc to ensure optimistic responsiveness. 2. The round is provided by
102 /// the caller. 3. In case a given round is not greater than the
103 /// calculated parent, return an OldRound error.
104 pub async fn generate_proposal(
105 &mut self, round: Round, validators: ValidatorVerifier,
106 ) -> anyhow::Result<BlockData> {
107 {
108 let mut last_round_generated = self.last_round_generated.lock();
109 if *last_round_generated < round {
110 *last_round_generated = round;
111 } else {
112 bail!("Already proposed in the round {}", round);
113 }
114 }
115
116 let hqc = self.ensure_highest_quorum_cert(round)?;
117
118 // TODO(lpl): Handle reconfiguraiton.
119 let (payload, timestamp) = if hqc
120 .certified_block()
121 .has_reconfiguration()
122 {
123 // Reconfiguration rule - we propose empty blocks with parents'
124 // timestamp after reconfiguration until it's committed
125 (vec![], hqc.certified_block().timestamp_usecs())
126 } else {
127 // One needs to hold the blocks with the references to the payloads
128 // while get_block is being executed: pending blocks
129 // vector keeps all the pending ancestors of the extended branch.
130 let mut pending_blocks = self
131 .block_store
132 .path_from_root(hqc.certified_block().id())
133 .ok_or_else(|| {
134 format_err!(
135 "HQC {} already pruned",
136 hqc.certified_block().id()
137 )
138 })?;
139 // Avoid txn manager long poll it the root block has txns, so that
140 // the leader can deliver the commit proof to others
141 // without delay.
142 pending_blocks.insert(0, self.block_store.root());
143
144 // Exclude all the pending transactions: these are all the ancestors
145 // of parent (including) up to the root (including).
146 let exclude_payload: Vec<&Vec<_>> = pending_blocks
147 .iter()
148 .flat_map(|block| block.payload())
149 .collect();
150
151 // All proposed blocks in a branch are guaranteed to have increasing
152 // timestamps since their predecessor block will not be
153 // added to the BlockStore until the local time exceeds
154 // it.
155 let timestamp = self.time_service.get_current_timestamp();
156
157 // TODO(lpl): Check what to do if `parent_block !=
158 // hqc.certified_block()`
159 let parent_block = pending_blocks.last().expect("root pushed");
160 assert!(
161 hqc.certified_block().id() == parent_block.id(),
162 "generate_proposal: hqc = parent"
163 );
164
165 let mut payload = self
166 .txn_manager
167 .pull_txns(
168 self.max_block_size,
169 exclude_payload,
170 parent_block.id(),
171 validators,
172 )
173 .await
174 .context("Fail to retrieve txn")?;
175 diem_debug!(
176 "generate_proposal: Pull {} transactions",
177 payload.len()
178 );
179
180 // Sending non-existent decision (default value here) will return
181 // the latest pivot decision.
182 let parent_decision = parent_block
183 .block_info()
184 .pivot_decision()
185 .cloned()
186 .unwrap_or_default();
187 let new_pivot_decision =
188 payload.iter().find_map(|tx| match tx.payload() {
189 TransactionPayload::PivotDecision(decision) => {
190 if decision.height
191 <= parent_block
192 .block_info()
193 .pivot_decision()
194 .map(|d| d.height)
195 .unwrap_or_default()
196 {
197 None
198 } else {
199 Some(decision.clone())
200 }
201 }
202 _ => None,
203 });
204 if new_pivot_decision.is_none()
205 && payload.last().is_some()
206 && matches!(
207 payload.last().unwrap().payload(),
208 TransactionPayload::PivotDecision(_)
209 )
210 {
211 payload.pop();
212 }
213
214 match new_pivot_decision {
215 Some(me_decision) => {
216 // Included new registered or updated nodes as transactions.
217 let staking_events = self.pow_handler.get_staking_events(
218 parent_decision.height,
219 me_decision.height,
220 parent_decision.block_hash,
221 me_decision.block_hash,
222 )?;
223 diem_debug!(
224 "generate_proposal: staking_events={:?} parent={:?} me={:?}",
225 staking_events, parent_block.block_info().pivot_decision(), payload.last()
226 );
227 for event in staking_events {
228 match RawTransaction::from_staking_event(
229 &event,
230 self.author,
231 ) {
232 Ok(raw_tx) => {
233 let signed_tx = raw_tx
234 .sign(&self.private_key)?
235 .into_inner();
236 payload.push(signed_tx);
237 }
238 // TODO(lpl): This is not supposed to happen, so
239 // should we return error here?
240 Err(e) => diem_error!(
241 "Get invalid staking event: err={:?}",
242 e
243 ),
244 }
245 }
246 }
247 None => {
248 warn!("pos progress without new pivot decision");
249 }
250 }
251
252 (payload, timestamp.as_micros() as u64)
253 };
254
255 // create block proposal
256 Ok(BlockData::new_proposal(
257 payload,
258 self.author,
259 round,
260 timestamp,
261 hqc.as_ref().clone(),
262 ))
263 }
264
265 fn ensure_highest_quorum_cert(
266 &self, round: Round,
267 ) -> anyhow::Result<Arc<QuorumCert>> {
268 let hqc = self.block_store.highest_quorum_cert();
269 ensure!(
270 hqc.certified_block().round() < round,
271 "Given round {} is lower than hqc round {}",
272 round,
273 hqc.certified_block().round()
274 );
275 ensure!(
276 !hqc.ends_epoch(),
277 "The epoch has already ended,a proposal is not allowed to generated"
278 );
279
280 Ok(hqc)
281 }
282}
283
284/// The functions used in tests to construct attack cases
285impl ProposalGenerator {
286 pub fn force_propose(
287 &self, round: Round, parent_qc: Arc<QuorumCert>,
288 payload: Vec<TransactionPayload>,
289 ) -> anyhow::Result<BlockData> {
290 let payload = payload
291 .into_iter()
292 .map(|p| {
293 let raw_tx = RawTransaction::new(
294 self.author,
295 p,
296 u64::MAX,
297 Default::default(),
298 );
299 raw_tx.sign(&self.private_key).map(|tx| tx.into_inner())
300 })
301 .collect::<anyhow::Result<Vec<SignedTransaction>>>()?;
302
303 Ok(BlockData::new_proposal(
304 payload,
305 self.author,
306 round,
307 self.time_service.get_current_timestamp().as_micros() as u64,
308 parent_qc.as_ref().clone(),
309 ))
310 }
311}