Pruning changesets and less noise in the logs (#1999)

* Pruning changesets and less noise in the logs

* Print effective storage mode

* Fix typo

* Fixes

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
This commit is contained in:
ledgerwatch 2021-05-23 19:07:15 +01:00 committed by GitHub
parent 7d98b6504a
commit 70e3a1f628
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 93 additions and 16 deletions

View File

@ -217,6 +217,7 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu
} else {
config.StorageMode = sm
}
log.Info("Effective", "storage mode", config.StorageMode)
if err = stagedsync.UpdateMetrics(chainDb); err != nil {
return nil, err

View File

@ -81,12 +81,14 @@ func BodiesForward(
if err != nil {
return err
}
if headerProgress-bodyProgress <= 16 {
logPrefix := s.LogPrefix()
if headerProgress <= bodyProgress+16 {
// When processing small number of blocks, we can afford wasting more bandwidth but get blocks quicker
timeout = 1
} else {
// Do not print logs for short periods
log.Info(fmt.Sprintf("[%s] Processing bodies...", logPrefix), "from", bodyProgress, "to", headerProgress)
}
logPrefix := s.LogPrefix()
log.Info(fmt.Sprintf("[%s] Processing bodies...", logPrefix), "from", bodyProgress, "to", headerProgress)
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
var prevDeliveredCount float64 = 0

View File

@ -184,6 +184,16 @@ func promoteCallTraces(logPrefix string, tx ethdb.RwTx, startBlock, endBlock uin
if endBlock-blockNum <= params.FullImmutabilityThreshold {
break
}
select {
default:
case <-logEvery.C:
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Info(fmt.Sprintf("[%s] Pruning call trace intermediate table", logPrefix), "number", blockNum,
"alloc", common.StorageSize(m.Alloc),
"sys", common.StorageSize(m.Sys),
"numGC", int(m.NumGC))
}
if err = traceCursor.DeleteCurrent(); err != nil {
return fmt.Errorf("%s: failed to remove trace call set for block %d: %v", logPrefix, blockNum, err)
}
@ -197,8 +207,8 @@ func promoteCallTraces(logPrefix string, tx ethdb.RwTx, startBlock, endBlock uin
if err != nil {
return fmt.Errorf("%s: failed to move cleanup cursor: %w", logPrefix, err)
}
if prunedMax != 0 {
log.Info(fmt.Sprintf("[%s] Pruned trace call index", logPrefix), "from", prunedMin, "to", prunedMax)
if prunedMax != 0 && prunedMax > prunedMin+16 {
log.Info(fmt.Sprintf("[%s] Pruned call trace intermediate table", logPrefix), "from", prunedMin, "to", prunedMax)
}
if err := finaliseCallTraces(collectorFrom, collectorTo, logPrefix, tx, quit); err != nil {
return fmt.Errorf("[%s] %w", logPrefix, err)

View File

@ -5,6 +5,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"math"
"runtime"
"sort"
"time"
@ -233,7 +234,9 @@ func SpawnExecuteBlocksStage(s *StageState, tx ethdb.RwTx, toBlock uint64, quit
return nil
}
logPrefix := s.state.LogPrefix()
log.Info(fmt.Sprintf("[%s] Blocks execution", logPrefix), "from", s.BlockNumber, "to", to)
if to > s.BlockNumber+16 {
log.Info(fmt.Sprintf("[%s] Blocks execution", logPrefix), "from", s.BlockNumber, "to", to)
}
var traceCursor ethdb.RwCursorDupSort
if cfg.writeCallTraces {
@ -346,6 +349,16 @@ func SpawnExecuteBlocksStage(s *StageState, tx ethdb.RwTx, toBlock uint64, quit
return fmt.Errorf("%s: failed to write batch commit: %v", logPrefix, err)
}
}
// Prune changesets if needed
if cfg.pruningDistance > 0 {
if err := pruneChangeSets(tx, logPrefix, "account changesets", dbutils.AccountChangeSetBucket, to, cfg.pruningDistance, logEvery.C); err != nil {
return err
}
if err := pruneChangeSets(tx, logPrefix, "storage changesets", dbutils.StorageChangeSetBucket, to, cfg.pruningDistance, logEvery.C); err != nil {
return err
}
}
if !useExternalTx {
if traceCursor != nil {
traceCursor.Close()
@ -360,6 +373,49 @@ func SpawnExecuteBlocksStage(s *StageState, tx ethdb.RwTx, toBlock uint64, quit
return stoppedErr
}
func pruneChangeSets(tx ethdb.RwTx, logPrefix string, name string, tableName string, endBlock uint64, pruningDistance uint64, logChannel <-chan time.Time) error {
changeSetCursor, err := tx.RwCursorDupSort(tableName)
if err != nil {
return fmt.Errorf("%s: failed to create cursor for pruning %s: %v", logPrefix, name, err)
}
var prunedMin uint64 = math.MaxUint64
var prunedMax uint64 = 0
var k []byte
for k, _, err = changeSetCursor.First(); k != nil && err == nil; k, _, err = changeSetCursor.Next() {
blockNum := binary.BigEndian.Uint64(k)
if endBlock-blockNum <= pruningDistance {
break
}
select {
default:
case <-logChannel:
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Info(fmt.Sprintf("[%s] Pruning", logPrefix), "table", tableName, "number", blockNum,
"alloc", common.StorageSize(m.Alloc),
"sys", common.StorageSize(m.Sys),
"numGC", int(m.NumGC))
}
if err = changeSetCursor.DeleteCurrent(); err != nil {
return fmt.Errorf("%s: failed to remove %s for block %d: %v", logPrefix, name, blockNum, err)
}
if blockNum < prunedMin {
prunedMin = blockNum
}
if blockNum > prunedMax {
prunedMax = blockNum
}
}
if err != nil {
return fmt.Errorf("%s: failed to move %s cleanup cursor: %w", logPrefix, tableName, err)
}
if prunedMax != 0 && prunedMax > prunedMin+16 {
log.Info(fmt.Sprintf("[%s] Pruned", logPrefix), "table", tableName, "from", prunedMin, "to", prunedMax)
}
return nil
}
func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, currentBlock uint64, batch ethdb.DbWithPendingMutations) (uint64, time.Time) {
currentTime := time.Now()
interval := currentTime.Sub(prevTime)

View File

@ -94,7 +94,7 @@ func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync, unwindTo uint6
return err
}
notifyFrom := finishStageBeforeSync + 1
if unwindTo < finishStageBeforeSync {
if unwindTo != 0 && unwindTo < finishStageBeforeSync {
notifyFrom = unwindTo + 1
}
if notifier == nil {

View File

@ -54,7 +54,9 @@ func SpawnHashStateStage(s *StageState, tx ethdb.RwTx, cfg HashStateCfg, quit <-
return fmt.Errorf("hashstate: promotion backwards from %d to %d", s.BlockNumber, to)
}
log.Info(fmt.Sprintf("[%s] Promoting plain state", logPrefix), "from", s.BlockNumber, "to", to)
if to > s.BlockNumber+16 {
log.Info(fmt.Sprintf("[%s] Promoting plain state", logPrefix), "from", s.BlockNumber, "to", to)
}
if s.BlockNumber == 0 { // Initial hashing of the state is performed at the previous stage
if err := PromoteHashedStateCleanly(logPrefix, tx, cfg, quit); err != nil {
return fmt.Errorf("[%s] %w", logPrefix, err)
@ -409,7 +411,9 @@ func (p *Promoter) Promote(logPrefix string, s *StageState, from, to uint64, sto
} else {
changeSetBucket = dbutils.AccountChangeSetBucket
}
log.Info(fmt.Sprintf("[%s] Incremental promotion started", logPrefix), "from", from, "to", to, "codes", codes, "csbucket", changeSetBucket)
if to > from+16 {
log.Info(fmt.Sprintf("[%s] Incremental promotion started", logPrefix), "from", from, "to", to, "codes", codes, "csbucket", changeSetBucket)
}
startkey := dbutils.EncodeBlockNumber(from + 1)

View File

@ -90,7 +90,7 @@ func HeadersForward(
return nil
}
log.Info(fmt.Sprintf("[%s] Processing headers...", logPrefix), "from", headerProgress)
log.Info(fmt.Sprintf("[%s] Waiting for headers...", logPrefix), "from", headerProgress)
batch := ethdb.NewBatch(tx)
defer batch.Rollback()
logEvery := time.NewTicker(logInterval)

View File

@ -70,7 +70,9 @@ func SpawnIntermediateHashesStage(s *StageState, u Unwinder, tx ethdb.RwTx, cfg
}
logPrefix := s.state.LogPrefix()
log.Info(fmt.Sprintf("[%s] Generating intermediate hashes", logPrefix), "from", s.BlockNumber, "to", to)
if to > s.BlockNumber+16 {
log.Info(fmt.Sprintf("[%s] Generating intermediate hashes", logPrefix), "from", s.BlockNumber, "to", to)
}
var root common.Hash
if s.BlockNumber == 0 {
if root, err = RegenerateIntermediateHashes(logPrefix, tx, cfg, expectedRootHash, quit); err != nil {

View File

@ -73,7 +73,9 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, tx ethdb.RwTx, toBl
return nil
}
logPrefix := s.state.LogPrefix()
log.Info(fmt.Sprintf("[%s] Started", logPrefix), "from", s.BlockNumber, "to", to)
if to > s.BlockNumber+16 {
log.Info(fmt.Sprintf("[%s] Started", logPrefix), "from", s.BlockNumber, "to", to)
}
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
@ -108,7 +110,7 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, tx ethdb.RwTx, toBl
log.Info(fmt.Sprintf("[%s] Preload headedrs", logPrefix), "block_number", binary.BigEndian.Uint64(k))
}
}
log.Info(fmt.Sprintf("[%s] Read canonical hashes", logPrefix), "amount", len(canonical))
log.Debug(fmt.Sprintf("[%s] Read canonical hashes", logPrefix), "amount", len(canonical))
jobs := make(chan *senderRecoveryJob, cfg.batchSize)
out := make(chan *senderRecoveryJob, cfg.batchSize)

View File

@ -116,7 +116,7 @@ func incrementalTxPoolUpdate(logPrefix string, from, to uint64, pool *core.TxPoo
currentHeaderIdx++
}
log.Info(fmt.Sprintf("[%s] Read canonical hashes", logPrefix), "hashes", len(canonical))
log.Debug(fmt.Sprintf("[%s] Read canonical hashes", logPrefix), "hashes", len(canonical))
bodies, err := tx.Cursor(dbutils.BlockBodyPrefix)
if err != nil {
return err
@ -212,7 +212,7 @@ func unwindTxPoolUpdate(logPrefix string, from, to uint64, pool *core.TxPool, tx
copy(canonical[blockNumber-from-1][:], v)
}
log.Info(fmt.Sprintf("[%s] Read canonical hashes", logPrefix), "hashes", len(canonical))
log.Debug(fmt.Sprintf("[%s] Read canonical hashes", logPrefix), "hashes", len(canonical))
senders := make([][]common.Address, to-from+1)
sendersC, err := tx.Cursor(dbutils.Senders)
if err != nil {

View File

@ -199,7 +199,7 @@ func (s *State) Run(db ethdb.GetterPutter, tx ethdb.RwTx) error {
logPrefix, stage.DisabledDescription,
)
log.Info(message)
log.Debug(message)
s.NextStage()
continue