2019-08-16 17:13:04 +00:00
|
|
|
package sync
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
2019-08-24 01:15:02 +00:00
|
|
|
"runtime/debug"
|
2019-08-22 05:34:25 +00:00
|
|
|
"time"
|
2019-08-16 17:13:04 +00:00
|
|
|
|
|
|
|
"github.com/gogo/protobuf/proto"
|
2019-12-20 03:18:08 +00:00
|
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
2019-12-07 17:57:26 +00:00
|
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
|
|
|
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
2019-08-16 17:13:04 +00:00
|
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
2019-09-17 21:14:51 +00:00
|
|
|
"github.com/prysmaticlabs/prysm/shared/roughtime"
|
2019-09-29 18:48:55 +00:00
|
|
|
"github.com/prysmaticlabs/prysm/shared/traceutil"
|
2019-09-03 18:06:35 +00:00
|
|
|
"go.opencensus.io/trace"
|
2019-08-16 17:13:04 +00:00
|
|
|
)
|
|
|
|
|
2019-09-20 17:54:32 +00:00
|
|
|
// pubsubMessageTimeout is the timeout applied to the context that is created for
// handling a single incoming pubsub message.
const pubsubMessageTimeout = 10 * time.Second
|
2019-08-22 05:34:25 +00:00
|
|
|
|
2019-08-16 17:13:04 +00:00
|
|
|
// subHandler represents handler for a given subscription.
|
|
|
|
type subHandler func(context.Context, proto.Message) error
|
|
|
|
|
2019-12-20 03:18:08 +00:00
|
|
|
// noopValidator is a no-op that only decodes the message, but does not check its contents.
|
|
|
|
func (r *Service) noopValidator(ctx context.Context, _ peer.ID, msg *pubsub.Message) bool {
|
|
|
|
m, err := r.decodePubsubMessage(msg)
|
|
|
|
if err != nil {
|
|
|
|
log.WithError(err).Error("Failed to decode message")
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
msg.ValidatorData = m
|
|
|
|
return true
|
2019-08-16 17:13:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Register PubSub subscribers
|
2019-12-17 01:53:55 +00:00
|
|
|
func (r *Service) registerSubscribers() {
|
2019-09-17 21:14:51 +00:00
|
|
|
go func() {
|
|
|
|
// Wait until chain start.
|
2019-12-07 17:57:26 +00:00
|
|
|
stateChannel := make(chan *feed.Event, 1)
|
2019-11-23 03:35:47 +00:00
|
|
|
stateSub := r.stateNotifier.StateFeed().Subscribe(stateChannel)
|
|
|
|
defer stateSub.Unsubscribe()
|
|
|
|
for r.chainStarted == false {
|
|
|
|
select {
|
|
|
|
case event := <-stateChannel:
|
2019-12-07 17:57:26 +00:00
|
|
|
if event.Type == statefeed.Initialized {
|
|
|
|
data := event.Data.(*statefeed.InitializedData)
|
2019-11-23 03:35:47 +00:00
|
|
|
log.WithField("starttime", data.StartTime).Debug("Received state initialized event")
|
|
|
|
if data.StartTime.After(roughtime.Now()) {
|
2019-11-23 11:15:02 +00:00
|
|
|
stateSub.Unsubscribe()
|
2019-11-23 03:35:47 +00:00
|
|
|
time.Sleep(roughtime.Until(data.StartTime))
|
|
|
|
}
|
|
|
|
r.chainStarted = true
|
|
|
|
}
|
|
|
|
case <-r.ctx.Done():
|
|
|
|
log.Debug("Context closed, exiting goroutine")
|
|
|
|
return
|
|
|
|
case err := <-stateSub.Err():
|
|
|
|
log.WithError(err).Error("Subscription to state notifier failed")
|
|
|
|
return
|
|
|
|
}
|
2019-09-17 21:14:51 +00:00
|
|
|
}
|
|
|
|
}()
|
2019-08-16 17:13:04 +00:00
|
|
|
r.subscribe(
|
|
|
|
"/eth2/beacon_block",
|
2019-08-22 18:11:52 +00:00
|
|
|
r.validateBeaconBlockPubSub,
|
|
|
|
r.beaconBlockSubscriber,
|
2019-08-16 17:13:04 +00:00
|
|
|
)
|
|
|
|
r.subscribe(
|
|
|
|
"/eth2/beacon_attestation",
|
2019-08-23 19:46:04 +00:00
|
|
|
r.validateBeaconAttestation,
|
2019-08-23 21:34:03 +00:00
|
|
|
r.beaconAttestationSubscriber,
|
2019-08-16 17:13:04 +00:00
|
|
|
)
|
|
|
|
r.subscribe(
|
|
|
|
"/eth2/voluntary_exit",
|
2019-08-18 15:33:58 +00:00
|
|
|
r.validateVoluntaryExit,
|
|
|
|
r.voluntaryExitSubscriber,
|
2019-08-16 17:13:04 +00:00
|
|
|
)
|
|
|
|
r.subscribe(
|
|
|
|
"/eth2/proposer_slashing",
|
2019-08-22 14:31:56 +00:00
|
|
|
r.validateProposerSlashing,
|
|
|
|
r.proposerSlashingSubscriber,
|
2019-08-16 17:13:04 +00:00
|
|
|
)
|
|
|
|
r.subscribe(
|
|
|
|
"/eth2/attester_slashing",
|
2019-08-22 05:34:25 +00:00
|
|
|
r.validateAttesterSlashing,
|
|
|
|
r.attesterSlashingSubscriber,
|
2019-08-16 17:13:04 +00:00
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
// subscribe to a given topic with a given validator and subscription handler.
|
|
|
|
// The base protobuf message is used to initialize new messages for decoding.
|
2019-12-20 03:18:08 +00:00
|
|
|
func (r *Service) subscribe(topic string, validator pubsub.Validator, handle subHandler) {
|
2019-08-16 17:13:04 +00:00
|
|
|
base := p2p.GossipTopicMappings[topic]
|
|
|
|
if base == nil {
|
|
|
|
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
|
|
|
|
}
|
|
|
|
|
|
|
|
topic += r.p2p.Encoding().ProtocolSuffix()
|
|
|
|
log := log.WithField("topic", topic)
|
|
|
|
|
2019-12-20 03:18:08 +00:00
|
|
|
if err := r.p2p.PubSub().RegisterTopicValidator(wrapAndReportValidation(topic, validator)); err != nil {
|
|
|
|
// Configuring a topic validator would only return an error as a result of misconfiguration
|
|
|
|
// and is not a runtime concern.
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
2019-08-16 17:13:04 +00:00
|
|
|
sub, err := r.p2p.PubSub().Subscribe(topic)
|
|
|
|
if err != nil {
|
|
|
|
// Any error subscribing to a PubSub topic would be the result of a misconfiguration of
|
|
|
|
// libp2p PubSub library. This should not happen at normal runtime, unless the config
|
|
|
|
// changes to a fatal configuration.
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Pipeline decodes the incoming subscription data, runs the validation, and handles the
|
|
|
|
// message.
|
2019-12-20 03:18:08 +00:00
|
|
|
pipeline := func(msg *pubsub.Message) {
|
2019-09-29 18:48:55 +00:00
|
|
|
ctx, _ := context.WithTimeout(context.Background(), pubsubMessageTimeout)
|
|
|
|
ctx, span := trace.StartSpan(ctx, "sync.pubsub")
|
|
|
|
defer span.End()
|
|
|
|
|
2019-08-24 01:15:02 +00:00
|
|
|
defer func() {
|
|
|
|
if r := recover(); r != nil {
|
2019-09-29 18:48:55 +00:00
|
|
|
traceutil.AnnotateError(span, fmt.Errorf("panic occurred: %v", r))
|
2019-08-24 01:15:02 +00:00
|
|
|
log.WithField("error", r).Error("Panic occurred")
|
|
|
|
debug.PrintStack()
|
|
|
|
}
|
|
|
|
}()
|
2019-09-29 18:48:55 +00:00
|
|
|
|
2019-09-03 18:06:35 +00:00
|
|
|
span.AddAttributes(trace.StringAttribute("topic", topic))
|
2019-10-03 16:33:16 +00:00
|
|
|
|
2019-12-20 03:18:08 +00:00
|
|
|
if msg.ValidatorData == nil {
|
|
|
|
log.Error("Received nil message on pubsub")
|
|
|
|
messageFailedProcessingCounter.WithLabelValues(topic).Inc()
|
2019-10-01 15:13:04 +00:00
|
|
|
return
|
|
|
|
}
|
2019-08-16 17:13:04 +00:00
|
|
|
|
2019-12-20 03:18:08 +00:00
|
|
|
if err := handle(ctx, msg.ValidatorData.(proto.Message)); err != nil {
|
2019-09-29 18:48:55 +00:00
|
|
|
traceutil.AnnotateError(span, err)
|
2019-08-16 17:13:04 +00:00
|
|
|
log.WithError(err).Error("Failed to handle p2p pubsub")
|
2019-09-30 05:23:19 +00:00
|
|
|
messageFailedProcessingCounter.WithLabelValues(topic).Inc()
|
2019-08-16 17:13:04 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// The main message loop for receiving incoming messages from this subscription.
|
|
|
|
messageLoop := func() {
|
|
|
|
for {
|
|
|
|
msg, err := sub.Next(r.ctx)
|
|
|
|
if err != nil {
|
|
|
|
log.WithError(err).Error("Subscription next failed")
|
|
|
|
// TODO(3147): Mark status unhealthy.
|
|
|
|
return
|
|
|
|
}
|
2019-09-17 21:14:51 +00:00
|
|
|
if !r.chainStarted {
|
|
|
|
messageReceivedBeforeChainStartCounter.WithLabelValues(topic + r.p2p.Encoding().ProtocolSuffix()).Inc()
|
|
|
|
continue
|
|
|
|
}
|
2019-08-24 18:41:24 +00:00
|
|
|
|
2019-08-28 15:14:22 +00:00
|
|
|
messageReceivedCounter.WithLabelValues(topic + r.p2p.Encoding().ProtocolSuffix()).Inc()
|
|
|
|
|
2019-12-20 03:18:08 +00:00
|
|
|
go pipeline(msg)
|
2019-08-16 17:13:04 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
go messageLoop()
|
|
|
|
}
|
2019-12-20 03:18:08 +00:00
|
|
|
|
|
|
|
// Wrap the pubsub validator with a metric monitoring function. This function increments the
|
|
|
|
// appropriate counter if the particular message fails to validate.
|
|
|
|
func wrapAndReportValidation(topic string, v pubsub.Validator) (string, pubsub.Validator) {
|
|
|
|
return topic, func(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool {
|
|
|
|
b := v(ctx, pid, msg)
|
|
|
|
if !b {
|
|
|
|
messageFailedValidationCounter.WithLabelValues(topic).Inc()
|
|
|
|
}
|
|
|
|
return b
|
|
|
|
}
|
|
|
|
}
|