1use crate::{
6 message::{
7 GetMaybeRequestId, Message, MessageProtocolVersionBound, MsgId,
8 RequestId, SetRequestId,
9 },
10 sync::{
11 message::{
12 metrics::TX_HANDLE_TIMER, msgid, Context, DynamicCapability,
13 Handleable, Key, KeyContainer,
14 },
15 request_manager::{AsAny, Request},
16 Error, ProtocolConfiguration, SYNC_PROTO_V1, SYNC_PROTO_V3,
17 },
18};
19use cfx_types::H256;
20use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
21use metrics::MeterTimer;
22use network::service::ProtocolVersion;
23use primitives::{transaction::TxPropagateId, TransactionWithSignature};
24use priority_send_queue::SendQueuePriority;
25use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
26use rlp_derive::{RlpDecodable, RlpEncodable};
27use siphasher::sip::SipHasher24;
28use std::{any::Any, collections::HashSet, hash::Hasher, time::Duration};
29
30#[derive(Debug, PartialEq)]
31pub struct Transactions {
32 pub transactions: Vec<TransactionWithSignature>,
33}
34
35impl Encodable for Transactions {
36 fn rlp_append(&self, s: &mut RlpStream) {
37 s.append_list(&self.transactions);
38 }
39}
40
41impl Decodable for Transactions {
42 fn decode(d: &Rlp) -> Result<Self, DecoderError> {
43 let transactions = d.as_list()?;
44 Ok(Transactions { transactions })
45 }
46}
47
48impl Handleable for Transactions {
49 fn handle(self, ctx: &Context) -> Result<(), Error> {
50 let transactions = self.transactions;
51 debug!(
52 "Received {:?} transactions from Peer {:?}",
53 transactions.len(),
54 ctx.node_id
55 );
56
57 let peer_info = ctx.manager.syn.get_peer_info(&ctx.node_id)?;
58 let should_disconnect = {
59 let mut peer_info = peer_info.write();
60 if peer_info
61 .notified_capabilities
62 .contains(DynamicCapability::NormalPhase(false))
63 {
64 peer_info.received_transaction_count += transactions.len();
65 peer_info.received_transaction_count
66 > ctx
67 .manager
68 .protocol_config
69 .max_trans_count_received_in_catch_up
70 as usize
71 } else {
72 false
73 }
74 };
75
76 if should_disconnect {
77 bail!(Error::TooManyTrans);
78 }
79
80 if !ctx.manager.catch_up_mode() {
86 let (signed_trans, failure) = ctx
87 .manager
88 .graph
89 .consensus
90 .tx_pool()
91 .insert_new_transactions(transactions);
92 if failure.is_empty() {
93 debug!(
94 "Transactions successfully inserted to transaction pool"
95 );
96 } else {
97 debug!(
98 "{} transactions are rejected by the transaction pool",
99 failure.len()
100 );
101 for (tx, e) in failure {
102 trace!("Transaction {} is rejected by the transaction pool: error = {}", tx, e);
103 }
104 }
105
106 ctx.manager
107 .request_manager
108 .append_received_transactions(signed_trans);
109 Ok(())
110 } else {
111 debug!("All {} transactions are not inserted to the transaction pool, because the node is still in the catch up mode", transactions.len());
112 Err(Error::InCatchUpMode("ignore transaction_digests message because still in the catch up mode".to_string()).into())
113 }
114 }
115}
116
117#[derive(Debug, PartialEq)]
119pub struct TransactionDigests {
120 pub window_index: usize,
121 pub key1: u64, pub key2: u64,
123 short_ids: Vec<u8>, pub tx_hashes: Vec<H256>, }
126
127impl Handleable for TransactionDigests {
128 fn handle(self, ctx: &Context) -> Result<(), Error> {
129 {
130 let peer_info = ctx.manager.syn.get_peer_info(&ctx.node_id)?;
131
132 let mut peer_info = peer_info.write();
133 if peer_info
134 .notified_capabilities
135 .contains(DynamicCapability::NormalPhase(false))
136 {
137 peer_info.received_transaction_count += self.short_ids.len()
138 / Self::SHORT_ID_SIZE_IN_BYTES
139 + self.tx_hashes.len();
140 if peer_info.received_transaction_count
141 > ctx
142 .manager
143 .protocol_config
144 .max_trans_count_received_in_catch_up
145 as usize
146 {
147 bail!(Error::TooManyTrans);
148 }
149 }
150 }
151
152 if !ctx.manager.catch_up_mode() {
155 ctx.manager
156 .request_manager
157 .request_transactions_from_digest(
158 ctx.io,
159 ctx.node_id.clone(),
160 &self,
161 );
162 Ok(())
163 } else {
164 Err(Error::InCatchUpMode("ignore transaction_digests message because still in the catch up mode".to_string()).into())
165 }
166 }
167}
168
169impl Encodable for TransactionDigests {
170 fn rlp_append(&self, stream: &mut RlpStream) {
171 if self.tx_hashes.is_empty() {
172 stream
173 .begin_list(4)
174 .append(&self.window_index)
175 .append(&self.key1)
176 .append(&self.key2)
177 .append(&self.short_ids);
178 } else {
179 stream
180 .begin_list(5)
181 .append(&self.window_index)
182 .append(&self.key1)
183 .append(&self.key2)
184 .append(&self.short_ids)
185 .append_list(&self.tx_hashes);
186 }
187 }
188}
189
190impl Decodable for TransactionDigests {
191 fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
192 if !(rlp.item_count()? == 4 || rlp.item_count()? == 5) {
193 return Err(DecoderError::RlpIncorrectListLen);
194 }
195
196 let short_ids: Vec<u8> = rlp.val_at(3)?;
197 if short_ids.len() % TransactionDigests::SHORT_ID_SIZE_IN_BYTES != 0 {
198 return Err(DecoderError::Custom(
199 "TransactionDigests length Error!",
200 ));
201 }
202
203 let tx_hashes = {
204 if rlp.item_count()? == 5 {
205 rlp.list_at(4)?
206 } else {
207 vec![]
208 }
209 };
210
211 Ok(TransactionDigests {
212 window_index: rlp.val_at(0)?,
213 key1: rlp.val_at(1)?,
214 key2: rlp.val_at(2)?,
215 short_ids,
216 tx_hashes,
217 })
218 }
219}
220
221impl TransactionDigests {
222 const SHORT_ID_SIZE_IN_BYTES: usize = 4;
223
224 pub fn new(
225 window_index: usize, key1: u64, key2: u64, short_ids: Vec<u8>,
226 tx_hashes: Vec<H256>,
227 ) -> TransactionDigests {
228 TransactionDigests {
229 window_index,
230 key1,
231 key2,
232 short_ids,
233 tx_hashes,
234 }
235 }
236
237 pub fn get_decomposed_short_ids(&self) -> (Vec<u8>, Vec<TxPropagateId>) {
238 let mut random_byte_vector: Vec<u8> = Vec::new();
239 let mut fixed_bytes_vector: Vec<TxPropagateId> = Vec::new();
240
241 for i in (0..self.short_ids.len())
242 .step_by(TransactionDigests::SHORT_ID_SIZE_IN_BYTES)
243 {
244 random_byte_vector.push(self.short_ids[i]);
245 fixed_bytes_vector.push(TransactionDigests::to_u24(
246 self.short_ids[i + 1],
247 self.short_ids[i + 2],
248 self.short_ids[i + 3],
249 ));
250 }
251
252 (random_byte_vector, fixed_bytes_vector)
253 }
254
255 pub fn len(&self) -> usize {
256 self.short_ids.len() / TransactionDigests::SHORT_ID_SIZE_IN_BYTES
257 }
258
259 pub fn to_u24(v1: u8, v2: u8, v3: u8) -> u32 {
260 ((v1 as u32) << 16) + ((v2 as u32) << 8) + v3 as u32
261 }
262
263 pub fn append_short_id(
264 message: &mut Vec<u8>, key1: u64, key2: u64, transaction_id: &H256,
265 ) {
266 message.push(TransactionDigests::get_random_byte(
267 transaction_id,
268 key1,
269 key2,
270 ));
271 message.push(transaction_id[29]);
272 message.push(transaction_id[30]);
273 message.push(transaction_id[31]);
274 }
275
276 pub fn append_tx_hash(message: &mut Vec<H256>, transaction_id: H256) {
277 message.push(transaction_id);
278 }
279
280 pub fn get_random_byte(transaction_id: &H256, key1: u64, key2: u64) -> u8 {
281 let mut hasher = SipHasher24::new_with_keys(key1, key2);
282 hasher.write(transaction_id.as_ref());
283 (hasher.finish() & 0xff) as u8
284 }
285}
286
287#[derive(Debug, PartialEq, DeriveMallocSizeOf)]
290pub struct GetTransactions {
291 pub request_id: RequestId,
292 pub window_index: usize,
293 pub indices: Vec<usize>,
294 pub tx_hashes_indices: Vec<usize>,
295 pub short_ids: HashSet<TxPropagateId>,
296 pub tx_hashes: HashSet<H256>,
297}
298
299impl_request_id_methods!(GetTransactions);
300
301impl AsAny for GetTransactions {
302 fn as_any(&self) -> &dyn Any { self }
303
304 fn as_any_mut(&mut self) -> &mut dyn Any { self }
305}
306
307mark_msg_version_bound!(GetTransactions, SYNC_PROTO_V1, SYNC_PROTO_V3);
308impl Message for GetTransactions {
309 fn msg_id(&self) -> MsgId { msgid::GET_TRANSACTIONS }
310
311 fn msg_name(&self) -> &'static str { "GetTransactions" }
312
313 fn priority(&self) -> SendQueuePriority { SendQueuePriority::Normal }
314
315 fn encode(&self) -> Vec<u8> {
316 let mut encoded = self.rlp_bytes();
317 self.push_msg_id_leb128_encoding(&mut encoded);
318 encoded
319 }
320}
321
322impl Request for GetTransactions {
323 fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
324 conf.transaction_request_timeout
325 }
326
327 fn on_removed(&self, inflight_keys: &KeyContainer) {
328 let mut short_id_inflight_keys =
329 inflight_keys.write(msgid::GET_TRANSACTIONS);
330 let mut tx_hash_inflight_keys =
331 inflight_keys.write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
332 for tx in &self.short_ids {
333 short_id_inflight_keys.remove(&Key::Id(*tx));
334 }
335 for tx in &self.tx_hashes {
336 tx_hash_inflight_keys.remove(&Key::Hash(*tx));
337 }
338 }
339
340 fn with_inflight(&mut self, inflight_keys: &KeyContainer) {
341 let mut short_id_inflight_keys =
342 inflight_keys.write(msgid::GET_TRANSACTIONS);
343 let mut tx_hash_inflight_keys =
344 inflight_keys.write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
345 let mut short_ids: HashSet<TxPropagateId> = HashSet::new();
346 let mut tx_hashes: HashSet<H256> = HashSet::new();
347 for id in self.short_ids.iter() {
348 if short_id_inflight_keys.insert(Key::Id(*id)) {
349 short_ids.insert(*id);
350 }
351 }
352 for id in self.tx_hashes.iter() {
353 if tx_hash_inflight_keys.insert(Key::Hash(*id)) {
354 tx_hashes.insert(*id);
355 }
356 }
357
358 self.short_ids = short_ids;
359 self.tx_hashes = tx_hashes;
360 }
361
362 fn is_empty(&self) -> bool {
363 self.tx_hashes_indices.is_empty() && self.indices.is_empty()
364 }
365
366 fn resend(&self) -> Option<Box<dyn Request>> { None }
367}
368
369impl Handleable for GetTransactions {
370 fn handle(self, ctx: &Context) -> Result<(), Error> {
371 let transactions = ctx
372 .manager
373 .request_manager
374 .get_sent_transactions(self.window_index, &self.indices);
375 let tx_hashes_indices = ctx
376 .manager
377 .request_manager
378 .get_sent_transactions(self.window_index, &self.tx_hashes_indices);
379 let tx_hashes =
380 tx_hashes_indices.into_iter().map(|tx| tx.hash()).collect();
381 let response = GetTransactionsResponse {
382 request_id: self.request_id,
383 transactions,
384 tx_hashes,
385 };
386 debug!(
387 "on_get_transactions request {} txs, {} tx hashes, returned {} txs {} tx hashes",
388 self.indices.len(),
389 self.tx_hashes_indices.len(),
390 response.transactions.len(),
391 response.tx_hashes.len(),
392 );
393
394 ctx.send_response(&response)
395 }
396}
397
398impl Encodable for GetTransactions {
399 fn rlp_append(&self, stream: &mut RlpStream) {
400 if self.tx_hashes_indices.is_empty() {
401 stream
402 .begin_list(3)
403 .append(&self.request_id)
404 .append(&self.window_index)
405 .append_list(&self.indices);
406 } else {
407 stream
408 .begin_list(4)
409 .append(&self.request_id)
410 .append(&self.window_index)
411 .append_list(&self.indices)
412 .append_list(&self.tx_hashes_indices);
413 }
414 }
415}
416
417impl Decodable for GetTransactions {
418 fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
419 if !(rlp.item_count()? == 3 || rlp.item_count()? == 4) {
420 return Err(DecoderError::RlpIncorrectListLen);
421 }
422 if rlp.item_count()? == 3 {
423 Ok(GetTransactions {
424 request_id: rlp.val_at(0)?,
425 window_index: rlp.val_at(1)?,
426 indices: rlp.list_at(2)?,
427 tx_hashes_indices: vec![],
428 short_ids: HashSet::new(),
429 tx_hashes: HashSet::new(),
430 })
431 } else {
432 Ok(GetTransactions {
433 request_id: rlp.val_at(0)?,
434 window_index: rlp.val_at(1)?,
435 indices: rlp.list_at(2)?,
436 tx_hashes_indices: rlp.list_at(3)?,
437 short_ids: HashSet::new(),
438 tx_hashes: HashSet::new(),
439 })
440 }
441 }
442}
443
444#[derive(Debug, PartialEq, DeriveMallocSizeOf)]
447pub struct GetTransactionsFromTxHashes {
448 pub request_id: RequestId,
449 pub window_index: usize,
450 pub indices: Vec<usize>,
451 pub tx_hashes: HashSet<H256>,
452}
453
454impl_request_id_methods!(GetTransactionsFromTxHashes);
455
456impl AsAny for GetTransactionsFromTxHashes {
457 fn as_any(&self) -> &dyn Any { self }
458
459 fn as_any_mut(&mut self) -> &mut dyn Any { self }
460}
461
462mark_msg_version_bound!(
463 GetTransactionsFromTxHashes,
464 SYNC_PROTO_V1,
465 SYNC_PROTO_V3
466);
467impl Message for GetTransactionsFromTxHashes {
468 fn msg_id(&self) -> MsgId { msgid::GET_TRANSACTIONS_FROM_TX_HASHES }
469
470 fn msg_name(&self) -> &'static str { "GetTransactionsFromTxHashes" }
471
472 fn priority(&self) -> SendQueuePriority { SendQueuePriority::Normal }
473
474 fn encode(&self) -> Vec<u8> {
475 let mut encoded = self.rlp_bytes();
476 self.push_msg_id_leb128_encoding(&mut encoded);
477 encoded
478 }
479}
480
481impl Request for GetTransactionsFromTxHashes {
482 fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
483 conf.transaction_request_timeout
484 }
485
486 fn on_removed(&self, inflight_keys: &KeyContainer) {
487 let mut inflight_keys = inflight_keys.write(self.msg_id());
488 for tx_hash in self.tx_hashes.iter() {
489 inflight_keys.remove(&Key::Hash(*tx_hash));
490 }
491 }
492
493 fn with_inflight(&mut self, inflight_keys: &KeyContainer) {
494 let mut inflight_keys = inflight_keys.write(self.msg_id());
495
496 let mut tx_hashes: HashSet<H256> = HashSet::new();
497 for id in self.tx_hashes.iter() {
498 if inflight_keys.insert(Key::Hash(*id)) {
499 tx_hashes.insert(*id);
500 }
501 }
502
503 self.tx_hashes = tx_hashes;
504 }
505
506 fn is_empty(&self) -> bool { self.tx_hashes.is_empty() }
507
508 fn resend(&self) -> Option<Box<dyn Request>> { None }
509}
510
511impl Handleable for GetTransactionsFromTxHashes {
512 fn handle(self, ctx: &Context) -> Result<(), Error> {
513 let transactions = ctx
514 .manager
515 .request_manager
516 .get_sent_transactions(self.window_index, &self.indices);
517
518 let response = GetTransactionsFromTxHashesResponse {
519 request_id: self.request_id,
520 transactions,
521 };
522 debug!(
523 "on_get_transactions_from_tx_hashes request {} txs, returned {} txs",
524 self.indices.len(),
525 response.transactions.len(),
526 );
527
528 ctx.send_response(&response)
529 }
530}
531
532impl Encodable for GetTransactionsFromTxHashes {
533 fn rlp_append(&self, stream: &mut RlpStream) {
534 stream
535 .begin_list(3)
536 .append(&self.request_id)
537 .append(&self.window_index)
538 .append_list(&self.indices);
539 }
540}
541
542impl Decodable for GetTransactionsFromTxHashes {
543 fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
544 if rlp.item_count()? != 3 {
545 return Err(DecoderError::RlpIncorrectListLen);
546 }
547
548 Ok(GetTransactionsFromTxHashes {
549 request_id: rlp.val_at(0)?,
550 window_index: rlp.val_at(1)?,
551 indices: rlp.list_at(2)?,
552 tx_hashes: HashSet::new(),
553 })
554 }
555}
556
557#[derive(Debug, PartialEq, RlpDecodable, RlpEncodable)]
560pub struct GetTransactionsResponse {
561 pub request_id: RequestId,
562 pub transactions: Vec<TransactionWithSignature>,
563 pub tx_hashes: Vec<H256>,
564}
565
566impl Handleable for GetTransactionsResponse {
567 fn handle(self, ctx: &Context) -> Result<(), Error> {
568 let _timer = MeterTimer::time_func(TX_HANDLE_TIMER.as_ref());
569
570 debug!("on_get_transactions_response {:?}", self.request_id);
571
572 let req = ctx.match_request(self.request_id)?;
573 let req = req.downcast_ref::<GetTransactions>(
574 ctx.io,
575 &ctx.manager.request_manager,
576 )?;
577
578 debug!(
581 "Received {:?} transactions and {:?} tx hashes from Peer {:?}",
582 self.transactions.len(),
583 self.tx_hashes.len(),
584 ctx.node_id
585 );
586
587 if !ctx.manager.catch_up_mode() {
593 let (signed_trans, failure) = ctx
594 .manager
595 .graph
596 .consensus
597 .tx_pool()
598 .insert_new_transactions(self.transactions);
599 if failure.is_empty() {
600 debug!(
601 "Transactions successfully inserted to transaction pool"
602 );
603 } else {
604 debug!(
605 "{} transactions are rejected by the transaction pool",
606 failure.len()
607 );
608 for (tx, e) in failure {
609 trace!("Transaction {} is rejected by the transaction pool: error = {}", tx, e);
610 }
611 }
612 ctx.manager
613 .request_manager
614 .transactions_received_from_digests(ctx.io, &req, signed_trans);
615
616 if req.tx_hashes_indices.len() > 0 && !self.tx_hashes.is_empty() {
617 ctx.manager
618 .request_manager
619 .request_transactions_from_tx_hashes(
620 ctx.io,
621 ctx.node_id.clone(),
622 self.tx_hashes,
623 req.window_index,
624 &req.tx_hashes_indices,
625 );
626 }
627 Ok(())
628 } else {
629 debug!("All {} transactions are not inserted to the transaction pool, because the node is still in the catch up mode", self.transactions.len());
630 Err(Error::InCatchUpMode("transactions discarded for handling on_get_transactions_response messages".to_string()).into())
631 }
632 }
633}
634
635#[derive(Debug, PartialEq, RlpDecodable, RlpEncodable)]
638pub struct GetTransactionsFromTxHashesResponse {
639 pub request_id: RequestId,
640 pub transactions: Vec<TransactionWithSignature>,
641}
642
643impl Handleable for GetTransactionsFromTxHashesResponse {
644 fn handle(self, ctx: &Context) -> Result<(), Error> {
645 let _timer = MeterTimer::time_func(TX_HANDLE_TIMER.as_ref());
646
647 debug!(
648 "on_get_transactions_from_tx_hashes_response {:?}",
649 self.request_id
650 );
651
652 let req = ctx.match_request(self.request_id)?;
653 let req = req.downcast_ref::<GetTransactionsFromTxHashes>(
654 ctx.io,
655 &ctx.manager.request_manager,
656 )?;
657
658 debug!(
661 "Received {:?} transactions from Peer {:?}",
662 self.transactions.len(),
663 ctx.node_id
664 );
665
666 if !ctx.manager.catch_up_mode() {
672 let (signed_trans, failure) = ctx
673 .manager
674 .graph
675 .consensus
676 .tx_pool()
677 .insert_new_transactions(self.transactions);
678 if failure.is_empty() {
679 debug!(
680 "Transactions successfully inserted to transaction pool"
681 );
682 } else {
683 debug!(
684 "{} transactions are rejected by the transaction pool",
685 failure.len()
686 );
687 for (tx, e) in failure {
688 trace!("Transaction {} is rejected by the transaction pool: error = {}", tx, e);
689 }
690 }
691 ctx.manager
692 .request_manager
693 .transactions_received_from_tx_hashes(&req, signed_trans);
694 Ok(())
695 } else {
696 debug!("All {} transactions are not inserted to the transaction pool, because the node is still in the catch up mode", self.transactions.len());
697 Err(Error::InCatchUpMode("transactions discarded for handling on_get_transactions_response messages".to_string()).into())
698 }
699 }
700}