cfxcore/sync/
synchronization_graph.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use std::{
6    cmp::max,
7    collections::{BinaryHeap, HashMap, HashSet, VecDeque},
8    mem, panic,
9    sync::{
10        atomic::{AtomicUsize, Ordering},
11        Arc,
12    },
13    thread,
14    time::{Duration, SystemTime, UNIX_EPOCH},
15};
16
17use cfx_parameters::consensus_internal::ELASTICITY_MULTIPLIER;
18use parking_lot::{Mutex, RwLock};
19use slab::Slab;
20use tokio::sync::mpsc::error::TryRecvError;
21use unexpected::{Mismatch, OutOfBounds};
22
23use cfx_executor::machine::Machine;
24use cfx_types::{H256, U256};
25use cfxcore_errors::ProviderBlockError;
26use dag::{Graph, RichDAG, RichTreeGraph, TreeGraph, DAG};
27use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
28use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
29use metrics::{
30    register_meter_with_group, register_queue, Meter, MeterTimer, Queue,
31};
32use primitives::{
33    pos::PosBlockId, transaction::SignedTransaction, Block, BlockHeader,
34    EpochNumber,
35};
36
37use crate::{
38    block_data_manager::{BlockDataManager, BlockStatus},
39    channel::Channel,
40    consensus::{pos_handler::PosVerifier, SharedConsensusGraph},
41    core_error::{BlockError, CoreError as Error},
42    pow::{PowComputer, ProofOfWorkConfig},
43    state_exposer::{SyncGraphBlockState, STATE_EXPOSER},
44    statistics::SharedStatistics,
45    sync::synchronization_protocol_handler::FutureBlockContainer,
46    verification::*,
47    Notifications,
48};
49
// Metric handles for the synchronization graph. `lazy_static!` constructs each
// handle the first time it is dereferenced.
lazy_static! {
    // Meter registered under the "timer" group as "sync::insert_block_header".
    static ref SYNC_INSERT_HEADER: Arc<dyn Meter> =
        register_meter_with_group("timer", "sync::insert_block_header");
    // Meter registered under the "timer" group as "sync::insert_block".
    static ref SYNC_INSERT_BLOCK: Arc<dyn Meter> =
        register_meter_with_group("timer", "sync::insert_block");
    // Queue metric registered as "consensus_worker_queue".
    static ref CONSENSUS_WORKER_QUEUE: Arc<dyn Queue> =
        register_queue("consensus_worker_queue");
}
58
/// Sentinel arena index meaning "no node" (all bits set); stored in
/// `SynchronizationGraphNode::parent` when the parent is not linked.
const NULL: usize = usize::MAX;

// Values of `SynchronizationGraphNode::graph_status`. They are compared with
// `<` / `>=` elsewhere in this file, so the numeric order is significant.
/// Status given to nodes created by `insert_invalid`.
const BLOCK_INVALID: u8 = 0;
/// Initial status of a non-genesis node created by `insert`.
const BLOCK_HEADER_ONLY: u8 = 1;
/// Threshold used by `new_to_be_header_graph_ready`.
const BLOCK_HEADER_GRAPH_READY: u8 = 2;
/// Threshold used by `new_to_be_block_graph_ready`; genesis starts here.
const BLOCK_GRAPH_READY: u8 = 3;
64
/// Settings of the synchronization graph. A copy is kept in
/// `SynchronizationGraphInner::config`.
#[derive(Copy, Clone)]
pub struct SyncGraphConfig {
    /// NOTE(review): presumably the capacity of the buffer holding blocks
    /// whose timestamp is in the future; the consumer is not visible in this
    /// part of the file -- confirm.
    pub future_block_buffer_capacity: usize,
    /// NOTE(review): presumably toggles reporting to `STATE_EXPOSER`; the
    /// consumer is not visible in this part of the file -- confirm.
    pub enable_state_expose: bool,
    /// When `false`, `verify_header_graph_ready_block` also checks the
    /// difficulty recorded in the header.
    pub is_consortium: bool,
}
71
/// Counters of the blocks and headers inserted into the sync graph.
#[derive(Debug)]
pub struct SyncGraphStatistics {
    /// Number of inserted blocks; starts at one.
    pub inserted_block_count: usize,
    /// Number of inserted headers; starts at one.
    pub inserted_header_count: usize,
}

impl SyncGraphStatistics {
    /// Creates the counters in their initial state.
    pub fn new() -> Self {
        // Already counted genesis block
        Self {
            inserted_block_count: 1,
            inserted_header_count: 1,
        }
    }

    /// Resets both counters to the state produced by [`Self::new`].
    pub fn clear(&mut self) { *self = Self::new(); }
}
92
/// One entry of the `SynchronizationGraphInner::arena` slab: a block header
/// plus its links to other arena entries. Every link is an arena index.
#[derive(DeriveMallocSizeOf)]
pub struct SynchronizationGraphNode {
    /// The header of the block this node stands for.
    pub block_header: Arc<BlockHeader>,
    /// The status of graph connectivity in the current block view.
    /// Holds one of the `BLOCK_*` status constants.
    pub graph_status: u8,
    /// Whether the block body is ready.
    pub block_ready: bool,
    /// Whether parent is in old era and already reclaimed
    pub parent_reclaimed: bool,
    /// The index of the parent of the block, or `NULL` while the parent is
    /// not linked.
    pub parent: usize,
    /// The indices of the children of the block.
    pub children: Vec<usize>,
    /// The indices of the blocks referenced by the block. Only referees that
    /// are present in the arena are listed.
    pub referees: Vec<usize>,
    /// The number of blocks referenced by the block but
    /// haven't been inserted in synchronization graph.
    pub pending_referee_count: usize,
    /// The indices of the blocks referencing the block.
    pub referrers: Vec<usize>,
    /// The timestamp in seconds when `graph_status` was updated. Initialized
    /// to the Unix time at which the node is inserted.
    pub last_update_timestamp: u64,
}
116
/// A set of arena indices together with a flag recording whether the set has
/// been touched since the flag was last reset.
#[derive(DeriveMallocSizeOf)]
pub struct UnreadyBlockFrontier {
    /// Arena indices currently in the frontier.
    frontier: HashSet<usize>,
    /// Set by `insert` and `remove`; cleared by `reset_update_state`.
    updated: bool,
}
122
123impl UnreadyBlockFrontier {
124    fn new() -> Self {
125        UnreadyBlockFrontier {
126            frontier: HashSet::new(),
127            updated: false,
128        }
129    }
130
131    pub fn reset_update_state(&mut self) { self.updated = false; }
132
133    pub fn updated(&self) -> bool { self.updated }
134
135    pub fn get_frontier(&self) -> &HashSet<usize> { &self.frontier }
136
137    pub fn remove(&mut self, index: &usize) -> bool {
138        self.updated = true;
139        self.frontier.remove(index)
140    }
141
142    pub fn contains(&self, index: &usize) -> bool {
143        self.frontier.contains(index)
144    }
145
146    pub fn insert(&mut self, index: usize) -> bool {
147        self.updated = true;
148        self.frontier.insert(index)
149    }
150
151    pub fn len(&self) -> usize { self.frontier.len() }
152}
153
/// The state of the synchronization graph: an arena of block-header nodes,
/// lookup tables that wire nodes together as headers arrive, and the
/// frontiers used to track readiness and old-era garbage collection.
pub struct SynchronizationGraphInner {
    /// Storage of all nodes; the indices used throughout this struct are
    /// keys of this slab.
    pub arena: Slab<SynchronizationGraphNode>,
    /// Maps a block hash to the arena index of its node.
    pub hash_to_arena_indices: HashMap<H256, usize>,
    pub data_man: Arc<BlockDataManager>,
    /// Keyed by the hash of a parent that was not in the arena when a child
    /// was inserted; the value lists the waiting children.
    children_by_hash: HashMap<H256, Vec<usize>>,
    /// Keyed by the hash of a referee that was not in the arena when a
    /// referrer was inserted; the value lists the waiting referrers.
    referrers_by_hash: HashMap<H256, Vec<usize>>,
    pub pow_config: ProofOfWorkConfig,
    pub pow: Arc<PowComputer>,
    pub config: SyncGraphConfig,
    /// The indices of blocks whose graph_status is not GRAPH_READY.
    /// It may consider not header-graph-ready in phases
    /// `CatchUpRecoverBlockHeaderFromDB` and `CatchUpSyncBlockHeader`.
    /// Or, it may consider not block-graph-ready in phases
    /// `CatchUpRecoverBlockFromDB`, `CatchUpSyncBlock`, and `Normal`.
    pub not_ready_blocks_frontier: UnreadyBlockFrontier,

    /// This includes the blocks whose parent and referees are all ready, and
    /// only pos_reference has not been ready (pos_reference not committed
    /// or its pivot decision is not ready).
    pub pos_not_ready_blocks_frontier: HashSet<usize>,
    /// Queue of nodes that are candidates for removal by
    /// `try_clear_old_era_blocks`.
    pub old_era_blocks_frontier: VecDeque<usize>,
    /// Membership set maintained alongside `old_era_blocks_frontier`.
    pub old_era_blocks_frontier_set: HashSet<usize>,

    /// Set to `true` in `CatchUpCheckpointPhase` and
    /// `CatchUpFillBlockBodyPhase` so that sync graph and consensus graph
    /// remain unchanged for consistency.
    pub locked_for_catchup: bool,
    /// The set of blocks that we need to download block bodies in
    /// `CatchUpFillBlockBodyPhase`.
    pub block_to_fill_set: HashSet<H256>,
    /// Source of the chain parameters read during header verification.
    machine: Arc<Machine>,
    /// Used to look up the pivot decision of a header's PoS reference.
    pub pos_verifier: Arc<PosVerifier>,
}
187
188impl MallocSizeOf for SynchronizationGraphInner {
189    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
190        self.arena.size_of(ops)
191            + self.hash_to_arena_indices.size_of(ops)
192            + self.data_man.size_of(ops)
193            + self.children_by_hash.size_of(ops)
194            + self.referrers_by_hash.size_of(ops)
195            + self.pow_config.size_of(ops)
196            + self.not_ready_blocks_frontier.size_of(ops)
197            + self.old_era_blocks_frontier.size_of(ops)
198            + self.old_era_blocks_frontier_set.size_of(ops)
199        // Does not count size_of machine.
200    }
201}
202
203impl SynchronizationGraphInner {
204    pub fn with_genesis_block(
205        genesis_header: Arc<BlockHeader>, pow_config: ProofOfWorkConfig,
206        pow: Arc<PowComputer>, config: SyncGraphConfig,
207        data_man: Arc<BlockDataManager>, machine: Arc<Machine>,
208        pos_verifier: Arc<PosVerifier>,
209    ) -> Self {
210        let mut inner = SynchronizationGraphInner {
211            arena: Slab::new(),
212            hash_to_arena_indices: HashMap::new(),
213            data_man,
214            children_by_hash: HashMap::new(),
215            referrers_by_hash: HashMap::new(),
216            pow_config,
217            pow,
218            config,
219            not_ready_blocks_frontier: UnreadyBlockFrontier::new(),
220            pos_not_ready_blocks_frontier: Default::default(),
221            old_era_blocks_frontier: Default::default(),
222            old_era_blocks_frontier_set: Default::default(),
223            block_to_fill_set: Default::default(),
224            locked_for_catchup: false,
225            machine,
226            pos_verifier,
227        };
228        let genesis_hash = genesis_header.hash();
229        let genesis_block_index = inner.insert(genesis_header);
230        debug!(
231            "genesis block {:?} has index {} in sync graph",
232            genesis_hash, genesis_block_index
233        );
234
235        inner.old_era_blocks_frontier.push_back(genesis_block_index);
236        inner
237            .old_era_blocks_frontier_set
238            .insert(genesis_block_index);
239
240        inner
241    }
242
243    fn get_genesis_in_current_era(&self) -> usize {
244        let genesis_hash = self.data_man.get_cur_consensus_era_genesis_hash();
245        *self.hash_to_arena_indices.get(&genesis_hash).unwrap()
246    }
247
248    pub fn get_genesis_hash_and_height_in_current_era(&self) -> (H256, u64) {
249        let era_genesis = self.get_genesis_in_current_era();
250        (
251            self.arena[era_genesis].block_header.hash(),
252            self.arena[era_genesis].block_header.height(),
253        )
254    }
255
256    pub fn get_stable_hash_and_height_in_current_era(&self) -> (H256, u64) {
257        let stable_hash = self.data_man.get_cur_consensus_era_stable_hash();
258        // The stable block may not be in the sync-graph when this function is
259        // invoked during the synchronization phase, let's query the
260        // data from data manager
261        let height = self
262            .data_man
263            .block_header_by_hash(&stable_hash)
264            .expect("stable block must exist in data manager")
265            .height();
266        (stable_hash, height)
267    }
268
269    fn try_clear_old_era_blocks(&mut self) {
270        let max_num_of_cleared_blocks = 2;
271        let mut num_cleared = 0;
272        let era_genesis = self.get_genesis_in_current_era();
273        let genesis_seq_num = self
274            .data_man
275            .local_block_info_by_hash(
276                &self.data_man.get_cur_consensus_era_genesis_hash(),
277            )
278            .expect("local_block_info for genesis must exist")
279            .get_seq_num();
280        let mut era_genesis_in_frontier = false;
281
282        while let Some(index) = self.old_era_blocks_frontier.pop_front() {
283            if index == era_genesis {
284                era_genesis_in_frontier = true;
285                continue;
286            }
287
288            // Remove node with index
289            if !self.old_era_blocks_frontier_set.contains(&index) {
290                continue;
291            }
292
293            let hash = self.arena[index].block_header.hash();
294            assert!(self.arena[index].parent == NULL);
295
296            if !self.is_graph_ready_in_db(&hash, genesis_seq_num) {
297                // This block has not been processed in consensus. Clearing it
298                // now may make its referrers not block-graph-ready.
299                // See https://github.com/Conflux-Chain/conflux-rust/issues/1426.
300                //
301                // The blocks pushed to `old_era_blocks_frontier` are all
302                // BlockGraphReady, so it's ensured that they will be inserted
303                // into consensus and their local block infos will be persisted.
304                continue;
305            }
306
307            let referees: Vec<usize> =
308                self.arena[index].referees.iter().map(|x| *x).collect();
309            for referee in referees {
310                self.arena[referee].referrers.retain(|&x| x != index);
311            }
312            let referee_hashes: Vec<H256> = self.arena[index]
313                .block_header
314                .referee_hashes()
315                .iter()
316                .map(|x| *x)
317                .collect();
318            for referee_hash in referee_hashes {
319                let mut remove_referee_hash: bool = false;
320                if let Some(referrers) =
321                    self.referrers_by_hash.get_mut(&referee_hash)
322                {
323                    referrers.retain(|&x| x != index);
324                    remove_referee_hash = referrers.len() == 0;
325                }
326                // clean empty key
327                if remove_referee_hash {
328                    self.referrers_by_hash.remove(&referee_hash);
329                }
330            }
331
332            let children: Vec<usize> =
333                self.arena[index].children.iter().map(|x| *x).collect();
334            for child in children {
335                self.arena[child].parent = NULL;
336                self.arena[child].parent_reclaimed = true;
337                // We will check `is_graph_ready_in_db` before garbage
338                // collecting the blocks in `old_era_blocks_frontier`,
339                // so we do not need to check graph-ready-related status here
340                // before inserting them.
341                self.old_era_blocks_frontier.push_back(child);
342                assert!(!self.old_era_blocks_frontier_set.contains(&child));
343                self.old_era_blocks_frontier_set.insert(child);
344            }
345
346            let referrers: Vec<usize> =
347                self.arena[index].referrers.iter().map(|x| *x).collect();
348            for referrer in referrers {
349                self.arena[referrer].referees.retain(|&x| x != index);
350            }
351
352            self.old_era_blocks_frontier_set.remove(&index);
353            self.arena.remove(index);
354            self.hash_to_arena_indices.remove(&hash);
355            // only remove block header in memory cache
356            self.data_man
357                .remove_block_header(&hash, false /* remove_db */);
358
359            num_cleared += 1;
360            if num_cleared == max_num_of_cleared_blocks {
361                break;
362            }
363        }
364
365        if era_genesis_in_frontier {
366            self.old_era_blocks_frontier.push_front(era_genesis);
367        }
368    }
369
370    pub fn insert_invalid(&mut self, header: Arc<BlockHeader>) -> usize {
371        let hash = header.hash();
372        let me = self.arena.insert(SynchronizationGraphNode {
373            graph_status: BLOCK_INVALID,
374            block_ready: false,
375            parent_reclaimed: false,
376            parent: NULL,
377            children: Vec::new(),
378            referees: Vec::new(),
379            pending_referee_count: 0,
380            referrers: Vec::new(),
381            block_header: header,
382            last_update_timestamp: SystemTime::now()
383                .duration_since(UNIX_EPOCH)
384                .unwrap()
385                .as_secs(),
386        });
387        self.hash_to_arena_indices.insert(hash, me);
388
389        if let Some(children) = self.children_by_hash.remove(&hash) {
390            for child in &children {
391                self.arena[*child].parent = me;
392            }
393            self.arena[me].children = children;
394        }
395        if let Some(referrers) = self.referrers_by_hash.remove(&hash) {
396            for referrer in &referrers {
397                let ref mut node_referrer = self.arena[*referrer];
398                node_referrer.referees.push(me);
399                debug_assert!(node_referrer.pending_referee_count > 0);
400                if node_referrer.pending_referee_count > 0 {
401                    node_referrer.pending_referee_count =
402                        node_referrer.pending_referee_count - 1;
403                }
404            }
405            self.arena[me].referrers = referrers;
406        }
407
408        me
409    }
410
411    /// Return the index of the inserted block.
412    pub fn insert(&mut self, header: Arc<BlockHeader>) -> usize {
413        let hash = header.hash();
414        let is_genesis =
415            hash == self.data_man.get_cur_consensus_era_genesis_hash();
416
417        let me = self.arena.insert(SynchronizationGraphNode {
418            graph_status: if is_genesis {
419                BLOCK_GRAPH_READY
420            } else {
421                BLOCK_HEADER_ONLY
422            },
423            block_ready: is_genesis,
424            parent_reclaimed: false,
425            parent: NULL,
426            children: Vec::new(),
427            referees: Vec::new(),
428            pending_referee_count: 0,
429            referrers: Vec::new(),
430            block_header: header.clone(),
431            last_update_timestamp: SystemTime::now()
432                .duration_since(UNIX_EPOCH)
433                .unwrap()
434                .as_secs(),
435        });
436        self.hash_to_arena_indices.insert(hash, me);
437
438        if !is_genesis {
439            let parent_hash = header.parent_hash().clone();
440            if let Some(parent) =
441                self.hash_to_arena_indices.get(&parent_hash).cloned()
442            {
443                self.arena[me].parent = parent;
444                self.arena[parent].children.push(me);
445            } else {
446                self.children_by_hash
447                    .entry(parent_hash)
448                    .or_insert(Vec::new())
449                    .push(me);
450            }
451        }
452        for referee_hash in header.referee_hashes() {
453            if let Some(referee) =
454                self.hash_to_arena_indices.get(referee_hash).cloned()
455            {
456                self.arena[me].referees.push(referee);
457                self.arena[referee].referrers.push(me);
458            } else {
459                self.arena[me].pending_referee_count =
460                    self.arena[me].pending_referee_count + 1;
461                self.referrers_by_hash
462                    .entry(*referee_hash)
463                    .or_insert(Vec::new())
464                    .push(me);
465            }
466        }
467
468        if let Some(children) = self.children_by_hash.remove(&hash) {
469            for child in &children {
470                self.arena[*child].parent = me;
471            }
472            self.arena[me].children = children;
473        }
474        if let Some(referrers) = self.referrers_by_hash.remove(&hash) {
475            for referrer in &referrers {
476                let ref mut node_referrer = self.arena[*referrer];
477                node_referrer.referees.push(me);
478                debug_assert!(node_referrer.pending_referee_count > 0);
479                if node_referrer.pending_referee_count > 0 {
480                    node_referrer.pending_referee_count =
481                        node_referrer.pending_referee_count - 1;
482                }
483            }
484            self.arena[me].referrers = referrers;
485        }
486
487        me
488    }
489
490    // TODO local_block_info is also loaded for invalid check, so maybe we can
491    // refactor code to avoid loading it twice.
492    fn is_graph_ready_in_db(
493        &self, parent_or_referee_hash: &H256, genesis_seq_num: u64,
494    ) -> bool {
495        if let Some(info) = self
496            .data_man
497            .local_block_info_by_hash(parent_or_referee_hash)
498        {
499            if info.get_status() == BlockStatus::Invalid {
500                false
501            } else {
502                info.get_seq_num() < genesis_seq_num
503                    || info.get_instance_id() == self.data_man.get_instance_id()
504            }
505        } else {
506            false
507        }
508    }
509
510    fn new_to_be_graph_ready(
511        &mut self, index: usize, minimal_status: u8,
512    ) -> bool {
513        let ref node_me = self.arena[index];
514        // If a block has become graph-ready before and reclaimed,
515        // it will be marked as `already_processed`
516        // in `insert_block_header`, so we do not need to handle this case here.
517        // And thus we also won't propagate graph-ready to already processed
518        // blocks.
519        if node_me.graph_status >= minimal_status {
520            return false;
521        }
522
523        let genesis_hash = self.data_man.get_cur_consensus_era_genesis_hash();
524        let genesis_seq_num = self
525            .data_man
526            .local_block_info_by_hash(&genesis_hash)
527            .expect("local_block_info for genesis must exist")
528            .get_seq_num();
529        let parent = self.arena[index].parent;
530        let parent_graph_ready = if parent == NULL {
531            self.arena[index].parent_reclaimed
532                || self.is_graph_ready_in_db(
533                    self.arena[index].block_header.parent_hash(),
534                    genesis_seq_num,
535                )
536        } else {
537            self.arena[parent].graph_status >= minimal_status
538        };
539
540        if !parent_graph_ready {
541            return false;
542        } else if parent == NULL {
543            self.arena[index].parent_reclaimed = true;
544        }
545
546        // check whether referees are `BLOCK_HEADER_GRAPH_READY`
547        // 1. referees which are in
548        // memory and status is BLOCK_HEADER_GRAPH_READY.
549        // 2. referees
550        // which are not in memory and not invalid in disk
551        // (assume these blocks are BLOCK_GRAPH_READY)
552        let mut referee_hash_in_mem = HashSet::new();
553        for referee in self.arena[index].referees.iter() {
554            if self.arena[*referee].graph_status < minimal_status {
555                return false;
556            } else {
557                referee_hash_in_mem
558                    .insert(self.arena[*referee].block_header.hash());
559            }
560        }
561
562        for referee_hash in self.arena[index].block_header.referee_hashes() {
563            if !referee_hash_in_mem.contains(referee_hash) {
564                if !self.is_graph_ready_in_db(referee_hash, genesis_seq_num) {
565                    return false;
566                }
567            }
568        }
569
570        if !self.is_pos_reference_graph_ready(
571            index,
572            genesis_seq_num,
573            minimal_status,
574        ) {
575            debug!(
576                "Block {:?} not ready for its pos_reference: {:?}",
577                self.arena[index].block_header.hash(),
578                self.pos_verifier.get_pivot_decision(
579                    self.arena[index]
580                        .block_header
581                        .pos_reference()
582                        .as_ref()
583                        .unwrap()
584                )
585            );
586            // All its future will remain not ready.
587            self.pos_not_ready_blocks_frontier.insert(index);
588            return false;
589        }
590
591        // parent and referees are all header graph ready.
592        true
593    }
594
595    fn new_to_be_header_graph_ready(&mut self, index: usize) -> bool {
596        self.new_to_be_graph_ready(index, BLOCK_HEADER_GRAPH_READY)
597    }
598
599    fn new_to_be_block_graph_ready(&mut self, index: usize) -> bool {
600        self.new_to_be_graph_ready(index, BLOCK_GRAPH_READY)
601            && self.arena[index].block_ready
602    }
603
604    fn is_pos_reference_graph_ready(
605        &self, index: usize, genesis_seq_num: u64, minimal_status: u8,
606    ) -> bool {
607        // Check if the pos reference is committed.
608        match self.arena[index].block_header.pos_reference() {
609            // TODO(lpl): Should we check if the pos reference will never be
610            // committed?
611            Some(pos_reference) => {
612                match self.pos_verifier.get_pivot_decision(pos_reference) {
613                    // The pos reference has not been committed.
614                    None => false,
615                    Some(pivot_decision) => {
616                        // Check if this pivot_decision is graph_ready.
617                        match self.hash_to_arena_indices.get(&pivot_decision) {
618                            None => self.is_graph_ready_in_db(
619                                &pivot_decision,
620                                genesis_seq_num,
621                            ),
622                            Some(index) => {
623                                self.arena[*index].graph_status
624                                    >= minimal_status
625                            }
626                        }
627                    }
628                }
629            }
630            None => true,
631        }
632    }
633
634    // Get parent (height, timestamp, gas_limit, difficulty,
635    // parent_and_referee_pos_references) This function assumes that the
636    // parent and referee information MUST exist in memory or in disk.
637    fn get_parent_and_referee_info(
638        &self, index: usize,
639    ) -> (u64, u64, U256, U256, Vec<Option<PosBlockId>>) {
640        let parent_height;
641        let parent_timestamp;
642        let parent_gas_limit;
643        let parent_difficulty;
644        // Since eventually all blocks should have pos_references, we do not
645        // try to avoid loading them here before PoS is enabled.
646        let mut pos_references = Vec::new();
647        let parent = self.arena[index].parent;
648
649        // Get info for parent.
650        if parent != NULL {
651            parent_height = self.arena[parent].block_header.height();
652            parent_timestamp = self.arena[parent].block_header.timestamp();
653            parent_gas_limit = *self.arena[parent].block_header.gas_limit();
654            parent_difficulty = *self.arena[parent].block_header.difficulty();
655            pos_references
656                .push(self.arena[parent].block_header.pos_reference().clone())
657        } else {
658            let parent_hash = self.arena[index].block_header.parent_hash();
659            let parent_header = self
660                .data_man
661                .block_header_by_hash(parent_hash)
662                .unwrap()
663                .clone();
664            parent_height = parent_header.height();
665            parent_timestamp = parent_header.timestamp();
666            parent_gas_limit = *parent_header.gas_limit();
667            parent_difficulty = *parent_header.difficulty();
668            pos_references.push(parent_header.pos_reference().clone());
669        }
670
671        // Get pos references for referees.
672        let mut referee_hash_in_mem = HashSet::new();
673        for referee in self.arena[index].referees.iter() {
674            pos_references.push(
675                self.arena[*referee].block_header.pos_reference().clone(),
676            );
677            referee_hash_in_mem
678                .insert(self.arena[*referee].block_header.hash());
679        }
680
681        for referee_hash in self.arena[index].block_header.referee_hashes() {
682            if !referee_hash_in_mem.contains(referee_hash) {
683                let referee_header = self
684                    .data_man
685                    .block_header_by_hash(referee_hash)
686                    .unwrap()
687                    .clone();
688                pos_references.push(referee_header.pos_reference().clone());
689            }
690        }
691
692        (
693            parent_height,
694            parent_timestamp,
695            parent_gas_limit,
696            parent_difficulty,
697            pos_references,
698        )
699    }
700
701    fn verify_header_graph_ready_block(
702        &self, index: usize,
703    ) -> Result<(), Error> {
704        let epoch = self.arena[index].block_header.height();
705        let (
706            parent_height,
707            parent_timestamp,
708            parent_gas_limit,
709            parent_difficulty,
710            predecessor_pos_references,
711        ) = self.get_parent_and_referee_info(index);
712
713        // Verify the height and epoch numbers are correct
714        if parent_height + 1 != epoch {
715            warn!("Invalid height. mine {}, parent {}", epoch, parent_height);
716            return Err(From::from(BlockError::InvalidHeight(Mismatch {
717                expected: parent_height + 1,
718                found: epoch,
719            })));
720        }
721
722        // Verify the timestamp being correctly set.
723        // Conflux tries to maintain the timestamp drift among blocks
724        // in the graph, which probably being generated at the same time,
725        // within a small bound (specified by ACCEPTABLE_TIME_DRIFT).
726        // This is achieved through the following mechanism. Anytime
727        // when receiving a new block from the peer, if the timestamp of
728        // the block is more than ACCEPTABLE_TIME_DRIFT later than the
729        // current timestamp of the node, the block is postponed to be
730        // added into the graph until the current timestamp passes the
731        // the timestamp of the block. Otherwise, this block can be added
732        // into the graph.
733        // Meanwhile, Conflux also requires that the timestamp of each
734        // block must be later than or equal to its parent's timestamp.
735        // This is achieved through adjusting the timestamp of a newly
736        // generated block to the one later than its parent's timestamp.
737        // This is also enough for difficulty adjustment computation where
738        // the timespan in the adjustment period is only computed based on
739        // timestamps of pivot chain blocks.
740        let my_timestamp = self.arena[index].block_header.timestamp();
741        if parent_timestamp > my_timestamp {
742            let my_timestamp = UNIX_EPOCH + Duration::from_secs(my_timestamp);
743            let pa_timestamp =
744                UNIX_EPOCH + Duration::from_secs(parent_timestamp);
745
746            warn!("Invalid timestamp: parent {:?} timestamp {}, me {:?} timestamp {}",
747                  self.arena[index].block_header.parent_hash().clone(),
748                  parent_timestamp,
749                  self.arena[index].block_header.hash(),
750                  self.arena[index].block_header.timestamp());
751            return Err(From::from(BlockError::InvalidTimestamp(
752                OutOfBounds {
753                    max: Some(my_timestamp),
754                    min: Some(pa_timestamp),
755                    found: my_timestamp,
756                },
757            )));
758        }
759
760        let parent_gas_limit = parent_gas_limit
761            * if epoch == self.machine.params().transition_heights.cip1559 {
762                ELASTICITY_MULTIPLIER
763            } else {
764                1
765            };
766
767        // Verify the gas limit is respected
768        let self_gas_limit = *self.arena[index].block_header.gas_limit();
769        let gas_limit_divisor = self.machine.params().gas_limit_bound_divisor;
770        let min_gas_limit = self.machine.params().min_gas_limit;
771        let gas_upper =
772            parent_gas_limit + parent_gas_limit / gas_limit_divisor - 1;
773        let gas_lower = max(
774            parent_gas_limit - parent_gas_limit / gas_limit_divisor + 1,
775            min_gas_limit,
776        );
777
778        if self_gas_limit < gas_lower || self_gas_limit > gas_upper {
779            return Err(From::from(BlockError::InvalidGasLimit(OutOfBounds {
780                min: Some(gas_lower),
781                max: Some(gas_upper),
782                found: self_gas_limit,
783            })));
784        }
785
786        if !self.config.is_consortium {
787            // Verify difficulty being correctly set
788            let mut difficulty_invalid = false;
789            let my_diff = *self.arena[index].block_header.difficulty();
790            let mut min_diff = my_diff;
791            let mut max_diff = my_diff;
792            let initial_difficulty: U256 =
793                self.pow_config.initial_difficulty.into();
794
795            if parent_height
796                < self
797                    .pow_config
798                    .difficulty_adjustment_epoch_period(parent_height)
799            {
800                if my_diff != initial_difficulty {
801                    difficulty_invalid = true;
802                    min_diff = initial_difficulty;
803                    max_diff = initial_difficulty;
804                }
805            } else {
806                let last_period_upper = (parent_height
807                    / self
808                        .pow_config
809                        .difficulty_adjustment_epoch_period(parent_height))
810                    * self
811                        .pow_config
812                        .difficulty_adjustment_epoch_period(parent_height);
813                if last_period_upper != parent_height {
814                    // parent_epoch should not trigger difficulty adjustment
815                    if my_diff != parent_difficulty {
816                        difficulty_invalid = true;
817                        min_diff = parent_difficulty;
818                        max_diff = parent_difficulty;
819                    }
820                } else {
821                    let (lower, upper) =
822                        self.pow_config.get_adjustment_bound(parent_difficulty);
823                    min_diff = lower;
824                    max_diff = upper;
825                    if my_diff < min_diff || my_diff > max_diff {
826                        difficulty_invalid = true;
827                    }
828                }
829            }
830
831            if difficulty_invalid {
832                return Err(From::from(BlockError::InvalidDifficulty(
833                    OutOfBounds {
834                        min: Some(min_diff),
835                        max: Some(max_diff),
836                        found: my_diff,
837                    },
838                )));
839            }
840        }
841
842        if let Some(pos_reference) =
843            self.arena[index].block_header.pos_reference()
844        {
845            let mut pred_pos_ref_list = Vec::new();
846            for maybe_pos_ref in predecessor_pos_references {
847                if let Some(pos_ref) = maybe_pos_ref {
848                    pred_pos_ref_list.push(pos_ref);
849                }
850            }
851            if !self
852                .pos_verifier
853                .verify_against_predecessors(pos_reference, &pred_pos_ref_list)
854            {
855                bail!(BlockError::InvalidPosReference);
856            }
857        }
858
859        Ok(())
860    }
861
862    fn verify_graph_ready_block(
863        &self, index: usize, verification_config: &VerificationConfig,
864    ) -> Result<(), Error> {
865        let block_header = &self.arena[index].block_header;
866        let parent = self
867            .data_man
868            .block_header_by_hash(block_header.parent_hash())
869            .expect("headers will not be deleted");
870        let block = self
871            .data_man
872            .block_by_hash(&block_header.hash(), true)
873            .expect("received");
874        verification_config.verify_sync_graph_ready_block(&block, &parent)
875    }
876
877    fn process_invalid_blocks(&mut self, invalid_set: &HashSet<usize>) {
878        for index in invalid_set {
879            let hash = self.arena[*index].block_header.hash();
880            // Mark this block as invalid, so we don't need to request/verify it
881            // again
882            self.data_man.invalidate_block(hash);
883        }
884        self.remove_blocks(&invalid_set);
885    }
886
887    fn remove_blocks(&mut self, to_remove_set: &HashSet<usize>) {
888        for index in to_remove_set {
889            let hash = self.arena[*index].block_header.hash();
890            self.not_ready_blocks_frontier.remove(index);
891            self.pos_not_ready_blocks_frontier.remove(index);
892            self.old_era_blocks_frontier_set.remove(index);
893            // This include invalid blocks and blocks not received after a long
894            // time.
895            self.block_to_fill_set.remove(&hash);
896
897            let parent = self.arena[*index].parent;
898            if parent != NULL {
899                self.arena[parent].children.retain(|&x| x != *index);
900            }
901            let parent_hash = *self.arena[*index].block_header.parent_hash();
902            let mut remove_parent_hash: bool = false;
903            if let Some(children) = self.children_by_hash.get_mut(&parent_hash)
904            {
905                children.retain(|&x| x != *index);
906                remove_parent_hash = children.len() == 0;
907            }
908            // clean empty hash key
909            if remove_parent_hash {
910                self.children_by_hash.remove(&parent_hash);
911            }
912
913            let referees: Vec<usize> =
914                self.arena[*index].referees.iter().map(|x| *x).collect();
915            for referee in referees {
916                self.arena[referee].referrers.retain(|&x| x != *index);
917            }
918            let referee_hashes: Vec<H256> = self.arena[*index]
919                .block_header
920                .referee_hashes()
921                .iter()
922                .map(|x| *x)
923                .collect();
924            for referee_hash in referee_hashes {
925                let mut remove_referee_hash: bool = false;
926                if let Some(referrers) =
927                    self.referrers_by_hash.get_mut(&referee_hash)
928                {
929                    referrers.retain(|&x| x != *index);
930                    remove_referee_hash = referrers.len() == 0;
931                }
932                // clean empty hash key
933                if remove_referee_hash {
934                    self.referrers_by_hash.remove(&referee_hash);
935                }
936            }
937
938            let children: Vec<usize> =
939                self.arena[*index].children.iter().map(|x| *x).collect();
940            for child in children {
941                debug_assert!(to_remove_set.contains(&child));
942                self.arena[child].parent = NULL;
943            }
944
945            let referrers: Vec<usize> =
946                self.arena[*index].referrers.iter().map(|x| *x).collect();
947            for referrer in referrers {
948                debug_assert!(to_remove_set.contains(&referrer));
949                self.arena[referrer].referees.retain(|&x| x != *index);
950            }
951
952            self.arena.remove(*index);
953            self.hash_to_arena_indices.remove(&hash);
954            // remove header/block in memory cache and header/block in db
955            self.data_man
956                .remove_useless_block(&hash, true /* remove_db */);
957        }
958    }
959
960    fn set_and_propagate_invalid(
961        &mut self, queue: &mut VecDeque<usize>,
962        invalid_set: &mut HashSet<usize>, index: usize,
963    ) {
964        let children =
965            mem::replace(&mut self.arena[index].children, Vec::new());
966        for child in &children {
967            if !invalid_set.contains(&child) {
968                self.arena[*child].graph_status = BLOCK_INVALID;
969                queue.push_back(*child);
970                invalid_set.insert(*child);
971            }
972        }
973        self.arena[index].children = children;
974        let referrers =
975            mem::replace(&mut self.arena[index].referrers, Vec::new());
976        for referrer in &referrers {
977            if !invalid_set.contains(&referrer) {
978                self.arena[*referrer].graph_status = BLOCK_INVALID;
979                queue.push_back(*referrer);
980                invalid_set.insert(*referrer);
981            }
982        }
983        self.arena[index].referrers = referrers;
984    }
985}
986
987/// Manages the lifecycle of the consensus worker thread.
988/// On drop, unsubscribes from the channel (closing the sender) so the worker's
989/// `recv_blocking()` returns `None` and the loop exits naturally, then joins.
990struct ConsensusWorkerHandle {
991    thread: Mutex<Option<thread::JoinHandle<()>>>,
992    /// Channel + subscription ID for unsubscribe-based shutdown.
993    new_block_hashes: Arc<Channel<H256>>,
994    worker_subscription_id: u64,
995}
996
997impl ConsensusWorkerHandle {
998    fn stop(&self) {
999        self.new_block_hashes
1000            .unsubscribe(self.worker_subscription_id);
1001        if let Some(handle) = self.thread.lock().take() {
1002            handle.join().expect("Consensus Worker should not panic");
1003        }
1004    }
1005}
1006
1007impl Drop for ConsensusWorkerHandle {
1008    fn drop(&mut self) { self.stop(); }
1009}
1010
1011pub struct SynchronizationGraph {
1012    pub inner: Arc<RwLock<SynchronizationGraphInner>>,
1013    pub consensus: SharedConsensusGraph,
1014    pub data_man: Arc<BlockDataManager>,
1015    pub pow: Arc<PowComputer>,
1016    pub verification_config: VerificationConfig,
1017    pub sync_config: SyncGraphConfig,
1018    pub statistics: SharedStatistics,
1019    /// This is the boolean state shared with the underlying consensus worker
1020    /// to indicate whether the worker is now finished all pending blocks.
1021    /// Since the critical section is very short, a `Mutex` is enough.
1022    consensus_unprocessed_count: Arc<AtomicUsize>,
1023
1024    /// Channel used to send block hashes to `ConsensusGraph` and PubSub.
1025    /// Each element is <block_hash, ignore_body>
1026    new_block_hashes: Arc<Channel<H256>>,
1027
1028    /// The blocks whose timestamps are near future.
1029    /// They will be inserted into sync graph inner at their timestamp.
1030    pub future_blocks: FutureBlockContainer,
1031
1032    machine: Arc<Machine>,
1033
1034    /// Handle to the consensus worker thread; joined on drop.
1035    #[allow(unused)]
1036    consensus_worker_handle: ConsensusWorkerHandle,
1037}
1038
1039impl MallocSizeOf for SynchronizationGraph {
1040    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
1041        let inner_size = self.inner.read().size_of(ops);
1042        let mut malloc_size = inner_size + self.data_man.size_of(ops);
1043
1044        // TODO: Add statistics for consortium.
1045        if !self.is_consortium() {
1046            let consensus_graph = &*self.consensus;
1047            malloc_size += consensus_graph.size_of(ops);
1048        }
1049        // Does not count size_of machine.
1050
1051        malloc_size
1052    }
1053}
1054
1055pub type SharedSynchronizationGraph = Arc<SynchronizationGraph>;
1056
1057impl SynchronizationGraph {
1058    pub fn new(
1059        consensus: SharedConsensusGraph, data_man: Arc<BlockDataManager>,
1060        statistics: SharedStatistics, verification_config: VerificationConfig,
1061        pow_config: ProofOfWorkConfig, pow: Arc<PowComputer>,
1062        sync_config: SyncGraphConfig, notifications: Arc<Notifications>,
1063        machine: Arc<Machine>, pos_verifier: Arc<PosVerifier>,
1064    ) -> Self {
1065        let genesis_hash = data_man.get_cur_consensus_era_genesis_hash();
1066        let genesis_block_header = data_man
1067            .block_header_by_hash(&genesis_hash)
1068            .expect("genesis block header should exist here");
1069
1070        // It should not be initialized to `true` now, otherwise consensus
1071        // worker will be blocked on waiting the first block forever.
1072        let consensus_unprocessed_count = Arc::new(AtomicUsize::new(0));
1073        let mut consensus_receiver = notifications.new_block_hashes.subscribe();
1074        let inner = Arc::new(RwLock::new(
1075            SynchronizationGraphInner::with_genesis_block(
1076                genesis_block_header.clone(),
1077                pow_config,
1078                pow.clone(),
1079                sync_config,
1080                data_man.clone(),
1081                machine.clone(),
1082                pos_verifier.clone(),
1083            ),
1084        ));
1085        let worker_subscription_id = consensus_receiver.id;
1086
1087        // Clone Arcs before moving them into the worker thread closure.
1088        let worker_data_man = data_man.clone();
1089        let worker_consensus = consensus.clone();
1090        let worker_unprocessed_count = consensus_unprocessed_count.clone();
1091
1092        // It receives `BLOCK_GRAPH_READY` blocks in order and handles them in
1093        // `ConsensusGraph`
1094        let handle = thread::Builder::new()
1095            .name("Consensus Worker".into())
1096            // TODO: extract it in a seperated file
1097            .spawn(move || {
1098                // The Consensus Worker will prioritize blocks based on its parent epoch number while respecting the topological order. This has the following two benefits:
1099                //
1100                // 1. It will almost make sure that the self mined block being processed first
1101                //
1102                // 2. In case of a DoS attack that a malicious player releases a large chunk of old blocks. This strategy will make the consensus to process the meaningful blocks first.
1103                let mut priority_queue: BinaryHeap<(u64, H256)> = BinaryHeap::new();
1104                let mut reverse_map : HashMap<H256, Vec<H256>> = HashMap::new();
1105                let mut counter_map = HashMap::new();
1106                let mut pos_started = false;
1107
1108                'outer: loop {
1109                    // Only block when we have processed all received blocks.
1110                    let mut blocking = priority_queue.is_empty();
1111                    'inner: loop {
1112                        // Use blocking `recv_blocking` for the first element, and then
1113                        // drain the receiver with non-blocking `try_recv`.
1114                        let maybe_item = if blocking {
1115                            blocking = false;
1116                            match consensus_receiver.recv_blocking() {
1117                                Some(item) => Ok(item),
1118                                None => break 'outer, // channel closed (unsubscribed)
1119                            }
1120                        } else {
1121                            consensus_receiver.try_recv()
1122                        };
1123
1124                        match maybe_item {
1125                            // FIXME: We need to investigate why duplicate hash may send to the consensus worker
1126                            Ok(hash) => if !reverse_map.contains_key(&hash) {
1127                                debug!("Worker thread receive: block = {}", hash);
1128                                let header = worker_data_man.block_header_by_hash(&hash).expect("Header must exist before sending to the consensus worker!");
1129
1130                                // start pos with an era advance.
1131                                if !pos_started && pos_verifier.is_enabled_at_height(header.height() + worker_consensus.config().inner_conf.era_epoch_count) {
1132                                    if let Err(e) = pos_verifier.initialize(worker_consensus.clone()) {
1133                                        info!("PoS cannot be started at the expected height: e={}", e);
1134                                    } else {
1135                                        pos_started = true;
1136                                    }
1137                                }
1138
1139                                let mut cnt: usize = 0;
1140                                let parent_hash = header.parent_hash();
1141                                if let Some(v) = reverse_map.get_mut(parent_hash) {
1142                                    v.push(hash.clone());
1143                                    cnt += 1;
1144                                }
1145                                for referee in header.referee_hashes() {
1146                                    if let Some(v) = reverse_map.get_mut(referee) {
1147                                        v.push(hash.clone());
1148                                        cnt += 1;
1149                                    }
1150                                }
1151                                if let Some(pivot_decision) = header.pos_reference().as_ref().and_then(|pos_reference| pos_verifier.get_pivot_decision(pos_reference)) {
1152                                    if let Some(v) = reverse_map.get_mut(&pivot_decision) {
1153                                        v.push(hash.clone());
1154                                        cnt += 1;
1155                                    }
1156                                }
1157                                reverse_map.insert(hash.clone(), Vec::new());
1158                                if cnt == 0 {
1159                                    let epoch_number = worker_consensus.get_block_epoch_number(parent_hash).unwrap_or(0);
1160                                    priority_queue.push((epoch_number, hash));
1161                                } else {
1162                                    counter_map.insert(hash, cnt);
1163                                }
1164                            } else {
1165                                warn!("Duplicate block = {} sent to the consensus worker", hash);
1166                            },
1167                            Err(TryRecvError::Empty) => break 'inner,
1168                            Err(TryRecvError::Disconnected) => break 'outer,
1169                        }
1170                    }
1171                    if let Some((_, hash)) = priority_queue.pop() {
1172                        CONSENSUS_WORKER_QUEUE.dequeue(1);
1173                        let successors = reverse_map.remove(&hash).unwrap();
1174                        for succ in successors {
1175                            let cnt_tuple = counter_map.get_mut(&succ).unwrap();
1176                            *cnt_tuple -= 1;
1177                            if *cnt_tuple == 0 {
1178                                counter_map.remove(&succ);
1179                                let header_succ = worker_data_man.block_header_by_hash(&succ).expect("Header must exist before sending to the consensus worker!");
1180                                let parent_succ = header_succ.parent_hash();
1181                                let epoch_number = worker_consensus.get_block_epoch_number(parent_succ).unwrap_or(0);
1182                                priority_queue.push((epoch_number, succ));
1183                            }
1184                        }
1185                        worker_consensus.on_new_block(
1186                            &hash,
1187                        );
1188                        worker_unprocessed_count.fetch_sub(1, Ordering::SeqCst);
1189                    }
1190                }
1191            })
1192            .expect("Cannot fail");
1193
1194        let consensus_worker_handle = ConsensusWorkerHandle {
1195            thread: Mutex::new(Some(handle)),
1196            new_block_hashes: notifications.new_block_hashes.clone(),
1197            worker_subscription_id,
1198        };
1199
1200        let sync_graph = SynchronizationGraph {
1201            inner: inner.clone(),
1202            future_blocks: FutureBlockContainer::new(
1203                sync_config.future_block_buffer_capacity,
1204            ),
1205            data_man: data_man.clone(),
1206            pow: pow.clone(),
1207            verification_config,
1208            sync_config,
1209            consensus: consensus.clone(),
1210            statistics: statistics.clone(),
1211            consensus_unprocessed_count: consensus_unprocessed_count.clone(),
1212            new_block_hashes: notifications.new_block_hashes.clone(),
1213            machine,
1214            consensus_worker_handle,
1215        };
1216        sync_graph
1217    }
1218
1219    pub fn is_consortium(&self) -> bool { self.sync_config.is_consortium }
1220
1221    pub fn machine(&self) -> Arc<Machine> { self.machine.clone() }
1222
1223    pub fn get_genesis_hash_and_height_in_current_era(&self) -> (H256, u64) {
1224        self.inner
1225            .read()
1226            .get_genesis_hash_and_height_in_current_era()
1227    }
1228
1229    /// Compute the expected difficulty for a block given its
1230    /// parent hash
1231    pub fn expected_difficulty(&self, parent_hash: &H256) -> U256 {
1232        self.consensus.expected_difficulty(parent_hash)
1233    }
1234
1235    pub fn get_to_propagate_trans(
1236        &self,
1237    ) -> HashMap<H256, Arc<SignedTransaction>> {
1238        self.consensus.tx_pool().get_to_be_propagated_transactions()
1239    }
1240
1241    pub fn set_to_propagate_trans(
1242        &self, transactions: HashMap<H256, Arc<SignedTransaction>>,
1243    ) {
1244        self.consensus
1245            .tx_pool()
1246            .set_to_be_propagated_transactions(transactions);
1247    }
1248
1249    /// In full/archive node, this function can be invoked during
1250    /// CatchUpRecoverBlockHeaderFromDbPhase phase.
1251    /// It tries to construct the consensus graph based on header
1252    /// information stored in db.
1253    pub fn recover_graph_from_db(&self) {
1254        info!("Start fast recovery of the block DAG from database");
1255
1256        // Recover the initial sequence number in consensus graph
1257        // based on the sequence number of genesis block in db.
1258        let genesis_hash = self.data_man.get_cur_consensus_era_genesis_hash();
1259        let genesis_local_info =
1260            self.data_man.local_block_info_by_hash(&genesis_hash);
1261        if genesis_local_info.is_none() {
1262            // Local info of genesis block must exist.
1263            panic!(
1264                "failed to get local block info from db for genesis[{}]",
1265                genesis_hash
1266            );
1267        }
1268        let genesis_seq_num = genesis_local_info.unwrap().get_seq_num();
1269        self.consensus.set_initial_sequence_number(genesis_seq_num);
1270        let genesis_header =
1271            self.data_man.block_header_by_hash(&genesis_hash).unwrap();
1272        debug!(
1273            "Get current genesis_block hash={:?}, height={}, seq_num={}",
1274            genesis_hash,
1275            genesis_header.height(),
1276            genesis_seq_num
1277        );
1278
1279        // Get terminals stored in db.
1280        let terminals_opt = self.data_man.terminals_from_db();
1281        if terminals_opt.is_none() {
1282            return;
1283        }
1284        let terminals = terminals_opt.unwrap();
1285        debug!("Get terminals {:?}", terminals);
1286
1287        // Reconstruct the consensus graph by traversing backward from
1288        // terminals. This traversal will visit all the blocks under the
1289        // future of current era genesis till the terminals. However,
1290        // some blocks may not be graph-ready since they may have
1291        // references or parents which are out of the current era. We
1292        // need to resolve these out-of-era dependencies later and make
1293        // those blocks be graph-ready again.
1294        let mut queue = VecDeque::new();
1295        let mut visited_blocks: HashSet<H256> = HashSet::new();
1296        for terminal in terminals {
1297            // header terminals and block terminals may contain the same hash
1298            if !visited_blocks.contains(&terminal) {
1299                queue.push_back(terminal);
1300                visited_blocks.insert(terminal);
1301            }
1302        }
1303
1304        // Remember the hashes of blocks that belong to the current genesis
1305        // era but are missed in db. The missed blocks will be fetched from
1306        // peers.
1307        let mut missed_hashes = HashSet::new();
1308        while let Some(hash) = queue.pop_front() {
1309            if hash == genesis_hash {
1310                // Genesis block is already in consensus graph.
1311                continue;
1312            }
1313
1314            // Ignore blocks beyond the future of current genesis era.
1315            // If block_local_info is missing, consider it is in current
1316            // genesis era.
1317            if let Some(block_local_info) =
1318                self.data_man.local_block_info_by_hash(&hash)
1319            {
1320                if block_local_info.get_seq_num() < genesis_seq_num {
1321                    debug!(
1322                        "Skip block {:?} before checkpoint: seq_num={}",
1323                        hash,
1324                        block_local_info.get_seq_num()
1325                    );
1326                    continue;
1327                }
1328            }
1329
1330            if let Some(block_header) =
1331                self.data_man.block_header_by_hash(&hash)
1332            {
1333                self.insert_block_header(
1334                    &mut block_header.as_ref().clone(),
1335                    true,  /* need_to_verify */
1336                    false, /* bench_mode */
1337                    true,  /* insert_to_consensus */
1338                    false, /* persistent */
1339                );
1340                let parent = block_header.parent_hash().clone();
1341                let referees = block_header.referee_hashes().clone();
1342                if !visited_blocks.contains(&parent) {
1343                    queue.push_back(parent);
1344                    visited_blocks.insert(parent);
1345                }
1346                for referee in referees {
1347                    if !visited_blocks.contains(&referee) {
1348                        queue.push_back(referee);
1349                        visited_blocks.insert(referee);
1350                    }
1351                }
1352            } else {
1353                missed_hashes.insert(hash);
1354            }
1355        }
1356
1357        debug!(
1358            "Current frontier after recover from db: {:?}",
1359            self.inner.read().not_ready_blocks_frontier.get_frontier()
1360        );
1361
1362        info!("Finish reconstructing the pivot chain of length {}, start to sync from peers", self.consensus.best_epoch_number());
1363    }
1364
1365    /// Return None if `hash` is not in sync graph
1366    pub fn block_header_by_hash(&self, hash: &H256) -> Option<BlockHeader> {
1367        if !self.contains_block_header(hash) {
1368            // Only return headers in sync graph
1369            return None;
1370        }
1371        self.data_man
1372            .block_header_by_hash(hash)
1373            .map(|header_ref| header_ref.as_ref().clone())
1374    }
1375
1376    /// Return None if `hash` is not in sync graph
1377    pub fn block_height_by_hash(&self, hash: &H256) -> Option<u64> {
1378        self.block_header_by_hash(hash)
1379            .map(|header| header.height())
1380    }
1381
1382    /// Return None if `hash` is not in sync graph
1383    pub fn block_timestamp_by_hash(&self, hash: &H256) -> Option<u64> {
1384        self.block_header_by_hash(hash)
1385            .map(|header| header.timestamp())
1386    }
1387
1388    /// TODO Be more specific about which functions only return in-memory data
1389    /// and which can return the in-database data
1390    pub fn block_by_hash(&self, hash: &H256) -> Option<Arc<Block>> {
1391        self.data_man.block_by_hash(hash, true /* update_cache */)
1392    }
1393
1394    pub fn contains_block_header(&self, hash: &H256) -> bool {
1395        self.inner.read().hash_to_arena_indices.contains_key(hash)
1396            || self.future_blocks.contains(hash)
1397    }
1398
1399    fn parent_or_referees_invalid(&self, header: &BlockHeader) -> bool {
1400        self.data_man.verified_invalid(header.parent_hash()).0
1401            || header
1402                .referee_hashes()
1403                .iter()
1404                .any(|referee| self.data_man.verified_invalid(referee).0)
1405    }
1406
1407    /// subroutine called by `insert_block_header` and `remove_expire_blocks`
1408    fn propagate_header_graph_status(
1409        &self, inner: &mut SynchronizationGraphInner,
1410        frontier_index_list: Vec<usize>, need_to_verify: bool,
1411        header_index_to_insert: usize, insert_to_consensus: bool,
1412        persistent: bool,
1413    ) -> (HashSet<usize>, Vec<H256>) {
1414        let now = SystemTime::now()
1415            .duration_since(UNIX_EPOCH)
1416            .unwrap()
1417            .as_secs();
1418        let mut need_to_relay: Vec<H256> = Vec::new();
1419        let mut invalid_set: HashSet<usize> = HashSet::new();
1420        let mut queue = VecDeque::new();
1421
1422        for index in frontier_index_list {
1423            if inner.arena[index].graph_status == BLOCK_INVALID {
1424                invalid_set.insert(index);
1425            }
1426            queue.push_back(index);
1427        }
1428
1429        while let Some(index) = queue.pop_front() {
1430            if inner.arena[index].graph_status == BLOCK_INVALID {
1431                inner.set_and_propagate_invalid(
1432                    &mut queue,
1433                    &mut invalid_set,
1434                    index,
1435                );
1436            } else if inner.new_to_be_header_graph_ready(index) {
1437                inner.arena[index].graph_status = BLOCK_HEADER_GRAPH_READY;
1438                inner.arena[index].last_update_timestamp = now;
1439                debug!("BlockIndex {} parent_index {} hash {:?} is header graph ready", index,
1440                           inner.arena[index].parent, inner.arena[index].block_header.hash());
1441
1442                let r = inner.verify_header_graph_ready_block(index);
1443
1444                if need_to_verify && r.is_err() {
1445                    warn!(
1446                        "Invalid header_arc! inserted_header={:?} err={:?}",
1447                        inner.arena[index].block_header.clone(),
1448                        r
1449                    );
1450                    invalid_set.insert(index);
1451                    inner.arena[index].graph_status = BLOCK_INVALID;
1452                    inner.set_and_propagate_invalid(
1453                        &mut queue,
1454                        &mut invalid_set,
1455                        index,
1456                    );
1457                    continue;
1458                }
1459
1460                // Maintain `old_era_blocks_frontier` for future garbage
1461                // collection after making a checkpoint.
1462                if inner.arena[index].parent_reclaimed {
1463                    inner.old_era_blocks_frontier.push_back(index);
1464                    inner.old_era_blocks_frontier_set.insert(index);
1465                }
1466
1467                // Note that when called by `insert_block_header` we have to
1468                // insert header here immediately instead of
1469                // after the loop because its children may
1470                // become ready and being processed in the loop later. It
1471                // requires this block already being inserted
1472                // into the BlockDataManager!
1473                if index == header_index_to_insert && persistent {
1474                    self.data_man.insert_block_header(
1475                        inner.arena[index].block_header.hash(),
1476                        inner.arena[index].block_header.clone(),
1477                        true,
1478                    );
1479                }
1480                if insert_to_consensus {
1481                    CONSENSUS_WORKER_QUEUE.enqueue(1);
1482
1483                    self.consensus_unprocessed_count
1484                        .fetch_add(1, Ordering::SeqCst);
1485                    assert!(
1486                        self.new_block_hashes
1487                            .send(inner.arena[index].block_header.hash(),),
1488                        "consensus receiver dropped"
1489                    );
1490
1491                    // maintain not_ready_blocks_frontier
1492                    inner.not_ready_blocks_frontier.remove(&index);
1493                    // The children will be automatically added in
1494                    // `new_to_be_header_graph_ready` if they should be added.
1495                    inner.pos_not_ready_blocks_frontier.remove(&index);
1496                    for child in &inner.arena[index].children {
1497                        inner.not_ready_blocks_frontier.insert(*child);
1498                    }
1499                }
1500
1501                // Passed verification on header_arc.
1502                if inner.arena[index].block_ready {
1503                    need_to_relay.push(inner.arena[index].block_header.hash());
1504                }
1505
1506                for child in &inner.arena[index].children {
1507                    if inner.arena[*child].graph_status
1508                        < BLOCK_HEADER_GRAPH_READY
1509                    {
1510                        queue.push_back(*child);
1511                    }
1512                }
1513                for referrer in &inner.arena[index].referrers {
1514                    if inner.arena[*referrer].graph_status
1515                        < BLOCK_HEADER_GRAPH_READY
1516                    {
1517                        queue.push_back(*referrer);
1518                    }
1519                }
1520            } else {
1521                debug!(
1522                    "BlockIndex {} parent_index {} hash {:?} is not ready",
1523                    index,
1524                    inner.arena[index].parent,
1525                    inner.arena[index].block_header.hash()
1526                );
1527                if index == header_index_to_insert && persistent {
1528                    self.data_man.insert_block_header(
1529                        inner.arena[index].block_header.hash(),
1530                        inner.arena[index].block_header.clone(),
1531                        true,
1532                    );
1533                }
1534            }
1535        }
1536        (invalid_set, need_to_relay)
1537    }
1538
    /// Insert a block header into the synchronization graph and propagate
    /// the resulting header-graph status to the blocks depending on it.
    ///
    /// * `header` - the header to insert. Its PoW quality may be filled in
    ///   place on the "already processed" paths when `need_to_verify` is set.
    /// * `need_to_verify` - when `true` the header is checked with
    ///   `parent_or_referees_invalid` and `verify_header_params` (both
    ///   skipped for consortium); when `false` only `verify_pow` is run, and
    ///   only if neither `bench_mode` nor consortium applies.
    /// * `bench_mode` - skips the `verify_pow` check on the
    ///   `need_to_verify == false` path.
    /// * `insert_to_consensus`, `persistent` - forwarded to
    ///   `propagate_header_graph_status`; `insert_to_consensus` also takes
    ///   part in deciding whether the new node joins
    ///   `not_ready_blocks_frontier`.
    ///
    /// Returns the insertion result together with the `need_to_relay` hashes
    /// produced by `propagate_header_graph_status` (empty on every early
    /// return).
    ///
    /// Panics if `need_to_verify` is `false`, `verify_pow` is run and it
    /// fails.
    pub fn insert_block_header(
        &self, header: &mut BlockHeader, need_to_verify: bool,
        bench_mode: bool, insert_to_consensus: bool, persistent: bool,
    ) -> (BlockHeaderInsertionResult, Vec<H256>) {
        let _timer = MeterTimer::time_func(SYNC_INSERT_HEADER.as_ref());
        // Counted before any early return, so skipped and duplicate headers
        // are included in this statistic.
        self.statistics.inc_sync_graph_inserted_header_count();
        // The write lock is held until the function returns.
        let inner = &mut *self.inner.write();
        if inner.locked_for_catchup {
            // Ignore received headers when we are downloading block bodies.
            return (BlockHeaderInsertionResult::TemporarySkipped, Vec::new());
        }
        let hash = header.hash();

        // `verified_invalid` reports both whether the hash is already known
        // to be invalid and, if available, its local block info.
        let (invalid, local_info_opt) = self.data_man.verified_invalid(&hash);
        if invalid {
            return (BlockHeaderInsertionResult::Invalid, Vec::new());
        }

        if let Some(info) = local_info_opt {
            // If the block is ordered before current era genesis or it has
            // already entered consensus graph in this run, we do not need to
            // process it. And in these two cases, the block is considered
            // valid.
            let already_processed = info.get_seq_num()
                < self.consensus.current_era_genesis_seq_num()
                || info.get_instance_id() == self.data_man.get_instance_id();
            if already_processed {
                if need_to_verify && !self.is_consortium() {
                    // Compute pow_quality, because the input header may be used
                    // as a part of block later
                    VerificationConfig::get_or_fill_header_pow_quality(
                        &self.pow, header,
                    );
                }
                return (
                    BlockHeaderInsertionResult::AlreadyProcessedInConsensus,
                    Vec::new(),
                );
            }
        }

        // The header already has a node in the sync graph: nothing to insert.
        if inner.hash_to_arena_indices.contains_key(&hash) {
            if need_to_verify {
                // Compute pow_quality, because the input header may be used as
                // a part of block later
                VerificationConfig::get_or_fill_header_pow_quality(
                    &self.pow, header,
                );
            }
            return (
                BlockHeaderInsertionResult::AlreadyProcessedInSync,
                Vec::new(),
            );
        }

        // skip check for consortium currently
        debug!("is_consortium={:?}", self.is_consortium());
        // With `need_to_verify`, the header passes if we are in consortium
        // mode, or if neither its parent/referees are invalid nor
        // `verify_header_params` reports an error (which is logged).
        let verification_passed = if need_to_verify {
            self.is_consortium()
                || !(self.parent_or_referees_invalid(header)
                    || self
                        .verification_config
                        .verify_header_params(&self.pow, header)
                        .or_else(|e| {
                            warn!(
                                "Invalid header: err={} header={:?}",
                                e, header
                            );
                            Err(e)
                        })
                        .is_err())
        } else {
            if !bench_mode && !self.is_consortium() {
                self.verification_config
                    .verify_pow(&self.pow, header)
                    .expect("local mined block should pass this check!");
            }
            true
        };

        // A header that failed verification is still given an arena node,
        // via `insert_invalid`.
        let header_arc = Arc::new(header.clone());
        let me = if verification_passed {
            inner.insert(header_arc.clone())
        } else {
            inner.insert_invalid(header_arc.clone())
        };

        // Currently, `inner.arena[me].graph_status` will only be
        //   1. `BLOCK_GRAPH_READY` for genesis block.
        //   2. `BLOCK_HEADER_ONLY` for non genesis block.
        //   3. `BLOCK_INVALID` for invalid block.
        if inner.arena[me].graph_status != BLOCK_GRAPH_READY {
            // This block will become a new `not_ready_blocks_frontier` if
            //   1. Its parent block has not been inserted yet.
            //   2. We are in `Catch Up Blocks Phase` and the graph status of
            // parent block is `BLOCK_GRAPH_READY`.
            //   3. We are in `Catch Up Headers Phase` and the graph status of
            // parent block is `BLOCK_HEADER_GRAPH_READY`.
            //   4. The block is not graph ready because of not-ready
            // pos_reference, and parent is not in the frontier.
            if inner.arena[me].parent == NULL
                || inner.arena[inner.arena[me].parent].graph_status
                    == BLOCK_GRAPH_READY
                || (insert_to_consensus
                    && inner.arena[inner.arena[me].parent].graph_status
                        == BLOCK_HEADER_GRAPH_READY)
            {
                inner.not_ready_blocks_frontier.insert(me);
            }
            // Children of `me` that were in the frontier are taken out of it.
            // They are collected first because the frontier cannot be
            // mutated while `children` is being iterated.
            let mut to_be_removed = Vec::new();
            for child in &inner.arena[me].children {
                if inner.not_ready_blocks_frontier.contains(child) {
                    to_be_removed.push(*child);
                }
            }
            for x in to_be_removed {
                inner.not_ready_blocks_frontier.remove(&x);
            }
        }

        debug!("insert_block_header() Block = {:?}, index = {}, need_to_verify = {}, bench_mode = {} insert_to_consensus = {}",
               header.hash(), me, need_to_verify, bench_mode, insert_to_consensus);

        // Start to pass influence to descendants
        let (invalid_set, need_to_relay) = self.propagate_header_graph_status(
            inner,
            vec![me],
            need_to_verify,
            me,
            insert_to_consensus,
            persistent,
        );

        // Record the verdict for `me` before the invalid set is
        // post-processed.
        let me_invalid = invalid_set.contains(&me);

        // Post-processing invalid blocks.
        inner.process_invalid_blocks(&invalid_set);

        if me_invalid {
            return (BlockHeaderInsertionResult::Invalid, need_to_relay);
        }

        inner.try_clear_old_era_blocks();

        (BlockHeaderInsertionResult::NewValid, need_to_relay)
    }
1685
1686    pub fn contains_block(&self, hash: &H256) -> bool {
1687        let inner = self.inner.read();
1688        if let Some(index) = inner.hash_to_arena_indices.get(hash) {
1689            inner.arena[*index].block_ready
1690        } else {
1691            false
1692        }
1693    }
1694
1695    fn set_graph_ready(
1696        &self, inner: &mut SynchronizationGraphInner, index: usize,
1697    ) {
1698        inner.arena[index].graph_status = BLOCK_GRAPH_READY;
1699
1700        // maintain not_ready_blocks_frontier
1701        inner.not_ready_blocks_frontier.remove(&index);
1702        // The children will be automatically added in
1703        // `new_to_be_block_graph_ready` if they should be added.
1704        inner.pos_not_ready_blocks_frontier.remove(&index);
1705        for child in &inner.arena[index].children {
1706            inner.not_ready_blocks_frontier.insert(*child);
1707        }
1708
1709        let h = inner.arena[index].block_header.hash();
1710        debug!("Block {:?} is graph ready", h);
1711        CONSENSUS_WORKER_QUEUE.enqueue(1);
1712
1713        self.consensus_unprocessed_count
1714            .fetch_add(1, Ordering::SeqCst);
1715        assert!(self.new_block_hashes.send(h), "consensus receiver dropped");
1716
1717        if inner.config.enable_state_expose {
1718            STATE_EXPOSER.sync_graph.lock().ready_block_vec.push(
1719                SyncGraphBlockState {
1720                    block_hash: h,
1721                    parent: inner.arena[index]
1722                        .block_header
1723                        .parent_hash()
1724                        .clone(),
1725                    referees: inner.arena[index]
1726                        .block_header
1727                        .referee_hashes()
1728                        .clone(),
1729                    nonce: inner.arena[index].block_header.nonce(),
1730                    timestamp: inner.arena[index].block_header.timestamp(),
1731                    adaptive: inner.arena[index].block_header.adaptive(),
1732                },
1733            );
1734        }
1735    }
1736
1737    /// subroutine called by `insert_block` and `remove_expire_blocks`
1738    fn propagate_graph_status(
1739        &self, inner: &mut SynchronizationGraphInner,
1740        frontier_index_list: Vec<usize>,
1741    ) -> HashSet<usize> {
1742        let mut queue = VecDeque::new();
1743        let mut invalid_set = HashSet::new();
1744        for index in frontier_index_list {
1745            if inner.arena[index].graph_status == BLOCK_INVALID {
1746                invalid_set.insert(index);
1747            }
1748            queue.push_back(index);
1749        }
1750
1751        while let Some(index) = queue.pop_front() {
1752            if inner.arena[index].graph_status == BLOCK_INVALID {
1753                inner.set_and_propagate_invalid(
1754                    &mut queue,
1755                    &mut invalid_set,
1756                    index,
1757                );
1758            } else if inner.new_to_be_block_graph_ready(index) {
1759                let verify_result = inner
1760                    .verify_graph_ready_block(index, &self.verification_config);
1761                if verify_result.is_err() {
1762                    warn!(
1763                        "Invalid block! inserted_header={:?} err={:?}",
1764                        inner.arena[index].block_header.clone(),
1765                        verify_result
1766                    );
1767                    invalid_set.insert(index);
1768                    inner.arena[index].graph_status = BLOCK_INVALID;
1769                    inner.set_and_propagate_invalid(
1770                        &mut queue,
1771                        &mut invalid_set,
1772                        index,
1773                    );
1774                    continue;
1775                }
1776                self.set_graph_ready(inner, index);
1777                for child in &inner.arena[index].children {
1778                    debug_assert!(
1779                        inner.arena[*child].graph_status < BLOCK_GRAPH_READY
1780                    );
1781                    queue.push_back(*child);
1782                }
1783                for referrer in &inner.arena[index].referrers {
1784                    debug_assert!(
1785                        inner.arena[*referrer].graph_status < BLOCK_GRAPH_READY
1786                    );
1787                    queue.push_back(*referrer);
1788                }
1789            } else {
1790                trace!("Block index {:?} not block_graph_ready, current frontier: {:?}", index, inner.not_ready_blocks_frontier.get_frontier());
1791            }
1792        }
1793
1794        invalid_set
1795    }
1796
    /// Insert a full block whose header is expected to be in the sync graph
    /// already, and propagate the block-graph status to dependent blocks.
    ///
    /// * `need_to_verify` - run `verify_sync_graph_block_basic` on the block.
    /// * `persistent` - forwarded to `data_man.insert_block_body`.
    /// * `recover_from_db` - when `true`, the compact block and block body
    ///   are not inserted into `data_man`.
    ///
    /// Returns:
    /// * `Ignored` if no node exists for the block hash,
    /// * `AlreadyProcessed` if the body was already present, or if the body
    ///   is valid and was received while `locked_for_catchup` is set,
    /// * `RequestAgain` if verification fails with
    ///   `InvalidTransactionsRoot`,
    /// * `Invalid` if the block ends up in the invalid set,
    /// * `ShouldRelay` if its graph status is at least
    ///   `BLOCK_HEADER_GRAPH_READY` after propagation,
    /// * `SuccessWithoutRelay` otherwise.
    pub fn insert_block(
        &self, block: Block, need_to_verify: bool, persistent: bool,
        recover_from_db: bool,
    ) -> BlockInsertionResult {
        let _timer = MeterTimer::time_func(SYNC_INSERT_BLOCK.as_ref());
        let hash = block.hash();

        debug!("insert_block {:?}", hash);

        // The write lock is held until the function returns.
        let inner = &mut *self.inner.write();

        let contains_block =
            if let Some(index) = inner.hash_to_arena_indices.get(&hash) {
                inner.arena[*index].block_ready
            } else {
                // Sync graph is cleaned after inserting the header, so we can
                // ignore the block body
                return BlockInsertionResult::Ignored;
            };

        if contains_block {
            return BlockInsertionResult::AlreadyProcessed;
        }

        // We only insert the body after a valid header is inserted, so this has
        // been checked when we insert the header.
        debug_assert!(!self.data_man.verified_invalid(&hash).0);

        self.statistics.inc_sync_graph_inserted_block_count();

        // Cannot fail: the lookup above already found an entry for `hash`.
        let me = *inner.hash_to_arena_indices.get(&hash).unwrap();

        debug_assert!(hash == inner.arena[me].block_header.hash());
        debug_assert!(!inner.arena[me].block_ready);
        // Set before verification; reset below if the body must be
        // requested again.
        inner.arena[me].block_ready = true;

        if need_to_verify {
            let r = self.verification_config.verify_sync_graph_block_basic(
                &block,
                self.consensus.best_chain_id(),
            );
            match r {
                Err(Error::Block(BlockError::InvalidTransactionsRoot(e))) => {
                    warn!("BlockTransactionRoot not match! inserted_block={:?} err={:?}", block, e);
                    // If the transaction root does not match, it might be
                    // caused by receiving wrong
                    // transactions because of conflicting ShortId in
                    // CompactBlock, or caused by
                    // adversaries. In either case, we should request the block
                    // again, and the received block body is
                    // discarded.
                    inner.arena[me].block_ready = false;
                    return BlockInsertionResult::RequestAgain;
                }
                Err(e) => {
                    warn!(
                        "Invalid block! inserted_block={:?} err={:?}",
                        block.block_header, e
                    );
                    // Only mark the node here; the invalidity is spread by
                    // `propagate_graph_status` below.
                    inner.arena[me].graph_status = BLOCK_INVALID;
                }
                _ => {}
            };
        }

        let block = Arc::new(block);
        if inner.arena[me].graph_status != BLOCK_INVALID {
            // If we are rebuilding the graph from db, we do not insert all
            // blocks into memory
            if !recover_from_db {
                // Here we always build a new compact block because we should
                // not reuse the nonce
                self.data_man.insert_compact_block(block.to_compact());
                // block header was inserted in before, only insert block body
                // here
                self.data_man.insert_block_body(
                    block.hash(),
                    block.clone(),
                    persistent,
                );
            }
        }

        // If we are locked for catch-up, make sure no new block will enter sync
        // graph.
        if inner.locked_for_catchup {
            if inner.arena[me].graph_status == BLOCK_INVALID {
                let invalid_set = self.propagate_graph_status(inner, vec![me]);
                // Invalid blocks will also be removed from
                // `block_to_fill_set`
                // in `process_invalid_blocks`.
                inner.process_invalid_blocks(&invalid_set);
                return BlockInsertionResult::Invalid;
            } else {
                debug!("Downloaded block body for {:?}", hash);
                inner.block_to_fill_set.remove(&hash);
                return BlockInsertionResult::AlreadyProcessed;
            }
        }

        let invalid_set = self.propagate_graph_status(inner, vec![me]);

        // Post-processing invalid blocks.
        inner.process_invalid_blocks(&invalid_set);

        debug!(
            "new block inserted into graph: block_header={:?}, tx_count={}, block_size={}",
            block.block_header,
            block.transactions.len(),
            block.size(),
        );

        // Note: If `me` is invalid, it has been removed from `arena` now,
        // so we cannot access its `graph_status`.
        if invalid_set.contains(&me) {
            BlockInsertionResult::Invalid
        } else if inner.arena[me].graph_status >= BLOCK_HEADER_GRAPH_READY {
            BlockInsertionResult::ShouldRelay
        } else {
            BlockInsertionResult::SuccessWithoutRelay
        }
    }
1919
1920    pub fn get_all_block_hashes_by_epoch(
1921        &self, epoch_number: u64,
1922    ) -> Result<Vec<H256>, ProviderBlockError> {
1923        let mut res = self.consensus.get_skipped_block_hashes_by_epoch(
1924            EpochNumber::Number(epoch_number.into()),
1925        )?;
1926        res.append(&mut self.consensus.get_block_hashes_by_epoch(
1927            EpochNumber::Number(epoch_number.into()),
1928        )?);
1929        Ok(res)
1930    }
1931
1932    pub fn log_statistics(&self) { self.statistics.log_statistics(); }
1933
1934    pub fn update_total_weight_delta_heartbeat(&self) {
1935        self.consensus.update_total_weight_delta_heartbeat();
1936    }
1937
1938    /// Get the current number of blocks in the synchronization graph
1939    /// This only returns cached block count, and this is enough since this is
1940    /// only used in test.
1941    pub fn block_count(&self) -> usize { self.data_man.cached_block_count() }
1942
1943    /// Remove all blocks which have not been updated for a long time. We
1944    /// maintain a set `not_ready_blocks_frontier` which is the root nodes in
1945    /// the parental tree formed by not graph ready blocks. Find all expire
1946    /// blocks which can be reached by `not_ready_blocks_frontier`.
1947    pub fn remove_expire_blocks(&self, expire_time: u64) {
1948        let inner = &mut *self.inner.write();
1949        let now = SystemTime::now()
1950            .duration_since(UNIX_EPOCH)
1951            .unwrap()
1952            .as_secs();
1953        let frontier = inner.not_ready_blocks_frontier.get_frontier().clone();
1954        let all_not_ready: HashSet<_> = inner.get_future(frontier);
1955        let mut expire_set = HashSet::new();
1956        for index in all_not_ready {
1957            if inner.arena[index].last_update_timestamp + expire_time < now {
1958                expire_set.insert(index);
1959            }
1960        }
1961
1962        // find blocks reached by previous found expired blocks
1963        let all_expire: HashSet<_> = inner.get_future(expire_set);
1964        debug!("all_expire: {:?}", all_expire);
1965        inner.remove_blocks(&all_expire);
1966    }
1967
1968    /// Remove all blocks in `to_remove_set` and their future set from the
1969    /// graph.
1970    pub fn remove_blocks_and_future(&self, to_remove_set: &HashSet<H256>) {
1971        let mut inner = self.inner.write();
1972        let mut index_set = Vec::new();
1973        for block_hash in to_remove_set {
1974            if let Some(index) = inner.hash_to_arena_indices.get(block_hash) {
1975                index_set.push(*index);
1976            }
1977        }
1978        let index_set_and_future: HashSet<_> = inner.get_future(index_set);
1979        inner.remove_blocks(&index_set_and_future);
1980    }
1981
1982    pub fn is_consensus_worker_busy(&self) -> bool {
1983        self.consensus_unprocessed_count.load(Ordering::SeqCst) != 0
1984    }
1985
1986    pub fn is_fill_block_completed(&self) -> bool {
1987        self.inner.read().block_to_fill_set.is_empty()
1988    }
1989
    /// Construct the states along the pivot chain, set all
    /// `BLOCK_HEADER_GRAPH_READY` blocks as `BLOCK_GRAPH_READY` and remove all
    /// other blocks. All blocks in the future can be processed normally in
    /// sync graph and consensus graph.
    ///
    /// If some blocks become invalid after validating their bodies, we need to
    /// remove them and reconstruct the consensus graph. Return `false` if
    /// there are blocks in the new consensus graph whose bodies are missing.
    /// Return `true` if we do not need to reconstruct consensus, or all blocks
    /// in the new consensus graph already have bodies.
    ///
    /// On the `false` path `block_to_fill_set` is replaced with the blocks
    /// still needing bodies and `locked_for_catchup` stays set.
    pub fn complete_filling_block_bodies(&self) -> bool {
        {
            // This write guard is dropped at the end of the scope, before
            // `construct_pivot_state` is called.
            let inner = &mut *self.inner.write();

            // Iterating over `hash_to_arena_indices` might be more efficient
            // than iterating over `arena`.
            let to_remove = {
                let arena = &mut inner.arena;
                inner
                    .hash_to_arena_indices
                    .iter()
                    .filter_map(|(_, index)| {
                        let graph_node = &mut arena[*index];
                        // Promote header-graph-ready nodes in place while
                        // scanning.
                        if graph_node.graph_status == BLOCK_HEADER_GRAPH_READY {
                            graph_node.block_ready = true;
                            graph_node.graph_status = BLOCK_GRAPH_READY;
                        }
                        // Everything that is still not graph-ready is
                        // scheduled for removal.
                        if graph_node.graph_status != BLOCK_GRAPH_READY {
                            Some(*index)
                        } else {
                            None
                        }
                    })
                    .collect()
            };
            inner.remove_blocks(&to_remove);

            // Check if we skip some block bodies. It's either because they are
            // never retrieved after a long time, or they have invalid
            // bodies.
            let skipped_body_blocks =
                self.consensus.get_blocks_needing_bodies();
            if !skipped_body_blocks.is_empty() {
                warn!("Has invalid blocks after downloading block bodies!");
                // Some headers should not enter consensus, so we just
                // reconstruct the consensus graph with the
                // current sync graph.
                self.consensus.reset();

                let all_block_indices: HashSet<_> = inner
                    .hash_to_arena_indices
                    .iter()
                    .map(|(_, i)| *i)
                    .collect();
                // Send blocks in topological order.
                let sorted_blocks = inner.topological_sort(all_block_indices);
                for i in sorted_blocks {
                    self.consensus
                        .on_new_block(&inner.arena[i].block_header.hash());
                }
                let new_to_fill_blocks: HashSet<_> =
                    self.consensus.get_blocks_needing_bodies();
                if !new_to_fill_blocks.is_empty() {
                    // This should not happen if stable checkpoint is not
                    // reverted because we have downloaded
                    // all blocks in its subtree.
                    warn!(
                        "{} new block bodies to get",
                        new_to_fill_blocks.len()
                    );
                    inner.block_to_fill_set = new_to_fill_blocks;
                    return false;
                }
            }
        }
        self.consensus.construct_pivot_state();
        // The lock is re-acquired only to clear the catch-up flag.
        self.inner.write().locked_for_catchup = false;
        true
    }
2069
2070    /// TODO(lpl): Only triggered when pos commits new blocks?
2071    /// Check if not_ready_frontier blocks become ready now.
2072    /// Blocks that are not ready because of missing pos references only become
2073    /// ready here.
2074    pub fn check_not_ready_frontier(&self, header_only: bool) {
2075        debug!("check_not_ready_frontier starts");
2076        let mut inner = self.inner.write();
2077        if inner.locked_for_catchup {
2078            // Do not change sync graph or consensus graph during
2079            // `CatchUpFillBlockBodyPhase`.
2080            return;
2081        }
2082        if header_only {
2083            for b in inner.pos_not_ready_blocks_frontier.clone() {
2084                debug!(
2085                    "check_not_ready_frontier: check {:?}",
2086                    inner.arena[b].block_header.hash()
2087                );
2088                if inner.new_to_be_header_graph_ready(b) {
2089                    self.propagate_header_graph_status(
2090                        &mut *inner,
2091                        vec![b],
2092                        true, /* need_to_verify */
2093                        b,
2094                        true, /* insert_to_consensus */
2095                        true, /* persistent */
2096                    );
2097                }
2098            }
2099        } else {
2100            for b in inner.pos_not_ready_blocks_frontier.clone() {
2101                debug!(
2102                    "check_not_ready_frontier: check {:?}",
2103                    inner.arena[b].block_header.hash()
2104                );
2105                if inner.new_to_be_header_graph_ready(b) {
2106                    self.propagate_header_graph_status(
2107                        &mut *inner,
2108                        vec![b],
2109                        true, /* need_to_verify */
2110                        b,
2111                        false, /* insert_to_consensus */
2112                        true,  /* persistent */
2113                    );
2114                }
2115                // This will not introduce new invalid blocks, so we do not need
2116                // to process the return value.
2117                if inner.new_to_be_block_graph_ready(b) {
2118                    debug!("new graph ready found");
2119                    self.propagate_graph_status(&mut *inner, vec![b]);
2120                }
2121            }
2122        }
2123    }
2124}
2125
// Nodes of the sync graph live in `arena` and are addressed by plain
// `usize` indices, so that is the node index type exposed to the graph
// traits.
impl Graph for SynchronizationGraphInner {
    type NodeIndex = usize;
}
2129
2130impl TreeGraph for SynchronizationGraphInner {
2131    fn parent(&self, node_index: Self::NodeIndex) -> Option<Self::NodeIndex> {
2132        if self.arena[node_index].parent != NULL {
2133            Some(self.arena[node_index].parent)
2134        } else {
2135            None
2136        }
2137    }
2138
2139    fn referees(&self, node_index: Self::NodeIndex) -> Vec<Self::NodeIndex> {
2140        self.arena[node_index].referees.clone()
2141    }
2142}
2143
2144impl RichTreeGraph for SynchronizationGraphInner {
2145    fn children(&self, node_index: Self::NodeIndex) -> Vec<Self::NodeIndex> {
2146        self.arena[node_index].children.clone()
2147    }
2148
2149    fn referrers(&self, node_index: Self::NodeIndex) -> Vec<Self::NodeIndex> {
2150        self.arena[node_index].referrers.clone()
2151    }
2152}
2153
/// Outcome of inserting a block body into the synchronization graph.
pub enum BlockInsertionResult {
    /// The block is valid and already processed before.
    AlreadyProcessed,
    /// The block is valid and is new to be block-graph-ready.
    ShouldRelay,
    /// The block is valid but not block-graph-ready.
    SuccessWithoutRelay,
    /// The block is definitely invalid. It's not inserted to sync graph
    /// and should not be requested again.
    Invalid,
    /// The case where transaction root does not match.
    /// We should request again to get
    /// the correct transactions for full verification.
    RequestAgain,
    /// This is only for the case the header is removed, possibly because
    /// we switch phases.
    /// We ignore the block without verification.
    Ignored,
}
2173
2174impl BlockInsertionResult {
2175    pub fn is_valid(&self) -> bool {
2176        matches!(
2177            self,
2178            BlockInsertionResult::AlreadyProcessed
2179                | BlockInsertionResult::ShouldRelay
2180                | BlockInsertionResult::SuccessWithoutRelay
2181        )
2182    }
2183
2184    pub fn is_invalid(&self) -> bool {
2185        matches!(self, BlockInsertionResult::Invalid)
2186    }
2187
2188    pub fn should_relay(&self) -> bool {
2189        matches!(self, BlockInsertionResult::ShouldRelay)
2190    }
2191
2192    pub fn request_again(&self) -> bool {
2193        matches!(self, BlockInsertionResult::RequestAgain)
2194    }
2195}
2196
/// Outcome of inserting a block header into the synchronization graph.
pub enum BlockHeaderInsertionResult {
    /// The block is valid and was already processed in consensus before.
    /// We should not process this block again.
    AlreadyProcessedInConsensus,
    /// The block header has been inserted into sync graph. We can ignore this
    /// header, but we should keep processing its body if needed.
    AlreadyProcessedInSync,
    /// The block is valid and is processed for the first time.
    NewValid,
    /// The block is definitely invalid. It's not inserted to sync graph
    /// and should not be requested again.
    Invalid,
    /// The header is received when we have locked sync graph.
    TemporarySkipped,
}
2212
2213impl BlockHeaderInsertionResult {
2214    pub fn is_new_valid(&self) -> bool {
2215        matches!(self, BlockHeaderInsertionResult::NewValid)
2216    }
2217
2218    pub fn is_invalid(&self) -> bool {
2219        matches!(self, BlockHeaderInsertionResult::Invalid)
2220    }
2221
2222    pub fn should_process_body(&self) -> bool {
2223        matches!(
2224            self,
2225            BlockHeaderInsertionResult::NewValid
2226                | BlockHeaderInsertionResult::AlreadyProcessedInSync
2227        )
2228    }
2229}