erigon-pulse/turbo/app/support.go
ledgerwatch 3d904d509e
[Diagnostics] expose command line args via metrics (#7271)
Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
2023-04-06 15:34:06 +00:00

206 lines
5.9 KiB
Go

package app
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
"golang.org/x/net/http2"
)
// Command-line flags accepted by the "support" command.
var (
	// diagnosticsURLFlag selects the diagnostics system endpoint that the
	// tunnel connects to.
	diagnosticsURLFlag = cli.StringFlag{
		Name:  "diagnostics.url",
		Usage: "URL of the diagnostics system provided by the support team, include unique session PIN",
	}

	// metricsURLsFlag lists the metrics endpoints to expose. Only the first
	// entry is currently consumed by connectDiagnostics.
	metricsURLsFlag = cli.StringSliceFlag{
		Name:  "metrics.urls",
		Usage: "Comma separated list of URLs to the metrics endpoints thats are being diagnosed",
	}

	// insecureFlag is copied into tls.Config.InsecureSkipVerify, i.e. it turns
	// off verification of the diagnostics system's TLS certificate.
	insecureFlag = cli.BoolFlag{
		Name:  "insecure",
		Usage: "Allows communication with diagnostics system using self-signed TLS certificates",
	}
)
// supportCommand describes the "support" CLI subcommand. Its action is
// connectDiagnostics (wrapped by MigrateFlags), and it accepts the metrics
// URLs, diagnostics URL and insecure flags.
var supportCommand = cli.Command{
	Action: MigrateFlags(connectDiagnostics),
	Name:   "support",
	Usage:  "Connect Erigon instance to a diagnostics system for support",
	// The flag names shown here must match the Name fields of the flags
	// registered below (the metrics flag is "metrics.urls", plural).
	ArgsUsage: "--diagnostics.url <URL for the diagnostics system> --metrics.urls <http://erigon_host:metrics_port>",
	Flags: []cli.Flag{
		&metricsURLsFlag,
		&diagnosticsURLFlag,
		&insecureFlag,
	},
	Category: "SUPPORT COMMANDS",
	Description: `
The support command connects a running Erigon instances to a diagnostics system specified
by the URL.`,
}
// Conn groups the pieces of a bidirectional connection: a reader for inbound
// data, a write-closer for outbound data, a cancel function and one mutex per
// direction.
//
// NOTE(review): an earlier comment described Conn as implementing
// io.Reader/io.Writer/io.Closer and offering Send/Recv helpers; no methods are
// declared in this file, so confirm where (or whether) they exist.
//
// Conn contains sync.Mutex fields and therefore must not be copied after first
// use; pass it by pointer.
type Conn struct {
	// r is the inbound half of the connection.
	r io.Reader
	// wc is the outbound half of the connection.
	wc io.WriteCloser
	// cancel is presumably the cancel function of the connection's context —
	// TODO confirm against the code that populates it.
	cancel context.CancelFunc
	// wLock and rLock presumably serialise access to wc and r respectively —
	// TODO confirm against the methods that use them.
	wLock, rLock sync.Mutex
}
// connectDiagnostics is the action of the "support" command. It builds a TLS
// configuration from the certificate files "diagnostics.crt" and "CA-cert.pem"
// (paths relative to the working directory) and then keeps a tunnel open
// between the diagnostics system and the first configured metrics endpoint,
// re-establishing it one second after every orderly tunnel shutdown.
//
// It returns nil when interrupted by SIGINT/SIGTERM while waiting to
// reconnect, and a non-nil error when the flags are incomplete, a certificate
// file cannot be read, or tunnel itself reports an error.
func connectDiagnostics(cliCtx *cli.Context) error {
	const (
		serverCertFile = "diagnostics.crt"
		caCertFile     = "CA-cert.pem"
	)

	ctx, cancel := context.WithCancel(context.Background())
	// Release the context (and with it the signal goroutine below) on every
	// return path, not only when a signal arrives.
	defer cancel()

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigs)
	go func() {
		select {
		case <-sigs:
			cancel()
		case <-ctx.Done():
			// Function returned for another reason; nothing left to do.
		}
	}()

	metricsURLs := cliCtx.StringSlice(metricsURLsFlag.Name)
	if len(metricsURLs) == 0 {
		// Without this guard the index expression below panics.
		return fmt.Errorf("no metrics endpoint given: --%s is required", metricsURLsFlag.Name)
	}
	metricsURL := metricsURLs[0] // TODO: Generalise
	diagnosticsURL := cliCtx.String(diagnosticsURLFlag.Name)

	// Create a pool with the server certificate since it is not signed
	// by a known CA
	srvCert, err := ioutil.ReadFile(serverCertFile)
	if err != nil {
		return fmt.Errorf("reading server certificate: %w", err)
	}
	caCert, err := ioutil.ReadFile(caCertFile)
	if err != nil {
		return fmt.Errorf("reading CA certificate: %w", err)
	}
	certPool := x509.NewCertPool()
	// AppendCertsFromPEM reports false when no certificate could be parsed.
	// Only warn: with the insecure flag set the pool is not needed to connect.
	if !certPool.AppendCertsFromPEM(srvCert) {
		log.Warn("No certificate could be parsed", "file", serverCertFile)
	}
	if !certPool.AppendCertsFromPEM(caCert) {
		log.Warn("No certificate could be parsed", "file", caCertFile)
	}

	// Create TLS configuration with the certificate of the server
	tlsConfig := &tls.Config{
		RootCAs:            certPool,
		InsecureSkipVerify: cliCtx.Bool(insecureFlag.Name), //nolint:gosec
	}

	// Perform the requests in a loop (reconnect)
	for {
		if err := tunnel(ctx, tlsConfig, diagnosticsURL, metricsURL); err != nil {
			return err
		}
		if ctx.Err() != nil {
			// Cancelled while the tunnel was running: do not announce a reconnect.
			return nil
		}
		log.Info("Reconnecting in 1 second...")
		timer := time.NewTimer(1 * time.Second)
		select {
		case <-timer.C:
		case <-ctx.Done():
			// Quit immediately if the context was cancelled (by Ctrl-C or TERM signal)
			timer.Stop()
			return nil
		}
	}
}
// tunnel operates the tunnel from the diagnostics system to the metrics URL
// for the lifetime of one HTTP/2 request. It needs to be called repeatedly to
// implement re-connect logic.
//
// Protocol, as implemented here: a POST is sent to diagnosticsUrl with a
// streaming request body. The response must start with the line "SUCCESS\n".
// After that every newline-terminated line of the response is one request;
// the line (without the newline) is appended to metricsURL, fetched with GET,
// and the body is written back on the request stream, prefixed with its
// length as a 4-byte big-endian integer.
//
// Parameters:
//   - ctx: cancelling it aborts the diagnostics request and any metrics fetch.
//   - tlsConfig: TLS settings for the connection to the diagnostics system.
//   - diagnosticsUrl: URL of the diagnostics system.
//   - metricsURL: prefix that received request lines are appended to.
//
// An error is returned only when the connection cannot be established or the
// handshake line is wrong. Failures after that point are logged and nil is
// returned, so that the caller reconnects.
func tunnel(ctx context.Context, tlsConfig *tls.Config, diagnosticsUrl string, metricsURL string) error {
	// maxRequestLen is the longest accepted request line, including the
	// trailing '\n'. Longer lines trip the circuit breaker below.
	const maxRequestLen = 4096

	diagnosticsClient := &http.Client{Transport: &http2.Transport{TLSClientConfig: tlsConfig}}
	defer diagnosticsClient.CloseIdleConnections()
	metricsClient := &http.Client{}
	defer metricsClient.CloseIdleConnections()

	// The read side of the pipe is the request body; everything written to
	// writer is streamed to the diagnostics system.
	reader, writer := io.Pipe()
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, diagnosticsUrl, reader)
	if err != nil {
		return err
	}
	resp, err := diagnosticsClient.Do(req)
	if err != nil {
		return err
	}

	// Connection-scoped context, cancelled as soon as this function returns.
	connCtx, cancelConn := context.WithCancel(ctx)
	defer cancelConn()
	defer resp.Body.Close()
	defer writer.Close()
	// Apply the connection context on the request context
	resp.Request = req.WithContext(connCtx)

	r := bufio.NewReaderSize(resp.Body, maxRequestLen)
	firstLine, err := r.ReadBytes('\n')
	if err != nil {
		return err
	}
	if string(firstLine) != "SUCCESS\n" {
		return fmt.Errorf("connecting to diagnostics system: %s", firstLine)
	}
	log.Info("Connected")

	var (
		metricsBuf bytes.Buffer
		sizeBuf    [4]byte
	)
	for {
		// ReadSlice returns exactly one line, so several requests arriving in
		// a single network read are not merged. It fails with ErrBufferFull
		// when a line does not fit into the maxRequestLen-sized buffer.
		line, err := r.ReadSlice('\n')
		if err == bufio.ErrBufferFull {
			log.Error("Request too long, circuit breaker")
			return nil
		}
		if err != nil {
			log.Error("Connection read", "err", err)
			return nil
		}
		// Copy out of the reader's buffer; line is only valid until the next read.
		query := string(line[:len(line)-1])
		fmt.Printf("Got request: %s\n", query)

		// Bind the metrics fetch to the connection context so that a
		// cancellation also interrupts a slow metrics endpoint.
		var metricsResponse *http.Response
		metricsReq, err := http.NewRequestWithContext(connCtx, http.MethodGet, metricsURL+query, nil)
		if err == nil {
			metricsResponse, err = metricsClient.Do(metricsReq)
		}
		if err != nil {
			log.Error("Problem requesting metrics", "url", metricsURL, "query", query, "err", err)
			return nil
		}

		// Buffer the metrics response, and relay it back to the diagnostics system, prepending with the size
		metricsBuf.Reset()
		_, err = io.Copy(&metricsBuf, metricsResponse.Body)
		// The body has been consumed (or has failed); a Close error carries no
		// additional information here.
		metricsResponse.Body.Close()
		if err != nil {
			log.Error("Problem extracting metrics", "url", metricsURL, "query", query, "err", err)
			return nil
		}
		fmt.Printf("Got response:\n%s\n", metricsBuf.Bytes())

		binary.BigEndian.PutUint32(sizeBuf[:], uint32(metricsBuf.Len()))
		if _, err := writer.Write(sizeBuf[:]); err != nil {
			log.Error("Problem relaying metrics prefix len", "url", metricsURL, "query", query, "err", err)
			return nil
		}
		if _, err := writer.Write(metricsBuf.Bytes()); err != nil {
			log.Error("Problem relaying", "url", metricsURL, "query", query, "err", err)
			return nil
		}
	}
}