cfxcore/sync/message/
message.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use super::*;
6use crate::{
7    message::{
8        decode_rlp_and_check_deprecation, GetMaybeRequestId, Message,
9        MessageProtocolVersionBound, MsgId, RequestId, SetRequestId,
10    },
11    sync::{
12        message::throttling::Throttle, Error, SYNC_PROTO_V1, SYNC_PROTO_V2,
13        SYNC_PROTO_V3,
14    },
15};
16use network::{service::ProtocolVersion, NetworkProtocolHandler};
17pub use priority_send_queue::SendQueuePriority;
18use rlp::{Decodable, Encodable, Rlp};
19
20// generate `pub mod msgid`
21build_msgid! {
22    NEW_BLOCK_HASHES = 0x01
23    TRANSACTIONS = 0x02
24    GET_BLOCK_HASHES = 0x03
25    GET_BLOCK_HASHES_RESPONSE = 0x04
26    GET_BLOCK_HEADERS = 0x05
27    GET_BLOCK_HEADERS_RESPONSE = 0x06
28    GET_BLOCK_BODIES = 0x07
29    GET_BLOCK_BODIES_RESPONSE = 0x08
30    NEW_BLOCK = 0x09
31    GET_TERMINAL_BLOCK_HASHES_RESPONSE = 0x0a
32    GET_TERMINAL_BLOCK_HASHES = 0x0b
33    GET_BLOCKS = 0x0c
34    GET_BLOCKS_RESPONSE = 0x0d
35    GET_BLOCKS_WITH_PUBLIC_RESPONSE = 0x0e
36    GET_CMPCT_BLOCKS = 0x0f
37    GET_CMPCT_BLOCKS_RESPONSE = 0x10
38    GET_BLOCK_TXN = 0x11
39    GET_BLOCK_TXN_RESPONSE = 0x12
40    DYNAMIC_CAPABILITY_CHANGE = 0x13
41    TRANSACTION_DIGESTS = 0x14
42    GET_TRANSACTIONS = 0x15
43    GET_TRANSACTIONS_RESPONSE = 0x16
44    GET_BLOCK_HASHES_BY_EPOCH = 0x17
45    GET_BLOCK_HEADER_CHAIN = 0x18
46    GET_SNAPSHOT_MANIFEST = 0x19
47    GET_SNAPSHOT_MANIFEST_RESPONSE = 0x1a
48    GET_SNAPSHOT_CHUNK = 0x1b
49    GET_SNAPSHOT_CHUNK_RESPONSE = 0x1c
50    GET_TRANSACTIONS_FROM_TX_HASHES = 0x1d
51    GET_TRANSACTIONS_FROM_TX_HASHES_RESPONSE = 0x1e
52    STATE_SYNC_CANDIDATE_REQUEST = 0x20
53    STATE_SYNC_CANDIDATE_RESPONSE = 0x21
54    STATUS_V2 = 0x22
55    STATUS_V3 = 0x23
56    HEARTBEAT = 0x24
57
58    // This is only used in inflight_keys.
59    NET_INFLIGHT_BLOCKS = 0xf0
60    THROTTLED = 0xfe
61
62    INVALID = 0xff
63    // This above not the last one because msg_id is u16,
64    // U16_NEXT = 0x100
65
66    // TODO (linxi): add pos final decision message
67}
68
69// generate `impl Message for _` for each message type
70// high priority message types
71build_msg_impl! { StatusV2, msgid::STATUS_V2, "StatusV2", SYNC_PROTO_V2, SYNC_PROTO_V2 }
72build_msg_impl! { StatusV3, msgid::STATUS_V3, "StatusV3", SYNC_PROTO_V3, SYNC_PROTO_V3 }
73build_msg_impl! { Heartbeat, msgid::HEARTBEAT, "Heartbeat", SYNC_PROTO_V3, SYNC_PROTO_V3 }
74build_msg_impl! { NewBlockHashes, msgid::NEW_BLOCK_HASHES, "NewBlockHashes", SYNC_PROTO_V1, SYNC_PROTO_V3 }
75build_msg_with_request_id_impl! { GetBlockHeaders, msgid::GET_BLOCK_HEADERS, "GetBlockHeaders", SYNC_PROTO_V1, SYNC_PROTO_V3 }
76build_msg_impl! { GetBlockHeadersResponse, msgid::GET_BLOCK_HEADERS_RESPONSE, "GetBlockHeadersResponse", SYNC_PROTO_V1, SYNC_PROTO_V3 }
77build_msg_impl! { NewBlock, msgid::NEW_BLOCK, "NewBlock", SYNC_PROTO_V1, SYNC_PROTO_V3 }
78build_msg_impl! { GetTerminalBlockHashesResponse, msgid::GET_TERMINAL_BLOCK_HASHES_RESPONSE, "GetTerminalBlockHashesResponse", SYNC_PROTO_V1, SYNC_PROTO_V3 }
79build_msg_with_request_id_impl! { GetTerminalBlockHashes, msgid::GET_TERMINAL_BLOCK_HASHES, "GetTerminalBlockHashes", SYNC_PROTO_V1, SYNC_PROTO_V3 }
80build_msg_with_request_id_impl! { GetBlocks, msgid::GET_BLOCKS, "GetBlocks", SYNC_PROTO_V1, SYNC_PROTO_V3 }
81build_msg_with_request_id_impl! { GetCompactBlocks, msgid::GET_CMPCT_BLOCKS, "GetCompactBlocks", SYNC_PROTO_V1, SYNC_PROTO_V3 }
82build_msg_impl! { GetCompactBlocksResponse, msgid::GET_CMPCT_BLOCKS_RESPONSE, "GetCompactBlocksResponse", SYNC_PROTO_V1, SYNC_PROTO_V3 }
83build_msg_with_request_id_impl! { GetBlockTxn, msgid::GET_BLOCK_TXN, "GetBlockTxn", SYNC_PROTO_V1, SYNC_PROTO_V3 }
84build_msg_impl! { DynamicCapabilityChange, msgid::DYNAMIC_CAPABILITY_CHANGE, "DynamicCapabilityChange", SYNC_PROTO_V1, SYNC_PROTO_V3 }
85build_msg_with_request_id_impl! { GetBlockHashesByEpoch, msgid::GET_BLOCK_HASHES_BY_EPOCH, "GetBlockHashesByEpoch", SYNC_PROTO_V1, SYNC_PROTO_V3 }
86build_msg_impl! { Throttled, msgid::THROTTLED, "Throttled", SYNC_PROTO_V1, SYNC_PROTO_V3 }
87
88impl GetMaybeRequestId for GetBlockHashesResponse {}
89mark_msg_version_bound!(GetBlockHashesResponse, SYNC_PROTO_V1, SYNC_PROTO_V3);
90impl Message for GetBlockHashesResponse {
91    fn msg_id(&self) -> MsgId { msgid::GET_BLOCK_HASHES_RESPONSE }
92
93    fn msg_name(&self) -> &'static str { "GetBlockHashesResponse" }
94
95    fn priority(&self) -> SendQueuePriority { SendQueuePriority::Low }
96
97    fn encode(&self) -> Vec<u8> {
98        let mut encoded = self.rlp_bytes();
99        self.push_msg_id_leb128_encoding(&mut encoded);
100        encoded
101    }
102}
103
104// normal priority and size-sensitive message types
105impl GetMaybeRequestId for Transactions {}
106mark_msg_version_bound!(Transactions, SYNC_PROTO_V1, SYNC_PROTO_V3);
107impl Message for Transactions {
108    fn is_size_sensitive(&self) -> bool { self.transactions.len() > 1 }
109
110    fn msg_id(&self) -> MsgId { msgid::TRANSACTIONS }
111
112    fn msg_name(&self) -> &'static str { "Transactions" }
113
114    fn encode(&self) -> Vec<u8> {
115        let mut encoded = self.rlp_bytes();
116        self.push_msg_id_leb128_encoding(&mut encoded);
117        encoded
118    }
119}
120
121impl GetMaybeRequestId for GetBlocksResponse {}
122mark_msg_version_bound!(GetBlocksResponse, SYNC_PROTO_V1, SYNC_PROTO_V3);
123impl Message for GetBlocksResponse {
124    fn is_size_sensitive(&self) -> bool { self.blocks.len() > 0 }
125
126    fn msg_id(&self) -> MsgId { msgid::GET_BLOCKS_RESPONSE }
127
128    fn msg_name(&self) -> &'static str { "GetBlocksResponse" }
129
130    fn encode(&self) -> Vec<u8> {
131        let mut encoded = self.rlp_bytes();
132        self.push_msg_id_leb128_encoding(&mut encoded);
133        encoded
134    }
135}
136
137impl GetMaybeRequestId for GetBlocksWithPublicResponse {}
138mark_msg_version_bound!(
139    GetBlocksWithPublicResponse,
140    SYNC_PROTO_V1,
141    SYNC_PROTO_V3
142);
143impl Message for GetBlocksWithPublicResponse {
144    fn is_size_sensitive(&self) -> bool { self.blocks.len() > 0 }
145
146    fn msg_id(&self) -> MsgId { msgid::GET_BLOCKS_WITH_PUBLIC_RESPONSE }
147
148    fn msg_name(&self) -> &'static str { "GetBlocksWithPublicResponse" }
149
150    fn encode(&self) -> Vec<u8> {
151        let mut encoded = self.rlp_bytes();
152        self.push_msg_id_leb128_encoding(&mut encoded);
153        encoded
154    }
155}
156
157impl GetMaybeRequestId for GetBlockTxnResponse {}
158mark_msg_version_bound!(GetBlockTxnResponse, SYNC_PROTO_V1, SYNC_PROTO_V3);
159impl Message for GetBlockTxnResponse {
160    fn is_size_sensitive(&self) -> bool { self.block_txn.len() > 1 }
161
162    fn msg_id(&self) -> MsgId { msgid::GET_BLOCK_TXN_RESPONSE }
163
164    fn msg_name(&self) -> &'static str { "GetBlockTxnResponse" }
165
166    fn encode(&self) -> Vec<u8> {
167        let mut encoded = self.rlp_bytes();
168        self.push_msg_id_leb128_encoding(&mut encoded);
169        encoded
170    }
171}
172
173impl GetMaybeRequestId for TransactionDigests {}
174mark_msg_version_bound!(TransactionDigests, SYNC_PROTO_V1, SYNC_PROTO_V3);
175impl Message for TransactionDigests {
176    fn is_size_sensitive(&self) -> bool { self.len() > 1 }
177
178    fn msg_id(&self) -> MsgId { msgid::TRANSACTION_DIGESTS }
179
180    fn msg_name(&self) -> &'static str { "TransactionDigests" }
181
182    fn priority(&self) -> SendQueuePriority { SendQueuePriority::Normal }
183
184    fn encode(&self) -> Vec<u8> {
185        let mut encoded = self.rlp_bytes();
186        self.push_msg_id_leb128_encoding(&mut encoded);
187        encoded
188    }
189}
190
191impl GetMaybeRequestId for GetTransactionsResponse {}
192mark_msg_version_bound!(GetTransactionsResponse, SYNC_PROTO_V1, SYNC_PROTO_V3);
193impl Message for GetTransactionsResponse {
194    fn is_size_sensitive(&self) -> bool { self.transactions.len() > 0 }
195
196    fn msg_id(&self) -> MsgId { msgid::GET_TRANSACTIONS_RESPONSE }
197
198    fn msg_name(&self) -> &'static str { "GetTransactionsResponse" }
199
200    fn priority(&self) -> SendQueuePriority { SendQueuePriority::Normal }
201
202    fn encode(&self) -> Vec<u8> {
203        let mut encoded = self.rlp_bytes();
204        self.push_msg_id_leb128_encoding(&mut encoded);
205        encoded
206    }
207}
208impl GetMaybeRequestId for GetTransactionsFromTxHashesResponse {}
209mark_msg_version_bound!(
210    GetTransactionsFromTxHashesResponse,
211    SYNC_PROTO_V1,
212    SYNC_PROTO_V3
213);
214impl Message for GetTransactionsFromTxHashesResponse {
215    fn is_size_sensitive(&self) -> bool { self.transactions.len() > 0 }
216
217    fn msg_id(&self) -> MsgId {
218        msgid::GET_TRANSACTIONS_FROM_TX_HASHES_RESPONSE
219    }
220
221    fn msg_name(&self) -> &'static str { "GetTransactionsFromTxHashesResponse" }
222
223    fn priority(&self) -> SendQueuePriority { SendQueuePriority::Normal }
224
225    fn encode(&self) -> Vec<u8> {
226        let mut encoded = self.rlp_bytes();
227        self.push_msg_id_leb128_encoding(&mut encoded);
228        encoded
229    }
230}
231/// handle the RLP encoded message with given context `ctx`.
232/// If the message not handled, return `Ok(false)`.
233/// Otherwise, return `Ok(true)` if handled successfully
234/// or Err(e) on any error.
235pub fn handle_rlp_message(
236    id: MsgId, ctx: &Context, rlp: &Rlp,
237) -> Result<bool, Error> {
238    match id {
239        msgid::STATUS_V2 => handle_message::<StatusV2>(ctx, rlp)?,
240        msgid::STATUS_V3 => handle_message::<StatusV3>(ctx, rlp)?,
241        msgid::HEARTBEAT => handle_message::<Heartbeat>(ctx, rlp)?,
242        msgid::NEW_BLOCK => handle_message::<NewBlock>(ctx, rlp)?,
243        msgid::NEW_BLOCK_HASHES => {
244            handle_message::<NewBlockHashes>(ctx, rlp)?;
245        }
246        msgid::GET_BLOCK_HEADERS => {
247            handle_message::<GetBlockHeaders>(ctx, rlp)?;
248        }
249        msgid::GET_BLOCK_HEADERS_RESPONSE => {
250            handle_message::<GetBlockHeadersResponse>(ctx, rlp)?;
251        }
252        msgid::GET_BLOCKS => handle_message::<GetBlocks>(ctx, rlp)?,
253        msgid::GET_BLOCKS_RESPONSE => {
254            handle_message::<GetBlocksResponse>(ctx, rlp)?;
255        }
256        msgid::GET_BLOCKS_WITH_PUBLIC_RESPONSE => {
257            handle_message::<GetBlocksWithPublicResponse>(ctx, rlp)?;
258        }
259        msgid::GET_TERMINAL_BLOCK_HASHES => {
260            handle_message::<GetTerminalBlockHashes>(ctx, rlp)?;
261        }
262        msgid::GET_TERMINAL_BLOCK_HASHES_RESPONSE => {
263            handle_message::<GetTerminalBlockHashesResponse>(ctx, rlp)?;
264        }
265        msgid::GET_CMPCT_BLOCKS => {
266            handle_message::<GetCompactBlocks>(ctx, rlp)?;
267        }
268        msgid::GET_CMPCT_BLOCKS_RESPONSE => {
269            handle_message::<GetCompactBlocksResponse>(ctx, rlp)?;
270        }
271        msgid::GET_BLOCK_TXN => {
272            handle_message::<GetBlockTxn>(ctx, rlp)?;
273        }
274        msgid::GET_BLOCK_TXN_RESPONSE => {
275            handle_message::<GetBlockTxnResponse>(ctx, rlp)?;
276        }
277        msgid::TRANSACTIONS => {
278            handle_message::<Transactions>(ctx, rlp)?;
279        }
280        msgid::DYNAMIC_CAPABILITY_CHANGE => {
281            handle_message::<DynamicCapabilityChange>(ctx, rlp)?;
282        }
283        msgid::TRANSACTION_DIGESTS => {
284            handle_message::<TransactionDigests>(ctx, rlp)?;
285        }
286        msgid::GET_TRANSACTIONS => {
287            handle_message::<GetTransactions>(ctx, rlp)?;
288        }
289        msgid::GET_TRANSACTIONS_FROM_TX_HASHES => {
290            handle_message::<GetTransactionsFromTxHashes>(ctx, rlp)?;
291        }
292        msgid::GET_TRANSACTIONS_RESPONSE => {
293            handle_message::<GetTransactionsResponse>(ctx, rlp)?;
294        }
295        msgid::GET_TRANSACTIONS_FROM_TX_HASHES_RESPONSE => {
296            handle_message::<GetTransactionsFromTxHashesResponse>(ctx, rlp)?;
297        }
298        msgid::GET_BLOCK_HASHES_BY_EPOCH => {
299            handle_message::<GetBlockHashesByEpoch>(ctx, rlp)?;
300        }
301        msgid::GET_BLOCK_HASHES_RESPONSE => {
302            handle_message::<GetBlockHashesResponse>(ctx, rlp)?;
303        }
304        msgid::GET_SNAPSHOT_MANIFEST => {
305            handle_message::<SnapshotManifestRequest>(ctx, rlp)?;
306        }
307        msgid::GET_SNAPSHOT_MANIFEST_RESPONSE => {
308            handle_message::<SnapshotManifestResponse>(ctx, rlp)?;
309        }
310        msgid::GET_SNAPSHOT_CHUNK => {
311            handle_message::<SnapshotChunkRequest>(ctx, rlp)?;
312        }
313        msgid::GET_SNAPSHOT_CHUNK_RESPONSE => {
314            handle_message::<SnapshotChunkResponse>(ctx, rlp)?;
315        }
316        msgid::STATE_SYNC_CANDIDATE_REQUEST => {
317            handle_message::<StateSyncCandidateRequest>(ctx, rlp)?;
318        }
319        msgid::STATE_SYNC_CANDIDATE_RESPONSE => {
320            handle_message::<StateSyncCandidateResponse>(ctx, rlp)?;
321        }
322        msgid::THROTTLED => {
323            handle_message::<Throttled>(ctx, rlp)?;
324        }
325        _ => return Ok(false),
326    }
327
328    Ok(true)
329}
330
331fn handle_message<T: Decodable + Handleable + Message>(
332    ctx: &Context, rlp: &Rlp,
333) -> Result<(), Error> {
334    let msg: T = decode_rlp_and_check_deprecation(
335        rlp,
336        ctx.manager.minimum_supported_version(),
337        ctx.io.get_protocol(),
338    )?;
339
340    let msg_id = msg.msg_id();
341    let msg_name = msg.msg_name();
342    let req_id = msg.get_request_id();
343
344    trace!(
345        "handle sync protocol message, peer = {}, id = {}, name = {}, request_id = {:?}",
346        ctx.node_id, msg_id, msg_name, req_id,
347    );
348
349    msg.throttle(ctx)?;
350
351    if let Err(e) = msg.handle(ctx) {
352        debug!(
353            "failed to handle sync protocol message, peer = {}, id = {}, name = {}, request_id = {:?}, error_kind = {:?}",
354            ctx.node_id, msg_id, msg_name, req_id, e,
355        );
356
357        return Err(e);
358    }
359
360    Ok(())
361}