cfxcore/pos/consensus/
network.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use std::{mem::Discriminant, time::Duration};
9
10use anyhow::{anyhow, bail, ensure, format_err};
11use serde::{Deserialize, Serialize};
12
13use channel::{self, diem_channel, message_queues::QueueStyle};
14use consensus_types::{
15    block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse},
16    common::Author,
17    epoch_retrieval::EpochRetrievalRequest,
18    proposal_msg::ProposalMsg,
19    sync_info::SyncInfo,
20    vote_msg::VoteMsg,
21};
22use diem_logger::prelude::*;
23use diem_metrics::monitor;
24use diem_types::{
25    account_address::AccountAddress, epoch_change::EpochChangeProof,
26    validator_verifier::ValidatorVerifier,
27};
28use network::node_table::NodeId;
29
30use crate::{
31    message::RequestId,
32    pos::protocol::{
33        message::{
34            block_retrieval::BlockRetrievalRpcRequest,
35            block_retrieval_response::BlockRetrievalRpcResponse,
36        },
37        network_sender::NetworkSender,
38    },
39};
40
41use super::counters;
42
43/// Network type for consensus
44#[derive(Clone, Debug, Deserialize, Serialize)]
45pub enum ConsensusMsg {
46    /// RPC to get a chain of block of the given length starting from the given
47    /// block id.
48    BlockRetrievalRequest(Box<BlockRetrievalRequest>),
49    /// Carries the returned blocks and the retrieval status.
50    BlockRetrievalResponse(Box<BlockRetrievalResponse>),
51    /// Request to get a EpochChangeProof from current_epoch to target_epoch
52    EpochRetrievalRequest(Box<EpochRetrievalRequest>),
53    /// ProposalMsg contains the required information for the proposer election
54    /// protocol to make its choice (typically depends on round and
55    /// proposer info).
56    ProposalMsg(Box<ProposalMsg>),
57    /// This struct describes basic synchronization metadata.
58    SyncInfo(Box<SyncInfo>),
59    /// A vector of LedgerInfo with contiguous increasing epoch numbers to
60    /// prove a sequence of epoch changes from the first LedgerInfo's
61    /// epoch.
62    EpochChangeProof(Box<EpochChangeProof>),
63    /// VoteMsg is the struct that is ultimately sent by the voter in response
64    /// for receiving a proposal.
65    VoteMsg(Box<VoteMsg>),
66}
67
68/// The block retrieval request is used internally for implementing RPC: the
69/// callback is executed for carrying the response
70#[derive(Debug)]
71pub struct IncomingBlockRetrievalRequest {
72    pub req: BlockRetrievalRequest,
73    pub peer_id: NodeId,
74    pub request_id: RequestId,
75}
76
77/// Just a convenience struct to keep all the network proxy receiving queues in
78/// one place. Will be returned by the NetworkTask upon startup.
79pub struct NetworkReceivers {
80    /// Provide a LIFO buffer for each (Author, MessageType) key
81    pub consensus_messages: diem_channel::Receiver<
82        (AccountAddress, Discriminant<ConsensusMsg>),
83        (AccountAddress, ConsensusMsg),
84    >,
85    pub block_retrieval:
86        diem_channel::Receiver<AccountAddress, IncomingBlockRetrievalRequest>,
87}
88
89/// Implements the actual networking support for all consensus messaging.
90#[derive(Clone)]
91pub struct ConsensusNetworkSender {
92    pub author: Author,
93    network_sender: NetworkSender,
94    validators: ValidatorVerifier,
95}
96
97impl ConsensusNetworkSender {
98    pub fn new(
99        author: Author, network_sender: NetworkSender,
100        validators: ValidatorVerifier,
101    ) -> Self {
102        ConsensusNetworkSender {
103            author,
104            network_sender,
105            validators,
106        }
107    }
108
109    pub fn network_sender(&self) -> &NetworkSender { &self.network_sender }
110
111    /// Tries to retrieve num of blocks backwards starting from id from the
112    /// given peer: the function returns a future that is fulfilled with
113    /// BlockRetrievalResponse.
114    pub async fn request_block(
115        &mut self, retrieval_request: BlockRetrievalRequest, from: Author,
116        timeout: Duration,
117    ) -> anyhow::Result<BlockRetrievalResponse> {
118        ensure!(from != self.author, "Retrieve block from self");
119
120        let peer_hash = self
121            .network_sender
122            .protocol_handler
123            .pos_peer_mapping
124            .read()
125            .get(&from)
126            .cloned()
127            .ok_or(anyhow!(
128                "request_block: recipient {:?} has been removed",
129                from
130            ))?;
131        let peer_state =
132            self.network_sender.protocol_handler.peers.get(&peer_hash);
133        if peer_state.is_none() {
134            bail!("peer not found");
135        }
136        let peer_state = peer_state.unwrap();
137        let peer_id = peer_state.read().get_id().clone();
138
139        let request = BlockRetrievalRpcRequest {
140            request_id: 0,
141            request: retrieval_request.clone(),
142            is_empty: false,
143            response_tx: None,
144            timeout,
145        };
146
147        let rpc_response = monitor!(
148            "block_retrieval",
149            self.network_sender
150                .send_rpc(Some(peer_id), Box::new(request))
151                .await
152                .map_err(|_| { format_err!("rpc call failed") })?
153        );
154        let response = match rpc_response
155            .as_any()
156            .downcast_ref::<BlockRetrievalRpcResponse>()
157        {
158            Some(r) => r.clone(),
159            None => {
160                bail!("response downcast failed");
161            }
162        };
163
164        response
165            .response
166            .verify(
167                retrieval_request.block_id(),
168                retrieval_request.num_blocks(),
169                &self.validators,
170            )
171            .map_err(|e| {
172                diem_error!(
173                    request_block_response = response,
174                    error = ?e,
175                );
176                e
177            })?;
178
179        Ok(response.response)
180    }
181
182    /// Tries to send the given msg to all the participants.
183    ///
184    /// The future is fulfilled as soon as the message put into the mpsc channel
185    /// to network internal(to provide back pressure), it does not indicate
186    /// the message is delivered or sent out. It does not give indication
187    /// about when the message is delivered to the recipients, as well as
188    /// there is no indication about the network failures.
189    pub async fn broadcast(
190        &mut self, msg: ConsensusMsg, exclude: Vec<AccountAddress>,
191    ) {
192        if !exclude.contains(&self.author) {
193            if let Err(err) = self
194                .network_sender
195                .send_self_msg(self.author, msg.clone())
196                .await
197            {
198                diem_error!("Error broadcasting to self: {:?}", err);
199            }
200        }
201
202        /*
203        // Get the list of validators excluding our own account address. Note
204        // the ordering is not important in this case.
205        let self_author = self.author;
206        let other_validators = self
207            .validators
208            .get_ordered_account_addresses_iter()
209            .filter(|author| author != &self_author);
210        let mut public_keys = vec![];
211        for account in other_validators {
212            public_keys.push(self.validators.get_public_key(&account).unwrap());
213        }
214
215        // Broadcast message over direct-send to all other validators.
216        if let Err(err) = self
217            .network_sender
218            .send_to_many(public_keys.into_iter(), &msg)
219        {
220            diem_error!(error = ?err, "Error broadcasting message");
221        }
222         */
223        // TODO(lpl): It may be sufficient to broadcast some messages to only
224        // validators.
225        if let Err(err) = self.network_sender.send_to_others(&msg, &exclude) {
226            diem_error!(error = ?err, "Error broadcasting message");
227        }
228    }
229
230    // This is unused because we always broadcast votes now.
231    // It may be needed when non-voter nodes do not receive votes anymore.
232    #[allow(unused)]
233    /// Sends the vote to the chosen recipients (typically that would be the
234    /// recipients that we believe could serve as proposers in the next
235    /// round). The recipients on the receiving end are going to be notified
236    /// about a new vote in the vote queue.
237    ///
238    /// The future is fulfilled as soon as the message put into the mpsc channel
239    /// to network internal(to provide back pressure), it does not indicate
240    /// the message is delivered or sent out. It does not give indication
241    /// about when the message is delivered to the recipients, as well as
242    /// there is no indication about the network failures.
243    pub async fn send_vote(&self, vote_msg: VoteMsg, recipients: Vec<Author>) {
244        let mut network_sender = self.network_sender.clone();
245        let msg = ConsensusMsg::VoteMsg(Box::new(vote_msg));
246        for peer in recipients {
247            if self.author == peer {
248                if let Err(err) =
249                    network_sender.send_self_msg(self.author, msg.clone()).await
250                {
251                    diem_error!(error = ?err, "Error delivering a self vote");
252                }
253                continue;
254            }
255            if let Err(e) = network_sender.send_to(peer, &msg) {
256                diem_error!(
257                    remote_peer = peer,
258                    error = ?e, "Failed to send a vote to peer",
259                );
260            }
261        }
262    }
263
264    /// Sends the given sync info to the given author.
265    /// The future is fulfilled as soon as the message is added to the internal
266    /// network channel (does not indicate whether the message is delivered
267    /// or sent out).
268    pub fn send_sync_info(&self, sync_info: SyncInfo, recipient: Author) {
269        let msg = ConsensusMsg::SyncInfo(Box::new(sync_info));
270        let mut network_sender = self.network_sender.clone();
271        if let Err(e) = network_sender.send_to(recipient, &msg) {
272            diem_warn!(
273                remote_peer = recipient,
274                error = "Failed to send a sync info msg to peer {:?}",
275                "{:?}",
276                e
277            );
278        }
279    }
280}
281
282/// Consensus network task
283pub struct NetworkTask {
284    /// consensus message sender
285    pub consensus_messages_tx: diem_channel::Sender<
286        (AccountAddress, Discriminant<ConsensusMsg>),
287        (AccountAddress, ConsensusMsg),
288    >,
289    /// block retrieval message sender
290    pub block_retrieval_tx:
291        diem_channel::Sender<AccountAddress, IncomingBlockRetrievalRequest>,
292}
293
294impl NetworkTask {
295    /// Establishes the initial connections with the peers and returns the
296    /// receivers.
297    pub fn new() -> (NetworkTask, NetworkReceivers) {
298        let (consensus_messages_tx, consensus_messages) = diem_channel::new(
299            QueueStyle::LIFO,
300            1,
301            Some(&counters::CONSENSUS_CHANNEL_MSGS),
302        );
303        let (block_retrieval_tx, block_retrieval) = diem_channel::new(
304            QueueStyle::LIFO,
305            1,
306            Some(&counters::BLOCK_RETRIEVAL_CHANNEL_MSGS),
307        );
308        (
309            NetworkTask {
310                consensus_messages_tx,
311                block_retrieval_tx,
312            },
313            NetworkReceivers {
314                consensus_messages,
315                block_retrieval,
316            },
317        )
318    }
319
320    /// start
321    pub async fn start(self) {}
322}