// cfxcore/pos/consensus/network.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use std::{mem::Discriminant, time::Duration};
9
10use anyhow::{anyhow, bail, ensure, format_err};
11use serde::{Deserialize, Serialize};
12
13use channel::{diem_channel, message_queues::QueueStyle};
14use consensus_types::{
15    block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse},
16    common::Author,
17    epoch_retrieval::EpochRetrievalRequest,
18    proposal_msg::ProposalMsg,
19    sync_info::SyncInfo,
20    vote_msg::VoteMsg,
21};
22use diem_logger::prelude::*;
23use diem_types::{
24    account_address::AccountAddress, epoch_change::EpochChangeProof,
25    validator_verifier::ValidatorVerifier,
26};
27use network::node_table::NodeId;
28
29use crate::{
30    message::RequestId,
31    pos::protocol::{
32        message::{
33            block_retrieval::BlockRetrievalRpcRequest,
34            block_retrieval_response::BlockRetrievalRpcResponse,
35        },
36        network_sender::NetworkSender,
37    },
38};
39
40/// Network type for consensus
41#[derive(Clone, Debug, Deserialize, Serialize)]
42pub enum ConsensusMsg {
43    /// RPC to get a chain of block of the given length starting from the given
44    /// block id.
45    BlockRetrievalRequest(Box<BlockRetrievalRequest>),
46    /// Carries the returned blocks and the retrieval status.
47    BlockRetrievalResponse(Box<BlockRetrievalResponse>),
48    /// Request to get a EpochChangeProof from current_epoch to target_epoch
49    EpochRetrievalRequest(Box<EpochRetrievalRequest>),
50    /// ProposalMsg contains the required information for the proposer election
51    /// protocol to make its choice (typically depends on round and
52    /// proposer info).
53    ProposalMsg(Box<ProposalMsg>),
54    /// This struct describes basic synchronization metadata.
55    SyncInfo(Box<SyncInfo>),
56    /// A vector of LedgerInfo with contiguous increasing epoch numbers to
57    /// prove a sequence of epoch changes from the first LedgerInfo's
58    /// epoch.
59    EpochChangeProof(Box<EpochChangeProof>),
60    /// VoteMsg is the struct that is ultimately sent by the voter in response
61    /// for receiving a proposal.
62    VoteMsg(Box<VoteMsg>),
63}
64
65/// The block retrieval request is used internally for implementing RPC: the
66/// callback is executed for carrying the response
67#[derive(Debug)]
68pub struct IncomingBlockRetrievalRequest {
69    pub req: BlockRetrievalRequest,
70    pub peer_id: NodeId,
71    pub request_id: RequestId,
72}
73
74/// Just a convenience struct to keep all the network proxy receiving queues in
75/// one place. Will be returned by the NetworkTask upon startup.
76pub struct NetworkReceivers {
77    /// Provide a LIFO buffer for each (Author, MessageType) key
78    pub consensus_messages: diem_channel::Receiver<
79        (AccountAddress, Discriminant<ConsensusMsg>),
80        (AccountAddress, ConsensusMsg),
81    >,
82    pub block_retrieval:
83        diem_channel::Receiver<AccountAddress, IncomingBlockRetrievalRequest>,
84}
85
/// Implements the actual networking support for all consensus messaging.
///
/// Bundles the PoS protocol `NetworkSender` with this node's consensus
/// identity and the validator set used to verify block retrieval responses.
#[derive(Clone)]
pub struct ConsensusNetworkSender {
    /// This node's consensus identity. Broadcasts and votes addressed to it
    /// go through `send_self_msg` instead of the network.
    pub author: Author,
    /// Underlying PoS protocol sender used for RPCs and direct sends.
    network_sender: NetworkSender,
    /// Validator set passed to `BlockRetrievalResponse::verify`.
    validators: ValidatorVerifier,
}
93
94impl ConsensusNetworkSender {
95    pub fn new(
96        author: Author, network_sender: NetworkSender,
97        validators: ValidatorVerifier,
98    ) -> Self {
99        ConsensusNetworkSender {
100            author,
101            network_sender,
102            validators,
103        }
104    }
105
106    pub fn network_sender(&self) -> &NetworkSender { &self.network_sender }
107
108    /// Tries to retrieve num of blocks backwards starting from id from the
109    /// given peer: the function returns a future that is fulfilled with
110    /// BlockRetrievalResponse.
111    pub async fn request_block(
112        &mut self, retrieval_request: BlockRetrievalRequest, from: Author,
113        timeout: Duration,
114    ) -> anyhow::Result<BlockRetrievalResponse> {
115        ensure!(from != self.author, "Retrieve block from self");
116
117        let peer_hash = self
118            .network_sender
119            .protocol_handler
120            .pos_peer_mapping
121            .read()
122            .get(&from)
123            .cloned()
124            .ok_or(anyhow!(
125                "request_block: recipient {:?} has been removed",
126                from
127            ))?;
128        let peer_state =
129            self.network_sender.protocol_handler.peers.get(&peer_hash);
130        if peer_state.is_none() {
131            bail!("peer not found");
132        }
133        let peer_state = peer_state.unwrap();
134        let peer_id = peer_state.read().get_id().clone();
135
136        let request = BlockRetrievalRpcRequest {
137            request_id: 0,
138            request: retrieval_request.clone(),
139            is_empty: false,
140            response_tx: None,
141            timeout,
142        };
143
144        let rpc_response = self
145            .network_sender
146            .send_rpc(Some(peer_id), Box::new(request))
147            .await
148            .map_err(|_| format_err!("rpc call failed"))?;
149        let response = match rpc_response
150            .as_any()
151            .downcast_ref::<BlockRetrievalRpcResponse>()
152        {
153            Some(r) => r.clone(),
154            None => {
155                bail!("response downcast failed");
156            }
157        };
158
159        response
160            .response
161            .verify(
162                retrieval_request.block_id(),
163                retrieval_request.num_blocks(),
164                &self.validators,
165            )
166            .map_err(|e| {
167                diem_error!(
168                    request_block_response = response,
169                    error = ?e,
170                );
171                e
172            })?;
173
174        Ok(response.response)
175    }
176
177    /// Tries to send the given msg to all the participants.
178    ///
179    /// The future is fulfilled as soon as the message put into the mpsc channel
180    /// to network internal(to provide back pressure), it does not indicate
181    /// the message is delivered or sent out. It does not give indication
182    /// about when the message is delivered to the recipients, as well as
183    /// there is no indication about the network failures.
184    pub async fn broadcast(
185        &mut self, msg: ConsensusMsg, exclude: Vec<AccountAddress>,
186    ) {
187        if !exclude.contains(&self.author) {
188            if let Err(err) = self
189                .network_sender
190                .send_self_msg(self.author, msg.clone())
191                .await
192            {
193                diem_error!("Error broadcasting to self: {:?}", err);
194            }
195        }
196
197        /*
198        // Get the list of validators excluding our own account address. Note
199        // the ordering is not important in this case.
200        let self_author = self.author;
201        let other_validators = self
202            .validators
203            .get_ordered_account_addresses_iter()
204            .filter(|author| author != &self_author);
205        let mut public_keys = vec![];
206        for account in other_validators {
207            public_keys.push(self.validators.get_public_key(&account).unwrap());
208        }
209
210        // Broadcast message over direct-send to all other validators.
211        if let Err(err) = self
212            .network_sender
213            .send_to_many(public_keys.into_iter(), &msg)
214        {
215            diem_error!(error = ?err, "Error broadcasting message");
216        }
217         */
218        // TODO(lpl): It may be sufficient to broadcast some messages to only
219        // validators.
220        if let Err(err) = self.network_sender.send_to_others(&msg, &exclude) {
221            diem_error!(error = ?err, "Error broadcasting message");
222        }
223    }
224
225    // This is unused because we always broadcast votes now.
226    // It may be needed when non-voter nodes do not receive votes anymore.
227    #[allow(unused)]
228    /// Sends the vote to the chosen recipients (typically that would be the
229    /// recipients that we believe could serve as proposers in the next
230    /// round). The recipients on the receiving end are going to be notified
231    /// about a new vote in the vote queue.
232    ///
233    /// The future is fulfilled as soon as the message put into the mpsc channel
234    /// to network internal(to provide back pressure), it does not indicate
235    /// the message is delivered or sent out. It does not give indication
236    /// about when the message is delivered to the recipients, as well as
237    /// there is no indication about the network failures.
238    pub async fn send_vote(&self, vote_msg: VoteMsg, recipients: Vec<Author>) {
239        let mut network_sender = self.network_sender.clone();
240        let msg = ConsensusMsg::VoteMsg(Box::new(vote_msg));
241        for peer in recipients {
242            if self.author == peer {
243                if let Err(err) =
244                    network_sender.send_self_msg(self.author, msg.clone()).await
245                {
246                    diem_error!(error = ?err, "Error delivering a self vote");
247                }
248                continue;
249            }
250            if let Err(e) = network_sender.send_to(peer, &msg) {
251                diem_error!(
252                    remote_peer = peer,
253                    error = ?e, "Failed to send a vote to peer",
254                );
255            }
256        }
257    }
258
259    /// Sends the given sync info to the given author.
260    /// The future is fulfilled as soon as the message is added to the internal
261    /// network channel (does not indicate whether the message is delivered
262    /// or sent out).
263    pub fn send_sync_info(&self, sync_info: SyncInfo, recipient: Author) {
264        let msg = ConsensusMsg::SyncInfo(Box::new(sync_info));
265        let mut network_sender = self.network_sender.clone();
266        if let Err(e) = network_sender.send_to(recipient, &msg) {
267            diem_warn!(
268                remote_peer = recipient,
269                error = "Failed to send a sync info msg to peer {:?}",
270                "{:?}",
271                e
272            );
273        }
274    }
275}
276
/// Sending half of the consensus network queues.
///
/// Holds the senders that pair with the receivers in `NetworkReceivers`.
/// Both halves are created together by `NetworkTask::new`.
pub struct NetworkTask {
    /// Sender for consensus messages, keyed by
    /// (author, message variant discriminant).
    pub consensus_messages_tx: diem_channel::Sender<
        (AccountAddress, Discriminant<ConsensusMsg>),
        (AccountAddress, ConsensusMsg),
    >,
    /// Sender for block retrieval requests, keyed by `AccountAddress`.
    pub block_retrieval_tx:
        diem_channel::Sender<AccountAddress, IncomingBlockRetrievalRequest>,
}
288
289impl NetworkTask {
290    /// Establishes the initial connections with the peers and returns the
291    /// receivers.
292    pub fn new() -> (NetworkTask, NetworkReceivers) {
293        let (consensus_messages_tx, consensus_messages) =
294            diem_channel::new(QueueStyle::LIFO, 1);
295        let (block_retrieval_tx, block_retrieval) =
296            diem_channel::new(QueueStyle::LIFO, 1);
297        (
298            NetworkTask {
299                consensus_messages_tx,
300                block_retrieval_tx,
301            },
302            NetworkReceivers {
303                consensus_messages,
304                block_retrieval,
305            },
306        )
307    }
308
309    /// start
310    pub async fn start(self) {}
311}