1use crate::{
9 configurable_validator_signer::ConfigurableValidatorSigner,
10 consensus_state::ConsensusState,
11 counters,
12 error::Error,
13 logging::{LogEntry, LogEvent, SafetyLogSchema},
14 persistent_safety_storage::PersistentSafetyStorage,
15 t_safety_rules::TSafetyRules,
16};
17use consensus_types::{
18 block::Block,
19 block_data::BlockData,
20 common::{Author, Round},
21 quorum_cert::QuorumCert,
22 safety_data::SafetyData,
23 timeout::Timeout,
24 vote::Vote,
25 vote_data::VoteData,
26 vote_proposal::{MaybeSignedVoteProposal, VoteProposal},
27};
28use diem_crypto::{
29 hash::{CryptoHash, HashValue},
30 traits::Signature,
31 PrivateKey,
32};
33use diem_logger::prelude::*;
34use diem_types::{
35 account_address::AccountAddress,
36 block_info::BlockInfo,
37 epoch_change::EpochChangeProof,
38 epoch_state::EpochState,
39 ledger_info::LedgerInfo,
40 validator_config::{
41 ConsensusPublicKey, ConsensusSignature, ConsensusVRFPrivateKey,
42 },
43 waypoint::Waypoint,
44};
45use log::error;
46use serde::Serialize;
47use std::cmp::Ordering;
48
49const SAFETY_STORAGE_SAVE_SUFFIX: &str = "json.save";
50
51pub struct SafetyRules {
53 persistent_storage: PersistentSafetyStorage,
54 execution_public_key: Option<ConsensusPublicKey>,
55 export_consensus_key: bool,
56 validator_signer: Option<ConfigurableValidatorSigner>,
57 epoch_state: Option<EpochState>,
58 vrf_private_key: Option<ConsensusVRFPrivateKey>,
59}
60
61impl SafetyRules {
62 pub fn new(
65 persistent_storage: PersistentSafetyStorage,
66 _verify_vote_proposal_signature: bool, export_consensus_key: bool,
67 vrf_private_key: Option<ConsensusVRFPrivateKey>,
68 author: AccountAddress,
69 ) -> Self {
70 let execution_public_key = None;
71 if let Ok(storage_author) = persistent_storage.author() {
72 if storage_author != author {
73 diem_error!(
77 "author in secure_storage does not match PoS keys!"
78 );
79 error!("author in secure_storage does not match PoS keys!");
80 }
81 }
82 Self {
93 persistent_storage,
94 execution_public_key,
95 export_consensus_key,
96 validator_signer: None,
97 epoch_state: None,
98 vrf_private_key,
99 }
100 }
101
102 fn sign<T: Serialize + CryptoHash>(
103 &self, message: &T,
104 ) -> Result<ConsensusSignature, Error> {
105 let signer = self.signer()?;
106 signer.sign(message, &self.persistent_storage)
107 }
108
109 fn signer(&self) -> Result<&ConfigurableValidatorSigner, Error> {
110 self.validator_signer
111 .as_ref()
112 .ok_or_else(|| Error::NotInitialized("validator_signer".into()))
113 }
114
115 fn epoch_state(&self) -> Result<&EpochState, Error> {
116 self.epoch_state
117 .as_ref()
118 .ok_or_else(|| Error::NotInitialized("epoch_state".into()))
119 }
120
121 pub fn extension_check(
123 vote_proposal: &VoteProposal,
124 ) -> Result<VoteData, Error> {
125 let proposed_block = vote_proposal.block();
126 let new_tree = vote_proposal
127 .accumulator_extension_proof()
128 .verify(
129 proposed_block
130 .quorum_cert()
131 .certified_block()
132 .executed_state_id(),
133 )
134 .map_err(|e| Error::InvalidAccumulatorExtension(e.to_string()))?;
135 Ok(VoteData::new(
136 proposed_block.gen_block_info(
137 Default::default(),
139 new_tree.version(),
140 vote_proposal.next_epoch_state().cloned(),
141 vote_proposal.pivot_decision().clone(),
142 ),
143 proposed_block.quorum_cert().certified_block().clone(),
144 ))
145 }
146
147 pub fn construct_ledger_info(
154 proposed_block: &Block, consensus_data_hash: HashValue,
155 ) -> Result<LedgerInfo, Error> {
156 let block2 = proposed_block.round();
157 let block1 = proposed_block.quorum_cert().certified_block().round();
158 let block0 = proposed_block.quorum_cert().parent_block().round();
159
160 let next_round = |round: u64| {
162 u64::checked_add(round, 1).ok_or(Error::IncorrectRound(round))
163 };
164 let commit =
165 next_round(block0)? == block1 && next_round(block1)? == block2;
166
167 let commit_info = if commit {
169 proposed_block.quorum_cert().parent_block().clone()
170 } else {
171 BlockInfo::empty()
172 };
173
174 Ok(LedgerInfo::new(commit_info, consensus_data_hash))
175 }
176
177 fn verify_and_update_preferred_round(
179 &mut self, quorum_cert: &QuorumCert, safety_data: &mut SafetyData,
180 ) -> Result<bool, Error> {
181 let preferred_round = safety_data.preferred_round;
182 let one_chain_round = quorum_cert.certified_block().round();
183 let two_chain_round = quorum_cert.parent_block().round();
184
185 if one_chain_round < preferred_round {
186 return Err(Error::IncorrectPreferredRound(
187 one_chain_round,
188 preferred_round,
189 ));
190 }
191
192 let updated = match two_chain_round.cmp(&preferred_round) {
193 Ordering::Greater => {
194 safety_data.preferred_round = two_chain_round;
195 diem_info!(SafetyLogSchema::new(
196 LogEntry::PreferredRound,
197 LogEvent::Update
198 )
199 .preferred_round(safety_data.preferred_round));
200 true
201 }
202 Ordering::Less => {
203 diem_trace!(
204 "2-chain round {} is lower than preferred round {} but 1-chain round {} is higher.",
205 two_chain_round, preferred_round, one_chain_round
206 );
207 false
208 }
209 Ordering::Equal => false,
210 };
211 Ok(updated)
212 }
213
214 fn verify_author(&self, author: Option<Author>) -> Result<(), Error> {
216 let validator_signer_author = &self.signer()?.author();
217 let author = author.ok_or_else(|| {
218 Error::InvalidProposal("No author found in the proposal".into())
219 })?;
220 if validator_signer_author != &author {
221 return Err(Error::InvalidProposal(
222 "Proposal author is not validator signer!".into(),
223 ));
224 }
225 Ok(())
226 }
227
228 fn verify_epoch(
231 &self, epoch: u64, safety_data: &SafetyData,
232 ) -> Result<(), Error> {
233 if epoch != safety_data.epoch {
234 return Err(Error::IncorrectEpoch(epoch, safety_data.epoch));
235 }
236
237 Ok(())
238 }
239
240 fn verify_and_update_last_vote_round(
242 &self, round: Round, safety_data: &mut SafetyData,
243 ) -> Result<(), Error> {
244 if round <= safety_data.last_voted_round {
245 return Err(Error::IncorrectLastVotedRound(
246 round,
247 safety_data.last_voted_round,
248 ));
249 }
250
251 safety_data.last_voted_round = round;
252 diem_info!(SafetyLogSchema::new(
253 LogEntry::LastVotedRound,
254 LogEvent::Update
255 )
256 .last_voted_round(safety_data.last_voted_round));
257
258 Ok(())
259 }
260
261 fn verify_qc(&self, qc: &QuorumCert) -> Result<(), Error> {
263 let epoch_state = self.epoch_state()?;
264
265 qc.verify(&epoch_state.verifier())
266 .map_err(|e| Error::InvalidQuorumCertificate(e.to_string()))?;
267 Ok(())
268 }
269
270 fn guarded_consensus_state(&mut self) -> Result<ConsensusState, Error> {
274 let waypoint = self.persistent_storage.waypoint()?;
275 let safety_data = self.persistent_storage.safety_data()?;
276
277 diem_info!(SafetyLogSchema::new(LogEntry::State, LogEvent::Update)
278 .author(self.persistent_storage.author()?)
279 .epoch(safety_data.epoch)
280 .last_voted_round(safety_data.last_voted_round)
281 .preferred_round(safety_data.preferred_round)
282 .waypoint(waypoint));
283
284 Ok(ConsensusState::new(
285 self.persistent_storage.safety_data()?,
286 self.persistent_storage.waypoint()?,
287 self.signer().is_ok(),
288 ))
289 }
290
291 fn guarded_initialize(
292 &mut self, proof: &EpochChangeProof,
293 ) -> Result<(), Error> {
294 let waypoint = self.persistent_storage.waypoint()?;
295 let last_li = proof
296 .verify(&waypoint)
297 .map_err(|e| Error::InvalidEpochChangeProof(format!("{}", e)))?;
298 let ledger_info = last_li.ledger_info();
299 let epoch_state = ledger_info
300 .next_epoch_state()
301 .cloned()
302 .ok_or(Error::InvalidLedgerInfo)?;
303
304 let current_epoch = self.persistent_storage.safety_data()?.epoch;
305 if current_epoch < epoch_state.epoch {
306 let waypoint = &Waypoint::new_epoch_boundary(ledger_info)
313 .map_err(|error| Error::InternalError(error.to_string()))?;
314 self.persistent_storage.set_waypoint(waypoint)?;
315 self.persistent_storage.set_safety_data(SafetyData::new(
316 epoch_state.epoch,
317 0,
318 0,
319 None,
320 ))?;
321
322 diem_info!(SafetyLogSchema::new(LogEntry::Epoch, LogEvent::Update)
323 .epoch(epoch_state.epoch));
324 }
325 self.epoch_state = Some(epoch_state.clone());
326
327 let author = self.persistent_storage.author()?;
328 let expected_key = epoch_state.verifier().get_public_key(&author);
329 let initialize_result = match expected_key {
330 None => {
331 diem_debug!(
332 "guarded_initialize: not a validator in epoch_set={:?}",
333 epoch_state
334 );
335 self.validator_signer = None;
336 Ok(())
337 }
338 Some(expected_key) => {
339 let current_key = self.signer().ok().map(|s| s.public_key());
340 if current_key == Some(expected_key.clone()) {
341 diem_debug!(
342 SafetyLogSchema::new(
343 LogEntry::KeyReconciliation,
344 LogEvent::Success
345 ),
346 "in set",
347 );
348 Ok(())
349 } else if self.export_consensus_key {
350 match self
352 .persistent_storage
353 .consensus_key_for_version(expected_key.clone())
354 {
355 Ok(consensus_key) => {
356 if consensus_key.public_key() != expected_key {
357 Err(Error::ValidatorKeyNotFound("exported key does not match the expected key".into()))
358 } else {
359 self.validator_signer = Some(
360 ConfigurableValidatorSigner::new_signer(
361 author,
362 consensus_key,
363 self.vrf_private_key.clone(),
364 ),
365 );
366 Ok(())
367 }
368 }
369 Err(Error::SecureStorageMissingDataError(error)) => {
370 Err(Error::ValidatorKeyNotFound(error))
371 }
372 Err(error) => Err(error),
373 }
374 } else {
375 self.validator_signer =
379 Some(ConfigurableValidatorSigner::new_handle(
380 author,
381 expected_key.clone(),
382 ));
383 self.sign(&Timeout::new(0, 0)).and_then(|signature| {
384 signature
385 .verify(&Timeout::new(0, 0), &expected_key)
386 .map_err(|error| {
387 Error::ValidatorKeyNotFound(error.to_string())
388 })
389 })
390 }
391 }
392 };
393 initialize_result.map_err(|error| {
394 diem_info!(SafetyLogSchema::new(
395 LogEntry::KeyReconciliation,
396 LogEvent::Error
397 )
398 .error(&error),);
399 self.validator_signer = None;
400 error
401 })
402 }
403
404 fn guarded_construct_and_sign_vote(
405 &mut self, maybe_signed_vote_proposal: &MaybeSignedVoteProposal,
406 ) -> Result<Vote, Error> {
407 self.signer()?;
409
410 let vote_proposal = &maybe_signed_vote_proposal.vote_proposal;
411 let execution_signature = maybe_signed_vote_proposal.signature.as_ref();
412
413 if let Some(public_key) = self.execution_public_key.as_ref() {
414 execution_signature
415 .ok_or(Error::VoteProposalSignatureNotFound)?
416 .verify(vote_proposal, public_key)
417 .map_err(|error| Error::InternalError(error.to_string()))?;
418 }
419
420 let proposed_block = vote_proposal.block();
421 let mut safety_data = self.persistent_storage.safety_data()?;
422
423 self.verify_epoch(proposed_block.epoch(), &safety_data)?;
424
425 if let Some(vote) = safety_data.last_vote.clone() {
429 if vote.vote_data().proposed().round() == proposed_block.round() {
430 return Ok(vote);
431 }
432 }
433
434 self.verify_qc(proposed_block.quorum_cert())?;
435 proposed_block
436 .validate_signature(&self.epoch_state()?.verifier())
437 .map_err(|error| Error::InternalError(error.to_string()))?;
438
439 self.verify_and_update_preferred_round(
440 proposed_block.quorum_cert(),
441 &mut safety_data,
442 )?;
443 self.verify_and_update_last_vote_round(
444 proposed_block.block_data().round(),
445 &mut safety_data,
446 )?;
447
448 let vote_data = Self::extension_check(vote_proposal)?;
450 let author = self.signer()?.author();
451 let ledger_info =
452 Self::construct_ledger_info(proposed_block, vote_data.hash())?;
453 let signature = self.sign(&ledger_info)?;
454 let vote =
455 Vote::new_with_signature(vote_data, author, ledger_info, signature);
456
457 safety_data.last_vote = Some(vote.clone());
458 self.persistent_storage.set_safety_data(safety_data)?;
459
460 Ok(vote)
461 }
462
463 fn guarded_sign_proposal(
464 &mut self, block_data: BlockData,
465 ) -> Result<Block, Error> {
466 self.signer()?;
467 self.verify_author(block_data.author())?;
468
469 let mut safety_data = self.persistent_storage.safety_data()?;
470 self.verify_epoch(block_data.epoch(), &safety_data)?;
471
472 if block_data.round() <= safety_data.last_voted_round {
473 return Err(Error::InvalidProposal(format!(
474 "Proposed round {} is not higher than last voted round {}",
475 block_data.round(),
476 safety_data.last_voted_round
477 )));
478 }
479
480 self.verify_qc(block_data.quorum_cert())?;
481 if self.verify_and_update_preferred_round(
482 block_data.quorum_cert(),
483 &mut safety_data,
484 )? {
485 self.persistent_storage.set_safety_data(safety_data)?;
486 }
487
488 let signature = self.sign(&block_data)?;
489 Ok(Block::new_proposal_from_block_data_and_signature(
490 block_data, signature, None,
491 ))
492 }
493
494 fn guarded_sign_timeout(
495 &mut self, timeout: &Timeout,
496 ) -> Result<ConsensusSignature, Error> {
497 self.signer()?;
498
499 let mut safety_data = self.persistent_storage.safety_data()?;
500 self.verify_epoch(timeout.epoch(), &safety_data)?;
501
502 if timeout.round() <= safety_data.preferred_round {
503 return Err(Error::IncorrectPreferredRound(
504 timeout.round(),
505 safety_data.preferred_round,
506 ));
507 }
508 if timeout.round() < safety_data.last_voted_round {
509 return Err(Error::IncorrectLastVotedRound(
510 timeout.round(),
511 safety_data.last_voted_round,
512 ));
513 }
514 if timeout.round() > safety_data.last_voted_round {
515 self.verify_and_update_last_vote_round(
516 timeout.round(),
517 &mut safety_data,
518 )?;
519 self.persistent_storage.set_safety_data(safety_data)?;
520 }
521
522 let signature = self.sign(timeout)?;
523 Ok(signature)
524 }
525
526 pub fn start_voting(&mut self, initialize: bool) -> Result<(), Error> {
527 if initialize {
528 Ok(())
531 } else {
532 self.persistent_storage
533 .replace_with_suffix(SAFETY_STORAGE_SAVE_SUFFIX)
534 }
535 }
536
537 pub fn stop_voting(&mut self) -> Result<(), Error> {
538 self.persistent_storage
539 .save_to_suffix(SAFETY_STORAGE_SAVE_SUFFIX)
540 }
541}
542
543impl TSafetyRules for SafetyRules {
544 fn consensus_state(&mut self) -> Result<ConsensusState, Error> {
545 let cb = || self.guarded_consensus_state();
546 run_and_log(cb, |log| log, LogEntry::ConsensusState)
547 }
548
549 fn initialize(&mut self, proof: &EpochChangeProof) -> Result<(), Error> {
550 let cb = || self.guarded_initialize(proof);
551 run_and_log(cb, |log| log, LogEntry::Initialize)
552 }
553
554 fn construct_and_sign_vote(
555 &mut self, maybe_signed_vote_proposal: &MaybeSignedVoteProposal,
556 ) -> Result<Vote, Error> {
557 let round = maybe_signed_vote_proposal.vote_proposal.block().round();
558 let cb =
559 || self.guarded_construct_and_sign_vote(maybe_signed_vote_proposal);
560 run_and_log(cb, |log| log.round(round), LogEntry::ConstructAndSignVote)
561 }
562
563 fn sign_proposal(&mut self, block_data: BlockData) -> Result<Block, Error> {
564 let round = block_data.round();
565 let cb = || self.guarded_sign_proposal(block_data);
566 run_and_log(cb, |log| log.round(round), LogEntry::SignProposal)
567 }
568
569 fn sign_timeout(
570 &mut self, timeout: &Timeout,
571 ) -> Result<ConsensusSignature, Error> {
572 let cb = || self.guarded_sign_timeout(timeout);
573 run_and_log(cb, |log| log.round(timeout.round()), LogEntry::SignTimeout)
574 }
575}
576
577fn run_and_log<F, L, R>(
578 callback: F, log_cb: L, log_entry: LogEntry,
579) -> Result<R, Error>
580where
581 F: FnOnce() -> Result<R, Error>,
582 L: for<'a> Fn(SafetyLogSchema<'a>) -> SafetyLogSchema<'a>,
583{
584 let _timer = counters::start_timer("internal", log_entry.as_str());
585 diem_debug!(log_cb(SafetyLogSchema::new(log_entry, LogEvent::Request)));
586 counters::increment_query(log_entry.as_str(), "request");
587 callback()
588 .map(|v| {
589 diem_info!(log_cb(SafetyLogSchema::new(
590 log_entry,
591 LogEvent::Success
592 )));
593 counters::increment_query(log_entry.as_str(), "success");
594 v
595 })
596 .map_err(|err| {
597 diem_error!(log_cb(SafetyLogSchema::new(
598 log_entry,
599 LogEvent::Error
600 ))
601 .error(&err));
602 counters::increment_query(log_entry.as_str(), "error");
603 err
604 })
605}