Set highest and highestHash correctly (#2002)

* Set highest and highestHash correctly

* Remove batch from headers_new

* Fix for initial cycle

* Better naming

* Don't ruin progress on interruption

* Only print Processed message when not interrupted

Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
ledgerwatch 2021-05-24 15:35:56 +01:00 committed by GitHub
parent 244ae7adb3
commit 7e932a420b
3 changed files with 28 additions and 59 deletions
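
The diffs below are easier to follow with the new exit rule of the HeadersForward loop in mind. The following is a minimal, self-contained Go sketch of just that rule; headerInserterStub and shouldBreak are names invented for the sketch (they are not part of the change), and the downloader, database and logging are stubbed out.

package main

import "fmt"

// headerInserterStub stands in for the real HeaderInserter; only the
// newCanonical flag behind BestHeaderChanged matters for this sketch.
type headerInserterStub struct{ newCanonical bool }

func (hi headerInserterStub) BestHeaderChanged() bool { return hi.newCanonical }

// shouldBreak mirrors the exit rule added to the HeadersForward loop: never
// break unless the best header changed; outside the initial cycle break
// immediately, during the initial cycle only once all known headers have
// been inserted (inSync).
func shouldBreak(hi headerInserterStub, initialCycle, inSync bool) bool {
    if !hi.BestHeaderChanged() {
        return false
    }
    if !initialCycle {
        return true
    }
    return inSync
}

func main() {
    hi := headerInserterStub{newCanonical: true}
    fmt.Println(shouldBreak(hi, true, false))  // initial cycle, not yet in sync -> false
    fmt.Println(shouldBreak(hi, true, true))   // initial cycle, in sync -> true
    fmt.Println(shouldBreak(hi, false, false)) // later cycles -> true
}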


@@ -91,8 +91,6 @@ func HeadersForward(
}
log.Info(fmt.Sprintf("[%s] Waiting for headers...", logPrefix), "from", headerProgress)
batch := ethdb.NewBatch(tx)
defer batch.Rollback()
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
@@ -101,7 +99,7 @@ func HeadersForward(
return err
}
headerInserter := headerdownload.NewHeaderInserter(logPrefix, localTd, headerProgress)
cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, batch: batch})
cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, tx: tx})
var peer []byte
stopped := false
@@ -142,38 +140,22 @@ func HeadersForward(
}
// Load headers into the database
var inSync bool
if inSync, err = cfg.hd.InsertHeaders(headerInserter.FeedHeaderFunc(batch), logPrefix, logEvery.C); err != nil {
if inSync, err = cfg.hd.InsertHeaders(headerInserter.FeedHeaderFunc(tx), logPrefix, logEvery.C); err != nil {
return err
}
if batch.BatchSize() >= int(cfg.batchSize) {
if err = batch.Commit(); err != nil {
return err
}
if !useExternalTx {
if err = s.Update(tx, headerInserter.GetHighest()); err != nil {
return err
}
if err = tx.Commit(); err != nil {
return err
}
tx, err = cfg.db.BeginRw(ctx)
if err != nil {
return err
}
}
batch = ethdb.NewBatch(tx)
cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, batch: batch})
}
announces := cfg.hd.GrabAnnounces()
if len(announces) > 0 {
cfg.announceNewHashes(ctx, announces)
}
if !initialCycle && headerInserter.AnythingDone() {
// if this is not an initial cycle, we need to react quickly when new headers are coming in
break
}
if initialCycle && inSync {
break
if headerInserter.BestHeaderChanged() { // We do not break unless the best header changed
if !initialCycle {
// if this is not an initial cycle, we need to react quickly when new headers are coming in
break
}
// if this is the initial cycle, we want to make sure we insert all known headers (inSync)
if inSync {
break
}
}
timer := time.NewTimer(1 * time.Second)
select {
@@ -181,7 +163,7 @@ func HeadersForward(
stopped = true
case <-logEvery.C:
progress := cfg.hd.Progress()
logProgressHeaders(logPrefix, prevProgress, progress, batch)
logProgressHeaders(logPrefix, prevProgress, progress)
prevProgress = progress
case <-timer.C:
log.Trace("RequestQueueTime (header) ticked")
@@ -190,36 +172,29 @@ func HeadersForward(
}
timer.Stop()
}
if headerInserter.AnythingDone() {
if err := s.Update(batch, headerInserter.GetHighest()); err != nil {
return err
}
}
if headerInserter.UnwindPoint() < headerProgress {
if err := u.UnwindTo(headerInserter.UnwindPoint(), batch); err != nil {
if err := u.UnwindTo(headerInserter.UnwindPoint(), tx); err != nil {
return fmt.Errorf("%s: failed to unwind to %d: %w", logPrefix, headerInserter.UnwindPoint(), err)
}
} else {
if err := fixCanonicalChain(logPrefix, headerInserter.GetHighest(), headerInserter.GetHighestHash(), batch); err != nil {
if err := fixCanonicalChain(logPrefix, headerInserter.GetHighest(), headerInserter.GetHighestHash(), tx); err != nil {
return fmt.Errorf("%s: failed to fix canonical chain: %w", logPrefix, err)
}
if !stopped {
s.Done()
}
}
if err := batch.Commit(); err != nil {
return fmt.Errorf("%s: failed to write batch commit: %v", logPrefix, err)
}
if !useExternalTx {
if err := tx.Commit(); err != nil {
return err
}
}
log.Info(fmt.Sprintf("[%s] Processed", logPrefix), "highest", headerInserter.GetHighest(), "age", common.PrettyAge(time.Unix(int64(headerInserter.GetHighestTimestamp()), 0)))
if stopped {
return common.ErrStopped
}
stageHeadersGauge.Update(int64(headerInserter.GetHighest()))
// We do not print the following line if the stage was interrupted
log.Info(fmt.Sprintf("[%s] Processed", logPrefix), "highest inserted", headerInserter.GetHighest(), "age", common.PrettyAge(time.Unix(int64(headerInserter.GetHighestTimestamp()), 0)))
stageHeadersGauge.Update(int64(cfg.hd.Progress()))
return nil
}
@@ -282,14 +257,13 @@ func HeadersUnwind(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HeadersCfg)
return nil
}
func logProgressHeaders(logPrefix string, prev, now uint64, batch ethdb.DbWithPendingMutations) uint64 {
func logProgressHeaders(logPrefix string, prev, now uint64) uint64 {
speed := float64(now-prev) / float64(logInterval/time.Second)
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Info(fmt.Sprintf("[%s] Wrote block headers", logPrefix),
"number", now,
"blk/second", speed,
"batch", common.StorageSize(batch.BatchSize()),
"alloc", common.StorageSize(m.Alloc),
"sys", common.StorageSize(m.Sys),
"numGC", int(m.NumGC))
@@ -299,15 +273,15 @@ func logProgressHeaders(logPrefix string, prev, now uint64, batch ethdb.DbWithPendingMutations) uint64 {
type chainReader struct {
config *params.ChainConfig
batch ethdb.DbWithPendingMutations
tx ethdb.RwTx
}
func (cr chainReader) Config() *params.ChainConfig { return cr.config }
func (cr chainReader) CurrentHeader() *types.Header { panic("") }
func (cr chainReader) GetHeader(hash common.Hash, number uint64) *types.Header {
return rawdb.ReadHeader(cr.batch, hash, number)
return rawdb.ReadHeader(cr.tx, hash, number)
}
func (cr chainReader) GetHeaderByNumber(number uint64) *types.Header {
return rawdb.ReadHeaderByNumber(cr.batch, number)
return rawdb.ReadHeaderByNumber(cr.tx, number)
}
func (cr chainReader) GetHeaderByHash(hash common.Hash) *types.Header { panic("") }
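
For context, here is a hedged sketch of how the stage finishes after the batch removal above; rwTx, noopTx, errStopped and finishHeadersStage are stand-ins invented for the sketch (the real code uses ethdb.RwTx, common.ErrStopped, s.Update and log.Info), and the gauge update is omitted.

package main

import (
    "errors"
    "fmt"
)

var errStopped = errors.New("stopped") // stand-in for common.ErrStopped

// rwTx is a minimal stand-in for ethdb.RwTx; only Commit is needed here.
type rwTx interface{ Commit() error }

type noopTx struct{}

func (noopTx) Commit() error { return nil }

// finishHeadersStage sketches the tail of HeadersForward after this change:
// everything was written straight to tx (no intermediate batch), the
// transaction is committed even when the stage was interrupted so progress
// is not lost, and the "Processed" line is only printed for an
// uninterrupted run.
func finishHeadersStage(tx rwTx, useExternalTx, stopped bool, highest uint64) error {
    if !useExternalTx {
        if err := tx.Commit(); err != nil {
            return err
        }
    }
    if stopped {
        return errStopped
    }
    fmt.Printf("Processed, highest inserted %d\n", highest)
    return nil
}

func main() {
    fmt.Println(finishHeadersStage(noopTx{}, false, true, 0)) // interrupted: commits, then reports stopped
    fmt.Println(finishHeadersStage(noopTx{}, false, false, 12345))
}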


@@ -735,10 +735,12 @@ func (hi *HeaderInserter) FeedHeader(db ethdb.StatelessRwTx, header *types.Header
if err = rawdb.WriteHeadHeaderHash(db, hash); err != nil {
return fmt.Errorf("[%s] marking head header hash as %x: %w", hi.logPrefix, hash, err)
}
hi.headerProgress = blockHeight
if err = stages.SaveStageProgress(db, stages.Headers, blockHeight); err != nil {
return fmt.Errorf("[%s] saving Headers progress: %w", hi.logPrefix, err)
}
hi.highest = blockHeight
hi.highestHash = hash
hi.highestTimestamp = header.Time
// See if the forking point affects the unwindPoint (the block number to which other stages will need to unwind before the new canonical chain is applied)
if forkingPoint < hi.unwindPoint {
hi.unwindPoint = forkingPoint
@@ -757,11 +759,6 @@ func (hi *HeaderInserter) FeedHeader(db ethdb.StatelessRwTx, header *types.Header
return fmt.Errorf("[%s] failed to store header: %w", hi.logPrefix, err)
}
hi.prevHash = hash
if blockHeight > hi.highest {
hi.highest = blockHeight
hi.highestHash = hash
hi.highestTimestamp = header.Time
}
return nil
}
@@ -781,7 +778,7 @@ func (hi *HeaderInserter) UnwindPoint() uint64 {
return hi.unwindPoint
}
func (hi *HeaderInserter) AnythingDone() bool {
func (hi *HeaderInserter) BestHeaderChanged() bool {
return hi.newCanonical
}
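
The hunks above suggest that highest, highestHash and highestTimestamp now move together with the head-header-hash and stage-progress updates, i.e. when the fed header becomes the new canonical head, rather than whenever a numerically larger block number happens to be fed. A simplified sketch of that behaviour follows; feedHeader and becomesCanonicalHead are invented for the sketch, while the real FeedHeader takes an ethdb.StatelessRwTx and a *types.Header and derives canonicality from total difficulty.

package main

import "fmt"

// headerInserter is a trimmed-down HeaderInserter; database writes, total
// difficulty and unwind bookkeeping are omitted.
type headerInserter struct {
    highest          uint64
    highestHash      [32]byte
    highestTimestamp uint64
    newCanonical     bool
}

// feedHeader shows the corrected behaviour: highest, highestHash and
// highestTimestamp are set in the same branch that advances stage progress,
// i.e. when the fed header becomes the new canonical head.
func (hi *headerInserter) feedHeader(height, timestamp uint64, hash [32]byte, becomesCanonicalHead bool) {
    if becomesCanonicalHead {
        hi.highest = height
        hi.highestHash = hash
        hi.highestTimestamp = timestamp
        hi.newCanonical = true
    }
    // the header itself would be stored regardless (omitted here)
}

// BestHeaderChanged is the renamed AnythingDone from the diff.
func (hi *headerInserter) BestHeaderChanged() bool { return hi.newCanonical }

func main() {
    var hi headerInserter
    hi.feedHeader(100, 1621866956, [32]byte{1}, true)
    fmt.Println(hi.BestHeaderChanged(), hi.highest) // true 100
}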


@@ -261,15 +261,13 @@ type HeaderInserter struct {
highestHash common.Hash
highestTimestamp uint64
localTd *big.Int
headerProgress uint64
}
func NewHeaderInserter(logPrefix string, localTd *big.Int, headerProgress uint64) *HeaderInserter {
return &HeaderInserter{
logPrefix: logPrefix,
localTd: localTd,
headerProgress: headerProgress,
unwindPoint: headerProgress,
logPrefix: logPrefix,
localTd: localTd,
unwindPoint: headerProgress,
}
}