cfxcore/pos/protocol/message/
block_retrieval.rs

1// Copyright 2019-2020 Conflux Foundation. All rights reserved.
2// TreeGraph is free software and distributed under Apache License 2.0.
3// See https://www.apache.org/licenses/LICENSE-2.0
4
5use crate::{
6    message::{Message, RequestId},
7    pos::{
8        consensus::network::IncomingBlockRetrievalRequest,
9        protocol::{
10            message::block_retrieval_response::BlockRetrievalRpcResponse,
11            request_manager::{AsAny, Request},
12            sync_protocol::{Context, Handleable, RpcResponse},
13        },
14    },
15    sync::{Error, ProtocolConfiguration},
16};
17use channel::diem_channel::ElementStatus;
18use consensus_types::block_retrieval::{
19    BlockRetrievalRequest, BlockRetrievalResponse, BlockRetrievalStatus,
20};
21use diem_logger::prelude::diem_debug;
22use futures::channel::oneshot;
23use serde::{Deserialize, Serialize};
24use std::{any::Any, time::Duration};
25
// Earlier, wider derive list kept for reference.
// NOTE(review): `Clone`/`PartialEq`/`Eq` were presumably dropped because
// `response_tx` holds a `oneshot::Sender` -- confirm before re-enabling.
//#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
/// RPC message asking a peer for blocks, wrapping a consensus-level
/// [`BlockRetrievalRequest`] together with local bookkeeping.
///
/// Only `request_id` and `request` take part in (de)serialization; the
/// remaining fields are marked `#[serde(skip)]` and exist only on the local
/// node.
#[derive(Serialize, Deserialize, Debug)]
pub struct BlockRetrievalRpcRequest {
    /// Identifier of this request. It is forwarded with the request to the
    /// consensus layer and echoed back in the response built by `handle`.
    pub request_id: RequestId,
    /// The consensus-level retrieval request carried by this message.
    pub request: BlockRetrievalRequest,
    /// Local-only flag, excluded from (de)serialization.
    // NOTE(review): not read by any impl visible in this file -- confirm
    // its meaning against the code that constructs this struct.
    #[serde(skip)]
    pub is_empty: bool,
    /// Local-only channel installed by `set_response_notification`; it is
    /// taken and used by `notify_error` to report a failure to the waiter.
    #[serde(skip)]
    pub response_tx:
        Option<oneshot::Sender<Result<Box<dyn RpcResponse>, Error>>>,
    /// Local-only timeout for this request; returned as-is by
    /// `Request::timeout`, ignoring the protocol configuration.
    #[serde(skip)]
    pub timeout: Duration,
}
39
40impl AsAny for BlockRetrievalRpcRequest {
41    fn as_any(&self) -> &dyn Any { self }
42
43    fn as_any_mut(&mut self) -> &mut dyn Any { self }
44}
45
46impl Request for BlockRetrievalRpcRequest {
47    fn timeout(&self, _conf: &ProtocolConfiguration) -> Duration {
48        self.timeout
49    }
50
51    fn notify_error(&mut self, error: Error) {
52        let res_tx = self.response_tx.take();
53        if let Some(tx) = res_tx {
54            if let Err(e) = tx.send(Err(error)) {
55                // receiver dropped, we can just drop this error.
56                debug!("send ResponseTX EmptyError: e={:?}", e);
57            }
58        }
59    }
60
61    fn set_response_notification(
62        &mut self, res_tx: oneshot::Sender<Result<Box<dyn RpcResponse>, Error>>,
63    ) {
64        self.response_tx = Some(res_tx);
65    }
66}
67
68impl Handleable for BlockRetrievalRpcRequest {
69    fn handle(self, ctx: &Context) -> Result<(), Error> {
70        let peer_address = ctx.get_peer_account_address()?;
71        let req = self.request;
72        diem_debug!(
73            "Received block retrieval request [block id: {}, request_id: {}]",
74            req.block_id(),
75            self.request_id
76        );
77        let req_with_callback = IncomingBlockRetrievalRequest {
78            req,
79            peer_id: ctx.peer,
80            request_id: self.request_id,
81        };
82        // We only keep one pending retrieval with a LIFO style, so if an old
83        // request is dropped, we respond with an empty response so the
84        // sender does not need to wait for timeout.
85        let (status_tx, mut status_rx) = oneshot::channel();
86        ctx.manager
87            .consensus_network_task
88            .block_retrieval_tx
89            .push_with_feedback(
90                peer_address,
91                req_with_callback,
92                Some(status_tx),
93            )?;
94        if let Ok(Some(ElementStatus::Dropped(request))) = status_rx.try_recv()
95        {
96            let response = BlockRetrievalRpcResponse {
97                request_id: request.request_id,
98                response: BlockRetrievalResponse::new(
99                    BlockRetrievalStatus::IdNotFound,
100                    vec![],
101                ),
102            };
103            response.send(ctx.io, &ctx.peer)?;
104        }
105        Ok(())
106    }
107}