downloader: progress print better (#8350)

This commit is contained in:
Alex Sharov 2023-10-04 09:57:37 +07:00 committed by GitHub
parent a62796ab4d
commit ce47ad30e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 88 additions and 41 deletions

View File

@ -175,7 +175,7 @@ func Downloader(ctx context.Context, logger log.Logger) error {
}
downloadernat.DoNat(natif, cfg.ClientConfig, logger)
d, err := downloader.New(ctx, cfg, logger)
d, err := downloader.New(ctx, cfg, logger, log.LvlInfo)
if err != nil {
return err
}

View File

@ -24,6 +24,7 @@ import (
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
@ -59,8 +60,9 @@ type Downloader struct {
stopMainLoop context.CancelFunc
wg sync.WaitGroup
webseeds *WebSeeds
logger log.Logger
webseeds *WebSeeds
logger log.Logger
verbosity log.Lvl
}
type AggStats struct {
@ -78,7 +80,7 @@ type AggStats struct {
UploadRate, DownloadRate uint64
}
func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger) (*Downloader, error) {
func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosity log.Lvl) (*Downloader, error) {
// Application must never see partially-downloaded files
// To provide such consistent view - downloader does:
// add <datadir>/snapshots/tmp - then method .onComplete will remove this suffix
@ -115,13 +117,14 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger) (*Downl
statsLock: &sync.RWMutex{},
webseeds: &WebSeeds{logger: logger},
logger: logger,
verbosity: verbosity,
}
d.ctx, d.stopMainLoop = context.WithCancel(ctx)
if err := d.BuildTorrentFilesIfNeed(d.ctx); err != nil {
return nil, err
}
if err := d.addTorrentFilesFromDisk(d.ctx); err != nil {
if err := d.addTorrentFilesFromDisk(); err != nil {
return nil, err
}
// CornerCase: no peers -> no announcements to trackers -> no magnetlink resolution (but magnetlink has filename)
@ -131,10 +134,9 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger) (*Downl
defer d.wg.Done()
d.webseeds.Discover(d.ctx, d.cfg.WebSeedUrls, d.cfg.WebSeedFiles, d.cfg.SnapDir)
// webseeds.Discover may create new .torrent files on disk
if err := d.addTorrentFilesFromDisk(d.ctx); err != nil {
d.logger.Warn("[downloader] addTorrentFilesFromDisk", "err", err)
if err := d.addTorrentFilesFromDisk(); err != nil && !errors.Is(err, context.Canceled) {
d.logger.Warn("[snapshots] addTorrentFilesFromDisk", "err", err)
}
d.applyWebseeds()
}()
return d, nil
}
@ -163,6 +165,11 @@ func (d *Downloader) mainLoop(silent bool) error {
// First loop drops torrents that were downloaded or are already complete
// This improves efficiency of download by reducing number of active torrent (empirical observation)
for torrents := d.torrentClient.Torrents(); len(torrents) > 0; torrents = d.torrentClient.Torrents() {
select {
case <-d.ctx.Done():
return
default:
}
for _, t := range torrents {
if _, already := torrentMap[t.InfoHash()]; already {
continue
@ -202,10 +209,15 @@ func (d *Downloader) mainLoop(silent bool) error {
}
atomic.StoreUint64(&d.stats.DroppedCompleted, 0)
atomic.StoreUint64(&d.stats.DroppedTotal, 0)
d.addTorrentFilesFromDisk(d.ctx)
d.addTorrentFilesFromDisk()
maps.Clear(torrentMap)
for {
torrents := d.torrentClient.Torrents()
select {
case <-d.ctx.Done():
return
default:
}
for _, t := range torrents {
if _, already := torrentMap[t.InfoHash()]; already {
continue
@ -317,6 +329,9 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
stats.BytesUpload = uint64(connStats.BytesWrittenData.Int64())
stats.BytesTotal, stats.BytesCompleted, stats.ConnectionsTotal, stats.MetadataReady = atomic.LoadUint64(&stats.DroppedTotal), atomic.LoadUint64(&stats.DroppedCompleted), 0, 0
var zeroProgress []string
var noMetadata []string
for _, t := range torrents {
select {
case <-t.GotInfo():
@ -329,14 +344,30 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
stats.BytesTotal += uint64(t.Length())
if !t.Complete.Bool() {
progress := float32(float64(100) * (float64(t.BytesCompleted()) / float64(t.Length())))
d.logger.Debug("[downloader] file not downloaded yet", "name", t.Name(), "progress", fmt.Sprintf("%.2f%%", progress))
if progress == 0 {
zeroProgress = append(zeroProgress, t.Name())
} else {
d.logger.Log(d.verbosity, "[snapshots] progress", "name", t.Name(), "progress", fmt.Sprintf("%.2f%%", progress))
}
}
default:
d.logger.Debug("[downloader] file has no metadata yet", "name", t.Name())
noMetadata = append(noMetadata, t.Name())
}
stats.Completed = stats.Completed && t.Complete.Bool()
}
if len(noMetadata) > 0 {
if len(noMetadata) > 5 {
noMetadata = append(noMetadata[:5], "...")
}
d.logger.Log(d.verbosity, "[snapshots] no metadata yet", "files", strings.Join(noMetadata, ","))
}
if len(zeroProgress) > 0 {
if len(zeroProgress) > 5 {
zeroProgress = append(zeroProgress[:5], "...")
}
d.logger.Log(d.verbosity, "[snapshots] no progress yet", "files", strings.Join(zeroProgress, ","))
}
stats.DownloadRate = (stats.BytesDownload - prevStats.BytesDownload) / uint64(interval.Seconds())
stats.UploadRate = (stats.BytesUpload - prevStats.BytesUpload) / uint64(interval.Seconds())
@ -468,11 +499,6 @@ func (d *Downloader) VerifyData(ctx context.Context) error {
// have .torrent no .seg => get .seg file from .torrent
// have .seg no .torrent => get .torrent from .seg
func (d *Downloader) AddNewSeedableFile(ctx context.Context, name string) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// if we don't have the torrent file we build it if we have the .seg file
torrentFilePath, err := BuildTorrentIfNeed(ctx, name, d.SnapDir())
if err != nil {
@ -482,7 +508,11 @@ func (d *Downloader) AddNewSeedableFile(ctx context.Context, name string) error
if err != nil {
return err
}
_, err = addTorrentFile(ts, d.torrentClient)
wsUrls, ok := d.webseeds.ByFileName(ts.DisplayName)
if ok {
ts.Webseeds = append(ts.Webseeds, wsUrls...)
}
_, err = addTorrentFile(ctx, ts, d.torrentClient)
if err != nil {
return fmt.Errorf("addTorrentFile: %w", err)
}
@ -527,9 +557,13 @@ func (d *Downloader) AddInfoHashAsMagnetLink(ctx context.Context, infoHash metai
mi := t.Metainfo()
if err := CreateTorrentFileIfNotExists(d.SnapDir(), t.Info(), &mi); err != nil {
d.logger.Warn("[downloader] create torrent file", "err", err)
d.logger.Warn("[snapshots] create torrent file", "err", err)
return
}
urls, ok := d.webseeds.ByFileName(t.Name())
if ok {
t.AddWebSeeds(urls)
}
}(t)
//log.Debug("[downloader] downloaded both seg and torrent files", "hash", infoHash)
return nil
@ -547,16 +581,27 @@ func seedableFiles(snapDir string) ([]string, error) {
files = append(files, files2...)
return files, nil
}
func (d *Downloader) addTorrentFilesFromDisk(ctx context.Context) error {
func (d *Downloader) addTorrentFilesFromDisk() error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
files, err := allTorrentFiles(d.SnapDir())
if err != nil {
return err
}
for _, ts := range files {
_, err := addTorrentFile(ts, d.torrentClient)
for i, ts := range files {
ws, ok := d.webseeds.ByFileName(ts.DisplayName)
if ok {
ts.Webseeds = append(ts.Webseeds, ws...)
}
_, err := addTorrentFile(d.ctx, ts, d.torrentClient)
if err != nil {
return err
}
select {
case <-logEvery.C:
log.Info("[snapshots] Adding .torrent files from disk", "progress", fmt.Sprintf("%d/%d", i, len(files)))
default:
}
}
return nil
}
@ -632,14 +677,3 @@ func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletio
return db, c, m, torrentClient, nil
}
// applyWebseeds walks every torrent currently registered in the torrent
// client and, for each torrent whose name has webseed URLs recorded in
// d.webseeds, logs the file name at debug level and attaches those URLs to
// the torrent. Torrents without a matching entry are left untouched.
func (d *Downloader) applyWebseeds() {
	for _, tr := range d.TorrentClient().Torrents() {
		if wsUrls, found := d.webseeds.ByFileName(tr.Name()); found {
			d.logger.Debug("[downloader] addd webseeds", "file", tr.Name())
			tr.AddWebSeeds(wsUrls)
		}
	}
}

View File

@ -46,7 +46,6 @@ type GrpcServer struct {
func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
defer s.d.applyWebseeds()
for i, it := range request.Items {
if it.Path == "" {

View File

@ -18,7 +18,7 @@ func TestChangeInfoHashOfSameFile(t *testing.T) {
dirs := datadir.New(t.TempDir())
cfg, err := downloadercfg2.New(dirs, "", lg.Info, 0, 0, 0, 0, 0, nil, "")
require.NoError(err)
d, err := New(context.Background(), cfg, log.New())
d, err := New(context.Background(), cfg, log.New(), log.LvlInfo)
require.NoError(err)
defer d.Close()
err = d.AddInfoHashAsMagnetLink(d.ctx, snaptype.Hex2InfoHash("aa"), "a.seg")

View File

@ -32,7 +32,6 @@ import (
"github.com/anacrolix/torrent/metainfo"
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/cmp"
"github.com/ledgerwatch/erigon-lib/common/dbg"
dir2 "github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
@ -264,15 +263,13 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) error {
})
}
var m runtime.MemStats
Loop:
for int(i.Load()) < len(files) {
select {
case <-ctx.Done():
break Loop // g.Wait() will return right error
case <-logEvery.C:
dbg.ReadMemStats(&m)
log.Info("[snapshots] Creating .torrent files", "progress", fmt.Sprintf("%d/%d", i.Load(), len(files)), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
log.Info("[snapshots] Creating .torrent files", "progress", fmt.Sprintf("%d/%d", i.Load(), len(files)))
}
}
if err := g.Wait(); err != nil {
@ -399,7 +396,12 @@ func saveTorrent(torrentFilePath string, res []byte) error {
// added first time - pieces verification process will start (disk IO heavy) - Progress
// kept in `piece completion storage` (surviving reboot). Once it is done - no disk IO is needed again.
// There is no need to call torrent.VerifyData manually.
func addTorrentFile(ts *torrent.TorrentSpec, torrentClient *torrent.Client) (*torrent.Torrent, error) {
func addTorrentFile(ctx context.Context, ts *torrent.TorrentSpec, torrentClient *torrent.Client) (*torrent.Torrent, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if _, ok := torrentClient.Torrent(ts.InfoHash); !ok { // can set ChunkSize only for new torrents
ts.ChunkSize = downloadercfg.DefaultNetworkChunkSize
} else {

View File

@ -90,12 +90,15 @@ func (d *WebSeeds) downloadTorrentFilesFromProviders(ctx context.Context, rootDi
if len(d.TorrentUrls()) == 0 {
return
}
var addedNew int
e, ctx := errgroup.WithContext(ctx)
for name, tUrls := range d.TorrentUrls() {
urlsByName := d.TorrentUrls()
for name, tUrls := range urlsByName {
tPath := filepath.Join(rootDir, name)
if dir.FileExist(tPath) {
continue
}
addedNew++
tUrls := tUrls
e.Go(func() error {
for _, url := range tUrls {
@ -116,6 +119,9 @@ func (d *WebSeeds) downloadTorrentFilesFromProviders(ctx context.Context, rootDi
if err := e.Wait(); err != nil {
d.logger.Warn("[downloader] webseed discover", "err", err)
}
if addedNew > 0 {
d.logger.Debug("[snapshots] downloaded .torrent from webseed", "amount", addedNew)
}
}
func (d *WebSeeds) TorrentUrls() snaptype.TorrentUrls {
@ -124,6 +130,12 @@ func (d *WebSeeds) TorrentUrls() snaptype.TorrentUrls {
return d.torrentUrls
}
// Len returns the number of entries in d.byFileName. The count is read while
// holding d.lock, so it is consistent with other accessors that take the
// same lock.
func (d *WebSeeds) Len() int {
	d.lock.Lock()
	count := len(d.byFileName)
	d.lock.Unlock()
	return count
}
func (d *WebSeeds) ByFileName(name string) (metainfo.UrlList, bool) {
d.lock.Lock()
defer d.lock.Unlock()

View File

@ -1106,7 +1106,7 @@ func (s *Ethereum) setUpSnapDownloader(ctx context.Context, downloaderCfg *downl
s.downloaderClient, err = downloadergrpc.NewClient(ctx, s.config.Snapshot.DownloaderAddr)
} else {
// start embedded Downloader
s.downloader, err = downloader3.New(ctx, downloaderCfg, s.logger)
s.downloader, err = downloader3.New(ctx, downloaderCfg, s.logger, log.LvlInfo)
if err != nil {
return err
}