Diag session routing (#8232)

Code to support react based UI for diagnostics:

* pprof, prometheus and diagnostics rationalized to use a single router
(i.e. they can all run in the same port)
* support_cmd updated to support node routing (was only first node)
* Multi content support in router tunnel (application/octet-stream &
application/json)
* Routing requests changed from using http forms to rest + query params
* REST query requests can now be made against erigon base port and
diagnostics with the same url format/params

---------

Co-authored-by: dvovk <vovk.dimon@gmail.com>
Co-authored-by: Mark Holt <mark@disributed.vision>
This commit is contained in:
Mark Holt 2023-09-25 16:24:17 +01:00 committed by GitHub
parent 5641b52be0
commit f794438335
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 947 additions and 405 deletions

View File

@ -19,13 +19,14 @@ package main
import (
"encoding/json"
"fmt"
"github.com/ledgerwatch/erigon-lib/common"
"io"
"os"
"path/filepath"
"regexp"
"strings"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"

View File

@ -60,7 +60,7 @@ func runCaplinNode(cliCtx *cli.Context) error {
if err != nil {
log.Error("[Phase1] Could not initialize caplin", "err", err)
}
if _, err := debug.Setup(cliCtx, true /* root logger */); err != nil {
if _, _, err := debug.Setup(cliCtx, true /* root logger */); err != nil {
return err
}
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(cfg.LogLvl), log.StderrHandler))

View File

@ -4,12 +4,14 @@ import (
context "context"
"fmt"
"math/big"
"net/http"
"sync"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon/cmd/devnet/accounts"
"github.com/ledgerwatch/erigon/cmd/devnet/args"
"github.com/ledgerwatch/erigon/cmd/devnet/requests"
"github.com/ledgerwatch/erigon/diagnostics"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/node/nodecfg"
"github.com/ledgerwatch/erigon/params"
@ -129,6 +131,7 @@ func (n *node) ChainID() *big.Int {
func (n *node) run(ctx *cli.Context) error {
var logger log.Logger
var err error
var metricsMux *http.ServeMux
defer n.done()
defer func() {
@ -141,7 +144,7 @@ func (n *node) run(ctx *cli.Context) error {
n.Unlock()
}()
if logger, err = debug.Setup(ctx, false /* rootLogger */); err != nil {
if logger, metricsMux, err = debug.Setup(ctx, false /* rootLogger */); err != nil {
return err
}
@ -166,6 +169,10 @@ func (n *node) run(ctx *cli.Context) error {
n.ethNode, err = enode.New(n.nodeCfg, n.ethCfg, logger)
if metricsMux != nil {
diagnostics.Setup(ctx, metricsMux, n.ethNode)
}
n.Lock()
if n.startErr != nil {
n.startErr <- err

View File

@ -107,7 +107,7 @@ var (
}
metricsURLsFlag = cli.StringSliceFlag{
Name: "metrics.urls",
Name: "debug.urls",
Usage: "internal flag",
}
@ -199,7 +199,7 @@ func action(ctx *cli.Context) error {
if metrics {
// TODO should get this from the network as once we have multiple nodes we'll need to iterate the
// nodes and create a series of urls - for the moment only one is supported
ctx.Set("metrics.urls", fmt.Sprintf("http://localhost:%d/debug/metrics/", ctx.Int("metrics.port")))
ctx.Set("metrics.urls", fmt.Sprintf("http://localhost:%d/debug/", ctx.Int("metrics.port")))
}
// start the network with each node in a go routine

View File

@ -3,12 +3,14 @@ package main
import (
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"reflect"
"strings"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon/diagnostics"
"github.com/ledgerwatch/erigon/metrics"
"github.com/ledgerwatch/log/v3"
"github.com/pelletier/go-toml"
@ -55,7 +57,9 @@ func runErigon(cliCtx *cli.Context) error {
var logger log.Logger
var err error
if logger, err = debug.Setup(cliCtx, true /* root logger */); err != nil {
var metricsMux *http.ServeMux
if logger, metricsMux, err = debug.Setup(cliCtx, true /* root logger */); err != nil {
return err
}
@ -73,6 +77,11 @@ func runErigon(cliCtx *cli.Context) error {
log.Error("Erigon startup", "err", err)
return err
}
if metricsMux != nil {
diagnostics.Setup(cliCtx, metricsMux, ethNode)
}
err = ethNode.Serve()
if err != nil {
log.Error("error while serving an Erigon node", "err", err)

View File

@ -813,10 +813,27 @@ var (
Usage: "Max allowed page size for search methods",
Value: 25,
}
DiagnosticsURLFlag = cli.StringFlag{
Name: "diagnostics.url",
Usage: "URL of the diagnostics system provided by the support team",
}
DiagnosticsInsecureFlag = cli.BoolFlag{
Name: "diagnostics.insecure",
Usage: "Allows communication with diagnostics system using self-signed TLS certificates",
}
DiagnosticsSessionsFlag = cli.StringSliceFlag{
Name: "diagnostics.ids",
Usage: "Comma separated list of support session ids to connect to",
}
)
var MetricFlags = []cli.Flag{&MetricsEnabledFlag, &MetricsHTTPFlag, &MetricsPortFlag}
// DiagnosticsFlags is the set of CLI flags controlling the diagnostics
// support-session connection. BUGFIX: DiagnosticsURLFlag was listed twice
// and DiagnosticsInsecureFlag was missing, so --diagnostics.insecure was
// never registered.
var DiagnosticsFlags = []cli.Flag{&DiagnosticsURLFlag, &DiagnosticsInsecureFlag, &DiagnosticsSessionsFlag}
// setNodeKey loads a node key from command line flags if provided,
// otherwise it tries to load it from datadir,
// otherwise it generates a new key in datadir.

View File

@ -305,6 +305,10 @@ func (b *bigValue) Set(s string) error {
return nil
}
func (b *bigValue) Get() any {
return b.String()
}
// GlobalBig returns the value of a BigFlag from the global flag set.
func GlobalBig(ctx *cli.Context, name string) *big.Int {
val := ctx.Generic(name)

View File

@ -10,7 +10,7 @@ import (
)
func SetupBlockBodyDownload(metricsMux *http.ServeMux) {
metricsMux.HandleFunc("/debug/metrics/block_body_download", func(w http.ResponseWriter, r *http.Request) {
metricsMux.HandleFunc("/block_body_download", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
writeBlockBodyDownload(w, r)
})

View File

@ -1,22 +1,29 @@
package diagnostics
import (
"fmt"
"io"
"net/http"
"os"
"strconv"
"strings"
)
func SetupCmdLineAccess(metricsMux *http.ServeMux) {
metricsMux.HandleFunc("/debug/metrics/cmdline", func(w http.ResponseWriter, r *http.Request) {
metricsMux.HandleFunc("/cmdline", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
writeCmdLine(w)
w.Header().Set("Content-Type", "application/json")
var space []byte
w.Write([]byte{'"'})
for _, arg := range os.Args {
if len(space) > 0 {
w.Write(space)
} else {
space = []byte(" ")
}
w.Write([]byte(strings.Trim(strconv.Quote(arg), `"`)))
}
w.Write([]byte{'"'})
})
}
func writeCmdLine(w io.Writer) {
fmt.Fprintf(w, "SUCCESS\n")
for _, arg := range os.Args {
fmt.Fprintf(w, "%s\n", arg)
}
}

258
diagnostics/db.go Normal file
View File

@ -0,0 +1,258 @@
package diagnostics
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"path/filepath"
"strings"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon/common/paths"
"github.com/urfave/cli/v2"
)
// SetupDbAccess registers REST-style handlers exposing the node's open MDBX
// databases:
//
//	/dbs                           - lists database names
//	/dbs/{db}/tables               - lists tables in a database
//	/dbs/{db}/tables/{table}       - reads rows from a table
//	/dbs/{db}/tables/{table}/{key} - reads rows starting at a base64url key
//
// {db} may itself contain "/" segments; path segments are consumed until the
// literal "tables" component is found. Paging is controlled by the optional
// "offset" and "limit" query parameters.
func SetupDbAccess(ctx *cli.Context, metricsMux *http.ServeMux) {
	var dataDir string
	if ctx.IsSet("datadir") {
		dataDir = ctx.String("datadir")
	} else {
		dataDir = paths.DataDirForNetwork(paths.DefaultDataDir(), ctx.String("chain"))
	}
	metricsMux.HandleFunc("/dbs", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		w.Header().Set("Content-Type", "application/json")
		writeDbList(w, dataDir)
	})
	metricsMux.HandleFunc("/dbs/", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		urlPath := r.URL.Path
		if !strings.HasPrefix(urlPath, "/dbs/") {
			http.Error(w, fmt.Sprintf(`Unexpected path prefix: expected: "/dbs/..." got: "%s"`, urlPath), http.StatusNotFound)
			return
		}
		pathParts := strings.Split(urlPath[5:], "/")
		if len(pathParts) < 1 {
			http.Error(w, fmt.Sprintf(`Unexpected path len: expected: "{db}/tables" got: "%s"`, urlPath), http.StatusNotFound)
			return
		}
		// Accumulate the (possibly multi-segment) db name until the literal
		// "tables" segment is reached.
		var dbname string
		var sep string
		for len(pathParts) > 0 {
			dbname += sep + pathParts[0]
			if sep == "" {
				sep = "/"
			}
			pathParts = pathParts[1:]
			// BUGFIX: guard against running out of segments before "tables"
			// is seen - the original indexed pathParts[0] here unguarded and
			// panicked on URLs such as "/dbs/chaindata" (no trailing path).
			if len(pathParts) == 0 {
				http.Error(w, fmt.Sprintf(`Unexpected path len: expected: "{db}/tables" got: "%s"`, urlPath), http.StatusNotFound)
				return
			}
			if pathParts[0] == "tables" {
				break
			}
			if len(pathParts) < 2 {
				http.Error(w, fmt.Sprintf(`Unexpected path part: expected: "tables" got: "%s"`, pathParts[0]), http.StatusNotFound)
				return
			}
		}
		// At this point pathParts[0] == "tables"; what follows selects the
		// operation.
		switch len(pathParts) {
		case 1:
			// "/dbs/{db}/tables"
			writeDbTables(w, r, dataDir, dbname)
		case 2:
			// "/dbs/{db}/tables/{table}"
			offset, err := offsetValue(r.URL.Query())
			if err != nil {
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			limit, err := limitValue(r.URL.Query(), 0)
			if err != nil {
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			writeDbRead(w, r, dataDir, dbname, pathParts[1], nil, offset, limit)
		case 3:
			// "/dbs/{db}/tables/{table}/{key}" - key is base64url encoded
			key, err := base64.URLEncoding.DecodeString(pathParts[2])
			if err != nil {
				http.Error(w, fmt.Sprintf(`key "%s" argument should be base64url encoded: %v`, pathParts[2], err), http.StatusBadRequest)
				return
			}
			offset, err := offsetValue(r.URL.Query())
			if err != nil {
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			limit, err := limitValue(r.URL.Query(), 0)
			if err != nil {
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			writeDbRead(w, r, dataDir, dbname, pathParts[1], key, offset, limit)
		default:
			http.Error(w, fmt.Sprintf(`Unexpected path parts: "%s"`, strings.Join(pathParts[2:], "/")), http.StatusNotFound)
		}
	})
}
// writeDbList responds with a JSON array of the relative, slash-separated
// paths of all currently open MDBX databases under dataDir.
func writeDbList(w http.ResponseWriter, dataDir string) {
	w.Header().Set("Content-Type", "application/json")
	m := mdbx.PathDbMap()
	dbs := make([]string, 0, len(m))
	for path := range m {
		// Strip the data dir prefix and normalize separators. BUGFIX: the
		// original used TrimPrefix(path, dataDir)[1:], which panics when a
		// registered path equals dataDir or does not start with it.
		rel := strings.ReplaceAll(strings.TrimPrefix(path, dataDir), "\\", "/")
		rel = strings.TrimPrefix(rel, "/")
		dbs = append(dbs, rel)
	}
	json.NewEncoder(w).Encode(dbs)
}
// writeDbTables responds with a JSON array of {name, count, size} entries
// describing every bucket (table) of the named database, or 404 when dbname
// is not one of the registered MDBX databases.
func writeDbTables(w http.ResponseWriter, r *http.Request, dataDir string, dbname string) {
	m := mdbx.PathDbMap()
	db, ok := m[filepath.Join(dataDir, dbname)]
	if !ok {
		http.Error(w, fmt.Sprintf(`"%s" is not in the list of allowed dbs`, dbname), http.StatusNotFound)
		return
	}
	type table struct {
		Name  string `json:"name"`
		Count uint64 `json:"count"`
		Size  uint64 `json:"size"`
	}
	var tables []table
	if err := db.View(context.Background(), func(tx kv.Tx) error {
		buckets, e := tx.ListBuckets()
		if e != nil {
			return e
		}
		for _, bucket := range buckets {
			size, e := tx.BucketSize(bucket)
			if e != nil {
				return e
			}
			// Count rows with a cursor on the same read transaction. The
			// original opened a second db.View per bucket, which was
			// wasteful and let count and size come from different snapshots.
			c, e := tx.Cursor(bucket)
			if e != nil {
				return e
			}
			count, e := c.Count()
			// Close explicitly rather than defer: a defer inside the loop
			// would keep every cursor open until the transaction ends.
			c.Close()
			if e != nil {
				return e
			}
			tables = append(tables, table{bucket, count, size})
		}
		return nil
	}); err != nil {
		http.Error(w, fmt.Sprintf(`failed to list tables in "%s": %v`, dbname, err), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(tables)
}
// writeDbRead streams rows from the given table as JSON. When key is nil it
// starts at the first row, otherwise it seeks to key. offset skips that many
// rows after the start position; limit caps the number of rows returned
// (0 means unlimited). Keys and values are base64url encoded in the output.
//
// NOTE(review): error paths here write a plain-text "ERROR: ..." body with a
// 200 status, unlike writeDbTables which uses http.Error - confirm clients
// tolerate this before normalizing.
func writeDbRead(w http.ResponseWriter, r *http.Request, dataDir string, dbname string, table string, key []byte, offset int64, limit int64) {
	m := mdbx.PathDbMap()
	db, ok := m[filepath.Join(dataDir, dbname)]
	if !ok {
		fmt.Fprintf(w, "ERROR: path %s is not in the list of allowed paths", dbname)
		return
	}
	var results [][2][]byte
	var count uint64
	if err := db.View(context.Background(), func(tx kv.Tx) error {
		c, e := tx.Cursor(table)
		if e != nil {
			return e
		}
		defer c.Close()
		// Total number of rows in the table, reported alongside the page.
		count, e = c.Count()
		if e != nil {
			return e
		}
		var k, v []byte
		// Position the cursor: first row, or seek to the supplied key.
		if key == nil {
			if k, v, e = c.First(); e != nil {
				return e
			}
		} else if k, v, e = c.Seek(key); e != nil {
			return e
		}
		var pos int64
		// Skip "offset" rows by stepping the cursor one row at a time.
		for e == nil && k != nil && pos < offset {
			//TODO - not sure if this is a good idea it may be slooooow
			k, _, e = c.Next()
			pos++
		}
		// Collect up to "limit" rows (all remaining rows when limit == 0).
		for e == nil && k != nil && (limit == 0 || int64(len(results)) < limit) {
			results = append(results, [2][]byte{k, v})
			k, v, e = c.Next()
		}
		return nil
	}); err != nil {
		fmt.Fprintf(w, "ERROR: reading table %s in %s: %v\n", table, dbname, err)
		return
	}
	// Emit the JSON envelope by hand so the base64url-encoded key/value
	// pairs can be streamed without building an intermediate map.
	w.Header().Set("Content-Type", "application/json")
	w.Write([]byte("{"))
	fmt.Fprintf(w, `"offset":%d`, offset)
	if limit > 0 {
		fmt.Fprintf(w, `,"limit":%d`, limit)
	}
	fmt.Fprintf(w, `,"count":%d`, count)
	if len(results) > 0 {
		var comma string
		w.Write([]byte(`,"results":{`))
		for _, result := range results {
			fmt.Fprintf(w, `%s"%s":"%s"`, comma, base64.URLEncoding.EncodeToString(result[0]), base64.URLEncoding.EncodeToString(result[1]))
			if comma == "" {
				comma = ","
			}
		}
		w.Write([]byte("}"))
	}
	w.Write([]byte("}"))
}

View File

@ -1,139 +0,0 @@
package diagnostics
import (
"context"
"encoding/hex"
"fmt"
"io"
"net/http"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon/common/paths"
"github.com/urfave/cli/v2"
)
func SetupDbAccess(ctx *cli.Context, metricsMux *http.ServeMux) {
var dataDir string
if ctx.IsSet("datadir") {
dataDir = ctx.String("datadir")
} else {
dataDir = paths.DataDirForNetwork(paths.DefaultDataDir(), ctx.String("chain"))
}
metricsMux.HandleFunc("/debug/metrics/db/list", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
writeDbList(w, dataDir)
})
metricsMux.HandleFunc("/debug/metrics/db/tables", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
writeDbTables(w, r, dataDir)
})
metricsMux.HandleFunc("/debug/metrics/db/read", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
writeDbRead(w, r, dataDir)
})
}
func writeDbList(w io.Writer, dataDir string) {
fmt.Fprintf(w, "SUCCESS\n")
m := mdbx.PathDbMap()
for path := range m {
fmt.Fprintf(w, "%s\n", path)
}
}
func writeDbTables(w io.Writer, r *http.Request, dataDir string) {
if err := r.ParseForm(); err != nil {
fmt.Fprintf(w, "ERROR: parsing arguments: %v\n", err)
return
}
path := r.Form.Get("path")
if path == "" {
fmt.Fprintf(w, "ERROR: path argument is required - specify the relative path to an MDBX database directory")
return
}
m := mdbx.PathDbMap()
db, ok := m[path]
if !ok {
fmt.Fprintf(w, "ERROR: path %s is not in the list of allowed paths", path)
return
}
var tables []string
if err := db.View(context.Background(), func(tx kv.Tx) error {
var e error
tables, e = tx.ListBuckets()
if e != nil {
return e
}
return nil
}); err != nil {
fmt.Fprintf(w, "ERROR: listing tables in %s: %v\n", path, err)
return
}
fmt.Fprintf(w, "SUCCESS\n")
for _, table := range tables {
fmt.Fprintf(w, "%s\n", table)
}
}
func writeDbRead(w io.Writer, r *http.Request, dataDir string) {
if err := r.ParseForm(); err != nil {
fmt.Fprintf(w, "ERROR: parsing arguments: %v\n", err)
return
}
path := r.Form.Get("path")
if path == "" {
fmt.Fprintf(w, "ERROR: path argument is required - specify the relative path to an MDBX database directory")
return
}
m := mdbx.PathDbMap()
db, ok := m[path]
if !ok {
fmt.Fprintf(w, "ERROR: path %s is not in the list of allowed paths", path)
return
}
table := r.Form.Get("table")
if table == "" {
fmt.Fprintf(w, "ERROR: table argument is required - specify the table to read from")
return
}
var key []byte
var err error
keyHex := r.Form.Get("key")
if keyHex != "" {
if key, err = hex.DecodeString(keyHex); err != nil {
fmt.Fprintf(w, "ERROR: key [%s] argument may only contain hexadecimal digits: %v\n", keyHex, err)
return
}
}
var results []string
if err := db.View(context.Background(), func(tx kv.Tx) error {
c, e := tx.Cursor(table)
if e != nil {
return e
}
defer c.Close()
var k, v []byte
if key == nil {
if k, v, e = c.First(); err != nil {
return e
}
} else if k, v, e = c.Seek(key); e != nil {
return e
}
count := 0
for e == nil && k != nil && count < 256 {
results = append(results, fmt.Sprintf("%x | %x", k, v))
count++
k, v, e = c.Next()
}
return nil
}); err != nil {
fmt.Fprintf(w, "ERROR: reading table %s in %s: %v\n", table, path, err)
return
}
fmt.Fprintf(w, "SUCCESS\n")
for _, result := range results {
fmt.Fprintf(w, "%s\n", result)
}
}

View File

@ -1,23 +1,55 @@
package diagnostics
import (
"fmt"
"io"
"encoding/json"
"net/http"
"github.com/urfave/cli/v2"
)
func SetupFlagsAccess(ctx *cli.Context, metricsMux *http.ServeMux) {
metricsMux.HandleFunc("/debug/metrics/flags", func(w http.ResponseWriter, r *http.Request) {
metricsMux.HandleFunc("/flags", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
writeFlags(w, ctx)
w.Header().Set("Content-Type", "application/json")
flags := map[string]interface{}{}
ctxFlags := map[string]struct{}{}
for _, flagName := range ctx.FlagNames() {
ctxFlags[flagName] = struct{}{}
}
for _, flag := range ctx.App.Flags {
name := flag.Names()[0]
value := ctx.Value(name)
switch typed := value.(type) {
case string:
if typed == "" {
continue
}
case cli.UintSlice:
value = typed.Value()
}
var usage string
if docFlag, ok := flag.(cli.DocGenerationFlag); ok {
usage = docFlag.GetUsage()
}
_, inCtx := ctxFlags[name]
flags[name] = struct {
Value interface{} `json:"value,omitempty"`
Usage string `json:"usage,omitempty"`
Default bool `json:"default"`
}{
Value: value,
Usage: usage,
Default: !inCtx,
}
}
json.NewEncoder(w).Encode(flags)
})
}
func writeFlags(w io.Writer, ctx *cli.Context) {
fmt.Fprintf(w, "SUCCESS\n")
for _, flagName := range ctx.FlagNames() {
fmt.Fprintf(w, "%s=%v\n", flagName, ctx.Value(flagName))
}
}

View File

@ -10,7 +10,7 @@ import (
)
func SetupHeaderDownloadStats(metricsMux *http.ServeMux) {
metricsMux.HandleFunc("/debug/metrics/headers_download", func(w http.ResponseWriter, r *http.Request) {
metricsMux.HandleFunc("/headers_download", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
writeHeaderDownload(w, r)
})

190
diagnostics/logs.go Normal file
View File

@ -0,0 +1,190 @@
package diagnostics
import (
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strconv"
"github.com/urfave/cli/v2"
"github.com/ledgerwatch/erigon/turbo/logging"
)
// SetupLogsAccess registers the /logs (directory listing) and /logs/{file}
// (file read) endpoints. The log directory comes from the log dir path flag
// when set, otherwise from {datadir}/logs; when neither yields a directory,
// no handlers are registered.
func SetupLogsAccess(ctx *cli.Context, metricsMux *http.ServeMux) {
	logDir := ctx.String(logging.LogDirPathFlag.Name)
	if logDir == "" {
		if dd := ctx.String("datadir"); dd != "" {
			logDir = filepath.Join(dd, "logs")
		}
	}
	if logDir == "" {
		return
	}
	listHandler := func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		writeLogsList(w, logDir)
	}
	readHandler := func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		writeLogsRead(w, r, logDir)
	}
	metricsMux.HandleFunc("/logs", listHandler)
	metricsMux.HandleFunc("/logs/", readHandler)
}
// writeLogsList responds with a JSON array of {name, size} entries, one per
// regular file in dirPath (sub-directories are skipped).
func writeLogsList(w http.ResponseWriter, dirPath string) {
	entries, err := os.ReadDir(dirPath)
	if err != nil {
		http.Error(w, fmt.Sprintf("Failed to list directory %s: %v", dirPath, err), http.StatusInternalServerError)
		return
	}
	infos := make([]fs.FileInfo, 0, len(entries))
	for _, entry := range entries {
		fileInfo, err := os.Stat(filepath.Join(dirPath, entry.Name()))
		if err != nil {
			http.Error(w, fmt.Sprintf("Can't stat file %s: %v", entry.Name(), err), http.StatusInternalServerError)
			return
		}
		if fileInfo.IsDir() {
			continue
		}
		infos = append(infos, fileInfo)
	}
	type file struct {
		Name string `json:"name"`
		Size int64  `json:"size"`
	}
	// BUGFIX: allocate with zero length and a capacity. The original used
	// make([]file, len(infos)) and then appended, so the response contained
	// len(infos) spurious zero-valued entries before the real ones.
	files := make([]file, 0, len(infos))
	for _, fileInfo := range infos {
		files = append(files, file{Name: fileInfo.Name(), Size: fileInfo.Size()})
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(files)
}
// writeLogsRead streams a byte range of the named log file back to the
// client. The file name is the final element of the request path; the
// optional "offset" and "limit" query parameters select the range (limit
// defaults to the whole file size). The range actually served is reported
// via the X-Offset, X-Limit and X-Size response headers.
func writeLogsRead(w http.ResponseWriter, r *http.Request, dirPath string) {
	file := path.Base(r.URL.Path)
	if file == "/" || file == "." {
		http.Error(w, "file is required - specify the name of log file to read", http.StatusBadRequest)
		return
	}
	offset, err := offsetValue(r.URL.Query())
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	fileInfo, err := os.Stat(filepath.Join(dirPath, file))
	if err != nil {
		http.Error(w, fmt.Sprintf("Can't stat file %s: %v", file, err), http.StatusInternalServerError)
		return
	}
	if fileInfo.IsDir() {
		http.Error(w, fmt.Sprintf("%s is a directory, needs to be a file", file), http.StatusInternalServerError)
		return
	}
	if offset > fileInfo.Size() {
		http.Error(w, fmt.Sprintf("offset %d must not be greater than this file size %d", offset, fileInfo.Size()), http.StatusBadRequest)
		return
	}
	f, err := os.Open(filepath.Join(dirPath, file))
	if err != nil {
		http.Error(w, fmt.Sprintf("Can't open file %s: %v\n", file, err), http.StatusInternalServerError)
		return
	}
	// BUGFIX: the file handle was previously never closed, leaking a
	// descriptor on every request.
	defer f.Close()
	limit, err := limitValue(r.URL.Query(), fileInfo.Size())
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	buf := make([]byte, limit)
	if _, err := f.Seek(offset, 0); err != nil {
		http.Error(w, fmt.Sprintf("seek failed for file: %s to %d: %v", file, offset, err), http.StatusInternalServerError)
		return
	}
	var n int
	var readTotal int
	// Keep reading until the buffer is full or EOF - a single Read may
	// return fewer bytes than requested.
	for n, err = f.Read(buf[readTotal:]); err == nil && readTotal < len(buf); n, err = f.Read(buf[readTotal:]) {
		readTotal += n
	}
	if err != nil && !errors.Is(err, io.EOF) {
		http.Error(w, fmt.Sprintf("Reading failed for: %s at %d: %v\n", file, readTotal, err), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/octet-stream")
	w.Header().Set("Content-Length", strconv.FormatInt(int64(readTotal), 10))
	w.Header().Set("X-Offset", strconv.FormatInt(offset, 10))
	w.Header().Set("X-Limit", strconv.FormatInt(limit, 10))
	w.Header().Set("X-Size", strconv.FormatInt(fileInfo.Size(), 10))
	w.Write(buf[:readTotal])
}
// limitValue extracts the optional "limit" query parameter, falling back to
// def when it is absent. A malformed value yields an error.
func limitValue(values url.Values, def int64) (int64, error) {
	raw := values.Get("limit")
	if raw == "" {
		return def, nil
	}
	parsed, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("limit %s is not a int64 number: %v", raw, err)
	}
	return parsed, nil
}
// offsetValue extracts the optional "offset" query parameter, defaulting to
// zero when absent. Malformed or negative values yield an error.
func offsetValue(values url.Values) (int64, error) {
	raw := values.Get("offset")
	if raw == "" {
		return 0, nil
	}
	offset, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("offset %s is not a int64 number: %v", raw, err)
	}
	if offset < 0 {
		return 0, fmt.Errorf("offset %d must be non-negative", offset)
	}
	return offset, nil
}

View File

@ -1,122 +0,0 @@
package diagnostics
import (
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"path/filepath"
"strconv"
"github.com/urfave/cli/v2"
"github.com/ledgerwatch/erigon/turbo/logging"
)
func SetupLogsAccess(ctx *cli.Context, metricsMux *http.ServeMux) {
dirPath := ctx.String(logging.LogDirPathFlag.Name)
if dirPath == "" {
datadir := ctx.String("datadir")
if datadir != "" {
dirPath = filepath.Join(datadir, "logs")
}
}
if dirPath == "" {
return
}
metricsMux.HandleFunc("/debug/metrics/logs/list", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
writeLogsList(w, dirPath)
})
metricsMux.HandleFunc("/debug/metrics/logs/read", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
writeLogsRead(w, r, dirPath)
})
}
func writeLogsList(w io.Writer, dirPath string) {
entries, err := os.ReadDir(dirPath)
if err != nil {
fmt.Fprintf(w, "ERROR: listing directory %s: %v\n", dirPath, err)
return
}
//nolint: prealloc
var infos []fs.FileInfo
for _, entry := range entries {
fileInfo, err := os.Stat(filepath.Join(dirPath, entry.Name()))
if err != nil {
fmt.Fprintf(w, "ERROR: stat file %s: %v\n", entry.Name(), err)
return
}
if fileInfo.IsDir() {
continue
}
infos = append(infos, fileInfo)
}
fmt.Fprintf(w, "SUCCESS\n")
for _, fileInfo := range infos {
fmt.Fprintf(w, "%s | %d\n", fileInfo.Name(), fileInfo.Size())
}
}
func writeLogsRead(w io.Writer, r *http.Request, dirPath string) {
if err := r.ParseForm(); err != nil {
fmt.Fprintf(w, "ERROR: parsing arguments: %v\n", err)
return
}
file := r.Form.Get("file")
if file == "" {
fmt.Fprintf(w, "ERROR: file argument is required - specify the name of log file to read")
return
}
fileInfo, err := os.Stat(filepath.Join(dirPath, file))
if err != nil {
fmt.Fprintf(w, "ERROR: stat file %s: %v\n", file, err)
return
}
if fileInfo.IsDir() {
fmt.Fprintf(w, "ERROR: %s is a directory, needs to be a file", file)
return
}
offsetStr := r.Form.Get("offset")
if offsetStr == "" {
fmt.Fprintf(w, "ERROR: offset argument is required - specify where to start reading in the file")
return
}
offset, err := strconv.ParseInt(offsetStr, 10, 64)
if err != nil {
fmt.Fprintf(w, "ERROR: offset %s is not a Uint64 number: %v\n", offsetStr, err)
return
}
if offset < 0 {
fmt.Fprintf(w, "ERROR: offset %d must be non-negative\n", offset)
return
}
if offset > fileInfo.Size() {
fmt.Fprintf(w, "ERROR: offset %d must be no greater than file size %d\n", offset, fileInfo.Size())
return
}
f, err := os.Open(filepath.Join(dirPath, file))
if err != nil {
fmt.Fprintf(w, "ERROR: opening file %s: %v\n", file, err)
return
}
var buf [16 * 1024]byte
if _, err := f.Seek(offset, 0); err != nil {
fmt.Fprintf(w, "ERROR: seeking in file: %s to %d: %v\n", file, offset, err)
return
}
var n int
var readTotal int
for n, err = f.Read(buf[readTotal:]); err == nil && readTotal < len(buf); n, err = f.Read(buf[readTotal:]) {
readTotal += n
}
if err != nil && !errors.Is(err, io.EOF) {
fmt.Fprintf(w, "ERROR: reading in file: %s at %d: %v\n", file, readTotal, err)
return
}
fmt.Fprintf(w, "SUCCESS: %d-%d/%d\n", offset, offset+int64(readTotal), fileInfo.Size())
w.Write(buf[:readTotal])
}

26
diagnostics/nodeinfo.go Normal file
View File

@ -0,0 +1,26 @@
package diagnostics
import (
"encoding/json"
"net/http"
"github.com/ledgerwatch/erigon/turbo/node"
)
// SetupNodeInfoAccess registers the /nodeinfo endpoint, which reports the
// backend's node info as JSON.
func SetupNodeInfoAccess(metricsMux *http.ServeMux, node *node.ErigonNode) {
	handler := func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		writeNodeInfo(w, node)
	}
	metricsMux.HandleFunc("/nodeinfo", handler)
}
// writeNodeInfo encodes the backend's NodesInfo(0) reply as JSON, or responds
// with 500 when the backend call fails.
func writeNodeInfo(w http.ResponseWriter, node *node.ErigonNode) {
	reply, err := node.Backend().NodesInfo(0)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	// CONSISTENCY: every other JSON diagnostics endpoint (dbs, flags,
	// version) sets this header; it was missing here.
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(reply)
}

28
diagnostics/setup.go Normal file
View File

@ -0,0 +1,28 @@
package diagnostics
import (
"net/http"
"strings"
"github.com/ledgerwatch/erigon/turbo/node"
"github.com/urfave/cli/v2"
)
// Setup mounts all diagnostics endpoints on metricsMux under the /debug/
// prefix. Requests have the prefix stripped and are dispatched to an inner
// mux, so the individual handlers register plain paths such as "/logs".
func Setup(ctx *cli.Context, metricsMux *http.ServeMux, node *node.ErigonNode) {
	inner := http.NewServeMux()

	metricsMux.HandleFunc("/debug/", func(w http.ResponseWriter, r *http.Request) {
		// Trim "/debug" from both the decoded and raw paths before
		// re-dispatching to the inner mux.
		r.URL.Path = strings.TrimPrefix(r.URL.Path, "/debug")
		r.URL.RawPath = strings.TrimPrefix(r.URL.RawPath, "/debug")
		inner.ServeHTTP(w, r)
	})

	SetupLogsAccess(ctx, inner)
	SetupDbAccess(ctx, inner)
	SetupCmdLineAccess(inner)
	SetupFlagsAccess(ctx, inner)
	SetupVersionAccess(inner)
	SetupBlockBodyDownload(inner)
	SetupHeaderDownloadStats(inner)
	SetupNodeInfoAccess(inner, node)
}

View File

@ -1,8 +1,7 @@
package diagnostics
import (
"fmt"
"io"
"encoding/json"
"net/http"
"github.com/ledgerwatch/erigon/params"
@ -11,15 +10,17 @@ import (
const Version = 3
func SetupVersionAccess(metricsMux *http.ServeMux) {
metricsMux.HandleFunc("/debug/metrics/version", func(w http.ResponseWriter, r *http.Request) {
metricsMux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
writeVersion(w)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(struct {
Node int `json:"nodeVersion"`
Code string `json:"codeVersion"`
Git string `json:"gitCommit"`
}{
Node: Version,
Code: params.VersionWithMeta,
Git: params.GitCommit,
})
})
}
func writeVersion(w io.Writer) {
fmt.Fprintf(w, "SUCCESS\n")
fmt.Fprintf(w, "%d\n", Version)
fmt.Fprintf(w, "%s\n", params.VersionWithMeta)
fmt.Fprintf(w, "%s\n", params.GitCommit)
}

View File

@ -119,7 +119,7 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn {
func (cc *clientConn) close(err error, inflightReq *requestOp) {
cc.handler.close(err, inflightReq)
cc.codec.close()
cc.codec.Close()
}
type readOp struct {
@ -526,7 +526,7 @@ func (c *Client) reconnect(ctx context.Context) error {
c.writeConn = newconn
return nil
case <-c.didClose:
newconn.close()
newconn.Close()
return ErrClientQuit
}
}
@ -627,7 +627,7 @@ func (c *Client) drainRead() {
// read decodes RPC messages from a codec, feeding them into dispatch.
func (c *Client) read(codec ServerCodec) {
for {
msgs, batch, err := codec.readBatch()
msgs, batch, err := codec.ReadBatch()
if _, ok := err.(*json.SyntaxError); ok {
codec.writeJSON(context.Background(), errorMessage(&parseError{err.Error()}))
}

View File

@ -63,12 +63,12 @@ func (hc *httpConn) remoteAddr() string {
return hc.url
}
func (hc *httpConn) readBatch() ([]*jsonrpcMessage, bool, error) {
func (hc *httpConn) ReadBatch() ([]*jsonrpcMessage, bool, error) {
<-hc.closeCh
return nil, false, io.EOF
}
func (hc *httpConn) close() {
func (hc *httpConn) Close() {
hc.closeOnce.Do(func() { close(hc.closeCh) })
}
@ -250,7 +250,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", contentType)
codec := newHTTPServerConn(r, w)
defer codec.close()
defer codec.Close()
var stream *jsoniter.Stream
if !s.disableStreaming {
stream = jsoniter.NewStream(jsoniter.ConfigDefault, w, 4096)

View File

@ -202,7 +202,7 @@ func (c *jsonCodec) remoteAddr() string {
return c.remote
}
func (c *jsonCodec) readBatch() (messages []*jsonrpcMessage, batch bool, err error) {
func (c *jsonCodec) ReadBatch() (messages []*jsonrpcMessage, batch bool, err error) {
// Decode the next JSON object in the input stream.
// This verifies basic syntax, etc.
var rawmsg json.RawMessage
@ -232,7 +232,7 @@ func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}) error {
return c.encode(v)
}
func (c *jsonCodec) close() {
func (c *jsonCodec) Close() {
c.closer.Do(func() {
close(c.closeCh)
c.conn.Close()

View File

@ -92,7 +92,7 @@ func (s *Server) RegisterName(name string, receiver interface{}) error {
//
// Note that codec options are no longer supported.
func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
defer codec.close()
defer codec.Close()
// Don't serve if server is stopped.
if atomic.LoadInt32(&s.run) == 0 {
@ -121,7 +121,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec, stre
h.allowSubscribe = false
defer h.close(io.EOF, nil)
reqs, batch, err := codec.readBatch()
reqs, batch, err := codec.ReadBatch()
if err != nil {
if err != io.EOF {
codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"}))
@ -146,7 +146,7 @@ func (s *Server) Stop() {
if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
s.logger.Info("RPC server shutting down")
s.codecs.Each(func(c interface{}) bool {
c.(ServerCodec).close()
c.(ServerCodec).Close()
return true
})
}

View File

@ -54,8 +54,8 @@ type DataError interface {
// a RPC session. Implementations must be go-routine safe since the codec can be called in
// multiple go-routines concurrently.
type ServerCodec interface {
readBatch() (msgs []*jsonrpcMessage, isBatch bool, err error)
close()
ReadBatch() (msgs []*jsonrpcMessage, isBatch bool, err error)
Close()
jsonWriter
}

View File

@ -256,8 +256,8 @@ func newWebsocketCodec(conn *websocket.Conn) ServerCodec {
return wc
}
func (wc *websocketCodec) close() {
wc.jsonCodec.close()
func (wc *websocketCodec) Close() {
wc.jsonCodec.Close()
wc.wg.Wait()
}

View File

@ -39,7 +39,7 @@ func TestState(t *testing.T) {
defer log.Root().SetHandler(log.Root().GetHandler())
log.Root().SetHandler(log.LvlFilterHandler(log.LvlError, log.StderrHandler))
if runtime.GOOS == "windows" || runtime.GOOS == "darwin" {
t.Skip("fix me on win please") // it's too slow on win, need generally improve speed of this tests
t.Skip("fix me on win please") // it's too slow on win and stops on macos, need generally improve speed of this tests
}
//t.Parallel()

View File

@ -76,7 +76,7 @@ CloudDrives (and ssd) have bad-latency and good-parallel-throughput - then havin
)
func doBackup(cliCtx *cli.Context) error {
logger, err := debug.Setup(cliCtx, true /* rootLogger */)
logger, _, err := debug.Setup(cliCtx, true /* rootLogger */)
if err != nil {
return err
}

View File

@ -54,7 +54,7 @@ func importChain(cliCtx *cli.Context) error {
utils.Fatalf("This command requires an argument.")
}
logger, err := debug.Setup(cliCtx, true /* rootLogger */)
logger, _, err := debug.Setup(cliCtx, true /* rootLogger */)
if err != nil {
return err
}

View File

@ -37,7 +37,7 @@ It expects the genesis file as argument.`,
func initGenesis(ctx *cli.Context) error {
var logger log.Logger
var err error
if logger, err = debug.Setup(ctx, true /* rootLogger */); err != nil {
if logger, _, err = debug.Setup(ctx, true /* rootLogger */); err != nil {
return err
}
// Make sure we have a valid genesis JSON

View File

@ -169,7 +169,7 @@ func doDiff(cliCtx *cli.Context) error {
}
func doDecompressSpeed(cliCtx *cli.Context) error {
logger, err := debug.Setup(cliCtx, true /* rootLogger */)
logger, _, err := debug.Setup(cliCtx, true /* rootLogger */)
if err != nil {
return err
}
@ -210,7 +210,7 @@ func doDecompressSpeed(cliCtx *cli.Context) error {
func doRam(cliCtx *cli.Context) error {
var logger log.Logger
var err error
if logger, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil {
if logger, _, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil {
return err
}
defer logger.Info("Done")
@ -235,7 +235,7 @@ func doRam(cliCtx *cli.Context) error {
}
func doIndicesCommand(cliCtx *cli.Context) error {
logger, err := debug.Setup(cliCtx, true /* rootLogger */)
logger, _, err := debug.Setup(cliCtx, true /* rootLogger */)
if err != nil {
return err
}
@ -285,7 +285,7 @@ func doIndicesCommand(cliCtx *cli.Context) error {
func doUncompress(cliCtx *cli.Context) error {
var logger log.Logger
var err error
if logger, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil {
if logger, _, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil {
return err
}
ctx := cliCtx.Context
@ -338,7 +338,7 @@ func doUncompress(cliCtx *cli.Context) error {
func doCompress(cliCtx *cli.Context) error {
var err error
var logger log.Logger
if logger, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil {
if logger, _, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil {
return err
}
ctx := cliCtx.Context
@ -388,7 +388,7 @@ func doCompress(cliCtx *cli.Context) error {
func doRetireCommand(cliCtx *cli.Context) error {
var logger log.Logger
var err error
if logger, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil {
if logger, _, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil {
return err
}
defer logger.Info("Done")

View File

@ -1,19 +1,23 @@
package app
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
"golang.org/x/net/http2"
@ -24,30 +28,36 @@ var (
Name: "diagnostics.url",
Usage: "URL of the diagnostics system provided by the support team, include unique session PIN",
}
metricsURLsFlag = cli.StringSliceFlag{
Name: "metrics.urls",
Usage: "Comma separated list of URLs to the metrics endpoints thats are being diagnosed",
debugURLsFlag = cli.StringSliceFlag{
Name: "debug.urls",
Usage: "Comma separated list of URLs to the debug endpoints thats are being diagnosed",
}
insecureFlag = cli.BoolFlag{
Name: "insecure",
Usage: "Allows communication with diagnostics system using self-signed TLS certificates",
}
sessionsFlag = cli.StringSliceFlag{
Name: "diagnostics.sessions",
Usage: "Comma separated list of support session ids to connect to",
}
)
var supportCommand = cli.Command{
Action: MigrateFlags(connectDiagnostics),
Name: "support",
Usage: "Connect Erigon instance to a diagnostics system for support",
ArgsUsage: "--diagnostics.url <URL for the diagnostics system> --metrics.urls <http://erigon_host:metrics_port>",
ArgsUsage: "--diagnostics.url <URL for the diagnostics system> --ids <diagnostic session ids allowed to connect> --metrics.urls <http://erigon_host:metrics_port>",
Flags: []cli.Flag{
&metricsURLsFlag,
&debugURLsFlag,
&diagnosticsURLFlag,
&sessionsFlag,
&insecureFlag,
},
//Category: "SUPPORT COMMANDS",
Description: `
The support command connects a running Erigon instances to a diagnostics system specified
by the URL.`,
Description: `The support command connects a running Erigon instances to a diagnostics system specified by the URL.`,
}
const Version = 1
@ -63,10 +73,9 @@ func ConnectDiagnostics(cliCtx *cli.Context, logger log.Logger) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
metricsURLs := cliCtx.StringSlice(metricsURLsFlag.Name)
metricsURL := metricsURLs[0] // TODO: Generalise
debugURLs := cliCtx.StringSlice(debugURLsFlag.Name)
diagnosticsUrl := cliCtx.String(diagnosticsURLFlag.Name)
diagnosticsUrl := cliCtx.String(diagnosticsURLFlag.Name) + "/bridge"
// Create TLS configuration with the certificate of the server
insecure := cliCtx.Bool(insecureFlag.Name)
@ -74,9 +83,11 @@ func ConnectDiagnostics(cliCtx *cli.Context, logger log.Logger) error {
InsecureSkipVerify: insecure, //nolint:gosec
}
sessionIds := cliCtx.StringSlice(sessionsFlag.Name)
// Perform the requests in a loop (reconnect)
for {
if err := tunnel(ctx, cancel, sigs, tlsConfig, diagnosticsUrl, metricsURL, logger); err != nil {
if err := tunnel(ctx, cancel, sigs, tlsConfig, diagnosticsUrl, sessionIds, debugURLs, logger); err != nil {
return err
}
select {
@ -91,19 +102,35 @@ func ConnectDiagnostics(cliCtx *cli.Context, logger log.Logger) error {
}
}
var successLine = []byte("SUCCESS")
type conn struct {
io.ReadCloser
*io.PipeWriter
}
func (c *conn) Close() error {
c.ReadCloser.Close()
c.PipeWriter.Close()
return nil
}
func (c *conn) SetWriteDeadline(time time.Time) error {
return nil
}
// tunnel operates the tunnel from diagnostics system to the metrics URL for one http/2 request
// needs to be called repeatedly to implement re-connect logic
func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, tlsConfig *tls.Config, diagnosticsUrl string, metricsURL string, logger log.Logger) error {
func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, tlsConfig *tls.Config, diagnosticsUrl string, sessionIds []string, debugURLs []string, logger log.Logger) error {
diagnosticsClient := &http.Client{Transport: &http2.Transport{TLSClientConfig: tlsConfig}}
defer diagnosticsClient.CloseIdleConnections()
metricsClient := &http.Client{}
defer metricsClient.CloseIdleConnections()
// Create a request object to send to the server
reader, writer := io.Pipe()
ctx1, cancel1 := context.WithCancel(ctx)
defer cancel1()
// Create a request object to send to the server
reader, writer := io.Pipe()
go func() {
select {
case <-sigs:
@ -113,7 +140,74 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal,
reader.Close()
writer.Close()
}()
type enode struct {
Enode string `json:"enode,omitempty"`
Enr string `json:"enr,omitempty"`
Ports *types.NodeInfoPorts `json:"ports,omitempty"`
ListenerAddr string `json:"listener_addr,omitempty"`
}
type info struct {
Id string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Protocols json.RawMessage `json:"protocols,omitempty"`
Enodes []enode `json:"enodes,omitempty"`
}
type node struct {
debugURL string
info *info
}
nodes := map[string]*node{}
for _, debugURL := range debugURLs {
debugResponse, err := metricsClient.Get(debugURL + "/debug/nodeinfo")
if err != nil {
return err
}
if debugResponse.StatusCode != http.StatusOK {
return fmt.Errorf("debug request to %s failed: %s", debugURL, debugResponse.Status)
}
var reply remote.NodesInfoReply
err = json.NewDecoder(debugResponse.Body).Decode(&reply)
debugResponse.Body.Close()
if err != nil {
return err
}
for _, ni := range reply.NodesInfo {
if n, ok := nodes[ni.Id]; ok {
n.info.Enodes = append(n.info.Enodes, enode{
Enode: ni.Enode,
Enr: ni.Enr,
Ports: ni.Ports,
ListenerAddr: ni.ListenerAddr,
})
} else {
nodes[ni.Id] = &node{debugURL, &info{
Id: ni.Id,
Name: ni.Name,
Protocols: ni.Protocols,
Enodes: []enode{{
Enode: ni.Enode,
Enr: ni.Enr,
Ports: ni.Ports,
ListenerAddr: ni.ListenerAddr,
}}}}
}
}
}
req, err := http.NewRequestWithContext(ctx1, http.MethodPost, diagnosticsUrl, reader)
if err != nil {
return err
}
@ -121,66 +215,198 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal,
// Create a connection
// Apply given context to the sent request
resp, err := diagnosticsClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
defer writer.Close()
// Apply the connection context on the request context
var metricsBuf bytes.Buffer
r := bufio.NewReaderSize(resp.Body, 4096)
line, isPrefix, err := r.ReadLine()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("support request to %s failed: %s", diagnosticsUrl, resp.Status)
}
type connectionInfo struct {
Version uint64 `json:"version"`
Sessions []string `json:"sessions"`
Nodes []*info `json:"nodes"`
}
err = json.NewEncoder(writer).Encode(&connectionInfo{
Version: Version,
Sessions: sessionIds,
Nodes: func() (replies []*info) {
for _, node := range nodes {
replies = append(replies, node.info)
}
return replies
}(),
})
if err != nil {
return fmt.Errorf("reading first line: %v", err)
}
if isPrefix {
return fmt.Errorf("request too long")
}
if !bytes.Equal(line, successLine) {
return fmt.Errorf("connecting to diagnostics system, first line [%s]", line)
}
var versionBytes [8]byte
binary.BigEndian.PutUint64(versionBytes[:], Version)
if _, err = writer.Write(versionBytes[:]); err != nil {
return fmt.Errorf("sending version: %v", err)
return err
}
logger.Info("Connected")
for line, isPrefix, err = r.ReadLine(); err == nil && !isPrefix; line, isPrefix, err = r.ReadLine() {
metricsBuf.Reset()
metricsResponse, err := metricsClient.Get(metricsURL + string(line))
if err != nil {
fmt.Fprintf(&metricsBuf, "ERROR: Requesting metrics url [%s], query [%s], err: %v", metricsURL, line, err)
} else {
// Buffer the metrics response, and relay it back to the diagnostics system, prepending with the size
if _, err := io.Copy(&metricsBuf, metricsResponse.Body); err != nil {
metricsBuf.Reset()
fmt.Fprintf(&metricsBuf, "ERROR: Extracting metrics url [%s], query [%s], err: %v", metricsURL, line, err)
}
metricsResponse.Body.Close()
}
var sizeBuf [4]byte
binary.BigEndian.PutUint32(sizeBuf[:], uint32(metricsBuf.Len()))
if _, err = writer.Write(sizeBuf[:]); err != nil {
logger.Error("Problem relaying metrics prefix len", "url", metricsURL, "query", line, "err", err)
break
}
if _, err = writer.Write(metricsBuf.Bytes()); err != nil {
logger.Error("Problem relaying", "url", metricsURL, "query", line, "err", err)
break
}
}
if err != nil {
codec := rpc.NewCodec(&conn{
ReadCloser: resp.Body,
PipeWriter: writer,
})
defer codec.Close()
for {
requests, _, err := codec.ReadBatch()
select {
case <-ctx.Done():
return nil
default:
logger.Error("Breaking connection", "err", err)
if err != nil {
logger.Info("Breaking connection", "err", err)
return nil
}
}
var requestId string
if err = json.Unmarshal(requests[0].ID, &requestId); err != nil {
logger.Error("Invalid request id", "err", err)
continue
}
nodeRequest := struct {
NodeId string `json:"nodeId"`
QueryParams url.Values `json:"queryParams"`
}{}
if err = json.Unmarshal(requests[0].Params, &nodeRequest); err != nil {
logger.Error("Invalid node request", "err", err, "id", requestId)
continue
}
type responseError struct {
Code int64 `json:"code"`
Message string `json:"message"`
Data *json.RawMessage `json:"data,omitempty"`
}
type nodeResponse struct {
Id string `json:"id"`
Result json.RawMessage `json:"result,omitempty"`
Error *responseError `json:"error,omitempty"`
Last bool `json:"last,omitempty"`
}
if node, ok := nodes[nodeRequest.NodeId]; ok {
err := func() error {
var queryString string
if len(nodeRequest.QueryParams) > 0 {
queryString = "?" + nodeRequest.QueryParams.Encode()
}
debugURL := node.debugURL + "/debug/" + requests[0].Method + queryString
debugResponse, err := metricsClient.Get(debugURL)
if err != nil {
return json.NewEncoder(writer).Encode(&nodeResponse{
Id: requestId,
Error: &responseError{
Code: http.StatusFailedDependency,
Message: fmt.Sprintf("Request for metrics method [%s] failed: %v", debugURL, err),
},
Last: true,
})
}
defer debugResponse.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(debugResponse.Body)
return json.NewEncoder(writer).Encode(&nodeResponse{
Id: requestId,
Error: &responseError{
Code: int64(resp.StatusCode),
Message: fmt.Sprintf("Request for metrics method [%s] failed: %s", debugURL, string(body)),
},
Last: true,
})
}
buffer := &bytes.Buffer{}
switch debugResponse.Header.Get("Content-Type") {
case "application/json":
if _, err := io.Copy(buffer, debugResponse.Body); err != nil {
return json.NewEncoder(writer).Encode(&nodeResponse{
Id: requestId,
Error: &responseError{
Code: http.StatusInternalServerError,
Message: fmt.Sprintf("Request for metrics method [%s] failed: %v", debugURL, err),
},
Last: true,
})
}
case "application/octet-stream":
if _, err := io.Copy(buffer, debugResponse.Body); err != nil {
return json.NewEncoder(writer).Encode(&nodeResponse{
Id: requestId,
Error: &responseError{
Code: int64(http.StatusInternalServerError),
Message: fmt.Sprintf("Can't copy metrics response for [%s]: %s", debugURL, err),
},
Last: true,
})
}
offset, _ := strconv.ParseInt(debugResponse.Header.Get("X-Offset"), 10, 64)
size, _ := strconv.ParseInt(debugResponse.Header.Get("X-Size"), 10, 64)
data, err := json.Marshal(struct {
Offset int64 `json:"offset"`
Size int64 `json:"size"`
Data []byte `json:"chunk"`
}{
Offset: offset,
Size: size,
Data: buffer.Bytes(),
})
buffer = bytes.NewBuffer(data)
if err != nil {
return json.NewEncoder(writer).Encode(&nodeResponse{
Id: requestId,
Error: &responseError{
Code: int64(http.StatusInternalServerError),
Message: fmt.Sprintf("Can't copy metrics response for [%s]: %s", debugURL, err),
},
Last: true,
})
}
default:
return json.NewEncoder(writer).Encode(&nodeResponse{
Id: requestId,
Error: &responseError{
Code: int64(http.StatusInternalServerError),
Message: fmt.Sprintf("Unhandled content type: %s, from: %s", debugResponse.Header.Get("Content-Type"), debugURL),
},
Last: true,
})
}
return json.NewEncoder(writer).Encode(&nodeResponse{
Id: requestId,
Result: json.RawMessage(buffer.Bytes()),
Last: true,
})
}()
if err != nil {
return err
}
}
}
if isPrefix {
logger.Error("Request too long, circuit breaker")
}
return nil
}

View File

@ -31,7 +31,6 @@ import (
"gopkg.in/yaml.v2"
"github.com/ledgerwatch/erigon/common/fdlimit"
"github.com/ledgerwatch/erigon/diagnostics"
"github.com/ledgerwatch/erigon/metrics"
"github.com/ledgerwatch/erigon/turbo/logging"
)
@ -176,7 +175,7 @@ func SetupCobra(cmd *cobra.Command, filePrefix string) log.Logger {
// Setup initializes profiling and logging based on the CLI flags.
// It should be called as early as possible in the program.
func Setup(ctx *cli.Context, rootLogger bool) (log.Logger, error) {
func Setup(ctx *cli.Context, rootLogger bool) (log.Logger, *http.ServeMux, error) {
// ensure we've read in config file details before setting up metrics etc.
if err := SetFlagsFromConfigFile(ctx); err != nil {
log.Warn("failed setting config flags from yaml/toml file", "err", err)
@ -188,13 +187,13 @@ func Setup(ctx *cli.Context, rootLogger bool) (log.Logger, error) {
if traceFile := ctx.String(traceFlag.Name); traceFile != "" {
if err := Handler.StartGoTrace(traceFile); err != nil {
return logger, err
return logger, nil, err
}
}
if cpuFile := ctx.String(cpuprofileFlag.Name); cpuFile != "" {
if err := Handler.StartCPUProfile(cpuFile); err != nil {
return logger, err
return logger, nil, err
}
}
pprofEnabled := ctx.Bool(pprofFlag.Name)
@ -208,13 +207,6 @@ func Setup(ctx *cli.Context, rootLogger bool) (log.Logger, error) {
metricsPort := ctx.Int(metricsPortFlag.Name)
metricsAddress = fmt.Sprintf("%s:%d", metricsAddr, metricsPort)
metricsMux = metrics.Setup(metricsAddress, logger)
diagnostics.SetupLogsAccess(ctx, metricsMux)
diagnostics.SetupDbAccess(ctx, metricsMux)
diagnostics.SetupCmdLineAccess(metricsMux)
diagnostics.SetupFlagsAccess(ctx, metricsMux)
diagnostics.SetupVersionAccess(metricsMux)
diagnostics.SetupBlockBodyDownload(metricsMux)
diagnostics.SetupHeaderDownloadStats(metricsMux)
}
// pprof server
@ -228,7 +220,8 @@ func Setup(ctx *cli.Context, rootLogger bool) (log.Logger, error) {
StartPProf(address, nil)
}
}
return logger, nil
return logger, metricsMux, nil
}
func StartPProf(address string, metricsMux *http.ServeMux) {

View File

@ -34,6 +34,10 @@ func (eri *ErigonNode) Serve() error {
return nil
}
func (eri *ErigonNode) Backend() *eth.Ethereum {
return eri.backend
}
func (eri *ErigonNode) Close() {
eri.stack.Close()
}