mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-31 15:31:20 +00:00
Add p2p broadcast implementation (#3226)
* add broadcaster impl * change API so broadcast returns an error * change API so broadcast returns an error * add test for message not mapped * lint msg * lint msg
This commit is contained in:
parent
68210eb733
commit
b8e550b1e9
@ -34,12 +34,16 @@ go_library(
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"broadcaster_test.go",
|
||||
"parameter_test.go",
|
||||
"service_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//beacon-chain/p2p/testing:go_default_library",
|
||||
"//proto/testing:go_default_library",
|
||||
"//shared/testutil:go_default_library",
|
||||
"@com_github_gogo_protobuf//proto:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||
],
|
||||
|
@ -1,8 +1,31 @@
|
||||
package p2p
|
||||
|
||||
import "github.com/gogo/protobuf/proto"
|
||||
import (
|
||||
"bytes"
|
||||
"reflect"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// ErrMessageNotMapped occurs on a Broadcast attempt when a message has not been defined in the
|
||||
// GossipTypeMapping.
|
||||
var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic")
|
||||
|
||||
// Broadcast a message to the p2p network.
|
||||
func (s *Service) Broadcast(msg proto.Message) {
|
||||
// TODO(3147): implement
|
||||
func (s *Service) Broadcast(msg proto.Message) error {
|
||||
topic, ok := GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
if !ok {
|
||||
return ErrMessageNotMapped
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
if _, err := s.Encoding().Encode(buf, msg); err != nil {
|
||||
return errors.Wrap(err, "could not encode message")
|
||||
}
|
||||
|
||||
if err := s.pubsub.Publish(topic+s.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil {
|
||||
return errors.Wrap(err, "could not publish message")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
83
beacon-chain/p2p/broadcaster_test.go
Normal file
83
beacon-chain/p2p/broadcaster_test.go
Normal file
@ -0,0 +1,83 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
|
||||
testpb "github.com/prysmaticlabs/prysm/proto/testing"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
)
|
||||
|
||||
func TestService_Broadcast(t *testing.T) {
|
||||
p1 := p2ptest.NewTestP2P(t)
|
||||
p2 := p2ptest.NewTestP2P(t)
|
||||
p1.Connect(p2)
|
||||
if len(p1.Host.Network().Peers()) == 0 {
|
||||
t.Fatal("No peers")
|
||||
}
|
||||
|
||||
p := &Service{
|
||||
host: p1.Host,
|
||||
pubsub: p1.PubSub(),
|
||||
}
|
||||
|
||||
msg := &testpb.TestSimpleMessage{
|
||||
Bar: 55,
|
||||
}
|
||||
|
||||
// Set a test gossip mapping for testpb.TestSimpleMessage.
|
||||
GossipTypeMapping[reflect.TypeOf(msg)] = "/testing"
|
||||
|
||||
// External peer subscribes to the topic.
|
||||
topic := "/testing" + p.Encoding().ProtocolSuffix()
|
||||
sub, err := p2.PubSub().Subscribe(topic)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
|
||||
|
||||
// Async listen for the pubsub, must be before the broadcast.
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
|
||||
incomingMessage, err := sub.Next(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
buf := bytes.NewBuffer(incomingMessage.Data)
|
||||
result := &testpb.TestSimpleMessage{}
|
||||
if err := p.Encoding().Decode(buf, result); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !proto.Equal(result, msg) {
|
||||
t.Errorf("Did not receive expected message, got %+v, wanted %+v", result, msg)
|
||||
}
|
||||
}()
|
||||
|
||||
// Broadcast to peers and wait.
|
||||
if err := p.Broadcast(msg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if testutil.WaitTimeout(&wg, 1*time.Second) {
|
||||
t.Error("Failed to receive pubsub within 1s")
|
||||
}
|
||||
}
|
||||
|
||||
func TestService_Broadcast_ReturnsErr_TopicNotMapped(t *testing.T) {
|
||||
p := Service{}
|
||||
if err := p.Broadcast(&testpb.AddressBook{}); err != ErrMessageNotMapped {
|
||||
t.Fatalf("Expected error %v, got %v", ErrMessageNotMapped, err)
|
||||
}
|
||||
}
|
@ -21,7 +21,7 @@ type P2P interface {
|
||||
|
||||
// Broadcaster broadcasts messages to peers over the p2p pubsub protocol.
|
||||
type Broadcaster interface {
|
||||
Broadcast(proto.Message)
|
||||
Broadcast(proto.Message) error
|
||||
}
|
||||
|
||||
// SetStreamHandler configures p2p to handle streams of a certain topic ID.
|
||||
|
@ -82,7 +82,8 @@ func (s *Service) Status() error {
|
||||
|
||||
// Encoding returns the configured networking encoding.
|
||||
func (s *Service) Encoding() encoder.NetworkEncoding {
|
||||
return nil
|
||||
// TODO(3147): Return based on flag value
|
||||
return &encoder.SszNetworkEncoder{}
|
||||
}
|
||||
|
||||
// PubSub returns the p2p pubsub framework.
|
||||
|
@ -7,7 +7,6 @@ go_library(
|
||||
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing",
|
||||
visibility = ["//beacon-chain:__subpackages__"],
|
||||
deps = [
|
||||
"//beacon-chain/p2p:go_default_library",
|
||||
"//beacon-chain/p2p/encoder:go_default_library",
|
||||
"//proto/beacon/p2p/v1:go_default_library",
|
||||
"@com_github_gogo_protobuf//proto:go_default_library",
|
||||
|
@ -3,6 +3,7 @@ package testing
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -14,12 +15,10 @@ import (
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
)
|
||||
|
||||
var _ = p2p.P2P(&TestP2P{})
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
|
||||
)
|
||||
|
||||
// TestP2P represents a p2p implementation that can be used for testing.
|
||||
type TestP2P struct {
|
||||
@ -108,8 +107,9 @@ func (p *TestP2P) ReceivePubSub(topic string, msg proto.Message) {
|
||||
}
|
||||
|
||||
// Broadcast a message.
|
||||
func (p *TestP2P) Broadcast(msg proto.Message) {
|
||||
func (p *TestP2P) Broadcast(msg proto.Message) error {
|
||||
// TODO(3147): implement
|
||||
return errors.New("not implemented")
|
||||
}
|
||||
|
||||
// SetStreamHandler for RPC.
|
||||
|
Loading…
Reference in New Issue
Block a user