cfxcore/sync/message/
get_compact_blocks_response.rs1use crate::{
6 message::RequestId,
7 sync::{
8 message::{
9 metrics::{CMPCT_BLOCK_HANDLE_TIMER, CMPCT_BLOCK_RECOVER_TIMER},
10 Context, GetCompactBlocks, Handleable,
11 },
12 synchronization_protocol_handler::RecoverPublicTask,
13 Error,
14 },
15};
16use cfx_types::H256;
17use metrics::MeterTimer;
18use primitives::{block::CompactBlock, Block};
19use rlp_derive::{RlpDecodable, RlpEncodable};
20use std::collections::HashSet;
21
22#[derive(Debug, PartialEq, Default, RlpDecodable, RlpEncodable)]
23pub struct GetCompactBlocksResponse {
24 pub request_id: RequestId,
25 pub compact_blocks: Vec<CompactBlock>,
26 pub blocks: Vec<Block>,
27}
28
29impl Handleable for GetCompactBlocksResponse {
30 fn handle(self, ctx: &Context) -> Result<(), Error> {
38 let _timer = MeterTimer::time_func(CMPCT_BLOCK_HANDLE_TIMER.as_ref());
39
40 debug!(
41 "on_get_compact_blocks_response request_id={} compact={} block={}",
42 self.request_id,
43 self.compact_blocks.len(),
44 self.blocks.len()
45 );
46
47 if ctx.manager.is_block_queue_full() {
48 warn!("recover_public_queue is full, discard GetCompactBlocksResponse");
49 return Ok(());
50 }
51
52 let req = ctx.match_request(self.request_id)?;
53 let delay = req.delay;
54 let mut to_relay_blocks = Vec::new();
55 let mut received_reconstructed_blocks = Vec::new();
56
57 let mut requested_except_inflight_txn: HashSet<H256> = req
58 .downcast_ref::<GetCompactBlocks>(
59 ctx.io,
60 &ctx.manager.request_manager,
61 )?
62 .hashes
63 .iter()
64 .cloned()
65 .collect();
66
67 for mut cmpct in self.compact_blocks {
68 let hash = cmpct.hash();
69
70 if !requested_except_inflight_txn.contains(&hash) {
71 warn!("Response has not requested compact block {:?}", hash);
72 continue;
73 }
74
75 if ctx.manager.graph.contains_block(&hash) {
76 debug!(
77 "Get cmpct block, but full block already received, hash={}",
78 hash
79 );
80 continue;
81 }
82
83 let header = match ctx.manager.graph.block_header_by_hash(&hash) {
84 Some(header) => header,
85 None => {
86 warn!(
87 "Get cmpct block, but header not received, hash={}",
88 hash
89 );
90 continue;
91 }
92 };
93
94 if ctx.manager.graph.data_man.contains_compact_block(&hash) {
95 debug!("Cmpct block already received, hash={}", hash);
96 continue;
97 }
98
99 debug!("Cmpct block Processing, hash={:?}", hash);
100
101 let missing = {
102 let _timer =
103 MeterTimer::time_func(CMPCT_BLOCK_RECOVER_TIMER.as_ref());
104 ctx.manager
105 .graph
106 .data_man
107 .find_missing_tx_indices_encoded(&mut cmpct)
108 };
109 if !missing.is_empty() {
110 debug!("Request {} missing tx in {}", missing.len(), hash);
111 ctx.manager.graph.data_man.insert_compact_block(cmpct);
112 ctx.manager.request_manager.request_blocktxn(
113 ctx.io,
114 ctx.node_id.clone(),
115 hash,
116 missing,
117 None,
118 );
119 requested_except_inflight_txn.remove(&hash);
121 } else {
122 let trans = cmpct
123 .reconstructed_txns
124 .into_iter()
125 .map(|tx| tx.unwrap())
126 .collect();
127 let block = Block::new(header, trans);
128 debug!("transaction received by block: ratio=0");
129 debug!(
130 "new block received: block_header={:?}, tx_count={}, block_size={}",
131 block.block_header,
132 block.transactions.len(),
133 block.size(),
134 );
135 let insert_result = ctx.manager.graph.insert_block(
136 block, true, true, false, );
140
141 if !insert_result.request_again() {
142 received_reconstructed_blocks.push(hash);
143 }
144 if insert_result.should_relay() {
145 to_relay_blocks.push(hash);
146 }
147 }
148 }
149
150 let mut received_full_blocks = HashSet::new();
153 let mut compact_block_responded_requests =
154 requested_except_inflight_txn;
155 for block in &self.blocks {
156 received_full_blocks.insert(block.hash());
157 compact_block_responded_requests.remove(&block.hash());
158 }
159
160 ctx.manager.blocks_received(
161 ctx.io,
162 compact_block_responded_requests.clone(),
163 received_reconstructed_blocks.iter().cloned().collect(),
164 true,
165 Some(ctx.node_id.clone()),
166 delay,
167 None, );
169
170 ctx.manager.recover_public_queue.dispatch(
171 ctx.io,
172 RecoverPublicTask::new(
173 self.blocks,
174 received_full_blocks,
175 ctx.node_id.clone(),
176 true,
177 delay,
178 ),
179 );
180
181 ctx.manager.relay_blocks(ctx.io, to_relay_blocks)
183 }
184}