refactored sentinel gossip and only connect to nimbus now (#7127)

This commit is contained in:
Giulio rebuffo 2023-03-17 22:51:19 +01:00 committed by GitHub
parent 36828fbb08
commit 77a8cf9d5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 60 additions and 248 deletions

View File

@ -103,6 +103,35 @@ func ComputeForkDigest(
return ComputeForkDigestForVersion(currentForkVersion, genesisConfig.GenesisValidatorRoot) return ComputeForkDigestForVersion(currentForkVersion, genesisConfig.GenesisValidatorRoot)
} }
func ComputeNextForkDigest(
beaconConfig *clparams.BeaconChainConfig,
genesisConfig *clparams.GenesisConfig,
) ([4]byte, error) {
if genesisConfig.GenesisTime == 0 {
return [4]byte{}, errors.New("genesis time is not set")
}
if genesisConfig.GenesisValidatorRoot == (libcommon.Hash{}) {
return [4]byte{}, errors.New("genesis validators root is not set")
}
currentEpoch := utils.GetCurrentEpoch(genesisConfig.GenesisTime, beaconConfig.SecondsPerSlot, beaconConfig.SlotsPerEpoch)
// Retrieve next fork version.
nextForkIndex := 0
forkList := forkList(beaconConfig.ForkVersionSchedule)
for _, fork := range forkList {
if currentEpoch >= fork.epoch {
nextForkIndex++
continue
}
break
}
if nextForkIndex != len(forkList)-1 {
nextForkIndex++
}
return ComputeForkDigestForVersion(forkList[nextForkIndex].version, genesisConfig.GenesisValidatorRoot)
}
type fork struct { type fork struct {
epoch uint64 epoch uint64
version [4]byte version [4]byte

View File

@ -16,40 +16,14 @@ package sentinel
import ( import (
"context" "context"
"fmt" "fmt"
"strconv"
"strings" "strings"
"sync" "sync"
"time"
"github.com/ledgerwatch/erigon/cl/fork" "github.com/ledgerwatch/erigon/cl/fork"
"github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/log/v3"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
) )
const (
// overlay parameters
gossipSubD = 8 // topic stable mesh target count
gossipSubDlo = 6 // topic stable mesh low watermark
gossipSubDhi = 12 // topic stable mesh high watermark
// gossip parameters
gossipSubMcacheLen = 6 // number of windows to retain full messages in cache for `IWANT` responses
gossipSubMcacheGossip = 3 // number of windows to gossip about
gossipSubSeenTTL = 550 // number of heartbeat intervals to retain message IDs
// fanout ttl
gossipSubFanoutTTL = 60000000000 // TTL for fanout maps for topics we are not subscribed to but have published to, in nano seconds
// heartbeat interval
gossipSubHeartbeatInterval = 700 * time.Millisecond // frequency of heartbeat, milliseconds
// misc
rSubD = 8 // random gossip target
)
// Specifies the prefix for any pubsub topic.
const gossipTopicPrefix = "/eth2/"
const blockSubnetTopicFormat = "/eth2/%x/beacon_block"
const SSZSnappyCodec = "ssz_snappy" const SSZSnappyCodec = "ssz_snappy"
type TopicName string type TopicName string
@ -119,42 +93,12 @@ func (s *GossipManager) Recv() <-chan *pubsub.Message {
return s.ch return s.ch
} }
// closes a specific topic
func (s *GossipManager) CloseTopic(topic string) {
s.mu.Lock()
defer s.mu.Unlock()
if val, ok := s.subscriptions[topic]; ok {
val.Close()
delete(s.subscriptions, topic)
}
}
// reset'em
func (s *GossipManager) Reset() {
s.mu.Lock()
defer s.mu.Unlock()
for _, val := range s.subscriptions {
val.Close() // Close all.
}
s.subscriptions = map[string]*GossipSubscription{}
}
// get a specific topic
func (s *GossipManager) GetSubscription(topic string) (*GossipSubscription, bool) {
s.mu.Lock()
defer s.mu.Unlock()
if val, ok := s.subscriptions[topic]; ok {
return val, true
}
return nil, false
}
func (s *GossipManager) GetMatchingSubscription(match string) *GossipSubscription { func (s *GossipManager) GetMatchingSubscription(match string) *GossipSubscription {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
var sub *GossipSubscription var sub *GossipSubscription
for topic, currSub := range s.subscriptions { for topic, currSub := range s.subscriptions {
if strings.Contains(topic, string(BeaconBlockTopic)) { if strings.Contains(topic, match) {
sub = currSub sub = currSub
} }
} }
@ -167,79 +111,36 @@ func (s *GossipManager) AddSubscription(topic string, sub *GossipSubscription) {
s.subscriptions[topic] = sub s.subscriptions[topic] = sub
} }
// starts listening to a specific topic (forwarding its messages to the gossip manager channel)
func (s *GossipManager) ListenTopic(topic string) error {
s.mu.Lock()
defer s.mu.Unlock()
if val, ok := s.subscriptions[topic]; ok {
return val.Listen()
}
return nil
}
// closes the gossip manager
func (s *GossipManager) Close() {
s.mu.Lock()
defer s.mu.Unlock()
for _, val := range s.subscriptions {
val.Close()
}
close(s.ch)
}
func (s *GossipManager) String() string {
s.mu.Lock()
defer s.mu.Unlock()
sb := strings.Builder{}
sb.Grow(len(s.subscriptions) * 4)
for _, v := range s.subscriptions {
sb.Write([]byte(v.topic.String()))
sb.WriteString("=")
sb.WriteString(strconv.Itoa(len(v.topic.ListPeers())))
sb.WriteString(" ")
}
return sb.String()
}
func (s *Sentinel) RestartTopics() {
// Reset all topics
s.subManager.Reset()
for _, topic := range s.gossipTopics {
s.SubscribeGossip(topic)
}
}
func (s *Sentinel) SubscribeGossip(topic GossipTopic, opts ...pubsub.TopicOpt) (sub *GossipSubscription, err error) { func (s *Sentinel) SubscribeGossip(topic GossipTopic, opts ...pubsub.TopicOpt) (sub *GossipSubscription, err error) {
sub = &GossipSubscription{ paths := s.getTopics(topic)
gossip_topic: topic, for _, path := range paths {
ch: s.subManager.ch, sub = &GossipSubscription{
host: s.host.ID(), gossip_topic: topic,
ctx: s.ctx, ch: s.subManager.ch,
} host: s.host.ID(),
path := s.getTopic(topic) ctx: s.ctx,
sub.topic, err = s.pubsub.Join(path, opts...)
if err != nil {
return nil, fmt.Errorf("failed to join topic %s, err=%w", path, err)
}
s.subManager.AddSubscription(path, sub)
for _, t := range s.gossipTopics {
if t.CodecStr == topic.CodecStr {
return sub, nil
} }
sub.topic, err = s.pubsub.Join(path, opts...)
if err != nil {
return nil, fmt.Errorf("failed to join topic %s, err=%w", path, err)
}
s.subManager.AddSubscription(path, sub)
} }
s.gossipTopics = append(s.gossipTopics, topic)
return sub, nil return sub, nil
} }
func (s *Sentinel) LogTopicPeers() { func (s *Sentinel) getTopics(topic GossipTopic) []string {
log.Info("[Gossip] Network Update", "topic peers", s.subManager.String()) digest, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig)
}
func (s *Sentinel) getTopic(topic GossipTopic) string {
o, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig)
if err != nil { if err != nil {
log.Error("[Gossip] Failed to calculate fork choice", "err", err) log.Error("[Gossip] Failed to calculate fork choice", "err", err)
} }
return fmt.Sprintf("/eth2/%x/%s/%s", o, topic.Name, topic.CodecStr) nextDigest, err := fork.ComputeNextForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig)
if err != nil {
log.Error("[Gossip] Failed to calculate fork choice", "err", err)
}
return []string{
fmt.Sprintf("/eth2/%x/%s/%s", nextDigest, topic.Name, topic.CodecStr),
fmt.Sprintf("/eth2/%x/%s/%s", digest, topic.Name, topic.CodecStr),
}
} }

View File

@ -1,84 +0,0 @@
package sentinel
import (
"testing"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/stretchr/testify/require"
)
func NewMockGossipManager() *GossipManager {
subs := map[string]*GossipSubscription{
"topic1": {
host: "host1",
},
"topic3": {
host: "host2",
},
"topic2": {
host: "host3",
},
}
return &GossipManager{
ch: make(chan *pubsub.Message, 1),
subscriptions: subs,
}
}
func TestCloseTopic(t *testing.T) {
gm := NewMockGossipManager()
testCases := []struct {
topic string
}{
{"topic1"},
{"topic2"},
}
for _, testCase := range testCases {
// check that it has the topic before it closes it
if _, ok := gm.subscriptions[testCase.topic]; !ok {
t.Errorf("attempted to close invalid topic")
}
gm.CloseTopic(testCase.topic)
// check that topic has been deleted after closing
if _, ok := gm.subscriptions[testCase.topic]; ok {
t.Errorf("closed topic but topic still exists")
}
}
}
func TestGetSubscription(t *testing.T) {
gm := NewMockGossipManager()
testCases := []struct {
topic string
expectedSub *GossipSubscription
expectedBool bool
}{
{
topic: "topic1",
expectedSub: gm.subscriptions["topic1"],
expectedBool: true,
},
{
topic: "topic2",
expectedSub: gm.subscriptions["topic2"],
expectedBool: true,
},
{
topic: "topic4",
expectedSub: nil,
expectedBool: false,
},
}
for _, testCase := range testCases {
subscription, ok := gm.GetSubscription(testCase.topic)
require.EqualValues(t, subscription, testCase.expectedSub)
require.EqualValues(t, ok, testCase.expectedBool)
}
}

View File

@ -20,7 +20,6 @@ import (
"net" "net"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes" "github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/fork" "github.com/ledgerwatch/erigon/cl/fork"
"github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/handlers" "github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/handlers"
@ -53,7 +52,6 @@ type Sentinel struct {
discoverConfig discover.Config discoverConfig discover.Config
pubsub *pubsub.PubSub pubsub *pubsub.PubSub
subManager *GossipManager subManager *GossipManager
gossipTopics []GossipTopic
} }
func (s *Sentinel) createLocalNode( func (s *Sentinel) createLocalNode(
@ -221,10 +219,6 @@ func New(
return s, nil return s, nil
} }
func (s *Sentinel) ChainConfigs() (clparams.BeaconChainConfig, clparams.GenesisConfig) {
return *s.cfg.BeaconConfig, *s.cfg.GenesisConfig
}
func (s *Sentinel) RecvGossip() <-chan *pubsub.Message { func (s *Sentinel) RecvGossip() <-chan *pubsub.Message {
return s.subManager.Recv() return s.subManager.Recv()
} }
@ -262,7 +256,7 @@ func (s *Sentinel) HasTooManyPeers() bool {
} }
func (s *Sentinel) GetPeersCount() int { func (s *Sentinel) GetPeersCount() int {
sub := s.subManager.GetMatchingSubscription(string(LightClientFinalityUpdateTopic)) sub := s.subManager.GetMatchingSubscription(string(LightClientOptimisticUpdateTopic))
if sub == nil { if sub == nil {
return len(s.host.Network().Peers()) return len(s.host.Network().Peers())

View File

@ -8,7 +8,7 @@ import (
) )
const ( const (
maxSubscribers = 100 // only 100 lightclients per sentinel maxSubscribers = 100 // only 100 clients per sentinel
) )
type gossipObject struct { type gossipObject struct {

View File

@ -9,7 +9,6 @@ import (
sentinelrpc "github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel" sentinelrpc "github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cl/cltypes" "github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/fork"
"github.com/ledgerwatch/erigon/cmd/sentinel/sentinel" "github.com/ledgerwatch/erigon/cmd/sentinel/sentinel"
"github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/handshake" "github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/handshake"
"github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/log/v3"
@ -25,31 +24,6 @@ type ServerConfig struct {
Addr string Addr string
} }
func hardForkListener(ctx context.Context, s *sentinel.Sentinel) {
tryAgainInterval := time.NewTicker(time.Second)
beaconCfg, genesisCfg := s.ChainConfigs()
currentDigest, err := fork.ComputeForkDigest(&beaconCfg, &genesisCfg)
if err != nil {
log.Error("cannot listen for hard forks", "reasons", err)
return
}
for {
select {
case <-ctx.Done():
return
case <-tryAgainInterval.C:
newDigest, err := fork.ComputeForkDigest(&beaconCfg, &genesisCfg)
if err != nil {
log.Error("cannot listen for hard forks", "reasons", err)
return
}
if newDigest != currentDigest {
s.RestartTopics()
currentDigest = newDigest
}
}
}
}
func StartSentinelService(cfg *sentinel.SentinelConfig, db kv.RoDB, srvCfg *ServerConfig, creds credentials.TransportCredentials, initialStatus *cltypes.Status, rule handshake.RuleFunc) (sentinelrpc.SentinelClient, error) { func StartSentinelService(cfg *sentinel.SentinelConfig, db kv.RoDB, srvCfg *ServerConfig, creds credentials.TransportCredentials, initialStatus *cltypes.Status, rule handshake.RuleFunc) (sentinelrpc.SentinelClient, error) {
ctx := context.Background() ctx := context.Background()
sent, err := sentinel.New(context.Background(), cfg, db, rule) sent, err := sentinel.New(context.Background(), cfg, db, rule)
@ -63,9 +37,9 @@ func StartSentinelService(cfg *sentinel.SentinelConfig, db kv.RoDB, srvCfg *Serv
sentinel.BeaconBlockSsz, sentinel.BeaconBlockSsz,
// Cause problem due to buggy msg id will uncomment in the future. // Cause problem due to buggy msg id will uncomment in the future.
//sentinel.BeaconAggregateAndProofSsz, //sentinel.BeaconAggregateAndProofSsz,
sentinel.VoluntaryExitSsz, //sentinel.VoluntaryExitSsz,
sentinel.ProposerSlashingSsz, //sentinel.ProposerSlashingSsz,
sentinel.AttesterSlashingSsz, //sentinel.AttesterSlashingSsz,
sentinel.LightClientFinalityUpdateSsz, sentinel.LightClientFinalityUpdateSsz,
sentinel.LightClientOptimisticUpdateSsz, sentinel.LightClientOptimisticUpdateSsz,
} }
@ -81,7 +55,6 @@ func StartSentinelService(cfg *sentinel.SentinelConfig, db kv.RoDB, srvCfg *Serv
log.Error("[Sentinel] failed to start sentinel", "err", err) log.Error("[Sentinel] failed to start sentinel", "err", err)
} }
} }
go hardForkListener(ctx, sent)
log.Info("[Sentinel] Sentinel started", "enr", sent.String()) log.Info("[Sentinel] Sentinel started", "enr", sent.String())
if initialStatus != nil { if initialStatus != nil {
sent.SetStatus(initialStatus) sent.SetStatus(initialStatus)

View File

@ -34,11 +34,11 @@ func convertToInterfacePubkey(pubkey *ecdsa.PublicKey) (crypto.PubKey, error) {
xVal, yVal := new(btcec.FieldVal), new(btcec.FieldVal) xVal, yVal := new(btcec.FieldVal), new(btcec.FieldVal)
overflows := xVal.SetByteSlice(pubkey.X.Bytes()) overflows := xVal.SetByteSlice(pubkey.X.Bytes())
if overflows { if overflows {
return nil, fmt.Errorf("X value overflows") return nil, fmt.Errorf("x value overflows")
} }
overflows = yVal.SetByteSlice(pubkey.Y.Bytes()) overflows = yVal.SetByteSlice(pubkey.Y.Bytes())
if overflows { if overflows {
return nil, fmt.Errorf("Y value overflows") return nil, fmt.Errorf("y value overflows")
} }
newKey := crypto.PubKey((*crypto.Secp256k1PublicKey)(btcec.NewPublicKey(xVal, yVal))) newKey := crypto.PubKey((*crypto.Secp256k1PublicKey)(btcec.NewPublicKey(xVal, yVal)))
// Zero out temporary values. // Zero out temporary values.
@ -140,7 +140,6 @@ func connectToRandomPeer(s *Sentinel, topic string) (peerInfo peer.ID, err error
node := validPeerList[index] node := validPeerList[index]
if !isPeerWhitelisted(node, validPeerList) { if !isPeerWhitelisted(node, validPeerList) {
continue continue
} }