1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/

use std::{mem::Discriminant, time::Duration};

use anyhow::{anyhow, bail, ensure, format_err};
use serde::{Deserialize, Serialize};

use channel::{self, diem_channel, message_queues::QueueStyle};
use consensus_types::{
    block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse},
    common::Author,
    epoch_retrieval::EpochRetrievalRequest,
    proposal_msg::ProposalMsg,
    sync_info::SyncInfo,
    vote_msg::VoteMsg,
};
use diem_logger::prelude::*;
use diem_metrics::monitor;
use diem_types::{
    account_address::AccountAddress, epoch_change::EpochChangeProof,
    validator_verifier::ValidatorVerifier,
};
use network::node_table::NodeId;

use crate::{
    message::RequestId,
    pos::protocol::{
        message::{
            block_retrieval::BlockRetrievalRpcRequest,
            block_retrieval_response::BlockRetrievalRpcResponse,
        },
        network_sender::NetworkSender,
    },
};

use super::counters;

/// Network type for consensus
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum ConsensusMsg {
    /// RPC to get a chain of block of the given length starting from the given
    /// block id.
    BlockRetrievalRequest(Box<BlockRetrievalRequest>),
    /// Carries the returned blocks and the retrieval status.
    BlockRetrievalResponse(Box<BlockRetrievalResponse>),
    /// Request to get a EpochChangeProof from current_epoch to target_epoch
    EpochRetrievalRequest(Box<EpochRetrievalRequest>),
    /// ProposalMsg contains the required information for the proposer election
    /// protocol to make its choice (typically depends on round and
    /// proposer info).
    ProposalMsg(Box<ProposalMsg>),
    /// This struct describes basic synchronization metadata.
    SyncInfo(Box<SyncInfo>),
    /// A vector of LedgerInfo with contiguous increasing epoch numbers to
    /// prove a sequence of epoch changes from the first LedgerInfo's
    /// epoch.
    EpochChangeProof(Box<EpochChangeProof>),
    /// VoteMsg is the struct that is ultimately sent by the voter in response
    /// for receiving a proposal.
    VoteMsg(Box<VoteMsg>),
}

/// The block retrieval request is used internally for implementing RPC: the
/// callback is executed for carrying the response
#[derive(Debug)]
pub struct IncomingBlockRetrievalRequest {
    pub req: BlockRetrievalRequest,
    pub peer_id: NodeId,
    pub request_id: RequestId,
}

/// Just a convenience struct to keep all the network proxy receiving queues in
/// one place. Will be returned by the NetworkTask upon startup.
pub struct NetworkReceivers {
    /// Provide a LIFO buffer for each (Author, MessageType) key
    pub consensus_messages: diem_channel::Receiver<
        (AccountAddress, Discriminant<ConsensusMsg>),
        (AccountAddress, ConsensusMsg),
    >,
    pub block_retrieval:
        diem_channel::Receiver<AccountAddress, IncomingBlockRetrievalRequest>,
}

/// Implements the actual networking support for all consensus messaging.
#[derive(Clone)]
pub struct ConsensusNetworkSender {
    pub author: Author,
    network_sender: NetworkSender,
    validators: ValidatorVerifier,
}

impl ConsensusNetworkSender {
    pub fn new(
        author: Author, network_sender: NetworkSender,
        validators: ValidatorVerifier,
    ) -> Self {
        ConsensusNetworkSender {
            author,
            network_sender,
            validators,
        }
    }

    pub fn network_sender(&self) -> &NetworkSender { &self.network_sender }

    /// Tries to retrieve num of blocks backwards starting from id from the
    /// given peer: the function returns a future that is fulfilled with
    /// BlockRetrievalResponse.
    pub async fn request_block(
        &mut self, retrieval_request: BlockRetrievalRequest, from: Author,
        timeout: Duration,
    ) -> anyhow::Result<BlockRetrievalResponse> {
        ensure!(from != self.author, "Retrieve block from self");

        let peer_hash = self
            .network_sender
            .protocol_handler
            .pos_peer_mapping
            .read()
            .get(&from)
            .cloned()
            .ok_or(anyhow!(
                "request_block: recipient {:?} has been removed",
                from
            ))?;
        let peer_state =
            self.network_sender.protocol_handler.peers.get(&peer_hash);
        if peer_state.is_none() {
            bail!("peer not found");
        }
        let peer_state = peer_state.unwrap();
        let peer_id = peer_state.read().get_id().clone();

        let request = BlockRetrievalRpcRequest {
            request_id: 0,
            request: retrieval_request.clone(),
            is_empty: false,
            response_tx: None,
            timeout,
        };

        let rpc_response = monitor!(
            "block_retrieval",
            self.network_sender
                .send_rpc(Some(peer_id), Box::new(request))
                .await
                .map_err(|_| { format_err!("rpc call failed") })?
        );
        let response = match rpc_response
            .as_any()
            .downcast_ref::<BlockRetrievalRpcResponse>()
        {
            Some(r) => r.clone(),
            None => {
                bail!("response downcast failed");
            }
        };

        response
            .response
            .verify(
                retrieval_request.block_id(),
                retrieval_request.num_blocks(),
                &self.validators,
            )
            .map_err(|e| {
                diem_error!(
                    request_block_response = response,
                    error = ?e,
                );
                e
            })?;

        Ok(response.response)
    }

    /// Tries to send the given msg to all the participants.
    ///
    /// The future is fulfilled as soon as the message put into the mpsc channel
    /// to network internal(to provide back pressure), it does not indicate
    /// the message is delivered or sent out. It does not give indication
    /// about when the message is delivered to the recipients, as well as
    /// there is no indication about the network failures.
    pub async fn broadcast(
        &mut self, msg: ConsensusMsg, exclude: Vec<AccountAddress>,
    ) {
        if !exclude.contains(&self.author) {
            if let Err(err) = self
                .network_sender
                .send_self_msg(self.author, msg.clone())
                .await
            {
                diem_error!("Error broadcasting to self: {:?}", err);
            }
        }

        /*
        // Get the list of validators excluding our own account address. Note
        // the ordering is not important in this case.
        let self_author = self.author;
        let other_validators = self
            .validators
            .get_ordered_account_addresses_iter()
            .filter(|author| author != &self_author);
        let mut public_keys = vec![];
        for account in other_validators {
            public_keys.push(self.validators.get_public_key(&account).unwrap());
        }

        // Broadcast message over direct-send to all other validators.
        if let Err(err) = self
            .network_sender
            .send_to_many(public_keys.into_iter(), &msg)
        {
            diem_error!(error = ?err, "Error broadcasting message");
        }
         */
        // TODO(lpl): It may be sufficient to broadcast some messages to only
        // validators.
        if let Err(err) = self.network_sender.send_to_others(&msg, &exclude) {
            diem_error!(error = ?err, "Error broadcasting message");
        }
    }

    // This is unused because we always broadcast votes now.
    // It may be needed when non-voter nodes do not receive votes anymore.
    #[allow(unused)]
    /// Sends the vote to the chosen recipients (typically that would be the
    /// recipients that we believe could serve as proposers in the next
    /// round). The recipients on the receiving end are going to be notified
    /// about a new vote in the vote queue.
    ///
    /// The future is fulfilled as soon as the message put into the mpsc channel
    /// to network internal(to provide back pressure), it does not indicate
    /// the message is delivered or sent out. It does not give indication
    /// about when the message is delivered to the recipients, as well as
    /// there is no indication about the network failures.
    pub async fn send_vote(&self, vote_msg: VoteMsg, recipients: Vec<Author>) {
        let mut network_sender = self.network_sender.clone();
        let msg = ConsensusMsg::VoteMsg(Box::new(vote_msg));
        for peer in recipients {
            if self.author == peer {
                if let Err(err) =
                    network_sender.send_self_msg(self.author, msg.clone()).await
                {
                    diem_error!(error = ?err, "Error delivering a self vote");
                }
                continue;
            }
            if let Err(e) = network_sender.send_to(peer, &msg) {
                diem_error!(
                    remote_peer = peer,
                    error = ?e, "Failed to send a vote to peer",
                );
            }
        }
    }

    /// Sends the given sync info to the given author.
    /// The future is fulfilled as soon as the message is added to the internal
    /// network channel (does not indicate whether the message is delivered
    /// or sent out).
    pub fn send_sync_info(&self, sync_info: SyncInfo, recipient: Author) {
        let msg = ConsensusMsg::SyncInfo(Box::new(sync_info));
        let mut network_sender = self.network_sender.clone();
        if let Err(e) = network_sender.send_to(recipient, &msg) {
            diem_warn!(
                remote_peer = recipient,
                error = "Failed to send a sync info msg to peer {:?}",
                "{:?}",
                e
            );
        }
    }
}

/// Consensus network task
pub struct NetworkTask {
    /// consensus message sender
    pub consensus_messages_tx: diem_channel::Sender<
        (AccountAddress, Discriminant<ConsensusMsg>),
        (AccountAddress, ConsensusMsg),
    >,
    /// block retrieval message sender
    pub block_retrieval_tx:
        diem_channel::Sender<AccountAddress, IncomingBlockRetrievalRequest>,
}

impl NetworkTask {
    /// Establishes the initial connections with the peers and returns the
    /// receivers.
    pub fn new() -> (NetworkTask, NetworkReceivers) {
        let (consensus_messages_tx, consensus_messages) = diem_channel::new(
            QueueStyle::LIFO,
            1,
            Some(&counters::CONSENSUS_CHANNEL_MSGS),
        );
        let (block_retrieval_tx, block_retrieval) = diem_channel::new(
            QueueStyle::LIFO,
            1,
            Some(&counters::BLOCK_RETRIEVAL_CHANNEL_MSGS),
        );
        (
            NetworkTask {
                consensus_messages_tx,
                block_retrieval_tx,
            },
            NetworkReceivers {
                consensus_messages,
                block_retrieval,
            },
        )
    }

    /// start
    pub async fn start(self) {}
}