cfxcore/pos/protocol/
network_sender.rs1use std::{mem::discriminant, sync::Arc};
6
7use anyhow::format_err;
8use futures::channel::oneshot;
9
10use diem_types::account_address::AccountAddress;
11use network::{node_table::NodeId, NetworkService};
12
13use crate::{
14 message::Message,
15 pos::{
16 consensus::network::ConsensusMsg,
17 protocol::{
18 request_manager::Request,
19 sync_protocol::{HotStuffSynchronizationProtocol, RpcResponse},
20 HSB_PROTOCOL_ID,
21 },
22 },
23};
24
25#[derive(Clone)]
34pub struct NetworkSender {
35 pub network: Arc<NetworkService>,
37 pub protocol_handler: Arc<HotStuffSynchronizationProtocol>,
39}
40
41impl NetworkSender {
42 pub fn send_to(
45 &mut self, recipient: AccountAddress, msg: &dyn Message,
46 ) -> Result<(), anyhow::Error> {
47 if let Some(peer_hash) = self
48 .protocol_handler
49 .pos_peer_mapping
50 .read()
51 .get(&recipient)
52 {
53 if let Some(peer) = self.protocol_handler.peers.get(peer_hash) {
54 let peer_id = peer.read().get_id();
55 self.send_message_with_peer_id(&peer_id, msg)?;
56 } else {
57 warn!("peer_hash {:?} does not exist", peer_hash);
58 }
59 } else {
60 warn!("recipient {:?} has been removed", recipient)
61 }
62 Ok(())
63 }
64
65 pub fn send_to_many(
68 &mut self, recipients: impl Iterator<Item = AccountAddress>,
69 msg: &dyn Message,
70 ) -> Result<(), anyhow::Error> {
71 for recipient in recipients {
72 self.send_to(recipient, msg)?;
73 }
74 Ok(())
75 }
76
77 pub fn send_to_others(
80 &mut self, msg: &dyn Message, exclude: &Vec<AccountAddress>,
81 ) -> Result<(), anyhow::Error> {
82 for (node_id, peer_hash) in
84 self.protocol_handler.pos_peer_mapping.read().iter()
85 {
86 if exclude.contains(node_id) {
87 continue;
88 }
89 if let Some(peer) = self.protocol_handler.peers.get(peer_hash) {
90 let peer_id = peer.read().get_id();
91 self.send_message_with_peer_id(&peer_id, msg)?;
92 } else {
93 warn!("peer_hash {:?} does not exist", peer_hash);
94 }
95 }
96 Ok(())
97 }
98
99 pub async fn send_rpc(
102 &self, recipient: Option<NodeId>, mut request: Box<dyn Request>,
103 ) -> Result<Box<dyn RpcResponse>, anyhow::Error> {
104 let (res_tx, res_rx) = oneshot::channel();
105 self.network
106 .with_context(
107 self.protocol_handler.clone(),
108 HSB_PROTOCOL_ID,
109 |io| {
110 request.set_response_notification(res_tx);
111 self.protocol_handler
112 .request_manager
113 .request_with_delay(io, request, recipient, None)
114 },
115 )
116 .map_err(|e| format_err!("send rpc failed: err={:?}", e))?;
117 Ok(res_rx
118 .await?
119 .map_err(|e| format_err!("rpc call failed: err={:?}", e))?)
120 }
121
122 pub async fn send_self_msg(
124 &self, self_author: AccountAddress, msg: ConsensusMsg,
125 ) -> anyhow::Result<(), anyhow::Error> {
126 self.protocol_handler
127 .consensus_network_task
128 .consensus_messages_tx
129 .push((self_author, discriminant(&msg)), (self_author, msg))
130 }
131
132 pub fn send_message_with_peer_id(
134 &self, peer_id: &NodeId, msg: &dyn Message,
135 ) -> anyhow::Result<(), anyhow::Error> {
136 self.network
137 .with_context(
138 self.protocol_handler.clone(),
139 HSB_PROTOCOL_ID,
140 |io| msg.send(io, peer_id),
141 )
142 .map_err(|e| format_err!("context failed: {:#}", e))?
143 .map_err(|e| format_err!("send message failed: {:#}", e))?;
144 Ok(())
145 }
146}