From b8e550b1e9d1146b2e6b71573de63968d9ba3328 Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Sun, 18 Aug 2019 00:32:39 -0400 Subject: [PATCH] Add p2p broadcast implementation (#3226) * add broadcaster impl * change API so broadcast returns an error * change API so broadcast returns an error * add test for message not mapped * lint msg * lint msg --- beacon-chain/p2p/BUILD.bazel | 4 ++ beacon-chain/p2p/broadcaster.go | 29 +++++++++- beacon-chain/p2p/broadcaster_test.go | 83 ++++++++++++++++++++++++++++ beacon-chain/p2p/interfaces.go | 2 +- beacon-chain/p2p/service.go | 3 +- beacon-chain/p2p/testing/BUILD.bazel | 1 - beacon-chain/p2p/testing/p2p.go | 10 ++-- 7 files changed, 121 insertions(+), 11 deletions(-) create mode 100644 beacon-chain/p2p/broadcaster_test.go diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index f4f5a8949..1e1f6bb73 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -34,12 +34,16 @@ go_library( go_test( name = "go_default_test", srcs = [ + "broadcaster_test.go", "parameter_test.go", "service_test.go", ], embed = [":go_default_library"], deps = [ + "//beacon-chain/p2p/testing:go_default_library", + "//proto/testing:go_default_library", "//shared/testutil:go_default_library", + "@com_github_gogo_protobuf//proto:go_default_library", "@com_github_libp2p_go_libp2p_pubsub//:go_default_library", "@com_github_sirupsen_logrus//hooks/test:go_default_library", ], diff --git a/beacon-chain/p2p/broadcaster.go b/beacon-chain/p2p/broadcaster.go index 6caed16df..8e7c26f9d 100644 --- a/beacon-chain/p2p/broadcaster.go +++ b/beacon-chain/p2p/broadcaster.go @@ -1,8 +1,31 @@ package p2p -import "github.com/gogo/protobuf/proto" +import ( + "bytes" + "reflect" + + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" +) + +// ErrMessageNotMapped occurs on a Broadcast attempt when a message has not been defined in the +// GossipTypeMapping. +var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic") // Broadcast a message to the p2p network. -func (s *Service) Broadcast(msg proto.Message) { - // TODO(3147): implement +func (s *Service) Broadcast(msg proto.Message) error { + topic, ok := GossipTypeMapping[reflect.TypeOf(msg)] + if !ok { + return ErrMessageNotMapped + } + + buf := new(bytes.Buffer) + if _, err := s.Encoding().Encode(buf, msg); err != nil { + return errors.Wrap(err, "could not encode message") + } + + if err := s.pubsub.Publish(topic+s.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil { + return errors.Wrap(err, "could not publish message") + } + return nil } diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go new file mode 100644 index 000000000..948f3844d --- /dev/null +++ b/beacon-chain/p2p/broadcaster_test.go @@ -0,0 +1,83 @@ +package p2p + +import ( + "bytes" + "context" + "reflect" + "sync" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" + testpb "github.com/prysmaticlabs/prysm/proto/testing" + "github.com/prysmaticlabs/prysm/shared/testutil" +) + +func TestService_Broadcast(t *testing.T) { + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + if len(p1.Host.Network().Peers()) == 0 { + t.Fatal("No peers") + } + + p := &Service{ + host: p1.Host, + pubsub: p1.PubSub(), + } + + msg := &testpb.TestSimpleMessage{ + Bar: 55, + } + + // Set a test gossip mapping for testpb.TestSimpleMessage. + GossipTypeMapping[reflect.TypeOf(msg)] = "/testing" + + // External peer subscribes to the topic. + topic := "/testing" + p.Encoding().ProtocolSuffix() + sub, err := p2.PubSub().Subscribe(topic) + if err != nil { + t.Fatal(err) + } + + time.Sleep(50 * time.Millisecond) // libp2p fails without this delay... + + // Async listen for the pubsub, must be before the broadcast. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + incomingMessage, err := sub.Next(ctx) + if err != nil { + t.Fatal(err) + } + + buf := bytes.NewBuffer(incomingMessage.Data) + result := &testpb.TestSimpleMessage{} + if err := p.Encoding().Decode(buf, result); err != nil { + t.Fatal(err) + } + if !proto.Equal(result, msg) { + t.Errorf("Did not receive expected message, got %+v, wanted %+v", result, msg) + } + }() + + // Broadcast to peers and wait. + if err := p.Broadcast(msg); err != nil { + t.Fatal(err) + } + if testutil.WaitTimeout(&wg, 1*time.Second) { + t.Error("Failed to receive pubsub within 1s") + } +} + +func TestService_Broadcast_ReturnsErr_TopicNotMapped(t *testing.T) { + p := Service{} + if err := p.Broadcast(&testpb.AddressBook{}); err != ErrMessageNotMapped { + t.Fatalf("Expected error %v, got %v", ErrMessageNotMapped, err) + } +} diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index 13e8f842f..f169e1649 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -21,7 +21,7 @@ type P2P interface { // Broadcaster broadcasts messages to peers over the p2p pubsub protocol. type Broadcaster interface { - Broadcast(proto.Message) + Broadcast(proto.Message) error } // SetStreamHandler configures p2p to handle streams of a certain topic ID. diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index cd614c586..706f5dce7 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -82,7 +82,8 @@ func (s *Service) Status() error { // Encoding returns the configured networking encoding. func (s *Service) Encoding() encoder.NetworkEncoding { - return nil + // TODO(3147): Return based on flag value + return &encoder.SszNetworkEncoder{} } // PubSub returns the p2p pubsub framework. diff --git a/beacon-chain/p2p/testing/BUILD.bazel b/beacon-chain/p2p/testing/BUILD.bazel index 2ce098756..da4a2652b 100644 --- a/beacon-chain/p2p/testing/BUILD.bazel +++ b/beacon-chain/p2p/testing/BUILD.bazel @@ -7,7 +7,6 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing", visibility = ["//beacon-chain:__subpackages__"], deps = [ - "//beacon-chain/p2p:go_default_library", "//beacon-chain/p2p/encoder:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 73735ed5d..057820e51 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -3,6 +3,7 @@ package testing import ( "bytes" "context" + "errors" "testing" "time" @@ -14,12 +15,10 @@ import ( "github.com/libp2p/go-libp2p-core/protocol" pubsub "github.com/libp2p/go-libp2p-pubsub" swarmt "github.com/libp2p/go-libp2p-swarm/testing" - "github.com/prysmaticlabs/prysm/beacon-chain/p2p" - "github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" -) -var _ = p2p.P2P(&TestP2P{}) + "github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder" +) // TestP2P represents a p2p implementation that can be used for testing. type TestP2P struct { @@ -108,8 +107,9 @@ func (p *TestP2P) ReceivePubSub(topic string, msg proto.Message) { } // Broadcast a message. -func (p *TestP2P) Broadcast(msg proto.Message) { +func (p *TestP2P) Broadcast(msg proto.Message) error { // TODO(3147): implement + return errors.New("not implemented") } // SetStreamHandler for RPC.