cfxcore/consensus/consensus_inner/
consensus_new_block_handler.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use super::blame_verifier::BlameVerifier;
6use crate::{
7    block_data_manager::{BlockDataManager, BlockStatus, LocalBlockInfo},
8    channel::Channel,
9    consensus::{
10        consensus_inner::{
11            confirmation_meter::ConfirmationMeter,
12            consensus_executor::{ConsensusExecutor, EpochExecutionTask},
13            ConsensusGraphInner, NULL,
14        },
15        pivot_hint::PivotHint,
16        pos_handler::PosVerifier,
17        ConsensusConfig,
18    },
19    state_exposer::{ConsensusGraphBlockState, STATE_EXPOSER},
20    statistics::SharedStatistics,
21    NodeType, Notifications, SharedTransactionPool,
22};
23use cfx_parameters::{consensus::*, consensus_internal::*};
24use cfx_storage::{storage_db::SnapshotDbManagerTrait, StateIndex};
25use cfx_types::H256;
26use hibitset::{BitSet, BitSetLike, DrainableBitSet};
27use parking_lot::Mutex;
28use primitives::{MERKLE_NULL_NODE, NULL_EPOCH};
29use std::{
30    cmp::{max, min},
31    collections::{BinaryHeap, HashMap, HashSet, VecDeque},
32    slice::Iter,
33    sync::Arc,
34};
35
36pub struct ConsensusNewBlockHandler {
37    conf: ConsensusConfig,
38    txpool: SharedTransactionPool,
39    data_man: Arc<BlockDataManager>,
40    executor: Arc<ConsensusExecutor>,
41    pos_verifier: Arc<PosVerifier>,
42    statistics: SharedStatistics,
43
44    /// Channel used to send epochs to PubSub
45    /// Each element is <epoch_number, epoch_hashes>
46    epochs_sender: Arc<Channel<(u64, Vec<H256>)>>,
47
48    /// API used for verifying blaming on light nodes.
49    blame_verifier: Mutex<BlameVerifier>,
50
51    /// The type of this node: Archive, Full, or Light.
52    node_type: NodeType,
53
54    pivot_hint: Option<Arc<PivotHint>>,
55}
56
57/// ConsensusNewBlockHandler contains all sub-routines for handling new arriving
58/// blocks from network or db. It manipulates and updates ConsensusGraphInner
59/// object accordingly.
60impl ConsensusNewBlockHandler {
61    pub fn new(
62        conf: ConsensusConfig, txpool: SharedTransactionPool,
63        data_man: Arc<BlockDataManager>, executor: Arc<ConsensusExecutor>,
64        statistics: SharedStatistics, notifications: Arc<Notifications>,
65        node_type: NodeType, pos_verifier: Arc<PosVerifier>,
66        pivot_hint: Option<Arc<PivotHint>>,
67    ) -> Self {
68        let epochs_sender = notifications.epochs_ordered.clone();
69        let blame_verifier =
70            Mutex::new(BlameVerifier::new(data_man.clone(), notifications));
71
72        Self {
73            pos_verifier,
74            conf,
75            txpool,
76            data_man,
77            executor,
78            statistics,
79            epochs_sender,
80            blame_verifier,
81            node_type,
82            pivot_hint,
83        }
84    }
85
86    /// Return (old_era_block_set, new_era_block_set).
87    /// `old_era_block_set` includes the blocks in the past of
88    /// `new_era_block_arena_index`. `new_era_block_set` includes all other
89    /// blocks (the anticone and the future).
90    fn compute_old_era_and_new_era_block_set(
91        inner: &mut ConsensusGraphInner, new_era_block_arena_index: usize,
92    ) -> (HashSet<usize>, HashSet<usize>) {
93        // We first compute the set of blocks inside the new era and we
94        // recompute the past_weight inside the stable height.
95        let mut old_era_block_arena_index_set = HashSet::new();
96        let mut queue = VecDeque::new();
97        queue.push_back(new_era_block_arena_index);
98        while let Some(x) = queue.pop_front() {
99            if inner.arena[x].parent != NULL
100                && !old_era_block_arena_index_set
101                    .contains(&inner.arena[x].parent)
102            {
103                old_era_block_arena_index_set.insert(inner.arena[x].parent);
104                queue.push_back(inner.arena[x].parent);
105            }
106            for referee in &inner.arena[x].referees {
107                if *referee != NULL
108                    && !old_era_block_arena_index_set.contains(referee)
109                {
110                    old_era_block_arena_index_set.insert(*referee);
111                    queue.push_back(*referee);
112                }
113            }
114        }
115        let mut new_era_block_arena_index_set = HashSet::new();
116        for (i, _) in &inner.arena {
117            if !old_era_block_arena_index_set.contains(&i) {
118                new_era_block_arena_index_set.insert(i);
119            }
120        }
121        (old_era_block_arena_index_set, new_era_block_arena_index_set)
122    }
123
124    /// Note that there is an important assumption: the timer chain must have no
125    /// block in the anticone of new_era_block_arena_index. If this is not
126    /// true, it cannot become a checkpoint block
127    fn make_checkpoint_at(
128        inner: &mut ConsensusGraphInner, new_era_block_arena_index: usize,
129    ) {
130        let new_era_height = inner.arena[new_era_block_arena_index].height;
131        let (outside_block_arena_indices, new_era_block_arena_index_set) =
132            Self::compute_old_era_and_new_era_block_set(
133                inner,
134                new_era_block_arena_index,
135            );
136
137        // This is the arena indices for legacy blocks.
138        let mut new_era_genesis_subtree = HashSet::new();
139        let mut queue = VecDeque::new();
140        queue.push_back(new_era_block_arena_index);
141        while let Some(x) = queue.pop_front() {
142            new_era_genesis_subtree.insert(x);
143            for child in &inner.arena[x].children {
144                queue.push_back(*child);
145            }
146        }
147        let new_era_legacy_block_arena_index_set: HashSet<_> =
148            new_era_block_arena_index_set
149                .difference(&new_era_genesis_subtree)
150                .collect();
151
152        // Next we are going to recompute all referee and referrer information
153        // in arena
154        let new_era_pivot_index = inner.height_to_pivot_index(new_era_height);
155        for v in new_era_block_arena_index_set.iter() {
156            let me = *v;
157            // It is necessary to process `referees` and
158            // `blockset_in_own_view_of_epoch` because
159            // `new_era_block_arena_index_set` include the blocks in
160            // the anticone of the new era genesis.
161            inner.arena[me]
162                .referees
163                .retain(|v| new_era_block_arena_index_set.contains(v));
164            inner.arena[me]
165                .data
166                .blockset_in_own_view_of_epoch
167                .retain(|v| new_era_block_arena_index_set.contains(v));
168            if !new_era_block_arena_index_set.contains(
169                &inner.arena[me].data.past_view_last_timer_block_arena_index,
170            ) {
171                inner.arena[me].data.past_view_last_timer_block_arena_index =
172                    NULL;
173            }
174            if !new_era_block_arena_index_set
175                .contains(&inner.arena[me].data.force_confirm)
176            {
177                inner.arena[me].data.force_confirm = new_era_block_arena_index;
178            }
179        }
180        // reassign the parent for outside era blocks
181        for v in new_era_legacy_block_arena_index_set {
182            let me = *v;
183            let mut parent = inner.arena[me].parent;
184            if inner.arena[me].era_block != NULL {
185                inner.split_root(me);
186            }
187            if !new_era_block_arena_index_set.contains(&parent) {
188                parent = NULL;
189            }
190            inner.arena[me].parent = parent;
191            inner.arena[me].era_block = NULL;
192            inner.terminal_hashes.remove(&inner.arena[me].hash);
193        }
194        // Now we are ready to cleanup outside blocks in inner data structures
195        inner
196            .pastset_cache
197            .intersect_update(&outside_block_arena_indices);
198        for index in outside_block_arena_indices {
199            let hash = inner.arena[index].hash;
200            inner.hash_to_arena_indices.remove(&hash);
201            inner.terminal_hashes.remove(&hash);
202            inner.arena.remove(index);
203            // remove useless data in BlockDataManager
204            inner.data_man.remove_epoch_execution_commitment(&hash);
205            inner.data_man.remove_epoch_execution_context(&hash);
206        }
207
208        // Now we truncate the timer chain that are outside the genesis.
209        let mut timer_chain_truncate = 0;
210        while timer_chain_truncate < inner.timer_chain.len()
211            && !new_era_block_arena_index_set
212                .contains(&inner.timer_chain[timer_chain_truncate])
213        {
214            timer_chain_truncate += 1;
215        }
216        inner.cur_era_genesis_timer_chain_height += timer_chain_truncate as u64;
217        assert_eq!(
218            inner.cur_era_genesis_timer_chain_height,
219            inner.arena[new_era_block_arena_index]
220                .data
221                .ledger_view_timer_chain_height
222        );
223        for i in 0..(inner.timer_chain.len() - timer_chain_truncate) {
224            inner.timer_chain[i] = inner.timer_chain[i + timer_chain_truncate];
225            if i + timer_chain_truncate
226                < inner.timer_chain_accumulative_lca.len()
227            {
228                inner.timer_chain_accumulative_lca[i] = inner
229                    .timer_chain_accumulative_lca[i + timer_chain_truncate];
230            }
231        }
232        inner
233            .timer_chain
234            .resize(inner.timer_chain.len() - timer_chain_truncate, 0);
235        if inner.timer_chain_accumulative_lca.len() > timer_chain_truncate {
236            inner.timer_chain_accumulative_lca.resize(
237                inner.timer_chain_accumulative_lca.len() - timer_chain_truncate,
238                0,
239            );
240        } else {
241            inner.timer_chain_accumulative_lca.clear();
242        }
243        // Move LCA to new genesis if necessary!
244        for i in 0..inner.timer_chain_accumulative_lca.len() {
245            if i < inner.inner_conf.timer_chain_beta as usize - 1
246                || !new_era_genesis_subtree
247                    .contains(&inner.timer_chain_accumulative_lca[i])
248            {
249                inner.timer_chain_accumulative_lca[i] =
250                    new_era_block_arena_index;
251            }
252        }
253
254        assert!(new_era_pivot_index < inner.pivot_chain.len());
255        inner.pivot_chain = inner.pivot_chain.split_off(new_era_pivot_index);
256        inner.pivot_chain_metadata =
257            inner.pivot_chain_metadata.split_off(new_era_pivot_index);
258        // Recompute past weight values
259        inner.pivot_chain_metadata[0].past_weight =
260            inner.block_weight(new_era_block_arena_index);
261        for i in 1..inner.pivot_chain_metadata.len() {
262            let pivot = inner.pivot_chain[i];
263            inner.pivot_chain_metadata[i].past_weight =
264                inner.pivot_chain_metadata[i - 1].past_weight
265                    + inner.total_weight_in_own_epoch(
266                        &inner.arena[pivot].data.blockset_in_own_view_of_epoch,
267                        new_era_block_arena_index,
268                    )
269                    + inner.block_weight(pivot)
270        }
271        for d in inner.pivot_chain_metadata.iter_mut() {
272            d.last_pivot_in_past_blocks
273                .retain(|v| new_era_block_arena_index_set.contains(v));
274        }
275        inner
276            .anticone_cache
277            .intersect_update(&new_era_genesis_subtree);
278
279        // Clear best_terminals_lca_caches
280        inner.best_terminals_lca_height_cache.clear();
281
282        // Clear has_timer_block_in_anticone cache
283        inner.has_timer_block_in_anticone_cache.clear();
284
285        // Chop off all link-cut-trees in the inner data structure
286        inner.split_root(new_era_block_arena_index);
287
288        inner.cur_era_genesis_block_arena_index = new_era_block_arena_index;
289        inner.cur_era_genesis_height = new_era_height;
290
291        let cur_era_hash = inner.arena[new_era_block_arena_index].hash.clone();
292        let stable_era_arena_index =
293            inner.get_pivot_block_arena_index(inner.cur_era_stable_height);
294        let stable_era_hash = inner.arena[stable_era_arena_index].hash.clone();
295
296        // This must be true given our checkpoint rule!
297        for (_, x) in &inner.invalid_block_queue {
298            assert!(new_era_block_arena_index_set.contains(x))
299        }
300
301        inner.data_man.set_cur_consensus_era_genesis_hash(
302            &cur_era_hash,
303            &stable_era_hash,
304        );
305        inner
306            .data_man
307            .new_checkpoint(new_era_height, inner.best_epoch_number());
308    }
309
310    pub fn compute_anticone_bruteforce(
311        inner: &ConsensusGraphInner, me: usize,
312    ) -> BitSet {
313        let parent = inner.arena[me].parent;
314        if parent == NULL {
315            // This is genesis, so the anticone should be empty
316            return BitSet::new();
317        }
318        let mut last_in_pivot = inner.arena[parent].data.last_pivot_in_past;
319        for referee in &inner.arena[me].referees {
320            last_in_pivot = max(
321                last_in_pivot,
322                inner.arena[*referee].data.last_pivot_in_past,
323            );
324        }
325        let mut visited = BitSet::new();
326        let mut queue = VecDeque::new();
327        queue.push_back(me);
328        visited.add(me as u32);
329        while let Some(index) = queue.pop_front() {
330            let parent = inner.arena[index].parent;
331            if parent != NULL
332                && inner.arena[parent].data.epoch_number > last_in_pivot
333                && !visited.contains(parent as u32)
334            {
335                visited.add(parent as u32);
336                queue.push_back(parent);
337            }
338            for referee in &inner.arena[index].referees {
339                if inner.arena[*referee].data.epoch_number > last_in_pivot
340                    && !visited.contains(*referee as u32)
341                {
342                    visited.add(*referee as u32);
343                    queue.push_back(*referee);
344                }
345            }
346        }
347        // Now we traverse all future of me, when adding new block, this is
348        // empty
349        queue.clear();
350        queue.push_back(me);
351        while let Some(index) = queue.pop_front() {
352            for child in &inner.arena[index].children {
353                if !visited.contains(*child as u32) {
354                    visited.add(*child as u32);
355                    queue.push_back(*child);
356                }
357            }
358            for referrer in &inner.arena[index].referrers {
359                if !visited.contains(*referrer as u32) {
360                    visited.add(*referrer as u32);
361                    queue.push_back(*referrer);
362                }
363            }
364        }
365
366        let mut anticone = BitSet::with_capacity(inner.arena.capacity() as u32);
367        for (i, node) in inner.arena.iter() {
368            if node.data.epoch_number > last_in_pivot
369                && !visited.contains(i as u32)
370                && (node.data.activated || node.data.inactive_dependency_cnt == NULL) /* We include only preactivated blocks */
371                && node.era_block != NULL
372            /* We exclude out-of-era blocks */
373            {
374                anticone.add(i as u32);
375            }
376        }
377        anticone
378    }
379
380    pub fn compute_anticone_hashset_bruteforce(
381        inner: &ConsensusGraphInner, me: usize,
382    ) -> HashSet<usize> {
383        let s =
384            ConsensusNewBlockHandler::compute_anticone_bruteforce(inner, me);
385        let mut ret = HashSet::new();
386        for index in s.iter() {
387            ret.insert(index as usize);
388        }
389        ret
390    }
391
392    /// Note that this function is not a pure computation function. It has the
393    /// sideeffect of updating all existing anticone set in the anticone
394    /// cache
395    fn compute_and_update_anticone(
396        inner: &mut ConsensusGraphInner, me: usize,
397    ) -> (BitSet, BitSet) {
398        let parent = inner.arena[me].parent;
399
400        // If we do not have the anticone of its parent, we compute it with
401        // brute force!
402        let parent_anticone_opt = inner.anticone_cache.get(parent);
403        let mut anticone;
404        if parent_anticone_opt.is_none() {
405            anticone = ConsensusNewBlockHandler::compute_anticone_bruteforce(
406                inner, me,
407            );
408        } else {
409            // anticone = parent_anticone + parent_future - my_past
410            // Compute future set of parent
411            anticone = inner.compute_future_bitset(parent);
412            anticone.remove(me as u32);
413
414            for index in parent_anticone_opt.unwrap() {
415                anticone.add(*index as u32);
416            }
417            let mut my_past = BitSet::new();
418            let mut queue: VecDeque<usize> = VecDeque::new();
419            queue.push_back(me);
420            while let Some(index) = queue.pop_front() {
421                if my_past.contains(index as u32) {
422                    continue;
423                }
424
425                debug_assert!(index != parent);
426                if index != me {
427                    my_past.add(index as u32);
428                }
429
430                let idx_parent = inner.arena[index].parent;
431                if idx_parent != NULL {
432                    if anticone.contains(idx_parent as u32)
433                        || inner.arena[idx_parent].era_block == NULL
434                    {
435                        queue.push_back(idx_parent);
436                    }
437                }
438
439                for referee in &inner.arena[index].referees {
440                    if anticone.contains(*referee as u32)
441                        || inner.arena[*referee].era_block == NULL
442                    {
443                        queue.push_back(*referee);
444                    }
445                }
446            }
447            for index in my_past.drain() {
448                anticone.remove(index);
449            }
450
451            // We only consider non-lagacy blocks when computing anticone.
452            for index in anticone.clone().iter() {
453                if inner.arena[index as usize].era_block == NULL {
454                    anticone.remove(index);
455                }
456            }
457        }
458
459        inner.anticone_cache.update(me, &anticone);
460
461        let mut anticone_barrier = BitSet::new();
462        for index in anticone.clone().iter() {
463            let parent = inner.arena[index as usize].parent as u32;
464            if !anticone.contains(parent) {
465                anticone_barrier.add(index);
466            }
467        }
468
469        debug!(
470            "Block {} anticone size {}",
471            inner.arena[me].hash,
472            anticone.len()
473        );
474
475        (anticone, anticone_barrier)
476    }
477
478    fn check_correct_parent_brutal(
479        inner: &ConsensusGraphInner, me: usize, subtree_weight: &Vec<i128>,
480        checking_candidate: Iter<usize>,
481    ) -> bool {
482        let mut valid = true;
483        let parent = inner.arena[me].parent;
484        let force_confirm = inner.arena[me].data.force_confirm;
485        let force_confirm_height = inner.arena[force_confirm].height;
486
487        // Check the pivot selection decision.
488        for consensus_arena_index_in_epoch in checking_candidate {
489            let lca = inner.lca(*consensus_arena_index_in_epoch, parent);
490            assert!(lca != *consensus_arena_index_in_epoch);
491            // If it is outside current era, we will skip!
492            if lca == NULL || inner.arena[lca].height < force_confirm_height {
493                continue;
494            }
495            if lca == parent {
496                valid = false;
497                break;
498            }
499
500            let fork = inner.ancestor_at(
501                *consensus_arena_index_in_epoch,
502                inner.arena[lca].height + 1,
503            );
504            let pivot = inner.ancestor_at(parent, inner.arena[lca].height + 1);
505
506            let fork_subtree_weight = subtree_weight[fork];
507            let pivot_subtree_weight = subtree_weight[pivot];
508
509            if ConsensusGraphInner::is_heavier(
510                (fork_subtree_weight, &inner.arena[fork].hash),
511                (pivot_subtree_weight, &inner.arena[pivot].hash),
512            ) {
513                valid = false;
514                break;
515            }
516        }
517
518        valid
519    }
520
521    fn check_correct_parent(
522        inner: &mut ConsensusGraphInner, me: usize, anticone_barrier: &BitSet,
523        weight_tuple: Option<&Vec<i128>>,
524    ) -> bool {
525        let parent = inner.arena[me].parent;
526        // FIXME: Because now we allow partial invalid blocks as parent, we need
527        // to consider more for block candidates. This may cause a
528        // performance issue and we should consider another optimized strategy.
529        let mut candidate;
530        let blockset =
531            inner.exchange_or_compute_blockset_in_own_view_of_epoch(me, None);
532        // Note that here we have to be conservative. If it is pending we have
533        // to treat it as if it is partial invalid.
534        let candidate_iter = if inner.arena[parent].data.partial_invalid
535            || inner.arena[parent].data.pending
536        {
537            candidate = blockset.clone();
538            let mut p = parent;
539            while p != NULL
540                && (inner.arena[p].data.partial_invalid
541                    || inner.arena[p].data.pending)
542            {
543                let blockset_p = inner
544                    .exchange_or_compute_blockset_in_own_view_of_epoch(p, None);
545                candidate.extend(blockset_p.iter());
546                inner.exchange_or_compute_blockset_in_own_view_of_epoch(
547                    p,
548                    Some(blockset_p),
549                );
550                p = inner.arena[p].parent;
551            }
552            candidate.iter()
553        } else {
554            blockset.iter()
555        };
556
557        if let Some(subtree_weight) = weight_tuple {
558            let res = ConsensusNewBlockHandler::check_correct_parent_brutal(
559                inner,
560                me,
561                subtree_weight,
562                candidate_iter,
563            );
564            // We have to put but the blockset here! Otherwise the
565            // blockset_in_own_view_of_epoch will be corrupted.
566            inner.exchange_or_compute_blockset_in_own_view_of_epoch(
567                me,
568                Some(blockset),
569            );
570            return res;
571        }
572        let mut valid = true;
573        let force_confirm = inner.arena[me].data.force_confirm;
574        let force_confirm_height = inner.arena[force_confirm].height;
575
576        let mut weight_delta = HashMap::new();
577
578        for index in anticone_barrier {
579            let delta = inner.weight_tree.get(index as usize);
580            weight_delta.insert(index as usize, delta);
581        }
582
583        // Remove weight contribution of anticone
584        for (index, delta) in &weight_delta {
585            inner.weight_tree.path_apply(*index, -delta);
586        }
587
588        // Check the pivot selection decision.
589        for consensus_arena_index_in_epoch in candidate_iter {
590            let lca = inner.lca(*consensus_arena_index_in_epoch, parent);
591            assert!(lca != *consensus_arena_index_in_epoch);
592            // If it is outside the era, we will skip!
593            if lca == NULL || inner.arena[lca].height < force_confirm_height {
594                continue;
595            }
596            if lca == parent {
597                debug!("Block invalid (index = {}), referenced block {} index {} is in the subtree of parent block {} index {}!", me, inner.arena[*consensus_arena_index_in_epoch].hash, *consensus_arena_index_in_epoch, inner.arena[parent].hash, parent);
598                valid = false;
599                break;
600            }
601
602            let fork = inner.ancestor_at(
603                *consensus_arena_index_in_epoch,
604                inner.arena[lca].height + 1,
605            );
606            let pivot = inner.ancestor_at(parent, inner.arena[lca].height + 1);
607
608            let fork_subtree_weight = inner.weight_tree.get(fork);
609            let pivot_subtree_weight = inner.weight_tree.get(pivot);
610
611            if ConsensusGraphInner::is_heavier(
612                (fork_subtree_weight, &inner.arena[fork].hash),
613                (pivot_subtree_weight, &inner.arena[pivot].hash),
614            ) {
615                debug!("Block invalid (index = {}), referenced block {} index {} fork is heavier than the parent block {} index {} fork! Ref fork block {} weight {}, parent fork block {} weight {}!",
616                       me, inner.arena[*consensus_arena_index_in_epoch].hash, *consensus_arena_index_in_epoch, inner.arena[parent].hash, parent,
617                       inner.arena[fork].hash, fork_subtree_weight, inner.arena[pivot].hash, pivot_subtree_weight);
618                valid = false;
619                break;
620            } else {
621                trace!("Pass one validity check, block index = {}. Referenced block {} index {} fork is not heavier than the parent block {} index {} fork. Ref fork block {} weight {}, parent fork block {} weight {}!",
622                       me, inner.arena[*consensus_arena_index_in_epoch].hash, *consensus_arena_index_in_epoch, inner.arena[parent].hash, parent,
623                       inner.arena[fork].hash, fork_subtree_weight, inner.arena[pivot].hash, pivot_subtree_weight);
624            }
625        }
626
627        inner.exchange_or_compute_blockset_in_own_view_of_epoch(
628            me,
629            Some(blockset),
630        );
631
632        for (index, delta) in &weight_delta {
633            inner.weight_tree.path_apply(*index, *delta);
634        }
635
636        valid
637    }
638
639    fn check_block_full_validity(
640        &self, new: usize, inner: &mut ConsensusGraphInner, adaptive: bool,
641        anticone_barrier: &BitSet, weight_tuple: Option<&Vec<i128>>,
642    ) -> bool {
643        let parent = inner.arena[new].parent;
644        let force_confirm = inner.arena[new].data.force_confirm;
645
646        if inner.lca(parent, force_confirm) != force_confirm {
647            warn!("Partially invalid due to picking incorrect parent (force confirmation {:?} violation). {:?}", force_confirm, inner.arena[new].hash);
648            return false;
649        }
650
651        // Check whether the new block select the correct parent block
652        if !ConsensusNewBlockHandler::check_correct_parent(
653            inner,
654            new,
655            anticone_barrier,
656            weight_tuple,
657        ) {
658            warn!(
659                "Partially invalid due to picking incorrect parent. {:?}",
660                inner.arena[new].hash
661            );
662            return false;
663        }
664
665        // Check whether difficulty is set correctly
666        if inner.arena[new].difficulty
667            != inner.expected_difficulty(&inner.arena[parent].hash)
668        {
669            warn!(
670                "Partially invalid due to wrong difficulty. {:?}",
671                inner.arena[new].hash
672            );
673            return false;
674        }
675
676        // Check adaptivity match. Note that in bench mode we do not check
677        // the adaptive field correctness. We simply override its value
678        // with the right one.
679        if !self.conf.bench_mode {
680            if inner.arena[new].adaptive != adaptive {
681                warn!(
682                    "Partially invalid due to invalid adaptive field. {:?}",
683                    inner.arena[new].hash
684                );
685                return false;
686            }
687        }
688
689        // Check if `new` is in the subtree of its pos reference.
690        if self
691            .pos_verifier
692            .is_enabled_at_height(inner.arena[new].height)
693        {
694            let pivot_decision = inner
695                .get_pos_reference_pivot_decision(&inner.arena[new].hash)
696                .expect("pos reference checked");
697            match inner.hash_to_arena_indices.get(&pivot_decision) {
698                // Pivot decision is before checkpoint or fake.
699                // Check if it's on the pivot chain.
700                None => {
701                    warn!("Possibly partial invalid due to pos_reference's pivot decision not in consensus graph");
702                    return inner.pivot_block_processed(&pivot_decision);
703                }
704                Some(pivot_decision_arena_index) => {
705                    if inner.lca(new, *pivot_decision_arena_index)
706                        != *pivot_decision_arena_index
707                    {
708                        warn!("Partial invalid due to not in the subtree of pos_reference's pivot decision");
709                        // Not in the subtree of pivot_decision, mark as partial
710                        // invalid.
711                        return false;
712                    }
713                }
714            }
715        }
716
717        return true;
718    }
719
720    #[inline]
721    /// Subroutine called by on_new_block()
722    fn update_lcts_initial(&self, inner: &mut ConsensusGraphInner, me: usize) {
723        let parent = inner.arena[me].parent;
724
725        inner.weight_tree.make_tree(me);
726        inner.weight_tree.link(parent, me);
727
728        inner.adaptive_tree.make_tree(me);
729        inner.adaptive_tree.link(parent, me);
730    }
731
732    #[inline]
733    /// Subroutine called by on_new_block()
734    fn update_lcts_finalize(&self, inner: &mut ConsensusGraphInner, me: usize) {
735        let parent = inner.arena[me].parent;
736        let parent_tw = inner.weight_tree.get(parent);
737        let parent_w = inner.block_weight(parent);
738        inner.adaptive_tree.set(me, -parent_tw + parent_w);
739
740        let weight = inner.block_weight(me);
741        inner.weight_tree.path_apply(me, weight);
742        inner.adaptive_tree.path_apply(me, 2 * weight);
743        inner.adaptive_tree.caterpillar_apply(parent, -weight);
744    }
745
746    fn recycle_tx_in_block(
747        &self, inner: &ConsensusGraphInner, block_hash: &H256,
748    ) {
749        info!("recycle_tx_in_block: block_hash={:?}", block_hash);
750        if let Some(block) = inner
751            .data_man
752            .block_by_hash(block_hash, true /* update_cache */)
753        {
754            self.txpool.recycle_transactions(block.transactions.clone());
755        } else {
756            // This should only happen for blocks in the anticone of
757            // checkpoints.
758            warn!("recycle_tx_in_block: block {:?} not in db", block_hash);
759        }
760    }
761
762    fn should_move_stable_height(
763        &self, inner: &mut ConsensusGraphInner,
764    ) -> u64 {
765        if let Some(sync_state_starting_epoch) =
766            self.conf.sync_state_starting_epoch
767        {
768            if inner.header_only
769                && inner.cur_era_stable_height == sync_state_starting_epoch
770            {
771                // We want to use sync_state_starting_epoch as our stable
772                // checkpoint when we enter
773                // CatchUpCheckpointPhase, so we do not want to move forward our
774                // stable checkpoint. Since we will enter
775                // CatchUpCheckpointPhase the next time we check phase changes,
776                // it's impossible for the delayed checkpoint making to cause
777                // OOM.
778                return inner.cur_era_stable_height;
779            }
780        }
781        let new_stable_height =
782            inner.cur_era_stable_height + inner.inner_conf.era_epoch_count;
783        // We make sure there is an additional era before the best for moving it
784        if new_stable_height + inner.inner_conf.era_epoch_count
785            > inner.best_epoch_number()
786        {
787            return inner.cur_era_stable_height;
788        }
789        let new_stable_pivot_arena_index =
790            inner.get_pivot_block_arena_index(new_stable_height);
791        // Now we need to make sure that this new stable block is
792        // force_confirmed in our current graph
793        if inner.timer_chain_accumulative_lca.len() == 0 {
794            return inner.cur_era_stable_height;
795        }
796        if let Some(last) = inner.timer_chain_accumulative_lca.last() {
797            let lca = inner.lca(*last, new_stable_pivot_arena_index);
798            if lca == new_stable_pivot_arena_index {
799                return new_stable_height;
800            }
801        }
802        return inner.cur_era_stable_height;
803    }
804
805    fn should_form_checkpoint_at(
806        &self, inner: &mut ConsensusGraphInner,
807    ) -> usize {
808        let stable_pivot_block =
809            inner.get_pivot_block_arena_index(inner.cur_era_stable_height);
810        let mut new_genesis_height =
811            inner.cur_era_genesis_height + inner.inner_conf.era_epoch_count;
812
813        // FIXME: Here is a chicken and egg problem. In our full node sync
814        // FIXME: logic, we first run consensus on headers to determine
815        // FIXME: the checkpoint location. And then run the full blocks.
816        // FIXME: However, when we do not have the body, we cannot faithfully
817        // FIXME: check this condition. The consequence is that if
818        // FIXME: attacker managed to generate a lot blame blocks. New full
819        // FIXME: nodes will not correctly determine the safe checkpoint
820        // FIXME: location to start the sync. Causing potential panic
821        // FIXME: when computing `state_valid` and `blame_info`.
822        if !inner.header_only && !self.conf.bench_mode {
823            // Stable block must have a blame vector that does not stretch
824            // beyond the new genesis
825            if !inner.arena[stable_pivot_block].data.state_valid.unwrap() {
826                if inner.arena[stable_pivot_block]
827                    .data
828                    .blame_info
829                    .unwrap()
830                    .blame as u64
831                    + new_genesis_height
832                    + DEFERRED_STATE_EPOCH_COUNT
833                    >= inner.cur_era_stable_height
834                {
835                    return inner.cur_era_genesis_block_arena_index;
836                }
837            }
838        }
839
840        // We cannot move beyond the stable block/height
841        'out: while new_genesis_height < inner.cur_era_stable_height {
842            let new_genesis_block_arena_index =
843                inner.get_pivot_block_arena_index(new_genesis_height);
844            assert!(inner.arena[stable_pivot_block].data.force_confirm != NULL);
845            if inner.lca(
846                new_genesis_block_arena_index,
847                inner.arena[stable_pivot_block].data.force_confirm,
848            ) != new_genesis_block_arena_index
849            {
850                // All following era genesis candidates are on the same fork,
851                // so they are not force_confirmed by stable now.
852                return inner.cur_era_genesis_block_arena_index;
853            }
854
855            // Because the timer chain is unlikely to reorganize at this point.
856            // We will just skip this height if we found timer block
857            // in its anticone before.
858            if inner
859                .has_timer_block_in_anticone_cache
860                .contains(&new_genesis_block_arena_index)
861            {
862                new_genesis_height += inner.inner_conf.era_epoch_count;
863                continue 'out;
864            }
865
866            // Now we need to make sure that no timer chain block is in the
867            // anticone of the new genesis. This is required for our
868            // checkpoint algorithm.
869            let mut visited = BitSet::new();
870            let mut queue = VecDeque::new();
871            queue.push_back(new_genesis_block_arena_index);
872            visited.add(new_genesis_block_arena_index as u32);
873            while let Some(x) = queue.pop_front() {
874                for child in &inner.arena[x].children {
875                    if !visited.contains(*child as u32) {
876                        visited.add(*child as u32);
877                        queue.push_back(*child);
878                    }
879                }
880                for referrer in &inner.arena[x].referrers {
881                    if !visited.contains(*referrer as u32) {
882                        visited.add(*referrer as u32);
883                        queue.push_back(*referrer);
884                    }
885                }
886            }
887            let start_timer_chain_height = inner.arena
888                [new_genesis_block_arena_index]
889                .data
890                .ledger_view_timer_chain_height;
891            let start_timer_chain_index = (start_timer_chain_height
892                - inner.cur_era_genesis_timer_chain_height)
893                as usize;
894            for i in start_timer_chain_index..inner.timer_chain.len() {
895                if !visited.contains(inner.timer_chain[i] as u32) {
896                    inner
897                        .has_timer_block_in_anticone_cache
898                        .insert(new_genesis_block_arena_index);
899                    // This era genesis candidate has a timer chain block in its
900                    // anticone, so we move to check the next one.
901                    new_genesis_height += inner.inner_conf.era_epoch_count;
902                    continue 'out;
903                }
904            }
905            return new_genesis_block_arena_index;
906        }
907        // We cannot make a new checkpoint.
908        inner.cur_era_genesis_block_arena_index
909    }
910
911    fn persist_terminals(&self, inner: &ConsensusGraphInner) {
912        let mut terminals = Vec::with_capacity(inner.terminal_hashes.len());
913        for h in &inner.terminal_hashes {
914            terminals.push(h.clone());
915        }
916        self.data_man.insert_terminals_to_db(terminals);
917    }
918
919    fn try_clear_blockset_in_own_view_of_epoch(
920        inner: &mut ConsensusGraphInner, me: usize,
921    ) {
922        if inner.arena[me].data.blockset_in_own_view_of_epoch.len() as u64
923            > BLOCKSET_IN_OWN_VIEW_OF_EPOCH_CAP
924        {
925            inner.arena[me].data.blockset_in_own_view_of_epoch =
926                Default::default();
927            inner.arena[me].data.skipped_epoch_blocks = Default::default();
928            inner.arena[me].data.blockset_cleared = true;
929        }
930    }
931
932    // This function computes the timer chain in the view of the new block.
933    // The first returned value is the fork height of the timer chain.
934    // The second is a map that overwrites timer_chain_height values after the
935    // fork height.
936    fn compute_timer_chain_tuple(
937        inner: &ConsensusGraphInner, me: usize, anticone: &BitSet,
938    ) -> (u64, HashMap<usize, u64>, Vec<usize>, Vec<usize>) {
939        inner.compute_timer_chain_tuple(
940            inner.arena[me].parent,
941            &inner.arena[me].referees,
942            Some(anticone),
943        )
944    }
945
946    fn compute_invalid_block_start_timer(
947        &self, inner: &ConsensusGraphInner, me: usize,
948    ) -> u64 {
949        let last_index =
950            inner.arena[me].data.past_view_last_timer_block_arena_index;
951        if last_index == NULL {
952            inner.inner_conf.timer_chain_beta
953        } else {
954            inner.arena[last_index].data.ledger_view_timer_chain_height
955                + inner.inner_conf.timer_chain_beta
956                + if inner.get_timer_chain_index(last_index) != NULL {
957                    1
958                } else {
959                    0
960                }
961        }
962    }
963
964    fn preactivate_block(
965        &self, inner: &mut ConsensusGraphInner, me: usize,
966    ) -> BlockStatus {
967        debug!(
968            "Start to preactivate block {} index = {}",
969            inner.arena[me].hash, me
970        );
971        let parent = inner.arena[me].parent;
972        let mut pending = {
973            if let Some(f) = inner.initial_stable_future.as_mut() {
974                let mut in_future = false;
975                if inner.arena[me].hash == inner.cur_era_stable_block_hash {
976                    in_future = true;
977                }
978                if parent != NULL && f.contains(parent as u32) {
979                    in_future = true;
980                }
981                if !in_future {
982                    for referee in &inner.arena[me].referees {
983                        if f.contains(*referee as u32) {
984                            in_future = true;
985                            break;
986                        }
987                    }
988                }
989                if in_future {
990                    f.add(me as u32);
991                }
992                !in_future
993            } else {
994                let mut last_pivot_in_past = if parent != NULL {
995                    inner.arena[parent].data.last_pivot_in_past
996                } else {
997                    inner.cur_era_genesis_height
998                };
999                for referee in &inner.arena[me].referees {
1000                    last_pivot_in_past = max(
1001                        last_pivot_in_past,
1002                        inner.arena[*referee].data.last_pivot_in_past,
1003                    );
1004                }
1005                last_pivot_in_past < inner.cur_era_stable_height
1006            }
1007        };
1008
1009        // Because the following computation relies on all previous blocks being
1010        // active, We have to delay it till now
1011        let (timer_longest_difficulty, last_timer_block_arena_index) = inner
1012            .compute_timer_chain_past_view_info(
1013                parent,
1014                &inner.arena[me].referees,
1015            );
1016
1017        inner.arena[me].data.past_view_timer_longest_difficulty =
1018            timer_longest_difficulty;
1019        inner.arena[me].data.past_view_last_timer_block_arena_index =
1020            last_timer_block_arena_index;
1021
1022        inner.arena[me].data.force_confirm =
1023            inner.cur_era_genesis_block_arena_index;
1024
1025        let fully_valid;
1026
1027        // Note that this function also updates the anticone for other nodes, so
1028        // we have to call it even for pending blocks!
1029        let (anticone, anticone_barrier) =
1030            ConsensusNewBlockHandler::compute_and_update_anticone(inner, me);
1031
1032        if !pending {
1033            let timer_chain_tuple =
1034                ConsensusNewBlockHandler::compute_timer_chain_tuple(
1035                    inner, me, &anticone,
1036                );
1037
1038            inner.arena[me].data.force_confirm = inner
1039                .compute_block_force_confirm(
1040                    &timer_chain_tuple,
1041                    self.data_man
1042                        .pos_reference_by_hash(&inner.arena[me].hash)
1043                        .expect("header exist"),
1044                );
1045            debug!(
1046                "Force confirm block index {} in the past view of block index={}",
1047                inner.arena[me].data.force_confirm, me
1048            );
1049
1050            let weight_tuple = if anticone_barrier.len() >= ANTICONE_BARRIER_CAP
1051            {
1052                Some(inner.compute_subtree_weights(me, &anticone_barrier))
1053            } else {
1054                None
1055            };
1056
1057            let adaptive = inner.adaptive_weight(
1058                me,
1059                &anticone_barrier,
1060                weight_tuple.as_ref(),
1061                &timer_chain_tuple,
1062            );
1063
1064            fully_valid = self.check_block_full_validity(
1065                me,
1066                inner,
1067                adaptive,
1068                &anticone_barrier,
1069                weight_tuple.as_ref(),
1070            );
1071
1072            if self.conf.bench_mode && fully_valid {
1073                inner.arena[me].adaptive = adaptive;
1074            }
1075        } else {
1076            let block_status_in_db = self
1077                .data_man
1078                .local_block_info_by_hash(&inner.arena[me].hash)
1079                .map(|info| info.get_status())
1080                .unwrap_or(BlockStatus::Pending);
1081            fully_valid = block_status_in_db != BlockStatus::PartialInvalid;
1082            pending = block_status_in_db == BlockStatus::Pending;
1083            debug!(
1084                "Fetch the block validity status {} from the local data base",
1085                fully_valid
1086            );
1087        }
1088
1089        debug!(
1090            "Finish preactivation block {} index = {}",
1091            inner.arena[me].hash, me
1092        );
1093        let block_status = if pending {
1094            BlockStatus::Pending
1095        } else if fully_valid {
1096            BlockStatus::Valid
1097        } else {
1098            BlockStatus::PartialInvalid
1099        };
1100        self.persist_block_info(inner, me, block_status);
1101
1102        block_status
1103    }
1104
1105    fn activate_block(
1106        &self, inner: &mut ConsensusGraphInner, me: usize,
1107        meter: &ConfirmationMeter, queue: &mut VecDeque<usize>,
1108    ) {
1109        inner.arena[me].data.activated = true;
1110        self.statistics.inc_consensus_graph_activated_block_count();
1111        let mut succ_list = inner.arena[me].children.clone();
1112        succ_list.extend(inner.arena[me].referrers.iter());
1113        for succ in &succ_list {
1114            assert!(inner.arena[*succ].data.inactive_dependency_cnt > 0);
1115            inner.arena[*succ].data.inactive_dependency_cnt -= 1;
1116            if inner.arena[*succ].data.inactive_dependency_cnt == 0 {
1117                queue.push_back(*succ);
1118            }
1119        }
1120        // The above is the only thing we need to do for out-of-era blocks
1121        // so for these blocks, we quit here.
1122        if inner.arena[me].era_block == NULL {
1123            debug!(
1124                "Updated active counters for out-of-era block in ConsensusGraph: index = {:?} hash={:?}",
1125                me, inner.arena[me].hash,
1126            );
1127            return;
1128        } else {
1129            debug!(
1130                "Start activating block in ConsensusGraph: index = {:?} hash={:?} height={:?}",
1131                me, inner.arena[me].hash, inner.arena[me].height,
1132            );
1133        }
1134
1135        let parent = inner.arena[me].parent;
1136        // Update terminal hashes for mining
1137        if parent != NULL {
1138            inner.terminal_hashes.remove(&inner.arena[parent].hash);
1139        }
1140        inner.terminal_hashes.insert(inner.arena[me].hash.clone());
1141        for referee in &inner.arena[me].referees {
1142            inner.terminal_hashes.remove(&inner.arena[*referee].hash);
1143        }
1144
1145        self.update_lcts_finalize(inner, me);
1146        let my_weight = inner.block_weight(me);
1147        let mut extend_pivot = false;
1148        let mut pivot_changed = false;
1149        // ``fork_at`` stores the first pivot chain height that we need to
1150        // update (because of the new inserted block). If the new block
1151        // extends the pivot chain, ``fork_at`` will equal to the new pivot
1152        // chain height (end of the pivot chain).
1153        let mut fork_at;
1154        let old_pivot_chain_len = inner.pivot_chain.len();
1155
1156        // Update consensus inner with a possibly new pos_reference.
1157        inner.update_pos_pivot_decision(me);
1158
1159        // Now we are going to maintain the timer chain.
1160        let diff = inner.arena[me].data.past_view_timer_longest_difficulty
1161            + inner.get_timer_difficulty(me);
1162        if inner.arena[me].is_timer
1163            && !inner.arena[me].data.partial_invalid
1164            && ConsensusGraphInner::is_heavier(
1165                (diff, &inner.arena[me].hash),
1166                (
1167                    inner.best_timer_chain_difficulty,
1168                    &inner.best_timer_chain_hash,
1169                ),
1170            )
1171        {
1172            inner.best_timer_chain_difficulty = diff;
1173            inner.best_timer_chain_hash = inner.arena[me].hash.clone();
1174            inner.update_timer_chain(me);
1175            // Now we go over every element in the ``invalid_block_queue``
1176            // because their timer may change.
1177            if !self.conf.bench_mode {
1178                let mut new_block_queue = BinaryHeap::new();
1179                for (_, x) in &inner.invalid_block_queue {
1180                    let timer =
1181                        self.compute_invalid_block_start_timer(inner, *x);
1182                    new_block_queue.push((-(timer as i128), *x));
1183                    debug!(
1184                        "Partial invalid Block {} (hash = {}) start timer is now {}",
1185                        *x, inner.arena[*x].hash, timer
1186                    );
1187                }
1188                inner.invalid_block_queue = new_block_queue;
1189            }
1190        } else {
1191            let mut timer_chain_height =
1192                inner.arena[parent].data.ledger_view_timer_chain_height;
1193            if inner.get_timer_chain_index(parent) != NULL {
1194                timer_chain_height += 1;
1195            }
1196            for referee in &inner.arena[me].referees {
1197                let timer_bit = if inner.get_timer_chain_index(*referee) != NULL
1198                {
1199                    1
1200                } else {
1201                    0
1202                };
1203                if inner.arena[*referee].data.ledger_view_timer_chain_height
1204                    + timer_bit
1205                    > timer_chain_height
1206                {
1207                    timer_chain_height = inner.arena[*referee]
1208                        .data
1209                        .ledger_view_timer_chain_height
1210                        + timer_bit;
1211                }
1212            }
1213            inner.arena[me].data.ledger_view_timer_chain_height =
1214                timer_chain_height;
1215        }
1216
1217        meter.aggregate_total_weight_in_past(my_weight);
1218        let force_confirm = inner.compute_global_force_confirm();
1219        let force_height = inner.arena[force_confirm].height;
1220        let last = inner.pivot_chain.last().cloned().unwrap();
1221        let force_lca = inner.lca(force_confirm, last);
1222
1223        if force_lca == force_confirm && inner.arena[me].parent == last {
1224            let me_height = inner.arena[me].height;
1225            let me_hash = inner.arena[me].hash;
1226            let allow_extend = self
1227                .pivot_hint
1228                .as_ref()
1229                .map_or(true, |hint| hint.allow_extend(me_height, me_hash));
1230            if allow_extend {
1231                inner.pivot_chain.push(me);
1232                inner.set_epoch_number_in_epoch(
1233                    me,
1234                    inner.pivot_index_to_height(inner.pivot_chain.len()) - 1,
1235                );
1236                inner.pivot_chain_metadata.push(Default::default());
1237                extend_pivot = true;
1238                pivot_changed = true;
1239                fork_at = inner.pivot_index_to_height(old_pivot_chain_len);
1240            } else {
1241                debug!("Chain extend rejected by pivot hint: height={me_height}, hash={me_hash:?}");
1242                fork_at = inner.pivot_index_to_height(old_pivot_chain_len);
1243            }
1244        } else {
1245            let lca = inner.lca(last, me);
1246            let new;
1247            if self.pivot_hint.is_some() && lca == last {
1248                // If pivot hint is enabled, `me` could be an extend of the
1249                // pivot chain, but its parent block is not on the pivot chain.
1250                // This special case can only happen
1251                debug!("Chain extend rejected by pivot hint because parent is rejected.");
1252                fork_at = inner.pivot_index_to_height(old_pivot_chain_len);
1253                // In this case, `pivot_changed` is false. So `new` can be
1254                // aribitrary value.
1255                new = 0;
1256            } else if force_confirm != force_lca {
1257                debug!(
1258                    "pivot chain switch to force_confirm={} force_height={}",
1259                    force_confirm, force_height
1260                );
1261                fork_at = inner.arena[force_lca].height + 1;
1262                new = inner.ancestor_at(force_confirm, fork_at);
1263                pivot_changed = true;
1264            } else {
1265                fork_at = inner.arena[lca].height + 1;
1266                let prev = inner.get_pivot_block_arena_index(fork_at);
1267                let prev_weight = inner.weight_tree.get(prev);
1268                new = inner.ancestor_at(me, fork_at);
1269                let new_weight = inner.weight_tree.get(new);
1270
1271                let me_height = inner.arena[me].height;
1272                let me_ancestor_hash_at =
1273                    |height| inner.arena[inner.ancestor_at(me, height)].hash;
1274
1275                // Note that for properly set consensus parameters, fork_at will
1276                // always after the force_height (i.e., the
1277                // force confirmation is always stable).
1278                // But during testing, we may want to stress the consensus.
1279                // Therefore we add this condition fork_at >
1280                // force_height to maintain consistency.
1281                if fork_at > force_height
1282                    && ConsensusGraphInner::is_heavier(
1283                        (new_weight, &inner.arena[new].hash),
1284                        (prev_weight, &inner.arena[prev].hash),
1285                    )
1286                    && self.pivot_hint.as_ref().map_or(true, |hint| {
1287                        hint.allow_switch(
1288                            fork_at,
1289                            me_height,
1290                            me_ancestor_hash_at,
1291                        )
1292                    })
1293                {
1294                    pivot_changed = true;
1295                } else {
1296                    // The previous subtree is still heavier, nothing is
1297                    // updated
1298                    debug!("Old pivot chain is heavier, pivot chain unchanged");
1299                    fork_at = inner.pivot_index_to_height(old_pivot_chain_len);
1300                }
1301            }
1302            if pivot_changed {
1303                // The new subtree is heavier, update pivot chain
1304                let fork_pivot_index = inner.height_to_pivot_index(fork_at);
1305                assert!(fork_pivot_index < inner.pivot_chain.len());
1306                for discarded_idx in
1307                    inner.pivot_chain.split_off(fork_pivot_index)
1308                {
1309                    // Reset the epoch_number of the discarded fork
1310                    inner.reset_epoch_number_in_epoch(discarded_idx);
1311                    ConsensusNewBlockHandler::try_clear_blockset_in_own_view_of_epoch(inner,
1312                    discarded_idx);
1313                }
1314                let mut u = new;
1315                loop {
1316                    inner.compute_blockset_in_own_view_of_epoch(u);
1317                    inner.pivot_chain.push(u);
1318                    inner.set_epoch_number_in_epoch(
1319                        u,
1320                        inner.pivot_index_to_height(inner.pivot_chain.len())
1321                            - 1,
1322                    );
1323                    if inner.arena[u].height >= force_height {
1324                        let mut heaviest = NULL;
1325                        let mut heaviest_weight = 0;
1326                        for index in &inner.arena[u].children {
1327                            if !inner.arena[*index].data.activated {
1328                                continue;
1329                            }
1330                            let weight = inner.weight_tree.get(*index);
1331                            if heaviest == NULL
1332                                || ConsensusGraphInner::is_heavier(
1333                                    (weight, &inner.arena[*index].hash),
1334                                    (
1335                                        heaviest_weight,
1336                                        &inner.arena[heaviest].hash,
1337                                    ),
1338                                )
1339                            {
1340                                heaviest = *index;
1341                                heaviest_weight = weight;
1342                            }
1343                        }
1344                        if heaviest == NULL {
1345                            break;
1346                        }
1347                        u = heaviest;
1348                    } else {
1349                        u = inner.ancestor_at(
1350                            force_confirm,
1351                            inner.arena[u].height + 1,
1352                        );
1353                    }
1354                }
1355            }
1356        };
1357        debug!(
1358            "Forked at height {}, fork parent block {}",
1359            fork_at,
1360            &inner.arena[inner.get_pivot_block_arena_index(fork_at - 1)].hash,
1361        );
1362
1363        // Now compute last_pivot_in_block and update pivot_metadata.
1364        // Note that we need to do this for partially invalid blocks to
1365        // propagate information!
1366        if !extend_pivot {
1367            let update_at = fork_at - 1;
1368            let mut last_pivot_to_update = HashSet::new();
1369            last_pivot_to_update.insert(me);
1370            if pivot_changed {
1371                inner.best_terminals_reorg_height =
1372                    min(inner.best_terminals_reorg_height, update_at);
1373                let update_pivot_index = inner.height_to_pivot_index(update_at);
1374                for pivot_index in update_pivot_index..old_pivot_chain_len {
1375                    for x in &inner.pivot_chain_metadata[pivot_index]
1376                        .last_pivot_in_past_blocks
1377                    {
1378                        last_pivot_to_update.insert(*x);
1379                    }
1380                }
1381                inner.recompute_metadata(fork_at, last_pivot_to_update);
1382            } else {
1383                // pivot chain not extend and not change
1384                ConsensusNewBlockHandler::try_clear_blockset_in_own_view_of_epoch(inner, me);
1385                inner.recompute_metadata(
1386                    inner.get_pivot_height(),
1387                    last_pivot_to_update,
1388                );
1389            }
1390        } else {
1391            let height = inner.arena[me].height;
1392            inner.arena[me].data.last_pivot_in_past = height;
1393            let pivot_index = inner.height_to_pivot_index(height);
1394            inner.pivot_chain_metadata[pivot_index]
1395                .last_pivot_in_past_blocks
1396                .insert(me);
1397            let blockset = inner
1398                .exchange_or_compute_blockset_in_own_view_of_epoch(me, None);
1399            inner.pivot_chain_metadata[pivot_index].past_weight =
1400                inner.pivot_chain_metadata[pivot_index - 1].past_weight
1401                    + inner.total_weight_in_own_epoch(
1402                        &blockset,
1403                        inner.cur_era_genesis_block_arena_index,
1404                    )
1405                    + inner.block_weight(me);
1406            inner.exchange_or_compute_blockset_in_own_view_of_epoch(
1407                me,
1408                Some(blockset),
1409            );
1410        }
1411
1412        // Only process blocks in the subtree of stable
1413        if (inner.arena[me].height <= inner.cur_era_stable_height
1414            || (inner.arena[me].height > inner.cur_era_stable_height
1415                && inner.arena
1416                    [inner.ancestor_at(me, inner.cur_era_stable_height)]
1417                .hash
1418                    != inner.cur_era_stable_block_hash))
1419            && !self.conf.bench_mode
1420        {
1421            self.persist_terminals(inner);
1422            if pivot_changed {
1423                // If we switch to a chain without stable block,
1424                // we should avoid execute unavailable states.
1425                // TODO It is handled by processing
1426                // `state_availability_boundary` at the end,
1427                // we can probably refactor to move that part of code before
1428                // this skip and remove this special case.
1429                self.data_man
1430                    .state_availability_boundary
1431                    .write()
1432                    .optimistic_executed_height = None;
1433            }
1434            debug!(
1435                "Finish activating block in ConsensusGraph: index={:?} hash={:?},\
1436                 block is not in the subtree of stable",
1437                me, inner.arena[me].hash
1438            );
1439            return;
1440        }
1441        // Note that only pivot chain height after the capped_fork_at needs to
1442        // update execution state.
1443        let capped_fork_at = max(inner.cur_era_stable_height + 1, fork_at);
1444
1445        inner.adjust_difficulty(*inner.pivot_chain.last().expect("not empty"));
1446        if me % CONFIRMATION_METER_UPDATE_FREQUENCY == 0 || pivot_changed {
1447            meter.update_confirmation_risks(inner);
1448        }
1449
1450        if pivot_changed {
1451            if inner.pivot_chain.len() > EPOCH_SET_PERSISTENCE_DELAY as usize {
1452                let capped_fork_at_pivot_index =
1453                    inner.height_to_pivot_index(capped_fork_at);
1454                // Starting from old_len ensures that all epochs within
1455                // [old_len - delay, new_len - delay) will be inserted to db, so
1456                // no epochs will be skipped. Starting from
1457                // fork_at ensures that any epoch set change will be
1458                // overwritten.
1459                let start_pivot_index = if old_pivot_chain_len
1460                    >= EPOCH_SET_PERSISTENCE_DELAY as usize
1461                {
1462                    min(
1463                        capped_fork_at_pivot_index,
1464                        old_pivot_chain_len
1465                            - EPOCH_SET_PERSISTENCE_DELAY as usize,
1466                    )
1467                } else {
1468                    capped_fork_at_pivot_index
1469                };
1470                let to_persist_pivot_index = inner.pivot_chain.len()
1471                    - EPOCH_SET_PERSISTENCE_DELAY as usize;
1472                for pivot_index in start_pivot_index..to_persist_pivot_index {
1473                    inner.persist_epoch_set_hashes(pivot_index);
1474                }
1475            }
1476        }
1477
1478        // Note that after the checkpoint (if happens), the old_pivot_chain_len
1479        // value will become obsolete
1480        let old_pivot_chain_height =
1481            inner.pivot_index_to_height(old_pivot_chain_len);
1482
1483        if inner.best_epoch_number() > inner.cur_era_stable_height
1484            && inner.arena
1485                [inner.get_pivot_block_arena_index(inner.cur_era_stable_height)]
1486            .hash
1487                == inner.cur_era_stable_block_hash
1488        {
1489            let new_stable_height = self.should_move_stable_height(inner);
1490            if inner.cur_era_stable_height != new_stable_height {
1491                inner.cur_era_stable_height = new_stable_height;
1492                let stable_arena_index =
1493                    inner.get_pivot_block_arena_index(new_stable_height);
1494
1495                // Ensure all blocks on the pivot chain before
1496                // the new stable block to have state_valid computed
1497                if !inner.header_only && !self.conf.bench_mode {
1498                    // FIXME: this assertion doesn't hold anymore
1499                    // assert!(
1500                    //     new_stable_height
1501                    //         >= inner
1502                    //             .data_man
1503                    //             .state_availability_boundary
1504                    //             .read()
1505                    //             .lower_bound
1506                    // );
1507                    // If new_era_genesis should have available state,
1508                    // make sure state execution is finished before setting
1509                    // lower_bound
1510                    // to the new_checkpoint_era_genesis.
1511                    self.executor
1512                        .wait_for_result(inner.arena[stable_arena_index].hash)
1513                        .expect(
1514                            "Execution state of the pivot chain is corrupted!",
1515                        );
1516                    inner
1517                        .compute_state_valid_and_blame_info(
1518                            stable_arena_index,
1519                            &self.executor,
1520                        )
1521                        .expect(
1522                            "New stable node should have available state_valid",
1523                        );
1524                }
1525
1526                let genesis_hash =
1527                    &inner.arena[inner.cur_era_genesis_block_arena_index].hash;
1528                let stable_hash = &inner.arena[stable_arena_index].hash;
1529                inner.cur_era_stable_block_hash = stable_hash.clone();
1530                inner.data_man.set_cur_consensus_era_genesis_hash(
1531                    genesis_hash,
1532                    stable_hash,
1533                );
1534                inner.initial_stable_future = None;
1535                debug!(
1536                    "Move era stable genesis to height={} hash={:?}",
1537                    new_stable_height, stable_hash
1538                );
1539            }
1540        }
1541
1542        // We are only going to check the checkpoint movement after the stable
1543        // is on the pivot chain (will not always be true during the recovery).
1544        // The code inside assumes this assumption.
1545        if inner.cur_era_stable_height < inner.best_epoch_number()
1546            && inner.arena
1547                [inner.get_pivot_block_arena_index(inner.cur_era_stable_height)]
1548            .hash
1549                == inner.cur_era_stable_block_hash
1550        {
1551            let new_checkpoint_era_genesis =
1552                self.should_form_checkpoint_at(inner);
1553            if new_checkpoint_era_genesis
1554                != inner.cur_era_genesis_block_arena_index
1555            {
1556                info!(
1557                    "Working on new checkpoint, old checkpoint block {} height {}",
1558                    &inner.arena[inner.cur_era_genesis_block_arena_index].hash,
1559                    inner.cur_era_genesis_height
1560                );
1561
1562                ConsensusNewBlockHandler::make_checkpoint_at(
1563                    inner,
1564                    new_checkpoint_era_genesis,
1565                );
1566                let stable_era_genesis_arena_index =
1567                    inner.ancestor_at(me, inner.cur_era_stable_height);
1568                meter.reset_for_checkpoint(
1569                    inner.weight_tree.get(stable_era_genesis_arena_index),
1570                    inner.cur_era_stable_height,
1571                );
1572                meter.update_confirmation_risks(inner);
1573                info!(
1574                    "New checkpoint formed at block {} stable block {} height {}",
1575                    &inner.arena[inner.cur_era_genesis_block_arena_index].hash,
1576                    &inner.arena[stable_era_genesis_arena_index].hash,
1577                    inner.cur_era_genesis_height
1578                );
1579            }
1580        }
1581
1582        // send updated pivot chain to pubsub
1583        let from = capped_fork_at;
1584        let to = inner.pivot_index_to_height(inner.pivot_chain.len());
1585
1586        for epoch_number in from..to {
1587            let arena_index = inner.get_pivot_block_arena_index(epoch_number);
1588            let epoch_hashes = inner.get_epoch_block_hashes(arena_index);
1589
1590            // send epoch to pub-sub layer
1591            self.epochs_sender.send((epoch_number, epoch_hashes));
1592
1593            // send epoch to blame verifier
1594            if let NodeType::Light = self.node_type {
1595                // ConsensusNewBlockHandler is single-threaded,
1596                // lock should always succeed.
1597                self.blame_verifier.lock().process(inner, epoch_number);
1598            }
1599        }
1600
1601        // If we are inserting header only, we will skip execution and
1602        // tx_pool-related operations
1603        if !inner.header_only {
1604            // FIXME: Now we have to pass a conservative stable_height here.
1605            // FIXME: Because the storage layer does not handle the case when
1606            // FIXME: this confirmed point being reverted. We have to be extra
1607            // FIXME: conservatively but this will cost storage space.
1608            // FIXME: Eventually, we should implement the logic to recover from
1609            // FIXME: the database if such a rare reversion case happens.
1610            //
1611            // FIXME: we need a function to compute the deferred epoch
1612            // FIXME: number. the current codebase may not be
1613            // FIXME: consistent at all places.
1614            let mut confirmed_height = meter.get_confirmed_epoch_num();
1615            if confirmed_height < DEFERRED_STATE_EPOCH_COUNT {
1616                confirmed_height = 0;
1617            } else {
1618                confirmed_height -= DEFERRED_STATE_EPOCH_COUNT;
1619            }
1620            confirmed_height =
1621                inner.confirmed_height_for_state_maintenance(confirmed_height);
1622            self.data_man
1623                .storage_manager
1624                .get_storage_manager()
1625                .maintain_state_confirmed(
1626                    inner,
1627                    inner.cur_era_stable_height,
1628                    self.conf.inner_conf.era_epoch_count,
1629                    confirmed_height,
1630                    &self.data_man.state_availability_boundary,
1631                )
1632                // FIXME: propagate error.
1633                .expect(&concat!(file!(), ":", line!(), ":", column!()));
1634            self.set_block_tx_packed(inner, me);
1635            self.delayed_tx_recycle_in_skipped_blocks(inner, capped_fork_at);
1636
1637            let to_state_pos = if inner
1638                .pivot_index_to_height(inner.pivot_chain.len())
1639                < DEFERRED_STATE_EPOCH_COUNT
1640            {
1641                0
1642            } else {
1643                inner.pivot_index_to_height(inner.pivot_chain.len())
1644                    - DEFERRED_STATE_EPOCH_COUNT
1645                    + 1
1646            };
1647            let mut state_at = capped_fork_at;
1648            if capped_fork_at + DEFERRED_STATE_EPOCH_COUNT
1649                > old_pivot_chain_height
1650            {
1651                if old_pivot_chain_height > DEFERRED_STATE_EPOCH_COUNT {
1652                    state_at =
1653                        old_pivot_chain_height - DEFERRED_STATE_EPOCH_COUNT + 1;
1654                } else {
1655                    state_at = 1;
1656                }
1657            }
1658            {
1659                let mut state_availability_boundary =
1660                    inner.data_man.state_availability_boundary.write();
1661                if pivot_changed {
1662                    assert!(
1663                        capped_fork_at > state_availability_boundary.lower_bound,
1664                        "forked_at {} should > boundary_lower_bound, boundary {:?}",
1665                        capped_fork_at,
1666                        state_availability_boundary
1667                    );
1668                    if extend_pivot {
1669                        state_availability_boundary
1670                            .pivot_chain
1671                            .push(inner.arena[me].hash);
1672                    } else {
1673                        let split_off_index = capped_fork_at
1674                            - state_availability_boundary.lower_bound;
1675                        state_availability_boundary
1676                            .pivot_chain
1677                            .truncate(split_off_index as usize);
1678                        for i in inner.height_to_pivot_index(capped_fork_at)
1679                            ..inner.pivot_chain.len()
1680                        {
1681                            state_availability_boundary
1682                                .pivot_chain
1683                                .push(inner.arena[inner.pivot_chain[i]].hash);
1684                        }
1685                        if state_availability_boundary.upper_bound
1686                            >= capped_fork_at
1687                        {
1688                            state_availability_boundary.upper_bound =
1689                                capped_fork_at - 1;
1690                        }
1691                    }
1692                    state_availability_boundary.optimistic_executed_height =
1693                        if to_state_pos
1694                            > state_availability_boundary.lower_bound
1695                        {
1696                            Some(to_state_pos)
1697                        } else {
1698                            None
1699                        };
1700                }
1701                // For full node, we don't execute blocks before available
1702                // states. This skip should only happen in
1703                // `SyncBlockPhase` for full nodes
1704                if state_at < state_availability_boundary.lower_bound + 1 {
1705                    state_at = state_availability_boundary.lower_bound + 1;
1706                }
1707            }
1708
1709            // Apply transactions in the determined total order
1710            while state_at < to_state_pos {
1711                let epoch_arena_index =
1712                    inner.get_pivot_block_arena_index(state_at);
1713                let reward_execution_info = self
1714                    .executor
1715                    .get_reward_execution_info(inner, epoch_arena_index);
1716                self.executor.enqueue_epoch(EpochExecutionTask::new(
1717                    epoch_arena_index,
1718                    inner,
1719                    reward_execution_info,
1720                    true,  /* on_local_pivot */
1721                    false, /* force_recompute */
1722                ));
1723
1724                state_at += 1;
1725            }
1726        }
1727
1728        self.persist_terminals(inner);
1729        debug!(
1730            "Finish activating block in ConsensusGraph: index={:?} hash={:?} cur_era_stable_height={} cur_era_genesis_height={}",
1731            me, inner.arena[me].hash, inner.cur_era_stable_height, inner.cur_era_genesis_height
1732        );
1733    }
1734
1735    /// The top level function invoked by ConsensusGraph to insert a new block.
1736    pub fn on_new_block(
1737        &self, inner: &mut ConsensusGraphInner, meter: &ConfirmationMeter,
1738        hash: &H256,
1739    ) {
1740        let block_header = self
1741            .data_man
1742            .block_header_by_hash(hash)
1743            .expect("header exist for consensus");
1744        debug!(
1745            "insert new block into consensus: header_only={:?} block={:?}",
1746            inner.header_only, &block_header
1747        );
1748        let parent_hash = block_header.parent_hash();
1749        let parent_index = inner.hash_to_arena_indices.get(&parent_hash);
1750        let me = if parent_index.is_none()
1751            || inner.arena[*parent_index.unwrap()].era_block == NULL
1752        {
1753            // current block is outside of the current era.
1754            debug!(
1755                "parent={:?} not in consensus graph or not in the genesis subtree, inserted as an out-era block stub",
1756                parent_hash
1757            );
1758            let block_status_in_db = self
1759                .data_man
1760                .local_block_info_by_hash(hash)
1761                .map(|info| info.get_status())
1762                .unwrap_or(BlockStatus::Pending);
1763            let (sn, me) = inner.insert_out_era_block(
1764                &block_header,
1765                block_status_in_db == BlockStatus::PartialInvalid,
1766            );
1767            let block_info = LocalBlockInfo::new(
1768                block_status_in_db,
1769                sn,
1770                self.data_man.get_instance_id(),
1771            );
1772            self.data_man.insert_local_block_info(hash, block_info);
1773            // If me is NULL, it means that this block does not have any stub,
1774            // so we can safely ignore it in the consensus besides
1775            // update its sequence number in the data manager.
1776            if me == NULL {
1777                // Block body in the anticone of a checkpoint is not needed for
1778                // full nodes, but they are still needed to sync
1779                // an archive node in the current implementation (to make their
1780                // child blocks `graph_ready`), so we still keep
1781                // them for now.
1782
1783                // self.data_man
1784                //     .remove_block_body(hash, true /* remove_db */);
1785                return;
1786            }
1787            me
1788        } else {
1789            let (me, indices_len) = inner.insert(&block_header);
1790            self.statistics
1791                .set_consensus_graph_inserted_block_count(indices_len);
1792            self.update_lcts_initial(inner, me);
1793            me
1794        };
1795
1796        if inner.arena[me].data.inactive_dependency_cnt == 0 {
1797            let mut queue: VecDeque<usize> = VecDeque::new();
1798            queue.push_back(me);
1799            while let Some(me) = queue.pop_front() {
1800                // For out-of-era blocks, we just fetch the results from the
1801                // already filled field. We do not run
1802                // preactivate_block() on them.
1803                let block_status = if inner.arena[me].era_block != NULL {
1804                    self.preactivate_block(inner, me)
1805                } else {
1806                    if inner.arena[me].data.partial_invalid {
1807                        BlockStatus::PartialInvalid
1808                    } else {
1809                        BlockStatus::Pending
1810                    }
1811                };
1812
1813                if block_status == BlockStatus::PartialInvalid {
1814                    inner.arena[me].data.partial_invalid = true;
1815                    let timer =
1816                        self.compute_invalid_block_start_timer(inner, me);
1817                    // We are not going to delay partial invalid blocks in the
1818                    // bench mode
1819                    if self.conf.bench_mode {
1820                        inner.invalid_block_queue.push((0, me));
1821                    } else {
1822                        inner.invalid_block_queue.push((-(timer as i128), me));
1823                    }
1824                    inner.arena[me].data.inactive_dependency_cnt = NULL;
1825                    debug!(
1826                        "Block {} (hash = {}) is partially invalid, all of its future will be non-active till timer height {}",
1827                        me, inner.arena[me].hash, timer
1828                    );
1829                } else {
1830                    if block_status == BlockStatus::Pending {
1831                        inner.arena[me].data.pending = true;
1832                        debug!(
1833                            "Block {} (hash = {}) is pending but processed",
1834                            me, inner.arena[me].hash
1835                        );
1836                    } else {
1837                        debug!(
1838                            "Block {} (hash = {}) is fully valid",
1839                            me, inner.arena[me].hash
1840                        );
1841                    }
1842                    self.activate_block(inner, me, meter, &mut queue);
1843                }
1844                // Now we are going to check all invalid blocks in the delay
1845                // queue Activate them if the timer is up
1846                let timer = if let Some(x) = inner.timer_chain.last() {
1847                    inner.arena[*x].data.ledger_view_timer_chain_height + 1
1848                } else {
1849                    inner.cur_era_genesis_timer_chain_height
1850                };
1851                loop {
1852                    if let Some((t, _)) = inner.invalid_block_queue.peek() {
1853                        if timer < (-*t) as u64 {
1854                            break;
1855                        }
1856                    } else {
1857                        break;
1858                    }
1859                    let (_, x) = inner.invalid_block_queue.pop().unwrap();
1860                    assert!(
1861                        inner.arena[x].data.inactive_dependency_cnt == NULL
1862                    );
1863                    inner.arena[x].data.inactive_dependency_cnt = 0;
1864                    self.activate_block(inner, x, meter, &mut queue);
1865                }
1866            }
1867        } else {
1868            debug!(
1869                "Block {} (hash = {}) is non-active with active counter {}",
1870                me,
1871                inner.arena[me].hash,
1872                inner.arena[me].data.inactive_dependency_cnt
1873            );
1874        }
1875    }
1876
1877    fn persist_block_info(
1878        &self, inner: &mut ConsensusGraphInner, me: usize,
1879        block_status: BlockStatus,
1880    ) {
1881        let block_info = LocalBlockInfo::new(
1882            block_status,
1883            inner.arena[me].data.sequence_number,
1884            self.data_man.get_instance_id(),
1885        );
1886        self.data_man
1887            .insert_local_block_info(&inner.arena[me].hash, block_info);
1888        let era_block = inner.arena[me].era_block();
1889        let era_block_hash = if era_block != NULL {
1890            inner.arena[era_block].hash
1891        } else {
1892            Default::default()
1893        };
1894        if inner.inner_conf.enable_state_expose {
1895            STATE_EXPOSER.consensus_graph.lock().block_state_vec.push(
1896                ConsensusGraphBlockState {
1897                    block_hash: inner.arena[me].hash,
1898                    best_block_hash: inner.best_block_hash(),
1899                    block_status: block_info.get_status(),
1900                    era_block_hash,
1901                    adaptive: inner.arena[me].adaptive(),
1902                },
1903            )
1904        }
1905    }
1906
1907    /// construct_pivot_state() rebuild pivot chain state info from db
1908    /// avoiding intermediate redundant computation triggered by
1909    /// on_new_block().
1910    /// It also recovers receipts_root and logs_bloom_hash in pivot chain.
1911    /// This function is only invoked from recover_graph_from_db with
1912    /// header_only being false.
1913    pub fn construct_pivot_state(
1914        &self, inner: &mut ConsensusGraphInner, meter: &ConfirmationMeter,
1915    ) {
1916        // FIXME: this line doesn't exactly match its purpose.
1917        // FIXME: Is it the checkpoint or synced snapshot or could it be
1918        // anything else?
1919        let state_boundary_height =
1920            self.data_man.state_availability_boundary.read().lower_bound;
1921        let start_pivot_index =
1922            (state_boundary_height - inner.cur_era_genesis_height) as usize;
1923        debug!(
1924            "construct_pivot_state: start={}, pivot_chain.len()={}, state_boundary_height={}",
1925            start_pivot_index,
1926            inner.pivot_chain.len(),
1927            state_boundary_height
1928        );
1929        if start_pivot_index >= inner.pivot_chain.len() {
1930            // The pivot chain of recovered blocks is before state lower_bound,
1931            // so we do not need to construct any pivot state.
1932            return;
1933        }
1934        let start_hash = inner.arena[inner.pivot_chain[start_pivot_index]].hash;
1935        // Here, we should ensure the epoch_execution_commitment for stable hash
1936        // must be loaded into memory. Since, in some rare cases, the number of
1937        // blocks between stable and best_epoch is less than
1938        // DEFERRED_STATE_EPOCH_COUNT, the for loop below will not load
1939        // epoch_execution_commitment for stable hash.
1940        if start_hash != inner.data_man.true_genesis.hash()
1941            && self
1942                .data_man
1943                .get_epoch_execution_commitment(&start_hash)
1944                .is_none()
1945        {
1946            self.data_man.load_epoch_execution_commitment_from_db(&start_hash)
1947                .expect("epoch_execution_commitment for stable hash must exist in disk");
1948        }
1949        {
1950            let mut state_availability_boundary =
1951                self.data_man.state_availability_boundary.write();
1952            assert!(
1953                state_availability_boundary.lower_bound
1954                    == state_availability_boundary.upper_bound
1955            );
1956            for pivot_index in start_pivot_index + 1..inner.pivot_chain.len() {
1957                state_availability_boundary
1958                    .pivot_chain
1959                    .push(inner.arena[inner.pivot_chain[pivot_index]].hash);
1960            }
1961        }
1962
1963        if inner.pivot_chain.len() < DEFERRED_STATE_EPOCH_COUNT as usize {
1964            return;
1965        }
1966
1967        let end_index =
1968            inner.pivot_chain.len() - DEFERRED_STATE_EPOCH_COUNT as usize + 1;
1969        for pivot_index in start_pivot_index + 1..end_index {
1970            let pivot_arena_index = inner.pivot_chain[pivot_index];
1971            let pivot_hash = inner.arena[pivot_arena_index].hash;
1972
1973            // Ensure that the commitments for the blocks on
1974            // pivot_chain after cur_era_stable_genesis are kept in memory.
1975            if self
1976                .data_man
1977                .load_epoch_execution_commitment_from_db(&pivot_hash)
1978                .is_none()
1979            {
1980                break;
1981            }
1982        }
1983
1984        // Retrieve the most recently executed epoch
1985        let mut start_compute_epoch_pivot_index =
1986            self.get_force_compute_index(inner, start_pivot_index, end_index);
1987
1988        // Retrieve the earliest non-executed epoch
1989        for pivot_index in start_pivot_index + 1..end_index {
1990            let pivot_arena_index = inner.pivot_chain[pivot_index];
1991            let pivot_hash = inner.arena[pivot_arena_index].hash;
1992
1993            if self
1994                .data_man
1995                .get_epoch_execution_commitment(&pivot_hash)
1996                .is_none()
1997            {
1998                start_compute_epoch_pivot_index =
1999                    min(pivot_index, start_compute_epoch_pivot_index);
2000                debug!(
2001                    "Start compute epoch pivot index {}, height {}",
2002                    pivot_index, inner.arena[pivot_arena_index].height
2003                );
2004                break;
2005            }
2006        }
2007
2008        let snapshot_epoch_count = inner
2009            .data_man
2010            .storage_manager
2011            .get_storage_manager()
2012            .get_snapshot_epoch_count();
2013        let mut need_set_intermediate_trie_root_merkle = false;
2014        let max_snapshot_epoch_index_has_mpt = self
2015            .recover_latest_mpt_snapshot_if_needed(
2016                inner,
2017                &mut start_compute_epoch_pivot_index,
2018                start_pivot_index,
2019                end_index,
2020                &mut need_set_intermediate_trie_root_merkle,
2021                snapshot_epoch_count as u64,
2022            );
2023        self.set_intermediate_trie_root_merkle(
2024            inner,
2025            start_compute_epoch_pivot_index,
2026            need_set_intermediate_trie_root_merkle,
2027            snapshot_epoch_count as u64,
2028        );
2029
2030        let confirmed_epoch_num = meter.get_confirmed_epoch_num();
2031        for pivot_index in start_pivot_index + 1..end_index {
2032            let pivot_arena_index = inner.pivot_chain[pivot_index];
2033            let pivot_hash = inner.arena[pivot_arena_index].hash;
2034            let height = inner.arena[pivot_arena_index].height;
2035
2036            let compute_epoch =
2037                if pivot_index >= start_compute_epoch_pivot_index {
2038                    true
2039                } else {
2040                    false
2041                };
2042
2043            if self
2044                .data_man
2045                .get_epoch_execution_commitment(&pivot_hash)
2046                .is_some()
2047            {
2048                self.data_man
2049                    .state_availability_boundary
2050                    .write()
2051                    .upper_bound += 1;
2052            }
2053
2054            info!(
2055                "construct_pivot_state: index {} height {} compute_epoch {}.",
2056                pivot_index, height, compute_epoch,
2057            );
2058
2059            if compute_epoch {
2060                let reward_execution_info = self
2061                    .executor
2062                    .get_reward_execution_info(inner, pivot_arena_index);
2063
2064                let recover_mpt_during_construct_pivot_state =
2065                    max_snapshot_epoch_index_has_mpt.map_or(true, |idx| {
2066                        pivot_index > idx + snapshot_epoch_count as usize
2067                    });
2068                info!(
2069                    "compute epoch recovery flag {}",
2070                    recover_mpt_during_construct_pivot_state
2071                );
2072                self.executor.compute_epoch(
2073                    EpochExecutionTask::new(
2074                        pivot_arena_index,
2075                        inner,
2076                        reward_execution_info,
2077                        true, /* on_local_pivot */
2078                        true, /* force_recompute */
2079                    ),
2080                    None,
2081                    recover_mpt_during_construct_pivot_state,
2082                );
2083
2084                // Remove old-pivot state during start up to save disk,
2085                // otherwise, all state will be keep till normal phase, this
2086                // will occupy too many disk
2087                {
2088                    let mut confirmed_height = min(confirmed_epoch_num, height);
2089                    if confirmed_height < DEFERRED_STATE_EPOCH_COUNT {
2090                        confirmed_height = 0;
2091                    } else {
2092                        confirmed_height -= DEFERRED_STATE_EPOCH_COUNT;
2093                    }
2094                    confirmed_height = inner
2095                        .confirmed_height_for_state_maintenance(
2096                            confirmed_height,
2097                        );
2098
2099                    self.data_man
2100                        .storage_manager
2101                        .get_storage_manager()
2102                        .maintain_state_confirmed(
2103                            inner,
2104                            inner.cur_era_stable_height,
2105                            self.conf.inner_conf.era_epoch_count,
2106                            confirmed_height,
2107                            &self.data_man.state_availability_boundary,
2108                        )
2109                        .expect(&concat!(
2110                            file!(),
2111                            ":",
2112                            line!(),
2113                            ":",
2114                            column!()
2115                        ));
2116                }
2117            }
2118        }
2119
2120        inner
2121            .data_man
2122            .storage_manager
2123            .get_storage_manager()
2124            .get_snapshot_manager()
2125            .get_snapshot_db_manager()
2126            .clean_snapshot_epoch_id_before_recovered();
2127    }
2128
2129    fn get_force_compute_index(
2130        &self, inner: &mut ConsensusGraphInner, start_pivot_index: usize,
2131        end_index: usize,
2132    ) -> usize {
2133        let mut force_compute_index = start_pivot_index + 1;
2134        let mut epoch_count = 0;
2135        for pivot_index in (start_pivot_index + 1..end_index).rev() {
2136            let pivot_arena_index = inner.pivot_chain[pivot_index];
2137            let pivot_hash = inner.arena[pivot_arena_index].hash;
2138
2139            let maybe_epoch_execution_commitment =
2140                self.data_man.get_epoch_execution_commitment(&pivot_hash);
2141            if let Some(commitment) = *maybe_epoch_execution_commitment {
2142                if self
2143                    .data_man
2144                    .storage_manager
2145                    .get_state_no_commit_inner(
2146                        StateIndex::new_for_readonly(
2147                            &pivot_hash,
2148                            &commitment.state_root_with_aux_info,
2149                        ),
2150                        /* try_open = */ false,
2151                        false,
2152                    )
2153                    .expect("DB Error")
2154                    .is_some()
2155                {
2156                    epoch_count += 1;
2157
2158                    // force to recompute last 5 epochs in case state database
2159                    // is not ready in last shutdown
2160                    if epoch_count > DEFERRED_STATE_EPOCH_COUNT {
2161                        let reward_execution_info =
2162                            self.executor.get_reward_execution_info(
2163                                inner,
2164                                pivot_arena_index,
2165                            );
2166
2167                        let pivot_block_height = self
2168                            .data_man
2169                            .block_header_by_hash(&pivot_hash)
2170                            .expect("must exists")
2171                            .height();
2172
2173                        // ensure current epoch is new executed whether there
2174                        // is a fork or not
2175                        if self.executor.epoch_executed_and_recovered(
2176                            &pivot_hash,
2177                            &inner.get_epoch_block_hashes(pivot_arena_index),
2178                            true,
2179                            &reward_execution_info,
2180                            pivot_block_height,
2181                        ) {
2182                            force_compute_index = pivot_index + 1;
2183                            debug!(
2184                                "Force compute start index {}",
2185                                force_compute_index
2186                            );
2187                            break;
2188                        }
2189                    }
2190                } else {
2191                    epoch_count = 0;
2192                }
2193            }
2194        }
2195
2196        if let Some(height) = self
2197            .conf
2198            .inner_conf
2199            .force_recompute_height_during_construct_pivot
2200        {
2201            if height > inner.cur_era_stable_height {
2202                let pivot_idx = inner.height_to_pivot_index(height);
2203                debug!(
2204                    "force recompute height during constructing pivot {}",
2205                    pivot_idx
2206                );
2207                force_compute_index = min(force_compute_index, pivot_idx);
2208            }
2209        }
2210
2211        force_compute_index
2212    }
2213
2214    fn recover_latest_mpt_snapshot_if_needed(
2215        &self, inner: &mut ConsensusGraphInner,
2216        start_compute_epoch_pivot_index: &mut usize, start_pivot_index: usize,
2217        end_index: usize, need_set_intermediate_trie_root_merkle: &mut bool,
2218        snapshot_epoch_count: u64,
2219    ) -> Option<usize> {
2220        if !self.conf.inner_conf.use_isolated_db_for_mpt_table {
2221            return Some(end_index);
2222        }
2223
2224        let (
2225            temp_snapshot_db_existing,
2226            removed_snapshots,
2227            latest_snapshot_epoch_height,
2228            max_snapshot_epoch_height_has_mpt,
2229        ) = if let Some((
2230            temp_snapshot_db_existing,
2231            removed_snapshots,
2232            latest_snapshot_epoch_height,
2233            max_snapshot_epoch_height_has_mpt,
2234        )) = inner
2235            .data_man
2236            .storage_manager
2237            .get_storage_manager()
2238            .persist_state_from_initialization
2239            .write()
2240            .take()
2241        {
2242            (
2243                temp_snapshot_db_existing,
2244                removed_snapshots,
2245                max(latest_snapshot_epoch_height, inner.cur_era_stable_height),
2246                max_snapshot_epoch_height_has_mpt,
2247            )
2248        } else {
2249            (None, HashSet::new(), inner.cur_era_stable_height, None)
2250        };
2251
2252        debug!("latest snapshot epoch height: {}, temp snapshot status: {:?}, max snapshot epoch height has mpt: {:?}, removed snapshots {:?}",
2253            latest_snapshot_epoch_height, temp_snapshot_db_existing, max_snapshot_epoch_height_has_mpt, removed_snapshots);
2254
2255        if removed_snapshots.len() == 1
2256            && removed_snapshots.contains(&NULL_EPOCH)
2257        {
2258            debug!("special case for synced snapshot");
2259            return Some(end_index);
2260        }
2261
2262        if max_snapshot_epoch_height_has_mpt
2263            .is_some_and(|h| h == latest_snapshot_epoch_height)
2264        {
2265            inner
2266                .data_man
2267                .storage_manager
2268                .get_storage_manager()
2269                .get_snapshot_manager()
2270                .get_snapshot_db_manager()
2271                .recreate_latest_mpt_snapshot()
2272                .unwrap();
2273
2274            info!(
2275                "snapshot for epoch height {} is still not use mpt database",
2276                start_compute_epoch_pivot_index
2277            );
2278            return Some(end_index);
2279        }
2280
2281        // maximum epoch need to compute
2282        let maximum_height_to_create_next_snapshot =
2283            latest_snapshot_epoch_height + snapshot_epoch_count * 2;
2284        let index =
2285            inner.height_to_pivot_index(maximum_height_to_create_next_snapshot);
2286        if *start_compute_epoch_pivot_index > index {
2287            warn!("start_compute_epoch_pivot_index is greater than maximum epoch need to compute {}", index);
2288            *start_compute_epoch_pivot_index = index;
2289        }
2290
2291        // Find the closest ear prior to the start_compute_epoch_height
2292        let start_compute_epoch_height = inner.arena
2293            [inner.pivot_chain[*start_compute_epoch_pivot_index]]
2294            .height;
2295        info!(
2296            "current start compute epoch height {}",
2297            start_compute_epoch_height
2298        );
2299
2300        let recovery_latest_mpt_snapshot =
2301            if self.conf.inner_conf.recovery_latest_mpt_snapshot
2302                || start_compute_epoch_height <= latest_snapshot_epoch_height
2303                || (temp_snapshot_db_existing.is_some()
2304                    && latest_snapshot_epoch_height
2305                        < start_compute_epoch_height
2306                    && start_compute_epoch_height
2307                        <= latest_snapshot_epoch_height + snapshot_epoch_count)
2308            {
2309                true
2310            } else {
2311                let mut max_epoch_height = 0;
2312                for pivot_index in (start_pivot_index..end_index)
2313                    .step_by(snapshot_epoch_count as usize)
2314                {
2315                    let pivot_arena_index = inner.pivot_chain[pivot_index];
2316                    let pivot_hash = inner.arena[pivot_arena_index].hash;
2317
2318                    debug!(
2319                        "snapshot pivot_index {} height {} ",
2320                        pivot_index, inner.arena[pivot_arena_index].height
2321                    );
2322
2323                    if removed_snapshots.contains(&pivot_hash) {
2324                        max_epoch_height = max(
2325                            max_epoch_height,
2326                            inner.arena[pivot_arena_index].height,
2327                        );
2328                    }
2329                }
2330
2331                // snapshots after latest_snapshot_epoch_height is removed
2332                latest_snapshot_epoch_height < max_epoch_height
2333            };
2334
2335        // if the latest_snapshot_epoch_height is greater than
2336        // start_compute_epoch_height, the latest MPT snapshot is dirty
2337        if recovery_latest_mpt_snapshot {
2338            let era_pivot_epoch_height = if start_compute_epoch_height
2339                <= inner.cur_era_stable_height + snapshot_epoch_count
2340            {
2341                debug!("snapshot for cur_era_stable_height must be exist");
2342                inner.cur_era_stable_height
2343            } else {
2344                (start_compute_epoch_height - snapshot_epoch_count - 1)
2345                    / self.conf.inner_conf.era_epoch_count
2346                    * self.conf.inner_conf.era_epoch_count
2347            };
2348
2349            if era_pivot_epoch_height > latest_snapshot_epoch_height {
2350                panic!("era_pivot_epoch_height is greater than latest_snapshot_epoch_height, this should not happen");
2351            }
2352
2353            debug!(
2354                "need recovery latest mpt snapshot, start compute epoch height {}, era pivot epoch height {}",
2355                start_compute_epoch_height, era_pivot_epoch_height
2356            );
2357
2358            if start_compute_epoch_height <= era_pivot_epoch_height {
2359                unreachable!("start_compute_epoch_height {} is smaller than era_pivot_epoch_height {}", start_compute_epoch_height, era_pivot_epoch_height);
2360            } else if start_compute_epoch_height
2361                <= era_pivot_epoch_height + snapshot_epoch_count
2362            {
2363                if start_compute_epoch_height % snapshot_epoch_count == 1 {
2364                    *need_set_intermediate_trie_root_merkle = true;
2365                }
2366            } else if start_compute_epoch_height
2367                <= era_pivot_epoch_height + snapshot_epoch_count * 2
2368            {
2369                // nothing need to do
2370            } else {
2371                let new_height =
2372                    era_pivot_epoch_height + snapshot_epoch_count * 2;
2373                let new_index = inner.height_to_pivot_index(new_height);
2374
2375                info!("reset start_compute_epoch_pivot_index to {}", new_index);
2376                *start_compute_epoch_pivot_index = new_index;
2377            }
2378
2379            let era_pivot_hash = if era_pivot_epoch_height == 0 {
2380                NULL_EPOCH
2381            } else {
2382                inner
2383                    .get_pivot_hash_from_epoch_number(era_pivot_epoch_height)
2384                    .expect("pivot hash should be exist")
2385            };
2386
2387            let snapshot_db_manager = inner
2388                .data_man
2389                .storage_manager
2390                .get_storage_manager()
2391                .get_snapshot_manager()
2392                .get_snapshot_db_manager();
2393
2394            snapshot_db_manager.update_latest_snapshot_id(
2395                era_pivot_hash.clone(),
2396                era_pivot_epoch_height,
2397            );
2398
2399            if max_snapshot_epoch_height_has_mpt
2400                .is_some_and(|height| height >= era_pivot_epoch_height)
2401            {
2402                // mpt snapshot will be created from empty
2403                snapshot_db_manager.recreate_latest_mpt_snapshot().unwrap();
2404            } else {
2405                let pivot_hash_before_era = if era_pivot_epoch_height == 0 {
2406                    None
2407                } else {
2408                    Some(
2409                        inner
2410                            .get_pivot_hash_from_epoch_number(
2411                                era_pivot_epoch_height - snapshot_epoch_count,
2412                            )
2413                            .expect("pivot hash should be exist"),
2414                    )
2415                };
2416
2417                // use ear snapshot replace latest
2418                snapshot_db_manager
2419                    .recovery_latest_mpt_snapshot_from_checkpoint(
2420                        &era_pivot_hash,
2421                        pivot_hash_before_era,
2422                    )
2423                    .unwrap();
2424            }
2425
2426            max_snapshot_epoch_height_has_mpt.and_then(|v| {
2427                if v >= inner.cur_era_stable_height {
2428                    Some(inner.height_to_pivot_index(v))
2429                } else {
2430                    None
2431                }
2432            })
2433        } else {
2434            if temp_snapshot_db_existing.is_some()
2435                && latest_snapshot_epoch_height + snapshot_epoch_count
2436                    < start_compute_epoch_height
2437                && start_compute_epoch_height
2438                    <= latest_snapshot_epoch_height + 2 * snapshot_epoch_count
2439            {
2440                inner
2441                    .data_man
2442                    .storage_manager
2443                    .get_storage_manager()
2444                    .get_snapshot_manager()
2445                    .get_snapshot_db_manager()
2446                    .set_reconstruct_snapshot_id(temp_snapshot_db_existing);
2447            }
2448
2449            debug!("the latest MPT snapshot is valid");
2450            Some(end_index)
2451        }
2452    }
2453
2454    fn set_intermediate_trie_root_merkle(
2455        &self, inner: &mut ConsensusGraphInner,
2456        start_compute_epoch_pivot_index: usize,
2457        need_set_intermediate_trie_root_merkle: bool,
2458        snapshot_epoch_count: u64,
2459    ) {
2460        let storage_manager =
2461            inner.data_man.storage_manager.get_storage_manager();
2462        if !storage_manager
2463            .storage_conf
2464            .keep_snapshot_before_stable_checkpoint
2465            || need_set_intermediate_trie_root_merkle
2466        {
2467            let pivot_arena_index =
2468                inner.pivot_chain[start_compute_epoch_pivot_index - 1];
2469            let pivot_hash = inner.arena[pivot_arena_index].hash;
2470            let height = inner.arena[pivot_arena_index].height + 1;
2471
2472            let intermediate_trie_root_merkle = match *self
2473                .data_man
2474                .get_epoch_execution_commitment(&pivot_hash)
2475            {
2476                Some(commitment) => {
2477                    if height % snapshot_epoch_count == 1 {
2478                        commitment
2479                            .state_root_with_aux_info
2480                            .state_root
2481                            .delta_root
2482                    } else {
2483                        commitment
2484                            .state_root_with_aux_info
2485                            .state_root
2486                            .intermediate_delta_root
2487                    }
2488                }
2489                None => MERKLE_NULL_NODE,
2490            };
2491
2492            debug!("previous pivot hash {:?} intermediate trie root merkle for next pivot {:?}", pivot_hash, intermediate_trie_root_merkle);
2493            *storage_manager.intermediate_trie_root_merkle.write() =
2494                Some(intermediate_trie_root_merkle);
2495        }
2496    }
2497
2498    fn set_block_tx_packed(&self, inner: &ConsensusGraphInner, me: usize) {
2499        if !self.txpool.ready_for_mining() {
2500            // Skip tx pool operation before catching up.
2501            return;
2502        }
2503        let parent = inner.arena[me].parent;
2504        if parent == NULL {
2505            warn!(
2506                "set_block_tx_packed skips block with empty parent {:?}",
2507                inner.arena[me].hash
2508            );
2509            return;
2510        }
2511        let era_genesis_height =
2512            inner.get_era_genesis_height(inner.arena[parent].height);
2513        let cur_pivot_era_block = if inner
2514            .pivot_index_to_height(inner.pivot_chain.len())
2515            > era_genesis_height
2516        {
2517            inner.get_pivot_block_arena_index(era_genesis_height)
2518        } else {
2519            NULL
2520        };
2521        let era_block = inner.get_era_genesis_block_with_parent(parent);
2522
2523        // It's only correct to set tx stale after the block is considered
2524        // terminal for mining.
2525        // Note that we conservatively only mark those blocks inside the
2526        // current pivot era
2527        if era_block == cur_pivot_era_block {
2528            self.txpool.set_tx_packed(
2529                &self
2530                    .data_man
2531                    .block_by_hash(
2532                        &inner.arena[me].hash,
2533                        true, /* update_cache */
2534                    )
2535                    .expect("Already checked")
2536                    .transactions,
2537            );
2538        } else {
2539            warn!("set_block_tx_packed skips block {:?}", inner.arena[me].hash);
2540        }
2541    }
2542
2543    fn delayed_tx_recycle_in_skipped_blocks(
2544        &self, inner: &mut ConsensusGraphInner, fork_height: u64,
2545    ) {
2546        if !self.txpool.ready_for_mining() {
2547            // Skip tx pool operation before catching up.
2548            return;
2549        }
2550        if inner.pivot_chain.len() > RECYCLE_TRANSACTION_DELAY as usize {
2551            let recycle_end_pivot_index = inner.pivot_chain.len()
2552                - RECYCLE_TRANSACTION_DELAY as usize
2553                - 1;
2554            // If the pivot reorg is deeper than `RECYCLE_TRANSACTION_DELAY`, we
2555            // will try to recycle all skipped blocks since the
2556            // forking point.
2557            let start = min(
2558                // `fork_height` has been capped by the caller.
2559                inner.height_to_pivot_index(fork_height),
2560                recycle_end_pivot_index,
2561            );
2562            for recycle_pivot_index in start..=recycle_end_pivot_index {
2563                let recycle_arena_index =
2564                    inner.pivot_chain[recycle_pivot_index];
2565                let skipped_blocks = inner
2566                    .get_or_compute_skipped_epoch_blocks(recycle_arena_index)
2567                    .clone();
2568                for h in &skipped_blocks {
2569                    self.recycle_tx_in_block(inner, h);
2570                }
2571            }
2572        }
2573    }
2574}