Revamp proposer cache to fix lookahead bug (#7442)

* Add and test proposer indices cache

* Remove proposer indices usages from committee cache

* Adopt the new proposer indices cache

* Add comments on why genesis epoch is skipped

* Fix the failing tests

* Update beacon-chain/blockchain/process_block.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

* Update beacon-chain/core/helpers/committee.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

* Update beacon-chain/core/helpers/slot_epoch.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

* Update beacon-chain/core/helpers/slot_epoch.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

* Update beacon-chain/core/helpers/slot_epoch_test.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

* Update beacon-chain/core/helpers/validators.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

* Address additional feedbacks from @prestonvanloon

* Add comments on why genesis epoch is skipped

* Refactor EndSlot to use StartSlot within

* Add proposer indices disabled

* Add libfuzz tags

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
This commit is contained in:
terence tsao 2020-10-07 10:25:08 -07:00 committed by GitHub
parent 9ce64e2428
commit 7ad2929f0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 278 additions and 107 deletions

View File

@ -246,7 +246,7 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []*ethpb.SignedBeaconBl
if helpers.IsEpochStart(preState.Slot()) {
boundaries[blockRoots[i]] = preState.Copy()
if err := s.handleEpochBoundary(preState); err != nil {
return nil, nil, fmt.Errorf("could not handle epoch boundary state")
return nil, nil, errors.Wrap(err, "could not handle epoch boundary state")
}
}
jCheckpoints[i] = preState.CurrentJustifiedCheckpoint()

View File

@ -2,6 +2,7 @@ load("@prysm//tools/go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_test")
# gazelle:ignore committee_disabled.go
# gazelle:ignore proposer_indices_disabled.go
go_library(
name = "go_default_library",
srcs = [
@ -14,12 +15,15 @@ go_library(
"skip_slot_cache.go",
"state_summary.go",
"subnet_ids.go",
"proposer_indices_type.go",
] + select({
"//fuzz:fuzzing_enabled": [
"committee_disabled.go",
"proposer_indices_disabled.go"
],
"//conditions:default": [
"committee.go",
"proposer_indices.go",
],
}),
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/cache",
@ -57,6 +61,7 @@ go_test(
"hot_state_cache_test.go",
"skip_slot_cache_test.go",
"subnet_ids_test.go",
"proposer_indices_test.go"
],
embed = [":go_default_library"],
deps = [

View File

@ -104,35 +104,6 @@ func (c *CommitteeCache) AddCommitteeShuffledList(committees *Committees) error
return nil
}
// AddProposerIndicesList updates the committee shuffled list with proposer indices.
func (c *CommitteeCache) AddProposerIndicesList(seed [32]byte, indices []uint64) error {
c.lock.Lock()
defer c.lock.Unlock()
obj, exists, err := c.CommitteeCache.GetByKey(key(seed))
if err != nil {
return err
}
if !exists {
committees := &Committees{ProposerIndices: indices}
if err := c.CommitteeCache.Add(committees); err != nil {
return err
}
} else {
committees, ok := obj.(*Committees)
if !ok {
return ErrNotCommittee
}
committees.ProposerIndices = indices
if err := c.CommitteeCache.Add(committees); err != nil {
return err
}
}
trim(c.CommitteeCache, maxCommitteesCacheSize)
return nil
}
// ActiveIndices returns the active indices of a given seed stored in cache.
func (c *CommitteeCache) ActiveIndices(seed [32]byte) ([]uint64, error) {
c.lock.RLock()
@ -181,30 +152,6 @@ func (c *CommitteeCache) ActiveIndicesCount(seed [32]byte) (int, error) {
return len(item.SortedIndices), nil
}
// ProposerIndices returns the proposer indices of a given seed.
func (c *CommitteeCache) ProposerIndices(seed [32]byte) ([]uint64, error) {
c.lock.RLock()
defer c.lock.RUnlock()
obj, exists, err := c.CommitteeCache.GetByKey(key(seed))
if err != nil {
return nil, err
}
if exists {
CommitteeCacheHit.Inc()
} else {
CommitteeCacheMiss.Inc()
return nil, nil
}
item, ok := obj.(*Committees)
if !ok {
return nil, ErrNotCommittee
}
return item.ProposerIndices, nil
}
// HasEntry returns true if the committee cache has a value.
func (c *CommitteeCache) HasEntry(seed string) bool {
_, ok, err := c.CommitteeCache.GetByKey(seed)

View File

@ -87,35 +87,6 @@ func TestCommitteeCache_ActiveCount(t *testing.T) {
assert.Equal(t, len(item.SortedIndices), count)
}
func TestCommitteeCache_AddProposerIndicesList(t *testing.T) {
cache := NewCommitteesCache()
seed := [32]byte{'A'}
indices, err := cache.ProposerIndices(seed)
require.NoError(t, err)
if indices != nil {
t.Error("Expected committee count not to exist in empty cache")
}
require.NoError(t, cache.AddProposerIndicesList(seed, indices))
received, err := cache.ProposerIndices(seed)
require.NoError(t, err)
assert.DeepEqual(t, received, indices)
item := &Committees{Seed: [32]byte{'B'}, SortedIndices: []uint64{1, 2, 3, 4, 5, 6}}
require.NoError(t, cache.AddCommitteeShuffledList(item))
indices, err = cache.ProposerIndices(item.Seed)
require.NoError(t, err)
if indices != nil {
t.Error("Expected committee count not to exist in empty cache")
}
require.NoError(t, cache.AddProposerIndicesList(item.Seed, indices))
received, err = cache.ProposerIndices(item.Seed)
require.NoError(t, err)
assert.DeepEqual(t, received, indices)
}
func TestCommitteeCache_CanRotate(t *testing.T) {
cache := NewCommitteesCache()
@ -150,7 +121,6 @@ func TestCommitteeCacheOutOfRange(t *testing.T) {
Seed: seed,
ShuffledIndices: []uint64{0},
SortedIndices: []uint64{},
ProposerIndices: []uint64{},
})
require.NoError(t, err)

View File

@ -12,5 +12,4 @@ type Committees struct {
Seed [32]byte
ShuffledIndices []uint64
SortedIndices []uint64
ProposerIndices []uint64
}

88
beacon-chain/cache/proposer_indices.go vendored Normal file
View File

@ -0,0 +1,88 @@
// +build !libfuzzer
package cache
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"k8s.io/client-go/tools/cache"
)
var (
// maxProposerIndicesCacheSize defines the max number of proposer indices on per block root basis can cache.
// Due to reorgs and long finality, it's good to keep the old cache around for quickly switch over.
maxProposerIndicesCacheSize = uint64(8)
// ProposerIndicesCacheMiss tracks the number of proposerIndices requests that aren't present in the cache.
ProposerIndicesCacheMiss = promauto.NewCounter(prometheus.CounterOpts{
Name: "proposer_indices_cache_miss",
Help: "The number of proposer indices requests that aren't present in the cache.",
})
// ProposerIndicesCacheHit tracks the number of proposerIndices requests that are in the cache.
ProposerIndicesCacheHit = promauto.NewCounter(prometheus.CounterOpts{
Name: "proposer_indices_cache_hit",
Help: "The number of proposer indices requests that are present in the cache.",
})
)
// ProposerIndicesCache is a struct with 1 queue for looking up proposer indices by root.
type ProposerIndicesCache struct {
ProposerIndicesCache *cache.FIFO
lock sync.RWMutex
}
// proposerIndicesKeyFn takes the block root as the key to retrieve proposer indices in a given epoch.
func proposerIndicesKeyFn(obj interface{}) (string, error) {
info, ok := obj.(*ProposerIndices)
if !ok {
return "", ErrNotProposerIndices
}
return key(info.BlockRoot), nil
}
// NewProposerIndicesCache creates a new proposer indices cache for storing/accessing proposer index assignments of an epoch.
func NewProposerIndicesCache() *ProposerIndicesCache {
return &ProposerIndicesCache{
ProposerIndicesCache: cache.NewFIFO(proposerIndicesKeyFn),
}
}
// AddProposerIndices adds ProposerIndices object to the cache.
// This method also trims the least recently list if the cache size has ready the max cache size limit.
func (c *ProposerIndicesCache) AddProposerIndices(p *ProposerIndices) error {
c.lock.Lock()
defer c.lock.Unlock()
if err := c.ProposerIndicesCache.AddIfNotPresent(p); err != nil {
return err
}
trim(c.ProposerIndicesCache, maxProposerIndicesCacheSize)
return nil
}
// ProposerIndices returns the proposer indices of a block root seed.
func (c *ProposerIndicesCache) ProposerIndices(r [32]byte) ([]uint64, error) {
c.lock.RLock()
defer c.lock.RUnlock()
obj, exists, err := c.ProposerIndicesCache.GetByKey(key(r))
if err != nil {
return nil, err
}
if exists {
ProposerIndicesCacheHit.Inc()
} else {
ProposerIndicesCacheMiss.Inc()
return nil, nil
}
item, ok := obj.(*ProposerIndices)
if !ok {
return nil, ErrNotProposerIndices
}
return item.ProposerIndices, nil
}

View File

@ -0,0 +1,24 @@
// +build libfuzzer
// This file is used in fuzzer builds to bypass proposer indices caches.
package cache
// FakeProposerIndicesCache is a struct with 1 queue for looking up proposer indices by root.
type FakeProposerIndicesCache struct {
}
// NewProposerIndicesCache creates a new proposer indices cache for storing/accessing proposer index assignments of an epoch.
func NewProposerIndicesCache() *FakeProposerIndicesCache {
return &FakeProposerIndicesCache{}
}
// AddProposerIndices adds ProposerIndices object to the cache.
// This method also trims the least recently list if the cache size has ready the max cache size limit.
func (c *FakeProposerIndicesCache) AddProposerIndices(p *ProposerIndices) error {
return nil
}
// ProposerIndices returns the proposer indices of a block root seed.
func (c *FakeProposerIndicesCache) ProposerIndices(r [32]byte) ([]uint64, error) {
return nil, nil
}

View File

@ -0,0 +1,63 @@
package cache
import (
"strconv"
"testing"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestProposerKeyFn_OK(t *testing.T) {
item := &ProposerIndices{
BlockRoot: [32]byte{'A'},
ProposerIndices: []uint64{1, 2, 3, 4, 5},
}
k, err := proposerIndicesKeyFn(item)
require.NoError(t, err)
assert.Equal(t, key(item.BlockRoot), k)
}
func TestProposerKeyFn_InvalidObj(t *testing.T) {
_, err := proposerIndicesKeyFn("bad")
assert.Equal(t, ErrNotProposerIndices, err)
}
func TestProposerCache_AddProposerIndicesList(t *testing.T) {
cache := NewProposerIndicesCache()
bRoot := [32]byte{'A'}
indices, err := cache.ProposerIndices(bRoot)
require.NoError(t, err)
if indices != nil {
t.Error("Expected committee count not to exist in empty cache")
}
require.NoError(t, cache.AddProposerIndices(&ProposerIndices{
ProposerIndices: indices,
BlockRoot: bRoot,
}))
received, err := cache.ProposerIndices(bRoot)
require.NoError(t, err)
assert.DeepEqual(t, received, indices)
item := &ProposerIndices{BlockRoot: [32]byte{'B'}, ProposerIndices: []uint64{1, 2, 3, 4, 5, 6}}
require.NoError(t, cache.AddProposerIndices(item))
received, err = cache.ProposerIndices(item.BlockRoot)
require.NoError(t, err)
assert.DeepEqual(t, item.ProposerIndices, received)
}
func TestProposerCache_CanRotate(t *testing.T) {
cache := NewProposerIndicesCache()
for i := 0; i < int(maxProposerIndicesCacheSize)+1; i++ {
s := []byte(strconv.Itoa(i))
item := &ProposerIndices{BlockRoot: bytesutil.ToBytes32(s)}
require.NoError(t, cache.AddProposerIndices(item))
}
k := cache.ProposerIndicesCache.ListKeys()
assert.Equal(t, maxProposerIndicesCacheSize, uint64(len(k)))
}

View File

@ -0,0 +1,13 @@
package cache
import "errors"
// ErrNotProposerIndices will be returned when a cache object is not a pointer to
// a ProposerIndices struct.
var ErrNotProposerIndices = errors.New("object is not a proposer indices struct")
// ProposerIndices defines the cached struct for proposer indices.
type ProposerIndices struct {
BlockRoot [32]byte
ProposerIndices []uint64
}

View File

@ -19,6 +19,7 @@ import (
)
var committeeCache = cache.NewCommitteesCache()
var proposerIndicesCache = cache.NewProposerIndicesCache()
// SlotCommitteeCount returns the number of crosslink committees of a slot. The
// active validator count is provided as an argument rather than a direct implementation
@ -348,6 +349,12 @@ func UpdateCommitteeCache(state *stateTrie.BeaconState, epoch uint64) error {
// UpdateProposerIndicesInCache updates proposer indices entry of the committee cache.
func UpdateProposerIndicesInCache(state *stateTrie.BeaconState, epoch uint64) error {
// The cache uses the block root at the last epoch slot as key. (e.g. for epoch 1, the key is root at slot 31)
// Which is the reason why we skip genesis epoch.
if epoch <= params.BeaconConfig().GenesisEpoch {
return nil
}
indices, err := ActiveValidatorIndices(state, epoch)
if err != nil {
return err
@ -356,12 +363,18 @@ func UpdateProposerIndicesInCache(state *stateTrie.BeaconState, epoch uint64) er
if err != nil {
return err
}
// The committee cache uses attester domain seed as key.
seed, err := Seed(state, epoch, params.BeaconConfig().DomainBeaconAttester)
s, err := EndSlot(PrevEpoch(state))
if err != nil {
return err
}
if err := committeeCache.AddProposerIndicesList(seed, proposerIndices); err != nil {
r, err := BlockRootAtSlot(state, s)
if err != nil {
return err
}
if err := proposerIndicesCache.AddProposerIndices(&cache.ProposerIndices{
BlockRoot: bytesutil.ToBytes32(r),
ProposerIndices: proposerIndices,
}); err != nil {
return err
}
@ -371,6 +384,7 @@ func UpdateProposerIndicesInCache(state *stateTrie.BeaconState, epoch uint64) er
// ClearCache clears the committee cache
func ClearCache() {
committeeCache = cache.NewCommitteesCache()
proposerIndicesCache = cache.NewProposerIndicesCache()
}
// This computes proposer indices of the current epoch and returns a list of proposer indices,

View File

@ -630,7 +630,7 @@ func TestPrecomputeProposerIndices_Ok(t *testing.T) {
}
func TestUpdateProposerIndicesInCache_CouldNotGetActiveIndices(t *testing.T) {
err := UpdateProposerIndicesInCache(&beaconstate.BeaconState{}, 0)
err := UpdateProposerIndicesInCache(&beaconstate.BeaconState{}, 1)
want := "nil inner state"
require.ErrorContains(t, want, err)
}

View File

@ -2,6 +2,7 @@ package helpers
import (
"fmt"
"math"
"time"
"github.com/pkg/errors"
@ -82,6 +83,19 @@ func StartSlot(epoch uint64) (uint64, error) {
return slot, nil
}
// EndSlot returns the last slot number of the
// current epoch.
func EndSlot(epoch uint64) (uint64, error) {
if epoch == math.MaxUint64 {
return 0, errors.New("start slot calculation overflows")
}
slot, err := StartSlot(epoch + 1)
if err != nil {
return 0, err
}
return slot - 1, nil
}
// IsEpochStart returns true if the given slot number is an epoch starting slot
// number.
func IsEpochStart(slot uint64) bool {

View File

@ -96,11 +96,34 @@ func TestEpochStartSlot_OK(t *testing.T) {
{epoch: 1 << 60, startSlot: 1 << 63, error: true},
}
for _, tt := range tests {
state := &pb.BeaconState{Slot: tt.epoch}
ss, err := StartSlot(tt.epoch)
if !tt.error {
require.NoError(t, err)
assert.Equal(t, tt.startSlot, ss, "StartSlot(%d)", state.Slot)
assert.Equal(t, tt.startSlot, ss, "StartSlot(%d)", tt.epoch)
} else {
require.ErrorContains(t, "start slot calculation overflow", err)
}
}
}
func TestEpochEndSlot_OK(t *testing.T) {
tests := []struct {
epoch uint64
startSlot uint64
error bool
}{
{epoch: 0, startSlot: 1*params.BeaconConfig().SlotsPerEpoch - 1, error: false},
{epoch: 1, startSlot: 2*params.BeaconConfig().SlotsPerEpoch - 1, error: false},
{epoch: 10, startSlot: 11*params.BeaconConfig().SlotsPerEpoch - 1, error: false},
{epoch: 1 << 59, startSlot: 1 << 63, error: true},
{epoch: 1 << 60, startSlot: 1 << 63, error: true},
{epoch: math.MaxUint64, startSlot: 0, error: true},
}
for _, tt := range tests {
ss, err := EndSlot(tt.epoch)
if !tt.error {
require.NoError(t, err)
assert.Equal(t, tt.startSlot, ss, "StartSlot(%d)", tt.epoch)
} else {
require.ErrorContains(t, "start slot calculation overflow", err)
}

View File

@ -175,20 +175,30 @@ func ValidatorChurnLimit(activeValidatorCount uint64) (uint64, error) {
// return compute_proposer_index(state, indices, seed)
func BeaconProposerIndex(state *stateTrie.BeaconState) (uint64, error) {
e := CurrentEpoch(state)
seed, err := Seed(state, e, params.BeaconConfig().DomainBeaconAttester)
if err != nil {
return 0, errors.Wrap(err, "could not generate seed")
}
proposerIndices, err := committeeCache.ProposerIndices(seed)
if err != nil {
return 0, errors.Wrap(err, "could not interface with committee cache")
}
if proposerIndices != nil {
return proposerIndices[state.Slot()%params.BeaconConfig().SlotsPerEpoch], nil
// The cache uses the block root of the previous epoch's last slot as key. (e.g. Starting epoch 1, slot 32, the key would be block root at slot 31)
// For simplicity, the node will skip caching of genesis epoch.
if e > params.BeaconConfig().GenesisEpoch {
s, err := EndSlot(PrevEpoch(state))
if err != nil {
return 0, err
}
r, err := BlockRootAtSlot(state, s)
if err != nil {
return 0, err
}
proposerIndices, err := proposerIndicesCache.ProposerIndices(bytesutil.ToBytes32(r))
if err != nil {
return 0, errors.Wrap(err, "could not interface with committee cache")
}
if proposerIndices != nil {
return proposerIndices[state.Slot()%params.BeaconConfig().SlotsPerEpoch], nil
}
if err := UpdateProposerIndicesInCache(state, e); err != nil {
return 0, errors.Wrap(err, "could not update committee cache")
}
}
seed, err = Seed(state, e, params.BeaconConfig().DomainBeaconProposer)
seed, err := Seed(state, e, params.BeaconConfig().DomainBeaconProposer)
if err != nil {
return 0, errors.Wrap(err, "could not generate seed")
}
@ -201,10 +211,6 @@ func BeaconProposerIndex(state *stateTrie.BeaconState) (uint64, error) {
return 0, errors.Wrap(err, "could not get active indices")
}
if err := UpdateProposerIndicesInCache(state, e); err != nil {
return 0, errors.Wrap(err, "could not update committee cache")
}
return ComputeProposerIndex(state, indices, seedWithSlotHash)
}

View File

@ -677,6 +677,9 @@ func (b *BeaconState) PubkeyAtIndex(idx uint64) [48]byte {
b.lock.RLock()
defer b.lock.RUnlock()
if b.state.Validators[idx] == nil {
return [48]byte{}
}
return bytesutil.ToBytes48(b.state.Validators[idx].PublicKey)
}

View File

@ -4,6 +4,7 @@ import (
"context"
"testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state/stateutils"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
@ -46,6 +47,7 @@ func TestGenerateFullBlock_Passes4Epochs(t *testing.T) {
}
finalSlot := params.BeaconConfig().SlotsPerEpoch*4 + 3
for i := 0; i < int(finalSlot); i++ {
helpers.ClearCache()
block, err := GenerateFullBlock(beaconState, privs, conf, beaconState.Slot())
require.NoError(t, err)
beaconState, err = state.ExecuteStateTransition(context.Background(), beaconState, block)