From ded8283df7af5e67e3c40e649829ab2544bceb01 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Thu, 27 Apr 2023 10:42:12 +0700 Subject: [PATCH] erigon backup: v0 of sub-command (#7396) --- .github/workflows/check.yml | 15 + cmd/caplin-phase1/main.go | 2 +- cmd/devnet/node/node.go | 2 +- cmd/erigon-cl/main.go | 2 +- cmd/erigon-el/main.go | 2 +- cmd/erigon/main.go | 2 +- cmd/erigoncustom/main.go | 2 +- cmd/evm/internal/t8ntool/flags.go | 2 +- cmd/integration/commands/refetence_db.go | 104 +------ cmd/sentinel/main.go | 2 +- cmd/txpool/main.go | 2 +- cmd/utils/flags.go | 17 +- cmd/utils/flags/flags.go | 21 ++ core/rawdb/rawdbreset/reset_stages.go | 172 ++---------- core/state/intra_block_state_test.go | 2 - go.mod | 2 +- go.sum | 4 +- turbo/app/backup_cmd.go | 139 +++++++++ turbo/app/{import.go => import_cmd.go} | 2 +- turbo/app/{init.go => init_cmd.go} | 2 +- turbo/app/make_app.go | 31 ++- turbo/app/{snapshots.go => snapshots_cmd.go} | 4 +- turbo/app/{support.go => support_cmd.go} | 2 +- turbo/backup/backup.go | 279 +++++++++++++++++++ 24 files changed, 534 insertions(+), 280 deletions(-) create mode 100644 .github/workflows/check.yml create mode 100644 turbo/app/backup_cmd.go rename turbo/app/{import.go => import_cmd.go} (99%) rename turbo/app/{init.go => init_cmd.go} (98%) rename turbo/app/{snapshots.go => snapshots_cmd.go} (99%) rename turbo/app/{support.go => support_cmd.go} (99%) create mode 100644 turbo/backup/backup.go diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml new file mode 100644 index 000000000..20946426d --- /dev/null +++ b/.github/workflows/check.yml @@ -0,0 +1,15 @@ +name: Check + +on: + push: + branches: + - devel + workflow_dispatch: + +jobs: + goreleaser: + runs-on: ubuntu-latest + steps: + - run: echo ${GITHUB_REF} + - run: echo ${GITHUB_REF#refs/tags/} + - run: echo ${GITHUB_REF##*/} diff --git a/cmd/caplin-phase1/main.go b/cmd/caplin-phase1/main.go index 262b02c1b..656a05ba2 100644 --- a/cmd/caplin-phase1/main.go +++ b/cmd/caplin-phase1/main.go @@ -36,7 +36,7 @@ import ( ) func main() { - app := lightclientapp.MakeApp(runCaplinNode, flags.LCDefaultFlags) + app := lightclientapp.MakeApp("caplin-phase1", runCaplinNode, flags.LCDefaultFlags) if err := app.Run(os.Args); err != nil { _, printErr := fmt.Fprintln(os.Stderr, err) if printErr != nil { diff --git a/cmd/devnet/node/node.go b/cmd/devnet/node/node.go index 66b00c10a..283ddbc6f 100644 --- a/cmd/devnet/node/node.go +++ b/cmd/devnet/node/node.go @@ -64,7 +64,7 @@ func StartNode(wg *sync.WaitGroup, args []string) { os.Exit(1) }() - app := erigonapp.MakeApp(runNode, erigoncli.DefaultFlags) + app := erigonapp.MakeApp("devnet", runNode, erigoncli.DefaultFlags) nodeNumber++ // increment the number of nodes on the network if err := app.Run(args); err != nil { _, printErr := fmt.Fprintln(os.Stderr, err) diff --git a/cmd/erigon-cl/main.go b/cmd/erigon-cl/main.go index 8a78f90c2..3391962da 100644 --- a/cmd/erigon-cl/main.go +++ b/cmd/erigon-cl/main.go @@ -32,7 +32,7 @@ import ( ) func main() { - app := sentinelapp.MakeApp(runConsensusLayerNode, flags.CLDefaultFlags) + app := sentinelapp.MakeApp("erigon-cl", runConsensusLayerNode, flags.CLDefaultFlags) if err := app.Run(os.Args); err != nil { _, printErr := fmt.Fprintln(os.Stderr, err) if printErr != nil { diff --git a/cmd/erigon-el/main.go b/cmd/erigon-el/main.go index 0c9159991..3e954b2e9 100644 --- a/cmd/erigon-el/main.go +++ b/cmd/erigon-el/main.go @@ -34,7 +34,7 @@ func main() { os.Exit(1) }() - app := erigonapp.MakeApp(runErigon, 
erigoncli.DefaultFlags) + app := erigonapp.MakeApp("erigon-el", runErigon, erigoncli.DefaultFlags) if err := app.Run(os.Args); err != nil { _, printErr := fmt.Fprintln(os.Stderr, err) if printErr != nil { diff --git a/cmd/erigon/main.go b/cmd/erigon/main.go index 432bcb0f4..e18380211 100644 --- a/cmd/erigon/main.go +++ b/cmd/erigon/main.go @@ -32,7 +32,7 @@ func main() { os.Exit(1) }() - app := erigonapp.MakeApp(runErigon, erigoncli.DefaultFlags) + app := erigonapp.MakeApp("erigon", runErigon, erigoncli.DefaultFlags) if err := app.Run(os.Args); err != nil { _, printErr := fmt.Fprintln(os.Stderr, err) if printErr != nil { diff --git a/cmd/erigoncustom/main.go b/cmd/erigoncustom/main.go index ad3ef4748..1870785fc 100644 --- a/cmd/erigoncustom/main.go +++ b/cmd/erigoncustom/main.go @@ -24,7 +24,7 @@ const ( // the regular main function func main() { // initializing Erigon application here and providing our custom flag - app := erigonapp.MakeApp(runErigon, + app := erigonapp.MakeApp("erigoncustom", runErigon, append(erigoncli.DefaultFlags, &flag), // always use DefaultFlags, but add a new one in the end. ) if err := app.Run(os.Args); err != nil { diff --git a/cmd/evm/internal/t8ntool/flags.go b/cmd/evm/internal/t8ntool/flags.go index 3a42f32e5..fa0886e7f 100644 --- a/cmd/evm/internal/t8ntool/flags.go +++ b/cmd/evm/internal/t8ntool/flags.go @@ -103,7 +103,7 @@ var ( } VerbosityFlag = cli.IntFlag{ Name: "verbosity", - Usage: "sets the verbosity level", + Usage: "Deprecated. Use --log.console.verbosity, --log.dir.verbosity, --torrent.verbosity, --database.verbosity", Value: 3, } ) diff --git a/cmd/integration/commands/refetence_db.go b/cmd/integration/commands/refetence_db.go index 267105f3b..3e8fcb7b5 100644 --- a/cmd/integration/commands/refetence_db.go +++ b/cmd/integration/commands/refetence_db.go @@ -4,24 +4,20 @@ import ( "bufio" "bytes" "context" - "encoding/hex" "errors" "fmt" "os" - "runtime" "strings" "sync/atomic" "time" common2 "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon-lib/common/dbg" "github.com/ledgerwatch/erigon-lib/kv" mdbx2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/core/rawdb/rawdbreset" + "github.com/ledgerwatch/erigon/turbo/backup" "github.com/ledgerwatch/log/v3" "github.com/spf13/cobra" - "github.com/torquem-ch/mdbx-go/mdbx" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" ) @@ -99,8 +95,9 @@ var cmdMdbxToMdbx = &cobra.Command{ Short: "copy data from '--chaindata' to '--chaindata.to'", Run: func(cmd *cobra.Command, args []string) { ctx, _ := common2.RootContext() - logger := log.New() - err := mdbxToMdbx(ctx, logger, chaindata, toChaindata) + + from, to := backup.OpenPair(chaindata, toChaindata, kv.ChainDB, 0) + err := backup.Kv2kv(ctx, from, to, nil, backup.ReadAheadThreads) if err != nil && !errors.Is(err, context.Canceled) { if !errors.Is(err, context.Canceled) { log.Error(err.Error()) @@ -427,96 +424,3 @@ MainLoop: return nil } - -func mdbxToMdbx(ctx context.Context, logger log.Logger, from, to string) error { - src := mdbx2.NewMDBX(logger).Path(from).Flags(func(flags uint) uint { return mdbx.Readonly | mdbx.Accede }).MustOpen() - dst := mdbx2.NewMDBX(logger).Path(to). - WriteMap(). - Flags(func(flags uint) uint { return flags | mdbx.NoMemInit | mdbx.WriteMap | mdbx.Accede }). 
- MustOpen() - return kv2kv(ctx, src, dst) -} - -func kv2kv(ctx context.Context, src, dst kv.RwDB) error { - srcTx, err1 := src.BeginRo(ctx) - if err1 != nil { - return err1 - } - defer srcTx.Rollback() - - commitEvery := time.NewTicker(5 * time.Minute) - defer commitEvery.Stop() - logEvery := time.NewTicker(20 * time.Second) - defer logEvery.Stop() - - var total uint64 - for name, b := range src.AllBuckets() { - if b.IsDeprecated { - continue - } - go rawdbreset.WarmupTable(ctx, src, name, log.LvlTrace) - srcC, err := srcTx.Cursor(name) - if err != nil { - return err - } - total, _ = srcC.Count() - - dstTx, err1 := dst.BeginRw(ctx) - if err1 != nil { - return err1 - } - defer dstTx.Rollback() - _ = dstTx.ClearBucket(name) - - c, err := dstTx.RwCursor(name) - if err != nil { - return err - } - casted, isDupsort := c.(kv.RwCursorDupSort) - i := uint64(0) - - for k, v, err := srcC.First(); k != nil; k, v, err = srcC.Next() { - if err != nil { - return err - } - - if isDupsort { - if err = casted.AppendDup(k, v); err != nil { - panic(err) - } - } else { - if err = c.Append(k, v); err != nil { - panic(err) - } - } - - i++ - select { - case <-ctx.Done(): - return ctx.Err() - case <-logEvery.C: - var m runtime.MemStats - dbg.ReadMemStats(&m) - log.Info("Progress", "bucket", name, "progress", fmt.Sprintf("%.1fm/%.1fm", float64(i)/1_000_000, float64(total)/1_000_000), "key", hex.EncodeToString(k), - "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) - default: - } - } - - // migrate bucket sequences to native mdbx implementation - //currentID, err := srcTx.Sequence(name, 0) - //if err != nil { - // return err - //} - //_, err = dstTx.Sequence(name, currentID) - //if err != nil { - // return err - //} - if err2 := dstTx.Commit(); err2 != nil { - return err2 - } - } - srcTx.Rollback() - log.Info("done") - return nil -} diff --git a/cmd/sentinel/main.go b/cmd/sentinel/main.go index d06a4d349..e3a722ad8 100644 --- a/cmd/sentinel/main.go +++ b/cmd/sentinel/main.go @@ -29,7 +29,7 @@ import ( ) func main() { - app := sentinelapp.MakeApp(runSentinelNode, flags.CLDefaultFlags) + app := sentinelapp.MakeApp("sentinel", runSentinelNode, flags.CLDefaultFlags) if err := app.Run(os.Args); err != nil { _, printErr := fmt.Fprintln(os.Stderr, err) if printErr != nil { diff --git a/cmd/txpool/main.go b/cmd/txpool/main.go index 138d871b7..3646a894e 100644 --- a/cmd/txpool/main.go +++ b/cmd/txpool/main.go @@ -81,7 +81,7 @@ func init() { var rootCmd = &cobra.Command{ Use: "txpool", - Short: "Launch externa Transaction Pool instance - same as built-into Erigon, but as independent Service", + Short: "Launch external Transaction Pool instance - same as built into Erigon, but as an independent Process", PersistentPreRunE: func(cmd *cobra.Command, args []string) error { return debug.SetupCobra(cmd) }, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 54a2fa07a..06b0ea8a7 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1151,29 +1151,16 @@ func setDataDir(ctx *cli.Context, cfg *nodecfg.Config) { } cfg.Dirs = datadir.New(cfg.Dirs.DataDir) - if err := cfg.MdbxPageSize.UnmarshalText([]byte(ctx.String(DbPageSizeFlag.Name))); err != nil { - panic(err) - } + cfg.MdbxPageSize = flags.DBPageSizeFlagUnmarshal(ctx, DbPageSizeFlag.Name, DbPageSizeFlag.Usage) if err := cfg.MdbxDBSizeLimit.UnmarshalText([]byte(ctx.String(DbSizeLimitFlag.Name))); err != nil { panic(err) } - sz := cfg.MdbxPageSize.Bytes() - if !isPowerOfTwo(sz) || sz < 256 || sz > 64*1024 { - panic(fmt.Errorf("invalid --db.pageSize: 
%s=%d, see: %s", ctx.String(DbPageSizeFlag.Name), sz, DbPageSizeFlag.Usage)) - } szLimit := cfg.MdbxDBSizeLimit.Bytes() - if szLimit%sz != 0 || szLimit < 256 { + if szLimit%cfg.MdbxPageSize.Bytes() != 0 || szLimit < 256 { panic(fmt.Errorf("invalid --db.size.limit: %s=%d, see: %s", ctx.String(DbSizeLimitFlag.Name), szLimit, DbSizeLimitFlag.Usage)) } } -func isPowerOfTwo(n uint64) bool { - if n == 0 { //corner case: if n is zero it will also consider as power 2 - return true - } - return n&(n-1) == 0 -} - func setDataDirCobra(f *pflag.FlagSet, cfg *nodecfg.Config) { dirname, err := f.GetString(DataDirFlag.Name) if err != nil { diff --git a/cmd/utils/flags/flags.go b/cmd/utils/flags/flags.go index 683a3cd4a..c4e583424 100644 --- a/cmd/utils/flags/flags.go +++ b/cmd/utils/flags/flags.go @@ -20,12 +20,14 @@ import ( "encoding" "errors" "flag" + "fmt" "math/big" "os" "os/user" "path/filepath" "strings" + "github.com/c2h5oh/datasize" "github.com/ledgerwatch/erigon/common/math" "github.com/urfave/cli/v2" ) @@ -342,3 +344,22 @@ func eachName(f cli.Flag, fn func(string)) { fn(name) } } + +func DBPageSizeFlagUnmarshal(cliCtx *cli.Context, flagName, flagUsage string) datasize.ByteSize { + var pageSize datasize.ByteSize + if err := pageSize.UnmarshalText([]byte(cliCtx.String(flagName))); err != nil { + panic(err) + } + sz := pageSize.Bytes() + if !isPowerOfTwo(sz) || sz < 256 || sz > 64*1024 { + panic(fmt.Errorf("invalid --%s: %d, see: %s", flagName, sz, flagUsage)) + } + return pageSize +} + +func isPowerOfTwo(n uint64) bool { + if n == 0 { //corner case: zero is treated as a power of two + return true + } + return n&(n-1) == 0 +} diff --git a/core/rawdb/rawdbreset/reset_stages.go b/core/rawdb/rawdbreset/reset_stages.go index 185ad2108..e63b82ebb 100644 --- a/core/rawdb/rawdbreset/reset_stages.go +++ b/core/rawdb/rawdbreset/reset_stages.go @@ -2,11 +2,7 @@ package rawdbreset import ( "context" - "encoding/binary" "fmt" - "sync" - "sync/atomic" - "time" "github.com/ledgerwatch/erigon-lib/chain" "github.com/ledgerwatch/erigon-lib/common/datadir" @@ -14,16 +10,15 @@ import ( "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/kvcfg" "github.com/ledgerwatch/erigon-lib/state" - "github.com/ledgerwatch/log/v3" - "golang.org/x/sync/errgroup" - "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/turbo/backup" "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/erigon/turbo/snapshotsync" + "github.com/ledgerwatch/log/v3" ) func ResetState(db kv.RwDB, ctx context.Context, chain string, tmpDir string) error { @@ -99,7 +94,7 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, ag ethtx = kv.EthTxV3 } - if err := clearTables(context.Background(), db, tx, + if err := backup.ClearTables(context.Background(), db, tx, kv.NonCanonicalTxs, ethtx, kv.MaxTxNum, @@ -126,7 +121,7 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, ag return nil } func ResetSenders(ctx context.Context, db kv.RwDB, tx kv.RwTx) error { - if err := clearTables(ctx, db, tx, kv.Senders); err != nil { + if err := backup.ClearTables(ctx, db, tx, kv.Senders); err != nil { return nil } return clearStageProgress(tx, stages.Senders) @@ -134,12 +129,12 @@ func ResetSenders(ctx context.Context, db kv.RwDB, tx kv.RwTx) error { func WarmupExec(ctx 
context.Context, db kv.RwDB) (err error) { for _, tbl := range stateBuckets { - WarmupTable(ctx, db, tbl, log.LvlInfo) + backup.WarmupTable(ctx, db, tbl, log.LvlInfo, backup.ReadAheadThreads) } historyV3 := kvcfg.HistoryV3.FromDB(db) if historyV3 { //hist v2 is too big, if you have so much ram, just use `cat mdbx.dat > /dev/null` to warmup for _, tbl := range stateHistoryV3Buckets { - WarmupTable(ctx, db, tbl, log.LvlInfo) + backup.WarmupTable(ctx, db, tbl, log.LvlInfo, backup.ReadAheadThreads) } } return @@ -157,7 +152,7 @@ func ResetExec(ctx context.Context, db kv.RwDB, chain string, tmpDir string) (er return err } - if err := clearTables(ctx, db, tx, stateBuckets...); err != nil { + if err := backup.ClearTables(ctx, db, tx, stateBuckets...); err != nil { return nil } for _, b := range stateBuckets { @@ -166,7 +161,7 @@ func ResetExec(ctx context.Context, db kv.RwDB, chain string, tmpDir string) (er } } - if err := clearTables(ctx, db, tx, stateHistoryBuckets...); err != nil { + if err := backup.ClearTables(ctx, db, tx, stateHistoryBuckets...); err != nil { return nil } if !historyV3 { @@ -231,135 +226,6 @@ var stateHistoryV4Buckets = []string{ kv.CommitmentKeys, kv.CommitmentVals, kv.CommitmentHistoryKeys, kv.CommitmentHistoryVals, kv.CommitmentIdx, } -func WarmupTable(ctx context.Context, db kv.RoDB, bucket string, lvl log.Lvl) { - const ThreadsLimit = 128 - var total uint64 - db.View(ctx, func(tx kv.Tx) error { - c, _ := tx.Cursor(bucket) - total, _ = c.Count() - return nil - }) - if total < 10_000 { - return - } - progress := atomic.Int64{} - - logEvery := time.NewTicker(20 * time.Second) - defer logEvery.Stop() - - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(ThreadsLimit) - for i := 0; i < 256; i++ { - for j := 0; j < 256; j++ { - i := i - j := j - g.Go(func() error { - return db.View(ctx, func(tx kv.Tx) error { - it, err := tx.Prefix(bucket, []byte{byte(i), byte(j)}) - if err != nil { - return err - } - for it.HasNext() { - _, _, err = it.Next() - if err != nil { - return err - } - progress.Add(1) - select { - case <-ctx.Done(): - return ctx.Err() - case <-logEvery.C: - log.Log(lvl, fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total))) - default: - } - } - return nil - }) - }) - } - } - for i := 0; i < 1_000; i++ { - i := i - g.Go(func() error { - return db.View(ctx, func(tx kv.Tx) error { - seek := make([]byte, 8) - binary.BigEndian.PutUint64(seek, uint64(i*100_000)) - it, err := tx.Prefix(bucket, seek) - if err != nil { - return err - } - for it.HasNext() { - _, _, err = it.Next() - if err != nil { - return err - } - select { - case <-ctx.Done(): - return ctx.Err() - case <-logEvery.C: - log.Log(lvl, fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total))) - default: - } - } - return nil - }) - }) - } - _ = g.Wait() -} -func Warmup(ctx context.Context, db kv.RwDB, lvl log.Lvl, stList ...stages.SyncStage) error { - for _, st := range stList { - for _, tbl := range Tables[st] { - WarmupTable(ctx, db, tbl, lvl) - } - } - return nil -} - -func Reset(ctx context.Context, db kv.RwDB, stagesList ...stages.SyncStage) error { - return db.Update(ctx, func(tx kv.RwTx) error { - for _, st := range stagesList { - if err := clearTables(ctx, db, tx, Tables[st]...); err != nil { - return err - } - if err := clearStageProgress(tx, stagesList...); err != nil { - return err - } - } - return nil - }) -} - -func warmup(ctx context.Context, db kv.RoDB, bucket string) func() { - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - 
defer wg.Done() - WarmupTable(ctx, db, bucket, log.LvlInfo) - }() - return func() { wg.Wait() } -} - -func clearTables(ctx context.Context, db kv.RoDB, tx kv.RwTx, tables ...string) error { - for _, tbl := range tables { - if err := clearTable(ctx, db, tx, tbl); err != nil { - return err - } - } - return nil -} - -func clearTable(ctx context.Context, db kv.RoDB, tx kv.RwTx, table string) error { - ctx, cancel := context.WithCancel(ctx) - clean := warmup(ctx, db, table) - defer func() { - cancel() - clean() - }() - log.Info("Clear", "table", table) - return tx.ClearBucket(table) -} - func clearStageProgress(tx kv.RwTx, stagesList ...stages.SyncStage) error { for _, stage := range stagesList { if err := stages.SaveStageProgress(tx, stage, 0); err != nil { @@ -371,3 +237,25 @@ func clearStageProgress(tx kv.RwTx, stagesList ...stages.SyncStage) error { } return nil } + +func Reset(ctx context.Context, db kv.RwDB, stagesList ...stages.SyncStage) error { + return db.Update(ctx, func(tx kv.RwTx) error { + for _, st := range stagesList { + if err := backup.ClearTables(ctx, db, tx, Tables[st]...); err != nil { + return err + } + if err := clearStageProgress(tx, stagesList...); err != nil { + return err + } + } + return nil + }) +} +func Warmup(ctx context.Context, db kv.RwDB, lvl log.Lvl, stList ...stages.SyncStage) error { + for _, st := range stList { + for _, tbl := range Tables[st] { + backup.WarmupTable(ctx, db, tbl, lvl, backup.ReadAheadThreads) + } + } + return nil +} diff --git a/core/state/intra_block_state_test.go b/core/state/intra_block_state_test.go index 45cf2442b..b189e2f11 100644 --- a/core/state/intra_block_state_test.go +++ b/core/state/intra_block_state_test.go @@ -14,8 +14,6 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 
-//go:build integration - package state import ( diff --git a/go.mod b/go.mod index 20794b199..1c3c49657 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon go 1.19 require ( - github.com/ledgerwatch/erigon-lib v0.0.0-20230424042211-39e97b6becde + github.com/ledgerwatch/erigon-lib v0.0.0-20230427033621-8ed42874e3e2 github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230404044759-5dec854ce336 github.com/ledgerwatch/log/v3 v3.7.0 github.com/ledgerwatch/secp256k1 v1.0.0 diff --git a/go.sum b/go.sum index 59df00ea5..ca0cee2b4 100644 --- a/go.sum +++ b/go.sum @@ -438,8 +438,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20230424042211-39e97b6becde h1:6ldvFEx1d0gPIdiXYV3DGGvUjTkPn78m/5B9/6T/LUo= -github.com/ledgerwatch/erigon-lib v0.0.0-20230424042211-39e97b6becde/go.mod h1:D05f9OXc/2cnYxCyBexlu5HeIeQW9GKXynyWYzJ1F5I= +github.com/ledgerwatch/erigon-lib v0.0.0-20230427033621-8ed42874e3e2 h1:upWpgnnGdc/CZLok0F0uaVJmFhaHPDrjKsXpE6KeDsY= +github.com/ledgerwatch/erigon-lib v0.0.0-20230427033621-8ed42874e3e2/go.mod h1:D05f9OXc/2cnYxCyBexlu5HeIeQW9GKXynyWYzJ1F5I= github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230404044759-5dec854ce336 h1:Yxmt4Wyd0RCLr7UJJAl0ApCP/f5qkWfvHfgPbnI8ghM= github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230404044759-5dec854ce336/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= github.com/ledgerwatch/log/v3 v3.7.0 h1:aFPEZdwZx4jzA3+/Pf8wNDN5tCI0cIolq/kfvgcM+og= diff --git a/turbo/app/backup_cmd.go b/turbo/app/backup_cmd.go new file mode 100644 index 000000000..807d2baf6 --- /dev/null +++ b/turbo/app/backup_cmd.go @@ -0,0 +1,139 @@ +package app + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/c2h5oh/datasize" + "github.com/ledgerwatch/erigon-lib/common/datadir" + "github.com/ledgerwatch/erigon-lib/common/dir" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/cmd/utils" + "github.com/ledgerwatch/erigon/cmd/utils/flags" + "github.com/ledgerwatch/erigon/turbo/backup" + "github.com/ledgerwatch/erigon/turbo/debug" + "github.com/ledgerwatch/erigon/turbo/logging" + "github.com/ledgerwatch/log/v3" + "github.com/urfave/cli/v2" +) + +// nolint +var backupCommand = cli.Command{ + Name: "backup", + Description: `Backup all databases of Erigon. +Can run without stopping Erigon. +Limitations: +- no support of Consensus DB (copy it manually if needed). Possible to implement in future. +- no support of datadir/snapshots folder. Possible to implement in future. Can copy it manually, rsync it, or symlink/mount it. +- no way to pipe output to a compressor (lz4/zstd). Can compress the target folder later or use zfs-with-enabled-compression. +- jwt token: copy it manually if needed. +- no support of SentryDB (datadir/nodes folder), because there seems to be little reason to back it up. 
+ +Example: erigon backup --datadir= --to.datadir= +`, + Action: doBackup, + Flags: joinFlags([]cli.Flag{ + &utils.DataDirFlag, + &ToDatadirFlag, + &BackupToPageSizeFlag, + &BackupLabelsFlag, + &BackupTablesFlag, + &WarmupThreadsFlag, + }, debug.Flags, logging.Flags), +} + +var ( + ToDatadirFlag = flags.DirectoryFlag{ + Name: "to.datadir", + Usage: "Target datadir", + Required: true, + } + BackupLabelsFlag = cli.StringFlag{ + Name: "labels", + Usage: "Comma-separated list of components to backup. Example: chaindata,txpool,downloader", + } + BackupTablesFlag = cli.StringFlag{ + Name: "tables", + Usage: "Comma-separated list of tables to backup. Example: PlainState,HashedState", + } + BackupToPageSizeFlag = cli.StringFlag{ + Name: "to.pagesize", + Usage: utils.DbPageSizeFlag.Usage, + } + WarmupThreadsFlag = cli.Uint64Flag{ + Name: "warmup.threads", + Usage: `Erigon's db does blocking IO: it stalls while reading from disk. +So backup speed depends on disk latency (not throughput). +Backup can spawn many threads which read ahead and bring data into the OS's PageCache. +Cloud drives (and SSDs) have bad latency but good parallel throughput - there, >1k warmup threads will help.`, + Value: uint64(backup.ReadAheadThreads), + } +) + +func doBackup(cliCtx *cli.Context) error { + defer log.Info("backup done") + + ctx := cliCtx.Context + dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name)) + toDirs := datadir.New(cliCtx.String(ToDatadirFlag.Name)) + + var targetPageSize datasize.ByteSize + if cliCtx.IsSet(BackupToPageSizeFlag.Name) { + targetPageSize = flags.DBPageSizeFlagUnmarshal(cliCtx, BackupToPageSizeFlag.Name, BackupToPageSizeFlag.Usage) + } + + var labels = []kv.Label{kv.ChainDB, kv.TxPoolDB, kv.DownloaderDB} + if cliCtx.IsSet(BackupLabelsFlag.Name) { + labels = labels[:0] + for _, l := range utils.SplitAndTrim(cliCtx.String(BackupLabelsFlag.Name)) { + labels = append(labels, kv.UnmarshalLabel(l)) + } + } + + var tables []string + if cliCtx.IsSet(BackupTablesFlag.Name) { + tables = utils.SplitAndTrim(cliCtx.String(BackupTablesFlag.Name)) + } + + readAheadThreads := backup.ReadAheadThreads + if cliCtx.IsSet(WarmupThreadsFlag.Name) { + readAheadThreads = int(cliCtx.Uint64(WarmupThreadsFlag.Name)) + } + + //kv.SentryDB: not much reason to back it up + //TODO: add support of kv.ConsensusDB + for _, label := range labels { + var from, to string + switch label { + case kv.ChainDB: + from, to = dirs.Chaindata, toDirs.Chaindata + case kv.TxPoolDB: + from, to = dirs.TxPool, toDirs.TxPool + case kv.DownloaderDB: + from, to = filepath.Join(dirs.Snap, "db"), filepath.Join(toDirs.Snap, "db") + default: + panic(fmt.Sprintf("unexpected: %+v", label)) + } + + if !dir.Exist(from) { + continue + } + + if len(tables) == 0 { // if not partial backup - just drop target dir, to make backup more compact/fast (instead of cleaning tables) + if err := os.RemoveAll(to); err != nil { + return fmt.Errorf("remove: %w, %s", err, to) + } + } + if err := os.MkdirAll(to, 0740); err != nil { //owner: rwx, group: r, others: - + return fmt.Errorf("mkdir: %w, %s", err, to) + } + log.Info("[backup] start", "label", label) + fromDB, toDB := backup.OpenPair(from, to, label, targetPageSize) + if err := backup.Kv2kv(ctx, fromDB, toDB, nil, readAheadThreads); err != nil { + return err + } + } + + return nil +} diff --git a/turbo/app/import.go b/turbo/app/import_cmd.go similarity index 99% rename from turbo/app/import.go rename to turbo/app/import_cmd.go index 379d922c6..f5811f8d9 100644 --- a/turbo/app/import.go +++ b/turbo/app/import_cmd.go @@ -38,7 +38,7 @@ var importCommand = 
cli.Command{ &utils.DataDirFlag, &utils.ChainFlag, }, - Category: "BLOCKCHAIN COMMANDS", + //Category: "BLOCKCHAIN COMMANDS", Description: ` The import command imports blocks from an RLP-encoded form. The form can be one file with several RLP-encoded blocks, or several files can be used. diff --git a/turbo/app/init.go b/turbo/app/init_cmd.go similarity index 98% rename from turbo/app/init.go rename to turbo/app/init_cmd.go index 0c7f84a0b..45e8d423f 100644 --- a/turbo/app/init.go +++ b/turbo/app/init_cmd.go @@ -22,7 +22,7 @@ var initCommand = cli.Command{ Flags: []cli.Flag{ &utils.DataDirFlag, }, - Category: "BLOCKCHAIN COMMANDS", + //Category: "BLOCKCHAIN COMMANDS", Description: ` The init command initializes a new genesis block and definition for the network. This is a destructive action and changes the network in which you will be diff --git a/turbo/app/make_app.go b/turbo/app/make_app.go index f81f4d9be..dea56d8cc 100644 --- a/turbo/app/make_app.go +++ b/turbo/app/make_app.go @@ -2,9 +2,11 @@ package app import ( + "fmt" "strings" "github.com/ledgerwatch/erigon-lib/common/datadir" + "github.com/ledgerwatch/log/v3" "github.com/urfave/cli/v2" "github.com/ledgerwatch/erigon/cmd/utils" @@ -20,9 +22,24 @@ import ( // Parameters: // * action: the main function for the application. receives `*cli.Context` with parsed command-line flags. // * cliFlags: the list of flags `cli.Flag` that the app should set and parse. By default, use `DefaultFlags()`. If you want to specify your own flag, use `append(DefaultFlags(), myFlag)` for this parameter. -func MakeApp(action cli.ActionFunc, cliFlags []cli.Flag) *cli.App { - app := cli2.NewApp(params.GitCommit, "erigon experimental cli") - app.Action = action +func MakeApp(name string, action cli.ActionFunc, cliFlags []cli.Flag) *cli.App { + app := cli2.NewApp(params.GitCommit, "erigon") + app.Name = name + app.UsageText = app.Name + ` [command] [flags]` + app.Action = func(context *cli.Context) error { + // handle case: unknown sub-command + if context.Args().Present() { + var goodNames []string + for _, c := range app.VisibleCommands() { + goodNames = append(goodNames, c.Name) + } + log.Error(fmt.Sprintf("Command '%s' not found. Available commands: %s", context.Args().First(), goodNames)) + cli.ShowAppHelpAndExit(context, 1) + } + + // run default action + return action(context) + } app.Flags = append(cliFlags, debug.Flags...) 
// debug flags are required app.Before = func(ctx *cli.Context) error { return debug.Setup(ctx) @@ -31,7 +48,13 @@ func MakeApp(action cli.ActionFunc, cliFlags []cli.Flag) *cli.App { debug.Exit() return nil } - app.Commands = []*cli.Command{&initCommand, &importCommand, &snapshotCommand, &supportCommand} + app.Commands = []*cli.Command{ + &initCommand, + &importCommand, + &snapshotCommand, + &supportCommand, + //&backupCommand, + } return app } diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots_cmd.go similarity index 99% rename from turbo/app/snapshots.go rename to turbo/app/snapshots_cmd.go index 45e24c665..778819c7a 100644 --- a/turbo/app/snapshots.go +++ b/turbo/app/snapshots_cmd.go @@ -45,8 +45,8 @@ func joinFlags(lists ...[]cli.Flag) (res []cli.Flag) { } var snapshotCommand = cli.Command{ - Name: "snapshots", - Description: `Managing snapshots (historical data partitions)`, + Name: "snapshots", + Usage: `Managing snapshots (historical data partitions)`, Subcommands: []*cli.Command{ { Name: "index", diff --git a/turbo/app/support.go b/turbo/app/support_cmd.go similarity index 99% rename from turbo/app/support.go rename to turbo/app/support_cmd.go index b4d2bbf86..51bb9f9f7 100644 --- a/turbo/app/support.go +++ b/turbo/app/support_cmd.go @@ -44,7 +44,7 @@ var supportCommand = cli.Command{ &diagnosticsURLFlag, &insecureFlag, }, - Category: "SUPPORT COMMANDS", + //Category: "SUPPORT COMMANDS", Description: ` The support command connects a running Erigon instances to a diagnostics system specified by the URL.`, diff --git a/turbo/backup/backup.go b/turbo/backup/backup.go new file mode 100644 index 000000000..f6a0a2631 --- /dev/null +++ b/turbo/backup/backup.go @@ -0,0 +1,279 @@ +package backup + +import ( + "context" + "encoding/binary" + "encoding/hex" + "fmt" + "runtime" + "sync" + "sync/atomic" + "time" + + "github.com/c2h5oh/datasize" + common2 "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/dbg" + "github.com/ledgerwatch/erigon-lib/kv" + mdbx2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" + "github.com/ledgerwatch/log/v3" + "github.com/torquem-ch/mdbx-go/mdbx" + "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" +) + +func OpenPair(from, to string, label kv.Label, targetPageSize datasize.ByteSize) (kv.RoDB, kv.RwDB) { + src := mdbx2.NewMDBX(log.New()).Path(from). + Label(label). + WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(label) }). + Flags(func(flags uint) uint { return mdbx.Readonly | mdbx.Accede }).MustOpen() + if targetPageSize <= 0 { + targetPageSize = datasize.ByteSize(src.PageSize()) + } + info, err := src.(*mdbx2.MdbxKV).Env().Info(nil) + if err != nil { + panic(err) + } + dst := mdbx2.NewMDBX(log.New()).Path(to). + Label(label). + PageSize(targetPageSize.Bytes()). + MapSize(datasize.ByteSize(info.Geo.Upper)). + WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(label) }). + MustOpen() + return src, dst +} +func MdbxToMdbx(ctx context.Context, from, to string, label kv.Label, tables []string, targetPageSize datasize.ByteSize, readAheadThreads int) error { + src := mdbx2.NewMDBX(log.New()).Path(from). + Label(label). + WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(label) }). 
+ Flags(func(flags uint) uint { return mdbx.Readonly | mdbx.Accede }).MustOpen() + if targetPageSize <= 0 { + targetPageSize = datasize.ByteSize(src.PageSize()) + } + info, err := src.(*mdbx2.MdbxKV).Env().Info(nil) + if err != nil { + return err + } + dst := mdbx2.NewMDBX(log.New()).Path(to). + Label(label). + PageSize(targetPageSize.Bytes()). + MapSize(datasize.ByteSize(info.Geo.Upper)). + WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(label) }). + MustOpen() + return Kv2kv(ctx, src, dst, tables, readAheadThreads) +} + +func Kv2kv(ctx context.Context, src kv.RoDB, dst kv.RwDB, tables []string, readAheadThreads int) error { + srcTx, err1 := src.BeginRo(ctx) + if err1 != nil { + return err1 + } + defer srcTx.Rollback() + + commitEvery := time.NewTicker(5 * time.Minute) + defer commitEvery.Stop() + logEvery := time.NewTicker(20 * time.Second) + defer logEvery.Stop() + + tablesMap := src.AllTables() + if len(tables) > 0 { + tablesMapCopy := maps.Clone(tablesMap) + tablesMap = kv.TableCfg{} + for _, name := range tables { + tablesMap[name] = tablesMapCopy[name] + } + } + + for name, b := range tablesMap { + if b.IsDeprecated { + continue + } + if err := backupTable(ctx, src, srcTx, dst, name, readAheadThreads, logEvery); err != nil { + return err + } + } + log.Info("done") + return nil +} + +func backupTable(ctx context.Context, src kv.RoDB, srcTx kv.Tx, dst kv.RwDB, table string, readAheadThreads int, logEvery *time.Ticker) error { + var total uint64 + wg := sync.WaitGroup{} + defer wg.Wait() + warmupCtx, warmupCancel := context.WithCancel(ctx) + defer warmupCancel() + + wg.Add(1) + go func() { + defer wg.Done() + WarmupTable(warmupCtx, src, table, log.LvlTrace, readAheadThreads) + }() + srcC, err := srcTx.Cursor(table) + if err != nil { + return err + } + total, _ = srcC.Count() + + dstTx, err1 := dst.BeginRw(ctx) + if err1 != nil { + return err1 + } + defer dstTx.Rollback() + _ = dstTx.ClearBucket(table) + + c, err := dstTx.RwCursor(table) + if err != nil { + return err + } + casted, isDupsort := c.(kv.RwCursorDupSort) + i := uint64(0) + + for k, v, err := srcC.First(); k != nil; k, v, err = srcC.Next() { + if err != nil { + return err + } + + if isDupsort { + if err = casted.AppendDup(k, v); err != nil { + panic(err) + } + } else { + if err = c.Append(k, v); err != nil { + panic(err) + } + } + + i++ + select { + case <-ctx.Done(): + return ctx.Err() + case <-logEvery.C: + var m runtime.MemStats + dbg.ReadMemStats(&m) + log.Info("Progress", "table", table, "progress", fmt.Sprintf("%.1fm/%.1fm", float64(i)/1_000_000, float64(total)/1_000_000), "key", hex.EncodeToString(k), + "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) + default: + } + } + // migrate bucket sequences to native mdbx implementation + //currentID, err := srcTx.Sequence(name, 0) + //if err != nil { + // return err + //} + //_, err = dstTx.Sequence(name, currentID) + //if err != nil { + // return err + //} + if err2 := dstTx.Commit(); err2 != nil { + return err2 + } + return nil +} + +const ReadAheadThreads = 128 + +func WarmupTable(ctx context.Context, db kv.RoDB, bucket string, lvl log.Lvl, readAheadThreads int) { + var ThreadsLimit = readAheadThreads + var total uint64 + db.View(ctx, func(tx kv.Tx) error { + c, _ := tx.Cursor(bucket) + total, _ = c.Count() + return nil + }) + if total < 10_000 { + return + } + progress := atomic.Int64{} + + logEvery := time.NewTicker(20 * time.Second) + defer logEvery.Stop() + + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(ThreadsLimit) 
+ for i := 0; i < 256; i++ { + for j := 0; j < 256; j++ { + i := i + j := j + g.Go(func() error { + return db.View(ctx, func(tx kv.Tx) error { + it, err := tx.Prefix(bucket, []byte{byte(i), byte(j)}) + if err != nil { + return err + } + for it.HasNext() { + _, _, err = it.Next() + if err != nil { + return err + } + progress.Add(1) + select { + case <-ctx.Done(): + return ctx.Err() + case <-logEvery.C: + log.Log(lvl, fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total))) + default: + } + } + return nil + }) + }) + } + } + for i := 0; i < 1_000; i++ { + i := i + g.Go(func() error { + return db.View(ctx, func(tx kv.Tx) error { + seek := make([]byte, 8) + binary.BigEndian.PutUint64(seek, uint64(i*100_000)) + it, err := tx.Prefix(bucket, seek) + if err != nil { + return err + } + for it.HasNext() { + _, _, err = it.Next() + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-logEvery.C: + log.Log(lvl, fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total))) + default: + } + } + return nil + }) + }) + } + _ = g.Wait() +} + +func ClearTables(ctx context.Context, db kv.RoDB, tx kv.RwTx, tables ...string) error { + for _, tbl := range tables { + if err := ClearTable(ctx, db, tx, tbl); err != nil { + return err + } + } + return nil +} + +func ClearTable(ctx context.Context, db kv.RoDB, tx kv.RwTx, table string) error { + ctx, cancel := context.WithCancel(ctx) + clean := warmup(ctx, db, table) + defer func() { + cancel() + clean() + }() + log.Info("Clear", "table", table) + return tx.ClearBucket(table) +} + +func warmup(ctx context.Context, db kv.RoDB, bucket string) func() { + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + WarmupTable(ctx, db, bucket, log.LvlInfo, ReadAheadThreads) + }() + return func() { wg.Wait() } +}
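---
A minimal sketch of driving the new backup API directly from Go, mirroring what cmdMdbxToMdbx and doBackup above do. It uses only identifiers introduced in this patch (backup.OpenPair, backup.Kv2kv, backup.ReadAheadThreads); the two paths are placeholders.

	package main

	import (
		"context"

		"github.com/ledgerwatch/erigon-lib/kv"
		"github.com/ledgerwatch/erigon/turbo/backup"
		"github.com/ledgerwatch/log/v3"
	)

	func main() {
		ctx := context.Background()
		// Source opens read-only, target read-write; passing 0 as the target
		// page size means "inherit the source DB's page size" (see OpenPair).
		from, to := backup.OpenPair("/old/chaindata", "/new/chaindata", kv.ChainDB, 0)
		defer from.Close()
		defer to.Close()
		// A nil table list copies every non-deprecated table; the last argument
		// is the number of read-ahead threads used to hide disk latency.
		if err := backup.Kv2kv(ctx, from, to, nil, backup.ReadAheadThreads); err != nil {
			log.Error("backup failed", "err", err)
		}
	}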