cfxcore/sync/message/
transactions.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    message::{
7        GetMaybeRequestId, Message, MessageProtocolVersionBound, MsgId,
8        RequestId, SetRequestId,
9    },
10    sync::{
11        message::{
12            metrics::TX_HANDLE_TIMER, msgid, Context, DynamicCapability,
13            Handleable, Key, KeyContainer,
14        },
15        request_manager::{AsAny, Request},
16        Error, ProtocolConfiguration, SYNC_PROTO_V1, SYNC_PROTO_V3,
17    },
18};
19use cfx_types::H256;
20use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
21use metrics::MeterTimer;
22use network::service::ProtocolVersion;
23use primitives::{transaction::TxPropagateId, TransactionWithSignature};
24use priority_send_queue::SendQueuePriority;
25use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
26use rlp_derive::{RlpDecodable, RlpEncodable};
27use siphasher::sip::SipHasher24;
28use std::{any::Any, collections::HashSet, hash::Hasher, time::Duration};
29
30#[derive(Debug, PartialEq)]
31pub struct Transactions {
32    pub transactions: Vec<TransactionWithSignature>,
33}
34
35impl Encodable for Transactions {
36    fn rlp_append(&self, s: &mut RlpStream) {
37        s.append_list(&self.transactions);
38    }
39}
40
41impl Decodable for Transactions {
42    fn decode(d: &Rlp) -> Result<Self, DecoderError> {
43        let transactions = d.as_list()?;
44        Ok(Transactions { transactions })
45    }
46}
47
48impl Handleable for Transactions {
49    fn handle(self, ctx: &Context) -> Result<(), Error> {
50        let transactions = self.transactions;
51        debug!(
52            "Received {:?} transactions from Peer {:?}",
53            transactions.len(),
54            ctx.node_id
55        );
56
57        let peer_info = ctx.manager.syn.get_peer_info(&ctx.node_id)?;
58        let should_disconnect = {
59            let mut peer_info = peer_info.write();
60            if peer_info
61                .notified_capabilities
62                .contains(DynamicCapability::NormalPhase(false))
63            {
64                peer_info.received_transaction_count += transactions.len();
65                peer_info.received_transaction_count
66                    > ctx
67                        .manager
68                        .protocol_config
69                        .max_trans_count_received_in_catch_up
70                        as usize
71            } else {
72                false
73            }
74        };
75
76        if should_disconnect {
77            bail!(Error::TooManyTrans);
78        }
79
80        // The transaction pool will rely on the execution state information to
81        // verify transaction validity. It may incorrectly accept/reject
82        // transactions when in the catch up mode because the state is still
83        // not correct. We therefore do not insert transactions when in the
84        // catch up mode.
85        if !ctx.manager.catch_up_mode() {
86            let (signed_trans, failure) = ctx
87                .manager
88                .graph
89                .consensus
90                .tx_pool()
91                .insert_new_transactions(transactions);
92            if failure.is_empty() {
93                debug!(
94                    "Transactions successfully inserted to transaction pool"
95                );
96            } else {
97                debug!(
98                    "{} transactions are rejected by the transaction pool",
99                    failure.len()
100                );
101                for (tx, e) in failure {
102                    trace!("Transaction {} is rejected by the transaction pool: error = {}", tx, e);
103                }
104            }
105
106            ctx.manager
107                .request_manager
108                .append_received_transactions(signed_trans);
109            Ok(())
110        } else {
111            debug!("All {} transactions are not inserted to the transaction pool, because the node is still in the catch up mode", transactions.len());
112            Err(Error::InCatchUpMode("ignore transaction_digests message because still in the catch up mode".to_string()).into())
113        }
114    }
115}
116
117/////////////////////////////////////////////////////////////////////
118#[derive(Debug, PartialEq)]
119pub struct TransactionDigests {
120    pub window_index: usize,
121    pub key1: u64, //keys used for siphash
122    pub key2: u64,
123    short_ids: Vec<u8>, // 4 bytes ids which stores in sequential order
124    pub tx_hashes: Vec<H256>, // SHA-3 hash
125}
126
127impl Handleable for TransactionDigests {
128    fn handle(self, ctx: &Context) -> Result<(), Error> {
129        {
130            let peer_info = ctx.manager.syn.get_peer_info(&ctx.node_id)?;
131
132            let mut peer_info = peer_info.write();
133            if peer_info
134                .notified_capabilities
135                .contains(DynamicCapability::NormalPhase(false))
136            {
137                peer_info.received_transaction_count += self.short_ids.len()
138                    / Self::SHORT_ID_SIZE_IN_BYTES
139                    + self.tx_hashes.len();
140                if peer_info.received_transaction_count
141                    > ctx
142                        .manager
143                        .protocol_config
144                        .max_trans_count_received_in_catch_up
145                        as usize
146                {
147                    bail!(Error::TooManyTrans);
148                }
149            }
150        }
151
152        // We will not request transactions when in the catch up mode, because
153        // the transaction pool cannot process them correctly.
154        if !ctx.manager.catch_up_mode() {
155            ctx.manager
156                .request_manager
157                .request_transactions_from_digest(
158                    ctx.io,
159                    ctx.node_id.clone(),
160                    &self,
161                );
162            Ok(())
163        } else {
164            Err(Error::InCatchUpMode("ignore transaction_digests message because still in the catch up mode".to_string()).into())
165        }
166    }
167}
168
169impl Encodable for TransactionDigests {
170    fn rlp_append(&self, stream: &mut RlpStream) {
171        if self.tx_hashes.is_empty() {
172            stream
173                .begin_list(4)
174                .append(&self.window_index)
175                .append(&self.key1)
176                .append(&self.key2)
177                .append(&self.short_ids);
178        } else {
179            stream
180                .begin_list(5)
181                .append(&self.window_index)
182                .append(&self.key1)
183                .append(&self.key2)
184                .append(&self.short_ids)
185                .append_list(&self.tx_hashes);
186        }
187    }
188}
189
190impl Decodable for TransactionDigests {
191    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
192        if !(rlp.item_count()? == 4 || rlp.item_count()? == 5) {
193            return Err(DecoderError::RlpIncorrectListLen);
194        }
195
196        let short_ids: Vec<u8> = rlp.val_at(3)?;
197        if short_ids.len() % TransactionDigests::SHORT_ID_SIZE_IN_BYTES != 0 {
198            return Err(DecoderError::Custom(
199                "TransactionDigests length Error!",
200            ));
201        }
202
203        let tx_hashes = {
204            if rlp.item_count()? == 5 {
205                rlp.list_at(4)?
206            } else {
207                vec![]
208            }
209        };
210
211        Ok(TransactionDigests {
212            window_index: rlp.val_at(0)?,
213            key1: rlp.val_at(1)?,
214            key2: rlp.val_at(2)?,
215            short_ids,
216            tx_hashes,
217        })
218    }
219}
220
221impl TransactionDigests {
222    const SHORT_ID_SIZE_IN_BYTES: usize = 4;
223
224    pub fn new(
225        window_index: usize, key1: u64, key2: u64, short_ids: Vec<u8>,
226        tx_hashes: Vec<H256>,
227    ) -> TransactionDigests {
228        TransactionDigests {
229            window_index,
230            key1,
231            key2,
232            short_ids,
233            tx_hashes,
234        }
235    }
236
237    pub fn get_decomposed_short_ids(&self) -> (Vec<u8>, Vec<TxPropagateId>) {
238        let mut random_byte_vector: Vec<u8> = Vec::new();
239        let mut fixed_bytes_vector: Vec<TxPropagateId> = Vec::new();
240
241        for i in (0..self.short_ids.len())
242            .step_by(TransactionDigests::SHORT_ID_SIZE_IN_BYTES)
243        {
244            random_byte_vector.push(self.short_ids[i]);
245            fixed_bytes_vector.push(TransactionDigests::to_u24(
246                self.short_ids[i + 1],
247                self.short_ids[i + 2],
248                self.short_ids[i + 3],
249            ));
250        }
251
252        (random_byte_vector, fixed_bytes_vector)
253    }
254
255    pub fn len(&self) -> usize {
256        self.short_ids.len() / TransactionDigests::SHORT_ID_SIZE_IN_BYTES
257    }
258
259    pub fn to_u24(v1: u8, v2: u8, v3: u8) -> u32 {
260        ((v1 as u32) << 16) + ((v2 as u32) << 8) + v3 as u32
261    }
262
263    pub fn append_short_id(
264        message: &mut Vec<u8>, key1: u64, key2: u64, transaction_id: &H256,
265    ) {
266        message.push(TransactionDigests::get_random_byte(
267            transaction_id,
268            key1,
269            key2,
270        ));
271        message.push(transaction_id[29]);
272        message.push(transaction_id[30]);
273        message.push(transaction_id[31]);
274    }
275
276    pub fn append_tx_hash(message: &mut Vec<H256>, transaction_id: H256) {
277        message.push(transaction_id);
278    }
279
280    pub fn get_random_byte(transaction_id: &H256, key1: u64, key2: u64) -> u8 {
281        let mut hasher = SipHasher24::new_with_keys(key1, key2);
282        hasher.write(transaction_id.as_ref());
283        (hasher.finish() & 0xff) as u8
284    }
285}
286
287/////////////////////////////////////////////////////////////////////////
288
289#[derive(Debug, PartialEq, DeriveMallocSizeOf)]
290pub struct GetTransactions {
291    pub request_id: RequestId,
292    pub window_index: usize,
293    pub indices: Vec<usize>,
294    pub tx_hashes_indices: Vec<usize>,
295    pub short_ids: HashSet<TxPropagateId>,
296    pub tx_hashes: HashSet<H256>,
297}
298
299impl_request_id_methods!(GetTransactions);
300
301impl AsAny for GetTransactions {
302    fn as_any(&self) -> &dyn Any { self }
303
304    fn as_any_mut(&mut self) -> &mut dyn Any { self }
305}
306
307mark_msg_version_bound!(GetTransactions, SYNC_PROTO_V1, SYNC_PROTO_V3);
308impl Message for GetTransactions {
309    fn msg_id(&self) -> MsgId { msgid::GET_TRANSACTIONS }
310
311    fn msg_name(&self) -> &'static str { "GetTransactions" }
312
313    fn priority(&self) -> SendQueuePriority { SendQueuePriority::Normal }
314
315    fn encode(&self) -> Vec<u8> {
316        let mut encoded = self.rlp_bytes();
317        self.push_msg_id_leb128_encoding(&mut encoded);
318        encoded
319    }
320}
321
322impl Request for GetTransactions {
323    fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
324        conf.transaction_request_timeout
325    }
326
327    fn on_removed(&self, inflight_keys: &KeyContainer) {
328        let mut short_id_inflight_keys =
329            inflight_keys.write(msgid::GET_TRANSACTIONS);
330        let mut tx_hash_inflight_keys =
331            inflight_keys.write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
332        for tx in &self.short_ids {
333            short_id_inflight_keys.remove(&Key::Id(*tx));
334        }
335        for tx in &self.tx_hashes {
336            tx_hash_inflight_keys.remove(&Key::Hash(*tx));
337        }
338    }
339
340    fn with_inflight(&mut self, inflight_keys: &KeyContainer) {
341        let mut short_id_inflight_keys =
342            inflight_keys.write(msgid::GET_TRANSACTIONS);
343        let mut tx_hash_inflight_keys =
344            inflight_keys.write(msgid::GET_TRANSACTIONS_FROM_TX_HASHES);
345        let mut short_ids: HashSet<TxPropagateId> = HashSet::new();
346        let mut tx_hashes: HashSet<H256> = HashSet::new();
347        for id in self.short_ids.iter() {
348            if short_id_inflight_keys.insert(Key::Id(*id)) {
349                short_ids.insert(*id);
350            }
351        }
352        for id in self.tx_hashes.iter() {
353            if tx_hash_inflight_keys.insert(Key::Hash(*id)) {
354                tx_hashes.insert(*id);
355            }
356        }
357
358        self.short_ids = short_ids;
359        self.tx_hashes = tx_hashes;
360    }
361
362    fn is_empty(&self) -> bool {
363        self.tx_hashes_indices.is_empty() && self.indices.is_empty()
364    }
365
366    fn resend(&self) -> Option<Box<dyn Request>> { None }
367}
368
369impl Handleable for GetTransactions {
370    fn handle(self, ctx: &Context) -> Result<(), Error> {
371        let transactions = ctx
372            .manager
373            .request_manager
374            .get_sent_transactions(self.window_index, &self.indices);
375        let tx_hashes_indices = ctx
376            .manager
377            .request_manager
378            .get_sent_transactions(self.window_index, &self.tx_hashes_indices);
379        let tx_hashes =
380            tx_hashes_indices.into_iter().map(|tx| tx.hash()).collect();
381        let response = GetTransactionsResponse {
382            request_id: self.request_id,
383            transactions,
384            tx_hashes,
385        };
386        debug!(
387            "on_get_transactions request {} txs, {} tx hashes, returned {} txs {} tx hashes",
388            self.indices.len(),
389            self.tx_hashes_indices.len(),
390            response.transactions.len(),
391            response.tx_hashes.len(),
392        );
393
394        ctx.send_response(&response)
395    }
396}
397
398impl Encodable for GetTransactions {
399    fn rlp_append(&self, stream: &mut RlpStream) {
400        if self.tx_hashes_indices.is_empty() {
401            stream
402                .begin_list(3)
403                .append(&self.request_id)
404                .append(&self.window_index)
405                .append_list(&self.indices);
406        } else {
407            stream
408                .begin_list(4)
409                .append(&self.request_id)
410                .append(&self.window_index)
411                .append_list(&self.indices)
412                .append_list(&self.tx_hashes_indices);
413        }
414    }
415}
416
417impl Decodable for GetTransactions {
418    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
419        if !(rlp.item_count()? == 3 || rlp.item_count()? == 4) {
420            return Err(DecoderError::RlpIncorrectListLen);
421        }
422        if rlp.item_count()? == 3 {
423            Ok(GetTransactions {
424                request_id: rlp.val_at(0)?,
425                window_index: rlp.val_at(1)?,
426                indices: rlp.list_at(2)?,
427                tx_hashes_indices: vec![],
428                short_ids: HashSet::new(),
429                tx_hashes: HashSet::new(),
430            })
431        } else {
432            Ok(GetTransactions {
433                request_id: rlp.val_at(0)?,
434                window_index: rlp.val_at(1)?,
435                indices: rlp.list_at(2)?,
436                tx_hashes_indices: rlp.list_at(3)?,
437                short_ids: HashSet::new(),
438                tx_hashes: HashSet::new(),
439            })
440        }
441    }
442}
443
444/////////////////////////////////////////////////////////////////////////
445
446#[derive(Debug, PartialEq, DeriveMallocSizeOf)]
447pub struct GetTransactionsFromTxHashes {
448    pub request_id: RequestId,
449    pub window_index: usize,
450    pub indices: Vec<usize>,
451    pub tx_hashes: HashSet<H256>,
452}
453
454impl_request_id_methods!(GetTransactionsFromTxHashes);
455
456impl AsAny for GetTransactionsFromTxHashes {
457    fn as_any(&self) -> &dyn Any { self }
458
459    fn as_any_mut(&mut self) -> &mut dyn Any { self }
460}
461
462mark_msg_version_bound!(
463    GetTransactionsFromTxHashes,
464    SYNC_PROTO_V1,
465    SYNC_PROTO_V3
466);
467impl Message for GetTransactionsFromTxHashes {
468    fn msg_id(&self) -> MsgId { msgid::GET_TRANSACTIONS_FROM_TX_HASHES }
469
470    fn msg_name(&self) -> &'static str { "GetTransactionsFromTxHashes" }
471
472    fn priority(&self) -> SendQueuePriority { SendQueuePriority::Normal }
473
474    fn encode(&self) -> Vec<u8> {
475        let mut encoded = self.rlp_bytes();
476        self.push_msg_id_leb128_encoding(&mut encoded);
477        encoded
478    }
479}
480
481impl Request for GetTransactionsFromTxHashes {
482    fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
483        conf.transaction_request_timeout
484    }
485
486    fn on_removed(&self, inflight_keys: &KeyContainer) {
487        let mut inflight_keys = inflight_keys.write(self.msg_id());
488        for tx_hash in self.tx_hashes.iter() {
489            inflight_keys.remove(&Key::Hash(*tx_hash));
490        }
491    }
492
493    fn with_inflight(&mut self, inflight_keys: &KeyContainer) {
494        let mut inflight_keys = inflight_keys.write(self.msg_id());
495
496        let mut tx_hashes: HashSet<H256> = HashSet::new();
497        for id in self.tx_hashes.iter() {
498            if inflight_keys.insert(Key::Hash(*id)) {
499                tx_hashes.insert(*id);
500            }
501        }
502
503        self.tx_hashes = tx_hashes;
504    }
505
506    fn is_empty(&self) -> bool { self.tx_hashes.is_empty() }
507
508    fn resend(&self) -> Option<Box<dyn Request>> { None }
509}
510
511impl Handleable for GetTransactionsFromTxHashes {
512    fn handle(self, ctx: &Context) -> Result<(), Error> {
513        let transactions = ctx
514            .manager
515            .request_manager
516            .get_sent_transactions(self.window_index, &self.indices);
517
518        let response = GetTransactionsFromTxHashesResponse {
519            request_id: self.request_id,
520            transactions,
521        };
522        debug!(
523            "on_get_transactions_from_tx_hashes request {} txs, returned {} txs",
524            self.indices.len(),
525            response.transactions.len(),
526        );
527
528        ctx.send_response(&response)
529    }
530}
531
532impl Encodable for GetTransactionsFromTxHashes {
533    fn rlp_append(&self, stream: &mut RlpStream) {
534        stream
535            .begin_list(3)
536            .append(&self.request_id)
537            .append(&self.window_index)
538            .append_list(&self.indices);
539    }
540}
541
542impl Decodable for GetTransactionsFromTxHashes {
543    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
544        if rlp.item_count()? != 3 {
545            return Err(DecoderError::RlpIncorrectListLen);
546        }
547
548        Ok(GetTransactionsFromTxHashes {
549            request_id: rlp.val_at(0)?,
550            window_index: rlp.val_at(1)?,
551            indices: rlp.list_at(2)?,
552            tx_hashes: HashSet::new(),
553        })
554    }
555}
556
557///////////////////////////////////////////////////////////////////////
558
559#[derive(Debug, PartialEq, RlpDecodable, RlpEncodable)]
560pub struct GetTransactionsResponse {
561    pub request_id: RequestId,
562    pub transactions: Vec<TransactionWithSignature>,
563    pub tx_hashes: Vec<H256>,
564}
565
566impl Handleable for GetTransactionsResponse {
567    fn handle(self, ctx: &Context) -> Result<(), Error> {
568        let _timer = MeterTimer::time_func(TX_HANDLE_TIMER.as_ref());
569
570        debug!("on_get_transactions_response {:?}", self.request_id);
571
572        let req = ctx.match_request(self.request_id)?;
573        let req = req.downcast_ref::<GetTransactions>(
574            ctx.io,
575            &ctx.manager.request_manager,
576        )?;
577
578        // FIXME: Do some check based on transaction request.
579
580        debug!(
581            "Received {:?} transactions and {:?} tx hashes from Peer {:?}",
582            self.transactions.len(),
583            self.tx_hashes.len(),
584            ctx.node_id
585        );
586
587        // The transaction pool will rely on the execution state information to
588        // verify transaction validity. It may incorrectly accept/reject
589        // transactions when in the catch up mode because the state is still
590        // not correct. We therefore do not insert transactions when in the
591        // catch up mode.
592        if !ctx.manager.catch_up_mode() {
593            let (signed_trans, failure) = ctx
594                .manager
595                .graph
596                .consensus
597                .tx_pool()
598                .insert_new_transactions(self.transactions);
599            if failure.is_empty() {
600                debug!(
601                    "Transactions successfully inserted to transaction pool"
602                );
603            } else {
604                debug!(
605                    "{} transactions are rejected by the transaction pool",
606                    failure.len()
607                );
608                for (tx, e) in failure {
609                    trace!("Transaction {} is rejected by the transaction pool: error = {}", tx, e);
610                }
611            }
612            ctx.manager
613                .request_manager
614                .transactions_received_from_digests(ctx.io, &req, signed_trans);
615
616            if req.tx_hashes_indices.len() > 0 && !self.tx_hashes.is_empty() {
617                ctx.manager
618                    .request_manager
619                    .request_transactions_from_tx_hashes(
620                        ctx.io,
621                        ctx.node_id.clone(),
622                        self.tx_hashes,
623                        req.window_index,
624                        &req.tx_hashes_indices,
625                    );
626            }
627            Ok(())
628        } else {
629            debug!("All {} transactions are not inserted to the transaction pool, because the node is still in the catch up mode", self.transactions.len());
630            Err(Error::InCatchUpMode("transactions discarded for handling on_get_transactions_response messages".to_string()).into())
631        }
632    }
633}
634
635//////////////////////////////////////////////////////////////////////
636
637#[derive(Debug, PartialEq, RlpDecodable, RlpEncodable)]
638pub struct GetTransactionsFromTxHashesResponse {
639    pub request_id: RequestId,
640    pub transactions: Vec<TransactionWithSignature>,
641}
642
643impl Handleable for GetTransactionsFromTxHashesResponse {
644    fn handle(self, ctx: &Context) -> Result<(), Error> {
645        let _timer = MeterTimer::time_func(TX_HANDLE_TIMER.as_ref());
646
647        debug!(
648            "on_get_transactions_from_tx_hashes_response {:?}",
649            self.request_id
650        );
651
652        let req = ctx.match_request(self.request_id)?;
653        let req = req.downcast_ref::<GetTransactionsFromTxHashes>(
654            ctx.io,
655            &ctx.manager.request_manager,
656        )?;
657
658        // FIXME: Do some check based on transaction request.
659
660        debug!(
661            "Received {:?} transactions from Peer {:?}",
662            self.transactions.len(),
663            ctx.node_id
664        );
665
666        // The transaction pool will rely on the execution state information to
667        // verify transaction validity. It may incorrectly accept/reject
668        // transactions when in the catch up mode because the state is still
669        // not correct. We therefore do not insert transactions when in the
670        // catch up mode.
671        if !ctx.manager.catch_up_mode() {
672            let (signed_trans, failure) = ctx
673                .manager
674                .graph
675                .consensus
676                .tx_pool()
677                .insert_new_transactions(self.transactions);
678            if failure.is_empty() {
679                debug!(
680                    "Transactions successfully inserted to transaction pool"
681                );
682            } else {
683                debug!(
684                    "{} transactions are rejected by the transaction pool",
685                    failure.len()
686                );
687                for (tx, e) in failure {
688                    trace!("Transaction {} is rejected by the transaction pool: error = {}", tx, e);
689                }
690            }
691            ctx.manager
692                .request_manager
693                .transactions_received_from_tx_hashes(&req, signed_trans);
694            Ok(())
695        } else {
696            debug!("All {} transactions are not inserted to the transaction pool, because the node is still in the catch up mode", self.transactions.len());
697            Err(Error::InCatchUpMode("transactions discarded for handling on_get_transactions_response messages".to_string()).into())
698        }
699    }
700}