Use cache state during init sync (#4199)

* Initial sync state cache
* Gaz
* Gaz
* Don't save head root
* Fix config validator
* Uncomment save head
* Merge branch 'master' into initial-sync-no-verify
* Minor refactor
* Merge branch 'initial-sync-no-verify' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify
* Merge branch 'master' into initial-sync-no-verify
* Tests
* Merge branch 'initial-sync-no-verify' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify
* Merge branch 'master' into initial-sync-no-verify
* Add lock
* Merge branch 'initial-sync-no-verify' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify
* Tests
* Removed save head
* One more test
* Merge branch 'master' into initial-sync-no-verify
* Raul's feedback
* Merge branch 'initial-sync-no-verify' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify
* Comment
* Gazelle
* Merge branch 'master' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify
* revert
* Update beacon-chain/blockchain/service.go

Co-Authored-By: Ivan Martinez <ivanthegreatdev@gmail.com>
* Merge branch 'master' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify
* Merge branch 'initial-sync-no-verify' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify
* Fixed test
* Fixed feature flag
* Merge branch 'master' into initial-sync-no-verify
* Fixed cache genesis state test
* Merge branch 'initial-sync-no-verify' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify
This commit is contained in:
terence tsao 2019-12-05 16:49:19 -08:00 committed by prylabs-bulldozer[bot]
parent ae2b2e74ca
commit 9d4c7cb4f7
7 changed files with 269 additions and 4 deletions

View File

@ -6,6 +6,7 @@ import (
"encoding/hex"
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
@ -18,6 +19,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
@ -154,8 +156,11 @@ func (s *Store) OnBlockInitialSyncStateTransition(ctx context.Context, b *ethpb.
ctx, span := trace.StartSpan(ctx, "forkchoice.onBlock")
defer span.End()
s.initSyncStateLock.Lock()
defer s.initSyncStateLock.Unlock()
// Retrieve incoming block's pre state.
preState, err := s.getBlockPreState(ctx, b)
preState, err := s.cachedPreState(ctx, b)
if err != nil {
return err
}
@ -175,8 +180,13 @@ func (s *Store) OnBlockInitialSyncStateTransition(ctx context.Context, b *ethpb.
if err != nil {
return errors.Wrapf(err, "could not get signing root of block %d", b.Slot)
}
if err := s.db.SaveState(ctx, postState, root); err != nil {
return errors.Wrap(err, "could not save state")
if featureconfig.Get().InitSyncCacheState {
s.initSyncState[root] = postState
} else {
if err := s.db.SaveState(ctx, postState, root); err != nil {
return errors.Wrap(err, "could not save state")
}
}
// Update justified check point.
@ -205,6 +215,10 @@ func (s *Store) OnBlockInitialSyncStateTransition(ctx context.Context, b *ethpb.
}
}
if err := s.saveInitState(ctx, postState); err != nil {
return errors.Wrap(err, "could not save init sync finalized state")
}
if err := s.db.SaveFinalizedCheckpoint(ctx, postState.FinalizedCheckpoint); err != nil {
return errors.Wrap(err, "could not save finalized checkpoint")
}
@ -465,3 +479,51 @@ func (s *Store) rmStatesOlderThanLastFinalized(ctx context.Context, startSlot ui
return nil
}
// cachedPreState returns the pre state of block b, i.e. the state stored under b.ParentRoot.
//
// With InitSyncCacheState enabled, the in-memory initial sync cache is consulted first and the DB
// is only used as a fallback; in that mode the caller always receives a clone, so the returned
// state can be mutated without touching the cached copy. With the feature disabled the state is
// read from the DB and returned as is.
func (s *Store) cachedPreState(ctx context.Context, b *ethpb.BeaconBlock) (*pb.BeaconState, error) {
	parentRoot := bytesutil.ToBytes32(b.ParentRoot)
	useCache := featureconfig.Get().InitSyncCacheState

	var parentState *pb.BeaconState
	if useCache {
		// A miss leaves parentState nil and falls through to the DB lookup below.
		parentState = s.initSyncState[parentRoot]
	}

	if parentState == nil {
		var err error
		parentState, err = s.db.State(ctx, parentRoot)
		if err != nil {
			return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
		}
		if parentState == nil {
			return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot)
		}
	}

	if useCache {
		return proto.Clone(parentState).(*pb.BeaconState), nil
	}
	return parentState, nil
}
// saveInitState persists the state referenced by state's finalized checkpoint to the DB and
// prunes the in-memory initial sync cache. It is a no-op unless InitSyncCacheState is enabled.
//
// In cache mode only finalized states are written to disk, so that initial sync can resume from
// the last finalized checkpoint after a restart. Once the finalized state is saved, every cached
// state whose slot is below the start slot of the finalized epoch is dropped from the cache.
//
// Returns an error only if writing the finalized state to the DB fails.
func (s *Store) saveInitState(ctx context.Context, state *pb.BeaconState) error {
	if !featureconfig.Get().InitSyncCacheState {
		return nil
	}

	finalizedRoot := bytesutil.ToBytes32(state.FinalizedCheckpoint.Root)
	// Indexing a map with a missing key yields a nil state. Never hand nil to the DB: skip the
	// write when the finalized state is not held in the cache.
	if fs, ok := s.initSyncState[finalizedRoot]; ok && fs != nil {
		if err := s.db.SaveState(ctx, fs, finalizedRoot); err != nil {
			return errors.Wrap(err, "could not save state")
		}
	}

	// The pruning boundary is identical for every entry, so compute it once.
	finalizedSlot := state.FinalizedCheckpoint.Epoch * params.BeaconConfig().SlotsPerEpoch
	for r, cached := range s.initSyncState {
		// Nil entries carry no state and are dropped as well instead of being dereferenced.
		if cached == nil || cached.Slot < finalizedSlot {
			delete(s.initSyncState, r)
		}
	}
	return nil
}

View File

@ -369,3 +369,113 @@ func TestRemoveStateSinceLastFinalized_EmptyStartSlot(t *testing.T) {
t.Error("Did not delete state for start slot")
}
}
// TestCachedPreState_CanGetFromCache seeds the init sync cache with a state for the parent root
// but saves nothing to the DB, then expects cachedPreState to report the pre state as missing.
// NOTE(review): this outcome presumes InitSyncCacheState is disabled when the test runs, since
// only then is the cache bypassed — confirm against the package's test setup.
func TestCachedPreState_CanGetFromCache(t *testing.T) {
	ctx := context.Background()
	db := testDB.SetupDB(t)
	defer testDB.TeardownDB(t, db)

	store := NewForkChoiceService(ctx, db)
	s := &pb.BeaconState{Slot: 1}
	r := [32]byte{'A'}
	b := &ethpb.BeaconBlock{Slot: 1, ParentRoot: r[:]}
	store.initSyncState[r] = s

	wanted := "pre state of slot 1 does not exist"
	// Guard against a nil error so an unexpected success fails the test instead of panicking.
	_, err := store.cachedPreState(ctx, b)
	if err == nil || !strings.Contains(err.Error(), wanted) {
		t.Fatalf("wanted error containing %q, got %v", wanted, err)
	}
}
// TestCachedPreState_CanGetFromCacheWithFeature verifies that, with InitSyncCacheState enabled,
// cachedPreState serves the pre state from the in-memory cache without it being in the DB.
func TestCachedPreState_CanGetFromCacheWithFeature(t *testing.T) {
	ctx := context.Background()
	db := testDB.SetupDB(t)
	defer testDB.TeardownDB(t, db)

	featureconfig.Init(&featureconfig.Flags{
		InitSyncCacheState: true,
	})
	// The feature config is global; switch the flag back off so it does not leak into other tests.
	defer featureconfig.Init(&featureconfig.Flags{})

	store := NewForkChoiceService(ctx, db)
	s := &pb.BeaconState{Slot: 1}
	r := [32]byte{'A'}
	b := &ethpb.BeaconBlock{Slot: 1, ParentRoot: r[:]}
	store.initSyncState[r] = s

	received, err := store.cachedPreState(ctx, b)
	if err != nil {
		t.Fatal(err)
	}
	if !reflect.DeepEqual(s, received) {
		t.Error("cached state not the same")
	}
}
// TestCachedPreState_CanGetFromDB verifies that cachedPreState fails while the parent state is
// absent from the DB and returns the saved state once it has been written.
func TestCachedPreState_CanGetFromDB(t *testing.T) {
	ctx := context.Background()
	db := testDB.SetupDB(t)
	defer testDB.TeardownDB(t, db)

	store := NewForkChoiceService(ctx, db)
	r := [32]byte{'A'}
	b := &ethpb.BeaconBlock{Slot: 1, ParentRoot: r[:]}

	// Nothing has been saved yet, so the lookup must fail. The nil check keeps an unexpected
	// success from panicking on err.Error().
	wanted := "pre state of slot 1 does not exist"
	if _, err := store.cachedPreState(ctx, b); err == nil || err.Error() != wanted {
		t.Errorf("wanted error %q, got %v", wanted, err)
	}

	s := &pb.BeaconState{Slot: 1}
	if err := store.db.SaveState(ctx, s, r); err != nil {
		t.Fatal(err)
	}

	received, err := store.cachedPreState(ctx, b)
	if err != nil {
		t.Fatal(err)
	}
	if !reflect.DeepEqual(s, received) {
		t.Error("cached state not the same")
	}
}
// TestSaveInitState_CanSaveDelete fills the init sync cache with states for slots 0..63, finalizes
// the block at slot 32 in epoch 1, and checks that saveInitState writes the finalized state to the
// DB and prunes the cached states that fall below the finalized epoch's start slot.
func TestSaveInitState_CanSaveDelete(t *testing.T) {
	ctx := context.Background()
	db := testDB.SetupDB(t)
	defer testDB.TeardownDB(t, db)

	store := NewForkChoiceService(ctx, db)

	featureconfig.Init(&featureconfig.Flags{
		InitSyncCacheState: true,
	})
	// The feature config is global; switch the flag back off so it does not leak into other tests.
	defer featureconfig.Init(&featureconfig.Flags{})

	for i := uint64(0); i < 64; i++ {
		r, err := ssz.SigningRoot(&ethpb.BeaconBlock{Slot: i})
		if err != nil {
			t.Fatal(err)
		}
		store.initSyncState[r] = &pb.BeaconState{Slot: i}
	}

	// Set finalized root as slot 32
	finalizedRoot, err := ssz.SigningRoot(&ethpb.BeaconBlock{Slot: 32})
	if err != nil {
		t.Fatal(err)
	}

	if err := store.saveInitState(ctx, &pb.BeaconState{FinalizedCheckpoint: &ethpb.Checkpoint{
		Epoch: 1, Root: finalizedRoot[:]}}); err != nil {
		t.Fatal(err)
	}

	// Verify finalized state is saved in DB
	finalizedState, err := store.db.State(ctx, finalizedRoot)
	if err != nil {
		t.Fatal(err)
	}
	if finalizedState == nil {
		t.Error("finalized state can't be nil")
	}

	// Verify cached state is properly pruned
	if len(store.initSyncState) != int(params.BeaconConfig().SlotsPerEpoch) {
		// Arguments follow the "wanted, got" order of the format string.
		t.Errorf("wanted: %d, got: %d", params.BeaconConfig().SlotsPerEpoch, len(store.initSyncState))
	}
}

View File

@ -8,12 +8,15 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"go.opencensus.io/trace"
)
@ -44,6 +47,8 @@ type Store struct {
seenAttsLock sync.Mutex
latestVoteMap map[uint64]*pb.ValidatorLatestVote
voteLock sync.RWMutex
initSyncState map[[32]byte]*pb.BeaconState
initSyncStateLock sync.RWMutex
}
// NewForkChoiceService instantiates a new service instance that will
@ -57,6 +62,7 @@ func NewForkChoiceService(ctx context.Context, db db.Database) *Store {
checkpointState: cache.NewCheckpointStateCache(),
latestVoteMap: make(map[uint64]*pb.ValidatorLatestVote),
seenAtts: make(map[[32]byte]bool),
initSyncState: make(map[[32]byte]*pb.BeaconState),
}
}
@ -98,6 +104,34 @@ func (s *Store) GenesisStore(
return errors.Wrap(err, "could not save genesis state in check point cache")
}
if err := s.cacheGenesisState(ctx); err != nil {
return errors.Wrap(err, "could not cache initial sync state")
}
return nil
}
// cacheGenesisState seeds the initial sync state cache with the genesis state, keyed by the
// genesis block root. It is a no-op unless InitSyncCacheState is enabled.
//
// The key is derived rather than read from the DB: the genesis state is tree hashed, a genesis
// block is built from that state root, and the block's signing root becomes the cache key.
// NOTE(review): a nil genesis state from the DB is not checked here — presumably the caller has
// already saved it; confirm.
func (s *Store) cacheGenesisState(ctx context.Context) error {
	if !featureconfig.Get().InitSyncCacheState {
		return nil
	}

	genesisState, err := s.db.GenesisState(ctx)
	if err != nil {
		// Wrapped like the other failure paths below so the failing step is identifiable.
		return errors.Wrap(err, "could not get genesis state")
	}

	stateRoot, err := ssz.HashTreeRoot(genesisState)
	if err != nil {
		return errors.Wrap(err, "could not tree hash genesis state")
	}

	genesisBlk := blocks.NewGenesisBlock(stateRoot[:])
	genesisBlkRoot, err := ssz.SigningRoot(genesisBlk)
	if err != nil {
		return errors.Wrap(err, "could not get genesis block root")
	}

	s.initSyncState[genesisBlkRoot] = genesisState
	return nil
}

View File

@ -16,6 +16,7 @@ import (
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
)
func TestStore_GenesisStoreOk(t *testing.T) {
@ -329,3 +330,32 @@ func TestStore_GetHead(t *testing.T) {
t.Error("Incorrect head")
}
}
// TestCacheGenesisState_Correct saves a state as the genesis state in the DB and verifies that
// cacheGenesisState places exactly that state into the init sync cache.
func TestCacheGenesisState_Correct(t *testing.T) {
	ctx := context.Background()
	db := testDB.SetupDB(t)
	defer testDB.TeardownDB(t, db)

	store := NewForkChoiceService(ctx, db)

	featureconfig.Init(&featureconfig.Flags{
		InitSyncCacheState: true,
	})
	// The feature config is global; switch the flag back off so it does not leak into other tests.
	defer featureconfig.Init(&featureconfig.Flags{})

	b := &ethpb.BeaconBlock{Slot: 1}
	r, err := ssz.SigningRoot(b)
	if err != nil {
		t.Fatal(err)
	}
	s := &pb.BeaconState{GenesisTime: 99}
	if err := store.db.SaveState(ctx, s, r); err != nil {
		t.Fatal(err)
	}
	if err := store.db.SaveGenesisBlockRoot(ctx, r); err != nil {
		t.Fatal(err)
	}

	if err := store.cacheGenesisState(ctx); err != nil {
		t.Fatal(err)
	}

	// Without this check the loop below would pass vacuously on an empty cache.
	if len(store.initSyncState) != 1 {
		t.Fatalf("wanted 1 cached state, got %d", len(store.initSyncState))
	}
	for _, state := range store.initSyncState {
		if !reflect.DeepEqual(s, state) {
			t.Error("Did not get wanted state")
		}
	}
}

View File

@ -90,6 +90,23 @@ func (s *Service) Start() {
if err != nil {
log.Fatalf("Could not fetch beacon state: %v", err)
}
// When running initial sync with the state cache, in the event of a restart we use
// the last finalized checkpoint as the starting point to sync from instead of the head
// state. This is because we no longer save the state at every slot during sync.
if featureconfig.Get().InitSyncCacheState {
cp, err := s.beaconDB.FinalizedCheckpoint(ctx)
if err != nil {
log.Fatalf("Could not fetch finalized cp: %v", err)
}
if beaconState == nil {
beaconState, err = s.beaconDB.State(ctx, bytesutil.ToBytes32(cp.Root))
if err != nil {
log.Fatalf("Could not fetch beacon state: %v", err)
}
}
}
// If the chain has already been initialized, simply start the block processing routine.
if beaconState != nil {
log.Info("Blockchain data already exists in DB, initializing...")
@ -313,7 +330,7 @@ func (s *Service) initializeChainInfo(ctx context.Context) error {
return errors.Wrap(err, "could not get finalized block from db")
}
s.headSlot = s.headState.Slot
s.headSlot = s.headBlock.Slot
s.canonicalRoots[s.headSlot] = finalized.Root
return nil

View File

@ -34,6 +34,7 @@ type Flags struct {
PruneEpochBoundaryStates bool // PruneEpochBoundaryStates prunes the epoch boundary state before last finalized check point.
EnableSnappyDBCompression bool // EnableSnappyDBCompression in the database.
EnableCustomStateSSZ bool // EnableCustomStateSSZ in the the state transition function.
InitSyncCacheState bool // InitSyncCacheState caches state during initial sync.
// Cache toggles.
EnableAttestationCache bool // EnableAttestationCache; see https://github.com/prysmaticlabs/prysm/issues/3106.
@ -141,6 +142,10 @@ func ConfigureBeaconChain(ctx *cli.Context) {
log.Warn("Enabled pruning epoch boundary states before last finalized check point.")
cfg.PruneEpochBoundaryStates = true
}
if ctx.GlobalBool(initSyncCacheState.Name) {
log.Warn("Enabled initial sync cache state mode.")
cfg.InitSyncCacheState = true
}
Init(cfg)
}

View File

@ -87,6 +87,12 @@ var (
"and attestation's aggregated signatures. Without this flag, only the proposer " +
"signature is verified until the node reaches the end of the finalized chain.",
}
// initSyncCacheState is the CLI flag that switches initial sync to keeping states in a cache
// rather than saving each one to the DB; per its usage text, finalized states are still saved.
initSyncCacheState = cli.BoolFlag{
Name: "initial-sync-cache-state",
Usage: "Save state in cache during initial sync. We currently save state in the DB during " +
"initial sync and disk-IO is one of the biggest bottleneck. This still saves finalized state in DB " +
"and start syncing from there",
}
)
// Deprecated flags list.
@ -148,6 +154,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{
EnableEth1DataVoteCacheFlag,
EnableCustomStateSSZ,
initSyncVerifyEverythingFlag,
initSyncCacheState,
NewCacheFlag,
SkipBLSVerifyFlag,
enableBackupWebhookFlag,