prysm-pulse/beacon-chain/db/kv/kv.go
2024-02-21 19:08:13 +00:00

302 lines
9.7 KiB
Go

// Package kv defines a bolt-db, key-value store implementation
// of the Database interface defined by a Prysm beacon node.
package kv
import (
"context"
"fmt"
"os"
"path"
"time"
"github.com/dgraph-io/ristretto"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
prombolt "github.com/prysmaticlabs/prombbolt"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/iface"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/io/file"
bolt "go.etcd.io/bbolt"
)
var _ iface.Database = (*Store)(nil)
const (
// NumOfValidatorEntries is the size of the validator cache entries.
// we expect to hold a max of 200K validators, so setting it to 2 million (10x the capacity).
NumOfValidatorEntries = 1 << 21
// ValidatorEntryMaxCost is set to ~64Mb to allow 200K validators entries to be cached.
ValidatorEntryMaxCost = 1 << 26
// BeaconNodeDbDirName is the name of the directory containing the beacon node database.
BeaconNodeDbDirName = "beaconchaindata"
// DatabaseFileName is the name of the beacon node database.
DatabaseFileName = "beaconchain.db"
boltAllocSize = 8 * 1024 * 1024
// The size of hash length in bytes
hashLength = 32
// Specifies the initial mmap size of bolt.
mmapSize = 536870912
)
var (
// Metrics for the validator cache.
validatorEntryCacheHit = promauto.NewCounter(prometheus.CounterOpts{
Name: "validator_entry_cache_hit_total",
Help: "The total number of cache hits on the validator entry cache.",
})
validatorEntryCacheMiss = promauto.NewCounter(prometheus.CounterOpts{
Name: "validator_entry_cache_miss_total",
Help: "The total number of cache misses on the validator entry cache.",
})
validatorEntryCacheDelete = promauto.NewCounter(prometheus.CounterOpts{
Name: "validator_entry_cache_delete_total",
Help: "The total number of cache deletes on the validator entry cache.",
})
stateReadingTime = promauto.NewSummary(prometheus.SummaryOpts{
Name: "db_beacon_state_reading_milliseconds",
Help: "Milliseconds it takes to read a beacon state from the DB",
})
stateSavingTime = promauto.NewSummary(prometheus.SummaryOpts{
Name: "db_beacon_state_saving_milliseconds",
Help: "Milliseconds it takes to save a beacon state to the DB",
})
)
// BlockCacheSize specifies 1000 slots worth of blocks cached, which
// would be approximately 2MB
var BlockCacheSize = int64(1 << 21)
// blockedBuckets represents the buckets that we want to restrict
// from our metrics fetching for performance reasons. For a detailed
// summary, it can be read in https://github.com/prysmaticlabs/prysm/issues/8274.
var blockedBuckets = [][]byte{
blocksBucket,
stateSummaryBucket,
blockParentRootIndicesBucket,
blockSlotIndicesBucket,
finalizedBlockRootsIndexBucket,
}
// Store defines an implementation of the Prysm Database interface
// using BoltDB as the underlying persistent kv-store for Ethereum Beacon Nodes.
type Store struct {
db *bolt.DB
databasePath string
blockCache *ristretto.Cache
validatorEntryCache *ristretto.Cache
stateSummaryCache *stateSummaryCache
ctx context.Context
}
// StoreDatafilePath is the canonical construction of a full
// database file path from the directory path, so that code outside
// this package can find the full path in a consistent way.
func StoreDatafilePath(dirPath string) string {
return path.Join(dirPath, DatabaseFileName)
}
var Buckets = [][]byte{
blocksBucket,
stateBucket,
chainMetadataBucket,
checkpointBucket,
powchainBucket,
stateSummaryBucket,
stateValidatorsBucket,
// Indices buckets.
blockSlotIndicesBucket,
stateSlotIndicesBucket,
blockParentRootIndicesBucket,
finalizedBlockRootsIndexBucket,
blockRootValidatorHashesBucket,
// Migrations
migrationsBucket,
feeRecipientBucket,
registrationBucket,
}
// KVStoreOption is a functional option that modifies a kv.Store.
type KVStoreOption func(*Store)
// NewKVStore initializes a new boltDB key-value store at the directory
// path specified, creates the kv-buckets based on the schema, and stores
// an open connection db object as a property of the Store struct.
func NewKVStore(ctx context.Context, dirPath string, opts ...KVStoreOption) (*Store, error) {
hasDir, err := file.HasDir(dirPath)
if err != nil {
return nil, err
}
if !hasDir {
if err := file.MkdirAll(dirPath); err != nil {
return nil, err
}
}
datafile := StoreDatafilePath(dirPath)
log.Infof("Opening Bolt DB at %s", datafile)
boltDB, err := bolt.Open(
datafile,
params.BeaconIoConfig().ReadWritePermissions,
&bolt.Options{
Timeout: 1 * time.Second,
InitialMmapSize: mmapSize,
},
)
if err != nil {
if errors.Is(err, bolt.ErrTimeout) {
return nil, errors.New("cannot obtain database lock, database may be in use by another process")
}
return nil, err
}
boltDB.AllocSize = boltAllocSize
blockCache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1000, // number of keys to track frequency of (1000).
MaxCost: BlockCacheSize, // maximum cost of cache (1000 Blocks).
BufferItems: 64, // number of keys per Get buffer.
})
if err != nil {
return nil, err
}
validatorCache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: NumOfValidatorEntries, // number of entries in cache (2 Million).
MaxCost: ValidatorEntryMaxCost, // maximum size of the cache (64Mb)
BufferItems: 64, // number of keys per Get buffer.
})
if err != nil {
return nil, err
}
kv := &Store{
db: boltDB,
databasePath: dirPath,
blockCache: blockCache,
validatorEntryCache: validatorCache,
stateSummaryCache: newStateSummaryCache(),
ctx: ctx,
}
for _, o := range opts {
o(kv)
}
if err := kv.db.Update(func(tx *bolt.Tx) error {
return createBuckets(tx, Buckets...)
}); err != nil {
return nil, err
}
if err = prometheus.Register(createBoltCollector(kv.db)); err != nil {
return nil, err
}
// Setup the type of block storage used depending on whether or not this is a fresh database.
if err := kv.setupBlockStorageType(ctx); err != nil {
return nil, err
}
return kv, nil
}
// ClearDB removes the previously stored database in the data directory.
func (s *Store) ClearDB() error {
if err := s.Close(); err != nil {
return fmt.Errorf("failed to close db: %w", err)
}
if _, err := os.Stat(s.databasePath); os.IsNotExist(err) {
return nil
}
prometheus.Unregister(createBoltCollector(s.db))
if err := os.Remove(path.Join(s.databasePath, DatabaseFileName)); err != nil {
return errors.Wrap(err, "could not remove database file")
}
return nil
}
// Close closes the underlying BoltDB database.
func (s *Store) Close() error {
prometheus.Unregister(createBoltCollector(s.db))
// Before DB closes, we should dump the cached state summary objects to DB.
if err := s.saveCachedStateSummariesDB(s.ctx); err != nil {
return err
}
return s.db.Close()
}
// DatabasePath at which this database writes files.
func (s *Store) DatabasePath() string {
return s.databasePath
}
func (s *Store) setupBlockStorageType(ctx context.Context) error {
// We check if we want to save blinded beacon blocks by checking a key in the db
// otherwise, we check the last stored block and set that key in the DB if it is blinded.
headBlock, err := s.HeadBlock(ctx)
if err != nil {
return errors.Wrap(err, "could not get head block when setting up block storage type")
}
err = blocks.BeaconBlockIsNil(headBlock)
isNilBlk := err != nil
saveFull := features.Get().SaveFullExecutionPayloads
var saveBlinded bool
if err := s.db.Update(func(tx *bolt.Tx) error {
// If we have a key stating we wish to save blinded beacon blocks, then we set saveBlinded to true.
metadataBkt := tx.Bucket(chainMetadataBucket)
keyExists := len(metadataBkt.Get(saveBlindedBeaconBlocksKey)) > 0
if keyExists {
saveBlinded = true
return nil
}
// If the head block exists and is blinded, we update the key in the DB to
// say we wish to save all blocks as blinded.
if !isNilBlk && headBlock.IsBlinded() {
if err := metadataBkt.Put(saveBlindedBeaconBlocksKey, []byte{1}); err != nil {
return err
}
saveBlinded = true
}
if isNilBlk && !saveFull {
if err := metadataBkt.Put(saveBlindedBeaconBlocksKey, []byte{1}); err != nil {
return err
}
saveBlinded = true
}
return nil
}); err != nil {
return err
}
// If the user wants to save full execution payloads but their database is saving blinded blocks only,
// we then throw an error as the node should not start.
if saveFull && saveBlinded {
return fmt.Errorf(
"cannot use the %s flag with this existing database, as it has already been initialized to only store "+
"execution payload headers (aka blinded beacon blocks). If you want to use this flag, you must re-sync your node with a fresh "+
"database. We recommend using checkpoint sync https://docs.prylabs.network/docs/prysm-usage/checkpoint-sync/",
features.SaveFullExecutionPayloads.Name,
)
}
if saveFull {
log.Warn("Saving full beacon blocks to the database. For greater disk space savings, we recommend resyncing from an empty database with " +
"checkpoint sync to save only blinded beacon blocks by default")
}
return nil
}
func createBuckets(tx *bolt.Tx, buckets ...[]byte) error {
for _, bucket := range buckets {
if _, err := tx.CreateBucketIfNotExists(bucket); err != nil {
return err
}
}
return nil
}
// createBoltCollector returns a prometheus collector specifically configured for boltdb.
func createBoltCollector(db *bolt.DB) prometheus.Collector {
return prombolt.New("boltDB", db, blockedBuckets...)
}