cfxcore/consensus/consensus_inner/consensus_executor/
epoch_execution.rs

1use super::ConsensusExecutionHandler;
2use std::{collections::BTreeSet, convert::From, sync::Arc};
3
4use alloy_rpc_types_trace::geth::GethDebugTracingOptions;
5use cfx_parameters::genesis::GENESIS_ACCOUNT_ADDRESS;
6use geth_tracer::{GethTraceWithHash, GethTracer, TxExecContext};
7use pow_types::StakingEvent;
8
9use cfx_statedb::{Error as DbErrorKind, Result as DbResult};
10use cfx_types::{AddressSpaceUtil, Space, SpaceMap, H256, U256};
11use primitives::{
12    receipt::BlockReceipts, AccessListItem, Action, Block, BlockNumber,
13    Receipt, SignedTransaction, TransactionIndex,
14};
15
16use crate::{
17    block_data_manager::BlockDataManager,
18    consensus::consensus_inner::consensus_executor::GOOD_TPS_METER,
19};
20use cfx_execute_helper::{
21    exec_tracer::TransactionExecTraces,
22    observer::Observer,
23    tx_outcome::{make_process_tx_outcome, ProcessTxOutcome},
24};
25use cfx_executor::{
26    executive::{ExecutiveContext, TransactOptions, TransactSettings},
27    internal_contract::{
28        block_hash_slot, epoch_hash_slot, initialize_internal_contract_accounts,
29    },
30    state::{
31        initialize_cip107, initialize_cip137,
32        initialize_or_update_dao_voted_params, State,
33    },
34};
35use cfx_vm_types::Env;
36
/// A request that piggybacks on epoch execution without persisting results.
///
/// Supplying a `VirtualCall` to `process_epoch_transactions` makes that run a
/// dry run (`dry_run` is set whenever a virtual call is present).
pub enum VirtualCall<'a> {
    /// Collect geth-style debug traces while the epoch is re-executed.
    GethTrace(GethTask<'a>),
}
40
/// Parameters and output slot of a geth-style tracing request.
pub struct GethTask<'a> {
    /// When `Some`, only the transaction with this hash is traced; when
    /// `None`, every transaction of the epoch is traced.
    pub(super) tx_hash: Option<H256>,
    /// Tracing options; a clone is handed to each tracer that is created.
    pub(super) opts: GethDebugTracingOptions,
    /// Output vector that receives the collected traces once the epoch has
    /// been processed.
    pub(super) answer: &'a mut Vec<GethTraceWithHash>,
}
46
47impl ConsensusExecutionHandler {
48    pub(super) fn process_epoch_transactions<'a>(
49        &self, state: &mut State, epoch_blocks: &Vec<Arc<Block>>,
50        start_block_number: u64, on_local_pivot: bool,
51        virtual_call: Option<VirtualCall<'a>>,
52    ) -> DbResult<Vec<Arc<BlockReceipts>>> {
53        self.prefetch_storage_for_execution(state, epoch_blocks);
54
55        let pivot_block = epoch_blocks.last().expect("Epoch not empty");
56
57        let dry_run = virtual_call.is_some();
58
59        self.before_epoch_execution(state, &*pivot_block)?;
60
61        let base_gas_price =
62            pivot_block.block_header.base_price().unwrap_or_default();
63
64        let burnt_gas_price =
65            base_gas_price.map_all(|x| state.burnt_gas_price(x));
66        let context = EpochProcessContext {
67            on_local_pivot,
68            executive_trace: self.config.executive_trace,
69            dry_run,
70            virtual_call,
71            pivot_block,
72            base_gas_price,
73            burnt_gas_price,
74        };
75
76        let mut epoch_recorder = EpochProcessRecorder::new();
77
78        let mut block_context = BlockProcessContext::first_block(
79            &context,
80            epoch_blocks.first().unwrap(),
81            start_block_number,
82        );
83
84        for (idx, block) in epoch_blocks.iter().enumerate() {
85            if idx > 0 {
86                block_context.next_block(block);
87            }
88
89            self.process_block_transactions(
90                &block_context,
91                state,
92                &mut epoch_recorder,
93            )?;
94        }
95
96        if let Some(VirtualCall::GethTrace(task)) = context.virtual_call {
97            std::mem::swap(&mut epoch_recorder.geth_traces, task.answer);
98        }
99
100        if !dry_run && self.pos_verifier.pos_option().is_some() {
101            debug!(
102                "put_staking_events: {:?} height={} len={}",
103                pivot_block.hash(),
104                pivot_block.block_header.height(),
105                epoch_recorder.staking_events.len()
106            );
107            self.pos_verifier
108                .consensus_db()
109                .put_staking_events(
110                    pivot_block.block_header.height(),
111                    pivot_block.hash(),
112                    epoch_recorder.staking_events,
113                )
114                .map_err(|e| {
115                    cfx_statedb::Error::from(DbErrorKind::PosDatabaseError(
116                        format!("{:?}", e),
117                    ))
118                })?;
119        }
120
121        if !dry_run && on_local_pivot {
122            self.tx_pool.recycle_transactions(epoch_recorder.repack_tx);
123        }
124
125        debug!("Finish processing tx for epoch");
126        Ok(epoch_recorder.receipts)
127    }
128
129    fn prefetch_storage_for_execution(
130        &self, state: &mut State, epoch_blocks: &Vec<Arc<Block>>,
131    ) {
132        // Prefetch accounts for transactions.
133        // The return value _prefetch_join_handles is used to join all threads
134        // before the exit of this function.
135        let pool = if let Some(prefetcher) =
136            self.execution_state_prefetcher.as_ref()
137        {
138            prefetcher
139        } else {
140            return;
141        };
142
143        let mut accounts = BTreeSet::new();
144        for block in epoch_blocks.iter() {
145            for transaction in block.transactions.iter() {
146                let space = transaction.space();
147                accounts.insert(transaction.sender.with_space(space));
148                if let Action::Call(ref address) = transaction.action() {
149                    accounts.insert(address.with_space(space));
150                }
151                if let Some(access_list) = transaction.access_list() {
152                    for AccessListItem { address, .. } in access_list.iter() {
153                        accounts.insert(address.with_space(space));
154                    }
155                }
156            }
157        }
158        // Due to an existing bug and special handling of the genesis account,
159        // it can not be prefetched.
160        accounts.remove(&GENESIS_ACCOUNT_ADDRESS.with_native_space());
161        let res = state.prefetch_accounts(accounts, pool);
162        if let Err(e) = res {
163            warn!("Fail to prefetch account {:?}", e);
164        }
165    }
166
167    fn make_block_env(&self, block_context: &BlockProcessContext) -> Env {
168        let BlockProcessContext {
169            epoch_context:
170                &EpochProcessContext {
171                    pivot_block,
172                    base_gas_price,
173                    burnt_gas_price,
174                    ..
175                },
176            block,
177            block_number,
178            last_hash,
179        } = *block_context;
180
181        let last_block_header = &self.data_man.block_header_by_hash(&last_hash);
182
183        let pos_id = last_block_header
184            .as_ref()
185            .and_then(|header| header.pos_reference().as_ref());
186        let pos_view_number =
187            pos_id.and_then(|id| self.pos_verifier.get_pos_view(id));
188        let pivot_decision_epoch = pos_id
189            .and_then(|id| self.pos_verifier.get_pivot_decision(id))
190            .and_then(|hash| self.data_man.block_header_by_hash(&hash))
191            .map(|header| header.height());
192
193        let epoch_height = pivot_block.block_header.height();
194        let chain_id = self.machine.params().chain_id_map(epoch_height);
195        Env {
196            chain_id,
197            number: block_number,
198            author: block.block_header.author().clone(),
199            timestamp: pivot_block.block_header.timestamp(),
200            difficulty: block.block_header.difficulty().clone(),
201            accumulated_gas_used: U256::zero(),
202            last_hash,
203            gas_limit: U256::from(block.block_header.gas_limit()),
204            epoch_height,
205            pos_view: pos_view_number,
206            finalized_epoch: pivot_decision_epoch,
207            transaction_epoch_bound: self
208                .verification_config
209                .transaction_epoch_bound,
210            base_gas_price,
211            burnt_gas_price,
212            // Temporarily set `transaction_hash` to zero; it will be updated
213            // with the actual transaction hash for each transaction.
214            transaction_hash: H256::zero(),
215            ..Default::default()
216        }
217    }
218
219    fn process_block_transactions(
220        &self, block_context: &BlockProcessContext, state: &mut State,
221        epoch_recorder: &mut EpochProcessRecorder,
222    ) -> DbResult<()> {
223        let BlockProcessContext {
224            epoch_context: &EpochProcessContext { on_local_pivot, .. },
225            block,
226            block_number,
227            ..
228        } = *block_context;
229
230        debug!(
231            "process txs in block: hash={:?}, tx count={:?}",
232            block.hash(),
233            block.transactions.len()
234        );
235
236        // TODO: ideally, this function should not have return value.
237        // However, the previous implementation read `secondary_reward` in an
238        // intermediate step. Since we are not sure which steps will influnce
239        // `secondary_reward`, we must `secondary_reward` at the same point to
240        // keep the backward compatible.
241        let secondary_reward =
242            self.before_block_execution(state, block_number, block)?;
243
244        let mut env = self.make_block_env(block_context);
245
246        let mut block_recorder =
247            BlockProcessRecorder::new(epoch_recorder.evm_tx_idx);
248
249        for (idx, transaction) in block.transactions.iter().enumerate() {
250            self.process_transaction(
251                idx,
252                transaction,
253                block_context,
254                state,
255                &mut env,
256                on_local_pivot,
257                &mut block_recorder,
258            )?;
259        }
260
261        block_recorder.finish_block(
262            &self.data_man,
263            epoch_recorder,
264            block_context,
265            secondary_reward,
266        );
267
268        Ok(())
269    }
270
271    fn process_transaction(
272        &self, idx: usize, transaction: &Arc<SignedTransaction>,
273        block_context: &BlockProcessContext, state: &mut State, env: &mut Env,
274        on_local_pivot: bool, recorder: &mut BlockProcessRecorder,
275    ) -> DbResult<()> {
276        let rpc_index = recorder.tx_idx[transaction.space()];
277
278        let block = &block_context.block;
279        let dry_run = block_context.epoch_context.dry_run;
280
281        let machine = self.machine.as_ref();
282
283        let spec = machine.spec(env.number, env.epoch_height);
284
285        let options = TransactOptions {
286            observer: self.make_observer(transaction, block_context),
287            settings: TransactSettings::all_checks(),
288        };
289
290        env.transaction_hash = transaction.hash();
291        let execution_outcome =
292            ExecutiveContext::new(state, env, machine, &spec)
293                .transact(transaction, options)?;
294        state.update_state_post_tx_execution(!spec.cip645.fix_eip1153);
295        execution_outcome.log(transaction, &block_context.block.hash());
296
297        if let Some(burnt_fee) = execution_outcome
298            .try_as_executed()
299            .and_then(|e| e.burnt_fee)
300        {
301            state.burn_by_cip1559(burnt_fee);
302        };
303
304        let r = make_process_tx_outcome(
305            execution_outcome,
306            &mut env.accumulated_gas_used,
307            transaction.hash,
308            &spec,
309        );
310
311        if r.receipt.tx_success() {
312            GOOD_TPS_METER.mark(1);
313        }
314
315        let tx_skipped = r.receipt.tx_skipped();
316        let phantom_txs = r.phantom_txs.clone();
317
318        recorder.receive_tx_outcome(r, transaction, block_context);
319
320        if !on_local_pivot || tx_skipped || dry_run {
321            // Skip transaction index persist
322            return Ok(());
323        }
324
325        let hash = transaction.hash();
326
327        self.data_man.insert_transaction_index(
328            &hash,
329            &TransactionIndex {
330                block_hash: block.hash(),
331                real_index: idx,
332                is_phantom: false,
333                rpc_index: Some(rpc_index),
334            },
335        );
336
337        // persist tx index for phantom transactions.
338        // note: in some cases, pivot chain reorgs will result in
339        // different phantom txs (with different hashes) for the
340        // same Conflux space tx. we do not remove invalidated
341        // hashes here, but leave it up to the RPC layer to handle
342        // this instead.
343        let evm_chain_id = env.chain_id[&Space::Ethereum];
344        let evm_tx_index = &mut recorder.tx_idx[Space::Ethereum];
345
346        for ptx in phantom_txs {
347            self.data_man.insert_transaction_index(
348                &ptx.into_eip155(evm_chain_id).hash(),
349                &TransactionIndex {
350                    block_hash: block.hash(),
351                    real_index: idx,
352                    is_phantom: true,
353                    rpc_index: Some(*evm_tx_index),
354                },
355            );
356
357            *evm_tx_index += 1;
358        }
359
360        Ok(())
361    }
362
363    fn make_observer(
364        &self, transaction: &Arc<SignedTransaction>,
365        block_context: &BlockProcessContext,
366    ) -> Observer {
367        use alloy_rpc_types_trace::geth::{
368            GethDebugBuiltInTracerType::*, GethDebugTracerType::BuiltInTracer,
369        };
370
371        let mut observer = if self.config.executive_trace {
372            Observer::with_tracing()
373        } else {
374            Observer::with_no_tracing()
375        };
376
377        if let Some(VirtualCall::GethTrace(ref task)) =
378            block_context.epoch_context.virtual_call
379        {
380            let need_trace =
381                task.tx_hash.map_or(true, |hash| transaction.hash() == hash);
382            let support_tracer = matches!(
383                task.opts.tracer,
384                Some(BuiltInTracer(
385                    FourByteTracer | CallTracer | PreStateTracer | NoopTracer
386                )) | None
387            );
388            let tx_gas_limit = transaction.gas_limit().as_u64();
389
390            if need_trace && support_tracer {
391                observer.geth_tracer = Some(GethTracer::new(
392                    TxExecContext {
393                        tx_gas_limit,
394                        block_height: block_context
395                            .epoch_context
396                            .pivot_block
397                            .block_header
398                            .height(),
399                        block_number: block_context.block_number,
400                    },
401                    Arc::clone(&self.machine),
402                    task.opts.clone(),
403                ))
404            }
405        }
406        observer
407    }
408
409    fn before_epoch_execution(
410        &self, state: &mut State, pivot_block: &Block,
411    ) -> DbResult<()> {
412        let params = self.machine.params();
413
414        let epoch_number = pivot_block.block_header.height();
415        let hash = pivot_block.hash();
416        let parent_hash = pivot_block.block_header.parent_hash();
417
418        if epoch_number >= params.transition_heights.cip133e {
419            state.set_system_storage(
420                epoch_hash_slot(epoch_number).into(),
421                U256::from_big_endian(&hash.0),
422            )?;
423        }
424
425        if epoch_number >= params.transition_heights.eip2935 {
426            state.set_eip2935_storage(epoch_number - 1, *parent_hash)?;
427        }
428        Ok(())
429    }
430
431    pub fn before_block_execution(
432        &self, state: &mut State, block_number: BlockNumber, block: &Block,
433    ) -> DbResult<U256> {
434        let params = self.machine.params();
435        let transition_numbers = &params.transition_numbers;
436
437        let cip94_start = transition_numbers.cip94n;
438        let period = params.params_dao_vote_period;
439        // Update/initialize parameters before processing rewards.
440        if block_number >= cip94_start
441            && (block_number - cip94_start) % period == 0
442        {
443            let set_pos_staking = block_number > transition_numbers.cip105;
444            initialize_or_update_dao_voted_params(state, set_pos_staking)?;
445        }
446
447        // Initialize old_storage_point_prop_ratio in the state.
448        // The time may not be in the vote period boundary, so this is not
449        // integrated with `initialize_or_update_dao_voted_params`, but
450        // that function will update the value after cip107 is enabled
451        // here.
452        if block_number == transition_numbers.cip107 {
453            initialize_cip107(state)?;
454        }
455
456        if block_number >= transition_numbers.cip133b {
457            state.set_system_storage(
458                block_hash_slot(block_number).into(),
459                U256::from_big_endian(&block.hash().0),
460            )?;
461        }
462
463        if block_number == transition_numbers.cip137 {
464            initialize_cip137(state);
465        }
466
467        if block_number < transition_numbers.cip43a {
468            state.bump_block_number_accumulate_interest();
469        }
470
471        let secondary_reward = state.secondary_reward();
472
473        state.inc_distributable_pos_interest(block_number)?;
474
475        initialize_internal_contract_accounts(
476            state,
477            self.machine
478                .internal_contracts()
479                .initialized_at(block_number),
480        )?;
481
482        state.commit_cache(false);
483
484        Ok(secondary_reward)
485    }
486}
487
/// Read-only settings shared by all blocks while one epoch is processed.
struct EpochProcessContext<'a> {
    /// Whether the epoch is on the local pivot chain. Gates transaction
    /// repacking and transaction index persistence, and is forwarded to the
    /// data manager when results are stored.
    on_local_pivot: bool,
    /// Copy of `config.executive_trace`; when set, execution traces are
    /// collected per transaction and stored per block.
    executive_trace: bool,
    /// The virtual call served by this run, if any.
    virtual_call: Option<VirtualCall<'a>>,
    /// `true` exactly when a virtual call is present; results of a dry run
    /// are not persisted.
    dry_run: bool,

    /// The pivot block, i.e. the last block of the epoch.
    pivot_block: &'a Block,

    /// Per-space base gas price from the pivot block header (default value
    /// if the header carries none).
    base_gas_price: SpaceMap<U256>,
    /// Per-space result of `State::burnt_gas_price` applied to
    /// `base_gas_price`.
    burnt_gas_price: SpaceMap<U256>,
}
499
/// Cursor over the blocks of an epoch: describes the block that is currently
/// being executed.
struct BlockProcessContext<'a, 'b> {
    /// Settings of the enclosing epoch.
    epoch_context: &'b EpochProcessContext<'a>,
    /// The block currently being executed.
    block: &'b Block,
    /// Block number assigned to `block`.
    block_number: u64,
    /// Parent hash of the pivot block for the first block of the epoch;
    /// afterwards the hash of the previously executed block.
    last_hash: H256,
}
506
507impl<'a, 'b> BlockProcessContext<'a, 'b> {
508    fn first_block(
509        epoch_context: &'b EpochProcessContext<'a>, block: &'b Block,
510        start_block_number: u64,
511    ) -> Self {
512        let EpochProcessContext { pivot_block, .. } = *epoch_context;
513        let last_hash = *pivot_block.block_header.parent_hash();
514        Self {
515            epoch_context,
516            block,
517            block_number: start_block_number,
518            last_hash,
519        }
520    }
521
522    fn next_block(&mut self, block: &'b Block) {
523        self.last_hash = self.block.hash();
524        self.block_number += 1;
525        self.block = block;
526    }
527}
528
/// Accumulates the results of all blocks of one epoch.
#[derive(Default)]
struct EpochProcessRecorder {
    /// One entry per executed block, in execution order.
    receipts: Vec<Arc<BlockReceipts>>,
    /// Staking events emitted by the epoch's transactions.
    staking_events: Vec<StakingEvent>,
    /// Transactions whose outcome marked them as to be considered for
    /// repacking.
    repack_tx: Vec<Arc<SignedTransaction>>,
    /// Geth traces collected for a `GethTrace` virtual call.
    geth_traces: Vec<GethTraceWithHash>,

    /// Running Ethereum-space transaction index, carried over from block to
    /// block within the epoch.
    evm_tx_idx: usize,
}
538
539impl EpochProcessRecorder {
540    fn new() -> Self { Default::default() }
541}
542
/// Accumulates the results of the transactions of a single block.
struct BlockProcessRecorder {
    /// Receipts, one per processed transaction.
    receipt: Vec<Receipt>,
    /// Execution error messages, parallel to `receipt`.
    tx_error_msg: Vec<String>,
    /// Execution traces; only filled when `executive_trace` is enabled.
    traces: Vec<TransactionExecTraces>,
    /// Geth traces of the transactions that produced one.
    geth_traces: Vec<GethTraceWithHash>,
    /// Transactions to be considered for repacking.
    repack_tx: Vec<Arc<SignedTransaction>>,
    /// Staking events emitted by the block's transactions.
    staking_events: Vec<StakingEvent>,

    /// Next transaction index per space. The Ethereum entry continues from
    /// the epoch-wide counter; the Native entry starts from the map's
    /// default value.
    tx_idx: SpaceMap<usize>,
}
553
554impl BlockProcessRecorder {
555    fn new(evm_tx_idx: usize) -> BlockProcessRecorder {
556        let mut tx_idx = SpaceMap::default();
557        tx_idx[Space::Ethereum] = evm_tx_idx;
558        Self {
559            receipt: vec![],
560            tx_error_msg: vec![],
561            traces: vec![],
562            geth_traces: vec![],
563            repack_tx: vec![],
564            staking_events: vec![],
565            tx_idx,
566        }
567    }
568
569    fn receive_tx_outcome(
570        &mut self, r: ProcessTxOutcome, tx: &Arc<SignedTransaction>,
571        block_context: &BlockProcessContext,
572    ) {
573        let EpochProcessContext {
574            on_local_pivot,
575            executive_trace,
576            ..
577        } = *block_context.epoch_context;
578
579        if on_local_pivot && r.consider_repacked {
580            self.repack_tx.push(tx.clone())
581        }
582
583        let not_skipped = !r.receipt.tx_skipped();
584
585        if executive_trace {
586            self.traces.push(r.tx_traces.into());
587        }
588
589        self.receipt.push(r.receipt);
590        self.tx_error_msg.push(r.tx_exec_error_msg);
591        self.staking_events.extend(r.tx_staking_events);
592
593        if let Some(trace) = r.geth_trace {
594            self.geth_traces.push(GethTraceWithHash {
595                trace,
596                tx_hash: tx.hash(),
597                space: tx.space(),
598            });
599        }
600
601        match tx.space() {
602            Space::Native => {
603                self.tx_idx[Space::Native] += 1;
604            }
605            Space::Ethereum if not_skipped => {
606                self.tx_idx[Space::Ethereum] += 1;
607            }
608            _ => {}
609        };
610    }
611
612    fn finish_block(
613        self, data_man: &BlockDataManager,
614        epoch_recorder: &mut EpochProcessRecorder,
615        block_context: &BlockProcessContext, secondary_reward: U256,
616    ) {
617        let BlockProcessContext {
618            epoch_context:
619                &EpochProcessContext {
620                    on_local_pivot,
621                    executive_trace,
622                    pivot_block,
623                    dry_run,
624                    ..
625                },
626            block,
627            block_number,
628            ..
629        } = *block_context;
630
631        let block_receipts = Arc::new(BlockReceipts {
632            receipts: self.receipt,
633            // An existing bug makes the block_number is one larger than the
634            // actual.
635            block_number: block_number + 1,
636            secondary_reward,
637            tx_execution_error_messages: self.tx_error_msg,
638        });
639
640        epoch_recorder.receipts.push(block_receipts.clone());
641        epoch_recorder.staking_events.extend(self.staking_events);
642        epoch_recorder.repack_tx.extend(self.repack_tx);
643        epoch_recorder.geth_traces.extend(self.geth_traces);
644
645        epoch_recorder.evm_tx_idx = self.tx_idx[Space::Ethereum];
646
647        if dry_run {
648            return;
649        }
650
651        if executive_trace {
652            data_man.insert_block_traces(
653                block.hash(),
654                self.traces.into(),
655                pivot_block.hash(),
656                on_local_pivot,
657            );
658        }
659
660        data_man.insert_block_execution_result(
661            block.hash(),
662            pivot_block.hash(),
663            block_receipts.clone(),
664            on_local_pivot,
665        );
666    }
667}