cfxcore/sync/
synchronization_state.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use cfx_types::H256;
6//use slab::Slab;
7use crate::{
8    message::MsgId,
9    sync::{
10        message::{DynamicCapability, DynamicCapabilitySet},
11        random, Error,
12    },
13    NodeType,
14};
15use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
16use network::{
17    node_table::NodeId, service::ProtocolVersion, Error as NetworkError,
18};
19use parking_lot::RwLock;
20use rand::prelude::{IndexedRandom, SliceRandom};
21use std::{
22    collections::{HashMap, HashSet},
23    sync::Arc,
24    time::{Duration, Instant},
25};
26use throttling::token_bucket::{ThrottledManager, TokenBucketManager};
27
/// Synchronization-related state tracked for a single peer.
///
/// Instances are stored in `SynchronizationPeers`, each behind its own
/// `RwLock`.
#[derive(DeriveMallocSizeOf)]
pub struct SynchronizationPeerState {
    pub node_id: NodeId,
    pub node_type: NodeType,
    // This field is only used for consortium setup.
    // Whether this node is a validator.
    pub is_validator: bool,
    pub protocol_version: ProtocolVersion,
    pub genesis_hash: H256,
    // Best epoch recorded for the peer. `update` overwrites it whenever the
    // new value differs, even if the new value is smaller.
    pub best_epoch: u64,
    // Latest block hashes recorded for the peer; replaced as a whole by
    // `update` when the peer state changed.
    pub latest_block_hashes: HashSet<H256>,

    /// The following fields are used to control how to handle
    /// transaction propagation for nodes in catch-up mode.
    pub received_transaction_count: usize,

    // heartbeat is used to disconnect inactive nodes periodically,
    // and updated when new message received.
    pub heartbeat: Instant,

    // latest received capabilities from the remote peer.
    pub capabilities: DynamicCapabilitySet,
    // latest notified capabilities of mine to the remote peer.
    pub notified_capabilities: DynamicCapabilitySet,

    // Used to throttle the P2P messages from remote peer, so as to avoid DoS
    // attack. E.g. send large number of P2P messages to query blocks.
    pub throttling: TokenBucketManager,
    // Used to track the throttled P2P messages to remote peer. When throttled,
    // should not send requests to the remote peer. Otherwise, the remote peer
    // may disconnect the TCP connection.
    pub throttled_msgs: ThrottledManager<MsgId>,
}
61
62impl SynchronizationPeerState {
63    pub fn update(
64        &mut self, node_type: Option<NodeType>,
65        latest_block_hashes: HashSet<H256>, best_epoch: u64,
66    ) -> bool {
67        if let Some(node_type) = node_type {
68            self.node_type = node_type;
69        }
70        self.heartbeat = Instant::now();
71
72        let updated = best_epoch != self.best_epoch
73            || latest_block_hashes != self.latest_block_hashes;
74
75        // NOTE: we need to update best_epoch even if it's smaller than
76        // the previous value, otherwise sync will get stuck in tests
77        // with large chain reorg (decreasing best epoch value)
78        if updated {
79            self.best_epoch = best_epoch;
80            self.latest_block_hashes = latest_block_hashes;
81        }
82
83        updated
84    }
85}
86
/// Map from node id to that peer's state. Each state is shared through an
/// `Arc` and guarded by its own `RwLock`.
pub type SynchronizationPeers =
    HashMap<NodeId, Arc<RwLock<SynchronizationPeerState>>>;
89
/// Synchronization state: configuration flags plus the sets of connected
/// and handshaking peers.
#[derive(DeriveMallocSizeOf)]
pub struct SynchronizationState {
    // Consortium flag; exposed through `is_consortium()`.
    is_consortium: bool,
    // Node type this state was created with; `is_full_node()` compares it
    // against `NodeType::Full`.
    node_type: NodeType,
    // Configuration flag returned by `allow_phase_change_without_peer()`.
    allow_phase_change_without_peer: bool,
    // Minimum number of peers advertising `NormalPhase(true)` required by
    // `median_epoch_from_normal_peers()` before it returns a median.
    min_phase_change_normal_peer_count: usize,
    // Peers registered through `peer_connected()`.
    pub peers: RwLock<SynchronizationPeers>,
    // Peers that are still handshaking, with their protocol version and the
    // instant that `get_heartbeat_timeout_peers()` checks against the timeout.
    pub handshaking_peers: RwLock<HashMap<NodeId, (ProtocolVersion, Instant)>>,
    // NOTE(review): presumably the hashes of the most recently sent
    // transactions; no method of this type touches it here -- verify against
    // callers.
    pub last_sent_transaction_hashes: RwLock<HashSet<H256>>,
}
100
101impl SynchronizationState {
102    pub fn new(
103        is_consortium: bool, node_type: NodeType,
104        allow_phase_change_without_peer: bool,
105        min_phase_change_normal_peer_count: usize,
106    ) -> Self {
107        SynchronizationState {
108            is_consortium,
109            node_type,
110            allow_phase_change_without_peer,
111            min_phase_change_normal_peer_count,
112            peers: Default::default(),
113            handshaking_peers: Default::default(),
114            last_sent_transaction_hashes: Default::default(),
115        }
116    }
117
118    pub fn is_consortium(&self) -> bool { self.is_consortium }
119
120    pub fn on_status_in_handshaking(
121        &self, node_id: &NodeId,
122    ) -> Option<ProtocolVersion> {
123        let peers = self.peers.read();
124        let mut handshaking_peers = self.handshaking_peers.write();
125        if !peers.contains_key(node_id) {
126            handshaking_peers.remove(node_id).map(|(v, _)| v)
127        } else {
128            None
129        }
130    }
131
132    pub fn peer_connected(
133        &self, node_id: NodeId, state: SynchronizationPeerState,
134    ) {
135        let mut peers = self.peers.write();
136        if self.is_consortium() {
137            unimplemented!();
138        } else {
139            peers.insert(node_id, Arc::new(RwLock::new(state)));
140        }
141    }
142
143    pub fn contains_peer(&self, node_id: &NodeId) -> bool {
144        self.peers.read().contains_key(node_id)
145    }
146
147    pub fn get_peer_info(
148        &self, node_id: &NodeId,
149    ) -> Result<Arc<RwLock<SynchronizationPeerState>>, Error> {
150        Ok(self
151            .peers
152            .read()
153            .get(node_id)
154            .ok_or(Error::UnknownPeer)?
155            .clone())
156    }
157
158    pub fn get_peer_version(
159        &self, peer: &NodeId,
160    ) -> Result<ProtocolVersion, NetworkError> {
161        match self.get_peer_info(peer) {
162            Err(_) => bail!(NetworkError::InvalidNodeId),
163            Ok(info) => Ok(info.read().protocol_version),
164        }
165    }
166
167    /// Updates the heartbeat for the specified peer. It takes no effect if the
168    /// peer is in handshaking status or not found.
169    pub fn update_heartbeat(&self, node_id: &NodeId) {
170        if let Some(state) = self.peers.read().get(node_id) {
171            state.write().heartbeat = Instant::now();
172        }
173    }
174
175    /// Retrieves the heartbeat timeout peers, including handshaking timeout
176    /// peers and inactive peers after handshake.
177    pub fn get_heartbeat_timeout_peers(
178        &self, timeout: Duration,
179    ) -> Vec<NodeId> {
180        let mut timeout_peers = Vec::new();
181
182        for (peer, (_, handshake_time)) in self.handshaking_peers.read().iter()
183        {
184            if handshake_time.elapsed() > timeout {
185                timeout_peers.push(*peer);
186            }
187        }
188
189        for (peer, state) in self.peers.read().iter() {
190            if state.read().heartbeat.elapsed() > timeout {
191                timeout_peers.push(*peer);
192            }
193        }
194
195        timeout_peers
196    }
197
198    pub fn is_full_node(&self) -> bool { self.node_type == NodeType::Full }
199
200    pub fn allow_phase_change_without_peer(&self) -> bool {
201        self.allow_phase_change_without_peer
202    }
203
204    // FIXME: median_chain_height_from_peers.
205    // FIXME: it lead to more questions but these are questions on the
206    // FIXME: algorithm side.
207    pub fn median_epoch_from_normal_peers(&self) -> Option<u64> {
208        // This flag is set to true if all peers are just starting from a clean
209        // state, so we can just enter the normal phase because there's
210        // nothing to catch up.
211        let mut fresh_start = true;
212        let mut peer_best_epoches = Vec::new();
213        {
214            let peers = self.peers.read();
215            if peers.is_empty() {
216                // We do not have peers, so just assume that this is not a fresh
217                // start and do not enter NormalPhase.
218                debug!("median_epoch_from_normal_peers: no connected peers");
219                fresh_start = false;
220            }
221            for (_, state_lock) in &*peers {
222                let state = state_lock.read();
223                if state
224                    .capabilities
225                    .contains(DynamicCapability::NormalPhase(true))
226                {
227                    fresh_start = false;
228                    peer_best_epoches.push(state.best_epoch);
229                } else if state.best_epoch != 0 {
230                    // Note `best_epoch` is initialized according to Status,
231                    // so if it's 0, the peer is just newly started
232                    fresh_start = false;
233                    debug!("median_epoch_from_normal_peers: not fresh start");
234                }
235            }
236        };
237
238        // `peer_best_epoches.is_empty()` is only possible if
239        // `self.min_phase_change_normal_peer_count == 0`
240        if peer_best_epoches.len() < self.min_phase_change_normal_peer_count
241            || peer_best_epoches.is_empty()
242        {
243            return if fresh_start {
244                debug!("median_epoch_from_normal_peers: fresh start");
245                Some(0)
246            } else {
247                debug!(
248                    "median_epoch_from_normal_peers: no enough peers in normal phase, have {}, require {}",
249                    peer_best_epoches.len(), self.min_phase_change_normal_peer_count
250                );
251                None
252            };
253        }
254
255        peer_best_epoches.sort();
256        Some(peer_best_epoches[peer_best_epoches.len() / 2])
257    }
258
259    pub fn best_peer_epoch(&self) -> Option<u64> {
260        self.peers
261            .read()
262            .iter()
263            .map(|(_, state)| state.read().best_epoch)
264            .fold(None, |max, x| match max {
265                None => Some(x),
266                Some(max) => Some(if x > max { x } else { max }),
267            })
268    }
269}
270
/// Filter peers that match *all* the provided conditions.
///
/// A condition left as `None` is not checked.
#[derive(Default)]
pub struct PeerFilter<'a> {
    // Reject a peer if any of these message ids is reported as throttled by
    // the peer's `throttled_msgs`.
    throttle_msg_ids: Option<HashSet<MsgId>>,
    // Reject peers whose node type differs from this one.
    preferred_node_type: Option<NodeType>,
    // Reject peers whose id is in this set.
    excludes: Option<HashSet<NodeId>>,
    // Reject peers whose id is NOT in this set.
    choose_from: Option<&'a HashSet<NodeId>>,
    // Reject peers whose capabilities do not contain this capability.
    cap: Option<DynamicCapability>,
    // Reject peers whose `best_epoch` is below this value.
    min_best_epoch: Option<u64>,
}
281
282impl<'a> PeerFilter<'a> {
283    pub fn new(msg_id: MsgId) -> Self { PeerFilter::default().throttle(msg_id) }
284
285    pub fn with_preferred_node_type(mut self, node_type: NodeType) -> Self {
286        self.preferred_node_type = Some(node_type);
287        self
288    }
289
290    pub fn throttle(mut self, msg_id: MsgId) -> Self {
291        self.throttle_msg_ids
292            .get_or_insert_with(|| HashSet::new())
293            .insert(msg_id);
294        self
295    }
296
297    pub fn exclude(mut self, node_id: NodeId) -> Self {
298        self.excludes
299            .get_or_insert_with(|| HashSet::new())
300            .insert(node_id);
301        self
302    }
303
304    /// Exclude the peers not in the `peer_set`
305    pub fn choose_from(mut self, peer_set: &'a HashSet<NodeId>) -> Self {
306        self.choose_from = Some(peer_set);
307        self
308    }
309
310    pub fn with_cap(mut self, cap: DynamicCapability) -> Self {
311        self.cap.replace(cap);
312        self
313    }
314
315    pub fn with_min_best_epoch(mut self, min_best_epoch: u64) -> Self {
316        self.min_best_epoch.replace(min_best_epoch);
317        self
318    }
319
320    pub fn select_all(self, syn: &SynchronizationState) -> Vec<NodeId> {
321        let mut peers = Vec::new();
322
323        let check_state = self.throttle_msg_ids.is_some()
324            || self.cap.is_some()
325            || self.min_best_epoch.is_some();
326
327        for (id, peer) in syn.peers.read().iter() {
328            let peer_node_type = peer.read().node_type.clone();
329            if let Some(ref preferred_node_type) = self.preferred_node_type {
330                if *preferred_node_type != peer_node_type {
331                    continue;
332                }
333            }
334
335            if let Some(ref excludes) = self.excludes {
336                if excludes.contains(id) {
337                    continue;
338                }
339            }
340
341            if let Some(ref choose_from) = self.choose_from {
342                if !choose_from.contains(id) {
343                    continue;
344                }
345            }
346
347            if check_state {
348                let mut peer = peer.write();
349
350                if syn.is_consortium() {
351                    if !peer.is_validator {
352                        continue;
353                    }
354                }
355
356                if let Some(ref ids) = self.throttle_msg_ids {
357                    if ids
358                        .iter()
359                        .any(|id| peer.throttled_msgs.check_throttled(id))
360                    {
361                        continue;
362                    }
363                }
364
365                if let Some(cap) = self.cap {
366                    if !peer.capabilities.contains(cap) {
367                        continue;
368                    }
369                }
370
371                if let Some(min) = self.min_best_epoch {
372                    if peer.best_epoch < min {
373                        continue;
374                    }
375                }
376            }
377
378            peers.push(*id);
379        }
380
381        peers
382    }
383
384    pub fn select(self, syn: &SynchronizationState) -> Option<NodeId> {
385        self.select_all(syn).choose(&mut random::new()).cloned()
386    }
387
388    pub fn select_n(self, n: usize, syn: &SynchronizationState) -> Vec<NodeId> {
389        let mut peers = self.select_all(syn);
390        peers.shuffle(&mut random::new());
391        peers.truncate(n);
392        peers
393    }
394}