// cfxcore/consensus/consensus_graph/rpc_api/trace_provider.rs

1use crate::{
2    block_data_manager::DataVersionTuple, errors::Result as CoreResult,
3};
4use alloy_rpc_types_trace::geth::GethDebugTracingOptions;
5use cfx_execute_helper::exec_tracer::{
6    ActionType, BlockExecTraces, LocalizedTrace, TraceFilter,
7};
8use cfx_executor::state::State;
9use cfx_types::{AddressWithSpace, Bloom, Space, H256, U256};
10use either::Either;
11use geth_tracer::GethTraceWithHash;
12use itertools::Itertools;
13
14use primitives::{
15    epoch::BlockHashOrEpochNumber,
16    filter::{FilterError, LogFilter},
17    log_entry::LocalizedLogEntry,
18    receipt::Receipt,
19    Block, EpochNumber, SignedTransaction,
20};
21use rayon::prelude::*;
22use std::{cmp::max, collections::HashSet, sync::Arc};
23
24use super::super::ConsensusGraph;
25
26impl ConsensusGraph {
27    // TODO: maybe return error for reserved address? Not sure where is the best
28    //  place to do the check.
29    pub fn next_nonce(
30        &self, address: AddressWithSpace,
31        block_hash_or_epoch_number: BlockHashOrEpochNumber,
32        rpc_param_name: &str,
33    ) -> CoreResult<U256> {
34        let epoch_number = match block_hash_or_epoch_number {
35            BlockHashOrEpochNumber::BlockHashWithOption {
36                hash,
37                require_pivot,
38            } => EpochNumber::Number(
39                self.get_block_epoch_number_with_pivot_check(
40                    &hash,
41                    require_pivot.unwrap_or(true),
42                )?,
43            ),
44            BlockHashOrEpochNumber::EpochNumber(epoch_number) => epoch_number,
45        };
46        let state = State::new(
47            self.get_state_db_by_epoch_number(epoch_number, rpc_param_name)?,
48        )?;
49
50        Ok(state.nonce(&address)?)
51    }
52
    /// Earliest epoch for which execution traces are still stored; trace
    /// filtering cannot reach further back (older epochs were pruned).
    fn earliest_epoch_for_trace_filter(&self) -> u64 {
        self.data_man.earliest_epoch_with_trace()
    }
56
57    fn filter_block_receipts<'a>(
58        &self, filter: &'a LogFilter, epoch_number: u64, block_hash: H256,
59        block_timestamp: Option<u64>, mut receipts: Vec<Receipt>,
60        mut tx_hashes: Vec<H256>,
61    ) -> impl Iterator<Item = LocalizedLogEntry> + 'a {
62        // sanity check
63        if receipts.len() != tx_hashes.len() {
64            warn!("Block ({}) has different number of receipts ({}) to transactions ({}). Database corrupt?", block_hash, receipts.len(), tx_hashes.len());
65            assert!(false);
66        }
67
68        // iterate in reverse
69        receipts.reverse();
70        tx_hashes.reverse();
71
72        let mut log_index = receipts
73            .iter()
74            .flat_map(|r| r.logs.iter())
75            .filter(|l| l.space == filter.space)
76            .count();
77
78        let receipts_len = receipts.len();
79
80        receipts
81            .into_iter()
82            .map(|receipt| receipt.logs)
83            .zip(tx_hashes)
84            .enumerate()
85            .flat_map(move |(index, (logs, transaction_hash))| {
86                let mut logs: Vec<_> = logs
87                    .into_iter()
88                    .filter(|l| l.space == filter.space)
89                    .collect();
90
91                let current_log_index = log_index;
92                let no_of_logs = logs.len();
93                log_index -= no_of_logs;
94
95                logs.reverse();
96                logs.into_iter().enumerate().map(move |(i, log)| {
97                    LocalizedLogEntry {
98                        entry: log,
99                        block_hash,
100                        epoch_number,
101                        block_timestamp,
102                        transaction_hash,
103                        // iterating in reverse order
104                        transaction_index: receipts_len - index - 1,
105                        transaction_log_index: no_of_logs - i - 1,
106                        log_index: current_log_index - i - 1,
107                    }
108                })
109            })
110            .filter(move |log_entry| filter.matches(&log_entry.entry))
111    }
112
113    fn filter_block<'a>(
114        &self, filter: &'a LogFilter, bloom_possibilities: &'a Vec<Bloom>,
115        epoch: u64, pivot_hash: H256, block_hash: H256,
116    ) -> Result<impl Iterator<Item = LocalizedLogEntry> + 'a, FilterError> {
117        // special case for genesis (for now, genesis has no logs)
118        if epoch == 0 {
119            return Ok(Either::Left(std::iter::empty()));
120        }
121
122        // check if epoch is still available
123        let min = self.earliest_epoch_for_log_filter();
124
125        if epoch < min {
126            return Err(FilterError::EpochAlreadyPruned { epoch, min });
127        }
128
129        // get block bloom and receipts from db
130        let (block_bloom, receipts) = match self
131            .data_man
132            .block_execution_result_by_hash_with_epoch(
133                &block_hash,
134                &pivot_hash,
135                false, /* update_pivot_assumption */
136                false, /* update_cache */
137            ) {
138            Some(r) => (r.bloom, r.block_receipts.receipts.clone()),
139            None => {
140                // `block_hash` must exist so the block not executed yet
141                return Err(FilterError::BlockNotExecutedYet { block_hash });
142            }
143        };
144
145        // filter block
146        if !bloom_possibilities
147            .iter()
148            .any(|bloom| block_bloom.contains_bloom(bloom))
149        {
150            return Ok(Either::Left(std::iter::empty()));
151        }
152
153        // get block body from db
154        let block = match self.data_man.block_by_hash(&block_hash, false) {
155            Some(b) => b,
156            None => {
157                // `block_hash` must exist so this is an internal error
158                error!(
159                    "Block {:?} in epoch {} ({:?}) not found",
160                    block_hash, epoch, pivot_hash
161                );
162
163                return Err(FilterError::UnknownBlock { hash: block_hash });
164            }
165        };
166
167        Ok(Either::Right(self.filter_block_receipts(
168            &filter,
169            epoch,
170            block_hash,
171            Some(block.block_header.timestamp()),
172            receipts,
173            block.transaction_hashes(/* space filter */ None),
174        )))
175    }
176
177    fn filter_phantom_block<'a>(
178        &self, filter: &'a LogFilter, bloom_possibilities: &'a Vec<Bloom>,
179        epoch: u64, pivot_hash: H256,
180    ) -> Result<impl Iterator<Item = LocalizedLogEntry> + 'a, FilterError> {
181        // special case for genesis (for now, genesis has no logs)
182        if epoch == 0 {
183            return Ok(Either::Left(std::iter::empty()));
184        }
185
186        // check if epoch is still available
187        let min = self.earliest_epoch_for_log_filter();
188
189        if epoch < min {
190            return Err(FilterError::EpochAlreadyPruned { epoch, min });
191        }
192
193        // filter block
194        let epoch_bloom = match self.get_phantom_block_bloom_filter(
195            EpochNumber::Number(epoch),
196            pivot_hash,
197        )? {
198            Some(b) => b,
199            None => {
200                return Err(FilterError::BlockNotExecutedYet {
201                    block_hash: pivot_hash,
202                })
203            }
204        };
205
206        if !bloom_possibilities
207            .iter()
208            .any(|bloom| epoch_bloom.contains_bloom(bloom))
209        {
210            return Ok(Either::Left(std::iter::empty()));
211        }
212
213        // construct phantom block
214        let pb = match self.get_phantom_block_by_number(
215            EpochNumber::Number(epoch),
216            Some(pivot_hash),
217            false, /* include_traces */
218        )? {
219            Some(b) => b,
220            None => {
221                return Err(FilterError::BlockNotExecutedYet {
222                    block_hash: pivot_hash,
223                })
224            }
225        };
226
227        Ok(Either::Right(self.filter_block_receipts(
228            &filter,
229            epoch,
230            pivot_hash,
231            Some(pb.pivot_header.timestamp()),
232            pb.receipts,
233            pb.transactions.iter().map(|t| t.hash()).collect(),
234        )))
235    }
236
237    fn filter_single_epoch<'a>(
238        &'a self, filter: &'a LogFilter, bloom_possibilities: &'a Vec<Bloom>,
239        epoch: u64,
240    ) -> Result<Vec<LocalizedLogEntry>, FilterError> {
241        // retrieve epoch hashes and pivot hash
242        let mut epoch_hashes =
243            self.inner.read_recursive().block_hashes_by_epoch(epoch)?;
244
245        let pivot_hash = *epoch_hashes.last().expect("Epoch set not empty");
246
247        // process hashes in reverse order
248        epoch_hashes.reverse();
249
250        if filter.space == Space::Ethereum {
251            Ok(self
252                .filter_phantom_block(
253                    &filter,
254                    &bloom_possibilities,
255                    epoch,
256                    pivot_hash,
257                )?
258                .collect())
259        } else {
260            epoch_hashes
261                .into_iter()
262                .map(move |block_hash| {
263                    self.filter_block(
264                        &filter,
265                        &bloom_possibilities,
266                        epoch,
267                        pivot_hash,
268                        block_hash,
269                    )
270                })
271                // flatten results
272                // Iterator<Result<Iterator<_>>> -> Iterator<Result<_>>
273                .flat_map(|res| match res {
274                    Ok(it) => Either::Left(it.map(Ok)),
275                    Err(e) => Either::Right(std::iter::once(Err(e))),
276                })
277                .collect()
278        }
279    }
280
    /// Collect logs matching `filter` from one batch of `epochs`, processing
    /// the epochs of the batch in parallel under a single read lock.
    ///
    /// `consistency_check_data` holds `(epoch, pivot_hash)` recorded by the
    /// previous batch; if that epoch's pivot hash changed in the meantime, a
    /// deep pivot-chain reorg happened between batches and
    /// `PivotChainReorg` is returned. On success it is updated to this
    /// batch's first epoch.
    // NOTE(review): indexes `epochs[0]`, so `epochs` is assumed non-empty —
    // callers pass non-empty chunks of an epoch range; confirm if reused.
    fn filter_epoch_batch(
        &self, filter: &LogFilter, bloom_possibilities: &Vec<Bloom>,
        epochs: Vec<u64>, consistency_check_data: &mut Option<(u64, H256)>,
    ) -> Result<Vec<LocalizedLogEntry>, FilterError> {
        // lock so that we have a consistent view during this batch
        let inner = self.inner.read();

        // NOTE: as batches are processed atomically and only the
        // first batch (last few epochs) is likely to fluctuate, it is unlikely
        // that releasing the lock between batches would cause inconsistency:
        // we assume there are no pivot chain reorgs deeper than batch_size.
        // However, we still add a simple sanity check here:

        if let Some((epoch, pivot)) = *consistency_check_data {
            let new_pivot = inner.get_pivot_hash_from_epoch_number(epoch)?;

            if pivot != new_pivot {
                return Err(FilterError::PivotChainReorg {
                    epoch,
                    from: pivot,
                    to: new_pivot,
                });
            }
        }

        // remember this batch's first epoch for the next batch's check
        *consistency_check_data = Some((
            epochs[0],
            inner.get_pivot_hash_from_epoch_number(epochs[0])?,
        ));

        let epoch_batch_logs = epochs
            .into_par_iter() // process each epoch of this batch in parallel
            .map(|e| self.filter_single_epoch(filter, bloom_possibilities, e))
            .collect::<Result<Vec<Vec<LocalizedLogEntry>>, FilterError>>()?; // short-circuit on error

        Ok(epoch_batch_logs.into_iter().flatten().collect())
    }
318
319    pub fn get_log_filter_epoch_range(
320        &self, from_epoch: EpochNumber, to_epoch: EpochNumber,
321        check_range: bool,
322    ) -> Result<impl Iterator<Item = u64>, FilterError> {
323        // lock so that we have a consistent view
324        let _inner = self.inner.read_recursive();
325
326        let from_epoch =
327            self.get_height_from_epoch_number(from_epoch.clone())?;
328        let to_epoch = self.get_height_from_epoch_number(to_epoch.clone())?;
329
330        if from_epoch > to_epoch {
331            return Err(FilterError::InvalidEpochNumber {
332                from_epoch,
333                to_epoch,
334            });
335        }
336
337        if from_epoch < self.earliest_epoch_for_log_filter() {
338            return Err(FilterError::EpochAlreadyPruned {
339                epoch: from_epoch,
340                min: self.earliest_epoch_for_log_filter(),
341            });
342        }
343
344        if check_range {
345            if let Some(max_gap) = self.config.get_logs_filter_max_epoch_range {
346                // The range includes both ends.
347                if to_epoch - from_epoch + 1 > max_gap {
348                    return Err(FilterError::EpochNumberGapTooLarge {
349                        from_epoch,
350                        to_epoch,
351                        max_gap,
352                    });
353                }
354            }
355        }
356
357        return Ok((from_epoch..=to_epoch).rev());
358    }
359
    /// Resolve the trace filter's `[from_epoch, to_epoch]` into concrete
    /// heights and return them in ascending order.
    ///
    /// # Errors
    /// - `InvalidEpochNumber` when the range is inverted.
    /// - `EpochAlreadyPruned` when trace data for `from_epoch` is gone.
    pub fn get_trace_filter_epoch_range(
        &self, filter: &TraceFilter,
    ) -> Result<impl Iterator<Item = u64>, FilterError> {
        // lock so that we have a consistent view
        let _inner = self.inner.read_recursive();

        let from_epoch =
            self.get_height_from_epoch_number(filter.from_epoch.clone())?;
        let to_epoch =
            self.get_height_from_epoch_number(filter.to_epoch.clone())?;

        if from_epoch > to_epoch {
            return Err(FilterError::InvalidEpochNumber {
                from_epoch,
                to_epoch,
            });
        }

        if from_epoch < self.earliest_epoch_for_trace_filter() {
            return Err(FilterError::EpochAlreadyPruned {
                epoch: from_epoch,
                min: self.earliest_epoch_for_trace_filter(),
            });
        }
        Ok(from_epoch..=to_epoch)
    }
386
387    fn filter_logs_by_epochs(
388        &self, from_epoch: EpochNumber, to_epoch: EpochNumber,
389        filter: &LogFilter, blocks_to_skip: HashSet<H256>, check_range: bool,
390    ) -> Result<Vec<LocalizedLogEntry>, FilterError> {
391        let bloom_possibilities = filter.bloom_possibilities();
392
393        // we store the last epoch processed and the corresponding pivot hash so
394        // that we can check whether it changed between batches
395        let mut consistency_check_data: Option<(u64, H256)> = None;
396
397        let mut logs = self
398            // iterate over epochs in reverse order
399            .get_log_filter_epoch_range(from_epoch, to_epoch, check_range)?
400            // we process epochs in each batch in parallel
401            // but batches are processed one-by-one
402            .chunks(self.config.get_logs_epoch_batch_size)
403            .into_iter()
404            .map(move |epochs| {
405                self.filter_epoch_batch(
406                    &filter,
407                    &bloom_possibilities,
408                    epochs.into_iter().collect(),
409                    &mut consistency_check_data,
410                )
411            })
412            // flatten results
413            .flat_map(|res| match res {
414                Ok(vec) => Either::Left(vec.into_iter().map(Ok)),
415                Err(e) => Either::Right(std::iter::once(Err(e))),
416            })
417            // take as many as we need
418            .skip_while(|res| match res {
419                Ok(log) => blocks_to_skip.contains(&log.block_hash),
420                Err(_) => false,
421            })
422            // Limit logs can return
423            .take(
424                self.config
425                    .get_logs_filter_max_limit
426                    .unwrap_or(::std::usize::MAX - 1)
427                    + 1,
428            )
429            // short-circuit on error
430            .collect::<Result<Vec<LocalizedLogEntry>, FilterError>>()?;
431
432        logs.reverse();
433        Ok(logs)
434    }
435
436    // collect epoch number, block index in epoch, block hash, pivot hash
437    fn collect_block_info(
438        &self, block_hash: H256,
439    ) -> Result<(u64, usize, H256, H256), FilterError> {
440        // special case for genesis
441        if block_hash == self.data_man.true_genesis.hash() {
442            return Ok((0, 0, block_hash, block_hash));
443        }
444
445        // check if block exists
446        if self.data_man.block_header_by_hash(&block_hash).is_none() {
447            bail!(FilterError::UnknownBlock { hash: block_hash });
448        };
449
450        // find pivot block
451        let pivot_hash = match self
452            .inner
453            .read_recursive()
454            .block_execution_results_by_hash(&block_hash, false)
455        {
456            Some(r) => r.0,
457            None => {
458                match self.data_man.local_block_info_by_hash(&block_hash) {
459                    // if local block info is not available, that means this
460                    // block has never entered the consensus graph.
461                    None => {
462                        bail!(FilterError::BlockNotExecutedYet { block_hash })
463                    }
464                    // if the local block info is available, then it is very
465                    // likely that we have already executed this block and the
466                    // results are not available because they have been pruned.
467                    // NOTE: it might be possible that the block has entered
468                    // consensus graph but has not been executed yet, or that it
469                    // was not executed because it was invalid. these cases seem
470                    // rare enough to not require special handling here; we can
471                    // add more fine-grained errors in the future if necessary.
472                    Some(_) => {
473                        bail!(FilterError::BlockAlreadyPruned { block_hash })
474                    }
475                }
476            }
477        };
478
479        // find epoch number
480        let epoch = match self.data_man.block_header_by_hash(&pivot_hash) {
481            Some(h) => h.height(),
482            None => {
483                // internal error
484                error!("Header of pivot block {:?} not found", pivot_hash);
485                bail!(FilterError::UnknownBlock { hash: pivot_hash });
486            }
487        };
488
489        let index_in_epoch = self
490            .inner
491            .read_recursive()
492            .block_hashes_by_epoch(epoch)?
493            .into_iter()
494            .position(|h| h == block_hash)
495            .expect("Block should exit in epoch set");
496
497        Ok((epoch, index_in_epoch, block_hash, pivot_hash))
498    }
499
500    fn filter_logs_by_block_hashes(
501        &self, block_hashes: Vec<H256>, filter: LogFilter,
502    ) -> Result<Vec<LocalizedLogEntry>, FilterError> {
503        let bloom_possibilities = filter.bloom_possibilities();
504
505        // keep a consistent view during filtering
506        let _inner = self.inner.read();
507
508        // collect all block info in memory
509        // note: we allow at most 128 block hashes so this should be fine
510        let mut block_infos = block_hashes
511            .into_par_iter()
512            .map(|block_hash| self.collect_block_info(block_hash))
513            .collect::<Result<Vec<_>, _>>()?;
514
515        // lexicographic order will match execution order
516        block_infos.sort();
517
518        // process blocks in reverse
519        block_infos.reverse();
520
521        let mut logs = block_infos
522            .into_iter()
523            .map(|(epoch, _, block_hash, pivot_hash)| {
524                self.filter_block(
525                    &filter,
526                    &bloom_possibilities,
527                    epoch,
528                    pivot_hash,
529                    block_hash,
530                )
531            })
532            // flatten results
533            .flat_map(|res| match res {
534                Ok(it) => Either::Left(it.into_iter().map(Ok)),
535                Err(e) => Either::Right(std::iter::once(Err(e))),
536            })
537            // Limit logs can return
538            .take(
539                self.config
540                    .get_logs_filter_max_limit
541                    .unwrap_or(::std::usize::MAX - 1)
542                    + 1,
543            )
544            // short-circuit on error
545            .collect::<Result<Vec<_>, _>>()?;
546
547        logs.reverse();
548        Ok(logs)
549    }
550
    /// Collect logs matching `filter` over a *block-number* range by mapping
    /// it onto the enclosing epoch range and trimming partial epochs at both
    /// ends.
    ///
    /// # Errors
    /// - `InvalidBlockNumber` when the range is inverted.
    /// - `BlockNumberGapTooLarge` when the range exceeds the configured max.
    /// - `Custom` when number→hash or hash→epoch lookups fail.
    fn filter_logs_by_block_numbers(
        &self, from_block: u64, to_block: u64, filter: LogFilter,
    ) -> Result<Vec<LocalizedLogEntry>, FilterError> {
        // check range
        if from_block > to_block {
            return Err(FilterError::InvalidBlockNumber {
                from_block,
                to_block,
            });
        }

        if let Some(max_gap) =
            self.config.get_logs_filter_max_block_number_range
        {
            // The range includes both ends.
            if to_block - from_block + 1 > max_gap {
                return Err(FilterError::BlockNumberGapTooLarge {
                    from_block,
                    to_block,
                    max_gap,
                });
            }
        }

        // collect info from db
        let from_hash = match self
            .data_man
            .hash_by_block_number(from_block, true /* update_cache */)
        {
            Some(h) => h,
            None => bail!(FilterError::Custom(format!(
                "Unable to find block hash for from_block {:?}",
                from_block
            ))),
        };

        let to_hash = match self
            .data_man
            .hash_by_block_number(to_block, true /* update_cache */)
        {
            Some(h) => h,
            None => bail!(FilterError::Custom(format!(
                "Unable to find block hash for to_block {:?}",
                to_block
            ))),
        };

        let from_epoch = match self.get_block_epoch_number(&from_hash) {
            Some(e) => e,
            None => bail!(FilterError::Custom(format!(
                "Unable to find epoch number for block {:?}",
                from_hash
            ))),
        };

        let to_epoch = match self.get_block_epoch_number(&to_hash) {
            Some(e) => e,
            None => bail!(FilterError::Custom(format!(
                "Unable to find epoch number for block {:?}",
                to_hash
            ))),
        };

        // snapshot the block sets of the two boundary epochs under one lock
        let (from_epoch_hashes, to_epoch_hashes) = {
            let inner = self.inner.read();
            (
                inner.block_hashes_by_epoch(from_epoch)?,
                inner.block_hashes_by_epoch(to_epoch)?,
            )
        };

        // filter logs based on epochs
        // out-of-range blocks from the _end_ of the range
        // are handled by `filter_logs_by_epochs`
        let skip_from_end = to_epoch_hashes
            .into_iter()
            .skip_while(|h| *h != to_hash)
            .skip(1)
            .collect();

        let epoch_range_logs = self.filter_logs_by_epochs(
            EpochNumber::Number(from_epoch),
            EpochNumber::Number(to_epoch),
            &filter,
            skip_from_end,
            false, /* check_range */
        )?;

        // remove out-of-range blocks from the _start_ of the range
        let skip_from_start: HashSet<_> = from_epoch_hashes
            .into_iter()
            .take_while(|h| *h != from_hash)
            .collect();

        Ok(epoch_range_logs
            .into_iter()
            .skip_while(|log| skip_from_start.contains(&log.block_hash))
            .collect())
    }
650
651    pub fn logs(
652        &self, filter: LogFilter,
653    ) -> Result<Vec<LocalizedLogEntry>, FilterError> {
654        match &filter {
655            // filter by epoch numbers
656            LogFilter::EpochLogFilter {
657                from_epoch,
658                to_epoch,
659                ..
660            } => {
661                // When query logs, if epoch number greater than
662                // best_executed_state_epoch_number, use LatestState instead of
663                // epoch number, in this case we can return logs from from_epoch
664                // to LatestState
665                let to_epoch = if let EpochNumber::Number(num) = to_epoch {
666                    let epoch_number =
667                        if *num > self.best_executed_state_epoch_number() {
668                            EpochNumber::LatestState
669                        } else {
670                            to_epoch.clone()
671                        };
672
673                    epoch_number
674                } else {
675                    to_epoch.clone()
676                };
677
678                self.filter_logs_by_epochs(
679                    from_epoch.clone(),
680                    to_epoch,
681                    &filter,
682                    Default::default(),
683                    !filter.trusted, /* check_range */
684                )
685            }
686
687            // filter by block hashes
688            LogFilter::BlockHashLogFilter { block_hashes, .. } => {
689                self.filter_logs_by_block_hashes(block_hashes.clone(), filter)
690            }
691
692            // filter by block numbers
693            LogFilter::BlockNumberLogFilter {
694                from_block,
695                to_block,
696                ..
697            } => self.filter_logs_by_block_numbers(
698                from_block.clone(),
699                to_block.clone(),
700                filter,
701            ),
702        }
703    }
704
705    // TODO(lpl): Limit epoch range in filter.
706    pub fn filter_traces(
707        &self, mut filter: TraceFilter,
708    ) -> Result<Vec<LocalizedTrace>, FilterError> {
709        let traces = match filter.block_hashes.take() {
710            None => self.filter_traces_by_epochs(&filter),
711            Some(hashes) => self.filter_traces_by_block_hashes(&filter, hashes),
712        }?;
713        // Apply `filter.after` and `filter.count` after getting all trace
714        // entries.
715        Ok(traces
716            .into_iter()
717            .skip(filter.after.unwrap_or(0))
718            .take(filter.count.unwrap_or(usize::max_value()))
719            .collect())
720    }
721
722    pub fn collect_epoch_geth_trace(
723        &self, epoch_num: u64, tx_hash: Option<H256>,
724        opts: GethDebugTracingOptions,
725    ) -> CoreResult<Vec<GethTraceWithHash>> {
726        let epoch = EpochNumber::Number(epoch_num);
727        self.validate_stated_epoch(&epoch)?;
728
729        let epoch_block_hashes = if let Ok(v) =
730            self.get_block_hashes_by_epoch(epoch)
731        {
732            v
733        } else {
734            bail!("cannot get block hashes in the specified epoch, maybe it does not exist?");
735        };
736
737        let blocks = self
738            .data_man
739            .blocks_by_hash_list(
740                &epoch_block_hashes,
741                true, /* update_cache */
742            )
743            .expect("blocks exist");
744
745        let pivot_block = blocks.last().expect("Not empty");
746        let parent_pivot_block_hash = pivot_block.block_header.parent_hash();
747        let parent_epoch_num = pivot_block.block_header.height() - 1;
748
749        self.collect_blocks_geth_trace(
750            *parent_pivot_block_hash,
751            parent_epoch_num,
752            &blocks,
753            opts,
754            tx_hash,
755        )
756    }
757
    /// Collect Geth-style traces for `blocks`, executed on top of the state
    /// identified by `epoch_id` / `epoch_num`; delegates to the executor.
    pub fn collect_blocks_geth_trace(
        &self, epoch_id: H256, epoch_num: u64, blocks: &Vec<Arc<Block>>,
        opts: GethDebugTracingOptions, tx_hash: Option<H256>,
    ) -> CoreResult<Vec<GethTraceWithHash>> {
        self.executor.collect_blocks_geth_trace(
            epoch_id, epoch_num, blocks, opts, tx_hash,
        )
    }
766
    /// Earliest epoch for which both block bodies and execution results are
    /// still available; log filtering cannot reach further back.
    fn earliest_epoch_for_log_filter(&self) -> u64 {
        max(
            self.data_man.earliest_epoch_with_block_body(),
            self.data_man.earliest_epoch_with_execution_result(),
        )
    }
773
774    fn filter_traces_by_epochs(
775        &self, filter: &TraceFilter,
776    ) -> Result<Vec<LocalizedTrace>, FilterError> {
777        let epochs_and_pivot_hash = {
778            let inner = self.inner.read();
779            let mut epochs_and_pivot_hash = Vec::new();
780            for epoch_number in self.get_trace_filter_epoch_range(filter)? {
781                epochs_and_pivot_hash.push((
782                    epoch_number,
783                    inner.get_pivot_hash_from_epoch_number(epoch_number)?,
784                ))
785            }
786            epochs_and_pivot_hash
787        };
788
789        let block_traces = epochs_and_pivot_hash
790            .into_par_iter()
791            .map(|(epoch_number, assumed_pivot)| {
792                self.collect_traces_single_epoch(
793                    filter,
794                    epoch_number,
795                    assumed_pivot,
796                )
797            })
798            .collect::<Result<Vec<Vec<_>>, FilterError>>()?
799            .into_iter()
800            .flatten()
801            .collect();
802
803        self.filter_block_traces(filter, block_traces)
804    }
805
    /// Return `Vec<(pivot_hash, block_hash, block_traces, block_txs)>` for a
    /// single epoch, verifying that `assumed_pivot` is still the epoch's
    /// pivot block.
    ///
    /// For eSpace filters the whole epoch is represented as one phantom
    /// block keyed by the pivot hash.
    ///
    /// # Errors
    /// - `PivotChainReorg` when `assumed_pivot` no longer matches.
    /// - `BlockAlreadyPruned` / `UnknownBlock` when bodies or traces are
    ///   unavailable.
    pub fn collect_traces_single_epoch(
        &self, filter: &TraceFilter, epoch_number: u64, assumed_pivot: H256,
    ) -> Result<
        Vec<(H256, H256, BlockExecTraces, Vec<Arc<SignedTransaction>>)>,
        FilterError,
    > {
        if filter.space == Space::Ethereum {
            let phantom_block = self
                .get_phantom_block_by_number(
                    EpochNumber::Number(epoch_number),
                    Some(assumed_pivot),
                    true, /* include_traces */
                )?
                .ok_or(FilterError::UnknownBlock {
                    hash: assumed_pivot,
                })?;

            return Ok(vec![(
                assumed_pivot,
                assumed_pivot,
                BlockExecTraces(phantom_block.traces),
                phantom_block.transactions,
            )]);
        }

        let block_hashes = self
            .inner
            .read_recursive()
            .block_hashes_by_epoch(epoch_number)?;
        // bail out if a pivot-chain reorg invalidated the caller's pivot
        if block_hashes.last().expect("epoch set not empty") != &assumed_pivot {
            bail!(FilterError::PivotChainReorg {
                epoch: epoch_number,
                from: assumed_pivot,
                to: *block_hashes.last().unwrap()
            })
        }
        let mut traces = Vec::new();
        for block_hash in block_hashes {
            let block = self
                .data_man
                .block_by_hash(&block_hash, false /* update_cache */)
                .ok_or(FilterError::BlockAlreadyPruned { block_hash })?;

            traces.push(
                self.data_man
                    .block_traces_by_hash_with_epoch(
                        &block_hash,
                        &assumed_pivot,
                        // NOTE(review): presumably update_pivot_assumption,
                        // update_cache (same order as the similarly named
                        // `block_execution_result_by_hash_with_epoch`) —
                        // confirm against the data manager API
                        false,
                        true,
                    )
                    .map(|trace| {
                        (
                            assumed_pivot,
                            block_hash,
                            trace,
                            block.transactions.clone(),
                        )
                    })
                    .ok_or(FilterError::UnknownBlock { hash: block_hash })?,
            );
        }
        Ok(traces)
    }
871
872    // TODO: We can apply some early return logic based on `filter.count`.
873    fn filter_traces_by_block_hashes(
874        &self, filter: &TraceFilter, block_hashes: Vec<H256>,
875    ) -> Result<Vec<LocalizedTrace>, FilterError> {
876        let block_traces = block_hashes
877            .into_par_iter()
878            .map(|h| {
879                let block = self
880                    .data_man
881                    .block_by_hash(&h, false /* update_cache */)
882                    .ok_or(FilterError::BlockAlreadyPruned { block_hash: h })?;
883
884                self.data_man
885                    .block_traces_by_hash(&h)
886                    .map(|DataVersionTuple(pivot_hash, trace)| {
887                        (pivot_hash, h, trace, block.transactions.clone())
888                    })
889                    .ok_or_else(|| FilterError::BlockNotExecutedYet {
890                        block_hash: h,
891                    })
892            })
893            .collect::<Result<Vec<_>, FilterError>>()?;
894        self.filter_block_traces(filter, block_traces)
895    }
896
897    /// `block_traces` is a list of tuple `(pivot_hash, block_hash,
898    /// block_trace)`.
899    pub fn filter_block_traces(
900        &self, filter: &TraceFilter,
901        block_traces: Vec<(
902            H256,
903            H256,
904            BlockExecTraces,
905            Vec<Arc<SignedTransaction>>,
906        )>,
907    ) -> Result<Vec<LocalizedTrace>, FilterError> {
908        let mut traces = Vec::new();
909        for (pivot_hash, block_hash, block_trace, block_txs) in block_traces {
910            if block_txs.len() != block_trace.0.len() {
911                bail!(format!(
912                    "tx list and trace length unmatch: block_hash={:?}",
913                    block_hash
914                ));
915            }
916            let epoch_number = self
917                .data_man
918                .block_height_by_hash(&pivot_hash)
919                .ok_or_else(|| {
920                    FilterError::Custom(
921                        format!(
922                            "pivot block header missing, hash={:?}",
923                            pivot_hash
924                        )
925                        .into(),
926                    )
927                })?;
928            let mut rpc_tx_index = 0;
929            for (tx_pos, tx_trace) in block_trace.0.into_iter().enumerate() {
930                if filter.space == Space::Native
931                    && block_txs[tx_pos].space() == Space::Ethereum
932                {
933                    continue;
934                }
935                for trace in filter
936                    .filter_traces(tx_trace)
937                    .map_err(|e| FilterError::Custom(e))?
938                {
939                    if !filter
940                        .action_types
941                        .matches(&ActionType::from(&trace.action))
942                    {
943                        continue;
944                    }
945                    let trace = LocalizedTrace {
946                        action: trace.action,
947                        valid: trace.valid,
948                        epoch_hash: pivot_hash,
949                        epoch_number: epoch_number.into(),
950                        block_hash,
951                        transaction_position: rpc_tx_index.into(),
952                        transaction_hash: block_txs[tx_pos].hash(),
953                    };
954                    traces.push(trace);
955                }
956                rpc_tx_index += 1;
957            }
958        }
959        Ok(traces)
960    }
961}