// cfxcore/consensus/consensus_inner/consensus_executor/mod.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5mod epoch_execution;
6
7use core::convert::TryFrom;
8use std::{
9    collections::{BTreeMap, BTreeSet, HashMap},
10    convert::From,
11    fmt::{Debug, Formatter},
12    sync::{
13        atomic::{AtomicBool, Ordering::Relaxed},
14        mpsc::{channel, RecvError, Sender, TryRecvError},
15        Arc,
16    },
17    thread::{self, JoinHandle},
18};
19
20use crate::hash::KECCAK_EMPTY_LIST_RLP;
21use parking_lot::{Mutex, RwLock};
22use rayon::{ThreadPool, ThreadPoolBuilder};
23use rustc_hex::ToHex;
24
25use cfx_internal_common::{
26    debug::*, EpochExecutionCommitment, StateRootWithAuxInfo,
27};
28use cfx_parameters::consensus::*;
29use cfx_statedb::{Result as DbResult, StateDb};
30use cfx_storage::{
31    defaults::DEFAULT_EXECUTION_PREFETCH_THREADS, StateIndex,
32    StorageManagerTrait,
33};
34use cfx_types::{
35    AddressSpaceUtil, AllChainID, BigEndianHash, Space, H160, H256,
36    KECCAK_EMPTY_BLOOM, U256, U512,
37};
38use metrics::{register_meter_with_group, Meter, MeterTimer};
39use primitives::{
40    compute_block_number, receipt::BlockReceipts, Block, BlockHeader,
41    BlockHeaderBuilder, SignedTransaction, MERKLE_NULL_NODE,
42};
43
44use crate::{
45    block_data_manager::{BlockDataManager, BlockRewardResult, PosRewardInfo},
46    consensus::{
47        consensus_inner::{
48            consensus_new_block_handler::ConsensusNewBlockHandler,
49            StateBlameInfo,
50        },
51        pos_handler::PosVerifier,
52        ConsensusGraphInner,
53    },
54    errors::{invalid_params_check, Result as CoreResult},
55    verification::{
56        compute_receipts_root, VerificationConfig, VerifyTxLocalMode,
57        VerifyTxMode,
58    },
59    SharedTransactionPool,
60};
61use cfx_execute_helper::estimation::{
62    EstimateExt, EstimateRequest, EstimationContext,
63};
64use cfx_executor::{
65    executive::{ExecutionOutcome, ExecutiveContext},
66    machine::Machine,
67    state::{
68        distribute_pos_interest, update_pos_status, State, StateCommitResult,
69    },
70};
71use cfx_vm_types::{Env, Spec};
72use geth_tracer::GethTraceWithHash;
73
74use alloy_rpc_types_trace::geth::GethDebugTracingOptions;
75use cfx_rpc_eth_types::EvmOverrides;
76
77use self::epoch_execution::{GethTask, VirtualCall};
78
// Metrics registered once at startup for profiling consensus execution.
// NOTE(review): "CONSENSIS" is a misspelling of "CONSENSUS"; the identifiers
// are referenced elsewhere in this file, so they are kept unchanged here.
lazy_static! {
    // Times each epoch-execution task handled by the execution thread.
    static ref CONSENSIS_EXECUTION_TIMER: Arc<dyn Meter> =
        register_meter_with_group("timer", "consensus::handle_epoch_execution");
    // Times the blocking `compute_state_for_block` path (test-rpc only).
    static ref CONSENSIS_COMPUTE_STATE_FOR_BLOCK_TIMER: Arc<dyn Meter> =
        register_meter_with_group(
            "timer",
            "consensus::compute_state_for_block"
        );
    // Throughput meter; presumably counts successfully executed
    // transactions ("good TPS") — usage is not visible in this chunk.
    static ref GOOD_TPS_METER: Arc<dyn Meter> =
        register_meter_with_group("system_metrics", "good_tps");
}
90
/// The RewardExecutionInfo struct includes most information to compute rewards
/// for old epochs
pub struct RewardExecutionInfo {
    /// `past_num_blocks` of the reward epoch's pivot block.
    pub past_block_count: u64,
    /// The executable blocks of the reward epoch.
    pub epoch_blocks: Vec<Arc<Block>>,
    /// Parallel to `epoch_blocks`: `true` if the block receives no reward
    /// (partial-invalid, invalid vote, or anticone penalty over the ratio).
    pub epoch_block_no_reward: Vec<bool>,
    /// Parallel to `epoch_blocks`: accumulated anticone difficulty of each
    /// block, used for the anticone reward penalty (0 when unrewarded).
    pub epoch_block_anticone_difficulties: Vec<U512>,
}
99
100impl Debug for RewardExecutionInfo {
101    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
102        write!(
103            f,
104            "RewardExecutionInfo{{ past_block_count: {} \
105             epoch_blocks: {:?} \
106             epoch_block_no_reward: {:?} \
107             epoch_block_anticone_difficulties: {:?}}}",
108            self.past_block_count,
109            self.epoch_blocks
110                .iter()
111                .map(|b| b.hash())
112                .collect::<Vec<H256>>(),
113            self.epoch_block_no_reward,
114            self.epoch_block_anticone_difficulties
115        )
116    }
117}
118
/// Tasks handled by the consensus execution worker thread.
#[derive(Debug)]
enum ExecutionTask {
    /// Execute one epoch; carries everything needed so the worker does not
    /// hold the consensus inner lock (see `EpochExecutionTask`).
    ExecuteEpoch(EpochExecutionTask),
    /// Look up an epoch's execution commitment and reply through the
    /// embedded channel (see `GetExecutionResultTask`).
    GetResult(GetExecutionResultTask),

    /// Stop task is used to stop the execution thread
    Stop,
}
127
/// The EpochExecutionTask struct includes all the information needed to execute
/// an epoch
#[derive(Debug)]
pub struct EpochExecutionTask {
    /// Hash of the epoch's pivot block.
    epoch_hash: H256,
    /// Hashes of all blocks in the epoch.
    epoch_block_hashes: Vec<H256>,
    /// Block number of the first block in the epoch (from
    /// `get_epoch_start_block_number`).
    start_block_number: u64,
    /// Reward settlement for an older epoch to apply during this execution;
    /// `None` when no reward is due.
    reward_info: Option<RewardExecutionInfo>,
    // TODO:
    //  on_local_pivot should be computed at the beginning of the
    //  epoch execution, not to be set from task.
    /// Whether this epoch is on the node's local pivot chain.
    on_local_pivot: bool,
    /// Re-execute even if this epoch was executed before.
    force_recompute: bool,
}
142
impl EpochExecutionTask {
    /// Builds a task for the epoch whose pivot block sits at
    /// `epoch_arena_index`, snapshotting hashes and block numbers from
    /// `inner` so the execution thread can run without the inner lock.
    pub fn new(
        epoch_arena_index: usize, inner: &ConsensusGraphInner,
        reward_execution_info: Option<RewardExecutionInfo>,
        on_local_pivot: bool, force_recompute: bool,
    ) -> Self {
        Self {
            epoch_hash: inner.arena[epoch_arena_index].hash,
            epoch_block_hashes: inner.get_epoch_block_hashes(epoch_arena_index),
            start_block_number: inner
                .get_epoch_start_block_number(epoch_arena_index),
            reward_info: reward_execution_info,
            on_local_pivot,
            force_recompute,
        }
    }
}
160
/// `sender` is used to return the computed `(state_root, receipts_root,
/// logs_bloom_hash)` to the thread who sends this task.
#[derive(Debug)]
struct GetExecutionResultTask {
    /// Pivot hash of the epoch whose commitment is requested.
    pub epoch_hash: H256,
    /// Reply channel; the execution thread sends `None` when the requested
    /// epoch's result is unavailable (was never enqueued).
    pub sender: Sender<Option<EpochExecutionCommitment>>,
}
168
/// ConsensusExecutor processes transaction execution tasks.
pub struct ConsensusExecutor {
    /// The thread responsible for execution transactions
    thread: Mutex<Option<JoinHandle<()>>>,

    /// The sender to send tasks to be executed by `self.thread`
    sender: Mutex<Sender<ExecutionTask>>,

    /// The state indicating whether the thread should be stopped
    stopped: AtomicBool,

    /// The handler to provide functions to handle `ExecutionTask` and execute
    /// transactions It is used both asynchronously by `self.thread` and
    /// synchronously by the executor itself
    pub handler: Arc<ConsensusExecutionHandler>,

    /// When `true`, execution is skipped entirely: `wait_for_result` returns
    /// a genesis-like stub commitment and `enqueue_epoch`/`compute_epoch`
    /// are no-ops.
    consensus_graph_bench_mode: bool,
}
187
188impl ConsensusExecutor {
    /// Spawn the single background execution worker and return the executor.
    ///
    /// The worker loop drains `ExecutionTask`s from the channel; when the
    /// queue is empty it opportunistically executes later pivot epochs
    /// ("optimistic execution") before blocking on the channel again.
    pub fn start(
        tx_pool: SharedTransactionPool, data_man: Arc<BlockDataManager>,
        consensus_inner: Arc<RwLock<ConsensusGraphInner>>,
        config: ConsensusExecutionConfiguration,
        verification_config: VerificationConfig, bench_mode: bool,
        pos_verifier: Arc<PosVerifier>,
    ) -> Arc<Self> {
        let machine = tx_pool.machine();
        let handler = Arc::new(ConsensusExecutionHandler::new(
            tx_pool,
            data_man.clone(),
            config,
            verification_config,
            machine,
            pos_verifier,
        ));
        let (sender, receiver) = channel();

        let executor_raw = ConsensusExecutor {
            thread: Mutex::new(None),
            sender: Mutex::new(sender),
            stopped: AtomicBool::new(false),
            handler: handler.clone(),
            consensus_graph_bench_mode: bench_mode,
        };
        let executor = Arc::new(executor_raw);
        let executor_thread = executor.clone();
        // It receives blocks hashes from on_new_block and execute them
        let handle = thread::Builder::new()
            .name("Consensus Execution Worker".into())
            .spawn(move || loop {
                if executor_thread.stopped.load(Relaxed) {
                    // The thread should be stopped. The rest tasks in the queue
                    // will be discarded.
                    break;
                }

                // Returns `None` when the inner lock is contended or there is
                // no optimistic epoch ready to execute.
                let get_optimistic_task = || {
                    let mut inner = consensus_inner.try_write()?;

                    let task = executor_thread
                        .get_optimistic_execution_task(&mut *inner)?;

                    debug!("Get optimistic_execution_task {:?}", task);
                    Some(ExecutionTask::ExecuteEpoch(task))
                };

                let maybe_task = {
                    // Here we use `try_write` because some thread
                    // may wait for execution results while holding the
                    // Consensus Inner lock, if we wait on
                    // inner lock here we may get deadlock.
                    match receiver.try_recv() {
                        Ok(task) => Some(task),
                        Err(TryRecvError::Empty) => {
                            // The channel is empty, so we try to optimistically
                            // get later epochs to execute.
                            get_optimistic_task()
                        }
                        Err(TryRecvError::Disconnected) => {
                            info!("Channel disconnected, stop thread");
                            break;
                        }
                    }
                };
                let task = match maybe_task {
                    Some(task) => task,
                    None => {
                        //  Even optimistic tasks are all finished, so we block
                        // and wait for  new execution
                        // tasks.  New optimistic tasks
                        // will only exist if pivot_chain changes,
                        //  and new tasks will be sent to `receiver` in this
                        // case, so this waiting will
                        // not prevent new optimistic tasks from being executed.
                        match receiver.recv() {
                            Ok(task) => task,
                            Err(RecvError) => {
                                info!("Channel receive error, stop thread");
                                break;
                            }
                        }
                    }
                };
                // `handle_execution_work` returns `false` only for `Stop`.
                if !handler.handle_execution_work(task) {
                    // `task` is `Stop`, so just stop.
                    break;
                }
            })
            .expect("Cannot fail");
        *executor.thread.lock() = Some(handle);
        executor
    }
282
283    // TODO: The comments and method name are not precise,
284    // TODO: given the single-threaded design.
285    /// Wait until all tasks currently in the queue to be executed and return
286    /// `(state_root, receipts_root, logs_bloom_hash)` of the given
287    /// `epoch_hash`.
288    ///
289    /// It is the caller's responsibility to ensure that `epoch_hash` is indeed
290    /// computed when all the tasks before are finished.
291    // TODO Release Consensus inner lock if possible when the function is called
292    pub fn wait_for_result(
293        &self, epoch_hash: H256,
294    ) -> Result<EpochExecutionCommitment, String> {
295        // In consensus_graph_bench_mode execution is skipped.
296        if self.consensus_graph_bench_mode {
297            Ok(EpochExecutionCommitment {
298                state_root_with_aux_info: StateRootWithAuxInfo::genesis(
299                    &MERKLE_NULL_NODE,
300                ),
301                receipts_root: KECCAK_EMPTY_LIST_RLP,
302                logs_bloom_hash: KECCAK_EMPTY_BLOOM,
303            })
304        } else {
305            if self.handler.data_man.epoch_executed(&epoch_hash) {
306                // The epoch already executed, so we do not need wait for the
307                // queue to be empty
308                return self
309                    .handler
310                    .get_execution_result(&epoch_hash).ok_or("Cannot get expected execution results from the data base. Probably the database is corrupted!".to_string());
311            }
312            let (sender, receiver) = channel();
313            debug!("Wait for execution result of epoch {:?}", epoch_hash);
314            self.sender
315                .lock()
316                .send(ExecutionTask::GetResult(GetExecutionResultTask {
317                    epoch_hash,
318                    sender,
319                }))
320                .expect("Cannot fail");
321            receiver.recv().map_err(|e| e.to_string())?.ok_or(
322                "Waiting for an execution result that is not enqueued!"
323                    .to_string(),
324            )
325        }
326    }
327
    /// Pick the next pivot epoch for optimistic (ahead-of-demand) execution.
    ///
    /// Returns `None` when optimistic execution is disabled, when the parent
    /// state of the candidate height is not yet committed, or when the pivot
    /// chain has no further epoch to execute.
    fn get_optimistic_execution_task(
        &self, inner: &mut ConsensusGraphInner,
    ) -> Option<EpochExecutionTask> {
        if !inner.inner_conf.enable_optimistic_execution {
            return None;
        }

        let epoch_arena_index = {
            let mut state_availability_boundary =
                inner.data_man.state_availability_boundary.write();
            let opt_height =
                state_availability_boundary.optimistic_executed_height?;
            if opt_height != state_availability_boundary.upper_bound + 1 {
                // The `opt_height` parent's state has not been executed.
                // This may happen when the pivot chain switches between
                // the checks of the execution queue and the opt task.
                return None;
            }
            // Advance the optimistic height, or clear it once we reach the
            // end of the current pivot chain.
            let next_opt_height = opt_height + 1;
            if next_opt_height
                >= inner.pivot_index_to_height(inner.pivot_chain.len())
            {
                state_availability_boundary.optimistic_executed_height = None;
            } else {
                state_availability_boundary.optimistic_executed_height =
                    Some(next_opt_height);
            }
            inner.get_pivot_block_arena_index(opt_height)
        };

        // `on_local_pivot` is set to `true` because when we later skip its
        // execution on pivot chain, we will not notify tx pool, so we
        // will also notify in advance.
        let reward_execution_info =
            self.get_reward_execution_info(inner, epoch_arena_index);
        let execution_task = EpochExecutionTask::new(
            epoch_arena_index,
            inner,
            reward_execution_info,
            true,  /* on_local_pivot */
            false, /* force_compute */
        );
        Some(execution_task)
    }
372
    /// Build the `RewardExecutionInfo` for the epoch identified by
    /// `reward_index = (pivot_arena_index,
    /// anticone_penalty_cutoff_epoch_arena_index)`.
    ///
    /// Returns `None` when `reward_index` is `None` (no reward to settle).
    /// Outside bench mode this blocks until the deferred states needed for
    /// blame computation are executed.
    pub fn get_reward_execution_info_from_index(
        &self, inner: &mut ConsensusGraphInner,
        reward_index: Option<(usize, usize)>,
    ) -> Option<RewardExecutionInfo> {
        reward_index.map(
            |(pivot_arena_index, anticone_penalty_cutoff_epoch_arena_index)| {
                // We have to wait here because blame information will determine the reward of each block.
                // In order to compute the correct blame information locally, we have to wait for the execution to return.
                let height = inner.arena[pivot_arena_index].height;
                if !self.consensus_graph_bench_mode
                {
                    debug!(
                        "wait_and_compute_state_valid_locked, idx = {}, \
                         height = {}, era_genesis_height = {} era_stable_height = {}",
                        pivot_arena_index, height, inner.cur_era_genesis_height, inner.cur_era_stable_height
                    );
                    self.wait_and_compute_state_valid_and_blame_info_locked(
                        pivot_arena_index,
                        inner,
                    )
                        .unwrap();
                }

                let epoch_blocks =
                    inner.get_executable_epoch_blocks(pivot_arena_index);

                let mut epoch_block_no_reward =
                    Vec::with_capacity(epoch_blocks.len());
                let mut epoch_block_anticone_difficulties =
                    Vec::with_capacity(epoch_blocks.len());

                let epoch_difficulty =
                    inner.arena[pivot_arena_index].difficulty;
                // Use the cached anticone set for the cutoff epoch when
                // available; fall back to the brute-force computation when
                // the cache has evicted the entry.
                let anticone_cutoff_epoch_anticone_set_ref_opt = inner
                    .anticone_cache
                    .get(anticone_penalty_cutoff_epoch_arena_index);
                let anticone_cutoff_epoch_anticone_set;
                if let Some(r) = anticone_cutoff_epoch_anticone_set_ref_opt {
                    anticone_cutoff_epoch_anticone_set = r.clone();
                } else {
                    anticone_cutoff_epoch_anticone_set = ConsensusNewBlockHandler::compute_anticone_hashset_bruteforce(inner, anticone_penalty_cutoff_epoch_arena_index);
                }
                let ordered_epoch_blocks = inner.get_ordered_executable_epoch_blocks(pivot_arena_index).clone();
                for index in ordered_epoch_blocks.iter() {
                    let block_consensus_node = &inner.arena[*index];

                    // A block gets no reward if it is partial-invalid, or (for
                    // the pivot block) its state is invalid, or (for others)
                    // its vote for the pivot block is invalid.
                    let mut no_reward =
                        block_consensus_node.data.partial_invalid;
                    if !self.consensus_graph_bench_mode && !no_reward {
                        if *index == pivot_arena_index {
                            no_reward = !inner.arena[pivot_arena_index]
                                .data
                                .state_valid.expect("computed in wait_and_compute_state_valid_locked");
                        } else {
                            no_reward = !inner
                                .compute_vote_valid_for_pivot_block(
                                    *index,
                                    pivot_arena_index,
                                );
                        }
                    }
                    // If a block is partial_invalid, it won't have reward and
                    // anticone_difficulty will not be used, so it's okay to set
                    // it to 0.
                    let mut anticone_difficulty: U512 = 0.into();
                    if !no_reward {
                        let block_consensus_node_anticone_opt =
                            inner.anticone_cache.get(*index);
                        let block_consensus_node_anticone = if let Some(r) = block_consensus_node_anticone_opt {
                            r.clone()
                        } else {
                            ConsensusNewBlockHandler::compute_anticone_hashset_bruteforce(inner, *index)
                        };

                        // Sum the weight of same-era anticone blocks that are
                        // not already beyond the penalty cutoff.
                        for idx in block_consensus_node_anticone {
                            if inner.is_same_era(idx, pivot_arena_index) && !anticone_cutoff_epoch_anticone_set.contains(&idx) {
                                anticone_difficulty +=
                                    U512::from(U256::from(inner.block_weight(
                                        idx
                                    )));
                            }
                        }

                        // TODO: check the clear definition of anticone penalty,
                        // normally and around the time of difficulty
                        // adjustment.
                        // LINT.IfChange(ANTICONE_PENALTY_1)
                        if anticone_difficulty / U512::from(epoch_difficulty)
                            >= U512::from(self.handler.machine.params().anticone_penalty_ratio)
                        {
                            no_reward = true;
                        }
                        // LINT.ThenChange(consensus/consensus_executor.
                        // rs#ANTICONE_PENALTY_2)
                    }
                    epoch_block_no_reward.push(no_reward);
                    epoch_block_anticone_difficulties.push(anticone_difficulty);
                }
                RewardExecutionInfo {
                    past_block_count: inner.arena[pivot_arena_index].past_num_blocks,
                    epoch_blocks,
                    epoch_block_no_reward,
                    epoch_block_anticone_difficulties,
                }
            },
        )
    }
480
481    pub fn get_reward_execution_info(
482        &self, inner: &mut ConsensusGraphInner, epoch_arena_index: usize,
483    ) -> Option<RewardExecutionInfo> {
484        self.get_reward_execution_info_from_index(
485            inner,
486            inner.get_pivot_reward_index(epoch_arena_index),
487        )
488    }
489
    /// Wait for the deferred state to be executed and compute `state_valid` and
    /// `blame_info` for `me`.
    ///
    /// Takes the `RwLock` itself (rather than a guard) so the lock can be
    /// released during the potentially long wait, letting other threads make
    /// progress.
    fn wait_and_compute_state_valid_and_blame_info(
        &self, me: usize, inner_lock: &RwLock<ConsensusGraphInner>,
    ) -> Result<(), String> {
        // TODO:
        //  can we only wait for the deferred block?
        //  waiting for its parent seems redundant.
        // We go up from deferred state block of `me`
        // and find all states whose commitments are missing
        let waiting_blocks = inner_lock
            .read()
            .collect_defer_blocks_missing_execution_commitments(me)?;
        // Now we wait without holding the inner lock
        // Note that we must use hash instead of index because once we release
        // the lock, there might be a checkpoint coming in to break
        // index
        for state_block_hash in waiting_blocks {
            let commitment = self.wait_for_result(state_block_hash)?;
            // Persist the commitment so the blame computation below can
            // read it from the data manager.
            self.handler.data_man.insert_epoch_execution_commitment(
                state_block_hash,
                commitment.state_root_with_aux_info,
                commitment.receipts_root,
                commitment.logs_bloom_hash,
            );
        }
        // Now we need to wait for the execution information of all missing
        // blocks to come back
        // TODO: can we merge the state valid computation into the consensus
        // executor?
        inner_lock
            .write()
            .compute_state_valid_and_blame_info(me, self)?;
        Ok(())
    }
525
    /// Same as `wait_and_compute_state_valid_and_blame_info`, but for callers
    /// that already hold exclusive access to `inner`; the waits happen while
    /// that access is held.
    fn wait_and_compute_state_valid_and_blame_info_locked(
        &self, me: usize, inner: &mut ConsensusGraphInner,
    ) -> Result<(), String> {
        // TODO:
        //  can we only wait for the deferred block?
        //  waiting for its parent seems redundant.
        // We go up from deferred state block of `me`
        // and find all states whose commitments are missing
        let waiting_blocks =
            inner.collect_defer_blocks_missing_execution_commitments(me)?;
        trace!(
            "wait_and_compute_state_valid_locked: waiting_blocks={:?}",
            waiting_blocks
        );
        // for this rare case, we should make wait_for_result to pop up errors!
        for state_block_hash in waiting_blocks {
            self.wait_for_result(state_block_hash)?;
        }
        // Now we need to wait for the execution information of all missing
        // blocks to come back
        // TODO: can we merge the state valid computation into the consensus
        // executor?
        inner.compute_state_valid_and_blame_info(me, self)?;
        Ok(())
    }
551
    /// Compute the blame and deferred-state info that a new block generated
    /// on top of `parent_block_hash` must carry in its header.
    ///
    /// Errors when the parent is unknown or becomes too old (e.g. pruned by
    /// a checkpoint) while the deferred state is being awaited.
    pub fn get_blame_and_deferred_state_for_generation(
        &self, parent_block_hash: &H256,
        inner_lock: &RwLock<ConsensusGraphInner>,
    ) -> Result<StateBlameInfo, String> {
        let (parent_arena_index, last_state_block) = {
            let inner = inner_lock.read();
            let parent_opt = inner.hash_to_arena_indices.get(parent_block_hash);
            if parent_opt.is_none() {
                return Err(format!(
                    "Too old parent for generation, parent_hash={:?}",
                    parent_block_hash
                ));
            }
            (
                *parent_opt.unwrap(),
                inner
                    .get_state_block_with_delay(
                        parent_block_hash,
                        DEFERRED_STATE_EPOCH_COUNT as usize - 1,
                    )?
                    .clone(),
            )
        };
        let last_result = self.wait_for_result(last_state_block)?;
        self.wait_and_compute_state_valid_and_blame_info(
            parent_arena_index,
            inner_lock,
        )?;
        {
            let inner = &mut *inner_lock.write();
            // The lock was released while waiting above; re-check that
            // `parent_arena_index` still refers to the same block, since a
            // checkpoint may have re-indexed the arena in the meantime.
            if inner.arena[parent_arena_index].hash == *parent_block_hash {
                Ok(inner.compute_blame_and_state_with_execution_result(
                    parent_arena_index,
                    last_result
                        .state_root_with_aux_info
                        .aux_info
                        .state_root_hash,
                    last_result.receipts_root,
                    last_result.logs_bloom_hash,
                )?)
            } else {
                Err("Too old parent/subtree to prepare for generation"
                    .to_owned())
            }
        }
    }
598
599    /// Enqueue the epoch to be executed by the background execution thread
600    /// The parameters are needed for the thread to execute this epoch without
601    /// holding inner lock.
602    pub fn enqueue_epoch(&self, task: EpochExecutionTask) -> bool {
603        if !self.consensus_graph_bench_mode {
604            self.sender
605                .lock()
606                .send(ExecutionTask::ExecuteEpoch(task))
607                .is_ok()
608        } else {
609            true
610        }
611    }
612
613    /// Execute the epoch synchronously
614    pub fn compute_epoch(
615        &self, task: EpochExecutionTask,
616        debug_record: Option<&mut ComputeEpochDebugRecord>,
617        recover_mpt_during_construct_pivot_state: bool,
618    ) {
619        if self.consensus_graph_bench_mode {
620            return;
621        }
622        self.handler.handle_epoch_execution(
623            task,
624            debug_record,
625            recover_mpt_during_construct_pivot_state,
626        )
627    }
628
629    pub fn epoch_executed_and_recovered(
630        &self, epoch_hash: &H256, epoch_block_hashes: &Vec<H256>,
631        on_local_pivot: bool,
632        reward_execution_info: &Option<RewardExecutionInfo>, epoch_height: u64,
633    ) -> bool {
634        self.handler.epoch_executed_and_recovered(
635            epoch_hash,
636            epoch_block_hashes,
637            on_local_pivot,
638            reward_execution_info,
639            epoch_height,
640        )
641    }
642
643    pub fn call_virtual(
644        &self, tx: &SignedTransaction, epoch_id: &H256, epoch_size: usize,
645        request: EstimateRequest, evm_overrides: EvmOverrides,
646    ) -> CoreResult<(ExecutionOutcome, EstimateExt)> {
647        self.handler.call_virtual(
648            tx,
649            epoch_id,
650            epoch_size,
651            request,
652            evm_overrides,
653        )
654    }
655
656    pub fn collect_blocks_geth_trace(
657        &self, epoch_id: H256, epoch_num: u64, blocks: &Vec<Arc<Block>>,
658        opts: GethDebugTracingOptions, tx_hash: Option<H256>,
659    ) -> CoreResult<Vec<GethTraceWithHash>> {
660        self.handler.collect_blocks_geth_trace(
661            epoch_id, epoch_num, blocks, opts, tx_hash,
662        )
663    }
664
665    pub fn stop(&self) {
666        // `stopped` is used to allow the execution thread to stopped even the
667        // queue is not empty and `ExecutionTask::Stop` has not been
668        // processed.
669        self.stopped.store(true, Relaxed);
670
671        // We still need this task because otherwise if the execution queue is
672        // empty the execution thread will block on `recv` forever and
673        // unable to check `stopped`
674        self.sender
675            .lock()
676            .send(ExecutionTask::Stop)
677            .expect("execution receiver exists");
678        if let Some(thread) = self.thread.lock().take() {
679            thread.join().ok();
680        }
681    }
682
683    /// Binary search to find the starting point so we can execute to the end of
684    /// the chain.
685    /// Return the first index that is not executed,
686    /// or return `chain.len()` if they are all executed (impossible for now).
687    ///
688    /// NOTE: If a state for an block exists, all the blocks on its pivot chain
689    /// must have been executed and state committed. The receipts for these
690    /// past blocks may not exist because the receipts on forks will be
691    /// garbage-collected, but when we need them, we will recompute these
692    /// missing receipts in `process_rewards_and_fees`. This 'recompute' is safe
693    /// because the parent state exists. Thus, it's okay that here we do not
694    /// check existence of the receipts that will be needed for reward
695    /// computation during epoch execution.
696    fn find_start_chain_index(
697        inner: &ConsensusGraphInner, chain: &Vec<usize>,
698    ) -> usize {
699        let mut base = 0;
700        let mut size = chain.len();
701        while size > 1 {
702            let half = size / 2;
703            let mid = base + half;
704            let epoch_hash = inner.arena[chain[mid]].hash;
705            base = if inner.data_man.epoch_executed(&epoch_hash) {
706                mid
707            } else {
708                base
709            };
710            size -= half;
711        }
712        let epoch_hash = inner.arena[chain[base]].hash;
713        if inner.data_man.epoch_executed(&epoch_hash) {
714            base + 1
715        } else {
716            base
717        }
718    }
719
    // TODO:
    //  this method contains bugs but it's not a big problem since
    //  it's test-rpc only.
    /// This is a blocking call to force the execution engine to compute the
    /// state of a block immediately
    ///
    /// Steps:
    /// 1. Return `Ok(())` early if the block's state is already computed and
    ///    retrievable from storage.
    /// 2. Walk back from the block to the point where it forks off the
    ///    current pivot chain, collecting the fork chain.
    /// 3. Enqueue execution tasks for any not-yet-executed pivot epochs
    ///    below the fork point, then for every epoch along the fork, and
    ///    block until the result for `block_hash` is available.
    ///
    /// Errors are reported as plain strings: internal storage failures and
    /// unknown block hashes.
    pub fn compute_state_for_block(
        &self, block_hash: &H256, inner: &mut ConsensusGraphInner,
    ) -> Result<(), String> {
        let _timer = MeterTimer::time_func(
            CONSENSIS_COMPUTE_STATE_FOR_BLOCK_TIMER.as_ref(),
        );
        // If we already computed the state of the block before, we should not
        // do it again
        debug!("compute_state_for_block {:?}", block_hash);
        {
            let maybe_state_index =
                self.handler.data_man.get_state_readonly_index(&block_hash);
            // The state is computed and is retrievable from storage.
            if let Some(maybe_cached_state_result) =
                maybe_state_index.map(|state_readonly_index| {
                    self.handler.data_man.storage_manager.get_state_no_commit(
                        state_readonly_index,
                        /* try_open = */ false,
                        None,
                    )
                })
            {
                if let Ok(Some(_)) = maybe_cached_state_result {
                    return Ok(());
                } else {
                    return Err("Internal storage error".to_owned());
                }
            }
        }
        let me_opt = inner.hash_to_arena_indices.get(block_hash);
        if me_opt == None {
            return Err("Block hash not found!".to_owned());
        }
        // FIXME: isolate this part as a method.
        let me: usize = *me_opt.unwrap();
        let block_height = inner.arena[me].height;
        let mut fork_height = block_height;
        let mut chain: Vec<usize> = Vec::new();
        let mut idx = me;
        // Walk up the parent chain until we land on a block that is on the
        // current pivot chain (the fork point).
        // FIXME: this is wrong, however.
        while fork_height > 0
            && (fork_height >= inner.get_pivot_height()
                || inner.get_pivot_block_arena_index(fork_height) != idx)
        {
            chain.push(idx);
            fork_height -= 1;
            idx = inner.arena[idx].parent;
        }
        // FIXME: this is wrong, however.
        // Because we have genesis at height 0, this should always be true
        debug_assert!(inner.get_pivot_block_arena_index(fork_height) == idx);
        debug!("Forked at index {} height {}", idx, fork_height);
        chain.push(idx);
        // `chain` was collected child-to-parent; reverse so execution goes
        // from the fork point towards the requested block.
        chain.reverse();
        let start_chain_index =
            ConsensusExecutor::find_start_chain_index(inner, &chain);
        debug!("Start execution from index {}", start_chain_index);

        // We need the state of the fork point to start executing the fork
        if start_chain_index == 0 {
            let mut last_state_height =
                if inner.get_pivot_height() > DEFERRED_STATE_EPOCH_COUNT {
                    inner.get_pivot_height() - DEFERRED_STATE_EPOCH_COUNT
                } else {
                    0
                };

            last_state_height += 1;
            // Enqueue the pivot epochs below the fork point whose states may
            // not have been computed yet.
            while last_state_height < fork_height {
                let epoch_arena_index =
                    inner.get_pivot_block_arena_index(last_state_height);
                let reward_execution_info =
                    self.get_reward_execution_info(inner, epoch_arena_index);
                self.enqueue_epoch(EpochExecutionTask::new(
                    epoch_arena_index,
                    inner,
                    reward_execution_info,
                    false, /* on_local_pivot */
                    false, /* force_recompute */
                ));
                last_state_height += 1;
            }
        }

        // Enqueue every epoch along the fork, from the first not-yet-executed
        // one up to (and including) the requested block.
        for fork_chain_index in start_chain_index..chain.len() {
            let epoch_arena_index = chain[fork_chain_index];
            let reward_index = inner.get_pivot_reward_index(epoch_arena_index);

            let reward_execution_info =
                self.get_reward_execution_info_from_index(inner, reward_index);
            self.enqueue_epoch(EpochExecutionTask::new(
                epoch_arena_index,
                inner,
                reward_execution_info,
                false, /* on_local_pivot */
                false, /* force_recompute */
            ));
        }

        // Block until the execution worker has produced the commitment for
        // the requested block.
        let epoch_execution_result = self.wait_for_result(*block_hash)?;
        debug!(
            "Epoch {:?} has state_root={:?} receipts_root={:?} logs_bloom_hash={:?}",
            inner.arena[me].hash, epoch_execution_result.state_root_with_aux_info,
            epoch_execution_result.receipts_root, epoch_execution_result.logs_bloom_hash
        );

        Ok(())
    }
833}
834
/// Worker-side handler that performs the actual epoch execution: running
/// transactions, distributing rewards and PoS interest, committing states,
/// and notifying the transaction pool.
pub struct ConsensusExecutionHandler {
    // Shared transaction pool; notified after execution with modified
    // accounts and the best executed state (see `notify_txpool`).
    tx_pool: SharedTransactionPool,
    // Access to blocks, headers, receipts and epoch execution commitments.
    data_man: Arc<BlockDataManager>,
    // Execution configuration (e.g. `executive_trace` toggles trace
    // persistence in the skip check).
    config: ConsensusExecutionConfiguration,
    // Verification settings; its consumers are not visible in this chunk.
    verification_config: VerificationConfig,
    // Chain parameters and spec (chain id, base reward, anticone penalty
    // ratio, address validity rules).
    machine: Arc<Machine>,
    // PoS verifier used for unlock/reward-distribution events and for the
    // execution skip check.
    pos_verifier: Arc<PosVerifier>,
    // Optional rayon pool, presumably for prefetching state entries during
    // execution; `None` when `DEFAULT_EXECUTION_PREFETCH_THREADS` is 0.
    execution_state_prefetcher: Option<ThreadPool>,
}
844
845impl ConsensusExecutionHandler {
846    pub fn new(
847        tx_pool: SharedTransactionPool, data_man: Arc<BlockDataManager>,
848        config: ConsensusExecutionConfiguration,
849        verification_config: VerificationConfig, machine: Arc<Machine>,
850        pos_verifier: Arc<PosVerifier>,
851    ) -> Self {
852        ConsensusExecutionHandler {
853            tx_pool,
854            data_man,
855            config,
856            verification_config,
857            machine,
858            pos_verifier,
859            execution_state_prefetcher: if DEFAULT_EXECUTION_PREFETCH_THREADS
860                > 0
861            {
862                Some(
863                    ThreadPoolBuilder::new()
864                        .num_threads(DEFAULT_EXECUTION_PREFETCH_THREADS)
865                        .build()
866                        .unwrap(),
867                )
868            } else {
869                None
870            },
871        }
872    }
873
874    /// Always return `true` for now
875    fn handle_execution_work(&self, task: ExecutionTask) -> bool {
876        debug!("Receive execution task: {:?}", task);
877        match task {
878            ExecutionTask::ExecuteEpoch(task) => {
879                self.handle_epoch_execution(task, None, false)
880            }
881            ExecutionTask::GetResult(task) => self.handle_get_result_task(task),
882            ExecutionTask::Stop => return false,
883        }
884        true
885    }
886
887    fn handle_epoch_execution(
888        &self, task: EpochExecutionTask,
889        debug_record: Option<&mut ComputeEpochDebugRecord>,
890        recover_mpt_during_construct_pivot_state: bool,
891    ) {
892        let _timer = MeterTimer::time_func(CONSENSIS_EXECUTION_TIMER.as_ref());
893        self.compute_epoch(
894            &task.epoch_hash,
895            &task.epoch_block_hashes,
896            task.start_block_number,
897            &task.reward_info,
898            task.on_local_pivot,
899            debug_record,
900            task.force_recompute,
901            recover_mpt_during_construct_pivot_state,
902        );
903    }
904
905    fn handle_get_result_task(&self, task: GetExecutionResultTask) {
906        task.sender
907            .send(self.get_execution_result(&task.epoch_hash))
908            .expect("Consensus Worker fails");
909    }
910
911    /// Get `EpochExecutionCommitment` for an executed epoch.
912    ///
913    /// Return `None` if the commitment does not exist in memory or db.
914    /// For archive node, this should only happen when `epoch_hash` is not
915    /// executed.
916    fn get_execution_result(
917        &self, epoch_hash: &H256,
918    ) -> Option<EpochExecutionCommitment> {
919        self.data_man
920            .get_epoch_execution_commitment_with_db(epoch_hash)
921    }
922
923    fn new_state(
924        &self, pivot_block: &Block,
925        recover_mpt_during_construct_pivot_state: bool,
926    ) -> DbResult<State> {
927        let state_root_with_aux_info = &self
928            .data_man
929            .get_epoch_execution_commitment(
930                pivot_block.block_header.parent_hash(),
931            )
932            // Unwrapping is safe because the state exists.
933            .unwrap()
934            .state_root_with_aux_info;
935
936        let state_index = StateIndex::new_for_next_epoch(
937            pivot_block.block_header.parent_hash(),
938            &state_root_with_aux_info,
939            pivot_block.block_header.height() - 1,
940            self.data_man.get_snapshot_epoch_count(),
941        );
942
943        let storage = self
944            .data_man
945            .storage_manager
946            .get_state_for_next_epoch(
947                state_index,
948                recover_mpt_during_construct_pivot_state,
949            )
950            .expect("No db error")
951            // Unwrapping is safe because the state exists.
952            .expect("State exists");
953
954        let state_db = StateDb::new(storage);
955        State::new(state_db)
956    }
957
958    pub fn epoch_executed_and_recovered(
959        &self, epoch_hash: &H256, epoch_block_hashes: &Vec<H256>,
960        on_local_pivot: bool,
961        reward_execution_info: &Option<RewardExecutionInfo>, epoch_height: u64,
962    ) -> bool {
963        // note: the lock on chain_id is never held so this should be OK.
964        let evm_chain_id = self
965            .machine
966            .params()
967            .chain_id
968            .read()
969            .get_chain_id(epoch_height)
970            .in_evm_space();
971
972        self.data_man.epoch_executed_and_recovered(
973            &epoch_hash,
974            &epoch_block_hashes,
975            on_local_pivot,
976            self.config.executive_trace,
977            reward_execution_info,
978            self.pos_verifier.as_ref(),
979            evm_chain_id,
980        )
981    }
982
    /// Compute the epoch `epoch_hash`, and skip it if already computed.
    /// After the function is called, it's assured that the state, the receipt
    /// root, and the receipts of blocks executed by this epoch exist.
    ///
    /// Parameters:
    /// - `epoch_hash`: hash of the pivot block of the epoch to execute.
    /// - `epoch_block_hashes`: all block hashes of the epoch, pivot last.
    /// - `start_block_number`: block number assigned to the first block of
    ///   the epoch; the rest are numbered consecutively.
    /// - `reward_execution_info`: when `Some`, block rewards of an earlier
    ///   epoch are distributed after transaction processing.
    /// - `on_local_pivot`: whether this epoch is on the node's current pivot
    ///   chain; enables tx-pool notification after commit.
    /// - `debug_record`: when `Some`, execution details are recorded and the
    ///   skip check is bypassed.
    /// - `force_recompute`: re-execute even if results already exist.
    /// - `recover_mpt_during_construct_pivot_state`: forwarded to storage
    ///   when opening the parent state.
    ///
    /// TODO Not sure if this difference is important.
    /// One different between skipped execution in pivot chain is that the
    /// transactions packed in the skipped epoch will be checked if they can
    /// be recycled.
    pub fn compute_epoch(
        &self,
        epoch_hash: &H256,
        epoch_block_hashes: &Vec<H256>,
        start_block_number: u64,
        reward_execution_info: &Option<RewardExecutionInfo>,
        // TODO: this arg should be removed.
        on_local_pivot: bool,
        mut debug_record: Option<&mut ComputeEpochDebugRecord>,
        force_recompute: bool,
        recover_mpt_during_construct_pivot_state: bool,
    ) {
        // FIXME: Question: where to calculate if we should make a snapshot?
        // FIXME: Currently we make the snapshotting decision when committing
        // FIXME: a new state.

        // persist block number index
        // note: we need to persist before execution because in some cases,
        // execution is skipped. when `compute_epoch` is called, it is
        // guaranteed that `epoch_hash` is on the current pivot chain.
        for (index, hash) in epoch_block_hashes.iter().enumerate() {
            let block_number =
                compute_block_number(start_block_number, index as u64);
            self.data_man
                .insert_hash_by_block_number(block_number, hash);
        }
        // The pivot block is the last in the epoch, so this is its number.
        let end_block_number =
            start_block_number + epoch_block_hashes.len() as u64 - 1;

        let pivot_block_header = self
            .data_man
            .block_header_by_hash(epoch_hash)
            .expect("must exists");

        // Check if epoch is computed
        if !force_recompute
            && debug_record.is_none()
            && self.epoch_executed_and_recovered(
                &epoch_hash,
                &epoch_block_hashes,
                on_local_pivot,
                reward_execution_info,
                pivot_block_header.height(),
            )
        {
            // Results already exist; only refresh tx-pool state and the
            // availability boundary, then return.
            self.update_on_skipped_execution(
                epoch_hash,
                &pivot_block_header,
                on_local_pivot,
            );
            return;
        }

        // Get blocks in this epoch after skip checking
        let epoch_blocks = self
            .data_man
            .blocks_by_hash_list(
                epoch_block_hashes,
                true, /* update_cache */
            )
            .expect("blocks exist");
        let pivot_block = epoch_blocks.last().expect("Not empty");

        debug!(
            "Process tx epoch_id={}, block_count={}",
            epoch_hash,
            epoch_blocks.len(),
        );

        // Open a writable state on top of the parent epoch's commitment.
        let mut state = self
            .new_state(pivot_block, recover_mpt_during_construct_pivot_state)
            .expect("Cannot init state");

        // Execute every transaction packed in the epoch's blocks.
        let epoch_receipts = self
            .process_epoch_transactions(
                &mut state,
                &epoch_blocks,
                start_block_number,
                on_local_pivot,
                /* virtual_call */ None,
            )
            // TODO: maybe propagate the error all the way up so that the
            // program may restart by itself.
            .expect("Can not handle db error in consensus, crashing.");

        if let Some(reward_execution_info) = reward_execution_info {
            let spec = self
                .machine
                .spec(end_block_number, pivot_block.block_header.height());
            // Calculate the block reward for blocks inside the epoch
            // All transaction fees are shared among blocks inside one epoch
            self.process_rewards_and_fees(
                &mut state,
                &reward_execution_info,
                epoch_hash,
                on_local_pivot,
                debug_record.as_deref_mut(),
                spec,
            );
        }

        // Apply PoS vote unlocking and interest distribution for this epoch.
        self.process_pos_interest(
            &mut state,
            &pivot_block.block_header,
            end_block_number,
        )
        .expect("db error");

        // Commit the state; the expect message encodes the source location.
        let commit_result = state
            .commit(*epoch_hash, debug_record.as_deref_mut())
            .expect(&concat!(file!(), ":", line!(), ":", column!()));

        if on_local_pivot {
            self.notify_txpool(&commit_result, epoch_hash);
        };

        // Record the commitment (state root, receipts root, logs bloom) so
        // later epochs and RPCs can find it.
        self.data_man.insert_epoch_execution_commitment(
            pivot_block.hash(),
            commit_result.state_root.clone(),
            compute_receipts_root(&epoch_receipts),
            BlockHeaderBuilder::compute_block_logs_bloom_hash(&epoch_receipts),
        );

        let epoch_execution_commitment = self
            .data_man
            .get_epoch_execution_commitment(&epoch_hash)
            .unwrap();
        debug!(
            "compute_epoch: on_local_pivot={}, epoch={:?} state_root={:?} receipt_root={:?}, logs_bloom_hash={:?}",
            on_local_pivot, epoch_hash, commit_result.state_root, epoch_execution_commitment.receipts_root, epoch_execution_commitment.logs_bloom_hash,
        );
        // Mark this epoch's state as available.
        self.data_man
            .state_availability_boundary
            .write()
            .adjust_upper_bound(&pivot_block.block_header);
    }
1127
1128    fn update_on_skipped_execution(
1129        &self, epoch_hash: &H256, pivot_block_header: &BlockHeader,
1130        on_local_pivot: bool,
1131    ) {
1132        if on_local_pivot {
1133            // Unwrap is safe here because it's guaranteed by outer if.
1134            let state_root = &self
1135                .data_man
1136                .get_epoch_execution_commitment(epoch_hash)
1137                .unwrap()
1138                .state_root_with_aux_info;
1139            // When the state have expired, don't inform TransactionPool.
1140            // TransactionPool doesn't require a precise best_executed_state
1141            // when pivot chain oscillates.
1142            if self
1143                .data_man
1144                .state_availability_boundary
1145                .read()
1146                .check_availability(pivot_block_header.height(), epoch_hash)
1147            {
1148                self.tx_pool
1149                    .set_best_executed_state_by_epoch(
1150                        StateIndex::new_for_readonly(epoch_hash, &state_root),
1151                    )
1152                    // FIXME: propogate error.
1153                    .expect(&concat!(file!(), ":", line!(), ":", column!()));
1154            }
1155        }
1156        self.data_man
1157            .state_availability_boundary
1158            .write()
1159            .adjust_upper_bound(pivot_block_header);
1160        debug!("Skip execution in prefix {:?}", epoch_hash);
1161    }
1162
    /// Apply PoS-related state updates for this epoch: unlock votes for
    /// newly unlocked PoS nodes and distribute PoS interest when the pivot
    /// block advances the PoS reference relative to its parent.
    ///
    /// This is a no-op when PoS is not enabled at this height, when the
    /// parent has no pos_reference (genesis case), or when the
    /// pos_reference is unchanged from the parent.
    fn process_pos_interest(
        &self, state: &mut State, pivot_header: &BlockHeader,
        current_block_number: u64,
    ) -> DbResult<()> {
        // TODO(peilun): Specify if we unlock before or after executing the
        // transactions.
        let maybe_parent_pos_ref = self
            .data_man
            .block_header_by_hash(&pivot_header.parent_hash()) // `None` only for genesis.
            .and_then(|parent| parent.pos_reference().clone());
        if self
            .pos_verifier
            .is_enabled_at_height(pivot_header.height())
            && maybe_parent_pos_ref.is_some()
            && *pivot_header.pos_reference() != maybe_parent_pos_ref
        {
            let current_pos_ref = pivot_header
                .pos_reference()
                .as_ref()
                .expect("checked before sync graph insertion");
            let parent_pos_ref = &maybe_parent_pos_ref.expect("checked");
            // The pos_reference is continuous, so after seeing a new
            // pos_reference, we only need to process the new
            // unlock_txs in it.
            for (unlock_node_id, votes) in self
                .pos_verifier
                .get_unlock_nodes(current_pos_ref, parent_pos_ref)
            {
                debug!("unlock node: {:?} {}", unlock_node_id, votes);
                update_pos_status(state, unlock_node_id, votes)?;
            }
            // Only the first reward-distribution event (if any) between the
            // two pos references is processed here.
            if let Some((pos_epoch, reward_event)) = self
                .pos_verifier
                .get_reward_distribution_event(current_pos_ref, parent_pos_ref)
                .as_ref()
                .and_then(|x| x.first())
            {
                debug!("distribute_pos_interest: {:?}", reward_event);
                // Credit interest to each rewarded account in `state`.
                let account_rewards: Vec<(H160, H256, U256)> =
                    distribute_pos_interest(
                        state,
                        reward_event.rewards(),
                        current_block_number,
                    )?;
                // Record the per-account rewards in the data manager, keyed
                // by the PoS epoch.
                self.data_man.insert_pos_reward(
                    *pos_epoch,
                    &PosRewardInfo::new(account_rewards, pivot_header.hash()),
                )
            }
        }
        Ok(())
    }
1215
1216    fn notify_txpool(
1217        &self, commit_result: &StateCommitResult, epoch_hash: &H256,
1218    ) {
1219        // FIXME: We may want to propagate the error up.
1220
1221        let accounts_for_txpool = commit_result.accounts_for_txpool.clone();
1222        {
1223            debug!("Notify epoch[{}]", epoch_hash);
1224
1225            // TODO: use channel to deliver the message.
1226            let txpool_clone = self.tx_pool.clone();
1227            std::thread::Builder::new()
1228                .name("txpool_update_state".into())
1229                .spawn(move || {
1230                    txpool_clone.notify_modified_accounts(accounts_for_txpool);
1231                })
1232                .expect("can not notify tx pool to start state");
1233        }
1234
1235        self.tx_pool
1236            .set_best_executed_state_by_epoch(StateIndex::new_for_readonly(
1237                epoch_hash,
1238                &commit_result.state_root,
1239            ))
1240            .expect(&concat!(file!(), ":", line!(), ":", column!()));
1241    }
1242
1243    fn compute_block_base_reward(
1244        &self, past_block_count: u64, pivot_height: u64,
1245    ) -> U512 {
1246        self.machine
1247            .params()
1248            .base_reward_in_ucfx(past_block_count, pivot_height)
1249    }
1250
1251    /// `epoch_block_states` includes if a block is partial invalid and its
1252    /// anticone difficulty
1253    fn process_rewards_and_fees(
1254        &self, state: &mut State, reward_info: &RewardExecutionInfo,
1255        epoch_later: &H256, on_local_pivot: bool,
1256        mut debug_record: Option<&mut ComputeEpochDebugRecord>, spec: Spec,
1257    ) {
1258        /// (Fee, SetOfPackingBlockHash)
1259        struct TxExecutionInfo(U256, BTreeSet<H256>);
1260
1261        let epoch_blocks = &reward_info.epoch_blocks;
1262        let pivot_block = epoch_blocks.last().expect("Not empty");
1263        let reward_epoch_hash = pivot_block.hash();
1264        debug!("Process rewards and fees for {:?}", reward_epoch_hash);
1265        let epoch_difficulty = pivot_block.block_header.difficulty();
1266
1267        let epoch_size = epoch_blocks.len();
1268        let mut epoch_block_total_rewards = Vec::with_capacity(epoch_size);
1269        // This is the total primary tokens issued in this epoch.
1270        let mut total_base_reward: U256 = 0.into();
1271
1272        let base_reward_per_block = if spec.cip94 {
1273            U512::from(state.pow_base_reward())
1274        } else {
1275            self.compute_block_base_reward(
1276                reward_info.past_block_count,
1277                pivot_block.block_header.height(),
1278            )
1279        };
1280        debug!("base_reward: {}", base_reward_per_block);
1281
1282        // Base reward and anticone penalties.
1283        for (enum_idx, block) in epoch_blocks.iter().enumerate() {
1284            let no_reward = reward_info.epoch_block_no_reward[enum_idx];
1285
1286            if no_reward {
1287                epoch_block_total_rewards.push(U256::from(0));
1288                if debug_record.is_some() {
1289                    let debug_out = debug_record.as_mut().unwrap();
1290                    debug_out.no_reward_blocks.push(block.hash());
1291                }
1292            } else {
1293                let pow_quality =
1294                    VerificationConfig::get_or_compute_header_pow_quality(
1295                        &self.data_man.pow,
1296                        &block.block_header,
1297                    );
1298                let mut reward = if pow_quality >= *epoch_difficulty {
1299                    base_reward_per_block
1300                } else {
1301                    debug!(
1302                        "Block {} pow_quality {} is less than epoch_difficulty {}!",
1303                        block.hash(), pow_quality, epoch_difficulty
1304                    );
1305                    0.into()
1306                };
1307
1308                if let Some(debug_out) = &mut debug_record {
1309                    debug_out.block_rewards.push(BlockHashAuthorValue(
1310                        block.hash(),
1311                        block.block_header.author().clone(),
1312                        U256::try_from(reward).unwrap(),
1313                    ));
1314                }
1315
1316                if reward > 0.into() {
1317                    let anticone_difficulty =
1318                        reward_info.epoch_block_anticone_difficulties[enum_idx];
1319                    // LINT.IfChange(ANTICONE_PENALTY_2)
1320                    let anticone_penalty = reward * anticone_difficulty
1321                        / U512::from(epoch_difficulty)
1322                        * anticone_difficulty
1323                        / U512::from(epoch_difficulty)
1324                        / U512::from(
1325                            self.machine.params().anticone_penalty_ratio,
1326                        )
1327                        / U512::from(
1328                            self.machine.params().anticone_penalty_ratio,
1329                        );
1330                    // Lint.ThenChange(consensus/mod.rs#ANTICONE_PENALTY_1)
1331
1332                    debug_assert!(reward > anticone_penalty);
1333                    reward -= anticone_penalty;
1334
1335                    if debug_record.is_some() {
1336                        let debug_out = debug_record.as_mut().unwrap();
1337                        debug_out.anticone_penalties.push(
1338                            BlockHashAuthorValue(
1339                                block.hash(),
1340                                block.block_header.author().clone(),
1341                                U256::try_from(anticone_penalty).unwrap(),
1342                            ),
1343                        );
1344                        //
1345                        // debug_out.anticone_set_size.push(BlockHashValue(
1346                        //                            block.hash(),
1347                        //
1348                        // reward_info.epoch_block_anticone_set_sizes
1349                        //                                [enum_idx],
1350                        //                        ));
1351                    }
1352                }
1353
1354                debug_assert!(reward <= U512::from(U256::max_value()));
1355                let reward = U256::try_from(reward).unwrap();
1356                epoch_block_total_rewards.push(reward);
1357                if !reward.is_zero() {
1358                    total_base_reward += reward;
1359                }
1360            }
1361        }
1362
1363        // Tx fee for each block in this epoch
1364        let mut tx_fee = HashMap::new();
1365
1366        // Compute tx_fee of each block based on gas_used and gas_price of every
1367        // tx
1368        let mut epoch_receipts = None;
1369        let mut secondary_reward = U256::zero();
1370        for (enum_idx, block) in epoch_blocks.iter().enumerate() {
1371            let block_hash = block.hash();
1372            // TODO: better redesign to avoid recomputation.
1373            // FIXME: check state availability boundary here. Actually, it seems
1374            // FIXME: we should never recompute states here.
1375            let block_receipts = match self
1376                .data_man
1377                .block_execution_result_by_hash_with_epoch(
1378                    &block_hash,
1379                    &reward_epoch_hash,
1380                    false, /* update_pivot_assumption */
1381                    true,  /* update_cache */
1382                ) {
1383                Some(block_exec_result) => block_exec_result.block_receipts,
1384                None => {
1385                    let ctx = self
1386                        .data_man
1387                        .get_epoch_execution_context(&reward_epoch_hash)
1388                        .expect("epoch_execution_context should exists here");
1389
1390                    // We need to return receipts instead of getting it through
1391                    // function get_receipts, because it's
1392                    // possible that the computed receipts is deleted by garbage
1393                    // collection before we try get it
1394                    if epoch_receipts.is_none() {
1395                        epoch_receipts = Some(self.recompute_states(
1396                            &reward_epoch_hash,
1397                            &epoch_blocks,
1398                            ctx.start_block_number,
1399                        )
1400                            // TODO: maybe propagate the error all the way up so that the
1401                            // program may restart by itself.
1402                            .expect("Can not handle db error in consensus, crashing."));
1403                    }
1404                    epoch_receipts.as_ref().unwrap()[enum_idx].clone()
1405                }
1406            };
1407
1408            secondary_reward += block_receipts.secondary_reward;
1409            debug_assert!(
1410                block_receipts.receipts.len() == block.transactions.len()
1411            );
1412            // TODO: fill base_price.
1413            for (tx, receipt) in block
1414                .transactions
1415                .iter()
1416                .zip(block_receipts.receipts.iter())
1417            {
1418                let fee =
1419                    receipt.gas_fee - receipt.burnt_gas_fee.unwrap_or_default();
1420
1421                let info = tx_fee
1422                    .entry(tx.hash())
1423                    .or_insert(TxExecutionInfo(fee, BTreeSet::default()));
1424                // The same transaction is executed only once.
1425                debug_assert!(
1426                    fee.is_zero() || info.0.is_zero() || info.1.len() == 0
1427                );
1428                // `false` means the block is fully valid
1429                // Partial invalid blocks will not share the tx fee
1430                if reward_info.epoch_block_no_reward[enum_idx] == false {
1431                    info.1.insert(block_hash);
1432                }
1433                if !fee.is_zero() && info.0.is_zero() {
1434                    info.0 = fee;
1435                }
1436            }
1437        }
1438
1439        let mut block_tx_fees = HashMap::new();
1440        // Note that some transaction fees may get lost due to solely packed by
1441        // a partially invalid block.
1442        let mut burnt_fee = U256::from(0);
1443        for TxExecutionInfo(fee, block_set) in tx_fee.values() {
1444            if block_set.is_empty() {
1445                burnt_fee += *fee;
1446                // tx_fee for the transactions executed in a partial invalid
1447                // blocks and not packed in other blocks will be lost
1448                continue;
1449            }
1450            let block_count = U256::from(block_set.len());
1451            let quotient: U256 = *fee / block_count;
1452            let mut remainder: U256 = *fee - (block_count * quotient);
1453            for block_hash in block_set {
1454                let reward =
1455                    block_tx_fees.entry(*block_hash).or_insert(U256::zero());
1456                *reward += quotient;
1457                if !remainder.is_zero() {
1458                    *reward += 1.into();
1459                    remainder -= 1.into();
1460                }
1461            }
1462            debug_assert!(remainder.is_zero());
1463        }
1464
1465        let mut merged_rewards = BTreeMap::new();
1466        // Here is the exact secondary reward allocated in total
1467        let mut allocated_secondary_reward = U256::from(0);
1468
1469        for (enum_idx, block) in epoch_blocks.iter().enumerate() {
1470            let base_reward = epoch_block_total_rewards[enum_idx];
1471
1472            let block_hash = block.hash();
1473            // Add tx fee to reward.
1474            let tx_fee = if let Some(fee) = block_tx_fees.get(&block_hash) {
1475                if let Some(debug_out) = &mut debug_record {
1476                    debug_out.tx_fees.push(BlockHashAuthorValue(
1477                        block_hash,
1478                        block.block_header.author().clone(),
1479                        *fee,
1480                    ));
1481                }
1482                *fee
1483            } else {
1484                U256::from(0)
1485            };
1486
1487            // Distribute the secondary reward according to primary reward.
1488            let total_reward = if base_reward > U256::from(0) {
1489                let block_secondary_reward =
1490                    base_reward * secondary_reward / total_base_reward;
1491                if let Some(debug_out) = &mut debug_record {
1492                    debug_out.secondary_rewards.push(BlockHashAuthorValue(
1493                        block_hash,
1494                        block.block_header.author().clone(),
1495                        block_secondary_reward,
1496                    ));
1497                }
1498                allocated_secondary_reward += block_secondary_reward;
1499                base_reward + tx_fee + block_secondary_reward
1500            } else {
1501                base_reward + tx_fee
1502            };
1503
1504            *merged_rewards
1505                .entry(*block.block_header.author())
1506                .or_insert(U256::from(0)) += total_reward;
1507
1508            if let Some(debug_out) = &mut debug_record {
1509                debug_out.block_final_rewards.push(BlockHashAuthorValue(
1510                    block_hash,
1511                    block.block_header.author().clone(),
1512                    total_reward,
1513                ));
1514            }
1515            if on_local_pivot {
1516                self.data_man.insert_block_reward_result(
1517                    block_hash,
1518                    epoch_later,
1519                    BlockRewardResult {
1520                        total_reward,
1521                        tx_fee,
1522                        base_reward,
1523                    },
1524                    true,
1525                );
1526                self.data_man
1527                    .receipts_retain_epoch(&block_hash, &reward_epoch_hash);
1528            }
1529        }
1530
1531        debug!("Give rewards merged_reward={:?}", merged_rewards);
1532
1533        for (address, reward) in merged_rewards {
1534            if spec.is_valid_address(&address) {
1535                state
1536                    .add_balance(&address.with_native_space(), &reward)
1537                    .unwrap();
1538            }
1539
1540            if let Some(debug_out) = &mut debug_record {
1541                debug_out
1542                    .merged_rewards_by_author
1543                    .push(AuthorValue(address, reward));
1544                debug_out.state_ops.push(StateOp::IncentiveLevelOp {
1545                    op_name: "add_balance".to_string(),
1546                    key: address.0.to_hex::<String>().as_bytes().to_vec(),
1547                    maybe_value: Some({
1548                        let h: H256 = BigEndianHash::from_uint(&reward);
1549                        h.0.to_hex::<String>().as_bytes().into()
1550                    }),
1551                });
1552            }
1553        }
1554        let new_mint = total_base_reward + allocated_secondary_reward;
1555        if new_mint >= burnt_fee {
1556            // The very likely case
1557            state.add_total_issued(new_mint - burnt_fee);
1558        } else {
1559            // The very unlikely case
1560            state.sub_total_issued(burnt_fee - new_mint);
1561        }
1562    }
1563
1564    fn recompute_states(
1565        &self, pivot_hash: &H256, epoch_blocks: &Vec<Arc<Block>>,
1566        start_block_number: u64,
1567    ) -> DbResult<Vec<Arc<BlockReceipts>>> {
1568        debug!(
1569            "Recompute receipts epoch_id={}, block_count={}",
1570            pivot_hash,
1571            epoch_blocks.len(),
1572        );
1573        let pivot_block = epoch_blocks.last().expect("Not empty");
1574        let mut state = self.new_state(&pivot_block, false)?;
1575        self.process_epoch_transactions(
1576            &mut state,
1577            &epoch_blocks,
1578            start_block_number,
1579            false,
1580            /* virtual_call */ None,
1581        )
1582    }
1583
    /// Speculatively execute `tx` on top of the state of epoch `epoch_id`
    /// without committing any state change, returning the execution outcome
    /// together with estimation metadata.
    ///
    /// `epoch_size` is the number of blocks in the epoch and is used to
    /// derive the block number the virtual transaction executes at.
    /// `evm_overrides` may override account state and/or block-env fields.
    ///
    /// Errors: bails if `epoch_id` is unknown, if the epoch's execution
    /// context is missing, or if the transaction fails common verification.
    pub fn call_virtual(
        &self, tx: &SignedTransaction, epoch_id: &H256, epoch_size: usize,
        request: EstimateRequest, evm_overrides: EvmOverrides,
    ) -> CoreResult<(ExecutionOutcome, EstimateExt)> {
        let best_block_header = self.data_man.block_header_by_hash(epoch_id);
        if best_block_header.is_none() {
            bail!("invalid epoch id");
        }
        let best_block_header = best_block_header.unwrap();
        // The virtual transaction runs "on top of" the given epoch, i.e. at
        // the height right after it.
        let block_height = best_block_header.height() + 1;

        // Resolve PoS-related env fields from the header's PoS reference,
        // when one is present.
        let pos_id = best_block_header.pos_reference().as_ref();
        let pos_view_number =
            pos_id.and_then(|id| self.pos_verifier.get_pos_view(id));
        let pivot_decision_epoch = pos_id
            .and_then(|id| self.pos_verifier.get_pivot_decision(id))
            .and_then(|hash| self.data_man.block_header_by_hash(&hash))
            .map(|header| header.height());

        // Block number right after this epoch = its first block number plus
        // the epoch's block count.
        let start_block_number = match self.data_man.get_epoch_execution_context(epoch_id) {
            Some(v) => v.start_block_number + epoch_size as u64,
            None => bail!("cannot obtain the execution context. Database is potentially corrupted!"),
        };
        let spec = self.machine.spec(start_block_number, block_height);
        let transitions = &self.machine.params().transition_heights;

        // Common transaction checks; the chain id is faked because this is a
        // virtual call. Verification failures surface as invalid-params.
        invalid_params_check(
            "tx",
            self.verification_config.verify_transaction_common(
                tx,
                AllChainID::fake_for_virtual(tx.chain_id().unwrap_or(1)),
                block_height,
                transitions,
                VerifyTxMode::Local(VerifyTxLocalMode::Full, &spec),
            ),
        )?;

        // Choose the state-space filter from the transaction's space:
        // native txs pass `None`, eSpace txs restrict to `Space::Ethereum`.
        let state_space = match tx.space() {
            Space::Native => None,
            Space::Ethereum => Some(Space::Ethereum),
        };
        let statedb = self.get_statedb_by_epoch_id_and_space(
            epoch_id,
            best_block_header.height(),
            state_space,
        )?;
        // Apply caller-provided account-state overrides, if any.
        let mut state = if evm_overrides.has_state() {
            State::new_with_override(
                statedb,
                &evm_overrides.state.as_ref().unwrap(),
                tx.space(),
            )?
        } else {
            State::new(statedb)?
        };

        let time_stamp = best_block_header.timestamp();
        let miner = best_block_header.author().clone();

        let base_gas_price = best_block_header.base_price().unwrap_or_default();
        let burnt_gas_price =
            base_gas_price.map_all(|x| state.burnt_gas_price(x));

        // Build the execution environment the virtual transaction observes.
        let mut env = Env {
            chain_id: self.machine.params().chain_id_map(block_height),
            number: start_block_number,
            author: miner,
            timestamp: time_stamp,
            difficulty: Default::default(),
            accumulated_gas_used: U256::zero(),
            last_hash: epoch_id.clone(),
            gas_limit: tx.gas().clone(),
            epoch_height: block_height,
            pos_view: pos_view_number,
            finalized_epoch: pivot_decision_epoch,
            transaction_epoch_bound: self
                .verification_config
                .transaction_epoch_bound,
            base_gas_price,
            burnt_gas_price,
            transaction_hash: tx.hash(),
            ..Default::default()
        };
        if evm_overrides.has_block() {
            ExecutiveContext::apply_env_overrides(
                &mut env,
                evm_overrides.block.unwrap(),
            );
        }
        // Recompute the spec from the (possibly overridden) env: block
        // overrides may have changed `number` / `epoch_height`, so the spec
        // computed earlier for verification cannot be reused here.
        let spec = self.machine.spec(env.number, env.epoch_height);
        let mut ex = EstimationContext::new(
            &mut state,
            &env,
            self.machine.as_ref(),
            &spec,
        );

        let r = ex.transact_virtual(tx.clone(), request);
        trace!("Execution result {:?}", r);
        Ok(r?)
    }
1685
1686    /// Execute transactions in the blocks to collect traces.
1687    pub fn collect_blocks_geth_trace(
1688        &self, epoch_id: H256, epoch_num: u64, blocks: &Vec<Arc<Block>>,
1689        opts: GethDebugTracingOptions, tx_hash: Option<H256>,
1690    ) -> CoreResult<Vec<GethTraceWithHash>> {
1691        let state_space = None;
1692        let mut state = self.get_state_by_epoch_id_and_space(
1693            &epoch_id,
1694            epoch_num,
1695            state_space,
1696        )?;
1697
1698        let start_block_number = self
1699            .data_man
1700            .get_epoch_execution_context(&epoch_id)
1701            .map(|v| v.start_block_number)
1702            .expect("should exist");
1703
1704        let mut answer = vec![];
1705        let virtual_call = VirtualCall::GethTrace(GethTask {
1706            tx_hash,
1707            opts,
1708            answer: &mut answer,
1709        });
1710        self.process_epoch_transactions(
1711            &mut state,
1712            blocks,
1713            start_block_number,
1714            false,
1715            Some(virtual_call),
1716        )?;
1717
1718        Ok(answer)
1719    }
1720
1721    fn get_state_by_epoch_id_and_space(
1722        &self, epoch_id: &H256, epoch_height: u64, state_space: Option<Space>,
1723    ) -> DbResult<State> {
1724        let state_db = self.get_statedb_by_epoch_id_and_space(
1725            epoch_id,
1726            epoch_height,
1727            state_space,
1728        )?;
1729        let state = State::new(state_db)?;
1730
1731        Ok(state)
1732    }
1733
    /// Open a read-only `StateDb` for the state of epoch `epoch_id`,
    /// optionally restricted to a single space.
    ///
    /// Errors: bails with "state is not ready" when the epoch is outside the
    /// readable availability window, and "state deleted" when the storage
    /// layer no longer holds that state.
    fn get_statedb_by_epoch_id_and_space(
        &self, epoch_id: &H256, epoch_height: u64, state_space: Option<Space>,
    ) -> DbResult<StateDb> {
        // Keep the lock until we get the desired State, otherwise the State may
        // expire.
        let state_availability_boundary =
            self.data_man.state_availability_boundary.read();

        if !state_availability_boundary.check_read_availability(
            epoch_height,
            epoch_id,
            state_space,
        ) {
            bail!("state is not ready");
        }

        // Read availability was just confirmed above, so the index lookup is
        // expected to succeed.
        let state_index = self
            .data_man
            .get_state_readonly_index(epoch_id)
            .expect("state index should exist");

        let state_db = StateDb::new(
            self.data_man
                .storage_manager
                .get_state_no_commit(
                    state_index,
                    /* try_open = */ true,
                    state_space,
                )?
                .ok_or("state deleted")?,
        );

        // Release the availability read guard only after the state handle has
        // been obtained (see the comment at the top of this function).
        drop(state_availability_boundary);

        Ok(state_db)
    }
1770}
1771
/// Configuration for the consensus executor.
pub struct ConsensusExecutionConfiguration {
    // Toggles executive tracing during transaction execution.
    // NOTE(review): semantics inferred from the field name — confirm at the
    // use sites outside this chunk.
    pub executive_trace: bool,
}