cfxcore/pos/mempool/shared_mempool/
network.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8//! Interface between Mempool and Network layers.
9
10use crate::pos::protocol::network_event::NetworkEvent;
11use channel::{diem_channel, message_queues::QueueStyle};
12use diem_types::transaction::SignedTransaction;
13use network::node_table::NodeId;
14use serde::{Deserialize, Serialize};
15use std::{fmt::Formatter, mem::Discriminant};
16
/// Container for exchanging transactions with other Mempools.
///
/// There are two kinds of message: a broadcast request that carries
/// transactions, and the acknowledgement sent back for such a request.
///
/// NOTE(review): the serialized form comes from the derived `Serialize` /
/// `Deserialize` impls, so reordering variants or fields may change the wire
/// format -- confirm against the codec in use before restructuring.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum MempoolSyncMsg {
    /// Broadcast request issued by the sender.
    BroadcastTransactionsRequest {
        /// Unique id of sync request. Can be used by sender for rebroadcast
        /// analysis
        request_id: Vec<u8>,
        /// Signed transactions carried by this broadcast.
        transactions: Vec<SignedTransaction>,
    },
    /// Broadcast ack issued by the receiver.
    BroadcastTransactionsResponse {
        /// Id of the request being acknowledged.
        /// NOTE(review): presumably echoes the `request_id` of the matching
        /// `BroadcastTransactionsRequest` -- confirm against the sender and
        /// receiver code.
        request_id: Vec<u8>,
        /// Retry signal from recipient if there are txns in corresponding
        /// broadcast that were rejected from mempool but may succeed
        /// on resend.
        retry: bool,
        /// A backpressure signal from the recipient when it is overwhelmed
        /// (e.g., mempool is full).
        backoff: bool,
    },
}
39
40impl std::fmt::Display for MempoolSyncMsg {
41    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42        match &self {
43            Self::BroadcastTransactionsRequest { .. } => {
44                write!(f, "BroadcastTransactionsRequest")
45            }
46            Self::BroadcastTransactionsResponse { .. } => {
47                write!(f, "BroadcastTransactionsResponse")
48            }
49        }
50    }
51}
52
53/// Just a convenience struct to keep all the network proxy receiving queues in
54/// one place. Will be returned by the NetworkTask upon startup.
55pub struct NetworkReceivers {
56    /// Provide a LIFO buffer for each (Author, MessageType) key
57    pub mempool_sync_message: diem_channel::Receiver<
58        (NodeId, Discriminant<MempoolSyncMsg>),
59        (NodeId, MempoolSyncMsg),
60    >,
61    pub network_events: diem_channel::Receiver<
62        (NodeId, Discriminant<NetworkEvent>),
63        (NodeId, NetworkEvent),
64    >,
65}
66
67pub struct NetworkTask {
68    pub mempool_sync_message_tx: diem_channel::Sender<
69        (NodeId, Discriminant<MempoolSyncMsg>),
70        (NodeId, MempoolSyncMsg),
71    >,
72    pub network_events_tx: diem_channel::Sender<
73        (NodeId, Discriminant<NetworkEvent>),
74        (NodeId, NetworkEvent),
75    >,
76}
77
78impl NetworkTask {
79    /// Establishes the initial connections with the peers and returns the
80    /// receivers.
81    pub fn new() -> (NetworkTask, NetworkReceivers) {
82        let (mempool_sync_message_tx, mempool_sync_message) = diem_channel::new(
83            QueueStyle::LIFO,
84            1,
85            None, //Some(&counters::CONSENSUS_CHANNEL_MSGS),
86        );
87        let (network_events_tx, network_events) = diem_channel::new(
88            QueueStyle::LIFO,
89            1,
90            None, //Some(&counters::CONSENSUS_CHANNEL_MSGS),
91        );
92        (
93            NetworkTask {
94                mempool_sync_message_tx,
95                network_events_tx,
96            },
97            NetworkReceivers {
98                mempool_sync_message,
99                network_events,
100            },
101        )
102    }
103}