Graduate Batch Gossip Verification Feature (#10553)

* grad feat

* fix stuck tests
This commit is contained in:
Nishant Das 2022-04-26 20:28:35 +08:00 committed by GitHub
parent f4c004085b
commit 7c3d89b25f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 86 additions and 123 deletions

View File

@ -108,7 +108,9 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) {
require.NoError(t, beaconState.SetGenesisTime(uint64(time.Now().Unix())))
ctx, cancel := context.WithCancel(context.Background())
r := &Service{
ctx: ctx,
cfg: &config{
p2p: p1,
beaconDB: db,
@ -122,7 +124,9 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) {
},
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
seenUnAggregatedAttestationCache: lruwrpr.New(10),
signatureChan: make(chan *signatureVerifier, verifierLimit),
}
go r.verifierRoutine()
s, err := util.NewBeaconState()
require.NoError(t, err)
@ -137,6 +141,7 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) {
assert.DeepEqual(t, att, atts[0], "Incorrect saved att")
assert.Equal(t, 0, len(r.cfg.attPool.AggregatedAttestations()), "Did save aggregated att")
require.LogsContain(t, hook, "Verified and saved pending attestations to pool")
cancel()
}
func TestProcessPendingAtts_NoBroadcastWithBadSignature(t *testing.T) {
@ -220,7 +225,9 @@ func TestProcessPendingAtts_NoBroadcastWithBadSignature(t *testing.T) {
require.NoError(t, err)
require.NoError(t, s.SetGenesisTime(uint64(time.Now().Unix())))
ctx, cancel := context.WithCancel(context.Background())
r = &Service{
ctx: ctx,
cfg: &config{
p2p: p1,
beaconDB: db,
@ -234,12 +241,15 @@ func TestProcessPendingAtts_NoBroadcastWithBadSignature(t *testing.T) {
},
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
seenUnAggregatedAttestationCache: lruwrpr.New(10),
signatureChan: make(chan *signatureVerifier, verifierLimit),
}
go r.verifierRoutine()
r.blkRootToPendingAtts[r32] = []*ethpb.SignedAggregateAttestationAndProof{{Message: aggregateAndProof, Signature: aggreSig}}
require.NoError(t, r.processPendingAtts(context.Background()))
assert.Equal(t, true, p1.BroadcastCalled, "Could not broadcast the good aggregate")
cancel()
}
func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) {
@ -299,7 +309,9 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) {
require.NoError(t, beaconState.SetGenesisTime(uint64(time.Now().Unix())))
ctx, cancel := context.WithCancel(context.Background())
r := &Service{
ctx: ctx,
cfg: &config{
p2p: p1,
beaconDB: db,
@ -313,8 +325,9 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) {
},
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
seenAggregatedAttestationCache: lruwrpr.New(10),
signatureChan: make(chan *signatureVerifier, verifierLimit),
}
go r.verifierRoutine()
s, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, r.cfg.beaconDB.SaveState(context.Background(), s, root))
@ -328,6 +341,7 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 0, len(atts), "Did save aggregated att")
require.LogsContain(t, hook, "Verified and saved pending attestations to pool")
cancel()
}
func TestValidatePendingAtts_CanPruneOldAtts(t *testing.T) {

View File

@ -15,7 +15,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/config/features"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/crypto/bls"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
@ -199,20 +198,7 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed *ethpb.Signe
set := bls.NewSet()
set.Join(selectionSigSet).Join(aggregatorSigSet).Join(attSigSet)
if features.Get().EnableBatchVerification {
return s.validateWithBatchVerifier(ctx, "aggregate", set)
}
valid, err := set.Verify()
if err != nil {
tracing.AnnotateError(span, errors.Errorf("Could not join signature set"))
return pubsub.ValidationIgnore, err
}
if !valid {
err = errors.Errorf("Could not verify selection or aggregator or attestation signature")
tracing.AnnotateError(span, err)
return pubsub.ValidationReject, err
}
return pubsub.ValidationAccept, nil
return s.validateWithBatchVerifier(ctx, "aggregate", set)
}
func (s *Service) validateBlockInAttestation(ctx context.Context, satt *ethpb.SignedAggregateAttestationAndProof) bool {

View File

@ -360,7 +360,10 @@ func TestValidateAggregateAndProof_CanValidate(t *testing.T) {
require.NoError(t, err)
require.NoError(t, beaconState.SetGenesisTime(uint64(time.Now().Unix())))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
r := &Service{
ctx: ctx,
cfg: &config{
p2p: p,
beaconDB: db,
@ -376,8 +379,10 @@ func TestValidateAggregateAndProof_CanValidate(t *testing.T) {
attestationNotifier: (&mock.ChainService{}).OperationNotifier(),
},
seenAggregatedAttestationCache: lruwrpr.New(10),
signatureChan: make(chan *signatureVerifier, verifierLimit),
}
r.initCaches()
go r.verifierRoutine()
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, signedAggregateAndProof)
@ -456,7 +461,10 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
require.NoError(t, err)
require.NoError(t, beaconState.SetGenesisTime(uint64(time.Now().Unix())))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
r := &Service{
ctx: ctx,
cfg: &config{
p2p: p,
beaconDB: db,
@ -474,8 +482,10 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
attestationNotifier: (&mock.ChainService{}).OperationNotifier(),
},
seenAggregatedAttestationCache: lruwrpr.New(10),
signatureChan: make(chan *signatureVerifier, verifierLimit),
}
r.initCaches()
go r.verifierRoutine()
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, signedAggregateAndProof)

View File

@ -229,19 +229,12 @@ func (s *Service) validateUnaggregatedAttWithState(ctx context.Context, a *eth.A
return pubsub.ValidationReject, errors.New("attestation bitfield is invalid")
}
if features.Get().EnableBatchVerification {
set, err := blocks.AttestationSignatureBatch(ctx, bs, []*eth.Attestation{a})
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject, err
}
return s.validateWithBatchVerifier(ctx, "attestation", set)
}
if err := blocks.VerifyAttestationSignature(ctx, bs, a); err != nil {
set, err := blocks.AttestationSignatureBatch(ctx, bs, []*eth.Attestation{a})
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject, err
}
return pubsub.ValidationAccept, nil
return s.validateWithBatchVerifier(ctx, "attestation", set)
}
// Returns true if the attestation was already seen for the participating validator for the slot.

View File

@ -39,8 +39,10 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) {
ValidatorsRoot: [32]byte{'A'},
ValidAttestation: true,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := &Service{
ctx: ctx,
cfg: &config{
initialSync: &mockSync.Sync{IsSyncing: false},
p2p: p,
@ -50,8 +52,10 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) {
},
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
seenUnAggregatedAttestationCache: lruwrpr.New(10),
signatureChan: make(chan *signatureVerifier, verifierLimit),
}
s.initCaches()
go s.verifierRoutine()
invalidRoot := [32]byte{'A', 'B', 'C', 'D'}
s.setBadBlock(ctx, invalidRoot)

View File

@ -14,7 +14,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/config/features"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/crypto/bls"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
@ -253,27 +252,12 @@ func (s *Service) rejectInvalidSyncCommitteeSignature(m *ethpb.SyncCommitteeMess
// Batch verify message signature before unmarshalling
// the signature to a G2 point if batch verification is
// enabled.
if features.Get().EnableBatchVerification {
set := &bls.SignatureBatch{
Messages: [][32]byte{sigRoot},
PublicKeys: []bls.PublicKey{pKey},
Signatures: [][]byte{m.Signature},
}
return s.validateWithBatchVerifier(ctx, "sync committee message", set)
set := &bls.SignatureBatch{
Messages: [][32]byte{sigRoot},
PublicKeys: []bls.PublicKey{pKey},
Signatures: [][]byte{m.Signature},
}
// We reject a malformed signature from bytes according to the p2p specification.
blsSig, err := bls.SignatureFromBytes(m.Signature)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject, err
}
verified := blsSig.Verify(pKey, sigRoot[:])
if !verified {
return pubsub.ValidationReject, errors.New("signature failed verification")
}
return pubsub.ValidationAccept, nil
return s.validateWithBatchVerifier(ctx, "sync committee message", set)
}
}

View File

@ -12,7 +12,6 @@ import (
opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/beacon-chain/core/signing"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/config/features"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/crypto/bls"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
@ -224,30 +223,22 @@ func (s *Service) rejectInvalidContributionSignature(m *ethpb.SignedContribution
if err != nil {
return pubsub.ValidationIgnore, err
}
if features.Get().EnableBatchVerification {
publicKey, err := bls.PublicKeyFromBytes(pubkey[:])
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject, err
}
root, err := signing.ComputeSigningRoot(m.Message, d)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject, err
}
set := &bls.SignatureBatch{
Messages: [][32]byte{root},
PublicKeys: []bls.PublicKey{publicKey},
Signatures: [][]byte{m.Signature},
}
return s.validateWithBatchVerifier(ctx, "sync contribution signature", set)
}
if err := signing.VerifySigningRoot(m.Message, pubkey[:], m.Signature, d); err != nil {
publicKey, err := bls.PublicKeyFromBytes(pubkey[:])
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject, err
}
return pubsub.ValidationAccept, nil
root, err := signing.ComputeSigningRoot(m.Message, d)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject, err
}
set := &bls.SignatureBatch{
Messages: [][32]byte{root},
PublicKeys: []bls.PublicKey{publicKey},
Signatures: [][]byte{m.Signature},
}
return s.validateWithBatchVerifier(ctx, "sync contribution signature", set)
}
}
@ -293,29 +284,17 @@ func (s *Service) rejectInvalidSyncAggregateSignature(m *ethpb.SignedContributio
}
// Aggregate pubkeys separately again to allow
// for signature sets to be created for batch verification.
if features.Get().EnableBatchVerification {
aggKey, err := bls.AggregatePublicKeys(activeRawPubkeys)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationIgnore, err
}
set := &bls.SignatureBatch{
Messages: [][32]byte{sigRoot},
PublicKeys: []bls.PublicKey{aggKey},
Signatures: [][]byte{m.Message.Contribution.Signature},
}
return s.validateWithBatchVerifier(ctx, "sync contribution aggregate signature", set)
}
sig, err := bls.SignatureFromBytes(m.Message.Contribution.Signature)
aggKey, err := bls.AggregatePublicKeys(activeRawPubkeys)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject, err
return pubsub.ValidationIgnore, err
}
verified := sig.Eth2FastAggregateVerify(activePubkeys, sigRoot)
if !verified {
return pubsub.ValidationReject, errors.New("verification failed")
set := &bls.SignatureBatch{
Messages: [][32]byte{sigRoot},
PublicKeys: []bls.PublicKey{aggKey},
Signatures: [][]byte{m.Message.Contribution.Signature},
}
return pubsub.ValidationAccept, nil
return s.validateWithBatchVerifier(ctx, "sync contribution aggregate signature", set)
}
}
@ -351,28 +330,25 @@ func (s *Service) verifySyncSelectionData(ctx context.Context, m *ethpb.Contribu
if err != nil {
return err
}
if features.Get().EnableBatchVerification {
publicKey, err := bls.PublicKeyFromBytes(pubkey[:])
if err != nil {
return err
}
root, err := signing.ComputeSigningRoot(selectionData, domain)
if err != nil {
return err
}
set := &bls.SignatureBatch{
Messages: [][32]byte{root},
PublicKeys: []bls.PublicKey{publicKey},
Signatures: [][]byte{m.SelectionProof},
}
valid, err := s.validateWithBatchVerifier(ctx, "sync contribution selection signature", set)
if err != nil {
return err
}
if valid != pubsub.ValidationAccept {
return errors.New("invalid sync selection proof provided")
}
return nil
publicKey, err := bls.PublicKeyFromBytes(pubkey[:])
if err != nil {
return err
}
return signing.VerifySigningRoot(selectionData, pubkey[:], m.SelectionProof, domain)
root, err := signing.ComputeSigningRoot(selectionData, domain)
if err != nil {
return err
}
set := &bls.SignatureBatch{
Messages: [][32]byte{root},
PublicKeys: []bls.PublicKey{publicKey},
Signatures: [][]byte{m.SelectionProof},
}
valid, err := s.validateWithBatchVerifier(ctx, "sync contribution selection signature", set)
if err != nil {
return err
}
if valid != pubsub.ValidationAccept {
return errors.New("invalid sync selection proof provided")
}
return nil
}

View File

@ -926,6 +926,7 @@ func TestValidateSyncContributionAndProof(t *testing.T) {
WithStateNotifier(chainService.StateNotifier()),
WithOperationNotifier(chainService.OperationNotifier()),
)
go s.verifierRoutine()
s.cfg.stateGen = stategen.New(db)
msg.Message.Contribution.BlockRoot = headRoot[:]
s.cfg.beaconDB = db

View File

@ -45,7 +45,6 @@ type Flags struct {
DisableAttestingHistoryDBCache bool // DisableAttestingHistoryDBCache for the validator client increases disk reads/writes.
EnableDoppelGanger bool // EnableDoppelGanger enables doppelganger protection on startup for the validator.
EnableHistoricalSpaceRepresentation bool // EnableHistoricalSpaceRepresentation enables the saving of registry validators in separate buckets to save space
EnableBatchVerification bool // EnableBatchVerification enables batch signature verification on gossip messages.
// Logging related toggles.
DisableGRPCConnectionLogs bool // Disables logging when a new grpc client has connected.
@ -174,11 +173,6 @@ func ConfigureBeaconChain(ctx *cli.Context) {
logDisabled(disableCorrectlyPruneCanonicalAtts)
cfg.CorrectlyPruneCanonicalAtts = false
}
cfg.EnableBatchVerification = true
if ctx.Bool(disableBatchGossipVerification.Name) {
logDisabled(disableBatchGossipVerification)
cfg.EnableBatchVerification = false
}
cfg.EnableNativeState = false
if ctx.Bool(enableNativeState.Name) {
logEnabled(enableNativeState)

View File

@ -100,6 +100,11 @@ var (
Usage: deprecatedUsage,
Hidden: true,
}
deprecatedDisableBatchGossipVerification = &cli.BoolFlag{
Name: "disable-batch-gossip-verification",
Usage: deprecatedUsage,
Hidden: true,
}
)
var deprecatedFlags = []cli.Flag{
@ -120,4 +125,5 @@ var deprecatedFlags = []cli.Flag{
deprecatedDisableOptimizedBalanceUpdate,
deprecatedDisableActiveBalanceCache,
deprecatedDisableBalanceTrieComputation,
deprecatedDisableBatchGossipVerification,
}

View File

@ -101,10 +101,6 @@ var (
Usage: "Disable the fix for bug where any block attestations can get incorrectly pruned, which improves validator profitability and overall network health," +
"see issue #9443 for further detail",
}
disableBatchGossipVerification = &cli.BoolFlag{
Name: "disable-batch-gossip-verification",
Usage: "This enables batch verification of signatures received over gossip.",
}
enableNativeState = &cli.BoolFlag{
Name: "enable-native-state",
Usage: "Enables representing the beacon state as a pure Go struct.",
@ -159,7 +155,6 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{
enableHistoricalSpaceRepresentation,
disableCorrectlyInsertOrphanedAtts,
disableCorrectlyPruneCanonicalAtts,
disableBatchGossipVerification,
enableNativeState,
enableVecHTR,
enableForkChoiceDoublyLinkedTree,