diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml
new file mode 100644
index 000000000..20946426d
--- /dev/null
+++ b/.github/workflows/check.yml
@@ -0,0 +1,15 @@
+name: Check
+
+on:
+ push:
+ branches:
+ - devel
+ workflow_dispatch:
+
+jobs:
+ goreleaser:
+ runs-on: ubuntu-latest
+ steps:
+ - run: echo ${GITHUB_REF}
+ - run: echo ${GITHUB_REF#refs/tags/}
+ - run: echo ${GITHUB_REF##*/}
diff --git a/cmd/caplin-phase1/main.go b/cmd/caplin-phase1/main.go
index 262b02c1b..656a05ba2 100644
--- a/cmd/caplin-phase1/main.go
+++ b/cmd/caplin-phase1/main.go
@@ -36,7 +36,7 @@ import (
)
func main() {
- app := lightclientapp.MakeApp(runCaplinNode, flags.LCDefaultFlags)
+ app := lightclientapp.MakeApp("caplin-phase1", runCaplinNode, flags.LCDefaultFlags)
if err := app.Run(os.Args); err != nil {
_, printErr := fmt.Fprintln(os.Stderr, err)
if printErr != nil {
diff --git a/cmd/devnet/node/node.go b/cmd/devnet/node/node.go
index 66b00c10a..283ddbc6f 100644
--- a/cmd/devnet/node/node.go
+++ b/cmd/devnet/node/node.go
@@ -64,7 +64,7 @@ func StartNode(wg *sync.WaitGroup, args []string) {
os.Exit(1)
}()
- app := erigonapp.MakeApp(runNode, erigoncli.DefaultFlags)
+ app := erigonapp.MakeApp("devnet", runNode, erigoncli.DefaultFlags)
nodeNumber++ // increment the number of nodes on the network
if err := app.Run(args); err != nil {
_, printErr := fmt.Fprintln(os.Stderr, err)
diff --git a/cmd/erigon-cl/main.go b/cmd/erigon-cl/main.go
index 8a78f90c2..3391962da 100644
--- a/cmd/erigon-cl/main.go
+++ b/cmd/erigon-cl/main.go
@@ -32,7 +32,7 @@ import (
)
func main() {
- app := sentinelapp.MakeApp(runConsensusLayerNode, flags.CLDefaultFlags)
+ app := sentinelapp.MakeApp("erigon-cl", runConsensusLayerNode, flags.CLDefaultFlags)
if err := app.Run(os.Args); err != nil {
_, printErr := fmt.Fprintln(os.Stderr, err)
if printErr != nil {
diff --git a/cmd/erigon-el/main.go b/cmd/erigon-el/main.go
index 0c9159991..3e954b2e9 100644
--- a/cmd/erigon-el/main.go
+++ b/cmd/erigon-el/main.go
@@ -34,7 +34,7 @@ func main() {
os.Exit(1)
}()
- app := erigonapp.MakeApp(runErigon, erigoncli.DefaultFlags)
+ app := erigonapp.MakeApp("erigon-el", runErigon, erigoncli.DefaultFlags)
if err := app.Run(os.Args); err != nil {
_, printErr := fmt.Fprintln(os.Stderr, err)
if printErr != nil {
diff --git a/cmd/erigon/main.go b/cmd/erigon/main.go
index 432bcb0f4..e18380211 100644
--- a/cmd/erigon/main.go
+++ b/cmd/erigon/main.go
@@ -32,7 +32,7 @@ func main() {
os.Exit(1)
}()
- app := erigonapp.MakeApp(runErigon, erigoncli.DefaultFlags)
+ app := erigonapp.MakeApp("erigon", runErigon, erigoncli.DefaultFlags)
if err := app.Run(os.Args); err != nil {
_, printErr := fmt.Fprintln(os.Stderr, err)
if printErr != nil {
diff --git a/cmd/erigoncustom/main.go b/cmd/erigoncustom/main.go
index ad3ef4748..1870785fc 100644
--- a/cmd/erigoncustom/main.go
+++ b/cmd/erigoncustom/main.go
@@ -24,7 +24,7 @@ const (
// the regular main function
func main() {
// initializing Erigon application here and providing our custom flag
- app := erigonapp.MakeApp(runErigon,
+ app := erigonapp.MakeApp("erigoncustom", runErigon,
append(erigoncli.DefaultFlags, &flag), // always use DefaultFlags, but add a new one in the end.
)
if err := app.Run(os.Args); err != nil {
diff --git a/cmd/evm/internal/t8ntool/flags.go b/cmd/evm/internal/t8ntool/flags.go
index 3a42f32e5..fa0886e7f 100644
--- a/cmd/evm/internal/t8ntool/flags.go
+++ b/cmd/evm/internal/t8ntool/flags.go
@@ -103,7 +103,7 @@ var (
}
VerbosityFlag = cli.IntFlag{
Name: "verbosity",
- Usage: "sets the verbosity level",
+ Usage: "Deprecated. Use --log.console.verbosity, --log.dir.verbosity, --torrent.verbosity, --database.verbosity",
Value: 3,
}
)
diff --git a/cmd/integration/commands/refetence_db.go b/cmd/integration/commands/refetence_db.go
index 267105f3b..3e8fcb7b5 100644
--- a/cmd/integration/commands/refetence_db.go
+++ b/cmd/integration/commands/refetence_db.go
@@ -4,24 +4,20 @@ import (
"bufio"
"bytes"
"context"
- "encoding/hex"
"errors"
"fmt"
"os"
- "runtime"
"strings"
"sync/atomic"
"time"
common2 "github.com/ledgerwatch/erigon-lib/common"
- "github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/kv"
mdbx2 "github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon/common"
- "github.com/ledgerwatch/erigon/core/rawdb/rawdbreset"
+ "github.com/ledgerwatch/erigon/turbo/backup"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
- "github.com/torquem-ch/mdbx-go/mdbx"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
@@ -99,8 +95,9 @@ var cmdMdbxToMdbx = &cobra.Command{
Short: "copy data from '--chaindata' to '--chaindata.to'",
Run: func(cmd *cobra.Command, args []string) {
ctx, _ := common2.RootContext()
- logger := log.New()
- err := mdbxToMdbx(ctx, logger, chaindata, toChaindata)
+
+ from, to := backup.OpenPair(chaindata, toChaindata, kv.ChainDB, 0)
+ err := backup.Kv2kv(ctx, from, to, nil, backup.ReadAheadThreads)
if err != nil && !errors.Is(err, context.Canceled) {
if !errors.Is(err, context.Canceled) {
log.Error(err.Error())
@@ -427,96 +424,3 @@ MainLoop:
return nil
}
-
-func mdbxToMdbx(ctx context.Context, logger log.Logger, from, to string) error {
- src := mdbx2.NewMDBX(logger).Path(from).Flags(func(flags uint) uint { return mdbx.Readonly | mdbx.Accede }).MustOpen()
- dst := mdbx2.NewMDBX(logger).Path(to).
- WriteMap().
- Flags(func(flags uint) uint { return flags | mdbx.NoMemInit | mdbx.WriteMap | mdbx.Accede }).
- MustOpen()
- return kv2kv(ctx, src, dst)
-}
-
-func kv2kv(ctx context.Context, src, dst kv.RwDB) error {
- srcTx, err1 := src.BeginRo(ctx)
- if err1 != nil {
- return err1
- }
- defer srcTx.Rollback()
-
- commitEvery := time.NewTicker(5 * time.Minute)
- defer commitEvery.Stop()
- logEvery := time.NewTicker(20 * time.Second)
- defer logEvery.Stop()
-
- var total uint64
- for name, b := range src.AllBuckets() {
- if b.IsDeprecated {
- continue
- }
- go rawdbreset.WarmupTable(ctx, src, name, log.LvlTrace)
- srcC, err := srcTx.Cursor(name)
- if err != nil {
- return err
- }
- total, _ = srcC.Count()
-
- dstTx, err1 := dst.BeginRw(ctx)
- if err1 != nil {
- return err1
- }
- defer dstTx.Rollback()
- _ = dstTx.ClearBucket(name)
-
- c, err := dstTx.RwCursor(name)
- if err != nil {
- return err
- }
- casted, isDupsort := c.(kv.RwCursorDupSort)
- i := uint64(0)
-
- for k, v, err := srcC.First(); k != nil; k, v, err = srcC.Next() {
- if err != nil {
- return err
- }
-
- if isDupsort {
- if err = casted.AppendDup(k, v); err != nil {
- panic(err)
- }
- } else {
- if err = c.Append(k, v); err != nil {
- panic(err)
- }
- }
-
- i++
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-logEvery.C:
- var m runtime.MemStats
- dbg.ReadMemStats(&m)
- log.Info("Progress", "bucket", name, "progress", fmt.Sprintf("%.1fm/%.1fm", float64(i)/1_000_000, float64(total)/1_000_000), "key", hex.EncodeToString(k),
- "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
- default:
- }
- }
-
- // migrate bucket sequences to native mdbx implementation
- //currentID, err := srcTx.Sequence(name, 0)
- //if err != nil {
- // return err
- //}
- //_, err = dstTx.Sequence(name, currentID)
- //if err != nil {
- // return err
- //}
- if err2 := dstTx.Commit(); err2 != nil {
- return err2
- }
- }
- srcTx.Rollback()
- log.Info("done")
- return nil
-}
diff --git a/cmd/sentinel/main.go b/cmd/sentinel/main.go
index d06a4d349..e3a722ad8 100644
--- a/cmd/sentinel/main.go
+++ b/cmd/sentinel/main.go
@@ -29,7 +29,7 @@ import (
)
func main() {
- app := sentinelapp.MakeApp(runSentinelNode, flags.CLDefaultFlags)
+ app := sentinelapp.MakeApp("sentinel", runSentinelNode, flags.CLDefaultFlags)
if err := app.Run(os.Args); err != nil {
_, printErr := fmt.Fprintln(os.Stderr, err)
if printErr != nil {
diff --git a/cmd/txpool/main.go b/cmd/txpool/main.go
index 138d871b7..3646a894e 100644
--- a/cmd/txpool/main.go
+++ b/cmd/txpool/main.go
@@ -81,7 +81,7 @@ func init() {
var rootCmd = &cobra.Command{
Use: "txpool",
- Short: "Launch externa Transaction Pool instance - same as built-into Erigon, but as independent Service",
+ Short: "Launch external Transaction Pool instance - same as built-into Erigon, but as independent Process",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
return debug.SetupCobra(cmd)
},
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 54a2fa07a..06b0ea8a7 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -1151,29 +1151,16 @@ func setDataDir(ctx *cli.Context, cfg *nodecfg.Config) {
}
cfg.Dirs = datadir.New(cfg.Dirs.DataDir)
- if err := cfg.MdbxPageSize.UnmarshalText([]byte(ctx.String(DbPageSizeFlag.Name))); err != nil {
- panic(err)
- }
+ cfg.MdbxPageSize = flags.DBPageSizeFlagUnmarshal(ctx, DbPageSizeFlag.Name, DbPageSizeFlag.Usage)
if err := cfg.MdbxDBSizeLimit.UnmarshalText([]byte(ctx.String(DbSizeLimitFlag.Name))); err != nil {
panic(err)
}
- sz := cfg.MdbxPageSize.Bytes()
- if !isPowerOfTwo(sz) || sz < 256 || sz > 64*1024 {
- panic(fmt.Errorf("invalid --db.pageSize: %s=%d, see: %s", ctx.String(DbPageSizeFlag.Name), sz, DbPageSizeFlag.Usage))
- }
szLimit := cfg.MdbxDBSizeLimit.Bytes()
- if szLimit%sz != 0 || szLimit < 256 {
+ if szLimit%szLimit != 0 || szLimit < 256 {
panic(fmt.Errorf("invalid --db.size.limit: %s=%d, see: %s", ctx.String(DbSizeLimitFlag.Name), szLimit, DbSizeLimitFlag.Usage))
}
}
-func isPowerOfTwo(n uint64) bool {
- if n == 0 { //corner case: if n is zero it will also consider as power 2
- return true
- }
- return n&(n-1) == 0
-}
-
func setDataDirCobra(f *pflag.FlagSet, cfg *nodecfg.Config) {
dirname, err := f.GetString(DataDirFlag.Name)
if err != nil {
diff --git a/cmd/utils/flags/flags.go b/cmd/utils/flags/flags.go
index 683a3cd4a..c4e583424 100644
--- a/cmd/utils/flags/flags.go
+++ b/cmd/utils/flags/flags.go
@@ -20,12 +20,14 @@ import (
"encoding"
"errors"
"flag"
+ "fmt"
"math/big"
"os"
"os/user"
"path/filepath"
"strings"
+ "github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon/common/math"
"github.com/urfave/cli/v2"
)
@@ -342,3 +344,22 @@ func eachName(f cli.Flag, fn func(string)) {
fn(name)
}
}
+
+func DBPageSizeFlagUnmarshal(cliCtx *cli.Context, flagName, flagUsage string) datasize.ByteSize {
+ var pageSize datasize.ByteSize
+ if err := pageSize.UnmarshalText([]byte(cliCtx.String(flagName))); err != nil {
+ panic(err)
+ }
+ sz := pageSize.Bytes()
+ if !isPowerOfTwo(sz) || sz < 256 || sz > 64*1024 {
+ panic(fmt.Errorf("invalid --%s: %d, see: %s", flagName, sz, flagUsage))
+ }
+ return pageSize
+}
+
+func isPowerOfTwo(n uint64) bool {
+ if n == 0 { //corner case: if n is zero it will also consider as power 2
+ return true
+ }
+ return n&(n-1) == 0
+}
diff --git a/core/rawdb/rawdbreset/reset_stages.go b/core/rawdb/rawdbreset/reset_stages.go
index 185ad2108..e63b82ebb 100644
--- a/core/rawdb/rawdbreset/reset_stages.go
+++ b/core/rawdb/rawdbreset/reset_stages.go
@@ -2,11 +2,7 @@ package rawdbreset
import (
"context"
- "encoding/binary"
"fmt"
- "sync"
- "sync/atomic"
- "time"
"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common/datadir"
@@ -14,16 +10,15 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon-lib/state"
- "github.com/ledgerwatch/log/v3"
- "golang.org/x/sync/errgroup"
-
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
+ "github.com/ledgerwatch/erigon/turbo/backup"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
+ "github.com/ledgerwatch/log/v3"
)
func ResetState(db kv.RwDB, ctx context.Context, chain string, tmpDir string) error {
@@ -99,7 +94,7 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, ag
ethtx = kv.EthTxV3
}
- if err := clearTables(context.Background(), db, tx,
+ if err := backup.ClearTables(context.Background(), db, tx,
kv.NonCanonicalTxs,
ethtx,
kv.MaxTxNum,
@@ -126,7 +121,7 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, ag
return nil
}
func ResetSenders(ctx context.Context, db kv.RwDB, tx kv.RwTx) error {
- if err := clearTables(ctx, db, tx, kv.Senders); err != nil {
+ if err := backup.ClearTables(ctx, db, tx, kv.Senders); err != nil {
return nil
}
return clearStageProgress(tx, stages.Senders)
@@ -134,12 +129,12 @@ func ResetSenders(ctx context.Context, db kv.RwDB, tx kv.RwTx) error {
func WarmupExec(ctx context.Context, db kv.RwDB) (err error) {
for _, tbl := range stateBuckets {
- WarmupTable(ctx, db, tbl, log.LvlInfo)
+ backup.WarmupTable(ctx, db, tbl, log.LvlInfo, backup.ReadAheadThreads)
}
historyV3 := kvcfg.HistoryV3.FromDB(db)
if historyV3 { //hist v2 is too big, if you have so much ram, just use `cat mdbx.dat > /dev/null` to warmup
for _, tbl := range stateHistoryV3Buckets {
- WarmupTable(ctx, db, tbl, log.LvlInfo)
+ backup.WarmupTable(ctx, db, tbl, log.LvlInfo, backup.ReadAheadThreads)
}
}
return
@@ -157,7 +152,7 @@ func ResetExec(ctx context.Context, db kv.RwDB, chain string, tmpDir string) (er
return err
}
- if err := clearTables(ctx, db, tx, stateBuckets...); err != nil {
+ if err := backup.ClearTables(ctx, db, tx, stateBuckets...); err != nil {
return nil
}
for _, b := range stateBuckets {
@@ -166,7 +161,7 @@ func ResetExec(ctx context.Context, db kv.RwDB, chain string, tmpDir string) (er
}
}
- if err := clearTables(ctx, db, tx, stateHistoryBuckets...); err != nil {
+ if err := backup.ClearTables(ctx, db, tx, stateHistoryBuckets...); err != nil {
return nil
}
if !historyV3 {
@@ -231,135 +226,6 @@ var stateHistoryV4Buckets = []string{
kv.CommitmentKeys, kv.CommitmentVals, kv.CommitmentHistoryKeys, kv.CommitmentHistoryVals, kv.CommitmentIdx,
}
-func WarmupTable(ctx context.Context, db kv.RoDB, bucket string, lvl log.Lvl) {
- const ThreadsLimit = 128
- var total uint64
- db.View(ctx, func(tx kv.Tx) error {
- c, _ := tx.Cursor(bucket)
- total, _ = c.Count()
- return nil
- })
- if total < 10_000 {
- return
- }
- progress := atomic.Int64{}
-
- logEvery := time.NewTicker(20 * time.Second)
- defer logEvery.Stop()
-
- g, ctx := errgroup.WithContext(ctx)
- g.SetLimit(ThreadsLimit)
- for i := 0; i < 256; i++ {
- for j := 0; j < 256; j++ {
- i := i
- j := j
- g.Go(func() error {
- return db.View(ctx, func(tx kv.Tx) error {
- it, err := tx.Prefix(bucket, []byte{byte(i), byte(j)})
- if err != nil {
- return err
- }
- for it.HasNext() {
- _, _, err = it.Next()
- if err != nil {
- return err
- }
- progress.Add(1)
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-logEvery.C:
- log.Log(lvl, fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total)))
- default:
- }
- }
- return nil
- })
- })
- }
- }
- for i := 0; i < 1_000; i++ {
- i := i
- g.Go(func() error {
- return db.View(ctx, func(tx kv.Tx) error {
- seek := make([]byte, 8)
- binary.BigEndian.PutUint64(seek, uint64(i*100_000))
- it, err := tx.Prefix(bucket, seek)
- if err != nil {
- return err
- }
- for it.HasNext() {
- _, _, err = it.Next()
- if err != nil {
- return err
- }
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-logEvery.C:
- log.Log(lvl, fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total)))
- default:
- }
- }
- return nil
- })
- })
- }
- _ = g.Wait()
-}
-func Warmup(ctx context.Context, db kv.RwDB, lvl log.Lvl, stList ...stages.SyncStage) error {
- for _, st := range stList {
- for _, tbl := range Tables[st] {
- WarmupTable(ctx, db, tbl, lvl)
- }
- }
- return nil
-}
-
-func Reset(ctx context.Context, db kv.RwDB, stagesList ...stages.SyncStage) error {
- return db.Update(ctx, func(tx kv.RwTx) error {
- for _, st := range stagesList {
- if err := clearTables(ctx, db, tx, Tables[st]...); err != nil {
- return err
- }
- if err := clearStageProgress(tx, stagesList...); err != nil {
- return err
- }
- }
- return nil
- })
-}
-
-func warmup(ctx context.Context, db kv.RoDB, bucket string) func() {
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- defer wg.Done()
- WarmupTable(ctx, db, bucket, log.LvlInfo)
- }()
- return func() { wg.Wait() }
-}
-
-func clearTables(ctx context.Context, db kv.RoDB, tx kv.RwTx, tables ...string) error {
- for _, tbl := range tables {
- if err := clearTable(ctx, db, tx, tbl); err != nil {
- return err
- }
- }
- return nil
-}
-
-func clearTable(ctx context.Context, db kv.RoDB, tx kv.RwTx, table string) error {
- ctx, cancel := context.WithCancel(ctx)
- clean := warmup(ctx, db, table)
- defer func() {
- cancel()
- clean()
- }()
- log.Info("Clear", "table", table)
- return tx.ClearBucket(table)
-}
-
func clearStageProgress(tx kv.RwTx, stagesList ...stages.SyncStage) error {
for _, stage := range stagesList {
if err := stages.SaveStageProgress(tx, stage, 0); err != nil {
@@ -371,3 +237,25 @@ func clearStageProgress(tx kv.RwTx, stagesList ...stages.SyncStage) error {
}
return nil
}
+
+func Reset(ctx context.Context, db kv.RwDB, stagesList ...stages.SyncStage) error {
+ return db.Update(ctx, func(tx kv.RwTx) error {
+ for _, st := range stagesList {
+ if err := backup.ClearTables(ctx, db, tx, Tables[st]...); err != nil {
+ return err
+ }
+ if err := clearStageProgress(tx, stagesList...); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+}
+func Warmup(ctx context.Context, db kv.RwDB, lvl log.Lvl, stList ...stages.SyncStage) error {
+ for _, st := range stList {
+ for _, tbl := range Tables[st] {
+ backup.WarmupTable(ctx, db, tbl, lvl, backup.ReadAheadThreads)
+ }
+ }
+ return nil
+}
diff --git a/core/state/intra_block_state_test.go b/core/state/intra_block_state_test.go
index 45cf2442b..b189e2f11 100644
--- a/core/state/intra_block_state_test.go
+++ b/core/state/intra_block_state_test.go
@@ -14,8 +14,6 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see .
-//go:build integration
-
package state
import (
diff --git a/go.mod b/go.mod
index 20794b199..1c3c49657 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
go 1.19
require (
- github.com/ledgerwatch/erigon-lib v0.0.0-20230424042211-39e97b6becde
+ github.com/ledgerwatch/erigon-lib v0.0.0-20230427033621-8ed42874e3e2
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230404044759-5dec854ce336
github.com/ledgerwatch/log/v3 v3.7.0
github.com/ledgerwatch/secp256k1 v1.0.0
diff --git a/go.sum b/go.sum
index 59df00ea5..ca0cee2b4 100644
--- a/go.sum
+++ b/go.sum
@@ -438,8 +438,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
-github.com/ledgerwatch/erigon-lib v0.0.0-20230424042211-39e97b6becde h1:6ldvFEx1d0gPIdiXYV3DGGvUjTkPn78m/5B9/6T/LUo=
-github.com/ledgerwatch/erigon-lib v0.0.0-20230424042211-39e97b6becde/go.mod h1:D05f9OXc/2cnYxCyBexlu5HeIeQW9GKXynyWYzJ1F5I=
+github.com/ledgerwatch/erigon-lib v0.0.0-20230427033621-8ed42874e3e2 h1:upWpgnnGdc/CZLok0F0uaVJmFhaHPDrjKsXpE6KeDsY=
+github.com/ledgerwatch/erigon-lib v0.0.0-20230427033621-8ed42874e3e2/go.mod h1:D05f9OXc/2cnYxCyBexlu5HeIeQW9GKXynyWYzJ1F5I=
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230404044759-5dec854ce336 h1:Yxmt4Wyd0RCLr7UJJAl0ApCP/f5qkWfvHfgPbnI8ghM=
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230404044759-5dec854ce336/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.7.0 h1:aFPEZdwZx4jzA3+/Pf8wNDN5tCI0cIolq/kfvgcM+og=
diff --git a/turbo/app/backup_cmd.go b/turbo/app/backup_cmd.go
new file mode 100644
index 000000000..807d2baf6
--- /dev/null
+++ b/turbo/app/backup_cmd.go
@@ -0,0 +1,139 @@
+package app
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+
+ "github.com/c2h5oh/datasize"
+ "github.com/ledgerwatch/erigon-lib/common/datadir"
+ "github.com/ledgerwatch/erigon-lib/common/dir"
+ "github.com/ledgerwatch/erigon-lib/kv"
+ "github.com/ledgerwatch/erigon/cmd/utils"
+ "github.com/ledgerwatch/erigon/cmd/utils/flags"
+ "github.com/ledgerwatch/erigon/turbo/backup"
+ "github.com/ledgerwatch/erigon/turbo/debug"
+ "github.com/ledgerwatch/erigon/turbo/logging"
+ "github.com/ledgerwatch/log/v3"
+ "github.com/urfave/cli/v2"
+)
+
+// nolint
+var backupCommand = cli.Command{
+ Name: "backup",
+ Description: `Backup all databases of Erigon.
+Can do backup without stopping Erigon.
+Limitations:
+- no support of Consensus DB (copy it manually if you need). Possible to implement in future.
+- no support of datadir/snapshots folder. Possible to implement in future. Can copy it manually or rsync or symlink/mount.
+- way to pipe output to compressor (lz4/zstd). Can compress target floder later or use zfs-with-enabled-compression.
+- jwt tocken: copy it manually - if need.
+- no support of SentryDB (datadir/nodes folder). Because seems no much reason to backup it.
+
+Example: erigon backup --datadir= --to.datadir=
+`,
+ Action: doBackup,
+ Flags: joinFlags([]cli.Flag{
+ &utils.DataDirFlag,
+ &ToDatadirFlag,
+ &BackupToPageSizeFlag,
+ &BackupLabelsFlag,
+ &BackupTablesFlag,
+ &WarmupThreadsFlag,
+ }, debug.Flags, logging.Flags),
+}
+
+var (
+ ToDatadirFlag = flags.DirectoryFlag{
+ Name: "to.datadir",
+ Usage: "Target datadir",
+ Required: true,
+ }
+ BackupLabelsFlag = cli.StringFlag{
+ Name: "lables",
+ Usage: "Name of component to backup. Example: chaindata,txpool,downloader",
+ }
+ BackupTablesFlag = cli.StringFlag{
+ Name: "tables",
+ Usage: "One of: PlainState,HashedState",
+ }
+ BackupToPageSizeFlag = cli.StringFlag{
+ Name: "to.pagesize",
+ Usage: utils.DbPageSizeFlag.Usage,
+ }
+ WarmupThreadsFlag = cli.Uint64Flag{
+ Name: "warmup.threads",
+ Usage: `Erigon's db works as blocking-io: means it stops when read from disk.
+It means backup speed depends on 'disk latency' (not throughput).
+Can spawn many threads which will read-ahead the data and bring it to OS's PageCache.
+CloudDrives (and ssd) have bad-latency and good-parallel-throughput - then having >1k of warmup threads will help.`,
+ Value: uint64(backup.ReadAheadThreads),
+ }
+)
+
+func doBackup(cliCtx *cli.Context) error {
+ defer log.Info("backup done")
+
+ ctx := cliCtx.Context
+ dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name))
+ toDirs := datadir.New(cliCtx.String(ToDatadirFlag.Name))
+
+ var targetPageSize datasize.ByteSize
+ if cliCtx.IsSet(BackupToPageSizeFlag.Name) {
+ targetPageSize = flags.DBPageSizeFlagUnmarshal(cliCtx, BackupToPageSizeFlag.Name, BackupToPageSizeFlag.Usage)
+ }
+
+ var lables = []kv.Label{kv.ChainDB, kv.TxPoolDB, kv.DownloaderDB}
+ if cliCtx.IsSet(BackupToPageSizeFlag.Name) {
+ lables = lables[:0]
+ for _, l := range utils.SplitAndTrim(cliCtx.String(BackupLabelsFlag.Name)) {
+ lables = append(lables, kv.UnmarshalLabel(l))
+ }
+ }
+
+ var tables []string
+ if cliCtx.IsSet(BackupTablesFlag.Name) {
+ tables = utils.SplitAndTrim(cliCtx.String(BackupTablesFlag.Name))
+ }
+
+ readAheadThreads := backup.ReadAheadThreads
+ if cliCtx.IsSet(WarmupThreadsFlag.Name) {
+ readAheadThreads = int(cliCtx.Uint64(WarmupThreadsFlag.Name))
+ }
+
+ //kv.SentryDB no much reason to backup
+ //TODO: add support of kv.ConsensusDB
+ for _, label := range lables {
+ var from, to string
+ switch label {
+ case kv.ChainDB:
+ from, to = dirs.Chaindata, toDirs.Chaindata
+ case kv.TxPoolDB:
+ from, to = dirs.TxPool, toDirs.TxPool
+ case kv.DownloaderDB:
+ from, to = filepath.Join(dirs.Snap, "db"), filepath.Join(toDirs.Snap, "db")
+ default:
+ panic(fmt.Sprintf("unexpected: %+v", label))
+ }
+
+ if !dir.Exist(from) {
+ continue
+ }
+
+ if len(tables) == 0 { // if not partial backup - just drop target dir, to make backup more compact/fast (instead of clean tables)
+ if err := os.RemoveAll(to); err != nil {
+ return fmt.Errorf("mkdir: %w, %s", err, to)
+ }
+ }
+ if err := os.MkdirAll(to, 0740); err != nil { //owner: rw, group: r, others: -
+ return fmt.Errorf("mkdir: %w, %s", err, to)
+ }
+ log.Info("[backup] start", "label", label)
+ fromDB, toDB := backup.OpenPair(from, to, label, targetPageSize)
+ if err := backup.Kv2kv(ctx, fromDB, toDB, nil, readAheadThreads); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/turbo/app/import.go b/turbo/app/import_cmd.go
similarity index 99%
rename from turbo/app/import.go
rename to turbo/app/import_cmd.go
index 379d922c6..f5811f8d9 100644
--- a/turbo/app/import.go
+++ b/turbo/app/import_cmd.go
@@ -38,7 +38,7 @@ var importCommand = cli.Command{
&utils.DataDirFlag,
&utils.ChainFlag,
},
- Category: "BLOCKCHAIN COMMANDS",
+ //Category: "BLOCKCHAIN COMMANDS",
Description: `
The import command imports blocks from an RLP-encoded form. The form can be one file
with several RLP-encoded blocks, or several files can be used.
diff --git a/turbo/app/init.go b/turbo/app/init_cmd.go
similarity index 98%
rename from turbo/app/init.go
rename to turbo/app/init_cmd.go
index 0c7f84a0b..45e8d423f 100644
--- a/turbo/app/init.go
+++ b/turbo/app/init_cmd.go
@@ -22,7 +22,7 @@ var initCommand = cli.Command{
Flags: []cli.Flag{
&utils.DataDirFlag,
},
- Category: "BLOCKCHAIN COMMANDS",
+ //Category: "BLOCKCHAIN COMMANDS",
Description: `
The init command initializes a new genesis block and definition for the network.
This is a destructive action and changes the network in which you will be
diff --git a/turbo/app/make_app.go b/turbo/app/make_app.go
index f81f4d9be..dea56d8cc 100644
--- a/turbo/app/make_app.go
+++ b/turbo/app/make_app.go
@@ -2,9 +2,11 @@
package app
import (
+ "fmt"
"strings"
"github.com/ledgerwatch/erigon-lib/common/datadir"
+ "github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
"github.com/ledgerwatch/erigon/cmd/utils"
@@ -20,9 +22,24 @@ import (
// Parameters:
// * action: the main function for the application. receives `*cli.Context` with parsed command-line flags.
// * cliFlags: the list of flags `cli.Flag` that the app should set and parse. By default, use `DefaultFlags()`. If you want to specify your own flag, use `append(DefaultFlags(), myFlag)` for this parameter.
-func MakeApp(action cli.ActionFunc, cliFlags []cli.Flag) *cli.App {
- app := cli2.NewApp(params.GitCommit, "erigon experimental cli")
- app.Action = action
+func MakeApp(name string, action cli.ActionFunc, cliFlags []cli.Flag) *cli.App {
+ app := cli2.NewApp(params.GitCommit, "erigon")
+ app.Name = name
+ app.UsageText = app.Name + ` [command] [flags]`
+ app.Action = func(context *cli.Context) error {
+ // handle case: unknown sub-command
+ if context.Args().Present() {
+ var goodNames []string
+ for _, c := range app.VisibleCommands() {
+ goodNames = append(goodNames, c.Name)
+ }
+ log.Error(fmt.Sprintf("Command '%s' not found. Available commands: %s", context.Args().First(), goodNames))
+ cli.ShowAppHelpAndExit(context, 1)
+ }
+
+ // run default action
+ return action(context)
+ }
app.Flags = append(cliFlags, debug.Flags...) // debug flags are required
app.Before = func(ctx *cli.Context) error {
return debug.Setup(ctx)
@@ -31,7 +48,13 @@ func MakeApp(action cli.ActionFunc, cliFlags []cli.Flag) *cli.App {
debug.Exit()
return nil
}
- app.Commands = []*cli.Command{&initCommand, &importCommand, &snapshotCommand, &supportCommand}
+ app.Commands = []*cli.Command{
+ &initCommand,
+ &importCommand,
+ &snapshotCommand,
+ &supportCommand,
+ //&backupCommand,
+ }
return app
}
diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots_cmd.go
similarity index 99%
rename from turbo/app/snapshots.go
rename to turbo/app/snapshots_cmd.go
index 45e24c665..778819c7a 100644
--- a/turbo/app/snapshots.go
+++ b/turbo/app/snapshots_cmd.go
@@ -45,8 +45,8 @@ func joinFlags(lists ...[]cli.Flag) (res []cli.Flag) {
}
var snapshotCommand = cli.Command{
- Name: "snapshots",
- Description: `Managing snapshots (historical data partitions)`,
+ Name: "snapshots",
+ Usage: `Managing snapshots (historical data partitions)`,
Subcommands: []*cli.Command{
{
Name: "index",
diff --git a/turbo/app/support.go b/turbo/app/support_cmd.go
similarity index 99%
rename from turbo/app/support.go
rename to turbo/app/support_cmd.go
index b4d2bbf86..51bb9f9f7 100644
--- a/turbo/app/support.go
+++ b/turbo/app/support_cmd.go
@@ -44,7 +44,7 @@ var supportCommand = cli.Command{
&diagnosticsURLFlag,
&insecureFlag,
},
- Category: "SUPPORT COMMANDS",
+ //Category: "SUPPORT COMMANDS",
Description: `
The support command connects a running Erigon instances to a diagnostics system specified
by the URL.`,
diff --git a/turbo/backup/backup.go b/turbo/backup/backup.go
new file mode 100644
index 000000000..f6a0a2631
--- /dev/null
+++ b/turbo/backup/backup.go
@@ -0,0 +1,279 @@
+package backup
+
+import (
+ "context"
+ "encoding/binary"
+ "encoding/hex"
+ "fmt"
+ "runtime"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/c2h5oh/datasize"
+ common2 "github.com/ledgerwatch/erigon-lib/common"
+ "github.com/ledgerwatch/erigon-lib/common/dbg"
+ "github.com/ledgerwatch/erigon-lib/kv"
+ mdbx2 "github.com/ledgerwatch/erigon-lib/kv/mdbx"
+ "github.com/ledgerwatch/log/v3"
+ "github.com/torquem-ch/mdbx-go/mdbx"
+ "golang.org/x/exp/maps"
+ "golang.org/x/sync/errgroup"
+)
+
+func OpenPair(from, to string, label kv.Label, targetPageSize datasize.ByteSize) (kv.RoDB, kv.RwDB) {
+ src := mdbx2.NewMDBX(log.New()).Path(from).
+ Label(label).
+ WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(label) }).
+ Flags(func(flags uint) uint { return mdbx.Readonly | mdbx.Accede }).MustOpen()
+ if targetPageSize <= 0 {
+ targetPageSize = datasize.ByteSize(src.PageSize())
+ }
+ info, err := src.(*mdbx2.MdbxKV).Env().Info(nil)
+ if err != nil {
+ panic(err)
+ }
+ dst := mdbx2.NewMDBX(log.New()).Path(to).
+ Label(label).
+ PageSize(targetPageSize.Bytes()).
+ MapSize(datasize.ByteSize(info.Geo.Upper)).
+ WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(label) }).
+ MustOpen()
+ return src, dst
+}
+func MdbxToMdbx(ctx context.Context, from, to string, label kv.Label, tables []string, targetPageSize datasize.ByteSize, readAheadThreads int) error {
+ src := mdbx2.NewMDBX(log.New()).Path(from).
+ Label(label).
+ WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(label) }).
+ Flags(func(flags uint) uint { return mdbx.Readonly | mdbx.Accede }).MustOpen()
+ if targetPageSize <= 0 {
+ targetPageSize = datasize.ByteSize(src.PageSize())
+ }
+ info, err := src.(*mdbx2.MdbxKV).Env().Info(nil)
+ if err != nil {
+ return err
+ }
+ dst := mdbx2.NewMDBX(log.New()).Path(to).
+ Label(label).
+ PageSize(targetPageSize.Bytes()).
+ MapSize(datasize.ByteSize(info.Geo.Upper)).
+ WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(label) }).
+ MustOpen()
+ return Kv2kv(ctx, src, dst, tables, readAheadThreads)
+}
+
+func Kv2kv(ctx context.Context, src kv.RoDB, dst kv.RwDB, tables []string, readAheadThreads int) error {
+ srcTx, err1 := src.BeginRo(ctx)
+ if err1 != nil {
+ return err1
+ }
+ defer srcTx.Rollback()
+
+ commitEvery := time.NewTicker(5 * time.Minute)
+ defer commitEvery.Stop()
+ logEvery := time.NewTicker(20 * time.Second)
+ defer logEvery.Stop()
+
+ tablesMap := src.AllTables()
+ if len(tables) > 0 {
+ tablesMapCopy := maps.Clone(tablesMap)
+ tablesMap = kv.TableCfg{}
+ for _, name := range tables {
+ tablesMap[name] = tablesMapCopy[name]
+ }
+ }
+
+ for name, b := range tablesMap {
+ if b.IsDeprecated {
+ continue
+ }
+ if err := backupTable(ctx, src, srcTx, dst, name, readAheadThreads, logEvery); err != nil {
+ return err
+ }
+ }
+ log.Info("done")
+ return nil
+}
+
+func backupTable(ctx context.Context, src kv.RoDB, srcTx kv.Tx, dst kv.RwDB, table string, readAheadThreads int, logEvery *time.Ticker) error {
+ var total uint64
+ wg := sync.WaitGroup{}
+ defer wg.Wait()
+ warmupCtx, warmupCancel := context.WithCancel(ctx)
+ defer warmupCancel()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ WarmupTable(warmupCtx, src, table, log.LvlTrace, readAheadThreads)
+ }()
+ srcC, err := srcTx.Cursor(table)
+ if err != nil {
+ return err
+ }
+ total, _ = srcC.Count()
+
+ dstTx, err1 := dst.BeginRw(ctx)
+ if err1 != nil {
+ return err1
+ }
+ defer dstTx.Rollback()
+ _ = dstTx.ClearBucket(table)
+
+ c, err := dstTx.RwCursor(table)
+ if err != nil {
+ return err
+ }
+ casted, isDupsort := c.(kv.RwCursorDupSort)
+ i := uint64(0)
+
+ for k, v, err := srcC.First(); k != nil; k, v, err = srcC.Next() {
+ if err != nil {
+ return err
+ }
+
+ if isDupsort {
+ if err = casted.AppendDup(k, v); err != nil {
+ panic(err)
+ }
+ } else {
+ if err = c.Append(k, v); err != nil {
+ panic(err)
+ }
+ }
+
+ i++
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-logEvery.C:
+ var m runtime.MemStats
+ dbg.ReadMemStats(&m)
+ log.Info("Progress", "table", table, "progress", fmt.Sprintf("%.1fm/%.1fm", float64(i)/1_000_000, float64(total)/1_000_000), "key", hex.EncodeToString(k),
+ "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
+ default:
+ }
+ }
+ // migrate bucket sequences to native mdbx implementation
+ //currentID, err := srcTx.Sequence(name, 0)
+ //if err != nil {
+ // return err
+ //}
+ //_, err = dstTx.Sequence(name, currentID)
+ //if err != nil {
+ // return err
+ //}
+ if err2 := dstTx.Commit(); err2 != nil {
+ return err2
+ }
+ return nil
+}
+
+const ReadAheadThreads = 128
+
+func WarmupTable(ctx context.Context, db kv.RoDB, bucket string, lvl log.Lvl, readAheadThreads int) {
+ var ThreadsLimit = readAheadThreads
+ var total uint64
+ db.View(ctx, func(tx kv.Tx) error {
+ c, _ := tx.Cursor(bucket)
+ total, _ = c.Count()
+ return nil
+ })
+ if total < 10_000 {
+ return
+ }
+ progress := atomic.Int64{}
+
+ logEvery := time.NewTicker(20 * time.Second)
+ defer logEvery.Stop()
+
+ g, ctx := errgroup.WithContext(ctx)
+ g.SetLimit(ThreadsLimit)
+ for i := 0; i < 256; i++ {
+ for j := 0; j < 256; j++ {
+ i := i
+ j := j
+ g.Go(func() error {
+ return db.View(ctx, func(tx kv.Tx) error {
+ it, err := tx.Prefix(bucket, []byte{byte(i), byte(j)})
+ if err != nil {
+ return err
+ }
+ for it.HasNext() {
+ _, _, err = it.Next()
+ if err != nil {
+ return err
+ }
+ progress.Add(1)
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-logEvery.C:
+ log.Log(lvl, fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total)))
+ default:
+ }
+ }
+ return nil
+ })
+ })
+ }
+ }
+ for i := 0; i < 1_000; i++ {
+ i := i
+ g.Go(func() error {
+ return db.View(ctx, func(tx kv.Tx) error {
+ seek := make([]byte, 8)
+ binary.BigEndian.PutUint64(seek, uint64(i*100_000))
+ it, err := tx.Prefix(bucket, seek)
+ if err != nil {
+ return err
+ }
+ for it.HasNext() {
+ _, _, err = it.Next()
+ if err != nil {
+ return err
+ }
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-logEvery.C:
+ log.Log(lvl, fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total)))
+ default:
+ }
+ }
+ return nil
+ })
+ })
+ }
+ _ = g.Wait()
+}
+
+func ClearTables(ctx context.Context, db kv.RoDB, tx kv.RwTx, tables ...string) error {
+ for _, tbl := range tables {
+ if err := ClearTable(ctx, db, tx, tbl); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func ClearTable(ctx context.Context, db kv.RoDB, tx kv.RwTx, table string) error {
+ ctx, cancel := context.WithCancel(ctx)
+ clean := warmup(ctx, db, table)
+ defer func() {
+ cancel()
+ clean()
+ }()
+ log.Info("Clear", "table", table)
+ return tx.ClearBucket(table)
+}
+
+func warmup(ctx context.Context, db kv.RoDB, bucket string) func() {
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ WarmupTable(ctx, db, bucket, log.LvlInfo, ReadAheadThreads)
+ }()
+ return func() { wg.Wait() }
+}