// Copyright 2020 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
use crate::{
block_data_manager::BlockDataManager, channel::Channel,
consensus::consensus_inner::ConsensusGraphInner, Notifications,
};
use cfx_parameters::{consensus::*, light::BLAME_CHECK_OFFSET};
use primitives::BlockHeader;
use std::{collections::VecDeque, sync::Arc};
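/// Verifies the blame status of pivot-chain headers on behalf of light
/// nodes. `ConsensusNewBlockHandler` feeds in pivot epochs one-by-one; for
/// each epoch, the verifier locates the first trusted header (the witness)
/// and reports the result to the light node sync layer through
/// `blame_sender`. Verification lags `BLAME_CHECK_OFFSET` epochs behind the
/// tip so that enough headers are available to make the decision.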
pub struct BlameVerifier {
/// Channel used to send verified headers to the light sync layer.
/// Each element is a (height, maybe_witness) pair.
blame_sender: Arc<Channel<(u64, Option<u64>)>>,
/// Block data manager.
data_man: Arc<BlockDataManager>,
/// Last epoch received from ConsensusNewBlockHandler.
last_epoch_received: u64,
/// Next epoch we plan to process.
next_epoch_to_process: u64,
/// Queue of epochs that need to be re-processed.
queue: VecDeque<u64>,
}
impl BlameVerifier {
pub fn new(
data_man: Arc<BlockDataManager>, notifications: Arc<Notifications>,
) -> Self {
let blame_sender = notifications.blame_verification_results.clone();
let stable_hash = data_man.get_cur_consensus_era_stable_hash();
let stable_height = data_man
.block_header_by_hash(&stable_hash)
.expect("Current era stable header should exist")
.height();
let start_height = stable_height.saturating_sub(BLAME_CHECK_OFFSET);
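// e.g. with a stable height of 10000 and BLAME_CHECK_OFFSET = 30
// (illustrative value), verification starts from height 9970;
// `saturating_sub` clamps the start to 0 on very young chains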
debug!("Starting Blame Verifier from height {}", start_height);
let last_epoch_received = start_height;
let next_epoch_to_process = start_height + 1;
let queue = VecDeque::new();
Self {
blame_sender,
data_man,
last_epoch_received,
next_epoch_to_process,
queue,
}
}
fn header_from_height(
&self, inner: &ConsensusGraphInner, height: u64,
) -> Option<Arc<BlockHeader>> {
let pivot_index = inner.height_to_pivot_index(height);
let pivot_arena_index = inner.pivot_chain[pivot_index];
let pivot_hash = inner.arena[pivot_arena_index].hash;
self.data_man.block_header_by_hash(&pivot_hash)
}
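/// Find the height of the first trusted header at or above `height` on the
/// current pivot chain, i.e. the nearest header that can act as a witness.
/// Returns `None` if `height` precedes the current era genesis (no longer
/// maintained in memory) or if no trusted header can be determined yet.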
fn first_trusted_header_starting_from(
&self, inner: &ConsensusGraphInner, height: u64,
) -> Option<u64> {
// check if `height` is available in memory
let pivot_index = match height {
h if h < inner.get_cur_era_genesis_height() => return None,
h => inner.height_to_pivot_index(h),
};
inner
.find_first_trusted_starting_from(
pivot_index,
Some(1000), /* blame_bound */
10, /* min_vote_count */
)
.map(|index| inner.pivot_index_to_height(index))
}
/// Add `epoch` to the queue and start processing it.
pub fn process(&mut self, inner: &ConsensusGraphInner, epoch: u64) {
// we need to keep an offset so that we have
// enough headers to calculate the blame ratio
// TODO(thegaram): choose better value for `BLAME_CHECK_OFFSET`
let epoch = match epoch {
e if e < BLAME_CHECK_OFFSET => return,
e => e - BLAME_CHECK_OFFSET,
};
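// e.g. with BLAME_CHECK_OFFSET = 30 (illustrative value), receiving
// epoch 100 triggers verification of epoch 70; epochs below the offset
// are ignored as there is nothing old enough to verify yet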
trace!("Blame verification received epoch {:?}", epoch);
self.queue.push_back(epoch);
loop {
// process while there are unprocessed epochs
let epoch = match self.queue.pop_front() {
Some(e) => e,
None => break,
};
// process until we encounter an epoch for which
// there is no blame information available
if !self.check(inner, epoch) {
break;
}
}
}
/// Check the blame corresponding to `epoch` and send the verification
/// results to the light node sync layer.
/// Returns false if the epoch cannot be processed, true otherwise.
#[rustfmt::skip]
pub fn check(&mut self, inner: &ConsensusGraphInner, epoch: u64) -> bool {
trace!(
"Blame verification is processing epoch {:?} (last_epoch_received = {}, next_epoch_to_process = {})",
epoch, self.last_epoch_received, self.next_epoch_to_process
);
match epoch {
// pivot chain reorg
//
// --- ---
// .- | D | <--- | E | <--- ...
// --- --- --- | --- ---
// | A | <--- | B | <--- | C | <-*
// --- --- --- | ---
// .- | F | <--- ...
// ---
//
// example (BLAME_CHECK_OFFSET = 2):
// check is called with C, D, E, C, F
// we will process epochs A, B, C, A, B
//
// TODO(thegaram): can a fork change the blame status of a header?
e if e <= self.last_epoch_received => {
// re-process from fork point
debug!("Chain reorg ({} --> {}), re-executing", self.last_epoch_received, e);
self.last_epoch_received = e;
self.next_epoch_to_process = e;
}
// sanity check: epochs are sent in order, one-by-one
e if e > self.last_epoch_received + 1 => {
error!(
"Unexpected epoch number: e = {}, last_epoch_received = {}",
e, self.last_epoch_received
);
panic!("epochs must be received in order, one-by-one");
}
// epoch already handled through witness
//
// blames
// ............
// v |
// --- --- --- ---
// | A | <--- | B | <--- | C | <--- | D | <--- ...
// --- --- --- ---
//
// we receive B and proceed to request all blamed headers (B, C);
// set last-epoch-received to B and next-epoch-to-process to D;
// we will skip C in the next iteration (it is covered already).
e if e < self.next_epoch_to_process => {
debug!("Epoch already covered, skipping (e = {}, next_epoch_to_process = {})", e, self.next_epoch_to_process);
self.last_epoch_received = e;
return true;
}
// sanity check: no epochs are skipped
e if e > self.next_epoch_to_process => {
error!("Unexpected epoch number: e = {}, next_epoch_to_process = {}", e, self.next_epoch_to_process);
panic!("no epochs may be skipped");
}
// in most cases, we will iterate over the pivot chain sequentially;
// at each step, the epoch we receive (e) will be the same as the
// next-epoch-to-process (nep)
//
// e
// --- --- --- ---
// | | <--- | | <--- | | <--- | | <--- ...
// --- --- --- ---
// ler nep
// e == last_epoch_received + 1
// e == next_epoch_to_process
e => {
self.last_epoch_received = e;
}
}
// convert epoch number into pivot height
let height = epoch + DEFERRED_STATE_EPOCH_COUNT;
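// due to deferred execution, the blame verdict for `epoch` is carried by
// the pivot header DEFERRED_STATE_EPOCH_COUNT heights later, so that is
// the header whose witness we need to find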
// check blame
debug!(
"Finding witness for header at height {} (epoch {})...",
height, epoch
);
match self.first_trusted_header_starting_from(inner, height) {
// no witness found
None => {
warn!(
"No witness found for epoch {} (height {}); best_epoch_number = {}",
epoch,
height,
inner.best_epoch_number(),
);
// this can happen in two cases:
//
// (1) we are lagging behind so much that `height`
// is no longer maintained in memory.
// --> consensus and blame verification are
// in sync so this should not happen.
//
// (2) there are too many blamed blocks on the
// `BLAME_CHECK_OFFSET` suffix of the pivot
// chain so we cannot reliably determine the
// witness.
// --> we will retry during the next invocation of `check`
//
// example for (2):
//
// blame blame blame
// ...... ...... ......
// / | / | / |
// --- --- --- ---
// | A | <-- | B | <-- | C | <-- | D | <--- ...
// --- --- --- ---
//
// if we have such a section of the pivot chain of length
// `BLAME_CHECK_OFFSET` or more, then at the point of
// receiving A we might not be able to find the corresponding
// witness. in this case, we store this epoch for processing
// later. we assume here that the pivot chain will eventually
// normalize and we will be able to find the witness later.
// save for further processing and terminate
self.queue.push_front(epoch);
return false;
}
// header is not blamed (i.e. it is its own witness)
Some(w) if w == height => {
trace!("Epoch {} (height {}) is NOT blamed", epoch, height);
let header = self
.header_from_height(inner, height)
.expect("Pivot header exists");
// normal case: blaming blocks have been covered previously,
// so this block must be non-blaming
if header.blame() == 0 {
// send non-blaming header
self.blame_sender.send((height, None));
}
// special case
//
// blame
// ........... --- --- ---
// v | .- | E | <--- | F | <--- | G | <--- ...
// --- --- --- --- | --- --- ---
// | A | <--- | B | <--- | C | <--- | D | <-*
// --- --- --- --- | ---
// .- | H | <--- ...
// ---
//
// example (BLAME_CHECK_OFFSET = 2)
// check is called with C, D, E, F, G, D, H
// we will process epochs A, B, C, D, E, B, C
//
// after the chain reorg, we start re-executing from B;
// B was already covered in A's iteration, but it is blaming
// --> we do nothing and skip it
else {
// EMPTY
}
// continue from the next header on the pivot chain
self.next_epoch_to_process = epoch + 1;
}
// header is blamed
Some(w) => {
debug!(
"Epoch {} (height {}) is blamed, requesting witness {}",
epoch, height, w
);
// this request covers all blamed headers:
// [height, height + 1, ..., w]
self.blame_sender.send((height, Some(w)));
// skip all subsequent headers requested
assert!(w > DEFERRED_STATE_EPOCH_COUNT);
let witness_epoch = w - DEFERRED_STATE_EPOCH_COUNT;
self.next_epoch_to_process = witness_epoch + 1;
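// e.g. with DEFERRED_STATE_EPOCH_COUNT = 5 (illustrative value): for
// epoch 95 (height 100) with witness w = 103, the request covers
// heights 100..=103; the witness corresponds to epoch 98, so we
// resume processing at epoch 99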
}
}
true
}
}
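// A hedged usage sketch (the driver loop below is illustrative; only
// `BlameVerifier::new`, `process` and `check` are defined in this file).
// Per the field docs above, `ConsensusNewBlockHandler` feeds pivot epochs
// in one-by-one, and the light node sync layer consumes the results:
//
//     let mut verifier = BlameVerifier::new(data_man, notifications.clone());
//     for epoch in first_epoch..=best_epoch {
//         verifier.process(&inner, epoch); // epochs must arrive in order
//     }
//
//     // receiver side: notifications.blame_verification_results yields
//     // (height, maybe_witness) pairs for the light sync layer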