1use cfx_types::H256;
6use crate::{
8 message::MsgId,
9 sync::{
10 message::{DynamicCapability, DynamicCapabilitySet},
11 random, Error,
12 },
13 NodeType,
14};
15use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
16use network::{
17 node_table::NodeId, service::ProtocolVersion, Error as NetworkError,
18};
19use parking_lot::RwLock;
20use rand::prelude::{IndexedRandom, SliceRandom};
21use std::{
22 collections::{HashMap, HashSet},
23 sync::Arc,
24 time::{Duration, Instant},
25};
26use throttling::token_bucket::{ThrottledManager, TokenBucketManager};
27
28#[derive(DeriveMallocSizeOf)]
29pub struct SynchronizationPeerState {
30 pub node_id: NodeId,
31 pub node_type: NodeType,
32 pub is_validator: bool,
35 pub protocol_version: ProtocolVersion,
36 pub genesis_hash: H256,
37 pub best_epoch: u64,
38 pub latest_block_hashes: HashSet<H256>,
39
40 pub received_transaction_count: usize,
43
44 pub heartbeat: Instant,
47
48 pub capabilities: DynamicCapabilitySet,
50 pub notified_capabilities: DynamicCapabilitySet,
52
53 pub throttling: TokenBucketManager,
56 pub throttled_msgs: ThrottledManager<MsgId>,
60}
61
62impl SynchronizationPeerState {
63 pub fn update(
64 &mut self, node_type: Option<NodeType>,
65 latest_block_hashes: HashSet<H256>, best_epoch: u64,
66 ) -> bool {
67 if let Some(node_type) = node_type {
68 self.node_type = node_type;
69 }
70 self.heartbeat = Instant::now();
71
72 let updated = best_epoch != self.best_epoch
73 || latest_block_hashes != self.latest_block_hashes;
74
75 if updated {
79 self.best_epoch = best_epoch;
80 self.latest_block_hashes = latest_block_hashes;
81 }
82
83 updated
84 }
85}
86
87pub type SynchronizationPeers =
88 HashMap<NodeId, Arc<RwLock<SynchronizationPeerState>>>;
89
90#[derive(DeriveMallocSizeOf)]
91pub struct SynchronizationState {
92 is_consortium: bool,
93 node_type: NodeType,
94 allow_phase_change_without_peer: bool,
95 min_phase_change_normal_peer_count: usize,
96 pub peers: RwLock<SynchronizationPeers>,
97 pub handshaking_peers: RwLock<HashMap<NodeId, (ProtocolVersion, Instant)>>,
98 pub last_sent_transaction_hashes: RwLock<HashSet<H256>>,
99}
100
101impl SynchronizationState {
102 pub fn new(
103 is_consortium: bool, node_type: NodeType,
104 allow_phase_change_without_peer: bool,
105 min_phase_change_normal_peer_count: usize,
106 ) -> Self {
107 SynchronizationState {
108 is_consortium,
109 node_type,
110 allow_phase_change_without_peer,
111 min_phase_change_normal_peer_count,
112 peers: Default::default(),
113 handshaking_peers: Default::default(),
114 last_sent_transaction_hashes: Default::default(),
115 }
116 }
117
118 pub fn is_consortium(&self) -> bool { self.is_consortium }
119
120 pub fn on_status_in_handshaking(
121 &self, node_id: &NodeId,
122 ) -> Option<ProtocolVersion> {
123 let peers = self.peers.read();
124 let mut handshaking_peers = self.handshaking_peers.write();
125 if !peers.contains_key(node_id) {
126 handshaking_peers.remove(node_id).map(|(v, _)| v)
127 } else {
128 None
129 }
130 }
131
132 pub fn peer_connected(
133 &self, node_id: NodeId, state: SynchronizationPeerState,
134 ) {
135 let mut peers = self.peers.write();
136 if self.is_consortium() {
137 unimplemented!();
138 } else {
139 peers.insert(node_id, Arc::new(RwLock::new(state)));
140 }
141 }
142
143 pub fn contains_peer(&self, node_id: &NodeId) -> bool {
144 self.peers.read().contains_key(node_id)
145 }
146
147 pub fn get_peer_info(
148 &self, node_id: &NodeId,
149 ) -> Result<Arc<RwLock<SynchronizationPeerState>>, Error> {
150 Ok(self
151 .peers
152 .read()
153 .get(node_id)
154 .ok_or(Error::UnknownPeer)?
155 .clone())
156 }
157
158 pub fn get_peer_version(
159 &self, peer: &NodeId,
160 ) -> Result<ProtocolVersion, NetworkError> {
161 match self.get_peer_info(peer) {
162 Err(_) => bail!(NetworkError::InvalidNodeId),
163 Ok(info) => Ok(info.read().protocol_version),
164 }
165 }
166
167 pub fn update_heartbeat(&self, node_id: &NodeId) {
170 if let Some(state) = self.peers.read().get(node_id) {
171 state.write().heartbeat = Instant::now();
172 }
173 }
174
175 pub fn get_heartbeat_timeout_peers(
178 &self, timeout: Duration,
179 ) -> Vec<NodeId> {
180 let mut timeout_peers = Vec::new();
181
182 for (peer, (_, handshake_time)) in self.handshaking_peers.read().iter()
183 {
184 if handshake_time.elapsed() > timeout {
185 timeout_peers.push(*peer);
186 }
187 }
188
189 for (peer, state) in self.peers.read().iter() {
190 if state.read().heartbeat.elapsed() > timeout {
191 timeout_peers.push(*peer);
192 }
193 }
194
195 timeout_peers
196 }
197
198 pub fn is_full_node(&self) -> bool { self.node_type == NodeType::Full }
199
200 pub fn allow_phase_change_without_peer(&self) -> bool {
201 self.allow_phase_change_without_peer
202 }
203
204 pub fn median_epoch_from_normal_peers(&self) -> Option<u64> {
208 let mut fresh_start = true;
212 let mut peer_best_epoches = Vec::new();
213 {
214 let peers = self.peers.read();
215 if peers.is_empty() {
216 debug!("median_epoch_from_normal_peers: no connected peers");
219 fresh_start = false;
220 }
221 for (_, state_lock) in &*peers {
222 let state = state_lock.read();
223 if state
224 .capabilities
225 .contains(DynamicCapability::NormalPhase(true))
226 {
227 fresh_start = false;
228 peer_best_epoches.push(state.best_epoch);
229 } else if state.best_epoch != 0 {
230 fresh_start = false;
233 debug!("median_epoch_from_normal_peers: not fresh start");
234 }
235 }
236 };
237
238 if peer_best_epoches.len() < self.min_phase_change_normal_peer_count
241 || peer_best_epoches.is_empty()
242 {
243 return if fresh_start {
244 debug!("median_epoch_from_normal_peers: fresh start");
245 Some(0)
246 } else {
247 debug!(
248 "median_epoch_from_normal_peers: no enough peers in normal phase, have {}, require {}",
249 peer_best_epoches.len(), self.min_phase_change_normal_peer_count
250 );
251 None
252 };
253 }
254
255 peer_best_epoches.sort();
256 Some(peer_best_epoches[peer_best_epoches.len() / 2])
257 }
258
259 pub fn best_peer_epoch(&self) -> Option<u64> {
260 self.peers
261 .read()
262 .iter()
263 .map(|(_, state)| state.read().best_epoch)
264 .fold(None, |max, x| match max {
265 None => Some(x),
266 Some(max) => Some(if x > max { x } else { max }),
267 })
268 }
269}
270
271#[derive(Default)]
272pub struct PeerFilter<'a> {
274 throttle_msg_ids: Option<HashSet<MsgId>>,
275 preferred_node_type: Option<NodeType>,
276 excludes: Option<HashSet<NodeId>>,
277 choose_from: Option<&'a HashSet<NodeId>>,
278 cap: Option<DynamicCapability>,
279 min_best_epoch: Option<u64>,
280}
281
282impl<'a> PeerFilter<'a> {
283 pub fn new(msg_id: MsgId) -> Self { PeerFilter::default().throttle(msg_id) }
284
285 pub fn with_preferred_node_type(mut self, node_type: NodeType) -> Self {
286 self.preferred_node_type = Some(node_type);
287 self
288 }
289
290 pub fn throttle(mut self, msg_id: MsgId) -> Self {
291 self.throttle_msg_ids
292 .get_or_insert_with(|| HashSet::new())
293 .insert(msg_id);
294 self
295 }
296
297 pub fn exclude(mut self, node_id: NodeId) -> Self {
298 self.excludes
299 .get_or_insert_with(|| HashSet::new())
300 .insert(node_id);
301 self
302 }
303
304 pub fn choose_from(mut self, peer_set: &'a HashSet<NodeId>) -> Self {
306 self.choose_from = Some(peer_set);
307 self
308 }
309
310 pub fn with_cap(mut self, cap: DynamicCapability) -> Self {
311 self.cap.replace(cap);
312 self
313 }
314
315 pub fn with_min_best_epoch(mut self, min_best_epoch: u64) -> Self {
316 self.min_best_epoch.replace(min_best_epoch);
317 self
318 }
319
320 pub fn select_all(self, syn: &SynchronizationState) -> Vec<NodeId> {
321 let mut peers = Vec::new();
322
323 let check_state = self.throttle_msg_ids.is_some()
324 || self.cap.is_some()
325 || self.min_best_epoch.is_some();
326
327 for (id, peer) in syn.peers.read().iter() {
328 let peer_node_type = peer.read().node_type.clone();
329 if let Some(ref preferred_node_type) = self.preferred_node_type {
330 if *preferred_node_type != peer_node_type {
331 continue;
332 }
333 }
334
335 if let Some(ref excludes) = self.excludes {
336 if excludes.contains(id) {
337 continue;
338 }
339 }
340
341 if let Some(ref choose_from) = self.choose_from {
342 if !choose_from.contains(id) {
343 continue;
344 }
345 }
346
347 if check_state {
348 let mut peer = peer.write();
349
350 if syn.is_consortium() {
351 if !peer.is_validator {
352 continue;
353 }
354 }
355
356 if let Some(ref ids) = self.throttle_msg_ids {
357 if ids
358 .iter()
359 .any(|id| peer.throttled_msgs.check_throttled(id))
360 {
361 continue;
362 }
363 }
364
365 if let Some(cap) = self.cap {
366 if !peer.capabilities.contains(cap) {
367 continue;
368 }
369 }
370
371 if let Some(min) = self.min_best_epoch {
372 if peer.best_epoch < min {
373 continue;
374 }
375 }
376 }
377
378 peers.push(*id);
379 }
380
381 peers
382 }
383
384 pub fn select(self, syn: &SynchronizationState) -> Option<NodeId> {
385 self.select_all(syn).choose(&mut random::new()).cloned()
386 }
387
388 pub fn select_n(self, n: usize, syn: &SynchronizationState) -> Vec<NodeId> {
389 let mut peers = self.select_all(syn);
390 peers.shuffle(&mut random::new());
391 peers.truncate(n);
392 peers
393 }
394}