diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go index 82b80324a..66d705db6 100644 --- a/beacon-chain/sync/pending_attestations_queue.go +++ b/beacon-chain/sync/pending_attestations_queue.go @@ -1,6 +1,7 @@ package sync import ( + "bytes" "context" "encoding/hex" "sync" @@ -157,7 +158,8 @@ func (s *Service) processAttestations(ctx context.Context, attestations []*ethpb // This defines how pending attestations is saved in the map. The key is the // root of the missing block. The value is the list of pending attestations -// that voted for that block root. +// that voted for that block root. The caller of this function is responsible +// for not sending repeated attestations to the pending queue. func (s *Service) savePendingAtt(att *ethpb.SignedAggregateAttestationAndProof) { root := bytesutil.ToBytes32(att.Message.Aggregate.Data.BeaconBlockRoot) @@ -178,17 +180,32 @@ func (s *Service) savePendingAtt(att *ethpb.SignedAggregateAttestationAndProof) s.blkRootToPendingAtts[root] = []*ethpb.SignedAggregateAttestationAndProof{att} return } - - // Skip if the attestation from the same aggregator already exists in the pending queue. + // Skip if the attestation from the same aggregator already exists in + // the pending queue. for _, a := range s.blkRootToPendingAtts[root] { - if a.Message.AggregatorIndex == att.Message.AggregatorIndex { + if attsAreEqual(att, a) { return } } - s.blkRootToPendingAtts[root] = append(s.blkRootToPendingAtts[root], att) } +func attsAreEqual(a, b *ethpb.SignedAggregateAttestationAndProof) bool { + if a.Signature != nil { + return b.Signature != nil && a.Message.AggregatorIndex == b.Message.AggregatorIndex + } + if b.Signature != nil { + return false + } + if a.Message.Aggregate.Data.Slot != b.Message.Aggregate.Data.Slot { + return false + } + if a.Message.Aggregate.Data.CommitteeIndex != b.Message.Aggregate.Data.CommitteeIndex { + return false + } + return bytes.Equal(a.Message.Aggregate.AggregationBits, b.Message.Aggregate.AggregationBits) +} + // This validates the pending attestations in the queue are still valid. // If not valid, a node will remove it in the queue in place. The validity // check specifies the pending attestation could not fall one epoch behind diff --git a/beacon-chain/sync/pending_attestations_queue_test.go b/beacon-chain/sync/pending_attestations_queue_test.go index 88288f310..3f32b8fa8 100644 --- a/beacon-chain/sync/pending_attestations_queue_test.go +++ b/beacon-chain/sync/pending_attestations_queue_test.go @@ -399,7 +399,7 @@ func TestValidatePendingAtts_CanPruneOldAtts(t *testing.T) { assert.Equal(t, 0, len(s.blkRootToPendingAtts), "Did not delete block keys") } -func TestValidatePendingAtts_NoDuplicatingAggregatorIndex(t *testing.T) { +func TestValidatePendingAtts_NoDuplicatingAtts(t *testing.T) { s := &Service{ blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), } @@ -420,7 +420,7 @@ func TestValidatePendingAtts_NoDuplicatingAggregatorIndex(t *testing.T) { Message: ðpb.AggregateAttestationAndProof{ AggregatorIndex: 2, Aggregate: ðpb.Attestation{ - Data: ðpb.AttestationData{Slot: 3, BeaconBlockRoot: r2[:]}}}}) + Data: ðpb.AttestationData{Slot: 2, BeaconBlockRoot: r2[:]}}}}) assert.Equal(t, 1, len(s.blkRootToPendingAtts[r1]), "Did not save pending atts") assert.Equal(t, 1, len(s.blkRootToPendingAtts[r2]), "Did not save pending atts")