From de8e50d8b6bcca923c38418e80291ca4c329848b Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Thu, 6 Oct 2022 03:11:03 +0800 Subject: [PATCH] Migrate Historical States In Separate Routine (#11501) * add changes * space * add test * space Co-authored-by: terencechain --- .../blockchain/process_block_helpers.go | 11 ++- beacon-chain/state/stategen/migrate.go | 6 ++ beacon-chain/state/stategen/migrate_test.go | 84 +++++++++++++++++++ beacon-chain/state/stategen/service.go | 4 +- 4 files changed, 101 insertions(+), 4 deletions(-) diff --git a/beacon-chain/blockchain/process_block_helpers.go b/beacon-chain/blockchain/process_block_helpers.go index cfb902895..74def66bd 100644 --- a/beacon-chain/blockchain/process_block_helpers.go +++ b/beacon-chain/blockchain/process_block_helpers.go @@ -171,9 +171,14 @@ func (s *Service) updateFinalized(ctx context.Context, cp *ethpb.Checkpoint) err return err } } - if err := s.cfg.StateGen.MigrateToCold(ctx, fRoot); err != nil { - return errors.Wrap(err, "could not migrate to cold") - } + go func() { + // We do not pass in the parent context from the method as this method call + // is meant to be asynchronous and run in the background rather than being + // tied to the execution of a block. + if err := s.cfg.StateGen.MigrateToCold(s.ctx, fRoot); err != nil { + log.WithError(err).Error("could not migrate to cold") + } + }() return nil } diff --git a/beacon-chain/state/stategen/migrate.go b/beacon-chain/state/stategen/migrate.go index e70cf688b..98c300568 100644 --- a/beacon-chain/state/stategen/migrate.go +++ b/beacon-chain/state/stategen/migrate.go @@ -18,6 +18,12 @@ func (s *State) MigrateToCold(ctx context.Context, fRoot [32]byte) error { ctx, span := trace.StartSpan(ctx, "stateGen.MigrateToCold") defer span.End() + // When migrating states we choose to acquire the migration lock before + // proceeding. This is to prevent multiple migration routines from overwriting each + // other. + s.migrationLock.Lock() + defer s.migrationLock.Unlock() + s.finalizedInfo.lock.RLock() oldFSlot := s.finalizedInfo.slot s.finalizedInfo.lock.RUnlock() diff --git a/beacon-chain/state/stategen/migrate_test.go b/beacon-chain/state/stategen/migrate_test.go index 1d2b7e7bf..25e4146c2 100644 --- a/beacon-chain/state/stategen/migrate_test.go +++ b/beacon-chain/state/stategen/migrate_test.go @@ -7,6 +7,7 @@ import ( "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/blocks" testDB "github.com/prysmaticlabs/prysm/v3/beacon-chain/db/testing" doublylinkedtree "github.com/prysmaticlabs/prysm/v3/beacon-chain/forkchoice/doubly-linked-tree" + consensusblocks "github.com/prysmaticlabs/prysm/v3/consensus-types/blocks" types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v3/testing/assert" @@ -135,3 +136,86 @@ func TestMigrateToCold_StateExistsInDB(t *testing.T) { assert.DeepEqual(t, [][32]byte{{1}, {2}, {3}, {4}}, service.saveHotStateDB.blockRootsOfSavedStates) assert.LogsDoNotContain(t, hook, "Saved state in DB") } + +func TestMigrateToCold_ParallelCalls(t *testing.T) { + hook := logTest.NewGlobal() + ctx := context.Background() + beaconDB := testDB.SetupDB(t) + + service := New(beaconDB, doublylinkedtree.New()) + service.slotsPerArchivedPoint = 1 + beaconState, pks := util.DeterministicGenesisState(t, 32) + genState := beaconState.Copy() + genesisStateRoot, err := beaconState.HashTreeRoot(ctx) + require.NoError(t, err) + genesis := blocks.NewGenesisBlock(genesisStateRoot[:]) + util.SaveBlock(t, ctx, beaconDB, genesis) + gRoot, err := genesis.Block.HashTreeRoot() + require.NoError(t, err) + assert.NoError(t, beaconDB.SaveState(ctx, beaconState, gRoot)) + assert.NoError(t, beaconDB.SaveGenesisBlockRoot(ctx, gRoot)) + + b1, err := util.GenerateFullBlock(beaconState, pks, util.DefaultBlockGenConfig(), 1) + require.NoError(t, err) + wB1, err := consensusblocks.NewSignedBeaconBlock(b1) + require.NoError(t, err) + beaconState, err = executeStateTransitionStateGen(ctx, beaconState, wB1) + assert.NoError(t, err) + r1, err := b1.Block.HashTreeRoot() + require.NoError(t, err) + util.SaveBlock(t, ctx, service.beaconDB, b1) + require.NoError(t, service.beaconDB.SaveStateSummary(ctx, ðpb.StateSummary{Slot: 1, Root: r1[:]})) + + b4, err := util.GenerateFullBlock(beaconState, pks, util.DefaultBlockGenConfig(), 4) + require.NoError(t, err) + wB4, err := consensusblocks.NewSignedBeaconBlock(b4) + require.NoError(t, err) + beaconState, err = executeStateTransitionStateGen(ctx, beaconState, wB4) + assert.NoError(t, err) + r4, err := b4.Block.HashTreeRoot() + require.NoError(t, err) + util.SaveBlock(t, ctx, service.beaconDB, b4) + require.NoError(t, service.beaconDB.SaveStateSummary(ctx, ðpb.StateSummary{Slot: 4, Root: r4[:]})) + + b7, err := util.GenerateFullBlock(beaconState, pks, util.DefaultBlockGenConfig(), 7) + require.NoError(t, err) + wB7, err := consensusblocks.NewSignedBeaconBlock(b7) + require.NoError(t, err) + beaconState, err = executeStateTransitionStateGen(ctx, beaconState, wB7) + assert.NoError(t, err) + r7, err := b7.Block.HashTreeRoot() + require.NoError(t, err) + util.SaveBlock(t, ctx, service.beaconDB, b7) + require.NoError(t, service.beaconDB.SaveStateSummary(ctx, ðpb.StateSummary{Slot: 7, Root: r7[:]})) + + service.finalizedInfo = &finalizedInfo{ + slot: 0, + root: genesisStateRoot, + state: genState, + } + service.saveHotStateDB.blockRootsOfSavedStates = [][32]byte{r1, r4, r7} + + // Run the migration routines concurrently for 2 different finalized roots. + go func() { + require.NoError(t, service.MigrateToCold(ctx, r4)) + }() + + require.NoError(t, service.MigrateToCold(ctx, r7)) + + s1, err := service.beaconDB.State(ctx, r1) + require.NoError(t, err) + assert.Equal(t, s1.Slot(), types.Slot(1), "Did not save state") + s4, err := service.beaconDB.State(ctx, r4) + require.NoError(t, err) + assert.Equal(t, s4.Slot(), types.Slot(4), "Did not save state") + + gotRoot := service.beaconDB.ArchivedPointRoot(ctx, 1/service.slotsPerArchivedPoint) + assert.Equal(t, r1, gotRoot, "Did not save archived root") + gotRoot = service.beaconDB.ArchivedPointRoot(ctx, 4) + assert.Equal(t, r4, gotRoot, "Did not save archived root") + lastIndex, err := service.beaconDB.LastArchivedSlot(ctx) + require.NoError(t, err) + assert.Equal(t, types.Slot(4), lastIndex, "Did not save last archived index") + assert.DeepEqual(t, [][32]byte{r7}, service.saveHotStateDB.blockRootsOfSavedStates, "Did not remove all saved hot state roots") + require.LogsContain(t, hook, "Saved state in DB") +} diff --git a/beacon-chain/state/stategen/service.go b/beacon-chain/state/stategen/service.go index c2e57bc85..beb26e32e 100644 --- a/beacon-chain/state/stategen/service.go +++ b/beacon-chain/state/stategen/service.go @@ -47,6 +47,7 @@ type State struct { epochBoundaryStateCache *epochBoundaryState saveHotStateDB *saveHotStateDbConfig backfillStatus *backfill.Status + migrationLock *sync.Mutex fc forkchoice.ForkChoicer } @@ -89,7 +90,8 @@ func New(beaconDB db.NoHeadAccessDatabase, fc forkchoice.ForkChoicer, opts ...St saveHotStateDB: &saveHotStateDbConfig{ duration: defaultHotStateDBInterval, }, - fc: fc, + migrationLock: new(sync.Mutex), + fc: fc, } for _, o := range opts { o(s)