diff --git a/p2p/peer.go b/p2p/peer.go index a20adc7dc..88ea4bb50 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -114,6 +114,7 @@ type Peer struct { wg sync.WaitGroup protoErr chan *PeerError closed chan struct{} + pingRecv chan struct{} disc chan *PeerError // events receives message send / receive events if set @@ -219,6 +220,7 @@ func newPeer(logger log.Logger, conn *conn, protocols []Protocol, pubkey [64]byt disc: make(chan *PeerError), protoErr: make(chan *PeerError, len(protomap)+1), // protocols + pingLoop closed: make(chan struct{}), + pingRecv: make(chan struct{}, 16), log: logger.New("id", conn.node.ID(), "conn", conn.flags), pubkey: pubkey, metricsEnabled: metricsEnabled, @@ -287,6 +289,11 @@ func (p *Peer) pingLoop() { return } ping.Reset(pingInterval) + case <-p.pingRecv: + if err := SendItems(p.rw, pongMsg); err != nil { + p.protoErr <- NewPeerError(PeerErrorPongFailure, DiscNetworkError, err, "Failed to send pongMsg") + return + } case <-p.closed: return } @@ -314,7 +321,10 @@ func (p *Peer) handle(msg Msg) error { switch { case msg.Code == pingMsg: msg.Discard() - go SendItems(p.rw, pongMsg) + select { + case p.pingRecv <- struct{}{}: + case <-p.closed: + } case msg.Code == discMsg: // This is the last message. // We don't need to discard because the connection will be closed after it. diff --git a/p2p/peer_error.go b/p2p/peer_error.go index b5a26a2a3..f87737053 100644 --- a/p2p/peer_error.go +++ b/p2p/peer_error.go @@ -26,6 +26,7 @@ const ( PeerErrorInvalidMessageCode PeerErrorCode = iota PeerErrorInvalidMessage PeerErrorPingFailure + PeerErrorPongFailure PeerErrorDiscReason PeerErrorDiscReasonRemote PeerErrorMessageReceive @@ -47,6 +48,7 @@ var peerErrorCodeToString = map[PeerErrorCode]string{ PeerErrorInvalidMessageCode: "invalid message code", PeerErrorInvalidMessage: "invalid message", PeerErrorPingFailure: "ping failure", + PeerErrorPongFailure: "pong failure", PeerErrorDiscReason: "disconnect reason", PeerErrorDiscReasonRemote: "remote disconnect reason", PeerErrorMessageReceive: "failed to receive a message",