mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-25 12:57:18 +00:00
120 lines
3.4 KiB
Go
120 lines
3.4 KiB
Go
// Package dbcleanup defines the life cycle and logic of beacon DB cleanup routine.
|
|
package dbcleanup
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/types"
|
|
"github.com/prysmaticlabs/prysm/shared/event"
|
|
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/db"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
var log = logrus.WithField("prefix", "dbcleaner")
|
|
|
|
type chainService interface {
|
|
CanonicalStateFeed() *event.Feed
|
|
}
|
|
|
|
// CleanupService represents a service that handles routine task for cleaning up
|
|
// beacon DB so our DB won't grow infinitely.
|
|
// Currently it only cleans up block vote cache. In future, it could add more tasks
|
|
// such as cleaning up historical beacon states.
|
|
type CleanupService struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
beaconDB *db.BeaconDB
|
|
chainService chainService
|
|
canonicalStateChan chan *types.BeaconState
|
|
}
|
|
|
|
// Config defines the needed fields for creating a new cleanup service.
|
|
type Config struct {
|
|
SubscriptionBuf int
|
|
BeaconDB *db.BeaconDB
|
|
ChainService chainService
|
|
}
|
|
|
|
// NewCleanupService creates a new cleanup service instance.
|
|
func NewCleanupService(ctx context.Context, cfg *Config) *CleanupService {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
return &CleanupService{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
beaconDB: cfg.BeaconDB,
|
|
chainService: cfg.ChainService,
|
|
canonicalStateChan: make(chan *types.BeaconState, cfg.SubscriptionBuf),
|
|
}
|
|
}
|
|
|
|
// Start a cleanup service.
|
|
func (d *CleanupService) Start() {
|
|
log.Info("Starting service")
|
|
go d.cleanDB()
|
|
}
|
|
|
|
// Stop a cleanup service.
|
|
func (d *CleanupService) Stop() error {
|
|
defer d.cancel()
|
|
|
|
log.Info("Stopping service")
|
|
return nil
|
|
}
|
|
|
|
func (d *CleanupService) cleanDB() {
|
|
cStateSub := d.chainService.CanonicalStateFeed().Subscribe(d.canonicalStateChan)
|
|
defer cStateSub.Unsubscribe()
|
|
|
|
for {
|
|
select {
|
|
case <-d.ctx.Done():
|
|
log.Debug("Cleanup service context closed, exiting goroutine")
|
|
return
|
|
case cState := <-d.canonicalStateChan:
|
|
if err := d.cleanBlockVoteCache(cState.LastFinalizedSlot()); err != nil {
|
|
log.Errorf("Failed to clean block vote cache: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *CleanupService) cleanBlockVoteCache(latestFinalizedSlot uint64) error {
|
|
var lastCleanedFinalizedSlot uint64
|
|
var err error
|
|
|
|
lastCleanedFinalizedSlot, err = d.beaconDB.GetCleanedFinalizedSlot()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read cleaned finalized slot from DB: %v", err)
|
|
}
|
|
|
|
log.Infof("Finalized slot: latest: %d, last cleaned: %d, %d blocks' vote cache will be cleaned",
|
|
latestFinalizedSlot, lastCleanedFinalizedSlot, latestFinalizedSlot-lastCleanedFinalizedSlot)
|
|
|
|
var blockHashes [][32]byte
|
|
for slot := lastCleanedFinalizedSlot + 1; slot <= latestFinalizedSlot; slot++ {
|
|
var block *types.Block
|
|
block, err = d.beaconDB.GetBlockBySlot(slot)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read block at slot %d: %v", slot, err)
|
|
}
|
|
if block != nil {
|
|
var blockHash [32]byte
|
|
blockHash, err = block.Hash()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get hash of block: %v", err)
|
|
}
|
|
blockHashes = append(blockHashes, blockHash)
|
|
}
|
|
}
|
|
if err = d.beaconDB.DeleteBlockVoteCache(blockHashes); err != nil {
|
|
return fmt.Errorf("failed to delete block vote cache: %v", err)
|
|
}
|
|
|
|
if err = d.beaconDB.SaveCleanedFinalizedSlot(latestFinalizedSlot); err != nil {
|
|
return fmt.Errorf("failed to update cleaned finalized slot: %v", err)
|
|
}
|
|
return nil
|
|
}
|