cfxcore/sync/message/
get_blocks.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    message::{Message, RequestId},
7    sync::{
8        message::{
9            msgid, Context, GetBlocksResponse, GetBlocksWithPublicResponse,
10            Handleable, Key, KeyContainer,
11        },
12        request_manager::{AsAny, Request},
13        Error, ProtocolConfiguration,
14    },
15    NodeType,
16};
17use cfx_parameters::sync::MAX_PACKET_SIZE;
18use cfx_types::H256;
19use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
20use primitives::Block;
21use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
22use std::{any::Any, time::Duration};
23
24#[derive(Debug, PartialEq, Default, Clone, DeriveMallocSizeOf)]
25pub struct GetBlocks {
26    pub request_id: RequestId,
27    pub with_public: bool,
28    pub hashes: Vec<H256>,
29    // This field specifies what node type of peers this
30    // request needs to be sent to.
31    pub preferred_node_type: Option<NodeType>,
32}
33
34impl Encodable for GetBlocks {
35    fn rlp_append(&self, s: &mut RlpStream) {
36        s.begin_list(3)
37            .append(&self.request_id)
38            .append(&self.with_public)
39            .append_list(&self.hashes);
40    }
41}
42
43impl Decodable for GetBlocks {
44    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
45        Ok(GetBlocks {
46            request_id: rlp.val_at(0)?,
47            with_public: rlp.val_at(1)?,
48            hashes: rlp.list_at(2)?,
49            preferred_node_type: None,
50        })
51    }
52}
53
54impl AsAny for GetBlocks {
55    fn as_any(&self) -> &dyn Any { self }
56
57    fn as_any_mut(&mut self) -> &mut dyn Any { self }
58}
59
60impl Request for GetBlocks {
61    fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
62        conf.blocks_request_timeout
63    }
64
65    fn on_removed(&self, inflight_keys: &KeyContainer) {
66        let mut inflight_blocks = inflight_keys.write(self.msg_id());
67        let mut net_inflight_blocks =
68            inflight_keys.write(msgid::NET_INFLIGHT_BLOCKS);
69        for hash in self.hashes.iter() {
70            inflight_blocks.remove(&Key::Hash(*hash));
71            net_inflight_blocks.remove(&Key::Hash(*hash));
72        }
73    }
74
75    fn with_inflight(&mut self, inflight_keys: &KeyContainer) {
76        let mut inflight_keys = inflight_keys.write(self.msg_id());
77        self.hashes.retain(|h| inflight_keys.insert(Key::Hash(*h)));
78    }
79
80    fn is_empty(&self) -> bool { self.hashes.is_empty() }
81
82    fn resend(&self) -> Option<Box<dyn Request>> {
83        Some(Box::new(self.clone()))
84    }
85
86    fn preferred_node_type(&self) -> Option<NodeType> {
87        self.preferred_node_type.clone()
88    }
89}
90
91impl GetBlocks {
92    fn get_blocks(&self, ctx: &Context, with_public: bool) -> Vec<Block> {
93        debug!("Received GetBlocks: {:?}", self);
94        let mut blocks = Vec::new();
95        let mut packet_size_left = MAX_PACKET_SIZE;
96
97        for hash in self.hashes.iter() {
98            if let Some(block) = ctx.manager.graph.block_by_hash(hash) {
99                let block_size = if with_public {
100                    block.approximated_rlp_size_with_public()
101                } else {
102                    block.approximated_rlp_size()
103                };
104
105                if packet_size_left >= block_size {
106                    packet_size_left -= block_size;
107                    blocks.push(block.as_ref().clone());
108                } else {
109                    break;
110                }
111            }
112        }
113
114        blocks
115    }
116
117    fn send_response_with_public(
118        &self, ctx: &Context, blocks: Vec<Block>,
119    ) -> Result<(), Error> {
120        let mut response = GetBlocksWithPublicResponse {
121            request_id: self.request_id,
122            blocks,
123        };
124
125        while let Err(e) = ctx.send_response(&response) {
126            if GetBlocks::is_oversize_packet_err(&e) {
127                let block_count = response.blocks.len() / 2;
128                response.blocks.truncate(block_count);
129            } else {
130                return Err(e.into());
131            }
132        }
133
134        Ok(())
135    }
136
137    fn is_oversize_packet_err(e: &Error) -> bool {
138        match e {
139            Error::Network(kind) => match kind {
140                network::Error::OversizedPacket => true,
141                _ => false,
142            },
143            _ => false,
144        }
145    }
146
147    fn send_response(
148        &self, ctx: &Context, blocks: Vec<Block>,
149    ) -> Result<(), Error> {
150        let mut response = GetBlocksResponse {
151            request_id: self.request_id,
152            blocks,
153        };
154
155        while let Err(e) = ctx.send_response(&response) {
156            if GetBlocks::is_oversize_packet_err(&e) {
157                let block_count = response.blocks.len() / 2;
158                response.blocks.truncate(block_count);
159            } else {
160                return Err(e.into());
161            }
162        }
163
164        Ok(())
165    }
166}
167
168impl Handleable for GetBlocks {
169    fn handle(self, ctx: &Context) -> Result<(), Error> {
170        let blocks = self.get_blocks(ctx, self.with_public);
171        if self.with_public {
172            self.send_response_with_public(ctx, blocks)
173        } else {
174            self.send_response(ctx, blocks)
175        }
176    }
177}