Downloader: correct logging when creating .torrent files (#1123)

This commit is contained in:
Alex Sharov 2023-09-13 11:49:42 +07:00 committed by GitHub
parent de8f783eeb
commit 810c4022f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 35 deletions

View File

@ -18,7 +18,6 @@ package downloader
import (
"context"
//nolint:gosec
"fmt"
"net"
"os"
@ -26,7 +25,6 @@ import (
"regexp"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
@ -35,13 +33,13 @@ import (
"github.com/anacrolix/torrent/metainfo"
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/cmp"
"github.com/ledgerwatch/erigon-lib/common/dbg"
dir2 "github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/semaphore"
"golang.org/x/sync/errgroup"
)
// The `github.com/anacrolix/torrent` library spawns several goroutines and produces many requests for each tracker, so we limit the number of trackers to 7.
@ -145,6 +143,18 @@ func seedableSegmentFiles(dir string) ([]string, error) {
var historyFileRegex = regexp.MustCompile("^([[:lower:]]+).([0-9]+)-([0-9]+).(.*)$")
// seedableHistorySnapshots returns the entries that seedableSnapshotsBySubDir
// reports for the "history" sub-directory of dir, followed by the entries it
// reports for the "warm" sub-directory.
//
// The first error from either lookup is returned unchanged, together with a
// nil slice.
//
// NOTE(review): subDir is not read anywhere in this function; both
// sub-directory names are hard-coded below. Confirm with callers whether the
// parameter can be dropped.
func seedableHistorySnapshots(dir, subDir string) ([]string, error) {
	historyFiles, err := seedableSnapshotsBySubDir(dir, "history")
	if err != nil {
		return nil, err
	}

	warmFiles, err := seedableSnapshotsBySubDir(dir, "warm")
	if err != nil {
		return nil, err
	}

	// Keep the "history" entries first and the "warm" entries after them.
	historyFiles = append(historyFiles, warmFiles...)
	return historyFiles, nil
}
func seedableSnapshotsBySubDir(dir, subDir string) ([]string, error) {
historyDir := filepath.Join(dir, subDir)
dir2.MustExist(historyDir)
files, err := os.ReadDir(historyDir)
@ -186,6 +196,12 @@ func seedableHistorySnapshots(dir, subDir string) ([]string, error) {
}
func buildTorrentIfNeed(ctx context.Context, fName, root string) (err error) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
fPath := filepath.Join(root, fName)
if dir2.FileExist(fPath + ".torrent") {
return
@ -229,42 +245,35 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) ([]string, err
return nil, err
}
errs := make(chan error, len(files)*2)
wg := &sync.WaitGroup{}
workers := cmp.Max(1, runtime.GOMAXPROCS(-1)-1) * 2
var sem = semaphore.NewWeighted(int64(workers))
i := atomic.Int32{}
for _, file := range files {
wg.Add(1)
if err := sem.Acquire(ctx, 1); err != nil {
return nil, err
}
go func(f string) {
defer i.Add(1)
defer sem.Release(1)
defer wg.Done()
if err := buildTorrentIfNeed(ctx, f, snapDir); err != nil {
errs <- err
}
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(cmp.Max(1, runtime.GOMAXPROCS(-1)-1) * 4)
var i atomic.Int32
select {
default:
case <-ctx.Done():
errs <- ctx.Err()
case <-logEvery.C:
log.Info("[snapshots] Creating .torrent files", "Progress", fmt.Sprintf("%d/%d", i.Load(), len(files)))
for _, file := range files {
file := file
g.Go(func() error {
defer i.Add(1)
if err := buildTorrentIfNeed(ctx, file, snapDir); err != nil {
return err
}
}(file)
return nil
})
}
go func() {
wg.Wait()
close(errs)
}()
for err := range errs {
if err != nil {
return nil, err
var m runtime.MemStats
Loop:
for int(i.Load()) < len(files) {
select {
case <-ctx.Done():
break Loop // g.Wait() will return right error
case <-logEvery.C:
dbg.ReadMemStats(&m)
log.Info("[snapshots] Creating .torrent files", "progress", fmt.Sprintf("%d/%d", i.Load(), len(files)), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
}
}
if err := g.Wait(); err != nil {
return nil, err
}
return files, nil
}

View File

@ -35,6 +35,8 @@ output=$(find "$projectDir" -type 'd' -maxdepth 1 \
| grep -v "github.com/anacrolix/multiless" `# MPL-2.0` \
| grep -v "github.com/anacrolix/sync" `# MPL-2.0` \
| grep -v "github.com/anacrolix/upnp" `# MPL-2.0` \
| grep -v "github.com/go-llsqlite/adapter" `# MPL-2.0` \
| grep -v "github.com/go-llsqlite/crawshaw" `# ISC` \
| grep -v "github.com/consensys/gnark-crypto" `# Apache-2.0` \
| grep -v "github.com/erigontech/mdbx-go" `# Apache-2.0` \
| grep -v "github.com/ledgerwatch/secp256k1" `# BSD-3-Clause` \