safety_rules/
safety_rules.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use crate::{
9    configurable_validator_signer::ConfigurableValidatorSigner,
10    consensus_state::ConsensusState,
11    counters,
12    error::Error,
13    logging::{LogEntry, LogEvent, SafetyLogSchema},
14    persistent_safety_storage::PersistentSafetyStorage,
15    t_safety_rules::TSafetyRules,
16};
17use consensus_types::{
18    block::Block,
19    block_data::BlockData,
20    common::{Author, Round},
21    quorum_cert::QuorumCert,
22    safety_data::SafetyData,
23    timeout::Timeout,
24    vote::Vote,
25    vote_data::VoteData,
26    vote_proposal::{MaybeSignedVoteProposal, VoteProposal},
27};
28use diem_crypto::{
29    hash::{CryptoHash, HashValue},
30    traits::Signature,
31    PrivateKey,
32};
33use diem_logger::prelude::*;
34use diem_types::{
35    account_address::AccountAddress,
36    block_info::BlockInfo,
37    epoch_change::EpochChangeProof,
38    epoch_state::EpochState,
39    ledger_info::LedgerInfo,
40    validator_config::{
41        ConsensusPublicKey, ConsensusSignature, ConsensusVRFPrivateKey,
42    },
43    waypoint::Waypoint,
44};
45use log::error;
46use serde::Serialize;
47use std::cmp::Ordering;
48
/// Suffix handed to the persistent storage's `save_to_suffix` (from
/// `stop_voting`) and `replace_with_suffix` (from `start_voting`).
const SAFETY_STORAGE_SAVE_SUFFIX: &str = "json.save";
50
/// State needed to enforce the voting/signing checks implemented in this
/// file: persistent safety storage plus the signer and epoch state that
/// `initialize` installs.
///
/// @TODO consider a cache of verified QCs to cut down on verification costs
pub struct SafetyRules {
    /// Storage from which the safety data, waypoint and author are read and
    /// to which updated safety data and waypoints are written.
    persistent_storage: PersistentSafetyStorage,
    /// When `Some`, `guarded_construct_and_sign_vote` requires and verifies
    /// the vote proposal's signature against this key. `new` always sets
    /// it to `None`.
    execution_public_key: Option<ConsensusPublicKey>,
    /// When `true`, `guarded_initialize` fetches the consensus key from
    /// storage and builds a signer from it; otherwise it builds a handle
    /// signer and checks it with a test signature.
    export_consensus_key: bool,
    /// Signer used by `sign`. `None` until `guarded_initialize` installs
    /// one, and reset to `None` when the author has no key in the epoch's
    /// verifier or key reconciliation fails.
    validator_signer: Option<ConfigurableValidatorSigner>,
    /// Epoch state taken from the epoch change proof in
    /// `guarded_initialize`; its verifier is used to check QCs and block
    /// signatures.
    epoch_state: Option<EpochState>,
    /// Passed to `ConfigurableValidatorSigner::new_signer` when a signer is
    /// built from an exported consensus key.
    vrf_private_key: Option<ConsensusVRFPrivateKey>,
}
60
61impl SafetyRules {
62    /// Constructs a new instance of SafetyRules with the given persistent
63    /// storage and the consensus private keys
64    pub fn new(
65        persistent_storage: PersistentSafetyStorage,
66        _verify_vote_proposal_signature: bool, export_consensus_key: bool,
67        vrf_private_key: Option<ConsensusVRFPrivateKey>,
68        author: AccountAddress,
69    ) -> Self {
70        let execution_public_key = None;
71        if let Ok(storage_author) = persistent_storage.author() {
72            if storage_author != author {
73                // TODO: After we allow non-voting nodes to not have a pos key,
74                // we can panic here so pos-voting nodes can be aware of this
75                // issue.
76                diem_error!(
77                    "author in secure_storage does not match PoS keys!"
78                );
79                error!("author in secure_storage does not match PoS keys!");
80            }
81        }
82        /*
83            if verify_vote_proposal_signature {
84            None
85        Some(
86            persistent_storage
87                .execution_public_key()
88                .expect("Unable to retrieve execution public key"),
89        } else {
90            None
91        )*/
92        Self {
93            persistent_storage,
94            execution_public_key,
95            export_consensus_key,
96            validator_signer: None,
97            epoch_state: None,
98            vrf_private_key,
99        }
100    }
101
102    fn sign<T: Serialize + CryptoHash>(
103        &self, message: &T,
104    ) -> Result<ConsensusSignature, Error> {
105        let signer = self.signer()?;
106        signer.sign(message, &self.persistent_storage)
107    }
108
109    fn signer(&self) -> Result<&ConfigurableValidatorSigner, Error> {
110        self.validator_signer
111            .as_ref()
112            .ok_or_else(|| Error::NotInitialized("validator_signer".into()))
113    }
114
115    fn epoch_state(&self) -> Result<&EpochState, Error> {
116        self.epoch_state
117            .as_ref()
118            .ok_or_else(|| Error::NotInitialized("epoch_state".into()))
119    }
120
121    /// Check if the executed result extends the parent result.
122    pub fn extension_check(
123        vote_proposal: &VoteProposal,
124    ) -> Result<VoteData, Error> {
125        let proposed_block = vote_proposal.block();
126        let new_tree = vote_proposal
127            .accumulator_extension_proof()
128            .verify(
129                proposed_block
130                    .quorum_cert()
131                    .certified_block()
132                    .executed_state_id(),
133            )
134            .map_err(|e| Error::InvalidAccumulatorExtension(e.to_string()))?;
135        Ok(VoteData::new(
136            proposed_block.gen_block_info(
137                // TODO(lpl): Remove state tree.
138                Default::default(),
139                new_tree.version(),
140                vote_proposal.next_epoch_state().cloned(),
141                vote_proposal.pivot_decision().clone(),
142            ),
143            proposed_block.quorum_cert().certified_block().clone(),
144        ))
145    }
146
147    /// Produces a LedgerInfo that either commits a block based upon the 3-chain
148    /// commit rule or an empty LedgerInfo for no commit. The 3-chain commit
149    /// rule is: B0 and its prefixes can be committed if there exist
150    /// certified blocks B1 and B2 that satisfy: 1) B0 <- B1 <- B2 <--
151    /// 2) round(B0) + 1 = round(B1), and
152    /// 3) round(B1) + 1 = round(B2).
153    pub fn construct_ledger_info(
154        proposed_block: &Block, consensus_data_hash: HashValue,
155    ) -> Result<LedgerInfo, Error> {
156        let block2 = proposed_block.round();
157        let block1 = proposed_block.quorum_cert().certified_block().round();
158        let block0 = proposed_block.quorum_cert().parent_block().round();
159
160        // verify 3-chain rule
161        let next_round = |round: u64| {
162            u64::checked_add(round, 1).ok_or(Error::IncorrectRound(round))
163        };
164        let commit =
165            next_round(block0)? == block1 && next_round(block1)? == block2;
166
167        // create a ledger info
168        let commit_info = if commit {
169            proposed_block.quorum_cert().parent_block().clone()
170        } else {
171            BlockInfo::empty()
172        };
173
174        Ok(LedgerInfo::new(commit_info, consensus_data_hash))
175    }
176
177    /// Second voting rule
178    fn verify_and_update_preferred_round(
179        &mut self, quorum_cert: &QuorumCert, safety_data: &mut SafetyData,
180    ) -> Result<bool, Error> {
181        let preferred_round = safety_data.preferred_round;
182        let one_chain_round = quorum_cert.certified_block().round();
183        let two_chain_round = quorum_cert.parent_block().round();
184
185        if one_chain_round < preferred_round {
186            return Err(Error::IncorrectPreferredRound(
187                one_chain_round,
188                preferred_round,
189            ));
190        }
191
192        let updated = match two_chain_round.cmp(&preferred_round) {
193            Ordering::Greater => {
194                safety_data.preferred_round = two_chain_round;
195                diem_info!(SafetyLogSchema::new(
196                    LogEntry::PreferredRound,
197                    LogEvent::Update
198                )
199                .preferred_round(safety_data.preferred_round));
200                true
201            }
202            Ordering::Less => {
203                diem_trace!(
204                "2-chain round {} is lower than preferred round {} but 1-chain round {} is higher.",
205                two_chain_round, preferred_round, one_chain_round
206            );
207                false
208            }
209            Ordering::Equal => false,
210        };
211        Ok(updated)
212    }
213
214    /// This verifies whether the author of one proposal is the validator signer
215    fn verify_author(&self, author: Option<Author>) -> Result<(), Error> {
216        let validator_signer_author = &self.signer()?.author();
217        let author = author.ok_or_else(|| {
218            Error::InvalidProposal("No author found in the proposal".into())
219        })?;
220        if validator_signer_author != &author {
221            return Err(Error::InvalidProposal(
222                "Proposal author is not validator signer!".into(),
223            ));
224        }
225        Ok(())
226    }
227
228    /// This verifies the epoch given against storage for consistent
229    /// verification
230    fn verify_epoch(
231        &self, epoch: u64, safety_data: &SafetyData,
232    ) -> Result<(), Error> {
233        if epoch != safety_data.epoch {
234            return Err(Error::IncorrectEpoch(epoch, safety_data.epoch));
235        }
236
237        Ok(())
238    }
239
240    /// First voting rule
241    fn verify_and_update_last_vote_round(
242        &self, round: Round, safety_data: &mut SafetyData,
243    ) -> Result<(), Error> {
244        if round <= safety_data.last_voted_round {
245            return Err(Error::IncorrectLastVotedRound(
246                round,
247                safety_data.last_voted_round,
248            ));
249        }
250
251        safety_data.last_voted_round = round;
252        diem_info!(SafetyLogSchema::new(
253            LogEntry::LastVotedRound,
254            LogEvent::Update
255        )
256        .last_voted_round(safety_data.last_voted_round));
257
258        Ok(())
259    }
260
261    /// This verifies a QC has valid signatures.
262    fn verify_qc(&self, qc: &QuorumCert) -> Result<(), Error> {
263        let epoch_state = self.epoch_state()?;
264
265        qc.verify(&epoch_state.verifier())
266            .map_err(|e| Error::InvalidQuorumCertificate(e.to_string()))?;
267        Ok(())
268    }
269
270    // Internal functions mapped to the public interface to enable exhaustive
271    // logging and metrics
272
273    fn guarded_consensus_state(&mut self) -> Result<ConsensusState, Error> {
274        let waypoint = self.persistent_storage.waypoint()?;
275        let safety_data = self.persistent_storage.safety_data()?;
276
277        diem_info!(SafetyLogSchema::new(LogEntry::State, LogEvent::Update)
278            .author(self.persistent_storage.author()?)
279            .epoch(safety_data.epoch)
280            .last_voted_round(safety_data.last_voted_round)
281            .preferred_round(safety_data.preferred_round)
282            .waypoint(waypoint));
283
284        Ok(ConsensusState::new(
285            self.persistent_storage.safety_data()?,
286            self.persistent_storage.waypoint()?,
287            self.signer().is_ok(),
288        ))
289    }
290
291    fn guarded_initialize(
292        &mut self, proof: &EpochChangeProof,
293    ) -> Result<(), Error> {
294        let waypoint = self.persistent_storage.waypoint()?;
295        let last_li = proof
296            .verify(&waypoint)
297            .map_err(|e| Error::InvalidEpochChangeProof(format!("{}", e)))?;
298        let ledger_info = last_li.ledger_info();
299        let epoch_state = ledger_info
300            .next_epoch_state()
301            .cloned()
302            .ok_or(Error::InvalidLedgerInfo)?;
303
304        let current_epoch = self.persistent_storage.safety_data()?.epoch;
305        if current_epoch < epoch_state.epoch {
306            // This is ordered specifically to avoid configuration issues:
307            // * First set the waypoint to lock in the minimum restarting point,
308            // * set the round information,
309            // * finally, set the epoch information because once the epoch is
310            //   set, this `if`
311            // statement cannot be re-entered.
312            let waypoint = &Waypoint::new_epoch_boundary(ledger_info)
313                .map_err(|error| Error::InternalError(error.to_string()))?;
314            self.persistent_storage.set_waypoint(waypoint)?;
315            self.persistent_storage.set_safety_data(SafetyData::new(
316                epoch_state.epoch,
317                0,
318                0,
319                None,
320            ))?;
321
322            diem_info!(SafetyLogSchema::new(LogEntry::Epoch, LogEvent::Update)
323                .epoch(epoch_state.epoch));
324        }
325        self.epoch_state = Some(epoch_state.clone());
326
327        let author = self.persistent_storage.author()?;
328        let expected_key = epoch_state.verifier().get_public_key(&author);
329        let initialize_result = match expected_key {
330            None => {
331                diem_debug!(
332                    "guarded_initialize: not a validator in epoch_set={:?}",
333                    epoch_state
334                );
335                self.validator_signer = None;
336                Ok(())
337            }
338            Some(expected_key) => {
339                let current_key = self.signer().ok().map(|s| s.public_key());
340                if current_key == Some(expected_key.clone()) {
341                    diem_debug!(
342                        SafetyLogSchema::new(
343                            LogEntry::KeyReconciliation,
344                            LogEvent::Success
345                        ),
346                        "in set",
347                    );
348                    Ok(())
349                } else if self.export_consensus_key {
350                    // Try to export the consensus key directly from storage.
351                    match self
352                        .persistent_storage
353                        .consensus_key_for_version(expected_key.clone())
354                    {
355                        Ok(consensus_key) => {
356                            if consensus_key.public_key() != expected_key {
357                                Err(Error::ValidatorKeyNotFound("exported key does not match the expected key".into()))
358                            } else {
359                                self.validator_signer = Some(
360                                    ConfigurableValidatorSigner::new_signer(
361                                        author,
362                                        consensus_key,
363                                        self.vrf_private_key.clone(),
364                                    ),
365                                );
366                                Ok(())
367                            }
368                        }
369                        Err(Error::SecureStorageMissingDataError(error)) => {
370                            Err(Error::ValidatorKeyNotFound(error))
371                        }
372                        Err(error) => Err(error),
373                    }
374                } else {
375                    // Try to generate a signature over a test message to ensure
376                    // the expected key is actually held in
377                    // storage.
378                    self.validator_signer =
379                        Some(ConfigurableValidatorSigner::new_handle(
380                            author,
381                            expected_key.clone(),
382                        ));
383                    self.sign(&Timeout::new(0, 0)).and_then(|signature| {
384                        signature
385                            .verify(&Timeout::new(0, 0), &expected_key)
386                            .map_err(|error| {
387                                Error::ValidatorKeyNotFound(error.to_string())
388                            })
389                    })
390                }
391            }
392        };
393        initialize_result.map_err(|error| {
394            diem_info!(SafetyLogSchema::new(
395                LogEntry::KeyReconciliation,
396                LogEvent::Error
397            )
398            .error(&error),);
399            self.validator_signer = None;
400            error
401        })
402    }
403
404    fn guarded_construct_and_sign_vote(
405        &mut self, maybe_signed_vote_proposal: &MaybeSignedVoteProposal,
406    ) -> Result<Vote, Error> {
407        // Exit early if we cannot sign
408        self.signer()?;
409
410        let vote_proposal = &maybe_signed_vote_proposal.vote_proposal;
411        let execution_signature = maybe_signed_vote_proposal.signature.as_ref();
412
413        if let Some(public_key) = self.execution_public_key.as_ref() {
414            execution_signature
415                .ok_or(Error::VoteProposalSignatureNotFound)?
416                .verify(vote_proposal, public_key)
417                .map_err(|error| Error::InternalError(error.to_string()))?;
418        }
419
420        let proposed_block = vote_proposal.block();
421        let mut safety_data = self.persistent_storage.safety_data()?;
422
423        self.verify_epoch(proposed_block.epoch(), &safety_data)?;
424
425        // if already voted on this round, send back the previous vote
426        // note: this needs to happen after verifying the epoch as we just check
427        // the round here
428        if let Some(vote) = safety_data.last_vote.clone() {
429            if vote.vote_data().proposed().round() == proposed_block.round() {
430                return Ok(vote);
431            }
432        }
433
434        self.verify_qc(proposed_block.quorum_cert())?;
435        proposed_block
436            .validate_signature(&self.epoch_state()?.verifier())
437            .map_err(|error| Error::InternalError(error.to_string()))?;
438
439        self.verify_and_update_preferred_round(
440            proposed_block.quorum_cert(),
441            &mut safety_data,
442        )?;
443        self.verify_and_update_last_vote_round(
444            proposed_block.block_data().round(),
445            &mut safety_data,
446        )?;
447
448        // Construct and sign vote
449        let vote_data = Self::extension_check(vote_proposal)?;
450        let author = self.signer()?.author();
451        let ledger_info =
452            Self::construct_ledger_info(proposed_block, vote_data.hash())?;
453        let signature = self.sign(&ledger_info)?;
454        let vote =
455            Vote::new_with_signature(vote_data, author, ledger_info, signature);
456
457        safety_data.last_vote = Some(vote.clone());
458        self.persistent_storage.set_safety_data(safety_data)?;
459
460        Ok(vote)
461    }
462
463    fn guarded_sign_proposal(
464        &mut self, block_data: BlockData,
465    ) -> Result<Block, Error> {
466        self.signer()?;
467        self.verify_author(block_data.author())?;
468
469        let mut safety_data = self.persistent_storage.safety_data()?;
470        self.verify_epoch(block_data.epoch(), &safety_data)?;
471
472        if block_data.round() <= safety_data.last_voted_round {
473            return Err(Error::InvalidProposal(format!(
474                "Proposed round {} is not higher than last voted round {}",
475                block_data.round(),
476                safety_data.last_voted_round
477            )));
478        }
479
480        self.verify_qc(block_data.quorum_cert())?;
481        if self.verify_and_update_preferred_round(
482            block_data.quorum_cert(),
483            &mut safety_data,
484        )? {
485            self.persistent_storage.set_safety_data(safety_data)?;
486        }
487
488        let signature = self.sign(&block_data)?;
489        Ok(Block::new_proposal_from_block_data_and_signature(
490            block_data, signature, None,
491        ))
492    }
493
494    fn guarded_sign_timeout(
495        &mut self, timeout: &Timeout,
496    ) -> Result<ConsensusSignature, Error> {
497        self.signer()?;
498
499        let mut safety_data = self.persistent_storage.safety_data()?;
500        self.verify_epoch(timeout.epoch(), &safety_data)?;
501
502        if timeout.round() <= safety_data.preferred_round {
503            return Err(Error::IncorrectPreferredRound(
504                timeout.round(),
505                safety_data.preferred_round,
506            ));
507        }
508        if timeout.round() < safety_data.last_voted_round {
509            return Err(Error::IncorrectLastVotedRound(
510                timeout.round(),
511                safety_data.last_voted_round,
512            ));
513        }
514        if timeout.round() > safety_data.last_voted_round {
515            self.verify_and_update_last_vote_round(
516                timeout.round(),
517                &mut safety_data,
518            )?;
519            self.persistent_storage.set_safety_data(safety_data)?;
520        }
521
522        let signature = self.sign(timeout)?;
523        Ok(signature)
524    }
525
526    pub fn start_voting(&mut self, initialize: bool) -> Result<(), Error> {
527        if initialize {
528            // If the node starts voting with its local safety data,
529            // `SafetyRule` just remains the same.
530            Ok(())
531        } else {
532            self.persistent_storage
533                .replace_with_suffix(SAFETY_STORAGE_SAVE_SUFFIX)
534        }
535    }
536
537    pub fn stop_voting(&mut self) -> Result<(), Error> {
538        self.persistent_storage
539            .save_to_suffix(SAFETY_STORAGE_SAVE_SUFFIX)
540    }
541}
542
543impl TSafetyRules for SafetyRules {
544    fn consensus_state(&mut self) -> Result<ConsensusState, Error> {
545        let cb = || self.guarded_consensus_state();
546        run_and_log(cb, |log| log, LogEntry::ConsensusState)
547    }
548
549    fn initialize(&mut self, proof: &EpochChangeProof) -> Result<(), Error> {
550        let cb = || self.guarded_initialize(proof);
551        run_and_log(cb, |log| log, LogEntry::Initialize)
552    }
553
554    fn construct_and_sign_vote(
555        &mut self, maybe_signed_vote_proposal: &MaybeSignedVoteProposal,
556    ) -> Result<Vote, Error> {
557        let round = maybe_signed_vote_proposal.vote_proposal.block().round();
558        let cb =
559            || self.guarded_construct_and_sign_vote(maybe_signed_vote_proposal);
560        run_and_log(cb, |log| log.round(round), LogEntry::ConstructAndSignVote)
561    }
562
563    fn sign_proposal(&mut self, block_data: BlockData) -> Result<Block, Error> {
564        let round = block_data.round();
565        let cb = || self.guarded_sign_proposal(block_data);
566        run_and_log(cb, |log| log.round(round), LogEntry::SignProposal)
567    }
568
569    fn sign_timeout(
570        &mut self, timeout: &Timeout,
571    ) -> Result<ConsensusSignature, Error> {
572        let cb = || self.guarded_sign_timeout(timeout);
573        run_and_log(cb, |log| log.round(timeout.round()), LogEntry::SignTimeout)
574    }
575}
576
577fn run_and_log<F, L, R>(
578    callback: F, log_cb: L, log_entry: LogEntry,
579) -> Result<R, Error>
580where
581    F: FnOnce() -> Result<R, Error>,
582    L: for<'a> Fn(SafetyLogSchema<'a>) -> SafetyLogSchema<'a>,
583{
584    let _timer = counters::start_timer("internal", log_entry.as_str());
585    diem_debug!(log_cb(SafetyLogSchema::new(log_entry, LogEvent::Request)));
586    counters::increment_query(log_entry.as_str(), "request");
587    callback()
588        .map(|v| {
589            diem_info!(log_cb(SafetyLogSchema::new(
590                log_entry,
591                LogEvent::Success
592            )));
593            counters::increment_query(log_entry.as_str(), "success");
594            v
595        })
596        .map_err(|err| {
597            diem_error!(log_cb(SafetyLogSchema::new(
598                log_entry,
599                LogEvent::Error
600            ))
601            .error(&err));
602            counters::increment_query(log_entry.as_str(), "error");
603            err
604        })
605}