diff --git a/common/background/progress.go b/common/background/progress.go index df8743133..b9ae37dde 100644 --- a/common/background/progress.go +++ b/common/background/progress.go @@ -5,7 +5,7 @@ import ( "strings" "sync" - "github.com/google/btree" + btree2 "github.com/tidwall/btree" "go.uber.org/atomic" ) @@ -24,13 +24,13 @@ func (p *Progress) percent() int { // ProgressSet - tracks multiple background job progress type ProgressSet struct { - b *btree.BTreeG[*Progress] + list *btree2.Map[int, *Progress] i int lock sync.RWMutex } func NewProgressSet() *ProgressSet { - return &ProgressSet{b: btree.NewG(4, func(i, j *Progress) bool { return i.i < j.i })} + return &ProgressSet{list: btree2.NewMap[int, *Progress](128)} } func (s *ProgressSet) Add(p *Progress) { @@ -38,13 +38,13 @@ func (s *ProgressSet) Add(p *Progress) { defer s.lock.Unlock() s.i++ p.i = s.i - s.b.ReplaceOrInsert(p) + s.list.Set(p.i, p) } func (s *ProgressSet) Delete(p *Progress) { s.lock.Lock() defer s.lock.Unlock() - s.b.Delete(p) + s.list.Delete(p.i) } func (s *ProgressSet) String() string { @@ -52,10 +52,10 @@ func (s *ProgressSet) String() string { defer s.lock.RUnlock() var sb strings.Builder var i int - s.b.Ascend(func(p *Progress) bool { + s.list.Scan(func(_ int, p *Progress) bool { sb.WriteString(fmt.Sprintf("%s=%d%%", p.Name.Load(), p.percent())) i++ - if i != s.b.Len() { + if i != s.list.Len() { sb.WriteString(", ") } return true diff --git a/go.mod b/go.mod index 321c2b4c8..5aa660b31 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/quasilyte/go-ruleguard/dsl v0.3.21 github.com/spaolacci/murmur3 v1.1.0 github.com/stretchr/testify v1.8.1 + github.com/tidwall/btree v1.5.0 github.com/torquem-ch/mdbx-go v0.27.0 go.uber.org/atomic v1.10.0 golang.org/x/crypto v0.4.0 @@ -89,7 +90,6 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 // indirect - github.com/tidwall/btree v1.3.1 // indirect github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/histogram v1.2.0 // indirect go.etcd.io/bbolt v1.3.6 // indirect diff --git a/go.sum b/go.sum index ecdbff5ce..29f77fc61 100644 --- a/go.sum +++ b/go.sum @@ -372,8 +372,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/tidwall/btree v1.3.1 h1:636+tdVDs8Hjcf35Di260W2xCW4KuoXOKyk9QWOvCpA= -github.com/tidwall/btree v1.3.1/go.mod h1:LGm8L/DZjPLmeWGjv5kFrY8dL4uVhMmzmmLYmsObdKE= +github.com/tidwall/btree v1.5.0 h1:iV0yVY/frd7r6qGBXfEYs7DH0gTDgrKTrDjS7xt/IyQ= +github.com/tidwall/btree v1.5.0/go.mod h1:LGm8L/DZjPLmeWGjv5kFrY8dL4uVhMmzmmLYmsObdKE= github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= diff --git a/state/aggregator22.go b/state/aggregator22.go index 63f104b29..141b269f2 100644 --- a/state/aggregator22.go +++ b/state/aggregator22.go @@ -23,13 +23,13 @@ import ( math2 "math" "runtime" "strings" - "sync" "time" "github.com/RoaringBitmap/roaring/roaring64" "github.com/ledgerwatch/erigon-lib/common/dbg" "github.com/ledgerwatch/log/v3" "go.uber.org/atomic" + "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" common2 "github.com/ledgerwatch/erigon-lib/common" @@ -146,68 +146,29 @@ func (a *Aggregator22) closeFiles() { } func (a *Aggregator22) BuildMissedIndices(ctx context.Context, sem *semaphore.Weighted) error { - wg := sync.WaitGroup{} - errs := make(chan error, 7) + g, ctx := errgroup.WithContext(ctx) if a.accounts != nil { - wg.Add(1) - go func() { - defer wg.Done() - errs <- a.accounts.BuildMissedIndices(ctx, sem) - }() + g.Go(func() error { return a.accounts.BuildMissedIndices(ctx, sem) }) } if a.storage != nil { - wg.Add(1) - go func() { - defer wg.Done() - errs <- a.storage.BuildMissedIndices(ctx, sem) - }() + g.Go(func() error { return a.storage.BuildMissedIndices(ctx, sem) }) } if a.code != nil { - wg.Add(1) - go func() { - defer wg.Done() - errs <- a.code.BuildMissedIndices(ctx, sem) - }() + g.Go(func() error { return a.code.BuildMissedIndices(ctx, sem) }) } if a.logAddrs != nil { - wg.Add(1) - go func() { - defer wg.Done() - errs <- a.logAddrs.BuildMissedIndices(ctx, sem) - }() + g.Go(func() error { return a.logAddrs.BuildMissedIndices(ctx, sem) }) } if a.logTopics != nil { - wg.Add(1) - go func() { - defer wg.Done() - errs <- a.logTopics.BuildMissedIndices(ctx, sem) - }() + g.Go(func() error { return a.logTopics.BuildMissedIndices(ctx, sem) }) } if a.tracesFrom != nil { - wg.Add(1) - go func() { - defer wg.Done() - errs <- a.tracesFrom.BuildMissedIndices(ctx, sem) - }() + g.Go(func() error { return a.tracesFrom.BuildMissedIndices(ctx, sem) }) } if a.tracesTo != nil { - wg.Add(1) - go func() { - defer wg.Done() - errs <- a.tracesTo.BuildMissedIndices(ctx, sem) - }() + g.Go(func() error { return a.tracesTo.BuildMissedIndices(ctx, sem) }) } - go func() { - wg.Wait() - close(errs) - }() - var lastError error - for err := range errs { - if err != nil { - lastError = err - } - } - return lastError + return g.Wait() } func (a *Aggregator22) SetLogPrefix(v string) { a.logPrefix = v } diff --git a/state/history.go b/state/history.go index 2c7092575..2368a5b36 100644 --- a/state/history.go +++ b/state/history.go @@ -36,6 +36,7 @@ import ( "github.com/ledgerwatch/erigon-lib/common/dbg" "github.com/ledgerwatch/log/v3" "golang.org/x/exp/slices" + "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "github.com/ledgerwatch/erigon-lib/etl" @@ -261,23 +262,19 @@ func (h *History) BuildMissedIndices(ctx context.Context, sem *semaphore.Weighte } missedFiles := h.missedIdxFiles() - errs := make(chan error, len(missedFiles)) - wg := sync.WaitGroup{} - + g, ctx := errgroup.WithContext(ctx) for _, item := range missedFiles { - if err := sem.Acquire(ctx, 1); err != nil { - errs <- err - break - } - wg.Add(1) - go func(item *filesItem) { + item := item + g.Go(func() error { + if err := sem.Acquire(ctx, 1); err != nil { + return err + } defer sem.Release(1) - defer wg.Done() search := &filesItem{startTxNum: item.startTxNum, endTxNum: item.endTxNum} iiItem, ok := h.InvertedIndex.files.Get(search) if !ok { - return + return nil } fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep @@ -286,23 +283,13 @@ func (h *History) BuildMissedIndices(ctx context.Context, sem *semaphore.Weighte log.Info("[snapshots] build idx", "file", fName) count, err := iterateForVi(item, iiItem, h.compressVals, func(v []byte) error { return nil }) if err != nil { - errs <- err + return err } - errs <- buildVi(item, iiItem, idxPath, h.tmpdir, count, false /* values */, h.compressVals) - }(item) + return buildVi(item, iiItem, idxPath, h.tmpdir, count, false /* values */, h.compressVals) + }) } - go func() { - wg.Wait() - close(errs) - }() - var lastError error - for err := range errs { - if err != nil { - lastError = err - } - } - if lastError != nil { - return lastError + if err := g.Wait(); err != nil { + return err } return h.openFiles() diff --git a/state/inverted_index.go b/state/inverted_index.go index e4d83811d..f3cf1c8e6 100644 --- a/state/inverted_index.go +++ b/state/inverted_index.go @@ -38,6 +38,7 @@ import ( "github.com/ledgerwatch/erigon-lib/common/dbg" "github.com/ledgerwatch/log/v3" "golang.org/x/exp/slices" + "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "github.com/ledgerwatch/erigon-lib/common/cmp" @@ -184,39 +185,27 @@ func (ii *InvertedIndex) missedIdxFiles() (l []*filesItem) { // BuildMissedIndices - produce .efi/.vi/.kvi from .ef/.v/.kv func (ii *InvertedIndex) BuildMissedIndices(ctx context.Context, sem *semaphore.Weighted) (err error) { missedFiles := ii.missedIdxFiles() - errs := make(chan error, len(missedFiles)) - wg := sync.WaitGroup{} + g, ctx := errgroup.WithContext(ctx) for _, item := range missedFiles { - if err := sem.Acquire(ctx, 1); err != nil { - errs <- err - break - } - wg.Add(1) - go func(item *filesItem) { + item := item + g.Go(func() error { + if err := sem.Acquire(ctx, 1); err != nil { + return err + } defer sem.Release(1) - defer wg.Done() fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep fName := fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep) idxPath := filepath.Join(ii.dir, fName) log.Info("[snapshots] build idx", "file", fName) _, err := buildIndex(ctx, item.decompressor, idxPath, ii.tmpdir, item.decompressor.Count()/2, false) if err != nil { - errs <- err + return err } - }(item) + return nil + }) } - go func() { - wg.Wait() - close(errs) - }() - var lastError error - for err := range errs { - if err != nil { - lastError = err - } - } - if lastError != nil { - return lastError + if err := g.Wait(); err != nil { + return err } return ii.openFiles() }