prysm-pulse/p2p/simulations/http_test.go
Preston Van Loon a41e372b39 First pass collation
Former-commit-id: fa0dc873367b92a06b10fd989e3fec960c03065a [formerly e98efffa565ea6aa3eadeb19c2f9a66a56eb5ddd]
Former-commit-id: ce6e4f01d12015e67a51aa7d928a6ab8e8eedeee
2018-01-12 22:59:17 -05:00

824 lines
20 KiB
Go

// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package simulations
import (
"context"
"fmt"
"math/rand"
"net/http/httptest"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rpc"
)
// testService implements the node.Service interface and provides protocols
// and APIs which are useful for testing nodes in a simulation network
type testService struct {
id discover.NodeID
// peerCount is incremented once a peer handshake has been performed
peerCount int64
peers map[discover.NodeID]*testPeer
peersMtx sync.Mutex
// state stores []byte which is used to test creating and loading
// snapshots
state atomic.Value
}
func newTestService(ctx *adapters.ServiceContext) (node.Service, error) {
svc := &testService{
id: ctx.Config.ID,
peers: make(map[discover.NodeID]*testPeer),
}
svc.state.Store(ctx.Snapshot)
return svc, nil
}
type testPeer struct {
testReady chan struct{}
dumReady chan struct{}
}
func (t *testService) peer(id discover.NodeID) *testPeer {
t.peersMtx.Lock()
defer t.peersMtx.Unlock()
if peer, ok := t.peers[id]; ok {
return peer
}
peer := &testPeer{
testReady: make(chan struct{}),
dumReady: make(chan struct{}),
}
t.peers[id] = peer
return peer
}
func (t *testService) Protocols() []p2p.Protocol {
return []p2p.Protocol{
{
Name: "test",
Version: 1,
Length: 3,
Run: t.RunTest,
},
{
Name: "dum",
Version: 1,
Length: 1,
Run: t.RunDum,
},
{
Name: "prb",
Version: 1,
Length: 1,
Run: t.RunPrb,
},
}
}
func (t *testService) APIs() []rpc.API {
return []rpc.API{{
Namespace: "test",
Version: "1.0",
Service: &TestAPI{
state: &t.state,
peerCount: &t.peerCount,
},
}}
}
func (t *testService) Start(server *p2p.Server) error {
return nil
}
func (t *testService) Stop() error {
return nil
}
// handshake performs a peer handshake by sending and expecting an empty
// message with the given code
func (t *testService) handshake(rw p2p.MsgReadWriter, code uint64) error {
errc := make(chan error, 2)
go func() { errc <- p2p.Send(rw, code, struct{}{}) }()
go func() { errc <- p2p.ExpectMsg(rw, code, struct{}{}) }()
for i := 0; i < 2; i++ {
if err := <-errc; err != nil {
return err
}
}
return nil
}
func (t *testService) RunTest(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := t.peer(p.ID())
// perform three handshakes with three different message codes,
// used to test message sending and filtering
if err := t.handshake(rw, 2); err != nil {
return err
}
if err := t.handshake(rw, 1); err != nil {
return err
}
if err := t.handshake(rw, 0); err != nil {
return err
}
// close the testReady channel so that other protocols can run
close(peer.testReady)
// track the peer
atomic.AddInt64(&t.peerCount, 1)
defer atomic.AddInt64(&t.peerCount, -1)
// block until the peer is dropped
for {
_, err := rw.ReadMsg()
if err != nil {
return err
}
}
}
func (t *testService) RunDum(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := t.peer(p.ID())
// wait for the test protocol to perform its handshake
<-peer.testReady
// perform a handshake
if err := t.handshake(rw, 0); err != nil {
return err
}
// close the dumReady channel so that other protocols can run
close(peer.dumReady)
// block until the peer is dropped
for {
_, err := rw.ReadMsg()
if err != nil {
return err
}
}
}
func (t *testService) RunPrb(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := t.peer(p.ID())
// wait for the dum protocol to perform its handshake
<-peer.dumReady
// perform a handshake
if err := t.handshake(rw, 0); err != nil {
return err
}
// block until the peer is dropped
for {
_, err := rw.ReadMsg()
if err != nil {
return err
}
}
}
func (t *testService) Snapshot() ([]byte, error) {
return t.state.Load().([]byte), nil
}
// TestAPI provides a test API to:
// * get the peer count
// * get and set an arbitrary state byte slice
// * get and increment a counter
// * subscribe to counter increment events
type TestAPI struct {
state *atomic.Value
peerCount *int64
counter int64
feed event.Feed
}
func (t *TestAPI) PeerCount() int64 {
return atomic.LoadInt64(t.peerCount)
}
func (t *TestAPI) Get() int64 {
return atomic.LoadInt64(&t.counter)
}
func (t *TestAPI) Add(delta int64) {
atomic.AddInt64(&t.counter, delta)
t.feed.Send(delta)
}
func (t *TestAPI) GetState() []byte {
return t.state.Load().([]byte)
}
func (t *TestAPI) SetState(state []byte) {
t.state.Store(state)
}
func (t *TestAPI) Events(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
go func() {
events := make(chan int64)
sub := t.feed.Subscribe(events)
defer sub.Unsubscribe()
for {
select {
case event := <-events:
notifier.Notify(rpcSub.ID, event)
case <-sub.Err():
return
case <-rpcSub.Err():
return
case <-notifier.Closed():
return
}
}
}()
return rpcSub, nil
}
var testServices = adapters.Services{
"test": newTestService,
}
func testHTTPServer(t *testing.T) (*Network, *httptest.Server) {
adapter := adapters.NewSimAdapter(testServices)
network := NewNetwork(adapter, &NetworkConfig{
DefaultService: "test",
})
return network, httptest.NewServer(NewServer(network))
}
// TestHTTPNetwork tests interacting with a simulation network using the HTTP
// API
func TestHTTPNetwork(t *testing.T) {
// start the server
network, s := testHTTPServer(t)
defer s.Close()
// subscribe to events so we can check them later
client := NewClient(s.URL)
events := make(chan *Event, 100)
var opts SubscribeOpts
sub, err := client.SubscribeNetwork(events, opts)
if err != nil {
t.Fatalf("error subscribing to network events: %s", err)
}
defer sub.Unsubscribe()
// check we can retrieve details about the network
gotNetwork, err := client.GetNetwork()
if err != nil {
t.Fatalf("error getting network: %s", err)
}
if gotNetwork.ID != network.ID {
t.Fatalf("expected network to have ID %q, got %q", network.ID, gotNetwork.ID)
}
// start a simulation network
nodeIDs := startTestNetwork(t, client)
// check we got all the events
x := &expectEvents{t, events, sub}
x.expect(
x.nodeEvent(nodeIDs[0], false),
x.nodeEvent(nodeIDs[1], false),
x.nodeEvent(nodeIDs[0], true),
x.nodeEvent(nodeIDs[1], true),
x.connEvent(nodeIDs[0], nodeIDs[1], false),
x.connEvent(nodeIDs[0], nodeIDs[1], true),
)
// reconnect the stream and check we get the current nodes and conns
events = make(chan *Event, 100)
opts.Current = true
sub, err = client.SubscribeNetwork(events, opts)
if err != nil {
t.Fatalf("error subscribing to network events: %s", err)
}
defer sub.Unsubscribe()
x = &expectEvents{t, events, sub}
x.expect(
x.nodeEvent(nodeIDs[0], true),
x.nodeEvent(nodeIDs[1], true),
x.connEvent(nodeIDs[0], nodeIDs[1], true),
)
}
func startTestNetwork(t *testing.T, client *Client) []string {
// create two nodes
nodeCount := 2
nodeIDs := make([]string, nodeCount)
for i := 0; i < nodeCount; i++ {
node, err := client.CreateNode(nil)
if err != nil {
t.Fatalf("error creating node: %s", err)
}
nodeIDs[i] = node.ID
}
// check both nodes exist
nodes, err := client.GetNodes()
if err != nil {
t.Fatalf("error getting nodes: %s", err)
}
if len(nodes) != nodeCount {
t.Fatalf("expected %d nodes, got %d", nodeCount, len(nodes))
}
for i, nodeID := range nodeIDs {
if nodes[i].ID != nodeID {
t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, nodes[i].ID)
}
node, err := client.GetNode(nodeID)
if err != nil {
t.Fatalf("error getting node %d: %s", i, err)
}
if node.ID != nodeID {
t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, node.ID)
}
}
// start both nodes
for _, nodeID := range nodeIDs {
if err := client.StartNode(nodeID); err != nil {
t.Fatalf("error starting node %q: %s", nodeID, err)
}
}
// connect the nodes
for i := 0; i < nodeCount-1; i++ {
peerId := i + 1
if i == nodeCount-1 {
peerId = 0
}
if err := client.ConnectNode(nodeIDs[i], nodeIDs[peerId]); err != nil {
t.Fatalf("error connecting nodes: %s", err)
}
}
return nodeIDs
}
type expectEvents struct {
*testing.T
events chan *Event
sub event.Subscription
}
func (t *expectEvents) nodeEvent(id string, up bool) *Event {
return &Event{
Type: EventTypeNode,
Node: &Node{
Config: &adapters.NodeConfig{
ID: discover.MustHexID(id),
},
Up: up,
},
}
}
func (t *expectEvents) connEvent(one, other string, up bool) *Event {
return &Event{
Type: EventTypeConn,
Conn: &Conn{
One: discover.MustHexID(one),
Other: discover.MustHexID(other),
Up: up,
},
}
}
func (t *expectEvents) expectMsgs(expected map[MsgFilter]int) {
actual := make(map[MsgFilter]int)
timeout := time.After(10 * time.Second)
loop:
for {
select {
case event := <-t.events:
t.Logf("received %s event: %s", event.Type, event)
if event.Type != EventTypeMsg || event.Msg.Received {
continue loop
}
if event.Msg == nil {
t.Fatal("expected event.Msg to be set")
}
filter := MsgFilter{
Proto: event.Msg.Protocol,
Code: int64(event.Msg.Code),
}
actual[filter]++
if actual[filter] > expected[filter] {
t.Fatalf("received too many msgs for filter: %v", filter)
}
if reflect.DeepEqual(actual, expected) {
return
}
case err := <-t.sub.Err():
t.Fatalf("network stream closed unexpectedly: %s", err)
case <-timeout:
t.Fatal("timed out waiting for expected events")
}
}
}
func (t *expectEvents) expect(events ...*Event) {
timeout := time.After(10 * time.Second)
i := 0
for {
select {
case event := <-t.events:
t.Logf("received %s event: %s", event.Type, event)
expected := events[i]
if event.Type != expected.Type {
t.Fatalf("expected event %d to have type %q, got %q", i, expected.Type, event.Type)
}
switch expected.Type {
case EventTypeNode:
if event.Node == nil {
t.Fatal("expected event.Node to be set")
}
if event.Node.ID() != expected.Node.ID() {
t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().TerminalString(), event.Node.ID().TerminalString())
}
if event.Node.Up != expected.Node.Up {
t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up, event.Node.Up)
}
case EventTypeConn:
if event.Conn == nil {
t.Fatal("expected event.Conn to be set")
}
if event.Conn.One != expected.Conn.One {
t.Fatalf("expected conn event %d to have one=%q, got one=%q", i, expected.Conn.One.TerminalString(), event.Conn.One.TerminalString())
}
if event.Conn.Other != expected.Conn.Other {
t.Fatalf("expected conn event %d to have other=%q, got other=%q", i, expected.Conn.Other.TerminalString(), event.Conn.Other.TerminalString())
}
if event.Conn.Up != expected.Conn.Up {
t.Fatalf("expected conn event %d to have up=%t, got up=%t", i, expected.Conn.Up, event.Conn.Up)
}
}
i++
if i == len(events) {
return
}
case err := <-t.sub.Err():
t.Fatalf("network stream closed unexpectedly: %s", err)
case <-timeout:
t.Fatal("timed out waiting for expected events")
}
}
}
// TestHTTPNodeRPC tests calling RPC methods on nodes via the HTTP API
func TestHTTPNodeRPC(t *testing.T) {
// start the server
_, s := testHTTPServer(t)
defer s.Close()
// start a node in the network
client := NewClient(s.URL)
node, err := client.CreateNode(nil)
if err != nil {
t.Fatalf("error creating node: %s", err)
}
if err := client.StartNode(node.ID); err != nil {
t.Fatalf("error starting node: %s", err)
}
// create two RPC clients
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
rpcClient1, err := client.RPCClient(ctx, node.ID)
if err != nil {
t.Fatalf("error getting node RPC client: %s", err)
}
rpcClient2, err := client.RPCClient(ctx, node.ID)
if err != nil {
t.Fatalf("error getting node RPC client: %s", err)
}
// subscribe to events using client 1
events := make(chan int64, 1)
sub, err := rpcClient1.Subscribe(ctx, "test", events, "events")
if err != nil {
t.Fatalf("error subscribing to events: %s", err)
}
defer sub.Unsubscribe()
// call some RPC methods using client 2
if err := rpcClient2.CallContext(ctx, nil, "test_add", 10); err != nil {
t.Fatalf("error calling RPC method: %s", err)
}
var result int64
if err := rpcClient2.CallContext(ctx, &result, "test_get"); err != nil {
t.Fatalf("error calling RPC method: %s", err)
}
if result != 10 {
t.Fatalf("expected result to be 10, got %d", result)
}
// check we got an event from client 1
select {
case event := <-events:
if event != 10 {
t.Fatalf("expected event to be 10, got %d", event)
}
case <-ctx.Done():
t.Fatal(ctx.Err())
}
}
// TestHTTPSnapshot tests creating and loading network snapshots
func TestHTTPSnapshot(t *testing.T) {
// start the server
_, s := testHTTPServer(t)
defer s.Close()
// create a two-node network
client := NewClient(s.URL)
nodeCount := 2
nodes := make([]*p2p.NodeInfo, nodeCount)
for i := 0; i < nodeCount; i++ {
node, err := client.CreateNode(nil)
if err != nil {
t.Fatalf("error creating node: %s", err)
}
if err := client.StartNode(node.ID); err != nil {
t.Fatalf("error starting node: %s", err)
}
nodes[i] = node
}
if err := client.ConnectNode(nodes[0].ID, nodes[1].ID); err != nil {
t.Fatalf("error connecting nodes: %s", err)
}
// store some state in the test services
states := make([]string, nodeCount)
for i, node := range nodes {
rpc, err := client.RPCClient(context.Background(), node.ID)
if err != nil {
t.Fatalf("error getting RPC client: %s", err)
}
defer rpc.Close()
state := fmt.Sprintf("%x", rand.Int())
if err := rpc.Call(nil, "test_setState", []byte(state)); err != nil {
t.Fatalf("error setting service state: %s", err)
}
states[i] = state
}
// create a snapshot
snap, err := client.CreateSnapshot()
if err != nil {
t.Fatalf("error creating snapshot: %s", err)
}
for i, state := range states {
gotState := snap.Nodes[i].Snapshots["test"]
if string(gotState) != state {
t.Fatalf("expected snapshot state %q, got %q", state, gotState)
}
}
// create another network
_, s = testHTTPServer(t)
defer s.Close()
client = NewClient(s.URL)
// subscribe to events so we can check them later
events := make(chan *Event, 100)
var opts SubscribeOpts
sub, err := client.SubscribeNetwork(events, opts)
if err != nil {
t.Fatalf("error subscribing to network events: %s", err)
}
defer sub.Unsubscribe()
// load the snapshot
if err := client.LoadSnapshot(snap); err != nil {
t.Fatalf("error loading snapshot: %s", err)
}
// check the nodes and connection exists
net, err := client.GetNetwork()
if err != nil {
t.Fatalf("error getting network: %s", err)
}
if len(net.Nodes) != nodeCount {
t.Fatalf("expected network to have %d nodes, got %d", nodeCount, len(net.Nodes))
}
for i, node := range nodes {
id := net.Nodes[i].ID().String()
if id != node.ID {
t.Fatalf("expected node %d to have ID %s, got %s", i, node.ID, id)
}
}
if len(net.Conns) != 1 {
t.Fatalf("expected network to have 1 connection, got %d", len(net.Conns))
}
conn := net.Conns[0]
if conn.One.String() != nodes[0].ID {
t.Fatalf("expected connection to have one=%q, got one=%q", nodes[0].ID, conn.One)
}
if conn.Other.String() != nodes[1].ID {
t.Fatalf("expected connection to have other=%q, got other=%q", nodes[1].ID, conn.Other)
}
// check the node states were restored
for i, node := range nodes {
rpc, err := client.RPCClient(context.Background(), node.ID)
if err != nil {
t.Fatalf("error getting RPC client: %s", err)
}
defer rpc.Close()
var state []byte
if err := rpc.Call(&state, "test_getState"); err != nil {
t.Fatalf("error getting service state: %s", err)
}
if string(state) != states[i] {
t.Fatalf("expected snapshot state %q, got %q", states[i], state)
}
}
// check we got all the events
x := &expectEvents{t, events, sub}
x.expect(
x.nodeEvent(nodes[0].ID, false),
x.nodeEvent(nodes[0].ID, true),
x.nodeEvent(nodes[1].ID, false),
x.nodeEvent(nodes[1].ID, true),
x.connEvent(nodes[0].ID, nodes[1].ID, false),
x.connEvent(nodes[0].ID, nodes[1].ID, true),
)
}
// TestMsgFilterPassMultiple tests streaming message events using a filter
// with multiple protocols
func TestMsgFilterPassMultiple(t *testing.T) {
// start the server
_, s := testHTTPServer(t)
defer s.Close()
// subscribe to events with a message filter
client := NewClient(s.URL)
events := make(chan *Event, 10)
opts := SubscribeOpts{
Filter: "prb:0-test:0",
}
sub, err := client.SubscribeNetwork(events, opts)
if err != nil {
t.Fatalf("error subscribing to network events: %s", err)
}
defer sub.Unsubscribe()
// start a simulation network
startTestNetwork(t, client)
// check we got the expected events
x := &expectEvents{t, events, sub}
x.expectMsgs(map[MsgFilter]int{
{"test", 0}: 2,
{"prb", 0}: 2,
})
}
// TestMsgFilterPassWildcard tests streaming message events using a filter
// with a code wildcard
func TestMsgFilterPassWildcard(t *testing.T) {
// start the server
_, s := testHTTPServer(t)
defer s.Close()
// subscribe to events with a message filter
client := NewClient(s.URL)
events := make(chan *Event, 10)
opts := SubscribeOpts{
Filter: "prb:0,2-test:*",
}
sub, err := client.SubscribeNetwork(events, opts)
if err != nil {
t.Fatalf("error subscribing to network events: %s", err)
}
defer sub.Unsubscribe()
// start a simulation network
startTestNetwork(t, client)
// check we got the expected events
x := &expectEvents{t, events, sub}
x.expectMsgs(map[MsgFilter]int{
{"test", 2}: 2,
{"test", 1}: 2,
{"test", 0}: 2,
{"prb", 0}: 2,
})
}
// TestMsgFilterPassSingle tests streaming message events using a filter
// with a single protocol and code
func TestMsgFilterPassSingle(t *testing.T) {
// start the server
_, s := testHTTPServer(t)
defer s.Close()
// subscribe to events with a message filter
client := NewClient(s.URL)
events := make(chan *Event, 10)
opts := SubscribeOpts{
Filter: "dum:0",
}
sub, err := client.SubscribeNetwork(events, opts)
if err != nil {
t.Fatalf("error subscribing to network events: %s", err)
}
defer sub.Unsubscribe()
// start a simulation network
startTestNetwork(t, client)
// check we got the expected events
x := &expectEvents{t, events, sub}
x.expectMsgs(map[MsgFilter]int{
{"dum", 0}: 2,
})
}
// TestMsgFilterPassSingle tests streaming message events using an invalid
// filter
func TestMsgFilterFailBadParams(t *testing.T) {
// start the server
_, s := testHTTPServer(t)
defer s.Close()
client := NewClient(s.URL)
events := make(chan *Event, 10)
opts := SubscribeOpts{
Filter: "foo:",
}
_, err := client.SubscribeNetwork(events, opts)
if err == nil {
t.Fatalf("expected event subscription to fail but succeeded!")
}
opts.Filter = "bzz:aa"
_, err = client.SubscribeNetwork(events, opts)
if err == nil {
t.Fatalf("expected event subscription to fail but succeeded!")
}
opts.Filter = "invalid"
_, err = client.SubscribeNetwork(events, opts)
if err == nil {
t.Fatalf("expected event subscription to fail but succeeded!")
}
}