Reset Streams Properly (#6066)

* reset streams

* fix build
This commit is contained in:
Nishant Das 2020-06-01 16:14:47 +08:00 committed by GitHub
parent 6598c38625
commit ceaceeb33c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 27 additions and 12 deletions

View File

@ -15,8 +15,6 @@ import (
)
const (
// The time to wait before disconnecting a peer.
flushDuration = 50 * time.Millisecond
// The time to wait for a status request.
timeForStatus = 10 * time.Second
)
@ -85,9 +83,6 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
if err := goodbyeFunc(context.Background(), remotePeer); err != nil {
log.WithError(err).Trace("Unable to send goodbye message to peer")
}
// Add a short delay to allow the stream to flush before closing the connection.
// There is still a chance that the peer won't receive the message.
time.Sleep(flushDuration)
disconnectFromPeer()
return
}

View File

@ -458,8 +458,8 @@ func (f *blocksFetcher) requestBlocks(
return nil, err
}
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Error("Failed to close stream")
if err := stream.Reset(); err != nil {
log.WithError(err).Errorf("Failed to close stream with protocol %s", stream.Protocol())
}
}()

View File

@ -22,6 +22,11 @@ func (r *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots
if err != nil {
return err
}
defer func() {
if err := stream.Reset(); err != nil {
log.WithError(err).Errorf("Failed to reset stream with protocol %s", stream.Protocol())
}
}()
for i := 0; i < len(blockRoots); i++ {
blk, err := ReadChunkedBlock(stream, r.p2p)
if err == io.EOF {

View File

@ -51,9 +51,6 @@ func (r *Service) sendGoodByeAndDisconnect(ctx context.Context, code uint64, id
"peer": id,
}).Debug("Could not send goodbye message to peer")
}
// Add a short delay to allow the stream to flush before closing the connection.
// There is still a chance that the peer won't receive the message.
time.Sleep(50 * time.Millisecond)
if err := r.p2p.Disconnect(id); err != nil {
return err
}
@ -68,8 +65,16 @@ func (r *Service) sendGoodByeMessage(ctx context.Context, code uint64, id peer.I
if err != nil {
return err
}
defer func() {
if err := stream.Reset(); err != nil {
log.WithError(err).Errorf("Failed to reset stream with protocol %s", stream.Protocol())
}
}()
log := log.WithField("Reason", goodbyeMessage(code))
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Sending Goodbye message to peer")
// Add a short delay to allow the stream to flush before resetting it.
// There is still a chance that the peer won't receive the message.
time.Sleep(50 * time.Millisecond)
return nil
}

View File

@ -41,8 +41,8 @@ func (r *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.Meta
// metadata requests send no payload, so closing the
// stream early leads it to a reset.
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Error("Failed to close stream")
if err := stream.Reset(); err != nil {
log.WithError(err).Errorf("Failed to reset stream for protocol %s", stream.Protocol())
}
}()
code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding())

View File

@ -58,6 +58,11 @@ func (r *Service) sendPingRequest(ctx context.Context, id peer.ID) error {
if err != nil {
return err
}
defer func() {
if err := stream.Reset(); err != nil {
log.WithError(err).Errorf("Failed to reset stream with protocol %s", stream.Protocol())
}
}()
code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding())
if err != nil {

View File

@ -95,6 +95,11 @@ func (r *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
if err != nil {
return err
}
defer func() {
if err := stream.Reset(); err != nil {
log.WithError(err).Errorf("Failed to reset stream with protocol %s", stream.Protocol())
}
}()
code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding())
if err != nil {