From 77a8cf9d5b406202308b3fdcc33e0fa12657032d Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Fri, 17 Mar 2023 22:51:19 +0100 Subject: [PATCH] refactored sentinel gossip and only connect to nimbus now (#7127) --- cl/fork/fork.go | 29 ++++ cmd/sentinel/sentinel/pubsub.go | 147 ++++----------------- cmd/sentinel/sentinel/pubsub_test.go | 84 ------------ cmd/sentinel/sentinel/sentinel.go | 8 +- cmd/sentinel/sentinel/service/notifiers.go | 2 +- cmd/sentinel/sentinel/service/start.go | 33 +---- cmd/sentinel/sentinel/utils.go | 5 +- 7 files changed, 60 insertions(+), 248 deletions(-) delete mode 100644 cmd/sentinel/sentinel/pubsub_test.go diff --git a/cl/fork/fork.go b/cl/fork/fork.go index 6ecc6d4b3..a3dbef998 100644 --- a/cl/fork/fork.go +++ b/cl/fork/fork.go @@ -103,6 +103,35 @@ func ComputeForkDigest( return ComputeForkDigestForVersion(currentForkVersion, genesisConfig.GenesisValidatorRoot) } +func ComputeNextForkDigest( + beaconConfig *clparams.BeaconChainConfig, + genesisConfig *clparams.GenesisConfig, +) ([4]byte, error) { + if genesisConfig.GenesisTime == 0 { + return [4]byte{}, errors.New("genesis time is not set") + } + if genesisConfig.GenesisValidatorRoot == (libcommon.Hash{}) { + return [4]byte{}, errors.New("genesis validators root is not set") + } + + currentEpoch := utils.GetCurrentEpoch(genesisConfig.GenesisTime, beaconConfig.SecondsPerSlot, beaconConfig.SlotsPerEpoch) + // Retrieve next fork version. + nextForkIndex := 0 + forkList := forkList(beaconConfig.ForkVersionSchedule) + for _, fork := range forkList { + if currentEpoch >= fork.epoch { + nextForkIndex++ + continue + } + break + } + if nextForkIndex != len(forkList)-1 { + nextForkIndex++ + } + + return ComputeForkDigestForVersion(forkList[nextForkIndex].version, genesisConfig.GenesisValidatorRoot) +} + type fork struct { epoch uint64 version [4]byte diff --git a/cmd/sentinel/sentinel/pubsub.go b/cmd/sentinel/sentinel/pubsub.go index 1dc805a08..0e3db9d8b 100644 --- a/cmd/sentinel/sentinel/pubsub.go +++ b/cmd/sentinel/sentinel/pubsub.go @@ -16,40 +16,14 @@ package sentinel import ( "context" "fmt" - "strconv" "strings" "sync" - "time" "github.com/ledgerwatch/erigon/cl/fork" "github.com/ledgerwatch/log/v3" pubsub "github.com/libp2p/go-libp2p-pubsub" ) -const ( - // overlay parameters - gossipSubD = 8 // topic stable mesh target count - gossipSubDlo = 6 // topic stable mesh low watermark - gossipSubDhi = 12 // topic stable mesh high watermark - - // gossip parameters - gossipSubMcacheLen = 6 // number of windows to retain full messages in cache for `IWANT` responses - gossipSubMcacheGossip = 3 // number of windows to gossip about - gossipSubSeenTTL = 550 // number of heartbeat intervals to retain message IDs - - // fanout ttl - gossipSubFanoutTTL = 60000000000 // TTL for fanout maps for topics we are not subscribed to but have published to, in nano seconds - - // heartbeat interval - gossipSubHeartbeatInterval = 700 * time.Millisecond // frequency of heartbeat, milliseconds - - // misc - rSubD = 8 // random gossip target -) - -// Specifies the prefix for any pubsub topic. -const gossipTopicPrefix = "/eth2/" -const blockSubnetTopicFormat = "/eth2/%x/beacon_block" const SSZSnappyCodec = "ssz_snappy" type TopicName string @@ -119,42 +93,12 @@ func (s *GossipManager) Recv() <-chan *pubsub.Message { return s.ch } -// closes a specific topic -func (s *GossipManager) CloseTopic(topic string) { - s.mu.Lock() - defer s.mu.Unlock() - if val, ok := s.subscriptions[topic]; ok { - val.Close() - delete(s.subscriptions, topic) - } -} - -// reset'em -func (s *GossipManager) Reset() { - s.mu.Lock() - defer s.mu.Unlock() - for _, val := range s.subscriptions { - val.Close() // Close all. - } - s.subscriptions = map[string]*GossipSubscription{} -} - -// get a specific topic -func (s *GossipManager) GetSubscription(topic string) (*GossipSubscription, bool) { - s.mu.Lock() - defer s.mu.Unlock() - if val, ok := s.subscriptions[topic]; ok { - return val, true - } - return nil, false -} - func (s *GossipManager) GetMatchingSubscription(match string) *GossipSubscription { s.mu.Lock() defer s.mu.Unlock() var sub *GossipSubscription for topic, currSub := range s.subscriptions { - if strings.Contains(topic, string(BeaconBlockTopic)) { + if strings.Contains(topic, match) { sub = currSub } } @@ -167,79 +111,36 @@ func (s *GossipManager) AddSubscription(topic string, sub *GossipSubscription) { s.subscriptions[topic] = sub } -// starts listening to a specific topic (forwarding its messages to the gossip manager channel) -func (s *GossipManager) ListenTopic(topic string) error { - s.mu.Lock() - defer s.mu.Unlock() - if val, ok := s.subscriptions[topic]; ok { - return val.Listen() - } - return nil -} - -// closes the gossip manager -func (s *GossipManager) Close() { - s.mu.Lock() - defer s.mu.Unlock() - for _, val := range s.subscriptions { - val.Close() - } - close(s.ch) -} - -func (s *GossipManager) String() string { - s.mu.Lock() - defer s.mu.Unlock() - sb := strings.Builder{} - sb.Grow(len(s.subscriptions) * 4) - - for _, v := range s.subscriptions { - sb.Write([]byte(v.topic.String())) - sb.WriteString("=") - sb.WriteString(strconv.Itoa(len(v.topic.ListPeers()))) - sb.WriteString(" ") - } - return sb.String() -} - -func (s *Sentinel) RestartTopics() { - // Reset all topics - s.subManager.Reset() - for _, topic := range s.gossipTopics { - s.SubscribeGossip(topic) - } -} - func (s *Sentinel) SubscribeGossip(topic GossipTopic, opts ...pubsub.TopicOpt) (sub *GossipSubscription, err error) { - sub = &GossipSubscription{ - gossip_topic: topic, - ch: s.subManager.ch, - host: s.host.ID(), - ctx: s.ctx, - } - path := s.getTopic(topic) - sub.topic, err = s.pubsub.Join(path, opts...) - if err != nil { - return nil, fmt.Errorf("failed to join topic %s, err=%w", path, err) - } - s.subManager.AddSubscription(path, sub) - for _, t := range s.gossipTopics { - if t.CodecStr == topic.CodecStr { - return sub, nil + paths := s.getTopics(topic) + for _, path := range paths { + sub = &GossipSubscription{ + gossip_topic: topic, + ch: s.subManager.ch, + host: s.host.ID(), + ctx: s.ctx, } + sub.topic, err = s.pubsub.Join(path, opts...) + if err != nil { + return nil, fmt.Errorf("failed to join topic %s, err=%w", path, err) + } + s.subManager.AddSubscription(path, sub) } - s.gossipTopics = append(s.gossipTopics, topic) + return sub, nil } -func (s *Sentinel) LogTopicPeers() { - log.Info("[Gossip] Network Update", "topic peers", s.subManager.String()) -} - -func (s *Sentinel) getTopic(topic GossipTopic) string { - o, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig) +func (s *Sentinel) getTopics(topic GossipTopic) []string { + digest, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig) if err != nil { log.Error("[Gossip] Failed to calculate fork choice", "err", err) } - return fmt.Sprintf("/eth2/%x/%s/%s", o, topic.Name, topic.CodecStr) + nextDigest, err := fork.ComputeNextForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig) + if err != nil { + log.Error("[Gossip] Failed to calculate fork choice", "err", err) + } + return []string{ + fmt.Sprintf("/eth2/%x/%s/%s", nextDigest, topic.Name, topic.CodecStr), + fmt.Sprintf("/eth2/%x/%s/%s", digest, topic.Name, topic.CodecStr), + } } diff --git a/cmd/sentinel/sentinel/pubsub_test.go b/cmd/sentinel/sentinel/pubsub_test.go deleted file mode 100644 index c27a7239a..000000000 --- a/cmd/sentinel/sentinel/pubsub_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package sentinel - -import ( - "testing" - - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/stretchr/testify/require" -) - -func NewMockGossipManager() *GossipManager { - subs := map[string]*GossipSubscription{ - "topic1": { - host: "host1", - }, - "topic3": { - host: "host2", - }, - "topic2": { - host: "host3", - }, - } - - return &GossipManager{ - ch: make(chan *pubsub.Message, 1), - subscriptions: subs, - } -} - -func TestCloseTopic(t *testing.T) { - gm := NewMockGossipManager() - - testCases := []struct { - topic string - }{ - {"topic1"}, - {"topic2"}, - } - - for _, testCase := range testCases { - // check that it has the topic before it closes it - if _, ok := gm.subscriptions[testCase.topic]; !ok { - t.Errorf("attempted to close invalid topic") - } - - gm.CloseTopic(testCase.topic) - - // check that topic has been deleted after closing - if _, ok := gm.subscriptions[testCase.topic]; ok { - t.Errorf("closed topic but topic still exists") - } - } -} - -func TestGetSubscription(t *testing.T) { - gm := NewMockGossipManager() - - testCases := []struct { - topic string - expectedSub *GossipSubscription - expectedBool bool - }{ - { - topic: "topic1", - expectedSub: gm.subscriptions["topic1"], - expectedBool: true, - }, - { - topic: "topic2", - expectedSub: gm.subscriptions["topic2"], - expectedBool: true, - }, - { - topic: "topic4", - expectedSub: nil, - expectedBool: false, - }, - } - - for _, testCase := range testCases { - subscription, ok := gm.GetSubscription(testCase.topic) - require.EqualValues(t, subscription, testCase.expectedSub) - require.EqualValues(t, ok, testCase.expectedBool) - } -} diff --git a/cmd/sentinel/sentinel/sentinel.go b/cmd/sentinel/sentinel/sentinel.go index d0415af8c..d0c2aebb2 100644 --- a/cmd/sentinel/sentinel/sentinel.go +++ b/cmd/sentinel/sentinel/sentinel.go @@ -20,7 +20,6 @@ import ( "net" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/cltypes" "github.com/ledgerwatch/erigon/cl/fork" "github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/handlers" @@ -53,7 +52,6 @@ type Sentinel struct { discoverConfig discover.Config pubsub *pubsub.PubSub subManager *GossipManager - gossipTopics []GossipTopic } func (s *Sentinel) createLocalNode( @@ -221,10 +219,6 @@ func New( return s, nil } -func (s *Sentinel) ChainConfigs() (clparams.BeaconChainConfig, clparams.GenesisConfig) { - return *s.cfg.BeaconConfig, *s.cfg.GenesisConfig -} - func (s *Sentinel) RecvGossip() <-chan *pubsub.Message { return s.subManager.Recv() } @@ -262,7 +256,7 @@ func (s *Sentinel) HasTooManyPeers() bool { } func (s *Sentinel) GetPeersCount() int { - sub := s.subManager.GetMatchingSubscription(string(LightClientFinalityUpdateTopic)) + sub := s.subManager.GetMatchingSubscription(string(LightClientOptimisticUpdateTopic)) if sub == nil { return len(s.host.Network().Peers()) diff --git a/cmd/sentinel/sentinel/service/notifiers.go b/cmd/sentinel/sentinel/service/notifiers.go index 9d0fca709..84dc5872f 100644 --- a/cmd/sentinel/sentinel/service/notifiers.go +++ b/cmd/sentinel/sentinel/service/notifiers.go @@ -8,7 +8,7 @@ import ( ) const ( - maxSubscribers = 100 // only 100 lightclients per sentinel + maxSubscribers = 100 // only 100 clients per sentinel ) type gossipObject struct { diff --git a/cmd/sentinel/sentinel/service/start.go b/cmd/sentinel/sentinel/service/start.go index fcf1008d1..e091239e2 100644 --- a/cmd/sentinel/sentinel/service/start.go +++ b/cmd/sentinel/sentinel/service/start.go @@ -9,7 +9,6 @@ import ( sentinelrpc "github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/cl/cltypes" - "github.com/ledgerwatch/erigon/cl/fork" "github.com/ledgerwatch/erigon/cmd/sentinel/sentinel" "github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/handshake" "github.com/ledgerwatch/log/v3" @@ -25,31 +24,6 @@ type ServerConfig struct { Addr string } -func hardForkListener(ctx context.Context, s *sentinel.Sentinel) { - tryAgainInterval := time.NewTicker(time.Second) - beaconCfg, genesisCfg := s.ChainConfigs() - currentDigest, err := fork.ComputeForkDigest(&beaconCfg, &genesisCfg) - if err != nil { - log.Error("cannot listen for hard forks", "reasons", err) - return - } - for { - select { - case <-ctx.Done(): - return - case <-tryAgainInterval.C: - newDigest, err := fork.ComputeForkDigest(&beaconCfg, &genesisCfg) - if err != nil { - log.Error("cannot listen for hard forks", "reasons", err) - return - } - if newDigest != currentDigest { - s.RestartTopics() - currentDigest = newDigest - } - } - } -} func StartSentinelService(cfg *sentinel.SentinelConfig, db kv.RoDB, srvCfg *ServerConfig, creds credentials.TransportCredentials, initialStatus *cltypes.Status, rule handshake.RuleFunc) (sentinelrpc.SentinelClient, error) { ctx := context.Background() sent, err := sentinel.New(context.Background(), cfg, db, rule) @@ -63,9 +37,9 @@ func StartSentinelService(cfg *sentinel.SentinelConfig, db kv.RoDB, srvCfg *Serv sentinel.BeaconBlockSsz, // Cause problem due to buggy msg id will uncomment in the future. //sentinel.BeaconAggregateAndProofSsz, - sentinel.VoluntaryExitSsz, - sentinel.ProposerSlashingSsz, - sentinel.AttesterSlashingSsz, + //sentinel.VoluntaryExitSsz, + //sentinel.ProposerSlashingSsz, + //sentinel.AttesterSlashingSsz, sentinel.LightClientFinalityUpdateSsz, sentinel.LightClientOptimisticUpdateSsz, } @@ -81,7 +55,6 @@ func StartSentinelService(cfg *sentinel.SentinelConfig, db kv.RoDB, srvCfg *Serv log.Error("[Sentinel] failed to start sentinel", "err", err) } } - go hardForkListener(ctx, sent) log.Info("[Sentinel] Sentinel started", "enr", sent.String()) if initialStatus != nil { sent.SetStatus(initialStatus) diff --git a/cmd/sentinel/sentinel/utils.go b/cmd/sentinel/sentinel/utils.go index 40d336941..f909f89b4 100644 --- a/cmd/sentinel/sentinel/utils.go +++ b/cmd/sentinel/sentinel/utils.go @@ -34,11 +34,11 @@ func convertToInterfacePubkey(pubkey *ecdsa.PublicKey) (crypto.PubKey, error) { xVal, yVal := new(btcec.FieldVal), new(btcec.FieldVal) overflows := xVal.SetByteSlice(pubkey.X.Bytes()) if overflows { - return nil, fmt.Errorf("X value overflows") + return nil, fmt.Errorf("x value overflows") } overflows = yVal.SetByteSlice(pubkey.Y.Bytes()) if overflows { - return nil, fmt.Errorf("Y value overflows") + return nil, fmt.Errorf("y value overflows") } newKey := crypto.PubKey((*crypto.Secp256k1PublicKey)(btcec.NewPublicKey(xVal, yVal))) // Zero out temporary values. @@ -140,7 +140,6 @@ func connectToRandomPeer(s *Sentinel, topic string) (peerInfo peer.ID, err error node := validPeerList[index] if !isPeerWhitelisted(node, validPeerList) { - continue }