1use crate::{
6 sync::{
7 message::{
8 handleable::{Context, Handleable},
9 DynamicCapability,
10 },
11 Error, SynchronizationPeerState,
12 },
13 NodeType,
14};
15use cfx_internal_common::ChainIdParamsDeprecated;
16use cfx_types::H256;
17use network::{NODE_TAG_ARCHIVE, NODE_TAG_FULL, NODE_TAG_NODE_TYPE};
18use rlp_derive::{RlpDecodable, RlpEncodable};
19use std::{collections::HashSet, time::Instant};
20use throttling::token_bucket::TokenBucketManager;
21
22#[derive(Debug, PartialEq, RlpDecodable, RlpEncodable)]
23pub struct StatusV2 {
24 pub chain_id: ChainIdParamsDeprecated,
25 pub genesis_hash: H256,
26 pub best_epoch: u64,
27 pub terminal_block_hashes: Vec<H256>,
28}
29
30impl Handleable for StatusV2 {
31 fn handle(self, ctx: &Context) -> Result<(), Error> {
32 debug!("on_status, msg=:{:?}", self);
33
34 let chain_id = ctx
35 .manager
36 .graph
37 .consensus
38 .best_chain_id()
39 .in_native_space();
40 if chain_id != self.chain_id.chain_id {
41 debug!(
42 "Peer {:?} chain_id mismatches (ours: {:?}, theirs: {:?})",
43 ctx.node_id, chain_id, self.chain_id,
44 );
45 bail!(Error::InvalidStatus("chain_id mismatches".into()));
46 }
47 let genesis_hash = ctx.manager.graph.data_man.true_genesis.hash();
48 if ctx.manager.protocol_config.check_status_genesis {
49 if genesis_hash != self.genesis_hash {
50 debug!(
51 "Peer {:?} genesis hash mismatches (ours: {:?}, theirs: {:?})",
52 ctx.node_id, genesis_hash, self.genesis_hash
53 );
54 bail!(Error::InvalidStatus("genesis hash mismatches".into()));
55 }
56 }
57
58 let latest: HashSet<H256> =
59 self.terminal_block_hashes.iter().cloned().collect();
60
61 if let Ok(peer_info) = ctx.manager.syn.get_peer_info(&ctx.node_id) {
62 let latest_updated = {
63 let mut peer_info = peer_info.write();
64 peer_info.update(
65 Some(NodeType::Unknown),
66 latest,
67 self.best_epoch,
68 )
69 };
70
71 if latest_updated {
72 ctx.manager.start_sync(ctx.io);
73 }
74 } else {
75 let peer_protocol_version =
76 match ctx.manager.syn.on_status_in_handshaking(&ctx.node_id) {
77 None => {
78 warn!(
79 "Unexpected Status message from peer={}",
80 ctx.node_id
81 );
82 return Err(Error::UnknownPeer.into());
83 }
84 Some(protocol_version) => protocol_version,
85 };
86
87 let throttling =
88 match ctx.manager.protocol_config.throttling_config_file {
89 Some(ref file) => {
90 TokenBucketManager::load(file, Some("sync_protocol"))
91 .expect("invalid throttling configuration file")
92 }
93 None => TokenBucketManager::default(),
94 };
95
96 let mut peer_state = SynchronizationPeerState {
97 node_id: ctx.node_id(),
98 node_type: NodeType::Unknown,
99 is_validator: false,
100 protocol_version: peer_protocol_version,
101 genesis_hash,
102 best_epoch: self.best_epoch,
103 latest_block_hashes: latest,
104 received_transaction_count: 0,
105 heartbeat: Instant::now(),
106 capabilities: Default::default(),
107 notified_capabilities: Default::default(),
108 throttling,
109 throttled_msgs: Default::default(),
110 };
111
112 peer_state
113 .capabilities
114 .insert(DynamicCapability::NormalPhase(true));
115
116 debug!(
117 "New peer (pv={:?}, gh={:?})",
118 peer_protocol_version, self.genesis_hash
119 );
120
121 debug!("Peer {:?} connected", ctx.node_id);
122 ctx.manager
123 .syn
124 .peer_connected(ctx.node_id.clone(), peer_state);
125 ctx.manager.request_manager.on_peer_connected(&ctx.node_id);
126
127 ctx.manager.start_sync(ctx.io);
128 }
129
130 Ok(())
131 }
132}
133
134#[derive(Debug, PartialEq, RlpDecodable, RlpEncodable)]
135pub struct StatusV3 {
136 pub chain_id: ChainIdParamsDeprecated,
137 pub genesis_hash: H256,
138 pub best_epoch: u64,
139 pub node_type: NodeType,
140 pub terminal_block_hashes: Vec<H256>,
141}
142
143impl Handleable for StatusV3 {
144 fn handle(self, ctx: &Context) -> Result<(), Error> {
145 debug!("on_status, msg=:{:?}", self);
146
147 let chain_id = ctx
148 .manager
149 .graph
150 .consensus
151 .config()
152 .chain_id
153 .read()
154 .to_native_space_params();
155 if !chain_id.matches(&self.chain_id.clone().into(), self.best_epoch) {
156 debug!(
157 "Peer {:?} chain_id mismatches (ours: {:?}, theirs: {:?})",
158 ctx.node_id, chain_id, self.chain_id,
159 );
160 bail!(Error::InvalidStatus("chain_id mismatches".into()));
161 }
162 drop(chain_id);
163
164 let genesis_hash = ctx.manager.graph.data_man.true_genesis.hash();
165 if ctx.manager.protocol_config.check_status_genesis {
166 if genesis_hash != self.genesis_hash {
167 debug!(
168 "Peer {:?} genesis hash mismatches (ours: {:?}, theirs: {:?})",
169 ctx.node_id, genesis_hash, self.genesis_hash
170 );
171 bail!(Error::InvalidStatus("genesis hash mismatches".into()));
172 }
173 }
174
175 let latest: HashSet<H256> =
176 self.terminal_block_hashes.iter().cloned().collect();
177
178 match self.node_type {
179 NodeType::Archive => {
180 let key: String = NODE_TAG_NODE_TYPE.into();
181 let value: String = NODE_TAG_ARCHIVE.into();
182 ctx.insert_peer_node_tag(ctx.node_id(), &key, &value);
183 }
184 NodeType::Full => {
185 let key: String = NODE_TAG_NODE_TYPE.into();
186 let value: String = NODE_TAG_FULL.into();
187 ctx.insert_peer_node_tag(ctx.node_id(), &key, &value);
188 }
189 _ => {}
190 };
191
192 if let Ok(peer_info) = ctx.manager.syn.get_peer_info(&ctx.node_id) {
193 let latest_updated = {
194 let mut peer_info = peer_info.write();
195 peer_info.update(Some(self.node_type), latest, self.best_epoch)
196 };
197
198 if latest_updated {
199 ctx.manager.start_sync(ctx.io);
200 }
201 } else {
202 let peer_protocol_version =
203 match ctx.manager.syn.on_status_in_handshaking(&ctx.node_id) {
204 None => {
205 warn!(
206 "Unexpected Status message from peer={}",
207 ctx.node_id
208 );
209 return Err(Error::UnknownPeer.into());
210 }
211 Some(protocol_version) => protocol_version,
212 };
213
214 let throttling =
215 match ctx.manager.protocol_config.throttling_config_file {
216 Some(ref file) => {
217 TokenBucketManager::load(file, Some("sync_protocol"))
218 .expect("invalid throttling configuration file")
219 }
220 None => TokenBucketManager::default(),
221 };
222
223 let mut peer_state = SynchronizationPeerState {
224 node_id: ctx.node_id(),
225 node_type: self.node_type,
226 is_validator: false,
227 protocol_version: peer_protocol_version,
228 genesis_hash,
229 best_epoch: self.best_epoch,
230 latest_block_hashes: latest,
231 received_transaction_count: 0,
232 heartbeat: Instant::now(),
233 capabilities: Default::default(),
234 notified_capabilities: Default::default(),
235 throttling,
236 throttled_msgs: Default::default(),
237 };
238
239 peer_state
240 .capabilities
241 .insert(DynamicCapability::NormalPhase(true));
242
243 debug!(
244 "New peer (pv={:?}, gh={:?})",
245 peer_protocol_version, self.genesis_hash
246 );
247
248 debug!("Peer {:?} connected", ctx.node_id);
249 ctx.manager
250 .syn
251 .peer_connected(ctx.node_id.clone(), peer_state);
252 ctx.manager.request_manager.on_peer_connected(&ctx.node_id);
253
254 ctx.manager.start_sync(ctx.io);
255 }
256
257 Ok(())
258 }
259}