diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index 73906b1b5..8684bafe2 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -1,6 +1,7 @@ //! Utilities for managing database schema changes. mod migration_schema_v12; mod migration_schema_v13; +mod migration_schema_v14; use crate::beacon_chain::{BeaconChainTypes, ETH1_CACHE_DB_KEY}; use crate::eth1_chain::SszEth1; @@ -114,6 +115,14 @@ pub fn migrate_schema( Ok(()) } + (SchemaVersion(13), SchemaVersion(14)) => { + let ops = migration_schema_v14::upgrade_to_v14::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } + (SchemaVersion(14), SchemaVersion(13)) => { + let ops = migration_schema_v14::downgrade_from_v14::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } // Anything else is an error. (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs index bb72b28c0..c9aa2097f 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs @@ -168,16 +168,14 @@ pub fn downgrade_from_v12( log: Logger, ) -> Result, Error> { // Load a V12 op pool and transform it to V5. - let PersistedOperationPoolV12 { + let PersistedOperationPoolV12:: { attestations, sync_contributions, attester_slashings, proposer_slashings, voluntary_exits, - } = if let Some(PersistedOperationPool::::V12(op_pool)) = - db.get_item(&OP_POOL_DB_KEY)? - { - op_pool + } = if let Some(op_pool_v12) = db.get_item(&OP_POOL_DB_KEY)? { + op_pool_v12 } else { debug!(log, "Nothing to do, no operation pool stored"); return Ok(vec![]); diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v14.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v14.rs new file mode 100644 index 000000000..02422a403 --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v14.rs @@ -0,0 +1,75 @@ +use crate::beacon_chain::{BeaconChainTypes, OP_POOL_DB_KEY}; +use operation_pool::{ + PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV14, +}; +use slog::{debug, info, Logger}; +use std::sync::Arc; +use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem}; + +pub fn upgrade_to_v14( + db: Arc>, + log: Logger, +) -> Result, Error> { + // Load a V12 op pool and transform it to V14. + let PersistedOperationPoolV12:: { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + } = if let Some(op_pool_v12) = db.get_item(&OP_POOL_DB_KEY)? { + op_pool_v12 + } else { + debug!(log, "Nothing to do, no operation pool stored"); + return Ok(vec![]); + }; + + // initialize with empty vector + let bls_to_execution_changes = vec![]; + let v14 = PersistedOperationPool::V14(PersistedOperationPoolV14 { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + bls_to_execution_changes, + }); + Ok(vec![v14.as_kv_store_op(OP_POOL_DB_KEY)]) +} + +pub fn downgrade_from_v14( + db: Arc>, + log: Logger, +) -> Result, Error> { + // Load a V14 op pool and transform it to V12. + let PersistedOperationPoolV14 { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + bls_to_execution_changes, + } = if let Some(PersistedOperationPool::::V14(op_pool)) = + db.get_item(&OP_POOL_DB_KEY)? + { + op_pool + } else { + debug!(log, "Nothing to do, no operation pool stored"); + return Ok(vec![]); + }; + + info!( + log, + "Dropping bls_to_execution_changes from pool"; + "count" => bls_to_execution_changes.len(), + ); + + let v12 = PersistedOperationPoolV12 { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + }; + Ok(vec![v12.as_kv_store_op(OP_POOL_DB_KEY)]) +} diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index a69e0a750..70e0d56bc 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -12,7 +12,8 @@ pub use attestation::AttMaxCover; pub use attestation_storage::{AttestationRef, SplitAttestation}; pub use max_cover::MaxCover; pub use persistence::{ - PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV5, + PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV14, + PersistedOperationPoolV5, }; pub use reward_cache::RewardCache; diff --git a/beacon_node/operation_pool/src/persistence.rs b/beacon_node/operation_pool/src/persistence.rs index b232e5a55..043e6fb7f 100644 --- a/beacon_node/operation_pool/src/persistence.rs +++ b/beacon_node/operation_pool/src/persistence.rs @@ -18,7 +18,7 @@ type PersistedSyncContributions = Vec<(SyncAggregateId, Vec { #[superstruct(only(V5))] pub attestations_v5: Vec<(AttestationId, Vec>)>, /// Attestations and their attesting indices. - #[superstruct(only(V12))] + #[superstruct(only(V12, V14))] pub attestations: Vec<(Attestation, Vec)>, /// Mapping from sync contribution ID to sync contributions and aggregate. pub sync_contributions: PersistedSyncContributions, @@ -40,20 +40,23 @@ pub struct PersistedOperationPool { #[superstruct(only(V5))] pub attester_slashings_v5: Vec<(AttesterSlashing, ForkVersion)>, /// Attester slashings. - #[superstruct(only(V12))] + #[superstruct(only(V12, V14))] pub attester_slashings: Vec, T>>, /// [DEPRECATED] Proposer slashings. #[superstruct(only(V5))] pub proposer_slashings_v5: Vec, /// Proposer slashings with fork information. - #[superstruct(only(V12))] + #[superstruct(only(V12, V14))] pub proposer_slashings: Vec>, /// [DEPRECATED] Voluntary exits. #[superstruct(only(V5))] pub voluntary_exits_v5: Vec, /// Voluntary exits with fork information. - #[superstruct(only(V12))] + #[superstruct(only(V12, V14))] pub voluntary_exits: Vec>, + /// BLS to Execution Changes + #[superstruct(only(V14))] + pub bls_to_execution_changes: Vec>, } impl PersistedOperationPool { @@ -99,12 +102,20 @@ impl PersistedOperationPool { .map(|(_, exit)| exit.clone()) .collect(); - PersistedOperationPool::V12(PersistedOperationPoolV12 { + let bls_to_execution_changes = operation_pool + .bls_to_execution_changes + .read() + .iter() + .map(|(_, bls_to_execution_change)| bls_to_execution_change.clone()) + .collect(); + + PersistedOperationPool::V14(PersistedOperationPoolV14 { attestations, sync_contributions, attester_slashings, proposer_slashings, voluntary_exits, + bls_to_execution_changes, }) } @@ -127,23 +138,41 @@ impl PersistedOperationPool { ); let sync_contributions = RwLock::new(self.sync_contributions().iter().cloned().collect()); let attestations = match self { - PersistedOperationPool::V5(_) => return Err(OpPoolError::IncorrectOpPoolVariant), - PersistedOperationPool::V12(pool) => { + PersistedOperationPool::V5(_) | PersistedOperationPool::V12(_) => { + return Err(OpPoolError::IncorrectOpPoolVariant) + } + PersistedOperationPool::V14(ref pool) => { let mut map = AttestationMap::default(); - for (att, attesting_indices) in pool.attestations { + for (att, attesting_indices) in pool.attestations.clone() { map.insert(att, attesting_indices); } RwLock::new(map) } }; + let bls_to_execution_changes = match self { + PersistedOperationPool::V5(_) | PersistedOperationPool::V12(_) => { + return Err(OpPoolError::IncorrectOpPoolVariant) + } + PersistedOperationPool::V14(pool) => RwLock::new( + pool.bls_to_execution_changes + .iter() + .cloned() + .map(|bls_to_execution_change| { + ( + bls_to_execution_change.as_inner().message.validator_index, + bls_to_execution_change, + ) + }) + .collect(), + ), + }; let op_pool = OperationPool { attestations, sync_contributions, attester_slashings, proposer_slashings, voluntary_exits, - // FIXME(capella): implement schema migration for address changes in op pool - bls_to_execution_changes: Default::default(), + bls_to_execution_changes, reward_cache: Default::default(), _phantom: Default::default(), }; @@ -165,6 +194,20 @@ impl StoreItem for PersistedOperationPoolV5 { } } +impl StoreItem for PersistedOperationPoolV12 { + fn db_column() -> DBColumn { + DBColumn::OpPool + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + PersistedOperationPoolV12::from_ssz_bytes(bytes).map_err(Into::into) + } +} + /// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::V12`. impl StoreItem for PersistedOperationPool { fn db_column() -> DBColumn { @@ -177,8 +220,8 @@ impl StoreItem for PersistedOperationPool { fn from_store_bytes(bytes: &[u8]) -> Result { // Default deserialization to the latest variant. - PersistedOperationPoolV12::from_ssz_bytes(bytes) - .map(Self::V12) + PersistedOperationPoolV14::from_ssz_bytes(bytes) + .map(Self::V14) .map_err(Into::into) } } diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 5cb3f1220..fb5769635 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(13); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(14); // All the keys that get stored under the `BeaconMeta` column. //