cfxcore/sync/message/
get_block_txn_response.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    message::RequestId,
7    sync::{
8        message::{
9            metrics::BLOCK_TXN_HANDLE_TIMER, Context, GetBlockTxn, Handleable,
10        },
11        Error,
12    },
13};
14use cfx_types::H256;
15use metrics::MeterTimer;
16use primitives::{Block, BlockHeader, TransactionWithSignature};
17use rlp_derive::{RlpDecodable, RlpEncodable};
18use std::collections::HashSet;
19
20#[derive(Debug, PartialEq, Default, RlpEncodable, RlpDecodable)]
21pub struct GetBlockTxnResponse {
22    pub request_id: RequestId,
23    pub block_hash: H256,
24    pub block_txn: Vec<TransactionWithSignature>,
25}
26
27impl Handleable for GetBlockTxnResponse {
28    fn handle(self, ctx: &Context) -> Result<(), Error> {
29        let _timer = MeterTimer::time_func(BLOCK_TXN_HANDLE_TIMER.as_ref());
30
31        debug!("on_get_blocktxn_response, hash={:?}", self.block_hash);
32
33        let resp_hash = self.block_hash;
34        let req = ctx.match_request(self.request_id)?;
35        let delay = req.delay;
36        // `?` here is safe: downcast_ref calls resend_request_to_another_peer
37        // on failure, whose Request::on_removed handles inflight cleanup.
38        let req = req.downcast_ref::<GetBlockTxn>(
39            ctx.io,
40            &ctx.manager.request_manager,
41        )?;
42
43        // Errors found after match_request consumed the inflight request are
44        // carried in `outcome` so blocks_received can run before Err
45        // propagates, otherwise the inflight key would leak and sync would
46        // stall on this block.
47        let outcome: Result<HandleOutcome, Error> = if resp_hash
48            != req.block_hash
49        {
50            // get_block_txn.rs returns block_hash=H256::default() when the
51            // requested block isn't available locally - that is the
52            // protocol's "I don't have it" signal from an honest peer. Only
53            // a non-zero mismatch is suspicious enough to warn; both retry
54            // via another peer without demoting.
55            if resp_hash == H256::default() {
56                debug!(
57                    "Peer {} does not have block {}",
58                    ctx.node_id, req.block_hash
59                );
60            } else {
61                warn!(
62                    "Response blocktxn is not the requested block, \
63                     req={:?}, resp={:?}",
64                    req.block_hash, resp_hash
65                );
66            }
67            Ok(HandleOutcome::RetryAnotherPeer)
68        } else if ctx.manager.graph.contains_block(&resp_hash) {
69            debug!(
70                "Get blocktxn, but full block already received, hash={}",
71                resp_hash
72            );
73            Ok(HandleOutcome::Received)
74        } else if let Some(header) =
75            ctx.manager.graph.block_header_by_hash(&resp_hash)
76        {
77            debug!("Process blocktxn hash={:?}", resp_hash);
78            process_blocktxn(ctx, header, resp_hash, self.block_txn)
79        } else {
80            warn!("Get blocktxn, but header not received, hash={}", resp_hash);
81            Ok(HandleOutcome::RetryAnotherPeer)
82        };
83
84        let (received_blocks, peer) = match &outcome {
85            Ok(HandleOutcome::Received) => {
86                (std::iter::once(resp_hash).collect(), None)
87            }
88            Ok(HandleOutcome::RetrySamePeer) => {
89                (HashSet::new(), Some(ctx.node_id.clone()))
90            }
91            _ => (HashSet::new(), None),
92        };
93        ctx.manager.blocks_received(
94            ctx.io,
95            vec![req.block_hash].into_iter().collect(),
96            received_blocks,
97            true,
98            peer,
99            delay,
100            None, /* preferred_node_type_for_block_request */
101        );
102        outcome.map(|_| ())
103    }
104}
105
/// Classification of a successfully matched `GetBlockTxnResponse`.
///
/// `handle` uses it to decide what to report to `blocks_received`. The
/// derives make outcomes cheap to copy, comparable in tests, and printable
/// in log statements.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HandleOutcome {
    /// The block needs no further request: either the graph already
    /// contained it, or `process_blocktxn` inserted it and the insert result
    /// did not ask for a retry. `handle` reports the response hash as
    /// received and names no retry peer.
    Received,
    /// `insert_block` reported `request_again()`. `handle` reports nothing as
    /// received and passes the responding peer so the retry targets it
    /// again.
    RetrySamePeer,
    /// Nothing was inserted and nothing is reported as received; `handle`
    /// passes no peer, which per the original author's note lets the retry
    /// go through PeerFilter to a different peer. Returned when the response
    /// hash differs from the requested one (including the all-zero "I don't
    /// have it" reply), when the header is not known locally, or when the
    /// compact block is no longer available locally.
    RetryAnotherPeer,
}
117
118fn process_blocktxn(
119    ctx: &Context, header: BlockHeader, resp_hash: H256,
120    block_txn: Vec<TransactionWithSignature>,
121) -> Result<HandleOutcome, Error> {
122    let cmpct =
123        match ctx.manager.graph.data_man.compact_block_by_hash(&resp_hash) {
124            Some(c) => c,
125            None => {
126                warn!(
127                    "Get blocktxn, but misses compact block, hash={}",
128                    resp_hash
129                );
130                return Ok(HandleOutcome::RetryAnotherPeer);
131            }
132        };
133
134    let signed_txns = match ctx
135        .manager
136        .graph
137        .data_man
138        .recover_unsigned_tx_with_order(&block_txn)
139    {
140        Ok(s) => s,
141        Err(e) => {
142            warn!(
143                "Peer {} sent GetBlockTxnResponse with unrecoverable \
144                 transactions for block {}: {}",
145                ctx.node_id, resp_hash, e
146            );
147            return Err(Error::InvalidGetBlockTxn(format!(
148                "transaction signature recovery failed: {}",
149                e
150            )));
151        }
152    };
153
154    let block_txn_len = block_txn.len();
155    let trans = match fill_missing_slots(cmpct.reconstructed_txns, &signed_txns)
156    {
157        Some(t) => t,
158        None => {
159            warn!(
160                "Peer {} sent GetBlockTxnResponse for block {} with {} txs, \
161                 which does not match the compact block's missing-slot count",
162                ctx.node_id, resp_hash, block_txn_len,
163            );
164            return Err(Error::InvalidGetBlockTxn(
165                "block_txn count does not match missing transactions".into(),
166            ));
167        }
168    };
169
170    // transactions_root mismatch is caught by insert_block's
171    // verify_block_integrity.
172    let block = Block::new(header, trans);
173    debug!(
174        "transaction received by block: ratio={:?}",
175        block_txn_len as f64 / block.transactions.len() as f64
176    );
177    debug!(
178        "new block received: block_header={:?}, tx_count={}, block_size={}",
179        block.block_header,
180        block.transactions.len(),
181        block.size(),
182    );
183    let insert_result = ctx.manager.graph.insert_block(
184        block, true,  // need_to_verify
185        true,  // persistent
186        false, // recover_from_db
187    );
188
189    if insert_result.is_valid() {
190        //insert signed transaction to tx pool
191        let (signed_txns, _) = ctx
192            .manager
193            .graph
194            .consensus
195            .tx_pool()
196            .insert_new_signed_transactions(signed_txns);
197        // a transaction from compact block should be
198        // added to received pool
199        ctx.manager
200            .request_manager
201            .append_received_transactions(signed_txns);
202    }
203    if insert_result.should_relay() && !ctx.manager.catch_up_mode() {
204        ctx.manager.relay_blocks(ctx.io, vec![resp_hash]).ok();
205    }
206
207    if insert_result.request_again() {
208        Ok(HandleOutcome::RetrySamePeer)
209    } else {
210        Ok(HandleOutcome::Received)
211    }
212}
213
/// Plugs every hole in `items` with the next value from `candidates`.
///
/// `Some` entries in `items` are kept as they are; each `None` entry takes a
/// clone of the next unused candidate, front to back. The number of
/// candidates has to equal the number of holes: running out of candidates
/// leaves a hole unresolved, and leftover candidates mean the response does
/// not fit the requested layout. Both cases yield `None`.
fn fill_missing_slots<T: Clone>(
    items: Vec<Option<T>>, candidates: &[T],
) -> Option<Vec<T>> {
    let mut unused = candidates.iter();
    let mut filled = Vec::with_capacity(items.len());

    for slot in items {
        let value = match slot {
            Some(known) => known,
            // A hole with no candidate left aborts the whole reconstruction.
            None => unused.next()?.clone(),
        };
        filled.push(value);
    }

    // Candidates left over: the response does not match the compact layout.
    if unused.next().is_some() {
        None
    } else {
        Some(filled)
    }
}
237
#[cfg(test)]
mod tests {
    use super::fill_missing_slots;

    /// A hole with no candidate available must fail the reconstruction.
    #[test]
    fn reject_too_few_candidates() {
        let slots: Vec<Option<u8>> = vec![None];
        assert_eq!(fill_missing_slots(slots, &[]), None);
    }

    /// A candidate with no hole to go into must fail the reconstruction.
    #[test]
    fn reject_too_many_candidates() {
        let slots = vec![Some(1u8)];
        assert_eq!(fill_missing_slots(slots, &[2]), None);
    }

    /// Known values stay in place and the hole takes the candidate.
    #[test]
    fn fill_missing_slots_in_order() {
        let slots = vec![Some(1u8), None, Some(3)];
        assert_eq!(fill_missing_slots(slots, &[2]), Some(vec![1, 2, 3]));
    }
}