mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-11 04:00:05 +00:00
Handle Pubsub Panics (#4350)
* handle panics * lint * gaz * preston's review
This commit is contained in:
parent
53b8eb57ee
commit
b337a5720c
@ -46,6 +46,7 @@ go_library(
|
||||
"//shared:go_default_library",
|
||||
"//shared/bls:go_default_library",
|
||||
"//shared/bytesutil:go_default_library",
|
||||
"//shared/messagehandler:go_default_library",
|
||||
"//shared/params:go_default_library",
|
||||
"//shared/roughtime:go_default_library",
|
||||
"//shared/runutil:go_default_library",
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/shared/messagehandler"
|
||||
"github.com/prysmaticlabs/prysm/shared/roughtime"
|
||||
"github.com/prysmaticlabs/prysm/shared/traceutil"
|
||||
"go.opencensus.io/trace"
|
||||
@ -175,6 +176,7 @@ func (r *Service) subscribe(topic string, validator pubsub.Validator, handle sub
|
||||
// appropriate counter if the particular message fails to validate.
|
||||
func wrapAndReportValidation(topic string, v pubsub.Validator) (string, pubsub.Validator) {
|
||||
return topic, func(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool {
|
||||
defer messagehandler.HandlePanic(ctx, msg)
|
||||
b := v(ctx, pid, msg)
|
||||
if !b {
|
||||
messageFailedValidationCounter.WithLabelValues(topic).Inc()
|
||||
|
@ -17,7 +17,23 @@ var log = logrus.WithField("prefix", "message-handler")
|
||||
// SafelyHandleMessage will recover and log any panic that occurs from the
|
||||
// function argument.
|
||||
func SafelyHandleMessage(ctx context.Context, fn func(ctx context.Context, message proto.Message) error, msg proto.Message) {
|
||||
defer func() {
|
||||
defer HandlePanic(ctx, msg)
|
||||
|
||||
// Fingers crossed that it doesn't panic...
|
||||
if err := fn(ctx, msg); err != nil {
|
||||
// Report any error on the span, if one exists.
|
||||
if span := trace.FromContext(ctx); span != nil {
|
||||
span.SetStatus(trace.Status{
|
||||
Code: trace.StatusCodeInternal,
|
||||
Message: err.Error(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// HandlePanic returns a panic handler function that is used to
|
||||
// capture a panic.
|
||||
func HandlePanic(ctx context.Context, msg proto.Message) {
|
||||
if r := recover(); r != nil {
|
||||
printedMsg := noMsgData
|
||||
if msg != nil {
|
||||
@ -40,16 +56,4 @@ func SafelyHandleMessage(ctx context.Context, fn func(ctx context.Context, messa
|
||||
})
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Fingers crossed that it doesn't panic...
|
||||
if err := fn(ctx, msg); err != nil {
|
||||
// Report any error on the span, if one exists.
|
||||
if span := trace.FromContext(ctx); span != nil {
|
||||
span.SetStatus(trace.Status{
|
||||
Code: trace.StatusCodeInternal,
|
||||
Message: err.Error(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user