From fc71d441a68bf26422df1783054a826a197f1785 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Wed, 28 Dec 2022 13:27:42 +0700 Subject: [PATCH] mdbx_to_mdbx better logging (#6455) --- cmd/integration/commands/refetence_db.go | 7 +++++-- cmd/integration/commands/stages.go | 10 +++++----- core/rawdb/rawdbreset/reset_stages.go | 12 ++++++------ 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/cmd/integration/commands/refetence_db.go b/cmd/integration/commands/refetence_db.go index 64ef4b9e8..b794bf5d6 100644 --- a/cmd/integration/commands/refetence_db.go +++ b/cmd/integration/commands/refetence_db.go @@ -443,6 +443,8 @@ func kv2kv(ctx context.Context, src, dst kv.RwDB) error { commitEvery := time.NewTicker(5 * time.Minute) defer commitEvery.Stop() + logEvery := time.NewTicker(20 * time.Second) + defer logEvery.Stop() var total uint64 for name, b := range src.AllBuckets() { @@ -450,7 +452,7 @@ func kv2kv(ctx context.Context, src, dst kv.RwDB) error { continue } - rawdbreset.WarmupTable(ctx, src, name) + rawdbreset.WarmupTable(ctx, src, name, log.LvlTrace) _ = dstTx.ClearBucket(name) c, err := dstTx.RwCursor(name) if err != nil { @@ -484,8 +486,9 @@ func kv2kv(ctx context.Context, src, dst kv.RwDB) error { select { case <-ctx.Done(): return ctx.Err() - case <-commitEvery.C: + case <-logEvery.C: log.Info("Progress", "bucket", name, "progress", fmt.Sprintf("%.1fm/%.1fm", float64(i)/1_000_000, float64(total)/1_000_000), "key", hex.EncodeToString(k)) + case <-commitEvery.C: if err2 := dstTx.Commit(); err2 != nil { return err2 } diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index b0b805715..6e680a76d 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -738,7 +738,7 @@ func stageTrie(db kv.RwDB, ctx context.Context) error { _, agg := allSnapshots(db) if warmup { - return reset2.Warmup(ctx, db, stages.IntermediateHashes) + return reset2.Warmup(ctx, db, log.LvlInfo, stages.IntermediateHashes) } if reset { return reset2.Reset(ctx, db, stages.IntermediateHashes) @@ -792,7 +792,7 @@ func stageHashState(db kv.RwDB, ctx context.Context) error { _, agg := allSnapshots(db) if warmup { - return reset2.Warmup(ctx, db, stages.HashState) + return reset2.Warmup(ctx, db, log.LvlInfo, stages.HashState) } if reset { return reset2.Reset(ctx, db, stages.HashState) @@ -847,7 +847,7 @@ func stageLogIndex(db kv.RwDB, ctx context.Context) error { _, _, sync, _, _ := newSync(ctx, db, nil) must(sync.SetCurrentStage(stages.LogIndex)) if warmup { - return reset2.Warmup(ctx, db, stages.LogIndex) + return reset2.Warmup(ctx, db, log.LvlInfo, stages.LogIndex) } if reset { return reset2.Reset(ctx, db, stages.LogIndex) @@ -903,7 +903,7 @@ func stageCallTraces(db kv.RwDB, ctx context.Context) error { must(sync.SetCurrentStage(stages.CallTraces)) if warmup { - return reset2.Warmup(ctx, db, stages.CallTraces) + return reset2.Warmup(ctx, db, log.LvlInfo, stages.CallTraces) } if reset { return reset2.Reset(ctx, db, stages.CallTraces) @@ -966,7 +966,7 @@ func stageHistory(db kv.RwDB, ctx context.Context) error { must(sync.SetCurrentStage(stages.AccountHistoryIndex)) if warmup { - return reset2.Warmup(ctx, db, stages.AccountHistoryIndex, stages.StorageHistoryIndex) + return reset2.Warmup(ctx, db, log.LvlInfo, stages.AccountHistoryIndex, stages.StorageHistoryIndex) } if reset { return reset2.Reset(ctx, db, stages.AccountHistoryIndex, stages.StorageHistoryIndex) diff --git a/core/rawdb/rawdbreset/reset_stages.go b/core/rawdb/rawdbreset/reset_stages.go index f5c7ef47e..61f67f1c6 100644 --- a/core/rawdb/rawdbreset/reset_stages.go +++ b/core/rawdb/rawdbreset/reset_stages.go @@ -208,8 +208,8 @@ var Tables = map[stages.SyncStage][]string{ stages.Finish: {}, } -func WarmupTable(ctx context.Context, db kv.RoDB, bucket string) { - const ThreadsLimit = 1024 +func WarmupTable(ctx context.Context, db kv.RoDB, bucket string, lvl log.Lvl) { + const ThreadsLimit = 256 var total uint64 db.View(ctx, func(tx kv.Tx) error { c, _ := tx.Cursor(bucket) @@ -240,7 +240,7 @@ func WarmupTable(ctx context.Context, db kv.RoDB, bucket string) { } select { case <-logEvery.C: - log.Info(fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total))) + log.Log(lvl, fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total))) default: } } @@ -251,10 +251,10 @@ func WarmupTable(ctx context.Context, db kv.RoDB, bucket string) { } _ = g.Wait() } -func Warmup(ctx context.Context, db kv.RwDB, stList ...stages.SyncStage) error { +func Warmup(ctx context.Context, db kv.RwDB, lvl log.Lvl, stList ...stages.SyncStage) error { for _, st := range stList { for _, tbl := range Tables[st] { - WarmupTable(ctx, db, tbl) + WarmupTable(ctx, db, tbl, lvl) } } return nil @@ -279,7 +279,7 @@ func warmup(ctx context.Context, db kv.RoDB, bucket string) func() { wg.Add(1) go func() { defer wg.Done() - WarmupTable(ctx, db, bucket) + WarmupTable(ctx, db, bucket, log.LvlInfo) }() return func() { wg.Wait() } }