From 624c42421d903054b65d9b2b0a81ef819232441c Mon Sep 17 00:00:00 2001 From: terence tsao Date: Wed, 8 Jan 2020 06:59:49 -0800 Subject: [PATCH] Add HasAggregatedAttestation getter for pool (#4451) --- .../operations/attestations/kv/aggregated.go | 22 +++++++ .../attestations/kv/aggregated_test.go | 42 ++++++++++++ beacon-chain/operations/attestations/pool.go | 1 + beacon-chain/sync/validate_aggregate_proof.go | 9 ++- .../sync/validate_aggregate_proof_test.go | 64 +++++++++++++++++++ 5 files changed, 137 insertions(+), 1 deletion(-) diff --git a/beacon-chain/operations/attestations/kv/aggregated.go b/beacon-chain/operations/attestations/kv/aggregated.go index d4cc712ca..47c5d3211 100644 --- a/beacon-chain/operations/attestations/kv/aggregated.go +++ b/beacon-chain/operations/attestations/kv/aggregated.go @@ -86,3 +86,25 @@ func (p *AttCaches) DeleteAggregatedAttestation(att *ethpb.Attestation) error { return nil } + +// HasAggregatedAttestation checks if the input attestations has already existed in cache. +func (p *AttCaches) HasAggregatedAttestation(att *ethpb.Attestation) (bool, error) { + r, err := ssz.HashTreeRoot(att) + if err != nil { + return false, errors.Wrap(err, "could not tree hash attestation") + } + + for k := range p.aggregatedAtt.Items() { + if k == string(r[:]) { + return true, nil + } + } + + for k := range p.blockAtt.Items() { + if k == string(r[:]) { + return true, nil + } + } + + return false, nil +} diff --git a/beacon-chain/operations/attestations/kv/aggregated_test.go b/beacon-chain/operations/attestations/kv/aggregated_test.go index bfa9fb91b..ad57cd2d3 100644 --- a/beacon-chain/operations/attestations/kv/aggregated_test.go +++ b/beacon-chain/operations/attestations/kv/aggregated_test.go @@ -106,3 +106,45 @@ func TestKV_Aggregated_CheckExpTime(t *testing.T) { math.RoundToEven(exp.Sub(time.Now()).Seconds())) } } + +func TestKV_HasAggregatedAttestation(t *testing.T) { + cache := NewAttCaches() + + att := ðpb.Attestation{AggregationBits: bitfield.Bitlist{0b111}} + has, err := cache.HasAggregatedAttestation(att) + if err != nil { + t.Fatal(err) + } + if has { + t.Error("should not have unsaved att in cache") + } + if err := cache.SaveAggregatedAttestation(att); err != nil { + t.Fatal(err) + } + has, err = cache.HasAggregatedAttestation(att) + if err != nil { + t.Fatal(err) + } + if !has { + t.Error("should have saved att in cache") + } + + att = ðpb.Attestation{AggregationBits: bitfield.Bitlist{0b1111}} + has, err = cache.HasAggregatedAttestation(att) + if err != nil { + t.Fatal(err) + } + if has { + t.Error("should not have unsaved att in cache") + } + if err := cache.SaveBlockAttestation(att); err != nil { + t.Fatal(err) + } + has, err = cache.HasAggregatedAttestation(att) + if err != nil { + t.Fatal(err) + } + if !has { + t.Error("should have saved att in cache") + } +} diff --git a/beacon-chain/operations/attestations/pool.go b/beacon-chain/operations/attestations/pool.go index 756dfe812..1fb69a1db 100644 --- a/beacon-chain/operations/attestations/pool.go +++ b/beacon-chain/operations/attestations/pool.go @@ -16,6 +16,7 @@ type Pool interface { AggregatedAttestations() []*ethpb.Attestation AggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation DeleteAggregatedAttestation(att *ethpb.Attestation) error + HasAggregatedAttestation(att *ethpb.Attestation) (bool, error) // For unaggregated attestations. SaveUnaggregatedAttestation(att *ethpb.Attestation) error SaveUnaggregatedAttestations(atts []*ethpb.Attestation) error diff --git a/beacon-chain/sync/validate_aggregate_proof.go b/beacon-chain/sync/validate_aggregate_proof.go index 25a3337cd..cb7580bdb 100644 --- a/beacon-chain/sync/validate_aggregate_proof.go +++ b/beacon-chain/sync/validate_aggregate_proof.go @@ -51,7 +51,14 @@ func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms attSlot := m.Aggregate.Data.Slot // Verify aggregate attestation has not already been seen via aggregate gossip, within a block, or through the creation locally. - // TODO(3835): Blocked by operation pool redesign + seen, err := r.attPool.HasAggregatedAttestation(m.Aggregate) + if err != nil { + traceutil.AnnotateError(span, err) + return false + } + if seen { + return false + } // Verify the block being voted for passes validation. The block should have passed validation if it's in the DB. if !r.db.HasBlock(ctx, bytesutil.ToBytes32(m.Aggregate.Data.BeaconBlockRoot)) { diff --git a/beacon-chain/sync/validate_aggregate_proof_test.go b/beacon-chain/sync/validate_aggregate_proof_test.go index c07912c7e..23baa542d 100644 --- a/beacon-chain/sync/validate_aggregate_proof_test.go +++ b/beacon-chain/sync/validate_aggregate_proof_test.go @@ -16,6 +16,7 @@ import ( mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing" @@ -127,6 +128,7 @@ func TestValidateAggregateAndProof_NoBlock(t *testing.T) { p2p: p, db: db, initialSync: &mockSync.Sync{IsSyncing: false}, + attPool: attestations.NewPool(), } buf := new(bytes.Buffer) @@ -183,6 +185,7 @@ func TestValidateAggregateAndProof_NotWithinSlotRange(t *testing.T) { initialSync: &mockSync.Sync{IsSyncing: false}, chain: &mock.ChainService{Genesis: time.Now(), State: beaconState}, + attPool: attestations.NewPool(), } buf := new(bytes.Buffer) @@ -223,6 +226,66 @@ func TestValidateAggregateAndProof_NotWithinSlotRange(t *testing.T) { } } +func TestValidateAggregateAndProof_ExistedInPool(t *testing.T) { + db := dbtest.SetupDB(t) + defer dbtest.TeardownDB(t, db) + p := p2ptest.NewTestP2P(t) + + validators := uint64(256) + beaconState, _ := testutil.DeterministicGenesisState(t, validators) + + b := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}} + db.SaveBlock(context.Background(), b) + root, _ := ssz.HashTreeRoot(b.Block) + + aggBits := bitfield.NewBitlist(3) + aggBits.SetBitAt(0, true) + att := ðpb.Attestation{ + Data: ðpb.AttestationData{ + Slot: 1, + BeaconBlockRoot: root[:], + Source: ðpb.Checkpoint{Epoch: 0, Root: []byte("hello-world")}, + Target: ðpb.Checkpoint{Epoch: 0, Root: []byte("hello-world")}, + }, + AggregationBits: aggBits, + } + + aggregateAndProof := ðpb.AggregateAttestationAndProof{ + Aggregate: att, + } + + beaconState.GenesisTime = uint64(time.Now().Unix()) + r := &Service{ + attPool: attestations.NewPool(), + p2p: p, + db: db, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: &mock.ChainService{Genesis: time.Now(), + State: beaconState}, + } + + buf := new(bytes.Buffer) + if _, err := p.Encoding().Encode(buf, aggregateAndProof); err != nil { + t.Fatal(err) + } + + msg := &pubsub.Message{ + Message: &pubsubpb.Message{ + Data: buf.Bytes(), + TopicIDs: []string{ + p2p.GossipTypeMapping[reflect.TypeOf(aggregateAndProof)], + }, + }, + } + + if err := r.attPool.SaveBlockAttestation(att); err != nil { + t.Fatal(err) + } + if r.validateAggregateAndProof(context.Background(), "", msg) { + t.Error("Expected validate to fail") + } +} + func TestValidateAggregateAndProof_CanValidate(t *testing.T) { db := dbtest.SetupDB(t) defer dbtest.TeardownDB(t, db) @@ -288,6 +351,7 @@ func TestValidateAggregateAndProof_CanValidate(t *testing.T) { FinalizedCheckPoint: ðpb.Checkpoint{ Epoch: 0, }}, + attPool: attestations.NewPool(), } buf := new(bytes.Buffer)