1use super::*;
6use crate::{
7 message::{
8 decode_rlp_and_check_deprecation, GetMaybeRequestId, Message,
9 MessageProtocolVersionBound, MsgId, RequestId, SetRequestId,
10 },
11 sync::{
12 message::throttling::Throttle, Error, SYNC_PROTO_V1, SYNC_PROTO_V2,
13 SYNC_PROTO_V3,
14 },
15};
16use network::{service::ProtocolVersion, NetworkProtocolHandler};
17pub use priority_send_queue::SendQueuePriority;
18use rlp::{Decodable, Encodable, Rlp};
19
20build_msgid! {
22 NEW_BLOCK_HASHES = 0x01
23 TRANSACTIONS = 0x02
24 GET_BLOCK_HASHES = 0x03
25 GET_BLOCK_HASHES_RESPONSE = 0x04
26 GET_BLOCK_HEADERS = 0x05
27 GET_BLOCK_HEADERS_RESPONSE = 0x06
28 GET_BLOCK_BODIES = 0x07
29 GET_BLOCK_BODIES_RESPONSE = 0x08
30 NEW_BLOCK = 0x09
31 GET_TERMINAL_BLOCK_HASHES_RESPONSE = 0x0a
32 GET_TERMINAL_BLOCK_HASHES = 0x0b
33 GET_BLOCKS = 0x0c
34 GET_BLOCKS_RESPONSE = 0x0d
35 GET_BLOCKS_WITH_PUBLIC_RESPONSE = 0x0e
36 GET_CMPCT_BLOCKS = 0x0f
37 GET_CMPCT_BLOCKS_RESPONSE = 0x10
38 GET_BLOCK_TXN = 0x11
39 GET_BLOCK_TXN_RESPONSE = 0x12
40 DYNAMIC_CAPABILITY_CHANGE = 0x13
41 TRANSACTION_DIGESTS = 0x14
42 GET_TRANSACTIONS = 0x15
43 GET_TRANSACTIONS_RESPONSE = 0x16
44 GET_BLOCK_HASHES_BY_EPOCH = 0x17
45 GET_BLOCK_HEADER_CHAIN = 0x18
46 GET_SNAPSHOT_MANIFEST = 0x19
47 GET_SNAPSHOT_MANIFEST_RESPONSE = 0x1a
48 GET_SNAPSHOT_CHUNK = 0x1b
49 GET_SNAPSHOT_CHUNK_RESPONSE = 0x1c
50 GET_TRANSACTIONS_FROM_TX_HASHES = 0x1d
51 GET_TRANSACTIONS_FROM_TX_HASHES_RESPONSE = 0x1e
52 STATE_SYNC_CANDIDATE_REQUEST = 0x20
53 STATE_SYNC_CANDIDATE_RESPONSE = 0x21
54 STATUS_V2 = 0x22
55 STATUS_V3 = 0x23
56 HEARTBEAT = 0x24
57
58 NET_INFLIGHT_BLOCKS = 0xf0
60 THROTTLED = 0xfe
61
62 INVALID = 0xff
63 }
68
69build_msg_impl! { StatusV2, msgid::STATUS_V2, "StatusV2", SYNC_PROTO_V2, SYNC_PROTO_V2 }
72build_msg_impl! { StatusV3, msgid::STATUS_V3, "StatusV3", SYNC_PROTO_V3, SYNC_PROTO_V3 }
73build_msg_impl! { Heartbeat, msgid::HEARTBEAT, "Heartbeat", SYNC_PROTO_V3, SYNC_PROTO_V3 }
74build_msg_impl! { NewBlockHashes, msgid::NEW_BLOCK_HASHES, "NewBlockHashes", SYNC_PROTO_V1, SYNC_PROTO_V3 }
75build_msg_with_request_id_impl! { GetBlockHeaders, msgid::GET_BLOCK_HEADERS, "GetBlockHeaders", SYNC_PROTO_V1, SYNC_PROTO_V3 }
76build_msg_impl! { GetBlockHeadersResponse, msgid::GET_BLOCK_HEADERS_RESPONSE, "GetBlockHeadersResponse", SYNC_PROTO_V1, SYNC_PROTO_V3 }
77build_msg_impl! { NewBlock, msgid::NEW_BLOCK, "NewBlock", SYNC_PROTO_V1, SYNC_PROTO_V3 }
78build_msg_impl! { GetTerminalBlockHashesResponse, msgid::GET_TERMINAL_BLOCK_HASHES_RESPONSE, "GetTerminalBlockHashesResponse", SYNC_PROTO_V1, SYNC_PROTO_V3 }
79build_msg_with_request_id_impl! { GetTerminalBlockHashes, msgid::GET_TERMINAL_BLOCK_HASHES, "GetTerminalBlockHashes", SYNC_PROTO_V1, SYNC_PROTO_V3 }
80build_msg_with_request_id_impl! { GetBlocks, msgid::GET_BLOCKS, "GetBlocks", SYNC_PROTO_V1, SYNC_PROTO_V3 }
81build_msg_with_request_id_impl! { GetCompactBlocks, msgid::GET_CMPCT_BLOCKS, "GetCompactBlocks", SYNC_PROTO_V1, SYNC_PROTO_V3 }
82build_msg_impl! { GetCompactBlocksResponse, msgid::GET_CMPCT_BLOCKS_RESPONSE, "GetCompactBlocksResponse", SYNC_PROTO_V1, SYNC_PROTO_V3 }
83build_msg_with_request_id_impl! { GetBlockTxn, msgid::GET_BLOCK_TXN, "GetBlockTxn", SYNC_PROTO_V1, SYNC_PROTO_V3 }
84build_msg_impl! { DynamicCapabilityChange, msgid::DYNAMIC_CAPABILITY_CHANGE, "DynamicCapabilityChange", SYNC_PROTO_V1, SYNC_PROTO_V3 }
85build_msg_with_request_id_impl! { GetBlockHashesByEpoch, msgid::GET_BLOCK_HASHES_BY_EPOCH, "GetBlockHashesByEpoch", SYNC_PROTO_V1, SYNC_PROTO_V3 }
86build_msg_impl! { Throttled, msgid::THROTTLED, "Throttled", SYNC_PROTO_V1, SYNC_PROTO_V3 }
87
88impl GetMaybeRequestId for GetBlockHashesResponse {}
89mark_msg_version_bound!(GetBlockHashesResponse, SYNC_PROTO_V1, SYNC_PROTO_V3);
90impl Message for GetBlockHashesResponse {
91 fn msg_id(&self) -> MsgId { msgid::GET_BLOCK_HASHES_RESPONSE }
92
93 fn msg_name(&self) -> &'static str { "GetBlockHashesResponse" }
94
95 fn priority(&self) -> SendQueuePriority { SendQueuePriority::Low }
96
97 fn encode(&self) -> Vec<u8> {
98 let mut encoded = self.rlp_bytes();
99 self.push_msg_id_leb128_encoding(&mut encoded);
100 encoded
101 }
102}
103
104impl GetMaybeRequestId for Transactions {}
106mark_msg_version_bound!(Transactions, SYNC_PROTO_V1, SYNC_PROTO_V3);
107impl Message for Transactions {
108 fn is_size_sensitive(&self) -> bool { self.transactions.len() > 1 }
109
110 fn msg_id(&self) -> MsgId { msgid::TRANSACTIONS }
111
112 fn msg_name(&self) -> &'static str { "Transactions" }
113
114 fn encode(&self) -> Vec<u8> {
115 let mut encoded = self.rlp_bytes();
116 self.push_msg_id_leb128_encoding(&mut encoded);
117 encoded
118 }
119}
120
121impl GetMaybeRequestId for GetBlocksResponse {}
122mark_msg_version_bound!(GetBlocksResponse, SYNC_PROTO_V1, SYNC_PROTO_V3);
123impl Message for GetBlocksResponse {
124 fn is_size_sensitive(&self) -> bool { self.blocks.len() > 0 }
125
126 fn msg_id(&self) -> MsgId { msgid::GET_BLOCKS_RESPONSE }
127
128 fn msg_name(&self) -> &'static str { "GetBlocksResponse" }
129
130 fn encode(&self) -> Vec<u8> {
131 let mut encoded = self.rlp_bytes();
132 self.push_msg_id_leb128_encoding(&mut encoded);
133 encoded
134 }
135}
136
137impl GetMaybeRequestId for GetBlocksWithPublicResponse {}
138mark_msg_version_bound!(
139 GetBlocksWithPublicResponse,
140 SYNC_PROTO_V1,
141 SYNC_PROTO_V3
142);
143impl Message for GetBlocksWithPublicResponse {
144 fn is_size_sensitive(&self) -> bool { self.blocks.len() > 0 }
145
146 fn msg_id(&self) -> MsgId { msgid::GET_BLOCKS_WITH_PUBLIC_RESPONSE }
147
148 fn msg_name(&self) -> &'static str { "GetBlocksWithPublicResponse" }
149
150 fn encode(&self) -> Vec<u8> {
151 let mut encoded = self.rlp_bytes();
152 self.push_msg_id_leb128_encoding(&mut encoded);
153 encoded
154 }
155}
156
157impl GetMaybeRequestId for GetBlockTxnResponse {}
158mark_msg_version_bound!(GetBlockTxnResponse, SYNC_PROTO_V1, SYNC_PROTO_V3);
159impl Message for GetBlockTxnResponse {
160 fn is_size_sensitive(&self) -> bool { self.block_txn.len() > 1 }
161
162 fn msg_id(&self) -> MsgId { msgid::GET_BLOCK_TXN_RESPONSE }
163
164 fn msg_name(&self) -> &'static str { "GetBlockTxnResponse" }
165
166 fn encode(&self) -> Vec<u8> {
167 let mut encoded = self.rlp_bytes();
168 self.push_msg_id_leb128_encoding(&mut encoded);
169 encoded
170 }
171}
172
173impl GetMaybeRequestId for TransactionDigests {}
174mark_msg_version_bound!(TransactionDigests, SYNC_PROTO_V1, SYNC_PROTO_V3);
175impl Message for TransactionDigests {
176 fn is_size_sensitive(&self) -> bool { self.len() > 1 }
177
178 fn msg_id(&self) -> MsgId { msgid::TRANSACTION_DIGESTS }
179
180 fn msg_name(&self) -> &'static str { "TransactionDigests" }
181
182 fn priority(&self) -> SendQueuePriority { SendQueuePriority::Normal }
183
184 fn encode(&self) -> Vec<u8> {
185 let mut encoded = self.rlp_bytes();
186 self.push_msg_id_leb128_encoding(&mut encoded);
187 encoded
188 }
189}
190
191impl GetMaybeRequestId for GetTransactionsResponse {}
192mark_msg_version_bound!(GetTransactionsResponse, SYNC_PROTO_V1, SYNC_PROTO_V3);
193impl Message for GetTransactionsResponse {
194 fn is_size_sensitive(&self) -> bool { self.transactions.len() > 0 }
195
196 fn msg_id(&self) -> MsgId { msgid::GET_TRANSACTIONS_RESPONSE }
197
198 fn msg_name(&self) -> &'static str { "GetTransactionsResponse" }
199
200 fn priority(&self) -> SendQueuePriority { SendQueuePriority::Normal }
201
202 fn encode(&self) -> Vec<u8> {
203 let mut encoded = self.rlp_bytes();
204 self.push_msg_id_leb128_encoding(&mut encoded);
205 encoded
206 }
207}
208impl GetMaybeRequestId for GetTransactionsFromTxHashesResponse {}
209mark_msg_version_bound!(
210 GetTransactionsFromTxHashesResponse,
211 SYNC_PROTO_V1,
212 SYNC_PROTO_V3
213);
214impl Message for GetTransactionsFromTxHashesResponse {
215 fn is_size_sensitive(&self) -> bool { self.transactions.len() > 0 }
216
217 fn msg_id(&self) -> MsgId {
218 msgid::GET_TRANSACTIONS_FROM_TX_HASHES_RESPONSE
219 }
220
221 fn msg_name(&self) -> &'static str { "GetTransactionsFromTxHashesResponse" }
222
223 fn priority(&self) -> SendQueuePriority { SendQueuePriority::Normal }
224
225 fn encode(&self) -> Vec<u8> {
226 let mut encoded = self.rlp_bytes();
227 self.push_msg_id_leb128_encoding(&mut encoded);
228 encoded
229 }
230}
231pub fn handle_rlp_message(
236 id: MsgId, ctx: &Context, rlp: &Rlp,
237) -> Result<bool, Error> {
238 match id {
239 msgid::STATUS_V2 => handle_message::<StatusV2>(ctx, rlp)?,
240 msgid::STATUS_V3 => handle_message::<StatusV3>(ctx, rlp)?,
241 msgid::HEARTBEAT => handle_message::<Heartbeat>(ctx, rlp)?,
242 msgid::NEW_BLOCK => handle_message::<NewBlock>(ctx, rlp)?,
243 msgid::NEW_BLOCK_HASHES => {
244 handle_message::<NewBlockHashes>(ctx, rlp)?;
245 }
246 msgid::GET_BLOCK_HEADERS => {
247 handle_message::<GetBlockHeaders>(ctx, rlp)?;
248 }
249 msgid::GET_BLOCK_HEADERS_RESPONSE => {
250 handle_message::<GetBlockHeadersResponse>(ctx, rlp)?;
251 }
252 msgid::GET_BLOCKS => handle_message::<GetBlocks>(ctx, rlp)?,
253 msgid::GET_BLOCKS_RESPONSE => {
254 handle_message::<GetBlocksResponse>(ctx, rlp)?;
255 }
256 msgid::GET_BLOCKS_WITH_PUBLIC_RESPONSE => {
257 handle_message::<GetBlocksWithPublicResponse>(ctx, rlp)?;
258 }
259 msgid::GET_TERMINAL_BLOCK_HASHES => {
260 handle_message::<GetTerminalBlockHashes>(ctx, rlp)?;
261 }
262 msgid::GET_TERMINAL_BLOCK_HASHES_RESPONSE => {
263 handle_message::<GetTerminalBlockHashesResponse>(ctx, rlp)?;
264 }
265 msgid::GET_CMPCT_BLOCKS => {
266 handle_message::<GetCompactBlocks>(ctx, rlp)?;
267 }
268 msgid::GET_CMPCT_BLOCKS_RESPONSE => {
269 handle_message::<GetCompactBlocksResponse>(ctx, rlp)?;
270 }
271 msgid::GET_BLOCK_TXN => {
272 handle_message::<GetBlockTxn>(ctx, rlp)?;
273 }
274 msgid::GET_BLOCK_TXN_RESPONSE => {
275 handle_message::<GetBlockTxnResponse>(ctx, rlp)?;
276 }
277 msgid::TRANSACTIONS => {
278 handle_message::<Transactions>(ctx, rlp)?;
279 }
280 msgid::DYNAMIC_CAPABILITY_CHANGE => {
281 handle_message::<DynamicCapabilityChange>(ctx, rlp)?;
282 }
283 msgid::TRANSACTION_DIGESTS => {
284 handle_message::<TransactionDigests>(ctx, rlp)?;
285 }
286 msgid::GET_TRANSACTIONS => {
287 handle_message::<GetTransactions>(ctx, rlp)?;
288 }
289 msgid::GET_TRANSACTIONS_FROM_TX_HASHES => {
290 handle_message::<GetTransactionsFromTxHashes>(ctx, rlp)?;
291 }
292 msgid::GET_TRANSACTIONS_RESPONSE => {
293 handle_message::<GetTransactionsResponse>(ctx, rlp)?;
294 }
295 msgid::GET_TRANSACTIONS_FROM_TX_HASHES_RESPONSE => {
296 handle_message::<GetTransactionsFromTxHashesResponse>(ctx, rlp)?;
297 }
298 msgid::GET_BLOCK_HASHES_BY_EPOCH => {
299 handle_message::<GetBlockHashesByEpoch>(ctx, rlp)?;
300 }
301 msgid::GET_BLOCK_HASHES_RESPONSE => {
302 handle_message::<GetBlockHashesResponse>(ctx, rlp)?;
303 }
304 msgid::GET_SNAPSHOT_MANIFEST => {
305 handle_message::<SnapshotManifestRequest>(ctx, rlp)?;
306 }
307 msgid::GET_SNAPSHOT_MANIFEST_RESPONSE => {
308 handle_message::<SnapshotManifestResponse>(ctx, rlp)?;
309 }
310 msgid::GET_SNAPSHOT_CHUNK => {
311 handle_message::<SnapshotChunkRequest>(ctx, rlp)?;
312 }
313 msgid::GET_SNAPSHOT_CHUNK_RESPONSE => {
314 handle_message::<SnapshotChunkResponse>(ctx, rlp)?;
315 }
316 msgid::STATE_SYNC_CANDIDATE_REQUEST => {
317 handle_message::<StateSyncCandidateRequest>(ctx, rlp)?;
318 }
319 msgid::STATE_SYNC_CANDIDATE_RESPONSE => {
320 handle_message::<StateSyncCandidateResponse>(ctx, rlp)?;
321 }
322 msgid::THROTTLED => {
323 handle_message::<Throttled>(ctx, rlp)?;
324 }
325 _ => return Ok(false),
326 }
327
328 Ok(true)
329}
330
331fn handle_message<T: Decodable + Handleable + Message>(
332 ctx: &Context, rlp: &Rlp,
333) -> Result<(), Error> {
334 let msg: T = decode_rlp_and_check_deprecation(
335 rlp,
336 ctx.manager.minimum_supported_version(),
337 ctx.io.get_protocol(),
338 )?;
339
340 let msg_id = msg.msg_id();
341 let msg_name = msg.msg_name();
342 let req_id = msg.get_request_id();
343
344 trace!(
345 "handle sync protocol message, peer = {}, id = {}, name = {}, request_id = {:?}",
346 ctx.node_id, msg_id, msg_name, req_id,
347 );
348
349 msg.throttle(ctx)?;
350
351 if let Err(e) = msg.handle(ctx) {
352 debug!(
353 "failed to handle sync protocol message, peer = {}, id = {}, name = {}, request_id = {:?}, error_kind = {:?}",
354 ctx.node_id, msg_id, msg_name, req_id, e,
355 );
356
357 return Err(e);
358 }
359
360 Ok(())
361}