1use log::{debug, error, info, warn};
6use std::{
7 collections::{BTreeSet, HashMap, HashSet, VecDeque},
8 sync::Arc,
9};
10
11use crate::rpc::{
12 errors::{codes, invalid_params},
13 helpers::{
14 limit_logs, PollFilter, PollManager, SyncPollFilter,
15 MAX_BLOCK_HISTORY_SIZE,
16 },
17 traits::cfx_filter::CfxFilter,
18 types::{CfxFilterChanges, CfxFilterLog, CfxRpcLogFilter, Log, RevertTo},
19};
20use cfx_addr::Network;
21use cfx_types::{Space, H128, H256};
22use cfx_util_macros::bail;
23use cfxcore::{
24 channel::Channel, errors::Error as CfxRpcError, BlockDataManager,
25 ConsensusGraph, SharedConsensusGraph, SharedTransactionPool,
26};
27use itertools::zip;
28use jsonrpc_core::{Error as RpcError, ErrorCode, Result as JsonRpcResult};
29use parking_lot::{Mutex, RwLock};
30use primitives::{
31 filter::LogFilter, log_entry::LocalizedLogEntry, BlockReceipts, EpochNumber,
32};
33use tokio::runtime::Runtime;
34
35pub trait Filterable {
37 fn best_executed_epoch_number(&self) -> u64;
39
40 fn block_hashes(&self, epoch_num: EpochNumber) -> Option<Vec<H256>>;
42
43 fn pending_transaction_hashes(&self) -> BTreeSet<H256>;
45
46 fn logs(&self, filter: LogFilter) -> JsonRpcResult<Vec<Log>>;
48
49 fn logs_for_epoch(
51 &self, filter: &LogFilter, epoch: (u64, Vec<H256>),
52 data_man: &Arc<BlockDataManager>,
53 ) -> JsonRpcResult<Vec<Log>>;
54
55 fn polls(&self) -> &Mutex<PollManager<SyncPollFilter<Log>>>;
57
58 fn consensus_graph(&self) -> &ConsensusGraph;
60
61 fn shared_consensus_graph(&self) -> SharedConsensusGraph;
63
64 fn get_logs_filter_max_limit(&self) -> Option<usize>;
66
67 fn epochs_since_last_request(
69 &self, last_epoch_number: u64,
70 recent_reported_epochs: &VecDeque<(u64, Vec<H256>)>,
71 ) -> JsonRpcResult<(u64, Vec<(u64, Vec<H256>)>)>;
72}
73
74pub struct CfxFilterClient {
76 consensus: SharedConsensusGraph,
77 tx_pool: SharedTransactionPool,
78 polls: Mutex<PollManager<SyncPollFilter<Log>>>,
79 unfinalized_epochs: Arc<RwLock<UnfinalizedEpochs>>,
80 logs_filter_max_limit: Option<usize>,
81 network: Network,
82}
83
84pub struct UnfinalizedEpochs {
85 epochs_queue: VecDeque<(u64, Vec<H256>)>,
86 epochs_map: HashMap<u64, Vec<Vec<H256>>>,
87}
88
89impl Default for UnfinalizedEpochs {
90 fn default() -> Self {
91 UnfinalizedEpochs {
92 epochs_queue: Default::default(),
93 epochs_map: Default::default(),
94 }
95 }
96}
97
98impl CfxFilterClient {
99 pub fn new(
101 consensus: SharedConsensusGraph, tx_pool: SharedTransactionPool,
102 epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>, executor: Arc<Runtime>,
103 poll_lifetime: u32, logs_filter_max_limit: Option<usize>,
104 network: Network,
105 ) -> Self {
106 let filter_client = CfxFilterClient {
107 consensus,
108 tx_pool,
109 polls: Mutex::new(PollManager::new(poll_lifetime)),
110 unfinalized_epochs: Default::default(),
111 logs_filter_max_limit,
112 network,
113 };
114
115 filter_client.start_epochs_loop(epochs_ordered, executor);
117 filter_client
118 }
119
120 fn start_epochs_loop(
121 &self, epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>,
122 executor: Arc<Runtime>,
123 ) {
124 let mut receiver = epochs_ordered.subscribe();
126 let consensus = self.consensus.clone();
127 let epochs = self.unfinalized_epochs.clone();
128
129 let fut = async move {
131 while let Some(epoch) = receiver.recv().await {
132 let mut epochs = epochs.write();
133
134 epochs.epochs_queue.push_back(epoch.clone());
135 epochs
136 .epochs_map
137 .entry(epoch.0)
138 .or_insert(vec![])
139 .push(epoch.1.clone());
140
141 let latest_finalized_epoch_number =
142 consensus.latest_finalized_epoch_number();
143 debug!(
144 "latest finalized epoch number: {}, received epochs: {:?}",
145 latest_finalized_epoch_number, epoch
146 );
147
148 while let Some(e) = epochs.epochs_queue.front() {
150 if e.0 < latest_finalized_epoch_number {
151 let (k, _) = epochs.epochs_queue.pop_front().unwrap();
152 if let Some(target) = epochs.epochs_map.get_mut(&k) {
153 if target.len() == 1 {
154 epochs.epochs_map.remove(&k);
155 } else {
156 target.remove(0);
157 }
158 }
159 } else {
160 break;
161 }
162 }
163 }
164 };
165
166 executor.spawn(fut);
167 }
168}
169
170impl Filterable for CfxFilterClient {
171 fn best_executed_epoch_number(&self) -> u64 {
173 self.consensus_graph().best_executed_state_epoch_number()
174 }
175
176 fn block_hashes(&self, epoch_num: EpochNumber) -> Option<Vec<H256>> {
178 let _inner = self.consensus_graph().inner.read();
180 let hashes =
181 self.consensus_graph().get_block_hashes_by_epoch(epoch_num);
182
183 match hashes {
184 Ok(v) => return Some(v),
185 _ => return None,
186 }
187 }
188
189 fn pending_transaction_hashes(&self) -> BTreeSet<H256> {
191 self.tx_pool.get_pending_transaction_hashes_in_native_pool()
192 }
193
194 fn logs(&self, filter: LogFilter) -> JsonRpcResult<Vec<Log>> {
196 let logs = self
197 .consensus_graph()
198 .logs(filter)
199 .map_err(|err| CfxRpcError::from(err))?;
200
201 Ok(logs
202 .iter()
203 .cloned()
204 .map(|l| Log::try_from_localized(l, self.network))
205 .collect::<Result<_, _>>()
206 .map_err(|_| invalid_params("filter", "retrieve logs error"))?)
207 }
208
209 fn logs_for_epoch(
210 &self, filter: &LogFilter, epoch: (u64, Vec<H256>),
211 data_man: &Arc<BlockDataManager>,
212 ) -> JsonRpcResult<Vec<Log>> {
213 let mut result = vec![];
214 let logs = match retrieve_epoch_logs(data_man, epoch) {
215 Some(logs) => logs,
216 None => bail!(RpcError {
217 code: ErrorCode::ServerError(codes::UNSUPPORTED),
218 message: "Unable to retrieve logs for epoch".into(),
219 data: None,
220 }),
221 };
222
223 let logs: Vec<Log> = logs
225 .iter()
226 .filter(|l| filter.matches(&l.entry))
227 .cloned()
228 .map(|l| Log::try_from_localized(l, self.network))
229 .collect::<Result<_, _>>()
230 .map_err(|_| {
231 invalid_params("filter", "retrieve logs for epoch error")
232 })?;
233 result.extend(logs);
234
235 Ok(result)
236 }
237
238 fn polls(&self) -> &Mutex<PollManager<SyncPollFilter<Log>>> { &self.polls }
240
241 fn consensus_graph(&self) -> &ConsensusGraph { &self.consensus }
242
243 fn shared_consensus_graph(&self) -> SharedConsensusGraph {
244 self.consensus.clone()
245 }
246
247 fn get_logs_filter_max_limit(&self) -> Option<usize> {
248 self.logs_filter_max_limit
249 }
250
251 fn epochs_since_last_request(
252 &self, last_epoch_number: u64,
253 recent_reported_epochs: &VecDeque<(u64, Vec<H256>)>,
254 ) -> JsonRpcResult<(u64, Vec<(u64, Vec<H256>)>)> {
255 let last_block = if let Some((num, hash)) =
256 recent_reported_epochs.front().cloned()
257 {
258 if last_epoch_number != num {
259 bail!(RpcError {
260 code: ErrorCode::ServerError(codes::UNSUPPORTED),
261 message: "Last block number does not match".into(),
262 data: None,
263 });
264 }
265 Some(hash)
266 } else {
267 None
268 };
269
270 let current_epoch_number = self.best_executed_epoch_number();
272 debug!("current epoch number {}", current_epoch_number);
273 let latest_epochs = self.unfinalized_epochs.read();
274
275 let mut idx = latest_epochs.epochs_queue.len() as i32 - 1;
277 while idx >= 0
278 && latest_epochs.epochs_queue[idx as usize].0
279 != current_epoch_number
280 {
281 if latest_epochs.epochs_queue[idx as usize].0 == last_epoch_number
285 && last_block
286 == Some(latest_epochs.epochs_queue[idx as usize].1.clone())
287 {
288 return Ok((0, vec![]));
289 }
290
291 idx -= 1;
292 }
293
294 let mut end_epoch_number = current_epoch_number + 1;
297 let mut new_epochs = vec![];
298 let mut hs = HashSet::new();
299 while idx >= 0 {
300 let (num, blocks) =
301 latest_epochs.epochs_queue[idx as usize].clone();
302 if num == last_epoch_number
303 && (last_block.is_none() || last_block == Some(blocks.clone()))
304 {
305 break;
306 }
307
308 if num < end_epoch_number && !hs.contains(&num) {
310 hs.insert(num);
311 new_epochs.push((num, blocks));
312 end_epoch_number = num;
313 }
314
315 idx -= 1;
316 }
317 new_epochs.reverse();
318
319 let mut reorg_epochs = vec![];
326 let mut reorg_len = 0;
327 for i in 0..recent_reported_epochs.len() {
328 let (num, hash) = recent_reported_epochs[i].clone();
329
330 if num < end_epoch_number {
331 let pivot_hash =
332 if let Some(v) = latest_epochs.epochs_map.get(&num) {
333 v.last().unwrap().clone()
334 } else {
335 self.block_hashes(EpochNumber::Number(num))
336 .expect("Epoch should exist")
337 };
338
339 if pivot_hash == hash {
340 break;
342 }
343
344 debug!("reorg for {}, pivot hash {:?}", num, pivot_hash);
345 reorg_epochs.push((num, pivot_hash));
346 }
347 reorg_len += 1;
348 }
349 reorg_epochs.reverse();
350
351 debug!(
354 "stable epochs from {} to {}",
355 last_epoch_number + 1,
356 end_epoch_number
357 );
358 for epoch_num in (last_epoch_number + 1)..end_epoch_number {
359 let hash = self
360 .block_hashes(EpochNumber::Number(epoch_num))
361 .expect("Epoch should exist");
362 reorg_epochs.push((epoch_num, hash));
363 }
364 reorg_epochs.append(&mut new_epochs);
365
366 info!(
367 "Chain reorg len: {}, new epochs len: {}",
368 reorg_len,
369 reorg_epochs.len()
370 );
371 Ok((reorg_len, reorg_epochs))
372 }
373}
374
375impl<T: Filterable + Send + Sync + 'static> CfxFilter for T {
376 fn new_filter(&self, filter: CfxRpcLogFilter) -> JsonRpcResult<H128> {
378 debug!("create filter: {:?}", filter);
379 let mut polls = self.polls().lock();
380 let epoch_number = self.best_executed_epoch_number();
381
382 let filter: LogFilter = filter.into_primitive()?;
383
384 let id = polls.create_poll(SyncPollFilter::new(PollFilter::Logs {
385 last_epoch_number: if epoch_number == 0 {
386 0
387 } else {
388 epoch_number - 1
389 },
390 filter,
391 include_pending: false,
392 previous_logs: VecDeque::with_capacity(MAX_BLOCK_HISTORY_SIZE),
393 recent_reported_epochs: VecDeque::with_capacity(
394 MAX_BLOCK_HISTORY_SIZE,
395 ),
396 }));
397
398 Ok(id.into())
399 }
400
401 fn new_block_filter(&self) -> JsonRpcResult<H128> {
403 debug!("create block filter");
404 let mut polls = self.polls().lock();
405 let id = polls.create_poll(SyncPollFilter::new(PollFilter::Block {
407 last_epoch_number: self.best_executed_epoch_number(),
408 recent_reported_epochs: VecDeque::with_capacity(
409 MAX_BLOCK_HISTORY_SIZE,
410 ),
411 }));
412
413 Ok(id.into())
414 }
415
416 fn new_pending_transaction_filter(&self) -> JsonRpcResult<H128> {
418 debug!("create pending transaction filter");
419 let mut polls = self.polls().lock();
420 let pending_transactions = self.pending_transaction_hashes();
421 let id = polls.create_poll(SyncPollFilter::new(
422 PollFilter::PendingTransaction(pending_transactions),
423 ));
424 Ok(id.into())
425 }
426
427 fn filter_changes(&self, index: H128) -> JsonRpcResult<CfxFilterChanges> {
429 info!("filter_changes id: {}", index);
430 let filter = match self.polls().lock().poll_mut(&index) {
431 Some(filter) => filter.clone(),
432 None => bail!(RpcError {
433 code: ErrorCode::InvalidRequest,
434 message: "Filter not found".into(),
435 data: None,
436 }),
437 };
438
439 filter.modify(|filter| match *filter {
440 PollFilter::Block {
441 ref mut last_epoch_number,
442 ref mut recent_reported_epochs,
443 } => {
444 let (reorg_len, epochs) = self.epochs_since_last_request(
445 *last_epoch_number,
446 recent_reported_epochs,
447 )?;
448
449 for _ in 0..reorg_len {
451 recent_reported_epochs.pop_front();
452 }
453
454 let mut hashes = Vec::new();
455 for (num, blocks) in epochs.into_iter() {
456 *last_epoch_number = num;
457 hashes.append(&mut blocks.clone());
458
459 if recent_reported_epochs.len() >= MAX_BLOCK_HISTORY_SIZE {
461 recent_reported_epochs.pop_back();
462 }
463 recent_reported_epochs.push_front((num, blocks));
464 }
465
466 Ok(CfxFilterChanges::Hashes(hashes))
467 }
468 PollFilter::PendingTransaction(ref mut previous_hashes) => {
469 let current_hashes = self.pending_transaction_hashes();
471
472 let new_hashes = {
473 current_hashes
475 .difference(previous_hashes)
476 .cloned()
477 .map(Into::into)
478 .collect()
479 };
480
481 *previous_hashes = current_hashes;
483
484 Ok(CfxFilterChanges::Hashes(new_hashes))
486 }
487 PollFilter::Logs {
488 ref mut last_epoch_number,
489 ref mut recent_reported_epochs,
490 ref mut previous_logs,
491 ref filter,
492 include_pending: _,
493 } => {
494 let (reorg_len, epochs) = self.epochs_since_last_request(
495 *last_epoch_number,
496 recent_reported_epochs,
497 )?;
498
499 let mut logs = vec![];
500
501 for _ in 0..reorg_len {
503 recent_reported_epochs.pop_front().unwrap();
504 }
505
506 if reorg_len > 0 {
507 logs.push(CfxFilterLog::ChainReorg(RevertTo {
508 revert_to: epochs.first().unwrap().0.into(),
509 }));
510 }
511 let data_man = self.consensus_graph().data_manager().clone();
512
513 for (num, blocks) in epochs.into_iter() {
515 let log = match self.logs_for_epoch(
516 &filter,
517 (num, blocks.clone()),
518 &data_man,
519 ) {
520 Ok(l) => l,
521 _ => break,
522 };
523
524 log.iter()
525 .for_each(|l| logs.push(CfxFilterLog::Log(l.clone())));
527
528 *last_epoch_number = num;
530
531 if recent_reported_epochs.len() >= MAX_BLOCK_HISTORY_SIZE {
533 recent_reported_epochs.pop_back();
534 previous_logs.pop_back();
535 }
536 recent_reported_epochs.push_front((num, blocks));
537 previous_logs.push_front(log);
538 }
539
540 Ok(CfxFilterChanges::Logs(limit_logs(
541 logs,
542 self.get_logs_filter_max_limit(),
543 )))
544 }
545 })
546 }
547
548 fn filter_logs(&self, index: H128) -> JsonRpcResult<Vec<Log>> {
550 let (filter, _) = {
551 let mut polls = self.polls().lock();
552
553 match polls.poll(&index).and_then(|f| {
554 f.modify(|filter| match *filter {
555 PollFilter::Logs {
556 ref filter,
557 include_pending,
558 ..
559 } => Some((filter.clone(), include_pending)),
560 _ => None,
561 })
562 }) {
563 Some((filter, include_pending)) => (filter, include_pending),
564 None => bail!(RpcError {
565 code: ErrorCode::InvalidRequest,
566 message: "Filter not found".into(),
567 data: None,
568 }),
569 }
570 };
571
572 Ok(limit_logs(
574 self.logs(filter)?,
575 self.get_logs_filter_max_limit(),
576 ))
577 }
578
579 fn uninstall_filter(&self, index: H128) -> JsonRpcResult<bool> {
581 Ok(self.polls().lock().remove_poll(&index))
582 }
583}
584
585fn retrieve_epoch_logs(
586 data_man: &Arc<BlockDataManager>, epoch: (u64, Vec<H256>),
587) -> Option<Vec<LocalizedLogEntry>> {
588 debug!("retrieve_epoch_logs {:?}", epoch);
589 let (epoch_number, hashes) = epoch;
590 let pivot = hashes.last().cloned().expect("epoch should not be empty");
591
592 let fut = hashes
594 .iter()
595 .map(|h| retrieve_block_receipts(&data_man, h, &pivot));
596
597 let receipts = fut.into_iter().collect::<Option<Vec<_>>>()?;
598
599 let mut logs = vec![];
600 let mut log_index = 0;
601
602 for (block_hash, block_receipts) in zip(hashes, receipts) {
603 let block = match data_man
605 .block_by_hash(&block_hash, true )
606 {
607 Some(b) => b,
608 None => {
609 warn!("Unable to retrieve block {:?}", block_hash);
610 return None;
611 }
612 };
613
614 let txs = &block.transactions;
615 assert_eq!(block_receipts.receipts.len(), txs.len());
616
617 for (txid, (receipt, tx)) in
619 zip(&block_receipts.receipts, txs).enumerate()
620 {
621 let native_logs: Vec<_> = receipt
622 .logs
623 .iter()
624 .cloned()
625 .filter(|l| l.space == Space::Native)
626 .collect();
627
628 for (logid, entry) in native_logs.into_iter().enumerate() {
629 logs.push(LocalizedLogEntry {
630 entry,
631 block_hash,
632 epoch_number,
633 block_timestamp: Some(block.block_header.timestamp()),
634 transaction_hash: tx.hash,
635 transaction_index: txid,
636 log_index,
637 transaction_log_index: logid,
638 });
639
640 log_index += 1;
641 }
642 }
643 }
644
645 Some(logs)
646}
647
648fn retrieve_block_receipts(
650 data_man: &Arc<BlockDataManager>, block: &H256, pivot: &H256,
651) -> Option<Arc<BlockReceipts>> {
652 match data_man.block_execution_result_by_hash_with_epoch(
653 &block, &pivot, false, false, ) {
656 Some(res) => return Some(res.block_receipts.clone()),
657 None => {
658 error!("Cannot find receipts with {:?}/{:?}", block, pivot);
659 return None;
660 }
661 }
662}