package p2p import ( "context" "crypto/rand" "reflect" "testing" "time" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p-core/crypto" "github.com/prysmaticlabs/go-bitfield" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/consensus-types/wrapper" ecdsaprysm "github.com/prysmaticlabs/prysm/crypto/ecdsa" pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/testing/assert" "github.com/prysmaticlabs/prysm/testing/require" ) func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) { params.SetupTestConfigCleanup(t) // This test needs to be entirely rewritten and should be done in a follow up PR from #7885. t.Skip("This test is now failing after PR 7885 due to false positive") gFlags := new(flags.GlobalFlags) gFlags.MinimumPeersPerSubnet = 4 flags.Init(gFlags) // Reset config. defer flags.Init(new(flags.GlobalFlags)) port := 2000 ipAddr, pkey := createAddrAndPrivKey(t) genesisTime := time.Now() genesisValidatorsRoot := make([]byte, 32) s := &Service{ cfg: &Config{UDPPort: uint(port)}, genesisTime: genesisTime, genesisValidatorsRoot: genesisValidatorsRoot, } bootListener, err := s.createListener(ipAddr, pkey) require.NoError(t, err) defer bootListener.Close() bootNode := bootListener.Self() // Use shorter period for testing. currentPeriod := pollingPeriod pollingPeriod = 1 * time.Second defer func() { pollingPeriod = currentPeriod }() var listeners []*discover.UDPv5 for i := 1; i <= 3; i++ { port = 3000 + i cfg := &Config{ BootstrapNodeAddr: []string{bootNode.String()}, Discv5BootStrapAddr: []string{bootNode.String()}, MaxPeers: 30, UDPPort: uint(port), } ipAddr, pkey := createAddrAndPrivKey(t) s = &Service{ cfg: cfg, genesisTime: genesisTime, genesisValidatorsRoot: genesisValidatorsRoot, } listener, err := s.startDiscoveryV5(ipAddr, pkey) assert.NoError(t, err, "Could not start discovery for node") bitV := bitfield.NewBitvector64() bitV.SetBitAt(uint64(i), true) entry := enr.WithEntry(attSubnetEnrKey, &bitV) listener.LocalNode().Set(entry) listeners = append(listeners, listener) } defer func() { // Close down all peers. for _, listener := range listeners { listener.Close() } }() // Make one service on port 4001. port = 4001 cfg := &Config{ BootstrapNodeAddr: []string{bootNode.String()}, Discv5BootStrapAddr: []string{bootNode.String()}, MaxPeers: 30, UDPPort: uint(port), } cfg.StateNotifier = &mock.MockStateNotifier{} s, err = NewService(context.Background(), cfg) require.NoError(t, err) exitRoutine := make(chan bool) go func() { s.Start() <-exitRoutine }() time.Sleep(50 * time.Millisecond) // Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed). for sent := 0; sent == 0; { sent = s.stateNotifier.StateFeed().Send(&feed.Event{ Type: statefeed.Initialized, Data: &statefeed.InitializedData{ StartTime: time.Now(), GenesisValidatorsRoot: make([]byte, 32), }, }) } // Wait for the nodes to have their local routing tables to be populated with the other nodes time.Sleep(6 * discoveryWaitTime) // look up 3 different subnets ctx := context.Background() exists, err := s.FindPeersWithSubnet(ctx, "", 1, flags.Get().MinimumPeersPerSubnet) require.NoError(t, err) exists2, err := s.FindPeersWithSubnet(ctx, "", 2, flags.Get().MinimumPeersPerSubnet) require.NoError(t, err) exists3, err := s.FindPeersWithSubnet(ctx, "", 3, flags.Get().MinimumPeersPerSubnet) require.NoError(t, err) if !exists || !exists2 || !exists3 { t.Fatal("Peer with subnet doesn't exist") } // Update ENR of a peer. testService := &Service{ dv5Listener: listeners[0], metaData: wrapper.WrappedMetadataV0(&pb.MetaDataV0{ Attnets: bitfield.NewBitvector64(), }), } cache.SubnetIDs.AddAttesterSubnetID(0, 10) testService.RefreshENR() time.Sleep(2 * time.Second) exists, err = s.FindPeersWithSubnet(ctx, "", 2, flags.Get().MinimumPeersPerSubnet) require.NoError(t, err) assert.Equal(t, true, exists, "Peer with subnet doesn't exist") assert.NoError(t, s.Stop()) exitRoutine <- true } func Test_AttSubnets(t *testing.T) { params.SetupTestConfigCleanup(t) tests := []struct { name string record func(t *testing.T) *enr.Record want []uint64 wantErr bool errContains string }{ { name: "valid record", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) localNode = initializeAttSubnets(localNode) return localNode.Node().Record() }, want: nil, wantErr: false, }, { name: "too small subnet", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) entry := enr.WithEntry(attSubnetEnrKey, []byte{}) localNode.Set(entry) return localNode.Node().Record() }, want: []uint64{}, wantErr: true, errContains: "invalid bitvector provided, it has a size of", }, { name: "half sized subnet", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) entry := enr.WithEntry(attSubnetEnrKey, make([]byte, 4)) localNode.Set(entry) return localNode.Node().Record() }, want: []uint64{}, wantErr: true, errContains: "invalid bitvector provided, it has a size of", }, { name: "too large subnet", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) entry := enr.WithEntry(attSubnetEnrKey, make([]byte, byteCount(int(attestationSubnetCount))+1)) localNode.Set(entry) return localNode.Node().Record() }, want: []uint64{}, wantErr: true, errContains: "invalid bitvector provided, it has a size of", }, { name: "very large subnet", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) entry := enr.WithEntry(attSubnetEnrKey, make([]byte, byteCount(int(attestationSubnetCount))+100)) localNode.Set(entry) return localNode.Node().Record() }, want: []uint64{}, wantErr: true, errContains: "invalid bitvector provided, it has a size of", }, { name: "single subnet", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) bitV := bitfield.NewBitvector64() bitV.SetBitAt(0, true) entry := enr.WithEntry(attSubnetEnrKey, bitV.Bytes()) localNode.Set(entry) return localNode.Node().Record() }, want: []uint64{0}, wantErr: false, }, { name: "multiple subnets", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) bitV := bitfield.NewBitvector64() for i := uint64(0); i < bitV.Len(); i++ { // skip 2 subnets if (i+1)%2 == 0 { continue } bitV.SetBitAt(i, true) } bitV.SetBitAt(0, true) entry := enr.WithEntry(attSubnetEnrKey, bitV.Bytes()) localNode.Set(entry) return localNode.Node().Record() }, want: []uint64{0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62}, wantErr: false, }, { name: "all subnets", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) bitV := bitfield.NewBitvector64() for i := uint64(0); i < bitV.Len(); i++ { bitV.SetBitAt(i, true) } entry := enr.WithEntry(attSubnetEnrKey, bitV.Bytes()) localNode.Set(entry) return localNode.Node().Record() }, want: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63}, wantErr: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got, err := attSubnets(tt.record(t)) if (err != nil) != tt.wantErr { t.Errorf("syncSubnets() error = %v, wantErr %v", err, tt.wantErr) return } if tt.wantErr { assert.ErrorContains(t, tt.errContains, err) } if !reflect.DeepEqual(got, tt.want) { t.Errorf("syncSubnets() got = %v, want %v", got, tt.want) } }) } } func Test_SyncSubnets(t *testing.T) { params.SetupTestConfigCleanup(t) tests := []struct { name string record func(t *testing.T) *enr.Record want []uint64 wantErr bool errContains string }{ { name: "valid record", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) localNode = initializeSyncCommSubnets(localNode) return localNode.Node().Record() }, want: nil, wantErr: false, }, { name: "too small subnet", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) entry := enr.WithEntry(syncCommsSubnetEnrKey, []byte{}) localNode.Set(entry) return localNode.Node().Record() }, want: []uint64{}, wantErr: true, errContains: "invalid bitvector provided, it has a size of", }, { name: "too large subnet", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) entry := enr.WithEntry(syncCommsSubnetEnrKey, make([]byte, byteCount(int(syncCommsSubnetCount))+1)) localNode.Set(entry) return localNode.Node().Record() }, want: []uint64{}, wantErr: true, errContains: "invalid bitvector provided, it has a size of", }, { name: "very large subnet", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) entry := enr.WithEntry(syncCommsSubnetEnrKey, make([]byte, byteCount(int(syncCommsSubnetCount))+100)) localNode.Set(entry) return localNode.Node().Record() }, want: []uint64{}, wantErr: true, errContains: "invalid bitvector provided, it has a size of", }, { name: "single subnet", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) bitV := bitfield.Bitvector4{byte(0x00)} bitV.SetBitAt(0, true) entry := enr.WithEntry(syncCommsSubnetEnrKey, bitV.Bytes()) localNode.Set(entry) return localNode.Node().Record() }, want: []uint64{0}, wantErr: false, }, { name: "multiple subnets", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) bitV := bitfield.Bitvector4{byte(0x00)} for i := uint64(0); i < bitV.Len(); i++ { // skip 2 subnets if (i+1)%2 == 0 { continue } bitV.SetBitAt(i, true) } bitV.SetBitAt(0, true) entry := enr.WithEntry(syncCommsSubnetEnrKey, bitV.Bytes()) localNode.Set(entry) return localNode.Node().Record() }, want: []uint64{0, 2}, wantErr: false, }, { name: "all subnets", record: func(t *testing.T) *enr.Record { db, err := enode.OpenDB("") assert.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) assert.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) assert.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) bitV := bitfield.Bitvector4{byte(0x00)} for i := uint64(0); i < bitV.Len(); i++ { bitV.SetBitAt(i, true) } entry := enr.WithEntry(syncCommsSubnetEnrKey, bitV.Bytes()) localNode.Set(entry) return localNode.Node().Record() }, want: []uint64{0, 1, 2, 3}, wantErr: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got, err := syncSubnets(tt.record(t)) if (err != nil) != tt.wantErr { t.Errorf("syncSubnets() error = %v, wantErr %v", err, tt.wantErr) return } if tt.wantErr { assert.ErrorContains(t, tt.errContains, err) } if !reflect.DeepEqual(got, tt.want) { t.Errorf("syncSubnets() got = %v, want %v", got, tt.want) } }) } }