1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0
// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
//! This module provides `Pruner` which manages a thread pruning old data in the
//! background and is meant to be triggered by other threads as they commit new
//! data to the DB.
use crate::{
metrics::{
DIEM_STORAGE_OTHER_TIMERS_SECONDS,
DIEM_STORAGE_PRUNER_LEAST_READABLE_STATE_VERSION,
DIEM_STORAGE_PRUNE_WINDOW,
},
schema::{
jellyfish_merkle_node::JellyfishMerkleNodeSchema,
stale_node_index::StaleNodeIndexSchema,
},
};
use anyhow::Result;
use diem_infallible::Mutex;
use diem_jellyfish_merkle::StaleNodeIndex;
use diem_logger::prelude::*;
use diem_types::transaction::Version;
use schemadb::{ReadOptions, SchemaBatch, SchemaIterator, DB};
use std::{
iter::Peekable,
sync::{
atomic::{AtomicU64, Ordering},
mpsc::{channel, Receiver, Sender},
Arc,
},
thread::{sleep, JoinHandle},
time::{Duration, Instant},
};
/// The `Pruner` is meant to be part of a `DiemDB` instance and runs in the
/// background to prune old data.
///
/// It creates a worker thread on construction and joins it on destruction. When
/// destructed, it quits the worker thread eagerly without waiting for all
/// pending work to be done.
///
/// Commands reach the worker over an mpsc channel whose sending half is kept
/// behind a `Mutex` and locked for every send (see `wake()` and `drop()`).
#[derive(Debug)]
pub(crate) struct Pruner {
    /// Other than the latest version, how many historical versions to keep
    /// being readable. For example, this being 0 means keep only the
    /// latest version. `wake()` only issues a prune command once the latest
    /// version exceeds this value.
    historical_versions_to_keep: u64,
    /// The worker thread handle, created upon Pruner instance construction and
    /// joined upon its destruction. It only becomes `None` after joined in
    /// `drop()`.
    worker_thread: Option<JoinHandle<()>>,
    /// The sender side of the channel talking to the worker thread.
    command_sender: Mutex<Sender<Command>>,
    /// (For tests) A way for the worker thread to inform the `Pruner` the
    /// pruning progress. If it sets this atomic value to `V`, all versions
    /// before `V` can no longer be accessed.
    // Only read by the `#[cfg(test)]` `wake_and_wait()`, hence the allow for
    // non-test builds.
    #[allow(dead_code)]
    worker_progress: Arc<AtomicU64>,
}
impl Pruner {
    /// Spawns the `pos_ledger_db_pruner` worker thread and returns the handle
    /// used to feed it pruning commands.
    ///
    /// The prune window (`historical_versions_to_keep`) is also published to
    /// the `DIEM_STORAGE_PRUNE_WINDOW` metric.
    ///
    /// # Panics
    /// Panics if the OS refuses to spawn the worker thread.
    pub fn new(db: Arc<DB>, historical_versions_to_keep: u64) -> Self {
        let (sender, receiver) = channel();
        let progress = Arc::new(AtomicU64::new(0));
        let progress_for_worker = Arc::clone(&progress);
        DIEM_STORAGE_PRUNE_WINDOW.set(historical_versions_to_keep as i64);

        let handle = std::thread::Builder::new()
            .name("pos_ledger_db_pruner".into())
            .spawn(move || {
                Worker::new(db, receiver, progress_for_worker).work()
            })
            .expect("Creating pruner thread should succeed.");

        Self {
            historical_versions_to_keep,
            worker_thread: Some(handle),
            command_sender: Mutex::new(sender),
            worker_progress: progress,
        }
    }

    /// Returns the least readable version to request for `latest_version`,
    /// or `None` when nothing older than the prune window exists yet.
    fn prune_target(&self, latest_version: Version) -> Option<Version> {
        latest_version
            .checked_sub(self.historical_versions_to_keep)
            .filter(|&target| target > 0)
    }

    /// Asks the worker to prune everything older than the prune window
    /// relative to `latest_version`; a no-op while the window is not full.
    ///
    /// # Panics
    /// Panics if the worker side of the channel has gone away.
    pub fn wake(&self, latest_version: Version) {
        if let Some(least_readable_version) = self.prune_target(latest_version)
        {
            self.command_sender
                .lock()
                .send(Command::Prune {
                    least_readable_version,
                })
                .expect("Receiver should not destruct prematurely.");
        }
    }

    /// (For tests only.) Calls `wake()` and then polls the worker's progress
    /// until it reaches the requested target.
    ///
    /// # Errors
    /// Fails if the worker does not get there within 10 seconds.
    #[cfg(test)]
    pub fn wake_and_wait(&self, latest_version: Version) -> Result<()> {
        self.wake(latest_version);
        let least_readable_version = match self.prune_target(latest_version) {
            Some(target) => target,
            None => return Ok(()),
        };
        // Assuming no big pruning chunks will be issued by a test.
        const TIMEOUT: Duration = Duration::from_secs(10);
        let deadline = Instant::now() + TIMEOUT;
        while Instant::now() < deadline {
            let reached = self.worker_progress.load(Ordering::Relaxed);
            if reached >= least_readable_version {
                return Ok(());
            }
            sleep(Duration::from_millis(1));
        }
        anyhow::bail!("Timeout waiting for pruner worker.");
    }
}
impl Drop for Pruner {
    /// Tells the worker to quit, then blocks until its thread has exited.
    ///
    /// Pending pruning work is abandoned rather than finished.
    fn drop(&mut self) {
        let quit_sent = self.command_sender.lock().send(Command::Quit);
        quit_sent.expect("Receiver should not destruct.");

        let handle =
            self.worker_thread.take().expect("Worker thread must exist.");
        handle.join().expect("Worker thread should join peacefully.");
    }
}
/// Messages sent from the `Pruner` to its worker thread.
enum Command {
    /// Ends the worker loop; sent by `Pruner::drop()`.
    Quit,
    /// Raises the worker's pruning target to `least_readable_version`; sent
    /// by `Pruner::wake()`. The worker ignores targets that are not greater
    /// than the one it already has.
    Prune { least_readable_version: Version },
}
/// State owned by the background pruning thread spawned in `Pruner::new()`.
struct Worker {
    /// DB holding the stale node index and the Jellyfish Merkle nodes.
    db: Arc<DB>,
    /// Receiving end of the command channel fed by the `Pruner`.
    command_receiver: Receiver<Command>,
    /// Highest least-readable version requested so far via `Command::Prune`,
    /// or the progress found in the DB by `initialize()`.
    target_least_readable_version: Version,
    /// Keeps a record of the pruning progress. If this equals to version `V`,
    /// we know versions smaller than `V` are no longer readable.
    /// This being an atomic value is to communicate the info with the Pruner
    /// thread (for tests).
    least_readable_version: Arc<AtomicU64>,
    /// Indicates if there's NOT any pending work to do currently, to hint
    /// `Self::receive_commands()` to `recv()` blocking-ly.
    blocking_recv: bool,
    /// Inclusive start of the next range delete that `maybe_purge_index()`
    /// issues on the stale node index (starts at 0).
    index_min_nonpurged_version: Version,
    /// When `maybe_purge_index()` last purged the index (initially the
    /// worker's creation time).
    index_purged_at: Instant,
}
impl Worker {
    /// Upper bound on the number of distinct stale-since versions pruned per
    /// `prune_state()` call, so pending commands (notably `Command::Quit`)
    /// are checked between reasonably small batches.
    const MAX_VERSIONS_TO_PRUNE_PER_BATCH: usize = 100;

    /// Creates a worker that reads commands from `command_receiver` and
    /// publishes its progress through `least_readable_version`.
    ///
    /// The target and index bookkeeping start at 0. `work()` calls
    /// `initialize()` to seed the progress from the DB before serving
    /// commands.
    fn new(
        db: Arc<DB>, command_receiver: Receiver<Command>,
        least_readable_version: Arc<AtomicU64>,
    ) -> Self {
        Self {
            db,
            command_receiver,
            least_readable_version,
            target_least_readable_version: 0,
            blocking_recv: true,
            index_min_nonpurged_version: 0,
            index_purged_at: Instant::now(),
        }
    }

    /// Runs the worker loop until `Command::Quit` is received.
    ///
    /// Each iteration drains pending commands, then prunes at most
    /// `MAX_VERSIONS_TO_PRUNE_PER_BATCH` versions towards the current target.
    fn work(mut self) {
        self.initialize();
        while self.receive_commands() {
            // Process a reasonably small batch of work before trying to receive
            // commands again, in case `Command::Quit` is received
            // (that's when we should quit.)
            let previous_least_readable_version =
                self.least_readable_version.load(Ordering::Relaxed);
            match prune_state(
                Arc::clone(&self.db),
                previous_least_readable_version,
                self.target_least_readable_version,
                Self::MAX_VERSIONS_TO_PRUNE_PER_BATCH,
            ) {
                Ok(least_readable_version) => {
                    self.record_progress(least_readable_version);
                    // Make next recv() blocking if all done. A batch that made
                    // no progress also counts as done. Given a batch limit
                    // above 1, `prune_state()` only returns its input version
                    // when no stale index is left between it and the target.
                    // Polling again without a new command would just
                    // busy-spin this thread.
                    self.blocking_recv = least_readable_version
                        == self.target_least_readable_version
                        || least_readable_version
                            == previous_least_readable_version;
                    // Try to purge the log.
                    if let Err(e) = self.maybe_purge_index() {
                        diem_warn!(
                            error = ?e,
                            "Failed purging state node index, ignored.",
                        );
                    }
                }
                Err(e) => {
                    diem_error!(
                        error = ?e,
                        "Error pruning stale state nodes.",
                    );
                    // On error, stop retrying vigorously by making next recv()
                    // blocking.
                    self.blocking_recv = true;
                }
            }
        }
    }

    /// Find out the first undeleted item in the stale node index.
    ///
    /// Seeking from the beginning (version 0) is potentially costly, we do it
    /// once upon worker thread start, record the progress and seek from
    /// that position afterwards. Retries every second until the lookup
    /// succeeds.
    fn initialize(&mut self) {
        loop {
            match self.get_least_readable_version() {
                Ok(least_readable_version) => {
                    diem_info!(
                        least_readable_version = least_readable_version,
                        "[state pruner worker] initialized."
                    );
                    self.target_least_readable_version = least_readable_version;
                    self.record_progress(least_readable_version);
                    return;
                }
                Err(e) => {
                    diem_error!(
                        error = ?e,
                        "[state pruner worker] Error on first seek. Retrying in 1 second.",
                    );
                    sleep(Duration::from_secs(1));
                }
            }
        }
    }

    /// Returns one less than the `stale_since_version` of the first entry in
    /// the stale node index, or 0 when the index is empty.
    ///
    /// # Panics
    /// Panics if the first entry claims to be stale since version 0.
    fn get_least_readable_version(&self) -> Result<Version> {
        let mut iter = self
            .db
            .iter::<StaleNodeIndexSchema>(ReadOptions::default())?;
        iter.seek_to_first();
        Ok(iter.next().transpose()?.map_or(0, |(index, _)| {
            index
                .stale_since_version
                .checked_sub(1)
                .expect("Nothing is stale since version 0.")
        }))
    }

    /// Publishes the progress to the shared atomic and to the
    /// least-readable-version metric.
    fn record_progress(&mut self, least_readable_version: Version) {
        self.least_readable_version
            .store(least_readable_version, Ordering::Relaxed);
        DIEM_STORAGE_PRUNER_LEAST_READABLE_STATE_VERSION
            .set(least_readable_version as i64);
    }

    /// Tries to receive all pending commands, blocking waits for the next
    /// command if no work needs to be done, otherwise quits with `true` to
    /// allow the outer loop to do some work before getting back here.
    ///
    /// Returns `false` if `Command::Quit` is received, to break the outer loop
    /// and let `work()` return.
    fn receive_commands(&mut self) -> bool {
        loop {
            let command = if self.blocking_recv {
                // Worker has nothing to do, blocking wait for the next command.
                self.command_receiver
                    .recv()
                    .expect("Sender should not destruct prematurely.")
            } else {
                // Worker has pending work to do, non-blocking recv.
                match self.command_receiver.try_recv() {
                    Ok(command) => command,
                    // Channel has drained, yield control to the outer loop.
                    Err(_) => return true,
                }
            };
            match command {
                // On `Command::Quit` inform the outer loop to quit by returning
                // `false`.
                Command::Quit => return false,
                Command::Prune {
                    least_readable_version,
                } => {
                    if least_readable_version
                        > self.target_least_readable_version
                    {
                        self.target_least_readable_version =
                            least_readable_version;
                        // Switch to non-blocking to allow some work to be done
                        // after the channel has drained.
                        self.blocking_recv = false;
                    }
                }
            }
        }
    }

    /// Purge the stale node index so that after restart not too much already
    /// pruned stuff is dealt with again (although no harm is done deleting
    /// those then non-existent things.)
    ///
    /// We issue (range) deletes on the index only periodically instead of after
    /// every pruning batch to avoid sending too many deletions to the DB,
    /// which takes disk space and slows it down.
    ///
    /// # Errors
    /// Propagates a failed range delete; the purge bookkeeping is then left
    /// unchanged so the next call retries the same range.
    fn maybe_purge_index(&mut self) -> Result<()> {
        const MIN_INTERVAL: Duration = Duration::from_secs(60);
        const MIN_VERSIONS: u64 = 60000;
        // A deletion is issued at most once in one minute and when the pruner
        // has progressed by at least 60000 versions (assuming the
        // pruner deletes as slow as 1000 versions per second,
        // this imposes at most one minute of work in vain after restarting.)
        let now = Instant::now();
        if now - self.index_purged_at > MIN_INTERVAL {
            let least_readable_version =
                self.least_readable_version.load(Ordering::Relaxed);
            // Right after a purge `index_min_nonpurged_version` is one past
            // `least_readable_version`. Saturate instead of underflowing,
            // which panics in debug builds, if no progress was made since.
            let unpurged_versions = (least_readable_version + 1)
                .saturating_sub(self.index_min_nonpurged_version);
            if unpurged_versions > MIN_VERSIONS {
                let new_min_non_purged_version = least_readable_version + 1;
                self.db.range_delete::<StaleNodeIndexSchema, Version>(
                    &self.index_min_nonpurged_version,
                    &new_min_non_purged_version, // end is exclusive
                )?;
                self.index_min_nonpurged_version = new_min_non_purged_version;
                self.index_purged_at = now;
            }
        }
        Ok(())
    }
}
/// Walks the stale node index from a starting version and yields, per item,
/// all indices that share one `stale_since_version`. Iteration stops at the
/// first version greater than `target_least_readable_version`.
///
/// NOTE(review): grouping assumes the index is ordered by
/// `stale_since_version`, as the version-keyed `seek` in `new()` implies.
struct StaleNodeIndicesByVersionIterator<'a> {
    /// Underlying index iterator; peekable so the end of a group can be
    /// detected without consuming the next group's first entry.
    inner: Peekable<SchemaIterator<'a, StaleNodeIndexSchema>>,
    /// Inclusive upper bound on the `stale_since_version` of yielded groups.
    target_least_readable_version: Version,
}
impl<'a> StaleNodeIndicesByVersionIterator<'a> {
    /// Opens an iterator over the stale node index positioned at
    /// `least_readable_version`.
    ///
    /// # Errors
    /// Fails if the DB iterator cannot be created or positioned.
    fn new(
        db: &'a DB, least_readable_version: Version,
        target_least_readable_version: Version,
    ) -> Result<Self> {
        let mut schema_iter =
            db.iter::<StaleNodeIndexSchema>(ReadOptions::default())?;
        schema_iter.seek(&least_readable_version)?;
        let inner = schema_iter.peekable();
        Ok(Self {
            inner,
            target_least_readable_version,
        })
    }

    /// Returns the next group of indices sharing one `stale_since_version`,
    /// or `None` once the index is exhausted or the next version is beyond
    /// the target.
    ///
    /// # Errors
    /// Propagates any error produced by the underlying DB iterator.
    fn next_result(&mut self) -> Result<Option<Vec<StaleNodeIndex>>> {
        let (first, _) = match self.inner.next().transpose()? {
            Some(entry) => entry,
            None => return Ok(None),
        };
        let version = first.stale_since_version;
        if version > self.target_least_readable_version {
            return Ok(None);
        }

        let mut group = vec![first];
        loop {
            let continues_group = match self.inner.peek() {
                None => false,
                Some(Ok((next_index, _))) => {
                    next_index.stale_since_version == version
                }
                // Pull the error through `next()` below so it is returned.
                Some(Err(_)) => true,
            };
            if !continues_group {
                break;
            }
            let (index, _) = self
                .inner
                .next()
                .transpose()?
                .expect("Should be Some.");
            group.push(index);
        }
        Ok(Some(group))
    }
}
impl<'a> Iterator for StaleNodeIndicesByVersionIterator<'a> {
    type Item = Result<Vec<StaleNodeIndex>>;

    /// Adapts `next_result()` to the `Iterator` protocol: errors become
    /// `Some(Err(_))` items and the end of iteration becomes `None`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.next_result() {
            Ok(Some(group)) => Some(Ok(group)),
            Ok(None) => None,
            Err(e) => Some(Err(e)),
        }
    }
}
/// Deletes the Jellyfish Merkle nodes listed in the stale node index with a
/// `stale_since_version` in
/// `[least_readable_version, target_least_readable_version]`. Each call
/// handles at most `max_versions` distinct stale-since versions.
///
/// Returns the new least readable version:
/// * If fewer than `max_versions` versions were found, the scan ran past the
///   last qualifying entry. Everything up to the target is therefore pruned,
///   and `target_least_readable_version` is returned (never less than
///   `least_readable_version`). This keeps callers from re-scanning forever
///   when some versions have no stale nodes.
/// * Otherwise, returns the `stale_since_version` of the last pruned group,
///   from which the next call should continue.
/// * With `max_versions == 0` nothing is done and `least_readable_version`
///   is returned.
///
/// The index entries themselves are not removed here.
///
/// # Errors
/// Propagates errors from reading the index or writing the deletion batch.
pub fn prune_state(
    db: Arc<DB>, least_readable_version: Version,
    target_least_readable_version: Version, max_versions: usize,
) -> Result<Version> {
    let groups = StaleNodeIndicesByVersionIterator::new(
        &db,
        least_readable_version,
        target_least_readable_version,
    )?
    .take(max_versions) // Iterator<Item = Result<Vec<StaleNodeIndex>>>
    .collect::<Result<Vec<_>>>()?; // now Vec<Vec<StaleNodeIndex>>
    // Getting fewer groups than asked for means the iterator ran out, i.e. no
    // stale index is left in [least_readable_version, target].
    let reached_target = groups.len() < max_versions;
    let indices = groups.into_iter().flatten().collect::<Vec<_>>();

    let pruned_through = if indices.is_empty() {
        least_readable_version
    } else {
        let _timer = DIEM_STORAGE_OTHER_TIMERS_SECONDS
            .with_label_values(&["pruner_commit"])
            .start_timer();
        let last_stale_since_version =
            indices.last().expect("Should exist.").stale_since_version;
        let mut batch = SchemaBatch::new();
        indices.into_iter().try_for_each(|index| {
            batch.delete::<JellyfishMerkleNodeSchema>(&index.node_key)
        })?;
        db.write_schemas(batch, false)?;
        last_stale_since_version
    };

    Ok(if reached_target {
        pruned_through.max(target_least_readable_version)
    } else {
        pruned_through
    })
}
#[cfg(test)]
mod test;