From 1cebf41452b63ec622e9ca3a4eb2e38f5b9ddbc7 Mon Sep 17 00:00:00 2001
From: realbigsean <sean@sigmaprime.io>
Date: Tue, 23 Jan 2024 17:35:02 -0500
Subject: [PATCH] Backfill blob storage fix (#5119)

* store blobs in the correct db in backfill

* add database migration

* add migration file

* remove log info suggesting deneb isn't scheduled

* add batching in blob migration
---
 .../beacon_chain/src/historical_blocks.rs     |  6 +-
 beacon_node/beacon_chain/src/schema_change.rs |  9 +++
 .../src/schema_change/migration_schema_v19.rs | 65 +++++++++++++++++++
 beacon_node/store/src/metadata.rs             |  2 +-
 book/src/database-migrations.md               |  6 +-
 5 files changed, 83 insertions(+), 5 deletions(-)
 create mode 100644 beacon_node/beacon_chain/src/schema_change/migration_schema_v19.rs

diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs
index b5e58e7ff..b5b42fcfc 100644
--- a/beacon_node/beacon_chain/src/historical_blocks.rs
+++ b/beacon_node/beacon_chain/src/historical_blocks.rs
@@ -101,8 +101,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
         let mut new_oldest_blob_slot = blob_info.oldest_blob_slot;
 
+        let mut blob_batch = Vec::with_capacity(n_blobs_lists_to_import);
         let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
-        let mut hot_batch = Vec::with_capacity(blocks_to_import.len() + n_blobs_lists_to_import);
+        let mut hot_batch = Vec::with_capacity(blocks_to_import.len());
         let mut signed_blocks = Vec::with_capacity(blocks_to_import.len());
 
         for available_block in blocks_to_import.into_iter().rev() {
@@ -124,7 +125,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             if let Some(blobs) = maybe_blobs {
                 new_oldest_blob_slot = Some(block.slot());
                 self.store
-                    .blobs_as_kv_store_ops(&block_root, blobs, &mut hot_batch);
+                    .blobs_as_kv_store_ops(&block_root, blobs, &mut blob_batch);
             }
 
             // Store block roots, including at all skip slots in the freezer DB.
@@ -199,6 +200,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // Write the I/O batches to disk, writing the blocks themselves first, as it's better
         // for the hot DB to contain extra blocks than for the cold DB to point to blocks that
         // do not exist.
+        self.store.blobs_db.do_atomically(blob_batch)?;
         self.store.hot_db.do_atomically(hot_batch)?;
         self.store.cold_db.do_atomically(cold_batch)?;
 
diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs
index e42ee20c4..63eb72c43 100644
--- a/beacon_node/beacon_chain/src/schema_change.rs
+++ b/beacon_node/beacon_chain/src/schema_change.rs
@@ -1,6 +1,7 @@
 //! Utilities for managing database schema changes.
 mod migration_schema_v17;
 mod migration_schema_v18;
+mod migration_schema_v19;
 
 use crate::beacon_chain::BeaconChainTypes;
 use crate::types::ChainSpec;
@@ -69,6 +70,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
             let ops = migration_schema_v18::downgrade_from_v18::<T>(db.clone(), log)?;
             db.store_schema_version_atomically(to, ops)
         }
+        (SchemaVersion(18), SchemaVersion(19)) => {
+            let ops = migration_schema_v19::upgrade_to_v19::<T>(db.clone(), log)?;
+            db.store_schema_version_atomically(to, ops)
+        }
+        (SchemaVersion(19), SchemaVersion(18)) => {
+            let ops = migration_schema_v19::downgrade_from_v19::<T>(db.clone(), log)?;
+            db.store_schema_version_atomically(to, ops)
+        }
         // Anything else is an error.
         (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
             target_version: to,
diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v19.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v19.rs
new file mode 100644
index 000000000..578e9bad3
--- /dev/null
+++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v19.rs
@@ -0,0 +1,65 @@
+use crate::beacon_chain::BeaconChainTypes;
+use slog::{debug, info, Logger};
+use std::sync::Arc;
+use store::{get_key_for_col, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp};
+
+/// Copies every `BeaconBlob` entry found in the hot DB into the blobs DB (schema v18 -> v19).
+///
+/// The copies are written to `blobs_db` before this returns. The ops that delete the hot DB
+/// entries are returned to the caller rather than executed here.
+pub fn upgrade_to_v19<T: BeaconChainTypes>(
+    db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
+    log: Logger,
+) -> Result<Vec<KeyValueStoreOp>, Error> {
+    let mut hot_delete_ops = vec![];
+    let mut blob_keys = vec![];
+    let column = DBColumn::BeaconBlob;
+    debug!(log, "Migrating from v18 to v19");
+    // Collect the key of every blob list in the hot DB, queueing its deletion as we go.
+    for res in db.hot_db.iter_column_keys::<Vec<u8>>(column) {
+        let key = res?;
+        hot_delete_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col(column.as_str(), &key)));
+        blob_keys.push(key);
+    }
+
+    let num_blobs = blob_keys.len();
+    debug!(log, "Collected {} blob lists to migrate", num_blobs);
+
+    // Write to the blobs DB in batches of `batch_size` ops rather than as one large batch.
+    let batch_size = 500;
+    let mut batch = Vec::with_capacity(batch_size);
+
+    for key in blob_keys {
+        if let Some(next_blob) = db.hot_db.get_bytes(column.as_str(), &key)? {
+            let key_col = get_key_for_col(column.as_str(), &key);
+            batch.push(KeyValueStoreOp::PutKeyValue(key_col, next_blob));
+            if batch.len() >= batch_size {
+                // Move the full batch out (leaving an empty one) instead of cloning every blob.
+                db.blobs_db.do_atomically(std::mem::take(&mut batch))?;
+            }
+        }
+    }
+
+    // Write whatever is left over from the final, partially filled batch.
+    if !batch.is_empty() {
+        db.blobs_db.do_atomically(batch)?;
+    }
+
+    debug!(log, "Wrote {} blobs to the blobs db", num_blobs);
+    // The hot DB copies are not deleted here: the delete ops are handed back to the caller.
+    info!(log, "Upgrading to v19 schema");
+    Ok(hot_delete_ops)
+}
+
+/// Downgrades the schema from v19 to v18 without touching any stored data.
+///
+/// `_db` is unused and the returned list of ops is always empty.
+pub fn downgrade_from_v19<T: BeaconChainTypes>(
+    _db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
+    log: Logger,
+) -> Result<Vec<KeyValueStoreOp>, Error> {
+    // Nothing is moved back: this step only logs and yields no ops.
+    info!(log, "Downgrading to v18 schema");
+
+    Ok(Vec::new())
+}
diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs
index 6fef74d7f..1675051bd 100644
--- a/beacon_node/store/src/metadata.rs
+++ b/beacon_node/store/src/metadata.rs
@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
 use ssz_derive::{Decode, Encode};
 use types::{Checkpoint, Hash256, Slot};
 
-pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(18);
+pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(19);
 
 // All the keys that get stored under the `BeaconMeta` column.
 //
diff --git a/book/src/database-migrations.md b/book/src/database-migrations.md
index 5b7b4d493..a4d28452d 100644
--- a/book/src/database-migrations.md
+++ b/book/src/database-migrations.md
@@ -16,7 +16,8 @@ validator client or the slasher**.
 
 | Lighthouse version | Release date | Schema version | Downgrade available? |
 |--------------------|--------------|----------------|----------------------|
-| v4.6.0             | Dec 2023     | v18            | yes before Deneb     |
+| v4.6.0             | Dec 2023     | v19            | yes before Deneb     |
+| v4.6.0-rc.0        | Dec 2023     | v18            | yes before Deneb     |
 | v4.5.0             | Sep 2023     | v17            | yes                  |
 | v4.4.0             | Aug 2023     | v17            | yes                  |
 | v4.3.0             | Jul 2023     | v17            | yes                  |
@@ -192,7 +193,8 @@ Here are the steps to prune historic states:
 
 | Lighthouse version | Release date | Schema version | Downgrade available?                |
 |--------------------|--------------|----------------|-------------------------------------|
-| v4.6.0             | Dec 2023     | v18            | yes before Deneb                    |
+| v4.6.0             | Dec 2023     | v19            | yes before Deneb                    |
+| v4.6.0-rc.0        | Dec 2023     | v18            | yes before Deneb                    |
 | v4.5.0             | Sep 2023     | v17            | yes                                 |
 | v4.4.0             | Aug 2023     | v17            | yes                                 |
 | v4.3.0             | Jul 2023     | v17            | yes                                 |