Update PubSub and include topic filter (#7496)

* Update pubsub and fix topicIDs

* WIP filter

* Add suggested code from @bidlocode

* add tests and fix bugs

* more tests

* Wait until state initialized to accept pubsub filtering

* rename for clarity and clarify comment

* fix test builds

* Autofix issues in 2 files

Resolved issues in the following files via DeepSource Autofix:
1. beacon-chain/p2p/pubsub_filter.go
2. beacon-chain/p2p/pubsub_filter_test.go

* @nisdas pr feedback

* pr feedback and fuzz fix

* Update beacon-chain/p2p/pubsub_filter.go

* Must have protocol suffix

* Must have protocol suffix

* gofmt

* rm test, fix panic

* Fix tests

* Add isInitialized check

* Add a few more tests for better coverage

* cache fork digest, make pubsub filter part of the p2p service

* rename service

* gofmt

* Add comment

* fix

Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>
Co-authored-by: Nishant Das <nishdas93@gmail.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Preston Van Loon 2020-10-16 00:05:40 -07:00 committed by GitHub
parent a81c863ddb
commit 07e7e030d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 779 additions and 188 deletions

View File

@ -362,7 +362,7 @@ func TestService_ReceiveBlockBatch(t *testing.T) {
}
func TestService_HasInitSyncBlock(t *testing.T) {
s, err := NewService(context.Background(), &Config{})
s, err := NewService(context.Background(), &Config{StateNotifier: &blockchainTesting.MockStateNotifier{}})
require.NoError(t, err)
r := [32]byte{'a'}
if s.HasInitSyncBlock(r) {

View File

@ -80,7 +80,8 @@ func (msn *MockBlockNotifier) BlockFeed() *event.Feed {
// MockStateNotifier mocks the state notifier.
type MockStateNotifier struct {
feed *event.Feed
feed *event.Feed
feedLock sync.Mutex
recv []*feed.Event
recvLock sync.Mutex
@ -98,6 +99,9 @@ func (msn *MockStateNotifier) ReceivedEvents() []*feed.Event {
// StateFeed returns a state feed.
func (msn *MockStateNotifier) StateFeed() *event.Feed {
msn.feedLock.Lock()
defer msn.feedLock.Unlock()
if msn.feed == nil && msn.recvCh == nil {
msn.feed = new(event.Feed)
if msn.RecordEvents {

View File

@ -20,6 +20,7 @@ go_library(
"monitoring.go",
"options.go",
"pubsub.go",
"pubsub_filter.go",
"rpc_topic_mappings.go",
"sender.go",
"service.go",
@ -101,6 +102,7 @@ go_test(
"gossip_topic_mappings_test.go",
"options_test.go",
"parameter_test.go",
"pubsub_filter_test.go",
"pubsub_test.go",
"rpc_topic_mappings_test.go",
"sender_test.go",
@ -116,6 +118,7 @@ go_test(
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
@ -123,6 +126,7 @@ go_test(
"//proto/beacon/p2p/v1:go_default_library",
"//proto/testing:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/event:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/iputils:go_default_library",
@ -131,6 +135,7 @@ go_test(
"//shared/testutil:go_default_library",
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"//shared/timeutils:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/discover:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
@ -151,5 +156,6 @@ go_test(
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@com_github_stretchr_testify//require:go_default_library",
],
)

View File

@ -189,6 +189,7 @@ func TestStaticPeering_PeersAreAdded(t *testing.T) {
s.Start()
<-exitRoutine
}()
time.Sleep(50 * time.Millisecond)
// Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed).
for sent := 0; sent == 0; {
sent = s.stateNotifier.StateFeed().Send(&feed.Event{

View File

@ -30,6 +30,9 @@ var bufReaderPool = new(sync.Pool)
// with snappy compression (if enabled).
type SszNetworkEncoder struct{}
// ProtocolSuffixSSZSnappy is the last part of the topic string to identify the encoding protocol.
const ProtocolSuffixSSZSnappy = "ssz_snappy"
func (e SszNetworkEncoder) doEncode(msg interface{}) ([]byte, error) {
if v, ok := msg.(fastssz.Marshaler); ok {
return v.MarshalSSZ()
@ -136,7 +139,7 @@ func (e SszNetworkEncoder) DecodeWithMaxLength(r io.Reader, to interface{}) erro
// ProtocolSuffix returns the appropriate suffix for protocol IDs.
func (e SszNetworkEncoder) ProtocolSuffix() string {
return "/ssz_snappy"
return "/" + ProtocolSuffixSSZSnappy
}
// MaxLength specifies the maximum possible length of an encoded

View File

@ -22,7 +22,14 @@ var eth2ENRKey = params.BeaconNetworkConfig().ETH2Key
// ForkDigest returns the current fork digest of
// the node.
func (s *Service) forkDigest() ([4]byte, error) {
return p2putils.CreateForkDigest(s.genesisTime, s.genesisValidatorsRoot)
if s.currentForkDigest != [4]byte{} {
return s.currentForkDigest, nil
}
fd, err := p2putils.CreateForkDigest(s.genesisTime, s.genesisValidatorsRoot)
if err != nil {
s.currentForkDigest = fd
}
return fd, err
}
// Compares fork ENRs between an incoming peer's record and our node's

View File

@ -14,6 +14,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
ma "github.com/multiformats/go-multiaddr"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
@ -32,7 +33,10 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {
genesisTime := time.Now()
genesisValidatorsRoot := make([]byte, 32)
s := &Service{
cfg: &Config{UDPPort: uint(port)},
cfg: &Config{
UDPPort: uint(port),
StateNotifier: &mock.MockStateNotifier{},
},
genesisTime: genesisTime,
genesisValidatorsRoot: genesisValidatorsRoot,
}
@ -44,6 +48,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {
cfg := &Config{
Discv5BootStrapAddr: []string{bootNode.String()},
UDPPort: uint(port),
StateNotifier: &mock.MockStateNotifier{},
}
var listeners []*discover.UDPv5
@ -119,6 +124,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
cfg: &Config{UDPPort: uint(port)},
genesisTime: genesisTime,
genesisValidatorsRoot: genesisValidatorsRoot,
stateNotifier: &mock.MockStateNotifier{},
}
bootListener, err := s.createListener(ipAddr, pkey)
require.NoError(t, err)
@ -149,6 +155,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
cfg: cfg,
genesisTime: genesisTime,
genesisValidatorsRoot: genesisValidatorsRoot,
stateNotifier: &mock.MockStateNotifier{},
}
listener, err := s.startDiscoveryV5(ipAddr, pkey)
assert.NoError(t, err, "Could not start discovery for node")
@ -176,6 +183,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
cfg.UDPPort = 14000
cfg.TCPPort = 14001
cfg.MaxPeers = 30
cfg.StateNotifier = &mock.MockStateNotifier{}
s, err = NewService(context.Background(), cfg)
require.NoError(t, err)

View File

@ -69,6 +69,8 @@ func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte,
// SubscribeToTopic joins (if necessary) and subscribes to PubSub topic.
func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error) {
s.awaitStateInitialized() // Genesis time and genesis validator root are required to subscribe.
topicHandle, err := s.JoinTopic(topic)
if err != nil {
return nil, err

View File

@ -0,0 +1,81 @@
package p2p
import (
"fmt"
"strings"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
)
var _ pubsub.SubscriptionFilter = (*Service)(nil)
const pubsubSubscriptionRequestLimit = 100
// CanSubscribe returns true if the topic is of interest and we could subscribe to it.
func (s *Service) CanSubscribe(topic string) bool {
if !s.isInitialized() {
return false
}
parts := strings.Split(topic, "/")
if len(parts) != 5 {
return false
}
// The topic must start with a slash, which means the first part will be empty.
if parts[0] != "" {
return false
}
if parts[1] != "eth2" {
return false
}
fd, err := s.forkDigest()
if err != nil {
log.WithError(err).Error("Could not determine fork digest")
return false
}
if parts[2] != fmt.Sprintf("%x", fd) {
return false
}
if parts[4] != encoder.ProtocolSuffixSSZSnappy {
return false
}
// Check the incoming topic matches any topic mapping. This includes a check for part[3].
for gt := range GossipTopicMappings {
if _, err := scanfcheck(strings.Join(parts[0:4], "/"), gt); err == nil {
return true
}
}
return false
}
// FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications.
// This method returns only the topics of interest and may return an error if the subscription
// request contains too many topics.
func (sf *Service) FilterIncomingSubscriptions(_ peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) {
if len(subs) > pubsubSubscriptionRequestLimit {
return nil, pubsub.ErrTooManySubscriptions
}
return pubsub.FilterSubscriptions(subs, sf.CanSubscribe), nil
}
// scanfcheck uses fmt.Sscanf to check that a given string matches expected format. This method
// returns the number of formatting substitutions matched and error if the string does not match
// the expected format. Note: this method only accepts integer compatible formatting substitutions
// such as %d or %x.
func scanfcheck(input, format string) (int, error) {
var t int
// Sscanf requires argument pointers with the appropriate type to load the value from the input.
// This method only checks that the input conforms to the format, the arguments are not used and
// therefore we can reuse the same integer pointer.
var cnt = strings.Count(format, "%")
var args = []interface{}{}
for i := 0; i < cnt; i++ {
args = append(args, &t)
}
return fmt.Sscanf(input, format, args...)
}

View File

@ -0,0 +1,363 @@
package p2p
import (
"context"
"fmt"
"reflect"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/peer"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/timeutils"
"github.com/stretchr/testify/require"
)
func TestService_CanSubscribe(t *testing.T) {
currentFork := [4]byte{0x01, 0x02, 0x03, 0x04}
validProtocolSuffix := "/" + encoder.ProtocolSuffixSSZSnappy
type test struct {
name string
topic string
want bool
}
tests := []test{
{
name: "block topic on current fork",
topic: fmt.Sprintf(BlockSubnetTopicFormat, currentFork) + validProtocolSuffix,
want: true,
},
{
name: "block topic on unknown fork",
topic: fmt.Sprintf(BlockSubnetTopicFormat, [4]byte{0xFF, 0xEE, 0x56, 0x21}) + validProtocolSuffix,
want: false,
},
{
name: "block topic missing protocol suffix",
topic: fmt.Sprintf(BlockSubnetTopicFormat, currentFork),
want: false,
},
{
name: "block topic wrong protocol suffix",
topic: fmt.Sprintf(BlockSubnetTopicFormat, currentFork) + "/foobar",
want: false,
},
{
name: "erroneous topic",
topic: "hey, want to foobar?",
want: false,
},
{
name: "erroneous topic that has the correct amount of slashes",
topic: "hey, want to foobar?////",
want: false,
},
{
name: "bad prefix",
topic: fmt.Sprintf("/eth3/%x/foobar", currentFork) + validProtocolSuffix,
want: false,
},
{
name: "topic not in gossip mapping",
topic: fmt.Sprintf("/eth2/%x/foobar", currentFork) + validProtocolSuffix,
want: false,
},
{
name: "att subnet topic on current fork",
topic: fmt.Sprintf(AttestationSubnetTopicFormat, currentFork, 55 /*subnet*/) + validProtocolSuffix,
want: true,
},
{
name: "att subnet topic on unknown fork",
topic: fmt.Sprintf(AttestationSubnetTopicFormat, [4]byte{0xCC, 0xBB, 0xAA, 0xA1} /*fork digest*/, 54 /*subnet*/) + validProtocolSuffix,
want: false,
},
}
// Ensure all gossip topic mappings pass validation.
for topic := range GossipTopicMappings {
formatting := []interface{}{currentFork}
// Special case for attestation subnets which have a second formatting placeholder.
if topic == AttestationSubnetTopicFormat {
formatting = append(formatting, 0 /* some subnet ID */)
}
tt := test{
name: topic,
topic: fmt.Sprintf(topic, formatting...) + validProtocolSuffix,
want: true,
}
tests = append(tests, tt)
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &Service{
currentForkDigest: currentFork,
genesisValidatorsRoot: make([]byte, 32),
genesisTime: time.Now(),
}
if got := s.CanSubscribe(tt.topic); got != tt.want {
t.Errorf("CanSubscribe(%s) = %v, want %v", tt.topic, got, tt.want)
}
})
}
}
func TestService_CanSubscribe_uninitialized(t *testing.T) {
s := &Service{}
require.False(t, s.CanSubscribe("foo"))
}
func Test_scanfcheck(t *testing.T) {
type args struct {
input string
format string
}
tests := []struct {
name string
args args
want int
wantErr bool
}{
{
name: "no formatting, exact match",
args: args{
input: "/foo/bar/zzzzzzzzzzzz/1234567",
format: "/foo/bar/zzzzzzzzzzzz/1234567",
},
want: 0,
wantErr: false,
},
{
name: "no formatting, mismatch",
args: args{
input: "/foo/bar/zzzzzzzzzzzz/1234567",
format: "/bar/foo/yyyyyy/7654321",
},
want: 0,
wantErr: true,
},
{
name: "formatting, match",
args: args{
input: "/foo/bar/abcdef/topic_11",
format: "/foo/bar/%x/topic_%d",
},
want: 2,
wantErr: false,
},
{
name: "formatting, incompatible bytes",
args: args{
input: "/foo/bar/zzzzzz/topic_11",
format: "/foo/bar/%x/topic_%d",
},
want: 0,
wantErr: true,
},
{ // Note: This method only supports integer compatible formatting values.
name: "formatting, string match",
args: args{
input: "/foo/bar/zzzzzz/topic_11",
format: "/foo/bar/%s/topic_%d",
},
want: 0,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := scanfcheck(tt.args.input, tt.args.format)
if (err != nil) != tt.wantErr {
t.Errorf("scanfcheck() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("scanfcheck() got = %v, want %v", got, tt.want)
}
})
}
}
func TestGossipTopicMapping_scanfcheck_GossipTopicFormattingSanityCheck(t *testing.T) {
// scanfcheck only supports integer based substitutions at the moment. Any others will
// inaccurately fail validation.
for topic := range GossipTopicMappings {
t.Run(topic, func(t *testing.T) {
for i, c := range topic {
if string(c) == "%" {
next := string(topic[i+1])
if next != "d" && next != "x" {
t.Errorf("Topic %s has formatting incompatiable with scanfcheck. Only %%d and %%x are supported", topic)
}
}
}
})
}
}
func TestService_FilterIncomingSubscriptions(t *testing.T) {
currentFork := [4]byte{0x01, 0x02, 0x03, 0x04}
validProtocolSuffix := "/" + encoder.ProtocolSuffixSSZSnappy
type args struct {
id peer.ID
subs []*pubsubpb.RPC_SubOpts
}
tests := []struct {
name string
args args
want []*pubsubpb.RPC_SubOpts
wantErr bool
}{
{
name: "too many topics",
args: args{
subs: make([]*pubsubpb.RPC_SubOpts, pubsubSubscriptionRequestLimit+1),
},
wantErr: true,
},
{
name: "exactly topic limit",
args: args{
subs: make([]*pubsubpb.RPC_SubOpts, pubsubSubscriptionRequestLimit),
},
wantErr: false,
want: nil, // No topics matched filters.
},
{
name: "blocks topic",
args: args{
subs: []*pubsubpb.RPC_SubOpts{
{
Subscribe: func() *bool {
b := true
return &b
}(),
Topicid: func() *string {
s := fmt.Sprintf(BlockSubnetTopicFormat, currentFork) + validProtocolSuffix
return &s
}(),
},
},
},
wantErr: false,
want: []*pubsubpb.RPC_SubOpts{
{
Subscribe: func() *bool {
b := true
return &b
}(),
Topicid: func() *string {
s := fmt.Sprintf(BlockSubnetTopicFormat, currentFork) + validProtocolSuffix
return &s
}(),
},
},
},
{
name: "blocks topic duplicated",
args: args{
subs: []*pubsubpb.RPC_SubOpts{
{
Subscribe: func() *bool {
b := true
return &b
}(),
Topicid: func() *string {
s := fmt.Sprintf(BlockSubnetTopicFormat, currentFork) + validProtocolSuffix
return &s
}(),
},
{
Subscribe: func() *bool {
b := true
return &b
}(),
Topicid: func() *string {
s := fmt.Sprintf(BlockSubnetTopicFormat, currentFork) + validProtocolSuffix
return &s
}(),
},
},
},
wantErr: false,
want: []*pubsubpb.RPC_SubOpts{ // Duplicated topics are only present once after filtering.
{
Subscribe: func() *bool {
b := true
return &b
}(),
Topicid: func() *string {
s := fmt.Sprintf(BlockSubnetTopicFormat, currentFork) + validProtocolSuffix
return &s
}(),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &Service{
currentForkDigest: currentFork,
genesisValidatorsRoot: make([]byte, 32),
genesisTime: time.Now(),
}
got, err := s.FilterIncomingSubscriptions(tt.args.id, tt.args.subs)
if (err != nil) != tt.wantErr {
t.Errorf("FilterIncomingSubscriptions() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("FilterIncomingSubscriptions() got = %v, want %v", got, tt.want)
}
})
}
}
func TestService_MonitorsStateForkUpdates(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
notifier := &mock.MockStateNotifier{}
s, err := NewService(ctx, &Config{
StateNotifier: notifier,
})
require.NoError(t, err)
require.False(t, s.isInitialized())
go s.awaitStateInitialized()
for n := 0; n == 0; {
if ctx.Err() != nil {
t.Fatal(ctx.Err())
}
n = notifier.StateFeed().Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: timeutils.Now(),
GenesisValidatorsRoot: bytesutil.PadTo([]byte("genesis"), 32),
},
})
}
time.Sleep(50 * time.Millisecond)
require.True(t, s.isInitialized())
require.NotEmpty(t, s.currentForkDigest)
}
func TestService_doesntSupportForksYet(t *testing.T) {
// Part of phase 1 will include a state transition which updates the state's fork. In phase 0,
// there are no forks or fork schedule planned. As such, we'll work on supporting fork upgrades
// in phase 1 changes.
if len(params.BeaconConfig().ForkVersionSchedule) > 0 {
t.Fatal("pubsub subscription filters do not support fork schedule (yet)")
}
}

View File

@ -5,24 +5,37 @@ import (
"fmt"
"sync"
"testing"
"time"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestService_PublishToTopicConcurrentMapWrite(t *testing.T) {
s, err := NewService(context.Background(), &Config{})
s, err := NewService(context.Background(), &Config{
StateNotifier: &mock.MockStateNotifier{},
})
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
go s.awaitStateInitialized()
fd := initializeStateWithForkDigest(ctx, t, s.stateNotifier.StateFeed())
if !s.isInitialized() {
t.Fatal("service was not initialized")
}
wg := sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {
go func(i int) {
assert.NoError(t, s.PublishToTopic(ctx, fmt.Sprintf("foo%v", i), []byte{}))
topic := fmt.Sprintf(AttestationSubnetTopicFormat, fd, i) + "/" + encoder.ProtocolSuffixSSZSnappy
assert.NoError(t, s.PublishToTopic(ctx, topic, []byte{}))
wg.Done()
}(i)
}

View File

@ -64,6 +64,7 @@ var maxDialTimeout = params.BeaconNetworkConfig().RespTimeout
type Service struct {
started bool
isPreGenesis bool
currentForkDigest [4]byte
pingMethod func(ctx context.Context, id peer.ID) error
cancel context.CancelFunc
cfg *Config
@ -78,6 +79,7 @@ type Service struct {
joinedTopicsLock sync.Mutex
subnetsLock map[uint64]*sync.RWMutex
subnetsLockLock sync.Mutex // Lock access to subnetsLock
initializationLock sync.Mutex
dv5Listener Listener
startupErr error
stateNotifier statefeed.Notifier
@ -152,6 +154,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
pubsub.WithNoAuthor(),
pubsub.WithMessageIdFn(msgIDFunction),
pubsub.WithSubscriptionFilter(s),
}
// Set the pubsub global parameters that we require.
setPubSubParameters()
@ -368,6 +371,13 @@ func (s *Service) pingPeers() {
// for initializing the p2p service as p2p needs to be aware
// of genesis information for peering.
func (s *Service) awaitStateInitialized() {
s.initializationLock.Lock()
defer s.initializationLock.Unlock()
if s.isInitialized() {
return
}
stateChannel := make(chan *feed.Event, 1)
stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel)
cleanup := stateSub.Unsubscribe
@ -384,6 +394,11 @@ func (s *Service) awaitStateInitialized() {
}
s.genesisTime = data.StartTime
s.genesisValidatorsRoot = data.GenesisValidatorsRoot
_, err := s.forkDigest() // initialize fork digest cache
if err != nil {
log.WithError(err).Error("Could not initialize fork digest")
}
return
}
case <-s.ctx.Done():
@ -448,3 +463,9 @@ func (s *Service) connectToBootnodes() error {
s.connectWithAllPeers(multiAddresses)
return nil
}
// Returns true if the service is aware of the genesis time and genesis validator root. This is
// required for discovery and pubsub validation.
func (s *Service) isInitialized() bool {
return !s.genesisTime.IsZero() && len(s.genesisValidatorsRoot) == 32
}

View File

@ -17,8 +17,13 @@ import (
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/p2putils"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/prysmaticlabs/prysm/shared/timeutils"
logTest "github.com/sirupsen/logrus/hooks/test"
)
@ -71,7 +76,7 @@ func createHost(t *testing.T, port int) (host.Host, *ecdsa.PrivateKey, net.IP) {
}
func TestService_Stop_SetsStartedToFalse(t *testing.T) {
s, err := NewService(context.Background(), &Config{})
s, err := NewService(context.Background(), &Config{StateNotifier: &mock.MockStateNotifier{}})
require.NoError(t, err)
s.started = true
s.dv5Listener = &mockListener{}
@ -80,7 +85,7 @@ func TestService_Stop_SetsStartedToFalse(t *testing.T) {
}
func TestService_Stop_DontPanicIfDv5ListenerIsNotInited(t *testing.T) {
s, err := NewService(context.Background(), &Config{})
s, err := NewService(context.Background(), &Config{StateNotifier: &mock.MockStateNotifier{}})
require.NoError(t, err)
assert.NoError(t, s.Stop())
}
@ -89,8 +94,9 @@ func TestService_Start_OnlyStartsOnce(t *testing.T) {
hook := logTest.NewGlobal()
cfg := &Config{
TCPPort: 2000,
UDPPort: 2000,
TCPPort: 2000,
UDPPort: 2000,
StateNotifier: &mock.MockStateNotifier{},
}
s, err := NewService(context.Background(), cfg)
require.NoError(t, err)
@ -127,12 +133,13 @@ func TestService_Status_NotRunning(t *testing.T) {
func TestListenForNewNodes(t *testing.T) {
// Setup bootnode.
cfg := &Config{}
notifier := &mock.MockStateNotifier{}
cfg := &Config{StateNotifier: notifier}
port := 2000
cfg.UDPPort = uint(port)
_, pkey := createAddrAndPrivKey(t)
ipAddr := net.ParseIP("127.0.0.1")
genesisTime := time.Now()
genesisTime := timeutils.Now()
genesisValidatorsRoot := make([]byte, 32)
s := &Service{
cfg: cfg,
@ -159,6 +166,7 @@ func TestListenForNewNodes(t *testing.T) {
BootstrapNodeAddr: []string{bootNode.String()},
Discv5BootStrapAddr: []string{bootNode.String()},
MaxPeers: 30,
StateNotifier: notifier,
}
for i := 1; i <= 5; i++ {
h, pkey, ipAddr := createHost(t, port+i)
@ -195,12 +203,12 @@ func TestListenForNewNodes(t *testing.T) {
s, err = NewService(context.Background(), cfg)
require.NoError(t, err)
s.stateNotifier = &mock.MockStateNotifier{}
exitRoutine := make(chan bool)
go func() {
s.Start()
<-exitRoutine
}()
time.Sleep(1 * time.Second)
// Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed).
for sent := 0; sent == 0; {
sent = s.stateNotifier.StateFeed().Send(&feed.Event{
@ -249,15 +257,25 @@ func TestPeer_Disconnect(t *testing.T) {
}
func TestService_JoinLeaveTopic(t *testing.T) {
s, err := NewService(context.Background(), &Config{})
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
s, err := NewService(ctx, &Config{StateNotifier: &mock.MockStateNotifier{}})
require.NoError(t, err)
go s.awaitStateInitialized()
fd := initializeStateWithForkDigest(ctx, t, s.stateNotifier.StateFeed())
assert.Equal(t, 0, len(s.joinedTopics))
topic := fmt.Sprintf(AttestationSubnetTopicFormat, 42, 42)
topic := fmt.Sprintf(AttestationSubnetTopicFormat, fd, 42) + "/" + encoder.ProtocolSuffixSSZSnappy
topicHandle, err := s.JoinTopic(topic)
assert.NoError(t, err)
assert.Equal(t, 1, len(s.joinedTopics))
if topicHandle == nil {
t.Fatal("topic is nil")
}
sub, err := topicHandle.Subscribe()
assert.NoError(t, err)
@ -269,3 +287,29 @@ func TestService_JoinLeaveTopic(t *testing.T) {
sub.Cancel()
assert.NoError(t, s.LeaveTopic(topic))
}
// initializeStateWithForkDigest sets up the state feed initialized event and returns the fork
// digest associated with that genesis event.
func initializeStateWithForkDigest(ctx context.Context, t *testing.T, ef *event.Feed) [4]byte {
gt := timeutils.Now()
gvr := bytesutil.PadTo([]byte("genesis validator root"), 32)
for n := 0; n == 0; {
if ctx.Err() != nil {
t.Fatal(ctx.Err())
}
n = ef.Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: gt,
GenesisValidatorsRoot: gvr,
},
})
}
fd, err := p2putils.CreateForkDigest(gt, gvr)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // wait for pubsub filter to initialize.
return fd
}

View File

@ -86,6 +86,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
s.Start()
<-exitRoutine
}()
time.Sleep(50 * time.Millisecond)
// Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed).
for sent := 0; sent == 0; {
sent = s.stateNotifier.StateFeed().Send(&feed.Event{

View File

@ -9,18 +9,14 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
)
var errTooManyTopics = errors.New("too many topic IDs")
var errNilPubsubMessage = errors.New("nil pubsub message")
var errInvalidTopic = errors.New("invalid topic format")
func (s *Service) decodePubsubMessage(msg *pubsub.Message) (proto.Message, error) {
if msg == nil || msg.TopicIDs == nil {
if msg == nil || msg.Topic == nil || *msg.Topic == "" {
return nil, errNilPubsubMessage
}
if len(msg.TopicIDs) != 1 {
return nil, errTooManyTopics
}
topic := msg.TopicIDs[0]
topic := *msg.Topic
topic = strings.TrimSuffix(topic, s.p2p.Encoding().ProtocolSuffix())
topic, err := s.replaceForkDigest(topic)
if err != nil {

View File

@ -18,6 +18,7 @@ import (
func TestService_decodePubsubMessage(t *testing.T) {
tests := []struct {
name string
topic string
input *pubsub.Message
want proto.Message
wantErr error
@ -28,37 +29,29 @@ func TestService_decodePubsubMessage(t *testing.T) {
wantErr: errNilPubsubMessage,
},
{
name: "More than 1 topic ID",
name: "nil topic",
input: &pubsub.Message{
Message: &pb.Message{
TopicIDs: []string{"foo", "bar"},
Topic: nil,
},
},
wantErr: errTooManyTopics,
wantErr: errNilPubsubMessage,
},
{
name: "invalid topic format",
input: &pubsub.Message{
Message: &pb.Message{
TopicIDs: []string{"foo"}, // Topic should be in format of /eth2/%x/{something}.
},
},
name: "invalid topic format",
topic: "foo",
wantErr: errInvalidTopic,
},
{
name: "topic not mapped to any message type",
input: &pubsub.Message{
Message: &pb.Message{
TopicIDs: []string{"/eth2/abcdef/foo"},
},
},
name: "topic not mapped to any message type",
topic: "/eth2/abcdef/foo",
wantErr: p2p.ErrMessageNotMapped,
},
{
name: "valid message -- beacon block",
name: "valid message -- beacon block",
topic: p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlock{})],
input: &pubsub.Message{
Message: &pb.Message{
TopicIDs: []string{p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlock{})]},
Data: func() []byte {
buf := new(bytes.Buffer)
if _, err := p2ptesting.NewTestP2P(t).Encoding().EncodeGossip(buf, testutil.NewBeaconBlock()); err != nil {
@ -77,6 +70,14 @@ func TestService_decodePubsubMessage(t *testing.T) {
s := &Service{
p2p: p2ptesting.NewTestP2P(t),
}
if tt.topic != "" {
if tt.input == nil {
tt.input = &pubsub.Message{Message: &pb.Message{}}
} else if tt.input.Message == nil {
tt.input.Message = &pb.Message{}
}
tt.input.Message.Topic = &tt.topic
}
got, err := s.decodePubsubMessage(tt.input)
if err != tt.wantErr {
t.Errorf("decodePubsubMessage() error = %v, wantErr %v", err, tt.wantErr)

View File

@ -94,9 +94,10 @@ func (s *Service) subscribeWithBase(base proto.Message, topic string, validator
sub, err := s.p2p.SubscribeToTopic(topic)
if err != nil {
// Any error subscribing to a PubSub topic would be the result of a misconfiguration of
// libp2p PubSub library. This should not happen at normal runtime, unless the config
// changes to a fatal configuration.
panic(err)
// libp2p PubSub library or a subscription request to a topic that fails to match the topic
// subscription filter.
log.WithError(err).WithField("topic", topic).Error("Failed to subscribe to topic")
return nil
}
// Pipeline decodes the incoming subscription data, runs the validation, and handles the
@ -164,6 +165,10 @@ func (s *Service) wrapAndReportValidation(topic string, v pubsub.ValidatorEx) (s
ctx, cancel := context.WithTimeout(ctx, pubsubMessageTimeout)
defer cancel()
messageReceivedCounter.WithLabelValues(topic).Inc()
if msg.Topic == nil {
messageFailedValidationCounter.WithLabelValues(topic).Inc()
return pubsub.ValidationReject
}
// Reject any messages received before chainstart.
if !s.chainStarted {
messageFailedValidationCounter.WithLabelValues(topic).Inc()

View File

@ -12,6 +12,7 @@ import (
lru "github.com/hashicorp/golang-lru"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
@ -269,6 +270,14 @@ func Test_wrapAndReportValidation(t *testing.T) {
v: func(ctx context.Context, id peer.ID, message *pubsub.Message) pubsub.ValidationResult {
return pubsub.ValidationAccept
},
msg: &pubsub.Message{
Message: &pubsubpb.Message{
Topic: func() *string {
s := "foo"
return &s
}(),
},
},
chainstarted: false,
},
want: pubsub.ValidationReject,
@ -281,6 +290,14 @@ func Test_wrapAndReportValidation(t *testing.T) {
panic("oh no!")
},
chainstarted: true,
msg: &pubsub.Message{
Message: &pubsubpb.Message{
Topic: func() *string {
s := "foo"
return &s
}(),
},
},
},
want: pubsub.ValidationIgnore,
},
@ -292,9 +309,32 @@ func Test_wrapAndReportValidation(t *testing.T) {
return pubsub.ValidationAccept
},
chainstarted: true,
msg: &pubsub.Message{
Message: &pubsubpb.Message{
Topic: func() *string {
s := "foo"
return &s
}(),
},
},
},
want: pubsub.ValidationAccept,
},
{
name: "nil topic",
args: args{
topic: "foo",
v: func(ctx context.Context, id peer.ID, message *pubsub.Message) pubsub.ValidationResult {
return pubsub.ValidationAccept
},
msg: &pubsub.Message{
Message: &pubsubpb.Message{
Topic: nil,
},
},
},
want: pubsub.ValidationReject,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

View File

@ -165,12 +165,11 @@ func TestValidateAggregateAndProof_NoBlock(t *testing.T) {
_, err = p.Encoding().EncodeGossip(buf, signedAggregateAndProof)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)]
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
@ -235,12 +234,11 @@ func TestValidateAggregateAndProof_NotWithinSlotRange(t *testing.T) {
_, err = p.Encoding().EncodeGossip(buf, signedAggregateAndProof)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)]
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
@ -256,10 +254,8 @@ func TestValidateAggregateAndProof_NotWithinSlotRange(t *testing.T) {
msg = &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
if r.validateAggregateAndProof(context.Background(), "", msg) == pubsub.ValidationAccept {
@ -318,12 +314,11 @@ func TestValidateAggregateAndProof_ExistedInPool(t *testing.T) {
_, err = p.Encoding().EncodeGossip(buf, signedAggregateAndProof)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)]
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
@ -409,12 +404,11 @@ func TestValidateAggregateAndProof_CanValidate(t *testing.T) {
_, err = p.Encoding().EncodeGossip(buf, signedAggregateAndProof)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)]
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
@ -500,12 +494,11 @@ func TestValidateAggregateAndProofUseCheckptCache_CanValidate(t *testing.T) {
_, err = p.Encoding().EncodeGossip(buf, signedAggregateAndProof)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)]
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
@ -589,12 +582,11 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
_, err = p.Encoding().EncodeGossip(buf, signedAggregateAndProof)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)]
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
@ -607,10 +599,8 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
require.NoError(t, err)
msg = &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
@ -697,12 +687,11 @@ func TestValidateAggregateAndProof_BadBlock(t *testing.T) {
_, err = p.Encoding().EncodeGossip(buf, signedAggregateAndProof)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)]
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}

View File

@ -101,12 +101,11 @@ func TestValidateAttesterSlashing_ValidSlashing(t *testing.T) {
_, err = p.Encoding().EncodeGossip(buf, slashing)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(slashing)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
valid := r.validateAttesterSlashing(ctx, "foobar", msg) == pubsub.ValidationAccept
@ -137,12 +136,11 @@ func TestValidateAttesterSlashing_ContextTimeout(t *testing.T) {
_, err = p.Encoding().EncodeGossip(buf, slashing)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(slashing)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
valid := r.validateAttesterSlashing(ctx, "", msg) == pubsub.ValidationAccept
@ -164,12 +162,12 @@ func TestValidateAttesterSlashing_Syncing(t *testing.T) {
buf := new(bytes.Buffer)
_, err := p.Encoding().EncodeGossip(buf, slashing)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(slashing)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
valid := r.validateAttesterSlashing(ctx, "", msg) == pubsub.ValidationAccept

View File

@ -36,10 +36,14 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
ctx, span := trace.StartSpan(ctx, "sync.validateCommitteeIndexBeaconAttestation")
defer span.End()
if msg.Topic == nil {
return pubsub.ValidationReject
}
// Override topic for decoding.
originalTopic := msg.TopicIDs[0]
originalTopic := msg.Topic
format := p2p.GossipTypeMapping[reflect.TypeOf(&eth.Attestation{})]
msg.TopicIDs[0] = format
msg.Topic = &format
m, err := s.decodePubsubMessage(msg)
if err != nil {
@ -48,7 +52,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
return pubsub.ValidationReject
}
// Restore topic.
msg.TopicIDs[0] = originalTopic
msg.Topic = originalTopic
att, ok := m.(*eth.Attestation)
if !ok {
@ -121,7 +125,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
}
// Is the attestation subnet correct.
subnet := helpers.ComputeSubnetForAttestation(uint64(len(indices)), att)
if !strings.HasPrefix(originalTopic, fmt.Sprintf(format, digest, subnet)) {
if !strings.HasPrefix(*originalTopic, fmt.Sprintf(format, digest, subnet)) {
return pubsub.ValidationReject
}
committee, err := helpers.BeaconCommittee(indices, bytesutil.ToBytes32(c.Seed), att.Data.Slot, att.Data.CommitteeIndex)
@ -167,7 +171,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
}
subnet := helpers.ComputeSubnetForAttestation(valCount, att)
if !strings.HasPrefix(originalTopic, fmt.Sprintf(format, digest, subnet)) {
if !strings.HasPrefix(*originalTopic, fmt.Sprintf(format, digest, subnet)) {
return pubsub.ValidationReject
}

View File

@ -95,6 +95,25 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) {
validAttestationSignature: true,
want: true,
},
{
name: "valid attestation signature with nil topic",
msg: &ethpb.Attestation{
AggregationBits: bitfield.Bitlist{0b101},
Data: &ethpb.AttestationData{
BeaconBlockRoot: validBlockRoot[:],
CommitteeIndex: 0,
Slot: 1,
Target: &ethpb.Checkpoint{
Epoch: 0,
Root: validBlockRoot[:],
},
Source: &ethpb.Checkpoint{Root: make([]byte, 32)},
},
},
topic: "",
validAttestationSignature: true,
want: false,
},
{
name: "bad target epoch",
msg: &ethpb.Attestation{
@ -253,10 +272,13 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) {
require.NoError(t, err)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{tt.topic},
Data: buf.Bytes(),
Topic: &tt.topic,
},
}
if tt.topic == "" {
m.Message.Topic = nil
}
received := s.validateCommitteeIndexBeaconAttestation(ctx, "" /*peerID*/, m) == pubsub.ValidationAccept
if received != tt.want {
t.Fatalf("Did not received wanted validation. Got %v, wanted %v", !tt.want, tt.want)
@ -498,8 +520,8 @@ func TestService_validateCommitteeIndexBeaconAttestationUseCheckptCache(t *testi
require.NoError(t, err)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{tt.topic},
Data: buf.Bytes(),
Topic: &tt.topic,
},
}
received := s.validateCommitteeIndexBeaconAttestation(ctx, "" /*peerID*/, m) == pubsub.ValidationAccept

View File

@ -67,12 +67,11 @@ func TestValidateBeaconBlockPubSub_InvalidSignature(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(msg)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
result := r.validateBeaconBlockPubSub(ctx, "", m) == pubsub.ValidationAccept
@ -109,12 +108,11 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) {
_, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(msg)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
result := r.validateBeaconBlockPubSub(ctx, "", m) == pubsub.ValidationAccept
@ -171,12 +169,11 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(msg)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
result := r.validateBeaconBlockPubSub(ctx, "", m) == pubsub.ValidationAccept
@ -235,12 +232,11 @@ func TestValidateBeaconBlockPubSub_AdvanceEpochsForState(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(msg)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
result := r.validateBeaconBlockPubSub(ctx, "", m) == pubsub.ValidationAccept
@ -275,12 +271,11 @@ func TestValidateBeaconBlockPubSub_Syncing(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(msg)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
result := r.validateBeaconBlockPubSub(ctx, "", m) == pubsub.ValidationAccept
@ -320,12 +315,11 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(msg)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
result := r.validateBeaconBlockPubSub(ctx, "", m) == pubsub.ValidationAccept
@ -369,12 +363,11 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromThePast(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(msg)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
result := r.validateBeaconBlockPubSub(ctx, "", m) == pubsub.ValidationAccept
@ -428,12 +421,11 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(msg)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
r.setSeenBlockIndexSlot(msg.Block.Slot, msg.Block.ProposerIndex)
@ -476,12 +468,11 @@ func TestValidateBeaconBlockPubSub_FilterByFinalizedEpoch(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, b)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(b)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(b)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
@ -495,10 +486,8 @@ func TestValidateBeaconBlockPubSub_FilterByFinalizedEpoch(t *testing.T) {
require.NoError(t, err)
m = &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(b)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
@ -558,12 +547,11 @@ func TestValidateBeaconBlockPubSub_ParentNotFinalizedDescendant(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(msg)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
assert.Equal(t, pubsub.ValidationReject, r.validateBeaconBlockPubSub(ctx, "", m), "Wrong validation result returned")
@ -623,12 +611,11 @@ func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(msg)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
result := r.validateBeaconBlockPubSub(ctx, "", m) == pubsub.ValidationAccept
@ -648,10 +635,8 @@ func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) {
require.NoError(t, err)
m = &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(msg)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
@ -727,12 +712,11 @@ func TestValidateBeaconBlockPubSub_RejectEvilBlocksFromFuture(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(msg)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
result := r.validateBeaconBlockPubSub(ctx, "", m) == pubsub.ValidationAccept

View File

@ -122,12 +122,11 @@ func TestValidateProposerSlashing_ValidSlashing(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, slashing)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(slashing)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
@ -160,12 +159,11 @@ func TestValidateProposerSlashing_ContextTimeout(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, slashing)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(slashing)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
valid := r.validateProposerSlashing(ctx, "", m) == pubsub.ValidationAccept
@ -187,12 +185,11 @@ func TestValidateProposerSlashing_Syncing(t *testing.T) {
buf := new(bytes.Buffer)
_, err := p.Encoding().EncodeGossip(buf, slashing)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(slashing)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
valid := r.validateProposerSlashing(ctx, "", m) == pubsub.ValidationAccept

View File

@ -85,12 +85,11 @@ func TestValidateVoluntaryExit_ValidExit(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, exit)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(exit)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(exit)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
valid := r.validateVoluntaryExit(ctx, "", m) == pubsub.ValidationAccept
@ -119,12 +118,11 @@ func TestValidateVoluntaryExit_InvalidExitSlot(t *testing.T) {
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, exit)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(exit)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(exit)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
valid := r.validateVoluntaryExit(ctx, "", m) == pubsub.ValidationAccept
@ -147,12 +145,11 @@ func TestValidateVoluntaryExit_ValidExit_Syncing(t *testing.T) {
buf := new(bytes.Buffer)
_, err := p.Encoding().EncodeGossip(buf, exit)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(exit)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(exit)],
},
Data: buf.Bytes(),
Topic: &topic,
},
}
valid := r.validateVoluntaryExit(ctx, "", m) == pubsub.ValidationAccept

View File

@ -2278,8 +2278,8 @@ def prysm_deps():
name = "com_github_libp2p_go_libp2p_pubsub",
build_file_proto_mode = "disable_global",
importpath = "github.com/libp2p/go-libp2p-pubsub",
sum = "h1:/AzOAmjDc+IJWybEzhYj1UaV1HErqmo4v3pQVepbgi8=",
version = "v0.3.3",
sum = "h1:9oO8W7qIWCYQYyz5z8nUsPcb3rrFehBlkbqvbSVjBxY=",
version = "v0.3.6",
)
go_repository(
name = "com_github_libp2p_go_libp2p_record",

View File

@ -176,8 +176,11 @@ func BeaconFuzzBlock(b []byte) {
pid := peer.ID("fuzz")
msg := &pubsub.Message{
Message: &pubsub_pb.Message{
Data: buf.Bytes(),
TopicIDs: []string{topic},
Data: buf.Bytes(),
Topic: func() *string {
tpc := topic
return &tpc
}(),
},
}

3
go.mod
View File

@ -60,7 +60,7 @@ require (
github.com/libp2p/go-libp2p-core v0.6.1
github.com/libp2p/go-libp2p-crypto v0.1.0
github.com/libp2p/go-libp2p-noise v0.1.1
github.com/libp2p/go-libp2p-pubsub v0.3.3
github.com/libp2p/go-libp2p-pubsub v0.3.6
github.com/libp2p/go-libp2p-secio v0.2.2
github.com/libp2p/go-libp2p-swarm v0.2.8
github.com/libp2p/go-libp2p-tls v0.1.4-0.20200421131144-8a8ad624a291 // indirect
@ -96,6 +96,7 @@ require (
github.com/schollz/progressbar/v3 v3.3.4
github.com/sirupsen/logrus v1.6.0
github.com/status-im/keycard-go v0.0.0-20200402102358-957c09536969 // indirect
github.com/stretchr/testify v1.6.1
github.com/supranational/blst v0.1.2-alpha.1.0.20200917144033-cd0847a7580b
github.com/tyler-smith/go-bip39 v1.0.2
github.com/urfave/cli/v2 v2.2.0

4
go.sum
View File

@ -587,8 +587,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.6 h1:2ACefBX23iMdJU9Ke+dcXt3w86MIryes
github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-pubsub v0.3.3 h1:/AzOAmjDc+IJWybEzhYj1UaV1HErqmo4v3pQVepbgi8=
github.com/libp2p/go-libp2p-pubsub v0.3.3/go.mod h1:DTMSVmZZfXodB/pvdTGrY2eHPZ9W2ev7hzTH83OKHrI=
github.com/libp2p/go-libp2p-pubsub v0.3.6 h1:9oO8W7qIWCYQYyz5z8nUsPcb3rrFehBlkbqvbSVjBxY=
github.com/libp2p/go-libp2p-pubsub v0.3.6/go.mod h1:DTMSVmZZfXodB/pvdTGrY2eHPZ9W2ev7hzTH83OKHrI=
github.com/libp2p/go-libp2p-secio v0.1.0/go.mod h1:tMJo2w7h3+wN4pgU2LSYeiKPrfqBgkOsdiKK77hE7c8=
github.com/libp2p/go-libp2p-secio v0.2.0/go.mod h1:2JdZepB8J5V9mBp79BmwsaPQhRPNN2NrnB2lKQcdy6g=
github.com/libp2p/go-libp2p-secio v0.2.1/go.mod h1:cWtZpILJqkqrSkiYcDBh5lA3wbT2Q+hz3rJQq3iftD8=