cfxcore/pos/consensus/network.rs
1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use std::{mem::Discriminant, time::Duration};
9
10use anyhow::{anyhow, bail, ensure, format_err};
11use serde::{Deserialize, Serialize};
12
13use channel::{diem_channel, message_queues::QueueStyle};
14use consensus_types::{
15 block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse},
16 common::Author,
17 epoch_retrieval::EpochRetrievalRequest,
18 proposal_msg::ProposalMsg,
19 sync_info::SyncInfo,
20 vote_msg::VoteMsg,
21};
22use diem_logger::prelude::*;
23use diem_types::{
24 account_address::AccountAddress, epoch_change::EpochChangeProof,
25 validator_verifier::ValidatorVerifier,
26};
27use network::node_table::NodeId;
28
29use crate::{
30 message::RequestId,
31 pos::protocol::{
32 message::{
33 block_retrieval::BlockRetrievalRpcRequest,
34 block_retrieval_response::BlockRetrievalRpcResponse,
35 },
36 network_sender::NetworkSender,
37 },
38};
39
/// Network message type for consensus.
///
/// Every variant carries its payload inside a `Box`.
///
/// NOTE(review): this enum derives `Serialize`/`Deserialize`; if the wire
/// encoding identifies variants by their index, reordering or inserting
/// variants would change the format -- confirm before touching the order.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum ConsensusMsg {
    /// RPC to get a chain of blocks of the given length starting from the
    /// given block id.
    BlockRetrievalRequest(Box<BlockRetrievalRequest>),
    /// Carries the returned blocks and the retrieval status.
    BlockRetrievalResponse(Box<BlockRetrievalResponse>),
    /// Request to get an EpochChangeProof from current_epoch to target_epoch.
    EpochRetrievalRequest(Box<EpochRetrievalRequest>),
    /// ProposalMsg contains the required information for the proposer election
    /// protocol to make its choice (typically depends on round and
    /// proposer info).
    ProposalMsg(Box<ProposalMsg>),
    /// This struct describes basic synchronization metadata.
    SyncInfo(Box<SyncInfo>),
    /// A vector of LedgerInfo with contiguous increasing epoch numbers to
    /// prove a sequence of epoch changes from the first LedgerInfo's
    /// epoch.
    EpochChangeProof(Box<EpochChangeProof>),
    /// VoteMsg is the struct that is ultimately sent by the voter in response
    /// to receiving a proposal.
    VoteMsg(Box<VoteMsg>),
}
64
/// The block retrieval request is used internally for implementing RPC. It
/// bundles the request with the id of the peer and the id of the request it
/// belongs to.
///
/// NOTE(review): presumably `peer_id` and `request_id` are what the handler
/// uses to route the response back to the requester -- confirm against the
/// consumer of the `block_retrieval` queue.
#[derive(Debug)]
pub struct IncomingBlockRetrievalRequest {
    /// The retrieval request itself.
    pub req: BlockRetrievalRequest,
    /// Network id of the peer associated with this request.
    pub peer_id: NodeId,
    /// Id of the request.
    pub request_id: RequestId,
}
73
/// Just a convenience struct to keep all the network proxy receiving queues in
/// one place. It is returned by `NetworkTask::new` together with the task
/// that owns the matching sending ends.
pub struct NetworkReceivers {
    /// Provide a LIFO buffer for each (Author, MessageType) key. The message
    /// type part of the key is the enum discriminant of `ConsensusMsg`; the
    /// value repeats the address next to the message itself.
    pub consensus_messages: diem_channel::Receiver<
        (AccountAddress, Discriminant<ConsensusMsg>),
        (AccountAddress, ConsensusMsg),
    >,
    /// Receiving end for incoming block retrieval requests, keyed by
    /// `AccountAddress`.
    pub block_retrieval:
        diem_channel::Receiver<AccountAddress, IncomingBlockRetrievalRequest>,
}
85
/// Implements the actual networking support for all consensus messaging.
#[derive(Clone)]
pub struct ConsensusNetworkSender {
    /// Address of the local node. It is used to detect messages addressed to
    /// ourselves, which are delivered through `send_self_msg`.
    pub author: Author,
    /// Underlying sender that every outgoing message and RPC goes through.
    network_sender: NetworkSender,
    /// Verifier passed to `BlockRetrievalResponse::verify` when checking the
    /// responses received by `request_block`.
    validators: ValidatorVerifier,
}
93
94impl ConsensusNetworkSender {
95 pub fn new(
96 author: Author, network_sender: NetworkSender,
97 validators: ValidatorVerifier,
98 ) -> Self {
99 ConsensusNetworkSender {
100 author,
101 network_sender,
102 validators,
103 }
104 }
105
106 pub fn network_sender(&self) -> &NetworkSender { &self.network_sender }
107
108 /// Tries to retrieve num of blocks backwards starting from id from the
109 /// given peer: the function returns a future that is fulfilled with
110 /// BlockRetrievalResponse.
111 pub async fn request_block(
112 &mut self, retrieval_request: BlockRetrievalRequest, from: Author,
113 timeout: Duration,
114 ) -> anyhow::Result<BlockRetrievalResponse> {
115 ensure!(from != self.author, "Retrieve block from self");
116
117 let peer_hash = self
118 .network_sender
119 .protocol_handler
120 .pos_peer_mapping
121 .read()
122 .get(&from)
123 .cloned()
124 .ok_or(anyhow!(
125 "request_block: recipient {:?} has been removed",
126 from
127 ))?;
128 let peer_state =
129 self.network_sender.protocol_handler.peers.get(&peer_hash);
130 if peer_state.is_none() {
131 bail!("peer not found");
132 }
133 let peer_state = peer_state.unwrap();
134 let peer_id = peer_state.read().get_id().clone();
135
136 let request = BlockRetrievalRpcRequest {
137 request_id: 0,
138 request: retrieval_request.clone(),
139 is_empty: false,
140 response_tx: None,
141 timeout,
142 };
143
144 let rpc_response = self
145 .network_sender
146 .send_rpc(Some(peer_id), Box::new(request))
147 .await
148 .map_err(|_| format_err!("rpc call failed"))?;
149 let response = match rpc_response
150 .as_any()
151 .downcast_ref::<BlockRetrievalRpcResponse>()
152 {
153 Some(r) => r.clone(),
154 None => {
155 bail!("response downcast failed");
156 }
157 };
158
159 response
160 .response
161 .verify(
162 retrieval_request.block_id(),
163 retrieval_request.num_blocks(),
164 &self.validators,
165 )
166 .map_err(|e| {
167 diem_error!(
168 request_block_response = response,
169 error = ?e,
170 );
171 e
172 })?;
173
174 Ok(response.response)
175 }
176
177 /// Tries to send the given msg to all the participants.
178 ///
179 /// The future is fulfilled as soon as the message put into the mpsc channel
180 /// to network internal(to provide back pressure), it does not indicate
181 /// the message is delivered or sent out. It does not give indication
182 /// about when the message is delivered to the recipients, as well as
183 /// there is no indication about the network failures.
184 pub async fn broadcast(
185 &mut self, msg: ConsensusMsg, exclude: Vec<AccountAddress>,
186 ) {
187 if !exclude.contains(&self.author) {
188 if let Err(err) = self
189 .network_sender
190 .send_self_msg(self.author, msg.clone())
191 .await
192 {
193 diem_error!("Error broadcasting to self: {:?}", err);
194 }
195 }
196
197 /*
198 // Get the list of validators excluding our own account address. Note
199 // the ordering is not important in this case.
200 let self_author = self.author;
201 let other_validators = self
202 .validators
203 .get_ordered_account_addresses_iter()
204 .filter(|author| author != &self_author);
205 let mut public_keys = vec![];
206 for account in other_validators {
207 public_keys.push(self.validators.get_public_key(&account).unwrap());
208 }
209
210 // Broadcast message over direct-send to all other validators.
211 if let Err(err) = self
212 .network_sender
213 .send_to_many(public_keys.into_iter(), &msg)
214 {
215 diem_error!(error = ?err, "Error broadcasting message");
216 }
217 */
218 // TODO(lpl): It may be sufficient to broadcast some messages to only
219 // validators.
220 if let Err(err) = self.network_sender.send_to_others(&msg, &exclude) {
221 diem_error!(error = ?err, "Error broadcasting message");
222 }
223 }
224
225 // This is unused because we always broadcast votes now.
226 // It may be needed when non-voter nodes do not receive votes anymore.
227 #[allow(unused)]
228 /// Sends the vote to the chosen recipients (typically that would be the
229 /// recipients that we believe could serve as proposers in the next
230 /// round). The recipients on the receiving end are going to be notified
231 /// about a new vote in the vote queue.
232 ///
233 /// The future is fulfilled as soon as the message put into the mpsc channel
234 /// to network internal(to provide back pressure), it does not indicate
235 /// the message is delivered or sent out. It does not give indication
236 /// about when the message is delivered to the recipients, as well as
237 /// there is no indication about the network failures.
238 pub async fn send_vote(&self, vote_msg: VoteMsg, recipients: Vec<Author>) {
239 let mut network_sender = self.network_sender.clone();
240 let msg = ConsensusMsg::VoteMsg(Box::new(vote_msg));
241 for peer in recipients {
242 if self.author == peer {
243 if let Err(err) =
244 network_sender.send_self_msg(self.author, msg.clone()).await
245 {
246 diem_error!(error = ?err, "Error delivering a self vote");
247 }
248 continue;
249 }
250 if let Err(e) = network_sender.send_to(peer, &msg) {
251 diem_error!(
252 remote_peer = peer,
253 error = ?e, "Failed to send a vote to peer",
254 );
255 }
256 }
257 }
258
259 /// Sends the given sync info to the given author.
260 /// The future is fulfilled as soon as the message is added to the internal
261 /// network channel (does not indicate whether the message is delivered
262 /// or sent out).
263 pub fn send_sync_info(&self, sync_info: SyncInfo, recipient: Author) {
264 let msg = ConsensusMsg::SyncInfo(Box::new(sync_info));
265 let mut network_sender = self.network_sender.clone();
266 if let Err(e) = network_sender.send_to(recipient, &msg) {
267 diem_warn!(
268 remote_peer = recipient,
269 error = "Failed to send a sync info msg to peer {:?}",
270 "{:?}",
271 e
272 );
273 }
274 }
275}
276
/// Consensus network task. Holds the sending ends of the queues whose
/// receiving ends are handed out in `NetworkReceivers`.
pub struct NetworkTask {
    /// Sending end for consensus messages, keyed by the address and the enum
    /// discriminant of the `ConsensusMsg`.
    pub consensus_messages_tx: diem_channel::Sender<
        (AccountAddress, Discriminant<ConsensusMsg>),
        (AccountAddress, ConsensusMsg),
    >,
    /// Sending end for incoming block retrieval requests, keyed by address.
    pub block_retrieval_tx:
        diem_channel::Sender<AccountAddress, IncomingBlockRetrievalRequest>,
}
288
289impl NetworkTask {
290 /// Establishes the initial connections with the peers and returns the
291 /// receivers.
292 pub fn new() -> (NetworkTask, NetworkReceivers) {
293 let (consensus_messages_tx, consensus_messages) =
294 diem_channel::new(QueueStyle::LIFO, 1);
295 let (block_retrieval_tx, block_retrieval) =
296 diem_channel::new(QueueStyle::LIFO, 1);
297 (
298 NetworkTask {
299 consensus_messages_tx,
300 block_retrieval_tx,
301 },
302 NetworkReceivers {
303 consensus_messages,
304 block_retrieval,
305 },
306 )
307 }
308
309 /// start
310 pub async fn start(self) {}
311}