prysm-pulse/beacon-chain/sync/decode_pubsub.go
Preston Van Loon 07e7e030d9
Update PubSub and include topic filter (#7496)
* Update pubsub and fix topicIDs

* WIP filter

* Add suggested code from @bidlocode

* add tests and fix bugs

* more tests

* Wait until state initialized to accept pubsub filtering

* rename for clarity and clarify comment

* fix test builds

* Autofix issues in 2 files

Resolved issues in the following files via DeepSource Autofix:
1. beacon-chain/p2p/pubsub_filter.go
2. beacon-chain/p2p/pubsub_filter_test.go

* @nisdas pr feedback

* pr feedback and fuzz fix

* Update beacon-chain/p2p/pubsub_filter.go

* Must have protocol suffix

* Must have protocol suffix

* gofmt

* rm test, fix panic

* Fix tests

* Add isInitialized check

* Add a few more tests for better coverage

* cache fork digest, make pubsub filter part of the p2p service

* rename service

* gofmt

* Add comment

* fix

Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>
Co-authored-by: Nishant Das <nishdas93@gmail.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2020-10-16 07:05:40 +00:00

45 lines
1.1 KiB
Go

package sync
import (
"errors"
"strings"
"github.com/gogo/protobuf/proto"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
)
// Sentinel errors produced while decoding gossip messages in this file.
var (
	// errNilPubsubMessage is returned by decodePubsubMessage when the message
	// itself, its topic pointer, or the topic string is missing/empty.
	errNilPubsubMessage = errors.New("nil pubsub message")

	// errInvalidTopic is returned by replaceForkDigest when a topic does not
	// split into exactly four "/"-separated parts.
	errInvalidTopic = errors.New("invalid topic format")
)
// decodePubsubMessage turns a raw pubsub message into a freshly allocated
// protobuf message of the type registered for the message's topic.
//
// Steps:
//  1. Reject a nil message, a nil topic pointer, or an empty topic string
//     with errNilPubsubMessage.
//  2. Strip the encoding's protocol suffix from the topic and pass the result
//     through replaceForkDigest so it can be used as a lookup key.
//  3. Look the key up in p2p.GossipTopicMappings; an unknown key yields
//     p2p.ErrMessageNotMapped.
//  4. Clone the mapped base message and decode msg.Data into the clone.
//
// Any error from replaceForkDigest or DecodeGossip is returned unchanged.
func (s *Service) decodePubsubMessage(msg *pubsub.Message) (proto.Message, error) {
	if msg == nil || msg.Topic == nil {
		return nil, errNilPubsubMessage
	}
	rawTopic := *msg.Topic
	if rawTopic == "" {
		return nil, errNilPubsubMessage
	}

	// Normalize the wire topic into the key form used by GossipTopicMappings.
	trimmed := strings.TrimSuffix(rawTopic, s.p2p.Encoding().ProtocolSuffix())
	lookupKey, err := s.replaceForkDigest(trimmed)
	if err != nil {
		return nil, err
	}

	template, found := p2p.GossipTopicMappings[lookupKey]
	if !found {
		return nil, p2p.ErrMessageNotMapped
	}

	// Decode into a clone so the mapped base message is never written to.
	decoded := proto.Clone(template)
	err = s.p2p.Encoding().DecodeGossip(msg.Data, decoded)
	if err != nil {
		return nil, err
	}
	return decoded, nil
}
// replaceForkDigest rewrites a topic so that its third "/"-separated part is
// the literal formatter "%x".
//
// The topic is split on "/" and must produce exactly four parts (a leading
// "/" contributes an empty first part); otherwise errInvalidTopic is returned
// together with an empty string. The third part is overwritten whatever its
// current value is — it is not compared against any known digest.
func (s *Service) replaceForkDigest(topic string) (string, error) {
	const (
		separator     = "/"
		expectedParts = 4
		digestIndex   = 2
	)

	parts := strings.Split(topic, separator)
	if len(parts) == expectedParts {
		parts[digestIndex] = "%x"
		return strings.Join(parts, separator), nil
	}
	return "", errInvalidTopic
}