diff --git a/cmd/downloader/main.go b/cmd/downloader/main.go index bf7a76c0f..2ce9ce9d6 100644 --- a/cmd/downloader/main.go +++ b/cmd/downloader/main.go @@ -175,7 +175,7 @@ func Downloader(ctx context.Context, logger log.Logger) error { } downloadernat.DoNat(natif, cfg.ClientConfig, logger) - d, err := downloader.New(ctx, cfg, logger) + d, err := downloader.New(ctx, cfg, logger, log.LvlInfo) if err != nil { return err } diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index 301474eb7..34a822b0f 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -24,6 +24,7 @@ import ( "os" "path/filepath" "runtime" + "strings" "sync" "sync/atomic" "time" @@ -59,8 +60,9 @@ type Downloader struct { stopMainLoop context.CancelFunc wg sync.WaitGroup - webseeds *WebSeeds - logger log.Logger + webseeds *WebSeeds + logger log.Logger + verbosity log.Lvl } type AggStats struct { @@ -78,7 +80,7 @@ type AggStats struct { UploadRate, DownloadRate uint64 } -func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger) (*Downloader, error) { +func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosity log.Lvl) (*Downloader, error) { // Application must never see partially-downloaded files // To provide such consistent view - downloader does: // add /snapshots/tmp - then method .onComplete will remove this suffix @@ -115,13 +117,14 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger) (*Downl statsLock: &sync.RWMutex{}, webseeds: &WebSeeds{logger: logger}, logger: logger, + verbosity: verbosity, } d.ctx, d.stopMainLoop = context.WithCancel(ctx) if err := d.BuildTorrentFilesIfNeed(d.ctx); err != nil { return nil, err } - if err := d.addTorrentFilesFromDisk(d.ctx); err != nil { + if err := d.addTorrentFilesFromDisk(); err != nil { return nil, err } // CornerCase: no peers -> no anoncments to trackers -> no magnetlink resolution (but magnetlink has filename) @@ -131,10 +134,9 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger) (*Downl defer d.wg.Done() d.webseeds.Discover(d.ctx, d.cfg.WebSeedUrls, d.cfg.WebSeedFiles, d.cfg.SnapDir) // webseeds.Discover may create new .torrent files on disk - if err := d.addTorrentFilesFromDisk(d.ctx); err != nil { - d.logger.Warn("[downloader] addTorrentFilesFromDisk", "err", err) + if err := d.addTorrentFilesFromDisk(); err != nil && !errors.Is(err, context.Canceled) { + d.logger.Warn("[snapshots] addTorrentFilesFromDisk", "err", err) } - d.applyWebseeds() }() return d, nil } @@ -163,6 +165,11 @@ func (d *Downloader) mainLoop(silent bool) error { // First loop drops torrents that were downloaded or are already complete // This improves efficiency of download by reducing number of active torrent (empirical observation) for torrents := d.torrentClient.Torrents(); len(torrents) > 0; torrents = d.torrentClient.Torrents() { + select { + case <-d.ctx.Done(): + return + default: + } for _, t := range torrents { if _, already := torrentMap[t.InfoHash()]; already { continue @@ -202,10 +209,15 @@ func (d *Downloader) mainLoop(silent bool) error { } atomic.StoreUint64(&d.stats.DroppedCompleted, 0) atomic.StoreUint64(&d.stats.DroppedTotal, 0) - d.addTorrentFilesFromDisk(d.ctx) + d.addTorrentFilesFromDisk() maps.Clear(torrentMap) for { torrents := d.torrentClient.Torrents() + select { + case <-d.ctx.Done(): + return + default: + } for _, t := range torrents { if _, already := torrentMap[t.InfoHash()]; already { continue @@ -317,6 +329,9 @@ func (d *Downloader) ReCalcStats(interval time.Duration) { stats.BytesUpload = uint64(connStats.BytesWrittenData.Int64()) stats.BytesTotal, stats.BytesCompleted, stats.ConnectionsTotal, stats.MetadataReady = atomic.LoadUint64(&stats.DroppedTotal), atomic.LoadUint64(&stats.DroppedCompleted), 0, 0 + + var zeroProgress []string + var noMetadata []string for _, t := range torrents { select { case <-t.GotInfo(): @@ -329,14 +344,30 @@ func (d *Downloader) ReCalcStats(interval time.Duration) { stats.BytesTotal += uint64(t.Length()) if !t.Complete.Bool() { progress := float32(float64(100) * (float64(t.BytesCompleted()) / float64(t.Length()))) - d.logger.Debug("[downloader] file not downloaded yet", "name", t.Name(), "progress", fmt.Sprintf("%.2f%%", progress)) + if progress == 0 { + zeroProgress = append(zeroProgress, t.Name()) + } else { + d.logger.Log(d.verbosity, "[snapshots] progress", "name", t.Name(), "progress", fmt.Sprintf("%.2f%%", progress)) + } } default: - d.logger.Debug("[downloader] file has no metadata yet", "name", t.Name()) + noMetadata = append(noMetadata, t.Name()) } stats.Completed = stats.Completed && t.Complete.Bool() } + if len(noMetadata) > 0 { + if len(noMetadata) > 5 { + noMetadata = append(noMetadata[:5], "...") + } + d.logger.Log(d.verbosity, "[snapshots] no metadata yet", "files", strings.Join(noMetadata, ",")) + } + if len(zeroProgress) > 0 { + if len(zeroProgress) > 5 { + zeroProgress = append(zeroProgress[:5], "...") + } + d.logger.Log(d.verbosity, "[snapshots] no progress yet", "files", strings.Join(zeroProgress, ",")) + } stats.DownloadRate = (stats.BytesDownload - prevStats.BytesDownload) / uint64(interval.Seconds()) stats.UploadRate = (stats.BytesUpload - prevStats.BytesUpload) / uint64(interval.Seconds()) @@ -468,11 +499,6 @@ func (d *Downloader) VerifyData(ctx context.Context) error { // have .torrent no .seg => get .seg file from .torrent // have .seg no .torrent => get .torrent from .seg func (d *Downloader) AddNewSeedableFile(ctx context.Context, name string) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } // if we don't have the torrent file we build it if we have the .seg file torrentFilePath, err := BuildTorrentIfNeed(ctx, name, d.SnapDir()) if err != nil { @@ -482,7 +508,11 @@ func (d *Downloader) AddNewSeedableFile(ctx context.Context, name string) error if err != nil { return err } - _, err = addTorrentFile(ts, d.torrentClient) + wsUrls, ok := d.webseeds.ByFileName(ts.DisplayName) + if ok { + ts.Webseeds = append(ts.Webseeds, wsUrls...) + } + _, err = addTorrentFile(ctx, ts, d.torrentClient) if err != nil { return fmt.Errorf("addTorrentFile: %w", err) } @@ -527,9 +557,13 @@ func (d *Downloader) AddInfoHashAsMagnetLink(ctx context.Context, infoHash metai mi := t.Metainfo() if err := CreateTorrentFileIfNotExists(d.SnapDir(), t.Info(), &mi); err != nil { - d.logger.Warn("[downloader] create torrent file", "err", err) + d.logger.Warn("[snapshots] create torrent file", "err", err) return } + urls, ok := d.webseeds.ByFileName(t.Name()) + if ok { + t.AddWebSeeds(urls) + } }(t) //log.Debug("[downloader] downloaded both seg and torrent files", "hash", infoHash) return nil @@ -547,16 +581,27 @@ func seedableFiles(snapDir string) ([]string, error) { files = append(files, files2...) return files, nil } -func (d *Downloader) addTorrentFilesFromDisk(ctx context.Context) error { +func (d *Downloader) addTorrentFilesFromDisk() error { + logEvery := time.NewTicker(20 * time.Second) + defer logEvery.Stop() files, err := allTorrentFiles(d.SnapDir()) if err != nil { return err } - for _, ts := range files { - _, err := addTorrentFile(ts, d.torrentClient) + for i, ts := range files { + ws, ok := d.webseeds.ByFileName(ts.DisplayName) + if ok { + ts.Webseeds = append(ts.Webseeds, ws...) + } + _, err := addTorrentFile(d.ctx, ts, d.torrentClient) if err != nil { return err } + select { + case <-logEvery.C: + log.Info("[snapshots] Adding .torrent files from disk", "progress", fmt.Sprintf("%d/%d", i, len(files))) + default: + } } return nil } @@ -632,14 +677,3 @@ func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletio return db, c, m, torrentClient, nil } - -func (d *Downloader) applyWebseeds() { - for _, t := range d.TorrentClient().Torrents() { - urls, ok := d.webseeds.ByFileName(t.Name()) - if !ok { - continue - } - d.logger.Debug("[downloader] addd webseeds", "file", t.Name()) - t.AddWebSeeds(urls) - } -} diff --git a/erigon-lib/downloader/downloader_grpc_server.go b/erigon-lib/downloader/downloader_grpc_server.go index bb50349ff..8ac525128 100644 --- a/erigon-lib/downloader/downloader_grpc_server.go +++ b/erigon-lib/downloader/downloader_grpc_server.go @@ -46,7 +46,6 @@ type GrpcServer struct { func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) { logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() - defer s.d.applyWebseeds() for i, it := range request.Items { if it.Path == "" { diff --git a/erigon-lib/downloader/downloader_test.go b/erigon-lib/downloader/downloader_test.go index ef778ccc0..493e3bddd 100644 --- a/erigon-lib/downloader/downloader_test.go +++ b/erigon-lib/downloader/downloader_test.go @@ -18,7 +18,7 @@ func TestChangeInfoHashOfSameFile(t *testing.T) { dirs := datadir.New(t.TempDir()) cfg, err := downloadercfg2.New(dirs, "", lg.Info, 0, 0, 0, 0, 0, nil, "") require.NoError(err) - d, err := New(context.Background(), cfg, log.New()) + d, err := New(context.Background(), cfg, log.New(), log.LvlInfo) require.NoError(err) defer d.Close() err = d.AddInfoHashAsMagnetLink(d.ctx, snaptype.Hex2InfoHash("aa"), "a.seg") diff --git a/erigon-lib/downloader/util.go b/erigon-lib/downloader/util.go index 8ff80c423..44ebd3628 100644 --- a/erigon-lib/downloader/util.go +++ b/erigon-lib/downloader/util.go @@ -32,7 +32,6 @@ import ( "github.com/anacrolix/torrent/metainfo" common2 "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/cmp" - "github.com/ledgerwatch/erigon-lib/common/dbg" dir2 "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/erigon-lib/downloader/downloadercfg" "github.com/ledgerwatch/erigon-lib/downloader/snaptype" @@ -264,15 +263,13 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) error { }) } - var m runtime.MemStats Loop: for int(i.Load()) < len(files) { select { case <-ctx.Done(): break Loop // g.Wait() will return right error case <-logEvery.C: - dbg.ReadMemStats(&m) - log.Info("[snapshots] Creating .torrent files", "progress", fmt.Sprintf("%d/%d", i.Load(), len(files)), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) + log.Info("[snapshots] Creating .torrent files", "progress", fmt.Sprintf("%d/%d", i.Load(), len(files))) } } if err := g.Wait(); err != nil { @@ -399,7 +396,12 @@ func saveTorrent(torrentFilePath string, res []byte) error { // added first time - pieces verification process will start (disk IO heavy) - Progress // kept in `piece completion storage` (surviving reboot). Once it done - no disk IO needed again. // Don't need call torrent.VerifyData manually -func addTorrentFile(ts *torrent.TorrentSpec, torrentClient *torrent.Client) (*torrent.Torrent, error) { +func addTorrentFile(ctx context.Context, ts *torrent.TorrentSpec, torrentClient *torrent.Client) (*torrent.Torrent, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } if _, ok := torrentClient.Torrent(ts.InfoHash); !ok { // can set ChunkSize only for new torrents ts.ChunkSize = downloadercfg.DefaultNetworkChunkSize } else { diff --git a/erigon-lib/downloader/webseed.go b/erigon-lib/downloader/webseed.go index 57abde71c..58c40bdee 100644 --- a/erigon-lib/downloader/webseed.go +++ b/erigon-lib/downloader/webseed.go @@ -90,12 +90,15 @@ func (d *WebSeeds) downloadTorrentFilesFromProviders(ctx context.Context, rootDi if len(d.TorrentUrls()) == 0 { return } + var addedNew int e, ctx := errgroup.WithContext(ctx) - for name, tUrls := range d.TorrentUrls() { + urlsByName := d.TorrentUrls() + for name, tUrls := range urlsByName { tPath := filepath.Join(rootDir, name) if dir.FileExist(tPath) { continue } + addedNew++ tUrls := tUrls e.Go(func() error { for _, url := range tUrls { @@ -116,6 +119,9 @@ func (d *WebSeeds) downloadTorrentFilesFromProviders(ctx context.Context, rootDi if err := e.Wait(); err != nil { d.logger.Warn("[downloader] webseed discover", "err", err) } + if addedNew > 0 { + d.logger.Debug("[snapshots] downloaded .torrent from webseed", "amount", addedNew) + } } func (d *WebSeeds) TorrentUrls() snaptype.TorrentUrls { @@ -124,6 +130,12 @@ func (d *WebSeeds) TorrentUrls() snaptype.TorrentUrls { return d.torrentUrls } +func (d *WebSeeds) Len() int { + d.lock.Lock() + defer d.lock.Unlock() + return len(d.byFileName) +} + func (d *WebSeeds) ByFileName(name string) (metainfo.UrlList, bool) { d.lock.Lock() defer d.lock.Unlock() diff --git a/eth/backend.go b/eth/backend.go index 19250bf8d..41afbd253 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1106,7 +1106,7 @@ func (s *Ethereum) setUpSnapDownloader(ctx context.Context, downloaderCfg *downl s.downloaderClient, err = downloadergrpc.NewClient(ctx, s.config.Snapshot.DownloaderAddr) } else { // start embedded Downloader - s.downloader, err = downloader3.New(ctx, downloaderCfg, s.logger) + s.downloader, err = downloader3.New(ctx, downloaderCfg, s.logger, log.LvlInfo) if err != nil { return err }