1use std::{
2 collections::{BTreeSet, HashMap, HashSet, VecDeque},
3 iter::zip,
4 sync::Arc,
5};
6
7use crate::{
8 helpers::{poll_filter::SyncPollFilter, poll_manager::PollManager},
9 traits::Filterable,
10};
11use cfx_rpc_cfx_types::traits::BlockProvider;
12use cfx_rpc_eth_types::{EthRpcLogFilter, Log};
13use cfx_rpc_utils::error::error_codes as codes;
14use cfx_tasks::TaskExecutor;
15use cfx_types::{Space, H256};
16use cfx_util_macros::bail;
17use cfxcore::{
18 channel::Channel, errors::Error as CfxRpcError, ConsensusGraph,
19 SharedConsensusGraph, SharedTransactionPool,
20};
21use jsonrpc_core::{Error as RpcError, ErrorCode, Result as RpcResult};
22use log::{debug, error, info};
23use parking_lot::{Mutex, RwLock};
24use primitives::{
25 filter::LogFilter, log_entry::LocalizedLogEntry, EpochNumber,
26};
27
/// Filter RPC helper for the eth (EVM) space.
///
/// Holds installed poll filters and a record of not-yet-finalized epochs so
/// that later poll requests can detect and report pivot-chain reorgs.
pub struct EthFilterHelper {
    // Shared consensus graph handle, used for epoch/log queries.
    consensus: SharedConsensusGraph,
    // Transaction pool, used to report pending EVM-space transaction hashes.
    tx_pool: SharedTransactionPool,
    // Installed filter polls, managed (created/expired) by `PollManager`.
    polls: Mutex<PollManager<SyncPollFilter<Log>>>,
    // Recently received epochs that are not yet finalized
    // (see `UnfinalizedEpochs`).
    unfinalized_epochs: Arc<RwLock<UnfinalizedEpochs>>,
    // Optional cap applied to log-query results.
    logs_filter_max_limit: Option<usize>,
}
36
/// Epochs received from the consensus layer that are not yet finalized.
///
/// `epochs_queue` preserves arrival order; the same epoch number may appear
/// more than once after a pivot reorg. `epochs_map` keeps, per epoch number,
/// every block-hash list received so far, also in arrival order (so the last
/// element is the most recent pivot view of that epoch).
pub struct UnfinalizedEpochs {
    epochs_queue: VecDeque<(u64, Vec<H256>)>,
    epochs_map: HashMap<u64, Vec<Vec<H256>>>,
}
41
42impl Default for UnfinalizedEpochs {
43 fn default() -> Self {
44 UnfinalizedEpochs {
45 epochs_queue: Default::default(),
46 epochs_map: Default::default(),
47 }
48 }
49}
50
51impl EthFilterHelper {
52 pub fn new(
54 consensus: SharedConsensusGraph, tx_pool: SharedTransactionPool,
55 epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>, executor: TaskExecutor,
56 poll_lifetime: u32, logs_filter_max_limit: Option<usize>,
57 ) -> Self {
58 let filter_client = EthFilterHelper {
59 consensus,
60 tx_pool,
61 polls: Mutex::new(PollManager::new(poll_lifetime)),
62 unfinalized_epochs: Default::default(),
63 logs_filter_max_limit,
64 };
65
66 filter_client.start_epochs_loop(epochs_ordered, executor);
68 filter_client
69 }
70
71 fn start_epochs_loop(
72 &self, epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>,
73 executor: TaskExecutor,
74 ) {
75 let mut receiver = epochs_ordered.subscribe();
77 let consensus = self.consensus.clone();
78 let epochs = self.unfinalized_epochs.clone();
79
80 let fut = async move {
82 while let Some(epoch) = receiver.recv().await {
83 let mut epochs = epochs.write();
84
85 epochs.epochs_queue.push_back(epoch.clone());
86 epochs
87 .epochs_map
88 .entry(epoch.0)
89 .or_insert(vec![])
90 .push(epoch.1.clone());
91
92 let latest_finalized_epoch_number =
93 consensus.latest_finalized_epoch_number();
94 debug!(
95 "latest finalized epoch number: {}, received epochs: {:?}",
96 latest_finalized_epoch_number, epoch
97 );
98
99 while let Some(e) = epochs.epochs_queue.front() {
101 if e.0 < latest_finalized_epoch_number {
102 let (k, _) = epochs.epochs_queue.pop_front().unwrap();
103 if let Some(target) = epochs.epochs_map.get_mut(&k) {
104 if target.len() == 1 {
105 epochs.epochs_map.remove(&k);
106 } else {
107 target.remove(0);
108 }
109 }
110 } else {
111 break;
112 }
113 }
114 }
115 };
116
117 executor.spawn(fut);
118 }
119
120 fn retrieve_epoch_logs(
121 epoch: (u64, Vec<H256>), consensus_graph: &ConsensusGraph,
122 ) -> Option<Vec<LocalizedLogEntry>> {
123 debug!("retrieve_epoch_logs {:?}", epoch);
124 let (epoch_number, hashes) = epoch;
125 let pivot = hashes.last().cloned().expect("epoch should not be empty");
126
127 let pb = match consensus_graph.get_phantom_block_by_number(
129 EpochNumber::Number(epoch_number),
130 Some(pivot),
131 false, ) {
133 Ok(Some(b)) => b,
134 Ok(None) => {
135 error!("Block not executed yet {:?}", pivot);
136 return None;
137 }
138 Err(e) => {
139 error!("get_phantom_block_by_number failed with {}", e);
140 return None;
141 }
142 };
143
144 let mut logs = vec![];
145 let mut log_index = 0;
146
147 let txs = &pb.transactions;
148 assert_eq!(pb.receipts.len(), txs.len());
149
150 for (txid, (receipt, tx)) in zip(&pb.receipts, txs).enumerate() {
152 let eth_logs: Vec<_> = receipt
153 .logs
154 .iter()
155 .cloned()
156 .filter(|l| l.space == Space::Ethereum)
157 .collect();
158
159 for (logid, entry) in eth_logs.into_iter().enumerate() {
160 logs.push(LocalizedLogEntry {
161 entry,
162 block_hash: pivot,
163 epoch_number,
164 block_timestamp: Some(pb.pivot_header.timestamp()),
165 transaction_hash: tx.hash,
166 transaction_index: txid,
167 log_index,
168 transaction_log_index: logid,
169 });
170
171 log_index += 1;
172 }
173 }
174
175 Some(logs)
176 }
177}
178
179impl Filterable for EthFilterHelper {
180 fn best_executed_epoch_number(&self) -> u64 {
182 self.consensus_graph().best_executed_state_epoch_number()
183 }
184
185 fn block_hashes(&self, epoch_num: EpochNumber) -> Option<Vec<H256>> {
187 let _inner = self.consensus_graph().inner.read();
189 let hashes =
190 self.consensus_graph().get_block_hashes_by_epoch(epoch_num);
191
192 match hashes {
193 Ok(v) => return Some(v),
194 _ => return None,
195 }
196 }
197
198 fn pending_transaction_hashes(&self) -> BTreeSet<H256> {
200 self.tx_pool.get_pending_transaction_hashes_in_evm_pool()
201 }
202
203 fn logs(&self, filter: LogFilter) -> RpcResult<Vec<Log>> {
205 let logs = self
206 .consensus_graph()
207 .logs(filter)
208 .map_err(|err| CfxRpcError::from(err))?;
209
210 Ok(logs
211 .iter()
212 .cloned()
213 .map(|l| Log::try_from_localized(l, self, false))
214 .collect::<Result<_, _>>()?)
215 }
216
217 fn logs_for_epoch(
218 &self, filter: &LogFilter, epoch: (u64, Vec<H256>), removed: bool,
219 ) -> RpcResult<Vec<Log>> {
220 let mut result = vec![];
221 let logs =
222 match Self::retrieve_epoch_logs(epoch, self.consensus_graph()) {
223 Some(logs) => logs,
224 None => bail!(RpcError {
225 code: ErrorCode::ServerError(codes::UNSUPPORTED),
226 message: "Unable to retrieve logs for epoch".into(),
227 data: None,
228 }),
229 };
230
231 let logs: Vec<Log> = logs
233 .iter()
234 .filter(|l| filter.matches(&l.entry))
235 .cloned()
236 .map(|l| Log::try_from_localized(l, self, removed))
237 .collect::<Result<_, _>>()?;
238 result.extend(logs);
239
240 Ok(result)
241 }
242
243 fn polls(&self) -> &Mutex<PollManager<SyncPollFilter<Log>>> { &self.polls }
245
246 fn consensus_graph(&self) -> &ConsensusGraph { &self.consensus }
247
248 fn shared_consensus_graph(&self) -> SharedConsensusGraph {
249 self.consensus.clone()
250 }
251
252 fn get_logs_filter_max_limit(&self) -> Option<usize> {
253 self.logs_filter_max_limit
254 }
255
256 fn epochs_since_last_request(
257 &self, last_epoch_number: u64,
258 recent_reported_epochs: &VecDeque<(u64, Vec<H256>)>,
259 ) -> RpcResult<(u64, Vec<(u64, Vec<H256>)>)> {
260 let last_block = if let Some((num, hash)) =
261 recent_reported_epochs.front().cloned()
262 {
263 if last_epoch_number != num {
264 bail!(RpcError {
265 code: ErrorCode::ServerError(codes::UNSUPPORTED),
266 message: "Last block number does not match".into(),
267 data: None,
268 });
269 }
270 Some(hash)
271 } else {
272 None
273 };
274
275 let current_epoch_number = self.best_executed_epoch_number();
277 debug!("current epoch number {}", current_epoch_number);
278 let latest_epochs = self.unfinalized_epochs.read();
279
280 let mut idx = latest_epochs.epochs_queue.len() as i32 - 1;
282 while idx >= 0
283 && latest_epochs.epochs_queue[idx as usize].0
284 != current_epoch_number
285 {
286 if latest_epochs.epochs_queue[idx as usize].0 == last_epoch_number
290 && last_block
291 == Some(latest_epochs.epochs_queue[idx as usize].1.clone())
292 {
293 return Ok((0, vec![]));
294 }
295
296 idx -= 1;
297 }
298
299 let mut end_epoch_number = current_epoch_number + 1;
302 let mut new_epochs = vec![];
303 let mut hs = HashSet::new();
304 while idx >= 0 {
305 let (num, blocks) =
306 latest_epochs.epochs_queue[idx as usize].clone();
307 if num == last_epoch_number
308 && (last_block.is_none() || last_block == Some(blocks.clone()))
309 {
310 break;
311 }
312
313 if num < end_epoch_number && !hs.contains(&num) {
315 hs.insert(num);
316 new_epochs.push((num, blocks));
317 end_epoch_number = num;
318 }
319
320 idx -= 1;
321 }
322 new_epochs.reverse();
323
324 let mut reorg_epochs = vec![];
331 let mut reorg_len = 0;
332 for i in 0..recent_reported_epochs.len() {
333 let (num, hash) = recent_reported_epochs[i].clone();
334
335 if num < end_epoch_number {
336 let pivot_hash =
337 if let Some(v) = latest_epochs.epochs_map.get(&num) {
338 v.last().unwrap().clone()
339 } else {
340 self.block_hashes(EpochNumber::Number(num))
341 .expect("Epoch should exist")
342 };
343
344 if pivot_hash == hash {
345 break;
347 }
348
349 debug!("reorg for {}, pivot hash {:?}", num, pivot_hash);
350 reorg_epochs.push((num, pivot_hash));
351 }
352 reorg_len += 1;
353 }
354 reorg_epochs.reverse();
355
356 debug!(
359 "stable epochs from {} to {}",
360 last_epoch_number + 1,
361 end_epoch_number
362 );
363 for epoch_num in (last_epoch_number + 1)..end_epoch_number {
364 let hash = self
365 .block_hashes(EpochNumber::Number(epoch_num))
366 .expect("Epoch should exist");
367 reorg_epochs.push((epoch_num, hash));
368 }
369 reorg_epochs.append(&mut new_epochs);
370
371 info!(
372 "Chain reorg len: {}, new epochs len: {}",
373 reorg_len,
374 reorg_epochs.len()
375 );
376 Ok((reorg_len, reorg_epochs))
377 }
378
379 fn into_primitive_filter(
380 &self, filter: EthRpcLogFilter,
381 ) -> RpcResult<LogFilter> {
382 filter.into_primitive(self).map_err(|e| e.into())
383 }
384}
385
386impl BlockProvider for &EthFilterHelper {
387 fn get_block_epoch_number(&self, hash: &H256) -> Option<u64> {
388 self.consensus.get_block_epoch_number(hash)
389 }
390
391 fn get_block_hashes_by_epoch(
392 &self, epoch_number: EpochNumber,
393 ) -> Result<Vec<H256>, String> {
394 self.consensus
395 .get_block_hashes_by_epoch(epoch_number)
396 .map_err(|e| e.to_string())
397 }
398}