From c804347fc469e7c9a17b83b15f106ccf7b1d18ca Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Thu, 9 Jul 2020 22:46:35 +0800 Subject: [PATCH] add fix and test (#6533) Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- beacon-chain/sync/metrics.go | 15 +++++-- beacon-chain/sync/subscriber.go | 67 +++++++++++++++++++++++++--- beacon-chain/sync/subscriber_test.go | 23 ++++++++++ 3 files changed, 95 insertions(+), 10 deletions(-) diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index b8277dea4..266913271 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -9,6 +9,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/shared/featureconfig" + "github.com/prysmaticlabs/prysm/shared/params" ) var ( @@ -92,9 +94,16 @@ func (s *Service) updateMetrics() { indices := s.aggregatorSubnetIndices(s.chain.CurrentSlot()) attTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})] attTopic += s.p2p.Encoding().ProtocolSuffix() - for _, committeeIdx := range indices { - formattedTopic := fmt.Sprintf(attTopic, digest, committeeIdx) - topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.p2p.PubSub().ListPeers(formattedTopic)))) + if featureconfig.Get().DisableDynamicCommitteeSubnets { + for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ { + formattedTopic := fmt.Sprintf(attTopic, digest, i) + topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.p2p.PubSub().ListPeers(formattedTopic)))) + } + } else { + for _, committeeIdx := range indices { + formattedTopic := fmt.Sprintf(attTopic, digest, committeeIdx) + topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.p2p.PubSub().ListPeers(formattedTopic)))) + } } // We update all other gossip topics. for topic := range p2p.GossipTopicMappings { diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index ef4bbcf65..a3cd62270 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -73,13 +73,11 @@ func (s *Service) registerSubscribers() { s.attesterSlashingSubscriber, ) if featureconfig.Get().DisableDynamicCommitteeSubnets { - for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ { - s.subscribe( - fmt.Sprintf("/eth2/%%x/beacon_attestation_%d", i), - s.validateCommitteeIndexBeaconAttestation, /* validator */ - s.committeeIndexBeaconAttestationSubscriber, /* message handler */ - ) - } + s.subscribeStaticWithSubnets( + "/eth2/%x/beacon_attestation_%d", + s.validateCommitteeIndexBeaconAttestation, /* validator */ + s.committeeIndexBeaconAttestationSubscriber, /* message handler */ + ) } else { s.subscribeDynamicWithSubnets( "/eth2/%x/beacon_attestation_%d", @@ -187,6 +185,49 @@ func wrapAndReportValidation(topic string, v pubsub.ValidatorEx) (string, pubsub } } +// subscribe to a static subnet with the given topic and index.A given validator and subscription handler is +// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding. +func (s *Service) subscribeStaticWithSubnets(topic string, validator pubsub.ValidatorEx, handle subHandler) { + base := p2p.GossipTopicMappings[topic] + if base == nil { + panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) + } + for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ { + s.subscribeWithBase(base, s.addDigestAndIndexToTopic(topic, i), validator, handle) + } + genesis := s.chain.GenesisTime() + ticker := slotutil.GetSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot) + + go func() { + for { + select { + case <-s.ctx.Done(): + ticker.Done() + return + case <-ticker.C(): + if s.chainStarted && s.initialSync.Syncing() { + continue + } + // Check every slot that there are enough peers + for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ { + if !s.validPeersExist(topic, i) { + log.Debugf("No peers found subscribed to attestation gossip subnet with "+ + "committee index %d. Searching network for peers subscribed to the subnet.", i) + go func(idx uint64) { + _, err := s.p2p.FindPeersWithSubnet(idx) + if err != nil { + log.Errorf("Could not search for peers: %v", err) + return + } + }(i) + return + } + } + } + } + }() +} + // subscribe to a dynamically changing list of subnets. This method expects a fmt compatible // string for the topic name and the list of subnets for subscribed topics that should be // maintained. @@ -325,6 +366,18 @@ func (s *Service) addDigestToTopic(topic string) string { return fmt.Sprintf(topic, digest) } +// Add the digest and index to subnet topic. +func (s *Service) addDigestAndIndexToTopic(topic string, idx uint64) string { + if !strings.Contains(topic, "%x") { + log.Fatal("Topic does not have appropriate formatter for digest") + } + digest, err := s.forkDigest() + if err != nil { + log.WithError(err).Fatal("Could not compute fork digest") + } + return fmt.Sprintf(topic, digest, idx) +} + func (s *Service) forkDigest() ([4]byte, error) { genRoot := s.chain.GenesisValidatorRoot() return p2putils.CreateForkDigest(s.chain.GenesisTime(), genRoot[:]) diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index ba0fddb96..ead7cabbb 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -261,3 +261,26 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) { r.reValidateSubscriptions(subscriptions, []uint64{2}, defaultTopic, digest) testutil.AssertLogsDoNotContain(t, hook, "Failed to unregister topic validator") } + +func TestStaticSubnets(t *testing.T) { + p := p2ptest.NewTestP2P(t) + ctx, cancel := context.WithCancel(context.Background()) + r := Service{ + ctx: ctx, + chain: &mockChain.ChainService{ + Genesis: time.Now(), + ValidatorsRoot: [32]byte{'A'}, + }, + p2p: p, + } + defaultTopic := "/eth2/%x/beacon_attestation_%d" + r.subscribeStaticWithSubnets(defaultTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error { + // no-op + return nil + }) + topics := r.p2p.PubSub().GetTopics() + if uint64(len(topics)) != params.BeaconNetworkConfig().AttestationSubnetCount { + t.Errorf("Wanted the number of subnet topics registered to be %d but got %d", params.BeaconNetworkConfig().AttestationSubnetCount, len(topics)) + } + cancel() +}