cfxcore/sync/message/
get_block_txn_response.rs1use crate::{
6 message::RequestId,
7 sync::{
8 message::{
9 metrics::BLOCK_TXN_HANDLE_TIMER, Context, GetBlockTxn, Handleable,
10 },
11 Error,
12 },
13};
14use cfx_types::H256;
15use metrics::MeterTimer;
16use primitives::{Block, TransactionWithSignature};
17use rlp_derive::{RlpDecodable, RlpEncodable};
18use std::collections::HashSet;
19
20#[derive(Debug, PartialEq, Default, RlpEncodable, RlpDecodable)]
21pub struct GetBlockTxnResponse {
22 pub request_id: RequestId,
23 pub block_hash: H256,
24 pub block_txn: Vec<TransactionWithSignature>,
25}
26
27impl Handleable for GetBlockTxnResponse {
28 fn handle(self, ctx: &Context) -> Result<(), Error> {
29 let _timer = MeterTimer::time_func(BLOCK_TXN_HANDLE_TIMER.as_ref());
30
31 debug!("on_get_blocktxn_response, hash={:?}", self.block_hash);
32
33 let resp_hash = self.block_hash;
34 let req = ctx.match_request(self.request_id)?;
35 let delay = req.delay;
36 let req = req.downcast_ref::<GetBlockTxn>(
37 ctx.io,
38 &ctx.manager.request_manager,
39 )?;
40
41 let mut request_from_same_peer = false;
42 let mut received_blocks = HashSet::new();
44 if resp_hash != req.block_hash {
45 warn!("Response blocktxn is not the requested block, req={:?}, resp={:?}", req.block_hash, resp_hash);
46 } else if ctx.manager.graph.contains_block(&resp_hash) {
47 debug!(
48 "Get blocktxn, but full block already received, hash={}",
49 resp_hash
50 );
51 received_blocks.insert(resp_hash);
52 } else if let Some(header) =
53 ctx.manager.graph.block_header_by_hash(&resp_hash)
54 {
55 debug!("Process blocktxn hash={:?}", resp_hash);
56 let signed_txns = ctx
57 .manager
58 .graph
59 .data_man
60 .recover_unsigned_tx_with_order(&self.block_txn)?;
61 match ctx.manager.graph.data_man.compact_block_by_hash(&resp_hash) {
62 Some(cmpct) => {
63 let mut trans =
64 Vec::with_capacity(cmpct.reconstructed_txns.len());
65 let mut index = 0;
66 for tx in cmpct.reconstructed_txns {
67 match tx {
68 Some(tx) => trans.push(tx),
69 None => {
70 trans.push(signed_txns[index].clone());
71 index += 1;
72 }
73 }
74 }
75 let block = Block::new(header, trans);
77 debug!(
78 "transaction received by block: ratio={:?}",
79 self.block_txn.len() as f64
80 / block.transactions.len() as f64
81 );
82 debug!(
83 "new block received: block_header={:?}, tx_count={}, block_size={}",
84 block.block_header,
85 block.transactions.len(),
86 block.size(),
87 );
88 let insert_result = ctx.manager.graph.insert_block(
89 block, true, true, false, );
93
94 if !insert_result.request_again() {
95 received_blocks.insert(resp_hash);
96 }
97 if insert_result.is_valid() {
98 let (signed_txns, _) = ctx
100 .manager
101 .graph
102 .consensus
103 .tx_pool()
104 .insert_new_signed_transactions(signed_txns);
105 ctx.manager
108 .request_manager
109 .append_received_transactions(signed_txns);
110 }
111 if insert_result.should_relay()
112 && !ctx.manager.catch_up_mode()
113 {
114 ctx.manager.relay_blocks(ctx.io, vec![resp_hash]).ok();
115 }
116 if insert_result.request_again() {
117 request_from_same_peer = true;
118 }
119 }
120 None => {
121 warn!(
122 "Get blocktxn, but misses compact block, hash={}",
123 resp_hash
124 );
125 }
126 }
127 } else {
128 warn!("Get blocktxn, but header not received, hash={}", resp_hash);
129 }
130
131 let peer = if request_from_same_peer {
132 Some(ctx.node_id.clone())
133 } else {
134 None
135 };
136 ctx.manager.blocks_received(
137 ctx.io,
138 vec![req.block_hash].into_iter().collect(),
139 received_blocks,
140 true,
141 peer,
142 delay,
143 None, );
145 Ok(())
146 }
147}