cfxcore/pos/mempool/shared_mempool/
runtime.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use crate::pos::{
9    mempool::{
10        core_mempool::CoreMempool,
11        network::NetworkReceivers,
12        shared_mempool::{
13            coordinator::{coordinator, gc_coordinator},
14            peer_manager::PeerManager,
15            transaction_validator::TransactionValidator,
16            types::{SharedMempool, SharedMempoolNotification},
17        },
18        CommitNotification, ConsensusRequest, SubmissionStatus,
19    },
20    protocol::network_sender::NetworkSender,
21};
22use anyhow::Result;
23use cached_pos_ledger_db::CachedPosLedgerDB;
24use diem_config::config::NodeConfig;
25use diem_types::transaction::SignedTransaction;
26use futures::channel::{
27    mpsc::{self, Receiver, UnboundedSender},
28    oneshot,
29};
30use parking_lot::{Mutex, RwLock};
31use std::sync::Arc;
32use tokio::runtime::{Builder, Handle, Runtime};
33
34/// Bootstrap of SharedMempool.
35/// Creates a separate Tokio Runtime that runs the following routines:
36///   - outbound_sync_task (task that periodically broadcasts transactions to
37///     peers).
38///   - inbound_network_task (task that handles inbound mempool messages and
39///     network events).
40///   - gc_task (task that performs GC of all expired transactions by
41///     SystemTTL).
42pub(crate) fn start_shared_mempool(
43    executor: &Handle, config: &NodeConfig, mempool: Arc<Mutex<CoreMempool>>,
44    network_sender: NetworkSender, network_receivers: NetworkReceivers,
45    client_events: mpsc::Receiver<(
46        SignedTransaction,
47        oneshot::Sender<Result<SubmissionStatus>>,
48    )>,
49    consensus_requests: mpsc::Receiver<ConsensusRequest>,
50    commit_notifications: mpsc::Receiver<CommitNotification>,
51    db_with_cache: Arc<CachedPosLedgerDB>,
52    validator: Arc<RwLock<TransactionValidator>>,
53    subscribers: Vec<UnboundedSender<SharedMempoolNotification>>,
54) {
55    let peer_manager = Arc::new(PeerManager::new());
56
57    let smp = SharedMempool {
58        mempool: mempool.clone(),
59        config: config.mempool.clone(),
60        network_sender,
61        db_with_cache: db_with_cache.clone(),
62        validator,
63        peer_manager,
64        subscribers,
65        commited_pos_state: db_with_cache.db.reader.get_latest_pos_state(),
66    };
67
68    executor.spawn(coordinator(
69        smp,
70        executor.clone(),
71        network_receivers,
72        client_events,
73        consensus_requests,
74        commit_notifications,
75    ));
76
77    executor.spawn(gc_coordinator(
78        mempool.clone(),
79        config.mempool.system_transaction_gc_interval_ms,
80    ));
81}
82
83pub fn bootstrap(
84    config: &NodeConfig, db_with_cache: Arc<CachedPosLedgerDB>,
85    network_sender: NetworkSender, network_receivers: NetworkReceivers,
86    client_events: Receiver<(
87        SignedTransaction,
88        oneshot::Sender<Result<SubmissionStatus>>,
89    )>,
90    consensus_requests: Receiver<ConsensusRequest>,
91    commit_notifications: Receiver<CommitNotification>,
92) -> Runtime {
93    let runtime = Builder::new_multi_thread()
94        .thread_name("shared-mem")
95        .enable_all()
96        .build()
97        .expect("[shared mempool] failed to create runtime");
98    let mempool = Arc::new(Mutex::new(CoreMempool::new(&config)));
99    let validator = Arc::new(RwLock::new(TransactionValidator::new()));
100    start_shared_mempool(
101        runtime.handle(),
102        config,
103        mempool,
104        network_sender,
105        network_receivers,
106        client_events,
107        consensus_requests,
108        commit_notifications,
109        db_with_cache,
110        validator,
111        vec![],
112    );
113    runtime
114}