cfxcore/pos/mempool/core_mempool/
mempool.rs1use crate::pos::mempool::{
11 core_mempool::{
12 index::TxnPointer,
13 transaction::{MempoolTransaction, TimelineState},
14 transaction_store::TransactionStore,
15 ttl_cache::TtlCache,
16 },
17 counters,
18 logging::{LogEntry, LogSchema, TxnsLog},
19};
20use diem_config::config::NodeConfig;
21use diem_crypto::{hash::CryptoHash, HashValue};
22use diem_logger::prelude::*;
23use diem_types::{
24 account_address::AccountAddress,
25 mempool_status::MempoolStatus,
26 term_state::PosState,
27 transaction::{
28 authenticator::TransactionAuthenticator, GovernanceRole,
29 SignedTransaction, TransactionPayload,
30 },
31 validator_verifier::ValidatorVerifier,
32};
33use executor::vm::verify_dispute;
34use std::{
35 collections::HashSet,
36 time::{Duration, SystemTime},
37};
38
39pub struct Mempool {
40 pub transactions: TransactionStore,
42
43 pub(crate) metrics_cache: TtlCache<(AccountAddress, HashValue), SystemTime>,
48 pub system_transaction_timeout: Duration,
49}
50
51impl Mempool {
52 pub fn new(config: &NodeConfig) -> Self {
53 Mempool {
54 transactions: TransactionStore::new(&config.mempool),
55 metrics_cache: TtlCache::new(
56 config.mempool.capacity,
57 Duration::from_secs(100),
58 ),
59 system_transaction_timeout: Duration::from_secs(
60 config.mempool.system_transaction_timeout_secs,
61 ),
62 }
63 }
64
65 pub(crate) fn remove_transaction(
67 &mut self, sender: &AccountAddress, hash: HashValue, is_rejected: bool,
68 ) {
69 diem_trace!(
70 LogSchema::new(LogEntry::RemoveTxn)
71 .txns(TxnsLog::new_txn(*sender, hash)),
72 is_rejected = is_rejected
73 );
74 let metric_label = if is_rejected {
75 counters::COMMIT_REJECTED_LABEL
76 } else {
77 counters::COMMIT_ACCEPTED_LABEL
78 };
79 self.log_latency(*sender, hash, metric_label);
80 self.metrics_cache.remove(&(*sender, hash));
81
82 if is_rejected {
83 self.transactions.reject_transaction(&sender, hash);
84 } else {
85 self.transactions.commit_transaction(&sender, hash);
86 }
87 }
88
89 fn log_latency(
90 &mut self, account: AccountAddress, hash: HashValue, metric: &str,
91 ) {
92 if let Some(&creation_time) = self.metrics_cache.get(&(account, hash)) {
93 if let Ok(time_delta) =
94 SystemTime::now().duration_since(creation_time)
95 {
96 counters::CORE_MEMPOOL_TXN_COMMIT_LATENCY
97 .with_label_values(&[metric])
98 .observe(time_delta.as_secs_f64());
99 }
100 }
101 }
102
103 pub(crate) fn add_txn(
106 &mut self, txn: SignedTransaction, ranking_score: u64,
107 timeline_state: TimelineState, governance_role: GovernanceRole,
108 ) -> MempoolStatus {
109 diem_trace!(LogSchema::new(LogEntry::AddTxn)
110 .txns(TxnsLog::new_txn(txn.sender(), txn.hash())),);
111
112 let expiration_time = diem_infallible::duration_since_epoch()
113 + self.system_transaction_timeout;
114 if timeline_state != TimelineState::NonQualified {
115 self.metrics_cache
116 .insert((txn.sender(), txn.hash()), SystemTime::now());
117 }
118
119 let txn_info = MempoolTransaction::new(
120 txn,
121 expiration_time,
122 ranking_score,
123 timeline_state,
124 governance_role,
125 );
126
127 self.transactions.insert(txn_info)
128 }
129
130 #[allow(clippy::explicit_counter_loop)]
135 pub(crate) fn get_block(
136 &mut self, _batch_size: u64, mut seen: HashSet<TxnPointer>,
137 pos_state: &PosState, validators: ValidatorVerifier,
138 ) -> Vec<SignedTransaction> {
139 let mut block = vec![];
140 let mut block_log = TxnsLog::new();
141 let seen_size = seen.len();
149 let mut txn_walked = 0usize;
150 for txn in self.transactions.iter() {
152 txn_walked += 1;
153 if seen.contains(&TxnPointer::from(txn)) {
154 continue;
155 }
156 let validate_result = match txn.txn.payload() {
157 TransactionPayload::Election(election_payload) => {
158 pos_state.validate_election(election_payload)
159 }
160 TransactionPayload::PivotDecision(_) => {
161 seen.insert((txn.get_sender(), txn.get_hash()));
162 continue;
163 }
164 TransactionPayload::Dispute(dispute_payload) => {
165 pos_state.validate_dispute(dispute_payload).and(
167 verify_dispute(dispute_payload)
168 .then_some(())
169 .ok_or(anyhow::anyhow!("invalid dispute")),
170 )
171 }
172 _ => {
173 continue;
174 }
175 };
176 if validate_result.is_ok() {
177 block.push(txn.txn.clone());
178 block_log.add(txn.get_sender(), txn.get_hash());
179 seen.insert((txn.get_sender(), txn.get_hash()));
180 }
181 }
182 let mut max_pivot_height = 0;
183 let mut chosen_pivot_tx = None;
184 for pivot_decision_set in self.transactions.iter_pivot_decision() {
186 let mut pivot_decision_opt = None;
187 diem_debug!("get_block: 0 {:?}", pivot_decision_set.len());
188 for (account, hash) in pivot_decision_set.iter() {
189 if validators.get_public_key(account).is_some() {
190 if pivot_decision_opt.is_none() {
191 if let Some(txn) = self.transactions.get(hash) {
192 pivot_decision_opt = Some(txn);
193 }
194 }
195 }
196 }
197 diem_debug!("get_block: 1 {:?}", pivot_decision_opt);
198 if validators
199 .check_voting_power(
200 pivot_decision_set.iter().map(|(addr, _)| addr),
201 )
202 .is_ok()
203 {
204 let pivot_decision = pivot_decision_opt.unwrap();
205 let pivot_height = match pivot_decision.payload() {
206 TransactionPayload::PivotDecision(decision) => {
207 decision.height
208 }
209 _ => unreachable!(),
210 };
211 if pivot_height > max_pivot_height
212 && pivot_height > pos_state.pivot_decision().height
213 {
214 max_pivot_height = pivot_height;
215 chosen_pivot_tx = Some(pivot_decision);
216 }
217 }
218 diem_debug!("get_block: 2 {:?}", chosen_pivot_tx);
219 }
220 if let Some(tx) = chosen_pivot_tx {
221 let pivot_decision_hash = match tx.payload() {
222 TransactionPayload::PivotDecision(decision) => decision.hash(),
223 _ => unreachable!(),
224 };
225 let txn_hashes =
227 self.transactions.get_pivot_decisions(&pivot_decision_hash);
228 let senders: Vec<AccountAddress> =
229 validators.get_ordered_account_addresses_iter().collect();
230 let mut signatures = vec![];
231 for hash in &txn_hashes {
232 if let Some(txn) = self.transactions.get(hash) {
233 match txn.authenticator() {
234 TransactionAuthenticator::BLS { signature, .. } => {
235 if let Ok(index) =
236 senders.binary_search(&txn.sender())
237 {
238 signatures.push((signature, index));
239 }
240 }
241 _ => unreachable!(),
242 }
243 }
244 }
245 let new_tx =
246 SignedTransaction::new_multisig(tx.raw_txn(), signatures);
247 block_log.add(new_tx.sender(), new_tx.hash());
248 block.push(new_tx);
249 }
250
251 diem_debug!(
252 LogSchema::new(LogEntry::GetBlock).txns(block_log),
253 seen_consensus = seen_size,
254 walked = txn_walked,
255 seen_after = seen.len(),
256 result_size = block.len(),
257 block_size = block.len()
258 );
259 for transaction in &block {
260 self.log_latency(
261 transaction.sender(),
262 transaction.hash(),
263 counters::GET_BLOCK_STAGE_LABEL,
264 );
265 }
266 block
267 }
268
269 pub(crate) fn gc(&mut self) {
273 let now = SystemTime::now();
274 self.transactions.gc_by_system_ttl(&self.metrics_cache);
275 self.metrics_cache.gc(now);
276 }
277
278 pub(crate) fn gc_by_expiration_time(&mut self, block_time: Duration) {
280 self.transactions
281 .gc_by_expiration_time(block_time, &self.metrics_cache);
282 }
283
284 pub(crate) fn read_timeline(
287 &mut self, timeline_id: u64, count: usize,
288 ) -> (Vec<SignedTransaction>, u64) {
289 self.transactions.read_timeline(timeline_id, count)
290 }
291
292 pub(crate) fn timeline_range(
295 &mut self, start_id: u64, end_id: u64,
296 ) -> Vec<SignedTransaction> {
297 self.transactions.timeline_range(start_id, end_id)
298 }
299}