diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 49bc75393..4bdb0deca 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -2376,6 +2376,9 @@ impl, Cold: ItemStore> HotColdDB self.cold_db.do_atomically(cold_ops)?; } + // In order to reclaim space, we need to compact the freezer DB as well. + self.cold_db.compact()?; + Ok(()) } } diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 62619dd2c..d799bdedd 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -154,25 +154,15 @@ impl KeyValueStore for LevelDB { self.transaction_mutex.lock() } - /// Compact all values in the states and states flag columns. - fn compact(&self) -> Result<(), Error> { - let endpoints = |column: DBColumn| { - ( - BytesKey::from_vec(get_key_for_col(column.as_str(), Hash256::zero().as_bytes())), - BytesKey::from_vec(get_key_for_col( - column.as_str(), - Hash256::repeat_byte(0xff).as_bytes(), - )), - ) - }; - - for (start_key, end_key) in [ - endpoints(DBColumn::BeaconStateTemporary), - endpoints(DBColumn::BeaconState), - endpoints(DBColumn::BeaconStateSummary), - ] { - self.db.compact(&start_key, &end_key); - } + fn compact_column(&self, column: DBColumn) -> Result<(), Error> { + // Use key-size-agnostic keys [] and 0xff..ff with a minimum of 32 bytes to account for + // columns that may change size between sub-databases or schema versions. + let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), &[])); + let end_key = BytesKey::from_vec(get_key_for_col( + column.as_str(), + &vec![0xff; std::cmp::max(column.key_size(), 32)], + )); + self.db.compact(&start_key, &end_key); Ok(()) } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index eacd28d2d..e86689b0c 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -80,8 +80,22 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { /// this method. In future we may implement a safer mandatory locking scheme. fn begin_rw_transaction(&self) -> MutexGuard<()>; - /// Compact the database, freeing space used by deleted items. - fn compact(&self) -> Result<(), Error>; + /// Compact a single column in the database, freeing space used by deleted items. + fn compact_column(&self, column: DBColumn) -> Result<(), Error>; + + /// Compact a default set of columns that are likely to free substantial space. + fn compact(&self) -> Result<(), Error> { + // Compact state and block related columns as they are likely to have the most churn, + // i.e. entries being created and deleted. + for column in [ + DBColumn::BeaconState, + DBColumn::BeaconStateSummary, + DBColumn::BeaconBlock, + ] { + self.compact_column(column)?; + } + Ok(()) + } /// Iterate through all keys and values in a particular column. fn iter_column(&self, column: DBColumn) -> ColumnIter { diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index c2e494dce..302d2c2ad 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -108,7 +108,7 @@ impl KeyValueStore for MemoryStore { self.transaction_mutex.lock() } - fn compact(&self) -> Result<(), Error> { + fn compact_column(&self, _column: DBColumn) -> Result<(), Error> { Ok(()) } } diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 87a1a8bd1..3583d9e27 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -77,7 +77,15 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> { Arg::with_name("freezer") .long("freezer") .help("Inspect the freezer DB rather than the hot DB") - .takes_value(false), + .takes_value(false) + .conflicts_with("blobs-db"), + ) + .arg( + Arg::with_name("blobs-db") + .long("blobs-db") + .help("Inspect the blobs DB rather than the hot DB") + .takes_value(false) + .conflicts_with("freezer"), ) .arg( Arg::with_name("output-dir") @@ -88,6 +96,34 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> { ) } +pub fn compact_cli_app<'a, 'b>() -> App<'a, 'b> { + App::new("compact") + .setting(clap::AppSettings::ColoredHelp) + .about("Compact database manually") + .arg( + Arg::with_name("column") + .long("column") + .value_name("TAG") + .help("3-byte column ID (see `DBColumn`)") + .takes_value(true) + .required(true), + ) + .arg( + Arg::with_name("freezer") + .long("freezer") + .help("Inspect the freezer DB rather than the hot DB") + .takes_value(false) + .conflicts_with("blobs-db"), + ) + .arg( + Arg::with_name("blobs-db") + .long("blobs-db") + .help("Inspect the blobs DB rather than the hot DB") + .takes_value(false) + .conflicts_with("freezer"), + ) +} + pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> { App::new("prune-payloads") .alias("prune_payloads") @@ -162,6 +198,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .subcommand(migrate_cli_app()) .subcommand(version_cli_app()) .subcommand(inspect_cli_app()) + .subcommand(compact_cli_app()) .subcommand(prune_payloads_app()) .subcommand(prune_blobs_app()) .subcommand(prune_states_app()) @@ -251,6 +288,7 @@ pub struct InspectConfig { skip: Option, limit: Option, freezer: bool, + blobs_db: bool, /// Configures where the inspect output should be stored. output_dir: PathBuf, } @@ -261,6 +299,7 @@ fn parse_inspect_config(cli_args: &ArgMatches) -> Result let skip = clap_utils::parse_optional(cli_args, "skip")?; let limit = clap_utils::parse_optional(cli_args, "limit")?; let freezer = cli_args.is_present("freezer"); + let blobs_db = cli_args.is_present("blobs-db"); let output_dir: PathBuf = clap_utils::parse_optional(cli_args, "output-dir")?.unwrap_or_else(PathBuf::new); @@ -270,6 +309,7 @@ fn parse_inspect_config(cli_args: &ArgMatches) -> Result skip, limit, freezer, + blobs_db, output_dir, }) } @@ -277,32 +317,20 @@ fn parse_inspect_config(cli_args: &ArgMatches) -> Result pub fn inspect_db( inspect_config: InspectConfig, client_config: ClientConfig, - runtime_context: &RuntimeContext, - log: Logger, ) -> Result<(), String> { - let spec = runtime_context.eth2_config.spec.clone(); let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); let blobs_path = client_config.get_blobs_db_path(); - let db = HotColdDB::, LevelDB>::open( - &hot_path, - &cold_path, - &blobs_path, - |_, _, _| Ok(()), - client_config.store, - spec, - log, - ) - .map_err(|e| format!("{:?}", e))?; - let mut total = 0; let mut num_keys = 0; let sub_db = if inspect_config.freezer { - &db.cold_db + LevelDB::::open(&cold_path).map_err(|e| format!("Unable to open freezer DB: {e:?}"))? + } else if inspect_config.blobs_db { + LevelDB::::open(&blobs_path).map_err(|e| format!("Unable to open blobs DB: {e:?}"))? } else { - &db.hot_db + LevelDB::::open(&hot_path).map_err(|e| format!("Unable to open hot DB: {e:?}"))? }; let skip = inspect_config.skip.unwrap_or(0); @@ -385,6 +413,50 @@ pub fn inspect_db( Ok(()) } +pub struct CompactConfig { + column: DBColumn, + freezer: bool, + blobs_db: bool, +} + +fn parse_compact_config(cli_args: &ArgMatches) -> Result { + let column = clap_utils::parse_required(cli_args, "column")?; + let freezer = cli_args.is_present("freezer"); + let blobs_db = cli_args.is_present("blobs-db"); + Ok(CompactConfig { + column, + freezer, + blobs_db, + }) +} + +pub fn compact_db( + compact_config: CompactConfig, + client_config: ClientConfig, + log: Logger, +) -> Result<(), Error> { + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); + let column = compact_config.column; + + let (sub_db, db_name) = if compact_config.freezer { + (LevelDB::::open(&cold_path)?, "freezer_db") + } else if compact_config.blobs_db { + (LevelDB::::open(&blobs_path)?, "blobs_db") + } else { + (LevelDB::::open(&hot_path)?, "hot_db") + }; + info!( + log, + "Compacting database"; + "db" => db_name, + "column" => ?column + ); + sub_db.compact_column(column)?; + Ok(()) +} + pub struct MigrateConfig { to: SchemaVersion, } @@ -538,7 +610,10 @@ pub fn prune_states( // Check that the user has confirmed they want to proceed. if !prune_config.confirm { match db.get_anchor_info() { - Some(anchor_info) if anchor_info.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN => { + Some(anchor_info) + if anchor_info.state_lower_limit == 0 + && anchor_info.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN => + { info!(log, "States have already been pruned"); return Ok(()); } @@ -586,7 +661,11 @@ pub fn run(cli_args: &ArgMatches<'_>, env: Environment) -> Result } ("inspect", Some(cli_args)) => { let inspect_config = parse_inspect_config(cli_args)?; - inspect_db(inspect_config, client_config, &context, log) + inspect_db::(inspect_config, client_config) + } + ("compact", Some(cli_args)) => { + let compact_config = parse_compact_config(cli_args)?; + compact_db::(compact_config, client_config, log).map_err(format_err) } ("prune-payloads", Some(_)) => { prune_payloads(client_config, &context, log).map_err(format_err)