cfxcore/pos/consensus/liveness/
proposal_generator.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use std::sync::Arc;
9
10use anyhow::{bail, ensure, format_err, Context};
11
12use consensus_types::{
13    block::Block,
14    block_data::BlockData,
15    common::{Author, Round},
16    quorum_cert::QuorumCert,
17};
18use diem_infallible::Mutex;
19use diem_logger::{debug as diem_debug, error as diem_error};
20use diem_types::{
21    transaction::{RawTransaction, SignedTransaction, TransactionPayload},
22    validator_config::ConsensusPrivateKey,
23    validator_verifier::ValidatorVerifier,
24};
25use pow_types::PowInterface;
26
27use crate::pos::consensus::{
28    block_storage::BlockReader, state_replication::TxnManager,
29    util::time_service::TimeService,
30};
31
32#[cfg(test)]
33#[path = "proposal_generator_test.rs"]
34mod proposal_generator_test;
35
36/// ProposalGenerator is responsible for generating the proposed block on
37/// demand: it's typically used by a validator that believes it's a valid
38/// candidate for serving as a proposer at a given round.
39/// ProposalGenerator is the one choosing the branch to extend:
40/// - round is given by the caller (typically determined by RoundState).
41/// The transactions for the proposed block are delivered by TxnManager.
42///
43/// TxnManager should be aware of the pending transactions in the branch that it
44/// is extending, such that it will filter them out to avoid transaction
45/// duplication.
46pub struct ProposalGenerator {
47    // The account address of this validator
48    author: Author,
49    // Block store is queried both for finding the branch to extend and for
50    // generating the proposed block.
51    block_store: Arc<dyn BlockReader + Send + Sync>,
52    // Transaction manager is delivering the transactions.
53    txn_manager: Arc<dyn TxnManager>,
54    // Time service to generate block timestamps
55    time_service: Arc<dyn TimeService>,
56    // Max number of transactions to be added to a proposed block.
57    max_block_size: u64,
58    // Last round that a proposal was generated
59    last_round_generated: Mutex<Round>,
60    // Handle the interaction with PoW consensus.
61    pow_handler: Arc<dyn PowInterface>,
62    // TODO(lpl): Where to put them?
63    pub private_key: ConsensusPrivateKey,
64}
65
66impl ProposalGenerator {
67    pub fn new(
68        author: Author, block_store: Arc<dyn BlockReader + Send + Sync>,
69        txn_manager: Arc<dyn TxnManager>, time_service: Arc<dyn TimeService>,
70        max_block_size: u64, pow_handler: Arc<dyn PowInterface>,
71        private_key: ConsensusPrivateKey,
72    ) -> Self {
73        Self {
74            author,
75            block_store,
76            txn_manager,
77            time_service,
78            max_block_size,
79            last_round_generated: Mutex::new(0),
80            pow_handler,
81            private_key,
82        }
83    }
84
85    pub fn author(&self) -> Author { self.author }
86
87    /// Creates a NIL block proposal extending the highest certified block from
88    /// the block store.
89    pub fn generate_nil_block(&self, round: Round) -> anyhow::Result<Block> {
90        let hqc = self.ensure_highest_quorum_cert(round)?;
91        Ok(Block::new_nil(round, hqc.as_ref().clone()))
92    }
93
94    /// The function generates a new proposal block: the returned future is
95    /// fulfilled when the payload is delivered by the TxnManager
96    /// implementation.  At most one proposal can be generated per round (no
97    /// proposal equivocation allowed). Errors returned by the TxnManager
98    /// implementation are propagated to the caller. The logic for choosing
99    /// the branch to extend is as follows: 1. The function gets the highest
100    /// head of a one-chain from block tree. The new proposal must extend
101    /// hqc to ensure optimistic responsiveness. 2. The round is provided by
102    /// the caller. 3. In case a given round is not greater than the
103    /// calculated parent, return an OldRound error.
104    pub async fn generate_proposal(
105        &mut self, round: Round, validators: ValidatorVerifier,
106    ) -> anyhow::Result<BlockData> {
107        {
108            let mut last_round_generated = self.last_round_generated.lock();
109            if *last_round_generated < round {
110                *last_round_generated = round;
111            } else {
112                bail!("Already proposed in the round {}", round);
113            }
114        }
115
116        let hqc = self.ensure_highest_quorum_cert(round)?;
117
118        // TODO(lpl): Handle reconfiguraiton.
119        let (payload, timestamp) = if hqc
120            .certified_block()
121            .has_reconfiguration()
122        {
123            // Reconfiguration rule - we propose empty blocks with parents'
124            // timestamp after reconfiguration until it's committed
125            (vec![], hqc.certified_block().timestamp_usecs())
126        } else {
127            // One needs to hold the blocks with the references to the payloads
128            // while get_block is being executed: pending blocks
129            // vector keeps all the pending ancestors of the extended branch.
130            let mut pending_blocks = self
131                .block_store
132                .path_from_root(hqc.certified_block().id())
133                .ok_or_else(|| {
134                    format_err!(
135                        "HQC {} already pruned",
136                        hqc.certified_block().id()
137                    )
138                })?;
139            // Avoid txn manager long poll it the root block has txns, so that
140            // the leader can deliver the commit proof to others
141            // without delay.
142            pending_blocks.insert(0, self.block_store.root());
143
144            // Exclude all the pending transactions: these are all the ancestors
145            // of parent (including) up to the root (including).
146            let exclude_payload: Vec<&Vec<_>> = pending_blocks
147                .iter()
148                .flat_map(|block| block.payload())
149                .collect();
150
151            // All proposed blocks in a branch are guaranteed to have increasing
152            // timestamps since their predecessor block will not be
153            // added to the BlockStore until the local time exceeds
154            // it.
155            let timestamp = self.time_service.get_current_timestamp();
156
157            // TODO(lpl): Check what to do if `parent_block !=
158            // hqc.certified_block()`
159            let parent_block = pending_blocks.last().expect("root pushed");
160            assert!(
161                hqc.certified_block().id() == parent_block.id(),
162                "generate_proposal: hqc = parent"
163            );
164
165            let mut payload = self
166                .txn_manager
167                .pull_txns(
168                    self.max_block_size,
169                    exclude_payload,
170                    parent_block.id(),
171                    validators,
172                )
173                .await
174                .context("Fail to retrieve txn")?;
175            diem_debug!(
176                "generate_proposal: Pull {} transactions",
177                payload.len()
178            );
179
180            // Sending non-existent decision (default value here) will return
181            // the latest pivot decision.
182            let parent_decision = parent_block
183                .block_info()
184                .pivot_decision()
185                .cloned()
186                .unwrap_or_default();
187            let new_pivot_decision =
188                payload.iter().find_map(|tx| match tx.payload() {
189                    TransactionPayload::PivotDecision(decision) => {
190                        if decision.height
191                            <= parent_block
192                                .block_info()
193                                .pivot_decision()
194                                .map(|d| d.height)
195                                .unwrap_or_default()
196                        {
197                            None
198                        } else {
199                            Some(decision.clone())
200                        }
201                    }
202                    _ => None,
203                });
204            if new_pivot_decision.is_none()
205                && payload.last().is_some()
206                && matches!(
207                    payload.last().unwrap().payload(),
208                    TransactionPayload::PivotDecision(_)
209                )
210            {
211                payload.pop();
212            }
213
214            match new_pivot_decision {
215                Some(me_decision) => {
216                    // Included new registered or updated nodes as transactions.
217                    let staking_events = self.pow_handler.get_staking_events(
218                        parent_decision.height,
219                        me_decision.height,
220                        parent_decision.block_hash,
221                        me_decision.block_hash,
222                    )?;
223                    diem_debug!(
224                        "generate_proposal: staking_events={:?} parent={:?} me={:?}",
225                        staking_events, parent_block.block_info().pivot_decision(), payload.last()
226                    );
227                    for event in staking_events {
228                        match RawTransaction::from_staking_event(
229                            &event,
230                            self.author,
231                        ) {
232                            Ok(raw_tx) => {
233                                let signed_tx = raw_tx
234                                    .sign(&self.private_key)?
235                                    .into_inner();
236                                payload.push(signed_tx);
237                            }
238                            // TODO(lpl): This is not supposed to happen, so
239                            // should we return error here?
240                            Err(e) => diem_error!(
241                                "Get invalid staking event: err={:?}",
242                                e
243                            ),
244                        }
245                    }
246                }
247                None => {
248                    warn!("pos progress without new pivot decision");
249                }
250            }
251
252            (payload, timestamp.as_micros() as u64)
253        };
254
255        // create block proposal
256        Ok(BlockData::new_proposal(
257            payload,
258            self.author,
259            round,
260            timestamp,
261            hqc.as_ref().clone(),
262        ))
263    }
264
265    fn ensure_highest_quorum_cert(
266        &self, round: Round,
267    ) -> anyhow::Result<Arc<QuorumCert>> {
268        let hqc = self.block_store.highest_quorum_cert();
269        ensure!(
270            hqc.certified_block().round() < round,
271            "Given round {} is lower than hqc round {}",
272            round,
273            hqc.certified_block().round()
274        );
275        ensure!(
276            !hqc.ends_epoch(),
277            "The epoch has already ended,a proposal is not allowed to generated"
278        );
279
280        Ok(hqc)
281    }
282}
283
284/// The functions used in tests to construct attack cases
285impl ProposalGenerator {
286    pub fn force_propose(
287        &self, round: Round, parent_qc: Arc<QuorumCert>,
288        payload: Vec<TransactionPayload>,
289    ) -> anyhow::Result<BlockData> {
290        let payload = payload
291            .into_iter()
292            .map(|p| {
293                let raw_tx = RawTransaction::new(
294                    self.author,
295                    p,
296                    u64::MAX,
297                    Default::default(),
298                );
299                raw_tx.sign(&self.private_key).map(|tx| tx.into_inner())
300            })
301            .collect::<anyhow::Result<Vec<SignedTransaction>>>()?;
302
303        Ok(BlockData::new_proposal(
304            payload,
305            self.author,
306            round,
307            self.time_service.get_current_timestamp().as_micros() as u64,
308            parent_qc.as_ref().clone(),
309        ))
310    }
311}