diff --git a/gointerfaces/grpcutil/utils.go b/gointerfaces/grpcutil/utils.go index c186ae6d8..bbcd41877 100644 --- a/gointerfaces/grpcutil/utils.go +++ b/gointerfaces/grpcutil/utils.go @@ -4,7 +4,9 @@ import ( "context" "crypto/tls" "crypto/x509" + "errors" "fmt" + "io" "io/ioutil" "time" @@ -122,3 +124,24 @@ func IsRetryLater(err error) bool { } return false } + +func IsEndOfStream(err error) bool { + if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + return true + } + if s, ok := status.FromError(err); ok { + return s.Code() == codes.Canceled || s.Message() == context.Canceled.Error() + } + return false +} + +// ErrIs - like `errors.Is` but for grpc errors +func ErrIs(err, target error) bool { + if errors.Is(err, target) { // direct clients do return Go-style errors + return true + } + if s, ok := status.FromError(err); ok { // remote clients do return GRPC-style errors + return s.Message() == target.Error() + } + return false +} diff --git a/kv/remotedb/kv_remote.go b/kv/remotedb/kv_remote.go index 00951d7e1..5439da7b6 100644 --- a/kv/remotedb/kv_remote.go +++ b/kv/remotedb/kv_remote.go @@ -3,18 +3,15 @@ package remotedb import ( "bytes" "context" - "errors" "fmt" - "io" "github.com/ledgerwatch/erigon-lib/gointerfaces" + "github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/log/v3" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" ) @@ -498,16 +495,14 @@ func (tx *remoteTx) closeGrpcStream() { // try graceful close stream err := tx.stream.CloseSend() if err != nil { - s, ok := status.FromError(err) - doLog := !((ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)) + doLog := !grpcutil.IsEndOfStream(err) if doLog { log.Warn("couldn't send msg CloseSend to server", "err", err) } } else { _, err = tx.stream.Recv() if err != nil { - s, ok := status.FromError(err) - doLog := !((ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)) + doLog := !grpcutil.IsEndOfStream(err) if doLog { log.Warn("received unexpected error from server after CloseSend", "err", err) } diff --git a/txpool/fetch.go b/txpool/fetch.go index 4c6141c3f..65d2ad4e9 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -18,23 +18,20 @@ package txpool import ( "context" - "errors" "fmt" - "io" "sync" "time" "github.com/holiman/uint256" "github.com/ledgerwatch/erigon-lib/common/dbg" "github.com/ledgerwatch/erigon-lib/direct" + "github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/rlp" "github.com/ledgerwatch/log/v3" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" ) @@ -117,11 +114,8 @@ func (f *Fetch) ConnectCore() { default: } if err := f.handleStateChanges(f.ctx, f.stateChangesClient); err != nil { - if s, ok := status.FromError(err); ok && retryLater(s.Code()) { - time.Sleep(time.Second) - continue - } - if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) { + time.Sleep(3 * time.Second) continue } log.Warn("[txpool.handleStateChanges]", "err", err) @@ -138,11 +132,8 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) { default: } if _, err := sentryClient.HandShake(f.ctx, &emptypb.Empty{}, grpc.WaitForReady(true)); err != nil { - if s, ok := status.FromError(err); ok && retryLater(s.Code()) { - time.Sleep(time.Second) - continue - } - if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) { + time.Sleep(3 * time.Second) continue } // Report error and wait more @@ -151,11 +142,8 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) { } if err := f.receiveMessage(f.ctx, sentryClient); err != nil { - if s, ok := status.FromError(err); ok && retryLater(s.Code()) { - time.Sleep(time.Second) - continue - } - if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) { + time.Sleep(3 * time.Second) continue } log.Warn("[txpool.recvMessage]", "err", err) @@ -195,11 +183,8 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl return nil } if err := f.handleInboundMessage(streamCtx, req, sentryClient); err != nil { - if s, ok := status.FromError(err); ok && retryLater(s.Code()) { - time.Sleep(time.Second) - continue - } - if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) { + time.Sleep(3 * time.Second) continue } @@ -357,10 +342,6 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes return nil } -func retryLater(code codes.Code) bool { - return code == codes.Unavailable || code == codes.Canceled || code == codes.ResourceExhausted -} - func (f *Fetch) receivePeerLoop(sentryClient sentry.SentryClient) { for { select { @@ -369,11 +350,8 @@ func (f *Fetch) receivePeerLoop(sentryClient sentry.SentryClient) { default: } if _, err := sentryClient.HandShake(f.ctx, &emptypb.Empty{}, grpc.WaitForReady(true)); err != nil { - if s, ok := status.FromError(err); ok && retryLater(s.Code()) { - time.Sleep(time.Second) - continue - } - if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) { + time.Sleep(3 * time.Second) continue } // Report error and wait more @@ -382,11 +360,8 @@ func (f *Fetch) receivePeerLoop(sentryClient sentry.SentryClient) { continue } if err := f.receivePeer(sentryClient); err != nil { - if s, ok := status.FromError(err); ok && retryLater(s.Code()) { - time.Sleep(time.Second) - continue - } - if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) { + time.Sleep(3 * time.Second) continue } diff --git a/txpool/pool.go b/txpool/pool.go index 1018f757e..db827a405 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -22,9 +22,7 @@ import ( "context" "encoding/binary" "encoding/json" - "errors" "fmt" - "io" "math" "runtime" "sort" @@ -40,6 +38,7 @@ import ( "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/fixedgas" "github.com/ledgerwatch/erigon-lib/gointerfaces" + "github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" proto_txpool "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool" "github.com/ledgerwatch/erigon-lib/kv" @@ -47,7 +46,6 @@ import ( "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/log/v3" "go.uber.org/atomic" - "google.golang.org/grpc/status" ) var ( @@ -1300,13 +1298,8 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs } if err := p.processRemoteTxs(ctx); err != nil { - if s, ok := status.FromError(err); ok && retryLater(s.Code()) { - continue - } - if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { - continue - } - if stopped := common.Stopped(ctx.Done()); stopped != nil { + if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) { + time.Sleep(3 * time.Second) continue }