cfxcore/pos/mempool/shared_mempool/
runtime.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use crate::pos::{
9    mempool::{
10        core_mempool::CoreMempool,
11        network::NetworkReceivers,
12        shared_mempool::{
13            coordinator::{coordinator, gc_coordinator},
14            peer_manager::PeerManager,
15            transaction_validator::TransactionValidator,
16            types::{SharedMempool, SharedMempoolNotification},
17        },
18        CommitNotification, ConsensusRequest, SubmissionStatus,
19    },
20    protocol::network_sender::NetworkSender,
21};
22use anyhow::Result;
23use cached_pos_ledger_db::CachedPosLedgerDB;
24use channel::diem_channel;
25use diem_config::config::NodeConfig;
26use diem_infallible::{Mutex, RwLock};
27use diem_types::{
28    on_chain_config::OnChainConfigPayload, transaction::SignedTransaction,
29};
30use futures::channel::{
31    mpsc::{self, Receiver, UnboundedSender},
32    oneshot,
33};
34use std::sync::Arc;
35use tokio::runtime::{Builder, Handle, Runtime};
36
37/// Bootstrap of SharedMempool.
38/// Creates a separate Tokio Runtime that runs the following routines:
39///   - outbound_sync_task (task that periodically broadcasts transactions to
40///     peers).
41///   - inbound_network_task (task that handles inbound mempool messages and
42///     network events).
43///   - gc_task (task that performs GC of all expired transactions by
44///     SystemTTL).
45pub(crate) fn start_shared_mempool(
46    executor: &Handle, config: &NodeConfig, mempool: Arc<Mutex<CoreMempool>>,
47    network_sender: NetworkSender, network_receivers: NetworkReceivers,
48    client_events: mpsc::Receiver<(
49        SignedTransaction,
50        oneshot::Sender<Result<SubmissionStatus>>,
51    )>,
52    consensus_requests: mpsc::Receiver<ConsensusRequest>,
53    state_sync_requests: mpsc::Receiver<CommitNotification>,
54    mempool_reconfig_events: diem_channel::Receiver<(), OnChainConfigPayload>,
55    db_with_cache: Arc<CachedPosLedgerDB>,
56    validator: Arc<RwLock<TransactionValidator>>,
57    subscribers: Vec<UnboundedSender<SharedMempoolNotification>>,
58) {
59    let peer_manager =
60        Arc::new(PeerManager::new(config.base.role, config.mempool.clone()));
61
62    let smp = SharedMempool {
63        mempool: mempool.clone(),
64        config: config.mempool.clone(),
65        network_sender,
66        db_with_cache: db_with_cache.clone(),
67        validator,
68        peer_manager,
69        subscribers,
70        commited_pos_state: db_with_cache.db.reader.get_latest_pos_state(),
71    };
72
73    executor.spawn(coordinator(
74        smp,
75        executor.clone(),
76        network_receivers,
77        client_events,
78        consensus_requests,
79        state_sync_requests,
80        mempool_reconfig_events,
81    ));
82
83    executor.spawn(gc_coordinator(
84        mempool.clone(),
85        config.mempool.system_transaction_gc_interval_ms,
86    ));
87}
88
89pub fn bootstrap(
90    config: &NodeConfig, db_with_cache: Arc<CachedPosLedgerDB>,
91    network_sender: NetworkSender, network_receivers: NetworkReceivers,
92    client_events: Receiver<(
93        SignedTransaction,
94        oneshot::Sender<Result<SubmissionStatus>>,
95    )>,
96    consensus_requests: Receiver<ConsensusRequest>,
97    state_sync_requests: Receiver<CommitNotification>,
98    mempool_reconfig_events: diem_channel::Receiver<(), OnChainConfigPayload>,
99) -> Runtime {
100    let runtime = Builder::new_multi_thread()
101        .thread_name("shared-mem")
102        .enable_all()
103        .build()
104        .expect("[shared mempool] failed to create runtime");
105    let mempool = Arc::new(Mutex::new(CoreMempool::new(&config)));
106    let validator = Arc::new(RwLock::new(TransactionValidator::new()));
107    start_shared_mempool(
108        runtime.handle(),
109        config,
110        mempool,
111        network_sender,
112        network_receivers,
113        client_events,
114        consensus_requests,
115        state_sync_requests,
116        mempool_reconfig_events,
117        db_with_cache,
118        validator,
119        vec![],
120    );
121    runtime
122}