1use std::{mem::Discriminant, time::Duration};
9
10use anyhow::{anyhow, bail, ensure, format_err};
11use serde::{Deserialize, Serialize};
12
13use channel::{self, diem_channel, message_queues::QueueStyle};
14use consensus_types::{
15 block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse},
16 common::Author,
17 epoch_retrieval::EpochRetrievalRequest,
18 proposal_msg::ProposalMsg,
19 sync_info::SyncInfo,
20 vote_msg::VoteMsg,
21};
22use diem_logger::prelude::*;
23use diem_metrics::monitor;
24use diem_types::{
25 account_address::AccountAddress, epoch_change::EpochChangeProof,
26 validator_verifier::ValidatorVerifier,
27};
28use network::node_table::NodeId;
29
30use crate::{
31 message::RequestId,
32 pos::protocol::{
33 message::{
34 block_retrieval::BlockRetrievalRpcRequest,
35 block_retrieval_response::BlockRetrievalRpcResponse,
36 },
37 network_sender::NetworkSender,
38 },
39};
40
41use super::counters;
42
43#[derive(Clone, Debug, Deserialize, Serialize)]
45pub enum ConsensusMsg {
46 BlockRetrievalRequest(Box<BlockRetrievalRequest>),
49 BlockRetrievalResponse(Box<BlockRetrievalResponse>),
51 EpochRetrievalRequest(Box<EpochRetrievalRequest>),
53 ProposalMsg(Box<ProposalMsg>),
57 SyncInfo(Box<SyncInfo>),
59 EpochChangeProof(Box<EpochChangeProof>),
63 VoteMsg(Box<VoteMsg>),
66}
67
68#[derive(Debug)]
71pub struct IncomingBlockRetrievalRequest {
72 pub req: BlockRetrievalRequest,
73 pub peer_id: NodeId,
74 pub request_id: RequestId,
75}
76
77pub struct NetworkReceivers {
80 pub consensus_messages: diem_channel::Receiver<
82 (AccountAddress, Discriminant<ConsensusMsg>),
83 (AccountAddress, ConsensusMsg),
84 >,
85 pub block_retrieval:
86 diem_channel::Receiver<AccountAddress, IncomingBlockRetrievalRequest>,
87}
88
89#[derive(Clone)]
91pub struct ConsensusNetworkSender {
92 pub author: Author,
93 network_sender: NetworkSender,
94 validators: ValidatorVerifier,
95}
96
97impl ConsensusNetworkSender {
98 pub fn new(
99 author: Author, network_sender: NetworkSender,
100 validators: ValidatorVerifier,
101 ) -> Self {
102 ConsensusNetworkSender {
103 author,
104 network_sender,
105 validators,
106 }
107 }
108
109 pub fn network_sender(&self) -> &NetworkSender { &self.network_sender }
110
111 pub async fn request_block(
115 &mut self, retrieval_request: BlockRetrievalRequest, from: Author,
116 timeout: Duration,
117 ) -> anyhow::Result<BlockRetrievalResponse> {
118 ensure!(from != self.author, "Retrieve block from self");
119
120 let peer_hash = self
121 .network_sender
122 .protocol_handler
123 .pos_peer_mapping
124 .read()
125 .get(&from)
126 .cloned()
127 .ok_or(anyhow!(
128 "request_block: recipient {:?} has been removed",
129 from
130 ))?;
131 let peer_state =
132 self.network_sender.protocol_handler.peers.get(&peer_hash);
133 if peer_state.is_none() {
134 bail!("peer not found");
135 }
136 let peer_state = peer_state.unwrap();
137 let peer_id = peer_state.read().get_id().clone();
138
139 let request = BlockRetrievalRpcRequest {
140 request_id: 0,
141 request: retrieval_request.clone(),
142 is_empty: false,
143 response_tx: None,
144 timeout,
145 };
146
147 let rpc_response = monitor!(
148 "block_retrieval",
149 self.network_sender
150 .send_rpc(Some(peer_id), Box::new(request))
151 .await
152 .map_err(|_| { format_err!("rpc call failed") })?
153 );
154 let response = match rpc_response
155 .as_any()
156 .downcast_ref::<BlockRetrievalRpcResponse>()
157 {
158 Some(r) => r.clone(),
159 None => {
160 bail!("response downcast failed");
161 }
162 };
163
164 response
165 .response
166 .verify(
167 retrieval_request.block_id(),
168 retrieval_request.num_blocks(),
169 &self.validators,
170 )
171 .map_err(|e| {
172 diem_error!(
173 request_block_response = response,
174 error = ?e,
175 );
176 e
177 })?;
178
179 Ok(response.response)
180 }
181
182 pub async fn broadcast(
190 &mut self, msg: ConsensusMsg, exclude: Vec<AccountAddress>,
191 ) {
192 if !exclude.contains(&self.author) {
193 if let Err(err) = self
194 .network_sender
195 .send_self_msg(self.author, msg.clone())
196 .await
197 {
198 diem_error!("Error broadcasting to self: {:?}", err);
199 }
200 }
201
202 if let Err(err) = self.network_sender.send_to_others(&msg, &exclude) {
226 diem_error!(error = ?err, "Error broadcasting message");
227 }
228 }
229
230 #[allow(unused)]
233 pub async fn send_vote(&self, vote_msg: VoteMsg, recipients: Vec<Author>) {
244 let mut network_sender = self.network_sender.clone();
245 let msg = ConsensusMsg::VoteMsg(Box::new(vote_msg));
246 for peer in recipients {
247 if self.author == peer {
248 if let Err(err) =
249 network_sender.send_self_msg(self.author, msg.clone()).await
250 {
251 diem_error!(error = ?err, "Error delivering a self vote");
252 }
253 continue;
254 }
255 if let Err(e) = network_sender.send_to(peer, &msg) {
256 diem_error!(
257 remote_peer = peer,
258 error = ?e, "Failed to send a vote to peer",
259 );
260 }
261 }
262 }
263
264 pub fn send_sync_info(&self, sync_info: SyncInfo, recipient: Author) {
269 let msg = ConsensusMsg::SyncInfo(Box::new(sync_info));
270 let mut network_sender = self.network_sender.clone();
271 if let Err(e) = network_sender.send_to(recipient, &msg) {
272 diem_warn!(
273 remote_peer = recipient,
274 error = "Failed to send a sync info msg to peer {:?}",
275 "{:?}",
276 e
277 );
278 }
279 }
280}
281
282pub struct NetworkTask {
284 pub consensus_messages_tx: diem_channel::Sender<
286 (AccountAddress, Discriminant<ConsensusMsg>),
287 (AccountAddress, ConsensusMsg),
288 >,
289 pub block_retrieval_tx:
291 diem_channel::Sender<AccountAddress, IncomingBlockRetrievalRequest>,
292}
293
294impl NetworkTask {
295 pub fn new() -> (NetworkTask, NetworkReceivers) {
298 let (consensus_messages_tx, consensus_messages) = diem_channel::new(
299 QueueStyle::LIFO,
300 1,
301 Some(&counters::CONSENSUS_CHANNEL_MSGS),
302 );
303 let (block_retrieval_tx, block_retrieval) = diem_channel::new(
304 QueueStyle::LIFO,
305 1,
306 Some(&counters::BLOCK_RETRIEVAL_CHANNEL_MSGS),
307 );
308 (
309 NetworkTask {
310 consensus_messages_tx,
311 block_retrieval_tx,
312 },
313 NetworkReceivers {
314 consensus_messages,
315 block_retrieval,
316 },
317 )
318 }
319
320 pub async fn start(self) {}
322}