diff --git a/beacon-chain/sync/rpc.go b/beacon-chain/sync/rpc.go index 8e2d6b376..b801ae5bc 100644 --- a/beacon-chain/sync/rpc.go +++ b/beacon-chain/sync/rpc.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p-core/network" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/roughtime" + "github.com/prysmaticlabs/prysm/shared/traceutil" "go.opencensus.io/trace" ) @@ -62,6 +63,8 @@ func (r *RegularSync) registerRPC(topic string, base interface{}, handle rpcHand ctx, span := trace.StartSpan(ctx, "sync.rpc") defer span.End() span.AddAttributes(trace.StringAttribute("topic", topic)) + span.AddAttributes(trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty())) + log := log.WithField("peer", stream.Conn().RemotePeer().Pretty()) if err := stream.SetReadDeadline(roughtime.Now().Add(ttfbTimeout)); err != nil { log.WithError(err).Error("Could not set stream read deadline") @@ -76,21 +79,25 @@ func (r *RegularSync) registerRPC(topic string, base interface{}, handle rpcHand msg := reflect.New(t.Elem()) if err := r.p2p.Encoding().DecodeWithLength(stream, msg.Interface()); err != nil { log.WithError(err).Error("Failed to decode stream message") + traceutil.AnnotateError(span, err) return } if err := handle(ctx, msg.Interface(), stream); err != nil { messageFailedProcessingCounter.WithLabelValues(topic).Inc() log.WithError(err).Error("Failed to handle p2p RPC") + traceutil.AnnotateError(span, err) } } else { msg := reflect.New(t) if err := r.p2p.Encoding().DecodeWithLength(stream, msg.Interface()); err != nil { log.WithError(err).Error("Failed to decode stream message") + traceutil.AnnotateError(span, err) return } if err := handle(ctx, msg.Elem().Interface(), stream); err != nil { messageFailedProcessingCounter.WithLabelValues(topic).Inc() log.WithError(err).Error("Failed to handle p2p RPC") + traceutil.AnnotateError(span, err) } }