1use crate::{
2 helpers::{
3 eth_filter::EthFilterHelper,
4 poll_filter::{
5 limit_logs, PollFilter, SyncPollFilter, MAX_BLOCK_HISTORY_SIZE,
6 },
7 },
8 traits::Filterable,
9};
10use cfx_rpc_eth_api::EthFilterApiServer;
11use cfx_rpc_eth_types::{
12 BlockId, EthRpcLogFilter as Filter, FilterChanges, Log,
13};
14use cfx_rpc_utils::error::jsonrpsee_error_helpers::{
15 invalid_request_msg, jsonrpc_error_to_error_object_owned,
16};
17use cfx_tasks::TaskExecutor;
18use cfx_types::{H128 as FilterId, H256};
19use cfx_util_macros::bail;
20use cfxcore::{channel::Channel, SharedConsensusGraph, SharedTransactionPool};
21use jsonrpsee::core::RpcResult;
22use primitives::filter::LogFilter;
23use std::{collections::VecDeque, sync::Arc};
24
25type PendingTransactionFilterKind = ();
26
27pub struct EthFilterApi {
28 inner: EthFilterHelper,
29}
30
31impl EthFilterApi {
32 pub fn new(
33 consensus: SharedConsensusGraph, tx_pool: SharedTransactionPool,
34 epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>, executor: TaskExecutor,
35 poll_lifetime: u32, logs_filter_max_limit: Option<usize>,
36 ) -> EthFilterApi {
37 let eth_filter = EthFilterHelper::new(
38 consensus,
39 tx_pool,
40 epochs_ordered,
41 executor,
42 poll_lifetime,
43 logs_filter_max_limit,
44 );
45 EthFilterApi { inner: eth_filter }
46 }
47}
48
49#[async_trait::async_trait]
50impl EthFilterApiServer for EthFilterApi {
51 async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
52 let mut polls = self.inner.polls().lock();
53 let epoch_number = self.inner.best_executed_epoch_number();
54
55 if filter.to_block == Some(BlockId::Pending) {
56 bail!(invalid_request_msg(
57 "Filter logs from pending blocks is not supported"
58 ))
59 }
60
61 let filter: LogFilter = self
62 .inner
63 .into_primitive_filter(filter)
64 .map_err(|e| jsonrpc_error_to_error_object_owned(e.into()))?;
65
66 let id = polls.create_poll(SyncPollFilter::new(PollFilter::Logs {
67 last_epoch_number: if epoch_number == 0 {
68 0
69 } else {
70 epoch_number - 1
71 },
72 filter,
73 include_pending: false,
74 previous_logs: VecDeque::with_capacity(MAX_BLOCK_HISTORY_SIZE),
75 recent_reported_epochs: VecDeque::with_capacity(
76 MAX_BLOCK_HISTORY_SIZE,
77 ),
78 }));
79
80 Ok(id.into())
81 }
82
83 async fn new_block_filter(&self) -> RpcResult<FilterId> {
84 let mut polls = self.inner.polls().lock();
85 let id = polls.create_poll(SyncPollFilter::new(PollFilter::Block {
87 last_epoch_number: self.inner.best_executed_epoch_number(),
88 recent_reported_epochs: VecDeque::with_capacity(
89 MAX_BLOCK_HISTORY_SIZE,
90 ),
91 }));
92
93 Ok(id.into())
94 }
95
96 async fn new_pending_transaction_filter(
97 &self, kind: Option<PendingTransactionFilterKind>,
98 ) -> RpcResult<FilterId> {
99 let _ = kind;
100 let mut polls = self.inner.polls().lock();
101 let pending_transactions = self.inner.pending_transaction_hashes();
102 let id = polls.create_poll(SyncPollFilter::new(
103 PollFilter::PendingTransaction(pending_transactions),
104 ));
105 Ok(id.into())
106 }
107
108 async fn filter_changes(&self, id: FilterId) -> RpcResult<FilterChanges> {
109 let filter = match self.inner.polls().lock().poll_mut(&id) {
110 Some(filter) => filter.clone(),
111 None => bail!(invalid_request_msg("Filter not found")),
112 };
113
114 filter.modify(|filter| match *filter {
115 PollFilter::Block {
116 ref mut last_epoch_number,
117 ref mut recent_reported_epochs,
118 } => {
119 let (reorg_len, epochs) = self
120 .inner
121 .epochs_since_last_request(
122 *last_epoch_number,
123 recent_reported_epochs,
124 )
125 .map_err(|e| jsonrpc_error_to_error_object_owned(e))?;
126
127 for _ in 0..reorg_len {
129 recent_reported_epochs.pop_front();
130 }
131
132 let mut hashes = Vec::new();
133 for (num, blocks) in epochs.into_iter() {
134 *last_epoch_number = num;
135 hashes.push(
136 blocks
137 .last()
138 .cloned()
139 .expect("pivot block should exist"),
140 );
141 if recent_reported_epochs.len() >= MAX_BLOCK_HISTORY_SIZE {
143 recent_reported_epochs.pop_back();
144 }
145 recent_reported_epochs.push_front((num, blocks));
146 }
147
148 Ok(FilterChanges::Hashes(hashes))
149 }
150 PollFilter::PendingTransaction(ref mut previous_hashes) => {
151 let current_hashes = self.inner.pending_transaction_hashes();
153
154 let new_hashes = {
155 current_hashes
157 .difference(previous_hashes)
158 .cloned()
159 .map(Into::into)
160 .collect()
161 };
162
163 *previous_hashes = current_hashes;
165
166 Ok(FilterChanges::Hashes(new_hashes))
168 }
169 PollFilter::Logs {
170 ref mut last_epoch_number,
171 ref mut recent_reported_epochs,
172 ref mut previous_logs,
173 ref filter,
174 include_pending: _,
175 } => {
176 let (reorg_len, epochs) = self
177 .inner
178 .epochs_since_last_request(
179 *last_epoch_number,
180 recent_reported_epochs,
181 )
182 .map_err(|e| jsonrpc_error_to_error_object_owned(e))?;
183
184 let mut logs = vec![];
185
186 for _ in 0..reorg_len {
188 recent_reported_epochs.pop_front().unwrap();
189 let mut log: Vec<Log> = previous_logs
190 .pop_front()
191 .unwrap()
192 .into_iter()
193 .map(|mut l| {
194 l.removed = true;
195 l
196 })
197 .collect();
198 logs.append(&mut log);
199 }
200
201 for (num, blocks) in epochs.into_iter() {
203 let log = match self.inner.logs_for_epoch(
204 &filter,
205 (num, blocks.clone()),
206 false,
207 ) {
208 Ok(l) => l,
209 _ => break,
210 };
211
212 logs.append(&mut log.clone());
213 *last_epoch_number = num;
214
215 if recent_reported_epochs.len() >= MAX_BLOCK_HISTORY_SIZE {
217 recent_reported_epochs.pop_back();
218 previous_logs.pop_back();
219 }
220 recent_reported_epochs.push_front((num, blocks));
221 previous_logs.push_front(log);
222 }
223
224 Ok(FilterChanges::Logs(limit_logs(
225 logs,
226 self.inner.get_logs_filter_max_limit(),
227 )))
228 }
229 })
230 }
231
232 async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
233 let (filter, _) = {
234 let mut polls = self.inner.polls().lock();
235
236 match polls.poll(&id).and_then(|f| {
237 f.modify(|filter| match *filter {
238 PollFilter::Logs {
239 ref filter,
240 include_pending,
241 ..
242 } => Some((filter.clone(), include_pending)),
243 _ => None,
244 })
245 }) {
246 Some((filter, include_pending)) => (filter, include_pending),
247 None => bail!(invalid_request_msg("Filter not found")),
248 }
249 };
250
251 Ok(limit_logs(
253 self.inner
254 .logs(filter)
255 .map_err(|e| jsonrpc_error_to_error_object_owned(e))?,
256 self.inner.get_logs_filter_max_limit(),
257 ))
258 }
259
260 async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
261 Ok(self.inner.polls().lock().remove_poll(&id))
262 }
263}