Refactor BLS change pool (#11894)

* Refactor BLS change pool

* update mock's signature

* do not return error from `MarkIncluded`

* fix tests

* fix mock's signature

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Radosław Kapka 2023-01-23 15:11:45 +01:00 committed by GitHub
parent 3df2dedbb2
commit 1a048a2f2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 99 additions and 102 deletions

View File

@ -557,22 +557,6 @@ func (s *Service) handleBlockAttestations(ctx context.Context, blk interfaces.Be
return nil
}
func (s *Service) handleBlockBLSToExecChanges(blk interfaces.BeaconBlock) error {
if blk.Version() < version.Capella {
return nil
}
changes, err := blk.Body().BLSToExecutionChanges()
if err != nil {
return errors.Wrap(err, "could not get BLSToExecutionChanges")
}
for _, change := range changes {
if err := s.cfg.BLSToExecPool.MarkIncluded(change); err != nil {
return errors.Wrap(err, "could not mark BLSToExecutionChange as included")
}
}
return nil
}
// InsertSlashingsToForkChoiceStore inserts attester slashing indices to fork choice store.
// To call this function, it's caller's responsibility to ensure the slashing object is valid.
func (s *Service) InsertSlashingsToForkChoiceStore(ctx context.Context, slashings []*ethpb.AttesterSlashing) {

View File

@ -26,7 +26,6 @@ import (
doublylinkedtree "github.com/prysmaticlabs/prysm/v3/beacon-chain/forkchoice/doubly-linked-tree"
forkchoicetypes "github.com/prysmaticlabs/prysm/v3/beacon-chain/forkchoice/types"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/operations/blstoexec"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state/stategen"
fieldparams "github.com/prysmaticlabs/prysm/v3/config/fieldparams"
@ -2305,65 +2304,6 @@ func TestFillMissingBlockPayloadId_DiffSlotExitEarly(t *testing.T) {
require.NoError(t, service.fillMissingBlockPayloadId(ctx, time.Unix(int64(params.BeaconConfig().SecondsPerSlot/2), 0)))
}
func TestHandleBBlockBLSToExecutionChanges(t *testing.T) {
ctx := context.Background()
beaconDB := testDB.SetupDB(t)
fc := doublylinkedtree.New()
pool := blstoexec.NewPool()
opts := []Option{
WithDatabase(beaconDB),
WithStateGen(stategen.New(beaconDB, fc)),
WithForkChoiceStore(fc),
WithStateNotifier(&mock.MockStateNotifier{}),
WithBLSToExecPool(pool),
}
service, err := NewService(ctx, opts...)
require.NoError(t, err)
t.Run("pre Capella block", func(t *testing.T) {
body := &ethpb.BeaconBlockBodyBellatrix{}
pbb := &ethpb.BeaconBlockBellatrix{
Body: body,
}
blk, err := consensusblocks.NewBeaconBlock(pbb)
require.NoError(t, err)
require.NoError(t, service.handleBlockBLSToExecChanges(blk))
})
t.Run("Post Capella no changes", func(t *testing.T) {
body := &ethpb.BeaconBlockBodyCapella{}
pbb := &ethpb.BeaconBlockCapella{
Body: body,
}
blk, err := consensusblocks.NewBeaconBlock(pbb)
require.NoError(t, err)
require.NoError(t, service.handleBlockBLSToExecChanges(blk))
})
t.Run("Post Capella some changes", func(t *testing.T) {
idx := types.ValidatorIndex(123)
change := &ethpb.BLSToExecutionChange{
ValidatorIndex: idx,
}
signedChange := &ethpb.SignedBLSToExecutionChange{
Message: change,
}
body := &ethpb.BeaconBlockBodyCapella{
BlsToExecutionChanges: []*ethpb.SignedBLSToExecutionChange{signedChange},
}
pbb := &ethpb.BeaconBlockCapella{
Body: body,
}
blk, err := consensusblocks.NewBeaconBlock(pbb)
require.NoError(t, err)
pool.InsertBLSToExecChange(signedChange)
require.Equal(t, true, pool.ValidatorExists(idx))
require.NoError(t, service.handleBlockBLSToExecChanges(blk))
require.Equal(t, false, pool.ValidatorExists(idx))
})
}
// Helper function to simulate the block being on time or delayed for proposer
// boost. It alters the genesisTime tracked by the store.
func driftGenesisTime(s *Service, slot int64, delay int64) {

View File

@ -10,6 +10,7 @@ import (
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/runtime/version"
"github.com/prysmaticlabs/prysm/v3/time"
"github.com/prysmaticlabs/prysm/v3/time/slots"
"go.opencensus.io/trace"
@ -150,6 +151,11 @@ func (s *Service) handlePostBlockOperations(b interfaces.BeaconBlock) error {
s.cfg.ExitPool.MarkIncluded(e)
}
// Mark block BLS changes as seen so we don't include same ones in future blocks.
if err := s.handleBlockBLSToExecChanges(b); err != nil {
return errors.Wrap(err, "could not process BLSToExecutionChanges")
}
// Mark attester slashings as seen so we don't include same ones in future blocks.
for _, as := range b.Body().AttesterSlashings() {
s.cfg.SlashingPool.MarkIncludedAttesterSlashing(as)
@ -157,6 +163,20 @@ func (s *Service) handlePostBlockOperations(b interfaces.BeaconBlock) error {
return nil
}
func (s *Service) handleBlockBLSToExecChanges(blk interfaces.BeaconBlock) error {
if blk.Version() < version.Capella {
return nil
}
changes, err := blk.Body().BLSToExecutionChanges()
if err != nil {
return errors.Wrap(err, "could not get BLSToExecutionChanges")
}
for _, change := range changes {
s.cfg.BLSToExecPool.MarkIncluded(change)
}
return nil
}
// This checks whether it's time to start saving hot state to DB.
// It's time when there's `epochsSinceFinalitySaveHotStateDB` epochs of non-finality.
func (s *Service) checkSaveHotStateDB(ctx context.Context) error {

View File

@ -10,6 +10,7 @@ import (
testDB "github.com/prysmaticlabs/prysm/v3/beacon-chain/db/testing"
doublylinkedtree "github.com/prysmaticlabs/prysm/v3/beacon-chain/forkchoice/doubly-linked-tree"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/operations/blstoexec"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/operations/voluntaryexits"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v3/config/params"
@ -332,3 +333,62 @@ func TestCheckSaveHotStateDB_Overflow(t *testing.T) {
require.NoError(t, s.checkSaveHotStateDB(context.Background()))
assert.LogsDoNotContain(t, hook, "Entering mode to save hot states in DB")
}
func TestHandleBlockBLSToExecutionChanges(t *testing.T) {
ctx := context.Background()
beaconDB := testDB.SetupDB(t)
fc := doublylinkedtree.New()
pool := blstoexec.NewPool()
opts := []Option{
WithDatabase(beaconDB),
WithStateGen(stategen.New(beaconDB, fc)),
WithForkChoiceStore(fc),
WithStateNotifier(&blockchainTesting.MockStateNotifier{}),
WithBLSToExecPool(pool),
}
service, err := NewService(ctx, opts...)
require.NoError(t, err)
t.Run("pre Capella block", func(t *testing.T) {
body := &ethpb.BeaconBlockBodyBellatrix{}
pbb := &ethpb.BeaconBlockBellatrix{
Body: body,
}
blk, err := blocks.NewBeaconBlock(pbb)
require.NoError(t, err)
require.NoError(t, service.handleBlockBLSToExecChanges(blk))
})
t.Run("Post Capella no changes", func(t *testing.T) {
body := &ethpb.BeaconBlockBodyCapella{}
pbb := &ethpb.BeaconBlockCapella{
Body: body,
}
blk, err := blocks.NewBeaconBlock(pbb)
require.NoError(t, err)
require.NoError(t, service.handleBlockBLSToExecChanges(blk))
})
t.Run("Post Capella some changes", func(t *testing.T) {
idx := types.ValidatorIndex(123)
change := &ethpb.BLSToExecutionChange{
ValidatorIndex: idx,
}
signedChange := &ethpb.SignedBLSToExecutionChange{
Message: change,
}
body := &ethpb.BeaconBlockBodyCapella{
BlsToExecutionChanges: []*ethpb.SignedBLSToExecutionChange{signedChange},
}
pbb := &ethpb.BeaconBlockCapella{
Body: body,
}
blk, err := blocks.NewBeaconBlock(pbb)
require.NoError(t, err)
pool.InsertBLSToExecChange(signedChange)
require.Equal(t, true, pool.ValidatorExists(idx))
require.NoError(t, service.handleBlockBLSToExecChanges(blk))
require.Equal(t, false, pool.ValidatorExists(idx))
})
}

View File

@ -18,7 +18,6 @@ go_library(
"//container/doubly-linked-list:go_default_library",
"//crypto/bls/blst:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@ -17,7 +17,7 @@ func (m *PoolMock) PendingBLSToExecChanges() ([]*eth.SignedBLSToExecutionChange,
}
// BLSToExecChangesForInclusion --
func (m *PoolMock) BLSToExecChangesForInclusion(_ state.BeaconState) ([]*eth.SignedBLSToExecutionChange, error) {
func (m *PoolMock) BLSToExecChangesForInclusion(_ state.ReadOnlyBeaconState) ([]*eth.SignedBLSToExecutionChange, error) {
return m.Changes, nil
}
@ -27,7 +27,7 @@ func (m *PoolMock) InsertBLSToExecChange(change *eth.SignedBLSToExecutionChange)
}
// MarkIncluded --
func (*PoolMock) MarkIncluded(_ *eth.SignedBLSToExecutionChange) error {
func (*PoolMock) MarkIncluded(_ *eth.SignedBLSToExecutionChange) {
panic("implement me")
}

View File

@ -4,7 +4,6 @@ import (
"math"
"sync"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v3/config/params"
@ -19,9 +18,9 @@ import (
// This pool is used by proposers to insert BLS-to-execution-change objects into new blocks.
type PoolManager interface {
PendingBLSToExecChanges() ([]*ethpb.SignedBLSToExecutionChange, error)
BLSToExecChangesForInclusion(state.BeaconState) ([]*ethpb.SignedBLSToExecutionChange, error)
BLSToExecChangesForInclusion(beaconState state.ReadOnlyBeaconState) ([]*ethpb.SignedBLSToExecutionChange, error)
InsertBLSToExecChange(change *ethpb.SignedBLSToExecutionChange)
MarkIncluded(change *ethpb.SignedBLSToExecutionChange) error
MarkIncluded(change *ethpb.SignedBLSToExecutionChange)
ValidatorExists(idx types.ValidatorIndex) bool
}
@ -61,9 +60,9 @@ func (p *Pool) PendingBLSToExecChanges() ([]*ethpb.SignedBLSToExecutionChange, e
return result, nil
}
// BLSToExecChangesForInclusion returns objects that are ready for inclusion at the given slot.
// BLSToExecChangesForInclusion returns objects that are ready for inclusion.
// This method will not return more than the block enforced MaxBlsToExecutionChanges.
func (p *Pool) BLSToExecChangesForInclusion(st state.BeaconState) ([]*ethpb.SignedBLSToExecutionChange, error) {
func (p *Pool) BLSToExecChangesForInclusion(st state.ReadOnlyBeaconState) ([]*ethpb.SignedBLSToExecutionChange, error) {
p.lock.RLock()
length := int(math.Min(float64(params.BeaconConfig().MaxBlsToExecutionChanges), float64(p.pending.Len())))
result := make([]*ethpb.SignedBLSToExecutionChange, 0, length)
@ -79,9 +78,7 @@ func (p *Pool) BLSToExecChangesForInclusion(st state.BeaconState) ([]*ethpb.Sign
logrus.WithError(err).Warning("removing invalid BLSToExecutionChange from pool")
// MarkIncluded removes the invalid change from the pool
p.lock.RUnlock()
if err := p.MarkIncluded(change); err != nil {
return nil, errors.Wrap(err, "could not mark BLSToExecutionChange as included")
}
p.MarkIncluded(change)
p.lock.RLock()
} else {
result = append(result, change)
@ -118,9 +115,7 @@ func (p *Pool) BLSToExecChangesForInclusion(st state.BeaconState) ([]*ethpb.Sign
}
if !signature.Verify(cSet.PublicKeys[i], cSet.Messages[i][:]) {
logrus.Warning("removing BLSToExecutionChange with invalid signature from pool")
if err := p.MarkIncluded(result[i]); err != nil {
return nil, errors.Wrap(err, "could not mark BLSToExecutionChange as included")
}
p.MarkIncluded(result[i])
} else {
verified = append(verified, result[i])
}
@ -143,19 +138,18 @@ func (p *Pool) InsertBLSToExecChange(change *ethpb.SignedBLSToExecutionChange) {
}
// MarkIncluded is used when an object has been included in a beacon block. Every block seen by this
// listNode should call this method to include the object. This will remove the object from the pool.
func (p *Pool) MarkIncluded(change *ethpb.SignedBLSToExecutionChange) error {
// node should call this method to include the object. This will remove the object from the pool.
func (p *Pool) MarkIncluded(change *ethpb.SignedBLSToExecutionChange) {
p.lock.Lock()
defer p.lock.Unlock()
node := p.m[change.Message.ValidatorIndex]
if node == nil {
return nil
return
}
delete(p.m, change.Message.ValidatorIndex)
p.pending.Remove(node)
return nil
}
// ValidatorExists checks if the bls to execution change object exists

View File

@ -237,7 +237,7 @@ func TestMarkIncluded(t *testing.T) {
ValidatorIndex: types.ValidatorIndex(0),
}}
pool.InsertBLSToExecChange(change)
require.NoError(t, pool.MarkIncluded(change))
pool.MarkIncluded(change)
assert.Equal(t, 0, pool.pending.Len())
_, ok := pool.m[0]
assert.Equal(t, false, ok)
@ -259,7 +259,7 @@ func TestMarkIncluded(t *testing.T) {
pool.InsertBLSToExecChange(first)
pool.InsertBLSToExecChange(second)
pool.InsertBLSToExecChange(third)
require.NoError(t, pool.MarkIncluded(first))
pool.MarkIncluded(first)
require.Equal(t, 2, pool.pending.Len())
_, ok := pool.m[0]
assert.Equal(t, false, ok)
@ -281,7 +281,7 @@ func TestMarkIncluded(t *testing.T) {
pool.InsertBLSToExecChange(first)
pool.InsertBLSToExecChange(second)
pool.InsertBLSToExecChange(third)
require.NoError(t, pool.MarkIncluded(third))
pool.MarkIncluded(third)
require.Equal(t, 2, pool.pending.Len())
_, ok := pool.m[2]
assert.Equal(t, false, ok)
@ -303,7 +303,7 @@ func TestMarkIncluded(t *testing.T) {
pool.InsertBLSToExecChange(first)
pool.InsertBLSToExecChange(second)
pool.InsertBLSToExecChange(third)
require.NoError(t, pool.MarkIncluded(second))
pool.MarkIncluded(second)
require.Equal(t, 2, pool.pending.Len())
_, ok := pool.m[1]
assert.Equal(t, false, ok)
@ -324,7 +324,7 @@ func TestMarkIncluded(t *testing.T) {
}}
pool.InsertBLSToExecChange(first)
pool.InsertBLSToExecChange(second)
require.NoError(t, pool.MarkIncluded(change))
pool.MarkIncluded(change)
require.Equal(t, 2, pool.pending.Len())
_, ok := pool.m[0]
require.Equal(t, true, ok)
@ -378,7 +378,7 @@ func TestValidatorExists(t *testing.T) {
ValidatorIndex: types.ValidatorIndex(0),
}}
pool.InsertBLSToExecChange(change)
require.NoError(t, pool.MarkIncluded(change))
pool.MarkIncluded(change)
assert.Equal(t, false, pool.ValidatorExists(0))
})
t.Run("multiple validators added to pool and removed", func(t *testing.T) {
@ -399,8 +399,8 @@ func TestValidatorExists(t *testing.T) {
}}
pool.InsertBLSToExecChange(thirdChange)
assert.NoError(t, pool.MarkIncluded(firstChange))
assert.NoError(t, pool.MarkIncluded(thirdChange))
pool.MarkIncluded(firstChange)
pool.MarkIncluded(thirdChange)
assert.Equal(t, false, pool.ValidatorExists(0))
assert.Equal(t, true, pool.ValidatorExists(10))