cfxcore/sync/message/
new_block.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::sync::{
6    message::{Context, Handleable},
7    Error,
8};
9use cfx_types::H256;
10use primitives::Block;
11use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
12
13#[derive(Debug, PartialEq)]
14pub struct NewBlock {
15    pub block: Block,
16}
17
18impl Encodable for NewBlock {
19    fn rlp_append(&self, s: &mut RlpStream) { s.append_internal(&self.block); }
20}
21
22impl Decodable for NewBlock {
23    fn decode(d: &Rlp) -> Result<Self, DecoderError> {
24        let block = d.as_val()?;
25        Ok(NewBlock { block })
26    }
27}
28
29impl Handleable for NewBlock {
30    // TODO This is only used in tests now. Maybe we can add a rpc to send full
31    // block and remove NEW_BLOCK from p2p
32    fn handle(self, ctx: &Context) -> Result<(), Error> {
33        // We may receive some messages from peer during recover from db
34        // phase. We should ignore it, since it may cause some
35        // inconsistency.
36        if ctx.manager.in_recover_from_db_phase() {
37            return Ok(());
38        }
39        let mut block = self.block;
40        ctx.manager.graph.data_man.recover_block(&mut block)?;
41
42        debug!(
43            "on_new_block, header={:?} tx_number={}",
44            block.block_header,
45            block.transactions.len()
46        );
47
48        let parent_hash = block.block_header.parent_hash().clone();
49        let referee_hashes = block.block_header.referee_hashes().clone();
50
51        let headers_to_request = std::iter::once(parent_hash)
52            .chain(referee_hashes)
53            .filter(|h| !ctx.manager.graph.contains_block_header(&h))
54            .collect();
55
56        ctx.manager.request_block_headers(
57            ctx.io,
58            Some(ctx.node_id.clone()),
59            headers_to_request,
60            true, /* ignore_db */
61        );
62
63        let need_to_relay = on_new_decoded_block(ctx, block, true, true)?;
64
65        // broadcast the hash of the newly got block
66        ctx.manager.relay_blocks(ctx.io, need_to_relay)
67    }
68}
69
70fn on_new_decoded_block(
71    ctx: &Context, mut block: Block, need_to_verify: bool, persistent: bool,
72) -> Result<Vec<H256>, Error> {
73    let hash = block.block_header.hash();
74    let mut need_to_relay = Vec::new();
75    match ctx.manager.graph.block_header_by_hash(&hash) {
76        Some(header) => block.block_header = header,
77        None => {
78            let (insert_result, to_relay) =
79                ctx.manager.graph.insert_block_header(
80                    &mut block.block_header,
81                    need_to_verify,
82                    false,
83                    false,
84                    true,
85                );
86            if insert_result.is_new_valid() {
87                need_to_relay.extend(to_relay);
88            } else {
89                return Err(Error::InvalidBlock);
90            }
91        }
92    }
93
94    let insert_result = ctx.manager.graph.insert_block(
95        block,
96        need_to_verify,
97        persistent,
98        false, // recover_from_db
99    );
100    if insert_result.should_relay() {
101        need_to_relay.push(hash);
102    }
103    Ok(need_to_relay)
104}