From 9d4c7cb4f7232f4ea83d0b533516d1dccb9f8095 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Thu, 5 Dec 2019 16:49:19 -0800 Subject: [PATCH] Use cache state during init sync (#4199) * Initial sync state cache * Gaz * Gaz * Don't save head root * Fix config validator * Uncomment save head * Merge branch 'master' into initial-sync-no-verify * Minor refactor * Merge branch 'initial-sync-no-verify' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify * Merge branch 'master' into initial-sync-no-verify * Tests * Merge branch 'initial-sync-no-verify' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify * Merge branch 'master' into initial-sync-no-verify * Add lock * Merge branch 'initial-sync-no-verify' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify * Tests * Removed save head * One more test * Merge branch 'master' into initial-sync-no-verify * Raul's feedback * Merge branch 'initial-sync-no-verify' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify * Comment * Gazelle * Merge branch 'master' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify * revert * Update beacon-chain/blockchain/service.go Co-Authored-By: Ivan Martinez * Merge branch 'master' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify * Merge branch 'initial-sync-no-verify' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify * Fixed test * Fixed feature flag * Merge branch 'master' into initial-sync-no-verify * Fixed cache gensis state test * Merge branch 'initial-sync-no-verify' of https://github.com/prysmaticlabs/prysm into initial-sync-no-verify --- .../blockchain/forkchoice/process_block.go | 68 ++++++++++- .../forkchoice/process_block_test.go | 110 ++++++++++++++++++ beacon-chain/blockchain/forkchoice/service.go | 34 ++++++ .../blockchain/forkchoice/service_test.go | 30 +++++ beacon-chain/blockchain/service.go | 19 ++- shared/featureconfig/config.go | 5 + 
shared/featureconfig/flags.go | 7 ++ 7 files changed, 269 insertions(+), 4 deletions(-) diff --git a/beacon-chain/blockchain/forkchoice/process_block.go b/beacon-chain/blockchain/forkchoice/process_block.go index 99d125f25..6f6e925ca 100644 --- a/beacon-chain/blockchain/forkchoice/process_block.go +++ b/beacon-chain/blockchain/forkchoice/process_block.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "fmt" + "github.com/gogo/protobuf/proto" "github.com/pkg/errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" @@ -18,6 +19,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/hashutil" + "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/traceutil" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -154,8 +156,11 @@ func (s *Store) OnBlockInitialSyncStateTransition(ctx context.Context, b *ethpb. ctx, span := trace.StartSpan(ctx, "forkchoice.onBlock") defer span.End() + s.initSyncStateLock.Lock() + defer s.initSyncStateLock.Unlock() + // Retrieve incoming block's pre state. - preState, err := s.getBlockPreState(ctx, b) + preState, err := s.cachedPreState(ctx, b) if err != nil { return err } @@ -175,8 +180,13 @@ func (s *Store) OnBlockInitialSyncStateTransition(ctx context.Context, b *ethpb. if err != nil { return errors.Wrapf(err, "could not get signing root of block %d", b.Slot) } - if err := s.db.SaveState(ctx, postState, root); err != nil { - return errors.Wrap(err, "could not save state") + + if featureconfig.Get().InitSyncCacheState { + s.initSyncState[root] = postState + } else { + if err := s.db.SaveState(ctx, postState, root); err != nil { + return errors.Wrap(err, "could not save state") + } } // Update justified check point. @@ -205,6 +215,10 @@ func (s *Store) OnBlockInitialSyncStateTransition(ctx context.Context, b *ethpb. 
} } + if err := s.saveInitState(ctx, postState); err != nil { + return errors.Wrap(err, "could not save init sync finalized state") + } + if err := s.db.SaveFinalizedCheckpoint(ctx, postState.FinalizedCheckpoint); err != nil { return errors.Wrap(err, "could not save finalized checkpoint") } @@ -465,3 +479,51 @@ func (s *Store) rmStatesOlderThanLastFinalized(ctx context.Context, startSlot ui return nil } + +// This receives cached state in memory for initial sync only during initial sync. +func (s *Store) cachedPreState(ctx context.Context, b *ethpb.BeaconBlock) (*pb.BeaconState, error) { + if featureconfig.Get().InitSyncCacheState { + preState := s.initSyncState[bytesutil.ToBytes32(b.ParentRoot)] + var err error + if preState == nil { + preState, err = s.db.State(ctx, bytesutil.ToBytes32(b.ParentRoot)) + if err != nil { + return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot) + } + if preState == nil { + return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot) + } + } + return proto.Clone(preState).(*pb.BeaconState), nil + } + + preState, err := s.db.State(ctx, bytesutil.ToBytes32(b.ParentRoot)) + if err != nil { + return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot) + } + if preState == nil { + return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot) + } + + return preState, nil +} + +// This saves every finalized state in DB during initial sync, needed as part of optimization to +// use cache state during initial sync in case of restart. 
+func (s *Store) saveInitState(ctx context.Context, state *pb.BeaconState) error { + if !featureconfig.Get().InitSyncCacheState { + return nil + } + finalizedRoot := bytesutil.ToBytes32(state.FinalizedCheckpoint.Root) + fs := s.initSyncState[finalizedRoot] + + if err := s.db.SaveState(ctx, fs, finalizedRoot); err != nil { + return errors.Wrap(err, "could not save state") + } + for r, oldState := range s.initSyncState { + if oldState.Slot < state.FinalizedCheckpoint.Epoch*params.BeaconConfig().SlotsPerEpoch { + delete(s.initSyncState, r) + } + } + return nil +} diff --git a/beacon-chain/blockchain/forkchoice/process_block_test.go b/beacon-chain/blockchain/forkchoice/process_block_test.go index d231f6c2c..2a577fafd 100644 --- a/beacon-chain/blockchain/forkchoice/process_block_test.go +++ b/beacon-chain/blockchain/forkchoice/process_block_test.go @@ -369,3 +369,113 @@ func TestRemoveStateSinceLastFinalized_EmptyStartSlot(t *testing.T) { t.Error("Did not delete state for start slot") } } + +func TestCachedPreState_CanGetFromCache(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + store := NewForkChoiceService(ctx, db) + s := &pb.BeaconState{Slot: 1} + r := [32]byte{'A'} + b := &ethpb.BeaconBlock{Slot: 1, ParentRoot: r[:]} + store.initSyncState[r] = s + + wanted := "pre state of slot 1 does not exist" + if _, err := store.cachedPreState(ctx, b); !strings.Contains(err.Error(), wanted) { + t.Fatal("Not expected error") + } +} + +func TestCachedPreState_CanGetFromCacheWithFeature(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + config := &featureconfig.Flags{ + InitSyncCacheState: true, + } + featureconfig.Init(config) + + store := NewForkChoiceService(ctx, db) + s := &pb.BeaconState{Slot: 1} + r := [32]byte{'A'} + b := &ethpb.BeaconBlock{Slot: 1, ParentRoot: r[:]} + store.initSyncState[r] = s + + received, err := store.cachedPreState(ctx, b) + if err != nil { + 
t.Fatal(err) + } + if !reflect.DeepEqual(s, received) { + t.Error("cached state not the same") + } +} + +func TestCachedPreState_CanGetFromDB(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + store := NewForkChoiceService(ctx, db) + r := [32]byte{'A'} + b := &ethpb.BeaconBlock{Slot: 1, ParentRoot: r[:]} + + _, err := store.cachedPreState(ctx, b) + wanted := "pre state of slot 1 does not exist" + if err.Error() != wanted { + t.Error("Did not get wanted error") + } + + s := &pb.BeaconState{Slot: 1} + store.db.SaveState(ctx, s, r) + + received, err := store.cachedPreState(ctx, b) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(s, received) { + t.Error("cached state not the same") + } +} + +func TestSaveInitState_CanSaveDelete(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + store := NewForkChoiceService(ctx, db) + + config := &featureconfig.Flags{ + InitSyncCacheState: true, + } + featureconfig.Init(config) + + for i := uint64(0); i < 64; i++ { + b := &ethpb.BeaconBlock{Slot: i} + s := &pb.BeaconState{Slot: i} + r, _ := ssz.SigningRoot(b) + store.initSyncState[r] = s + } + + // Set finalized root as slot 32 + finalizedRoot, _ := ssz.SigningRoot(&ethpb.BeaconBlock{Slot: 32}) + + if err := store.saveInitState(ctx, &pb.BeaconState{FinalizedCheckpoint: &ethpb.Checkpoint{ + Epoch: 1, Root: finalizedRoot[:]}}); err != nil { + t.Fatal(err) + } + + // Verify finalized state is saved in DB + finalizedState, err := store.db.State(ctx, finalizedRoot) + if err != nil { + t.Fatal(err) + } + if finalizedState == nil { + t.Error("finalized state can't be nil") + } + + // Verify cached state is properly pruned + if len(store.initSyncState) != int(params.BeaconConfig().SlotsPerEpoch) { + t.Errorf("wanted: %d, got: %d", len(store.initSyncState), params.BeaconConfig().SlotsPerEpoch) + } +} diff --git a/beacon-chain/blockchain/forkchoice/service.go 
b/beacon-chain/blockchain/forkchoice/service.go index 8bc6d8355..99b266522 100644 --- a/beacon-chain/blockchain/forkchoice/service.go +++ b/beacon-chain/blockchain/forkchoice/service.go @@ -8,12 +8,15 @@ import ( "github.com/gogo/protobuf/proto" "github.com/pkg/errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/prysm/beacon-chain/cache" + "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/params" "go.opencensus.io/trace" ) @@ -44,6 +47,8 @@ type Store struct { seenAttsLock sync.Mutex latestVoteMap map[uint64]*pb.ValidatorLatestVote voteLock sync.RWMutex + initSyncState map[[32]byte]*pb.BeaconState + initSyncStateLock sync.RWMutex } // NewForkChoiceService instantiates a new service instance that will @@ -57,6 +62,7 @@ func NewForkChoiceService(ctx context.Context, db db.Database) *Store { checkpointState: cache.NewCheckpointStateCache(), latestVoteMap: make(map[uint64]*pb.ValidatorLatestVote), seenAtts: make(map[[32]byte]bool), + initSyncState: make(map[[32]byte]*pb.BeaconState), } } @@ -98,6 +104,34 @@ func (s *Store) GenesisStore( return errors.Wrap(err, "could not save genesis state in check point cache") } + if err := s.cacheGenesisState(ctx); err != nil { + return errors.Wrap(err, "could not cache initial sync state") + } + + return nil +} + +// This sets up gensis for initial sync state cache. 
+func (s *Store) cacheGenesisState(ctx context.Context) error { + if !featureconfig.Get().InitSyncCacheState { + return nil + } + + genesisState, err := s.db.GenesisState(ctx) + if err != nil { + return err + } + stateRoot, err := ssz.HashTreeRoot(genesisState) + if err != nil { + return errors.Wrap(err, "could not tree hash genesis state") + } + genesisBlk := blocks.NewGenesisBlock(stateRoot[:]) + genesisBlkRoot, err := ssz.SigningRoot(genesisBlk) + if err != nil { + return errors.Wrap(err, "could not get genesis block root") + } + s.initSyncState[genesisBlkRoot] = genesisState + return nil } diff --git a/beacon-chain/blockchain/forkchoice/service_test.go b/beacon-chain/blockchain/forkchoice/service_test.go index da95d4e61..46bd0aec7 100644 --- a/beacon-chain/blockchain/forkchoice/service_test.go +++ b/beacon-chain/blockchain/forkchoice/service_test.go @@ -16,6 +16,7 @@ import ( testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/featureconfig" ) func TestStore_GenesisStoreOk(t *testing.T) { @@ -329,3 +330,32 @@ func TestStore_GetHead(t *testing.T) { t.Error("Incorrect head") } } + +func TestCacheGenesisState_Correct(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + store := NewForkChoiceService(ctx, db) + config := &featureconfig.Flags{ + InitSyncCacheState: true, + } + featureconfig.Init(config) + + b := &ethpb.BeaconBlock{Slot: 1} + r, _ := ssz.SigningRoot(b) + s := &pb.BeaconState{GenesisTime: 99} + + store.db.SaveState(ctx, s, r) + store.db.SaveGenesisBlockRoot(ctx, r) + + if err := store.cacheGenesisState(ctx); err != nil { + t.Fatal(err) + } + + for _, state := range store.initSyncState { + if !reflect.DeepEqual(s, state) { + t.Error("Did not get wanted state") + } + } +} diff --git a/beacon-chain/blockchain/service.go 
b/beacon-chain/blockchain/service.go index fabde2a12..ddcfbd4f1 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -90,6 +90,23 @@ func (s *Service) Start() { if err != nil { log.Fatalf("Could not fetch beacon state: %v", err) } + + // For running initial sync with state cache, in an event of restart, we use + // last finalized check point as start point to sync instead of head + // state. This is because we no longer save state every slot during sync. + if featureconfig.Get().InitSyncCacheState { + cp, err := s.beaconDB.FinalizedCheckpoint(ctx) + if err != nil { + log.Fatalf("Could not fetch finalized cp: %v", err) + } + if beaconState == nil { + beaconState, err = s.beaconDB.State(ctx, bytesutil.ToBytes32(cp.Root)) + if err != nil { + log.Fatalf("Could not fetch beacon state: %v", err) + } + } + } + // If the chain has already been initialized, simply start the block processing routine. if beaconState != nil { log.Info("Blockchain data already exists in DB, initializing...") @@ -313,7 +330,7 @@ func (s *Service) initializeChainInfo(ctx context.Context) error { return errors.Wrap(err, "could not get finalized block from db") } - s.headSlot = s.headState.Slot + s.headSlot = s.headBlock.Slot s.canonicalRoots[s.headSlot] = finalized.Root return nil diff --git a/shared/featureconfig/config.go b/shared/featureconfig/config.go index c1cf005a8..32ba2f80e 100644 --- a/shared/featureconfig/config.go +++ b/shared/featureconfig/config.go @@ -34,6 +34,7 @@ type Flags struct { PruneEpochBoundaryStates bool // PruneEpochBoundaryStates prunes the epoch boundary state before last finalized check point. EnableSnappyDBCompression bool // EnableSnappyDBCompression in the database. EnableCustomStateSSZ bool // EnableCustomStateSSZ in the the state transition function. + InitSyncCacheState bool // InitSyncCacheState caches state during initial sync. // Cache toggles. 
EnableAttestationCache bool // EnableAttestationCache; see https://github.com/prysmaticlabs/prysm/issues/3106. @@ -141,6 +142,10 @@ func ConfigureBeaconChain(ctx *cli.Context) { log.Warn("Enabled pruning epoch boundary states before last finalized check point.") cfg.PruneEpochBoundaryStates = true } + if ctx.GlobalBool(initSyncCacheState.Name) { + log.Warn("Enabled initial sync cache state mode.") + cfg.InitSyncCacheState = true + } Init(cfg) } diff --git a/shared/featureconfig/flags.go b/shared/featureconfig/flags.go index 3c67f98b1..30f4517d5 100644 --- a/shared/featureconfig/flags.go +++ b/shared/featureconfig/flags.go @@ -87,6 +87,12 @@ var ( "and attestation's aggregated signatures. Without this flag, only the proposer " + "signature is verified until the node reaches the end of the finalized chain.", } + initSyncCacheState = cli.BoolFlag{ + Name: "initial-sync-cache-state", + Usage: "Save state in cache during initial sync. We currently save state in the DB during " + + "initial sync and disk-IO is one of the biggest bottleneck. This still saves finalized state in DB " + + "and start syncing from there", + } ) // Deprecated flags list. @@ -148,6 +154,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{ EnableEth1DataVoteCacheFlag, EnableCustomStateSSZ, initSyncVerifyEverythingFlag, + initSyncCacheState, NewCacheFlag, SkipBLSVerifyFlag, enableBackupWebhookFlag,