cfxcore/sync/message/
get_blocks.rs1use crate::{
6 message::{Message, RequestId},
7 sync::{
8 message::{
9 msgid, Context, GetBlocksResponse, GetBlocksWithPublicResponse,
10 Handleable, Key, KeyContainer,
11 },
12 request_manager::{AsAny, Request},
13 Error, ProtocolConfiguration,
14 },
15 NodeType,
16};
17use cfx_parameters::sync::MAX_PACKET_SIZE;
18use cfx_types::H256;
19use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
20use primitives::Block;
21use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
22use std::{any::Any, time::Duration};
23
24#[derive(Debug, PartialEq, Default, Clone, DeriveMallocSizeOf)]
25pub struct GetBlocks {
26 pub request_id: RequestId,
27 pub with_public: bool,
28 pub hashes: Vec<H256>,
29 pub preferred_node_type: Option<NodeType>,
32}
33
34impl Encodable for GetBlocks {
35 fn rlp_append(&self, s: &mut RlpStream) {
36 s.begin_list(3)
37 .append(&self.request_id)
38 .append(&self.with_public)
39 .append_list(&self.hashes);
40 }
41}
42
43impl Decodable for GetBlocks {
44 fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
45 Ok(GetBlocks {
46 request_id: rlp.val_at(0)?,
47 with_public: rlp.val_at(1)?,
48 hashes: rlp.list_at(2)?,
49 preferred_node_type: None,
50 })
51 }
52}
53
54impl AsAny for GetBlocks {
55 fn as_any(&self) -> &dyn Any { self }
56
57 fn as_any_mut(&mut self) -> &mut dyn Any { self }
58}
59
60impl Request for GetBlocks {
61 fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
62 conf.blocks_request_timeout
63 }
64
65 fn on_removed(&self, inflight_keys: &KeyContainer) {
66 let mut inflight_blocks = inflight_keys.write(self.msg_id());
67 let mut net_inflight_blocks =
68 inflight_keys.write(msgid::NET_INFLIGHT_BLOCKS);
69 for hash in self.hashes.iter() {
70 inflight_blocks.remove(&Key::Hash(*hash));
71 net_inflight_blocks.remove(&Key::Hash(*hash));
72 }
73 }
74
75 fn with_inflight(&mut self, inflight_keys: &KeyContainer) {
76 let mut inflight_keys = inflight_keys.write(self.msg_id());
77 self.hashes.retain(|h| inflight_keys.insert(Key::Hash(*h)));
78 }
79
80 fn is_empty(&self) -> bool { self.hashes.is_empty() }
81
82 fn resend(&self) -> Option<Box<dyn Request>> {
83 Some(Box::new(self.clone()))
84 }
85
86 fn preferred_node_type(&self) -> Option<NodeType> {
87 self.preferred_node_type.clone()
88 }
89}
90
91impl GetBlocks {
92 fn get_blocks(&self, ctx: &Context, with_public: bool) -> Vec<Block> {
93 debug!("Received GetBlocks: {:?}", self);
94 let mut blocks = Vec::new();
95 let mut packet_size_left = MAX_PACKET_SIZE;
96
97 for hash in self.hashes.iter() {
98 if let Some(block) = ctx.manager.graph.block_by_hash(hash) {
99 let block_size = if with_public {
100 block.approximated_rlp_size_with_public()
101 } else {
102 block.approximated_rlp_size()
103 };
104
105 if packet_size_left >= block_size {
106 packet_size_left -= block_size;
107 blocks.push(block.as_ref().clone());
108 } else {
109 break;
110 }
111 }
112 }
113
114 blocks
115 }
116
117 fn send_response_with_public(
118 &self, ctx: &Context, blocks: Vec<Block>,
119 ) -> Result<(), Error> {
120 let mut response = GetBlocksWithPublicResponse {
121 request_id: self.request_id,
122 blocks,
123 };
124
125 while let Err(e) = ctx.send_response(&response) {
126 if GetBlocks::is_oversize_packet_err(&e) {
127 let block_count = response.blocks.len() / 2;
128 response.blocks.truncate(block_count);
129 } else {
130 return Err(e.into());
131 }
132 }
133
134 Ok(())
135 }
136
137 fn is_oversize_packet_err(e: &Error) -> bool {
138 match e {
139 Error::Network(kind) => match kind {
140 network::Error::OversizedPacket => true,
141 _ => false,
142 },
143 _ => false,
144 }
145 }
146
147 fn send_response(
148 &self, ctx: &Context, blocks: Vec<Block>,
149 ) -> Result<(), Error> {
150 let mut response = GetBlocksResponse {
151 request_id: self.request_id,
152 blocks,
153 };
154
155 while let Err(e) = ctx.send_response(&response) {
156 if GetBlocks::is_oversize_packet_err(&e) {
157 let block_count = response.blocks.len() / 2;
158 response.blocks.truncate(block_count);
159 } else {
160 return Err(e.into());
161 }
162 }
163
164 Ok(())
165 }
166}
167
168impl Handleable for GetBlocks {
169 fn handle(self, ctx: &Context) -> Result<(), Error> {
170 let blocks = self.get_blocks(ctx, self.with_public);
171 if self.with_public {
172 self.send_response_with_public(ctx, blocks)
173 } else {
174 self.send_response(ctx, blocks)
175 }
176 }
177}