erigon-pulse/ethdb/badger_db.go
Alex Sharov 57777e7c60
Prepare codebase for future default DB change (#670)
* Add kv.tx.bucket.Clear() and db.ClearBuckets() methods

* Add kv.tx.bucket.Clear() and db.ClearBuckets() methods

* choose db based on file suffix

* implement db.id method

* implement db.id method

* use ethdb.NewDatabase method

* use ethb.MustOpen method

* cleanup

* support TEST_DB env flag

* create db path automatically needed

* bolt - don't change prefix on happy path
2020-06-16 14:36:16 +01:00

447 lines
12 KiB
Go

// Copyright 2019 The turbo-geth authors
// This file is part of the turbo-geth library.
//
// The turbo-geth library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The turbo-geth library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the turbo-geth library. If not, see <http://www.gnu.org/licenses/>.
package ethdb
import (
"bytes"
"context"
"os"
"time"
"github.com/dgraph-io/badger/v2"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/log"
)
// https://github.com/dgraph-io/badger#garbage-collection
const gcPeriod = 5 * time.Minute
// BadgerDatabase is a wrapper over BadgerDb,
// compatible with the Database interface.
type BadgerDatabase struct {
db *badger.DB // BadgerDB instance
log log.Logger // Contextual logger tracking the database path
tmpDir string // Temporary data directory
gcTicker *time.Ticker // Garbage Collector
id uint64
}
// NewBadgerDatabase returns a BadgerDB wrapper.
func NewBadgerDatabase(dir string) (*BadgerDatabase, error) {
logger := log.New("database", dir)
options := badger.DefaultOptions(dir).WithMaxTableSize(128 << 20) // 128Mb, default 64Mb
db, err := badger.Open(options)
if err != nil {
return nil, err
}
// Start GC in background
ticker := time.NewTicker(gcPeriod)
go func() {
// DB.RunValueLogGC(): This method is designed to do garbage collection while Badger is online. Along with randomly picking a file, it uses statistics generated by the LSM-tree compactions to pick files that are likely to lead to maximum space reclamation. It is recommended to be called during periods of low activity in your system, or periodically. One call would only result in removal of at max one log file. As an optimization, you could also immediately re-run it whenever it returns nil error (indicating a successful value log GC), as shown below.
i := 0
for range ticker.C {
nextFile:
err := db.RunValueLogGC(0.7)
if err == nil {
i++
goto nextFile
}
if err != badger.ErrNoRewrite {
logger.Info("Badger GC happened", "rewritten_vlog_files", i)
i = 0
continue
}
}
}()
return &BadgerDatabase{
db: db,
log: logger,
gcTicker: ticker,
id: id(),
}, nil
}
// NewEphemeralBadger returns a new BadgerDB in a temporary directory.
func NewEphemeralBadger() (*BadgerDatabase, error) {
logger := log.New("db", "badgerInMem")
db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
if err != nil {
return nil, err
}
return &BadgerDatabase{
db: db,
log: logger,
}, nil
}
func (db *BadgerDatabase) KV() KV {
return &badgerKV{badger: db.db}
}
// Close closes the database.
func (db *BadgerDatabase) Close() {
if db.gcTicker != nil {
db.gcTicker.Stop()
}
if err := db.db.Close(); err == nil {
db.log.Info("Database closed")
if len(db.tmpDir) > 0 {
os.RemoveAll(db.tmpDir)
}
} else {
db.log.Error("Failed to close database", "err", err)
}
}
func bucketKey(bucket, key []byte) []byte {
composite := make([]byte, 0, len(bucket)+len(key))
composite = append(composite, bucket...)
composite = append(composite, key...)
return composite
}
func keyWithoutBucket(key, bucket []byte) []byte {
if len(key) <= len(bucket) || !bytes.HasPrefix(key, bucket) {
return nil
}
return key[len(bucket):]
}
// Delete removes a single entry.
func (db *BadgerDatabase) Delete(bucket, key []byte) error {
return db.db.Update(func(txn *badger.Txn) error {
return txn.Delete(bucketKey(bucket, key))
})
}
// Put inserts or updates a single entry.
func (db *BadgerDatabase) Put(bucket, key []byte, value []byte) error {
return db.db.Update(func(txn *badger.Txn) error {
return txn.Set(bucketKey(bucket, key), value)
})
}
// Get returns the value for a given key if it's present.
func (db *BadgerDatabase) Get(bucket, key []byte) ([]byte, error) {
var val []byte
err := db.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(bucketKey(bucket, key))
if err != nil {
return err
}
val, err = item.ValueCopy(nil)
return err
})
if err == badger.ErrKeyNotFound {
return nil, ErrKeyNotFound
}
return val, err
}
func (db *BadgerDatabase) GetIndexChunk(bucket, key []byte, timestamp uint64) ([]byte, error) {
var val []byte
var innerErr error
err := db.db.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
it.Seek(bucketKey(bucket, dbutils.IndexChunkKey(key, timestamp)))
if it.ValidForPrefix(bucketKey(bucket, key)) {
val, innerErr = it.Item().ValueCopy(nil)
if innerErr != nil {
return innerErr
}
return nil
}
return badger.ErrKeyNotFound
})
if err == badger.ErrKeyNotFound {
return nil, ErrKeyNotFound
}
return val, err
}
// GetAsOf returns the value valid as of a given timestamp.
func (db *BadgerDatabase) GetAsOf(bucket, hBucket, key []byte, timestamp uint64) ([]byte, error) {
composite, _ := dbutils.CompositeKeySuffix(key, timestamp)
var dat []byte
err := db.db.View(func(tx *badger.Txn) error {
{ // first look in the historical dbi
it := tx.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
it.Seek(bucketKey(hBucket, composite))
if it.ValidForPrefix(bucketKey(hBucket, key)) {
var err2 error
dat, err2 = it.Item().ValueCopy(nil)
return err2
}
}
{ // fall back to the current dbi
item, err2 := tx.Get(bucketKey(bucket, key))
if err2 != nil {
return err2
}
dat, err2 = item.ValueCopy(nil)
return err2
}
})
if err == badger.ErrKeyNotFound {
return dat, ErrKeyNotFound
}
return dat, err
}
// Has indicates whether a key exists in the database.
func (db *BadgerDatabase) Has(bucket, key []byte) (bool, error) {
_, err := db.Get(bucket, key)
if err == ErrKeyNotFound {
return false, nil
}
return err == nil, err
}
// Walk iterates over entries with keys greater or equals to startkey.
// Only the keys whose first fixedbits match those of startkey are iterated over.
// walker is called for each eligible entry.
// If walker returns false or an error, the walk stops.
func (db *BadgerDatabase) Walk(bucket, startkey []byte, fixedbits int, walker func(k, v []byte) (bool, error)) error {
fixedbytes, mask := Bytesmask(fixedbits)
prefix := bucketKey(bucket, startkey)
err := db.db.View(func(tx *badger.Txn) error {
it := tx.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
for it.Seek(prefix); it.Valid(); it.Next() {
item := it.Item()
k := keyWithoutBucket(item.Key(), bucket)
if k == nil {
break
}
goOn := fixedbits == 0 || bytes.Equal(k[:fixedbytes-1], startkey[:fixedbytes-1]) && (k[fixedbytes-1]&mask) == (startkey[fixedbytes-1]&mask)
if !goOn {
break
}
err := item.Value(func(v []byte) error {
var err2 error
goOn, err2 = walker(k, v)
return err2
})
if err != nil {
return err
}
if !goOn {
break
}
}
return nil
})
return err
}
// MultiWalk is similar to multiple Walk calls folded into one.
func (db *BadgerDatabase) MultiWalk(bucket []byte, startkeys [][]byte, fixedbits []int, walker func(int, []byte, []byte) error) error {
if len(startkeys) == 0 {
return nil
}
rangeIdx := 0 // What is the current range we are extracting
fixedbytes, mask := Bytesmask(fixedbits[rangeIdx])
startkey := startkeys[rangeIdx]
err := db.db.View(func(tx *badger.Txn) error {
it := tx.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
for it.Seek(bucketKey(bucket, startkey)); it.Valid(); it.Next() {
item := it.Item()
k := keyWithoutBucket(item.Key(), bucket)
if k == nil {
return nil
}
// Adjust rangeIdx if needed
if fixedbytes > 0 {
cmp := int(-1)
for cmp != 0 {
cmp = bytes.Compare(k[:fixedbytes-1], startkey[:fixedbytes-1])
if cmp == 0 {
k1 := k[fixedbytes-1] & mask
k2 := startkey[fixedbytes-1] & mask
if k1 < k2 {
cmp = -1
} else if k1 > k2 {
cmp = 1
}
}
if cmp < 0 {
it.Seek(bucketKey(bucket, startkey))
if !it.Valid() {
return nil
}
item = it.Item()
k = keyWithoutBucket(item.Key(), bucket)
if k == nil {
return nil
}
} else if cmp > 0 {
rangeIdx++
if rangeIdx == len(startkeys) {
return nil
}
fixedbytes, mask = Bytesmask(fixedbits[rangeIdx])
startkey = startkeys[rangeIdx]
}
}
}
err := item.Value(func(v []byte) error {
if len(v) == 0 {
return nil
}
return walker(rangeIdx, k, v)
})
if err != nil {
return err
}
}
return nil
})
return err
}
// MultiPut inserts or updates multiple entries.
// Entries are passed as an array:
// bucket0, key0, val0, bucket1, key1, val1, ...
func (db *BadgerDatabase) MultiPut(triplets ...[]byte) (uint64, error) {
l := len(triplets)
err := db.db.Update(func(tx *badger.Txn) error {
for i := 0; i < l; i += 3 {
bucket := triplets[i]
key := triplets[i+1]
val := triplets[i+2]
if err := tx.Set(bucketKey(bucket, key), val); err != nil {
return err
}
}
return nil
})
if err != nil {
return 0, err
}
return uint64(l / 3), err
}
func (db *BadgerDatabase) NewBatch() DbWithPendingMutations {
m := &mutation{
db: db,
puts: newPuts(),
}
return m
}
// IdealBatchSize defines the size of the data batches should ideally add in one write.
func (db *BadgerDatabase) IdealBatchSize() int {
return int(db.db.MaxBatchSize() / 2)
}
// DiskSize returns the total disk size of the database in bytes.
func (db *BadgerDatabase) DiskSize(_ context.Context) (common.StorageSize, error) {
lsm, vlog := db.db.Size()
return common.StorageSize(lsm + vlog), nil
}
func (db *BadgerDatabase) ClearBuckets(buckets ...[]byte) error {
for _, bucket := range buckets {
if err := db.db.DropPrefix(bucket); err != nil {
return err
}
}
return nil
}
func (db *BadgerDatabase) BucketsStat(ctx context.Context) (map[string]common.StorageBucketWriteStats, error) {
return nil, nil
}
// MemCopy creates a copy of the database in a temporary directory.
// We don't do it in memory because BadgerDB doesn't support that.
func (db *BadgerDatabase) MemCopy() Database {
newDb, err := NewEphemeralBadger()
if err != nil {
panic("failed to create tmp database: " + err.Error())
}
err = db.db.View(func(readTx *badger.Txn) error {
return newDb.db.Update(func(writeTx *badger.Txn) error {
it := readTx.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
k := item.Key()
err2 := item.Value(func(v []byte) error {
return writeTx.Set(k, v)
})
if err2 != nil {
return err2
}
}
return nil
})
})
if err != nil {
panic(err)
}
return newDb
}
// TODO [Issue 144] Implement the methods
func (db *BadgerDatabase) WalkAsOf(bucket, hBucket, startkey []byte, fixedbits int, timestamp uint64, walker func([]byte, []byte) (bool, error)) error {
panic("Not implemented")
}
func (db *BadgerDatabase) Keys() ([][]byte, error) {
panic("Not implemented")
}
func (db *BadgerDatabase) Ancients() (uint64, error) {
return 0, errNotSupported
}
func (db *BadgerDatabase) TruncateAncients(items uint64) error {
return errNotSupported
}
func (db *BadgerDatabase) ID() uint64 {
return db.id
}