erigon-pulse/turbo/app/support_cmd.go
Dmytro a6b5297b3e
dvovk/tunnelwws (#8745)
- changed communication tunnel to web socket in order to connect to
remote nodes
- changed diagnostics.url flag to diagnostics.addr as now user need to
enter only address and support command will connect to it through
websocket
- changed flag debug.urls to debug.addrs in order to have ability to
change connection type between erigon and support to websocket and don't
change user API
- added auto trying to connect to connect to ws if connection with was
failed
2023-11-16 16:37:29 +00:00

423 lines
11 KiB
Go

package app
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
"github.com/gorilla/websocket"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
)
const (
wsReadBuffer = 1024
wsWriteBuffer = 1024
wsPingInterval = 60 * time.Second
wsPingWriteTimeout = 5 * time.Second
wsMessageSizeLimit = 32 * 1024 * 1024
)
var wsBufferPool = new(sync.Pool)
var (
diagnosticsURLFlag = cli.StringFlag{
Name: "diagnostics.addr",
Usage: "Address of the diagnostics system provided by the support team, include unique session PIN",
}
debugURLsFlag = cli.StringSliceFlag{
Name: "debug.addrs",
Usage: "Comma separated list of URLs to the debug endpoints thats are being diagnosed",
}
insecureFlag = cli.BoolFlag{
Name: "insecure",
Usage: "Allows communication with diagnostics system using self-signed TLS certificates",
}
sessionsFlag = cli.StringSliceFlag{
Name: "diagnostics.sessions",
Usage: "Comma separated list of support session ids to connect to",
}
)
var supportCommand = cli.Command{
Action: MigrateFlags(connectDiagnostics),
Name: "support",
Usage: "Connect Erigon instance to a diagnostics system for support",
ArgsUsage: "--diagnostics.addr <URL for the diagnostics system> --ids <diagnostic session ids allowed to connect> --metrics.urls <http://erigon_host:metrics_port>",
Flags: []cli.Flag{
&debugURLsFlag,
&diagnosticsURLFlag,
&sessionsFlag,
&insecureFlag,
},
//Category: "SUPPORT COMMANDS",
Description: `The support command connects a running Erigon instances to a diagnostics system specified by the URL.`,
}
const Version = 1
func connectDiagnostics(cliCtx *cli.Context) error {
return ConnectDiagnostics(cliCtx, log.Root())
}
func ConnectDiagnostics(cliCtx *cli.Context, logger log.Logger) error {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
debugURLs := []string{}
for _, debugURL := range cliCtx.StringSlice(debugURLsFlag.Name) {
debugURLs = append(debugURLs, "http://"+debugURL)
}
diagnosticsUrl := cliCtx.String(diagnosticsURLFlag.Name) + "/bridge"
// Create TLS configuration with the certificate of the server
insecure := cliCtx.Bool(insecureFlag.Name)
tlsConfig := &tls.Config{
InsecureSkipVerify: insecure, //nolint:gosec
}
sessionIds := cliCtx.StringSlice(sessionsFlag.Name)
// Perform the requests in a loop (reconnect)
for {
if err := tunnel(ctx, cancel, sigs, tlsConfig, diagnosticsUrl, sessionIds, debugURLs, logger); err != nil {
return err
}
select {
case <-ctx.Done():
// Quit immediately if the context was cancelled (by Ctrl-C or TERM signal)
return nil
default:
}
logger.Info("Reconnecting in 1 second...")
timer := time.NewTimer(1 * time.Second)
<-timer.C
}
}
type conn struct {
io.ReadCloser
*io.PipeWriter
}
func (c *conn) Close() error {
c.ReadCloser.Close()
c.PipeWriter.Close()
return nil
}
func (c *conn) SetWriteDeadline(time time.Time) error {
return nil
}
// tunnel operates the tunnel from diagnostics system to the metrics URL for one http/2 request
// needs to be called repeatedly to implement re-connect logic
// tunnel operates the tunnel from diagnostics system to the metrics URL for one http/2 request
// needs to be called repeatedly to implement re-connect logic
func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, tlsConfig *tls.Config, diagnosticsUrl string, sessionIds []string, debugURLs []string, logger log.Logger) error {
metricsClient := &http.Client{}
defer metricsClient.CloseIdleConnections()
ctx1, cancel1 := context.WithCancel(ctx)
defer cancel1()
go func() {
select {
case <-sigs:
cancel()
case <-ctx1.Done():
}
}()
type enode struct {
Enode string `json:"enode,omitempty"`
Enr string `json:"enr,omitempty"`
Ports *types.NodeInfoPorts `json:"ports,omitempty"`
ListenerAddr string `json:"listener_addr,omitempty"`
}
type info struct {
Id string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Protocols json.RawMessage `json:"protocols,omitempty"`
Enodes []enode `json:"enodes,omitempty"`
}
type node struct {
debugURL string
info *info
}
nodes := map[string]*node{}
for _, debugURL := range debugURLs {
debugResponse, err := metricsClient.Get(debugURL + "/debug/nodeinfo")
if err != nil {
return err
}
if debugResponse.StatusCode != http.StatusOK {
return fmt.Errorf("debug request to %s failed: %s", debugURL, debugResponse.Status)
}
var reply remote.NodesInfoReply
err = json.NewDecoder(debugResponse.Body).Decode(&reply)
debugResponse.Body.Close()
if err != nil {
return err
}
for _, ni := range reply.NodesInfo {
if n, ok := nodes[ni.Id]; ok {
n.info.Enodes = append(n.info.Enodes, enode{
Enode: ni.Enode,
Enr: ni.Enr,
Ports: ni.Ports,
ListenerAddr: ni.ListenerAddr,
})
} else {
nodes[ni.Id] = &node{debugURL, &info{
Id: ni.Id,
Name: ni.Name,
Protocols: ni.Protocols,
Enodes: []enode{{
Enode: ni.Enode,
Enr: ni.Enr,
Ports: ni.Ports,
ListenerAddr: ni.ListenerAddr,
}}}}
}
}
}
dialer := websocket.Dialer{
ReadBufferSize: wsReadBuffer,
WriteBufferSize: wsWriteBuffer,
WriteBufferPool: wsBufferPool,
}
conn, resp, err := dialer.DialContext(ctx1, "wss://"+diagnosticsUrl, nil)
if err != nil {
conn, resp, err = dialer.DialContext(ctx1, "ws://"+diagnosticsUrl, nil)
if err != nil {
return err
}
}
if resp.StatusCode != http.StatusSwitchingProtocols {
return fmt.Errorf("support request to %s failed: %s", diagnosticsUrl, resp.Status)
}
type connectionInfo struct {
Version uint64 `json:"version"`
Sessions []string `json:"sessions"`
Nodes []*info `json:"nodes"`
}
codec := rpc.NewWebsocketCodec(conn)
defer codec.Close()
err = codec.WriteJSON(ctx1, &connectionInfo{
Version: Version,
Sessions: sessionIds,
Nodes: func() (replies []*info) {
for _, node := range nodes {
replies = append(replies, node.info)
}
return replies
}(),
})
if err != nil {
return err
}
logger.Info("Connected")
for {
requests, _, err := codec.ReadBatch()
select {
case <-ctx.Done():
return nil
default:
if err != nil {
logger.Info("Breaking connection", "err", err)
return nil
}
}
var requestId string
if err = json.Unmarshal(requests[0].ID, &requestId); err != nil {
logger.Error("Invalid request id", "err", err)
continue
}
nodeRequest := struct {
NodeId string `json:"nodeId"`
QueryParams url.Values `json:"queryParams"`
}{}
if err = json.Unmarshal(requests[0].Params, &nodeRequest); err != nil {
logger.Error("Invalid node request", "err", err, "id", requestId)
continue
}
type responseError struct {
Code int64 `json:"code"`
Message string `json:"message"`
Data *json.RawMessage `json:"data,omitempty"`
}
type nodeResponse struct {
Id string `json:"id"`
Result json.RawMessage `json:"result,omitempty"`
Error *responseError `json:"error,omitempty"`
Last bool `json:"last,omitempty"`
}
if node, ok := nodes[nodeRequest.NodeId]; ok {
err := func() error {
var queryString string
if len(nodeRequest.QueryParams) > 0 {
queryString = "?" + nodeRequest.QueryParams.Encode()
}
debugURL := node.debugURL + "/debug/" + requests[0].Method + queryString
debugResponse, err := metricsClient.Get(debugURL)
if err != nil {
return codec.WriteJSON(ctx1, &nodeResponse{
Id: requestId,
Error: &responseError{
Code: http.StatusFailedDependency,
Message: fmt.Sprintf("Request for metrics method [%s] failed: %v", debugURL, err),
},
Last: true,
})
}
defer debugResponse.Body.Close()
//Websocket ok message
if resp.StatusCode != http.StatusSwitchingProtocols {
body, _ := io.ReadAll(debugResponse.Body)
return codec.WriteJSON(ctx1, &nodeResponse{
Id: requestId,
Error: &responseError{
Code: int64(resp.StatusCode),
Message: fmt.Sprintf("Request for metrics method [%s] failed: %s", debugURL, string(body)),
},
Last: true,
})
}
buffer := &bytes.Buffer{}
switch debugResponse.Header.Get("Content-Type") {
case "application/json":
if _, err := io.Copy(buffer, debugResponse.Body); err != nil {
return codec.WriteJSON(ctx1, &nodeResponse{
Id: requestId,
Error: &responseError{
Code: http.StatusInternalServerError,
Message: fmt.Sprintf("Request for metrics method [%s] failed: %v", debugURL, err),
},
Last: true,
})
}
case "application/octet-stream":
if _, err := io.Copy(buffer, debugResponse.Body); err != nil {
return codec.WriteJSON(ctx1, &nodeResponse{
Id: requestId,
Error: &responseError{
Code: int64(http.StatusInternalServerError),
Message: fmt.Sprintf("Can't copy metrics response for [%s]: %s", debugURL, err),
},
Last: true,
})
}
offset, _ := strconv.ParseInt(debugResponse.Header.Get("X-Offset"), 10, 64)
size, _ := strconv.ParseInt(debugResponse.Header.Get("X-Size"), 10, 64)
data, err := json.Marshal(struct {
Offset int64 `json:"offset"`
Size int64 `json:"size"`
Data []byte `json:"chunk"`
}{
Offset: offset,
Size: size,
Data: buffer.Bytes(),
})
buffer = bytes.NewBuffer(data)
if err != nil {
return codec.WriteJSON(ctx1, &nodeResponse{
Id: requestId,
Error: &responseError{
Code: int64(http.StatusInternalServerError),
Message: fmt.Sprintf("Can't copy metrics response for [%s]: %s", debugURL, err),
},
Last: true,
})
}
default:
return codec.WriteJSON(ctx1, &nodeResponse{
Id: requestId,
Error: &responseError{
Code: int64(http.StatusInternalServerError),
Message: fmt.Sprintf("Unhandled content type: %s, from: %s", debugResponse.Header.Get("Content-Type"), debugURL),
},
Last: true,
})
}
return codec.WriteJSON(ctx1, &nodeResponse{
Id: requestId,
Result: json.RawMessage(buffer.Bytes()),
Last: true,
})
}()
if err != nil {
return err
}
}
}
}