cfx_rpc/helpers/
eth_filter.rs

1use std::{
2    collections::{BTreeSet, HashMap, HashSet, VecDeque},
3    iter::zip,
4    sync::Arc,
5};
6
7use crate::{
8    helpers::{poll_filter::SyncPollFilter, poll_manager::PollManager},
9    traits::Filterable,
10};
11use cfx_rpc_cfx_types::traits::BlockProvider;
12use cfx_rpc_eth_types::{EthRpcLogFilter, Log};
13use cfx_rpc_utils::error::error_codes as codes;
14use cfx_tasks::TaskExecutor;
15use cfx_types::{Space, H256};
16use cfx_util_macros::bail;
17use cfxcore::{
18    channel::Channel, errors::Error as CfxRpcError, ConsensusGraph,
19    SharedConsensusGraph, SharedTransactionPool,
20};
21use jsonrpc_core::{Error as RpcError, ErrorCode, Result as RpcResult};
22use log::{debug, error, info};
23use parking_lot::{Mutex, RwLock};
24use primitives::{
25    filter::LogFilter, log_entry::LocalizedLogEntry, EpochNumber,
26};
27
28/// Eth filter rpc implementation for a full node.
29pub struct EthFilterHelper {
30    consensus: SharedConsensusGraph,
31    tx_pool: SharedTransactionPool,
32    polls: Mutex<PollManager<SyncPollFilter<Log>>>,
33    unfinalized_epochs: Arc<RwLock<UnfinalizedEpochs>>,
34    logs_filter_max_limit: Option<usize>,
35}
36
37pub struct UnfinalizedEpochs {
38    epochs_queue: VecDeque<(u64, Vec<H256>)>,
39    epochs_map: HashMap<u64, Vec<Vec<H256>>>,
40}
41
42impl Default for UnfinalizedEpochs {
43    fn default() -> Self {
44        UnfinalizedEpochs {
45            epochs_queue: Default::default(),
46            epochs_map: Default::default(),
47        }
48    }
49}
50
51impl EthFilterHelper {
52    /// Creates new Eth filter client.
53    pub fn new(
54        consensus: SharedConsensusGraph, tx_pool: SharedTransactionPool,
55        epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>, executor: TaskExecutor,
56        poll_lifetime: u32, logs_filter_max_limit: Option<usize>,
57    ) -> Self {
58        let filter_client = EthFilterHelper {
59            consensus,
60            tx_pool,
61            polls: Mutex::new(PollManager::new(poll_lifetime)),
62            unfinalized_epochs: Default::default(),
63            logs_filter_max_limit,
64        };
65
66        // start loop to receive epochs, to avoid re-org during filter query
67        filter_client.start_epochs_loop(epochs_ordered, executor);
68        filter_client
69    }
70
71    fn start_epochs_loop(
72        &self, epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>,
73        executor: TaskExecutor,
74    ) {
75        // subscribe to the `epochs_ordered` channel
76        let mut receiver = epochs_ordered.subscribe();
77        let consensus = self.consensus.clone();
78        let epochs = self.unfinalized_epochs.clone();
79
80        // loop asynchronously
81        let fut = async move {
82            while let Some(epoch) = receiver.recv().await {
83                let mut epochs = epochs.write();
84
85                epochs.epochs_queue.push_back(epoch.clone());
86                epochs
87                    .epochs_map
88                    .entry(epoch.0)
89                    .or_insert(vec![])
90                    .push(epoch.1.clone());
91
92                let latest_finalized_epoch_number =
93                    consensus.latest_finalized_epoch_number();
94                debug!(
95                    "latest finalized epoch number: {}, received epochs: {:?}",
96                    latest_finalized_epoch_number, epoch
97                );
98
99                // only keep epochs after finalized state
100                while let Some(e) = epochs.epochs_queue.front() {
101                    if e.0 < latest_finalized_epoch_number {
102                        let (k, _) = epochs.epochs_queue.pop_front().unwrap();
103                        if let Some(target) = epochs.epochs_map.get_mut(&k) {
104                            if target.len() == 1 {
105                                epochs.epochs_map.remove(&k);
106                            } else {
107                                target.remove(0);
108                            }
109                        }
110                    } else {
111                        break;
112                    }
113                }
114            }
115        };
116
117        executor.spawn(fut);
118    }
119
120    fn retrieve_epoch_logs(
121        epoch: (u64, Vec<H256>), consensus_graph: &ConsensusGraph,
122    ) -> Option<Vec<LocalizedLogEntry>> {
123        debug!("retrieve_epoch_logs {:?}", epoch);
124        let (epoch_number, hashes) = epoch;
125        let pivot = hashes.last().cloned().expect("epoch should not be empty");
126
127        // construct phantom block
128        let pb = match consensus_graph.get_phantom_block_by_number(
129            EpochNumber::Number(epoch_number),
130            Some(pivot),
131            false, /* include_traces */
132        ) {
133            Ok(Some(b)) => b,
134            Ok(None) => {
135                error!("Block not executed yet {:?}", pivot);
136                return None;
137            }
138            Err(e) => {
139                error!("get_phantom_block_by_number failed with {}", e);
140                return None;
141            }
142        };
143
144        let mut logs = vec![];
145        let mut log_index = 0;
146
147        let txs = &pb.transactions;
148        assert_eq!(pb.receipts.len(), txs.len());
149
150        // construct logs
151        for (txid, (receipt, tx)) in zip(&pb.receipts, txs).enumerate() {
152            let eth_logs: Vec<_> = receipt
153                .logs
154                .iter()
155                .cloned()
156                .filter(|l| l.space == Space::Ethereum)
157                .collect();
158
159            for (logid, entry) in eth_logs.into_iter().enumerate() {
160                logs.push(LocalizedLogEntry {
161                    entry,
162                    block_hash: pivot,
163                    epoch_number,
164                    block_timestamp: Some(pb.pivot_header.timestamp()),
165                    transaction_hash: tx.hash,
166                    transaction_index: txid,
167                    log_index,
168                    transaction_log_index: logid,
169                });
170
171                log_index += 1;
172            }
173        }
174
175        Some(logs)
176    }
177}
178
179impl Filterable for EthFilterHelper {
180    /// Current best epoch number.
181    fn best_executed_epoch_number(&self) -> u64 {
182        self.consensus_graph().best_executed_state_epoch_number()
183    }
184
185    /// Get a block hash by block id.
186    fn block_hashes(&self, epoch_num: EpochNumber) -> Option<Vec<H256>> {
187        // keep read lock to ensure consistent view
188        let _inner = self.consensus_graph().inner.read();
189        let hashes =
190            self.consensus_graph().get_block_hashes_by_epoch(epoch_num);
191
192        match hashes {
193            Ok(v) => return Some(v),
194            _ => return None,
195        }
196    }
197
198    /// pending transaction hashes at the given block (unordered).
199    fn pending_transaction_hashes(&self) -> BTreeSet<H256> {
200        self.tx_pool.get_pending_transaction_hashes_in_evm_pool()
201    }
202
203    /// Get logs that match the given filter.
204    fn logs(&self, filter: LogFilter) -> RpcResult<Vec<Log>> {
205        let logs = self
206            .consensus_graph()
207            .logs(filter)
208            .map_err(|err| CfxRpcError::from(err))?;
209
210        Ok(logs
211            .iter()
212            .cloned()
213            .map(|l| Log::try_from_localized(l, self, false))
214            .collect::<Result<_, _>>()?)
215    }
216
217    fn logs_for_epoch(
218        &self, filter: &LogFilter, epoch: (u64, Vec<H256>), removed: bool,
219    ) -> RpcResult<Vec<Log>> {
220        let mut result = vec![];
221        let logs =
222            match Self::retrieve_epoch_logs(epoch, self.consensus_graph()) {
223                Some(logs) => logs,
224                None => bail!(RpcError {
225                    code: ErrorCode::ServerError(codes::UNSUPPORTED),
226                    message: "Unable to retrieve logs for epoch".into(),
227                    data: None,
228                }),
229            };
230
231        // apply filter to logs
232        let logs: Vec<Log> = logs
233            .iter()
234            .filter(|l| filter.matches(&l.entry))
235            .cloned()
236            .map(|l| Log::try_from_localized(l, self, removed))
237            .collect::<Result<_, _>>()?;
238        result.extend(logs);
239
240        Ok(result)
241    }
242
243    /// Get a reference to the poll manager.
244    fn polls(&self) -> &Mutex<PollManager<SyncPollFilter<Log>>> { &self.polls }
245
246    fn consensus_graph(&self) -> &ConsensusGraph { &self.consensus }
247
248    fn shared_consensus_graph(&self) -> SharedConsensusGraph {
249        self.consensus.clone()
250    }
251
252    fn get_logs_filter_max_limit(&self) -> Option<usize> {
253        self.logs_filter_max_limit
254    }
255
256    fn epochs_since_last_request(
257        &self, last_epoch_number: u64,
258        recent_reported_epochs: &VecDeque<(u64, Vec<H256>)>,
259    ) -> RpcResult<(u64, Vec<(u64, Vec<H256>)>)> {
260        let last_block = if let Some((num, hash)) =
261            recent_reported_epochs.front().cloned()
262        {
263            if last_epoch_number != num {
264                bail!(RpcError {
265                    code: ErrorCode::ServerError(codes::UNSUPPORTED),
266                    message: "Last block number does not match".into(),
267                    data: None,
268                });
269            }
270            Some(hash)
271        } else {
272            None
273        };
274
275        // retrieve the current epoch number
276        let current_epoch_number = self.best_executed_epoch_number();
277        debug!("current epoch number {}", current_epoch_number);
278        let latest_epochs = self.unfinalized_epochs.read();
279
280        // the best executed epoch index
281        let mut idx = latest_epochs.epochs_queue.len() as i32 - 1;
282        while idx >= 0
283            && latest_epochs.epochs_queue[idx as usize].0
284                != current_epoch_number
285        {
286            // special case: best_executed_epoch_number rollback, so those
287            // epoches before last_epoch_number can be considered to have be
288            // processed.
289            if latest_epochs.epochs_queue[idx as usize].0 == last_epoch_number
290                && last_block
291                    == Some(latest_epochs.epochs_queue[idx as usize].1.clone())
292            {
293                return Ok((0, vec![]));
294            }
295
296            idx -= 1;
297        }
298
299        // epochs between [max(last_epoch_number,
300        // latest_finalized_epoch_number), best executed epoch]
301        let mut end_epoch_number = current_epoch_number + 1;
302        let mut new_epochs = vec![];
303        let mut hs = HashSet::new();
304        while idx >= 0 {
305            let (num, blocks) =
306                latest_epochs.epochs_queue[idx as usize].clone();
307            if num == last_epoch_number
308                && (last_block.is_none() || last_block == Some(blocks.clone()))
309            {
310                break;
311            }
312
313            // only keep the last one
314            if num < end_epoch_number && !hs.contains(&num) {
315                hs.insert(num);
316                new_epochs.push((num, blocks));
317                end_epoch_number = num;
318            }
319
320            idx -= 1;
321        }
322        new_epochs.reverse();
323
324        // re-orged epochs
325        // when last_epoch_number great than or equal to
326        // latest_finalized_epoch_number, reorg_epochs should be empty
327        // when last_epoch_number less than
328        // latest_finalized_epoch_number, epochs between [fork point,
329        // min(last_epoch_number, latest_finalized_epoch_number)]
330        let mut reorg_epochs = vec![];
331        let mut reorg_len = 0;
332        for i in 0..recent_reported_epochs.len() {
333            let (num, hash) = recent_reported_epochs[i].clone();
334
335            if num < end_epoch_number {
336                let pivot_hash =
337                    if let Some(v) = latest_epochs.epochs_map.get(&num) {
338                        v.last().unwrap().clone()
339                    } else {
340                        self.block_hashes(EpochNumber::Number(num))
341                            .expect("Epoch should exist")
342                    };
343
344                if pivot_hash == hash {
345                    // meet fork point
346                    break;
347                }
348
349                debug!("reorg for {}, pivot hash {:?}", num, pivot_hash);
350                reorg_epochs.push((num, pivot_hash));
351            }
352            reorg_len += 1;
353        }
354        reorg_epochs.reverse();
355
356        // mid stable epochs, epochs in [last_epoch_number,
357        // latest_finalized_epoch_number]
358        debug!(
359            "stable epochs from {} to {}",
360            last_epoch_number + 1,
361            end_epoch_number
362        );
363        for epoch_num in (last_epoch_number + 1)..end_epoch_number {
364            let hash = self
365                .block_hashes(EpochNumber::Number(epoch_num))
366                .expect("Epoch should exist");
367            reorg_epochs.push((epoch_num, hash));
368        }
369        reorg_epochs.append(&mut new_epochs);
370
371        info!(
372            "Chain reorg len: {}, new epochs len: {}",
373            reorg_len,
374            reorg_epochs.len()
375        );
376        Ok((reorg_len, reorg_epochs))
377    }
378
379    fn into_primitive_filter(
380        &self, filter: EthRpcLogFilter,
381    ) -> RpcResult<LogFilter> {
382        filter.into_primitive(self).map_err(|e| e.into())
383    }
384}
385
386impl BlockProvider for &EthFilterHelper {
387    fn get_block_epoch_number(&self, hash: &H256) -> Option<u64> {
388        self.consensus.get_block_epoch_number(hash)
389    }
390
391    fn get_block_hashes_by_epoch(
392        &self, epoch_number: EpochNumber,
393    ) -> Result<Vec<H256>, String> {
394        self.consensus
395            .get_block_hashes_by_epoch(epoch_number)
396            .map_err(|e| e.to_string())
397    }
398}