1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0
// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
use crate::pos::consensus::counters;
use anyhow::bail;
use consensus_types::{
executed_block::ExecutedBlock, quorum_cert::QuorumCert,
timeout_certificate::TimeoutCertificate,
};
use diem_crypto::HashValue;
use diem_logger::prelude::*;
use mirai_annotations::{checked_verify_eq, precondition};
use std::{
collections::{vec_deque::VecDeque, HashMap, HashSet},
sync::Arc,
};
/// A tree node wrapping an [`ExecutedBlock`] together with the ids of its
/// children, so that the parent-child relationship between blocks can be
/// followed from parent to child.
struct LinkableBlock {
/// Executed block that has raw block data and execution output.
executed_block: Arc<ExecutedBlock>,
/// Ids of the children of this block, used for cascading pruning. Note: a
/// block may have multiple children.
children: HashSet<HashValue>,
}
impl LinkableBlock {
    /// Builds a node for `block` that has no children yet.
    pub fn new(block: ExecutedBlock) -> Self {
        let executed_block = Arc::new(block);
        let children = HashSet::new();
        Self {
            executed_block,
            children,
        }
    }

    /// Shared handle to the wrapped executed block.
    pub fn executed_block(&self) -> &Arc<ExecutedBlock> {
        &self.executed_block
    }

    /// Ids of the blocks recorded as children of this block.
    pub fn children(&self) -> &HashSet<HashValue> {
        &self.children
    }

    /// Records `child_id` as a child of this block.
    ///
    /// # Panics
    /// Panics if `child_id` is already recorded as a child.
    pub fn add_child(&mut self, child_id: HashValue) {
        let newly_added = self.children.insert(child_id);
        assert!(newly_added, "Block {:x} already existed.", child_id);
    }
}
impl LinkableBlock {
    /// Id of the wrapped executed block.
    pub fn id(&self) -> HashValue {
        self.executed_block.id()
    }
}
/// This structure maintains a consistent block tree of parent and children
/// links. Blocks contain parent links and are immutable. For all parent links,
/// a child link exists. This structure should only be used internally in
/// BlockStore.
pub struct BlockTree {
/// All the blocks known to this replica (with parent links), keyed by
/// block id.
id_to_block: HashMap<HashValue, LinkableBlock>,
/// Id of the root of the tree; replaced by `process_pruned_blocks`.
root_id: HashValue,
/// Id of the certified block with the highest round; advanced by
/// `insert_quorum_cert`.
highest_certified_block_id: HashValue,
/// The quorum certificate of highest_certified_block
highest_quorum_cert: Arc<QuorumCert>,
/// The highest timeout certificate (if any).
highest_timeout_cert: Option<Arc<TimeoutCertificate>>,
/// The quorum certificate that has highest commit info.
highest_commit_cert: Arc<QuorumCert>,
/// Map of block id to its completed quorum certificate (2f + 1 votes)
id_to_quorum_cert: HashMap<HashValue, Arc<QuorumCert>>,
/// To keep the IDs of the elements that have been pruned from the tree but
/// not cleaned up yet. New ids are appended at the back and the oldest are
/// evicted from the front.
pruned_block_ids: VecDeque<HashValue>,
/// Maximum number of pruned block ids kept in `pruned_block_ids` before the
/// oldest ones are removed from memory.
max_pruned_blocks_in_mem: usize,
}
impl BlockTree {
pub(super) fn new(
root: ExecutedBlock, root_quorum_cert: QuorumCert,
root_ledger_info: QuorumCert, max_pruned_blocks_in_mem: usize,
highest_timeout_cert: Option<Arc<TimeoutCertificate>>,
) -> Self {
assert_eq!(
root.id(),
root_ledger_info.commit_info().id(),
"inconsistent root and ledger info"
);
let root_id = root.id();
let mut id_to_block = HashMap::new();
id_to_block.insert(root_id, LinkableBlock::new(root));
counters::NUM_BLOCKS_IN_TREE.set(1);
let root_quorum_cert = Arc::new(root_quorum_cert);
let mut id_to_quorum_cert = HashMap::new();
id_to_quorum_cert.insert(
root_quorum_cert.certified_block().id(),
Arc::clone(&root_quorum_cert),
);
let pruned_block_ids =
VecDeque::with_capacity(max_pruned_blocks_in_mem);
BlockTree {
id_to_block,
root_id,
highest_certified_block_id: root_id,
highest_quorum_cert: Arc::clone(&root_quorum_cert),
highest_timeout_cert,
highest_commit_cert: Arc::new(root_ledger_info),
id_to_quorum_cert,
pruned_block_ids,
max_pruned_blocks_in_mem,
}
}
// This method will only be used in this module.
fn get_linkable_block(
&self, block_id: &HashValue,
) -> Option<&LinkableBlock> {
self.id_to_block.get(block_id)
}
// This method will only be used in this module.
fn get_linkable_block_mut(
&mut self, block_id: &HashValue,
) -> Option<&mut LinkableBlock> {
self.id_to_block.get_mut(block_id)
}
// Module-private access to the tree node of the current root.
// Panics if no block is stored under `root_id`.
fn linkable_root(&self) -> &LinkableBlock {
    let root = self.id_to_block.get(&self.root_id);
    root.expect("Root must exist")
}
fn remove_block(&mut self, block_id: HashValue) {
// Remove the block from the store
self.id_to_block.remove(&block_id);
self.id_to_quorum_cert.remove(&block_id);
}
/// Returns `true` if a block is stored under `block_id`.
pub(super) fn block_exists(&self, block_id: &HashValue) -> bool {
    self.get_linkable_block(block_id).is_some()
}
/// Returns a shared handle to the block stored under `block_id`, or `None`
/// if no such block is stored.
pub(super) fn get_block(
    &self, block_id: &HashValue,
) -> Option<Arc<ExecutedBlock>> {
    let linkable = self.get_linkable_block(block_id)?;
    Some(Arc::clone(linkable.executed_block()))
}
/// Returns a shared handle to the root block.
///
/// # Panics
/// Panics if no block is stored under the root id.
pub(super) fn root(&self) -> Arc<ExecutedBlock> {
    let root = self.get_block(&self.root_id);
    root.expect("Root must exist")
}
/// Returns a shared handle to the block recorded as the highest certified
/// one.
///
/// # Panics
/// Panics if no block is stored under `highest_certified_block_id`.
pub(super) fn highest_certified_block(&self) -> Arc<ExecutedBlock> {
    let certified = self.get_block(&self.highest_certified_block_id);
    certified.expect("Highest cerfified block must exist")
}
/// Returns a shared handle to the highest quorum certificate.
pub(super) fn highest_quorum_cert(&self) -> Arc<QuorumCert> {
    let cert = &self.highest_quorum_cert;
    Arc::clone(cert)
}
/// Returns a shared handle to the highest timeout certificate, or `None` if
/// there is none.
pub(super) fn highest_timeout_cert(
    &self,
) -> Option<Arc<TimeoutCertificate>> {
    self.highest_timeout_cert.as_ref().map(Arc::clone)
}
/// Overwrites the highest timeout certificate with `tc`, discarding the
/// previous one (if any).
pub(super) fn replace_timeout_cert(&mut self, tc: Arc<TimeoutCertificate>) {
    self.highest_timeout_cert = Some(tc);
}
/// Returns a shared handle to the quorum certificate with the highest
/// commit info.
pub(super) fn highest_commit_cert(&self) -> Arc<QuorumCert> {
    let cert = &self.highest_commit_cert;
    Arc::clone(cert)
}
/// Returns the quorum certificate indexed under `block_id`, or `None` if
/// none is stored for that id.
pub(super) fn get_quorum_cert_for_block(
    &self, block_id: &HashValue,
) -> Option<Arc<QuorumCert>> {
    self.id_to_quorum_cert.get(block_id).map(Arc::clone)
}
/// Adds `block` to the tree and links it to its parent.
///
/// If a block with the same id is already stored, the stored block is
/// returned and `block` is discarded (after checking that both carry the
/// same compute result). Otherwise the block is registered as a child of
/// its parent, stored, and the block counter is incremented.
///
/// # Errors
/// Returns an error if the parent of `block` is not stored in the tree.
///
/// # Panics
/// Panics if the parent already lists this block id as a child.
pub(super) fn insert_block(
    &mut self, block: ExecutedBlock,
) -> anyhow::Result<Arc<ExecutedBlock>> {
    let block_id = block.id();

    // Duplicate insertion: hand back what is already stored.
    if let Some(existing_block) = self.get_block(&block_id) {
        diem_debug!("Already had block {:?} for id {:?} when trying to add another block {:?} for the same id",
            existing_block,
            block_id,
            block);
        checked_verify_eq!(
            existing_block.compute_result(),
            block.compute_result()
        );
        return Ok(existing_block);
    }

    // Link parent -> child first; a missing parent aborts the insertion
    // before anything is stored.
    let parent_id = block.parent_id();
    match self.get_linkable_block_mut(&parent_id) {
        None => bail!("Parent block {} not found", parent_id),
        Some(parent) => parent.add_child(block_id),
    }

    let linkable_block = LinkableBlock::new(block);
    let inserted = Arc::clone(linkable_block.executed_block());
    assert!(self
        .id_to_block
        .insert(block_id, linkable_block)
        .is_none());
    counters::NUM_BLOCKS_IN_TREE.inc();
    Ok(inserted)
}
/// Registers `qc` for the block it certifies.
///
/// If the certified block has a higher round than the current highest
/// certified block, it becomes the new highest certified block and `qc`
/// the new highest quorum certificate. The certificate is indexed under
/// the block id unless one is already stored for that id. Finally, if the
/// commit info of `qc` has a higher round than that of the current highest
/// commit certificate, `qc` replaces it.
///
/// # Errors
/// Returns an error, without storing anything, if the certified block is
/// not in the tree.
pub(super) fn insert_quorum_cert(
    &mut self, qc: QuorumCert,
) -> anyhow::Result<()> {
    let block_id = qc.certified_block().id();
    let qc = Arc::new(qc);

    // Safety invariant: For any two quorum certificates qc1, qc2 in the
    // block store, qc1 == qc2 || qc1.round != qc2.round
    // The invariant is quadratic but can be maintained in linear time by
    // checking only the incoming certificate against the stored ones.
    precondition!({
        let qc_round = qc.certified_block().round();
        let qc_hash = qc.ledger_info().ledger_info().consensus_data_hash();
        self.id_to_quorum_cert.values().all(|known| {
            known.certified_block().round() != qc_round
                || known.ledger_info().ledger_info().consensus_data_hash()
                    == qc_hash
        })
    });

    let certified_block = match self.get_block(&block_id) {
        Some(block) => block,
        None => bail!("Block {} not found", block_id),
    };
    if certified_block.round() > self.highest_certified_block().round() {
        self.highest_certified_block_id = certified_block.id();
        self.highest_quorum_cert = Arc::clone(&qc);
    }

    // Keep the first certificate seen for a given block id.
    self.id_to_quorum_cert
        .entry(block_id)
        .or_insert_with(|| Arc::clone(&qc));

    let incoming_commit_round = qc.commit_info().round();
    if incoming_commit_round > self.highest_commit_cert.commit_info().round() {
        self.highest_commit_cert = qc;
    }
    Ok(())
}
/// Collects the ids of the blocks that have to go when the root moves to
/// `next_root_id`: the current root and every block reachable from it
/// without passing through `next_root_id`. The block `next_root_id` and
/// its descendants are kept.
///
/// For example, root = B0
/// B0--> B1--> B2
///  ╰--> B3--> B4
///
/// find_blocks_to_prune(B3) returns the ids of B0, B1 and B2, leaving
/// B3--> B4 to become the tree rooted at B3.
///
/// Returns an empty deque when `next_root_id` is already the root.
///
/// Note this function is read-only, use with process_pruned_blocks to do
/// the actual prune.
///
/// # Panics
/// Panics if the root, or a child id recorded on a visited block, is not
/// stored in the tree.
pub(super) fn find_blocks_to_prune(
    &self, next_root_id: HashValue,
) -> VecDeque<HashValue> {
    let mut pruned_ids = VecDeque::new();
    // Nothing to do if this is the root
    if next_root_id == self.root_id {
        return pruned_ids;
    }

    let mut pending = vec![self.linkable_root()];
    while let Some(current) = pending.pop() {
        // Queue every child for pruning, except the new root: its whole
        // subtree survives.
        pending.extend(
            current
                .children()
                .iter()
                .filter(|child_id| **child_id != next_root_id)
                .map(|child_id| {
                    self.get_linkable_block(child_id)
                        .expect("Child must exist in the tree")
                }),
        );
        // Track all the block ids removed
        pruned_ids.push_back(current.id());
    }
    pruned_ids
}
/// Applies the result of `find_blocks_to_prune`: makes `root_id` the new
/// root and queues `newly_pruned_blocks` for removal. The two steps are
/// separate because the caller might be interested in doing extra work in
/// between, e.g. delete from persistent storage.
///
/// Pruned blocks are not necessarily removed right away: their ids are
/// kept in a buffer of at most `max_pruned_blocks_in_mem` entries so that
/// other peers can still retrieve them after they have been committed.
/// Only the oldest ids exceeding that limit are actually removed.
///
/// # Panics
/// Panics if `root_id` is not stored in the tree.
pub(super) fn process_pruned_blocks(
    &mut self, root_id: HashValue,
    mut newly_pruned_blocks: VecDeque<HashValue>,
) {
    assert!(self.block_exists(&root_id));
    // Update the next root
    self.root_id = root_id;
    counters::NUM_BLOCKS_IN_TREE.sub(newly_pruned_blocks.len() as i64);

    // Newest pruned ids go to the back of the buffer; whatever exceeds the
    // limit is evicted from the front (oldest first) and removed for good.
    self.pruned_block_ids.append(&mut newly_pruned_blocks);
    while self.pruned_block_ids.len() > self.max_pruned_blocks_in_mem {
        match self.pruned_block_ids.pop_front() {
            Some(evicted_id) => self.remove_block(evicted_id),
            None => break,
        }
    }
}
/// Returns all the blocks between the root and the given block, including
/// the given block but excluding the root.
/// In case a given block is not the successor of the root, return None.
/// While generally the provided blocks should always belong to the active
/// tree, there might be a race, in which the root of the tree is
/// propagated forward between retrieving the block and getting its path
/// from root (e.g., at proposal generator). Hence, we don't want to panic
/// and prefer to return None instead.
pub(super) fn path_from_root(
&self, block_id: HashValue,
) -> Option<Vec<Arc<ExecutedBlock>>> {
let mut res = vec![];
let mut cur_block_id = block_id;
loop {
match self.get_block(&cur_block_id) {
Some(ref block) if block.round() <= self.root().round() => {
break;
}
Some(block) => {
cur_block_id = block.parent_id();
res.push(block);
}
None => return None,
}
}
// At this point cur_block.round() <= self.root.round()
if cur_block_id != self.root_id {
return None;
}
// Called `.reverse()` to get the chronically increased order.
res.reverse();
Some(res)
}
}
#[cfg(any(test, feature = "fuzzing"))]
#[allow(unused)]
impl BlockTree {
    /// Returns the number of blocks reachable from the root through child
    /// links, the root included.
    ///
    /// # Panics
    /// Panics if the root, or a child id recorded on a visited block, is
    /// not stored in the tree.
    pub(super) fn len(&self) -> usize {
        // Depth-first walk using an explicit stack, counting every block
        // that is visited.
        let mut count = 0;
        let mut stack = vec![self.linkable_root()];
        while let Some(node) = stack.pop() {
            count += 1;
            stack.extend(node.children().iter().map(|child_id| {
                self.get_linkable_block(child_id)
                    .expect("Child must exist in the tree")
            }));
        }
        count
    }

    /// Returns the number of child links in the tree, i.e. one less than
    /// the number of blocks.
    pub(super) fn child_links(&self) -> usize {
        let blocks = self.len();
        blocks - 1
    }

    /// The number of pruned blocks that are still available in memory
    pub(super) fn pruned_blocks_in_mem(&self) -> usize {
        let buffered = &self.pruned_block_ids;
        buffered.len()
    }
}