mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-15 23:38:19 +00:00
75ca6b8c76
* Initial work on integration tests * Delete subtree * Squashed 'interfaces/' content from commit 41a082ba4 git-subtree-dir: interfaces git-subtree-split: 41a082ba4bde38647325eb0416cb1da1b4ca2b12 * Add consensus interfaces * More stuff * comments * Fix compile * Squashed 'interfaces/' changes from 41a082ba4..1b13a42a7 1b13a42a7 Add chainspec to consensus interface git-subtree-dir: interfaces git-subtree-split: 1b13a42a7803f5464722867a71065c27a7ebe8c3 * Squashed 'interfaces/' changes from 1b13a42a7..93a072c4c 93a072c4c Add missing import git-subtree-dir: interfaces git-subtree-split: 93a072c4c099d169322a3a53b95e40203276820b * New consensus interfaces * More on clique * Fix tests * Squashed 'interfaces/' changes from 93a072c4c..62f4ec4b2 62f4ec4b2 Add test service for consensus engine git-subtree-dir: interfaces git-subtree-split: 62f4ec4b263107635ffa3aabd5d634af22e813c6 * Squashed 'interfaces/' changes from 62f4ec4b2..061a63543 061a63543 Fix git-subtree-dir: interfaces git-subtree-split: 061a63543babdeb51ab7e3a96dec56b2485d4389 * Configuring clique engine with toml specs - start * More toml parsing * Constructed rinkeby genesis * Simplify VerifyHeaders functions * Fix lint * Remove concurrent verification tests * Fix lint Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local> Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
200 lines
7.1 KiB
Go
200 lines
7.1 KiB
Go
package commands
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"runtime"
|
|
"syscall"
|
|
"time"
|
|
|
|
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
|
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
|
|
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
|
"github.com/ledgerwatch/turbo-geth/cmd/utils"
|
|
proto_sentry "github.com/ledgerwatch/turbo-geth/gointerfaces/sentry"
|
|
proto_testing "github.com/ledgerwatch/turbo-geth/gointerfaces/testing"
|
|
"github.com/ledgerwatch/turbo-geth/log"
|
|
"github.com/ledgerwatch/turbo-geth/metrics"
|
|
"github.com/spf13/cobra"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/keepalive"
|
|
)
|
|
|
|
// Command-line configurable endpoints and inputs for the integration test
// commands. Each value is bound to a flag in init.
var (
	// testingAddr is the gRPC endpoint of the integration testing server.
	testingAddr string
	// sentryAddr is the gRPC endpoint of the test sentry, which mimics a real
	// sentry for the integration tests.
	sentryAddr string
	// consAddr is the gRPC endpoint of the consensus engine under test.
	consAddr string
	// consSpecFile is the path to the consensus engine specification file.
	// Ideally the integration test and the consensus engine use identical
	// spec files.
	consSpecFile string
)
|
|
|
|
func init() {
|
|
cmdTestCore.Flags().StringVar(&testingAddr, "testing.api.addr", "localhost:9092", "address of the gRPC endpoint of the integration testing server")
|
|
cmdTestCore.Flags().StringVar(&sentryAddr, "sentry.api.addr", "localhost:9091", "Address of the gRPC endpoint of the test sentry (mimicking sentry for the integration tests)")
|
|
rootCmd.AddCommand(cmdTestCore)
|
|
|
|
cmdTestCons.Flags().StringVar(&consAddr, "cons.api.addr", "locahost:9093", "Address of the gRPC endpoint of the consensus engine to test")
|
|
cmdTestCons.Flags().StringVar(&consSpecFile, "cons.spec.file", "", "Specification file for the consensis engine (ideally, integration test and consensus engine use identical spec files)")
|
|
rootCmd.AddCommand(cmdTestCons)
|
|
}
|
|
|
|
var cmdTestCore = &cobra.Command{
|
|
Use: "test_core",
|
|
Short: "Test server for testing core of turbo-geth or equivalent component",
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
ctx, _ := utils.RootContext()
|
|
|
|
if err := testCore(ctx); err != nil {
|
|
log.Error("Error", "err", err)
|
|
return err
|
|
}
|
|
return nil
|
|
},
|
|
}
|
|
|
|
var cmdTestCons = &cobra.Command{
|
|
Use: "test_cons",
|
|
Short: "Integration test for consensus engine",
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
ctx, _ := utils.RootContext()
|
|
if err := testCons(ctx); err != nil {
|
|
log.Error("Error", "err", err)
|
|
return err
|
|
}
|
|
return nil
|
|
},
|
|
}
|
|
|
|
func testCore(ctx context.Context) error {
|
|
if _, err := grpcTestDriverServer(ctx, testingAddr); err != nil {
|
|
return fmt.Errorf("start test driver gRPC server: %w", err)
|
|
}
|
|
if _, err := grpcTestSentryServer(ctx, testingAddr); err != nil {
|
|
return fmt.Errorf("start test sentry gRPC server: %w", err)
|
|
}
|
|
<-ctx.Done()
|
|
return nil
|
|
}
|
|
|
|
func grpcTestDriverServer(ctx context.Context, testingAddr string) (*TestDriverServerImpl, error) {
|
|
// STARTING GRPC SERVER
|
|
log.Info("Starting Test driver server", "on", testingAddr)
|
|
listenConfig := net.ListenConfig{
|
|
Control: func(network, address string, _ syscall.RawConn) error {
|
|
log.Info("Test driver received connection", "via", network, "from", address)
|
|
return nil
|
|
},
|
|
}
|
|
lis, err := listenConfig.Listen(ctx, "tcp", sentryAddr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not create Test driver listener: %w, addr=%s", err, testingAddr)
|
|
}
|
|
var (
|
|
streamInterceptors []grpc.StreamServerInterceptor
|
|
unaryInterceptors []grpc.UnaryServerInterceptor
|
|
)
|
|
if metrics.Enabled {
|
|
streamInterceptors = append(streamInterceptors, grpc_prometheus.StreamServerInterceptor)
|
|
unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
|
|
}
|
|
streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor())
|
|
unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor())
|
|
var grpcServer *grpc.Server
|
|
cpus := uint32(runtime.GOMAXPROCS(-1))
|
|
opts := []grpc.ServerOption{
|
|
grpc.NumStreamWorkers(cpus), // reduce amount of goroutines
|
|
grpc.WriteBufferSize(1024), // reduce buffers to save mem
|
|
grpc.ReadBufferSize(1024),
|
|
grpc.MaxConcurrentStreams(100), // to force clients reduce concurrency level
|
|
grpc.KeepaliveParams(keepalive.ServerParameters{
|
|
Time: 10 * time.Minute,
|
|
}),
|
|
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
|
|
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
|
|
}
|
|
grpcServer = grpc.NewServer(opts...)
|
|
|
|
testDriverServer := NewTestDriverServer(ctx)
|
|
proto_testing.RegisterTestDriverServer(grpcServer, testDriverServer)
|
|
if metrics.Enabled {
|
|
grpc_prometheus.Register(grpcServer)
|
|
}
|
|
go func() {
|
|
if err1 := grpcServer.Serve(lis); err1 != nil {
|
|
log.Error("Test driver server fail", "err", err1)
|
|
}
|
|
}()
|
|
return testDriverServer, nil
|
|
}
|
|
|
|
type TestDriverServerImpl struct {
|
|
proto_testing.UnimplementedTestDriverServer
|
|
}
|
|
|
|
func NewTestDriverServer(_ context.Context) *TestDriverServerImpl {
|
|
return &TestDriverServerImpl{}
|
|
}
|
|
|
|
func grpcTestSentryServer(ctx context.Context, sentryAddr string) (*TestSentryServerImpl, error) {
|
|
// STARTING GRPC SERVER
|
|
log.Info("Starting Test sentry server", "on", testingAddr)
|
|
listenConfig := net.ListenConfig{
|
|
Control: func(network, address string, _ syscall.RawConn) error {
|
|
log.Info("Test sentry received connection", "via", network, "from", address)
|
|
return nil
|
|
},
|
|
}
|
|
lis, err := listenConfig.Listen(ctx, "tcp", sentryAddr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not create Test sentry listener: %w, addr=%s", err, testingAddr)
|
|
}
|
|
var (
|
|
streamInterceptors []grpc.StreamServerInterceptor
|
|
unaryInterceptors []grpc.UnaryServerInterceptor
|
|
)
|
|
if metrics.Enabled {
|
|
streamInterceptors = append(streamInterceptors, grpc_prometheus.StreamServerInterceptor)
|
|
unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
|
|
}
|
|
streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor())
|
|
unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor())
|
|
var grpcServer *grpc.Server
|
|
cpus := uint32(runtime.GOMAXPROCS(-1))
|
|
opts := []grpc.ServerOption{
|
|
grpc.NumStreamWorkers(cpus), // reduce amount of goroutines
|
|
grpc.WriteBufferSize(1024), // reduce buffers to save mem
|
|
grpc.ReadBufferSize(1024),
|
|
grpc.MaxConcurrentStreams(100), // to force clients reduce concurrency level
|
|
grpc.KeepaliveParams(keepalive.ServerParameters{
|
|
Time: 10 * time.Minute,
|
|
}),
|
|
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
|
|
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
|
|
}
|
|
grpcServer = grpc.NewServer(opts...)
|
|
|
|
testSentryServer := NewTestSentryServer(ctx)
|
|
proto_sentry.RegisterSentryServer(grpcServer, testSentryServer)
|
|
if metrics.Enabled {
|
|
grpc_prometheus.Register(grpcServer)
|
|
}
|
|
go func() {
|
|
if err1 := grpcServer.Serve(lis); err1 != nil {
|
|
log.Error("Test driver server fail", "err", err1)
|
|
}
|
|
}()
|
|
return testSentryServer, nil
|
|
}
|
|
|
|
type TestSentryServerImpl struct {
|
|
proto_sentry.UnimplementedSentryServer
|
|
}
|
|
|
|
func NewTestSentryServer(_ context.Context) *TestSentryServerImpl {
|
|
return &TestSentryServerImpl{}
|
|
}
|
|
|
|
// testCons is the entry point of the consensus engine integration test.
// It is currently a stub: ctx is ignored and nil is always returned.
func testCons(ctx context.Context) error {
	_ = ctx // not consulted yet; the test body has not been written
	return nil
}
|