cfxcore/sync/message/
get_compact_blocks_response.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    message::RequestId,
7    sync::{
8        message::{
9            metrics::{CMPCT_BLOCK_HANDLE_TIMER, CMPCT_BLOCK_RECOVER_TIMER},
10            Context, GetCompactBlocks, Handleable,
11        },
12        synchronization_protocol_handler::RecoverPublicTask,
13        Error,
14    },
15};
16use cfx_types::H256;
17use metrics::MeterTimer;
18use primitives::{block::CompactBlock, Block};
19use rlp_derive::{RlpDecodable, RlpEncodable};
20use std::collections::HashSet;
21
22#[derive(Debug, PartialEq, Default, RlpDecodable, RlpEncodable)]
23pub struct GetCompactBlocksResponse {
24    pub request_id: RequestId,
25    pub compact_blocks: Vec<CompactBlock>,
26    pub blocks: Vec<Block>,
27}
28
29impl Handleable for GetCompactBlocksResponse {
30    /// For requested compact block,
31    ///     if a compact block is returned
32    ///         if it is recoverable and reconstructed block is valid,
33    ///             it's removed from requested_manager
34    ///         if it is recoverable and reconstructed block is not valid,
35    ///             it's sent to requested_manager as requested but not received
36    /// block, and the full block
37    fn handle(self, ctx: &Context) -> Result<(), Error> {
38        let _timer = MeterTimer::time_func(CMPCT_BLOCK_HANDLE_TIMER.as_ref());
39
40        debug!(
41            "on_get_compact_blocks_response request_id={} compact={} block={}",
42            self.request_id,
43            self.compact_blocks.len(),
44            self.blocks.len()
45        );
46
47        if ctx.manager.is_block_queue_full() {
48            warn!("recover_public_queue is full, discard GetCompactBlocksResponse");
49            return Ok(());
50        }
51
52        let req = ctx.match_request(self.request_id)?;
53        let delay = req.delay;
54        let mut to_relay_blocks = Vec::new();
55        let mut received_reconstructed_blocks = Vec::new();
56
57        let mut requested_except_inflight_txn: HashSet<H256> = req
58            .downcast_ref::<GetCompactBlocks>(
59                ctx.io,
60                &ctx.manager.request_manager,
61            )?
62            .hashes
63            .iter()
64            .cloned()
65            .collect();
66
67        for mut cmpct in self.compact_blocks {
68            let hash = cmpct.hash();
69
70            if !requested_except_inflight_txn.contains(&hash) {
71                warn!("Response has not requested compact block {:?}", hash);
72                continue;
73            }
74
75            if ctx.manager.graph.contains_block(&hash) {
76                debug!(
77                    "Get cmpct block, but full block already received, hash={}",
78                    hash
79                );
80                continue;
81            }
82
83            let header = match ctx.manager.graph.block_header_by_hash(&hash) {
84                Some(header) => header,
85                None => {
86                    warn!(
87                        "Get cmpct block, but header not received, hash={}",
88                        hash
89                    );
90                    continue;
91                }
92            };
93
94            if ctx.manager.graph.data_man.contains_compact_block(&hash) {
95                debug!("Cmpct block already received, hash={}", hash);
96                continue;
97            }
98
99            debug!("Cmpct block Processing, hash={:?}", hash);
100
101            let missing = {
102                let _timer =
103                    MeterTimer::time_func(CMPCT_BLOCK_RECOVER_TIMER.as_ref());
104                ctx.manager
105                    .graph
106                    .data_man
107                    .find_missing_tx_indices_encoded(&mut cmpct)
108            };
109            if !missing.is_empty() {
110                debug!("Request {} missing tx in {}", missing.len(), hash);
111                ctx.manager.graph.data_man.insert_compact_block(cmpct);
112                ctx.manager.request_manager.request_blocktxn(
113                    ctx.io,
114                    ctx.node_id.clone(),
115                    hash,
116                    missing,
117                    None,
118                );
119                // The block remains inflight.
120                requested_except_inflight_txn.remove(&hash);
121            } else {
122                let trans = cmpct
123                    .reconstructed_txns
124                    .into_iter()
125                    .map(|tx| tx.unwrap())
126                    .collect();
127                let block = Block::new(header, trans);
128                debug!("transaction received by block: ratio=0");
129                debug!(
130                    "new block received: block_header={:?}, tx_count={}, block_size={}",
131                    block.block_header,
132                    block.transactions.len(),
133                    block.size(),
134                );
135                let insert_result = ctx.manager.graph.insert_block(
136                    block, true,  // need_to_verify
137                    true,  // persistent
138                    false, // recover_from_db
139                );
140
141                if !insert_result.request_again() {
142                    received_reconstructed_blocks.push(hash);
143                }
144                if insert_result.should_relay() {
145                    to_relay_blocks.push(hash);
146                }
147            }
148        }
149
150        // We cannot just mark `self.blocks` as completed here because they
151        // might be invalid.
152        let mut received_full_blocks = HashSet::new();
153        let mut compact_block_responded_requests =
154            requested_except_inflight_txn;
155        for block in &self.blocks {
156            received_full_blocks.insert(block.hash());
157            compact_block_responded_requests.remove(&block.hash());
158        }
159
160        ctx.manager.blocks_received(
161            ctx.io,
162            compact_block_responded_requests.clone(),
163            received_reconstructed_blocks.iter().cloned().collect(),
164            true,
165            Some(ctx.node_id.clone()),
166            delay,
167            None, /* preferred_node_type_for_block_request */
168        );
169
170        ctx.manager.recover_public_queue.dispatch(
171            ctx.io,
172            RecoverPublicTask::new(
173                self.blocks,
174                received_full_blocks,
175                ctx.node_id.clone(),
176                true,
177                delay,
178            ),
179        );
180
181        // Broadcast completed block_header_ready blocks
182        ctx.manager.relay_blocks(ctx.io, to_relay_blocks)
183    }
184}