cfxcore/light_protocol/
query_service.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    consensus::SharedConsensusGraph,
7    errors::{account_result_to_rpc_result, Error},
8    light_protocol::{
9        common::{FullPeerFilter, LedgerInfo},
10        handler::sync::TxInfoValidated,
11        message::msgid,
12        Error as LightError, Handler as LightHandler, LightNodeConfiguration,
13        LIGHT_PROTOCOL_ID, LIGHT_PROTOCOL_VERSION,
14    },
15    sync::SynchronizationGraph,
16    Notifications,
17};
18use cfx_addr::Network;
19use cfx_executor::state::COMMISSION_PRIVILEGE_SPECIAL_KEY;
20use cfx_parameters::{
21    consensus::DEFERRED_STATE_EPOCH_COUNT,
22    internal_contract_addresses::SPONSOR_WHITELIST_CONTROL_CONTRACT_ADDRESS,
23    light::{
24        GAS_PRICE_BATCH_SIZE, GAS_PRICE_BLOCK_SAMPLE_SIZE,
25        GAS_PRICE_TRANSACTION_SAMPLE_SIZE, LOG_FILTERING_LOOKAHEAD,
26        MAX_POLL_TIME, TRANSACTION_COUNT_PER_BLOCK_WATER_LINE_LOW,
27        TRANSACTION_COUNT_PER_BLOCK_WATER_LINE_MEDIUM,
28    },
29};
30use cfx_statedb::global_params::{self, GlobalParamKey};
31use cfx_types::{
32    address_util::AddressUtil, AllChainID, BigEndianHash, Bloom, H160, H256,
33    KECCAK_EMPTY_BLOOM, U256,
34};
35use futures::{
36    future::{self, Either},
37    stream, try_join, StreamExt, TryStreamExt,
38};
39use network::{service::ProtocolVersion, NetworkContext, NetworkService};
40use primitives::{
41    filter::{FilterError, LogFilter},
42    log_entry::{LocalizedLogEntry, LogEntry},
43    Account, Block, BlockReceipts, CodeInfo, DepositList, EpochNumber, Receipt,
44    SignedTransaction, StorageKey, StorageRoot, StorageValue, TransactionIndex,
45    VoteStakeList,
46};
47use rlp::Rlp;
48use std::{collections::BTreeSet, future::Future, sync::Arc, time::Duration};
49use tokio::time::timeout;
50
/// Validated information about a single transaction, assembled by
/// `QueryService::get_tx_info` from peer responses plus local consensus data.
pub struct TxInfo {
    pub tx: SignedTransaction,
    // block number of the enclosing block, if known locally
    pub maybe_block_number: Option<u64>,
    pub receipt: Receipt,
    // position of the transaction (block hash + index within the block)
    pub tx_index: TransactionIndex,
    // epoch of the enclosing block, if known locally
    pub maybe_epoch: Option<u64>,
    // state root of the epoch, available only if its witness info is verified
    pub maybe_state_root: Option<H256>,
    // presumably the gas used by preceding transactions in the same block;
    // forwarded verbatim from `TxInfoValidated` — confirm there if needed
    pub prior_gas_used: U256,
}
60
61async fn with_timeout<T>(
62    d: Duration, msg: String,
63    fut: impl Future<Output = Result<T, LightError>> + Send + Sync,
64) -> Result<T, LightError> {
65    let with_timeout = timeout(d, fut);
66    // set error message
67    with_timeout
68        .await
69        .map_err(|_| LightError::from(LightError::Timeout(msg)))?
70}
71
/// Light-node query front end: answers state/transaction queries by
/// combining local consensus data with on-demand requests to full peers.
pub struct QueryService {
    // protocol version advertised when registering with the network layer
    protocol_version: ProtocolVersion,

    // shared consensus graph
    consensus: SharedConsensusGraph,

    // light protocol handler
    handler: Arc<LightHandler>,

    // helper API for retrieving ledger information
    ledger: LedgerInfo,

    // shared network service
    network: Arc<NetworkService>,
}
87
88impl QueryService {
89    pub fn new(
90        consensus: SharedConsensusGraph, graph: Arc<SynchronizationGraph>,
91        network: Arc<NetworkService>, throttling_config_file: Option<String>,
92        notifications: Arc<Notifications>, config: LightNodeConfiguration,
93    ) -> Self {
94        let handler = Arc::new(LightHandler::new(
95            consensus.clone(),
96            graph,
97            throttling_config_file,
98            notifications,
99            config,
100        ));
101        let ledger = LedgerInfo::new(consensus.clone());
102
103        QueryService {
104            protocol_version: LIGHT_PROTOCOL_VERSION,
105            consensus,
106            handler,
107            ledger,
108            network,
109        }
110    }
111
112    pub fn register(&self) -> Result<(), String> {
113        self.network
114            .register_protocol(
115                self.handler.clone(),
116                LIGHT_PROTOCOL_ID,
117                self.protocol_version,
118            )
119            .map_err(|e| {
120                format!("failed to register protocol QueryService: {:?}", e)
121            })
122    }
123
124    fn with_io<T>(&self, f: impl FnOnce(&dyn NetworkContext) -> T) -> T {
125        self.network
126            .with_context(self.handler.clone(), LIGHT_PROTOCOL_ID, |io| f(io))
127            .expect("Unable to access network service")
128    }
129
130    async fn retrieve_state_entry_raw(
131        &self, epoch: u64, key: Vec<u8>,
132    ) -> Result<Option<Vec<u8>>, LightError> {
133        trace!(
134            "retrieve_state_entry_raw epoch = {}, key = {:?}",
135            epoch,
136            key
137        );
138
139        with_timeout(
140            *MAX_POLL_TIME,
141            format!("Timeout while retrieving state entry for epoch {:?} with key {:?}", epoch, key),
142            self.with_io(|io| self.handler.state_entries.request_now(io, epoch, key)),
143        )
144        .await
145    }
146
147    async fn retrieve_state_entry<T: rlp::Decodable>(
148        &self, epoch: u64, key: Vec<u8>,
149    ) -> Result<Option<T>, LightError> {
150        match self.retrieve_state_entry_raw(epoch, key).await? {
151            None => Ok(None),
152            Some(raw) => {
153                let decoded = rlp::decode::<T>(raw.as_ref())
154                    .map_err(|e| format!("{}", e))?;
155                Ok(Some(decoded))
156            }
157        }
158    }
159
160    async fn retrieve_storage_root(
161        &self, epoch: u64, address: H160,
162    ) -> Result<StorageRoot, LightError> {
163        trace!(
164            "retrieve_storage_root epoch = {}, address = {}",
165            epoch,
166            address
167        );
168
169        with_timeout(
170            *MAX_POLL_TIME,
171            format!("Timeout while retrieving storage root for address {:?} in epoch {:?}", address, epoch),
172            self.with_io(|io| self.handler.storage_roots.request_now(io, epoch, address)),
173        )
174        .await
175    }
176
177    async fn retrieve_bloom(
178        &self, epoch: u64,
179    ) -> Result<(u64, Bloom), LightError> {
180        trace!("retrieve_bloom epoch = {}", epoch);
181
182        with_timeout(
183            *MAX_POLL_TIME,
184            format!("Timeout while retrieving bloom for epoch {:?}", epoch),
185            self.handler.blooms.request(epoch),
186        )
187        .await
188        .map(|bloom| (epoch, bloom))
189    }
190
191    async fn retrieve_receipts(
192        &self, epoch: u64,
193    ) -> Result<(u64, Vec<BlockReceipts>), LightError> {
194        trace!("retrieve_receipts epoch = {}", epoch);
195
196        with_timeout(
197            *MAX_POLL_TIME,
198            format!("Timeout while retrieving receipts for epoch {:?}", epoch),
199            self.handler.receipts.request(epoch),
200        )
201        .await
202        .map(|receipts| (epoch, receipts))
203    }
204
205    pub async fn retrieve_block_txs(
206        &self, hash: H256,
207    ) -> Result<Vec<SignedTransaction>, LightError> {
208        trace!("retrieve_block_txs hash = {:?}", hash);
209
210        with_timeout(
211            *MAX_POLL_TIME,
212            format!("Timeout while retrieving block txs for block {:?}", hash),
213            self.handler.block_txs.request(hash),
214        )
215        .await
216    }
217
218    async fn retrieve_block_txs_for_log(
219        &self, log: LocalizedLogEntry,
220    ) -> Result<(LocalizedLogEntry, Vec<SignedTransaction>), LightError> {
221        trace!("retrieve_block_txs_for_log log = {:?}", log);
222
223        self.retrieve_block_txs(log.block_hash)
224            .await
225            .map(|block_txs| (log, block_txs))
226    }
227
228    pub async fn retrieve_block(
229        &self, hash: H256,
230    ) -> Result<Option<Block>, LightError> {
231        let genesis = self.consensus.data_manager().true_genesis.clone();
232
233        if hash == genesis.hash() {
234            return Ok(Some((*genesis).clone()));
235        }
236
237        let maybe_block_header =
238            self.consensus.data_manager().block_header_by_hash(&hash);
239
240        let block_header = match maybe_block_header {
241            None => return Ok(None),
242            Some(h) => (*h).clone(),
243        };
244
245        let transactions = self
246            .retrieve_block_txs(hash)
247            .await?
248            .into_iter()
249            .map(Arc::new)
250            .collect();
251
252        Ok(Some(Block::new(block_header, transactions)))
253    }
254
255    async fn retrieve_tx_info(
256        &self, hash: H256,
257    ) -> Result<TxInfoValidated, LightError> {
258        trace!("retrieve_tx_info hash = {:?}", hash);
259
260        with_timeout(
261            *MAX_POLL_TIME,
262            format!("Timeout while retrieving tx info for tx {:?}", hash),
263            self.with_io(|io| self.handler.tx_infos.request_now(io, hash)),
264        )
265        .await
266    }
267
268    pub async fn gas_price(&self) -> Result<Option<U256>, LightError> {
269        // collect block hashes for gas price sample
270        let mut epoch = self.consensus.best_epoch_number();
271        let mut hashes = vec![];
272        let mut total_transaction_count_in_processed_blocks = 0;
273        let mut processed_block_count = 0;
274
275        let inner = self.consensus.inner.clone();
276
277        loop {
278            if hashes.len() >= GAS_PRICE_BLOCK_SAMPLE_SIZE || epoch == 0 {
279                break;
280            }
281
282            let mut epoch_hashes = inner
283                .read()
284                .block_hashes_by_epoch(epoch)
285                .map_err(|e| e.to_string())?;
286            epoch_hashes.reverse();
287
288            let missing = GAS_PRICE_BLOCK_SAMPLE_SIZE - hashes.len();
289            hashes.extend(epoch_hashes.into_iter().take(missing));
290
291            epoch -= 1;
292        }
293
294        // retrieve blocks in batches
295        let mut stream = stream::iter(hashes)
296            .map(|h| async move {
297                self.retrieve_block(h).await.map(move |b| (h, b))
298            })
299            .buffered(GAS_PRICE_BATCH_SIZE);
300
301        // collect gas price sample
302        let mut prices = vec![];
303
304        while let Some(item) = stream.try_next().await? {
305            let block = match item {
306                (_, Some(b)) => b,
307                (hash, None) => {
308                    // `retrieve_block` will only return None if we do not have
309                    // the corresponding header, which should not happen in this
310                    // case.
311                    bail!(LightError::InternalError(format!(
312                        "Block {:?} not found during gas price sampling",
313                        hash
314                    )));
315                }
316            };
317
318            trace!("sampling gas prices from block {:?}", block.hash());
319            processed_block_count += 1;
320            total_transaction_count_in_processed_blocks +=
321                block.transactions.len();
322
323            for tx in block.transactions.iter() {
324                prices.push(tx.gas_price().clone());
325
326                if prices.len() == GAS_PRICE_TRANSACTION_SAMPLE_SIZE {
327                    break;
328                }
329            }
330        }
331
332        trace!("gas price sample: {:?}", prices);
333
334        let average_transaction_count_per_block = if processed_block_count != 0
335        {
336            total_transaction_count_in_processed_blocks / processed_block_count
337        } else {
338            0
339        };
340
341        if prices.is_empty() {
342            Ok(Some(U256::from(1)))
343        } else {
344            prices.sort();
345            if average_transaction_count_per_block
346                < TRANSACTION_COUNT_PER_BLOCK_WATER_LINE_LOW
347            {
348                Ok(Some(U256::from(1)))
349            } else if average_transaction_count_per_block
350                < TRANSACTION_COUNT_PER_BLOCK_WATER_LINE_MEDIUM
351            {
352                Ok(Some(prices[prices.len() / 8]))
353            } else {
354                Ok(Some(prices[prices.len() / 2]))
355            }
356        }
357    }
358
359    fn account_key(address: &H160) -> Vec<u8> {
360        StorageKey::new_account_key(&address)
361            .with_native_space()
362            .to_key_bytes()
363    }
364
365    fn code_key(address: &H160, code_hash: &H256) -> Vec<u8> {
366        StorageKey::new_code_key(&address, &code_hash)
367            .with_native_space()
368            .to_key_bytes()
369    }
370
371    fn storage_key(address: &H160, position: &[u8]) -> Vec<u8> {
372        StorageKey::new_storage_key(&address, &position)
373            .with_native_space()
374            .to_key_bytes()
375    }
376
377    fn deposit_list_key(address: &H160) -> Vec<u8> {
378        StorageKey::new_deposit_list_key(address)
379            .with_native_space()
380            .to_key_bytes()
381    }
382
383    fn vote_list_key(address: &H160) -> Vec<u8> {
384        StorageKey::new_vote_list_key(address)
385            .with_native_space()
386            .to_key_bytes()
387    }
388
389    pub async fn get_account(
390        &self, epoch: EpochNumber, address: H160,
391    ) -> Result<Option<Account>, LightError> {
392        debug!("get_account epoch={:?} address={:?}", epoch, address);
393
394        let epoch = self.get_height_from_epoch_number(epoch)?;
395        let key = Self::account_key(&address);
396
397        match self.retrieve_state_entry_raw(epoch, key).await? {
398            None => Ok(None),
399            Some(rlp) => {
400                Ok(Some(Account::new_from_rlp(address, &Rlp::new(&rlp))?))
401            }
402        }
403    }
404
405    pub async fn get_deposit_list(
406        &self, epoch: EpochNumber, address: H160,
407    ) -> Result<Option<DepositList>, LightError> {
408        let epoch = self.get_height_from_epoch_number(epoch)?;
409        let key = Self::deposit_list_key(&address);
410        self.retrieve_state_entry::<DepositList>(epoch, key).await
411    }
412
413    pub async fn get_vote_list(
414        &self, epoch: EpochNumber, address: H160,
415    ) -> Result<Option<VoteStakeList>, LightError> {
416        let epoch = self.get_height_from_epoch_number(epoch)?;
417        let key = Self::vote_list_key(&address);
418        self.retrieve_state_entry::<VoteStakeList>(epoch, key).await
419    }
420
421    pub async fn get_code(
422        &self, epoch: EpochNumber, address: H160,
423    ) -> Result<Option<Vec<u8>>, Error> {
424        debug!("get_code epoch={:?} address={:?}", epoch, address);
425
426        // do not query peers for non-contract addresses
427        if !address.is_contract_address() && !address.is_builtin_address() {
428            return Ok(None);
429        }
430
431        let epoch = self.get_height_from_epoch_number(epoch)?;
432        let key = Self::account_key(&address);
433
434        let code_hash = match self.retrieve_state_entry_raw(epoch, key).await {
435            Err(e) => bail!(e),
436            Ok(None) => return Ok(None),
437            Ok(Some(rlp)) => {
438                account_result_to_rpc_result(
439                    "address",
440                    Account::new_from_rlp(address, &Rlp::new(&rlp)),
441                )?
442                .code_hash
443            }
444        };
445
446        let key = Self::code_key(&address, &code_hash);
447
448        match self.retrieve_state_entry::<CodeInfo>(epoch, key).await? {
449            None => {
450                // this should not happen
451                // if the corresponding state becomes unavailable between the
452                // two requests, we will fail with timeout instead
453                error!("Account {:?} found but code {:?} does not exist (epoch={:?})",  address, code_hash, epoch);
454                Err(Error::Custom(format!(
455                    "Unable to retrieve code: internal error"
456                )))
457            }
458            Some(info) => Ok(Some((*info.code).clone())),
459        }
460    }
461
462    pub async fn get_storage(
463        &self, epoch: EpochNumber, address: H160, position: H256,
464    ) -> Result<Option<H256>, LightError> {
465        debug!(
466            "get_storage epoch={:?} address={:?} position={:?}",
467            epoch, address, position
468        );
469
470        let epoch = self.get_height_from_epoch_number(epoch)?;
471        let key = Self::storage_key(&address, &position.0);
472
473        match self.retrieve_state_entry::<StorageValue>(epoch, key).await {
474            Err(e) => Err(e),
475            Ok(None) => Ok(None),
476            Ok(Some(entry)) => Ok(Some(H256::from_uint(&entry.value))),
477        }
478    }
479
480    pub async fn is_user_sponsored(
481        &self, epoch: EpochNumber, contract: H160, user: H160,
482    ) -> Result<bool, LightError> {
483        debug!(
484            "is_user_sponsored epoch={:?} contract={:?} user={:?}",
485            epoch, contract, user
486        );
487
488        let epoch = self.get_height_from_epoch_number(epoch)?;
489
490        // check if sponsorship is enabled for all users
491        let all_sponsored = {
492            let mut pos = Vec::with_capacity(H160::len_bytes() * 2);
493            pos.extend_from_slice(contract.as_bytes());
494            pos.extend_from_slice(COMMISSION_PRIVILEGE_SPECIAL_KEY.as_bytes());
495
496            let key = Self::storage_key(
497                &SPONSOR_WHITELIST_CONTROL_CONTRACT_ADDRESS,
498                &pos,
499            );
500
501            self.retrieve_state_entry::<StorageValue>(epoch, key)
502        };
503
504        // check if sponsorship is enabled for this specific user
505        let user_sponsored = {
506            let mut pos = Vec::with_capacity(H160::len_bytes() * 2);
507            pos.extend_from_slice(contract.as_bytes());
508            pos.extend_from_slice(user.as_bytes());
509
510            let key = Self::storage_key(
511                &SPONSOR_WHITELIST_CONTROL_CONTRACT_ADDRESS,
512                &pos,
513            );
514
515            self.retrieve_state_entry::<StorageValue>(epoch, key)
516        };
517
518        // execute in parallel
519        let (all_sponsored, user_sponsored) =
520            future::join(all_sponsored, user_sponsored).await;
521
522        if matches!(all_sponsored?, Some(n) if !n.value.is_zero()) {
523            return Ok(true);
524        }
525
526        if matches!(user_sponsored?, Some(n) if !n.value.is_zero()) {
527            return Ok(true);
528        }
529
530        Ok(false)
531    }
532
533    pub async fn get_storage_root(
534        &self, epoch: EpochNumber, address: H160,
535    ) -> Result<StorageRoot, LightError> {
536        debug!("get_storage_root epoch={:?} address={:?}", epoch, address);
537
538        let epoch = self.get_height_from_epoch_number(epoch)?;
539        self.retrieve_storage_root(epoch, address).await
540    }
541
542    pub async fn get_interest_rate(
543        &self, epoch: EpochNumber,
544    ) -> Result<U256, LightError> {
545        debug!("get_interest_rate epoch={:?}", epoch);
546
547        let epoch = self.get_height_from_epoch_number(epoch)?;
548
549        let key = global_params::InterestRate::STORAGE_KEY.to_key_bytes();
550
551        self.retrieve_state_entry::<U256>(epoch, key)
552            .await
553            .map(|opt| opt.unwrap_or_default())
554    }
555
556    pub async fn get_accumulate_interest_rate(
557        &self, epoch: EpochNumber,
558    ) -> Result<U256, LightError> {
559        debug!("get_accumulate_interest_rate epoch={:?}", epoch);
560
561        let epoch = self.get_height_from_epoch_number(epoch)?;
562
563        let key =
564            global_params::AccumulateInterestRate::STORAGE_KEY.to_key_bytes();
565
566        self.retrieve_state_entry::<U256>(epoch, key)
567            .await
568            .map(|opt| opt.unwrap_or_default())
569    }
570
571    pub async fn get_pos_economics(
572        &self, epoch: EpochNumber,
573    ) -> Result<[U256; 3], LightError> {
574        debug!("get_PoSEconomics epoch={:?}", epoch);
575
576        let epoch = self.get_height_from_epoch_number(epoch)?;
577
578        let key1 = global_params::TotalPosStaking::STORAGE_KEY.to_key_bytes();
579        let key2 =
580            global_params::DistributablePoSInterest::STORAGE_KEY.to_key_bytes();
581        let key3 =
582            global_params::LastDistributeBlock::STORAGE_KEY.to_key_bytes();
583
584        let total_pos_staking = try_join!(
585            self.retrieve_state_entry::<U256>(epoch, key1),
586            self.retrieve_state_entry::<U256>(epoch, key2),
587            self.retrieve_state_entry::<U256>(epoch, key3)
588        )?;
589        Ok([
590            total_pos_staking.0.unwrap_or_default(),
591            total_pos_staking.1.unwrap_or_default(),
592            total_pos_staking.2.unwrap_or_default(),
593        ])
594    }
595
596    pub async fn get_tx_info(&self, hash: H256) -> Result<TxInfo, LightError> {
597        debug!("get_tx_info hash={:?}", hash);
598
599        // Note: if a transaction does not exist, we fail with timeout, as
600        //       peers cannot provide non-existence proofs for transactions.
601        // FIXME: is there a better way?
602        let TxInfoValidated {
603            tx,
604            receipt,
605            tx_index,
606            prior_gas_used,
607        } = self.retrieve_tx_info(hash).await?;
608
609        let block_hash = tx_index.block_hash;
610        let maybe_epoch = self.consensus.get_block_epoch_number(&block_hash);
611        let maybe_block_number =
612            self.consensus.get_block_number(&block_hash)?;
613        let maybe_state_root = maybe_epoch
614            .and_then(|e| self.handler.witnesses.root_hashes_of(e).ok())
615            .map(|roots| roots.state_root_hash);
616
617        Ok(TxInfo {
618            tx,
619            maybe_block_number,
620            receipt,
621            tx_index,
622            maybe_epoch,
623            maybe_state_root,
624            prior_gas_used,
625        })
626    }
627
628    /// Relay raw transaction to all peers.
629    // TODO(thegaram): consider returning TxStatus instead of bool,
630    // e.g. Failed, Sent/Pending, Confirmed, etc.
631    pub fn send_raw_tx(&self, raw: Vec<u8>) -> bool {
632        debug!("send_raw_tx raw={:?}", raw);
633
634        let peers = FullPeerFilter::new(msgid::SEND_RAW_TX)
635            .select_all(self.handler.peers.clone());
636
637        match self.network.with_context(
638            self.handler.clone(),
639            LIGHT_PROTOCOL_ID,
640            |io| {
641                let mut success = false;
642
643                for peer in peers {
644                    // relay to peer
645                    let res = self.handler.send_raw_tx(io, &peer, raw.clone());
646
647                    // check error
648                    match res {
649                        Err(e) => {
650                            warn!("Failed to relay to peer={:?}: {:?}", peer, e)
651                        }
652                        Ok(_) => {
653                            debug!("Tx relay to peer {:?} successful", peer);
654                            success = true;
655                        }
656                    }
657                }
658
659                success
660            },
661        ) {
662            Err(e) => unreachable!("{}", e),
663            Ok(success) => success,
664        }
665    }
666
667    pub async fn get_tx(
668        &self, hash: H256,
669    ) -> Result<SignedTransaction, LightError> {
670        debug!("get_tx hash={:?}", hash);
671
672        with_timeout(
673            *MAX_POLL_TIME,
674            format!(
675                "Timeout while retrieving transaction with hash {:?}",
676                hash
677            ),
678            self.with_io(|io| self.handler.txs.request_now(io, hash)),
679        )
680        .await
681    }
682
    /// Apply filter to all logs within a receipt.
    /// NOTE: `log.transaction_hash` is not known at this point,
    /// so this field has to be filled later on.
    ///
    /// `num_logs_remaining` carries the number of logs in the block up to
    /// and including this receipt; it is decremented here so the caller can
    /// pass it on for the next (earlier) receipt.
    fn filter_receipt_logs(
        epoch: u64, block_hash: H256, block_timestamp: Option<u64>,
        transaction_index: usize, num_logs_remaining: &mut usize,
        mut logs: Vec<LogEntry>, filter: LogFilter,
    ) -> impl Iterator<Item = LocalizedLogEntry> {
        let num_logs = logs.len();

        // index (within the block) one past this receipt's last log
        let log_base_index = *num_logs_remaining;
        *num_logs_remaining -= num_logs;

        // process logs in reverse order
        logs.reverse();

        logs.into_iter()
            .enumerate()
            .filter(move |(_, entry)| filter.matches(&entry))
            .map(move |(ii, entry)| LocalizedLogEntry {
                block_hash,
                epoch_number: epoch,
                block_timestamp,
                entry,
                // `ii` counts from the end because of the reversal above
                log_index: log_base_index - ii - 1,
                // NOTE(review): placeholder borrowed from the bloom
                // constants; callers must overwrite this with the real hash
                transaction_hash: KECCAK_EMPTY_BLOOM, // will fill in later
                transaction_index,
                transaction_log_index: num_logs - ii - 1,
            })
    }
713
    /// Apply filter to all receipts within a block.
    /// Receipts are consumed in reverse order; `filter_receipt_logs`
    /// receives a running log count (`remaining`) so block-wide log indices
    /// can be computed without absolute offsets up front.
    fn filter_block_receipts(
        epoch: u64, hash: H256, block_timestamp: Option<u64>,
        block_receipts: BlockReceipts, filter: LogFilter,
    ) -> impl Iterator<Item = LocalizedLogEntry> {
        let mut receipts = block_receipts.receipts;
        // number of receipts in this block
        let num_receipts = receipts.len();

        // number of logs in this block
        let mut remaining = receipts.iter().fold(0, |s, r| s + r.logs.len());

        // process block receipts in reverse order
        receipts.reverse();

        receipts.into_iter().map(|r| r.logs).enumerate().flat_map(
            move |(ii, logs)| {
                debug!("block_hash {:?} logs = {:?}", hash, logs);
                // `ii` counts from the end, so the original transaction
                // index is `num_receipts - ii - 1`
                Self::filter_receipt_logs(
                    epoch,
                    hash,
                    block_timestamp,
                    num_receipts - ii - 1,
                    &mut remaining,
                    logs,
                    filter.clone(),
                )
            },
        )
    }
744
745    /// Apply filter to all receipts within an epoch.
746    fn filter_epoch_receipts(
747        &self, epoch: u64, mut receipts: Vec<BlockReceipts>, filter: LogFilter,
748    ) -> Result<impl Iterator<Item = LocalizedLogEntry>, String> {
749        // get epoch blocks in execution order
750        let mut hashes = self
751            .ledger
752            .block_hashes_in(epoch)
753            .map_err(|e| format!("{}", e))?;
754
755        receipts.reverse();
756        hashes.reverse();
757
758        let timestamps = hashes
759            .iter()
760            .map(|h| self.ledger.header(*h))
761            .filter(|s| s.is_ok())
762            .map(|h| h.unwrap().timestamp())
763            .collect::<Vec<_>>();
764
765        if timestamps.len() != hashes.len() {
766            return Err(format!(
767                "Unable to retrieve all block headers in epoch {:?} for log filtering",
768                epoch
769            ));
770        }
771
772        let matching = itertools::izip!(receipts, hashes, timestamps).flat_map(
773            move |(receipts, hash, timestamp)| {
774                trace!("block_hash {:?} receipts = {:?}", hash, receipts);
775                Self::filter_block_receipts(
776                    epoch,
777                    hash,
778                    Some(timestamp),
779                    receipts,
780                    filter.clone(),
781                )
782            },
783        );
784
785        Ok(matching)
786    }
787
788    pub fn get_latest_verifiable_chain_id(
789        &self,
790    ) -> Result<AllChainID, FilterError> {
791        let epoch_number = self.get_latest_verifiable_epoch_number()?;
792        Ok(self
793            .consensus
794            .config()
795            .chain_id
796            .read()
797            .get_chain_id(epoch_number))
798    }
799
800    pub fn get_latest_verifiable_epoch_number(
801        &self,
802    ) -> Result<u64, FilterError> {
803        // find highest epoch that we are able to verify based on witness info
804        let latest_verified = self.handler.witnesses.latest_verified();
805
806        let latest_verifiable = match latest_verified {
807            n if n >= DEFERRED_STATE_EPOCH_COUNT => {
808                n - DEFERRED_STATE_EPOCH_COUNT
809            }
810            _ => {
811                return Err(FilterError::UnableToVerify {
812                    epoch: 0,
813                    latest_verifiable: 0,
814                });
815            }
816        };
817
818        trace!(
819            "get_latest_verifiable_epoch_number latest_verifiable = {}",
820            latest_verifiable
821        );
822        Ok(latest_verifiable)
823    }
824
    /// Resolve a symbolic `EpochNumber` into a concrete epoch height,
    /// rejecting heights beyond what this light node can currently verify.
    ///
    /// # Errors
    /// `FilterError::UnableToVerify` if a requested numeric epoch exceeds
    /// the latest verifiable epoch (or if no epoch is verifiable yet).
    pub fn get_height_from_epoch_number(
        &self, epoch: EpochNumber,
    ) -> Result<u64, FilterError> {
        let latest_verifiable = self.get_latest_verifiable_epoch_number()?;

        trace!(
            "get_height_from_epoch_number epoch = {:?}, latest_verifiable = {}",
            epoch,
            latest_verifiable
        );

        match epoch {
            EpochNumber::Earliest => Ok(0),
            EpochNumber::LatestCheckpoint => {
                Ok(self.consensus.latest_checkpoint_epoch_number())
            }
            EpochNumber::LatestConfirmed => {
                Ok(self.consensus.latest_confirmed_epoch_number())
            }
            // a light node cannot see past what it can verify, so both
            // `LatestMined` and `LatestState` map to the verifiable tip
            EpochNumber::LatestMined => Ok(latest_verifiable),
            EpochNumber::LatestState => Ok(latest_verifiable),
            EpochNumber::LatestFinalized => {
                Ok(self.consensus.latest_finalized_epoch_number())
            }
            EpochNumber::Number(n) if n <= latest_verifiable => Ok(n),
            EpochNumber::Number(n) => Err(FilterError::UnableToVerify {
                epoch: n,
                latest_verifiable,
            }),
        }
    }
856
    /// Translate a `LogFilter` into (a) the list of epochs to scan, in
    /// reverse order, and (b) a predicate selecting which blocks within
    /// those epochs to consider.
    ///
    /// # Errors
    /// Propagates epoch-resolution failures; rejects inverted epoch ranges,
    /// unknown block hashes, and block-number-based filters (unsupported on
    /// light nodes).
    fn get_filter_epochs(
        &self, filter: &LogFilter,
    ) -> Result<(Vec<u64>, Box<dyn Fn(H256) -> bool + Send + Sync>), FilterError>
    {
        match &filter {
            LogFilter::EpochLogFilter {
                from_epoch,
                to_epoch,
                ..
            } => {
                let from_epoch =
                    self.get_height_from_epoch_number(from_epoch.clone())?;
                let to_epoch =
                    self.get_height_from_epoch_number(to_epoch.clone())?;

                if from_epoch > to_epoch {
                    return Err(FilterError::InvalidEpochNumber {
                        from_epoch,
                        to_epoch,
                    });
                }

                // every block in the matched epochs is considered
                let epochs = (from_epoch..(to_epoch + 1)).rev().collect();
                let block_filter = Box::new(|_| true);

                Ok((epochs, block_filter))
            }
            LogFilter::BlockHashLogFilter { block_hashes, .. } => {
                // we use BTreeSet to make lookup efficient
                let hashes: BTreeSet<_> =
                    block_hashes.iter().cloned().collect();

                // we use BTreeSet to ensure order and uniqueness
                let mut epochs = BTreeSet::new();

                for hash in &hashes {
                    match self.consensus.get_block_epoch_number(&hash) {
                        Some(epoch) => epochs.insert(epoch),
                        None => {
                            return Err(FilterError::UnknownBlock {
                                hash: *hash,
                            })
                        }
                    };
                }

                // only blocks explicitly listed in the filter pass
                let epochs = epochs.into_iter().rev().collect();
                let block_filter = Box::new(move |hash| hashes.contains(&hash));

                Ok((epochs, block_filter))
            }
            _ => bail!(FilterError::Custom(
                "Light nodes do not support log filtering using block numbers"
                    .into(),
            )),
        }
    }
914
    /// Retrieve all logs matching `filter` from a light node.
    ///
    /// The matching epochs and block predicate are derived via
    /// `get_filter_epochs`; for each epoch we fetch the bloom, receipts, and
    /// block transactions from full peers (each verified in the corresponding
    /// sync handler) and filter down to matching `LocalizedLogEntry`s. The
    /// collected result is reversed before returning, so epochs are emitted
    /// oldest-first.
    ///
    /// Errors from the underlying retrievals are surfaced as `LightError`.
    pub async fn get_logs(
        &self, filter: LogFilter,
    ) -> Result<Vec<LocalizedLogEntry>, LightError> {
        debug!("get_logs filter = {:?}", filter);

        // find epochs and blocks to match against
        let (epochs, block_filter) = self
            .get_filter_epochs(&filter)
            .map_err(|e| format!("{}", e))?;

        debug!("Executing filter on epochs {:?}", epochs);

        // construct blooms for matching epochs
        let blooms = filter.bloom_possibilities();

        // The returned future will outlive this method (`get_logs`). Thus, we
        // need to move `blooms` into `bloom_match` and `bloom_match` into the
        // future.
        let bloom_match = move |block_log_bloom: &Bloom| {
            blooms
                .iter()
                .any(|bloom| block_log_bloom.contains_bloom(bloom))
        };

        // construct a stream object for log filtering
        // we first retrieve the epoch blooms and try to match against them. for
        // matching epochs, we retrieve the corresponding receipts and find the
        // matching entries. finally, for each matching entry, we retrieve the
        // block transactions so that we can add the tx hash. each of these is
        // verified in the corresponding sync handler.

        // NOTE: in the type annotations below, we use these conventions:
        //    Stream<T> = futures::stream::Stream<Item = T>
        // TryStream<T> = futures::stream::TryStream<Ok = T, Error = String>
        // TryFuture<T> = futures::future::TryFuture<Ok = T, Error = String>
        let stream =
            // process epochs one by one
            stream::iter(epochs)
            // --> Stream<u64>

            // retrieve blooms; `buffered` below keeps up to
            // LOG_FILTERING_LOOKAHEAD retrievals in flight concurrently while
            // preserving epoch order
            .map(|epoch| self.retrieve_bloom(epoch))
            // --> Stream<TryFuture<(u64, Bloom)>>

            .buffered(LOG_FILTERING_LOOKAHEAD)
            // --> TryStream<(u64, Bloom)>

            // find the epochs that match
            .try_filter_map(move |(epoch, bloom)| {
                debug!("Matching epoch {:?} bloom = {:?}", epoch, bloom);

                match bloom_match(&bloom) {
                    true => future::ready(Ok(Some(epoch))),
                    false => future::ready(Ok(None)),
                }
            })
            // --> TryStream<u64>

            // retrieve receipts; `Either` lets the error and success arms
            // produce a single concrete future type for `buffered`
            .map(|res| match res {
                Err(e) => Either::Left(future::err(e)),
                Ok(epoch) => Either::Right(self.retrieve_receipts(epoch)),
            })
            // --> Stream<TryFuture<(u64, Vec<Vec<Receipt>>)>>

            .buffered(LOG_FILTERING_LOOKAHEAD)
            // --> TryStream<(u64, Vec<Vec<Receipt>>)>

            // filter logs in epoch
            .map(|res| match res {
                Err(e) => Err(e),
                Ok((epoch, receipts)) => {
                    debug!("Filtering epoch {:?} receipts = {:?}", epoch, receipts);

                    let logs = self
                        .filter_epoch_receipts(epoch, receipts, filter.clone())?
                        .map(|log| Ok(log));

                    Ok(stream::iter(logs))
                }
            })
            // --> TryStream<TryStream<LocalizedLogEntry>>

            .try_flatten()
            // --> TryStream<LocalizedLogEntry>

            // apply block filter
            .try_filter(move |log| future::ready(block_filter(log.block_hash)))
            // --> TryStream<LocalizedLogEntry>

            // retrieve block txs
            .map(|res| match res {
                Err(e) => Either::Left(future::err(e)),
                Ok(log) => Either::Right(self.retrieve_block_txs_for_log(log)),
            })
            // --> Stream<TryFuture<(LocalizedLogEntry, Vec<SignedTransaction>)>>

            .buffered(LOG_FILTERING_LOOKAHEAD)
            // --> TryStream<(LocalizedLogEntry, Vec<SignedTransaction>)>

            .map_ok(|(mut log, txs)| {
                debug!("processing log = {:?} txs = {:?}", log, txs);

                // at this point, we're trying to retrieve a block tx based on verified
                // bloom, receipt, and tx information. if all the verifications passed
                // before, then we must have the corresponding tx in this block.
                assert!(log.transaction_index < txs.len());

                // set tx hash
                log.transaction_hash = txs[log.transaction_index].hash();
                log
            })
            // Cap the number of logs returned. We take limit + 1 entries
            // rather than limit — presumably so the caller can detect that
            // the configured limit was exceeded; TODO(review): confirm
            // against the RPC layer. With no limit configured this is
            // (usize::MAX - 1) + 1, which does not overflow.
            // NOTE(review): a configured limit of exactly usize::MAX would
            // overflow the `+ 1` here.
            .take(self.consensus.config().get_logs_filter_max_limit.unwrap_or(::std::usize::MAX - 1) + 1)
            .try_collect();
        // --> TryFuture<Vec<LocalizedLogEntry>>

        // epochs were traversed newest-first; reverse so older epochs come
        // first in the response
        let mut matching: Vec<_> = stream.await?;
        matching.reverse();
        debug!("Collected matching logs = {:?}", matching);
        Ok(matching)
    }
1037
    /// The network type of the underlying `NetworkService` (delegation only).
    pub fn get_network_type(&self) -> &Network {
        self.network.get_network_type()
    }
1041}