cfxcore/pos/mempool/shared_mempool/
tasks.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8//! Tasks that are executed by coordinators (short-lived compared to
9//! coordinators)
10
11use crate::pos::mempool::{
12    core_mempool::{CoreMempool, TimelineState, TxnPointer},
13    counters,
14    logging::{LogEntry, LogEvent, LogSchema},
15    network::MempoolSyncMsg,
16    shared_mempool::{
17        transaction_validator::TransactionValidator,
18        types::{
19            notify_subscribers, ScheduledBroadcast, SharedMempool,
20            SharedMempoolNotification, SubmissionStatusBundle,
21        },
22    },
23    CommitNotification, CommitResponse, CommittedTransaction, ConsensusRequest,
24    ConsensusResponse, SubmissionStatus,
25};
26use anyhow::Result;
27use cached_pos_ledger_db::CachedPosLedgerDB;
28use diem_infallible::{Mutex, RwLock};
29use diem_logger::prelude::*;
30use diem_metrics::HistogramTimer;
31use diem_types::{
32    mempool_status::{MempoolStatus, MempoolStatusCode},
33    on_chain_config::OnChainConfigPayload,
34    transaction::SignedTransaction,
35};
36use futures::{channel::oneshot, stream::FuturesUnordered};
37use network::node_table::NodeId;
38use rayon::prelude::*;
39use std::{
40    cmp,
41    collections::HashSet,
42    sync::Arc,
43    time::{Duration, Instant},
44};
45use tokio::runtime::Handle;
46
47// ============================== //
48//  broadcast_coordinator tasks  //
49// ============================== //
50
51/// Attempts broadcast to `peer` and schedules the next broadcast.
52pub(crate) fn execute_broadcast(
53    peer: NodeId, backoff: bool, smp: &mut SharedMempool,
54    scheduled_broadcasts: &mut FuturesUnordered<ScheduledBroadcast>,
55    broadcasting_peers: &mut HashSet<NodeId>, executor: Handle,
56) {
57    diem_trace!("execute_broadcast starts: peer={}", peer);
58    let peer_manager = &smp.peer_manager.clone();
59    peer_manager.execute_broadcast(peer.clone(), backoff, smp);
60    let schedule_backoff = peer_manager.is_backoff_mode(&peer);
61
62    let interval_ms = if schedule_backoff {
63        smp.config.shared_mempool_backoff_interval_ms
64    } else {
65        smp.config.shared_mempool_tick_interval_ms
66    };
67
68    if peer_manager.contains_peer(&peer) {
69        // Make sure we only has one broadcast task for one peer id.
70        broadcasting_peers.insert(peer);
71        scheduled_broadcasts.push(ScheduledBroadcast::new(
72            Instant::now() + Duration::from_millis(interval_ms),
73            peer,
74            schedule_backoff,
75            executor,
76        ));
77    } else {
78        // The peer has been disconnected,
79        // so it can be added again after reconnection.
80        broadcasting_peers.remove(&peer);
81    }
82    diem_trace!("execute_broadcast end: peer={}", peer);
83}
84
85// =============================== //
86// Tasks processing txn submission //
87// =============================== //
88
89/// Processes transactions directly submitted by client.
90pub(crate) async fn process_client_transaction_submission(
91    smp: SharedMempool, transaction: SignedTransaction,
92    callback: oneshot::Sender<Result<SubmissionStatus>>, timer: HistogramTimer,
93) {
94    timer.stop_and_record();
95    let _timer = counters::process_txn_submit_latency_timer(
96        counters::CLIENT_LABEL,
97        counters::CLIENT_LABEL,
98    );
99    let statuses = process_incoming_transactions(
100        &smp,
101        vec![transaction],
102        TimelineState::NotReady,
103    )
104    .await;
105    log_txn_process_results(&statuses, None);
106
107    if let Some(status) = statuses.get(0) {
108        if callback.send(Ok(status.1.clone())).is_err() {
109            diem_error!(LogSchema::event_log(
110                LogEntry::JsonRpc,
111                LogEvent::CallbackFail
112            ));
113            counters::CLIENT_CALLBACK_FAIL.inc();
114        }
115    }
116}
117
118/// Processes transactions from other nodes.
119pub(crate) async fn process_transaction_broadcast(
120    smp: SharedMempool, transactions: Vec<SignedTransaction>,
121    request_id: Vec<u8>, timeline_state: TimelineState, peer: NodeId,
122    timer: HistogramTimer,
123) {
124    diem_trace!("process_transaction_broadcast starts: peer={}", peer);
125    timer.stop_and_record();
126    /*let _timer = counters::process_txn_submit_latency_timer(
127        peer.raw_network_id().as_str(),
128        peer.peer_id().short_str().as_str(),
129    );*/
130    let results = process_incoming_transactions(
131        &smp,
132        transactions.clone(),
133        timeline_state,
134    )
135    .await;
136    log_txn_process_results(&results, Some(peer.clone()));
137
138    let ack_response = gen_ack_response(request_id, results, &peer);
139    if let Err(e) = smp
140        .network_sender
141        .send_message_with_peer_id(&peer, &ack_response)
142    {
143        counters::network_send_fail_inc(counters::ACK_TXNS);
144        diem_error!(LogSchema::event_log(
145            LogEntry::BroadcastACK,
146            LogEvent::NetworkSendFail
147        )
148        //.peer(&peer)
149        .error(&e.into()));
150        return;
151    }
152    notify_subscribers(SharedMempoolNotification::ACK, &smp.subscribers);
153    diem_trace!("process_transaction_broadcast ends: peer={}", peer);
154}
155
156fn gen_ack_response(
157    request_id: Vec<u8>, results: Vec<SubmissionStatusBundle>, peer: &NodeId,
158) -> MempoolSyncMsg {
159    let mut backoff = false;
160    let mut retry = false;
161    for r in results.into_iter() {
162        let submission_status = r.1;
163        if submission_status.0.code == MempoolStatusCode::MempoolIsFull {
164            backoff = true;
165        }
166        if is_txn_retryable(submission_status) {
167            retry = true;
168        }
169
170        if backoff && retry {
171            break;
172        }
173    }
174
175    diem_trace!(
176        "request[{:?}] from peer[{:?}] retry[{:?}]",
177        request_id,
178        peer,
179        retry
180    );
181
182    update_ack_counter(&peer, counters::SENT_LABEL, retry, backoff);
183    MempoolSyncMsg::BroadcastTransactionsResponse {
184        request_id,
185        retry,
186        backoff,
187    }
188}
189
/// Records retry/backoff ACK metrics for `peer`.
///
/// Currently a no-op stub: the counter calls below were commented out —
/// presumably during the Diem → Conflux port (TODO confirm) — but the call
/// sites are kept so the metrics can be re-enabled in one place.
pub(crate) fn update_ack_counter(
    _peer: &NodeId, _direction_label: &str, _retry: bool, _backoff: bool,
) {
    /*
    if retry {
        counters::shared_mempool_ack_inc(
            peer,
            direction_label,
            counters::RETRY_BROADCAST_LABEL,
        );
    }
    if backoff {
        counters::shared_mempool_ack_inc(
            peer,
            direction_label,
            counters::BACKPRESSURE_BROADCAST_LABEL,
        );
    }*/
}
209
210fn is_txn_retryable(result: SubmissionStatus) -> bool {
211    result.0.code == MempoolStatusCode::MempoolIsFull
212}
213
214/// Submits a list of SignedTransaction to the local mempool
215/// and returns a vector containing AdmissionControlStatus.
216pub(crate) async fn process_incoming_transactions(
217    smp: &SharedMempool, transactions: Vec<SignedTransaction>,
218    timeline_state: TimelineState,
219) -> Vec<SubmissionStatusBundle> {
220    let mut statuses = vec![];
221
222    let start_storage_read = Instant::now();
223    // Track latency for storage read fetching sequence number
224    let storage_read_latency = start_storage_read.elapsed();
225    counters::PROCESS_TXN_BREAKDOWN_LATENCY
226        .with_label_values(&[counters::FETCH_SEQ_NUM_LABEL])
227        .observe(
228            storage_read_latency.as_secs_f64() / transactions.len() as f64,
229        );
230
231    // Track latency: VM validation
232    let vm_validation_timer = counters::PROCESS_TXN_BREAKDOWN_LATENCY
233        .with_label_values(&[counters::VM_VALIDATION_LABEL])
234        .start_timer();
235    // Filter out already received transactions, so we do not need to process
236    // them.
237    let transactions: Vec<SignedTransaction> = {
238        let mempool = smp.mempool.lock();
239        transactions
240            .into_iter()
241            .filter(|tx| mempool.transactions.get(&tx.hash()).is_none())
242            .collect()
243    };
244    let validation_results = transactions
245        .par_iter()
246        .map(|t| {
247            smp.validator
248                .read()
249                .validate_transaction(&t, smp.commited_pos_state.clone())
250        })
251        .collect::<Vec<_>>();
252    vm_validation_timer.stop_and_record();
253
254    {
255        let mut mempool = smp.mempool.lock();
256        for (idx, transaction) in transactions.into_iter().enumerate() {
257            if let Some(validation_result) = &validation_results[idx] {
258                match validation_result.status() {
259                    None => {
260                        let ranking_score = validation_result.score();
261                        let governance_role =
262                            validation_result.governance_role();
263                        let mempool_status = mempool.add_txn(
264                            transaction.clone(),
265                            ranking_score,
266                            timeline_state,
267                            governance_role,
268                        );
269                        statuses.push((transaction, (mempool_status, None)));
270                    }
271                    Some(validation_status) => {
272                        statuses.push((
273                            transaction.clone(),
274                            (
275                                MempoolStatus::new(MempoolStatusCode::VmError),
276                                Some(validation_status),
277                            ),
278                        ));
279                    }
280                }
281            }
282        }
283    }
284    notify_subscribers(
285        SharedMempoolNotification::NewTransactions,
286        &smp.subscribers,
287    );
288    statuses
289}
290
291fn log_txn_process_results(
292    results: &[SubmissionStatusBundle], sender: Option<NodeId>,
293) {
294    let sender = match sender {
295        Some(peer) => peer,
296        None => {
297            return;
298        }
299    };
300    for (txn, (_mempool_status, maybe_vm_status)) in results.iter() {
301        if let Some(vm_status) = maybe_vm_status {
302            diem_trace!(
303                SecurityEvent::InvalidTransactionMempool,
304                failed_transaction = txn,
305                vm_status = vm_status,
306                sender = sender,
307            );
308            /*counters::shared_mempool_transactions_processed_inc(
309                counters::VM_VALIDATION_LABEL,
310                &network,
311                &sender,
312            );*/
313            continue;
314        }
315        /*
316        match mempool_status.code {
317            MempoolStatusCode::Accepted => {
318                counters::shared_mempool_transactions_processed_inc(
319                    counters::SUCCESS_LABEL,
320                    &network,
321                    &sender,
322                )
323            }
324            _ => counters::shared_mempool_transactions_processed_inc(
325                &mempool_status.code.to_string(),
326                &network,
327                &sender,
328            ),
329        }*/
330    }
331}
332
333// ================================= //
334// intra-node communication handlers //
335// ================================= //
336
337pub(crate) async fn process_state_sync_request(
338    mempool: Arc<Mutex<CoreMempool>>, req: CommitNotification,
339) {
340    let start_time = Instant::now();
341    diem_debug!(LogSchema::event_log(
342        LogEntry::StateSyncCommit,
343        LogEvent::Received
344    )
345    .state_sync_msg(&req));
346    counters::mempool_service_transactions(
347        counters::COMMIT_STATE_SYNC_LABEL,
348        req.transactions.len(),
349    );
350    commit_txns(&mempool, req.transactions, req.block_timestamp_usecs, false)
351        .await;
352    let result = if req.callback.send(Ok(CommitResponse::success())).is_err() {
353        diem_error!(LogSchema::event_log(
354            LogEntry::StateSyncCommit,
355            LogEvent::CallbackFail
356        ));
357        counters::REQUEST_FAIL_LABEL
358    } else {
359        counters::REQUEST_SUCCESS_LABEL
360    };
361    let latency = start_time.elapsed();
362    counters::mempool_service_latency(
363        counters::COMMIT_STATE_SYNC_LABEL,
364        result,
365        latency,
366    );
367}
368
369pub(crate) async fn process_consensus_request(
370    db: Arc<CachedPosLedgerDB>, mempool: &Mutex<CoreMempool>,
371    req: ConsensusRequest,
372) {
373    // Start latency timer
374    let start_time = Instant::now();
375    diem_debug!(
376        LogSchema::event_log(LogEntry::Consensus, LogEvent::Received)
377            .consensus_msg(&req)
378    );
379
380    let (resp, callback, counter_label) = match req {
381        ConsensusRequest::GetBlockRequest(
382            max_block_size,
383            transactions,
384            parent_block_id,
385            validators,
386            callback,
387        ) => {
388            let exclude_transactions: HashSet<TxnPointer> = transactions
389                .iter()
390                .map(|txn| (txn.sender, txn.hash))
391                .collect();
392            let mut txns;
393            {
394                let mut mempool = mempool.lock();
395                // gc before pulling block as extra protection against txns that
396                // may expire in consensus Note: this gc
397                // operation relies on the fact that consensus uses the system
398                // time to determine block timestamp
399                let curr_time = diem_infallible::duration_since_epoch();
400                mempool.gc_by_expiration_time(curr_time);
401                let block_size = cmp::max(max_block_size, 1);
402                let pos_state = db
403                    .get_pos_state(&parent_block_id)
404                    .expect("pos_state should exist");
405                txns = mempool.get_block(
406                    block_size,
407                    exclude_transactions,
408                    &pos_state,
409                    validators,
410                );
411            }
412            counters::mempool_service_transactions(
413                counters::GET_BLOCK_LABEL,
414                txns.len(),
415            );
416            txns.len();
417            let pulled_block =
418                txns.drain(..).map(SignedTransaction::into).collect();
419
420            (
421                ConsensusResponse::GetBlockResponse(pulled_block),
422                callback,
423                counters::GET_BLOCK_LABEL,
424            )
425        }
426        ConsensusRequest::RejectNotification(transactions, callback) => {
427            counters::mempool_service_transactions(
428                counters::COMMIT_CONSENSUS_LABEL,
429                transactions.len(),
430            );
431            commit_txns(mempool, transactions, 0, true).await;
432            (
433                ConsensusResponse::CommitResponse(),
434                callback,
435                counters::COMMIT_CONSENSUS_LABEL,
436            )
437        }
438    };
439    // Send back to callback
440    let result = if callback.send(Ok(resp)).is_err() {
441        diem_error!(LogSchema::event_log(
442            LogEntry::Consensus,
443            LogEvent::CallbackFail
444        ));
445        counters::REQUEST_FAIL_LABEL
446    } else {
447        counters::REQUEST_SUCCESS_LABEL
448    };
449    let latency = start_time.elapsed();
450    counters::mempool_service_latency(counter_label, result, latency);
451}
452
453async fn commit_txns(
454    mempool: &Mutex<CoreMempool>, transactions: Vec<CommittedTransaction>,
455    block_timestamp_usecs: u64, is_rejected: bool,
456) {
457    let mut pool = mempool.lock();
458
459    for transaction in transactions {
460        pool.remove_transaction(
461            &transaction.sender,
462            transaction.hash,
463            is_rejected,
464        );
465    }
466
467    if block_timestamp_usecs > 0 {
468        pool.gc_by_expiration_time(Duration::from_micros(
469            block_timestamp_usecs,
470        ));
471    }
472}
473
/// Processes on-chain reconfiguration notification.
///
/// Currently only logs the update: the validator restart below is commented
/// out (hence the unused `_validator` parameter) — presumably the ported
/// PoS transaction validator does not consume on-chain config; TODO confirm
/// before re-enabling.
pub(crate) async fn process_config_update(
    config_update: OnChainConfigPayload,
    _validator: Arc<RwLock<TransactionValidator>>,
) {
    diem_trace!(LogSchema::event_log(
        LogEntry::ReconfigUpdate,
        LogEvent::Process
    )
    .reconfig_update(config_update.clone()));

    /*if let Err(e) = validator.write().restart(config_update) {
        counters::VM_RECONFIG_UPDATE_FAIL_COUNT.inc();
        diem_error!(LogSchema::event_log(
            LogEntry::ReconfigUpdate,
            LogEvent::VMUpdateFail
        )
        .error(&e));
    }*/
}