erigon-pulse/kv/helpers.go

177 lines
3.5 KiB
Go

package kv
import (
"context"
"fmt"
"os"
"time"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/torquem-ch/mdbx-go/mdbx"
"go.uber.org/atomic"
)
func DefaultPageSize() uint64 {
osPageSize := os.Getpagesize()
if osPageSize < 4096 { // reduce further may lead to errors (because some data is just big)
osPageSize = 4096
} else if osPageSize > mdbx.MaxPageSize {
osPageSize = mdbx.MaxPageSize
}
osPageSize = osPageSize / 4096 * 4096 // ensure it's rounded
return uint64(osPageSize)
}
// BigChunks - read `table` by big chunks - restart read transaction after each 1 minutes
func BigChunks(db RoDB, table string, from []byte, walker func(tx Tx, k, v []byte) (bool, error)) error {
rollbackEvery := time.NewTicker(1 * time.Minute)
var stop bool
for !stop {
if err := db.View(context.Background(), func(tx Tx) error {
c, err := tx.Cursor(table)
if err != nil {
return err
}
defer c.Close()
k, v, err := c.Seek(from)
Loop:
for ; k != nil; k, v, err = c.Next() {
if err != nil {
return err
}
// break loop before walker() call, to make sure all keys are received by walker() exactly once
select {
case <-rollbackEvery.C:
break Loop
default:
}
ok, err := walker(tx, k, v)
if err != nil {
return err
}
if !ok {
stop = true
break
}
}
if k == nil {
stop = true
}
from = common.Copy(k) // next transaction will start from this key
return nil
}); err != nil {
return err
}
}
return nil
}
var (
bytesTrue = []byte{1}
bytesFalse = []byte{0}
)
func bytes2bool(in []byte) bool {
if len(in) < 1 {
return false
}
return in[0] == 1
}
var ErrChanged = fmt.Errorf("key must not change")
// EnsureNotChangedBool - used to store immutable config flags in db. protects from human mistakes
func EnsureNotChangedBool(tx GetPut, bucket string, k []byte, value bool) (ok, enabled bool, err error) {
vBytes, err := tx.GetOne(bucket, k)
if err != nil {
return false, enabled, err
}
if vBytes == nil {
if value {
vBytes = bytesTrue
} else {
vBytes = bytesFalse
}
if err := tx.Put(bucket, k, vBytes); err != nil {
return false, enabled, err
}
}
enabled = bytes2bool(vBytes)
return value == enabled, enabled, nil
}
func GetBool(tx Getter, bucket string, k []byte) (enabled bool, err error) {
vBytes, err := tx.GetOne(bucket, k)
if err != nil {
return false, err
}
return bytes2bool(vBytes), nil
}
func ReadAhead(ctx context.Context, db RoDB, progress *atomic.Bool, table string, from []byte, amount uint32) {
if db == nil || progress.Load() {
return
}
progress.Store(true)
go func() {
defer progress.Store(false)
_ = db.View(ctx, func(tx Tx) error {
c, err := tx.Cursor(table)
if err != nil {
return err
}
defer c.Close()
for k, _, err := c.Seek(from); k != nil && amount > 0; k, _, err = c.Next() {
if err != nil {
return err
}
amount--
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
return nil
})
}()
}
// FirstKey - candidate on move to kv.Tx interface
func FirstKey(tx Tx, table string) ([]byte, error) {
c, err := tx.Cursor(table)
if err != nil {
return nil, err
}
defer c.Close()
k, _, err := c.First()
if err != nil {
return nil, err
}
return k, nil
}
// LastKey - candidate on move to kv.Tx interface
func LastKey(tx Tx, table string) ([]byte, error) {
c, err := tx.Cursor(table)
if err != nil {
return nil, err
}
defer c.Close()
k, _, err := c.Last()
if err != nil {
return nil, err
}
return k, nil
}