Proposer Server Filters Attestation by Canonical (#2271)

* filter attestations by canonical

* fix test

* added mutex

* fixed rpc tests

* forgot to init map, all existing tests passing

* added test for filter non-canonical atts

* lint
This commit is contained in:
terence tsao 2019-04-16 11:19:31 -07:00 committed by Raul Jordan
parent 3e525ebe8b
commit a9da23f444
7 changed files with 142 additions and 7 deletions

View File

@ -23,6 +23,8 @@ import (
type BlockReceiver interface {
CanonicalBlockFeed() *event.Feed
ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error)
IsCanonical(slot uint64, hash []byte) bool
InsertsCanonical(slot uint64, hash []byte)
}
// BlockProcessor defines a common interface for methods useful for directly applying state transitions

View File

@ -142,6 +142,14 @@ func (c *ChainService) ApplyForkChoiceRule(
if err != nil {
return fmt.Errorf("could not run fork choice: %v", err)
}
headRoot, err := hashutil.HashBeaconBlock(head)
if err != nil {
return fmt.Errorf("could not hash head block: %v", err)
}
c.canonicalBlocksLock.Lock()
defer c.canonicalBlocksLock.Unlock()
c.canonicalBlocks[head.Slot] = headRoot[:]
newState := postState
if head.Slot != block.Slot {
log.Warnf("Reorg happened, last processed block at slot %d, new head block at slot %d",
@ -158,6 +166,9 @@ func (c *ChainService) ApplyForkChoiceRule(
postState.Slot-params.BeaconConfig().GenesisSlot, newState.Slot-params.BeaconConfig().GenesisSlot)
}
for revertedSlot := block.Slot; revertedSlot > head.Slot; revertedSlot-- {
delete(c.canonicalBlocks, revertedSlot)
}
reorgCount.Inc()
}

View File

@ -4,8 +4,10 @@
package blockchain
import (
"bytes"
"context"
"fmt"
"sync"
"time"
"github.com/prysmaticlabs/prysm/beacon-chain/attestation"
@ -44,6 +46,8 @@ type ChainService struct {
finalizedEpoch uint64
stateInitializedFeed *event.Feed
p2p p2p.Broadcaster
canonicalBlocks map[uint64][]byte
canonicalBlocksLock sync.RWMutex
}
// Config options for the service.
@ -72,6 +76,7 @@ func NewChainService(ctx context.Context, cfg *Config) (*ChainService, error) {
chainStartChan: make(chan time.Time),
stateInitializedFeed: new(event.Feed),
p2p: cfg.P2p,
canonicalBlocks: make(map[uint64][]byte),
}, nil
}
@ -206,3 +211,22 @@ func (c *ChainService) ChainHeadRoot() ([32]byte, error) {
}
return root, nil
}
// IsCanonical returns true if the input block hash of the corresponding slot
// is part of the canonical chain. False otherwise.
func (c *ChainService) IsCanonical(slot uint64, hash []byte) bool {
c.canonicalBlocksLock.RLock()
defer c.canonicalBlocksLock.RUnlock()
if canonicalHash, ok := c.canonicalBlocks[slot]; ok {
return bytes.Equal(canonicalHash, hash)
}
return false
}
// InsertsCanonical inserts a canonical block hash to its corresponding slot.
// This is used for testing purpose.
func (c *ChainService) InsertsCanonical(slot uint64, hash []byte) {
c.canonicalBlocksLock.Lock()
defer c.canonicalBlocksLock.Unlock()
c.canonicalBlocks[slot] = hash
}

View File

@ -112,7 +112,8 @@ func (ps *ProposerServer) PendingAttestations(ctx context.Context, req *pb.Pendi
// Remove any attestation from the list if their slot is before the start of
// the previous epoch or does not match the current state previous justified
// epoch. This should be handled in the operationService cleanup but we
// epoch or attestation is not voting on the canonical chain.
// This should be handled in the operationService cleanup but we
// should filter here in case it wasn't yet processed.
boundary := currentSlot - params.BeaconConfig().SlotsPerEpoch
attsWithinBoundary := make([]*pbp2p.Attestation, 0, len(atts))
@ -125,7 +126,9 @@ func (ps *ProposerServer) PendingAttestations(ctx context.Context, req *pb.Pendi
expectedJustifedEpoch = beaconState.PreviousJustifiedEpoch
}
if att.Data.Slot > boundary && att.Data.JustifiedEpoch == expectedJustifedEpoch {
if att.Data.Slot > boundary &&
att.Data.JustifiedEpoch == expectedJustifedEpoch &&
ps.chainService.IsCanonical(att.Data.Slot, att.Data.BeaconBlockRootHash32) {
attsWithinBoundary = append(attsWithinBoundary, att)
}
}

View File

@ -151,7 +151,8 @@ func TestPendingAttestations_FiltersWithinInclusionDelay(t *testing.T) {
}},
},
},
beaconDB: db,
chainService: &mockChainService{},
beaconDB: db,
}
if err := db.SaveState(ctx, beaconState); err != nil {
t.Fatal(err)
@ -212,6 +213,7 @@ func TestPendingAttestations_FiltersExpiredAttestations(t *testing.T) {
expectedNumberOfAttestations := 3
proposerServer := &ProposerServer{
operationService: opService,
chainService: &mockChainService{},
beaconDB: db,
}
beaconState := &pbp2p.BeaconState{
@ -260,6 +262,7 @@ func TestPendingAttestations_OK(t *testing.T) {
proposerServer := &ProposerServer{
operationService: &mockOperationService{},
chainService: &mockChainService{},
beaconDB: db,
}
beaconState := &pbp2p.BeaconState{
@ -289,3 +292,76 @@ func TestPendingAttestations_OK(t *testing.T) {
t.Error("Expected pending attestations list to be non-empty")
}
}
func TestPendingAttestations_FiltersCanonicalAttestations(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
ctx := context.Background()
// Edge case: current slot is at the end of an epoch. The pending attestation
// for the next slot should come from currentSlot + 1.
currentSlot := helpers.StartSlot(
params.BeaconConfig().GenesisEpoch+10,
) - 1
expectedEpoch := uint64(100)
opService := &mockOperationService{
pendingAttestations: []*pbp2p.Attestation{
// Canonical attestations
{Data: &pbp2p.AttestationData{Slot: currentSlot - 5, JustifiedEpoch: expectedEpoch, BeaconBlockRootHash32: []byte{'A'}}},
{Data: &pbp2p.AttestationData{Slot: currentSlot - 2, JustifiedEpoch: expectedEpoch, BeaconBlockRootHash32: []byte{'B'}}},
// Non canonical attestations
{Data: &pbp2p.AttestationData{Slot: currentSlot, JustifiedEpoch: expectedEpoch, BeaconBlockRootHash32: []byte{'C'}}},
{Data: &pbp2p.AttestationData{Slot: currentSlot, JustifiedEpoch: expectedEpoch, BeaconBlockRootHash32: []byte{'D'}}},
// Canonical attestation
{Data: &pbp2p.AttestationData{Slot: currentSlot, JustifiedEpoch: expectedEpoch, BeaconBlockRootHash32: []byte{'E'}}},
},
}
expectedNumberOfAttestations := 3
proposerServer := &ProposerServer{
operationService: opService,
chainService: &mockChainService{canonicalBlocks: make(map[uint64][]byte)},
beaconDB: db,
}
beaconState := &pbp2p.BeaconState{
Slot: currentSlot,
JustifiedEpoch: expectedEpoch,
PreviousJustifiedEpoch: expectedEpoch,
}
if err := db.SaveState(ctx, beaconState); err != nil {
t.Fatal(err)
}
blk := &pbp2p.BeaconBlock{
Slot: beaconState.Slot,
}
if err := db.SaveBlock(blk); err != nil {
t.Fatalf("failed to save block %v", err)
}
if err := db.UpdateChainHead(ctx, blk, beaconState); err != nil {
t.Fatalf("couldnt update chainhead: %v", err)
}
for _, atts := range opService.pendingAttestations {
proposerServer.chainService.InsertsCanonical(atts.Data.Slot, atts.Data.BeaconBlockRootHash32)
}
res, err := proposerServer.PendingAttestations(
context.Background(),
&pb.PendingAttestationsRequest{
ProposalBlockSlot: currentSlot,
},
)
if err != nil {
t.Fatalf("Unexpected error fetching pending attestations: %v", err)
}
if len(res.PendingAttestations) != expectedNumberOfAttestations {
t.Errorf(
"Expected pending attestations list length %d, but was %d",
expectedNumberOfAttestations,
len(res.PendingAttestations),
)
}
}

View File

@ -1,6 +1,7 @@
package rpc
import (
"bytes"
"context"
"errors"
"fmt"
@ -77,6 +78,7 @@ type mockChainService struct {
stateFeed *event.Feed
attestationFeed *event.Feed
stateInitializedFeed *event.Feed
canonicalBlocks map[uint64][]byte
}
func (m *mockChainService) StateInitializedFeed() *event.Feed {
@ -99,6 +101,14 @@ func (m mockChainService) SaveHistoricalState(beaconState *pb.BeaconState) error
return nil
}
func (m mockChainService) IsCanonical(slot uint64, hash []byte) bool {
return bytes.Equal(m.canonicalBlocks[slot], hash)
}
func (m mockChainService) InsertsCanonical(slot uint64, hash []byte) {
m.canonicalBlocks[slot] = hash
}
func newMockChainService() *mockChainService {
return &mockChainService{
blockFeed: new(event.Feed),

View File

@ -45,10 +45,11 @@ func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID)
}
type mockChainService struct {
bFeed *event.Feed
sFeed *event.Feed
cFeed *event.Feed
db *db.BeaconDB
bFeed *event.Feed
sFeed *event.Feed
cFeed *event.Feed
db *db.BeaconDB
canonicalBlocks map[uint64][]byte
}
func (ms *mockChainService) StateInitializedFeed() *event.Feed {
@ -88,6 +89,14 @@ func (ms *mockChainService) CleanupBlockOperations(ctx context.Context, block *p
return nil
}
func (ms *mockChainService) IsCanonical(slot uint64, hash []byte) bool {
return true
}
func (ms mockChainService) InsertsCanonical(slot uint64, hash []byte) {
ms.canonicalBlocks[slot] = hash
}
type mockOperationService struct{}
func (ms *mockOperationService) IncomingProcessedBlockFeed() *event.Feed {