mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-12 04:30:04 +00:00
Reclaims leakybucket resources in sync service (#6339)
* Reclaims leakybucket resources * move calls to defer * do not reuse queue, after stopping
This commit is contained in:
parent
d4545233cd
commit
6f8349cdb4
@ -132,6 +132,12 @@ func (f *blocksFetcher) start() error {
|
||||
|
||||
// stop terminates all fetcher operations.
|
||||
func (f *blocksFetcher) stop() {
|
||||
defer func() {
|
||||
if f.rateLimiter != nil {
|
||||
f.rateLimiter.Free()
|
||||
f.rateLimiter = nil
|
||||
}
|
||||
}()
|
||||
f.cancel()
|
||||
<-f.quit // make sure that loop() is done
|
||||
}
|
||||
|
@ -80,6 +80,10 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
|
||||
// mitigation. We are already convinced that we are on the correct finalized chain. Any blocks
|
||||
// we receive there after must build on the finalized chain or be considered invalid during
|
||||
// fork choice resolution / block processing.
|
||||
blocksFetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
|
||||
p2p: s.p2p,
|
||||
headFetcher: s.chain,
|
||||
})
|
||||
_, _, pids := s.p2p.Peers().BestFinalized(1 /* maxPeers */, s.highestFinalizedEpoch())
|
||||
for len(pids) == 0 {
|
||||
log.Info("Waiting for a suitable peer before syncing to the head of the chain")
|
||||
@ -90,7 +94,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
|
||||
|
||||
for head := helpers.SlotsSince(genesis); s.chain.HeadSlot() < head; {
|
||||
count := mathutil.Min(
|
||||
helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, queue.blocksFetcher.blocksPerSecond)
|
||||
helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, blocksFetcher.blocksPerSecond)
|
||||
req := &p2ppb.BeaconBlocksByRangeRequest{
|
||||
StartSlot: s.chain.HeadSlot() + 1,
|
||||
Count: count,
|
||||
@ -100,7 +104,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
|
||||
"req": req,
|
||||
"peer": best.Pretty(),
|
||||
}).Debug("Sending batch block request")
|
||||
resp, err := queue.blocksFetcher.requestBlocks(ctx, req, best)
|
||||
resp, err := blocksFetcher.requestBlocks(ctx, req, best)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to receive blocks, exiting init sync")
|
||||
return nil
|
||||
|
@ -159,6 +159,12 @@ func (r *Service) Start() {
|
||||
|
||||
// Stop the regular sync service.
|
||||
func (r *Service) Stop() error {
|
||||
defer func() {
|
||||
if r.blocksRateLimiter != nil {
|
||||
r.blocksRateLimiter.Free()
|
||||
r.blocksRateLimiter = nil
|
||||
}
|
||||
}()
|
||||
defer r.cancel()
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user