cfxcore/sync/message/
status.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    sync::{
7        message::{
8            handleable::{Context, Handleable},
9            DynamicCapability,
10        },
11        Error, SynchronizationPeerState,
12    },
13    NodeType,
14};
15use cfx_internal_common::ChainIdParamsDeprecated;
16use cfx_types::H256;
17use network::{NODE_TAG_ARCHIVE, NODE_TAG_FULL, NODE_TAG_NODE_TYPE};
18use rlp_derive::{RlpDecodable, RlpEncodable};
19use std::{collections::HashSet, time::Instant};
20use throttling::token_bucket::TokenBucketManager;
21
22#[derive(Debug, PartialEq, RlpDecodable, RlpEncodable)]
23pub struct StatusV2 {
24    pub chain_id: ChainIdParamsDeprecated,
25    pub genesis_hash: H256,
26    pub best_epoch: u64,
27    pub terminal_block_hashes: Vec<H256>,
28}
29
30impl Handleable for StatusV2 {
31    fn handle(self, ctx: &Context) -> Result<(), Error> {
32        debug!("on_status, msg=:{:?}", self);
33
34        let chain_id = ctx
35            .manager
36            .graph
37            .consensus
38            .best_chain_id()
39            .in_native_space();
40        if chain_id != self.chain_id.chain_id {
41            debug!(
42                "Peer {:?} chain_id mismatches (ours: {:?}, theirs: {:?})",
43                ctx.node_id, chain_id, self.chain_id,
44            );
45            bail!(Error::InvalidStatus("chain_id mismatches".into()));
46        }
47        let genesis_hash = ctx.manager.graph.data_man.true_genesis.hash();
48        if ctx.manager.protocol_config.check_status_genesis {
49            if genesis_hash != self.genesis_hash {
50                debug!(
51                "Peer {:?} genesis hash mismatches (ours: {:?}, theirs: {:?})",
52                ctx.node_id, genesis_hash, self.genesis_hash
53            );
54                bail!(Error::InvalidStatus("genesis hash mismatches".into()));
55            }
56        }
57
58        let latest: HashSet<H256> =
59            self.terminal_block_hashes.iter().cloned().collect();
60
61        if let Ok(peer_info) = ctx.manager.syn.get_peer_info(&ctx.node_id) {
62            let latest_updated = {
63                let mut peer_info = peer_info.write();
64                peer_info.update(
65                    Some(NodeType::Unknown),
66                    latest,
67                    self.best_epoch,
68                )
69            };
70
71            if latest_updated {
72                ctx.manager.start_sync(ctx.io);
73            }
74        } else {
75            let peer_protocol_version =
76                match ctx.manager.syn.on_status_in_handshaking(&ctx.node_id) {
77                    None => {
78                        warn!(
79                            "Unexpected Status message from peer={}",
80                            ctx.node_id
81                        );
82                        return Err(Error::UnknownPeer.into());
83                    }
84                    Some(protocol_version) => protocol_version,
85                };
86
87            let throttling =
88                match ctx.manager.protocol_config.throttling_config_file {
89                    Some(ref file) => {
90                        TokenBucketManager::load(file, Some("sync_protocol"))
91                            .expect("invalid throttling configuration file")
92                    }
93                    None => TokenBucketManager::default(),
94                };
95
96            let mut peer_state = SynchronizationPeerState {
97                node_id: ctx.node_id(),
98                node_type: NodeType::Unknown,
99                is_validator: false,
100                protocol_version: peer_protocol_version,
101                genesis_hash,
102                best_epoch: self.best_epoch,
103                latest_block_hashes: latest,
104                received_transaction_count: 0,
105                heartbeat: Instant::now(),
106                capabilities: Default::default(),
107                notified_capabilities: Default::default(),
108                throttling,
109                throttled_msgs: Default::default(),
110            };
111
112            peer_state
113                .capabilities
114                .insert(DynamicCapability::NormalPhase(true));
115
116            debug!(
117                "New peer (pv={:?}, gh={:?})",
118                peer_protocol_version, self.genesis_hash
119            );
120
121            debug!("Peer {:?} connected", ctx.node_id);
122            ctx.manager
123                .syn
124                .peer_connected(ctx.node_id.clone(), peer_state);
125            ctx.manager.request_manager.on_peer_connected(&ctx.node_id);
126
127            ctx.manager.start_sync(ctx.io);
128        }
129
130        Ok(())
131    }
132}
133
134#[derive(Debug, PartialEq, RlpDecodable, RlpEncodable)]
135pub struct StatusV3 {
136    pub chain_id: ChainIdParamsDeprecated,
137    pub genesis_hash: H256,
138    pub best_epoch: u64,
139    pub node_type: NodeType,
140    pub terminal_block_hashes: Vec<H256>,
141}
142
143impl Handleable for StatusV3 {
144    fn handle(self, ctx: &Context) -> Result<(), Error> {
145        debug!("on_status, msg=:{:?}", self);
146
147        let chain_id = ctx
148            .manager
149            .graph
150            .consensus
151            .config()
152            .chain_id
153            .read()
154            .to_native_space_params();
155        if !chain_id.matches(&self.chain_id.clone().into(), self.best_epoch) {
156            debug!(
157                "Peer {:?} chain_id mismatches (ours: {:?}, theirs: {:?})",
158                ctx.node_id, chain_id, self.chain_id,
159            );
160            bail!(Error::InvalidStatus("chain_id mismatches".into()));
161        }
162        drop(chain_id);
163
164        let genesis_hash = ctx.manager.graph.data_man.true_genesis.hash();
165        if ctx.manager.protocol_config.check_status_genesis {
166            if genesis_hash != self.genesis_hash {
167                debug!(
168                "Peer {:?} genesis hash mismatches (ours: {:?}, theirs: {:?})",
169                ctx.node_id, genesis_hash, self.genesis_hash
170            );
171                bail!(Error::InvalidStatus("genesis hash mismatches".into()));
172            }
173        }
174
175        let latest: HashSet<H256> =
176            self.terminal_block_hashes.iter().cloned().collect();
177
178        match self.node_type {
179            NodeType::Archive => {
180                let key: String = NODE_TAG_NODE_TYPE.into();
181                let value: String = NODE_TAG_ARCHIVE.into();
182                ctx.insert_peer_node_tag(ctx.node_id(), &key, &value);
183            }
184            NodeType::Full => {
185                let key: String = NODE_TAG_NODE_TYPE.into();
186                let value: String = NODE_TAG_FULL.into();
187                ctx.insert_peer_node_tag(ctx.node_id(), &key, &value);
188            }
189            _ => {}
190        };
191
192        if let Ok(peer_info) = ctx.manager.syn.get_peer_info(&ctx.node_id) {
193            let latest_updated = {
194                let mut peer_info = peer_info.write();
195                peer_info.update(Some(self.node_type), latest, self.best_epoch)
196            };
197
198            if latest_updated {
199                ctx.manager.start_sync(ctx.io);
200            }
201        } else {
202            let peer_protocol_version =
203                match ctx.manager.syn.on_status_in_handshaking(&ctx.node_id) {
204                    None => {
205                        warn!(
206                            "Unexpected Status message from peer={}",
207                            ctx.node_id
208                        );
209                        return Err(Error::UnknownPeer.into());
210                    }
211                    Some(protocol_version) => protocol_version,
212                };
213
214            let throttling =
215                match ctx.manager.protocol_config.throttling_config_file {
216                    Some(ref file) => {
217                        TokenBucketManager::load(file, Some("sync_protocol"))
218                            .expect("invalid throttling configuration file")
219                    }
220                    None => TokenBucketManager::default(),
221                };
222
223            let mut peer_state = SynchronizationPeerState {
224                node_id: ctx.node_id(),
225                node_type: self.node_type,
226                is_validator: false,
227                protocol_version: peer_protocol_version,
228                genesis_hash,
229                best_epoch: self.best_epoch,
230                latest_block_hashes: latest,
231                received_transaction_count: 0,
232                heartbeat: Instant::now(),
233                capabilities: Default::default(),
234                notified_capabilities: Default::default(),
235                throttling,
236                throttled_msgs: Default::default(),
237            };
238
239            peer_state
240                .capabilities
241                .insert(DynamicCapability::NormalPhase(true));
242
243            debug!(
244                "New peer (pv={:?}, gh={:?})",
245                peer_protocol_version, self.genesis_hash
246            );
247
248            debug!("Peer {:?} connected", ctx.node_id);
249            ctx.manager
250                .syn
251                .peer_connected(ctx.node_id.clone(), peer_state);
252            ctx.manager.request_manager.on_peer_connected(&ctx.node_id);
253
254            ctx.manager.start_sync(ctx.io);
255        }
256
257        Ok(())
258    }
259}