From c935d9ff654f3b7235870cc97fbab6e307ba99fb Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Thu, 2 Sep 2021 16:37:36 +0700 Subject: [PATCH] Pool: add grpcutils pkg, no spamming logs on disconnect (#54) --- gointerfaces/grpcutil/utils.go | 82 ++++++++++++++++++++++++++++++++++ txpool/fetch.go | 3 -- txpool/pool.go | 9 ++++ 3 files changed, 91 insertions(+), 3 deletions(-) create mode 100644 gointerfaces/grpcutil/utils.go diff --git a/gointerfaces/grpcutil/utils.go b/gointerfaces/grpcutil/utils.go new file mode 100644 index 000000000..2e7f52b7b --- /dev/null +++ b/gointerfaces/grpcutil/utils.go @@ -0,0 +1,82 @@ +package grpcutil + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "time" + + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" +) + +func TLS(tlsCACert, tlsCertFile, tlsKeyFile string) (credentials.TransportCredentials, error) { + // load peer cert/key, ca cert + if tlsCACert == "" { + return credentials.NewServerTLSFromFile(tlsCertFile, tlsKeyFile) + } + var caCert []byte + peerCert, err := tls.LoadX509KeyPair(tlsCertFile, tlsKeyFile) + if err != nil { + return nil, fmt.Errorf("load peer cert/key error:%w", err) + } + caCert, err = ioutil.ReadFile(tlsCACert) + if err != nil { + return nil, fmt.Errorf("read ca cert file error:%w", err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + return credentials.NewTLS(&tls.Config{ + Certificates: []tls.Certificate{peerCert}, + ClientCAs: caCertPool, + ClientAuth: tls.RequireAndVerifyClientCert, + MinVersion: tls.VersionTLS12, + }), nil +} + +func NewServer(rateLimit uint32, creds *credentials.TransportCredentials) *grpc.Server { + var ( + streamInterceptors []grpc.StreamServerInterceptor + unaryInterceptors []grpc.UnaryServerInterceptor + ) + streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor()) + unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor()) + + //if metrics.Enabled { + // streamInterceptors = append(streamInterceptors, grpc_prometheus.StreamServerInterceptor) + // unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor) + //} + + var grpcServer *grpc.Server + //cpus := uint32(runtime.GOMAXPROCS(-1)) + opts := []grpc.ServerOption{ + //grpc.NumStreamWorkers(cpus), // reduce amount of goroutines + grpc.WriteBufferSize(1024), // reduce buffers to save mem + grpc.ReadBufferSize(1024), + grpc.MaxConcurrentStreams(rateLimit), // to force clients reduce concurrency level + // Don't drop the connection, settings accordign to this comment on GitHub + // https://github.com/grpc/grpc-go/issues/3171#issuecomment-552796779 + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: 10 * time.Second, + PermitWithoutStream: true, + }), + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)), + } + if creds == nil { + // no specific opts + } else { + opts = append(opts, grpc.Creds(*creds)) + } + grpcServer = grpc.NewServer(opts...) + + //if metrics.Enabled { + // grpc_prometheus.Register(grpcServer) + //} + + return grpcServer +} diff --git a/txpool/fetch.go b/txpool/fetch.go index dcff4f170..7cf46743d 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -180,9 +180,6 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { continue } - if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { - continue - } log.Warn("Handling incoming message", "err", err) } if f.wg != nil { diff --git a/txpool/pool.go b/txpool/pool.go index d60da753d..5e1068da5 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -21,7 +21,9 @@ import ( "container/heap" "context" "encoding/binary" + "errors" "fmt" + "io" "math" "runtime" "sort" @@ -38,6 +40,7 @@ import ( "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/log/v3" "go.uber.org/atomic" + "google.golang.org/grpc/status" ) var ( @@ -1534,6 +1537,12 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs } case <-processRemoteTxsEvery.C: if err := p.processRemoteTxs(ctx); err != nil { + if s, ok := status.FromError(err); ok && retryLater(s.Code()) { + continue + } + if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + continue + } log.Error("process batch remote txs", "err", err) } case <-commitEvery.C: