HashState: parallel promote cleanly (#6512)

This commit is contained in:
Alex Sharov 2023-01-09 11:01:21 +07:00 committed by GitHub
parent 7aa056e179
commit e88a5aa2e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 165 additions and 117 deletions

View File

@ -13,11 +13,8 @@ type estimatedRamPerWorker datasize.ByteSize
// Workers - return max workers amount based on total Memory/CPU's and estimated RAM per worker
func (r estimatedRamPerWorker) Workers() int {
// 50% of TotalMemory. Better don't count on 100% because OOM Killer may have aggressive defaults and other software may need RAM
totalMemory := memory.TotalMemory() / 2
maxWorkersForGivenMemory := totalMemory / uint64(r)
maxWorkersForGivenCPU := runtime.NumCPU() - 1 // reserve 1 cpu for "work-producer thread", also IO software on machine in cloud-providers using 1 CPU
return cmp.InRange(1, maxWorkersForGivenCPU, int(maxWorkersForGivenMemory))
maxWorkersForGivenMemory := (memory.TotalMemory() / 2) / uint64(r)
return cmp.Min(AlmostAllCPUs(), int(maxWorkersForGivenMemory))
}
// WorkersHalf returns half of Workers(), but never less than 1.
func (r estimatedRamPerWorker) WorkersHalf() int {
	if n := r.Workers() / 2; n > 1 {
		return n
	}
	return 1
}
// WorkersQuarter returns a quarter of Workers(), but never less than 1.
func (r estimatedRamPerWorker) WorkersQuarter() int {
	if n := r.Workers() / 4; n > 1 {
		return n
	}
	return 1
}
@ -27,3 +24,9 @@ const (
CompressSnapshot = estimatedRamPerWorker(1 * datasize.GB) //1-file-compression is multi-threaded
ReconstituteState = estimatedRamPerWorker(512 * datasize.MB) //state-reconstitution is multi-threaded
)
// AlmostAllCPUs returns the number of usable CPUs minus one, but at least 1.
// One CPU is kept free for the "work producer" goroutine; cloud providers also
// recommend leaving 1 CPU for their IO software. Users can lower the result by
// reducing the GOMAXPROCS env variable.
func AlmostAllCPUs() int {
	if n := runtime.GOMAXPROCS(-1) - 1; n > 1 {
		return n
	}
	return 1
}

View File

@ -5,6 +5,7 @@ import (
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"runtime"
"time"
@ -22,8 +23,9 @@ import (
"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types/accounts"
"github.com/ledgerwatch/erigon/eth/ethconfig/estimate"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
)
type HashStateCfg struct {
@ -77,7 +79,7 @@ func SpawnHashStateStage(s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx contex
return err
}
} else {
if err := promoteHashedStateIncrementally(logPrefix, s.BlockNumber, to, tx, cfg, ctx.Done(), quiet); err != nil {
if err := promoteHashedStateIncrementally(logPrefix, s.BlockNumber, to, tx, cfg, ctx, quiet); err != nil {
return err
}
}
@ -105,7 +107,7 @@ func UnwindHashStateStage(u *UnwindState, s *StageState, tx kv.RwTx, cfg HashSta
}
logPrefix := u.LogPrefix()
if err = unwindHashStateStageImpl(logPrefix, u, s, tx, cfg, ctx.Done()); err != nil {
if err = unwindHashStateStageImpl(logPrefix, u, s, tx, cfg, ctx); err != nil {
return err
}
if err = u.Done(tx); err != nil {
@ -119,10 +121,10 @@ func UnwindHashStateStage(u *UnwindState, s *StageState, tx kv.RwTx, cfg HashSta
return nil
}
func unwindHashStateStageImpl(logPrefix string, u *UnwindState, s *StageState, tx kv.RwTx, cfg HashStateCfg, quit <-chan struct{}) error {
func unwindHashStateStageImpl(logPrefix string, u *UnwindState, s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx context.Context) error {
// Currently it does not require unwinding because it does not create any Intermediate Hash records
// and recomputes the state root from scratch
prom := NewPromoter(tx, cfg.dirs, quit)
prom := NewPromoter(tx, cfg.dirs, ctx)
if cfg.historyV3 {
cfg.agg.SetTx(tx)
if err := prom.UnwindOnHistoryV3(logPrefix, cfg.agg, s.BlockNumber, u.UnwindPoint+1, false, true); err != nil {
@ -149,24 +151,30 @@ func unwindHashStateStageImpl(logPrefix string, u *UnwindState, s *StageState, t
}
func PromoteHashedStateCleanly(logPrefix string, tx kv.RwTx, cfg HashStateCfg, ctx context.Context) error {
kv.ReadAhead(ctx, cfg.db, atomic.NewBool(false), kv.PlainState, nil, math.MaxUint32)
if err := promotePlainState(
logPrefix,
cfg.db,
tx,
cfg.dirs.Tmp,
etl.IdentityLoadFunc,
ctx.Done(),
ctx,
); err != nil {
return err
}
go parallelWarmup(ctx, cfg.db, kv.PlainContractCode, 2)
return etl.Transform(
logPrefix,
tx,
kv.PlainContractCode,
kv.ContractCode,
cfg.dirs.Tmp,
keyTransformExtractFunc(transformContractCodeKey),
func(k, v []byte, next etl.ExtractNextFunc) error {
newK, err := transformContractCodeKey(k)
if err != nil {
return err
}
return next(k, newK, v)
},
etl.IdentityLoadFunc,
etl.TransformArgs{
Quit: ctx.Done(),
@ -176,114 +184,151 @@ func PromoteHashedStateCleanly(logPrefix string, tx kv.RwTx, cfg HashStateCfg, c
func promotePlainState(
logPrefix string,
db kv.RoDB,
tx kv.RwTx,
tmpdir string,
loadFunc etl.LoadFunc,
quit <-chan struct{},
ctx context.Context,
) error {
bufferSize := etl.BufferOptimalSize
accCollector := etl.NewCollector(logPrefix, tmpdir, etl.NewSortableBuffer(bufferSize))
accCollector := etl.NewCollector(logPrefix, tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer accCollector.Close()
storageCollector := etl.NewCollector(logPrefix, tmpdir, etl.NewSortableBuffer(bufferSize))
accCollector.LogLvl(log.LvlTrace)
storageCollector := etl.NewCollector(logPrefix, tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer storageCollector.Close()
storageCollector.LogLvl(log.LvlTrace)
t := time.Now()
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
var m runtime.MemStats
c, err := tx.Cursor(kv.PlainState)
if err != nil {
return err
transform := func(k, v []byte) ([]byte, []byte, error) {
newK, err := transformPlainStateKey(k)
return newK, v, err
}
defer c.Close()
convertAccFunc := func(key []byte) ([]byte, error) {
hash, err := common.HashData(key)
return hash[:], err
collect := func(k, v []byte) error {
if len(k) == 32 {
return accCollector.Collect(k, v)
}
return storageCollector.Collect(k, v)
}
convertStorageFunc := func(key []byte) ([]byte, error) {
addrHash, err := common.HashData(key[:length.Addr])
if err != nil {
return nil, err
}
inc := binary.BigEndian.Uint64(key[length.Addr:])
secKey, err := common.HashData(key[length.Addr+length.Incarnation:])
if err != nil {
return nil, err
}
compositeKey := dbutils.GenerateCompositeStorageKey(addrHash, inc, secKey)
return compositeKey, nil
}
{ //errgroup cancelation scope
// pipeline: extract -> transform -> collect
in, out := make(chan pair, 10_000), make(chan pair, 10_000)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
defer close(out)
return parallelTransform(ctx, in, out, transform, estimate.AlmostAllCPUs()).Wait()
})
g.Go(func() error { return collectChan(ctx, out, collect) })
g.Go(func() error { return parallelWarmup(ctx, db, kv.PlainState, 2) })
var startkey []byte
// reading kv.PlainState
for k, v, e := c.Seek(startkey); k != nil; k, v, e = c.Next() {
if e != nil {
return e
}
if err := libcommon.Stopped(quit); err != nil {
if err := extractTableToChan(ctx, tx, kv.PlainState, in, logPrefix); err != nil {
// if ctx canceled, then maybe it's because of error in errgroup
//
// errgroup doesn't play with pattern where some 1 goroutine-producer is outside of errgroup
// but RwTx doesn't allow move between goroutines
if errors.Is(err, context.Canceled) {
return g.Wait()
}
return err
}
if len(k) == 20 {
newK, err := convertAccFunc(k)
if err != nil {
return err
}
if err := accCollector.Collect(newK, v); err != nil {
return err
}
} else {
newK, err := convertStorageFunc(k)
if err != nil {
return err
}
if err := storageCollector.Collect(newK, v); err != nil {
return err
}
}
select {
default:
case <-logEvery.C:
dbg.ReadMemStats(&m)
log.Info(fmt.Sprintf("[%s] ETL [1/2] Extracting", logPrefix), "current_prefix", hex.EncodeToString(k[:4]), "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys))
if err := g.Wait(); err != nil {
return fmt.Errorf("g.Wait: %w", err)
}
}
log.Trace(fmt.Sprintf("[%s] Extraction finished", logPrefix), "took", time.Since(t))
defer func(t time.Time) {
log.Trace(fmt.Sprintf("[%s] Load finished", logPrefix), "took", time.Since(t))
}(time.Now())
args := etl.TransformArgs{
Quit: quit,
args := etl.TransformArgs{Quit: ctx.Done()}
if err := accCollector.Load(tx, kv.HashedAccounts, etl.IdentityLoadFunc, args); err != nil {
return fmt.Errorf("accCollector.Load: %w", err)
}
if err := accCollector.Load(tx, kv.HashedAccounts, loadFunc, args); err != nil {
return err
}
if err := storageCollector.Load(tx, kv.HashedStorage, loadFunc, args); err != nil {
return err
if err := storageCollector.Load(tx, kv.HashedStorage, etl.IdentityLoadFunc, args); err != nil {
return fmt.Errorf("storageCollector.Load: %w", err)
}
return nil
}
func keyTransformExtractFunc(transformKey func([]byte) ([]byte, error)) etl.ExtractFunc {
return func(k, v []byte, next etl.ExtractNextFunc) error {
newK, err := transformKey(k)
if err != nil {
return err
// pair is a single key/value item flowing between the stages of the
// extract -> transform -> collect pipeline in promotePlainState.
type pair struct{ k, v []byte }
// extractTableToChan streams every key/value of the given table into the in
// channel, closing it when the scan finishes. It aborts with ctx.Err() if the
// context is canceled, and periodically logs extraction progress.
func extractTableToChan(ctx context.Context, tx kv.Tx, table string, in chan pair, logPrefix string) error {
	defer close(in)
	progress := time.NewTicker(30 * time.Second)
	defer progress.Stop()
	var memStats runtime.MemStats
	return tx.ForEach(table, nil, func(k, v []byte) error {
		// Block until the pair is consumed downstream or the pipeline is canceled.
		select {
		case in <- pair{k: k, v: v}:
		case <-ctx.Done():
			return ctx.Err()
		}
		// Non-blocking progress log so extraction never stalls on logging.
		// NOTE(review): k[:4] assumes keys are at least 4 bytes — holds for
		// state tables; verify before reusing for other tables.
		select {
		case <-progress.C:
			dbg.ReadMemStats(&memStats)
			log.Info(fmt.Sprintf("[%s] ETL [1/2] Extracting", logPrefix), "current_prefix", hex.EncodeToString(k[:4]), "alloc", libcommon.ByteCount(memStats.Alloc), "sys", libcommon.ByteCount(memStats.Sys))
		default:
		}
		return nil
	})
}
// collectChan drains transformed pairs from out and feeds each into the
// collect callback (which routes it to the ETL collectors). It returns nil
// once out is closed by the producer side (normal completion), or ctx.Err()
// if the context is canceled first.
//
// Fix: removed the stray `return next(k, newK, v)` line — leftover residue of
// the deleted keyTransformExtractFunc — which sat inside the loop and made the
// function invalid Go.
func collectChan(ctx context.Context, out chan pair, collect func(k, v []byte) error) error {
	for {
		select {
		case item, ok := <-out:
			if !ok {
				// Channel closed: all transformed data has been consumed.
				return nil
			}
			if err := collect(item.k, item.v); err != nil {
				return fmt.Errorf("collectChan: %w", err)
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// parallelTransform starts `workers` goroutines that read pairs from in,
// apply transform to each, and push the results to out. The caller receives
// the errgroup so it can Wait for completion; the first transform error (or
// context cancellation) stops the whole group. Output order across workers
// is not deterministic.
func parallelTransform(ctx context.Context, in chan pair, out chan pair, transform func(k, v []byte) ([]byte, []byte, error), workers int) *errgroup.Group {
	g, gCtx := errgroup.WithContext(ctx)
	for w := 0; w < workers; w++ {
		g.Go(func() error {
			for item := range in {
				newK, newV, err := transform(item.k, item.v)
				if err != nil {
					return err
				}
				select {
				case out <- pair{k: newK, v: newV}:
				case <-gCtx.Done():
					return gCtx.Err()
				}
			}
			return nil
		})
	}
	return g
}
// parallelWarmup pre-reads every key of the given bucket to pull its pages
// into the OS page cache. The key space is fanned out over the 256 possible
// first bytes, with at most `workers` prefix scans running concurrently.
func parallelWarmup(ctx context.Context, db kv.RoDB, bucket string, workers int) error {
	g, gCtx := errgroup.WithContext(ctx)
	g.SetLimit(workers)
	for prefix := 0; prefix < 256; prefix++ {
		prefix := prefix // capture per-iteration value (pre-Go 1.22 semantics)
		g.Go(func() error {
			return db.View(gCtx, func(tx kv.Tx) error {
				it, err := tx.Prefix(bucket, []byte{byte(prefix)})
				if err != nil {
					return err
				}
				// Drain the iterator: we only want the read side effects.
				for it.HasNext() {
					if _, _, err := it.Next(); err != nil {
						return err
					}
				}
				return nil
			})
		})
	}
	return g.Wait()
}
func transformPlainStateKey(key []byte) ([]byte, error) {
switch len(key) {
@ -340,12 +385,12 @@ func (l *OldestAppearedLoad) LoadFunc(k, v []byte, table etl.CurrentTableReader,
return l.innerLoadFunc(k, v, table, next)
}
func NewPromoter(db kv.RwTx, dirs datadir.Dirs, quitCh <-chan struct{}) *Promoter {
func NewPromoter(db kv.RwTx, dirs datadir.Dirs, ctx context.Context) *Promoter {
return &Promoter{
tx: db,
ChangeSetBufSize: 256 * 1024 * 1024,
dirs: dirs,
quitCh: quitCh,
ctx: ctx,
}
}
@ -353,7 +398,7 @@ type Promoter struct {
tx kv.RwTx
ChangeSetBufSize uint64
dirs datadir.Dirs
quitCh <-chan struct{}
ctx context.Context
}
func getExtractFunc(db kv.Tx, changeSetBucket string) etl.ExtractFunc {
@ -541,7 +586,7 @@ func (p *Promoter) PromoteOnHistoryV3(logPrefix string, agg *state.AggregatorV3,
}); err != nil {
return err
}
if err := collector.Load(p.tx, kv.HashedStorage, etl.IdentityLoadFunc, etl.TransformArgs{Quit: p.quitCh}); err != nil {
if err := collector.Load(p.tx, kv.HashedStorage, etl.IdentityLoadFunc, etl.TransformArgs{Quit: p.ctx.Done()}); err != nil {
return err
}
return nil
@ -596,10 +641,10 @@ func (p *Promoter) PromoteOnHistoryV3(logPrefix string, agg *state.AggregatorV3,
return err
}
if err := collector.Load(p.tx, kv.HashedAccounts, etl.IdentityLoadFunc, etl.TransformArgs{Quit: p.quitCh}); err != nil {
if err := collector.Load(p.tx, kv.HashedAccounts, etl.IdentityLoadFunc, etl.TransformArgs{Quit: p.ctx.Done()}); err != nil {
return err
}
if err := codeCollector.Load(p.tx, kv.ContractCode, etl.IdentityLoadFunc, etl.TransformArgs{Quit: p.quitCh}); err != nil {
if err := codeCollector.Load(p.tx, kv.ContractCode, etl.IdentityLoadFunc, etl.TransformArgs{Quit: p.ctx.Done()}); err != nil {
return err
}
return nil
@ -642,7 +687,7 @@ func (p *Promoter) Promote(logPrefix string, from, to uint64, storage, codes boo
etl.TransformArgs{
BufferType: etl.SortableOldestAppearedBuffer,
ExtractStartKey: startkey,
Quit: p.quitCh,
Quit: p.ctx.Done(),
},
); err != nil {
return err
@ -693,7 +738,7 @@ func (p *Promoter) UnwindOnHistoryV3(logPrefix string, agg *state.AggregatorV3,
return err
}
return collector.Load(p.tx, kv.ContractCode, etl.IdentityLoadFunc, etl.TransformArgs{Quit: p.quitCh})
return collector.Load(p.tx, kv.ContractCode, etl.IdentityLoadFunc, etl.TransformArgs{Quit: p.ctx.Done()})
}
if storage {
@ -719,7 +764,7 @@ func (p *Promoter) UnwindOnHistoryV3(logPrefix string, agg *state.AggregatorV3,
}); err != nil {
return err
}
return collector.Load(p.tx, kv.HashedStorage, etl.IdentityLoadFunc, etl.TransformArgs{Quit: p.quitCh})
return collector.Load(p.tx, kv.HashedStorage, etl.IdentityLoadFunc, etl.TransformArgs{Quit: p.ctx.Done()})
}
if err = agg.Accounts().MakeContext().IterateRecentlyChanged(txnFrom, txnTo, p.tx, func(k []byte, v []byte) error {
@ -755,7 +800,7 @@ func (p *Promoter) UnwindOnHistoryV3(logPrefix string, agg *state.AggregatorV3,
}); err != nil {
return err
}
return collector.Load(p.tx, kv.HashedAccounts, etl.IdentityLoadFunc, etl.TransformArgs{Quit: p.quitCh})
return collector.Load(p.tx, kv.HashedAccounts, etl.IdentityLoadFunc, etl.TransformArgs{Quit: p.ctx.Done()})
}
func (p *Promoter) Unwind(logPrefix string, s *StageState, u *UnwindState, storage bool, codes bool) error {
@ -800,7 +845,7 @@ func (p *Promoter) Unwind(logPrefix string, s *StageState, u *UnwindState, stora
etl.TransformArgs{
BufferType: etl.SortableOldestAppearedBuffer,
ExtractStartKey: startkey,
Quit: p.quitCh,
Quit: p.ctx.Done(),
LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"progress", etl.ProgressFromKey(k)}
},
@ -811,8 +856,8 @@ func (p *Promoter) Unwind(logPrefix string, s *StageState, u *UnwindState, stora
)
}
func promoteHashedStateIncrementally(logPrefix string, from, to uint64, tx kv.RwTx, cfg HashStateCfg, quit <-chan struct{}, quiet bool) error {
prom := NewPromoter(tx, cfg.dirs, quit)
func promoteHashedStateIncrementally(logPrefix string, from, to uint64, tx kv.RwTx, cfg HashStateCfg, ctx context.Context, quiet bool) error {
prom := NewPromoter(tx, cfg.dirs, ctx)
if cfg.historyV3 {
cfg.agg.SetTx(tx)
if err := prom.PromoteOnHistoryV3(logPrefix, cfg.agg, from, to, false, quiet); err != nil {

View File

@ -48,7 +48,7 @@ func TestPromoteHashedStateIncremental(t *testing.T) {
generateBlocks(t, 51, 50, hashedWriterGen(tx1), changeCodeWithIncarnations)
generateBlocks(t, 51, 50, plainWriterGen(tx2), changeCodeWithIncarnations)
err = promoteHashedStateIncrementally("logPrefix", 50, 101, tx2, cfg, nil /* quit */, false /* quiet */)
err = promoteHashedStateIncrementally("logPrefix", 50, 101, tx2, cfg, context.Background(), false /* quiet */)
if err != nil {
t.Errorf("error while promoting state: %v", err)
}
@ -66,7 +66,7 @@ func TestPromoteHashedStateIncrementalMixed(t *testing.T) {
generateBlocks(t, 1, 50, hashedWriterGen(tx2), changeCodeWithIncarnations)
generateBlocks(t, 51, 50, plainWriterGen(tx2), changeCodeWithIncarnations)
err := promoteHashedStateIncrementally("logPrefix", 50, 101, tx2, StageHashStateCfg(db2, dirs, historyV3, nil), nil /* quit */, false /* quiet */)
err := promoteHashedStateIncrementally("logPrefix", 50, 101, tx2, StageHashStateCfg(db2, dirs, historyV3, nil), context.Background(), false /* quiet */)
if err != nil {
t.Errorf("error while promoting state: %v", err)
}
@ -88,7 +88,7 @@ func TestUnwindHashed(t *testing.T) {
}
u := &UnwindState{UnwindPoint: 50}
s := &StageState{BlockNumber: 100}
err = unwindHashStateStageImpl("logPrefix", u, s, tx2, StageHashStateCfg(db2, dirs, historyV3, nil), nil)
err = unwindHashStateStageImpl("logPrefix", u, s, tx2, StageHashStateCfg(db2, dirs, historyV3, nil), context.Background())
if err != nil {
t.Errorf("error while unwind state: %v", err)
}
@ -119,7 +119,7 @@ func TestPromoteIncrementallyShutdown(t *testing.T) {
}
db, tx := memdb.NewTestTx(t)
generateBlocks(t, 1, 10, plainWriterGen(tx), changeCodeWithIncarnations)
if err := promoteHashedStateIncrementally("logPrefix", 1, 10, tx, StageHashStateCfg(db, dirs, historyV3, nil), ctx.Done(), false /* quiet */); !errors.Is(err, tc.errExp) {
if err := promoteHashedStateIncrementally("logPrefix", 1, 10, tx, StageHashStateCfg(db, dirs, historyV3, nil), ctx, false /* quiet */); !errors.Is(err, tc.errExp) {
t.Errorf("error does not match expected error while shutdown promoteHashedStateIncrementally, got: %v, expected: %v", err, tc.errExp)
}
})
@ -136,7 +136,7 @@ func TestPromoteHashedStateCleanlyShutdown(t *testing.T) {
cancelFuncExec bool
errExp error
}{
{"cancel", true, libcommon.ErrStopped},
{"cancel", true, context.Canceled},
{"no cancel", false, nil},
}
@ -191,14 +191,14 @@ func TestUnwindHashStateShutdown(t *testing.T) {
cfg := StageHashStateCfg(db, dirs, historyV3, nil)
err := PromoteHashedStateCleanly("logPrefix", tx, cfg, ctx)
if tc.cancelFuncExec {
require.ErrorIs(t, err, libcommon.ErrStopped)
require.ErrorIs(t, err, context.Canceled)
} else {
require.NoError(t, err)
}
u := &UnwindState{UnwindPoint: 5}
s := &StageState{BlockNumber: 10}
if err = unwindHashStateStageImpl("logPrefix", u, s, tx, cfg, ctx.Done()); !errors.Is(err, tc.errExp) {
if err = unwindHashStateStageImpl("logPrefix", u, s, tx, cfg, ctx); !errors.Is(err, tc.errExp) {
t.Errorf("error does not match expected error while shutdown unwindHashStateStageImpl, got: %v, expected: %v", err, tc.errExp)
}