e3: exec workers - can't treat all errors as "state conflict", because applyLoop will get same error and handle them well (#6993)

This commit is contained in:
Alex Sharov 2023-03-01 16:55:29 +07:00 committed by GitHub
parent a0ffa454ec
commit 8406cb7899
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -9,6 +9,7 @@ import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/errgroup"
"github.com/ledgerwatch/erigon/cmd/state/exec22"
"github.com/ledgerwatch/erigon/consensus"
@ -102,15 +103,16 @@ func (rw *Worker) ResetTx(chainTx kv.Tx) {
}
}
func (rw *Worker) Run() {
func (rw *Worker) Run() error {
for txTask, ok := rw.rs.Schedule(); ok; txTask, ok = rw.rs.Schedule() {
rw.RunTxTask(txTask)
select {
case rw.resultCh <- txTask: // Needs to have outside of the lock
case <-rw.ctx.Done():
return
return rw.ctx.Err()
}
}
return nil
}
func (rw *Worker) RunTxTask(txTask *exec22.TxTask) {
@ -324,37 +326,43 @@ func (cr EpochReader) FindBeforeOrEqualNumber(number uint64) (blockNum uint64, b
}
func NewWorkersPool(lock sync.Locker, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, blockReader services.FullBlockReader, chainConfig *chain.Config, logger log.Logger, genesis *core.Genesis, engine consensus.Engine, workerCount int) (reconWorkers []*Worker, applyWorker *Worker, resultCh chan *exec22.TxTask, clear func(), wait func()) {
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
queueSize := workerCount * 256
reconWorkers = make([]*Worker, workerCount)
resultCh = make(chan *exec22.TxTask, queueSize)
for i := 0; i < workerCount; i++ {
reconWorkers[i] = NewWorker(lock, ctx, background, chainDb, rs, blockReader, chainConfig, logger, genesis, resultCh, engine)
{
// we all errors in background workers (except ctx.Cancele), because applyLoop will detect this error anyway.
// and in applyLoop all errors are critical
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < workerCount; i++ {
reconWorkers[i] = NewWorker(lock, ctx, background, chainDb, rs, blockReader, chainConfig, logger, genesis, resultCh, engine)
}
if background {
for i := 0; i < workerCount; i++ {
i := i
g.Go(func() error {
return reconWorkers[i].Run()
})
}
wait = func() { g.Wait() }
}
var clearDone bool
clear = func() {
if clearDone {
return
}
clearDone = true
cancel()
g.Wait()
for _, w := range reconWorkers {
w.ResetTx(nil)
}
//applyWorker.ResetTx(nil)
close(resultCh)
}
}
applyWorker = NewWorker(lock, ctx, false, chainDb, rs, blockReader, chainConfig, logger, genesis, resultCh, engine)
var clearDone bool
clear = func() {
if clearDone {
return
}
clearDone = true
cancel()
wg.Wait()
for _, w := range reconWorkers {
w.ResetTx(nil)
}
applyWorker.ResetTx(nil)
close(resultCh)
}
if background {
wg.Add(workerCount)
for i := 0; i < workerCount; i++ {
go func(i int) {
defer wg.Done()
reconWorkers[i].Run()
}(i)
}
}
return reconWorkers, applyWorker, resultCh, clear, wg.Wait
return reconWorkers, applyWorker, resultCh, clear, wait
}