1use crate::{
6 consensus::SharedConsensusGraph,
7 errors::{account_result_to_rpc_result, Error},
8 light_protocol::{
9 common::{FullPeerFilter, LedgerInfo},
10 handler::sync::TxInfoValidated,
11 message::msgid,
12 Error as LightError, Handler as LightHandler, LightNodeConfiguration,
13 LIGHT_PROTOCOL_ID, LIGHT_PROTOCOL_VERSION,
14 },
15 sync::SynchronizationGraph,
16 Notifications,
17};
18use cfx_addr::Network;
19use cfx_executor::state::COMMISSION_PRIVILEGE_SPECIAL_KEY;
20use cfx_parameters::{
21 consensus::DEFERRED_STATE_EPOCH_COUNT,
22 internal_contract_addresses::SPONSOR_WHITELIST_CONTROL_CONTRACT_ADDRESS,
23 light::{
24 GAS_PRICE_BATCH_SIZE, GAS_PRICE_BLOCK_SAMPLE_SIZE,
25 GAS_PRICE_TRANSACTION_SAMPLE_SIZE, LOG_FILTERING_LOOKAHEAD,
26 MAX_POLL_TIME, TRANSACTION_COUNT_PER_BLOCK_WATER_LINE_LOW,
27 TRANSACTION_COUNT_PER_BLOCK_WATER_LINE_MEDIUM,
28 },
29};
30use cfx_statedb::global_params::{self, GlobalParamKey};
31use cfx_types::{
32 address_util::AddressUtil, AllChainID, BigEndianHash, Bloom, H160, H256,
33 KECCAK_EMPTY_BLOOM, U256,
34};
35use futures::{
36 future::{self, Either},
37 stream, try_join, StreamExt, TryStreamExt,
38};
39use network::{service::ProtocolVersion, NetworkContext, NetworkService};
40use primitives::{
41 filter::{FilterError, LogFilter},
42 log_entry::{LocalizedLogEntry, LogEntry},
43 Account, Block, BlockReceipts, CodeInfo, DepositList, EpochNumber, Receipt,
44 SignedTransaction, StorageKey, StorageRoot, StorageValue, TransactionIndex,
45 VoteStakeList,
46};
47use rlp::Rlp;
48use std::{collections::BTreeSet, future::Future, sync::Arc, time::Duration};
49use tokio::time::timeout;
50
51pub struct TxInfo {
52 pub tx: SignedTransaction,
53 pub maybe_block_number: Option<u64>,
54 pub receipt: Receipt,
55 pub tx_index: TransactionIndex,
56 pub maybe_epoch: Option<u64>,
57 pub maybe_state_root: Option<H256>,
58 pub prior_gas_used: U256,
59}
60
61async fn with_timeout<T>(
62 d: Duration, msg: String,
63 fut: impl Future<Output = Result<T, LightError>> + Send + Sync,
64) -> Result<T, LightError> {
65 let with_timeout = timeout(d, fut);
66 with_timeout
68 .await
69 .map_err(|_| LightError::from(LightError::Timeout(msg)))?
70}
71
72pub struct QueryService {
73 protocol_version: ProtocolVersion,
74
75 consensus: SharedConsensusGraph,
77
78 handler: Arc<LightHandler>,
80
81 ledger: LedgerInfo,
83
84 network: Arc<NetworkService>,
86}
87
88impl QueryService {
89 pub fn new(
90 consensus: SharedConsensusGraph, graph: Arc<SynchronizationGraph>,
91 network: Arc<NetworkService>, throttling_config_file: Option<String>,
92 notifications: Arc<Notifications>, config: LightNodeConfiguration,
93 ) -> Self {
94 let handler = Arc::new(LightHandler::new(
95 consensus.clone(),
96 graph,
97 throttling_config_file,
98 notifications,
99 config,
100 ));
101 let ledger = LedgerInfo::new(consensus.clone());
102
103 QueryService {
104 protocol_version: LIGHT_PROTOCOL_VERSION,
105 consensus,
106 handler,
107 ledger,
108 network,
109 }
110 }
111
112 pub fn register(&self) -> Result<(), String> {
113 self.network
114 .register_protocol(
115 self.handler.clone(),
116 LIGHT_PROTOCOL_ID,
117 self.protocol_version,
118 )
119 .map_err(|e| {
120 format!("failed to register protocol QueryService: {:?}", e)
121 })
122 }
123
124 fn with_io<T>(&self, f: impl FnOnce(&dyn NetworkContext) -> T) -> T {
125 self.network
126 .with_context(self.handler.clone(), LIGHT_PROTOCOL_ID, |io| f(io))
127 .expect("Unable to access network service")
128 }
129
130 async fn retrieve_state_entry_raw(
131 &self, epoch: u64, key: Vec<u8>,
132 ) -> Result<Option<Vec<u8>>, LightError> {
133 trace!(
134 "retrieve_state_entry_raw epoch = {}, key = {:?}",
135 epoch,
136 key
137 );
138
139 with_timeout(
140 *MAX_POLL_TIME,
141 format!("Timeout while retrieving state entry for epoch {:?} with key {:?}", epoch, key),
142 self.with_io(|io| self.handler.state_entries.request_now(io, epoch, key)),
143 )
144 .await
145 }
146
147 async fn retrieve_state_entry<T: rlp::Decodable>(
148 &self, epoch: u64, key: Vec<u8>,
149 ) -> Result<Option<T>, LightError> {
150 match self.retrieve_state_entry_raw(epoch, key).await? {
151 None => Ok(None),
152 Some(raw) => {
153 let decoded = rlp::decode::<T>(raw.as_ref())
154 .map_err(|e| format!("{}", e))?;
155 Ok(Some(decoded))
156 }
157 }
158 }
159
160 async fn retrieve_storage_root(
161 &self, epoch: u64, address: H160,
162 ) -> Result<StorageRoot, LightError> {
163 trace!(
164 "retrieve_storage_root epoch = {}, address = {}",
165 epoch,
166 address
167 );
168
169 with_timeout(
170 *MAX_POLL_TIME,
171 format!("Timeout while retrieving storage root for address {:?} in epoch {:?}", address, epoch),
172 self.with_io(|io| self.handler.storage_roots.request_now(io, epoch, address)),
173 )
174 .await
175 }
176
177 async fn retrieve_bloom(
178 &self, epoch: u64,
179 ) -> Result<(u64, Bloom), LightError> {
180 trace!("retrieve_bloom epoch = {}", epoch);
181
182 with_timeout(
183 *MAX_POLL_TIME,
184 format!("Timeout while retrieving bloom for epoch {:?}", epoch),
185 self.handler.blooms.request(epoch),
186 )
187 .await
188 .map(|bloom| (epoch, bloom))
189 }
190
191 async fn retrieve_receipts(
192 &self, epoch: u64,
193 ) -> Result<(u64, Vec<BlockReceipts>), LightError> {
194 trace!("retrieve_receipts epoch = {}", epoch);
195
196 with_timeout(
197 *MAX_POLL_TIME,
198 format!("Timeout while retrieving receipts for epoch {:?}", epoch),
199 self.handler.receipts.request(epoch),
200 )
201 .await
202 .map(|receipts| (epoch, receipts))
203 }
204
205 pub async fn retrieve_block_txs(
206 &self, hash: H256,
207 ) -> Result<Vec<SignedTransaction>, LightError> {
208 trace!("retrieve_block_txs hash = {:?}", hash);
209
210 with_timeout(
211 *MAX_POLL_TIME,
212 format!("Timeout while retrieving block txs for block {:?}", hash),
213 self.handler.block_txs.request(hash),
214 )
215 .await
216 }
217
218 async fn retrieve_block_txs_for_log(
219 &self, log: LocalizedLogEntry,
220 ) -> Result<(LocalizedLogEntry, Vec<SignedTransaction>), LightError> {
221 trace!("retrieve_block_txs_for_log log = {:?}", log);
222
223 self.retrieve_block_txs(log.block_hash)
224 .await
225 .map(|block_txs| (log, block_txs))
226 }
227
228 pub async fn retrieve_block(
229 &self, hash: H256,
230 ) -> Result<Option<Block>, LightError> {
231 let genesis = self.consensus.data_manager().true_genesis.clone();
232
233 if hash == genesis.hash() {
234 return Ok(Some((*genesis).clone()));
235 }
236
237 let maybe_block_header =
238 self.consensus.data_manager().block_header_by_hash(&hash);
239
240 let block_header = match maybe_block_header {
241 None => return Ok(None),
242 Some(h) => (*h).clone(),
243 };
244
245 let transactions = self
246 .retrieve_block_txs(hash)
247 .await?
248 .into_iter()
249 .map(Arc::new)
250 .collect();
251
252 Ok(Some(Block::new(block_header, transactions)))
253 }
254
255 async fn retrieve_tx_info(
256 &self, hash: H256,
257 ) -> Result<TxInfoValidated, LightError> {
258 trace!("retrieve_tx_info hash = {:?}", hash);
259
260 with_timeout(
261 *MAX_POLL_TIME,
262 format!("Timeout while retrieving tx info for tx {:?}", hash),
263 self.with_io(|io| self.handler.tx_infos.request_now(io, hash)),
264 )
265 .await
266 }
267
268 pub async fn gas_price(&self) -> Result<Option<U256>, LightError> {
269 let mut epoch = self.consensus.best_epoch_number();
271 let mut hashes = vec![];
272 let mut total_transaction_count_in_processed_blocks = 0;
273 let mut processed_block_count = 0;
274
275 let inner = self.consensus.inner.clone();
276
277 loop {
278 if hashes.len() >= GAS_PRICE_BLOCK_SAMPLE_SIZE || epoch == 0 {
279 break;
280 }
281
282 let mut epoch_hashes = inner
283 .read()
284 .block_hashes_by_epoch(epoch)
285 .map_err(|e| e.to_string())?;
286 epoch_hashes.reverse();
287
288 let missing = GAS_PRICE_BLOCK_SAMPLE_SIZE - hashes.len();
289 hashes.extend(epoch_hashes.into_iter().take(missing));
290
291 epoch -= 1;
292 }
293
294 let mut stream = stream::iter(hashes)
296 .map(|h| async move {
297 self.retrieve_block(h).await.map(move |b| (h, b))
298 })
299 .buffered(GAS_PRICE_BATCH_SIZE);
300
301 let mut prices = vec![];
303
304 while let Some(item) = stream.try_next().await? {
305 let block = match item {
306 (_, Some(b)) => b,
307 (hash, None) => {
308 bail!(LightError::InternalError(format!(
312 "Block {:?} not found during gas price sampling",
313 hash
314 )));
315 }
316 };
317
318 trace!("sampling gas prices from block {:?}", block.hash());
319 processed_block_count += 1;
320 total_transaction_count_in_processed_blocks +=
321 block.transactions.len();
322
323 for tx in block.transactions.iter() {
324 prices.push(tx.gas_price().clone());
325
326 if prices.len() == GAS_PRICE_TRANSACTION_SAMPLE_SIZE {
327 break;
328 }
329 }
330 }
331
332 trace!("gas price sample: {:?}", prices);
333
334 let average_transaction_count_per_block = if processed_block_count != 0
335 {
336 total_transaction_count_in_processed_blocks / processed_block_count
337 } else {
338 0
339 };
340
341 if prices.is_empty() {
342 Ok(Some(U256::from(1)))
343 } else {
344 prices.sort();
345 if average_transaction_count_per_block
346 < TRANSACTION_COUNT_PER_BLOCK_WATER_LINE_LOW
347 {
348 Ok(Some(U256::from(1)))
349 } else if average_transaction_count_per_block
350 < TRANSACTION_COUNT_PER_BLOCK_WATER_LINE_MEDIUM
351 {
352 Ok(Some(prices[prices.len() / 8]))
353 } else {
354 Ok(Some(prices[prices.len() / 2]))
355 }
356 }
357 }
358
359 fn account_key(address: &H160) -> Vec<u8> {
360 StorageKey::new_account_key(&address)
361 .with_native_space()
362 .to_key_bytes()
363 }
364
365 fn code_key(address: &H160, code_hash: &H256) -> Vec<u8> {
366 StorageKey::new_code_key(&address, &code_hash)
367 .with_native_space()
368 .to_key_bytes()
369 }
370
371 fn storage_key(address: &H160, position: &[u8]) -> Vec<u8> {
372 StorageKey::new_storage_key(&address, &position)
373 .with_native_space()
374 .to_key_bytes()
375 }
376
377 fn deposit_list_key(address: &H160) -> Vec<u8> {
378 StorageKey::new_deposit_list_key(address)
379 .with_native_space()
380 .to_key_bytes()
381 }
382
383 fn vote_list_key(address: &H160) -> Vec<u8> {
384 StorageKey::new_vote_list_key(address)
385 .with_native_space()
386 .to_key_bytes()
387 }
388
389 pub async fn get_account(
390 &self, epoch: EpochNumber, address: H160,
391 ) -> Result<Option<Account>, LightError> {
392 debug!("get_account epoch={:?} address={:?}", epoch, address);
393
394 let epoch = self.get_height_from_epoch_number(epoch)?;
395 let key = Self::account_key(&address);
396
397 match self.retrieve_state_entry_raw(epoch, key).await? {
398 None => Ok(None),
399 Some(rlp) => {
400 Ok(Some(Account::new_from_rlp(address, &Rlp::new(&rlp))?))
401 }
402 }
403 }
404
405 pub async fn get_deposit_list(
406 &self, epoch: EpochNumber, address: H160,
407 ) -> Result<Option<DepositList>, LightError> {
408 let epoch = self.get_height_from_epoch_number(epoch)?;
409 let key = Self::deposit_list_key(&address);
410 self.retrieve_state_entry::<DepositList>(epoch, key).await
411 }
412
413 pub async fn get_vote_list(
414 &self, epoch: EpochNumber, address: H160,
415 ) -> Result<Option<VoteStakeList>, LightError> {
416 let epoch = self.get_height_from_epoch_number(epoch)?;
417 let key = Self::vote_list_key(&address);
418 self.retrieve_state_entry::<VoteStakeList>(epoch, key).await
419 }
420
421 pub async fn get_code(
422 &self, epoch: EpochNumber, address: H160,
423 ) -> Result<Option<Vec<u8>>, Error> {
424 debug!("get_code epoch={:?} address={:?}", epoch, address);
425
426 if !address.is_contract_address() && !address.is_builtin_address() {
428 return Ok(None);
429 }
430
431 let epoch = self.get_height_from_epoch_number(epoch)?;
432 let key = Self::account_key(&address);
433
434 let code_hash = match self.retrieve_state_entry_raw(epoch, key).await {
435 Err(e) => bail!(e),
436 Ok(None) => return Ok(None),
437 Ok(Some(rlp)) => {
438 account_result_to_rpc_result(
439 "address",
440 Account::new_from_rlp(address, &Rlp::new(&rlp)),
441 )?
442 .code_hash
443 }
444 };
445
446 let key = Self::code_key(&address, &code_hash);
447
448 match self.retrieve_state_entry::<CodeInfo>(epoch, key).await? {
449 None => {
450 error!("Account {:?} found but code {:?} does not exist (epoch={:?})", address, code_hash, epoch);
454 Err(Error::Custom(format!(
455 "Unable to retrieve code: internal error"
456 )))
457 }
458 Some(info) => Ok(Some((*info.code).clone())),
459 }
460 }
461
462 pub async fn get_storage(
463 &self, epoch: EpochNumber, address: H160, position: H256,
464 ) -> Result<Option<H256>, LightError> {
465 debug!(
466 "get_storage epoch={:?} address={:?} position={:?}",
467 epoch, address, position
468 );
469
470 let epoch = self.get_height_from_epoch_number(epoch)?;
471 let key = Self::storage_key(&address, &position.0);
472
473 match self.retrieve_state_entry::<StorageValue>(epoch, key).await {
474 Err(e) => Err(e),
475 Ok(None) => Ok(None),
476 Ok(Some(entry)) => Ok(Some(H256::from_uint(&entry.value))),
477 }
478 }
479
480 pub async fn is_user_sponsored(
481 &self, epoch: EpochNumber, contract: H160, user: H160,
482 ) -> Result<bool, LightError> {
483 debug!(
484 "is_user_sponsored epoch={:?} contract={:?} user={:?}",
485 epoch, contract, user
486 );
487
488 let epoch = self.get_height_from_epoch_number(epoch)?;
489
490 let all_sponsored = {
492 let mut pos = Vec::with_capacity(H160::len_bytes() * 2);
493 pos.extend_from_slice(contract.as_bytes());
494 pos.extend_from_slice(COMMISSION_PRIVILEGE_SPECIAL_KEY.as_bytes());
495
496 let key = Self::storage_key(
497 &SPONSOR_WHITELIST_CONTROL_CONTRACT_ADDRESS,
498 &pos,
499 );
500
501 self.retrieve_state_entry::<StorageValue>(epoch, key)
502 };
503
504 let user_sponsored = {
506 let mut pos = Vec::with_capacity(H160::len_bytes() * 2);
507 pos.extend_from_slice(contract.as_bytes());
508 pos.extend_from_slice(user.as_bytes());
509
510 let key = Self::storage_key(
511 &SPONSOR_WHITELIST_CONTROL_CONTRACT_ADDRESS,
512 &pos,
513 );
514
515 self.retrieve_state_entry::<StorageValue>(epoch, key)
516 };
517
518 let (all_sponsored, user_sponsored) =
520 future::join(all_sponsored, user_sponsored).await;
521
522 if matches!(all_sponsored?, Some(n) if !n.value.is_zero()) {
523 return Ok(true);
524 }
525
526 if matches!(user_sponsored?, Some(n) if !n.value.is_zero()) {
527 return Ok(true);
528 }
529
530 Ok(false)
531 }
532
533 pub async fn get_storage_root(
534 &self, epoch: EpochNumber, address: H160,
535 ) -> Result<StorageRoot, LightError> {
536 debug!("get_storage_root epoch={:?} address={:?}", epoch, address);
537
538 let epoch = self.get_height_from_epoch_number(epoch)?;
539 self.retrieve_storage_root(epoch, address).await
540 }
541
542 pub async fn get_interest_rate(
543 &self, epoch: EpochNumber,
544 ) -> Result<U256, LightError> {
545 debug!("get_interest_rate epoch={:?}", epoch);
546
547 let epoch = self.get_height_from_epoch_number(epoch)?;
548
549 let key = global_params::InterestRate::STORAGE_KEY.to_key_bytes();
550
551 self.retrieve_state_entry::<U256>(epoch, key)
552 .await
553 .map(|opt| opt.unwrap_or_default())
554 }
555
556 pub async fn get_accumulate_interest_rate(
557 &self, epoch: EpochNumber,
558 ) -> Result<U256, LightError> {
559 debug!("get_accumulate_interest_rate epoch={:?}", epoch);
560
561 let epoch = self.get_height_from_epoch_number(epoch)?;
562
563 let key =
564 global_params::AccumulateInterestRate::STORAGE_KEY.to_key_bytes();
565
566 self.retrieve_state_entry::<U256>(epoch, key)
567 .await
568 .map(|opt| opt.unwrap_or_default())
569 }
570
571 pub async fn get_pos_economics(
572 &self, epoch: EpochNumber,
573 ) -> Result<[U256; 3], LightError> {
574 debug!("get_PoSEconomics epoch={:?}", epoch);
575
576 let epoch = self.get_height_from_epoch_number(epoch)?;
577
578 let key1 = global_params::TotalPosStaking::STORAGE_KEY.to_key_bytes();
579 let key2 =
580 global_params::DistributablePoSInterest::STORAGE_KEY.to_key_bytes();
581 let key3 =
582 global_params::LastDistributeBlock::STORAGE_KEY.to_key_bytes();
583
584 let total_pos_staking = try_join!(
585 self.retrieve_state_entry::<U256>(epoch, key1),
586 self.retrieve_state_entry::<U256>(epoch, key2),
587 self.retrieve_state_entry::<U256>(epoch, key3)
588 )?;
589 Ok([
590 total_pos_staking.0.unwrap_or_default(),
591 total_pos_staking.1.unwrap_or_default(),
592 total_pos_staking.2.unwrap_or_default(),
593 ])
594 }
595
596 pub async fn get_tx_info(&self, hash: H256) -> Result<TxInfo, LightError> {
597 debug!("get_tx_info hash={:?}", hash);
598
599 let TxInfoValidated {
603 tx,
604 receipt,
605 tx_index,
606 prior_gas_used,
607 } = self.retrieve_tx_info(hash).await?;
608
609 let block_hash = tx_index.block_hash;
610 let maybe_epoch = self.consensus.get_block_epoch_number(&block_hash);
611 let maybe_block_number =
612 self.consensus.get_block_number(&block_hash)?;
613 let maybe_state_root = maybe_epoch
614 .and_then(|e| self.handler.witnesses.root_hashes_of(e).ok())
615 .map(|roots| roots.state_root_hash);
616
617 Ok(TxInfo {
618 tx,
619 maybe_block_number,
620 receipt,
621 tx_index,
622 maybe_epoch,
623 maybe_state_root,
624 prior_gas_used,
625 })
626 }
627
628 pub fn send_raw_tx(&self, raw: Vec<u8>) -> bool {
632 debug!("send_raw_tx raw={:?}", raw);
633
634 let peers = FullPeerFilter::new(msgid::SEND_RAW_TX)
635 .select_all(self.handler.peers.clone());
636
637 match self.network.with_context(
638 self.handler.clone(),
639 LIGHT_PROTOCOL_ID,
640 |io| {
641 let mut success = false;
642
643 for peer in peers {
644 let res = self.handler.send_raw_tx(io, &peer, raw.clone());
646
647 match res {
649 Err(e) => {
650 warn!("Failed to relay to peer={:?}: {:?}", peer, e)
651 }
652 Ok(_) => {
653 debug!("Tx relay to peer {:?} successful", peer);
654 success = true;
655 }
656 }
657 }
658
659 success
660 },
661 ) {
662 Err(e) => unreachable!("{}", e),
663 Ok(success) => success,
664 }
665 }
666
667 pub async fn get_tx(
668 &self, hash: H256,
669 ) -> Result<SignedTransaction, LightError> {
670 debug!("get_tx hash={:?}", hash);
671
672 with_timeout(
673 *MAX_POLL_TIME,
674 format!(
675 "Timeout while retrieving transaction with hash {:?}",
676 hash
677 ),
678 self.with_io(|io| self.handler.txs.request_now(io, hash)),
679 )
680 .await
681 }
682
683 fn filter_receipt_logs(
687 epoch: u64, block_hash: H256, block_timestamp: Option<u64>,
688 transaction_index: usize, num_logs_remaining: &mut usize,
689 mut logs: Vec<LogEntry>, filter: LogFilter,
690 ) -> impl Iterator<Item = LocalizedLogEntry> {
691 let num_logs = logs.len();
692
693 let log_base_index = *num_logs_remaining;
694 *num_logs_remaining -= num_logs;
695
696 logs.reverse();
698
699 logs.into_iter()
700 .enumerate()
701 .filter(move |(_, entry)| filter.matches(&entry))
702 .map(move |(ii, entry)| LocalizedLogEntry {
703 block_hash,
704 epoch_number: epoch,
705 block_timestamp,
706 entry,
707 log_index: log_base_index - ii - 1,
708 transaction_hash: KECCAK_EMPTY_BLOOM, transaction_index,
710 transaction_log_index: num_logs - ii - 1,
711 })
712 }
713
714 fn filter_block_receipts(
716 epoch: u64, hash: H256, block_timestamp: Option<u64>,
717 block_receipts: BlockReceipts, filter: LogFilter,
718 ) -> impl Iterator<Item = LocalizedLogEntry> {
719 let mut receipts = block_receipts.receipts;
720 let num_receipts = receipts.len();
722
723 let mut remaining = receipts.iter().fold(0, |s, r| s + r.logs.len());
725
726 receipts.reverse();
728
729 receipts.into_iter().map(|r| r.logs).enumerate().flat_map(
730 move |(ii, logs)| {
731 debug!("block_hash {:?} logs = {:?}", hash, logs);
732 Self::filter_receipt_logs(
733 epoch,
734 hash,
735 block_timestamp,
736 num_receipts - ii - 1,
737 &mut remaining,
738 logs,
739 filter.clone(),
740 )
741 },
742 )
743 }
744
745 fn filter_epoch_receipts(
747 &self, epoch: u64, mut receipts: Vec<BlockReceipts>, filter: LogFilter,
748 ) -> Result<impl Iterator<Item = LocalizedLogEntry>, String> {
749 let mut hashes = self
751 .ledger
752 .block_hashes_in(epoch)
753 .map_err(|e| format!("{}", e))?;
754
755 receipts.reverse();
756 hashes.reverse();
757
758 let timestamps = hashes
759 .iter()
760 .map(|h| self.ledger.header(*h))
761 .filter(|s| s.is_ok())
762 .map(|h| h.unwrap().timestamp())
763 .collect::<Vec<_>>();
764
765 if timestamps.len() != hashes.len() {
766 return Err(format!(
767 "Unable to retrieve all block headers in epoch {:?} for log filtering",
768 epoch
769 ));
770 }
771
772 let matching = itertools::izip!(receipts, hashes, timestamps).flat_map(
773 move |(receipts, hash, timestamp)| {
774 trace!("block_hash {:?} receipts = {:?}", hash, receipts);
775 Self::filter_block_receipts(
776 epoch,
777 hash,
778 Some(timestamp),
779 receipts,
780 filter.clone(),
781 )
782 },
783 );
784
785 Ok(matching)
786 }
787
788 pub fn get_latest_verifiable_chain_id(
789 &self,
790 ) -> Result<AllChainID, FilterError> {
791 let epoch_number = self.get_latest_verifiable_epoch_number()?;
792 Ok(self
793 .consensus
794 .config()
795 .chain_id
796 .read()
797 .get_chain_id(epoch_number))
798 }
799
800 pub fn get_latest_verifiable_epoch_number(
801 &self,
802 ) -> Result<u64, FilterError> {
803 let latest_verified = self.handler.witnesses.latest_verified();
805
806 let latest_verifiable = match latest_verified {
807 n if n >= DEFERRED_STATE_EPOCH_COUNT => {
808 n - DEFERRED_STATE_EPOCH_COUNT
809 }
810 _ => {
811 return Err(FilterError::UnableToVerify {
812 epoch: 0,
813 latest_verifiable: 0,
814 });
815 }
816 };
817
818 trace!(
819 "get_latest_verifiable_epoch_number latest_verifiable = {}",
820 latest_verifiable
821 );
822 Ok(latest_verifiable)
823 }
824
825 pub fn get_height_from_epoch_number(
826 &self, epoch: EpochNumber,
827 ) -> Result<u64, FilterError> {
828 let latest_verifiable = self.get_latest_verifiable_epoch_number()?;
829
830 trace!(
831 "get_height_from_epoch_number epoch = {:?}, latest_verifiable = {}",
832 epoch,
833 latest_verifiable
834 );
835
836 match epoch {
837 EpochNumber::Earliest => Ok(0),
838 EpochNumber::LatestCheckpoint => {
839 Ok(self.consensus.latest_checkpoint_epoch_number())
840 }
841 EpochNumber::LatestConfirmed => {
842 Ok(self.consensus.latest_confirmed_epoch_number())
843 }
844 EpochNumber::LatestMined => Ok(latest_verifiable),
845 EpochNumber::LatestState => Ok(latest_verifiable),
846 EpochNumber::LatestFinalized => {
847 Ok(self.consensus.latest_finalized_epoch_number())
848 }
849 EpochNumber::Number(n) if n <= latest_verifiable => Ok(n),
850 EpochNumber::Number(n) => Err(FilterError::UnableToVerify {
851 epoch: n,
852 latest_verifiable,
853 }),
854 }
855 }
856
857 fn get_filter_epochs(
858 &self, filter: &LogFilter,
859 ) -> Result<(Vec<u64>, Box<dyn Fn(H256) -> bool + Send + Sync>), FilterError>
860 {
861 match &filter {
862 LogFilter::EpochLogFilter {
863 from_epoch,
864 to_epoch,
865 ..
866 } => {
867 let from_epoch =
868 self.get_height_from_epoch_number(from_epoch.clone())?;
869 let to_epoch =
870 self.get_height_from_epoch_number(to_epoch.clone())?;
871
872 if from_epoch > to_epoch {
873 return Err(FilterError::InvalidEpochNumber {
874 from_epoch,
875 to_epoch,
876 });
877 }
878
879 let epochs = (from_epoch..(to_epoch + 1)).rev().collect();
880 let block_filter = Box::new(|_| true);
881
882 Ok((epochs, block_filter))
883 }
884 LogFilter::BlockHashLogFilter { block_hashes, .. } => {
885 let hashes: BTreeSet<_> =
887 block_hashes.iter().cloned().collect();
888
889 let mut epochs = BTreeSet::new();
891
892 for hash in &hashes {
893 match self.consensus.get_block_epoch_number(&hash) {
894 Some(epoch) => epochs.insert(epoch),
895 None => {
896 return Err(FilterError::UnknownBlock {
897 hash: *hash,
898 })
899 }
900 };
901 }
902
903 let epochs = epochs.into_iter().rev().collect();
904 let block_filter = Box::new(move |hash| hashes.contains(&hash));
905
906 Ok((epochs, block_filter))
907 }
908 _ => bail!(FilterError::Custom(
909 "Light nodes do not support log filtering using block numbers"
910 .into(),
911 )),
912 }
913 }
914
915 pub async fn get_logs(
916 &self, filter: LogFilter,
917 ) -> Result<Vec<LocalizedLogEntry>, LightError> {
918 debug!("get_logs filter = {:?}", filter);
919
920 let (epochs, block_filter) = self
922 .get_filter_epochs(&filter)
923 .map_err(|e| format!("{}", e))?;
924
925 debug!("Executing filter on epochs {:?}", epochs);
926
927 let blooms = filter.bloom_possibilities();
929
930 let bloom_match = move |block_log_bloom: &Bloom| {
934 blooms
935 .iter()
936 .any(|bloom| block_log_bloom.contains_bloom(bloom))
937 };
938
939 let stream =
951 stream::iter(epochs)
953 .map(|epoch| self.retrieve_bloom(epoch))
957 .buffered(LOG_FILTERING_LOOKAHEAD)
960 .try_filter_map(move |(epoch, bloom)| {
964 debug!("Matching epoch {:?} bloom = {:?}", epoch, bloom);
965
966 match bloom_match(&bloom) {
967 true => future::ready(Ok(Some(epoch))),
968 false => future::ready(Ok(None)),
969 }
970 })
971 .map(|res| match res {
975 Err(e) => Either::Left(future::err(e)),
976 Ok(epoch) => Either::Right(self.retrieve_receipts(epoch)),
977 })
978 .buffered(LOG_FILTERING_LOOKAHEAD)
981 .map(|res| match res {
985 Err(e) => Err(e),
986 Ok((epoch, receipts)) => {
987 debug!("Filtering epoch {:?} receipts = {:?}", epoch, receipts);
988
989 let logs = self
990 .filter_epoch_receipts(epoch, receipts, filter.clone())?
991 .map(|log| Ok(log));
992
993 Ok(stream::iter(logs))
994 }
995 })
996 .try_flatten()
999 .try_filter(move |log| future::ready(block_filter(log.block_hash)))
1003 .map(|res| match res {
1007 Err(e) => Either::Left(future::err(e)),
1008 Ok(log) => Either::Right(self.retrieve_block_txs_for_log(log)),
1009 })
1010 .buffered(LOG_FILTERING_LOOKAHEAD)
1013 .map_ok(|(mut log, txs)| {
1016 debug!("processing log = {:?} txs = {:?}", log, txs);
1017
1018 assert!(log.transaction_index < txs.len());
1022
1023 log.transaction_hash = txs[log.transaction_index].hash();
1025 log
1026 })
1027 .take(self.consensus.config().get_logs_filter_max_limit.unwrap_or(::std::usize::MAX - 1) + 1)
1029 .try_collect();
1030 let mut matching: Vec<_> = stream.await?;
1033 matching.reverse();
1034 debug!("Collected matching logs = {:?}", matching);
1035 Ok(matching)
1036 }
1037
1038 pub fn get_network_type(&self) -> &Network {
1039 self.network.get_network_type()
1040 }
1041}