mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-25 12:57:18 +00:00
18be4a4e3e
* Immediate save of validator protection data * fix error log * separate delete from save * remove logs * rename delete into reset * comment fix Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
281 lines
9.6 KiB
Go
281 lines
9.6 KiB
Go
package kv
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
|
"github.com/prysmaticlabs/prysm/shared/params"
|
|
log "github.com/sirupsen/logrus"
|
|
bolt "go.etcd.io/bbolt"
|
|
"go.opencensus.io/trace"
|
|
)
|
|
|
|
// Byte sizes describing the layout of an encoded attestation history
// (EncHistoryData): a latest-epoch-written field followed by fixed-size
// entries, one per target epoch slot.
const (
|
|
// uint64Size is the number of bytes used for one encoded uint64 value.
|
|
uint64Size = 8
|
|
// latestEpochWrittenSize is the length of the leading field that
// GetLatestEpochWritten and SetLatestEpochWritten read and write.
latestEpochWrittenSize = uint64Size
|
|
// targetSize is counted in historySize, but GetTargetData and SetTargetData
// in this file only touch the source and signing root of an entry.
// NOTE(review): presumably reserved for a target epoch field; confirm.
targetSize = uint64Size
|
|
// sourceSize is the number of bytes holding the source epoch at the start
// of each entry.
sourceSize = uint64Size
|
|
// signingRootSize is the number of bytes holding the signing root, placed
// directly after the source epoch.
signingRootSize = 32
|
|
// historySize is the distance in bytes between two consecutive entries.
historySize = targetSize + sourceSize + signingRootSize
|
|
// minimalSize is the smallest length assertSize accepts: the
// latest-epoch-written field with no entries after it.
minimalSize = latestEpochWrittenSize
|
|
)
|
|
|
|
// HistoryData stores the needed data to confirm if an attestation is slashable
|
|
// or repeated: the source epoch and signing root recorded for one target epoch.
|
|
type HistoryData struct {
|
|
// Source is the source epoch of the recorded attestation. The value
// params.BeaconConfig().FarFutureEpoch marks an entry as empty (see IsEmpty).
Source uint64
|
|
// SigningRoot is the signing root of the recorded attestation.
// SetTargetData stores at most signingRootSize (32) bytes of it.
SigningRoot []byte
|
|
}
|
|
|
|
// EncHistoryData is an attestation history encoded as a flat byte slice.
// The first latestEpochWrittenSize bytes hold the latest epoch written; they
// are followed by historySize-byte entries, where the entry for a target epoch
// sits at index target % params.BeaconConfig().WeakSubjectivityPeriod.
|
|
type EncHistoryData []byte
|
|
|
|
func (hd EncHistoryData) assertSize() error {
|
|
if hd == nil || len(hd) < minimalSize {
|
|
return fmt.Errorf("encapsulated data size: %d is smaller then minimal size: %d", len(hd), minimalSize)
|
|
}
|
|
if (len(hd)-minimalSize)%historySize != 0 {
|
|
return fmt.Errorf("encapsulated data size: %d is not a multiple of entry size: %d", len(hd), historySize)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (h *HistoryData) IsEmpty() bool {
|
|
if h == (*HistoryData)(nil) {
|
|
return true
|
|
}
|
|
if h.Source == params.BeaconConfig().FarFutureEpoch {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func emptyHistoryData() *HistoryData {
|
|
h := &HistoryData{Source: params.BeaconConfig().FarFutureEpoch, SigningRoot: bytesutil.PadTo([]byte{}, 32)}
|
|
return h
|
|
}
|
|
|
|
// NewAttestationHistoryArray creates a new encapsulated attestation history byte array
|
|
// sized by the latest epoch written.
|
|
func NewAttestationHistoryArray(target uint64) EncHistoryData {
|
|
relativeTarget := target % params.BeaconConfig().WeakSubjectivityPeriod
|
|
historyDataSize := (relativeTarget + 1) * historySize
|
|
arraySize := latestEpochWrittenSize + historyDataSize
|
|
en := make(EncHistoryData, arraySize)
|
|
enc := en
|
|
ctx := context.Background()
|
|
var err error
|
|
for i := uint64(0); i <= target%params.BeaconConfig().WeakSubjectivityPeriod; i++ {
|
|
enc, err = enc.SetTargetData(ctx, i, emptyHistoryData())
|
|
if err != nil {
|
|
log.WithError(err).Error("Failed to set empty target data")
|
|
}
|
|
}
|
|
return enc
|
|
}
|
|
|
|
func (hd EncHistoryData) GetLatestEpochWritten(ctx context.Context) (uint64, error) {
|
|
if err := hd.assertSize(); err != nil {
|
|
return 0, err
|
|
}
|
|
return bytesutil.FromBytes8(hd[:latestEpochWrittenSize]), nil
|
|
}
|
|
|
|
func (hd EncHistoryData) SetLatestEpochWritten(ctx context.Context, latestEpochWritten uint64) (EncHistoryData, error) {
|
|
if err := hd.assertSize(); err != nil {
|
|
return nil, err
|
|
}
|
|
copy(hd[:latestEpochWrittenSize], bytesutil.Uint64ToBytesLittleEndian(latestEpochWritten))
|
|
return hd, nil
|
|
}
|
|
|
|
func (hd EncHistoryData) GetTargetData(ctx context.Context, target uint64) (*HistoryData, error) {
|
|
if err := hd.assertSize(); err != nil {
|
|
return nil, err
|
|
}
|
|
// Cursor for the location to read target epoch from.
|
|
// Modulus of target epoch X weak subjectivity period in order to have maximum size to the encapsulated data array.
|
|
cursor := (target%params.BeaconConfig().WeakSubjectivityPeriod)*historySize + latestEpochWrittenSize
|
|
if uint64(len(hd)) < cursor+historySize {
|
|
return nil, nil
|
|
}
|
|
history := &HistoryData{}
|
|
history.Source = bytesutil.FromBytes8(hd[cursor : cursor+sourceSize])
|
|
sr := make([]byte, 32)
|
|
copy(sr, hd[cursor+sourceSize:cursor+historySize])
|
|
history.SigningRoot = sr
|
|
return history, nil
|
|
}
|
|
|
|
func (hd EncHistoryData) SetTargetData(ctx context.Context, target uint64, historyData *HistoryData) (EncHistoryData, error) {
|
|
if err := hd.assertSize(); err != nil {
|
|
return nil, err
|
|
}
|
|
// Cursor for the location to write target epoch to.
|
|
// Modulus of target epoch X weak subjectivity period in order to have maximum size to the encapsulated data array.
|
|
cursor := latestEpochWrittenSize + (target%params.BeaconConfig().WeakSubjectivityPeriod)*historySize
|
|
|
|
if uint64(len(hd)) < cursor+historySize {
|
|
ext := make([]byte, cursor+historySize-uint64(len(hd)))
|
|
hd = append(hd, ext...)
|
|
}
|
|
copy(hd[cursor:cursor+sourceSize], bytesutil.Uint64ToBytesLittleEndian(historyData.Source))
|
|
copy(hd[cursor+sourceSize:cursor+sourceSize+signingRootSize], historyData.SigningRoot)
|
|
return hd, nil
|
|
}
|
|
|
|
// AttestationHistoryForPubKeysV2 accepts an array of validator public keys and returns a mapping of corresponding attestation history.
|
|
func (store *Store) AttestationHistoryForPubKeysV2(ctx context.Context, publicKeys [][48]byte) (map[[48]byte]EncHistoryData, error) {
|
|
ctx, span := trace.StartSpan(ctx, "Validator.AttestationHistoryForPubKeysV2")
|
|
defer span.End()
|
|
|
|
if len(publicKeys) == 0 {
|
|
return make(map[[48]byte]EncHistoryData), nil
|
|
}
|
|
|
|
var err error
|
|
attestationHistoryForVals := make(map[[48]byte]EncHistoryData)
|
|
err = store.view(func(tx *bolt.Tx) error {
|
|
bucket := tx.Bucket(newHistoricAttestationsBucket)
|
|
for _, key := range publicKeys {
|
|
enc := bucket.Get(key[:])
|
|
var attestationHistory EncHistoryData
|
|
if len(enc) == 0 {
|
|
attestationHistory = NewAttestationHistoryArray(0)
|
|
} else {
|
|
attestationHistory = enc
|
|
}
|
|
attestationHistoryForVals[key] = attestationHistory
|
|
}
|
|
return nil
|
|
})
|
|
for pk, ah := range attestationHistoryForVals {
|
|
ehd := make(EncHistoryData, len(ah))
|
|
copy(ehd, ah)
|
|
attestationHistoryForVals[pk] = ehd
|
|
}
|
|
return attestationHistoryForVals, err
|
|
}
|
|
|
|
// SaveAttestationHistoryForPubKeysV2 saves the attestation histories for the requested validator public keys.
|
|
func (store *Store) SaveAttestationHistoryForPubKeysV2(ctx context.Context, historyByPubKeys map[[48]byte]EncHistoryData) error {
|
|
ctx, span := trace.StartSpan(ctx, "Validator.SaveAttestationHistoryForPubKeysV2")
|
|
defer span.End()
|
|
|
|
err := store.update(func(tx *bolt.Tx) error {
|
|
bucket := tx.Bucket(newHistoricAttestationsBucket)
|
|
for pubKey, encodedHistory := range historyByPubKeys {
|
|
if err := bucket.Put(pubKey[:], encodedHistory); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
return err
|
|
}
|
|
|
|
// SaveAttestationHistoryForPubKeyV2 saves the attestation history for the requested validator public key.
|
|
func (store *Store) SaveAttestationHistoryForPubKeyV2(ctx context.Context, pubKey [48]byte, history EncHistoryData) error {
|
|
ctx, span := trace.StartSpan(ctx, "Validator.SaveAttestationHistoryForPubKeyV2")
|
|
defer span.End()
|
|
err := store.update(func(tx *bolt.Tx) error {
|
|
bucket := tx.Bucket(newHistoricAttestationsBucket)
|
|
return bucket.Put(pubKey[:], history)
|
|
})
|
|
return err
|
|
}
|
|
|
|
// MigrateV2AttestationProtection import old attestation format data into the new attestation format
|
|
func (store *Store) MigrateV2AttestationProtection(ctx context.Context) error {
|
|
ctx, span := trace.StartSpan(ctx, "Validator.MigrateV2AttestationProtection")
|
|
defer span.End()
|
|
var allKeys [][48]byte
|
|
|
|
if err := store.db.View(func(tx *bolt.Tx) error {
|
|
attestationsBucket := tx.Bucket(historicAttestationsBucket)
|
|
if err := attestationsBucket.ForEach(func(pubKey, _ []byte) error {
|
|
var pubKeyCopy [48]byte
|
|
copy(pubKeyCopy[:], pubKey)
|
|
allKeys = append(allKeys, pubKeyCopy)
|
|
return nil
|
|
}); err != nil {
|
|
return errors.Wrapf(err, "could not retrieve attestations for source in %s", store.databasePath)
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
allKeys = removeDuplicateKeys(allKeys)
|
|
attMap, err := store.AttestationHistoryForPubKeys(ctx, allKeys)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "could not retrieve data for public keys %v", allKeys)
|
|
}
|
|
dataMap := make(map[[48]byte]EncHistoryData)
|
|
for key, atts := range attMap {
|
|
dataMap[key] = NewAttestationHistoryArray(atts.LatestEpochWritten)
|
|
dataMap[key], err = dataMap[key].SetLatestEpochWritten(ctx, atts.LatestEpochWritten)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to set latest epoch while migrating attestations to v2")
|
|
}
|
|
for target, source := range atts.TargetToSource {
|
|
dataMap[key], err = dataMap[key].SetTargetData(ctx, target, &HistoryData{
|
|
Source: source,
|
|
SigningRoot: []byte{1},
|
|
})
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to set target data while migrating attestations to v2")
|
|
}
|
|
}
|
|
}
|
|
err = store.SaveAttestationHistoryForPubKeysV2(ctx, dataMap)
|
|
return err
|
|
}
|
|
|
|
// MigrateV2AttestationProtectionDb exports old attestation protection data
|
|
// format to the new format and save the exported flag to database.
|
|
func (store *Store) MigrateV2AttestationProtectionDb(ctx context.Context) error {
|
|
ctx, span := trace.StartSpan(ctx, "Validator.MigrateV2AttestationProtectionDb")
|
|
defer span.End()
|
|
importAttestations, err := store.shouldMigrateAttestations()
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to analyze whether attestations should be imported")
|
|
}
|
|
if !importAttestations {
|
|
return nil
|
|
}
|
|
log.Info("Starting proposals protection db migration to v2...")
|
|
err = store.MigrateV2AttestationProtection(ctx)
|
|
if err != nil {
|
|
return errors.Wrap(err, "filed to import attestations")
|
|
}
|
|
err = store.db.Update(func(tx *bolt.Tx) error {
|
|
bucket := tx.Bucket(historicAttestationsBucket)
|
|
if bucket != nil {
|
|
if err := bucket.Put([]byte(attestationExported), []byte{1}); err != nil {
|
|
return errors.Wrap(err, "failed to set migrated attestations flag in db")
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
log.Info("Finished proposals protection db migration to v2")
|
|
return err
|
|
}
|
|
|
|
func (store *Store) shouldMigrateAttestations() (bool, error) {
|
|
var importAttestations bool
|
|
err := store.db.View(func(tx *bolt.Tx) error {
|
|
attestationBucket := tx.Bucket(historicAttestationsBucket)
|
|
if attestationBucket != nil && attestationBucket.Stats().KeyN != 0 {
|
|
if exported := attestationBucket.Get([]byte(attestationExported)); exported == nil {
|
|
importAttestations = true
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
return importAttestations, err
|
|
}
|