e3: BuildMissedIndices errgroup (#802)

This commit is contained in:
Alex Sharov 2022-12-24 17:30:21 +07:00 committed by GitHub
parent 3170bf0a57
commit 1371990bed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 45 additions and 108 deletions

View File

@ -5,7 +5,7 @@ import (
"strings"
"sync"
"github.com/google/btree"
btree2 "github.com/tidwall/btree"
"go.uber.org/atomic"
)
@ -24,13 +24,13 @@ func (p *Progress) percent() int {
// ProgressSet - tracks multiple background job progress
type ProgressSet struct {
b *btree.BTreeG[*Progress]
list *btree2.Map[int, *Progress]
i int
lock sync.RWMutex
}
func NewProgressSet() *ProgressSet {
return &ProgressSet{b: btree.NewG(4, func(i, j *Progress) bool { return i.i < j.i })}
return &ProgressSet{list: btree2.NewMap[int, *Progress](128)}
}
func (s *ProgressSet) Add(p *Progress) {
@ -38,13 +38,13 @@ func (s *ProgressSet) Add(p *Progress) {
defer s.lock.Unlock()
s.i++
p.i = s.i
s.b.ReplaceOrInsert(p)
s.list.Set(p.i, p)
}
func (s *ProgressSet) Delete(p *Progress) {
s.lock.Lock()
defer s.lock.Unlock()
s.b.Delete(p)
s.list.Delete(p.i)
}
func (s *ProgressSet) String() string {
@ -52,10 +52,10 @@ func (s *ProgressSet) String() string {
defer s.lock.RUnlock()
var sb strings.Builder
var i int
s.b.Ascend(func(p *Progress) bool {
s.list.Scan(func(_ int, p *Progress) bool {
sb.WriteString(fmt.Sprintf("%s=%d%%", p.Name.Load(), p.percent()))
i++
if i != s.b.Len() {
if i != s.list.Len() {
sb.WriteString(", ")
}
return true

2
go.mod
View File

@ -24,6 +24,7 @@ require (
github.com/quasilyte/go-ruleguard/dsl v0.3.21
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.8.1
github.com/tidwall/btree v1.5.0
github.com/torquem-ch/mdbx-go v0.27.0
go.uber.org/atomic v1.10.0
golang.org/x/crypto v0.4.0
@ -89,7 +90,6 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 // indirect
github.com/tidwall/btree v1.3.1 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
go.etcd.io/bbolt v1.3.6 // indirect

4
go.sum
View File

@ -372,8 +372,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tidwall/btree v1.3.1 h1:636+tdVDs8Hjcf35Di260W2xCW4KuoXOKyk9QWOvCpA=
github.com/tidwall/btree v1.3.1/go.mod h1:LGm8L/DZjPLmeWGjv5kFrY8dL4uVhMmzmmLYmsObdKE=
github.com/tidwall/btree v1.5.0 h1:iV0yVY/frd7r6qGBXfEYs7DH0gTDgrKTrDjS7xt/IyQ=
github.com/tidwall/btree v1.5.0/go.mod h1:LGm8L/DZjPLmeWGjv5kFrY8dL4uVhMmzmmLYmsObdKE=
github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=

View File

@ -23,13 +23,13 @@ import (
math2 "math"
"runtime"
"strings"
"sync"
"time"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
common2 "github.com/ledgerwatch/erigon-lib/common"
@ -146,68 +146,29 @@ func (a *Aggregator22) closeFiles() {
}
func (a *Aggregator22) BuildMissedIndices(ctx context.Context, sem *semaphore.Weighted) error {
wg := sync.WaitGroup{}
errs := make(chan error, 7)
g, ctx := errgroup.WithContext(ctx)
if a.accounts != nil {
wg.Add(1)
go func() {
defer wg.Done()
errs <- a.accounts.BuildMissedIndices(ctx, sem)
}()
g.Go(func() error { return a.accounts.BuildMissedIndices(ctx, sem) })
}
if a.storage != nil {
wg.Add(1)
go func() {
defer wg.Done()
errs <- a.storage.BuildMissedIndices(ctx, sem)
}()
g.Go(func() error { return a.storage.BuildMissedIndices(ctx, sem) })
}
if a.code != nil {
wg.Add(1)
go func() {
defer wg.Done()
errs <- a.code.BuildMissedIndices(ctx, sem)
}()
g.Go(func() error { return a.code.BuildMissedIndices(ctx, sem) })
}
if a.logAddrs != nil {
wg.Add(1)
go func() {
defer wg.Done()
errs <- a.logAddrs.BuildMissedIndices(ctx, sem)
}()
g.Go(func() error { return a.logAddrs.BuildMissedIndices(ctx, sem) })
}
if a.logTopics != nil {
wg.Add(1)
go func() {
defer wg.Done()
errs <- a.logTopics.BuildMissedIndices(ctx, sem)
}()
g.Go(func() error { return a.logTopics.BuildMissedIndices(ctx, sem) })
}
if a.tracesFrom != nil {
wg.Add(1)
go func() {
defer wg.Done()
errs <- a.tracesFrom.BuildMissedIndices(ctx, sem)
}()
g.Go(func() error { return a.tracesFrom.BuildMissedIndices(ctx, sem) })
}
if a.tracesTo != nil {
wg.Add(1)
go func() {
defer wg.Done()
errs <- a.tracesTo.BuildMissedIndices(ctx, sem)
}()
g.Go(func() error { return a.tracesTo.BuildMissedIndices(ctx, sem) })
}
go func() {
wg.Wait()
close(errs)
}()
var lastError error
for err := range errs {
if err != nil {
lastError = err
}
}
return lastError
return g.Wait()
}
func (a *Aggregator22) SetLogPrefix(v string) { a.logPrefix = v }

View File

@ -36,6 +36,7 @@ import (
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"github.com/ledgerwatch/erigon-lib/etl"
@ -261,23 +262,19 @@ func (h *History) BuildMissedIndices(ctx context.Context, sem *semaphore.Weighte
}
missedFiles := h.missedIdxFiles()
errs := make(chan error, len(missedFiles))
wg := sync.WaitGroup{}
g, ctx := errgroup.WithContext(ctx)
for _, item := range missedFiles {
if err := sem.Acquire(ctx, 1); err != nil {
errs <- err
break
}
wg.Add(1)
go func(item *filesItem) {
item := item
g.Go(func() error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
defer wg.Done()
search := &filesItem{startTxNum: item.startTxNum, endTxNum: item.endTxNum}
iiItem, ok := h.InvertedIndex.files.Get(search)
if !ok {
return
return nil
}
fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep
@ -286,23 +283,13 @@ func (h *History) BuildMissedIndices(ctx context.Context, sem *semaphore.Weighte
log.Info("[snapshots] build idx", "file", fName)
count, err := iterateForVi(item, iiItem, h.compressVals, func(v []byte) error { return nil })
if err != nil {
errs <- err
return err
}
errs <- buildVi(item, iiItem, idxPath, h.tmpdir, count, false /* values */, h.compressVals)
}(item)
return buildVi(item, iiItem, idxPath, h.tmpdir, count, false /* values */, h.compressVals)
})
}
go func() {
wg.Wait()
close(errs)
}()
var lastError error
for err := range errs {
if err != nil {
lastError = err
}
}
if lastError != nil {
return lastError
if err := g.Wait(); err != nil {
return err
}
return h.openFiles()

View File

@ -38,6 +38,7 @@ import (
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"github.com/ledgerwatch/erigon-lib/common/cmp"
@ -184,39 +185,27 @@ func (ii *InvertedIndex) missedIdxFiles() (l []*filesItem) {
// BuildMissedIndices - produce .efi/.vi/.kvi from .ef/.v/.kv
func (ii *InvertedIndex) BuildMissedIndices(ctx context.Context, sem *semaphore.Weighted) (err error) {
missedFiles := ii.missedIdxFiles()
errs := make(chan error, len(missedFiles))
wg := sync.WaitGroup{}
g, ctx := errgroup.WithContext(ctx)
for _, item := range missedFiles {
if err := sem.Acquire(ctx, 1); err != nil {
errs <- err
break
}
wg.Add(1)
go func(item *filesItem) {
item := item
g.Go(func() error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
defer wg.Done()
fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep
fName := fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep)
idxPath := filepath.Join(ii.dir, fName)
log.Info("[snapshots] build idx", "file", fName)
_, err := buildIndex(ctx, item.decompressor, idxPath, ii.tmpdir, item.decompressor.Count()/2, false)
if err != nil {
errs <- err
return err
}
}(item)
return nil
})
}
go func() {
wg.Wait()
close(errs)
}()
var lastError error
for err := range errs {
if err != nil {
lastError = err
}
}
if lastError != nil {
return lastError
if err := g.Wait(); err != nil {
return err
}
return ii.openFiles()
}