From df9a534826037ddac8dcaac0b1b470ce9fa8ecd4 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Tue, 31 Mar 2020 16:54:24 -0700 Subject: [PATCH] Regen historical states for `new-state-mgmt` compatibility (#5261) --- beacon-chain/blockchain/service.go | 4 + beacon-chain/db/kv/BUILD.bazel | 5 +- beacon-chain/db/kv/archived_point.go | 16 ++ beacon-chain/db/kv/check_historical_state.go | 55 +++++ beacon-chain/db/kv/check_state.go | 33 --- beacon-chain/db/kv/kv.go | 3 +- beacon-chain/db/kv/regen_historical_states.go | 194 ++++++++++++++++++ shared/params/config.go | 2 + 8 files changed, 277 insertions(+), 35 deletions(-) create mode 100644 beacon-chain/db/kv/check_historical_state.go delete mode 100644 beacon-chain/db/kv/check_state.go create mode 100644 beacon-chain/db/kv/regen_historical_states.go diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 560a2616e..486a7f98c 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -468,6 +468,10 @@ func (s *Service) pruneGarbageState(ctx context.Context, slot uint64) error { return err } + if err := s.beaconDB.SaveLastArchivedIndex(ctx, 0); err != nil { + return err + } + return nil } diff --git a/beacon-chain/db/kv/BUILD.bazel b/beacon-chain/db/kv/BUILD.bazel index 3028065ad..76c8dfd82 100644 --- a/beacon-chain/db/kv/BUILD.bazel +++ b/beacon-chain/db/kv/BUILD.bazel @@ -8,7 +8,7 @@ go_library( "attestations.go", "backup.go", "blocks.go", - "check_state.go", + "check_historical_state.go", "checkpoint.go", "deposit_contract.go", "encoding.go", @@ -16,6 +16,7 @@ go_library( "kv.go", "operations.go", "powchain.go", + "regen_historical_states.go", "schema.go", "slashings.go", "state.go", @@ -28,6 +29,7 @@ go_library( deps = [ "//beacon-chain/cache:go_default_library", "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/core/state:go_default_library", "//beacon-chain/db/filters:go_default_library", "//beacon-chain/db/iface:go_default_library", "//beacon-chain/state:go_default_library", @@ -35,6 +37,7 @@ go_library( "//proto/beacon/db:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//shared/bytesutil:go_default_library", + "//shared/cmd:go_default_library", "//shared/featureconfig:go_default_library", "//shared/params:go_default_library", "//shared/sliceutil:go_default_library", diff --git a/beacon-chain/db/kv/archived_point.go b/beacon-chain/db/kv/archived_point.go index 1afd568fd..f543bdd2c 100644 --- a/beacon-chain/db/kv/archived_point.go +++ b/beacon-chain/db/kv/archived_point.go @@ -2,6 +2,7 @@ package kv import ( "context" + "encoding/binary" "github.com/prysmaticlabs/prysm/shared/bytesutil" bolt "go.etcd.io/bbolt" @@ -29,6 +30,21 @@ func (k *Store) SaveLastArchivedIndex(ctx context.Context, index uint64) error { }) } +// LastArchivedIndex from the db. +func (k *Store) LastArchivedIndex(ctx context.Context) (uint64, error) { + ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedIndex") + defer span.End() + var index uint64 + err := k.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(archivedIndexRootBucket) + b := bucket.Get(lastArchivedIndexKey) + index = binary.LittleEndian.Uint64(b) + return nil + }) + + return index, err +} + // LastArchivedIndexRoot from the db. func (k *Store) LastArchivedIndexRoot(ctx context.Context) [32]byte { ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedIndexRoot") diff --git a/beacon-chain/db/kv/check_historical_state.go b/beacon-chain/db/kv/check_historical_state.go new file mode 100644 index 000000000..6210cd400 --- /dev/null +++ b/beacon-chain/db/kv/check_historical_state.go @@ -0,0 +1,55 @@ +package kv + +import ( + "context" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/shared/cmd" + "github.com/prysmaticlabs/prysm/shared/featureconfig" + bolt "go.etcd.io/bbolt" +) + +var historicalStateDeletedKey = []byte("historical-states-deleted") + +func (kv *Store) ensureNewStateServiceCompatible(ctx context.Context) error { + if !featureconfig.Get().NewStateMgmt { + return kv.db.Update(func(tx *bolt.Tx) error { + bkt := tx.Bucket(newStateServiceCompatibleBucket) + return bkt.Put(historicalStateDeletedKey, []byte{0x01}) + }) + } + + var historicalStateDeleted bool + kv.db.View(func(tx *bolt.Tx) error { + bkt := tx.Bucket(newStateServiceCompatibleBucket) + v := bkt.Get(historicalStateDeletedKey) + historicalStateDeleted = len(v) == 1 && v[0] == 0x01 + return nil + }) + + regenHistoricalStatesConfirmed := false + var err error + if historicalStateDeleted { + actionText := "Looks like you stopped using --new-state-mgmt. To reuse it, the node will need " + + "to generate and save historical states. The process may take a while, - do you want to proceed? (Y/N)" + deniedText := "Historical states will not be generated. Please remove usage --new-state-mgmt" + + regenHistoricalStatesConfirmed, err = cmd.ConfirmAction(actionText, deniedText) + if err != nil { + return err + } + + if !regenHistoricalStatesConfirmed { + return errors.New("exiting... please do not run with flag --new-state-mgmt") + } + + if err := kv.regenHistoricalStates(ctx); err != nil { + return errors.Wrap(err, "could not regenerate historical states, please retry") + } + } + + return kv.db.Update(func(tx *bolt.Tx) error { + bkt := tx.Bucket(newStateServiceCompatibleBucket) + return bkt.Put(historicalStateDeletedKey, []byte{0x00}) + }) +} diff --git a/beacon-chain/db/kv/check_state.go b/beacon-chain/db/kv/check_state.go deleted file mode 100644 index f83fe1755..000000000 --- a/beacon-chain/db/kv/check_state.go +++ /dev/null @@ -1,33 +0,0 @@ -package kv - -import ( - "errors" - - "github.com/prysmaticlabs/prysm/shared/featureconfig" - bolt "go.etcd.io/bbolt" -) - -var historicalStateDeletedKey = []byte("historical-states-deleted") - -func (kv *Store) ensureNewStateServiceCompatible() error { - if !featureconfig.Get().NewStateMgmt { - return kv.db.Update(func(tx *bolt.Tx) error { - bkt := tx.Bucket(newStateServiceCompatibleBucket) - return bkt.Put(historicalStateDeletedKey, []byte{0x01}) - }) - } - - var historicalStateDeleted bool - kv.db.View(func(tx *bolt.Tx) error { - bkt := tx.Bucket(newStateServiceCompatibleBucket) - v := bkt.Get(historicalStateDeletedKey) - historicalStateDeleted = len(v) == 1 && v[0] == 0x01 - return nil - }) - - if historicalStateDeleted { - return errors.New("historical states were pruned in db, do not run with flag --new-state-mgmt") - } - - return nil -} diff --git a/beacon-chain/db/kv/kv.go b/beacon-chain/db/kv/kv.go index 4dff30d29..feb570bf0 100644 --- a/beacon-chain/db/kv/kv.go +++ b/beacon-chain/db/kv/kv.go @@ -1,6 +1,7 @@ package kv import ( + "context" "os" "path" "sync" @@ -120,7 +121,7 @@ func NewKVStore(dirPath string, stateSummaryCache *cache.StateSummaryCache) (*St return nil, err } - if err := kv.ensureNewStateServiceCompatible(); err != nil { + if err := kv.ensureNewStateServiceCompatible(context.Background()); err != nil { return nil, err } diff --git a/beacon-chain/db/kv/regen_historical_states.go b/beacon-chain/db/kv/regen_historical_states.go new file mode 100644 index 000000000..d0931578b --- /dev/null +++ b/beacon-chain/db/kv/regen_historical_states.go @@ -0,0 +1,194 @@ +package kv + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + transition "github.com/prysmaticlabs/prysm/beacon-chain/core/state" + "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" + stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" + "github.com/prysmaticlabs/prysm/shared/params" + log "github.com/sirupsen/logrus" + "go.opencensus.io/trace" +) + +func (kv *Store) regenHistoricalStates(ctx context.Context) error { + ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStates") + defer span.End() + + genesisState, err := kv.GenesisState(ctx) + if err != nil { + return err + } + currentState := genesisState.Copy() + startSlot := genesisState.Slot() + + // Restore from last archived point if this process was previously interrupted. + slotsPerArchivedPoint := params.BeaconConfig().SlotsPerArchivedPoint + lastArchivedIndex, err := kv.LastArchivedIndex(ctx) + if err != nil { + return err + } + if lastArchivedIndex > 0 { + archivedIndexStart := lastArchivedIndex - 1 + wantedSlotBelow := archivedIndexStart*slotsPerArchivedPoint + 1 + states, err := kv.HighestSlotStatesBelow(ctx, wantedSlotBelow) + if err != nil { + return err + } + if len(states) == 0 { + return errors.New("states can't be empty") + } + if states[0] == nil { + return errors.New("nil last state") + } + currentState = states[0] + startSlot = currentState.Slot() + } + + lastSavedBlockArchivedIndex, err := kv.lastSavedBlockArchivedIndex(ctx) + if err != nil { + return err + } + for i := lastArchivedIndex; i <= lastSavedBlockArchivedIndex; i++ { + targetSlot := startSlot + slotsPerArchivedPoint + filter := filters.NewFilter().SetStartSlot(startSlot + 1).SetEndSlot(targetSlot) + blocks, err := kv.Blocks(ctx, filter) + if err != nil { + return err + } + + // Replay blocks and replay slots if necessary. + if len(blocks) > 0 { + for i := 0; i < len(blocks); i++ { + if blocks[i].Block.Slot == 0 { + continue + } + currentState, err = regenHistoricalStateTransition(ctx, currentState, blocks[i]) + if err != nil { + return err + } + } + } + if targetSlot > currentState.Slot() { + currentState, err = regenHistoricalStateProcessSlots(ctx, currentState, targetSlot) + if err != nil { + return err + } + } + + if len(blocks) > 0 { + // Save the historical root, state and highest index to the DB. + if helpers.IsEpochStart(currentState.Slot()) && currentState.Slot()%slotsPerArchivedPoint == 0 && blocks[len(blocks)-1].Block.Slot&slotsPerArchivedPoint == 0 { + if err := kv.saveArchivedInfo(ctx, currentState, blocks, i); err != nil { + return err + } + log.WithFields(log.Fields{ + "currentArchivedIndex/totalArchivedIndices": fmt.Sprintf("%d/%d", i, lastSavedBlockArchivedIndex), + "archivedStateSlot": currentState.Slot()}).Info("Saved historical state") + } + } + startSlot += slotsPerArchivedPoint + } + return nil +} + +// This runs state transition to recompute historical state. +func regenHistoricalStateTransition( + ctx context.Context, + state *stateTrie.BeaconState, + signed *ethpb.SignedBeaconBlock, +) (*stateTrie.BeaconState, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } + if signed == nil || signed.Block == nil { + return nil, errors.New("block can't be nil") + } + ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStateTransition") + defer span.End() + var err error + state, err = regenHistoricalStateProcessSlots(ctx, state, signed.Block.Slot) + if err != nil { + return nil, errors.Wrap(err, "could not process slot") + } + state, err = transition.ProcessBlockForStateRoot(ctx, state, signed) + if err != nil { + return nil, errors.Wrap(err, "could not process block") + } + return state, nil +} + +// This runs slot transition to recompute historical state. +func regenHistoricalStateProcessSlots(ctx context.Context, state *stateTrie.BeaconState, slot uint64) (*stateTrie.BeaconState, error) { + ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStateProcessSlots") + defer span.End() + if state == nil { + return nil, errors.New("state can't be nil") + } + if state.Slot() > slot { + err := fmt.Errorf("expected state.slot %d < slot %d", state.Slot(), slot) + return nil, err + } + if state.Slot() == slot { + return state, nil + } + for state.Slot() < slot { + state, err := transition.ProcessSlot(ctx, state) + if err != nil { + return nil, errors.Wrap(err, "could not process slot") + } + if transition.CanProcessEpoch(state) { + state, err = transition.ProcessEpochPrecompute(ctx, state) + if err != nil { + return nil, errors.Wrap(err, "could not process epoch with optimizations") + } + } + state.SetSlot(state.Slot() + 1) + } + return state, nil +} + +// This retrieves the last saved block's archived index. +func (kv *Store) lastSavedBlockArchivedIndex(ctx context.Context) (uint64, error) { + b, err := kv.HighestSlotBlocks(ctx) + if err != nil { + return 0, err + } + if len(b) == 0 { + return 0, errors.New("blocks can't be empty") + } + if b[0] == nil { + return 0, errors.New("nil last block") + } + lastSavedBlockSlot := b[0].Block.Slot + slotsPerArchivedPoint := params.BeaconConfig().SlotsPerArchivedPoint + lastSavedBlockArchivedIndex := lastSavedBlockSlot/slotsPerArchivedPoint - 1 + + return lastSavedBlockArchivedIndex, nil +} + +// This saved archived info (state, root, index) into the db. +func (kv *Store) saveArchivedInfo(ctx context.Context, + currentState *stateTrie.BeaconState, + blocks []*ethpb.SignedBeaconBlock, + archivedIndex uint64) error { + lastBlocksRoot, err := ssz.HashTreeRoot(blocks[len(blocks)-1].Block) + if err != nil { + return nil + } + if err := kv.SaveState(ctx, currentState, lastBlocksRoot); err != nil { + return err + } + if err := kv.SaveArchivedPointRoot(ctx, lastBlocksRoot, archivedIndex); err != nil { + return err + } + if err := kv.SaveLastArchivedIndex(ctx, archivedIndex); err != nil { + return err + } + return nil +} diff --git a/shared/params/config.go b/shared/params/config.go index 10027b725..e57ca44e4 100644 --- a/shared/params/config.go +++ b/shared/params/config.go @@ -94,6 +94,7 @@ type BeaconChainConfig struct { EmptySignature [96]byte // EmptySignature is used to represent a zeroed out BLS Signature. DefaultPageSize int // DefaultPageSize defines the default page size for RPC server request. MaxPeersToSync int // MaxPeersToSync describes the limit for number of peers in round robin sync. + SlotsPerArchivedPoint uint64 // SlotsPerArchivedPoint defines the number of slots per one archived point. // Slasher constants. WeakSubjectivityPeriod uint64 // WeakSubjectivityPeriod defines the time period expressed in number of epochs were proof of stake network should validate block headers and attestations for slashable events. @@ -185,6 +186,7 @@ var defaultBeaconConfig = &BeaconChainConfig{ EmptySignature: [96]byte{}, DefaultPageSize: 250, MaxPeersToSync: 15, + SlotsPerArchivedPoint: 256, // Slasher related values. WeakSubjectivityPeriod: 54000,