cfxcore/sync/message/
get_block_headers_response.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    message::{Message, RequestId},
7    sync::{
8        message::{
9            metrics::BLOCK_HEADER_HANDLE_TIMER, Context, GetBlockHeaders,
10            Handleable,
11        },
12        synchronization_state::PeerFilter,
13        Error,
14    },
15};
16use cfx_parameters::{
17    block::ACCEPTABLE_TIME_DRIFT, sync::LOCAL_BLOCK_INFO_QUERY_THRESHOLD,
18};
19use cfx_types::H256;
20use metrics::MeterTimer;
21use network::node_table::NodeId;
22use primitives::BlockHeader;
23use rlp_derive::{RlpDecodable, RlpEncodable};
24use std::{
25    collections::HashSet,
26    time::{Duration, SystemTime, UNIX_EPOCH},
27};
28
29#[derive(Debug, PartialEq, Default, RlpDecodable, RlpEncodable)]
30pub struct GetBlockHeadersResponse {
31    pub request_id: RequestId,
32    pub headers: Vec<BlockHeader>,
33}
34
35impl Handleable for GetBlockHeadersResponse {
36    fn handle(mut self, ctx: &Context) -> Result<(), Error> {
37        let _timer = MeterTimer::time_func(BLOCK_HEADER_HANDLE_TIMER.as_ref());
38
39        for header in &self.headers {
40            debug!(
41                "new block headers received: block_header={:?}, tx_count={}, block_size={}",
42                header,
43                0,
44                0,
45            );
46        }
47
48        if ctx.io.is_peer_self(&ctx.node_id) {
49            let requested = self.headers.iter().map(|h| h.hash()).collect();
50
51            self.handle_block_headers(
52                ctx,
53                &self.headers,
54                requested,
55                None,
56                None,
57            )?;
58            return Ok(());
59        }
60
61        // We may receive some messages from peer during recover from db
62        // phase. We should ignore it, since it may cause some inconsistency.
63        // This will be double checked later with `phase_manager_lock` locked.
64        if ctx.manager.in_recover_from_db_phase() {
65            return Ok(());
66        }
67
68        let req = ctx.match_request(self.request_id)?;
69        let delay = req.delay;
70        let req = req.downcast_ref::<GetBlockHeaders>(
71            ctx.io,
72            &ctx.manager.request_manager,
73        )?;
74
75        // keep first time drift validation error to return later
76        let now_timestamp = SystemTime::now()
77            .duration_since(UNIX_EPOCH)
78            .unwrap()
79            .as_secs();
80
81        let timestamp_validation_result =
82            if ctx.manager.graph.verification_config.verify_timestamp {
83                let original_size = self.headers.len();
84                self.headers.retain(|h| {
85                    ctx.manager
86                        .graph
87                        .verification_config
88                        .validate_header_timestamp(h, now_timestamp)
89                        .is_ok()
90                });
91                if original_size != self.headers.len() {
92                    // Some headers are removed because of invalid timestamps.
93                    Err(Error::InvalidTimestamp.into())
94                } else {
95                    Ok(())
96                }
97            } else {
98                Ok(())
99            };
100
101        let chosen_peer = if timestamp_validation_result.is_ok() {
102            Some(ctx.node_id.clone())
103        } else {
104            PeerFilter::new(self.msg_id())
105                .exclude(ctx.node_id.clone())
106                .select(&ctx.manager.syn)
107        };
108
109        // re-request headers requested but not received
110        let requested: HashSet<H256> = req.hashes.iter().cloned().collect();
111        self.handle_block_headers(
112            ctx,
113            &self.headers,
114            requested,
115            chosen_peer,
116            delay,
117        )?;
118
119        timestamp_validation_result
120    }
121}
122
123impl GetBlockHeadersResponse {
124    // FIXME Remove recursive call if block headers exist db
125    fn handle_block_headers(
126        &self, ctx: &Context, block_headers: &Vec<BlockHeader>,
127        requested: HashSet<H256>, chosen_peer: Option<NodeId>,
128        delay: Option<Duration>,
129    ) -> Result<(), Error> {
130        // This stores the block hashes for blocks without block body.
131        let mut hashes = Vec::new();
132        let mut dependent_hashes_bounded = HashSet::new();
133        let mut dependent_hashes_unbounded = HashSet::new();
134        // This stores the block hashes for blocks which can relay to peers.
135        let mut need_to_relay = Vec::new();
136        let mut returned_headers = HashSet::new();
137        let best_height = ctx.manager.graph.consensus.best_epoch_number();
138        let now_timestamp = SystemTime::now()
139            .duration_since(UNIX_EPOCH)
140            .unwrap()
141            .as_secs();
142        let mut has_invalid_header = false;
143        for header in block_headers {
144            let hash = header.hash();
145            returned_headers.insert(hash);
146            if ctx.manager.graph.contains_block_header(&hash) {
147                // A block header might be loaded from db and sent to the local
148                // queue multiple times, but we should only
149                // process it and request its dependence once.
150                continue;
151            }
152
153            // Check timestamp drift
154            // See comments in verify_header_graph_ready_block()
155            if ctx.manager.graph.verification_config.verify_timestamp {
156                let header_timestamp = header.timestamp();
157                if header_timestamp > now_timestamp {
158                    debug!("Block {} timestamp {} is ahead of the current time {}. Potential time drift!", hash, header_timestamp, now_timestamp);
159                }
160                if header_timestamp > now_timestamp + ACCEPTABLE_TIME_DRIFT {
161                    warn!("The drift is more than the acceptable range ({}s). The processing of block {} will be delayed.", ACCEPTABLE_TIME_DRIFT, hash);
162                    ctx.manager
163                        .graph
164                        .future_blocks
165                        .insert(header.clone(), ctx.node_id);
166                    continue;
167                }
168            }
169
170            // check whether block is in old era
171            let (era_genesis_hash, era_genesis_height) = ctx
172                .manager
173                .graph
174                .get_genesis_hash_and_height_in_current_era();
175            if (header.height() < era_genesis_height)
176                || (header.height() == era_genesis_height
177                    && header.hash() != era_genesis_hash)
178            {
179                // TODO: optimize to make block body empty
180                assert!(true);
181            }
182
183            // insert into sync graph
184            let (insert_result, to_relay) = {
185                let _pm_lock = ctx.manager.phase_manager_lock.lock();
186                // If we insert headers in CatchUpRecoverBlockFromDB,
187                // the bodies may never be requested.
188                // See issue https://github.com/Conflux-Chain/conflux-rust/issues/1869.
189                if ctx.manager.in_recover_from_db_phase() {
190                    // Remove all from inflight keys so they can be requested
191                    // again after we clean up sync graph.
192                    ctx.manager.request_manager.headers_received(
193                        ctx.io,
194                        requested.clone(),
195                        requested,
196                        delay,
197                    );
198                    return Ok(());
199                }
200                ctx.manager.graph.insert_block_header(
201                    &mut header.clone(),
202                    true,  /* need_to_verify */
203                    false, /* bench_mode */
204                    ctx.manager.insert_header_to_consensus(),
205                    true, /* persistent */
206                )
207            };
208            if insert_result.is_invalid() {
209                has_invalid_header = true;
210                continue;
211            } else if !insert_result.is_new_valid() {
212                continue;
213            }
214
215            // check missing dependencies
216            let parent = header.parent_hash();
217            if !ctx.manager.graph.contains_block_header(parent) {
218                if header.height() > best_height
219                    || best_height - header.height()
220                        < LOCAL_BLOCK_INFO_QUERY_THRESHOLD
221                {
222                    dependent_hashes_bounded.insert(*parent);
223                } else {
224                    dependent_hashes_unbounded.insert(*parent);
225                }
226            }
227
228            for referee in header.referee_hashes() {
229                if !ctx.manager.graph.contains_block_header(referee) {
230                    dependent_hashes_unbounded.insert(*referee);
231                }
232            }
233            need_to_relay.extend(to_relay);
234
235            // check block body
236            if !ctx.manager.graph.contains_block(&hash) {
237                hashes.push(hash);
238            }
239        }
240
241        // do not request headers we just received
242        for hash in &returned_headers {
243            dependent_hashes_bounded.remove(hash);
244            dependent_hashes_unbounded.remove(hash);
245        }
246        for hash in &dependent_hashes_bounded {
247            dependent_hashes_unbounded.remove(hash);
248        }
249
250        debug!(
251            "get headers response of hashes:{:?}, requesting block:{:?}",
252            returned_headers, hashes
253        );
254
255        ctx.manager.request_manager.headers_received(
256            ctx.io,
257            requested,
258            returned_headers,
259            delay,
260        );
261
262        // request missing headers. We do not need to request more headers on
263        // the pivot chain after the request_epoch mechanism is applied.
264        ctx.manager.request_block_headers(
265            ctx.io,
266            chosen_peer.clone(),
267            dependent_hashes_bounded.into_iter().collect(),
268            true, /* ignore_db */
269        );
270        ctx.manager.request_block_headers(
271            ctx.io,
272            chosen_peer.clone(),
273            dependent_hashes_unbounded.into_iter().collect(),
274            false, /* ignore_db */
275        );
276
277        if ctx.manager.need_requesting_blocks() {
278            // request missing blocks
279            ctx.manager
280                .request_missing_blocks(ctx.io, chosen_peer, hashes);
281
282            // relay if necessary
283            ctx.manager.relay_blocks(ctx.io, need_to_relay).ok();
284        }
285        if has_invalid_header {
286            return Err(Error::InvalidBlock.into());
287        }
288        Ok(())
289    }
290}