mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-31 23:41:22 +00:00
07e7e030d9
* Update pubsub and fix topicIDs * WIP filter * Add suggested code from @bidlocode * add tests and fix bugs * more tests * Wait until state initialized to accept pubsub filtering * rename for clarity and clarify comment * fix test builds * Autofix issues in 2 files Resolved issues in the following files via DeepSource Autofix: 1. beacon-chain/p2p/pubsub_filter.go 2. beacon-chain/p2p/pubsub_filter_test.go * @nisdas pr feedback * pr feedback and fuzz fix * Update beacon-chain/p2p/pubsub_filter.go * Must have protocol suffix * Must have protocol suffix * gofmt * rm test, fix panic * Fix tests * Add isInitialized check * Add a few more tests for better coverage * cache fork digest, make pubsub filter part of the p2p service * rename service * gofmt * Add comment * fix Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> Co-authored-by: Nishant Das <nishdas93@gmail.com> Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
316 lines
8.8 KiB
Go
316 lines
8.8 KiB
Go
package p2p
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"fmt"
|
|
"net"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/p2p/discover"
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
|
"github.com/libp2p/go-libp2p"
|
|
"github.com/libp2p/go-libp2p-core/host"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
"github.com/multiformats/go-multiaddr"
|
|
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
|
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
|
|
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
|
"github.com/prysmaticlabs/prysm/shared/event"
|
|
"github.com/prysmaticlabs/prysm/shared/p2putils"
|
|
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
|
|
"github.com/prysmaticlabs/prysm/shared/testutil/require"
|
|
"github.com/prysmaticlabs/prysm/shared/timeutils"
|
|
logTest "github.com/sirupsen/logrus/hooks/test"
|
|
)
|
|
|
|
type mockListener struct{}
|
|
|
|
func (mockListener) Self() *enode.Node {
|
|
panic("implement me")
|
|
}
|
|
|
|
func (mockListener) Close() {
|
|
//no-op
|
|
}
|
|
|
|
func (mockListener) Lookup(enode.ID) []*enode.Node {
|
|
panic("implement me")
|
|
}
|
|
|
|
func (mockListener) ReadRandomNodes(_ []*enode.Node) int {
|
|
panic("implement me")
|
|
}
|
|
|
|
func (mockListener) Resolve(*enode.Node) *enode.Node {
|
|
panic("implement me")
|
|
}
|
|
|
|
func (mockListener) Ping(*enode.Node) error {
|
|
panic("implement me")
|
|
}
|
|
|
|
func (mockListener) RequestENR(*enode.Node) (*enode.Node, error) {
|
|
panic("implement me")
|
|
}
|
|
|
|
func (mockListener) LocalNode() *enode.LocalNode {
|
|
panic("implement me")
|
|
}
|
|
|
|
func (mockListener) RandomNodes() enode.Iterator {
|
|
panic("implement me")
|
|
}
|
|
|
|
func createHost(t *testing.T, port int) (host.Host, *ecdsa.PrivateKey, net.IP) {
|
|
_, pkey := createAddrAndPrivKey(t)
|
|
ipAddr := net.ParseIP("127.0.0.1")
|
|
listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, port))
|
|
require.NoError(t, err, "Failed to p2p listen")
|
|
h, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey), libp2p.ListenAddrs(listen)}...)
|
|
require.NoError(t, err)
|
|
return h, pkey, ipAddr
|
|
}
|
|
|
|
func TestService_Stop_SetsStartedToFalse(t *testing.T) {
|
|
s, err := NewService(context.Background(), &Config{StateNotifier: &mock.MockStateNotifier{}})
|
|
require.NoError(t, err)
|
|
s.started = true
|
|
s.dv5Listener = &mockListener{}
|
|
assert.NoError(t, s.Stop())
|
|
assert.Equal(t, false, s.started)
|
|
}
|
|
|
|
func TestService_Stop_DontPanicIfDv5ListenerIsNotInited(t *testing.T) {
|
|
s, err := NewService(context.Background(), &Config{StateNotifier: &mock.MockStateNotifier{}})
|
|
require.NoError(t, err)
|
|
assert.NoError(t, s.Stop())
|
|
}
|
|
|
|
func TestService_Start_OnlyStartsOnce(t *testing.T) {
|
|
hook := logTest.NewGlobal()
|
|
|
|
cfg := &Config{
|
|
TCPPort: 2000,
|
|
UDPPort: 2000,
|
|
StateNotifier: &mock.MockStateNotifier{},
|
|
}
|
|
s, err := NewService(context.Background(), cfg)
|
|
require.NoError(t, err)
|
|
s.stateNotifier = &mock.MockStateNotifier{}
|
|
s.dv5Listener = &mockListener{}
|
|
exitRoutine := make(chan bool)
|
|
go func() {
|
|
s.Start()
|
|
<-exitRoutine
|
|
}()
|
|
// Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed).
|
|
for sent := 0; sent == 0; {
|
|
sent = s.stateNotifier.StateFeed().Send(&feed.Event{
|
|
Type: statefeed.Initialized,
|
|
Data: &statefeed.InitializedData{
|
|
StartTime: time.Now(),
|
|
GenesisValidatorsRoot: make([]byte, 32),
|
|
},
|
|
})
|
|
}
|
|
time.Sleep(time.Second * 2)
|
|
assert.Equal(t, true, s.started, "Expected service to be started")
|
|
s.Start()
|
|
require.LogsContain(t, hook, "Attempted to start p2p service when it was already started")
|
|
require.NoError(t, s.Stop())
|
|
exitRoutine <- true
|
|
}
|
|
|
|
func TestService_Status_NotRunning(t *testing.T) {
|
|
s := &Service{started: false}
|
|
s.dv5Listener = &mockListener{}
|
|
assert.ErrorContains(t, "not running", s.Status(), "Status returned wrong error")
|
|
}
|
|
|
|
func TestListenForNewNodes(t *testing.T) {
|
|
// Setup bootnode.
|
|
notifier := &mock.MockStateNotifier{}
|
|
cfg := &Config{StateNotifier: notifier}
|
|
port := 2000
|
|
cfg.UDPPort = uint(port)
|
|
_, pkey := createAddrAndPrivKey(t)
|
|
ipAddr := net.ParseIP("127.0.0.1")
|
|
genesisTime := timeutils.Now()
|
|
genesisValidatorsRoot := make([]byte, 32)
|
|
s := &Service{
|
|
cfg: cfg,
|
|
genesisTime: genesisTime,
|
|
genesisValidatorsRoot: genesisValidatorsRoot,
|
|
}
|
|
bootListener, err := s.createListener(ipAddr, pkey)
|
|
require.NoError(t, err)
|
|
defer bootListener.Close()
|
|
|
|
// Use shorter period for testing.
|
|
currentPeriod := pollingPeriod
|
|
pollingPeriod = 1 * time.Second
|
|
defer func() {
|
|
pollingPeriod = currentPeriod
|
|
}()
|
|
|
|
bootNode := bootListener.Self()
|
|
|
|
var listeners []*discover.UDPv5
|
|
var hosts []host.Host
|
|
// setup other nodes.
|
|
cfg = &Config{
|
|
BootstrapNodeAddr: []string{bootNode.String()},
|
|
Discv5BootStrapAddr: []string{bootNode.String()},
|
|
MaxPeers: 30,
|
|
StateNotifier: notifier,
|
|
}
|
|
for i := 1; i <= 5; i++ {
|
|
h, pkey, ipAddr := createHost(t, port+i)
|
|
cfg.UDPPort = uint(port + i)
|
|
cfg.TCPPort = uint(port + i)
|
|
s := &Service{
|
|
cfg: cfg,
|
|
genesisTime: genesisTime,
|
|
genesisValidatorsRoot: genesisValidatorsRoot,
|
|
}
|
|
listener, err := s.startDiscoveryV5(ipAddr, pkey)
|
|
assert.NoError(t, err, "Could not start discovery for node")
|
|
listeners = append(listeners, listener)
|
|
hosts = append(hosts, h)
|
|
}
|
|
defer func() {
|
|
// Close down all peers.
|
|
for _, listener := range listeners {
|
|
listener.Close()
|
|
}
|
|
}()
|
|
|
|
// close peers upon exit of test
|
|
defer func() {
|
|
for _, h := range hosts {
|
|
if err := h.Close(); err != nil {
|
|
t.Log(err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
cfg.UDPPort = 14000
|
|
cfg.TCPPort = 14001
|
|
|
|
s, err = NewService(context.Background(), cfg)
|
|
require.NoError(t, err)
|
|
exitRoutine := make(chan bool)
|
|
go func() {
|
|
s.Start()
|
|
<-exitRoutine
|
|
}()
|
|
time.Sleep(1 * time.Second)
|
|
// Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed).
|
|
for sent := 0; sent == 0; {
|
|
sent = s.stateNotifier.StateFeed().Send(&feed.Event{
|
|
Type: statefeed.Initialized,
|
|
Data: &statefeed.InitializedData{
|
|
StartTime: genesisTime,
|
|
GenesisValidatorsRoot: genesisValidatorsRoot,
|
|
},
|
|
})
|
|
}
|
|
time.Sleep(4 * time.Second)
|
|
peers := s.host.Network().Peers()
|
|
assert.Equal(t, 5, len(peers), "Not all peers added to peerstore")
|
|
require.NoError(t, s.Stop())
|
|
exitRoutine <- true
|
|
}
|
|
|
|
func TestPeer_Disconnect(t *testing.T) {
|
|
h1, _, _ := createHost(t, 5000)
|
|
defer func() {
|
|
if err := h1.Close(); err != nil {
|
|
t.Log(err)
|
|
}
|
|
}()
|
|
|
|
s := &Service{
|
|
host: h1,
|
|
}
|
|
|
|
h2, _, ipaddr := createHost(t, 5001)
|
|
defer func() {
|
|
if err := h2.Close(); err != nil {
|
|
t.Log(err)
|
|
}
|
|
}()
|
|
|
|
h2Addr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ipaddr, 5001, h2.ID()))
|
|
require.NoError(t, err)
|
|
addrInfo, err := peer.AddrInfoFromP2pAddr(h2Addr)
|
|
require.NoError(t, err)
|
|
require.NoError(t, s.host.Connect(context.Background(), *addrInfo))
|
|
assert.Equal(t, 1, len(s.host.Network().Peers()), "Invalid number of peers")
|
|
assert.Equal(t, 1, len(s.host.Network().Conns()), "Invalid number of connections")
|
|
require.NoError(t, s.Disconnect(h2.ID()))
|
|
assert.Equal(t, 0, len(s.host.Network().Conns()), "Invalid number of connections")
|
|
}
|
|
|
|
func TestService_JoinLeaveTopic(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
s, err := NewService(ctx, &Config{StateNotifier: &mock.MockStateNotifier{}})
|
|
require.NoError(t, err)
|
|
|
|
go s.awaitStateInitialized()
|
|
fd := initializeStateWithForkDigest(ctx, t, s.stateNotifier.StateFeed())
|
|
|
|
assert.Equal(t, 0, len(s.joinedTopics))
|
|
|
|
topic := fmt.Sprintf(AttestationSubnetTopicFormat, fd, 42) + "/" + encoder.ProtocolSuffixSSZSnappy
|
|
topicHandle, err := s.JoinTopic(topic)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, 1, len(s.joinedTopics))
|
|
|
|
if topicHandle == nil {
|
|
t.Fatal("topic is nil")
|
|
}
|
|
|
|
sub, err := topicHandle.Subscribe()
|
|
assert.NoError(t, err)
|
|
|
|
// Try leaving topic that has subscriptions.
|
|
want := "cannot close topic: outstanding event handlers or subscriptions"
|
|
assert.ErrorContains(t, want, s.LeaveTopic(topic))
|
|
|
|
// After subscription is cancelled, leaving topic should not result in error.
|
|
sub.Cancel()
|
|
assert.NoError(t, s.LeaveTopic(topic))
|
|
}
|
|
|
|
// initializeStateWithForkDigest sets up the state feed initialized event and returns the fork
|
|
// digest associated with that genesis event.
|
|
func initializeStateWithForkDigest(ctx context.Context, t *testing.T, ef *event.Feed) [4]byte {
|
|
gt := timeutils.Now()
|
|
gvr := bytesutil.PadTo([]byte("genesis validator root"), 32)
|
|
for n := 0; n == 0; {
|
|
if ctx.Err() != nil {
|
|
t.Fatal(ctx.Err())
|
|
}
|
|
n = ef.Send(&feed.Event{
|
|
Type: statefeed.Initialized,
|
|
Data: &statefeed.InitializedData{
|
|
StartTime: gt,
|
|
GenesisValidatorsRoot: gvr,
|
|
},
|
|
})
|
|
}
|
|
|
|
fd, err := p2putils.CreateForkDigest(gt, gvr)
|
|
require.NoError(t, err)
|
|
|
|
time.Sleep(50 * time.Millisecond) // wait for pubsub filter to initialize.
|
|
|
|
return fd
|
|
}
|