mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-21 19:20:39 +00:00
erigon backup: v0 of sub-command (#7396)
This commit is contained in:
parent
4e9b378a5d
commit
ded8283df7
15
.github/workflows/check.yml
vendored
Normal file
15
.github/workflows/check.yml
vendored
Normal file
@ -0,0 +1,15 @@
|
||||
name: Check
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- devel
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
goreleaser:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- run: echo ${GITHUB_REF}
|
||||
- run: echo ${GITHUB_REF#refs/tags/}
|
||||
- run: echo ${GITHUB_REF##*/}
|
@ -36,7 +36,7 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
app := lightclientapp.MakeApp(runCaplinNode, flags.LCDefaultFlags)
|
||||
app := lightclientapp.MakeApp("caplin-phase1", runCaplinNode, flags.LCDefaultFlags)
|
||||
if err := app.Run(os.Args); err != nil {
|
||||
_, printErr := fmt.Fprintln(os.Stderr, err)
|
||||
if printErr != nil {
|
||||
|
@ -64,7 +64,7 @@ func StartNode(wg *sync.WaitGroup, args []string) {
|
||||
os.Exit(1)
|
||||
}()
|
||||
|
||||
app := erigonapp.MakeApp(runNode, erigoncli.DefaultFlags)
|
||||
app := erigonapp.MakeApp("devnet", runNode, erigoncli.DefaultFlags)
|
||||
nodeNumber++ // increment the number of nodes on the network
|
||||
if err := app.Run(args); err != nil {
|
||||
_, printErr := fmt.Fprintln(os.Stderr, err)
|
||||
|
@ -32,7 +32,7 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
app := sentinelapp.MakeApp(runConsensusLayerNode, flags.CLDefaultFlags)
|
||||
app := sentinelapp.MakeApp("erigon-cl", runConsensusLayerNode, flags.CLDefaultFlags)
|
||||
if err := app.Run(os.Args); err != nil {
|
||||
_, printErr := fmt.Fprintln(os.Stderr, err)
|
||||
if printErr != nil {
|
||||
|
@ -34,7 +34,7 @@ func main() {
|
||||
os.Exit(1)
|
||||
}()
|
||||
|
||||
app := erigonapp.MakeApp(runErigon, erigoncli.DefaultFlags)
|
||||
app := erigonapp.MakeApp("erigon-el", runErigon, erigoncli.DefaultFlags)
|
||||
if err := app.Run(os.Args); err != nil {
|
||||
_, printErr := fmt.Fprintln(os.Stderr, err)
|
||||
if printErr != nil {
|
||||
|
@ -32,7 +32,7 @@ func main() {
|
||||
os.Exit(1)
|
||||
}()
|
||||
|
||||
app := erigonapp.MakeApp(runErigon, erigoncli.DefaultFlags)
|
||||
app := erigonapp.MakeApp("erigon", runErigon, erigoncli.DefaultFlags)
|
||||
if err := app.Run(os.Args); err != nil {
|
||||
_, printErr := fmt.Fprintln(os.Stderr, err)
|
||||
if printErr != nil {
|
||||
|
@ -24,7 +24,7 @@ const (
|
||||
// the regular main function
|
||||
func main() {
|
||||
// initializing Erigon application here and providing our custom flag
|
||||
app := erigonapp.MakeApp(runErigon,
|
||||
app := erigonapp.MakeApp("erigoncustom", runErigon,
|
||||
append(erigoncli.DefaultFlags, &flag), // always use DefaultFlags, but add a new one in the end.
|
||||
)
|
||||
if err := app.Run(os.Args); err != nil {
|
||||
|
@ -103,7 +103,7 @@ var (
|
||||
}
|
||||
VerbosityFlag = cli.IntFlag{
|
||||
Name: "verbosity",
|
||||
Usage: "sets the verbosity level",
|
||||
Usage: "Deprecated. Use --log.console.verbosity, --log.dir.verbosity, --torrent.verbosity, --database.verbosity",
|
||||
Value: 3,
|
||||
}
|
||||
)
|
||||
|
@ -4,24 +4,20 @@ import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
common2 "github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon-lib/common/dbg"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
mdbx2 "github.com/ledgerwatch/erigon-lib/kv/mdbx"
|
||||
"github.com/ledgerwatch/erigon/common"
|
||||
"github.com/ledgerwatch/erigon/core/rawdb/rawdbreset"
|
||||
"github.com/ledgerwatch/erigon/turbo/backup"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/torquem-ch/mdbx-go/mdbx"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
@ -99,8 +95,9 @@ var cmdMdbxToMdbx = &cobra.Command{
|
||||
Short: "copy data from '--chaindata' to '--chaindata.to'",
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
ctx, _ := common2.RootContext()
|
||||
logger := log.New()
|
||||
err := mdbxToMdbx(ctx, logger, chaindata, toChaindata)
|
||||
|
||||
from, to := backup.OpenPair(chaindata, toChaindata, kv.ChainDB, 0)
|
||||
err := backup.Kv2kv(ctx, from, to, nil, backup.ReadAheadThreads)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
log.Error(err.Error())
|
||||
@ -427,96 +424,3 @@ MainLoop:
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func mdbxToMdbx(ctx context.Context, logger log.Logger, from, to string) error {
|
||||
src := mdbx2.NewMDBX(logger).Path(from).Flags(func(flags uint) uint { return mdbx.Readonly | mdbx.Accede }).MustOpen()
|
||||
dst := mdbx2.NewMDBX(logger).Path(to).
|
||||
WriteMap().
|
||||
Flags(func(flags uint) uint { return flags | mdbx.NoMemInit | mdbx.WriteMap | mdbx.Accede }).
|
||||
MustOpen()
|
||||
return kv2kv(ctx, src, dst)
|
||||
}
|
||||
|
||||
func kv2kv(ctx context.Context, src, dst kv.RwDB) error {
|
||||
srcTx, err1 := src.BeginRo(ctx)
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
defer srcTx.Rollback()
|
||||
|
||||
commitEvery := time.NewTicker(5 * time.Minute)
|
||||
defer commitEvery.Stop()
|
||||
logEvery := time.NewTicker(20 * time.Second)
|
||||
defer logEvery.Stop()
|
||||
|
||||
var total uint64
|
||||
for name, b := range src.AllBuckets() {
|
||||
if b.IsDeprecated {
|
||||
continue
|
||||
}
|
||||
go rawdbreset.WarmupTable(ctx, src, name, log.LvlTrace)
|
||||
srcC, err := srcTx.Cursor(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
total, _ = srcC.Count()
|
||||
|
||||
dstTx, err1 := dst.BeginRw(ctx)
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
defer dstTx.Rollback()
|
||||
_ = dstTx.ClearBucket(name)
|
||||
|
||||
c, err := dstTx.RwCursor(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
casted, isDupsort := c.(kv.RwCursorDupSort)
|
||||
i := uint64(0)
|
||||
|
||||
for k, v, err := srcC.First(); k != nil; k, v, err = srcC.Next() {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if isDupsort {
|
||||
if err = casted.AppendDup(k, v); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
} else {
|
||||
if err = c.Append(k, v); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
i++
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-logEvery.C:
|
||||
var m runtime.MemStats
|
||||
dbg.ReadMemStats(&m)
|
||||
log.Info("Progress", "bucket", name, "progress", fmt.Sprintf("%.1fm/%.1fm", float64(i)/1_000_000, float64(total)/1_000_000), "key", hex.EncodeToString(k),
|
||||
"alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// migrate bucket sequences to native mdbx implementation
|
||||
//currentID, err := srcTx.Sequence(name, 0)
|
||||
//if err != nil {
|
||||
// return err
|
||||
//}
|
||||
//_, err = dstTx.Sequence(name, currentID)
|
||||
//if err != nil {
|
||||
// return err
|
||||
//}
|
||||
if err2 := dstTx.Commit(); err2 != nil {
|
||||
return err2
|
||||
}
|
||||
}
|
||||
srcTx.Rollback()
|
||||
log.Info("done")
|
||||
return nil
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
app := sentinelapp.MakeApp(runSentinelNode, flags.CLDefaultFlags)
|
||||
app := sentinelapp.MakeApp("sentinel", runSentinelNode, flags.CLDefaultFlags)
|
||||
if err := app.Run(os.Args); err != nil {
|
||||
_, printErr := fmt.Fprintln(os.Stderr, err)
|
||||
if printErr != nil {
|
||||
|
@ -81,7 +81,7 @@ func init() {
|
||||
|
||||
var rootCmd = &cobra.Command{
|
||||
Use: "txpool",
|
||||
Short: "Launch externa Transaction Pool instance - same as built-into Erigon, but as independent Service",
|
||||
Short: "Launch external Transaction Pool instance - same as built-into Erigon, but as independent Process",
|
||||
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
|
||||
return debug.SetupCobra(cmd)
|
||||
},
|
||||
|
@ -1151,29 +1151,16 @@ func setDataDir(ctx *cli.Context, cfg *nodecfg.Config) {
|
||||
}
|
||||
cfg.Dirs = datadir.New(cfg.Dirs.DataDir)
|
||||
|
||||
if err := cfg.MdbxPageSize.UnmarshalText([]byte(ctx.String(DbPageSizeFlag.Name))); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
cfg.MdbxPageSize = flags.DBPageSizeFlagUnmarshal(ctx, DbPageSizeFlag.Name, DbPageSizeFlag.Usage)
|
||||
if err := cfg.MdbxDBSizeLimit.UnmarshalText([]byte(ctx.String(DbSizeLimitFlag.Name))); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
sz := cfg.MdbxPageSize.Bytes()
|
||||
if !isPowerOfTwo(sz) || sz < 256 || sz > 64*1024 {
|
||||
panic(fmt.Errorf("invalid --db.pageSize: %s=%d, see: %s", ctx.String(DbPageSizeFlag.Name), sz, DbPageSizeFlag.Usage))
|
||||
}
|
||||
szLimit := cfg.MdbxDBSizeLimit.Bytes()
|
||||
if szLimit%sz != 0 || szLimit < 256 {
|
||||
if szLimit%szLimit != 0 || szLimit < 256 {
|
||||
panic(fmt.Errorf("invalid --db.size.limit: %s=%d, see: %s", ctx.String(DbSizeLimitFlag.Name), szLimit, DbSizeLimitFlag.Usage))
|
||||
}
|
||||
}
|
||||
|
||||
func isPowerOfTwo(n uint64) bool {
|
||||
if n == 0 { //corner case: if n is zero it will also consider as power 2
|
||||
return true
|
||||
}
|
||||
return n&(n-1) == 0
|
||||
}
|
||||
|
||||
func setDataDirCobra(f *pflag.FlagSet, cfg *nodecfg.Config) {
|
||||
dirname, err := f.GetString(DataDirFlag.Name)
|
||||
if err != nil {
|
||||
|
@ -20,12 +20,14 @@ import (
|
||||
"encoding"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"os"
|
||||
"os/user"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/c2h5oh/datasize"
|
||||
"github.com/ledgerwatch/erigon/common/math"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
@ -342,3 +344,22 @@ func eachName(f cli.Flag, fn func(string)) {
|
||||
fn(name)
|
||||
}
|
||||
}
|
||||
|
||||
func DBPageSizeFlagUnmarshal(cliCtx *cli.Context, flagName, flagUsage string) datasize.ByteSize {
|
||||
var pageSize datasize.ByteSize
|
||||
if err := pageSize.UnmarshalText([]byte(cliCtx.String(flagName))); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
sz := pageSize.Bytes()
|
||||
if !isPowerOfTwo(sz) || sz < 256 || sz > 64*1024 {
|
||||
panic(fmt.Errorf("invalid --%s: %d, see: %s", flagName, sz, flagUsage))
|
||||
}
|
||||
return pageSize
|
||||
}
|
||||
|
||||
func isPowerOfTwo(n uint64) bool {
|
||||
if n == 0 { //corner case: if n is zero it will also consider as power 2
|
||||
return true
|
||||
}
|
||||
return n&(n-1) == 0
|
||||
}
|
||||
|
@ -2,11 +2,7 @@ package rawdbreset
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ledgerwatch/erigon-lib/chain"
|
||||
"github.com/ledgerwatch/erigon-lib/common/datadir"
|
||||
@ -14,16 +10,15 @@ import (
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
|
||||
"github.com/ledgerwatch/erigon-lib/state"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/ledgerwatch/erigon/consensus"
|
||||
"github.com/ledgerwatch/erigon/core"
|
||||
"github.com/ledgerwatch/erigon/core/rawdb"
|
||||
"github.com/ledgerwatch/erigon/eth/stagedsync"
|
||||
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
|
||||
"github.com/ledgerwatch/erigon/turbo/backup"
|
||||
"github.com/ledgerwatch/erigon/turbo/services"
|
||||
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
)
|
||||
|
||||
func ResetState(db kv.RwDB, ctx context.Context, chain string, tmpDir string) error {
|
||||
@ -99,7 +94,7 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, ag
|
||||
ethtx = kv.EthTxV3
|
||||
}
|
||||
|
||||
if err := clearTables(context.Background(), db, tx,
|
||||
if err := backup.ClearTables(context.Background(), db, tx,
|
||||
kv.NonCanonicalTxs,
|
||||
ethtx,
|
||||
kv.MaxTxNum,
|
||||
@ -126,7 +121,7 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, ag
|
||||
return nil
|
||||
}
|
||||
func ResetSenders(ctx context.Context, db kv.RwDB, tx kv.RwTx) error {
|
||||
if err := clearTables(ctx, db, tx, kv.Senders); err != nil {
|
||||
if err := backup.ClearTables(ctx, db, tx, kv.Senders); err != nil {
|
||||
return nil
|
||||
}
|
||||
return clearStageProgress(tx, stages.Senders)
|
||||
@ -134,12 +129,12 @@ func ResetSenders(ctx context.Context, db kv.RwDB, tx kv.RwTx) error {
|
||||
|
||||
func WarmupExec(ctx context.Context, db kv.RwDB) (err error) {
|
||||
for _, tbl := range stateBuckets {
|
||||
WarmupTable(ctx, db, tbl, log.LvlInfo)
|
||||
backup.WarmupTable(ctx, db, tbl, log.LvlInfo, backup.ReadAheadThreads)
|
||||
}
|
||||
historyV3 := kvcfg.HistoryV3.FromDB(db)
|
||||
if historyV3 { //hist v2 is too big, if you have so much ram, just use `cat mdbx.dat > /dev/null` to warmup
|
||||
for _, tbl := range stateHistoryV3Buckets {
|
||||
WarmupTable(ctx, db, tbl, log.LvlInfo)
|
||||
backup.WarmupTable(ctx, db, tbl, log.LvlInfo, backup.ReadAheadThreads)
|
||||
}
|
||||
}
|
||||
return
|
||||
@ -157,7 +152,7 @@ func ResetExec(ctx context.Context, db kv.RwDB, chain string, tmpDir string) (er
|
||||
return err
|
||||
}
|
||||
|
||||
if err := clearTables(ctx, db, tx, stateBuckets...); err != nil {
|
||||
if err := backup.ClearTables(ctx, db, tx, stateBuckets...); err != nil {
|
||||
return nil
|
||||
}
|
||||
for _, b := range stateBuckets {
|
||||
@ -166,7 +161,7 @@ func ResetExec(ctx context.Context, db kv.RwDB, chain string, tmpDir string) (er
|
||||
}
|
||||
}
|
||||
|
||||
if err := clearTables(ctx, db, tx, stateHistoryBuckets...); err != nil {
|
||||
if err := backup.ClearTables(ctx, db, tx, stateHistoryBuckets...); err != nil {
|
||||
return nil
|
||||
}
|
||||
if !historyV3 {
|
||||
@ -231,135 +226,6 @@ var stateHistoryV4Buckets = []string{
|
||||
kv.CommitmentKeys, kv.CommitmentVals, kv.CommitmentHistoryKeys, kv.CommitmentHistoryVals, kv.CommitmentIdx,
|
||||
}
|
||||
|
||||
func WarmupTable(ctx context.Context, db kv.RoDB, bucket string, lvl log.Lvl) {
|
||||
const ThreadsLimit = 128
|
||||
var total uint64
|
||||
db.View(ctx, func(tx kv.Tx) error {
|
||||
c, _ := tx.Cursor(bucket)
|
||||
total, _ = c.Count()
|
||||
return nil
|
||||
})
|
||||
if total < 10_000 {
|
||||
return
|
||||
}
|
||||
progress := atomic.Int64{}
|
||||
|
||||
logEvery := time.NewTicker(20 * time.Second)
|
||||
defer logEvery.Stop()
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
g.SetLimit(ThreadsLimit)
|
||||
for i := 0; i < 256; i++ {
|
||||
for j := 0; j < 256; j++ {
|
||||
i := i
|
||||
j := j
|
||||
g.Go(func() error {
|
||||
return db.View(ctx, func(tx kv.Tx) error {
|
||||
it, err := tx.Prefix(bucket, []byte{byte(i), byte(j)})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for it.HasNext() {
|
||||
_, _, err = it.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
progress.Add(1)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-logEvery.C:
|
||||
log.Log(lvl, fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total)))
|
||||
default:
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
for i := 0; i < 1_000; i++ {
|
||||
i := i
|
||||
g.Go(func() error {
|
||||
return db.View(ctx, func(tx kv.Tx) error {
|
||||
seek := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(seek, uint64(i*100_000))
|
||||
it, err := tx.Prefix(bucket, seek)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for it.HasNext() {
|
||||
_, _, err = it.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-logEvery.C:
|
||||
log.Log(lvl, fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total)))
|
||||
default:
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
_ = g.Wait()
|
||||
}
|
||||
func Warmup(ctx context.Context, db kv.RwDB, lvl log.Lvl, stList ...stages.SyncStage) error {
|
||||
for _, st := range stList {
|
||||
for _, tbl := range Tables[st] {
|
||||
WarmupTable(ctx, db, tbl, lvl)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Reset(ctx context.Context, db kv.RwDB, stagesList ...stages.SyncStage) error {
|
||||
return db.Update(ctx, func(tx kv.RwTx) error {
|
||||
for _, st := range stagesList {
|
||||
if err := clearTables(ctx, db, tx, Tables[st]...); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := clearStageProgress(tx, stagesList...); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func warmup(ctx context.Context, db kv.RoDB, bucket string) func() {
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
WarmupTable(ctx, db, bucket, log.LvlInfo)
|
||||
}()
|
||||
return func() { wg.Wait() }
|
||||
}
|
||||
|
||||
func clearTables(ctx context.Context, db kv.RoDB, tx kv.RwTx, tables ...string) error {
|
||||
for _, tbl := range tables {
|
||||
if err := clearTable(ctx, db, tx, tbl); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func clearTable(ctx context.Context, db kv.RoDB, tx kv.RwTx, table string) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
clean := warmup(ctx, db, table)
|
||||
defer func() {
|
||||
cancel()
|
||||
clean()
|
||||
}()
|
||||
log.Info("Clear", "table", table)
|
||||
return tx.ClearBucket(table)
|
||||
}
|
||||
|
||||
func clearStageProgress(tx kv.RwTx, stagesList ...stages.SyncStage) error {
|
||||
for _, stage := range stagesList {
|
||||
if err := stages.SaveStageProgress(tx, stage, 0); err != nil {
|
||||
@ -371,3 +237,25 @@ func clearStageProgress(tx kv.RwTx, stagesList ...stages.SyncStage) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Reset(ctx context.Context, db kv.RwDB, stagesList ...stages.SyncStage) error {
|
||||
return db.Update(ctx, func(tx kv.RwTx) error {
|
||||
for _, st := range stagesList {
|
||||
if err := backup.ClearTables(ctx, db, tx, Tables[st]...); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := clearStageProgress(tx, stagesList...); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
func Warmup(ctx context.Context, db kv.RwDB, lvl log.Lvl, stList ...stages.SyncStage) error {
|
||||
for _, st := range stList {
|
||||
for _, tbl := range Tables[st] {
|
||||
backup.WarmupTable(ctx, db, tbl, lvl, backup.ReadAheadThreads)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -14,8 +14,6 @@
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//go:build integration
|
||||
|
||||
package state
|
||||
|
||||
import (
|
||||
|
2
go.mod
2
go.mod
@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
|
||||
go 1.19
|
||||
|
||||
require (
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20230424042211-39e97b6becde
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20230427033621-8ed42874e3e2
|
||||
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230404044759-5dec854ce336
|
||||
github.com/ledgerwatch/log/v3 v3.7.0
|
||||
github.com/ledgerwatch/secp256k1 v1.0.0
|
||||
|
4
go.sum
4
go.sum
@ -438,8 +438,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
|
||||
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
|
||||
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20230424042211-39e97b6becde h1:6ldvFEx1d0gPIdiXYV3DGGvUjTkPn78m/5B9/6T/LUo=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20230424042211-39e97b6becde/go.mod h1:D05f9OXc/2cnYxCyBexlu5HeIeQW9GKXynyWYzJ1F5I=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20230427033621-8ed42874e3e2 h1:upWpgnnGdc/CZLok0F0uaVJmFhaHPDrjKsXpE6KeDsY=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20230427033621-8ed42874e3e2/go.mod h1:D05f9OXc/2cnYxCyBexlu5HeIeQW9GKXynyWYzJ1F5I=
|
||||
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230404044759-5dec854ce336 h1:Yxmt4Wyd0RCLr7UJJAl0ApCP/f5qkWfvHfgPbnI8ghM=
|
||||
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230404044759-5dec854ce336/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
|
||||
github.com/ledgerwatch/log/v3 v3.7.0 h1:aFPEZdwZx4jzA3+/Pf8wNDN5tCI0cIolq/kfvgcM+og=
|
||||
|
139
turbo/app/backup_cmd.go
Normal file
139
turbo/app/backup_cmd.go
Normal file
@ -0,0 +1,139 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/c2h5oh/datasize"
|
||||
"github.com/ledgerwatch/erigon-lib/common/datadir"
|
||||
"github.com/ledgerwatch/erigon-lib/common/dir"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon/cmd/utils"
|
||||
"github.com/ledgerwatch/erigon/cmd/utils/flags"
|
||||
"github.com/ledgerwatch/erigon/turbo/backup"
|
||||
"github.com/ledgerwatch/erigon/turbo/debug"
|
||||
"github.com/ledgerwatch/erigon/turbo/logging"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
// nolint
|
||||
var backupCommand = cli.Command{
|
||||
Name: "backup",
|
||||
Description: `Backup all databases of Erigon.
|
||||
Can do backup without stopping Erigon.
|
||||
Limitations:
|
||||
- no support of Consensus DB (copy it manually if you need). Possible to implement in future.
|
||||
- no support of datadir/snapshots folder. Possible to implement in future. Can copy it manually or rsync or symlink/mount.
|
||||
- way to pipe output to compressor (lz4/zstd). Can compress target floder later or use zfs-with-enabled-compression.
|
||||
- jwt tocken: copy it manually - if need.
|
||||
- no support of SentryDB (datadir/nodes folder). Because seems no much reason to backup it.
|
||||
|
||||
Example: erigon backup --datadir=<your_datadir> --to.datadir=<backup_datadir>
|
||||
`,
|
||||
Action: doBackup,
|
||||
Flags: joinFlags([]cli.Flag{
|
||||
&utils.DataDirFlag,
|
||||
&ToDatadirFlag,
|
||||
&BackupToPageSizeFlag,
|
||||
&BackupLabelsFlag,
|
||||
&BackupTablesFlag,
|
||||
&WarmupThreadsFlag,
|
||||
}, debug.Flags, logging.Flags),
|
||||
}
|
||||
|
||||
var (
|
||||
ToDatadirFlag = flags.DirectoryFlag{
|
||||
Name: "to.datadir",
|
||||
Usage: "Target datadir",
|
||||
Required: true,
|
||||
}
|
||||
BackupLabelsFlag = cli.StringFlag{
|
||||
Name: "lables",
|
||||
Usage: "Name of component to backup. Example: chaindata,txpool,downloader",
|
||||
}
|
||||
BackupTablesFlag = cli.StringFlag{
|
||||
Name: "tables",
|
||||
Usage: "One of: PlainState,HashedState",
|
||||
}
|
||||
BackupToPageSizeFlag = cli.StringFlag{
|
||||
Name: "to.pagesize",
|
||||
Usage: utils.DbPageSizeFlag.Usage,
|
||||
}
|
||||
WarmupThreadsFlag = cli.Uint64Flag{
|
||||
Name: "warmup.threads",
|
||||
Usage: `Erigon's db works as blocking-io: means it stops when read from disk.
|
||||
It means backup speed depends on 'disk latency' (not throughput).
|
||||
Can spawn many threads which will read-ahead the data and bring it to OS's PageCache.
|
||||
CloudDrives (and ssd) have bad-latency and good-parallel-throughput - then having >1k of warmup threads will help.`,
|
||||
Value: uint64(backup.ReadAheadThreads),
|
||||
}
|
||||
)
|
||||
|
||||
func doBackup(cliCtx *cli.Context) error {
|
||||
defer log.Info("backup done")
|
||||
|
||||
ctx := cliCtx.Context
|
||||
dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name))
|
||||
toDirs := datadir.New(cliCtx.String(ToDatadirFlag.Name))
|
||||
|
||||
var targetPageSize datasize.ByteSize
|
||||
if cliCtx.IsSet(BackupToPageSizeFlag.Name) {
|
||||
targetPageSize = flags.DBPageSizeFlagUnmarshal(cliCtx, BackupToPageSizeFlag.Name, BackupToPageSizeFlag.Usage)
|
||||
}
|
||||
|
||||
var lables = []kv.Label{kv.ChainDB, kv.TxPoolDB, kv.DownloaderDB}
|
||||
if cliCtx.IsSet(BackupToPageSizeFlag.Name) {
|
||||
lables = lables[:0]
|
||||
for _, l := range utils.SplitAndTrim(cliCtx.String(BackupLabelsFlag.Name)) {
|
||||
lables = append(lables, kv.UnmarshalLabel(l))
|
||||
}
|
||||
}
|
||||
|
||||
var tables []string
|
||||
if cliCtx.IsSet(BackupTablesFlag.Name) {
|
||||
tables = utils.SplitAndTrim(cliCtx.String(BackupTablesFlag.Name))
|
||||
}
|
||||
|
||||
readAheadThreads := backup.ReadAheadThreads
|
||||
if cliCtx.IsSet(WarmupThreadsFlag.Name) {
|
||||
readAheadThreads = int(cliCtx.Uint64(WarmupThreadsFlag.Name))
|
||||
}
|
||||
|
||||
//kv.SentryDB no much reason to backup
|
||||
//TODO: add support of kv.ConsensusDB
|
||||
for _, label := range lables {
|
||||
var from, to string
|
||||
switch label {
|
||||
case kv.ChainDB:
|
||||
from, to = dirs.Chaindata, toDirs.Chaindata
|
||||
case kv.TxPoolDB:
|
||||
from, to = dirs.TxPool, toDirs.TxPool
|
||||
case kv.DownloaderDB:
|
||||
from, to = filepath.Join(dirs.Snap, "db"), filepath.Join(toDirs.Snap, "db")
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected: %+v", label))
|
||||
}
|
||||
|
||||
if !dir.Exist(from) {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(tables) == 0 { // if not partial backup - just drop target dir, to make backup more compact/fast (instead of clean tables)
|
||||
if err := os.RemoveAll(to); err != nil {
|
||||
return fmt.Errorf("mkdir: %w, %s", err, to)
|
||||
}
|
||||
}
|
||||
if err := os.MkdirAll(to, 0740); err != nil { //owner: rw, group: r, others: -
|
||||
return fmt.Errorf("mkdir: %w, %s", err, to)
|
||||
}
|
||||
log.Info("[backup] start", "label", label)
|
||||
fromDB, toDB := backup.OpenPair(from, to, label, targetPageSize)
|
||||
if err := backup.Kv2kv(ctx, fromDB, toDB, nil, readAheadThreads); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -38,7 +38,7 @@ var importCommand = cli.Command{
|
||||
&utils.DataDirFlag,
|
||||
&utils.ChainFlag,
|
||||
},
|
||||
Category: "BLOCKCHAIN COMMANDS",
|
||||
//Category: "BLOCKCHAIN COMMANDS",
|
||||
Description: `
|
||||
The import command imports blocks from an RLP-encoded form. The form can be one file
|
||||
with several RLP-encoded blocks, or several files can be used.
|
@ -22,7 +22,7 @@ var initCommand = cli.Command{
|
||||
Flags: []cli.Flag{
|
||||
&utils.DataDirFlag,
|
||||
},
|
||||
Category: "BLOCKCHAIN COMMANDS",
|
||||
//Category: "BLOCKCHAIN COMMANDS",
|
||||
Description: `
|
||||
The init command initializes a new genesis block and definition for the network.
|
||||
This is a destructive action and changes the network in which you will be
|
@ -2,9 +2,11 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/ledgerwatch/erigon-lib/common/datadir"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
"github.com/ledgerwatch/erigon/cmd/utils"
|
||||
@ -20,9 +22,24 @@ import (
|
||||
// Parameters:
|
||||
// * action: the main function for the application. receives `*cli.Context` with parsed command-line flags.
|
||||
// * cliFlags: the list of flags `cli.Flag` that the app should set and parse. By default, use `DefaultFlags()`. If you want to specify your own flag, use `append(DefaultFlags(), myFlag)` for this parameter.
|
||||
func MakeApp(action cli.ActionFunc, cliFlags []cli.Flag) *cli.App {
|
||||
app := cli2.NewApp(params.GitCommit, "erigon experimental cli")
|
||||
app.Action = action
|
||||
func MakeApp(name string, action cli.ActionFunc, cliFlags []cli.Flag) *cli.App {
|
||||
app := cli2.NewApp(params.GitCommit, "erigon")
|
||||
app.Name = name
|
||||
app.UsageText = app.Name + ` [command] [flags]`
|
||||
app.Action = func(context *cli.Context) error {
|
||||
// handle case: unknown sub-command
|
||||
if context.Args().Present() {
|
||||
var goodNames []string
|
||||
for _, c := range app.VisibleCommands() {
|
||||
goodNames = append(goodNames, c.Name)
|
||||
}
|
||||
log.Error(fmt.Sprintf("Command '%s' not found. Available commands: %s", context.Args().First(), goodNames))
|
||||
cli.ShowAppHelpAndExit(context, 1)
|
||||
}
|
||||
|
||||
// run default action
|
||||
return action(context)
|
||||
}
|
||||
app.Flags = append(cliFlags, debug.Flags...) // debug flags are required
|
||||
app.Before = func(ctx *cli.Context) error {
|
||||
return debug.Setup(ctx)
|
||||
@ -31,7 +48,13 @@ func MakeApp(action cli.ActionFunc, cliFlags []cli.Flag) *cli.App {
|
||||
debug.Exit()
|
||||
return nil
|
||||
}
|
||||
app.Commands = []*cli.Command{&initCommand, &importCommand, &snapshotCommand, &supportCommand}
|
||||
app.Commands = []*cli.Command{
|
||||
&initCommand,
|
||||
&importCommand,
|
||||
&snapshotCommand,
|
||||
&supportCommand,
|
||||
//&backupCommand,
|
||||
}
|
||||
return app
|
||||
}
|
||||
|
||||
|
@ -45,8 +45,8 @@ func joinFlags(lists ...[]cli.Flag) (res []cli.Flag) {
|
||||
}
|
||||
|
||||
var snapshotCommand = cli.Command{
|
||||
Name: "snapshots",
|
||||
Description: `Managing snapshots (historical data partitions)`,
|
||||
Name: "snapshots",
|
||||
Usage: `Managing snapshots (historical data partitions)`,
|
||||
Subcommands: []*cli.Command{
|
||||
{
|
||||
Name: "index",
|
@ -44,7 +44,7 @@ var supportCommand = cli.Command{
|
||||
&diagnosticsURLFlag,
|
||||
&insecureFlag,
|
||||
},
|
||||
Category: "SUPPORT COMMANDS",
|
||||
//Category: "SUPPORT COMMANDS",
|
||||
Description: `
|
||||
The support command connects a running Erigon instances to a diagnostics system specified
|
||||
by the URL.`,
|
279
turbo/backup/backup.go
Normal file
279
turbo/backup/backup.go
Normal file
@ -0,0 +1,279 @@
|
||||
package backup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/c2h5oh/datasize"
|
||||
common2 "github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon-lib/common/dbg"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
mdbx2 "github.com/ledgerwatch/erigon-lib/kv/mdbx"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/torquem-ch/mdbx-go/mdbx"
|
||||
"golang.org/x/exp/maps"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func OpenPair(from, to string, label kv.Label, targetPageSize datasize.ByteSize) (kv.RoDB, kv.RwDB) {
|
||||
src := mdbx2.NewMDBX(log.New()).Path(from).
|
||||
Label(label).
|
||||
WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(label) }).
|
||||
Flags(func(flags uint) uint { return mdbx.Readonly | mdbx.Accede }).MustOpen()
|
||||
if targetPageSize <= 0 {
|
||||
targetPageSize = datasize.ByteSize(src.PageSize())
|
||||
}
|
||||
info, err := src.(*mdbx2.MdbxKV).Env().Info(nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dst := mdbx2.NewMDBX(log.New()).Path(to).
|
||||
Label(label).
|
||||
PageSize(targetPageSize.Bytes()).
|
||||
MapSize(datasize.ByteSize(info.Geo.Upper)).
|
||||
WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(label) }).
|
||||
MustOpen()
|
||||
return src, dst
|
||||
}
|
||||
func MdbxToMdbx(ctx context.Context, from, to string, label kv.Label, tables []string, targetPageSize datasize.ByteSize, readAheadThreads int) error {
|
||||
src := mdbx2.NewMDBX(log.New()).Path(from).
|
||||
Label(label).
|
||||
WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(label) }).
|
||||
Flags(func(flags uint) uint { return mdbx.Readonly | mdbx.Accede }).MustOpen()
|
||||
if targetPageSize <= 0 {
|
||||
targetPageSize = datasize.ByteSize(src.PageSize())
|
||||
}
|
||||
info, err := src.(*mdbx2.MdbxKV).Env().Info(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dst := mdbx2.NewMDBX(log.New()).Path(to).
|
||||
Label(label).
|
||||
PageSize(targetPageSize.Bytes()).
|
||||
MapSize(datasize.ByteSize(info.Geo.Upper)).
|
||||
WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(label) }).
|
||||
MustOpen()
|
||||
return Kv2kv(ctx, src, dst, tables, readAheadThreads)
|
||||
}
|
||||
|
||||
func Kv2kv(ctx context.Context, src kv.RoDB, dst kv.RwDB, tables []string, readAheadThreads int) error {
|
||||
srcTx, err1 := src.BeginRo(ctx)
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
defer srcTx.Rollback()
|
||||
|
||||
commitEvery := time.NewTicker(5 * time.Minute)
|
||||
defer commitEvery.Stop()
|
||||
logEvery := time.NewTicker(20 * time.Second)
|
||||
defer logEvery.Stop()
|
||||
|
||||
tablesMap := src.AllTables()
|
||||
if len(tables) > 0 {
|
||||
tablesMapCopy := maps.Clone(tablesMap)
|
||||
tablesMap = kv.TableCfg{}
|
||||
for _, name := range tables {
|
||||
tablesMap[name] = tablesMapCopy[name]
|
||||
}
|
||||
}
|
||||
|
||||
for name, b := range tablesMap {
|
||||
if b.IsDeprecated {
|
||||
continue
|
||||
}
|
||||
if err := backupTable(ctx, src, srcTx, dst, name, readAheadThreads, logEvery); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
log.Info("done")
|
||||
return nil
|
||||
}
|
||||
|
||||
func backupTable(ctx context.Context, src kv.RoDB, srcTx kv.Tx, dst kv.RwDB, table string, readAheadThreads int, logEvery *time.Ticker) error {
|
||||
var total uint64
|
||||
wg := sync.WaitGroup{}
|
||||
defer wg.Wait()
|
||||
warmupCtx, warmupCancel := context.WithCancel(ctx)
|
||||
defer warmupCancel()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
WarmupTable(warmupCtx, src, table, log.LvlTrace, readAheadThreads)
|
||||
}()
|
||||
srcC, err := srcTx.Cursor(table)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
total, _ = srcC.Count()
|
||||
|
||||
dstTx, err1 := dst.BeginRw(ctx)
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
defer dstTx.Rollback()
|
||||
_ = dstTx.ClearBucket(table)
|
||||
|
||||
c, err := dstTx.RwCursor(table)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
casted, isDupsort := c.(kv.RwCursorDupSort)
|
||||
i := uint64(0)
|
||||
|
||||
for k, v, err := srcC.First(); k != nil; k, v, err = srcC.Next() {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if isDupsort {
|
||||
if err = casted.AppendDup(k, v); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
} else {
|
||||
if err = c.Append(k, v); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
i++
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-logEvery.C:
|
||||
var m runtime.MemStats
|
||||
dbg.ReadMemStats(&m)
|
||||
log.Info("Progress", "table", table, "progress", fmt.Sprintf("%.1fm/%.1fm", float64(i)/1_000_000, float64(total)/1_000_000), "key", hex.EncodeToString(k),
|
||||
"alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
|
||||
default:
|
||||
}
|
||||
}
|
||||
// migrate bucket sequences to native mdbx implementation
|
||||
//currentID, err := srcTx.Sequence(name, 0)
|
||||
//if err != nil {
|
||||
// return err
|
||||
//}
|
||||
//_, err = dstTx.Sequence(name, currentID)
|
||||
//if err != nil {
|
||||
// return err
|
||||
//}
|
||||
if err2 := dstTx.Commit(); err2 != nil {
|
||||
return err2
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
const ReadAheadThreads = 128
|
||||
|
||||
func WarmupTable(ctx context.Context, db kv.RoDB, bucket string, lvl log.Lvl, readAheadThreads int) {
|
||||
var ThreadsLimit = readAheadThreads
|
||||
var total uint64
|
||||
db.View(ctx, func(tx kv.Tx) error {
|
||||
c, _ := tx.Cursor(bucket)
|
||||
total, _ = c.Count()
|
||||
return nil
|
||||
})
|
||||
if total < 10_000 {
|
||||
return
|
||||
}
|
||||
progress := atomic.Int64{}
|
||||
|
||||
logEvery := time.NewTicker(20 * time.Second)
|
||||
defer logEvery.Stop()
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
g.SetLimit(ThreadsLimit)
|
||||
for i := 0; i < 256; i++ {
|
||||
for j := 0; j < 256; j++ {
|
||||
i := i
|
||||
j := j
|
||||
g.Go(func() error {
|
||||
return db.View(ctx, func(tx kv.Tx) error {
|
||||
it, err := tx.Prefix(bucket, []byte{byte(i), byte(j)})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for it.HasNext() {
|
||||
_, _, err = it.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
progress.Add(1)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-logEvery.C:
|
||||
log.Log(lvl, fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total)))
|
||||
default:
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
for i := 0; i < 1_000; i++ {
|
||||
i := i
|
||||
g.Go(func() error {
|
||||
return db.View(ctx, func(tx kv.Tx) error {
|
||||
seek := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(seek, uint64(i*100_000))
|
||||
it, err := tx.Prefix(bucket, seek)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for it.HasNext() {
|
||||
_, _, err = it.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-logEvery.C:
|
||||
log.Log(lvl, fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total)))
|
||||
default:
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
_ = g.Wait()
|
||||
}
|
||||
|
||||
func ClearTables(ctx context.Context, db kv.RoDB, tx kv.RwTx, tables ...string) error {
|
||||
for _, tbl := range tables {
|
||||
if err := ClearTable(ctx, db, tx, tbl); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func ClearTable(ctx context.Context, db kv.RoDB, tx kv.RwTx, table string) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
clean := warmup(ctx, db, table)
|
||||
defer func() {
|
||||
cancel()
|
||||
clean()
|
||||
}()
|
||||
log.Info("Clear", "table", table)
|
||||
return tx.ClearBucket(table)
|
||||
}
|
||||
|
||||
func warmup(ctx context.Context, db kv.RoDB, bucket string) func() {
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
WarmupTable(ctx, db, bucket, log.LvlInfo, ReadAheadThreads)
|
||||
}()
|
||||
return func() { wg.Wait() }
|
||||
}
|
Loading…
Reference in New Issue
Block a user