p2p: refactor peer errors to propagate with a DiscReason (#8089)

Improve p2p error handling to propagate errors from their origin up the
call chain to the Server peer removal code, using a new PeerError type
that contains a DiscReason and a more detailed description.

The origin can be tracked down using PeerErrorCode (code) and DiscReason
(reason), which appear in the log like this:

> [TRACE] [08-28|16:33:40.205] Removing p2p peer peercount=0 url=enode://d399f4b...@1.2.3.4:30303 duration=6.901ms err="PeerError(code=remote disconnect reason, reason=too many peers, err=<nil>, message=Peer.run got a remote DiscReason)"
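For illustration, a minimal sketch (not part of this commit) of how an error is wrapped at its origin. The readExample handler is hypothetical, but NewPeerError, PeerErrorMessageReceive and DiscNetworkError come from the p2p package changes in this diff:

```go
package example

import "github.com/ledgerwatch/erigon/p2p"

// readExample is a hypothetical call site: it wraps the underlying error at
// its origin with a PeerErrorCode (where it failed) and a DiscReason (what
// the peer is ultimately disconnected with), instead of returning a bare error.
func readExample(rw p2p.MsgReadWriter) (*p2p.Msg, *p2p.PeerError) {
	msg, err := rw.ReadMsg()
	if err != nil {
		return nil, p2p.NewPeerError(p2p.PeerErrorMessageReceive, p2p.DiscNetworkError, err, "readExample: ReadMsg error")
	}
	return &msg, nil
}
```

The PeerError's Error()/String() output is the "PeerError(code=..., reason=..., err=..., message=...)" form shown in the log line above.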
battlmonstr 2023-08-31 17:45:23 +02:00 committed by GitHub
parent 51b7b0ce1d
commit 340b9811b0
12 changed files with 201 additions and 177 deletions

View File

@ -15,20 +15,24 @@ func readAndValidatePeerStatusMessage(
status *proto_sentry.StatusData,
version uint,
minVersion uint,
) (*eth.StatusPacket, error) {
) (*eth.StatusPacket, *p2p.PeerError) {
msg, err := rw.ReadMsg()
if err != nil {
return nil, err
return nil, p2p.NewPeerError(p2p.PeerErrorStatusReceive, p2p.DiscNetworkError, err, "readAndValidatePeerStatusMessage rw.ReadMsg error")
}
reply, err := tryDecodeStatusMessage(&msg)
msg.Discard()
if err != nil {
return nil, err
return nil, p2p.NewPeerError(p2p.PeerErrorStatusDecode, p2p.DiscProtocolError, err, "readAndValidatePeerStatusMessage tryDecodeStatusMessage error")
}
err = checkPeerStatusCompatibility(reply, status, version, minVersion)
return reply, err
if err != nil {
return nil, p2p.NewPeerError(p2p.PeerErrorStatusIncompatible, p2p.DiscUselessPeer, err, "readAndValidatePeerStatusMessage checkPeerStatusCompatibility error")
}
return reply, nil
}
func tryDecodeStatusMessage(msg *p2p.Msg) (*eth.StatusPacket, error) {
@ -48,8 +52,6 @@ func tryDecodeStatusMessage(msg *p2p.Msg) (*eth.StatusPacket, error) {
return &reply, nil
}
var NetworkIdMissmatchErr = fmt.Errorf("network id does not match")
func checkPeerStatusCompatibility(
reply *eth.StatusPacket,
status *proto_sentry.StatusData,
@ -58,7 +60,7 @@ func checkPeerStatusCompatibility(
) error {
networkID := status.NetworkId
if reply.NetworkID != networkID {
return fmt.Errorf("%w: theirs %d, ours %d", NetworkIdMissmatchErr, reply.NetworkID, networkID)
return fmt.Errorf("network id does not match: theirs %d, ours %d", reply.NetworkID, networkID)
}
if uint(reply.ProtocolVersion) > version {

View File

@ -62,10 +62,14 @@ type PeerInfo struct {
rw p2p.MsgReadWriter
protocol uint
removed chan struct{} // close this channel on remove
ctx context.Context
ctxCancel context.CancelFunc
removeOnce sync.Once
ctx context.Context
ctxCancel context.CancelFunc
// this channel is closed on Remove()
removed chan struct{}
removeReason *p2p.PeerError
removeOnce sync.Once
// each peer has own worker (goroutine) - all funcs from this queue will execute on this worker
// if this queue is full (means peer is slow) - old messages will be dropped
// channel closed on peer remove
@ -116,7 +120,14 @@ func (bp *PeersByMinBlock) Pop() interface{} {
func NewPeerInfo(peer *p2p.Peer, rw p2p.MsgReadWriter) *PeerInfo {
ctx, cancel := context.WithCancel(context.Background())
p := &PeerInfo{peer: peer, rw: rw, removed: make(chan struct{}), tasks: make(chan func(), 16), ctx: ctx, ctxCancel: cancel}
p := &PeerInfo{
peer: peer,
rw: rw,
removed: make(chan struct{}),
tasks: make(chan func(), 16),
ctx: ctx,
ctxCancel: cancel,
}
p.lock.RLock()
t := p.tasks
@ -193,10 +204,9 @@ func (pi *PeerInfo) LatestDeadline() time.Time {
return pi.latestDealine
}
func (pi *PeerInfo) Remove() {
pi.lock.Lock()
defer pi.lock.Unlock()
func (pi *PeerInfo) Remove(reason *p2p.PeerError) {
pi.removeOnce.Do(func() {
pi.removeReason = reason
close(pi.removed)
pi.ctxCancel()
})
@ -230,12 +240,12 @@ func (pi *PeerInfo) Async(f func(), logger log.Logger) {
}
}
func (pi *PeerInfo) Removed() bool {
func (pi *PeerInfo) RemoveReason() *p2p.PeerError {
select {
case <-pi.removed:
return true
return pi.removeReason
default:
return false
return nil
}
}
@ -273,9 +283,9 @@ func handShake(
rw p2p.MsgReadWriter,
version uint,
minVersion uint,
) (*libcommon.Hash, error) {
) (*libcommon.Hash, *p2p.PeerError) {
// Send out own handshake in a new thread
errChan := make(chan error, 2)
errChan := make(chan *p2p.PeerError, 2)
resultChan := make(chan *eth.StatusPacket, 1)
ourTD := gointerfaces.ConvertH256ToUint256Int(status.TotalDifficulty)
@ -294,11 +304,11 @@ func handShake(
}
err := p2p.Send(rw, eth.StatusMsg, status)
if err != nil {
err = fmt.Errorf("sentry.handShake failed to send eth Status: %w", err)
if err == nil {
errChan <- nil
} else {
errChan <- p2p.NewPeerError(p2p.PeerErrorStatusSend, p2p.DiscNetworkError, err, "sentry.handShake failed to send eth Status")
}
errChan <- err
}()
go func() {
@ -307,11 +317,10 @@ func handShake(
if err == nil {
resultChan <- status
errChan <- nil
} else {
err = fmt.Errorf("sentry.handShake failed to receive/validate a remote peer eth Status: %w", err)
errChan <- err
}
errChan <- err
}()
timeout := time.NewTimer(handshakeTimeout)
@ -323,9 +332,9 @@ func handShake(
return nil, err
}
case <-timeout.C:
return nil, p2p.DiscReadTimeout
return nil, p2p.NewPeerError(p2p.PeerErrorStatusHandshakeTimeout, p2p.DiscReadTimeout, nil, "sentry.handShake timeout")
case <-ctx.Done():
return nil, ctx.Err()
return nil, p2p.NewPeerError(p2p.PeerErrorDiscReason, p2p.DiscQuitting, ctx.Err(), "sentry.handShake ctx.Done")
}
}
@ -342,7 +351,7 @@ func runPeer(
send func(msgId proto_sentry.MessageId, peerID [64]byte, b []byte),
hasSubscribers func(msgId proto_sentry.MessageId) bool,
logger log.Logger,
) error {
) *p2p.PeerError {
printTime := time.Now().Add(time.Minute)
peerPrinted := false
defer func() {
@ -355,6 +364,7 @@ func runPeer(
logger.Trace("Peer disconnected", "id", peerID, "name", peerInfo.peer.Fullname())
}
}()
for {
if !peerPrinted {
if time.Now().After(printTime) {
@ -363,25 +373,27 @@ func runPeer(
}
}
if err := libcommon.Stopped(ctx.Done()); err != nil {
return p2p.NewPeerError(p2p.PeerErrorDiscReason, p2p.DiscQuitting, ctx.Err(), "sentry.runPeer: context stopped")
}
if err := peerInfo.RemoveReason(); err != nil {
return err
}
if peerInfo.Removed() {
return fmt.Errorf("peer removed")
}
msg, err := rw.ReadMsg()
if err != nil {
return fmt.Errorf("reading message: %w", err)
return p2p.NewPeerError(p2p.PeerErrorMessageReceive, p2p.DiscNetworkError, err, "sentry.runPeer: ReadMsg error")
}
if msg.Size > eth.ProtocolMaxMsgSize {
msg.Discard()
return fmt.Errorf("message is too large %d, limit %d", msg.Size, eth.ProtocolMaxMsgSize)
return p2p.NewPeerError(p2p.PeerErrorMessageSizeLimit, p2p.DiscSubprotocolError, nil, fmt.Sprintf("sentry.runPeer: message is too large %d, limit %d", msg.Size, eth.ProtocolMaxMsgSize))
}
givePermit := false
switch msg.Code {
case eth.StatusMsg:
msg.Discard()
// Status messages should never arrive after the handshake
return fmt.Errorf("uncontrolled status message")
return p2p.NewPeerError(p2p.PeerErrorStatusUnexpected, p2p.DiscSubprotocolError, nil, "sentry.runPeer: unexpected status message")
case eth.GetBlockHeadersMsg:
if !hasSubscribers(eth.ToProto[protocol][msg.Code]) {
continue
@ -424,7 +436,7 @@ func runPeer(
case eth.GetNodeDataMsg:
if protocol >= direct.ETH67 {
msg.Discard()
return fmt.Errorf("unexpected GetNodeDataMsg from %s in eth/%d", peerID, protocol)
return p2p.NewPeerError(p2p.PeerErrorMessageObsolete, p2p.DiscSubprotocolError, nil, fmt.Sprintf("unexpected GetNodeDataMsg from %s in eth/%d", peerID, protocol))
}
if !hasSubscribers(eth.ToProto[protocol][msg.Code]) {
continue
@ -581,12 +593,11 @@ func NewGrpcServer(ctx context.Context, dialCandidates func() enode.Iterator, re
Version: protocol,
Length: 17,
DialCandidates: disc,
Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) *p2p.PeerError {
peerID := peer.Pubkey()
printablePeerID := hex.EncodeToString(peerID[:])[:20]
if ss.getPeer(peerID) != nil {
logger.Trace("[p2p] peer already has connection", "peerId", printablePeerID)
return nil
return p2p.NewPeerError(p2p.PeerErrorDiscReason, p2p.DiscAlreadyConnected, nil, "peer already has connection")
}
logger.Trace("[p2p] start with peer", "peerId", printablePeerID)
@ -599,20 +610,12 @@ func NewGrpcServer(ctx context.Context, dialCandidates func() enode.Iterator, re
status := ss.GetStatus()
if status == nil {
err := fmt.Errorf("could not get status message from core for peer %s connection", printablePeerID)
logger.Trace("[p2p] Handshake failure", "peer", printablePeerID, "local", peer.LocalAddr(), "remote", peer.RemoteAddr(), "err", err)
return err
return p2p.NewPeerError(p2p.PeerErrorLocalStatusNeeded, p2p.DiscProtocolError, nil, "could not get status message from core")
}
peerBestHash, err := handShake(ctx, status, rw, protocol, protocol)
if err != nil {
if errors.Is(err, NetworkIdMissmatchErr) || errors.Is(err, io.EOF) || errors.Is(err, p2p.ErrShuttingDown) {
logger.Trace("[p2p] Handshake failure", "peer", printablePeerID, "err", err)
} else {
logger.Debug("[p2p] Handshake failure", "peer", printablePeerID, "err", err)
}
return fmt.Errorf("[p2p] handshake to peer %s: %w", printablePeerID, err)
return err
}
// handshake is successful
@ -620,10 +623,9 @@ func NewGrpcServer(ctx context.Context, dialCandidates func() enode.Iterator, re
ss.GoodPeers.Store(peerID, peerInfo)
ss.sendNewPeerToClients(gointerfaces.ConvertHashToH512(peerID))
err = ss.startSync(ctx, *peerBestHash, peerID)
getBlockHeadersErr := ss.getBlockHeaders(ctx, *peerBestHash, peerID)
if err != nil {
logger.Error("[p2p] p2p.Protocol.Run startSync failure", "peer", printablePeerID, "err", err)
return fmt.Errorf("[p2p] startSync for peer %s: %w", printablePeerID, err)
return p2p.NewPeerError(p2p.PeerErrorFirstMessageSend, p2p.DiscNetworkError, getBlockHeadersErr, "p2p.Protocol.Run getBlockHeaders failure")
}
err = runPeer(
@ -635,10 +637,9 @@ func NewGrpcServer(ctx context.Context, dialCandidates func() enode.Iterator, re
ss.send,
ss.hasSubscribers,
logger,
) // runPeer never returns a nil error
logger.Trace("[p2p] error while running peer", "peerId", printablePeerID, "err", err)
)
ss.sendGonePeerToClients(gointerfaces.ConvertHashToH512(peerID))
return nil
return err
},
NodeInfo: func() interface{} {
return readNodeInfo()
@ -718,11 +719,11 @@ func (ss *GrpcServer) getPeer(peerID [64]byte) (peerInfo *PeerInfo) {
return nil
}
func (ss *GrpcServer) removePeer(peerID [64]byte) {
func (ss *GrpcServer) removePeer(peerID [64]byte, reason *p2p.PeerError) {
if value, ok := ss.GoodPeers.LoadAndDelete(peerID); ok {
peerInfo := value.(*PeerInfo)
if peerInfo != nil {
peerInfo.Remove()
peerInfo.Remove(reason)
}
}
}
@ -731,11 +732,8 @@ func (ss *GrpcServer) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode ui
peerInfo.Async(func() {
err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)})
if err != nil {
peerInfo.Remove()
peerInfo.Remove(p2p.NewPeerError(p2p.PeerErrorMessageSend, p2p.DiscNetworkError, err, fmt.Sprintf("%s writePeer msgcode=%d", logPrefix, msgcode)))
ss.GoodPeers.Delete(peerInfo.ID())
if !errors.Is(err, p2p.ErrShuttingDown) {
ss.logger.Debug(logPrefix, "msgcode", msgcode, "err", err)
}
} else {
if ttl > 0 {
peerInfo.AddDeadline(time.Now().Add(ttl))
@ -744,7 +742,7 @@ func (ss *GrpcServer) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode ui
}, ss.logger)
}
func (ss *GrpcServer) startSync(ctx context.Context, bestHash libcommon.Hash, peerID [64]byte) error {
func (ss *GrpcServer) getBlockHeaders(ctx context.Context, bestHash libcommon.Hash, peerID [64]byte) error {
b, err := rlp.EncodeToBytes(&eth.GetBlockHeadersPacket66{
RequestId: rand.Uint64(), // nolint: gosec
GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{
@ -755,7 +753,7 @@ func (ss *GrpcServer) startSync(ctx context.Context, bestHash libcommon.Hash, pe
},
})
if err != nil {
return fmt.Errorf("startSync encode packet failed: %w", err)
return fmt.Errorf("GrpcServer.getBlockHeaders encode packet failed: %w", err)
}
if _, err := ss.SendMessageById(ctx, &proto_sentry.SendMessageByIdRequest{
PeerId: gointerfaces.ConvertHashToH512(peerID),
@ -774,9 +772,7 @@ func (ss *GrpcServer) PenalizePeer(_ context.Context, req *proto_sentry.Penalize
peerID := ConvertH512ToPeerID(req.PeerId)
peerInfo := ss.getPeer(peerID)
if ss.statusData != nil && peerInfo != nil && !peerInfo.peer.Info().Network.Static && !peerInfo.peer.Info().Network.Trusted {
ss.removePeer(peerID)
printablePeerID := hex.EncodeToString(peerID[:])[:8]
ss.logger.Debug("[p2p] Penalized peer", "peerId", printablePeerID, "name", peerInfo.peer.Name())
ss.removePeer(peerID, p2p.NewPeerError(p2p.PeerErrorDiscReason, p2p.DiscRequested, nil, "penalized peer"))
}
return &emptypb.Empty{}, nil
}

View File

@ -59,7 +59,7 @@ func startHandshake(
status *proto_sentry.StatusData,
pipe *p2p.MsgPipeRW,
protocolVersion uint,
errChan chan error,
errChan chan *p2p.PeerError,
) {
go func() {
_, err := handShake(ctx, status, pipe, protocolVersion, protocolVersion)
@ -110,7 +110,7 @@ func testForkIDSplit(t *testing.T, protocol uint) {
defer p2pNoFork.Close()
defer p2pProFork.Close()
errc := make(chan error, 2)
errc := make(chan *p2p.PeerError, 2)
startHandshake(ctx, s1.GetStatus(), p2pNoFork, protocol, errc)
startHandshake(ctx, s2.GetStatus(), p2pProFork, protocol, errc)

View File

@ -56,7 +56,7 @@ type Msg struct {
func (msg Msg) Decode(val interface{}) error {
s := rlp.NewStream(msg.Payload, uint64(msg.Size))
if err := s.Decode(val); err != nil {
return newPeerError(errInvalidMsg, "(code %x) (size %d) %v", msg.Code, msg.Size, err)
return NewPeerError(PeerErrorInvalidMessage, DiscProtocolError, err, fmt.Sprintf("(code %x) (size %d)", msg.Code, msg.Size))
}
return nil
}

View File

@ -112,9 +112,9 @@ type Peer struct {
created mclock.AbsTime
wg sync.WaitGroup
protoErr chan error
protoErr chan *PeerError
closed chan struct{}
disc chan DiscReason
disc chan *PeerError
// events receives message send / receive events if set
events *event.Feed
@ -192,9 +192,9 @@ func (p *Peer) LocalAddr() net.Addr {
// Disconnect terminates the peer connection with the given reason.
// It returns immediately and does not wait until the connection is closed.
func (p *Peer) Disconnect(reason DiscReason) {
func (p *Peer) Disconnect(err *PeerError) {
select {
case p.disc <- reason:
case p.disc <- err:
case <-p.closed:
}
}
@ -216,8 +216,8 @@ func newPeer(logger log.Logger, conn *conn, protocols []Protocol, pubkey [64]byt
rw: conn,
running: protomap,
created: mclock.Now(),
disc: make(chan DiscReason),
protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
disc: make(chan *PeerError),
protoErr: make(chan *PeerError, len(protomap)+1), // protocols + pingLoop
closed: make(chan struct{}),
log: logger.New("id", conn.node.ID(), "conn", conn.flags),
pubkey: pubkey,
@ -230,13 +230,13 @@ func (p *Peer) Log() log.Logger {
return p.log
}
func (p *Peer) run() (remoteRequested bool, err error) {
func (p *Peer) run() (peerErr *PeerError) {
var (
writeStart = make(chan struct{}, 1)
writeErr = make(chan error, 1)
readErr = make(chan error, 1)
reason DiscReason // sent to the peer
)
p.wg.Add(2)
go p.readLoop(readErr)
go p.pingLoop()
@ -245,39 +245,33 @@ func (p *Peer) run() (remoteRequested bool, err error) {
writeStart <- struct{}{}
p.startProtocols(writeStart, writeErr)
defer func() {
close(p.closed)
p.rw.close(peerErr.Reason)
p.wg.Wait()
}()
// Wait for an error or disconnect.
loop:
for {
select {
case err = <-writeErr:
// A write finished. Allow the next write to start if
// there was no error.
case err := <-writeErr:
if err != nil {
reason = DiscNetworkError
break loop
return NewPeerError(PeerErrorDiscReason, DiscNetworkError, err, "Peer.run writeErr")
}
// Allow the next write to start if there was no error.
writeStart <- struct{}{}
case err = <-readErr:
if r, ok := err.(DiscReason); ok {
remoteRequested = true
reason = r
case err := <-readErr:
if reason, ok := err.(DiscReason); ok {
return NewPeerError(PeerErrorDiscReasonRemote, reason, nil, "Peer.run got a remote DiscReason")
} else {
reason = DiscNetworkError
return NewPeerError(PeerErrorDiscReason, DiscNetworkError, err, "Peer.run readErr")
}
break loop
case err = <-p.protoErr:
reason = discReasonForError(err)
break loop
case err = <-p.disc:
reason = discReasonForError(err)
break loop
case err := <-p.protoErr:
return err
case err := <-p.disc:
return err
}
}
close(p.closed)
p.rw.close(reason)
p.wg.Wait()
return remoteRequested, err
}
func (p *Peer) pingLoop() {
@ -289,7 +283,7 @@ func (p *Peer) pingLoop() {
select {
case <-ping.C:
if err := SendItems(p.rw, pingMsg); err != nil {
p.protoErr <- err
p.protoErr <- NewPeerError(PeerErrorPingFailure, DiscNetworkError, err, "Failed to send pingMsg")
return
}
ping.Reset(pingInterval)
@ -407,11 +401,9 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error)
defer debug.LogPanic()
defer p.wg.Done()
err := proto.Run(p, rw)
// only unit test protocols can return nil
if err == nil {
p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
err = errProtocolReturned
} else if err != io.EOF {
p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
err = NewPeerError(PeerErrorTest, DiscQuitting, nil, fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
}
p.protoErr <- err
}()
@ -426,7 +418,7 @@ func (p *Peer) getProto(code uint64) (*protoRW, error) {
return proto, nil
}
}
return nil, newPeerError(errInvalidMsgCode, "%d", code)
return nil, NewPeerError(PeerErrorInvalidMessageCode, DiscProtocolError, nil, fmt.Sprintf("code=%d", code))
}
type protoRW struct {
@ -441,7 +433,7 @@ type protoRW struct {
func (rw *protoRW) WriteMsg(msg Msg) (err error) {
if msg.Code >= rw.Length {
return newPeerError(errInvalidMsgCode, "not handled")
return NewPeerError(PeerErrorInvalidMessageCode, DiscProtocolError, nil, fmt.Sprintf("not handled code=%d", msg.Code))
}
msg.meterCap = rw.cap()
msg.meterCode = msg.Code

View File

@ -17,42 +17,88 @@
package p2p
import (
"errors"
"fmt"
)
type PeerErrorCode uint8
const (
errInvalidMsgCode = iota
errInvalidMsg
PeerErrorInvalidMessageCode PeerErrorCode = iota
PeerErrorInvalidMessage
PeerErrorPingFailure
PeerErrorDiscReason
PeerErrorDiscReasonRemote
PeerErrorMessageReceive
PeerErrorMessageSizeLimit
PeerErrorMessageObsolete
PeerErrorMessageSend
PeerErrorLocalStatusNeeded
PeerErrorStatusSend
PeerErrorStatusReceive
PeerErrorStatusDecode
PeerErrorStatusIncompatible
PeerErrorStatusHandshakeTimeout
PeerErrorStatusUnexpected
PeerErrorFirstMessageSend
PeerErrorTest
)
var errorToString = map[int]string{
errInvalidMsgCode: "invalid message code",
errInvalidMsg: "invalid message",
var peerErrorCodeToString = map[PeerErrorCode]string{
PeerErrorInvalidMessageCode: "invalid message code",
PeerErrorInvalidMessage: "invalid message",
PeerErrorPingFailure: "ping failure",
PeerErrorDiscReason: "disconnect reason",
PeerErrorDiscReasonRemote: "remote disconnect reason",
PeerErrorMessageReceive: "failed to receive a message",
PeerErrorMessageSizeLimit: "too big message",
PeerErrorMessageObsolete: "obsolete message",
PeerErrorMessageSend: "failed to send a message",
PeerErrorLocalStatusNeeded: "need a local status message",
PeerErrorStatusSend: "failed to send the local status",
PeerErrorStatusReceive: "failed to receive the remote status",
PeerErrorStatusDecode: "failed to decode the remote status",
PeerErrorStatusIncompatible: "incompatible remote status",
PeerErrorStatusHandshakeTimeout: "handshake timeout",
PeerErrorStatusUnexpected: "unexpected remote status",
PeerErrorFirstMessageSend: "failed to send the first message",
PeerErrorTest: "test error",
}
type peerError struct {
code int
message string
}
func newPeerError(code int, format string, v ...interface{}) *peerError {
desc, ok := errorToString[code]
if !ok {
panic("invalid error code")
func (c PeerErrorCode) String() string {
if len(peerErrorCodeToString) <= int(c) {
return fmt.Sprintf("unknown code %d", c)
}
err := &peerError{code, desc}
if format != "" {
err.message += ": " + fmt.Sprintf(format, v...)
return peerErrorCodeToString[c]
}
func (c PeerErrorCode) Error() string {
return c.String()
}
type PeerError struct {
Code PeerErrorCode
Reason DiscReason
Err error
Message string
}
func NewPeerError(code PeerErrorCode, reason DiscReason, err error, message string) *PeerError {
return &PeerError{
code,
reason,
err,
message,
}
return err
}
func (pe *peerError) Error() string {
return pe.message
func (pe *PeerError) String() string {
return fmt.Sprintf("PeerError(code=%s, reason=%s, err=%v, message=%s)",
pe.Code, pe.Reason, pe.Err, pe.Message)
}
var errProtocolReturned = errors.New("protocol returned")
func (pe *PeerError) Error() string {
return pe.String()
}
type DiscReason uint8
@ -98,22 +144,3 @@ func (d DiscReason) String() string {
func (d DiscReason) Error() string {
return d.String()
}
func discReasonForError(err error) DiscReason {
if reason, ok := err.(DiscReason); ok {
return reason
}
if err == errProtocolReturned {
return DiscQuitting
}
peerError, ok := err.(*peerError)
if ok {
switch peerError.code {
case errInvalidMsgCode, errInvalidMsg:
return DiscProtocolError
default:
return DiscSubprotocolError
}
}
return DiscSubprotocolError
}

View File

@ -37,11 +37,11 @@ import (
var discard = Protocol{
Name: "discard",
Length: 1,
Run: func(p *Peer, rw MsgReadWriter) error {
Run: func(p *Peer, rw MsgReadWriter) *PeerError {
for {
msg, err := rw.ReadMsg()
if err != nil {
return err
return NewPeerError(PeerErrorTest, DiscProtocolError, err, "peer_test: 'discard' protocol ReadMsg error")
}
fmt.Printf("discarding %d\n", msg.Code)
msg.Discard()
@ -84,7 +84,7 @@ func newNode(id enode.ID, addr string) *enode.Node {
return enode.SignNull(&r, id)
}
func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan error) {
func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan *PeerError) {
var (
fd1, fd2 = net.Pipe()
key1, key2 = newkey(), newkey()
@ -100,9 +100,9 @@ func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan error) {
}
peer := newPeer(log.Root(), c1, protos, [64]byte{1}, true)
errc := make(chan error, 1)
errc := make(chan *PeerError, 1)
go func() {
_, err := peer.run()
err := peer.run()
errc <- err
}()
@ -114,7 +114,7 @@ func TestPeerProtoReadMsg(t *testing.T) {
proto := Protocol{
Name: "a",
Length: 5,
Run: func(peer *Peer, rw MsgReadWriter) error {
Run: func(peer *Peer, rw MsgReadWriter) *PeerError {
if err := ExpectMsg(rw, 2, []uint{1}); err != nil {
t.Error(err)
}
@ -137,7 +137,7 @@ func TestPeerProtoReadMsg(t *testing.T) {
select {
case err := <-errc:
if err != errProtocolReturned {
if (err != nil) && (err.Reason != DiscQuitting) {
t.Errorf("peer returned error: %v", err)
}
case <-time.After(2 * time.Second):
@ -149,7 +149,7 @@ func TestPeerProtoEncodeMsg(t *testing.T) {
proto := Protocol{
Name: "a",
Length: 2,
Run: func(peer *Peer, rw MsgReadWriter) error {
Run: func(peer *Peer, rw MsgReadWriter) *PeerError {
if err := SendItems(rw, 2); err == nil {
t.Error("expected error for out-of-range msg code, got nil")
}
@ -203,17 +203,20 @@ func TestPeerDisconnectRace(t *testing.T) {
maybe := func() bool { return rand.Intn(2) == 1 }
for i := 0; i < 1000; i++ {
protoclose := make(chan error)
protodisc := make(chan DiscReason)
protoclose := make(chan *PeerError)
protodisc := make(chan *PeerError)
closer, rw, p, disc := testPeer([]Protocol{
{
Name: "closereq",
Run: func(p *Peer, rw MsgReadWriter) error { return <-protoclose },
Run: func(p *Peer, rw MsgReadWriter) *PeerError { return <-protoclose },
Length: 1,
},
{
Name: "disconnect",
Run: func(p *Peer, rw MsgReadWriter) error { p.Disconnect(<-protodisc); return nil },
Name: "disconnect",
Run: func(p *Peer, rw MsgReadWriter) *PeerError {
p.Disconnect(<-protodisc)
return nil
},
Length: 1,
},
})
@ -224,12 +227,12 @@ func TestPeerDisconnectRace(t *testing.T) {
// Close the network connection.
go closer()
// Make protocol "closereq" return.
protoclose <- errors.New("protocol closed")
protoclose <- NewPeerError(PeerErrorTest, DiscRequested, nil, "peer_test.TestPeerDisconnectRace: protocol closed")
// Make protocol "disconnect" call peer.Disconnect
protodisc <- DiscAlreadyConnected
protodisc <- NewPeerError(PeerErrorTest, DiscAlreadyConnected, nil, "peer_test.TestPeerDisconnectRace: protocol called peer.Disconnect()")
// In some cases, simulate something else calling peer.Disconnect.
if maybe() {
go p.Disconnect(DiscInvalidIdentity)
go p.Disconnect(NewPeerError(PeerErrorTest, DiscInvalidIdentity, nil, "peer_test.TestPeerDisconnectRace: something else called peer.Disconnect()"))
}
// In some cases, simulate remote requesting a disconnect.
if maybe() {
@ -262,7 +265,7 @@ func TestNewPeer(t *testing.T) {
t.Errorf("Caps mismatch: got %v, expected %v", p.Caps(), caps)
}
p.Disconnect(DiscAlreadyConnected) // Should not hang
p.Disconnect(NewPeerError(PeerErrorTest, DiscAlreadyConnected, nil, "TestNewPeer Disconnect")) // Should not hang
}
func TestMatchProtocols(t *testing.T) {

View File

@ -43,7 +43,7 @@ type Protocol struct {
// The peer connection is closed when Start returns. It should return
// any protocol-level error (such as an I/O error) that is
// encountered.
Run func(peer *Peer, rw MsgReadWriter) error
Run func(peer *Peer, rw MsgReadWriter) *PeerError
// NodeInfo is an optional helper method to retrieve protocol specific metadata
// about the host node.

View File

@ -221,8 +221,7 @@ type peerOpFunc func(map[enode.ID]*Peer)
type peerDrop struct {
*Peer
err error
requested bool // true if signaled by the peer
err *PeerError
}
type connFlag int32
@ -362,7 +361,7 @@ func (srv *Server) RemovePeer(node *enode.Node) {
if peer := peers[node.ID()]; peer != nil {
ch = make(chan *PeerEvent, 1)
sub = srv.peerFeed.Subscribe(ch)
peer.Disconnect(DiscRequested)
peer.Disconnect(NewPeerError(PeerErrorDiscReason, DiscRequested, nil, "Server.RemovePeer Disconnect"))
}
})
// Wait for the peer connection to end.
@ -815,7 +814,7 @@ running:
// The handshakes are done and it passed all checks.
p := srv.launchPeer(c, c.pubkey)
peers[c.node.ID()] = p
srv.logger.Trace("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
srv.logger.Trace("Adding p2p peer", "peercount", len(peers), "url", p.Node(), "conn", c.flags, "name", p.Fullname())
srv.dialsched.peerAdded(c)
if p.Inbound() {
inboundCount++
@ -827,7 +826,7 @@ running:
// A peer disconnected.
d := common.PrettyDuration(mclock.Now() - pd.created)
delete(peers, pd.ID())
srv.logger.Trace("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err)
srv.logger.Trace("Removing p2p peer", "peercount", len(peers), "url", pd.Node(), "duration", d, "err", pd.err)
srv.dialsched.peerRemoved(pd.rw)
if pd.Inbound() {
inboundCount--
@ -846,7 +845,7 @@ running:
}
// Disconnect all peers.
for _, p := range peers {
p.Disconnect(DiscQuitting)
p.Disconnect(NewPeerError(PeerErrorDiscReason, DiscQuitting, nil, "Server.run() spindown"))
}
// Wait for peers to shut down. Pending connections and tasks are
// not handled here and will terminate soon-ish because srv.quit
@ -1090,12 +1089,12 @@ func (srv *Server) runPeer(p *Peer) {
})
// Run the per-peer main loop.
remoteRequested, err := p.run()
err := p.run()
// Announce disconnect on the main loop to update the peer set.
// The main loop waits for existing peers to be sent on srv.delpeer
// before returning, so this send should not select on srv.quit.
srv.delpeer <- peerDrop{p, err, remoteRequested}
srv.delpeer <- peerDrop{p, err}
// Broadcast peer drop to external subscribers. This needs to be
// after the send to delpeer so subscribers have a consistent view of

View File

@ -43,7 +43,7 @@ func (t *NoopService) Protocols() []p2p.Protocol {
Name: "noop",
Version: 666,
Length: 0,
Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) *p2p.PeerError {
if t.c != nil {
t.c[peer.ID()] = make(chan struct{})
close(t.c[peer.ID()])

View File

@ -19,6 +19,7 @@ package p2p
import (
"bytes"
"errors"
"github.com/ledgerwatch/erigon/rlp"
"reflect"
"sync"
"testing"
@ -129,7 +130,7 @@ func TestProtocolHandshakeErrors(t *testing.T) {
{
code: handshakeMsg,
msg: []byte{1, 2, 3},
err: newPeerError(errInvalidMsg, "(code 0) (size 4) rlp: expected input list for p2p.protoHandshake"),
err: NewPeerError(PeerErrorInvalidMessage, DiscProtocolError, rlp.WrapStreamError(rlp.ErrExpectedList, reflect.TypeOf(protoHandshake{})), "(code 0) (size 4)"),
},
{
code: handshakeMsg,

View File

@ -171,6 +171,10 @@ func wrapStreamError(err error, typ reflect.Type) error {
return err
}
func WrapStreamError(err error, typ reflect.Type) error {
return wrapStreamError(err, typ)
}
func addErrorContext(err error, ctx string) error {
if decErr, ok := err.(*decodeError); ok {
decErr.ctx = append(decErr.ctx, ctx)