mirror of
https://gitlab.com/pulsechaincom/lighthouse-pulse.git
synced 2024-12-21 19:20:40 +00:00
Improve database compaction and prune-states
(#5142)
* Fix no-op state prune check * Compact freezer DB after pruning * Refine DB compaction * Add blobs-db options to inspect/compact * Better key size * Fix compaction end key
This commit is contained in:
parent
e470596715
commit
6f442f2bb8
@ -2376,6 +2376,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
self.cold_db.do_atomically(cold_ops)?;
|
self.cold_db.do_atomically(cold_ops)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// In order to reclaim space, we need to compact the freezer DB as well.
|
||||||
|
self.cold_db.compact()?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -154,25 +154,15 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
|
|||||||
self.transaction_mutex.lock()
|
self.transaction_mutex.lock()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Compact all values in the states and states flag columns.
|
fn compact_column(&self, column: DBColumn) -> Result<(), Error> {
|
||||||
fn compact(&self) -> Result<(), Error> {
|
// Use key-size-agnostic keys [] and 0xff..ff with a minimum of 32 bytes to account for
|
||||||
let endpoints = |column: DBColumn| {
|
// columns that may change size between sub-databases or schema versions.
|
||||||
(
|
let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), &[]));
|
||||||
BytesKey::from_vec(get_key_for_col(column.as_str(), Hash256::zero().as_bytes())),
|
let end_key = BytesKey::from_vec(get_key_for_col(
|
||||||
BytesKey::from_vec(get_key_for_col(
|
column.as_str(),
|
||||||
column.as_str(),
|
&vec![0xff; std::cmp::max(column.key_size(), 32)],
|
||||||
Hash256::repeat_byte(0xff).as_bytes(),
|
));
|
||||||
)),
|
self.db.compact(&start_key, &end_key);
|
||||||
)
|
|
||||||
};
|
|
||||||
|
|
||||||
for (start_key, end_key) in [
|
|
||||||
endpoints(DBColumn::BeaconStateTemporary),
|
|
||||||
endpoints(DBColumn::BeaconState),
|
|
||||||
endpoints(DBColumn::BeaconStateSummary),
|
|
||||||
] {
|
|
||||||
self.db.compact(&start_key, &end_key);
|
|
||||||
}
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,8 +80,22 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
|
|||||||
/// this method. In future we may implement a safer mandatory locking scheme.
|
/// this method. In future we may implement a safer mandatory locking scheme.
|
||||||
fn begin_rw_transaction(&self) -> MutexGuard<()>;
|
fn begin_rw_transaction(&self) -> MutexGuard<()>;
|
||||||
|
|
||||||
/// Compact the database, freeing space used by deleted items.
|
/// Compact a single column in the database, freeing space used by deleted items.
|
||||||
fn compact(&self) -> Result<(), Error>;
|
fn compact_column(&self, column: DBColumn) -> Result<(), Error>;
|
||||||
|
|
||||||
|
/// Compact a default set of columns that are likely to free substantial space.
|
||||||
|
fn compact(&self) -> Result<(), Error> {
|
||||||
|
// Compact state and block related columns as they are likely to have the most churn,
|
||||||
|
// i.e. entries being created and deleted.
|
||||||
|
for column in [
|
||||||
|
DBColumn::BeaconState,
|
||||||
|
DBColumn::BeaconStateSummary,
|
||||||
|
DBColumn::BeaconBlock,
|
||||||
|
] {
|
||||||
|
self.compact_column(column)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Iterate through all keys and values in a particular column.
|
/// Iterate through all keys and values in a particular column.
|
||||||
fn iter_column<K: Key>(&self, column: DBColumn) -> ColumnIter<K> {
|
fn iter_column<K: Key>(&self, column: DBColumn) -> ColumnIter<K> {
|
||||||
|
@ -108,7 +108,7 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
|
|||||||
self.transaction_mutex.lock()
|
self.transaction_mutex.lock()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn compact(&self) -> Result<(), Error> {
|
fn compact_column(&self, _column: DBColumn) -> Result<(), Error> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -77,7 +77,15 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> {
|
|||||||
Arg::with_name("freezer")
|
Arg::with_name("freezer")
|
||||||
.long("freezer")
|
.long("freezer")
|
||||||
.help("Inspect the freezer DB rather than the hot DB")
|
.help("Inspect the freezer DB rather than the hot DB")
|
||||||
.takes_value(false),
|
.takes_value(false)
|
||||||
|
.conflicts_with("blobs-db"),
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("blobs-db")
|
||||||
|
.long("blobs-db")
|
||||||
|
.help("Inspect the blobs DB rather than the hot DB")
|
||||||
|
.takes_value(false)
|
||||||
|
.conflicts_with("freezer"),
|
||||||
)
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("output-dir")
|
Arg::with_name("output-dir")
|
||||||
@ -88,6 +96,34 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn compact_cli_app<'a, 'b>() -> App<'a, 'b> {
|
||||||
|
App::new("compact")
|
||||||
|
.setting(clap::AppSettings::ColoredHelp)
|
||||||
|
.about("Compact database manually")
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("column")
|
||||||
|
.long("column")
|
||||||
|
.value_name("TAG")
|
||||||
|
.help("3-byte column ID (see `DBColumn`)")
|
||||||
|
.takes_value(true)
|
||||||
|
.required(true),
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("freezer")
|
||||||
|
.long("freezer")
|
||||||
|
.help("Inspect the freezer DB rather than the hot DB")
|
||||||
|
.takes_value(false)
|
||||||
|
.conflicts_with("blobs-db"),
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("blobs-db")
|
||||||
|
.long("blobs-db")
|
||||||
|
.help("Inspect the blobs DB rather than the hot DB")
|
||||||
|
.takes_value(false)
|
||||||
|
.conflicts_with("freezer"),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> {
|
pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> {
|
||||||
App::new("prune-payloads")
|
App::new("prune-payloads")
|
||||||
.alias("prune_payloads")
|
.alias("prune_payloads")
|
||||||
@ -162,6 +198,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
|
|||||||
.subcommand(migrate_cli_app())
|
.subcommand(migrate_cli_app())
|
||||||
.subcommand(version_cli_app())
|
.subcommand(version_cli_app())
|
||||||
.subcommand(inspect_cli_app())
|
.subcommand(inspect_cli_app())
|
||||||
|
.subcommand(compact_cli_app())
|
||||||
.subcommand(prune_payloads_app())
|
.subcommand(prune_payloads_app())
|
||||||
.subcommand(prune_blobs_app())
|
.subcommand(prune_blobs_app())
|
||||||
.subcommand(prune_states_app())
|
.subcommand(prune_states_app())
|
||||||
@ -251,6 +288,7 @@ pub struct InspectConfig {
|
|||||||
skip: Option<usize>,
|
skip: Option<usize>,
|
||||||
limit: Option<usize>,
|
limit: Option<usize>,
|
||||||
freezer: bool,
|
freezer: bool,
|
||||||
|
blobs_db: bool,
|
||||||
/// Configures where the inspect output should be stored.
|
/// Configures where the inspect output should be stored.
|
||||||
output_dir: PathBuf,
|
output_dir: PathBuf,
|
||||||
}
|
}
|
||||||
@ -261,6 +299,7 @@ fn parse_inspect_config(cli_args: &ArgMatches) -> Result<InspectConfig, String>
|
|||||||
let skip = clap_utils::parse_optional(cli_args, "skip")?;
|
let skip = clap_utils::parse_optional(cli_args, "skip")?;
|
||||||
let limit = clap_utils::parse_optional(cli_args, "limit")?;
|
let limit = clap_utils::parse_optional(cli_args, "limit")?;
|
||||||
let freezer = cli_args.is_present("freezer");
|
let freezer = cli_args.is_present("freezer");
|
||||||
|
let blobs_db = cli_args.is_present("blobs-db");
|
||||||
|
|
||||||
let output_dir: PathBuf =
|
let output_dir: PathBuf =
|
||||||
clap_utils::parse_optional(cli_args, "output-dir")?.unwrap_or_else(PathBuf::new);
|
clap_utils::parse_optional(cli_args, "output-dir")?.unwrap_or_else(PathBuf::new);
|
||||||
@ -270,6 +309,7 @@ fn parse_inspect_config(cli_args: &ArgMatches) -> Result<InspectConfig, String>
|
|||||||
skip,
|
skip,
|
||||||
limit,
|
limit,
|
||||||
freezer,
|
freezer,
|
||||||
|
blobs_db,
|
||||||
output_dir,
|
output_dir,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -277,32 +317,20 @@ fn parse_inspect_config(cli_args: &ArgMatches) -> Result<InspectConfig, String>
|
|||||||
pub fn inspect_db<E: EthSpec>(
|
pub fn inspect_db<E: EthSpec>(
|
||||||
inspect_config: InspectConfig,
|
inspect_config: InspectConfig,
|
||||||
client_config: ClientConfig,
|
client_config: ClientConfig,
|
||||||
runtime_context: &RuntimeContext<E>,
|
|
||||||
log: Logger,
|
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
let spec = runtime_context.eth2_config.spec.clone();
|
|
||||||
let hot_path = client_config.get_db_path();
|
let hot_path = client_config.get_db_path();
|
||||||
let cold_path = client_config.get_freezer_db_path();
|
let cold_path = client_config.get_freezer_db_path();
|
||||||
let blobs_path = client_config.get_blobs_db_path();
|
let blobs_path = client_config.get_blobs_db_path();
|
||||||
|
|
||||||
let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
|
|
||||||
&hot_path,
|
|
||||||
&cold_path,
|
|
||||||
&blobs_path,
|
|
||||||
|_, _, _| Ok(()),
|
|
||||||
client_config.store,
|
|
||||||
spec,
|
|
||||||
log,
|
|
||||||
)
|
|
||||||
.map_err(|e| format!("{:?}", e))?;
|
|
||||||
|
|
||||||
let mut total = 0;
|
let mut total = 0;
|
||||||
let mut num_keys = 0;
|
let mut num_keys = 0;
|
||||||
|
|
||||||
let sub_db = if inspect_config.freezer {
|
let sub_db = if inspect_config.freezer {
|
||||||
&db.cold_db
|
LevelDB::<E>::open(&cold_path).map_err(|e| format!("Unable to open freezer DB: {e:?}"))?
|
||||||
|
} else if inspect_config.blobs_db {
|
||||||
|
LevelDB::<E>::open(&blobs_path).map_err(|e| format!("Unable to open blobs DB: {e:?}"))?
|
||||||
} else {
|
} else {
|
||||||
&db.hot_db
|
LevelDB::<E>::open(&hot_path).map_err(|e| format!("Unable to open hot DB: {e:?}"))?
|
||||||
};
|
};
|
||||||
|
|
||||||
let skip = inspect_config.skip.unwrap_or(0);
|
let skip = inspect_config.skip.unwrap_or(0);
|
||||||
@ -385,6 +413,50 @@ pub fn inspect_db<E: EthSpec>(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct CompactConfig {
|
||||||
|
column: DBColumn,
|
||||||
|
freezer: bool,
|
||||||
|
blobs_db: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_compact_config(cli_args: &ArgMatches) -> Result<CompactConfig, String> {
|
||||||
|
let column = clap_utils::parse_required(cli_args, "column")?;
|
||||||
|
let freezer = cli_args.is_present("freezer");
|
||||||
|
let blobs_db = cli_args.is_present("blobs-db");
|
||||||
|
Ok(CompactConfig {
|
||||||
|
column,
|
||||||
|
freezer,
|
||||||
|
blobs_db,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn compact_db<E: EthSpec>(
|
||||||
|
compact_config: CompactConfig,
|
||||||
|
client_config: ClientConfig,
|
||||||
|
log: Logger,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let hot_path = client_config.get_db_path();
|
||||||
|
let cold_path = client_config.get_freezer_db_path();
|
||||||
|
let blobs_path = client_config.get_blobs_db_path();
|
||||||
|
let column = compact_config.column;
|
||||||
|
|
||||||
|
let (sub_db, db_name) = if compact_config.freezer {
|
||||||
|
(LevelDB::<E>::open(&cold_path)?, "freezer_db")
|
||||||
|
} else if compact_config.blobs_db {
|
||||||
|
(LevelDB::<E>::open(&blobs_path)?, "blobs_db")
|
||||||
|
} else {
|
||||||
|
(LevelDB::<E>::open(&hot_path)?, "hot_db")
|
||||||
|
};
|
||||||
|
info!(
|
||||||
|
log,
|
||||||
|
"Compacting database";
|
||||||
|
"db" => db_name,
|
||||||
|
"column" => ?column
|
||||||
|
);
|
||||||
|
sub_db.compact_column(column)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub struct MigrateConfig {
|
pub struct MigrateConfig {
|
||||||
to: SchemaVersion,
|
to: SchemaVersion,
|
||||||
}
|
}
|
||||||
@ -538,7 +610,10 @@ pub fn prune_states<E: EthSpec>(
|
|||||||
// Check that the user has confirmed they want to proceed.
|
// Check that the user has confirmed they want to proceed.
|
||||||
if !prune_config.confirm {
|
if !prune_config.confirm {
|
||||||
match db.get_anchor_info() {
|
match db.get_anchor_info() {
|
||||||
Some(anchor_info) if anchor_info.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN => {
|
Some(anchor_info)
|
||||||
|
if anchor_info.state_lower_limit == 0
|
||||||
|
&& anchor_info.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN =>
|
||||||
|
{
|
||||||
info!(log, "States have already been pruned");
|
info!(log, "States have already been pruned");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
@ -586,7 +661,11 @@ pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, env: Environment<T>) -> Result
|
|||||||
}
|
}
|
||||||
("inspect", Some(cli_args)) => {
|
("inspect", Some(cli_args)) => {
|
||||||
let inspect_config = parse_inspect_config(cli_args)?;
|
let inspect_config = parse_inspect_config(cli_args)?;
|
||||||
inspect_db(inspect_config, client_config, &context, log)
|
inspect_db::<T>(inspect_config, client_config)
|
||||||
|
}
|
||||||
|
("compact", Some(cli_args)) => {
|
||||||
|
let compact_config = parse_compact_config(cli_args)?;
|
||||||
|
compact_db::<T>(compact_config, client_config, log).map_err(format_err)
|
||||||
}
|
}
|
||||||
("prune-payloads", Some(_)) => {
|
("prune-payloads", Some(_)) => {
|
||||||
prune_payloads(client_config, &context, log).map_err(format_err)
|
prune_payloads(client_config, &context, log).map_err(format_err)
|
||||||
|
Loading…
Reference in New Issue
Block a user