cfx_rpc_eth_impl/
filter.rs

1use crate::{
2    helpers::{
3        eth_filter::EthFilterHelper,
4        poll_filter::{
5            limit_logs, PollFilter, SyncPollFilter, MAX_BLOCK_HISTORY_SIZE,
6        },
7    },
8    traits::Filterable,
9};
10use cfx_rpc_eth_api::EthFilterApiServer;
11use cfx_rpc_eth_types::{
12    BlockId, EthRpcLogFilter as Filter, FilterChanges, Log,
13};
14use cfx_rpc_utils::error::jsonrpsee_error_helpers::invalid_request_msg;
15use cfx_tasks::TaskExecutor;
16use cfx_types::{H128 as FilterId, H256};
17use cfx_util_macros::bail;
18use cfxcore::{channel::Channel, SharedConsensusGraph, SharedTransactionPool};
19use jsonrpsee::core::RpcResult;
20use primitives::filter::LogFilter;
21use std::{collections::VecDeque, sync::Arc};
22
23type PendingTransactionFilterKind = ();
24
25pub struct EthFilterApi {
26    inner: EthFilterHelper,
27}
28
29impl EthFilterApi {
30    pub fn new(
31        consensus: SharedConsensusGraph, tx_pool: SharedTransactionPool,
32        epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>, executor: TaskExecutor,
33        poll_lifetime: u32, logs_filter_max_limit: Option<usize>,
34    ) -> EthFilterApi {
35        let eth_filter = EthFilterHelper::new(
36            consensus,
37            tx_pool,
38            epochs_ordered,
39            executor,
40            poll_lifetime,
41            logs_filter_max_limit,
42        );
43        EthFilterApi { inner: eth_filter }
44    }
45}
46
47#[async_trait::async_trait]
48impl EthFilterApiServer for EthFilterApi {
49    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
50        let mut polls = self.inner.polls().lock();
51        let epoch_number = self.inner.best_executed_epoch_number();
52
53        if filter.to_block == Some(BlockId::Pending) {
54            bail!(invalid_request_msg(
55                "Filter logs from pending blocks is not supported"
56            ))
57        }
58
59        let filter: LogFilter = self.inner.into_primitive_filter(filter)?;
60
61        let id = polls.create_poll(SyncPollFilter::new(PollFilter::Logs {
62            last_epoch_number: if epoch_number == 0 {
63                0
64            } else {
65                epoch_number - 1
66            },
67            filter,
68            include_pending: false,
69            previous_logs: VecDeque::with_capacity(MAX_BLOCK_HISTORY_SIZE),
70            recent_reported_epochs: VecDeque::with_capacity(
71                MAX_BLOCK_HISTORY_SIZE,
72            ),
73        }));
74
75        Ok(id.into())
76    }
77
78    async fn new_block_filter(&self) -> RpcResult<FilterId> {
79        let mut polls = self.inner.polls().lock();
80        // +1, since we don't want to include the current block
81        let id = polls.create_poll(SyncPollFilter::new(PollFilter::Block {
82            last_epoch_number: self.inner.best_executed_epoch_number(),
83            recent_reported_epochs: VecDeque::with_capacity(
84                MAX_BLOCK_HISTORY_SIZE,
85            ),
86        }));
87
88        Ok(id.into())
89    }
90
91    async fn new_pending_transaction_filter(
92        &self, kind: Option<PendingTransactionFilterKind>,
93    ) -> RpcResult<FilterId> {
94        let _ = kind;
95        let mut polls = self.inner.polls().lock();
96        let pending_transactions = self.inner.pending_transaction_hashes();
97        let id = polls.create_poll(SyncPollFilter::new(
98            PollFilter::PendingTransaction(pending_transactions),
99        ));
100        Ok(id.into())
101    }
102
103    async fn filter_changes(&self, id: FilterId) -> RpcResult<FilterChanges> {
104        let filter = match self.inner.polls().lock().poll_mut(&id) {
105            Some(filter) => filter.clone(),
106            None => bail!(invalid_request_msg("Filter not found")),
107        };
108
109        filter.modify(|filter| match *filter {
110            PollFilter::Block {
111                ref mut last_epoch_number,
112                ref mut recent_reported_epochs,
113            } => {
114                let (reorg_len, epochs) =
115                    self.inner.epochs_since_last_request(
116                        *last_epoch_number,
117                        recent_reported_epochs,
118                    )?;
119
120                // rewind block to last valid
121                for _ in 0..reorg_len {
122                    recent_reported_epochs.pop_front();
123                }
124
125                let mut hashes = Vec::new();
126                for (num, blocks) in epochs.into_iter() {
127                    *last_epoch_number = num;
128                    hashes.push(
129                        blocks
130                            .last()
131                            .cloned()
132                            .expect("pivot block should exist"),
133                    );
134                    // Only keep the most recent history
135                    if recent_reported_epochs.len() >= MAX_BLOCK_HISTORY_SIZE {
136                        recent_reported_epochs.pop_back();
137                    }
138                    recent_reported_epochs.push_front((num, blocks));
139                }
140
141                Ok(FilterChanges::Hashes(hashes))
142            }
143            PollFilter::PendingTransaction(ref mut previous_hashes) => {
144                // get hashes of pending transactions
145                let current_hashes = self.inner.pending_transaction_hashes();
146
147                let new_hashes = {
148                    // find all new hashes
149                    current_hashes
150                        .difference(previous_hashes)
151                        .cloned()
152                        .map(Into::into)
153                        .collect()
154                };
155
156                // save all hashes of pending transactions
157                *previous_hashes = current_hashes;
158
159                // return new hashes
160                Ok(FilterChanges::Hashes(new_hashes))
161            }
162            PollFilter::Logs {
163                ref mut last_epoch_number,
164                ref mut recent_reported_epochs,
165                ref mut previous_logs,
166                ref filter,
167                include_pending: _,
168            } => {
169                let (reorg_len, epochs) =
170                    self.inner.epochs_since_last_request(
171                        *last_epoch_number,
172                        recent_reported_epochs,
173                    )?;
174
175                let mut logs = vec![];
176
177                // retrieve reorg logs
178                for _ in 0..reorg_len {
179                    recent_reported_epochs.pop_front().unwrap();
180                    let mut log: Vec<Log> = previous_logs
181                        .pop_front()
182                        .unwrap()
183                        .into_iter()
184                        .map(|mut l| {
185                            l.removed = true;
186                            l
187                        })
188                        .collect();
189                    logs.append(&mut log);
190                }
191
192                // logs from new epochs
193                for (num, blocks) in epochs.into_iter() {
194                    let log = match self.inner.logs_for_epoch(
195                        &filter,
196                        (num, blocks.clone()),
197                        false,
198                    ) {
199                        Ok(l) => l,
200                        _ => break,
201                    };
202
203                    logs.append(&mut log.clone());
204                    *last_epoch_number = num;
205
206                    // Only keep the most recent history
207                    if recent_reported_epochs.len() >= MAX_BLOCK_HISTORY_SIZE {
208                        recent_reported_epochs.pop_back();
209                        previous_logs.pop_back();
210                    }
211                    recent_reported_epochs.push_front((num, blocks));
212                    previous_logs.push_front(log);
213                }
214
215                Ok(FilterChanges::Logs(limit_logs(
216                    logs,
217                    self.inner.get_logs_filter_max_limit(),
218                )))
219            }
220        })
221    }
222
223    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
224        let (filter, _) = {
225            let mut polls = self.inner.polls().lock();
226
227            match polls.poll(&id).and_then(|f| {
228                f.modify(|filter| match *filter {
229                    PollFilter::Logs {
230                        ref filter,
231                        include_pending,
232                        ..
233                    } => Some((filter.clone(), include_pending)),
234                    _ => None,
235                })
236            }) {
237                Some((filter, include_pending)) => (filter, include_pending),
238                None => bail!(invalid_request_msg("Filter not found")),
239            }
240        };
241
242        // retrieve logs
243        Ok(limit_logs(
244            self.inner.logs(filter)?,
245            self.inner.get_logs_filter_max_limit(),
246        ))
247    }
248
249    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
250        Ok(self.inner.polls().lock().remove_poll(&id))
251    }
252}