Grpc err utils (#334)

* save

* save
This commit is contained in:
Alex Sharov 2022-02-18 09:40:11 +07:00 committed by GitHub
parent d94c4ada78
commit 5be29adf1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 42 additions and 56 deletions

View File

@ -4,7 +4,9 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"io/ioutil"
"time"
@ -122,3 +124,24 @@ func IsRetryLater(err error) bool {
}
return false
}
func IsEndOfStream(err error) bool {
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
return true
}
if s, ok := status.FromError(err); ok {
return s.Code() == codes.Canceled || s.Message() == context.Canceled.Error()
}
return false
}
// ErrIs - like `errors.Is` but for grpc errors
func ErrIs(err, target error) bool {
if errors.Is(err, target) { // direct clients do return Go-style errors
return true
}
if s, ok := status.FromError(err); ok { // remote clients do return GRPC-style errors
return s.Message() == target.Error()
}
return false
}

View File

@ -3,18 +3,15 @@ package remotedb
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/log/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
)
@ -498,16 +495,14 @@ func (tx *remoteTx) closeGrpcStream() {
// try graceful close stream
err := tx.stream.CloseSend()
if err != nil {
s, ok := status.FromError(err)
doLog := !((ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled))
doLog := !grpcutil.IsEndOfStream(err)
if doLog {
log.Warn("couldn't send msg CloseSend to server", "err", err)
}
} else {
_, err = tx.stream.Recv()
if err != nil {
s, ok := status.FromError(err)
doLog := !((ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled))
doLog := !grpcutil.IsEndOfStream(err)
if doLog {
log.Warn("received unexpected error from server after CloseSend", "err", err)
}

View File

@ -18,23 +18,20 @@ package txpool
import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/direct"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/rlp"
"github.com/ledgerwatch/log/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
)
@ -117,11 +114,8 @@ func (f *Fetch) ConnectCore() {
default:
}
if err := f.handleStateChanges(f.ctx, f.stateChangesClient); err != nil {
if s, ok := status.FromError(err); ok && retryLater(s.Code()) {
time.Sleep(time.Second)
continue
}
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
continue
}
log.Warn("[txpool.handleStateChanges]", "err", err)
@ -138,11 +132,8 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) {
default:
}
if _, err := sentryClient.HandShake(f.ctx, &emptypb.Empty{}, grpc.WaitForReady(true)); err != nil {
if s, ok := status.FromError(err); ok && retryLater(s.Code()) {
time.Sleep(time.Second)
continue
}
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
continue
}
// Report error and wait more
@ -151,11 +142,8 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) {
}
if err := f.receiveMessage(f.ctx, sentryClient); err != nil {
if s, ok := status.FromError(err); ok && retryLater(s.Code()) {
time.Sleep(time.Second)
continue
}
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
continue
}
log.Warn("[txpool.recvMessage]", "err", err)
@ -195,11 +183,8 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
return nil
}
if err := f.handleInboundMessage(streamCtx, req, sentryClient); err != nil {
if s, ok := status.FromError(err); ok && retryLater(s.Code()) {
time.Sleep(time.Second)
continue
}
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
continue
}
@ -357,10 +342,6 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
return nil
}
func retryLater(code codes.Code) bool {
return code == codes.Unavailable || code == codes.Canceled || code == codes.ResourceExhausted
}
func (f *Fetch) receivePeerLoop(sentryClient sentry.SentryClient) {
for {
select {
@ -369,11 +350,8 @@ func (f *Fetch) receivePeerLoop(sentryClient sentry.SentryClient) {
default:
}
if _, err := sentryClient.HandShake(f.ctx, &emptypb.Empty{}, grpc.WaitForReady(true)); err != nil {
if s, ok := status.FromError(err); ok && retryLater(s.Code()) {
time.Sleep(time.Second)
continue
}
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
continue
}
// Report error and wait more
@ -382,11 +360,8 @@ func (f *Fetch) receivePeerLoop(sentryClient sentry.SentryClient) {
continue
}
if err := f.receivePeer(sentryClient); err != nil {
if s, ok := status.FromError(err); ok && retryLater(s.Code()) {
time.Sleep(time.Second)
continue
}
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
continue
}

View File

@ -22,9 +22,7 @@ import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"runtime"
"sort"
@ -40,6 +38,7 @@ import (
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/fixedgas"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
proto_txpool "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
"github.com/ledgerwatch/erigon-lib/kv"
@ -47,7 +46,6 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic"
"google.golang.org/grpc/status"
)
var (
@ -1300,13 +1298,8 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
}
if err := p.processRemoteTxs(ctx); err != nil {
if s, ok := status.FromError(err); ok && retryLater(s.Code()) {
continue
}
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
continue
}
if stopped := common.Stopped(ctx.Done()); stopped != nil {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
continue
}