mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-25 21:07:18 +00:00
16bbf5602f
* builds * move block to consensus-types Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
493 lines
15 KiB
Go
493 lines
15 KiB
Go
package p2p
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/p2p/discover"
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
|
"github.com/ethereum/go-ethereum/p2p/enr"
|
|
"github.com/libp2p/go-libp2p-core/crypto"
|
|
"github.com/prysmaticlabs/go-bitfield"
|
|
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
|
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
|
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
|
|
"github.com/prysmaticlabs/prysm/consensus-types/wrapper"
|
|
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
|
"github.com/prysmaticlabs/prysm/testing/assert"
|
|
"github.com/prysmaticlabs/prysm/testing/require"
|
|
)
|
|
|
|
func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
|
|
// This test needs to be entirely rewritten and should be done in a follow up PR from #7885.
|
|
t.Skip("This test is now failing after PR 7885 due to false positive")
|
|
gFlags := new(flags.GlobalFlags)
|
|
gFlags.MinimumPeersPerSubnet = 4
|
|
flags.Init(gFlags)
|
|
// Reset config.
|
|
defer flags.Init(new(flags.GlobalFlags))
|
|
port := 2000
|
|
ipAddr, pkey := createAddrAndPrivKey(t)
|
|
genesisTime := time.Now()
|
|
genesisValidatorsRoot := make([]byte, 32)
|
|
s := &Service{
|
|
cfg: &Config{UDPPort: uint(port)},
|
|
genesisTime: genesisTime,
|
|
genesisValidatorsRoot: genesisValidatorsRoot,
|
|
}
|
|
bootListener, err := s.createListener(ipAddr, pkey)
|
|
require.NoError(t, err)
|
|
defer bootListener.Close()
|
|
|
|
bootNode := bootListener.Self()
|
|
// Use shorter period for testing.
|
|
currentPeriod := pollingPeriod
|
|
pollingPeriod = 1 * time.Second
|
|
defer func() {
|
|
pollingPeriod = currentPeriod
|
|
}()
|
|
|
|
var listeners []*discover.UDPv5
|
|
for i := 1; i <= 3; i++ {
|
|
port = 3000 + i
|
|
cfg := &Config{
|
|
BootstrapNodeAddr: []string{bootNode.String()},
|
|
Discv5BootStrapAddr: []string{bootNode.String()},
|
|
MaxPeers: 30,
|
|
UDPPort: uint(port),
|
|
}
|
|
ipAddr, pkey := createAddrAndPrivKey(t)
|
|
s = &Service{
|
|
cfg: cfg,
|
|
genesisTime: genesisTime,
|
|
genesisValidatorsRoot: genesisValidatorsRoot,
|
|
}
|
|
listener, err := s.startDiscoveryV5(ipAddr, pkey)
|
|
assert.NoError(t, err, "Could not start discovery for node")
|
|
bitV := bitfield.NewBitvector64()
|
|
bitV.SetBitAt(uint64(i), true)
|
|
|
|
entry := enr.WithEntry(attSubnetEnrKey, &bitV)
|
|
listener.LocalNode().Set(entry)
|
|
listeners = append(listeners, listener)
|
|
}
|
|
defer func() {
|
|
// Close down all peers.
|
|
for _, listener := range listeners {
|
|
listener.Close()
|
|
}
|
|
}()
|
|
|
|
// Make one service on port 4001.
|
|
port = 4001
|
|
cfg := &Config{
|
|
BootstrapNodeAddr: []string{bootNode.String()},
|
|
Discv5BootStrapAddr: []string{bootNode.String()},
|
|
MaxPeers: 30,
|
|
UDPPort: uint(port),
|
|
}
|
|
cfg.StateNotifier = &mock.MockStateNotifier{}
|
|
s, err = NewService(context.Background(), cfg)
|
|
require.NoError(t, err)
|
|
exitRoutine := make(chan bool)
|
|
go func() {
|
|
s.Start()
|
|
<-exitRoutine
|
|
}()
|
|
time.Sleep(50 * time.Millisecond)
|
|
// Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed).
|
|
for sent := 0; sent == 0; {
|
|
sent = s.stateNotifier.StateFeed().Send(&feed.Event{
|
|
Type: statefeed.Initialized,
|
|
Data: &statefeed.InitializedData{
|
|
StartTime: time.Now(),
|
|
GenesisValidatorsRoot: make([]byte, 32),
|
|
},
|
|
})
|
|
}
|
|
|
|
// Wait for the nodes to have their local routing tables to be populated with the other nodes
|
|
time.Sleep(6 * discoveryWaitTime)
|
|
|
|
// look up 3 different subnets
|
|
ctx := context.Background()
|
|
exists, err := s.FindPeersWithSubnet(ctx, "", 1, flags.Get().MinimumPeersPerSubnet)
|
|
require.NoError(t, err)
|
|
exists2, err := s.FindPeersWithSubnet(ctx, "", 2, flags.Get().MinimumPeersPerSubnet)
|
|
require.NoError(t, err)
|
|
exists3, err := s.FindPeersWithSubnet(ctx, "", 3, flags.Get().MinimumPeersPerSubnet)
|
|
require.NoError(t, err)
|
|
if !exists || !exists2 || !exists3 {
|
|
t.Fatal("Peer with subnet doesn't exist")
|
|
}
|
|
|
|
// Update ENR of a peer.
|
|
testService := &Service{
|
|
dv5Listener: listeners[0],
|
|
metaData: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
|
|
Attnets: bitfield.NewBitvector64(),
|
|
}),
|
|
}
|
|
cache.SubnetIDs.AddAttesterSubnetID(0, 10)
|
|
testService.RefreshENR()
|
|
time.Sleep(2 * time.Second)
|
|
|
|
exists, err = s.FindPeersWithSubnet(ctx, "", 2, flags.Get().MinimumPeersPerSubnet)
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, true, exists, "Peer with subnet doesn't exist")
|
|
assert.NoError(t, s.Stop())
|
|
exitRoutine <- true
|
|
}
|
|
|
|
func Test_AttSubnets(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
record func(t *testing.T) *enr.Record
|
|
want []uint64
|
|
wantErr bool
|
|
errContains string
|
|
}{
|
|
{
|
|
name: "valid record",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
localNode = initializeAttSubnets(localNode)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: nil,
|
|
wantErr: false,
|
|
},
|
|
{
|
|
name: "too small subnet",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
entry := enr.WithEntry(attSubnetEnrKey, []byte{})
|
|
localNode.Set(entry)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: []uint64{},
|
|
wantErr: true,
|
|
errContains: "invalid bitvector provided, it has a size of",
|
|
},
|
|
{
|
|
name: "half sized subnet",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
entry := enr.WithEntry(attSubnetEnrKey, make([]byte, 4))
|
|
localNode.Set(entry)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: []uint64{},
|
|
wantErr: true,
|
|
errContains: "invalid bitvector provided, it has a size of",
|
|
},
|
|
{
|
|
name: "too large subnet",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
entry := enr.WithEntry(attSubnetEnrKey, make([]byte, byteCount(int(attestationSubnetCount))+1))
|
|
localNode.Set(entry)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: []uint64{},
|
|
wantErr: true,
|
|
errContains: "invalid bitvector provided, it has a size of",
|
|
},
|
|
{
|
|
name: "very large subnet",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
entry := enr.WithEntry(attSubnetEnrKey, make([]byte, byteCount(int(attestationSubnetCount))+100))
|
|
localNode.Set(entry)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: []uint64{},
|
|
wantErr: true,
|
|
errContains: "invalid bitvector provided, it has a size of",
|
|
},
|
|
{
|
|
name: "single subnet",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
bitV := bitfield.NewBitvector64()
|
|
bitV.SetBitAt(0, true)
|
|
entry := enr.WithEntry(attSubnetEnrKey, bitV.Bytes())
|
|
localNode.Set(entry)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: []uint64{0},
|
|
wantErr: false,
|
|
},
|
|
{
|
|
name: "multiple subnets",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
bitV := bitfield.NewBitvector64()
|
|
for i := uint64(0); i < bitV.Len(); i++ {
|
|
// skip 2 subnets
|
|
if (i+1)%2 == 0 {
|
|
continue
|
|
}
|
|
bitV.SetBitAt(i, true)
|
|
}
|
|
bitV.SetBitAt(0, true)
|
|
entry := enr.WithEntry(attSubnetEnrKey, bitV.Bytes())
|
|
localNode.Set(entry)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: []uint64{0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20,
|
|
22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48,
|
|
50, 52, 54, 56, 58, 60, 62},
|
|
wantErr: false,
|
|
},
|
|
{
|
|
name: "all subnets",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
bitV := bitfield.NewBitvector64()
|
|
for i := uint64(0); i < bitV.Len(); i++ {
|
|
bitV.SetBitAt(i, true)
|
|
}
|
|
entry := enr.WithEntry(attSubnetEnrKey, bitV.Bytes())
|
|
localNode.Set(entry)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
|
|
21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49,
|
|
50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63},
|
|
wantErr: false,
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
got, err := attSubnets(tt.record(t))
|
|
if (err != nil) != tt.wantErr {
|
|
t.Errorf("syncSubnets() error = %v, wantErr %v", err, tt.wantErr)
|
|
return
|
|
}
|
|
if tt.wantErr {
|
|
assert.ErrorContains(t, tt.errContains, err)
|
|
}
|
|
if !reflect.DeepEqual(got, tt.want) {
|
|
t.Errorf("syncSubnets() got = %v, want %v", got, tt.want)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func Test_SyncSubnets(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
record func(t *testing.T) *enr.Record
|
|
want []uint64
|
|
wantErr bool
|
|
errContains string
|
|
}{
|
|
{
|
|
name: "valid record",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
localNode = initializeSyncCommSubnets(localNode)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: nil,
|
|
wantErr: false,
|
|
},
|
|
{
|
|
name: "too small subnet",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
entry := enr.WithEntry(syncCommsSubnetEnrKey, []byte{})
|
|
localNode.Set(entry)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: []uint64{},
|
|
wantErr: true,
|
|
errContains: "invalid bitvector provided, it has a size of",
|
|
},
|
|
{
|
|
name: "too large subnet",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
entry := enr.WithEntry(syncCommsSubnetEnrKey, make([]byte, byteCount(int(syncCommsSubnetCount))+1))
|
|
localNode.Set(entry)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: []uint64{},
|
|
wantErr: true,
|
|
errContains: "invalid bitvector provided, it has a size of",
|
|
},
|
|
{
|
|
name: "very large subnet",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
entry := enr.WithEntry(syncCommsSubnetEnrKey, make([]byte, byteCount(int(syncCommsSubnetCount))+100))
|
|
localNode.Set(entry)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: []uint64{},
|
|
wantErr: true,
|
|
errContains: "invalid bitvector provided, it has a size of",
|
|
},
|
|
{
|
|
name: "single subnet",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
bitV := bitfield.Bitvector4{byte(0x00)}
|
|
bitV.SetBitAt(0, true)
|
|
entry := enr.WithEntry(syncCommsSubnetEnrKey, bitV.Bytes())
|
|
localNode.Set(entry)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: []uint64{0},
|
|
wantErr: false,
|
|
},
|
|
{
|
|
name: "multiple subnets",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
bitV := bitfield.Bitvector4{byte(0x00)}
|
|
for i := uint64(0); i < bitV.Len(); i++ {
|
|
// skip 2 subnets
|
|
if (i+1)%2 == 0 {
|
|
continue
|
|
}
|
|
bitV.SetBitAt(i, true)
|
|
}
|
|
bitV.SetBitAt(0, true)
|
|
entry := enr.WithEntry(syncCommsSubnetEnrKey, bitV.Bytes())
|
|
localNode.Set(entry)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: []uint64{0, 2},
|
|
wantErr: false,
|
|
},
|
|
{
|
|
name: "all subnets",
|
|
record: func(t *testing.T) *enr.Record {
|
|
db, err := enode.OpenDB("")
|
|
assert.NoError(t, err)
|
|
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
|
assert.NoError(t, err)
|
|
convertedKey := convertFromInterfacePrivKey(priv)
|
|
assert.NoError(t, err)
|
|
localNode := enode.NewLocalNode(db, convertedKey)
|
|
bitV := bitfield.Bitvector4{byte(0x00)}
|
|
for i := uint64(0); i < bitV.Len(); i++ {
|
|
bitV.SetBitAt(i, true)
|
|
}
|
|
entry := enr.WithEntry(syncCommsSubnetEnrKey, bitV.Bytes())
|
|
localNode.Set(entry)
|
|
return localNode.Node().Record()
|
|
},
|
|
want: []uint64{0, 1, 2, 3},
|
|
wantErr: false,
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
got, err := syncSubnets(tt.record(t))
|
|
if (err != nil) != tt.wantErr {
|
|
t.Errorf("syncSubnets() error = %v, wantErr %v", err, tt.wantErr)
|
|
return
|
|
}
|
|
if tt.wantErr {
|
|
assert.ErrorContains(t, tt.errContains, err)
|
|
}
|
|
if !reflect.DeepEqual(got, tt.want) {
|
|
t.Errorf("syncSubnets() got = %v, want %v", got, tt.want)
|
|
}
|
|
})
|
|
}
|
|
}
|