mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
downloader: move from snapshots/db
to snapshots/downloader
(#8375)
This commit is contained in:
parent
8850f3a76d
commit
7dd678896a
@ -11,23 +11,21 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ledgerwatch/erigon-lib/common/dir"
|
||||
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
|
||||
"github.com/ledgerwatch/erigon/cmd/hack/tool"
|
||||
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapcfg"
|
||||
"github.com/pelletier/go-toml/v2"
|
||||
|
||||
"github.com/c2h5oh/datasize"
|
||||
mdbx2 "github.com/erigontech/mdbx-go/mdbx"
|
||||
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
||||
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
|
||||
"github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon-lib/common/datadir"
|
||||
"github.com/ledgerwatch/erigon-lib/common/dir"
|
||||
"github.com/ledgerwatch/erigon-lib/downloader"
|
||||
downloadercfg2 "github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
|
||||
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
|
||||
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/pelletier/go-toml/v2"
|
||||
"github.com/spf13/cobra"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
@ -37,12 +35,14 @@ import (
|
||||
"google.golang.org/grpc/reflection"
|
||||
|
||||
"github.com/ledgerwatch/erigon/cmd/downloader/downloadernat"
|
||||
"github.com/ledgerwatch/erigon/cmd/hack/tool"
|
||||
"github.com/ledgerwatch/erigon/cmd/utils"
|
||||
"github.com/ledgerwatch/erigon/common/paths"
|
||||
"github.com/ledgerwatch/erigon/p2p/nat"
|
||||
"github.com/ledgerwatch/erigon/params"
|
||||
"github.com/ledgerwatch/erigon/turbo/debug"
|
||||
"github.com/ledgerwatch/erigon/turbo/logging"
|
||||
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapcfg"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@ -146,6 +146,9 @@ var rootCmd = &cobra.Command{
|
||||
|
||||
func Downloader(ctx context.Context, logger log.Logger) error {
|
||||
dirs := datadir.New(datadirCli)
|
||||
if err := datadir.ApplyMigrations(dirs); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := checkChainName(dirs, chain); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -243,6 +246,10 @@ var printTorrentHashes = &cobra.Command{
|
||||
|
||||
func doPrintTorrentHashes(ctx context.Context, logger log.Logger) error {
|
||||
dirs := datadir.New(datadirCli)
|
||||
if err := datadir.ApplyMigrations(dirs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if forceRebuild { // remove and create .torrent files (will re-read all snapshots)
|
||||
//removePieceCompletionStorage(snapDir)
|
||||
files, err := downloader.AllTorrentPaths(dirs)
|
||||
@ -371,7 +378,10 @@ func checkChainName(dirs datadir.Dirs, chainName string) error {
|
||||
if !dir.FileExist(filepath.Join(dirs.Chaindata, "mdbx.dat")) {
|
||||
return nil
|
||||
}
|
||||
db := mdbx.NewMDBX(log.New()).Path(dirs.Chaindata).Label(kv.ChainDB).MustOpen()
|
||||
db := mdbx.NewMDBX(log.New()).
|
||||
Path(dirs.Chaindata).Label(kv.ChainDB).
|
||||
Flags(func(flags uint) uint { return flags | mdbx2.Accede }).
|
||||
MustOpen()
|
||||
defer db.Close()
|
||||
if err := db.View(context.Background(), func(tx kv.Tx) error {
|
||||
cc := tool.ChainConfig(tx)
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/ledgerwatch/erigon-lib/common/datadir"
|
||||
"github.com/ledgerwatch/erigon-lib/common/dbg"
|
||||
"github.com/ledgerwatch/erigon/diagnostics"
|
||||
"github.com/ledgerwatch/erigon/metrics"
|
||||
@ -70,6 +71,10 @@ func runErigon(cliCtx *cli.Context) error {
|
||||
erigonInfoGauge.Set(1)
|
||||
|
||||
nodeCfg := node.NewNodConfigUrfave(cliCtx, logger)
|
||||
if err := datadir.ApplyMigrations(nodeCfg.Dirs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ethCfg := node.NewEthConfigUrfave(cliCtx, nodeCfg, logger)
|
||||
|
||||
ethNode, err := node.New(cliCtx.Context, nodeCfg, ethCfg, logger)
|
||||
|
@ -634,6 +634,11 @@ func stageSnapshots(db kv.RwDB, ctx context.Context, logger log.Logger) error {
|
||||
}
|
||||
|
||||
func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
|
||||
dirs := datadir.New(datadirCli)
|
||||
if err := datadir.ApplyMigrations(dirs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sn, borSn, agg := allSnapshots(ctx, db, logger)
|
||||
defer sn.Close()
|
||||
defer borSn.Close()
|
||||
@ -852,6 +857,10 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
|
||||
|
||||
func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
|
||||
dirs := datadir.New(datadirCli)
|
||||
if err := datadir.ApplyMigrations(dirs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
engine, vmConfig, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
|
||||
must(sync.SetCurrentStage(stages.Execution))
|
||||
sn, borSn, agg := allSnapshots(ctx, db, logger)
|
||||
|
@ -169,6 +169,10 @@ func init() {
|
||||
|
||||
func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.Context, logger1 log.Logger) error {
|
||||
dirs := datadir.New(datadirCli)
|
||||
if err := datadir.ApplyMigrations(dirs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sn, borSn, agg := allSnapshots(ctx, db, logger1)
|
||||
defer sn.Close()
|
||||
defer borSn.Close()
|
||||
|
@ -67,7 +67,7 @@ func New(datadir string) Dirs {
|
||||
SnapHistory: filepath.Join(datadir, "snapshots", "history"),
|
||||
SnapDomain: filepath.Join(datadir, "snapshots", "domain"),
|
||||
SnapAccessors: filepath.Join(datadir, "snapshots", "accessor"),
|
||||
Downloader: filepath.Join(datadir, "snapshots", "db"),
|
||||
Downloader: filepath.Join(datadir, "downloader"),
|
||||
TxPool: filepath.Join(datadir, "txpool"),
|
||||
Nodes: filepath.Join(datadir, "nodes"),
|
||||
}
|
||||
@ -91,7 +91,7 @@ func convertFileLockError(err error) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func Flock(dirs Dirs) (*flock.Flock, bool, error) {
|
||||
func TryFlock(dirs Dirs) (*flock.Flock, bool, error) {
|
||||
// Lock the instance directory to prevent concurrent use by another instance as well as
|
||||
// accidental use of the instance directory as a database.
|
||||
l := flock.New(filepath.Join(dirs.DataDir, "LOCK"))
|
||||
@ -104,7 +104,12 @@ func Flock(dirs Dirs) (*flock.Flock, bool, error) {
|
||||
|
||||
// ApplyMigrations - if can get flock.
|
||||
func ApplyMigrations(dirs Dirs) error {
|
||||
lock, locked, err := Flock(dirs)
|
||||
need := downloaderV2MigrationNeeded(dirs)
|
||||
if !need {
|
||||
return nil
|
||||
}
|
||||
|
||||
lock, locked, err := TryFlock(dirs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -113,43 +118,28 @@ func ApplyMigrations(dirs Dirs) error {
|
||||
}
|
||||
defer lock.Unlock()
|
||||
|
||||
// add your migration here
|
||||
|
||||
if err := downloaderV2Migration(dirs); err != nil {
|
||||
return err
|
||||
}
|
||||
//if err := erigonV3foldersV31Migration(dirs); err != nil {
|
||||
// return err
|
||||
//}
|
||||
return nil
|
||||
}
|
||||
|
||||
func downloaderV2MigrationNeeded(dirs Dirs) bool {
|
||||
return dir.FileExist(filepath.Join(dirs.Snap, "db", "mdbx.dat"))
|
||||
}
|
||||
func downloaderV2Migration(dirs Dirs) error {
|
||||
// move db from `datadir/snapshot/db` to `datadir/downloader`
|
||||
if dir.Exist(filepath.Join(dirs.Snap, "db", "mdbx.dat")) { // migration from prev versions
|
||||
from, to := filepath.Join(dirs.Snap, "db", "mdbx.dat"), filepath.Join(dirs.Downloader, "mdbx.dat")
|
||||
if err := os.Rename(from, to); err != nil {
|
||||
//fall back to copy-file if folders are on different disks
|
||||
if err := copyFile(from, to); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if !downloaderV2MigrationNeeded(dirs) {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// nolint
|
||||
func erigonV3foldersV31Migration(dirs Dirs) error {
|
||||
// migrate files db from `datadir/snapshot/warm` to `datadir/snapshots/domain`
|
||||
if dir.Exist(filepath.Join(dirs.Snap, "warm")) {
|
||||
warmDir := filepath.Join(dirs.Snap, "warm")
|
||||
moveFiles(warmDir, dirs.SnapDomain, ".kv")
|
||||
os.Rename(filepath.Join(dirs.SnapHistory, "salt.txt"), filepath.Join(dirs.Snap, "salt.txt"))
|
||||
moveFiles(warmDir, dirs.SnapDomain, ".kv")
|
||||
moveFiles(warmDir, dirs.SnapDomain, ".kvei")
|
||||
moveFiles(warmDir, dirs.SnapDomain, ".bt")
|
||||
moveFiles(dirs.SnapHistory, dirs.SnapAccessors, ".vi")
|
||||
moveFiles(dirs.SnapHistory, dirs.SnapAccessors, ".efi")
|
||||
moveFiles(dirs.SnapHistory, dirs.SnapAccessors, ".efei")
|
||||
moveFiles(dirs.SnapHistory, dirs.SnapIdx, ".ef")
|
||||
from, to := filepath.Join(dirs.Snap, "db", "mdbx.dat"), filepath.Join(dirs.Downloader, "mdbx.dat")
|
||||
if err := os.Rename(from, to); err != nil {
|
||||
//fall back to copy-file if folders are on different disks
|
||||
if err := copyFile(from, to); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
"github.com/c2h5oh/datasize"
|
||||
"github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon-lib/common/datadir"
|
||||
"github.com/ledgerwatch/erigon-lib/common/dbg"
|
||||
@ -634,6 +635,7 @@ func openClient(dbDir, snapDir string, cfg *torrent.ClientConfig) (db kv.RwDB, c
|
||||
Label(kv.DownloaderDB).
|
||||
WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.DownloaderTablesCfg }).
|
||||
SyncPeriod(15 * time.Second).
|
||||
GrowthStep(16 * datasize.MB).
|
||||
Path(dbDir).
|
||||
Open()
|
||||
if err != nil {
|
||||
|
@ -234,7 +234,7 @@ func (n *Node) openDataDir(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
for retry := 0; ; retry++ {
|
||||
l, locked, err := datadir.Flock(n.config.Dirs)
|
||||
l, locked, err := datadir.TryFlock(n.config.Dirs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -324,8 +324,9 @@ func OpenDatabase(config *nodecfg.Config, label kv.Label, name string, readonly
|
||||
roTxLimit = int64(config.Http.DBReadConcurrency)
|
||||
}
|
||||
roTxsLimiter := semaphore.NewWeighted(roTxLimit) // 1 less than max to allow unlocking to happen
|
||||
opts := mdbx.NewMDBX(log.Root()).
|
||||
opts := mdbx.NewMDBX(logger).
|
||||
Path(dbPath).Label(label).
|
||||
GrowthStep(16 * datasize.MB).
|
||||
DBVerbosity(config.DatabaseVerbosity).RoTxsLimiter(roTxsLimiter)
|
||||
|
||||
if readonly {
|
||||
|
Loading…
Reference in New Issue
Block a user