cfxcore/pos/mempool/shared_mempool/
runtime.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use crate::pos::{
9    mempool::{
10        core_mempool::CoreMempool,
11        network::NetworkReceivers,
12        shared_mempool::{
13            coordinator::{coordinator, gc_coordinator},
14            peer_manager::PeerManager,
15            transaction_validator::TransactionValidator,
16            types::{SharedMempool, SharedMempoolNotification},
17        },
18        CommitNotification, ConsensusRequest, SubmissionStatus,
19    },
20    protocol::network_sender::NetworkSender,
21};
22use anyhow::Result;
23use cached_pos_ledger_db::CachedPosLedgerDB;
24use diem_config::config::NodeConfig;
25use diem_types::transaction::SignedTransaction;
26use futures::channel::{
27    mpsc::{self, Receiver, UnboundedSender},
28    oneshot,
29};
30use parking_lot::{Mutex, RwLock};
31use std::sync::Arc;
32use tokio::runtime::{Builder, Handle, Runtime};
33
34/// Bootstrap of SharedMempool.
35/// Creates a separate Tokio Runtime that runs the following routines:
36///   - outbound_sync_task (task that periodically broadcasts transactions to
37///     peers).
38///   - inbound_network_task (task that handles inbound mempool messages and
39///     network events).
40///   - gc_task (task that performs GC of all expired transactions by
41///     SystemTTL).
42pub(crate) fn start_shared_mempool(
43    executor: &Handle, config: &NodeConfig, mempool: Arc<Mutex<CoreMempool>>,
44    network_sender: NetworkSender, network_receivers: NetworkReceivers,
45    client_events: mpsc::Receiver<(
46        SignedTransaction,
47        oneshot::Sender<Result<SubmissionStatus>>,
48    )>,
49    consensus_requests: mpsc::Receiver<ConsensusRequest>,
50    commit_notifications: mpsc::Receiver<CommitNotification>,
51    db_with_cache: Arc<CachedPosLedgerDB>,
52    validator: Arc<RwLock<TransactionValidator>>,
53    subscribers: Vec<UnboundedSender<SharedMempoolNotification>>,
54) {
55    let peer_manager = Arc::new(PeerManager::new());
56
57    let smp = SharedMempool {
58        mempool: mempool.clone(),
59        config: config.mempool.clone(),
60        network_sender,
61        db_with_cache: db_with_cache.clone(),
62        validator,
63        peer_manager,
64        subscribers,
65    };
66
67    executor.spawn(coordinator(
68        smp,
69        executor.clone(),
70        network_receivers,
71        client_events,
72        consensus_requests,
73        commit_notifications,
74    ));
75
76    executor.spawn(gc_coordinator(
77        mempool.clone(),
78        config.mempool.system_transaction_gc_interval_ms,
79    ));
80}
81
82pub fn bootstrap(
83    config: &NodeConfig, db_with_cache: Arc<CachedPosLedgerDB>,
84    network_sender: NetworkSender, network_receivers: NetworkReceivers,
85    client_events: Receiver<(
86        SignedTransaction,
87        oneshot::Sender<Result<SubmissionStatus>>,
88    )>,
89    consensus_requests: Receiver<ConsensusRequest>,
90    commit_notifications: Receiver<CommitNotification>,
91) -> Runtime {
92    let runtime = Builder::new_multi_thread()
93        .thread_name("shared-mem")
94        .enable_all()
95        .build()
96        .expect("[shared mempool] failed to create runtime");
97    let mempool = Arc::new(Mutex::new(CoreMempool::new(&config)));
98    let validator = Arc::new(RwLock::new(TransactionValidator::new()));
99    start_shared_mempool(
100        runtime.handle(),
101        config,
102        mempool,
103        network_sender,
104        network_receivers,
105        client_events,
106        consensus_requests,
107        commit_notifications,
108        db_with_cache,
109        validator,
110        vec![],
111    );
112    runtime
113}