cfxcore/sync/message/
get_blocks_response.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    message::RequestId,
7    sync::{
8        message::{
9            metrics::BLOCK_HANDLE_TIMER, Context, GetBlocks, GetCompactBlocks,
10            Handleable,
11        },
12        synchronization_protocol_handler::RecoverPublicTask,
13        Error,
14    },
15};
16use cfx_types::H256;
17use metrics::MeterTimer;
18use primitives::Block;
19use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
20use rlp_derive::{RlpDecodable, RlpEncodable};
21use std::collections::HashSet;
22
23#[derive(Debug, PartialEq, Default, RlpDecodable, RlpEncodable)]
24pub struct GetBlocksResponse {
25    pub request_id: RequestId,
26    pub blocks: Vec<Block>,
27}
28
29impl Handleable for GetBlocksResponse {
30    fn handle(self, ctx: &Context) -> Result<(), Error> {
31        let _timer = MeterTimer::time_func(BLOCK_HANDLE_TIMER.as_ref());
32
33        debug!(
34            "on_blocks_response, get block hashes {:?}",
35            self.blocks
36                .iter()
37                .map(|b| b.block_header.hash())
38                .collect::<Vec<H256>>()
39        );
40
41        // TODO Check block size in advance to avoid attacks causing OOM.
42        if ctx.manager.is_block_queue_full() {
43            debug!("recover_public_queue is full, discard GetBlocksResponse");
44            return Ok(());
45        }
46
47        for block in &self.blocks {
48            debug!("transaction received by block: ratio=1");
49            debug!(
50                "new block received: block_header={:?}, tx_count={}, block_size={}",
51                block.block_header,
52                block.transactions.len(),
53                block.size(),
54            );
55        }
56
57        let req = ctx.match_request(self.request_id)?;
58        let delay = req.delay;
59        let requested_blocks: HashSet<H256> = req
60            .downcast_ref::<GetBlocks>(ctx.io, &ctx.manager.request_manager)?
61            .hashes
62            .iter()
63            .cloned()
64            .collect();
65
66        ctx.manager.recover_public_queue.dispatch(
67            ctx.io,
68            RecoverPublicTask::new(
69                self.blocks,
70                requested_blocks,
71                ctx.node_id.clone(),
72                false,
73                delay,
74            ),
75        );
76
77        Ok(())
78    }
79}
80
81//////////////////////////////////////////////////////////////
82
83#[derive(Debug, PartialEq, Default)]
84pub struct GetBlocksWithPublicResponse {
85    pub request_id: RequestId,
86    pub blocks: Vec<Block>,
87}
88
89impl Handleable for GetBlocksWithPublicResponse {
90    fn handle(self, ctx: &Context) -> Result<(), Error> {
91        debug!(
92            "on_blocks_with_public_response, get block hashes {:?}",
93            self.blocks
94                .iter()
95                .map(|b| b.block_header.hash())
96                .collect::<Vec<H256>>()
97        );
98        let req = ctx.match_request(self.request_id)?;
99        let delay = req.delay;
100        let req_hashes: HashSet<H256> = if let Ok(req) = req
101            .downcast_ref::<GetCompactBlocks>(
102                ctx.io,
103                &ctx.manager.request_manager,
104            ) {
105            req.hashes.iter().cloned().collect()
106        } else {
107            let req = req.downcast_ref::<GetBlocks>(
108                ctx.io,
109                &ctx.manager.request_manager,
110            )?;
111            req.hashes.iter().cloned().collect()
112        };
113
114        ctx.manager.recover_public_queue.dispatch(
115            ctx.io,
116            RecoverPublicTask::new(
117                self.blocks,
118                req_hashes,
119                ctx.node_id.clone(),
120                false, /* compact */
121                delay,
122            ),
123        );
124
125        Ok(())
126    }
127}
128
129impl Encodable for GetBlocksWithPublicResponse {
130    fn rlp_append(&self, stream: &mut RlpStream) {
131        stream
132            .begin_list(2)
133            .append(&self.request_id)
134            .begin_list(self.blocks.len());
135
136        for block in self.blocks.iter() {
137            stream.begin_list(2).append(&block.block_header);
138            stream.begin_list(block.transactions.len());
139            for tx in &block.transactions {
140                stream.append(tx.as_ref());
141            }
142        }
143    }
144}
145
146impl Decodable for GetBlocksWithPublicResponse {
147    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
148        let request_id = rlp.val_at(0)?;
149        let rlp_blocks = rlp.at(1)?;
150        let mut blocks = Vec::new();
151
152        for i in 0..rlp_blocks.item_count()? {
153            let rlp_block = rlp_blocks.at(i)?;
154            let block = Block::decode_with_tx_public(&rlp_block)
155                .expect("Wrong block rlp format!");
156            blocks.push(block);
157        }
158
159        Ok(GetBlocksWithPublicResponse { request_id, blocks })
160    }
161}