mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-10 04:51:20 +00:00
d89e5d2603
* ISSUE-2710: Add standard grpc health check to services with grpc server * Go import changed files * Add flags for healthcheck * Add grpc healthcheck option to rpcdaemon * Remove grpc port info if grpc is not enabled * Resolve merge issues
188 lines
5.6 KiB
Go
188 lines
5.6 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"path"
|
|
"time"
|
|
|
|
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
|
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
|
|
proto_snap "github.com/ledgerwatch/erigon-lib/gointerfaces/snapshotsync"
|
|
"github.com/ledgerwatch/erigon/cmd/utils"
|
|
"github.com/ledgerwatch/erigon/common/paths"
|
|
"github.com/ledgerwatch/erigon/internal/debug"
|
|
"github.com/ledgerwatch/erigon/params"
|
|
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
|
|
"github.com/ledgerwatch/log/v3"
|
|
"github.com/spf13/cobra"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/health"
|
|
"google.golang.org/grpc/health/grpc_health_v1"
|
|
"google.golang.org/grpc/keepalive"
|
|
"google.golang.org/grpc/reflection"
|
|
)
|
|
|
|
var (
|
|
datadir string
|
|
seeding bool
|
|
downloaderApiAddr string
|
|
healthCheck bool
|
|
)
|
|
|
|
func init() {
|
|
flags := append(debug.Flags, utils.MetricFlags...)
|
|
utils.CobraFlags(rootCmd, flags)
|
|
|
|
rootCmd.Flags().StringVar(&datadir, utils.DataDirFlag.Name, paths.DefaultDataDir(), utils.DataDirFlag.Usage)
|
|
if err := rootCmd.MarkFlagDirname(utils.DataDirFlag.Name); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
rootCmd.PersistentFlags().BoolVar(&seeding, "seeding", true, "Seed snapshots")
|
|
rootCmd.Flags().StringVar(&downloaderApiAddr, "downloader.api.addr", "127.0.0.1:9093", "external downloader api network address, for example: 127.0.0.1:9093 serves remote downloader interface")
|
|
rootCmd.Flags().BoolVar(&healthCheck, "healthcheck", false, "Enable grpc health check")
|
|
}
|
|
|
|
func main() {
|
|
ctx, cancel := utils.RootContext()
|
|
defer cancel()
|
|
|
|
if err := rootCmd.ExecuteContext(ctx); err != nil {
|
|
fmt.Println(err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
var rootCmd = &cobra.Command{
|
|
Use: "",
|
|
Short: "snapshot downloader",
|
|
Example: "go run ./cmd/snapshots --datadir <your_datadir> --downloader.api.addr 127.0.0.1:9093",
|
|
PersistentPreRun: func(cmd *cobra.Command, args []string) {
|
|
if err := debug.SetupCobra(cmd); err != nil {
|
|
panic(err)
|
|
}
|
|
},
|
|
PersistentPostRun: func(cmd *cobra.Command, args []string) {
|
|
debug.Exit()
|
|
},
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
ctx := cmd.Context()
|
|
snapshotsDir := path.Join(datadir, "snapshots")
|
|
log.Info("Run snapshot downloader", "addr", downloaderApiAddr, "datadir", datadir, "seeding", seeding)
|
|
|
|
bittorrentServer, err := snapshotsync.NewServer(snapshotsDir, seeding)
|
|
if err != nil {
|
|
return fmt.Errorf("new server: %w", err)
|
|
}
|
|
log.Info("Load")
|
|
err = bittorrentServer.Load()
|
|
if err != nil {
|
|
return fmt.Errorf("load: %w", err)
|
|
}
|
|
|
|
go func() {
|
|
_, err := bittorrentServer.Download(ctx, &proto_snap.DownloadSnapshotRequest{
|
|
NetworkId: params.MainnetChainConfig.ChainID.Uint64(),
|
|
Type: snapshotsync.GetAvailableSnapshotTypes(params.MainnetChainConfig.ChainID.Uint64()),
|
|
})
|
|
if err != nil {
|
|
log.Error("Download failed", "err", err, "networkID", params.MainnetChainConfig.ChainID.Uint64())
|
|
}
|
|
}()
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-cmd.Context().Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
snapshots, err := bittorrentServer.Snapshots(ctx, &proto_snap.SnapshotsRequest{
|
|
NetworkId: params.MainnetChainConfig.ChainID.Uint64(),
|
|
})
|
|
if err != nil {
|
|
log.Error("get snapshots", "err", err)
|
|
time.Sleep(time.Minute)
|
|
continue
|
|
}
|
|
stats := bittorrentServer.Stats(context.Background())
|
|
for _, v := range snapshots.Info {
|
|
log.Info("Snapshot "+v.Type.String(), "%", v.Readiness, "peers", stats[v.Type.String()].ConnectedSeeders)
|
|
}
|
|
time.Sleep(time.Minute)
|
|
}
|
|
}()
|
|
grpcServer, err := StartGrpc(bittorrentServer, downloaderApiAddr, nil, healthCheck)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
<-cmd.Context().Done()
|
|
grpcServer.GracefulStop()
|
|
|
|
return nil
|
|
},
|
|
}
|
|
|
|
func StartGrpc(snServer *snapshotsync.SNDownloaderServer, addr string, creds *credentials.TransportCredentials, healthCheck bool) (*grpc.Server, error) {
|
|
lis, err := net.Listen("tcp", addr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not create listener: %w, addr=%s", err, addr)
|
|
}
|
|
|
|
var (
|
|
streamInterceptors []grpc.StreamServerInterceptor
|
|
unaryInterceptors []grpc.UnaryServerInterceptor
|
|
)
|
|
streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor())
|
|
unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor())
|
|
|
|
//if metrics.Enabled {
|
|
// streamInterceptors = append(streamInterceptors, grpc_prometheus.StreamServerInterceptor)
|
|
// unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
|
|
//}
|
|
|
|
opts := []grpc.ServerOption{
|
|
// https://github.com/grpc/grpc-go/issues/3171#issuecomment-552796779
|
|
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
|
|
MinTime: 10 * time.Second,
|
|
PermitWithoutStream: true,
|
|
}),
|
|
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
|
|
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
|
|
}
|
|
if creds == nil {
|
|
// no specific opts
|
|
} else {
|
|
opts = append(opts, grpc.Creds(*creds))
|
|
}
|
|
grpcServer := grpc.NewServer(opts...)
|
|
reflection.Register(grpcServer) // Register reflection service on gRPC server.
|
|
if snServer != nil {
|
|
proto_snap.RegisterDownloaderServer(grpcServer, snServer)
|
|
}
|
|
var healthServer *health.Server
|
|
if healthCheck {
|
|
healthServer = health.NewServer()
|
|
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
|
|
}
|
|
|
|
//if metrics.Enabled {
|
|
// grpc_prometheus.Register(grpcServer)
|
|
//}
|
|
|
|
go func() {
|
|
if healthCheck {
|
|
defer healthServer.Shutdown()
|
|
}
|
|
if err := grpcServer.Serve(lis); err != nil {
|
|
log.Error("gRPC server stop", "err", err)
|
|
}
|
|
}()
|
|
log.Info("Started gRPC server", "on", addr)
|
|
return grpcServer, nil
|
|
}
|