cfxcore/pos/protocol/message/
block_retrieval.rs1use crate::{
6 message::{Message, RequestId},
7 pos::{
8 consensus::network::IncomingBlockRetrievalRequest,
9 protocol::{
10 message::block_retrieval_response::BlockRetrievalRpcResponse,
11 request_manager::{AsAny, Request},
12 sync_protocol::{Context, Handleable, RpcResponse},
13 },
14 },
15 sync::{Error, ProtocolConfiguration},
16};
17use channel::diem_channel::ElementStatus;
18use consensus_types::block_retrieval::{
19 BlockRetrievalRequest, BlockRetrievalResponse, BlockRetrievalStatus,
20};
21use diem_logger::prelude::diem_debug;
22use futures::channel::oneshot;
23use serde::{Deserialize, Serialize};
24use std::{any::Any, time::Duration};
25
26#[derive(Serialize, Deserialize, Debug)]
28pub struct BlockRetrievalRpcRequest {
29 pub request_id: RequestId,
30 pub request: BlockRetrievalRequest,
31 #[serde(skip)]
32 pub is_empty: bool,
33 #[serde(skip)]
34 pub response_tx:
35 Option<oneshot::Sender<Result<Box<dyn RpcResponse>, Error>>>,
36 #[serde(skip)]
37 pub timeout: Duration,
38}
39
40impl AsAny for BlockRetrievalRpcRequest {
41 fn as_any(&self) -> &dyn Any { self }
42
43 fn as_any_mut(&mut self) -> &mut dyn Any { self }
44}
45
46impl Request for BlockRetrievalRpcRequest {
47 fn timeout(&self, _conf: &ProtocolConfiguration) -> Duration {
48 self.timeout
49 }
50
51 fn notify_error(&mut self, error: Error) {
52 let res_tx = self.response_tx.take();
53 if let Some(tx) = res_tx {
54 if let Err(e) = tx.send(Err(error)) {
55 debug!("send ResponseTX EmptyError: e={:?}", e);
57 }
58 }
59 }
60
61 fn set_response_notification(
62 &mut self, res_tx: oneshot::Sender<Result<Box<dyn RpcResponse>, Error>>,
63 ) {
64 self.response_tx = Some(res_tx);
65 }
66}
67
68impl Handleable for BlockRetrievalRpcRequest {
69 fn handle(self, ctx: &Context) -> Result<(), Error> {
70 let peer_address = ctx.get_peer_account_address()?;
71 let req = self.request;
72 diem_debug!(
73 "Received block retrieval request [block id: {}, request_id: {}]",
74 req.block_id(),
75 self.request_id
76 );
77 let req_with_callback = IncomingBlockRetrievalRequest {
78 req,
79 peer_id: ctx.peer,
80 request_id: self.request_id,
81 };
82 let (status_tx, mut status_rx) = oneshot::channel();
86 ctx.manager
87 .consensus_network_task
88 .block_retrieval_tx
89 .push_with_feedback(
90 peer_address,
91 req_with_callback,
92 Some(status_tx),
93 )?;
94 if let Ok(Some(ElementStatus::Dropped(request))) = status_rx.try_recv()
95 {
96 let response = BlockRetrievalRpcResponse {
97 request_id: request.request_id,
98 response: BlockRetrievalResponse::new(
99 BlockRetrievalStatus::IdNotFound,
100 vec![],
101 ),
102 };
103 response.send(ctx.io, &ctx.peer)?;
104 }
105 Ok(())
106 }
107}