use super::blame_verifier::BlameVerifier;
use crate::{
    block_data_manager::{BlockDataManager, BlockStatus, LocalBlockInfo},
    channel::Channel,
    consensus::{
        consensus_inner::{
            confirmation_meter::ConfirmationMeter,
            consensus_executor::{ConsensusExecutor, EpochExecutionTask},
            ConsensusGraphInner, NULL,
        },
        pivot_hint::PivotHint,
        pos_handler::PosVerifier,
        ConsensusConfig,
    },
    state_exposer::{ConsensusGraphBlockState, STATE_EXPOSER},
    statistics::SharedStatistics,
    NodeType, Notifications, SharedTransactionPool,
};
use cfx_parameters::{consensus::*, consensus_internal::*};
use cfx_storage::{storage_db::SnapshotDbManagerTrait, StateIndex};
use cfx_types::H256;
use hibitset::{BitSet, BitSetLike, DrainableBitSet};
use parking_lot::Mutex;
use primitives::{MERKLE_NULL_NODE, NULL_EPOCH};
use std::{
    cmp::{max, min},
    collections::{BinaryHeap, HashMap, HashSet, VecDeque},
    slice::Iter,
    sync::Arc,
};
/// Bundle of shared components used while new blocks are processed by the
/// consensus graph. All fields are filled in by
/// `ConsensusNewBlockHandler::new`.
pub struct ConsensusNewBlockHandler {
    /// Consensus configuration. Methods in this file read `bench_mode` and
    /// `sync_state_starting_epoch` from it.
    conf: ConsensusConfig,
    /// Transaction pool handle; `recycle_tx_in_block` hands the transactions
    /// of a block back to it.
    txpool: SharedTransactionPool,
    /// Block data manager; `persist_terminals` uses it to write the terminal
    /// hashes to the database.
    data_man: Arc<BlockDataManager>,
    /// Shared consensus executor.
    executor: Arc<ConsensusExecutor>,
    /// PoS verifier; `check_block_full_validity` asks it whether PoS is
    /// enabled at a block's height.
    pos_verifier: Arc<PosVerifier>,
    /// Shared statistics handle.
    statistics: SharedStatistics,
    /// Clone of `notifications.epochs_ordered`, taken in `new`.
    epochs_sender: Arc<Channel<(u64, Vec<H256>)>>,
    /// Blame verifier built in `new` from the data manager and the
    /// notifications handle, wrapped in a mutex.
    blame_verifier: Mutex<BlameVerifier>,
    /// Node type supplied by the caller of `new`.
    node_type: NodeType,
    /// Optional pivot hint supplied by the caller of `new`.
    pivot_hint: Option<Arc<PivotHint>>,
}
impl ConsensusNewBlockHandler {
    pub fn new(
        conf: ConsensusConfig, txpool: SharedTransactionPool,
        data_man: Arc<BlockDataManager>, executor: Arc<ConsensusExecutor>,
        statistics: SharedStatistics, notifications: Arc<Notifications>,
        node_type: NodeType, pos_verifier: Arc<PosVerifier>,
        pivot_hint: Option<Arc<PivotHint>>,
    ) -> Self {
        let epochs_sender = notifications.epochs_ordered.clone();
        let blame_verifier =
            Mutex::new(BlameVerifier::new(data_man.clone(), notifications));
        Self {
            pos_verifier,
            conf,
            txpool,
            data_man,
            executor,
            statistics,
            epochs_sender,
            blame_verifier,
            node_type,
            pivot_hint,
        }
    }
    /// Splits the arena into two index sets relative to a prospective new era
    /// genesis.
    ///
    /// The first returned set holds every block reachable from
    /// `new_era_block_arena_index` by following `parent` and `referees` edges
    /// (the genesis itself is not included). The second set holds every other
    /// arena index, which therefore includes the new genesis.
    fn compute_old_era_and_new_era_block_set(
        inner: &mut ConsensusGraphInner, new_era_block_arena_index: usize,
    ) -> (HashSet<usize>, HashSet<usize>) {
        // Breadth-first walk backwards through parent/referee edges.
        let mut past_of_genesis: HashSet<usize> = HashSet::new();
        let mut frontier = VecDeque::new();
        frontier.push_back(new_era_block_arena_index);
        while let Some(current) = frontier.pop_front() {
            let node = &inner.arena[current];
            let predecessors = std::iter::once(node.parent)
                .chain(node.referees.iter().copied());
            for pred in predecessors {
                // `insert` returns true only for indices not seen before.
                if pred != NULL && past_of_genesis.insert(pred) {
                    frontier.push_back(pred);
                }
            }
        }

        // Everything not in the past of the genesis stays in the new era.
        let remaining: HashSet<usize> = inner
            .arena
            .iter()
            .map(|(index, _)| index)
            .filter(|index| !past_of_genesis.contains(index))
            .collect();
        (past_of_genesis, remaining)
    }
    /// Moves the era genesis to `new_era_block_arena_index` and prunes every
    /// block in its past from the in-memory consensus structures.
    ///
    /// The steps, in order: classify blocks into "outside" (past of the new
    /// genesis) and "kept"; scrub references to outside blocks from kept
    /// blocks; detach kept blocks that are not in the new genesis subtree;
    /// remove outside blocks; truncate the timer chain and pivot chain;
    /// recompute pivot past weights; and finally record the new era genesis
    /// in `inner` and in the data manager.
    ///
    /// Panics if the truncated timer chain height disagrees with the new
    /// genesis' `ledger_view_timer_chain_height`, if the new genesis is not
    /// inside the current pivot chain, or if an entry of
    /// `invalid_block_queue` refers to a removed block.
    fn make_checkpoint_at(
        inner: &mut ConsensusGraphInner, new_era_block_arena_index: usize,
    ) {
        let new_era_height = inner.arena[new_era_block_arena_index].height;
        let (outside_block_arena_indices, new_era_block_arena_index_set) =
            Self::compute_old_era_and_new_era_block_set(
                inner,
                new_era_block_arena_index,
            );
        // Collect the subtree rooted at the new genesis by walking `children`
        // edges breadth-first.
        let mut new_era_genesis_subtree = HashSet::new();
        let mut queue = VecDeque::new();
        queue.push_back(new_era_block_arena_index);
        while let Some(x) = queue.pop_front() {
            new_era_genesis_subtree.insert(x);
            for child in &inner.arena[x].children {
                queue.push_back(*child);
            }
        }
        // Kept blocks that are NOT descendants of the new genesis.
        let new_era_legacy_block_arena_index_set: HashSet<_> =
            new_era_block_arena_index_set
                .difference(&new_era_genesis_subtree)
                .collect();
        // Must be computed before the pivot chain is truncated below.
        let new_era_pivot_index = inner.height_to_pivot_index(new_era_height);
        // Scrub every reference from a kept block to a block that is about to
        // be removed.
        for v in new_era_block_arena_index_set.iter() {
            let me = *v;
            inner.arena[me]
                .referees
                .retain(|v| new_era_block_arena_index_set.contains(v));
            inner.arena[me]
                .data
                .blockset_in_own_view_of_epoch
                .retain(|v| new_era_block_arena_index_set.contains(v));
            if !new_era_block_arena_index_set.contains(
                &inner.arena[me].data.past_view_last_timer_block_arena_index,
            ) {
                inner.arena[me].data.past_view_last_timer_block_arena_index =
                    NULL;
            }
            // A force-confirm target that is being removed is replaced by the
            // new genesis.
            if !new_era_block_arena_index_set
                .contains(&inner.arena[me].data.force_confirm)
            {
                inner.arena[me].data.force_confirm = new_era_block_arena_index;
            }
        }
        // Detach kept blocks outside the new genesis subtree: split them off
        // if they still carry an era block, drop a parent that is being
        // removed, clear `era_block` and remove them from the terminals.
        for v in new_era_legacy_block_arena_index_set {
            let me = *v;
            let mut parent = inner.arena[me].parent;
            if inner.arena[me].era_block != NULL {
                inner.split_root(me);
            }
            if !new_era_block_arena_index_set.contains(&parent) {
                parent = NULL;
            }
            inner.arena[me].parent = parent;
            inner.arena[me].era_block = NULL;
            inner.terminal_hashes.remove(&inner.arena[me].hash);
        }
        // NOTE(review): `intersect_update` is defined elsewhere; presumably
        // it drops cache entries for the removed blocks — confirm.
        inner
            .pastset_cache
            .intersect_update(&outside_block_arena_indices);
        // Remove the outside blocks from the arena, the lookup tables and the
        // data manager's per-epoch execution records.
        for index in outside_block_arena_indices {
            let hash = inner.arena[index].hash;
            inner.hash_to_arena_indices.remove(&hash);
            inner.terminal_hashes.remove(&hash);
            inner.arena.remove(index);
            inner.data_man.remove_epoch_execution_commitment(&hash);
            inner.data_man.remove_epoch_execution_context(&hash);
        }
        // Count the leading timer chain entries that were removed.
        let mut timer_chain_truncate = 0;
        while timer_chain_truncate < inner.timer_chain.len()
            && !new_era_block_arena_index_set
                .contains(&inner.timer_chain[timer_chain_truncate])
        {
            timer_chain_truncate += 1;
        }
        inner.cur_era_genesis_timer_chain_height += timer_chain_truncate as u64;
        assert_eq!(
            inner.cur_era_genesis_timer_chain_height,
            inner.arena[new_era_block_arena_index]
                .data
                .ledger_view_timer_chain_height
        );
        // Shift the surviving timer chain entries (and their accumulative
        // LCA entries, where present) to the front, then shrink both vectors.
        for i in 0..(inner.timer_chain.len() - timer_chain_truncate) {
            inner.timer_chain[i] = inner.timer_chain[i + timer_chain_truncate];
            if i + timer_chain_truncate
                < inner.timer_chain_accumulative_lca.len()
            {
                inner.timer_chain_accumulative_lca[i] = inner
                    .timer_chain_accumulative_lca[i + timer_chain_truncate];
            }
        }
        inner
            .timer_chain
            .resize(inner.timer_chain.len() - timer_chain_truncate, 0);
        if inner.timer_chain_accumulative_lca.len() > timer_chain_truncate {
            inner.timer_chain_accumulative_lca.resize(
                inner.timer_chain_accumulative_lca.len() - timer_chain_truncate,
                0,
            );
        } else {
            inner.timer_chain_accumulative_lca.clear();
        }
        // Accumulative LCA entries among the first `timer_chain_beta - 1`
        // positions, or pointing outside the new genesis subtree, are reset
        // to the new genesis.
        for i in 0..inner.timer_chain_accumulative_lca.len() {
            if i < inner.inner_conf.timer_chain_beta as usize - 1
                || !new_era_genesis_subtree
                    .contains(&inner.timer_chain_accumulative_lca[i])
            {
                inner.timer_chain_accumulative_lca[i] =
                    new_era_block_arena_index;
            }
        }
        // Drop the pivot chain prefix below the new genesis.
        assert!(new_era_pivot_index < inner.pivot_chain.len());
        inner.pivot_chain = inner.pivot_chain.split_off(new_era_pivot_index);
        inner.pivot_chain_metadata =
            inner.pivot_chain_metadata.split_off(new_era_pivot_index);
        // Recompute `past_weight` along the truncated pivot chain, starting
        // from the weight of the new genesis itself.
        inner.pivot_chain_metadata[0].past_weight =
            inner.block_weight(new_era_block_arena_index);
        for i in 1..inner.pivot_chain_metadata.len() {
            let pivot = inner.pivot_chain[i];
            inner.pivot_chain_metadata[i].past_weight =
                inner.pivot_chain_metadata[i - 1].past_weight
                    + inner.total_weight_in_own_epoch(
                        &inner.arena[pivot].data.blockset_in_own_view_of_epoch,
                        new_era_block_arena_index,
                    )
                    + inner.block_weight(pivot)
        }
        for d in inner.pivot_chain_metadata.iter_mut() {
            d.last_pivot_in_past_blocks
                .retain(|v| new_era_block_arena_index_set.contains(v));
        }
        // Restrict or reset the remaining caches.
        inner
            .anticone_cache
            .intersect_update(&new_era_genesis_subtree);
        inner.best_terminals_lca_height_cache.clear();
        inner.has_timer_block_in_anticone_cache.clear();
        // Record the new genesis in the inner state.
        inner.split_root(new_era_block_arena_index);
        inner.cur_era_genesis_block_arena_index = new_era_block_arena_index;
        inner.cur_era_genesis_height = new_era_height;
        let cur_era_hash = inner.arena[new_era_block_arena_index].hash.clone();
        let stable_era_arena_index =
            inner.get_pivot_block_arena_index(inner.cur_era_stable_height);
        let stable_era_hash = inner.arena[stable_era_arena_index].hash.clone();
        // Every queued invalid block must still be in the arena.
        for (_, x) in &inner.invalid_block_queue {
            assert!(new_era_block_arena_index_set.contains(x))
        }
        // Publish the new era genesis and checkpoint to the data manager.
        inner.data_man.set_cur_consensus_era_genesis_hash(
            &cur_era_hash,
            &stable_era_hash,
        );
        inner
            .data_man
            .new_checkpoint(new_era_height, inner.best_epoch_number());
    }
    /// Computes the anticone of `me` by exhaustive traversal of the arena.
    ///
    /// Blocks reachable backwards from `me` (via parent/referee edges, limited
    /// to blocks whose `epoch_number` exceeds the largest
    /// `last_pivot_in_past` of `me`'s parent and referees) and blocks
    /// reachable forwards (via children/referrer edges) are marked. The
    /// result is every unmarked arena block above that epoch bound that is
    /// either activated or has `inactive_dependency_cnt == NULL`, and whose
    /// `era_block` is set. A block without a parent yields an empty set.
    pub fn compute_anticone_bruteforce(
        inner: &ConsensusGraphInner, me: usize,
    ) -> BitSet {
        let parent = inner.arena[me].parent;
        if parent == NULL {
            return BitSet::new();
        }

        // Largest `last_pivot_in_past` over the parent and all referees.
        let last_in_pivot = inner.arena[me].referees.iter().fold(
            inner.arena[parent].data.last_pivot_in_past,
            |best, referee| {
                max(best, inner.arena[*referee].data.last_pivot_in_past)
            },
        );
        let above_bound =
            |index: usize| inner.arena[index].data.epoch_number > last_in_pivot;

        // Phase 1: mark `me` and its past, limited to blocks above the bound.
        let mut related = BitSet::new();
        let mut worklist = VecDeque::new();
        worklist.push_back(me);
        related.add(me as u32);
        while let Some(current) = worklist.pop_front() {
            let node = &inner.arena[current];
            let upper = node.parent;
            if upper != NULL
                && above_bound(upper)
                && !related.contains(upper as u32)
            {
                related.add(upper as u32);
                worklist.push_back(upper);
            }
            for &referee in node.referees.iter() {
                if above_bound(referee) && !related.contains(referee as u32) {
                    related.add(referee as u32);
                    worklist.push_back(referee);
                }
            }
        }

        // Phase 2: mark the future of `me` (children first, then referrers).
        worklist.clear();
        worklist.push_back(me);
        while let Some(current) = worklist.pop_front() {
            let node = &inner.arena[current];
            for &successor in node.children.iter().chain(node.referrers.iter())
            {
                if !related.contains(successor as u32) {
                    related.add(successor as u32);
                    worklist.push_back(successor);
                }
            }
        }

        // Phase 3: whatever is left unmarked and passes the filters is the
        // anticone.
        let mut anticone = BitSet::with_capacity(inner.arena.capacity() as u32);
        for (index, node) in inner.arena.iter() {
            let ready = node.data.activated
                || node.data.inactive_dependency_cnt == NULL;
            let has_era_block = node.era_block != NULL;
            if node.data.epoch_number > last_in_pivot
                && !related.contains(index as u32)
                && ready
                && has_era_block
            {
                anticone.add(index as u32);
            }
        }
        anticone
    }
    /// Same result as `compute_anticone_bruteforce`, returned as a
    /// `HashSet<usize>` of arena indices instead of a `BitSet`.
    pub fn compute_anticone_hashset_bruteforce(
        inner: &ConsensusGraphInner, me: usize,
    ) -> HashSet<usize> {
        ConsensusNewBlockHandler::compute_anticone_bruteforce(inner, me)
            .iter()
            .map(|index| index as usize)
            .collect()
    }
    /// Computes the anticone of `me`, stores it in `inner.anticone_cache`, and
    /// returns it together with its "barrier".
    ///
    /// If the parent's anticone is cached, the result is derived from it:
    /// start from the parent's future (minus `me`), add the parent's
    /// anticone, remove the part of `me`'s past found by walking back through
    /// blocks that are in that set or have no `era_block`, and finally drop
    /// every block with `era_block == NULL`. Otherwise the brute-force
    /// routine is used.
    ///
    /// The barrier is the subset of anticone blocks whose parent is not in
    /// the anticone.
    fn compute_and_update_anticone(
        inner: &mut ConsensusGraphInner, me: usize,
    ) -> (BitSet, BitSet) {
        let parent = inner.arena[me].parent;
        let anticone = match inner.anticone_cache.get(parent) {
            None => {
                ConsensusNewBlockHandler::compute_anticone_bruteforce(inner, me)
            }
            Some(parent_anticone) => {
                // parent's future (without `me`) plus parent's anticone.
                let mut result = inner.compute_future_bitset(parent);
                result.remove(me as u32);
                for index in parent_anticone {
                    result.add(*index as u32);
                }

                // Collect the part of `me`'s past that must be subtracted.
                let mut my_past = BitSet::new();
                let mut queue: VecDeque<usize> = VecDeque::new();
                queue.push_back(me);
                {
                    // A predecessor is followed when it is currently in the
                    // candidate set or has no era block.
                    let follow = |index: usize| {
                        result.contains(index as u32)
                            || inner.arena[index].era_block == NULL
                    };
                    while let Some(index) = queue.pop_front() {
                        if my_past.contains(index as u32) {
                            continue;
                        }
                        debug_assert!(index != parent);
                        if index != me {
                            my_past.add(index as u32);
                        }
                        let node = &inner.arena[index];
                        let idx_parent = node.parent;
                        if idx_parent != NULL && follow(idx_parent) {
                            queue.push_back(idx_parent);
                        }
                        for &referee in node.referees.iter() {
                            if follow(referee) {
                                queue.push_back(referee);
                            }
                        }
                    }
                }
                for index in my_past.drain() {
                    result.remove(index);
                }

                // Blocks without an era block are never part of the anticone.
                for index in result.clone().iter() {
                    if inner.arena[index as usize].era_block == NULL {
                        result.remove(index);
                    }
                }
                result
            }
        };
        inner.anticone_cache.update(me, &anticone);

        // Barrier: anticone members whose parent lies outside the anticone.
        let mut anticone_barrier = BitSet::new();
        for index in anticone.clone().iter() {
            let parent = inner.arena[index as usize].parent as u32;
            if !anticone.contains(parent) {
                anticone_barrier.add(index);
            }
        }
        debug!(
            "Block {} anticone size {}",
            inner.arena[me].hash,
            anticone.len()
        );
        (anticone, anticone_barrier)
    }
    /// Parent-choice check using caller-supplied subtree weights.
    ///
    /// For each candidate block, the LCA with `me`'s parent is computed.
    /// Candidates whose LCA is `NULL` or lies below the height of `me`'s
    /// force-confirm block are ignored. The check fails if a candidate's LCA
    /// is the parent itself, or if the candidate's branch just above the LCA
    /// is heavier (per `ConsensusGraphInner::is_heavier`, using
    /// `subtree_weight`) than the parent's branch at the same height.
    fn check_correct_parent_brutal(
        inner: &ConsensusGraphInner, me: usize, subtree_weight: &Vec<i128>,
        checking_candidate: Iter<usize>,
    ) -> bool {
        let parent = inner.arena[me].parent;
        let force_confirm = inner.arena[me].data.force_confirm;
        let force_confirm_height = inner.arena[force_confirm].height;

        for consensus_arena_index_in_epoch in checking_candidate {
            let lca = inner.lca(*consensus_arena_index_in_epoch, parent);
            assert!(lca != *consensus_arena_index_in_epoch);
            let candidate = *consensus_arena_index_in_epoch;
            if lca == NULL || inner.arena[lca].height < force_confirm_height {
                continue;
            }
            if lca == parent {
                return false;
            }
            // Compare the two branches that diverge right above the LCA.
            let fork = inner.ancestor_at(candidate, inner.arena[lca].height + 1);
            let pivot = inner.ancestor_at(parent, inner.arena[lca].height + 1);
            let fork_weight = subtree_weight[fork];
            let pivot_weight = subtree_weight[pivot];
            let fork_wins = ConsensusGraphInner::is_heavier(
                (fork_weight, &inner.arena[fork].hash),
                (pivot_weight, &inner.arena[pivot].hash),
            );
            if fork_wins {
                return false;
            }
        }
        true
    }
    /// Checks whether block `me` picked an acceptable parent.
    ///
    /// The candidate set is `me`'s blockset in its own view of the epoch. If
    /// the parent is partially invalid or pending, the blocksets of the
    /// parent and of its consecutive partially-invalid/pending ancestors are
    /// added as well.
    ///
    /// For every candidate, the LCA with the parent is computed. Candidates
    /// whose LCA is `NULL` or below the height of `me`'s force-confirm block
    /// are skipped. The check fails when a candidate's LCA is the parent
    /// itself, or when the candidate's branch right above the LCA is heavier
    /// than the parent's branch (per `ConsensusGraphInner::is_heavier`).
    ///
    /// * `anticone_barrier` - blocks whose `weight_tree` value is subtracted
    ///   (via `path_apply`) for the duration of the comparison and added
    ///   back afterwards. Only used when `weight_tuple` is `None`.
    /// * `weight_tuple` - when `Some`, the comparison is delegated to
    ///   `check_correct_parent_brutal` with these subtree weights and
    ///   `inner.weight_tree` is left untouched.
    ///
    /// Returns `true` if no candidate invalidates the parent choice.
    fn check_correct_parent(
        inner: &mut ConsensusGraphInner, me: usize, anticone_barrier: &BitSet,
        weight_tuple: Option<&Vec<i128>>,
    ) -> bool {
        let parent = inner.arena[me].parent;
        let mut candidate;
        // NOTE(review): the blockset is handed back with `Some(blockset)`
        // before returning, so this call presumably takes it out of the
        // node — confirm against `ConsensusGraphInner`.
        let blockset =
            inner.exchange_or_compute_blockset_in_own_view_of_epoch(me, None);
        let candidate_iter = if inner.arena[parent].data.partial_invalid
            || inner.arena[parent].data.pending
        {
            candidate = blockset.clone();
            let mut p = parent;
            // The `p != NULL` guard must cover both flags. `&&` binds tighter
            // than `||`, so without the parentheses the `pending` lookup would
            // index the arena with `NULL` once the walk runs past the root.
            while p != NULL
                && (inner.arena[p].data.partial_invalid
                    || inner.arena[p].data.pending)
            {
                let blockset_p = inner
                    .exchange_or_compute_blockset_in_own_view_of_epoch(p, None);
                candidate.extend(blockset_p.iter());
                inner.exchange_or_compute_blockset_in_own_view_of_epoch(
                    p,
                    Some(blockset_p),
                );
                p = inner.arena[p].parent;
            }
            candidate.iter()
        } else {
            blockset.iter()
        };
        // With explicit subtree weights, use the variant that does not touch
        // `inner.weight_tree`.
        if let Some(subtree_weight) = weight_tuple {
            let res = ConsensusNewBlockHandler::check_correct_parent_brutal(
                inner,
                me,
                subtree_weight,
                candidate_iter,
            );
            inner.exchange_or_compute_blockset_in_own_view_of_epoch(
                me,
                Some(blockset),
            );
            return res;
        }
        let mut valid = true;
        let force_confirm = inner.arena[me].data.force_confirm;
        let force_confirm_height = inner.arena[force_confirm].height;
        // Record the current weight of every barrier block, then subtract it
        // from the weight tree. The same amounts are added back before
        // returning.
        let mut weight_delta = HashMap::new();
        for index in anticone_barrier {
            let delta = inner.weight_tree.get(index as usize);
            weight_delta.insert(index as usize, delta);
        }
        for (index, delta) in &weight_delta {
            inner.weight_tree.path_apply(*index, -delta);
        }
        for consensus_arena_index_in_epoch in candidate_iter {
            let lca = inner.lca(*consensus_arena_index_in_epoch, parent);
            assert!(lca != *consensus_arena_index_in_epoch);
            if lca == NULL || inner.arena[lca].height < force_confirm_height {
                continue;
            }
            if lca == parent {
                debug!("Block invalid (index = {}), referenced block {} index {} is in the subtree of parent block {} index {}!", me, inner.arena[*consensus_arena_index_in_epoch].hash, *consensus_arena_index_in_epoch, inner.arena[parent].hash, parent);
                valid = false;
                break;
            }
            // Compare the two branches that diverge right above the LCA.
            let fork = inner.ancestor_at(
                *consensus_arena_index_in_epoch,
                inner.arena[lca].height + 1,
            );
            let pivot = inner.ancestor_at(parent, inner.arena[lca].height + 1);
            let fork_subtree_weight = inner.weight_tree.get(fork);
            let pivot_subtree_weight = inner.weight_tree.get(pivot);
            if ConsensusGraphInner::is_heavier(
                (fork_subtree_weight, &inner.arena[fork].hash),
                (pivot_subtree_weight, &inner.arena[pivot].hash),
            ) {
                debug!("Block invalid (index = {}), referenced block {} index {} fork is heavier than the parent block {} index {} fork! Ref fork block {} weight {}, parent fork block {} weight {}!",
                       me, inner.arena[*consensus_arena_index_in_epoch].hash, *consensus_arena_index_in_epoch, inner.arena[parent].hash, parent,
                       inner.arena[fork].hash, fork_subtree_weight, inner.arena[pivot].hash, pivot_subtree_weight);
                valid = false;
                break;
            } else {
                trace!("Pass one validity check, block index = {}. Referenced block {} index {} fork is not heavier than the parent block {} index {} fork. Ref fork block {} weight {}, parent fork block {} weight {}!",
                       me, inner.arena[*consensus_arena_index_in_epoch].hash, *consensus_arena_index_in_epoch, inner.arena[parent].hash, parent,
                       inner.arena[fork].hash, fork_subtree_weight, inner.arena[pivot].hash, pivot_subtree_weight);
            }
        }
        inner.exchange_or_compute_blockset_in_own_view_of_epoch(
            me,
            Some(blockset),
        );
        // Restore the weights removed above, whether or not the check passed.
        for (index, delta) in &weight_delta {
            inner.weight_tree.path_apply(*index, *delta);
        }
        valid
    }
    /// Runs the full validity checks for block `new` and returns whether it
    /// passed all of them.
    ///
    /// Checks, in order: the parent descends from the block's force-confirm
    /// block; the parent choice passes `check_correct_parent`; the difficulty
    /// equals `expected_difficulty` for the parent; outside bench mode, the
    /// block's `adaptive` flag equals the `adaptive` argument; and, when PoS
    /// is enabled at the block's height, the block descends from the pivot
    /// decision of its PoS reference. If that pivot decision is not in the
    /// graph, the result is `inner.pivot_block_processed(..)`.
    fn check_block_full_validity(
        &self, new: usize, inner: &mut ConsensusGraphInner, adaptive: bool,
        anticone_barrier: &BitSet, weight_tuple: Option<&Vec<i128>>,
    ) -> bool {
        let parent = inner.arena[new].parent;
        let force_confirm = inner.arena[new].data.force_confirm;

        if inner.lca(parent, force_confirm) != force_confirm {
            warn!("Partially invalid due to picking incorrect parent (force confirmation {:?} violation). {:?}", force_confirm, inner.arena[new].hash);
            return false;
        }

        let parent_choice_ok = ConsensusNewBlockHandler::check_correct_parent(
            inner,
            new,
            anticone_barrier,
            weight_tuple,
        );
        if !parent_choice_ok {
            warn!(
                "Partially invalid due to picking incorrect parent. {:?}",
                inner.arena[new].hash
            );
            return false;
        }

        let expected = inner.expected_difficulty(&inner.arena[parent].hash);
        if inner.arena[new].difficulty != expected {
            warn!(
                "Partially invalid due to wrong difficulty. {:?}",
                inner.arena[new].hash
            );
            return false;
        }

        if !self.conf.bench_mode && inner.arena[new].adaptive != adaptive {
            warn!(
                "Partially invalid due to invalid adaptive field. {:?}",
                inner.arena[new].hash
            );
            return false;
        }

        // The remaining check only applies once PoS is enabled.
        if !self
            .pos_verifier
            .is_enabled_at_height(inner.arena[new].height)
        {
            return true;
        }
        let pivot_decision = inner
            .get_pos_reference_pivot_decision(&inner.arena[new].hash)
            .expect("pos reference checked");
        match inner.hash_to_arena_indices.get(&pivot_decision) {
            Some(&decision_index) => {
                if inner.lca(new, decision_index) != decision_index {
                    warn!("Partial invalid due to not in the subtree of pos_reference's pivot decision");
                    return false;
                }
                true
            }
            None => {
                warn!("Possibly partial invalid due to pos_reference's pivot decision not in consensus graph");
                inner.pivot_block_processed(&pivot_decision)
            }
        }
    }
    /// Registers block `me` in the weight tree and in the adaptive tree, each
    /// time creating its node and linking it under its parent.
    #[inline]
    fn update_lcts_initial(&self, inner: &mut ConsensusGraphInner, me: usize) {
        let parent = inner.arena[me].parent;

        let weight_tree = &mut inner.weight_tree;
        weight_tree.make_tree(me);
        weight_tree.link(parent, me);

        let adaptive_tree = &mut inner.adaptive_tree;
        adaptive_tree.make_tree(me);
        adaptive_tree.link(parent, me);
    }
    /// Applies the weight of block `me` to the weight tree and the adaptive
    /// tree.
    ///
    /// The adaptive tree entry of `me` is first set to the parent's own block
    /// weight minus the parent's current weight-tree value. Then `me`'s block
    /// weight is applied along its path in the weight tree, twice that weight
    /// along its path in the adaptive tree, and the negated weight is applied
    /// with `caterpillar_apply` at the parent.
    #[inline]
    fn update_lcts_finalize(&self, inner: &mut ConsensusGraphInner, me: usize) {
        let parent = inner.arena[me].parent;

        let initial_adaptive_value = {
            let parent_tw = inner.weight_tree.get(parent);
            let parent_w = inner.block_weight(parent);
            -parent_tw + parent_w
        };
        inner.adaptive_tree.set(me, initial_adaptive_value);

        let own_weight = inner.block_weight(me);
        inner.weight_tree.path_apply(me, own_weight);
        inner.adaptive_tree.path_apply(me, 2 * own_weight);
        inner.adaptive_tree.caterpillar_apply(parent, -own_weight);
    }
    /// Looks up the block with hash `block_hash` through the data manager and
    /// hands a clone of its transactions back to the transaction pool. Logs a
    /// warning if the block cannot be found.
    fn recycle_tx_in_block(
        &self, inner: &ConsensusGraphInner, block_hash: &H256,
    ) {
        info!("recycle_tx_in_block: block_hash={:?}", block_hash);
        match inner.data_man.block_by_hash(block_hash, true) {
            Some(block) => {
                self.txpool.recycle_transactions(block.transactions.clone());
            }
            None => {
                warn!("recycle_tx_in_block: block {:?} not in db", block_hash);
            }
        }
    }
    /// Returns the height the era stable block should have: either the
    /// current `cur_era_stable_height` (no move) or that height plus one
    /// `era_epoch_count`.
    ///
    /// The stable height stays put when the graph is header-only and already
    /// at `sync_state_starting_epoch`, when fewer than two era lengths
    /// separate it from the best epoch, when there is no accumulative timer
    /// chain LCA, or when the last such LCA does not descend from the pivot
    /// block at the candidate height.
    fn should_move_stable_height(
        &self, inner: &mut ConsensusGraphInner,
    ) -> u64 {
        let current = inner.cur_era_stable_height;
        if inner.header_only
            && self.conf.sync_state_starting_epoch == Some(current)
        {
            return current;
        }

        let era_len = inner.inner_conf.era_epoch_count;
        let new_stable_height = current + era_len;
        // Require one more full era between the candidate and the best epoch.
        if new_stable_height + era_len > inner.best_epoch_number() {
            return current;
        }

        let new_stable_pivot_arena_index =
            inner.get_pivot_block_arena_index(new_stable_height);
        match inner.timer_chain_accumulative_lca.last() {
            Some(&last)
                if inner.lca(last, new_stable_pivot_arena_index)
                    == new_stable_pivot_arena_index =>
            {
                new_stable_height
            }
            _ => current,
        }
    }
    /// Returns the arena index of the block that should become the era
    /// genesis; the current era genesis index means "no new checkpoint".
    ///
    /// Candidates are the pivot blocks at `cur_era_genesis_height + k *
    /// era_epoch_count` below the stable height, tried in increasing height.
    /// The search gives up as soon as a candidate is not an ancestor of the
    /// stable pivot block's force-confirm block. A candidate is skipped when
    /// some timer chain block, from the candidate's
    /// `ledger_view_timer_chain_height` onwards, is not in the candidate's
    /// future; such candidates are remembered in
    /// `has_timer_block_in_anticone_cache`.
    ///
    /// Outside header-only and bench mode, no checkpoint is formed while the
    /// stable pivot block has an invalid state and
    /// `blame + first candidate height + DEFERRED_STATE_EPOCH_COUNT` reaches
    /// the stable height.
    fn should_form_checkpoint_at(
        &self, inner: &mut ConsensusGraphInner,
    ) -> usize {
        let era_len = inner.inner_conf.era_epoch_count;
        let stable_pivot_block =
            inner.get_pivot_block_arena_index(inner.cur_era_stable_height);
        let mut new_genesis_height = inner.cur_era_genesis_height + era_len;

        if !inner.header_only && !self.conf.bench_mode {
            let stable_data = &inner.arena[stable_pivot_block].data;
            if !stable_data.state_valid.unwrap()
                && stable_data.blame_info.unwrap().blame as u64
                    + new_genesis_height
                    + DEFERRED_STATE_EPOCH_COUNT
                    >= inner.cur_era_stable_height
            {
                return inner.cur_era_genesis_block_arena_index;
            }
        }

        while new_genesis_height < inner.cur_era_stable_height {
            let candidate =
                inner.get_pivot_block_arena_index(new_genesis_height);
            assert!(inner.arena[stable_pivot_block].data.force_confirm != NULL);
            let stable_force_confirm =
                inner.arena[stable_pivot_block].data.force_confirm;
            if inner.lca(candidate, stable_force_confirm) != candidate {
                return inner.cur_era_genesis_block_arena_index;
            }

            // A previous call already found a timer block in the anticone.
            if inner.has_timer_block_in_anticone_cache.contains(&candidate) {
                new_genesis_height += era_len;
                continue;
            }

            // Mark the candidate and its whole future.
            let mut future = BitSet::new();
            let mut worklist = VecDeque::new();
            worklist.push_back(candidate);
            future.add(candidate as u32);
            while let Some(current) = worklist.pop_front() {
                let node = &inner.arena[current];
                for &successor in
                    node.children.iter().chain(node.referrers.iter())
                {
                    if !future.contains(successor as u32) {
                        future.add(successor as u32);
                        worklist.push_back(successor);
                    }
                }
            }

            let start_timer_chain_height =
                inner.arena[candidate].data.ledger_view_timer_chain_height;
            let start_timer_chain_index = (start_timer_chain_height
                - inner.cur_era_genesis_timer_chain_height)
                as usize;
            let timer_block_outside_future = inner
                .timer_chain
                .iter()
                .skip(start_timer_chain_index)
                .any(|timer_block| !future.contains(*timer_block as u32));
            if !timer_block_outside_future {
                return candidate;
            }
            inner.has_timer_block_in_anticone_cache.insert(candidate);
            new_genesis_height += era_len;
        }
        inner.cur_era_genesis_block_arena_index
    }
    /// Copies the current terminal hashes into a vector and passes it to the
    /// data manager's `insert_terminals_to_db`.
    fn persist_terminals(&self, inner: &ConsensusGraphInner) {
        let terminals: Vec<_> = inner.terminal_hashes.iter().cloned().collect();
        self.data_man.insert_terminals_to_db(terminals);
    }
    /// If the blockset of `me` holds more than
    /// `BLOCKSET_IN_OWN_VIEW_OF_EPOCH_CAP` entries, resets it and
    /// `skipped_epoch_blocks` to their defaults and sets `blockset_cleared`.
    /// Does nothing otherwise.
    fn try_clear_blockset_in_own_view_of_epoch(
        inner: &mut ConsensusGraphInner, me: usize,
    ) {
        let data = &mut inner.arena[me].data;
        let blockset_size = data.blockset_in_own_view_of_epoch.len() as u64;
        if blockset_size <= BLOCKSET_IN_OWN_VIEW_OF_EPOCH_CAP {
            return;
        }
        data.blockset_in_own_view_of_epoch = Default::default();
        data.skipped_epoch_blocks = Default::default();
        data.blockset_cleared = true;
    }
    /// Forwards to `ConsensusGraphInner::compute_timer_chain_tuple` with the
    /// parent and referees of block `me` and the given anticone.
    fn compute_timer_chain_tuple(
        inner: &ConsensusGraphInner, me: usize, anticone: &BitSet,
    ) -> (u64, HashMap<usize, u64>, Vec<usize>, Vec<usize>) {
        let node = &inner.arena[me];
        inner.compute_timer_chain_tuple(
            node.parent,
            &node.referees,
            Some(anticone),
        )
    }
    /// Computes a timer value for block `me` from its
    /// `past_view_last_timer_block_arena_index`.
    ///
    /// Without such a block (`NULL`) the result is `timer_chain_beta`.
    /// Otherwise it is that block's `ledger_view_timer_chain_height` plus
    /// `timer_chain_beta`, plus one if `get_timer_chain_index` does not
    /// return `NULL` for that block.
    fn compute_invalid_block_start_timer(
        &self, inner: &ConsensusGraphInner, me: usize,
    ) -> u64 {
        let beta = inner.inner_conf.timer_chain_beta;
        let last_index =
            inner.arena[me].data.past_view_last_timer_block_arena_index;
        if last_index == NULL {
            return beta;
        }
        let base_height =
            inner.arena[last_index].data.ledger_view_timer_chain_height;
        let has_timer_chain_index =
            inner.get_timer_chain_index(last_index) != NULL;
        base_height + beta + u64::from(has_timer_chain_index)
    }
    fn preactivate_block(
        &self, inner: &mut ConsensusGraphInner, me: usize,
    ) -> BlockStatus {
        debug!(
            "Start to preactivate block {} index = {}",
            inner.arena[me].hash, me
        );
        let parent = inner.arena[me].parent;
        let mut pending = {
            if let Some(f) = inner.initial_stable_future.as_mut() {
                let mut in_future = false;
                if inner.arena[me].hash == inner.cur_era_stable_block_hash {
                    in_future = true;
                }
                if parent != NULL && f.contains(parent as u32) {
                    in_future = true;
                }
                if !in_future {
                    for referee in &inner.arena[me].referees {
                        if f.contains(*referee as u32) {
                            in_future = true;
                            break;
                        }
                    }
                }
                if in_future {
                    f.add(me as u32);
                }
                !in_future
            } else {
                let mut last_pivot_in_past = if parent != NULL {
                    inner.arena[parent].data.last_pivot_in_past
                } else {
                    inner.cur_era_genesis_height
                };
                for referee in &inner.arena[me].referees {
                    last_pivot_in_past = max(
                        last_pivot_in_past,
                        inner.arena[*referee].data.last_pivot_in_past,
                    );
                }
                last_pivot_in_past < inner.cur_era_stable_height
            }
        };
        let (timer_longest_difficulty, last_timer_block_arena_index) = inner
            .compute_timer_chain_past_view_info(
                parent,
                &inner.arena[me].referees,
            );
        inner.arena[me].data.past_view_timer_longest_difficulty =
            timer_longest_difficulty;
        inner.arena[me].data.past_view_last_timer_block_arena_index =
            last_timer_block_arena_index;
        inner.arena[me].data.force_confirm =
            inner.cur_era_genesis_block_arena_index;
        let fully_valid;
        let (anticone, anticone_barrier) =
            ConsensusNewBlockHandler::compute_and_update_anticone(inner, me);
        if !pending {
            let timer_chain_tuple =
                ConsensusNewBlockHandler::compute_timer_chain_tuple(
                    inner, me, &anticone,
                );
            inner.arena[me].data.force_confirm = inner
                .compute_block_force_confirm(
                    &timer_chain_tuple,
                    self.data_man
                        .pos_reference_by_hash(&inner.arena[me].hash)
                        .expect("header exist"),
                );
            debug!(
                "Force confirm block index {} in the past view of block index={}",
                inner.arena[me].data.force_confirm, me
            );
            let weight_tuple = if anticone_barrier.len() >= ANTICONE_BARRIER_CAP
            {
                Some(inner.compute_subtree_weights(me, &anticone_barrier))
            } else {
                None
            };
            let adaptive = inner.adaptive_weight(
                me,
                &anticone_barrier,
                weight_tuple.as_ref(),
                &timer_chain_tuple,
            );
            fully_valid = self.check_block_full_validity(
                me,
                inner,
                adaptive,
                &anticone_barrier,
                weight_tuple.as_ref(),
            );
            if self.conf.bench_mode && fully_valid {
                inner.arena[me].adaptive = adaptive;
            }
        } else {
            let block_status_in_db = self
                .data_man
                .local_block_info_by_hash(&inner.arena[me].hash)
                .map(|info| info.get_status())
                .unwrap_or(BlockStatus::Pending);
            fully_valid = block_status_in_db != BlockStatus::PartialInvalid;
            pending = block_status_in_db == BlockStatus::Pending;
            debug!(
                "Fetch the block validity status {} from the local data base",
                fully_valid
            );
        }
        debug!(
            "Finish preactivation block {} index = {}",
            inner.arena[me].hash, me
        );
        let block_status = if pending {
            BlockStatus::Pending
        } else if fully_valid {
            BlockStatus::Valid
        } else {
            BlockStatus::PartialInvalid
        };
        self.persist_block_info(inner, me, block_status);
        block_status
    }
    /// Activates block `me` in the consensus graph and performs the
    /// follow-up maintenance that activation triggers.
    ///
    /// In order, this method:
    /// 1. marks the block activated and decrements the inactive-dependency
    ///    counter of its children and referrers, pushing every successor
    ///    whose counter reaches zero onto `queue`;
    /// 2. returns early for blocks whose `era_block` is `NULL`;
    /// 3. updates the terminal hash set and the timer chain bookkeeping;
    /// 4. decides whether the pivot chain is extended by `me`, switched to
    ///    another branch, or left unchanged, and recomputes the affected
    ///    pivot metadata;
    /// 5. returns early (after persisting terminals) when `me` is not in
    ///    the subtree of the current stable block and bench mode is off;
    /// 6. otherwise adjusts difficulty, refreshes the confirmation meter,
    ///    persists epoch sets, possibly moves the stable height and forms a
    ///    checkpoint, sends the changed epochs to `epochs_sender`, and, when
    ///    `inner.header_only` is false, updates the state availability
    ///    boundary and enqueues epoch execution tasks.
    ///
    /// # Panics
    /// Panics if a successor's inactive-dependency counter is already zero,
    /// if the pivot chain is empty, if a pivot switch would fork at or past
    /// the end of the current pivot chain, or if any of the `expect`/`assert`
    /// conditions on execution results and the state availability boundary
    /// below do not hold.
    fn activate_block(
        &self, inner: &mut ConsensusGraphInner, me: usize,
        meter: &ConfirmationMeter, queue: &mut VecDeque<usize>,
    ) {
        inner.arena[me].data.activated = true;
        self.statistics.inc_consensus_graph_activated_block_count();
        // Every child and referrer was waiting on `me`; release one
        // dependency from each and schedule those that became ready.
        let mut succ_list = inner.arena[me].children.clone();
        succ_list.extend(inner.arena[me].referrers.iter());
        for succ in &succ_list {
            assert!(inner.arena[*succ].data.inactive_dependency_cnt > 0);
            inner.arena[*succ].data.inactive_dependency_cnt -= 1;
            if inner.arena[*succ].data.inactive_dependency_cnt == 0 {
                queue.push_back(*succ);
            }
        }
        // For out-of-era blocks the counter update above is all that is
        // done; everything below is skipped.
        if inner.arena[me].era_block == NULL {
            debug!(
                "Updated active counters for out-of-era block in ConsensusGraph: index = {:?} hash={:?}",
                me, inner.arena[me].hash,
            );
            return;
        } else {
            debug!(
                "Start activating block in ConsensusGraph: index = {:?} hash={:?} height={:?}",
                me, inner.arena[me].hash, inner.arena[me].height,
            );
        }
        // Maintain the terminal set: `me` becomes a terminal, while its
        // parent and referees stop being terminals.
        let parent = inner.arena[me].parent;
        if parent != NULL {
            inner.terminal_hashes.remove(&inner.arena[parent].hash);
        }
        inner.terminal_hashes.insert(inner.arena[me].hash.clone());
        for referee in &inner.arena[me].referees {
            inner.terminal_hashes.remove(&inner.arena[*referee].hash);
        }
        self.update_lcts_finalize(inner, me);
        let my_weight = inner.block_weight(me);
        // `extend_pivot`: `me` was appended to the tip of the pivot chain.
        // `pivot_changed`: the pivot chain differs from before this call
        // (set both when extending and when switching branches).
        let mut extend_pivot = false;
        let mut pivot_changed = false;
        // Height of the first pivot block that may have changed; set on
        // every path below before it is read.
        let mut fork_at;
        let old_pivot_chain_len = inner.pivot_chain.len();
        inner.update_pos_pivot_decision(me);
        // Timer chain maintenance. `diff` is the past-view longest timer
        // difficulty of `me` plus the timer difficulty of `me` itself.
        let diff = inner.arena[me].data.past_view_timer_longest_difficulty
            + inner.get_timer_difficulty(me);
        if inner.arena[me].is_timer
            && !inner.arena[me].data.partial_invalid
            && ConsensusGraphInner::is_heavier(
                (diff, &inner.arena[me].hash),
                (
                    inner.best_timer_chain_difficulty,
                    &inner.best_timer_chain_hash,
                ),
            )
        {
            // `me` is a valid timer block that beats the current best timer
            // chain: adopt it as the new best.
            inner.best_timer_chain_difficulty = diff;
            inner.best_timer_chain_hash = inner.arena[me].hash.clone();
            inner.update_timer_chain(me);
            if !self.conf.bench_mode {
                // The timer chain changed, so the start timer of every
                // queued partially invalid block is recomputed and the
                // queue rebuilt. Timers are negated before being pushed
                // onto the `BinaryHeap`.
                let mut new_block_queue = BinaryHeap::new();
                for (_, x) in &inner.invalid_block_queue {
                    let timer =
                        self.compute_invalid_block_start_timer(inner, *x);
                    new_block_queue.push((-(timer as i128), *x));
                    debug!(
                        "Partial invalid Block {} (hash = {}) start timer is now {}",
                        *x, inner.arena[*x].hash, timer
                    );
                }
                inner.invalid_block_queue = new_block_queue;
            }
        } else {
            // Otherwise derive the ledger-view timer chain height of `me`
            // as the maximum over its parent and referees, where a block
            // that has a timer chain index counts one higher.
            // NOTE(review): `parent` is indexed here without the
            // `parent != NULL` guard used above; presumably in-era blocks
            // reaching this branch always have a parent; confirm.
            let mut timer_chain_height =
                inner.arena[parent].data.ledger_view_timer_chain_height;
            if inner.get_timer_chain_index(parent) != NULL {
                timer_chain_height += 1;
            }
            for referee in &inner.arena[me].referees {
                let timer_bit = if inner.get_timer_chain_index(*referee) != NULL
                {
                    1
                } else {
                    0
                };
                if inner.arena[*referee].data.ledger_view_timer_chain_height
                    + timer_bit
                    > timer_chain_height
                {
                    timer_chain_height = inner.arena[*referee]
                        .data
                        .ledger_view_timer_chain_height
                        + timer_bit;
                }
            }
            inner.arena[me].data.ledger_view_timer_chain_height =
                timer_chain_height;
        }
        meter.aggregate_total_weight_in_past(my_weight);
        // Pivot chain selection. `force_confirm` is the globally forced
        // block; `force_lca` is its lowest common ancestor with the current
        // pivot tip `last`.
        let force_confirm = inner.compute_global_force_confirm();
        let force_height = inner.arena[force_confirm].height;
        let last = inner.pivot_chain.last().cloned().unwrap();
        let force_lca = inner.lca(force_confirm, last);
        if force_lca == force_confirm && inner.arena[me].parent == last {
            // Case 1: the forced block is an ancestor of (or equal to) the
            // pivot tip and `me` is a direct child of the tip, so `me` can
            // extend the pivot chain unless the pivot hint vetoes it.
            let me_height = inner.arena[me].height;
            let me_hash = inner.arena[me].hash;
            let allow_extend = self
                .pivot_hint
                .as_ref()
                .map_or(true, |hint| hint.allow_extend(me_height, me_hash));
            if allow_extend {
                inner.pivot_chain.push(me);
                inner.set_epoch_number_in_epoch(
                    me,
                    inner.pivot_index_to_height(inner.pivot_chain.len()) - 1,
                );
                inner.pivot_chain_metadata.push(Default::default());
                extend_pivot = true;
                pivot_changed = true;
                fork_at = inner.pivot_index_to_height(old_pivot_chain_len);
            } else {
                debug!("Chain extend rejected by pivot hint: height={me_height}, hash={me_hash:?}");
                fork_at = inner.pivot_index_to_height(old_pivot_chain_len);
            }
        } else {
            // Case 2: `me` does not directly extend the tip, or the forced
            // block is off the current pivot chain.
            let lca = inner.lca(last, me);
            // First block of the replacement branch; only read when
            // `pivot_changed` is set below.
            let new;
            if self.pivot_hint.is_some() && lca == last {
                // `me` descends from the pivot tip but is not its direct
                // child while a pivot hint is configured.
                debug!("Chain extend rejected by pivot hint because parent is rejected.");
                fork_at = inner.pivot_index_to_height(old_pivot_chain_len);
                // `pivot_changed` stays false on this path, so this value
                // of `new` is a placeholder that is never read.
                new = 0;
            } else if force_confirm != force_lca {
                // The forced block is not on the current pivot chain:
                // switch to the branch leading to it.
                debug!(
                    "pivot chain switch to force_confirm={} force_height={}",
                    force_confirm, force_height
                );
                fork_at = inner.arena[force_lca].height + 1;
                new = inner.ancestor_at(force_confirm, fork_at);
                pivot_changed = true;
            } else {
                // Compare the subtree weight of the current pivot block at
                // the fork height (`prev`) against the ancestor of `me` at
                // that height (`new`).
                fork_at = inner.arena[lca].height + 1;
                let prev = inner.get_pivot_block_arena_index(fork_at);
                let prev_weight = inner.weight_tree.get(prev);
                new = inner.ancestor_at(me, fork_at);
                let new_weight = inner.weight_tree.get(new);
                let me_height = inner.arena[me].height;
                let me_ancestor_hash_at =
                    |height| inner.arena[inner.ancestor_at(me, height)].hash;
                // Switch only if the fork is above the forced height, the
                // new branch is heavier, and the pivot hint (if any) allows
                // the switch.
                if fork_at > force_height
                    && ConsensusGraphInner::is_heavier(
                        (new_weight, &inner.arena[new].hash),
                        (prev_weight, &inner.arena[prev].hash),
                    )
                    && self.pivot_hint.as_ref().map_or(true, |hint| {
                        hint.allow_switch(
                            fork_at,
                            me_height,
                            me_ancestor_hash_at,
                        )
                    })
                {
                    pivot_changed = true;
                } else {
                    debug!("Old pivot chain is heavier, pivot chain unchanged");
                    fork_at = inner.pivot_index_to_height(old_pivot_chain_len);
                }
            }
            if pivot_changed {
                // Drop the old pivot blocks from the fork point on,
                // resetting their epoch numbers and possibly their cached
                // block sets.
                let fork_pivot_index = inner.height_to_pivot_index(fork_at);
                assert!(fork_pivot_index < inner.pivot_chain.len());
                for discarded_idx in
                    inner.pivot_chain.split_off(fork_pivot_index)
                {
                    inner.reset_epoch_number_in_epoch(discarded_idx);
                    ConsensusNewBlockHandler::try_clear_blockset_in_own_view_of_epoch(inner,
                    discarded_idx);
                }
                // Rebuild the pivot chain starting at `new`. Below the
                // forced height the path towards `force_confirm` is
                // followed; from there on the heaviest activated child is
                // chosen until a block has no activated child.
                let mut u = new;
                loop {
                    inner.compute_blockset_in_own_view_of_epoch(u);
                    inner.pivot_chain.push(u);
                    inner.set_epoch_number_in_epoch(
                        u,
                        inner.pivot_index_to_height(inner.pivot_chain.len())
                            - 1,
                    );
                    if inner.arena[u].height >= force_height {
                        let mut heaviest = NULL;
                        let mut heaviest_weight = 0;
                        for index in &inner.arena[u].children {
                            if !inner.arena[*index].data.activated {
                                continue;
                            }
                            let weight = inner.weight_tree.get(*index);
                            if heaviest == NULL
                                || ConsensusGraphInner::is_heavier(
                                    (weight, &inner.arena[*index].hash),
                                    (
                                        heaviest_weight,
                                        &inner.arena[heaviest].hash,
                                    ),
                                )
                            {
                                heaviest = *index;
                                heaviest_weight = weight;
                            }
                        }
                        if heaviest == NULL {
                            break;
                        }
                        u = heaviest;
                    } else {
                        u = inner.ancestor_at(
                            force_confirm,
                            inner.arena[u].height + 1,
                        );
                    }
                }
            }
        };
        debug!(
            "Forked at height {}, fork parent block {}",
            fork_at,
            &inner.arena[inner.get_pivot_block_arena_index(fork_at - 1)].hash,
        );
        // Update `last_pivot_in_past` and the pivot chain metadata.
        if !extend_pivot {
            let update_at = fork_at - 1;
            let mut last_pivot_to_update = HashSet::new();
            last_pivot_to_update.insert(me);
            if pivot_changed {
                // Pivot switch: every block recorded in the metadata of the
                // pivot entries from `update_at` up to the old tip must be
                // recomputed, together with `me`.
                inner.best_terminals_reorg_height =
                    min(inner.best_terminals_reorg_height, update_at);
                let update_pivot_index = inner.height_to_pivot_index(update_at);
                for pivot_index in update_pivot_index..old_pivot_chain_len {
                    for x in &inner.pivot_chain_metadata[pivot_index]
                        .last_pivot_in_past_blocks
                    {
                        last_pivot_to_update.insert(*x);
                    }
                }
                inner.recompute_metadata(fork_at, last_pivot_to_update);
            } else {
                // Pivot chain neither extended nor changed: only `me` needs
                // its metadata computed.
                ConsensusNewBlockHandler::try_clear_blockset_in_own_view_of_epoch(inner, me);
                inner.recompute_metadata(
                    inner.get_pivot_height(),
                    last_pivot_to_update,
                );
            }
        } else {
            // Pure extension: `me` is the new pivot tip, so its
            // `last_pivot_in_past` is its own height and its past weight is
            // the previous pivot entry's past weight plus the weight of its
            // own epoch and of `me` itself.
            let height = inner.arena[me].height;
            inner.arena[me].data.last_pivot_in_past = height;
            let pivot_index = inner.height_to_pivot_index(height);
            inner.pivot_chain_metadata[pivot_index]
                .last_pivot_in_past_blocks
                .insert(me);
            // The block set is taken out with `None` and handed back with
            // `Some(blockset)` once the weight has been computed.
            let blockset = inner
                .exchange_or_compute_blockset_in_own_view_of_epoch(me, None);
            inner.pivot_chain_metadata[pivot_index].past_weight =
                inner.pivot_chain_metadata[pivot_index - 1].past_weight
                    + inner.total_weight_in_own_epoch(
                        &blockset,
                        inner.cur_era_genesis_block_arena_index,
                    )
                    + inner.block_weight(me);
            inner.exchange_or_compute_blockset_in_own_view_of_epoch(
                me,
                Some(blockset),
            );
        }
        // Blocks that are not in the subtree of the current stable block
        // stop here (unless running in bench mode).
        if (inner.arena[me].height <= inner.cur_era_stable_height
            || (inner.arena[me].height > inner.cur_era_stable_height
                && inner.arena
                    [inner.ancestor_at(me, inner.cur_era_stable_height)]
                .hash
                    != inner.cur_era_stable_block_hash))
            && !self.conf.bench_mode
        {
            self.persist_terminals(inner);
            if pivot_changed {
                // The pivot chain changed on a path that skips the state
                // boundary update at the end of this method, so the
                // optimistic execution height is cleared here instead.
                self.data_man
                    .state_availability_boundary
                    .write()
                    .optimistic_executed_height = None;
            }
            debug!(
                "Finish activating block in ConsensusGraph: index={:?} hash={:?},\
                 block is not in the subtree of stable",
                me, inner.arena[me].hash
            );
            return;
        }
        // Only pivot heights strictly above the stable height are
        // considered for epoch notification and execution below.
        let capped_fork_at = max(inner.cur_era_stable_height + 1, fork_at);
        inner.adjust_difficulty(*inner.pivot_chain.last().expect("not empty"));
        if me % CONFIRMATION_METER_UPDATE_FREQUENCY == 0 || pivot_changed {
            meter.update_confirmation_risks(inner);
        }
        if pivot_changed {
            // Persist epoch sets with a delay of
            // `EPOCH_SET_PERSISTENCE_DELAY` pivot entries behind the tip.
            // The start index is the smaller of the (capped) fork index and
            // the first index that was not yet eligible under the old chain
            // length.
            if inner.pivot_chain.len() > EPOCH_SET_PERSISTENCE_DELAY as usize {
                let capped_fork_at_pivot_index =
                    inner.height_to_pivot_index(capped_fork_at);
                let start_pivot_index = if old_pivot_chain_len
                    >= EPOCH_SET_PERSISTENCE_DELAY as usize
                {
                    min(
                        capped_fork_at_pivot_index,
                        old_pivot_chain_len
                            - EPOCH_SET_PERSISTENCE_DELAY as usize,
                    )
                } else {
                    capped_fork_at_pivot_index
                };
                let to_persist_pivot_index = inner.pivot_chain.len()
                    - EPOCH_SET_PERSISTENCE_DELAY as usize;
                for pivot_index in start_pivot_index..to_persist_pivot_index {
                    inner.persist_epoch_set_hashes(pivot_index);
                }
            }
        }
        // Converted to a height now, before the checkpoint logic below
        // runs. NOTE(review): presumably because `make_checkpoint_at` can
        // change the index-to-height mapping; confirm.
        let old_pivot_chain_height =
            inner.pivot_index_to_height(old_pivot_chain_len);
        // Possibly advance the stable height, provided the recorded stable
        // block is still on the pivot chain.
        if inner.best_epoch_number() > inner.cur_era_stable_height
            && inner.arena
                [inner.get_pivot_block_arena_index(inner.cur_era_stable_height)]
            .hash
                == inner.cur_era_stable_block_hash
        {
            let new_stable_height = self.should_move_stable_height(inner);
            if inner.cur_era_stable_height != new_stable_height {
                inner.cur_era_stable_height = new_stable_height;
                let stable_arena_index =
                    inner.get_pivot_block_arena_index(new_stable_height);
                if !inner.header_only && !self.conf.bench_mode {
                    // Block until the new stable block has been executed,
                    // then compute its state-valid and blame information.
                    self.executor
                        .wait_for_result(inner.arena[stable_arena_index].hash)
                        .expect(
                            "Execution state of the pivot chain is corrupted!",
                        );
                    inner
                        .compute_state_valid_and_blame_info(
                            stable_arena_index,
                            &self.executor,
                        )
                        .expect(
                            "New stable node should have available state_valid",
                        );
                }
                let genesis_hash =
                    &inner.arena[inner.cur_era_genesis_block_arena_index].hash;
                let stable_hash = &inner.arena[stable_arena_index].hash;
                inner.cur_era_stable_block_hash = stable_hash.clone();
                inner.data_man.set_cur_consensus_era_genesis_hash(
                    genesis_hash,
                    stable_hash,
                );
                // Once the stable block has moved, the initial stable
                // future set is dropped.
                inner.initial_stable_future = None;
                debug!(
                    "Move era stable genesis to height={} hash={:?}",
                    new_stable_height, stable_hash
                );
            }
        }
        // Possibly form a new checkpoint (new era genesis), under the same
        // precondition on the stable block.
        if inner.cur_era_stable_height < inner.best_epoch_number()
            && inner.arena
                [inner.get_pivot_block_arena_index(inner.cur_era_stable_height)]
            .hash
                == inner.cur_era_stable_block_hash
        {
            let new_checkpoint_era_genesis =
                self.should_form_checkpoint_at(inner);
            if new_checkpoint_era_genesis
                != inner.cur_era_genesis_block_arena_index
            {
                info!(
                    "Working on new checkpoint, old checkpoint block {} height {}",
                    &inner.arena[inner.cur_era_genesis_block_arena_index].hash,
                    inner.cur_era_genesis_height
                );
                ConsensusNewBlockHandler::make_checkpoint_at(
                    inner,
                    new_checkpoint_era_genesis,
                );
                // Restart the confirmation meter from the stable block's
                // subtree weight and refresh the confirmation risks.
                let stable_era_genesis_arena_index =
                    inner.ancestor_at(me, inner.cur_era_stable_height);
                meter.reset_for_checkpoint(
                    inner.weight_tree.get(stable_era_genesis_arena_index),
                    inner.cur_era_stable_height,
                );
                meter.update_confirmation_risks(inner);
                info!(
                    "New checkpoint formed at block {} stable block {} height {}",
                    &inner.arena[inner.cur_era_genesis_block_arena_index].hash,
                    &inner.arena[stable_era_genesis_arena_index].hash,
                    inner.cur_era_genesis_height
                );
            }
        }
        // Send every pivot epoch in `[capped_fork_at, pivot tip height]`
        // to `epochs_sender`; light nodes additionally run the blame
        // verifier on each of them.
        let from = capped_fork_at;
        let to = inner.pivot_index_to_height(inner.pivot_chain.len());
        for epoch_number in from..to {
            let arena_index = inner.get_pivot_block_arena_index(epoch_number);
            let epoch_hashes = inner.get_epoch_block_hashes(arena_index);
            self.epochs_sender.send((epoch_number, epoch_hashes));
            if let NodeType::Light = self.node_type {
                self.blame_verifier.lock().process(inner, epoch_number);
            }
        }
        // State maintenance and execution scheduling are skipped entirely
        // in header-only mode.
        if !inner.header_only {
            // Confirmed height, lowered by `DEFERRED_STATE_EPOCH_COUNT` and
            // clamped at zero.
            let mut confirmed_height = meter.get_confirmed_epoch_num();
            if confirmed_height < DEFERRED_STATE_EPOCH_COUNT {
                confirmed_height = 0;
            } else {
                confirmed_height -= DEFERRED_STATE_EPOCH_COUNT;
            }
            self.data_man
                .storage_manager
                .get_storage_manager()
                .maintain_state_confirmed(
                    inner,
                    inner.cur_era_stable_height,
                    self.conf.inner_conf.era_epoch_count,
                    confirmed_height,
                    &self.data_man.state_availability_boundary,
                )
                .expect(&concat!(file!(), ":", line!(), ":", column!()));
            self.set_block_tx_packed(inner, me);
            self.delayed_tx_recycle_in_skipped_blocks(inner, capped_fork_at);
            // Exclusive upper bound of the epochs to execute:
            // `DEFERRED_STATE_EPOCH_COUNT - 1` below the height one past
            // the pivot tip, or zero when the chain is still shorter than
            // the deferral.
            let to_state_pos = if inner
                .pivot_index_to_height(inner.pivot_chain.len())
                < DEFERRED_STATE_EPOCH_COUNT
            {
                0
            } else {
                inner.pivot_index_to_height(inner.pivot_chain.len())
                    - DEFERRED_STATE_EPOCH_COUNT
                    + 1
            };
            // First epoch to execute. If the fork point lies within the
            // deferral window of the old chain, start where the old chain's
            // execution range ended instead (at least epoch 1).
            let mut state_at = capped_fork_at;
            if capped_fork_at + DEFERRED_STATE_EPOCH_COUNT
                > old_pivot_chain_height
            {
                if old_pivot_chain_height > DEFERRED_STATE_EPOCH_COUNT {
                    state_at =
                        old_pivot_chain_height - DEFERRED_STATE_EPOCH_COUNT + 1;
                } else {
                    state_at = 1;
                }
            }
            {
                // The write guard is confined to this block so it is
                // released before epochs are enqueued below.
                let mut state_availability_boundary =
                    inner.data_man.state_availability_boundary.write();
                if pivot_changed {
                    assert!(
                        capped_fork_at > state_availability_boundary.lower_bound,
                        "forked_at {} should > boundary_lower_bound, boundary {:?}",
                        capped_fork_at,
                        state_availability_boundary
                    );
                    if extend_pivot {
                        state_availability_boundary
                            .pivot_chain
                            .push(inner.arena[me].hash);
                    } else {
                        // Mirror the pivot switch: cut the boundary's pivot
                        // chain at the fork point, append the new branch,
                        // and pull `upper_bound` below the fork.
                        let split_off_index = capped_fork_at
                            - state_availability_boundary.lower_bound;
                        state_availability_boundary
                            .pivot_chain
                            .truncate(split_off_index as usize);
                        for i in inner.height_to_pivot_index(capped_fork_at)
                            ..inner.pivot_chain.len()
                        {
                            state_availability_boundary
                                .pivot_chain
                                .push(inner.arena[inner.pivot_chain[i]].hash);
                        }
                        if state_availability_boundary.upper_bound
                            >= capped_fork_at
                        {
                            state_availability_boundary.upper_bound =
                                capped_fork_at - 1;
                        }
                    }
                    state_availability_boundary.optimistic_executed_height =
                        if to_state_pos
                            > state_availability_boundary.lower_bound
                        {
                            Some(to_state_pos)
                        } else {
                            None
                        };
                }
                // Never schedule execution at or below the boundary's
                // lower bound.
                if state_at < state_availability_boundary.lower_bound + 1 {
                    state_at = state_availability_boundary.lower_bound + 1;
                }
            }
            // Enqueue one execution task per epoch in
            // `[state_at, to_state_pos)`, in ascending height order.
            // NOTE(review): the trailing `true,  false` arguments are
            // positional flags of `EpochExecutionTask::new`; confirm their
            // meaning against its definition.
            while state_at < to_state_pos {
                let epoch_arena_index =
                    inner.get_pivot_block_arena_index(state_at);
                let reward_execution_info = self
                    .executor
                    .get_reward_execution_info(inner, epoch_arena_index);
                self.executor.enqueue_epoch(EpochExecutionTask::new(
                    epoch_arena_index,
                    inner,
                    reward_execution_info,
                    true,  false, ));
                state_at += 1;
            }
        }
        self.persist_terminals(inner);
        debug!(
            "Finish activating block in ConsensusGraph: index={:?} hash={:?} cur_era_stable_height={} cur_era_genesis_height={}",
            me, inner.arena[me].hash, inner.cur_era_stable_height, inner.cur_era_genesis_height
        );
    }
    /// Inserts the block identified by `hash` into the consensus graph and,
    /// if it has no inactive dependencies, processes it together with every
    /// block that becomes ready as a consequence.
    ///
    /// A block whose parent is unknown to the graph, or whose parent has no
    /// era block, is inserted as an out-of-era stub and its local block info
    /// is written immediately; the method returns early if that insertion
    /// yields `NULL`. Any other block goes through the regular `insert`
    /// path.
    ///
    /// Ready blocks are handled in FIFO order: in-era blocks are
    /// preactivated, partially invalid blocks are parked in
    /// `invalid_block_queue` with their dependency counter set to `NULL`,
    /// and all other blocks are activated. After each block, queued invalid
    /// blocks whose start timer has been reached are released and activated.
    ///
    /// # Panics
    /// Panics if the header for `hash` is not available from the data
    /// manager.
    pub fn on_new_block(
        &self, inner: &mut ConsensusGraphInner, meter: &ConfirmationMeter,
        hash: &H256,
    ) {
        let block_header = self
            .data_man
            .block_header_by_hash(hash)
            .expect("header exist for consensus");
        debug!(
            "insert new block into consensus: header_only={:?} block={:?}",
            inner.header_only, &block_header
        );
        let parent_hash = block_header.parent_hash();
        // The regular path requires a known parent that has an era block.
        let parent_in_era = match inner.hash_to_arena_indices.get(&parent_hash)
        {
            Some(parent_index) => inner.arena[*parent_index].era_block != NULL,
            None => false,
        };

        let me = if parent_in_era {
            let (arena_index, indices_len) = inner.insert(&block_header);
            self.statistics
                .set_consensus_graph_inserted_block_count(indices_len);
            self.update_lcts_initial(inner, arena_index);
            arena_index
        } else {
            debug!(
                "parent={:?} not in consensus graph or not in the genesis subtree, inserted as an out-era block stub",
                parent_hash
            );
            let block_status_in_db = self
                .data_man
                .local_block_info_by_hash(hash)
                .map_or(BlockStatus::Pending, |info| info.get_status());
            let (sequence_number, arena_index) = inner.insert_out_era_block(
                &block_header,
                block_status_in_db == BlockStatus::PartialInvalid,
            );
            self.data_man.insert_local_block_info(
                hash,
                LocalBlockInfo::new(
                    block_status_in_db,
                    sequence_number,
                    self.data_man.get_instance_id(),
                ),
            );
            if arena_index == NULL {
                return;
            }
            arena_index
        };

        let dependency_cnt = inner.arena[me].data.inactive_dependency_cnt;
        if dependency_cnt != 0 {
            debug!(
                "Block {} (hash = {}) is non-active with active counter {}",
                me,
                inner.arena[me].hash,
                dependency_cnt
            );
            return;
        }

        let mut queue: VecDeque<usize> = VecDeque::from([me]);
        while let Some(current) = queue.pop_front() {
            // Out-of-era stubs are never preactivated; their status follows
            // the `partial_invalid` flag recorded at insertion.
            let block_status = if inner.arena[current].era_block == NULL {
                if inner.arena[current].data.partial_invalid {
                    BlockStatus::PartialInvalid
                } else {
                    BlockStatus::Pending
                }
            } else {
                self.preactivate_block(inner, current)
            };

            if block_status == BlockStatus::PartialInvalid {
                inner.arena[current].data.partial_invalid = true;
                let start_timer =
                    self.compute_invalid_block_start_timer(inner, current);
                // Timers are stored negated; bench mode uses 0 instead.
                let priority = if self.conf.bench_mode {
                    0
                } else {
                    -(start_timer as i128)
                };
                inner.invalid_block_queue.push((priority, current));
                inner.arena[current].data.inactive_dependency_cnt = NULL;
                debug!(
                    "Block {} (hash = {}) is partially invalid, all of its future will be non-active till timer height {}",
                    current, inner.arena[current].hash, start_timer
                );
            } else {
                if block_status == BlockStatus::Pending {
                    inner.arena[current].data.pending = true;
                    debug!(
                        "Block {} (hash = {}) is pending but processed",
                        current, inner.arena[current].hash
                    );
                } else {
                    debug!(
                        "Block {} (hash = {}) is fully valid",
                        current, inner.arena[current].hash
                    );
                }
                self.activate_block(inner, current, meter, &mut queue);
            }

            // Release every parked block whose start timer has been
            // reached by the current timer chain.
            let current_timer = match inner.timer_chain.last() {
                Some(tail) => {
                    inner.arena[*tail].data.ledger_view_timer_chain_height + 1
                }
                None => inner.cur_era_genesis_timer_chain_height,
            };
            while let Some(&(neg_start_timer, _)) =
                inner.invalid_block_queue.peek()
            {
                if current_timer < (-neg_start_timer) as u64 {
                    break;
                }
                let (_, x) = inner.invalid_block_queue.pop().unwrap();
                assert!(inner.arena[x].data.inactive_dependency_cnt == NULL);
                inner.arena[x].data.inactive_dependency_cnt = 0;
                self.activate_block(inner, x, meter, &mut queue);
            }
        }
    }
    /// Stores the local processing result of block `me` in the block data
    /// manager and, when state exposing is enabled, appends a record about
    /// the block to the global `STATE_EXPOSER`.
    ///
    /// * `inner` - consensus graph whose arena contains `me`.
    /// * `me` - arena index of the block being persisted.
    /// * `block_status` - status recorded for the block.
    fn persist_block_info(
        &self, inner: &mut ConsensusGraphInner, me: usize,
        block_status: BlockStatus,
    ) {
        let block_hash = inner.arena[me].hash;
        let local_info = LocalBlockInfo::new(
            block_status,
            inner.arena[me].data.sequence_number,
            self.data_man.get_instance_id(),
        );
        self.data_man.insert_local_block_info(&block_hash, local_info);

        // A block without an era block reports the default hash value.
        let era_block = inner.arena[me].era_block();
        let era_block_hash = if era_block == NULL {
            Default::default()
        } else {
            inner.arena[era_block].hash
        };

        if !inner.inner_conf.enable_state_expose {
            return;
        }
        let exposed_state = ConsensusGraphBlockState {
            block_hash,
            best_block_hash: inner.best_block_hash(),
            block_status: local_info.get_status(),
            era_block_hash,
            adaptive: inner.arena[me].adaptive(),
        };
        STATE_EXPOSER
            .consensus_graph
            .lock()
            .block_state_vec
            .push(exposed_state);
    }
    /// Rebuilds the execution state along the pivot chain, starting right
    /// after the lower bound of the state availability boundary.
    ///
    /// Steps, in order:
    /// 1. Ensure the execution commitment of the starting pivot block is
    ///    in memory (loading it from the database when it is missing and
    ///    the block is not the true genesis).
    /// 2. Append the hashes of all later pivot blocks to the pivot chain
    ///    kept inside the state availability boundary.
    /// 3. Load execution commitments from the database for consecutive
    ///    pivot blocks until the first one that is missing.
    /// 4. Choose the first pivot index whose epoch has to be executed,
    ///    then let the MPT snapshot recovery logic adjust it.
    /// 5. Walk the pivot chain: bump the boundary's `upper_bound` for
    ///    every epoch that already has a commitment, and execute every
    ///    epoch at or after the chosen start index.
    /// 6. Ask the snapshot db manager to clean the snapshot epoch ids
    ///    that precede the recovered one.
    ///
    /// Returns early (doing nothing, or only steps 1-2) when the start
    /// index is beyond the pivot chain or the pivot chain is shorter than
    /// `DEFERRED_STATE_EPOCH_COUNT`.
    ///
    /// # Panics
    ///
    /// Panics if the boundary's `lower_bound` differs from its
    /// `upper_bound`, if the commitment of the starting block cannot be
    /// loaded from the database, or if `maintain_state_confirmed` returns
    /// an error.
    pub fn construct_pivot_state(
        &self, inner: &mut ConsensusGraphInner, meter: &ConfirmationMeter,
    ) {
        let state_boundary_height =
            self.data_man.state_availability_boundary.read().lower_bound;
        let start_pivot_index =
            (state_boundary_height - inner.cur_era_genesis_height) as usize;
        debug!(
            "construct_pivot_state: start={}, pivot_chain.len()={}, state_boundary_height={}",
            start_pivot_index,
            inner.pivot_chain.len(),
            state_boundary_height
        );
        if start_pivot_index >= inner.pivot_chain.len() {
            return;
        }
        let start_hash = inner.arena[inner.pivot_chain[start_pivot_index]].hash;
        // The commitment of the starting block is required below; the true
        // genesis is exempt from the database lookup.
        if start_hash != inner.data_man.true_genesis.hash()
            && self
                .data_man
                .get_epoch_execution_commitment(&start_hash)
                .is_none()
        {
            self.data_man.load_epoch_execution_commitment_from_db(&start_hash)
                .expect("epoch_execution_commitment for stable hash must exist in disk");
        }
        {
            let mut state_availability_boundary =
                self.data_man.state_availability_boundary.write();
            assert!(
                state_availability_boundary.lower_bound
                    == state_availability_boundary.upper_bound
            );
            for pivot_index in start_pivot_index + 1..inner.pivot_chain.len() {
                state_availability_boundary
                    .pivot_chain
                    .push(inner.arena[inner.pivot_chain[pivot_index]].hash);
            }
        }
        if inner.pivot_chain.len() < DEFERRED_STATE_EPOCH_COUNT as usize {
            return;
        }
        // Exclusive end of the pivot index range handled below.
        let end_index =
            inner.pivot_chain.len() - DEFERRED_STATE_EPOCH_COUNT as usize + 1;
        // Pull commitments from the database into memory, stopping at the
        // first pivot block that has none stored.
        for pivot_index in start_pivot_index + 1..end_index {
            let pivot_arena_index = inner.pivot_chain[pivot_index];
            let pivot_hash = inner.arena[pivot_arena_index].hash;
            if self
                .data_man
                .load_epoch_execution_commitment_from_db(&pivot_hash)
                .is_none()
            {
                break;
            }
        }
        let mut start_compute_epoch_pivot_index =
            self.get_force_compute_index(inner, start_pivot_index, end_index);
        // The first epoch without an in-memory commitment must be computed,
        // so it caps the start index from above.
        for pivot_index in start_pivot_index + 1..end_index {
            let pivot_arena_index = inner.pivot_chain[pivot_index];
            let pivot_hash = inner.arena[pivot_arena_index].hash;
            if self
                .data_man
                .get_epoch_execution_commitment(&pivot_hash)
                .is_none()
            {
                start_compute_epoch_pivot_index =
                    min(pivot_index, start_compute_epoch_pivot_index);
                debug!(
                    "Start compute epoch pivot index {}, height {}",
                    pivot_index, inner.arena[pivot_arena_index].height
                );
                break;
            }
        }
        let snapshot_epoch_count = inner
            .data_man
            .storage_manager
            .get_storage_manager()
            .get_snapshot_epoch_count();
        let mut need_set_intermediate_trie_root_merkle = false;
        let max_snapshot_epoch_index_has_mpt = self
            .recover_latest_mpt_snapshot_if_needed(
                inner,
                &mut start_compute_epoch_pivot_index,
                start_pivot_index,
                end_index,
                &mut need_set_intermediate_trie_root_merkle,
                snapshot_epoch_count as u64,
            );
        self.set_intermediate_trie_root_merkle(
            inner,
            start_compute_epoch_pivot_index,
            need_set_intermediate_trie_root_merkle,
            snapshot_epoch_count as u64,
        );
        let confirmed_epoch_num = meter.get_confirmed_epoch_num();
        for pivot_index in start_pivot_index + 1..end_index {
            let pivot_arena_index = inner.pivot_chain[pivot_index];
            let pivot_hash = inner.arena[pivot_arena_index].hash;
            let height = inner.arena[pivot_arena_index].height;
            let compute_epoch = pivot_index >= start_compute_epoch_pivot_index;
            if self
                .data_man
                .get_epoch_execution_commitment(&pivot_hash)
                .is_some()
            {
                self.data_man
                    .state_availability_boundary
                    .write()
                    .upper_bound += 1;
            }
            info!(
                "construct_pivot_state: index {} height {} compute_epoch {}.",
                pivot_index, height, compute_epoch,
            );
            if compute_epoch {
                let reward_execution_info = self
                    .executor
                    .get_reward_execution_info(inner, pivot_arena_index);
                // The flag is set when no snapshot index with an MPT is
                // known, or when this epoch lies more than one snapshot
                // period past that index.
                let recover_mpt_during_construct_pivot_state =
                    max_snapshot_epoch_index_has_mpt.map_or(true, |idx| {
                        pivot_index > idx + snapshot_epoch_count as usize
                    });
                info!(
                    "compute epoch recovery flag {}",
                    recover_mpt_during_construct_pivot_state
                );
                self.executor.compute_epoch(
                    EpochExecutionTask::new(
                        pivot_arena_index,
                        inner,
                        reward_execution_info,
                        true,
                        true,
                    ),
                    None,
                    recover_mpt_during_construct_pivot_state,
                );
                // Clamp at zero instead of underflowing when fewer than
                // DEFERRED_STATE_EPOCH_COUNT epochs are available.
                let confirmed_height = min(confirmed_epoch_num, height)
                    .saturating_sub(DEFERRED_STATE_EPOCH_COUNT);
                self.data_man
                    .storage_manager
                    .get_storage_manager()
                    .maintain_state_confirmed(
                        inner,
                        inner.cur_era_stable_height,
                        self.conf.inner_conf.era_epoch_count,
                        confirmed_height,
                        &self.data_man.state_availability_boundary,
                    )
                    .expect(concat!(file!(), ":", line!(), ":", column!()));
            }
        }
        inner
            .data_man
            .storage_manager
            .get_storage_manager()
            .get_snapshot_manager()
            .get_snapshot_db_manager()
            .clean_snapshot_epoch_id_before_recovered();
    }
    /// Picks the pivot index from which epochs are to be computed.
    ///
    /// The pivot range `(start_pivot_index, end_index)` is scanned from the
    /// newest entry backwards while counting consecutive epochs that have
    /// both an execution commitment and an openable state. Once the count
    /// exceeds `DEFERRED_STATE_EPOCH_COUNT`, the first epoch for which
    /// `epoch_executed_and_recovered` succeeds ends the scan and the index
    /// right after it is chosen. Without such an epoch the result defaults
    /// to `start_pivot_index + 1`.
    ///
    /// When `force_recompute_height_during_construct_pivot` is configured
    /// with a height above the current era stable height, the result is
    /// additionally capped by the pivot index of that height.
    ///
    /// # Panics
    ///
    /// Panics on a database error while opening a state, or when the header
    /// of a scanned pivot block is not available.
    fn get_force_compute_index(
        &self, inner: &mut ConsensusGraphInner, start_pivot_index: usize,
        end_index: usize,
    ) -> usize {
        let first_candidate = start_pivot_index + 1;
        let mut force_compute_index = first_candidate;
        // Length of the current run of epochs whose state can be opened.
        let mut available_run = 0;
        for pivot_index in (first_candidate..end_index).rev() {
            let pivot_arena_index = inner.pivot_chain[pivot_index];
            let pivot_hash = inner.arena[pivot_arena_index].hash;
            let commitment_guard =
                self.data_man.get_epoch_execution_commitment(&pivot_hash);
            // Epochs without a commitment are skipped and leave the run
            // counter untouched.
            let commitment = match *commitment_guard {
                Some(commitment) => commitment,
                None => continue,
            };
            let state_available = self
                .data_man
                .storage_manager
                .get_state_no_commit_inner(
                    StateIndex::new_for_readonly(
                        &pivot_hash,
                        &commitment.state_root_with_aux_info,
                    ),
                    false,
                    false,
                )
                .expect("DB Error")
                .is_some();
            if !state_available {
                available_run = 0;
                continue;
            }
            available_run += 1;
            if available_run <= DEFERRED_STATE_EPOCH_COUNT {
                continue;
            }
            let reward_execution_info = self
                .executor
                .get_reward_execution_info(inner, pivot_arena_index);
            let pivot_block_height = self
                .data_man
                .block_header_by_hash(&pivot_hash)
                .expect("must exists")
                .height();
            let epoch_block_hashes =
                inner.get_epoch_block_hashes(pivot_arena_index);
            let recovered = self.executor.epoch_executed_and_recovered(
                &pivot_hash,
                &epoch_block_hashes,
                true,
                &reward_execution_info,
                pivot_block_height,
            );
            if recovered {
                force_compute_index = pivot_index + 1;
                debug!("Force compute start index {}", force_compute_index);
                break;
            }
        }
        match self
            .conf
            .inner_conf
            .force_recompute_height_during_construct_pivot
        {
            Some(height) if height > inner.cur_era_stable_height => {
                let pivot_idx = inner.height_to_pivot_index(height);
                debug!(
                    "force recompute height during constructing pivot {}",
                    pivot_idx
                );
                force_compute_index = min(force_compute_index, pivot_idx);
            }
            _ => {}
        }
        force_compute_index
    }
    /// Decides whether the latest MPT snapshot has to be recovered before
    /// epochs are re-executed and, if so, performs the recovery.
    ///
    /// * `start_compute_epoch_pivot_index` - in/out: first pivot index to
    ///   execute; it may be lowered or reset by this method.
    /// * `start_pivot_index`, `end_index` - pivot index range scanned for
    ///   removed snapshots.
    /// * `need_set_intermediate_trie_root_merkle` - out: set to `true` when
    ///   execution restarts at the first epoch after a snapshot height.
    /// * `snapshot_epoch_count` - number of epochs between snapshots.
    ///
    /// Returns `Some(end_index)` when no recovery is performed here (the
    /// isolated MPT database is disabled, the snapshot was synced, the
    /// latest snapshot already has an MPT, or the latest MPT snapshot is
    /// valid). After a recovery it returns the pivot index of
    /// `max_snapshot_epoch_height_has_mpt` when that height is not below
    /// the current era stable height, and `None` otherwise.
    ///
    /// The state recorded in `persist_state_from_initialization` is taken
    /// (and thereby cleared) by this call.
    ///
    /// # Panics
    ///
    /// Panics when the derived era pivot height is inconsistent with the
    /// latest snapshot height or the start compute height, when a required
    /// pivot hash is missing, or when a snapshot db manager operation
    /// fails.
    fn recover_latest_mpt_snapshot_if_needed(
        &self, inner: &mut ConsensusGraphInner,
        start_compute_epoch_pivot_index: &mut usize, start_pivot_index: usize,
        end_index: usize, need_set_intermediate_trie_root_merkle: &mut bool,
        snapshot_epoch_count: u64,
    ) -> Option<usize> {
        if !self.conf.inner_conf.use_isolated_db_for_mpt_table {
            return Some(end_index);
        }
        let era_stable_height = inner.cur_era_stable_height;
        // Without a recorded initialization state, fall back to the era
        // stable height; with one, the latest snapshot height is never
        // taken below the era stable height.
        let (
            temp_snapshot_db_existing,
            removed_snapshots,
            latest_snapshot_epoch_height,
            max_snapshot_epoch_height_has_mpt,
        ) = inner
            .data_man
            .storage_manager
            .get_storage_manager()
            .persist_state_from_initialization
            .write()
            .take()
            .map(|(temp_db, removed, latest_height, max_height_has_mpt)| {
                (
                    temp_db,
                    removed,
                    max(latest_height, era_stable_height),
                    max_height_has_mpt,
                )
            })
            .unwrap_or_else(|| {
                (None, HashSet::new(), era_stable_height, None)
            });
        debug!("latest snapshot epoch height: {}, temp snapshot status: {:?}, max snapshot epoch height has mpt: {:?}, removed snapshots {:?}",
            latest_snapshot_epoch_height, temp_snapshot_db_existing, max_snapshot_epoch_height_has_mpt, removed_snapshots);
        if removed_snapshots.len() == 1
            && removed_snapshots.contains(&NULL_EPOCH)
        {
            debug!("special case for synced snapshot");
            return Some(end_index);
        }
        if max_snapshot_epoch_height_has_mpt
            .is_some_and(|h| h == latest_snapshot_epoch_height)
        {
            inner
                .data_man
                .storage_manager
                .get_storage_manager()
                .get_snapshot_manager()
                .get_snapshot_db_manager()
                .recreate_latest_mpt_snapshot()
                .unwrap();
            // NOTE(review): the value logged here is the pivot index, while
            // the message calls it an epoch height - confirm the intent.
            info!(
                "snapshot for epoch height {} is still not use mpt database",
                start_compute_epoch_pivot_index
            );
            return Some(end_index);
        }
        // Execution must not start later than two snapshot periods after
        // the latest snapshot.
        let maximum_height_to_create_next_snapshot =
            latest_snapshot_epoch_height + snapshot_epoch_count * 2;
        let index =
            inner.height_to_pivot_index(maximum_height_to_create_next_snapshot);
        if *start_compute_epoch_pivot_index > index {
            warn!("start_compute_epoch_pivot_index is greater than maximum epoch need to compute {}", index);
            *start_compute_epoch_pivot_index = index;
        }
        let start_compute_epoch_height = inner.arena
            [inner.pivot_chain[*start_compute_epoch_pivot_index]]
            .height;
        info!(
            "current start compute epoch height {}",
            start_compute_epoch_height
        );
        let recovery_latest_mpt_snapshot =
            if self.conf.inner_conf.recovery_latest_mpt_snapshot
                || start_compute_epoch_height <= latest_snapshot_epoch_height
                || (temp_snapshot_db_existing.is_some()
                    && latest_snapshot_epoch_height
                        < start_compute_epoch_height
                    && start_compute_epoch_height
                        <= latest_snapshot_epoch_height + snapshot_epoch_count)
            {
                true
            } else {
                // Recovery is also needed when a removed snapshot lies
                // above the latest snapshot height.
                let mut max_epoch_height = 0;
                for pivot_index in (start_pivot_index..end_index)
                    .step_by(snapshot_epoch_count as usize)
                {
                    let pivot_arena_index = inner.pivot_chain[pivot_index];
                    let pivot_hash = inner.arena[pivot_arena_index].hash;
                    debug!(
                        "snapshot pivot_index {} height {} ",
                        pivot_index, inner.arena[pivot_arena_index].height
                    );
                    if removed_snapshots.contains(&pivot_hash) {
                        max_epoch_height = max(
                            max_epoch_height,
                            inner.arena[pivot_arena_index].height,
                        );
                    }
                }
                latest_snapshot_epoch_height < max_epoch_height
            };
        if recovery_latest_mpt_snapshot {
            let era_pivot_epoch_height = if start_compute_epoch_height
                <= inner.cur_era_stable_height + snapshot_epoch_count
            {
                debug!("snapshot for cur_era_stable_height must be exist");
                inner.cur_era_stable_height
            } else {
                // Round down to a multiple of the era epoch count.
                (start_compute_epoch_height - snapshot_epoch_count - 1)
                    / self.conf.inner_conf.era_epoch_count
                    * self.conf.inner_conf.era_epoch_count
            };
            if era_pivot_epoch_height > latest_snapshot_epoch_height {
                panic!("era_pivot_epoch_height is greater than latest_snapshot_epoch_height, this should not happen");
            }
            debug!(
                "need recovery latest mpt snapshot, start compute epoch height {}, era pivot epoch height {}",
                start_compute_epoch_height, era_pivot_epoch_height
            );
            if start_compute_epoch_height <= era_pivot_epoch_height {
                unreachable!("start_compute_epoch_height {} is smaller than era_pivot_epoch_height {}", start_compute_epoch_height, era_pivot_epoch_height);
            } else if start_compute_epoch_height
                <= era_pivot_epoch_height + snapshot_epoch_count
            {
                if start_compute_epoch_height % snapshot_epoch_count == 1 {
                    *need_set_intermediate_trie_root_merkle = true;
                }
            } else if start_compute_epoch_height
                > era_pivot_epoch_height + snapshot_epoch_count * 2
            {
                // A start height within the second snapshot period after
                // the era pivot needs no adjustment; anything later is
                // pulled back to the end of that period.
                let new_height =
                    era_pivot_epoch_height + snapshot_epoch_count * 2;
                let new_index = inner.height_to_pivot_index(new_height);
                info!("reset start_compute_epoch_pivot_index to {}", new_index);
                *start_compute_epoch_pivot_index = new_index;
            }
            let era_pivot_hash = if era_pivot_epoch_height == 0 {
                NULL_EPOCH
            } else {
                inner
                    .get_pivot_hash_from_epoch_number(era_pivot_epoch_height)
                    .expect("pivot hash should be exist")
            };
            let snapshot_db_manager = inner
                .data_man
                .storage_manager
                .get_storage_manager()
                .get_snapshot_manager()
                .get_snapshot_db_manager();
            snapshot_db_manager.update_latest_snapshot_id(
                era_pivot_hash,
                era_pivot_epoch_height,
            );
            if max_snapshot_epoch_height_has_mpt
                .is_some_and(|height| height >= era_pivot_epoch_height)
            {
                snapshot_db_manager.recreate_latest_mpt_snapshot().unwrap();
            } else {
                let pivot_hash_before_era = if era_pivot_epoch_height == 0 {
                    None
                } else {
                    Some(
                        inner
                            .get_pivot_hash_from_epoch_number(
                                era_pivot_epoch_height - snapshot_epoch_count,
                            )
                            .expect("pivot hash should be exist"),
                    )
                };
                snapshot_db_manager
                    .recovery_latest_mpt_snapshot_from_checkpoint(
                        &era_pivot_hash,
                        pivot_hash_before_era,
                    )
                    .unwrap();
            }
            max_snapshot_epoch_height_has_mpt
                .filter(|height| *height >= inner.cur_era_stable_height)
                .map(|height| inner.height_to_pivot_index(height))
        } else {
            if temp_snapshot_db_existing.is_some()
                && latest_snapshot_epoch_height + snapshot_epoch_count
                    < start_compute_epoch_height
                && start_compute_epoch_height
                    <= latest_snapshot_epoch_height + 2 * snapshot_epoch_count
            {
                inner
                    .data_man
                    .storage_manager
                    .get_storage_manager()
                    .get_snapshot_manager()
                    .get_snapshot_db_manager()
                    .set_reconstruct_snapshot_id(temp_snapshot_db_existing);
            }
            debug!("the latest MPT snapshot is valid");
            Some(end_index)
        }
    }
    /// Publishes, through the storage manager, the intermediate trie root
    /// merkle to be used for the pivot block at
    /// `start_compute_epoch_pivot_index`.
    ///
    /// Nothing happens when snapshots before the stable checkpoint are kept
    /// and `need_set_intermediate_trie_root_merkle` is `false`. Otherwise
    /// the value comes from the execution commitment of the previous pivot
    /// block: its `delta_root` when the next height is the first one after
    /// a multiple of `snapshot_epoch_count`, its `intermediate_delta_root`
    /// in all other cases, and `MERKLE_NULL_NODE` when there is no
    /// commitment.
    fn set_intermediate_trie_root_merkle(
        &self, inner: &mut ConsensusGraphInner,
        start_compute_epoch_pivot_index: usize,
        need_set_intermediate_trie_root_merkle: bool,
        snapshot_epoch_count: u64,
    ) {
        let storage_manager =
            inner.data_man.storage_manager.get_storage_manager();
        let keeps_snapshot = storage_manager
            .storage_conf
            .keep_snapshot_before_stable_checkpoint;
        if keeps_snapshot && !need_set_intermediate_trie_root_merkle {
            return;
        }

        let prev_arena_index =
            inner.pivot_chain[start_compute_epoch_pivot_index - 1];
        let pivot_hash = inner.arena[prev_arena_index].hash;
        let next_height = inner.arena[prev_arena_index].height + 1;
        let intermediate_trie_root_merkle = match *self
            .data_man
            .get_epoch_execution_commitment(&pivot_hash)
        {
            None => MERKLE_NULL_NODE,
            Some(commitment) => {
                let state_root =
                    &commitment.state_root_with_aux_info.state_root;
                if next_height % snapshot_epoch_count == 1 {
                    state_root.delta_root
                } else {
                    state_root.intermediate_delta_root
                }
            }
        };
        debug!("previous pivot hash {:?} intermediate trie root merkle for next pivot {:?}", pivot_hash, intermediate_trie_root_merkle);
        *storage_manager.intermediate_trie_root_merkle.write() =
            Some(intermediate_trie_root_merkle);
    }
    /// Marks the transactions of block `me` as packed in the transaction
    /// pool.
    ///
    /// The call is a no-op while the pool is not ready for mining. A block
    /// is skipped (with a warning) when it has no parent, or when the era
    /// genesis block reached through its parent differs from the pivot
    /// block at the corresponding era genesis height (`NULL` when the pivot
    /// chain does not extend past that height).
    ///
    /// # Panics
    ///
    /// Panics if the block body cannot be fetched from the data manager.
    fn set_block_tx_packed(&self, inner: &ConsensusGraphInner, me: usize) {
        if !self.txpool.ready_for_mining() {
            return;
        }
        let parent = inner.arena[me].parent;
        if parent == NULL {
            warn!(
                "set_block_tx_packed skips block with empty parent {:?}",
                inner.arena[me].hash
            );
            return;
        }

        let era_genesis_height =
            inner.get_era_genesis_height(inner.arena[parent].height);
        let pivot_end_height =
            inner.pivot_index_to_height(inner.pivot_chain.len());
        let cur_pivot_era_block = if pivot_end_height > era_genesis_height {
            inner.get_pivot_block_arena_index(era_genesis_height)
        } else {
            NULL
        };
        let era_block = inner.get_era_genesis_block_with_parent(parent);
        if era_block != cur_pivot_era_block {
            warn!("set_block_tx_packed skips block {:?}", inner.arena[me].hash);
            return;
        }

        let block = self
            .data_man
            .block_by_hash(&inner.arena[me].hash, true)
            .expect("Already checked");
        self.txpool.set_tx_packed(&block.transactions);
    }
    /// Recycles the transactions of blocks that were skipped in epochs old
    /// enough to be past the recycle delay.
    ///
    /// Pivot indices from the fork point (capped at the last eligible
    /// index) up to `pivot_chain.len() - RECYCLE_TRANSACTION_DELAY - 1` are
    /// visited; every skipped block of each such epoch is handed to
    /// `recycle_tx_in_block`. Nothing is done while the transaction pool is
    /// not ready for mining or the pivot chain is not longer than the
    /// delay.
    fn delayed_tx_recycle_in_skipped_blocks(
        &self, inner: &mut ConsensusGraphInner, fork_height: u64,
    ) {
        if !self.txpool.ready_for_mining() {
            return;
        }
        let delay = RECYCLE_TRANSACTION_DELAY as usize;
        if inner.pivot_chain.len() <= delay {
            return;
        }

        let last_recycle_index = inner.pivot_chain.len() - delay - 1;
        let first_recycle_index = min(
            inner.height_to_pivot_index(fork_height),
            last_recycle_index,
        );
        for pivot_index in first_recycle_index..=last_recycle_index {
            let arena_index = inner.pivot_chain[pivot_index];
            // Cloned so that `inner` can be borrowed again while recycling.
            let skipped_blocks = inner
                .get_or_compute_skipped_epoch_blocks(arena_index)
                .clone();
            for block_hash in &skipped_blocks {
                self.recycle_tx_in_block(inner, block_hash);
            }
        }
    }
}