mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
[Diagnostics] Re-enable log support, more reliable reconnection (#7286)
Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
parent
fc7f5e1693
commit
db52bbafb3
@ -17,6 +17,12 @@ import (
|
||||
|
||||
func SetupLogsAccess(ctx *cli.Context) {
|
||||
dirPath := ctx.String(logging.LogDirPathFlag.Name)
|
||||
if dirPath == "" {
|
||||
datadir := ctx.String("datadir")
|
||||
if datadir != "" {
|
||||
dirPath = filepath.Join(datadir, "logs")
|
||||
}
|
||||
}
|
||||
if dirPath == "" {
|
||||
return
|
||||
}
|
||||
|
@ -118,6 +118,8 @@ func connectDiagnostics(cliCtx *cli.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
var successLine = []byte("SUCCESS")
|
||||
|
||||
// tunnel operates the tunnel from diagnostics system to the metrics URL for one http/2 request
|
||||
// needs to be called repeatedly to implement re-connect logic
|
||||
func tunnel(ctx context.Context, tlsConfig *tls.Config, diagnosticsUrl string, metricsURL string) error {
|
||||
@ -127,79 +129,67 @@ func tunnel(ctx context.Context, tlsConfig *tls.Config, diagnosticsUrl string, m
|
||||
defer metricsClient.CloseIdleConnections()
|
||||
// Create a request object to send to the server
|
||||
reader, writer := io.Pipe()
|
||||
req, err := http.NewRequest(http.MethodPost, diagnosticsUrl, reader)
|
||||
ctx1, cancel1 := context.WithCancel(ctx)
|
||||
defer cancel1()
|
||||
req, err := http.NewRequestWithContext(ctx1, http.MethodPost, diagnosticsUrl, reader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Apply custom headers
|
||||
|
||||
// Create a connection
|
||||
// Apply given context to the sent request
|
||||
req = req.WithContext(ctx)
|
||||
resp, err := diagnosticsClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Create a connection
|
||||
ctx1, cancel1 := context.WithCancel(ctx)
|
||||
defer cancel1()
|
||||
defer resp.Body.Close()
|
||||
defer writer.Close()
|
||||
|
||||
// Apply the connection context on the request context
|
||||
resp.Request = req.WithContext(ctx1)
|
||||
var metricsBuf bytes.Buffer
|
||||
r := bufio.NewReader(resp.Body)
|
||||
firstLine, err := r.ReadBytes('\n')
|
||||
r := bufio.NewReaderSize(resp.Body, 4096)
|
||||
line, isPrefix, err := r.ReadLine()
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("reading first line: %v", err)
|
||||
}
|
||||
if string(firstLine) != "SUCCESS\n" {
|
||||
return fmt.Errorf("connecting to diagnostics system: %s", firstLine)
|
||||
if isPrefix {
|
||||
return fmt.Errorf("request too long")
|
||||
}
|
||||
if !bytes.Equal(line, successLine) {
|
||||
return fmt.Errorf("connecting to diagnostics system, first line [%s]", line)
|
||||
}
|
||||
log.Info("Connected")
|
||||
|
||||
outerLoop:
|
||||
for {
|
||||
var buf [4096]byte
|
||||
var readLen int
|
||||
for readLen < len(buf) && (readLen == 0 || buf[readLen-1] != '\n') {
|
||||
len, err := r.Read(buf[readLen:])
|
||||
if err != nil {
|
||||
log.Error("Connection read", "err", err)
|
||||
break outerLoop
|
||||
}
|
||||
readLen += len
|
||||
}
|
||||
if buf[readLen-1] != '\n' {
|
||||
log.Error("Request too long, circuit breaker")
|
||||
break outerLoop
|
||||
}
|
||||
fmt.Printf("Got request: %s\n", buf[:readLen-1])
|
||||
metricsResponse, err := metricsClient.Get(metricsURL + string(buf[:readLen-1]))
|
||||
if err != nil {
|
||||
log.Error("Problem requesting metrics", "url", metricsURL, "query", string(buf[:readLen-1]), "err", err)
|
||||
break outerLoop
|
||||
}
|
||||
// Buffer the metrics response, and relay it back to the diagnostics system, prepending with the size
|
||||
for line, isPrefix, err = r.ReadLine(); err == nil && !isPrefix; line, isPrefix, err = r.ReadLine() {
|
||||
fmt.Printf("Got request: %s\n", line)
|
||||
metricsBuf.Reset()
|
||||
if _, err := io.Copy(&metricsBuf, metricsResponse.Body); err != nil {
|
||||
metricsResponse, err := metricsClient.Get(metricsURL + string(line))
|
||||
if err != nil {
|
||||
fmt.Fprintf(&metricsBuf, "ERROR: Requesting metrics url [%s], query [%s], err: %v", metricsURL, line, err)
|
||||
} else {
|
||||
// Buffer the metrics response, and relay it back to the diagnostics system, prepending with the size
|
||||
if _, err := io.Copy(&metricsBuf, metricsResponse.Body); err != nil {
|
||||
metricsBuf.Reset()
|
||||
fmt.Fprintf(&metricsBuf, "ERROR: Extracting metrics url [%s], query [%s], err: %v", metricsURL, line, err)
|
||||
}
|
||||
metricsResponse.Body.Close()
|
||||
log.Error("Problem extracting metrics", "url", metricsURL, "query", string(buf[:readLen-1]), "err", err)
|
||||
break outerLoop
|
||||
}
|
||||
metricsResponse.Body.Close()
|
||||
fmt.Printf("Got response:\n%s\n", metricsBuf.Bytes())
|
||||
var sizeBuf [4]byte
|
||||
binary.BigEndian.PutUint32(sizeBuf[:], uint32(metricsBuf.Len()))
|
||||
if _, err := writer.Write(sizeBuf[:]); err != nil {
|
||||
log.Error("Problem relaying metrics prefix len", "url", metricsURL, "query", string(buf[:readLen-1]), "err", err)
|
||||
break outerLoop
|
||||
if _, err = writer.Write(sizeBuf[:]); err != nil {
|
||||
log.Error("Problem relaying metrics prefix len", "url", metricsURL, "query", line, "err", err)
|
||||
break
|
||||
}
|
||||
if _, err := writer.Write(metricsBuf.Bytes()); err != nil {
|
||||
log.Error("Problem relaying", "url", metricsURL, "query", string(buf[:readLen-1]), "err", err)
|
||||
break outerLoop
|
||||
if _, err = writer.Write(metricsBuf.Bytes()); err != nil {
|
||||
log.Error("Problem relaying", "url", metricsURL, "query", line, "err", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
log.Error("Breaking connection", "err", err)
|
||||
}
|
||||
if isPrefix {
|
||||
log.Error("Request too long, circuit breaker")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user