mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-22 03:30:35 +00:00
Regen historical states for new-state-mgmt
compatibility (#5261)
This commit is contained in:
parent
7e50c36725
commit
df9a534826
@ -468,6 +468,10 @@ func (s *Service) pruneGarbageState(ctx context.Context, slot uint64) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.beaconDB.SaveLastArchivedIndex(ctx, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -8,7 +8,7 @@ go_library(
|
||||
"attestations.go",
|
||||
"backup.go",
|
||||
"blocks.go",
|
||||
"check_state.go",
|
||||
"check_historical_state.go",
|
||||
"checkpoint.go",
|
||||
"deposit_contract.go",
|
||||
"encoding.go",
|
||||
@ -16,6 +16,7 @@ go_library(
|
||||
"kv.go",
|
||||
"operations.go",
|
||||
"powchain.go",
|
||||
"regen_historical_states.go",
|
||||
"schema.go",
|
||||
"slashings.go",
|
||||
"state.go",
|
||||
@ -28,6 +29,7 @@ go_library(
|
||||
deps = [
|
||||
"//beacon-chain/cache:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/core/state:go_default_library",
|
||||
"//beacon-chain/db/filters:go_default_library",
|
||||
"//beacon-chain/db/iface:go_default_library",
|
||||
"//beacon-chain/state:go_default_library",
|
||||
@ -35,6 +37,7 @@ go_library(
|
||||
"//proto/beacon/db:go_default_library",
|
||||
"//proto/beacon/p2p/v1:go_default_library",
|
||||
"//shared/bytesutil:go_default_library",
|
||||
"//shared/cmd:go_default_library",
|
||||
"//shared/featureconfig:go_default_library",
|
||||
"//shared/params:go_default_library",
|
||||
"//shared/sliceutil:go_default_library",
|
||||
|
@ -2,6 +2,7 @@ package kv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
@ -29,6 +30,21 @@ func (k *Store) SaveLastArchivedIndex(ctx context.Context, index uint64) error {
|
||||
})
|
||||
}
|
||||
|
||||
// LastArchivedIndex from the db.
|
||||
func (k *Store) LastArchivedIndex(ctx context.Context) (uint64, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedIndex")
|
||||
defer span.End()
|
||||
var index uint64
|
||||
err := k.db.Update(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket(archivedIndexRootBucket)
|
||||
b := bucket.Get(lastArchivedIndexKey)
|
||||
index = binary.LittleEndian.Uint64(b)
|
||||
return nil
|
||||
})
|
||||
|
||||
return index, err
|
||||
}
|
||||
|
||||
// LastArchivedIndexRoot from the db.
|
||||
func (k *Store) LastArchivedIndexRoot(ctx context.Context) [32]byte {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedIndexRoot")
|
||||
|
55
beacon-chain/db/kv/check_historical_state.go
Normal file
55
beacon-chain/db/kv/check_historical_state.go
Normal file
@ -0,0 +1,55 @@
|
||||
package kv
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/shared/cmd"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var historicalStateDeletedKey = []byte("historical-states-deleted")
|
||||
|
||||
func (kv *Store) ensureNewStateServiceCompatible(ctx context.Context) error {
|
||||
if !featureconfig.Get().NewStateMgmt {
|
||||
return kv.db.Update(func(tx *bolt.Tx) error {
|
||||
bkt := tx.Bucket(newStateServiceCompatibleBucket)
|
||||
return bkt.Put(historicalStateDeletedKey, []byte{0x01})
|
||||
})
|
||||
}
|
||||
|
||||
var historicalStateDeleted bool
|
||||
kv.db.View(func(tx *bolt.Tx) error {
|
||||
bkt := tx.Bucket(newStateServiceCompatibleBucket)
|
||||
v := bkt.Get(historicalStateDeletedKey)
|
||||
historicalStateDeleted = len(v) == 1 && v[0] == 0x01
|
||||
return nil
|
||||
})
|
||||
|
||||
regenHistoricalStatesConfirmed := false
|
||||
var err error
|
||||
if historicalStateDeleted {
|
||||
actionText := "Looks like you stopped using --new-state-mgmt. To reuse it, the node will need " +
|
||||
"to generate and save historical states. The process may take a while, - do you want to proceed? (Y/N)"
|
||||
deniedText := "Historical states will not be generated. Please remove usage --new-state-mgmt"
|
||||
|
||||
regenHistoricalStatesConfirmed, err = cmd.ConfirmAction(actionText, deniedText)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !regenHistoricalStatesConfirmed {
|
||||
return errors.New("exiting... please do not run with flag --new-state-mgmt")
|
||||
}
|
||||
|
||||
if err := kv.regenHistoricalStates(ctx); err != nil {
|
||||
return errors.Wrap(err, "could not regenerate historical states, please retry")
|
||||
}
|
||||
}
|
||||
|
||||
return kv.db.Update(func(tx *bolt.Tx) error {
|
||||
bkt := tx.Bucket(newStateServiceCompatibleBucket)
|
||||
return bkt.Put(historicalStateDeletedKey, []byte{0x00})
|
||||
})
|
||||
}
|
@ -1,33 +0,0 @@
|
||||
package kv
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var historicalStateDeletedKey = []byte("historical-states-deleted")
|
||||
|
||||
func (kv *Store) ensureNewStateServiceCompatible() error {
|
||||
if !featureconfig.Get().NewStateMgmt {
|
||||
return kv.db.Update(func(tx *bolt.Tx) error {
|
||||
bkt := tx.Bucket(newStateServiceCompatibleBucket)
|
||||
return bkt.Put(historicalStateDeletedKey, []byte{0x01})
|
||||
})
|
||||
}
|
||||
|
||||
var historicalStateDeleted bool
|
||||
kv.db.View(func(tx *bolt.Tx) error {
|
||||
bkt := tx.Bucket(newStateServiceCompatibleBucket)
|
||||
v := bkt.Get(historicalStateDeletedKey)
|
||||
historicalStateDeleted = len(v) == 1 && v[0] == 0x01
|
||||
return nil
|
||||
})
|
||||
|
||||
if historicalStateDeleted {
|
||||
return errors.New("historical states were pruned in db, do not run with flag --new-state-mgmt")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
package kv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
@ -120,7 +121,7 @@ func NewKVStore(dirPath string, stateSummaryCache *cache.StateSummaryCache) (*St
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := kv.ensureNewStateServiceCompatible(); err != nil {
|
||||
if err := kv.ensureNewStateServiceCompatible(context.Background()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
194
beacon-chain/db/kv/regen_historical_states.go
Normal file
194
beacon-chain/db/kv/regen_historical_states.go
Normal file
@ -0,0 +1,194 @@
|
||||
package kv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
transition "github.com/prysmaticlabs/prysm/beacon-chain/core/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
|
||||
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
func (kv *Store) regenHistoricalStates(ctx context.Context) error {
|
||||
ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStates")
|
||||
defer span.End()
|
||||
|
||||
genesisState, err := kv.GenesisState(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
currentState := genesisState.Copy()
|
||||
startSlot := genesisState.Slot()
|
||||
|
||||
// Restore from last archived point if this process was previously interrupted.
|
||||
slotsPerArchivedPoint := params.BeaconConfig().SlotsPerArchivedPoint
|
||||
lastArchivedIndex, err := kv.LastArchivedIndex(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if lastArchivedIndex > 0 {
|
||||
archivedIndexStart := lastArchivedIndex - 1
|
||||
wantedSlotBelow := archivedIndexStart*slotsPerArchivedPoint + 1
|
||||
states, err := kv.HighestSlotStatesBelow(ctx, wantedSlotBelow)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(states) == 0 {
|
||||
return errors.New("states can't be empty")
|
||||
}
|
||||
if states[0] == nil {
|
||||
return errors.New("nil last state")
|
||||
}
|
||||
currentState = states[0]
|
||||
startSlot = currentState.Slot()
|
||||
}
|
||||
|
||||
lastSavedBlockArchivedIndex, err := kv.lastSavedBlockArchivedIndex(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i := lastArchivedIndex; i <= lastSavedBlockArchivedIndex; i++ {
|
||||
targetSlot := startSlot + slotsPerArchivedPoint
|
||||
filter := filters.NewFilter().SetStartSlot(startSlot + 1).SetEndSlot(targetSlot)
|
||||
blocks, err := kv.Blocks(ctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Replay blocks and replay slots if necessary.
|
||||
if len(blocks) > 0 {
|
||||
for i := 0; i < len(blocks); i++ {
|
||||
if blocks[i].Block.Slot == 0 {
|
||||
continue
|
||||
}
|
||||
currentState, err = regenHistoricalStateTransition(ctx, currentState, blocks[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
if targetSlot > currentState.Slot() {
|
||||
currentState, err = regenHistoricalStateProcessSlots(ctx, currentState, targetSlot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if len(blocks) > 0 {
|
||||
// Save the historical root, state and highest index to the DB.
|
||||
if helpers.IsEpochStart(currentState.Slot()) && currentState.Slot()%slotsPerArchivedPoint == 0 && blocks[len(blocks)-1].Block.Slot&slotsPerArchivedPoint == 0 {
|
||||
if err := kv.saveArchivedInfo(ctx, currentState, blocks, i); err != nil {
|
||||
return err
|
||||
}
|
||||
log.WithFields(log.Fields{
|
||||
"currentArchivedIndex/totalArchivedIndices": fmt.Sprintf("%d/%d", i, lastSavedBlockArchivedIndex),
|
||||
"archivedStateSlot": currentState.Slot()}).Info("Saved historical state")
|
||||
}
|
||||
}
|
||||
startSlot += slotsPerArchivedPoint
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// This runs state transition to recompute historical state.
|
||||
func regenHistoricalStateTransition(
|
||||
ctx context.Context,
|
||||
state *stateTrie.BeaconState,
|
||||
signed *ethpb.SignedBeaconBlock,
|
||||
) (*stateTrie.BeaconState, error) {
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
if signed == nil || signed.Block == nil {
|
||||
return nil, errors.New("block can't be nil")
|
||||
}
|
||||
ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStateTransition")
|
||||
defer span.End()
|
||||
var err error
|
||||
state, err = regenHistoricalStateProcessSlots(ctx, state, signed.Block.Slot)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not process slot")
|
||||
}
|
||||
state, err = transition.ProcessBlockForStateRoot(ctx, state, signed)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not process block")
|
||||
}
|
||||
return state, nil
|
||||
}
|
||||
|
||||
// This runs slot transition to recompute historical state.
|
||||
func regenHistoricalStateProcessSlots(ctx context.Context, state *stateTrie.BeaconState, slot uint64) (*stateTrie.BeaconState, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStateProcessSlots")
|
||||
defer span.End()
|
||||
if state == nil {
|
||||
return nil, errors.New("state can't be nil")
|
||||
}
|
||||
if state.Slot() > slot {
|
||||
err := fmt.Errorf("expected state.slot %d < slot %d", state.Slot(), slot)
|
||||
return nil, err
|
||||
}
|
||||
if state.Slot() == slot {
|
||||
return state, nil
|
||||
}
|
||||
for state.Slot() < slot {
|
||||
state, err := transition.ProcessSlot(ctx, state)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not process slot")
|
||||
}
|
||||
if transition.CanProcessEpoch(state) {
|
||||
state, err = transition.ProcessEpochPrecompute(ctx, state)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not process epoch with optimizations")
|
||||
}
|
||||
}
|
||||
state.SetSlot(state.Slot() + 1)
|
||||
}
|
||||
return state, nil
|
||||
}
|
||||
|
||||
// This retrieves the last saved block's archived index.
|
||||
func (kv *Store) lastSavedBlockArchivedIndex(ctx context.Context) (uint64, error) {
|
||||
b, err := kv.HighestSlotBlocks(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if len(b) == 0 {
|
||||
return 0, errors.New("blocks can't be empty")
|
||||
}
|
||||
if b[0] == nil {
|
||||
return 0, errors.New("nil last block")
|
||||
}
|
||||
lastSavedBlockSlot := b[0].Block.Slot
|
||||
slotsPerArchivedPoint := params.BeaconConfig().SlotsPerArchivedPoint
|
||||
lastSavedBlockArchivedIndex := lastSavedBlockSlot/slotsPerArchivedPoint - 1
|
||||
|
||||
return lastSavedBlockArchivedIndex, nil
|
||||
}
|
||||
|
||||
// This saved archived info (state, root, index) into the db.
|
||||
func (kv *Store) saveArchivedInfo(ctx context.Context,
|
||||
currentState *stateTrie.BeaconState,
|
||||
blocks []*ethpb.SignedBeaconBlock,
|
||||
archivedIndex uint64) error {
|
||||
lastBlocksRoot, err := ssz.HashTreeRoot(blocks[len(blocks)-1].Block)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
if err := kv.SaveState(ctx, currentState, lastBlocksRoot); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := kv.SaveArchivedPointRoot(ctx, lastBlocksRoot, archivedIndex); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := kv.SaveLastArchivedIndex(ctx, archivedIndex); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
@ -94,6 +94,7 @@ type BeaconChainConfig struct {
|
||||
EmptySignature [96]byte // EmptySignature is used to represent a zeroed out BLS Signature.
|
||||
DefaultPageSize int // DefaultPageSize defines the default page size for RPC server request.
|
||||
MaxPeersToSync int // MaxPeersToSync describes the limit for number of peers in round robin sync.
|
||||
SlotsPerArchivedPoint uint64 // SlotsPerArchivedPoint defines the number of slots per one archived point.
|
||||
|
||||
// Slasher constants.
|
||||
WeakSubjectivityPeriod uint64 // WeakSubjectivityPeriod defines the time period expressed in number of epochs were proof of stake network should validate block headers and attestations for slashable events.
|
||||
@ -185,6 +186,7 @@ var defaultBeaconConfig = &BeaconChainConfig{
|
||||
EmptySignature: [96]byte{},
|
||||
DefaultPageSize: 250,
|
||||
MaxPeersToSync: 15,
|
||||
SlotsPerArchivedPoint: 256,
|
||||
|
||||
// Slasher related values.
|
||||
WeakSubjectivityPeriod: 54000,
|
||||
|
Loading…
Reference in New Issue
Block a user