cfxcore/light_protocol/handler/sync/
tx_infos.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use super::{
6    common::{FutureItem, PendingItem, SyncManager, TimeOrdered},
7    Witnesses,
8};
9use crate::{
10    consensus::SharedConsensusGraph,
11    light_protocol::{
12        common::{FullPeerState, LedgerInfo, Peers},
13        error::*,
14        message::{msgid, GetTxInfos, TxInfo},
15    },
16    message::{Message, RequestId},
17    verification::{
18        is_valid_receipt_inclusion_proof, is_valid_tx_inclusion_proof,
19    },
20    UniqueId,
21};
22use cfx_parameters::light::{
23    CACHE_TIMEOUT, MAX_TX_INFOS_IN_FLIGHT, TX_INFO_REQUEST_BATCH_SIZE,
24    TX_INFO_REQUEST_TIMEOUT,
25};
26use cfx_types::{H256, U256};
27use futures::future::FutureExt;
28use lru_time_cache::LruCache;
29use network::{node_table::NodeId, NetworkContext};
30use parking_lot::RwLock;
31use primitives::{
32    Receipt, SignedTransaction, TransactionIndex, TransactionStatus,
33};
34use std::{future::Future, sync::Arc};
35
/// Snapshot of the tx info sync counters.
///
/// Values of this type are only built to be formatted with `{:?}` in
/// `TxInfos::print_stats`.
// The fields are never read directly, only through the derived `Debug`
// impl; presumably that is why `dead_code` has to be allowed here.
#[allow(dead_code)]
#[derive(Debug)]
struct Statistics {
    /// number of entries in the `verified` cache
    cached: usize,
    /// value reported by `SyncManager::num_in_flight`
    in_flight: usize,
    /// value reported by `SyncManager::num_waiting`
    waiting: usize,
}
43
44// prioritize earlier requests
45type MissingTxInfo = TimeOrdered<H256>;
46
47#[derive(Clone)]
48pub struct TxInfoValidated {
49    pub tx: SignedTransaction,
50    pub receipt: Receipt,
51    pub tx_index: TransactionIndex,
52    pub prior_gas_used: U256,
53}
54
55type PendingTxInfo = PendingItem<TxInfoValidated, ClonableError>;
56
57pub struct TxInfos {
58    // helper API for retrieving ledger information
59    ledger: LedgerInfo,
60
61    // series of unique request ids
62    request_id_allocator: Arc<UniqueId>,
63
64    // sync and request manager
65    sync_manager: SyncManager<H256, MissingTxInfo>,
66
67    // block txs received from full node
68    verified: Arc<RwLock<LruCache<H256, PendingTxInfo>>>,
69
70    // witness sync manager
71    pub witnesses: Arc<Witnesses>,
72}
73
74impl TxInfos {
75    pub fn new(
76        consensus: SharedConsensusGraph, peers: Arc<Peers<FullPeerState>>,
77        request_id_allocator: Arc<UniqueId>, witnesses: Arc<Witnesses>,
78    ) -> Self {
79        let ledger = LedgerInfo::new(consensus.clone());
80        let sync_manager = SyncManager::new(peers.clone(), msgid::GET_TX_INFOS);
81
82        let cache = LruCache::with_expiry_duration(*CACHE_TIMEOUT);
83        let verified = Arc::new(RwLock::new(cache));
84
85        TxInfos {
86            ledger,
87            request_id_allocator,
88            sync_manager,
89            verified,
90            witnesses,
91        }
92    }
93
94    #[inline]
95    pub fn print_stats(&self) {
96        debug!(
97            "tx info sync statistics: {:?}",
98            Statistics {
99                cached: self.verified.read().len(),
100                in_flight: self.sync_manager.num_in_flight(),
101                waiting: self.sync_manager.num_waiting(),
102            }
103        );
104    }
105
106    #[inline]
107    pub fn request_now(
108        &self, io: &dyn NetworkContext, hash: H256,
109    ) -> impl Future<Output = Result<TxInfoValidated>> {
110        let mut verified = self.verified.write();
111
112        if !verified.contains_key(&hash) {
113            let missing = std::iter::once(MissingTxInfo::new(hash));
114
115            self.sync_manager.request_now(missing, |peer, hashes| {
116                self.send_request(io, peer, hashes)
117            });
118        }
119
120        verified
121            .entry(hash)
122            .or_insert(PendingItem::pending())
123            .clear_error();
124
125        FutureItem::new(hash, self.verified.clone())
126            .map(|res| res.map_err(|e| e.into()))
127    }
128
129    #[inline]
130    pub fn receive(
131        &self, peer: &NodeId, id: RequestId,
132        infos: impl Iterator<Item = TxInfo>,
133    ) -> Result<()> {
134        for info in infos {
135            trace!("Validating tx_info {:?}", info);
136
137            match self.sync_manager.check_if_requested(
138                peer,
139                id,
140                &info.tx.hash(),
141            )? {
142                None => continue,
143                Some(_) => self.validate_and_store(info)?,
144            };
145        }
146
147        Ok(())
148    }
149
150    #[inline]
151    fn validate_and_store(&self, info: TxInfo) -> Result<()> {
152        let tx_hash = info.tx.hash();
153
154        // validate bloom
155        if let Err(e) = self.validate_and_store_tx_info(info) {
156            // forward error to both rpc caller(s) and sync handler
157            // so we need to make it clonable
158            let e = ClonableError::from(e);
159
160            self.verified
161                .write()
162                .entry(tx_hash)
163                .or_insert(PendingItem::pending())
164                .set_error(e.clone());
165
166            bail!(e);
167        }
168
169        Ok(())
170    }
171
172    #[inline]
173    fn validate_and_store_tx_info(&self, info: TxInfo) -> Result<()> {
174        let TxInfo {
175            epoch,
176
177            // tx-related fields
178            tx,
179            tx_index_in_block,
180            num_txs_in_block,
181            tx_proof,
182
183            // receipt-related fields
184            receipt,
185            block_index_in_epoch,
186            num_blocks_in_epoch,
187            block_index_proof,
188            receipt_proof,
189
190            // prior_gas_used-related fields
191            maybe_prev_receipt,
192            maybe_prev_receipt_proof,
193        } = info;
194
195        // quick check for well-formedness
196        if block_index_in_epoch >= num_blocks_in_epoch {
197            bail!(Error::InvalidTxInfo {
198                reason: format!(
199                    "Inconsisent block index: {} >= {}",
200                    block_index_in_epoch, num_blocks_in_epoch
201                )
202            });
203        }
204
205        if tx_index_in_block >= num_txs_in_block {
206            bail!(Error::InvalidTxInfo {
207                reason: format!(
208                    "Inconsisent tx index: {} >= {}",
209                    tx_index_in_block, num_txs_in_block
210                )
211            });
212        }
213
214        // only executed instances of the transaction are acceptable;
215        // receipts belonging to non-executed instances should not be sent
216        if receipt.outcome_status != TransactionStatus::Success
217            && receipt.outcome_status != TransactionStatus::Failure
218        {
219            bail!(Error::InvalidTxInfo {
220                reason: format!(
221                    "Unexpected outcome status in tx info: {:?}",
222                    receipt.outcome_status
223                )
224            });
225        }
226
227        let block_hash = match self.ledger.block_hashes_in(epoch)? {
228            hs if hs.len() != num_blocks_in_epoch => {
229                bail!(Error::InvalidTxInfo {
230                    reason: format!(
231                        "Number of blocks in epoch mismatch: local = {}, received = {}",
232                        hs.len(), num_blocks_in_epoch),
233                });
234            }
235            hs => hs[block_index_in_epoch],
236        };
237
238        // verify tx proof
239        let tx_hash = tx.hash();
240        let block_tx_root =
241            *self.ledger.header(block_hash)?.transactions_root();
242
243        trace!(
244            "verifying tx proof with\n
245            block_tx_root = {:?}\n
246            tx_index_in_block = {:?}\n
247            num_txs_in_block = {:?}\n
248            tx_hash = {:?}\n
249            tx_proof = {:?}",
250            block_tx_root,
251            tx_index_in_block,
252            num_txs_in_block,
253            tx_hash,
254            tx_proof
255        );
256
257        if !is_valid_tx_inclusion_proof(
258            block_tx_root,
259            tx_index_in_block,
260            num_txs_in_block,
261            tx_hash,
262            &tx_proof,
263        ) {
264            bail!(Error::InvalidTxInfo {
265                reason: "Transaction proof verification failed".to_owned()
266            });
267        }
268
269        // verify receipt proof
270        let verified_epoch_receipts_root =
271            self.witnesses.root_hashes_of(epoch)?.receipts_root_hash;
272
273        trace!(
274            "verifying receipt proof with\n
275            verified_epoch_receipts_root = {:?}\n
276            block_index_in_epoch = {:?}\n
277            num_blocks_in_epoch = {:?}\n
278            block_index_proof = {:?}\n
279            tx_index_in_block = {:?}\n
280            num_txs_in_block = {:?}\n
281            receipt = {:?}\n
282            receipt_proof = {:?}",
283            verified_epoch_receipts_root,
284            block_index_in_epoch,
285            num_blocks_in_epoch,
286            block_index_proof,
287            tx_index_in_block,
288            num_txs_in_block,
289            receipt,
290            receipt_proof,
291        );
292
293        if !is_valid_receipt_inclusion_proof(
294            verified_epoch_receipts_root,
295            block_index_in_epoch,
296            num_blocks_in_epoch,
297            &block_index_proof,
298            tx_index_in_block,
299            num_txs_in_block,
300            &receipt,
301            &receipt_proof,
302        ) {
303            bail!(Error::InvalidTxInfo {
304                reason: "Receipt proof verification failed".to_owned()
305            });
306        }
307
308        // find prior gas used
309        let prior_gas_used = match (
310            tx_index_in_block,
311            maybe_prev_receipt,
312            maybe_prev_receipt_proof,
313        ) {
314            // first receipt in block
315            (0, _, _) => U256::zero(),
316
317            // not the first receipt so we will use the previous receipt
318            (_n, Some(prev_receipt), Some(prev_receipt_proof)) => {
319                let prev_receipt_index = tx_index_in_block - 1;
320
321                if !is_valid_receipt_inclusion_proof(
322                    verified_epoch_receipts_root,
323                    block_index_in_epoch,
324                    num_blocks_in_epoch,
325                    &block_index_proof,
326                    prev_receipt_index,
327                    num_txs_in_block,
328                    &prev_receipt,
329                    &prev_receipt_proof,
330                ) {
331                    bail!(Error::InvalidTxInfo {
332                        reason: "Previous receipt proof verification failed"
333                            .to_owned()
334                    });
335                }
336
337                prev_receipt.accumulated_gas_used
338            }
339
340            // not the first receipt but no previous receipt was provided
341            (_, maybe_prev_receipt, maybe_prev_receipt_proof) => {
342                bail!(Error::InvalidTxInfo {
343                    reason: format!(
344                        "Expected two receipts; received one.
345                        tx_index_in_block = {:?},
346                        maybe_prev_receipt = {:?},
347                        maybe_prev_receipt_proof = {:?}",
348                        tx_index_in_block,
349                        maybe_prev_receipt,
350                        maybe_prev_receipt_proof
351                    )
352                });
353            }
354        };
355
356        // store
357        let tx_index = TransactionIndex {
358            block_hash,
359            real_index: tx_index_in_block,
360            is_phantom: false,
361            rpc_index: None,
362        };
363
364        self.verified
365            .write()
366            .entry(tx_hash)
367            .or_insert(PendingItem::pending())
368            .set(TxInfoValidated {
369                tx,
370                receipt,
371                tx_index,
372                prior_gas_used,
373            });
374
375        Ok(())
376    }
377
378    #[inline]
379    pub fn clean_up(&self) {
380        // remove timeout in-flight requests
381        let timeout = *TX_INFO_REQUEST_TIMEOUT;
382        let infos = self.sync_manager.remove_timeout_requests(timeout);
383        trace!("Timeout tx-infos ({}): {:?}", infos.len(), infos);
384        self.sync_manager.insert_waiting(infos.into_iter());
385
386        // trigger cache cleanup
387        self.verified.write().get(&Default::default());
388    }
389
390    #[inline]
391    fn send_request(
392        &self, io: &dyn NetworkContext, peer: &NodeId, hashes: Vec<H256>,
393    ) -> Result<Option<RequestId>> {
394        if hashes.is_empty() {
395            return Ok(None);
396        }
397
398        let request_id = self.request_id_allocator.next();
399
400        trace!(
401            "send_request GetTxInfos peer={:?} id={:?} hashes={:?}",
402            peer,
403            request_id,
404            hashes
405        );
406
407        let msg: Box<dyn Message> = Box::new(GetTxInfos { request_id, hashes });
408
409        msg.send(io, peer)?;
410        Ok(Some(request_id))
411    }
412
413    #[inline]
414    pub fn sync(&self, io: &dyn NetworkContext) {
415        self.sync_manager.sync(
416            MAX_TX_INFOS_IN_FLIGHT,
417            TX_INFO_REQUEST_BATCH_SIZE,
418            |peer, hashes| self.send_request(io, peer, hashes),
419        );
420    }
421}