From 7dd678896ae68fcadb996f07b6146cae6fab8a85 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Thu, 5 Oct 2023 14:25:00 +0700 Subject: [PATCH] downloader: move from `snapshots/db` to `snapshots/downloader` (#8375) --- cmd/downloader/main.go | 28 +++++++++---- cmd/erigon/main.go | 5 +++ cmd/integration/commands/stages.go | 9 ++++ cmd/integration/commands/state_stages.go | 4 ++ erigon-lib/common/datadir/dirs.go | 52 ++++++++++-------------- erigon-lib/downloader/downloader.go | 2 + node/node.go | 5 ++- 7 files changed, 63 insertions(+), 42 deletions(-) diff --git a/cmd/downloader/main.go b/cmd/downloader/main.go index a437b53c6..4e88f436c 100644 --- a/cmd/downloader/main.go +++ b/cmd/downloader/main.go @@ -11,23 +11,21 @@ import ( "strings" "time" - "github.com/ledgerwatch/erigon-lib/common/dir" - "github.com/ledgerwatch/erigon-lib/downloader/snaptype" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/ledgerwatch/erigon/cmd/hack/tool" - "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapcfg" - "github.com/pelletier/go-toml/v2" - "github.com/c2h5oh/datasize" + mdbx2 "github.com/erigontech/mdbx-go/mdbx" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/datadir" + "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/erigon-lib/downloader" downloadercfg2 "github.com/ledgerwatch/erigon-lib/downloader/downloadercfg" + "github.com/ledgerwatch/erigon-lib/downloader/snaptype" proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/log/v3" + "github.com/pelletier/go-toml/v2" "github.com/spf13/cobra" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -37,12 +35,14 @@ import ( "google.golang.org/grpc/reflection" "github.com/ledgerwatch/erigon/cmd/downloader/downloadernat" + "github.com/ledgerwatch/erigon/cmd/hack/tool" "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/erigon/common/paths" "github.com/ledgerwatch/erigon/p2p/nat" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/debug" "github.com/ledgerwatch/erigon/turbo/logging" + "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapcfg" ) func main() { @@ -146,6 +146,9 @@ var rootCmd = &cobra.Command{ func Downloader(ctx context.Context, logger log.Logger) error { dirs := datadir.New(datadirCli) + if err := datadir.ApplyMigrations(dirs); err != nil { + return err + } if err := checkChainName(dirs, chain); err != nil { return err } @@ -243,6 +246,10 @@ var printTorrentHashes = &cobra.Command{ func doPrintTorrentHashes(ctx context.Context, logger log.Logger) error { dirs := datadir.New(datadirCli) + if err := datadir.ApplyMigrations(dirs); err != nil { + return err + } + if forceRebuild { // remove and create .torrent files (will re-read all snapshots) //removePieceCompletionStorage(snapDir) files, err := downloader.AllTorrentPaths(dirs) @@ -371,7 +378,10 @@ func checkChainName(dirs datadir.Dirs, chainName string) error { if !dir.FileExist(filepath.Join(dirs.Chaindata, "mdbx.dat")) { return nil } - db := mdbx.NewMDBX(log.New()).Path(dirs.Chaindata).Label(kv.ChainDB).MustOpen() + db := mdbx.NewMDBX(log.New()). + Path(dirs.Chaindata).Label(kv.ChainDB). + Flags(func(flags uint) uint { return flags | mdbx2.Accede }). + MustOpen() defer db.Close() if err := db.View(context.Background(), func(tx kv.Tx) error { cc := tool.ChainConfig(tx) diff --git a/cmd/erigon/main.go b/cmd/erigon/main.go index 135d72c76..24022e8d6 100644 --- a/cmd/erigon/main.go +++ b/cmd/erigon/main.go @@ -9,6 +9,7 @@ import ( "reflect" "strings" + "github.com/ledgerwatch/erigon-lib/common/datadir" "github.com/ledgerwatch/erigon-lib/common/dbg" "github.com/ledgerwatch/erigon/diagnostics" "github.com/ledgerwatch/erigon/metrics" @@ -70,6 +71,10 @@ func runErigon(cliCtx *cli.Context) error { erigonInfoGauge.Set(1) nodeCfg := node.NewNodConfigUrfave(cliCtx, logger) + if err := datadir.ApplyMigrations(nodeCfg.Dirs); err != nil { + return err + } + ethCfg := node.NewEthConfigUrfave(cliCtx, nodeCfg, logger) ethNode, err := node.New(cliCtx.Context, nodeCfg, ethCfg, logger) diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 70855cb1f..218e3ce49 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -634,6 +634,11 @@ func stageSnapshots(db kv.RwDB, ctx context.Context, logger log.Logger) error { } func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error { + dirs := datadir.New(datadirCli) + if err := datadir.ApplyMigrations(dirs); err != nil { + return err + } + sn, borSn, agg := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() @@ -852,6 +857,10 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error { func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error { dirs := datadir.New(datadirCli) + if err := datadir.ApplyMigrations(dirs); err != nil { + return err + } + engine, vmConfig, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger) must(sync.SetCurrentStage(stages.Execution)) sn, borSn, agg := allSnapshots(ctx, db, logger) diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go index 650b5985d..266836850 100644 --- a/cmd/integration/commands/state_stages.go +++ b/cmd/integration/commands/state_stages.go @@ -169,6 +169,10 @@ func init() { func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.Context, logger1 log.Logger) error { dirs := datadir.New(datadirCli) + if err := datadir.ApplyMigrations(dirs); err != nil { + return err + } + sn, borSn, agg := allSnapshots(ctx, db, logger1) defer sn.Close() defer borSn.Close() diff --git a/erigon-lib/common/datadir/dirs.go b/erigon-lib/common/datadir/dirs.go index 736dea559..85c10b063 100644 --- a/erigon-lib/common/datadir/dirs.go +++ b/erigon-lib/common/datadir/dirs.go @@ -67,7 +67,7 @@ func New(datadir string) Dirs { SnapHistory: filepath.Join(datadir, "snapshots", "history"), SnapDomain: filepath.Join(datadir, "snapshots", "domain"), SnapAccessors: filepath.Join(datadir, "snapshots", "accessor"), - Downloader: filepath.Join(datadir, "snapshots", "db"), + Downloader: filepath.Join(datadir, "downloader"), TxPool: filepath.Join(datadir, "txpool"), Nodes: filepath.Join(datadir, "nodes"), } @@ -91,7 +91,7 @@ func convertFileLockError(err error) error { return err } -func Flock(dirs Dirs) (*flock.Flock, bool, error) { +func TryFlock(dirs Dirs) (*flock.Flock, bool, error) { // Lock the instance directory to prevent concurrent use by another instance as well as // accidental use of the instance directory as a database. l := flock.New(filepath.Join(dirs.DataDir, "LOCK")) @@ -104,7 +104,12 @@ func Flock(dirs Dirs) (*flock.Flock, bool, error) { // ApplyMigrations - if can get flock. func ApplyMigrations(dirs Dirs) error { - lock, locked, err := Flock(dirs) + need := downloaderV2MigrationNeeded(dirs) + if !need { + return nil + } + + lock, locked, err := TryFlock(dirs) if err != nil { return err } @@ -113,43 +118,28 @@ func ApplyMigrations(dirs Dirs) error { } defer lock.Unlock() + // add your migration here + if err := downloaderV2Migration(dirs); err != nil { return err } - //if err := erigonV3foldersV31Migration(dirs); err != nil { - // return err - //} return nil } +func downloaderV2MigrationNeeded(dirs Dirs) bool { + return dir.FileExist(filepath.Join(dirs.Snap, "db", "mdbx.dat")) +} func downloaderV2Migration(dirs Dirs) error { // move db from `datadir/snapshot/db` to `datadir/downloader` - if dir.Exist(filepath.Join(dirs.Snap, "db", "mdbx.dat")) { // migration from prev versions - from, to := filepath.Join(dirs.Snap, "db", "mdbx.dat"), filepath.Join(dirs.Downloader, "mdbx.dat") - if err := os.Rename(from, to); err != nil { - //fall back to copy-file if folders are on different disks - if err := copyFile(from, to); err != nil { - return err - } - } + if !downloaderV2MigrationNeeded(dirs) { + return nil } - return nil -} - -// nolint -func erigonV3foldersV31Migration(dirs Dirs) error { - // migrate files db from `datadir/snapshot/warm` to `datadir/snapshots/domain` - if dir.Exist(filepath.Join(dirs.Snap, "warm")) { - warmDir := filepath.Join(dirs.Snap, "warm") - moveFiles(warmDir, dirs.SnapDomain, ".kv") - os.Rename(filepath.Join(dirs.SnapHistory, "salt.txt"), filepath.Join(dirs.Snap, "salt.txt")) - moveFiles(warmDir, dirs.SnapDomain, ".kv") - moveFiles(warmDir, dirs.SnapDomain, ".kvei") - moveFiles(warmDir, dirs.SnapDomain, ".bt") - moveFiles(dirs.SnapHistory, dirs.SnapAccessors, ".vi") - moveFiles(dirs.SnapHistory, dirs.SnapAccessors, ".efi") - moveFiles(dirs.SnapHistory, dirs.SnapAccessors, ".efei") - moveFiles(dirs.SnapHistory, dirs.SnapIdx, ".ef") + from, to := filepath.Join(dirs.Snap, "db", "mdbx.dat"), filepath.Join(dirs.Downloader, "mdbx.dat") + if err := os.Rename(from, to); err != nil { + //fall back to copy-file if folders are on different disks + if err := copyFile(from, to); err != nil { + return err + } } return nil } diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index e9989a961..0d0828f5c 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -29,6 +29,7 @@ import ( "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/storage" + "github.com/c2h5oh/datasize" "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/datadir" "github.com/ledgerwatch/erigon-lib/common/dbg" @@ -634,6 +635,7 @@ func openClient(dbDir, snapDir string, cfg *torrent.ClientConfig) (db kv.RwDB, c Label(kv.DownloaderDB). WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.DownloaderTablesCfg }). SyncPeriod(15 * time.Second). + GrowthStep(16 * datasize.MB). Path(dbDir). Open() if err != nil { diff --git a/node/node.go b/node/node.go index 04a5dbbb7..2b313f2e3 100644 --- a/node/node.go +++ b/node/node.go @@ -234,7 +234,7 @@ func (n *Node) openDataDir(ctx context.Context) error { return err } for retry := 0; ; retry++ { - l, locked, err := datadir.Flock(n.config.Dirs) + l, locked, err := datadir.TryFlock(n.config.Dirs) if err != nil { return err } @@ -324,8 +324,9 @@ func OpenDatabase(config *nodecfg.Config, label kv.Label, name string, readonly roTxLimit = int64(config.Http.DBReadConcurrency) } roTxsLimiter := semaphore.NewWeighted(roTxLimit) // 1 less than max to allow unlocking to happen - opts := mdbx.NewMDBX(log.Root()). + opts := mdbx.NewMDBX(logger). Path(dbPath).Label(label). + GrowthStep(16 * datasize.MB). DBVerbosity(config.DatabaseVerbosity).RoTxsLimiter(roTxsLimiter) if readonly {