mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-01 00:31:21 +00:00
Reduce insertQueue usage, throttle Skeleton requests (#4185)
* Reduce insertQueue usage, throttle Skeleton requests * Cleanup Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
parent
80c562bdd3
commit
ec08a284a6
@ -709,6 +709,7 @@ func HeadersPOW(
|
|||||||
var skeletonReqMin, skeletonReqMax, reqMin, reqMax uint64 // min and max block height for skeleton and non-skeleton requests
|
var skeletonReqMin, skeletonReqMax, reqMin, reqMax uint64 // min and max block height for skeleton and non-skeleton requests
|
||||||
var noProgressCounter int
|
var noProgressCounter int
|
||||||
var wasProgress bool
|
var wasProgress bool
|
||||||
|
var lastSkeletonTime time.Time
|
||||||
Loop:
|
Loop:
|
||||||
for !stopped {
|
for !stopped {
|
||||||
|
|
||||||
@ -764,17 +765,20 @@ Loop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send skeleton request if required
|
// Send skeleton request if required
|
||||||
req = cfg.hd.RequestSkeleton()
|
if time.Since(lastSkeletonTime) > 1*time.Second {
|
||||||
if req != nil {
|
req = cfg.hd.RequestSkeleton()
|
||||||
_, sentToPeer = cfg.headerReqSend(ctx, req)
|
if req != nil {
|
||||||
if sentToPeer {
|
_, sentToPeer = cfg.headerReqSend(ctx, req)
|
||||||
log.Trace("Sent skeleton request", "height", req.Number)
|
if sentToPeer {
|
||||||
skeletonReqCount++
|
log.Trace("Sent skeleton request", "height", req.Number)
|
||||||
if skeletonReqMin == 0 || req.Number < skeletonReqMin {
|
skeletonReqCount++
|
||||||
skeletonReqMin = req.Number
|
if skeletonReqMin == 0 || req.Number < skeletonReqMin {
|
||||||
}
|
skeletonReqMin = req.Number
|
||||||
if req.Number+req.Length*req.Skip > skeletonReqMax {
|
}
|
||||||
skeletonReqMax = req.Number + req.Length*req.Skip
|
if req.Number+req.Length*req.Skip > skeletonReqMax {
|
||||||
|
skeletonReqMax = req.Number + req.Length*req.Skip
|
||||||
|
}
|
||||||
|
lastSkeletonTime = time.Now()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -501,7 +501,7 @@ func (hd *HeaderDownload) InsertHeaders(hf FeedHeaderFunc, terminalTotalDifficul
|
|||||||
// Make sure long insertions do not appear as a stuck stage 1
|
// Make sure long insertions do not appear as a stuck stage 1
|
||||||
select {
|
select {
|
||||||
case <-logChannel:
|
case <-logChannel:
|
||||||
log.Info(fmt.Sprintf("[%s] Inserting headers", logPrefix), "progress", hd.highestInDb)
|
log.Info(fmt.Sprintf("[%s] Inserting headers", logPrefix), "progress", hd.highestInDb, "queue", hd.insertQueue.Len())
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
td, err := hf(link.header, link.headerRaw, link.hash, link.blockHeight)
|
td, err := hf(link.header, link.headerRaw, link.hash, link.blockHeight)
|
||||||
@ -530,6 +530,11 @@ func (hd *HeaderDownload) InsertHeaders(hf FeedHeaderFunc, terminalTotalDifficul
|
|||||||
link.header = nil // Drop header reference to free memory, as we won't need it anymore
|
link.header = nil // Drop header reference to free memory, as we won't need it anymore
|
||||||
link.headerRaw = nil
|
link.headerRaw = nil
|
||||||
hd.moveLinkToQueue(link, PersistedQueueID)
|
hd.moveLinkToQueue(link, PersistedQueueID)
|
||||||
|
for _, nextLink := range link.next {
|
||||||
|
if !nextLink.persisted {
|
||||||
|
hd.moveLinkToQueue(nextLink, InsertQueueID)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for hd.persistedLinkQueue.Len() > hd.persistedLinkLimit {
|
for hd.persistedLinkQueue.Len() > hd.persistedLinkLimit {
|
||||||
link := heap.Pop(&hd.persistedLinkQueue).(*Link)
|
link := heap.Pop(&hd.persistedLinkQueue).(*Link)
|
||||||
@ -848,14 +853,6 @@ func (hi *HeaderInserter) BestHeaderChanged() bool {
|
|||||||
return hi.newCanonical
|
return hi.newCanonical
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hd *HeaderDownload) recursiveLinked(link *Link) {
|
|
||||||
for _, n := range link.next {
|
|
||||||
n.linked = true
|
|
||||||
hd.moveLinkToQueue(n, InsertQueueID)
|
|
||||||
hd.recursiveLinked(n)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (hd *HeaderDownload) ProcessHeaders(csHeaders []ChainSegmentHeader, newBlock bool, peerID [64]byte) bool {
|
func (hd *HeaderDownload) ProcessHeaders(csHeaders []ChainSegmentHeader, newBlock bool, peerID [64]byte) bool {
|
||||||
hd.lock.Lock()
|
hd.lock.Lock()
|
||||||
defer hd.lock.Unlock()
|
defer hd.lock.Unlock()
|
||||||
@ -900,10 +897,9 @@ func (hd *HeaderDownload) ProcessHeaders(csHeaders []ChainSegmentHeader, newBloc
|
|||||||
if foundParent {
|
if foundParent {
|
||||||
//fmt.Printf("sh = %d %x, found parent\n", sh.Number, sh.Hash)
|
//fmt.Printf("sh = %d %x, found parent\n", sh.Number, sh.Hash)
|
||||||
parent.next = append(parent.next, link)
|
parent.next = append(parent.next, link)
|
||||||
if parent.linked {
|
if parent.persisted {
|
||||||
link.linked = true
|
link.linked = true
|
||||||
hd.moveLinkToQueue(link, InsertQueueID)
|
hd.moveLinkToQueue(link, InsertQueueID)
|
||||||
hd.recursiveLinked(link)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if sh.Number+params.FullImmutabilityThreshold < hd.highestInDb {
|
if sh.Number+params.FullImmutabilityThreshold < hd.highestInDb {
|
||||||
|
Loading…
Reference in New Issue
Block a user