erigon-pulse/cmd/integration/commands/testing.go

199 lines
7.1 KiB
Go
Raw Normal View History

package commands
import (
"context"
"fmt"
"net"
"runtime"
"syscall"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
//grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
proto_testing "github.com/ledgerwatch/erigon-lib/gointerfaces/testing"
"github.com/ledgerwatch/erigon/cmd/utils"
2021-07-29 10:23:23 +00:00
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
var (
testingAddr string // Address of the gRPC endpoint of the integration testing server
sentryAddr string // Address of the gRPC endpoint of the test sentry (mimicking sentry for the integration tests)
consAddr string // Address of the gRPC endpoint of consensus engine to test
consSpecFile string // Path to the specification file for the consensus engine (ideally, integration test and consensus engine use identical spec files)
)
func init() {
cmdTestCore.Flags().StringVar(&testingAddr, "testing.api.addr", "localhost:9092", "address of the gRPC endpoint of the integration testing server")
cmdTestCore.Flags().StringVar(&sentryAddr, "sentry.api.addr", "localhost:9091", "Address of the gRPC endpoint of the test sentry (mimicking sentry for the integration tests)")
rootCmd.AddCommand(cmdTestCore)
cmdTestCons.Flags().StringVar(&consAddr, "cons.api.addr", "locahost:9093", "Address of the gRPC endpoint of the consensus engine to test")
cmdTestCons.Flags().StringVar(&consSpecFile, "cons.spec.file", "", "Specification file for the consensis engine (ideally, integration test and consensus engine use identical spec files)")
rootCmd.AddCommand(cmdTestCons)
}
var cmdTestCore = &cobra.Command{
Use: "test_core",
Short: "Test server for testing core of Erigon or equivalent component",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, _ := utils.RootContext()
if err := testCore(ctx); err != nil {
log.Error("Error", "err", err)
return err
}
return nil
},
}
var cmdTestCons = &cobra.Command{
Use: "test_cons",
Short: "Integration test for consensus engine",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, _ := utils.RootContext()
if err := testCons(ctx); err != nil {
log.Error("Error", "err", err)
return err
}
return nil
},
}
func testCore(ctx context.Context) error {
if _, err := grpcTestDriverServer(ctx, testingAddr); err != nil {
return fmt.Errorf("start test driver gRPC server: %w", err)
}
if _, err := grpcTestSentryServer(ctx, testingAddr); err != nil {
return fmt.Errorf("start test sentry gRPC server: %w", err)
}
<-ctx.Done()
return nil
}
func grpcTestDriverServer(ctx context.Context, testingAddr string) (*TestDriverServerImpl, error) {
// STARTING GRPC SERVER
log.Info("Starting Test driver server", "on", testingAddr)
listenConfig := net.ListenConfig{
Control: func(network, address string, _ syscall.RawConn) error {
log.Info("Test driver received connection", "via", network, "from", address)
return nil
},
}
lis, err := listenConfig.Listen(ctx, "tcp", sentryAddr)
if err != nil {
return nil, fmt.Errorf("could not create Test driver listener: %w, addr=%s", err, testingAddr)
}
var (
streamInterceptors []grpc.StreamServerInterceptor
unaryInterceptors []grpc.UnaryServerInterceptor
)
//if metrics.Enabled {
// streamInterceptors = append(streamInterceptors, grpc_prometheus.StreamServerInterceptor)
// unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
//}
streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor())
unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor())
var grpcServer *grpc.Server
cpus := uint32(runtime.GOMAXPROCS(-1))
opts := []grpc.ServerOption{
grpc.NumStreamWorkers(cpus), // reduce amount of goroutines
grpc.WriteBufferSize(1024), // reduce buffers to save mem
grpc.ReadBufferSize(1024),
grpc.MaxConcurrentStreams(100), // to force clients reduce concurrency level
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 10 * time.Minute,
}),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
}
grpcServer = grpc.NewServer(opts...)
testDriverServer := NewTestDriverServer(ctx)
proto_testing.RegisterTestDriverServer(grpcServer, testDriverServer)
//if metrics.Enabled {
// grpc_prometheus.Register(grpcServer)
//}
go func() {
if err1 := grpcServer.Serve(lis); err1 != nil {
log.Error("Test driver server fail", "err", err1)
}
}()
return testDriverServer, nil
}
type TestDriverServerImpl struct {
proto_testing.UnimplementedTestDriverServer
}
func NewTestDriverServer(_ context.Context) *TestDriverServerImpl {
return &TestDriverServerImpl{}
}
func grpcTestSentryServer(ctx context.Context, sentryAddr string) (*TestSentryServerImpl, error) {
// STARTING GRPC SERVER
log.Info("Starting Test sentry server", "on", testingAddr)
listenConfig := net.ListenConfig{
Control: func(network, address string, _ syscall.RawConn) error {
log.Info("Test sentry received connection", "via", network, "from", address)
return nil
},
}
lis, err := listenConfig.Listen(ctx, "tcp", sentryAddr)
if err != nil {
return nil, fmt.Errorf("could not create Test sentry listener: %w, addr=%s", err, testingAddr)
}
var (
streamInterceptors []grpc.StreamServerInterceptor
unaryInterceptors []grpc.UnaryServerInterceptor
)
//if metrics.Enabled {
// streamInterceptors = append(streamInterceptors, grpc_prometheus.StreamServerInterceptor)
// unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
//}
streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor())
unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor())
var grpcServer *grpc.Server
cpus := uint32(runtime.GOMAXPROCS(-1))
opts := []grpc.ServerOption{
grpc.NumStreamWorkers(cpus), // reduce amount of goroutines
grpc.WriteBufferSize(1024), // reduce buffers to save mem
grpc.ReadBufferSize(1024),
grpc.MaxConcurrentStreams(100), // to force clients reduce concurrency level
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 10 * time.Minute,
}),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
}
grpcServer = grpc.NewServer(opts...)
testSentryServer := NewTestSentryServer(ctx)
proto_sentry.RegisterSentryServer(grpcServer, testSentryServer)
//if metrics.Enabled {
// grpc_prometheus.Register(grpcServer)
//}
go func() {
if err1 := grpcServer.Serve(lis); err1 != nil {
log.Error("Test driver server fail", "err", err1)
}
}()
return testSentryServer, nil
}
type TestSentryServerImpl struct {
proto_sentry.UnimplementedSentryServer
}
func NewTestSentryServer(_ context.Context) *TestSentryServerImpl {
return &TestSentryServerImpl{}
}
func testCons(ctx context.Context) error {
return nil
}