Added AggregatedAttestation helper (#3539)

This commit is contained in:
terence tsao 2019-09-21 09:43:18 -07:00 committed by GitHub
parent fb8d6a4046
commit 2b2ef4f37c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 79 additions and 49 deletions

View File

@ -21,7 +21,6 @@ go_library(
"//beacon-chain/db/filters:go_default_library", "//beacon-chain/db/filters:go_default_library",
"//proto/beacon/p2p/v1:go_default_library", "//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library", "//proto/eth/v1alpha1:go_default_library",
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library", "//shared/bytesutil:go_default_library",
"//shared/hashutil:go_default_library", "//shared/hashutil:go_default_library",
"//shared/params:go_default_library", "//shared/params:go_default_library",

View File

@ -14,7 +14,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/state" "github.com/prysmaticlabs/prysm/beacon-chain/core/state"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1" ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/params"
@ -203,23 +202,12 @@ func (s *Store) aggregateAttestation(ctx context.Context, att *ethpb.Attestation
return err return err
} }
incomingAttBits := att.AggregationBits
if a, ok := s.attsQueue[root]; ok { if a, ok := s.attsQueue[root]; ok {
if !a.AggregationBits.Contains(incomingAttBits) { a, err := helpers.AggregateAttestation(a, att)
newBits := a.AggregationBits.Or(incomingAttBits) if err != nil {
incomingSig, err := bls.SignatureFromBytes(att.Signature) return nil
if err != nil {
return err
}
currentSig, err := bls.SignatureFromBytes(a.Signature)
if err != nil {
return err
}
aggregatedSig := bls.AggregateSignatures([]*bls.Signature{currentSig, incomingSig})
a.Signature = aggregatedSig.Marshal()
a.AggregationBits = newBits
s.attsQueue[root] = a
} }
s.attsQueue[root] = a
return nil return nil
} }

View File

@ -252,5 +252,3 @@ func (s *Store) JustifiedCheckpt() *ethpb.Checkpoint {
func (s *Store) FinalizedCheckpt() *ethpb.Checkpoint { func (s *Store) FinalizedCheckpt() *ethpb.Checkpoint {
return proto.Clone(s.finalizedCheckpt).(*ethpb.Checkpoint) return proto.Clone(s.finalizedCheckpt).(*ethpb.Checkpoint)
} }

View File

@ -41,7 +41,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block *ethpb.BeaconBlock) er
} }
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"blockRoot": hex.EncodeToString(root[:]), "blockRoot": hex.EncodeToString(root[:]),
}).Info("Broadcasting block") }).Debug("Broadcasting block")
if err := s.ReceiveBlockNoPubsub(ctx, block); err != nil { if err := s.ReceiveBlockNoPubsub(ctx, block); err != nil {
return err return err

View File

@ -4,6 +4,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1" ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/params"
) )
@ -49,3 +50,32 @@ func AttestationDataSlot(state *pb.BeaconState, data *ethpb.AttestationData) (ui
return StartSlot(data.Target.Epoch) + (offset / (committeeCount / params.BeaconConfig().SlotsPerEpoch)), nil return StartSlot(data.Target.Epoch) + (offset / (committeeCount / params.BeaconConfig().SlotsPerEpoch)), nil
} }
// AggregateAttestation aggregates attestations a1 and a2 together.
func AggregateAttestation(a1 *ethpb.Attestation, a2 *ethpb.Attestation) (*ethpb.Attestation, error) {
baseAtt := a1
newAtt := a2
if a2.AggregationBits.Count() > a1.AggregationBits.Count() {
baseAtt = a2
newAtt = a1
}
if baseAtt.AggregationBits.Contains(newAtt.AggregationBits) {
return baseAtt, nil
}
newBits := baseAtt.AggregationBits.Or(newAtt.AggregationBits)
newSig, err := bls.SignatureFromBytes(newAtt.Signature)
if err != nil {
return nil, err
}
baseSig, err := bls.SignatureFromBytes(baseAtt.Signature)
if err != nil {
return nil, err
}
aggregatedSig := bls.AggregateSignatures([]*bls.Signature{baseSig, newSig})
baseAtt.Signature = aggregatedSig.Marshal()
baseAtt.AggregationBits = newBits
return baseAtt, nil
}

View File

@ -1,8 +1,10 @@
package helpers_test package helpers_test
import ( import (
"reflect"
"testing" "testing"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state" "github.com/prysmaticlabs/prysm/beacon-chain/core/state"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
@ -84,3 +86,30 @@ func TestAttestationDataSlot_ReturnsErrorWhenTargetEpochLessThanCurrentEpoch(t *
t.Logf("attestation slot=%v", s) t.Logf("attestation slot=%v", s)
} }
} }
func TestAggregateAttestation(t *testing.T) {
tests := []struct {
a1 *ethpb.Attestation
a2 *ethpb.Attestation
want *ethpb.Attestation
}{
{a1: &ethpb.Attestation{AggregationBits: []byte{}},
a2: &ethpb.Attestation{AggregationBits: []byte{}},
want: &ethpb.Attestation{AggregationBits: []byte{}}},
{a1: &ethpb.Attestation{AggregationBits: bitfield.Bitlist{0x02}},
a2: &ethpb.Attestation{AggregationBits: bitfield.Bitlist{0x03}},
want: &ethpb.Attestation{AggregationBits: []byte{0x03}}},
{a1: &ethpb.Attestation{AggregationBits: bitfield.Bitlist{0x03}},
a2: &ethpb.Attestation{AggregationBits: bitfield.Bitlist{0x02}},
want: &ethpb.Attestation{AggregationBits: []byte{0x03}}},
}
for _, tt := range tests {
got, err := helpers.AggregateAttestation(tt.a1, tt.a2)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("AggregateAttestation() = %v, want %v", got, tt.want)
}
}
}

View File

@ -7,10 +7,10 @@ go_library(
visibility = ["//beacon-chain:__subpackages__"], visibility = ["//beacon-chain:__subpackages__"],
deps = [ deps = [
"//beacon-chain/core/blocks:go_default_library", "//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/state:go_default_library", "//beacon-chain/core/state:go_default_library",
"//beacon-chain/db:go_default_library", "//beacon-chain/db:go_default_library",
"//proto/eth/v1alpha1:go_default_library", "//proto/eth/v1alpha1:go_default_library",
"//shared/bls:go_default_library",
"//shared/event:go_default_library", "//shared/event:go_default_library",
"//shared/hashutil:go_default_library", "//shared/hashutil:go_default_library",
"//shared/messagehandler:go_default_library", "//shared/messagehandler:go_default_library",

View File

@ -13,10 +13,10 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state" "github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/db"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1" ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/prysmaticlabs/prysm/shared/hashutil"
handler "github.com/prysmaticlabs/prysm/shared/messagehandler" handler "github.com/prysmaticlabs/prysm/shared/messagehandler"
@ -214,31 +214,17 @@ func (s *Service) HandleAttestation(ctx context.Context, message proto.Message)
} }
} }
incomingAttBits := attestation.AggregationBits
if s.beaconDB.HasAttestation(ctx, root) { if s.beaconDB.HasAttestation(ctx, root) {
dbAtt, err := s.beaconDB.Attestation(ctx, root) dbAtt, err := s.beaconDB.Attestation(ctx, root)
if err != nil { if err != nil {
return err return err
} }
attestation, err = helpers.AggregateAttestation(dbAtt, attestation)
if !dbAtt.AggregationBits.Contains(incomingAttBits) { if err != nil {
newAggregationBits := dbAtt.AggregationBits.Or(incomingAttBits) return err
incomingAttSig, err := bls.SignatureFromBytes(attestation.Signature) }
if err != nil { if err := s.beaconDB.SaveAttestation(ctx, attestation); err != nil {
return err return err
}
dbSig, err := bls.SignatureFromBytes(dbAtt.Signature)
if err != nil {
return err
}
aggregatedSig := bls.AggregateSignatures([]*bls.Signature{dbSig, incomingAttSig})
dbAtt.Signature = aggregatedSig.Marshal()
dbAtt.AggregationBits = newAggregationBits
if err := s.beaconDB.SaveAttestation(ctx, dbAtt); err != nil {
return err
}
} else {
return nil
} }
} else { } else {
if err := s.beaconDB.SaveAttestation(ctx, attestation); err != nil { if err := s.beaconDB.SaveAttestation(ctx, attestation); err != nil {

View File

@ -37,12 +37,12 @@ type blockchainService interface {
// NewRegularSync service. // NewRegularSync service.
func NewRegularSync(cfg *Config) *RegularSync { func NewRegularSync(cfg *Config) *RegularSync {
r := &RegularSync{ r := &RegularSync{
ctx: context.Background(), ctx: context.Background(),
db: cfg.DB, db: cfg.DB,
p2p: cfg.P2P, p2p: cfg.P2P,
operations: cfg.Operations, operations: cfg.Operations,
chain: cfg.Chain, chain: cfg.Chain,
initialSync: cfg.InitialSync, initialSync: cfg.InitialSync,
slotToPendingBlocks: make(map[uint64]*ethpb.BeaconBlock), slotToPendingBlocks: make(map[uint64]*ethpb.BeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool), seenPendingBlocks: make(map[[32]byte]bool),
} }
@ -65,7 +65,7 @@ type RegularSync struct {
seenPendingBlocks map[[32]byte]bool seenPendingBlocks map[[32]byte]bool
pendingQueueLock sync.RWMutex pendingQueueLock sync.RWMutex
chainStarted bool chainStarted bool
initialSync Checker initialSync Checker
} }
// Start the regular sync service. // Start the regular sync service.