go-pulse/ethdb/pebble/pebble.go
Martin Holst Swende 713fc8bbe6
ethdb/pebble: fsync for batch writes ()
This is likely the culprit behind several data corruption issues, e.g. where data has been
written to the freezer, but the deletion from pebble does not go through due to process
crash.
2023-06-21 18:08:12 +02:00

620 lines
21 KiB
Go

// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
//go:build (arm64 || amd64) && !openbsd
// Package pebble implements the key-value database layer based on pebble.
package pebble
import (
"bytes"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/bloom"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
const (
	// minCache is the lower bound, in megabytes, on the memory handed to
	// pebble for read and write caching (split evenly between the two).
	minCache = 16

	// minHandles is the lower bound on the number of file handles made
	// available for open database files.
	minHandles = 16

	// metricsGatheringInterval is how often pebble's compaction, io and
	// pause statistics are sampled for reporting to the user.
	metricsGatheringInterval = time.Second * 3
)
// Database is a persistent key-value store based on the pebble storage engine.
// Apart from basic data storage functionality it also supports batch writes and
// iterating over the keyspace in binary-alphabetical order.
//
// The meters and gauges are registered in New and updated periodically by the
// meter goroutine. The compaction and write-stall bookkeeping fields at the end
// of the struct are updated from the pebble event listener callbacks
// (onCompactionBegin, onCompactionEnd, onWriteStallBegin, onWriteStallEnd).
type Database struct {
fn string // Database path as passed to New; returned by Path
db *pebble.DB // Underlying pebble storage engine
compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction
diskSizeGauge metrics.Gauge // Gauge for tracking the size of all the levels in the database
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written
memCompGauge metrics.Gauge // Gauge for tracking the number of memory compaction
level0CompGauge metrics.Gauge // Gauge for tracking the number of table compaction in level0
nonlevel0CompGauge metrics.Gauge // Gauge for tracking the number of table compaction in non0 level
seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated
quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
closed bool // Set by Close; checked (under quitLock) by the read/write methods
log log.Logger // Contextual logger tracking the database path
// NOTE(review): activeComp, compStartTime and writeDelayStartTime are plain
// (non-atomic) fields mutated from the event callbacks; whether pebble
// serializes those callbacks is not visible here -- confirm.
activeComp int // Current number of active compactions
compStartTime time.Time // The start time of the earliest currently-active compaction
compTime atomic.Int64 // Total time spent in compaction in ns
level0Comp atomic.Uint32 // Total number of level-zero compactions
nonLevel0Comp atomic.Uint32 // Total number of non level-zero compactions
writeDelayStartTime time.Time // The start time of the latest write stall
writeDelayCount atomic.Int64 // Total number of write stall counts
writeDelayTime atomic.Int64 // Total time spent in write stalls
}
// onCompactionBegin is registered as pebble's CompactionBegin listener. It
// records the start time when no other compaction is in flight, bumps the
// level-0 or non-level-0 compaction counter depending on the level of the
// first compaction input, and increments the active compaction count.
func (d *Database) onCompactionBegin(info pebble.CompactionInfo) {
	if d.activeComp == 0 {
		d.compStartTime = time.Now()
	}
	switch first := info.Input[0]; first.Level {
	case 0:
		d.level0Comp.Add(1)
	default:
		d.nonLevel0Comp.Add(1)
	}
	d.activeComp++
}
// onCompactionEnd is registered as pebble's CompactionEnd listener. When the
// last active compaction finishes, the time elapsed since compStartTime is
// added to the accumulated compaction time. It panics if no compaction was
// recorded as active, then decrements the active compaction count.
func (d *Database) onCompactionEnd(info pebble.CompactionInfo) {
	switch d.activeComp {
	case 0:
		panic("should not happen")
	case 1:
		d.compTime.Add(int64(time.Since(d.compStartTime)))
	}
	d.activeComp--
}
// onWriteStallBegin is registered as pebble's WriteStallBegin listener. It
// records when the stall started, so that onWriteStallEnd can compute its
// duration, and counts the stall so the write-delay counter meter reports
// something other than zero.
func (d *Database) onWriteStallBegin(b pebble.WriteStallBeginInfo) {
	d.writeDelayStartTime = time.Now()
	d.writeDelayCount.Add(1)
}
// onWriteStallEnd is registered as pebble's WriteStallEnd listener. It adds
// the time elapsed since the recorded stall start to the accumulated write
// stall duration (in nanoseconds).
func (d *Database) onWriteStallEnd() {
	stalled := time.Since(d.writeDelayStartTime)
	d.writeDelayTime.Add(int64(stalled))
}
// New returns a wrapped pebble DB object. The namespace is the prefix that the
// metrics reporting should use for surfacing internal stats.
//
// Parameters:
//   - file: path handed to pebble.Open; also used as the logger context and
//     returned by Path.
//   - cache: memory allowance in megabytes, raised to minCache if smaller.
//   - handles: maximum number of open files, raised to minHandles if smaller.
//   - namespace: prefix prepended to every registered metric name.
//   - readonly: forwarded to pebble's ReadOnly option.
//
// On success the metrics gathering goroutine is started before returning. If
// pebble.Open fails, its error is returned and no goroutine is started.
func New(file string, cache int, handles int, namespace string, readonly bool) (*Database, error) {
// Ensure we have some minimal caching and file guarantees
if cache < minCache {
cache = minCache
}
if handles < minHandles {
handles = minHandles
}
logger := log.New("database", file)
logger.Info("Allocated cache and file handles", "cache", common.StorageSize(cache*1024*1024), "handles", handles)
// The max memtable size is limited by the uint32 offsets stored in
// internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
// Taken from https://github.com/cockroachdb/pebble/blob/master/open.go#L38
maxMemTableSize := 4<<30 - 1 // Capped by 4 GB
// Two memory tables is configured which is identical to leveldb,
// including a frozen memory table and another live one.
memTableLimit := 2
// Half of the cache allowance is divided evenly between the memory tables.
memTableSize := cache * 1024 * 1024 / 2 / memTableLimit
if memTableSize > maxMemTableSize {
memTableSize = maxMemTableSize
}
// The wrapper is created before the options because the event listener
// below refers to its methods.
db := &Database{
fn: file,
log: logger,
quitChan: make(chan chan error),
}
opt := &pebble.Options{
// Pebble has a single combined cache area and the write
// buffers are taken from this too. Assign all available
// memory allowance for cache.
Cache: pebble.NewCache(int64(cache * 1024 * 1024)),
MaxOpenFiles: handles,
// The size of memory table(as well as the write buffer).
// Note, there may have more than two memory tables in the system.
MemTableSize: memTableSize,
// MemTableStopWritesThreshold places a hard limit on the size
// of the existent MemTables(including the frozen one).
// Note, this must be the number of tables not the size of all memtables
// according to https://github.com/cockroachdb/pebble/blob/master/options.go#L738-L742
// and to https://github.com/cockroachdb/pebble/blob/master/db.go#L1892-L1903.
MemTableStopWritesThreshold: memTableLimit,
// The default compaction concurrency(1 thread),
// Here use all available CPUs for faster compaction.
MaxConcurrentCompactions: func() int { return runtime.NumCPU() },
// Per-level options. Options for at least one level must be specified. The
// options for the last level are used for all subsequent levels.
Levels: []pebble.LevelOptions{
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
},
ReadOnly: readonly,
// Hook the compaction and write-stall events into the wrapper so that
// the meter goroutine can report them.
EventListener: &pebble.EventListener{
CompactionBegin: db.onCompactionBegin,
CompactionEnd: db.onCompactionEnd,
WriteStallBegin: db.onWriteStallBegin,
WriteStallEnd: db.onWriteStallEnd,
},
}
// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
// for more details.
opt.Experimental.ReadSamplingMultiplier = -1
// Open the db and recover any potential corruptions
innerDB, err := pebble.Open(file, opt)
if err != nil {
return nil, err
}
db.db = innerDB
// Register all meters and gauges under the caller-supplied namespace.
db.compTimeMeter = metrics.NewRegisteredMeter(namespace+"compact/time", nil)
db.compReadMeter = metrics.NewRegisteredMeter(namespace+"compact/input", nil)
db.compWriteMeter = metrics.NewRegisteredMeter(namespace+"compact/output", nil)
db.diskSizeGauge = metrics.NewRegisteredGauge(namespace+"disk/size", nil)
db.diskReadMeter = metrics.NewRegisteredMeter(namespace+"disk/read", nil)
db.diskWriteMeter = metrics.NewRegisteredMeter(namespace+"disk/write", nil)
db.writeDelayMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/duration", nil)
db.writeDelayNMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/counter", nil)
db.memCompGauge = metrics.NewRegisteredGauge(namespace+"compact/memory", nil)
db.level0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/level0", nil)
db.nonlevel0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/nonlevel0", nil)
db.seekCompGauge = metrics.NewRegisteredGauge(namespace+"compact/seek", nil)
db.manualMemAllocGauge = metrics.NewRegisteredGauge(namespace+"memory/manualalloc", nil)
// Start up the metrics gathering and return
go db.meter(metricsGatheringInterval)
return db, nil
}
// Close shuts down the metrics collection goroutine and then closes the
// underlying pebble database, returning the result of that close. Calling
// Close on an already closed database is a no-op that returns nil.
func (d *Database) Close() error {
	d.quitLock.Lock()
	defer d.quitLock.Unlock()

	// Double closing is tolerated to keep callers simple.
	if d.closed {
		return nil
	}
	d.closed = true

	if quit := d.quitChan; quit != nil {
		// Hand the collector a reply channel and wait for its acknowledgement.
		done := make(chan error)
		quit <- done
		if err := <-done; err != nil {
			d.log.Error("Metrics collection failed", "err", err)
		}
		d.quitChan = nil
	}
	return d.db.Close()
}
// Has retrieves if a key is present in the key-value store.
//
// It returns pebble.ErrClosed if the database has already been closed. A
// missing key is reported as (false, nil); any other lookup error, or an error
// from releasing the lookup's closer, is returned to the caller.
func (d *Database) Has(key []byte) (bool, error) {
	d.quitLock.RLock()
	defer d.quitLock.RUnlock()
	if d.closed {
		return false, pebble.ErrClosed
	}
	_, closer, err := d.db.Get(key)
	if err == pebble.ErrNotFound {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	// The value itself is not needed; release it and surface any failure
	// instead of silently dropping it.
	if err := closer.Close(); err != nil {
		return false, err
	}
	return true, nil
}
// Get retrieves the given key if it's present in the key-value store.
//
// It returns pebble.ErrClosed if the database has already been closed, and
// passes through any error from the underlying lookup (including the one
// reported for a missing key). The value is copied before the lookup's closer
// is released, so the returned slice is owned by the caller. An error from
// releasing the closer is returned instead of being dropped.
func (d *Database) Get(key []byte) ([]byte, error) {
	d.quitLock.RLock()
	defer d.quitLock.RUnlock()
	if d.closed {
		return nil, pebble.ErrClosed
	}
	dat, closer, err := d.db.Get(key)
	if err != nil {
		return nil, err
	}
	ret := make([]byte, len(dat))
	copy(ret, dat)
	if err := closer.Close(); err != nil {
		return nil, err
	}
	return ret, nil
}
// Put inserts the given value into the key-value store.
//
// It returns pebble.ErrClosed if the database has already been closed;
// otherwise it returns the result of the underlying pebble Set call. The read
// lock is held for the duration of the write so that Close cannot run
// concurrently with it.
func (d *Database) Put(key []byte, value []byte) error {
d.quitLock.RLock()
defer d.quitLock.RUnlock()
if d.closed {
return pebble.ErrClosed
}
// The write is issued with pebble.NoSync.
// NOTE(review): Delete passes nil write options and batch.Write passes
// pebble.Sync -- confirm that the differing options are intended.
return d.db.Set(key, value, pebble.NoSync)
}
// Delete removes the key from the key-value store.
//
// It returns pebble.ErrClosed if the database has already been closed;
// otherwise it returns the result of the underlying pebble Delete call. The
// read lock is held for the duration of the write so that Close cannot run
// concurrently with it.
func (d *Database) Delete(key []byte) error {
d.quitLock.RLock()
defer d.quitLock.RUnlock()
if d.closed {
return pebble.ErrClosed
}
// NOTE(review): nil write options are passed here while Put uses
// pebble.NoSync; presumably pebble falls back to its default options for
// nil -- confirm that the differing durability is intended.
return d.db.Delete(key, nil)
}
// NewBatch returns a write-only batch bound to this database. Changes are
// buffered in a fresh pebble batch and only reach the database once the
// batch's Write method is called.
func (d *Database) NewBatch() ethdb.Batch {
	wrapped := new(batch)
	wrapped.b = d.db.NewBatch()
	wrapped.db = d
	return wrapped
}
// NewBatchWithSize returns a write-only batch bound to this database. The
// size hint is ignored: pre-allocation is not supported by pebble, but its
// memory allocation strategy is considerably faster than leveldb's, so
// constructing the batch without any pre-allocated space is performant enough.
func (d *Database) NewBatchWithSize(_ int) ethdb.Batch {
	inner := d.db.NewBatch()
	return &batch{db: d, b: inner}
}
// snapshot wraps a pebble snapshot for implementing the Snapshot interface.
type snapshot struct {
db *pebble.Snapshot // Underlying pebble snapshot that Has, Get and Release operate on
}
// NewSnapshot captures the current state of the database in a snapshot.
// Mutations applied to the database afterwards are not visible through it.
// The returned error is always nil.
//
// Note: remember to release the snapshot once it is no longer needed,
// otherwise the stale data will never be cleaned up by the underlying
// compactor.
func (d *Database) NewSnapshot() (ethdb.Snapshot, error) {
	return &snapshot{db: d.db.NewSnapshot()}, nil
}
// Has retrieves if a key is present in the snapshot backing by a key-value
// data store.
//
// A missing key is reported as (false, nil); any other lookup error, or an
// error from releasing the lookup's closer, is returned to the caller.
func (snap *snapshot) Has(key []byte) (bool, error) {
	_, closer, err := snap.db.Get(key)
	if err == pebble.ErrNotFound {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	// The value itself is not needed; release it and surface any failure
	// instead of silently dropping it.
	if err := closer.Close(); err != nil {
		return false, err
	}
	return true, nil
}
// Get retrieves the given key if it's present in the snapshot backing by
// key-value data store.
//
// Any error from the underlying lookup (including the one reported for a
// missing key) is passed through. The value is copied before the lookup's
// closer is released, so the returned slice is owned by the caller. An error
// from releasing the closer is returned instead of being dropped.
func (snap *snapshot) Get(key []byte) ([]byte, error) {
	dat, closer, err := snap.db.Get(key)
	if err != nil {
		return nil, err
	}
	ret := make([]byte, len(dat))
	copy(ret, dat)
	if err := closer.Close(); err != nil {
		return nil, err
	}
	return ret, nil
}
// Release releases associated resources. Release should always succeed and can
// be called multiple times without causing error: only the first call closes
// the underlying pebble snapshot, later calls are no-ops. The snapshot must
// not be used for lookups after it has been released.
func (snap *snapshot) Release() {
	if snap.db == nil {
		return // already released
	}
	// Release has no error return; the close result is intentionally dropped.
	snap.db.Close()
	// Forget the handle so that a repeated Release does not close it again.
	snap.db = nil
}
// upperBound computes the exclusive upper limit of the key range sharing the
// given prefix: the prefix truncated after its last byte that is not 0xff,
// with that byte incremented by one. A nil limit is returned when the prefix
// is empty or consists solely of 0xff bytes. The input is never modified.
func upperBound(prefix []byte) (limit []byte) {
	// Skip the trailing run of 0xff bytes, which cannot be incremented.
	last := len(prefix) - 1
	for last >= 0 && prefix[last] == 0xff {
		last--
	}
	if last < 0 {
		return nil
	}
	limit = make([]byte, last+1)
	copy(limit, prefix)
	limit[last]++
	return limit
}
// Stat returns a particular internal stat of the database.
//
// NOTE: this is currently a stub. The property argument is ignored and the
// method always returns an empty string and a nil error.
func (d *Database) Stat(property string) (string, error) {
return "", nil
}
// Compact flattens the underlying data store for the given key range. In
// essence, deleted and overwritten versions are discarded, and the data is
// rearranged to reduce the cost of operations needed to access them.
//
// A nil start is treated as a key before all keys in the data store; a nil
// limit is treated as a key after all keys in the data store. If both are nil
// the entire data store is compacted.
func (d *Database) Compact(start []byte, limit []byte) error {
	// Pebble has no dedicated marker for "end of key range" (leveldb uses
	// nil), so a large synthetic key stands in for it. Every prefixed
	// database entry sorts below it; for trie nodes 32 bytes of 0xff are
	// needed because keys may share a prefix of several 0xff bytes, and with
	// 32 of them only a hash collision could reach the marker.
	// https://github.com/cockroachdb/pebble/issues/2359#issuecomment-1443995833
	end := limit
	if end == nil {
		end = bytes.Repeat([]byte{0xff}, 32)
	}
	// Parallelization is preferred.
	const parallelize = true
	return d.db.Compact(start, end, parallelize)
}
// Path returns the path to the database directory, i.e. the file argument
// that was passed to New.
func (d *Database) Path() string {
return d.fn
}
// meter periodically retrieves internal pebble counters and reports them to
// the metrics subsystem.
//
// It samples immediately and then once per refresh interval until a reply
// channel is received on d.quitChan, at which point it acknowledges the
// request by sending nil on that channel and returns. Cumulative counters are
// kept in two-slot arrays indexed by the iteration parity, so that each round
// can mark the delta against the previous round's sample.
func (d *Database) meter(refresh time.Duration) {
var errc chan error
timer := time.NewTimer(refresh)
defer timer.Stop()
// Two-slot storage for the cumulative counters: slot i%2 holds the current
// sample, slot (i-1)%2 the previous one.
var (
compTimes [2]int64
writeDelayTimes [2]int64
writeDelayCounts [2]int64
compWrites [2]int64
compReads [2]int64
nWrites [2]int64
)
// Iterate until a quit request arrives (errc becomes non-nil) and collect the stats
for i := 1; errc == nil; i++ {
var (
compWrite int64
compRead int64
nWrite int64
metrics = d.db.Metrics()
compTime = d.compTime.Load()
writeDelayCount = d.writeDelayCount.Load()
writeDelayTime = d.writeDelayTime.Load()
nonLevel0CompCount = int64(d.nonLevel0Comp.Load())
level0CompCount = int64(d.level0Comp.Load())
)
writeDelayTimes[i%2] = writeDelayTime
writeDelayCounts[i%2] = writeDelayCount
compTimes[i%2] = compTime
// Sum the per-level byte counters: compacted and flushed bytes count as
// disk writes, compacted bytes as compaction output, read bytes as
// compaction input.
for _, levelMetrics := range metrics.Levels {
nWrite += int64(levelMetrics.BytesCompacted)
nWrite += int64(levelMetrics.BytesFlushed)
compWrite += int64(levelMetrics.BytesCompacted)
compRead += int64(levelMetrics.BytesRead)
}
// WAL writes also count towards the total amount of data written.
nWrite += int64(metrics.WAL.BytesWritten)
compWrites[i%2] = compWrite
compReads[i%2] = compRead
nWrites[i%2] = nWrite
// Report the deltas since the previous sample to the meters.
if d.writeDelayNMeter != nil {
d.writeDelayNMeter.Mark(writeDelayCounts[i%2] - writeDelayCounts[(i-1)%2])
}
if d.writeDelayMeter != nil {
d.writeDelayMeter.Mark(writeDelayTimes[i%2] - writeDelayTimes[(i-1)%2])
}
if d.compTimeMeter != nil {
d.compTimeMeter.Mark(compTimes[i%2] - compTimes[(i-1)%2])
}
if d.compReadMeter != nil {
d.compReadMeter.Mark(compReads[i%2] - compReads[(i-1)%2])
}
if d.compWriteMeter != nil {
d.compWriteMeter.Mark(compWrites[i%2] - compWrites[(i-1)%2])
}
if d.diskSizeGauge != nil {
d.diskSizeGauge.Update(int64(metrics.DiskSpaceUsage()))
}
if d.diskReadMeter != nil {
d.diskReadMeter.Mark(0) // pebble doesn't track non-compaction reads
}
if d.diskWriteMeter != nil {
d.diskWriteMeter.Mark(nWrites[i%2] - nWrites[(i-1)%2])
}
// See https://github.com/cockroachdb/pebble/pull/1628#pullrequestreview-1026664054
manuallyAllocated := metrics.BlockCache.Size + int64(metrics.MemTable.Size) + int64(metrics.MemTable.ZombieSize)
// NOTE(review): unlike the meters above, these gauges are updated without
// a nil check -- confirm they are always registered before meter runs.
d.manualMemAllocGauge.Update(manuallyAllocated)
d.memCompGauge.Update(metrics.Flush.Count)
d.nonlevel0CompGauge.Update(nonLevel0CompCount)
d.level0CompGauge.Update(level0CompCount)
d.seekCompGauge.Update(metrics.Compact.ReadCount)
// Sleep a bit, then repeat the stats collection
select {
case errc = <-d.quitChan:
// Quit requesting, stop hammering the database
case <-timer.C:
timer.Reset(refresh)
// Timeout, gather a new set of stats
}
}
// Acknowledge the quit request; the requester is waiting on this channel.
errc <- nil
}
// batch is a write-only batch that commits changes to its host database
// when Write is called. A batch cannot be used concurrently.
type batch struct {
b *pebble.Batch // Underlying pebble batch buffering the operations
db *Database // Host database; its quitLock and closed flag are consulted on Write
size int // Sum of the key (and value) lengths queued so far; reported by ValueSize
}
// Put inserts the given value into the batch for later committing.
//
// If the underlying pebble batch rejects the operation its error is returned
// and the tracked batch size is left unchanged; otherwise the size grows by
// the combined length of key and value.
func (b *batch) Put(key, value []byte) error {
	if err := b.b.Set(key, value, nil); err != nil {
		return err
	}
	b.size += len(key) + len(value)
	return nil
}
// Delete inserts a key removal into the batch for later committing.
//
// If the underlying pebble batch rejects the operation its error is returned
// and the tracked batch size is left unchanged; otherwise the size grows by
// the length of the key.
func (b *batch) Delete(key []byte) error {
	if err := b.b.Delete(key, nil); err != nil {
		return err
	}
	b.size += len(key)
	return nil
}
// ValueSize retrieves the amount of data queued up for writing, i.e. the sum
// of the key and value lengths accumulated by Put and Delete since the batch
// was created or last reset.
func (b *batch) ValueSize() int {
return b.size
}
// Write commits the accumulated operations to the host database using
// pebble's synchronous write option. It returns pebble.ErrClosed if the host
// database has already been closed.
func (b *batch) Write() error {
	host := b.db
	host.quitLock.RLock()
	defer host.quitLock.RUnlock()

	if host.closed {
		return pebble.ErrClosed
	}
	return b.b.Commit(pebble.Sync)
}
// Reset clears the batch so that it can be reused: the tracked size goes back
// to zero and the underlying pebble batch is reset.
func (b *batch) Reset() {
	b.size = 0
	b.b.Reset()
}
// Replay replays the batch contents into the given writer, in the order the
// operations were queued.
//
// Set operations are forwarded to w.Put and delete operations to w.Delete.
// Replay stops and returns the error as soon as the writer reports one, or
// when the batch contains an operation kind other than set or delete.
func (b *batch) Replay(w ethdb.KeyValueWriter) error {
	reader := b.b.Reader()
	for {
		kind, k, v, ok := reader.Next()
		if !ok {
			return nil
		}
		// The (k,v) slices might be overwritten if the batch is reset/reused,
		// and the receiver should copy them if they are to be retained long-term.
		switch kind {
		case pebble.InternalKeyKindSet:
			if err := w.Put(k, v); err != nil {
				return err
			}
		case pebble.InternalKeyKindDelete:
			if err := w.Delete(k); err != nil {
				return err
			}
		default:
			return fmt.Errorf("unhandled operation, keytype: %v", kind)
		}
	}
}
// pebbleIterator is a wrapper of underlying iterator in storage engine.
// The purpose of this structure is to implement the missing APIs.
type pebbleIterator struct {
	iter     *pebble.Iterator
	moved    bool // Set while the iterator is already positioned on an entry that Next has not reported yet
	released bool // Set once Release has closed the underlying iterator
}

// NewIterator creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
//
// The iterator is positioned on its first entry before being returned; the
// first call to Next reports that entry instead of advancing past it.
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
	// Assemble the lower bound in a fresh buffer. Appending to prefix directly
	// could write start into the caller's backing array whenever prefix has
	// spare capacity.
	var lower []byte
	if n := len(prefix) + len(start); n > 0 {
		lower = make([]byte, 0, n)
		lower = append(lower, prefix...)
		lower = append(lower, start...)
	}
	iter := d.db.NewIter(&pebble.IterOptions{
		LowerBound: lower,
		UpperBound: upperBound(prefix),
	})
	iter.First()
	return &pebbleIterator{iter: iter, moved: true}
}

// Next moves the iterator to the next key/value pair. It returns true if the
// iterator is positioned on a valid entry afterwards, and false once the
// iterator is exhausted.
func (iter *pebbleIterator) Next() bool {
	if iter.moved {
		// The iterator was already positioned on the first entry when it
		// was created; report that position without advancing.
		iter.moved = false
		return iter.iter.Valid()
	}
	return iter.iter.Next()
}

// Error returns any accumulated error. Exhausting all the key/value pairs
// is not considered to be an error.
func (iter *pebbleIterator) Error() error {
	return iter.iter.Error()
}

// Key returns the key of the current key/value pair, or nil if done. The caller
// should not modify the contents of the returned slice, and its contents may
// change on the next call to Next.
func (iter *pebbleIterator) Key() []byte {
	return iter.iter.Key()
}

// Value returns the value of the current key/value pair, or nil if done. The
// caller should not modify the contents of the returned slice, and its contents
// may change on the next call to Next.
func (iter *pebbleIterator) Value() []byte {
	return iter.iter.Value()
}

// Release releases associated resources. Release should always succeed and can
// be called multiple times without causing error: only the first call closes
// the underlying pebble iterator, later calls are no-ops.
func (iter *pebbleIterator) Release() {
	if iter.released {
		return
	}
	iter.released = true
	// Release has no error return; the close result is intentionally dropped.
	iter.iter.Close()
}