State service clean up - better migration (#5391)

terence tsao 2020-04-11 13:54:19 -07:00 committed by GitHub
parent 40f7b258eb
commit e59721f264
6 changed files with 34 additions and 19 deletions

View File

@@ -56,6 +56,7 @@ type ReadOnlyDatabase interface {
	ArchivedPointRoot(ctx context.Context, index uint64) [32]byte
	HasArchivedPoint(ctx context.Context, index uint64) bool
	LastArchivedIndexRoot(ctx context.Context) [32]byte
	LastArchivedIndex(ctx context.Context) (uint64, error)
	// Deposit contract related handlers.
	DepositContractAddress(ctx context.Context) ([]byte, error)
	// Powchain operations.

View File

@@ -352,3 +352,8 @@ func (e Exporter) HighestSlotStatesBelow(ctx context.Context, slot uint64) ([]*s
func (e Exporter) SaveLastArchivedIndex(ctx context.Context, index uint64) error {
	return e.db.SaveLastArchivedIndex(ctx, index)
}

// LastArchivedIndex -- passthrough
func (e Exporter) LastArchivedIndex(ctx context.Context) (uint64, error) {
	return e.db.LastArchivedIndex(ctx)
}

View File

@@ -38,6 +38,9 @@ func (k *Store) LastArchivedIndex(ctx context.Context) (uint64, error) {
	err := k.db.Update(func(tx *bolt.Tx) error {
		bucket := tx.Bucket(archivedIndexRootBucket)
		b := bucket.Get(lastArchivedIndexKey)
		if b == nil {
			return nil
		}
		index = binary.LittleEndian.Uint64(b)
		return nil
	})
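Note (not part of the diff): the nil guard added above matters because binary.LittleEndian.Uint64 panics on a slice shorter than 8 bytes, and bolt's Get returns nil when the key has never been written; with the guard, a fresh database simply reports index 0. A minimal standalone sketch of that read path, assuming a bbolt database and using illustrative bucket and key names (not necessarily the ones the store defines):

package main

import (
	"encoding/binary"
	"fmt"
	"log"

	bolt "go.etcd.io/bbolt"
)

func main() {
	db, err := bolt.Open("example.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var index uint64
	err = db.View(func(tx *bolt.Tx) error {
		bucket := tx.Bucket([]byte("archived")) // illustrative bucket name
		if bucket == nil {
			return nil // bucket never created: report 0
		}
		b := bucket.Get([]byte("lastArchivedIndex")) // illustrative key name
		if b == nil {
			return nil // key never written: report 0 instead of panicking in Uint64
		}
		index = binary.LittleEndian.Uint64(b) // 8-byte little-endian value
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("last archived index:", index)
}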

View File

@@ -30,6 +30,14 @@ func TestLastArchivedPoint_CanRetrieve(t *testing.T) {
	db := setupDB(t)
	defer teardownDB(t, db)
	ctx := context.Background()

	i, err := db.LastArchivedIndex(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if i != 0 {
		t.Error("Did not get correct index")
	}
	if err := db.SaveArchivedPointRoot(ctx, [32]byte{'A'}, 1); err != nil {
		t.Fatal(err)
	}
@@ -51,4 +59,12 @@ func TestLastArchivedPoint_CanRetrieve(t *testing.T) {
	if db.LastArchivedIndexRoot(ctx) != [32]byte{'B'} {
		t.Error("Did not get wanted root")
	}

	i, err = db.LastArchivedIndex(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if i != 3 {
		t.Error("Did not get correct index")
	}
}

View File

@@ -4,7 +4,6 @@ import (
	"context"
	"encoding/hex"

	"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
	"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
	"github.com/prysmaticlabs/prysm/shared/bytesutil"
	"github.com/sirupsen/logrus"
@@ -24,9 +23,6 @@ func (s *State) MigrateToCold(ctx context.Context, finalizedSlot uint64, finaliz
	if currentSplitSlot > finalizedSlot {
		return nil
	}
	if !helpers.IsEpochStart(finalizedSlot) {
		return nil
	}

	// Migrate all state summary objects from cache to DB.
	if err := s.beaconDB.SaveStateSummaries(ctx, s.stateSummaryCache.GetAll()); err != nil {
@@ -34,6 +30,11 @@ func (s *State) MigrateToCold(ctx context.Context, finalizedSlot uint64, finaliz
	}
	s.stateSummaryCache.Clear()

	lastArchivedIndex, err := s.beaconDB.LastArchivedIndex(ctx)
	if err != nil {
		return err
	}

	// Move the states between split slot to finalized slot from hot section to the cold section.
	filter := filters.NewFilter().SetStartSlot(currentSplitSlot).SetEndSlot(finalizedSlot - 1)
	blockRoots, err := s.beaconDB.BlockRoots(ctx, filter)
@@ -51,7 +52,9 @@ func (s *State) MigrateToCold(ctx context.Context, finalizedSlot uint64, finaliz
		}

		archivedPointIndex := stateSummary.Slot / s.slotsPerArchivedPoint
		if stateSummary.Slot%s.slotsPerArchivedPoint == 0 {
		nextArchivedPointSlot := (lastArchivedIndex + 1) * s.slotsPerArchivedPoint
		// Only migrate if current slot is equal to or greater than next archived point slot.
		if stateSummary.Slot >= nextArchivedPointSlot {
			if !s.beaconDB.HasState(ctx, r) {
				recoveredArchivedState, err := s.ComputeStateUpToSlot(ctx, stateSummary.Slot)
				if err != nil {
@@ -67,6 +70,7 @@ func (s *State) MigrateToCold(ctx context.Context, finalizedSlot uint64, finaliz
			if err := s.beaconDB.SaveLastArchivedIndex(ctx, archivedPointIndex); err != nil {
				return err
			}
			lastArchivedIndex++
			log.WithFields(logrus.Fields{
				"slot": stateSummary.Slot,
				"archiveIndex": archivedPointIndex,

View File

@@ -44,20 +44,6 @@ func TestMigrateToCold_HigherSplitSlot(t *testing.T) {
	testutil.AssertLogsDoNotContain(t, hook, "Set hot and cold state split point")
}

func TestMigrateToCold_NotEpochStart(t *testing.T) {
	hook := logTest.NewGlobal()
	ctx := context.Background()
	db := testDB.SetupDB(t)
	defer testDB.TeardownDB(t, db)

	service := New(db, cache.NewStateSummaryCache())
	if err := service.MigrateToCold(ctx, params.BeaconConfig().SlotsPerEpoch+1, [32]byte{}); err != nil {
		t.Fatal(err)
	}

	testutil.AssertLogsDoNotContain(t, hook, "Set hot and cold state split point")
}

func TestMigrateToCold_MigrationCompletes(t *testing.T) {
	hook := logTest.NewGlobal()
	ctx := context.Background()