diff --git a/cmd/devnet/devnet/node.go b/cmd/devnet/devnet/node.go index 91cc90271..b45a098f0 100644 --- a/cmd/devnet/devnet/node.go +++ b/cmd/devnet/devnet/node.go @@ -1,7 +1,7 @@ package devnet import ( - context "context" + "context" "fmt" "math/big" "net/http" @@ -167,7 +167,7 @@ func (n *node) run(ctx *cli.Context) error { n.ethCfg.Bor.StateSyncConfirmationDelay = map[string]uint64{"0": uint64(n.network.BorStateSyncDelay.Seconds())} } - n.ethNode, err = enode.New(n.nodeCfg, n.ethCfg, logger) + n.ethNode, err = enode.New(ctx.Context, n.nodeCfg, n.ethCfg, logger) if metricsMux != nil { diagnostics.Setup(ctx, metricsMux, n.ethNode) diff --git a/cmd/downloader/main.go b/cmd/downloader/main.go index 2ce9ce9d6..af0e45c31 100644 --- a/cmd/downloader/main.go +++ b/cmd/downloader/main.go @@ -7,6 +7,8 @@ import ( "net" "os" "path/filepath" + "runtime" + "strings" "time" "github.com/ledgerwatch/erigon-lib/common/dir" @@ -15,8 +17,8 @@ import ( "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon/cmd/hack/tool" "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapcfg" + "github.com/pelletier/go-toml/v2" - "github.com/anacrolix/torrent/metainfo" "github.com/c2h5oh/datasize" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" @@ -26,7 +28,6 @@ import ( downloadercfg2 "github.com/ledgerwatch/erigon-lib/downloader/downloadercfg" proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader" "github.com/ledgerwatch/log/v3" - "github.com/pelletier/go-toml" "github.com/spf13/cobra" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -121,6 +122,7 @@ func withFile(cmd *cobra.Command) { } } +var logger log.Logger var rootCmd = &cobra.Command{ Use: "", Short: "snapshot downloader", @@ -128,8 +130,11 @@ var rootCmd = &cobra.Command{ PersistentPostRun: func(cmd *cobra.Command, args []string) { debug.Exit() }, + PersistentPreRun: func(cmd *cobra.Command, args []string) { + logger = debug.SetupCobra(cmd, "downloader") + logger.Info("Build info", "git_branch", params.GitBranch, "git_tag", params.GitTag, "git_commit", params.GitCommit) + }, Run: func(cmd *cobra.Command, args []string) { - logger := debug.SetupCobra(cmd, "integration") if err := Downloader(cmd.Context(), logger); err != nil { if !errors.Is(err, context.Canceled) { logger.Error(err.Error()) @@ -157,7 +162,7 @@ func Downloader(ctx context.Context, logger log.Logger) error { return err } - logger.Info("Run snapshot downloader", "addr", downloaderApiAddr, "datadir", dirs.DataDir, "ipv6-enabled", !disableIPV6, "ipv4-enabled", !disableIPV4, "download.rate", downloadRate.String(), "upload.rate", uploadRate.String()) + logger.Info("[snapshots] cli flags", "chain", chain, "addr", downloaderApiAddr, "datadir", dirs.DataDir, "ipv6-enabled", !disableIPV6, "ipv4-enabled", !disableIPV4, "download.rate", downloadRate.String(), "upload.rate", uploadRate.String(), "webseed", webseeds) staticPeers := common.CliString2Array(staticPeersStr) version := "erigon: " + params.VersionWithCommit(params.GitCommit) @@ -166,6 +171,7 @@ func Downloader(ctx context.Context, logger log.Logger) error { return err } + cfg.ClientConfig.PieceHashersPerTorrent = runtime.NumCPU() * 4 cfg.ClientConfig.DisableIPv6 = disableIPV6 cfg.ClientConfig.DisableIPv4 = disableIPV4 @@ -180,7 +186,7 @@ func Downloader(ctx context.Context, logger log.Logger) error { return err } defer d.Close() - logger.Info("[torrent] Start", "my peerID", fmt.Sprintf("%x", d.TorrentClient().PeerID())) + logger.Info("[snapshots] Start bittorrent server", "my_peer_id", fmt.Sprintf("%x", d.TorrentClient().PeerID())) d.MainLoopInBackground(false) @@ -215,7 +221,7 @@ var createTorrent = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { //logger := debug.SetupCobra(cmd, "integration") dirs := datadir.New(datadirCli) - err := downloader.BuildTorrentFilesIfNeed(context.Background(), dirs.Snap) + err := downloader.BuildTorrentFilesIfNeed(cmd.Context(), dirs) if err != nil { return err } @@ -227,71 +233,75 @@ var printTorrentHashes = &cobra.Command{ Use: "torrent_hashes", Example: "go run ./cmd/downloader torrent_hashes --datadir ", RunE: func(cmd *cobra.Command, args []string) error { - logger := debug.SetupCobra(cmd, "integration") - dirs := datadir.New(datadirCli) - ctx := cmd.Context() - - if forceRebuild { // remove and create .torrent files (will re-read all snapshots) - //removePieceCompletionStorage(snapDir) - files, err := downloader.AllTorrentPaths(dirs.Snap) - if err != nil { - return err - } - for _, filePath := range files { - if err := os.Remove(filePath); err != nil { - return err - } - } - if err := downloader.BuildTorrentFilesIfNeed(ctx, dirs.Snap); err != nil { - return err - } - } - - res := map[string]string{} - files, err := downloader.AllTorrentPaths(dirs.Snap) - if err != nil { - return err - } - for _, torrentFilePath := range files { - mi, err := metainfo.LoadFromFile(torrentFilePath) - if err != nil { - return err - } - info, err := mi.UnmarshalInfo() - if err != nil { - return err - } - res[info.Name] = mi.HashInfoBytes().String() - } - serialized, err := toml.Marshal(res) - if err != nil { - return err - } - - if targetFile == "" { - fmt.Printf("%s\n", serialized) - return nil - } - - oldContent, err := os.ReadFile(targetFile) - if err != nil { - return err - } - oldLines := map[string]string{} - if err := toml.Unmarshal(oldContent, &oldLines); err != nil { - return fmt.Errorf("unmarshal: %w", err) - } - if len(oldLines) >= len(res) { - logger.Info("amount of lines in target file is equal or greater than amount of lines in snapshot dir", "old", len(oldLines), "new", len(res)) - return nil - } - if err := os.WriteFile(targetFile, serialized, 0644); err != nil { // nolint - return err + logger := debug.SetupCobra(cmd, "downloader") + if err := doPrintTorrentHashes(cmd.Context(), logger); err != nil { + log.Error(err.Error()) } return nil }, } +func doPrintTorrentHashes(ctx context.Context, logger log.Logger) error { + dirs := datadir.New(datadirCli) + if forceRebuild { // remove and create .torrent files (will re-read all snapshots) + //removePieceCompletionStorage(snapDir) + files, err := downloader.AllTorrentPaths(dirs) + if err != nil { + return err + } + for _, filePath := range files { + if err := os.Remove(filePath); err != nil { + return err + } + } + if err := downloader.BuildTorrentFilesIfNeed(ctx, dirs); err != nil { + return fmt.Errorf("BuildTorrentFilesIfNeed: %w", err) + } + } + + res := map[string]string{} + torrents, err := downloader.AllTorrentSpecs(dirs) + if err != nil { + return err + } + for _, t := range torrents { + // we don't release commitment history in this time. let's skip it here. + if strings.HasPrefix(t.DisplayName, "history/commitment") { + continue + } + if strings.HasPrefix(t.DisplayName, "idx/commitment") { + continue + } + res[t.DisplayName] = t.InfoHash.String() + } + serialized, err := toml.Marshal(res) + if err != nil { + return err + } + + if targetFile == "" { + fmt.Printf("%s\n", serialized) + return nil + } + + oldContent, err := os.ReadFile(targetFile) + if err != nil { + return err + } + oldLines := map[string]string{} + if err := toml.Unmarshal(oldContent, &oldLines); err != nil { + return fmt.Errorf("unmarshal: %w", err) + } + if len(oldLines) >= len(res) { + logger.Info("amount of lines in target file is equal or greater than amount of lines in snapshot dir", "old", len(oldLines), "new", len(res)) + return nil + } + if err := os.WriteFile(targetFile, serialized, 0644); err != nil { // nolint + return err + } + return nil +} + func StartGrpc(snServer *downloader.GrpcServer, addr string, creds *credentials.TransportCredentials, logger log.Logger) (*grpc.Server, error) { lis, err := net.Listen("tcp", addr) if err != nil { diff --git a/cmd/erigon/main.go b/cmd/erigon/main.go index aff45cbd5..135d72c76 100644 --- a/cmd/erigon/main.go +++ b/cmd/erigon/main.go @@ -72,7 +72,7 @@ func runErigon(cliCtx *cli.Context) error { nodeCfg := node.NewNodConfigUrfave(cliCtx, logger) ethCfg := node.NewEthConfigUrfave(cliCtx, nodeCfg, logger) - ethNode, err := node.New(nodeCfg, ethCfg, logger) + ethNode, err := node.New(cliCtx.Context, nodeCfg, ethCfg, logger) if err != nil { log.Error("Erigon startup", "err", err) return err diff --git a/erigon-lib/common/datadir/dirs.go b/erigon-lib/common/datadir/dirs.go index d4cd59972..736dea559 100644 --- a/erigon-lib/common/datadir/dirs.go +++ b/erigon-lib/common/datadir/dirs.go @@ -17,7 +17,14 @@ package datadir import ( + "errors" + "fmt" + "os" "path/filepath" + "syscall" + + "github.com/gofrs/flock" + "github.com/ledgerwatch/erigon-lib/common/dir" ) // Dirs is the file system folder the node should use for any data storage @@ -30,7 +37,11 @@ type Dirs struct { Chaindata string Tmp string Snap string + SnapIdx string SnapHistory string + SnapDomain string + SnapAccessors string + Downloader string TxPool string Nodes string } @@ -46,14 +57,141 @@ func New(datadir string) Dirs { datadir = absdatadir } - return Dirs{ + dirs := Dirs{ RelativeDataDir: relativeDataDir, DataDir: datadir, Chaindata: filepath.Join(datadir, "chaindata"), Tmp: filepath.Join(datadir, "temp"), Snap: filepath.Join(datadir, "snapshots"), + SnapIdx: filepath.Join(datadir, "snapshots", "idx"), SnapHistory: filepath.Join(datadir, "snapshots", "history"), + SnapDomain: filepath.Join(datadir, "snapshots", "domain"), + SnapAccessors: filepath.Join(datadir, "snapshots", "accessor"), + Downloader: filepath.Join(datadir, "snapshots", "db"), TxPool: filepath.Join(datadir, "txpool"), Nodes: filepath.Join(datadir, "nodes"), } + dir.MustExist(dirs.Chaindata, dirs.Tmp, + dirs.SnapIdx, dirs.SnapHistory, dirs.SnapDomain, dirs.SnapAccessors, + dirs.Downloader, dirs.TxPool, dirs.Nodes) + return dirs +} + +var ( + ErrDataDirLocked = errors.New("datadir already used by another process") + + datadirInUseErrNos = map[uint]bool{11: true, 32: true, 35: true} +) + +func convertFileLockError(err error) error { + //nolint + if errno, ok := err.(syscall.Errno); ok && datadirInUseErrNos[uint(errno)] { + return ErrDataDirLocked + } + return err +} + +func Flock(dirs Dirs) (*flock.Flock, bool, error) { + // Lock the instance directory to prevent concurrent use by another instance as well as + // accidental use of the instance directory as a database. + l := flock.New(filepath.Join(dirs.DataDir, "LOCK")) + locked, err := l.TryLock() + if err != nil { + return nil, false, convertFileLockError(err) + } + return l, locked, nil +} + +// ApplyMigrations - if can get flock. +func ApplyMigrations(dirs Dirs) error { + lock, locked, err := Flock(dirs) + if err != nil { + return err + } + if !locked { + return nil + } + defer lock.Unlock() + + if err := downloaderV2Migration(dirs); err != nil { + return err + } + //if err := erigonV3foldersV31Migration(dirs); err != nil { + // return err + //} + return nil +} + +func downloaderV2Migration(dirs Dirs) error { + // move db from `datadir/snapshot/db` to `datadir/downloader` + if dir.Exist(filepath.Join(dirs.Snap, "db", "mdbx.dat")) { // migration from prev versions + from, to := filepath.Join(dirs.Snap, "db", "mdbx.dat"), filepath.Join(dirs.Downloader, "mdbx.dat") + if err := os.Rename(from, to); err != nil { + //fall back to copy-file if folders are on different disks + if err := copyFile(from, to); err != nil { + return err + } + } + } + return nil +} + +// nolint +func erigonV3foldersV31Migration(dirs Dirs) error { + // migrate files db from `datadir/snapshot/warm` to `datadir/snapshots/domain` + if dir.Exist(filepath.Join(dirs.Snap, "warm")) { + warmDir := filepath.Join(dirs.Snap, "warm") + moveFiles(warmDir, dirs.SnapDomain, ".kv") + os.Rename(filepath.Join(dirs.SnapHistory, "salt.txt"), filepath.Join(dirs.Snap, "salt.txt")) + moveFiles(warmDir, dirs.SnapDomain, ".kv") + moveFiles(warmDir, dirs.SnapDomain, ".kvei") + moveFiles(warmDir, dirs.SnapDomain, ".bt") + moveFiles(dirs.SnapHistory, dirs.SnapAccessors, ".vi") + moveFiles(dirs.SnapHistory, dirs.SnapAccessors, ".efi") + moveFiles(dirs.SnapHistory, dirs.SnapAccessors, ".efei") + moveFiles(dirs.SnapHistory, dirs.SnapIdx, ".ef") + } + return nil +} + +// nolint +func moveFiles(from, to string, ext string) error { + files, err := os.ReadDir(from) + if err != nil { + return fmt.Errorf("ReadDir: %w, %s", err, from) + } + for _, f := range files { + if f.Type().IsDir() || !f.Type().IsRegular() { + continue + } + if filepath.Ext(f.Name()) != ext { + continue + } + _ = os.Rename(filepath.Join(from, f.Name()), filepath.Join(to, f.Name())) + } + return nil +} + +func copyFile(from, to string) error { + r, err := os.Open(from) + if err != nil { + return fmt.Errorf("please manually move file: from %s to %s. error: %w", from, to, err) + } + defer r.Close() + w, err := os.Create(to) + if err != nil { + return fmt.Errorf("please manually move file: from %s to %s. error: %w", from, to, err) + } + defer w.Close() + if _, err = w.ReadFrom(r); err != nil { + w.Close() + os.Remove(to) + return fmt.Errorf("please manually move file: from %s to %s. error: %w", from, to, err) + } + if err = w.Sync(); err != nil { + w.Close() + os.Remove(to) + return fmt.Errorf("please manually move file: from %s to %s. error: %w", from, to, err) + } + return nil } diff --git a/erigon-lib/common/dir/rw_dir.go b/erigon-lib/common/dir/rw_dir.go index 008d0f569..e2dab0886 100644 --- a/erigon-lib/common/dir/rw_dir.go +++ b/erigon-lib/common/dir/rw_dir.go @@ -21,10 +21,12 @@ import ( "path/filepath" ) -func MustExist(path string) { +func MustExist(path ...string) { const perm = 0764 // user rwx, group rw, other r - if err := os.MkdirAll(path, perm); err != nil { - panic(err) + for _, p := range path { + if err := os.MkdirAll(p, perm); err != nil { + panic(err) + } } } @@ -47,6 +49,24 @@ func FileExist(path string) bool { return true } +// nolint +func WriteFileWithFsync(name string, data []byte, perm os.FileMode) error { + f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + if err != nil { + return err + } + defer f.Close() + _, err = f.Write(data) + if err != nil { + return err + } + err = f.Sync() + if err != nil { + return err + } + return err +} + func Recreate(dir string) { if Exist(dir) { _ = os.RemoveAll(dir) @@ -70,30 +90,42 @@ func HasFileOfType(dir, ext string) bool { return false } -func DeleteFilesOfType(dir string, exts ...string) { - d, err := os.Open(dir) - if err != nil { - if os.IsNotExist(err) { - return +// nolint +func DeleteFiles(dirs ...string) error { + for _, dir := range dirs { + files, err := ListFiles(dir) + if err != nil { + return err } - panic(err) - } - defer d.Close() - - files, err := d.Readdir(-1) - if err != nil { - panic(err) - } - - for _, file := range files { - if !file.Mode().IsRegular() { - continue - } - - for _, ext := range exts { - if filepath.Ext(file.Name()) == ext { - _ = os.Remove(filepath.Join(dir, file.Name())) + for _, fPath := range files { + if err := os.Remove(filepath.Join(dir, fPath)); err != nil { + return err } } } + return nil +} + +func ListFiles(dir string, extensions ...string) ([]string, error) { + files, err := os.ReadDir(dir) + if err != nil { + return nil, err + } + res := make([]string, 0, len(files)) + for _, f := range files { + if f.IsDir() && !f.Type().IsRegular() { + continue + } + match := false + for _, ext := range extensions { + if filepath.Ext(f.Name()) == ext { // filter out only compressed files + match = true + } + } + if !match { + continue + } + res = append(res, filepath.Join(dir, f.Name())) + } + return res, nil } diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index 34a822b0f..4ed4d5321 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -20,9 +20,6 @@ import ( "context" "errors" "fmt" - "io/fs" - "os" - "path/filepath" "runtime" "strings" "sync" @@ -33,7 +30,7 @@ import ( "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/storage" common2 "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon-lib/common/dir" + "github.com/ledgerwatch/erigon-lib/common/datadir" "github.com/ledgerwatch/erigon-lib/downloader/downloadercfg" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" @@ -81,18 +78,18 @@ type AggStats struct { } func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosity log.Lvl) (*Downloader, error) { - // Application must never see partially-downloaded files - // To provide such consistent view - downloader does: - // add /snapshots/tmp - then method .onComplete will remove this suffix - // and App only work with /snapshot s folder - if dir.FileExist(cfg.SnapDir + "_tmp") { // migration from prev versions - _ = os.Rename(cfg.SnapDir+"_tmp", filepath.Join(cfg.SnapDir, "tmp")) // ignore error, because maybe they are on different drive, or target folder already created manually, all is fine - } - if err := moveFromTmp(cfg.SnapDir); err != nil { - return nil, err - } + // move db from `datadir/snapshot/db` to `datadir/downloader` + //if dir.Exist(filepath.Join(cfg.Dirs.Snap, "db", "mdbx.dat")) { // migration from prev versions + // from, to := filepath.Join(cfg.Dirs.Snap, "db", "mdbx.dat"), filepath.Join(cfg.Dirs.Downloader, "mdbx.dat") + // if err := os.Rename(from, to); err != nil { + // //fall back to copy-file if folders are on different disks + // if err := copyFile(from, to); err != nil { + // return nil, err + // } + // } + //} - db, c, m, torrentClient, err := openClient(cfg.ClientConfig) + db, c, m, torrentClient, err := openClient(cfg.Dirs.Downloader, cfg.Dirs.Snap, cfg.ClientConfig) if err != nil { return nil, fmt.Errorf("openClient: %w", err) } @@ -132,7 +129,7 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosi d.wg.Add(1) go func() { defer d.wg.Done() - d.webseeds.Discover(d.ctx, d.cfg.WebSeedUrls, d.cfg.WebSeedFiles, d.cfg.SnapDir) + d.webseeds.Discover(d.ctx, d.cfg.WebSeedUrls, d.cfg.WebSeedFiles, d.cfg.Dirs.Snap) // webseeds.Discover may create new .torrent files on disk if err := d.addTorrentFilesFromDisk(); err != nil && !errors.Is(err, context.Canceled) { d.logger.Warn("[snapshots] addTorrentFilesFromDisk", "err", err) @@ -312,7 +309,7 @@ func (d *Downloader) mainLoop(silent bool) error { } } -func (d *Downloader) SnapDir() string { return d.cfg.SnapDir } +func (d *Downloader) SnapDir() string { return d.cfg.Dirs.Snap } func (d *Downloader) ReCalcStats(interval time.Duration) { //Call this methods outside of `statsLock` critical section, because they have own locks with contention @@ -386,37 +383,6 @@ func (d *Downloader) ReCalcStats(interval time.Duration) { d.stats = stats } -func moveFromTmp(snapDir string) error { - tmpDir := filepath.Join(snapDir, "tmp") - if !dir.FileExist(tmpDir) { - return nil - } - - snFs := os.DirFS(tmpDir) - paths, err := fs.ReadDir(snFs, ".") - if err != nil { - return err - } - for _, p := range paths { - if p.IsDir() || !p.Type().IsRegular() { - continue - } - if p.Name() == "tmp" { - continue - } - src := filepath.Join(tmpDir, p.Name()) - if err := os.Rename(src, filepath.Join(snapDir, p.Name())); err != nil { - if os.IsExist(err) { - _ = os.Remove(src) - continue - } - return err - } - } - _ = os.Remove(tmpDir) - return nil -} - func (d *Downloader) verifyFile(ctx context.Context, t *torrent.Torrent, completePieces *atomic.Uint64) error { select { case <-ctx.Done(): @@ -490,7 +456,9 @@ func (d *Downloader) VerifyData(ctx context.Context) error { }) } - g.Wait() + if err := g.Wait(); err != nil { + return err + } // force fsync of db. to not loose results of validation on power-off return d.db.Update(context.Background(), func(tx kv.RwTx) error { return nil }) } @@ -569,22 +537,26 @@ func (d *Downloader) AddInfoHashAsMagnetLink(ctx context.Context, infoHash metai return nil } -func seedableFiles(snapDir string) ([]string, error) { - files, err := seedableSegmentFiles(snapDir) +func seedableFiles(dirs datadir.Dirs) ([]string, error) { + files, err := seedableSegmentFiles(dirs.Snap) if err != nil { return nil, fmt.Errorf("seedableSegmentFiles: %w", err) } - files2, err := seedableHistorySnapshots(snapDir) + l, err := seedableSnapshotsBySubDir(dirs.Snap, "history") if err != nil { - return nil, fmt.Errorf("seedableHistorySnapshots: %w", err) + return nil, err } - files = append(files, files2...) + l2, err := seedableSnapshotsBySubDir(dirs.Snap, "warm") + if err != nil { + return nil, err + } + files = append(append(files, l...), l2...) return files, nil } func (d *Downloader) addTorrentFilesFromDisk() error { logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() - files, err := allTorrentFiles(d.SnapDir()) + files, err := AllTorrentSpecs(d.cfg.Dirs) if err != nil { return err } @@ -606,7 +578,7 @@ func (d *Downloader) addTorrentFilesFromDisk() error { return nil } func (d *Downloader) BuildTorrentFilesIfNeed(ctx context.Context) error { - return BuildTorrentFilesIfNeed(ctx, d.SnapDir()) + return BuildTorrentFilesIfNeed(ctx, d.cfg.Dirs) } func (d *Downloader) Stats() AggStats { @@ -646,13 +618,12 @@ func (d *Downloader) StopSeeding(hash metainfo.Hash) error { func (d *Downloader) TorrentClient() *torrent.Client { return d.torrentClient } -func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletion, m storage.ClientImplCloser, torrentClient *torrent.Client, err error) { - snapDir := cfg.DataDir +func openClient(dbDir, snapDir string, cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletion, m storage.ClientImplCloser, torrentClient *torrent.Client, err error) { db, err = mdbx.NewMDBX(log.New()). Label(kv.DownloaderDB). WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.DownloaderTablesCfg }). SyncPeriod(15 * time.Second). - Path(filepath.Join(snapDir, "db")). + Path(dbDir). Open() if err != nil { return nil, nil, nil, nil, fmt.Errorf("torrentcfg.openClient: %w", err) @@ -664,13 +635,7 @@ func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletio m = storage.NewMMapWithCompletion(snapDir, c) cfg.DefaultStorage = m - for retry := 0; retry < 5; retry++ { - torrentClient, err = torrent.NewClient(cfg) - if err == nil { - break - } - time.Sleep(10 * time.Millisecond) - } + torrentClient, err = torrent.NewClient(cfg) if err != nil { return nil, nil, nil, nil, fmt.Errorf("torrent.NewClient: %w", err) } diff --git a/erigon-lib/downloader/downloadercfg/downloadercfg.go b/erigon-lib/downloader/downloadercfg/downloadercfg.go index 6b702b948..bbf31c9fc 100644 --- a/erigon-lib/downloader/downloadercfg/downloadercfg.go +++ b/erigon-lib/downloader/downloadercfg/downloadercfg.go @@ -46,10 +46,10 @@ const DefaultNetworkChunkSize = 512 * 1024 type Cfg struct { ClientConfig *torrent.ClientConfig - SnapDir string DownloadSlots int WebSeedUrls []*url.URL WebSeedFiles []string + Dirs datadir.Dirs } func Default() *torrent.ClientConfig { @@ -78,9 +78,9 @@ func Default() *torrent.ClientConfig { return torrentConfig } -func New(dataDir datadir.Dirs, version string, verbosity lg.Level, downloadRate, uploadRate datasize.ByteSize, port, connsPerFile, downloadSlots int, staticPeers []string, webseeds string) (*Cfg, error) { +func New(dirs datadir.Dirs, version string, verbosity lg.Level, downloadRate, uploadRate datasize.ByteSize, port, connsPerFile, downloadSlots int, staticPeers []string, webseeds string) (*Cfg, error) { torrentConfig := Default() - torrentConfig.DataDir = dataDir.Snap // `DataDir` of torrent-client-lib is different from Erigon's `DataDir`. Just same naming. + torrentConfig.DataDir = dirs.Snap // `DataDir` of torrent-client-lib is different from Erigon's `DataDir`. Just same naming. torrentConfig.ExtendedHandshakeClientVersion = version @@ -155,12 +155,12 @@ func New(dataDir datadir.Dirs, version string, verbosity lg.Level, downloadRate, } webseedUrls = append(webseedUrls, uri) } - localCfgFile := filepath.Join(dataDir.DataDir, "webseeds.toml") // datadir/webseeds.toml allowed + localCfgFile := filepath.Join(dirs.DataDir, "webseeds.toml") // datadir/webseeds.toml allowed if dir.FileExist(localCfgFile) { webseedFiles = append(webseedFiles, localCfgFile) } - return &Cfg{SnapDir: torrentConfig.DataDir, + return &Cfg{Dirs: dirs, ClientConfig: torrentConfig, DownloadSlots: downloadSlots, WebSeedUrls: webseedUrls, WebSeedFiles: webseedFiles, }, nil diff --git a/erigon-lib/downloader/util.go b/erigon-lib/downloader/util.go index 44ebd3628..9c966eb61 100644 --- a/erigon-lib/downloader/util.go +++ b/erigon-lib/downloader/util.go @@ -32,6 +32,7 @@ import ( "github.com/anacrolix/torrent/metainfo" common2 "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/cmp" + "github.com/ledgerwatch/erigon-lib/common/datadir" dir2 "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/erigon-lib/downloader/downloadercfg" "github.com/ledgerwatch/erigon-lib/downloader/snaptype" @@ -63,116 +64,42 @@ var Trackers = [][]string{ //websocketTrackers // TODO: Ws protocol producing too many errors and flooding logs. But it's also very fast and reactive. } -func AllTorrentPaths(dir string) ([]string, error) { - files, err := AllTorrentFiles(dir) - if err != nil { - return nil, err - } - histDir := filepath.Join(dir, "history") - files2, err := AllTorrentFiles(histDir) - if err != nil { - return nil, err - } - res := make([]string, 0, len(files)+len(files2)) - for _, f := range files { - torrentFilePath := filepath.Join(dir, f) - res = append(res, torrentFilePath) - } - for _, f := range files2 { - torrentFilePath := filepath.Join(histDir, f) - res = append(res, torrentFilePath) - } - return res, nil -} - -func AllTorrentFiles(dir string) ([]string, error) { - files, err := os.ReadDir(dir) - if err != nil { - return nil, err - } - res := make([]string, 0, len(files)) - for _, f := range files { - if filepath.Ext(f.Name()) != ".torrent" { // filter out only compressed files - continue - } - fileInfo, err := f.Info() - if err != nil { - return nil, err - } - if fileInfo.Size() == 0 { - continue - } - res = append(res, f.Name()) - } - return res, nil -} - func seedableSegmentFiles(dir string) ([]string, error) { - files, err := os.ReadDir(dir) + files, err := dir2.ListFiles(dir, ".seg") if err != nil { return nil, err } res := make([]string, 0, len(files)) - for _, f := range files { - if f.IsDir() { + for _, fPath := range files { + _, name := filepath.Split(fPath) + if !snaptype.IsCorrectFileName(name) { continue } - if !f.Type().IsRegular() { - continue - } - if !snaptype.IsCorrectFileName(f.Name()) { - continue - } - if filepath.Ext(f.Name()) != ".seg" { // filter out only compressed files - continue - } - ff, ok := snaptype.ParseFileName(dir, f.Name()) + ff, ok := snaptype.ParseFileName(dir, name) if !ok { continue } if !ff.Seedable() { continue } - res = append(res, f.Name()) + res = append(res, name) } return res, nil } var historyFileRegex = regexp.MustCompile("^([[:lower:]]+).([0-9]+)-([0-9]+).(.*)$") -func seedableHistorySnapshots(dir string) ([]string, error) { - l, err := seedableSnapshotsBySubDir(dir, "history") - if err != nil { - return nil, err - } - l2, err := seedableSnapshotsBySubDir(dir, "warm") - if err != nil { - return nil, err - } - return append(l, l2...), nil -} - func seedableSnapshotsBySubDir(dir, subDir string) ([]string, error) { historyDir := filepath.Join(dir, subDir) dir2.MustExist(historyDir) - files, err := os.ReadDir(historyDir) + files, err := dir2.ListFiles(historyDir, ".kv", ".v", ".ef") if err != nil { return nil, err } res := make([]string, 0, len(files)) - for _, f := range files { - if f.IsDir() { - continue - } - if !f.Type().IsRegular() { - continue - } - ext := filepath.Ext(f.Name()) - if ext != ".v" && ext != ".ef" { // filter out only compressed files - continue - } - - subs := historyFileRegex.FindStringSubmatch(f.Name()) + for _, fPath := range files { + _, name := filepath.Split(fPath) + subs := historyFileRegex.FindStringSubmatch(name) if len(subs) != 5 { continue } @@ -188,7 +115,7 @@ func seedableSnapshotsBySubDir(dir, subDir string) ([]string, error) { if (to-from)%snaptype.Erigon3SeedableSteps != 0 { continue } - res = append(res, filepath.Join(subDir, f.Name())) + res = append(res, filepath.Join(subDir, name)) } return res, nil } @@ -239,11 +166,11 @@ func BuildTorrentIfNeed(ctx context.Context, fName, root string) (torrentFilePat } // BuildTorrentFilesIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually -func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) error { +func BuildTorrentFilesIfNeed(ctx context.Context, dirs datadir.Dirs) error { logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() - files, err := seedableFiles(snapDir) + files, err := seedableFiles(dirs) if err != nil { return err } @@ -256,7 +183,7 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) error { file := file g.Go(func() error { defer i.Add(1) - if _, err := BuildTorrentIfNeed(ctx, file, snapDir); err != nil { + if _, err := BuildTorrentIfNeed(ctx, file, dirs.Snap); err != nil { return err } return nil @@ -327,37 +254,26 @@ func CreateTorrentFileFromInfo(root string, info *metainfo.Info, mi *metainfo.Me return CreateTorrentFromMetaInfo(root, info, mi) } -func allTorrentFiles(snapDir string) (res []*torrent.TorrentSpec, err error) { - res, err = torrentInDir(snapDir) +func AllTorrentPaths(dirs datadir.Dirs) ([]string, error) { + files, err := dir2.ListFiles(dirs.Snap, ".torrent") if err != nil { return nil, err } - res2, err := torrentInDir(filepath.Join(snapDir, "history")) + files2, err := dir2.ListFiles(dirs.SnapHistory, ".torrent") if err != nil { return nil, err } - res = append(res, res2...) - res2, err = torrentInDir(filepath.Join(snapDir, "warm")) - if err != nil { - return nil, err - } - res = append(res, res2...) - return res, nil + files = append(files, files2...) + return files, nil } -func torrentInDir(snapDir string) (res []*torrent.TorrentSpec, err error) { - files, err := os.ReadDir(snapDir) + +func AllTorrentSpecs(dirs datadir.Dirs) (res []*torrent.TorrentSpec, err error) { + files, err := AllTorrentPaths(dirs) if err != nil { return nil, err } - for _, f := range files { - if f.IsDir() || !f.Type().IsRegular() { - continue - } - if filepath.Ext(f.Name()) != ".torrent" { // filter out only compressed files - continue - } - - a, err := loadTorrent(filepath.Join(snapDir, f.Name())) + for _, fPath := range files { + a, err := loadTorrent(fPath) if err != nil { return nil, err } diff --git a/erigon-lib/go.mod b/erigon-lib/go.mod index ec77de05c..7173eb248 100644 --- a/erigon-lib/go.mod +++ b/erigon-lib/go.mod @@ -21,6 +21,7 @@ require ( github.com/deckarep/golang-set/v2 v2.3.1 github.com/edsrzf/mmap-go v1.1.0 github.com/go-stack/stack v1.8.1 + github.com/gofrs/flock v0.8.1 github.com/google/btree v1.1.2 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 github.com/hashicorp/golang-lru/v2 v2.0.6 diff --git a/erigon-lib/go.sum b/erigon-lib/go.sum index f61b512b3..dead3943d 100644 --- a/erigon-lib/go.sum +++ b/erigon-lib/go.sum @@ -158,6 +158,8 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw= github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= +github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= diff --git a/node/errors.go b/node/errors.go index c6ac5c7aa..bbde51072 100644 --- a/node/errors.go +++ b/node/errors.go @@ -20,24 +20,13 @@ import ( "errors" "fmt" "reflect" - "syscall" ) var ( - ErrDataDirUsed = errors.New("datadir already used by another process") ErrNodeStopped = errors.New("node not started") ErrNodeRunning = errors.New("node already running") - - datadirInUseErrNos = map[uint]bool{11: true, 32: true, 35: true} ) -func convertFileLockError(err error) error { - if errno, ok := err.(syscall.Errno); ok && datadirInUseErrNos[uint(errno)] { - return ErrDataDirUsed - } - return err -} - // StopError is returned if a Node fails to stop either any of its registered // services or itself. type StopError struct { diff --git a/node/node.go b/node/node.go index 06ba60181..04a5dbbb7 100644 --- a/node/node.go +++ b/node/node.go @@ -20,25 +20,28 @@ import ( "context" "errors" "fmt" - "os" "path/filepath" "reflect" "strings" "sync" + "time" "github.com/c2h5oh/datasize" + "github.com/ledgerwatch/erigon-lib/common/datadir" + "golang.org/x/sync/semaphore" + "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/erigon/node/nodecfg" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/debug" - "golang.org/x/sync/semaphore" "github.com/gofrs/flock" + "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon-lib/kv/memdb" "github.com/ledgerwatch/erigon/migrations" - "github.com/ledgerwatch/log/v3" ) // Node is a container on which services can be registered. @@ -63,7 +66,7 @@ const ( ) // New creates a new P2P node, ready for protocol registration. -func New(conf *nodecfg.Config, logger log.Logger) (*Node, error) { +func New(ctx context.Context, conf *nodecfg.Config, logger log.Logger) (*Node, error) { // Copy config and resolve the datadir so future changes to the current // working directory don't affect the node. confCopy := *conf @@ -86,7 +89,7 @@ func New(conf *nodecfg.Config, logger log.Logger) (*Node, error) { } // Acquire the instance directory lock. - if err := node.openDataDir(); err != nil { + if err := node.openDataDir(ctx); err != nil { return nil, err } @@ -221,27 +224,35 @@ func (n *Node) stopServices(running []Lifecycle) error { return nil } -func (n *Node) openDataDir() error { +func (n *Node) openDataDir(ctx context.Context) error { if n.config.Dirs.DataDir == "" { return nil // ephemeral } instdir := n.config.Dirs.DataDir - if err := os.MkdirAll(instdir, 0700); err != nil { + if err := datadir.ApplyMigrations(n.config.Dirs); err != nil { return err } - // Lock the instance directory to prevent concurrent use by another instance as well as - // accidental use of the instance directory as a database. - l := flock.New(filepath.Join(instdir, "LOCK")) - - locked, err := l.TryLock() - if err != nil { - return convertFileLockError(err) + for retry := 0; ; retry++ { + l, locked, err := datadir.Flock(n.config.Dirs) + if err != nil { + return err + } + if !locked { + if retry >= 10 { + return fmt.Errorf("%w: %s", datadir.ErrDataDirLocked, instdir) + } + log.Error(datadir.ErrDataDirLocked.Error() + ", retry in 2 sec") + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(2 * time.Second): + } + continue + } + n.dirLock = l + break } - if !locked { - return fmt.Errorf("%w: %s", ErrDataDirUsed, instdir) - } - n.dirLock = l return nil } diff --git a/node/node_example_test.go b/node/node_example_test.go index 82d42723b..17cdfd6b4 100644 --- a/node/node_example_test.go +++ b/node/node_example_test.go @@ -17,6 +17,7 @@ package node_test import ( + "context" "fmt" log2 "log" @@ -38,7 +39,7 @@ func (s *SampleLifecycle) Stop() error { fmt.Println("Service stopping..."); re func ExampleLifecycle() { // Create a network node to run protocols with the default values. - stack, err := node.New(&nodecfg.Config{}, log.New()) + stack, err := node.New(context.Background(), &nodecfg.Config{}, log.New()) if err != nil { log2.Fatalf("Failed to create network node: %v", err) } diff --git a/node/node_test.go b/node/node_test.go index b2d8c7f2c..3ed613650 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -50,7 +50,7 @@ func TestNodeCloseMultipleTimes(t *testing.T) { t.Skip("fix me on win please") } - stack, err := New(testNodeConfig(t), log.New()) + stack, err := New(context.Background(), testNodeConfig(t), log.New()) if err != nil { t.Fatalf("failed to create protocol stack: %v", err) } @@ -69,7 +69,7 @@ func TestNodeStartMultipleTimes(t *testing.T) { t.Skip("fix me on win please") } - stack, err := New(testNodeConfig(t), log.New()) + stack, err := New(context.Background(), testNodeConfig(t), log.New()) if err != nil { t.Fatalf("failed to create protocol stack: %v", err) } @@ -100,7 +100,7 @@ func TestNodeUsedDataDir(t *testing.T) { dir := t.TempDir() // Create a new node based on the data directory - original, originalErr := New(&nodecfg.Config{Dirs: datadir.New(dir)}, log.New()) + original, originalErr := New(context.Background(), &nodecfg.Config{Dirs: datadir.New(dir)}, log.New()) if originalErr != nil { t.Fatalf("failed to create original protocol stack: %v", originalErr) } @@ -110,14 +110,14 @@ func TestNodeUsedDataDir(t *testing.T) { } // Create a second node based on the same data directory and ensure failure - if _, err := New(&nodecfg.Config{Dirs: datadir.New(dir)}, log.New()); !errors.Is(err, ErrDataDirUsed) { - t.Fatalf("duplicate datadir failure mismatch: have %v, want %v", err, ErrDataDirUsed) + if _, err := New(context.Background(), &nodecfg.Config{Dirs: datadir.New(dir)}, log.New()); !errors.Is(err, datadir.ErrDataDirLocked) { + t.Fatalf("duplicate datadir failure mismatch: have %v, want %v", err, datadir.ErrDataDirLocked) } } // Tests whether a Lifecycle can be registered. func TestLifecycleRegistry_Successful(t *testing.T) { - stack, err := New(testNodeConfig(t), log.New()) + stack, err := New(context.Background(), testNodeConfig(t), log.New()) if err != nil { t.Fatalf("failed to create protocol stack: %v", err) } @@ -144,7 +144,7 @@ func TestNodeCloseClosesDB(t *testing.T) { } logger := log.New() - stack, _ := New(testNodeConfig(t), logger) + stack, _ := New(context.Background(), testNodeConfig(t), logger) defer stack.Close() db, err := OpenDatabase(stack.Config(), kv.SentryDB, "", false, logger) @@ -172,7 +172,7 @@ func TestNodeOpenDatabaseFromLifecycleStart(t *testing.T) { } logger := log.New() - stack, err := New(testNodeConfig(t), logger) + stack, err := New(context.Background(), testNodeConfig(t), logger) require.NoError(t, err) defer stack.Close() @@ -200,7 +200,7 @@ func TestNodeOpenDatabaseFromLifecycleStop(t *testing.T) { } logger := log.New() - stack, _ := New(testNodeConfig(t), logger) + stack, _ := New(context.Background(), testNodeConfig(t), logger) defer stack.Close() stack.RegisterLifecycle(&InstrumentedService{ @@ -219,7 +219,7 @@ func TestNodeOpenDatabaseFromLifecycleStop(t *testing.T) { // Tests that registered Lifecycles get started and stopped correctly. func TestLifecycleLifeCycle(t *testing.T) { - stack, _ := New(testNodeConfig(t), log.New()) + stack, _ := New(context.Background(), testNodeConfig(t), log.New()) defer stack.Close() started := make(map[string]bool) @@ -274,7 +274,7 @@ func TestLifecycleStartupError(t *testing.T) { t.Skip("fix me on win please") } - stack, err := New(testNodeConfig(t), log.New()) + stack, err := New(context.Background(), testNodeConfig(t), log.New()) if err != nil { t.Fatalf("failed to create protocol stack: %v", err) } @@ -324,7 +324,7 @@ func TestLifecycleStartupError(t *testing.T) { // Tests that even if a registered Lifecycle fails to shut down cleanly, it does // not influence the rest of the shutdown invocations. func TestLifecycleTerminationGuarantee(t *testing.T) { - stack, err := New(testNodeConfig(t), log.New()) + stack, err := New(context.Background(), testNodeConfig(t), log.New()) if err != nil { t.Fatalf("failed to create protocol stack: %v", err) } diff --git a/node/nodecfg/config_test.go b/node/nodecfg/config_test.go index fe1e4d10a..51284fe23 100644 --- a/node/nodecfg/config_test.go +++ b/node/nodecfg/config_test.go @@ -17,6 +17,7 @@ package nodecfg_test import ( + "context" "os" "path/filepath" "runtime" @@ -36,7 +37,7 @@ func TestDataDirCreation(t *testing.T) { } // Create a temporary data dir and check that it can be used by a node dir := t.TempDir() - node, err := node2.New(&nodecfg.Config{Dirs: datadir.New(dir)}, log.New()) + node, err := node2.New(context.Background(), &nodecfg.Config{Dirs: datadir.New(dir)}, log.New()) if err != nil { t.Fatalf("failed to create stack with existing datadir: %v", err) } @@ -45,7 +46,7 @@ func TestDataDirCreation(t *testing.T) { } // Generate a long non-existing datadir path and check that it gets created by a node dir = filepath.Join(dir, "a", "b", "c", "d", "e", "f") - node, err = node2.New(&nodecfg.Config{Dirs: datadir.New(dir)}, log.New()) + node, err = node2.New(context.Background(), &nodecfg.Config{Dirs: datadir.New(dir)}, log.New()) if err != nil { t.Fatalf("failed to create stack with creatable datadir: %v", err) } @@ -61,16 +62,6 @@ func TestDataDirCreation(t *testing.T) { t.Fatalf("failed to create temporary file: %v", err) } defer os.Remove(file.Name()) - - dir = filepath.Join(file.Name(), "invalid/path") - node, err = node2.New(&nodecfg.Config{Dirs: datadir.New(dir)}, log.New()) - if err == nil { - t.Fatalf("protocol stack created with an invalid datadir") - if err := node.Close(); err != nil { - t.Fatalf("failed to close node: %v", err) - } - } - _ = node } // Tests that IPC paths are correctly resolved to valid endpoints of different diff --git a/tests/bor/helper/miner.go b/tests/bor/helper/miner.go index 14abe7adc..df682f3ca 100644 --- a/tests/bor/helper/miner.go +++ b/tests/bor/helper/miner.go @@ -1,6 +1,7 @@ package helper import ( + "context" "crypto/ecdsa" "encoding/json" "math/big" @@ -92,7 +93,7 @@ func InitMiner(genesis *types.Genesis, privKey *ecdsa.PrivateKey, withoutHeimdal MdbxDBSizeLimit: 64 * datasize.MB, } - stack, err := node.New(nodeCfg, logger) + stack, err := node.New(context.Background(), nodeCfg, logger) if err != nil { return nil, nil, err } diff --git a/turbo/app/import_cmd.go b/turbo/app/import_cmd.go index 752836c8c..4733cccc9 100644 --- a/turbo/app/import_cmd.go +++ b/turbo/app/import_cmd.go @@ -62,7 +62,7 @@ func importChain(cliCtx *cli.Context) error { nodeCfg := turboNode.NewNodConfigUrfave(cliCtx, logger) ethCfg := turboNode.NewEthConfigUrfave(cliCtx, nodeCfg, logger) - stack := makeConfigNode(nodeCfg, logger) + stack := makeConfigNode(cliCtx.Context, nodeCfg, logger) defer stack.Close() ethereum, err := eth.New(stack, ethCfg, logger) diff --git a/turbo/app/make_app.go b/turbo/app/make_app.go index de25969d4..24267f568 100644 --- a/turbo/app/make_app.go +++ b/turbo/app/make_app.go @@ -2,6 +2,7 @@ package app import ( + "context" "fmt" "strings" @@ -132,12 +133,12 @@ func NewNodeConfig(ctx *cli.Context) *nodecfg.Config { return &nodeConfig } -func MakeConfigNodeDefault(ctx *cli.Context, logger log.Logger) *node.Node { - return makeConfigNode(NewNodeConfig(ctx), logger) +func MakeConfigNodeDefault(cliCtx *cli.Context, logger log.Logger) *node.Node { + return makeConfigNode(cliCtx.Context, NewNodeConfig(cliCtx), logger) } -func makeConfigNode(config *nodecfg.Config, logger log.Logger) *node.Node { - stack, err := node.New(config, logger) +func makeConfigNode(ctx context.Context, config *nodecfg.Config, logger log.Logger) *node.Node { + stack, err := node.New(ctx, config, logger) if err != nil { utils.Fatalf("Failed to create Erigon node: %v", err) } diff --git a/turbo/node/node.go b/turbo/node/node.go index 43cc17d9b..81ce2ae5e 100644 --- a/turbo/node/node.go +++ b/turbo/node/node.go @@ -2,6 +2,8 @@ package node import ( + "context" + "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/log/v3" "github.com/urfave/cli/v2" @@ -106,12 +108,13 @@ func NewEthConfigUrfave(ctx *cli.Context, nodeConfig *nodecfg.Config, logger log // * sync - `stagedsync.StagedSync`, an instance of staged sync, setup just as needed. // * optionalParams - additional parameters for running a node. func New( + ctx context.Context, nodeConfig *nodecfg.Config, ethConfig *ethconfig.Config, logger log.Logger, ) (*ErigonNode, error) { //prepareBuckets(optionalParams.CustomBuckets) - node, err := node.New(nodeConfig, logger) + node, err := node.New(ctx, nodeConfig, logger) if err != nil { utils.Fatalf("Failed to create Erigon node: %v", err) }