cfxcore/pos/mempool/shared_mempool/
runtime.rs1use crate::pos::{
9 mempool::{
10 core_mempool::CoreMempool,
11 network::NetworkReceivers,
12 shared_mempool::{
13 coordinator::{coordinator, gc_coordinator},
14 peer_manager::PeerManager,
15 transaction_validator::TransactionValidator,
16 types::{SharedMempool, SharedMempoolNotification},
17 },
18 CommitNotification, ConsensusRequest, SubmissionStatus,
19 },
20 protocol::network_sender::NetworkSender,
21};
22use anyhow::Result;
23use cached_pos_ledger_db::CachedPosLedgerDB;
24use diem_config::config::NodeConfig;
25use diem_types::transaction::SignedTransaction;
26use futures::channel::{
27 mpsc::{self, Receiver, UnboundedSender},
28 oneshot,
29};
30use parking_lot::{Mutex, RwLock};
31use std::sync::Arc;
32use tokio::runtime::{Builder, Handle, Runtime};
33
34pub(crate) fn start_shared_mempool(
43 executor: &Handle, config: &NodeConfig, mempool: Arc<Mutex<CoreMempool>>,
44 network_sender: NetworkSender, network_receivers: NetworkReceivers,
45 client_events: mpsc::Receiver<(
46 SignedTransaction,
47 oneshot::Sender<Result<SubmissionStatus>>,
48 )>,
49 consensus_requests: mpsc::Receiver<ConsensusRequest>,
50 commit_notifications: mpsc::Receiver<CommitNotification>,
51 db_with_cache: Arc<CachedPosLedgerDB>,
52 validator: Arc<RwLock<TransactionValidator>>,
53 subscribers: Vec<UnboundedSender<SharedMempoolNotification>>,
54) {
55 let peer_manager = Arc::new(PeerManager::new());
56
57 let smp = SharedMempool {
58 mempool: mempool.clone(),
59 config: config.mempool.clone(),
60 network_sender,
61 db_with_cache: db_with_cache.clone(),
62 validator,
63 peer_manager,
64 subscribers,
65 commited_pos_state: db_with_cache.db.reader.get_latest_pos_state(),
66 };
67
68 executor.spawn(coordinator(
69 smp,
70 executor.clone(),
71 network_receivers,
72 client_events,
73 consensus_requests,
74 commit_notifications,
75 ));
76
77 executor.spawn(gc_coordinator(
78 mempool.clone(),
79 config.mempool.system_transaction_gc_interval_ms,
80 ));
81}
82
83pub fn bootstrap(
84 config: &NodeConfig, db_with_cache: Arc<CachedPosLedgerDB>,
85 network_sender: NetworkSender, network_receivers: NetworkReceivers,
86 client_events: Receiver<(
87 SignedTransaction,
88 oneshot::Sender<Result<SubmissionStatus>>,
89 )>,
90 consensus_requests: Receiver<ConsensusRequest>,
91 commit_notifications: Receiver<CommitNotification>,
92) -> Runtime {
93 let runtime = Builder::new_multi_thread()
94 .thread_name("shared-mem")
95 .enable_all()
96 .build()
97 .expect("[shared mempool] failed to create runtime");
98 let mempool = Arc::new(Mutex::new(CoreMempool::new(&config)));
99 let validator = Arc::new(RwLock::new(TransactionValidator::new()));
100 start_shared_mempool(
101 runtime.handle(),
102 config,
103 mempool,
104 network_sender,
105 network_receivers,
106 client_events,
107 consensus_requests,
108 commit_notifications,
109 db_with_cache,
110 validator,
111 vec![],
112 );
113 runtime
114}