Medbx: add label to error messages, UpdateForkChoice: add ctx to erorrs, MemDb: increase db-limit from 512Mb to 512Gb (#8434)

This commit is contained in:
Alex Sharov 2023-10-11 12:53:34 +07:00 committed by GitHub
parent b8d8003618
commit 404719c292
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 20 additions and 15 deletions

View File

@ -636,6 +636,7 @@ func openClient(dbDir, snapDir string, cfg *torrent.ClientConfig) (db kv.RwDB, c
WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.DownloaderTablesCfg }).
SyncPeriod(15 * time.Second).
GrowthStep(16 * datasize.MB).
MapSize(16 * datasize.GB).
Path(dbDir).
Open()
if err != nil {

View File

@ -144,7 +144,7 @@ func (opts MdbxOpts) InMem(tmpDir string) MdbxOpts {
}
opts.path = path
opts.inMem = true
opts.flags = mdbx.UtterlyNoSync | mdbx.NoMetaSync | mdbx.LifoReclaim | mdbx.NoMemInit
opts.flags = mdbx.UtterlyNoSync | mdbx.NoMetaSync | mdbx.NoMemInit
opts.growthStep = 2 * datasize.MB
opts.mapSize = 512 * datasize.MB
opts.shrinkThreshold = 0 // disable
@ -1360,12 +1360,12 @@ func (c *MdbxCursor) Put(key []byte, value []byte) error {
b := c.bucketCfg
if b.AutoDupSortKeysConversion {
if err := c.putDupSort(key, value); err != nil {
return err
return fmt.Errorf("label: %s, table: %s, err: %w", c.tx.db.opts.label, c.bucketName, err)
}
return nil
}
if err := c.put(key, value); err != nil {
return fmt.Errorf("table: %s, err: %w", c.bucketName, err)
return fmt.Errorf("label: %s, table: %s, err: %w", c.tx.db.opts.label, c.bucketName, err)
}
return nil
}
@ -1374,7 +1374,7 @@ func (c *MdbxCursor) putDupSort(key []byte, value []byte) error {
b := c.bucketCfg
from, to := b.DupFromLen, b.DupToLen
if len(key) != from && len(key) >= to {
return fmt.Errorf("put dupsort bucket: %s, can have keys of len==%d and len<%d. key: %x,%d", c.bucketName, from, to, key, len(key))
return fmt.Errorf("label: %s, table: %s, can have keys of len==%d and len<%d. key: %x,%d", c.tx.db.opts.label, c.bucketName, from, to, key, len(key))
}
if len(key) != from {
@ -1383,7 +1383,7 @@ func (c *MdbxCursor) putDupSort(key []byte, value []byte) error {
if mdbx.IsKeyExists(err) {
return c.putCurrent(key, value)
}
return fmt.Errorf("putNoOverwrite, bucket: %s, key: %x, val: %x, err: %w", c.bucketName, key, value, err)
return fmt.Errorf("label: %s, putNoOverwrite, bucket: %s, key: %x, val: %x, err: %w", c.tx.db.opts.label, c.bucketName, key, value, err)
}
return nil
}
@ -1446,7 +1446,7 @@ func (c *MdbxCursor) Append(k []byte, v []byte) error {
b := c.bucketCfg
from, to := b.DupFromLen, b.DupToLen
if len(k) != from && len(k) >= to {
return fmt.Errorf("append dupsort bucket: %s, can have keys of len==%d and len<%d. key: %x,%d", c.bucketName, from, to, k, len(k))
return fmt.Errorf("label: %s, append dupsort bucket: %s, can have keys of len==%d and len<%d. key: %x,%d", c.tx.db.opts.label, c.bucketName, from, to, k, len(k))
}
if len(k) == from {
@ -1457,13 +1457,13 @@ func (c *MdbxCursor) Append(k []byte, v []byte) error {
if c.bucketCfg.Flags&mdbx.DupSort != 0 {
if err := c.c.Put(k, v, mdbx.AppendDup); err != nil {
return fmt.Errorf("bucket: %s, %w", c.bucketName, err)
return fmt.Errorf("label: %s, bucket: %s, %w", c.tx.db.opts.label, c.bucketName, err)
}
return nil
}
if err := c.c.Put(k, v, mdbx.Append); err != nil {
return fmt.Errorf("bucket: %s, %w", c.bucketName, err)
return fmt.Errorf("label: %s, bucket: %s, %w", c.tx.db.opts.label, c.bucketName, err)
}
return nil
}
@ -1588,21 +1588,21 @@ func (c *MdbxDupSortCursor) LastDup() ([]byte, error) {
func (c *MdbxDupSortCursor) Append(k []byte, v []byte) error {
if err := c.c.Put(k, v, mdbx.Append|mdbx.AppendDup); err != nil {
return fmt.Errorf("in Append: bucket=%s, %w", c.bucketName, err)
return fmt.Errorf("label: %s, in Append: bucket=%s, %w", c.tx.db.opts.label, c.bucketName, err)
}
return nil
}
func (c *MdbxDupSortCursor) AppendDup(k []byte, v []byte) error {
if err := c.c.Put(k, v, mdbx.AppendDup); err != nil {
return fmt.Errorf("in AppendDup: bucket=%s, %w", c.bucketName, err)
return fmt.Errorf("label: %s, in AppendDup: bucket=%s, %w", c.tx.db.opts.label, c.bucketName, err)
}
return nil
}
func (c *MdbxDupSortCursor) PutNoDupData(k, v []byte) error {
if err := c.c.Put(k, v, mdbx.NoDupData); err != nil {
return fmt.Errorf("in PutNoDupData: %w", err)
return fmt.Errorf("label: %s, in PutNoDupData: %w", c.tx.db.opts.label, err)
}
return nil
@ -1611,7 +1611,7 @@ func (c *MdbxDupSortCursor) PutNoDupData(k, v []byte) error {
// DeleteCurrentDuplicates - delete all of the data items for the current key.
func (c *MdbxDupSortCursor) DeleteCurrentDuplicates() error {
if err := c.delAllDupData(); err != nil {
return fmt.Errorf("in DeleteCurrentDuplicates: %w", err)
return fmt.Errorf("label: %s,in DeleteCurrentDuplicates: %w", c.tx.db.opts.label, err)
}
return nil
}

View File

@ -18,6 +18,7 @@ import (
"context"
"unsafe"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv/iter"
"github.com/ledgerwatch/erigon-lib/kv/order"
@ -45,7 +46,7 @@ type MemoryMutation struct {
// ... some calculations on `batch`
// batch.Commit()
func NewMemoryBatch(tx kv.Tx, tmpDir string) *MemoryMutation {
tmpDB := mdbx.NewMDBX(log.New()).InMem(tmpDir).MustOpen()
tmpDB := mdbx.NewMDBX(log.New()).InMem(tmpDir).GrowthStep(64 * datasize.MB).MapSize(512 * datasize.GB).MustOpen()
memTx, err := tmpDB.BeginRw(context.Background())
if err != nil {
panic(err)

View File

@ -118,7 +118,7 @@ func newPersistentDB(logger log.Logger, path string) (*DB, error) {
Path(path).
Label(kv.SentryDB).
WithTableCfg(bucketsConfig).
MapSize(1024 * datasize.MB).
MapSize(8 * datasize.GB).
GrowthStep(16 * datasize.MB).
Flags(func(f uint) uint { return f ^ mdbx1.Durable | mdbx1.SafeNoSync }).
SyncPeriod(2 * time.Second).

View File

@ -260,7 +260,7 @@ func (fv *ForkValidator) validateAndStorePayload(tx kv.RwTx, header *types.Heade
if errors.Is(err, consensus.ErrInvalidBlock) {
validationError = err
} else {
criticalError = err
criticalError = fmt.Errorf("validateAndStorePayload: %w", err)
return
}
}

View File

@ -235,6 +235,7 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, blockHas
// Run the unwind
if err := e.executionPipeline.RunUnwind(e.db, tx); err != nil {
err = fmt.Errorf("updateForkChoice: %w", err)
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
@ -299,6 +300,7 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, blockHas
}
// Run the forkchoice
if err := e.executionPipeline.Run(e.db, tx, false); err != nil {
err = fmt.Errorf("updateForkChoice: %w", err)
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
@ -351,6 +353,7 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, blockHas
}
if err := e.db.Update(ctx, func(tx kv.RwTx) error { return e.executionPipeline.RunPrune(e.db, tx, false) }); err != nil {
err = fmt.Errorf("updateForkChoice: %w", err)
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}