// cfxcore/pos/mempool/shared_mempool/runtime.rs
use crate::pos::{
9 mempool::{
10 core_mempool::CoreMempool,
11 network::NetworkReceivers,
12 shared_mempool::{
13 coordinator::{coordinator, gc_coordinator},
14 peer_manager::PeerManager,
15 transaction_validator::TransactionValidator,
16 types::{SharedMempool, SharedMempoolNotification},
17 },
18 CommitNotification, ConsensusRequest, SubmissionStatus,
19 },
20 protocol::network_sender::NetworkSender,
21};
22use anyhow::Result;
23use cached_pos_ledger_db::CachedPosLedgerDB;
24use diem_config::config::NodeConfig;
25use diem_types::transaction::SignedTransaction;
26use futures::channel::{
27 mpsc::{self, Receiver, UnboundedSender},
28 oneshot,
29};
30use parking_lot::{Mutex, RwLock};
31use std::sync::Arc;
32use tokio::runtime::{Builder, Handle, Runtime};
33
34pub(crate) fn start_shared_mempool(
43 executor: &Handle, config: &NodeConfig, mempool: Arc<Mutex<CoreMempool>>,
44 network_sender: NetworkSender, network_receivers: NetworkReceivers,
45 client_events: mpsc::Receiver<(
46 SignedTransaction,
47 oneshot::Sender<Result<SubmissionStatus>>,
48 )>,
49 consensus_requests: mpsc::Receiver<ConsensusRequest>,
50 commit_notifications: mpsc::Receiver<CommitNotification>,
51 db_with_cache: Arc<CachedPosLedgerDB>,
52 validator: Arc<RwLock<TransactionValidator>>,
53 subscribers: Vec<UnboundedSender<SharedMempoolNotification>>,
54) {
55 let peer_manager = Arc::new(PeerManager::new());
56
57 let smp = SharedMempool {
58 mempool: mempool.clone(),
59 config: config.mempool.clone(),
60 network_sender,
61 db_with_cache: db_with_cache.clone(),
62 validator,
63 peer_manager,
64 subscribers,
65 };
66
67 executor.spawn(coordinator(
68 smp,
69 executor.clone(),
70 network_receivers,
71 client_events,
72 consensus_requests,
73 commit_notifications,
74 ));
75
76 executor.spawn(gc_coordinator(
77 mempool.clone(),
78 config.mempool.system_transaction_gc_interval_ms,
79 ));
80}
81
82pub fn bootstrap(
83 config: &NodeConfig, db_with_cache: Arc<CachedPosLedgerDB>,
84 network_sender: NetworkSender, network_receivers: NetworkReceivers,
85 client_events: Receiver<(
86 SignedTransaction,
87 oneshot::Sender<Result<SubmissionStatus>>,
88 )>,
89 consensus_requests: Receiver<ConsensusRequest>,
90 commit_notifications: Receiver<CommitNotification>,
91) -> Runtime {
92 let runtime = Builder::new_multi_thread()
93 .thread_name("shared-mem")
94 .enable_all()
95 .build()
96 .expect("[shared mempool] failed to create runtime");
97 let mempool = Arc::new(Mutex::new(CoreMempool::new(&config)));
98 let validator = Arc::new(RwLock::new(TransactionValidator::new()));
99 start_shared_mempool(
100 runtime.handle(),
101 config,
102 mempool,
103 network_sender,
104 network_receivers,
105 client_events,
106 consensus_requests,
107 commit_notifications,
108 db_with_cache,
109 validator,
110 vec![],
111 );
112 runtime
113}