cfxcore/pos/consensus/
round_manager.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use std::{
9    sync::{
10        atomic::{AtomicBool, Ordering as AtomicOrdering},
11        Arc,
12    },
13    time::Duration,
14};
15
16use anyhow::{bail, ensure, Context, Result};
17use fail::fail_point;
18use futures::{
19    channel::{mpsc, oneshot},
20    SinkExt,
21};
22use serde::Serialize;
23
24use consensus_types::{
25    block::Block,
26    block_retrieval::{BlockRetrievalResponse, BlockRetrievalStatus},
27    common::{Author, Round},
28    proposal_msg::ProposalMsg,
29    quorum_cert::QuorumCert,
30    sync_info::SyncInfo,
31    timeout_certificate::TimeoutCertificate,
32    vote::Vote,
33    vote_msg::VoteMsg,
34};
35use diem_config::keys::ConfigKey;
36use diem_crypto::{hash::CryptoHash, HashValue, SigningKey, VRFPrivateKey};
37use diem_infallible::checked;
38use diem_logger::prelude::*;
39use diem_types::{
40    account_address::{from_consensus_public_key, AccountAddress},
41    block_info::PivotBlockDecision,
42    chain_id::ChainId,
43    epoch_state::EpochState,
44    ledger_info::LedgerInfoWithSignatures,
45    transaction::{
46        ConflictSignature, DisputePayload, ElectionPayload, RawTransaction,
47        SignedTransaction, TransactionPayload,
48    },
49    validator_config::{ConsensusPrivateKey, ConsensusVRFPrivateKey},
50    validator_verifier::ValidatorVerifier,
51};
52#[cfg(test)]
53use safety_rules::ConsensusState;
54use safety_rules::{SafetyRules, TSafetyRules};
55
56use crate::pos::{
57    mempool::SubmissionStatus,
58    protocol::message::block_retrieval_response::BlockRetrievalRpcResponse,
59};
60
61use super::{
62    block_storage::{
63        tracing::{observe_block, BlockStage},
64        BlockReader, BlockRetriever, BlockStore,
65    },
66    counters,
67    error::VerifyError,
68    liveness::{
69        proposal_generator::ProposalGenerator,
70        proposer_election::ProposerElection,
71        round_state::{NewRoundEvent, NewRoundReason, RoundState},
72    },
73    logging::{LogEvent, LogSchema},
74    metrics_safety_rules::MetricsSafetyRules,
75    network::{
76        ConsensusMsg, ConsensusNetworkSender, IncomingBlockRetrievalRequest,
77    },
78    pending_votes::VoteReceptionResult,
79    persistent_liveness_storage::{PersistentLivenessStorage, RecoveryData},
80    state_replication::{StateComputer, TxnManager},
81};
82
83#[derive(Serialize, Clone)]
84pub enum UnverifiedEvent {
85    ProposalMsg(Box<ProposalMsg>),
86    VoteMsg(Box<VoteMsg>),
87    SyncInfo(Box<SyncInfo>),
88}
89
90impl UnverifiedEvent {
91    pub fn verify(
92        self, validator: &ValidatorVerifier, epoch_vrf_seed: &[u8],
93    ) -> Result<VerifiedEvent, VerifyError> {
94        Ok(match self {
95            UnverifiedEvent::ProposalMsg(p) => {
96                p.verify(validator, epoch_vrf_seed)?;
97                VerifiedEvent::ProposalMsg(p)
98            }
99            UnverifiedEvent::VoteMsg(v) => {
100                v.verify(validator)?;
101                VerifiedEvent::VoteMsg(v)
102            }
103            UnverifiedEvent::SyncInfo(s) => {
104                s.verify(validator)?;
105                VerifiedEvent::SyncInfo(s)
106            }
107        })
108    }
109
110    pub fn epoch(&self) -> u64 {
111        match self {
112            UnverifiedEvent::ProposalMsg(p) => p.epoch(),
113            UnverifiedEvent::VoteMsg(v) => v.epoch(),
114            UnverifiedEvent::SyncInfo(s) => s.epoch(),
115        }
116    }
117}
118
119impl From<ConsensusMsg> for UnverifiedEvent {
120    fn from(value: ConsensusMsg) -> Self {
121        match value {
122            ConsensusMsg::ProposalMsg(m) => UnverifiedEvent::ProposalMsg(m),
123            ConsensusMsg::VoteMsg(m) => UnverifiedEvent::VoteMsg(m),
124            ConsensusMsg::SyncInfo(m) => UnverifiedEvent::SyncInfo(m),
125            _ => unreachable!("Unexpected conversion"),
126        }
127    }
128}
129
130pub enum VerifiedEvent {
131    ProposalMsg(Box<ProposalMsg>),
132    VoteMsg(Box<VoteMsg>),
133    SyncInfo(Box<SyncInfo>),
134}
135
136#[cfg(test)]
137#[path = "round_manager_test.rs"]
138mod round_manager_test;
139
140#[cfg(feature = "fuzzing")]
141#[path = "round_manager_fuzzing.rs"]
142pub mod round_manager_fuzzing;
143
144/// If the node can't recover corresponding blocks from local storage,
145/// RecoveryManager is responsible for processing the events carrying sync info
146/// and use the info to retrieve blocks from peers
147pub struct RecoveryManager {
148    epoch_state: EpochState,
149    network: ConsensusNetworkSender,
150    storage: Arc<dyn PersistentLivenessStorage>,
151    state_computer: Arc<dyn StateComputer>,
152    last_committed_round: Round,
153}
154
155impl RecoveryManager {
156    pub fn new(
157        epoch_state: EpochState, network: ConsensusNetworkSender,
158        storage: Arc<dyn PersistentLivenessStorage>,
159        state_computer: Arc<dyn StateComputer>, last_committed_round: Round,
160    ) -> Self {
161        RecoveryManager {
162            epoch_state,
163            network,
164            storage,
165            state_computer,
166            last_committed_round,
167        }
168    }
169
170    pub async fn process_proposal_msg(
171        &mut self, proposal_msg: ProposalMsg,
172    ) -> Result<RecoveryData> {
173        let author = proposal_msg.proposer();
174        let sync_info = proposal_msg.sync_info();
175        self.sync_up(&sync_info, author).await
176    }
177
178    pub async fn process_vote_msg(
179        &mut self, vote_msg: VoteMsg,
180    ) -> Result<RecoveryData> {
181        let author = vote_msg.vote().author();
182        let sync_info = vote_msg.sync_info();
183        self.sync_up(&sync_info, author).await
184    }
185
186    pub async fn sync_up(
187        &mut self, sync_info: &SyncInfo, peer: Author,
188    ) -> Result<RecoveryData> {
189        sync_info
190            .verify(&self.epoch_state.verifier())
191            .map_err(VerifyError::from)?;
192        ensure!(
193            sync_info.highest_round() > self.last_committed_round,
194            "[RecoveryManager] Received sync info has lower round number than committed block"
195        );
196        ensure!(
197            sync_info.epoch() == self.epoch_state.epoch,
198            "[RecoveryManager] Received sync info is in different epoch than committed block"
199        );
200        let mut retriever = BlockRetriever::new(self.network.clone(), peer);
201        let recovery_data = BlockStore::fast_forward_sync(
202            &sync_info.highest_commit_cert(),
203            &mut retriever,
204            self.storage.clone(),
205            self.state_computer.clone(),
206        )
207        .await?;
208
209        Ok(recovery_data)
210    }
211
212    pub fn epoch_state(&self) -> &EpochState { &self.epoch_state }
213}
214
215/// Consensus SMR is working in an event based fashion: RoundManager is
216/// responsible for processing the individual events (e.g., process_new_round,
217/// process_proposal, process_vote, etc.). It is exposing the async processing
218/// functions for each event type. The caller is responsible for running the
219/// event loops and driving the execution via some executors.
220pub struct RoundManager {
221    epoch_state: EpochState,
222    block_store: Arc<BlockStore>,
223    round_state: RoundState,
224    proposer_election: Box<dyn ProposerElection + Send + Sync>,
225    // None if this is not a validator.
226    proposal_generator: Option<ProposalGenerator>,
227    safety_rules: MetricsSafetyRules,
228    network: ConsensusNetworkSender,
229    txn_manager: Arc<dyn TxnManager>,
230    storage: Arc<dyn PersistentLivenessStorage>,
231    sync_only: bool,
232    tx_sender: mpsc::Sender<(
233        SignedTransaction,
234        oneshot::Sender<anyhow::Result<SubmissionStatus>>,
235    )>,
236    chain_id: ChainId,
237
238    is_voting: bool,
239    election_control: Arc<AtomicBool>,
240    consensus_private_key: Option<ConfigKey<ConsensusPrivateKey>>,
241    vrf_private_key: Option<ConfigKey<ConsensusVRFPrivateKey>>,
242}
243
244impl RoundManager {
245    pub fn new(
246        epoch_state: EpochState, block_store: Arc<BlockStore>,
247        round_state: RoundState,
248        proposer_election: Box<dyn ProposerElection + Send + Sync>,
249        proposal_generator: Option<ProposalGenerator>,
250        safety_rules: MetricsSafetyRules, network: ConsensusNetworkSender,
251        txn_manager: Arc<dyn TxnManager>,
252        storage: Arc<dyn PersistentLivenessStorage>, sync_only: bool,
253        tx_sender: mpsc::Sender<(
254            SignedTransaction,
255            oneshot::Sender<anyhow::Result<SubmissionStatus>>,
256        )>,
257        chain_id: ChainId, is_voting: bool, election_control: Arc<AtomicBool>,
258        consensus_private_key: Option<ConfigKey<ConsensusPrivateKey>>,
259        vrf_private_key: Option<ConfigKey<ConsensusVRFPrivateKey>>,
260    ) -> Self {
261        counters::OP_COUNTERS
262            .gauge("sync_only")
263            .set(sync_only as i64);
264        Self {
265            epoch_state,
266            block_store,
267            round_state,
268            proposer_election,
269            proposal_generator,
270            is_voting,
271            safety_rules,
272            txn_manager,
273            network,
274            storage,
275            sync_only,
276            tx_sender,
277            chain_id,
278            election_control,
279            consensus_private_key,
280            vrf_private_key,
281        }
282    }
283
284    fn create_block_retriever(&self, author: Author) -> BlockRetriever {
285        BlockRetriever::new(self.network.clone(), author)
286    }
287
288    /// Leader:
289    ///
290    /// This event is triggered by a new quorum certificate at the previous
291    /// round or a timeout certificate at the previous round.  In either
292    /// case, if this replica is the new proposer for this round, it is
293    /// ready to propose and guarantee that it can create a proposal
294    /// that all honest replicas can vote for.  While this method should only be
295    /// invoked at most once per round, we ensure that only at most one
296    /// proposal can get generated per round to avoid accidental
297    /// equivocation of proposals.
298    ///
299    /// Replica:
300    ///
301    /// Do nothing
302    async fn process_new_round_event(
303        &mut self, new_round_event: NewRoundEvent,
304    ) -> anyhow::Result<()> {
305        counters::CURRENT_ROUND.set(new_round_event.round as i64);
306        counters::ROUND_TIMEOUT_MS
307            .set(new_round_event.timeout.as_millis() as i64);
308        match new_round_event.reason {
309            NewRoundReason::QCReady => {
310                counters::QC_ROUNDS_COUNT.inc();
311            }
312            NewRoundReason::Timeout => {
313                counters::TIMEOUT_ROUNDS_COUNT.inc();
314            }
315        };
316        diem_debug!(
317            self.new_log(LogEvent::NewRound),
318            reason = new_round_event.reason
319        );
320        if self.proposer_election.is_random_election() {
321            self.proposer_election.next_round(
322                new_round_event.round,
323                self.epoch_state.vrf_seed.clone(),
324            );
325            self.round_state
326                .setup_proposal_timeout(self.epoch_state.epoch);
327        }
328
329        if let Err(e) = self.broadcast_pivot_decision().await {
330            diem_error!("error in broadcasting pivot decision tx: {:?}", e);
331        }
332        // After the election transaction has been packed and executed,
333        // `broadcast_election` will be a no-op.
334        if let Err(e) = self.broadcast_election().await {
335            diem_error!("error in broadcasting election tx: {:?}", e);
336        }
337
338        if self.is_validator() {
339            if let Some(ref proposal_generator) = self.proposal_generator {
340                let author = proposal_generator.author();
341
342                if self
343                    .proposer_election
344                    .is_valid_proposer(author, new_round_event.round)
345                {
346                    let proposal_msg = ConsensusMsg::ProposalMsg(Box::new(
347                        self.generate_proposal(new_round_event).await?,
348                    ));
349                    let mut network = self.network.clone();
350                    network.broadcast(proposal_msg, vec![]).await;
351                    counters::PROPOSALS_COUNT.inc();
352                }
353            }
354        }
355        Ok(())
356    }
357
358    pub async fn broadcast_pivot_decision(&mut self) -> anyhow::Result<()> {
359        if !self.is_validator() {
360            // Not an active validator, so do not need to sign pivot decision.
361            return Ok(());
362        }
363        diem_debug!("broadcast_pivot_decision starts");
364
365        let hqc = self.block_store.highest_quorum_cert();
366        let parent_block = hqc.certified_block();
367        // TODO(lpl): Check if this may happen.
368        if self.block_store.path_from_root(parent_block.id()).is_none() {
369            bail!("HQC {} already pruned", parent_block);
370        }
371
372        // Sending non-existent H256 (default) will return the latest pivot
373        // decision.
374        let parent_decision = parent_block
375            .pivot_decision()
376            .map(|d| d.block_hash)
377            .unwrap_or_default();
378        let pivot_decision = match self
379            .block_store
380            .pow_handler
381            .next_pivot_decision(parent_decision)
382            .await
383        {
384            Some(res) => res,
385            None => {
386                // No new pivot decision.
387                diem_debug!("No new pivot decision");
388                return Ok(());
389            }
390        };
391
392        let proposal_generator =
393            self.proposal_generator.as_ref().expect("checked");
394        diem_info!("Broadcast new pivot decision: {:?}", pivot_decision);
395        // It's allowed for a node to sign conflict pivot decision,
396        // so we do not need to persist this signing event.
397        let raw_tx = RawTransaction::new_pivot_decision(
398            proposal_generator.author(),
399            PivotBlockDecision {
400                block_hash: pivot_decision.1,
401                height: pivot_decision.0,
402            },
403            self.chain_id,
404        );
405        let signed_tx =
406            raw_tx.sign(&proposal_generator.private_key)?.into_inner();
407        let (tx, rx) = oneshot::channel();
408        self.tx_sender.send((signed_tx, tx)).await?;
409        // TODO(lpl): Check if we want to wait here.
410        rx.await??;
411        diem_debug!("broadcast_pivot_decision sends");
412        Ok(())
413    }
414
415    pub async fn broadcast_election(&mut self) -> anyhow::Result<()> {
416        if !self.election_control.load(AtomicOrdering::Relaxed) {
417            diem_debug!("Skip election for election_control");
418            return Ok(());
419        }
420        if !self.is_voting {
421            // This node does not participate in any signing or voting.
422            return Ok(());
423        }
424        if !self.block_store.pow_handler.is_normal_phase() {
425            // Do not start election before PoW enters normal phase so we will
426            // not be force retired unexpectedly because we are
427            // elected but cannot vote.
428            return Ok(());
429        }
430        if self.vrf_private_key.is_none()
431            || self.consensus_private_key.is_none()
432        {
433            diem_warn!("broadcast_election without keys");
434            return Ok(());
435        }
436        let private_key = self.consensus_private_key.as_ref().unwrap();
437        let vrf_private_key = self.vrf_private_key.as_ref().unwrap();
438        let author = from_consensus_public_key(
439            &private_key.public_key(),
440            &vrf_private_key.public_key(),
441        );
442        diem_debug!("broadcast_election starts");
443        let pos_state = self.storage.pos_ledger_db().get_latest_pos_state();
444        if let Some(target_term) = pos_state.next_elect_term(&author) {
445            let epoch_vrf_seed = pos_state.target_term_seed(target_term);
446            let election_payload = ElectionPayload {
447                public_key: private_key.public_key(),
448                vrf_public_key: vrf_private_key.public_key(),
449                target_term,
450                vrf_proof: vrf_private_key
451                    .private_key()
452                    .compute(epoch_vrf_seed.as_slice())
453                    .unwrap(),
454            };
455            let raw_tx = RawTransaction::new_election(
456                author,
457                election_payload,
458                self.chain_id,
459            );
460            let signed_tx =
461                raw_tx.sign(&private_key.private_key())?.into_inner();
462            let (tx, rx) = oneshot::channel();
463            self.tx_sender.send((signed_tx, tx)).await?;
464            // TODO(lpl): Check if we want to wait here.
465            rx.await??;
466            diem_debug!(
467                "broadcast_election sends: target_term={}",
468                target_term
469            );
470        } else {
471            diem_debug!("Skip election for elected");
472            if let Some(node_data) = pos_state.account_node_data(author) {
473                if node_data.lock_status().force_retired().is_some() {
474                    warn!("The node stops elections for force retire!");
475                }
476            }
477        }
478        Ok(())
479    }
480
481    async fn generate_proposal(
482        &mut self, new_round_event: NewRoundEvent,
483    ) -> anyhow::Result<ProposalMsg> {
484        // Proposal generator will ensure that at most one proposal is generated
485        // per round
486        let proposal = self
487            .proposal_generator
488            .as_mut()
489            .expect("checked by process_new_round_event")
490            .generate_proposal(
491                new_round_event.round,
492                self.epoch_state.verifier().clone(),
493            )
494            .await?;
495        let mut signed_proposal = self.safety_rules.sign_proposal(proposal)?;
496        if self.proposer_election.is_random_election() {
497            signed_proposal.set_vrf_nonce_and_proof(
498                self.proposer_election
499                    .gen_vrf_nonce_and_proof(signed_proposal.block_data())
500                    .expect("threshold checked in is_valid_proposer"),
501            )
502        }
503        observe_block(signed_proposal.timestamp_usecs(), BlockStage::SIGNED);
504        diem_debug!(self.new_log(LogEvent::Propose), "{}", signed_proposal);
505        // return proposal
506        Ok(ProposalMsg::new(
507            signed_proposal,
508            self.block_store.sync_info(),
509        ))
510    }
511
512    /// Process the proposal message:
513    /// 1. ensure after processing sync info, we're at the same round as the
514    /// proposal 2. execute and decide whether to vode for the proposal
515    pub async fn process_proposal_msg(
516        &mut self, proposal_msg: ProposalMsg,
517    ) -> anyhow::Result<()> {
518        fail_point!("consensus::process_proposal_msg", |_| {
519            Err(anyhow::anyhow!("Injected error in process_proposal_msg"))
520        });
521
522        observe_block(
523            proposal_msg.proposal().timestamp_usecs(),
524            BlockStage::RECEIVED,
525        );
526        if self
527            .ensure_round_and_sync_up(
528                proposal_msg.proposal().round(),
529                proposal_msg.sync_info(),
530                proposal_msg.proposer(),
531                true,
532            )
533            .await
534            .context("[RoundManager] Process proposal")?
535        {
536            if self
537                .process_proposal(proposal_msg.clone().take_proposal())
538                .await?
539            {
540                // If a proposal has been received and voted, it will return
541                // error or false.
542                //
543                // 1. For old leader elections where there is only one leader
544                // and we vote after receiving the first
545                // proposal, the error is returned in
546                // `execute_and_vote` because `vote_sent.
547                // is_none()` is false. 2. For VRF leader election, we
548                // return Ok(false) when we insert a proposal from the same
549                // author to proposal_candidates.
550                //
551                // This ensures that there is no broadcast storm
552                // because we only broadcast a proposal when we receive it for
553                // the first time.
554                // TODO(lpl): Do not send to the sender and the original author.
555                let exclude =
556                    vec![proposal_msg.proposer(), self.network.author];
557                self.network
558                    .broadcast(
559                        ConsensusMsg::ProposalMsg(Box::new(proposal_msg)),
560                        exclude,
561                    )
562                    .await;
563            }
564            Ok(())
565        } else {
566            bail!(
567                "Stale proposal {}, current round {}",
568                proposal_msg.proposal(),
569                self.round_state.current_round()
570            );
571        }
572    }
573
574    /// Sync to the sync info sending from peer if it has newer certificates, if
575    /// we have newer certificates and help_remote is set, send it back the
576    /// local sync info.
577    pub async fn sync_up(
578        &mut self, sync_info: &SyncInfo, author: Author, help_remote: bool,
579    ) -> anyhow::Result<()> {
580        let local_sync_info = self.block_store.sync_info();
581        if help_remote && local_sync_info.has_newer_certificates(&sync_info) {
582            counters::SYNC_INFO_MSGS_SENT_COUNT.inc();
583            diem_debug!(
584                self.new_log(LogEvent::HelpPeerSync).remote_peer(author),
585                "Remote peer has stale state {}, send it back {}",
586                sync_info,
587                local_sync_info,
588            );
589            self.network.send_sync_info(local_sync_info.clone(), author);
590        }
591        if sync_info.has_newer_certificates(&local_sync_info) {
592            diem_debug!(
593                self.new_log(LogEvent::SyncToPeer).remote_peer(author),
594                "Local state {} is stale than remote state {}",
595                local_sync_info,
596                sync_info
597            );
598            // Some information in SyncInfo is ahead of what we have locally.
599            // First verify the SyncInfo (didn't verify it in the yet).
600            sync_info
601                .verify(&self.epoch_state().verifier())
602                .map_err(|e| {
603                    diem_error!(
604                        SecurityEvent::InvalidSyncInfoMsg,
605                        sync_info = sync_info,
606                        remote_peer = author,
607                        error = ?e,
608                    );
609                    VerifyError::from(e)
610                })?;
611            /*
612            let result = self
613                .block_store
614                .add_certs(&sync_info, self.create_block_retriever(author))
615                .await;
616             */
617            // TODO(lpl): Ensure this does not cause OOM.
618            let mut retriever = self.create_block_retriever(author);
619            self.block_store
620                .insert_quorum_cert(
621                    &sync_info.highest_commit_cert(),
622                    &mut retriever,
623                )
624                .await?;
625            self.block_store
626                .insert_quorum_cert(
627                    &sync_info.highest_quorum_cert(),
628                    &mut retriever,
629                )
630                .await?;
631            if let Some(tc) = sync_info.highest_timeout_certificate() {
632                self.block_store
633                    .insert_timeout_certificate(Arc::new(tc.clone()))?;
634            }
635            self.process_certificates().await
636        } else {
637            Ok(())
638        }
639    }
640
641    /// This can only be used in `EpochManager.start_new_epoch`.
642    pub async fn sync_to_ledger_info(
643        &mut self, ledger_info: &LedgerInfoWithSignatures,
644        peer_id: AccountAddress,
645    ) -> Result<()> {
646        diem_debug!("sync_to_ledger_info: {:?}", ledger_info);
647        let mut retriever = self.create_block_retriever(peer_id);
648        if !self
649            .block_store
650            .block_exists(ledger_info.ledger_info().consensus_block_id())
651        {
652            let block_for_ledger_info = retriever
653                .retrieve_block_for_ledger_info(ledger_info)
654                .await?;
655            self.block_store
656                .insert_quorum_cert(
657                    block_for_ledger_info.quorum_cert(),
658                    &mut retriever,
659                )
660                .await?;
661            // `insert_quorum_cert` will wait for PoW to initialize if needed,
662            // so here we do not need to execute as catch_up_mode
663            // again.
664            self.block_store.execute_and_insert_block(
665                block_for_ledger_info,
666                true,
667                false,
668            )?;
669        };
670        self.block_store.commit(ledger_info.clone()).await?;
671        Ok(())
672    }
673
674    /// The function makes sure that it ensures the message_round equal to what
675    /// we have locally, brings the missing dependencies from the QC and
676    /// LedgerInfo of the given sync info and update the round_state with
677    /// the certificates if succeed. Returns Ok(true) if the sync succeeds
678    /// and the round matches so we can process further. Returns Ok(false)
679    /// if the message is stale. Returns Error in case sync mgr failed to
680    /// bring the missing dependencies. We'll try to help the remote if the
681    /// SyncInfo lags behind and the flag is set.
682    pub async fn ensure_round_and_sync_up(
683        &mut self, message_round: Round, sync_info: &SyncInfo, author: Author,
684        help_remote: bool,
685    ) -> anyhow::Result<bool> {
686        if message_round < self.round_state.current_round() {
687            return Ok(false);
688        }
689        self.sync_up(sync_info, author, help_remote).await?;
690        ensure!(
691            message_round == self.round_state.current_round(),
692            "After sync, round {} doesn't match local {}",
693            message_round,
694            self.round_state.current_round()
695        );
696        Ok(true)
697    }
698
699    /// Process the SyncInfo sent by peers to catch up to latest state.
700    pub async fn process_sync_info_msg(
701        &mut self, sync_info: SyncInfo, peer: Author,
702    ) -> anyhow::Result<()> {
703        fail_point!("consensus::process_sync_info_msg", |_| {
704            Err(anyhow::anyhow!("Injected error in process_sync_info_msg"))
705        });
706        diem_debug!(
707            self.new_log(LogEvent::ReceiveSyncInfo).remote_peer(peer),
708            "{}",
709            sync_info
710        );
711        // To avoid a ping-pong cycle between two peers that move forward
712        // together.
713        self.ensure_round_and_sync_up(
714            checked!((sync_info.highest_round()) + 1)?,
715            &sync_info,
716            peer,
717            false,
718        )
719        .await
720        .context("[RoundManager] Failed to process sync info msg")?;
721        Ok(())
722    }
723
724    pub async fn process_new_round_timeout(
725        &mut self, epoch_round: (u64, Round),
726    ) -> anyhow::Result<()> {
727        diem_debug!("process_new_round_timeout: round={:?}", epoch_round);
728        if epoch_round
729            != (self.epoch_state.epoch, self.round_state.current_round())
730        {
731            return Ok(());
732        }
733        let round = epoch_round.1;
734
735        match self
736            .round_state
737            .get_round_certificate(&self.epoch_state.verifier())
738        {
739            VoteReceptionResult::NewQuorumCertificate(qc) => {
740                self.new_qc_aggregated(
741                    qc.clone(),
742                    qc.ledger_info()
743                        .signatures()
744                        .keys()
745                        .next()
746                        .expect("qc formed")
747                        .clone(),
748                )
749                .await?;
750            }
751            VoteReceptionResult::NewTimeoutCertificate(tc) => {
752                self.new_tc_aggregated(tc).await?;
753            }
754            _ => {
755                // No certificate formed. This should not happen.
756                anyhow::bail!(
757                    "New round timeout without new certificate! round={}",
758                    round
759                );
760            }
761        }
762        Ok(())
763    }
764
765    /// The replica broadcasts a "timeout vote message", which includes the
766    /// round signature, which can be aggregated to a TimeoutCertificate.
767    /// The timeout vote message can be one of the following three options:
768    /// 1) In case a validator has previously voted in this round, it repeats
769    /// the same vote and sign a timeout.
770    /// 2) Otherwise vote for a NIL block and sign a timeout.
771    /// Note this function returns Err even if messages are broadcasted
772    /// successfully because timeout is considered as error. It only returns
773    /// Ok(()) when the timeout is stale.
774    pub async fn process_local_timeout(
775        &mut self, epoch_round: (u64, Round),
776    ) -> anyhow::Result<()> {
777        diem_debug!("process_local_timeout: round={:?}", epoch_round);
778        if epoch_round
779            != (self.epoch_state.epoch, self.round_state.current_round())
780        {
781            return Ok(());
782        }
783        let round = epoch_round.1;
784
785        if !self.round_state.process_local_timeout(epoch_round) {
786            return Ok(());
787        }
788
789        self.network
790            .broadcast(
791                ConsensusMsg::SyncInfo(Box::new(self.block_store.sync_info())),
792                vec![],
793            )
794            .await;
795
796        match self
797            .round_state
798            .get_round_certificate(&self.epoch_state.verifier())
799        {
800            VoteReceptionResult::NewQuorumCertificate(_)
801            | VoteReceptionResult::NewTimeoutCertificate(_) => {
802                // Certificate formed, so do not send timeout vote.
803                return Ok(());
804            }
805            _ => {
806                // No certificate formed, so enter normal timeout processing.
807            }
808        }
809
810        if !self.is_validator() {
811            return Ok(());
812        }
813
814        let (use_last_vote, mut timeout_vote) =
815            match self.round_state.vote_sent() {
816                Some(vote) if vote.vote_data().proposed().round() == round => {
817                    (true, vote)
818                }
819                _ => {
820                    // Didn't vote in this round yet, generate a backup vote
821                    let nil_block = self
822                        .proposal_generator
823                        .as_ref()
824                        .expect("checked in is_validator")
825                        .generate_nil_block(round)?;
826                    diem_debug!(
827                        self.new_log(LogEvent::VoteNIL),
828                        "Planning to vote for a NIL block {}",
829                        nil_block
830                    );
831                    counters::VOTE_NIL_COUNT.inc();
832                    let nil_vote = self.execute_and_vote(nil_block).await?;
833                    (false, nil_vote)
834                }
835            };
836
837        if !timeout_vote.is_timeout() {
838            let timeout = timeout_vote.timeout();
839            let signature = self
840                .safety_rules
841                .sign_timeout(&timeout)
842                .context("[RoundManager] SafetyRules signs timeout")?;
843            timeout_vote.add_timeout_signature(signature);
844        }
845
846        self.round_state.record_vote(timeout_vote.clone());
847        let timeout_vote_msg = ConsensusMsg::VoteMsg(Box::new(VoteMsg::new(
848            timeout_vote,
849            self.block_store.sync_info(),
850        )));
851        self.network.broadcast(timeout_vote_msg, vec![]).await;
852        diem_error!(
853            round = round,
854            voted = use_last_vote,
855            event = LogEvent::Timeout,
856        );
857        bail!("Round {} timeout, broadcast to all peers", round);
858    }
859
860    pub async fn process_proposal_timeout(
861        &mut self, epoch_round: (u64, Round),
862    ) -> anyhow::Result<()> {
863        diem_debug!("process_proposal_timeout: round={:?}", epoch_round);
864        if epoch_round
865            != (self.epoch_state.epoch, self.round_state.current_round())
866        {
867            return Ok(());
868        }
869        let round = epoch_round.1;
870
871        if let Some(proposal) = self.proposer_election.choose_proposal_to_vote()
872        {
873            if self.is_validator() {
874                // Vote for proposal
875                let vote = self
876                    .execute_and_vote(proposal)
877                    .await
878                    .context("[RoundManager] Process proposal")?;
879                diem_debug!(self.new_log(LogEvent::Vote), "{}", vote);
880
881                self.round_state.record_vote(vote.clone());
882                let vote_msg = VoteMsg::new(vote, self.block_store.sync_info());
883                self.network
884                    .broadcast(
885                        ConsensusMsg::VoteMsg(Box::new(vote_msg)),
886                        vec![],
887                    )
888                    .await;
889                Ok(())
890            } else {
891                // Not a validator, just execute the block and wait for votes.
892                self.block_store
893                    .execute_and_insert_block(proposal, false, false)
894                    .context(
895                        "[RoundManager] Failed to execute_and_insert the block",
896                    )?;
897                Ok(())
898            }
899        } else {
900            debug!("No proposal to vote: round={}", round);
901            // No proposal to vote. Send Timeout earlier.
902            self.process_local_timeout(epoch_round).await
903        }
904    }
905
906    /// This function is called only after all the dependencies of the given QC
907    /// have been retrieved.
908    async fn process_certificates(&mut self) -> anyhow::Result<()> {
909        let sync_info = self.block_store.sync_info();
910        if let Some(new_round_event) =
911            self.round_state.process_certificates(sync_info)
912        {
913            self.process_new_round_event(new_round_event).await?;
914        }
915        Ok(())
916    }
917
918    /// This function processes a proposal for the current round:
919    /// 1. Filter if it's proposed by valid proposer.
920    /// 2. Execute and add it to a block store.
921    /// 3. Try to vote for it following the safety rules.
922    /// 4. In case a validator chooses to vote, send the vote to the
923    /// representatives at the next round.
924    ///
925    /// Return `Ok(true)` if the block should be relayed.
926    async fn process_proposal(&mut self, proposal: Block) -> Result<bool> {
927        let author = proposal
928            .author()
929            .expect("Proposal should be verified having an author");
930
931        diem_info!(
932            self.new_log(LogEvent::ReceiveProposal).remote_peer(author),
933            block_hash = proposal.id(),
934            block_parent_hash = proposal.quorum_cert().certified_block().id(),
935        );
936
937        ensure!(
938            self.proposer_election.is_valid_proposal(&proposal),
939            "[RoundManager] Proposer {} for block {} is not a valid proposer for this round",
940            author,
941            proposal,
942        );
943
944        let block_time_since_epoch =
945            Duration::from_micros(proposal.timestamp_usecs());
946
947        ensure!(
948            block_time_since_epoch < self.round_state.current_round_deadline(),
949            "[RoundManager] Waiting until proposal block timestamp usecs {:?} \
950            would exceed the round duration {:?}, hence will not vote for this round",
951            block_time_since_epoch,
952            self.round_state.current_round_deadline(),
953        );
954
955        observe_block(proposal.timestamp_usecs(), BlockStage::SYNCED);
956
957        if self.proposer_election.is_random_election() {
958            if self
959                .proposer_election
960                .receive_proposal_candidate(&proposal)?
961            {
962                self.block_store.execute_and_insert_block(
963                    proposal.clone(),
964                    false,
965                    false,
966                )?;
967                self.proposer_election.set_proposal_candidate(proposal);
968                Ok(true)
969            } else {
970                // This proposal will not be chosen to vote, so we do not need
971                // to relay. A proposal received for several
972                // times also enters this branch because
973                // the vrf_output is the same.
974                Ok(false)
975            }
976        } else {
977            bail!("unsupported election rules")
978        }
979    }
980
981    /// The function generates a VoteMsg for a given proposed_block:
982    /// * first execute the block and add it to the block store
983    /// * then verify the voting rules
984    /// * save the updated state to consensus DB
985    /// * return a VoteMsg with the LedgerInfo to be committed in case the vote
986    ///   gathers QC.
987    async fn execute_and_vote(
988        &mut self, proposed_block: Block,
989    ) -> anyhow::Result<Vote> {
990        let executed_block = self
991            .block_store
992            .execute_and_insert_block(proposed_block, false, false)
993            .context("[RoundManager] Failed to execute_and_insert the block")?;
994        // notify mempool about failed txn
995        let compute_result = executed_block.compute_result();
996        if let Err(e) = self
997            .txn_manager
998            .notify(executed_block.block(), compute_result)
999            .await
1000        {
1001            diem_error!(
1002                error = ?e, "[RoundManager] Failed to notify mempool of rejected txns",
1003            );
1004        }
1005
1006        // Short circuit if already voted.
1007        ensure!(
1008            self.round_state.vote_sent().is_none(),
1009            "[RoundManager] Already vote on this round {}",
1010            self.round_state.current_round()
1011        );
1012
1013        ensure!(
1014            !self.sync_only,
1015            "[RoundManager] sync_only flag is set, stop voting"
1016        );
1017
1018        let maybe_signed_vote_proposal =
1019            executed_block.maybe_signed_vote_proposal();
1020        let vote = self
1021            .safety_rules
1022            .construct_and_sign_vote(&maybe_signed_vote_proposal)
1023            .context(format!(
1024                "[RoundManager] SafetyRules {}Rejected",
1025                // TODO(lpl): Remove color because `termion` does not support
1026                // windows. Fg(Red),
1027                // Fg(Reset),
1028                executed_block.block()
1029            ))?;
1030        observe_block(
1031            executed_block.block().timestamp_usecs(),
1032            BlockStage::VOTED,
1033        );
1034
1035        self.storage
1036            .save_vote(&vote)
1037            .context("[RoundManager] Fail to persist last vote")?;
1038
1039        Ok(vote)
1040    }
1041
1042    /// Upon new vote:
1043    /// 1. Ensures we're processing the vote from the same round as local round
1044    /// 2. Filter out votes for rounds that should not be processed by this
1045    /// validator (to avoid potential attacks).
1046    /// 2. Add the vote to the pending votes and check whether it finishes a QC.
1047    /// 3. Once the QC/TC successfully formed, notify the RoundState.
1048    pub async fn process_vote_msg(
1049        &mut self, vote_msg: VoteMsg,
1050    ) -> anyhow::Result<()> {
1051        fail_point!("consensus::process_vote_msg", |_| {
1052            Err(anyhow::anyhow!("Injected error in process_vote_msg"))
1053        });
1054        if self
1055            .ensure_round_and_sync_up(
1056                vote_msg.vote().vote_data().proposed().round(),
1057                vote_msg.sync_info(),
1058                vote_msg.vote().author(),
1059                true,
1060            )
1061            .await
1062            .context("[RoundManager] Stop processing vote")?
1063        {
1064            let relay = self
1065                .process_vote(vote_msg.vote())
1066                .await
1067                .context("[RoundManager] Add a new vote")?;
1068            if relay {
1069                let exclude =
1070                    vec![vote_msg.vote().author(), self.network.author];
1071                self.network
1072                    .broadcast(
1073                        ConsensusMsg::VoteMsg(Box::new(vote_msg)),
1074                        exclude,
1075                    )
1076                    .await;
1077            }
1078        }
1079        Ok(())
1080    }
1081
1082    /// Add a vote to the pending votes.
1083    /// If a new QC / TC is formed then
1084    /// 1) fetch missing dependencies if required, and then
1085    /// 2) call process_certificates(), which will start a new round in return.
1086    ///
1087    /// Return `Ok(true)` if the vote should be relayed.
1088    async fn process_vote(&mut self, vote: &Vote) -> anyhow::Result<bool> {
1089        diem_info!(
1090            self.new_log(LogEvent::ReceiveVote)
1091                .remote_peer(vote.author()),
1092            vote = %vote,
1093            vote_epoch = vote.vote_data().proposed().epoch(),
1094            vote_round = vote.vote_data().proposed().round(),
1095            vote_id = vote.vote_data().proposed().id(),
1096            vote_state = vote.vote_data().proposed().executed_state_id(),
1097        );
1098
1099        // Add the vote and check whether it completes a new QC or a TC
1100        let mut relay = true;
1101        match self
1102            .round_state
1103            .insert_vote(vote, &self.epoch_state.verifier())
1104        {
1105            VoteReceptionResult::NewQuorumCertificate(_)
1106            | VoteReceptionResult::NewTimeoutCertificate(_) => {
1107                // Wait for extra time to gather more votes before entering the
1108                // next round.
1109                self.round_state
1110                    .setup_new_round_timeout(self.epoch_state.epoch);
1111            }
1112            VoteReceptionResult::VoteAdded(_) => {}
1113            VoteReceptionResult::DuplicateVote => {
1114                // Do not relay duplicate votes as we should have relayed it
1115                // before.
1116                relay = false;
1117            }
1118            VoteReceptionResult::EquivocateVote((vote1, vote2)) => {
1119                // Attack detected!
1120                // Construct a transaction to dispute this signer.
1121                // TODO(lpl): Allow non-committee member to dispute?
1122                match &self.proposal_generator {
1123                    Some(proposal_generator) => {
1124                        ensure!(
1125                            vote1.author() == vote2.author(),
1126                            "incorrect author"
1127                        );
1128                        ensure!(
1129                            vote1.vote_data().proposed().round()
1130                                == vote2.vote_data().proposed().round(),
1131                            "incorrect round"
1132                        );
1133                        diem_warn!("Find Equivocate Vote!!! author={}, vote1={:?}, vote2={:?}", vote.author(), vote1, vote2);
1134                        let dispute_payload = DisputePayload {
1135                            address: vote1.author(),
1136                            bls_pub_key: self
1137                                .epoch_state
1138                                .verifier()
1139                                .get_public_key(&vote1.author())
1140                                .expect("checked in verify"),
1141                            vrf_pub_key: self
1142                                .epoch_state
1143                                .verifier()
1144                                .get_vrf_public_key(&vote1.author())
1145                                .expect("checked in verify")
1146                                .unwrap(),
1147                            conflicting_votes: ConflictSignature::Vote((
1148                                bcs::to_bytes(&vote1).expect("encoding error"),
1149                                bcs::to_bytes(&vote2).expect("encoding error"),
1150                            )),
1151                        };
1152                        let raw_tx = RawTransaction::new_dispute(
1153                            proposal_generator.author(),
1154                            dispute_payload,
1155                        );
1156                        let signed_tx = raw_tx
1157                            .sign(&proposal_generator.private_key)?
1158                            .into_inner();
1159                        // TODO(lpl): Track disputed nodes to avoid sending
1160                        // multiple dispute, and retry if needed?
1161                        let (tx, rx) = oneshot::channel();
1162                        self.tx_sender.send((signed_tx, tx)).await?;
1163                        rx.await??;
1164                    }
1165                    None => {}
1166                }
1167                bail!("EquivocateVote!")
1168            }
1169            // Return error so that duplicate or invalid votes will not be
1170            // broadcast to others.
1171            r => bail!("vote not added with result {:?}", r),
1172        }
1173        Ok(relay)
1174    }
1175
1176    async fn new_qc_aggregated(
1177        &mut self, qc: Arc<QuorumCert>, preferred_peer: Author,
1178    ) -> anyhow::Result<()> {
1179        observe_block(
1180            qc.certified_block().timestamp_usecs(),
1181            BlockStage::QC_AGGREGATED,
1182        );
1183        let result = self
1184            .block_store
1185            .insert_quorum_cert(
1186                &qc,
1187                &mut self.create_block_retriever(preferred_peer),
1188            )
1189            .await
1190            .context("[RoundManager] Failed to process a newly aggregated QC");
1191        self.process_certificates().await?;
1192        result
1193    }
1194
1195    async fn new_tc_aggregated(
1196        &mut self, tc: Arc<TimeoutCertificate>,
1197    ) -> anyhow::Result<()> {
1198        let result = self
1199            .block_store
1200            .insert_timeout_certificate(tc.clone())
1201            .context("[RoundManager] Failed to process a newly aggregated TC");
1202        self.process_certificates().await?;
1203        result
1204    }
1205
1206    /// Retrieve a n chained blocks from the block store starting from
1207    /// an initial parent id, returning with <n (as many as possible) if
1208    /// id or its ancestors can not be found.
1209    ///
1210    /// The current version of the function is not really async, but keeping it
1211    /// this way for future possible changes.
1212    pub async fn process_block_retrieval(
1213        &self, request: IncomingBlockRetrievalRequest,
1214    ) -> anyhow::Result<()> {
1215        fail_point!("consensus::process_block_retrieval", |_| {
1216            Err(anyhow::anyhow!("Injected error in process_block_retrieval"))
1217        });
1218        let mut blocks = vec![];
1219        let mut status = BlockRetrievalStatus::Succeeded;
1220        let mut id = request.req.block_id();
1221        while (blocks.len() as u64) < request.req.num_blocks() {
1222            if let Some(executed_block) = self.block_store.get_block(id) {
1223                if executed_block.block().is_genesis_block() {
1224                    break;
1225                }
1226                id = executed_block.parent_id();
1227                blocks.push(executed_block.block().clone());
1228            } else if let Ok(Some(block)) =
1229                self.block_store.get_ledger_block(&id)
1230            {
1231                if block.is_genesis_block() {
1232                    break;
1233                }
1234                id = block.parent_id();
1235                blocks.push(block);
1236            } else {
1237                // TODO(lpl): This error may be needed in the future.
1238                // status = BlockRetrievalStatus::NotEnoughBlocks;
1239                break;
1240            }
1241        }
1242
1243        if blocks.is_empty() {
1244            status = BlockRetrievalStatus::IdNotFound;
1245        }
1246
1247        let response = BlockRetrievalRpcResponse {
1248            request_id: request.request_id,
1249            response: BlockRetrievalResponse::new(status, blocks),
1250        };
1251        self.network
1252            .network_sender()
1253            .send_message_with_peer_id(&request.peer_id, &response)?;
1254        Ok(())
1255    }
1256
1257    /// To jump start new round with the current certificates we have.
1258    pub async fn start(&mut self, last_vote_sent: Option<Vote>) {
1259        let new_round_event = self
1260            .round_state
1261            .process_certificates(self.block_store.sync_info())
1262            .expect(
1263                "Can not jump start a round_state from existing certificates.",
1264            );
1265        if let Some(vote) = last_vote_sent {
1266            self.round_state.record_vote(vote);
1267        }
1268        if let Err(e) = self.process_new_round_event(new_round_event).await {
1269            diem_error!(error = ?e, "[RoundManager] Error during start");
1270        }
1271    }
1272
1273    /// Inspect the current consensus state.
1274    #[cfg(test)]
1275    #[allow(unused)]
1276    pub fn consensus_state(&mut self) -> ConsensusState {
1277        self.safety_rules.consensus_state().unwrap()
1278    }
1279
1280    #[cfg(test)]
1281    #[allow(unused)]
1282    pub fn set_safety_rules(&mut self, safety_rules: MetricsSafetyRules) {
1283        self.safety_rules = safety_rules
1284    }
1285
1286    pub fn epoch_state(&self) -> &EpochState { &self.epoch_state }
1287
1288    pub fn round_state(&self) -> &RoundState { &self.round_state }
1289
1290    fn new_log(&self, event: LogEvent) -> LogSchema {
1291        LogSchema::new(event)
1292            .round(self.round_state.current_round())
1293            .epoch(self.epoch_state.epoch)
1294    }
1295
1296    fn is_validator(&self) -> bool {
1297        let r = self.proposal_generator.is_some();
1298        diem_debug!("Check validator: r={} is_voting={}", r, self.is_voting);
1299        r && self.is_voting
1300    }
1301
1302    /// Return true for blocks that we need to process
1303    pub fn filter_proposal(&self, p: &ProposalMsg) -> bool {
1304        self.proposer_election
1305            .receive_proposal_candidate(p.proposal())
1306            .unwrap_or(false)
1307    }
1308
1309    /// Return true for votes that we need to process
1310    pub fn filter_vote(&self, v: &VoteMsg) -> bool {
1311        !self.round_state.vote_received(v.vote())
1312    }
1313}
1314
1315/// The functions used in tests to construct attack cases
1316impl RoundManager {
1317    /// Force the node to vote for a proposal without changing its consensus
1318    /// state. The node will still vote for the correct proposal
1319    /// independently if that's not disabled.
1320    pub async fn force_vote_proposal(
1321        &mut self, block_id: HashValue, author: Author,
1322        private_key: &ConsensusPrivateKey,
1323    ) -> Result<()> {
1324        let proposal = self
1325            .block_store
1326            .get_block(block_id)
1327            .ok_or(anyhow::anyhow!("force sign block not received"))?;
1328        let vote_proposal = proposal.maybe_signed_vote_proposal().vote_proposal;
1329        let vote_data =
1330            SafetyRules::extension_check(&vote_proposal).map_err(|e| {
1331                anyhow::anyhow!("extension_check error: err={:?}", e)
1332            })?;
1333        let ledger_info = SafetyRules::construct_ledger_info(
1334            vote_proposal.block(),
1335            vote_data.hash(),
1336        )
1337        .map_err(|e| anyhow::anyhow!("extension_check error: err={:?}", e))?;
1338        let signature = private_key.sign(&ledger_info);
1339        let vote =
1340            Vote::new_with_signature(vote_data, author, ledger_info, signature);
1341        let vote_msg = VoteMsg::new(vote, self.block_store.sync_info());
1342        diem_debug!("force_vote_proposal: broadcast {:?}", vote_msg);
1343        self.network
1344            .broadcast(ConsensusMsg::VoteMsg(Box::new(vote_msg)), vec![])
1345            .await;
1346        Ok(())
1347    }
1348
1349    /// Force the node to propose a block without changing its consensus
1350    /// state. The node will still propose a valid block independently if that's
1351    /// not disabled.
1352    pub async fn force_propose(
1353        &mut self, round: Round, parent_block_id: HashValue,
1354        payload: Vec<TransactionPayload>, private_key: &ConsensusPrivateKey,
1355    ) -> Result<()> {
1356        let parent_qc = self
1357            .block_store
1358            .get_quorum_cert_for_block(parent_block_id)
1359            .ok_or(anyhow::anyhow!(
1360                "no QC for parent: {:?}",
1361                parent_block_id
1362            ))?;
1363        let block_data = self
1364            .proposal_generator
1365            .as_ref()
1366            .ok_or(anyhow::anyhow!("proposal generator is None"))?
1367            .force_propose(round, parent_qc, payload)?;
1368        let signature = private_key.sign(&block_data);
1369        let mut signed_proposal =
1370            Block::new_proposal_from_block_data_and_signature(
1371                block_data, signature, None,
1372            );
1373        // TODO: This vrf_output is incorrect if we want to propose a block in
1374        // another epoch.
1375        signed_proposal.set_vrf_nonce_and_proof(
1376            self.proposer_election
1377                .gen_vrf_nonce_and_proof(signed_proposal.block_data())
1378                .ok_or(anyhow::anyhow!(
1379                    "The proposer should not propose in this round"
1380                ))?,
1381        );
1382        // TODO: The sync_info here may not be consistent with
1383        // `signed_proposal`.
1384        let proposal_msg =
1385            ProposalMsg::new(signed_proposal, self.block_store.sync_info());
1386        diem_debug!("force_propose: broadcast {:?}", proposal_msg);
1387        self.network
1388            .broadcast(
1389                ConsensusMsg::ProposalMsg(Box::new(proposal_msg)),
1390                vec![],
1391            )
1392            .await;
1393        Ok(())
1394    }
1395
1396    pub async fn force_sign_pivot_decision(
1397        &mut self, pivot_decision: PivotBlockDecision,
1398    ) -> anyhow::Result<()> {
1399        let proposal_generator = self.proposal_generator.as_ref().ok_or(
1400            anyhow::anyhow!("Non-validator cannot sign pivot decision"),
1401        )?;
1402        diem_info!("force_sign_pivot_decision: {:?}", pivot_decision);
1403        // It's allowed for a node to sign conflict pivot decision,
1404        // so we do not need to persist this signing event.
1405        let raw_tx = RawTransaction::new_pivot_decision(
1406            proposal_generator.author(),
1407            pivot_decision,
1408            self.chain_id,
1409        );
1410        let signed_tx =
1411            raw_tx.sign(&proposal_generator.private_key)?.into_inner();
1412        let (tx, rx) = oneshot::channel();
1413        self.tx_sender.send((signed_tx, tx)).await?;
1414        // TODO(lpl): Check if we want to wait here.
1415        rx.await??;
1416        diem_debug!("force_sign_pivot_decision sends");
1417        Ok(())
1418    }
1419
1420    pub fn get_chosen_proposal(&self) -> anyhow::Result<Option<Block>> {
1421        // This takes out the candidate, so we need to insert it back if it's
1422        // Some.
1423        let chosen = self.proposer_election.choose_proposal_to_vote();
1424        Ok(chosen)
1425    }
1426
1427    pub fn start_voting(&mut self, initialize: bool) -> anyhow::Result<()> {
1428        if !initialize {
1429            self.safety_rules
1430                .start_voting(initialize)
1431                .map_err(anyhow::Error::from)?;
1432        }
1433        self.is_voting = true;
1434        Ok(())
1435    }
1436
1437    pub fn stop_voting(&mut self) -> anyhow::Result<()> {
1438        self.safety_rules
1439            .stop_voting()
1440            .map_err(anyhow::Error::from)?;
1441        self.is_voting = false;
1442        Ok(())
1443    }
1444}