Downloader: step towards more complex datadir (#8286)

migration included - no manual actions required
This commit is contained in:
Alex Sharov 2023-10-04 11:01:02 +07:00 committed by GitHub
parent ce47ad30e2
commit fa3b8c23b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 397 additions and 336 deletions

View File

@ -1,7 +1,7 @@
package devnet
import (
context "context"
"context"
"fmt"
"math/big"
"net/http"
@ -167,7 +167,7 @@ func (n *node) run(ctx *cli.Context) error {
n.ethCfg.Bor.StateSyncConfirmationDelay = map[string]uint64{"0": uint64(n.network.BorStateSyncDelay.Seconds())}
}
n.ethNode, err = enode.New(n.nodeCfg, n.ethCfg, logger)
n.ethNode, err = enode.New(ctx.Context, n.nodeCfg, n.ethCfg, logger)
if metricsMux != nil {
diagnostics.Setup(ctx, metricsMux, n.ethNode)

View File

@ -7,6 +7,8 @@ import (
"net"
"os"
"path/filepath"
"runtime"
"strings"
"time"
"github.com/ledgerwatch/erigon-lib/common/dir"
@ -15,8 +17,8 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon/cmd/hack/tool"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapcfg"
"github.com/pelletier/go-toml/v2"
"github.com/anacrolix/torrent/metainfo"
"github.com/c2h5oh/datasize"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
@ -26,7 +28,6 @@ import (
downloadercfg2 "github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
"github.com/ledgerwatch/log/v3"
"github.com/pelletier/go-toml"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@ -121,6 +122,7 @@ func withFile(cmd *cobra.Command) {
}
}
var logger log.Logger
var rootCmd = &cobra.Command{
Use: "",
Short: "snapshot downloader",
@ -128,8 +130,11 @@ var rootCmd = &cobra.Command{
PersistentPostRun: func(cmd *cobra.Command, args []string) {
debug.Exit()
},
PersistentPreRun: func(cmd *cobra.Command, args []string) {
logger = debug.SetupCobra(cmd, "downloader")
logger.Info("Build info", "git_branch", params.GitBranch, "git_tag", params.GitTag, "git_commit", params.GitCommit)
},
Run: func(cmd *cobra.Command, args []string) {
logger := debug.SetupCobra(cmd, "integration")
if err := Downloader(cmd.Context(), logger); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error(err.Error())
@ -157,7 +162,7 @@ func Downloader(ctx context.Context, logger log.Logger) error {
return err
}
logger.Info("Run snapshot downloader", "addr", downloaderApiAddr, "datadir", dirs.DataDir, "ipv6-enabled", !disableIPV6, "ipv4-enabled", !disableIPV4, "download.rate", downloadRate.String(), "upload.rate", uploadRate.String())
logger.Info("[snapshots] cli flags", "chain", chain, "addr", downloaderApiAddr, "datadir", dirs.DataDir, "ipv6-enabled", !disableIPV6, "ipv4-enabled", !disableIPV4, "download.rate", downloadRate.String(), "upload.rate", uploadRate.String(), "webseed", webseeds)
staticPeers := common.CliString2Array(staticPeersStr)
version := "erigon: " + params.VersionWithCommit(params.GitCommit)
@ -166,6 +171,7 @@ func Downloader(ctx context.Context, logger log.Logger) error {
return err
}
cfg.ClientConfig.PieceHashersPerTorrent = runtime.NumCPU() * 4
cfg.ClientConfig.DisableIPv6 = disableIPV6
cfg.ClientConfig.DisableIPv4 = disableIPV4
@ -180,7 +186,7 @@ func Downloader(ctx context.Context, logger log.Logger) error {
return err
}
defer d.Close()
logger.Info("[torrent] Start", "my peerID", fmt.Sprintf("%x", d.TorrentClient().PeerID()))
logger.Info("[snapshots] Start bittorrent server", "my_peer_id", fmt.Sprintf("%x", d.TorrentClient().PeerID()))
d.MainLoopInBackground(false)
@ -215,7 +221,7 @@ var createTorrent = &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
//logger := debug.SetupCobra(cmd, "integration")
dirs := datadir.New(datadirCli)
err := downloader.BuildTorrentFilesIfNeed(context.Background(), dirs.Snap)
err := downloader.BuildTorrentFilesIfNeed(cmd.Context(), dirs)
if err != nil {
return err
}
@ -227,71 +233,75 @@ var printTorrentHashes = &cobra.Command{
Use: "torrent_hashes",
Example: "go run ./cmd/downloader torrent_hashes --datadir <your_datadir>",
RunE: func(cmd *cobra.Command, args []string) error {
logger := debug.SetupCobra(cmd, "integration")
dirs := datadir.New(datadirCli)
ctx := cmd.Context()
if forceRebuild { // remove and create .torrent files (will re-read all snapshots)
//removePieceCompletionStorage(snapDir)
files, err := downloader.AllTorrentPaths(dirs.Snap)
if err != nil {
return err
}
for _, filePath := range files {
if err := os.Remove(filePath); err != nil {
return err
}
}
if err := downloader.BuildTorrentFilesIfNeed(ctx, dirs.Snap); err != nil {
return err
}
}
res := map[string]string{}
files, err := downloader.AllTorrentPaths(dirs.Snap)
if err != nil {
return err
}
for _, torrentFilePath := range files {
mi, err := metainfo.LoadFromFile(torrentFilePath)
if err != nil {
return err
}
info, err := mi.UnmarshalInfo()
if err != nil {
return err
}
res[info.Name] = mi.HashInfoBytes().String()
}
serialized, err := toml.Marshal(res)
if err != nil {
return err
}
if targetFile == "" {
fmt.Printf("%s\n", serialized)
return nil
}
oldContent, err := os.ReadFile(targetFile)
if err != nil {
return err
}
oldLines := map[string]string{}
if err := toml.Unmarshal(oldContent, &oldLines); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
if len(oldLines) >= len(res) {
logger.Info("amount of lines in target file is equal or greater than amount of lines in snapshot dir", "old", len(oldLines), "new", len(res))
return nil
}
if err := os.WriteFile(targetFile, serialized, 0644); err != nil { // nolint
return err
logger := debug.SetupCobra(cmd, "downloader")
if err := doPrintTorrentHashes(cmd.Context(), logger); err != nil {
log.Error(err.Error())
}
return nil
},
}
// doPrintTorrentHashes prints a TOML map of snapshot-file display-name -> torrent
// info-hash for the configured datadir. With forceRebuild set it first removes
// every existing .torrent file and regenerates them (re-reading all snapshots).
// If targetFile is empty the TOML is printed to stdout; otherwise it is written
// to targetFile — but only when it contains MORE entries than the file already
// has (guards against accidentally shrinking the published hash list).
func doPrintTorrentHashes(ctx context.Context, logger log.Logger) error {
	dirs := datadir.New(datadirCli)
	if forceRebuild { // remove and create .torrent files (will re-read all snapshots)
		//removePieceCompletionStorage(snapDir)
		files, err := downloader.AllTorrentPaths(dirs)
		if err != nil {
			return err
		}
		for _, filePath := range files {
			if err := os.Remove(filePath); err != nil {
				return err
			}
		}
		if err := downloader.BuildTorrentFilesIfNeed(ctx, dirs); err != nil {
			return fmt.Errorf("BuildTorrentFilesIfNeed: %w", err)
		}
	}
	res := map[string]string{}
	torrents, err := downloader.AllTorrentSpecs(dirs)
	if err != nil {
		return err
	}
	for _, t := range torrents {
		// we don't release commitment history in this time. let's skip it here.
		if strings.HasPrefix(t.DisplayName, "history/commitment") {
			continue
		}
		if strings.HasPrefix(t.DisplayName, "idx/commitment") {
			continue
		}
		res[t.DisplayName] = t.InfoHash.String()
	}
	serialized, err := toml.Marshal(res)
	if err != nil {
		return err
	}
	if targetFile == "" {
		fmt.Printf("%s\n", serialized)
		return nil
	}
	// NOTE(review): assumes targetFile already exists — os.ReadFile errors out
	// otherwise; confirm that is intended for first-time generation.
	oldContent, err := os.ReadFile(targetFile)
	if err != nil {
		return err
	}
	oldLines := map[string]string{}
	if err := toml.Unmarshal(oldContent, &oldLines); err != nil {
		return fmt.Errorf("unmarshal: %w", err)
	}
	if len(oldLines) >= len(res) {
		logger.Info("amount of lines in target file is equal or greater than amount of lines in snapshot dir", "old", len(oldLines), "new", len(res))
		return nil
	}
	if err := os.WriteFile(targetFile, serialized, 0644); err != nil { // nolint
		return err
	}
	return nil
}
func StartGrpc(snServer *downloader.GrpcServer, addr string, creds *credentials.TransportCredentials, logger log.Logger) (*grpc.Server, error) {
lis, err := net.Listen("tcp", addr)
if err != nil {

View File

@ -72,7 +72,7 @@ func runErigon(cliCtx *cli.Context) error {
nodeCfg := node.NewNodConfigUrfave(cliCtx, logger)
ethCfg := node.NewEthConfigUrfave(cliCtx, nodeCfg, logger)
ethNode, err := node.New(nodeCfg, ethCfg, logger)
ethNode, err := node.New(cliCtx.Context, nodeCfg, ethCfg, logger)
if err != nil {
log.Error("Erigon startup", "err", err)
return err

View File

@ -17,7 +17,14 @@
package datadir
import (
"errors"
"fmt"
"os"
"path/filepath"
"syscall"
"github.com/gofrs/flock"
"github.com/ledgerwatch/erigon-lib/common/dir"
)
// Dirs is the file system folder the node should use for any data storage
@ -30,7 +37,11 @@ type Dirs struct {
Chaindata string
Tmp string
Snap string
SnapIdx string
SnapHistory string
SnapDomain string
SnapAccessors string
Downloader string
TxPool string
Nodes string
}
@ -46,14 +57,141 @@ func New(datadir string) Dirs {
datadir = absdatadir
}
return Dirs{
dirs := Dirs{
RelativeDataDir: relativeDataDir,
DataDir: datadir,
Chaindata: filepath.Join(datadir, "chaindata"),
Tmp: filepath.Join(datadir, "temp"),
Snap: filepath.Join(datadir, "snapshots"),
SnapIdx: filepath.Join(datadir, "snapshots", "idx"),
SnapHistory: filepath.Join(datadir, "snapshots", "history"),
SnapDomain: filepath.Join(datadir, "snapshots", "domain"),
SnapAccessors: filepath.Join(datadir, "snapshots", "accessor"),
Downloader: filepath.Join(datadir, "snapshots", "db"),
TxPool: filepath.Join(datadir, "txpool"),
Nodes: filepath.Join(datadir, "nodes"),
}
dir.MustExist(dirs.Chaindata, dirs.Tmp,
dirs.SnapIdx, dirs.SnapHistory, dirs.SnapDomain, dirs.SnapAccessors,
dirs.Downloader, dirs.TxPool, dirs.Nodes)
return dirs
}
var (
ErrDataDirLocked = errors.New("datadir already used by another process")
datadirInUseErrNos = map[uint]bool{11: true, 32: true, 35: true}
)
// convertFileLockError maps OS-level "resource busy / already locked" errnos
// (see datadirInUseErrNos) onto ErrDataDirLocked; any other error passes
// through untouched.
func convertFileLockError(err error) error {
	//nolint
	errno, isErrno := err.(syscall.Errno)
	if !isErrno {
		return err
	}
	if datadirInUseErrNos[uint(errno)] {
		return ErrDataDirLocked
	}
	return err
}
// Flock attempts a non-blocking file lock on `<datadir>/LOCK`.
// It returns the lock handle, whether the lock was actually acquired, and
// any error (OS "already in use" errnos are mapped to ErrDataDirLocked by
// convertFileLockError). Callers must Unlock() the returned handle when done.
func Flock(dirs Dirs) (*flock.Flock, bool, error) {
	// Lock the instance directory to prevent concurrent use by another instance as well as
	// accidental use of the instance directory as a database.
	l := flock.New(filepath.Join(dirs.DataDir, "LOCK"))
	locked, err := l.TryLock()
	if err != nil {
		return nil, false, convertFileLockError(err)
	}
	return l, locked, nil
}
// ApplyMigrations - if can get flock.
// Runs datadir layout migrations while holding the datadir LOCK. If another
// process already holds the lock, all work is silently skipped (that process
// is assumed responsible for migrations).
func ApplyMigrations(dirs Dirs) error {
	lock, locked, err := Flock(dirs)
	if err != nil {
		return err
	}
	if !locked {
		// lock held elsewhere — nothing to do here
		return nil
	}
	defer lock.Unlock()
	if err := downloaderV2Migration(dirs); err != nil {
		return err
	}
	// erigon3 folder migration is prepared but not yet enabled
	//if err := erigonV3foldersV31Migration(dirs); err != nil {
	//	return err
	//}
	return nil
}
// downloaderV2Migration relocates the torrent-client mdbx database from its
// legacy location `dirs.Snap/db/mdbx.dat` to `dirs.Downloader/mdbx.dat`.
// If Rename fails (e.g. source and target on different filesystems) it falls
// back to a byte-for-byte copy via copyFile.
func downloaderV2Migration(dirs Dirs) error {
	// move db from `datadir/snapshot/db` to `datadir/downloader`
	if dir.Exist(filepath.Join(dirs.Snap, "db", "mdbx.dat")) { // migration from prev versions
		from, to := filepath.Join(dirs.Snap, "db", "mdbx.dat"), filepath.Join(dirs.Downloader, "mdbx.dat")
		if err := os.Rename(from, to); err != nil {
			//fall back to copy-file if folders are on different disks
			if err := copyFile(from, to); err != nil {
				return err
			}
		}
	}
	return nil
}
// nolint
// erigonV3foldersV31Migration moves erigon3 files from the pre-v3.1 layout
// (`datadir/snapshots/warm` plus a flat `history` folder) into the split
// domain/accessors/idx folders. Not yet enabled — see ApplyMigrations.
//
// Fixes vs. previous revision: the duplicate `.kv` move was removed and
// moveFiles errors are now propagated instead of silently dropped.
func erigonV3foldersV31Migration(dirs Dirs) error {
	warmDir := filepath.Join(dirs.Snap, "warm")
	if !dir.Exist(warmDir) {
		return nil
	}
	// salt.txt moves up to the snapshots root; best-effort (may be absent)
	_ = os.Rename(filepath.Join(dirs.SnapHistory, "salt.txt"), filepath.Join(dirs.Snap, "salt.txt"))
	moves := []struct{ from, to, ext string }{
		{warmDir, dirs.SnapDomain, ".kv"},
		{warmDir, dirs.SnapDomain, ".kvei"},
		{warmDir, dirs.SnapDomain, ".bt"},
		{dirs.SnapHistory, dirs.SnapAccessors, ".vi"},
		{dirs.SnapHistory, dirs.SnapAccessors, ".efi"},
		{dirs.SnapHistory, dirs.SnapAccessors, ".efei"},
		{dirs.SnapHistory, dirs.SnapIdx, ".ef"},
	}
	for _, m := range moves {
		if err := moveFiles(m.from, m.to, m.ext); err != nil {
			return err
		}
	}
	return nil
}
// nolint
func moveFiles(from, to string, ext string) error {
files, err := os.ReadDir(from)
if err != nil {
return fmt.Errorf("ReadDir: %w, %s", err, from)
}
for _, f := range files {
if f.Type().IsDir() || !f.Type().IsRegular() {
continue
}
if filepath.Ext(f.Name()) != ext {
continue
}
_ = os.Rename(filepath.Join(from, f.Name()), filepath.Join(to, f.Name()))
}
return nil
}
func copyFile(from, to string) error {
r, err := os.Open(from)
if err != nil {
return fmt.Errorf("please manually move file: from %s to %s. error: %w", from, to, err)
}
defer r.Close()
w, err := os.Create(to)
if err != nil {
return fmt.Errorf("please manually move file: from %s to %s. error: %w", from, to, err)
}
defer w.Close()
if _, err = w.ReadFrom(r); err != nil {
w.Close()
os.Remove(to)
return fmt.Errorf("please manually move file: from %s to %s. error: %w", from, to, err)
}
if err = w.Sync(); err != nil {
w.Close()
os.Remove(to)
return fmt.Errorf("please manually move file: from %s to %s. error: %w", from, to, err)
}
return nil
}

View File

@ -21,10 +21,12 @@ import (
"path/filepath"
)
func MustExist(path string) {
func MustExist(path ...string) {
const perm = 0764 // user rwx, group rw, other r
if err := os.MkdirAll(path, perm); err != nil {
panic(err)
for _, p := range path {
if err := os.MkdirAll(p, perm); err != nil {
panic(err)
}
}
}
@ -47,6 +49,24 @@ func FileExist(path string) bool {
return true
}
// nolint
func WriteFileWithFsync(name string, data []byte, perm os.FileMode) error {
f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
defer f.Close()
_, err = f.Write(data)
if err != nil {
return err
}
err = f.Sync()
if err != nil {
return err
}
return err
}
func Recreate(dir string) {
if Exist(dir) {
_ = os.RemoveAll(dir)
@ -70,30 +90,42 @@ func HasFileOfType(dir, ext string) bool {
return false
}
func DeleteFilesOfType(dir string, exts ...string) {
d, err := os.Open(dir)
if err != nil {
if os.IsNotExist(err) {
return
// nolint
func DeleteFiles(dirs ...string) error {
for _, dir := range dirs {
files, err := ListFiles(dir)
if err != nil {
return err
}
panic(err)
}
defer d.Close()
files, err := d.Readdir(-1)
if err != nil {
panic(err)
}
for _, file := range files {
if !file.Mode().IsRegular() {
continue
}
for _, ext := range exts {
if filepath.Ext(file.Name()) == ext {
_ = os.Remove(filepath.Join(dir, file.Name()))
for _, fPath := range files {
if err := os.Remove(filepath.Join(dir, fPath)); err != nil {
return err
}
}
}
return nil
}
func ListFiles(dir string, extensions ...string) ([]string, error) {
files, err := os.ReadDir(dir)
if err != nil {
return nil, err
}
res := make([]string, 0, len(files))
for _, f := range files {
if f.IsDir() && !f.Type().IsRegular() {
continue
}
match := false
for _, ext := range extensions {
if filepath.Ext(f.Name()) == ext { // filter out only compressed files
match = true
}
}
if !match {
continue
}
res = append(res, filepath.Join(dir, f.Name()))
}
return res, nil
}

View File

@ -20,9 +20,6 @@ import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
@ -33,7 +30,7 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
@ -81,18 +78,18 @@ type AggStats struct {
}
func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosity log.Lvl) (*Downloader, error) {
// Application must never see partially-downloaded files
// To provide such consistent view - downloader does:
// add <datadir>/snapshots/tmp - then method .onComplete will remove this suffix
// and App only work with <datadir>/snapshot s folder
if dir.FileExist(cfg.SnapDir + "_tmp") { // migration from prev versions
_ = os.Rename(cfg.SnapDir+"_tmp", filepath.Join(cfg.SnapDir, "tmp")) // ignore error, because maybe they are on different drive, or target folder already created manually, all is fine
}
if err := moveFromTmp(cfg.SnapDir); err != nil {
return nil, err
}
// move db from `datadir/snapshot/db` to `datadir/downloader`
//if dir.Exist(filepath.Join(cfg.Dirs.Snap, "db", "mdbx.dat")) { // migration from prev versions
// from, to := filepath.Join(cfg.Dirs.Snap, "db", "mdbx.dat"), filepath.Join(cfg.Dirs.Downloader, "mdbx.dat")
// if err := os.Rename(from, to); err != nil {
// //fall back to copy-file if folders are on different disks
// if err := copyFile(from, to); err != nil {
// return nil, err
// }
// }
//}
db, c, m, torrentClient, err := openClient(cfg.ClientConfig)
db, c, m, torrentClient, err := openClient(cfg.Dirs.Downloader, cfg.Dirs.Snap, cfg.ClientConfig)
if err != nil {
return nil, fmt.Errorf("openClient: %w", err)
}
@ -132,7 +129,7 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosi
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.webseeds.Discover(d.ctx, d.cfg.WebSeedUrls, d.cfg.WebSeedFiles, d.cfg.SnapDir)
d.webseeds.Discover(d.ctx, d.cfg.WebSeedUrls, d.cfg.WebSeedFiles, d.cfg.Dirs.Snap)
// webseeds.Discover may create new .torrent files on disk
if err := d.addTorrentFilesFromDisk(); err != nil && !errors.Is(err, context.Canceled) {
d.logger.Warn("[snapshots] addTorrentFilesFromDisk", "err", err)
@ -312,7 +309,7 @@ func (d *Downloader) mainLoop(silent bool) error {
}
}
func (d *Downloader) SnapDir() string { return d.cfg.SnapDir }
func (d *Downloader) SnapDir() string { return d.cfg.Dirs.Snap }
func (d *Downloader) ReCalcStats(interval time.Duration) {
//Call this methods outside of `statsLock` critical section, because they have own locks with contention
@ -386,37 +383,6 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
d.stats = stats
}
func moveFromTmp(snapDir string) error {
tmpDir := filepath.Join(snapDir, "tmp")
if !dir.FileExist(tmpDir) {
return nil
}
snFs := os.DirFS(tmpDir)
paths, err := fs.ReadDir(snFs, ".")
if err != nil {
return err
}
for _, p := range paths {
if p.IsDir() || !p.Type().IsRegular() {
continue
}
if p.Name() == "tmp" {
continue
}
src := filepath.Join(tmpDir, p.Name())
if err := os.Rename(src, filepath.Join(snapDir, p.Name())); err != nil {
if os.IsExist(err) {
_ = os.Remove(src)
continue
}
return err
}
}
_ = os.Remove(tmpDir)
return nil
}
func (d *Downloader) verifyFile(ctx context.Context, t *torrent.Torrent, completePieces *atomic.Uint64) error {
select {
case <-ctx.Done():
@ -490,7 +456,9 @@ func (d *Downloader) VerifyData(ctx context.Context) error {
})
}
g.Wait()
if err := g.Wait(); err != nil {
return err
}
// force fsync of db. to not loose results of validation on power-off
return d.db.Update(context.Background(), func(tx kv.RwTx) error { return nil })
}
@ -569,22 +537,26 @@ func (d *Downloader) AddInfoHashAsMagnetLink(ctx context.Context, infoHash metai
return nil
}
func seedableFiles(snapDir string) ([]string, error) {
files, err := seedableSegmentFiles(snapDir)
func seedableFiles(dirs datadir.Dirs) ([]string, error) {
files, err := seedableSegmentFiles(dirs.Snap)
if err != nil {
return nil, fmt.Errorf("seedableSegmentFiles: %w", err)
}
files2, err := seedableHistorySnapshots(snapDir)
l, err := seedableSnapshotsBySubDir(dirs.Snap, "history")
if err != nil {
return nil, fmt.Errorf("seedableHistorySnapshots: %w", err)
return nil, err
}
files = append(files, files2...)
l2, err := seedableSnapshotsBySubDir(dirs.Snap, "warm")
if err != nil {
return nil, err
}
files = append(append(files, l...), l2...)
return files, nil
}
func (d *Downloader) addTorrentFilesFromDisk() error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
files, err := allTorrentFiles(d.SnapDir())
files, err := AllTorrentSpecs(d.cfg.Dirs)
if err != nil {
return err
}
@ -606,7 +578,7 @@ func (d *Downloader) addTorrentFilesFromDisk() error {
return nil
}
func (d *Downloader) BuildTorrentFilesIfNeed(ctx context.Context) error {
return BuildTorrentFilesIfNeed(ctx, d.SnapDir())
return BuildTorrentFilesIfNeed(ctx, d.cfg.Dirs)
}
func (d *Downloader) Stats() AggStats {
@ -646,13 +618,12 @@ func (d *Downloader) StopSeeding(hash metainfo.Hash) error {
func (d *Downloader) TorrentClient() *torrent.Client { return d.torrentClient }
func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletion, m storage.ClientImplCloser, torrentClient *torrent.Client, err error) {
snapDir := cfg.DataDir
func openClient(dbDir, snapDir string, cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletion, m storage.ClientImplCloser, torrentClient *torrent.Client, err error) {
db, err = mdbx.NewMDBX(log.New()).
Label(kv.DownloaderDB).
WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.DownloaderTablesCfg }).
SyncPeriod(15 * time.Second).
Path(filepath.Join(snapDir, "db")).
Path(dbDir).
Open()
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("torrentcfg.openClient: %w", err)
@ -664,13 +635,7 @@ func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletio
m = storage.NewMMapWithCompletion(snapDir, c)
cfg.DefaultStorage = m
for retry := 0; retry < 5; retry++ {
torrentClient, err = torrent.NewClient(cfg)
if err == nil {
break
}
time.Sleep(10 * time.Millisecond)
}
torrentClient, err = torrent.NewClient(cfg)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("torrent.NewClient: %w", err)
}

View File

@ -46,10 +46,10 @@ const DefaultNetworkChunkSize = 512 * 1024
type Cfg struct {
ClientConfig *torrent.ClientConfig
SnapDir string
DownloadSlots int
WebSeedUrls []*url.URL
WebSeedFiles []string
Dirs datadir.Dirs
}
func Default() *torrent.ClientConfig {
@ -78,9 +78,9 @@ func Default() *torrent.ClientConfig {
return torrentConfig
}
func New(dataDir datadir.Dirs, version string, verbosity lg.Level, downloadRate, uploadRate datasize.ByteSize, port, connsPerFile, downloadSlots int, staticPeers []string, webseeds string) (*Cfg, error) {
func New(dirs datadir.Dirs, version string, verbosity lg.Level, downloadRate, uploadRate datasize.ByteSize, port, connsPerFile, downloadSlots int, staticPeers []string, webseeds string) (*Cfg, error) {
torrentConfig := Default()
torrentConfig.DataDir = dataDir.Snap // `DataDir` of torrent-client-lib is different from Erigon's `DataDir`. Just same naming.
torrentConfig.DataDir = dirs.Snap // `DataDir` of torrent-client-lib is different from Erigon's `DataDir`. Just same naming.
torrentConfig.ExtendedHandshakeClientVersion = version
@ -155,12 +155,12 @@ func New(dataDir datadir.Dirs, version string, verbosity lg.Level, downloadRate,
}
webseedUrls = append(webseedUrls, uri)
}
localCfgFile := filepath.Join(dataDir.DataDir, "webseeds.toml") // datadir/webseeds.toml allowed
localCfgFile := filepath.Join(dirs.DataDir, "webseeds.toml") // datadir/webseeds.toml allowed
if dir.FileExist(localCfgFile) {
webseedFiles = append(webseedFiles, localCfgFile)
}
return &Cfg{SnapDir: torrentConfig.DataDir,
return &Cfg{Dirs: dirs,
ClientConfig: torrentConfig, DownloadSlots: downloadSlots,
WebSeedUrls: webseedUrls, WebSeedFiles: webseedFiles,
}, nil

View File

@ -32,6 +32,7 @@ import (
"github.com/anacrolix/torrent/metainfo"
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/cmp"
"github.com/ledgerwatch/erigon-lib/common/datadir"
dir2 "github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
@ -63,116 +64,42 @@ var Trackers = [][]string{
//websocketTrackers // TODO: Ws protocol producing too many errors and flooding logs. But it's also very fast and reactive.
}
func AllTorrentPaths(dir string) ([]string, error) {
files, err := AllTorrentFiles(dir)
if err != nil {
return nil, err
}
histDir := filepath.Join(dir, "history")
files2, err := AllTorrentFiles(histDir)
if err != nil {
return nil, err
}
res := make([]string, 0, len(files)+len(files2))
for _, f := range files {
torrentFilePath := filepath.Join(dir, f)
res = append(res, torrentFilePath)
}
for _, f := range files2 {
torrentFilePath := filepath.Join(histDir, f)
res = append(res, torrentFilePath)
}
return res, nil
}
func AllTorrentFiles(dir string) ([]string, error) {
files, err := os.ReadDir(dir)
if err != nil {
return nil, err
}
res := make([]string, 0, len(files))
for _, f := range files {
if filepath.Ext(f.Name()) != ".torrent" { // filter out only compressed files
continue
}
fileInfo, err := f.Info()
if err != nil {
return nil, err
}
if fileInfo.Size() == 0 {
continue
}
res = append(res, f.Name())
}
return res, nil
}
func seedableSegmentFiles(dir string) ([]string, error) {
files, err := os.ReadDir(dir)
files, err := dir2.ListFiles(dir, ".seg")
if err != nil {
return nil, err
}
res := make([]string, 0, len(files))
for _, f := range files {
if f.IsDir() {
for _, fPath := range files {
_, name := filepath.Split(fPath)
if !snaptype.IsCorrectFileName(name) {
continue
}
if !f.Type().IsRegular() {
continue
}
if !snaptype.IsCorrectFileName(f.Name()) {
continue
}
if filepath.Ext(f.Name()) != ".seg" { // filter out only compressed files
continue
}
ff, ok := snaptype.ParseFileName(dir, f.Name())
ff, ok := snaptype.ParseFileName(dir, name)
if !ok {
continue
}
if !ff.Seedable() {
continue
}
res = append(res, f.Name())
res = append(res, name)
}
return res, nil
}
var historyFileRegex = regexp.MustCompile("^([[:lower:]]+).([0-9]+)-([0-9]+).(.*)$")
func seedableHistorySnapshots(dir string) ([]string, error) {
l, err := seedableSnapshotsBySubDir(dir, "history")
if err != nil {
return nil, err
}
l2, err := seedableSnapshotsBySubDir(dir, "warm")
if err != nil {
return nil, err
}
return append(l, l2...), nil
}
func seedableSnapshotsBySubDir(dir, subDir string) ([]string, error) {
historyDir := filepath.Join(dir, subDir)
dir2.MustExist(historyDir)
files, err := os.ReadDir(historyDir)
files, err := dir2.ListFiles(historyDir, ".kv", ".v", ".ef")
if err != nil {
return nil, err
}
res := make([]string, 0, len(files))
for _, f := range files {
if f.IsDir() {
continue
}
if !f.Type().IsRegular() {
continue
}
ext := filepath.Ext(f.Name())
if ext != ".v" && ext != ".ef" { // filter out only compressed files
continue
}
subs := historyFileRegex.FindStringSubmatch(f.Name())
for _, fPath := range files {
_, name := filepath.Split(fPath)
subs := historyFileRegex.FindStringSubmatch(name)
if len(subs) != 5 {
continue
}
@ -188,7 +115,7 @@ func seedableSnapshotsBySubDir(dir, subDir string) ([]string, error) {
if (to-from)%snaptype.Erigon3SeedableSteps != 0 {
continue
}
res = append(res, filepath.Join(subDir, f.Name()))
res = append(res, filepath.Join(subDir, name))
}
return res, nil
}
@ -239,11 +166,11 @@ func BuildTorrentIfNeed(ctx context.Context, fName, root string) (torrentFilePat
}
// BuildTorrentFilesIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually
func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) error {
func BuildTorrentFilesIfNeed(ctx context.Context, dirs datadir.Dirs) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
files, err := seedableFiles(snapDir)
files, err := seedableFiles(dirs)
if err != nil {
return err
}
@ -256,7 +183,7 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) error {
file := file
g.Go(func() error {
defer i.Add(1)
if _, err := BuildTorrentIfNeed(ctx, file, snapDir); err != nil {
if _, err := BuildTorrentIfNeed(ctx, file, dirs.Snap); err != nil {
return err
}
return nil
@ -327,37 +254,26 @@ func CreateTorrentFileFromInfo(root string, info *metainfo.Info, mi *metainfo.Me
return CreateTorrentFromMetaInfo(root, info, mi)
}
func allTorrentFiles(snapDir string) (res []*torrent.TorrentSpec, err error) {
res, err = torrentInDir(snapDir)
func AllTorrentPaths(dirs datadir.Dirs) ([]string, error) {
files, err := dir2.ListFiles(dirs.Snap, ".torrent")
if err != nil {
return nil, err
}
res2, err := torrentInDir(filepath.Join(snapDir, "history"))
files2, err := dir2.ListFiles(dirs.SnapHistory, ".torrent")
if err != nil {
return nil, err
}
res = append(res, res2...)
res2, err = torrentInDir(filepath.Join(snapDir, "warm"))
if err != nil {
return nil, err
}
res = append(res, res2...)
return res, nil
files = append(files, files2...)
return files, nil
}
func torrentInDir(snapDir string) (res []*torrent.TorrentSpec, err error) {
files, err := os.ReadDir(snapDir)
func AllTorrentSpecs(dirs datadir.Dirs) (res []*torrent.TorrentSpec, err error) {
files, err := AllTorrentPaths(dirs)
if err != nil {
return nil, err
}
for _, f := range files {
if f.IsDir() || !f.Type().IsRegular() {
continue
}
if filepath.Ext(f.Name()) != ".torrent" { // filter out only compressed files
continue
}
a, err := loadTorrent(filepath.Join(snapDir, f.Name()))
for _, fPath := range files {
a, err := loadTorrent(fPath)
if err != nil {
return nil, err
}

View File

@ -21,6 +21,7 @@ require (
github.com/deckarep/golang-set/v2 v2.3.1
github.com/edsrzf/mmap-go v1.1.0
github.com/go-stack/stack v1.8.1
github.com/gofrs/flock v0.8.1
github.com/google/btree v1.1.2
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/hashicorp/golang-lru/v2 v2.0.6

View File

@ -158,6 +158,8 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw=
github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=

View File

@ -20,24 +20,13 @@ import (
"errors"
"fmt"
"reflect"
"syscall"
)
var (
ErrDataDirUsed = errors.New("datadir already used by another process")
ErrNodeStopped = errors.New("node not started")
ErrNodeRunning = errors.New("node already running")
datadirInUseErrNos = map[uint]bool{11: true, 32: true, 35: true}
)
func convertFileLockError(err error) error {
if errno, ok := err.(syscall.Errno); ok && datadirInUseErrNos[uint(errno)] {
return ErrDataDirUsed
}
return err
}
// StopError is returned if a Node fails to stop either any of its registered
// services or itself.
type StopError struct {

View File

@ -20,25 +20,28 @@ import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"reflect"
"strings"
"sync"
"time"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/common/datadir"
"golang.org/x/sync/semaphore"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/node/nodecfg"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/debug"
"golang.org/x/sync/semaphore"
"github.com/gofrs/flock"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/migrations"
"github.com/ledgerwatch/log/v3"
)
// Node is a container on which services can be registered.
@ -63,7 +66,7 @@ const (
)
// New creates a new P2P node, ready for protocol registration.
func New(conf *nodecfg.Config, logger log.Logger) (*Node, error) {
func New(ctx context.Context, conf *nodecfg.Config, logger log.Logger) (*Node, error) {
// Copy config and resolve the datadir so future changes to the current
// working directory don't affect the node.
confCopy := *conf
@ -86,7 +89,7 @@ func New(conf *nodecfg.Config, logger log.Logger) (*Node, error) {
}
// Acquire the instance directory lock.
if err := node.openDataDir(); err != nil {
if err := node.openDataDir(ctx); err != nil {
return nil, err
}
@ -221,27 +224,35 @@ func (n *Node) stopServices(running []Lifecycle) error {
return nil
}
func (n *Node) openDataDir() error {
func (n *Node) openDataDir(ctx context.Context) error {
if n.config.Dirs.DataDir == "" {
return nil // ephemeral
}
instdir := n.config.Dirs.DataDir
if err := os.MkdirAll(instdir, 0700); err != nil {
if err := datadir.ApplyMigrations(n.config.Dirs); err != nil {
return err
}
// Lock the instance directory to prevent concurrent use by another instance as well as
// accidental use of the instance directory as a database.
l := flock.New(filepath.Join(instdir, "LOCK"))
locked, err := l.TryLock()
if err != nil {
return convertFileLockError(err)
for retry := 0; ; retry++ {
l, locked, err := datadir.Flock(n.config.Dirs)
if err != nil {
return err
}
if !locked {
if retry >= 10 {
return fmt.Errorf("%w: %s", datadir.ErrDataDirLocked, instdir)
}
log.Error(datadir.ErrDataDirLocked.Error() + ", retry in 2 sec")
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(2 * time.Second):
}
continue
}
n.dirLock = l
break
}
if !locked {
return fmt.Errorf("%w: %s", ErrDataDirUsed, instdir)
}
n.dirLock = l
return nil
}

View File

@ -17,6 +17,7 @@
package node_test
import (
"context"
"fmt"
log2 "log"
@ -38,7 +39,7 @@ func (s *SampleLifecycle) Stop() error { fmt.Println("Service stopping..."); re
func ExampleLifecycle() {
// Create a network node to run protocols with the default values.
stack, err := node.New(&nodecfg.Config{}, log.New())
stack, err := node.New(context.Background(), &nodecfg.Config{}, log.New())
if err != nil {
log2.Fatalf("Failed to create network node: %v", err)
}

View File

@ -50,7 +50,7 @@ func TestNodeCloseMultipleTimes(t *testing.T) {
t.Skip("fix me on win please")
}
stack, err := New(testNodeConfig(t), log.New())
stack, err := New(context.Background(), testNodeConfig(t), log.New())
if err != nil {
t.Fatalf("failed to create protocol stack: %v", err)
}
@ -69,7 +69,7 @@ func TestNodeStartMultipleTimes(t *testing.T) {
t.Skip("fix me on win please")
}
stack, err := New(testNodeConfig(t), log.New())
stack, err := New(context.Background(), testNodeConfig(t), log.New())
if err != nil {
t.Fatalf("failed to create protocol stack: %v", err)
}
@ -100,7 +100,7 @@ func TestNodeUsedDataDir(t *testing.T) {
dir := t.TempDir()
// Create a new node based on the data directory
original, originalErr := New(&nodecfg.Config{Dirs: datadir.New(dir)}, log.New())
original, originalErr := New(context.Background(), &nodecfg.Config{Dirs: datadir.New(dir)}, log.New())
if originalErr != nil {
t.Fatalf("failed to create original protocol stack: %v", originalErr)
}
@ -110,14 +110,14 @@ func TestNodeUsedDataDir(t *testing.T) {
}
// Create a second node based on the same data directory and ensure failure
if _, err := New(&nodecfg.Config{Dirs: datadir.New(dir)}, log.New()); !errors.Is(err, ErrDataDirUsed) {
t.Fatalf("duplicate datadir failure mismatch: have %v, want %v", err, ErrDataDirUsed)
if _, err := New(context.Background(), &nodecfg.Config{Dirs: datadir.New(dir)}, log.New()); !errors.Is(err, datadir.ErrDataDirLocked) {
t.Fatalf("duplicate datadir failure mismatch: have %v, want %v", err, datadir.ErrDataDirLocked)
}
}
// Tests whether a Lifecycle can be registered.
func TestLifecycleRegistry_Successful(t *testing.T) {
stack, err := New(testNodeConfig(t), log.New())
stack, err := New(context.Background(), testNodeConfig(t), log.New())
if err != nil {
t.Fatalf("failed to create protocol stack: %v", err)
}
@ -144,7 +144,7 @@ func TestNodeCloseClosesDB(t *testing.T) {
}
logger := log.New()
stack, _ := New(testNodeConfig(t), logger)
stack, _ := New(context.Background(), testNodeConfig(t), logger)
defer stack.Close()
db, err := OpenDatabase(stack.Config(), kv.SentryDB, "", false, logger)
@ -172,7 +172,7 @@ func TestNodeOpenDatabaseFromLifecycleStart(t *testing.T) {
}
logger := log.New()
stack, err := New(testNodeConfig(t), logger)
stack, err := New(context.Background(), testNodeConfig(t), logger)
require.NoError(t, err)
defer stack.Close()
@ -200,7 +200,7 @@ func TestNodeOpenDatabaseFromLifecycleStop(t *testing.T) {
}
logger := log.New()
stack, _ := New(testNodeConfig(t), logger)
stack, _ := New(context.Background(), testNodeConfig(t), logger)
defer stack.Close()
stack.RegisterLifecycle(&InstrumentedService{
@ -219,7 +219,7 @@ func TestNodeOpenDatabaseFromLifecycleStop(t *testing.T) {
// Tests that registered Lifecycles get started and stopped correctly.
func TestLifecycleLifeCycle(t *testing.T) {
stack, _ := New(testNodeConfig(t), log.New())
stack, _ := New(context.Background(), testNodeConfig(t), log.New())
defer stack.Close()
started := make(map[string]bool)
@ -274,7 +274,7 @@ func TestLifecycleStartupError(t *testing.T) {
t.Skip("fix me on win please")
}
stack, err := New(testNodeConfig(t), log.New())
stack, err := New(context.Background(), testNodeConfig(t), log.New())
if err != nil {
t.Fatalf("failed to create protocol stack: %v", err)
}
@ -324,7 +324,7 @@ func TestLifecycleStartupError(t *testing.T) {
// Tests that even if a registered Lifecycle fails to shut down cleanly, it does
// not influence the rest of the shutdown invocations.
func TestLifecycleTerminationGuarantee(t *testing.T) {
stack, err := New(testNodeConfig(t), log.New())
stack, err := New(context.Background(), testNodeConfig(t), log.New())
if err != nil {
t.Fatalf("failed to create protocol stack: %v", err)
}

View File

@ -17,6 +17,7 @@
package nodecfg_test
import (
"context"
"os"
"path/filepath"
"runtime"
@ -36,7 +37,7 @@ func TestDataDirCreation(t *testing.T) {
}
// Create a temporary data dir and check that it can be used by a node
dir := t.TempDir()
node, err := node2.New(&nodecfg.Config{Dirs: datadir.New(dir)}, log.New())
node, err := node2.New(context.Background(), &nodecfg.Config{Dirs: datadir.New(dir)}, log.New())
if err != nil {
t.Fatalf("failed to create stack with existing datadir: %v", err)
}
@ -45,7 +46,7 @@ func TestDataDirCreation(t *testing.T) {
}
// Generate a long non-existing datadir path and check that it gets created by a node
dir = filepath.Join(dir, "a", "b", "c", "d", "e", "f")
node, err = node2.New(&nodecfg.Config{Dirs: datadir.New(dir)}, log.New())
node, err = node2.New(context.Background(), &nodecfg.Config{Dirs: datadir.New(dir)}, log.New())
if err != nil {
t.Fatalf("failed to create stack with creatable datadir: %v", err)
}
@ -61,16 +62,6 @@ func TestDataDirCreation(t *testing.T) {
t.Fatalf("failed to create temporary file: %v", err)
}
defer os.Remove(file.Name())
dir = filepath.Join(file.Name(), "invalid/path")
node, err = node2.New(&nodecfg.Config{Dirs: datadir.New(dir)}, log.New())
if err == nil {
t.Fatalf("protocol stack created with an invalid datadir")
if err := node.Close(); err != nil {
t.Fatalf("failed to close node: %v", err)
}
}
_ = node
}
// Tests that IPC paths are correctly resolved to valid endpoints of different

View File

@ -1,6 +1,7 @@
package helper
import (
"context"
"crypto/ecdsa"
"encoding/json"
"math/big"
@ -92,7 +93,7 @@ func InitMiner(genesis *types.Genesis, privKey *ecdsa.PrivateKey, withoutHeimdal
MdbxDBSizeLimit: 64 * datasize.MB,
}
stack, err := node.New(nodeCfg, logger)
stack, err := node.New(context.Background(), nodeCfg, logger)
if err != nil {
return nil, nil, err
}

View File

@ -62,7 +62,7 @@ func importChain(cliCtx *cli.Context) error {
nodeCfg := turboNode.NewNodConfigUrfave(cliCtx, logger)
ethCfg := turboNode.NewEthConfigUrfave(cliCtx, nodeCfg, logger)
stack := makeConfigNode(nodeCfg, logger)
stack := makeConfigNode(cliCtx.Context, nodeCfg, logger)
defer stack.Close()
ethereum, err := eth.New(stack, ethCfg, logger)

View File

@ -2,6 +2,7 @@
package app
import (
"context"
"fmt"
"strings"
@ -132,12 +133,12 @@ func NewNodeConfig(ctx *cli.Context) *nodecfg.Config {
return &nodeConfig
}
func MakeConfigNodeDefault(ctx *cli.Context, logger log.Logger) *node.Node {
return makeConfigNode(NewNodeConfig(ctx), logger)
func MakeConfigNodeDefault(cliCtx *cli.Context, logger log.Logger) *node.Node {
return makeConfigNode(cliCtx.Context, NewNodeConfig(cliCtx), logger)
}
func makeConfigNode(config *nodecfg.Config, logger log.Logger) *node.Node {
stack, err := node.New(config, logger)
func makeConfigNode(ctx context.Context, config *nodecfg.Config, logger log.Logger) *node.Node {
stack, err := node.New(ctx, config, logger)
if err != nil {
utils.Fatalf("Failed to create Erigon node: %v", err)
}

View File

@ -2,6 +2,8 @@
package node
import (
"context"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
@ -106,12 +108,13 @@ func NewEthConfigUrfave(ctx *cli.Context, nodeConfig *nodecfg.Config, logger log
// * sync - `stagedsync.StagedSync`, an instance of staged sync, setup just as needed.
// * optionalParams - additional parameters for running a node.
func New(
ctx context.Context,
nodeConfig *nodecfg.Config,
ethConfig *ethconfig.Config,
logger log.Logger,
) (*ErigonNode, error) {
//prepareBuckets(optionalParams.CustomBuckets)
node, err := node.New(nodeConfig, logger)
node, err := node.New(ctx, nodeConfig, logger)
if err != nil {
utils.Fatalf("Failed to create Erigon node: %v", err)
}