2021-04-29 15:14:10 +00:00
package commands
import (
"context"
"fmt"
"net"
"runtime"
"syscall"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
2021-07-01 21:31:14 +00:00
proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
proto_testing "github.com/ledgerwatch/erigon-lib/gointerfaces/testing"
2021-05-20 18:25:53 +00:00
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/log"
"github.com/ledgerwatch/erigon/metrics"
2021-04-29 15:14:10 +00:00
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
var (
testingAddr string // Address of the gRPC endpoint of the integration testing server
sentryAddr string // Address of the gRPC endpoint of the test sentry (mimicking sentry for the integration tests)
consAddr string // Address of the gRPC endpoint of consensus engine to test
consSpecFile string // Path to the specification file for the consensus engine (ideally, integration test and consensus engine use identical spec files)
)
func init ( ) {
cmdTestCore . Flags ( ) . StringVar ( & testingAddr , "testing.api.addr" , "localhost:9092" , "address of the gRPC endpoint of the integration testing server" )
cmdTestCore . Flags ( ) . StringVar ( & sentryAddr , "sentry.api.addr" , "localhost:9091" , "Address of the gRPC endpoint of the test sentry (mimicking sentry for the integration tests)" )
rootCmd . AddCommand ( cmdTestCore )
cmdTestCons . Flags ( ) . StringVar ( & consAddr , "cons.api.addr" , "locahost:9093" , "Address of the gRPC endpoint of the consensus engine to test" )
cmdTestCons . Flags ( ) . StringVar ( & consSpecFile , "cons.spec.file" , "" , "Specification file for the consensis engine (ideally, integration test and consensus engine use identical spec files)" )
rootCmd . AddCommand ( cmdTestCons )
}
var cmdTestCore = & cobra . Command {
Use : "test_core" ,
2021-05-26 10:35:39 +00:00
Short : "Test server for testing core of Erigon or equivalent component" ,
2021-04-29 15:14:10 +00:00
RunE : func ( cmd * cobra . Command , args [ ] string ) error {
ctx , _ := utils . RootContext ( )
if err := testCore ( ctx ) ; err != nil {
log . Error ( "Error" , "err" , err )
return err
}
return nil
} ,
}
var cmdTestCons = & cobra . Command {
Use : "test_cons" ,
Short : "Integration test for consensus engine" ,
RunE : func ( cmd * cobra . Command , args [ ] string ) error {
ctx , _ := utils . RootContext ( )
if err := testCons ( ctx ) ; err != nil {
log . Error ( "Error" , "err" , err )
return err
}
return nil
} ,
}
func testCore ( ctx context . Context ) error {
if _ , err := grpcTestDriverServer ( ctx , testingAddr ) ; err != nil {
return fmt . Errorf ( "start test driver gRPC server: %w" , err )
}
if _ , err := grpcTestSentryServer ( ctx , testingAddr ) ; err != nil {
return fmt . Errorf ( "start test sentry gRPC server: %w" , err )
}
<- ctx . Done ( )
return nil
}
func grpcTestDriverServer ( ctx context . Context , testingAddr string ) ( * TestDriverServerImpl , error ) {
// STARTING GRPC SERVER
log . Info ( "Starting Test driver server" , "on" , testingAddr )
listenConfig := net . ListenConfig {
Control : func ( network , address string , _ syscall . RawConn ) error {
log . Info ( "Test driver received connection" , "via" , network , "from" , address )
return nil
} ,
}
lis , err := listenConfig . Listen ( ctx , "tcp" , sentryAddr )
if err != nil {
return nil , fmt . Errorf ( "could not create Test driver listener: %w, addr=%s" , err , testingAddr )
}
var (
streamInterceptors [ ] grpc . StreamServerInterceptor
unaryInterceptors [ ] grpc . UnaryServerInterceptor
)
if metrics . Enabled {
streamInterceptors = append ( streamInterceptors , grpc_prometheus . StreamServerInterceptor )
unaryInterceptors = append ( unaryInterceptors , grpc_prometheus . UnaryServerInterceptor )
}
streamInterceptors = append ( streamInterceptors , grpc_recovery . StreamServerInterceptor ( ) )
unaryInterceptors = append ( unaryInterceptors , grpc_recovery . UnaryServerInterceptor ( ) )
var grpcServer * grpc . Server
cpus := uint32 ( runtime . GOMAXPROCS ( - 1 ) )
opts := [ ] grpc . ServerOption {
grpc . NumStreamWorkers ( cpus ) , // reduce amount of goroutines
grpc . WriteBufferSize ( 1024 ) , // reduce buffers to save mem
grpc . ReadBufferSize ( 1024 ) ,
grpc . MaxConcurrentStreams ( 100 ) , // to force clients reduce concurrency level
grpc . KeepaliveParams ( keepalive . ServerParameters {
Time : 10 * time . Minute ,
} ) ,
grpc . StreamInterceptor ( grpc_middleware . ChainStreamServer ( streamInterceptors ... ) ) ,
grpc . UnaryInterceptor ( grpc_middleware . ChainUnaryServer ( unaryInterceptors ... ) ) ,
}
grpcServer = grpc . NewServer ( opts ... )
testDriverServer := NewTestDriverServer ( ctx )
proto_testing . RegisterTestDriverServer ( grpcServer , testDriverServer )
if metrics . Enabled {
grpc_prometheus . Register ( grpcServer )
}
go func ( ) {
if err1 := grpcServer . Serve ( lis ) ; err1 != nil {
log . Error ( "Test driver server fail" , "err" , err1 )
}
} ( )
return testDriverServer , nil
}
type TestDriverServerImpl struct {
proto_testing . UnimplementedTestDriverServer
}
func NewTestDriverServer ( _ context . Context ) * TestDriverServerImpl {
return & TestDriverServerImpl { }
}
func grpcTestSentryServer ( ctx context . Context , sentryAddr string ) ( * TestSentryServerImpl , error ) {
// STARTING GRPC SERVER
log . Info ( "Starting Test sentry server" , "on" , testingAddr )
listenConfig := net . ListenConfig {
Control : func ( network , address string , _ syscall . RawConn ) error {
log . Info ( "Test sentry received connection" , "via" , network , "from" , address )
return nil
} ,
}
lis , err := listenConfig . Listen ( ctx , "tcp" , sentryAddr )
if err != nil {
return nil , fmt . Errorf ( "could not create Test sentry listener: %w, addr=%s" , err , testingAddr )
}
var (
streamInterceptors [ ] grpc . StreamServerInterceptor
unaryInterceptors [ ] grpc . UnaryServerInterceptor
)
if metrics . Enabled {
streamInterceptors = append ( streamInterceptors , grpc_prometheus . StreamServerInterceptor )
unaryInterceptors = append ( unaryInterceptors , grpc_prometheus . UnaryServerInterceptor )
}
streamInterceptors = append ( streamInterceptors , grpc_recovery . StreamServerInterceptor ( ) )
unaryInterceptors = append ( unaryInterceptors , grpc_recovery . UnaryServerInterceptor ( ) )
var grpcServer * grpc . Server
cpus := uint32 ( runtime . GOMAXPROCS ( - 1 ) )
opts := [ ] grpc . ServerOption {
grpc . NumStreamWorkers ( cpus ) , // reduce amount of goroutines
grpc . WriteBufferSize ( 1024 ) , // reduce buffers to save mem
grpc . ReadBufferSize ( 1024 ) ,
grpc . MaxConcurrentStreams ( 100 ) , // to force clients reduce concurrency level
grpc . KeepaliveParams ( keepalive . ServerParameters {
Time : 10 * time . Minute ,
} ) ,
grpc . StreamInterceptor ( grpc_middleware . ChainStreamServer ( streamInterceptors ... ) ) ,
grpc . UnaryInterceptor ( grpc_middleware . ChainUnaryServer ( unaryInterceptors ... ) ) ,
}
grpcServer = grpc . NewServer ( opts ... )
testSentryServer := NewTestSentryServer ( ctx )
proto_sentry . RegisterSentryServer ( grpcServer , testSentryServer )
if metrics . Enabled {
grpc_prometheus . Register ( grpcServer )
}
go func ( ) {
if err1 := grpcServer . Serve ( lis ) ; err1 != nil {
log . Error ( "Test driver server fail" , "err" , err1 )
}
} ( )
return testSentryServer , nil
}
type TestSentryServerImpl struct {
proto_sentry . UnimplementedSentryServer
}
func NewTestSentryServer ( _ context . Context ) * TestSentryServerImpl {
return & TestSentryServerImpl { }
}
func testCons ( ctx context . Context ) error {
return nil
}