mirror of https://gitlab.com/pulsechaincom/prysm-pulse.git, synced 2024-12-27 21:57:16 +00:00
94155191b7
* ensure single point of entry for ctx and cancelation of expensive func
243 lines
7.3 KiB
Go
package kv

import (
	"context"
	"fmt"
	"runtime"

	lru "github.com/hashicorp/golang-lru"
	"github.com/pkg/errors"
	ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
	"github.com/prysmaticlabs/go-ssz"
	"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
	transition "github.com/prysmaticlabs/prysm/beacon-chain/core/state"
	"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
	stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
	"github.com/prysmaticlabs/prysm/shared/bytesutil"
	"github.com/prysmaticlabs/prysm/shared/params"
	log "github.com/sirupsen/logrus"
	"go.opencensus.io/trace"
)

// Using the max possible size to avoid using the DB to save and retrieve pre states (slow).
// The size is 80 because the block at slot 43772 was built on top of the block at slot 43693,
// the worst case seen: those blocks are 43772 - 43693 = 79 slots apart, so 80 entries suffice.
const historicalStatesSize = 80

func (kv *Store) regenHistoricalStates(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStates")
	defer span.End()

	genesisState, err := kv.GenesisState(ctx)
	if err != nil {
		return err
	}
	currentState := genesisState.Copy()
	startSlot := genesisState.Slot()

	// Restore from last archived point if this process was previously interrupted.
	slotsPerArchivedPoint := params.BeaconConfig().SlotsPerArchivedPoint
	lastArchivedIndex, err := kv.LastArchivedIndex(ctx)
	if err != nil {
		return err
	}
	if lastArchivedIndex > 0 {
		archivedIndexStart := lastArchivedIndex - 1
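		// Step back one archived point and resume from the highest saved state
		// below the start of that point; the most recent point may be incomplete
		// if the process was interrupted mid-write.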
		wantedSlotBelow := archivedIndexStart*slotsPerArchivedPoint + 1
		states, err := kv.HighestSlotStatesBelow(ctx, wantedSlotBelow)
		if err != nil {
			return err
		}
		if len(states) == 0 {
			return errors.New("states can't be empty")
		}
		if states[0] == nil {
			return errors.New("nil last state")
		}
		currentState = states[0]
		startSlot = currentState.Slot()
	}

	lastSavedBlockArchivedIndex, err := kv.lastSavedBlockArchivedIndex(ctx)
	if err != nil {
		return err
	}
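
	// Bounded LRU cache of recently computed states so that a block's pre state
	// is usually served from memory rather than the DB (see historicalStatesSize).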
	cacheState, err := lru.New(historicalStatesSize)
	if err != nil {
		return err
	}
	for i := lastArchivedIndex; i <= lastSavedBlockArchivedIndex; i++ {
		// This is an expensive operation, so we check if the context was canceled
		// at any point in the iteration.
		if err := ctx.Err(); err != nil {
			return err
		}
		targetSlot := startSlot + slotsPerArchivedPoint
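		// Each iteration replays one archived point worth of blocks, covering
		// slots (startSlot, targetSlot].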
		filter := filters.NewFilter().SetStartSlot(startSlot + 1).SetEndSlot(targetSlot)
		blocks, err := kv.Blocks(ctx, filter)
		if err != nil {
			return err
		}

		// Replay blocks and process skipped slots if necessary.
		if len(blocks) > 0 {
			for i := 0; i < len(blocks); i++ {
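				// Skip the genesis block; the genesis state is already in hand
				// and there is no parent state to replay from.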
				if blocks[i].Block.Slot == 0 {
					continue
				}
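
				// Resolve this block's pre state: prefer the LRU cache keyed by
				// parent root, falling back to a DB read on a cache miss.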
				var preState *stateTrie.BeaconState
				item, ok := cacheState.Get(bytesutil.ToBytes32(blocks[i].Block.ParentRoot))
				if !ok {
					preState, err = kv.State(ctx, bytesutil.ToBytes32(blocks[i].Block.ParentRoot))
					if err != nil {
						return err
					}
				} else {
					preState = item.(*stateTrie.BeaconState).Copy()
				}
				if preState == nil {
					return errors.New("pre state can't be nil")
				}

				currentState, err = regenHistoricalStateTransition(ctx, preState.Copy(), blocks[i])
				if err != nil {
					return errors.Wrap(err, "could not regenerate historical state transition")
				}

				r, err := ssz.HashTreeRoot(blocks[i].Block)
				if err != nil {
					return err
				}
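				// Cache the post state under this block's root so a child block
				// can find its pre state without hitting the DB.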
				cacheState.Add(r, currentState)
			}
		}
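
		// If the target slot was not reached by replaying blocks (e.g. a run of
		// skipped slots), advance the state through empty slots to the target.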
		if targetSlot > currentState.Slot() {
			currentState, err = regenHistoricalStateProcessSlots(ctx, currentState, targetSlot)
			if err != nil {
				return errors.Wrap(err, "could not process slots to regenerate historical state")
			}
		}

		if len(blocks) > 0 {
			// Save the historical root, state and highest index to the DB.
			if helpers.IsEpochStart(currentState.Slot()) && currentState.Slot()%slotsPerArchivedPoint == 0 && blocks[len(blocks)-1].Block.Slot%slotsPerArchivedPoint == 0 {
				if err := kv.saveArchivedInfo(ctx, currentState.Copy(), blocks, i); err != nil {
					return err
				}
				log.WithFields(log.Fields{
					"currentArchivedIndex/totalArchivedIndices": fmt.Sprintf("%d/%d", i, lastSavedBlockArchivedIndex),
					"archivedStateSlot":                         currentState.Slot()}).Info("Saved historical state")
			}
		}
		startSlot += slotsPerArchivedPoint
	}

	// Flush the cache, as the cached states will never be used again.
	cacheState.Purge()

	// Manually garbage collect, as the previous cache will never be used again.
	runtime.GC()

	return nil
}

// This runs the state transition to recompute a historical state.
func regenHistoricalStateTransition(
	ctx context.Context,
	state *stateTrie.BeaconState,
	signed *ethpb.SignedBeaconBlock,
) (*stateTrie.BeaconState, error) {
	if ctx.Err() != nil {
		return nil, ctx.Err()
	}
	if signed == nil || signed.Block == nil {
		return nil, errors.New("block can't be nil")
	}
	ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStateTransition")
	defer span.End()
	var err error
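	// Advance empty slots up to the block's slot, then apply the block itself.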
	state, err = regenHistoricalStateProcessSlots(ctx, state, signed.Block.Slot)
	if err != nil {
		return nil, errors.Wrap(err, "could not process slot")
	}
	state, err = transition.ProcessBlockForStateRoot(ctx, state, signed)
	if err != nil {
		return nil, errors.Wrap(err, "could not process block")
	}
	return state, nil
}

// This runs the slot transition to recompute a historical state.
func regenHistoricalStateProcessSlots(ctx context.Context, state *stateTrie.BeaconState, slot uint64) (*stateTrie.BeaconState, error) {
	ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStateProcessSlots")
	defer span.End()
	if state == nil {
		return nil, errors.New("state can't be nil")
	}
	if state.Slot() > slot {
		return nil, fmt.Errorf("expected state.slot %d <= slot %d", state.Slot(), slot)
	}
	if state.Slot() == slot {
		return state, nil
	}
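
	// Advance one slot at a time: run the slot transition, run the epoch
	// transition at epoch boundaries, then bump the slot number.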
	var err error
	for state.Slot() < slot {
		state, err = transition.ProcessSlot(ctx, state)
		if err != nil {
			return nil, errors.Wrap(err, "could not process slot")
		}
		if transition.CanProcessEpoch(state) {
			state, err = transition.ProcessEpochPrecompute(ctx, state)
			if err != nil {
				return nil, errors.Wrap(err, "could not process epoch with optimizations")
			}
		}
		if err := state.SetSlot(state.Slot() + 1); err != nil {
			return nil, err
		}
	}
	return state, nil
}

// This retrieves the last saved block's archived index.
func (kv *Store) lastSavedBlockArchivedIndex(ctx context.Context) (uint64, error) {
	b, err := kv.HighestSlotBlocks(ctx)
	if err != nil {
		return 0, err
	}
	if len(b) == 0 {
		return 0, errors.New("blocks can't be empty")
	}
	if b[0] == nil {
		return 0, errors.New("nil last block")
	}
	lastSavedBlockSlot := b[0].Block.Slot
	slotsPerArchivedPoint := params.BeaconConfig().SlotsPerArchivedPoint
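	// The index is zero-based over complete archived points. For example, with
	// slotsPerArchivedPoint = 2048 (the value is config-dependent), a highest
	// saved block slot of 6144 maps to index 6144/2048 - 1 = 2.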
	lastSavedBlockArchivedIndex := lastSavedBlockSlot/slotsPerArchivedPoint - 1

	return lastSavedBlockArchivedIndex, nil
}

// This saves archived info (state, root, index) into the db.
func (kv *Store) saveArchivedInfo(ctx context.Context,
	currentState *stateTrie.BeaconState,
	blocks []*ethpb.SignedBeaconBlock,
	archivedIndex uint64) error {
	lastBlocksRoot, err := ssz.HashTreeRoot(blocks[len(blocks)-1].Block)
	if err != nil {
		return err
	}
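	// The state is keyed by the root of the last block in the batch; the same
	// root doubles as the archived point root for this index.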
	if err := kv.SaveState(ctx, currentState, lastBlocksRoot); err != nil {
		return err
	}
	if err := kv.SaveArchivedPointRoot(ctx, lastBlocksRoot, archivedIndex); err != nil {
		return err
	}
	if err := kv.SaveLastArchivedIndex(ctx, archivedIndex); err != nil {
		return err
	}
	return nil
}