mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-03 09:37:38 +00:00
mdbx_to_mdbx better logging (#6455)
This commit is contained in:
parent
36fc6a3950
commit
fc71d441a6
@ -443,6 +443,8 @@ func kv2kv(ctx context.Context, src, dst kv.RwDB) error {
|
||||
|
||||
commitEvery := time.NewTicker(5 * time.Minute)
|
||||
defer commitEvery.Stop()
|
||||
logEvery := time.NewTicker(20 * time.Second)
|
||||
defer logEvery.Stop()
|
||||
|
||||
var total uint64
|
||||
for name, b := range src.AllBuckets() {
|
||||
@ -450,7 +452,7 @@ func kv2kv(ctx context.Context, src, dst kv.RwDB) error {
|
||||
continue
|
||||
}
|
||||
|
||||
rawdbreset.WarmupTable(ctx, src, name)
|
||||
rawdbreset.WarmupTable(ctx, src, name, log.LvlTrace)
|
||||
_ = dstTx.ClearBucket(name)
|
||||
c, err := dstTx.RwCursor(name)
|
||||
if err != nil {
|
||||
@ -484,8 +486,9 @@ func kv2kv(ctx context.Context, src, dst kv.RwDB) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-commitEvery.C:
|
||||
case <-logEvery.C:
|
||||
log.Info("Progress", "bucket", name, "progress", fmt.Sprintf("%.1fm/%.1fm", float64(i)/1_000_000, float64(total)/1_000_000), "key", hex.EncodeToString(k))
|
||||
case <-commitEvery.C:
|
||||
if err2 := dstTx.Commit(); err2 != nil {
|
||||
return err2
|
||||
}
|
||||
|
@ -738,7 +738,7 @@ func stageTrie(db kv.RwDB, ctx context.Context) error {
|
||||
_, agg := allSnapshots(db)
|
||||
|
||||
if warmup {
|
||||
return reset2.Warmup(ctx, db, stages.IntermediateHashes)
|
||||
return reset2.Warmup(ctx, db, log.LvlInfo, stages.IntermediateHashes)
|
||||
}
|
||||
if reset {
|
||||
return reset2.Reset(ctx, db, stages.IntermediateHashes)
|
||||
@ -792,7 +792,7 @@ func stageHashState(db kv.RwDB, ctx context.Context) error {
|
||||
_, agg := allSnapshots(db)
|
||||
|
||||
if warmup {
|
||||
return reset2.Warmup(ctx, db, stages.HashState)
|
||||
return reset2.Warmup(ctx, db, log.LvlInfo, stages.HashState)
|
||||
}
|
||||
if reset {
|
||||
return reset2.Reset(ctx, db, stages.HashState)
|
||||
@ -847,7 +847,7 @@ func stageLogIndex(db kv.RwDB, ctx context.Context) error {
|
||||
_, _, sync, _, _ := newSync(ctx, db, nil)
|
||||
must(sync.SetCurrentStage(stages.LogIndex))
|
||||
if warmup {
|
||||
return reset2.Warmup(ctx, db, stages.LogIndex)
|
||||
return reset2.Warmup(ctx, db, log.LvlInfo, stages.LogIndex)
|
||||
}
|
||||
if reset {
|
||||
return reset2.Reset(ctx, db, stages.LogIndex)
|
||||
@ -903,7 +903,7 @@ func stageCallTraces(db kv.RwDB, ctx context.Context) error {
|
||||
must(sync.SetCurrentStage(stages.CallTraces))
|
||||
|
||||
if warmup {
|
||||
return reset2.Warmup(ctx, db, stages.CallTraces)
|
||||
return reset2.Warmup(ctx, db, log.LvlInfo, stages.CallTraces)
|
||||
}
|
||||
if reset {
|
||||
return reset2.Reset(ctx, db, stages.CallTraces)
|
||||
@ -966,7 +966,7 @@ func stageHistory(db kv.RwDB, ctx context.Context) error {
|
||||
must(sync.SetCurrentStage(stages.AccountHistoryIndex))
|
||||
|
||||
if warmup {
|
||||
return reset2.Warmup(ctx, db, stages.AccountHistoryIndex, stages.StorageHistoryIndex)
|
||||
return reset2.Warmup(ctx, db, log.LvlInfo, stages.AccountHistoryIndex, stages.StorageHistoryIndex)
|
||||
}
|
||||
if reset {
|
||||
return reset2.Reset(ctx, db, stages.AccountHistoryIndex, stages.StorageHistoryIndex)
|
||||
|
@ -208,8 +208,8 @@ var Tables = map[stages.SyncStage][]string{
|
||||
stages.Finish: {},
|
||||
}
|
||||
|
||||
func WarmupTable(ctx context.Context, db kv.RoDB, bucket string) {
|
||||
const ThreadsLimit = 1024
|
||||
func WarmupTable(ctx context.Context, db kv.RoDB, bucket string, lvl log.Lvl) {
|
||||
const ThreadsLimit = 256
|
||||
var total uint64
|
||||
db.View(ctx, func(tx kv.Tx) error {
|
||||
c, _ := tx.Cursor(bucket)
|
||||
@ -240,7 +240,7 @@ func WarmupTable(ctx context.Context, db kv.RoDB, bucket string) {
|
||||
}
|
||||
select {
|
||||
case <-logEvery.C:
|
||||
log.Info(fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total)))
|
||||
log.Log(lvl, fmt.Sprintf("Progress: %s %.2f%%", bucket, 100*float64(progress.Load())/float64(total)))
|
||||
default:
|
||||
}
|
||||
}
|
||||
@ -251,10 +251,10 @@ func WarmupTable(ctx context.Context, db kv.RoDB, bucket string) {
|
||||
}
|
||||
_ = g.Wait()
|
||||
}
|
||||
func Warmup(ctx context.Context, db kv.RwDB, stList ...stages.SyncStage) error {
|
||||
func Warmup(ctx context.Context, db kv.RwDB, lvl log.Lvl, stList ...stages.SyncStage) error {
|
||||
for _, st := range stList {
|
||||
for _, tbl := range Tables[st] {
|
||||
WarmupTable(ctx, db, tbl)
|
||||
WarmupTable(ctx, db, tbl, lvl)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@ -279,7 +279,7 @@ func warmup(ctx context.Context, db kv.RoDB, bucket string) func() {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
WarmupTable(ctx, db, bucket)
|
||||
WarmupTable(ctx, db, bucket, log.LvlInfo)
|
||||
}()
|
||||
return func() { wg.Wait() }
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user