cfxcore/consensus/consensus_inner/
consensus_new_block_handler.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use super::blame_verifier::BlameVerifier;
6use crate::{
7    block_data_manager::{BlockDataManager, BlockStatus, LocalBlockInfo},
8    channel::Channel,
9    consensus::{
10        consensus_inner::{
11            confirmation_meter::ConfirmationMeter,
12            consensus_executor::{ConsensusExecutor, EpochExecutionTask},
13            ConsensusGraphInner, NULL,
14        },
15        pivot_hint::PivotHint,
16        pos_handler::PosVerifier,
17        ConsensusConfig,
18    },
19    state_exposer::{ConsensusGraphBlockState, STATE_EXPOSER},
20    statistics::SharedStatistics,
21    NodeType, Notifications, SharedTransactionPool,
22};
23use cfx_parameters::{consensus::*, consensus_internal::*};
24use cfx_storage::{storage_db::SnapshotDbManagerTrait, StateIndex};
25use cfx_types::H256;
26use hibitset::{BitSet, BitSetLike, DrainableBitSet};
27use parking_lot::Mutex;
28use primitives::{MERKLE_NULL_NODE, NULL_EPOCH};
29use std::{
30    cmp::{max, min},
31    collections::{BinaryHeap, HashMap, HashSet, VecDeque},
32    slice::Iter,
33    sync::Arc,
34};
35
36pub struct ConsensusNewBlockHandler {
37    conf: ConsensusConfig,
38    txpool: SharedTransactionPool,
39    data_man: Arc<BlockDataManager>,
40    executor: Arc<ConsensusExecutor>,
41    pos_verifier: Arc<PosVerifier>,
42    statistics: SharedStatistics,
43
44    /// Channel used to send epochs to PubSub
45    /// Each element is <epoch_number, epoch_hashes>
46    epochs_sender: Arc<Channel<(u64, Vec<H256>)>>,
47
48    /// API used for verifying blaming on light nodes.
49    blame_verifier: Mutex<BlameVerifier>,
50
51    /// The type of this node: Archive, Full, or Light.
52    node_type: NodeType,
53
54    pivot_hint: Option<Arc<PivotHint>>,
55}
56
57/// ConsensusNewBlockHandler contains all sub-routines for handling new arriving
58/// blocks from network or db. It manipulates and updates ConsensusGraphInner
59/// object accordingly.
60impl ConsensusNewBlockHandler {
61    pub fn new(
62        conf: ConsensusConfig, txpool: SharedTransactionPool,
63        data_man: Arc<BlockDataManager>, executor: Arc<ConsensusExecutor>,
64        statistics: SharedStatistics, notifications: Arc<Notifications>,
65        node_type: NodeType, pos_verifier: Arc<PosVerifier>,
66        pivot_hint: Option<Arc<PivotHint>>,
67    ) -> Self {
68        let epochs_sender = notifications.epochs_ordered.clone();
69        let blame_verifier =
70            Mutex::new(BlameVerifier::new(data_man.clone(), notifications));
71
72        Self {
73            pos_verifier,
74            conf,
75            txpool,
76            data_man,
77            executor,
78            statistics,
79            epochs_sender,
80            blame_verifier,
81            node_type,
82            pivot_hint,
83        }
84    }
85
86    /// Return (old_era_block_set, new_era_block_set).
87    /// `old_era_block_set` includes the blocks in the past of
88    /// `new_era_block_arena_index`. `new_era_block_set` includes all other
89    /// blocks (the anticone and the future).
90    fn compute_old_era_and_new_era_block_set(
91        inner: &mut ConsensusGraphInner, new_era_block_arena_index: usize,
92    ) -> (HashSet<usize>, HashSet<usize>) {
93        // We first compute the set of blocks inside the new era and we
94        // recompute the past_weight inside the stable height.
95        let mut old_era_block_arena_index_set = HashSet::new();
96        let mut queue = VecDeque::new();
97        queue.push_back(new_era_block_arena_index);
98        while let Some(x) = queue.pop_front() {
99            if inner.arena[x].parent != NULL
100                && !old_era_block_arena_index_set
101                    .contains(&inner.arena[x].parent)
102            {
103                old_era_block_arena_index_set.insert(inner.arena[x].parent);
104                queue.push_back(inner.arena[x].parent);
105            }
106            for referee in &inner.arena[x].referees {
107                if *referee != NULL
108                    && !old_era_block_arena_index_set.contains(referee)
109                {
110                    old_era_block_arena_index_set.insert(*referee);
111                    queue.push_back(*referee);
112                }
113            }
114        }
115        let mut new_era_block_arena_index_set = HashSet::new();
116        for (i, _) in &inner.arena {
117            if !old_era_block_arena_index_set.contains(&i) {
118                new_era_block_arena_index_set.insert(i);
119            }
120        }
121        (old_era_block_arena_index_set, new_era_block_arena_index_set)
122    }
123
124    /// Note that there is an important assumption: the timer chain must have no
125    /// block in the anticone of new_era_block_arena_index. If this is not
126    /// true, it cannot become a checkpoint block
127    fn make_checkpoint_at(
128        inner: &mut ConsensusGraphInner, new_era_block_arena_index: usize,
129    ) {
130        let new_era_height = inner.arena[new_era_block_arena_index].height;
131        let (outside_block_arena_indices, new_era_block_arena_index_set) =
132            Self::compute_old_era_and_new_era_block_set(
133                inner,
134                new_era_block_arena_index,
135            );
136
137        // This is the arena indices for legacy blocks.
138        let mut new_era_genesis_subtree = HashSet::new();
139        let mut queue = VecDeque::new();
140        queue.push_back(new_era_block_arena_index);
141        while let Some(x) = queue.pop_front() {
142            new_era_genesis_subtree.insert(x);
143            for child in &inner.arena[x].children {
144                queue.push_back(*child);
145            }
146        }
147        let new_era_legacy_block_arena_index_set: HashSet<_> =
148            new_era_block_arena_index_set
149                .difference(&new_era_genesis_subtree)
150                .collect();
151
152        // Next we are going to recompute all referee and referrer information
153        // in arena
154        let new_era_pivot_index = inner.height_to_pivot_index(new_era_height);
155        for v in new_era_block_arena_index_set.iter() {
156            let me = *v;
157            // It is necessary to process `referees` and
158            // `blockset_in_own_view_of_epoch` because
159            // `new_era_block_arena_index_set` include the blocks in
160            // the anticone of the new era genesis.
161            inner.arena[me]
162                .referees
163                .retain(|v| new_era_block_arena_index_set.contains(v));
164            inner.arena[me]
165                .data
166                .blockset_in_own_view_of_epoch
167                .retain(|v| new_era_block_arena_index_set.contains(v));
168            if !new_era_block_arena_index_set.contains(
169                &inner.arena[me].data.past_view_last_timer_block_arena_index,
170            ) {
171                inner.arena[me].data.past_view_last_timer_block_arena_index =
172                    NULL;
173            }
174            if !new_era_block_arena_index_set
175                .contains(&inner.arena[me].data.force_confirm)
176            {
177                inner.arena[me].data.force_confirm = new_era_block_arena_index;
178            }
179        }
180        // reassign the parent for outside era blocks
181        for v in new_era_legacy_block_arena_index_set {
182            let me = *v;
183            let mut parent = inner.arena[me].parent;
184            if inner.arena[me].era_block != NULL {
185                inner.split_root(me);
186            }
187            if !new_era_block_arena_index_set.contains(&parent) {
188                parent = NULL;
189            }
190            inner.arena[me].parent = parent;
191            inner.arena[me].era_block = NULL;
192            inner.terminal_hashes.remove(&inner.arena[me].hash);
193        }
194        // Now we are ready to cleanup outside blocks in inner data structures
195        inner
196            .pastset_cache
197            .intersect_update(&outside_block_arena_indices);
198        for index in outside_block_arena_indices {
199            let hash = inner.arena[index].hash;
200            inner.hash_to_arena_indices.remove(&hash);
201            inner.terminal_hashes.remove(&hash);
202            inner.arena.remove(index);
203            // remove useless data in BlockDataManager
204            inner.data_man.remove_epoch_execution_commitment(&hash);
205            inner.data_man.remove_epoch_execution_context(&hash);
206        }
207
208        // Now we truncate the timer chain that are outside the genesis.
209        let mut timer_chain_truncate = 0;
210        while timer_chain_truncate < inner.timer_chain.len()
211            && !new_era_block_arena_index_set
212                .contains(&inner.timer_chain[timer_chain_truncate])
213        {
214            timer_chain_truncate += 1;
215        }
216        inner.cur_era_genesis_timer_chain_height += timer_chain_truncate as u64;
217        assert_eq!(
218            inner.cur_era_genesis_timer_chain_height,
219            inner.arena[new_era_block_arena_index]
220                .data
221                .ledger_view_timer_chain_height
222        );
223        for i in 0..(inner.timer_chain.len() - timer_chain_truncate) {
224            inner.timer_chain[i] = inner.timer_chain[i + timer_chain_truncate];
225            if i + timer_chain_truncate
226                < inner.timer_chain_accumulative_lca.len()
227            {
228                inner.timer_chain_accumulative_lca[i] = inner
229                    .timer_chain_accumulative_lca[i + timer_chain_truncate];
230            }
231        }
232        inner
233            .timer_chain
234            .resize(inner.timer_chain.len() - timer_chain_truncate, 0);
235        if inner.timer_chain_accumulative_lca.len() > timer_chain_truncate {
236            inner.timer_chain_accumulative_lca.resize(
237                inner.timer_chain_accumulative_lca.len() - timer_chain_truncate,
238                0,
239            );
240        } else {
241            inner.timer_chain_accumulative_lca.clear();
242        }
243        // Move LCA to new genesis if necessary!
244        for i in 0..inner.timer_chain_accumulative_lca.len() {
245            if i < inner.inner_conf.timer_chain_beta as usize - 1
246                || !new_era_genesis_subtree
247                    .contains(&inner.timer_chain_accumulative_lca[i])
248            {
249                inner.timer_chain_accumulative_lca[i] =
250                    new_era_block_arena_index;
251            }
252        }
253
254        assert!(new_era_pivot_index < inner.pivot_chain.len());
255        inner.pivot_chain = inner.pivot_chain.split_off(new_era_pivot_index);
256        inner.pivot_chain_metadata =
257            inner.pivot_chain_metadata.split_off(new_era_pivot_index);
258        // Recompute past weight values
259        inner.pivot_chain_metadata[0].past_weight =
260            inner.block_weight(new_era_block_arena_index);
261        for i in 1..inner.pivot_chain_metadata.len() {
262            let pivot = inner.pivot_chain[i];
263            inner.pivot_chain_metadata[i].past_weight =
264                inner.pivot_chain_metadata[i - 1].past_weight
265                    + inner.total_weight_in_own_epoch(
266                        &inner.arena[pivot].data.blockset_in_own_view_of_epoch,
267                        new_era_block_arena_index,
268                    )
269                    + inner.block_weight(pivot)
270        }
271        for d in inner.pivot_chain_metadata.iter_mut() {
272            d.last_pivot_in_past_blocks
273                .retain(|v| new_era_block_arena_index_set.contains(v));
274        }
275        inner
276            .anticone_cache
277            .intersect_update(&new_era_genesis_subtree);
278
279        // Clear best_terminals_lca_caches
280        inner.best_terminals_lca_height_cache.clear();
281
282        // Clear has_timer_block_in_anticone cache
283        inner.has_timer_block_in_anticone_cache.clear();
284
285        // Chop off all link-cut-trees in the inner data structure
286        inner.split_root(new_era_block_arena_index);
287
288        inner.cur_era_genesis_block_arena_index = new_era_block_arena_index;
289        inner.cur_era_genesis_height = new_era_height;
290
291        let cur_era_hash = inner.arena[new_era_block_arena_index].hash.clone();
292        let stable_era_arena_index =
293            inner.get_pivot_block_arena_index(inner.cur_era_stable_height);
294        let stable_era_hash = inner.arena[stable_era_arena_index].hash.clone();
295
296        // This must be true given our checkpoint rule!
297        for (_, x) in &inner.invalid_block_queue {
298            assert!(new_era_block_arena_index_set.contains(x))
299        }
300
301        inner.data_man.set_cur_consensus_era_genesis_hash(
302            &cur_era_hash,
303            &stable_era_hash,
304        );
305        inner
306            .data_man
307            .new_checkpoint(new_era_height, inner.best_epoch_number());
308    }
309
310    pub fn compute_anticone_bruteforce(
311        inner: &ConsensusGraphInner, me: usize,
312    ) -> BitSet {
313        let parent = inner.arena[me].parent;
314        if parent == NULL {
315            // This is genesis, so the anticone should be empty
316            return BitSet::new();
317        }
318        let mut last_in_pivot = inner.arena[parent].data.last_pivot_in_past;
319        for referee in &inner.arena[me].referees {
320            last_in_pivot = max(
321                last_in_pivot,
322                inner.arena[*referee].data.last_pivot_in_past,
323            );
324        }
325        let mut visited = BitSet::new();
326        let mut queue = VecDeque::new();
327        queue.push_back(me);
328        visited.add(me as u32);
329        while let Some(index) = queue.pop_front() {
330            let parent = inner.arena[index].parent;
331            if parent != NULL
332                && inner.arena[parent].data.epoch_number > last_in_pivot
333                && !visited.contains(parent as u32)
334            {
335                visited.add(parent as u32);
336                queue.push_back(parent);
337            }
338            for referee in &inner.arena[index].referees {
339                if inner.arena[*referee].data.epoch_number > last_in_pivot
340                    && !visited.contains(*referee as u32)
341                {
342                    visited.add(*referee as u32);
343                    queue.push_back(*referee);
344                }
345            }
346        }
347        // Now we traverse all future of me, when adding new block, this is
348        // empty
349        queue.clear();
350        queue.push_back(me);
351        while let Some(index) = queue.pop_front() {
352            for child in &inner.arena[index].children {
353                if !visited.contains(*child as u32) {
354                    visited.add(*child as u32);
355                    queue.push_back(*child);
356                }
357            }
358            for referrer in &inner.arena[index].referrers {
359                if !visited.contains(*referrer as u32) {
360                    visited.add(*referrer as u32);
361                    queue.push_back(*referrer);
362                }
363            }
364        }
365
366        let mut anticone = BitSet::with_capacity(inner.arena.capacity() as u32);
367        for (i, node) in inner.arena.iter() {
368            if node.data.epoch_number > last_in_pivot
369                && !visited.contains(i as u32)
370                && (node.data.activated || node.data.inactive_dependency_cnt == NULL) /* We include only preactivated blocks */
371                && node.era_block != NULL
372            /* We exclude out-of-era blocks */
373            {
374                anticone.add(i as u32);
375            }
376        }
377        anticone
378    }
379
380    pub fn compute_anticone_hashset_bruteforce(
381        inner: &ConsensusGraphInner, me: usize,
382    ) -> HashSet<usize> {
383        let s =
384            ConsensusNewBlockHandler::compute_anticone_bruteforce(inner, me);
385        let mut ret = HashSet::new();
386        for index in s.iter() {
387            ret.insert(index as usize);
388        }
389        ret
390    }
391
392    /// Note that this function is not a pure computation function. It has the
393    /// sideeffect of updating all existing anticone set in the anticone
394    /// cache
395    fn compute_and_update_anticone(
396        inner: &mut ConsensusGraphInner, me: usize,
397    ) -> (BitSet, BitSet) {
398        let parent = inner.arena[me].parent;
399
400        // If we do not have the anticone of its parent, we compute it with
401        // brute force!
402        let parent_anticone_opt = inner.anticone_cache.get(parent);
403        let mut anticone;
404        if parent_anticone_opt.is_none() {
405            anticone = ConsensusNewBlockHandler::compute_anticone_bruteforce(
406                inner, me,
407            );
408        } else {
409            // anticone = parent_anticone + parent_future - my_past
410            // Compute future set of parent
411            anticone = inner.compute_future_bitset(parent);
412            anticone.remove(me as u32);
413
414            for index in parent_anticone_opt.unwrap() {
415                anticone.add(*index as u32);
416            }
417            let mut my_past = BitSet::new();
418            let mut queue: VecDeque<usize> = VecDeque::new();
419            queue.push_back(me);
420            while let Some(index) = queue.pop_front() {
421                if my_past.contains(index as u32) {
422                    continue;
423                }
424
425                debug_assert!(index != parent);
426                if index != me {
427                    my_past.add(index as u32);
428                }
429
430                let idx_parent = inner.arena[index].parent;
431                if idx_parent != NULL {
432                    if anticone.contains(idx_parent as u32)
433                        || inner.arena[idx_parent].era_block == NULL
434                    {
435                        queue.push_back(idx_parent);
436                    }
437                }
438
439                for referee in &inner.arena[index].referees {
440                    if anticone.contains(*referee as u32)
441                        || inner.arena[*referee].era_block == NULL
442                    {
443                        queue.push_back(*referee);
444                    }
445                }
446            }
447            for index in my_past.drain() {
448                anticone.remove(index);
449            }
450
451            // We only consider non-lagacy blocks when computing anticone.
452            for index in anticone.clone().iter() {
453                if inner.arena[index as usize].era_block == NULL {
454                    anticone.remove(index);
455                }
456            }
457        }
458
459        inner.anticone_cache.update(me, &anticone);
460
461        let mut anticone_barrier = BitSet::new();
462        for index in anticone.clone().iter() {
463            let parent = inner.arena[index as usize].parent as u32;
464            if !anticone.contains(parent) {
465                anticone_barrier.add(index);
466            }
467        }
468
469        debug!(
470            "Block {} anticone size {}",
471            inner.arena[me].hash,
472            anticone.len()
473        );
474
475        (anticone, anticone_barrier)
476    }
477
478    fn check_correct_parent_brutal(
479        inner: &ConsensusGraphInner, me: usize, subtree_weight: &Vec<i128>,
480        checking_candidate: Iter<usize>,
481    ) -> bool {
482        let mut valid = true;
483        let parent = inner.arena[me].parent;
484        let force_confirm = inner.arena[me].data.force_confirm;
485        let force_confirm_height = inner.arena[force_confirm].height;
486
487        // Check the pivot selection decision.
488        for consensus_arena_index_in_epoch in checking_candidate {
489            let lca = inner.lca(*consensus_arena_index_in_epoch, parent);
490            assert!(lca != *consensus_arena_index_in_epoch);
491            // If it is outside current era, we will skip!
492            if lca == NULL || inner.arena[lca].height < force_confirm_height {
493                continue;
494            }
495            if lca == parent {
496                valid = false;
497                break;
498            }
499
500            let fork = inner.ancestor_at(
501                *consensus_arena_index_in_epoch,
502                inner.arena[lca].height + 1,
503            );
504            let pivot = inner.ancestor_at(parent, inner.arena[lca].height + 1);
505
506            let fork_subtree_weight = subtree_weight[fork];
507            let pivot_subtree_weight = subtree_weight[pivot];
508
509            if ConsensusGraphInner::is_heavier(
510                (fork_subtree_weight, &inner.arena[fork].hash),
511                (pivot_subtree_weight, &inner.arena[pivot].hash),
512            ) {
513                valid = false;
514                break;
515            }
516        }
517
518        valid
519    }
520
521    fn check_correct_parent(
522        inner: &mut ConsensusGraphInner, me: usize, anticone_barrier: &BitSet,
523        weight_tuple: Option<&Vec<i128>>,
524    ) -> bool {
525        let parent = inner.arena[me].parent;
526        // FIXME: Because now we allow partial invalid blocks as parent, we need
527        // to consider more for block candidates. This may cause a
528        // performance issue and we should consider another optimized strategy.
529        let mut candidate;
530        let blockset =
531            inner.exchange_or_compute_blockset_in_own_view_of_epoch(me, None);
532        // Note that here we have to be conservative. If it is pending we have
533        // to treat it as if it is partial invalid.
534        let candidate_iter = if inner.arena[parent].data.partial_invalid
535            || inner.arena[parent].data.pending
536        {
537            candidate = blockset.clone();
538            let mut p = parent;
539            while p != NULL && inner.arena[p].data.partial_invalid
540                || inner.arena[p].data.pending
541            {
542                let blockset_p = inner
543                    .exchange_or_compute_blockset_in_own_view_of_epoch(p, None);
544                candidate.extend(blockset_p.iter());
545                inner.exchange_or_compute_blockset_in_own_view_of_epoch(
546                    p,
547                    Some(blockset_p),
548                );
549                p = inner.arena[p].parent;
550            }
551            candidate.iter()
552        } else {
553            blockset.iter()
554        };
555
556        if let Some(subtree_weight) = weight_tuple {
557            let res = ConsensusNewBlockHandler::check_correct_parent_brutal(
558                inner,
559                me,
560                subtree_weight,
561                candidate_iter,
562            );
563            // We have to put but the blockset here! Otherwise the
564            // blockset_in_own_view_of_epoch will be corrupted.
565            inner.exchange_or_compute_blockset_in_own_view_of_epoch(
566                me,
567                Some(blockset),
568            );
569            return res;
570        }
571        let mut valid = true;
572        let force_confirm = inner.arena[me].data.force_confirm;
573        let force_confirm_height = inner.arena[force_confirm].height;
574
575        let mut weight_delta = HashMap::new();
576
577        for index in anticone_barrier {
578            let delta = inner.weight_tree.get(index as usize);
579            weight_delta.insert(index as usize, delta);
580        }
581
582        // Remove weight contribution of anticone
583        for (index, delta) in &weight_delta {
584            inner.weight_tree.path_apply(*index, -delta);
585        }
586
587        // Check the pivot selection decision.
588        for consensus_arena_index_in_epoch in candidate_iter {
589            let lca = inner.lca(*consensus_arena_index_in_epoch, parent);
590            assert!(lca != *consensus_arena_index_in_epoch);
591            // If it is outside the era, we will skip!
592            if lca == NULL || inner.arena[lca].height < force_confirm_height {
593                continue;
594            }
595            if lca == parent {
596                debug!("Block invalid (index = {}), referenced block {} index {} is in the subtree of parent block {} index {}!", me, inner.arena[*consensus_arena_index_in_epoch].hash, *consensus_arena_index_in_epoch, inner.arena[parent].hash, parent);
597                valid = false;
598                break;
599            }
600
601            let fork = inner.ancestor_at(
602                *consensus_arena_index_in_epoch,
603                inner.arena[lca].height + 1,
604            );
605            let pivot = inner.ancestor_at(parent, inner.arena[lca].height + 1);
606
607            let fork_subtree_weight = inner.weight_tree.get(fork);
608            let pivot_subtree_weight = inner.weight_tree.get(pivot);
609
610            if ConsensusGraphInner::is_heavier(
611                (fork_subtree_weight, &inner.arena[fork].hash),
612                (pivot_subtree_weight, &inner.arena[pivot].hash),
613            ) {
614                debug!("Block invalid (index = {}), referenced block {} index {} fork is heavier than the parent block {} index {} fork! Ref fork block {} weight {}, parent fork block {} weight {}!",
615                       me, inner.arena[*consensus_arena_index_in_epoch].hash, *consensus_arena_index_in_epoch, inner.arena[parent].hash, parent,
616                       inner.arena[fork].hash, fork_subtree_weight, inner.arena[pivot].hash, pivot_subtree_weight);
617                valid = false;
618                break;
619            } else {
620                trace!("Pass one validity check, block index = {}. Referenced block {} index {} fork is not heavier than the parent block {} index {} fork. Ref fork block {} weight {}, parent fork block {} weight {}!",
621                       me, inner.arena[*consensus_arena_index_in_epoch].hash, *consensus_arena_index_in_epoch, inner.arena[parent].hash, parent,
622                       inner.arena[fork].hash, fork_subtree_weight, inner.arena[pivot].hash, pivot_subtree_weight);
623            }
624        }
625
626        inner.exchange_or_compute_blockset_in_own_view_of_epoch(
627            me,
628            Some(blockset),
629        );
630
631        for (index, delta) in &weight_delta {
632            inner.weight_tree.path_apply(*index, *delta);
633        }
634
635        valid
636    }
637
638    fn check_block_full_validity(
639        &self, new: usize, inner: &mut ConsensusGraphInner, adaptive: bool,
640        anticone_barrier: &BitSet, weight_tuple: Option<&Vec<i128>>,
641    ) -> bool {
642        let parent = inner.arena[new].parent;
643        let force_confirm = inner.arena[new].data.force_confirm;
644
645        if inner.lca(parent, force_confirm) != force_confirm {
646            warn!("Partially invalid due to picking incorrect parent (force confirmation {:?} violation). {:?}", force_confirm, inner.arena[new].hash);
647            return false;
648        }
649
650        // Check whether the new block select the correct parent block
651        if !ConsensusNewBlockHandler::check_correct_parent(
652            inner,
653            new,
654            anticone_barrier,
655            weight_tuple,
656        ) {
657            warn!(
658                "Partially invalid due to picking incorrect parent. {:?}",
659                inner.arena[new].hash
660            );
661            return false;
662        }
663
664        // Check whether difficulty is set correctly
665        if inner.arena[new].difficulty
666            != inner.expected_difficulty(&inner.arena[parent].hash)
667        {
668            warn!(
669                "Partially invalid due to wrong difficulty. {:?}",
670                inner.arena[new].hash
671            );
672            return false;
673        }
674
675        // Check adaptivity match. Note that in bench mode we do not check
676        // the adaptive field correctness. We simply override its value
677        // with the right one.
678        if !self.conf.bench_mode {
679            if inner.arena[new].adaptive != adaptive {
680                warn!(
681                    "Partially invalid due to invalid adaptive field. {:?}",
682                    inner.arena[new].hash
683                );
684                return false;
685            }
686        }
687
688        // Check if `new` is in the subtree of its pos reference.
689        if self
690            .pos_verifier
691            .is_enabled_at_height(inner.arena[new].height)
692        {
693            let pivot_decision = inner
694                .get_pos_reference_pivot_decision(&inner.arena[new].hash)
695                .expect("pos reference checked");
696            match inner.hash_to_arena_indices.get(&pivot_decision) {
697                // Pivot decision is before checkpoint or fake.
698                // Check if it's on the pivot chain.
699                None => {
700                    warn!("Possibly partial invalid due to pos_reference's pivot decision not in consensus graph");
701                    return inner.pivot_block_processed(&pivot_decision);
702                }
703                Some(pivot_decision_arena_index) => {
704                    if inner.lca(new, *pivot_decision_arena_index)
705                        != *pivot_decision_arena_index
706                    {
707                        warn!("Partial invalid due to not in the subtree of pos_reference's pivot decision");
708                        // Not in the subtree of pivot_decision, mark as partial
709                        // invalid.
710                        return false;
711                    }
712                }
713            }
714        }
715
716        return true;
717    }
718
719    #[inline]
720    /// Subroutine called by on_new_block()
721    fn update_lcts_initial(&self, inner: &mut ConsensusGraphInner, me: usize) {
722        let parent = inner.arena[me].parent;
723
724        inner.weight_tree.make_tree(me);
725        inner.weight_tree.link(parent, me);
726
727        inner.adaptive_tree.make_tree(me);
728        inner.adaptive_tree.link(parent, me);
729    }
730
731    #[inline]
732    /// Subroutine called by on_new_block()
733    fn update_lcts_finalize(&self, inner: &mut ConsensusGraphInner, me: usize) {
734        let parent = inner.arena[me].parent;
735        let parent_tw = inner.weight_tree.get(parent);
736        let parent_w = inner.block_weight(parent);
737        inner.adaptive_tree.set(me, -parent_tw + parent_w);
738
739        let weight = inner.block_weight(me);
740        inner.weight_tree.path_apply(me, weight);
741        inner.adaptive_tree.path_apply(me, 2 * weight);
742        inner.adaptive_tree.caterpillar_apply(parent, -weight);
743    }
744
745    fn recycle_tx_in_block(
746        &self, inner: &ConsensusGraphInner, block_hash: &H256,
747    ) {
748        info!("recycle_tx_in_block: block_hash={:?}", block_hash);
749        if let Some(block) = inner
750            .data_man
751            .block_by_hash(block_hash, true /* update_cache */)
752        {
753            self.txpool.recycle_transactions(block.transactions.clone());
754        } else {
755            // This should only happen for blocks in the anticone of
756            // checkpoints.
757            warn!("recycle_tx_in_block: block {:?} not in db", block_hash);
758        }
759    }
760
761    fn should_move_stable_height(
762        &self, inner: &mut ConsensusGraphInner,
763    ) -> u64 {
764        if let Some(sync_state_starting_epoch) =
765            self.conf.sync_state_starting_epoch
766        {
767            if inner.header_only
768                && inner.cur_era_stable_height == sync_state_starting_epoch
769            {
770                // We want to use sync_state_starting_epoch as our stable
771                // checkpoint when we enter
772                // CatchUpCheckpointPhase, so we do not want to move forward our
773                // stable checkpoint. Since we will enter
774                // CatchUpCheckpointPhase the next time we check phase changes,
775                // it's impossible for the delayed checkpoint making to cause
776                // OOM.
777                return inner.cur_era_stable_height;
778            }
779        }
780        let new_stable_height =
781            inner.cur_era_stable_height + inner.inner_conf.era_epoch_count;
782        // We make sure there is an additional era before the best for moving it
783        if new_stable_height + inner.inner_conf.era_epoch_count
784            > inner.best_epoch_number()
785        {
786            return inner.cur_era_stable_height;
787        }
788        let new_stable_pivot_arena_index =
789            inner.get_pivot_block_arena_index(new_stable_height);
790        // Now we need to make sure that this new stable block is
791        // force_confirmed in our current graph
792        if inner.timer_chain_accumulative_lca.len() == 0 {
793            return inner.cur_era_stable_height;
794        }
795        if let Some(last) = inner.timer_chain_accumulative_lca.last() {
796            let lca = inner.lca(*last, new_stable_pivot_arena_index);
797            if lca == new_stable_pivot_arena_index {
798                return new_stable_height;
799            }
800        }
801        return inner.cur_era_stable_height;
802    }
803
804    fn should_form_checkpoint_at(
805        &self, inner: &mut ConsensusGraphInner,
806    ) -> usize {
807        let stable_pivot_block =
808            inner.get_pivot_block_arena_index(inner.cur_era_stable_height);
809        let mut new_genesis_height =
810            inner.cur_era_genesis_height + inner.inner_conf.era_epoch_count;
811
812        // FIXME: Here is a chicken and egg problem. In our full node sync
813        // FIXME: logic, we first run consensus on headers to determine
814        // FIXME: the checkpoint location. And then run the full blocks.
815        // FIXME: However, when we do not have the body, we cannot faithfully
816        // FIXME: check this condition. The consequence is that if
817        // FIXME: attacker managed to generate a lot blame blocks. New full
818        // FIXME: nodes will not correctly determine the safe checkpoint
819        // FIXME: location to start the sync. Causing potential panic
820        // FIXME: when computing `state_valid` and `blame_info`.
821        if !inner.header_only && !self.conf.bench_mode {
822            // Stable block must have a blame vector that does not stretch
823            // beyond the new genesis
824            if !inner.arena[stable_pivot_block].data.state_valid.unwrap() {
825                if inner.arena[stable_pivot_block]
826                    .data
827                    .blame_info
828                    .unwrap()
829                    .blame as u64
830                    + new_genesis_height
831                    + DEFERRED_STATE_EPOCH_COUNT
832                    >= inner.cur_era_stable_height
833                {
834                    return inner.cur_era_genesis_block_arena_index;
835                }
836            }
837        }
838
839        // We cannot move beyond the stable block/height
840        'out: while new_genesis_height < inner.cur_era_stable_height {
841            let new_genesis_block_arena_index =
842                inner.get_pivot_block_arena_index(new_genesis_height);
843            assert!(inner.arena[stable_pivot_block].data.force_confirm != NULL);
844            if inner.lca(
845                new_genesis_block_arena_index,
846                inner.arena[stable_pivot_block].data.force_confirm,
847            ) != new_genesis_block_arena_index
848            {
849                // All following era genesis candidates are on the same fork,
850                // so they are not force_confirmed by stable now.
851                return inner.cur_era_genesis_block_arena_index;
852            }
853
854            // Because the timer chain is unlikely to reorganize at this point.
855            // We will just skip this height if we found timer block
856            // in its anticone before.
857            if inner
858                .has_timer_block_in_anticone_cache
859                .contains(&new_genesis_block_arena_index)
860            {
861                new_genesis_height += inner.inner_conf.era_epoch_count;
862                continue 'out;
863            }
864
865            // Now we need to make sure that no timer chain block is in the
866            // anticone of the new genesis. This is required for our
867            // checkpoint algorithm.
868            let mut visited = BitSet::new();
869            let mut queue = VecDeque::new();
870            queue.push_back(new_genesis_block_arena_index);
871            visited.add(new_genesis_block_arena_index as u32);
872            while let Some(x) = queue.pop_front() {
873                for child in &inner.arena[x].children {
874                    if !visited.contains(*child as u32) {
875                        visited.add(*child as u32);
876                        queue.push_back(*child);
877                    }
878                }
879                for referrer in &inner.arena[x].referrers {
880                    if !visited.contains(*referrer as u32) {
881                        visited.add(*referrer as u32);
882                        queue.push_back(*referrer);
883                    }
884                }
885            }
886            let start_timer_chain_height = inner.arena
887                [new_genesis_block_arena_index]
888                .data
889                .ledger_view_timer_chain_height;
890            let start_timer_chain_index = (start_timer_chain_height
891                - inner.cur_era_genesis_timer_chain_height)
892                as usize;
893            for i in start_timer_chain_index..inner.timer_chain.len() {
894                if !visited.contains(inner.timer_chain[i] as u32) {
895                    inner
896                        .has_timer_block_in_anticone_cache
897                        .insert(new_genesis_block_arena_index);
898                    // This era genesis candidate has a timer chain block in its
899                    // anticone, so we move to check the next one.
900                    new_genesis_height += inner.inner_conf.era_epoch_count;
901                    continue 'out;
902                }
903            }
904            return new_genesis_block_arena_index;
905        }
906        // We cannot make a new checkpoint.
907        inner.cur_era_genesis_block_arena_index
908    }
909
910    fn persist_terminals(&self, inner: &ConsensusGraphInner) {
911        let mut terminals = Vec::with_capacity(inner.terminal_hashes.len());
912        for h in &inner.terminal_hashes {
913            terminals.push(h.clone());
914        }
915        self.data_man.insert_terminals_to_db(terminals);
916    }
917
918    fn try_clear_blockset_in_own_view_of_epoch(
919        inner: &mut ConsensusGraphInner, me: usize,
920    ) {
921        if inner.arena[me].data.blockset_in_own_view_of_epoch.len() as u64
922            > BLOCKSET_IN_OWN_VIEW_OF_EPOCH_CAP
923        {
924            inner.arena[me].data.blockset_in_own_view_of_epoch =
925                Default::default();
926            inner.arena[me].data.skipped_epoch_blocks = Default::default();
927            inner.arena[me].data.blockset_cleared = true;
928        }
929    }
930
931    // This function computes the timer chain in the view of the new block.
932    // The first returned value is the fork height of the timer chain.
933    // The second is a map that overwrites timer_chain_height values after the
934    // fork height.
935    fn compute_timer_chain_tuple(
936        inner: &ConsensusGraphInner, me: usize, anticone: &BitSet,
937    ) -> (u64, HashMap<usize, u64>, Vec<usize>, Vec<usize>) {
938        inner.compute_timer_chain_tuple(
939            inner.arena[me].parent,
940            &inner.arena[me].referees,
941            Some(anticone),
942        )
943    }
944
945    fn compute_invalid_block_start_timer(
946        &self, inner: &ConsensusGraphInner, me: usize,
947    ) -> u64 {
948        let last_index =
949            inner.arena[me].data.past_view_last_timer_block_arena_index;
950        if last_index == NULL {
951            inner.inner_conf.timer_chain_beta
952        } else {
953            inner.arena[last_index].data.ledger_view_timer_chain_height
954                + inner.inner_conf.timer_chain_beta
955                + if inner.get_timer_chain_index(last_index) != NULL {
956                    1
957                } else {
958                    0
959                }
960        }
961    }
962
963    fn preactivate_block(
964        &self, inner: &mut ConsensusGraphInner, me: usize,
965    ) -> BlockStatus {
966        debug!(
967            "Start to preactivate block {} index = {}",
968            inner.arena[me].hash, me
969        );
970        let parent = inner.arena[me].parent;
971        let mut pending = {
972            if let Some(f) = inner.initial_stable_future.as_mut() {
973                let mut in_future = false;
974                if inner.arena[me].hash == inner.cur_era_stable_block_hash {
975                    in_future = true;
976                }
977                if parent != NULL && f.contains(parent as u32) {
978                    in_future = true;
979                }
980                if !in_future {
981                    for referee in &inner.arena[me].referees {
982                        if f.contains(*referee as u32) {
983                            in_future = true;
984                            break;
985                        }
986                    }
987                }
988                if in_future {
989                    f.add(me as u32);
990                }
991                !in_future
992            } else {
993                let mut last_pivot_in_past = if parent != NULL {
994                    inner.arena[parent].data.last_pivot_in_past
995                } else {
996                    inner.cur_era_genesis_height
997                };
998                for referee in &inner.arena[me].referees {
999                    last_pivot_in_past = max(
1000                        last_pivot_in_past,
1001                        inner.arena[*referee].data.last_pivot_in_past,
1002                    );
1003                }
1004                last_pivot_in_past < inner.cur_era_stable_height
1005            }
1006        };
1007
1008        // Because the following computation relies on all previous blocks being
1009        // active, We have to delay it till now
1010        let (timer_longest_difficulty, last_timer_block_arena_index) = inner
1011            .compute_timer_chain_past_view_info(
1012                parent,
1013                &inner.arena[me].referees,
1014            );
1015
1016        inner.arena[me].data.past_view_timer_longest_difficulty =
1017            timer_longest_difficulty;
1018        inner.arena[me].data.past_view_last_timer_block_arena_index =
1019            last_timer_block_arena_index;
1020
1021        inner.arena[me].data.force_confirm =
1022            inner.cur_era_genesis_block_arena_index;
1023
1024        let fully_valid;
1025
1026        // Note that this function also updates the anticone for other nodes, so
1027        // we have to call it even for pending blocks!
1028        let (anticone, anticone_barrier) =
1029            ConsensusNewBlockHandler::compute_and_update_anticone(inner, me);
1030
1031        if !pending {
1032            let timer_chain_tuple =
1033                ConsensusNewBlockHandler::compute_timer_chain_tuple(
1034                    inner, me, &anticone,
1035                );
1036
1037            inner.arena[me].data.force_confirm = inner
1038                .compute_block_force_confirm(
1039                    &timer_chain_tuple,
1040                    self.data_man
1041                        .pos_reference_by_hash(&inner.arena[me].hash)
1042                        .expect("header exist"),
1043                );
1044            debug!(
1045                "Force confirm block index {} in the past view of block index={}",
1046                inner.arena[me].data.force_confirm, me
1047            );
1048
1049            let weight_tuple = if anticone_barrier.len() >= ANTICONE_BARRIER_CAP
1050            {
1051                Some(inner.compute_subtree_weights(me, &anticone_barrier))
1052            } else {
1053                None
1054            };
1055
1056            let adaptive = inner.adaptive_weight(
1057                me,
1058                &anticone_barrier,
1059                weight_tuple.as_ref(),
1060                &timer_chain_tuple,
1061            );
1062
1063            fully_valid = self.check_block_full_validity(
1064                me,
1065                inner,
1066                adaptive,
1067                &anticone_barrier,
1068                weight_tuple.as_ref(),
1069            );
1070
1071            if self.conf.bench_mode && fully_valid {
1072                inner.arena[me].adaptive = adaptive;
1073            }
1074        } else {
1075            let block_status_in_db = self
1076                .data_man
1077                .local_block_info_by_hash(&inner.arena[me].hash)
1078                .map(|info| info.get_status())
1079                .unwrap_or(BlockStatus::Pending);
1080            fully_valid = block_status_in_db != BlockStatus::PartialInvalid;
1081            pending = block_status_in_db == BlockStatus::Pending;
1082            debug!(
1083                "Fetch the block validity status {} from the local data base",
1084                fully_valid
1085            );
1086        }
1087
1088        debug!(
1089            "Finish preactivation block {} index = {}",
1090            inner.arena[me].hash, me
1091        );
1092        let block_status = if pending {
1093            BlockStatus::Pending
1094        } else if fully_valid {
1095            BlockStatus::Valid
1096        } else {
1097            BlockStatus::PartialInvalid
1098        };
1099        self.persist_block_info(inner, me, block_status);
1100
1101        block_status
1102    }
1103
1104    fn activate_block(
1105        &self, inner: &mut ConsensusGraphInner, me: usize,
1106        meter: &ConfirmationMeter, queue: &mut VecDeque<usize>,
1107    ) {
1108        inner.arena[me].data.activated = true;
1109        self.statistics.inc_consensus_graph_activated_block_count();
1110        let mut succ_list = inner.arena[me].children.clone();
1111        succ_list.extend(inner.arena[me].referrers.iter());
1112        for succ in &succ_list {
1113            assert!(inner.arena[*succ].data.inactive_dependency_cnt > 0);
1114            inner.arena[*succ].data.inactive_dependency_cnt -= 1;
1115            if inner.arena[*succ].data.inactive_dependency_cnt == 0 {
1116                queue.push_back(*succ);
1117            }
1118        }
1119        // The above is the only thing we need to do for out-of-era blocks
1120        // so for these blocks, we quit here.
1121        if inner.arena[me].era_block == NULL {
1122            debug!(
1123                "Updated active counters for out-of-era block in ConsensusGraph: index = {:?} hash={:?}",
1124                me, inner.arena[me].hash,
1125            );
1126            return;
1127        } else {
1128            debug!(
1129                "Start activating block in ConsensusGraph: index = {:?} hash={:?} height={:?}",
1130                me, inner.arena[me].hash, inner.arena[me].height,
1131            );
1132        }
1133
1134        let parent = inner.arena[me].parent;
1135        // Update terminal hashes for mining
1136        if parent != NULL {
1137            inner.terminal_hashes.remove(&inner.arena[parent].hash);
1138        }
1139        inner.terminal_hashes.insert(inner.arena[me].hash.clone());
1140        for referee in &inner.arena[me].referees {
1141            inner.terminal_hashes.remove(&inner.arena[*referee].hash);
1142        }
1143
1144        self.update_lcts_finalize(inner, me);
1145        let my_weight = inner.block_weight(me);
1146        let mut extend_pivot = false;
1147        let mut pivot_changed = false;
1148        // ``fork_at`` stores the first pivot chain height that we need to
1149        // update (because of the new inserted block). If the new block
1150        // extends the pivot chain, ``fork_at`` will equal to the new pivot
1151        // chain height (end of the pivot chain).
1152        let mut fork_at;
1153        let old_pivot_chain_len = inner.pivot_chain.len();
1154
1155        // Update consensus inner with a possibly new pos_reference.
1156        inner.update_pos_pivot_decision(me);
1157
1158        // Now we are going to maintain the timer chain.
1159        let diff = inner.arena[me].data.past_view_timer_longest_difficulty
1160            + inner.get_timer_difficulty(me);
1161        if inner.arena[me].is_timer
1162            && !inner.arena[me].data.partial_invalid
1163            && ConsensusGraphInner::is_heavier(
1164                (diff, &inner.arena[me].hash),
1165                (
1166                    inner.best_timer_chain_difficulty,
1167                    &inner.best_timer_chain_hash,
1168                ),
1169            )
1170        {
1171            inner.best_timer_chain_difficulty = diff;
1172            inner.best_timer_chain_hash = inner.arena[me].hash.clone();
1173            inner.update_timer_chain(me);
1174            // Now we go over every element in the ``invalid_block_queue``
1175            // because their timer may change.
1176            if !self.conf.bench_mode {
1177                let mut new_block_queue = BinaryHeap::new();
1178                for (_, x) in &inner.invalid_block_queue {
1179                    let timer =
1180                        self.compute_invalid_block_start_timer(inner, *x);
1181                    new_block_queue.push((-(timer as i128), *x));
1182                    debug!(
1183                        "Partial invalid Block {} (hash = {}) start timer is now {}",
1184                        *x, inner.arena[*x].hash, timer
1185                    );
1186                }
1187                inner.invalid_block_queue = new_block_queue;
1188            }
1189        } else {
1190            let mut timer_chain_height =
1191                inner.arena[parent].data.ledger_view_timer_chain_height;
1192            if inner.get_timer_chain_index(parent) != NULL {
1193                timer_chain_height += 1;
1194            }
1195            for referee in &inner.arena[me].referees {
1196                let timer_bit = if inner.get_timer_chain_index(*referee) != NULL
1197                {
1198                    1
1199                } else {
1200                    0
1201                };
1202                if inner.arena[*referee].data.ledger_view_timer_chain_height
1203                    + timer_bit
1204                    > timer_chain_height
1205                {
1206                    timer_chain_height = inner.arena[*referee]
1207                        .data
1208                        .ledger_view_timer_chain_height
1209                        + timer_bit;
1210                }
1211            }
1212            inner.arena[me].data.ledger_view_timer_chain_height =
1213                timer_chain_height;
1214        }
1215
1216        meter.aggregate_total_weight_in_past(my_weight);
1217        let force_confirm = inner.compute_global_force_confirm();
1218        let force_height = inner.arena[force_confirm].height;
1219        let last = inner.pivot_chain.last().cloned().unwrap();
1220        let force_lca = inner.lca(force_confirm, last);
1221
1222        if force_lca == force_confirm && inner.arena[me].parent == last {
1223            let me_height = inner.arena[me].height;
1224            let me_hash = inner.arena[me].hash;
1225            let allow_extend = self
1226                .pivot_hint
1227                .as_ref()
1228                .map_or(true, |hint| hint.allow_extend(me_height, me_hash));
1229            if allow_extend {
1230                inner.pivot_chain.push(me);
1231                inner.set_epoch_number_in_epoch(
1232                    me,
1233                    inner.pivot_index_to_height(inner.pivot_chain.len()) - 1,
1234                );
1235                inner.pivot_chain_metadata.push(Default::default());
1236                extend_pivot = true;
1237                pivot_changed = true;
1238                fork_at = inner.pivot_index_to_height(old_pivot_chain_len);
1239            } else {
1240                debug!("Chain extend rejected by pivot hint: height={me_height}, hash={me_hash:?}");
1241                fork_at = inner.pivot_index_to_height(old_pivot_chain_len);
1242            }
1243        } else {
1244            let lca = inner.lca(last, me);
1245            let new;
1246            if self.pivot_hint.is_some() && lca == last {
1247                // If pivot hint is enabled, `me` could be an extend of the
1248                // pivot chain, but its parent block is not on the pivot chain.
1249                // This special case can only happen
1250                debug!("Chain extend rejected by pivot hint because parent is rejected.");
1251                fork_at = inner.pivot_index_to_height(old_pivot_chain_len);
1252                // In this case, `pivot_changed` is false. So `new` can be
1253                // aribitrary value.
1254                new = 0;
1255            } else if force_confirm != force_lca {
1256                debug!(
1257                    "pivot chain switch to force_confirm={} force_height={}",
1258                    force_confirm, force_height
1259                );
1260                fork_at = inner.arena[force_lca].height + 1;
1261                new = inner.ancestor_at(force_confirm, fork_at);
1262                pivot_changed = true;
1263            } else {
1264                fork_at = inner.arena[lca].height + 1;
1265                let prev = inner.get_pivot_block_arena_index(fork_at);
1266                let prev_weight = inner.weight_tree.get(prev);
1267                new = inner.ancestor_at(me, fork_at);
1268                let new_weight = inner.weight_tree.get(new);
1269
1270                let me_height = inner.arena[me].height;
1271                let me_ancestor_hash_at =
1272                    |height| inner.arena[inner.ancestor_at(me, height)].hash;
1273
1274                // Note that for properly set consensus parameters, fork_at will
1275                // always after the force_height (i.e., the
1276                // force confirmation is always stable).
1277                // But during testing, we may want to stress the consensus.
1278                // Therefore we add this condition fork_at >
1279                // force_height to maintain consistency.
1280                if fork_at > force_height
1281                    && ConsensusGraphInner::is_heavier(
1282                        (new_weight, &inner.arena[new].hash),
1283                        (prev_weight, &inner.arena[prev].hash),
1284                    )
1285                    && self.pivot_hint.as_ref().map_or(true, |hint| {
1286                        hint.allow_switch(
1287                            fork_at,
1288                            me_height,
1289                            me_ancestor_hash_at,
1290                        )
1291                    })
1292                {
1293                    pivot_changed = true;
1294                } else {
1295                    // The previous subtree is still heavier, nothing is
1296                    // updated
1297                    debug!("Old pivot chain is heavier, pivot chain unchanged");
1298                    fork_at = inner.pivot_index_to_height(old_pivot_chain_len);
1299                }
1300            }
1301            if pivot_changed {
1302                // The new subtree is heavier, update pivot chain
1303                let fork_pivot_index = inner.height_to_pivot_index(fork_at);
1304                assert!(fork_pivot_index < inner.pivot_chain.len());
1305                for discarded_idx in
1306                    inner.pivot_chain.split_off(fork_pivot_index)
1307                {
1308                    // Reset the epoch_number of the discarded fork
1309                    inner.reset_epoch_number_in_epoch(discarded_idx);
1310                    ConsensusNewBlockHandler::try_clear_blockset_in_own_view_of_epoch(inner,
1311                    discarded_idx);
1312                }
1313                let mut u = new;
1314                loop {
1315                    inner.compute_blockset_in_own_view_of_epoch(u);
1316                    inner.pivot_chain.push(u);
1317                    inner.set_epoch_number_in_epoch(
1318                        u,
1319                        inner.pivot_index_to_height(inner.pivot_chain.len())
1320                            - 1,
1321                    );
1322                    if inner.arena[u].height >= force_height {
1323                        let mut heaviest = NULL;
1324                        let mut heaviest_weight = 0;
1325                        for index in &inner.arena[u].children {
1326                            if !inner.arena[*index].data.activated {
1327                                continue;
1328                            }
1329                            let weight = inner.weight_tree.get(*index);
1330                            if heaviest == NULL
1331                                || ConsensusGraphInner::is_heavier(
1332                                    (weight, &inner.arena[*index].hash),
1333                                    (
1334                                        heaviest_weight,
1335                                        &inner.arena[heaviest].hash,
1336                                    ),
1337                                )
1338                            {
1339                                heaviest = *index;
1340                                heaviest_weight = weight;
1341                            }
1342                        }
1343                        if heaviest == NULL {
1344                            break;
1345                        }
1346                        u = heaviest;
1347                    } else {
1348                        u = inner.ancestor_at(
1349                            force_confirm,
1350                            inner.arena[u].height + 1,
1351                        );
1352                    }
1353                }
1354            }
1355        };
1356        debug!(
1357            "Forked at height {}, fork parent block {}",
1358            fork_at,
1359            &inner.arena[inner.get_pivot_block_arena_index(fork_at - 1)].hash,
1360        );
1361
1362        // Now compute last_pivot_in_block and update pivot_metadata.
1363        // Note that we need to do this for partially invalid blocks to
1364        // propagate information!
1365        if !extend_pivot {
1366            let update_at = fork_at - 1;
1367            let mut last_pivot_to_update = HashSet::new();
1368            last_pivot_to_update.insert(me);
1369            if pivot_changed {
1370                inner.best_terminals_reorg_height =
1371                    min(inner.best_terminals_reorg_height, update_at);
1372                let update_pivot_index = inner.height_to_pivot_index(update_at);
1373                for pivot_index in update_pivot_index..old_pivot_chain_len {
1374                    for x in &inner.pivot_chain_metadata[pivot_index]
1375                        .last_pivot_in_past_blocks
1376                    {
1377                        last_pivot_to_update.insert(*x);
1378                    }
1379                }
1380                inner.recompute_metadata(fork_at, last_pivot_to_update);
1381            } else {
1382                // pivot chain not extend and not change
1383                ConsensusNewBlockHandler::try_clear_blockset_in_own_view_of_epoch(inner, me);
1384                inner.recompute_metadata(
1385                    inner.get_pivot_height(),
1386                    last_pivot_to_update,
1387                );
1388            }
1389        } else {
1390            let height = inner.arena[me].height;
1391            inner.arena[me].data.last_pivot_in_past = height;
1392            let pivot_index = inner.height_to_pivot_index(height);
1393            inner.pivot_chain_metadata[pivot_index]
1394                .last_pivot_in_past_blocks
1395                .insert(me);
1396            let blockset = inner
1397                .exchange_or_compute_blockset_in_own_view_of_epoch(me, None);
1398            inner.pivot_chain_metadata[pivot_index].past_weight =
1399                inner.pivot_chain_metadata[pivot_index - 1].past_weight
1400                    + inner.total_weight_in_own_epoch(
1401                        &blockset,
1402                        inner.cur_era_genesis_block_arena_index,
1403                    )
1404                    + inner.block_weight(me);
1405            inner.exchange_or_compute_blockset_in_own_view_of_epoch(
1406                me,
1407                Some(blockset),
1408            );
1409        }
1410
1411        // Only process blocks in the subtree of stable
1412        if (inner.arena[me].height <= inner.cur_era_stable_height
1413            || (inner.arena[me].height > inner.cur_era_stable_height
1414                && inner.arena
1415                    [inner.ancestor_at(me, inner.cur_era_stable_height)]
1416                .hash
1417                    != inner.cur_era_stable_block_hash))
1418            && !self.conf.bench_mode
1419        {
1420            self.persist_terminals(inner);
1421            if pivot_changed {
1422                // If we switch to a chain without stable block,
1423                // we should avoid execute unavailable states.
1424                // TODO It is handled by processing
1425                // `state_availability_boundary` at the end,
1426                // we can probably refactor to move that part of code before
1427                // this skip and remove this special case.
1428                self.data_man
1429                    .state_availability_boundary
1430                    .write()
1431                    .optimistic_executed_height = None;
1432            }
1433            debug!(
1434                "Finish activating block in ConsensusGraph: index={:?} hash={:?},\
1435                 block is not in the subtree of stable",
1436                me, inner.arena[me].hash
1437            );
1438            return;
1439        }
1440        // Note that only pivot chain height after the capped_fork_at needs to
1441        // update execution state.
1442        let capped_fork_at = max(inner.cur_era_stable_height + 1, fork_at);
1443
1444        inner.adjust_difficulty(*inner.pivot_chain.last().expect("not empty"));
1445        if me % CONFIRMATION_METER_UPDATE_FREQUENCY == 0 || pivot_changed {
1446            meter.update_confirmation_risks(inner);
1447        }
1448
1449        if pivot_changed {
1450            if inner.pivot_chain.len() > EPOCH_SET_PERSISTENCE_DELAY as usize {
1451                let capped_fork_at_pivot_index =
1452                    inner.height_to_pivot_index(capped_fork_at);
1453                // Starting from old_len ensures that all epochs within
1454                // [old_len - delay, new_len - delay) will be inserted to db, so
1455                // no epochs will be skipped. Starting from
1456                // fork_at ensures that any epoch set change will be
1457                // overwritten.
1458                let start_pivot_index = if old_pivot_chain_len
1459                    >= EPOCH_SET_PERSISTENCE_DELAY as usize
1460                {
1461                    min(
1462                        capped_fork_at_pivot_index,
1463                        old_pivot_chain_len
1464                            - EPOCH_SET_PERSISTENCE_DELAY as usize,
1465                    )
1466                } else {
1467                    capped_fork_at_pivot_index
1468                };
1469                let to_persist_pivot_index = inner.pivot_chain.len()
1470                    - EPOCH_SET_PERSISTENCE_DELAY as usize;
1471                for pivot_index in start_pivot_index..to_persist_pivot_index {
1472                    inner.persist_epoch_set_hashes(pivot_index);
1473                }
1474            }
1475        }
1476
1477        // Note that after the checkpoint (if happens), the old_pivot_chain_len
1478        // value will become obsolete
1479        let old_pivot_chain_height =
1480            inner.pivot_index_to_height(old_pivot_chain_len);
1481
1482        if inner.best_epoch_number() > inner.cur_era_stable_height
1483            && inner.arena
1484                [inner.get_pivot_block_arena_index(inner.cur_era_stable_height)]
1485            .hash
1486                == inner.cur_era_stable_block_hash
1487        {
1488            let new_stable_height = self.should_move_stable_height(inner);
1489            if inner.cur_era_stable_height != new_stable_height {
1490                inner.cur_era_stable_height = new_stable_height;
1491                let stable_arena_index =
1492                    inner.get_pivot_block_arena_index(new_stable_height);
1493
1494                // Ensure all blocks on the pivot chain before
1495                // the new stable block to have state_valid computed
1496                if !inner.header_only && !self.conf.bench_mode {
1497                    // FIXME: this asserion doesn't hold any more
1498                    // assert!(
1499                    //     new_stable_height
1500                    //         >= inner
1501                    //             .data_man
1502                    //             .state_availability_boundary
1503                    //             .read()
1504                    //             .lower_bound
1505                    // );
1506                    // If new_era_genesis should have available state,
1507                    // make sure state execution is finished before setting
1508                    // lower_bound
1509                    // to the new_checkpoint_era_genesis.
1510                    self.executor
1511                        .wait_for_result(inner.arena[stable_arena_index].hash)
1512                        .expect(
1513                            "Execution state of the pivot chain is corrupted!",
1514                        );
1515                    inner
1516                        .compute_state_valid_and_blame_info(
1517                            stable_arena_index,
1518                            &self.executor,
1519                        )
1520                        .expect(
1521                            "New stable node should have available state_valid",
1522                        );
1523                }
1524
1525                let genesis_hash =
1526                    &inner.arena[inner.cur_era_genesis_block_arena_index].hash;
1527                let stable_hash = &inner.arena[stable_arena_index].hash;
1528                inner.cur_era_stable_block_hash = stable_hash.clone();
1529                inner.data_man.set_cur_consensus_era_genesis_hash(
1530                    genesis_hash,
1531                    stable_hash,
1532                );
1533                inner.initial_stable_future = None;
1534                debug!(
1535                    "Move era stable genesis to height={} hash={:?}",
1536                    new_stable_height, stable_hash
1537                );
1538            }
1539        }
1540
1541        // We are only going to check the checkpoint movement after the stable
1542        // is on the pivot chain (will not always be true during the recovery).
1543        // The code inside assumes this assumption.
1544        if inner.cur_era_stable_height < inner.best_epoch_number()
1545            && inner.arena
1546                [inner.get_pivot_block_arena_index(inner.cur_era_stable_height)]
1547            .hash
1548                == inner.cur_era_stable_block_hash
1549        {
1550            let new_checkpoint_era_genesis =
1551                self.should_form_checkpoint_at(inner);
1552            if new_checkpoint_era_genesis
1553                != inner.cur_era_genesis_block_arena_index
1554            {
1555                info!(
1556                    "Working on new checkpoint, old checkpoint block {} height {}",
1557                    &inner.arena[inner.cur_era_genesis_block_arena_index].hash,
1558                    inner.cur_era_genesis_height
1559                );
1560
1561                ConsensusNewBlockHandler::make_checkpoint_at(
1562                    inner,
1563                    new_checkpoint_era_genesis,
1564                );
1565                let stable_era_genesis_arena_index =
1566                    inner.ancestor_at(me, inner.cur_era_stable_height);
1567                meter.reset_for_checkpoint(
1568                    inner.weight_tree.get(stable_era_genesis_arena_index),
1569                    inner.cur_era_stable_height,
1570                );
1571                meter.update_confirmation_risks(inner);
1572                info!(
1573                    "New checkpoint formed at block {} stable block {} height {}",
1574                    &inner.arena[inner.cur_era_genesis_block_arena_index].hash,
1575                    &inner.arena[stable_era_genesis_arena_index].hash,
1576                    inner.cur_era_genesis_height
1577                );
1578            }
1579        }
1580
1581        // send updated pivot chain to pubsub
1582        let from = capped_fork_at;
1583        let to = inner.pivot_index_to_height(inner.pivot_chain.len());
1584
1585        for epoch_number in from..to {
1586            let arena_index = inner.get_pivot_block_arena_index(epoch_number);
1587            let epoch_hashes = inner.get_epoch_block_hashes(arena_index);
1588
1589            // send epoch to pub-sub layer
1590            self.epochs_sender.send((epoch_number, epoch_hashes));
1591
1592            // send epoch to blame verifier
1593            if let NodeType::Light = self.node_type {
1594                // ConsensusNewBlockHandler is single-threaded,
1595                // lock should always succeed.
1596                self.blame_verifier.lock().process(inner, epoch_number);
1597            }
1598        }
1599
1600        // If we are inserting header only, we will skip execution and
1601        // tx_pool-related operations
1602        if !inner.header_only {
1603            // FIXME: Now we have to pass a conservative stable_height here.
1604            // FIXME: Because the storage layer does not handle the case when
1605            // FIXME: this confirmed point being reverted. We have to be extra
1606            // FIXME: conservatively but this will cost storage space.
1607            // FIXME: Eventually, we should implement the logic to recover from
1608            // FIXME: the database if such a rare reversion case happens.
1609            //
1610            // FIXME: we need a function to compute the deferred epoch
1611            // FIXME: number. the current codebase may not be
1612            // FIXME: consistent at all places.
1613            let mut confirmed_height = meter.get_confirmed_epoch_num();
1614            if confirmed_height < DEFERRED_STATE_EPOCH_COUNT {
1615                confirmed_height = 0;
1616            } else {
1617                confirmed_height -= DEFERRED_STATE_EPOCH_COUNT;
1618            }
1619            // We can not assume that confirmed epoch are already executed,
1620            // but we can assume that the deferred block are executed.
1621            self.data_man
1622                .storage_manager
1623                .get_storage_manager()
1624                .maintain_state_confirmed(
1625                    inner,
1626                    inner.cur_era_stable_height,
1627                    self.conf.inner_conf.era_epoch_count,
1628                    confirmed_height,
1629                    &self.data_man.state_availability_boundary,
1630                )
1631                // FIXME: propogate error.
1632                .expect(&concat!(file!(), ":", line!(), ":", column!()));
1633            self.set_block_tx_packed(inner, me);
1634            self.delayed_tx_recycle_in_skipped_blocks(inner, capped_fork_at);
1635
1636            let to_state_pos = if inner
1637                .pivot_index_to_height(inner.pivot_chain.len())
1638                < DEFERRED_STATE_EPOCH_COUNT
1639            {
1640                0
1641            } else {
1642                inner.pivot_index_to_height(inner.pivot_chain.len())
1643                    - DEFERRED_STATE_EPOCH_COUNT
1644                    + 1
1645            };
1646            let mut state_at = capped_fork_at;
1647            if capped_fork_at + DEFERRED_STATE_EPOCH_COUNT
1648                > old_pivot_chain_height
1649            {
1650                if old_pivot_chain_height > DEFERRED_STATE_EPOCH_COUNT {
1651                    state_at =
1652                        old_pivot_chain_height - DEFERRED_STATE_EPOCH_COUNT + 1;
1653                } else {
1654                    state_at = 1;
1655                }
1656            }
1657            {
1658                let mut state_availability_boundary =
1659                    inner.data_man.state_availability_boundary.write();
1660                if pivot_changed {
1661                    assert!(
1662                        capped_fork_at > state_availability_boundary.lower_bound,
1663                        "forked_at {} should > boundary_lower_bound, boundary {:?}",
1664                        capped_fork_at,
1665                        state_availability_boundary
1666                    );
1667                    if extend_pivot {
1668                        state_availability_boundary
1669                            .pivot_chain
1670                            .push(inner.arena[me].hash);
1671                    } else {
1672                        let split_off_index = capped_fork_at
1673                            - state_availability_boundary.lower_bound;
1674                        state_availability_boundary
1675                            .pivot_chain
1676                            .truncate(split_off_index as usize);
1677                        for i in inner.height_to_pivot_index(capped_fork_at)
1678                            ..inner.pivot_chain.len()
1679                        {
1680                            state_availability_boundary
1681                                .pivot_chain
1682                                .push(inner.arena[inner.pivot_chain[i]].hash);
1683                        }
1684                        if state_availability_boundary.upper_bound
1685                            >= capped_fork_at
1686                        {
1687                            state_availability_boundary.upper_bound =
1688                                capped_fork_at - 1;
1689                        }
1690                    }
1691                    state_availability_boundary.optimistic_executed_height =
1692                        if to_state_pos
1693                            > state_availability_boundary.lower_bound
1694                        {
1695                            Some(to_state_pos)
1696                        } else {
1697                            None
1698                        };
1699                }
1700                // For full node, we don't execute blocks before available
1701                // states. This skip should only happen in
1702                // `SyncBlockPhase` for full nodes
1703                if state_at < state_availability_boundary.lower_bound + 1 {
1704                    state_at = state_availability_boundary.lower_bound + 1;
1705                }
1706            }
1707
1708            // Apply transactions in the determined total order
1709            while state_at < to_state_pos {
1710                let epoch_arena_index =
1711                    inner.get_pivot_block_arena_index(state_at);
1712                let reward_execution_info = self
1713                    .executor
1714                    .get_reward_execution_info(inner, epoch_arena_index);
1715                self.executor.enqueue_epoch(EpochExecutionTask::new(
1716                    epoch_arena_index,
1717                    inner,
1718                    reward_execution_info,
1719                    true,  /* on_local_pivot */
1720                    false, /* force_recompute */
1721                ));
1722
1723                state_at += 1;
1724            }
1725        }
1726
1727        self.persist_terminals(inner);
1728        debug!(
1729            "Finish activating block in ConsensusGraph: index={:?} hash={:?} cur_era_stable_height={} cur_era_genesis_height={}",
1730            me, inner.arena[me].hash, inner.cur_era_stable_height, inner.cur_era_genesis_height
1731        );
1732    }
1733
1734    /// The top level function invoked by ConsensusGraph to insert a new block.
1735    pub fn on_new_block(
1736        &self, inner: &mut ConsensusGraphInner, meter: &ConfirmationMeter,
1737        hash: &H256,
1738    ) {
1739        let block_header = self
1740            .data_man
1741            .block_header_by_hash(hash)
1742            .expect("header exist for consensus");
1743        debug!(
1744            "insert new block into consensus: header_only={:?} block={:?}",
1745            inner.header_only, &block_header
1746        );
1747        let parent_hash = block_header.parent_hash();
1748        let parent_index = inner.hash_to_arena_indices.get(&parent_hash);
1749        let me = if parent_index.is_none()
1750            || inner.arena[*parent_index.unwrap()].era_block == NULL
1751        {
1752            // current block is outside of the current era.
1753            debug!(
1754                "parent={:?} not in consensus graph or not in the genesis subtree, inserted as an out-era block stub",
1755                parent_hash
1756            );
1757            let block_status_in_db = self
1758                .data_man
1759                .local_block_info_by_hash(hash)
1760                .map(|info| info.get_status())
1761                .unwrap_or(BlockStatus::Pending);
1762            let (sn, me) = inner.insert_out_era_block(
1763                &block_header,
1764                block_status_in_db == BlockStatus::PartialInvalid,
1765            );
1766            let block_info = LocalBlockInfo::new(
1767                block_status_in_db,
1768                sn,
1769                self.data_man.get_instance_id(),
1770            );
1771            self.data_man.insert_local_block_info(hash, block_info);
1772            // If me is NULL, it means that this block does not have any stub,
1773            // so we can safely ignore it in the consensus besides
1774            // update its sequence number in the data manager.
1775            if me == NULL {
1776                // Block body in the anticone of a checkpoint is not needed for
1777                // full nodes, but they are still needed to sync
1778                // an archive node in the current implementation (to make their
1779                // child blocks `graph_ready`), so we still keep
1780                // them for now.
1781
1782                // self.data_man
1783                //     .remove_block_body(hash, true /* remove_db */);
1784                return;
1785            }
1786            me
1787        } else {
1788            let (me, indices_len) = inner.insert(&block_header);
1789            self.statistics
1790                .set_consensus_graph_inserted_block_count(indices_len);
1791            self.update_lcts_initial(inner, me);
1792            me
1793        };
1794
1795        if inner.arena[me].data.inactive_dependency_cnt == 0 {
1796            let mut queue: VecDeque<usize> = VecDeque::new();
1797            queue.push_back(me);
1798            while let Some(me) = queue.pop_front() {
1799                // For out-of-era blocks, we just fetch the results from the
1800                // already filled field. We do not run
1801                // preactivate_block() on them.
1802                let block_status = if inner.arena[me].era_block != NULL {
1803                    self.preactivate_block(inner, me)
1804                } else {
1805                    if inner.arena[me].data.partial_invalid {
1806                        BlockStatus::PartialInvalid
1807                    } else {
1808                        BlockStatus::Pending
1809                    }
1810                };
1811
1812                if block_status == BlockStatus::PartialInvalid {
1813                    inner.arena[me].data.partial_invalid = true;
1814                    let timer =
1815                        self.compute_invalid_block_start_timer(inner, me);
1816                    // We are not going to delay partial invalid blocks in the
1817                    // bench mode
1818                    if self.conf.bench_mode {
1819                        inner.invalid_block_queue.push((0, me));
1820                    } else {
1821                        inner.invalid_block_queue.push((-(timer as i128), me));
1822                    }
1823                    inner.arena[me].data.inactive_dependency_cnt = NULL;
1824                    debug!(
1825                        "Block {} (hash = {}) is partially invalid, all of its future will be non-active till timer height {}",
1826                        me, inner.arena[me].hash, timer
1827                    );
1828                } else {
1829                    if block_status == BlockStatus::Pending {
1830                        inner.arena[me].data.pending = true;
1831                        debug!(
1832                            "Block {} (hash = {}) is pending but processed",
1833                            me, inner.arena[me].hash
1834                        );
1835                    } else {
1836                        debug!(
1837                            "Block {} (hash = {}) is fully valid",
1838                            me, inner.arena[me].hash
1839                        );
1840                    }
1841                    self.activate_block(inner, me, meter, &mut queue);
1842                }
1843                // Now we are going to check all invalid blocks in the delay
1844                // queue Activate them if the timer is up
1845                let timer = if let Some(x) = inner.timer_chain.last() {
1846                    inner.arena[*x].data.ledger_view_timer_chain_height + 1
1847                } else {
1848                    inner.cur_era_genesis_timer_chain_height
1849                };
1850                loop {
1851                    if let Some((t, _)) = inner.invalid_block_queue.peek() {
1852                        if timer < (-*t) as u64 {
1853                            break;
1854                        }
1855                    } else {
1856                        break;
1857                    }
1858                    let (_, x) = inner.invalid_block_queue.pop().unwrap();
1859                    assert!(
1860                        inner.arena[x].data.inactive_dependency_cnt == NULL
1861                    );
1862                    inner.arena[x].data.inactive_dependency_cnt = 0;
1863                    self.activate_block(inner, x, meter, &mut queue);
1864                }
1865            }
1866        } else {
1867            debug!(
1868                "Block {} (hash = {}) is non-active with active counter {}",
1869                me,
1870                inner.arena[me].hash,
1871                inner.arena[me].data.inactive_dependency_cnt
1872            );
1873        }
1874    }
1875
1876    fn persist_block_info(
1877        &self, inner: &mut ConsensusGraphInner, me: usize,
1878        block_status: BlockStatus,
1879    ) {
1880        let block_info = LocalBlockInfo::new(
1881            block_status,
1882            inner.arena[me].data.sequence_number,
1883            self.data_man.get_instance_id(),
1884        );
1885        self.data_man
1886            .insert_local_block_info(&inner.arena[me].hash, block_info);
1887        let era_block = inner.arena[me].era_block();
1888        let era_block_hash = if era_block != NULL {
1889            inner.arena[era_block].hash
1890        } else {
1891            Default::default()
1892        };
1893        if inner.inner_conf.enable_state_expose {
1894            STATE_EXPOSER.consensus_graph.lock().block_state_vec.push(
1895                ConsensusGraphBlockState {
1896                    block_hash: inner.arena[me].hash,
1897                    best_block_hash: inner.best_block_hash(),
1898                    block_status: block_info.get_status(),
1899                    era_block_hash,
1900                    adaptive: inner.arena[me].adaptive(),
1901                },
1902            )
1903        }
1904    }
1905
1906    /// construct_pivot_state() rebuild pivot chain state info from db
1907    /// avoiding intermediate redundant computation triggered by
1908    /// on_new_block().
1909    /// It also recovers receipts_root and logs_bloom_hash in pivot chain.
1910    /// This function is only invoked from recover_graph_from_db with
1911    /// header_only being false.
1912    pub fn construct_pivot_state(
1913        &self, inner: &mut ConsensusGraphInner, meter: &ConfirmationMeter,
1914    ) {
1915        // FIXME: this line doesn't exactly match its purpose.
1916        // FIXME: Is it the checkpoint or synced snapshot or could it be
1917        // anything else?
1918        let state_boundary_height =
1919            self.data_man.state_availability_boundary.read().lower_bound;
1920        let start_pivot_index =
1921            (state_boundary_height - inner.cur_era_genesis_height) as usize;
1922        debug!(
1923            "construct_pivot_state: start={}, pivot_chain.len()={}, state_boundary_height={}",
1924            start_pivot_index,
1925            inner.pivot_chain.len(),
1926            state_boundary_height
1927        );
1928        if start_pivot_index >= inner.pivot_chain.len() {
1929            // The pivot chain of recovered blocks is before state lower_bound,
1930            // so we do not need to construct any pivot state.
1931            return;
1932        }
1933        let start_hash = inner.arena[inner.pivot_chain[start_pivot_index]].hash;
1934        // Here, we should ensure the epoch_execution_commitment for stable hash
1935        // must be loaded into memory. Since, in some rare cases, the number of
1936        // blocks between stable and best_epoch is less than
1937        // DEFERRED_STATE_EPOCH_COUNT, the for loop below will not load
1938        // epoch_execution_commitment for stable hash.
1939        if start_hash != inner.data_man.true_genesis.hash()
1940            && self
1941                .data_man
1942                .get_epoch_execution_commitment(&start_hash)
1943                .is_none()
1944        {
1945            self.data_man.load_epoch_execution_commitment_from_db(&start_hash)
1946                .expect("epoch_execution_commitment for stable hash must exist in disk");
1947        }
1948        {
1949            let mut state_availability_boundary =
1950                self.data_man.state_availability_boundary.write();
1951            assert!(
1952                state_availability_boundary.lower_bound
1953                    == state_availability_boundary.upper_bound
1954            );
1955            for pivot_index in start_pivot_index + 1..inner.pivot_chain.len() {
1956                state_availability_boundary
1957                    .pivot_chain
1958                    .push(inner.arena[inner.pivot_chain[pivot_index]].hash);
1959            }
1960        }
1961
1962        if inner.pivot_chain.len() < DEFERRED_STATE_EPOCH_COUNT as usize {
1963            return;
1964        }
1965
1966        let end_index =
1967            inner.pivot_chain.len() - DEFERRED_STATE_EPOCH_COUNT as usize + 1;
1968        for pivot_index in start_pivot_index + 1..end_index {
1969            let pivot_arena_index = inner.pivot_chain[pivot_index];
1970            let pivot_hash = inner.arena[pivot_arena_index].hash;
1971
1972            // Ensure that the commitments for the blocks on
1973            // pivot_chain after cur_era_stable_genesis are kept in memory.
1974            if self
1975                .data_man
1976                .load_epoch_execution_commitment_from_db(&pivot_hash)
1977                .is_none()
1978            {
1979                break;
1980            }
1981        }
1982
1983        // Retrieve the most recently executed epoch
1984        let mut start_compute_epoch_pivot_index =
1985            self.get_force_compute_index(inner, start_pivot_index, end_index);
1986
1987        // Retrieve the earliest non-executed epoch
1988        for pivot_index in start_pivot_index + 1..end_index {
1989            let pivot_arena_index = inner.pivot_chain[pivot_index];
1990            let pivot_hash = inner.arena[pivot_arena_index].hash;
1991
1992            if self
1993                .data_man
1994                .get_epoch_execution_commitment(&pivot_hash)
1995                .is_none()
1996            {
1997                start_compute_epoch_pivot_index =
1998                    min(pivot_index, start_compute_epoch_pivot_index);
1999                debug!(
2000                    "Start compute epoch pivot index {}, height {}",
2001                    pivot_index, inner.arena[pivot_arena_index].height
2002                );
2003                break;
2004            }
2005        }
2006
2007        let snapshot_epoch_count = inner
2008            .data_man
2009            .storage_manager
2010            .get_storage_manager()
2011            .get_snapshot_epoch_count();
2012        let mut need_set_intermediate_trie_root_merkle = false;
2013        let max_snapshot_epoch_index_has_mpt = self
2014            .recover_latest_mpt_snapshot_if_needed(
2015                inner,
2016                &mut start_compute_epoch_pivot_index,
2017                start_pivot_index,
2018                end_index,
2019                &mut need_set_intermediate_trie_root_merkle,
2020                snapshot_epoch_count as u64,
2021            );
2022        self.set_intermediate_trie_root_merkle(
2023            inner,
2024            start_compute_epoch_pivot_index,
2025            need_set_intermediate_trie_root_merkle,
2026            snapshot_epoch_count as u64,
2027        );
2028
2029        let confirmed_epoch_num = meter.get_confirmed_epoch_num();
2030        for pivot_index in start_pivot_index + 1..end_index {
2031            let pivot_arena_index = inner.pivot_chain[pivot_index];
2032            let pivot_hash = inner.arena[pivot_arena_index].hash;
2033            let height = inner.arena[pivot_arena_index].height;
2034
2035            let compute_epoch =
2036                if pivot_index >= start_compute_epoch_pivot_index {
2037                    true
2038                } else {
2039                    false
2040                };
2041
2042            if self
2043                .data_man
2044                .get_epoch_execution_commitment(&pivot_hash)
2045                .is_some()
2046            {
2047                self.data_man
2048                    .state_availability_boundary
2049                    .write()
2050                    .upper_bound += 1;
2051            }
2052
2053            info!(
2054                "construct_pivot_state: index {} height {} compute_epoch {}.",
2055                pivot_index, height, compute_epoch,
2056            );
2057
2058            if compute_epoch {
2059                let reward_execution_info = self
2060                    .executor
2061                    .get_reward_execution_info(inner, pivot_arena_index);
2062
2063                let recover_mpt_during_construct_pivot_state =
2064                    max_snapshot_epoch_index_has_mpt.map_or(true, |idx| {
2065                        pivot_index > idx + snapshot_epoch_count as usize
2066                    });
2067                info!(
2068                    "compute epoch recovery flag {}",
2069                    recover_mpt_during_construct_pivot_state
2070                );
2071                self.executor.compute_epoch(
2072                    EpochExecutionTask::new(
2073                        pivot_arena_index,
2074                        inner,
2075                        reward_execution_info,
2076                        true, /* on_local_pivot */
2077                        true, /* force_recompute */
2078                    ),
2079                    None,
2080                    recover_mpt_during_construct_pivot_state,
2081                );
2082
2083                // Remove old-pivot state during start up to save disk,
2084                // otherwise, all state will be keep till normal phase, this
2085                // will occupy too many disk
2086                {
2087                    let mut confirmed_height = min(confirmed_epoch_num, height);
2088                    if confirmed_height < DEFERRED_STATE_EPOCH_COUNT {
2089                        confirmed_height = 0;
2090                    } else {
2091                        confirmed_height -= DEFERRED_STATE_EPOCH_COUNT;
2092                    }
2093
2094                    self.data_man
2095                        .storage_manager
2096                        .get_storage_manager()
2097                        .maintain_state_confirmed(
2098                            inner,
2099                            inner.cur_era_stable_height,
2100                            self.conf.inner_conf.era_epoch_count,
2101                            confirmed_height,
2102                            &self.data_man.state_availability_boundary,
2103                        )
2104                        .expect(&concat!(
2105                            file!(),
2106                            ":",
2107                            line!(),
2108                            ":",
2109                            column!()
2110                        ));
2111                }
2112            }
2113        }
2114
2115        inner
2116            .data_man
2117            .storage_manager
2118            .get_storage_manager()
2119            .get_snapshot_manager()
2120            .get_snapshot_db_manager()
2121            .clean_snapshot_epoch_id_before_recovered();
2122    }
2123
2124    fn get_force_compute_index(
2125        &self, inner: &mut ConsensusGraphInner, start_pivot_index: usize,
2126        end_index: usize,
2127    ) -> usize {
2128        let mut force_compute_index = start_pivot_index + 1;
2129        let mut epoch_count = 0;
2130        for pivot_index in (start_pivot_index + 1..end_index).rev() {
2131            let pivot_arena_index = inner.pivot_chain[pivot_index];
2132            let pivot_hash = inner.arena[pivot_arena_index].hash;
2133
2134            let maybe_epoch_execution_commitment =
2135                self.data_man.get_epoch_execution_commitment(&pivot_hash);
2136            if let Some(commitment) = *maybe_epoch_execution_commitment {
2137                if self
2138                    .data_man
2139                    .storage_manager
2140                    .get_state_no_commit_inner(
2141                        StateIndex::new_for_readonly(
2142                            &pivot_hash,
2143                            &commitment.state_root_with_aux_info,
2144                        ),
2145                        /* try_open = */ false,
2146                        false,
2147                    )
2148                    .expect("DB Error")
2149                    .is_some()
2150                {
2151                    epoch_count += 1;
2152
2153                    // force to recompute last 5 epochs in case state database
2154                    // is not ready in last shutdown
2155                    if epoch_count > DEFERRED_STATE_EPOCH_COUNT {
2156                        let reward_execution_info =
2157                            self.executor.get_reward_execution_info(
2158                                inner,
2159                                pivot_arena_index,
2160                            );
2161
2162                        let pivot_block_height = self
2163                            .data_man
2164                            .block_header_by_hash(&pivot_hash)
2165                            .expect("must exists")
2166                            .height();
2167
2168                        // ensure current epoch is new executed whether there
2169                        // is a fork or not
2170                        if self.executor.epoch_executed_and_recovered(
2171                            &pivot_hash,
2172                            &inner.get_epoch_block_hashes(pivot_arena_index),
2173                            true,
2174                            &reward_execution_info,
2175                            pivot_block_height,
2176                        ) {
2177                            force_compute_index = pivot_index + 1;
2178                            debug!(
2179                                "Force compute start index {}",
2180                                force_compute_index
2181                            );
2182                            break;
2183                        }
2184                    }
2185                } else {
2186                    epoch_count = 0;
2187                }
2188            }
2189        }
2190
2191        if let Some(height) = self
2192            .conf
2193            .inner_conf
2194            .force_recompute_height_during_construct_pivot
2195        {
2196            if height > inner.cur_era_stable_height {
2197                let pivot_idx = inner.height_to_pivot_index(height);
2198                debug!(
2199                    "force recompute height during constructing pivot {}",
2200                    pivot_idx
2201                );
2202                force_compute_index = min(force_compute_index, pivot_idx);
2203            }
2204        }
2205
2206        force_compute_index
2207    }
2208
2209    fn recover_latest_mpt_snapshot_if_needed(
2210        &self, inner: &mut ConsensusGraphInner,
2211        start_compute_epoch_pivot_index: &mut usize, start_pivot_index: usize,
2212        end_index: usize, need_set_intermediate_trie_root_merkle: &mut bool,
2213        snapshot_epoch_count: u64,
2214    ) -> Option<usize> {
2215        if !self.conf.inner_conf.use_isolated_db_for_mpt_table {
2216            return Some(end_index);
2217        }
2218
2219        let (
2220            temp_snapshot_db_existing,
2221            removed_snapshots,
2222            latest_snapshot_epoch_height,
2223            max_snapshot_epoch_height_has_mpt,
2224        ) = if let Some((
2225            temp_snapshot_db_existing,
2226            removed_snapshots,
2227            latest_snapshot_epoch_height,
2228            max_snapshot_epoch_height_has_mpt,
2229        )) = inner
2230            .data_man
2231            .storage_manager
2232            .get_storage_manager()
2233            .persist_state_from_initialization
2234            .write()
2235            .take()
2236        {
2237            (
2238                temp_snapshot_db_existing,
2239                removed_snapshots,
2240                max(latest_snapshot_epoch_height, inner.cur_era_stable_height),
2241                max_snapshot_epoch_height_has_mpt,
2242            )
2243        } else {
2244            (None, HashSet::new(), inner.cur_era_stable_height, None)
2245        };
2246
2247        debug!("latest snapshot epoch height: {}, temp snapshot status: {:?}, max snapshot epoch height has mpt: {:?}, removed snapshots {:?}",
2248            latest_snapshot_epoch_height, temp_snapshot_db_existing, max_snapshot_epoch_height_has_mpt, removed_snapshots);
2249
2250        if removed_snapshots.len() == 1
2251            && removed_snapshots.contains(&NULL_EPOCH)
2252        {
2253            debug!("special case for synced snapshot");
2254            return Some(end_index);
2255        }
2256
2257        if max_snapshot_epoch_height_has_mpt
2258            .is_some_and(|h| h == latest_snapshot_epoch_height)
2259        {
2260            inner
2261                .data_man
2262                .storage_manager
2263                .get_storage_manager()
2264                .get_snapshot_manager()
2265                .get_snapshot_db_manager()
2266                .recreate_latest_mpt_snapshot()
2267                .unwrap();
2268
2269            info!(
2270                "snapshot for epoch height {} is still not use mpt database",
2271                start_compute_epoch_pivot_index
2272            );
2273            return Some(end_index);
2274        }
2275
2276        // maximum epoch need to compute
2277        let maximum_height_to_create_next_snapshot =
2278            latest_snapshot_epoch_height + snapshot_epoch_count * 2;
2279        let index =
2280            inner.height_to_pivot_index(maximum_height_to_create_next_snapshot);
2281        if *start_compute_epoch_pivot_index > index {
2282            warn!("start_compute_epoch_pivot_index is greater than maximum epoch need to compute {}", index);
2283            *start_compute_epoch_pivot_index = index;
2284        }
2285
2286        // Find the closest ear prior to the start_compute_epoch_height
2287        let start_compute_epoch_height = inner.arena
2288            [inner.pivot_chain[*start_compute_epoch_pivot_index]]
2289            .height;
2290        info!(
2291            "current start compute epoch height {}",
2292            start_compute_epoch_height
2293        );
2294
2295        let recovery_latest_mpt_snapshot =
2296            if self.conf.inner_conf.recovery_latest_mpt_snapshot
2297                || start_compute_epoch_height <= latest_snapshot_epoch_height
2298                || (temp_snapshot_db_existing.is_some()
2299                    && latest_snapshot_epoch_height
2300                        < start_compute_epoch_height
2301                    && start_compute_epoch_height
2302                        <= latest_snapshot_epoch_height + snapshot_epoch_count)
2303            {
2304                true
2305            } else {
2306                let mut max_epoch_height = 0;
2307                for pivot_index in (start_pivot_index..end_index)
2308                    .step_by(snapshot_epoch_count as usize)
2309                {
2310                    let pivot_arena_index = inner.pivot_chain[pivot_index];
2311                    let pivot_hash = inner.arena[pivot_arena_index].hash;
2312
2313                    debug!(
2314                        "snapshot pivot_index {} height {} ",
2315                        pivot_index, inner.arena[pivot_arena_index].height
2316                    );
2317
2318                    if removed_snapshots.contains(&pivot_hash) {
2319                        max_epoch_height = max(
2320                            max_epoch_height,
2321                            inner.arena[pivot_arena_index].height,
2322                        );
2323                    }
2324                }
2325
2326                // snapshots after latest_snapshot_epoch_height is removed
2327                latest_snapshot_epoch_height < max_epoch_height
2328            };
2329
2330        // if the latest_snapshot_epoch_height is greater than
2331        // start_compute_epoch_height, the latest MPT snapshot is dirty
2332        if recovery_latest_mpt_snapshot {
2333            let era_pivot_epoch_height = if start_compute_epoch_height
2334                <= inner.cur_era_stable_height + snapshot_epoch_count
2335            {
2336                debug!("snapshot for cur_era_stable_height must be exist");
2337                inner.cur_era_stable_height
2338            } else {
2339                (start_compute_epoch_height - snapshot_epoch_count - 1)
2340                    / self.conf.inner_conf.era_epoch_count
2341                    * self.conf.inner_conf.era_epoch_count
2342            };
2343
2344            if era_pivot_epoch_height > latest_snapshot_epoch_height {
2345                panic!("era_pivot_epoch_height is greater than latest_snapshot_epoch_height, this should not happen");
2346            }
2347
2348            debug!(
2349                "need recovery latest mpt snapshot, start compute epoch height {}, era pivot epoch height {}",
2350                start_compute_epoch_height, era_pivot_epoch_height
2351            );
2352
2353            if start_compute_epoch_height <= era_pivot_epoch_height {
2354                unreachable!("start_compute_epoch_height {} is smaller than era_pivot_epoch_height {}", start_compute_epoch_height, era_pivot_epoch_height);
2355            } else if start_compute_epoch_height
2356                <= era_pivot_epoch_height + snapshot_epoch_count
2357            {
2358                if start_compute_epoch_height % snapshot_epoch_count == 1 {
2359                    *need_set_intermediate_trie_root_merkle = true;
2360                }
2361            } else if start_compute_epoch_height
2362                <= era_pivot_epoch_height + snapshot_epoch_count * 2
2363            {
2364                // nothing need to do
2365            } else {
2366                let new_height =
2367                    era_pivot_epoch_height + snapshot_epoch_count * 2;
2368                let new_index = inner.height_to_pivot_index(new_height);
2369
2370                info!("reset start_compute_epoch_pivot_index to {}", new_index);
2371                *start_compute_epoch_pivot_index = new_index;
2372            }
2373
2374            let era_pivot_hash = if era_pivot_epoch_height == 0 {
2375                NULL_EPOCH
2376            } else {
2377                inner
2378                    .get_pivot_hash_from_epoch_number(era_pivot_epoch_height)
2379                    .expect("pivot hash should be exist")
2380            };
2381
2382            let snapshot_db_manager = inner
2383                .data_man
2384                .storage_manager
2385                .get_storage_manager()
2386                .get_snapshot_manager()
2387                .get_snapshot_db_manager();
2388
2389            snapshot_db_manager.update_latest_snapshot_id(
2390                era_pivot_hash.clone(),
2391                era_pivot_epoch_height,
2392            );
2393
2394            if max_snapshot_epoch_height_has_mpt
2395                .is_some_and(|height| height >= era_pivot_epoch_height)
2396            {
2397                // mpt snapshot will be created from empty
2398                snapshot_db_manager.recreate_latest_mpt_snapshot().unwrap();
2399            } else {
2400                let pivot_hash_before_era = if era_pivot_epoch_height == 0 {
2401                    None
2402                } else {
2403                    Some(
2404                        inner
2405                            .get_pivot_hash_from_epoch_number(
2406                                era_pivot_epoch_height - snapshot_epoch_count,
2407                            )
2408                            .expect("pivot hash should be exist"),
2409                    )
2410                };
2411
2412                // use ear snapshot replace latest
2413                snapshot_db_manager
2414                    .recovery_latest_mpt_snapshot_from_checkpoint(
2415                        &era_pivot_hash,
2416                        pivot_hash_before_era,
2417                    )
2418                    .unwrap();
2419            }
2420
2421            max_snapshot_epoch_height_has_mpt.and_then(|v| {
2422                if v >= inner.cur_era_stable_height {
2423                    Some(inner.height_to_pivot_index(v))
2424                } else {
2425                    None
2426                }
2427            })
2428        } else {
2429            if temp_snapshot_db_existing.is_some()
2430                && latest_snapshot_epoch_height + snapshot_epoch_count
2431                    < start_compute_epoch_height
2432                && start_compute_epoch_height
2433                    <= latest_snapshot_epoch_height + 2 * snapshot_epoch_count
2434            {
2435                inner
2436                    .data_man
2437                    .storage_manager
2438                    .get_storage_manager()
2439                    .get_snapshot_manager()
2440                    .get_snapshot_db_manager()
2441                    .set_reconstruct_snapshot_id(temp_snapshot_db_existing);
2442            }
2443
2444            debug!("the latest MPT snapshot is valid");
2445            Some(end_index)
2446        }
2447    }
2448
2449    fn set_intermediate_trie_root_merkle(
2450        &self, inner: &mut ConsensusGraphInner,
2451        start_compute_epoch_pivot_index: usize,
2452        need_set_intermediate_trie_root_merkle: bool,
2453        snapshot_epoch_count: u64,
2454    ) {
2455        let storage_manager =
2456            inner.data_man.storage_manager.get_storage_manager();
2457        if !storage_manager
2458            .storage_conf
2459            .keep_snapshot_before_stable_checkpoint
2460            || need_set_intermediate_trie_root_merkle
2461        {
2462            let pivot_arena_index =
2463                inner.pivot_chain[start_compute_epoch_pivot_index - 1];
2464            let pivot_hash = inner.arena[pivot_arena_index].hash;
2465            let height = inner.arena[pivot_arena_index].height + 1;
2466
2467            let intermediate_trie_root_merkle = match *self
2468                .data_man
2469                .get_epoch_execution_commitment(&pivot_hash)
2470            {
2471                Some(commitment) => {
2472                    if height % snapshot_epoch_count == 1 {
2473                        commitment
2474                            .state_root_with_aux_info
2475                            .state_root
2476                            .delta_root
2477                    } else {
2478                        commitment
2479                            .state_root_with_aux_info
2480                            .state_root
2481                            .intermediate_delta_root
2482                    }
2483                }
2484                None => MERKLE_NULL_NODE,
2485            };
2486
2487            debug!("previous pivot hash {:?} intermediate trie root merkle for next pivot {:?}", pivot_hash, intermediate_trie_root_merkle);
2488            *storage_manager.intermediate_trie_root_merkle.write() =
2489                Some(intermediate_trie_root_merkle);
2490        }
2491    }
2492
2493    fn set_block_tx_packed(&self, inner: &ConsensusGraphInner, me: usize) {
2494        if !self.txpool.ready_for_mining() {
2495            // Skip tx pool operation before catching up.
2496            return;
2497        }
2498        let parent = inner.arena[me].parent;
2499        if parent == NULL {
2500            warn!(
2501                "set_block_tx_packed skips block with empty parent {:?}",
2502                inner.arena[me].hash
2503            );
2504            return;
2505        }
2506        let era_genesis_height =
2507            inner.get_era_genesis_height(inner.arena[parent].height);
2508        let cur_pivot_era_block = if inner
2509            .pivot_index_to_height(inner.pivot_chain.len())
2510            > era_genesis_height
2511        {
2512            inner.get_pivot_block_arena_index(era_genesis_height)
2513        } else {
2514            NULL
2515        };
2516        let era_block = inner.get_era_genesis_block_with_parent(parent);
2517
2518        // It's only correct to set tx stale after the block is considered
2519        // terminal for mining.
2520        // Note that we conservatively only mark those blocks inside the
2521        // current pivot era
2522        if era_block == cur_pivot_era_block {
2523            self.txpool.set_tx_packed(
2524                &self
2525                    .data_man
2526                    .block_by_hash(
2527                        &inner.arena[me].hash,
2528                        true, /* update_cache */
2529                    )
2530                    .expect("Already checked")
2531                    .transactions,
2532            );
2533        } else {
2534            warn!("set_block_tx_packed skips block {:?}", inner.arena[me].hash);
2535        }
2536    }
2537
2538    fn delayed_tx_recycle_in_skipped_blocks(
2539        &self, inner: &mut ConsensusGraphInner, fork_height: u64,
2540    ) {
2541        if !self.txpool.ready_for_mining() {
2542            // Skip tx pool operation before catching up.
2543            return;
2544        }
2545        if inner.pivot_chain.len() > RECYCLE_TRANSACTION_DELAY as usize {
2546            let recycle_end_pivot_index = inner.pivot_chain.len()
2547                - RECYCLE_TRANSACTION_DELAY as usize
2548                - 1;
2549            // If the pivot reorg is deeper than `RECYCLE_TRANSACTION_DELAY`, we
2550            // will try to recycle all skipped blocks since the
2551            // forking point.
2552            let start = min(
2553                // `fork_height` has been capped by the caller.
2554                inner.height_to_pivot_index(fork_height),
2555                recycle_end_pivot_index,
2556            );
2557            for recycle_pivot_index in start..=recycle_end_pivot_index {
2558                let recycle_arena_index =
2559                    inner.pivot_chain[recycle_pivot_index];
2560                let skipped_blocks = inner
2561                    .get_or_compute_skipped_epoch_blocks(recycle_arena_index)
2562                    .clone();
2563                for h in &skipped_blocks {
2564                    self.recycle_tx_in_block(inner, h);
2565                }
2566            }
2567        }
2568    }
2569}