// cfxcore/sync/message/get_blocks.rs
use crate::{
    message::{Message, RequestId},
    sync::{
        message::{
            msgid, Context, GetBlocksResponse, GetBlocksWithPublicResponse,
            Handleable, Key, KeyContainer,
        },
        request_manager::{AsAny, Request},
        Error, ProtocolConfiguration,
    },
    NodeType,
};
use cfx_parameters::sync::MAX_PACKET_SIZE;
use cfx_types::H256;
use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
use primitives::Block;
use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
use std::{any::Any, time::Duration};
/// Request message asking a peer for the blocks identified by `hashes`.
///
/// Only `request_id`, `with_public` and `hashes` are part of the RLP
/// encoding; `preferred_node_type` is never serialized and is always `None`
/// on a decoded message.
#[derive(Debug, PartialEq, Default, Clone, DeriveMallocSizeOf)]
pub struct GetBlocks {
    /// Identifier of this request; copied into the response that answers it.
    pub request_id: RequestId,
    /// When `true`, the request is answered with a
    /// `GetBlocksWithPublicResponse` and block sizes are estimated with
    /// `approximated_rlp_size_with_public`; otherwise a plain
    /// `GetBlocksResponse` is sent.
    pub with_public: bool,
    /// Hashes of the requested blocks, looked up in request order.
    pub hashes: Vec<H256>,
    /// Value reported through `Request::preferred_node_type`. Not sent on
    /// the wire.
    pub preferred_node_type: Option<NodeType>,
}
impl Encodable for GetBlocks {
    /// Serializes the request as a three-item RLP list:
    /// `[request_id, with_public, hashes]`.
    ///
    /// `preferred_node_type` is intentionally left out of the encoding.
    fn rlp_append(&self, stream: &mut RlpStream) {
        stream.begin_list(3);
        stream.append(&self.request_id);
        stream.append(&self.with_public);
        stream.append_list(&self.hashes);
    }
}
impl Decodable for GetBlocks {
    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
        Ok(GetBlocks {
            request_id: rlp.val_at(0)?,
            with_public: rlp.val_at(1)?,
            hashes: rlp.list_at(2)?,
            preferred_node_type: None,
        })
    }
}
impl AsAny for GetBlocks {
    /// Exposes this request as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Mutable counterpart of [`as_any`](AsAny::as_any).
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
impl Request for GetBlocks {
    /// Uses the configured `blocks_request_timeout`.
    fn timeout(&self, conf: &ProtocolConfiguration) -> Duration {
        conf.blocks_request_timeout
    }

    /// Drops every requested hash from both the key set of this message id
    /// and the `NET_INFLIGHT_BLOCKS` key set.
    fn on_removed(&self, inflight_keys: &KeyContainer) {
        // Acquire both write guards up front, in this order, and hold them
        // for the whole loop.
        let mut own_inflight = inflight_keys.write(self.msg_id());
        let mut net_inflight = inflight_keys.write(msgid::NET_INFLIGHT_BLOCKS);

        for hash in &self.hashes {
            let key = Key::Hash(*hash);
            own_inflight.remove(&key);
            net_inflight.remove(&key);
        }
    }

    /// Registers the requested hashes in the key set of this message id and
    /// keeps only those for which `insert` returned `true` (presumably the
    /// ones that were not already tracked — confirm against `KeyContainer`).
    fn with_inflight(&mut self, inflight_keys: &KeyContainer) {
        let mut inflight = inflight_keys.write(self.msg_id());
        self.hashes
            .retain(|hash| inflight.insert(Key::Hash(*hash)));
    }

    /// A request with no hashes left is considered empty.
    fn is_empty(&self) -> bool {
        self.hashes.is_empty()
    }

    /// Produces a boxed copy of this request for resending.
    fn resend(&self) -> Option<Box<dyn Request>> {
        let copy = self.clone();
        Some(Box::new(copy))
    }

    /// Returns a copy of the `preferred_node_type` field.
    fn preferred_node_type(&self) -> Option<NodeType> {
        self.preferred_node_type.clone()
    }
}
impl GetBlocks {
    fn get_blocks(&self, ctx: &Context, with_public: bool) -> Vec<Block> {
        debug!("Received GetBlocks: {:?}", self);
        let mut blocks = Vec::new();
        let mut packet_size_left = MAX_PACKET_SIZE;
        for hash in self.hashes.iter() {
            if let Some(block) = ctx.manager.graph.block_by_hash(hash) {
                let block_size = if with_public {
                    block.approximated_rlp_size_with_public()
                } else {
                    block.approximated_rlp_size()
                };
                if packet_size_left >= block_size {
                    packet_size_left -= block_size;
                    blocks.push(block.as_ref().clone());
                } else {
                    break;
                }
            }
        }
        blocks
    }
    fn send_response_with_public(
        &self, ctx: &Context, blocks: Vec<Block>,
    ) -> Result<(), Error> {
        let mut response = GetBlocksWithPublicResponse {
            request_id: self.request_id,
            blocks,
        };
        while let Err(e) = ctx.send_response(&response) {
            if GetBlocks::is_oversize_packet_err(&e) {
                let block_count = response.blocks.len() / 2;
                response.blocks.truncate(block_count);
            } else {
                return Err(e.into());
            }
        }
        Ok(())
    }
    fn is_oversize_packet_err(e: &Error) -> bool {
        match e {
            Error::Network(kind) => match kind {
                network::Error::OversizedPacket => true,
                _ => false,
            },
            _ => false,
        }
    }
    fn send_response(
        &self, ctx: &Context, blocks: Vec<Block>,
    ) -> Result<(), Error> {
        let mut response = GetBlocksResponse {
            request_id: self.request_id,
            blocks,
        };
        while let Err(e) = ctx.send_response(&response) {
            if GetBlocks::is_oversize_packet_err(&e) {
                let block_count = response.blocks.len() / 2;
                response.blocks.truncate(block_count);
            } else {
                return Err(e.into());
            }
        }
        Ok(())
    }
}
impl Handleable for GetBlocks {
    /// Serves the request: gathers the available blocks and replies with the
    /// response type selected by `with_public`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the chosen send helper returns.
    fn handle(self, ctx: &Context) -> Result<(), Error> {
        let with_public = self.with_public;
        let blocks = self.get_blocks(ctx, with_public);

        if !with_public {
            return self.send_response(ctx, blocks);
        }
        self.send_response_with_public(ctx, blocks)
    }
}