erigon-pulse/cmd/integration/commands/testing.go
ledgerwatch 75ca6b8c76
Initial work on integration tests (#1797)
* Initial work on integration tests

* Delete subtree

* Squashed 'interfaces/' content from commit 41a082ba4

git-subtree-dir: interfaces
git-subtree-split: 41a082ba4bde38647325eb0416cb1da1b4ca2b12

* Add consensus interfaces

* More stuff

* comments

* Fix compile

* Squashed 'interfaces/' changes from 41a082ba4..1b13a42a7

1b13a42a7 Add chainspec to consensus interface

git-subtree-dir: interfaces
git-subtree-split: 1b13a42a7803f5464722867a71065c27a7ebe8c3

* Squashed 'interfaces/' changes from 1b13a42a7..93a072c4c

93a072c4c Add missing import

git-subtree-dir: interfaces
git-subtree-split: 93a072c4c099d169322a3a53b95e40203276820b

* New consensus interfaces

* More on clique

* Fix tests

* Squashed 'interfaces/' changes from 93a072c4c..62f4ec4b2

62f4ec4b2 Add test service for consensus engine

git-subtree-dir: interfaces
git-subtree-split: 62f4ec4b263107635ffa3aabd5d634af22e813c6

* Squashed 'interfaces/' changes from 62f4ec4b2..061a63543

061a63543 Fix

git-subtree-dir: interfaces
git-subtree-split: 061a63543babdeb51ab7e3a96dec56b2485d4389

* Configuring clique engine with toml specs - start

* More toml parsing

* Constructed rinkeby genesis

* Simplify VerifyHeaders functions

* Fix lint

* Remove concurrent verification tests

* Fix lint

Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
2021-04-29 16:14:10 +01:00

200 lines
7.1 KiB
Go

package commands
import (
"context"
"fmt"
"net"
"runtime"
"syscall"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/ledgerwatch/turbo-geth/cmd/utils"
proto_sentry "github.com/ledgerwatch/turbo-geth/gointerfaces/sentry"
proto_testing "github.com/ledgerwatch/turbo-geth/gointerfaces/testing"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/metrics"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
// Settings of the integration testing commands. Each variable is filled in
// from a command-line flag registered in init.
var (
	// testingAddr is the address of the gRPC endpoint of the integration
	// testing server.
	testingAddr string

	// sentryAddr is the address of the gRPC endpoint of the test sentry
	// (mimicking sentry for the integration tests).
	sentryAddr string

	// consAddr is the address of the gRPC endpoint of the consensus engine
	// to test.
	consAddr string

	// consSpecFile is the path to the specification file for the consensus
	// engine (ideally, integration test and consensus engine use identical
	// spec files).
	consSpecFile string
)
// init binds the command-line flags of the test_core and test_cons commands
// to their package-level variables and registers both commands on rootCmd.
func init() {
	// Flags of test_core.
	cmdTestCore.Flags().StringVar(&testingAddr, "testing.api.addr", "localhost:9092", "address of the gRPC endpoint of the integration testing server")
	cmdTestCore.Flags().StringVar(&sentryAddr, "sentry.api.addr", "localhost:9091", "Address of the gRPC endpoint of the test sentry (mimicking sentry for the integration tests)")
	rootCmd.AddCommand(cmdTestCore)

	// Flags of test_cons. The default host used to be misspelled as
	// "locahost", which is not a resolvable name.
	cmdTestCons.Flags().StringVar(&consAddr, "cons.api.addr", "localhost:9093", "Address of the gRPC endpoint of the consensus engine to test")
	cmdTestCons.Flags().StringVar(&consSpecFile, "cons.spec.file", "", "Specification file for the consensus engine (ideally, integration test and consensus engine use identical spec files)")
	rootCmd.AddCommand(cmdTestCons)
}
// cmdTestCore is the "test_core" command. It takes the context returned by
// utils.RootContext and runs testCore with it; a failure is logged and then
// returned to cobra.
var cmdTestCore = &cobra.Command{
	Use:   "test_core",
	Short: "Test server for testing core of turbo-geth or equivalent component",
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx, _ := utils.RootContext()
		err := testCore(ctx)
		if err == nil {
			return nil
		}
		log.Error("Error", "err", err)
		return err
	},
}
// cmdTestCons is the "test_cons" command. It takes the context returned by
// utils.RootContext and runs testCons with it; a failure is logged and then
// returned to cobra.
var cmdTestCons = &cobra.Command{
	Use:   "test_cons",
	Short: "Integration test for consensus engine",
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx, _ := utils.RootContext()
		err := testCons(ctx)
		if err == nil {
			return nil
		}
		log.Error("Error", "err", err)
		return err
	},
}
// testCore starts the test driver gRPC server on testingAddr and the test
// sentry gRPC server on sentryAddr, then blocks until ctx is done.
//
// It returns a wrapped error if either server cannot be started, and nil once
// ctx is done.
func testCore(ctx context.Context) error {
	if _, err := grpcTestDriverServer(ctx, testingAddr); err != nil {
		return fmt.Errorf("start test driver gRPC server: %w", err)
	}
	// The sentry gets its own endpoint. Passing testingAddr here would make it
	// listen on the address that belongs to the test driver.
	if _, err := grpcTestSentryServer(ctx, sentryAddr); err != nil {
		return fmt.Errorf("start test sentry gRPC server: %w", err)
	}
	<-ctx.Done()
	return nil
}

// grpcTestDriverServer creates a TCP listener on testingAddr, registers a new
// TestDriverServerImpl on a freshly configured gRPC server and serves it from
// a background goroutine.
//
// It returns the registered server implementation, or an error if the listener
// cannot be created. An error returned by Serve is only logged.
func grpcTestDriverServer(ctx context.Context, testingAddr string) (*TestDriverServerImpl, error) {
	// STARTING GRPC SERVER
	log.Info("Starting Test driver server", "on", testingAddr)
	listenConfig := net.ListenConfig{
		// NOTE(review): net.ListenConfig documents Control as being called
		// after the socket is created and before it is bound, so this fires
		// for the Listen call below rather than per incoming connection.
		Control: func(network, address string, _ syscall.RawConn) error {
			log.Info("Test driver received connection", "via", network, "from", address)
			return nil
		},
	}
	// Listen on the address given by the caller. The parameter shadows the
	// package-level testingAddr; the package-level sentryAddr belongs to the
	// test sentry server and must not be used here.
	lis, err := listenConfig.Listen(ctx, "tcp", testingAddr)
	if err != nil {
		return nil, fmt.Errorf("could not create Test driver listener: %w, addr=%s", err, testingAddr)
	}

	var (
		streamInterceptors []grpc.StreamServerInterceptor
		unaryInterceptors  []grpc.UnaryServerInterceptor
	)
	// Prometheus interceptors are only installed when metrics are enabled and
	// are placed ahead of the recovery interceptors in the chain.
	if metrics.Enabled {
		streamInterceptors = append(streamInterceptors, grpc_prometheus.StreamServerInterceptor)
		unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
	}
	streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor())
	unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor())

	cpus := uint32(runtime.GOMAXPROCS(-1))
	opts := []grpc.ServerOption{
		grpc.NumStreamWorkers(cpus), // reduce amount of goroutines
		grpc.WriteBufferSize(1024),  // reduce buffers to save mem
		grpc.ReadBufferSize(1024),
		grpc.MaxConcurrentStreams(100), // to force clients reduce concurrency level
		grpc.KeepaliveParams(keepalive.ServerParameters{
			Time: 10 * time.Minute,
		}),
		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
	}
	grpcServer := grpc.NewServer(opts...)

	testDriverServer := NewTestDriverServer(ctx)
	proto_testing.RegisterTestDriverServer(grpcServer, testDriverServer)
	if metrics.Enabled {
		grpc_prometheus.Register(grpcServer)
	}

	// Serve blocks, so it runs in its own goroutine; the caller only gets the
	// server implementation back.
	go func() {
		if err1 := grpcServer.Serve(lis); err1 != nil {
			log.Error("Test driver server fail", "err", err1)
		}
	}()
	return testDriverServer, nil
}
// TestDriverServerImpl is the test driver service implementation that
// grpcTestDriverServer registers on its gRPC server. It embeds
// proto_testing.UnimplementedTestDriverServer and declares no fields of its
// own.
type TestDriverServerImpl struct {
	proto_testing.UnimplementedTestDriverServer
}

// NewTestDriverServer returns a pointer to a zero-valued TestDriverServerImpl.
// The context argument is accepted but not used.
func NewTestDriverServer(_ context.Context) *TestDriverServerImpl {
	return new(TestDriverServerImpl)
}
// grpcTestSentryServer creates a TCP listener on sentryAddr, registers a new
// TestSentryServerImpl on a freshly configured gRPC server and serves it from
// a background goroutine.
//
// It returns the registered server implementation, or an error if the listener
// cannot be created. An error returned by Serve is only logged.
func grpcTestSentryServer(ctx context.Context, sentryAddr string) (*TestSentryServerImpl, error) {
	// STARTING GRPC SERVER
	// Report the address this server actually listens on (the sentryAddr
	// parameter), not the package-level testingAddr of the test driver.
	log.Info("Starting Test sentry server", "on", sentryAddr)
	listenConfig := net.ListenConfig{
		// NOTE(review): net.ListenConfig documents Control as being called
		// after the socket is created and before it is bound, so this fires
		// for the Listen call below rather than per incoming connection.
		Control: func(network, address string, _ syscall.RawConn) error {
			log.Info("Test sentry received connection", "via", network, "from", address)
			return nil
		},
	}
	lis, err := listenConfig.Listen(ctx, "tcp", sentryAddr)
	if err != nil {
		return nil, fmt.Errorf("could not create Test sentry listener: %w, addr=%s", err, sentryAddr)
	}

	var (
		streamInterceptors []grpc.StreamServerInterceptor
		unaryInterceptors  []grpc.UnaryServerInterceptor
	)
	// Prometheus interceptors are only installed when metrics are enabled and
	// are placed ahead of the recovery interceptors in the chain.
	if metrics.Enabled {
		streamInterceptors = append(streamInterceptors, grpc_prometheus.StreamServerInterceptor)
		unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
	}
	streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor())
	unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor())

	cpus := uint32(runtime.GOMAXPROCS(-1))
	opts := []grpc.ServerOption{
		grpc.NumStreamWorkers(cpus), // reduce amount of goroutines
		grpc.WriteBufferSize(1024),  // reduce buffers to save mem
		grpc.ReadBufferSize(1024),
		grpc.MaxConcurrentStreams(100), // to force clients reduce concurrency level
		grpc.KeepaliveParams(keepalive.ServerParameters{
			Time: 10 * time.Minute,
		}),
		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
	}
	grpcServer := grpc.NewServer(opts...)

	testSentryServer := NewTestSentryServer(ctx)
	proto_sentry.RegisterSentryServer(grpcServer, testSentryServer)
	if metrics.Enabled {
		grpc_prometheus.Register(grpcServer)
	}

	// Serve blocks, so it runs in its own goroutine; the caller only gets the
	// server implementation back.
	go func() {
		if err1 := grpcServer.Serve(lis); err1 != nil {
			// Name the sentry server here so that its failures are not
			// mistaken for test driver failures in the log.
			log.Error("Test sentry server fail", "err", err1)
		}
	}()
	return testSentryServer, nil
}
// TestSentryServerImpl is the sentry service implementation that
// grpcTestSentryServer registers on its gRPC server. It embeds
// proto_sentry.UnimplementedSentryServer and declares no fields of its own.
type TestSentryServerImpl struct {
	proto_sentry.UnimplementedSentryServer
}

// NewTestSentryServer returns a pointer to a zero-valued TestSentryServerImpl.
// The context argument is accepted but not used.
func NewTestSentryServer(_ context.Context) *TestSentryServerImpl {
	return new(TestSentryServerImpl)
}
// testCons is the body of the test_cons command. It is currently a
// placeholder: ctx is not used and nil is always returned.
func testCons(ctx context.Context) error {
	// Nothing is exercised yet, so there is no failure to report.
	return nil
}