From a9da23f4449ba0307303416cb0baba66647713c0 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Tue, 16 Apr 2019 11:19:31 -0700 Subject: [PATCH] Proposer Server Filters Attestation by Canonical (#2271) * filter attestations by canonical * fix test * added mutex * fixed rpc tests * forgot to init map, all existing tests passing * added test for filter non-canonical atts * lint --- beacon-chain/blockchain/block_processing.go | 2 + beacon-chain/blockchain/fork_choice.go | 11 +++ beacon-chain/blockchain/service.go | 24 +++++++ beacon-chain/rpc/proposer_server.go | 7 +- beacon-chain/rpc/proposer_server_test.go | 78 ++++++++++++++++++++- beacon-chain/rpc/service_test.go | 10 +++ beacon-chain/sync/regular_sync_test.go | 17 +++-- 7 files changed, 142 insertions(+), 7 deletions(-) diff --git a/beacon-chain/blockchain/block_processing.go b/beacon-chain/blockchain/block_processing.go index 9ac0c563c..41efecfb2 100644 --- a/beacon-chain/blockchain/block_processing.go +++ b/beacon-chain/blockchain/block_processing.go @@ -23,6 +23,8 @@ import ( type BlockReceiver interface { CanonicalBlockFeed() *event.Feed ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error) + IsCanonical(slot uint64, hash []byte) bool + InsertsCanonical(slot uint64, hash []byte) } // BlockProcessor defines a common interface for methods useful for directly applying state transitions diff --git a/beacon-chain/blockchain/fork_choice.go b/beacon-chain/blockchain/fork_choice.go index 63dae4a32..c843f8357 100644 --- a/beacon-chain/blockchain/fork_choice.go +++ b/beacon-chain/blockchain/fork_choice.go @@ -142,6 +142,14 @@ func (c *ChainService) ApplyForkChoiceRule( if err != nil { return fmt.Errorf("could not run fork choice: %v", err) } + headRoot, err := hashutil.HashBeaconBlock(head) + if err != nil { + return fmt.Errorf("could not hash head block: %v", err) + } + c.canonicalBlocksLock.Lock() + defer c.canonicalBlocksLock.Unlock() + c.canonicalBlocks[head.Slot] = headRoot[:] + newState := postState if head.Slot != block.Slot { log.Warnf("Reorg happened, last processed block at slot %d, new head block at slot %d", @@ -158,6 +166,9 @@ func (c *ChainService) ApplyForkChoiceRule( postState.Slot-params.BeaconConfig().GenesisSlot, newState.Slot-params.BeaconConfig().GenesisSlot) } + for revertedSlot := block.Slot; revertedSlot > head.Slot; revertedSlot-- { + delete(c.canonicalBlocks, revertedSlot) + } reorgCount.Inc() } diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index d65c312f7..4cab666eb 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -4,8 +4,10 @@ package blockchain import ( + "bytes" "context" "fmt" + "sync" "time" "github.com/prysmaticlabs/prysm/beacon-chain/attestation" @@ -44,6 +46,8 @@ type ChainService struct { finalizedEpoch uint64 stateInitializedFeed *event.Feed p2p p2p.Broadcaster + canonicalBlocks map[uint64][]byte + canonicalBlocksLock sync.RWMutex } // Config options for the service. @@ -72,6 +76,7 @@ func NewChainService(ctx context.Context, cfg *Config) (*ChainService, error) { chainStartChan: make(chan time.Time), stateInitializedFeed: new(event.Feed), p2p: cfg.P2p, + canonicalBlocks: make(map[uint64][]byte), }, nil } @@ -206,3 +211,22 @@ func (c *ChainService) ChainHeadRoot() ([32]byte, error) { } return root, nil } + +// IsCanonical returns true if the input block hash of the corresponding slot +// is part of the canonical chain. False otherwise. +func (c *ChainService) IsCanonical(slot uint64, hash []byte) bool { + c.canonicalBlocksLock.RLock() + defer c.canonicalBlocksLock.RUnlock() + if canonicalHash, ok := c.canonicalBlocks[slot]; ok { + return bytes.Equal(canonicalHash, hash) + } + return false +} + +// InsertsCanonical inserts a canonical block hash to its corresponding slot. +// This is used for testing purpose. +func (c *ChainService) InsertsCanonical(slot uint64, hash []byte) { + c.canonicalBlocksLock.Lock() + defer c.canonicalBlocksLock.Unlock() + c.canonicalBlocks[slot] = hash +} diff --git a/beacon-chain/rpc/proposer_server.go b/beacon-chain/rpc/proposer_server.go index 81a77022b..a2d954f4c 100644 --- a/beacon-chain/rpc/proposer_server.go +++ b/beacon-chain/rpc/proposer_server.go @@ -112,7 +112,8 @@ func (ps *ProposerServer) PendingAttestations(ctx context.Context, req *pb.Pendi // Remove any attestation from the list if their slot is before the start of // the previous epoch or does not match the current state previous justified - // epoch. This should be handled in the operationService cleanup but we + // epoch or attestation is not voting on the canonical chain. + // This should be handled in the operationService cleanup but we // should filter here in case it wasn't yet processed. boundary := currentSlot - params.BeaconConfig().SlotsPerEpoch attsWithinBoundary := make([]*pbp2p.Attestation, 0, len(atts)) @@ -125,7 +126,9 @@ func (ps *ProposerServer) PendingAttestations(ctx context.Context, req *pb.Pendi expectedJustifedEpoch = beaconState.PreviousJustifiedEpoch } - if att.Data.Slot > boundary && att.Data.JustifiedEpoch == expectedJustifedEpoch { + if att.Data.Slot > boundary && + att.Data.JustifiedEpoch == expectedJustifedEpoch && + ps.chainService.IsCanonical(att.Data.Slot, att.Data.BeaconBlockRootHash32) { attsWithinBoundary = append(attsWithinBoundary, att) } } diff --git a/beacon-chain/rpc/proposer_server_test.go b/beacon-chain/rpc/proposer_server_test.go index d88250066..2ac3839b9 100644 --- a/beacon-chain/rpc/proposer_server_test.go +++ b/beacon-chain/rpc/proposer_server_test.go @@ -151,7 +151,8 @@ func TestPendingAttestations_FiltersWithinInclusionDelay(t *testing.T) { }}, }, }, - beaconDB: db, + chainService: &mockChainService{}, + beaconDB: db, } if err := db.SaveState(ctx, beaconState); err != nil { t.Fatal(err) @@ -212,6 +213,7 @@ func TestPendingAttestations_FiltersExpiredAttestations(t *testing.T) { expectedNumberOfAttestations := 3 proposerServer := &ProposerServer{ operationService: opService, + chainService: &mockChainService{}, beaconDB: db, } beaconState := &pbp2p.BeaconState{ @@ -260,6 +262,7 @@ func TestPendingAttestations_OK(t *testing.T) { proposerServer := &ProposerServer{ operationService: &mockOperationService{}, + chainService: &mockChainService{}, beaconDB: db, } beaconState := &pbp2p.BeaconState{ @@ -289,3 +292,76 @@ func TestPendingAttestations_OK(t *testing.T) { t.Error("Expected pending attestations list to be non-empty") } } + +func TestPendingAttestations_FiltersCanonicalAttestations(t *testing.T) { + db := internal.SetupDB(t) + defer internal.TeardownDB(t, db) + ctx := context.Background() + + // Edge case: current slot is at the end of an epoch. The pending attestation + // for the next slot should come from currentSlot + 1. + currentSlot := helpers.StartSlot( + params.BeaconConfig().GenesisEpoch+10, + ) - 1 + + expectedEpoch := uint64(100) + + opService := &mockOperationService{ + pendingAttestations: []*pbp2p.Attestation{ + // Canonical attestations + {Data: &pbp2p.AttestationData{Slot: currentSlot - 5, JustifiedEpoch: expectedEpoch, BeaconBlockRootHash32: []byte{'A'}}}, + {Data: &pbp2p.AttestationData{Slot: currentSlot - 2, JustifiedEpoch: expectedEpoch, BeaconBlockRootHash32: []byte{'B'}}}, + // Non canonical attestations + {Data: &pbp2p.AttestationData{Slot: currentSlot, JustifiedEpoch: expectedEpoch, BeaconBlockRootHash32: []byte{'C'}}}, + {Data: &pbp2p.AttestationData{Slot: currentSlot, JustifiedEpoch: expectedEpoch, BeaconBlockRootHash32: []byte{'D'}}}, + // Canonical attestation + {Data: &pbp2p.AttestationData{Slot: currentSlot, JustifiedEpoch: expectedEpoch, BeaconBlockRootHash32: []byte{'E'}}}, + }, + } + expectedNumberOfAttestations := 3 + proposerServer := &ProposerServer{ + operationService: opService, + chainService: &mockChainService{canonicalBlocks: make(map[uint64][]byte)}, + beaconDB: db, + } + beaconState := &pbp2p.BeaconState{ + Slot: currentSlot, + JustifiedEpoch: expectedEpoch, + PreviousJustifiedEpoch: expectedEpoch, + } + if err := db.SaveState(ctx, beaconState); err != nil { + t.Fatal(err) + } + + blk := &pbp2p.BeaconBlock{ + Slot: beaconState.Slot, + } + + if err := db.SaveBlock(blk); err != nil { + t.Fatalf("failed to save block %v", err) + } + + if err := db.UpdateChainHead(ctx, blk, beaconState); err != nil { + t.Fatalf("couldnt update chainhead: %v", err) + } + for _, atts := range opService.pendingAttestations { + proposerServer.chainService.InsertsCanonical(atts.Data.Slot, atts.Data.BeaconBlockRootHash32) + } + + res, err := proposerServer.PendingAttestations( + context.Background(), + &pb.PendingAttestationsRequest{ + ProposalBlockSlot: currentSlot, + }, + ) + if err != nil { + t.Fatalf("Unexpected error fetching pending attestations: %v", err) + } + if len(res.PendingAttestations) != expectedNumberOfAttestations { + t.Errorf( + "Expected pending attestations list length %d, but was %d", + expectedNumberOfAttestations, + len(res.PendingAttestations), + ) + } +} diff --git a/beacon-chain/rpc/service_test.go b/beacon-chain/rpc/service_test.go index b19bbacd2..a59a5d26d 100644 --- a/beacon-chain/rpc/service_test.go +++ b/beacon-chain/rpc/service_test.go @@ -1,6 +1,7 @@ package rpc import ( + "bytes" "context" "errors" "fmt" @@ -77,6 +78,7 @@ type mockChainService struct { stateFeed *event.Feed attestationFeed *event.Feed stateInitializedFeed *event.Feed + canonicalBlocks map[uint64][]byte } func (m *mockChainService) StateInitializedFeed() *event.Feed { @@ -99,6 +101,14 @@ func (m mockChainService) SaveHistoricalState(beaconState *pb.BeaconState) error return nil } +func (m mockChainService) IsCanonical(slot uint64, hash []byte) bool { + return bytes.Equal(m.canonicalBlocks[slot], hash) +} + +func (m mockChainService) InsertsCanonical(slot uint64, hash []byte) { + m.canonicalBlocks[slot] = hash +} + func newMockChainService() *mockChainService { return &mockChainService{ blockFeed: new(event.Feed), diff --git a/beacon-chain/sync/regular_sync_test.go b/beacon-chain/sync/regular_sync_test.go index 238f7029c..ed8e57fa8 100644 --- a/beacon-chain/sync/regular_sync_test.go +++ b/beacon-chain/sync/regular_sync_test.go @@ -45,10 +45,11 @@ func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) } type mockChainService struct { - bFeed *event.Feed - sFeed *event.Feed - cFeed *event.Feed - db *db.BeaconDB + bFeed *event.Feed + sFeed *event.Feed + cFeed *event.Feed + db *db.BeaconDB + canonicalBlocks map[uint64][]byte } func (ms *mockChainService) StateInitializedFeed() *event.Feed { @@ -88,6 +89,14 @@ func (ms *mockChainService) CleanupBlockOperations(ctx context.Context, block *p return nil } +func (ms *mockChainService) IsCanonical(slot uint64, hash []byte) bool { + return true +} + +func (ms mockChainService) InsertsCanonical(slot uint64, hash []byte) { + ms.canonicalBlocks[slot] = hash +} + type mockOperationService struct{} func (ms *mockOperationService) IncomingProcessedBlockFeed() *event.Feed {