cfxcore/pos/protocol/
network_sender.rs

1// Copyright 2019-2020 Conflux Foundation. All rights reserved.
2// TreeGraph is free software and distributed under Apache License 2.0.
3// See https://www.apache.org/licenses/LICENSE-2.0
4
5use std::{mem::discriminant, sync::Arc};
6
7use anyhow::format_err;
8use futures::channel::oneshot;
9
10use diem_types::account_address::AccountAddress;
11use network::{node_table::NodeId, NetworkService};
12
13use crate::{
14    message::Message,
15    pos::{
16        consensus::network::ConsensusMsg,
17        protocol::{
18            request_manager::Request,
19            sync_protocol::{HotStuffSynchronizationProtocol, RpcResponse},
20            HSB_PROTOCOL_ID,
21        },
22    },
23};
24
25/// The interface from Consensus to Networking layer.
26///
27/// This is a thin wrapper around a `NetworkSender<ConsensusMsg>`, so it is easy
28/// to clone and send off to a separate task. For example, the rpc requests
29/// return Futures that encapsulate the whole flow, from sending the request to
30/// remote, to finally receiving the response and deserializing. It therefore
31/// makes the most sense to make the rpc call on a separate async task, which
32/// requires the `NetworkSender` to be `Clone` and `Send`.
33#[derive(Clone)]
34pub struct NetworkSender {
35    /// network service
36    pub network: Arc<NetworkService>,
37    /// hotstuff protocol handler
38    pub protocol_handler: Arc<HotStuffSynchronizationProtocol>,
39}
40
41impl NetworkSender {
42    /// Send a single message to the destination peer using the
43    /// `CONSENSUS_DIRECT_SEND_PROTOCOL` ProtocolId.
44    pub fn send_to(
45        &mut self, recipient: AccountAddress, msg: &dyn Message,
46    ) -> Result<(), anyhow::Error> {
47        if let Some(peer_hash) = self
48            .protocol_handler
49            .pos_peer_mapping
50            .read()
51            .get(&recipient)
52        {
53            if let Some(peer) = self.protocol_handler.peers.get(peer_hash) {
54                let peer_id = peer.read().get_id();
55                self.send_message_with_peer_id(&peer_id, msg)?;
56            } else {
57                warn!("peer_hash {:?} does not exist", peer_hash);
58            }
59        } else {
60            warn!("recipient {:?} has been removed", recipient)
61        }
62        Ok(())
63    }
64
65    /// Send a single message to the destination peers using the
66    /// `CONSENSUS_DIRECT_SEND_PROTOCOL` ProtocolId.
67    pub fn send_to_many(
68        &mut self, recipients: impl Iterator<Item = AccountAddress>,
69        msg: &dyn Message,
70    ) -> Result<(), anyhow::Error> {
71        for recipient in recipients {
72            self.send_to(recipient, msg)?;
73        }
74        Ok(())
75    }
76
77    /// Send a msg to all connected PoS nodes. They may or may not be
78    /// validators.
79    pub fn send_to_others(
80        &mut self, msg: &dyn Message, exclude: &Vec<AccountAddress>,
81    ) -> Result<(), anyhow::Error> {
82        // The node itself is not included in pos_peer_mapping.
83        for (node_id, peer_hash) in
84            self.protocol_handler.pos_peer_mapping.read().iter()
85        {
86            if exclude.contains(node_id) {
87                continue;
88            }
89            if let Some(peer) = self.protocol_handler.peers.get(peer_hash) {
90                let peer_id = peer.read().get_id();
91                self.send_message_with_peer_id(&peer_id, msg)?;
92            } else {
93                warn!("peer_hash {:?} does not exist", peer_hash);
94            }
95        }
96        Ok(())
97    }
98
99    /// Send a RPC to the destination peer using the `CONSENSUS_RPC_PROTOCOL`
100    /// ProtocolId.
101    pub async fn send_rpc(
102        &self, recipient: Option<NodeId>, mut request: Box<dyn Request>,
103    ) -> Result<Box<dyn RpcResponse>, anyhow::Error> {
104        let (res_tx, res_rx) = oneshot::channel();
105        self.network
106            .with_context(
107                self.protocol_handler.clone(),
108                HSB_PROTOCOL_ID,
109                |io| {
110                    request.set_response_notification(res_tx);
111                    self.protocol_handler
112                        .request_manager
113                        .request_with_delay(io, request, recipient, None)
114                },
115            )
116            .map_err(|e| format_err!("send rpc failed: err={:?}", e))?;
117        Ok(res_rx
118            .await?
119            .map_err(|e| format_err!("rpc call failed: err={:?}", e))?)
120    }
121
122    /// Send msg to self
123    pub async fn send_self_msg(
124        &self, self_author: AccountAddress, msg: ConsensusMsg,
125    ) -> anyhow::Result<(), anyhow::Error> {
126        self.protocol_handler
127            .consensus_network_task
128            .consensus_messages_tx
129            .push((self_author, discriminant(&msg)), (self_author, msg))
130    }
131
132    /// Send msg to peer
133    pub fn send_message_with_peer_id(
134        &self, peer_id: &NodeId, msg: &dyn Message,
135    ) -> anyhow::Result<(), anyhow::Error> {
136        self.network
137            .with_context(
138                self.protocol_handler.clone(),
139                HSB_PROTOCOL_ID,
140                |io| msg.send(io, peer_id),
141            )
142            .map_err(|e| format_err!("context failed: {:#}", e))?
143            .map_err(|e| format_err!("send message failed: {:#}", e))?;
144        Ok(())
145    }
146}