atomic CRUD for .torrent files (#9043)

This commit is contained in:
Alex Sharov 2023-12-21 12:15:32 +07:00 committed by GitHub
parent 1499fbb582
commit 9eb9151be4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 149 additions and 76 deletions

View File

@ -241,7 +241,7 @@ var createTorrent = &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
//logger := debug.SetupCobra(cmd, "integration")
dirs := datadir.New(datadirCli)
err := downloader.BuildTorrentFilesIfNeed(cmd.Context(), dirs)
err := downloader.BuildTorrentFilesIfNeed(cmd.Context(), dirs, downloader.NewAtomicTorrentFiles(dirs.Snap))
if err != nil {
return err
}
@ -318,6 +318,8 @@ func doPrintTorrentHashes(ctx context.Context, logger log.Logger) error {
return err
}
tf := downloader.NewAtomicTorrentFiles(dirs.Snap)
if forceRebuild { // remove and create .torrent files (will re-read all snapshots)
//removePieceCompletionStorage(snapDir)
files, err := downloader.AllTorrentPaths(dirs)
@ -329,13 +331,13 @@ func doPrintTorrentHashes(ctx context.Context, logger log.Logger) error {
return err
}
}
if err := downloader.BuildTorrentFilesIfNeed(ctx, dirs); err != nil {
if err := downloader.BuildTorrentFilesIfNeed(ctx, dirs, tf); err != nil {
return fmt.Errorf("BuildTorrentFilesIfNeed: %w", err)
}
}
res := map[string]string{}
torrents, err := downloader.AllTorrentSpecs(dirs)
torrents, err := downloader.AllTorrentSpecs(dirs, tf)
if err != nil {
return err
}

View File

@ -68,6 +68,8 @@ type Downloader struct {
webseeds *WebSeeds
logger log.Logger
verbosity log.Lvl
torrentFiles *TorrentFiles
}
type AggStats struct {
@ -112,7 +114,9 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, dirs datadir.Dirs, logger
webseeds: &WebSeeds{logger: logger, verbosity: verbosity, downloadTorrentFile: cfg.DownloadTorrentFilesFromWebseed, torrentsWhitelist: cfg.ExpectedTorrentFilesHashes},
logger: logger,
verbosity: verbosity,
torrentFiles: &TorrentFiles{dir: cfg.Dirs.Snap},
}
d.webseeds.torrentFiles = d.torrentFiles
d.ctx, d.stopMainLoop = context.WithCancel(ctx)
if err := d.BuildTorrentFilesIfNeed(d.ctx); err != nil {
@ -580,11 +584,11 @@ func (d *Downloader) AddNewSeedableFile(ctx context.Context, name string) error
}
// if we don't have the torrent file we build it if we have the .seg file
torrentFilePath, err := BuildTorrentIfNeed(ctx, name, d.SnapDir())
err := BuildTorrentIfNeed(ctx, name, d.SnapDir(), d.torrentFiles)
if err != nil {
return fmt.Errorf("AddNewSeedableFile: %w", err)
}
ts, err := loadTorrent(torrentFilePath)
ts, err := d.torrentFiles.LoadByName(name)
if err != nil {
return fmt.Errorf("AddNewSeedableFile: %w", err)
}
@ -642,7 +646,7 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
}
mi := t.Metainfo()
if err := CreateTorrentFileIfNotExists(d.SnapDir(), t.Info(), &mi); err != nil {
if err := CreateTorrentFileIfNotExists(d.SnapDir(), t.Info(), &mi, d.torrentFiles); err != nil {
d.logger.Warn("[snapshots] create torrent file", "err", err)
return
}
@ -675,7 +679,7 @@ func (d *Downloader) addTorrentFilesFromDisk(quiet bool) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
files, err := AllTorrentSpecs(d.cfg.Dirs)
files, err := AllTorrentSpecs(d.cfg.Dirs, d.torrentFiles)
if err != nil {
return err
}
@ -695,7 +699,7 @@ func (d *Downloader) addTorrentFilesFromDisk(quiet bool) error {
return nil
}
func (d *Downloader) BuildTorrentFilesIfNeed(ctx context.Context) error {
return BuildTorrentFilesIfNeed(ctx, d.cfg.Dirs)
return BuildTorrentFilesIfNeed(ctx, d.cfg.Dirs, d.torrentFiles)
}
func (d *Downloader) Stats() AggStats {
d.statsLock.RLock()

View File

@ -108,7 +108,7 @@ func (s *GrpcServer) Delete(ctx context.Context, request *proto_downloader.Delet
fPath := filepath.Join(s.d.SnapDir(), name)
_ = os.Remove(fPath)
_ = os.Remove(fPath + ".torrent")
s.d.torrentFiles.Delete(name)
}
return &emptypb.Empty{}, nil
}

View File

@ -48,19 +48,20 @@ func TestNoEscape(t *testing.T) {
dirs := datadir.New(t.TempDir())
ctx := context.Background()
tf := NewAtomicTorrentFiles(dirs.Snap)
// allow adding files only if they are inside snapshots dir
_, err := BuildTorrentIfNeed(ctx, "a.seg", dirs.Snap)
err := BuildTorrentIfNeed(ctx, "a.seg", dirs.Snap, tf)
require.NoError(err)
_, err = BuildTorrentIfNeed(ctx, "b/a.seg", dirs.Snap)
err = BuildTorrentIfNeed(ctx, "b/a.seg", dirs.Snap, tf)
require.NoError(err)
_, err = BuildTorrentIfNeed(ctx, filepath.Join(dirs.Snap, "a.seg"), dirs.Snap)
err = BuildTorrentIfNeed(ctx, filepath.Join(dirs.Snap, "a.seg"), dirs.Snap, tf)
require.NoError(err)
_, err = BuildTorrentIfNeed(ctx, filepath.Join(dirs.Snap, "b", "a.seg"), dirs.Snap)
err = BuildTorrentIfNeed(ctx, filepath.Join(dirs.Snap, "b", "a.seg"), dirs.Snap, tf)
require.NoError(err)
// reject escaping snapshots dir
_, err = BuildTorrentIfNeed(ctx, filepath.Join(dirs.Chaindata, "b", "a.seg"), dirs.Snap)
err = BuildTorrentIfNeed(ctx, filepath.Join(dirs.Chaindata, "b", "a.seg"), dirs.Snap, tf)
require.Error(err)
_, err = BuildTorrentIfNeed(ctx, "./../a.seg", dirs.Snap)
err = BuildTorrentIfNeed(ctx, "./../a.seg", dirs.Snap, tf)
require.Error(err)
}

View File

@ -0,0 +1,106 @@
package downloader
import (
"fmt"
"os"
"path/filepath"
"sync"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
dir2 "github.com/ledgerwatch/erigon-lib/common/dir"
)
// TorrentFiles - does provide thread-safe CRUD operations on .torrent files
type TorrentFiles struct {
lock sync.Mutex
dir string
}
func NewAtomicTorrentFiles(dir string) *TorrentFiles {
return &TorrentFiles{dir: dir}
}
func (tf *TorrentFiles) Exists(name string) bool {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.exists(name)
}
func (tf *TorrentFiles) exists(name string) bool {
fPath := filepath.Join(tf.dir, name)
return dir2.FileExist(fPath + ".torrent")
}
func (tf *TorrentFiles) Delete(name string) error {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.delete(name)
}
func (tf *TorrentFiles) delete(name string) error {
fPath := filepath.Join(tf.dir, name)
return os.Remove(fPath + ".torrent")
}
func (tf *TorrentFiles) Create(torrentFilePath string, res []byte) error {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.create(torrentFilePath, res)
}
func (tf *TorrentFiles) create(torrentFilePath string, res []byte) error {
if len(res) == 0 {
return fmt.Errorf("try to write 0 bytes to file: %s", torrentFilePath)
}
f, err := os.Create(torrentFilePath)
if err != nil {
return err
}
defer f.Close()
if _, err = f.Write(res); err != nil {
return err
}
if err = f.Sync(); err != nil {
return err
}
return nil
}
func (tf *TorrentFiles) CreateTorrentFromMetaInfo(fPath string, mi *metainfo.MetaInfo) error {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.createTorrentFromMetaInfo(fPath, mi)
}
func (tf *TorrentFiles) createTorrentFromMetaInfo(fPath string, mi *metainfo.MetaInfo) error {
file, err := os.Create(fPath)
if err != nil {
return err
}
defer file.Close()
if err := mi.Write(file); err != nil {
return err
}
file.Sync()
return nil
}
func (tf *TorrentFiles) LoadByName(fName string) (*torrent.TorrentSpec, error) {
tf.lock.Lock()
defer tf.lock.Unlock()
fPath := filepath.Join(tf.dir, fName+".torrent")
return tf.load(fPath)
}
func (tf *TorrentFiles) LoadByPath(fPath string) (*torrent.TorrentSpec, error) {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.load(fPath)
}
func (tf *TorrentFiles) load(fPath string) (*torrent.TorrentSpec, error) {
mi, err := metainfo.LoadFromFile(fPath)
if err != nil {
return nil, fmt.Errorf("LoadFromFile: %w, file=%s", err, fPath)
}
mi.AnnounceList = Trackers
return torrent.TorrentSpecFromMetaInfoErr(mi)
}

View File

@ -19,7 +19,6 @@ package downloader
import (
"context"
"fmt"
"os"
"path/filepath"
"regexp"
"runtime"
@ -142,36 +141,36 @@ func ensureCantLeaveDir(fName, root string) (string, error) {
return fName, nil
}
func BuildTorrentIfNeed(ctx context.Context, fName, root string) (torrentFilePath string, err error) {
func BuildTorrentIfNeed(ctx context.Context, fName, root string, torrentFiles *TorrentFiles) (err error) {
select {
case <-ctx.Done():
return "", ctx.Err()
return ctx.Err()
default:
}
fName, err = ensureCantLeaveDir(fName, root)
if err != nil {
return "", err
return err
}
fPath := filepath.Join(root, fName)
if dir2.FileExist(fPath + ".torrent") {
return fPath, nil
if torrentFiles.Exists(fName) {
return nil
}
fPath := filepath.Join(root, fName)
if !dir2.FileExist(fPath) {
return fPath, nil
return nil
}
info := &metainfo.Info{PieceLength: downloadercfg.DefaultPieceSize, Name: fName}
if err := info.BuildFromFilePath(fPath); err != nil {
return "", fmt.Errorf("createTorrentFileFromSegment: %w", err)
return fmt.Errorf("createTorrentFileFromSegment: %w", err)
}
info.Name = fName
return fPath + ".torrent", CreateTorrentFileFromInfo(root, info, nil)
return CreateTorrentFileFromInfo(root, info, nil, torrentFiles)
}
// BuildTorrentFilesIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually
func BuildTorrentFilesIfNeed(ctx context.Context, dirs datadir.Dirs) error {
func BuildTorrentFilesIfNeed(ctx context.Context, dirs datadir.Dirs, torrentFiles *TorrentFiles) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
@ -188,7 +187,7 @@ func BuildTorrentFilesIfNeed(ctx context.Context, dirs datadir.Dirs) error {
file := file
g.Go(func() error {
defer i.Add(1)
if _, err := BuildTorrentIfNeed(ctx, file, dirs.Snap); err != nil {
if err := BuildTorrentIfNeed(ctx, file, dirs.Snap, torrentFiles); err != nil {
return err
}
return nil
@ -213,12 +212,11 @@ Loop:
return nil
}
func CreateTorrentFileIfNotExists(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error {
fPath := filepath.Join(root, info.Name)
if dir2.FileExist(fPath + ".torrent") {
func CreateTorrentFileIfNotExists(root string, info *metainfo.Info, mi *metainfo.MetaInfo, torrentFiles *TorrentFiles) error {
if torrentFiles.Exists(info.Name) {
return nil
}
if err := CreateTorrentFileFromInfo(root, info, mi); err != nil {
if err := CreateTorrentFileFromInfo(root, info, mi, torrentFiles); err != nil {
return err
}
return nil
@ -241,25 +239,12 @@ func CreateMetaInfo(info *metainfo.Info, mi *metainfo.MetaInfo) (*metainfo.MetaI
}
return mi, nil
}
func CreateTorrentFromMetaInfo(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error {
torrentFileName := filepath.Join(root, info.Name+".torrent")
file, err := os.Create(torrentFileName)
if err != nil {
return err
}
defer file.Close()
if err := mi.Write(file); err != nil {
return err
}
file.Sync()
return nil
}
func CreateTorrentFileFromInfo(root string, info *metainfo.Info, mi *metainfo.MetaInfo) (err error) {
func CreateTorrentFileFromInfo(root string, info *metainfo.Info, mi *metainfo.MetaInfo, torrentFiles *TorrentFiles) (err error) {
mi, err = CreateMetaInfo(info, mi)
if err != nil {
return err
}
return CreateTorrentFromMetaInfo(root, info, mi)
return torrentFiles.CreateTorrentFromMetaInfo(root, mi)
}
func AllTorrentPaths(dirs datadir.Dirs) ([]string, error) {
@ -275,7 +260,7 @@ func AllTorrentPaths(dirs datadir.Dirs) ([]string, error) {
return files, nil
}
func AllTorrentSpecs(dirs datadir.Dirs) (res []*torrent.TorrentSpec, err error) {
func AllTorrentSpecs(dirs datadir.Dirs, torrentFiles *TorrentFiles) (res []*torrent.TorrentSpec, err error) {
files, err := AllTorrentPaths(dirs)
if err != nil {
return nil, err
@ -284,7 +269,7 @@ func AllTorrentSpecs(dirs datadir.Dirs) (res []*torrent.TorrentSpec, err error)
if len(fPath) == 0 {
continue
}
a, err := loadTorrent(fPath)
a, err := torrentFiles.LoadByPath(fPath)
if err != nil {
return nil, fmt.Errorf("AllTorrentSpecs: %w", err)
}
@ -293,15 +278,6 @@ func AllTorrentSpecs(dirs datadir.Dirs) (res []*torrent.TorrentSpec, err error)
return res, nil
}
func loadTorrent(torrentFilePath string) (*torrent.TorrentSpec, error) {
mi, err := metainfo.LoadFromFile(torrentFilePath)
if err != nil {
return nil, fmt.Errorf("LoadFromFile: %w, file=%s", err, torrentFilePath)
}
mi.AnnounceList = Trackers
return torrent.TorrentSpecFromMetaInfoErr(mi)
}
// addTorrentFile - adding .torrent file to torrentClient (and checking their hashes), if .torrent file
// added first time - pieces verification process will start (disk IO heavy) - Progress
// kept in `piece completion storage` (surviving reboot). Once it done - no disk IO needed again.
@ -382,21 +358,3 @@ func readPeerID(db kv.RoDB) (peerID []byte, err error) {
func IsLocal(path string) bool {
return isLocal(path)
}
func saveTorrent(torrentFilePath string, res []byte) error {
if len(res) == 0 {
return fmt.Errorf("try to write 0 bytes to file: %s", torrentFilePath)
}
f, err := os.Create(torrentFilePath)
if err != nil {
return err
}
defer f.Close()
if _, err = f.Write(res); err != nil {
return err
}
if err = f.Sync(); err != nil {
return err
}
return nil
}

View File

@ -40,6 +40,8 @@ type WebSeeds struct {
logger log.Logger
verbosity log.Lvl
torrentFiles *TorrentFiles
}
func (d *WebSeeds) Discover(ctx context.Context, s3tokens []string, urls []*url.URL, files []string, rootDir string) {
@ -261,7 +263,7 @@ func (d *WebSeeds) downloadTorrentFilesFromProviders(ctx context.Context, rootDi
continue
}
d.logger.Log(d.verbosity, "[snapshots] got from webseed", "name", name)
if err := saveTorrent(tPath, res); err != nil {
if err := d.torrentFiles.Create(tPath, res); err != nil {
d.logger.Debug("[snapshots] saveTorrent", "err", err)
continue
}