diff --git a/cmd/sentry/sentry/eth_handshake.go b/cmd/sentry/sentry/eth_handshake.go index ee5cbd025..768f57cd1 100644 --- a/cmd/sentry/sentry/eth_handshake.go +++ b/cmd/sentry/sentry/eth_handshake.go @@ -15,20 +15,24 @@ func readAndValidatePeerStatusMessage( status *proto_sentry.StatusData, version uint, minVersion uint, -) (*eth.StatusPacket, error) { +) (*eth.StatusPacket, *p2p.PeerError) { msg, err := rw.ReadMsg() if err != nil { - return nil, err + return nil, p2p.NewPeerError(p2p.PeerErrorStatusReceive, p2p.DiscNetworkError, err, "readAndValidatePeerStatusMessage rw.ReadMsg error") } reply, err := tryDecodeStatusMessage(&msg) msg.Discard() if err != nil { - return nil, err + return nil, p2p.NewPeerError(p2p.PeerErrorStatusDecode, p2p.DiscProtocolError, err, "readAndValidatePeerStatusMessage tryDecodeStatusMessage error") } err = checkPeerStatusCompatibility(reply, status, version, minVersion) - return reply, err + if err != nil { + return nil, p2p.NewPeerError(p2p.PeerErrorStatusIncompatible, p2p.DiscUselessPeer, err, "readAndValidatePeerStatusMessage checkPeerStatusCompatibility error") + } + + return reply, nil } func tryDecodeStatusMessage(msg *p2p.Msg) (*eth.StatusPacket, error) { @@ -48,8 +52,6 @@ func tryDecodeStatusMessage(msg *p2p.Msg) (*eth.StatusPacket, error) { return &reply, nil } -var NetworkIdMissmatchErr = fmt.Errorf("network id does not match") - func checkPeerStatusCompatibility( reply *eth.StatusPacket, status *proto_sentry.StatusData, @@ -58,7 +60,7 @@ func checkPeerStatusCompatibility( ) error { networkID := status.NetworkId if reply.NetworkID != networkID { - return fmt.Errorf("%w: theirs %d, ours %d", NetworkIdMissmatchErr, reply.NetworkID, networkID) + return fmt.Errorf("network id does not match: theirs %d, ours %d", reply.NetworkID, networkID) } if uint(reply.ProtocolVersion) > version { diff --git a/cmd/sentry/sentry/sentry_grpc_server.go b/cmd/sentry/sentry/sentry_grpc_server.go index e0cea7398..6146f91e7 100644 --- a/cmd/sentry/sentry/sentry_grpc_server.go +++ b/cmd/sentry/sentry/sentry_grpc_server.go @@ -62,10 +62,14 @@ type PeerInfo struct { rw p2p.MsgReadWriter protocol uint - removed chan struct{} // close this channel on remove - ctx context.Context - ctxCancel context.CancelFunc - removeOnce sync.Once + ctx context.Context + ctxCancel context.CancelFunc + + // this channel is closed on Remove() + removed chan struct{} + removeReason *p2p.PeerError + removeOnce sync.Once + // each peer has own worker (goroutine) - all funcs from this queue will execute on this worker // if this queue is full (means peer is slow) - old messages will be dropped // channel closed on peer remove @@ -116,7 +120,14 @@ func (bp *PeersByMinBlock) Pop() interface{} { func NewPeerInfo(peer *p2p.Peer, rw p2p.MsgReadWriter) *PeerInfo { ctx, cancel := context.WithCancel(context.Background()) - p := &PeerInfo{peer: peer, rw: rw, removed: make(chan struct{}), tasks: make(chan func(), 16), ctx: ctx, ctxCancel: cancel} + p := &PeerInfo{ + peer: peer, + rw: rw, + removed: make(chan struct{}), + tasks: make(chan func(), 16), + ctx: ctx, + ctxCancel: cancel, + } p.lock.RLock() t := p.tasks @@ -193,10 +204,9 @@ func (pi *PeerInfo) LatestDeadline() time.Time { return pi.latestDealine } -func (pi *PeerInfo) Remove() { - pi.lock.Lock() - defer pi.lock.Unlock() +func (pi *PeerInfo) Remove(reason *p2p.PeerError) { pi.removeOnce.Do(func() { + pi.removeReason = reason close(pi.removed) pi.ctxCancel() }) @@ -230,12 +240,12 @@ func (pi *PeerInfo) Async(f func(), logger log.Logger) { } } -func (pi *PeerInfo) Removed() bool { +func (pi *PeerInfo) RemoveReason() *p2p.PeerError { select { case <-pi.removed: - return true + return pi.removeReason default: - return false + return nil } } @@ -273,9 +283,9 @@ func handShake( rw p2p.MsgReadWriter, version uint, minVersion uint, -) (*libcommon.Hash, error) { +) (*libcommon.Hash, *p2p.PeerError) { // Send out own handshake in a new thread - errChan := make(chan error, 2) + errChan := make(chan *p2p.PeerError, 2) resultChan := make(chan *eth.StatusPacket, 1) ourTD := gointerfaces.ConvertH256ToUint256Int(status.TotalDifficulty) @@ -294,11 +304,11 @@ func handShake( } err := p2p.Send(rw, eth.StatusMsg, status) - if err != nil { - err = fmt.Errorf("sentry.handShake failed to send eth Status: %w", err) + if err == nil { + errChan <- nil + } else { + errChan <- p2p.NewPeerError(p2p.PeerErrorStatusSend, p2p.DiscNetworkError, err, "sentry.handShake failed to send eth Status") } - - errChan <- err }() go func() { @@ -307,11 +317,10 @@ func handShake( if err == nil { resultChan <- status + errChan <- nil } else { - err = fmt.Errorf("sentry.handShake failed to receive/validate a remote peer eth Status: %w", err) + errChan <- err } - - errChan <- err }() timeout := time.NewTimer(handshakeTimeout) @@ -323,9 +332,9 @@ func handShake( return nil, err } case <-timeout.C: - return nil, p2p.DiscReadTimeout + return nil, p2p.NewPeerError(p2p.PeerErrorStatusHandshakeTimeout, p2p.DiscReadTimeout, nil, "sentry.handShake timeout") case <-ctx.Done(): - return nil, ctx.Err() + return nil, p2p.NewPeerError(p2p.PeerErrorDiscReason, p2p.DiscQuitting, ctx.Err(), "sentry.handShake ctx.Done") } } @@ -342,7 +351,7 @@ func runPeer( send func(msgId proto_sentry.MessageId, peerID [64]byte, b []byte), hasSubscribers func(msgId proto_sentry.MessageId) bool, logger log.Logger, -) error { +) *p2p.PeerError { printTime := time.Now().Add(time.Minute) peerPrinted := false defer func() { @@ -355,6 +364,7 @@ func runPeer( logger.Trace("Peer disconnected", "id", peerID, "name", peerInfo.peer.Fullname()) } }() + for { if !peerPrinted { if time.Now().After(printTime) { @@ -363,25 +373,27 @@ func runPeer( } } if err := libcommon.Stopped(ctx.Done()); err != nil { + return p2p.NewPeerError(p2p.PeerErrorDiscReason, p2p.DiscQuitting, ctx.Err(), "sentry.runPeer: context stopped") + } + if err := peerInfo.RemoveReason(); err != nil { return err } - if peerInfo.Removed() { - return fmt.Errorf("peer removed") - } + msg, err := rw.ReadMsg() if err != nil { - return fmt.Errorf("reading message: %w", err) + return p2p.NewPeerError(p2p.PeerErrorMessageReceive, p2p.DiscNetworkError, err, "sentry.runPeer: ReadMsg error") } if msg.Size > eth.ProtocolMaxMsgSize { msg.Discard() - return fmt.Errorf("message is too large %d, limit %d", msg.Size, eth.ProtocolMaxMsgSize) + return p2p.NewPeerError(p2p.PeerErrorMessageSizeLimit, p2p.DiscSubprotocolError, nil, fmt.Sprintf("sentry.runPeer: message is too large %d, limit %d", msg.Size, eth.ProtocolMaxMsgSize)) } + givePermit := false switch msg.Code { case eth.StatusMsg: msg.Discard() // Status messages should never arrive after the handshake - return fmt.Errorf("uncontrolled status message") + return p2p.NewPeerError(p2p.PeerErrorStatusUnexpected, p2p.DiscSubprotocolError, nil, "sentry.runPeer: unexpected status message") case eth.GetBlockHeadersMsg: if !hasSubscribers(eth.ToProto[protocol][msg.Code]) { continue @@ -424,7 +436,7 @@ func runPeer( case eth.GetNodeDataMsg: if protocol >= direct.ETH67 { msg.Discard() - return fmt.Errorf("unexpected GetNodeDataMsg from %s in eth/%d", peerID, protocol) + return p2p.NewPeerError(p2p.PeerErrorMessageObsolete, p2p.DiscSubprotocolError, nil, fmt.Sprintf("unexpected GetNodeDataMsg from %s in eth/%d", peerID, protocol)) } if !hasSubscribers(eth.ToProto[protocol][msg.Code]) { continue @@ -581,12 +593,11 @@ func NewGrpcServer(ctx context.Context, dialCandidates func() enode.Iterator, re Version: protocol, Length: 17, DialCandidates: disc, - Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error { + Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) *p2p.PeerError { peerID := peer.Pubkey() printablePeerID := hex.EncodeToString(peerID[:])[:20] if ss.getPeer(peerID) != nil { - logger.Trace("[p2p] peer already has connection", "peerId", printablePeerID) - return nil + return p2p.NewPeerError(p2p.PeerErrorDiscReason, p2p.DiscAlreadyConnected, nil, "peer already has connection") } logger.Trace("[p2p] start with peer", "peerId", printablePeerID) @@ -599,20 +610,12 @@ func NewGrpcServer(ctx context.Context, dialCandidates func() enode.Iterator, re status := ss.GetStatus() if status == nil { - err := fmt.Errorf("could not get status message from core for peer %s connection", printablePeerID) - logger.Trace("[p2p] Handshake failure", "peer", printablePeerID, "local", peer.LocalAddr(), "remote", peer.RemoteAddr(), "err", err) - return err + return p2p.NewPeerError(p2p.PeerErrorLocalStatusNeeded, p2p.DiscProtocolError, nil, "could not get status message from core") } peerBestHash, err := handShake(ctx, status, rw, protocol, protocol) - if err != nil { - if errors.Is(err, NetworkIdMissmatchErr) || errors.Is(err, io.EOF) || errors.Is(err, p2p.ErrShuttingDown) { - logger.Trace("[p2p] Handshake failure", "peer", printablePeerID, "err", err) - } else { - logger.Debug("[p2p] Handshake failure", "peer", printablePeerID, "err", err) - } - return fmt.Errorf("[p2p] handshake to peer %s: %w", printablePeerID, err) + return err } // handshake is successful @@ -620,10 +623,9 @@ func NewGrpcServer(ctx context.Context, dialCandidates func() enode.Iterator, re ss.GoodPeers.Store(peerID, peerInfo) ss.sendNewPeerToClients(gointerfaces.ConvertHashToH512(peerID)) - err = ss.startSync(ctx, *peerBestHash, peerID) + getBlockHeadersErr := ss.getBlockHeaders(ctx, *peerBestHash, peerID) if err != nil { - logger.Error("[p2p] p2p.Protocol.Run startSync failure", "peer", printablePeerID, "err", err) - return fmt.Errorf("[p2p] startSync for peer %s: %w", printablePeerID, err) + return p2p.NewPeerError(p2p.PeerErrorFirstMessageSend, p2p.DiscNetworkError, getBlockHeadersErr, "p2p.Protocol.Run getBlockHeaders failure") } err = runPeer( @@ -635,10 +637,9 @@ func NewGrpcServer(ctx context.Context, dialCandidates func() enode.Iterator, re ss.send, ss.hasSubscribers, logger, - ) // runPeer never returns a nil error - logger.Trace("[p2p] error while running peer", "peerId", printablePeerID, "err", err) + ) ss.sendGonePeerToClients(gointerfaces.ConvertHashToH512(peerID)) - return nil + return err }, NodeInfo: func() interface{} { return readNodeInfo() @@ -718,11 +719,11 @@ func (ss *GrpcServer) getPeer(peerID [64]byte) (peerInfo *PeerInfo) { return nil } -func (ss *GrpcServer) removePeer(peerID [64]byte) { +func (ss *GrpcServer) removePeer(peerID [64]byte, reason *p2p.PeerError) { if value, ok := ss.GoodPeers.LoadAndDelete(peerID); ok { peerInfo := value.(*PeerInfo) if peerInfo != nil { - peerInfo.Remove() + peerInfo.Remove(reason) } } } @@ -731,11 +732,8 @@ func (ss *GrpcServer) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode ui peerInfo.Async(func() { err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)}) if err != nil { - peerInfo.Remove() + peerInfo.Remove(p2p.NewPeerError(p2p.PeerErrorMessageSend, p2p.DiscNetworkError, err, fmt.Sprintf("%s writePeer msgcode=%d", logPrefix, msgcode))) ss.GoodPeers.Delete(peerInfo.ID()) - if !errors.Is(err, p2p.ErrShuttingDown) { - ss.logger.Debug(logPrefix, "msgcode", msgcode, "err", err) - } } else { if ttl > 0 { peerInfo.AddDeadline(time.Now().Add(ttl)) @@ -744,7 +742,7 @@ func (ss *GrpcServer) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode ui }, ss.logger) } -func (ss *GrpcServer) startSync(ctx context.Context, bestHash libcommon.Hash, peerID [64]byte) error { +func (ss *GrpcServer) getBlockHeaders(ctx context.Context, bestHash libcommon.Hash, peerID [64]byte) error { b, err := rlp.EncodeToBytes(ð.GetBlockHeadersPacket66{ RequestId: rand.Uint64(), // nolint: gosec GetBlockHeadersPacket: ð.GetBlockHeadersPacket{ @@ -755,7 +753,7 @@ func (ss *GrpcServer) startSync(ctx context.Context, bestHash libcommon.Hash, pe }, }) if err != nil { - return fmt.Errorf("startSync encode packet failed: %w", err) + return fmt.Errorf("GrpcServer.getBlockHeaders encode packet failed: %w", err) } if _, err := ss.SendMessageById(ctx, &proto_sentry.SendMessageByIdRequest{ PeerId: gointerfaces.ConvertHashToH512(peerID), @@ -774,9 +772,7 @@ func (ss *GrpcServer) PenalizePeer(_ context.Context, req *proto_sentry.Penalize peerID := ConvertH512ToPeerID(req.PeerId) peerInfo := ss.getPeer(peerID) if ss.statusData != nil && peerInfo != nil && !peerInfo.peer.Info().Network.Static && !peerInfo.peer.Info().Network.Trusted { - ss.removePeer(peerID) - printablePeerID := hex.EncodeToString(peerID[:])[:8] - ss.logger.Debug("[p2p] Penalized peer", "peerId", printablePeerID, "name", peerInfo.peer.Name()) + ss.removePeer(peerID, p2p.NewPeerError(p2p.PeerErrorDiscReason, p2p.DiscRequested, nil, "penalized peer")) } return &emptypb.Empty{}, nil } diff --git a/cmd/sentry/sentry/sentry_grpc_server_test.go b/cmd/sentry/sentry/sentry_grpc_server_test.go index 4c8950a51..93b0c4b8e 100644 --- a/cmd/sentry/sentry/sentry_grpc_server_test.go +++ b/cmd/sentry/sentry/sentry_grpc_server_test.go @@ -59,7 +59,7 @@ func startHandshake( status *proto_sentry.StatusData, pipe *p2p.MsgPipeRW, protocolVersion uint, - errChan chan error, + errChan chan *p2p.PeerError, ) { go func() { _, err := handShake(ctx, status, pipe, protocolVersion, protocolVersion) @@ -110,7 +110,7 @@ func testForkIDSplit(t *testing.T, protocol uint) { defer p2pNoFork.Close() defer p2pProFork.Close() - errc := make(chan error, 2) + errc := make(chan *p2p.PeerError, 2) startHandshake(ctx, s1.GetStatus(), p2pNoFork, protocol, errc) startHandshake(ctx, s2.GetStatus(), p2pProFork, protocol, errc) diff --git a/p2p/message.go b/p2p/message.go index ed1067c9a..90c291645 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -56,7 +56,7 @@ type Msg struct { func (msg Msg) Decode(val interface{}) error { s := rlp.NewStream(msg.Payload, uint64(msg.Size)) if err := s.Decode(val); err != nil { - return newPeerError(errInvalidMsg, "(code %x) (size %d) %v", msg.Code, msg.Size, err) + return NewPeerError(PeerErrorInvalidMessage, DiscProtocolError, err, fmt.Sprintf("(code %x) (size %d)", msg.Code, msg.Size)) } return nil } diff --git a/p2p/peer.go b/p2p/peer.go index 0c68e4882..d783dafbf 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -112,9 +112,9 @@ type Peer struct { created mclock.AbsTime wg sync.WaitGroup - protoErr chan error + protoErr chan *PeerError closed chan struct{} - disc chan DiscReason + disc chan *PeerError // events receives message send / receive events if set events *event.Feed @@ -192,9 +192,9 @@ func (p *Peer) LocalAddr() net.Addr { // Disconnect terminates the peer connection with the given reason. // It returns immediately and does not wait until the connection is closed. -func (p *Peer) Disconnect(reason DiscReason) { +func (p *Peer) Disconnect(err *PeerError) { select { - case p.disc <- reason: + case p.disc <- err: case <-p.closed: } } @@ -216,8 +216,8 @@ func newPeer(logger log.Logger, conn *conn, protocols []Protocol, pubkey [64]byt rw: conn, running: protomap, created: mclock.Now(), - disc: make(chan DiscReason), - protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop + disc: make(chan *PeerError), + protoErr: make(chan *PeerError, len(protomap)+1), // protocols + pingLoop closed: make(chan struct{}), log: logger.New("id", conn.node.ID(), "conn", conn.flags), pubkey: pubkey, @@ -230,13 +230,13 @@ func (p *Peer) Log() log.Logger { return p.log } -func (p *Peer) run() (remoteRequested bool, err error) { +func (p *Peer) run() (peerErr *PeerError) { var ( writeStart = make(chan struct{}, 1) writeErr = make(chan error, 1) readErr = make(chan error, 1) - reason DiscReason // sent to the peer ) + p.wg.Add(2) go p.readLoop(readErr) go p.pingLoop() @@ -245,39 +245,33 @@ func (p *Peer) run() (remoteRequested bool, err error) { writeStart <- struct{}{} p.startProtocols(writeStart, writeErr) + defer func() { + close(p.closed) + p.rw.close(peerErr.Reason) + p.wg.Wait() + }() + // Wait for an error or disconnect. -loop: for { select { - case err = <-writeErr: - // A write finished. Allow the next write to start if - // there was no error. + case err := <-writeErr: if err != nil { - reason = DiscNetworkError - break loop + return NewPeerError(PeerErrorDiscReason, DiscNetworkError, err, "Peer.run writeErr") } + // Allow the next write to start if there was no error. writeStart <- struct{}{} - case err = <-readErr: - if r, ok := err.(DiscReason); ok { - remoteRequested = true - reason = r + case err := <-readErr: + if reason, ok := err.(DiscReason); ok { + return NewPeerError(PeerErrorDiscReasonRemote, reason, nil, "Peer.run got a remote DiscReason") } else { - reason = DiscNetworkError + return NewPeerError(PeerErrorDiscReason, DiscNetworkError, err, "Peer.run readErr") } - break loop - case err = <-p.protoErr: - reason = discReasonForError(err) - break loop - case err = <-p.disc: - reason = discReasonForError(err) - break loop + case err := <-p.protoErr: + return err + case err := <-p.disc: + return err } } - - close(p.closed) - p.rw.close(reason) - p.wg.Wait() - return remoteRequested, err } func (p *Peer) pingLoop() { @@ -289,7 +283,7 @@ func (p *Peer) pingLoop() { select { case <-ping.C: if err := SendItems(p.rw, pingMsg); err != nil { - p.protoErr <- err + p.protoErr <- NewPeerError(PeerErrorPingFailure, DiscNetworkError, err, "Failed to send pingMsg") return } ping.Reset(pingInterval) @@ -407,11 +401,9 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) defer debug.LogPanic() defer p.wg.Done() err := proto.Run(p, rw) + // only unit test protocols can return nil if err == nil { - p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version)) - err = errProtocolReturned - } else if err != io.EOF { - p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err) + err = NewPeerError(PeerErrorTest, DiscQuitting, nil, fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version)) } p.protoErr <- err }() @@ -426,7 +418,7 @@ func (p *Peer) getProto(code uint64) (*protoRW, error) { return proto, nil } } - return nil, newPeerError(errInvalidMsgCode, "%d", code) + return nil, NewPeerError(PeerErrorInvalidMessageCode, DiscProtocolError, nil, fmt.Sprintf("code=%d", code)) } type protoRW struct { @@ -441,7 +433,7 @@ type protoRW struct { func (rw *protoRW) WriteMsg(msg Msg) (err error) { if msg.Code >= rw.Length { - return newPeerError(errInvalidMsgCode, "not handled") + return NewPeerError(PeerErrorInvalidMessageCode, DiscProtocolError, nil, fmt.Sprintf("not handled code=%d", msg.Code)) } msg.meterCap = rw.cap() msg.meterCode = msg.Code diff --git a/p2p/peer_error.go b/p2p/peer_error.go index aad1a65c7..b5a26a2a3 100644 --- a/p2p/peer_error.go +++ b/p2p/peer_error.go @@ -17,42 +17,88 @@ package p2p import ( - "errors" "fmt" ) +type PeerErrorCode uint8 + const ( - errInvalidMsgCode = iota - errInvalidMsg + PeerErrorInvalidMessageCode PeerErrorCode = iota + PeerErrorInvalidMessage + PeerErrorPingFailure + PeerErrorDiscReason + PeerErrorDiscReasonRemote + PeerErrorMessageReceive + PeerErrorMessageSizeLimit + PeerErrorMessageObsolete + PeerErrorMessageSend + PeerErrorLocalStatusNeeded + PeerErrorStatusSend + PeerErrorStatusReceive + PeerErrorStatusDecode + PeerErrorStatusIncompatible + PeerErrorStatusHandshakeTimeout + PeerErrorStatusUnexpected + PeerErrorFirstMessageSend + PeerErrorTest ) -var errorToString = map[int]string{ - errInvalidMsgCode: "invalid message code", - errInvalidMsg: "invalid message", +var peerErrorCodeToString = map[PeerErrorCode]string{ + PeerErrorInvalidMessageCode: "invalid message code", + PeerErrorInvalidMessage: "invalid message", + PeerErrorPingFailure: "ping failure", + PeerErrorDiscReason: "disconnect reason", + PeerErrorDiscReasonRemote: "remote disconnect reason", + PeerErrorMessageReceive: "failed to receive a message", + PeerErrorMessageSizeLimit: "too big message", + PeerErrorMessageObsolete: "obsolete message", + PeerErrorMessageSend: "failed to send a message", + PeerErrorLocalStatusNeeded: "need a local status message", + PeerErrorStatusSend: "failed to send the local status", + PeerErrorStatusReceive: "failed to receive the remote status", + PeerErrorStatusDecode: "failed to decode the remote status", + PeerErrorStatusIncompatible: "incompatible remote status", + PeerErrorStatusHandshakeTimeout: "handshake timeout", + PeerErrorStatusUnexpected: "unexpected remote status", + PeerErrorFirstMessageSend: "failed to send the first message", + PeerErrorTest: "test error", } -type peerError struct { - code int - message string -} - -func newPeerError(code int, format string, v ...interface{}) *peerError { - desc, ok := errorToString[code] - if !ok { - panic("invalid error code") +func (c PeerErrorCode) String() string { + if len(peerErrorCodeToString) <= int(c) { + return fmt.Sprintf("unknown code %d", c) } - err := &peerError{code, desc} - if format != "" { - err.message += ": " + fmt.Sprintf(format, v...) + return peerErrorCodeToString[c] +} + +func (c PeerErrorCode) Error() string { + return c.String() +} + +type PeerError struct { + Code PeerErrorCode + Reason DiscReason + Err error + Message string +} + +func NewPeerError(code PeerErrorCode, reason DiscReason, err error, message string) *PeerError { + return &PeerError{ + code, + reason, + err, + message, } - return err } -func (pe *peerError) Error() string { - return pe.message +func (pe *PeerError) String() string { + return fmt.Sprintf("PeerError(code=%s, reason=%s, err=%v, message=%s)", + pe.Code, pe.Reason, pe.Err, pe.Message) } -var errProtocolReturned = errors.New("protocol returned") +func (pe *PeerError) Error() string { + return pe.String() +} type DiscReason uint8 @@ -98,22 +144,3 @@ func (d DiscReason) String() string { func (d DiscReason) Error() string { return d.String() } - -func discReasonForError(err error) DiscReason { - if reason, ok := err.(DiscReason); ok { - return reason - } - if err == errProtocolReturned { - return DiscQuitting - } - peerError, ok := err.(*peerError) - if ok { - switch peerError.code { - case errInvalidMsgCode, errInvalidMsg: - return DiscProtocolError - default: - return DiscSubprotocolError - } - } - return DiscSubprotocolError -} diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 10131a3b9..45b0e89f6 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -37,11 +37,11 @@ import ( var discard = Protocol{ Name: "discard", Length: 1, - Run: func(p *Peer, rw MsgReadWriter) error { + Run: func(p *Peer, rw MsgReadWriter) *PeerError { for { msg, err := rw.ReadMsg() if err != nil { - return err + return NewPeerError(PeerErrorTest, DiscProtocolError, err, "peer_test: 'discard' protocol ReadMsg error") } fmt.Printf("discarding %d\n", msg.Code) msg.Discard() @@ -84,7 +84,7 @@ func newNode(id enode.ID, addr string) *enode.Node { return enode.SignNull(&r, id) } -func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan error) { +func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan *PeerError) { var ( fd1, fd2 = net.Pipe() key1, key2 = newkey(), newkey() @@ -100,9 +100,9 @@ func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan error) { } peer := newPeer(log.Root(), c1, protos, [64]byte{1}, true) - errc := make(chan error, 1) + errc := make(chan *PeerError, 1) go func() { - _, err := peer.run() + err := peer.run() errc <- err }() @@ -114,7 +114,7 @@ func TestPeerProtoReadMsg(t *testing.T) { proto := Protocol{ Name: "a", Length: 5, - Run: func(peer *Peer, rw MsgReadWriter) error { + Run: func(peer *Peer, rw MsgReadWriter) *PeerError { if err := ExpectMsg(rw, 2, []uint{1}); err != nil { t.Error(err) } @@ -137,7 +137,7 @@ func TestPeerProtoReadMsg(t *testing.T) { select { case err := <-errc: - if err != errProtocolReturned { + if (err != nil) && (err.Reason != DiscQuitting) { t.Errorf("peer returned error: %v", err) } case <-time.After(2 * time.Second): @@ -149,7 +149,7 @@ func TestPeerProtoEncodeMsg(t *testing.T) { proto := Protocol{ Name: "a", Length: 2, - Run: func(peer *Peer, rw MsgReadWriter) error { + Run: func(peer *Peer, rw MsgReadWriter) *PeerError { if err := SendItems(rw, 2); err == nil { t.Error("expected error for out-of-range msg code, got nil") } @@ -203,17 +203,20 @@ func TestPeerDisconnectRace(t *testing.T) { maybe := func() bool { return rand.Intn(2) == 1 } for i := 0; i < 1000; i++ { - protoclose := make(chan error) - protodisc := make(chan DiscReason) + protoclose := make(chan *PeerError) + protodisc := make(chan *PeerError) closer, rw, p, disc := testPeer([]Protocol{ { Name: "closereq", - Run: func(p *Peer, rw MsgReadWriter) error { return <-protoclose }, + Run: func(p *Peer, rw MsgReadWriter) *PeerError { return <-protoclose }, Length: 1, }, { - Name: "disconnect", - Run: func(p *Peer, rw MsgReadWriter) error { p.Disconnect(<-protodisc); return nil }, + Name: "disconnect", + Run: func(p *Peer, rw MsgReadWriter) *PeerError { + p.Disconnect(<-protodisc) + return nil + }, Length: 1, }, }) @@ -224,12 +227,12 @@ func TestPeerDisconnectRace(t *testing.T) { // Close the network connection. go closer() // Make protocol "closereq" return. - protoclose <- errors.New("protocol closed") + protoclose <- NewPeerError(PeerErrorTest, DiscRequested, nil, "peer_test.TestPeerDisconnectRace: protocol closed") // Make protocol "disconnect" call peer.Disconnect - protodisc <- DiscAlreadyConnected + protodisc <- NewPeerError(PeerErrorTest, DiscAlreadyConnected, nil, "peer_test.TestPeerDisconnectRace: protocol called peer.Disconnect()") // In some cases, simulate something else calling peer.Disconnect. if maybe() { - go p.Disconnect(DiscInvalidIdentity) + go p.Disconnect(NewPeerError(PeerErrorTest, DiscInvalidIdentity, nil, "peer_test.TestPeerDisconnectRace: something else called peer.Disconnect()")) } // In some cases, simulate remote requesting a disconnect. if maybe() { @@ -262,7 +265,7 @@ func TestNewPeer(t *testing.T) { t.Errorf("Caps mismatch: got %v, expected %v", p.Caps(), caps) } - p.Disconnect(DiscAlreadyConnected) // Should not hang + p.Disconnect(NewPeerError(PeerErrorTest, DiscAlreadyConnected, nil, "TestNewPeer Disconnect")) // Should not hang } func TestMatchProtocols(t *testing.T) { diff --git a/p2p/protocol.go b/p2p/protocol.go index 819526228..bcd40c5dc 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -43,7 +43,7 @@ type Protocol struct { // The peer connection is closed when Start returns. It should return // any protocol-level error (such as an I/O error) that is // encountered. - Run func(peer *Peer, rw MsgReadWriter) error + Run func(peer *Peer, rw MsgReadWriter) *PeerError // NodeInfo is an optional helper method to retrieve protocol specific metadata // about the host node. diff --git a/p2p/server.go b/p2p/server.go index e03a6049b..e859f3776 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -221,8 +221,7 @@ type peerOpFunc func(map[enode.ID]*Peer) type peerDrop struct { *Peer - err error - requested bool // true if signaled by the peer + err *PeerError } type connFlag int32 @@ -362,7 +361,7 @@ func (srv *Server) RemovePeer(node *enode.Node) { if peer := peers[node.ID()]; peer != nil { ch = make(chan *PeerEvent, 1) sub = srv.peerFeed.Subscribe(ch) - peer.Disconnect(DiscRequested) + peer.Disconnect(NewPeerError(PeerErrorDiscReason, DiscRequested, nil, "Server.RemovePeer Disconnect")) } }) // Wait for the peer connection to end. @@ -815,7 +814,7 @@ running: // The handshakes are done and it passed all checks. p := srv.launchPeer(c, c.pubkey) peers[c.node.ID()] = p - srv.logger.Trace("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name()) + srv.logger.Trace("Adding p2p peer", "peercount", len(peers), "url", p.Node(), "conn", c.flags, "name", p.Fullname()) srv.dialsched.peerAdded(c) if p.Inbound() { inboundCount++ @@ -827,7 +826,7 @@ running: // A peer disconnected. d := common.PrettyDuration(mclock.Now() - pd.created) delete(peers, pd.ID()) - srv.logger.Trace("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err) + srv.logger.Trace("Removing p2p peer", "peercount", len(peers), "url", pd.Node(), "duration", d, "err", pd.err) srv.dialsched.peerRemoved(pd.rw) if pd.Inbound() { inboundCount-- @@ -846,7 +845,7 @@ running: } // Disconnect all peers. for _, p := range peers { - p.Disconnect(DiscQuitting) + p.Disconnect(NewPeerError(PeerErrorDiscReason, DiscQuitting, nil, "Server.run() spindown")) } // Wait for peers to shut down. Pending connections and tasks are // not handled here and will terminate soon-ish because srv.quit @@ -1090,12 +1089,12 @@ func (srv *Server) runPeer(p *Peer) { }) // Run the per-peer main loop. - remoteRequested, err := p.run() + err := p.run() // Announce disconnect on the main loop to update the peer set. // The main loop waits for existing peers to be sent on srv.delpeer // before returning, so this send should not select on srv.quit. - srv.delpeer <- peerDrop{p, err, remoteRequested} + srv.delpeer <- peerDrop{p, err} // Broadcast peer drop to external subscribers. This needs to be // after the send to delpeer so subscribers have a consistent view of diff --git a/p2p/simulations/test.go b/p2p/simulations/test.go index 0b5d44c8a..00b88ce2b 100644 --- a/p2p/simulations/test.go +++ b/p2p/simulations/test.go @@ -43,7 +43,7 @@ func (t *NoopService) Protocols() []p2p.Protocol { Name: "noop", Version: 666, Length: 0, - Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error { + Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) *p2p.PeerError { if t.c != nil { t.c[peer.ID()] = make(chan struct{}) close(t.c[peer.ID()]) diff --git a/p2p/transport_test.go b/p2p/transport_test.go index 43a8f3274..9e38e1eba 100644 --- a/p2p/transport_test.go +++ b/p2p/transport_test.go @@ -19,6 +19,7 @@ package p2p import ( "bytes" "errors" + "github.com/ledgerwatch/erigon/rlp" "reflect" "sync" "testing" @@ -129,7 +130,7 @@ func TestProtocolHandshakeErrors(t *testing.T) { { code: handshakeMsg, msg: []byte{1, 2, 3}, - err: newPeerError(errInvalidMsg, "(code 0) (size 4) rlp: expected input list for p2p.protoHandshake"), + err: NewPeerError(PeerErrorInvalidMessage, DiscProtocolError, rlp.WrapStreamError(rlp.ErrExpectedList, reflect.TypeOf(protoHandshake{})), "(code 0) (size 4)"), }, { code: handshakeMsg, diff --git a/rlp/decode.go b/rlp/decode.go index 1eebcd9ac..4824946e5 100644 --- a/rlp/decode.go +++ b/rlp/decode.go @@ -171,6 +171,10 @@ func wrapStreamError(err error, typ reflect.Type) error { return err } +func WrapStreamError(err error, typ reflect.Type) error { + return wrapStreamError(err, typ) +} + func addErrorContext(err error, ctx string) error { if decErr, ok := err.(*decodeError); ok { decErr.ctx = append(decErr.ctx, ctx)