Pool: add grpcutils pkg, no spamming logs on disconnect (#54)

This commit is contained in:
Alex Sharov 2021-09-02 16:37:36 +07:00 committed by GitHub
parent b1435d3679
commit c935d9ff65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 91 additions and 3 deletions

View File

@ -0,0 +1,82 @@
package grpcutil
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)
func TLS(tlsCACert, tlsCertFile, tlsKeyFile string) (credentials.TransportCredentials, error) {
// load peer cert/key, ca cert
if tlsCACert == "" {
return credentials.NewServerTLSFromFile(tlsCertFile, tlsKeyFile)
}
var caCert []byte
peerCert, err := tls.LoadX509KeyPair(tlsCertFile, tlsKeyFile)
if err != nil {
return nil, fmt.Errorf("load peer cert/key error:%w", err)
}
caCert, err = ioutil.ReadFile(tlsCACert)
if err != nil {
return nil, fmt.Errorf("read ca cert file error:%w", err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
return credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{peerCert},
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
MinVersion: tls.VersionTLS12,
}), nil
}
func NewServer(rateLimit uint32, creds *credentials.TransportCredentials) *grpc.Server {
var (
streamInterceptors []grpc.StreamServerInterceptor
unaryInterceptors []grpc.UnaryServerInterceptor
)
streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor())
unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor())
//if metrics.Enabled {
// streamInterceptors = append(streamInterceptors, grpc_prometheus.StreamServerInterceptor)
// unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
//}
var grpcServer *grpc.Server
//cpus := uint32(runtime.GOMAXPROCS(-1))
opts := []grpc.ServerOption{
//grpc.NumStreamWorkers(cpus), // reduce amount of goroutines
grpc.WriteBufferSize(1024), // reduce buffers to save mem
grpc.ReadBufferSize(1024),
grpc.MaxConcurrentStreams(rateLimit), // to force clients reduce concurrency level
// Don't drop the connection, settings accordign to this comment on GitHub
// https://github.com/grpc/grpc-go/issues/3171#issuecomment-552796779
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 10 * time.Second,
PermitWithoutStream: true,
}),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
}
if creds == nil {
// no specific opts
} else {
opts = append(opts, grpc.Creds(*creds))
}
grpcServer = grpc.NewServer(opts...)
//if metrics.Enabled {
// grpc_prometheus.Register(grpcServer)
//}
return grpcServer
}

View File

@ -180,9 +180,6 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
continue continue
} }
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
continue
}
log.Warn("Handling incoming message", "err", err) log.Warn("Handling incoming message", "err", err)
} }
if f.wg != nil { if f.wg != nil {

View File

@ -21,7 +21,9 @@ import (
"container/heap" "container/heap"
"context" "context"
"encoding/binary" "encoding/binary"
"errors"
"fmt" "fmt"
"io"
"math" "math"
"runtime" "runtime"
"sort" "sort"
@ -38,6 +40,7 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/log/v3"
"go.uber.org/atomic" "go.uber.org/atomic"
"google.golang.org/grpc/status"
) )
var ( var (
@ -1534,6 +1537,12 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
} }
case <-processRemoteTxsEvery.C: case <-processRemoteTxsEvery.C:
if err := p.processRemoteTxs(ctx); err != nil { if err := p.processRemoteTxs(ctx); err != nil {
if s, ok := status.FromError(err); ok && retryLater(s.Code()) {
continue
}
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
continue
}
log.Error("process batch remote txs", "err", err) log.Error("process batch remote txs", "err", err)
} }
case <-commitEvery.C: case <-commitEvery.C: