1use super::ConsensusExecutionHandler;
2use std::{collections::BTreeSet, convert::From, sync::Arc};
3
4use alloy_rpc_types_trace::geth::GethDebugTracingOptions;
5use cfx_parameters::genesis::GENESIS_ACCOUNT_ADDRESS;
6use geth_tracer::{GethTraceWithHash, GethTracer, TxExecContext};
7use pow_types::StakingEvent;
8
9use cfx_statedb::{Error as DbErrorKind, Result as DbResult};
10use cfx_types::{AddressSpaceUtil, Space, SpaceMap, H256, U256};
11use primitives::{
12 receipt::BlockReceipts, AccessListItem, Action, Block, BlockNumber,
13 Receipt, SignedTransaction, TransactionIndex,
14};
15
16use crate::{
17 block_data_manager::BlockDataManager,
18 consensus::consensus_inner::consensus_executor::GOOD_TPS_METER,
19};
20use cfx_execute_helper::{
21 exec_tracer::TransactionExecTraces,
22 observer::Observer,
23 tx_outcome::{make_process_tx_outcome, ProcessTxOutcome},
24};
25use cfx_executor::{
26 executive::{ExecutiveContext, TransactOptions, TransactSettings},
27 internal_contract::{
28 block_hash_slot, epoch_hash_slot, initialize_internal_contract_accounts,
29 },
30 state::{
31 initialize_cip107, initialize_cip137,
32 initialize_or_update_dao_voted_params, State,
33 },
34};
35use cfx_vm_types::Env;
36
37pub enum VirtualCall<'a> {
38 GethTrace(GethTask<'a>),
39}
40
41pub struct GethTask<'a> {
42 pub(super) tx_hash: Option<H256>,
43 pub(super) opts: GethDebugTracingOptions,
44 pub(super) answer: &'a mut Vec<GethTraceWithHash>,
45}
46
47impl ConsensusExecutionHandler {
48 pub(super) fn process_epoch_transactions<'a>(
49 &self, state: &mut State, epoch_blocks: &Vec<Arc<Block>>,
50 start_block_number: u64, on_local_pivot: bool,
51 virtual_call: Option<VirtualCall<'a>>,
52 ) -> DbResult<Vec<Arc<BlockReceipts>>> {
53 self.prefetch_storage_for_execution(state, epoch_blocks);
54
55 let pivot_block = epoch_blocks.last().expect("Epoch not empty");
56
57 let dry_run = virtual_call.is_some();
58
59 self.before_epoch_execution(state, &*pivot_block)?;
60
61 let base_gas_price =
62 pivot_block.block_header.base_price().unwrap_or_default();
63
64 let burnt_gas_price =
65 base_gas_price.map_all(|x| state.burnt_gas_price(x));
66 let context = EpochProcessContext {
67 on_local_pivot,
68 executive_trace: self.config.executive_trace,
69 dry_run,
70 virtual_call,
71 pivot_block,
72 base_gas_price,
73 burnt_gas_price,
74 };
75
76 let mut epoch_recorder = EpochProcessRecorder::new();
77
78 let mut block_context = BlockProcessContext::first_block(
79 &context,
80 epoch_blocks.first().unwrap(),
81 start_block_number,
82 );
83
84 for (idx, block) in epoch_blocks.iter().enumerate() {
85 if idx > 0 {
86 block_context.next_block(block);
87 }
88
89 self.process_block_transactions(
90 &block_context,
91 state,
92 &mut epoch_recorder,
93 )?;
94 }
95
96 if let Some(VirtualCall::GethTrace(task)) = context.virtual_call {
97 std::mem::swap(&mut epoch_recorder.geth_traces, task.answer);
98 }
99
100 if !dry_run && self.pos_verifier.pos_option().is_some() {
101 debug!(
102 "put_staking_events: {:?} height={} len={}",
103 pivot_block.hash(),
104 pivot_block.block_header.height(),
105 epoch_recorder.staking_events.len()
106 );
107 self.pos_verifier
108 .consensus_db()
109 .put_staking_events(
110 pivot_block.block_header.height(),
111 pivot_block.hash(),
112 epoch_recorder.staking_events,
113 )
114 .map_err(|e| {
115 cfx_statedb::Error::from(DbErrorKind::PosDatabaseError(
116 format!("{:?}", e),
117 ))
118 })?;
119 }
120
121 if !dry_run && on_local_pivot {
122 self.tx_pool.recycle_transactions(epoch_recorder.repack_tx);
123 }
124
125 debug!("Finish processing tx for epoch");
126 Ok(epoch_recorder.receipts)
127 }
128
129 fn prefetch_storage_for_execution(
130 &self, state: &mut State, epoch_blocks: &Vec<Arc<Block>>,
131 ) {
132 let pool = if let Some(prefetcher) =
136 self.execution_state_prefetcher.as_ref()
137 {
138 prefetcher
139 } else {
140 return;
141 };
142
143 let mut accounts = BTreeSet::new();
144 for block in epoch_blocks.iter() {
145 for transaction in block.transactions.iter() {
146 let space = transaction.space();
147 accounts.insert(transaction.sender.with_space(space));
148 if let Action::Call(ref address) = transaction.action() {
149 accounts.insert(address.with_space(space));
150 }
151 if let Some(access_list) = transaction.access_list() {
152 for AccessListItem { address, .. } in access_list.iter() {
153 accounts.insert(address.with_space(space));
154 }
155 }
156 }
157 }
158 accounts.remove(&GENESIS_ACCOUNT_ADDRESS.with_native_space());
161 let res = state.prefetch_accounts(accounts, pool);
162 if let Err(e) = res {
163 warn!("Fail to prefetch account {:?}", e);
164 }
165 }
166
167 fn make_block_env(&self, block_context: &BlockProcessContext) -> Env {
168 let BlockProcessContext {
169 epoch_context:
170 &EpochProcessContext {
171 pivot_block,
172 base_gas_price,
173 burnt_gas_price,
174 ..
175 },
176 block,
177 block_number,
178 last_hash,
179 } = *block_context;
180
181 let last_block_header = &self.data_man.block_header_by_hash(&last_hash);
182
183 let pos_id = last_block_header
184 .as_ref()
185 .and_then(|header| header.pos_reference().as_ref());
186 let pos_view_number =
187 pos_id.and_then(|id| self.pos_verifier.get_pos_view(id));
188 let pivot_decision_epoch = pos_id
189 .and_then(|id| self.pos_verifier.get_pivot_decision(id))
190 .and_then(|hash| self.data_man.block_header_by_hash(&hash))
191 .map(|header| header.height());
192
193 let epoch_height = pivot_block.block_header.height();
194 let chain_id = self.machine.params().chain_id_map(epoch_height);
195 Env {
196 chain_id,
197 number: block_number,
198 author: block.block_header.author().clone(),
199 timestamp: pivot_block.block_header.timestamp(),
200 difficulty: block.block_header.difficulty().clone(),
201 accumulated_gas_used: U256::zero(),
202 last_hash,
203 gas_limit: U256::from(block.block_header.gas_limit()),
204 epoch_height,
205 pos_view: pos_view_number,
206 finalized_epoch: pivot_decision_epoch,
207 transaction_epoch_bound: self
208 .verification_config
209 .transaction_epoch_bound,
210 base_gas_price,
211 burnt_gas_price,
212 transaction_hash: H256::zero(),
215 ..Default::default()
216 }
217 }
218
219 fn process_block_transactions(
220 &self, block_context: &BlockProcessContext, state: &mut State,
221 epoch_recorder: &mut EpochProcessRecorder,
222 ) -> DbResult<()> {
223 let BlockProcessContext {
224 epoch_context: &EpochProcessContext { on_local_pivot, .. },
225 block,
226 block_number,
227 ..
228 } = *block_context;
229
230 debug!(
231 "process txs in block: hash={:?}, tx count={:?}",
232 block.hash(),
233 block.transactions.len()
234 );
235
236 let secondary_reward =
242 self.before_block_execution(state, block_number, block)?;
243
244 let mut env = self.make_block_env(block_context);
245
246 let mut block_recorder =
247 BlockProcessRecorder::new(epoch_recorder.evm_tx_idx);
248
249 for (idx, transaction) in block.transactions.iter().enumerate() {
250 self.process_transaction(
251 idx,
252 transaction,
253 block_context,
254 state,
255 &mut env,
256 on_local_pivot,
257 &mut block_recorder,
258 )?;
259 }
260
261 block_recorder.finish_block(
262 &self.data_man,
263 epoch_recorder,
264 block_context,
265 secondary_reward,
266 );
267
268 Ok(())
269 }
270
271 fn process_transaction(
272 &self, idx: usize, transaction: &Arc<SignedTransaction>,
273 block_context: &BlockProcessContext, state: &mut State, env: &mut Env,
274 on_local_pivot: bool, recorder: &mut BlockProcessRecorder,
275 ) -> DbResult<()> {
276 let rpc_index = recorder.tx_idx[transaction.space()];
277
278 let block = &block_context.block;
279 let dry_run = block_context.epoch_context.dry_run;
280
281 let machine = self.machine.as_ref();
282
283 let spec = machine.spec(env.number, env.epoch_height);
284
285 let options = TransactOptions {
286 observer: self.make_observer(transaction, block_context),
287 settings: TransactSettings::all_checks(),
288 };
289
290 env.transaction_hash = transaction.hash();
291 let execution_outcome =
292 ExecutiveContext::new(state, env, machine, &spec)
293 .transact(transaction, options)?;
294 state.update_state_post_tx_execution(!spec.cip645.fix_eip1153);
295 execution_outcome.log(transaction, &block_context.block.hash());
296
297 if let Some(burnt_fee) = execution_outcome
298 .try_as_executed()
299 .and_then(|e| e.burnt_fee)
300 {
301 state.burn_by_cip1559(burnt_fee);
302 };
303
304 let r = make_process_tx_outcome(
305 execution_outcome,
306 &mut env.accumulated_gas_used,
307 transaction.hash,
308 &spec,
309 );
310
311 if r.receipt.tx_success() {
312 GOOD_TPS_METER.mark(1);
313 }
314
315 let tx_skipped = r.receipt.tx_skipped();
316 let phantom_txs = r.phantom_txs.clone();
317
318 recorder.receive_tx_outcome(r, transaction, block_context);
319
320 if !on_local_pivot || tx_skipped || dry_run {
321 return Ok(());
323 }
324
325 let hash = transaction.hash();
326
327 self.data_man.insert_transaction_index(
328 &hash,
329 &TransactionIndex {
330 block_hash: block.hash(),
331 real_index: idx,
332 is_phantom: false,
333 rpc_index: Some(rpc_index),
334 },
335 );
336
337 let evm_chain_id = env.chain_id[&Space::Ethereum];
344 let evm_tx_index = &mut recorder.tx_idx[Space::Ethereum];
345
346 for ptx in phantom_txs {
347 self.data_man.insert_transaction_index(
348 &ptx.into_eip155(evm_chain_id).hash(),
349 &TransactionIndex {
350 block_hash: block.hash(),
351 real_index: idx,
352 is_phantom: true,
353 rpc_index: Some(*evm_tx_index),
354 },
355 );
356
357 *evm_tx_index += 1;
358 }
359
360 Ok(())
361 }
362
363 fn make_observer(
364 &self, transaction: &Arc<SignedTransaction>,
365 block_context: &BlockProcessContext,
366 ) -> Observer {
367 use alloy_rpc_types_trace::geth::{
368 GethDebugBuiltInTracerType::*, GethDebugTracerType::BuiltInTracer,
369 };
370
371 let mut observer = if self.config.executive_trace {
372 Observer::with_tracing()
373 } else {
374 Observer::with_no_tracing()
375 };
376
377 if let Some(VirtualCall::GethTrace(ref task)) =
378 block_context.epoch_context.virtual_call
379 {
380 let need_trace =
381 task.tx_hash.map_or(true, |hash| transaction.hash() == hash);
382 let support_tracer = matches!(
383 task.opts.tracer,
384 Some(BuiltInTracer(
385 FourByteTracer | CallTracer | PreStateTracer | NoopTracer
386 )) | None
387 );
388 let tx_gas_limit = transaction.gas_limit().as_u64();
389
390 if need_trace && support_tracer {
391 observer.geth_tracer = Some(GethTracer::new(
392 TxExecContext {
393 tx_gas_limit,
394 block_height: block_context
395 .epoch_context
396 .pivot_block
397 .block_header
398 .height(),
399 block_number: block_context.block_number,
400 },
401 Arc::clone(&self.machine),
402 task.opts.clone(),
403 ))
404 }
405 }
406 observer
407 }
408
409 fn before_epoch_execution(
410 &self, state: &mut State, pivot_block: &Block,
411 ) -> DbResult<()> {
412 let params = self.machine.params();
413
414 let epoch_number = pivot_block.block_header.height();
415 let hash = pivot_block.hash();
416 let parent_hash = pivot_block.block_header.parent_hash();
417
418 if epoch_number >= params.transition_heights.cip133e {
419 state.set_system_storage(
420 epoch_hash_slot(epoch_number).into(),
421 U256::from_big_endian(&hash.0),
422 )?;
423 }
424
425 if epoch_number >= params.transition_heights.eip2935 {
426 state.set_eip2935_storage(epoch_number - 1, *parent_hash)?;
427 }
428 Ok(())
429 }
430
431 pub fn before_block_execution(
432 &self, state: &mut State, block_number: BlockNumber, block: &Block,
433 ) -> DbResult<U256> {
434 let params = self.machine.params();
435 let transition_numbers = ¶ms.transition_numbers;
436
437 let cip94_start = transition_numbers.cip94n;
438 let period = params.params_dao_vote_period;
439 if block_number >= cip94_start
441 && (block_number - cip94_start) % period == 0
442 {
443 let set_pos_staking = block_number > transition_numbers.cip105;
444 initialize_or_update_dao_voted_params(state, set_pos_staking)?;
445 }
446
447 if block_number == transition_numbers.cip107 {
453 initialize_cip107(state)?;
454 }
455
456 if block_number >= transition_numbers.cip133b {
457 state.set_system_storage(
458 block_hash_slot(block_number).into(),
459 U256::from_big_endian(&block.hash().0),
460 )?;
461 }
462
463 if block_number == transition_numbers.cip137 {
464 initialize_cip137(state);
465 }
466
467 if block_number < transition_numbers.cip43a {
468 state.bump_block_number_accumulate_interest();
469 }
470
471 let secondary_reward = state.secondary_reward();
472
473 state.inc_distributable_pos_interest(block_number)?;
474
475 initialize_internal_contract_accounts(
476 state,
477 self.machine
478 .internal_contracts()
479 .initialized_at(block_number),
480 )?;
481
482 state.commit_cache(false);
483
484 Ok(secondary_reward)
485 }
486}
487
488struct EpochProcessContext<'a> {
489 on_local_pivot: bool,
490 executive_trace: bool,
491 virtual_call: Option<VirtualCall<'a>>,
492 dry_run: bool,
493
494 pivot_block: &'a Block,
495
496 base_gas_price: SpaceMap<U256>,
497 burnt_gas_price: SpaceMap<U256>,
498}
499
500struct BlockProcessContext<'a, 'b> {
501 epoch_context: &'b EpochProcessContext<'a>,
502 block: &'b Block,
503 block_number: u64,
504 last_hash: H256,
505}
506
507impl<'a, 'b> BlockProcessContext<'a, 'b> {
508 fn first_block(
509 epoch_context: &'b EpochProcessContext<'a>, block: &'b Block,
510 start_block_number: u64,
511 ) -> Self {
512 let EpochProcessContext { pivot_block, .. } = *epoch_context;
513 let last_hash = *pivot_block.block_header.parent_hash();
514 Self {
515 epoch_context,
516 block,
517 block_number: start_block_number,
518 last_hash,
519 }
520 }
521
522 fn next_block(&mut self, block: &'b Block) {
523 self.last_hash = self.block.hash();
524 self.block_number += 1;
525 self.block = block;
526 }
527}
528
529#[derive(Default)]
530struct EpochProcessRecorder {
531 receipts: Vec<Arc<BlockReceipts>>,
532 staking_events: Vec<StakingEvent>,
533 repack_tx: Vec<Arc<SignedTransaction>>,
534 geth_traces: Vec<GethTraceWithHash>,
535
536 evm_tx_idx: usize,
537}
538
539impl EpochProcessRecorder {
540 fn new() -> Self { Default::default() }
541}
542
543struct BlockProcessRecorder {
544 receipt: Vec<Receipt>,
545 tx_error_msg: Vec<String>,
546 traces: Vec<TransactionExecTraces>,
547 geth_traces: Vec<GethTraceWithHash>,
548 repack_tx: Vec<Arc<SignedTransaction>>,
549 staking_events: Vec<StakingEvent>,
550
551 tx_idx: SpaceMap<usize>,
552}
553
554impl BlockProcessRecorder {
555 fn new(evm_tx_idx: usize) -> BlockProcessRecorder {
556 let mut tx_idx = SpaceMap::default();
557 tx_idx[Space::Ethereum] = evm_tx_idx;
558 Self {
559 receipt: vec![],
560 tx_error_msg: vec![],
561 traces: vec![],
562 geth_traces: vec![],
563 repack_tx: vec![],
564 staking_events: vec![],
565 tx_idx,
566 }
567 }
568
569 fn receive_tx_outcome(
570 &mut self, r: ProcessTxOutcome, tx: &Arc<SignedTransaction>,
571 block_context: &BlockProcessContext,
572 ) {
573 let EpochProcessContext {
574 on_local_pivot,
575 executive_trace,
576 ..
577 } = *block_context.epoch_context;
578
579 if on_local_pivot && r.consider_repacked {
580 self.repack_tx.push(tx.clone())
581 }
582
583 let not_skipped = !r.receipt.tx_skipped();
584
585 if executive_trace {
586 self.traces.push(r.tx_traces.into());
587 }
588
589 self.receipt.push(r.receipt);
590 self.tx_error_msg.push(r.tx_exec_error_msg);
591 self.staking_events.extend(r.tx_staking_events);
592
593 if let Some(trace) = r.geth_trace {
594 self.geth_traces.push(GethTraceWithHash {
595 trace,
596 tx_hash: tx.hash(),
597 space: tx.space(),
598 });
599 }
600
601 match tx.space() {
602 Space::Native => {
603 self.tx_idx[Space::Native] += 1;
604 }
605 Space::Ethereum if not_skipped => {
606 self.tx_idx[Space::Ethereum] += 1;
607 }
608 _ => {}
609 };
610 }
611
612 fn finish_block(
613 self, data_man: &BlockDataManager,
614 epoch_recorder: &mut EpochProcessRecorder,
615 block_context: &BlockProcessContext, secondary_reward: U256,
616 ) {
617 let BlockProcessContext {
618 epoch_context:
619 &EpochProcessContext {
620 on_local_pivot,
621 executive_trace,
622 pivot_block,
623 dry_run,
624 ..
625 },
626 block,
627 block_number,
628 ..
629 } = *block_context;
630
631 let block_receipts = Arc::new(BlockReceipts {
632 receipts: self.receipt,
633 block_number: block_number + 1,
636 secondary_reward,
637 tx_execution_error_messages: self.tx_error_msg,
638 });
639
640 epoch_recorder.receipts.push(block_receipts.clone());
641 epoch_recorder.staking_events.extend(self.staking_events);
642 epoch_recorder.repack_tx.extend(self.repack_tx);
643 epoch_recorder.geth_traces.extend(self.geth_traces);
644
645 epoch_recorder.evm_tx_idx = self.tx_idx[Space::Ethereum];
646
647 if dry_run {
648 return;
649 }
650
651 if executive_trace {
652 data_man.insert_block_traces(
653 block.hash(),
654 self.traces.into(),
655 pivot_block.hash(),
656 on_local_pivot,
657 );
658 }
659
660 data_man.insert_block_execution_result(
661 block.hash(),
662 pivot_block.hash(),
663 block_receipts.clone(),
664 on_local_pivot,
665 );
666 }
667}