mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-22 03:30:35 +00:00
Import old attestation store (#7466)
* import attestation to new data structure * add tests * add failure massages * added signing root to data * added signing root to data * public keys 48 length * remove redundant loop * fix proposals * fix manage dir name * Omit redundant nil check on slices * nishant feedback * add test
This commit is contained in:
parent
7cc32c4dda
commit
3d0fc8bc64
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
@ -82,7 +83,7 @@ func (hd EncHistoryData) getTargetData(ctx context.Context, target uint64) (*His
|
||||
|
||||
history.Source = bytesutil.FromBytes8(hd[cursor : cursor+sourceSize])
|
||||
sr := make([]byte, 32)
|
||||
copy(hd[cursor+sourceSize:cursor+historySize], sr)
|
||||
copy(sr, hd[cursor+sourceSize:cursor+historySize])
|
||||
history.SigningRoot = sr
|
||||
return history, nil
|
||||
}
|
||||
@ -150,3 +151,50 @@ func (store *Store) SaveAttestationHistoryNewForPubKeys(ctx context.Context, his
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// ImportOldAttestationFormat import old attestation format data into the new attestation format
|
||||
func (store *Store) ImportOldAttestationFormat(ctx context.Context) error {
|
||||
ctx, span := trace.StartSpan(ctx, "Validator.ImportOldAttestationFormat")
|
||||
defer span.End()
|
||||
var allKeys [][48]byte
|
||||
|
||||
if err := store.db.View(func(tx *bolt.Tx) error {
|
||||
attestationsBucket := tx.Bucket(historicAttestationsBucket)
|
||||
if err := attestationsBucket.ForEach(func(pubKey, _ []byte) error {
|
||||
var pubKeyCopy [48]byte
|
||||
copy(pubKeyCopy[:], pubKey)
|
||||
allKeys = append(allKeys, pubKeyCopy)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return errors.Wrapf(err, "could not retrieve attestations for source in %s", store.databasePath)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
allKeys = removeDuplicateKeys(allKeys)
|
||||
attMap, err := store.AttestationHistoryForPubKeys(ctx, allKeys)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "could not retrieve data for public keys %v", allKeys)
|
||||
}
|
||||
dataMap := make(map[[48]byte]EncHistoryData)
|
||||
for key, atts := range attMap {
|
||||
dataMap[key] = newAttestationHistoryArray(atts.LatestEpochWritten)
|
||||
dataMap[key], err = dataMap[key].setLatestEpochWritten(ctx, atts.LatestEpochWritten)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for target, source := range atts.TargetToSource {
|
||||
dataMap[key], err = dataMap[key].setTargetData(ctx, target, &HistoryData{
|
||||
Source: source,
|
||||
SigningRoot: []byte{1},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
err = store.SaveAttestationHistoryNewForPubKeys(ctx, dataMap)
|
||||
return err
|
||||
}
|
||||
|
@ -4,10 +4,12 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/require"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
func TestNewAttestationHistoryArray(t *testing.T) {
|
||||
@ -113,11 +115,15 @@ func TestSetTargetData(t *testing.T) {
|
||||
})
|
||||
if tt.error == "" {
|
||||
require.NoError(t, err)
|
||||
|
||||
} else {
|
||||
assert.ErrorContains(t, tt.error, err)
|
||||
td, err := enc.getTargetData(ctx, tt.target)
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, bytesutil.PadTo(tt.signingRoot, 32), td.SigningRoot)
|
||||
require.Equal(t, tt.source, td.Source)
|
||||
return
|
||||
}
|
||||
assert.ErrorContains(t, tt.error, err)
|
||||
require.DeepEqual(t, tt.expected, enc)
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
@ -166,3 +172,70 @@ func TestAttestationHistoryForPubKeysNew_OK(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, setAttHistoryForPubKeys, historyForPubKeys, "Expected attestation history epoch bits to be empty")
|
||||
}
|
||||
func TestStore_ImportOldAttestationFormatBadSourceFormat(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pubKeys := [][48]byte{{3}, {4}}
|
||||
db := setupDB(t, pubKeys)
|
||||
err := db.update(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket(historicAttestationsBucket)
|
||||
for _, pubKey := range pubKeys {
|
||||
if err := bucket.Put(pubKey[:], []byte{1}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.ErrorContains(t, "could not retrieve data for public keys", db.ImportOldAttestationFormat(ctx))
|
||||
}
|
||||
|
||||
func TestStore_ImportOldAttestationFormat(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pubKeys := [][48]byte{{3}, {4}}
|
||||
db := setupDB(t, pubKeys)
|
||||
|
||||
farFuture := params.BeaconConfig().FarFutureEpoch
|
||||
newMap := make(map[uint64]uint64)
|
||||
// The validator attested at target epoch 2 but had no attestations for target epochs 0 and 1.
|
||||
newMap[0] = farFuture
|
||||
newMap[1] = farFuture
|
||||
newMap[2] = 1
|
||||
history := &slashpb.AttestationHistory{
|
||||
TargetToSource: newMap,
|
||||
LatestEpochWritten: 2,
|
||||
}
|
||||
|
||||
newMap2 := make(map[uint64]uint64)
|
||||
// The validator attested at target epoch 1 and 3 but had no attestations for target epochs 0 and 2.
|
||||
newMap2[0] = farFuture
|
||||
newMap2[1] = 0
|
||||
newMap2[2] = farFuture
|
||||
newMap2[3] = 2
|
||||
history2 := &slashpb.AttestationHistory{
|
||||
TargetToSource: newMap2,
|
||||
LatestEpochWritten: 3,
|
||||
}
|
||||
|
||||
attestationHistory := make(map[[48]byte]*slashpb.AttestationHistory)
|
||||
attestationHistory[pubKeys[0]] = history
|
||||
attestationHistory[pubKeys[1]] = history2
|
||||
|
||||
require.NoError(t, db.SaveAttestationHistoryForPubKeys(context.Background(), attestationHistory), "Saving attestation history failed")
|
||||
require.NoError(t, db.ImportOldAttestationFormat(ctx), "Import attestation history failed")
|
||||
|
||||
attHis, err := db.AttestationHistoryNewForPubKeys(ctx, pubKeys)
|
||||
require.NoError(t, err)
|
||||
for pk, encHis := range attHis {
|
||||
his, ok := attestationHistory[pk]
|
||||
require.Equal(t, true, ok, "Missing public key in the original data")
|
||||
lew, err := encHis.getLatestEpochWritten(ctx)
|
||||
require.NoError(t, err, "Failed to get latest epoch written")
|
||||
require.Equal(t, his.LatestEpochWritten, lew, "LatestEpochWritten is not equal to the source data value")
|
||||
for target, source := range his.TargetToSource {
|
||||
hd, err := encHis.getTargetData(ctx, target)
|
||||
require.NoError(t, err, "Failed to get target data for epoch: %d", target)
|
||||
require.Equal(t, source, hd.Source, "Source epoch is different")
|
||||
require.DeepEqual(t, bytesutil.PadTo([]byte{1}, 32), hd.SigningRoot, "Signing root differs in imported data")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,6 @@
|
||||
package kv
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"path/filepath"
|
||||
@ -20,12 +19,12 @@ type epochProposals struct {
|
||||
}
|
||||
|
||||
type pubKeyProposals struct {
|
||||
PubKey []byte
|
||||
PubKey [48]byte
|
||||
Proposals []epochProposals
|
||||
}
|
||||
|
||||
type pubKeyAttestations struct {
|
||||
PubKey []byte
|
||||
PubKey [48]byte
|
||||
Attestations []byte
|
||||
}
|
||||
|
||||
@ -54,13 +53,13 @@ func Split(ctx context.Context, sourceStore *Store, targetDirectory string) erro
|
||||
return createSplitTargetStores(targetDirectory, allProposals, allAttestations)
|
||||
}
|
||||
|
||||
func getPubKeyProposals(pubKey []byte, proposalsBucket *bolt.Bucket) (*pubKeyProposals, error) {
|
||||
func getPubKeyProposals(pubKey [48]byte, proposalsBucket *bolt.Bucket) (*pubKeyProposals, error) {
|
||||
pubKeyProposals := pubKeyProposals{
|
||||
PubKey: pubKey,
|
||||
Proposals: []epochProposals{},
|
||||
}
|
||||
|
||||
pubKeyBucket := proposalsBucket.Bucket(pubKey)
|
||||
pubKeyBucket := proposalsBucket.Bucket(pubKey[:])
|
||||
if pubKeyBucket == nil {
|
||||
return &pubKeyProposals, nil
|
||||
}
|
||||
@ -104,7 +103,7 @@ func createMergeTargetStore(
|
||||
err = newStore.update(func(tx *bolt.Tx) error {
|
||||
allProposalsBucket := tx.Bucket(historicProposalsBucket)
|
||||
for _, pubKeyProposals := range allProposals {
|
||||
proposalsBucket, err := createProposalsBucket(allProposalsBucket, pubKeyProposals.PubKey)
|
||||
proposalsBucket, err := createProposalsBucket(allProposalsBucket, pubKeyProposals.PubKey[:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -147,7 +146,7 @@ func createSplitTargetStores(
|
||||
}()
|
||||
|
||||
for _, pubKeyProposals := range allProposals {
|
||||
dirName := hex.EncodeToString(pubKeyProposals.PubKey)[:12]
|
||||
dirName := hex.EncodeToString(pubKeyProposals.PubKey[:])[:12]
|
||||
path := filepath.Join(targetDirectory, dirName)
|
||||
newStore, err := NewKVStore(path, [][48]byte{})
|
||||
if err != nil {
|
||||
@ -157,7 +156,7 @@ func createSplitTargetStores(
|
||||
|
||||
if err := newStore.update(func(tx *bolt.Tx) error {
|
||||
allProposalsBucket := tx.Bucket(historicProposalsBucket)
|
||||
proposalsBucket, err := createProposalsBucket(allProposalsBucket, pubKeyProposals.PubKey)
|
||||
proposalsBucket, err := createProposalsBucket(allProposalsBucket, pubKeyProposals.PubKey[:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -167,7 +166,7 @@ func createSplitTargetStores(
|
||||
|
||||
attestationsBucket := tx.Bucket(historicAttestationsBucket)
|
||||
for _, pubKeyAttestations := range allAttestations {
|
||||
if string(pubKeyAttestations.PubKey) == string(pubKeyProposals.PubKey) {
|
||||
if string(pubKeyAttestations.PubKey[:]) == string(pubKeyProposals.PubKey[:]) {
|
||||
if err := addAttestations(attestationsBucket, pubKeyAttestations); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -185,13 +184,13 @@ func createSplitTargetStores(
|
||||
for _, pubKeyAttestations := range allAttestations {
|
||||
var hasMatchingProposals = false
|
||||
for _, pubKeyProposals := range allProposals {
|
||||
if string(pubKeyAttestations.PubKey) == string(pubKeyProposals.PubKey) {
|
||||
if string(pubKeyAttestations.PubKey[:]) == string(pubKeyProposals.PubKey[:]) {
|
||||
hasMatchingProposals = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasMatchingProposals {
|
||||
dirName := hex.EncodeToString(pubKeyAttestations.PubKey)[:12]
|
||||
dirName := hex.EncodeToString(pubKeyAttestations.PubKey[:])[:12]
|
||||
path := filepath.Join(targetDirectory, dirName)
|
||||
newStore, err := NewKVStore(path, [][48]byte{})
|
||||
if err != nil {
|
||||
@ -218,13 +217,13 @@ func getAllProposalsAndAllAttestations(stores []*Store) ([]pubKeyProposals, []pu
|
||||
for _, store := range stores {
|
||||
// Storing keys upfront will allow using several short transactions (one for every key)
|
||||
// instead of one long-running transaction for all keys.
|
||||
var allKeys [][]byte
|
||||
var allKeys [][48]byte
|
||||
|
||||
if err := store.db.View(func(tx *bolt.Tx) error {
|
||||
proposalsBucket := tx.Bucket(historicProposalsBucket)
|
||||
if err := proposalsBucket.ForEach(func(pubKey, _ []byte) error {
|
||||
pubKeyCopy := make([]byte, len(pubKey))
|
||||
copy(pubKeyCopy, pubKey)
|
||||
var pubKeyCopy [48]byte
|
||||
copy(pubKeyCopy[:], pubKey)
|
||||
allKeys = append(allKeys, pubKeyCopy)
|
||||
return nil
|
||||
}); err != nil {
|
||||
@ -233,8 +232,8 @@ func getAllProposalsAndAllAttestations(stores []*Store) ([]pubKeyProposals, []pu
|
||||
|
||||
attestationsBucket := tx.Bucket(historicAttestationsBucket)
|
||||
if err := attestationsBucket.ForEach(func(pubKey, _ []byte) error {
|
||||
pubKeyCopy := make([]byte, len(pubKey))
|
||||
copy(pubKeyCopy, pubKey)
|
||||
var pubKeyCopy [48]byte
|
||||
copy(pubKeyCopy[:], pubKey)
|
||||
allKeys = append(allKeys, pubKeyCopy)
|
||||
return nil
|
||||
}); err != nil {
|
||||
@ -258,7 +257,7 @@ func getAllProposalsAndAllAttestations(stores []*Store) ([]pubKeyProposals, []pu
|
||||
allProposals = append(allProposals, *pubKeyProposals)
|
||||
|
||||
attestationsBucket := tx.Bucket(historicAttestationsBucket)
|
||||
v := attestationsBucket.Get(pubKey)
|
||||
v := attestationsBucket.Get(pubKey[:])
|
||||
if v != nil {
|
||||
attestations := pubKeyAttestations{
|
||||
PubKey: pubKey,
|
||||
@ -296,7 +295,7 @@ func addEpochProposals(bucket *bolt.Bucket, proposals []epochProposals) error {
|
||||
}
|
||||
|
||||
func addAttestations(bucket *bolt.Bucket, attestations pubKeyAttestations) error {
|
||||
if err := bucket.Put(attestations.PubKey, attestations.Attestations); err != nil {
|
||||
if err := bucket.Put(attestations.PubKey[:], attestations.Attestations); err != nil {
|
||||
return errors.Wrapf(
|
||||
err,
|
||||
"could not add public key attestations for public key %x",
|
||||
@ -305,13 +304,13 @@ func addAttestations(bucket *bolt.Bucket, attestations pubKeyAttestations) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeDuplicateKeys(keys [][]byte) [][]byte {
|
||||
func removeDuplicateKeys(keys [][48]byte) [][48]byte {
|
||||
last := 0
|
||||
|
||||
next:
|
||||
for _, k1 := range keys {
|
||||
for _, k2 := range keys[:last] {
|
||||
if bytes.Equal(k1, k2) {
|
||||
if k1 == k2 {
|
||||
continue next
|
||||
}
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ func (store *Store) ProposalHistoryForSlot(ctx context.Context, publicKey []byte
|
||||
return fmt.Errorf("validator history empty for public key %#x", publicKey)
|
||||
}
|
||||
sr := valBucket.Get(bytesutil.Uint64ToBytesBigEndian(slot))
|
||||
if sr == nil || len(sr) == 0 {
|
||||
if len(sr) == 0 {
|
||||
return nil
|
||||
}
|
||||
copy(signingRoot, sr)
|
||||
@ -62,12 +62,12 @@ func (store *Store) ImportProposalHistory(ctx context.Context) error {
|
||||
ctx, span := trace.StartSpan(ctx, "Validator.ImportProposalHistory")
|
||||
defer span.End()
|
||||
|
||||
var allKeys [][]byte
|
||||
var allKeys [][48]byte
|
||||
err := store.db.View(func(tx *bolt.Tx) error {
|
||||
proposalsBucket := tx.Bucket(historicProposalsBucket)
|
||||
if err := proposalsBucket.ForEach(func(pubKey, _ []byte) error {
|
||||
pubKeyCopy := make([]byte, len(pubKey))
|
||||
copy(pubKeyCopy, pubKey)
|
||||
var pubKeyCopy [48]byte
|
||||
copy(pubKeyCopy[:], pubKey)
|
||||
allKeys = append(allKeys, pubKeyCopy)
|
||||
return nil
|
||||
}); err != nil {
|
||||
@ -97,7 +97,7 @@ func (store *Store) ImportProposalHistory(ctx context.Context) error {
|
||||
err = store.db.Update(func(tx *bolt.Tx) error {
|
||||
newProposalsBucket := tx.Bucket(newhistoricProposalsBucket)
|
||||
for _, pr := range prs {
|
||||
valBucket, err := newProposalsBucket.CreateBucketIfNotExists(pr.PubKey)
|
||||
valBucket, err := newProposalsBucket.CreateBucketIfNotExists(pr.PubKey[:])
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not could not create bucket for public key")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user