diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 517706255..0d7151358 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -6,7 +6,6 @@ import ( "os" "runtime" "runtime/pprof" - "sync/atomic" "time" "github.com/ledgerwatch/turbo-geth/common" @@ -26,57 +25,6 @@ const ( logInterval = 30 // seconds ) -type progressLogger struct { - timer *time.Ticker - quit chan struct{} - interval int - batch ethdb.DbWithPendingMutations -} - -func NewProgressLogger(intervalInSeconds int, batch ethdb.DbWithPendingMutations) *progressLogger { - return &progressLogger{ - timer: time.NewTicker(time.Duration(intervalInSeconds) * time.Second), - quit: make(chan struct{}), - interval: intervalInSeconds, - batch: batch, - } -} - -func (l *progressLogger) Start(numberRef *uint64) { - go func() { - prev := atomic.LoadUint64(numberRef) - printFunc := func() { - now := atomic.LoadUint64(numberRef) - speed := float64(now-prev) / float64(l.interval) - var m runtime.MemStats - runtime.ReadMemStats(&m) - log.Info("Executed blocks:", - "currentBlock", now, - "speed (blk/second)", speed, - "state batch", common.StorageSize(l.batch.BatchSize()), - "alloc", common.StorageSize(m.Alloc), - "sys", common.StorageSize(m.Sys), - "numGC", int(m.NumGC)) - - prev = now - } - for { - select { - case <-l.timer.C: - printFunc() - case <-l.quit: - printFunc() - return - } - } - }() -} - -func (l *progressLogger) Stop() { - l.timer.Stop() - close(l.quit) -} - type HasChangeSetWriter interface { ChangeSetWriter() *state.ChangeSetWriter } @@ -110,23 +58,20 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig } } - stageProgress := s.BlockNumber - batch := stateDB.NewBatch() - progressLogger := NewProgressLogger(logInterval, batch) - progressLogger.Start(&stageProgress) - defer progressLogger.Stop() - engine := blockchain.Engine() vmConfig := blockchain.GetVMConfig() - for blockNum := atomic.LoadUint64(&stageProgress) + 1; blockNum <= to; blockNum++ { + stageProgress := s.BlockNumber + logTime, logBlock := time.Now(), stageProgress + + for blockNum := stageProgress + 1; blockNum <= to; blockNum++ { if err := common.Stopped(quit); err != nil { return err } - atomic.StoreUint64(&stageProgress, blockNum) + stageProgress = blockNum blockHash := rawdb.ReadCanonicalHash(stateDB, blockNum) block := rawdb.ReadBlock(stateDB, blockHash, blockNum) @@ -189,19 +134,40 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig changeSetHook(blockNum, hasChangeSet.ChangeSetWriter()) } } + + logTime, logBlock = logProgress(logTime, logBlock, blockNum, batch) } - if err := s.Update(batch, atomic.LoadUint64(&stageProgress)); err != nil { + if err := s.Update(batch, stageProgress); err != nil { return err } if _, err := batch.Commit(); err != nil { return fmt.Errorf("sync Execute: failed to write batch commit: %v", err) } - log.Info("Completed on", "block", atomic.LoadUint64(&stageProgress)) + log.Info("Completed on", "block", stageProgress) s.Done() return nil } +func logProgress(lastLogTime time.Time, prev, now uint64, batch ethdb.DbWithPendingMutations) (time.Time, uint64) { + if now%64 != 0 || time.Since(lastLogTime).Seconds() < logInterval { + return lastLogTime, prev // return old values because no logging happened + } + + speed := float64(now-prev) / float64(logInterval) + var m runtime.MemStats + runtime.ReadMemStats(&m) + log.Info("Executed blocks:", + "currentBlock", now, + "speed (blk/second)", speed, + "state batch", common.StorageSize(batch.BatchSize()), + "alloc", common.StorageSize(m.Alloc), + "sys", common.StorageSize(m.Sys), + "numGC", int(m.NumGC)) + + return time.Now(), now +} + func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database) error { if u.UnwindPoint >= s.BlockNumber { s.Done()