1use std::{
9 sync::{
10 atomic::{AtomicBool, Ordering as AtomicOrdering},
11 Arc,
12 },
13 time::Duration,
14};
15
16use anyhow::{bail, ensure, Context, Result};
17use fail::fail_point;
18use futures::{
19 channel::{mpsc, oneshot},
20 SinkExt,
21};
22use serde::Serialize;
23
24use consensus_types::{
25 block::Block,
26 block_retrieval::{BlockRetrievalResponse, BlockRetrievalStatus},
27 common::{Author, Round},
28 proposal_msg::ProposalMsg,
29 quorum_cert::QuorumCert,
30 sync_info::SyncInfo,
31 timeout_certificate::TimeoutCertificate,
32 vote::Vote,
33 vote_msg::VoteMsg,
34};
35use diem_config::keys::ConfigKey;
36use diem_crypto::{hash::CryptoHash, HashValue, SigningKey, VRFPrivateKey};
37use diem_infallible::checked;
38use diem_logger::prelude::*;
39use diem_types::{
40 account_address::{from_consensus_public_key, AccountAddress},
41 block_info::PivotBlockDecision,
42 chain_id::ChainId,
43 epoch_state::EpochState,
44 ledger_info::LedgerInfoWithSignatures,
45 transaction::{
46 ConflictSignature, DisputePayload, ElectionPayload, RawTransaction,
47 SignedTransaction, TransactionPayload,
48 },
49 validator_config::{ConsensusPrivateKey, ConsensusVRFPrivateKey},
50 validator_verifier::ValidatorVerifier,
51};
52#[cfg(test)]
53use safety_rules::ConsensusState;
54use safety_rules::{SafetyRules, TSafetyRules};
55
56use crate::pos::{
57 mempool::SubmissionStatus,
58 protocol::message::block_retrieval_response::BlockRetrievalRpcResponse,
59};
60
61use super::{
62 block_storage::{
63 tracing::{observe_block, BlockStage},
64 BlockReader, BlockRetriever, BlockStore,
65 },
66 counters,
67 error::VerifyError,
68 liveness::{
69 proposal_generator::ProposalGenerator,
70 proposer_election::ProposerElection,
71 round_state::{NewRoundEvent, NewRoundReason, RoundState},
72 },
73 logging::{LogEvent, LogSchema},
74 metrics_safety_rules::MetricsSafetyRules,
75 network::{
76 ConsensusMsg, ConsensusNetworkSender, IncomingBlockRetrievalRequest,
77 },
78 pending_votes::VoteReceptionResult,
79 persistent_liveness_storage::{PersistentLivenessStorage, RecoveryData},
80 state_replication::{StateComputer, TxnManager},
81};
82
/// A consensus message whose contents have not been verified yet.
///
/// Use [`UnverifiedEvent::verify`] to turn it into a [`VerifiedEvent`].
// NOTE: the variant order is part of the derived `Serialize` output; do not
// reorder the variants.
#[derive(Serialize, Clone)]
pub enum UnverifiedEvent {
    /// A block proposal that still has to be verified.
    ProposalMsg(Box<ProposalMsg>),
    /// A vote that still has to be verified.
    VoteMsg(Box<VoteMsg>),
    /// A sync-info message that still has to be verified.
    SyncInfo(Box<SyncInfo>),
}
89
90impl UnverifiedEvent {
91 pub fn verify(
92 self, validator: &ValidatorVerifier, epoch_vrf_seed: &[u8],
93 ) -> Result<VerifiedEvent, VerifyError> {
94 Ok(match self {
95 UnverifiedEvent::ProposalMsg(p) => {
96 p.verify(validator, epoch_vrf_seed)?;
97 VerifiedEvent::ProposalMsg(p)
98 }
99 UnverifiedEvent::VoteMsg(v) => {
100 v.verify(validator)?;
101 VerifiedEvent::VoteMsg(v)
102 }
103 UnverifiedEvent::SyncInfo(s) => {
104 s.verify(validator)?;
105 VerifiedEvent::SyncInfo(s)
106 }
107 })
108 }
109
110 pub fn epoch(&self) -> u64 {
111 match self {
112 UnverifiedEvent::ProposalMsg(p) => p.epoch(),
113 UnverifiedEvent::VoteMsg(v) => v.epoch(),
114 UnverifiedEvent::SyncInfo(s) => s.epoch(),
115 }
116 }
117}
118
119impl From<ConsensusMsg> for UnverifiedEvent {
120 fn from(value: ConsensusMsg) -> Self {
121 match value {
122 ConsensusMsg::ProposalMsg(m) => UnverifiedEvent::ProposalMsg(m),
123 ConsensusMsg::VoteMsg(m) => UnverifiedEvent::VoteMsg(m),
124 ConsensusMsg::SyncInfo(m) => UnverifiedEvent::SyncInfo(m),
125 _ => unreachable!("Unexpected conversion"),
126 }
127 }
128}
129
/// A consensus message that has passed [`UnverifiedEvent::verify`].
pub enum VerifiedEvent {
    /// A verified block proposal.
    ProposalMsg(Box<ProposalMsg>),
    /// A verified vote.
    VoteMsg(Box<VoteMsg>),
    /// A verified sync-info message.
    SyncInfo(Box<SyncInfo>),
}
135
// Unit tests for this module; compiled only for test builds and loaded from
// `round_manager_test.rs`.
#[cfg(test)]
#[path = "round_manager_test.rs"]
mod round_manager_test;
139
// Fuzzing support; compiled only when the `fuzzing` feature is enabled and
// loaded from `round_manager_fuzzing.rs`.
#[cfg(feature = "fuzzing")]
#[path = "round_manager_fuzzing.rs"]
pub mod round_manager_fuzzing;
143
/// Handles incoming messages while the node is recovering: it uses the sync
/// info attached to a message to fast-forward local storage from a peer and
/// produce [`RecoveryData`].
pub struct RecoveryManager {
    /// Epoch (and its verifier) that received sync info must belong to.
    epoch_state: EpochState,
    /// Network handle used to build the block retriever.
    network: ConsensusNetworkSender,
    /// Liveness storage handed to `BlockStore::fast_forward_sync`.
    storage: Arc<dyn PersistentLivenessStorage>,
    /// State computer handed to `BlockStore::fast_forward_sync`.
    state_computer: Arc<dyn StateComputer>,
    /// Sync info must report a highest round strictly above this value.
    last_committed_round: Round,
}
154
155impl RecoveryManager {
156 pub fn new(
157 epoch_state: EpochState, network: ConsensusNetworkSender,
158 storage: Arc<dyn PersistentLivenessStorage>,
159 state_computer: Arc<dyn StateComputer>, last_committed_round: Round,
160 ) -> Self {
161 RecoveryManager {
162 epoch_state,
163 network,
164 storage,
165 state_computer,
166 last_committed_round,
167 }
168 }
169
170 pub async fn process_proposal_msg(
171 &mut self, proposal_msg: ProposalMsg,
172 ) -> Result<RecoveryData> {
173 let author = proposal_msg.proposer();
174 let sync_info = proposal_msg.sync_info();
175 self.sync_up(&sync_info, author).await
176 }
177
178 pub async fn process_vote_msg(
179 &mut self, vote_msg: VoteMsg,
180 ) -> Result<RecoveryData> {
181 let author = vote_msg.vote().author();
182 let sync_info = vote_msg.sync_info();
183 self.sync_up(&sync_info, author).await
184 }
185
186 pub async fn sync_up(
187 &mut self, sync_info: &SyncInfo, peer: Author,
188 ) -> Result<RecoveryData> {
189 sync_info
190 .verify(&self.epoch_state.verifier())
191 .map_err(VerifyError::from)?;
192 ensure!(
193 sync_info.highest_round() > self.last_committed_round,
194 "[RecoveryManager] Received sync info has lower round number than committed block"
195 );
196 ensure!(
197 sync_info.epoch() == self.epoch_state.epoch,
198 "[RecoveryManager] Received sync info is in different epoch than committed block"
199 );
200 let mut retriever = BlockRetriever::new(self.network.clone(), peer);
201 let recovery_data = BlockStore::fast_forward_sync(
202 &sync_info.highest_commit_cert(),
203 &mut retriever,
204 self.storage.clone(),
205 self.state_computer.clone(),
206 )
207 .await?;
208
209 Ok(recovery_data)
210 }
211
212 pub fn epoch_state(&self) -> &EpochState { &self.epoch_state }
213}
214
/// Drives consensus rounds: reacts to proposals, votes, sync infos and
/// timeouts, and produces proposals and votes for the local node.
pub struct RoundManager {
    /// Current epoch, its verifier and its VRF seed.
    epoch_state: EpochState,
    /// Local store of blocks and certificates.
    block_store: Arc<BlockStore>,
    /// Tracks the current round, received votes and timeouts.
    round_state: RoundState,
    /// Decides which proposers and proposals are valid for a round.
    proposer_election: Box<dyn ProposerElection + Send + Sync>,
    /// Present only when the node can author proposals; see `is_validator`.
    proposal_generator: Option<ProposalGenerator>,
    /// Signs proposals, votes and timeouts.
    safety_rules: MetricsSafetyRules,
    /// Handle used to send and broadcast consensus messages.
    network: ConsensusNetworkSender,
    /// Notified about executed blocks in `execute_and_vote`.
    txn_manager: Arc<dyn TxnManager>,
    /// Persistent storage (last vote, PoS ledger database access).
    storage: Arc<dyn PersistentLivenessStorage>,
    /// When set, `execute_and_vote` refuses to sign votes.
    sync_only: bool,
    /// Channel for submitting locally generated transactions; the paired
    /// oneshot sender receives the submission result.
    tx_sender: mpsc::Sender<(
        SignedTransaction,
        oneshot::Sender<anyhow::Result<SubmissionStatus>>,
    )>,
    /// Chain id embedded in pivot-decision and election transactions.
    chain_id: ChainId,

    /// Whether this node takes part in voting; combined with
    /// `proposal_generator` in `is_validator` and checked before elections.
    is_voting: bool,
    /// Shared switch: while it reads `false`, `broadcast_election` is
    /// skipped.
    election_control: Arc<AtomicBool>,
    /// Consensus key used to sign election transactions.
    consensus_private_key: Option<ConfigKey<ConsensusPrivateKey>>,
    /// VRF key used to compute the election VRF proof.
    vrf_private_key: Option<ConfigKey<ConsensusVRFPrivateKey>>,
}
243
244impl RoundManager {
245 pub fn new(
246 epoch_state: EpochState, block_store: Arc<BlockStore>,
247 round_state: RoundState,
248 proposer_election: Box<dyn ProposerElection + Send + Sync>,
249 proposal_generator: Option<ProposalGenerator>,
250 safety_rules: MetricsSafetyRules, network: ConsensusNetworkSender,
251 txn_manager: Arc<dyn TxnManager>,
252 storage: Arc<dyn PersistentLivenessStorage>, sync_only: bool,
253 tx_sender: mpsc::Sender<(
254 SignedTransaction,
255 oneshot::Sender<anyhow::Result<SubmissionStatus>>,
256 )>,
257 chain_id: ChainId, is_voting: bool, election_control: Arc<AtomicBool>,
258 consensus_private_key: Option<ConfigKey<ConsensusPrivateKey>>,
259 vrf_private_key: Option<ConfigKey<ConsensusVRFPrivateKey>>,
260 ) -> Self {
261 counters::OP_COUNTERS
262 .gauge("sync_only")
263 .set(sync_only as i64);
264 Self {
265 epoch_state,
266 block_store,
267 round_state,
268 proposer_election,
269 proposal_generator,
270 is_voting,
271 safety_rules,
272 txn_manager,
273 network,
274 storage,
275 sync_only,
276 tx_sender,
277 chain_id,
278 election_control,
279 consensus_private_key,
280 vrf_private_key,
281 }
282 }
283
284 fn create_block_retriever(&self, author: Author) -> BlockRetriever {
285 BlockRetriever::new(self.network.clone(), author)
286 }
287
288 async fn process_new_round_event(
303 &mut self, new_round_event: NewRoundEvent,
304 ) -> anyhow::Result<()> {
305 counters::CURRENT_ROUND.set(new_round_event.round as i64);
306 counters::ROUND_TIMEOUT_MS
307 .set(new_round_event.timeout.as_millis() as i64);
308 match new_round_event.reason {
309 NewRoundReason::QCReady => {
310 counters::QC_ROUNDS_COUNT.inc();
311 }
312 NewRoundReason::Timeout => {
313 counters::TIMEOUT_ROUNDS_COUNT.inc();
314 }
315 };
316 diem_debug!(
317 self.new_log(LogEvent::NewRound),
318 reason = new_round_event.reason
319 );
320 if self.proposer_election.is_random_election() {
321 self.proposer_election.next_round(
322 new_round_event.round,
323 self.epoch_state.vrf_seed.clone(),
324 );
325 self.round_state
326 .setup_proposal_timeout(self.epoch_state.epoch);
327 }
328
329 if let Err(e) = self.broadcast_pivot_decision().await {
330 diem_error!("error in broadcasting pivot decision tx: {:?}", e);
331 }
332 if let Err(e) = self.broadcast_election().await {
335 diem_error!("error in broadcasting election tx: {:?}", e);
336 }
337
338 if self.is_validator() {
339 if let Some(ref proposal_generator) = self.proposal_generator {
340 let author = proposal_generator.author();
341
342 if self
343 .proposer_election
344 .is_valid_proposer(author, new_round_event.round)
345 {
346 let proposal_msg = ConsensusMsg::ProposalMsg(Box::new(
347 self.generate_proposal(new_round_event).await?,
348 ));
349 let mut network = self.network.clone();
350 network.broadcast(proposal_msg, vec![]).await;
351 counters::PROPOSALS_COUNT.inc();
352 }
353 }
354 }
355 Ok(())
356 }
357
358 pub async fn broadcast_pivot_decision(&mut self) -> anyhow::Result<()> {
359 if !self.is_validator() {
360 return Ok(());
362 }
363 diem_debug!("broadcast_pivot_decision starts");
364
365 let hqc = self.block_store.highest_quorum_cert();
366 let parent_block = hqc.certified_block();
367 if self.block_store.path_from_root(parent_block.id()).is_none() {
369 bail!("HQC {} already pruned", parent_block);
370 }
371
372 let parent_decision = parent_block
375 .pivot_decision()
376 .map(|d| d.block_hash)
377 .unwrap_or_default();
378 let pivot_decision = match self
379 .block_store
380 .pow_handler
381 .next_pivot_decision(parent_decision)
382 .await
383 {
384 Some(res) => res,
385 None => {
386 diem_debug!("No new pivot decision");
388 return Ok(());
389 }
390 };
391
392 let proposal_generator =
393 self.proposal_generator.as_ref().expect("checked");
394 diem_info!("Broadcast new pivot decision: {:?}", pivot_decision);
395 let raw_tx = RawTransaction::new_pivot_decision(
398 proposal_generator.author(),
399 PivotBlockDecision {
400 block_hash: pivot_decision.1,
401 height: pivot_decision.0,
402 },
403 self.chain_id,
404 );
405 let signed_tx =
406 raw_tx.sign(&proposal_generator.private_key)?.into_inner();
407 let (tx, rx) = oneshot::channel();
408 self.tx_sender.send((signed_tx, tx)).await?;
409 rx.await??;
411 diem_debug!("broadcast_pivot_decision sends");
412 Ok(())
413 }
414
415 pub async fn broadcast_election(&mut self) -> anyhow::Result<()> {
416 if !self.election_control.load(AtomicOrdering::Relaxed) {
417 diem_debug!("Skip election for election_control");
418 return Ok(());
419 }
420 if !self.is_voting {
421 return Ok(());
423 }
424 if !self.block_store.pow_handler.is_normal_phase() {
425 return Ok(());
429 }
430 if self.vrf_private_key.is_none()
431 || self.consensus_private_key.is_none()
432 {
433 diem_warn!("broadcast_election without keys");
434 return Ok(());
435 }
436 let private_key = self.consensus_private_key.as_ref().unwrap();
437 let vrf_private_key = self.vrf_private_key.as_ref().unwrap();
438 let author = from_consensus_public_key(
439 &private_key.public_key(),
440 &vrf_private_key.public_key(),
441 );
442 diem_debug!("broadcast_election starts");
443 let pos_state = self.storage.pos_ledger_db().get_latest_pos_state();
444 if let Some(target_term) = pos_state.next_elect_term(&author) {
445 let epoch_vrf_seed = pos_state.target_term_seed(target_term);
446 let election_payload = ElectionPayload {
447 public_key: private_key.public_key(),
448 vrf_public_key: vrf_private_key.public_key(),
449 target_term,
450 vrf_proof: vrf_private_key
451 .private_key()
452 .compute(epoch_vrf_seed.as_slice())
453 .unwrap(),
454 };
455 let raw_tx = RawTransaction::new_election(
456 author,
457 election_payload,
458 self.chain_id,
459 );
460 let signed_tx =
461 raw_tx.sign(&private_key.private_key())?.into_inner();
462 let (tx, rx) = oneshot::channel();
463 self.tx_sender.send((signed_tx, tx)).await?;
464 rx.await??;
466 diem_debug!(
467 "broadcast_election sends: target_term={}",
468 target_term
469 );
470 } else {
471 diem_debug!("Skip election for elected");
472 if let Some(node_data) = pos_state.account_node_data(author) {
473 if node_data.lock_status().force_retired().is_some() {
474 warn!("The node stops elections for force retire!");
475 }
476 }
477 }
478 Ok(())
479 }
480
481 async fn generate_proposal(
482 &mut self, new_round_event: NewRoundEvent,
483 ) -> anyhow::Result<ProposalMsg> {
484 let proposal = self
487 .proposal_generator
488 .as_mut()
489 .expect("checked by process_new_round_event")
490 .generate_proposal(
491 new_round_event.round,
492 self.epoch_state.verifier().clone(),
493 )
494 .await?;
495 let mut signed_proposal = self.safety_rules.sign_proposal(proposal)?;
496 if self.proposer_election.is_random_election() {
497 signed_proposal.set_vrf_nonce_and_proof(
498 self.proposer_election
499 .gen_vrf_nonce_and_proof(signed_proposal.block_data())
500 .expect("threshold checked in is_valid_proposer"),
501 )
502 }
503 observe_block(signed_proposal.timestamp_usecs(), BlockStage::SIGNED);
504 diem_debug!(self.new_log(LogEvent::Propose), "{}", signed_proposal);
505 Ok(ProposalMsg::new(
507 signed_proposal,
508 self.block_store.sync_info(),
509 ))
510 }
511
512 pub async fn process_proposal_msg(
516 &mut self, proposal_msg: ProposalMsg,
517 ) -> anyhow::Result<()> {
518 fail_point!("consensus::process_proposal_msg", |_| {
519 Err(anyhow::anyhow!("Injected error in process_proposal_msg"))
520 });
521
522 observe_block(
523 proposal_msg.proposal().timestamp_usecs(),
524 BlockStage::RECEIVED,
525 );
526 if self
527 .ensure_round_and_sync_up(
528 proposal_msg.proposal().round(),
529 proposal_msg.sync_info(),
530 proposal_msg.proposer(),
531 true,
532 )
533 .await
534 .context("[RoundManager] Process proposal")?
535 {
536 if self
537 .process_proposal(proposal_msg.clone().take_proposal())
538 .await?
539 {
540 let exclude =
556 vec![proposal_msg.proposer(), self.network.author];
557 self.network
558 .broadcast(
559 ConsensusMsg::ProposalMsg(Box::new(proposal_msg)),
560 exclude,
561 )
562 .await;
563 }
564 Ok(())
565 } else {
566 bail!(
567 "Stale proposal {}, current round {}",
568 proposal_msg.proposal(),
569 self.round_state.current_round()
570 );
571 }
572 }
573
574 pub async fn sync_up(
578 &mut self, sync_info: &SyncInfo, author: Author, help_remote: bool,
579 ) -> anyhow::Result<()> {
580 let local_sync_info = self.block_store.sync_info();
581 if help_remote && local_sync_info.has_newer_certificates(&sync_info) {
582 counters::SYNC_INFO_MSGS_SENT_COUNT.inc();
583 diem_debug!(
584 self.new_log(LogEvent::HelpPeerSync).remote_peer(author),
585 "Remote peer has stale state {}, send it back {}",
586 sync_info,
587 local_sync_info,
588 );
589 self.network.send_sync_info(local_sync_info.clone(), author);
590 }
591 if sync_info.has_newer_certificates(&local_sync_info) {
592 diem_debug!(
593 self.new_log(LogEvent::SyncToPeer).remote_peer(author),
594 "Local state {} is stale than remote state {}",
595 local_sync_info,
596 sync_info
597 );
598 sync_info
601 .verify(&self.epoch_state().verifier())
602 .map_err(|e| {
603 diem_error!(
604 SecurityEvent::InvalidSyncInfoMsg,
605 sync_info = sync_info,
606 remote_peer = author,
607 error = ?e,
608 );
609 VerifyError::from(e)
610 })?;
611 let mut retriever = self.create_block_retriever(author);
619 self.block_store
620 .insert_quorum_cert(
621 &sync_info.highest_commit_cert(),
622 &mut retriever,
623 )
624 .await?;
625 self.block_store
626 .insert_quorum_cert(
627 &sync_info.highest_quorum_cert(),
628 &mut retriever,
629 )
630 .await?;
631 if let Some(tc) = sync_info.highest_timeout_certificate() {
632 self.block_store
633 .insert_timeout_certificate(Arc::new(tc.clone()))?;
634 }
635 self.process_certificates().await
636 } else {
637 Ok(())
638 }
639 }
640
641 pub async fn sync_to_ledger_info(
643 &mut self, ledger_info: &LedgerInfoWithSignatures,
644 peer_id: AccountAddress,
645 ) -> Result<()> {
646 diem_debug!("sync_to_ledger_info: {:?}", ledger_info);
647 let mut retriever = self.create_block_retriever(peer_id);
648 if !self
649 .block_store
650 .block_exists(ledger_info.ledger_info().consensus_block_id())
651 {
652 let block_for_ledger_info = retriever
653 .retrieve_block_for_ledger_info(ledger_info)
654 .await?;
655 self.block_store
656 .insert_quorum_cert(
657 block_for_ledger_info.quorum_cert(),
658 &mut retriever,
659 )
660 .await?;
661 self.block_store.execute_and_insert_block(
665 block_for_ledger_info,
666 true,
667 false,
668 )?;
669 };
670 self.block_store.commit(ledger_info.clone()).await?;
671 Ok(())
672 }
673
674 pub async fn ensure_round_and_sync_up(
683 &mut self, message_round: Round, sync_info: &SyncInfo, author: Author,
684 help_remote: bool,
685 ) -> anyhow::Result<bool> {
686 if message_round < self.round_state.current_round() {
687 return Ok(false);
688 }
689 self.sync_up(sync_info, author, help_remote).await?;
690 ensure!(
691 message_round == self.round_state.current_round(),
692 "After sync, round {} doesn't match local {}",
693 message_round,
694 self.round_state.current_round()
695 );
696 Ok(true)
697 }
698
699 pub async fn process_sync_info_msg(
701 &mut self, sync_info: SyncInfo, peer: Author,
702 ) -> anyhow::Result<()> {
703 fail_point!("consensus::process_sync_info_msg", |_| {
704 Err(anyhow::anyhow!("Injected error in process_sync_info_msg"))
705 });
706 diem_debug!(
707 self.new_log(LogEvent::ReceiveSyncInfo).remote_peer(peer),
708 "{}",
709 sync_info
710 );
711 self.ensure_round_and_sync_up(
714 checked!((sync_info.highest_round()) + 1)?,
715 &sync_info,
716 peer,
717 false,
718 )
719 .await
720 .context("[RoundManager] Failed to process sync info msg")?;
721 Ok(())
722 }
723
724 pub async fn process_new_round_timeout(
725 &mut self, epoch_round: (u64, Round),
726 ) -> anyhow::Result<()> {
727 diem_debug!("process_new_round_timeout: round={:?}", epoch_round);
728 if epoch_round
729 != (self.epoch_state.epoch, self.round_state.current_round())
730 {
731 return Ok(());
732 }
733 let round = epoch_round.1;
734
735 match self
736 .round_state
737 .get_round_certificate(&self.epoch_state.verifier())
738 {
739 VoteReceptionResult::NewQuorumCertificate(qc) => {
740 self.new_qc_aggregated(
741 qc.clone(),
742 qc.ledger_info()
743 .signatures()
744 .keys()
745 .next()
746 .expect("qc formed")
747 .clone(),
748 )
749 .await?;
750 }
751 VoteReceptionResult::NewTimeoutCertificate(tc) => {
752 self.new_tc_aggregated(tc).await?;
753 }
754 _ => {
755 anyhow::bail!(
757 "New round timeout without new certificate! round={}",
758 round
759 );
760 }
761 }
762 Ok(())
763 }
764
765 pub async fn process_local_timeout(
775 &mut self, epoch_round: (u64, Round),
776 ) -> anyhow::Result<()> {
777 diem_debug!("process_local_timeout: round={:?}", epoch_round);
778 if epoch_round
779 != (self.epoch_state.epoch, self.round_state.current_round())
780 {
781 return Ok(());
782 }
783 let round = epoch_round.1;
784
785 if !self.round_state.process_local_timeout(epoch_round) {
786 return Ok(());
787 }
788
789 self.network
790 .broadcast(
791 ConsensusMsg::SyncInfo(Box::new(self.block_store.sync_info())),
792 vec![],
793 )
794 .await;
795
796 match self
797 .round_state
798 .get_round_certificate(&self.epoch_state.verifier())
799 {
800 VoteReceptionResult::NewQuorumCertificate(_)
801 | VoteReceptionResult::NewTimeoutCertificate(_) => {
802 return Ok(());
804 }
805 _ => {
806 }
808 }
809
810 if !self.is_validator() {
811 return Ok(());
812 }
813
814 let (use_last_vote, mut timeout_vote) =
815 match self.round_state.vote_sent() {
816 Some(vote) if vote.vote_data().proposed().round() == round => {
817 (true, vote)
818 }
819 _ => {
820 let nil_block = self
822 .proposal_generator
823 .as_ref()
824 .expect("checked in is_validator")
825 .generate_nil_block(round)?;
826 diem_debug!(
827 self.new_log(LogEvent::VoteNIL),
828 "Planning to vote for a NIL block {}",
829 nil_block
830 );
831 counters::VOTE_NIL_COUNT.inc();
832 let nil_vote = self.execute_and_vote(nil_block).await?;
833 (false, nil_vote)
834 }
835 };
836
837 if !timeout_vote.is_timeout() {
838 let timeout = timeout_vote.timeout();
839 let signature = self
840 .safety_rules
841 .sign_timeout(&timeout)
842 .context("[RoundManager] SafetyRules signs timeout")?;
843 timeout_vote.add_timeout_signature(signature);
844 }
845
846 self.round_state.record_vote(timeout_vote.clone());
847 let timeout_vote_msg = ConsensusMsg::VoteMsg(Box::new(VoteMsg::new(
848 timeout_vote,
849 self.block_store.sync_info(),
850 )));
851 self.network.broadcast(timeout_vote_msg, vec![]).await;
852 diem_error!(
853 round = round,
854 voted = use_last_vote,
855 event = LogEvent::Timeout,
856 );
857 bail!("Round {} timeout, broadcast to all peers", round);
858 }
859
860 pub async fn process_proposal_timeout(
861 &mut self, epoch_round: (u64, Round),
862 ) -> anyhow::Result<()> {
863 diem_debug!("process_proposal_timeout: round={:?}", epoch_round);
864 if epoch_round
865 != (self.epoch_state.epoch, self.round_state.current_round())
866 {
867 return Ok(());
868 }
869 let round = epoch_round.1;
870
871 if let Some(proposal) = self.proposer_election.choose_proposal_to_vote()
872 {
873 if self.is_validator() {
874 let vote = self
876 .execute_and_vote(proposal)
877 .await
878 .context("[RoundManager] Process proposal")?;
879 diem_debug!(self.new_log(LogEvent::Vote), "{}", vote);
880
881 self.round_state.record_vote(vote.clone());
882 let vote_msg = VoteMsg::new(vote, self.block_store.sync_info());
883 self.network
884 .broadcast(
885 ConsensusMsg::VoteMsg(Box::new(vote_msg)),
886 vec![],
887 )
888 .await;
889 Ok(())
890 } else {
891 self.block_store
893 .execute_and_insert_block(proposal, false, false)
894 .context(
895 "[RoundManager] Failed to execute_and_insert the block",
896 )?;
897 Ok(())
898 }
899 } else {
900 debug!("No proposal to vote: round={}", round);
901 self.process_local_timeout(epoch_round).await
903 }
904 }
905
906 async fn process_certificates(&mut self) -> anyhow::Result<()> {
909 let sync_info = self.block_store.sync_info();
910 if let Some(new_round_event) =
911 self.round_state.process_certificates(sync_info)
912 {
913 self.process_new_round_event(new_round_event).await?;
914 }
915 Ok(())
916 }
917
918 async fn process_proposal(&mut self, proposal: Block) -> Result<bool> {
927 let author = proposal
928 .author()
929 .expect("Proposal should be verified having an author");
930
931 diem_info!(
932 self.new_log(LogEvent::ReceiveProposal).remote_peer(author),
933 block_hash = proposal.id(),
934 block_parent_hash = proposal.quorum_cert().certified_block().id(),
935 );
936
937 ensure!(
938 self.proposer_election.is_valid_proposal(&proposal),
939 "[RoundManager] Proposer {} for block {} is not a valid proposer for this round",
940 author,
941 proposal,
942 );
943
944 let block_time_since_epoch =
945 Duration::from_micros(proposal.timestamp_usecs());
946
947 ensure!(
948 block_time_since_epoch < self.round_state.current_round_deadline(),
949 "[RoundManager] Waiting until proposal block timestamp usecs {:?} \
950 would exceed the round duration {:?}, hence will not vote for this round",
951 block_time_since_epoch,
952 self.round_state.current_round_deadline(),
953 );
954
955 observe_block(proposal.timestamp_usecs(), BlockStage::SYNCED);
956
957 if self.proposer_election.is_random_election() {
958 if self
959 .proposer_election
960 .receive_proposal_candidate(&proposal)?
961 {
962 self.block_store.execute_and_insert_block(
963 proposal.clone(),
964 false,
965 false,
966 )?;
967 self.proposer_election.set_proposal_candidate(proposal);
968 Ok(true)
969 } else {
970 Ok(false)
975 }
976 } else {
977 bail!("unsupported election rules")
978 }
979 }
980
981 async fn execute_and_vote(
988 &mut self, proposed_block: Block,
989 ) -> anyhow::Result<Vote> {
990 let executed_block = self
991 .block_store
992 .execute_and_insert_block(proposed_block, false, false)
993 .context("[RoundManager] Failed to execute_and_insert the block")?;
994 let compute_result = executed_block.compute_result();
996 if let Err(e) = self
997 .txn_manager
998 .notify(executed_block.block(), compute_result)
999 .await
1000 {
1001 diem_error!(
1002 error = ?e, "[RoundManager] Failed to notify mempool of rejected txns",
1003 );
1004 }
1005
1006 ensure!(
1008 self.round_state.vote_sent().is_none(),
1009 "[RoundManager] Already vote on this round {}",
1010 self.round_state.current_round()
1011 );
1012
1013 ensure!(
1014 !self.sync_only,
1015 "[RoundManager] sync_only flag is set, stop voting"
1016 );
1017
1018 let maybe_signed_vote_proposal =
1019 executed_block.maybe_signed_vote_proposal();
1020 let vote = self
1021 .safety_rules
1022 .construct_and_sign_vote(&maybe_signed_vote_proposal)
1023 .context(format!(
1024 "[RoundManager] SafetyRules {}Rejected",
1025 executed_block.block()
1029 ))?;
1030 observe_block(
1031 executed_block.block().timestamp_usecs(),
1032 BlockStage::VOTED,
1033 );
1034
1035 self.storage
1036 .save_vote(&vote)
1037 .context("[RoundManager] Fail to persist last vote")?;
1038
1039 Ok(vote)
1040 }
1041
1042 pub async fn process_vote_msg(
1049 &mut self, vote_msg: VoteMsg,
1050 ) -> anyhow::Result<()> {
1051 fail_point!("consensus::process_vote_msg", |_| {
1052 Err(anyhow::anyhow!("Injected error in process_vote_msg"))
1053 });
1054 if self
1055 .ensure_round_and_sync_up(
1056 vote_msg.vote().vote_data().proposed().round(),
1057 vote_msg.sync_info(),
1058 vote_msg.vote().author(),
1059 true,
1060 )
1061 .await
1062 .context("[RoundManager] Stop processing vote")?
1063 {
1064 let relay = self
1065 .process_vote(vote_msg.vote())
1066 .await
1067 .context("[RoundManager] Add a new vote")?;
1068 if relay {
1069 let exclude =
1070 vec![vote_msg.vote().author(), self.network.author];
1071 self.network
1072 .broadcast(
1073 ConsensusMsg::VoteMsg(Box::new(vote_msg)),
1074 exclude,
1075 )
1076 .await;
1077 }
1078 }
1079 Ok(())
1080 }
1081
1082 async fn process_vote(&mut self, vote: &Vote) -> anyhow::Result<bool> {
1089 diem_info!(
1090 self.new_log(LogEvent::ReceiveVote)
1091 .remote_peer(vote.author()),
1092 vote = %vote,
1093 vote_epoch = vote.vote_data().proposed().epoch(),
1094 vote_round = vote.vote_data().proposed().round(),
1095 vote_id = vote.vote_data().proposed().id(),
1096 vote_state = vote.vote_data().proposed().executed_state_id(),
1097 );
1098
1099 let mut relay = true;
1101 match self
1102 .round_state
1103 .insert_vote(vote, &self.epoch_state.verifier())
1104 {
1105 VoteReceptionResult::NewQuorumCertificate(_)
1106 | VoteReceptionResult::NewTimeoutCertificate(_) => {
1107 self.round_state
1110 .setup_new_round_timeout(self.epoch_state.epoch);
1111 }
1112 VoteReceptionResult::VoteAdded(_) => {}
1113 VoteReceptionResult::DuplicateVote => {
1114 relay = false;
1117 }
1118 VoteReceptionResult::EquivocateVote((vote1, vote2)) => {
1119 match &self.proposal_generator {
1123 Some(proposal_generator) => {
1124 ensure!(
1125 vote1.author() == vote2.author(),
1126 "incorrect author"
1127 );
1128 ensure!(
1129 vote1.vote_data().proposed().round()
1130 == vote2.vote_data().proposed().round(),
1131 "incorrect round"
1132 );
1133 diem_warn!("Find Equivocate Vote!!! author={}, vote1={:?}, vote2={:?}", vote.author(), vote1, vote2);
1134 let dispute_payload = DisputePayload {
1135 address: vote1.author(),
1136 bls_pub_key: self
1137 .epoch_state
1138 .verifier()
1139 .get_public_key(&vote1.author())
1140 .expect("checked in verify"),
1141 vrf_pub_key: self
1142 .epoch_state
1143 .verifier()
1144 .get_vrf_public_key(&vote1.author())
1145 .expect("checked in verify")
1146 .unwrap(),
1147 conflicting_votes: ConflictSignature::Vote((
1148 bcs::to_bytes(&vote1).expect("encoding error"),
1149 bcs::to_bytes(&vote2).expect("encoding error"),
1150 )),
1151 };
1152 let raw_tx = RawTransaction::new_dispute(
1153 proposal_generator.author(),
1154 dispute_payload,
1155 );
1156 let signed_tx = raw_tx
1157 .sign(&proposal_generator.private_key)?
1158 .into_inner();
1159 let (tx, rx) = oneshot::channel();
1162 self.tx_sender.send((signed_tx, tx)).await?;
1163 rx.await??;
1164 }
1165 None => {}
1166 }
1167 bail!("EquivocateVote!")
1168 }
1169 r => bail!("vote not added with result {:?}", r),
1172 }
1173 Ok(relay)
1174 }
1175
1176 async fn new_qc_aggregated(
1177 &mut self, qc: Arc<QuorumCert>, preferred_peer: Author,
1178 ) -> anyhow::Result<()> {
1179 observe_block(
1180 qc.certified_block().timestamp_usecs(),
1181 BlockStage::QC_AGGREGATED,
1182 );
1183 let result = self
1184 .block_store
1185 .insert_quorum_cert(
1186 &qc,
1187 &mut self.create_block_retriever(preferred_peer),
1188 )
1189 .await
1190 .context("[RoundManager] Failed to process a newly aggregated QC");
1191 self.process_certificates().await?;
1192 result
1193 }
1194
1195 async fn new_tc_aggregated(
1196 &mut self, tc: Arc<TimeoutCertificate>,
1197 ) -> anyhow::Result<()> {
1198 let result = self
1199 .block_store
1200 .insert_timeout_certificate(tc.clone())
1201 .context("[RoundManager] Failed to process a newly aggregated TC");
1202 self.process_certificates().await?;
1203 result
1204 }
1205
1206 pub async fn process_block_retrieval(
1213 &self, request: IncomingBlockRetrievalRequest,
1214 ) -> anyhow::Result<()> {
1215 fail_point!("consensus::process_block_retrieval", |_| {
1216 Err(anyhow::anyhow!("Injected error in process_block_retrieval"))
1217 });
1218 let mut blocks = vec![];
1219 let mut status = BlockRetrievalStatus::Succeeded;
1220 let mut id = request.req.block_id();
1221 while (blocks.len() as u64) < request.req.num_blocks() {
1222 if let Some(executed_block) = self.block_store.get_block(id) {
1223 if executed_block.block().is_genesis_block() {
1224 break;
1225 }
1226 id = executed_block.parent_id();
1227 blocks.push(executed_block.block().clone());
1228 } else if let Ok(Some(block)) =
1229 self.block_store.get_ledger_block(&id)
1230 {
1231 if block.is_genesis_block() {
1232 break;
1233 }
1234 id = block.parent_id();
1235 blocks.push(block);
1236 } else {
1237 break;
1240 }
1241 }
1242
1243 if blocks.is_empty() {
1244 status = BlockRetrievalStatus::IdNotFound;
1245 }
1246
1247 let response = BlockRetrievalRpcResponse {
1248 request_id: request.request_id,
1249 response: BlockRetrievalResponse::new(status, blocks),
1250 };
1251 self.network
1252 .network_sender()
1253 .send_message_with_peer_id(&request.peer_id, &response)?;
1254 Ok(())
1255 }
1256
1257 pub async fn start(&mut self, last_vote_sent: Option<Vote>) {
1259 let new_round_event = self
1260 .round_state
1261 .process_certificates(self.block_store.sync_info())
1262 .expect(
1263 "Can not jump start a round_state from existing certificates.",
1264 );
1265 if let Some(vote) = last_vote_sent {
1266 self.round_state.record_vote(vote);
1267 }
1268 if let Err(e) = self.process_new_round_event(new_round_event).await {
1269 diem_error!(error = ?e, "[RoundManager] Error during start");
1270 }
1271 }
1272
1273 #[cfg(test)]
1275 #[allow(unused)]
1276 pub fn consensus_state(&mut self) -> ConsensusState {
1277 self.safety_rules.consensus_state().unwrap()
1278 }
1279
1280 #[cfg(test)]
1281 #[allow(unused)]
1282 pub fn set_safety_rules(&mut self, safety_rules: MetricsSafetyRules) {
1283 self.safety_rules = safety_rules
1284 }
1285
1286 pub fn epoch_state(&self) -> &EpochState { &self.epoch_state }
1287
1288 pub fn round_state(&self) -> &RoundState { &self.round_state }
1289
1290 fn new_log(&self, event: LogEvent) -> LogSchema {
1291 LogSchema::new(event)
1292 .round(self.round_state.current_round())
1293 .epoch(self.epoch_state.epoch)
1294 }
1295
1296 fn is_validator(&self) -> bool {
1297 let r = self.proposal_generator.is_some();
1298 diem_debug!("Check validator: r={} is_voting={}", r, self.is_voting);
1299 r && self.is_voting
1300 }
1301
1302 pub fn filter_proposal(&self, p: &ProposalMsg) -> bool {
1304 self.proposer_election
1305 .receive_proposal_candidate(p.proposal())
1306 .unwrap_or(false)
1307 }
1308
1309 pub fn filter_vote(&self, v: &VoteMsg) -> bool {
1311 !self.round_state.vote_received(v.vote())
1312 }
1313}
1314
1315impl RoundManager {
1317 pub async fn force_vote_proposal(
1321 &mut self, block_id: HashValue, author: Author,
1322 private_key: &ConsensusPrivateKey,
1323 ) -> Result<()> {
1324 let proposal = self
1325 .block_store
1326 .get_block(block_id)
1327 .ok_or(anyhow::anyhow!("force sign block not received"))?;
1328 let vote_proposal = proposal.maybe_signed_vote_proposal().vote_proposal;
1329 let vote_data =
1330 SafetyRules::extension_check(&vote_proposal).map_err(|e| {
1331 anyhow::anyhow!("extension_check error: err={:?}", e)
1332 })?;
1333 let ledger_info = SafetyRules::construct_ledger_info(
1334 vote_proposal.block(),
1335 vote_data.hash(),
1336 )
1337 .map_err(|e| anyhow::anyhow!("extension_check error: err={:?}", e))?;
1338 let signature = private_key.sign(&ledger_info);
1339 let vote =
1340 Vote::new_with_signature(vote_data, author, ledger_info, signature);
1341 let vote_msg = VoteMsg::new(vote, self.block_store.sync_info());
1342 diem_debug!("force_vote_proposal: broadcast {:?}", vote_msg);
1343 self.network
1344 .broadcast(ConsensusMsg::VoteMsg(Box::new(vote_msg)), vec![])
1345 .await;
1346 Ok(())
1347 }
1348
1349 pub async fn force_propose(
1353 &mut self, round: Round, parent_block_id: HashValue,
1354 payload: Vec<TransactionPayload>, private_key: &ConsensusPrivateKey,
1355 ) -> Result<()> {
1356 let parent_qc = self
1357 .block_store
1358 .get_quorum_cert_for_block(parent_block_id)
1359 .ok_or(anyhow::anyhow!(
1360 "no QC for parent: {:?}",
1361 parent_block_id
1362 ))?;
1363 let block_data = self
1364 .proposal_generator
1365 .as_ref()
1366 .ok_or(anyhow::anyhow!("proposal generator is None"))?
1367 .force_propose(round, parent_qc, payload)?;
1368 let signature = private_key.sign(&block_data);
1369 let mut signed_proposal =
1370 Block::new_proposal_from_block_data_and_signature(
1371 block_data, signature, None,
1372 );
1373 signed_proposal.set_vrf_nonce_and_proof(
1376 self.proposer_election
1377 .gen_vrf_nonce_and_proof(signed_proposal.block_data())
1378 .ok_or(anyhow::anyhow!(
1379 "The proposer should not propose in this round"
1380 ))?,
1381 );
1382 let proposal_msg =
1385 ProposalMsg::new(signed_proposal, self.block_store.sync_info());
1386 diem_debug!("force_propose: broadcast {:?}", proposal_msg);
1387 self.network
1388 .broadcast(
1389 ConsensusMsg::ProposalMsg(Box::new(proposal_msg)),
1390 vec![],
1391 )
1392 .await;
1393 Ok(())
1394 }
1395
1396 pub async fn force_sign_pivot_decision(
1397 &mut self, pivot_decision: PivotBlockDecision,
1398 ) -> anyhow::Result<()> {
1399 let proposal_generator = self.proposal_generator.as_ref().ok_or(
1400 anyhow::anyhow!("Non-validator cannot sign pivot decision"),
1401 )?;
1402 diem_info!("force_sign_pivot_decision: {:?}", pivot_decision);
1403 let raw_tx = RawTransaction::new_pivot_decision(
1406 proposal_generator.author(),
1407 pivot_decision,
1408 self.chain_id,
1409 );
1410 let signed_tx =
1411 raw_tx.sign(&proposal_generator.private_key)?.into_inner();
1412 let (tx, rx) = oneshot::channel();
1413 self.tx_sender.send((signed_tx, tx)).await?;
1414 rx.await??;
1416 diem_debug!("force_sign_pivot_decision sends");
1417 Ok(())
1418 }
1419
1420 pub fn get_chosen_proposal(&self) -> anyhow::Result<Option<Block>> {
1421 let chosen = self.proposer_election.choose_proposal_to_vote();
1424 Ok(chosen)
1425 }
1426
1427 pub fn start_voting(&mut self, initialize: bool) -> anyhow::Result<()> {
1428 if !initialize {
1429 self.safety_rules
1430 .start_voting(initialize)
1431 .map_err(anyhow::Error::from)?;
1432 }
1433 self.is_voting = true;
1434 Ok(())
1435 }
1436
1437 pub fn stop_voting(&mut self) -> anyhow::Result<()> {
1438 self.safety_rules
1439 .stop_voting()
1440 .map_err(anyhow::Error::from)?;
1441 self.is_voting = false;
1442 Ok(())
1443 }
1444}