prysm-pulse/beacon-chain/state/stategen/replay.go
kasey 49826ebe28
Checkpoint Sync 2/5 - API support for retrieving weak subjectivity data (#10384)
also refactor replayer code for better reuse:
- separately expose stategen's canonical block func
- CanonicalHistory in ws api, NewCanonicalBuilder
- refactor CanonicalHistory into a ReplayerBuilder

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2022-03-23 22:54:07 +00:00

257 lines
8.2 KiB
Go

package stategen
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/beacon-chain/core/execution"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
prysmtime "github.com/prysmaticlabs/prysm/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/monitoring/tracing"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
"github.com/prysmaticlabs/prysm/runtime/version"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// ReplayBlocks advances the input state by applying the given blocks, lowest slot first, and then
// processing any remaining empty slots until the state reaches targetSlot.
//
// The blocks in signed are expected in decreasing slot order, so the slice is consumed from its
// tail. A block whose slot is not ahead of the current state slot is skipped, and replay stops
// early once the state slot reaches targetSlot. If ctx is cancelled between blocks the replay is
// aborted and ctx.Err() is returned.
func (_ *State) ReplayBlocks(
	ctx context.Context,
	state state.BeaconState,
	signed []block.SignedBeaconBlock,
	targetSlot types.Slot,
) (state.BeaconState, error) {
	ctx, span := trace.StartSpan(ctx, "stateGen.ReplayBlocks")
	defer span.End()

	began := time.Now()
	log.WithFields(logrus.Fields{
		"startSlot": state.Slot(),
		"endSlot":   targetSlot,
		"diff":      targetSlot - state.Slot(),
	}).Debug("Replaying state")

	// Walk the slice back to front so blocks are applied from the lowest slot upwards.
	// An empty slice simply skips the loop.
	for remaining := len(signed); remaining > 0; remaining-- {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		current := state.Slot()
		if current >= targetSlot {
			break
		}
		next := signed[remaining-1]
		// A block at or below the state slot has nothing to contribute.
		if current >= next.Block().Slot() {
			continue
		}
		advanced, err := executeStateTransitionStateGen(ctx, state, next)
		if err != nil {
			return nil, err
		}
		state = advanced
	}

	// Fill in trailing skipped slots, if any, between the last applied block and the target.
	if state.Slot() < targetSlot {
		filled, err := ReplayProcessSlots(ctx, state, targetSlot)
		if err != nil {
			return nil, err
		}
		state = filled
	}

	log.WithFields(logrus.Fields{
		"duration": time.Since(began),
	}).Debug("Replayed state")
	return state, nil
}
// LoadBlocks queries the DB once for all blocks matching the startSlot..endSlot filter and keeps
// only those on the chain that ends at endBlockRoot, found by following parent roots backwards
// from the end block.
// The Blocks are returned in slot-descending order, with the end block first.
//
// It returns an error when startSlot > endSlot, when the DB returns a different number of blocks
// and roots, or when endBlockRoot is not found at the tail of the fetched blocks. It returns
// (nil, nil) when the DB has no block for the range. A cancelled ctx aborts with ctx.Err().
func (s *State) LoadBlocks(ctx context.Context, startSlot, endSlot types.Slot, endBlockRoot [32]byte) ([]block.SignedBeaconBlock, error) {
	// Nothing to load for invalid range. Equal start and end slots form a valid one-slot range.
	if startSlot > endSlot {
		return nil, fmt.Errorf("start slot %d > end slot %d", startSlot, endSlot)
	}
	filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot)
	blocks, blockRoots, err := s.beaconDB.Blocks(ctx, filter)
	if err != nil {
		return nil, err
	}
	// The retrieved blocks and block roots have to be in the same length given same filter.
	if len(blocks) != len(blockRoots) {
		return nil, errors.New("length of blocks and roots don't match")
	}
	// Return early if there's no block given the input.
	length := len(blocks)
	if length == 0 {
		return nil, nil
	}

	// The last retrieved block root has to match input end block root.
	// Covers the edge case if there's multiple blocks on the same end slot,
	// the end root may not be the last index in `blockRoots`.
	// The loop trims trailing same-slot blocks until the end root is last. It must also run when
	// only two blocks were fetched, otherwise an end root stored first of two same-slot blocks
	// would be rejected.
	for length >= 2 && blocks[length-1].Block().Slot() == blocks[length-2].Block().Slot() && blockRoots[length-1] != endBlockRoot {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		length--
		// Look one entry further back; guard the index now that length may have dropped to 1.
		if length >= 2 && blockRoots[length-2] == endBlockRoot {
			length--
			break
		}
	}
	if blockRoots[length-1] != endBlockRoot {
		return nil, errors.New("end block roots don't match")
	}

	filteredBlocks := make([]block.SignedBeaconBlock, 0, length)
	filteredBlocks = append(filteredBlocks, blocks[length-1])
	// Starting from second to last index because the last block is already in the filtered block list.
	for i := length - 2; i >= 0; i-- {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		// Keep a block only if it is the parent of the most recently kept block; anything else
		// in the range belongs to a different branch.
		b := filteredBlocks[len(filteredBlocks)-1]
		if bytesutil.ToBytes32(b.Block().ParentRoot()) != blockRoots[i] {
			continue
		}
		filteredBlocks = append(filteredBlocks, blocks[i])
	}
	return filteredBlocks, nil
}
// executeStateTransitionStateGen runs the state transition for a single stored block on top of a
// historical state, for state gen usages.
// It first advances the state to the block's slot with ReplayProcessSlots and then applies the
// block with transition.ProcessBlockForStateRoot. State gen only works with blocks and states
// already stored in the DB, so the redundant signature checks and ssz hashing are omitted.
// It returns ctx.Err() if the context is already done, and an error if the block is nil.
// WARNING: This method should not be used on an unverified new block.
func executeStateTransitionStateGen(
	ctx context.Context,
	state state.BeaconState,
	signed block.SignedBeaconBlock,
) (state.BeaconState, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if err := helpers.BeaconBlockIsNil(signed); err != nil {
		return nil, err
	}
	ctx, span := trace.StartSpan(ctx, "stategen.ExecuteStateTransitionStateGen")
	defer span.End()

	// Per-slot transition up to the block's slot, using the replay variant of process slots.
	atBlockSlot, err := ReplayProcessSlots(ctx, state, signed.Block().Slot())
	if err != nil {
		return nil, errors.Wrap(err, "could not process slot")
	}

	// Per-block transition. Only the post state matters here, so the state-root variant of
	// block processing is used.
	postState, err := transition.ProcessBlockForStateRoot(ctx, atBlockSlot, signed)
	if err != nil {
		return nil, errors.Wrap(err, "could not process block")
	}
	return postState, nil
}
// ReplayProcessSlots advances the given state slot by slot until it reaches the requested slot,
// for state gen usages. For each slot it runs the per-slot processing, then the epoch processing
// matching the state's version when prysmtime.CanProcessEpoch reports true, then increments the
// slot and applies the Altair or Bellatrix upgrade when the new slot calls for it.
// There's no skip slot cache involved given state gen only works with already stored block and state in DB.
//
// A nil state yields errUnknownState, a state already past the requested slot yields an error,
// and a state already at the requested slot is returned unchanged.
// WARNING: This method should not be used for future slot.
func ReplayProcessSlots(ctx context.Context, state state.BeaconState, slot types.Slot) (state.BeaconState, error) {
	ctx, span := trace.StartSpan(ctx, "stategen.ReplayProcessSlots")
	defer span.End()

	if state == nil || state.IsNil() {
		return nil, errUnknownState
	}
	switch current := state.Slot(); {
	case current > slot:
		return nil, fmt.Errorf("expected state.slot %d < slot %d", current, slot)
	case current == slot:
		// Nothing to replay.
		return state, nil
	}

	for state.Slot() < slot {
		var err error
		if state, err = transition.ProcessSlot(ctx, state); err != nil {
			return nil, errors.Wrap(err, "could not process slot")
		}

		if prysmtime.CanProcessEpoch(state) {
			// Epoch processing depends on the fork version of the state.
			switch state.Version() {
			case version.Phase0:
				if state, err = transition.ProcessEpochPrecompute(ctx, state); err != nil {
					tracing.AnnotateError(span, err)
					return nil, errors.Wrap(err, "could not process epoch with optimizations")
				}
			case version.Altair, version.Bellatrix:
				if state, err = altair.ProcessEpoch(ctx, state); err != nil {
					tracing.AnnotateError(span, err)
					return nil, errors.Wrap(err, "could not process epoch")
				}
			default:
				return nil, errors.New("beacon state should have a version")
			}
		}

		if err = state.SetSlot(state.Slot() + 1); err != nil {
			tracing.AnnotateError(span, err)
			return nil, errors.Wrap(err, "failed to increment state slot")
		}

		// Fork upgrades are checked against the slot that was just entered.
		if prysmtime.CanUpgradeToAltair(state.Slot()) {
			if state, err = altair.UpgradeToAltair(ctx, state); err != nil {
				tracing.AnnotateError(span, err)
				return nil, err
			}
		}
		if prysmtime.CanUpgradeToBellatrix(state.Slot()) {
			if state, err = execution.UpgradeToBellatrix(ctx, state); err != nil {
				tracing.AnnotateError(span, err)
				return nil, err
			}
		}
	}
	return state, nil
}
// loadFinalizedBlocks fetches the blocks matching the startSlot..endSlot filter from the DB and
// returns only those the DB reports as finalized. The DB result is walked from its last entry to
// its first, so the returned slice is in the reverse of the DB's order.
// Since hot states don't have finalized blocks, this should ONLY be used for replaying cold state.
func (s *State) loadFinalizedBlocks(ctx context.Context, startSlot, endSlot types.Slot) ([]block.SignedBeaconBlock, error) {
	slotRange := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot)
	candidates, roots, err := s.beaconDB.Blocks(ctx, slotRange)
	if err != nil {
		return nil, err
	}
	total := len(candidates)
	if total != len(roots) {
		return nil, errors.New("length of blocks and roots don't match")
	}

	finalized := make([]block.SignedBeaconBlock, 0, total)
	for offset := 1; offset <= total; offset++ {
		idx := total - offset
		if !s.beaconDB.IsFinalizedBlock(ctx, roots[idx]) {
			continue
		}
		finalized = append(finalized, candidates[idx])
	}
	return finalized, nil
}