From d4f26ee123a3185b7e43b08dec69681265e1fde2 Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Wed, 25 Oct 2023 03:42:24 +0000
Subject: [PATCH] Add block roots heal logic in v18 schema migration. (#4875)

## Issue Addressed

Fixes #4697. This also unblocks the state pruning PR (#4835), because self-healing breaks if state pruning is applied to a database with missing block roots.

## Proposed Changes

- Fill in the missing block roots between the last restore point slot and the split slot when upgrading to the latest database version.
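
To make the slot arithmetic concrete, here is a small standalone sketch (illustration only, not code from this patch; the two helpers are simplified stand-ins for the store's restore-point rounding and the chunked vector's `chunk_index`), using the same constants as the new tests:

```rust
/// Round the split slot down to the previous restore point boundary, mirroring
/// the integer arithmetic used by `heal_freezer_block_roots` below.
fn last_restore_point_slot(split_slot: u64, slots_per_restore_point: u64) -> u64 {
    (split_slot - 1) / slots_per_restore_point * slots_per_restore_point
}

/// Index of the freezer chunk holding a given slot's block root
/// (the tests note that the chunk size is hard-coded to 128).
fn chunk_index(slot: u64) -> u64 {
    slot / 128
}

fn main() {
    let slots_per_epoch = 32;
    let split_slot = 18 * slots_per_epoch; // 576, as asserted in the tests
    let srp = 2 * slots_per_epoch; // slots_per_restore_point = 64

    let start = last_restore_point_slot(split_slot, srp);
    assert_eq!(start, 16 * slots_per_epoch); // 512

    // The heal pass re-writes block roots for slots 512..576 (up to, but not
    // including, the split slot). Slot 512 lives in chunk 4, which is the
    // chunk the tests delete to simulate the missing roots.
    println!(
        "healing slots {start}..{split_slot}, starting in chunk {}",
        chunk_index(start)
    );
}
```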
---
 .../src/schema_change/migration_schema_v18.rs |   3 +
 beacon_node/beacon_chain/tests/store_tests.rs | 179 +++++++++++++++++-
 beacon_node/store/src/hot_cold_store.rs       |  29 +++
 3 files changed, 210 insertions(+), 1 deletion(-)

diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v18.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v18.rs
index 7a6409a34..14a560f97 100644
--- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v18.rs
+++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v18.rs
@@ -51,6 +51,9 @@ pub fn upgrade_to_v18<T: BeaconChainTypes>(
     db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
     log: Logger,
 ) -> Result<Vec<KeyValueStoreOp>, Error> {
+    db.heal_freezer_block_roots()?;
+    info!(log, "Healed freezer block roots");
+
     // No-op, even if Deneb has already occurred. The database is probably borked in this case, but
     // *maybe* the fork recovery will revert the minority fork and succeed.
     if let Some(deneb_fork_epoch) = db.get_chain_spec().deneb_fork_epoch {
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
index 5aeaa4c07..b62b7139f 100644
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ -29,8 +29,10 @@ use std::sync::Arc;
 use std::time::Duration;
 use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
 use store::{
+    chunked_vector::{chunk_key, Field},
+    get_key_for_col,
     iter::{BlockRootsIterator, StateRootsIterator},
-    HotColdDB, LevelDB, StoreConfig,
+    DBColumn, HotColdDB, KeyValueStore, KeyValueStoreOp, LevelDB, StoreConfig,
 };
 use tempfile::{tempdir, TempDir};
 use tokio::time::sleep;
@@ -104,6 +106,181 @@ fn get_harness_generic(
     harness
 }
 
+/// Tests that `store.heal_freezer_block_roots` inserts block roots between last restore point
+/// slot and the split slot.
+#[tokio::test]
+async fn heal_freezer_block_roots() {
+    // chunk_size is hard-coded to 128
+    let num_blocks_produced = E::slots_per_epoch() * 20;
+    let db_path = tempdir().unwrap();
+    let store = get_store_generic(
+        &db_path,
+        StoreConfig {
+            slots_per_restore_point: 2 * E::slots_per_epoch(),
+            ..Default::default()
+        },
+        test_spec::<E>(),
+    );
+    let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
+
+    harness
+        .extend_chain(
+            num_blocks_produced as usize,
+            BlockStrategy::OnCanonicalHead,
+            AttestationStrategy::AllValidators,
+        )
+        .await;
+
+    let split_slot = store.get_split_slot();
+    assert_eq!(split_slot, 18 * E::slots_per_epoch());
+
+    // Do a heal before deleting to make sure that it doesn't break.
+    let last_restore_point_slot = Slot::new(16 * E::slots_per_epoch());
+    store.heal_freezer_block_roots().unwrap();
+    check_freezer_block_roots(&harness, last_restore_point_slot, split_slot);
+
+    // Delete block roots between `last_restore_point_slot` and `split_slot`.
+    let chunk_index = <store::chunked_vector::BlockRoots as Field<E>>::chunk_index(
+        last_restore_point_slot.as_usize(),
+    );
+    let key_chunk = get_key_for_col(DBColumn::BeaconBlockRoots.as_str(), &chunk_key(chunk_index));
+    store
+        .cold_db
+        .do_atomically(vec![KeyValueStoreOp::DeleteKey(key_chunk)])
+        .unwrap();
+
+    let block_root_err = store
+        .forwards_block_roots_iterator_until(
+            last_restore_point_slot,
+            last_restore_point_slot + 1,
+            || unreachable!(),
+            &harness.chain.spec,
+        )
+        .unwrap()
+        .next()
+        .unwrap()
+        .unwrap_err();
+
+    assert!(matches!(block_root_err, store::Error::NoContinuationData));
+
+    // Re-insert block roots
+    store.heal_freezer_block_roots().unwrap();
+    check_freezer_block_roots(&harness, last_restore_point_slot, split_slot);
+
+    // Run for another two epochs to check that the invariant is maintained.
+    let additional_blocks_produced = 2 * E::slots_per_epoch();
+    harness
+        .extend_slots(additional_blocks_produced as usize)
+        .await;
+
+    check_finalization(&harness, num_blocks_produced + additional_blocks_produced);
+    check_split_slot(&harness, store);
+    check_chain_dump(
+        &harness,
+        num_blocks_produced + additional_blocks_produced + 1,
+    );
+    check_iterators(&harness);
+}
+
+/// Tests that `store.heal_freezer_block_roots` inserts block roots between last restore point
+/// slot and the split slot.
+#[tokio::test]
+async fn heal_freezer_block_roots_with_skip_slots() {
+    // chunk_size is hard-coded to 128
+    let num_blocks_produced = E::slots_per_epoch() * 20;
+    let db_path = tempdir().unwrap();
+    let store = get_store_generic(
+        &db_path,
+        StoreConfig {
+            slots_per_restore_point: 2 * E::slots_per_epoch(),
+            ..Default::default()
+        },
+        test_spec::<E>(),
+    );
+    let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
+
+    let current_state = harness.get_current_state();
+    let state_root = harness.get_current_state().tree_hash_root();
+    let all_validators = &harness.get_all_validators();
+    harness
+        .add_attested_blocks_at_slots(
+            current_state,
+            state_root,
+            &(1..=num_blocks_produced)
+                .filter(|i| i % 12 != 0)
+                .map(Slot::new)
+                .collect::<Vec<_>>(),
+            all_validators,
+        )
+        .await;
+
+    // split slot should be 18 here
+    let split_slot = store.get_split_slot();
+    assert_eq!(split_slot, 18 * E::slots_per_epoch());
+
+    let last_restore_point_slot = Slot::new(16 * E::slots_per_epoch());
+    let chunk_index = <store::chunked_vector::BlockRoots as Field<E>>::chunk_index(
+        last_restore_point_slot.as_usize(),
+    );
+    let key_chunk = get_key_for_col(DBColumn::BeaconBlockRoots.as_str(), &chunk_key(chunk_index));
+    store
+        .cold_db
+        .do_atomically(vec![KeyValueStoreOp::DeleteKey(key_chunk)])
+        .unwrap();
+
+    let block_root_err = store
+        .forwards_block_roots_iterator_until(
+            last_restore_point_slot,
+            last_restore_point_slot + 1,
+            || unreachable!(),
+            &harness.chain.spec,
+        )
+        .unwrap()
+        .next()
+        .unwrap()
+        .unwrap_err();
+
+    assert!(matches!(block_root_err, store::Error::NoContinuationData));
+
+    // heal function
+    store.heal_freezer_block_roots().unwrap();
+    check_freezer_block_roots(&harness, last_restore_point_slot, split_slot);
+
+    // Run for another two epochs to check that the invariant is maintained.
+    let additional_blocks_produced = 2 * E::slots_per_epoch();
+    harness
+        .extend_slots(additional_blocks_produced as usize)
+        .await;
+
+    check_finalization(&harness, num_blocks_produced + additional_blocks_produced);
+    check_split_slot(&harness, store);
+    check_iterators(&harness);
+}
+
+fn check_freezer_block_roots(
+    harness: &TestHarness,
+    last_restore_point_slot: Slot,
+    split_slot: Slot,
+) {
+    for slot in (last_restore_point_slot.as_u64()..split_slot.as_u64()).map(Slot::new) {
+        let (block_root, result_slot) = harness
+            .chain
+            .store
+            .forwards_block_roots_iterator_until(slot, slot, || unreachable!(), &harness.chain.spec)
+            .unwrap()
+            .next()
+            .unwrap()
+            .unwrap();
+        assert_eq!(slot, result_slot);
+        let expected_block_root = harness
+            .chain
+            .block_root_at_slot(slot, WhenSlotSkipped::Prev)
+            .unwrap()
+            .unwrap();
+        assert_eq!(expected_block_root, block_root);
+    }
+}
+
 #[tokio::test]
 async fn full_participation_no_skips() {
     let num_blocks_produced = E::slots_per_epoch() * 5;
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 91b03f9f5..1b8fa0ebf 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -2218,6 +2218,35 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         Ok(())
     }
+
+    pub fn heal_freezer_block_roots(&self) -> Result<(), Error> {
+        let split = self.get_split_info();
+        let last_restore_point_slot = (split.slot - 1) / self.config.slots_per_restore_point
+            * self.config.slots_per_restore_point;
+
+        // Load split state (which has access to block roots).
+        let (_, split_state) = self
+            .get_advanced_hot_state(split.block_root, split.slot, split.state_root)?
+            .ok_or(HotColdDBError::MissingSplitState(
+                split.state_root,
+                split.slot,
+            ))?;
+
+        let mut batch = vec![];
+        let mut chunk_writer = ChunkWriter::<BlockRoots, _, _>::new(
+            &self.cold_db,
+            last_restore_point_slot.as_usize(),
+        )?;
+
+        for slot in (last_restore_point_slot.as_u64()..split.slot.as_u64()).map(Slot::new) {
+            let block_root = *split_state.get_block_root(slot)?;
+            chunk_writer.set(slot.as_usize(), block_root, &mut batch)?;
+        }
+        chunk_writer.write(&mut batch)?;
+        self.cold_db.do_atomically(batch)?;
+
+        Ok(())
+    }
 }
 
 /// Advance the split point of the store, moving new finalized states to the freezer.