1use crate::{
6 message::{Message, RequestId},
7 sync::{
8 message::{
9 metrics::BLOCK_HEADER_HANDLE_TIMER, Context, GetBlockHeaders,
10 Handleable,
11 },
12 synchronization_state::PeerFilter,
13 Error,
14 },
15};
16use cfx_parameters::{
17 block::ACCEPTABLE_TIME_DRIFT, sync::LOCAL_BLOCK_INFO_QUERY_THRESHOLD,
18};
19use cfx_types::H256;
20use metrics::MeterTimer;
21use network::node_table::NodeId;
22use primitives::BlockHeader;
23use rlp_derive::{RlpDecodable, RlpEncodable};
24use std::{
25 collections::HashSet,
26 time::{Duration, SystemTime, UNIX_EPOCH},
27};
28
29#[derive(Debug, PartialEq, Default, RlpDecodable, RlpEncodable)]
30pub struct GetBlockHeadersResponse {
31 pub request_id: RequestId,
32 pub headers: Vec<BlockHeader>,
33}
34
35impl Handleable for GetBlockHeadersResponse {
36 fn handle(mut self, ctx: &Context) -> Result<(), Error> {
37 let _timer = MeterTimer::time_func(BLOCK_HEADER_HANDLE_TIMER.as_ref());
38
39 for header in &self.headers {
40 debug!(
41 "new block headers received: block_header={:?}, tx_count={}, block_size={}",
42 header,
43 0,
44 0,
45 );
46 }
47
48 if ctx.io.is_peer_self(&ctx.node_id) {
49 let requested = self.headers.iter().map(|h| h.hash()).collect();
50
51 self.handle_block_headers(
52 ctx,
53 &self.headers,
54 requested,
55 None,
56 None,
57 )?;
58 return Ok(());
59 }
60
61 if ctx.manager.in_recover_from_db_phase() {
65 return Ok(());
66 }
67
68 let req = ctx.match_request(self.request_id)?;
69 let delay = req.delay;
70 let req = req.downcast_ref::<GetBlockHeaders>(
71 ctx.io,
72 &ctx.manager.request_manager,
73 )?;
74
75 let now_timestamp = SystemTime::now()
77 .duration_since(UNIX_EPOCH)
78 .unwrap()
79 .as_secs();
80
81 let timestamp_validation_result =
82 if ctx.manager.graph.verification_config.verify_timestamp {
83 let original_size = self.headers.len();
84 self.headers.retain(|h| {
85 ctx.manager
86 .graph
87 .verification_config
88 .validate_header_timestamp(h, now_timestamp)
89 .is_ok()
90 });
91 if original_size != self.headers.len() {
92 Err(Error::InvalidTimestamp.into())
94 } else {
95 Ok(())
96 }
97 } else {
98 Ok(())
99 };
100
101 let chosen_peer = if timestamp_validation_result.is_ok() {
102 Some(ctx.node_id.clone())
103 } else {
104 PeerFilter::new(self.msg_id())
105 .exclude(ctx.node_id.clone())
106 .select(&ctx.manager.syn)
107 };
108
109 let requested: HashSet<H256> = req.hashes.iter().cloned().collect();
111 self.handle_block_headers(
112 ctx,
113 &self.headers,
114 requested,
115 chosen_peer,
116 delay,
117 )?;
118
119 timestamp_validation_result
120 }
121}
122
123impl GetBlockHeadersResponse {
124 fn handle_block_headers(
126 &self, ctx: &Context, block_headers: &Vec<BlockHeader>,
127 requested: HashSet<H256>, chosen_peer: Option<NodeId>,
128 delay: Option<Duration>,
129 ) -> Result<(), Error> {
130 let mut hashes = Vec::new();
132 let mut dependent_hashes_bounded = HashSet::new();
133 let mut dependent_hashes_unbounded = HashSet::new();
134 let mut need_to_relay = Vec::new();
136 let mut returned_headers = HashSet::new();
137 let best_height = ctx.manager.graph.consensus.best_epoch_number();
138 let now_timestamp = SystemTime::now()
139 .duration_since(UNIX_EPOCH)
140 .unwrap()
141 .as_secs();
142 let mut has_invalid_header = false;
143 for header in block_headers {
144 let hash = header.hash();
145 returned_headers.insert(hash);
146 if ctx.manager.graph.contains_block_header(&hash) {
147 continue;
151 }
152
153 if ctx.manager.graph.verification_config.verify_timestamp {
156 let header_timestamp = header.timestamp();
157 if header_timestamp > now_timestamp {
158 debug!("Block {} timestamp {} is ahead of the current time {}. Potential time drift!", hash, header_timestamp, now_timestamp);
159 }
160 if header_timestamp > now_timestamp + ACCEPTABLE_TIME_DRIFT {
161 warn!("The drift is more than the acceptable range ({}s). The processing of block {} will be delayed.", ACCEPTABLE_TIME_DRIFT, hash);
162 ctx.manager
163 .graph
164 .future_blocks
165 .insert(header.clone(), ctx.node_id);
166 continue;
167 }
168 }
169
170 let (era_genesis_hash, era_genesis_height) = ctx
172 .manager
173 .graph
174 .get_genesis_hash_and_height_in_current_era();
175 if (header.height() < era_genesis_height)
176 || (header.height() == era_genesis_height
177 && header.hash() != era_genesis_hash)
178 {
179 assert!(true);
181 }
182
183 let (insert_result, to_relay) = {
185 let _pm_lock = ctx.manager.phase_manager_lock.lock();
186 if ctx.manager.in_recover_from_db_phase() {
190 ctx.manager.request_manager.headers_received(
193 ctx.io,
194 requested.clone(),
195 requested,
196 delay,
197 );
198 return Ok(());
199 }
200 ctx.manager.graph.insert_block_header(
201 &mut header.clone(),
202 true, false, ctx.manager.insert_header_to_consensus(),
205 true, )
207 };
208 if insert_result.is_invalid() {
209 has_invalid_header = true;
210 continue;
211 } else if !insert_result.is_new_valid() {
212 continue;
213 }
214
215 let parent = header.parent_hash();
217 if !ctx.manager.graph.contains_block_header(parent) {
218 if header.height() > best_height
219 || best_height - header.height()
220 < LOCAL_BLOCK_INFO_QUERY_THRESHOLD
221 {
222 dependent_hashes_bounded.insert(*parent);
223 } else {
224 dependent_hashes_unbounded.insert(*parent);
225 }
226 }
227
228 for referee in header.referee_hashes() {
229 if !ctx.manager.graph.contains_block_header(referee) {
230 dependent_hashes_unbounded.insert(*referee);
231 }
232 }
233 need_to_relay.extend(to_relay);
234
235 if !ctx.manager.graph.contains_block(&hash) {
237 hashes.push(hash);
238 }
239 }
240
241 for hash in &returned_headers {
243 dependent_hashes_bounded.remove(hash);
244 dependent_hashes_unbounded.remove(hash);
245 }
246 for hash in &dependent_hashes_bounded {
247 dependent_hashes_unbounded.remove(hash);
248 }
249
250 debug!(
251 "get headers response of hashes:{:?}, requesting block:{:?}",
252 returned_headers, hashes
253 );
254
255 ctx.manager.request_manager.headers_received(
256 ctx.io,
257 requested,
258 returned_headers,
259 delay,
260 );
261
262 ctx.manager.request_block_headers(
265 ctx.io,
266 chosen_peer.clone(),
267 dependent_hashes_bounded.into_iter().collect(),
268 true, );
270 ctx.manager.request_block_headers(
271 ctx.io,
272 chosen_peer.clone(),
273 dependent_hashes_unbounded.into_iter().collect(),
274 false, );
276
277 if ctx.manager.need_requesting_blocks() {
278 ctx.manager
280 .request_missing_blocks(ctx.io, chosen_peer, hashes);
281
282 ctx.manager.relay_blocks(ctx.io, need_to_relay).ok();
284 }
285 if has_invalid_header {
286 return Err(Error::InvalidBlock.into());
287 }
288 Ok(())
289 }
290}