cfxcore/sync/
synchronization_graph.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use std::{
6    cmp::max,
7    collections::{BinaryHeap, HashMap, HashSet, VecDeque},
8    mem, panic,
9    sync::{
10        atomic::{AtomicUsize, Ordering},
11        Arc,
12    },
13    thread,
14    time::{Duration, SystemTime, UNIX_EPOCH},
15};
16
17use cfx_parameters::consensus_internal::ELASTICITY_MULTIPLIER;
18use futures::executor::block_on;
19use parking_lot::RwLock;
20use slab::Slab;
21use tokio::sync::mpsc::error::TryRecvError;
22use unexpected::{Mismatch, OutOfBounds};
23
24use cfx_executor::machine::Machine;
25use cfx_types::{H256, U256};
26use cfxcore_errors::ProviderBlockError;
27use dag::{Graph, RichDAG, RichTreeGraph, TreeGraph, DAG};
28use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
29use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
30use metrics::{
31    register_meter_with_group, register_queue, Meter, MeterTimer, Queue,
32};
33use primitives::{
34    pos::PosBlockId, transaction::SignedTransaction, Block, BlockHeader,
35    EpochNumber,
36};
37
38use crate::{
39    block_data_manager::{BlockDataManager, BlockStatus},
40    channel::Channel,
41    consensus::{pos_handler::PosVerifier, SharedConsensusGraph},
42    core_error::{BlockError, CoreError as Error},
43    pow::{PowComputer, ProofOfWorkConfig},
44    state_exposer::{SyncGraphBlockState, STATE_EXPOSER},
45    statistics::SharedStatistics,
46    sync::synchronization_protocol_handler::FutureBlockContainer,
47    verification::*,
48    Notifications,
49};
50
51lazy_static! {
52    static ref SYNC_INSERT_HEADER: Arc<dyn Meter> =
53        register_meter_with_group("timer", "sync::insert_block_header");
54    static ref SYNC_INSERT_BLOCK: Arc<dyn Meter> =
55        register_meter_with_group("timer", "sync::insert_block");
56    static ref CONSENSUS_WORKER_QUEUE: Arc<dyn Queue> =
57        register_queue("consensus_worker_queue");
58}
59
60const NULL: usize = !0;
61const BLOCK_INVALID: u8 = 0;
62const BLOCK_HEADER_ONLY: u8 = 1;
63const BLOCK_HEADER_GRAPH_READY: u8 = 2;
64const BLOCK_GRAPH_READY: u8 = 3;
65
66#[derive(Copy, Clone)]
67pub struct SyncGraphConfig {
68    pub future_block_buffer_capacity: usize,
69    pub enable_state_expose: bool,
70    pub is_consortium: bool,
71}
72
73#[derive(Debug)]
74pub struct SyncGraphStatistics {
75    pub inserted_block_count: usize,
76    pub inserted_header_count: usize,
77}
78
79impl SyncGraphStatistics {
80    pub fn new() -> SyncGraphStatistics {
81        SyncGraphStatistics {
82            // Already counted genesis block
83            inserted_header_count: 1,
84            inserted_block_count: 1,
85        }
86    }
87
88    pub fn clear(&mut self) {
89        self.inserted_header_count = 1;
90        self.inserted_block_count = 1;
91    }
92}
93
94#[derive(DeriveMallocSizeOf)]
95pub struct SynchronizationGraphNode {
96    pub block_header: Arc<BlockHeader>,
97    /// The status of graph connectivity in the current block view.
98    pub graph_status: u8,
99    /// Whether the block body is ready.
100    pub block_ready: bool,
101    /// Whether parent is in old era and already reclaimed
102    pub parent_reclaimed: bool,
103    /// The index of the parent of the block.
104    pub parent: usize,
105    /// The indices of the children of the block.
106    pub children: Vec<usize>,
107    /// The indices of the blocks referenced by the block.
108    pub referees: Vec<usize>,
109    /// The number of blocks referenced by the block but
110    /// haven't been inserted in synchronization graph.
111    pub pending_referee_count: usize,
112    /// The indices of the blocks referencing the block.
113    pub referrers: Vec<usize>,
114    /// the timestamp in seconds when graph_status updated
115    pub last_update_timestamp: u64,
116}
117
118#[derive(DeriveMallocSizeOf)]
119pub struct UnreadyBlockFrontier {
120    frontier: HashSet<usize>,
121    updated: bool,
122}
123
124impl UnreadyBlockFrontier {
125    fn new() -> Self {
126        UnreadyBlockFrontier {
127            frontier: HashSet::new(),
128            updated: false,
129        }
130    }
131
132    pub fn reset_update_state(&mut self) { self.updated = false; }
133
134    pub fn updated(&self) -> bool { self.updated }
135
136    pub fn get_frontier(&self) -> &HashSet<usize> { &self.frontier }
137
138    pub fn remove(&mut self, index: &usize) -> bool {
139        self.updated = true;
140        self.frontier.remove(index)
141    }
142
143    pub fn contains(&self, index: &usize) -> bool {
144        self.frontier.contains(index)
145    }
146
147    pub fn insert(&mut self, index: usize) -> bool {
148        self.updated = true;
149        self.frontier.insert(index)
150    }
151
152    pub fn len(&self) -> usize { self.frontier.len() }
153}
154
155pub struct SynchronizationGraphInner {
156    pub arena: Slab<SynchronizationGraphNode>,
157    pub hash_to_arena_indices: HashMap<H256, usize>,
158    pub data_man: Arc<BlockDataManager>,
159    children_by_hash: HashMap<H256, Vec<usize>>,
160    referrers_by_hash: HashMap<H256, Vec<usize>>,
161    pub pow_config: ProofOfWorkConfig,
162    pub pow: Arc<PowComputer>,
163    pub config: SyncGraphConfig,
164    /// The indices of blocks whose graph_status is not GRAPH_READY.
165    /// It may consider not header-graph-ready in phases
166    /// `CatchUpRecoverBlockHeaderFromDB` and `CatchUpSyncBlockHeader`.
167    /// Or, it may consider not block-graph-ready in phases
168    /// `CatchUpRecoverBlockFromDB`, `CatchUpSyncBlock`, and `Normal`.
169    pub not_ready_blocks_frontier: UnreadyBlockFrontier,
170
171    /// This includes the blocks whose parent and referees are all ready, and
172    /// only pos_reference has not been ready (pos_reference not committed
173    /// or its pivot decision is not ready).
174    pub pos_not_ready_blocks_frontier: HashSet<usize>,
175    pub old_era_blocks_frontier: VecDeque<usize>,
176    pub old_era_blocks_frontier_set: HashSet<usize>,
177
178    /// Set to `true` in `CatchUpCheckpointPhase` and
179    /// `CatchUpFillBlockBodyPhase` so that sync graph and consensus graph
180    /// remain unchanged for consistency.
181    pub locked_for_catchup: bool,
182    /// The set of blocks that we need to download block bodies in
183    /// `CatchUpFillBlockBodyPhase`.
184    pub block_to_fill_set: HashSet<H256>,
185    machine: Arc<Machine>,
186    pub pos_verifier: Arc<PosVerifier>,
187}
188
189impl MallocSizeOf for SynchronizationGraphInner {
190    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
191        self.arena.size_of(ops)
192            + self.hash_to_arena_indices.size_of(ops)
193            + self.data_man.size_of(ops)
194            + self.children_by_hash.size_of(ops)
195            + self.referrers_by_hash.size_of(ops)
196            + self.pow_config.size_of(ops)
197            + self.not_ready_blocks_frontier.size_of(ops)
198            + self.old_era_blocks_frontier.size_of(ops)
199            + self.old_era_blocks_frontier_set.size_of(ops)
200        // Does not count size_of machine.
201    }
202}
203
204impl SynchronizationGraphInner {
205    pub fn with_genesis_block(
206        genesis_header: Arc<BlockHeader>, pow_config: ProofOfWorkConfig,
207        pow: Arc<PowComputer>, config: SyncGraphConfig,
208        data_man: Arc<BlockDataManager>, machine: Arc<Machine>,
209        pos_verifier: Arc<PosVerifier>,
210    ) -> Self {
211        let mut inner = SynchronizationGraphInner {
212            arena: Slab::new(),
213            hash_to_arena_indices: HashMap::new(),
214            data_man,
215            children_by_hash: HashMap::new(),
216            referrers_by_hash: HashMap::new(),
217            pow_config,
218            pow,
219            config,
220            not_ready_blocks_frontier: UnreadyBlockFrontier::new(),
221            pos_not_ready_blocks_frontier: Default::default(),
222            old_era_blocks_frontier: Default::default(),
223            old_era_blocks_frontier_set: Default::default(),
224            block_to_fill_set: Default::default(),
225            locked_for_catchup: false,
226            machine,
227            pos_verifier,
228        };
229        let genesis_hash = genesis_header.hash();
230        let genesis_block_index = inner.insert(genesis_header);
231        debug!(
232            "genesis block {:?} has index {} in sync graph",
233            genesis_hash, genesis_block_index
234        );
235
236        inner.old_era_blocks_frontier.push_back(genesis_block_index);
237        inner
238            .old_era_blocks_frontier_set
239            .insert(genesis_block_index);
240
241        inner
242    }
243
244    fn get_genesis_in_current_era(&self) -> usize {
245        let genesis_hash = self.data_man.get_cur_consensus_era_genesis_hash();
246        *self.hash_to_arena_indices.get(&genesis_hash).unwrap()
247    }
248
249    pub fn get_genesis_hash_and_height_in_current_era(&self) -> (H256, u64) {
250        let era_genesis = self.get_genesis_in_current_era();
251        (
252            self.arena[era_genesis].block_header.hash(),
253            self.arena[era_genesis].block_header.height(),
254        )
255    }
256
257    pub fn get_stable_hash_and_height_in_current_era(&self) -> (H256, u64) {
258        let stable_hash = self.data_man.get_cur_consensus_era_stable_hash();
259        // The stable block may not be in the sync-graph when this function is
260        // invoked during the synchronization phase, let's query the
261        // data from data manager
262        let height = self
263            .data_man
264            .block_header_by_hash(&stable_hash)
265            .expect("stable block must exist in data manager")
266            .height();
267        (stable_hash, height)
268    }
269
270    fn try_clear_old_era_blocks(&mut self) {
271        let max_num_of_cleared_blocks = 2;
272        let mut num_cleared = 0;
273        let era_genesis = self.get_genesis_in_current_era();
274        let genesis_seq_num = self
275            .data_man
276            .local_block_info_by_hash(
277                &self.data_man.get_cur_consensus_era_genesis_hash(),
278            )
279            .expect("local_block_info for genesis must exist")
280            .get_seq_num();
281        let mut era_genesis_in_frontier = false;
282
283        while let Some(index) = self.old_era_blocks_frontier.pop_front() {
284            if index == era_genesis {
285                era_genesis_in_frontier = true;
286                continue;
287            }
288
289            // Remove node with index
290            if !self.old_era_blocks_frontier_set.contains(&index) {
291                continue;
292            }
293
294            let hash = self.arena[index].block_header.hash();
295            assert!(self.arena[index].parent == NULL);
296
297            if !self.is_graph_ready_in_db(&hash, genesis_seq_num) {
298                // This block has not been processed in consensus. Clearing it
299                // now may make its referrers not block-graph-ready.
300                // See https://github.com/Conflux-Chain/conflux-rust/issues/1426.
301                //
302                // The blocks pushed to `old_era_blocks_frontier` are all
303                // BlockGraphReady, so it's ensured that they will be inserted
304                // into consensus and their local block infos will be persisted.
305                continue;
306            }
307
308            let referees: Vec<usize> =
309                self.arena[index].referees.iter().map(|x| *x).collect();
310            for referee in referees {
311                self.arena[referee].referrers.retain(|&x| x != index);
312            }
313            let referee_hashes: Vec<H256> = self.arena[index]
314                .block_header
315                .referee_hashes()
316                .iter()
317                .map(|x| *x)
318                .collect();
319            for referee_hash in referee_hashes {
320                let mut remove_referee_hash: bool = false;
321                if let Some(referrers) =
322                    self.referrers_by_hash.get_mut(&referee_hash)
323                {
324                    referrers.retain(|&x| x != index);
325                    remove_referee_hash = referrers.len() == 0;
326                }
327                // clean empty key
328                if remove_referee_hash {
329                    self.referrers_by_hash.remove(&referee_hash);
330                }
331            }
332
333            let children: Vec<usize> =
334                self.arena[index].children.iter().map(|x| *x).collect();
335            for child in children {
336                self.arena[child].parent = NULL;
337                self.arena[child].parent_reclaimed = true;
338                // We will check `is_graph_ready_in_db` before garbage
339                // collecting the blocks in `old_era_blocks_frontier`,
340                // so we do not need to check graph-ready-related status here
341                // before inserting them.
342                self.old_era_blocks_frontier.push_back(child);
343                assert!(!self.old_era_blocks_frontier_set.contains(&child));
344                self.old_era_blocks_frontier_set.insert(child);
345            }
346
347            let referrers: Vec<usize> =
348                self.arena[index].referrers.iter().map(|x| *x).collect();
349            for referrer in referrers {
350                self.arena[referrer].referees.retain(|&x| x != index);
351            }
352
353            self.old_era_blocks_frontier_set.remove(&index);
354            self.arena.remove(index);
355            self.hash_to_arena_indices.remove(&hash);
356            // only remove block header in memory cache
357            self.data_man
358                .remove_block_header(&hash, false /* remove_db */);
359
360            num_cleared += 1;
361            if num_cleared == max_num_of_cleared_blocks {
362                break;
363            }
364        }
365
366        if era_genesis_in_frontier {
367            self.old_era_blocks_frontier.push_front(era_genesis);
368        }
369    }
370
371    pub fn insert_invalid(&mut self, header: Arc<BlockHeader>) -> usize {
372        let hash = header.hash();
373        let me = self.arena.insert(SynchronizationGraphNode {
374            graph_status: BLOCK_INVALID,
375            block_ready: false,
376            parent_reclaimed: false,
377            parent: NULL,
378            children: Vec::new(),
379            referees: Vec::new(),
380            pending_referee_count: 0,
381            referrers: Vec::new(),
382            block_header: header,
383            last_update_timestamp: SystemTime::now()
384                .duration_since(UNIX_EPOCH)
385                .unwrap()
386                .as_secs(),
387        });
388        self.hash_to_arena_indices.insert(hash, me);
389
390        if let Some(children) = self.children_by_hash.remove(&hash) {
391            for child in &children {
392                self.arena[*child].parent = me;
393            }
394            self.arena[me].children = children;
395        }
396        if let Some(referrers) = self.referrers_by_hash.remove(&hash) {
397            for referrer in &referrers {
398                let ref mut node_referrer = self.arena[*referrer];
399                node_referrer.referees.push(me);
400                debug_assert!(node_referrer.pending_referee_count > 0);
401                if node_referrer.pending_referee_count > 0 {
402                    node_referrer.pending_referee_count =
403                        node_referrer.pending_referee_count - 1;
404                }
405            }
406            self.arena[me].referrers = referrers;
407        }
408
409        me
410    }
411
412    /// Return the index of the inserted block.
413    pub fn insert(&mut self, header: Arc<BlockHeader>) -> usize {
414        let hash = header.hash();
415        let is_genesis =
416            hash == self.data_man.get_cur_consensus_era_genesis_hash();
417
418        let me = self.arena.insert(SynchronizationGraphNode {
419            graph_status: if is_genesis {
420                BLOCK_GRAPH_READY
421            } else {
422                BLOCK_HEADER_ONLY
423            },
424            block_ready: is_genesis,
425            parent_reclaimed: false,
426            parent: NULL,
427            children: Vec::new(),
428            referees: Vec::new(),
429            pending_referee_count: 0,
430            referrers: Vec::new(),
431            block_header: header.clone(),
432            last_update_timestamp: SystemTime::now()
433                .duration_since(UNIX_EPOCH)
434                .unwrap()
435                .as_secs(),
436        });
437        self.hash_to_arena_indices.insert(hash, me);
438
439        if !is_genesis {
440            let parent_hash = header.parent_hash().clone();
441            if let Some(parent) =
442                self.hash_to_arena_indices.get(&parent_hash).cloned()
443            {
444                self.arena[me].parent = parent;
445                self.arena[parent].children.push(me);
446            } else {
447                self.children_by_hash
448                    .entry(parent_hash)
449                    .or_insert(Vec::new())
450                    .push(me);
451            }
452        }
453        for referee_hash in header.referee_hashes() {
454            if let Some(referee) =
455                self.hash_to_arena_indices.get(referee_hash).cloned()
456            {
457                self.arena[me].referees.push(referee);
458                self.arena[referee].referrers.push(me);
459            } else {
460                self.arena[me].pending_referee_count =
461                    self.arena[me].pending_referee_count + 1;
462                self.referrers_by_hash
463                    .entry(*referee_hash)
464                    .or_insert(Vec::new())
465                    .push(me);
466            }
467        }
468
469        if let Some(children) = self.children_by_hash.remove(&hash) {
470            for child in &children {
471                self.arena[*child].parent = me;
472            }
473            self.arena[me].children = children;
474        }
475        if let Some(referrers) = self.referrers_by_hash.remove(&hash) {
476            for referrer in &referrers {
477                let ref mut node_referrer = self.arena[*referrer];
478                node_referrer.referees.push(me);
479                debug_assert!(node_referrer.pending_referee_count > 0);
480                if node_referrer.pending_referee_count > 0 {
481                    node_referrer.pending_referee_count =
482                        node_referrer.pending_referee_count - 1;
483                }
484            }
485            self.arena[me].referrers = referrers;
486        }
487
488        me
489    }
490
491    // TODO local_block_info is also loaded for invalid check, so maybe we can
492    // refactor code to avoid loading it twice.
493    fn is_graph_ready_in_db(
494        &self, parent_or_referee_hash: &H256, genesis_seq_num: u64,
495    ) -> bool {
496        if let Some(info) = self
497            .data_man
498            .local_block_info_by_hash(parent_or_referee_hash)
499        {
500            if info.get_status() == BlockStatus::Invalid {
501                false
502            } else {
503                info.get_seq_num() < genesis_seq_num
504                    || info.get_instance_id() == self.data_man.get_instance_id()
505            }
506        } else {
507            false
508        }
509    }
510
511    fn new_to_be_graph_ready(
512        &mut self, index: usize, minimal_status: u8,
513    ) -> bool {
514        let ref node_me = self.arena[index];
515        // If a block has become graph-ready before and reclaimed,
516        // it will be marked as `already_processed`
517        // in `insert_block_header`, so we do not need to handle this case here.
518        // And thus we also won't propagate graph-ready to already processed
519        // blocks.
520        if node_me.graph_status >= minimal_status {
521            return false;
522        }
523
524        let genesis_hash = self.data_man.get_cur_consensus_era_genesis_hash();
525        let genesis_seq_num = self
526            .data_man
527            .local_block_info_by_hash(&genesis_hash)
528            .expect("local_block_info for genesis must exist")
529            .get_seq_num();
530        let parent = self.arena[index].parent;
531        let parent_graph_ready = if parent == NULL {
532            self.arena[index].parent_reclaimed
533                || self.is_graph_ready_in_db(
534                    self.arena[index].block_header.parent_hash(),
535                    genesis_seq_num,
536                )
537        } else {
538            self.arena[parent].graph_status >= minimal_status
539        };
540
541        if !parent_graph_ready {
542            return false;
543        } else if parent == NULL {
544            self.arena[index].parent_reclaimed = true;
545        }
546
547        // check whether referees are `BLOCK_HEADER_GRAPH_READY`
548        // 1. referees which are in
549        // memory and status is BLOCK_HEADER_GRAPH_READY.
550        // 2. referees
551        // which are not in memory and not invalid in disk
552        // (assume these blocks are BLOCK_GRAPH_READY)
553        let mut referee_hash_in_mem = HashSet::new();
554        for referee in self.arena[index].referees.iter() {
555            if self.arena[*referee].graph_status < minimal_status {
556                return false;
557            } else {
558                referee_hash_in_mem
559                    .insert(self.arena[*referee].block_header.hash());
560            }
561        }
562
563        for referee_hash in self.arena[index].block_header.referee_hashes() {
564            if !referee_hash_in_mem.contains(referee_hash) {
565                if !self.is_graph_ready_in_db(referee_hash, genesis_seq_num) {
566                    return false;
567                }
568            }
569        }
570
571        if !self.is_pos_reference_graph_ready(
572            index,
573            genesis_seq_num,
574            minimal_status,
575        ) {
576            debug!(
577                "Block {:?} not ready for its pos_reference: {:?}",
578                self.arena[index].block_header.hash(),
579                self.pos_verifier.get_pivot_decision(
580                    self.arena[index]
581                        .block_header
582                        .pos_reference()
583                        .as_ref()
584                        .unwrap()
585                )
586            );
587            // All its future will remain not ready.
588            self.pos_not_ready_blocks_frontier.insert(index);
589            return false;
590        }
591
592        // parent and referees are all header graph ready.
593        true
594    }
595
596    fn new_to_be_header_graph_ready(&mut self, index: usize) -> bool {
597        self.new_to_be_graph_ready(index, BLOCK_HEADER_GRAPH_READY)
598    }
599
600    fn new_to_be_block_graph_ready(&mut self, index: usize) -> bool {
601        self.new_to_be_graph_ready(index, BLOCK_GRAPH_READY)
602            && self.arena[index].block_ready
603    }
604
605    fn is_pos_reference_graph_ready(
606        &self, index: usize, genesis_seq_num: u64, minimal_status: u8,
607    ) -> bool {
608        // Check if the pos reference is committed.
609        match self.arena[index].block_header.pos_reference() {
610            // TODO(lpl): Should we check if the pos reference will never be
611            // committed?
612            Some(pos_reference) => {
613                match self.pos_verifier.get_pivot_decision(pos_reference) {
614                    // The pos reference has not been committed.
615                    None => false,
616                    Some(pivot_decision) => {
617                        // Check if this pivot_decision is graph_ready.
618                        match self.hash_to_arena_indices.get(&pivot_decision) {
619                            None => self.is_graph_ready_in_db(
620                                &pivot_decision,
621                                genesis_seq_num,
622                            ),
623                            Some(index) => {
624                                self.arena[*index].graph_status
625                                    >= minimal_status
626                            }
627                        }
628                    }
629                }
630            }
631            None => true,
632        }
633    }
634
635    // Get parent (height, timestamp, gas_limit, difficulty,
636    // parent_and_referee_pos_references) This function assumes that the
637    // parent and referee information MUST exist in memory or in disk.
638    fn get_parent_and_referee_info(
639        &self, index: usize,
640    ) -> (u64, u64, U256, U256, Vec<Option<PosBlockId>>) {
641        let parent_height;
642        let parent_timestamp;
643        let parent_gas_limit;
644        let parent_difficulty;
645        // Since eventually all blocks should have pos_references, we do not
646        // try to avoid loading them here before PoS is enabled.
647        let mut pos_references = Vec::new();
648        let parent = self.arena[index].parent;
649
650        // Get info for parent.
651        if parent != NULL {
652            parent_height = self.arena[parent].block_header.height();
653            parent_timestamp = self.arena[parent].block_header.timestamp();
654            parent_gas_limit = *self.arena[parent].block_header.gas_limit();
655            parent_difficulty = *self.arena[parent].block_header.difficulty();
656            pos_references
657                .push(self.arena[parent].block_header.pos_reference().clone())
658        } else {
659            let parent_hash = self.arena[index].block_header.parent_hash();
660            let parent_header = self
661                .data_man
662                .block_header_by_hash(parent_hash)
663                .unwrap()
664                .clone();
665            parent_height = parent_header.height();
666            parent_timestamp = parent_header.timestamp();
667            parent_gas_limit = *parent_header.gas_limit();
668            parent_difficulty = *parent_header.difficulty();
669            pos_references.push(parent_header.pos_reference().clone());
670        }
671
672        // Get pos references for referees.
673        let mut referee_hash_in_mem = HashSet::new();
674        for referee in self.arena[index].referees.iter() {
675            pos_references.push(
676                self.arena[*referee].block_header.pos_reference().clone(),
677            );
678            referee_hash_in_mem
679                .insert(self.arena[*referee].block_header.hash());
680        }
681
682        for referee_hash in self.arena[index].block_header.referee_hashes() {
683            if !referee_hash_in_mem.contains(referee_hash) {
684                let referee_header = self
685                    .data_man
686                    .block_header_by_hash(referee_hash)
687                    .unwrap()
688                    .clone();
689                pos_references.push(referee_header.pos_reference().clone());
690            }
691        }
692
693        (
694            parent_height,
695            parent_timestamp,
696            parent_gas_limit,
697            parent_difficulty,
698            pos_references,
699        )
700    }
701
702    fn verify_header_graph_ready_block(
703        &self, index: usize,
704    ) -> Result<(), Error> {
705        let epoch = self.arena[index].block_header.height();
706        let (
707            parent_height,
708            parent_timestamp,
709            parent_gas_limit,
710            parent_difficulty,
711            predecessor_pos_references,
712        ) = self.get_parent_and_referee_info(index);
713
714        // Verify the height and epoch numbers are correct
715        if parent_height + 1 != epoch {
716            warn!("Invalid height. mine {}, parent {}", epoch, parent_height);
717            return Err(From::from(BlockError::InvalidHeight(Mismatch {
718                expected: parent_height + 1,
719                found: epoch,
720            })));
721        }
722
723        // Verify the timestamp being correctly set.
724        // Conflux tries to maintain the timestamp drift among blocks
725        // in the graph, which probably being generated at the same time,
726        // within a small bound (specified by ACCEPTABLE_TIME_DRIFT).
727        // This is achieved through the following mechanism. Anytime
728        // when receiving a new block from the peer, if the timestamp of
729        // the block is more than ACCEPTABLE_TIME_DRIFT later than the
730        // current timestamp of the node, the block is postponed to be
731        // added into the graph until the current timestamp passes the
732        // the timestamp of the block. Otherwise, this block can be added
733        // into the graph.
734        // Meanwhile, Conflux also requires that the timestamp of each
735        // block must be later than or equal to its parent's timestamp.
736        // This is achieved through adjusting the timestamp of a newly
737        // generated block to the one later than its parent's timestamp.
738        // This is also enough for difficulty adjustment computation where
739        // the timespan in the adjustment period is only computed based on
740        // timestamps of pivot chain blocks.
741        let my_timestamp = self.arena[index].block_header.timestamp();
742        if parent_timestamp > my_timestamp {
743            let my_timestamp = UNIX_EPOCH + Duration::from_secs(my_timestamp);
744            let pa_timestamp =
745                UNIX_EPOCH + Duration::from_secs(parent_timestamp);
746
747            warn!("Invalid timestamp: parent {:?} timestamp {}, me {:?} timestamp {}",
748                  self.arena[index].block_header.parent_hash().clone(),
749                  parent_timestamp,
750                  self.arena[index].block_header.hash(),
751                  self.arena[index].block_header.timestamp());
752            return Err(From::from(BlockError::InvalidTimestamp(
753                OutOfBounds {
754                    max: Some(my_timestamp),
755                    min: Some(pa_timestamp),
756                    found: my_timestamp,
757                },
758            )));
759        }
760
761        let parent_gas_limit = parent_gas_limit
762            * if epoch == self.machine.params().transition_heights.cip1559 {
763                ELASTICITY_MULTIPLIER
764            } else {
765                1
766            };
767
768        // Verify the gas limit is respected
769        let self_gas_limit = *self.arena[index].block_header.gas_limit();
770        let gas_limit_divisor = self.machine.params().gas_limit_bound_divisor;
771        let min_gas_limit = self.machine.params().min_gas_limit;
772        let gas_upper =
773            parent_gas_limit + parent_gas_limit / gas_limit_divisor - 1;
774        let gas_lower = max(
775            parent_gas_limit - parent_gas_limit / gas_limit_divisor + 1,
776            min_gas_limit,
777        );
778
779        if self_gas_limit < gas_lower || self_gas_limit > gas_upper {
780            return Err(From::from(BlockError::InvalidGasLimit(OutOfBounds {
781                min: Some(gas_lower),
782                max: Some(gas_upper),
783                found: self_gas_limit,
784            })));
785        }
786
787        if !self.config.is_consortium {
788            // Verify difficulty being correctly set
789            let mut difficulty_invalid = false;
790            let my_diff = *self.arena[index].block_header.difficulty();
791            let mut min_diff = my_diff;
792            let mut max_diff = my_diff;
793            let initial_difficulty: U256 =
794                self.pow_config.initial_difficulty.into();
795
796            if parent_height
797                < self
798                    .pow_config
799                    .difficulty_adjustment_epoch_period(parent_height)
800            {
801                if my_diff != initial_difficulty {
802                    difficulty_invalid = true;
803                    min_diff = initial_difficulty;
804                    max_diff = initial_difficulty;
805                }
806            } else {
807                let last_period_upper = (parent_height
808                    / self
809                        .pow_config
810                        .difficulty_adjustment_epoch_period(parent_height))
811                    * self
812                        .pow_config
813                        .difficulty_adjustment_epoch_period(parent_height);
814                if last_period_upper != parent_height {
815                    // parent_epoch should not trigger difficulty adjustment
816                    if my_diff != parent_difficulty {
817                        difficulty_invalid = true;
818                        min_diff = parent_difficulty;
819                        max_diff = parent_difficulty;
820                    }
821                } else {
822                    let (lower, upper) =
823                        self.pow_config.get_adjustment_bound(parent_difficulty);
824                    min_diff = lower;
825                    max_diff = upper;
826                    if my_diff < min_diff || my_diff > max_diff {
827                        difficulty_invalid = true;
828                    }
829                }
830            }
831
832            if difficulty_invalid {
833                return Err(From::from(BlockError::InvalidDifficulty(
834                    OutOfBounds {
835                        min: Some(min_diff),
836                        max: Some(max_diff),
837                        found: my_diff,
838                    },
839                )));
840            }
841        }
842
843        if let Some(pos_reference) =
844            self.arena[index].block_header.pos_reference()
845        {
846            let mut pred_pos_ref_list = Vec::new();
847            for maybe_pos_ref in predecessor_pos_references {
848                if let Some(pos_ref) = maybe_pos_ref {
849                    pred_pos_ref_list.push(pos_ref);
850                }
851            }
852            if !self
853                .pos_verifier
854                .verify_against_predecessors(pos_reference, &pred_pos_ref_list)
855            {
856                bail!(BlockError::InvalidPosReference);
857            }
858        }
859
860        Ok(())
861    }
862
863    fn verify_graph_ready_block(
864        &self, index: usize, verification_config: &VerificationConfig,
865    ) -> Result<(), Error> {
866        let block_header = &self.arena[index].block_header;
867        let parent = self
868            .data_man
869            .block_header_by_hash(block_header.parent_hash())
870            .expect("headers will not be deleted");
871        let block = self
872            .data_man
873            .block_by_hash(&block_header.hash(), true)
874            .expect("received");
875        verification_config.verify_sync_graph_ready_block(&block, &parent)
876    }
877
878    fn process_invalid_blocks(&mut self, invalid_set: &HashSet<usize>) {
879        for index in invalid_set {
880            let hash = self.arena[*index].block_header.hash();
881            // Mark this block as invalid, so we don't need to request/verify it
882            // again
883            self.data_man.invalidate_block(hash);
884        }
885        self.remove_blocks(&invalid_set);
886    }
887
888    fn remove_blocks(&mut self, to_remove_set: &HashSet<usize>) {
889        for index in to_remove_set {
890            let hash = self.arena[*index].block_header.hash();
891            self.not_ready_blocks_frontier.remove(index);
892            self.pos_not_ready_blocks_frontier.remove(index);
893            self.old_era_blocks_frontier_set.remove(index);
894            // This include invalid blocks and blocks not received after a long
895            // time.
896            self.block_to_fill_set.remove(&hash);
897
898            let parent = self.arena[*index].parent;
899            if parent != NULL {
900                self.arena[parent].children.retain(|&x| x != *index);
901            }
902            let parent_hash = *self.arena[*index].block_header.parent_hash();
903            let mut remove_parent_hash: bool = false;
904            if let Some(children) = self.children_by_hash.get_mut(&parent_hash)
905            {
906                children.retain(|&x| x != *index);
907                remove_parent_hash = children.len() == 0;
908            }
909            // clean empty hash key
910            if remove_parent_hash {
911                self.children_by_hash.remove(&parent_hash);
912            }
913
914            let referees: Vec<usize> =
915                self.arena[*index].referees.iter().map(|x| *x).collect();
916            for referee in referees {
917                self.arena[referee].referrers.retain(|&x| x != *index);
918            }
919            let referee_hashes: Vec<H256> = self.arena[*index]
920                .block_header
921                .referee_hashes()
922                .iter()
923                .map(|x| *x)
924                .collect();
925            for referee_hash in referee_hashes {
926                let mut remove_referee_hash: bool = false;
927                if let Some(referrers) =
928                    self.referrers_by_hash.get_mut(&referee_hash)
929                {
930                    referrers.retain(|&x| x != *index);
931                    remove_referee_hash = referrers.len() == 0;
932                }
933                // clean empty hash key
934                if remove_referee_hash {
935                    self.referrers_by_hash.remove(&referee_hash);
936                }
937            }
938
939            let children: Vec<usize> =
940                self.arena[*index].children.iter().map(|x| *x).collect();
941            for child in children {
942                debug_assert!(to_remove_set.contains(&child));
943                self.arena[child].parent = NULL;
944            }
945
946            let referrers: Vec<usize> =
947                self.arena[*index].referrers.iter().map(|x| *x).collect();
948            for referrer in referrers {
949                debug_assert!(to_remove_set.contains(&referrer));
950                self.arena[referrer].referees.retain(|&x| x != *index);
951            }
952
953            self.arena.remove(*index);
954            self.hash_to_arena_indices.remove(&hash);
955            // remove header/block in memory cache and header/block in db
956            self.data_man
957                .remove_useless_block(&hash, true /* remove_db */);
958        }
959    }
960
961    fn set_and_propagate_invalid(
962        &mut self, queue: &mut VecDeque<usize>,
963        invalid_set: &mut HashSet<usize>, index: usize,
964    ) {
965        let children =
966            mem::replace(&mut self.arena[index].children, Vec::new());
967        for child in &children {
968            if !invalid_set.contains(&child) {
969                self.arena[*child].graph_status = BLOCK_INVALID;
970                queue.push_back(*child);
971                invalid_set.insert(*child);
972            }
973        }
974        self.arena[index].children = children;
975        let referrers =
976            mem::replace(&mut self.arena[index].referrers, Vec::new());
977        for referrer in &referrers {
978            if !invalid_set.contains(&referrer) {
979                self.arena[*referrer].graph_status = BLOCK_INVALID;
980                queue.push_back(*referrer);
981                invalid_set.insert(*referrer);
982            }
983        }
984        self.arena[index].referrers = referrers;
985    }
986}
987
988pub struct SynchronizationGraph {
989    pub inner: Arc<RwLock<SynchronizationGraphInner>>,
990    pub consensus: SharedConsensusGraph,
991    pub data_man: Arc<BlockDataManager>,
992    pub pow: Arc<PowComputer>,
993    pub verification_config: VerificationConfig,
994    pub sync_config: SyncGraphConfig,
995    pub statistics: SharedStatistics,
996    /// This is the boolean state shared with the underlying consensus worker
997    /// to indicate whether the worker is now finished all pending blocks.
998    /// Since the critical section is very short, a `Mutex` is enough.
999    consensus_unprocessed_count: Arc<AtomicUsize>,
1000
1001    /// Channel used to send block hashes to `ConsensusGraph` and PubSub.
1002    /// Each element is <block_hash, ignore_body>
1003    new_block_hashes: Arc<Channel<H256>>,
1004
1005    /// The blocks whose timestamps are near future.
1006    /// They will be inserted into sync graph inner at their timestamp.
1007    pub future_blocks: FutureBlockContainer,
1008
1009    machine: Arc<Machine>,
1010}
1011
1012impl MallocSizeOf for SynchronizationGraph {
1013    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
1014        let inner_size = self.inner.read().size_of(ops);
1015        let mut malloc_size = inner_size + self.data_man.size_of(ops);
1016
1017        // TODO: Add statistics for consortium.
1018        if !self.is_consortium() {
1019            let consensus_graph = &*self.consensus;
1020            malloc_size += consensus_graph.size_of(ops);
1021        }
1022        // Does not count size_of machine.
1023
1024        malloc_size
1025    }
1026}
1027
1028pub type SharedSynchronizationGraph = Arc<SynchronizationGraph>;
1029
1030impl SynchronizationGraph {
1031    pub fn new(
1032        consensus: SharedConsensusGraph, data_man: Arc<BlockDataManager>,
1033        statistics: SharedStatistics, verification_config: VerificationConfig,
1034        pow_config: ProofOfWorkConfig, pow: Arc<PowComputer>,
1035        sync_config: SyncGraphConfig, notifications: Arc<Notifications>,
1036        machine: Arc<Machine>, pos_verifier: Arc<PosVerifier>,
1037    ) -> Self {
1038        let genesis_hash = data_man.get_cur_consensus_era_genesis_hash();
1039        let genesis_block_header = data_man
1040            .block_header_by_hash(&genesis_hash)
1041            .expect("genesis block header should exist here");
1042
1043        // It should not be initialized to `true` now, otherwise consensus
1044        // worker will be blocked on waiting the first block forever.
1045        let consensus_unprocessed_count = Arc::new(AtomicUsize::new(0));
1046        let mut consensus_receiver = notifications.new_block_hashes.subscribe();
1047        let inner = Arc::new(RwLock::new(
1048            SynchronizationGraphInner::with_genesis_block(
1049                genesis_block_header.clone(),
1050                pow_config,
1051                pow.clone(),
1052                sync_config,
1053                data_man.clone(),
1054                machine.clone(),
1055                pos_verifier.clone(),
1056            ),
1057        ));
1058        let sync_graph = SynchronizationGraph {
1059            inner: inner.clone(),
1060            future_blocks: FutureBlockContainer::new(
1061                sync_config.future_block_buffer_capacity,
1062            ),
1063            data_man: data_man.clone(),
1064            pow: pow.clone(),
1065            verification_config,
1066            sync_config,
1067            consensus: consensus.clone(),
1068            statistics: statistics.clone(),
1069            consensus_unprocessed_count: consensus_unprocessed_count.clone(),
1070            new_block_hashes: notifications.new_block_hashes.clone(),
1071            machine,
1072        };
1073
1074        // It receives `BLOCK_GRAPH_READY` blocks in order and handles them in
1075        // `ConsensusGraph`
1076        thread::Builder::new()
1077            .name("Consensus Worker".into())
1078            .spawn(move || {
1079                // The Consensus Worker will prioritize blocks based on its parent epoch number while respecting the topological order. This has the following two benefits:
1080                //
1081                // 1. It will almost make sure that the self mined block being processed first
1082                //
1083                // 2. In case of a DoS attack that a malicious player releases a large chunk of old blocks. This strategy will make the consensus to process the meaningful blocks first.
1084                let mut priority_queue: BinaryHeap<(u64, H256)> = BinaryHeap::new();
1085                let mut reverse_map : HashMap<H256, Vec<H256>> = HashMap::new();
1086                let mut counter_map = HashMap::new();
1087                let mut pos_started = false;
1088
1089                'outer: loop {
1090                    // Only block when we have processed all received blocks.
1091                    let mut blocking = priority_queue.is_empty();
1092                    'inner: loop {
1093                        // Use blocking `recv` for the first element, and then drain the receiver
1094                        // with non-blocking `try_recv`.
1095                        let maybe_item = if blocking {
1096                            blocking = false;
1097                            match block_on(consensus_receiver.recv()) {
1098                                Some(item) => Ok(item),
1099                                None => break 'outer,
1100                            }
1101                        } else {
1102                            consensus_receiver.try_recv()
1103                        };
1104
1105                        match maybe_item {
1106                            // FIXME: We need to investigate why duplicate hash may send to the consensus worker
1107                            Ok(hash) => if !reverse_map.contains_key(&hash) {
1108                                debug!("Worker thread receive: block = {}", hash);
1109                                let header = data_man.block_header_by_hash(&hash).expect("Header must exist before sending to the consensus worker!");
1110
1111                                // start pos with an era advance.
1112                                if !pos_started && pos_verifier.is_enabled_at_height(header.height() + consensus.config().inner_conf.era_epoch_count) {
1113                                    if let Err(e) = pos_verifier.initialize(consensus.clone()) {
1114                                        info!("PoS cannot be started at the expected height: e={}", e);
1115                                    } else {
1116                                        pos_started = true;
1117                                    }
1118                                }
1119
1120                                let mut cnt: usize = 0;
1121                                let parent_hash = header.parent_hash();
1122                                if let Some(v) = reverse_map.get_mut(parent_hash) {
1123                                    v.push(hash.clone());
1124                                    cnt += 1;
1125                                }
1126                                for referee in header.referee_hashes() {
1127                                    if let Some(v) = reverse_map.get_mut(referee) {
1128                                        v.push(hash.clone());
1129                                        cnt += 1;
1130                                    }
1131                                }
1132                                if let Some(pivot_decision) = header.pos_reference().as_ref().and_then(|pos_reference| pos_verifier.get_pivot_decision(pos_reference)) {
1133                                    if let Some(v) = reverse_map.get_mut(&pivot_decision) {
1134                                        v.push(hash.clone());
1135                                        cnt += 1;
1136                                    }
1137                                }
1138                                reverse_map.insert(hash.clone(), Vec::new());
1139                                if cnt == 0 {
1140                                    let epoch_number = consensus.get_block_epoch_number(parent_hash).unwrap_or(0);
1141                                    priority_queue.push((epoch_number, hash));
1142                                } else {
1143                                    counter_map.insert(hash, cnt);
1144                                }
1145                            } else {
1146                                warn!("Duplicate block = {} sent to the consensus worker", hash);
1147                            },
1148                            Err(TryRecvError::Empty) => break 'inner,
1149                            Err(TryRecvError::Disconnected) => break 'outer,
1150                        }
1151                    }
1152                    if let Some((_, hash)) = priority_queue.pop() {
1153                        CONSENSUS_WORKER_QUEUE.dequeue(1);
1154                        let successors = reverse_map.remove(&hash).unwrap();
1155                        for succ in successors {
1156                            let cnt_tuple = counter_map.get_mut(&succ).unwrap();
1157                            *cnt_tuple -= 1;
1158                            if *cnt_tuple == 0 {
1159                                counter_map.remove(&succ);
1160                                let header_succ = data_man.block_header_by_hash(&succ).expect("Header must exist before sending to the consensus worker!");
1161                                let parent_succ = header_succ.parent_hash();
1162                                let epoch_number = consensus.get_block_epoch_number(parent_succ).unwrap_or(0);
1163                                priority_queue.push((epoch_number, succ));
1164                            }
1165                        }
1166                        consensus.on_new_block(
1167                            &hash,
1168                        );
1169                        consensus_unprocessed_count.fetch_sub(1, Ordering::SeqCst);
1170                    }
1171                }
1172            })
1173            .expect("Cannot fail");
1174        sync_graph
1175    }
1176
1177    pub fn is_consortium(&self) -> bool { self.sync_config.is_consortium }
1178
1179    pub fn machine(&self) -> Arc<Machine> { self.machine.clone() }
1180
1181    pub fn get_genesis_hash_and_height_in_current_era(&self) -> (H256, u64) {
1182        self.inner
1183            .read()
1184            .get_genesis_hash_and_height_in_current_era()
1185    }
1186
1187    /// Compute the expected difficulty for a block given its
1188    /// parent hash
1189    pub fn expected_difficulty(&self, parent_hash: &H256) -> U256 {
1190        self.consensus.expected_difficulty(parent_hash)
1191    }
1192
1193    pub fn get_to_propagate_trans(
1194        &self,
1195    ) -> HashMap<H256, Arc<SignedTransaction>> {
1196        self.consensus.tx_pool().get_to_be_propagated_transactions()
1197    }
1198
1199    pub fn set_to_propagate_trans(
1200        &self, transactions: HashMap<H256, Arc<SignedTransaction>>,
1201    ) {
1202        self.consensus
1203            .tx_pool()
1204            .set_to_be_propagated_transactions(transactions);
1205    }
1206
1207    /// In full/archive node, this function can be invoked during
1208    /// CatchUpRecoverBlockHeaderFromDbPhase phase.
1209    /// It tries to construct the consensus graph based on header
1210    /// information stored in db.
1211    pub fn recover_graph_from_db(&self) {
1212        info!("Start fast recovery of the block DAG from database");
1213
1214        // Recover the initial sequence number in consensus graph
1215        // based on the sequence number of genesis block in db.
1216        let genesis_hash = self.data_man.get_cur_consensus_era_genesis_hash();
1217        let genesis_local_info =
1218            self.data_man.local_block_info_by_hash(&genesis_hash);
1219        if genesis_local_info.is_none() {
1220            // Local info of genesis block must exist.
1221            panic!(
1222                "failed to get local block info from db for genesis[{}]",
1223                genesis_hash
1224            );
1225        }
1226        let genesis_seq_num = genesis_local_info.unwrap().get_seq_num();
1227        self.consensus.set_initial_sequence_number(genesis_seq_num);
1228        let genesis_header =
1229            self.data_man.block_header_by_hash(&genesis_hash).unwrap();
1230        debug!(
1231            "Get current genesis_block hash={:?}, height={}, seq_num={}",
1232            genesis_hash,
1233            genesis_header.height(),
1234            genesis_seq_num
1235        );
1236
1237        // Get terminals stored in db.
1238        let terminals_opt = self.data_man.terminals_from_db();
1239        if terminals_opt.is_none() {
1240            return;
1241        }
1242        let terminals = terminals_opt.unwrap();
1243        debug!("Get terminals {:?}", terminals);
1244
1245        // Reconstruct the consensus graph by traversing backward from
1246        // terminals. This traversal will visit all the blocks under the
1247        // future of current era genesis till the terminals. However,
1248        // some blocks may not be graph-ready since they may have
1249        // references or parents which are out of the current era. We
1250        // need to resolve these out-of-era dependencies later and make
1251        // those blocks be graph-ready again.
1252        let mut queue = VecDeque::new();
1253        let mut visited_blocks: HashSet<H256> = HashSet::new();
1254        for terminal in terminals {
1255            // header terminals and block terminals may contain the same hash
1256            if !visited_blocks.contains(&terminal) {
1257                queue.push_back(terminal);
1258                visited_blocks.insert(terminal);
1259            }
1260        }
1261
1262        // Remember the hashes of blocks that belong to the current genesis
1263        // era but are missed in db. The missed blocks will be fetched from
1264        // peers.
1265        let mut missed_hashes = HashSet::new();
1266        while let Some(hash) = queue.pop_front() {
1267            if hash == genesis_hash {
1268                // Genesis block is already in consensus graph.
1269                continue;
1270            }
1271
1272            // Ignore blocks beyond the future of current genesis era.
1273            // If block_local_info is missing, consider it is in current
1274            // genesis era.
1275            if let Some(block_local_info) =
1276                self.data_man.local_block_info_by_hash(&hash)
1277            {
1278                if block_local_info.get_seq_num() < genesis_seq_num {
1279                    debug!(
1280                        "Skip block {:?} before checkpoint: seq_num={}",
1281                        hash,
1282                        block_local_info.get_seq_num()
1283                    );
1284                    continue;
1285                }
1286            }
1287
1288            if let Some(block_header) =
1289                self.data_man.block_header_by_hash(&hash)
1290            {
1291                self.insert_block_header(
1292                    &mut block_header.as_ref().clone(),
1293                    true,  /* need_to_verify */
1294                    false, /* bench_mode */
1295                    true,  /* insert_to_consensus */
1296                    false, /* persistent */
1297                );
1298                let parent = block_header.parent_hash().clone();
1299                let referees = block_header.referee_hashes().clone();
1300                if !visited_blocks.contains(&parent) {
1301                    queue.push_back(parent);
1302                    visited_blocks.insert(parent);
1303                }
1304                for referee in referees {
1305                    if !visited_blocks.contains(&referee) {
1306                        queue.push_back(referee);
1307                        visited_blocks.insert(referee);
1308                    }
1309                }
1310            } else {
1311                missed_hashes.insert(hash);
1312            }
1313        }
1314
1315        debug!(
1316            "Current frontier after recover from db: {:?}",
1317            self.inner.read().not_ready_blocks_frontier.get_frontier()
1318        );
1319
1320        info!("Finish reconstructing the pivot chain of length {}, start to sync from peers", self.consensus.best_epoch_number());
1321    }
1322
1323    /// Return None if `hash` is not in sync graph
1324    pub fn block_header_by_hash(&self, hash: &H256) -> Option<BlockHeader> {
1325        if !self.contains_block_header(hash) {
1326            // Only return headers in sync graph
1327            return None;
1328        }
1329        self.data_man
1330            .block_header_by_hash(hash)
1331            .map(|header_ref| header_ref.as_ref().clone())
1332    }
1333
1334    /// Return None if `hash` is not in sync graph
1335    pub fn block_height_by_hash(&self, hash: &H256) -> Option<u64> {
1336        self.block_header_by_hash(hash)
1337            .map(|header| header.height())
1338    }
1339
1340    /// Return None if `hash` is not in sync graph
1341    pub fn block_timestamp_by_hash(&self, hash: &H256) -> Option<u64> {
1342        self.block_header_by_hash(hash)
1343            .map(|header| header.timestamp())
1344    }
1345
1346    /// TODO Be more specific about which functions only return in-memory data
1347    /// and which can return the in-database data
1348    pub fn block_by_hash(&self, hash: &H256) -> Option<Arc<Block>> {
1349        self.data_man.block_by_hash(hash, true /* update_cache */)
1350    }
1351
1352    pub fn contains_block_header(&self, hash: &H256) -> bool {
1353        self.inner.read().hash_to_arena_indices.contains_key(hash)
1354            || self.future_blocks.contains(hash)
1355    }
1356
1357    fn parent_or_referees_invalid(&self, header: &BlockHeader) -> bool {
1358        self.data_man.verified_invalid(header.parent_hash()).0
1359            || header
1360                .referee_hashes()
1361                .iter()
1362                .any(|referee| self.data_man.verified_invalid(referee).0)
1363    }
1364
1365    /// subroutine called by `insert_block_header` and `remove_expire_blocks`
1366    fn propagate_header_graph_status(
1367        &self, inner: &mut SynchronizationGraphInner,
1368        frontier_index_list: Vec<usize>, need_to_verify: bool,
1369        header_index_to_insert: usize, insert_to_consensus: bool,
1370        persistent: bool,
1371    ) -> (HashSet<usize>, Vec<H256>) {
1372        let now = SystemTime::now()
1373            .duration_since(UNIX_EPOCH)
1374            .unwrap()
1375            .as_secs();
1376        let mut need_to_relay: Vec<H256> = Vec::new();
1377        let mut invalid_set: HashSet<usize> = HashSet::new();
1378        let mut queue = VecDeque::new();
1379
1380        for index in frontier_index_list {
1381            if inner.arena[index].graph_status == BLOCK_INVALID {
1382                invalid_set.insert(index);
1383            }
1384            queue.push_back(index);
1385        }
1386
1387        while let Some(index) = queue.pop_front() {
1388            if inner.arena[index].graph_status == BLOCK_INVALID {
1389                inner.set_and_propagate_invalid(
1390                    &mut queue,
1391                    &mut invalid_set,
1392                    index,
1393                );
1394            } else if inner.new_to_be_header_graph_ready(index) {
1395                inner.arena[index].graph_status = BLOCK_HEADER_GRAPH_READY;
1396                inner.arena[index].last_update_timestamp = now;
1397                debug!("BlockIndex {} parent_index {} hash {:?} is header graph ready", index,
1398                           inner.arena[index].parent, inner.arena[index].block_header.hash());
1399
1400                let r = inner.verify_header_graph_ready_block(index);
1401
1402                if need_to_verify && r.is_err() {
1403                    warn!(
1404                        "Invalid header_arc! inserted_header={:?} err={:?}",
1405                        inner.arena[index].block_header.clone(),
1406                        r
1407                    );
1408                    invalid_set.insert(index);
1409                    inner.arena[index].graph_status = BLOCK_INVALID;
1410                    inner.set_and_propagate_invalid(
1411                        &mut queue,
1412                        &mut invalid_set,
1413                        index,
1414                    );
1415                    continue;
1416                }
1417
1418                // Maintain `old_era_blocks_frontier` for future garbage
1419                // collection after making a checkpoint.
1420                if inner.arena[index].parent_reclaimed {
1421                    inner.old_era_blocks_frontier.push_back(index);
1422                    inner.old_era_blocks_frontier_set.insert(index);
1423                }
1424
1425                // Note that when called by `insert_block_header` we have to
1426                // insert header here immediately instead of
1427                // after the loop because its children may
1428                // become ready and being processed in the loop later. It
1429                // requires this block already being inserted
1430                // into the BlockDataManager!
1431                if index == header_index_to_insert && persistent {
1432                    self.data_man.insert_block_header(
1433                        inner.arena[index].block_header.hash(),
1434                        inner.arena[index].block_header.clone(),
1435                        true,
1436                    );
1437                }
1438                if insert_to_consensus {
1439                    CONSENSUS_WORKER_QUEUE.enqueue(1);
1440
1441                    self.consensus_unprocessed_count
1442                        .fetch_add(1, Ordering::SeqCst);
1443                    assert!(
1444                        self.new_block_hashes
1445                            .send(inner.arena[index].block_header.hash(),),
1446                        "consensus receiver dropped"
1447                    );
1448
1449                    // maintain not_ready_blocks_frontier
1450                    inner.not_ready_blocks_frontier.remove(&index);
1451                    // The children will be automatically added in
1452                    // `new_to_be_header_graph_ready` if they should be added.
1453                    inner.pos_not_ready_blocks_frontier.remove(&index);
1454                    for child in &inner.arena[index].children {
1455                        inner.not_ready_blocks_frontier.insert(*child);
1456                    }
1457                }
1458
1459                // Passed verification on header_arc.
1460                if inner.arena[index].block_ready {
1461                    need_to_relay.push(inner.arena[index].block_header.hash());
1462                }
1463
1464                for child in &inner.arena[index].children {
1465                    if inner.arena[*child].graph_status
1466                        < BLOCK_HEADER_GRAPH_READY
1467                    {
1468                        queue.push_back(*child);
1469                    }
1470                }
1471                for referrer in &inner.arena[index].referrers {
1472                    if inner.arena[*referrer].graph_status
1473                        < BLOCK_HEADER_GRAPH_READY
1474                    {
1475                        queue.push_back(*referrer);
1476                    }
1477                }
1478            } else {
1479                debug!(
1480                    "BlockIndex {} parent_index {} hash {:?} is not ready",
1481                    index,
1482                    inner.arena[index].parent,
1483                    inner.arena[index].block_header.hash()
1484                );
1485                if index == header_index_to_insert && persistent {
1486                    self.data_man.insert_block_header(
1487                        inner.arena[index].block_header.hash(),
1488                        inner.arena[index].block_header.clone(),
1489                        true,
1490                    );
1491                }
1492            }
1493        }
1494        (invalid_set, need_to_relay)
1495    }
1496
1497    pub fn insert_block_header(
1498        &self, header: &mut BlockHeader, need_to_verify: bool,
1499        bench_mode: bool, insert_to_consensus: bool, persistent: bool,
1500    ) -> (BlockHeaderInsertionResult, Vec<H256>) {
1501        let _timer = MeterTimer::time_func(SYNC_INSERT_HEADER.as_ref());
1502        self.statistics.inc_sync_graph_inserted_header_count();
1503        let inner = &mut *self.inner.write();
1504        if inner.locked_for_catchup {
1505            // Ignore received headers when we are downloading block bodies.
1506            return (BlockHeaderInsertionResult::TemporarySkipped, Vec::new());
1507        }
1508        let hash = header.hash();
1509
1510        let (invalid, local_info_opt) = self.data_man.verified_invalid(&hash);
1511        if invalid {
1512            return (BlockHeaderInsertionResult::Invalid, Vec::new());
1513        }
1514
1515        if let Some(info) = local_info_opt {
1516            // If the block is ordered before current era genesis or it has
1517            // already entered consensus graph in this run, we do not need to
1518            // process it. And in these two cases, the block is considered
1519            // valid.
1520            let already_processed = info.get_seq_num()
1521                < self.consensus.current_era_genesis_seq_num()
1522                || info.get_instance_id() == self.data_man.get_instance_id();
1523            if already_processed {
1524                if need_to_verify && !self.is_consortium() {
1525                    // Compute pow_quality, because the input header may be used
1526                    // as a part of block later
1527                    VerificationConfig::get_or_fill_header_pow_quality(
1528                        &self.pow, header,
1529                    );
1530                }
1531                return (
1532                    BlockHeaderInsertionResult::AlreadyProcessedInConsensus,
1533                    Vec::new(),
1534                );
1535            }
1536        }
1537
1538        if inner.hash_to_arena_indices.contains_key(&hash) {
1539            if need_to_verify {
1540                // Compute pow_quality, because the input header may be used as
1541                // a part of block later
1542                VerificationConfig::get_or_fill_header_pow_quality(
1543                    &self.pow, header,
1544                );
1545            }
1546            return (
1547                BlockHeaderInsertionResult::AlreadyProcessedInSync,
1548                Vec::new(),
1549            );
1550        }
1551
1552        // skip check for consortium currently
1553        debug!("is_consortium={:?}", self.is_consortium());
1554        let verification_passed = if need_to_verify {
1555            self.is_consortium()
1556                || !(self.parent_or_referees_invalid(header)
1557                    || self
1558                        .verification_config
1559                        .verify_header_params(&self.pow, header)
1560                        .or_else(|e| {
1561                            warn!(
1562                                "Invalid header: err={} header={:?}",
1563                                e, header
1564                            );
1565                            Err(e)
1566                        })
1567                        .is_err())
1568        } else {
1569            if !bench_mode && !self.is_consortium() {
1570                self.verification_config
1571                    .verify_pow(&self.pow, header)
1572                    .expect("local mined block should pass this check!");
1573            }
1574            true
1575        };
1576
1577        let header_arc = Arc::new(header.clone());
1578        let me = if verification_passed {
1579            inner.insert(header_arc.clone())
1580        } else {
1581            inner.insert_invalid(header_arc.clone())
1582        };
1583
1584        // Currently, `inner.arena[me].graph_status` will only be
1585        //   1. `BLOCK_GRAPH_READY` for genesis block.
1586        //   2. `BLOCK_HEADER_ONLY` for non genesis block.
1587        //   3. `BLOCK_INVALID` for invalid block.
1588        if inner.arena[me].graph_status != BLOCK_GRAPH_READY {
1589            // This block will become a new `not_ready_blocks_frontier` if
1590            //   1. It's parent block has not inserted yet.
1591            //   2. We are in `Catch Up Blocks Phase` and the graph status of
1592            // parent block is `BLOCK_GRAPH_READY`.
1593            //   3. We are in `Catch Up Headers Phase` and the graph status of
1594            // parent block is `BLOCK_HEADER_GRAPH_READY`.
1595            //   4. The block is not graph ready because of not-ready
1596            // pos_reference, and parent is not in the frontier.
1597            if inner.arena[me].parent == NULL
1598                || inner.arena[inner.arena[me].parent].graph_status
1599                    == BLOCK_GRAPH_READY
1600                || (insert_to_consensus
1601                    && inner.arena[inner.arena[me].parent].graph_status
1602                        == BLOCK_HEADER_GRAPH_READY)
1603            {
1604                inner.not_ready_blocks_frontier.insert(me);
1605            }
1606            let mut to_be_removed = Vec::new();
1607            for child in &inner.arena[me].children {
1608                if inner.not_ready_blocks_frontier.contains(child) {
1609                    to_be_removed.push(*child);
1610                }
1611            }
1612            for x in to_be_removed {
1613                inner.not_ready_blocks_frontier.remove(&x);
1614            }
1615        }
1616
1617        debug!("insert_block_header() Block = {:?}, index = {}, need_to_verify = {}, bench_mode = {} insert_to_consensus = {}",
1618               header.hash(), me, need_to_verify, bench_mode, insert_to_consensus);
1619
1620        // Start to pass influence to descendants
1621        let (invalid_set, need_to_relay) = self.propagate_header_graph_status(
1622            inner,
1623            vec![me],
1624            need_to_verify,
1625            me,
1626            insert_to_consensus,
1627            persistent,
1628        );
1629
1630        let me_invalid = invalid_set.contains(&me);
1631
1632        // Post-processing invalid blocks.
1633        inner.process_invalid_blocks(&invalid_set);
1634
1635        if me_invalid {
1636            return (BlockHeaderInsertionResult::Invalid, need_to_relay);
1637        }
1638
1639        inner.try_clear_old_era_blocks();
1640
1641        (BlockHeaderInsertionResult::NewValid, need_to_relay)
1642    }
1643
1644    pub fn contains_block(&self, hash: &H256) -> bool {
1645        let inner = self.inner.read();
1646        if let Some(index) = inner.hash_to_arena_indices.get(hash) {
1647            inner.arena[*index].block_ready
1648        } else {
1649            false
1650        }
1651    }
1652
1653    fn set_graph_ready(
1654        &self, inner: &mut SynchronizationGraphInner, index: usize,
1655    ) {
1656        inner.arena[index].graph_status = BLOCK_GRAPH_READY;
1657
1658        // maintain not_ready_blocks_frontier
1659        inner.not_ready_blocks_frontier.remove(&index);
1660        // The children will be automatically added in
1661        // `new_to_be_block_graph_ready` if they should be added.
1662        inner.pos_not_ready_blocks_frontier.remove(&index);
1663        for child in &inner.arena[index].children {
1664            inner.not_ready_blocks_frontier.insert(*child);
1665        }
1666
1667        let h = inner.arena[index].block_header.hash();
1668        debug!("Block {:?} is graph ready", h);
1669        CONSENSUS_WORKER_QUEUE.enqueue(1);
1670
1671        self.consensus_unprocessed_count
1672            .fetch_add(1, Ordering::SeqCst);
1673        assert!(self.new_block_hashes.send(h), "consensus receiver dropped");
1674
1675        if inner.config.enable_state_expose {
1676            STATE_EXPOSER.sync_graph.lock().ready_block_vec.push(
1677                SyncGraphBlockState {
1678                    block_hash: h,
1679                    parent: inner.arena[index]
1680                        .block_header
1681                        .parent_hash()
1682                        .clone(),
1683                    referees: inner.arena[index]
1684                        .block_header
1685                        .referee_hashes()
1686                        .clone(),
1687                    nonce: inner.arena[index].block_header.nonce(),
1688                    timestamp: inner.arena[index].block_header.timestamp(),
1689                    adaptive: inner.arena[index].block_header.adaptive(),
1690                },
1691            );
1692        }
1693    }
1694
1695    /// subroutine called by `insert_block` and `remove_expire_blocks`
1696    fn propagate_graph_status(
1697        &self, inner: &mut SynchronizationGraphInner,
1698        frontier_index_list: Vec<usize>,
1699    ) -> HashSet<usize> {
1700        let mut queue = VecDeque::new();
1701        let mut invalid_set = HashSet::new();
1702        for index in frontier_index_list {
1703            if inner.arena[index].graph_status == BLOCK_INVALID {
1704                invalid_set.insert(index);
1705            }
1706            queue.push_back(index);
1707        }
1708
1709        while let Some(index) = queue.pop_front() {
1710            if inner.arena[index].graph_status == BLOCK_INVALID {
1711                inner.set_and_propagate_invalid(
1712                    &mut queue,
1713                    &mut invalid_set,
1714                    index,
1715                );
1716            } else if inner.new_to_be_block_graph_ready(index) {
1717                let verify_result = inner
1718                    .verify_graph_ready_block(index, &self.verification_config);
1719                if verify_result.is_err() {
1720                    warn!(
1721                        "Invalid block! inserted_header={:?} err={:?}",
1722                        inner.arena[index].block_header.clone(),
1723                        verify_result
1724                    );
1725                    invalid_set.insert(index);
1726                    inner.arena[index].graph_status = BLOCK_INVALID;
1727                    inner.set_and_propagate_invalid(
1728                        &mut queue,
1729                        &mut invalid_set,
1730                        index,
1731                    );
1732                    continue;
1733                }
1734                self.set_graph_ready(inner, index);
1735                for child in &inner.arena[index].children {
1736                    debug_assert!(
1737                        inner.arena[*child].graph_status < BLOCK_GRAPH_READY
1738                    );
1739                    queue.push_back(*child);
1740                }
1741                for referrer in &inner.arena[index].referrers {
1742                    debug_assert!(
1743                        inner.arena[*referrer].graph_status < BLOCK_GRAPH_READY
1744                    );
1745                    queue.push_back(*referrer);
1746                }
1747            } else {
1748                trace!("Block index {:?} not block_graph_ready, current frontier: {:?}", index, inner.not_ready_blocks_frontier.get_frontier());
1749            }
1750        }
1751
1752        invalid_set
1753    }
1754
1755    pub fn insert_block(
1756        &self, block: Block, need_to_verify: bool, persistent: bool,
1757        recover_from_db: bool,
1758    ) -> BlockInsertionResult {
1759        let _timer = MeterTimer::time_func(SYNC_INSERT_BLOCK.as_ref());
1760        let hash = block.hash();
1761
1762        debug!("insert_block {:?}", hash);
1763
1764        let inner = &mut *self.inner.write();
1765
1766        let contains_block =
1767            if let Some(index) = inner.hash_to_arena_indices.get(&hash) {
1768                inner.arena[*index].block_ready
1769            } else {
1770                // Sync graph is cleaned after inserting the header, so we can
1771                // ignore the block body
1772                return BlockInsertionResult::Ignored;
1773            };
1774
1775        if contains_block {
1776            return BlockInsertionResult::AlreadyProcessed;
1777        }
1778
1779        // We only insert the body after a valid header is inserted, so this has
1780        // been checked when we insert the header.
1781        debug_assert!(!self.data_man.verified_invalid(&hash).0);
1782
1783        self.statistics.inc_sync_graph_inserted_block_count();
1784
1785        let me = *inner.hash_to_arena_indices.get(&hash).unwrap();
1786
1787        debug_assert!(hash == inner.arena[me].block_header.hash());
1788        debug_assert!(!inner.arena[me].block_ready);
1789        inner.arena[me].block_ready = true;
1790
1791        if need_to_verify {
1792            let r = self.verification_config.verify_sync_graph_block_basic(
1793                &block,
1794                self.consensus.best_chain_id(),
1795            );
1796            match r {
1797                Err(Error::Block(BlockError::InvalidTransactionsRoot(e))) => {
1798                    warn!("BlockTransactionRoot not match! inserted_block={:?} err={:?}", block, e);
1799                    // If the transaction root does not match, it might be
1800                    // caused by receiving wrong
1801                    // transactions because of conflicting ShortId in
1802                    // CompactBlock, or caused by
1803                    // adversaries. In either case, we should request the block
1804                    // again, and the received block body is
1805                    // discarded.
1806                    inner.arena[me].block_ready = false;
1807                    return BlockInsertionResult::RequestAgain;
1808                }
1809                Err(e) => {
1810                    warn!(
1811                        "Invalid block! inserted_block={:?} err={:?}",
1812                        block.block_header, e
1813                    );
1814                    inner.arena[me].graph_status = BLOCK_INVALID;
1815                }
1816                _ => {}
1817            };
1818        }
1819
1820        let block = Arc::new(block);
1821        if inner.arena[me].graph_status != BLOCK_INVALID {
1822            // If we are rebuilding the graph from db, we do not insert all
1823            // blocks into memory
1824            if !recover_from_db {
1825                // Here we always build a new compact block because we should
1826                // not reuse the nonce
1827                self.data_man.insert_compact_block(block.to_compact());
1828                // block header was inserted in before, only insert block body
1829                // here
1830                self.data_man.insert_block_body(
1831                    block.hash(),
1832                    block.clone(),
1833                    persistent,
1834                );
1835            }
1836        }
1837
1838        // If we are locked for catch-up, make sure no new block will enter sync
1839        // graph.
1840        if inner.locked_for_catchup {
1841            if inner.arena[me].graph_status == BLOCK_INVALID {
1842                let invalid_set = self.propagate_graph_status(inner, vec![me]);
1843                // Invalid blocks will also be removed from
1844                // `block_to_fill_set`
1845                // in `process_invalid_blocks`.
1846                inner.process_invalid_blocks(&invalid_set);
1847                return BlockInsertionResult::Invalid;
1848            } else {
1849                debug!("Downloaded block body for {:?}", hash);
1850                inner.block_to_fill_set.remove(&hash);
1851                return BlockInsertionResult::AlreadyProcessed;
1852            }
1853        }
1854
1855        let invalid_set = self.propagate_graph_status(inner, vec![me]);
1856
1857        // Post-processing invalid blocks.
1858        inner.process_invalid_blocks(&invalid_set);
1859
1860        debug!(
1861            "new block inserted into graph: block_header={:?}, tx_count={}, block_size={}",
1862            block.block_header,
1863            block.transactions.len(),
1864            block.size(),
1865        );
1866
1867        // Note: If `me` is invalid, it has been removed from `arena` now,
1868        // so we cannot access its `graph_status`.
1869        if invalid_set.contains(&me) {
1870            BlockInsertionResult::Invalid
1871        } else if inner.arena[me].graph_status >= BLOCK_HEADER_GRAPH_READY {
1872            BlockInsertionResult::ShouldRelay
1873        } else {
1874            BlockInsertionResult::SuccessWithoutRelay
1875        }
1876    }
1877
1878    pub fn get_all_block_hashes_by_epoch(
1879        &self, epoch_number: u64,
1880    ) -> Result<Vec<H256>, ProviderBlockError> {
1881        let mut res = self.consensus.get_skipped_block_hashes_by_epoch(
1882            EpochNumber::Number(epoch_number.into()),
1883        )?;
1884        res.append(&mut self.consensus.get_block_hashes_by_epoch(
1885            EpochNumber::Number(epoch_number.into()),
1886        )?);
1887        Ok(res)
1888    }
1889
1890    pub fn log_statistics(&self) { self.statistics.log_statistics(); }
1891
1892    pub fn update_total_weight_delta_heartbeat(&self) {
1893        self.consensus.update_total_weight_delta_heartbeat();
1894    }
1895
1896    /// Get the current number of blocks in the synchronization graph
1897    /// This only returns cached block count, and this is enough since this is
1898    /// only used in test.
1899    pub fn block_count(&self) -> usize { self.data_man.cached_block_count() }
1900
1901    /// Remove all blocks which have not been updated for a long time. We
1902    /// maintain a set `not_ready_blocks_frontier` which is the root nodes in
1903    /// the parental tree formed by not graph ready blocks. Find all expire
1904    /// blocks which can be reached by `not_ready_blocks_frontier`.
1905    pub fn remove_expire_blocks(&self, expire_time: u64) {
1906        let inner = &mut *self.inner.write();
1907        let now = SystemTime::now()
1908            .duration_since(UNIX_EPOCH)
1909            .unwrap()
1910            .as_secs();
1911        let frontier = inner.not_ready_blocks_frontier.get_frontier().clone();
1912        let all_not_ready: HashSet<_> = inner.get_future(frontier);
1913        let mut expire_set = HashSet::new();
1914        for index in all_not_ready {
1915            if inner.arena[index].last_update_timestamp + expire_time < now {
1916                expire_set.insert(index);
1917            }
1918        }
1919
1920        // find blocks reached by previous found expired blocks
1921        let all_expire: HashSet<_> = inner.get_future(expire_set);
1922        debug!("all_expire: {:?}", all_expire);
1923        inner.remove_blocks(&all_expire);
1924    }
1925
1926    /// Remove all blocks in `to_remove_set` and their future set from the
1927    /// graph.
1928    pub fn remove_blocks_and_future(&self, to_remove_set: &HashSet<H256>) {
1929        let mut inner = self.inner.write();
1930        let mut index_set = Vec::new();
1931        for block_hash in to_remove_set {
1932            if let Some(index) = inner.hash_to_arena_indices.get(block_hash) {
1933                index_set.push(*index);
1934            }
1935        }
1936        let index_set_and_future: HashSet<_> = inner.get_future(index_set);
1937        inner.remove_blocks(&index_set_and_future);
1938    }
1939
1940    pub fn is_consensus_worker_busy(&self) -> bool {
1941        self.consensus_unprocessed_count.load(Ordering::SeqCst) != 0
1942    }
1943
1944    pub fn is_fill_block_completed(&self) -> bool {
1945        self.inner.read().block_to_fill_set.is_empty()
1946    }
1947
1948    /// Construct the states along the pivot chain, set all
1949    /// `BLOCK_HEADER_GRAPH_READY` blocks as `BLOCK_GRAPH_READY` and remove all
1950    /// other blocks. All blocks in the future can be processed normally in
1951    /// sync graph and consensus graph.
1952    ///
1953    /// If some blocks become invalid after validating their bodies, we need to
1954    /// remove them and reconstruct the consensus graph. Return `false` if
1955    /// there are blocks in the new consensus graph whose bodies are missing.
1956    /// Return `true` if we do not need to reconstruct consensus, or all blocks
1957    /// in the new consensus graph already have bodies.
1958    pub fn complete_filling_block_bodies(&self) -> bool {
1959        {
1960            let inner = &mut *self.inner.write();
1961
1962            // Iterating over `hash_to_arena_indices` might be more efficient
1963            // than iterating over `arena`.
1964            let to_remove = {
1965                let arena = &mut inner.arena;
1966                inner
1967                    .hash_to_arena_indices
1968                    .iter()
1969                    .filter_map(|(_, index)| {
1970                        let graph_node = &mut arena[*index];
1971                        if graph_node.graph_status == BLOCK_HEADER_GRAPH_READY {
1972                            graph_node.block_ready = true;
1973                            graph_node.graph_status = BLOCK_GRAPH_READY;
1974                        }
1975                        if graph_node.graph_status != BLOCK_GRAPH_READY {
1976                            Some(*index)
1977                        } else {
1978                            None
1979                        }
1980                    })
1981                    .collect()
1982            };
1983            inner.remove_blocks(&to_remove);
1984
1985            // Check if we skip some block bodies. It's either because they are
1986            // never retrieved after a long time, or they have invalid
1987            // bodies.
1988            let skipped_body_blocks =
1989                self.consensus.get_blocks_needing_bodies();
1990            if !skipped_body_blocks.is_empty() {
1991                warn!("Has invalid blocks after downloading block bodies!");
1992                // Some headers should not enter consensus, so we just
1993                // reconstruct the consensus graph with the
1994                // current sync graph.
1995                self.consensus.reset();
1996
1997                let all_block_indices: HashSet<_> = inner
1998                    .hash_to_arena_indices
1999                    .iter()
2000                    .map(|(_, i)| *i)
2001                    .collect();
2002                // Send blocks in topological order.
2003                let sorted_blocks = inner.topological_sort(all_block_indices);
2004                for i in sorted_blocks {
2005                    self.consensus
2006                        .on_new_block(&inner.arena[i].block_header.hash());
2007                }
2008                let new_to_fill_blocks: HashSet<_> =
2009                    self.consensus.get_blocks_needing_bodies();
2010                if !new_to_fill_blocks.is_empty() {
2011                    // This should not happen if stable checkpoint is not
2012                    // reverted because we have downloaded
2013                    // all blocks in its subtree.
2014                    warn!(
2015                        "{} new block bodies to get",
2016                        new_to_fill_blocks.len()
2017                    );
2018                    inner.block_to_fill_set = new_to_fill_blocks;
2019                    return false;
2020                }
2021            }
2022        }
2023        self.consensus.construct_pivot_state();
2024        self.inner.write().locked_for_catchup = false;
2025        true
2026    }
2027
2028    /// TODO(lpl): Only triggered when pos commits new blocks?
2029    /// Check if not_ready_frontier blocks become ready now.
2030    /// Blocks that are not ready because of missing pos references only become
2031    /// ready here.
2032    pub fn check_not_ready_frontier(&self, header_only: bool) {
2033        debug!("check_not_ready_frontier starts");
2034        let mut inner = self.inner.write();
2035        if inner.locked_for_catchup {
2036            // Do not change sync graph or consensus graph during
2037            // `CatchUpFillBlockBodyPhase`.
2038            return;
2039        }
2040        if header_only {
2041            for b in inner.pos_not_ready_blocks_frontier.clone() {
2042                debug!(
2043                    "check_not_ready_frontier: check {:?}",
2044                    inner.arena[b].block_header.hash()
2045                );
2046                if inner.new_to_be_header_graph_ready(b) {
2047                    self.propagate_header_graph_status(
2048                        &mut *inner,
2049                        vec![b],
2050                        true, /* need_to_verify */
2051                        b,
2052                        true, /* insert_to_consensus */
2053                        true, /* persistent */
2054                    );
2055                }
2056            }
2057        } else {
2058            for b in inner.pos_not_ready_blocks_frontier.clone() {
2059                debug!(
2060                    "check_not_ready_frontier: check {:?}",
2061                    inner.arena[b].block_header.hash()
2062                );
2063                if inner.new_to_be_header_graph_ready(b) {
2064                    self.propagate_header_graph_status(
2065                        &mut *inner,
2066                        vec![b],
2067                        true, /* need_to_verify */
2068                        b,
2069                        false, /* insert_to_consensus */
2070                        true,  /* persistent */
2071                    );
2072                }
2073                // This will not introduce new invalid blocks, so we do not need
2074                // to process the return value.
2075                if inner.new_to_be_block_graph_ready(b) {
2076                    debug!("new graph ready found");
2077                    self.propagate_graph_status(&mut *inner, vec![b]);
2078                }
2079            }
2080        }
2081    }
2082}
2083
2084impl Graph for SynchronizationGraphInner {
2085    type NodeIndex = usize;
2086}
2087
2088impl TreeGraph for SynchronizationGraphInner {
2089    fn parent(&self, node_index: Self::NodeIndex) -> Option<Self::NodeIndex> {
2090        if self.arena[node_index].parent != NULL {
2091            Some(self.arena[node_index].parent)
2092        } else {
2093            None
2094        }
2095    }
2096
2097    fn referees(&self, node_index: Self::NodeIndex) -> Vec<Self::NodeIndex> {
2098        self.arena[node_index].referees.clone()
2099    }
2100}
2101
2102impl RichTreeGraph for SynchronizationGraphInner {
2103    fn children(&self, node_index: Self::NodeIndex) -> Vec<Self::NodeIndex> {
2104        self.arena[node_index].children.clone()
2105    }
2106
2107    fn referrers(&self, node_index: Self::NodeIndex) -> Vec<Self::NodeIndex> {
2108        self.arena[node_index].referrers.clone()
2109    }
2110}
2111
2112pub enum BlockInsertionResult {
2113    // The block is valid and already processed before.
2114    AlreadyProcessed,
2115    // The block is valid and is new to be block-graph-ready.
2116    ShouldRelay,
2117    // The block is valid but not block-graph-ready.
2118    SuccessWithoutRelay,
2119    // The block is definitely invalid. It's not inserted to sync graph
2120    // and should not be requested again.
2121    Invalid,
2122    // The case where transaction root does not match.
2123    // We should request again to get
2124    // the correct transactions for full verification.
2125    RequestAgain,
2126    // This is only for the case the header is removed, possibly because
2127    // we switch phases.
2128    // We ignore the block without verification.
2129    Ignored,
2130}
2131
2132impl BlockInsertionResult {
2133    pub fn is_valid(&self) -> bool {
2134        matches!(
2135            self,
2136            BlockInsertionResult::AlreadyProcessed
2137                | BlockInsertionResult::ShouldRelay
2138                | BlockInsertionResult::SuccessWithoutRelay
2139        )
2140    }
2141
2142    pub fn is_invalid(&self) -> bool {
2143        matches!(self, BlockInsertionResult::Invalid)
2144    }
2145
2146    pub fn should_relay(&self) -> bool {
2147        matches!(self, BlockInsertionResult::ShouldRelay)
2148    }
2149
2150    pub fn request_again(&self) -> bool {
2151        matches!(self, BlockInsertionResult::RequestAgain)
2152    }
2153}
2154
2155pub enum BlockHeaderInsertionResult {
2156    // The block is valid and already processed consensus before.
2157    // We should not process this block again.
2158    AlreadyProcessedInConsensus,
2159    // The block header has been inserted into sync graph. We can ingore this
2160    // header, but we should keep processing its body if needed.
2161    AlreadyProcessedInSync,
2162    // The block is valid and is processed for the first time.
2163    NewValid,
2164    // The block is definitely invalid. It's not inserted to sync graph
2165    // and should not be requested again.
2166    Invalid,
2167    // The header is received when we have locked sync graph.
2168    TemporarySkipped,
2169}
2170
2171impl BlockHeaderInsertionResult {
2172    pub fn is_new_valid(&self) -> bool {
2173        matches!(self, BlockHeaderInsertionResult::NewValid)
2174    }
2175
2176    pub fn is_invalid(&self) -> bool {
2177        matches!(self, BlockHeaderInsertionResult::Invalid)
2178    }
2179
2180    pub fn should_process_body(&self) -> bool {
2181        matches!(
2182            self,
2183            BlockHeaderInsertionResult::NewValid
2184                | BlockHeaderInsertionResult::AlreadyProcessedInSync
2185        )
2186    }
2187}