remove atomic from stage4 (#751)

This commit is contained in:
Alex Sharov 2020-07-17 06:59:03 +07:00 committed by GitHub
parent 5ead20e288
commit 086435117f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -6,7 +6,6 @@ import (
"os"
"runtime"
"runtime/pprof"
"sync/atomic"
"time"
"github.com/ledgerwatch/turbo-geth/common"
@ -26,57 +25,6 @@ const (
logInterval = 30 // seconds
)
type progressLogger struct {
timer *time.Ticker
quit chan struct{}
interval int
batch ethdb.DbWithPendingMutations
}
func NewProgressLogger(intervalInSeconds int, batch ethdb.DbWithPendingMutations) *progressLogger {
return &progressLogger{
timer: time.NewTicker(time.Duration(intervalInSeconds) * time.Second),
quit: make(chan struct{}),
interval: intervalInSeconds,
batch: batch,
}
}
func (l *progressLogger) Start(numberRef *uint64) {
go func() {
prev := atomic.LoadUint64(numberRef)
printFunc := func() {
now := atomic.LoadUint64(numberRef)
speed := float64(now-prev) / float64(l.interval)
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Info("Executed blocks:",
"currentBlock", now,
"speed (blk/second)", speed,
"state batch", common.StorageSize(l.batch.BatchSize()),
"alloc", common.StorageSize(m.Alloc),
"sys", common.StorageSize(m.Sys),
"numGC", int(m.NumGC))
prev = now
}
for {
select {
case <-l.timer.C:
printFunc()
case <-l.quit:
printFunc()
return
}
}
}()
}
func (l *progressLogger) Stop() {
l.timer.Stop()
close(l.quit)
}
type HasChangeSetWriter interface {
ChangeSetWriter() *state.ChangeSetWriter
}
@ -110,23 +58,20 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
}
}
stageProgress := s.BlockNumber
batch := stateDB.NewBatch()
progressLogger := NewProgressLogger(logInterval, batch)
progressLogger.Start(&stageProgress)
defer progressLogger.Stop()
engine := blockchain.Engine()
vmConfig := blockchain.GetVMConfig()
for blockNum := atomic.LoadUint64(&stageProgress) + 1; blockNum <= to; blockNum++ {
stageProgress := s.BlockNumber
logTime, logBlock := time.Now(), stageProgress
for blockNum := stageProgress + 1; blockNum <= to; blockNum++ {
if err := common.Stopped(quit); err != nil {
return err
}
atomic.StoreUint64(&stageProgress, blockNum)
stageProgress = blockNum
blockHash := rawdb.ReadCanonicalHash(stateDB, blockNum)
block := rawdb.ReadBlock(stateDB, blockHash, blockNum)
@ -189,19 +134,40 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
changeSetHook(blockNum, hasChangeSet.ChangeSetWriter())
}
}
logTime, logBlock = logProgress(logTime, logBlock, blockNum, batch)
}
if err := s.Update(batch, atomic.LoadUint64(&stageProgress)); err != nil {
if err := s.Update(batch, stageProgress); err != nil {
return err
}
if _, err := batch.Commit(); err != nil {
return fmt.Errorf("sync Execute: failed to write batch commit: %v", err)
}
log.Info("Completed on", "block", atomic.LoadUint64(&stageProgress))
log.Info("Completed on", "block", stageProgress)
s.Done()
return nil
}
func logProgress(lastLogTime time.Time, prev, now uint64, batch ethdb.DbWithPendingMutations) (time.Time, uint64) {
if now%64 != 0 || time.Since(lastLogTime).Seconds() < logInterval {
return lastLogTime, prev // return old values because no logging happened
}
speed := float64(now-prev) / float64(logInterval)
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Info("Executed blocks:",
"currentBlock", now,
"speed (blk/second)", speed,
"state batch", common.StorageSize(batch.BatchSize()),
"alloc", common.StorageSize(m.Alloc),
"sys", common.StorageSize(m.Sys),
"numGC", int(m.NumGC))
return time.Now(), now
}
func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database) error {
if u.UnwindPoint >= s.BlockNumber {
s.Done()