cfx_rpc/
filter.rs

1use crate::{
2    helpers::{
3        eth_filter::EthFilterHelper,
4        poll_filter::{
5            limit_logs, PollFilter, SyncPollFilter, MAX_BLOCK_HISTORY_SIZE,
6        },
7    },
8    traits::Filterable,
9};
10use cfx_rpc_eth_api::EthFilterApiServer;
11use cfx_rpc_eth_types::{
12    BlockId, EthRpcLogFilter as Filter, FilterChanges, Log,
13};
14use cfx_rpc_utils::error::jsonrpsee_error_helpers::{
15    invalid_request_msg, jsonrpc_error_to_error_object_owned,
16};
17use cfx_tasks::TaskExecutor;
18use cfx_types::{H128 as FilterId, H256};
19use cfx_util_macros::bail;
20use cfxcore::{channel::Channel, SharedConsensusGraph, SharedTransactionPool};
21use jsonrpsee::core::RpcResult;
22use primitives::filter::LogFilter;
23use std::{collections::VecDeque, sync::Arc};
24
25type PendingTransactionFilterKind = ();
26
27pub struct EthFilterApi {
28    inner: EthFilterHelper,
29}
30
31impl EthFilterApi {
32    pub fn new(
33        consensus: SharedConsensusGraph, tx_pool: SharedTransactionPool,
34        epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>, executor: TaskExecutor,
35        poll_lifetime: u32, logs_filter_max_limit: Option<usize>,
36    ) -> EthFilterApi {
37        let eth_filter = EthFilterHelper::new(
38            consensus,
39            tx_pool,
40            epochs_ordered,
41            executor,
42            poll_lifetime,
43            logs_filter_max_limit,
44        );
45        EthFilterApi { inner: eth_filter }
46    }
47}
48
49#[async_trait::async_trait]
50impl EthFilterApiServer for EthFilterApi {
51    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
52        let mut polls = self.inner.polls().lock();
53        let epoch_number = self.inner.best_executed_epoch_number();
54
55        if filter.to_block == Some(BlockId::Pending) {
56            bail!(invalid_request_msg(
57                "Filter logs from pending blocks is not supported"
58            ))
59        }
60
61        let filter: LogFilter = self
62            .inner
63            .into_primitive_filter(filter)
64            .map_err(|e| jsonrpc_error_to_error_object_owned(e.into()))?;
65
66        let id = polls.create_poll(SyncPollFilter::new(PollFilter::Logs {
67            last_epoch_number: if epoch_number == 0 {
68                0
69            } else {
70                epoch_number - 1
71            },
72            filter,
73            include_pending: false,
74            previous_logs: VecDeque::with_capacity(MAX_BLOCK_HISTORY_SIZE),
75            recent_reported_epochs: VecDeque::with_capacity(
76                MAX_BLOCK_HISTORY_SIZE,
77            ),
78        }));
79
80        Ok(id.into())
81    }
82
83    async fn new_block_filter(&self) -> RpcResult<FilterId> {
84        let mut polls = self.inner.polls().lock();
85        // +1, since we don't want to include the current block
86        let id = polls.create_poll(SyncPollFilter::new(PollFilter::Block {
87            last_epoch_number: self.inner.best_executed_epoch_number(),
88            recent_reported_epochs: VecDeque::with_capacity(
89                MAX_BLOCK_HISTORY_SIZE,
90            ),
91        }));
92
93        Ok(id.into())
94    }
95
96    async fn new_pending_transaction_filter(
97        &self, kind: Option<PendingTransactionFilterKind>,
98    ) -> RpcResult<FilterId> {
99        let _ = kind;
100        let mut polls = self.inner.polls().lock();
101        let pending_transactions = self.inner.pending_transaction_hashes();
102        let id = polls.create_poll(SyncPollFilter::new(
103            PollFilter::PendingTransaction(pending_transactions),
104        ));
105        Ok(id.into())
106    }
107
108    async fn filter_changes(&self, id: FilterId) -> RpcResult<FilterChanges> {
109        let filter = match self.inner.polls().lock().poll_mut(&id) {
110            Some(filter) => filter.clone(),
111            None => bail!(invalid_request_msg("Filter not found")),
112        };
113
114        filter.modify(|filter| match *filter {
115            PollFilter::Block {
116                ref mut last_epoch_number,
117                ref mut recent_reported_epochs,
118            } => {
119                let (reorg_len, epochs) = self
120                    .inner
121                    .epochs_since_last_request(
122                        *last_epoch_number,
123                        recent_reported_epochs,
124                    )
125                    .map_err(|e| jsonrpc_error_to_error_object_owned(e))?;
126
127                // rewind block to last valid
128                for _ in 0..reorg_len {
129                    recent_reported_epochs.pop_front();
130                }
131
132                let mut hashes = Vec::new();
133                for (num, blocks) in epochs.into_iter() {
134                    *last_epoch_number = num;
135                    hashes.push(
136                        blocks
137                            .last()
138                            .cloned()
139                            .expect("pivot block should exist"),
140                    );
141                    // Only keep the most recent history
142                    if recent_reported_epochs.len() >= MAX_BLOCK_HISTORY_SIZE {
143                        recent_reported_epochs.pop_back();
144                    }
145                    recent_reported_epochs.push_front((num, blocks));
146                }
147
148                Ok(FilterChanges::Hashes(hashes))
149            }
150            PollFilter::PendingTransaction(ref mut previous_hashes) => {
151                // get hashes of pending transactions
152                let current_hashes = self.inner.pending_transaction_hashes();
153
154                let new_hashes = {
155                    // find all new hashes
156                    current_hashes
157                        .difference(previous_hashes)
158                        .cloned()
159                        .map(Into::into)
160                        .collect()
161                };
162
163                // save all hashes of pending transactions
164                *previous_hashes = current_hashes;
165
166                // return new hashes
167                Ok(FilterChanges::Hashes(new_hashes))
168            }
169            PollFilter::Logs {
170                ref mut last_epoch_number,
171                ref mut recent_reported_epochs,
172                ref mut previous_logs,
173                ref filter,
174                include_pending: _,
175            } => {
176                let (reorg_len, epochs) = self
177                    .inner
178                    .epochs_since_last_request(
179                        *last_epoch_number,
180                        recent_reported_epochs,
181                    )
182                    .map_err(|e| jsonrpc_error_to_error_object_owned(e))?;
183
184                let mut logs = vec![];
185
186                // retrieve reorg logs
187                for _ in 0..reorg_len {
188                    recent_reported_epochs.pop_front().unwrap();
189                    let mut log: Vec<Log> = previous_logs
190                        .pop_front()
191                        .unwrap()
192                        .into_iter()
193                        .map(|mut l| {
194                            l.removed = true;
195                            l
196                        })
197                        .collect();
198                    logs.append(&mut log);
199                }
200
201                // logs from new epochs
202                for (num, blocks) in epochs.into_iter() {
203                    let log = match self.inner.logs_for_epoch(
204                        &filter,
205                        (num, blocks.clone()),
206                        false,
207                    ) {
208                        Ok(l) => l,
209                        _ => break,
210                    };
211
212                    logs.append(&mut log.clone());
213                    *last_epoch_number = num;
214
215                    // Only keep the most recent history
216                    if recent_reported_epochs.len() >= MAX_BLOCK_HISTORY_SIZE {
217                        recent_reported_epochs.pop_back();
218                        previous_logs.pop_back();
219                    }
220                    recent_reported_epochs.push_front((num, blocks));
221                    previous_logs.push_front(log);
222                }
223
224                Ok(FilterChanges::Logs(limit_logs(
225                    logs,
226                    self.inner.get_logs_filter_max_limit(),
227                )))
228            }
229        })
230    }
231
232    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
233        let (filter, _) = {
234            let mut polls = self.inner.polls().lock();
235
236            match polls.poll(&id).and_then(|f| {
237                f.modify(|filter| match *filter {
238                    PollFilter::Logs {
239                        ref filter,
240                        include_pending,
241                        ..
242                    } => Some((filter.clone(), include_pending)),
243                    _ => None,
244                })
245            }) {
246                Some((filter, include_pending)) => (filter, include_pending),
247                None => bail!(invalid_request_msg("Filter not found")),
248            }
249        };
250
251        // retrieve logs
252        Ok(limit_logs(
253            self.inner
254                .logs(filter)
255                .map_err(|e| jsonrpc_error_to_error_object_owned(e))?,
256            self.inner.get_logs_filter_max_limit(),
257        ))
258    }
259
260    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
261        Ok(self.inner.polls().lock().remove_poll(&id))
262    }
263}