add fix and test (#6533)

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Nishant Das 2020-07-09 22:46:35 +08:00 committed by GitHub
parent b00c235586
commit c804347fc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 95 additions and 10 deletions

View File

@ -9,6 +9,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
)
var (
@ -92,10 +94,17 @@ func (s *Service) updateMetrics() {
indices := s.aggregatorSubnetIndices(s.chain.CurrentSlot())
attTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})]
attTopic += s.p2p.Encoding().ProtocolSuffix()
if featureconfig.Get().DisableDynamicCommitteeSubnets {
for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ {
formattedTopic := fmt.Sprintf(attTopic, digest, i)
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.p2p.PubSub().ListPeers(formattedTopic))))
}
} else {
for _, committeeIdx := range indices {
formattedTopic := fmt.Sprintf(attTopic, digest, committeeIdx)
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.p2p.PubSub().ListPeers(formattedTopic))))
}
}
// We update all other gossip topics.
for topic := range p2p.GossipTopicMappings {
// We already updated attestation subnet topics.

View File

@ -73,13 +73,11 @@ func (s *Service) registerSubscribers() {
s.attesterSlashingSubscriber,
)
if featureconfig.Get().DisableDynamicCommitteeSubnets {
for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ {
s.subscribe(
fmt.Sprintf("/eth2/%%x/beacon_attestation_%d", i),
s.subscribeStaticWithSubnets(
"/eth2/%x/beacon_attestation_%d",
s.validateCommitteeIndexBeaconAttestation, /* validator */
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
)
}
} else {
s.subscribeDynamicWithSubnets(
"/eth2/%x/beacon_attestation_%d",
@ -187,6 +185,49 @@ func wrapAndReportValidation(topic string, v pubsub.ValidatorEx) (string, pubsub
}
}
// subscribe to a static subnet with the given topic and index.A given validator and subscription handler is
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribeStaticWithSubnets(topic string, validator pubsub.ValidatorEx, handle subHandler) {
base := p2p.GossipTopicMappings[topic]
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
}
for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ {
s.subscribeWithBase(base, s.addDigestAndIndexToTopic(topic, i), validator, handle)
}
genesis := s.chain.GenesisTime()
ticker := slotutil.GetSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
go func() {
for {
select {
case <-s.ctx.Done():
ticker.Done()
return
case <-ticker.C():
if s.chainStarted && s.initialSync.Syncing() {
continue
}
// Check every slot that there are enough peers
for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ {
if !s.validPeersExist(topic, i) {
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
"committee index %d. Searching network for peers subscribed to the subnet.", i)
go func(idx uint64) {
_, err := s.p2p.FindPeersWithSubnet(idx)
if err != nil {
log.Errorf("Could not search for peers: %v", err)
return
}
}(i)
return
}
}
}
}
}()
}
// subscribe to a dynamically changing list of subnets. This method expects a fmt compatible
// string for the topic name and the list of subnets for subscribed topics that should be
// maintained.
@ -325,6 +366,18 @@ func (s *Service) addDigestToTopic(topic string) string {
return fmt.Sprintf(topic, digest)
}
// Add the digest and index to subnet topic.
func (s *Service) addDigestAndIndexToTopic(topic string, idx uint64) string {
if !strings.Contains(topic, "%x") {
log.Fatal("Topic does not have appropriate formatter for digest")
}
digest, err := s.forkDigest()
if err != nil {
log.WithError(err).Fatal("Could not compute fork digest")
}
return fmt.Sprintf(topic, digest, idx)
}
func (s *Service) forkDigest() ([4]byte, error) {
genRoot := s.chain.GenesisValidatorRoot()
return p2putils.CreateForkDigest(s.chain.GenesisTime(), genRoot[:])

View File

@ -261,3 +261,26 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
r.reValidateSubscriptions(subscriptions, []uint64{2}, defaultTopic, digest)
testutil.AssertLogsDoNotContain(t, hook, "Failed to unregister topic validator")
}
func TestStaticSubnets(t *testing.T) {
p := p2ptest.NewTestP2P(t)
ctx, cancel := context.WithCancel(context.Background())
r := Service{
ctx: ctx,
chain: &mockChain.ChainService{
Genesis: time.Now(),
ValidatorsRoot: [32]byte{'A'},
},
p2p: p,
}
defaultTopic := "/eth2/%x/beacon_attestation_%d"
r.subscribeStaticWithSubnets(defaultTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
// no-op
return nil
})
topics := r.p2p.PubSub().GetTopics()
if uint64(len(topics)) != params.BeaconNetworkConfig().AttestationSubnetCount {
t.Errorf("Wanted the number of subnet topics registered to be %d but got %d", params.BeaconNetworkConfig().AttestationSubnetCount, len(topics))
}
cancel()
}