1use super::{
6 common::{FutureItem, PendingItem, SyncManager, TimeOrdered},
7 Witnesses,
8};
9use crate::{
10 consensus::SharedConsensusGraph,
11 light_protocol::{
12 common::{FullPeerState, LedgerInfo, Peers},
13 error::*,
14 message::{msgid, GetTxInfos, TxInfo},
15 },
16 message::{Message, RequestId},
17 verification::{
18 is_valid_receipt_inclusion_proof, is_valid_tx_inclusion_proof,
19 },
20 UniqueId,
21};
22use cfx_parameters::light::{
23 CACHE_TIMEOUT, MAX_TX_INFOS_IN_FLIGHT, TX_INFO_REQUEST_BATCH_SIZE,
24 TX_INFO_REQUEST_TIMEOUT,
25};
26use cfx_types::{H256, U256};
27use futures::future::FutureExt;
28use lru_time_cache::LruCache;
29use network::{node_table::NodeId, NetworkContext};
30use parking_lot::RwLock;
31use primitives::{
32 Receipt, SignedTransaction, TransactionIndex, TransactionStatus,
33};
34use std::{future::Future, sync::Arc};
35
36#[derive(Debug)]
37#[allow(dead_code)]
38struct Statistics {
39 cached: usize,
40 in_flight: usize,
41 waiting: usize,
42}
43
44type MissingTxInfo = TimeOrdered<H256>;
46
47#[derive(Clone)]
48pub struct TxInfoValidated {
49 pub tx: SignedTransaction,
50 pub receipt: Receipt,
51 pub tx_index: TransactionIndex,
52 pub prior_gas_used: U256,
53}
54
55type PendingTxInfo = PendingItem<TxInfoValidated, ClonableError>;
56
57pub struct TxInfos {
58 ledger: LedgerInfo,
60
61 request_id_allocator: Arc<UniqueId>,
63
64 sync_manager: SyncManager<H256, MissingTxInfo>,
66
67 verified: Arc<RwLock<LruCache<H256, PendingTxInfo>>>,
69
70 pub witnesses: Arc<Witnesses>,
72}
73
74impl TxInfos {
75 pub fn new(
76 consensus: SharedConsensusGraph, peers: Arc<Peers<FullPeerState>>,
77 request_id_allocator: Arc<UniqueId>, witnesses: Arc<Witnesses>,
78 ) -> Self {
79 let ledger = LedgerInfo::new(consensus.clone());
80 let sync_manager = SyncManager::new(peers.clone(), msgid::GET_TX_INFOS);
81
82 let cache = LruCache::with_expiry_duration(*CACHE_TIMEOUT);
83 let verified = Arc::new(RwLock::new(cache));
84
85 TxInfos {
86 ledger,
87 request_id_allocator,
88 sync_manager,
89 verified,
90 witnesses,
91 }
92 }
93
94 #[inline]
95 pub fn print_stats(&self) {
96 debug!(
97 "tx info sync statistics: {:?}",
98 Statistics {
99 cached: self.verified.read().len(),
100 in_flight: self.sync_manager.num_in_flight(),
101 waiting: self.sync_manager.num_waiting(),
102 }
103 );
104 }
105
106 #[inline]
107 pub fn request_now(
108 &self, io: &dyn NetworkContext, hash: H256,
109 ) -> impl Future<Output = Result<TxInfoValidated>> {
110 let mut verified = self.verified.write();
111
112 if !verified.contains_key(&hash) {
113 let missing = std::iter::once(MissingTxInfo::new(hash));
114
115 self.sync_manager.request_now(missing, |peer, hashes| {
116 self.send_request(io, peer, hashes)
117 });
118 }
119
120 verified
121 .entry(hash)
122 .or_insert(PendingItem::pending())
123 .clear_error();
124
125 FutureItem::new(hash, self.verified.clone())
126 .map(|res| res.map_err(|e| e.into()))
127 }
128
129 #[inline]
130 pub fn receive(
131 &self, peer: &NodeId, id: RequestId,
132 infos: impl Iterator<Item = TxInfo>,
133 ) -> Result<()> {
134 for info in infos {
135 trace!("Validating tx_info {:?}", info);
136
137 match self.sync_manager.check_if_requested(
138 peer,
139 id,
140 &info.tx.hash(),
141 )? {
142 None => continue,
143 Some(_) => self.validate_and_store(info)?,
144 };
145 }
146
147 Ok(())
148 }
149
150 #[inline]
151 fn validate_and_store(&self, info: TxInfo) -> Result<()> {
152 let tx_hash = info.tx.hash();
153
154 if let Err(e) = self.validate_and_store_tx_info(info) {
156 let e = ClonableError::from(e);
159
160 self.verified
161 .write()
162 .entry(tx_hash)
163 .or_insert(PendingItem::pending())
164 .set_error(e.clone());
165
166 bail!(e);
167 }
168
169 Ok(())
170 }
171
172 #[inline]
173 fn validate_and_store_tx_info(&self, info: TxInfo) -> Result<()> {
174 let TxInfo {
175 epoch,
176
177 tx,
179 tx_index_in_block,
180 num_txs_in_block,
181 tx_proof,
182
183 receipt,
185 block_index_in_epoch,
186 num_blocks_in_epoch,
187 block_index_proof,
188 receipt_proof,
189
190 maybe_prev_receipt,
192 maybe_prev_receipt_proof,
193 } = info;
194
195 if block_index_in_epoch >= num_blocks_in_epoch {
197 bail!(Error::InvalidTxInfo {
198 reason: format!(
199 "Inconsisent block index: {} >= {}",
200 block_index_in_epoch, num_blocks_in_epoch
201 )
202 });
203 }
204
205 if tx_index_in_block >= num_txs_in_block {
206 bail!(Error::InvalidTxInfo {
207 reason: format!(
208 "Inconsisent tx index: {} >= {}",
209 tx_index_in_block, num_txs_in_block
210 )
211 });
212 }
213
214 if receipt.outcome_status != TransactionStatus::Success
217 && receipt.outcome_status != TransactionStatus::Failure
218 {
219 bail!(Error::InvalidTxInfo {
220 reason: format!(
221 "Unexpected outcome status in tx info: {:?}",
222 receipt.outcome_status
223 )
224 });
225 }
226
227 let block_hash = match self.ledger.block_hashes_in(epoch)? {
228 hs if hs.len() != num_blocks_in_epoch => {
229 bail!(Error::InvalidTxInfo {
230 reason: format!(
231 "Number of blocks in epoch mismatch: local = {}, received = {}",
232 hs.len(), num_blocks_in_epoch),
233 });
234 }
235 hs => hs[block_index_in_epoch],
236 };
237
238 let tx_hash = tx.hash();
240 let block_tx_root =
241 *self.ledger.header(block_hash)?.transactions_root();
242
243 trace!(
244 "verifying tx proof with\n
245 block_tx_root = {:?}\n
246 tx_index_in_block = {:?}\n
247 num_txs_in_block = {:?}\n
248 tx_hash = {:?}\n
249 tx_proof = {:?}",
250 block_tx_root,
251 tx_index_in_block,
252 num_txs_in_block,
253 tx_hash,
254 tx_proof
255 );
256
257 if !is_valid_tx_inclusion_proof(
258 block_tx_root,
259 tx_index_in_block,
260 num_txs_in_block,
261 tx_hash,
262 &tx_proof,
263 ) {
264 bail!(Error::InvalidTxInfo {
265 reason: "Transaction proof verification failed".to_owned()
266 });
267 }
268
269 let verified_epoch_receipts_root =
271 self.witnesses.root_hashes_of(epoch)?.receipts_root_hash;
272
273 trace!(
274 "verifying receipt proof with\n
275 verified_epoch_receipts_root = {:?}\n
276 block_index_in_epoch = {:?}\n
277 num_blocks_in_epoch = {:?}\n
278 block_index_proof = {:?}\n
279 tx_index_in_block = {:?}\n
280 num_txs_in_block = {:?}\n
281 receipt = {:?}\n
282 receipt_proof = {:?}",
283 verified_epoch_receipts_root,
284 block_index_in_epoch,
285 num_blocks_in_epoch,
286 block_index_proof,
287 tx_index_in_block,
288 num_txs_in_block,
289 receipt,
290 receipt_proof,
291 );
292
293 if !is_valid_receipt_inclusion_proof(
294 verified_epoch_receipts_root,
295 block_index_in_epoch,
296 num_blocks_in_epoch,
297 &block_index_proof,
298 tx_index_in_block,
299 num_txs_in_block,
300 &receipt,
301 &receipt_proof,
302 ) {
303 bail!(Error::InvalidTxInfo {
304 reason: "Receipt proof verification failed".to_owned()
305 });
306 }
307
308 let prior_gas_used = match (
310 tx_index_in_block,
311 maybe_prev_receipt,
312 maybe_prev_receipt_proof,
313 ) {
314 (0, _, _) => U256::zero(),
316
317 (_n, Some(prev_receipt), Some(prev_receipt_proof)) => {
319 let prev_receipt_index = tx_index_in_block - 1;
320
321 if !is_valid_receipt_inclusion_proof(
322 verified_epoch_receipts_root,
323 block_index_in_epoch,
324 num_blocks_in_epoch,
325 &block_index_proof,
326 prev_receipt_index,
327 num_txs_in_block,
328 &prev_receipt,
329 &prev_receipt_proof,
330 ) {
331 bail!(Error::InvalidTxInfo {
332 reason: "Previous receipt proof verification failed"
333 .to_owned()
334 });
335 }
336
337 prev_receipt.accumulated_gas_used
338 }
339
340 (_, maybe_prev_receipt, maybe_prev_receipt_proof) => {
342 bail!(Error::InvalidTxInfo {
343 reason: format!(
344 "Expected two receipts; received one.
345 tx_index_in_block = {:?},
346 maybe_prev_receipt = {:?},
347 maybe_prev_receipt_proof = {:?}",
348 tx_index_in_block,
349 maybe_prev_receipt,
350 maybe_prev_receipt_proof
351 )
352 });
353 }
354 };
355
356 let tx_index = TransactionIndex {
358 block_hash,
359 real_index: tx_index_in_block,
360 is_phantom: false,
361 rpc_index: None,
362 };
363
364 self.verified
365 .write()
366 .entry(tx_hash)
367 .or_insert(PendingItem::pending())
368 .set(TxInfoValidated {
369 tx,
370 receipt,
371 tx_index,
372 prior_gas_used,
373 });
374
375 Ok(())
376 }
377
378 #[inline]
379 pub fn clean_up(&self) {
380 let timeout = *TX_INFO_REQUEST_TIMEOUT;
382 let infos = self.sync_manager.remove_timeout_requests(timeout);
383 trace!("Timeout tx-infos ({}): {:?}", infos.len(), infos);
384 self.sync_manager.insert_waiting(infos.into_iter());
385
386 self.verified.write().get(&Default::default());
388 }
389
390 #[inline]
391 fn send_request(
392 &self, io: &dyn NetworkContext, peer: &NodeId, hashes: Vec<H256>,
393 ) -> Result<Option<RequestId>> {
394 if hashes.is_empty() {
395 return Ok(None);
396 }
397
398 let request_id = self.request_id_allocator.next();
399
400 trace!(
401 "send_request GetTxInfos peer={:?} id={:?} hashes={:?}",
402 peer,
403 request_id,
404 hashes
405 );
406
407 let msg: Box<dyn Message> = Box::new(GetTxInfos { request_id, hashes });
408
409 msg.send(io, peer)?;
410 Ok(Some(request_id))
411 }
412
413 #[inline]
414 pub fn sync(&self, io: &dyn NetworkContext) {
415 self.sync_manager.sync(
416 MAX_TX_INFOS_IN_FLIGHT,
417 TX_INFO_REQUEST_BATCH_SIZE,
418 |peer, hashes| self.send_request(io, peer, hashes),
419 );
420 }
421}