cfxcore/sync/message/
get_blocks_response.rs1use crate::{
6 message::RequestId,
7 sync::{
8 message::{
9 metrics::BLOCK_HANDLE_TIMER, Context, GetBlocks, GetCompactBlocks,
10 Handleable,
11 },
12 synchronization_protocol_handler::RecoverPublicTask,
13 Error,
14 },
15};
16use cfx_types::H256;
17use metrics::MeterTimer;
18use primitives::Block;
19use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
20use rlp_derive::{RlpDecodable, RlpEncodable};
21use std::collections::HashSet;
22
23#[derive(Debug, PartialEq, Default, RlpDecodable, RlpEncodable)]
24pub struct GetBlocksResponse {
25 pub request_id: RequestId,
26 pub blocks: Vec<Block>,
27}
28
29impl Handleable for GetBlocksResponse {
30 fn handle(self, ctx: &Context) -> Result<(), Error> {
31 let _timer = MeterTimer::time_func(BLOCK_HANDLE_TIMER.as_ref());
32
33 debug!(
34 "on_blocks_response, get block hashes {:?}",
35 self.blocks
36 .iter()
37 .map(|b| b.block_header.hash())
38 .collect::<Vec<H256>>()
39 );
40
41 if ctx.manager.is_block_queue_full() {
43 debug!("recover_public_queue is full, discard GetBlocksResponse");
44 return Ok(());
45 }
46
47 for block in &self.blocks {
48 debug!("transaction received by block: ratio=1");
49 debug!(
50 "new block received: block_header={:?}, tx_count={}, block_size={}",
51 block.block_header,
52 block.transactions.len(),
53 block.size(),
54 );
55 }
56
57 let req = ctx.match_request(self.request_id)?;
58 let delay = req.delay;
59 let requested_blocks: HashSet<H256> = req
60 .downcast_ref::<GetBlocks>(ctx.io, &ctx.manager.request_manager)?
61 .hashes
62 .iter()
63 .cloned()
64 .collect();
65
66 ctx.manager.recover_public_queue.dispatch(
67 ctx.io,
68 RecoverPublicTask::new(
69 self.blocks,
70 requested_blocks,
71 ctx.node_id.clone(),
72 false,
73 delay,
74 ),
75 );
76
77 Ok(())
78 }
79}
80
81#[derive(Debug, PartialEq, Default)]
84pub struct GetBlocksWithPublicResponse {
85 pub request_id: RequestId,
86 pub blocks: Vec<Block>,
87}
88
89impl Handleable for GetBlocksWithPublicResponse {
90 fn handle(self, ctx: &Context) -> Result<(), Error> {
91 debug!(
92 "on_blocks_with_public_response, get block hashes {:?}",
93 self.blocks
94 .iter()
95 .map(|b| b.block_header.hash())
96 .collect::<Vec<H256>>()
97 );
98 let req = ctx.match_request(self.request_id)?;
99 let delay = req.delay;
100 let req_hashes: HashSet<H256> = if let Ok(req) = req
101 .downcast_ref::<GetCompactBlocks>(
102 ctx.io,
103 &ctx.manager.request_manager,
104 ) {
105 req.hashes.iter().cloned().collect()
106 } else {
107 let req = req.downcast_ref::<GetBlocks>(
108 ctx.io,
109 &ctx.manager.request_manager,
110 )?;
111 req.hashes.iter().cloned().collect()
112 };
113
114 ctx.manager.recover_public_queue.dispatch(
115 ctx.io,
116 RecoverPublicTask::new(
117 self.blocks,
118 req_hashes,
119 ctx.node_id.clone(),
120 false, delay,
122 ),
123 );
124
125 Ok(())
126 }
127}
128
129impl Encodable for GetBlocksWithPublicResponse {
130 fn rlp_append(&self, stream: &mut RlpStream) {
131 stream
132 .begin_list(2)
133 .append(&self.request_id)
134 .begin_list(self.blocks.len());
135
136 for block in self.blocks.iter() {
137 stream.begin_list(2).append(&block.block_header);
138 stream.begin_list(block.transactions.len());
139 for tx in &block.transactions {
140 stream.append(tx.as_ref());
141 }
142 }
143 }
144}
145
146impl Decodable for GetBlocksWithPublicResponse {
147 fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
148 let request_id = rlp.val_at(0)?;
149 let rlp_blocks = rlp.at(1)?;
150 let mut blocks = Vec::new();
151
152 for i in 0..rlp_blocks.item_count()? {
153 let rlp_block = rlp_blocks.at(i)?;
154 let block = Block::decode_with_tx_public(&rlp_block)
155 .expect("Wrong block rlp format!");
156 blocks.push(block);
157 }
158
159 Ok(GetBlocksWithPublicResponse { request_id, blocks })
160 }
161}