1use crate::{
6 message::RequestId,
7 sync::{
8 message::{
9 metrics::BLOCK_TXN_HANDLE_TIMER, Context, GetBlockTxn, Handleable,
10 },
11 Error,
12 },
13};
14use cfx_types::H256;
15use metrics::MeterTimer;
16use primitives::{Block, BlockHeader, TransactionWithSignature};
17use rlp_derive::{RlpDecodable, RlpEncodable};
18use std::collections::HashSet;
19
20#[derive(Debug, PartialEq, Default, RlpEncodable, RlpDecodable)]
21pub struct GetBlockTxnResponse {
22 pub request_id: RequestId,
23 pub block_hash: H256,
24 pub block_txn: Vec<TransactionWithSignature>,
25}
26
27impl Handleable for GetBlockTxnResponse {
28 fn handle(self, ctx: &Context) -> Result<(), Error> {
29 let _timer = MeterTimer::time_func(BLOCK_TXN_HANDLE_TIMER.as_ref());
30
31 debug!("on_get_blocktxn_response, hash={:?}", self.block_hash);
32
33 let resp_hash = self.block_hash;
34 let req = ctx.match_request(self.request_id)?;
35 let delay = req.delay;
36 let req = req.downcast_ref::<GetBlockTxn>(
39 ctx.io,
40 &ctx.manager.request_manager,
41 )?;
42
43 let outcome: Result<HandleOutcome, Error> = if resp_hash
48 != req.block_hash
49 {
50 if resp_hash == H256::default() {
56 debug!(
57 "Peer {} does not have block {}",
58 ctx.node_id, req.block_hash
59 );
60 } else {
61 warn!(
62 "Response blocktxn is not the requested block, \
63 req={:?}, resp={:?}",
64 req.block_hash, resp_hash
65 );
66 }
67 Ok(HandleOutcome::RetryAnotherPeer)
68 } else if ctx.manager.graph.contains_block(&resp_hash) {
69 debug!(
70 "Get blocktxn, but full block already received, hash={}",
71 resp_hash
72 );
73 Ok(HandleOutcome::Received)
74 } else if let Some(header) =
75 ctx.manager.graph.block_header_by_hash(&resp_hash)
76 {
77 debug!("Process blocktxn hash={:?}", resp_hash);
78 process_blocktxn(ctx, header, resp_hash, self.block_txn)
79 } else {
80 warn!("Get blocktxn, but header not received, hash={}", resp_hash);
81 Ok(HandleOutcome::RetryAnotherPeer)
82 };
83
84 let (received_blocks, peer) = match &outcome {
85 Ok(HandleOutcome::Received) => {
86 (std::iter::once(resp_hash).collect(), None)
87 }
88 Ok(HandleOutcome::RetrySamePeer) => {
89 (HashSet::new(), Some(ctx.node_id.clone()))
90 }
91 _ => (HashSet::new(), None),
92 };
93 ctx.manager.blocks_received(
94 ctx.io,
95 vec![req.block_hash].into_iter().collect(),
96 received_blocks,
97 true,
98 peer,
99 delay,
100 None, );
102 outcome.map(|_| ())
103 }
104}
105
/// Result of classifying a `GetBlockTxnResponse`.
///
/// `GetBlockTxnResponse::handle` maps each variant to the
/// `(received_blocks, peer)` arguments it passes to `blocks_received`.
/// NOTE(review): the retry semantics implied by the variant names come from
/// `blocks_received`, which is not visible here; confirm against it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HandleOutcome {
    /// The block is available: the response hash is reported in the
    /// received set and no peer is passed.
    Received,
    /// Block insertion asked for the block to be requested again: the
    /// received set is empty and the responding peer's id is passed.
    RetrySamePeer,
    /// The response could not be used: the received set is empty and no
    /// peer is passed.
    RetryAnotherPeer,
}
117
118fn process_blocktxn(
119 ctx: &Context, header: BlockHeader, resp_hash: H256,
120 block_txn: Vec<TransactionWithSignature>,
121) -> Result<HandleOutcome, Error> {
122 let cmpct =
123 match ctx.manager.graph.data_man.compact_block_by_hash(&resp_hash) {
124 Some(c) => c,
125 None => {
126 warn!(
127 "Get blocktxn, but misses compact block, hash={}",
128 resp_hash
129 );
130 return Ok(HandleOutcome::RetryAnotherPeer);
131 }
132 };
133
134 let signed_txns = match ctx
135 .manager
136 .graph
137 .data_man
138 .recover_unsigned_tx_with_order(&block_txn)
139 {
140 Ok(s) => s,
141 Err(e) => {
142 warn!(
143 "Peer {} sent GetBlockTxnResponse with unrecoverable \
144 transactions for block {}: {}",
145 ctx.node_id, resp_hash, e
146 );
147 return Err(Error::InvalidGetBlockTxn(format!(
148 "transaction signature recovery failed: {}",
149 e
150 )));
151 }
152 };
153
154 let block_txn_len = block_txn.len();
155 let trans = match fill_missing_slots(cmpct.reconstructed_txns, &signed_txns)
156 {
157 Some(t) => t,
158 None => {
159 warn!(
160 "Peer {} sent GetBlockTxnResponse for block {} with {} txs, \
161 which does not match the compact block's missing-slot count",
162 ctx.node_id, resp_hash, block_txn_len,
163 );
164 return Err(Error::InvalidGetBlockTxn(
165 "block_txn count does not match missing transactions".into(),
166 ));
167 }
168 };
169
170 let block = Block::new(header, trans);
173 debug!(
174 "transaction received by block: ratio={:?}",
175 block_txn_len as f64 / block.transactions.len() as f64
176 );
177 debug!(
178 "new block received: block_header={:?}, tx_count={}, block_size={}",
179 block.block_header,
180 block.transactions.len(),
181 block.size(),
182 );
183 let insert_result = ctx.manager.graph.insert_block(
184 block, true, true, false, );
188
189 if insert_result.is_valid() {
190 let (signed_txns, _) = ctx
192 .manager
193 .graph
194 .consensus
195 .tx_pool()
196 .insert_new_signed_transactions(signed_txns);
197 ctx.manager
200 .request_manager
201 .append_received_transactions(signed_txns);
202 }
203 if insert_result.should_relay() && !ctx.manager.catch_up_mode() {
204 ctx.manager.relay_blocks(ctx.io, vec![resp_hash]).ok();
205 }
206
207 if insert_result.request_again() {
208 Ok(HandleOutcome::RetrySamePeer)
209 } else {
210 Ok(HandleOutcome::Received)
211 }
212}
213
/// Fills every `None` entry of `items` with the next element of
/// `candidates`, preserving the order of both inputs.
///
/// Returns `Some(filled)` only when `candidates` is consumed exactly: one
/// candidate per empty slot and none left over. Returns `None` when there
/// are too few or too many candidates.
fn fill_missing_slots<T: Clone>(
    items: Vec<Option<T>>, candidates: &[T],
) -> Option<Vec<T>> {
    let mut remaining = candidates.iter();
    let mut filled = Vec::with_capacity(items.len());

    for slot in items {
        let value = match slot {
            Some(present) => present,
            // Running out of candidates while a slot is empty is a mismatch.
            None => remaining.next()?.clone(),
        };
        filled.push(value);
    }

    // Unused candidates mean the caller supplied more than was missing.
    if remaining.next().is_none() {
        Some(filled)
    } else {
        None
    }
}
237
/// Unit tests for `fill_missing_slots`: candidate-count mismatches in both
/// directions, order preservation, and the empty / all-missing boundaries.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn reject_too_few_candidates() {
        assert!(fill_missing_slots(vec![None::<u8>], &[]).is_none());
    }

    #[test]
    fn reject_too_many_candidates() {
        assert!(fill_missing_slots(vec![Some(1u8)], &[2]).is_none());
    }

    #[test]
    fn fill_missing_slots_in_order() {
        assert_eq!(
            fill_missing_slots(vec![Some(1u8), None, Some(3)], &[2]).unwrap(),
            vec![1, 2, 3],
        );
    }

    #[test]
    fn empty_input_yields_empty_output() {
        assert_eq!(
            fill_missing_slots(Vec::<Option<u8>>::new(), &[]),
            Some(Vec::new()),
        );
    }

    #[test]
    fn fill_multiple_gaps_in_order() {
        assert_eq!(
            fill_missing_slots(vec![None, Some(2u8), None, None], &[1, 3, 4]),
            Some(vec![1, 2, 3, 4]),
        );
    }

    #[test]
    fn fill_when_every_slot_is_missing() {
        assert_eq!(
            fill_missing_slots(vec![None, None], &[5u8, 6]),
            Some(vec![5, 6]),
        );
    }

    #[test]
    fn reject_when_candidates_run_out_midway() {
        // The first gap can be filled, the second cannot.
        assert!(
            fill_missing_slots(vec![None, Some(2u8), None], &[1]).is_none()
        );
    }
}