1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/

use std::sync::Arc;

use anyhow::{bail, ensure, format_err, Context};

use consensus_types::{
    block::Block,
    block_data::BlockData,
    common::{Author, Round},
    quorum_cert::QuorumCert,
};
use diem_infallible::Mutex;
use diem_logger::{debug as diem_debug, error as diem_error};
use diem_types::{
    transaction::{RawTransaction, SignedTransaction, TransactionPayload},
    validator_config::{
        ConsensusPrivateKey, ConsensusPublicKey, ConsensusVRFPrivateKey,
        ConsensusVRFPublicKey,
    },
    validator_verifier::ValidatorVerifier,
};
use pow_types::PowInterface;

use crate::pos::consensus::{
    block_storage::BlockReader, state_replication::TxnManager,
    util::time_service::TimeService,
};

#[cfg(test)]
#[path = "proposal_generator_test.rs"]
mod proposal_generator_test;

/// ProposalGenerator is responsible for generating the proposed block on
/// demand: it's typically used by a validator that believes it's a valid
/// candidate for serving as a proposer at a given round.
/// ProposalGenerator is the one choosing the branch to extend:
/// - round is given by the caller (typically determined by RoundState).
/// The transactions for the proposed block are delivered by TxnManager.
///
/// TxnManager should be aware of the pending transactions in the branch that it
/// is extending, such that it will filter them out to avoid transaction
/// duplication.
pub struct ProposalGenerator {
    // The account address of this validator
    author: Author,
    // Block store is queried both for finding the branch to extend and for
    // generating the proposed block.
    block_store: Arc<dyn BlockReader + Send + Sync>,
    // Transaction manager is delivering the transactions.
    txn_manager: Arc<dyn TxnManager>,
    // Time service to generate block timestamps
    time_service: Arc<dyn TimeService>,
    // Max number of transactions to be added to a proposed block.
    max_block_size: u64,
    // Last round that a proposal was generated
    last_round_generated: Mutex<Round>,
    // Handle the interaction with PoW consensus.
    pow_handler: Arc<dyn PowInterface>,
    // TODO(lpl): Where to put them?
    pub private_key: ConsensusPrivateKey,
    pub public_key: ConsensusPublicKey,
    pub vrf_private_key: ConsensusVRFPrivateKey,
    pub vrf_public_key: ConsensusVRFPublicKey,
}

impl ProposalGenerator {
    pub fn new(
        author: Author, block_store: Arc<dyn BlockReader + Send + Sync>,
        txn_manager: Arc<dyn TxnManager>, time_service: Arc<dyn TimeService>,
        max_block_size: u64, pow_handler: Arc<dyn PowInterface>,
        private_key: ConsensusPrivateKey, public_key: ConsensusPublicKey,
        vrf_private_key: ConsensusVRFPrivateKey,
        vrf_public_key: ConsensusVRFPublicKey,
    ) -> Self {
        Self {
            author,
            block_store,
            txn_manager,
            time_service,
            max_block_size,
            last_round_generated: Mutex::new(0),
            pow_handler,
            private_key,
            public_key,
            vrf_private_key,
            vrf_public_key,
        }
    }

    pub fn author(&self) -> Author { self.author }

    /// Creates a NIL block proposal extending the highest certified block from
    /// the block store.
    pub fn generate_nil_block(&self, round: Round) -> anyhow::Result<Block> {
        let hqc = self.ensure_highest_quorum_cert(round)?;
        Ok(Block::new_nil(round, hqc.as_ref().clone()))
    }

    /// The function generates a new proposal block: the returned future is
    /// fulfilled when the payload is delivered by the TxnManager
    /// implementation.  At most one proposal can be generated per round (no
    /// proposal equivocation allowed). Errors returned by the TxnManager
    /// implementation are propagated to the caller. The logic for choosing
    /// the branch to extend is as follows: 1. The function gets the highest
    /// head of a one-chain from block tree. The new proposal must extend
    /// hqc to ensure optimistic responsiveness. 2. The round is provided by
    /// the caller. 3. In case a given round is not greater than the
    /// calculated parent, return an OldRound error.
    pub async fn generate_proposal(
        &mut self, round: Round, validators: ValidatorVerifier,
    ) -> anyhow::Result<BlockData> {
        {
            let mut last_round_generated = self.last_round_generated.lock();
            if *last_round_generated < round {
                *last_round_generated = round;
            } else {
                bail!("Already proposed in the round {}", round);
            }
        }

        let hqc = self.ensure_highest_quorum_cert(round)?;

        // TODO(lpl): Handle reconfiguraiton.
        let (payload, timestamp) = if hqc
            .certified_block()
            .has_reconfiguration()
        {
            // Reconfiguration rule - we propose empty blocks with parents'
            // timestamp after reconfiguration until it's committed
            (vec![], hqc.certified_block().timestamp_usecs())
        } else {
            // One needs to hold the blocks with the references to the payloads
            // while get_block is being executed: pending blocks
            // vector keeps all the pending ancestors of the extended branch.
            let mut pending_blocks = self
                .block_store
                .path_from_root(hqc.certified_block().id())
                .ok_or_else(|| {
                    format_err!(
                        "HQC {} already pruned",
                        hqc.certified_block().id()
                    )
                })?;
            // Avoid txn manager long poll it the root block has txns, so that
            // the leader can deliver the commit proof to others
            // without delay.
            pending_blocks.insert(0, self.block_store.root());

            // Exclude all the pending transactions: these are all the ancestors
            // of parent (including) up to the root (including).
            let exclude_payload: Vec<&Vec<_>> = pending_blocks
                .iter()
                .flat_map(|block| block.payload())
                .collect();

            // All proposed blocks in a branch are guaranteed to have increasing
            // timestamps since their predecessor block will not be
            // added to the BlockStore until the local time exceeds
            // it.
            let timestamp = self.time_service.get_current_timestamp();

            // TODO(lpl): Check what to do if `parent_block !=
            // hqc.certified_block()`
            let parent_block = pending_blocks.last().expect("root pushed");
            assert!(
                hqc.certified_block().id() == parent_block.id(),
                "generate_proposal: hqc = parent"
            );

            let mut payload = self
                .txn_manager
                .pull_txns(
                    self.max_block_size,
                    exclude_payload,
                    parent_block.id(),
                    validators,
                )
                .await
                .context("Fail to retrieve txn")?;
            diem_debug!(
                "generate_proposal: Pull {} transactions",
                payload.len()
            );

            // Sending non-existent decision (default value here) will return
            // the latest pivot decision.
            let parent_decision = parent_block
                .block_info()
                .pivot_decision()
                .cloned()
                .unwrap_or_default();
            let new_pivot_decision =
                payload.iter().find_map(|tx| match tx.payload() {
                    TransactionPayload::PivotDecision(decision) => {
                        if decision.height
                            <= parent_block
                                .block_info()
                                .pivot_decision()
                                .map(|d| d.height)
                                .unwrap_or_default()
                        {
                            None
                        } else {
                            Some(decision.clone())
                        }
                    }
                    _ => None,
                });
            if new_pivot_decision.is_none()
                && payload.last().is_some()
                && matches!(
                    payload.last().unwrap().payload(),
                    TransactionPayload::PivotDecision(_)
                )
            {
                payload.pop();
            }

            match new_pivot_decision {
                Some(me_decision) => {
                    // Included new registered or updated nodes as transactions.
                    let staking_events = self.pow_handler.get_staking_events(
                        parent_decision.height,
                        me_decision.height,
                        parent_decision.block_hash,
                        me_decision.block_hash,
                    )?;
                    diem_debug!(
                        "generate_proposal: staking_events={:?} parent={:?} me={:?}",
                        staking_events, parent_block.block_info().pivot_decision(), payload.last()
                    );
                    for event in staking_events {
                        match RawTransaction::from_staking_event(
                            &event,
                            self.author,
                        ) {
                            Ok(raw_tx) => {
                                let signed_tx = raw_tx
                                    .sign(&self.private_key)?
                                    .into_inner();
                                payload.push(signed_tx);
                            }
                            // TODO(lpl): This is not supposed to happen, so
                            // should we return error here?
                            Err(e) => diem_error!(
                                "Get invalid staking event: err={:?}",
                                e
                            ),
                        }
                    }
                }
                None => {
                    warn!("pos progress without new pivot decision");
                }
            }

            (payload, timestamp.as_micros() as u64)
        };

        // create block proposal
        Ok(BlockData::new_proposal(
            payload,
            self.author,
            round,
            timestamp,
            hqc.as_ref().clone(),
        ))
    }

    fn ensure_highest_quorum_cert(
        &self, round: Round,
    ) -> anyhow::Result<Arc<QuorumCert>> {
        let hqc = self.block_store.highest_quorum_cert();
        ensure!(
            hqc.certified_block().round() < round,
            "Given round {} is lower than hqc round {}",
            round,
            hqc.certified_block().round()
        );
        ensure!(
            !hqc.ends_epoch(),
            "The epoch has already ended,a proposal is not allowed to generated"
        );

        Ok(hqc)
    }
}

/// The functions used in tests to construct attack cases
impl ProposalGenerator {
    pub fn force_propose(
        &self, round: Round, parent_qc: Arc<QuorumCert>,
        payload: Vec<TransactionPayload>,
    ) -> anyhow::Result<BlockData> {
        let payload = payload
            .into_iter()
            .map(|p| {
                let raw_tx = RawTransaction::new(
                    self.author,
                    p,
                    u64::MAX,
                    Default::default(),
                );
                raw_tx.sign(&self.private_key).map(|tx| tx.into_inner())
            })
            .collect::<anyhow::Result<Vec<SignedTransaction>>>()?;

        Ok(BlockData::new_proposal(
            payload,
            self.author,
            round,
            self.time_service.get_current_timestamp().as_micros() as u64,
            parent_qc.as_ref().clone(),
        ))
    }
}