From 8406cb7899896287f63f16cfa1aba01a9e185b82 Mon Sep 17 00:00:00 2001
From: Alex Sharov
Date: Wed, 1 Mar 2023 16:55:29 +0700
Subject: [PATCH] e3: exec workers - can't treat all errors as "state conflict", because applyLoop will get same error and handle them well (#6993)

---
 cmd/state/exec3/state.go | 68 ++++++++++++++++++++++------------------
 1 file changed, 38 insertions(+), 30 deletions(-)

diff --git a/cmd/state/exec3/state.go b/cmd/state/exec3/state.go
index df6434489..8c204dec3 100644
--- a/cmd/state/exec3/state.go
+++ b/cmd/state/exec3/state.go
@@ -9,6 +9,7 @@ import (
 	libcommon "github.com/ledgerwatch/erigon-lib/common"
 	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/ledgerwatch/log/v3"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/ledgerwatch/erigon/cmd/state/exec22"
 	"github.com/ledgerwatch/erigon/consensus"
@@ -102,15 +103,16 @@ func (rw *Worker) ResetTx(chainTx kv.Tx) {
 	}
 }
 
-func (rw *Worker) Run() {
+func (rw *Worker) Run() error {
 	for txTask, ok := rw.rs.Schedule(); ok; txTask, ok = rw.rs.Schedule() {
 		rw.RunTxTask(txTask)
 		select {
 		case rw.resultCh <- txTask: // Needs to have outside of the lock
 		case <-rw.ctx.Done():
-			return
+			return rw.ctx.Err()
 		}
 	}
+	return nil
 }
 
 func (rw *Worker) RunTxTask(txTask *exec22.TxTask) {
@@ -324,37 +326,43 @@ func (cr EpochReader) FindBeforeOrEqualNumber(number uint64) (blockNum uint64, b
 }
 
 func NewWorkersPool(lock sync.Locker, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, blockReader services.FullBlockReader, chainConfig *chain.Config, logger log.Logger, genesis *core.Genesis, engine consensus.Engine, workerCount int) (reconWorkers []*Worker, applyWorker *Worker, resultCh chan *exec22.TxTask, clear func(), wait func()) {
-	ctx, cancel := context.WithCancel(ctx)
-	var wg sync.WaitGroup
 	queueSize := workerCount * 256
 	reconWorkers = make([]*Worker, workerCount)
 	resultCh = make(chan *exec22.TxTask, queueSize)
-	for i := 0; i < workerCount; i++ {
-		reconWorkers[i] = NewWorker(lock, ctx, background, chainDb, rs, blockReader, chainConfig, logger, genesis, resultCh, engine)
+	{
+		// we ignore all errors in background workers (except ctx cancellation), because applyLoop will detect this error anyway.
+		// and in applyLoop all errors are critical
+		ctx, cancel := context.WithCancel(ctx)
+		g, ctx := errgroup.WithContext(ctx)
+		for i := 0; i < workerCount; i++ {
+			reconWorkers[i] = NewWorker(lock, ctx, background, chainDb, rs, blockReader, chainConfig, logger, genesis, resultCh, engine)
+		}
+		if background {
+			for i := 0; i < workerCount; i++ {
+				i := i
+				g.Go(func() error {
+					return reconWorkers[i].Run()
+				})
+			}
+			wait = func() { g.Wait() }
+		}
+
+		var clearDone bool
+		clear = func() {
+			if clearDone {
+				return
+			}
+			clearDone = true
+			cancel()
+			g.Wait()
+			for _, w := range reconWorkers {
+				w.ResetTx(nil)
+			}
+			//applyWorker.ResetTx(nil)
+			close(resultCh)
+		}
 	}
 	applyWorker = NewWorker(lock, ctx, false, chainDb, rs, blockReader, chainConfig, logger, genesis, resultCh, engine)
-	var clearDone bool
-	clear = func() {
-		if clearDone {
-			return
-		}
-		clearDone = true
-		cancel()
-		wg.Wait()
-		for _, w := range reconWorkers {
-			w.ResetTx(nil)
-		}
-		applyWorker.ResetTx(nil)
-		close(resultCh)
-	}
-	if background {
-		wg.Add(workerCount)
-		for i := 0; i < workerCount; i++ {
-			go func(i int) {
-				defer wg.Done()
-				reconWorkers[i].Run()
-			}(i)
-		}
-	}
-	return reconWorkers, applyWorker, resultCh, clear, wg.Wait
+
+	return reconWorkers, applyWorker, resultCh, clear, wait
 }
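
The core of the change reads more easily outside the diff: background workers used to be started under a bare sync.WaitGroup, which can only wait for them, while the new code runs them under an errgroup.Group bound to a cancellable context, so the first real error from any worker cancels its siblings and is reported by g.Wait() (on the Erigon side, the applyLoop sees the same error and treats it as critical). The standalone sketch below shows that pattern only; runWorker, the toy tasks channel, and the injected failure are hypothetical stand-ins, not code from this patch.

// errgroup_pool_sketch.go - minimal sketch of the WaitGroup -> errgroup move,
// assuming nothing about Erigon's Worker/StateV3 types.
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// runWorker is a stand-in for Worker.Run: like the patched Run, it returns
// ctx.Err() when cancelled and nil when the work source is exhausted.
func runWorker(ctx context.Context, id int, tasks <-chan int) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case t, ok := <-tasks:
			if !ok {
				return nil // no more work: normal exit
			}
			if t == 3 {
				// A "real" failure: a plain WaitGroup would silently drop it;
				// errgroup cancels the siblings and surfaces it to the caller.
				return fmt.Errorf("worker %d: task %d failed", id, t)
			}
		}
	}
}

func main() {
	tasks := make(chan int, 8)
	for i := 0; i < 8; i++ {
		tasks <- i
	}
	close(tasks)

	g, ctx := errgroup.WithContext(context.Background())
	for i := 0; i < 4; i++ {
		i := i // capture the loop variable, as the patch does (pre-Go 1.22)
		g.Go(func() error {
			return runWorker(ctx, i, tasks)
		})
	}

	// Wait returns the first non-nil error; plain cancellation can be
	// filtered out by the caller, which is what the commit message means by
	// not treating every error as a "state conflict".
	if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		fmt.Println("pool failed:", err)
	}
}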
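
Two design choices in the hunk are worth noting alongside the sketch. Run now distinguishes a clean exit (the scheduler has no more tasks, return nil) from cancellation (return rw.ctx.Err()), which is what lets the group tell "told to stop" apart from "failed". And clear stays idempotent via the clearDone flag, running cancel first, then waiting for the group, and only then resetting the workers' transactions and closing resultCh, so no worker can send on a closed channel; wait is only assigned in the background branch and is returned as nil otherwise.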