1use crate::{
2 helpers::{
3 eth_filter::EthFilterHelper,
4 poll_filter::{
5 limit_logs, PollFilter, SyncPollFilter, MAX_BLOCK_HISTORY_SIZE,
6 },
7 },
8 traits::Filterable,
9};
10use cfx_rpc_eth_api::EthFilterApiServer;
11use cfx_rpc_eth_types::{
12 BlockId, EthRpcLogFilter as Filter, FilterChanges, Log,
13};
14use cfx_rpc_utils::error::jsonrpsee_error_helpers::invalid_request_msg;
15use cfx_tasks::TaskExecutor;
16use cfx_types::{H128 as FilterId, H256};
17use cfx_util_macros::bail;
18use cfxcore::{channel::Channel, SharedConsensusGraph, SharedTransactionPool};
19use jsonrpsee::core::RpcResult;
20use primitives::filter::LogFilter;
21use std::{collections::VecDeque, sync::Arc};
22
23type PendingTransactionFilterKind = ();
24
25pub struct EthFilterApi {
26 inner: EthFilterHelper,
27}
28
29impl EthFilterApi {
30 pub fn new(
31 consensus: SharedConsensusGraph, tx_pool: SharedTransactionPool,
32 epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>, executor: TaskExecutor,
33 poll_lifetime: u32, logs_filter_max_limit: Option<usize>,
34 ) -> EthFilterApi {
35 let eth_filter = EthFilterHelper::new(
36 consensus,
37 tx_pool,
38 epochs_ordered,
39 executor,
40 poll_lifetime,
41 logs_filter_max_limit,
42 );
43 EthFilterApi { inner: eth_filter }
44 }
45}
46
47#[async_trait::async_trait]
48impl EthFilterApiServer for EthFilterApi {
49 async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
50 let mut polls = self.inner.polls().lock();
51 let epoch_number = self.inner.best_executed_epoch_number();
52
53 if filter.to_block == Some(BlockId::Pending) {
54 bail!(invalid_request_msg(
55 "Filter logs from pending blocks is not supported"
56 ))
57 }
58
59 let filter: LogFilter = self.inner.into_primitive_filter(filter)?;
60
61 let id = polls.create_poll(SyncPollFilter::new(PollFilter::Logs {
62 last_epoch_number: if epoch_number == 0 {
63 0
64 } else {
65 epoch_number - 1
66 },
67 filter,
68 include_pending: false,
69 previous_logs: VecDeque::with_capacity(MAX_BLOCK_HISTORY_SIZE),
70 recent_reported_epochs: VecDeque::with_capacity(
71 MAX_BLOCK_HISTORY_SIZE,
72 ),
73 }));
74
75 Ok(id.into())
76 }
77
78 async fn new_block_filter(&self) -> RpcResult<FilterId> {
79 let mut polls = self.inner.polls().lock();
80 let id = polls.create_poll(SyncPollFilter::new(PollFilter::Block {
82 last_epoch_number: self.inner.best_executed_epoch_number(),
83 recent_reported_epochs: VecDeque::with_capacity(
84 MAX_BLOCK_HISTORY_SIZE,
85 ),
86 }));
87
88 Ok(id.into())
89 }
90
91 async fn new_pending_transaction_filter(
92 &self, kind: Option<PendingTransactionFilterKind>,
93 ) -> RpcResult<FilterId> {
94 let _ = kind;
95 let mut polls = self.inner.polls().lock();
96 let pending_transactions = self.inner.pending_transaction_hashes();
97 let id = polls.create_poll(SyncPollFilter::new(
98 PollFilter::PendingTransaction(pending_transactions),
99 ));
100 Ok(id.into())
101 }
102
103 async fn filter_changes(&self, id: FilterId) -> RpcResult<FilterChanges> {
104 let filter = match self.inner.polls().lock().poll_mut(&id) {
105 Some(filter) => filter.clone(),
106 None => bail!(invalid_request_msg("Filter not found")),
107 };
108
109 filter.modify(|filter| match *filter {
110 PollFilter::Block {
111 ref mut last_epoch_number,
112 ref mut recent_reported_epochs,
113 } => {
114 let (reorg_len, epochs) =
115 self.inner.epochs_since_last_request(
116 *last_epoch_number,
117 recent_reported_epochs,
118 )?;
119
120 for _ in 0..reorg_len {
122 recent_reported_epochs.pop_front();
123 }
124
125 let mut hashes = Vec::new();
126 for (num, blocks) in epochs.into_iter() {
127 *last_epoch_number = num;
128 hashes.push(
129 blocks
130 .last()
131 .cloned()
132 .expect("pivot block should exist"),
133 );
134 if recent_reported_epochs.len() >= MAX_BLOCK_HISTORY_SIZE {
136 recent_reported_epochs.pop_back();
137 }
138 recent_reported_epochs.push_front((num, blocks));
139 }
140
141 Ok(FilterChanges::Hashes(hashes))
142 }
143 PollFilter::PendingTransaction(ref mut previous_hashes) => {
144 let current_hashes = self.inner.pending_transaction_hashes();
146
147 let new_hashes = {
148 current_hashes
150 .difference(previous_hashes)
151 .cloned()
152 .map(Into::into)
153 .collect()
154 };
155
156 *previous_hashes = current_hashes;
158
159 Ok(FilterChanges::Hashes(new_hashes))
161 }
162 PollFilter::Logs {
163 ref mut last_epoch_number,
164 ref mut recent_reported_epochs,
165 ref mut previous_logs,
166 ref filter,
167 include_pending: _,
168 } => {
169 let (reorg_len, epochs) =
170 self.inner.epochs_since_last_request(
171 *last_epoch_number,
172 recent_reported_epochs,
173 )?;
174
175 let mut logs = vec![];
176
177 for _ in 0..reorg_len {
179 recent_reported_epochs.pop_front().unwrap();
180 let mut log: Vec<Log> = previous_logs
181 .pop_front()
182 .unwrap()
183 .into_iter()
184 .map(|mut l| {
185 l.removed = true;
186 l
187 })
188 .collect();
189 logs.append(&mut log);
190 }
191
192 for (num, blocks) in epochs.into_iter() {
194 let log = match self.inner.logs_for_epoch(
195 &filter,
196 (num, blocks.clone()),
197 false,
198 ) {
199 Ok(l) => l,
200 _ => break,
201 };
202
203 logs.append(&mut log.clone());
204 *last_epoch_number = num;
205
206 if recent_reported_epochs.len() >= MAX_BLOCK_HISTORY_SIZE {
208 recent_reported_epochs.pop_back();
209 previous_logs.pop_back();
210 }
211 recent_reported_epochs.push_front((num, blocks));
212 previous_logs.push_front(log);
213 }
214
215 Ok(FilterChanges::Logs(limit_logs(
216 logs,
217 self.inner.get_logs_filter_max_limit(),
218 )))
219 }
220 })
221 }
222
223 async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
224 let (filter, _) = {
225 let mut polls = self.inner.polls().lock();
226
227 match polls.poll(&id).and_then(|f| {
228 f.modify(|filter| match *filter {
229 PollFilter::Logs {
230 ref filter,
231 include_pending,
232 ..
233 } => Some((filter.clone(), include_pending)),
234 _ => None,
235 })
236 }) {
237 Some((filter, include_pending)) => (filter, include_pending),
238 None => bail!(invalid_request_msg("Filter not found")),
239 }
240 };
241
242 Ok(limit_logs(
244 self.inner.logs(filter)?,
245 self.inner.get_logs_filter_max_limit(),
246 ))
247 }
248
249 async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
250 Ok(self.inner.polls().lock().remove_poll(&id))
251 }
252}