cfxcore/sync/message/
get_block_txn_response.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    message::RequestId,
7    sync::{
8        message::{
9            metrics::BLOCK_TXN_HANDLE_TIMER, Context, GetBlockTxn, Handleable,
10        },
11        Error,
12    },
13};
14use cfx_types::H256;
15use metrics::MeterTimer;
16use primitives::{Block, TransactionWithSignature};
17use rlp_derive::{RlpDecodable, RlpEncodable};
18use std::collections::HashSet;
19
20#[derive(Debug, PartialEq, Default, RlpEncodable, RlpDecodable)]
21pub struct GetBlockTxnResponse {
22    pub request_id: RequestId,
23    pub block_hash: H256,
24    pub block_txn: Vec<TransactionWithSignature>,
25}
26
27impl Handleable for GetBlockTxnResponse {
28    fn handle(self, ctx: &Context) -> Result<(), Error> {
29        let _timer = MeterTimer::time_func(BLOCK_TXN_HANDLE_TIMER.as_ref());
30
31        debug!("on_get_blocktxn_response, hash={:?}", self.block_hash);
32
33        let resp_hash = self.block_hash;
34        let req = ctx.match_request(self.request_id)?;
35        let delay = req.delay;
36        let req = req.downcast_ref::<GetBlockTxn>(
37            ctx.io,
38            &ctx.manager.request_manager,
39        )?;
40
41        let mut request_from_same_peer = false;
42        // There can be at most one success block in this set.
43        let mut received_blocks = HashSet::new();
44        if resp_hash != req.block_hash {
45            warn!("Response blocktxn is not the requested block, req={:?}, resp={:?}", req.block_hash, resp_hash);
46        } else if ctx.manager.graph.contains_block(&resp_hash) {
47            debug!(
48                "Get blocktxn, but full block already received, hash={}",
49                resp_hash
50            );
51            received_blocks.insert(resp_hash);
52        } else if let Some(header) =
53            ctx.manager.graph.block_header_by_hash(&resp_hash)
54        {
55            debug!("Process blocktxn hash={:?}", resp_hash);
56            let signed_txns = ctx
57                .manager
58                .graph
59                .data_man
60                .recover_unsigned_tx_with_order(&self.block_txn)?;
61            match ctx.manager.graph.data_man.compact_block_by_hash(&resp_hash) {
62                Some(cmpct) => {
63                    let mut trans =
64                        Vec::with_capacity(cmpct.reconstructed_txns.len());
65                    let mut index = 0;
66                    for tx in cmpct.reconstructed_txns {
67                        match tx {
68                            Some(tx) => trans.push(tx),
69                            None => {
70                                trans.push(signed_txns[index].clone());
71                                index += 1;
72                            }
73                        }
74                    }
75                    // FIXME Should check if hash matches
76                    let block = Block::new(header, trans);
77                    debug!(
78                        "transaction received by block: ratio={:?}",
79                        self.block_txn.len() as f64
80                            / block.transactions.len() as f64
81                    );
82                    debug!(
83                        "new block received: block_header={:?}, tx_count={}, block_size={}",
84                        block.block_header,
85                        block.transactions.len(),
86                        block.size(),
87                    );
88                    let insert_result = ctx.manager.graph.insert_block(
89                        block, true,  // need_to_verify
90                        true,  // persistent
91                        false, // recover_from_db
92                    );
93
94                    if !insert_result.request_again() {
95                        received_blocks.insert(resp_hash);
96                    }
97                    if insert_result.is_valid() {
98                        //insert signed transaction to tx pool
99                        let (signed_txns, _) = ctx
100                            .manager
101                            .graph
102                            .consensus
103                            .tx_pool()
104                            .insert_new_signed_transactions(signed_txns);
105                        // a transaction from compact block should be
106                        // added to received pool
107                        ctx.manager
108                            .request_manager
109                            .append_received_transactions(signed_txns);
110                    }
111                    if insert_result.should_relay()
112                        && !ctx.manager.catch_up_mode()
113                    {
114                        ctx.manager.relay_blocks(ctx.io, vec![resp_hash]).ok();
115                    }
116                    if insert_result.request_again() {
117                        request_from_same_peer = true;
118                    }
119                }
120                None => {
121                    warn!(
122                        "Get blocktxn, but misses compact block, hash={}",
123                        resp_hash
124                    );
125                }
126            }
127        } else {
128            warn!("Get blocktxn, but header not received, hash={}", resp_hash);
129        }
130
131        let peer = if request_from_same_peer {
132            Some(ctx.node_id.clone())
133        } else {
134            None
135        };
136        ctx.manager.blocks_received(
137            ctx.io,
138            vec![req.block_hash].into_iter().collect(),
139            received_blocks,
140            true,
141            peer,
142            delay,
143            None, /* preferred_node_type_for_block_request */
144        );
145        Ok(())
146    }
147}