cfxcore/pos/consensus/liveness/proposal_generator.rs
1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use std::sync::Arc;
9
10use anyhow::{bail, ensure, format_err, Context};
11
12use consensus_types::{
13 block::Block,
14 block_data::BlockData,
15 common::{Author, Round},
16 quorum_cert::QuorumCert,
17};
18use diem_logger::{debug as diem_debug, error as diem_error};
19use diem_types::{
20 transaction::{RawTransaction, SignedTransaction, TransactionPayload},
21 validator_config::ConsensusPrivateKey,
22 validator_verifier::ValidatorVerifier,
23};
24use parking_lot::Mutex;
25use pow_types::PowInterface;
26
27use crate::pos::consensus::{
28 block_storage::BlockReader, state_replication::TxnManager,
29 util::time_service::TimeService,
30};
31
32/// ProposalGenerator is responsible for generating the proposed block on
33/// demand: it's typically used by a validator that believes it's a valid
34/// candidate for serving as a proposer at a given round.
35/// ProposalGenerator is the one choosing the branch to extend:
36/// - round is given by the caller (typically determined by RoundState).
37/// The transactions for the proposed block are delivered by TxnManager.
38///
39/// TxnManager should be aware of the pending transactions in the branch that it
40/// is extending, such that it will filter them out to avoid transaction
41/// duplication.
42pub struct ProposalGenerator {
43 // The account address of this validator
44 author: Author,
45 // Block store is queried both for finding the branch to extend and for
46 // generating the proposed block.
47 block_store: Arc<dyn BlockReader + Send + Sync>,
48 // Transaction manager is delivering the transactions.
49 txn_manager: Arc<dyn TxnManager>,
50 // Time service to generate block timestamps
51 time_service: Arc<dyn TimeService>,
52 // Max number of transactions to be added to a proposed block.
53 max_block_size: u64,
54 // Last round that a proposal was generated
55 last_round_generated: Mutex<Round>,
56 // Handle the interaction with PoW consensus.
57 pow_handler: Arc<dyn PowInterface>,
58 // TODO(lpl): Where to put them?
59 pub private_key: ConsensusPrivateKey,
60}
61
62impl ProposalGenerator {
63 pub fn new(
64 author: Author, block_store: Arc<dyn BlockReader + Send + Sync>,
65 txn_manager: Arc<dyn TxnManager>, time_service: Arc<dyn TimeService>,
66 max_block_size: u64, pow_handler: Arc<dyn PowInterface>,
67 private_key: ConsensusPrivateKey,
68 ) -> Self {
69 Self {
70 author,
71 block_store,
72 txn_manager,
73 time_service,
74 max_block_size,
75 last_round_generated: Mutex::new(0),
76 pow_handler,
77 private_key,
78 }
79 }
80
81 pub fn author(&self) -> Author { self.author }
82
83 /// Creates a NIL block proposal extending the highest certified block from
84 /// the block store.
85 pub fn generate_nil_block(&self, round: Round) -> anyhow::Result<Block> {
86 let hqc = self.ensure_highest_quorum_cert(round)?;
87 Ok(Block::new_nil(round, hqc.as_ref().clone()))
88 }
89
90 /// The function generates a new proposal block: the returned future is
91 /// fulfilled when the payload is delivered by the TxnManager
92 /// implementation. At most one proposal can be generated per round (no
93 /// proposal equivocation allowed). Errors returned by the TxnManager
94 /// implementation are propagated to the caller. The logic for choosing
95 /// the branch to extend is as follows: 1. The function gets the highest
96 /// head of a one-chain from block tree. The new proposal must extend
97 /// hqc to ensure optimistic responsiveness. 2. The round is provided by
98 /// the caller. 3. In case a given round is not greater than the
99 /// calculated parent, return an OldRound error.
100 pub async fn generate_proposal(
101 &mut self, round: Round, validators: ValidatorVerifier,
102 ) -> anyhow::Result<BlockData> {
103 {
104 let mut last_round_generated = self.last_round_generated.lock();
105 if *last_round_generated < round {
106 *last_round_generated = round;
107 } else {
108 bail!("Already proposed in the round {}", round);
109 }
110 }
111
112 let hqc = self.ensure_highest_quorum_cert(round)?;
113
114 // TODO(lpl): Handle reconfiguraiton.
115 let (payload, timestamp) = if hqc
116 .certified_block()
117 .has_reconfiguration()
118 {
119 // Reconfiguration rule - we propose empty blocks with parents'
120 // timestamp after reconfiguration until it's committed
121 (vec![], hqc.certified_block().timestamp_usecs())
122 } else {
123 // One needs to hold the blocks with the references to the payloads
124 // while get_block is being executed: pending blocks
125 // vector keeps all the pending ancestors of the extended branch.
126 let mut pending_blocks = self
127 .block_store
128 .path_from_root(hqc.certified_block().id())
129 .ok_or_else(|| {
130 format_err!(
131 "HQC {} already pruned",
132 hqc.certified_block().id()
133 )
134 })?;
135 // Avoid txn manager long poll it the root block has txns, so that
136 // the leader can deliver the commit proof to others
137 // without delay.
138 pending_blocks.insert(0, self.block_store.root());
139
140 // Exclude all the pending transactions: these are all the ancestors
141 // of parent (including) up to the root (including).
142 let exclude_payload: Vec<&Vec<_>> = pending_blocks
143 .iter()
144 .flat_map(|block| block.payload())
145 .collect();
146
147 // All proposed blocks in a branch are guaranteed to have increasing
148 // timestamps since their predecessor block will not be
149 // added to the BlockStore until the local time exceeds
150 // it.
151 let timestamp = self.time_service.get_current_timestamp();
152
153 // TODO(lpl): Check what to do if `parent_block !=
154 // hqc.certified_block()`
155 let parent_block = pending_blocks.last().expect("root pushed");
156 assert!(
157 hqc.certified_block().id() == parent_block.id(),
158 "generate_proposal: hqc = parent"
159 );
160
161 let mut payload = self
162 .txn_manager
163 .pull_txns(
164 self.max_block_size,
165 exclude_payload,
166 parent_block.id(),
167 validators,
168 )
169 .await
170 .context("Fail to retrieve txn")?;
171 diem_debug!(
172 "generate_proposal: Pull {} transactions",
173 payload.len()
174 );
175
176 // Sending non-existent decision (default value here) will return
177 // the latest pivot decision.
178 let parent_decision = parent_block
179 .block_info()
180 .pivot_decision()
181 .cloned()
182 .unwrap_or_default();
183 let new_pivot_decision =
184 payload.iter().find_map(|tx| match tx.payload() {
185 TransactionPayload::PivotDecision(decision) => {
186 if decision.height
187 <= parent_block
188 .block_info()
189 .pivot_decision()
190 .map(|d| d.height)
191 .unwrap_or_default()
192 {
193 None
194 } else {
195 Some(decision.clone())
196 }
197 }
198 _ => None,
199 });
200 if new_pivot_decision.is_none()
201 && payload.last().is_some()
202 && matches!(
203 payload.last().unwrap().payload(),
204 TransactionPayload::PivotDecision(_)
205 )
206 {
207 payload.pop();
208 }
209
210 match new_pivot_decision {
211 Some(me_decision) => {
212 // Included new registered or updated nodes as transactions.
213 let staking_events = self.pow_handler.get_staking_events(
214 parent_decision.height,
215 me_decision.height,
216 parent_decision.block_hash,
217 me_decision.block_hash,
218 )?;
219 diem_debug!(
220 "generate_proposal: staking_events={:?} parent={:?} me={:?}",
221 staking_events, parent_block.block_info().pivot_decision(), payload.last()
222 );
223 for event in staking_events {
224 match RawTransaction::from_staking_event(
225 &event,
226 self.author,
227 ) {
228 Ok(raw_tx) => {
229 let signed_tx = raw_tx
230 .sign(&self.private_key)?
231 .into_inner();
232 payload.push(signed_tx);
233 }
234 // TODO(lpl): This is not supposed to happen, so
235 // should we return error here?
236 Err(e) => diem_error!(
237 "Get invalid staking event: err={:?}",
238 e
239 ),
240 }
241 }
242 }
243 None => {
244 warn!("pos progress without new pivot decision");
245 }
246 }
247
248 (payload, timestamp.as_micros() as u64)
249 };
250
251 // create block proposal
252 Ok(BlockData::new_proposal(
253 payload,
254 self.author,
255 round,
256 timestamp,
257 hqc.as_ref().clone(),
258 ))
259 }
260
261 fn ensure_highest_quorum_cert(
262 &self, round: Round,
263 ) -> anyhow::Result<Arc<QuorumCert>> {
264 let hqc = self.block_store.highest_quorum_cert();
265 ensure!(
266 hqc.certified_block().round() < round,
267 "Given round {} is lower than hqc round {}",
268 round,
269 hqc.certified_block().round()
270 );
271 ensure!(
272 !hqc.ends_epoch(),
273 "The epoch has already ended,a proposal is not allowed to generated"
274 );
275
276 Ok(hqc)
277 }
278}
279
280/// The functions used in tests to construct attack cases
281impl ProposalGenerator {
282 pub fn force_propose(
283 &self, round: Round, parent_qc: Arc<QuorumCert>,
284 payload: Vec<TransactionPayload>,
285 ) -> anyhow::Result<BlockData> {
286 let payload = payload
287 .into_iter()
288 .map(|p| {
289 let raw_tx = RawTransaction::new(
290 self.author,
291 p,
292 u64::MAX,
293 Default::default(),
294 );
295 raw_tx.sign(&self.private_key).map(|tx| tx.into_inner())
296 })
297 .collect::<anyhow::Result<Vec<SignedTransaction>>>()?;
298
299 Ok(BlockData::new_proposal(
300 payload,
301 self.author,
302 round,
303 self.time_service.get_current_timestamp().as_micros() as u64,
304 parent_qc.as_ref().clone(),
305 ))
306 }
307}