// client/rpc/impls/cfx/cfx_filter.rs

// Copyright 2022 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/

5use log::{debug, error, info, warn};
6use std::{
7    collections::{BTreeSet, HashMap, HashSet, VecDeque},
8    sync::Arc,
9};
10
11use crate::rpc::{
12    errors::{codes, invalid_params},
13    helpers::{
14        limit_logs, PollFilter, PollManager, SyncPollFilter,
15        MAX_BLOCK_HISTORY_SIZE,
16    },
17    traits::cfx_filter::CfxFilter,
18    types::{CfxFilterChanges, CfxFilterLog, CfxRpcLogFilter, Log, RevertTo},
19};
20use cfx_addr::Network;
21use cfx_types::{Space, H128, H256};
22use cfx_util_macros::bail;
23use cfxcore::{
24    channel::Channel, errors::Error as CfxRpcError, BlockDataManager,
25    ConsensusGraph, SharedConsensusGraph, SharedTransactionPool,
26};
27use itertools::zip;
28use jsonrpc_core::{Error as RpcError, ErrorCode, Result as JsonRpcResult};
29use parking_lot::{Mutex, RwLock};
30use primitives::{
31    filter::LogFilter, log_entry::LocalizedLogEntry, BlockReceipts, EpochNumber,
32};
33use tokio::runtime::Runtime;
34
/// Something which provides data that can be filtered over.
///
/// Implemented by `CfxFilterClient`; the blanket `CfxFilter` impl below is
/// written against this trait so it can be tested/reused independently.
pub trait Filterable {
    /// Current best executed epoch number.
    fn best_executed_epoch_number(&self) -> u64;

    /// Get the block hashes of an epoch, or `None` if the epoch is unknown.
    fn block_hashes(&self, epoch_num: EpochNumber) -> Option<Vec<H256>>;

    /// Pending transaction hashes in the native pool (unordered).
    fn pending_transaction_hashes(&self) -> BTreeSet<H256>;

    /// Get logs that match the given filter.
    fn logs(&self, filter: LogFilter) -> JsonRpcResult<Vec<Log>>;

    /// Get logs that match the given filter for a specific epoch, given as
    /// an (epoch number, block hashes) pair.
    fn logs_for_epoch(
        &self, filter: &LogFilter, epoch: (u64, Vec<H256>),
        data_man: &Arc<BlockDataManager>,
    ) -> JsonRpcResult<Vec<Log>>;

    /// Get a reference to the poll manager holding the active filters.
    fn polls(&self) -> &Mutex<PollManager<SyncPollFilter<Log>>>;

    /// Get a reference to ConsensusGraph.
    fn consensus_graph(&self) -> &ConsensusGraph;

    /// Get a clone of SharedConsensusGraph.
    fn shared_consensus_graph(&self) -> SharedConsensusGraph;

    /// Maximum number of logs returned per query, if configured.
    fn get_logs_filter_max_limit(&self) -> Option<usize>;

    /// Epochs since the last query: returns the number of previously
    /// reported epochs reverted by a reorg, plus the (epoch number, block
    /// hashes) pairs to report next.
    fn epochs_since_last_request(
        &self, last_epoch_number: u64,
        recent_reported_epochs: &VecDeque<(u64, Vec<H256>)>,
    ) -> JsonRpcResult<(u64, Vec<(u64, Vec<H256>)>)>;
}
73
/// Cfx filter rpc implementation for a full node.
pub struct CfxFilterClient {
    // shared consensus graph; used for epoch and log queries
    consensus: SharedConsensusGraph,
    // transaction pool; used for pending-transaction filters
    tx_pool: SharedTransactionPool,
    // active polls (installed filters), keyed by filter id
    polls: Mutex<PollManager<SyncPollFilter<Log>>>,
    // epochs newer than the latest finalized epoch, maintained by the
    // background loop spawned in `start_epochs_loop`
    unfinalized_epochs: Arc<RwLock<UnfinalizedEpochs>>,
    // optional cap on the number of logs returned per query
    logs_filter_max_limit: Option<usize>,
    // network passed to `Log::try_from_localized` when converting logs
    network: Network,
}
83
/// Bookkeeping for epochs that are not yet finalized.
pub struct UnfinalizedEpochs {
    // epochs in arrival order; the same epoch number may appear more than
    // once when its pivot chain changed (re-org)
    epochs_queue: VecDeque<(u64, Vec<H256>)>,
    // epoch number -> every block-hash list received for it, oldest first
    epochs_map: HashMap<u64, Vec<Vec<H256>>>,
}
88
89impl Default for UnfinalizedEpochs {
90    fn default() -> Self {
91        UnfinalizedEpochs {
92            epochs_queue: Default::default(),
93            epochs_map: Default::default(),
94        }
95    }
96}
97
98impl CfxFilterClient {
99    /// Creates new Cfx filter client.
100    pub fn new(
101        consensus: SharedConsensusGraph, tx_pool: SharedTransactionPool,
102        epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>, executor: Arc<Runtime>,
103        poll_lifetime: u32, logs_filter_max_limit: Option<usize>,
104        network: Network,
105    ) -> Self {
106        let filter_client = CfxFilterClient {
107            consensus,
108            tx_pool,
109            polls: Mutex::new(PollManager::new(poll_lifetime)),
110            unfinalized_epochs: Default::default(),
111            logs_filter_max_limit,
112            network,
113        };
114
115        // start loop to receive epochs, to avoid re-org during filter query
116        filter_client.start_epochs_loop(epochs_ordered, executor);
117        filter_client
118    }
119
120    fn start_epochs_loop(
121        &self, epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>,
122        executor: Arc<Runtime>,
123    ) {
124        // subscribe to the `epochs_ordered` channel
125        let mut receiver = epochs_ordered.subscribe();
126        let consensus = self.consensus.clone();
127        let epochs = self.unfinalized_epochs.clone();
128
129        // loop asynchronously
130        let fut = async move {
131            while let Some(epoch) = receiver.recv().await {
132                let mut epochs = epochs.write();
133
134                epochs.epochs_queue.push_back(epoch.clone());
135                epochs
136                    .epochs_map
137                    .entry(epoch.0)
138                    .or_insert(vec![])
139                    .push(epoch.1.clone());
140
141                let latest_finalized_epoch_number =
142                    consensus.latest_finalized_epoch_number();
143                debug!(
144                    "latest finalized epoch number: {}, received epochs: {:?}",
145                    latest_finalized_epoch_number, epoch
146                );
147
148                // only keep epochs after finalized state
149                while let Some(e) = epochs.epochs_queue.front() {
150                    if e.0 < latest_finalized_epoch_number {
151                        let (k, _) = epochs.epochs_queue.pop_front().unwrap();
152                        if let Some(target) = epochs.epochs_map.get_mut(&k) {
153                            if target.len() == 1 {
154                                epochs.epochs_map.remove(&k);
155                            } else {
156                                target.remove(0);
157                            }
158                        }
159                    } else {
160                        break;
161                    }
162                }
163            }
164        };
165
166        executor.spawn(fut);
167    }
168}
169
170impl Filterable for CfxFilterClient {
171    /// Current best epoch number.
172    fn best_executed_epoch_number(&self) -> u64 {
173        self.consensus_graph().best_executed_state_epoch_number()
174    }
175
176    /// Get a block hash by block id.
177    fn block_hashes(&self, epoch_num: EpochNumber) -> Option<Vec<H256>> {
178        // keep read lock to ensure consistent view
179        let _inner = self.consensus_graph().inner.read();
180        let hashes =
181            self.consensus_graph().get_block_hashes_by_epoch(epoch_num);
182
183        match hashes {
184            Ok(v) => return Some(v),
185            _ => return None,
186        }
187    }
188
189    /// pending transaction hashes at the given block (unordered).
190    fn pending_transaction_hashes(&self) -> BTreeSet<H256> {
191        self.tx_pool.get_pending_transaction_hashes_in_native_pool()
192    }
193
194    /// Get logs that match the given filter.
195    fn logs(&self, filter: LogFilter) -> JsonRpcResult<Vec<Log>> {
196        let logs = self
197            .consensus_graph()
198            .logs(filter)
199            .map_err(|err| CfxRpcError::from(err))?;
200
201        Ok(logs
202            .iter()
203            .cloned()
204            .map(|l| Log::try_from_localized(l, self.network))
205            .collect::<Result<_, _>>()
206            .map_err(|_| invalid_params("filter", "retrieve logs error"))?)
207    }
208
209    fn logs_for_epoch(
210        &self, filter: &LogFilter, epoch: (u64, Vec<H256>),
211        data_man: &Arc<BlockDataManager>,
212    ) -> JsonRpcResult<Vec<Log>> {
213        let mut result = vec![];
214        let logs = match retrieve_epoch_logs(data_man, epoch) {
215            Some(logs) => logs,
216            None => bail!(RpcError {
217                code: ErrorCode::ServerError(codes::UNSUPPORTED),
218                message: "Unable to retrieve logs for epoch".into(),
219                data: None,
220            }),
221        };
222
223        // apply filter to logs
224        let logs: Vec<Log> = logs
225            .iter()
226            .filter(|l| filter.matches(&l.entry))
227            .cloned()
228            .map(|l| Log::try_from_localized(l, self.network))
229            .collect::<Result<_, _>>()
230            .map_err(|_| {
231                invalid_params("filter", "retrieve logs for epoch error")
232            })?;
233        result.extend(logs);
234
235        Ok(result)
236    }
237
238    /// Get a reference to the poll manager.
239    fn polls(&self) -> &Mutex<PollManager<SyncPollFilter<Log>>> { &self.polls }
240
241    fn consensus_graph(&self) -> &ConsensusGraph { &self.consensus }
242
243    fn shared_consensus_graph(&self) -> SharedConsensusGraph {
244        self.consensus.clone()
245    }
246
247    fn get_logs_filter_max_limit(&self) -> Option<usize> {
248        self.logs_filter_max_limit
249    }
250
251    fn epochs_since_last_request(
252        &self, last_epoch_number: u64,
253        recent_reported_epochs: &VecDeque<(u64, Vec<H256>)>,
254    ) -> JsonRpcResult<(u64, Vec<(u64, Vec<H256>)>)> {
255        let last_block = if let Some((num, hash)) =
256            recent_reported_epochs.front().cloned()
257        {
258            if last_epoch_number != num {
259                bail!(RpcError {
260                    code: ErrorCode::ServerError(codes::UNSUPPORTED),
261                    message: "Last block number does not match".into(),
262                    data: None,
263                });
264            }
265            Some(hash)
266        } else {
267            None
268        };
269
270        // retrieve current epoch number
271        let current_epoch_number = self.best_executed_epoch_number();
272        debug!("current epoch number {}", current_epoch_number);
273        let latest_epochs = self.unfinalized_epochs.read();
274
275        // the best executed epoch index
276        let mut idx = latest_epochs.epochs_queue.len() as i32 - 1;
277        while idx >= 0
278            && latest_epochs.epochs_queue[idx as usize].0
279                != current_epoch_number
280        {
281            // special case: best_executed_epoch_number rollback, so those
282            // epoches before last_epoch_number can be considered to have be
283            // processed.
284            if latest_epochs.epochs_queue[idx as usize].0 == last_epoch_number
285                && last_block
286                    == Some(latest_epochs.epochs_queue[idx as usize].1.clone())
287            {
288                return Ok((0, vec![]));
289            }
290
291            idx -= 1;
292        }
293
294        // epochs between [max(last_epoch_number,
295        // latest_finalized_epoch_number), best executed epoch]
296        let mut end_epoch_number = current_epoch_number + 1;
297        let mut new_epochs = vec![];
298        let mut hs = HashSet::new();
299        while idx >= 0 {
300            let (num, blocks) =
301                latest_epochs.epochs_queue[idx as usize].clone();
302            if num == last_epoch_number
303                && (last_block.is_none() || last_block == Some(blocks.clone()))
304            {
305                break;
306            }
307
308            // only keep the last one
309            if num < end_epoch_number && !hs.contains(&num) {
310                hs.insert(num);
311                new_epochs.push((num, blocks));
312                end_epoch_number = num;
313            }
314
315            idx -= 1;
316        }
317        new_epochs.reverse();
318
319        // re-orged epochs
320        // when last_epoch_number great than or equal to
321        // latest_finalized_epoch_number, reorg_epochs should be empty
322        // when last_epoch_number less than
323        // latest_finalized_epoch_number, epochs between [fork point,
324        // min(last_epoch_number, latest_finalized_epoch_number)]
325        let mut reorg_epochs = vec![];
326        let mut reorg_len = 0;
327        for i in 0..recent_reported_epochs.len() {
328            let (num, hash) = recent_reported_epochs[i].clone();
329
330            if num < end_epoch_number {
331                let pivot_hash =
332                    if let Some(v) = latest_epochs.epochs_map.get(&num) {
333                        v.last().unwrap().clone()
334                    } else {
335                        self.block_hashes(EpochNumber::Number(num))
336                            .expect("Epoch should exist")
337                    };
338
339                if pivot_hash == hash {
340                    // meet fork point
341                    break;
342                }
343
344                debug!("reorg for {}, pivot hash {:?}", num, pivot_hash);
345                reorg_epochs.push((num, pivot_hash));
346            }
347            reorg_len += 1;
348        }
349        reorg_epochs.reverse();
350
351        // mid stable epochs, epochs in [last_epoch_number,
352        // latest_finalized_epoch_number]
353        debug!(
354            "stable epochs from {} to {}",
355            last_epoch_number + 1,
356            end_epoch_number
357        );
358        for epoch_num in (last_epoch_number + 1)..end_epoch_number {
359            let hash = self
360                .block_hashes(EpochNumber::Number(epoch_num))
361                .expect("Epoch should exist");
362            reorg_epochs.push((epoch_num, hash));
363        }
364        reorg_epochs.append(&mut new_epochs);
365
366        info!(
367            "Chain reorg len: {}, new epochs len: {}",
368            reorg_len,
369            reorg_epochs.len()
370        );
371        Ok((reorg_len, reorg_epochs))
372    }
373}
374
375impl<T: Filterable + Send + Sync + 'static> CfxFilter for T {
376    /// Returns id of new filter.
377    fn new_filter(&self, filter: CfxRpcLogFilter) -> JsonRpcResult<H128> {
378        debug!("create filter: {:?}", filter);
379        let mut polls = self.polls().lock();
380        let epoch_number = self.best_executed_epoch_number();
381
382        let filter: LogFilter = filter.into_primitive()?;
383
384        let id = polls.create_poll(SyncPollFilter::new(PollFilter::Logs {
385            last_epoch_number: if epoch_number == 0 {
386                0
387            } else {
388                epoch_number - 1
389            },
390            filter,
391            include_pending: false,
392            previous_logs: VecDeque::with_capacity(MAX_BLOCK_HISTORY_SIZE),
393            recent_reported_epochs: VecDeque::with_capacity(
394                MAX_BLOCK_HISTORY_SIZE,
395            ),
396        }));
397
398        Ok(id.into())
399    }
400
401    /// Returns id of new block filter.
402    fn new_block_filter(&self) -> JsonRpcResult<H128> {
403        debug!("create block filter");
404        let mut polls = self.polls().lock();
405        // +1, since we don't want to include the current block
406        let id = polls.create_poll(SyncPollFilter::new(PollFilter::Block {
407            last_epoch_number: self.best_executed_epoch_number(),
408            recent_reported_epochs: VecDeque::with_capacity(
409                MAX_BLOCK_HISTORY_SIZE,
410            ),
411        }));
412
413        Ok(id.into())
414    }
415
416    /// Returns id of new block filter.
417    fn new_pending_transaction_filter(&self) -> JsonRpcResult<H128> {
418        debug!("create pending transaction filter");
419        let mut polls = self.polls().lock();
420        let pending_transactions = self.pending_transaction_hashes();
421        let id = polls.create_poll(SyncPollFilter::new(
422            PollFilter::PendingTransaction(pending_transactions),
423        ));
424        Ok(id.into())
425    }
426
427    /// Returns filter changes since last poll.
428    fn filter_changes(&self, index: H128) -> JsonRpcResult<CfxFilterChanges> {
429        info!("filter_changes id: {}", index);
430        let filter = match self.polls().lock().poll_mut(&index) {
431            Some(filter) => filter.clone(),
432            None => bail!(RpcError {
433                code: ErrorCode::InvalidRequest,
434                message: "Filter not found".into(),
435                data: None,
436            }),
437        };
438
439        filter.modify(|filter| match *filter {
440            PollFilter::Block {
441                ref mut last_epoch_number,
442                ref mut recent_reported_epochs,
443            } => {
444                let (reorg_len, epochs) = self.epochs_since_last_request(
445                    *last_epoch_number,
446                    recent_reported_epochs,
447                )?;
448
449                // rewind block to last valid
450                for _ in 0..reorg_len {
451                    recent_reported_epochs.pop_front();
452                }
453
454                let mut hashes = Vec::new();
455                for (num, blocks) in epochs.into_iter() {
456                    *last_epoch_number = num;
457                    hashes.append(&mut blocks.clone());
458
459                    // Only keep the most recent history
460                    if recent_reported_epochs.len() >= MAX_BLOCK_HISTORY_SIZE {
461                        recent_reported_epochs.pop_back();
462                    }
463                    recent_reported_epochs.push_front((num, blocks));
464                }
465
466                Ok(CfxFilterChanges::Hashes(hashes))
467            }
468            PollFilter::PendingTransaction(ref mut previous_hashes) => {
469                // get hashes of pending transactions
470                let current_hashes = self.pending_transaction_hashes();
471
472                let new_hashes = {
473                    // find all new hashes
474                    current_hashes
475                        .difference(previous_hashes)
476                        .cloned()
477                        .map(Into::into)
478                        .collect()
479                };
480
481                // save all hashes of pending transactions
482                *previous_hashes = current_hashes;
483
484                // return new hashes
485                Ok(CfxFilterChanges::Hashes(new_hashes))
486            }
487            PollFilter::Logs {
488                ref mut last_epoch_number,
489                ref mut recent_reported_epochs,
490                ref mut previous_logs,
491                ref filter,
492                include_pending: _,
493            } => {
494                let (reorg_len, epochs) = self.epochs_since_last_request(
495                    *last_epoch_number,
496                    recent_reported_epochs,
497                )?;
498
499                let mut logs = vec![];
500
501                // retrieve reorg logs
502                for _ in 0..reorg_len {
503                    recent_reported_epochs.pop_front().unwrap();
504                }
505
506                if reorg_len > 0 {
507                    logs.push(CfxFilterLog::ChainReorg(RevertTo {
508                        revert_to: epochs.first().unwrap().0.into(),
509                    }));
510                }
511                let data_man = self.consensus_graph().data_manager().clone();
512
513                // logs from new epochs
514                for (num, blocks) in epochs.into_iter() {
515                    let log = match self.logs_for_epoch(
516                        &filter,
517                        (num, blocks.clone()),
518                        &data_man,
519                    ) {
520                        Ok(l) => l,
521                        _ => break,
522                    };
523
524                    log.iter()
525                        // .map(|l| CfxFilterLog::Log(l))
526                        .for_each(|l| logs.push(CfxFilterLog::Log(l.clone())));
527
528                    // logs.append(&mut log.clone());
529                    *last_epoch_number = num;
530
531                    // Only keep the most recent history
532                    if recent_reported_epochs.len() >= MAX_BLOCK_HISTORY_SIZE {
533                        recent_reported_epochs.pop_back();
534                        previous_logs.pop_back();
535                    }
536                    recent_reported_epochs.push_front((num, blocks));
537                    previous_logs.push_front(log);
538                }
539
540                Ok(CfxFilterChanges::Logs(limit_logs(
541                    logs,
542                    self.get_logs_filter_max_limit(),
543                )))
544            }
545        })
546    }
547
548    /// Returns all logs matching given filter (in a range 'from' - 'to').
549    fn filter_logs(&self, index: H128) -> JsonRpcResult<Vec<Log>> {
550        let (filter, _) = {
551            let mut polls = self.polls().lock();
552
553            match polls.poll(&index).and_then(|f| {
554                f.modify(|filter| match *filter {
555                    PollFilter::Logs {
556                        ref filter,
557                        include_pending,
558                        ..
559                    } => Some((filter.clone(), include_pending)),
560                    _ => None,
561                })
562            }) {
563                Some((filter, include_pending)) => (filter, include_pending),
564                None => bail!(RpcError {
565                    code: ErrorCode::InvalidRequest,
566                    message: "Filter not found".into(),
567                    data: None,
568                }),
569            }
570        };
571
572        // retrieve logs
573        Ok(limit_logs(
574            self.logs(filter)?,
575            self.get_logs_filter_max_limit(),
576        ))
577    }
578
579    /// Uninstalls filter.
580    fn uninstall_filter(&self, index: H128) -> JsonRpcResult<bool> {
581        Ok(self.polls().lock().remove_poll(&index))
582    }
583}
584
585fn retrieve_epoch_logs(
586    data_man: &Arc<BlockDataManager>, epoch: (u64, Vec<H256>),
587) -> Option<Vec<LocalizedLogEntry>> {
588    debug!("retrieve_epoch_logs {:?}", epoch);
589    let (epoch_number, hashes) = epoch;
590    let pivot = hashes.last().cloned().expect("epoch should not be empty");
591
592    // retrieve epoch receipts
593    let fut = hashes
594        .iter()
595        .map(|h| retrieve_block_receipts(&data_man, h, &pivot));
596
597    let receipts = fut.into_iter().collect::<Option<Vec<_>>>()?;
598
599    let mut logs = vec![];
600    let mut log_index = 0;
601
602    for (block_hash, block_receipts) in zip(hashes, receipts) {
603        // retrieve block transactions
604        let block = match data_man
605            .block_by_hash(&block_hash, true /* update_cache */)
606        {
607            Some(b) => b,
608            None => {
609                warn!("Unable to retrieve block {:?}", block_hash);
610                return None;
611            }
612        };
613
614        let txs = &block.transactions;
615        assert_eq!(block_receipts.receipts.len(), txs.len());
616
617        // construct logs
618        for (txid, (receipt, tx)) in
619            zip(&block_receipts.receipts, txs).enumerate()
620        {
621            let native_logs: Vec<_> = receipt
622                .logs
623                .iter()
624                .cloned()
625                .filter(|l| l.space == Space::Native)
626                .collect();
627
628            for (logid, entry) in native_logs.into_iter().enumerate() {
629                logs.push(LocalizedLogEntry {
630                    entry,
631                    block_hash,
632                    epoch_number,
633                    block_timestamp: Some(block.block_header.timestamp()),
634                    transaction_hash: tx.hash,
635                    transaction_index: txid,
636                    log_index,
637                    transaction_log_index: logid,
638                });
639
640                log_index += 1;
641            }
642        }
643    }
644
645    Some(logs)
646}
647
648// attempt to retrieve block receipts from BlockDataManager
649fn retrieve_block_receipts(
650    data_man: &Arc<BlockDataManager>, block: &H256, pivot: &H256,
651) -> Option<Arc<BlockReceipts>> {
652    match data_man.block_execution_result_by_hash_with_epoch(
653        &block, &pivot, false, /* update_pivot_assumption */
654        false, /* update_cache */
655    ) {
656        Some(res) => return Some(res.block_receipts.clone()),
657        None => {
658            error!("Cannot find receipts with {:?}/{:?}", block, pivot);
659            return None;
660        }
661    }
662}