prysm-pulse/beacon-chain/cache/skip_slot_cache.go
Preston Van Loon 27254ad362 Use a better skip slots cache with a lock around it for identical parallel ProcessSlots requests (#4597)
* Use a better skip slots cache with a lock around it for common requests
* Merge refs/heads/master into better-skip-slots-cache
* add test
* Merge branch 'better-skip-slots-cache' of github.com:prysmaticlabs/prysm into better-skip-slots-cache
* Merge refs/heads/master into better-skip-slots-cache
* exit process slots if the context expired
* Revert "exit process slots if the context expired"

This reverts commit 1430d8ab1914a74c9387a17463121d55b7ad8ca9.
* ensure validation has a pubsub timeout
* Merge refs/heads/master into better-skip-slots-cache
* PR feedback
* Merge branch 'better-skip-slots-cache' of github.com:prysmaticlabs/prysm into better-skip-slots-cache
2020-01-21 02:19:42 +00:00

132 lines
3.2 KiB
Go

package cache
import (
"context"
"math"
"sync"
"time"
"github.com/gogo/protobuf/proto"
lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
)
// skipSlotCacheHit counts lookups that were answered from the skip slot cache.
var skipSlotCacheHit = promauto.NewCounter(prometheus.CounterOpts{
	Name: "skip_slot_cache_hit",
	Help: "The total number of cache hits on the skip slot cache.",
})

// skipSlotCacheMiss counts lookups that the skip slot cache could not answer,
// including every lookup made while the cache feature is disabled.
var skipSlotCacheMiss = promauto.NewCounter(prometheus.CounterOpts{
	Name: "skip_slot_cache_miss",
	Help: "The total number of cache misses on the skip slot cache.",
})
// SkipSlotCache is used to store the cached results of processing skip slots in state.ProcessSlots.
// Entries are keyed by slot number. A per-slot "in progress" marker lets Get
// wait while another caller is still computing the state for the same slot.
type SkipSlotCache struct {
// cache holds the stored *pb.BeaconState values, keyed by slot.
cache *lru.Cache
// lock guards inProgress.
lock sync.RWMutex
// inProgress records the slots that have been marked via MarkInProgress and
// not yet released via MarkNotInProgress.
inProgress map[uint64]bool
}
// NewSkipSlotCache returns a ready-to-use SkipSlotCache: an LRU created with
// size 8 plus an empty in-progress map. It panics if the LRU cannot be
// constructed.
func NewSkipSlotCache() *SkipSlotCache {
	lruCache, err := lru.New(8)
	if err != nil {
		panic(err)
	}

	ssc := new(SkipSlotCache)
	ssc.cache = lruCache
	ssc.inProgress = make(map[uint64]bool)
	return ssc
}
// Get waits for any in progress calculation for slot to complete before
// returning a cached response, if any.
//
// Return values:
//   - (state, nil): a clone of the cached state for slot (cache hit).
//   - (nil, nil): nothing is cached for slot, or the skip slots cache feature
//     is disabled (cache miss).
//   - (nil, err): ctx was cancelled or timed out while waiting; err is
//     ctx.Err().
//
// The returned state is always a copy, so callers may mutate it freely.
func (c *SkipSlotCache) Get(ctx context.Context, slot uint64) (*pb.BeaconState, error) {
	if !featureconfig.Get().EnableSkipSlotsCache {
		// Return a miss result if cache is not enabled.
		skipSlotCacheMiss.Inc()
		return nil, nil
	}

	// Another identical request may be in progress already. Wait until it
	// resolves or the context is done.
	delay := minDelay
	for {
		if err := ctx.Err(); err != nil {
			return nil, err
		}

		c.lock.RLock()
		busy := c.inProgress[slot]
		c.lock.RUnlock()
		if !busy {
			break
		}

		// Back off with an increasing delay to reduce the CPU cycles spent
		// polling. A timer plus select is used instead of time.Sleep so that
		// cancellation of ctx interrupts the wait immediately rather than
		// after the current delay has elapsed.
		timer := time.NewTimer(time.Duration(delay) * time.Nanosecond)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, ctx.Err()
		case <-timer.C:
		}

		delay *= delayFactor
		delay = math.Min(delay, maxDelay)
	}

	cached, ok := c.cache.Get(slot)
	if !ok || cached == nil {
		skipSlotCacheMiss.Inc()
		return nil, nil
	}

	skipSlotCacheHit.Inc()
	// Hand out a copy so the cached value is never mutated by callers.
	return proto.Clone(cached.(*pb.BeaconState)).(*pb.BeaconState), nil
}
// MarkInProgress flags slot as being computed so that any other similar
// request blocks in Get until MarkNotInProgress is called for the same slot.
// It returns ErrAlreadyInProgress when the slot is already flagged, and is a
// no-op returning nil when the skip slots cache feature is disabled.
func (c *SkipSlotCache) MarkInProgress(slot uint64) error {
	if enabled := featureconfig.Get().EnableSkipSlotsCache; !enabled {
		return nil
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	if running := c.inProgress[slot]; running {
		return ErrAlreadyInProgress
	}
	c.inProgress[slot] = true
	return nil
}
// MarkNotInProgress removes the in-progress flag for slot, releasing any
// callers waiting on it in Get. This should be called after Put. It always
// returns nil and does nothing when the skip slots cache feature is disabled.
func (c *SkipSlotCache) MarkNotInProgress(slot uint64) error {
	if featureconfig.Get().EnableSkipSlotsCache {
		c.lock.Lock()
		delete(c.inProgress, slot)
		c.lock.Unlock()
	}
	return nil
}
// Put stores a copy of state in the cache under slot. It always returns nil
// and stores nothing when the skip slots cache feature is disabled. The ctx
// argument is currently unused.
func (c *SkipSlotCache) Put(ctx context.Context, slot uint64, state *pb.BeaconState) error {
	if enabled := featureconfig.Get().EnableSkipSlotsCache; enabled {
		// Store a clone so later mutation of the caller's state cannot
		// alter the cached value.
		snapshot := proto.Clone(state).(*pb.BeaconState)
		c.cache.Add(slot, snapshot)
	}
	return nil
}