cfxcore/pos/mempool/shared_mempool/
runtime.rs1use crate::pos::{
9 mempool::{
10 core_mempool::CoreMempool,
11 network::NetworkReceivers,
12 shared_mempool::{
13 coordinator::{coordinator, gc_coordinator},
14 peer_manager::PeerManager,
15 transaction_validator::TransactionValidator,
16 types::{SharedMempool, SharedMempoolNotification},
17 },
18 CommitNotification, ConsensusRequest, SubmissionStatus,
19 },
20 protocol::network_sender::NetworkSender,
21};
22use anyhow::Result;
23use cached_pos_ledger_db::CachedPosLedgerDB;
24use channel::diem_channel;
25use diem_config::config::NodeConfig;
26use diem_infallible::{Mutex, RwLock};
27use diem_types::{
28 on_chain_config::OnChainConfigPayload, transaction::SignedTransaction,
29};
30use futures::channel::{
31 mpsc::{self, Receiver, UnboundedSender},
32 oneshot,
33};
34use std::sync::Arc;
35use tokio::runtime::{Builder, Handle, Runtime};
36
37pub(crate) fn start_shared_mempool(
46 executor: &Handle, config: &NodeConfig, mempool: Arc<Mutex<CoreMempool>>,
47 network_sender: NetworkSender, network_receivers: NetworkReceivers,
48 client_events: mpsc::Receiver<(
49 SignedTransaction,
50 oneshot::Sender<Result<SubmissionStatus>>,
51 )>,
52 consensus_requests: mpsc::Receiver<ConsensusRequest>,
53 state_sync_requests: mpsc::Receiver<CommitNotification>,
54 mempool_reconfig_events: diem_channel::Receiver<(), OnChainConfigPayload>,
55 db_with_cache: Arc<CachedPosLedgerDB>,
56 validator: Arc<RwLock<TransactionValidator>>,
57 subscribers: Vec<UnboundedSender<SharedMempoolNotification>>,
58) {
59 let peer_manager =
60 Arc::new(PeerManager::new(config.base.role, config.mempool.clone()));
61
62 let smp = SharedMempool {
63 mempool: mempool.clone(),
64 config: config.mempool.clone(),
65 network_sender,
66 db_with_cache: db_with_cache.clone(),
67 validator,
68 peer_manager,
69 subscribers,
70 commited_pos_state: db_with_cache.db.reader.get_latest_pos_state(),
71 };
72
73 executor.spawn(coordinator(
74 smp,
75 executor.clone(),
76 network_receivers,
77 client_events,
78 consensus_requests,
79 state_sync_requests,
80 mempool_reconfig_events,
81 ));
82
83 executor.spawn(gc_coordinator(
84 mempool.clone(),
85 config.mempool.system_transaction_gc_interval_ms,
86 ));
87}
88
89pub fn bootstrap(
90 config: &NodeConfig, db_with_cache: Arc<CachedPosLedgerDB>,
91 network_sender: NetworkSender, network_receivers: NetworkReceivers,
92 client_events: Receiver<(
93 SignedTransaction,
94 oneshot::Sender<Result<SubmissionStatus>>,
95 )>,
96 consensus_requests: Receiver<ConsensusRequest>,
97 state_sync_requests: Receiver<CommitNotification>,
98 mempool_reconfig_events: diem_channel::Receiver<(), OnChainConfigPayload>,
99) -> Runtime {
100 let runtime = Builder::new_multi_thread()
101 .thread_name("shared-mem")
102 .enable_all()
103 .build()
104 .expect("[shared mempool] failed to create runtime");
105 let mempool = Arc::new(Mutex::new(CoreMempool::new(&config)));
106 let validator = Arc::new(RwLock::new(TransactionValidator::new()));
107 start_shared_mempool(
108 runtime.handle(),
109 config,
110 mempool,
111 network_sender,
112 network_receivers,
113 client_events,
114 consensus_requests,
115 state_sync_requests,
116 mempool_reconfig_events,
117 db_with_cache,
118 validator,
119 vec![],
120 );
121 runtime
122}