db: Deduplicate saveCheckpoint functionality (#12304)

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Preston Van Loon 2023-04-20 07:13:21 -05:00 committed by GitHub
parent 7e7a2a2959
commit 9f886da1de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 35 deletions

View File

@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
@ -55,22 +56,7 @@ func (s *Store) SaveJustifiedCheckpoint(ctx context.Context, checkpoint *ethpb.C
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveJustifiedCheckpoint")
defer span.End()
enc, err := encode(ctx, checkpoint)
if err != nil {
return err
}
hasStateSummary := s.HasStateSummary(ctx, bytesutil.ToBytes32(checkpoint.Root))
return s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(checkpointBucket)
hasStateInDB := tx.Bucket(stateBucket).Get(checkpoint.Root) != nil
if !(hasStateInDB || hasStateSummary) {
log.Warnf("Recovering state summary for justified root: %#x", bytesutil.Trunc(checkpoint.Root))
if err := recoverStateSummary(ctx, tx, checkpoint.Root); err != nil {
return errors.Wrapf(errMissingStateForCheckpoint, "could not save justified checkpoint, finalized root: %#x", bytesutil.Trunc(checkpoint.Root))
}
}
return bucket.Put(justifiedCheckpointKey, enc)
})
return s.saveCheckpoint(ctx, justifiedCheckpointKey, checkpoint)
}
// SaveFinalizedCheckpoint saves finalized checkpoint in beacon chain.
@ -80,10 +66,11 @@ func (s *Store) SaveFinalizedCheckpoint(ctx context.Context, checkpoint *ethpb.C
enc, err := encode(ctx, checkpoint)
if err != nil {
tracing.AnnotateError(span, err)
return err
}
hasStateSummary := s.HasStateSummary(ctx, bytesutil.ToBytes32(checkpoint.Root))
return s.db.Update(func(tx *bolt.Tx) error {
err = s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(checkpointBucket)
hasStateInDB := tx.Bucket(stateBucket).Get(checkpoint.Root) != nil
if !(hasStateInDB || hasStateSummary) {
@ -98,6 +85,33 @@ func (s *Store) SaveFinalizedCheckpoint(ctx context.Context, checkpoint *ethpb.C
return s.updateFinalizedBlockRoots(ctx, tx, checkpoint)
})
tracing.AnnotateError(span, err)
return err
}
func (s *Store) saveCheckpoint(ctx context.Context, key []byte, checkpoint *ethpb.Checkpoint) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.saveCheckpoint")
defer span.End()
enc, err := encode(ctx, checkpoint)
if err != nil {
tracing.AnnotateError(span, err)
return err
}
hasStateSummary := s.HasStateSummary(ctx, bytesutil.ToBytes32(checkpoint.Root))
err = s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(checkpointBucket)
hasStateInDB := tx.Bucket(stateBucket).Get(checkpoint.Root) != nil
if !(hasStateInDB || hasStateSummary) {
log.WithField("root", fmt.Sprintf("%#x", bytesutil.Trunc(checkpoint.Root))).Warn("Recovering state summary")
if err := recoverStateSummary(ctx, tx, checkpoint.Root); err != nil {
return errMissingStateForCheckpoint
}
}
return bucket.Put(key, enc)
})
tracing.AnnotateError(span, err)
return err
}
// Recovers and saves state summary for a given root if the root has a block in the DB.

View File

@ -3,8 +3,6 @@ package kv
import (
"context"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
@ -34,20 +32,5 @@ func (s *Store) SaveLastValidatedCheckpoint(ctx context.Context, checkpoint *eth
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveLastValidatedCheckpoint")
defer span.End()
enc, err := encode(ctx, checkpoint)
if err != nil {
return err
}
hasStateSummary := s.HasStateSummary(ctx, bytesutil.ToBytes32(checkpoint.Root))
return s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(checkpointBucket)
hasStateInDB := tx.Bucket(stateBucket).Get(checkpoint.Root) != nil
if !(hasStateInDB || hasStateSummary) {
log.Warnf("Recovering state summary for last validated root: %#x", bytesutil.Trunc(checkpoint.Root))
if err := recoverStateSummary(ctx, tx, checkpoint.Root); err != nil {
return errors.Wrapf(errMissingStateForCheckpoint, "could not save finalized checkpoint, last validated root: %#x", bytesutil.Trunc(checkpoint.Root))
}
}
return bucket.Put(lastValidatedCheckpointKey, enc)
})
return s.saveCheckpoint(ctx, lastValidatedCheckpointKey, checkpoint)
}