1use crate::pos::mempool::{
12 core_mempool::{CoreMempool, TimelineState, TxnPointer},
13 counters,
14 logging::{LogEntry, LogEvent, LogSchema},
15 network::MempoolSyncMsg,
16 shared_mempool::{
17 transaction_validator::TransactionValidator,
18 types::{
19 notify_subscribers, ScheduledBroadcast, SharedMempool,
20 SharedMempoolNotification, SubmissionStatusBundle,
21 },
22 },
23 CommitNotification, CommitResponse, CommittedTransaction, ConsensusRequest,
24 ConsensusResponse, SubmissionStatus,
25};
26use anyhow::Result;
27use cached_pos_ledger_db::CachedPosLedgerDB;
28use diem_infallible::{Mutex, RwLock};
29use diem_logger::prelude::*;
30use diem_metrics::HistogramTimer;
31use diem_types::{
32 mempool_status::{MempoolStatus, MempoolStatusCode},
33 on_chain_config::OnChainConfigPayload,
34 transaction::SignedTransaction,
35};
36use futures::{channel::oneshot, stream::FuturesUnordered};
37use network::node_table::NodeId;
38use rayon::prelude::*;
39use std::{
40 cmp,
41 collections::HashSet,
42 sync::Arc,
43 time::{Duration, Instant},
44};
45use tokio::runtime::Handle;
46
47pub(crate) fn execute_broadcast(
53 peer: NodeId, backoff: bool, smp: &mut SharedMempool,
54 scheduled_broadcasts: &mut FuturesUnordered<ScheduledBroadcast>,
55 broadcasting_peers: &mut HashSet<NodeId>, executor: Handle,
56) {
57 diem_trace!("execute_broadcast starts: peer={}", peer);
58 let peer_manager = &smp.peer_manager.clone();
59 peer_manager.execute_broadcast(peer.clone(), backoff, smp);
60 let schedule_backoff = peer_manager.is_backoff_mode(&peer);
61
62 let interval_ms = if schedule_backoff {
63 smp.config.shared_mempool_backoff_interval_ms
64 } else {
65 smp.config.shared_mempool_tick_interval_ms
66 };
67
68 if peer_manager.contains_peer(&peer) {
69 broadcasting_peers.insert(peer);
71 scheduled_broadcasts.push(ScheduledBroadcast::new(
72 Instant::now() + Duration::from_millis(interval_ms),
73 peer,
74 schedule_backoff,
75 executor,
76 ));
77 } else {
78 broadcasting_peers.remove(&peer);
81 }
82 diem_trace!("execute_broadcast end: peer={}", peer);
83}
84
85pub(crate) async fn process_client_transaction_submission(
91 smp: SharedMempool, transaction: SignedTransaction,
92 callback: oneshot::Sender<Result<SubmissionStatus>>, timer: HistogramTimer,
93) {
94 timer.stop_and_record();
95 let _timer = counters::process_txn_submit_latency_timer(
96 counters::CLIENT_LABEL,
97 counters::CLIENT_LABEL,
98 );
99 let statuses = process_incoming_transactions(
100 &smp,
101 vec![transaction],
102 TimelineState::NotReady,
103 )
104 .await;
105 log_txn_process_results(&statuses, None);
106
107 if let Some(status) = statuses.get(0) {
108 if callback.send(Ok(status.1.clone())).is_err() {
109 diem_error!(LogSchema::event_log(
110 LogEntry::JsonRpc,
111 LogEvent::CallbackFail
112 ));
113 counters::CLIENT_CALLBACK_FAIL.inc();
114 }
115 }
116}
117
118pub(crate) async fn process_transaction_broadcast(
120 smp: SharedMempool, transactions: Vec<SignedTransaction>,
121 request_id: Vec<u8>, timeline_state: TimelineState, peer: NodeId,
122 timer: HistogramTimer,
123) {
124 diem_trace!("process_transaction_broadcast starts: peer={}", peer);
125 timer.stop_and_record();
126 let results = process_incoming_transactions(
131 &smp,
132 transactions.clone(),
133 timeline_state,
134 )
135 .await;
136 log_txn_process_results(&results, Some(peer.clone()));
137
138 let ack_response = gen_ack_response(request_id, results, &peer);
139 if let Err(e) = smp
140 .network_sender
141 .send_message_with_peer_id(&peer, &ack_response)
142 {
143 counters::network_send_fail_inc(counters::ACK_TXNS);
144 diem_error!(LogSchema::event_log(
145 LogEntry::BroadcastACK,
146 LogEvent::NetworkSendFail
147 )
148 .error(&e.into()));
150 return;
151 }
152 notify_subscribers(SharedMempoolNotification::ACK, &smp.subscribers);
153 diem_trace!("process_transaction_broadcast ends: peer={}", peer);
154}
155
156fn gen_ack_response(
157 request_id: Vec<u8>, results: Vec<SubmissionStatusBundle>, peer: &NodeId,
158) -> MempoolSyncMsg {
159 let mut backoff = false;
160 let mut retry = false;
161 for r in results.into_iter() {
162 let submission_status = r.1;
163 if submission_status.0.code == MempoolStatusCode::MempoolIsFull {
164 backoff = true;
165 }
166 if is_txn_retryable(submission_status) {
167 retry = true;
168 }
169
170 if backoff && retry {
171 break;
172 }
173 }
174
175 diem_trace!(
176 "request[{:?}] from peer[{:?}] retry[{:?}]",
177 request_id,
178 peer,
179 retry
180 );
181
182 update_ack_counter(&peer, counters::SENT_LABEL, retry, backoff);
183 MempoolSyncMsg::BroadcastTransactionsResponse {
184 request_id,
185 retry,
186 backoff,
187 }
188}
189
190pub(crate) fn update_ack_counter(
191 _peer: &NodeId, _direction_label: &str, _retry: bool, _backoff: bool,
192) {
193 }
209
210fn is_txn_retryable(result: SubmissionStatus) -> bool {
211 result.0.code == MempoolStatusCode::MempoolIsFull
212}
213
214pub(crate) async fn process_incoming_transactions(
217 smp: &SharedMempool, transactions: Vec<SignedTransaction>,
218 timeline_state: TimelineState,
219) -> Vec<SubmissionStatusBundle> {
220 let mut statuses = vec![];
221
222 let start_storage_read = Instant::now();
223 let storage_read_latency = start_storage_read.elapsed();
225 counters::PROCESS_TXN_BREAKDOWN_LATENCY
226 .with_label_values(&[counters::FETCH_SEQ_NUM_LABEL])
227 .observe(
228 storage_read_latency.as_secs_f64() / transactions.len() as f64,
229 );
230
231 let vm_validation_timer = counters::PROCESS_TXN_BREAKDOWN_LATENCY
233 .with_label_values(&[counters::VM_VALIDATION_LABEL])
234 .start_timer();
235 let transactions: Vec<SignedTransaction> = {
238 let mempool = smp.mempool.lock();
239 transactions
240 .into_iter()
241 .filter(|tx| mempool.transactions.get(&tx.hash()).is_none())
242 .collect()
243 };
244 let validation_results = transactions
245 .par_iter()
246 .map(|t| {
247 smp.validator
248 .read()
249 .validate_transaction(&t, smp.commited_pos_state.clone())
250 })
251 .collect::<Vec<_>>();
252 vm_validation_timer.stop_and_record();
253
254 {
255 let mut mempool = smp.mempool.lock();
256 for (idx, transaction) in transactions.into_iter().enumerate() {
257 if let Some(validation_result) = &validation_results[idx] {
258 match validation_result.status() {
259 None => {
260 let ranking_score = validation_result.score();
261 let governance_role =
262 validation_result.governance_role();
263 let mempool_status = mempool.add_txn(
264 transaction.clone(),
265 ranking_score,
266 timeline_state,
267 governance_role,
268 );
269 statuses.push((transaction, (mempool_status, None)));
270 }
271 Some(validation_status) => {
272 statuses.push((
273 transaction.clone(),
274 (
275 MempoolStatus::new(MempoolStatusCode::VmError),
276 Some(validation_status),
277 ),
278 ));
279 }
280 }
281 }
282 }
283 }
284 notify_subscribers(
285 SharedMempoolNotification::NewTransactions,
286 &smp.subscribers,
287 );
288 statuses
289}
290
291fn log_txn_process_results(
292 results: &[SubmissionStatusBundle], sender: Option<NodeId>,
293) {
294 let sender = match sender {
295 Some(peer) => peer,
296 None => {
297 return;
298 }
299 };
300 for (txn, (_mempool_status, maybe_vm_status)) in results.iter() {
301 if let Some(vm_status) = maybe_vm_status {
302 diem_trace!(
303 SecurityEvent::InvalidTransactionMempool,
304 failed_transaction = txn,
305 vm_status = vm_status,
306 sender = sender,
307 );
308 continue;
314 }
315 }
331}
332
333pub(crate) async fn process_state_sync_request(
338 mempool: Arc<Mutex<CoreMempool>>, req: CommitNotification,
339) {
340 let start_time = Instant::now();
341 diem_debug!(LogSchema::event_log(
342 LogEntry::StateSyncCommit,
343 LogEvent::Received
344 )
345 .state_sync_msg(&req));
346 counters::mempool_service_transactions(
347 counters::COMMIT_STATE_SYNC_LABEL,
348 req.transactions.len(),
349 );
350 commit_txns(&mempool, req.transactions, req.block_timestamp_usecs, false)
351 .await;
352 let result = if req.callback.send(Ok(CommitResponse::success())).is_err() {
353 diem_error!(LogSchema::event_log(
354 LogEntry::StateSyncCommit,
355 LogEvent::CallbackFail
356 ));
357 counters::REQUEST_FAIL_LABEL
358 } else {
359 counters::REQUEST_SUCCESS_LABEL
360 };
361 let latency = start_time.elapsed();
362 counters::mempool_service_latency(
363 counters::COMMIT_STATE_SYNC_LABEL,
364 result,
365 latency,
366 );
367}
368
369pub(crate) async fn process_consensus_request(
370 db: Arc<CachedPosLedgerDB>, mempool: &Mutex<CoreMempool>,
371 req: ConsensusRequest,
372) {
373 let start_time = Instant::now();
375 diem_debug!(
376 LogSchema::event_log(LogEntry::Consensus, LogEvent::Received)
377 .consensus_msg(&req)
378 );
379
380 let (resp, callback, counter_label) = match req {
381 ConsensusRequest::GetBlockRequest(
382 max_block_size,
383 transactions,
384 parent_block_id,
385 validators,
386 callback,
387 ) => {
388 let exclude_transactions: HashSet<TxnPointer> = transactions
389 .iter()
390 .map(|txn| (txn.sender, txn.hash))
391 .collect();
392 let mut txns;
393 {
394 let mut mempool = mempool.lock();
395 let curr_time = diem_infallible::duration_since_epoch();
400 mempool.gc_by_expiration_time(curr_time);
401 let block_size = cmp::max(max_block_size, 1);
402 let pos_state = db
403 .get_pos_state(&parent_block_id)
404 .expect("pos_state should exist");
405 txns = mempool.get_block(
406 block_size,
407 exclude_transactions,
408 &pos_state,
409 validators,
410 );
411 }
412 counters::mempool_service_transactions(
413 counters::GET_BLOCK_LABEL,
414 txns.len(),
415 );
416 txns.len();
417 let pulled_block =
418 txns.drain(..).map(SignedTransaction::into).collect();
419
420 (
421 ConsensusResponse::GetBlockResponse(pulled_block),
422 callback,
423 counters::GET_BLOCK_LABEL,
424 )
425 }
426 ConsensusRequest::RejectNotification(transactions, callback) => {
427 counters::mempool_service_transactions(
428 counters::COMMIT_CONSENSUS_LABEL,
429 transactions.len(),
430 );
431 commit_txns(mempool, transactions, 0, true).await;
432 (
433 ConsensusResponse::CommitResponse(),
434 callback,
435 counters::COMMIT_CONSENSUS_LABEL,
436 )
437 }
438 };
439 let result = if callback.send(Ok(resp)).is_err() {
441 diem_error!(LogSchema::event_log(
442 LogEntry::Consensus,
443 LogEvent::CallbackFail
444 ));
445 counters::REQUEST_FAIL_LABEL
446 } else {
447 counters::REQUEST_SUCCESS_LABEL
448 };
449 let latency = start_time.elapsed();
450 counters::mempool_service_latency(counter_label, result, latency);
451}
452
453async fn commit_txns(
454 mempool: &Mutex<CoreMempool>, transactions: Vec<CommittedTransaction>,
455 block_timestamp_usecs: u64, is_rejected: bool,
456) {
457 let mut pool = mempool.lock();
458
459 for transaction in transactions {
460 pool.remove_transaction(
461 &transaction.sender,
462 transaction.hash,
463 is_rejected,
464 );
465 }
466
467 if block_timestamp_usecs > 0 {
468 pool.gc_by_expiration_time(Duration::from_micros(
469 block_timestamp_usecs,
470 ));
471 }
472}
473
474pub(crate) async fn process_config_update(
476 config_update: OnChainConfigPayload,
477 _validator: Arc<RwLock<TransactionValidator>>,
478) {
479 diem_trace!(LogSchema::event_log(
480 LogEntry::ReconfigUpdate,
481 LogEvent::Process
482 )
483 .reconfig_update(config_update.clone()));
484
485 }