sync: fix a memory leak when header verification fails (#8431)

If HeaderDownload.VerifyHeader always returns false, the memory usage
grows at a fast pace
due to Link objects (containing headers) not deallocated even after the
link queue pruning.
This commit is contained in:
battlmonstr 2023-10-14 03:39:43 +02:00 committed by GitHub
parent c1e99219d0
commit 757a91c44d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 66 additions and 33 deletions

View File

@ -869,7 +869,7 @@ func (c *Bor) snapshot(chain consensus.ChainHeaderReader, number uint64, hash li
return nil, err
}
c.logger.Info("Stored proposer snapshot to disk", "number", snap.Number, "hash", snap.Hash)
c.logger.Trace("Stored proposer snapshot to disk", "number", snap.Number, "hash", snap.Hash)
}
return snap, err
@ -1428,7 +1428,7 @@ func (c *Bor) getSpanForBlock(blockNum uint64) (*span.HeimdallSpan, error) {
return nil, fmt.Errorf("span with given block number is not loaded: %d", spanID)
}
c.logger.Info("Span with given block number is not loaded", "fetching span", spanID)
c.logger.Debug("Span with given block number is not loaded", "fetching span", spanID)
response, err := c.HeimdallClient.Span(c.execCtx, spanID)
if err != nil {

View File

@ -272,8 +272,8 @@ Loop:
stopped = true
case <-logEvery.C:
progress := cfg.hd.Progress()
logProgressHeaders(logPrefix, prevProgress, progress, logger)
stats := cfg.hd.ExtractStats()
logProgressHeaders(logPrefix, prevProgress, progress, stats, logger)
if prevProgress == progress {
noProgressCounter++
} else {
@ -474,20 +474,32 @@ func HeadersUnwind(u *UnwindState, s *StageState, tx kv.RwTx, cfg HeadersCfg, te
return nil
}
func logProgressHeaders(logPrefix string, prev, now uint64, logger log.Logger) uint64 {
func logProgressHeaders(
logPrefix string,
prev uint64,
now uint64,
stats headerdownload.Stats,
logger log.Logger,
) uint64 {
speed := float64(now-prev) / float64(logInterval/time.Second)
var message string
if speed == 0 {
logger.Info(fmt.Sprintf("[%s] No block headers to write in this log period", logPrefix), "block number", now)
return now
message = "No block headers to write in this log period"
} else {
message = "Wrote block headers"
}
var m runtime.MemStats
dbg.ReadMemStats(&m)
logger.Info(fmt.Sprintf("[%s] Wrote block headers", logPrefix),
logger.Info(fmt.Sprintf("[%s] %s", logPrefix, message),
"number", now,
"blk/second", speed,
"alloc", libcommon.ByteCount(m.Alloc),
"sys", libcommon.ByteCount(m.Sys))
"sys", libcommon.ByteCount(m.Sys),
"invalidHeaders", stats.InvalidHeaders,
"rejectedBadHeaders", stats.RejectedBadHeaders,
)
return now
}

View File

@ -101,6 +101,7 @@ func (hd *HeaderDownload) SingleHeaderAsSegment(headerRaw []byte, header *types.
headerHash := types.RawRlpHash(headerRaw)
if _, bad := hd.badHeaders[headerHash]; bad {
hd.stats.RejectedBadHeaders++
hd.logger.Warn("[downloader] Rejected header marked as bad", "hash", headerHash, "height", header.Number.Uint64())
return nil, BadBlockPenalty, nil
}
@ -172,9 +173,10 @@ func (hd *HeaderDownload) removeUpwards(link *Link) {
toRemove = toRemove[:len(toRemove)-1]
delete(hd.links, removal.hash)
hd.moveLinkToQueue(removal, NoQueue)
for child := removal.fChild; child != nil; child, child.next = child.next, nil {
for child := removal.fChild; child != nil; child = child.next {
toRemove = append(toRemove, child)
}
removal.ClearChildren()
}
}
@ -207,29 +209,12 @@ func (hd *HeaderDownload) pruneLinkQueue() {
for hd.linkQueue.Len() > hd.linkLimit {
link := heap.Pop(&hd.linkQueue).(*Link)
delete(hd.links, link.hash)
for child := link.fChild; child != nil; child, child.next = child.next, nil {
}
link.ClearChildren()
if parentLink, ok := hd.links[link.header.ParentHash]; ok {
var prevChild *Link
for child := parentLink.fChild; child != nil && child != link; child = child.next {
prevChild = child
}
if prevChild == nil {
parentLink.fChild = link.next
} else {
prevChild.next = link.next
}
parentLink.RemoveChild(link)
}
if anchor, ok := hd.anchors[link.header.ParentHash]; ok {
var prevChild *Link
for child := anchor.fLink; child != nil && child != link; child = child.next {
prevChild = child
}
if prevChild == nil {
anchor.fLink = link.next
} else {
prevChild.next = link.next
}
anchor.RemoveChild(link)
if anchor.fLink == nil {
hd.removeAnchor(anchor)
}
@ -327,8 +312,7 @@ func (hd *HeaderDownload) RecoverFromDb(db kv.RoDB) error {
for hd.persistedLinkQueue.Len() > 0 {
link := heap.Pop(&hd.persistedLinkQueue).(*Link)
delete(hd.links, link.hash)
for child := link.fChild; child != nil; child, child.next = child.next, nil {
}
link.ClearChildren()
}
err := db.View(context.Background(), func(tx kv.Tx) error {
c, err := tx.Cursor(kv.Headers)
@ -538,6 +522,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult
delete(hd.links, link.hash)
hd.removeUpwards(link)
dataflow.HeaderDownloadStates.AddChange(link.blockHeight, dataflow.HeaderBad)
hd.stats.RejectedBadHeaders++
hd.logger.Warn("[downloader] Rejected header marked as bad", "hash", link.hash, "height", link.blockHeight)
return true, false, 0, lastTime, nil
}
@ -553,7 +538,11 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult
hd.moveLinkToQueue(link, NoQueue)
delete(hd.links, link.hash)
hd.removeUpwards(link)
if parentLink, ok := hd.links[link.header.ParentHash]; ok {
parentLink.RemoveChild(link)
}
dataflow.HeaderDownloadStates.AddChange(link.blockHeight, dataflow.HeaderEvicted)
hd.stats.InvalidHeaders++
return true, false, 0, lastTime, nil
}
}
@ -613,8 +602,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult
link := heap.Pop(&hd.persistedLinkQueue).(*Link)
dataflow.HeaderDownloadStates.AddChange(link.blockHeight, dataflow.HeaderEvicted)
delete(hd.links, link.hash)
for child := link.fChild; child != nil; child, child.next = child.next, nil {
}
link.ClearChildren()
}
var blocksToTTD uint64
if terminalTotalDifficulty != nil && returnTd != nil && lastD != nil {
@ -1076,6 +1064,8 @@ func (hd *HeaderDownload) ProcessHeaders(csHeaders []ChainSegmentHeader, newBloc
}
func (hd *HeaderDownload) ExtractStats() Stats {
hd.lock.RLock()
defer hd.lock.RUnlock()
s := hd.stats
hd.stats = Stats{}
return s

View File

@ -47,6 +47,23 @@ type Link struct {
peerId [64]byte
}
func (link *Link) ClearChildren() {
for child := link.fChild; child != nil; child, child.next = child.next, nil {
}
}
func (parentLink *Link) RemoveChild(link *Link) {
var prevChild *Link
for child := parentLink.fChild; child != nil && child != link; child = child.next {
prevChild = child
}
if prevChild == nil {
parentLink.fChild = link.next
} else {
prevChild.next = link.next
}
}
// LinkQueue is the priority queue of links. It is instantiated once for persistent links, and once for non-persistent links
// In other instances, it is used to limit number of links of corresponding type (persistent and non-persistent) in memory
type LinkQueue []*Link
@ -112,6 +129,18 @@ type Anchor struct {
timeouts int // Number of timeout that this anchor has experiences - after certain threshold, it gets invalidated
}
func (anchor *Anchor) RemoveChild(link *Link) {
var prevChild *Link
for child := anchor.fLink; child != nil && child != link; child = child.next {
prevChild = child
}
if prevChild == nil {
anchor.fLink = link.next
} else {
prevChild.next = link.next
}
}
type ChainSegmentHeader struct {
HeaderRaw rlp.RawValue
Header *types.Header
@ -224,6 +253,8 @@ type Stats struct {
SkeletonReqMaxBlock uint64
RespMinBlock uint64
RespMaxBlock uint64
InvalidHeaders int
RejectedBadHeaders int
}
type HeaderDownload struct {