cfxcore/pos/consensus/liveness/
proposal_generator.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use std::sync::Arc;
9
10use anyhow::{bail, ensure, format_err, Context};
11
12use consensus_types::{
13    block::Block,
14    block_data::BlockData,
15    common::{Author, Round},
16    quorum_cert::QuorumCert,
17};
18use diem_logger::{debug as diem_debug, error as diem_error};
19use diem_types::{
20    transaction::{RawTransaction, SignedTransaction, TransactionPayload},
21    validator_config::ConsensusPrivateKey,
22    validator_verifier::ValidatorVerifier,
23};
24use parking_lot::Mutex;
25use pow_types::PowInterface;
26
27use crate::pos::consensus::{
28    block_storage::BlockReader, state_replication::TxnManager,
29    util::time_service::TimeService,
30};
31
32/// ProposalGenerator is responsible for generating the proposed block on
33/// demand: it's typically used by a validator that believes it's a valid
34/// candidate for serving as a proposer at a given round.
35/// ProposalGenerator is the one choosing the branch to extend:
36/// - round is given by the caller (typically determined by RoundState).
37/// The transactions for the proposed block are delivered by TxnManager.
38///
39/// TxnManager should be aware of the pending transactions in the branch that it
40/// is extending, such that it will filter them out to avoid transaction
41/// duplication.
42pub struct ProposalGenerator {
43    // The account address of this validator
44    author: Author,
45    // Block store is queried both for finding the branch to extend and for
46    // generating the proposed block.
47    block_store: Arc<dyn BlockReader + Send + Sync>,
48    // Transaction manager is delivering the transactions.
49    txn_manager: Arc<dyn TxnManager>,
50    // Time service to generate block timestamps
51    time_service: Arc<dyn TimeService>,
52    // Max number of transactions to be added to a proposed block.
53    max_block_size: u64,
54    // Last round that a proposal was generated
55    last_round_generated: Mutex<Round>,
56    // Handle the interaction with PoW consensus.
57    pow_handler: Arc<dyn PowInterface>,
58    // TODO(lpl): Where to put them?
59    pub private_key: ConsensusPrivateKey,
60}
61
62impl ProposalGenerator {
63    pub fn new(
64        author: Author, block_store: Arc<dyn BlockReader + Send + Sync>,
65        txn_manager: Arc<dyn TxnManager>, time_service: Arc<dyn TimeService>,
66        max_block_size: u64, pow_handler: Arc<dyn PowInterface>,
67        private_key: ConsensusPrivateKey,
68    ) -> Self {
69        Self {
70            author,
71            block_store,
72            txn_manager,
73            time_service,
74            max_block_size,
75            last_round_generated: Mutex::new(0),
76            pow_handler,
77            private_key,
78        }
79    }
80
81    pub fn author(&self) -> Author { self.author }
82
83    /// Creates a NIL block proposal extending the highest certified block from
84    /// the block store.
85    pub fn generate_nil_block(&self, round: Round) -> anyhow::Result<Block> {
86        let hqc = self.ensure_highest_quorum_cert(round)?;
87        Ok(Block::new_nil(round, hqc.as_ref().clone()))
88    }
89
90    /// The function generates a new proposal block: the returned future is
91    /// fulfilled when the payload is delivered by the TxnManager
92    /// implementation.  At most one proposal can be generated per round (no
93    /// proposal equivocation allowed). Errors returned by the TxnManager
94    /// implementation are propagated to the caller. The logic for choosing
95    /// the branch to extend is as follows: 1. The function gets the highest
96    /// head of a one-chain from block tree. The new proposal must extend
97    /// hqc to ensure optimistic responsiveness. 2. The round is provided by
98    /// the caller. 3. In case a given round is not greater than the
99    /// calculated parent, return an OldRound error.
100    pub async fn generate_proposal(
101        &mut self, round: Round, validators: ValidatorVerifier,
102    ) -> anyhow::Result<BlockData> {
103        {
104            let mut last_round_generated = self.last_round_generated.lock();
105            if *last_round_generated < round {
106                *last_round_generated = round;
107            } else {
108                bail!("Already proposed in the round {}", round);
109            }
110        }
111
112        let hqc = self.ensure_highest_quorum_cert(round)?;
113
114        // TODO(lpl): Handle reconfiguraiton.
115        let (payload, timestamp) = if hqc
116            .certified_block()
117            .has_reconfiguration()
118        {
119            // Reconfiguration rule - we propose empty blocks with parents'
120            // timestamp after reconfiguration until it's committed
121            (vec![], hqc.certified_block().timestamp_usecs())
122        } else {
123            // One needs to hold the blocks with the references to the payloads
124            // while get_block is being executed: pending blocks
125            // vector keeps all the pending ancestors of the extended branch.
126            let mut pending_blocks = self
127                .block_store
128                .path_from_root(hqc.certified_block().id())
129                .ok_or_else(|| {
130                    format_err!(
131                        "HQC {} already pruned",
132                        hqc.certified_block().id()
133                    )
134                })?;
135            // Avoid txn manager long poll it the root block has txns, so that
136            // the leader can deliver the commit proof to others
137            // without delay.
138            pending_blocks.insert(0, self.block_store.root());
139
140            // Exclude all the pending transactions: these are all the ancestors
141            // of parent (including) up to the root (including).
142            let exclude_payload: Vec<&Vec<_>> = pending_blocks
143                .iter()
144                .flat_map(|block| block.payload())
145                .collect();
146
147            // All proposed blocks in a branch are guaranteed to have increasing
148            // timestamps since their predecessor block will not be
149            // added to the BlockStore until the local time exceeds
150            // it.
151            let timestamp = self.time_service.get_current_timestamp();
152
153            // TODO(lpl): Check what to do if `parent_block !=
154            // hqc.certified_block()`
155            let parent_block = pending_blocks.last().expect("root pushed");
156            assert!(
157                hqc.certified_block().id() == parent_block.id(),
158                "generate_proposal: hqc = parent"
159            );
160
161            let mut payload = self
162                .txn_manager
163                .pull_txns(
164                    self.max_block_size,
165                    exclude_payload,
166                    parent_block.id(),
167                    validators,
168                )
169                .await
170                .context("Fail to retrieve txn")?;
171            diem_debug!(
172                "generate_proposal: Pull {} transactions",
173                payload.len()
174            );
175
176            // Sending non-existent decision (default value here) will return
177            // the latest pivot decision.
178            let parent_decision = parent_block
179                .block_info()
180                .pivot_decision()
181                .cloned()
182                .unwrap_or_default();
183            let new_pivot_decision =
184                payload.iter().find_map(|tx| match tx.payload() {
185                    TransactionPayload::PivotDecision(decision) => {
186                        if decision.height
187                            <= parent_block
188                                .block_info()
189                                .pivot_decision()
190                                .map(|d| d.height)
191                                .unwrap_or_default()
192                        {
193                            None
194                        } else {
195                            Some(decision.clone())
196                        }
197                    }
198                    _ => None,
199                });
200            if new_pivot_decision.is_none()
201                && payload.last().is_some()
202                && matches!(
203                    payload.last().unwrap().payload(),
204                    TransactionPayload::PivotDecision(_)
205                )
206            {
207                payload.pop();
208            }
209
210            match new_pivot_decision {
211                Some(me_decision) => {
212                    // Included new registered or updated nodes as transactions.
213                    let staking_events = self.pow_handler.get_staking_events(
214                        parent_decision.height,
215                        me_decision.height,
216                        parent_decision.block_hash,
217                        me_decision.block_hash,
218                    )?;
219                    diem_debug!(
220                        "generate_proposal: staking_events={:?} parent={:?} me={:?}",
221                        staking_events, parent_block.block_info().pivot_decision(), payload.last()
222                    );
223                    for event in staking_events {
224                        match RawTransaction::from_staking_event(
225                            &event,
226                            self.author,
227                        ) {
228                            Ok(raw_tx) => {
229                                let signed_tx = raw_tx
230                                    .sign(&self.private_key)?
231                                    .into_inner();
232                                payload.push(signed_tx);
233                            }
234                            // TODO(lpl): This is not supposed to happen, so
235                            // should we return error here?
236                            Err(e) => diem_error!(
237                                "Get invalid staking event: err={:?}",
238                                e
239                            ),
240                        }
241                    }
242                }
243                None => {
244                    warn!("pos progress without new pivot decision");
245                }
246            }
247
248            (payload, timestamp.as_micros() as u64)
249        };
250
251        // create block proposal
252        Ok(BlockData::new_proposal(
253            payload,
254            self.author,
255            round,
256            timestamp,
257            hqc.as_ref().clone(),
258        ))
259    }
260
261    fn ensure_highest_quorum_cert(
262        &self, round: Round,
263    ) -> anyhow::Result<Arc<QuorumCert>> {
264        let hqc = self.block_store.highest_quorum_cert();
265        ensure!(
266            hqc.certified_block().round() < round,
267            "Given round {} is lower than hqc round {}",
268            round,
269            hqc.certified_block().round()
270        );
271        ensure!(
272            !hqc.ends_epoch(),
273            "The epoch has already ended,a proposal is not allowed to generated"
274        );
275
276        Ok(hqc)
277    }
278}
279
280/// The functions used in tests to construct attack cases
281impl ProposalGenerator {
282    pub fn force_propose(
283        &self, round: Round, parent_qc: Arc<QuorumCert>,
284        payload: Vec<TransactionPayload>,
285    ) -> anyhow::Result<BlockData> {
286        let payload = payload
287            .into_iter()
288            .map(|p| {
289                let raw_tx = RawTransaction::new(
290                    self.author,
291                    p,
292                    u64::MAX,
293                    Default::default(),
294                );
295                raw_tx.sign(&self.private_key).map(|tx| tx.into_inner())
296            })
297            .collect::<anyhow::Result<Vec<SignedTransaction>>>()?;
298
299        Ok(BlockData::new_proposal(
300            payload,
301            self.author,
302            round,
303            self.time_service.get_current_timestamp().as_micros() as u64,
304            parent_qc.as_ref().clone(),
305        ))
306    }
307}